diff --git a/ompi/mca/btl/tcp/btl_tcp_endpoint.c b/ompi/mca/btl/tcp/btl_tcp_endpoint.c index 684da05bc1..8aae4992b6 100644 --- a/ompi/mca/btl/tcp/btl_tcp_endpoint.c +++ b/ompi/mca/btl/tcp/btl_tcp_endpoint.c @@ -204,7 +204,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con * Initialize events to be used by the endpoint instance for TCP select/poll callbacks. */ -static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint, int sd) +static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint) { #if MCA_BTL_TCP_ENDPOINT_CACHE btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache); @@ -212,15 +212,21 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_ #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ opal_event_set( &btl_endpoint->endpoint_recv_event, - btl_endpoint->endpoint_sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - mca_btl_tcp_endpoint_recv_handler, - btl_endpoint ); + btl_endpoint->endpoint_sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_btl_tcp_endpoint_recv_handler, + btl_endpoint ); + /** + * The send event should be non persistent until the endpoint is + * completely connected. This means, when the event is created it + * will be fired only once, and when the endpoint is marked as + * CONNECTED the event should be recreated with the correct flags. 
+ */ opal_event_set( &btl_endpoint->endpoint_send_event, - btl_endpoint->endpoint_sd, - OPAL_EV_WRITE|OPAL_EV_PERSIST, - mca_btl_tcp_endpoint_send_handler, - btl_endpoint); + btl_endpoint->endpoint_sd, + OPAL_EV_WRITE, + mca_btl_tcp_endpoint_send_handler, + btl_endpoint); } @@ -265,9 +271,6 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag); } break; - case MCA_BTL_TCP_SHUTDOWN: - rc = OMPI_ERROR; - break; } OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); return rc; @@ -358,7 +361,7 @@ bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); return false; } - mca_btl_tcp_endpoint_event_init(btl_endpoint, sd); + mca_btl_tcp_endpoint_event_init(btl_endpoint); opal_event_add(&btl_endpoint->endpoint_recv_event, 0); mca_btl_tcp_endpoint_connected(btl_endpoint); #if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP @@ -381,36 +384,26 @@ bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint, */ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint) { - if(btl_endpoint->endpoint_sd >= 0) { - opal_event_del(&btl_endpoint->endpoint_recv_event); - opal_event_del(&btl_endpoint->endpoint_send_event); - CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd); - btl_endpoint->endpoint_sd = -1; -#if MCA_BTL_TCP_ENDPOINT_CACHE - free( btl_endpoint->endpoint_cache ); - btl_endpoint->endpoint_cache = NULL; - btl_endpoint->endpoint_cache_pos = NULL; - btl_endpoint->endpoint_cache_length = 0; -#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ - } + if(btl_endpoint->endpoint_sd < 0) + return; btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED; btl_endpoint->endpoint_retries++; + opal_event_del(&btl_endpoint->endpoint_recv_event); + opal_event_del(&btl_endpoint->endpoint_send_event); + CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd); + btl_endpoint->endpoint_sd = -1; +#if MCA_BTL_TCP_ENDPOINT_CACHE + free( 
btl_endpoint->endpoint_cache ); + btl_endpoint->endpoint_cache = NULL; + btl_endpoint->endpoint_cache_pos = NULL; + btl_endpoint->endpoint_cache_length = 0; +#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ } -void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t* btl_endpoint) -{ - OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock); - OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); - mca_btl_tcp_endpoint_close(btl_endpoint); - btl_endpoint->endpoint_state = MCA_BTL_TCP_SHUTDOWN; - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); - OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); -} - - /* * Setup endpoint state to reflect that connection has been established, - * and start any pending sends. + * and start any pending sends. This function should be called with the + * send lock locked. */ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint) @@ -418,6 +411,14 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint /* setup socket options */ btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED; btl_endpoint->endpoint_retries = 0; + + /* Create the send event in a persistent manner. 
*/ + opal_event_set( &btl_endpoint->endpoint_send_event, + btl_endpoint->endpoint_sd, + OPAL_EV_WRITE | OPAL_EV_PERSIST, + mca_btl_tcp_endpoint_send_handler, + btl_endpoint); + if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) { if(NULL == btl_endpoint->endpoint_send_frag) btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*) @@ -447,8 +448,8 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo /* socket is non-blocking so handle errors */ if(retval < 0) { if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - BTL_ERROR(("recv() failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno)); + BTL_ERROR(("recv(%d) failed: %s (%d)", + btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno)); mca_btl_tcp_endpoint_close(btl_endpoint); return -1; } @@ -482,9 +483,6 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en return OMPI_ERR_UNREACH; } -#if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP - mca_btl_tcp_endpoint_dump(btl_endpoint, "connected"); -#endif return OMPI_SUCCESS; } @@ -549,7 +547,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd); /* setup event callbacks */ - mca_btl_tcp_endpoint_event_init(btl_endpoint, btl_endpoint->endpoint_sd); + mca_btl_tcp_endpoint_event_init(btl_endpoint); /* setup the socket as non-blocking */ if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) { @@ -642,11 +640,13 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user) { mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user; + if( sd != btl_endpoint->endpoint_sd ) + return; OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock); switch(btl_endpoint->endpoint_state) { case MCA_BTL_TCP_CONNECT_ACK: { - int rc; + int rc = OMPI_ERROR; rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint); 
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); if( OMPI_SUCCESS == rc ) { @@ -654,6 +654,9 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user) OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock); mca_btl_tcp_endpoint_connected(btl_endpoint); OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); +#if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP + mca_btl_tcp_endpoint_dump(btl_endpoint, "connected"); +#endif } break; } @@ -683,7 +686,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user) data_still_pending_on_endpoint: #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */ /* check for completion of non-blocking recv on the current fragment */ - if(mca_btl_tcp_frag_recv(frag, sd) == false) { + if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) { btl_endpoint->endpoint_recv_frag = frag; } else { btl_endpoint->endpoint_recv_frag = NULL; @@ -709,7 +712,13 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user) OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); break; } - case MCA_BTL_TCP_SHUTDOWN: + case MCA_BTL_TCP_CLOSED: + /* This is a thread-safety issue. As multiple threads are allowed + * to generate events (in the event library) we end up with several + * threads executing the receive callback, when we reach the end + * of MPI_Finalize. The first one will close the connections, + * and all others will complain. + */ OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock); break; default: diff --git a/ompi/mca/btl/tcp/btl_tcp_endpoint.h b/ompi/mca/btl/tcp/btl_tcp_endpoint.h index 44ae91b9a2..524935eb48 100644 --- a/ompi/mca/btl/tcp/btl_tcp_endpoint.h +++ b/ompi/mca/btl/tcp/btl_tcp_endpoint.h @@ -40,8 +40,7 @@ typedef enum { MCA_BTL_TCP_CONNECT_ACK, MCA_BTL_TCP_CLOSED, MCA_BTL_TCP_FAILED, - MCA_BTL_TCP_CONNECTED, - MCA_BTL_TCP_SHUTDOWN + MCA_BTL_TCP_CONNECTED } mca_btl_tcp_state_t; /**