1
1

Upgrade the security framework to avoid multiple hits against the global security server. Add support for the future case where mpirun assigns a global security credential for a given run, though we need to work out how to handle connect-accept from other mpiruns in that case. Remove a bunch of duplicate code in the OOB by consolidating the connection handshake code.

Refs trac:4221

This commit was SVN r30554.

The following Trac tickets were found above:
  Ticket 4221 --> https://svn.open-mpi.org/trac/ompi/ticket/4221
Этот коммит содержится в:
Ralph Castain 2014-02-04 14:47:04 +00:00
родитель 3d8c06d1b4
Коммит 230336b6a8
10 изменённых файлов: 358 добавлений и 431 удалений

Просмотреть файл

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -75,7 +76,8 @@ enum {
OPAL_ERR_TAKE_NEXT_OPTION = (OPAL_ERR_BASE - 46),
OPAL_ERR_PROC_ENTRY_NOT_FOUND = (OPAL_ERR_BASE - 47),
OPAL_ERR_DATA_VALUE_NOT_FOUND = (OPAL_ERR_BASE - 48),
OPAL_ERR_CONNECTION_FAILED = (OPAL_ERR_BASE - 49)
OPAL_ERR_CONNECTION_FAILED = (OPAL_ERR_BASE - 49),
OPAL_ERR_AUTHENTICATION_FAILED = (OPAL_ERR_BASE - 50)
};
#define OPAL_ERR_MAX (OPAL_ERR_BASE - 100)

Просмотреть файл

@ -26,6 +26,7 @@ BEGIN_C_DECLS
/* some OPAL-appropriate key definitions */
#define OPAL_DB_LOCALITY "opal.locality"
#define OPAL_DB_CPUSET "opal.cpuset"
#define OPAL_DB_CREDENTIAL "opal.cred"
END_C_DECLS

Просмотреть файл

@ -20,26 +20,27 @@
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/mca/db/db.h"
#include "opal/mca/sec/base/base.h"
#include "sec_basic.h"
static int init(void);
static void finalize(void);
static int get_token(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size);
static int authenticate(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size);
static int get_my_cred(opal_identifier_t *my_id,
opal_sec_cred_t **cred);
static int authenticate(opal_sec_cred_t *cred);
opal_sec_base_module_t opal_sec_basic_module = {
init,
finalize,
get_token,
get_my_cred,
authenticate
};
static opal_sec_cred_t my_cred;
static bool initialized = false;
static int init(void)
{
return OPAL_SUCCESS;
@ -47,50 +48,42 @@ static int init(void)
static void finalize(void)
{
}
static int get_token(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size)
{
uint32_t ui32;
opal_output_verbose(2, opal_sec_base_framework.framework_output,
"creating sec token for %"PRIu64"", *proc);
ui32 = htonl(12345);
memcpy(token, &ui32, 4);
opal_output_verbose(2, opal_sec_base_framework.framework_output,
"proc %"PRIu64" was assigned token %u",
*proc, 12345);
return OPAL_SUCCESS;
}
static int authenticate(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size)
{
uint32_t ui32;
uint32_t chk;
opal_output_verbose(2, opal_sec_base_framework.framework_output,
"authenticating %"PRIu64"", *proc);
/* for now, just check the identifier against the proc id */
memcpy(&ui32, token, 4);
chk = ntohl(ui32);
if (12345 != chk) {
opal_output_verbose(2, opal_sec_base_framework.framework_output,
"proc %"PRIu64" was not authenticated %u vs %u",
*proc, chk, 12345);
return OPAL_ERROR;
if (initialized) {
free(my_cred.credential);
}
}
static int get_my_cred(opal_identifier_t *my_id,
opal_sec_cred_t **cred)
{
opal_byte_object_t *cd;
if (!initialized) {
/* check first if a credential was stored for this job
* in the database
*/
if (OPAL_SUCCESS == opal_db.fetch(my_id, OPAL_DB_CREDENTIAL,
(void**)&cd, OPAL_BYTE_OBJECT)) {
my_cred.credential = (char*)cd->bytes;
my_cred.size = cd->size;
} else {
my_cred.credential = strdup("12345");
my_cred.size = strlen(my_cred.credential)+1; // include the NULL
}
}
initialized = true;
*cred = &my_cred;
opal_output_verbose(2, opal_sec_base_framework.framework_output,
"proc %"PRIu64" was authenticated", *proc);
return OPAL_SUCCESS;
}
/*
 * Authenticate a credential presented by a peer.
 *
 * The basic module accepts any credential whose leading bytes match the
 * fixed string "12345"; bytes after that prefix are not examined.
 *
 * cred - credential to check. It may have been built from data received
 *        over the network, so the pointer and the declared length are
 *        validated before any byte is read.
 *
 * Returns OPAL_SUCCESS when the prefix matches, and
 * OPAL_ERR_AUTHENTICATION_FAILED otherwise (including a NULL credential
 * or one shorter than the expected prefix).
 */
static int authenticate(opal_sec_cred_t *cred)
{
    static const char expected[] = "12345";
    const size_t expected_len = sizeof(expected) - 1;

    /* never read past the number of bytes the caller says it holds */
    if (NULL == cred || NULL == cred->credential ||
        cred->size < expected_len) {
        return OPAL_ERR_AUTHENTICATION_FAILED;
    }

    if (0 == strncmp(cred->credential, expected, expected_len)) {
        return OPAL_SUCCESS;
    }
    return OPAL_ERR_AUTHENTICATION_FAILED;
}

Просмотреть файл

@ -22,17 +22,14 @@
static int init(void);
static void finalize(void);
static int get_token(const opal_identifier_t *proc,
opal_sec_cred_t *token,
size_t size);
static int authenticate(const opal_identifier_t *proc,
opal_sec_cred_t *token,
size_t size);
static int get_my_cred(opal_identifier_t *my_id,
opal_sec_cred_t **cred);
static int authenticate(opal_sec_cred_t *cred);
opal_sec_base_module_t opal_sec_keystone_module = {
init,
finalize,
get_token,
get_my_cred,
authenticate
};
@ -45,16 +42,13 @@ static void finalize(void)
{
}
static int get_token(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size)
/*
 * Keystone credential retrieval stub: nothing is fetched and *cred is
 * left untouched. Always returns OPAL_ERR_NOT_IMPLEMENTED.
 */
static int get_my_cred(opal_identifier_t *my_id,
                       opal_sec_cred_t **cred)
{
    (void)my_id;
    (void)cred;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
static int authenticate(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size)
/*
 * Keystone authentication stub: the credential is not inspected.
 * Always returns OPAL_ERR_NOT_IMPLEMENTED.
 */
static int authenticate(opal_sec_cred_t *cred)
{
    (void)cred;

    return OPAL_ERR_NOT_IMPLEMENTED;
}

Просмотреть файл

@ -28,12 +28,20 @@
* built should check to see if it can connect to its
* respective server - if it can, then it should return
* success to indicate it is ready to be used.
*
* For scalability, it is important that each process only
* contact the security server once, and only when requested
* to do so. Thus, the plugin should not get credentials for
* the process until the first call to "get_my_credential",
* and should then cache the results for future use.
*/
BEGIN_C_DECLS
#define OPAL_SEC_CRED_MAX_SIZE 512 // max size of the OPAL security credential
typedef uint8_t* opal_sec_cred_t;
/*
 * A security credential: a byte buffer together with its length.
 */
typedef struct {
char *credential;   /* pointer to the credential bytes */
size_t size;        /* number of bytes in the credential buffer */
} opal_sec_cred_t;
/*
* Initialize the module
@ -46,31 +54,32 @@ typedef int (*opal_sec_base_module_init_fn_t)(void);
typedef void (*opal_sec_base_module_finalize_fn_t)(void);
/*
* Get a security credential - given my process identifier, return
* a "token" that I can use for authenticating myself to another process.
* The value must be returned in the provided location, subject to
* the specified size constraint, in a network-byte-ordered form suitable
* Get a security credential for this process - return pointer to
* a "credential" that I can use for authenticating myself to another process.
* The value must be returned in a network-byte-ordered form suitable
* for sending across the network.
*
* Function returns OPAL_SUCCESS if a token was assigned, or an error
* It isn't expected that the identifier will be used to obtain a
* certificate as external security systems will have no idea what
* it means. However, some modules may use it, and there is no way
* for the opal layer to know a process identifier without being told,
* so provide it here
*
* Function returns OPAL_SUCCESS if a credential was assigned, or an error
* code indicating why it failed
*/
typedef int (*opal_sec_base_module_get_token_fn_t)(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size);
typedef int (*opal_sec_base_module_get_my_cred_fn_t)(opal_identifier_t *my_id,
opal_sec_cred_t **cred);
/*
* Authenticate a security credential - given a process identifier and
* the security credential it provided, determine if the credential is
* valid. The credential is passed in a network-byte-ordered form as it
* came across the network.
* Authenticate a security credential - given a security credential,
* determine if the credential is valid. The credential is passed in
* a network-byte-ordered form as it came across the network.
*
* Function returns OPAL_SUCCESS if the token is authenticated, or an
* error code indicating why it failed
*/
typedef int (*opal_sec_base_module_auth_fn_t)(const opal_identifier_t *proc,
opal_sec_cred_t token,
size_t size);
typedef int (*opal_sec_base_module_auth_fn_t)(opal_sec_cred_t *cred);
/*
* the standard module data structure
@ -78,7 +87,7 @@ typedef int (*opal_sec_base_module_auth_fn_t)(const opal_identifier_t *proc,
struct opal_sec_base_module_1_0_0_t {
opal_sec_base_module_init_fn_t init;
opal_sec_base_module_finalize_fn_t finalize;
opal_sec_base_module_get_token_fn_t get_token;
opal_sec_base_module_get_my_cred_fn_t get_my_credential;
opal_sec_base_module_auth_fn_t authenticate;
};
typedef struct opal_sec_base_module_1_0_0_t opal_sec_base_module_1_0_0_t;

Просмотреть файл

@ -229,6 +229,9 @@ opal_err2str(int errnum, const char **errmsg)
case OPAL_ERR_CONNECTION_FAILED:
retval = "Connection failed";
break;
case OPAL_ERR_AUTHENTICATION_FAILED:
retval = "Authentication failed";
break;
default:
retval = NULL;
}

Просмотреть файл

@ -558,168 +558,36 @@ static void resend(struct mca_oob_tcp_msg_error_t *mp)
}
/*
* Handle probe
* Event callback when there is data available on the registered
* socket to recv. This is called for the listen sockets to accept an
* incoming connection, on new sockets trying to complete the software
* connection process, and for probes. Data on an established
* connection is handled elsewhere.
*/
static void recv_probe(int sd, mca_oob_tcp_hdr_t* hdr)
static void recv_handler(int sd, short flg, void *cbdata)
{
unsigned char* ptr = (unsigned char*)hdr;
size_t cnt = 0;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s:tcp:recv:probe called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
hdr->type = MCA_OOB_TCP_PROBE;
hdr->dst = hdr->origin;
hdr->origin = *ORTE_PROC_MY_NAME;
MCA_OOB_TCP_HDR_HTON(hdr);
while (cnt < sizeof(mca_oob_tcp_hdr_t)) {
int retval = send(sd, (char *)ptr+cnt, sizeof(mca_oob_tcp_hdr_t)-cnt, 0);
if (retval < 0) {
if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "%s-%s mca_oob_tcp_peer_recv_probe: send() failed: %s (%d)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(hdr->dst)),
strerror(opal_socket_errno),
opal_socket_errno);
CLOSE_THE_SOCKET(sd);
return;
}
continue;
}
cnt += retval;
}
CLOSE_THE_SOCKET(sd);
}
/*
* Complete the OOB-level handshake to establish a connection with
* another peer. Called when the remote peer replies with his process
* identifier. Used in both the threaded and event listen modes.
*/
static void recv_connect(mca_oob_tcp_module_t *mod,
int sd, uint8_t *msg)
{
mca_oob_tcp_peer_t* peer;
int flags, cmpval;
mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
int flags;
uint64_t *ui64;
mca_oob_tcp_hdr_t *hdr;
char *version;
int rc;
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_peer_t *peer;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s:tcp:recv:connect called with msg size %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)sizeof(msg));
/* check for invalid name - if this is true, then we have an error
*/
hdr = (mca_oob_tcp_hdr_t*)msg; // was already converted to host order
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->origin, ORTE_NAME_INVALID)) {
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
CLOSE_THE_SOCKET(sd);
return;
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack header from %s is okay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin));
/* check that this is from a matching version */
version = (char*)(msg + sizeof(mca_oob_tcp_hdr_t));
if (0 != strcmp(version, orte_version_string)) {
opal_output(0, "%s recv_connect: "
"received different version from %s: %s instead of %s\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(hdr->origin)),
version, orte_version_string);
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
CLOSE_THE_SOCKET(sd);
return;
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack version from %s matches ours",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin));
/* check security token */
if (OPAL_SUCCESS != (rc = opal_sec.authenticate((opal_identifier_t*)(&hdr->origin),
(opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1),
OPAL_SEC_CRED_MAX_SIZE))) {
opal_output(0, "%s SECURITY CONNECTION ERROR FROM %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin));
ORTE_ERROR_LOG(rc);
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s mca_oob_tcp_recv_connect: processing connection from %s for socket %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin), sd);
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(mod, &hdr->origin);
if (NULL == peer) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s mca_oob_tcp_recv_connect: connection from new peer",
"%s:tcp:recv:handler called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
peer = OBJ_NEW(mca_oob_tcp_peer_t);
peer->mod = mod;
peer->name = hdr->origin;
peer->state = MCA_OOB_TCP_ACCEPTING;
ui64 = (uint64_t*)(&peer->name);
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) {
OBJ_RELEASE(peer);
CLOSE_THE_SOCKET(sd);
return;
}
} else {
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SIMUL CONNECTION WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr->origin));
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(sd);
if (NULL != peer->active_addr) {
peer->active_addr->retries = 0;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->origin, ORTE_PROC_MY_NAME);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
return;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect);
return;
}
}
/* get the handshake */
if (ORTE_SUCCESS != mca_oob_tcp_peer_recv_connect_ack(op->mod, NULL, sd, &hdr)) {
goto cleanup;
}
/* finish processing ident */
if (MCA_OOB_TCP_IDENT == hdr.type) {
if (NULL == (peer = mca_oob_tcp_peer_lookup(op->mod, &hdr.origin))) {
/* should never happen */
mca_oob_tcp_peer_close(op->mod, peer);
goto cleanup;
}
/* set socket up to be non-blocking */
if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
opal_output(0, "%s mca_oob_tcp_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
@ -734,90 +602,20 @@ static void recv_connect(mca_oob_tcp_module_t *mod,
/* is the peer instance willing to accept this connection */
peer->sd = sd;
if (mca_oob_tcp_peer_accept(mod, peer) == false) {
if (mca_oob_tcp_peer_accept(op->mod, peer) == false) {
if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
opal_output(0, "%s-%s mca_oob_tcp_recv_connect: "
"rejected connection from %s connection state %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
ORTE_NAME_PRINT(&(hdr->origin)),
ORTE_NAME_PRINT(&(hdr.origin)),
peer->state);
}
CLOSE_THE_SOCKET(sd);
ui64 = (uint64_t*)(&peer->name);
opal_hash_table_set_value_uint64(&mod->peers, (*ui64), NULL);
opal_hash_table_set_value_uint64(&op->mod->peers, (*ui64), NULL);
OBJ_RELEASE(peer);
}
}
/*
* Event callback when there is data available on the registered
* socket to recv. This is called for the listen sockets to accept an
* incoming connection, on new sockets trying to complete the software
* connection process, and for probes. Data on an established
* connection is handled elsewhere.
*/
static void recv_handler(int sd, short flags, void *cbdata)
{
mca_oob_tcp_conn_op_t *op = (mca_oob_tcp_conn_op_t*)cbdata;
size_t cnt, rdsize;
uint8_t *msg;
mca_oob_tcp_hdr_t *hdr;
int rc;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s:tcp:recv:handler called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* malloc a fixed size */
rdsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL
msg = (uint8_t*)malloc(rdsize);
/* ensure all is zero'd */
memset(msg, 0, rdsize);
/* get the handshake */
cnt = 0;
while (cnt < rdsize) {
rc = recv(sd, (char*)(msg+cnt), rdsize-cnt, 0);
if (0 == rc) {
if (OOB_TCP_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
opal_output(0, "%s mca_oob_tcp_recv_handler: peer closed connection",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
CLOSE_THE_SOCKET(sd);
goto cleanup;
} else if (rc < 0) {
if (opal_socket_errno != EINTR &&
opal_socket_errno != EAGAIN &&
opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "%s mca_oob_tcp_recv_handler: recv() failed: %s (%d)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(sd);
goto cleanup;
}
continue;
}
cnt += rc;
}
/* check the header */
hdr = (mca_oob_tcp_hdr_t*)msg;
MCA_OOB_TCP_HDR_NTOH(hdr);
/* dispatch based on message type */
switch (hdr->type) {
case MCA_OOB_TCP_PROBE:
recv_probe(sd, hdr);
break;
case MCA_OOB_TCP_IDENT:
recv_connect(op->mod, sd, msg);
break;
default:
opal_output(0, "%s recv_handler: invalid message type: %d from peer %s\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr->type,
ORTE_NAME_PRINT(&hdr->origin));
CLOSE_THE_SOCKET(sd);
break;
}
cleanup:

Просмотреть файл

@ -76,11 +76,10 @@ static void tcp_peer_event_init(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer);
static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer,
static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod, int sd,
void* data, size_t size);
static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer,
mca_oob_tcp_peer_t* peer, int sd,
void* data, size_t size);
static void tcp_peer_connected(mca_oob_tcp_peer_t* peer);
@ -324,42 +323,53 @@ void mca_oob_tcp_peer_try_connect(int fd, short args, void *cbdata)
static int tcp_peer_send_connect_ack(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer)
{
uint8_t *msg;
mca_oob_tcp_hdr_t *hdr;
char *msg;
mca_oob_tcp_hdr_t hdr;
int rc;
size_t sdsize;
opal_sec_cred_t *cred;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* malloc a fixed size */
sdsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL
msg = (uint8_t*)malloc(sdsize);
/* load the header */
hdr.origin = *ORTE_PROC_MY_NAME;
hdr.dst = peer->name;
hdr.type = MCA_OOB_TCP_IDENT;
hdr.tag = 0;
/* get our security credential*/
if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set the number of bytes to be read beyond the header */
hdr.nbytes = strlen(orte_version_string) + 1 + cred->size;
MCA_OOB_TCP_HDR_HTON(&hdr);
/* create a space for our message */
sdsize = (sizeof(hdr) + strlen(orte_version_string) + 1 + cred->size);
if (NULL == (msg = (char*)malloc(sdsize))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(msg, 0, sdsize);
/* load the header */
hdr = (mca_oob_tcp_hdr_t*)msg;
hdr->origin = *ORTE_PROC_MY_NAME;
hdr->dst = peer->name;
hdr->type = MCA_OOB_TCP_IDENT;
hdr->tag = 0;
hdr->nbytes = 0;
MCA_OOB_TCP_HDR_HTON(hdr);
/* load the message */
memcpy(msg, &hdr, sizeof(hdr));
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred->credential, cred->size);
/* load the version string */
memcpy(msg+sizeof(mca_oob_tcp_hdr_t), orte_version_string, strlen(orte_version_string));
/* load our security credential, stepping over to leave the NULL at end of version string */
if (OPAL_SUCCESS != (rc = opal_sec.get_token((opal_identifier_t*)ORTE_PROC_MY_NAME,
(opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1),
OPAL_SEC_CRED_MAX_SIZE))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != tcp_peer_send_blocking(mod, peer, msg, sdsize)) {
/* send it */
if (ORTE_SUCCESS != tcp_peer_send_blocking(mod, peer->sd, msg, sdsize)) {
ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
free(msg);
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
return ORTE_ERR_UNREACH;
}
free(msg);
return ORTE_SUCCESS;
}
@ -484,29 +494,25 @@ void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
* information that identifies the peers endpoint.
*/
static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer,
void* data, size_t size)
int sd, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
int retval;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s sending connect-ack to %s",
"%s send blocking of %"PRIsize_t" bytes to socket %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
size, sd);
while (cnt < size) {
retval = send(peer->sd, (char*)ptr+cnt, size-cnt, 0);
retval = send(sd, (char*)ptr+cnt, size-cnt, 0);
if (retval < 0) {
if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "%s tcp_peer_send_blocking: send() to %s failed: %s (%d)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
opal_output(0, "%s tcp_peer_send_blocking: send() to socket %d failed: %s (%d)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd,
strerror(opal_socket_errno),
opal_socket_errno);
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
return ORTE_ERR_UNREACH;
}
continue;
@ -515,9 +521,8 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack sent to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
"%s connect-ack sent to socket %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd);
return ORTE_SUCCESS;
}
@ -528,82 +533,180 @@ static int tcp_peer_send_blocking(mca_oob_tcp_module_t *mod,
* socket to a connected state.
*/
int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer)
mca_oob_tcp_peer_t* pr,
int sd, mca_oob_tcp_hdr_t *dhdr)
{
uint8_t *msg;
mca_oob_tcp_hdr_t *hdr;
char *msg;
char *version;
int rc;
size_t rsize;
int rc, cmpval;
opal_sec_cred_t creds;
mca_oob_tcp_hdr_t hdr;
mca_oob_tcp_peer_t *peer;
uint64_t *ui64;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
(NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd);
/* malloc a fixed size */
rsize = OPAL_SEC_CRED_MAX_SIZE + sizeof(mca_oob_tcp_hdr_t) + strlen(orte_version_string) + 1; // need to include the NULL
msg = (uint8_t*)malloc(rsize);
/* ensure all is zero'd */
memset(msg, 0, rsize);
if (tcp_peer_recv_blocking(mod, peer, msg, rsize)) {
peer = pr;
/* get the header */
if (tcp_peer_recv_blocking(mod, peer, sd, &hdr, sizeof(mca_oob_tcp_hdr_t))) {
if (NULL != peer) {
/* If the peer state is CONNECT_ACK, then we were waiting for
* the connection to be ack'd
*/
if (peer->state != MCA_OOB_TCP_CONNECT_ACK) {
/* handshake broke down - abort this connection */
opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
opal_output(0, "%s RECV CONNECT BAD HANDSHAKE (%d) FROM %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), peer->state,
ORTE_NAME_PRINT(&(peer->name)), sd);
mca_oob_tcp_peer_close(mod, peer);
return ORTE_ERR_UNREACH;
}
}
} else {
/* unable to complete the recv */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
return ORTE_ERR_UNREACH;
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name));
/* check the header */
hdr = (mca_oob_tcp_hdr_t*)msg;
MCA_OOB_TCP_HDR_NTOH(hdr);
if (hdr->type != MCA_OOB_TCP_IDENT) {
/* convert the header */
MCA_OOB_TCP_HDR_NTOH(&hdr);
/* if the requestor wanted the header returned, then do so now */
if (NULL != dhdr) {
*dhdr = hdr;
}
if (MCA_OOB_TCP_PROBE == hdr.type) {
/* send a header back */
hdr.type = MCA_OOB_TCP_PROBE;
hdr.dst = hdr.origin;
hdr.origin = *ORTE_PROC_MY_NAME;
MCA_OOB_TCP_HDR_HTON(&hdr);
tcp_peer_send_blocking(mod, sd, &hdr, sizeof(mca_oob_tcp_hdr_t));
CLOSE_THE_SOCKET(sd);
return ORTE_SUCCESS;
}
if (hdr.type != MCA_OOB_TCP_IDENT) {
opal_output(0, "tcp_peer_recv_connect_ack: invalid header type: %d\n",
hdr->type);
hdr.type);
if (NULL != peer) {
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
} else {
CLOSE_THE_SOCKET(sd);
}
return ORTE_ERR_UNREACH;
}
/* if we don't already have it, get the peer */
if (NULL == peer) {
peer = mca_oob_tcp_peer_lookup(mod, &hdr.origin);
if (NULL == peer) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s mca_oob_tcp_recv_connect: connection from new peer",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
peer = OBJ_NEW(mca_oob_tcp_peer_t);
peer->mod = mod;
peer->name = hdr.origin;
peer->state = MCA_OOB_TCP_ACCEPTING;
ui64 = (uint64_t*)(&peer->name);
if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mod->peers, (*ui64), peer)) {
OBJ_RELEASE(peer);
CLOSE_THE_SOCKET(sd);
return ORTE_ERR_UNREACH;
}
} else {
/* check for a race condition - if I was in the process of
* creating a connection to the peer, or have already established
* such a connection, then we need to reject this connection. We will
* let the higher ranked process retry - if I'm the lower ranked
* process, I'll simply defer until I receive the request
*/
if (MCA_OOB_TCP_CONNECTED == peer->state ||
MCA_OOB_TCP_CONNECTING == peer->state ||
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s SIMUL CONNECTION WITH %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&hdr.origin));
if (peer->recv_ev_active) {
opal_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
if (0 < peer->sd) {
CLOSE_THE_SOCKET(peer->sd);
peer->sd = -1;
}
CLOSE_THE_SOCKET(sd);
if (NULL != peer->active_addr) {
peer->active_addr->retries = 0;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME);
if (OPAL_VALUE1_GREATER == cmpval) {
/* force the other end to retry the connection */
peer->state = MCA_OOB_TCP_UNCONNECTED;
return ORTE_ERR_UNREACH;
} else {
/* retry the connection */
peer->state = MCA_OOB_TCP_CONNECTING;
ORTE_ACTIVATE_TCP_CONN_STATE(mod, peer, mca_oob_tcp_peer_try_connect);
return ORTE_ERR_UNREACH;
}
}
}
} else {
/* compare the peers name to the expected value */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr->origin)) {
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
"received unexpected process identifier %s from %s\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(hdr->origin)),
ORTE_NAME_PRINT(&(hdr.origin)),
ORTE_NAME_PRINT(&(peer->name)));
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
return ORTE_ERR_UNREACH;
}
}
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack header from %s is okay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
/* get the authentication and version payload */
if (NULL == (msg = (char*)malloc(hdr.nbytes))) {
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
return ORTE_ERR_OUT_OF_RESOURCE;
}
if (!tcp_peer_recv_blocking(mod, peer, sd, msg, hdr.nbytes)) {
/* unable to complete the recv */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name), peer->sd);
free(msg);
return ORTE_ERR_UNREACH;
}
/* check that this is from a matching version */
version = (char*)(msg + sizeof(mca_oob_tcp_hdr_t));
version = (char*)(msg);
if (0 != strcmp(version, orte_version_string)) {
opal_output(0, "%s tcp_peer_recv_connect_ack: "
"received different version from %s: %s instead of %s\n",
@ -612,6 +715,7 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
version, orte_version_string);
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
free(msg);
return ORTE_ERR_UNREACH;
}
@ -621,11 +725,24 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
ORTE_NAME_PRINT(&peer->name));
/* check security token */
if (OPAL_SUCCESS != (rc = opal_sec.authenticate((opal_identifier_t*)(&hdr->origin),
(opal_sec_cred_t)(msg+sizeof(mca_oob_tcp_hdr_t)+strlen(orte_version_string)+1),
OPAL_SEC_CRED_MAX_SIZE))) {
creds.credential = (char*)(msg + strlen(version) + 1);
creds.size = hdr.nbytes - strlen(version) - 1;
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(&creds))) {
ORTE_ERROR_LOG(rc);
}
free(msg);
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect-ack %s authenticated",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
/* if the requestor wanted the header returned, then they
* will complete their processing
*/
if (NULL != dhdr) {
return ORTE_SUCCESS;
}
/* set the peer into the component and OOB-level peer tables to indicate
* that we know this peer and we will be handling him
@ -725,7 +842,7 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
* information that identifies the peers endpoint.
*/
static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer,
mca_oob_tcp_peer_t* peer, int sd,
void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
@ -734,10 +851,10 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s waiting for connect ack from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
while (cnt < size) {
int retval = recv(peer->sd, (char *)ptr+cnt, size-cnt, 0);
int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0);
/* remote closed connection */
if (retval == 0) {
@ -745,9 +862,13 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
"%s-%s tcp_peer_recv_blocking: "
"peer closed connection: peer state %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
peer->state);
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
(NULL == peer) ? 0 : peer->state);
if (NULL != peer) {
mca_oob_tcp_peer_close(mod, peer);
} else {
CLOSE_THE_SOCKET(sd);
}
return false;
}
@ -775,18 +896,22 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
"%s connect ack received error %s from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
strerror(opal_socket_errno),
ORTE_NAME_PRINT(&(peer->name)));
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
return false;
} else {
opal_output(0,
"%s tcp_peer_recv_blocking: "
"recv() failed for %s: %s (%d)\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)),
strerror(opal_socket_errno),
opal_socket_errno);
if (NULL != peer) {
peer->state = MCA_OOB_TCP_FAILED;
mca_oob_tcp_peer_close(mod, peer);
} else {
CLOSE_THE_SOCKET(sd);
}
return false;
}
}
@ -798,7 +923,7 @@ static bool tcp_peer_recv_blocking(mca_oob_tcp_module_t *mod,
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connect ack received from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)));
return true;
}

Просмотреть файл

@ -99,7 +99,9 @@ ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const
ORTE_MODULE_DECLSPEC bool mca_oob_tcp_peer_accept(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer);
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer);
ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod, mca_oob_tcp_peer_t* peer);
ORTE_MODULE_DECLSPEC int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t* peer,
int sd, mca_oob_tcp_hdr_t *dhdr);
ORTE_MODULE_DECLSPEC void mca_oob_tcp_peer_close(mca_oob_tcp_module_t *mod,
mca_oob_tcp_peer_t *peer);

Просмотреть файл

@ -414,7 +414,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
switch (peer->state) {
case MCA_OOB_TCP_CONNECT_ACK:
if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer))) {
if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer, peer->sd, NULL))) {
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s:tcp:recv:handler starting send/recv events",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));