Btl tcp: Fix race condition on simultaneous handshake
There is a race condition in TCP connection establishment during a simultaneous handshake. This PR fixes it. Signed-off-by: Mohan Gandhi <mohgan@amazon.com>
Этот коммит содержится в:
родитель
8e99b43084
Коммит
6d642e8d94
@ -546,8 +546,9 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
||||
int retval = recv(sd, ((char *)ptr) + cnt, size - cnt, 0);
|
||||
/* remote closed connection */
|
||||
if (0 == retval) {
|
||||
BTL_ERROR(("remote peer unexpectedly closed connection while I was waiting for blocking message"));
|
||||
return -1;
|
||||
OPAL_OUTPUT_VERBOSE((100, opal_btl_base_framework.framework_output,
|
||||
"remote peer unexpectedly closed connection while I was waiting for a blocking message"));
|
||||
break;
|
||||
}
|
||||
|
||||
/* socket is non-blocking so handle errors */
|
||||
@ -556,7 +557,7 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
||||
opal_socket_errno != EAGAIN &&
|
||||
opal_socket_errno != EWOULDBLOCK) {
|
||||
BTL_ERROR(("recv(%d) failed: %s (%d)", sd, strerror(opal_socket_errno), opal_socket_errno));
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@ -568,8 +569,8 @@ int mca_btl_tcp_recv_blocking(int sd, void* data, size_t size)
|
||||
|
||||
/*
|
||||
* A blocking send on a non-blocking socket. Used to send the small
|
||||
* amount of connection information that identifies the endpoints
|
||||
* endpoint.
|
||||
* amount of connection information used during the initial handshake
|
||||
* (magic string plus process guid)
|
||||
*/
|
||||
|
||||
int mca_btl_tcp_send_blocking(int sd, const void* data, size_t size)
|
||||
|
@ -1355,12 +1355,10 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
char str[128];
|
||||
|
||||
/* Note, Socket will be in blocking mode during initial handshake
|
||||
* hence setting SO_RCVTIMEO to say 2 seconds here to avoid chance
|
||||
* of spin forever if it tries to connect to old version
|
||||
* as older version will send just process id which won't be long enough
|
||||
* to cross sizeof(str) length + process id struct
|
||||
* or when the remote side isn't OMPI where it's not going to send
|
||||
* any data*/
|
||||
* hence setting SO_RCVTIMEO to say 2 seconds here to avoid waiting
|
||||
* forever when connecting to older versions (that reply to the
|
||||
* handshake with only the guid) or when the remote side isn't OMPI
|
||||
*/
|
||||
|
||||
/* get the current timeout value so we can reset to it */
|
||||
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &rcvtimeo_save_len)) {
|
||||
@ -1387,7 +1385,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
|
||||
OBJ_RELEASE(event);
|
||||
retval = mca_btl_tcp_recv_blocking(sd, (void *)&hs_msg, sizeof(hs_msg));
|
||||
guid = hs_msg.guid;
|
||||
|
||||
/* An unknown process attempted to connect to Open MPI via TCP.
|
||||
* Open MPI uses a "magic" string to trivially verify that the connecting
|
||||
@ -1413,6 +1410,8 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
||||
CLOSE_THE_SOCKET(sd);
|
||||
return;
|
||||
}
|
||||
|
||||
guid = hs_msg.guid;
|
||||
if (0 != strncmp(hs_msg.magic_id, mca_btl_tcp_magic_id_string, len)) {
|
||||
opal_output_verbose(20, opal_btl_base_framework.framework_output,
|
||||
"process did not receive right magic string. "
|
||||
|
@ -574,20 +574,6 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* A blocking recv on a non-blocking socket. Used to receive the small
|
||||
* amount of connection information that identifies the remote endpoint (guid).
|
||||
*/
|
||||
static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
|
||||
{
|
||||
int ret = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, data, size);
|
||||
if (ret <= 0) {
|
||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Receive the endpoints globally unique process identification from a newly
|
||||
* connected socket and verify the expected response. If so, move the
|
||||
@ -604,9 +590,10 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
|
||||
opal_process_name_t guid;
|
||||
|
||||
mca_btl_tcp_endpoint_hs_msg_t hs_msg;
|
||||
retval = mca_btl_tcp_endpoint_recv_blocking(btl_endpoint, &hs_msg, sizeof(hs_msg));
|
||||
retval = mca_btl_tcp_recv_blocking(btl_endpoint->endpoint_sd, &hs_msg, sizeof(hs_msg));
|
||||
|
||||
if (sizeof(hs_msg) != retval) {
|
||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||
if (0 == retval) {
|
||||
/* If we get zero bytes, the peer closed the socket. This
|
||||
can happen when the two peers started the connection
|
||||
|
@ -486,10 +486,10 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
|
||||
}
|
||||
|
||||
/*
|
||||
* in case one of the peer addresses is already in use,
|
||||
* in case the peer address has all intended connections,
|
||||
* mark the complete peer interface as 'not available'
|
||||
*/
|
||||
if(endpoint_addr->addr_inuse) {
|
||||
if(endpoint_addr->addr_inuse >= mca_btl_tcp_component.tcp_num_links) {
|
||||
peer_interfaces[index]->inuse = 1;
|
||||
}
|
||||
|
||||
@ -812,7 +812,9 @@ void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
|
||||
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
||||
for( size_t i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
|
||||
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
|
||||
/* Check all conditions before going to try to accept the connection. */
|
||||
/* We are not here to make a decision about what is a good socket
|
||||
* and what is not. We simply check that this socket fits the endpoint
|
||||
* and we prepare for the real decision function mca_btl_tcp_endpoint_accept. */
|
||||
if( btl_endpoint->endpoint_addr->addr_family != addr->sa_family ) {
|
||||
continue;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user