1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-11-11 17:06:41 -08:00
родитель 8a9ef3dc2d
Коммит d75d0bc5f6
6 изменённых файлов: 255 добавлений и 114 удалений

Просмотреть файл

@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=git9137d98
repo_rev=gitf56d30e
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Nov 07, 2017"
date="Nov 11, 2017"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library

Просмотреть файл

@ -48,11 +48,8 @@
#ifndef PMI_H
#define PMI_H
#ifdef PMIX_HAVE_VISIBILITY
#define PMIX_EXPORT __attribute__((__visibility__("default")))
#else
#define PMIX_EXPORT
#endif
/* Structure and constant definitions */
#include <pmix_common.h>
/* prototypes for the PMI interface in MPICH2 */

Просмотреть файл

@ -7,11 +7,8 @@
#ifndef PMI2_H_INCLUDED
#define PMI2_H_INCLUDED
#ifdef PMIX_HAVE_VISIBILITY
#define PMIX_EXPORT __attribute__((__visibility__("default")))
#else
#define PMIX_EXPORT
#endif
/* Structure and constant definitions */
#include <pmix_common.h>
#define PMI2_MAX_KEYLEN 64
#define PMI2_MAX_VALLEN 1024

Просмотреть файл

@ -114,27 +114,31 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
/* if we don't have a path to the daemon rendezvous point,
* then we need to return an error */
if (NULL == (evar = getenv("PMIX_SERVER_URI"))) {
if (NULL != (evar = getenv("PMIX_SERVER_URI2USOCK"))) {
/* this is a v2.1+ server */
pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v21");
if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
return PMIX_ERR_INIT;
}
} else if (NULL != (evar = getenv("PMIX_SERVER_URI"))) {
/* this is a pre-v2.1 server - must use the v12 bfrops module */
pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
return PMIX_ERR_INIT;
}
} else {
/* let the caller know that the server isn't available */
return PMIX_ERR_SERVER_NOT_AVAIL;
}
/* the server will be using the same bfrops as us */
pmix_client_globals.myserver->nptr->compat.bfrops = pmix_globals.mypeer->nptr->compat.bfrops;
uri = pmix_argv_split(evar, ':');
if (3 != pmix_argv_count(uri)) {
pmix_argv_free(uri);
PMIX_ERROR_LOG(PMIX_ERROR);
return PMIX_ERROR;
}
/* definitely a v1 server */
pmix_client_globals.myserver->proc_type = PMIX_PROC_SERVER | PMIX_PROC_V1;
/* must use the v12 bfrops module */
pmix_globals.mypeer->nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
if (NULL == pmix_globals.mypeer->nptr->compat.bfrops) {
pmix_argv_free(uri);
return PMIX_ERR_INIT;
}
/* the server will be using the same */
pmix_client_globals.myserver->nptr->compat.bfrops = pmix_globals.mypeer->nptr->compat.bfrops;
/* set the server nspace */
if (NULL == pmix_client_globals.myserver->info) {
pmix_client_globals.myserver->info = PMIX_NEW(pmix_rank_info_t);
@ -272,6 +276,8 @@ static pmix_status_t send_connect_ack(int sd)
size_t sdsize=0, csize=0, len;
char *cred = NULL;
pmix_status_t rc;
char *sec, *bfrops, *gds;
pmix_bfrop_buffer_type_t bftype;
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"pmix: SEND CONNECT ACK");
@ -292,8 +298,24 @@ static pmix_status_t send_connect_ack(int sd)
return rc;
}
/* add the name of our active sec module - we selected it
* in pmix_client.c prior to entering here */
sec = pmix_globals.mypeer->nptr->compat.psec->name;
/* add our active bfrops module name */
bfrops = pmix_globals.mypeer->nptr->compat.bfrops->version;
/* and the type of buffer we are using */
bftype = pmix_globals.mypeer->nptr->compat.type;
/* add our active gds module for working with the server */
gds = (char*)pmix_client_globals.myserver->nptr->compat.gds->name;
/* set the number of bytes to be read beyond the header */
hdr.nbytes = sdsize + strlen(PMIX_VERSION) + 1 + len; // must NULL terminate the VERSION string!
hdr.nbytes = sdsize + (strlen(PMIX_VERSION) + 1) + \
(sizeof(size_t) + len) + \
(strlen(sec) + 1) + \
(strlen(bfrops) + 1) + sizeof(bftype) + \
(strlen(gds) + 1); // must NULL terminate the strings!
/* create a space for our message */
sdsize = (sizeof(hdr) + hdr.nbytes);
@ -309,16 +331,41 @@ static pmix_status_t send_connect_ack(int sd)
csize=0;
memcpy(msg, &hdr, sizeof(pmix_usock_hdr_t));
csize += sizeof(pmix_usock_hdr_t);
/* pass our nspace */
memcpy(msg+csize, pmix_globals.myid.nspace, strlen(pmix_globals.myid.nspace));
csize += strlen(pmix_globals.myid.nspace)+1;
/* pass our rank */
memcpy(msg+csize, &pmix_globals.myid.rank, sizeof(int));
csize += sizeof(int);
/* pass our version string */
memcpy(msg+csize, PMIX_VERSION, strlen(PMIX_VERSION));
csize += strlen(PMIX_VERSION)+1;
if (NULL != cred) {
memcpy(msg+csize, cred, strlen(cred)); // leaves last position in msg set to NULL
/* pass the size of the credential */
memcpy(msg+csize, &len, sizeof(size_t));
csize += sizeof(size_t);
if (0 < len) {
memcpy(msg+csize, cred, len);
csize += len;
}
/* pass our active sec module */
memcpy(msg+csize, sec, strlen(sec));
csize += strlen(sec)+1;
/* provide our active bfrops module */
memcpy(msg+csize, bfrops, strlen(bfrops));
csize += strlen(bfrops)+1;
/* provide the bfrops type */
memcpy(msg+csize, &bftype, sizeof(bftype));
csize += sizeof(bftype);
/* provide the gds module */
memcpy(msg+csize, gds, strlen(gds));
/* send the entire msg across */
if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(sd, msg, sdsize)) {
free(msg);
if (NULL != cred) {

Просмотреть файл

@ -104,7 +104,6 @@ PMIX_EXPORT pmix_ptl_usock_component_t mca_ptl_usock_component = {
static void connection_handler(int sd, short args, void *cbdata);
static void listener_cb(int incoming_sd, void *cbdata);
static char *sec_mode = NULL;
pmix_status_t component_open(void)
{
@ -133,9 +132,6 @@ pmix_status_t component_open(void)
pmix_status_t component_close(void)
{
if (NULL != sec_mode) {
free(sec_mode);
}
if (NULL != mca_ptl_usock_component.tmpdir) {
free(mca_ptl_usock_component.tmpdir);
}
@ -176,7 +172,7 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
socklen_t addrlen;
struct sockaddr_un *address;
bool disabled = false;
char *secmods, **options, *pmix_pid;
char *pmix_pid;
pid_t mypid;
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
@ -211,17 +207,6 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
address = (struct sockaddr_un*)&mca_ptl_usock_component.connection;
address->sun_family = AF_UNIX;
/* any client we hear from will be using v1.x protocols. This
* means that they cannot tell us what security module they
* are using as this wasn't included in their handshake. So
* the best we can assume is that they are using the highest
* priority default we have */
secmods = pmix_psec_base_get_available_modules();
options = pmix_argv_split(secmods, ',');
sec_mode = strdup(options[0]);
pmix_argv_free(options);
free(secmods);
/* define the listener */
lt = PMIX_NEW(pmix_listener_t);
@ -242,7 +227,7 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
snprintf(address->sun_path, sizeof(address->sun_path)-1, "%s", pmix_pid);
free(pmix_pid);
/* set the URI */
lt->varname = strdup("PMIX_SERVER_URI");
lt->varname = strdup("PMIX_SERVER_URI:PMIX_SERVER_URI2USOCK");
if (0 > asprintf(&lt->uri, "%s:%lu:%s", pmix_globals.myid.nspace,
(unsigned long)pmix_globals.myid.rank, address->sun_path)) {
PMIX_RELEASE(lt);
@ -349,57 +334,10 @@ static void listener_cb(int incoming_sd, void *cbdata)
pmix_event_active(&pending_connection->ev, EV_WRITE, 1);
}
/* Parse init-ack message:
* NSPACE<0><rank>VERSION<0>[CRED<0>]
*/
static pmix_status_t parse_connect_ack (char *msg, unsigned int len,
char **nspace, unsigned int *rank,
char **version, char **cred)
{
unsigned int msglen;
PMIX_STRNLEN(msglen, msg, len);
if (msglen < len) {
*nspace = msg;
msg += strlen(*nspace) + 1;
len -= strlen(*nspace) + 1;
} else {
return PMIX_ERR_BAD_PARAM;
}
PMIX_STRNLEN(msglen, msg, len);
if (msglen <= len) {
memcpy(rank, msg, sizeof(int));
msg += sizeof(int);
len -= sizeof(int);
} else {
return PMIX_ERR_BAD_PARAM;
}
PMIX_STRNLEN(msglen, msg, len);
if (msglen < len) {
*version = msg;
msg += strlen(*version) + 1;
len -= strlen(*version) + 1;
} else {
return PMIX_ERR_BAD_PARAM;
}
PMIX_STRNLEN(msglen, msg, len);
if (msglen < len)
*cred = msg;
else {
*cred = NULL;
}
return PMIX_SUCCESS;
}
static void connection_handler(int sd, short args, void *cbdata)
{
pmix_pending_connection_t *pnd = (pmix_pending_connection_t*)cbdata;
char *msg, *nspace, *version, *cred;
char *msg, *ptr, *nspace, *version, *cred, *sec, *bfrops, *gds;
pmix_status_t rc;
unsigned int rank;
pmix_usock_hdr_t hdr;
@ -409,6 +347,12 @@ static void connection_handler(int sd, short args, void *cbdata)
bool found;
pmix_proc_t proc;
size_t len;
pmix_bfrop_buffer_type_t bftype;
char **vers;
int major, minor, rel;
unsigned int msglen;
pmix_info_t ginfo;
size_t credlen;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(pnd);
@ -448,11 +392,140 @@ static void connection_handler(int sd, short args, void *cbdata)
PMIX_RELEASE(pnd);
return;
}
len = hdr.nbytes;
ptr = msg;
if (PMIX_SUCCESS != (rc = parse_connect_ack(msg, hdr.nbytes, &nspace,
&rank, &version, &cred))) {
/* extract the nspace of the requestor */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
nspace = ptr;
ptr += strlen(nspace) + 1;
len -= strlen(nspace) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* extract the rank */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen <= len) {
memcpy(&rank, ptr, sizeof(int));
ptr += sizeof(int);
len -= sizeof(int);
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* get their version string */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
version = ptr;
ptr += strlen(version) + 1;
len -= strlen(version) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* check the version - we do NOT support anything less than
* v1.2.5 */
vers = pmix_argv_split(version, '.');
major = strtol(vers[0], NULL, 10);
minor = strtol(vers[1], NULL, 10);
rel = strtol(vers[2], NULL, 10);
pmix_argv_free(vers);
if (1 == major && (2 != minor || 5 > rel)) {
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"error parsing connect-ack from client ON SOCKET %d", pnd->sd);
"connection request from client of unsupported version %s", version);
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* get any provided credential */
if (1 == major) {
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
cred = ptr;
ptr += strlen(cred) + 1;
len -= strlen(cred) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
} else {
if (sizeof(size_t) < len) {
memcpy(&credlen, ptr, sizeof(size_t));
ptr += sizeof(size_t);
len -= sizeof(size_t);
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
if (0 < credlen) {
cred = ptr;
ptr += credlen;
len -= credlen;
}
}
/* get their sec module */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
sec = ptr;
ptr += strlen(sec) + 1;
len -= strlen(sec) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* get their bfrops module */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
bfrops = ptr;
ptr += strlen(bfrops) + 1;
len -= strlen(bfrops) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* get their buffer type */
if (0 < len) {
bftype = ptr[0];
ptr += 1;
len -= 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* get their gds module */
PMIX_STRNLEN(msglen, ptr, len);
if (msglen < len) {
gds = ptr;
ptr += strlen(gds) + 1;
len -= strlen(gds) + 1;
} else {
free(msg);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
@ -463,11 +536,6 @@ static void connection_handler(int sd, short args, void *cbdata)
"connect-ack recvd from peer %s:%d:%s on socket %d",
nspace, rank, version, pnd->sd);
/* do not check the version - we only retain it at this
* time in case we need to check it at some future date.
* For now, our intent is to retain backward compatibility
* and so we will assume that all versions are compatible. */
/* see if we know this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_nspace_t) {
@ -509,8 +577,25 @@ static void connection_handler(int sd, short args, void *cbdata)
rc = PMIX_ERR_NOMEM;
goto error;
}
/* mark it as being a v1 type */
psave->proc_type = PMIX_PROC_CLIENT | PMIX_PROC_V1;
/* mark it as being a client of the correct type */
if (1 == major) {
psave->proc_type = PMIX_PROC_CLIENT | PMIX_PROC_V1;
} else if (2 == major && 0 == minor) {
psave->proc_type = PMIX_PROC_CLIENT | PMIX_PROC_V20;
} else if (2 == major && 1 == minor) {
psave->proc_type = PMIX_PROC_CLIENT | PMIX_PROC_V21;
} else if (3 == major) {
psave->proc_type = PMIX_PROC_CLIENT | PMIX_PROC_V3;
} else {
/* we don't recognize this version */
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"connection request from client of unrecognized version %s", version);
free(msg);
PMIX_RELEASE(psave);
CLOSE_THE_SOCKET(pnd->sd);
PMIX_RELEASE(pnd);
return;
}
/* add the nspace tracker */
PMIX_RETAIN(nptr);
psave->nptr = nptr;
@ -531,7 +616,7 @@ static void connection_handler(int sd, short args, void *cbdata)
info->peerid = psave->index;
/* get the appropriate compatibility modules */
nptr->compat.psec = pmix_psec_base_assign_module(sec_mode);
nptr->compat.psec = pmix_psec_base_assign_module(sec);
if (NULL == nptr->compat.psec) {
free(msg);
info->proc_cnt--;
@ -541,8 +626,9 @@ static void connection_handler(int sd, short args, void *cbdata)
/* send an error reply to the client */
goto error;
}
/* we need the v1.2 bfrops module */
nptr->compat.bfrops = pmix_bfrops_base_assign_module("v12");
/* set the bfrops module to match this peer */
nptr->compat.bfrops = pmix_bfrops_base_assign_module(bfrops);
if (NULL == nptr->compat.bfrops) {
free(msg);
info->proc_cnt--;
@ -552,16 +638,20 @@ static void connection_handler(int sd, short args, void *cbdata)
/* send an error reply to the client */
goto error;
}
/* we have no way of knowing their buffer type, so take our default */
nptr->compat.type = pmix_bfrops_globals.default_type;
/* set the buffer type */
nptr->compat.type = bftype;
/* take the highest priority gds module - in the absence of any info,
* we assume they can handle both dstore and hash */
nptr->compat.gds = pmix_gds_base_assign_module(NULL, 0);
/* set the gds module to match this peer */
if (NULL != gds) {
PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, gds, PMIX_STRING);
nptr->compat.gds = pmix_gds_base_assign_module(&ginfo, 1);
PMIX_INFO_DESTRUCT(&ginfo);
} else {
nptr->compat.gds = pmix_gds_base_assign_module(NULL, 0);
}
if (NULL == nptr->compat.gds) {
free(msg);
info->proc_cnt--;
PMIX_RELEASE(info);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
PMIX_RELEASE(psave);
/* send an error reply to the client */
@ -581,11 +671,13 @@ static void connection_handler(int sd, short args, void *cbdata)
}
PMIX_PSEC_VALIDATE_CONNECTION(rc, psave,
PMIX_PROTOCOL_V1, cred, len);
/* now done with the msg */
free(msg);
if (PMIX_SUCCESS != rc) {
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"validation of client credentials failed: %s",
PMIx_Error_string(rc));
free(msg);
info->proc_cnt--;
PMIX_RELEASE(info);
pmix_pointer_array_set_item(&pmix_server_globals.clients, psave->index, NULL);
@ -595,7 +687,6 @@ static void connection_handler(int sd, short args, void *cbdata)
PMIX_RELEASE(pnd);
return;
}
free(msg);
/* send the client's array index */
if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&psave->index, sizeof(int)))) {
@ -642,6 +733,9 @@ static void connection_handler(int sd, short args, void *cbdata)
return;
error:
if (NULL != cred) {
free(cred);
}
/* send an error reply to the client */
if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(pnd->sd, (char*)&rc, sizeof(int))) {
PMIX_ERROR_LOG(rc);

Просмотреть файл

@ -264,7 +264,13 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
}
}
/* see if anyone local is waiting on this data- could be more than one */
return pmix_pending_resolve(nptr, info->pname.rank, PMIX_SUCCESS, NULL);
pmix_output(0, "CHECKING PENDING");
rc = pmix_pending_resolve(nptr, info->pname.rank, PMIX_SUCCESS, NULL);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
pmix_output(0, "RETURNING %d", rc);
return rc;
}
/* get an existing object for tracking LOCAL participation in a collective