openmpi/opal/mca/pmix/flux/pmix_flux.c
Ralph Castain 215d6290e0 Add a flux component for LLNL
Fine tuning of flux component
Fix a few minor issues with the initial cut:
* The job id could be obtained from the PMI kvsname as with SLURM,
  but it is simpler to getenv(FLUX_JOB_ID)
* Flux pmi-1 doesn't define PMI_BOOL, PMI_TRUE, PMI_FALSE
* Flux pmi-1 maps the deprecated PMI_Get_kvs_domain_id() to
  PMI_KVS_Get_my_name() internally, so just call that instead.
* Drop residual slurm references.

Add wrappers for the PMI functions so that if HAVE_FLUX_PMI_LIBRARY
is not defined, the component can dlopen libpmi.so at the location
specified by the FLUX_PMI_LIBRARY_PATH environment variable, which
adds flexibility.  If HAVE_FLUX_PMI_LIBRARY is defined, link with
libpmi.so at build time in the usual way.
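
To illustrate, the runtime-loading path works roughly as sketched
below (this mirrors the PMI_Init wrapper in the source file; the
other wrappers follow the same dlsym pattern):

    static void *dso = NULL;

    /* Resolve PMI_Init from the library named by FLUX_PMI_LIBRARY_PATH,
     * loading it on first use; report PMI_FAIL if it cannot be loaded. */
    static int PMI_Init (int *spawned)
    {
        int (*f)(int *);
        if (!dso) {
            const char *path = getenv ("FLUX_PMI_LIBRARY_PATH");
            if (path)
                dso = dlopen (path, RTLD_NOW | RTLD_GLOBAL);
            if (!dso)
                return PMI_FAIL;
        }
        *(void **)(&f) = dlsym (dso, "PMI_Init");
        return f ? f (spawned) : PMI_FAIL;
    }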

Update configury for flux component

Update m4 so the configure options work as follows:

 --with-flux-pmi
      Build Flux PMI support (default: yes)

 --with-flux-pmi-library
      Link Flux PMI support with the PMI library at build
      time. Otherwise the library is opened at runtime at the
      location specified by the FLUX_PMI_LIBRARY_PATH environment
      variable. Use this option to enable Flux support when
      building statically or without dlopen support (default: no)

If the latter option is provided, the library/header is located at
build time using the pkg-config module 'flux-pmi'.  Otherwise there
is no library/header dependency.
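
For example (illustrative configure invocations; the install prefix
and other options will vary):

    # default: no build-time PMI dependency; libpmi.so is dlopened
    # at runtime from FLUX_PMI_LIBRARY_PATH
    ./configure --with-flux-pmi

    # link against the Flux libpmi found via pkg-config 'flux-pmi'
    ./configure --with-flux-pmi-library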

Handle the case where ompi is configured with --disable-dlopen
or --enable-static.  In those cases, don't build the component
unless --with-flux-pmi-library is provided.

It is fatal if the user explicitly requests --with-flux-pmi but
it cannot be built (e.g. due to --disable-dlopen).

Add a schizo/flux component

Update schizo/flux component

Eliminate SLURM-specific use cases.

Since the module is only loaded if FLUX_JOB_ID is set, there are
only two cases to handle:

1) App was launched indirectly through mpirun.  This is not yet
supported with Flux, but the hook remains in case this mode is
supported in the future.

2) App was launched directly by Flux, with Flux providing
CPU binding, if any.

Fix up white space in pmix/flux component

Drop non-blocking fence from pmix:flux component

The Flux PMI-1 library is not thread safe, so register a
regular blocking fence callback instead of the thread-shifting
fencenb().

pmix/flux component avoids extra PMI_KVS_Gets

Keys stored into the base cache under the wildcard
rank are not intended to be part of the global key namespace.
These keys therefore should not trigger a PMI_KVS_Get() if they
are not found in the cache.

Minor pmix/flux component cleanup

pmix/flux: drop code for fetching unused pmix_id

pmix/flux: err_exit must return error

Problem: in flux_init(), although 'ret' (the variable holding the
err_exit return code) is initialized to OPAL_ERROR, it is reused
as a temporary result code, so a run of successes followed by a
failure that doesn't set 'ret' could cause flux_init() to return
success with PMI not initialized.

Ensure that a "goto err_exit" returns OPAL_ERROR if 'ret'
is not set to some other error code.

pmix/flux: don't mix OPAL_ and PMI_ return codes

Problem: flux_init() can return both PMI_ and OPAL_ return
codes.  Although OPAL_SUCCESS and PMI_SUCCESS are both defined
as 0, other codes are not compatible.

Ensure that flux_init() consistently uses 'rc' for PMI_
return codes and 'ret' for OPAL_ return codes.

pmix/flux: factor out repeated code for cache put

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
2016-12-16 18:26:38 -08:00


/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
 * Copyright (c) 2014-2016 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2016      Cisco Systems, Inc. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "opal_config.h"
#include "opal/constants.h"
#include "opal/types.h"
#include "opal_stdint.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/proc.h"
#include "opal/util/show_help.h"
#include <string.h>
#if defined (HAVE_FLUX_PMI_LIBRARY)
#include <pmi.h>
#else
#include <dlfcn.h>
#endif
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/pmix/base/pmix_base_fns.h"
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_flux.h"
static int flux_init(void);
static int flux_fini(void);
static int flux_initialized(void);
static int flux_abort(int flag, const char msg[],
                      opal_list_t *procs);
static int flux_commit(void);
static int flux_fence(opal_list_t *procs, int collect_data);
static int flux_put(opal_pmix_scope_t scope,
                    opal_value_t *kv);
static int flux_get(const opal_process_name_t *id,
                    const char *key, opal_list_t *info,
                    opal_value_t **kv);
static int flux_publish(opal_list_t *info);
static int flux_lookup(opal_list_t *data, opal_list_t *info);
static int flux_unpublish(char **keys, opal_list_t *info);
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
static int flux_job_connect(opal_list_t *procs);
static int flux_job_disconnect(opal_list_t *procs);
static int flux_store_local(const opal_process_name_t *proc,
                            opal_value_t *val);
static const char *flux_get_nspace(opal_jobid_t jobid);
static void flux_register_jobid(opal_jobid_t jobid, const char *nspace);
const opal_pmix_base_module_t opal_pmix_flux_module = {
    .init = flux_init,
    .finalize = flux_fini,
    .initialized = flux_initialized,
    .abort = flux_abort,
    .commit = flux_commit,
    .fence = flux_fence,
    .put = flux_put,
    .get = flux_get,
    .publish = flux_publish,
    .lookup = flux_lookup,
    .unpublish = flux_unpublish,
    .spawn = flux_spawn,
    .connect = flux_job_connect,
    .disconnect = flux_job_disconnect,
    .register_evhandler = opal_pmix_base_register_handler,
    .deregister_evhandler = opal_pmix_base_deregister_handler,
    .store_local = flux_store_local,
    .get_nspace = flux_get_nspace,
    .register_jobid = flux_register_jobid
};
// usage accounting
static int pmix_init_count = 0;
// PMI constant values:
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description
static char *pmix_kvs_name = NULL;
static bool flux_committed = false;
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
static opal_process_name_t flux_pname;
static int *lranks = NULL, nlranks;
static char* pmix_error(int pmix_err);
#define OPAL_PMI_ERROR(pmi_err, pmi_func) \
    do { \
        opal_output(0, "%s [%s:%d:%s]: %s\n", \
                    pmi_func, __FILE__, __LINE__, __func__, \
                    pmix_error(pmi_err)); \
    } while(0);
#if !defined (HAVE_FLUX_PMI_LIBRARY)
//
// Wrapper functions for dlopened() PMI library.
//
#define PMI_SUCCESS 0
#define PMI_FAIL -1
#define PMI_ERR_INIT 1
#define PMI_ERR_NOMEM 2
#define PMI_ERR_INVALID_ARG 3
#define PMI_ERR_INVALID_KEY 4
#define PMI_ERR_INVALID_KEY_LENGTH 5
#define PMI_ERR_INVALID_VAL 6
#define PMI_ERR_INVALID_VAL_LENGTH 7
#define PMI_ERR_INVALID_LENGTH 8
#define PMI_ERR_INVALID_NUM_ARGS 9
#define PMI_ERR_INVALID_ARGS 10
#define PMI_ERR_INVALID_NUM_PARSED 11
#define PMI_ERR_INVALID_KEYVALP 12
#define PMI_ERR_INVALID_SIZE 13
static void *dso = NULL;
static int PMI_Init (int *spawned)
{
    int (*f)(int *);
    if (!dso) {
        const char *path;
        if ((path = getenv ("FLUX_PMI_LIBRARY_PATH")))
            dso = dlopen (path, RTLD_NOW | RTLD_GLOBAL);
        if (!dso)
            return PMI_FAIL;
    }
    *(void **)(&f) = dlsym (dso, "PMI_Init");
    return f ? f (spawned) : PMI_FAIL;
}
static int PMI_Initialized (int *initialized)
{
    int (*f)(int *);
    if (!dso) {
        if (initialized)
            *initialized = 0;
        return PMI_SUCCESS;
    }
    *(void **)(&f) = dlsym (dso, "PMI_Initialized");
    return f ? f (initialized) : PMI_FAIL;
}
static int PMI_Finalize (void)
{
    int (*f)(void);
    int rc;
    if (!dso)
        return PMI_SUCCESS;
    *(void **)(&f) = dlsym (dso, "PMI_Finalize");
    rc = f ? f () : PMI_FAIL;
    dlclose (dso);
    return rc;
}
static int PMI_Get_size (int *size)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_size") : NULL;
    return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_rank (int *rank)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_rank") : NULL;
    return f ? f (rank) : PMI_FAIL;
}
static int PMI_Get_universe_size (int *size)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_universe_size") : NULL;
    return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_appnum (int *appnum)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_appnum") : NULL;
    return f ? f (appnum) : PMI_FAIL;
}
static int PMI_Barrier (void)
{
    int (*f)(void);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Barrier") : NULL;
    return f ? f () : PMI_FAIL;
}
static int PMI_Abort (int exit_code, const char *error_msg)
{
    int (*f)(int, const char *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Abort") : NULL;
    return f ? f (exit_code, error_msg) : PMI_FAIL;
}
static int PMI_KVS_Get_my_name (char *kvsname, int length)
{
    int (*f)(char *, int);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_my_name") : NULL;
    return f ? f (kvsname, length) : PMI_FAIL;
}
static int PMI_KVS_Get_name_length_max (int *length)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_name_length_max") : NULL;
    return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Get_key_length_max (int *length)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_key_length_max") : NULL;
    return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Get_value_length_max (int *length)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_value_length_max") : NULL;
    return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Put (const char *kvsname, const char *key, const char *value)
{
    int (*f)(const char *, const char *, const char *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Put") : NULL;
    return f ? f (kvsname, key, value) : PMI_FAIL;
}
static int PMI_KVS_Commit (const char *kvsname)
{
    int (*f)(const char *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Commit") : NULL;
    return f ? f (kvsname) : PMI_FAIL;
}
static int PMI_KVS_Get (const char *kvsname, const char *key,
                        char *value, int len)
{
    int (*f)(const char *, const char *, char *, int);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get") : NULL;
    return f ? f (kvsname, key, value, len) : PMI_FAIL;
}
static int PMI_Get_clique_size (int *size)
{
    int (*f)(int *);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_size") : NULL;
    return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_clique_ranks (int *ranks, int length)
{
    int (*f)(int *, int);
    *(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_ranks") : NULL;
    return f ? f (ranks, length) : PMI_FAIL;
}
#endif /* !HAVE_FLUX_PMI_LIBRARY */
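/* Helpers that map the component's KVS accesses onto PMI_KVS_Get/Put using
 * the kvsname obtained in flux_init(); kvs_get is also passed to
 * opal_pmix_base_cache_keys_locally() as the fetch callback in flux_get(). */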
static int kvs_get(const char key[], char value [], int maxvalue)
{
    int rc;
    rc = PMI_KVS_Get(pmix_kvs_name, key, value, maxvalue);
    if( PMI_SUCCESS != rc ){
        /* silently return an error - might be okay */
        return OPAL_ERROR;
    }
    return OPAL_SUCCESS;
}
static int kvs_put(const char key[], const char value[])
{
    int rc;
    rc = PMI_KVS_Put(pmix_kvs_name, key, value);
    if( PMI_SUCCESS != rc ){
        OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
        return OPAL_ERROR;
    }
    return rc;
}
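/* cache_put_uint() and cache_put_string() factor out the repeated
 * OBJ_CONSTRUCT / opal_pmix_base_store / OBJ_DESTRUCT sequence that
 * flux_init() uses to pre-populate the local cache. */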
static int cache_put_uint(opal_process_name_t *id, int type,
                          const char key[], uint64_t val)
{
    char *cpy;
    opal_value_t kv;
    int ret;
    if (!(cpy = strdup (key))) {
        ret = OPAL_ERR_OUT_OF_RESOURCE;
        goto done;
    }
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = cpy;
    kv.type = type;
    switch (type) {
        case OPAL_UINT16:
            kv.data.uint16 = val;
            break;
        case OPAL_UINT32:
            kv.data.uint32 = val;
            break;
        case OPAL_UINT64:
            kv.data.uint64 = val;
            break;
        default:
            ret = OPAL_ERROR;
            goto done_free;
    }
    ret = opal_pmix_base_store(id, &kv);
done_free:
    OBJ_DESTRUCT(&kv);
done:
    if (OPAL_SUCCESS != ret)
        OPAL_ERROR_LOG(ret);
    return ret;
}
static int cache_put_string (opal_process_name_t *id,
                             const char key[], char *val)
{
    char *cpy;
    opal_value_t kv;
    int ret;
    if (!(cpy = strdup (key))) {
        ret = OPAL_ERR_OUT_OF_RESOURCE;
        goto done;
    }
    OBJ_CONSTRUCT(&kv, opal_value_t);
    kv.key = cpy;
    kv.type = OPAL_STRING;
    kv.data.string = val;
    ret = opal_pmix_base_store(id, &kv);
    OBJ_DESTRUCT(&kv);
done:
    if (OPAL_SUCCESS != ret)
        OPAL_ERROR_LOG(ret);
    return ret;
}
static int flux_init(void)
{
    int initialized;
    int spawned;
    int rc, ret = OPAL_ERROR;
    int i, rank, lrank, nrank;
    char tmp[64];
    const char *jobid;
    opal_process_name_t ldr;
    char **localranks=NULL;
    opal_process_name_t wildcard_rank;
    char *str;
    if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
        OPAL_PMI_ERROR(rc, "PMI_Initialized");
        return OPAL_ERROR;
    }
    if (!initialized && PMI_SUCCESS != (rc = PMI_Init(&spawned))) {
        OPAL_PMI_ERROR(rc, "PMI_Init");
        return OPAL_ERROR;
    }
    // setup hash table
    opal_pmix_base_hash_init();
    // Initialize space demands
    rc = PMI_KVS_Get_value_length_max(&pmix_vallen_max);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
        goto err_exit;
    }
    pmix_vallen_threshold = pmix_vallen_max * 3;
    pmix_vallen_threshold >>= 2;
    rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
        goto err_exit;
    }
    rc = PMI_KVS_Get_key_length_max(&pmix_keylen_max);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
        goto err_exit;
    }
    /* get our rank */
    rc = PMI_Get_rank(&rank);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_Get_rank");
        goto err_exit;
    }
    /* get integer job id */
    if (!(jobid = getenv ("FLUX_JOB_ID"))) {
        opal_output(0, "getenv FLUX_JOB_ID [%s:%d:%s]: failed\n",
                    __FILE__, __LINE__, __func__);
        goto err_exit;
    }
    flux_pname.jobid = strtoul(jobid, NULL, 10);
    ldr.jobid = flux_pname.jobid;
    flux_pname.vpid = rank;
    /* store our name in the opal_proc_t so that
     * debug messages will make sense - an upper
     * layer will eventually overwrite it, but that
     * won't do any harm */
    opal_proc_set_name(&flux_pname);
    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:flux: assigned tmp name",
                        OPAL_NAME_PRINT(flux_pname));
    /* setup wildcard rank*/
    wildcard_rank = OPAL_PROC_MY_NAME;
    wildcard_rank.vpid = OPAL_VPID_WILDCARD;
    if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
                                               OPAL_UINT32,
                                               OPAL_PMIX_JOBID,
                                               flux_pname.jobid)))
        goto err_exit;
    if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
                                               OPAL_UINT32,
                                               OPAL_PMIX_RANK,
                                               rank)))
        goto err_exit;
    pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
    if (pmix_kvs_name == NULL) {
        ret = OPAL_ERR_OUT_OF_RESOURCE;
        goto err_exit;
    }
    rc = PMI_KVS_Get_my_name(pmix_kvs_name, pmix_kvslen_max);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
        goto err_exit;
    }
    /* get our local proc info to find our local rank */
    if (PMI_SUCCESS != (rc = PMI_Get_clique_size(&nlranks))) {
        OPAL_PMI_ERROR(rc, "PMI_Get_clique_size");
        goto err_exit;
    }
    /* save the local size */
    if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
                                               OPAL_UINT32,
                                               OPAL_PMIX_LOCAL_SIZE,
                                               nlranks)))
        goto err_exit;
    lrank = 0;
    nrank = 0;
    ldr.vpid = rank;
    if (0 < nlranks) {
        /* now get the specific ranks */
        lranks = (int*)calloc(nlranks, sizeof(int));
        if (NULL == lranks) {
            ret = OPAL_ERR_OUT_OF_RESOURCE;
            OPAL_ERROR_LOG(ret);
            goto err_exit;
        }
        if (PMI_SUCCESS != (rc = PMI_Get_clique_ranks(lranks, nlranks))) {
            OPAL_PMI_ERROR(rc, "PMI_Get_clique_ranks");
            free(lranks);
            goto err_exit;
        }
        /* note the local ldr */
        ldr.vpid = lranks[0];
        /* save this */
        memset(tmp, 0, 64);
        for (i=0; i < nlranks; i++) {
            (void)snprintf(tmp, 64, "%d", lranks[i]);
            opal_argv_append_nosize(&localranks, tmp);
            if (rank == lranks[i]) {
                lrank = i;
                nrank = i;
            }
        }
        str = opal_argv_join(localranks, ',');
        opal_argv_free(localranks);
        if (OPAL_SUCCESS != (ret = cache_put_string (&wildcard_rank,
                                                     OPAL_PMIX_LOCAL_PEERS,
                                                     str)))
            goto err_exit;
    }
    /* save the local leader */
    if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
                                               OPAL_UINT64,
                                               OPAL_PMIX_LOCALLDR,
                                               *(uint64_t*)&ldr)))
        goto err_exit;
    /* save our local rank */
    if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
                                               OPAL_UINT16,
                                               OPAL_PMIX_LOCAL_RANK,
                                               lrank)))
        goto err_exit;
    /* and our node rank */
    if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
                                               OPAL_UINT16,
                                               OPAL_PMIX_NODE_RANK,
                                               nrank)))
        goto err_exit;
    /* get universe size */
    rc = PMI_Get_universe_size(&i);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
        goto err_exit;
    }
    /* push this into the dstore for subsequent fetches */
    if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
                                               OPAL_UINT32,
                                               OPAL_PMIX_UNIV_SIZE,
                                               i)))
        goto err_exit;
    if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
                                               OPAL_UINT32,
                                               OPAL_PMIX_MAX_PROCS,
                                               i)))
        goto err_exit;
    /* get job size */
    rc = PMI_Get_size(&i);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_Get_size");
        goto err_exit;
    }
    if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
                                               OPAL_UINT32,
                                               OPAL_PMIX_JOB_SIZE,
                                               i)))
        goto err_exit;
    /* get appnum */
    rc = PMI_Get_appnum(&i);
    if (PMI_SUCCESS != rc) {
        OPAL_PMI_ERROR(rc, "PMI_Get_appnum");
        goto err_exit;
    }
    if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
                                               OPAL_UINT32,
                                               OPAL_PMIX_APPNUM,
                                               i)))
        goto err_exit;
    /* increment the init count */
    ++pmix_init_count;
    return OPAL_SUCCESS;
err_exit:
    PMI_Finalize();
    return (ret == OPAL_SUCCESS ? OPAL_ERROR : ret);
}
static int flux_fini(void) {
    if (0 == pmix_init_count) {
        return OPAL_SUCCESS;
    }
    if (0 == --pmix_init_count) {
        PMI_Finalize ();
    }
    // teardown hash table
    opal_pmix_base_hash_finalize();
    return OPAL_SUCCESS;
}
static int flux_initialized(void)
{
    if (0 < pmix_init_count) {
        return 1;
    }
    return 0;
}
static int flux_abort(int flag, const char msg[],
                      opal_list_t *procs)
{
    PMI_Abort(flag, msg);
    return OPAL_SUCCESS;
}
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
    /*
    int rc;
    size_t preput_vector_size;
    const int info_keyval_sizes[1];
    info_keyval_sizes[0] = (int)opal_list_get_size(info_keyval_vector);
    //FIXME what's the size of array of lists?
    preput_vector_size = opal_list_get_size(preput_keyval_vector);
    rc = PMI_Spawn_multiple(count, cmds, argcs, argvs, maxprocs, info_keyval_sizes, info_keyval_vector, (int)preput_vector_size, preput_keyval_vector);
    if( PMI_SUCCESS != rc ) {
        OPAL_PMI_ERROR(rc, "PMI_Spawn_multiple");
        return OPAL_ERROR;
    }*/
    return OPAL_ERR_NOT_IMPLEMENTED;
}
static int flux_put(opal_pmix_scope_t scope,
                    opal_value_t *kv)
{
    int rc;
    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:flux put for key %s",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
    if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type, &pmix_packed_data, &pmix_packed_data_offset))) {
        OPAL_ERROR_LOG(rc);
        return rc;
    }
    if (pmix_packed_data_offset == 0) {
        /* nothing to write */
        return OPAL_SUCCESS;
    }
    if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
        /* this meta-key is still being filled,
         * nothing to put yet
         */
        return OPAL_SUCCESS;
    }
    rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
                                               &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
                                               pmix_vallen_max, &pmix_pack_key, kvs_put);
    flux_committed = false;
    return rc;
}
static int flux_commit(void)
{
    int rc;
    /* check if there is partially filled meta key and put them */
    opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
                                  &pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
                                  pmix_vallen_max, &pmix_pack_key, kvs_put);
    if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmix_kvs_name))) {
        OPAL_PMI_ERROR(rc, "PMI_KVS_Commit");
        return OPAL_ERROR;
    }
    return OPAL_SUCCESS;
}
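/* Blocking fence only: the Flux PMI-1 library is not thread safe, so a
 * regular blocking callback is registered instead of the thread-shifting
 * fencenb() (see the commit message above). */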
static int flux_fence(opal_list_t *procs, int collect_data)
{
    int rc;
    if (PMI_SUCCESS != (rc = PMI_Barrier())) {
        OPAL_PMI_ERROR(rc, "PMI_Barrier");
        return OPAL_ERROR;
    }
    return OPAL_SUCCESS;
}
static int flux_get(const opal_process_name_t *id,
                    const char *key, opal_list_t *info,
                    opal_value_t **kv)
{
    int rc;
    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:flux called get for key %s",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
    /* Keys presumed stored directly to cache by flux_init() under the
     * wildcard rank must not trigger PMI_KVS_Get() if not found. */
    if (id->vpid == OPAL_VPID_WILDCARD) {
        opal_list_t values;
        OBJ_CONSTRUCT(&values, opal_list_t);
        rc = opal_pmix_base_fetch (id, key, &values);
        OPAL_LIST_DESTRUCT(&values);
        if (OPAL_SUCCESS != rc) {
            return rc;
        }
    }
    rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
    opal_output_verbose(2, opal_pmix_base_framework.framework_output,
                        "%s pmix:flux got key %s",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
    return rc;
}
static int flux_publish(opal_list_t *info)
{
    return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_lookup(opal_list_t *data, opal_list_t *info)
{
    // Allocate mem for port here? Otherwise we won't get success!
    return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_unpublish(char **keys, opal_list_t *info)
{
    return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_job_connect(opal_list_t *procs)
{
    return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_job_disconnect(opal_list_t *procs)
{
    return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_store_local(const opal_process_name_t *proc,
                            opal_value_t *val)
{
    opal_pmix_base_store(proc, val);
    return OPAL_SUCCESS;
}
static const char *flux_get_nspace(opal_jobid_t jobid)
{
    return "N/A";
}
static void flux_register_jobid(opal_jobid_t jobid, const char *nspace)
{
    return;
}
static char* pmix_error(int pmix_err)
{
    char * err_msg;
    switch(pmix_err) {
        case PMI_FAIL: err_msg = "Operation failed"; break;
        case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
        case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
        case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
        case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
        case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
        case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
        case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
        case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
        case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
        case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
        case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
        case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
        case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
#if defined(PMI_ERR_INVALID_KVS)
        /* pmix.h calls this a valid return code but mpich doesn't define it */
        case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
#endif
        case PMI_SUCCESS: err_msg = "Success"; break;
        default: err_msg = "Unknown error";
    }
    return err_msg;
}