1
1

Clean up the PMIx direct modex support. Add an MCA parameter pmix_base_async_modex that will cause the async modex to be used when set to 1. Default it to 0 for now to continue the current default behavior.

Also add an MCA param pmix_base_collect_data to direct that the blocking fence shall return all data to each process. Obviously, this param has no effect if async_modex is used.
Этот коммит содержится в:
Ralph Castain 2015-10-27 11:01:49 -07:00
родитель 3035e14051
Коммит 267ca8fcd3
16 изменённых файлов: 620 добавлений и 457 удалений

Просмотреть файл

@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* exchange connection info - this function may also act as a barrier
* if data exchange is required. The modex occurs solely across procs
* in our job, so no proc array is passed. If a barrier is required,
* the "modex" function will perform it internally
*/
OPAL_MODEX(NULL, 1);
* in our job. If a barrier is required, the "modex" function will
* perform it internally */
OPAL_MODEX();
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));

Просмотреть файл

@ -31,12 +31,21 @@
/* Note that this initializer is important -- do not remove it! See
https://github.com/open-mpi/ompi/issues/375 for details. */
opal_pmix_base_module_t opal_pmix = { 0 };
bool opal_pmix_collect_all_data = false;
bool opal_pmix_collect_all_data = true;
bool opal_pmix_base_allow_delayed_server = false;
int opal_pmix_verbose_output = -1;
bool opal_pmix_base_async_modex = false;
/* Register the PMIx base framework's MCA variables.
 *
 * Each backing global is reset to its default immediately before it is
 * registered:
 *   - async_modex  (default false): use the asynchronous modex mode
 *   - collect_data (default true):  collect all data during the modex
 *
 * The results of the registration calls are intentionally discarded;
 * the function always returns OPAL_SUCCESS. */
static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
{
    /* async modex selector */
    opal_pmix_base_async_modex = false;
    (void) mca_base_var_register("opal", "pmix", "base", "async_modex",
                                 "Use asynchronous modex mode",
                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                 OPAL_INFO_LVL_9,
                                 MCA_BASE_VAR_SCOPE_READONLY,
                                 &opal_pmix_base_async_modex);

    /* data collection selector used by the blocking fence */
    opal_pmix_collect_all_data = true;
    (void) mca_base_var_register("opal", "pmix", "base", "collect_data",
                                 "Collect all data during modex",
                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                 OPAL_INFO_LVL_9,
                                 MCA_BASE_VAR_SCOPE_READONLY,
                                 &opal_pmix_collect_all_data);

    return OPAL_SUCCESS;
}

Просмотреть файл

@ -36,6 +36,8 @@ BEGIN_C_DECLS
/* provide access to the framework verbose output without
* exposing the entire base */
extern int opal_pmix_verbose_output;
extern bool opal_pmix_collect_all_data;
extern bool opal_pmix_base_async_modex;
extern int opal_pmix_base_exchange(opal_value_t *info,
opal_pmix_pdata_t *pdat,
int timeout);
@ -254,10 +256,13 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
* that takes into account directives and availability of
* non-blocking operations
*/
#define OPAL_MODEX(p, s) \
do { \
opal_pmix.commit(); \
opal_pmix.fence((p), (s)); \
/* OPAL_MODEX(): commit this process's data via opal_pmix.commit() and,
 * unless opal_pmix_base_async_modex is set, call opal_pmix.fence() with a
 * NULL proc array, passing opal_pmix_collect_all_data as the second
 * argument. When async modex is enabled no fence is executed here.
 *
 * NOTE(review): the expansion ends in "while(0);" - the trailing
 * semicolon means "OPAL_MODEX();" expands to two statements, which
 * would break use inside an unbraced if/else. Confirm all call sites
 * before removing it. */
#define OPAL_MODEX() \
do { \
opal_pmix.commit(); \
if (!opal_pmix_base_async_modex) { \
opal_pmix.fence(NULL, \
opal_pmix_collect_all_data); \
} \
} while(0);
/**

Просмотреть файл

@ -183,7 +183,7 @@ BEGIN_C_DECLS
/**** PMIX ERROR CONSTANTS ****/
/* PMIx errors are always negative, with 0 reserved for success */
#define PMIX_ERROR_MIN -41 // set equal to number of non-zero entries in enum
#define PMIX_ERROR_MIN -42 // set equal to number of non-zero entries in enum
typedef enum {
PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN,
@ -230,6 +230,7 @@ typedef enum {
PMIX_ERR_INVALID_CRED,
PMIX_EXISTS,
PMIX_ERR_SILENT,
PMIX_ERROR,
PMIX_SUCCESS
} pmix_status_t;

Просмотреть файл

@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
PMIX_RELEASE(bptr); // free's the data region
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
PMIX_ERROR_LOG(rc);
rc = PMIX_ERR_SILENT; // avoid error-logging twice
break;
}
}

Просмотреть файл

@ -16,4 +16,5 @@ sources += \
src/server/pmix_server.c \
src/server/pmix_server_ops.c \
src/server/pmix_server_regex.c \
src/server/pmix_server_listener.c
src/server/pmix_server_listener.c \
src/server/pmix_server_get.c

Просмотреть файл

@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void *cbdata)
pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata;
pmix_usock_send_t *snd;
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] queue callback called: reply to %s:%d on tag %d,"
"event_is_active=%d",
"[%s:%d] queue callback called: reply to %s:%d on tag %d",
__FILE__, __LINE__,
(queue->peer)->info->nptr->nspace,
(queue->peer)->info->rank, (queue->tag),
@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void *cbdata)
queue->buf = (b); \
queue->tag = (t); \
pmix_output_verbose(2, pmix_globals.debug_output, \
"[%s:%d] queue reply to %s:%d on tag %d," \
"event_is_active=%d", \
"[%s:%d] queue reply to %s:%d on tag %d", \
__FILE__, __LINE__, \
(queue->peer)->info->nptr->nspace, \
(queue->peer)->info->rank, (queue->tag), \
(queue->peer)->send_ev_active); \
(queue->peer)->info->rank, (queue->tag)); \
event_assign(&queue->ev, pmix_globals.evbase, -1, \
EV_WRITE, _queue_message, queue); \
event_priority_set(&queue->ev, 0); \
@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void *cbdata)
* someone has been waiting for a request on a remote proc
* in one of our nspaces, but we didn't know all the local procs
* and so couldn't determine the proc was remote */
pmix_pending_nspace_fix(nptr);
pmix_pending_nspace_requests(nptr);
}
/* let the caller know we are done */
if (NULL != cd->opcbfunc) {

Просмотреть файл

@ -0,0 +1,552 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
* All rights reserved.
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <private/autogen/config.h>
#include <pmix/rename.h>
#include <private/types.h>
#include <private/pmix_stdint.h>
#include <private/pmix_socket_errno.h>
#include <pmix_server.h>
#include "src/include/pmix_globals.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include <fcntl.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include PMIX_EVENT_HEADER
#include "src/class/pmix_list.h"
#include "src/buffer_ops/buffer_ops.h"
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/output.h"
#include "src/util/pmix_environ.h"
#include "src/util/progress_threads.h"
#include "src/usock/usock.h"
#include "src/sec/pmix_sec.h"
#include "pmix_server_ops.h"
extern pmix_server_module_t pmix_host_server;
/* Caddy used to carry a direct-modex reply from the host server's
 * callback (dmdx_cbfunc) to the handler that processes it
 * (_process_dmdx_reply) via an event. */
typedef struct {
/* base object - the caddy is created with PMIX_NEW and freed with PMIX_RELEASE */
pmix_object_t super;
/* event used to activate _process_dmdx_reply */
pmix_event_t ev;
/* status reported by the host server for the request */
pmix_status_t status;
/* returned data - points at the caller's buffer, it is not copied here */
const char *data;
/* number of bytes in data */
size_t ndata;
/* local tracker for the request this reply answers */
pmix_dmdx_local_t *lcd;
/* host-provided release function, called once the reply has been processed */
pmix_release_cbfunc_t relcbfunc;
/* argument passed to relcbfunc */
void *cbdata;
} pmix_dmdx_reply_caddy_t;
/* Constructor for pmix_dmdx_reply_caddy_t: put every field the reply
 * handler reads into a known state. The status defaults to PMIX_ERROR
 * so a caddy that was never filled in is not mistaken for a successful
 * reply. */
static void dcd_con(pmix_dmdx_reply_caddy_t *p)
{
    p->status = PMIX_ERROR;
    /* data was previously left uninitialized; pair it with ndata == 0 */
    p->data = NULL;
    p->ndata = 0;
    p->lcd = NULL;
    p->relcbfunc = NULL;
    p->cbdata = NULL;
}
PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
pmix_object_t, dcd_con, NULL);
static void dmdx_cbfunc(pmix_status_t status, const char *data,
size_t ndata, void *cbdata,
pmix_release_cbfunc_t relfn, void *relcbdata);
static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
pmix_modex_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t create_local_tracker(char nspace[], int rank,
pmix_info_t info[], size_t ninfo,
pmix_modex_cbfunc_t cbfunc,
void *cbdata,
pmix_dmdx_local_t **lcd);
/* Release callback handed to modex callbacks together with a data blob
 * we allocated: the recipient invokes it with that blob as cbdata once
 * it no longer needs the data, and we simply free it. */
static void relfn(void *cbdata)
{
    free(cbdata);
}
/**
 * Service a GET request for the modex data of a given nspace/rank.
 *
 * The request (nspace, rank, optional info array) is unpacked from buf.
 * If the data is already available it is delivered immediately through
 * cbfunc. Otherwise the request is recorded on the list of pending
 * local requests and answered later, either when the target process
 * commits its data or when the host server returns it via direct modex.
 *
 * @param buf     buffer holding the packed request
 * @param cbfunc  function used to deliver the data (or an error) to the
 *                requestor
 * @param cbdata  opaque argument passed back to cbfunc
 *
 * @return PMIX_SUCCESS if the request was satisfied or successfully
 *         queued for later resolution; the unpack error if the request
 *         could not be decoded; PMIX_ERR_NOMEM if tracking failed; the
 *         status of the host's direct_modex call when one is issued;
 *         PMIX_ERR_NOT_FOUND if the data is remote and the host offers
 *         no direct_modex function.
 */
pmix_status_t pmix_server_get(pmix_buffer_t *buf,
                              pmix_modex_cbfunc_t cbfunc,
                              void *cbdata)
{
    int32_t cnt;
    pmix_status_t rc;
    int rank;
    char *cptr;
    char nspace[PMIX_MAX_NSLEN+1];
    pmix_nspace_t *ns, *nptr;
    pmix_info_t *info=NULL;
    size_t ninfo=0;
    pmix_dmdx_local_t *lcd;
    pmix_rank_info_t *iptr;
    pmix_hash_table_t *ht;
    bool local;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "recvd GET");

    /* setup - zeroing the whole array guarantees the bounded copy
     * below leaves nspace NUL-terminated */
    memset(nspace, 0, sizeof(nspace));

    /* retrieve the nspace and rank of the requested proc */
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }
    (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
    free(cptr);
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }

    /* retrieve any provided info structs */
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }
    if (0 < ninfo) {
        PMIX_INFO_CREATE(info, ninfo);
        cnt = (int32_t)ninfo;
        if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
            PMIX_ERROR_LOG(rc);
            PMIX_INFO_FREE(info, ninfo);
            return rc;
        }
    }

    /* find the nspace object for this client */
    nptr = NULL;
    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
        if (0 == strcmp(nspace, ns->nspace)) {
            nptr = ns;
            break;
        }
    }

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "%s:%d EXECUTE GET FOR %s:%d",
                        pmix_globals.myid.nspace,
                        pmix_globals.myid.rank, nspace, rank);

    if (NULL == nptr || NULL == nptr->server) {
        /* this is for an nspace we don't know about yet, so
         * record the request for data from this process and
         * give the host server a chance to tell us about it */
        rc = create_local_tracker(nspace, rank, info, ninfo,
                                  cbfunc, cbdata, &lcd);
        /* PMIX_ERR_NOT_FOUND from the tracker only means "a new tracker
         * was created" - the request is queued and will be answered
         * through cbfunc, so it must not be reported as a failure */
        if (PMIX_ERR_NOT_FOUND == rc) {
            rc = PMIX_SUCCESS;
        }
        return rc;
    }

    /* We have to wait for all local clients to be registered before
     * we can know whether this request is for data from a local or a
     * remote client because one client might ask for data about another
     * client that the host RM hasn't told us about yet. Fortunately,
     * we do know how many clients to expect, so first check to see if
     * all clients have been registered with us */
    if (!nptr->server->all_registered) {
        /* we cannot do anything further, so just track this request
         * for now */
        rc = create_local_tracker(nspace, rank, info, ninfo,
                                  cbfunc, cbdata, &lcd);
        /* as above: a newly created tracker is a queued request, not
         * an error */
        if (PMIX_ERR_NOT_FOUND == rc) {
            rc = PMIX_SUCCESS;
        }
        return rc;
    }

    /* Since we know about all the local clients in this nspace,
     * let's first try to satisfy the request with any available data.
     * By default, we assume we are looking for data from a remote
     * client, and then check to see if this is one of my local
     * clients - if so, then we look in that hash table */
    ht = &nptr->server->remote;
    local = false;
    PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
        if (iptr->rank == rank) {
            /* it is known local client - check the local table */
            ht = &nptr->server->mylocal;
            local = true;
            break;
        }
    }

    /* see if we already have this data */
    rc = _satisfy_request(ht, rank, cbfunc, cbdata);
    if (PMIX_SUCCESS == rc) {
        /* request was successfully satisfied */
        PMIX_INFO_FREE(info, ninfo);
        return rc;
    }

    /* If we get here, then we don't have the data at this time. Check
     * to see if we already have a pending request for the data - if
     * we do, then we can just wait for it to arrive */
    rc = create_local_tracker(nspace, rank, info, ninfo,
                              cbfunc, cbdata, &lcd);
    if (PMIX_SUCCESS == rc) {
        /* we are already waiting for the data - nothing more
         * for us to do as the function added the new request
         * to the tracker for us */
        return PMIX_SUCCESS;
    }
    if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) {
        /* we have a problem - e.g., out of memory */
        return rc;
    }

    /* Getting here means that we didn't already have a request for
     * for data pending, and so we created a new tracker for this
     * request. We know the identity of all our local clients, so
     * if this is one, then we have nothing further to do - we will
     * fulfill the request once the process commits its data */
    if (local) {
        return PMIX_SUCCESS;
    }

    /* this isn't a local client of ours, so we need to ask the host
     * resource manager server to please get the info for us from
     * whomever is hosting the target process */
    if (NULL != pmix_host_server.direct_modex) {
        rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
    } else {
        /* if we don't have direct modex feature, just respond with "not found".
         * NOTE(review): the requestor is notified through cbfunc AND the
         * error is returned to the caller; the code this replaced returned
         * PMIX_SUCCESS after invoking cbfunc. Confirm the caller does not
         * answer the client a second time. Also confirm the tracker's
         * destructor does not free info / loc_reqs again. */
        cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
        PMIX_INFO_FREE(info, ninfo);
        pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
        PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
        PMIX_RELEASE(lcd);
        rc = PMIX_ERR_NOT_FOUND;
    }

    return rc;
}
/**
 * Record a pending request for the modex data of nspace/rank.
 *
 * If a tracker for that target already exists on
 * pmix_server_globals.local_reqs, the requestor is added to it;
 * otherwise a new tracker is created, given the info array, and
 * appended to the list.
 *
 * Ownership of info: it is stored in a newly created tracker, and freed
 * here in every other case (existing tracker reused, or allocation
 * failure), so the caller must not free it after this call unless a new
 * tracker was returned.
 *
 * @param nspace  namespace of the target process
 * @param rank    rank of the target process
 * @param info    info array that accompanied the request (may be NULL)
 * @param ninfo   number of elements in info
 * @param cbfunc  function to call when the data becomes available
 * @param cbdata  opaque argument for cbfunc
 * @param ld      set to the tracker used, or NULL on failure
 *
 * @return PMIX_SUCCESS if an existing tracker was found,
 *         PMIX_ERR_NOT_FOUND if a new tracker had to be created,
 *         PMIX_ERR_NOMEM if an allocation failed.
 */
static pmix_status_t create_local_tracker(char nspace[], int rank,
                                          pmix_info_t info[], size_t ninfo,
                                          pmix_modex_cbfunc_t cbfunc,
                                          void *cbdata,
                                          pmix_dmdx_local_t **ld)
{
    pmix_dmdx_local_t *lcd, *cd;
    pmix_dmdx_request_t *req;
    pmix_status_t rc;
    bool created = false;

    /* define default */
    *ld = NULL;

    /* see if we already have an existing request for data
     * from this namespace/rank */
    lcd = NULL;
    PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
        if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
            rank != cd->proc.rank ) {
            continue;
        }
        lcd = cd;
        break;
    }

    if (NULL == lcd) {
        /* we do not have an existing request, so let's create one */
        lcd = PMIX_NEW(pmix_dmdx_local_t);
        if (NULL == lcd) {
            PMIX_INFO_FREE(info, ninfo);
            return PMIX_ERR_NOMEM;
        }
        created = true;
    }

    /* allocate the per-requestor entry before touching any list so a
     * failure here leaves the global state unchanged */
    req = PMIX_NEW(pmix_dmdx_request_t);
    if (NULL == req) {
        if (created) {
            /* not yet on the list and info not yet attached */
            PMIX_RELEASE(lcd);
        }
        PMIX_INFO_FREE(info, ninfo);
        return PMIX_ERR_NOMEM;
    }

    if (created) {
        strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
        lcd->proc.rank = rank;
        lcd->info = info;
        lcd->ninfo = ninfo;
        pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
        rc = PMIX_ERR_NOT_FOUND;  // indicates that we created a new request tracker
    } else {
        /* the existing tracker keeps the info array it was created
         * with, so this copy has no owner - free it rather than leak it */
        PMIX_INFO_FREE(info, ninfo);
        rc = PMIX_SUCCESS;  // indicates we found an existing request
    }

    /* track this specific requestor so we return the
     * data to them */
    req->cbfunc = cbfunc;
    req->cbdata = cbdata;
    pmix_list_append(&lcd->loc_reqs, &req->super);
    *ld = lcd;
    return rc;
}
/**
 * Process the pending requests that target the given nspace.
 *
 * For every tracker on pmix_server_globals.local_reqs whose nspace
 * matches nptr: if the target rank is on nptr's list of ranks nothing is
 * done here (the request is satisfied when that process commits);
 * otherwise the rank is treated as remote and a direct modex request is
 * issued to the host server. If the host provides no direct_modex
 * function, every requestor on the tracker is answered with
 * PMIX_ERR_NOT_FOUND and the tracker is removed and released.
 *
 * NOTE(review): the return value of direct_modex is ignored, so a
 * failed request leaves the tracker pending - confirm this is intended.
 *
 * @param nptr  namespace to examine
 */
void pmix_pending_nspace_requests(pmix_nspace_t *nptr)
{
    pmix_dmdx_local_t *trk, *trk_next;
    pmix_dmdx_request_t *req, *req_next;
    pmix_rank_info_t *rinfo;
    bool is_local;

    PMIX_LIST_FOREACH_SAFE(trk, trk_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
        /* only trackers aimed at this nspace are of interest */
        if (0 != strncmp(nptr->nspace, trk->proc.nspace, PMIX_MAX_NSLEN)) {
            continue;
        }

        /* is the target one of the ranks listed for this nspace? */
        is_local = false;
        PMIX_LIST_FOREACH(rinfo, &nptr->server->ranks, pmix_rank_info_t) {
            if (rinfo->rank == trk->proc.rank) {
                is_local = true;
                break;
            }
        }
        if (is_local) {
            /* will be satisfied upon commit from that proc */
            continue;
        }

        /* remote target: ask the host server for the data */
        if (NULL != pmix_host_server.direct_modex) {
            pmix_host_server.direct_modex(&trk->proc, trk->info, trk->ninfo, dmdx_cbfunc, trk);
            continue;
        }

        /* no direct modex available: fail every requestor and drop
         * the tracker */
        PMIX_LIST_FOREACH_SAFE(req, req_next, &trk->loc_reqs, pmix_dmdx_request_t) {
            req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
            pmix_list_remove_item(&trk->loc_reqs, &req->super);
            PMIX_RELEASE(req);
        }
        pmix_list_remove_item(&pmix_server_globals.local_reqs, &trk->super);
        PMIX_RELEASE(trk);
    }
}
/**
 * Try to answer a request from data already stored in a hash table.
 *
 * Looks up the "modex" entry for rank in ht. When present, the stored
 * byte object is packed as a PMIX_BUFFER inside a fresh buffer, and the
 * resulting payload is handed to cbfunc together with relfn so the
 * recipient can free it.
 *
 * @param ht      hash table to search
 * @param rank    rank whose data is wanted
 * @param cbfunc  function that receives the packaged data
 * @param cbdata  opaque argument for cbfunc
 *
 * @return PMIX_SUCCESS if the data was found and passed to cbfunc,
 *         PMIX_ERR_NOT_FOUND otherwise (cbfunc is not called).
 */
static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
                                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
    pmix_status_t rc;
    pmix_value_t *val;
    char *payload;
    size_t payload_len;
    pmix_buffer_t inner, outer, *inner_ptr;

    /* the data may be here because of an earlier direct modex reply
     * from a remote peer, or because a local client committed it */
    rc = pmix_hash_fetch(ht, rank, "modex", &val);
    if (PMIX_SUCCESS != rc || NULL == val) {
        return PMIX_ERR_NOT_FOUND;
    }

    /* the client expects a byte object containing a buffer, so wrap
     * the stored bytes in a buffer and pack that into the outgoing one */
    PMIX_CONSTRUCT(&outer, pmix_buffer_t);
    PMIX_CONSTRUCT(&inner, pmix_buffer_t);
    inner_ptr = &inner;
    PMIX_LOAD_BUFFER(&inner, val->data.bo.bytes, val->data.bo.size);
    pmix_bfrop.pack(&outer, &inner_ptr, 1, PMIX_BUFFER);

    /* detach the borrowed bytes so destructing the wrapper leaves
     * the passed data alone */
    inner.base_ptr = NULL;
    inner.bytes_used = 0;
    PMIX_DESTRUCT(&inner);

    PMIX_UNLOAD_BUFFER(&outer, payload, payload_len);
    PMIX_DESTRUCT(&outer);
    PMIX_VALUE_RELEASE(val);

    /* pass it back - the recipient frees payload through relfn */
    cbfunc(PMIX_SUCCESS, payload, payload_len, cbdata, relfn, payload);
    return PMIX_SUCCESS;
}
/**
 * Resolve pending requests to this namespace/rank.
 *
 * If lcd is NULL and nptr is given, the tracker for nptr's nspace and
 * rank is looked up on pmix_server_globals.local_reqs. When a tracker
 * is available, every requestor recorded on it is answered: with the
 * error in status if it is not PMIX_SUCCESS, otherwise with the data
 * fetched from the appropriate hash table (local if rank is on nptr's
 * rank list, remote otherwise), or with the fetch error if that fails.
 * The tracker is then removed from the list and released.
 *
 * @param nptr    namespace of the target (may be NULL if lcd is given)
 * @param rank    rank of the target
 * @param status  outcome of obtaining the data
 * @param lcd     tracker to resolve, or NULL to search for it
 *
 * @return always PMIX_SUCCESS
 */
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
                                   pmix_status_t status, pmix_dmdx_local_t *lcd)
{
    pmix_dmdx_local_t *cd;

    /* find corresponding request (if exists) */
    if (NULL == lcd && NULL != nptr) {
        PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
            if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
                rank != cd->proc.rank) {
                continue;
            }
            lcd = cd;
            break;
        }
    }

    /* If somebody was interested in this rank */
    if (NULL != lcd) {
        pmix_dmdx_request_t *req;

        /* without an nspace object we have no hash table to read the
         * data from; report that as an error instead of releasing the
         * tracker without ever answering its requestors */
        if (PMIX_SUCCESS == status && NULL == nptr) {
            status = PMIX_ERR_NOT_FOUND;
        }

        if (PMIX_SUCCESS != status) {
            /* if we've got an error for this request - just forward it */
            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
                /* if we can't satisfy this request - respond with error */
                req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
            }
        } else {
            /* if we've got the blob - try to satisfy requests */
            pmix_hash_table_t *ht;
            pmix_rank_info_t *iptr;

            /* by default we are looking for the remote data */
            ht = &nptr->server->remote;
            /* check if this rank is local */
            PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
                if (iptr->rank == rank) {
                    ht = &nptr->server->mylocal;
                    break;
                }
            }

            /* run through all the requests to this rank */
            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
                pmix_status_t rc;
                rc = _satisfy_request(ht, rank, req->cbfunc, req->cbdata);
                if (PMIX_SUCCESS != rc) {
                    /* if we can't satisfy this particular request (missing key?) */
                    req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
                }
            }
        }
        /* remove all requests to this rank and cleanup the corresponding structure */
        pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
        PMIX_RELEASE(lcd);
    }
    return PMIX_SUCCESS;
}
/**
 * Process the data returned by the host RM server for a direct modex
 * request. Activated through the event set up in dmdx_cbfunc.
 *
 * On a successful reply the data is copied into the remote hash table
 * of the target's nspace under the "modex" key. In every case the
 * pending requests on the tracker are then resolved with the final
 * status, the host's release function (if any) is called, and the
 * caddy is released.
 *
 * @param fd      unused
 * @param args    unused
 * @param cbdata  the pmix_dmdx_reply_caddy_t carrying the reply
 */
static void _process_dmdx_reply(int fd, short args, void *cbdata)
{
    pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
    pmix_kval_t *kp;
    pmix_nspace_t *ns, *nptr;
    pmix_status_t rc;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "[%s:%d] process dmdx reply from %s:%d",
                        __FILE__, __LINE__,
                        caddy->lcd->proc.nspace, caddy->lcd->proc.rank);

    /* find the nspace object for this client */
    nptr = NULL;
    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
        if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
            nptr = ns;
            break;
        }
    }

    if (NULL == nptr) {
        /* should be impossible */
        PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
        caddy->status = PMIX_ERR_NOT_FOUND;
        goto cleanup;
    }

    /* if the request was successfully satisfied, then store the data
     * in our hash table for remote procs. Although we could immediately
     * resolve any outstanding requests on our tracking list, we instead
     * store the data first so we can immediately satisfy any future
     * requests. Then, rather than duplicate the resolve code here, we
     * will let the pmix_pending_resolve function go ahead and retrieve
     * it from the hash table */
    if (PMIX_SUCCESS == caddy->status) {
        kp = PMIX_NEW(pmix_kval_t);
        if (NULL == kp) {
            PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
            caddy->status = PMIX_ERR_NOMEM;
            goto cleanup;
        }
        kp->key = strdup("modex");
        PMIX_VALUE_CREATE(kp->value, 1);
        kp->value->type = PMIX_BYTE_OBJECT;
        /* we don't know if the host is going to save this data
         * or not, so we have to copy it - the client is expecting
         * this to arrive as a byte object containing a buffer, so
         * package it accordingly */
        kp->value->data.bo.bytes = malloc(caddy->ndata);
        if (NULL == kp->value->data.bo.bytes && 0 < caddy->ndata) {
            /* could not copy the payload - report it to the requestors
             * instead of writing through a NULL pointer */
            kp->value->data.bo.size = 0;
            PMIX_RELEASE(kp);
            PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
            caddy->status = PMIX_ERR_NOMEM;
            goto cleanup;
        }
        if (0 < caddy->ndata) {
            memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
        }
        kp->value->data.bo.size = caddy->ndata;
        /* store it in the appropriate hash */
        if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
            PMIX_ERROR_LOG(rc);
        }
        PMIX_RELEASE(kp);  // maintain acctg
    }

cleanup:
    /* always execute the callback to avoid having the client hang -
     * this must also run on the error paths above, so the label sits
     * before the resolve call. With a non-success status the resolve
     * function forwards the error without using nptr. */
    pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);

    /* now call the release function so the host server
     * knows it can release the data */
    if (NULL != caddy->relcbfunc) {
        caddy->relcbfunc(caddy->cbdata);
    }
    PMIX_RELEASE(caddy);
}
/**
 * Callback the host RM server invokes when it gets requested info back
 * from a remote server.
 *
 * The reply is not processed here: it is captured in a caddy and an
 * event is activated so that _process_dmdx_reply handles it.
 *
 * @param status          outcome reported by the host
 * @param data            returned data (referenced, not copied here)
 * @param ndata           number of bytes in data
 * @param cbdata          the pmix_dmdx_local_t tracker given to
 *                        direct_modex
 * @param release_fn      function to call once data is no longer needed
 * @param release_cbdata  argument for release_fn
 */
static void dmdx_cbfunc(pmix_status_t status,
                        const char *data, size_t ndata, void *cbdata,
                        pmix_release_cbfunc_t release_fn, void *release_cbdata)
{
    pmix_dmdx_reply_caddy_t *caddy;

    /* because the host RM is calling us from their own thread, we
     * need to thread-shift into our local progress thread before
     * accessing any global info */
    caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
    if (NULL == caddy) {
        /* cannot queue the reply - log it and let the host reclaim its
         * data rather than dereference a NULL caddy.
         * NOTE(review): the requestors on the tracker stay pending in
         * this case; confirm whether a fallback notification is wanted. */
        PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
        if (NULL != release_fn) {
            release_fn(release_cbdata);
        }
        return;
    }
    caddy->status = status;
    /* point to the callers cbfunc */
    caddy->relcbfunc = release_fn;
    caddy->cbdata = release_cbdata;

    /* point to the returned data and our own internal
     * tracker */
    caddy->data = data;
    caddy->ndata = ndata;
    caddy->lcd = (pmix_dmdx_local_t *)cbdata;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "[%s:%d] queue dmdx reply for %s:%d",
                        __FILE__, __LINE__,
                        caddy->lcd->proc.nspace, caddy->lcd->proc.rank);

    event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
                 _process_dmdx_reply, caddy);
    event_priority_set(&caddy->ev, 0);
    event_active(&caddy->ev, EV_WRITE, 1);
}

Просмотреть файл

@ -58,246 +58,6 @@
pmix_server_module_t pmix_host_server = {0};
static void dmdx_cbfunc(pmix_status_t status, const char *data,
size_t ndata, void *cbdata,
pmix_release_cbfunc_t relfn, void *relcbdata);
typedef struct {
pmix_object_t super;
pmix_event_t ev;
pmix_status_t status;
const char *data;
size_t ndata;
pmix_dmdx_local_t *lcd;
pmix_release_cbfunc_t relcbfunc;
void *cbdata;
} pmix_dmdx_reply_caddy_t;
PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
pmix_object_t, NULL, NULL);
static void relfn(void *cbdata)
{
char *data = (char*)cbdata;
free(data);
}
static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
pmix_hash_table_t *ht,
pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
pmix_status_t rc;
pmix_buffer_t pbkt, xfer;
pmix_value_t *val;
char *data;
size_t sz;
/* check to see if this data already has been
* obtained as a result of a prior direct modex request from
* another local peer */
rc = pmix_hash_fetch(ht, rank, "modex", &val);
if (PMIX_SUCCESS == rc && NULL != val) {
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
pmix_buffer_t *pxfer = &xfer;
PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
xfer.base_ptr = NULL;
xfer.bytes_used = 0;
PMIX_DESTRUCT(&xfer);
PMIX_VALUE_RELEASE(val);
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
PMIX_DESTRUCT(&pbkt);
/* pass it back */
cbfunc(rc, data, sz, cbdata, relfn, data);
return rc;
}
return PMIX_ERR_NOT_FOUND;
}
pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
pmix_info_t *info, size_t ninfo,
pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
pmix_dmdx_local_t *lcd = NULL, *cd;
pmix_rank_info_t *iptr;
pmix_hash_table_t *ht;
pmix_status_t rc;
/* 1. Try to satisfy the request right now */
/* by default we are looking for the remote data */
ht = &nptr->server->remote;
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
if (iptr->rank == rank) {
/* in case it is known local rank - check local table */
ht = &nptr->server->mylocal;
break;
}
}
rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata);
if( PMIX_SUCCESS == rc ){
/* request was successfully satisfied */
PMIX_INFO_FREE(info, ninfo);
return rc;
}
/* 2. We were unable to satisfy request right now.
* Look for existing requests to this namespace/rank */
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
rank != cd->proc.rank ) {
continue;
}
lcd = cd;
break;
}
/* 3. If no requests exists then:
* - if all local clients are registered then we were called because
* the remote data was requested. Create request and call direct modex
* to retrieve the data
* - if not all local ranks were registered, we need to wait untill
* pmix_pending_localy_fin would be called to resolve this. Just add the
* request for now.
*/
if (NULL == lcd) {
lcd = PMIX_NEW(pmix_dmdx_local_t);
if (NULL == lcd){
PMIX_INFO_FREE(info, ninfo);
return PMIX_ERR_NOMEM;
}
strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
lcd->proc.rank = rank;
lcd->info = info;
lcd->ninfo = ninfo;
pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
/* check & send request if need/possible */
if (nptr->server->all_registered && NULL == info) {
if (NULL != pmix_host_server.direct_modex) {
pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
} else {
/* if we don't have direct modex feature, just respond with "not found" */
cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
PMIX_INFO_FREE(info, ninfo);
pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
PMIX_RELEASE(lcd);
return PMIX_SUCCESS;
}
}
}
pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t);
req->cbfunc = cbfunc;
req->cbdata = cbdata;
pmix_list_append(&lcd->loc_reqs, &req->super);
return PMIX_SUCCESS;
}
void pmix_pending_nspace_fix(pmix_nspace_t *nptr)
{
pmix_dmdx_local_t *cd, *cd_next;
/* Now when we know all local ranks, go along request list and ask for remote data
* for the non-local ranks, and resolve all pending requests for local procs
* that were waiting for registration to complete
*/
PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
pmix_rank_info_t *info;
bool found = false;
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
continue;
}
PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
if (info->rank == cd->proc.rank) {
found = true;
break;
}
}
/* if not found - this is remote process and we need to send
* corresponding direct modex request */
if( !found ){
if( NULL != pmix_host_server.direct_modex ){
pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
} else {
pmix_dmdx_request_t *req, *req_next;
PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
pmix_list_remove_item(&cd->loc_reqs, &req->super);
PMIX_RELEASE(req);
}
pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
PMIX_RELEASE(cd);
}
}
}
}
/* Resolve pending requests to this namespace/rank */
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
pmix_status_t status, pmix_dmdx_local_t *lcd)
{
pmix_dmdx_local_t *cd;
/* find corresponding request (if exists) */
if( NULL == lcd ){
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
rank != cd->proc.rank) {
continue;
}
lcd = cd;
break;
}
}
/* If somebody was interested in this rank */
if( NULL != lcd ){
pmix_dmdx_request_t *req;
if (PMIX_SUCCESS != status){
/* if we've got an error for this request - just forward it*/
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
/* if we can't satisfy this request - respond with error */
req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
}
} else {
/* if we've got the blob - try to satisfy requests */
pmix_hash_table_t *ht;
pmix_rank_info_t *iptr;
/* by default we are looking for the remote data */
ht = &nptr->server->remote;
/* check if this rank is local */
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
if (iptr->rank == rank) {
ht = &nptr->server->mylocal;
break;
}
}
/* run through all the requests to this rank */
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
pmix_status_t rc;
rc = _satisfy_request(nptr, rank, ht, req->cbfunc, req->cbdata);
if( PMIX_SUCCESS != rc ){
/* if we can't satisfy this particular request (missing key?) */
req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
}
}
}
/* remove all requests to this rank and cleanup the corresponding structure */
pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
PMIX_RELEASE(lcd);
}
return PMIX_SUCCESS;
}
pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
@ -436,13 +196,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
* may not be a contribution */
if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote, info->rank, "modex", &val) &&
NULL != val) {
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
pmix_buffer_t *pxfer = &xfer;
pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
xfer.base_ptr = NULL;
xfer.bytes_used = 0;
PMIX_DESTRUCT(&xfer);
PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes, val->data.bo.size);
PMIX_VALUE_RELEASE(val);
}
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
@ -457,7 +211,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
PMIX_RELEASE(dcd);
}
}
/* see if anyone local is waiting on this data- could be more than one */
/* see if anyone local is waiting on this data - could be more than one */
return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL);
}
@ -826,163 +580,6 @@ pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd,
return rc;
}
static void _process_dmdx_reply(int fd, short args, void *cbdata)
{
pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
pmix_kval_t *kp;
pmix_nspace_t *ns, *nptr;
pmix_status_t rc;
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] queue dmdx reply from %s:%d",
__FILE__, __LINE__,
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
/* find the nspace object for this client */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
nptr = ns;
break;
}
}
if (NULL == nptr) {
/* should be impossible */
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
caddy->status = PMIX_ERR_NOT_FOUND;
goto cleanup;
}
if (PMIX_SUCCESS == caddy->status) {
kp = PMIX_NEW(pmix_kval_t);
kp->key = strdup("modex");
PMIX_VALUE_CREATE(kp->value, 1);
kp->value->type = PMIX_BYTE_OBJECT;
/* we don't know if the host is going to save this data
* or not, so we have to copy it */
kp->value->data.bo.bytes = (char*)malloc(caddy->ndata);
memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
kp->value->data.bo.size = caddy->ndata;
/* store it in the appropriate hash */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
PMIX_ERROR_LOG(rc);
}
PMIX_RELEASE(kp); // maintain acctg
}
cleanup:
/* always execute the callback to avoid having the client hang */
pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
/* now call the release function so the host server
* knows it can release the data */
if (NULL != caddy->relcbfunc) {
caddy->relcbfunc(caddy->cbdata);
}
PMIX_RELEASE(caddy);
}
static void dmdx_cbfunc(pmix_status_t status,
const char *data, size_t ndata, void *cbdata,
pmix_release_cbfunc_t release_fn, void *release_cbdata)
{
pmix_dmdx_reply_caddy_t *caddy;
caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
caddy->status = status;
/* point to the callers cbfunc */
caddy->relcbfunc = release_fn;
caddy->cbdata = release_cbdata;
caddy->data = data;
caddy->ndata = ndata;
caddy->lcd = (pmix_dmdx_local_t *)cbdata;
pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] queue dmdx reply %s:%d",
__FILE__, __LINE__,
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
_process_dmdx_reply, caddy);
event_priority_set(&caddy->ev, 0);
event_active(&caddy->ev, EV_WRITE, 1);
}
/*
 * Service a GET request received from a client.
 *
 * The buffer is unpacked in this order: target nspace (string), target
 * rank (int), number of info structs (size_t) and, when that number is
 * non-zero, the info array itself. If the nspace is not yet on
 * pmix_globals.nspaces a new entry is created for it, and an entry
 * without a server object is given one. The request is then passed to
 * pmix_pending_request().
 *
 * buf     buffer holding the packed request
 * cbfunc  callback forwarded to pmix_pending_request()
 * cbdata  opaque pointer forwarded together with cbfunc
 *
 * Returns the unpack status if any field fails to unpack, otherwise
 * whatever pmix_pending_request() returns.
 */
pmix_status_t pmix_server_get(pmix_buffer_t *buf,
                              pmix_modex_cbfunc_t cbfunc,
                              void *cbdata)
{
    int32_t cnt;
    pmix_status_t rc;
    int rank;
    char *cptr = NULL;
    char nspace[PMIX_MAX_NSLEN+1];
    pmix_nspace_t *ns, *nptr;
    pmix_info_t *info=NULL;
    size_t ninfo=0;

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "recvd GET");

    /* setup - the array is fully zeroed, so the bounded copy below
     * always leaves a NUL-terminated string behind */
    memset(nspace, 0, sizeof(nspace));

    /* retrieve the nspace and rank of the requested proc */
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }
    /* the string comes from the client, and a successful unpack may
     * still leave us with no string at all (presumably when a
     * zero-length one was packed - confirm against the unpacker).
     * Copying from NULL is undefined, so in that case keep nspace as
     * the empty string and let it be handled like any other nspace
     * we do not know about */
    if (NULL != cptr) {
        (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
        free(cptr);
    }
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }

    /* find the nspace object for this client */
    nptr = NULL;
    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
        if (0 == strcmp(nspace, ns->nspace)) {
            nptr = ns;
            break;
        }
    }

    pmix_output_verbose(2, pmix_globals.debug_output,
                        "%s:%d EXECUTE GET FOR %s:%d",
                        pmix_globals.myid.nspace,
                        pmix_globals.myid.rank, nspace, rank);

    /* retrieve any provided info structs */
    cnt = 1;
    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
        PMIX_ERROR_LOG(rc);
        return rc;
    }
    if (0 < ninfo) {
        PMIX_INFO_CREATE(info, ninfo);
        /* the unpack count is a 32-bit value */
        cnt = (int32_t)ninfo;
        if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
            PMIX_ERROR_LOG(rc);
            PMIX_INFO_FREE(info, ninfo);
            return rc;
        }
    }

    if (NULL == nptr) {
        /* this is for an nspace we don't know about yet, so
         * give the host server a chance to tell us about it */
        nptr = PMIX_NEW(pmix_nspace_t);
        (void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN);
        pmix_list_append(&pmix_globals.nspaces, &nptr->super);
    }
    /* if we don't have any ranks for this job, protect ourselves here */
    if (NULL == nptr->server) {
        nptr->server = PMIX_NEW(pmix_server_nspace_t);
    }

    /* NOTE(review): info is handed over here and not freed in this
     * function - confirm that pmix_pending_request takes ownership */
    return pmix_pending_request(nptr, rank, info, ninfo, cbfunc, cbdata);
}
pmix_status_t pmix_server_publish(pmix_peer_t *peer,
pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc, void *cbdata)

Просмотреть файл

@ -183,10 +183,7 @@ void pmix_stop_listening(void);
bool pmix_server_trk_update(pmix_server_trkr_t *trk);
pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
pmix_info_t *info, size_t ninfo,
pmix_modex_cbfunc_t cbfunc, void *cbdata);
void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
void pmix_pending_nspace_requests(pmix_nspace_t *nptr);
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
pmix_status_t status, pmix_dmdx_local_t *lcd);

Просмотреть файл

@ -123,6 +123,8 @@ const char* PMIx_Error_string(pmix_status_t errnum)
case PMIX_EXISTS:
return "EXISTS";
case PMIX_ERR_SILENT:
return "SILENT";
case PMIX_ERROR:
return "ERROR";
case PMIX_SUCCESS:

Просмотреть файл

@ -28,9 +28,13 @@
BEGIN_C_DECLS
#define PMIX_ERROR_LOG(r) \
pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
PMIx_Error_string((r)), __FILE__, __LINE__);
/* Report an error code together with the file and line of the call
 * site, unless the code is PMIX_ERR_SILENT, in which case nothing is
 * printed. The argument is evaluated more than once, so do not pass
 * an expression with side effects.
 *
 * The expansion deliberately does NOT end in a semicolon: the caller
 * supplies it, which keeps "if (x) PMIX_ERROR_LOG(rc); else ..." valid. */
#define PMIX_ERROR_LOG(r)                                           \
    do {                                                            \
        if (PMIX_ERR_SILENT != (r)) {                               \
            pmix_output(0, "PMIX ERROR: %s in file %s at line %d",  \
                        PMIx_Error_string((r)), __FILE__, __LINE__); \
        }                                                           \
    } while (0)
#define PMIX_REPORT_ERROR(e) \
do { \

Просмотреть файл

@ -217,6 +217,7 @@ int pmix1_store_local(const opal_process_name_t *proc, opal_value_t *val)
}
}
if (NULL == job) {
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
return OPAL_ERR_NOT_FOUND;
}
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);

Просмотреть файл

@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
}
int pmix1_server_register_nspace(opal_jobid_t jobid,
int nlocalprocs,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
int nlocalprocs,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
{
opal_value_t *kv, *k2;
pmix_info_t *pinfo, *pmap;
@ -168,10 +168,17 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
pmix_status_t rc;
pmix1_opcaddy_t *op;
opal_list_t *pmapinfo;
opal_pmix1_jobid_trkr_t *job;
/* convert the jobid */
(void)snprintf(nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(jobid));
/* store this job in our list of known nspaces */
job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
(void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
job->jobid = jobid;
opal_list_append(&mca_pmix_pmix1xx_component.jobids, &job->super);
/* convert the list to an array of pmix_info_t */
if (NULL != info) {
sz = opal_list_get_size(info);
@ -220,10 +227,10 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
int pmix1_server_register_client(const opal_process_name_t *proc,
uid_t uid, gid_t gid,
void *server_object,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
uid_t uid, gid_t gid,
void *server_object,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
{
pmix_status_t rc;
pmix1_opcaddy_t *op;
@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbd
}
int pmix1_server_dmodex(const opal_process_name_t *proc,
opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
pmix1_opcaddy_t *op;
pmix_status_t rc;

Просмотреть файл

@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
int rc, ret, room_num;
int32_t cnt;
opal_process_name_t target;
opal_value_t kv;
pmix_server_req_t *req;
uint8_t *data = NULL;
int32_t ndata = 0;
@ -542,29 +541,14 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
return;
}
/* if we got something, store the blobs locally so we can
* meet any further requests without doing a remote fetch.
* This must be done as a single blob for later retrieval */
if (ORTE_SUCCESS == ret && NULL != data) {
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = strdup("modex");
kv.type = OPAL_BYTE_OBJECT;
kv.data.bo.bytes = data;
kv.data.bo.size = ndata;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&target, &kv))) {
ORTE_ERROR_LOG(rc);
}
kv.data.bo.bytes = NULL; // protect the data
kv.data.bo.size = 0;
OBJ_DESTRUCT(&kv);
}
/* check the request out of the tracking hotel */
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
/* return the returned data to the requestor */
if (NULL != req) {
if (NULL != req->mdxcbfunc) {
req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata, relcbfunc, data);
} else {
free(data);
}
OBJ_RELEASE(req);
}

Просмотреть файл

@ -197,6 +197,12 @@ static void dmodex_req(int sd, short args, void *cbdata)
goto callback;
}
/* if we are the host daemon, then this is a local request, so
* just wait for the data to come in */
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
return;
}
/* construct a request message */
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {