1
1

Try to resolve the simultaneous connection problem by being a little more careful about the choice of returned status when a connection is refused. As before, have the higher vpid of the two peers retry the connection, while the lower one waits. This can happen in a couple of places, so try to hit them all. Since this is hard to test, will ask Gilles to give it a try since he's the one who is seeing it.

cmr=v1.8.3:reviewer=rhc

This commit was SVN r32744.
Этот коммит содержится в:
Ralph Castain 2014-09-16 18:59:36 +00:00
родитель a74428513d
Коммит 4bbc9a28d6
2 изменённых файлов: 84 добавлений и 38 удалений

Просмотреть файл

@ -292,6 +292,19 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
/* send our globally unique process identifier to the peer */
if (ORTE_SUCCESS == (rc = tcp_peer_send_connect_ack(peer))) {
peer->state = MCA_OOB_TCP_CONNECT_ACK;
} else if (ORTE_ERR_UNREACH == rc) {
/* this could happen if we are in a race condition where both
* we and the peer are trying to connect at the same time. If I
* am the higher vpid, then retry the connection - otherwise,
* step aside for now */
if (ORTE_PROC_MY_NAME->vpid > peer->name.vpid) {
connected = false;
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
} else {
peer->state = MCA_OOB_TCP_UNCONNECTED;
}
return;
} else {
opal_output(0,
"%s orte_tcp_peer_try_connect: "
@ -355,7 +368,6 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
/* send it */
if (ORTE_SUCCESS != tcp_peer_send_blocking(peer->sd, msg, sdsize)) {
ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
free(msg);
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(peer);
@ -521,12 +533,47 @@ static int tcp_peer_send_blocking(int sd, void* data, size_t size)
* connected socket and verify the expected response. If so, move the
* socket to a connected state.
*/
static void retry(mca_oob_tcp_peer_t* peer, int sd)
{
int cmpval;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SIMUL CONNECTION WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(sd);
if (NULL != peer->active_addr) {
peer->active_addr->retries = 0;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
}
}
int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
int sd, mca_oob_tcp_hdr_t *dhdr)
{
char *msg;
char *version;
int rc, cmpval;
int rc;
opal_sec_cred_t creds;
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_peer_t *peer;
@ -559,6 +606,18 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (NULL != peer &&
(MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state)) {
retry(peer, sd);
}
return ORTE_ERR_UNREACH;
}
@ -594,7 +653,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
} else {
CLOSE_THE_SOCKET(sd);
}
return ORTE_ERR_UNREACH;
return ORTE_ERR_COMM_FAILURE;
}
/* if we don't already have it, get the peer */
@ -611,7 +670,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_module.peers, (*ui64), peer)) {
OBJ_RELEASE(peer);
CLOSE_THE_SOCKET(sd);
return ORTE_ERR_UNREACH;
return ORTE_ERR_OUT_OF_RESOURCE;
}
} else {
/* check for a race condition - if I was in the process of
@ -623,37 +682,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SIMUL CONNECTION WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr.origin));
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(sd);
if (NULL != peer->active_addr) {
peer->active_addr->retries = 0;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
return ORTE_ERR_UNREACH;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
return ORTE_ERR_UNREACH;
}
retry(peer, sd);
return ORTE_ERR_UNREACH;
}
}
} else {
@ -667,7 +697,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
ORTE_NAME_PRINT(&(peer->name)));
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(peer);
return ORTE_ERR_UNREACH;
return ORTE_ERR_CONNECTION_REFUSED;
}
}
@ -688,6 +718,17 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
retry(peer, sd);
}
free(msg);
return ORTE_ERR_UNREACH;
}
@ -703,7 +744,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(peer);
free(msg);
return ORTE_ERR_UNREACH;
return ORTE_ERR_CONNECTION_REFUSED;
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
@ -716,6 +757,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
creds.size = hdr.nbytes - strlen(version) - 1;
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) {
ORTE_ERROR_LOG(rc);
free(msg);
return ORTE_ERR_CONNECTION_REFUSED;
}
free(msg);

Просмотреть файл

@ -443,7 +443,10 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
}
/* update our state */
peer->state = MCA_OOB_TCP_CONNECTED;
} else {
} else if (ORTE_ERR_UNREACH != rc) {
/* we get an unreachable error returned if a connection
* completes but is rejected - otherwise, we don't want
* to terminate as we might be retrying the connection */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),