1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-09-13 10:21:44 -07:00
родитель df4bd83fcb
Коммит 691237801b
5 изменённых файлов: 128 добавлений и 577 удалений

Просмотреть файл

@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=git1154ce3
repo_rev=gitdcf4faf
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Sep 12, 2017"
date="Sep 13, 2017"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library

Просмотреть файл

@ -57,8 +57,6 @@ static void nscon(pmix_nspace_t *p)
p->nspace = NULL;
p->nlocalprocs = 0;
p->all_registered = false;
p->jobinfo = NULL;
p->njobinfo = 0;
p->jobbkt = NULL;
p->ndelivered = 0;
PMIX_CONSTRUCT(&p->ranks, pmix_list_t);
@ -69,9 +67,6 @@ static void nsdes(pmix_nspace_t *p)
if (NULL != p->nspace) {
free(p->nspace);
}
if (NULL != p->jobinfo) {
PMIX_INFO_FREE(p->jobinfo, p->njobinfo);
}
if (NULL != p->jobbkt) {
PMIX_RELEASE(p->jobbkt);
}

Просмотреть файл

@ -125,8 +125,6 @@ typedef struct {
char *nspace;
size_t nlocalprocs;
bool all_registered; // all local ranks have been defined
pmix_info_t *jobinfo; // copy of the job-level info to be delivered to each proc
size_t njobinfo;
pmix_buffer_t *jobbkt; // packed version of jobinfo
size_t ndelivered; // count of #local clients that have received the jobinfo
pmix_list_t ranks; // list of pmix_rank_info_t for connection support of my clients

Просмотреть файл

@ -223,10 +223,6 @@ static inline void _esh_sessions_cleanup(void);
static inline void _esh_ns_map_cleanup(void);
static inline int _esh_dir_del(const char *dirname);
static inline int _collect_key_for_rank(pmix_peer_t *peer, pmix_rank_t rank, pmix_kval_t *kv);
static inline int _collected_key_dstore_store(pmix_nspace_t *nptr);
static inline pmix_status_t store_map(pmix_peer_t *peer, char **nodes, char **ppn);
static inline int _my_client(const char *nspace, pmix_rank_t rank);
static pmix_status_t dstore_init(pmix_info_t info[], size_t ninfo);
@ -291,8 +287,6 @@ pmix_gds_base_module_t pmix_ds12_module = {
.del_nspace = dstore_del_nspace,
};
static pmix_value_array_t *rank_kv_bufs = NULL;
static char *_base_path = NULL;
static size_t _initial_segment_size = 0;
static size_t _max_ns_num;
@ -910,7 +904,11 @@ static inline void _esh_session_release(session_t *s)
}
_delete_sm_desc(s->sm_seg_first);
close(s->lockfd);
/* if the lock fd was somehow set, then we
* need to close it */
if (0 != s->lockfd) {
close(s->lockfd);
}
if (NULL != s->lockfile) {
if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) {
@ -2944,256 +2942,52 @@ static pmix_status_t dstore_store_modex(struct pmix_nspace_t *nspace,
return rc;
}
static inline int _collect_key_for_rank(pmix_peer_t *peer, pmix_rank_t rank, pmix_kval_t *kv)
static pmix_status_t _store_job_info(pmix_proc_t *proc)
{
pmix_cb_t cb;
pmix_kval_t *kv;
pmix_buffer_t buf;
pmix_kval_t *kv2 = NULL;
pmix_status_t rc = PMIX_SUCCESS;
uint32_t i, size;
pmix_buffer_t *tmp = NULL;
pmix_rank_t cur_rank;
if (NULL == rank_kv_bufs) {
rank_kv_bufs = PMIX_NEW(pmix_value_array_t);
if (PMIX_SUCCESS != (rc = pmix_value_array_init(rank_kv_bufs, sizeof(pmix_buffer_t)))) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
/* rank WILDCARD contained in the 0 item */
cur_rank = PMIX_RANK_WILDCARD == rank ? 0 : rank + 1;
size = (uint32_t)pmix_value_array_get_size(rank_kv_bufs);
kv2 = PMIX_NEW(pmix_kval_t);
PMIX_VALUE_CREATE(kv2->value, 1);
kv2->value->type = PMIX_BYTE_OBJECT;
if ((cur_rank + 1) <= size) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, cur_rank));
PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, tmp, kv, 1, PMIX_KVAL);
return rc;
}
if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(rank_kv_bufs, cur_rank + 1))) {
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
PMIX_CONSTRUCT(&cb, pmix_cb_t);
cb.proc = proc;
cb.scope = PMIX_INTERNAL;
cb.copy = false;
PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
for (i = size; i < (cur_rank + 1); i++) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i));
PMIX_CONSTRUCT(tmp, pmix_buffer_t);
}
PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, tmp, kv, 1, PMIX_KVAL);
return rc;
}
static inline int _collected_key_dstore_store(pmix_nspace_t *nptr)
{
int rc = PMIX_SUCCESS;
uint32_t i, size;
pmix_buffer_t *tmp;
pmix_rank_t rank;
pmix_kval_t *kv = NULL;
if (NULL == rank_kv_bufs) {
goto exit;
}
kv = PMIX_NEW(pmix_kval_t);
PMIX_VALUE_CREATE(kv->value, 1);
kv->value->type = PMIX_BYTE_OBJECT;
size = pmix_value_array_get_size(rank_kv_bufs);
for (i = 0; i < size; i++) {
tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i));
rank = 0 == i ? PMIX_RANK_WILDCARD : i - 1;
PMIX_UNLOAD_BUFFER(tmp, kv->value->data.bo.bytes, kv->value->data.bo.size);
if (PMIX_SUCCESS != (rc = _dstore_store(nptr->nspace, rank, kv))) {
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) {
PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto exit;
}
}
PMIX_UNLOAD_BUFFER(&buf, kv2->value->data.bo.bytes, kv2->value->data.bo.size);
if (PMIX_SUCCESS != (rc = _dstore_store(proc->nspace, proc->rank, kv2))) {
PMIX_ERROR_LOG(rc);
goto exit;
}
exit:
if (NULL != kv) {
PMIX_RELEASE(kv);
}
if (NULL != rank_kv_bufs) {
size_t size = pmix_value_array_get_size(rank_kv_bufs);
size_t i;
for (i = 0; i < size; i++) {
pmix_buffer_t *tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(rank_kv_bufs, pmix_buffer_t, i));
PMIX_DESTRUCT(tmp);
}
PMIX_RELEASE(rank_kv_bufs);
rank_kv_bufs = NULL;
}
PMIX_RELEASE(kv2);
PMIX_DESTRUCT(&cb);
PMIX_DESTRUCT(&buf);
return rc;
}
/*
 * Hand the node/proc placement of the peer's nspace to
 * _collect_key_for_rank():
 *
 *   - for every node, a PMIX_DATA_ARRAY of pmix_info_t keyed by the node
 *     name and carrying PMIX_LOCAL_PEERS (stored under rank wildcard);
 *   - for every proc listed on that node, its PMIX_HOSTNAME (stored
 *     under that proc's rank);
 *   - finally PMIX_NODE_LIST, the comma-joined node names (rank wildcard).
 *
 * @param peer   peer whose nptr->nspace identifies the namespace
 * @param nodes  NULL-terminated argv of node names
 * @param ppn    NULL-terminated argv parallel to nodes; each entry is a
 *               comma-delimited list of ranks (parsed with strtol)
 *
 * @return PMIX_SUCCESS, PMIX_ERR_BAD_PARAM when the two argv arrays differ
 *         in length, PMIX_ERR_INVALID_VAL when previously stored data has
 *         an unexpected shape, PMIX_ERR_NOMEM on allocation failure, or
 *         the status returned by _collect_key_for_rank().
 */
static inline pmix_status_t store_map(pmix_peer_t *peer,
                                      char **nodes, char **ppn)
{
    pmix_status_t rc;
    pmix_value_t *val;
    size_t m, n;
    pmix_info_t *iptr, *info;
    pmix_rank_t rank;
    bool updated;
    pmix_kval_t *kp2;
    char **procs;
    pmix_proc_t proc;
    pmix_cb_t cb;

    pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
                        "[%s:%d] gds:dstore:store_map",
                        pmix_globals.myid.nspace, pmix_globals.myid.rank);

    /* if the lists don't match, then that's wrong */
    if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) {
        PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
        return PMIX_ERR_BAD_PARAM;
    }

    for (n=0; NULL != nodes[n]; n++) {
        /* check and see if we already have data for this node */
        val = NULL;
        proc.rank = PMIX_RANK_WILDCARD;
        (void)strncpy(proc.nspace, peer->nptr->nspace, PMIX_MAX_NSLEN);
        PMIX_CONSTRUCT(&cb, pmix_cb_t);
        cb.proc = &proc;
        cb.scope = PMIX_INTERNAL;
        cb.copy = true; // ???
        PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb);
        if (PMIX_SUCCESS == rc && 1 == pmix_list_get_size(&cb.kvs)) {
            kp2 = (pmix_kval_t*)pmix_list_get_first(&cb.kvs);
            val = kp2->value;
            /* already have some data. See if we have the list of local peers */
            if (PMIX_DATA_ARRAY != val->type ||
                NULL == val->data.darray ||
                PMIX_INFO != val->data.darray->type ||
                0 == val->data.darray->size) {
                /* something is wrong - detach the value from its kval
                 * before releasing it so that tearing down cb cannot
                 * release it a second time */
                kp2->value = NULL;
                PMIX_VALUE_RELEASE(val);
                PMIX_ERROR_LOG(PMIX_ERR_INVALID_VAL);
                rc = PMIX_ERR_INVALID_VAL;
                goto cleanup_cb;
            }
            iptr = (pmix_info_t*)val->data.darray->array;
            updated = false;
            for (m=0; m < val->data.darray->size; m++) {
                if (0 == strncmp(iptr[m].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN)) {
                    /* we will update this entry.
                     * NOTE(review): this rewrites the fetched value in
                     * place and nothing in this function hands it to
                     * _collect_key_for_rank() - confirm the update is
                     * actually persisted somewhere */
                    if (NULL != iptr[m].value.data.string) {
                        free(iptr[m].value.data.string);
                    }
                    iptr[m].value.data.string = strdup(ppn[n]);
                    updated = true;
                    break;
                }
            }
            if (!updated) {
                /* append this entry to the current data */
                kp2 = PMIX_NEW(pmix_kval_t);
                if (NULL == kp2) {
                    rc = PMIX_ERR_NOMEM;
                    goto cleanup_cb;
                }
                kp2->key = strdup(nodes[n]);
                kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
                if (NULL == kp2->value) {
                    PMIX_RELEASE(kp2);
                    rc = PMIX_ERR_NOMEM;
                    goto cleanup_cb;
                }
                kp2->value->type = PMIX_DATA_ARRAY;
                kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
                if (NULL == kp2->value->data.darray) {
                    PMIX_RELEASE(kp2);
                    rc = PMIX_ERR_NOMEM;
                    goto cleanup_cb;
                }
                kp2->value->data.darray->type = PMIX_INFO;
                /* keep the array empty until its storage exists so that
                 * releasing kp2 on failure never sees garbage fields */
                kp2->value->data.darray->array = NULL;
                kp2->value->data.darray->size = 0;
                PMIX_INFO_CREATE(info, val->data.darray->size + 1);
                if (NULL == info) {
                    PMIX_RELEASE(kp2);
                    rc = PMIX_ERR_NOMEM;
                    goto cleanup_cb;
                }
                /* copy the pre-existing data across */
                for (m=0; m < val->data.darray->size; m++) {
                    PMIX_INFO_XFER(&info[m], &iptr[m]);
                }
                /* the new entry occupies the one extra trailing slot */
                PMIX_INFO_LOAD(&info[val->data.darray->size], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
                kp2->value->data.darray->array = info;
                kp2->value->data.darray->size = val->data.darray->size + 1;
                rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2);
                PMIX_RELEASE(kp2);
                if (PMIX_SUCCESS != rc) {
                    PMIX_ERROR_LOG(rc);
                    goto cleanup_cb;
                }
            }
        } else {
            /* store the list as-is */
            kp2 = PMIX_NEW(pmix_kval_t);
            if (NULL == kp2) {
                rc = PMIX_ERR_NOMEM;
                goto cleanup_cb;
            }
            kp2->key = strdup(nodes[n]);
            kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
            if (NULL == kp2->value) {
                PMIX_RELEASE(kp2);
                rc = PMIX_ERR_NOMEM;
                goto cleanup_cb;
            }
            kp2->value->type = PMIX_DATA_ARRAY;
            kp2->value->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
            if (NULL == kp2->value->data.darray) {
                PMIX_RELEASE(kp2);
                rc = PMIX_ERR_NOMEM;
                goto cleanup_cb;
            }
            kp2->value->data.darray->type = PMIX_INFO;
            kp2->value->data.darray->array = NULL;
            kp2->value->data.darray->size = 0;
            PMIX_INFO_CREATE(info, 1);
            if (NULL == info) {
                PMIX_RELEASE(kp2);
                rc = PMIX_ERR_NOMEM;
                goto cleanup_cb;
            }
            PMIX_INFO_LOAD(&info[0], PMIX_LOCAL_PEERS, ppn[n], PMIX_STRING);
            kp2->value->data.darray->array = info;
            kp2->value->data.darray->size = 1;
            rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2);
            PMIX_RELEASE(kp2);
            if (PMIX_SUCCESS != rc) {
                PMIX_ERROR_LOG(rc);
                goto cleanup_cb;
            }
        }
        /* done with the fetched data for this node - a fresh cb is
         * constructed on the next pass, so tear this one down now */
        PMIX_DESTRUCT(&cb);

        /* split the list of procs so we can store their
         * individual location data */
        procs = pmix_argv_split(ppn[n], ',');
        if (NULL == procs) {
            /* nothing to iterate over (presumably an empty proc list
             * for this node) - skip rather than dereference NULL */
            continue;
        }
        for (m=0; NULL != procs[m]; m++) {
            /* store the hostname for each proc */
            kp2 = PMIX_NEW(pmix_kval_t);
            if (NULL == kp2) {
                pmix_argv_free(procs);
                return PMIX_ERR_NOMEM;
            }
            kp2->key = strdup(PMIX_HOSTNAME);
            kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
            if (NULL == kp2->value) {
                PMIX_RELEASE(kp2);
                pmix_argv_free(procs);
                return PMIX_ERR_NOMEM;
            }
            kp2->value->type = PMIX_STRING;
            kp2->value->data.string = strdup(nodes[n]);
            rank = strtol(procs[m], NULL, 10);
            if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, rank, kp2))) {
                PMIX_ERROR_LOG(rc);
                PMIX_RELEASE(kp2);
                pmix_argv_free(procs);
                return rc;
            }
            PMIX_RELEASE(kp2);
        }
        pmix_argv_free(procs);
    }

    /* store the comma-delimited list of nodes hosting
     * procs in this nspace */
    kp2 = PMIX_NEW(pmix_kval_t);
    if (NULL == kp2) {
        return PMIX_ERR_NOMEM;
    }
    kp2->key = strdup(PMIX_NODE_LIST);
    kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
    if (NULL == kp2->value) {
        PMIX_RELEASE(kp2);
        return PMIX_ERR_NOMEM;
    }
    kp2->value->type = PMIX_STRING;
    kp2->value->data.string = pmix_argv_join(nodes, ',');
    rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kp2);
    if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
    }
    /* every other kval in this function is released after it has been
     * collected; do the same here on success as well as on failure */
    PMIX_RELEASE(kp2);
    return rc;

cleanup_cb:
    /* error exit taken while the per-node cb is still constructed */
    PMIX_DESTRUCT(&cb);
    return rc;
}
static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr,
pmix_buffer_t *reply)
{
@ -3201,12 +2995,8 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr,
pmix_nspace_t *ns = peer->nptr;
char *msg;
pmix_status_t rc;
size_t j, n, size, len;
pmix_info_t *iptr;
pmix_rank_t rank;
pmix_kval_t *kp2;
uint8_t *tmp;
char **nodes=NULL, **procs=NULL;
pmix_proc_t proc;
pmix_rank_info_t *rinfo;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%d] gds:dstore:register_job_info for peer [%s:%d]",
@ -3214,109 +3004,22 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr,
peer->info->pname.nspace, peer->info->pname.rank);
if (0 == ns->ndelivered) { // don't store twice
for (n=0; n < ns->njobinfo; n++) {
if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_DATA)) {
(void)strncpy(proc.nspace, ns->nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
rc = _store_job_info(&proc);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
if (PMIX_DATA_ARRAY != ns->jobinfo[n].value.type) {
rc = PMIX_ERR_BAD_PARAM;
PMIX_ERROR_LOG(rc);
return rc;
}
size = ns->jobinfo[n].value.data.darray->size;
iptr = (pmix_info_t*)ns->jobinfo[n].value.data.darray->array;
/* first element of the array must be the rank */
if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
PMIX_PROC_RANK != iptr[0].value.type) {
rc = PMIX_ERR_BAD_PARAM;
PMIX_ERROR_LOG(rc);
return rc;
}
rank = iptr[0].value.data.rank;
/* cycle thru the values for this rank and store them */
for (j=1; j < size; j++) {
kp2 = PMIX_NEW(pmix_kval_t);
if (NULL == kp2) {
rc = PMIX_ERR_NOMEM;
return rc;
}
kp2->key = strdup(iptr[j].key);
PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kp2);
return rc;
}
/* if the value contains a string that is longer than the
* limit, then compress it */
if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
if (pmix_util_compress_string(kp2->value->data.string, &tmp, &len)) {
if (NULL == tmp) {
PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
rc = PMIX_ERR_NOMEM;
return rc;
}
kp2->value->type = PMIX_COMPRESSED_STRING;
free(kp2->value->data.string);
kp2->value->data.bo.bytes = (char*)tmp;
kp2->value->data.bo.size = len;
}
}
/* store it in the tmp buf */
if (PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, rank, kp2))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kp2);
return rc;
}
PMIX_RELEASE(kp2); // maintain acctg
}
} else if (0 == strcmp(ns->jobinfo[n].key, PMIX_NODE_MAP)) {
/* parse the regex to get the argv array of node names */
if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(ns->jobinfo[n].value.data.string, &nodes))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* if we have already found the proc map, then parse
* and store the detailed map */
if (NULL != procs) {
if (PMIX_SUCCESS != (rc = store_map(peer, nodes, procs))) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
} else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_MAP)) {
/* parse the regex to get the argv array containing proc ranks on each node */
if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(ns->jobinfo[n].value.data.string, &procs))) {
PMIX_ERROR_LOG(rc);
return rc;
}
/* if we have already recv'd the node map, then parse
* and store the detailed map */
if (NULL != nodes) {
if (PMIX_SUCCESS != (rc = store_map(peer, nodes, procs))) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
} else {
pmix_kval_t *kv = PMIX_NEW(pmix_kval_t);
PMIX_VALUE_CREATE(kv->value, 1);
kv->key = strdup(ns->jobinfo[n].key);
PMIX_VALUE_XFER(rc, kv->value, &ns->jobinfo[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(kv);
return rc;
}
if ( PMIX_SUCCESS != (rc = _collect_key_for_rank(peer, PMIX_RANK_WILDCARD, kv))) {
PMIX_RELEASE(kv);
PMIX_ERROR_LOG(rc);
return rc;
}
PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) {
proc.rank = rinfo->pname.rank;
rc = _store_job_info(&proc);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
/* store all keys in the dstore */
_collected_key_dstore_store(ns);
}
/* answer to client */

Просмотреть файл

@ -391,19 +391,6 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns,
return PMIX_SUCCESS;
}
/* this is duplicative, but for now, we copy the data to the nspace
* jobinfo array as well as cache it internally so we can look it
* up if required. We will later figure out a way to reconstruct
* the jobinfo array when required */
PMIX_INFO_CREATE(nptr->jobinfo, ninfo);
nptr->njobinfo = ninfo;
for (n=0; n < ninfo; n++) {
(void)strncpy(nptr->jobinfo[n].key, info[n].key, PMIX_MAX_KEYLEN);
PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
&nptr->jobinfo[n].value,
&info[n].value);
}
/* cache the job info on the internal hash table for this nspace */
ht = &trk->internal;
for (n=0; n < ninfo; n++) {
@ -558,219 +545,95 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns,
return rc;
}
/* we need to pass three things to the client:
 *
 * (a) the list of nodes involved in this nspace
 *
 * (b) the hostname for each proc in this nspace
 *
 * (c) the list of procs on each node for reverse lookup
 *
 * This routine packs the number of nodes followed by one kval per node
 * (key = node name, value = that node's proc-list string) into a
 * scratch buffer, then packs the scratch buffer's contents into buf as
 * a single PMIX_BYTE_OBJECT kval keyed PMIX_MAP_BLOB.
 *
 * @param pr     peer, passed to PMIX_BFROPS_PACK for every pack operation
 * @param buf    buffer that receives the PMIX_MAP_BLOB kval
 * @param nodes  argv of node names
 * @param procs  argv parallel to nodes holding each node's proc list
 *
 * @return PMIX_ERR_BAD_PARAM when nodes and procs differ in length,
 *         otherwise the status of the last pack operation attempted.
 */
static pmix_status_t pmix_pack_proc_map(struct pmix_peer_t *pr,
                                        pmix_buffer_t *buf,
                                        char **nodes, char **procs)
{
    pmix_peer_t *peer = (pmix_peer_t*)pr;
    pmix_kval_t kv;
    pmix_value_t val;
    pmix_status_t rc;
    pmix_buffer_t buf2;
    size_t i, nnodes;

    /* bozo check - need procs for each node */
    if (pmix_argv_count(nodes) != pmix_argv_count(procs)) {
        PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
        return PMIX_ERR_BAD_PARAM;
    }

    PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
    PMIX_CONSTRUCT(&kv, pmix_kval_t);
    /* kv only borrows its key and value: val lives on our stack and
     * the keys belong to the caller (or are string constants) */
    kv.value = &val;

    /* pass the number of nodes involved in this namespace */
    nnodes = pmix_argv_count(nodes);
    PMIX_BFROPS_PACK(rc, peer, &buf2, &nnodes, 1, PMIX_SIZE);
    if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
        goto cleanup;
    }

    for (i=0; i < nnodes; i++) {
        /* pass the complete list of procs on this node */
        kv.key = nodes[i];
        val.type = PMIX_STRING;
        val.data.string = procs[i];
        PMIX_BFROPS_PACK(rc, peer, &buf2, &kv, 1, PMIX_KVAL);
        if (PMIX_SUCCESS != rc) {
            PMIX_ERROR_LOG(rc);
            goto cleanup;
        }
    }

    /* pass the completed blob */
    kv.key = PMIX_MAP_BLOB;
    val.type = PMIX_BYTE_OBJECT;
    PMIX_UNLOAD_BUFFER(&buf2, val.data.bo.bytes, val.data.bo.size);
    PMIX_BFROPS_PACK(rc, peer, buf, &kv, 1, PMIX_KVAL);
    if (PMIX_SUCCESS != rc) {
        PMIX_ERROR_LOG(rc);
    }
    /* the bytes unloaded from buf2 are ours to free once packed */
    if (NULL != val.data.bo.bytes) {
        free(val.data.bo.bytes);
    }

cleanup:
    /* Drop the borrowed key and the stack value on EVERY exit before
     * destructing kv, so nothing we do not own is handed to the
     * destructor. Previously only the success path cleared kv.value;
     * the error exits left it pointing at the stack-allocated val. */
    kv.key = NULL;
    kv.value = NULL;
    PMIX_DESTRUCT(&buf2);
    PMIX_DESTRUCT(&kv);
    return rc;
}
static pmix_status_t register_info(pmix_peer_t *peer,
pmix_nspace_t *ns,
pmix_buffer_t *reply)
{
pmix_rank_t rank;
char **procs = NULL, **nodes = NULL;
size_t n, j, size;
pmix_hash_trkr_t *trk, *t;
pmix_hash_table_t *ht;
pmix_value_t *val, blob;
pmix_status_t rc = PMIX_SUCCESS;
pmix_info_t *iptr;
pmix_buffer_t buf2;
pmix_kval_t kv, *kvptr;
pmix_value_t val;
pmix_rank_info_t *rinfo;
pmix_info_t *info;
size_t ninfo, n;
pmix_kval_t kv;
pmix_buffer_t buf;
pmix_rank_t rank;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%d] gds:hash:register_info",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
/* pack the provided info */
for (n=0; n < ns->njobinfo; n++) {
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"pmix:gds:hash packing job info %s",
ns->jobinfo[n].key);
if (0 == strcmp(ns->jobinfo[n].key, PMIX_NODE_MAP)) {
/* parse the regex to get the argv array of node names */
if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(ns->jobinfo[n].value.data.string, &nodes))) {
PMIX_ERROR_LOG(rc);
continue;
}
/* if we have already found the proc map, then pass
* the detailed map */
if (NULL != procs) {
rc = pmix_pack_proc_map(peer, reply, nodes, procs);
pmix_argv_free(nodes);
nodes = NULL;
pmix_argv_free(procs);
procs = NULL;
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
} else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_MAP)) {
/* parse the regex to get the argv array containing proc ranks on each node */
if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(ns->jobinfo[n].value.data.string, &procs))) {
PMIX_ERROR_LOG(rc);
continue;
}
/* if we have already recv'd the node map, then record
* the detailed map */
if (NULL != nodes) {
rc = pmix_pack_proc_map(peer, reply, nodes, procs);
pmix_argv_free(nodes);
nodes = NULL;
pmix_argv_free(procs);
procs = NULL;
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
} else if (0 == strcmp(ns->jobinfo[n].key, PMIX_PROC_DATA)) {
/* an array of data pertaining to a specific proc */
if (PMIX_DATA_ARRAY != ns->jobinfo[n].value.type) {
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
goto release;
}
size = ns->jobinfo[n].value.data.darray->size;
iptr = (pmix_info_t*)ns->jobinfo[n].value.data.darray->array;
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
/* first element of the array must be the rank */
if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
PMIX_PROC_RANK != iptr[0].value.type) {
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
PMIX_DESTRUCT(&buf2);
goto release;
}
/* pack it separately */
rank = iptr[0].value.data.rank;
PMIX_BFROPS_PACK(rc, peer, &buf2, &rank, 1, PMIX_PROC_RANK);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
goto release;
}
/* cycle thru the values for this rank and pack them */
for (j=1; j < size; j++) {
kv.key = iptr[j].key;
kv.value = &iptr[j].value;
PMIX_BFROPS_PACK(rc, peer, &buf2, &kv, 1, PMIX_KVAL);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
goto release;
}
}
/* now add the blob */
kv.key = PMIX_PROC_BLOB;
kv.value = &val;
val.type = PMIX_BYTE_OBJECT;
PMIX_UNLOAD_BUFFER(&buf2, val.data.bo.bytes, val.data.bo.size);
PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
PMIX_VALUE_DESTRUCT(&val); // release the data
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&buf2);
goto release;
}
PMIX_DESTRUCT(&buf2);
} else {
/* just a value relating to the entire job */
kv.key = ns->jobinfo[n].key;
kv.value = &ns->jobinfo[n].value;
PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto release;
}
}
}
/* now add any global data that was provided */
PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) {
PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
trk = NULL;
PMIX_LIST_FOREACH(t, &myhashes, pmix_hash_trkr_t) {
if (0 == strcmp(ns->nspace, t->ns)) {
trk = t;
break;
}
}
release:
/* cleanup */
if (NULL != nodes) {
pmix_argv_free(nodes);
if (NULL == trk) {
return PMIX_ERR_INVALID_NAMESPACE;
}
if (NULL != procs) {
pmix_argv_free(procs);
/* the job data is stored on the internal hash table */
ht = &trk->internal;
/* fetch all values from the hash table tied to rank=wildcard */
val = NULL;
rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
return rc;
}
if (NULL == val->data.darray ||
PMIX_INFO != val->data.darray->type ||
0 == val->data.darray->size) {
return PMIX_ERR_NOT_FOUND;
}
info = (pmix_info_t*)val->data.darray->array;
ninfo = val->data.darray->size;
for (n=0; n < ninfo; n++) {
kv.key = info[n].key;
kv.value = &info[n].value;
PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
}
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) {
val = NULL;
rc = pmix_hash_fetch(ht, rinfo->pname.rank, NULL, &val);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
return rc;
}
PMIX_CONSTRUCT(&buf, pmix_buffer_t);
rank = rinfo->pname.rank;
PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK);
info = (pmix_info_t*)val->data.darray->array;
ninfo = val->data.darray->size;
for (n=0; n < ninfo; n++) {
kv.key = info[n].key;
kv.value = &info[n].value;
PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL);
}
kv.key = PMIX_PROC_BLOB;
kv.value = &blob;
blob.type = PMIX_BYTE_OBJECT;
PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size);
PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
PMIX_VALUE_DESTRUCT(&blob);
PMIX_DESTRUCT(&buf);
if (NULL != val) {
PMIX_VALUE_RELEASE(val);
}
}
return rc;
}
@ -797,14 +660,6 @@ static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
pmix_globals.myid.nspace, pmix_globals.myid.rank,
peer->info->pname.nspace, peer->info->pname.rank);
/* NOTE: we do not need to worry here about PMIX_REGISTER_NODATA
* as there will be no jobinfo stored on this nspace object
* if that directive has been given */
if (NULL == ns->jobinfo) {
return PMIX_SUCCESS;
}
/* first see if we already have processed this data
* for another peer in this nspace so we don't waste
* time doing it again */