From 691237801bc1d99ec3f7abebfad6a5c5072eee1e Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 13 Sep 2017 10:21:44 -0700 Subject: [PATCH] Update to track PMIx master Signed-off-by: Ralph Castain --- opal/mca/pmix/pmix2x/pmix/VERSION | 4 +- .../pmix2x/pmix/src/include/pmix_globals.c | 5 - .../pmix2x/pmix/src/include/pmix_globals.h | 2 - .../pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c | 395 +++--------------- .../pmix2x/pmix/src/mca/gds/hash/gds_hash.c | 299 ++++--------- 5 files changed, 128 insertions(+), 577 deletions(-) diff --git a/opal/mca/pmix/pmix2x/pmix/VERSION b/opal/mca/pmix/pmix2x/pmix/VERSION index 132056d535..987eeb5e9c 100644 --- a/opal/mca/pmix/pmix2x/pmix/VERSION +++ b/opal/mca/pmix/pmix2x/pmix/VERSION @@ -30,7 +30,7 @@ greek= # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=git1154ce3 +repo_rev=gitdcf4faf # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Sep 12, 2017" +date="Sep 13, 2017" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.c b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.c index d25d409c91..15d56e6268 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.c +++ b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.c @@ -57,8 +57,6 @@ static void nscon(pmix_nspace_t *p) p->nspace = NULL; p->nlocalprocs = 0; p->all_registered = false; - p->jobinfo = NULL; - p->njobinfo = 0; p->jobbkt = NULL; p->ndelivered = 0; PMIX_CONSTRUCT(&p->ranks, pmix_list_t); @@ -69,9 +67,6 @@ static void nsdes(pmix_nspace_t *p) if (NULL != p->nspace) { free(p->nspace); } - if (NULL != p->jobinfo) { - PMIX_INFO_FREE(p->jobinfo, p->njobinfo); - } if (NULL != p->jobbkt) { PMIX_RELEASE(p->jobbkt); } diff --git a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h index b3fb29c3e7..36af6e2d92 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h @@ -125,8 +125,6 @@ typedef struct { char *nspace; size_t nlocalprocs; bool all_registered; // all local ranks have been defined - pmix_info_t *jobinfo; // copy of the job-level info to be delivered to each proc - size_t njobinfo; pmix_buffer_t *jobbkt; // packed version of jobinfo size_t ndelivered; // count of #local clients that have received the jobinfo pmix_list_t ranks; // list of pmix_rank_info_t for connection support of my clients diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c index 62ce4b103e..999ed64b58 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c @@ -223,10 +223,6 @@ static inline void _esh_sessions_cleanup(void); static inline void _esh_ns_map_cleanup(void); static inline int _esh_dir_del(const char *dirname); -static inline int _collect_key_for_rank(pmix_peer_t *peer, pmix_rank_t rank, pmix_kval_t *kv); -static inline int _collected_key_dstore_store(pmix_nspace_t *nptr); -static inline pmix_status_t store_map(pmix_peer_t *peer, char **nodes, char **ppn); - static inline int _my_client(const char *nspace, pmix_rank_t rank); static pmix_status_t dstore_init(pmix_info_t info[], size_t ninfo); @@ -291,8 +287,6 @@ pmix_gds_base_module_t pmix_ds12_module = { .del_nspace = dstore_del_nspace, }; -static pmix_value_array_t *rank_kv_bufs = NULL; - static char *_base_path = NULL; static size_t _initial_segment_size = 0; static size_t _max_ns_num; @@ -910,7 +904,11 @@ static inline void _esh_session_release(session_t *s) } _delete_sm_desc(s->sm_seg_first); - close(s->lockfd); + /* if the lock fd was somehow set, then we + * need to close it */ + if (0 != s->lockfd) { + close(s->lockfd); + } if (NULL != s->lockfile) { if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { @@ -2944,256 +2942,52 @@ static pmix_status_t dstore_store_modex(struct pmix_nspace_t *nspace, return rc; } -static inline int _collect_key_for_rank(pmix_peer_t *peer, pmix_rank_t rank, pmix_kval_t *kv) +static pmix_status_t _store_job_info(pmix_proc_t *proc) { + pmix_cb_t cb; + pmix_kval_t *kv; + pmix_buffer_t buf; + pmix_kval_t *kv2 = NULL; pmix_status_t rc = PMIX_SUCCESS; - uint32_t i, size; - pmix_buffer_t *tmp = NULL; - pmix_rank_t cur_rank; - if (NULL == rank_kv_bufs) { - rank_kv_bufs = PMIX_NEW(pmix_value_array_t); - if (PMIX_SUCCESS != (rc = pmix_value_array_init(rank_kv_bufs, sizeof(pmix_buffer_t)))) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - /* rank WILDCARD contained in the 0 item */ - cur_rank = PMIX_RANK_WILDCARD == rank ? 0 : rank + 1; - size = (uint32_t)pmix_value_array_get_size(rank_kv_bufs); + kv2 = PMIX_NEW(pmix_kval_t); + PMIX_VALUE_CREATE(kv2->value, 1); + kv2->value->type = PMIX_BYTE_OBJECT; - if ((cur_rank + 1) <= size) { - tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, cur_rank)); - PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, tmp, kv, 1, PMIX_KVAL); - return rc; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(rank_kv_bufs, cur_rank + 1))) { + PMIX_CONSTRUCT(&buf, pmix_buffer_t); + PMIX_CONSTRUCT(&cb, pmix_cb_t); + cb.proc = proc; + cb.scope = PMIX_INTERNAL; + cb.copy = false; + + PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb); + if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); - return rc; - } - for (i = size; i < (cur_rank + 1); i++) { - tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i)); - PMIX_CONSTRUCT(tmp, pmix_buffer_t); - } - PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, tmp, kv, 1, PMIX_KVAL); - - return rc; -} - -static inline int _collected_key_dstore_store(pmix_nspace_t *nptr) -{ - int rc = PMIX_SUCCESS; - uint32_t i, size; - pmix_buffer_t *tmp; - pmix_rank_t rank; - pmix_kval_t *kv = NULL; - - if (NULL == rank_kv_bufs) { goto exit; } - kv = PMIX_NEW(pmix_kval_t); - PMIX_VALUE_CREATE(kv->value, 1); - kv->value->type = PMIX_BYTE_OBJECT; - size = pmix_value_array_get_size(rank_kv_bufs); - for (i = 0; i < size; i++) { - tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i)); - rank = 0 == i ? PMIX_RANK_WILDCARD : i - 1; - PMIX_UNLOAD_BUFFER(tmp, kv->value->data.bo.bytes, kv->value->data.bo.size); - if (PMIX_SUCCESS != (rc = _dstore_store(nptr->nspace, rank, kv))) { + PMIX_CONSTRUCT(&buf, pmix_buffer_t); + PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) { + PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL); + if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); goto exit; } } + PMIX_UNLOAD_BUFFER(&buf, kv2->value->data.bo.bytes, kv2->value->data.bo.size); + + if (PMIX_SUCCESS != (rc = _dstore_store(proc->nspace, proc->rank, kv2))) { + PMIX_ERROR_LOG(rc); + goto exit; + } exit: - if (NULL != kv) { - PMIX_RELEASE(kv); - } - if (NULL != rank_kv_bufs) { - size_t size = pmix_value_array_get_size(rank_kv_bufs); - size_t i; - for (i = 0; i < size; i++) { - pmix_buffer_t *tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i)); - PMIX_DESTRUCT(tmp); - } - PMIX_RELEASE(rank_kv_bufs); - rank_kv_bufs = NULL; - } + PMIX_RELEASE(kv2); + PMIX_DESTRUCT(&cb); + PMIX_DESTRUCT(&buf); return rc; } -static inline pmix_status_t store_map(pmix_peer_t *peer, - char **nodes, char **ppn) -{ - pmix_status_t rc; - pmix_value_t *val; - size_t m, n; - pmix_info_t *iptr, *info; - pmix_rank_t rank; - bool updated; - pmix_kval_t *kp2; - char **procs; - pmix_proc_t proc; - pmix_cb_t cb; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "[%s:%d] gds:dstore:store_map", - pmix_globals.myid.nspace, pmix_globals.myid.rank); - - /* if the lists don't match, then that's wrong */ - if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - return PMIX_ERR_BAD_PARAM; - } - - for (n=0; NULL != nodes[n]; n++) { - /* check and see if we already have data for this node */ - val = NULL; - proc.rank = PMIX_RANK_WILDCARD; - (void)strncpy(proc.nspace, peer->nptr->nspace, PMIX_MAX_NSLEN); - PMIX_CONSTRUCT(&cb, pmix_cb_t); - cb.proc = &proc; - cb.scope = PMIX_INTERNAL; - cb.copy = true; // ??? - PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb); - if (PMIX_SUCCESS == rc && 1 == pmix_list_get_size(&cb.kvs)) { - kp2 = (pmix_kval_t*)pmix_list_get_first(&cb.kvs); - val = kp2->value; - //kp2->value = NULL; // protect the value - /* already have some data. See if we have the list of local peers */ - if (PMIX_DATA_ARRAY != val->type || - NULL == val->data.darray || - PMIX_INFO != val->data.darray->type || - 0 == val->data.darray->size) { - /* something is wrong */ - PMIX_VALUE_RELEASE(val); - PMIX_ERROR_LOG(PMIX_ERR_INVALID_VAL); - return PMIX_ERR_INVALID_VAL; - } - iptr = (pmix_info_t*)val->data.darray->array; - updated = false; - for (m=0; m < val->data.darray->size; m++) { - if (0 == strncmp(iptr[m].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN)) { - /* we will update this entry */ - if (NULL != iptr[m].value.data.string) { - free(iptr[m].value.data.string); - } - iptr[m].value.data.string = strdup(ppn[n]); - updated = true; - break; - } - } - if (!updated) { - /* append this entry to the current data */ - kp2 = PMIX_NEW(pmix_kval_t); - if (NULL == kp2) { - return PMIX_ERR_NOMEM; - } - kp2->key = strdup(nodes[n]); - kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - if (NULL == kp2->value) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - kp2->value->type = PMIX_DATA_ARRAY; - kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); - if (NULL == kp2->value->data.darray) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - kp2->value->data.darray->type = PMIX_INFO; - kp2->value->data.darray->size = val->data.darray->size + 1; - PMIX_INFO_CREATE(info, kp2->value->data.darray->size); - if (NULL == info) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - /* copy the pre-existing data across */ - for (m=0; m < val->data.darray->size; m++) { - PMIX_INFO_XFER(&info[m], &iptr[m]); - } - PMIX_INFO_LOAD(&info[kp2->value->data.darray->size-1], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING); - kp2->value->data.darray->array = info; - - if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - return rc; - } - PMIX_RELEASE(kp2); - } - } else { - /* store the list as-is */ - kp2 = PMIX_NEW(pmix_kval_t); - if (NULL == kp2) { - return PMIX_ERR_NOMEM; - } - kp2->key = strdup(nodes[n]); - kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - if (NULL == kp2->value) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - kp2->value->type = PMIX_DATA_ARRAY; - kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); - if (NULL == kp2->value->data.darray) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - kp2->value->data.darray->type = PMIX_INFO; - PMIX_INFO_CREATE(info, 1); - if (NULL == info) { - PMIX_RELEASE(kp2); - return PMIX_ERR_NOMEM; - } - PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING); - kp2->value->data.darray->array = info; - kp2->value->data.darray->size = 1; - if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - return rc; - } - PMIX_RELEASE(kp2); - } - /* split the list of procs so we can store their - * individual location data */ - procs = pmix_argv_split(ppn[n], ','); - for (m=0; NULL != procs[m]; m++) { - /* store the hostname for each proc */ - kp2 = PMIX_NEW(pmix_kval_t); - kp2->key = strdup(PMIX_HOSTNAME); - kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - kp2->value->type = PMIX_STRING; - kp2->value->data.string = strdup(nodes[n]); - rank = strtol(procs[m], NULL, 10); - if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, rank, kp2))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - pmix_argv_free(procs); - return rc; - } - PMIX_RELEASE(kp2); - } - pmix_argv_free(procs); - } - - /* store the comma-delimited list of nodes hosting - * procs in this nspace */ - kp2 = PMIX_NEW(pmix_kval_t); - kp2->key = strdup(PMIX_NODE_LIST); - kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - kp2->value->type = PMIX_STRING; - kp2->value->data.string = pmix_argv_join(nodes, ','); - if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - return rc; - } - return PMIX_SUCCESS; -} - static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, pmix_buffer_t *reply) { @@ -3201,12 +2995,8 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, pmix_nspace_t *ns = peer->nptr; char *msg; pmix_status_t rc; - size_t j, n, size, len; - pmix_info_t *iptr; - pmix_rank_t rank; - pmix_kval_t *kp2; - uint8_t *tmp; - char **nodes=NULL, **procs=NULL; + pmix_proc_t proc; + pmix_rank_info_t *rinfo; pmix_output_verbose(2, pmix_gds_base_framework.framework_output, "[%s:%d] gds:dstore:register_job_info for peer [%s:%d]", @@ -3214,109 +3004,22 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, peer->info->pname.nspace, peer->info->pname.rank); if (0 == ns->ndelivered) { // don't store twice - for (n=0; n < ns->njobinfo; n++) { - if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_DATA)) { + (void)strncpy(proc.nspace, ns->nspace, PMIX_MAX_NSLEN); + proc.rank = PMIX_RANK_WILDCARD; + rc = _store_job_info(&proc); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return rc; + } - - if (PMIX_DATA_ARRAY != ns->jobinfo[n].value.type) { - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - size = ns->jobinfo[n].value.data.darray->size; - iptr = (pmix_info_t*)ns->jobinfo[n].value.data.darray->array; - /* first element of the array must be the rank */ - if (0 != strcmp(iptr[0].key, PMIX_RANK) || - PMIX_PROC_RANK != iptr[0].value.type) { - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - rank = iptr[0].value.data.rank; - /* cycle thru the values for this rank and store them */ - for (j=1; j < size; j++) { - kp2 = PMIX_NEW(pmix_kval_t); - if (NULL == kp2) { - rc = PMIX_ERR_NOMEM; - return rc; - } - kp2->key = strdup(iptr[j].key); - PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - return rc; - } - /* if the value contains a string that is longer than the - * limit, then compress it */ - if (PMIX_STRING_SIZE_CHECK(kp2->value)) { - if (pmix_util_compress_string(kp2->value->data.string, &tmp, &len)) { - if (NULL == tmp) { - PMIX_ERROR_LOG(PMIX_ERR_NOMEM); - rc = PMIX_ERR_NOMEM; - return rc; - } - kp2->value->type = PMIX_COMPRESSED_STRING; - free(kp2->value->data.string); - kp2->value->data.bo.bytes = (char*)tmp; - kp2->value->data.bo.size = len; - } - } - /* store it in the tmp buf */ - if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, rank, kp2))) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kp2); - return rc; - } - PMIX_RELEASE(kp2); // maintain acctg - } - } else if (0 == strcmp(ns->jobinfo[n].key, PMIX_NODE_MAP)) { - /* parse the regex to get the argv array of node names */ - if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(ns->jobinfo[n].value.data.string, &nodes))) { - PMIX_ERROR_LOG(rc); - return rc; - } - /* if we have already found the proc map, then parse - * and store the detailed map */ - if (NULL != procs) { - if (PMIX_SUCCESS != (rc = store_map(peer, nodes, procs))) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - } else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_MAP)) { - /* parse the regex to get the argv array containing proc ranks on each node */ - if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(ns->jobinfo[n].value.data.string, &procs))) { - PMIX_ERROR_LOG(rc); - return rc; - } - /* if we have already recv'd the node map, then parse - * and store the detailed map */ - if (NULL != nodes) { - if (PMIX_SUCCESS != (rc = store_map(peer, nodes, procs))) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - } else { - pmix_kval_t *kv = PMIX_NEW(pmix_kval_t); - PMIX_VALUE_CREATE(kv->value, 1); - kv->key = strdup(ns->jobinfo[n].key); - PMIX_VALUE_XFER(rc, kv->value, &ns->jobinfo[n].value); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kv); - return rc; - } - if ( PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kv))) { - PMIX_RELEASE(kv); - PMIX_ERROR_LOG(rc); - return rc; - } + PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) { + proc.rank = rinfo->pname.rank; + rc = _store_job_info(&proc); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return rc; } } - /* store all keys in thr dstore */ - _collected_key_dstore_store(ns); } /* answer to client */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/hash/gds_hash.c b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/hash/gds_hash.c index 98f267850d..1421e39988 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/hash/gds_hash.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/hash/gds_hash.c @@ -391,19 +391,6 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, return PMIX_SUCCESS; } - /* this is duplicative, but for now, we copy the data to the nspace - * jobinfo array as well as cache it internally so we can look it - * up if required. We will later figure out a way to reconstruct - * the jobinfo array when required */ - PMIX_INFO_CREATE(nptr->jobinfo, ninfo); - nptr->njobinfo = ninfo; - for (n=0; n < ninfo; n++) { - (void)strncpy(nptr->jobinfo[n].key, info[n].key, PMIX_MAX_KEYLEN); - PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, - &nptr->jobinfo[n].value, - &info[n].value); - } - /* cache the job info on the internal hash table for this nspace */ ht = &trk->internal; for (n=0; n < ninfo; n++) { @@ -558,219 +545,95 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, return rc; } -/* we need to pass three things to the client: - * - * (a) the list of nodes involved in this nspace - * - * (b) the hostname for each proc in this nspace - * - * (c) the list of procs on each node for reverse lookup - */ -static pmix_status_t pmix_pack_proc_map(struct pmix_peer_t *pr, - pmix_buffer_t *buf, - char **nodes, char **procs) -{ - pmix_peer_t *peer = (pmix_peer_t*)pr; - pmix_kval_t kv; - pmix_value_t val; - pmix_status_t rc; - pmix_buffer_t buf2; - size_t i, nnodes; - - /* bozo check - need procs for each node */ - if (pmix_argv_count(nodes) != pmix_argv_count(procs)) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - return PMIX_ERR_BAD_PARAM; - } - - PMIX_CONSTRUCT(&buf2, pmix_buffer_t); - PMIX_CONSTRUCT(&kv, pmix_kval_t); - kv.value = &val; - - /* pass the number of nodes involved in this namespace */ - nnodes = pmix_argv_count(nodes); - PMIX_BFROPS_PACK(rc, peer, &buf2, &nnodes, 1, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - - for (i=0; i < nnodes; i++) { - /* pass the complete list of procs on this node */ - kv.key = nodes[i]; - val.type = PMIX_STRING; - val.data.string = procs[i]; - PMIX_BFROPS_PACK(rc, peer, &buf2, &kv, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - kv.key = NULL; - val.data.string = NULL; - goto cleanup; - } - } - kv.key = NULL; - val.data.string = NULL; // we didn't strdup it, so don't release it - - /* pass the completed blob */ - kv.key = PMIX_MAP_BLOB; - val.type = PMIX_BYTE_OBJECT; - PMIX_UNLOAD_BUFFER(&buf2, val.data.bo.bytes, val.data.bo.size); - PMIX_BFROPS_PACK(rc, peer, buf, &kv, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - kv.key = NULL; - if (NULL != val.data.bo.bytes) { - free(val.data.bo.bytes); - } - kv.value = NULL; - - cleanup: - PMIX_DESTRUCT(&buf2); - PMIX_DESTRUCT(&kv); - return rc; -} - static pmix_status_t register_info(pmix_peer_t *peer, pmix_nspace_t *ns, pmix_buffer_t *reply) { - pmix_rank_t rank; - char **procs = NULL, **nodes = NULL; - size_t n, j, size; + pmix_hash_trkr_t *trk, *t; + pmix_hash_table_t *ht; + pmix_value_t *val, blob; pmix_status_t rc = PMIX_SUCCESS; - pmix_info_t *iptr; - pmix_buffer_t buf2; - pmix_kval_t kv, *kvptr; - pmix_value_t val; + pmix_rank_info_t *rinfo; + pmix_info_t *info; + size_t ninfo, n; + pmix_kval_t kv; + pmix_buffer_t buf; + pmix_rank_t rank; - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "[%s:%d] gds:hash:register_info", - pmix_globals.myid.nspace, pmix_globals.myid.rank); - - /* pack the provided info */ - for (n=0; n < ns->njobinfo; n++) { - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "pmix:gds:hash packing job info %s", - ns->jobinfo[n].key); - - if (0 == strcmp(ns->jobinfo[n].key, PMIX_NODE_MAP)) { - /* parse the regex to get the argv array of node names */ - if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(ns->jobinfo[n].value.data.string, &nodes))) { - PMIX_ERROR_LOG(rc); - continue; - } - /* if we have already found the proc map, then pass - * the detailed map */ - if (NULL != procs) { - rc = pmix_pack_proc_map(peer, reply, nodes, procs); - pmix_argv_free(nodes); - nodes = NULL; - pmix_argv_free(procs); - procs = NULL; - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - } else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_MAP)) { - /* parse the regex to get the argv array containing proc ranks on each node */ - if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(ns->jobinfo[n].value.data.string, &procs))) { - PMIX_ERROR_LOG(rc); - continue; - } - /* if we have already recv'd the node map, then record - * the detailed map */ - if (NULL != nodes) { - rc = pmix_pack_proc_map(peer, reply, nodes, procs); - pmix_argv_free(nodes); - nodes = NULL; - pmix_argv_free(procs); - procs = NULL; - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - } else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_DATA)) { - /* an array of data pertaining to a specific proc */ - if (PMIX_DATA_ARRAY != ns->jobinfo[n].value.type) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - goto release; - } - size = ns->jobinfo[n].value.data.darray->size; - iptr = (pmix_info_t*)ns->jobinfo[n].value.data.darray->array; - PMIX_CONSTRUCT(&buf2, pmix_buffer_t); - /* first element of the array must be the rank */ - if (0 != strcmp(iptr[0].key, PMIX_RANK) || - PMIX_PROC_RANK != iptr[0].value.type) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - PMIX_DESTRUCT(&buf2); - goto release; - } - /* pack it separately */ - rank = iptr[0].value.data.rank; - PMIX_BFROPS_PACK(rc, peer, &buf2, &rank, 1, PMIX_PROC_RANK); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - goto release; - } - /* cycle thru the values for this rank and pack them */ - for (j=1; j < size; j++) { - kv.key = iptr[j].key; - kv.value = &iptr[j].value; - PMIX_BFROPS_PACK(rc, peer, &buf2, &kv, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - goto release; - } - } - /* now add the blob */ - kv.key = PMIX_PROC_BLOB; - kv.value = &val; - val.type = PMIX_BYTE_OBJECT; - PMIX_UNLOAD_BUFFER(&buf2, val.data.bo.bytes, val.data.bo.size); - PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL); - PMIX_VALUE_DESTRUCT(&val); // release the data - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - goto release; - } - PMIX_DESTRUCT(&buf2); - } else { - /* just a value relating to the entire job */ - kv.key = ns->jobinfo[n].key; - kv.value = &ns->jobinfo[n].value; - PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto release; - } - } - } - - /* now add any global data that was provided */ - PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) { - PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); + trk = NULL; + PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) { + if (0 == strcmp(ns->nspace, t->ns)) { + trk = t; break; } } - - release: - /* cleanup */ - if (NULL != nodes) { - pmix_argv_free(nodes); + if (NULL == trk) { + return PMIX_ERR_INVALID_NAMESPACE; } - if (NULL != procs) { - pmix_argv_free(procs); + /* the job data is stored on the internal hash table */ + ht = &trk->internal; + + /* fetch all values from the hash table tied to rank=wildcard */ + val = NULL; + rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + return rc; } + if (NULL == val->data.darray || + PMIX_INFO != val->data.darray->type || + 0 == val->data.darray->size) { + return PMIX_ERR_NOT_FOUND; + } + info = (pmix_info_t*)val->data.darray->array; + ninfo = val->data.darray->size; + for (n=0; n < ninfo; n++) { + kv.key = info[n].key; + kv.value = &info[n].value; + PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL); + } + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + + PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) { + val = NULL; + rc = pmix_hash_fetch(ht, rinfo->pname.rank, NULL, &val); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + return rc; + } + + PMIX_CONSTRUCT(&buf, pmix_buffer_t); + rank = rinfo->pname.rank; + PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK); + + info = (pmix_info_t*)val->data.darray->array; + ninfo = val->data.darray->size; + for (n=0; n < ninfo; n++) { + kv.key = info[n].key; + kv.value = &info[n].value; + PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL); + } + kv.key = PMIX_PROC_BLOB; + kv.value = &blob; + blob.type = PMIX_BYTE_OBJECT; + PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size); + PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL); + PMIX_VALUE_DESTRUCT(&blob); + PMIX_DESTRUCT(&buf); + + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + } return rc; } @@ -797,14 +660,6 @@ static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr, pmix_globals.myid.nspace, pmix_globals.myid.rank, peer->info->pname.nspace, peer->info->pname.rank); - - /* NOTE: we do not need to worry here about PMIX_REGISTER_NODATA - * as there will be no jobinfo stored on this nspace object - * if that directive has been given */ - if (NULL == ns->jobinfo) { - return PMIX_SUCCESS; - } - /* first see if we already have processed this data * for another peer in this nspace so we don't waste * time doing it again */