1
1

Merge pull request #2461 from artpol84/oob/msg_drop

orte/oob/tcp: Fix message dropping in case of concurrent connection.
Этот коммит содержится в:
Jeff Squyres 2016-11-29 11:23:15 -05:00 коммит произвёл GitHub
родитель a3782718e7 ada93e0c02
Коммит a6d390fe7b

Просмотреть файл

@ -16,6 +16,7 @@
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -78,6 +79,7 @@
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
void* data, size_t size);
@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
char *msg;
mca_oob_tcp_hdr_t hdr;
uint16_t ack_flag = htons(1);
int rc;
size_t sdsize;
size_t sdsize, offset = 0;
char *cred;
size_t credsize;
@ -401,21 +404,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)credsize);
/* set the number of bytes to be read beyond the header */
hdr.nbytes = strlen(orte_version_string) + 1 + credsize;
/* payload size */
sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize;
hdr.nbytes = sdsize;
MCA_OOB_TCP_HDR_HTON(&hdr);
/* create a space for our message */
sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize;
sdsize += sizeof(hdr);
if (NULL == (msg = (char*)malloc(sdsize))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(msg, 0, sdsize);
/* load the message */
memcpy(msg, &hdr, sizeof(hdr));
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize);
memcpy(msg + offset, &hdr, sizeof(hdr));
offset += sizeof(hdr);
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
offset += sizeof(ack_flag);
memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
offset += strlen(orte_version_string)+1;
memcpy(msg + offset, cred, credsize);
/* clear the memory */
if (NULL != cred) {
free(cred);
@ -433,6 +441,58 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
return ORTE_SUCCESS;
}
/* send a handshake that includes our process identifier, our
* version string, and a security token to ensure we are talking
* to another OMPI process
*/
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
{
char *msg;
mca_oob_tcp_hdr_t hdr;
uint16_t ack_flag = htons(0);
int rc = ORTE_SUCCESS;
size_t sdsize, offset = 0;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* load the header */
hdr.origin = *ORTE_PROC_MY_NAME;
hdr.dst = name;
hdr.type = MCA_OOB_TCP_IDENT;
hdr.tag = 0;
hdr.seq_num = 0;
memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
/* payload size */
sdsize = sizeof(ack_flag);
hdr.nbytes = sdsize;
MCA_OOB_TCP_HDR_HTON(&hdr);
/* create a space for our message */
sdsize += sizeof(hdr);
if (NULL == (msg = (char*)malloc(sdsize))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(msg, 0, sdsize);
/* load the message */
memcpy(msg + offset, &hdr, sizeof(hdr));
offset += sizeof(hdr);
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
offset += sizeof(ack_flag);
/* send it */
if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
/* it's ok if it fails - remote side may already
* identifiet the collision and closed the connection
*/
rc = ORTE_SUCCESS;
}
free(msg);
return rc;
}
/*
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
*/
@ -636,6 +696,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
return false;
} else {
/* The connection will be retried */
tcp_peer_send_connect_nack(sd, peer->name);
CLOSE_THE_SOCKET(sd);
return true;
}
@ -649,10 +710,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
char *version;
int rc;
char *cred;
size_t credsize;
size_t credsize, offset = 0;
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_peer_t *peer;
uint64_t *ui64;
uint16_t ack_flag;
bool is_new = (NULL == pr);
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
@ -681,19 +744,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (NULL != peer &&
(MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
MCA_OOB_TCP_CLOSED == peer->state)) {
retry(peer, sd, false);
}
return ORTE_ERR_UNREACH;
}
@ -748,23 +798,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
CLOSE_THE_SOCKET(sd);
return ORTE_ERR_OUT_OF_RESOURCE;
}
} else {
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
if (retry(peer, sd, false)) {
return ORTE_ERR_UNREACH;
}
}
}
} else {
/* compare the peers name to the expected value */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
@ -795,23 +830,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
retry(peer, sd, true);
}
free(msg);
return ORTE_ERR_UNREACH;
}
/* Check the type of acknowledgement */
memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
offset += sizeof(ack_flag);
ack_flag = ntohs(ack_flag);
if( !ack_flag ){
if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
/* We got nack from the remote side which means that
* it will be the initiator of the connection.
*/
/* release the socket */
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
/* unregister active events */
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
/* change the state so we'll accept the remote
* connection when it'll apeear
*/
peer->state = MCA_OOB_TCP_UNCONNECTED;
} else {
/* FIXME: this shouldn't happen. We need to force next address
* to be tried.
*/
mca_oob_tcp_peer_close(peer);
}
return ORTE_ERR_UNREACH;
}
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (is_new &&
( MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
if (retry(peer, sd, false)) {
return ORTE_ERR_UNREACH;
}
}
/* check that this is from a matching version */
version = (char*)(msg);
version = (char*)((void*)msg + offset);
offset += strlen(version) + 1;
if (0 != strcmp(version, orte_version_string)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
"received different version from %s: %s instead of %s\n",
@ -830,8 +908,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
ORTE_NAME_PRINT(&peer->name));
/* check security token */
cred = (char*)(msg + strlen(version) + 1);
credsize = hdr.nbytes - strlen(version) - 1;
cred = (char*)((void*)msg + offset);
credsize = hdr.nbytes - offset;
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) {
char *hostname;
hostname = orte_get_proc_hostname(&peer->name);
@ -911,8 +989,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
{
mca_oob_tcp_send_t *snd;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp_peer_close for %s sd %d state %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -964,10 +1040,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
* handle these recycled messages. This prevents us from unintentionally
* attempting to send the message again across the now-failed interface
*/
/*
if (NULL != peer->send_msg) {
}
while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
}
*/
}
/*