1
1

orte/oob/tcp: Fix message dropping in case of concurrent connection.

The problem was observed for direct modex used with the recursive doubling
algorithm (used for collective ID calculation prior to d52a2d081e9598a9ac9a50fb4b013a6d2a72375b),
which is pairwise in nature, so simultaneous connections in opposite directions are highly likely.

The following scenario exposes the issue:
* ranks `x` and `y` want to communicate with each other, `x` < `y`;
* rank `x` initiates the connection and sends the ack;
* rank `y` starts to `connect()` and gets the ack from `x`;
* `y` sees that it has already started connecting and that `y` > `x`, so it rejects the incoming connection;
* `x` sees that its connection was rejected in `mca_oob_tcp_peer_recv_connect_ack()` when it tries to
read the message header using `tcp_peer_recv_blocking()`, which calls `mca_oob_tcp_peer_close()`;
that effectively flushes all the messages in peer->send_queue;
* `y` sends the ack to `x` and the connection is established; however, all the messages queued for the peer
at `x` have vanished (except the front one in peer->send_msg).

This commit introduces a "nack" function that is used on the `y` side to tell `x` that `y` has
priority and that `x`'s connection should be closed. This avoids having to "guess" why a connection
was unexpectedly closed.

Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
Artem Polyakov 2016-11-26 09:45:49 +07:00
родитель 7ce3ca25ef
Коммит ada93e0c02

Просмотреть файл

@ -16,6 +16,7 @@
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -78,6 +79,7 @@
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
void* data, size_t size);
@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
char *msg;
mca_oob_tcp_hdr_t hdr;
uint16_t ack_flag = htons(1);
int rc;
size_t sdsize;
size_t sdsize, offset = 0;
char *cred;
size_t credsize;
@ -401,21 +404,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)credsize);
/* set the number of bytes to be read beyond the header */
hdr.nbytes = strlen(orte_version_string) + 1 + credsize;
/* payload size */
sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize;
hdr.nbytes = sdsize;
MCA_OOB_TCP_HDR_HTON(&hdr);
/* create a space for our message */
sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize;
sdsize += sizeof(hdr);
if (NULL == (msg = (char*)malloc(sdsize))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(msg, 0, sdsize);
/* load the message */
memcpy(msg, &hdr, sizeof(hdr));
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize);
memcpy(msg + offset, &hdr, sizeof(hdr));
offset += sizeof(hdr);
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
offset += sizeof(ack_flag);
memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
offset += strlen(orte_version_string)+1;
memcpy(msg + offset, cred, credsize);
/* clear the memory */
if (NULL != cred) {
free(cred);
@ -433,6 +441,58 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
return ORTE_SUCCESS;
}
/*
 * Send a negative acknowledgement ("nack") on socket `sd`, addressed to
 * process `name`.
 *
 * The message consists of an MCA_OOB_TCP_IDENT header followed by a single
 * 16-bit flag set to htons(0). Unlike the connect ack, no version string
 * and no security credential are attached.
 *
 * Delivery is best effort: a failed send is deliberately ignored, because
 * the remote side may already have identified the collision and closed
 * the connection.
 *
 * Returns ORTE_ERR_OUT_OF_RESOURCE if the message buffer cannot be
 * allocated, ORTE_SUCCESS otherwise.
 */
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
{
    char *msg;
    mca_oob_tcp_hdr_t hdr;
    uint16_t ack_flag = htons(0);
    size_t sdsize;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* zero the whole header first so that no uninitialized bytes
     * (struct padding or fields not assigned below) are sent on the wire
     */
    memset(&hdr, 0, sizeof(hdr));

    /* load the header */
    hdr.origin = *ORTE_PROC_MY_NAME;
    hdr.dst = name;
    hdr.type = MCA_OOB_TCP_IDENT;
    hdr.tag = 0;
    hdr.seq_num = 0;

    /* payload size: only the ack flag follows the header */
    sdsize = sizeof(ack_flag);
    hdr.nbytes = sdsize;
    MCA_OOB_TCP_HDR_HTON(&hdr);

    /* create a space for our message: header + payload */
    sdsize += sizeof(hdr);
    if (NULL == (msg = (char*)malloc(sdsize))) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* load the message - the two copies below fill every byte of msg */
    memcpy(msg, &hdr, sizeof(hdr));
    memcpy(msg + sizeof(hdr), &ack_flag, sizeof(ack_flag));

    /* send it - it's ok if it fails: the remote side may have already
     * identified the collision and closed the connection
     */
    (void)tcp_peer_send_blocking(sd, msg, sdsize);

    free(msg);
    return ORTE_SUCCESS;
}
/*
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
*/
@ -636,6 +696,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
return false;
} else {
/* The connection will be retried */
tcp_peer_send_connect_nack(sd, peer->name);
CLOSE_THE_SOCKET(sd);
return true;
}
@ -649,10 +710,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
char *version;
int rc;
char *cred;
size_t credsize;
size_t credsize, offset = 0;
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_peer_t *peer;
uint64_t *ui64;
uint16_t ack_flag;
bool is_new = (NULL == pr);
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
@ -681,19 +744,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (NULL != peer &&
(MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
MCA_OOB_TCP_CLOSED == peer->state)) {
retry(peer, sd, false);
}
return ORTE_ERR_UNREACH;
}
@ -748,23 +798,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
CLOSE_THE_SOCKET(sd);
return ORTE_ERR_OUT_OF_RESOURCE;
}
} else {
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
if (retry(peer, sd, false)) {
return ORTE_ERR_UNREACH;
}
}
}
} else {
/* compare the peers name to the expected value */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
@ -795,23 +830,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
retry(peer, sd, true);
}
free(msg);
return ORTE_ERR_UNREACH;
}
/* Check the type of acknowledgement */
memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
offset += sizeof(ack_flag);
ack_flag = ntohs(ack_flag);
if( !ack_flag ){
if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
/* We got nack from the remote side which means that
* it will be the initiator of the connection.
*/
/* release the socket */
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
/* unregister active events */
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
/* change the state so we'll accept the remote
* connection when it'll apeear
*/
peer->state = MCA_OOB_TCP_UNCONNECTED;
} else {
/* FIXME: this shouldn't happen. We need to force next address
* to be tried.
*/
mca_oob_tcp_peer_close(peer);
}
return ORTE_ERR_UNREACH;
}
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (is_new &&
( MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
if (retry(peer, sd, false)) {
return ORTE_ERR_UNREACH;
}
}
/* check that this is from a matching version */
version = (char*)(msg);
version = (char*)((void*)msg + offset);
offset += strlen(version) + 1;
if (0 != strcmp(version, orte_version_string)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
"received different version from %s: %s instead of %s\n",
@ -830,8 +908,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
ORTE_NAME_PRINT(&peer->name));
/* check security token */
cred = (char*)(msg + strlen(version) + 1);
credsize = hdr.nbytes - strlen(version) - 1;
cred = (char*)((void*)msg + offset);
credsize = hdr.nbytes - offset;
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) {
char *hostname;
hostname = orte_get_proc_hostname(&peer->name);
@ -911,8 +989,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
{
mca_oob_tcp_send_t *snd;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp_peer_close for %s sd %d state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -964,10 +1040,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
* handle these recycled messages. This prevents us from unintentionally
* attempting to send the message again across the now-failed interface
*/
/*
if (NULL != peer->send_msg) {
}
while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
}
*/
}
/*