Solve another handshake problem, where one threads was calling del_event
while cleaning up after receiving a zero byte on the connect socket (localyy started connection), while another was trying to accept a new connection from the same peer. Create a zero-timed event and delocalize the accept into a timer_event. Add support for registering an error callback, that can be used when a connection is discovered as failed during the initialization process.
Этот коммит содержится в:
родитель
e20413c885
Коммит
f87a4b691b
@ -115,6 +115,7 @@ struct mca_btl_tcp_module_t {
|
|||||||
struct sockaddr_storage tcp_ifaddr; /**< BTL interface address */
|
struct sockaddr_storage tcp_ifaddr; /**< BTL interface address */
|
||||||
uint32_t tcp_ifmask; /**< BTL interface netmask */
|
uint32_t tcp_ifmask; /**< BTL interface netmask */
|
||||||
opal_list_t tcp_endpoints;
|
opal_list_t tcp_endpoints;
|
||||||
|
mca_btl_base_module_error_cb_fn_t tcp_error_cb; /**< Upper layer error callback */
|
||||||
#if MCA_BTL_TCP_STATISTICS
|
#if MCA_BTL_TCP_STATISTICS
|
||||||
size_t tcp_bytes_sent;
|
size_t tcp_bytes_sent;
|
||||||
size_t tcp_bytes_recv;
|
size_t tcp_bytes_recv;
|
||||||
|
@ -1136,9 +1136,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* are there any existing peer instances will to accept this connection */
|
/* are there any existing peer instances will to accept this connection */
|
||||||
if(mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd) == false) {
|
(void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
|
||||||
CLOSE_THE_SOCKET(sd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,6 +73,7 @@ static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
|
|||||||
endpoint->endpoint_proc = NULL;
|
endpoint->endpoint_proc = NULL;
|
||||||
endpoint->endpoint_addr = NULL;
|
endpoint->endpoint_addr = NULL;
|
||||||
endpoint->endpoint_sd = -1;
|
endpoint->endpoint_sd = -1;
|
||||||
|
endpoint->endpoint_sd_next = -1;
|
||||||
endpoint->endpoint_send_frag = 0;
|
endpoint->endpoint_send_frag = 0;
|
||||||
endpoint->endpoint_recv_frag = 0;
|
endpoint->endpoint_recv_frag = 0;
|
||||||
endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
|
endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
|
||||||
@ -94,8 +95,8 @@ static void mca_btl_tcp_endpoint_construct(mca_btl_tcp_endpoint_t* endpoint)
|
|||||||
*/
|
*/
|
||||||
static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
|
static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
|
||||||
{
|
{
|
||||||
mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
|
|
||||||
mca_btl_tcp_endpoint_close(endpoint);
|
mca_btl_tcp_endpoint_close(endpoint);
|
||||||
|
mca_btl_tcp_proc_remove(endpoint->endpoint_proc, endpoint);
|
||||||
OBJ_DESTRUCT(&endpoint->endpoint_frags);
|
OBJ_DESTRUCT(&endpoint->endpoint_frags);
|
||||||
OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
|
OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
|
||||||
OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
|
OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
|
||||||
@ -119,7 +120,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
|
|||||||
* Diagnostics: change this to "1" to enable the function
|
* Diagnostics: change this to "1" to enable the function
|
||||||
* mca_btl_tcp_endpoint_dump(), below
|
* mca_btl_tcp_endpoint_dump(), below
|
||||||
*/
|
*/
|
||||||
#define WANT_PEER_DUMP 0
|
#define WANT_PEER_DUMP 1
|
||||||
/*
|
/*
|
||||||
* diagnostics
|
* diagnostics
|
||||||
*/
|
*/
|
||||||
@ -134,7 +135,11 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
|
|||||||
* to the call.
|
* to the call.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint,
|
mca_btl_tcp_endpoint_dump(int level,
|
||||||
|
const char* fname,
|
||||||
|
int lineno,
|
||||||
|
const char* funcname,
|
||||||
|
mca_btl_base_endpoint_t* btl_endpoint,
|
||||||
bool full_info,
|
bool full_info,
|
||||||
const char* msg)
|
const char* msg)
|
||||||
{
|
{
|
||||||
@ -148,6 +153,7 @@ mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint,
|
|||||||
opal_socklen_t obtlen;
|
opal_socklen_t obtlen;
|
||||||
opal_socklen_t addrlen = sizeof(inaddr);
|
opal_socklen_t addrlen = sizeof(inaddr);
|
||||||
mca_btl_tcp_frag_t* item;
|
mca_btl_tcp_frag_t* item;
|
||||||
|
mca_btl_tcp_proc_t* this_proc = mca_btl_tcp_proc_local();
|
||||||
|
|
||||||
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
|
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
|
||||||
#if OPAL_ENABLE_IPV6
|
#if OPAL_ENABLE_IPV6
|
||||||
@ -252,14 +258,19 @@ mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint,
|
|||||||
OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
|
OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
|
||||||
used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], 1024 - used);
|
used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], 1024 - used);
|
||||||
}
|
}
|
||||||
BTL_VERBOSE(("%s", outmsg));
|
opal_output_verbose(level, opal_btl_base_framework.framework_output,
|
||||||
|
"[%s:%d:%s][%s -> %s] %s",
|
||||||
|
fname, lineno, funcname,
|
||||||
|
(NULL != this_proc ? OPAL_NAME_PRINT(mca_btl_tcp_proc_local()->proc_opal->proc_name) : "unknown"),
|
||||||
|
(NULL != btl_endpoint->endpoint_proc ? OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name) : "unknown remote"),
|
||||||
|
outmsg);
|
||||||
}
|
}
|
||||||
#endif /* WANT_PEER_DUMP */
|
#endif /* WANT_PEER_DUMP */
|
||||||
|
|
||||||
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
|
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
|
||||||
#define MCA_BTL_TCP_ENDPOINT_DUMP(ENDPOINT, INFO, MSG) mca_btl_tcp_endpoint_dump((ENDPOINT), (INFO), (MSG))
|
#define MCA_BTL_TCP_ENDPOINT_DUMP(LEVEL, ENDPOINT, INFO, MSG) mca_btl_tcp_endpoint_dump((LEVEL), __FILE__, __LINE__, __func__, (ENDPOINT), (INFO), (MSG))
|
||||||
#else
|
#else
|
||||||
#define MCA_BTL_TCP_ENDPOINT_DUMP(ENDPOINT, INFO, MSG)
|
#define MCA_BTL_TCP_ENDPOINT_DUMP(LEVEL, ENDPOINT, INFO, MSG)
|
||||||
#endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
|
#endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -328,16 +339,16 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp
|
|||||||
if( btl_ownership ) {
|
if( btl_ownership ) {
|
||||||
MCA_BTL_TCP_FRAG_RETURN(frag);
|
MCA_BTL_TCP_FRAG_RETURN(frag);
|
||||||
}
|
}
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "complete send fragment [endpoint_send]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(50, btl_endpoint, true, "complete send fragment [endpoint_send]");
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
btl_endpoint->endpoint_send_frag = frag;
|
btl_endpoint->endpoint_send_frag = frag;
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [endpoint_send]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_send]");
|
||||||
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
||||||
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
|
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "send fragment enqueued [endpoint_send]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "send fragment enqueued [endpoint_send]");
|
||||||
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
|
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
|
||||||
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
|
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
|
||||||
}
|
}
|
||||||
@ -362,8 +373,8 @@ mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
|
|||||||
int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
|
int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
|
||||||
if(retval < 0) {
|
if(retval < 0) {
|
||||||
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
||||||
BTL_ERROR(("send(%p, %lu) failed on socket %d: %s (%d)",
|
BTL_ERROR(("send(%d, %p, %lu/%lu) failed: %s (%d)",
|
||||||
data, size, btl_endpoint->endpoint_sd,
|
btl_endpoint->endpoint_sd, data, cnt, size,
|
||||||
strerror(opal_socket_errno), opal_socket_errno));
|
strerror(opal_socket_errno), opal_socket_errno));
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
@ -396,6 +407,68 @@ static int mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_en
|
|||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *mca_btl_tcp_endpoint_complete_accept(int fd, int flags, void *context)
|
||||||
|
{
|
||||||
|
mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t*)context;
|
||||||
|
mca_btl_tcp_proc_t* this_proc = mca_btl_tcp_proc_local();
|
||||||
|
struct timeval now = {0, 0};
|
||||||
|
int cmpval;
|
||||||
|
|
||||||
|
if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_recv_lock) ) {
|
||||||
|
opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if( OPAL_THREAD_TRYLOCK(&btl_endpoint->endpoint_send_lock) ) {
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
||||||
|
opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(NULL == btl_endpoint->endpoint_addr) {
|
||||||
|
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
|
||||||
|
btl_endpoint->endpoint_sd_next = -1;
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
||||||
|
if( NULL != btl_endpoint->endpoint_btl->tcp_error_cb ) {
|
||||||
|
btl_endpoint->endpoint_btl->tcp_error_cb(
|
||||||
|
&btl_endpoint->endpoint_btl->super, MCA_BTL_ERROR_FLAGS_NONFATAL,
|
||||||
|
btl_endpoint->endpoint_proc->proc_opal,
|
||||||
|
"The endpoint addr is set to NULL (unsettling)");
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
opal_event_del(&btl_endpoint->endpoint_accept_event);
|
||||||
|
|
||||||
|
cmpval = opal_compare_proc(btl_endpoint->endpoint_proc->proc_opal->proc_name,
|
||||||
|
this_proc->proc_opal->proc_name);
|
||||||
|
if((btl_endpoint->endpoint_sd < 0) ||
|
||||||
|
(btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
|
||||||
|
cmpval < 0)) {
|
||||||
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
|
btl_endpoint->endpoint_sd = btl_endpoint->endpoint_sd_next;
|
||||||
|
btl_endpoint->endpoint_sd_next = -1;
|
||||||
|
if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, " [endpoint_accept]");
|
||||||
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
||||||
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
|
goto unlock_and_return;
|
||||||
|
}
|
||||||
|
mca_btl_tcp_endpoint_event_init(btl_endpoint);
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [endpoint_accept]");
|
||||||
|
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
||||||
|
mca_btl_tcp_endpoint_connected(btl_endpoint);
|
||||||
|
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "accepted");
|
||||||
|
goto unlock_and_return;
|
||||||
|
}
|
||||||
|
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd_next); /* No further use of this socket. Close it */
|
||||||
|
btl_endpoint->endpoint_sd_next = -1;
|
||||||
|
unlock_and_return:
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
||||||
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check the state of this endpoint. If the incoming connection request matches
|
* Check the state of this endpoint. If the incoming connection request matches
|
||||||
* our endpoints address, check the state of our connection:
|
* our endpoints address, check the state of our connection:
|
||||||
@ -405,49 +478,17 @@ static int mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_en
|
|||||||
* otherwise, reject the connection and continue with the current connection
|
* otherwise, reject the connection and continue with the current connection
|
||||||
*/
|
*/
|
||||||
|
|
||||||
bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
|
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
|
||||||
struct sockaddr* addr, int sd)
|
struct sockaddr* addr, int sd)
|
||||||
{
|
{
|
||||||
mca_btl_tcp_proc_t* this_proc = mca_btl_tcp_proc_local();
|
struct timeval now = {0, 0};
|
||||||
mca_btl_tcp_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
|
|
||||||
int cmpval;
|
|
||||||
|
|
||||||
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
|
assert(btl_endpoint->endpoint_sd_next == -1);
|
||||||
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
|
btl_endpoint->endpoint_sd_next = sd;
|
||||||
|
|
||||||
if(NULL == btl_endpoint->endpoint_addr) {
|
opal_event_evtimer_set(opal_event_base, &btl_endpoint->endpoint_accept_event,
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
mca_btl_tcp_endpoint_complete_accept, btl_endpoint);
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
opal_event_add(&btl_endpoint->endpoint_accept_event, &now);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cmpval = opal_compare_proc(endpoint_proc->proc_opal->proc_name, this_proc->proc_opal->proc_name);
|
|
||||||
if((btl_endpoint->endpoint_sd < 0) ||
|
|
||||||
(btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
|
|
||||||
cmpval < 0)) {
|
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
|
||||||
btl_endpoint->endpoint_sd = sd;
|
|
||||||
if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) != OPAL_SUCCESS) {
|
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
mca_btl_tcp_endpoint_event_init(btl_endpoint);
|
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(recv) [endpoint_accept]");
|
|
||||||
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
|
||||||
mca_btl_tcp_endpoint_connected(btl_endpoint);
|
|
||||||
|
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "accepted");
|
|
||||||
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -461,9 +502,9 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
|
|||||||
if(btl_endpoint->endpoint_sd < 0)
|
if(btl_endpoint->endpoint_sd < 0)
|
||||||
return;
|
return;
|
||||||
btl_endpoint->endpoint_retries++;
|
btl_endpoint->endpoint_retries++;
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(recv) [close]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
|
||||||
opal_event_del(&btl_endpoint->endpoint_recv_event);
|
opal_event_del(&btl_endpoint->endpoint_recv_event);
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(send) [close]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
|
||||||
opal_event_del(&btl_endpoint->endpoint_send_event);
|
opal_event_del(&btl_endpoint->endpoint_send_event);
|
||||||
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
|
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
|
||||||
btl_endpoint->endpoint_sd = -1;
|
btl_endpoint->endpoint_sd = -1;
|
||||||
@ -504,6 +545,7 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
|
|||||||
/* setup socket options */
|
/* setup socket options */
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
|
||||||
btl_endpoint->endpoint_retries = 0;
|
btl_endpoint->endpoint_retries = 0;
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "READY [endpoint_connected]");
|
||||||
|
|
||||||
/* Create the send event in a persistent manner. */
|
/* Create the send event in a persistent manner. */
|
||||||
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
|
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
|
||||||
@ -516,15 +558,15 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
|
|||||||
if(NULL == btl_endpoint->endpoint_send_frag)
|
if(NULL == btl_endpoint->endpoint_send_frag)
|
||||||
btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
|
btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
|
||||||
opal_list_remove_first(&btl_endpoint->endpoint_frags);
|
opal_list_remove_first(&btl_endpoint->endpoint_frags);
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [endpoint_connected]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [endpoint_connected]");
|
||||||
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
|
* A blocking recv on a non-blocking socket. Used to receive the small
|
||||||
* information that identifies the endpoints endpoint.
|
* amount of connection information that identifies the remote endpoint (guid).
|
||||||
*/
|
*/
|
||||||
static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
|
static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
|
||||||
{
|
{
|
||||||
@ -542,8 +584,8 @@ static int mca_btl_tcp_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpo
|
|||||||
/* socket is non-blocking so handle errors */
|
/* socket is non-blocking so handle errors */
|
||||||
if(retval < 0) {
|
if(retval < 0) {
|
||||||
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
|
||||||
BTL_ERROR(("recv(%d) failed: %s (%d)",
|
BTL_ERROR(("recv(%d, %lu/%lu) failed: %s (%d)",
|
||||||
btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
|
btl_endpoint->endpoint_sd, cnt, size, strerror(opal_socket_errno), opal_socket_errno));
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
return -1;
|
return -1;
|
||||||
@ -685,15 +727,17 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
|
|||||||
/* send our globally unique process identifier to the endpoint */
|
/* send our globally unique process identifier to the endpoint */
|
||||||
if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
|
if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(recv) [start_connect]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(recv) [start_connect]");
|
||||||
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
/* We connected to the peer, but he close the socket before we got a chance to send our guid */
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "dropped connection [start_connect]");
|
||||||
} else {
|
} else {
|
||||||
/* non-blocking so wait for completion */
|
/* non-blocking so wait for completion */
|
||||||
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
|
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [start_connect]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "event_add(send) [start_connect]");
|
||||||
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -705,7 +749,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
|
|||||||
BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
|
BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_opal,
|
||||||
( "Unable to connect to the peer %s on port %d: %s\n",
|
( "Unable to connect to the peer %s on port %d: %s\n",
|
||||||
address,
|
address,
|
||||||
btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
|
btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
|
||||||
}
|
}
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
@ -753,9 +797,10 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
|
|||||||
if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
|
if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
|
||||||
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_add(recv) [complete_connect]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_add(recv) [complete_connect]");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, " [complete_connect]");
|
||||||
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
|
||||||
mca_btl_tcp_endpoint_close(btl_endpoint);
|
mca_btl_tcp_endpoint_close(btl_endpoint);
|
||||||
}
|
}
|
||||||
@ -805,7 +850,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
|
|||||||
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
|
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
|
||||||
mca_btl_tcp_endpoint_connected(btl_endpoint);
|
mca_btl_tcp_endpoint_connected(btl_endpoint);
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "connected");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, true, "connected");
|
||||||
}
|
}
|
||||||
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
|
||||||
return;
|
return;
|
||||||
@ -927,13 +972,13 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
|
|||||||
|
|
||||||
/* if nothing else to do unregister for send event notifications */
|
/* if nothing else to do unregister for send event notifications */
|
||||||
if(NULL == btl_endpoint->endpoint_send_frag) {
|
if(NULL == btl_endpoint->endpoint_send_frag) {
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(10, btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
|
||||||
opal_event_del(&btl_endpoint->endpoint_send_event);
|
opal_event_del(&btl_endpoint->endpoint_send_event);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
|
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
|
||||||
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
|
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
|
||||||
opal_event_del(&btl_endpoint->endpoint_send_event);
|
opal_event_del(&btl_endpoint->endpoint_send_event);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,7 @@ struct mca_btl_base_endpoint_t {
|
|||||||
struct mca_btl_tcp_proc_t* endpoint_proc; /**< proc structure corresponding to endpoint */
|
struct mca_btl_tcp_proc_t* endpoint_proc; /**< proc structure corresponding to endpoint */
|
||||||
struct mca_btl_tcp_addr_t* endpoint_addr; /**< address of endpoint */
|
struct mca_btl_tcp_addr_t* endpoint_addr; /**< address of endpoint */
|
||||||
int endpoint_sd; /**< socket connection to endpoint */
|
int endpoint_sd; /**< socket connection to endpoint */
|
||||||
|
int endpoint_sd_next; /**< deadlock avoidance: socket connection to endpoint to set once the endpoint_sd has been correctly closed */
|
||||||
#if MCA_BTL_TCP_ENDPOINT_CACHE
|
#if MCA_BTL_TCP_ENDPOINT_CACHE
|
||||||
char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall) */
|
char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall) */
|
||||||
char* endpoint_cache_pos; /**< current position in the cache */
|
char* endpoint_cache_pos; /**< current position in the cache */
|
||||||
@ -64,6 +65,7 @@ struct mca_btl_base_endpoint_t {
|
|||||||
opal_list_t endpoint_frags; /**< list of pending frags to send */
|
opal_list_t endpoint_frags; /**< list of pending frags to send */
|
||||||
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */
|
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */
|
||||||
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */
|
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */
|
||||||
|
opal_event_t endpoint_accept_event; /**< event for async processing of accept requests */
|
||||||
opal_event_t endpoint_send_event; /**< event for async processing of send frags */
|
opal_event_t endpoint_send_event; /**< event for async processing of send frags */
|
||||||
opal_event_t endpoint_recv_event; /**< event for async processing of recv frags */
|
opal_event_t endpoint_recv_event; /**< event for async processing of recv frags */
|
||||||
bool endpoint_nbo; /**< convert headers to network byte order? */
|
bool endpoint_nbo; /**< convert headers to network byte order? */
|
||||||
@ -76,7 +78,7 @@ OBJ_CLASS_DECLARATION(mca_btl_tcp_endpoint_t);
|
|||||||
void mca_btl_tcp_set_socket_options(int sd);
|
void mca_btl_tcp_set_socket_options(int sd);
|
||||||
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*);
|
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t*);
|
||||||
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*);
|
int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp_frag_t*);
|
||||||
bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int);
|
void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int);
|
||||||
void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*);
|
void mca_btl_tcp_endpoint_shutdown(mca_btl_base_endpoint_t*);
|
||||||
|
|
||||||
END_C_DECLS
|
END_C_DECLS
|
||||||
|
@ -738,7 +738,7 @@ mca_btl_tcp_proc_t* mca_btl_tcp_proc_lookup(const opal_process_name_t *name)
|
|||||||
* loop through all available BTLs for one matching the source address
|
* loop through all available BTLs for one matching the source address
|
||||||
* of the request.
|
* of the request.
|
||||||
*/
|
*/
|
||||||
bool mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr, int sd)
|
void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr, int sd)
|
||||||
{
|
{
|
||||||
size_t i;
|
size_t i;
|
||||||
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
|
||||||
@ -770,13 +770,13 @@ bool mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t* btl_proc, struct sockaddr* addr
|
|||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd)) {
|
(void)mca_btl_tcp_endpoint_accept(btl_endpoint, addr, sd);
|
||||||
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
||||||
return true;
|
return;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
|
||||||
return false;
|
/* No further use of this socket. Close it */
|
||||||
|
CLOSE_THE_SOCKET(sd);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -108,7 +108,7 @@ mca_btl_tcp_proc_t* mca_btl_tcp_proc_create(opal_proc_t* proc);
|
|||||||
mca_btl_tcp_proc_t* mca_btl_tcp_proc_lookup(const opal_process_name_t* name);
|
mca_btl_tcp_proc_t* mca_btl_tcp_proc_lookup(const opal_process_name_t* name);
|
||||||
int mca_btl_tcp_proc_insert(mca_btl_tcp_proc_t*, mca_btl_base_endpoint_t*);
|
int mca_btl_tcp_proc_insert(mca_btl_tcp_proc_t*, mca_btl_base_endpoint_t*);
|
||||||
int mca_btl_tcp_proc_remove(mca_btl_tcp_proc_t*, mca_btl_base_endpoint_t*);
|
int mca_btl_tcp_proc_remove(mca_btl_tcp_proc_t*, mca_btl_base_endpoint_t*);
|
||||||
bool mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t*, struct sockaddr*, int);
|
void mca_btl_tcp_proc_accept(mca_btl_tcp_proc_t*, struct sockaddr*, int);
|
||||||
bool mca_btl_tcp_proc_tosocks(mca_btl_tcp_addr_t*, struct sockaddr_storage*);
|
bool mca_btl_tcp_proc_tosocks(mca_btl_tcp_addr_t*, struct sockaddr_storage*);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user