Merge pull request #1073 from rhc54/topic/pmix
Cleanup the PMIx direct modex support.
Этот коммит содержится в:
Коммит
b603307f7d
@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
|||||||
|
|
||||||
/* exchange connection info - this function may also act as a barrier
|
/* exchange connection info - this function may also act as a barrier
|
||||||
* if data exchange is required. The modex occurs solely across procs
|
* if data exchange is required. The modex occurs solely across procs
|
||||||
* in our job, so no proc array is passed. If a barrier is required,
|
* in our job. If a barrier is required, the "modex" function will
|
||||||
* the "modex" function will perform it internally
|
* perform it internally */
|
||||||
*/
|
OPAL_MODEX();
|
||||||
OPAL_MODEX(NULL, 1);
|
|
||||||
|
|
||||||
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
|
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
|
||||||
|
|
||||||
|
@ -31,12 +31,21 @@
|
|||||||
/* Note that this initializer is important -- do not remove it! See
|
/* Note that this initializer is important -- do not remove it! See
|
||||||
https://github.com/open-mpi/ompi/issues/375 for details. */
|
https://github.com/open-mpi/ompi/issues/375 for details. */
|
||||||
opal_pmix_base_module_t opal_pmix = { 0 };
|
opal_pmix_base_module_t opal_pmix = { 0 };
|
||||||
bool opal_pmix_collect_all_data = false;
|
bool opal_pmix_collect_all_data = true;
|
||||||
bool opal_pmix_base_allow_delayed_server = false;
|
bool opal_pmix_base_allow_delayed_server = false;
|
||||||
int opal_pmix_verbose_output = -1;
|
int opal_pmix_verbose_output = -1;
|
||||||
|
bool opal_pmix_base_async_modex = false;
|
||||||
|
|
||||||
static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
|
static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
|
||||||
{
|
{
|
||||||
|
opal_pmix_base_async_modex = false;
|
||||||
|
(void) mca_base_var_register("opal", "pmix", "base", "async_modex", "Use asynchronous modex mode",
|
||||||
|
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||||
|
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
|
||||||
|
opal_pmix_collect_all_data = true;
|
||||||
|
(void) mca_base_var_register("opal", "pmix", "base", "collect_data", "Collect all data during modex",
|
||||||
|
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||||
|
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data);
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,8 @@ BEGIN_C_DECLS
|
|||||||
/* provide access to the framework verbose output without
|
/* provide access to the framework verbose output without
|
||||||
* exposing the entire base */
|
* exposing the entire base */
|
||||||
extern int opal_pmix_verbose_output;
|
extern int opal_pmix_verbose_output;
|
||||||
|
extern bool opal_pmix_collect_all_data;
|
||||||
|
extern bool opal_pmix_base_async_modex;
|
||||||
extern int opal_pmix_base_exchange(opal_value_t *info,
|
extern int opal_pmix_base_exchange(opal_value_t *info,
|
||||||
opal_pmix_pdata_t *pdat,
|
opal_pmix_pdata_t *pdat,
|
||||||
int timeout);
|
int timeout);
|
||||||
@ -254,10 +256,13 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
|
|||||||
* that takes into account directives and availability of
|
* that takes into account directives and availability of
|
||||||
* non-blocking operations
|
* non-blocking operations
|
||||||
*/
|
*/
|
||||||
#define OPAL_MODEX(p, s) \
|
#define OPAL_MODEX() \
|
||||||
do { \
|
do { \
|
||||||
opal_pmix.commit(); \
|
opal_pmix.commit(); \
|
||||||
opal_pmix.fence((p), (s)); \
|
if (!opal_pmix_base_async_modex) { \
|
||||||
|
opal_pmix.fence(NULL, \
|
||||||
|
opal_pmix_collect_all_data); \
|
||||||
|
} \
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -183,7 +183,7 @@ BEGIN_C_DECLS
|
|||||||
|
|
||||||
/**** PMIX ERROR CONSTANTS ****/
|
/**** PMIX ERROR CONSTANTS ****/
|
||||||
/* PMIx errors are always negative, with 0 reserved for success */
|
/* PMIx errors are always negative, with 0 reserved for success */
|
||||||
#define PMIX_ERROR_MIN -41 // set equal to number of non-zero entries in enum
|
#define PMIX_ERROR_MIN -42 // set equal to number of non-zero entries in enum
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN,
|
PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN,
|
||||||
@ -230,6 +230,7 @@ typedef enum {
|
|||||||
PMIX_ERR_INVALID_CRED,
|
PMIX_ERR_INVALID_CRED,
|
||||||
PMIX_EXISTS,
|
PMIX_EXISTS,
|
||||||
|
|
||||||
|
PMIX_ERR_SILENT,
|
||||||
PMIX_ERROR,
|
PMIX_ERROR,
|
||||||
PMIX_SUCCESS
|
PMIX_SUCCESS
|
||||||
} pmix_status_t;
|
} pmix_status_t;
|
||||||
|
@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
|
|||||||
PMIX_RELEASE(bptr); // free's the data region
|
PMIX_RELEASE(bptr); // free's the data region
|
||||||
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
|
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
|
||||||
PMIX_ERROR_LOG(rc);
|
PMIX_ERROR_LOG(rc);
|
||||||
|
rc = PMIX_ERR_SILENT; // avoid error-logging twice
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,4 +16,5 @@ sources += \
|
|||||||
src/server/pmix_server.c \
|
src/server/pmix_server.c \
|
||||||
src/server/pmix_server_ops.c \
|
src/server/pmix_server_ops.c \
|
||||||
src/server/pmix_server_regex.c \
|
src/server/pmix_server_regex.c \
|
||||||
src/server/pmix_server_listener.c
|
src/server/pmix_server_listener.c \
|
||||||
|
src/server/pmix_server_get.c
|
||||||
|
@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void *cbdata)
|
|||||||
pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata;
|
pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata;
|
||||||
pmix_usock_send_t *snd;
|
pmix_usock_send_t *snd;
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
"[%s:%d] queue callback called: reply to %s:%d on tag %d,"
|
"[%s:%d] queue callback called: reply to %s:%d on tag %d",
|
||||||
"event_is_active=%d",
|
|
||||||
__FILE__, __LINE__,
|
__FILE__, __LINE__,
|
||||||
(queue->peer)->info->nptr->nspace,
|
(queue->peer)->info->nptr->nspace,
|
||||||
(queue->peer)->info->rank, (queue->tag),
|
(queue->peer)->info->rank, (queue->tag),
|
||||||
@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void *cbdata)
|
|||||||
queue->buf = (b); \
|
queue->buf = (b); \
|
||||||
queue->tag = (t); \
|
queue->tag = (t); \
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output, \
|
pmix_output_verbose(2, pmix_globals.debug_output, \
|
||||||
"[%s:%d] queue reply to %s:%d on tag %d," \
|
"[%s:%d] queue reply to %s:%d on tag %d", \
|
||||||
"event_is_active=%d", \
|
|
||||||
__FILE__, __LINE__, \
|
__FILE__, __LINE__, \
|
||||||
(queue->peer)->info->nptr->nspace, \
|
(queue->peer)->info->nptr->nspace, \
|
||||||
(queue->peer)->info->rank, (queue->tag), \
|
(queue->peer)->info->rank, (queue->tag)); \
|
||||||
(queue->peer)->send_ev_active); \
|
|
||||||
event_assign(&queue->ev, pmix_globals.evbase, -1, \
|
event_assign(&queue->ev, pmix_globals.evbase, -1, \
|
||||||
EV_WRITE, _queue_message, queue); \
|
EV_WRITE, _queue_message, queue); \
|
||||||
event_priority_set(&queue->ev, 0); \
|
event_priority_set(&queue->ev, 0); \
|
||||||
@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void *cbdata)
|
|||||||
* someone has been waiting for a request on a remote proc
|
* someone has been waiting for a request on a remote proc
|
||||||
* in one of our nspaces, but we didn't know all the local procs
|
* in one of our nspaces, but we didn't know all the local procs
|
||||||
* and so couldn't determine the proc was remote */
|
* and so couldn't determine the proc was remote */
|
||||||
pmix_pending_nspace_fix(nptr);
|
pmix_pending_nspace_requests(nptr);
|
||||||
}
|
}
|
||||||
/* let the caller know we are done */
|
/* let the caller know we are done */
|
||||||
if (NULL != cd->opcbfunc) {
|
if (NULL != cd->opcbfunc) {
|
||||||
|
552
opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
Обычный файл
552
opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
Обычный файл
@ -0,0 +1,552 @@
|
|||||||
|
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
|
||||||
|
* Copyright (c) 2014-2015 Research Organization for Information Science
|
||||||
|
* and Technology (RIST). All rights reserved.
|
||||||
|
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
|
||||||
|
* All rights reserved.
|
||||||
|
* Copyright (c) 2015 Mellanox Technologies, Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
* $COPYRIGHT$
|
||||||
|
*
|
||||||
|
* Additional copyrights may follow
|
||||||
|
*
|
||||||
|
* $HEADER$
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <private/autogen/config.h>
|
||||||
|
#include <pmix/rename.h>
|
||||||
|
#include <private/types.h>
|
||||||
|
#include <private/pmix_stdint.h>
|
||||||
|
#include <private/pmix_socket_errno.h>
|
||||||
|
|
||||||
|
#include <pmix_server.h>
|
||||||
|
#include "src/include/pmix_globals.h"
|
||||||
|
|
||||||
|
#ifdef HAVE_STRING_H
|
||||||
|
#include <string.h>
|
||||||
|
#endif
|
||||||
|
#include <fcntl.h>
|
||||||
|
#ifdef HAVE_UNISTD_H
|
||||||
|
#include <unistd.h>
|
||||||
|
#endif
|
||||||
|
#ifdef HAVE_SYS_SOCKET_H
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#endif
|
||||||
|
#ifdef HAVE_SYS_UN_H
|
||||||
|
#include <sys/un.h>
|
||||||
|
#endif
|
||||||
|
#ifdef HAVE_SYS_UIO_H
|
||||||
|
#include <sys/uio.h>
|
||||||
|
#endif
|
||||||
|
#ifdef HAVE_SYS_TYPES_H
|
||||||
|
#include <sys/types.h>
|
||||||
|
#endif
|
||||||
|
#include PMIX_EVENT_HEADER
|
||||||
|
|
||||||
|
#include "src/class/pmix_list.h"
|
||||||
|
#include "src/buffer_ops/buffer_ops.h"
|
||||||
|
#include "src/util/argv.h"
|
||||||
|
#include "src/util/error.h"
|
||||||
|
#include "src/util/output.h"
|
||||||
|
#include "src/util/pmix_environ.h"
|
||||||
|
#include "src/util/progress_threads.h"
|
||||||
|
#include "src/usock/usock.h"
|
||||||
|
#include "src/sec/pmix_sec.h"
|
||||||
|
|
||||||
|
#include "pmix_server_ops.h"
|
||||||
|
|
||||||
|
extern pmix_server_module_t pmix_host_server;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pmix_object_t super;
|
||||||
|
pmix_event_t ev;
|
||||||
|
pmix_status_t status;
|
||||||
|
const char *data;
|
||||||
|
size_t ndata;
|
||||||
|
pmix_dmdx_local_t *lcd;
|
||||||
|
pmix_release_cbfunc_t relcbfunc;
|
||||||
|
void *cbdata;
|
||||||
|
} pmix_dmdx_reply_caddy_t;
|
||||||
|
static void dcd_con(pmix_dmdx_reply_caddy_t *p)
|
||||||
|
{
|
||||||
|
p->status = PMIX_ERROR;
|
||||||
|
p->ndata = 0;
|
||||||
|
p->lcd = NULL;
|
||||||
|
p->relcbfunc = NULL;
|
||||||
|
p->cbdata = NULL;
|
||||||
|
}
|
||||||
|
PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
|
||||||
|
pmix_object_t, dcd_con, NULL);
|
||||||
|
|
||||||
|
|
||||||
|
static void dmdx_cbfunc(pmix_status_t status, const char *data,
|
||||||
|
size_t ndata, void *cbdata,
|
||||||
|
pmix_release_cbfunc_t relfn, void *relcbdata);
|
||||||
|
static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
|
||||||
|
pmix_modex_cbfunc_t cbfunc, void *cbdata);
|
||||||
|
static pmix_status_t create_local_tracker(char nspace[], int rank,
|
||||||
|
pmix_info_t info[], size_t ninfo,
|
||||||
|
pmix_modex_cbfunc_t cbfunc,
|
||||||
|
void *cbdata,
|
||||||
|
pmix_dmdx_local_t **lcd);
|
||||||
|
|
||||||
|
|
||||||
|
/* Release callback passed along with every modex blob we hand out:
 * cbdata is the heap buffer that was provided, and the only thing
 * to do once the recipient is finished with it is to free it */
static void relfn(void *cbdata)
{
    free(cbdata);
}
|
||||||
|
|
||||||
|
|
||||||
|
pmix_status_t pmix_server_get(pmix_buffer_t *buf,
|
||||||
|
pmix_modex_cbfunc_t cbfunc,
|
||||||
|
void *cbdata)
|
||||||
|
{
|
||||||
|
int32_t cnt;
|
||||||
|
pmix_status_t rc;
|
||||||
|
int rank;
|
||||||
|
char *cptr;
|
||||||
|
char nspace[PMIX_MAX_NSLEN+1];
|
||||||
|
pmix_nspace_t *ns, *nptr;
|
||||||
|
pmix_info_t *info=NULL;
|
||||||
|
size_t ninfo=0;
|
||||||
|
pmix_dmdx_local_t *lcd, *cd;
|
||||||
|
pmix_rank_info_t *iptr;
|
||||||
|
pmix_hash_table_t *ht;
|
||||||
|
bool local;
|
||||||
|
|
||||||
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
|
"recvd GET");
|
||||||
|
|
||||||
|
/* setup */
|
||||||
|
memset(nspace, 0, sizeof(nspace));
|
||||||
|
|
||||||
|
/* retrieve the nspace and rank of the requested proc */
|
||||||
|
cnt = 1;
|
||||||
|
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
|
||||||
|
PMIX_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
(void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
|
||||||
|
free(cptr);
|
||||||
|
cnt = 1;
|
||||||
|
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
|
||||||
|
PMIX_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
/* retrieve any provided info structs */
|
||||||
|
cnt = 1;
|
||||||
|
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
|
||||||
|
PMIX_ERROR_LOG(rc);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
if (0 < ninfo) {
|
||||||
|
PMIX_INFO_CREATE(info, ninfo);
|
||||||
|
cnt = ninfo;
|
||||||
|
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
|
||||||
|
PMIX_ERROR_LOG(rc);
|
||||||
|
PMIX_INFO_FREE(info, ninfo);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* find the nspace object for this client */
|
||||||
|
nptr = NULL;
|
||||||
|
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
|
||||||
|
if (0 == strcmp(nspace, ns->nspace)) {
|
||||||
|
nptr = ns;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
|
"%s:%d EXECUTE GET FOR %s:%d",
|
||||||
|
pmix_globals.myid.nspace,
|
||||||
|
pmix_globals.myid.rank, nspace, rank);
|
||||||
|
|
||||||
|
if (NULL == nptr || NULL == nptr->server) {
|
||||||
|
/* this is for an nspace we don't know about yet, so
|
||||||
|
* record the request for data from this process and
|
||||||
|
* give the host server a chance to tell us about it */
|
||||||
|
rc = create_local_tracker(nspace, rank, info, ninfo,
|
||||||
|
cbfunc, cbdata, &lcd);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* We have to wait for all local clients to be registered before
|
||||||
|
* we can know whether this request is for data from a local or a
|
||||||
|
* remote client because one client might ask for data about another
|
||||||
|
* client that the host RM hasn't told us about yet. Fortunately,
|
||||||
|
* we do know how many clients to expect, so first check to see if
|
||||||
|
* all clients have been registered with us */
|
||||||
|
if (!nptr->server->all_registered) {
|
||||||
|
/* we cannot do anything further, so just track this request
|
||||||
|
* for now */
|
||||||
|
rc = create_local_tracker(nspace, rank, info, ninfo,
|
||||||
|
cbfunc, cbdata, &lcd);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Since we know about all the local clients in this nspace,
|
||||||
|
* let's first try to satisfy the request with any available data.
|
||||||
|
* By default, we assume we are looking for data from a remote
|
||||||
|
* client, and then check to see if this is one of my local
|
||||||
|
* clients - if so, then we look in that hash table */
|
||||||
|
ht = &nptr->server->remote;
|
||||||
|
local = false;
|
||||||
|
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
|
||||||
|
if (iptr->rank == rank) {
|
||||||
|
/* it is known local client - check the local table */
|
||||||
|
ht = &nptr->server->mylocal;
|
||||||
|
local = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* see if we already have this data */
|
||||||
|
rc = _satisfy_request(ht, rank, cbfunc, cbdata);
|
||||||
|
if( PMIX_SUCCESS == rc ){
|
||||||
|
/* request was successfully satisfied */
|
||||||
|
PMIX_INFO_FREE(info, ninfo);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If we get here, then we don't have the data at this time. Check
|
||||||
|
* to see if we already have a pending request for the data - if
|
||||||
|
* we do, then we can just wait for it to arrive */
|
||||||
|
rc = create_local_tracker(nspace, rank, info, ninfo,
|
||||||
|
cbfunc, cbdata, &lcd);
|
||||||
|
if (PMIX_SUCCESS == rc) {
|
||||||
|
/* we are already waiting for the data - nothing more
|
||||||
|
* for us to do as the function added the new request
|
||||||
|
* to the tracker for us */
|
||||||
|
return PMIX_SUCCESS;
|
||||||
|
}
|
||||||
|
if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) {
|
||||||
|
/* we have a problem - e.g., out of memory */
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Getting here means that we didn't already have a request for
|
||||||
|
* for data pending, and so we created a new tracker for this
|
||||||
|
* request. We know the identity of all our local clients, so
|
||||||
|
* if this is one, then we have nothing further to do - we will
|
||||||
|
* fulfill the request once the process commits its data */
|
||||||
|
if (local) {
|
||||||
|
return PMIX_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* this isn't a local client of ours, so we need to ask the host
|
||||||
|
* resource manager server to please get the info for us from
|
||||||
|
* whomever is hosting the target process */
|
||||||
|
if (NULL != pmix_host_server.direct_modex) {
|
||||||
|
rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
|
||||||
|
} else {
|
||||||
|
/* if we don't have direct modex feature, just respond with "not found" */
|
||||||
|
cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
|
||||||
|
PMIX_INFO_FREE(info, ninfo);
|
||||||
|
pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
|
||||||
|
PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
|
||||||
|
PMIX_RELEASE(lcd);
|
||||||
|
rc = PMIX_ERR_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
static pmix_status_t create_local_tracker(char nspace[], int rank,
|
||||||
|
pmix_info_t info[], size_t ninfo,
|
||||||
|
pmix_modex_cbfunc_t cbfunc,
|
||||||
|
void *cbdata,
|
||||||
|
pmix_dmdx_local_t **ld)
|
||||||
|
{
|
||||||
|
pmix_dmdx_local_t *lcd, *cd;
|
||||||
|
pmix_dmdx_request_t *req;
|
||||||
|
pmix_status_t rc;
|
||||||
|
|
||||||
|
/* define default */
|
||||||
|
*ld = NULL;
|
||||||
|
|
||||||
|
/* see if we already have an existing request for data
|
||||||
|
* from this namespace/rank */
|
||||||
|
lcd = NULL;
|
||||||
|
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
|
||||||
|
if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
|
||||||
|
rank != cd->proc.rank ) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
lcd = cd;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (NULL != lcd) {
|
||||||
|
/* we already have a request, so just track that someone
|
||||||
|
* else wants data from the same target */
|
||||||
|
rc = PMIX_SUCCESS; // indicates we found an existing request
|
||||||
|
goto complete;
|
||||||
|
}
|
||||||
|
/* we do not have an existing request, so let's create
|
||||||
|
* one and add it to our list */
|
||||||
|
lcd = PMIX_NEW(pmix_dmdx_local_t);
|
||||||
|
if (NULL == lcd){
|
||||||
|
PMIX_INFO_FREE(info, ninfo);
|
||||||
|
return PMIX_ERR_NOMEM;
|
||||||
|
}
|
||||||
|
strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
|
||||||
|
lcd->proc.rank = rank;
|
||||||
|
lcd->info = info;
|
||||||
|
lcd->ninfo = ninfo;
|
||||||
|
pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
|
||||||
|
rc = PMIX_ERR_NOT_FOUND; // indicates that we created a new request tracker
|
||||||
|
|
||||||
|
complete:
|
||||||
|
/* track this specific requestor so we return the
|
||||||
|
* data to them */
|
||||||
|
req = PMIX_NEW(pmix_dmdx_request_t);
|
||||||
|
req->cbfunc = cbfunc;
|
||||||
|
req->cbdata = cbdata;
|
||||||
|
pmix_list_append(&lcd->loc_reqs, &req->super);
|
||||||
|
*ld = lcd;
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Revisit the requests that were queued for nptr's nspace.
 *
 * Walks pmix_server_globals.local_reqs and, for every tracker that
 * targets this nspace:
 *   - if the target rank is in nptr->server->ranks, the tracker is left
 *     in place - it will be satisfied when that proc commits its data;
 *   - otherwise the proc is remote and the host server is asked for
 *     the data via direct_modex. If the host has no direct_modex, or
 *     it rejects the request, every waiting requestor is told
 *     PMIX_ERR_NOT_FOUND and the tracker is removed, so nobody is left
 *     waiting for a reply that will never come.
 *
 * @param nptr  nspace whose pending requests are to be processed
 */
void pmix_pending_nspace_requests(pmix_nspace_t *nptr)
{
    pmix_dmdx_local_t *cd, *cd_next;

    /* Now that we know all local ranks, go along request list and ask for remote data
     * for the non-local ranks, and resolve all pending requests for local procs
     * that were waiting for registration to complete
     */
    PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
        pmix_rank_info_t *info;
        pmix_dmdx_request_t *req, *req_next;
        pmix_status_t rc;
        bool found = false;

        if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
            continue;
        }

        PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
            if (info->rank == cd->proc.rank) {
                found = true;  // we will satisfy this request upon commit from new proc
                break;
            }
        }
        if (found) {
            continue;
        }

        /* not found - this is a remote process and we need to send
         * the corresponding direct modex request */
        rc = PMIX_ERR_NOT_FOUND;
        if (NULL != pmix_host_server.direct_modex) {
            rc = pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
        }
        if (PMIX_SUCCESS == rc) {
            /* the host accepted the request - dmdx_cbfunc will be
             * called with the answer */
            continue;
        }

        /* nobody is going to deliver this data (assumes a failed
         * direct_modex call never invokes its callback - confirm
         * against the host server contract), so answer every waiting
         * requestor now and drop the tracker */
        PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
            req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
            pmix_list_remove_item(&cd->loc_reqs, &req->super);
            PMIX_RELEASE(req);
        }
        pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
        PMIX_RELEASE(cd);
    }
}
|
||||||
|
|
||||||
|
static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
|
||||||
|
pmix_modex_cbfunc_t cbfunc, void *cbdata)
|
||||||
|
{
|
||||||
|
pmix_status_t rc;
|
||||||
|
pmix_value_t *val;
|
||||||
|
char *data;
|
||||||
|
size_t sz;
|
||||||
|
pmix_buffer_t xfer, pbkt, *xptr;
|
||||||
|
|
||||||
|
/* check to see if this data already has been
|
||||||
|
* obtained as a result of a prior direct modex request from
|
||||||
|
* a remote peer, or due to data from a local client
|
||||||
|
* having been committed */
|
||||||
|
rc = pmix_hash_fetch(ht, rank, "modex", &val);
|
||||||
|
if (PMIX_SUCCESS == rc && NULL != val) {
|
||||||
|
/* the client is expecting this to arrive as a byte object
|
||||||
|
* containing a buffer, so package it accordingly */
|
||||||
|
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
|
||||||
|
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
|
||||||
|
xptr = &xfer;
|
||||||
|
PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
|
||||||
|
pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER);
|
||||||
|
xfer.base_ptr = NULL; // protect the passed data
|
||||||
|
xfer.bytes_used = 0;
|
||||||
|
PMIX_DESTRUCT(&xfer);
|
||||||
|
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
|
||||||
|
PMIX_DESTRUCT(&pbkt);
|
||||||
|
PMIX_VALUE_RELEASE(val);
|
||||||
|
/* pass it back */
|
||||||
|
cbfunc(rc, data, sz, cbdata, relfn, data);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
return PMIX_ERR_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Resolve pending requests to this namespace/rank */
|
||||||
|
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
|
||||||
|
pmix_status_t status, pmix_dmdx_local_t *lcd)
|
||||||
|
{
|
||||||
|
pmix_dmdx_local_t *cd;
|
||||||
|
|
||||||
|
/* find corresponding request (if exists) */
|
||||||
|
if (NULL == lcd && NULL != nptr) {
|
||||||
|
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
|
||||||
|
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
|
||||||
|
rank != cd->proc.rank) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
lcd = cd;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If somebody was interested in this rank */
|
||||||
|
if (NULL != lcd) {
|
||||||
|
pmix_dmdx_request_t *req;
|
||||||
|
|
||||||
|
if (PMIX_SUCCESS != status){
|
||||||
|
/* if we've got an error for this request - just forward it*/
|
||||||
|
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
|
||||||
|
/* if we can't satisfy this request - respond with error */
|
||||||
|
req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
|
||||||
|
}
|
||||||
|
} else if (NULL != nptr) {
|
||||||
|
/* if we've got the blob - try to satisfy requests */
|
||||||
|
pmix_hash_table_t *ht;
|
||||||
|
pmix_rank_info_t *iptr;
|
||||||
|
|
||||||
|
/* by default we are looking for the remote data */
|
||||||
|
ht = &nptr->server->remote;
|
||||||
|
/* check if this rank is local */
|
||||||
|
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
|
||||||
|
if (iptr->rank == rank) {
|
||||||
|
ht = &nptr->server->mylocal;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* run through all the requests to this rank */
|
||||||
|
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
|
||||||
|
pmix_status_t rc;
|
||||||
|
rc = _satisfy_request(ht, rank, req->cbfunc, req->cbdata);
|
||||||
|
if( PMIX_SUCCESS != rc ){
|
||||||
|
/* if we can't satisfy this particular request (missing key?) */
|
||||||
|
req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* remove all requests to this rank and cleanup the corresponding structure */
|
||||||
|
pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
|
||||||
|
PMIX_RELEASE(lcd);
|
||||||
|
}
|
||||||
|
return PMIX_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* process the returned data from the host RM server */
|
||||||
|
static void _process_dmdx_reply(int fd, short args, void *cbdata)
|
||||||
|
{
|
||||||
|
pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
|
||||||
|
pmix_kval_t *kp;
|
||||||
|
pmix_nspace_t *ns, *nptr;
|
||||||
|
pmix_status_t rc;
|
||||||
|
pmix_buffer_t xfer, pbkt, *xptr;
|
||||||
|
|
||||||
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
|
"[%s:%d] process dmdx reply from %s:%d",
|
||||||
|
__FILE__, __LINE__,
|
||||||
|
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
|
||||||
|
|
||||||
|
/* find the nspace object for this client */
|
||||||
|
nptr = NULL;
|
||||||
|
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
|
||||||
|
if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
|
||||||
|
nptr = ns;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == nptr) {
|
||||||
|
/* should be impossible */
|
||||||
|
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
|
||||||
|
caddy->status = PMIX_ERR_NOT_FOUND;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if the request was successfully satisfied, then store the data
|
||||||
|
* in our hash table for remote procs. Although we could immediately
|
||||||
|
* resolve any outstanding requests on our tracking list, we instead
|
||||||
|
* store the data first so we can immediately satisfy any future
|
||||||
|
* requests. Then, rather than duplicate the resolve code here, we
|
||||||
|
* will let the pmix_pending_resolve function go ahead and retrieve
|
||||||
|
* it from the hash table */
|
||||||
|
if (PMIX_SUCCESS == caddy->status) {
|
||||||
|
kp = PMIX_NEW(pmix_kval_t);
|
||||||
|
kp->key = strdup("modex");
|
||||||
|
PMIX_VALUE_CREATE(kp->value, 1);
|
||||||
|
kp->value->type = PMIX_BYTE_OBJECT;
|
||||||
|
/* we don't know if the host is going to save this data
|
||||||
|
* or not, so we have to copy it - the client is expecting
|
||||||
|
* this to arrive as a byte object containing a buffer, so
|
||||||
|
* package it accordingly */
|
||||||
|
kp->value->data.bo.bytes = malloc(caddy->ndata);
|
||||||
|
memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
|
||||||
|
kp->value->data.bo.size = caddy->ndata;
|
||||||
|
/* store it in the appropriate hash */
|
||||||
|
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
|
||||||
|
PMIX_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
PMIX_RELEASE(kp); // maintain acctg
|
||||||
|
}
|
||||||
|
|
||||||
|
/* always execute the callback to avoid having the client hang */
|
||||||
|
pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
/* now call the release function so the host server
|
||||||
|
* knows it can release the data */
|
||||||
|
if (NULL != caddy->relcbfunc) {
|
||||||
|
caddy->relcbfunc(caddy->cbdata);
|
||||||
|
}
|
||||||
|
PMIX_RELEASE(caddy);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* this is the callback function that the host RM server will call
|
||||||
|
* when it gets requested info back from a remote server */
|
||||||
|
static void dmdx_cbfunc(pmix_status_t status,
|
||||||
|
const char *data, size_t ndata, void *cbdata,
|
||||||
|
pmix_release_cbfunc_t release_fn, void *release_cbdata)
|
||||||
|
{
|
||||||
|
pmix_dmdx_reply_caddy_t *caddy;
|
||||||
|
|
||||||
|
/* because the host RM is calling us from their own thread, we
|
||||||
|
* need to thread-shift into our local progress thread before
|
||||||
|
* accessing any global info */
|
||||||
|
caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
|
||||||
|
caddy->status = status;
|
||||||
|
/* point to the callers cbfunc */
|
||||||
|
caddy->relcbfunc = release_fn;
|
||||||
|
caddy->cbdata = release_cbdata;
|
||||||
|
|
||||||
|
/* point to the returned data and our own internal
|
||||||
|
* tracker */
|
||||||
|
caddy->data = data;
|
||||||
|
caddy->ndata = ndata;
|
||||||
|
caddy->lcd = (pmix_dmdx_local_t *)cbdata;
|
||||||
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
|
"[%s:%d] queue dmdx reply for %s:%d",
|
||||||
|
__FILE__, __LINE__,
|
||||||
|
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
|
||||||
|
event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
|
||||||
|
_process_dmdx_reply, caddy);
|
||||||
|
event_priority_set(&caddy->ev, 0);
|
||||||
|
event_active(&caddy->ev, EV_WRITE, 1);
|
||||||
|
}
|
||||||
|
|
@ -58,246 +58,6 @@
|
|||||||
|
|
||||||
pmix_server_module_t pmix_host_server = {0};
|
pmix_server_module_t pmix_host_server = {0};
|
||||||
|
|
||||||
static void dmdx_cbfunc(pmix_status_t status, const char *data,
|
|
||||||
size_t ndata, void *cbdata,
|
|
||||||
pmix_release_cbfunc_t relfn, void *relcbdata);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
pmix_object_t super;
|
|
||||||
pmix_event_t ev;
|
|
||||||
pmix_status_t status;
|
|
||||||
const char *data;
|
|
||||||
size_t ndata;
|
|
||||||
pmix_dmdx_local_t *lcd;
|
|
||||||
pmix_release_cbfunc_t relcbfunc;
|
|
||||||
void *cbdata;
|
|
||||||
} pmix_dmdx_reply_caddy_t;
|
|
||||||
PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
|
|
||||||
pmix_object_t, NULL, NULL);
|
|
||||||
|
|
||||||
|
|
||||||
/* Release callback: cbdata is a heap buffer that is simply freed */
static void relfn(void *cbdata)
{
    free(cbdata);
}
|
|
||||||
|
|
||||||
static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
|
|
||||||
pmix_hash_table_t *ht,
|
|
||||||
pmix_modex_cbfunc_t cbfunc, void *cbdata)
|
|
||||||
{
|
|
||||||
pmix_status_t rc;
|
|
||||||
pmix_buffer_t pbkt, xfer;
|
|
||||||
pmix_value_t *val;
|
|
||||||
char *data;
|
|
||||||
size_t sz;
|
|
||||||
|
|
||||||
/* check to see if this data already has been
|
|
||||||
* obtained as a result of a prior direct modex request from
|
|
||||||
* another local peer */
|
|
||||||
rc = pmix_hash_fetch(ht, rank, "modex", &val);
|
|
||||||
if (PMIX_SUCCESS == rc && NULL != val) {
|
|
||||||
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
|
|
||||||
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
|
|
||||||
pmix_buffer_t *pxfer = &xfer;
|
|
||||||
PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
|
|
||||||
pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
|
|
||||||
xfer.base_ptr = NULL;
|
|
||||||
xfer.bytes_used = 0;
|
|
||||||
PMIX_DESTRUCT(&xfer);
|
|
||||||
PMIX_VALUE_RELEASE(val);
|
|
||||||
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
|
|
||||||
PMIX_DESTRUCT(&pbkt);
|
|
||||||
/* pass it back */
|
|
||||||
cbfunc(rc, data, sz, cbdata, relfn, data);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
return PMIX_ERR_NOT_FOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
|
|
||||||
pmix_info_t *info, size_t ninfo,
|
|
||||||
pmix_modex_cbfunc_t cbfunc, void *cbdata)
|
|
||||||
{
|
|
||||||
pmix_dmdx_local_t *lcd = NULL, *cd;
|
|
||||||
pmix_rank_info_t *iptr;
|
|
||||||
pmix_hash_table_t *ht;
|
|
||||||
pmix_status_t rc;
|
|
||||||
|
|
||||||
/* 1. Try to satisfy the request right now */
|
|
||||||
|
|
||||||
/* by default we are looking for the remote data */
|
|
||||||
ht = &nptr->server->remote;
|
|
||||||
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
|
|
||||||
if (iptr->rank == rank) {
|
|
||||||
/* in case it is known local rank - check local table */
|
|
||||||
ht = &nptr->server->mylocal;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata);
|
|
||||||
if( PMIX_SUCCESS == rc ){
|
|
||||||
/* request was successfully satisfied */
|
|
||||||
PMIX_INFO_FREE(info, ninfo);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* 2. We were unable to satisfy request right now.
|
|
||||||
* Look for existing requests to this namespace/rank */
|
|
||||||
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
|
|
||||||
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
|
|
||||||
rank != cd->proc.rank ) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
lcd = cd;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* 3. If no requests exists then:
|
|
||||||
* - if all local clients are registered then we were called because
|
|
||||||
* the remote data was requested. Create request and call direct modex
|
|
||||||
* to retrieve the data
|
|
||||||
* - if not all local ranks were registered, we need to wait untill
|
|
||||||
* pmix_pending_localy_fin would be called to resolve this. Just add the
|
|
||||||
* request for now.
|
|
||||||
*/
|
|
||||||
if (NULL == lcd) {
|
|
||||||
lcd = PMIX_NEW(pmix_dmdx_local_t);
|
|
||||||
if (NULL == lcd){
|
|
||||||
PMIX_INFO_FREE(info, ninfo);
|
|
||||||
return PMIX_ERR_NOMEM;
|
|
||||||
}
|
|
||||||
strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
|
|
||||||
lcd->proc.rank = rank;
|
|
||||||
lcd->info = info;
|
|
||||||
lcd->ninfo = ninfo;
|
|
||||||
pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
|
|
||||||
|
|
||||||
/* check & send request if need/possible */
|
|
||||||
if (nptr->server->all_registered && NULL == info) {
|
|
||||||
if (NULL != pmix_host_server.direct_modex) {
|
|
||||||
pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
|
|
||||||
} else {
|
|
||||||
/* if we don't have direct modex feature, just respond with "not found" */
|
|
||||||
cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
|
|
||||||
PMIX_INFO_FREE(info, ninfo);
|
|
||||||
pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
|
|
||||||
PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
|
|
||||||
PMIX_RELEASE(lcd);
|
|
||||||
return PMIX_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t);
|
|
||||||
req->cbfunc = cbfunc;
|
|
||||||
req->cbdata = cbdata;
|
|
||||||
pmix_list_append(&lcd->loc_reqs, &req->super);
|
|
||||||
return PMIX_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void pmix_pending_nspace_fix(pmix_nspace_t *nptr)
|
|
||||||
{
|
|
||||||
pmix_dmdx_local_t *cd, *cd_next;
|
|
||||||
|
|
||||||
/* Now when we know all local ranks, go along request list and ask for remote data
|
|
||||||
* for the non-local ranks, and resolve all pending requests for local procs
|
|
||||||
* that were waiting for registration to complete
|
|
||||||
*/
|
|
||||||
PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
|
|
||||||
pmix_rank_info_t *info;
|
|
||||||
bool found = false;
|
|
||||||
|
|
||||||
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
|
|
||||||
if (info->rank == cd->proc.rank) {
|
|
||||||
found = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if not found - this is remote process and we need to send
|
|
||||||
* corresponding direct modex request */
|
|
||||||
if( !found ){
|
|
||||||
if( NULL != pmix_host_server.direct_modex ){
|
|
||||||
pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
|
|
||||||
} else {
|
|
||||||
pmix_dmdx_request_t *req, *req_next;
|
|
||||||
PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
|
|
||||||
req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
|
|
||||||
pmix_list_remove_item(&cd->loc_reqs, &req->super);
|
|
||||||
PMIX_RELEASE(req);
|
|
||||||
}
|
|
||||||
pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
|
|
||||||
PMIX_RELEASE(cd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Resolve pending requests to this namespace/rank */
|
|
||||||
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
|
|
||||||
pmix_status_t status, pmix_dmdx_local_t *lcd)
|
|
||||||
{
|
|
||||||
pmix_dmdx_local_t *cd;
|
|
||||||
|
|
||||||
/* find corresponding request (if exists) */
|
|
||||||
if( NULL == lcd ){
|
|
||||||
PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
|
|
||||||
if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
|
|
||||||
rank != cd->proc.rank) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
lcd = cd;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If somebody was interested in this rank */
|
|
||||||
if( NULL != lcd ){
|
|
||||||
pmix_dmdx_request_t *req;
|
|
||||||
|
|
||||||
if (PMIX_SUCCESS != status){
|
|
||||||
/* if we've got an error for this request - just forward it*/
|
|
||||||
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
|
|
||||||
/* if we can't satisfy this request - respond with error */
|
|
||||||
req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* if we've got the blob - try to satisfy requests */
|
|
||||||
pmix_hash_table_t *ht;
|
|
||||||
pmix_rank_info_t *iptr;
|
|
||||||
|
|
||||||
/* by default we are looking for the remote data */
|
|
||||||
ht = &nptr->server->remote;
|
|
||||||
/* check if this rank is local */
|
|
||||||
PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
|
|
||||||
if (iptr->rank == rank) {
|
|
||||||
ht = &nptr->server->mylocal;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* run through all the requests to this rank */
|
|
||||||
PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
|
|
||||||
pmix_status_t rc;
|
|
||||||
rc = _satisfy_request(nptr, rank, ht, req->cbfunc, req->cbdata);
|
|
||||||
if( PMIX_SUCCESS != rc ){
|
|
||||||
/* if we can't satisfy this particular request (missing key?) */
|
|
||||||
req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* remove all requests to this rank and cleanup the corresponding structure */
|
|
||||||
pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
|
|
||||||
PMIX_RELEASE(lcd);
|
|
||||||
}
|
|
||||||
return PMIX_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
|
pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
|
||||||
pmix_op_cbfunc_t cbfunc, void *cbdata)
|
pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||||
{
|
{
|
||||||
@ -436,13 +196,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
|
|||||||
* may not be a contribution */
|
* may not be a contribution */
|
||||||
if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote, info->rank, "modex", &val) &&
|
if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote, info->rank, "modex", &val) &&
|
||||||
NULL != val) {
|
NULL != val) {
|
||||||
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
|
PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes, val->data.bo.size);
|
||||||
PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
|
|
||||||
pmix_buffer_t *pxfer = &xfer;
|
|
||||||
pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
|
|
||||||
xfer.base_ptr = NULL;
|
|
||||||
xfer.bytes_used = 0;
|
|
||||||
PMIX_DESTRUCT(&xfer);
|
|
||||||
PMIX_VALUE_RELEASE(val);
|
PMIX_VALUE_RELEASE(val);
|
||||||
}
|
}
|
||||||
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
|
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
|
||||||
@ -457,7 +211,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
|
|||||||
PMIX_RELEASE(dcd);
|
PMIX_RELEASE(dcd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* see if anyone local is waiting on this data- could be more than one */
|
/* see if anyone local is waiting on this data - could be more than one */
|
||||||
return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL);
|
return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -826,163 +580,6 @@ pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd,
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _process_dmdx_reply(int fd, short args, void *cbdata)
|
|
||||||
{
|
|
||||||
pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
|
|
||||||
pmix_kval_t *kp;
|
|
||||||
pmix_nspace_t *ns, *nptr;
|
|
||||||
pmix_status_t rc;
|
|
||||||
|
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
|
||||||
"[%s:%d] queue dmdx reply from %s:%d",
|
|
||||||
__FILE__, __LINE__,
|
|
||||||
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
|
|
||||||
|
|
||||||
/* find the nspace object for this client */
|
|
||||||
nptr = NULL;
|
|
||||||
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
|
|
||||||
if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
|
|
||||||
nptr = ns;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == nptr) {
|
|
||||||
/* should be impossible */
|
|
||||||
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
|
|
||||||
caddy->status = PMIX_ERR_NOT_FOUND;
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (PMIX_SUCCESS == caddy->status) {
|
|
||||||
kp = PMIX_NEW(pmix_kval_t);
|
|
||||||
kp->key = strdup("modex");
|
|
||||||
PMIX_VALUE_CREATE(kp->value, 1);
|
|
||||||
kp->value->type = PMIX_BYTE_OBJECT;
|
|
||||||
/* we don't know if the host is going to save this data
|
|
||||||
* or not, so we have to copy it */
|
|
||||||
kp->value->data.bo.bytes = (char*)malloc(caddy->ndata);
|
|
||||||
memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
|
|
||||||
kp->value->data.bo.size = caddy->ndata;
|
|
||||||
/* store it in the appropriate hash */
|
|
||||||
if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
|
|
||||||
PMIX_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
PMIX_RELEASE(kp); // maintain acctg
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup:
|
|
||||||
/* always execute the callback to avoid having the client hang */
|
|
||||||
pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
|
|
||||||
|
|
||||||
/* now call the release function so the host server
|
|
||||||
* knows it can release the data */
|
|
||||||
if (NULL != caddy->relcbfunc) {
|
|
||||||
caddy->relcbfunc(caddy->cbdata);
|
|
||||||
}
|
|
||||||
PMIX_RELEASE(caddy);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmdx_cbfunc(pmix_status_t status,
|
|
||||||
const char *data, size_t ndata, void *cbdata,
|
|
||||||
pmix_release_cbfunc_t release_fn, void *release_cbdata)
|
|
||||||
{
|
|
||||||
pmix_dmdx_reply_caddy_t *caddy;
|
|
||||||
caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
|
|
||||||
caddy->status = status;
|
|
||||||
/* point to the callers cbfunc */
|
|
||||||
caddy->relcbfunc = release_fn;
|
|
||||||
caddy->cbdata = release_cbdata;
|
|
||||||
|
|
||||||
caddy->data = data;
|
|
||||||
caddy->ndata = ndata;
|
|
||||||
caddy->lcd = (pmix_dmdx_local_t *)cbdata;
|
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] queue dmdx reply %s:%d",
|
|
||||||
__FILE__, __LINE__,
|
|
||||||
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
|
|
||||||
event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
|
|
||||||
_process_dmdx_reply, caddy);
|
|
||||||
event_priority_set(&caddy->ev, 0);
|
|
||||||
event_active(&caddy->ev, EV_WRITE, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
pmix_status_t pmix_server_get(pmix_buffer_t *buf,
|
|
||||||
pmix_modex_cbfunc_t cbfunc,
|
|
||||||
void *cbdata)
|
|
||||||
{
|
|
||||||
int32_t cnt;
|
|
||||||
pmix_status_t rc;
|
|
||||||
int rank;
|
|
||||||
char *cptr;
|
|
||||||
char nspace[PMIX_MAX_NSLEN+1];
|
|
||||||
pmix_nspace_t *ns, *nptr;
|
|
||||||
pmix_info_t *info=NULL;
|
|
||||||
size_t ninfo=0;
|
|
||||||
|
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
|
||||||
"recvd GET");
|
|
||||||
|
|
||||||
/* setup */
|
|
||||||
memset(nspace, 0, sizeof(nspace));
|
|
||||||
|
|
||||||
/* retrieve the nspace and rank of the requested proc */
|
|
||||||
cnt = 1;
|
|
||||||
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
|
|
||||||
PMIX_ERROR_LOG(rc);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
(void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
|
|
||||||
free(cptr);
|
|
||||||
cnt = 1;
|
|
||||||
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
|
|
||||||
PMIX_ERROR_LOG(rc);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
/* find the nspace object for this client */
|
|
||||||
nptr = NULL;
|
|
||||||
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
|
|
||||||
if (0 == strcmp(nspace, ns->nspace)) {
|
|
||||||
nptr = ns;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
|
||||||
"%s:%d EXECUTE GET FOR %s:%d",
|
|
||||||
pmix_globals.myid.nspace,
|
|
||||||
pmix_globals.myid.rank, nspace, rank);
|
|
||||||
|
|
||||||
/* retrieve any provided info structs */
|
|
||||||
cnt = 1;
|
|
||||||
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
|
|
||||||
PMIX_ERROR_LOG(rc);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
if (0 < ninfo) {
|
|
||||||
PMIX_INFO_CREATE(info, ninfo);
|
|
||||||
cnt = ninfo;
|
|
||||||
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
|
|
||||||
PMIX_ERROR_LOG(rc);
|
|
||||||
PMIX_INFO_FREE(info, ninfo);
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == nptr) {
|
|
||||||
/* this is for an nspace we don't know about yet, so
|
|
||||||
* give the host server a chance to tell us about it */
|
|
||||||
nptr = PMIX_NEW(pmix_nspace_t);
|
|
||||||
(void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN);
|
|
||||||
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
|
|
||||||
}
|
|
||||||
/* if we don't have any ranks for this job, protect ourselves here */
|
|
||||||
if (NULL == nptr->server) {
|
|
||||||
nptr->server = PMIX_NEW(pmix_server_nspace_t);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pmix_pending_request(nptr, rank, info, ninfo, cbfunc, cbdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
pmix_status_t pmix_server_publish(pmix_peer_t *peer,
|
pmix_status_t pmix_server_publish(pmix_peer_t *peer,
|
||||||
pmix_buffer_t *buf,
|
pmix_buffer_t *buf,
|
||||||
pmix_op_cbfunc_t cbfunc, void *cbdata)
|
pmix_op_cbfunc_t cbfunc, void *cbdata)
|
||||||
|
@ -183,10 +183,7 @@ void pmix_stop_listening(void);
|
|||||||
|
|
||||||
bool pmix_server_trk_update(pmix_server_trkr_t *trk);
|
bool pmix_server_trk_update(pmix_server_trkr_t *trk);
|
||||||
|
|
||||||
pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
|
void pmix_pending_nspace_requests(pmix_nspace_t *nptr);
|
||||||
pmix_info_t *info, size_t ninfo,
|
|
||||||
pmix_modex_cbfunc_t cbfunc, void *cbdata);
|
|
||||||
void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
|
|
||||||
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
|
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
|
||||||
pmix_status_t status, pmix_dmdx_local_t *lcd);
|
pmix_status_t status, pmix_dmdx_local_t *lcd);
|
||||||
|
|
||||||
|
@ -123,6 +123,8 @@ const char* PMIx_Error_string(pmix_status_t errnum)
|
|||||||
case PMIX_EXISTS:
|
case PMIX_EXISTS:
|
||||||
return "EXISTS";
|
return "EXISTS";
|
||||||
|
|
||||||
|
case PMIX_ERR_SILENT:
|
||||||
|
return "SILENT";
|
||||||
case PMIX_ERROR:
|
case PMIX_ERROR:
|
||||||
return "ERROR";
|
return "ERROR";
|
||||||
case PMIX_SUCCESS:
|
case PMIX_SUCCESS:
|
||||||
|
@ -28,9 +28,13 @@
|
|||||||
|
|
||||||
BEGIN_C_DECLS
|
BEGIN_C_DECLS
|
||||||
|
|
||||||
#define PMIX_ERROR_LOG(r) \
|
#define PMIX_ERROR_LOG(r) \
|
||||||
pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
|
do { \
|
||||||
PMIx_Error_string((r)), __FILE__, __LINE__);
|
if (PMIX_ERR_SILENT != (r)) { \
|
||||||
|
pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
|
||||||
|
PMIx_Error_string((r)), __FILE__, __LINE__); \
|
||||||
|
} \
|
||||||
|
}while(0);
|
||||||
|
|
||||||
#define PMIX_REPORT_ERROR(e) \
|
#define PMIX_REPORT_ERROR(e) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -217,6 +217,7 @@ int pmix1_store_local(const opal_process_name_t *proc, opal_value_t *val)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (NULL == job) {
|
if (NULL == job) {
|
||||||
|
OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
|
||||||
return OPAL_ERR_NOT_FOUND;
|
return OPAL_ERR_NOT_FOUND;
|
||||||
}
|
}
|
||||||
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
|
||||||
|
@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
|
|||||||
}
|
}
|
||||||
|
|
||||||
int pmix1_server_register_nspace(opal_jobid_t jobid,
|
int pmix1_server_register_nspace(opal_jobid_t jobid,
|
||||||
int nlocalprocs,
|
int nlocalprocs,
|
||||||
opal_list_t *info,
|
opal_list_t *info,
|
||||||
opal_pmix_op_cbfunc_t cbfunc,
|
opal_pmix_op_cbfunc_t cbfunc,
|
||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
opal_value_t *kv, *k2;
|
opal_value_t *kv, *k2;
|
||||||
pmix_info_t *pinfo, *pmap;
|
pmix_info_t *pinfo, *pmap;
|
||||||
@ -168,10 +168,17 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
|
|||||||
pmix_status_t rc;
|
pmix_status_t rc;
|
||||||
pmix1_opcaddy_t *op;
|
pmix1_opcaddy_t *op;
|
||||||
opal_list_t *pmapinfo;
|
opal_list_t *pmapinfo;
|
||||||
|
opal_pmix1_jobid_trkr_t *job;
|
||||||
|
|
||||||
/* convert the jobid */
|
/* convert the jobid */
|
||||||
(void)snprintf(nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(jobid));
|
(void)snprintf(nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(jobid));
|
||||||
|
|
||||||
|
/* store this job in our list of known nspaces */
|
||||||
|
job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
|
||||||
|
(void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
|
||||||
|
job->jobid = jobid;
|
||||||
|
opal_list_append(&mca_pmix_pmix1xx_component.jobids, &job->super);
|
||||||
|
|
||||||
/* convert the list to an array of pmix_info_t */
|
/* convert the list to an array of pmix_info_t */
|
||||||
if (NULL != info) {
|
if (NULL != info) {
|
||||||
sz = opal_list_get_size(info);
|
sz = opal_list_get_size(info);
|
||||||
@ -220,10 +227,10 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
|
|||||||
|
|
||||||
|
|
||||||
int pmix1_server_register_client(const opal_process_name_t *proc,
|
int pmix1_server_register_client(const opal_process_name_t *proc,
|
||||||
uid_t uid, gid_t gid,
|
uid_t uid, gid_t gid,
|
||||||
void *server_object,
|
void *server_object,
|
||||||
opal_pmix_op_cbfunc_t cbfunc,
|
opal_pmix_op_cbfunc_t cbfunc,
|
||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
pmix_status_t rc;
|
pmix_status_t rc;
|
||||||
pmix1_opcaddy_t *op;
|
pmix1_opcaddy_t *op;
|
||||||
@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbd
|
|||||||
}
|
}
|
||||||
|
|
||||||
int pmix1_server_dmodex(const opal_process_name_t *proc,
|
int pmix1_server_dmodex(const opal_process_name_t *proc,
|
||||||
opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
|
opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
|
||||||
{
|
{
|
||||||
pmix1_opcaddy_t *op;
|
pmix1_opcaddy_t *op;
|
||||||
pmix_status_t rc;
|
pmix_status_t rc;
|
||||||
|
@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
|
|||||||
int rc, ret, room_num;
|
int rc, ret, room_num;
|
||||||
int32_t cnt;
|
int32_t cnt;
|
||||||
opal_process_name_t target;
|
opal_process_name_t target;
|
||||||
opal_value_t kv;
|
|
||||||
pmix_server_req_t *req;
|
pmix_server_req_t *req;
|
||||||
uint8_t *data = NULL;
|
uint8_t *data = NULL;
|
||||||
int32_t ndata = 0;
|
int32_t ndata = 0;
|
||||||
@ -542,29 +541,14 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we got something, store the blobs locally so we can
|
|
||||||
* meet any further requests without doing a remote fetch.
|
|
||||||
* This must be done as a single blob for later retrieval */
|
|
||||||
if (ORTE_SUCCESS == ret && NULL != data) {
|
|
||||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
|
||||||
kv.key = strdup("modex");
|
|
||||||
kv.type = OPAL_BYTE_OBJECT;
|
|
||||||
kv.data.bo.bytes = data;
|
|
||||||
kv.data.bo.size = ndata;
|
|
||||||
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&target, &kv))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
kv.data.bo.bytes = NULL; // protect the data
|
|
||||||
kv.data.bo.size = 0;
|
|
||||||
OBJ_DESTRUCT(&kv);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* check the request out of the tracking hotel */
|
/* check the request out of the tracking hotel */
|
||||||
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
|
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
|
||||||
/* return the returned data to the requestor */
|
/* return the returned data to the requestor */
|
||||||
if (NULL != req) {
|
if (NULL != req) {
|
||||||
if (NULL != req->mdxcbfunc) {
|
if (NULL != req->mdxcbfunc) {
|
||||||
req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata, relcbfunc, data);
|
req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata, relcbfunc, data);
|
||||||
|
} else {
|
||||||
|
free(data);
|
||||||
}
|
}
|
||||||
OBJ_RELEASE(req);
|
OBJ_RELEASE(req);
|
||||||
}
|
}
|
||||||
|
@ -197,6 +197,12 @@ static void dmodex_req(int sd, short args, void *cbdata)
|
|||||||
goto callback;
|
goto callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* if we are the host daemon, then this is a local request, so
|
||||||
|
* just wait for the data to come in */
|
||||||
|
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* construct a request message */
|
/* construct a request message */
|
||||||
buf = OBJ_NEW(opal_buffer_t);
|
buf = OBJ_NEW(opal_buffer_t);
|
||||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user