From 230336b6a8f2d87ea6e726d9f09c2a6c75cdb5f6 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 4 Feb 2014 14:47:04 +0000 Subject: [PATCH] Upgrade the security framework to avoid multiple hits against the global security server. Add support for future case where mpirun assigns a global security credential for a given run, though we need to work out how to handle connect-accept from other mpiruns in that case. Remove a bunch of duplicate code in the OOB by consolidating the connection handshake code. Refs trac:4221 This commit was SVN r30554. The following Trac tickets were found above: Ticket 4221 --> https://svn.open-mpi.org/trac/ompi/ticket/4221 --- opal/include/opal/constants.h | 4 +- opal/mca/db/db_types.h | 5 +- opal/mca/sec/basic/sec_basic.c | 91 ++++--- opal/mca/sec/keystone/sec_keystone.c | 20 +- opal/mca/sec/sec.h | 45 ++-- opal/runtime/opal_init.c | 3 + orte/mca/oob/tcp/oob_tcp.c | 282 ++++------------------ orte/mca/oob/tcp/oob_tcp_connection.c | 333 ++++++++++++++++++-------- orte/mca/oob/tcp/oob_tcp_connection.h | 4 +- orte/mca/oob/tcp/oob_tcp_sendrecv.c | 2 +- 10 files changed, 358 insertions(+), 431 deletions(-) diff --git a/opal/include/opal/constants.h b/opal/include/opal/constants.h index d71348a54c..2d8ec252fb 100644 --- a/opal/include/opal/constants.h +++ b/opal/include/opal/constants.h @@ -10,6 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2014 Intel, Inc. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -75,7 +76,8 @@ enum { OPAL_ERR_TAKE_NEXT_OPTION = (OPAL_ERR_BASE - 46), OPAL_ERR_PROC_ENTRY_NOT_FOUND = (OPAL_ERR_BASE - 47), OPAL_ERR_DATA_VALUE_NOT_FOUND = (OPAL_ERR_BASE - 48), - OPAL_ERR_CONNECTION_FAILED = (OPAL_ERR_BASE - 49) + OPAL_ERR_CONNECTION_FAILED = (OPAL_ERR_BASE - 49), + OPAL_ERR_AUTHENTICATION_FAILED = (OPAL_ERR_BASE - 50) }; #define OPAL_ERR_MAX (OPAL_ERR_BASE - 100) diff --git a/opal/mca/db/db_types.h b/opal/mca/db/db_types.h index 7ef8e81a6a..9b3acd4bcc 100644 --- a/opal/mca/db/db_types.h +++ b/opal/mca/db/db_types.h @@ -24,8 +24,9 @@ BEGIN_C_DECLS /* some OPAL-appropriate key definitions */ -#define OPAL_DB_LOCALITY "opal.locality" -#define OPAL_DB_CPUSET "opal.cpuset" +#define OPAL_DB_LOCALITY "opal.locality" +#define OPAL_DB_CPUSET "opal.cpuset" +#define OPAL_DB_CREDENTIAL "opal.cred" END_C_DECLS diff --git a/opal/mca/sec/basic/sec_basic.c b/opal/mca/sec/basic/sec_basic.c index 5d61cf3748..a380a7f41c 100644 --- a/opal/mca/sec/basic/sec_basic.c +++ b/opal/mca/sec/basic/sec_basic.c @@ -20,26 +20,27 @@ #include "opal/util/error.h" #include "opal/util/output.h" #include "opal/util/show_help.h" +#include "opal/mca/db/db.h" #include "opal/mca/sec/base/base.h" #include "sec_basic.h" static int init(void); static void finalize(void); -static int get_token(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size); -static int authenticate(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size); +static int get_my_cred(opal_identifier_t *my_id, + opal_sec_cred_t **cred); +static int authenticate(opal_sec_cred_t *cred); opal_sec_base_module_t opal_sec_basic_module = { init, finalize, - get_token, + get_my_cred, authenticate }; +static opal_sec_cred_t my_cred; +static bool initialized = false; + static int init(void) { return OPAL_SUCCESS; @@ -47,50 +48,42 @@ static int init(void) static void finalize(void) { -} - -static int get_token(const opal_identifier_t *proc, - 
opal_sec_cred_t token, - size_t size) -{ - uint32_t ui32; - - opal_output_verbose(2, opal_sec_base_framework.framework_output, - "creating sec token for %"PRIu64"", *proc); - - ui32 = htonl(12345); - memcpy(token, &ui32, 4); - - opal_output_verbose(2, opal_sec_base_framework.framework_output, - "proc %"PRIu64" was assigned token %u", - *proc, 12345); - return OPAL_SUCCESS; -} - -static int authenticate(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size) -{ - uint32_t ui32; - uint32_t chk; - - opal_output_verbose(2, opal_sec_base_framework.framework_output, - "authenticating %"PRIu64"", *proc); - - /* for now, just check the identifier against the proc id */ - memcpy(&ui32, token, 4); - - chk = ntohl(ui32); - - if (12345 != chk) { - opal_output_verbose(2, opal_sec_base_framework.framework_output, - "proc %"PRIu64" was not authenticated %u vs %u", - *proc, chk, 12345); - return OPAL_ERROR; + if (initialized) { + free(my_cred.credential); } +} + +static int get_my_cred(opal_identifier_t *my_id, + opal_sec_cred_t **cred) +{ + opal_byte_object_t *cd; + + if (!initialized) { + /* check first if a credential was stored for this job + * in the database + */ + if (OPAL_SUCCESS == opal_db.fetch(my_id, OPAL_DB_CREDENTIAL, + (void**)&cd, OPAL_BYTE_OBJECT)) { + my_cred.credential = (char*)cd->bytes; + my_cred.size = cd->size; + } else { + my_cred.credential = strdup("12345"); + my_cred.size = strlen(my_cred.credential)+1; // include the NULL + } + } + initialized = true; + + *cred = &my_cred; - opal_output_verbose(2, opal_sec_base_framework.framework_output, - "proc %"PRIu64" was authenticated", *proc); return OPAL_SUCCESS; } +static int authenticate(opal_sec_cred_t *cred) +{ + + if (0 == strncmp(cred->credential, "12345", strlen("12345"))) { + return OPAL_SUCCESS; + } + return OPAL_ERR_AUTHENTICATION_FAILED; +} + diff --git a/opal/mca/sec/keystone/sec_keystone.c b/opal/mca/sec/keystone/sec_keystone.c index 19545427e5..e9f3be3210 100644 --- 
a/opal/mca/sec/keystone/sec_keystone.c +++ b/opal/mca/sec/keystone/sec_keystone.c @@ -22,17 +22,14 @@ static int init(void); static void finalize(void); -static int get_token(const opal_identifier_t *proc, - opal_sec_cred_t *token, - size_t size); -static int authenticate(const opal_identifier_t *proc, - opal_sec_cred_t *token, - size_t size); +static int get_my_cred(opal_identifier_t *my_id, + opal_sec_cred_t **cred); +static int authenticate(opal_sec_cred_t *cred); opal_sec_base_module_t opal_sec_keystone_module = { init, finalize, - get_token, + get_my_cred, authenticate }; @@ -45,16 +42,13 @@ static void finalize(void) { } -static int get_token(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size) +static int get_my_cred(opal_identifier_t *my_id, + opal_sec_cred_t **cred) { return OPAL_ERR_NOT_IMPLEMENTED; } -static int authenticate(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size) +static int authenticate(opal_sec_cred_t *cred) { return OPAL_ERR_NOT_IMPLEMENTED; } diff --git a/opal/mca/sec/sec.h b/opal/mca/sec/sec.h index db3ae710a4..971e066645 100644 --- a/opal/mca/sec/sec.h +++ b/opal/mca/sec/sec.h @@ -28,12 +28,20 @@ * built should check to see if it can connect to its * respective server - if it can, then it should return * success to indicate it is ready to be used. + * + * For scalability, it is important that each process only + * contact the security server once, and only when requested + * to do so. Thus, the plugin should not get credentials for + * the process until the first call to "get_my_credential", + * and should then cache the results for future use. 
*/ BEGIN_C_DECLS -#define OPAL_SEC_CRED_MAX_SIZE 512 // max size of the OPAL security credential -typedef uint8_t* opal_sec_cred_t; +typedef struct { + char *credential; + size_t size; +} opal_sec_cred_t; /* * Initialize the module @@ -46,31 +54,32 @@ typedef int (*opal_sec_base_module_init_fn_t)(void); typedef void (*opal_sec_base_module_finalize_fn_t)(void); /* - * Get a security credential - given my process identifier, return - * a "token" that I can use for authenticating myself to another process. - * The value must be returned in the provided location, subject to - * the specified size constraint, in a network-byte-ordered form suitable + * Get a security credential for this process - return pointer to + * a "credential" that I can use for authenticating myself to another process. + * The value must be returned in a network-byte-ordered form suitable * for sending across the network. * - * Function returns OPAL_SUCCESS if a token was assigned, or an error + * It isn't expected that the identifier will be used to obtain a + * certificate as external security systems will have no idea what + * it means. However, some modules may use it, and there is no way + * for the opal layer to know a process identifier without being told, + * so provide it here + * + * Function returns OPAL_SUCCESS if a credential was assigned, or an error * code indicating why it failed */ -typedef int (*opal_sec_base_module_get_token_fn_t)(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size); +typedef int (*opal_sec_base_module_get_my_cred_fn_t)(opal_identifier_t *my_id, + opal_sec_cred_t **cred); /* - * Authenticate a security credential - given a process identifier and - * the security credential it provided, determine if the credential is - * valid. The credential is passed in a network-byte-ordered form as it - * came across the network. + * Authenticate a security credential - given a security credential, + * determine if the credential is valid. 
The credential is passed in + * a network-byte-ordered form as it came across the network. * * Function returns OPAL_SUCCESS if the token is authenticated, or an * error code indicating why it failed */ -typedef int (*opal_sec_base_module_auth_fn_t)(const opal_identifier_t *proc, - opal_sec_cred_t token, - size_t size); +typedef int (*opal_sec_base_module_auth_fn_t)(opal_sec_cred_t *cred); /* * the standard module data structure @@ -78,7 +87,7 @@ typedef int (*opal_sec_base_module_auth_fn_t)(const opal_identifier_t *proc, struct opal_sec_base_module_1_0_0_t { opal_sec_base_module_init_fn_t init; opal_sec_base_module_finalize_fn_t finalize; - opal_sec_base_module_get_token_fn_t get_token; + opal_sec_base_module_get_my_cred_fn_t get_my_credential; opal_sec_base_module_auth_fn_t authenticate; }; typedef struct opal_sec_base_module_1_0_0_t opal_sec_base_module_1_0_0_t; diff --git a/opal/runtime/opal_init.c b/opal/runtime/opal_init.c index e6d8a8de71..bd2c24ef93 100644 --- a/opal/runtime/opal_init.c +++ b/opal/runtime/opal_init.c @@ -229,6 +229,9 @@ opal_err2str(int errnum, const char **errmsg) case OPAL_ERR_CONNECTION_FAILED: retval = "Connection failed"; break; + case OPAL_ERR_AUTHENTICATION_FAILED: + retval = "Authentication failed"; + break; default: retval = NULL; } diff --git a/orte/mca/oob/tcp/oob_tcp.c b/orte/mca/oob/tcp/oob_tcp.c index 54460e5b73..dec5f65ff5 100644 --- a/orte/mca/oob/tcp/oob_tcp.c +++ b/orte/mca/oob/tcp/oob_tcp.c @@ -557,199 +557,6 @@ static void resend(struct mca_oob_tcp_msg_error_t *mp) ORTE_ACTIVATE_TCP_POST_RESEND(mop, process_resend); } -/* - * Handle probe - */ -static void recv_probe(int sd, mca_oob_tcp_hdr_t* hdr) -{ - unsigned char* ptr = (unsigned char*)hdr; - size_t cnt = 0; - - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s:tcp:recv:probe called", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - - hdr->type = MCA_OOB_TCP_PROBE; - hdr->dst = hdr->origin; - hdr->origin = *ORTE_PROC_MY_NAME; - 
MCA_OOB_TCP_HDR_HTON(hdr); - - while (cnt < sizeof(mca_oob_tcp_hdr_t)) { - int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0); - if (retval < 0) { - if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s-%s mca_oob_tcp_peer_recv_probe: send() failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(hdr->dst)), - strerror(opal_socket_errno), - opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return; - } - continue; - } - cnt += retval; - } - CLOSE_THE_SOCKET(sd); -} - -/* - * Complete the OOB-level handshake to establish a connection with - * another peer. Called when the remote peer replies with his process - * identifier. Used in both the threaded and event listen modes. - */ -static void recv_connect(mca_oob_tcp_module_t *mod, - int sd, uint8_t *msg) -{ - mca_oob_tcp_peer_t* peer; - int flags, cmpval; - uint64_t *ui64; - mca_oob_tcp_hdr_t *hdr; - char *version; - int rc; - - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s:tcp:recv:connect called with msg size %lu", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (unsigned long)sizeof(msg)); - - /* check for invalid name - if this is true, then we have an error - */ - hdr = (mca_oob_tcp_hdr_t*)msg; // was already converted to host order - if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->origin, ORTE_NAME_INVALID)) { - ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS); - CLOSE_THE_SOCKET(sd); - return; - } - - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s connect-ack header from %s is okay", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin)); - - /* check that this is from a matching version */ - version = (char*)(msg + sizeof(mca_oob_tcp_hdr_t)); - if (0 != strcmp(version, orte_version_string)) { - opal_output(0, "%s recv_connect: " - "received different version from %s: %s instead of %s\n", - 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(hdr->origin)), - version, orte_version_string); - ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS); - CLOSE_THE_SOCKET(sd); - return; - } - - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s connect-ack version from %s matches ours", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin)); - - /* check security token */ - if (OPAL_SUCCESS != (rc = opal_sec.authenticate((opal_identifier_t*)(&hdr->origin), - (opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1), - OPAL_SEC_CRED_MAX_SIZE))) { - opal_output(0, "%s SECURITY CONNECTION ERROR FROM %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin)); - ORTE_ERROR_LOG(rc); - } - - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_tcp_recv_connect: processing connection from %s for socket %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin), sd); - - /* lookup the corresponding process */ - peer = mca_oob_tcp_peer_lookup(mod, &hdr->origin); - if (NULL == peer) { - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s mca_oob_tcp_recv_connect: connection from new peer", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - peer = OBJ_NEW(mca_oob_tcp_peer_t); - peer->mod = mod; - peer->name = hdr->origin; - peer->state = MCA_OOB_TCP_ACCEPTING; - ui64 = (uint64_t*)(&peer->name); - if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) { - OBJ_RELEASE(peer); - CLOSE_THE_SOCKET(sd); - return; - } - } else { - /* check for a race condition - if I was in the process of - * creating a connection to the peer, or have already established - * such a connection, then we need to reject this connection. 
We will - * let the higher ranked process retry - if I'm the lower ranked - * process, I'll simply defer until I receive the request - */ - if (MCA_OOB_TCP_CONNECTED == peer->state || - MCA_OOB_TCP_CONNECTING == peer->state || - MCA_OOB_TCP_CONNECT_ACK == peer->state) { - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s SIMUL CONNECTION WITH %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hdr->origin)); - if (peer->recv_ev_active) { - opal_event_del(&peer->recv_event); - peer->recv_ev_active = false; - } - if (peer->send_ev_active) { - opal_event_del(&peer->send_event); - peer->send_ev_active = false; - } - if (0 < peer->sd) { - CLOSE_THE_SOCKET(peer->sd); - peer->sd = -1; - } - CLOSE_THE_SOCKET(sd); - if (NULL != peer->active_addr) { - peer->active_addr->retries = 0; - } - cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->origin, ORTE_PROC_MY_NAME); - if (OPAL_VALUE1_GREATER == cmpval) { - /* force the other end to retry the connection */ - peer->state = MCA_OOB_TCP_UNCONNECTED; - return; - } else { - /* retry the connection */ - peer->state = MCA_OOB_TCP_CONNECTING; - ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect); - return; - } - } - } - - /* set socket up to be non-blocking */ - if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { - opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - } else { - flags |= O_NONBLOCK; - if (fcntl(sd, F_SETFL, flags) < 0) { - opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_SETFL) failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - } - } - - /* is the peer instance willing to accept this connection */ - peer->sd = sd; - if (mca_oob_tcp_peer_accept(mod, peer) == false) { - if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { - 
opal_output(0, "%s-%s mca_oob_tcp_recv_connect: " - "rejected connection from %s connection state %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), - ORTE_NAME_PRINT(&(hdr->origin)), - peer->state); - } - CLOSE_THE_SOCKET(sd); - ui64 = (uint64_t*)(&peer->name); - opal_hash_table_set_value_uint64(&mod->peers, (*ui64), NULL); - OBJ_RELEASE(peer); - } -} - /* * Event callback when there is data available on the registered * socket to recv. This is called for the listen sockets to accept an @@ -757,67 +564,58 @@ static void recv_connect(mca_oob_tcp_module_t *mod, * connection process, and for probes. Data on an established * connection is handled elsewhere. */ -static void recv_handler(int sd, short flags, void *cbdata) +static void recv_handler(int sd, short flg, void *cbdata) { mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata; - size_t cnt, rdsize; - uint8_t *msg; - mca_oob_tcp_hdr_t *hdr; - int rc; + int flags; + uint64_t *ui64; + mca_oob_tcp_hdr_t hdr; + mca_oob_tcp_peer_t *peer; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s:tcp:recv:handler called", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* malloc a fixed size */ - rdsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL - msg = (uint8_t*)malloc(rdsize); - - /* ensure all is zero'd */ - memset(msg, 0, rdsize); - /* get the handshake */ - cnt = 0; - while (cnt < rdsize) { - rc = recv(sd, (char*)(msg+cnt), rdsize-cnt, 0); - if (0 == rc) { + if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(op->mod, NULL, sd, &hdr)) { + goto cleanup; + } + + /* finish processing ident */ + if (MCA_OOB_TCP_IDENT == hdr.type) { + if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &hdr.origin))) { + /* should never happen */ + mca_oob_tcp_peer_close(op->mod, peer); + goto cleanup; + } + /* set socket up to be non-blocking */ + if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { + opal_output(0, 
"%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, flags) < 0) { + opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_SETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } + } + + /* is the peer instance willing to accept this connection */ + peer->sd = sd; + if (mca_oob_tcp_peer_accept(op->mod, peer) == false) { if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { - opal_output(0, "%s mca_oob_tcp_recv_handler: peer closed connection", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + opal_output(0, "%s-%s mca_oob_tcp_recv_connect: " + "rejected connection from %s connection state %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + ORTE_NAME_PRINT(&(hdr.origin)), + peer->state); } CLOSE_THE_SOCKET(sd); - goto cleanup; - } else if (rc < 0) { - if (opal_socket_errno != EINTR && - opal_socket_errno != EAGAIN && - opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s mca_oob_tcp_recv_handler: recv() failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - goto cleanup; - } - continue; + ui64 = (uint64_t*)(&peer->name); + opal_hash_table_set_value_uint64(&op->mod->peers, (*ui64), NULL); + OBJ_RELEASE(peer); } - cnt += rc; - } - /* check the header */ - hdr = (mca_oob_tcp_hdr_t*)msg; - MCA_OOB_TCP_HDR_NTOH(hdr); - - /* dispatch based on message type */ - switch (hdr->type) { - case MCA_OOB_TCP_PROBE: - recv_probe(sd, hdr); - break; - case MCA_OOB_TCP_IDENT: - recv_connect(op->mod, sd, msg); - break; - default: - opal_output(0, "%s recv_handler: invalid message type: %d from peer %s\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr->type, - ORTE_NAME_PRINT(&hdr->origin)); - CLOSE_THE_SOCKET(sd); - break; } cleanup: 
diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index 84b08e79ce..2aa4e100cf 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -73,19 +73,18 @@ #include "orte/mca/oob/tcp/oob_tcp_connection.h" static void tcp_peer_event_init(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer); + mca_oob_tcp_peer_t* peer); static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer); -static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer, - void* data, size_t size); + mca_oob_tcp_peer_t* peer); +static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, int sd, + void* data, size_t size); static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer, - void* data, size_t size); + mca_oob_tcp_peer_t* peer, int sd, + void* data, size_t size); static void tcp_peer_connected(mca_oob_tcp_peer_t* peer); static int tcp_peer_create_socket(mca_oob_tcp_module_t *md, - mca_oob_tcp_peer_t* peer) + mca_oob_tcp_peer_t* peer) { int flags; @@ -324,42 +323,53 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata) static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer) { - uint8_t *msg; - mca_oob_tcp_hdr_t *hdr; + char *msg; + mca_oob_tcp_hdr_t hdr; int rc; size_t sdsize; + opal_sec_cred_t *cred; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* malloc a fixed size */ - sdsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL - msg = (uint8_t*)malloc(sdsize); + /* load the header */ + hdr.origin = *ORTE_PROC_MY_NAME; + hdr.dst = peer->name; + hdr.type = MCA_OOB_TCP_IDENT; + hdr.tag = 0; + + /* get our security credential*/ + if (OPAL_SUCCESS != (rc = 
opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* set the number of bytes to be read beyond the header */ + hdr.nbytes = strlen(orte_version_string) + 1 + cred->size; + MCA_OOB_TCP_HDR_HTON(&hdr); + + /* create a space for our message */ + sdsize = (sizeof(hdr) + strlen(orte_version_string) + 1 + cred->size); + if (NULL == (msg = (char*)malloc(sdsize))) { + return ORTE_ERR_OUT_OF_RESOURCE; + } memset(msg, 0, sdsize); - /* load the header */ - hdr = (mca_oob_tcp_hdr_t*)msg; - hdr->origin = *ORTE_PROC_MY_NAME; - hdr->dst = peer->name; - hdr->type = MCA_OOB_TCP_IDENT; - hdr->tag = 0; - hdr->nbytes = 0; - MCA_OOB_TCP_HDR_HTON(hdr); + /* load the message */ + memcpy(msg, &hdr, sizeof(hdr)); + memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string)); + memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred->credential, cred->size); - /* load the version string */ - memcpy(msg+sizeof(mca_oob_tcp_hdr_t), orte_version_string, strlen(orte_version_string)); - - /* load our security credential, stepping over to leave the NULL at end of version string */ - if (OPAL_SUCCESS != (rc = opal_sec.get_token((opal_identifier_t*)ORTE_PROC_MY_NAME, - (opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1), - OPAL_SEC_CRED_MAX_SIZE))) { - ORTE_ERROR_LOG(rc); - } - - if (ORTE_SUCCESS != tcp_peer_send_blocking(mod, peer, msg, sdsize)) { + /* send it */ + if (ORTE_SUCCESS != tcp_peer_send_blocking(mod, peer->sd, msg, sdsize)) { ORTE_ERROR_LOG(ORTE_ERR_UNREACH); + free(msg); + peer->state = MCA_OOB_TCP_FAILED; + mca_oob_tcp_peer_close(mod, peer); return ORTE_ERR_UNREACH; } + free(msg); + return ORTE_SUCCESS; } @@ -484,29 +494,25 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod, * information that identifies the peers endpoint. 
*/ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer, - void* data, size_t size) + int sd, void* data, size_t size) { unsigned char* ptr = (unsigned char*)data; size_t cnt = 0; int retval; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s sending connect-ack to %s", + "%s send blocking of %"PRIsize_t" bytes to socket %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + size, sd); while (cnt < size) { - retval = send(peer->sd, (char*)ptr+cnt, size-cnt, 0); + retval = send(sd, (char*)ptr+cnt, size-cnt, 0); if (retval < 0) { if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - opal_output(0, "%s tcp_peer_send_blocking: send() to %s failed: %s (%d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), + opal_output(0, "%s tcp_peer_send_blocking: send() to socket %d failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd, strerror(opal_socket_errno), opal_socket_errno); - peer->state = MCA_OOB_TCP_FAILED; - mca_oob_tcp_peer_close(mod, peer); return ORTE_ERR_UNREACH; } continue; @@ -515,9 +521,8 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s connect-ack sent to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + "%s connect-ack sent to socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); return ORTE_SUCCESS; } @@ -528,73 +533,155 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, * socket to a connected state. 
*/ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer) + mca_oob_tcp_peer_t* pr, + int sd, mca_oob_tcp_hdr_t *dhdr) { - uint8_t *msg; - mca_oob_tcp_hdr_t *hdr; + char *msg; char *version; - int rc; - size_t rsize; + int rc, cmpval; + opal_sec_cred_t creds; + mca_oob_tcp_hdr_t hdr; + mca_oob_tcp_peer_t *peer; + uint64_t *ui64; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s RECV CONNECT ACK FROM %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); + (NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd); - /* malloc a fixed size */ - rsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL - msg = (uint8_t*)malloc(rsize); - - /* ensure all is zero'd */ - memset(msg, 0, rsize); - - if (tcp_peer_recv_blocking(mod, peer, msg, rsize)) { - /* If the peer state is CONNECT_ACK, then we were waiting for - * the connection to be ack'd - */ - if (peer->state != MCA_OOB_TCP_CONNECT_ACK) { - /* handshake broke down - abort this connection */ - opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); - mca_oob_tcp_peer_close(mod, peer); - return ORTE_ERR_UNREACH; + peer = pr; + /* get the header */ + if (tcp_peer_recv_blocking(mod, peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) { + if (NULL != peer) { + /* If the peer state is CONNECT_ACK, then we were waiting for + * the connection to be ack'd + */ + if (peer->state != MCA_OOB_TCP_CONNECT_ACK) { + /* handshake broke down - abort this connection */ + opal_output(0, "%s RECV CONNECT BAD HANDSHAKE (%d) FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), peer->state, + ORTE_NAME_PRINT(&(peer->name)), sd); + mca_oob_tcp_peer_close(mod, peer); + return ORTE_ERR_UNREACH; + } } } else { /* unable to complete the recv */ 
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s unable to complete recv of connect-ack from %s ON SOCKET %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name), peer->sd); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); return ORTE_ERR_UNREACH; } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s connect-ack recvd from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name)); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name)); - /* check the header */ - hdr = (mca_oob_tcp_hdr_t*)msg; - MCA_OOB_TCP_HDR_NTOH(hdr); - if (hdr->type != MCA_OOB_TCP_IDENT) { + /* convert the header */ + MCA_OOB_TCP_HDR_NTOH(&hdr); + /* if the requestor wanted the header returned, then do so now */ + if (NULL != dhdr) { + *dhdr = hdr; + } + + if (MCA_OOB_TCP_PROBE == hdr.type) { + /* send a header back */ + hdr.type = MCA_OOB_TCP_PROBE; + hdr.dst = hdr.origin; + hdr.origin = *ORTE_PROC_MY_NAME; + MCA_OOB_TCP_HDR_HTON(&hdr); + tcp_peer_send_blocking(mod, sd, &hdr, sizeof(mca_oob_tcp_hdr_t)); + CLOSE_THE_SOCKET(sd); + return ORTE_SUCCESS; + } + + if (hdr.type != MCA_OOB_TCP_IDENT) { opal_output(0, "tcp_peer_recv_connect_ack: invalid header type: %d\n", - hdr->type); - peer->state = MCA_OOB_TCP_FAILED; - mca_oob_tcp_peer_close(mod, peer); + hdr.type); + if (NULL != peer) { + peer->state = MCA_OOB_TCP_FAILED; + mca_oob_tcp_peer_close(mod, peer); + } else { + CLOSE_THE_SOCKET(sd); + } return ORTE_ERR_UNREACH; } - /* compare the peers name to the expected value */ - if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr->origin)) { - opal_output(0, "%s tcp_peer_recv_connect_ack: " - "received unexpected process identifier %s from %s\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(hdr->origin)), - ORTE_NAME_PRINT(&(peer->name))); - peer->state = MCA_OOB_TCP_FAILED; - mca_oob_tcp_peer_close(mod, peer); - return 
ORTE_ERR_UNREACH; + /* if we don't already have it, get the peer */ + if (NULL == peer) { + peer = mca_oob_tcp_peer_lookup(mod, &hdr.origin); + if (NULL == peer) { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s mca_oob_tcp_recv_connect: connection from new peer", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + peer = OBJ_NEW(mca_oob_tcp_peer_t); + peer->mod = mod; + peer->name = hdr.origin; + peer->state = MCA_OOB_TCP_ACCEPTING; + ui64 = (uint64_t*)(&peer->name); + if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) { + OBJ_RELEASE(peer); + CLOSE_THE_SOCKET(sd); + return ORTE_ERR_UNREACH; + } + } else { + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (MCA_OOB_TCP_CONNECTED == peer->state || + MCA_OOB_TCP_CONNECTING == peer->state || + MCA_OOB_TCP_CONNECT_ACK == peer->state) { + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SIMUL CONNECTION WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&hdr.origin)); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(sd); + if (NULL != peer->active_addr) { + peer->active_addr->retries = 0; + } + cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_TCP_UNCONNECTED; + return ORTE_ERR_UNREACH; + } else { + /* retry the connection */ 
+ peer->state = MCA_OOB_TCP_CONNECTING; + ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect); + return ORTE_ERR_UNREACH; + } + } + } + } else { + + /* compare the peers name to the expected value */ + if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) { + opal_output(0, "%s tcp_peer_recv_connect_ack: " + "received unexpected process identifier %s from %s\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(hdr.origin)), + ORTE_NAME_PRINT(&(peer->name))); + peer->state = MCA_OOB_TCP_FAILED; + mca_oob_tcp_peer_close(mod, peer); + return ORTE_ERR_UNREACH; + } } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, @@ -602,8 +689,24 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&peer->name)); + /* get the authentication and version payload */ + if (NULL == (msg = (char*)malloc(hdr.nbytes))) { + peer->state = MCA_OOB_TCP_FAILED; + mca_oob_tcp_peer_close(mod, peer); + return ORTE_ERR_OUT_OF_RESOURCE; + } + if (!tcp_peer_recv_blocking(mod, peer, sd, msg, hdr.nbytes)) { + /* unable to complete the recv */ + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s unable to complete recv of connect-ack from %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); + free(msg); + return ORTE_ERR_UNREACH; + } + /* check that this is from a matching version */ - version = (char*)(msg + sizeof(mca_oob_tcp_hdr_t)); + version = (char*)(msg); if (0 != strcmp(version, orte_version_string)) { opal_output(0, "%s tcp_peer_recv_connect_ack: " "received different version from %s: %s instead of %s\n", @@ -612,6 +715,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, version, orte_version_string); peer->state = MCA_OOB_TCP_FAILED; mca_oob_tcp_peer_close(mod, peer); + free(msg); return ORTE_ERR_UNREACH; } @@ -621,11 +725,24 @@ 
int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, ORTE_NAME_PRINT(&peer->name)); /* check security token */ - if (OPAL_SUCCESS != (rc = opal_sec.authenticate((opal_identifier_t*)(&hdr->origin), - (opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1), - OPAL_SEC_CRED_MAX_SIZE))) { + creds.credential = (char*)(msg + strlen(version) + 1); + creds.size = hdr.nbytes - strlen(version) - 1; + if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) { ORTE_ERROR_LOG(rc); } + free(msg); + + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack %s authenticated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* if the requestor wanted the header returned, then they + * will complete their processing + */ + if (NULL != dhdr) { + return ORTE_SUCCESS; + } /* set the peer into the component and OOB-level peer tables to indicate * that we know this peer and we will be handling him @@ -725,8 +842,8 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod, * information that identifies the peers endpoint. */ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, - mca_oob_tcp_peer_t* peer, - void* data, size_t size) + mca_oob_tcp_peer_t* peer, int sd, + void* data, size_t size) { unsigned char* ptr = (unsigned char*)data; size_t cnt = 0; @@ -734,10 +851,10 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s waiting for connect ack from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? 
"UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); while (cnt < size) { - int retval = recv(peer->sd, (char *)ptr+cnt, size-cnt, 0); + int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0); /* remote closed connection */ if (retval == 0) { @@ -745,9 +862,13 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, "%s-%s tcp_peer_recv_blocking: " "peer closed connection: peer state %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), - peer->state); - mca_oob_tcp_peer_close(mod, peer); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), + (NULL == peer) ? 0 : peer->state); + if (NULL != peer) { + mca_oob_tcp_peer_close(mod, peer); + } else { + CLOSE_THE_SOCKET(sd); + } return false; } @@ -775,18 +896,22 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, "%s connect ack received error %s from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); return false; } else { opal_output(0, "%s tcp_peer_recv_blocking: " "recv() failed for %s: %s (%d)\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name)), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), strerror(opal_socket_errno), opal_socket_errno); - peer->state = MCA_OOB_TCP_FAILED; - mca_oob_tcp_peer_close(mod, peer); + if (NULL != peer) { + peer->state = MCA_OOB_TCP_FAILED; + mca_oob_tcp_peer_close(mod, peer); + } else { + CLOSE_THE_SOCKET(sd); + } return false; } } @@ -798,7 +923,7 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod, opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s connect ack received from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&(peer->name))); + (NULL == peer) ? 
"UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); return true; } diff --git a/orte/mca/oob/tcp/oob_tcp_connection.h b/orte/mca/oob/tcp/oob_tcp_connection.h index 5cef3cc608..8d4fa8ffc4 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.h +++ b/orte/mca/oob/tcp/oob_tcp_connection.h @@ -99,7 +99,9 @@ ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const ORTE_MODULE_DECLSPEC bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer); ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer); -ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer); +ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, + mca_oob_tcp_peer_t* peer, + int sd, mca_oob_tcp_hdr_t *dhdr); ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t *peer); diff --git a/orte/mca/oob/tcp/oob_tcp_sendrecv.c b/orte/mca/oob/tcp/oob_tcp_sendrecv.c index 5de2e81ac9..6f77cc0970 100644 --- a/orte/mca/oob/tcp/oob_tcp_sendrecv.c +++ b/orte/mca/oob/tcp/oob_tcp_sendrecv.c @@ -414,7 +414,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) switch (peer->state) { case MCA_OOB_TCP_CONNECT_ACK: - if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer))) { + if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer, peer->sd, NULL))) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s:tcp:recv:handler starting send/recv events", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));