From 3adff9d3236ec8c27e27066b37689150410e9178 Mon Sep 17 00:00:00 2001
From: George Bosilca
Date: Wed, 24 Aug 2016 22:42:07 -0400
Subject: [PATCH] Fixes #1793. Reshape the tearing down process (connection
 close) to prevent race conditions between the main thread and the progress
 thread. Minor cleanups.
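
The crux of the race: the old component-close path flipped
mca_btl_tcp_progress_thread_trigger and then spun on
sched_yield()/usleep() waiting for the progress thread to notice, while
teardown of the event base raced on underneath it. The new path is a
deterministic handshake: close the write end of the control pipe, let
the progress thread observe EOF and leave its event loop, then
opal_thread_join() it. The standalone sketch below reproduces that
handshake with plain pthreads and poll(2) standing in for OMPI's
opal_thread and libevent wrappers; every name in it is illustrative,
not the OMPI API.

    /* Shutdown handshake sketch: signal by closing a pipe, then join. */
    #include <pthread.h>
    #include <poll.h>
    #include <stdio.h>
    #include <unistd.h>

    static int pipe_to_progress[2] = { -1, -1 };  /* [0]=read, [1]=write */

    static void *progress_loop(void *arg)
    {
        struct pollfd pfd = { .fd = pipe_to_progress[0], .events = POLLIN };
        char byte;

        (void)arg;
        for (;;) {
            if (poll(&pfd, 1, -1) < 0) continue;
            /* read() returning 0 is EOF: the main thread closed its
             * end, so shut down. A real loop would also progress I/O. */
            if (read(pipe_to_progress[0], &byte, 1) <= 0) break;
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t progress_thread;

        if (0 != pipe(pipe_to_progress)) return 1;
        pthread_create(&progress_thread, NULL, progress_loop, NULL);

        /* ... application lifetime ... */

        /* Teardown: close the write end, then join. No polling loop,
         * no window where both threads touch shared state. */
        close(pipe_to_progress[1]);
        pipe_to_progress[1] = -1;
        pthread_join(progress_thread, NULL);
        close(pipe_to_progress[0]);

        puts("progress thread joined cleanly");
        return 0;
    }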
---
 opal/mca/btl/tcp/btl_tcp.c           |  2 +-
 opal/mca/btl/tcp/btl_tcp_component.c | 97 ++++++++++++++--------------
 opal/mca/btl/tcp/btl_tcp_endpoint.c  | 15 +++--
 3 files changed, 60 insertions(+), 54 deletions(-)

diff --git a/opal/mca/btl/tcp/btl_tcp.c b/opal/mca/btl/tcp/btl_tcp.c
index 0cf4c42071..cb56c105a4 100644
--- a/opal/mca/btl/tcp/btl_tcp.c
+++ b/opal/mca/btl/tcp/btl_tcp.c
@@ -149,7 +149,7 @@ int mca_btl_tcp_del_procs(struct mca_btl_base_module_t* btl,
 {
     mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
     size_t i;
-    for(i=0; i<nprocs; i++) {
+    for( i = 0; i < nprocs; i++ ) {
         mca_btl_tcp_endpoint_t* tcp_endpoint = endpoints[i];
         if(tcp_endpoint->endpoint_proc != mca_btl_tcp_proc_local()) {
             opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
diff --git a/opal/mca/btl/tcp/btl_tcp_component.c b/opal/mca/btl/tcp/btl_tcp_component.c
index e4a547ac9c..2173da853e 100644
--- a/opal/mca/btl/tcp/btl_tcp_component.c
+++ b/opal/mca/btl/tcp/btl_tcp_component.c
@@ -259,7 +259,8 @@ static int mca_btl_tcp_component_register(void)
                                    " used to reduce the number of syscalls, by replacing them with memcpy."
                                    " Every read will read the expected data plus the amount of the"
                                    " endpoint_cache", 30*1024, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_endpoint_cache);
-    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)", 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
+    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)",
+                                    0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
     mca_btl_tcp_param_register_int( "port_min_v4",
                                     "The minimum port where the TCP BTL will try to bind (default 1024)",
                                     1024, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_min);
@@ -299,9 +300,8 @@ static int mca_btl_tcp_component_register(void)
                        opal_process_info.nodename,
                        mca_btl_tcp_component.tcp_if_seq,
                        "Progress thread support compiled out");
-    }
+    }
 #endif  /* !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD) */
-
     mca_btl_tcp_component.report_all_unfound_interfaces = false;
     (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
                                            "warn_all_unfound_interfaces",
@@ -394,8 +394,46 @@ static int mca_btl_tcp_component_open(void)
 
 static int mca_btl_tcp_component_close(void)
 {
-    opal_list_item_t* item;
-    opal_list_item_t* next;
+    opal_list_item_t *item;
+
+#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
+    /**
+     * If we have a progress thread we should shut it down before
+     * moving forward with the TCP tearing down process.
+     */
+    if( (NULL != mca_btl_tcp_event_base) &&
+        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
+        /* Turn off the progress thread before moving forward */
+        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
+            void* ret = NULL;  /* not currently used */
+
+            mca_btl_tcp_progress_thread_trigger = 0;
+            /* Let the progress thread know that we're going away */
+            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
+                close(mca_btl_tcp_pipe_to_progress[1]);
+                mca_btl_tcp_pipe_to_progress[1] = -1;
+            }
+            /* wait until the TCP progress thread completes */
+            opal_thread_join(&mca_btl_tcp_progress_thread, &ret);
+            assert( -1 == mca_btl_tcp_progress_thread_trigger );
+        }
+        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
+        opal_event_base_free(mca_btl_tcp_event_base);
+        mca_btl_tcp_event_base = NULL;
+
+        /* Close the remaining pipes */
+        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
+            close(mca_btl_tcp_pipe_to_progress[0]);
+            mca_btl_tcp_pipe_to_progress[0] = -1;
+        }
+    }
+
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
+
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
+#endif
 
     if (NULL != mca_btl_tcp_component.tcp_btls) {
         free(mca_btl_tcp_component.tcp_btls);
@@ -416,11 +454,8 @@ static int mca_btl_tcp_component_close(void)
 
     /* remove all pending events. Do not lock the tcp_events list as
        the event themselves will unregister during the destructor. */
-    for(item =  opal_list_get_first(&mca_btl_tcp_component.tcp_events);
-        item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
-        item =  next) {
+    while( NULL != (item = opal_list_remove_first(&mca_btl_tcp_component.tcp_events)) ) {
         mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
-        next = opal_list_get_next(item);
         opal_event_del(&event->event);
         OBJ_RELEASE(event);
     }
@@ -436,40 +471,6 @@ static int mca_btl_tcp_component_close(void)
     mca_common_cuda_fini();
 #endif /* OPAL_CUDA_SUPPORT */
 
-#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
-
-    if( (NULL != mca_btl_tcp_event_base) &&
-        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
-        /* Turn of the progress thread before moving forward */
-        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
-            mca_btl_tcp_progress_thread_trigger = 0;
-            /* Let the progress thread know that we're going away */
-            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
-                close(mca_btl_tcp_pipe_to_progress[1]);
-                mca_btl_tcp_pipe_to_progress[1] = -1;
-            }
-            while( -1 != mca_btl_tcp_progress_thread_trigger ) {
-                /*event_base_loopbreak(mca_btl_tcp_event_base);*/
-                sched_yield();
-                usleep(100);  /* give app a chance to re-enter library */
-            }
-        }
-        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
-        opal_event_base_free(mca_btl_tcp_event_base);
-        mca_btl_tcp_event_base = NULL;
-
-        /* Close the remaining pipes */
-        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
-            close(mca_btl_tcp_pipe_to_progress[0]);
-            mca_btl_tcp_pipe_to_progress[0] = -1;
-        }
-    }
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
-#endif
-
     return OPAL_SUCCESS;
 }
@@ -1032,6 +1033,8 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
                 mca_btl_tcp_progress_thread_trigger = -1;  /* thread not started */
                 goto move_forward_with_no_thread;
             }
+            /* We have async progress, the rest of the library should now
+             * protect itself against races */
             opal_set_using_threads(true);
         }
     } else {
@@ -1295,12 +1298,12 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
  */
 static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
 {
+    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
     opal_process_name_t guid;
     struct sockaddr_storage addr;
-    int retval;
-    mca_btl_tcp_proc_t* btl_proc;
     opal_socklen_t addr_len = sizeof(addr);
-    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
+    mca_btl_tcp_proc_t* btl_proc;
+    int retval;
 
     OBJ_RELEASE(event);
@@ -1339,6 +1342,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
         return;
     }
 
-    /* are there any existing peer instances will to accept this connection */
+    /* are there any existing peer instances willing to accept this connection */
     (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
 }
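
A note on the event-list cleanup in mca_btl_tcp_component_close()
above: the old loop cached `next` before releasing each item, but, as
the comment in the code says, the events unregister themselves from the
list inside their destructor, so a cached next pointer can dangle.
Draining with opal_list_remove_first() gives each iteration sole
ownership of one already-unlinked item. The toy sketch below isolates
that idiom; remove_first() is an illustrative stand-in for
opal_list_remove_first(), not the OMPI function.

    /* Drain-by-remove-first: safe even if releasing an item mutates
     * the rest of the list. */
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct node { struct node *next; int id; } node_t;

    static node_t *remove_first(node_t **head)
    {
        node_t *item = *head;
        if (NULL != item) *head = item->next;
        return item;
    }

    int main(void)
    {
        node_t *head = NULL, *item;

        /* Build a small list standing in for tcp_events. */
        for (int i = 0; i < 3; i++) {
            node_t *n = malloc(sizeof(*n));
            n->id = i;
            n->next = head;
            head = n;
        }

        /* Each pass owns exactly one unlinked item, so whatever a
         * per-item destructor does to the list cannot invalidate the
         * traversal. */
        while (NULL != (item = remove_first(&head))) {
            printf("releasing event %d\n", item->id);
            free(item);
        }
        return 0;
    }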
diff --git a/opal/mca/btl/tcp/btl_tcp_endpoint.c b/opal/mca/btl/tcp/btl_tcp_endpoint.c
index 65f4f6437f..935bf4b277 100644
--- a/opal/mca/btl/tcp/btl_tcp_endpoint.c
+++ b/opal/mca/btl/tcp/btl_tcp_endpoint.c
@@ -251,7 +251,7 @@ mca_btl_tcp_endpoint_dump(int level,
     if (used >= DEBUG_LENGTH) goto out;
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
-                     btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
+                     (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
                      btl_endpoint->endpoint_cache_length);
     if (used >= DEBUG_LENGTH) goto out;
 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
@@ -510,6 +510,7 @@ void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
  */
 void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 {
+    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
     if(btl_endpoint->endpoint_sd < 0)
         return;
     btl_endpoint->endpoint_retries++;
@@ -517,14 +518,16 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
     opal_event_del(&btl_endpoint->endpoint_recv_event);
     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
     opal_event_del(&btl_endpoint->endpoint_send_event);
-    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
-    btl_endpoint->endpoint_sd = -1;
+
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     free( btl_endpoint->endpoint_cache );
     btl_endpoint->endpoint_cache = NULL;
     btl_endpoint->endpoint_cache_pos = NULL;
     btl_endpoint->endpoint_cache_length = 0;
 #endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
+
+    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
+    btl_endpoint->endpoint_sd = -1;
     /**
      * If we keep failing to connect to the peer let the caller know about
      * this situation by triggering all the pending fragments callback and
@@ -675,9 +678,9 @@ void mca_btl_tcp_set_socket_options(int sd)
 /*
  * Start a connection to the endpoint. This will likely not complete,
  * as the socket is set to non-blocking, so register for event
- * notification of connect completion. On connection we send
- * our globally unique process identifier to the endpoint and wait for
- * the endpoints response.
+ * notification of connect completion. On connection we send our
+ * globally unique process identifier to the endpoint and wait for
+ * the endpoint's response.
  */
 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 {
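
The reordering in mca_btl_tcp_endpoint_close() follows the same theme.
Since `endpoint_sd < 0` doubles as the "already closed" guard at the
top of the function, the descriptor should be invalidated only after
the rest of the endpoint state (events, receive cache) has been torn
down; otherwise a concurrent closer could take the early return while
the cache is still being freed. A minimal sketch of that ordering,
with illustrative stand-in types rather than the OMPI ones:

    #include <stdlib.h>
    #include <unistd.h>

    typedef struct {
        int    sd;           /* socket descriptor, -1 once closed */
        char  *cache;        /* receive cache */
        size_t cache_length;
    } endpoint_t;

    static void endpoint_close(endpoint_t *ep)
    {
        if (ep->sd < 0) return;   /* sd doubles as the "closed" flag */

        /* 1. Stop anything that could still fire on this endpoint
         *    (stands in for the opal_event_del() calls). */

        /* 2. Release the cache while sd is still marked open, so no
         *    observer sees a "closed" endpoint mid-teardown. */
        free(ep->cache);
        ep->cache = NULL;
        ep->cache_length = 0;

        /* 3. Invalidate the descriptor last. */
        close(ep->sd);
        ep->sd = -1;
    }

    int main(void)
    {
        endpoint_t ep = { .sd = dup(STDOUT_FILENO),
                          .cache = malloc(64), .cache_length = 64 };
        endpoint_close(&ep);
        return 0;
    }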