
Correct the connection logic for TCP. Now we have not only a cleaner
connection, but a more thread-safe one. Thanks to Pierre for his
help on this.

This commit was SVN r17853.
This commit is contained in:
George Bosilca 2008-03-18 02:42:16 +00:00
parent 61290c0e51
commit 1d04ec4ded
2 changed files with 56 additions and 48 deletions
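
The heart of the change is the send-event lifecycle: while a non-blocking
connect() is in flight, the write event is registered without
OPAL_EV_PERSIST, so it fires exactly once when the connection completes;
only after the endpoint is marked CONNECTED is it recreated as a persistent
event for normal sends. A minimal sketch of that pattern, condensed from
the diff below (all identifiers are taken from the patched code):

    /* While connecting: a one-shot write event signals connect() completion. */
    opal_event_set( &btl_endpoint->endpoint_send_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_WRITE,                    /* no OPAL_EV_PERSIST yet */
                    mca_btl_tcp_endpoint_send_handler,
                    btl_endpoint );
    opal_event_add( &btl_endpoint->endpoint_send_event, 0 );

    /* Once CONNECTED: recreate the event persistently so it stays
     * armed for every subsequent send. */
    opal_event_set( &btl_endpoint->endpoint_send_event,
                    btl_endpoint->endpoint_sd,
                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
                    mca_btl_tcp_endpoint_send_handler,
                    btl_endpoint );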

View file

@@ -204,7 +204,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
  * Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
  */
-static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint, int sd)
+static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
 {
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
@@ -212,15 +212,21 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
     opal_event_set( &btl_endpoint->endpoint_recv_event,
                     btl_endpoint->endpoint_sd,
                     OPAL_EV_READ|OPAL_EV_PERSIST,
                     mca_btl_tcp_endpoint_recv_handler,
                     btl_endpoint );
+    /**
+     * The send event should be non-persistent until the endpoint is
+     * completely connected. This means that when the event is created it
+     * will be fired only once, and when the endpoint is marked as
+     * CONNECTED the event should be recreated with the correct flags.
+     */
     opal_event_set( &btl_endpoint->endpoint_send_event,
                     btl_endpoint->endpoint_sd,
-                    OPAL_EV_WRITE|OPAL_EV_PERSIST,
+                    OPAL_EV_WRITE,
                     mca_btl_tcp_endpoint_send_handler,
                     btl_endpoint);
 }
@@ -265,9 +271,6 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp
             opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
         }
         break;
-    case MCA_BTL_TCP_SHUTDOWN:
-        rc = OMPI_ERROR;
-        break;
     }
     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
     return rc;
@@ -358,7 +361,7 @@ bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
         return false;
     }
-    mca_btl_tcp_endpoint_event_init(btl_endpoint, sd);
+    mca_btl_tcp_endpoint_event_init(btl_endpoint);
     opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
     mca_btl_tcp_endpoint_connected(btl_endpoint);
 #if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP
@@ -381,36 +384,26 @@ bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
  */
 void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 {
-    if(btl_endpoint->endpoint_sd >= 0) {
-        opal_event_del(&btl_endpoint->endpoint_recv_event);
-        opal_event_del(&btl_endpoint->endpoint_send_event);
-        CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
-        btl_endpoint->endpoint_sd = -1;
-#if MCA_BTL_TCP_ENDPOINT_CACHE
-        free( btl_endpoint->endpoint_cache );
-        btl_endpoint->endpoint_cache = NULL;
-        btl_endpoint->endpoint_cache_pos = NULL;
-        btl_endpoint->endpoint_cache_length = 0;
-#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
-    }
+    if(btl_endpoint->endpoint_sd < 0)
+        return;
     btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
     btl_endpoint->endpoint_retries++;
+    opal_event_del(&btl_endpoint->endpoint_recv_event);
+    opal_event_del(&btl_endpoint->endpoint_send_event);
+    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
+    btl_endpoint->endpoint_sd = -1;
+#if MCA_BTL_TCP_ENDPOINT_CACHE
+    free( btl_endpoint->endpoint_cache );
+    btl_endpoint->endpoint_cache = NULL;
+    btl_endpoint->endpoint_cache_pos = NULL;
+    btl_endpoint->endpoint_cache_length = 0;
+#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
 }
 
-void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t* btl_endpoint)
-{
-    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
-    OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
-    mca_btl_tcp_endpoint_close(btl_endpoint);
-    btl_endpoint->endpoint_state = MCA_BTL_TCP_SHUTDOWN;
-    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
-    OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
-}
-
 /*
  * Setup endpoint state to reflect that connection has been established,
- * and start any pending sends.
+ * and start any pending sends. This function should be called with the
+ * send lock locked.
  */
 static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
@@ -418,6 +411,14 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
     /* setup socket options */
     btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
     btl_endpoint->endpoint_retries = 0;
+
+    /* Create the send event in a persistent manner. */
+    opal_event_set( &btl_endpoint->endpoint_send_event,
+                    btl_endpoint->endpoint_sd,
+                    OPAL_EV_WRITE | OPAL_EV_PERSIST,
+                    mca_btl_tcp_endpoint_send_handler,
+                    btl_endpoint);
+
     if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
         if(NULL == btl_endpoint->endpoint_send_frag)
             btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
@@ -447,8 +448,8 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo
         /* socket is non-blocking so handle errors */
         if(retval < 0) {
             if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
-                BTL_ERROR(("recv() failed: %s (%d)",
-                           strerror(opal_socket_errno), opal_socket_errno));
+                BTL_ERROR(("recv(%d) failed: %s (%d)",
+                           btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
                 mca_btl_tcp_endpoint_close(btl_endpoint);
                 return -1;
             }
@@ -482,9 +483,6 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
         return OMPI_ERR_UNREACH;
     }
-#if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP
-    mca_btl_tcp_endpoint_dump(btl_endpoint, "connected");
-#endif
     return OMPI_SUCCESS;
 }
@@ -549,7 +547,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
     mca_btl_tcp_set_socket_options(btl_endpoint->endpoint_sd);
 
     /* setup event callbacks */
-    mca_btl_tcp_endpoint_event_init(btl_endpoint, btl_endpoint->endpoint_sd);
+    mca_btl_tcp_endpoint_event_init(btl_endpoint);
 
     /* setup the socket as non-blocking */
     if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
@@ -642,11 +640,13 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
 {
     mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
+    if( sd != btl_endpoint->endpoint_sd )
+        return;
     OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
     switch(btl_endpoint->endpoint_state) {
     case MCA_BTL_TCP_CONNECT_ACK:
         {
-            int rc;
+            int rc = OMPI_ERROR;
             rc = mca_btl_tcp_endpoint_recv_connect_ack(btl_endpoint);
             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
             if( OMPI_SUCCESS == rc ) {
@@ -654,6 +654,9 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
                 OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
                 mca_btl_tcp_endpoint_connected(btl_endpoint);
                 OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
+#if OMPI_ENABLE_DEBUG && WANT_PEER_DUMP
+                mca_btl_tcp_endpoint_dump(btl_endpoint, "connected");
+#endif
             }
             break;
         }
@@ -683,7 +686,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
         data_still_pending_on_endpoint:
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
             /* check for completion of non-blocking recv on the current fragment */
-            if(mca_btl_tcp_frag_recv(frag, sd) == false) {
+            if(mca_btl_tcp_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
                 btl_endpoint->endpoint_recv_frag = frag;
             } else {
                 btl_endpoint->endpoint_recv_frag = NULL;
@@ -709,7 +712,13 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
             OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
             break;
         }
-    case MCA_BTL_TCP_SHUTDOWN:
+    case MCA_BTL_TCP_CLOSED:
+        /* This is a thread-safety issue. As multiple threads are allowed
+         * to generate events (in libevent), we end up with several
+         * threads executing the receive callback when we reach the end
+         * of MPI_Finalize. The first one will close the connection,
+         * and all others will complain.
+         */
         OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
         break;
     default:

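Taken together, the descriptor check at the top of
mca_btl_tcp_endpoint_recv_handler and the new MCA_BTL_TCP_CLOSED case make
the receive path safe against the MPI_Finalize race described in the
comment above: several threads may be woken for the same endpoint, the
first closes it, and the rest must quietly back out. A condensed sketch of
that guard, using the identifiers from the diff above (the normal receive
processing is elided):

    static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
    {
        mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
        /* A stale event can fire after the socket has been closed (and the
         * descriptor possibly reused); ignore it if it no longer matches. */
        if( sd != btl_endpoint->endpoint_sd )
            return;
        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
        if( MCA_BTL_TCP_CLOSED == btl_endpoint->endpoint_state ) {
            /* Another thread already closed this endpoint: back out. */
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            return;
        }
        /* ... normal receive processing, under the recv lock ... */
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
    }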
View file

@@ -40,8 +40,7 @@ typedef enum {
     MCA_BTL_TCP_CONNECT_ACK,
     MCA_BTL_TCP_CLOSED,
     MCA_BTL_TCP_FAILED,
-    MCA_BTL_TCP_CONNECTED,
-    MCA_BTL_TCP_SHUTDOWN
+    MCA_BTL_TCP_CONNECTED
 } mca_btl_tcp_state_t;
 
 /**