From 267ca8fcd3a59b780491d80d29e870061d8dac56 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 27 Oct 2015 11:01:49 -0700 Subject: [PATCH] Cleanup the PMIx direct modex support. Add an MCA parameter pmix_base_async_modex that will cause the async modex to be used when set to 1. Default it to 0 for now to continue current default behavior. Also add an MCA param pmix_base_collect_data to direct that the blocking fence shall return all data to each process. Obviously, this param has no effect if async_modex is used. --- ompi/runtime/ompi_mpi_init.c | 7 +- opal/mca/pmix/base/pmix_base_frame.c | 11 +- opal/mca/pmix/pmix.h | 13 +- .../pmix/include/pmix/pmix_common.h.in | 3 +- .../pmix1xx/pmix/src/client/pmix_client_get.c | 1 + .../pmix/pmix1xx/pmix/src/server/Makefile.am | 3 +- .../pmix1xx/pmix/src/server/pmix_server.c | 11 +- .../pmix1xx/pmix/src/server/pmix_server_get.c | 552 ++++++++++++++++++ .../pmix1xx/pmix/src/server/pmix_server_ops.c | 407 +------------ .../pmix1xx/pmix/src/server/pmix_server_ops.h | 5 +- opal/mca/pmix/pmix1xx/pmix/src/util/error.c | 2 + opal/mca/pmix/pmix1xx/pmix/src/util/error.h | 10 +- opal/mca/pmix/pmix1xx/pmix1_client.c | 1 + opal/mca/pmix/pmix1xx/pmix1_server_south.c | 25 +- orte/orted/pmix/pmix_server.c | 20 +- orte/orted/pmix/pmix_server_fence.c | 6 + 16 files changed, 620 insertions(+), 457 deletions(-) create mode 100644 opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index 4c0391d970..d0eebb29df 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) /* exchange connection info - this function may also act as a barrier * if data exchange is required. The modex occurs solely across procs - * in our job, so no proc array is passed. 
If a barrier is required, - * the "modex" function will perform it internally - */ - OPAL_MODEX(NULL, 1); + * in our job. If a barrier is required, the "modex" function will + * perform it internally */ + OPAL_MODEX(); OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier")); diff --git a/opal/mca/pmix/base/pmix_base_frame.c b/opal/mca/pmix/base/pmix_base_frame.c index e1ab7666e1..6e8a347f10 100644 --- a/opal/mca/pmix/base/pmix_base_frame.c +++ b/opal/mca/pmix/base/pmix_base_frame.c @@ -31,12 +31,21 @@ /* Note that this initializer is important -- do not remove it! See https://github.com/open-mpi/ompi/issues/375 for details. */ opal_pmix_base_module_t opal_pmix = { 0 }; -bool opal_pmix_collect_all_data = false; +bool opal_pmix_collect_all_data = true; bool opal_pmix_base_allow_delayed_server = false; int opal_pmix_verbose_output = -1; +bool opal_pmix_base_async_modex = false; static int opal_pmix_base_frame_register(mca_base_register_flag_t flags) { + opal_pmix_base_async_modex = false; + (void) mca_base_var_register("opal", "pmix", "base", "async_modex", "Use asynchronous modex mode", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex); + opal_pmix_collect_all_data = true; + (void) mca_base_var_register("opal", "pmix", "base", "collect_data", "Collect all data during modex", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data); return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h index f265e0157f..722352980d 100644 --- a/opal/mca/pmix/pmix.h +++ b/opal/mca/pmix/pmix.h @@ -36,6 +36,8 @@ BEGIN_C_DECLS /* provide access to the framework verbose output without * exposing the entire base */ extern int opal_pmix_verbose_output; +extern bool opal_pmix_collect_all_data; +extern bool opal_pmix_base_async_modex; extern int opal_pmix_base_exchange(opal_value_t *info, opal_pmix_pdata_t *pdat, int timeout); @@ -254,10 
+256,13 @@ extern int opal_pmix_base_exchange(opal_value_t *info, * that takes into account directives and availability of * non-blocking operations */ -#define OPAL_MODEX(p, s) \ - do { \ - opal_pmix.commit(); \ - opal_pmix.fence((p), (s)); \ +#define OPAL_MODEX() \ + do { \ + opal_pmix.commit(); \ + if (!opal_pmix_base_async_modex) { \ + opal_pmix.fence(NULL, \ + opal_pmix_collect_all_data); \ + } \ } while(0); /** diff --git a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in index 0216e34aa2..5a111a1c40 100644 --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in @@ -183,7 +183,7 @@ BEGIN_C_DECLS /**** PMIX ERROR CONSTANTS ****/ /* PMIx errors are always negative, with 0 reserved for success */ -#define PMIX_ERROR_MIN -41 // set equal to number of non-zero entries in enum +#define PMIX_ERROR_MIN -42 // set equal to number of non-zero entries in enum typedef enum { PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN, @@ -230,6 +230,7 @@ typedef enum { PMIX_ERR_INVALID_CRED, PMIX_EXISTS, + PMIX_ERR_SILENT, PMIX_ERROR, PMIX_SUCCESS } pmix_status_t; diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c index d41be9cbe3..b93ca6d77b 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c @@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, PMIX_RELEASE(bptr); // free's the data region if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { PMIX_ERROR_LOG(rc); + rc = PMIX_ERR_SILENT; // avoid error-logging twice break; } } diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am index 5422b780dc..88b0468e47 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am +++ 
b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am @@ -16,4 +16,5 @@ sources += \ src/server/pmix_server.c \ src/server/pmix_server_ops.c \ src/server/pmix_server_regex.c \ - src/server/pmix_server_listener.c + src/server/pmix_server_listener.c \ + src/server/pmix_server_get.c diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c index d16ae16212..85f9e17b54 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c @@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void *cbdata) pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata; pmix_usock_send_t *snd; pmix_output_verbose(2, pmix_globals.debug_output, - "[%s:%d] queue callback called: reply to %s:%d on tag %d," - "event_is_active=%d", + "[%s:%d] queue callback called: reply to %s:%d on tag %d", __FILE__, __LINE__, (queue->peer)->info->nptr->nspace, (queue->peer)->info->rank, (queue->tag), @@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void *cbdata) queue->buf = (b); \ queue->tag = (t); \ pmix_output_verbose(2, pmix_globals.debug_output, \ - "[%s:%d] queue reply to %s:%d on tag %d," \ - "event_is_active=%d", \ + "[%s:%d] queue reply to %s:%d on tag %d", \ __FILE__, __LINE__, \ (queue->peer)->info->nptr->nspace, \ - (queue->peer)->info->rank, (queue->tag), \ - (queue->peer)->send_ev_active); \ + (queue->peer)->info->rank, (queue->tag)); \ event_assign(&queue->ev, pmix_globals.evbase, -1, \ EV_WRITE, _queue_message, queue); \ event_priority_set(&queue->ev, 0); \ @@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void *cbdata) * someone has been waiting for a request on a remote proc * in one of our nspaces, but we didn't know all the local procs * and so couldn't determine the proc was remote */ - pmix_pending_nspace_fix(nptr); + pmix_pending_nspace_requests(nptr); } /* let the caller know we are done */ if (NULL != 
cd->opcbfunc) { diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c new file mode 100644 index 0000000000..2cb75cfd4f --- /dev/null +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c @@ -0,0 +1,552 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2014-2015 Artem Y. Polyakov . + * All rights reserved. + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include +#include +#include +#include +#include + +#include +#include "src/include/pmix_globals.h" + +#ifdef HAVE_STRING_H +#include +#endif +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#ifdef HAVE_SYS_UN_H +#include +#endif +#ifdef HAVE_SYS_UIO_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include PMIX_EVENT_HEADER + +#include "src/class/pmix_list.h" +#include "src/buffer_ops/buffer_ops.h" +#include "src/util/argv.h" +#include "src/util/error.h" +#include "src/util/output.h" +#include "src/util/pmix_environ.h" +#include "src/util/progress_threads.h" +#include "src/usock/usock.h" +#include "src/sec/pmix_sec.h" + +#include "pmix_server_ops.h" + +extern pmix_server_module_t pmix_host_server; + +typedef struct { + pmix_object_t super; + pmix_event_t ev; + pmix_status_t status; + const char *data; + size_t ndata; + pmix_dmdx_local_t *lcd; + pmix_release_cbfunc_t relcbfunc; + void *cbdata; +} pmix_dmdx_reply_caddy_t; +static void dcd_con(pmix_dmdx_reply_caddy_t *p) +{ + p->status = PMIX_ERROR; + p->ndata = 0; + p->lcd = NULL; + p->relcbfunc = NULL; + p->cbdata = NULL; +} +PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t, + pmix_object_t, dcd_con, 
NULL); + + +static void dmdx_cbfunc(pmix_status_t status, const char *data, + size_t ndata, void *cbdata, + pmix_release_cbfunc_t relfn, void *relcbdata); +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, + pmix_modex_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t create_local_tracker(char nspace[], int rank, + pmix_info_t info[], size_t ninfo, + pmix_modex_cbfunc_t cbfunc, + void *cbdata, + pmix_dmdx_local_t **lcd); + + +/* declare a function whose sole purpose is to + * free data that we provided to our host server + * when servicing dmodex requests */ +static void relfn(void *cbdata) +{ + char *data = (char*)cbdata; + free(data); +} + + +pmix_status_t pmix_server_get(pmix_buffer_t *buf, + pmix_modex_cbfunc_t cbfunc, + void *cbdata) +{ + int32_t cnt; + pmix_status_t rc; + int rank; + char *cptr; + char nspace[PMIX_MAX_NSLEN+1]; + pmix_nspace_t *ns, *nptr; + pmix_info_t *info=NULL; + size_t ninfo=0; + pmix_dmdx_local_t *lcd, *cd; + pmix_rank_info_t *iptr; + pmix_hash_table_t *ht; + bool local; + + pmix_output_verbose(2, pmix_globals.debug_output, + "recvd GET"); + + /* setup */ + memset(nspace, 0, sizeof(nspace)); + + /* retrieve the nspace and rank of the requested proc */ + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) { + PMIX_ERROR_LOG(rc); + return rc; + } + (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN); + free(cptr); + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) { + PMIX_ERROR_LOG(rc); + return rc; + } + /* retrieve any provided info structs */ + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) { + PMIX_ERROR_LOG(rc); + return rc; + } + if (0 < ninfo) { + PMIX_INFO_CREATE(info, ninfo); + cnt = ninfo; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) { + PMIX_ERROR_LOG(rc); + PMIX_INFO_FREE(info, ninfo); + return rc; + } + } + + /* find the nspace object for this client */ + nptr = NULL; 
+ PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { + if (0 == strcmp(nspace, ns->nspace)) { + nptr = ns; + break; + } + } + + pmix_output_verbose(2, pmix_globals.debug_output, + "%s:%d EXECUTE GET FOR %s:%d", + pmix_globals.myid.nspace, + pmix_globals.myid.rank, nspace, rank); + + if (NULL == nptr || NULL == nptr->server) { + /* this is for an nspace we don't know about yet, so + * record the request for data from this process and + * give the host server a chance to tell us about it */ + rc = create_local_tracker(nspace, rank, info, ninfo, + cbfunc, cbdata, &lcd); + return rc; + } + + /* We have to wait for all local clients to be registered before + * we can know whether this request is for data from a local or a + * remote client because one client might ask for data about another + * client that the host RM hasn't told us about yet. Fortunately, + * we do know how many clients to expect, so first check to see if + * all clients have been registered with us */ + if (!nptr->server->all_registered) { + /* we cannot do anything further, so just track this request + * for now */ + rc = create_local_tracker(nspace, rank, info, ninfo, + cbfunc, cbdata, &lcd); + return rc; + } + + /* Since we know about all the local clients in this nspace, + * let's first try to satisfy the request with any available data. 
+ * By default, we assume we are looking for data from a remote + * client, and then check to see if this is one of my local + * clients - if so, then we look in that hash table */ + ht = &nptr->server->remote; + local = false; + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { + if (iptr->rank == rank) { + /* it is a known local client - check the local table */ + ht = &nptr->server->mylocal; + local = true; + break; + } + } + + /* see if we already have this data */ + rc = _satisfy_request(ht, rank, cbfunc, cbdata); + if( PMIX_SUCCESS == rc ){ + /* request was successfully satisfied */ + PMIX_INFO_FREE(info, ninfo); + return rc; + } + + /* If we get here, then we don't have the data at this time. Check + * to see if we already have a pending request for the data - if + * we do, then we can just wait for it to arrive */ + rc = create_local_tracker(nspace, rank, info, ninfo, + cbfunc, cbdata, &lcd); + if (PMIX_SUCCESS == rc) { + /* we are already waiting for the data - nothing more + * for us to do as the function added the new request + * to the tracker for us */ + return PMIX_SUCCESS; + } + if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) { + /* we have a problem - e.g., out of memory */ + return rc; + } + + /* Getting here means that we didn't already have a request for + * data pending, and so we created a new tracker for this + * request. 
We know the identity of all our local clients, so + * if this is one, then we have nothing further to do - we will + * fulfill the request once the process commits its data */ + if (local) { + return PMIX_SUCCESS; + } + + /* this isn't a local client of ours, so we need to ask the host + * resource manager server to please get the info for us from + * whomever is hosting the target process */ + if (NULL != pmix_host_server.direct_modex) { + rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd); + } else { + /* if we don't have direct modex feature, just respond with "not found" */ + cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL); + PMIX_INFO_FREE(info, ninfo); + pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super); + PMIX_LIST_DESTRUCT(&lcd->loc_reqs); + PMIX_RELEASE(lcd); + rc = PMIX_ERR_NOT_FOUND; + } + + return rc; +} + +static pmix_status_t create_local_tracker(char nspace[], int rank, + pmix_info_t info[], size_t ninfo, + pmix_modex_cbfunc_t cbfunc, + void *cbdata, + pmix_dmdx_local_t **ld) +{ + pmix_dmdx_local_t *lcd, *cd; + pmix_dmdx_request_t *req; + pmix_status_t rc; + + /* define default */ + *ld = NULL; + + /* see if we already have an existing request for data + * from this namespace/rank */ + lcd = NULL; + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { + if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || + rank != cd->proc.rank ) { + continue; + } + lcd = cd; + break; + } + if (NULL != lcd) { + /* we already have a request, so just track that someone + * else wants data from the same target */ + rc = PMIX_SUCCESS; // indicates we found an existing request + goto complete; + } + /* we do not have an existing request, so let's create + * one and add it to our list */ + lcd = PMIX_NEW(pmix_dmdx_local_t); + if (NULL == lcd){ + PMIX_INFO_FREE(info, ninfo); + return PMIX_ERR_NOMEM; + } + strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN); + lcd->proc.rank = rank; + 
lcd->info = info; + lcd->ninfo = ninfo; + pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super); + rc = PMIX_ERR_NOT_FOUND; // indicates that we created a new request tracker + + complete: + /* track this specific requestor so we return the + * data to them */ + req = PMIX_NEW(pmix_dmdx_request_t); + req->cbfunc = cbfunc; + req->cbdata = cbdata; + pmix_list_append(&lcd->loc_reqs, &req->super); + *ld = lcd; + return rc; +} + +void pmix_pending_nspace_requests(pmix_nspace_t *nptr) +{ + pmix_dmdx_local_t *cd, *cd_next; + + /* Now that we know all local ranks, go along request list and ask for remote data + * for the non-local ranks, and resolve all pending requests for local procs + * that were waiting for registration to complete + */ + PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { + pmix_rank_info_t *info; + bool found = false; + + if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) { + continue; + } + + PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) { + if (info->rank == cd->proc.rank) { + found = true; // we will satisfy this request upon commit from new proc + break; + } + } + + /* if not found - this is remote process and we need to send + * corresponding direct modex request */ + if( !found ){ + if( NULL != pmix_host_server.direct_modex ){ + pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd); + } else { + pmix_dmdx_request_t *req, *req_next; + PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) { + req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL); + pmix_list_remove_item(&cd->loc_reqs, &req->super); + PMIX_RELEASE(req); + } + pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super); + PMIX_RELEASE(cd); + } + } + } +} + +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, + pmix_modex_cbfunc_t cbfunc, void *cbdata) +{ + pmix_status_t rc; + pmix_value_t *val; + char *data; + size_t sz; + 
pmix_buffer_t xfer, pbkt, *xptr; + + /* check to see if this data already has been + * obtained as a result of a prior direct modex request from + * a remote peer, or due to data from a local client + * having been committed */ + rc = pmix_hash_fetch(ht, rank, "modex", &val); + if (PMIX_SUCCESS == rc && NULL != val) { + /* the client is expecting this to arrive as a byte object + * containing a buffer, so package it accordingly */ + PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); + PMIX_CONSTRUCT(&xfer, pmix_buffer_t); + xptr = &xfer; + PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); + pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER); + xfer.base_ptr = NULL; // protect the passed data + xfer.bytes_used = 0; + PMIX_DESTRUCT(&xfer); + PMIX_UNLOAD_BUFFER(&pbkt, data, sz); + PMIX_DESTRUCT(&pbkt); + PMIX_VALUE_RELEASE(val); + /* pass it back */ + cbfunc(rc, data, sz, cbdata, relfn, data); + return rc; + } + return PMIX_ERR_NOT_FOUND; +} + +/* Resolve pending requests to this namespace/rank */ +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, + pmix_status_t status, pmix_dmdx_local_t *lcd) +{ + pmix_dmdx_local_t *cd; + + /* find corresponding request (if exists) */ + if (NULL == lcd && NULL != nptr) { + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { + if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || + rank != cd->proc.rank) { + continue; + } + lcd = cd; + break; + } + } + + /* If somebody was interested in this rank */ + if (NULL != lcd) { + pmix_dmdx_request_t *req; + + if (PMIX_SUCCESS != status){ + /* if we've got an error for this request - just forward it*/ + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { + /* if we can't satisfy this request - respond with error */ + req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL); + } + } else if (NULL != nptr) { + /* if we've got the blob - try to satisfy requests */ + pmix_hash_table_t *ht; + pmix_rank_info_t *iptr; + + /* by default we are 
looking for the remote data */ + ht = &nptr->server->remote; + /* check if this rank is local */ + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { + if (iptr->rank == rank) { + ht = &nptr->server->mylocal; + break; + } + } + + /* run through all the requests to this rank */ + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { + pmix_status_t rc; + rc = _satisfy_request(ht, rank, req->cbfunc, req->cbdata); + if( PMIX_SUCCESS != rc ){ + /* if we can't satisfy this particular request (missing key?) */ + req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL); + } + } + } + /* remove all requests to this rank and cleanup the corresponding structure */ + pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd); + PMIX_RELEASE(lcd); + } + return PMIX_SUCCESS; +} + +/* process the returned data from the host RM server */ +static void _process_dmdx_reply(int fd, short args, void *cbdata) +{ + pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata; + pmix_kval_t *kp; + pmix_nspace_t *ns, *nptr; + pmix_status_t rc; + pmix_buffer_t xfer, pbkt, *xptr; + + pmix_output_verbose(2, pmix_globals.debug_output, + "[%s:%d] process dmdx reply from %s:%d", + __FILE__, __LINE__, + caddy->lcd->proc.nspace, caddy->lcd->proc.rank); + + /* find the nspace object for this client */ + nptr = NULL; + PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { + if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) { + nptr = ns; + break; + } + } + + if (NULL == nptr) { + /* should be impossible */ + PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); + caddy->status = PMIX_ERR_NOT_FOUND; + goto cleanup; + } + + /* if the request was successfully satisfied, then store the data + * in our hash table for remote procs. Although we could immediately + * resolve any outstanding requests on our tracking list, we instead + * store the data first so we can immediately satisfy any future + * requests. 
Then, rather than duplicate the resolve code here, we + * will let the pmix_pending_resolve function go ahead and retrieve + * it from the hash table */ + if (PMIX_SUCCESS == caddy->status) { + kp = PMIX_NEW(pmix_kval_t); + kp->key = strdup("modex"); + PMIX_VALUE_CREATE(kp->value, 1); + kp->value->type = PMIX_BYTE_OBJECT; + /* we don't know if the host is going to save this data + * or not, so we have to copy it - the client is expecting + * this to arrive as a byte object containing a buffer, so + * package it accordingly */ + kp->value->data.bo.bytes = malloc(caddy->ndata); + memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata); + kp->value->data.bo.size = caddy->ndata; + /* store it in the appropriate hash */ + if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { + PMIX_ERROR_LOG(rc); + } + PMIX_RELEASE(kp); // maintain acctg + } + + /* always execute the callback to avoid having the client hang */ + pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd); + +cleanup: + /* now call the release function so the host server + * knows it can release the data */ + if (NULL != caddy->relcbfunc) { + caddy->relcbfunc(caddy->cbdata); + } + PMIX_RELEASE(caddy); +} + +/* this is the callback function that the host RM server will call + * when it gets requested info back from a remote server */ +static void dmdx_cbfunc(pmix_status_t status, + const char *data, size_t ndata, void *cbdata, + pmix_release_cbfunc_t release_fn, void *release_cbdata) +{ + pmix_dmdx_reply_caddy_t *caddy; + + /* because the host RM is calling us from their own thread, we + * need to thread-shift into our local progress thread before + * accessing any global info */ + caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t); + caddy->status = status; + /* point to the callers cbfunc */ + caddy->relcbfunc = release_fn; + caddy->cbdata = release_cbdata; + + /* point to the returned data and our own internal + * tracker */ + caddy->data = data; + 
caddy->ndata = ndata; + caddy->lcd = (pmix_dmdx_local_t *)cbdata; + pmix_output_verbose(2, pmix_globals.debug_output, + "[%s:%d] queue dmdx reply for %s:%d", + __FILE__, __LINE__, + caddy->lcd->proc.nspace, caddy->lcd->proc.rank); + event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE, + _process_dmdx_reply, caddy); + event_priority_set(&caddy->ev, 0); + event_active(&caddy->ev, EV_WRITE, 1); +} + diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c index 4a4abd1074..43d35b5def 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c @@ -58,246 +58,6 @@ pmix_server_module_t pmix_host_server = {0}; -static void dmdx_cbfunc(pmix_status_t status, const char *data, - size_t ndata, void *cbdata, - pmix_release_cbfunc_t relfn, void *relcbdata); - -typedef struct { - pmix_object_t super; - pmix_event_t ev; - pmix_status_t status; - const char *data; - size_t ndata; - pmix_dmdx_local_t *lcd; - pmix_release_cbfunc_t relcbfunc; - void *cbdata; -} pmix_dmdx_reply_caddy_t; -PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t, - pmix_object_t, NULL, NULL); - - -static void relfn(void *cbdata) -{ - char *data = (char*)cbdata; - free(data); -} - -static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank, - pmix_hash_table_t *ht, - pmix_modex_cbfunc_t cbfunc, void *cbdata) -{ - pmix_status_t rc; - pmix_buffer_t pbkt, xfer; - pmix_value_t *val; - char *data; - size_t sz; - - /* check to see if this data already has been - * obtained as a result of a prior direct modex request from - * another local peer */ - rc = pmix_hash_fetch(ht, rank, "modex", &val); - if (PMIX_SUCCESS == rc && NULL != val) { - PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); - PMIX_CONSTRUCT(&xfer, pmix_buffer_t); - pmix_buffer_t *pxfer = &xfer; - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER); - 
xfer.base_ptr = NULL; - xfer.bytes_used = 0; - PMIX_DESTRUCT(&xfer); - PMIX_VALUE_RELEASE(val); - PMIX_UNLOAD_BUFFER(&pbkt, data, sz); - PMIX_DESTRUCT(&pbkt); - /* pass it back */ - cbfunc(rc, data, sz, cbdata, relfn, data); - return rc; - } - return PMIX_ERR_NOT_FOUND; -} - -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, - pmix_info_t *info, size_t ninfo, - pmix_modex_cbfunc_t cbfunc, void *cbdata) -{ - pmix_dmdx_local_t *lcd = NULL, *cd; - pmix_rank_info_t *iptr; - pmix_hash_table_t *ht; - pmix_status_t rc; - - /* 1. Try to satisfy the request right now */ - - /* by default we are looking for the remote data */ - ht = &nptr->server->remote; - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { - if (iptr->rank == rank) { - /* in case it is known local rank - check local table */ - ht = &nptr->server->mylocal; - break; - } - } - - rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata); - if( PMIX_SUCCESS == rc ){ - /* request was successfully satisfied */ - PMIX_INFO_FREE(info, ninfo); - return rc; - } - - /* 2. We were unable to satisfy request right now. - * Look for existing requests to this namespace/rank */ - PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || - rank != cd->proc.rank ) { - continue; - } - lcd = cd; - break; - } - - /* 3. If no requests exists then: - * - if all local clients are registered then we were called because - * the remote data was requested. Create request and call direct modex - * to retrieve the data - * - if not all local ranks were registered, we need to wait untill - * pmix_pending_localy_fin would be called to resolve this. Just add the - * request for now. 
- */ - if (NULL == lcd) { - lcd = PMIX_NEW(pmix_dmdx_local_t); - if (NULL == lcd){ - PMIX_INFO_FREE(info, ninfo); - return PMIX_ERR_NOMEM; - } - strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN); - lcd->proc.rank = rank; - lcd->info = info; - lcd->ninfo = ninfo; - pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super); - - /* check & send request if need/possible */ - if (nptr->server->all_registered && NULL == info) { - if (NULL != pmix_host_server.direct_modex) { - pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd); - } else { - /* if we don't have direct modex feature, just respond with "not found" */ - cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL); - PMIX_INFO_FREE(info, ninfo); - pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super); - PMIX_LIST_DESTRUCT(&lcd->loc_reqs); - PMIX_RELEASE(lcd); - return PMIX_SUCCESS; - } - } - } - pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t); - req->cbfunc = cbfunc; - req->cbdata = cbdata; - pmix_list_append(&lcd->loc_reqs, &req->super); - return PMIX_SUCCESS; -} - -void pmix_pending_nspace_fix(pmix_nspace_t *nptr) -{ - pmix_dmdx_local_t *cd, *cd_next; - - /* Now when we know all local ranks, go along request list and ask for remote data - * for the non-local ranks, and resolve all pending requests for local procs - * that were waiting for registration to complete - */ - PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { - pmix_rank_info_t *info; - bool found = false; - - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) { - continue; - } - - PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) { - if (info->rank == cd->proc.rank) { - found = true; - break; - } - } - - /* if not found - this is remote process and we need to send - * corresponding direct modex request */ - if( !found ){ - if( NULL != pmix_host_server.direct_modex ){ - pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, 
dmdx_cbfunc, cd); - } else { - pmix_dmdx_request_t *req, *req_next; - PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) { - req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL); - pmix_list_remove_item(&cd->loc_reqs, &req->super); - PMIX_RELEASE(req); - } - pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super); - PMIX_RELEASE(cd); - } - } - } -} - -/* Resolve pending requests to this namespace/rank */ -pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, - pmix_status_t status, pmix_dmdx_local_t *lcd) -{ - pmix_dmdx_local_t *cd; - - /* find corresponding request (if exists) */ - if( NULL == lcd ){ - PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) { - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || - rank != cd->proc.rank) { - continue; - } - lcd = cd; - break; - } - } - - /* If somebody was interested in this rank */ - if( NULL != lcd ){ - pmix_dmdx_request_t *req; - - if (PMIX_SUCCESS != status){ - /* if we've got an error for this request - just forward it*/ - PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { - /* if we can't satisfy this request - respond with error */ - req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL); - } - } else { - /* if we've got the blob - try to satisfy requests */ - pmix_hash_table_t *ht; - pmix_rank_info_t *iptr; - - /* by default we are looking for the remote data */ - ht = &nptr->server->remote; - /* check if this rank is local */ - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { - if (iptr->rank == rank) { - ht = &nptr->server->mylocal; - break; - } - } - - /* run through all the requests to this rank */ - PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { - pmix_status_t rc; - rc = _satisfy_request(nptr, rank, ht, req->cbfunc, req->cbdata); - if( PMIX_SUCCESS != rc ){ - /* if we can't satisfy this particular request (missing key?) 
*/ - req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL); - } - } - } - /* remove all requests to this rank and cleanup the corresponding structure */ - pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd); - PMIX_RELEASE(lcd); - } - return PMIX_SUCCESS; -} - pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf, pmix_op_cbfunc_t cbfunc, void *cbdata) { @@ -436,13 +196,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf) * may not be a contribution */ if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote, info->rank, "modex", &val) && NULL != val) { - PMIX_CONSTRUCT(&xfer, pmix_buffer_t); - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); - pmix_buffer_t *pxfer = &xfer; - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER); - xfer.base_ptr = NULL; - xfer.bytes_used = 0; - PMIX_DESTRUCT(&xfer); + PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes, val->data.bo.size); PMIX_VALUE_RELEASE(val); } PMIX_UNLOAD_BUFFER(&pbkt, data, sz); @@ -457,7 +211,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf) PMIX_RELEASE(dcd); } } - /* see if anyone local is waiting on this data- could be more than one */ + /* see if anyone local is waiting on this data - could be more than one */ return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL); } @@ -826,163 +580,6 @@ pmix_status_t pmix_server_fence(pmix_server_caddy_t *cd, return rc; } -static void _process_dmdx_reply(int fd, short args, void *cbdata) -{ - pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata; - pmix_kval_t *kp; - pmix_nspace_t *ns, *nptr; - pmix_status_t rc; - - pmix_output_verbose(2, pmix_globals.debug_output, - "[%s:%d] queue dmdx reply from %s:%d", - __FILE__, __LINE__, - caddy->lcd->proc.nspace, caddy->lcd->proc.rank); - - /* find the nspace object for this client */ - nptr = NULL; - PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { - if (0 == strcmp(caddy->lcd->proc.nspace, 
ns->nspace)) { - nptr = ns; - break; - } - } - - if (NULL == nptr) { - /* should be impossible */ - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); - caddy->status = PMIX_ERR_NOT_FOUND; - goto cleanup; - } - - if (PMIX_SUCCESS == caddy->status) { - kp = PMIX_NEW(pmix_kval_t); - kp->key = strdup("modex"); - PMIX_VALUE_CREATE(kp->value, 1); - kp->value->type = PMIX_BYTE_OBJECT; - /* we don't know if the host is going to save this data - * or not, so we have to copy it */ - kp->value->data.bo.bytes = (char*)malloc(caddy->ndata); - memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata); - kp->value->data.bo.size = caddy->ndata; - /* store it in the appropriate hash */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp); // maintain acctg - } - -cleanup: - /* always execute the callback to avoid having the client hang */ - pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd); - - /* now call the release function so the host server - * knows it can release the data */ - if (NULL != caddy->relcbfunc) { - caddy->relcbfunc(caddy->cbdata); - } - PMIX_RELEASE(caddy); -} - -static void dmdx_cbfunc(pmix_status_t status, - const char *data, size_t ndata, void *cbdata, - pmix_release_cbfunc_t release_fn, void *release_cbdata) -{ - pmix_dmdx_reply_caddy_t *caddy; - caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t); - caddy->status = status; - /* point to the callers cbfunc */ - caddy->relcbfunc = release_fn; - caddy->cbdata = release_cbdata; - - caddy->data = data; - caddy->ndata = ndata; - caddy->lcd = (pmix_dmdx_local_t *)cbdata; - pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] queue dmdx reply %s:%d", - __FILE__, __LINE__, - caddy->lcd->proc.nspace, caddy->lcd->proc.rank); - event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE, - _process_dmdx_reply, caddy); - event_priority_set(&caddy->ev, 0); - event_active(&caddy->ev, EV_WRITE, 1); -} - -pmix_status_t 
pmix_server_get(pmix_buffer_t *buf, - pmix_modex_cbfunc_t cbfunc, - void *cbdata) -{ - int32_t cnt; - pmix_status_t rc; - int rank; - char *cptr; - char nspace[PMIX_MAX_NSLEN+1]; - pmix_nspace_t *ns, *nptr; - pmix_info_t *info=NULL; - size_t ninfo=0; - - pmix_output_verbose(2, pmix_globals.debug_output, - "recvd GET"); - - /* setup */ - memset(nspace, 0, sizeof(nspace)); - - /* retrieve the nspace and rank of the requested proc */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) { - PMIX_ERROR_LOG(rc); - return rc; - } - (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN); - free(cptr); - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) { - PMIX_ERROR_LOG(rc); - return rc; - } - /* find the nspace object for this client */ - nptr = NULL; - PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { - if (0 == strcmp(nspace, ns->nspace)) { - nptr = ns; - break; - } - } - - pmix_output_verbose(2, pmix_globals.debug_output, - "%s:%d EXECUTE GET FOR %s:%d", - pmix_globals.myid.nspace, - pmix_globals.myid.rank, nspace, rank); - - /* retrieve any provided info structs */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) { - PMIX_ERROR_LOG(rc); - return rc; - } - if (0 < ninfo) { - PMIX_INFO_CREATE(info, ninfo); - cnt = ninfo; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) { - PMIX_ERROR_LOG(rc); - PMIX_INFO_FREE(info, ninfo); - return rc; - } - } - - if (NULL == nptr) { - /* this is for an nspace we don't know about yet, so - * give the host server a chance to tell us about it */ - nptr = PMIX_NEW(pmix_nspace_t); - (void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN); - pmix_list_append(&pmix_globals.nspaces, &nptr->super); - } - /* if we don't have any ranks for this job, protect ourselves here */ - if (NULL == nptr->server) { - nptr->server = PMIX_NEW(pmix_server_nspace_t); - } - - return pmix_pending_request(nptr, rank, info, ninfo, 
cbfunc, cbdata); -} - pmix_status_t pmix_server_publish(pmix_peer_t *peer, pmix_buffer_t *buf, pmix_op_cbfunc_t cbfunc, void *cbdata) diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h index c6279d5392..9129b6bbbd 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h @@ -183,10 +183,7 @@ void pmix_stop_listening(void); bool pmix_server_trk_update(pmix_server_trkr_t *trk); -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, - pmix_info_t *info, size_t ninfo, - pmix_modex_cbfunc_t cbfunc, void *cbdata); -void pmix_pending_nspace_fix(pmix_nspace_t *nptr); +void pmix_pending_nspace_requests(pmix_nspace_t *nptr); pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, pmix_status_t status, pmix_dmdx_local_t *lcd); diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c index 8cc4bcd978..90c42edb66 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c @@ -123,6 +123,8 @@ const char* PMIx_Error_string(pmix_status_t errnum) case PMIX_EXISTS: return "EXISTS"; + case PMIX_ERR_SILENT: + return "SILENT"; case PMIX_ERROR: return "ERROR"; case PMIX_SUCCESS: diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h index f72227aedc..e43ac47bd9 100644 --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h @@ -28,9 +28,13 @@ BEGIN_C_DECLS -#define PMIX_ERROR_LOG(r) \ - pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \ - PMIx_Error_string((r)), __FILE__, __LINE__); +#define PMIX_ERROR_LOG(r) \ + do { \ + if (PMIX_ERR_SILENT != (r)) { \ + pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \ + PMIx_Error_string((r)), __FILE__, __LINE__); \ + } \ + }while(0); #define PMIX_REPORT_ERROR(e) \ do { \ diff --git 
a/opal/mca/pmix/pmix1xx/pmix1_client.c b/opal/mca/pmix/pmix1xx/pmix1_client.c index f1ba0d5891..e9c50b7a7e 100644 --- a/opal/mca/pmix/pmix1xx/pmix1_client.c +++ b/opal/mca/pmix/pmix1xx/pmix1_client.c @@ -217,6 +217,7 @@ int pmix1_store_local(const opal_process_name_t *proc, opal_value_t *val) } } if (NULL == job) { + OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); return OPAL_ERR_NOT_FOUND; } (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN); diff --git a/opal/mca/pmix/pmix1xx/pmix1_server_south.c b/opal/mca/pmix/pmix1xx/pmix1_server_south.c index ae42de0082..f0d0f11d49 100644 --- a/opal/mca/pmix/pmix1xx/pmix1_server_south.c +++ b/opal/mca/pmix/pmix1xx/pmix1_server_south.c @@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t status, void *cbdata) } int pmix1_server_register_nspace(opal_jobid_t jobid, - int nlocalprocs, - opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, - void *cbdata) + int nlocalprocs, + opal_list_t *info, + opal_pmix_op_cbfunc_t cbfunc, + void *cbdata) { opal_value_t *kv, *k2; pmix_info_t *pinfo, *pmap; @@ -168,10 +168,17 @@ int pmix1_server_register_nspace(opal_jobid_t jobid, pmix_status_t rc; pmix1_opcaddy_t *op; opal_list_t *pmapinfo; + opal_pmix1_jobid_trkr_t *job; /* convert the jobid */ (void)snprintf(nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(jobid)); + /* store this job in our list of known nspaces */ + job = OBJ_NEW(opal_pmix1_jobid_trkr_t); + (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN); + job->jobid = jobid; + opal_list_append(&mca_pmix_pmix1xx_component.jobids, &job->super); + /* convert the list to an array of pmix_info_t */ if (NULL != info) { sz = opal_list_get_size(info); @@ -220,10 +227,10 @@ int pmix1_server_register_nspace(opal_jobid_t jobid, int pmix1_server_register_client(const opal_process_name_t *proc, - uid_t uid, gid_t gid, - void *server_object, - opal_pmix_op_cbfunc_t cbfunc, - void *cbdata) + uid_t uid, gid_t gid, + void *server_object, + opal_pmix_op_cbfunc_t cbfunc, + void *cbdata) { pmix_status_t rc; 
pmix1_opcaddy_t *op; @@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbd } int pmix1_server_dmodex(const opal_process_name_t *proc, - opal_pmix_modex_cbfunc_t cbfunc, void *cbdata) + opal_pmix_modex_cbfunc_t cbfunc, void *cbdata) { pmix1_opcaddy_t *op; pmix_status_t rc; diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index 953145d9aa..ee5582c4b8 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, int rc, ret, room_num; int32_t cnt; opal_process_name_t target; - opal_value_t kv; pmix_server_req_t *req; uint8_t *data = NULL; int32_t ndata = 0; @@ -542,29 +541,14 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, return; } - /* if we got something, store the blobs locally so we can - * meet any further requests without doing a remote fetch. - * This must be done as a single blob for later retrieval */ - if (ORTE_SUCCESS == ret && NULL != data) { - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup("modex"); - kv.type = OPAL_BYTE_OBJECT; - kv.data.bo.bytes = data; - kv.data.bo.size = ndata; - if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&target, &kv))) { - ORTE_ERROR_LOG(rc); - } - kv.data.bo.bytes = NULL; // protect the data - kv.data.bo.size = 0; - OBJ_DESTRUCT(&kv); - } - /* check the request out of the tracking hotel */ opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req); /* return the returned data to the requestor */ if (NULL != req) { if (NULL != req->mdxcbfunc) { req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata, relcbfunc, data); + } else { + free(data); } OBJ_RELEASE(req); } diff --git a/orte/orted/pmix/pmix_server_fence.c b/orte/orted/pmix/pmix_server_fence.c index b3b0e33059..765c1c2f77 100644 --- a/orte/orted/pmix/pmix_server_fence.c +++ b/orte/orted/pmix/pmix_server_fence.c @@ -197,6 
+197,12 @@ static void dmodex_req(int sd, short args, void *cbdata) goto callback; } + /* if we are the host daemon, then this is a local request, so + * just wait for the data to come in */ + if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) { + return; + } + /* construct a request message */ buf = OBJ_NEW(opal_buffer_t); if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {