tcp btl: Fix multiple-link connection establishment.
Fix case where the btl_tcp_links MCA parameter is used to create multiple TCP connections between peers. Three issues were resulting in hangs during large message transfer: * The 2nd..btl_tcp_link connections were dropped during establishment because the per-process address check was binary, rather than a count * The accept handler would not skip a btl module that was already in use, resulting in all connections for a given address being vectored to a single btl * Multiple addresses in the same subnet caused connections to be stalled, as the receiver would always use the same (first) address found. Binding the outgoing connection solves this issue * Lastly fix race condition created by connections being started at the exact same time by accpeting connections not in the closed state, allowing endpoint_accept to resolve dispute Signed-off-by: Jordan Cherry <cherryj@amazon.com>
Этот коммит содержится в:
родитель
d3a23f9518
Коммит
d7e7e3acb7
@ -167,7 +167,10 @@ struct mca_btl_tcp_module_t {
|
|||||||
#if 0
|
#if 0
|
||||||
int tcp_ifindex; /**< BTL interface index */
|
int tcp_ifindex; /**< BTL interface index */
|
||||||
#endif
|
#endif
|
||||||
struct sockaddr_storage tcp_ifaddr; /**< BTL interface address */
|
struct sockaddr_storage tcp_ifaddr; /**< First IPv4 address discovered for this interface, bound as sending address for this BTL */
|
||||||
|
#if OPAL_ENABLE_IPV6
|
||||||
|
struct sockaddr_storage tcp_ifaddr_6; /**< First IPv6 address discovered for this interface, bound as sending address for this BTL */
|
||||||
|
#endif
|
||||||
uint32_t tcp_ifmask; /**< BTL interface netmask */
|
uint32_t tcp_ifmask; /**< BTL interface netmask */
|
||||||
|
|
||||||
opal_mutex_t tcp_endpoints_mutex;
|
opal_mutex_t tcp_endpoints_mutex;
|
||||||
|
@ -511,6 +511,17 @@ static int mca_btl_tcp_create(int if_kindex, const char* if_name)
|
|||||||
btl->tcp_send_handler = 0;
|
btl->tcp_send_handler = 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
struct sockaddr_storage addr;
|
||||||
|
opal_ifkindextoaddr(if_kindex, (struct sockaddr*) &addr,
|
||||||
|
sizeof (struct sockaddr_storage));
|
||||||
|
#if OPAL_ENABLE_IPV6
|
||||||
|
if (addr.ss_family == AF_INET6) {
|
||||||
|
btl->tcp_ifaddr_6 = addr;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (addr.ss_family == AF_INET) {
|
||||||
|
btl->tcp_ifaddr = addr;
|
||||||
|
}
|
||||||
/* allow user to specify interface bandwidth */
|
/* allow user to specify interface bandwidth */
|
||||||
sprintf(param, "bandwidth_%s", if_name);
|
sprintf(param, "bandwidth_%s", if_name);
|
||||||
mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_bandwidth, OPAL_INFO_LVL_5, &btl->super.btl_bandwidth);
|
mca_btl_tcp_param_register_uint(param, NULL, btl->super.btl_bandwidth, OPAL_INFO_LVL_5, &btl->super.btl_bandwidth);
|
||||||
|
@ -718,6 +718,34 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
|
|||||||
/* start the connect - will likely fail with EINPROGRESS */
|
/* start the connect - will likely fail with EINPROGRESS */
|
||||||
mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
|
mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
|
||||||
|
|
||||||
|
/* Bind the socket to one of the addresses associated with
|
||||||
|
* this btl module. This sets the source IP to one of the
|
||||||
|
* addresses shared in modex, so that the destination rank
|
||||||
|
* can properly pair btl modules, even in cases where Linux
|
||||||
|
* might do something unexpected with routing */
|
||||||
|
opal_socklen_t sockaddr_addrlen = sizeof(struct sockaddr_storage);
|
||||||
|
if (endpoint_addr.ss_family == AF_INET) {
|
||||||
|
assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
|
||||||
|
if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr,
|
||||||
|
sockaddr_addrlen) < 0) {
|
||||||
|
BTL_ERROR(("bind() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno));
|
||||||
|
|
||||||
|
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
|
||||||
|
return OPAL_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#if OPAL_ENABLE_IPV6
|
||||||
|
if (endpoint_addr.ss_family == AF_INET6) {
|
||||||
|
assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr_6);
|
||||||
|
if (bind(btl_endpoint->endpoint_sd, (struct sockaddr*) &btl_endpoint->endpoint_btl->tcp_ifaddr_6,
|
||||||
|
sockaddr_addrlen) < 0) {
|
||||||
|
BTL_ERROR(("bind() failed: %s (%d)", strerror(opal_socket_errno), opal_socket_errno));
|
||||||
|
|
||||||
|
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
|
||||||
|
return OPAL_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
opal_output_verbose(10, opal_btl_base_framework.framework_output,
|
opal_output_verbose(10, opal_btl_base_framework.framework_output,
|
||||||
"btl: tcp: attempting to connect() to %s address %s on port %d",
|
"btl: tcp: attempting to connect() to %s address %s on port %d",
|
||||||
OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
|
OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
|
||||||
|
@ -486,7 +486,7 @@ int mca_btl_tcp_proc_insert( mca_btl_tcp_proc_t* btl_proc,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* in case the peer address has all intended connections,
|
* in case the peer address has created all intended connections,
|
||||||
* mark the complete peer interface as 'not available'
|
* mark the complete peer interface as 'not available'
|
||||||
*/
|
*/
|
||||||
if(endpoint_addr->addr_inuse >= mca_btl_tcp_component.tcp_num_links) {
|
if(endpoint_addr->addr_inuse >= mca_btl_tcp_component.tcp_num_links) {
|
||||||
@ -810,6 +810,9 @@ mca_btl_tcp_proc_t* mca_btl_tcp_proc_lookup(const opal_process_name_t *name)
|
|||||||
void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr, int sd)
|
void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr, int sd)
|
||||||
{
|
{
|
||||||
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
||||||
|
int found_match = 0;
|
||||||
|
mca_btl_base_endpoint_t* match_btl_endpoint;
|
||||||
|
|
||||||
for( size_t i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
|
for( size_t i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
|
||||||
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
|
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
|
||||||
/* We are not here to make a decision about what is good socket
|
/* We are not here to make a decision about what is good socket
|
||||||
@ -833,6 +836,10 @@ void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
|
|||||||
tmp[1], 16),
|
tmp[1], 16),
|
||||||
(int)i, (int)btl_proc->proc_endpoint_count);
|
(int)i, (int)btl_proc->proc_endpoint_count);
|
||||||
continue;
|
continue;
|
||||||
|
} else if (btl_endpoint->endpoint_state != MCA_BTL_TCP_CLOSED) {
|
||||||
|
found_match = 1;
|
||||||
|
match_btl_endpoint = btl_endpoint;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
#if OPAL_ENABLE_IPV6
|
#if OPAL_ENABLE_IPV6
|
||||||
@ -857,10 +864,20 @@ void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
|
|||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set state to CONNECTING to ensure that subsequent conenctions do not attempt to re-use endpoint in the num_links > 1 case*/
|
||||||
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
|
||||||
(void)mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd);
|
(void)mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd);
|
||||||
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
/* In this case the connection was inbound to an address exported, but was not in a CLOSED state.
|
||||||
|
* mca_btl_tcp_endpoint_accept() has logic to deal with the race condition that has likely caused this
|
||||||
|
* scenario, so call it here.*/
|
||||||
|
if (found_match) {
|
||||||
|
(void)mca_btl_tcp_endpoint_accept(match_btl_endpoint, addr, sd);
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
/* No further use of this socket. Close it */
|
/* No further use of this socket. Close it */
|
||||||
CLOSE_THE_SOCKET(sd);
|
CLOSE_THE_SOCKET(sd);
|
||||||
{
|
{
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user