diff --git a/orte/mca/oob/tcp/oob_tcp.c b/orte/mca/oob/tcp/oob_tcp.c index 54af1955ee..bbb19cf606 100644 --- a/orte/mca/oob/tcp/oob_tcp.c +++ b/orte/mca/oob/tcp/oob_tcp.c @@ -13,6 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -616,10 +617,70 @@ static void recv_connect(mca_oob_tcp_module_t *mod, } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_tcp_recv_handler: processing connection from %s", + "%s mca_oob_tcp_recv_connect: processing connection from %s for socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin)); + ORTE_NAME_PRINT(&hdr->origin), sd); + /* lookup the corresponding process */ + peer = mca_oob_tcp_peer_lookup(mod, &hdr->origin); + if (NULL == peer) { + ui64 = (uint64_t*)(&peer->name); + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s mca_oob_tcp_recv_connect: connection from new peer", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + peer = OBJ_NEW(mca_oob_tcp_peer_t); + peer->mod = mod; + peer->name = hdr->origin; + peer->state = MCA_OOB_TCP_ACCEPTING; + ui64 = (uint64_t*)(&peer->name); + if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) { + OBJ_RELEASE(peer); + return; + } + } else { + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state) { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SIMUL CONNECTION WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&hdr->origin)); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(sd); + if (NULL != peer->active_addr) { + peer->active_addr->retries = 0; + } + cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->origin, ORTE_PROC_MY_NAME); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + return; + } else { + /* retry the connection */ + peer->state = MCA_OOB_TCP_CONNECTING; + ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect); + return; + } + } + } + /* set socket up to be non-blocking */ if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)", @@ -632,27 +693,11 @@ static void recv_connect(mca_oob_tcp_module_t *mod, } } - /* lookup the corresponding process */ - peer = mca_oob_tcp_peer_lookup(mod, &hdr->origin); - if (NULL == peer) { - ui64 = (uint64_t*)(&peer->name); - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_tcp_recv_handler: connection from new peer", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - peer = OBJ_NEW(mca_oob_tcp_peer_t); - peer->mod = mod; - peer->name = hdr->origin; - ui64 = (uint64_t*)(&peer->name); - if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) { - OBJ_RELEASE(peer); - return; - } - } /* is the peer instance willing to accept this connection */ peer->sd = sd; if (mca_oob_tcp_peer_accept(mod, peer) == false) { if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { - opal_output(0, "%s-%s mca_oob_tcp_recv_handler: " + opal_output(0, "%s-%s mca_oob_tcp_recv_connect: " "rejected connection from %s connection state %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), diff --git a/orte/mca/oob/tcp/oob_tcp.h b/orte/mca/oob/tcp/oob_tcp.h index 8c26e915b4..f6f4470e87 100644 --- a/orte/mca/oob/tcp/oob_tcp.h +++ b/orte/mca/oob/tcp/oob_tcp.h @@ -104,7 +104,8 @@ typedef enum { MCA_OOB_TCP_CONNECTING, MCA_OOB_TCP_CONNECT_ACK, MCA_OOB_TCP_CONNECTED, - MCA_OOB_TCP_FAILED + MCA_OOB_TCP_FAILED, + MCA_OOB_TCP_ACCEPTING } mca_oob_tcp_state_t; /* module-level shared functions */ diff --git a/orte/mca/oob/tcp/oob_tcp_component.c b/orte/mca/oob/tcp/oob_tcp_component.c index b55f020dba..ef7570c6d2 100644 --- a/orte/mca/oob/tcp/oob_tcp_component.c +++ b/orte/mca/oob/tcp/oob_tcp_component.c @@ -13,6 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -1129,6 +1130,9 @@ void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata) ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } + + /* ensure we mark that this peer isn't reachable by this module */ + opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx); /* do we have any other modules (i.e., NICs) we can try? */ for (k=0; k < mca_oob_tcp_component.modules.size; k++) { @@ -1187,6 +1191,9 @@ void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata) ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } + + /* ensure we mark that this peer isn't reachable by this module */ + opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx); /* do we have any other modules (i.e., NICs) we can try? */ for (k=0; k < mca_oob_tcp_component.modules.size; k++) { diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 86f8cb47a7..38871c371a 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -11,8 +11,9 @@ * All rights reserved. * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2009-2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -170,6 +171,12 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata) */ } + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_tcp_peer_try_connect: " + "attempting to connect to proc %s via interface %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), mod->if_name, peer->sd); + addrlen = sizeof(struct sockaddr_in); OPAL_LIST_FOREACH(addr, &peer->addrs, mca_oob_tcp_addr_t) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, @@ -298,7 +305,6 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod, hdr.dst = peer->name; hdr.type = MCA_OOB_TCP_IDENT; hdr.tag = 0; - hdr.nbytes = 0; MCA_OOB_TCP_HDR_HTON(&hdr); if (0 > tcp_peer_send_blocking(mod, peer, &hdr, sizeof(hdr))) { ORTE_ERROR_LOG(ORTE_ERR_UNREACH); @@ -353,8 +359,9 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod, opal_socklen_t so_length = sizeof(so_error); opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s:tcp:complete_connect called", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + "%s:tcp:complete_connect called for peer %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); /* check connect completion status */ if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { @@ -458,12 +465,21 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer) { mca_oob_tcp_hdr_t hdr; + + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s RECV CONNECT ACK FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); + if (tcp_peer_recv_blocking(mod, peer, &hdr, sizeof(hdr))) { /* If the peer state is CONNECT_ACK, then we were waiting for * the connection to be ack'd */ if (peer->state != MCA_OOB_TCP_CONNECT_ACK) { /* handshake broke down - abort this connection */ + opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); mca_oob_tcp_peer_close(mod, peer); return ORTE_ERR_UNREACH; } @@ -513,9 +529,9 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, static void tcp_peer_connected(mca_oob_tcp_peer_t* peer) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s-%s tcp_peer_connected", + "%s-%s tcp_peer_connected on socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + ORTE_NAME_PRINT(&(peer->name)), peer->sd); if (peer->timer_ev_active) { opal_event_del(&peer->timer_event); @@ -712,22 +728,16 @@ void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg) } /* - * Accept incoming connection - if not already connected. We compare the name of the - * peer to our own name using the compare_fields function as we want this to be - * a LITERAL comparison - i.e., there is no occasion when the peer's name should - * be a wildcard value. - * - * To avoid competing reciprocal connection attempts, we only accept connections from - * processes whose names are "greater" than our own. + * Accept incoming connection - if not already connected */ bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s tcp:peer_accept called for peer %s in state %s", + "%s tcp:peer_accept called for peer %s in state %s on socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name), - mca_oob_tcp_state_print(peer->state)); + mca_oob_tcp_state_print(peer->state), peer->sd); if (peer->state != MCA_OOB_TCP_CONNECTED) { @@ -753,6 +763,11 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer opal_event_add(&peer->recv_event, 0); peer->recv_ev_active = true; } + /* if a message is waiting to be sent, ensure the send event is active */ + if (NULL != peer->send_msg && !peer->send_ev_active) { + opal_event_add(&peer->send_event, 0); + peer->send_ev_active = true; + } if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { mca_oob_tcp_peer_dump(peer, "accepted"); } diff --git a/orte/mca/oob/tcp/oob_tcp_listener.c b/orte/mca/oob/tcp/oob_tcp_listener.c index f81478c695..59f0fe5584 100644 --- a/orte/mca/oob/tcp/oob_tcp_listener.c +++ b/orte/mca/oob/tcp/oob_tcp_listener.c @@ -13,6 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -768,7 +769,7 @@ static void connection_handler(int sd, short flags, void* cbdata) opal_net_get_hostname((struct sockaddr*) &new_connection->addr), opal_net_get_port((struct sockaddr*) &new_connection->addr)); - /* cycle across all interfaces untile we find the one that + /* cycle across all interfaces until we find the one that * "owns" this connection - i.e., it is handling the * incoming address space */ diff --git a/orte/mca/oob/tcp/oob_tcp_sendrecv.c b/orte/mca/oob/tcp/oob_tcp_sendrecv.c index eeb514dad3..19e149cc07 100644 --- a/orte/mca/oob/tcp/oob_tcp_sendrecv.c +++ b/orte/mca/oob/tcp/oob_tcp_sendrecv.c @@ -13,6 +13,7 @@ * All rights reserved. * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -202,19 +203,19 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata) if (NULL != msg->data || NULL == msg->msg) { /* the relay is complete - release the data */ opal_output_verbose(2, orte_oob_base_framework.framework_output, - "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES", + "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), - (int)ntohl(msg->hdr.nbytes)); + (int)ntohl(msg->hdr.nbytes), peer->sd); OBJ_RELEASE(msg); peer->send_msg = NULL; } else if (NULL != msg->msg->buffer) { /* we are done - notify the RML */ opal_output_verbose(2, orte_oob_base_framework.framework_output, - "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES", + "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), - (int)ntohl(msg->hdr.nbytes)); + (int)ntohl(msg->hdr.nbytes), peer->sd); msg->msg->status = ORTE_SUCCESS; ORTE_RML_SEND_COMPLETE(msg->msg); OBJ_RELEASE(msg); @@ -233,10 +234,10 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata) } else { /* this message is complete - notify the RML */ opal_output_verbose(2, orte_oob_base_framework.framework_output, - "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES", + "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), - (int)ntohl(msg->hdr.nbytes)); + (int)ntohl(msg->hdr.nbytes), peer->sd); msg->msg->status = ORTE_SUCCESS; ORTE_RML_SEND_COMPLETE(msg->msg); OBJ_RELEASE(msg); @@ -250,9 +251,9 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata) return; } else { // report the error - opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message", + opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + ORTE_NAME_PRINT(&(peer->name)), peer->sd); opal_event_del(&peer->send_event); msg->msg->status = rc; ORTE_RML_SEND_COMPLETE(msg->msg); @@ -281,10 +282,10 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata) } break; default: - opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d)", + opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d) on socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&(peer->name)), - peer->state); + peer->state, peer->sd); if (peer->send_ev_active) { opal_event_del(&peer->send_event); peer->send_ev_active = false; @@ -436,6 +437,9 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* allocate a new message and setup for recv */ if (NULL == peer->recv_msg) { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:tcp:recv:handler allocate new recv msg", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); peer->recv_msg = OBJ_NEW(mca_oob_tcp_recv_t); if (NULL == peer->recv_msg) { opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n", @@ -449,6 +453,9 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) } /* if the header hasn't been completely read, read it */ if (!peer->recv_msg->hdr_recvd) { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:tcp:recv:handler read hdr", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (ORTE_SUCCESS == (rc = read_bytes(peer))) { /* completed reading the header */ peer->recv_msg->hdr_recvd = true; @@ -462,6 +469,9 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag); peer->recv_msg->data = NULL; // make sure } else { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:tcp:recv:handler allocate data region", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* allocate the data region */ peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes); /* point to it */ @@ -475,6 +485,9 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) return; } else { /* close the connection */ + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:tcp:recv:handler error reading bytes - closing connection", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); mca_oob_tcp_peer_close(mod, peer); return; } diff --git a/orte/mca/oob/tcp/oob_tcp_sendrecv.h b/orte/mca/oob/tcp/oob_tcp_sendrecv.h index 6d728cebc1..298073f2c5 100644 --- a/orte/mca/oob/tcp/oob_tcp_sendrecv.h +++ b/orte/mca/oob/tcp/oob_tcp_sendrecv.h @@ -12,6 +12,7 @@ * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow