diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 4fb71a3ba9..5b5e8844e5 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -16,6 +16,7 @@ * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -78,6 +79,7 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer); static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer); +static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name); static int tcp_peer_send_blocking(int sd, void* data, size_t size); static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size); @@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) { char *msg; mca_oob_tcp_hdr_t hdr; + uint16_t ack_flag = htons(1); int rc; - size_t sdsize; + size_t sdsize, offset = 0; char *cred; size_t credsize; @@ -401,21 +404,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)credsize); - /* set the number of bytes to be read beyond the header */ - hdr.nbytes = strlen(orte_version_string) + 1 + credsize; + /* payload size */ + sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize; + hdr.nbytes = sdsize; MCA_OOB_TCP_HDR_HTON(&hdr); /* create a space for our message */ - sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize; + sdsize += sizeof(hdr); if (NULL == (msg = (char*)malloc(sdsize))) { return ORTE_ERR_OUT_OF_RESOURCE; } memset(msg, 0, sdsize); /* load the message */ - memcpy(msg, &hdr, sizeof(hdr)); - memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string)); - memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize); + memcpy(msg + offset, &hdr, sizeof(hdr)); + offset += sizeof(hdr); + memcpy(msg + offset, &ack_flag, sizeof(ack_flag)); + offset += sizeof(ack_flag); + memcpy(msg + offset, orte_version_string, strlen(orte_version_string)); + offset += strlen(orte_version_string)+1; + memcpy(msg + offset, cred, credsize); /* clear the memory */ if (NULL != cred) { free(cred); @@ -433,6 +441,58 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer) return ORTE_SUCCESS; } +/* send a handshake that includes our process identifier, our + * version string, and a security token to ensure we are talking + * to another OMPI process + */ +static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name) +{ + char *msg; + mca_oob_tcp_hdr_t hdr; + uint16_t ack_flag = htons(0); + int rc = ORTE_SUCCESS; + size_t sdsize, offset = 0; + + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* load the header */ + hdr.origin = *ORTE_PROC_MY_NAME; + hdr.dst = name; + hdr.type = MCA_OOB_TCP_IDENT; + hdr.tag = 0; + hdr.seq_num = 0; + memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1); + + /* payload size */ + sdsize = sizeof(ack_flag); + hdr.nbytes = sdsize; + MCA_OOB_TCP_HDR_HTON(&hdr); + + /* create a space for our message */ + sdsize += sizeof(hdr); + if (NULL == (msg = (char*)malloc(sdsize))) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + memset(msg, 0, sdsize); + + /* load the message */ + memcpy(msg + offset, &hdr, sizeof(hdr)); + offset += sizeof(hdr); + memcpy(msg + offset, &ack_flag, sizeof(ack_flag)); + offset += sizeof(ack_flag); + + /* send it */ + if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) { + /* it's ok if it fails - remote side may already + * identifiet the collision and closed the connection + */ + rc = ORTE_SUCCESS; + } + free(msg); + return rc; +} + /* * Initialize events to be used by the peer instance for TCP select/poll callbacks. */ @@ -636,6 +696,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal) return false; } else { /* The connection will be retried */ + tcp_peer_send_connect_nack(sd, peer->name); CLOSE_THE_SOCKET(sd); return true; } @@ -649,10 +710,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, char *version; int rc; char *cred; - size_t credsize; + size_t credsize, offset = 0; mca_oob_tcp_hdr_t hdr; mca_oob_tcp_peer_t *peer; uint64_t *ui64; + uint16_t ack_flag; + bool is_new = (NULL == pr); opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s RECV CONNECT ACK FROM %s ON SOCKET %d", @@ -681,19 +744,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (NULL != peer && - (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state || - MCA_OOB_TCP_CLOSED == peer->state)) { - retry(peer, sd, false); - } return ORTE_ERR_UNREACH; } @@ -748,23 +798,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, CLOSE_THE_SOCKET(sd); return ORTE_ERR_OUT_OF_RESOURCE; } - } else { - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state) { - if (retry(peer, sd, false)) { - return ORTE_ERR_UNREACH; - } - } } } else { - /* compare the peers name to the expected value */ if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " @@ -795,23 +830,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name), peer->sd); - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state) { - retry(peer, sd, true); - } free(msg); return ORTE_ERR_UNREACH; } + /* Check the type of acknowledgement */ + memcpy(&ack_flag, msg + offset, sizeof(ack_flag)); + offset += sizeof(ack_flag); + + ack_flag = ntohs(ack_flag); + if( !ack_flag ){ + if (MCA_OOB_TCP_CONNECT_ACK == peer->state) { + /* We got nack from the remote side which means that + * it will be the initiator of the connection. + */ + + /* release the socket */ + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + + /* unregister active events */ + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + + /* change the state so we'll accept the remote + * connection when it'll apeear + */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + } else { + /* FIXME: this shouldn't happen. We need to force next address + * to be tried. + */ + mca_oob_tcp_peer_close(peer); + } + return ORTE_ERR_UNREACH; + } + + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (is_new && + ( MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) { + if (retry(peer, sd, false)) { + return ORTE_ERR_UNREACH; + } + } + /* check that this is from a matching version */ - version = (char*)(msg); + version = (char*)((void*)msg + offset); + offset += strlen(version) + 1; if (0 != strcmp(version, orte_version_string)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " "received different version from %s: %s instead of %s\n", @@ -830,8 +908,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr, ORTE_NAME_PRINT(&peer->name)); /* check security token */ - cred = (char*)(msg + strlen(version) + 1); - credsize = hdr.nbytes - strlen(version) - 1; + cred = (char*)((void*)msg + offset); + credsize = hdr.nbytes - offset; if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) { char *hostname; hostname = orte_get_proc_hostname(&peer->name); @@ -911,8 +989,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer) */ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer) { - mca_oob_tcp_send_t *snd; - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp_peer_close for %s sd %d state %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -964,10 +1040,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer) * handle these recycled messages. This prevents us from unintentionally * attempting to send the message again across the now-failed interface */ + /* if (NULL != peer->send_msg) { } while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) { } + */ } /*