diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 2b56f3adb2..e6fd097386 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -292,6 +292,19 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata) /* send our globally unique process identifier to the peer */ if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(peer))) { peer->state = MCA_OOB_TCP_CONNECT_ACK; + } else if (ORTE_ERR_UNREACH == rc) { + /* this could happen if we are in a race condition where both + * we and the peer are trying to connect at the same time. If I + * am the higher vpid, then retry the connection - otherwise, + * step aside for now */ + if (ORTE_PROC_MY_NAME->vpid > peer->name.vpid) { + connected = false; + peer->state = MCA_OOB_TCP_CONNECTING; + ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); + } else { + peer->state = MCA_OOB_TCP_UNCONNECTED; + } + return; } else { opal_output(0, "%s orte_tcp_peer_try_connect: " @@ -355,7 +368,6 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) /* send it */ if (ORTE_SUCCESS != tcp_peer_send_blocking(peer->sd, msg, sdsize)) { - ORTE_ERROR_LOG(ORTE_ERR_UNREACH); free(msg); peer->state = MCA_OOB_TCP_FAILED; mca_oob_tcp_peer_close(peer); @@ -521,12 +533,47 @@ static int tcp_peer_send_blocking(int sd, void* data, size_t size) * connected socket and verify the expected response. If so, move the * socket to a connected state. 
*/ /* retry(): helper for the simultaneous-connection case between this process and the given peer. It emits the SIMUL CONNECTION verbose message, deletes the recv and send events of the peer when they are flagged active (clearing the flags), hands peer->sd to CLOSE_THE_SOCKET and resets it to -1 when it is greater than 0, hands the incoming socket sd to CLOSE_THE_SOCKET, and zeroes active_addr->retries when an active address is set. It then compares peer->name against ORTE_PROC_MY_NAME over all name fields: on OPAL_VALUE1_GREATER (presumably meaning the peer name is the greater one - confirm against orte_util_compare_name_fields) the peer is left in MCA_OOB_TCP_UNCONNECTED so the other end has to retry; otherwise the peer is put in MCA_OOB_TCP_CONNECTING and mca_oob_tcp_peer_try_connect is activated. Nothing is returned. NOTE(review): sd is passed to CLOSE_THE_SOCKET unconditionally, even if it is the same descriptor as the peer->sd handled just before, and the 0 < peer->sd test skips descriptor 0 - confirm both are intended. */ +static void retry(mca_oob_tcp_peer_t* peer, int sd) +{ + int cmpval; + + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SIMUL CONNECTION WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(sd); + if (NULL != peer->active_addr) { + peer->active_addr->retries = 0; + } + cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + } else { + /* retry the connection */ + peer->state = MCA_OOB_TCP_CONNECTING; + ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); + } +} + int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, int sd, mca_oob_tcp_hdr_t *dhdr) { char *msg; char *version; - int rc, cmpval; + int rc; opal_sec_cred_t creds; mca_oob_tcp_hdr_t hdr; mca_oob_tcp_peer_t *peer; @@ -559,6 +606,18 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection.
We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (NULL != peer && + (MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state)) { + retry(peer, sd); + } return ORTE_ERR_UNREACH; } @@ -594,7 +653,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, } else { CLOSE_THE_SOCKET(sd); } - return ORTE_ERR_UNREACH; + return ORTE_ERR_COMM_FAILURE; } /* if we don't already have it, get the peer */ @@ -611,7 +670,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) { OBJ_RELEASE(peer); CLOSE_THE_SOCKET(sd); - return ORTE_ERR_UNREACH; + return ORTE_ERR_OUT_OF_RESOURCE; } } else { /* check for a race condition - if I was in the process of @@ -623,37 +682,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, if (MCA_OOB_TCP_CONNECTED == peer->state || MCA_OOB_TCP_CONNECTING == peer->state || MCA_OOB_TCP_CONNECT_ACK == peer->state) { - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s SIMUL CONNECTION WITH %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr.origin)); - if (peer->recv_ev_active) { - opal_event_del(&peer->recv_event); - peer->recv_ev_active = false; - } - if (peer->send_ev_active) { - opal_event_del(&peer->send_event); - peer->send_ev_active = false; - } - if (0 < peer->sd) { - CLOSE_THE_SOCKET(peer->sd); - peer->sd = -1; - } - CLOSE_THE_SOCKET(sd); - if (NULL != peer->active_addr) { - peer->active_addr->retries = 0; - } - cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME); - if (OPAL_VALUE1_GREATER == cmpval) { - /* force the other end to retry the connection */ - peer->state = MCA_OOB_TCP_UNCONNECTED; - return ORTE_ERR_UNREACH; - } else { - /* retry the connection */ - peer->state 
= MCA_OOB_TCP_CONNECTING; - ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect); - return ORTE_ERR_UNREACH; - } + retry(peer, sd); + return ORTE_ERR_UNREACH; } } } else { @@ -667,7 +697,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, ORTE_NAME_PRINT(&(peer->name))); peer->state = MCA_OOB_TCP_FAILED; mca_oob_tcp_peer_close(peer); - return ORTE_ERR_UNREACH; + return ORTE_ERR_CONNECTION_REFUSED; } } @@ -688,6 +718,17 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name), peer->sd); + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state) { + retry(peer, sd); + } free(msg); return ORTE_ERR_UNREACH; } @@ -703,7 +744,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, peer->state = MCA_OOB_TCP_FAILED; mca_oob_tcp_peer_close(peer); free(msg); - return ORTE_ERR_UNREACH; + return ORTE_ERR_CONNECTION_REFUSED; } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, @@ -716,6 +757,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, creds.size = hdr.nbytes - strlen(version) - 1; if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) { ORTE_ERROR_LOG(rc); + free(msg); + return ORTE_ERR_CONNECTION_REFUSED; } free(msg); diff --git a/orte/mca/oob/tcp/oob_tcp_sendrecv.c b/orte/mca/oob/tcp/oob_tcp_sendrecv.c index 06496d3910..172ca1b4b5 100644 --- a/orte/mca/oob/tcp/oob_tcp_sendrecv.c +++ b/orte/mca/oob/tcp/oob_tcp_sendrecv.c @@ -443,7 +443,10 @@ void 
mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) } /* update our state */ peer->state = MCA_OOB_TCP_CONNECTED; - } else { + } else if (ORTE_ERR_UNREACH != rc) { + /* we get an unreachable error returned if a connection + * completes but is rejected - otherwise, we don't want + * to terminate as we might be retrying the connection */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s UNABLE TO COMPLETE CONNECT ACK WITH %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),