diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 2aa4e100cf..0cf22fc6eb 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -521,7 +521,7 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s connect-ack sent to socket %d", + "%s blocking send complete to socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); return ORTE_SUCCESS; diff --git a/orte/mca/oob/usock/oob_usock.c b/orte/mca/oob/usock/oob_usock.c index 60d09c565c..c3b4784297 100644 --- a/orte/mca/oob/usock/oob_usock.c +++ b/orte/mca/oob/usock/oob_usock.c @@ -341,153 +341,6 @@ static void send_nb(orte_rml_send_t *msg) ORTE_ACTIVATE_USOCK_POST_SEND(msg, process_send); } -/* - * Handle probe - */ -static void recv_probe(int sd, mca_oob_usock_hdr_t* hdr) -{ - unsigned char* ptr = (unsigned char*)hdr; - size_t cnt = 0; - - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s:usock:recv:probe called", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - - hdr->type = MCA_OOB_USOCK_PROBE; - hdr->dst = *ORTE_PROC_MY_NAME; - - while (cnt < sizeof(mca_oob_usock_hdr_t)) { - int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_usock_hdr_t)-cnt, 0); - if (retval < 0) { - if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s-%s mca_oob_usock_peer_recv_probe: send() failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(hdr->dst)), - strerror(opal_socket_errno), - opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return; - } - continue; - } - cnt += retval; - } - CLOSE_THE_SOCKET(sd); -} - -/* - * Complete the OOB-level handshake to establish a connection with - * another peer. Called when the remote peer replies with his process - * identifier. Used in both the threaded and event listen modes. - */ -static void recv_connect(int sd, mca_oob_usock_hdr_t* hdr) -{ - mca_oob_usock_peer_t* peer; - int flags; - int cmpval; - uint64_t *ui64; - - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s:usock:recv:connect called", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - - /* check for invalid name - if this is true, then we have an error - */ - cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_NAME_INVALID); - if (cmpval == OPAL_EQUAL) { - ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS); - return; - } - - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_usock_recv_connect: processing connection from %s for socket %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->dst), sd); - - /* lookup the corresponding process */ - peer = mca_oob_usock_peer_lookup(&hdr->dst); - if (NULL == peer) { - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_usock_recv_connect: connection from new peer %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&hdr->dst)); - peer = OBJ_NEW(mca_oob_usock_peer_t); - peer->name = hdr->dst; - peer->state = MCA_OOB_USOCK_ACCEPTING; - ui64 = (uint64_t*)(&peer->name); - if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), peer)) { - OBJ_RELEASE(peer); - return; - } - } else { - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_USOCK_CONNECTED == peer->state || - MCA_OOB_USOCK_CONNECTING == peer->state || - MCA_OOB_USOCK_CONNECT_ACK == peer->state) { - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s SIMUL CONNECTION WITH %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->dst)); - if (peer->recv_ev_active) { - opal_event_del(&peer->recv_event); - peer->recv_ev_active = false; - } - if (peer->send_ev_active) { - opal_event_del(&peer->send_event); - peer->send_ev_active = false; - } - if (0 < peer->sd) { - CLOSE_THE_SOCKET(peer->sd); - peer->sd = -1; - } - CLOSE_THE_SOCKET(sd); - cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_PROC_MY_NAME); - if (OPAL_VALUE1_GREATER == cmpval) { - /* force the other end to retry the connection */ - peer->state = MCA_OOB_USOCK_UNCONNECTED; - return; - } else { - /* retry the connection */ - peer->state = MCA_OOB_USOCK_CONNECTING; - ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect); - return; - } - } - } - - /* set socket up to be non-blocking */ - if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { - opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - } else { - flags |= O_NONBLOCK; - if (fcntl(sd, F_SETFL, flags) < 0) { - opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_SETFL) failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - } - } - - /* is the peer instance willing to accept this connection */ - peer->sd = sd; - if (mca_oob_usock_peer_accept(peer) == false) { - if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { - opal_output(0, "%s-%s mca_oob_usock_recv_connect: " - "rejected connection state %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), - peer->state); - } - CLOSE_THE_SOCKET(sd); - ui64 = (uint64_t*)(&peer->name); - opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), NULL); - OBJ_RELEASE(peer); - } -} - /* * Event callback when there is data available on the registered * socket to recv. This is called for the listen sockets to accept an @@ -499,54 +352,52 @@ static void recv_handler(int sd, short flags, void *cbdata) { mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata; mca_oob_usock_hdr_t hdr; - int rc; - size_t cnt; + mca_oob_usock_peer_t *peer; + uint64_t *ui64; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s:usock:recv:handler called", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* ensure all is zero'd */ - memset(&hdr, 0, sizeof(hdr)); - - /* recv the process identifier */ - cnt = 0; - while (cnt < sizeof(hdr)) { - rc = recv(sd, (char *)&hdr, sizeof(hdr), 0); - if (0 == rc) { - if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { - opal_output(0, "%s mca_oob_usock_recv_handler: peer closed connection", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - } - CLOSE_THE_SOCKET(sd); - goto cleanup; - } else if (rc < 0) { - if (opal_socket_errno != EINTR && - opal_socket_errno != EAGAIN && - opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s mca_oob_usock_recv_handler: recv() failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - goto cleanup; - } - continue; - } - cnt += rc; + /* get the handshake */ + if (ORTE_SUCCESS != mca_oob_usock_peer_recv_connect_ack(NULL, sd, &hdr)) { + goto cleanup; } - /* dispatch based on message type */ - switch (hdr.type) { - case MCA_OOB_USOCK_PROBE: - recv_probe(sd, &hdr); - break; - case MCA_OOB_USOCK_IDENT: - recv_connect(sd, &hdr); - break; - default: - opal_output(0, "%s mca_oob_usock_recv_handler: invalid message type: %d\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr.type); - CLOSE_THE_SOCKET(sd); - break; + /* finish processing ident */ + if (MCA_OOB_USOCK_IDENT == hdr.type) { + if (NULL == (peer = mca_oob_usock_peer_lookup(&hdr.origin))) { + /* should never happen */ + mca_oob_usock_peer_close(peer); + goto cleanup; + } + /* set socket up to be non-blocking */ + if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { + opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, flags) < 0) { + opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_SETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } + } + + /* is the peer instance willing to accept this connection */ + peer->sd = sd; + if (mca_oob_usock_peer_accept(peer) == false) { + if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { + opal_output(0, "%s-%s mca_oob_usock_recv_connect: " + "rejected connection state %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->state); + } + CLOSE_THE_SOCKET(sd); + ui64 = (uint64_t*)(&peer->name); + opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), NULL); + OBJ_RELEASE(peer); + } } cleanup: diff --git a/orte/mca/oob/usock/oob_usock_connection.c b/orte/mca/oob/usock/oob_usock_connection.c index d50b7e0c63..eb26f497b8 100644 --- a/orte/mca/oob/usock/oob_usock_connection.c +++ b/orte/mca/oob/usock/oob_usock_connection.c @@ -51,6 +51,7 @@ #include "opal_stdint.h" #include "opal/mca/backtrace/backtrace.h" #include "opal/mca/base/mca_base_var.h" +#include "opal/mca/sec/sec.h" #include "opal/util/output.h" #include "opal/util/net.h" #include "opal/util/error.h" @@ -73,9 +74,9 @@ static void usock_peer_event_init(mca_oob_usock_peer_t* peer); static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer); static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, - void* data, size_t size); + int sd, void* data, size_t size); static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, - void* data, size_t size); + int sd, void* data, size_t size); static void usock_peer_connected(mca_oob_usock_peer_t* peer); static int usock_peer_create_socket(mca_oob_usock_peer_t* peer) @@ -276,7 +277,11 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata) static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer) { + char *msg; mca_oob_usock_hdr_t hdr; + int rc; + size_t sdsize; + opal_sec_cred_t *cred; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); @@ -284,11 +289,34 @@ static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer) /* send a handshake that includes our process identifier * to ensure we are talking to another OMPI process */ - hdr.dst = *ORTE_PROC_MY_NAME; + hdr.origin = *ORTE_PROC_MY_NAME; + hdr.dst = peer->name; hdr.type = MCA_OOB_USOCK_IDENT; hdr.tag = 0; - hdr.nbytes = 0; - if (ORTE_SUCCESS != usock_peer_send_blocking(peer, &hdr, sizeof(hdr))) { + + /* get our security credential*/ + if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* set the number of bytes to be read beyond the header */ + hdr.nbytes = strlen(orte_version_string) + 1 + cred->size; + + /* create a space for our message */ + sdsize = (sizeof(hdr) + strlen(orte_version_string) + 1 + cred->size); + if (NULL == (msg = (char*)malloc(sdsize))) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + memset(msg, 0, sdsize); + + /* load the message */ + memcpy(msg, &hdr, sizeof(hdr)); + memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string)); + memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred->credential, cred->size); + + + if (ORTE_SUCCESS != usock_peer_send_blocking(peer, peer->sd, msg, sdsize)) { ORTE_ERROR_LOG(ORTE_ERR_UNREACH); return ORTE_ERR_UNREACH; } @@ -413,26 +441,25 @@ void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t *peer) * information that identifies the peers endpoint. */ static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, - void* data, size_t size) + int sd, void* data, size_t size) { unsigned char* ptr = (unsigned char*)data; size_t cnt = 0; int retval; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s sending connect-ack to %s", + "%s send blocking of %"PRIsize_t" bytes to socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + size, sd); while (cnt < size) { - retval = send(peer->sd, (char*)ptr+cnt, size-cnt, 0); + retval = send(sd, (char*)ptr+cnt, size-cnt, 0); if (retval < 0) { if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s usock_peer_send_blocking: send() to %s failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), - strerror(opal_socket_errno), - opal_socket_errno); + opal_output(0, "%s usock_peer_send_blocking: send() to socket %d failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd, + strerror(opal_socket_errno), + opal_socket_errno); peer->state = MCA_OOB_USOCK_FAILED; mca_oob_usock_peer_close(peer); return ORTE_ERR_UNREACH; @@ -443,9 +470,8 @@ static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, } opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s connect-ack sent to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + "%s blocking send complete to socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); return ORTE_SUCCESS; } @@ -455,51 +481,211 @@ static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, * connected socket and verify the expected response. If so, move the * socket to a connected state. */ -int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer) +int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* pr, int sd, + mca_oob_usock_hdr_t *dhdr) { + char *msg; + char *version; + int rc, cmpval; + opal_sec_cred_t creds; + mca_oob_usock_peer_t *peer; mca_oob_usock_hdr_t hdr; + uint64_t *ui64; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s RECV CONNECT ACK FROM %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); + (NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd); + peer = pr; /* ensure all is zero'd */ - memset(&hdr, 0, sizeof(hdr)); + memset(&hdr, 0, sizeof(mca_oob_usock_hdr_t)); - if (usock_peer_recv_blocking(peer, &hdr, sizeof(hdr))) { - /* If the peer state is CONNECT_ACK, then we were waiting for - * the connection to be ack'd - */ - if (peer->state != MCA_OOB_USOCK_CONNECT_ACK) { - /* handshake broke down - abort this connection */ - opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); - mca_oob_usock_peer_close(peer); - return ORTE_ERR_UNREACH; + if (usock_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t))) { + if (NULL != peer) { + /* If the peer state is CONNECT_ACK, then we were waiting for + * the connection to be ack'd + */ + if (peer->state != MCA_OOB_USOCK_CONNECT_ACK) { + /* handshake broke down - abort this connection */ + opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), sd); + mca_oob_usock_peer_close(peer); + return ORTE_ERR_UNREACH; + } } } else { /* unable to complete the recv */ opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); return ORTE_ERR_UNREACH; } + /* if the requestor wanted the header returned, then do so now */ + if (NULL != dhdr) { + *dhdr = hdr; + } + + if (MCA_OOB_USOCK_PROBE == hdr.type) { + /* send a header back */ + hdr.type = MCA_OOB_USOCK_PROBE; + hdr.dst = hdr.origin; + hdr.origin = *ORTE_PROC_MY_NAME; + usock_peer_send_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t)); + CLOSE_THE_SOCKET(sd); + return ORTE_SUCCESS; + } if (hdr.type != MCA_OOB_USOCK_IDENT) { opal_output(0, "usock_peer_recv_connect_ack: invalid header type: %d\n", hdr.type); - peer->state = MCA_OOB_USOCK_FAILED; - mca_oob_usock_peer_close(peer); + if (NULL != peer) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + } else { + CLOSE_THE_SOCKET(sd); + } return ORTE_ERR_UNREACH; } opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s connect-ack recvd from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name)); + + /* if we don't already have it, get the peer */ + if (NULL == peer) { + peer = mca_oob_usock_peer_lookup(&hdr.origin); + if (NULL == peer) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s mca_oob_usock_recv_connect: connection from new peer", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + peer = OBJ_NEW(mca_oob_usock_peer_t); + peer->name = hdr.origin; + peer->state = MCA_OOB_USOCK_ACCEPTING; + peer->sd = sd; + ui64 = (uint64_t*)(&peer->name); + if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), peer)) { + OBJ_RELEASE(peer); + CLOSE_THE_SOCKET(sd); + return ORTE_ERR_UNREACH; + } + } else { + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (MCA_OOB_USOCK_CONNECTED == peer->state || + MCA_OOB_USOCK_CONNECTING == peer->state || + MCA_OOB_USOCK_CONNECT_ACK == peer->state) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SIMUL CONNECTION WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&hdr.origin)); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(sd); + peer->retries = 0; + cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_USOCK_UNCONNECTED; + return ORTE_ERR_UNREACH; + } else { + /* retry the connection */ + peer->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect); + return ORTE_ERR_UNREACH; + } + } + } + } else { + /* compare the peers name to the expected value */ + if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) { + opal_output(0, "%s usock_peer_recv_connect_ack: " + "received unexpected process identifier %s from %s\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(hdr.origin)), + ORTE_NAME_PRINT(&(peer->name))); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return ORTE_ERR_UNREACH; + } + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack header from %s is okay", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name)); + /* get the authentication and version payload */ + if (NULL == (msg = (char*)malloc(hdr.nbytes))) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return ORTE_ERR_OUT_OF_RESOURCE; + } + if (!usock_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) { + /* unable to complete the recv */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s unable to complete recv of connect-ack from %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); + free(msg); + return ORTE_ERR_UNREACH; + } + + /* check that this is from a matching version */ + version = (char*)(msg); + if (0 != strcmp(version, orte_version_string)) { + opal_output(0, "%s usock_peer_recv_connect_ack: " + "received different version from %s: %s instead of %s\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + version, orte_version_string); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + free(msg); + return ORTE_ERR_UNREACH; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack version from %s matches ours", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* check security token */ + creds.credential = (char*)(msg + strlen(version) + 1); + creds.size = hdr.nbytes - strlen(version) - 1; + if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) { + ORTE_ERROR_LOG(rc); + } + free(msg); + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack %s authenticated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* if the requestor wanted the header returned, then they + * will complete their processing + */ + if (NULL != dhdr) { + return ORTE_SUCCESS; + } + /* set the peer into the component and OOB-level peer tables to indicate * that we know this peer and we will be handling him */ @@ -591,7 +777,7 @@ void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer) * information that identifies the peers endpoint. */ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, - void* data, size_t size) + int sd, void* data, size_t size) { unsigned char* ptr = (unsigned char*)data; size_t cnt = 0; @@ -599,10 +785,10 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s waiting for connect ack from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); while (cnt < size) { - int retval = recv(peer->sd, (char *)ptr+cnt, size-cnt, 0); + int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0); /* remote closed connection */ if (retval == 0) { @@ -610,8 +796,8 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, "%s-%s usock_peer_recv_blocking: " "peer closed connection: peer state %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), - peer->state); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), + (NULL == peer) ? 0 : peer->state); mca_oob_usock_peer_close(peer); return false; } @@ -640,18 +826,22 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, "%s connect ack received error %s from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); return false; } else { opal_output(0, "%s usock_peer_recv_blocking: " "recv() failed for %s: %s (%d)\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), strerror(opal_socket_errno), opal_socket_errno); - peer->state = MCA_OOB_USOCK_FAILED; - mca_oob_usock_peer_close(peer); + if (NULL != peer) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + } else { + CLOSE_THE_SOCKET(sd); + } return false; } } @@ -663,7 +853,7 @@ static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s connect ack received from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); return true; } diff --git a/orte/mca/oob/usock/oob_usock_connection.h b/orte/mca/oob/usock/oob_usock_connection.h index 56a0697229..2a009d6183 100644 --- a/orte/mca/oob/usock/oob_usock_connection.h +++ b/orte/mca/oob/usock/oob_usock_connection.h @@ -95,7 +95,8 @@ ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_try_connect(int fd, short args, voi ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg); ORTE_MODULE_DECLSPEC bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer); ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t* peer); -ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer); +ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer, + int sd, mca_oob_usock_hdr_t *hdr); ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer); #endif /* _MCA_OOB_USOCK_CONNECTION_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_hdr.h b/orte/mca/oob/usock/oob_usock_hdr.h index 944798bc9f..3ee8396773 100644 --- a/orte/mca/oob/usock/oob_usock_hdr.h +++ b/orte/mca/oob/usock/oob_usock_hdr.h @@ -40,8 +40,10 @@ typedef enum { /* header for usock msgs */ typedef struct { + /* the original sender */ + orte_process_name_t origin; /* the intended final recipient */ - orte_process_name_t dst; + orte_process_name_t dst; /* type of message */ mca_oob_usock_msg_type_t type; /* the rml tag where this message is headed */ diff --git a/orte/mca/oob/usock/oob_usock_sendrecv.c b/orte/mca/oob/usock/oob_usock_sendrecv.c index 14e0d2e505..d187fa7c04 100644 --- a/orte/mca/oob/usock/oob_usock_sendrecv.c +++ b/orte/mca/oob/usock/oob_usock_sendrecv.c @@ -393,7 +393,7 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata) switch (peer->state) { case MCA_OOB_USOCK_CONNECT_ACK: - if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer))) { + if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer, peer->sd, NULL))) { opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s:usock:recv:handler starting send/recv events", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); @@ -499,7 +499,7 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata) opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s RECVD COMPLETE MESSAGE FROM %s OF %d BYTES FOR DEST %s TAG %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), + ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin), (int)peer->recv_msg->hdr.nbytes, ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst), peer->recv_msg->hdr.tag); @@ -510,18 +510,20 @@ void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata) opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s DELIVERING TO RML", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - ORTE_RML_POST_MESSAGE(&peer->name, peer->recv_msg->hdr.tag, + ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin, peer->recv_msg->hdr.tag, peer->recv_msg->data, peer->recv_msg->hdr.nbytes); OBJ_RELEASE(peer->recv_msg); } else { /* no - we don't route things, so we promote this * back to the OOB and let another transport move - * it along + * it along. If we are a daemon and it is intended + * for another of our local procs, it will just come + * back to us and be handled then */ snd = OBJ_NEW(orte_rml_send_t); snd->dst = peer->recv_msg->hdr.dst; - snd->origin = peer->name; + snd->origin = peer->recv_msg->hdr.origin; snd->tag = peer->recv_msg->hdr.tag; snd->data = peer->recv_msg->data; snd->count = peer->recv_msg->hdr.nbytes; diff --git a/orte/mca/oob/usock/oob_usock_sendrecv.h b/orte/mca/oob/usock/oob_usock_sendrecv.h index d1d78abd40..c704c4f89f 100644 --- a/orte/mca/oob/usock/oob_usock_sendrecv.h +++ b/orte/mca/oob/usock/oob_usock_sendrecv.h @@ -119,6 +119,7 @@ OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t); ORTE_NAME_PRINT(&((m)->dst))); \ msg = OBJ_NEW(mca_oob_usock_send_t); \ /* setup the header */ \ + msg->hdr.origin = (m)->origin; \ msg->hdr.dst = (m)->dst; \ msg->hdr.type = MCA_OOB_USOCK_USER; \ msg->hdr.tag = (m)->tag; \ @@ -159,6 +160,7 @@ OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t); ORTE_NAME_PRINT(&((m)->dst))); \ msg = OBJ_NEW(mca_oob_usock_send_t); \ /* setup the header */ \ + msg->hdr.origin = (m)->origin; \ msg->hdr.dst = (m)->dst; \ msg->hdr.type = MCA_OOB_USOCK_USER; \ msg->hdr.tag = (m)->tag; \