Now that the BTLs are moving down to OPAL and becoming available to ORTE, there is no longer any need or desire to push performance in the OOB/TCP component. We therefore don't need multiple modules driving NICs in parallel, and can drop all of the complicated distribution logic. Fall back to the simplified single-module model, but retain the ability to run that module in its own progress thread if so directed.
This should eliminate the connectivity issues that have been reported, and will make maintenance of this component much easier.

cmr=v1.8.2:reviewer=jsquyres:subject=simplify the OOB/TCP component

This commit was SVN r31956.
Этот коммит содержится в:
родитель
f5ebd2faeb
Коммит
e21bfeadcd
@ -69,21 +69,17 @@
|
||||
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_ping.h"
|
||||
|
||||
static void tcp_init(struct mca_oob_tcp_module_t *mod);
|
||||
static void tcp_fini(struct mca_oob_tcp_module_t *mod);
|
||||
static void accept_connection(struct mca_oob_tcp_module_t *md,
|
||||
const int accepted_fd,
|
||||
static void tcp_init(void);
|
||||
static void tcp_fini(void);
|
||||
static void accept_connection(const int accepted_fd,
|
||||
const struct sockaddr *addr);
|
||||
static void set_peer(struct mca_oob_tcp_module_t *md,
|
||||
const orte_process_name_t* name,
|
||||
static void set_peer(const orte_process_name_t* name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports);
|
||||
static void ping(struct mca_oob_tcp_module_t *mod,
|
||||
const orte_process_name_t *proc);
|
||||
static void send_nb(struct mca_oob_tcp_module_t *mod,
|
||||
orte_rml_send_t *msg);
|
||||
static void ping(const orte_process_name_t *proc);
|
||||
static void send_nb(orte_rml_send_t *msg);
|
||||
static void resend(struct mca_oob_tcp_msg_error_t *mop);
|
||||
static void ft_event(struct mca_oob_tcp_module_t *mod, int state);
|
||||
static void ft_event(int state);
|
||||
|
||||
mca_oob_tcp_module_t mca_oob_tcp_module = {
|
||||
{
|
||||
@ -104,15 +100,12 @@ mca_oob_tcp_module_t mca_oob_tcp_module = {
|
||||
static void recv_handler(int sd, short flags, void* user);
|
||||
static void* progress_thread_engine(opal_object_t *obj)
|
||||
{
|
||||
opal_thread_t *t = (opal_thread_t*)obj;
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)t->t_arg;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s PROGRESS THREAD RUNNING ON INTERFACE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mod->if_name);
|
||||
"%s TCP OOB PROGRESS THREAD RUNNING",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
while (mod->ev_active) {
|
||||
opal_event_loop(mod->ev_base, OPAL_EVLOOP_ONCE);
|
||||
while (mca_oob_tcp_module.ev_active) {
|
||||
opal_event_loop(mca_oob_tcp_module.ev_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
return OPAL_THREAD_CANCELLED;
|
||||
}
|
||||
@ -121,32 +114,29 @@ static void* progress_thread_engine(opal_object_t *obj)
|
||||
/*
|
||||
* Initialize global variables used w/in this module.
|
||||
*/
|
||||
static void tcp_init(struct mca_oob_tcp_module_t *md)
|
||||
static void tcp_init(void)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
|
||||
/* setup the module's state variables */
|
||||
OBJ_CONSTRUCT(&mod->peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mod->peers, 32);
|
||||
mod->ev_active = false;
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mca_oob_tcp_module.peers, 32);
|
||||
mca_oob_tcp_module.ev_active = false;
|
||||
|
||||
if (orte_oob_base.use_module_threads) {
|
||||
/* if we are to use independent progress threads at
|
||||
* the module level, start it now
|
||||
*/
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s STARTING TCP PROGRESS THREAD ON INTERFACE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mod->if_name);
|
||||
mod->ev_base = opal_event_base_create();
|
||||
"%s STARTING TCP PROGRESS THREAD",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
mca_oob_tcp_module.ev_base = opal_event_base_create();
|
||||
/* construct the thread object */
|
||||
OBJ_CONSTRUCT(&mod->progress_thread, opal_thread_t);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.progress_thread, opal_thread_t);
|
||||
/* fork off a thread to progress it */
|
||||
mod->progress_thread.t_run = progress_thread_engine;
|
||||
mod->progress_thread.t_arg = mod;
|
||||
mod->ev_active = true;
|
||||
if (OPAL_SUCCESS != opal_thread_start(&mod->progress_thread)) {
|
||||
opal_output(0, "%s progress thread failed to start for interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mod->if_name);
|
||||
mca_oob_tcp_module.progress_thread.t_run = progress_thread_engine;
|
||||
mca_oob_tcp_module.ev_active = true;
|
||||
if (OPAL_SUCCESS != opal_thread_start(&mca_oob_tcp_module.progress_thread)) {
|
||||
opal_output(0, "%s progress thread failed to start",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -154,15 +144,14 @@ static void tcp_init(struct mca_oob_tcp_module_t *md)
|
||||
/*
|
||||
* Module cleanup.
|
||||
*/
|
||||
static void tcp_fini(struct mca_oob_tcp_module_t *md)
|
||||
static void tcp_fini(void)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
uint64_t ui64;
|
||||
char *nptr;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
|
||||
/* cleanup all peers */
|
||||
if (OPAL_SUCCESS == opal_hash_table_get_first_key_uint64(&mod->peers, &ui64,
|
||||
if (OPAL_SUCCESS == opal_hash_table_get_first_key_uint64(&mca_oob_tcp_module.peers, &ui64,
|
||||
(void**)&peer, (void**)&nptr)) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s RELEASING PEER OBJ %s",
|
||||
@ -171,7 +160,7 @@ static void tcp_fini(struct mca_oob_tcp_module_t *md)
|
||||
if (NULL != peer) {
|
||||
OBJ_RELEASE(peer);
|
||||
}
|
||||
while (OPAL_SUCCESS == opal_hash_table_get_next_key_uint64(&mod->peers, &ui64,
|
||||
while (OPAL_SUCCESS == opal_hash_table_get_next_key_uint64(&mca_oob_tcp_module.peers, &ui64,
|
||||
(void**)&peer, nptr, (void**)&nptr)) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s RELEASING PEER OBJ %s",
|
||||
@ -182,24 +171,24 @@ static void tcp_fini(struct mca_oob_tcp_module_t *md)
|
||||
}
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&mod->peers);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peers);
|
||||
|
||||
if (mod->ev_active) {
|
||||
if (mca_oob_tcp_module.ev_active) {
|
||||
/* if we used an independent progress thread at
|
||||
* the module level, stop it now
|
||||
*/
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s STOPPING TCP PROGRESS THREAD ON INTERFACE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), mod->if_name);
|
||||
"%s STOPPING TCP PROGRESS THREAD",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* stop the progress thread */
|
||||
mod->ev_active = false;
|
||||
mca_oob_tcp_module.ev_active = false;
|
||||
/* break the event loop */
|
||||
opal_event_base_loopexit(mod->ev_base);
|
||||
opal_event_base_loopexit(mca_oob_tcp_module.ev_base);
|
||||
/* wait for thread to exit */
|
||||
opal_thread_join(&mod->progress_thread, NULL);
|
||||
OBJ_DESTRUCT(&mod->progress_thread);
|
||||
opal_thread_join(&mca_oob_tcp_module.progress_thread, NULL);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.progress_thread);
|
||||
/* release the event base */
|
||||
opal_event_base_free(mod->ev_base);
|
||||
opal_event_base_free(mca_oob_tcp_module.ev_base);
|
||||
}
|
||||
}
|
||||
|
||||
@ -209,8 +198,7 @@ static void tcp_fini(struct mca_oob_tcp_module_t *md)
|
||||
* OOB-level connection handshake. Used in both the threaded and
|
||||
* event listen modes.
|
||||
*/
|
||||
static void accept_connection(struct mca_oob_tcp_module_t *md,
|
||||
const int accepted_fd,
|
||||
static void accept_connection(const int accepted_fd,
|
||||
const struct sockaddr *addr)
|
||||
{
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
@ -225,7 +213,7 @@ static void accept_connection(struct mca_oob_tcp_module_t *md,
|
||||
/* use a one-time event to wait for receipt of peer's
|
||||
* process ident message to complete this connection
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_ACCEPT_STATE((mca_oob_tcp_module_t*)md, accepted_fd, addr, recv_handler);
|
||||
ORTE_ACTIVATE_TCP_ACCEPT_STATE(accepted_fd, addr, recv_handler);
|
||||
}
|
||||
|
||||
static int parse_uri(const uint16_t af_family,
|
||||
@ -290,14 +278,14 @@ static void process_set_peer(int fd, short args, void *cbdata)
|
||||
mca_oob_tcp_addr_t *maddr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp:processing set_peer cmd for interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pop->mod->if_name);
|
||||
"%s:tcp:processing set_peer cmd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
if (AF_INET != pop->af_family) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(pop->mod, &pop->peer))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&pop->peer))) {
|
||||
peer = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
peer->name.jobid = pop->peer.jobid;
|
||||
peer->name.vpid = pop->peer.vpid;
|
||||
@ -305,7 +293,7 @@ static void process_set_peer(int fd, short args, void *cbdata)
|
||||
"%s SET_PEER ADDING PEER %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&(pop->mod)->peers, (*ui64), peer)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) {
|
||||
OBJ_RELEASE(peer);
|
||||
return;
|
||||
}
|
||||
@ -325,36 +313,30 @@ static void process_set_peer(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
opal_output_verbose(20, orte_oob_base_framework.framework_output,
|
||||
"%s set_peer: peer %s is listening on interface %s net %s port %s",
|
||||
"%s set_peer: peer %s is listening on net %s port %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer),
|
||||
pop->mod->if_name,
|
||||
(NULL == pop->net) ? "NULL" : pop->net,
|
||||
(NULL == pop->port) ? "NULL" : pop->port);
|
||||
maddr = OBJ_NEW(mca_oob_tcp_addr_t);
|
||||
memcpy(&maddr->addr, &inaddr, sizeof(inaddr));
|
||||
opal_list_append(&peer->addrs, &maddr->super);
|
||||
/* track our module */
|
||||
peer->mod = pop->mod;
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(pop);
|
||||
}
|
||||
|
||||
static void set_peer(struct mca_oob_tcp_module_t *md,
|
||||
const orte_process_name_t *name,
|
||||
static void set_peer(const orte_process_name_t *name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp set addr for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(name));
|
||||
|
||||
/* have to push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_PEER_OP(mod, name, af_family, net, ports, process_set_peer);
|
||||
ORTE_ACTIVATE_TCP_PEER_OP(name, af_family, net, ports, process_set_peer);
|
||||
}
|
||||
|
||||
|
||||
@ -371,7 +353,7 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
|
||||
/* do we know this peer? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &op->peer))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&op->peer))) {
|
||||
/* push this back to the component so it can try
|
||||
* another module within this transport. If no
|
||||
* module can be found, the component can push back
|
||||
@ -382,7 +364,7 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->peer));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(op->mod, NULL, NULL, &op->peer, mca_oob_tcp_component_hop_unknown);
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, NULL, &op->peer, mca_oob_tcp_component_hop_unknown);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
@ -409,17 +391,14 @@ static void process_ping(int fd, short args, void *cbdata)
|
||||
|
||||
/* attempt the connection */
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(op->mod, peer, mca_oob_tcp_peer_try_connect);
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
static void ping(struct mca_oob_tcp_module_t *md,
|
||||
const orte_process_name_t *proc)
|
||||
static void ping(const orte_process_name_t *proc)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s:[%s:%d] pinging peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -427,7 +406,7 @@ static void ping(struct mca_oob_tcp_module_t *md,
|
||||
ORTE_NAME_PRINT(proc));
|
||||
|
||||
/* push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_PING(mod, proc, process_ping);
|
||||
ORTE_ACTIVATE_TCP_PING(proc, process_ping);
|
||||
}
|
||||
|
||||
static void process_send(int fd, short args, void *cbdata)
|
||||
@ -445,7 +424,7 @@ static void process_send(int fd, short args, void *cbdata)
|
||||
/* do we have a route to this peer (could be direct)? */
|
||||
hop = orte_routed.get_route(&op->msg->dst);
|
||||
/* do we know this hop? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &hop))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&hop))) {
|
||||
/* push this back to the component so it can try
|
||||
* another module within this transport. If no
|
||||
* module can be found, the component can push back
|
||||
@ -456,7 +435,7 @@ static void process_send(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&hop));
|
||||
ORTE_ACTIVATE_TCP_NO_ROUTE(op->mod, op->msg, &hop, mca_oob_tcp_component_no_route);
|
||||
ORTE_ACTIVATE_TCP_NO_ROUTE(op->msg, &hop, mca_oob_tcp_component_no_route);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
@ -487,25 +466,22 @@ static void process_send(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(op->mod, peer, mca_oob_tcp_peer_try_connect);
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
static void send_nb(struct mca_oob_tcp_module_t *md,
|
||||
orte_rml_send_t *msg)
|
||||
static void send_nb(orte_rml_send_t *msg)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:send_nb to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&msg->dst));
|
||||
|
||||
/* push this into our event base for processing */
|
||||
ORTE_ACTIVATE_TCP_POST_SEND(mod, msg, process_send);
|
||||
ORTE_ACTIVATE_TCP_POST_SEND(msg, process_send);
|
||||
}
|
||||
|
||||
static void process_resend(int fd, short args, void *cbdata)
|
||||
@ -519,7 +495,7 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(&op->hop));
|
||||
|
||||
/* do we know this peer? */
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &op->hop))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&op->hop))) {
|
||||
/* push this back to the component so it can try
|
||||
* another module within this transport. If no
|
||||
* module can be found, the component can push back
|
||||
@ -530,7 +506,7 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
__FILE__, __LINE__,
|
||||
ORTE_NAME_PRINT(&op->hop));
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(op->mod, op->snd, NULL, &op->hop, mca_oob_tcp_component_hop_unknown);
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(op->snd, NULL, &op->hop, mca_oob_tcp_component_hop_unknown);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
@ -560,7 +536,7 @@ static void process_resend(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(op->mod, peer, mca_oob_tcp_peer_try_connect);
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
@ -600,15 +576,15 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* get the handshake */
|
||||
if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(op->mod, NULL, sd, &hdr)) {
|
||||
if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(NULL, sd, &hdr)) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* finish processing ident */
|
||||
if (MCA_OOB_TCP_IDENT == hdr.type) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &hdr.origin))) {
|
||||
if (NULL == (peer = mca_oob_tcp_peer_lookup(&hdr.origin))) {
|
||||
/* should never happen */
|
||||
mca_oob_tcp_peer_close(op->mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
goto cleanup;
|
||||
}
|
||||
/* set socket up to be non-blocking */
|
||||
@ -625,7 +601,7 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
|
||||
/* is the peer instance willing to accept this connection */
|
||||
peer->sd = sd;
|
||||
if (mca_oob_tcp_peer_accept(op->mod, peer) == false) {
|
||||
if (mca_oob_tcp_peer_accept(peer) == false) {
|
||||
if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
|
||||
opal_output(0, "%s-%s mca_oob_tcp_recv_connect: "
|
||||
"rejected connection from %s connection state %d",
|
||||
@ -636,7 +612,7 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
}
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
ui64 = (uint64_t*)(&peer->name);
|
||||
opal_hash_table_set_value_uint64(&op->mod->peers, (*ui64), NULL);
|
||||
opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), NULL);
|
||||
OBJ_RELEASE(peer);
|
||||
}
|
||||
}
|
||||
@ -647,13 +623,13 @@ static void recv_handler(int sd, short flg, void *cbdata)
|
||||
|
||||
/* Dummy function for when we are not using FT. */
|
||||
#if OPAL_ENABLE_FT_CR == 0
|
||||
static void ft_event(struct mca_oob_tcp_module_t *mod, int state)
|
||||
static void ft_event(int state)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
#else
|
||||
static void ft_event(struct mca_oob_tcp_module_t *mod, int state) {
|
||||
static void ft_event(int state) {
|
||||
#if 0
|
||||
opal_list_item_t *item;
|
||||
#endif
|
||||
@ -678,9 +654,9 @@ static void ft_event(struct mca_oob_tcp_module_t *mod, int state) {
|
||||
* Clean out cached connection information
|
||||
* Select pieces of finalize/init
|
||||
*/
|
||||
for (item = opal_list_remove_first(&mod->peer_list);
|
||||
for (item = opal_list_remove_first(&mca_oob_tcp_module.peer_list);
|
||||
item != NULL;
|
||||
item = opal_list_remove_first(&mod->peer_list)) {
|
||||
item = opal_list_remove_first(&mca_oob_tcp_module.peer_list)) {
|
||||
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)item;
|
||||
/* JJH: Use the below command for debugging restarts with invalid sockets
|
||||
* mca_oob_tcp_peer_dump(peer, "RESTART CLEAN")
|
||||
@ -688,15 +664,15 @@ static void ft_event(struct mca_oob_tcp_module_t *mod, int state) {
|
||||
MCA_OOB_TCP_PEER_RETURN(peer);
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&mod->peer_free);
|
||||
OBJ_DESTRUCT(&mod->peer_names);
|
||||
OBJ_DESTRUCT(&mod->peers);
|
||||
OBJ_DESTRUCT(&mod->peer_list);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peer_free);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peer_names);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peers);
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_module.peer_list);
|
||||
|
||||
OBJ_CONSTRUCT(&mod->peer_list, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mod->peers, opal_hash_table_t);
|
||||
OBJ_CONSTRUCT(&mod->peer_names, opal_hash_table_t);
|
||||
OBJ_CONSTRUCT(&mod->peer_free, opal_free_list_t);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_list, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peers, opal_hash_table_t);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_names, opal_hash_table_t);
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_module.peer_free, opal_free_list_t);
|
||||
|
||||
/*
|
||||
* Resume event processing
|
||||
|
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -54,21 +55,17 @@ typedef struct {
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_nicaddr_t);
|
||||
|
||||
/* Module definition */
|
||||
typedef void (*mca_oob_tcp_module_init_fn_t)(struct mca_oob_tcp_module_t *mod);
|
||||
typedef void (*mca_oob_tcp_module_fini_fn_t)(struct mca_oob_tcp_module_t *mod);
|
||||
typedef void (*mca_oob_tcp_module_accept_connection_fn_t)(struct mca_oob_tcp_module_t *md,
|
||||
const int accepted_fd,
|
||||
typedef void (*mca_oob_tcp_module_init_fn_t)(void);
|
||||
typedef void (*mca_oob_tcp_module_fini_fn_t)(void);
|
||||
typedef void (*mca_oob_tcp_module_accept_connection_fn_t)(const int accepted_fd,
|
||||
const struct sockaddr *addr);
|
||||
typedef void (*mca_oob_tcp_module_set_peer_fn_t)(struct mca_oob_tcp_module_t *mod,
|
||||
const orte_process_name_t* name,
|
||||
typedef void (*mca_oob_tcp_module_set_peer_fn_t)(const orte_process_name_t* name,
|
||||
const uint16_t af_family,
|
||||
const char *net, const char *ports);
|
||||
typedef void (*mca_oob_tcp_module_ping_fn_t)(struct mca_oob_tcp_module_t *mod,
|
||||
const orte_process_name_t *proc);
|
||||
typedef void (*mca_oob_tcp_module_send_nb_fn_t)(struct mca_oob_tcp_module_t *mod,
|
||||
orte_rml_send_t *msg);
|
||||
typedef void (*mca_oob_tcp_module_ping_fn_t)(const orte_process_name_t *proc);
|
||||
typedef void (*mca_oob_tcp_module_send_nb_fn_t)(orte_rml_send_t *msg);
|
||||
typedef void (*mca_oob_tcp_module_resend_nb_fn_t)(struct mca_oob_tcp_msg_error_t *mop);
|
||||
typedef void (*mca_oob_tcp_module_ft_event_fn_t)(struct mca_oob_tcp_module_t *mod, int state);
|
||||
typedef void (*mca_oob_tcp_module_ft_event_fn_t)(int state);
|
||||
|
||||
typedef struct {
|
||||
mca_oob_tcp_module_init_fn_t init;
|
||||
@ -82,14 +79,9 @@ typedef struct {
|
||||
} mca_oob_tcp_module_api_t;
|
||||
typedef struct {
|
||||
mca_oob_tcp_module_api_t api;
|
||||
int idx; // index in the module array
|
||||
opal_event_base_t *ev_base; /* event base for the module progress thread */
|
||||
bool ev_active;
|
||||
opal_thread_t progress_thread;
|
||||
int af_family; // interface family - v4 or v6
|
||||
char* if_name; /* string name of the interface */
|
||||
int if_kidx; /* interface kernel index */
|
||||
opal_list_t addresses; /* list of addresses served by this NIC */
|
||||
opal_hash_table_t peers; // connection addresses for peers
|
||||
} mca_oob_tcp_module_t;
|
||||
ORTE_MODULE_DECLSPEC extern mca_oob_tcp_module_t mca_oob_tcp_module;
|
||||
|
@ -13,6 +13,7 @@
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -106,15 +107,13 @@ void orte_oob_tcp_set_socket_options(int sd)
|
||||
#endif
|
||||
}
|
||||
|
||||
mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(mca_oob_tcp_module_t *md,
|
||||
const orte_process_name_t *name)
|
||||
mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(const orte_process_name_t *name)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)md;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
uint64_t ui64;
|
||||
|
||||
memcpy(&ui64, (char*)name, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mod->peers, ui64, (void**)&peer)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_module.peers, ui64, (void**)&peer)) {
|
||||
return NULL;
|
||||
}
|
||||
return peer;
|
||||
|
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -29,6 +30,5 @@
|
||||
|
||||
ORTE_MODULE_DECLSPEC void orte_oob_tcp_set_socket_options(int sd);
|
||||
ORTE_MODULE_DECLSPEC char* mca_oob_tcp_state_print(mca_oob_tcp_state_t state);
|
||||
ORTE_MODULE_DECLSPEC mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(mca_oob_tcp_module_t *md,
|
||||
const orte_process_name_t *name);
|
||||
ORTE_MODULE_DECLSPEC mca_oob_tcp_peer_t* mca_oob_tcp_peer_lookup(const orte_process_name_t *name);
|
||||
#endif /* _MCA_OOB_TCP_COMMON_H_ */
|
||||
|
@ -145,12 +145,8 @@ static int tcp_component_open(void)
|
||||
mca_oob_tcp_component.listen_thread_tv.tv_usec = 0;
|
||||
}
|
||||
mca_oob_tcp_component.addr_count = 0;
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.modules, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&mca_oob_tcp_component.modules, 4, INT_MAX, 2);
|
||||
mca_oob_tcp_component.ipv4conns = NULL;
|
||||
mca_oob_tcp_component.ipv4ports = NULL;
|
||||
OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
|
||||
opal_hash_table_init(&mca_oob_tcp_component.peers, 32);
|
||||
|
||||
#if OPAL_ENABLE_IPV6
|
||||
mca_oob_tcp_component.ipv6conns = NULL;
|
||||
@ -179,31 +175,9 @@ static int tcp_component_open(void)
|
||||
*/
|
||||
static int tcp_component_close(void)
|
||||
{
|
||||
int i;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
opal_object_t *value;
|
||||
uint64_t key;
|
||||
void *node;
|
||||
int rc;
|
||||
|
||||
/* don't cleanup the listen thread as it wasn't constructed
|
||||
* for anything other than the HNP, and we don't want to incur
|
||||
* the timeout penalty when the HNP exits that would be required
|
||||
* to stop the thread
|
||||
*/
|
||||
|
||||
/* cleanup listen event list */
|
||||
/* cleanup listen event list */
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.listeners);
|
||||
|
||||
/* cleanup modules */
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL != (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
free(mod->if_name);
|
||||
free(mod);
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.modules);
|
||||
|
||||
if (NULL != mca_oob_tcp_component.ipv4conns) {
|
||||
opal_argv_free(mca_oob_tcp_component.ipv4conns);
|
||||
}
|
||||
@ -220,17 +194,6 @@ static int tcp_component_close(void)
|
||||
}
|
||||
#endif
|
||||
|
||||
/* release all peers from the hash table */
|
||||
rc = opal_hash_table_get_first_key_uint64 (&mca_oob_tcp_component.peers, &key,
|
||||
(void **) &value, &node);
|
||||
while (OPAL_SUCCESS == rc) {
|
||||
OBJ_RELEASE(value);
|
||||
rc = opal_hash_table_get_next_key_uint64 (&mca_oob_tcp_component.peers, &key,
|
||||
(void **) &value, node, &node);
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&mca_oob_tcp_component.peers);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
#if ORTE_ENABLE_STATIC_PORTS
|
||||
@ -445,20 +408,15 @@ static int tcp_component_register(void)
|
||||
|
||||
|
||||
static char **split_and_resolve(char **orig_str, char *name);
|
||||
static int mca_oob_tcp_create(int if_idx,
|
||||
const char *if_name);
|
||||
|
||||
static bool component_available(void)
|
||||
{
|
||||
int i, j, rc;
|
||||
int i, rc;
|
||||
char **interfaces = NULL;
|
||||
bool including = false, excluding = false;
|
||||
char name[32];
|
||||
struct sockaddr_storage my_ss;
|
||||
int kindex;
|
||||
bool add_this_nic;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
mca_oob_tcp_nicaddr_t *nicaddr;
|
||||
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output,
|
||||
"oob:tcp: component_available called");
|
||||
@ -550,30 +508,6 @@ static bool component_available(void)
|
||||
}
|
||||
}
|
||||
|
||||
/* we know we want this address - check if we have seen this NIC before */
|
||||
add_this_nic = true;
|
||||
for (j = 0; j < mca_oob_tcp_component.modules.size; j++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, j))) {
|
||||
continue;
|
||||
}
|
||||
/* Have we seen this NIC already? */
|
||||
if (kindex == mod->if_kidx) {
|
||||
add_this_nic = false;
|
||||
/* we don't want another module to be created. But we still need to preserve
|
||||
* the address as a given NIC can have multiple addresses.
|
||||
*/
|
||||
nicaddr = OBJ_NEW(mca_oob_tcp_nicaddr_t);
|
||||
nicaddr->af_family = my_ss.ss_family;
|
||||
memcpy(&nicaddr->addr, &my_ss, sizeof(struct sockaddr));
|
||||
opal_list_append(&mod->addresses, &nicaddr->super);
|
||||
opal_output_verbose(10, orte_oob_base_framework.framework_output,
|
||||
"%s oob:tcp:init adding %s address to interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(AF_INET == my_ss.ss_family) ? "V4" : "V6", name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Refs ticket #3019
|
||||
* it would probably be worthwhile to print out a warning if OMPI detects multiple
|
||||
* IP interfaces that are "up" on the same subnet (because that's a Bad Idea). Note
|
||||
@ -582,18 +516,6 @@ static bool component_available(void)
|
||||
* them so that applications won't hang.
|
||||
*/
|
||||
|
||||
if (add_this_nic) {
|
||||
opal_output_verbose(10, orte_oob_base_framework.framework_output,
|
||||
"%s oob:tcp:init creating module for %s address on interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(AF_INET == my_ss.ss_family) ? "V4" : "V6", name);
|
||||
/* we want to support this interface, so create a module for it */
|
||||
if (ORTE_SUCCESS != (rc = mca_oob_tcp_create(kindex, name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/* add this address to our connections */
|
||||
if (AF_INET == my_ss.ss_family) {
|
||||
opal_output_verbose(10, orte_oob_base_framework.framework_output,
|
||||
@ -624,7 +546,11 @@ static bool component_available(void)
|
||||
opal_argv_free(interfaces);
|
||||
}
|
||||
|
||||
if (0 == mca_oob_tcp_component.num_modules) {
|
||||
if (0 == opal_argv_count(mca_oob_tcp_component.ipv4conns)
|
||||
#if OPAL_ENABLE_IPV6
|
||||
&& 0 == opal_argv_count(mca_oob_tcp_component.ipv6conns)
|
||||
#endif
|
||||
) {
|
||||
if (including) {
|
||||
orte_show_help("help-oob-tcp.txt", "no-included-found", true, mca_oob_tcp_component.if_include);
|
||||
} else if (excluding) {
|
||||
@ -632,27 +558,26 @@ static bool component_available(void)
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/* set the module event base - this is where we would spin off a separate
|
||||
* progress thread if so desired */
|
||||
mca_oob_tcp_module.ev_base = orte_event_base;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Start all modules */
|
||||
static int component_startup(void)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod;
|
||||
int i, rc;
|
||||
int rc;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP STARTUP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* start the modules */
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
continue;
|
||||
}
|
||||
if (NULL != mod->api.init) {
|
||||
mod->api.init((struct mca_oob_tcp_module_t*)mod);
|
||||
}
|
||||
/* start the module */
|
||||
if (NULL != mca_oob_tcp_module.api.init) {
|
||||
mca_oob_tcp_module.api.init();
|
||||
}
|
||||
|
||||
/* start the listening thread/event */
|
||||
@ -664,7 +589,6 @@ static int component_startup(void)
|
||||
|
||||
static void component_shutdown(void)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod;
|
||||
int i;
|
||||
opal_list_item_t *item;
|
||||
|
||||
@ -683,87 +607,31 @@ static void component_shutdown(void)
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
|
||||
/* shutdown the modules */
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
continue;
|
||||
}
|
||||
if (NULL != mod->api.finalize) {
|
||||
mod->api.finalize((struct mca_oob_tcp_module_t*)mod);
|
||||
}
|
||||
/* shutdown the module */
|
||||
if (NULL != mca_oob_tcp_module.api.finalize) {
|
||||
mca_oob_tcp_module.api.finalize();
|
||||
}
|
||||
}
|
||||
|
||||
static int component_send(orte_rml_send_t *msg)
|
||||
{
|
||||
int i;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
uint64_t ui64;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output,
|
||||
"%s oob:tcp:send_nb to peer %s:%d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&msg->dst), msg->tag);
|
||||
|
||||
/* do we know some way of potentially reaching this peer? */
|
||||
memcpy(&ui64, (char*)&msg->dst, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr)) {
|
||||
/* nope - let someone else try */
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
/* if we knew the peer but have found all routes unreachable, then
|
||||
* we can't send it
|
||||
/* the module is potentially running on its own event
|
||||
* base, so all it can do is push our send request
|
||||
* onto an event - it cannot tell us if it will
|
||||
* succeed. The module will first see if it knows
|
||||
* of a way to send the data to the target, and then
|
||||
* attempt to send the data. It will call the cbfunc
|
||||
* with the status upon completion - if it can't do it for
|
||||
* some reason, it will call the component error
|
||||
* function so we can do something about it
|
||||
*/
|
||||
if (NULL == pr || opal_bitmap_is_clear(&pr->reachable)) {
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
|
||||
/* if a module is assigned, then use it */
|
||||
if (NULL != pr->mod) {
|
||||
/* the module is potentially running on its own event
|
||||
* base, so all it can do is push our send request
|
||||
* onto an event - it cannot tell us if it will
|
||||
* succeed. The module will attempt to send the data and
|
||||
* will call the cbfunc with the status
|
||||
* upon completion - if it can't do it for
|
||||
* some reason, it will call the component error
|
||||
* function so we can try with another module
|
||||
*/
|
||||
pr->mod->api.send_nb((struct mca_oob_tcp_module_t*)pr->mod, msg);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if a module isn't assigned, give it to the highest priority reachable
|
||||
* module as a place to start. The module will attempt to send the data and
|
||||
* will call the cbfunc with the status upon completion - if it can't do it for
|
||||
* some reason, it will call the component error function so we can try with another module
|
||||
*/
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
continue;
|
||||
}
|
||||
/* check to see if we have a contact address for this peer or
|
||||
* some route to it
|
||||
*/
|
||||
if (!opal_bitmap_is_set_bit(&pr->reachable, mod->if_kidx)) {
|
||||
continue;
|
||||
}
|
||||
/* mark that this module has been assigned */
|
||||
pr->mod = mod;
|
||||
/* pass the message along to be sent */
|
||||
mod->api.send_nb((struct mca_oob_tcp_module_t*)mod, msg);
|
||||
/* upon successful completion, we will mark the module as the "best"
|
||||
* one for future messages
|
||||
*/
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if for some reason all our modules are down,
|
||||
* then let the base stub keep searching
|
||||
*/
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
mca_oob_tcp_module.api.send_nb(msg);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static char* component_get_addr(void)
|
||||
@ -818,12 +686,10 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
{
|
||||
char **addrs, *hptr;
|
||||
char *tcpuri=NULL, *host, *ports;
|
||||
int i, j, k, rc;
|
||||
mca_oob_tcp_module_t *mod, *firstmod;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
int i, j;
|
||||
uint16_t af_family = AF_UNSPEC;
|
||||
uint64_t ui64;
|
||||
bool found, assigned;
|
||||
bool found;
|
||||
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
/* cycle across component parts and see if one belongs to us */
|
||||
@ -890,7 +756,6 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
|
||||
|
||||
/* cycle across the provided addrs */
|
||||
assigned = false;
|
||||
for (j=0; NULL != addrs[j]; j++) {
|
||||
/* if they gave us "localhost", then just take the first conn on our list */
|
||||
if (0 == strcasecmp(addrs[j], "localhost")) {
|
||||
@ -914,37 +779,7 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
} else {
|
||||
host = addrs[j];
|
||||
}
|
||||
/* lookup the kernel index of this address */
|
||||
if (0 >= (k = opal_ifaddrtokindex(host))) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s UNFOUND KERNEL INDEX %d FOR ADDRESS %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, host);
|
||||
/* we don't have an interface on this subnet - ignore it */
|
||||
continue;
|
||||
}
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s NO MODULE AT KINDEX %d FOR ADDRESS %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, host);
|
||||
continue;
|
||||
}
|
||||
/* record that this peer may be reachable via this module, but don't assign
|
||||
* the peer to this module until later when we actually connect
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s PEER %s MAY BE REACHABLE USING MODULE AT KINDEX %d INTERFACE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), k, mod->if_name);
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
pr = OBJ_NEW(mca_oob_tcp_component_peer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void*)pr))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
opal_bitmap_set_bit(&pr->reachable, k);
|
||||
|
||||
/* pass this proc, and its ports, to the
|
||||
* module for handling - this module will be responsible
|
||||
* for communicating with the proc via this network.
|
||||
@ -953,101 +788,10 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
* call into their own event base for processing.
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s PASSING ADDR %s TO INTERFACE %s AT KERNEL INDEX %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host,
|
||||
mod->if_name, k);
|
||||
mod->api.set_peer((struct mca_oob_tcp_module_t*)mod,
|
||||
peer, af_family, host, ports);
|
||||
"%s PASSING ADDR %s TO MODULE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host);
|
||||
mca_oob_tcp_module.api.set_peer(peer, af_family, host, ports);
|
||||
found = true;
|
||||
assigned = true;
|
||||
}
|
||||
/* if we cycled thru all their addresses without finding a match in our
|
||||
* interfaces, then it remains possible that they have a routed system
|
||||
* that can still route messages to the destination. So give it a chance
|
||||
* to succeed by assigning each provided address to the first module in our list,
|
||||
* and let the normal failure progression ultimately determine if we
|
||||
* can reach this peer. A future enhancement could be to do a better
|
||||
* job of matching provided addresses with available interfaces, perhaps
|
||||
* looking at the number of matching octets and assigning the address
|
||||
* to the interface with the most matches - but that's for someone else
|
||||
* to address :-)
|
||||
*/
|
||||
if (!assigned) {
|
||||
firstmod = NULL;
|
||||
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
continue;
|
||||
}
|
||||
if (NULL == firstmod) {
|
||||
firstmod = mod;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == firstmod) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
for (j=0; NULL != addrs[j]; j++) {
|
||||
/* if they gave us "localhost", then just take the first conn on our list */
|
||||
if (0 == strcasecmp(addrs[j], "localhost")) {
|
||||
#if OPAL_ENABLE_IPV6
|
||||
if (AF_INET6 == af_family) {
|
||||
if (NULL == mca_oob_tcp_component.ipv6conns ||
|
||||
NULL == mca_oob_tcp_component.ipv6conns[0]) {
|
||||
continue;
|
||||
}
|
||||
host = mca_oob_tcp_component.ipv6conns[0];
|
||||
} else {
|
||||
#endif
|
||||
if (NULL == mca_oob_tcp_component.ipv4conns ||
|
||||
NULL == mca_oob_tcp_component.ipv4conns[0]) {
|
||||
continue;
|
||||
}
|
||||
host = mca_oob_tcp_component.ipv4conns[0];
|
||||
#if OPAL_ENABLE_IPV6
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
host = addrs[j];
|
||||
}
|
||||
/* record that this peer may be reachable via this module, but don't assign
|
||||
* the peer to this module until later when we actually connect
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s PEER %s MAY BE REACHABLE BY ROUTING - ASSIGNING MODULE AT KINDEX %d INTERFACE %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(peer), k, firstmod->if_name);
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
pr = OBJ_NEW(mca_oob_tcp_component_peer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void*)pr))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
opal_bitmap_set_bit(&pr->reachable, k);
|
||||
/* pass this proc, and its ports, to the
|
||||
* module for handling - this module will be responsible
|
||||
* for communicating with the proc via this network.
|
||||
* Note that the modules are *not* necessarily running
|
||||
* on our event base - thus, the modules will push this
|
||||
* call into their own event base for processing.
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s PASSING ADDR %s TO INTERFACE %s AT KERNEL INDEX %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host,
|
||||
firstmod->if_name, k);
|
||||
mod->api.set_peer((struct mca_oob_tcp_module_t*)firstmod,
|
||||
peer, af_family, host, ports);
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
if (NULL != addrs) {
|
||||
opal_argv_free(addrs);
|
||||
}
|
||||
if (NULL != tcpuri) {
|
||||
free(tcpuri);
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
@ -1062,9 +806,6 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
static bool component_is_reachable(orte_process_name_t *peer)
|
||||
{
|
||||
orte_process_name_t hop;
|
||||
uint64_t ui64;
|
||||
mca_oob_tcp_component_peer_t *pr, *pnew;
|
||||
int rc;
|
||||
|
||||
/* if we have a route to this peer, then we can reach it */
|
||||
hop = orte_routed.get_route(peer);
|
||||
@ -1075,105 +816,33 @@ static bool component_is_reachable(orte_process_name_t *peer)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
return false;
|
||||
}
|
||||
/* we have a route, but which (if any) module can reach the hop? */
|
||||
memcpy(&ui64, (char*)&hop, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr)) {
|
||||
/* nope - we can't get there */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s is NOT reachable by TCP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
return false;
|
||||
}
|
||||
/* if we know the hop but have found all routes unreachable, then
|
||||
* we can't send it
|
||||
*/
|
||||
if (NULL == pr || opal_bitmap_is_clear(&pr->reachable)) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s is NOT reachable by TCP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
return false;
|
||||
}
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s is reachable by TCP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* mark it so we can find this peer when we try to send */
|
||||
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pnew) || NULL == pnew) {
|
||||
pnew = OBJ_NEW(mca_oob_tcp_component_peer_t);
|
||||
opal_bitmap_copy(&pnew->reachable, &pr->reachable);
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void*)pnew))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
/* assume we can reach the hop - the module will tell us if it can't
|
||||
* when we try to send the first time, and then we'll correct it */
|
||||
return true;
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
static int component_ft_event(int state)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod;
|
||||
int i;
|
||||
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"%s TCP FT EVENT", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
continue;
|
||||
}
|
||||
if (NULL != mod->api.ft_event) {
|
||||
mod->api.ft_event((struct mca_oob_tcp_module_t*)mod, state);
|
||||
}
|
||||
/* pass it into the module */
|
||||
if (NULL != mca_oob_tcp_module.api.ft_event) {
|
||||
mca_oob_tcp_module.api.ft_event(state);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Create a module instance and add to modules array.
|
||||
*/
|
||||
static int mca_oob_tcp_create(int kindex, const char *if_name)
|
||||
{
|
||||
mca_oob_tcp_module_t *mod;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output,
|
||||
"%s creating OOB-TCP module for interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), if_name));
|
||||
mod = (mca_oob_tcp_module_t*)malloc(sizeof(mca_oob_tcp_module_t));
|
||||
if (NULL == mod) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
mod->if_name = strdup(if_name);
|
||||
/* copy the APIs across */
|
||||
memcpy(mod, &mca_oob_tcp_module.api, sizeof(mca_oob_tcp_module_api_t));
|
||||
/* point to the interface it will service */
|
||||
mod->if_kidx = kindex;
|
||||
/* setup the list of addresses */
|
||||
OBJ_CONSTRUCT(&mod->addresses, opal_list_t);
|
||||
|
||||
/* setup the default event base */
|
||||
mod->ev_base = orte_event_base;
|
||||
|
||||
/* add it to our array */
|
||||
opal_pointer_array_set_item(&mca_oob_tcp_component.modules, kindex, mod);
|
||||
mca_oob_tcp_component.num_modules++;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
|
||||
uint64_t ui64;
|
||||
int rc;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
orte_oob_base_peer_t *bpr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
@ -1181,23 +850,11 @@ void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
|
||||
/* retrieve the peer's name */
|
||||
memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t));
|
||||
|
||||
/* mark that this peer is being handled by the specified module */
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
/* must have come from an inbound connection */
|
||||
pr = OBJ_NEW(mca_oob_tcp_component_peer_t);
|
||||
opal_bitmap_set_bit(&pr->reachable, pop->mod->if_kidx);
|
||||
opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, pr);
|
||||
}
|
||||
pr->mod = pop->mod;
|
||||
|
||||
/* make sure the OOB knows that we are handling this peer - we
|
||||
/* make sure the OOB knows that we can reach this peer - we
|
||||
* are in the same event base as the OOB base, so we can
|
||||
* directly access its storage
|
||||
*/
|
||||
memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
|
||||
ui64, (void**)&bpr) || NULL == bpr) {
|
||||
bpr = OBJ_NEW(orte_oob_base_peer_t);
|
||||
@ -1207,10 +864,8 @@ void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
|
||||
ui64, bpr))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(pop);
|
||||
}
|
||||
|
||||
@ -1218,74 +873,27 @@ void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
|
||||
uint64_t ui64;
|
||||
int rc, k;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
orte_oob_base_peer_t *bpr;
|
||||
int rc;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:lost connection called for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
|
||||
/* retrieve the peer's name */
|
||||
memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t));
|
||||
|
||||
/* mark that this peer is no longer reachable from this module */
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
goto cleanup;
|
||||
}
|
||||
opal_bitmap_clear_bit(&pr->reachable, pop->mod->if_kidx);
|
||||
|
||||
/* if we are terminating, or recovery isn't enabled, then don't attempt to reconnect */
|
||||
if (!orte_enable_recovery || orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if at least one module can still reach this peer, then we *might* be okay */
|
||||
if (!opal_bitmap_is_clear(&pr->reachable)) {
|
||||
/* any pending messages were re-queued when the module closed the connection */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:lost connection still can reach peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
|
||||
/* we cannot look into the module itself to see if messages
|
||||
* are pending that would cause a connection to the next address
|
||||
* to occur as the module could be operating in a separate event
|
||||
* base. Instead, we trigger an event to ask it to start
|
||||
* the connection procedure by issuing a "ping" request
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:lost pinging peer %s on interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer), mod->if_name);
|
||||
mod->api.ping((struct mca_oob_tcp_module_t*)mod, &pop->peer);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(pop);
|
||||
return;
|
||||
}
|
||||
}
|
||||
/* Mark that we no longer support this peer */
|
||||
memcpy(&ui64, (char*)&pop->peer, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
|
||||
ui64, (void**)&bpr) || NULL == bpr) {
|
||||
bpr = OBJ_NEW(orte_oob_base_peer_t);
|
||||
}
|
||||
|
||||
/* if we get here, then we no longer have any way to reach this peer.
|
||||
* Mark that we no longer support this peer
|
||||
*/
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* do the same to the OOB's table - for now, we don't worry about shifting to
|
||||
* another component. Eventually, we will want to push this decision to
|
||||
* the OOB so it can try other components and eventually error out
|
||||
*/
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
|
||||
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
|
||||
ui64, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
@ -1304,48 +912,28 @@ void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
|
||||
void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
uint64_t ui64;
|
||||
int k;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
int rc;
|
||||
orte_oob_base_peer_t *bpr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:no route called for peer %s on interface %s",
|
||||
"%s tcp:no route called for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop),
|
||||
mop->mod->if_name);
|
||||
ORTE_NAME_PRINT(&mop->hop));
|
||||
|
||||
/* retrieve the hop's name */
|
||||
/* mark that we cannot reach this hop */
|
||||
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
|
||||
|
||||
/* get the peer object */
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
goto cleanup;
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
|
||||
ui64, (void**)&bpr) || NULL == bpr) {
|
||||
bpr = OBJ_NEW(orte_oob_base_peer_t);
|
||||
}
|
||||
|
||||
/* ensure we mark that this peer isn't reachable by this module */
|
||||
opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx);
|
||||
|
||||
/* do we have any other modules (i.e., NICs) we can try? */
|
||||
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
|
||||
/* let this module try */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:unknown hop attempting send to peer %s via interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop), mod->if_name);
|
||||
mod->api.send_nb((struct mca_oob_tcp_module_t*)mod, mop->rmsg);
|
||||
OBJ_RELEASE(mop);
|
||||
return;
|
||||
}
|
||||
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
|
||||
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
|
||||
ui64, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* if we get here, then we have no other modules - so we report
|
||||
* the error back to the OOB and let it try other components
|
||||
/* report the error back to the OOB and let it try other components
|
||||
* or declare a problem
|
||||
*/
|
||||
if (!orte_finalizing && !orte_abnormal_term_ordered) {
|
||||
@ -1353,32 +941,24 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
|
||||
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
|
||||
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
|
||||
} else {
|
||||
orte_show_help("help-oob-tcp.txt", "unable-to-commiunicate",
|
||||
true, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop));
|
||||
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(mop);
|
||||
}
|
||||
|
||||
void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
uint64_t ui64;
|
||||
int k;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
orte_rml_send_t *snd;
|
||||
orte_oob_base_peer_t *bpr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:unknown hop called for peer %s on interface %s",
|
||||
"%s tcp:unknown hop called for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop),
|
||||
mop->mod->if_name);
|
||||
ORTE_NAME_PRINT(&mop->hop));
|
||||
|
||||
if (orte_finalizing || orte_abnormal_term_ordered) {
|
||||
/* just ignore the problem */
|
||||
@ -1386,39 +966,8 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
|
||||
return;
|
||||
}
|
||||
|
||||
/* retrieve the hop's name */
|
||||
/* mark that this component cannot reach this hop */
|
||||
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
|
||||
|
||||
/* get the peer object */
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
/* cleanup */
|
||||
goto cleanup;
|
||||
|
||||
}
|
||||
|
||||
/* ensure we mark that this peer isn't reachable by this module */
|
||||
opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx);
|
||||
|
||||
/* do we have any other modules (i.e., NICs) we can try? */
|
||||
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
|
||||
/* let this module try */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:unknown hop attempting send to peer %s via interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->hop), mod->if_name);
|
||||
mod->api.resend((struct mca_oob_tcp_msg_error_t*)mop);
|
||||
OBJ_RELEASE(mop);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
/* mark that this component cannot reach this hop */
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
|
||||
ui64, (void**)&bpr) ||
|
||||
NULL == bpr) {
|
||||
@ -1439,13 +988,14 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
|
||||
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
|
||||
|
||||
/* mark that this component cannot reach this destination either */
|
||||
memcpy(&ui64, (char*)&(mop->snd->hdr.dst), sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
|
||||
ui64, (void**)&bpr) ||
|
||||
NULL == bpr) {
|
||||
opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of this process",
|
||||
opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of this process",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&mop->snd->hdr.dst));
|
||||
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
|
||||
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
|
||||
OBJ_RELEASE(mop);
|
||||
return;
|
||||
}
|
||||
@ -1474,68 +1024,24 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
|
||||
void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
uint64_t ui64;
|
||||
int k;
|
||||
mca_oob_tcp_component_peer_t *pr;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:failed_to_connect called for peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
|
||||
/* get the peer object */
|
||||
memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
|
||||
ui64, (void**)&pr) || NULL == pr) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
goto cleanup;
|
||||
}
|
||||
/* mark the peer as unreachable via this interface */
|
||||
opal_bitmap_clear_bit(&pr->reachable, pop->mod->if_kidx);
|
||||
|
||||
/* if we are terminating, then don't attempt to reconnect */
|
||||
/* if we are terminating, then don't attempt to reconnect */
|
||||
if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
|
||||
OBJ_RELEASE(pop);
|
||||
return;
|
||||
}
|
||||
|
||||
/* if at least one module can still reach this peer, then we *might* be okay */
|
||||
if (!opal_bitmap_is_clear(&pr->reachable)) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:attempting different module for connection to peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
|
||||
/* we cannot look into the module itself to see if messages
|
||||
* are pending that would cause a connection to the next address
|
||||
* to occur as the module could be operating in a separate event
|
||||
* base. Instead, we trigger an event to ask it to start
|
||||
* the connection procedure by issuing a "ping" request
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:lost pinging peer %s on interface %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer), mod->if_name);
|
||||
mod->api.ping((struct mca_oob_tcp_module_t*)mod, &pop->peer);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(pop);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* get here if nobody else can reach it - activate the proc state */
|
||||
/* activate the proc state */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:failed_to_connect unable to reach peer %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer));
|
||||
|
||||
cleanup:
|
||||
/* if this was a lifeline, then alert */
|
||||
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
|
||||
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
|
||||
@ -1727,21 +1233,6 @@ OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t,
|
||||
opal_object_t,
|
||||
NULL, NULL);
|
||||
|
||||
static void cmp_peer_cons(mca_oob_tcp_component_peer_t *ptr)
|
||||
{
|
||||
ptr->mod = NULL;
|
||||
OBJ_CONSTRUCT(&ptr->reachable, opal_bitmap_t);
|
||||
opal_bitmap_init(&ptr->reachable, 8); // default to 8 bits
|
||||
}
|
||||
static void cmp_peer_des(mca_oob_tcp_component_peer_t *ptr)
|
||||
{
|
||||
OBJ_DESTRUCT(&ptr->reachable);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(mca_oob_tcp_component_peer_t,
|
||||
opal_object_t,
|
||||
cmp_peer_cons, cmp_peer_des);
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_oob_tcp_ping_t,
|
||||
opal_object_t,
|
||||
NULL, NULL);
|
||||
|
@ -43,8 +43,6 @@ typedef struct {
|
||||
mca_oob_base_component_t super; /**< base OOB component */
|
||||
uint32_t addr_count; /**< total number of addresses */
|
||||
int num_links; /**< number of logical links per physical device */
|
||||
opal_pointer_array_t modules; /**< array of available modules */
|
||||
int num_modules;
|
||||
int max_retries; /**< max number of retries before declaring peer gone */
|
||||
opal_list_t events; /**< events for monitoring connections */
|
||||
int peer_limit; /**< max size of tcp peer cache */
|
||||
@ -72,33 +70,17 @@ typedef struct {
|
||||
#endif
|
||||
|
||||
/* connection support */
|
||||
char* my_uri; /**< uri for connecting to the TCP modules */
|
||||
char* my_uri; /**< uri for connecting to the TCP module */
|
||||
int num_hnp_ports; /**< number of ports the HNP should listen on */
|
||||
opal_list_t listeners; /**< List of sockets being monitored by event or thread */
|
||||
opal_thread_t listen_thread; /**< handle to the listening thread */
|
||||
bool listen_thread_active;
|
||||
struct timeval listen_thread_tv; /**< Timeout when using listen thread */
|
||||
int stop_thread[2]; /**< pipe used to exit the listen thread */
|
||||
|
||||
/* peers available via this transport - the index is the process name,
|
||||
* and the pointer returned is the pointer to the last module that
|
||||
* said it could reach that peer. When a module loses its connection
|
||||
* to the peer, this pointer either gets set to NULL (if nobody can
|
||||
* reach the peer) or to the next module that can. Must be accessed
|
||||
* only from the framework-level event base
|
||||
*/
|
||||
opal_hash_table_t peers;
|
||||
} mca_oob_tcp_component_t;
|
||||
|
||||
ORTE_MODULE_DECLSPEC extern mca_oob_tcp_component_t mca_oob_tcp_component;
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
mca_oob_tcp_module_t *mod; // current module handling this peer
|
||||
opal_bitmap_t reachable; // marks the modules that can reach at least one of the peer's addresses
|
||||
} mca_oob_tcp_component_peer_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_component_peer_t);
|
||||
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata);
|
||||
|
@ -73,19 +73,14 @@
|
||||
#include "orte/mca/oob/tcp/oob_tcp_common.h"
|
||||
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
|
||||
|
||||
static void tcp_peer_event_init(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer);
|
||||
static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer);
|
||||
static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, int sd,
|
||||
void* data, size_t size);
|
||||
static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer, int sd,
|
||||
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
|
||||
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
|
||||
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
|
||||
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
|
||||
void* data, size_t size);
|
||||
static void tcp_peer_connected(mca_oob_tcp_peer_t* peer);
|
||||
|
||||
static int tcp_peer_create_socket(mca_oob_tcp_module_t *md,
|
||||
mca_oob_tcp_peer_t* peer)
|
||||
static int tcp_peer_create_socket(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
int flags;
|
||||
|
||||
@ -113,7 +108,7 @@ static int tcp_peer_create_socket(mca_oob_tcp_module_t *md,
|
||||
orte_oob_tcp_set_socket_options(peer->sd);
|
||||
|
||||
/* setup event callbacks */
|
||||
tcp_peer_event_init(md, peer);
|
||||
tcp_peer_event_init(peer);
|
||||
|
||||
/* setup the socket as non-blocking */
|
||||
if (peer->sd >= 0) {
|
||||
@ -146,7 +141,6 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
|
||||
mca_oob_tcp_peer_t *peer = op->peer;
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)op->mod;
|
||||
int rc;
|
||||
opal_socklen_t addrlen = 0;
|
||||
mca_oob_tcp_addr_t *addr;
|
||||
@ -156,11 +150,11 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_tcp_peer_try_connect: "
|
||||
"attempting to connect to proc %s via interface %s",
|
||||
"attempting to connect to proc %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)), mod->if_name);
|
||||
ORTE_NAME_PRINT(&(peer->name)));
|
||||
|
||||
rc = tcp_peer_create_socket(op->mod, peer);
|
||||
rc = tcp_peer_create_socket(peer);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
/* FIXME: we cannot create a TCP socket - this spans
|
||||
* all interfaces, so all we can do is report
|
||||
@ -179,9 +173,9 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_tcp_peer_try_connect: "
|
||||
"attempting to connect to proc %s via interface %s on socket %d",
|
||||
"attempting to connect to proc %s on socket %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)), mod->if_name, peer->sd);
|
||||
ORTE_NAME_PRINT(&(peer->name)), peer->sd);
|
||||
|
||||
addrlen = sizeof(struct sockaddr_in);
|
||||
OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) {
|
||||
@ -262,18 +256,17 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
host = orte_get_proc_hostname(&(peer->name));
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_tcp_peer_try_connect: "
|
||||
"Connection across interface %s to proc %s on node %s failed",
|
||||
"Connection to proc %s on node %s failed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
op->mod->if_name,
|
||||
ORTE_NAME_PRINT(&peer->name),
|
||||
(NULL == host) ? "NULL" : host);
|
||||
/* let the TCP component know that this module failed to make
|
||||
* the connection so it can try other modules, and/or fail back
|
||||
* the connection so it can do some bookkeeping and fail back
|
||||
* to the OOB level so another component can try. This will activate
|
||||
* an event in the component event base, and so it will fire async
|
||||
* from us if we are in our own progress thread
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(mod, &peer->name, mca_oob_tcp_component_failed_to_connect);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, mca_oob_tcp_component_failed_to_connect);
|
||||
/* FIXME: post any messages in the send queue back to the OOB
|
||||
* level for reassignment
|
||||
*/
|
||||
@ -286,9 +279,8 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_tcp_peer_try_connect: "
|
||||
"Connection across interface %s to proc %s succeeded",
|
||||
"Connection to proc %s succeeded",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
op->mod->if_name,
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
|
||||
/* setup our recv to catch the return ack call */
|
||||
@ -298,7 +290,7 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
/* send our globally unique process identifier to the peer */
|
||||
if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(op->mod, peer))) {
|
||||
if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(peer))) {
|
||||
peer->state = MCA_OOB_TCP_CONNECT_ACK;
|
||||
} else {
|
||||
opal_output(0,
|
||||
@ -321,8 +313,7 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
|
||||
* version string, and a security token to ensure we are talking
|
||||
* to another OMPI process
|
||||
*/
|
||||
static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer)
|
||||
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
char *msg;
|
||||
mca_oob_tcp_hdr_t hdr;
|
||||
@ -363,11 +354,11 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred->credential, cred->size);
|
||||
|
||||
/* send it */
|
||||
if (ORTE_SUCCESS != tcp_peer_send_blocking(mod, peer->sd, msg, sdsize)) {
|
||||
if (ORTE_SUCCESS != tcp_peer_send_blocking(peer->sd, msg, sdsize)) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
|
||||
free(msg);
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
free(msg);
|
||||
@ -378,12 +369,10 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
/*
|
||||
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
|
||||
*/
|
||||
static void tcp_peer_event_init(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer)
|
||||
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
if (peer->sd >= 0) {
|
||||
peer->mod = mod;
|
||||
opal_event_set(mod->ev_base,
|
||||
opal_event_set(mca_oob_tcp_module.ev_base,
|
||||
&peer->recv_event,
|
||||
peer->sd,
|
||||
OPAL_EV_READ|OPAL_EV_PERSIST,
|
||||
@ -395,7 +384,7 @@ static void tcp_peer_event_init(mca_oob_tcp_module_t *mod,
|
||||
peer->recv_ev_active = false;
|
||||
}
|
||||
|
||||
opal_event_set(mod->ev_base,
|
||||
opal_event_set(mca_oob_tcp_module.ev_base,
|
||||
&peer->send_event,
|
||||
peer->sd,
|
||||
OPAL_EV_WRITE|OPAL_EV_PERSIST,
|
||||
@ -414,8 +403,7 @@ static void tcp_peer_event_init(mca_oob_tcp_module_t *mod,
|
||||
* later. Otherwise, send this process's identifier to the peer on the
|
||||
* newly connected socket.
|
||||
*/
|
||||
void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t *peer)
|
||||
void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t *peer)
|
||||
{
|
||||
int so_error = 0;
|
||||
opal_socklen_t so_length = sizeof(so_error);
|
||||
@ -433,7 +421,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
strerror(opal_socket_errno),
|
||||
opal_socket_errno);
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -449,7 +437,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
ORTE_NAME_PRINT(&(peer->name)),
|
||||
strerror(so_error),
|
||||
so_error);
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return;
|
||||
} else if (so_error != 0) {
|
||||
/* No need to worry about the return code here - we return regardless
|
||||
@ -460,7 +448,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
"connection failed with error %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)), so_error);
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -470,7 +458,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)));
|
||||
|
||||
if (tcp_peer_send_connect_ack(mod, peer) == ORTE_SUCCESS) {
|
||||
if (tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) {
|
||||
peer->state = MCA_OOB_TCP_CONNECT_ACK;
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp_peer_complete_connect: "
|
||||
@ -487,7 +475,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)));
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
}
|
||||
}
|
||||
|
||||
@ -495,8 +483,7 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
* A blocking send on a non-blocking socket. Used to send the small amount of connection
|
||||
* information that identifies the peer's endpoint.
|
||||
*/
|
||||
static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
|
||||
int sd, void* data, size_t size)
|
||||
static int tcp_peer_send_blocking(int sd, void* data, size_t size)
|
||||
{
|
||||
unsigned char* ptr = (unsigned char*)data;
|
||||
size_t cnt = 0;
|
||||
@ -534,8 +521,7 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
|
||||
* connected socket and verify the expected response. If so, move the
|
||||
* socket to a connected state.
|
||||
*/
|
||||
int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* pr,
|
||||
int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
|
||||
int sd, mca_oob_tcp_hdr_t *dhdr)
|
||||
{
|
||||
char *msg;
|
||||
@ -553,7 +539,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
|
||||
peer = pr;
|
||||
/* get the header */
|
||||
if (tcp_peer_recv_blocking(mod, peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) {
|
||||
if (tcp_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) {
|
||||
if (NULL != peer) {
|
||||
/* If the peer state is CONNECT_ACK, then we were waiting for
|
||||
* the connection to be ack'd
|
||||
@ -563,7 +549,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
opal_output(0, "%s RECV CONNECT BAD HANDSHAKE (%d) FROM %s ON SOCKET %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), peer->state,
|
||||
ORTE_NAME_PRINT(&(peer->name)), sd);
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
}
|
||||
@ -594,7 +580,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
hdr.dst = hdr.origin;
|
||||
hdr.origin = *ORTE_PROC_MY_NAME;
|
||||
MCA_OOB_TCP_HDR_HTON(&hdr);
|
||||
tcp_peer_send_blocking(mod, sd, &hdr, sizeof(mca_oob_tcp_hdr_t));
|
||||
tcp_peer_send_blocking(sd, &hdr, sizeof(mca_oob_tcp_hdr_t));
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -604,7 +590,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
hdr.type);
|
||||
if (NULL != peer) {
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
} else {
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
}
|
||||
@ -613,17 +599,16 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
|
||||
/* if we don't already have it, get the peer */
|
||||
if (NULL == peer) {
|
||||
peer = mca_oob_tcp_peer_lookup(mod, &hdr.origin);
|
||||
peer = mca_oob_tcp_peer_lookup(&hdr.origin);
|
||||
if (NULL == peer) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s mca_oob_tcp_recv_connect: connection from new peer",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
peer = OBJ_NEW(mca_oob_tcp_peer_t);
|
||||
peer->mod = mod;
|
||||
peer->name = hdr.origin;
|
||||
peer->state = MCA_OOB_TCP_ACCEPTING;
|
||||
ui64 = (uint64_t*)(&peer->name);
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) {
|
||||
OBJ_RELEASE(peer);
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return ORTE_ERR_UNREACH;
|
||||
@ -666,7 +651,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
} else {
|
||||
/* retry the connection */
|
||||
peer->state = MCA_OOB_TCP_CONNECTING;
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect);
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
}
|
||||
@ -681,7 +666,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
ORTE_NAME_PRINT(&(hdr.origin)),
|
||||
ORTE_NAME_PRINT(&(peer->name)));
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
}
|
||||
@ -694,10 +679,10 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
/* get the authentication and version payload */
|
||||
if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
if (!tcp_peer_recv_blocking(mod, peer, sd, msg, hdr.nbytes)) {
|
||||
if (!tcp_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) {
|
||||
/* unable to complete the recv */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
|
||||
@ -716,7 +701,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
ORTE_NAME_PRINT(&(peer->name)),
|
||||
version, orte_version_string);
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
free(msg);
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
@ -749,7 +734,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
/* set the peer into the component and OOB-level peer tables to indicate
|
||||
* that we know this peer and we will be handling him
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(mod, &peer->name, mca_oob_tcp_component_set_module);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, mca_oob_tcp_component_set_module);
|
||||
|
||||
/* connected */
|
||||
tcp_peer_connected(peer);
|
||||
@ -798,8 +783,7 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
|
||||
* and update the peer state to reflect the connection has
|
||||
* been closed.
|
||||
*/
|
||||
void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t *peer)
|
||||
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
|
||||
{
|
||||
mca_oob_tcp_send_t *snd;
|
||||
|
||||
@ -820,7 +804,7 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
|
||||
/* inform the component-level that we have lost a connection so
|
||||
* it can decide what to do about it.
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(mod, &peer->name, mca_oob_tcp_component_lost_connection);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, mca_oob_tcp_component_lost_connection);
|
||||
|
||||
if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
|
||||
/* nothing more to do */
|
||||
@ -843,8 +827,7 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
|
||||
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
|
||||
* information that identifies the peer's endpoint.
|
||||
*/
|
||||
static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer, int sd,
|
||||
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
|
||||
void* data, size_t size)
|
||||
{
|
||||
unsigned char* ptr = (unsigned char*)data;
|
||||
@ -867,7 +850,7 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
|
||||
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
|
||||
(NULL == peer) ? 0 : peer->state);
|
||||
if (NULL != peer) {
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
} else {
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
}
|
||||
@ -910,7 +893,7 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
|
||||
opal_socket_errno);
|
||||
if (NULL != peer) {
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
} else {
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
}
|
||||
@ -995,7 +978,7 @@ void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
|
||||
* Accept incoming connection - if not already connected
|
||||
*/
|
||||
|
||||
bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer)
|
||||
bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer)
|
||||
{
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s tcp:peer_accept called for peer %s in state %s on socket %d",
|
||||
@ -1005,22 +988,22 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer
|
||||
|
||||
if (peer->state != MCA_OOB_TCP_CONNECTED) {
|
||||
|
||||
tcp_peer_event_init(mod, peer);
|
||||
tcp_peer_event_init(peer);
|
||||
|
||||
if (tcp_peer_send_connect_ack(mod, peer) != ORTE_SUCCESS) {
|
||||
if (tcp_peer_send_connect_ack(peer) != ORTE_SUCCESS) {
|
||||
opal_output(0, "%s-%s tcp_peer_accept: "
|
||||
"tcp_peer_send_connect_ack failed\n",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(peer->name)));
|
||||
peer->state = MCA_OOB_TCP_FAILED;
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* set the peer into the component and OOB-level peer tables to indicate
|
||||
* that we know this peer and we will be handling him
|
||||
*/
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(mod, &peer->name, mca_oob_tcp_component_set_module);
|
||||
ORTE_ACTIVATE_TCP_CMP_OP(&peer->name, mca_oob_tcp_component_set_module);
|
||||
|
||||
tcp_peer_connected(peer);
|
||||
if (!peer->recv_ev_active) {
|
||||
|
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -39,7 +40,6 @@ typedef struct {
|
||||
opal_object_t super;
|
||||
mca_oob_tcp_peer_t *peer;
|
||||
opal_event_t ev;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
} mca_oob_tcp_conn_op_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
|
||||
@ -49,7 +49,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
close(socket); \
|
||||
} while(0)
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_CONN_STATE(m, p, cbfunc) \
|
||||
#define ORTE_ACTIVATE_TCP_CONN_STATE(p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
@ -58,26 +58,24 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->mod = (m); \
|
||||
cop->peer = (p); \
|
||||
opal_event_set((m)->ev_base, &cop->ev, -1, \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &cop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), cop); \
|
||||
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&cop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_ACCEPT_STATE(m, s, a, cbfunc) \
|
||||
#define ORTE_ACTIVATE_TCP_ACCEPT_STATE(s, a, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->mod = (m); \
|
||||
opal_event_set((m)->ev_base, &cop->ev, s, \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &cop->ev, s, \
|
||||
OPAL_EV_READ, (cbfunc), cop); \
|
||||
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_add(&cop->ev, 0); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_RETRY_TCP_CONN_STATE(m, p, cbfunc, tv) \
|
||||
#define ORTE_RETRY_TCP_CONN_STATE(p, cbfunc, tv) \
|
||||
do { \
|
||||
mca_oob_tcp_conn_op_t *cop; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
@ -86,9 +84,8 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT((&(p)->name))); \
|
||||
cop = OBJ_NEW(mca_oob_tcp_conn_op_t); \
|
||||
cop->mod = (m); \
|
||||
cop->peer = (p); \
|
||||
opal_event_evtimer_set((m)->ev_base, \
|
||||
opal_event_evtimer_set(mca_oob_tcp_module.ev_base, \
|
||||
&cop->ev, \
|
||||
(cbfunc), cop); \
|
||||
opal_event_evtimer_add(&cop->ev, (tv)); \
|
||||
@ -96,13 +93,10 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_conn_op_t);
|
||||
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
|
||||
ORTE_MODULE_DECLSPEC bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer);
|
||||
ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t* peer,
|
||||
ORTE_MODULE_DECLSPEC bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer);
|
||||
ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer,
|
||||
int sd, mca_oob_tcp_hdr_t *dhdr);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
|
||||
mca_oob_tcp_peer_t *peer);
|
||||
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer);
|
||||
|
||||
#endif /* _MCA_OOB_TCP_CONNECTION_H_ */
|
||||
|
@ -790,11 +790,6 @@ static void* listen_thread(opal_object_t *obj)
|
||||
static void connection_handler(int sd, short flags, void* cbdata)
|
||||
{
|
||||
mca_oob_tcp_pending_connection_t *new_connection;
|
||||
int i;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
bool found;
|
||||
struct sockaddr modaddr;
|
||||
uint32_t netmask;
|
||||
|
||||
new_connection = (mca_oob_tcp_pending_connection_t*)cbdata;
|
||||
|
||||
@ -806,58 +801,9 @@ static void connection_handler(int sd, short flags, void* cbdata)
|
||||
opal_net_get_hostname((struct sockaddr*) &new_connection->addr),
|
||||
opal_net_get_port((struct sockaddr*) &new_connection->addr));
|
||||
|
||||
/* cycle across all interfaces until we find the one that
|
||||
* "owns" this connection - i.e., it is handling the
|
||||
* incoming address space
|
||||
*/
|
||||
found = false;
|
||||
for(i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)){
|
||||
/* see if the incoming address is within the address space
|
||||
* of this interface
|
||||
*/
|
||||
if (OPAL_SUCCESS != opal_ifindextoaddr(i, &modaddr, sizeof(modaddr))) {
|
||||
continue;
|
||||
}
|
||||
if (OPAL_SUCCESS != opal_ifindextomask(i, &netmask, sizeof(netmask))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_net_samenetwork((struct sockaddr*)&new_connection->addr, &modaddr, netmask)) {
|
||||
/* lookup the corresponding kernel index of this interface */
|
||||
i = opal_ifindextokindex(i);
|
||||
/* get the module */
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
found = false;
|
||||
break;
|
||||
}
|
||||
/* process the connection */
|
||||
mod->api.accept_connection((struct mca_oob_tcp_module_t*)mod, new_connection->fd,
|
||||
(struct sockaddr*) &(new_connection->addr));
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
/* let's just assign it to the first module we have as we
|
||||
* can't do any better - this is just an assignment to a
|
||||
* specific software module to handle the connection
|
||||
*/
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL != (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
/* process the connection */
|
||||
mod->api.accept_connection((struct mca_oob_tcp_module_t*)mod, new_connection->fd,
|
||||
(struct sockaddr*) &(new_connection->addr));
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
close(new_connection->fd); // close this so the remote end hangs up
|
||||
opal_output(0, "%s CONNECTION REQUEST ON UNKNOWN INTERFACE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
/* process the connection */
|
||||
mca_oob_tcp_module.api.accept_connection(new_connection->fd,
|
||||
(struct sockaddr*) &(new_connection->addr));
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(new_connection);
|
||||
}
|
||||
@ -870,11 +816,6 @@ static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
|
||||
struct sockaddr addr;
|
||||
opal_socklen_t addrlen = sizeof(struct sockaddr);
|
||||
int sd;
|
||||
int i;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
bool found;
|
||||
struct sockaddr modaddr;
|
||||
uint32_t netmask;
|
||||
|
||||
sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
@ -907,55 +848,8 @@ static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
|
||||
return;
|
||||
}
|
||||
|
||||
/* cycle across all interfaces until we find the one that
|
||||
* "owns" this connection - i.e., it is handling the
|
||||
* incoming address space
|
||||
*/
|
||||
found = false;
|
||||
for(i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)){
|
||||
/* see if the incoming address is within the address space
|
||||
* of this interface
|
||||
*/
|
||||
if (OPAL_SUCCESS != opal_ifindextoaddr(i, &modaddr, sizeof(modaddr))) {
|
||||
continue;
|
||||
}
|
||||
if (OPAL_SUCCESS != opal_ifindextomask(i, &netmask, sizeof(netmask))) {
|
||||
continue;
|
||||
}
|
||||
if (opal_net_samenetwork((struct sockaddr*)&addr, &modaddr, netmask)) {
|
||||
/* lookup the corresponding kernel index of this interface */
|
||||
i = opal_ifindextokindex(i);
|
||||
/* get the module */
|
||||
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
found = false;
|
||||
break;
|
||||
}
|
||||
/* process the connection */
|
||||
mod->api.accept_connection((struct mca_oob_tcp_module_t*)mod, sd, &addr);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
/* let's just assign it to the first module we have as we
|
||||
* can't do any better - this is just an assignment to a
|
||||
* specific software module to handle the connection
|
||||
*/
|
||||
for (i=0; i < mca_oob_tcp_component.modules.size; i++) {
|
||||
if (NULL != (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
|
||||
/* process the connection */
|
||||
mod->api.accept_connection((struct mca_oob_tcp_module_t*)mod, sd, &addr);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
close(sd); // close this so the remote end hangs up
|
||||
opal_output(0, "%s CONNECTION REQUEST ON UNKNOWN INTERFACE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
}
|
||||
}
|
||||
/* process the connection */
|
||||
mca_oob_tcp_module.api.accept_connection(sd, &addr);
|
||||
}
|
||||
|
||||
|
||||
|
@ -35,14 +35,9 @@ typedef struct {
|
||||
} mca_oob_tcp_addr_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_addr_t);
|
||||
|
||||
/* object for tracking peers in modules */
|
||||
/* object for tracking peers in the module */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
/* we frequently need access to
|
||||
* the module (e.g., its event
|
||||
* base) to activate subsequent peer states
|
||||
*/
|
||||
mca_oob_tcp_module_t *mod;
|
||||
/* although not required, there is enough debug
|
||||
* value that retaining the name makes sense
|
||||
*/
|
||||
@ -67,7 +62,6 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_t);
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
orte_process_name_t peer;
|
||||
uint16_t af_family;
|
||||
char *net;
|
||||
@ -75,37 +69,35 @@ typedef struct {
|
||||
} mca_oob_tcp_peer_op_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_op_t);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_PEER_OP(m, p, a, n, pts, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_peer_op_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_peer_op_t); \
|
||||
pop->mod = (m); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
pop->af_family = (a); \
|
||||
if (NULL != (n)) { \
|
||||
pop->net = strdup((n)); \
|
||||
} \
|
||||
if (NULL != (pts)) { \
|
||||
pop->port = strdup((pts)); \
|
||||
} \
|
||||
opal_event_set((m)->ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
#define ORTE_ACTIVATE_TCP_PEER_OP(p, a, n, pts, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_peer_op_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_peer_op_t); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
pop->af_family = (a); \
|
||||
if (NULL != (n)) { \
|
||||
pop->net = strdup((n)); \
|
||||
} \
|
||||
if (NULL != (pts)) { \
|
||||
pop->port = strdup((pts)); \
|
||||
} \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_CMP_OP(m, p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_peer_op_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_peer_op_t); \
|
||||
pop->mod = (m); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
opal_event_set(orte_event_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
#define ORTE_ACTIVATE_TCP_CMP_OP(p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_peer_op_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_peer_op_t); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#endif /* _MCA_OOB_TCP_PEER_H_ */
|
||||
|
@ -12,6 +12,7 @@
|
||||
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2014 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -32,22 +33,20 @@
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
orte_process_name_t peer;
|
||||
} mca_oob_tcp_ping_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_ping_t);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_PING(m, p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_ping_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_ping_t); \
|
||||
pop->mod = (m); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
opal_event_set((m)->ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
#define ORTE_ACTIVATE_TCP_PING(p, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_ping_t *pop; \
|
||||
pop = OBJ_NEW(mca_oob_tcp_ping_t); \
|
||||
pop->peer.jobid = (p)->jobid; \
|
||||
pop->peer.vpid = (p)->vpid; \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &pop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), pop); \
|
||||
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#endif /* _MCA_OOB_TCP_PING_H_ */
|
||||
|
@ -123,7 +123,6 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
|
||||
mca_oob_tcp_send_t* msg = peer->send_msg;
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)peer->mod;
|
||||
int rc;
|
||||
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
@ -138,7 +137,7 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
|
||||
"%s tcp:send_handler %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
mca_oob_tcp_state_print(peer->state));
|
||||
mca_oob_tcp_peer_complete_connect(mod, peer);
|
||||
mca_oob_tcp_peer_complete_connect(peer);
|
||||
/* de-activate the send event until the connection
|
||||
* handshake completes
|
||||
*/
|
||||
@ -374,7 +373,7 @@ static int read_bytes(mca_oob_tcp_peer_t* peer)
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
peer->recv_msg = NULL;
|
||||
}
|
||||
mca_oob_tcp_peer_close((mca_oob_tcp_module_t*)peer->mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
//if (NULL != mca_oob_tcp.oob_exception_callback) {
|
||||
// mca_oob_tcp.oob_exception_callback(&peer->peer_name, ORTE_RML_PEER_DISCONNECTED);
|
||||
//}
|
||||
@ -397,7 +396,6 @@ static int read_bytes(mca_oob_tcp_peer_t* peer)
|
||||
void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
|
||||
mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)peer->mod;
|
||||
int rc;
|
||||
orte_process_name_t hop;
|
||||
mca_oob_tcp_peer_t *relay;
|
||||
@ -414,7 +412,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
|
||||
switch (peer->state) {
|
||||
case MCA_OOB_TCP_CONNECT_ACK:
|
||||
if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer, peer->sd, NULL))) {
|
||||
if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(peer, peer->sd, NULL))) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp:recv:handler starting send/recv events",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
@ -504,7 +502,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s:tcp:recv:handler error reading bytes - closing connection",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
mca_oob_tcp_peer_close(mod, peer);
|
||||
mca_oob_tcp_peer_close(peer);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -549,22 +547,21 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
/* let the component know about the problem */
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(mod, NULL, peer->recv_msg, &hop, mca_oob_tcp_component_no_route);
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_no_route);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
return;
|
||||
} else {
|
||||
/* does this module know how to reach the next hop? */
|
||||
/* do we know how to reach the next hop? */
|
||||
memcpy(&ui64, (char*)&hop, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mod->peers, ui64, (void**)&relay)) {
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_module.peers, ui64, (void**)&relay)) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s ADDRESS OF NEXT HOP %s TO %s IS UNKNOWN VIA MODULE %s",
|
||||
"%s ADDRESS OF NEXT HOP %s TO %s IS UNKNOWN",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&hop),
|
||||
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
|
||||
peer->mod->if_name);
|
||||
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
|
||||
/* let the component know about the problem */
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(mod, NULL, peer->recv_msg, &hop, mca_oob_tcp_component_hop_unknown);
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_hop_unknown);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
return;
|
||||
|
@ -85,8 +85,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
/* if we aren't connected, then start connecting */ \
|
||||
if (MCA_OOB_TCP_CONNECTED != (p)->state) { \
|
||||
(p)->state = MCA_OOB_TCP_CONNECTING; \
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE((p)->mod, (p), \
|
||||
mca_oob_tcp_peer_try_connect); \
|
||||
ORTE_ACTIVATE_TCP_CONN_STATE((p), mca_oob_tcp_peer_try_connect); \
|
||||
} else { \
|
||||
/* ensure the send event is active */ \
|
||||
if (!(p)->send_ev_active) { \
|
||||
@ -220,12 +219,11 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
orte_rml_send_t *msg;
|
||||
} mca_oob_tcp_msg_op_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_POST_SEND(m, ms, cbfunc) \
|
||||
#define ORTE_ACTIVATE_TCP_POST_SEND(ms, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_msg_op_t *mop; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
@ -234,9 +232,8 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT(&((ms)->dst))); \
|
||||
mop = OBJ_NEW(mca_oob_tcp_msg_op_t); \
|
||||
mop->mod = (m); \
|
||||
mop->msg = (ms); \
|
||||
opal_event_set((m)->ev_base, &mop->ev, -1, \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \
|
||||
@ -245,14 +242,13 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_op_t);
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
mca_oob_tcp_module_t *mod;
|
||||
orte_rml_send_t *rmsg;
|
||||
mca_oob_tcp_send_t *snd;
|
||||
orte_process_name_t hop;
|
||||
} mca_oob_tcp_msg_error_t;
|
||||
OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_MSG_ERROR(m, s, r, h, cbfunc) \
|
||||
#define ORTE_ACTIVATE_TCP_MSG_ERROR(s, r, h, cbfunc) \
|
||||
do { \
|
||||
mca_oob_tcp_msg_error_t *mop; \
|
||||
mca_oob_tcp_send_t *snd; \
|
||||
@ -263,7 +259,6 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT((h))); \
|
||||
mop = OBJ_NEW(mca_oob_tcp_msg_error_t); \
|
||||
mop->mod = (m); \
|
||||
if (NULL != (s)) { \
|
||||
mop->snd = (s); \
|
||||
} else if (NULL != (r)) { \
|
||||
@ -285,6 +280,7 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
} \
|
||||
mop->hop.jobid = (h)->jobid; \
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the OOB framework, so use that event base */ \
|
||||
opal_event_set(orte_event_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
@ -300,16 +296,15 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT(&((mop)->hop))); \
|
||||
mp = OBJ_NEW(mca_oob_tcp_msg_error_t); \
|
||||
mp->mod = (mop)->mod; \
|
||||
mp->snd = (mop)->snd; \
|
||||
mp->hop = (mop)->hop; \
|
||||
opal_event_set(mp->mod->ev_base, &mp->ev, -1, \
|
||||
opal_event_set(mca_oob_tcp_module.ev_base, &mp->ev, -1, \
|
||||
OPAL_EV_WRITE, (cbfunc), mp); \
|
||||
opal_event_set_priority(&mp->ev, ORTE_MSG_PRI); \
|
||||
opal_event_active(&mp->ev, OPAL_EV_WRITE, 1); \
|
||||
} while(0);
|
||||
|
||||
#define ORTE_ACTIVATE_TCP_NO_ROUTE(m, r, h, c) \
|
||||
#define ORTE_ACTIVATE_TCP_NO_ROUTE(r, h, c) \
|
||||
do { \
|
||||
mca_oob_tcp_msg_error_t *mop; \
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
|
||||
@ -318,10 +313,10 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_error_t);
|
||||
__FILE__, __LINE__, \
|
||||
ORTE_NAME_PRINT((h))); \
|
||||
mop = OBJ_NEW(mca_oob_tcp_msg_error_t); \
|
||||
mop->mod = (m); \
|
||||
mop->rmsg = (r); \
|
||||
mop->hop.jobid = (h)->jobid; \
|
||||
mop->hop.vpid = (h)->vpid; \
|
||||
/* this goes to the OOB framework, so use that event base */ \
|
||||
opal_event_set(orte_event_base, &mop->ev, -1, \
|
||||
OPAL_EV_WRITE, (c), mop); \
|
||||
opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user