Fixes #1793.
Reshape the teardown process (connection close) to prevent race conditions between the main thread and the progress thread. Minor cleanups.
parent 6de64ddbc1
commit 3adff9d323
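The core of the change is the new shutdown ordering in `mca_btl_tcp_component_close()` below: signal the progress thread over the pipe, join it, and only then destroy the event base, mutexes, and pending queues. Here is a minimal standalone sketch of that handshake, using plain POSIX pipes and pthreads rather than the `opal_*` wrappers (all names in it are hypothetical stand-ins, not Open MPI code):

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

static int pipe_to_progress[2] = { -1, -1 };
static volatile int progress_thread_trigger = -1;  /* -1 means "not running" */

static void *progress_loop(void *arg)
{
    (void)arg;
    char byte;
    /* Block on the read end; read() returning 0 (EOF) means the main
     * thread closed its write end and wants us to shut down. */
    while (read(pipe_to_progress[0], &byte, 1) > 0) {
        /* ... progress pending events ... */
    }
    progress_thread_trigger = -1;  /* advertise that we are done */
    return NULL;
}

int main(void)
{
    pthread_t progress_thread;

    if (0 != pipe(pipe_to_progress))
        return 1;
    progress_thread_trigger = 1;
    if (0 != pthread_create(&progress_thread, NULL, progress_loop, NULL))
        return 1;

    /* ... normal operation ... */

    /* Teardown in the order the commit enforces: signal, join, then free. */
    progress_thread_trigger = 0;
    if (-1 != pipe_to_progress[1]) {
        close(pipe_to_progress[1]);      /* EOF wakes the progress thread */
        pipe_to_progress[1] = -1;
    }
    pthread_join(progress_thread, NULL); /* no teardown step can now race
                                          * with a still-running event loop */
    assert(-1 == progress_thread_trigger);

    if (-1 != pipe_to_progress[0]) {     /* close the remaining pipe end */
        close(pipe_to_progress[0]);
        pipe_to_progress[0] = -1;
    }
    /* only now is it safe to destruct mutexes and free the event base */
    return 0;
}
```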
@@ -149,7 +149,7 @@ int mca_btl_tcp_del_procs(struct mca_btl_base_module_t* btl,
 {
     mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
     size_t i;
-    for(i=0; i<nprocs; i++) {
+    for( i = 0; i < nprocs; i++ ) {
         mca_btl_tcp_endpoint_t* tcp_endpoint = endpoints[i];
         if(tcp_endpoint->endpoint_proc != mca_btl_tcp_proc_local()) {
             opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
@@ -259,7 +259,8 @@ static int mca_btl_tcp_component_register(void)
                                     " used to reduce the number of syscalls, by replacing them with memcpy."
                                     " Every read will read the expected data plus the amount of the"
                                     " endpoint_cache", 30*1024, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_endpoint_cache);
-    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)", 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
+    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)",
+                                    0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
     mca_btl_tcp_param_register_int( "port_min_v4",
                                     "The minimum port where the TCP BTL will try to bind (default 1024)",
                                     1024, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_min);
@@ -299,9 +300,8 @@ static int mca_btl_tcp_component_register(void)
                                 opal_process_info.nodename,
                                 mca_btl_tcp_component.tcp_if_seq,
                                 "Progress thread support compiled out");
     }
 #endif /* !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD) */
-
     mca_btl_tcp_component.report_all_unfound_interfaces = false;
     (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
                                            "warn_all_unfound_interfaces",
@@ -394,8 +394,46 @@ static int mca_btl_tcp_component_open(void)
 
 static int mca_btl_tcp_component_close(void)
 {
-    opal_list_item_t* item;
-    opal_list_item_t* next;
+    opal_list_item_t *item;
+
+#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
+    /**
+     * If we have a progress thread we should shut it down before
+     * moving forward with the TCP tearing down process.
+     */
+    if( (NULL != mca_btl_tcp_event_base) &&
+        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
+        /* Turn off the progress thread before moving forward */
+        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
+            void* ret = NULL;  /* not currently used */
+
+            mca_btl_tcp_progress_thread_trigger = 0;
+            /* Let the progress thread know that we're going away */
+            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
+                close(mca_btl_tcp_pipe_to_progress[1]);
+                mca_btl_tcp_pipe_to_progress[1] = -1;
+            }
+            /* wait until the TCP progress thread completes */
+            opal_thread_join(&mca_btl_tcp_progress_thread, &ret);
+            assert( -1 == mca_btl_tcp_progress_thread_trigger );
+        }
+        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
+        opal_event_base_free(mca_btl_tcp_event_base);
+        mca_btl_tcp_event_base = NULL;
+
+        /* Close the remaining pipes */
+        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
+            close(mca_btl_tcp_pipe_to_progress[0]);
+            mca_btl_tcp_pipe_to_progress[0] = -1;
+        }
+    }
+
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
+
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
+#endif
+
     if (NULL != mca_btl_tcp_component.tcp_btls) {
         free(mca_btl_tcp_component.tcp_btls);
@@ -416,11 +454,8 @@ static int mca_btl_tcp_component_close(void)
 
     /* remove all pending events. Do not lock the tcp_events list as
        the event themselves will unregister during the destructor. */
-    for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events);
-        item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
-        item = next) {
+    while( NULL != (item = opal_list_remove_first(&mca_btl_tcp_component.tcp_events)) ) {
         mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
-        next = opal_list_get_next(item);
         opal_event_del(&event->event);
         OBJ_RELEASE(event);
     }
@@ -436,40 +471,6 @@ static int mca_btl_tcp_component_close(void)
     mca_common_cuda_fini();
 #endif /* OPAL_CUDA_SUPPORT */
 
-#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
-
-    if( (NULL != mca_btl_tcp_event_base) &&
-        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
-        /* Turn of the progress thread before moving forward */
-        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
-            mca_btl_tcp_progress_thread_trigger = 0;
-            /* Let the progress thread know that we're going away */
-            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
-                close(mca_btl_tcp_pipe_to_progress[1]);
-                mca_btl_tcp_pipe_to_progress[1] = -1;
-            }
-            while( -1 != mca_btl_tcp_progress_thread_trigger ) {
-                /*event_base_loopbreak(mca_btl_tcp_event_base);*/
-                sched_yield();
-                usleep(100);  /* give app a chance to re-enter library */
-            }
-        }
-        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
-        opal_event_base_free(mca_btl_tcp_event_base);
-        mca_btl_tcp_event_base = NULL;
-
-        /* Close the remaining pipes */
-        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
-            close(mca_btl_tcp_pipe_to_progress[0]);
-            mca_btl_tcp_pipe_to_progress[0] = -1;
-        }
-    }
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
-#endif
-
     return OPAL_SUCCESS;
 }
 
@@ -1032,6 +1033,8 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
             mca_btl_tcp_progress_thread_trigger = -1;  /* thread not started */
             goto move_forward_with_no_thread;
         }
+        /* We have async progress, the rest of the library should now protect itself against races */
+        opal_set_using_threads(true);
     }
 }
 else {
@@ -1295,12 +1298,12 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
  */
 static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
 {
+    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
     opal_process_name_t guid;
     struct sockaddr_storage addr;
-    int retval;
-    mca_btl_tcp_proc_t* btl_proc;
     opal_socklen_t addr_len = sizeof(addr);
-    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
+    mca_btl_tcp_proc_t* btl_proc;
+    int retval;
 
     OBJ_RELEASE(event);
 
@@ -1339,6 +1342,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
         return;
     }
 
-    /* are there any existing peer instances will to accept this connection */
+    /* are there any existing peer instances willing to accept this connection */
     (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
 }
@@ -251,7 +251,7 @@ mca_btl_tcp_endpoint_dump(int level,
     if (used >= DEBUG_LENGTH) goto out;
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
-                     btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
+                     (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
                      btl_endpoint->endpoint_cache_length);
     if (used >= DEBUG_LENGTH) goto out;
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
@@ -510,6 +510,7 @@ void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
  */
 void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 {
+    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
     if(btl_endpoint->endpoint_sd < 0)
         return;
     btl_endpoint->endpoint_retries++;
@@ -517,14 +518,16 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
     opal_event_del(&btl_endpoint->endpoint_recv_event);
     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
     opal_event_del(&btl_endpoint->endpoint_send_event);
-    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
-    btl_endpoint->endpoint_sd = -1;
+
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     free( btl_endpoint->endpoint_cache );
     btl_endpoint->endpoint_cache = NULL;
     btl_endpoint->endpoint_cache_pos = NULL;
     btl_endpoint->endpoint_cache_length = 0;
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
+
+    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
+    btl_endpoint->endpoint_sd = -1;
     /**
      * If we keep failing to connect to the peer let the caller know about
      * this situation by triggering all the pending fragments callback and
@@ -675,9 +678,9 @@ void mca_btl_tcp_set_socket_options(int sd)
 /*
  * Start a connection to the endpoint. This will likely not complete,
  * as the socket is set to non-blocking, so register for event
- * notification of connect completion. On connection we send
- * our globally unique process identifier to the endpoint and wait for
- * the endpoints response.
+ * notification of connect completion. On connection we send our
+ * globally unique process identifier to the endpoint and wait for
+ * the endpoint response.
  */
 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 {
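The pending-events cleanup in `mca_btl_tcp_component_close()` above replaces a for loop that walked the list with a cached `next` pointer by a loop that pops the head until the list is empty, so each event is fully unlinked before it is released. A self-contained sketch of that drain idiom with a hypothetical plain-C list (not the OPAL list API):

```c
#include <stdio.h>
#include <stdlib.h>

struct node { struct node *next; int id; };

static struct node *head = NULL;

/* pop the head, or return NULL when the list is empty */
static struct node *list_remove_first(void)
{
    struct node *item = head;
    if (NULL != item)
        head = item->next;
    return item;
}

int main(void)
{
    /* build a small list of fake "events" */
    for (int i = 0; i < 3; i++) {
        struct node *n = malloc(sizeof(*n));
        if (NULL == n)
            return 1;
        n->id = i;
        n->next = head;
        head = n;
    }

    /* drain: each iteration owns the removed item outright, so freeing it
     * cannot invalidate a cached next pointer the way the old loop could */
    struct node *item;
    while (NULL != (item = list_remove_first())) {
        printf("releasing event %d\n", item->id);
        free(item);
    }
    return 0;
}
```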