1
1

Merge pull request #6830 from rhc54/topic/dpm

Provide locality for all procs on node
Этот коммит содержится в:
Ralph Castain 2019-07-23 08:10:57 -07:00 коммит произвёл GitHub
родитель 20dd06c151 d202e10c14
Коммит 8f32a59304
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
25 изменённых файлов: 483 добавлений и 382 удалений

Просмотреть файл

@ -15,7 +15,7 @@
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2018 Amazon.com, Inc. or its affiliates. All Rights reserved.
@ -406,9 +406,43 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
goto exit;
}
if (0 < opal_list_get_size(&ilist)) {
uint32_t *peer_ranks = NULL;
int prn, nprn;
char *val, *mycpuset;
uint16_t u16;
opal_process_name_t wildcard_rank;
/* convert the list of new procs to a proc_t array */
new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
sizeof(ompi_proc_t *));
/* get the list of local peers for the new procs */
cd = (ompi_dpm_proct_caddy_t*)opal_list_get_first(&ilist);
proc = cd->p;
wildcard_rank.jobid = proc->super.proc_name.jobid;
wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
/* retrieve the local peers */
OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, OPAL_PMIX_LOCAL_PEERS,
&wildcard_rank, &val, OPAL_STRING);
if (OPAL_SUCCESS == rc && NULL != val) {
char **peers = opal_argv_split(val, ',');
free(val);
nprn = opal_argv_count(peers);
peer_ranks = (uint32_t*)calloc(nprn, sizeof(uint32_t));
for (prn = 0; NULL != peers[prn]; prn++) {
peer_ranks[prn] = strtoul(peers[prn], NULL, 10);
}
opal_argv_free(peers);
}
/* get my locality string */
val = NULL;
OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, OPAL_PMIX_LOCALITY_STRING,
OMPI_PROC_MY_NAME, &val, OPAL_STRING);
if (OPAL_SUCCESS == rc && NULL != val) {
mycpuset = val;
} else {
mycpuset = NULL;
}
i = 0;
OPAL_LIST_FOREACH(cd, &ilist, ompi_dpm_proct_caddy_t) {
opal_value_t *kv;
@ -418,15 +452,38 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
* OPAL_PMIX_LOCALITY and OPAL_PMIX_HOSTNAME. since we can live without
* them, we are just fine */
ompi_proc_complete_init_single(proc);
/* save the locality for later */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_LOCALITY);
kv->type = OPAL_UINT16;
kv->data.uint16 = proc->super.proc_flags;
opal_pmix.store_local(&proc->super.proc_name, kv);
OBJ_RELEASE(kv); // maintain accounting
/* if this proc is local, then get its locality */
if (NULL != peer_ranks) {
for (prn=0; prn < nprn; prn++) {
if (peer_ranks[prn] == proc->super.proc_name.vpid) {
/* get their locality string */
val = NULL;
OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, OPAL_PMIX_LOCALITY_STRING,
&proc->super.proc_name, &val, OPAL_STRING);
if (OPAL_SUCCESS == rc && NULL != val) {
u16 = opal_hwloc_compute_relative_locality(mycpuset, val);
free(val);
} else {
/* all we can say is that it shares our node */
u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
}
proc->super.proc_flags = u16;
/* save the locality for later */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_LOCALITY);
kv->type = OPAL_UINT16;
kv->data.uint16 = proc->super.proc_flags;
opal_pmix.store_local(&proc->super.proc_name, kv);
OBJ_RELEASE(kv); // maintain accounting
break;
}
}
}
++i;
}
if (NULL != mycpuset) {
free(mycpuset);
}
/* call add_procs on the new ones */
rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
free(new_proc_list);

Просмотреть файл

@ -23,14 +23,14 @@ release=0
# The only requirement is that it must be entirely printable ASCII
# characters and have no white space.
greek=a1
greek=
# If repo_rev is empty, then the repository version number will be
# obtained during "make dist" via the "git describe --tags --always"
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=git03a8b5da
repo_rev=git628a724c
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Jul 16, 2019"
date="Jul 21, 2019"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library

Просмотреть файл

@ -12,7 +12,7 @@
# Copyright (c) 2006-2016 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Mellanox Technologies, Inc.
# All rights reserved.
# Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
# Copyright (c) 2015-2019 Intel, Inc. All rights reserved.
# Copyright (c) 2015 Research Organization for Information Science
# and Technology (RIST). All rights reserved.
# $COPYRIGHT$
@ -192,7 +192,7 @@
Summary: An extended/exascale implementation of PMI
Name: %{?_name:%{_name}}%{!?_name:pmix}
Version: 4.0.0a1
Version: 4.0.0
Release: 1%{?dist}
License: BSD
Group: Development/Libraries

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies, Inc.
@ -83,7 +83,7 @@ PMIX_EXPORT int PMI_Init(int *spawned)
/* getting internal key requires special rank value */
memcpy(&proc, &myproc, sizeof(myproc));
proc.rank = PMIX_RANK_UNDEF;
proc.rank = PMIX_RANK_WILDCARD;
/* set controlling parameters
* PMIX_OPTIONAL - expect that these keys should be available on startup
@ -392,8 +392,6 @@ PMIX_EXPORT int PMI_Get_appnum(int *appnum)
pmix_value_t *val;
pmix_info_t info[1];
bool val_optinal = 1;
pmix_proc_t proc = myproc;
proc.rank = PMIX_RANK_WILDCARD;
PMI_CHECK();
@ -412,11 +410,11 @@ PMIX_EXPORT int PMI_Get_appnum(int *appnum)
PMIX_INFO_CONSTRUCT(&info[0]);
PMIX_INFO_LOAD(&info[0], PMIX_OPTIONAL, &val_optinal, PMIX_BOOL);
rc = PMIx_Get(&proc, PMIX_APPNUM, info, 1, &val);
rc = PMIx_Get(&myproc, PMIX_APPNUM, info, 1, &val);
if (PMIX_SUCCESS == rc) {
rc = convert_int(appnum, val);
PMIX_VALUE_RELEASE(val);
} else if( PMIX_ERR_NOT_FOUND == rc ){
} else {
/* this is optional value, set to 0 */
*appnum = 0;
rc = PMIX_SUCCESS;
@ -443,7 +441,7 @@ PMIX_EXPORT int PMI_Publish_name(const char service_name[], const char port[])
}
/* pass the service/port */
pmix_strncpy(info.key, service_name, PMIX_MAX_KEYLEN);
pmix_strncpy(info.key, service_name, PMIX_MAX_KEYLEN);
info.value.type = PMIX_STRING;
info.value.data.string = (char*) port;
@ -495,7 +493,7 @@ PMIX_EXPORT int PMI_Lookup_name(const char service_name[], char port[])
PMIX_PDATA_CONSTRUCT(&pdata);
/* pass the service */
pmix_strncpy(pdata.key, service_name, PMIX_MAX_KEYLEN);
pmix_strncpy(pdata.key, service_name, PMIX_MAX_KEYLEN);
/* PMI-1 doesn't want the nspace back */
if (PMIX_SUCCESS != (rc = PMIx_Lookup(&pdata, 1, NULL, 0))) {
@ -512,7 +510,7 @@ PMIX_EXPORT int PMI_Lookup_name(const char service_name[], char port[])
* potential we could overrun it. As this feature
* isn't widely supported in PMI-1, try being
* conservative */
pmix_strncpy(port, pdata.value.data.string, PMIX_MAX_KEYLEN);
pmix_strncpy(port, pdata.value.data.string, PMIX_MAX_KEYLEN);
PMIX_PDATA_DESTRUCT(&pdata);
return PMIX_SUCCESS;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
@ -72,7 +72,7 @@ PMIX_EXPORT pmix_status_t PMIx_Fence(const pmix_proc_t procs[], size_t nprocs,
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"pmix: executing fence");
if (pmix_globals.init_cntr <= 0) {
@ -105,7 +105,7 @@ PMIX_EXPORT pmix_status_t PMIx_Fence(const pmix_proc_t procs[], size_t nprocs,
rc = cb->status;
PMIX_RELEASE(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"pmix: fence released");
return rc;
@ -124,7 +124,7 @@ PMIX_EXPORT pmix_status_t PMIx_Fence_nb(const pmix_proc_t procs[], size_t nprocs
PMIX_ACQUIRE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"pmix: fence_nb called");
if (pmix_globals.init_cntr <= 0) {
@ -184,7 +184,7 @@ static pmix_status_t unpack_return(pmix_buffer_t *data)
pmix_status_t ret;
int32_t cnt;
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"client:unpack fence called");
/* unpack the status code */
@ -195,7 +195,7 @@ static pmix_status_t unpack_return(pmix_buffer_t *data)
PMIX_ERROR_LOG(rc);
return rc;
}
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"client:unpack fence received status %d", ret);
return ret;
}
@ -254,7 +254,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
pmix_status_t rc;
pmix_output_verbose(2, pmix_globals.debug_output,
pmix_output_verbose(2, pmix_client_globals.fence_output,
"pmix: fence_nb callback recvd");
if (NULL == cb) {

Просмотреть файл

@ -53,6 +53,7 @@
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/hash.h"
#include "src/util/name_fns.h"
#include "src/util/output.h"
#include "src/mca/gds/gds.h"
#include "src/mca/ptl/ptl.h"
@ -95,14 +96,15 @@ PMIX_EXPORT pmix_status_t PMIx_Get(const pmix_proc_t *proc,
PMIX_RELEASE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_client_globals.get_output,
"pmix:client get for %s:%d key %s",
(NULL == proc) ? "NULL" : proc->nspace,
(NULL == proc) ? PMIX_RANK_UNDEF : proc->rank,
"pmix:client get for %s key %s",
(NULL == proc) ? "NULL" : PMIX_NAME_PRINT(proc),
(NULL == key) ? "NULL" : key);
/* try to get data directly, without threadshift */
if (PMIX_SUCCESS == (rc = _getfn_fastpath(proc, key, info, ninfo, val))) {
goto done;
if (PMIX_RANK_UNDEF != proc->rank) {
if (PMIX_SUCCESS == (rc = _getfn_fastpath(proc, key, info, ninfo, val))) {
goto done;
}
}
/* create a callback object as we need to pass it to the
@ -325,9 +327,14 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr,
}
if (PMIX_SUCCESS != ret) {
PMIX_ERROR_LOG(ret);
goto done;
}
PMIX_GDS_ACCEPT_KVS_RESP(rc, pmix_client_globals.myserver, buf);
if (PMIX_RANK_UNDEF == proc.rank) {
PMIX_GDS_ACCEPT_KVS_RESP(rc, pmix_globals.mypeer, buf);
} else {
PMIX_GDS_ACCEPT_KVS_RESP(rc, pmix_client_globals.myserver, buf);
}
if (PMIX_SUCCESS != rc) {
goto done;
}
@ -346,7 +353,11 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr,
/* fetch the data from server peer module - since it is passing
* it back to the user, we need a copy of it */
cb->copy = true;
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
if (PMIX_RANK_UNDEF == proc.rank) {
PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
} else {
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
}
if (PMIX_SUCCESS == rc) {
if (1 != pmix_list_get_size(&cb->kvs)) {
rc = PMIX_ERR_INVALID_VAL;
@ -494,10 +505,14 @@ static pmix_status_t _getfn_fastpath(const pmix_proc_t *proc, const pmix_key_t k
/* scan the incoming directives */
if (NULL != info) {
for (n=0; n < ninfo; n++) {
if (0 == strncmp(info[n].key, PMIX_DATA_SCOPE, PMIX_MAX_KEYLEN)) {
if (PMIX_CHECK_KEY(&info[n], PMIX_DATA_SCOPE)) {
cb->scope = info[n].value.data.scope;
} else if (PMIX_CHECK_KEY(&info[n], PMIX_OPTIONAL) ||
PMIX_CHECK_KEY(&info[n], PMIX_IMMEDIATE)) {
continue;
} else {
/* we cannot handle any other directives via this path */
PMIX_RELEASE(cb);
return PMIX_ERR_NOT_SUPPORTED;
}
}
@ -508,16 +523,16 @@ static pmix_status_t _getfn_fastpath(const pmix_proc_t *proc, const pmix_key_t k
cb->info = (pmix_info_t*)info;
cb->ninfo = ninfo;
PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_globals.mypeer);
PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_client_globals.myserver);
if (PMIX_SUCCESS == rc) {
PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
if (PMIX_SUCCESS == rc) {
goto done;
}
}
PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_client_globals.myserver);
PMIX_GDS_FETCH_IS_TSAFE(rc, pmix_globals.mypeer);
if (PMIX_SUCCESS == rc) {
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, cb);
if (PMIX_SUCCESS == rc) {
goto done;
}
@ -551,15 +566,15 @@ static void _getnbfn(int fd, short flags, void *cbdata)
/* cb was passed to us from another thread - acquire it */
PMIX_ACQUIRE_OBJECT(cb);
pmix_output_verbose(2, pmix_client_globals.get_output,
"pmix: getnbfn value for proc %s:%u key %s",
cb->pname.nspace, cb->pname.rank,
(NULL == cb->key) ? "NULL" : cb->key);
/* set the proc object identifier */
pmix_strncpy(proc.nspace, cb->pname.nspace, PMIX_MAX_NSLEN);
proc.rank = cb->pname.rank;
pmix_output_verbose(2, pmix_client_globals.get_output,
"pmix: getnbfn value for proc %s key %s",
PMIX_NAME_PRINT(&proc),
(NULL == cb->key) ? "NULL" : cb->key);
/* scan the incoming directives */
if (NULL != cb->info) {
for (n=0; n < cb->ninfo; n++) {
@ -612,7 +627,13 @@ static void _getnbfn(int fd, short flags, void *cbdata)
/* fetch the data from my server's module - since we are passing
* it back to the user, we need a copy of it */
cb->copy = true;
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
/* if the peer and server GDS component are the same, then no
* point in trying it again */
if (0 != strcmp(pmix_globals.mypeer->nptr->compat.gds->name, pmix_client_globals.myserver->nptr->compat.gds->name)) {
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
} else {
rc = PMIX_ERR_TAKE_NEXT_OPTION;
}
if (PMIX_SUCCESS != rc) {
pmix_output_verbose(5, pmix_client_globals.get_output,
"pmix:client job-level data NOT found");
@ -661,7 +682,17 @@ static void _getnbfn(int fd, short flags, void *cbdata)
"pmix:client job-level data NOT found");
rc = process_values(&val, cb);
goto respond;
} else if (PMIX_RANK_UNDEF == proc.rank) {
/* the data would have to be stored on our own peer, so
* we need to go request it */
goto request;
} else {
/* if the peer and server GDS component are the same, then no
* point in trying it again */
if (0 == strcmp(pmix_globals.mypeer->nptr->compat.gds->name, pmix_client_globals.myserver->nptr->compat.gds->name)) {
val = NULL;
goto request;
}
cb->proc = &proc;
cb->copy = true;
PMIX_GDS_FETCH_KV(rc, pmix_client_globals.myserver, cb);
@ -730,9 +761,9 @@ static void _getnbfn(int fd, short flags, void *cbdata)
}
pmix_output_verbose(2, pmix_client_globals.get_output,
"%s:%d REQUESTING DATA FROM SERVER FOR %s:%d KEY %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
cb->pname.nspace, cb->pname.rank, cb->key);
"%s REQUESTING DATA FROM SERVER FOR %s KEY %s",
PMIX_NAME_PRINT(&pmix_globals.myid),
PMIX_NAME_PRINT(cb->proc), cb->key);
/* track the callback object */
pmix_list_append(&pmix_client_globals.pending_requests, &cb->super);

Просмотреть файл

@ -886,7 +886,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
} else {
/* look up the nspace for this proc */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) {
nptr = tmp;
break;

Просмотреть файл

@ -513,6 +513,7 @@ typedef struct {
bool xml_output;
bool timestamp_output;
size_t output_limit;
pmix_list_t nspaces;
} pmix_globals_t;
/* provide access to a function to cleanup epilogs */

Просмотреть файл

@ -1369,6 +1369,7 @@ static pmix_status_t register_info(pmix_peer_t *peer,
PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
}
/* get the proc-level data for each proc in the job */
for (rank=0; rank < ns->nprocs; rank++) {
val = NULL;
rc = pmix_hash_fetch(ht, rank, NULL, &val);
@ -1523,6 +1524,7 @@ static pmix_status_t hash_store_job_info(const char *nspace,
pmix_hash_table_t *ht;
char **nodelist = NULL;
pmix_info_t *info, *iptr;
pmix_namespace_t *ns, *nptr;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%u] pmix:gds:hash store job info for nspace %s",
@ -1542,6 +1544,24 @@ static pmix_status_t hash_store_job_info(const char *nspace,
return rc;
}
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, nspace)) {
nptr = ns;
break;
}
}
if (NULL == nptr) {
nptr = PMIX_NEW(pmix_namespace_t);
if (NULL == nptr) {
rc = PMIX_ERR_NOMEM;
return rc;
}
nptr->nspace = strdup(nspace);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* see if we already have a hash table for this nspace */
ht = NULL;
PMIX_LIST_FOREACH(htptr, &myjobs, pmix_job_t) {
@ -1554,6 +1574,8 @@ static pmix_status_t hash_store_job_info(const char *nspace,
/* nope - create one */
htptr = PMIX_NEW(pmix_job_t);
htptr->ns = strdup(nspace);
PMIX_RETAIN(nptr);
htptr->nptr = nptr;
pmix_list_append(&myjobs, &htptr->super);
ht = &htptr->internal;
}
@ -1566,7 +1588,7 @@ static pmix_status_t hash_store_job_info(const char *nspace,
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%u] pmix:gds:hash store job info working key %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) {
if (PMIX_CHECK_KEY(kptr, PMIX_PROC_BLOB)) {
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
@ -1617,7 +1639,7 @@ static pmix_status_t hash_store_job_info(const char *nspace,
/* cleanup */
PMIX_DESTRUCT(&buf2); // releases the original kptr data
PMIX_RELEASE(kp2);
} else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) {
} else if (PMIX_CHECK_KEY(kptr, PMIX_MAP_BLOB)) {
/* transfer the byte object for unpacking */
bo = &(kptr->value->data.bo);
PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
@ -1788,6 +1810,11 @@ static pmix_status_t hash_store_job_info(const char *nspace,
PMIX_RELEASE(kptr);
return rc;
}
/* if this is the job size, then store it in
* the nptr tracker */
if (0 == nptr->nprocs && PMIX_CHECK_KEY(kptr, PMIX_JOB_SIZE)) {
nptr->nprocs = kptr->value->data.uint32;
}
}
PMIX_RELEASE(kptr);
kptr = PMIX_NEW(pmix_kval_t);
@ -1813,11 +1840,12 @@ static pmix_status_t hash_store(const pmix_proc_t *proc,
pmix_job_t *trk, *t;
pmix_status_t rc;
pmix_kval_t *kp;
pmix_namespace_t *ns, *nptr;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%d] gds:hash:hash_store for proc [%s:%d] key %s type %s scope %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
proc->nspace, proc->rank, kv->key,
"%s gds:hash:hash_store for proc %s key %s type %s scope %s",
PMIX_NAME_PRINT(&pmix_globals.myid),
PMIX_NAME_PRINT(proc), kv->key,
PMIx_Data_type_string(kv->value->type), PMIx_Scope_string(scope));
if (NULL == kv->key) {
@ -1836,6 +1864,26 @@ static pmix_status_t hash_store(const pmix_proc_t *proc,
/* create one */
trk = PMIX_NEW(pmix_job_t);
trk->ns = strdup(proc->nspace);
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, proc->nspace)) {
nptr = ns;
break;
}
}
if (NULL == nptr) {
nptr = PMIX_NEW(pmix_namespace_t);
if (NULL == nptr) {
rc = PMIX_ERR_NOMEM;
PMIX_RELEASE(trk);
return rc;
}
nptr->nspace = strdup(proc->nspace);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
PMIX_RETAIN(nptr);
trk->nptr = nptr;
pmix_list_append(&myjobs, &trk->super);
}
@ -1869,6 +1917,11 @@ static pmix_status_t hash_store(const pmix_proc_t *proc,
}
}
/* if the number of procs for the nspace object is new, then update it */
if (0 == trk->nptr->nprocs && PMIX_CHECK_KEY(kv, PMIX_JOB_SIZE)) {
trk->nptr->nprocs = kv->value->data.uint32;
}
/* store it in the corresponding hash table */
if (PMIX_INTERNAL == scope) {
if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kv))) {
@ -1941,6 +1994,7 @@ static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
pmix_job_t *trk, *t;
pmix_status_t rc = PMIX_SUCCESS;
pmix_kval_t *kv;
pmix_namespace_t *ns, *nptr;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%d] gds:hash:store_modex for nspace %s",
@ -1959,6 +2013,26 @@ static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
/* create one */
trk = PMIX_NEW(pmix_job_t);
trk->ns = strdup(proc->nspace);
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, proc->nspace)) {
nptr = ns;
break;
}
}
if (NULL == nptr) {
nptr = PMIX_NEW(pmix_namespace_t);
if (NULL == nptr) {
rc = PMIX_ERR_NOMEM;
PMIX_RELEASE(trk);
return rc;
}
nptr->nspace = strdup(proc->nspace);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
PMIX_RETAIN(nptr);
trk->nptr = nptr;
pmix_list_append(&myjobs, &trk->super);
}
@ -1973,10 +2047,20 @@ static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
while (PMIX_SUCCESS == rc) {
/* store this in the hash table */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
PMIX_ERROR_LOG(rc);
return rc;
if (PMIX_RANK_UNDEF == proc->rank) {
/* if the rank is undefined, then we store it on the
* remote table of rank=0 as we know that rank must
* always exist */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, 0, kv))) {
PMIX_ERROR_LOG(rc);
return rc;
}
} else {
/* store this in the hash table */
if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
PMIX_ERROR_LOG(rc);
return rc;
}
}
PMIX_RELEASE(kv); // maintain accounting as the hash increments the ref count
/* continue along */
@ -1996,6 +2080,7 @@ static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
static pmix_status_t dohash(pmix_hash_table_t *ht,
const char *key,
pmix_rank_t rank,
bool skip_genvals,
pmix_list_t *kvs)
{
pmix_status_t rc;
@ -2003,6 +2088,7 @@ static pmix_status_t dohash(pmix_hash_table_t *ht,
pmix_kval_t *kv, *k2;
pmix_info_t *info;
size_t n, ninfo;
bool found;
rc = pmix_hash_fetch(ht, rank, key, &val);
if (PMIX_SUCCESS == rc) {
@ -2013,20 +2099,27 @@ static pmix_status_t dohash(pmix_hash_table_t *ht,
PMIX_INFO != val->data.darray->type ||
0 == val->data.darray->size) {
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
PMIX_RELEASE(val);
return PMIX_ERR_NOT_FOUND;
}
info = (pmix_info_t*)val->data.darray->array;
ninfo = val->data.darray->size;
for (n=0; n < ninfo; n++) {
/* if the rank is UNDEF, then we don't want
* anything that starts with "pmix" */
if (skip_genvals &&
0 == strncmp(info[n].key, "pmix", 4)) {
continue;
}
/* see if we already have this on the list */
kv = NULL;
found = false;
PMIX_LIST_FOREACH(k2, kvs, pmix_kval_t) {
if (PMIX_CHECK_KEY(&info[n], k2->key)) {
kv = k2;
found = true;
break;
}
}
if (NULL != kv) {
if (found) {
continue;
}
kv = PMIX_NEW(pmix_kval_t);
@ -2072,7 +2165,7 @@ static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
{
size_t n;
pmix_status_t rc;
uint32_t nid;
uint32_t nid=0;
char *hostname = NULL;
bool found = false;
pmix_nodeinfo_t *nd, *ndptr;
@ -2104,7 +2197,7 @@ static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
/* scan the list of nodes to find the matching entry */
nd = NULL;
PMIX_LIST_FOREACH(ndptr, tgt, pmix_nodeinfo_t) {
if (NULL != hostname && 0 == strcmp(nd->hostname, hostname)) {
if (NULL != hostname && 0 == strcmp(ndptr->hostname, hostname)) {
nd = ndptr;
break;
}
@ -2231,10 +2324,10 @@ static pmix_status_t hash_fetch(const pmix_proc_t *proc,
pmix_rank_t rnk;
pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
"[%s:%u] pmix:gds:hash fetch %s for proc %s:%u on scope %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
"%s pmix:gds:hash fetch %s for proc %s on scope %s",
PMIX_NAME_PRINT(&pmix_globals.myid),
(NULL == key) ? "NULL" : key,
proc->nspace, proc->rank, PMIx_Scope_string(scope));
PMIX_NAME_PRINT(proc), PMIx_Scope_string(scope));
/* if the rank is wildcard and the key is NULL, then
* they are asking for a complete copy of the job-level
@ -2373,7 +2466,7 @@ static pmix_status_t hash_fetch(const pmix_proc_t *proc,
rc = fetch_appinfo(key, &trk->apps, qualifiers, nqual, kvs);
/* if they did, then we are done */
if (PMIX_ERR_DATA_VALUE_NOT_FOUND != rc) {
return rc;
return rc;
}
}
@ -2401,16 +2494,15 @@ static pmix_status_t hash_fetch(const pmix_proc_t *proc,
* be the source */
if (PMIX_RANK_UNDEF == proc->rank) {
for (rnk=0; rnk < trk->nptr->nprocs; rnk++) {
rc = dohash(ht, key, rnk, kvs);
if (PMIX_SUCCESS != rc) {
rc = dohash(ht, key, rnk, true, kvs);
if (PMIX_ERR_NOMEM == rc) {
return rc;
}
if (NULL != key) {
if (PMIX_SUCCESS == rc && NULL != key) {
return rc;
}
}
/* also need to check any info that came in via
* a job-level info array */
/* also need to check any job-level info */
PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
if (NULL == key || PMIX_CHECK_KEY(kvptr, key)) {
kv = PMIX_NEW(pmix_kval_t);
@ -2427,11 +2519,13 @@ static pmix_status_t hash_fetch(const pmix_proc_t *proc,
}
}
}
if (0 == pmix_list_get_size(kvs)) {
rc = PMIX_ERR_NOT_FOUND;
if (NULL == key) {
/* and need to add all job info just in case that was
* passed via a different GDS component */
dohash(&trk->internal, NULL, PMIX_RANK_WILDCARD, false, kvs);
}
} else {
rc = dohash(ht, key, proc->rank, kvs);
rc = dohash(ht, key, proc->rank, false, kvs);
}
if (PMIX_SUCCESS == rc) {
if (PMIX_GLOBAL == scope) {
@ -2459,6 +2553,9 @@ static pmix_status_t hash_fetch(const pmix_proc_t *proc,
}
}
}
if (0 == pmix_list_get_size(kvs)) {
rc = PMIX_ERR_NOT_FOUND;
}
return rc;
}
@ -2549,6 +2646,12 @@ static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
PMIX_ERROR_LOG(rc);
return rc;
}
/* if the rank is UNDEF, then we store this on our own
* rank tables */
if (PMIX_RANK_UNDEF == proct.rank) {
proct.rank = pmix_globals.myid.rank;
}
cnt = 1;
kv = PMIX_NEW(pmix_kval_t);
PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
@ -2558,7 +2661,6 @@ static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
* the kval contains shmem connection info, then the
* component will know what to do about it (or else
* we selected the wrong component for this peer!) */
PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proct, PMIX_INTERNAL, kv);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);

Просмотреть файл

@ -155,7 +155,7 @@ void pmix_pfexec_base_spawn_proc(int sd, short args, void *cbdata)
/* ensure our nspace is on the server global list */
nptr = NULL;
PMIX_LIST_FOREACH(n2, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(n2, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(n2->nspace, pmix_globals.myid.nspace)) {
nptr = n2;
break;
@ -165,7 +165,7 @@ void pmix_pfexec_base_spawn_proc(int sd, short args, void *cbdata)
/* add it */
nptr = PMIX_NEW(pmix_namespace_t);
nptr->nspace = strdup(pmix_globals.myid.nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* mark all children as "registered" so collectives don't falter */
nptr->all_registered = true;

Просмотреть файл

@ -60,7 +60,7 @@ pmix_status_t pmix_pnet_base_allocate(char *nspace,
nptr = NULL;
/* find this nspace - note that it may not have
* been registered yet */
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, nspace)) {
nptr = ns;
break;
@ -73,7 +73,7 @@ pmix_status_t pmix_pnet_base_allocate(char *nspace,
return PMIX_ERR_NOMEM;
}
nptr->nspace = strdup(nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
if (NULL != info) {
@ -144,7 +144,7 @@ pmix_status_t pmix_pnet_base_setup_local_network(char *nspace,
/* find this proc's nspace object */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, nspace)) {
nptr = ns;
break;
@ -157,7 +157,7 @@ pmix_status_t pmix_pnet_base_setup_local_network(char *nspace,
return PMIX_ERR_NOMEM;
}
nptr->nspace = strdup(nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
@ -189,7 +189,7 @@ pmix_status_t pmix_pnet_base_setup_fork(const pmix_proc_t *proc, char ***env)
/* find this proc's nspace object */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, proc->nspace)) {
nptr = ns;
break;
@ -202,7 +202,7 @@ pmix_status_t pmix_pnet_base_setup_fork(const pmix_proc_t *proc, char ***env)
return PMIX_ERR_NOMEM;
}
nptr->nspace = strdup(proc->nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
PMIX_LIST_FOREACH(active, &pmix_pnet_globals.actives, pmix_pnet_base_active_module_t) {
@ -280,7 +280,7 @@ void pmix_pnet_base_deregister_nspace(char *nspace)
/* find this nspace object */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, nspace)) {
nptr = ns;
break;

Просмотреть файл

@ -693,9 +693,14 @@ static pmix_status_t allocate(pmix_namespace_t *nptr,
}
}
if (NULL == nd) {
/* we don't have this node in our list */
rc = PMIX_ERR_NOT_FOUND;
goto cleanup;
/* we don't have this node in our list - so since this
* is a mockup, take the nth node in the list of nodes
* we know about */
p = n % pmix_list_get_size(&mynodes);
nd = (pnet_node_t*)pmix_list_get_first(&mynodes);
for (m=0; m < p; m++) {
nd = (pnet_node_t*)pmix_list_get_next(&nd->super);
}
}
kv = PMIX_NEW(pmix_kval_t);
if (NULL == kv) {
@ -731,6 +736,9 @@ static pmix_status_t allocate(pmix_namespace_t *nptr,
ip2 = (pmix_info_t*)d2->array;
/* start with the rank */
rank = strtoul(locals[m], NULL, 10);
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test:allocate assigning %d endpoints for rank %u",
(int)q, rank);
PMIX_INFO_LOAD(&ip2[0], PMIX_RANK, &rank, PMIX_PROC_RANK);
/* the second element in this array will itself
* be a data array of endpts */
@ -761,6 +769,9 @@ static pmix_status_t allocate(pmix_namespace_t *nptr,
ip2[2].value.data.darray = d3;
coords = (pmix_coord_t*)d3->array;
nic = (pnet_nic_t*)pmix_list_get_first(&nd->nics);
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test:allocate assigning %d coordinates for rank %u",
(int)q, rank);
for (p=0; p < q; p++) {
coords[p].view = PMIX_COORD_LOGICAL_VIEW;
coords[p].dims = 3;
@ -881,6 +892,8 @@ static pmix_status_t setup_local_network(pmix_namespace_t *nptr,
* the rank and the endpts and coords assigned to that rank. This is
* precisely the data we need to cache for the job, so
* just do so) */
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test:setup_local_network caching %d endpts", (int)nvals);
PMIX_GDS_CACHE_JOB_INFO(rc, pmix_globals.mypeer, nptr, iptr, nvals);
if (PMIX_SUCCESS != rc) {
PMIX_RELEASE(kv);
@ -932,24 +945,28 @@ static pmix_status_t setup_fork(pmix_namespace_t *nptr,
static void child_finalized(pmix_proc_t *peer)
{
pmix_output(0, "pnet:test CHILD %s:%d FINALIZED",
peer->nspace, peer->rank);
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test CHILD %s:%d FINALIZED",
peer->nspace, peer->rank);
}
static void local_app_finalized(pmix_namespace_t *nptr)
{
pmix_output(0, "pnet:test NSPACE %s LOCALLY FINALIZED", nptr->nspace);
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test NSPACE %s LOCALLY FINALIZED", nptr->nspace);
}
static void deregister_nspace(pmix_namespace_t *nptr)
{
pmix_output(0, "pnet:test DEREGISTER NSPACE %s", nptr->nspace);
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test DEREGISTER NSPACE %s", nptr->nspace);
}
static pmix_status_t collect_inventory(pmix_info_t directives[], size_t ndirs,
pmix_inventory_cbfunc_t cbfunc, void *cbdata)
{
pmix_output(0, "pnet:test COLLECT INVENTORY");
pmix_output_verbose(2, pmix_pnet_base_framework.framework_output,
"pnet:test COLLECT INVENTORY");
return PMIX_ERR_NOT_SUPPORTED;
}

Просмотреть файл

@ -245,6 +245,7 @@ static int component_register(void)
}
static char *urifile = NULL;
static bool created_rendezvous_file = false;
static pmix_status_t component_open(void)
{
@ -312,7 +313,9 @@ pmix_status_t component_close(void)
free(mca_ptl_tcp_component.pid_filename);
}
if (NULL != mca_ptl_tcp_component.rendezvous_filename) {
unlink(mca_ptl_tcp_component.rendezvous_filename);
if (created_rendezvous_file) {
unlink(mca_ptl_tcp_component.rendezvous_filename);
}
free(mca_ptl_tcp_component.rendezvous_filename);
}
if (NULL != urifile) {
@ -741,6 +744,7 @@ static pmix_status_t setup_listener(pmix_info_t info[], size_t ninfo,
mca_ptl_tcp_component.rendezvous_filename = NULL;
goto sockerror;
}
created_rendezvous_file = true;
}
nextstep:
@ -1412,7 +1416,7 @@ static void connection_handler(int sd, short args, void *cbdata)
* of local clients. So let's start by searching for
* the nspace object */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, nspace)) {
nptr = tmp;
break;
@ -1562,7 +1566,7 @@ static void connection_handler(int sd, short args, void *cbdata)
/* see if we know this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, nspace)) {
nptr = tmp;
break;
@ -1866,7 +1870,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
if (5 != pnd->flag && 8 != pnd->flag) {
PMIX_RETAIN(nptr);
nptr->nspace = strdup(cd->proc.nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
info = PMIX_NEW(pmix_rank_info_t);
info->pname.nspace = strdup(nptr->nspace);
info->pname.rank = cd->proc.rank;
@ -1894,7 +1898,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
peer->nptr->compat.psec = pmix_psec_base_assign_module(pnd->psec);
if (NULL == peer->nptr->compat.psec) {
PMIX_RELEASE(peer);
pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
PMIX_RELEASE(nptr); // will release the info object
CLOSE_THE_SOCKET(pnd->sd);
goto done;
@ -1909,7 +1913,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
PMIX_INFO_DESTRUCT(&ginfo);
if (NULL == peer->nptr->compat.gds) {
PMIX_RELEASE(peer);
pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
PMIX_RELEASE(nptr); // will release the info object
CLOSE_THE_SOCKET(pnd->sd);
goto done;
@ -1928,7 +1932,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
req = PMIX_NEW(pmix_iof_req_t);
if (NULL == req) {
PMIX_RELEASE(peer);
pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
PMIX_RELEASE(nptr); // will release the info object
CLOSE_THE_SOCKET(pnd->sd);
goto done;
@ -1964,7 +1968,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
"validation of tool credentials failed: %s",
PMIx_Error_string(rc));
PMIX_RELEASE(peer);
pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
PMIX_RELEASE(nptr); // will release the info object
CLOSE_THE_SOCKET(pnd->sd);
goto done;
@ -1977,7 +1981,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
PMIX_RELEASE(pnd);
PMIX_RELEASE(cd);
PMIX_RELEASE(peer);
pmix_list_remove_item(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super);
PMIX_RELEASE(nptr); // will release the info object
/* probably cannot send an error reply if we are out of memory */
return;

Просмотреть файл

@ -552,7 +552,7 @@ static void connection_handler(int sd, short args, void *cbdata)
/* see if we know this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, nspace)) {
nptr = tmp;
break;

Просмотреть файл

@ -13,8 +13,8 @@
* Copyright (c) 2010-2015 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -124,6 +124,8 @@ void pmix_rte_finalize(void)
PMIX_DESTRUCT(&pmix_globals.notifications);
PMIX_LIST_DESTRUCT(&pmix_globals.iof_requests);
PMIX_LIST_DESTRUCT(&pmix_globals.stdin_targets);
free(pmix_globals.hostname);
PMIX_LIST_DESTRUCT(&pmix_globals.nspaces);
/* now safe to release the event base */
if (!pmix_globals.external_evbase) {

Просмотреть файл

@ -178,6 +178,8 @@ int pmix_rte_init(pmix_proc_type_t type,
ret = pmix_hotel_init(&pmix_globals.notifications, pmix_globals.max_events,
pmix_globals.evbase, pmix_globals.event_eviction_time,
_notification_eviction_cbfunc);
PMIX_CONSTRUCT(&pmix_globals.nspaces, pmix_list_t);
if (PMIX_SUCCESS != ret) {
error = "notification hotel init";
goto return_error;

Просмотреть файл

@ -96,7 +96,6 @@ pmix_status_t pmix_server_initialize(void)
PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.nspaces, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.groups, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.iof, pmix_list_t);
@ -366,7 +365,7 @@ PMIX_EXPORT pmix_status_t PMIx_server_init(pmix_server_module_t *module,
pmix_globals.mypeer->nptr = PMIX_NEW(pmix_namespace_t);
/* ensure our own nspace is first on the list */
PMIX_RETAIN(pmix_globals.mypeer->nptr);
pmix_list_prepend(&pmix_server_globals.nspaces, &pmix_globals.mypeer->nptr->super);
pmix_list_prepend(&pmix_globals.nspaces, &pmix_globals.mypeer->nptr->super);
}
pmix_globals.mypeer->nptr->nspace = strdup(pmix_globals.myid.nspace);
rinfo->pname.nspace = strdup(pmix_globals.mypeer->nptr->nspace);
@ -489,13 +488,12 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void)
PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs);
PMIX_LIST_DESTRUCT(&pmix_server_globals.gdata);
PMIX_LIST_DESTRUCT(&pmix_server_globals.events);
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
/* ensure that we do the specified cleanup - if this is an
* abnormal termination, then the nspace object may not be
* at zero refcount */
pmix_execute_epilog(&ns->epilog);
}
PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces);
PMIX_LIST_DESTRUCT(&pmix_server_globals.groups);
PMIX_LIST_DESTRUCT(&pmix_server_globals.iof);
@ -562,7 +560,7 @@ static void _register_nspace(int sd, short args, void *cbdata)
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
nptr = tmp;
break;
@ -575,7 +573,7 @@ static void _register_nspace(int sd, short args, void *cbdata)
goto release;
}
nptr->nspace = strdup(cd->proc.nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
nptr->nlocalprocs = cd->nlocalprocs;
@ -779,12 +777,12 @@ static void _deregister_nspace(int sd, short args, void *cbdata)
pmix_server_purge_events(NULL, &cd->proc);
/* release this nspace */
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (PMIX_CHECK_NSPACE(tmp->nspace, cd->proc.nspace)) {
/* perform any nspace-level epilog */
pmix_execute_epilog(&tmp->epilog);
/* remove and release it */
pmix_list_remove_item(&pmix_server_globals.nspaces, &tmp->super);
pmix_list_remove_item(&pmix_globals.nspaces, &tmp->super);
PMIX_RELEASE(tmp);
break;
}
@ -1012,7 +1010,7 @@ static void _register_client(int sd, short args, void *cbdata)
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
nptr = ns;
break;
@ -1025,7 +1023,7 @@ static void _register_client(int sd, short args, void *cbdata)
goto cleanup;
}
nptr->nspace = strdup(cd->proc.nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* setup a peer object for this client - since the host server
* only deals with the original processes and not any clones,
@ -1066,7 +1064,7 @@ static void _register_client(int sd, short args, void *cbdata)
* if the nspaces are all defined */
if (all_def) {
/* so far, they have all been defined - check this one */
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 < ns->nlocalprocs &&
0 == strcmp(trk->pcs[i].nspace, ns->nspace)) {
all_def = ns->all_registered;
@ -1181,7 +1179,7 @@ static void _deregister_client(int sd, short args, void *cbdata)
/* see if we already have this nspace */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, cd->proc.nspace)) {
nptr = tmp;
break;
@ -1391,7 +1389,7 @@ static void _dmodex_req(int sd, short args, void *cbdata)
* been informed of it - so first check to see if we know
* about this nspace yet */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(ns->nspace, cd->proc.nspace)) {
nptr = ns;
break;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
@ -50,6 +50,7 @@
#include "src/mca/gds/gds.h"
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/name_fns.h"
#include "src/util/output.h"
#include "src/util/pmix_environ.h"
@ -190,7 +191,7 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf,
/* find the nspace object for this client */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(nspace, ns->nspace)) {
nptr = ns;
break;
@ -603,6 +604,7 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
char *data = NULL;
size_t sz = 0;
pmix_scope_t scope = PMIX_SCOPE_UNDEF;
bool diffnspace = false;
pmix_output_verbose(2, pmix_server_globals.get_output,
"%s:%d SATISFY REQUEST CALLED",
@ -616,10 +618,18 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
pmix_strncpy(proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
/* if we have local clients of this nspace, then we use
* the corresponding GDS to retrieve the data. Otherwise,
* the data will have been stored under our GDS */
if (0 < nptr->nlocalprocs) {
if (!PMIX_CHECK_NSPACE(nptr->nspace, cd->peer->info->pname.nspace)) {
diffnspace = true;
}
/* if rank is PMIX_RANK_UNDEF, then it was stored in our GDS */
if (PMIX_RANK_UNDEF == rank) {
scope = PMIX_GLOBAL; // we have to search everywhere
peer = pmix_globals.mypeer;
} else if (0 < nptr->nlocalprocs) {
/* if we have local clients of this nspace, then we use
* the corresponding GDS to retrieve the data. Otherwise,
* the data will have been stored under our GDS */
if (local) {
*local = true;
}
@ -659,8 +669,7 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
/* if they are asking about a rank from an nspace different
* from their own, or they gave a rank of "wildcard", then
* include a copy of the job-level info */
if (PMIX_RANK_WILDCARD == rank ||
0 != strncmp(nptr->nspace, cd->peer->info->pname.nspace, PMIX_MAX_NSLEN)) {
if (PMIX_RANK_WILDCARD == rank || diffnspace) {
proc.rank = PMIX_RANK_WILDCARD;
PMIX_CONSTRUCT(&cb, pmix_cb_t);
/* this data is requested by a local client, so give the gds the option
@ -673,7 +682,7 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
if (PMIX_SUCCESS == rc) {
PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
/* assemble the provided data into a byte object */
PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
PMIX_GDS_ASSEMB_KVS_REQ(rc, pmix_globals.mypeer, &proc, &cb.kvs, &pkt, cd);
if (rc != PMIX_SUCCESS) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&pkt);
@ -719,7 +728,8 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
/* retrieve the data for the specific rank they are asking about */
if (PMIX_RANK_WILDCARD != rank) {
if (!PMIX_PROC_IS_SERVER(peer) && !peer->commit_cnt) {
if (!PMIX_PROC_IS_SERVER(peer) && 0 == peer->commit_cnt) {
PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
/* this condition works only for local requests, server does
* count commits for local ranks, and check this count when
* local request.
@ -742,7 +752,11 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
found = true;
PMIX_CONSTRUCT(&pkt, pmix_buffer_t);
/* assemble the provided data into a byte object */
PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
if (PMIX_RANK_UNDEF == rank || diffnspace) {
PMIX_GDS_ASSEMB_KVS_REQ(rc, pmix_globals.mypeer, &proc, &cb.kvs, &pkt, cd);
} else {
PMIX_GDS_ASSEMB_KVS_REQ(rc, cd->peer, &proc, &cb.kvs, &pkt, cd);
}
if (rc != PMIX_SUCCESS) {
PMIX_ERROR_LOG(rc);
PMIX_DESTRUCT(&pkt);
@ -788,6 +802,7 @@ static pmix_status_t _satisfy_request(pmix_namespace_t *nptr, pmix_rank_t rank,
}
PMIX_DESTRUCT(&cb);
}
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
PMIX_DESTRUCT(&pbkt);
@ -895,7 +910,7 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
/* find the nspace object for the proc whose data is being received */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
nptr = ns;
break;
@ -909,7 +924,7 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
nptr = PMIX_NEW(pmix_namespace_t);
nptr->nspace = strdup(caddy->lcd->proc.nspace);
/* add to the list */
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* if the request was successfully satisfied, then store the data.

Просмотреть файл

@ -442,7 +442,7 @@ static pmix_server_trkr_t* new_tracker(char *id, pmix_proc_t *procs,
}
/* is this nspace known to us? */
nptr = NULL;
PMIX_LIST_FOREACH(ns, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(procs[i].nspace, ns->nspace)) {
nptr = ns;
break;
@ -3025,7 +3025,7 @@ pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer,
for (n=0; n < cd->ntargets; n++) {
/* find the nspace of this proc */
nptr = NULL;
PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) {
PMIX_LIST_FOREACH(tmp, &pmix_globals.nspaces, pmix_namespace_t) {
if (0 == strcmp(tmp->nspace, cd->targets[n].nspace)) {
nptr = tmp;
break;
@ -3038,7 +3038,7 @@ pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer,
goto exit;
}
nptr->nspace = strdup(cd->targets[n].nspace);
pmix_list_append(&pmix_server_globals.nspaces, &nptr->super);
pmix_list_append(&pmix_globals.nspaces, &nptr->super);
}
/* if the rank is wildcard, then we use the epilog for the nspace */
if (PMIX_RANK_WILDCARD == cd->targets[n].rank) {

Просмотреть файл

@ -1254,7 +1254,6 @@ PMIX_EXPORT pmix_status_t PMIx_tool_finalize(void)
PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs);
PMIX_LIST_DESTRUCT(&pmix_server_globals.gdata);
PMIX_LIST_DESTRUCT(&pmix_server_globals.events);
PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces);
PMIX_LIST_DESTRUCT(&pmix_server_globals.iof);
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2019 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -98,6 +98,7 @@ char* pmix_util_print_name_args(const pmix_proc_t *name)
{
pmix_print_args_buffers_t *ptr;
char *rank;
int index;
/* get the next buffer */
ptr = get_print_name_buffer();
@ -105,29 +106,36 @@ char* pmix_util_print_name_args(const pmix_proc_t *name)
PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE);
return pmix_print_args_null;
}
/* cycle around the ring */
if (PMIX_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
/* protect against NULL names */
if (NULL == name) {
snprintf(ptr->buffers[ptr->cntr++], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "[NO-NAME]");
return ptr->buffers[ptr->cntr-1];
index = ptr->cntr;
snprintf(ptr->buffers[index], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "[NO-NAME]");
ptr->cntr++;
if (PMIX_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
return ptr->buffers[index];
}
rank = pmix_util_print_rank(name->rank);
snprintf(ptr->buffers[ptr->cntr++],
index = ptr->cntr;
snprintf(ptr->buffers[index],
PMIX_PRINT_NAME_ARGS_MAX_SIZE,
"[%s,%s]", name->nspace, rank);
"[%s:%s]", name->nspace, rank);
ptr->cntr++;
if (PMIX_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
return ptr->buffers[ptr->cntr-1];
return ptr->buffers[index];
}
char* pmix_util_print_rank(const pmix_rank_t vpid)
{
pmix_print_args_buffers_t *ptr;
int index;
ptr = get_print_name_buffer();
@ -136,19 +144,19 @@ char* pmix_util_print_rank(const pmix_rank_t vpid)
return pmix_print_args_null;
}
/* cycle around the ring */
if (PMIX_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
index = ptr->cntr;
if (PMIX_RANK_UNDEF == vpid) {
snprintf(ptr->buffers[ptr->cntr++], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "UNDEF");
snprintf(ptr->buffers[index], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "UNDEF");
} else if (PMIX_RANK_WILDCARD == vpid) {
snprintf(ptr->buffers[ptr->cntr++], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "WILDCARD");
snprintf(ptr->buffers[index], PMIX_PRINT_NAME_ARGS_MAX_SIZE, "WILDCARD");
} else {
snprintf(ptr->buffers[ptr->cntr++],
snprintf(ptr->buffers[index],
PMIX_PRINT_NAME_ARGS_MAX_SIZE,
"%ld", (long)vpid);
}
return ptr->buffers[ptr->cntr-1];
ptr->cntr++;
if (PMIX_PRINT_NAME_ARG_NUM_BUFS == ptr->cntr) {
ptr->cntr = 0;
}
return ptr->buffers[index];
}

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
@ -24,6 +24,9 @@ static int _legacy = 0;
/* Verbose level 0-silent, 1-fatal, 2-error, 3+ debug*/
static int _verbose = 1;
static int spawned, size, rank=-1, appnum;
static char jobid[255];
static void log_fatal(const char *format, ...)
{
va_list arglist;
@ -36,7 +39,7 @@ static void log_fatal(const char *format, ...)
va_end(arglist);
return;
}
fprintf(stderr, "FATAL: %s", output);
fprintf(stderr, "%d:FATAL: %s", rank, output);
free(output);
}
va_end(arglist);
@ -54,7 +57,7 @@ static void log_error(const char *format, ...)
va_end(arglist);
return;
}
fprintf(stderr, "ERROR: %s", output);
fprintf(stderr, "%d:ERROR: %s", rank, output);
free(output);
}
va_end(arglist);
@ -72,7 +75,7 @@ static void log_info(const char *format, ...)
va_end(arglist);
return;
}
fprintf(stderr, "INFO: %s", output);
fprintf(stderr, "%d:INFO: %s", rank, output);
free(output);
}
va_end(arglist);
@ -81,7 +84,7 @@ static void log_info(const char *format, ...)
#define log_assert(e, msg) \
do { \
if (!(e)) { \
log_fatal("%s at %s:%d\n", msg, __func__, __LINE__); \
log_fatal("%d:%s at %s:%d\n", rank, msg, __func__, __LINE__); \
rc = -1; \
} \
} while (0)
@ -99,10 +102,6 @@ static int test_item5(void);
static int test_item6(void);
static int test_item7(void);
static int spawned, size, rank, appnum;
static char jobid[255];
int main(int argc, char **argv)
{
int ret = 0;
@ -372,21 +371,24 @@ static int test_item6(void)
{
int rc = 0;
char val[100];
const char *tkey = __func__;
char *tkey;
const char *tval = __FILE__;
asprintf(&tkey, "%d:%s", rank, __func__);
if (PMI_SUCCESS != (rc = PMI_KVS_Put(jobid, tkey, tval))) {
log_fatal("PMI_KVS_Put %d\n", rc);
free(tkey);
return rc;
}
if (PMI_SUCCESS != (rc = PMI_KVS_Get(jobid, tkey, val, sizeof(val)))) {
log_fatal("PMI_KVS_Get %d\n", rc);
free(tkey);
return rc;
}
log_info("tkey=%s tval=%s val=%s\n", tkey, tval, val);
free(tkey);
log_assert(!strcmp(tval, val), "value does not meet expectation");
return rc;
@ -398,16 +400,16 @@ static int test_item7(void)
char tkey[100];
char tval[100];
char val[100];
int i = 0;
int i = 0, j;
log_info("TEST7\n");
for (i = 0; i < size; i++) {
sprintf(tkey, "KEY-%d", i);
sprintf(tkey, "%d:KEY-%d", rank, i);
sprintf(tval, "VALUE-%d", i);
if (i == rank) {
if (PMI_SUCCESS != (rc = PMI_KVS_Put(jobid, tkey, tval))) {
log_fatal("PMI_KVS_Put [%s=%s] %d\n", tkey, tval, rc);
return rc;
}
if (PMI_SUCCESS != (rc = PMI_KVS_Put(jobid, tkey, tval))) {
log_fatal("PMI_KVS_Put [%s=%s] %d\n", tkey, tval, rc);
return rc;
}
}
@ -416,22 +418,27 @@ static int test_item7(void)
return rc;
}
log_info("BARRIER\n");
if (PMI_SUCCESS != (rc = PMI_Barrier())) {
log_fatal("PMI_Barrier %d\n", rc);
return rc;
}
for (i = 0; i < size; i++) {
sprintf(tkey, "KEY-%d", i);
sprintf(tval, "VALUE-%d", i);
if (PMI_SUCCESS != (rc = PMI_KVS_Get(jobid, tkey, val, sizeof(val)))) {
log_fatal("PMI_KVS_Get [%s=?] %d\n", tkey, rc);
return rc;
for (j=0; j < size; j++) {
sprintf(tkey, "%d:KEY-%d", i, j);
sprintf(tval, "VALUE-%d", j);
log_info("Get key %s\n", tkey);
if (PMI_SUCCESS != (rc = PMI_KVS_Get(jobid, tkey, val, sizeof(val)))) {
log_fatal("PMI_KVS_Get [%s=?] %d\n", tkey, rc);
return rc;
}
log_info("tkey=%s tval=%s val=%s\n", tkey, tval, val);
log_assert(!strcmp(tval, val), "value does not meet expectation");
}
log_info("tkey=%s tval=%s val=%s\n", tkey, tval, val);
log_assert(!strcmp(tval, val), "value does not meet expectation");
}
return rc;

Просмотреть файл

@ -86,7 +86,7 @@ simplegacy_SOURCES = \
simplegacy.c
simplegacy_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS)
simplegacy_LDADD = \
$(top_builddir)/src/libpmix.la
$(top_builddir)/src/libpmi.la
simptimeout_SOURCES = \
simptimeout.c

Просмотреть файл

@ -197,17 +197,17 @@ int main(int argc, char **argv)
}
/* get our universe size */
/* get our job size */
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Get universe size failed: %s",
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_JOB_SIZE, NULL, 0, &val))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Get job size failed: %s",
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
goto done;
}
nprocs = val->data.uint32;
PMIX_VALUE_RELEASE(val);
pmix_output(0, "Client %s:%d universe size %d", myproc.nspace, myproc.rank, nprocs);
pmix_output(0, "Client %s:%d job size %d", myproc.nspace, myproc.rank, nprocs);
/* get our assigned network endpts */
if (PMIX_SUCCESS != (rc = PMIx_Get(&myproc, PMIX_NETWORK_ENDPT, NULL, 0, &val))) {

Просмотреть файл

@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved.
* $COPYRIGHT$
*
@ -24,230 +24,90 @@
*/
#include <src/include/pmix_config.h>
#include <pmix.h>
#include <pmi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include "src/class/pmix_object.h"
#include "src/util/output.h"
#include "src/util/printf.h"
#define MAXCNT 3
static volatile bool completed = false;
static pmix_proc_t myproc;
static void notification_fn(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo,
pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
pmix_output(0, "Client %s:%d NOTIFIED with status %s", myproc.nspace, myproc.rank, PMIx_Error_string(status));
if (NULL != cbfunc) {
cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}
completed = true;
}
static void errhandler_reg_callbk(pmix_status_t status,
size_t errhandler_ref,
void *cbdata)
{
volatile bool *active = (volatile bool*)cbdata;
pmix_output(0, "Client: ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu",
status, (unsigned long)errhandler_ref);
*active = false;
}
/* this is an event notification function that we explicitly request
* be called when the PMIX_MODEL_DECLARED notification is issued.
* We could catch it in the general event notification function and test
* the status to see if the status matched, but it often is simpler
* to declare a use-specific notification callback point. In this case,
* we are asking to know whenever a model is declared as a means
* of testing server self-notification */
static void model_callback(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo,
pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
size_t n;
/* just let us know it was received */
fprintf(stderr, "%s:%d Model event handler called with status %d(%s)\n",
myproc.nspace, myproc.rank, status, PMIx_Error_string(status));
for (n=0; n < ninfo; n++) {
if (PMIX_STRING == info[n].value.type) {
fprintf(stderr, "%s:%d\t%s:\t%s\n",
myproc.nspace, myproc.rank,
info[n].key, info[n].value.data.string);
}
}
/* we must NOT tell the event handler state machine that we
* are the last step as that will prevent it from notifying
* anyone else that might be listening for declarations */
if (NULL != cbfunc) {
cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}
}
/* event handler registration is done asynchronously */
static void model_registration_callback(pmix_status_t status,
size_t evhandler_ref,
void *cbdata)
{
volatile int *active = (volatile int*)cbdata;
fprintf(stderr, "simpclient EVENT HANDLER REGISTRATION RETURN STATUS %d, ref=%lu\n",
status, (unsigned long)evhandler_ref);
*active = false;
}
int main(int argc, char **argv)
{
int rc;
pmix_value_t value;
pmix_value_t *val = &value;
int rc, j, n;
char *tmp;
pmix_proc_t proc;
uint32_t nprocs, n;
int cnt, j;
volatile bool active;
pmix_info_t info, *iptr;
size_t ninfo;
pmix_status_t code;
int spawned;
int rank;
int nprocs;
char value[1024];
/* init us and declare we are a test programming model */
PMIX_INFO_CREATE(iptr, 2);
PMIX_INFO_LOAD(&iptr[0], PMIX_PROGRAMMING_MODEL, "TEST", PMIX_STRING);
PMIX_INFO_LOAD(&iptr[1], PMIX_MODEL_LIBRARY_NAME, "PMIX", PMIX_STRING);
if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, iptr, 2))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Init failed: %s",
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
fprintf(stderr, "Client calling init\n");
if (PMI_SUCCESS != (rc = PMI_Init(&spawned))) {
fprintf(stderr, "Client PMI_Init failed: %d\n", rc);
exit(rc);
}
PMIX_INFO_FREE(iptr, 2);
pmix_output(0, "Client ns %s rank %d: Running", myproc.nspace, myproc.rank);
fprintf(stderr, "Client Running\n");
/* test something */
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_JOB_SIZE, NULL, 0, &val))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Get failed: %s",
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
if (PMI_SUCCESS != (rc = PMI_Get_rank(&rank))) {
fprintf(stderr, "Client PMI_Get_rank failed: %d\n", rc);
exit(rc);
}
nprocs = val->data .uint32;
PMIX_VALUE_RELEASE(val);
pmix_output(0, "Client %s:%d universe size %d", myproc.nspace, myproc.rank, nprocs);
/* register a handler specifically for when models declare */
active = true;
ninfo = 1;
PMIX_INFO_CREATE(iptr, ninfo);
PMIX_INFO_LOAD(&iptr[0], PMIX_EVENT_HDLR_NAME, "SIMPCLIENT-MODEL", PMIX_STRING);
code = PMIX_MODEL_DECLARED;
PMIx_Register_event_handler(&code, 1, iptr, ninfo,
model_callback, model_registration_callback, (void*)&active);
while (active) {
usleep(10);
if (PMI_SUCCESS != (rc = PMI_Get_universe_size(&nprocs))) {
fprintf(stderr, "Client %d: PMI_Get_universe_size failed: %d\n", rank, rc);
exit(rc);
}
PMIX_INFO_FREE(iptr, ninfo);
fprintf(stderr, "Client %d job size %d\n", rank, nprocs);
/* register our errhandler */
active = true;
PMIx_Register_event_handler(NULL, 0, NULL, 0,
notification_fn, errhandler_reg_callbk, (void*)&active);
while (active) {
usleep(10);
for (j=0; j < 10; j++) {
(void)asprintf(&tmp, "%d-gasnet-0-%d", rank, j);
if (PMI_SUCCESS != (rc = PMI_KVS_Put("foobar", tmp, "myvalue"))) {
fprintf(stderr, "Client %d: j %d PMI_KVS_Put failed: %d\n",
rank, j, rc);
goto done;
}
free(tmp);
}
memset(&info, 0, sizeof(pmix_info_t));
(void)strncpy(info.key, PMIX_COLLECT_DATA, PMIX_MAX_KEYLEN);
info.value.type = PMIX_UNDEF;
info.value.data.flag = 1;
if (PMIX_SUCCESS != (rc = PMI_KVS_Commit("foobar"))) {
fprintf(stderr, "Client %d: PMI_KVS_Commit failed: %d\n", rank, rc);
goto done;
}
for (cnt=0; cnt < MAXCNT; cnt++) {
pmix_output(0, "EXECUTING LOOP %d", cnt);
for (j=0; j < 10; j++) {
(void)asprintf(&tmp, "%s-%d-gasnet-%d-%d", myproc.nspace, myproc.rank, cnt, j);
value.type = PMIX_UINT64;
value.data.uint64 = 1234;
if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_GLOBAL, tmp, &value))) {
pmix_output(0, "Client ns %s rank %d: PMIx_Put failed: %s",
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
goto done;
fprintf(stderr, "Client rank %d: CALLING PMI_Barrier\n", rank);
/* call fence to ensure the data is received */
if (PMI_SUCCESS != (rc = PMI_Barrier())) {
fprintf(stderr, "Client %d: PMI_Barrier failed: %d\n", rank, rc);
goto done;
}
/* check the returned data */
for (j=0; j < 10; j++) {
for (n=0; n < nprocs; n++) {
(void)asprintf(&tmp, "%d-gasnet-0-%d", n, j);
fprintf(stderr, "Client %d: Calling get\n", rank);
if (PMI_SUCCESS != (rc = PMI_KVS_Get("foobar", tmp, value, 1024))) {
fprintf(stderr, "Client %d: PMI_Get failed: %d\n", rank, rc);
continue;
}
if (0 == strcmp(value, "myvalue")) {
fprintf(stderr, "Client %d: PMI_Get returned correct value\n", rank);
} else {
fprintf(stderr, "Client %d: PMI_Get returned incorrect value\n", rank);
}
free(tmp);
}
if (PMIX_SUCCESS != (rc = PMIx_Commit())) {
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Commit failed: %s",
myproc.nspace, myproc.rank, cnt, PMIx_Error_string(rc));
goto done;
}
/* call fence to ensure the data is received */
if (PMIX_SUCCESS != (rc = PMIx_Fence(NULL, 0, &info, 1))) {
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Fence failed: %s",
myproc.nspace, myproc.rank, cnt, PMIx_Error_string(rc));
goto done;
}
/* check the returned data */
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_UNDEF;
for (j=0; j < 10; j++) {
for (n=0; n < nprocs; n++) {
(void)asprintf(&tmp, "%s-%d-gasnet-%d-%d", myproc.nspace, n, cnt, j);
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) {
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s failed: %s",
myproc.nspace, myproc.rank, j, tmp, PMIx_Error_string(rc));
continue;
}
if (NULL == val) {
pmix_output(0, "Client ns %s rank %d: NULL value returned",
myproc.nspace, myproc.rank);
break;
}
if (PMIX_UINT64 != val->type) {
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned wrong type: %d", myproc.nspace, myproc.rank, j, tmp, val->type);
PMIX_VALUE_RELEASE(val);
free(tmp);
continue;
}
if (1234 != val->data.uint64) {
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned wrong value: %d", myproc.nspace, myproc.rank, j, tmp, (int)val->data.uint64);
PMIX_VALUE_RELEASE(val);
free(tmp);
continue;
}
pmix_output(0, "Client ns %s rank %d cnt %d: PMIx_Get %s returned correct", myproc.nspace, myproc.rank, j, tmp);
PMIX_VALUE_RELEASE(val);
free(tmp);
}
}
}
done:
/* finalize us */
pmix_output(0, "Client ns %s rank %d: Finalizing", myproc.nspace, myproc.rank);
if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) {
fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %s\n",
myproc.nspace, myproc.rank, PMIx_Error_string(rc));
fprintf(stderr, "Client rank %d: Finalizing\n", rank);
if (PMI_SUCCESS != (rc = PMI_Finalize())) {
fprintf(stderr, "Client rank %d: finalize failed %d\n", rank, rc);
} else {
fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank);
fprintf(stderr, "Client %d:PMI_Finalize successfully completed\n", rank);
}
fflush(stderr);
return(rc);