1
1
Mimick the btl/tcp protocol to solve the race condition that happens
when two peers try to connect to each other at the same time

cmr=v1.8.4:reviewer=rhc

This commit was SVN r32799.
Этот коммит содержится в:
Gilles Gouaillardet 2014-09-26 06:54:30 +00:00
родитель e0eb1f2e73
Коммит 9661e4537f

Просмотреть файл

@ -87,7 +87,7 @@ static int tcp_peer_create_socket(mca_oob_tcp_peer_t* peer)
{
int flags;
if (peer->sd > 0) {
if (peer->sd >= 0) {
return ORTE_SUCCESS;
}
@ -311,8 +311,8 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
* we and the peer are trying to connect at the same time. If I
* am the higher vpid, then retry the connection - otherwise,
* step aside for now */
if (ORTE_PROC_MY_NAME->vpid > peer->name.vpid) {
connected = false;
int cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_NAME, &peer->name);
if (OPAL_VALUE1_GREATER == cmpval) {
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
} else {
@ -398,6 +398,7 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
{
if (peer->sd >= 0) {
assert(!peer->send_ev_active && !peer->recv_ev_active);
opal_event_set(mca_oob_tcp_module.ev_base,
&peer->recv_event,
peer->sd,
@ -547,7 +548,7 @@ static int tcp_peer_send_blocking(int sd, void* data, size_t size)
* connected socket and verify the expected response. If so, move the
* socket to a connected state.
*/
static void retry(mca_oob_tcp_peer_t* peer, int sd)
static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
{
int cmpval;
@ -555,30 +556,49 @@ static void retry(mca_oob_tcp_peer_t* peer, int sd)
"%s SIMUL CONNECTION WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(sd);
if (NULL != peer->active_addr) {
peer->active_addr->retries = 0;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, ORTE_PROC_MY_NAME);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
if (fatal) {
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(peer->sd);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
}
return true;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(peer, mca_oob_tcp_peer_try_connect);
if (OPAL_VALUE1_GREATER == cmpval) {
/* The other end will retry the connection */
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
CLOSE_THE_SOCKET(peer->sd);
peer->state = MCA_OOB_TCP_UNCONNECTED;
return false;
} else {
/* The connection will be retried */
CLOSE_THE_SOCKET(sd);
return true;
}
}
}
@ -629,8 +649,9 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
if (NULL != peer &&
(MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state)) {
retry(peer, sd);
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
MCA_OOB_TCP_CLOSED == peer->state)) {
retry(peer, sd, false);
}
return ORTE_ERR_UNREACH;
}
@ -696,8 +717,9 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
retry(peer, sd);
return ORTE_ERR_UNREACH;
if (retry(peer, sd, false)) {
return ORTE_ERR_UNREACH;
}
}
}
} else {
@ -727,7 +749,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (!tcp_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) {
/* unable to complete the recv */
/* unable to complete the recv but should never happen */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -741,7 +763,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
retry(peer, sd);
retry(peer, sd, true);
}
free(msg);
return ORTE_ERR_UNREACH;
@ -1088,11 +1110,6 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer)
opal_event_add(&peer->recv_event, 0);
peer->recv_ev_active = true;
}
/* if a message is waiting to be sent, ensure the send event is active */
if (NULL != peer->send_msg && !peer->send_ev_active) {
opal_event_add(&peer->send_event, 0);
peer->send_ev_active = true;
}
if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
mca_oob_tcp_peer_dump(peer, "accepted");
}