Merge pull request #4295 from mohanasudhan/iss4131
Btl tcp: Fix racing condition on simultaneous handshake
Этот коммит содержится в:
Коммит
fa01fad2ca
2
.mailmap
2
.mailmap
@ -109,3 +109,5 @@ Thomas Naughton <naughtont@ornl.gov> <naughtont@ornl.gov>
|
|||||||
Geoffrey Paulsen <gpaulsen@us.ibm.com> <gpaulsen@users.noreply.github.com>
|
Geoffrey Paulsen <gpaulsen@us.ibm.com> <gpaulsen@users.noreply.github.com>
|
||||||
|
|
||||||
Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>
|
Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>
|
||||||
|
|
||||||
|
Mohan Gandhi <mohgan@amazon.com>
|
||||||
|
@ -546,8 +546,9 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
|||||||
int retval = recv(sd, ((char *)ptr) + cnt, size - cnt, 0);
|
int retval = recv(sd, ((char *)ptr) + cnt, size - cnt, 0);
|
||||||
/* remote closed connection */
|
/* remote closed connection */
|
||||||
if (0 == retval) {
|
if (0 == retval) {
|
||||||
BTL_ERROR(("remote peer unexpectedly closed connection while I was waiting for blocking message"));
|
OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output,
|
||||||
return -1;
|
"remote peer unexpectedly closed connection while I was waiting for a blocking message"));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* socket is non-blocking so handle errors */
|
/* socket is non-blocking so handle errors */
|
||||||
@ -556,7 +557,7 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
|||||||
opal_socket_errno != EAGAIN &&
|
opal_socket_errno != EAGAIN &&
|
||||||
opal_socket_errno != EWOULDBLOCK) {
|
opal_socket_errno != EWOULDBLOCK) {
|
||||||
BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(opal_socket_errno), opal_socket_errno));
|
BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(opal_socket_errno), opal_socket_errno));
|
||||||
return -1;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -568,8 +569,8 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* A blocking send on a non-blocking socket. Used to send the small
|
* A blocking send on a non-blocking socket. Used to send the small
|
||||||
* amount of connection information that identifies the endpoints
|
* amount of connection information used during the initial handshake
|
||||||
* endpoint.
|
* (magic string plus process guid)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size)
|
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size)
|
||||||
|
@ -1355,12 +1355,10 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
|||||||
char str[128];
|
char str[128];
|
||||||
|
|
||||||
/* Note, Socket will be in blocking mode during intial handshake
|
/* Note, Socket will be in blocking mode during intial handshake
|
||||||
* hence setting SO_RCVTIMEO to say 2 seconds here to avoid chance
|
* hence setting SO_RCVTIMEO to say 2 seconds here to avoid waiting
|
||||||
* of spin forever if it tries to connect to old version
|
* forever when connecting to older versions (that reply to the
|
||||||
* as older version will send just process id which won't be long enough
|
* handshake with only the guid) or when the remote side isn't OMPI
|
||||||
* to cross sizeof(str) length + process id struct
|
*/
|
||||||
* or when the remote side isn't OMPI where it's not going to send
|
|
||||||
* any data*/
|
|
||||||
|
|
||||||
/* get the current timeout value so we can reset to it */
|
/* get the current timeout value so we can reset to it */
|
||||||
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &rcvtimeo_save_len)) {
|
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &rcvtimeo_save_len)) {
|
||||||
@ -1387,7 +1385,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
|||||||
|
|
||||||
OBJ_RELEASE(event);
|
OBJ_RELEASE(event);
|
||||||
retval = mca_btl_tcp_recv_blocking(sd, (void *)&hs_msg, sizeof(hs_msg));
|
retval = mca_btl_tcp_recv_blocking(sd, (void *)&hs_msg, sizeof(hs_msg));
|
||||||
guid = hs_msg.guid;
|
|
||||||
|
|
||||||
/* An unknown process attempted to connect to Open MPI via TCP.
|
/* An unknown process attempted to connect to Open MPI via TCP.
|
||||||
* Open MPI uses a "magic" string to trivially verify that the connecting
|
* Open MPI uses a "magic" string to trivially verify that the connecting
|
||||||
@ -1413,6 +1410,8 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
|||||||
CLOSE_THE_SOCKET(sd);
|
CLOSE_THE_SOCKET(sd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
guid = hs_msg.guid;
|
||||||
if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
|
if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
|
||||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||||
"process did not receive right magic string. "
|
"process did not receive right magic string. "
|
||||||
|
@ -574,20 +574,6 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* A blocking recv on a non-blocking socket. Used to receive the small
|
|
||||||
* amount of connection information that identifies the remote endpoint (guid).
|
|
||||||
*/
|
|
||||||
static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
|
|
||||||
{
|
|
||||||
int ret = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, data, size);
|
|
||||||
if (ret <= 0) {
|
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Receive the endpoints globally unique process identification from a newly
|
* Receive the endpoints globally unique process identification from a newly
|
||||||
* connected socket and verify the expected response. If so, move the
|
* connected socket and verify the expected response. If so, move the
|
||||||
@ -604,9 +590,10 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
|
|||||||
opal_process_name_t guid;
|
opal_process_name_t guid;
|
||||||
|
|
||||||
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
||||||
retval = mca_btl_tcp_endpoint_recv_blocking(btl_endpoint, &hs_msg, sizeof(hs_msg));
|
retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));
|
||||||
|
|
||||||
if (sizeof(hs_msg) != retval) {
|
if (sizeof(hs_msg) != retval) {
|
||||||
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
if (0 == retval) {
|
if (0 == retval) {
|
||||||
/* If we get zero bytes, the peer closed the socket. This
|
/* If we get zero bytes, the peer closed the socket. This
|
||||||
can happen when the two peers started the connection
|
can happen when the two peers started the connection
|
||||||
|
@ -486,10 +486,10 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* in case one of the peer addresses is already in use,
|
* in case the peer address has all intended connections,
|
||||||
* mark the complete peer interface as 'not available'
|
* mark the complete peer interface as 'not available'
|
||||||
*/
|
*/
|
||||||
if(endpoint_addr->addr_inuse) {
|
if(endpoint_addr->addr_inuse >= mca_btl_tcp_component.tcp_num_links) {
|
||||||
peer_interfaces[index]->inuse = 1;
|
peer_interfaces[index]->inuse = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -812,7 +812,9 @@ void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
|
|||||||
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
||||||
for( size_t i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
|
for( size_t i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
|
||||||
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
|
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
|
||||||
/* Check all conditions before going to try to accept the connection. */
|
/* We are not here to make a decision about what is good socket
|
||||||
|
* and what is not. We simply check that this socket fit the endpoint
|
||||||
|
* end we prepare for the real decision function mca_btl_tcp_endpoint_accept. */
|
||||||
if( btl_endpoint->endpoint_addr->addr_family != addr->sa_family ) {
|
if( btl_endpoint->endpoint_addr->addr_family != addr->sa_family ) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user