Fine tuning of flux component
Fix a few minor issues with the initial cut:
* The job id could be derived from the PMI kvsname, as is done for SLURM,
  but it is simpler to call getenv("FLUX_JOB_ID")
* Flux pmi-1 doesn't define PMI_BOOL, PMI_TRUE, PMI_FALSE
* Flux pmi-1 maps the deprecated PMI_Get_kvs_domain_id() to
  PMI_KVS_Get_my_name() internally, so just call that instead.
* Drop residual slurm references.

Add wrappers for PMI functions so that if HAVE_FLUX_PMI_LIBRARY
is not defined, the component can dlopen libpmi.so at location
specified by the FLUX_PMI_LIBRARY_PATH env variable, which adds
flexibility.  If HAVE_FLUX_PMI_LIBRARY is defined, link with
libpmi.so at build time in the usual way.
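
The wrapper pattern described above can be sketched roughly as follows.
This is a minimal illustration, assuming a POSIX dlopen(3)/dlsym(3)
environment; the sketch_* names and SKETCH_PMI_* codes are hypothetical
and not part of the component. Unlike the component, which opens the
library only from its PMI_Init() wrapper, this sketch loads lazily from
any wrapper to stay short.

```c
#include <dlfcn.h>
#include <stdlib.h>

#define SKETCH_PMI_SUCCESS 0
#define SKETCH_PMI_FAIL   -1

/* Handle for the lazily opened PMI library (NULL until loaded). */
static void *sketch_dso = NULL;

/* Open the library named by FLUX_PMI_LIBRARY_PATH on first use. */
static int sketch_load_pmi(void)
{
    const char *path;

    if (sketch_dso)
        return SKETCH_PMI_SUCCESS;
    path = getenv("FLUX_PMI_LIBRARY_PATH");
    if (!path)
        return SKETCH_PMI_FAIL;
    sketch_dso = dlopen(path, RTLD_NOW | RTLD_GLOBAL);
    return sketch_dso ? SKETCH_PMI_SUCCESS : SKETCH_PMI_FAIL;
}

/* Forward PMI_Get_rank() to the loaded library, or fail cleanly
 * when the library or the symbol is unavailable. */
static int sketch_pmi_get_rank(int *rank)
{
    int (*f)(int *) = NULL;

    if (SKETCH_PMI_SUCCESS != sketch_load_pmi())
        return SKETCH_PMI_FAIL;
    *(void **)(&f) = dlsym(sketch_dso, "PMI_Get_rank");
    return f ? f(rank) : SKETCH_PMI_FAIL;
}
```

With FLUX_PMI_LIBRARY_PATH unset or pointing at a file that cannot be
opened, the wrapper returns the failure code and leaves *rank untouched,
so callers see an ordinary PMI error rather than a crash.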

Update configury for flux component

Update m4 so the configure options work as follows:

 --with-flux-pmi
      Build Flux PMI support (default: yes)

 --with-flux-pmi-library
      Link Flux PMI support with PMI library at build
      time. Otherwise the library is opened at runtime at
      location specified by FLUX_PMI_LIBRARY_PATH environment
      variable. Use this option to enable Flux support when
      building statically or without dlopen support (default: no)

If the latter option is provided, the library/header is located at
build time using the pkg-config module 'flux-pmi'.  Otherwise there
is no library/header dependency.

Handle the case where ompi is configured with --disable-dlopen
or --enable-static.  In those cases, don't build the component
unless --with-flux-pmi-library is provided.

It is fatal if the user explicitly requests --with-flux-pmi but
it cannot be built (e.g. due to --disable-dlopen).
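
The resulting decision can be modelled as a small truth function. This
is only a sketch of the logic described above, written in C rather than
m4; the type and function names are hypothetical, and it ignores the
separate pkg-config check that runs when --with-flux-pmi-library is
given.

```c
#include <stdbool.h>

/* Tri-state for --with-flux-pmi: not given, =yes, or =no. */
typedef enum { OPT_UNSET, OPT_YES, OPT_NO } sketch_opt_t;

typedef enum {
    BUILD_COMPONENT,   /* component is built */
    SKIP_COMPONENT,    /* component is silently left out */
    CONFIGURE_ERROR    /* configure aborts */
} sketch_result_t;

static sketch_result_t sketch_flux_config(sketch_opt_t with_flux_pmi,
                                          bool with_flux_pmi_library,
                                          bool dlopen_enabled,
                                          bool dso_build)
{
    /* Runtime loading needs dlopen support and a DSO (non-static) build. */
    bool can_dlopen = dlopen_enabled && dso_build;

    if (OPT_NO != with_flux_pmi && (with_flux_pmi_library || can_dlopen))
        return BUILD_COMPONENT;
    /* An explicit request that cannot be honoured is fatal. */
    if (OPT_YES == with_flux_pmi)
        return CONFIGURE_ERROR;
    return SKIP_COMPONENT;
}
```

For example, a default build with dlopen available yields
BUILD_COMPONENT, while --with-flux-pmi combined with --disable-dlopen
and no --with-flux-pmi-library yields CONFIGURE_ERROR.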

Add a schizo/flux component

Update schizo/flux component

Eliminate slurm-specific use cases.

Since the module is only loaded if FLUX_JOB_ID is set, there are
only two cases to handle:

1) App was launched indirectly through mpirun.  This is not yet
supported with Flux, but the hook remains in case this mode is supported
in the future.

2) App was launched directly by Flux, with Flux providing
CPU binding, if any.
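
The two cases can be sketched as a simple classification. This is an
illustration only: daemon_uri stands in for
orte_process_info.my_daemon_uri, and the sketch_* names and enum values
are hypothetical.

```c
#include <stddef.h>

typedef enum {
    SKETCH_NATIVE_LAUNCHED,  /* case 1: started indirectly via mpirun */
    SKETCH_DIRECT_LAUNCHED   /* case 2: started directly by Flux */
} sketch_launch_t;

/* A daemon URI is only present when an mpirun daemon launched the app. */
static sketch_launch_t sketch_classify_launch(const char *daemon_uri)
{
    return (NULL != daemon_uri) ? SKETCH_NATIVE_LAUNCHED
                                : SKETCH_DIRECT_LAUNCHED;
}

/* Under direct launch Flux owns CPU binding, so the app must not
 * apply its own binding policy. */
static int sketch_leave_binding_to_flux(sketch_launch_t env)
{
    return SKETCH_DIRECT_LAUNCHED == env;
}
```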

Fix up white space in pmix/flux component

Drop non-blocking fence from pmix:flux component

The flux PMI-1 library is not thread safe, therefore
register a regular blocking fence callback instead of the
thread-shifting fencenb().

pmix/flux component avoids extra PMI_KVS_Gets

Keys stored into the base cache under the wildcard
rank are not intended to be part of the global key namespace.
These keys therefore should not trigger a PMI_KVS_Get() if they
are not found in the cache.
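
A rough sketch of the lookup rule, using a simulated cache and a counter
in place of real PMI_KVS_Get() calls. All names, the key "job.size", and
the wildcard value are hypothetical stand-ins chosen for illustration.

```c
#include <stdint.h>
#include <string.h>

#define SKETCH_VPID_WILDCARD UINT32_MAX  /* arbitrary stand-in value */
#define SKETCH_SUCCESS    0
#define SKETCH_NOT_FOUND -1

static int sketch_remote_gets = 0;  /* counts simulated PMI_KVS_Get() calls */

/* Simulated local cache: holds one key stored under the wildcard rank. */
static int sketch_cache_fetch(uint32_t vpid, const char *key)
{
    if (SKETCH_VPID_WILDCARD == vpid && 0 == strcmp(key, "job.size"))
        return SKETCH_SUCCESS;
    return SKETCH_NOT_FOUND;
}

/* Simulated remote lookup: always finds the key, but costs a round trip. */
static int sketch_remote_fetch(const char *key)
{
    (void)key;
    ++sketch_remote_gets;
    return SKETCH_SUCCESS;
}

static int sketch_get(uint32_t vpid, const char *key)
{
    int rc = sketch_cache_fetch(vpid, key);

    if (SKETCH_SUCCESS == rc)
        return rc;
    /* Wildcard-rank keys exist only in the cache: a miss is final. */
    if (SKETCH_VPID_WILDCARD == vpid)
        return SKETCH_NOT_FOUND;
    return sketch_remote_fetch(key);
}
```

A miss under the wildcard rank returns immediately and leaves
sketch_remote_gets unchanged; only misses for a concrete rank fall
through to the remote lookup.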

Minor pmix/flux component cleanup

pmix/flux: drop code for fetching unused pmix_id

pmix/flux: err_exit must return error

Problem: in flux_init(), although 'ret' (variable holding
err_exit return code) is initialized to OPAL_ERROR, the
variable is reused as a temporary result code, so if there are
some successes followed by a failure that doesn't set 'ret',
flux_init() could return success with PMI not initialized.

Ensure that a "goto err_exit" returns OPAL_ERROR if 'ret'
is not set to some other error code.
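
The failure mode and the guard can be reproduced in a few lines. This is
a sketch under assumed names: the SKETCH_* codes are hypothetical, and
the three parameters simulate the results of successive steps inside an
init routine.

```c
#define SKETCH_SUCCESS              0
#define SKETCH_ERROR               -1
#define SKETCH_ERR_OUT_OF_RESOURCE -2

/* step_a and step_b simulate calls that return local status codes;
 * pmi_rc simulates the return code of a PMI call between them. */
static int sketch_init(int step_a, int pmi_rc, int step_b)
{
    int ret = SKETCH_ERROR;

    if (SKETCH_SUCCESS != (ret = step_a))  /* 'ret' is reused here */
        goto err_exit;
    if (0 != pmi_rc)                       /* fails without setting 'ret' */
        goto err_exit;
    if (SKETCH_SUCCESS != (ret = step_b))
        goto err_exit;
    return SKETCH_SUCCESS;

err_exit:
    /* Never report success from the error path. */
    return (SKETCH_SUCCESS == ret) ? SKETCH_ERROR : ret;
}
```

Without the final conditional, the second failure path would return the
stale success value left in 'ret' by the first step.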

pmix/flux: don't mix OPAL_ and PMI_ return codes

Problem: flux_init() can return both PMI_ and OPAL_ return
codes.  Although OPAL_SUCCESS and PMI_SUCCESS are both defined
as 0, other codes are not compatible.

Ensure that flux_init() consistently uses 'rc' for PMI_
return codes and 'ret' for OPAL_ return codes.
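
The convention can be sketched as follows. The SKETCH_PMI_ERR_NOMEM
value mirrors the PMI_ERR_NOMEM value in the component's fallback
definitions; the SKETCH_OPAL_* codes and function names are hypothetical
stand-ins.

```c
/* PMI-1 side. */
#define SKETCH_PMI_SUCCESS   0
#define SKETCH_PMI_ERR_NOMEM 2

/* Caller-facing side: stand-ins for OPAL_ codes. */
#define SKETCH_OPAL_SUCCESS  0
#define SKETCH_OPAL_ERROR   -1

/* Stand-in for a PMI call; returns whatever code it is told to. */
static int sketch_pmi_call(int simulated_rc)
{
    return simulated_rc;
}

static int sketch_query(int simulated_rc)
{
    int rc;                         /* holds PMI_ codes only */
    int ret = SKETCH_OPAL_SUCCESS;  /* holds OPAL_ codes only */

    rc = sketch_pmi_call(simulated_rc);
    if (SKETCH_PMI_SUCCESS != rc)
        ret = SKETCH_OPAL_ERROR;    /* translate at the boundary */
    return ret;                     /* 'rc' never escapes */
}
```

Keeping the two variables separate makes it easy to audit that only
OPAL_-style codes reach the caller, even though both families share 0
for success.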

pmix/flux: factor out repeated code for cache put

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
This commit is contained in:
Ralph Castain 2016-12-07 08:57:01 -08:00
parent ced245d093
Commit 215d6290e0
11 changed files: 1248 additions and 0 deletions

38
opal/mca/pmix/flux/Makefile.am Normal file

@ -0,0 +1,38 @@
#
# Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
pmix_flux.h \
pmix_flux_component.c \
pmix_flux.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_opal_pmix_flux_DSO
component_noinst =
component_install = mca_pmix_flux.la
else
component_noinst = libmca_pmix_flux.la
component_install =
endif
mcacomponentdir = $(opallibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_pmix_flux_la_SOURCES = $(sources)
mca_pmix_flux_la_CPPFLAGS = $(FLUX_PMI_CFLAGS)
mca_pmix_flux_la_LDFLAGS = -module -avoid-version
mca_pmix_flux_la_LIBADD = $(FLUX_PMI_LIBS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_pmix_flux_la_SOURCES = $(sources)
libmca_pmix_flux_la_CPPFLAGS = $(FLUX_PMI_CFLAGS)
libmca_pmix_flux_la_LDFLAGS = -module -avoid-version
libmca_pmix_flux_la_LIBADD = $(FLUX_PMI_LIBS)

63
opal/mca/pmix/flux/configure.m4 Normal file

@ -0,0 +1,63 @@
# -*- shell-script -*-
#
# Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_opal_pmix_flux_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
AC_DEFUN([MCA_opal_pmix_flux_CONFIG], [
AC_CONFIG_FILES([opal/mca/pmix/flux/Makefile])
AC_ARG_WITH([flux-pmi],
[AC_HELP_STRING([--with-flux-pmi],
[Build Flux PMI support (default: yes)])])
AC_ARG_WITH([flux-pmi-library],
[AC_HELP_STRING([--with-flux-pmi-library],
[Link Flux PMI support with PMI library at build time. Otherwise the library is opened at runtime at location specified by FLUX_PMI_LIBRARY_PATH environment variable. Use this option to enable Flux support when building statically or without dlopen support (default: no)])])
# pkg-config check aborts configure on failure
AC_MSG_CHECKING([if user wants Flux support to link against PMI library])
AS_IF([test "x$with_flux_pmi_library" != "xyes"],
[AC_MSG_RESULT([no])
$3],
[AC_MSG_RESULT([yes])
PKG_CHECK_MODULES([FLUX_PMI], [flux-pmi], [], [])
have_flux_pmi_library=yes
AC_DEFINE([HAVE_FLUX_PMI_LIBRARY], [1],
[Flux support builds against external PMI library])
])
AC_MSG_CHECKING([if Flux support allowed to use dlopen])
AS_IF([test $OPAL_ENABLE_DLOPEN_SUPPORT -eq 1 && test "x$compile_mode" = "xdso"],
[AC_MSG_RESULT([yes])
flux_can_dlopen=yes
],
[AC_MSG_RESULT([no])
])
AC_MSG_CHECKING([if Flux PMI support can be built])
AS_IF([test "x$with_flux_pmi" != "xno" && ( test "x$have_flux_pmi_library" = "xyes" || test "x$flux_can_dlopen" = "xyes" ) ],
[AC_MSG_RESULT([yes])
opal_enable_flux=yes
],
[AC_MSG_RESULT([no])
AS_IF([test "x$with_flux_pmi" = "xyes"],
[AC_MSG_ERROR([Aborting since Flux PMI support was requested])
])
])
# Evaluate succeed / fail
AS_IF([test "x$opal_enable_flux" = "xyes"],
[$1
# need to set the wrapper flags for static builds
pmix_flux_WRAPPER_EXTRA_LIBS="$FLUX_PMI_LIBS"],
[$2])
])

7
opal/mca/pmix/flux/owner.txt Normal file

@ -0,0 +1,7 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: active

784
opal/mca/pmix/flux/pmix_flux.c Normal file

@ -0,0 +1,784 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "opal_config.h"
#include "opal/constants.h"
#include "opal/types.h"
#include "opal_stdint.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/proc.h"
#include "opal/util/show_help.h"
#include <string.h>
#if defined (HAVE_FLUX_PMI_LIBRARY)
#include <pmi.h>
#else
#include <dlfcn.h>
#endif
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/pmix/base/pmix_base_fns.h"
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_flux.h"
static int flux_init(void);
static int flux_fini(void);
static int flux_initialized(void);
static int flux_abort(int flag, const char msg[],
opal_list_t *procs);
static int flux_commit(void);
static int flux_fence(opal_list_t *procs, int collect_data);
static int flux_put(opal_pmix_scope_t scope,
opal_value_t *kv);
static int flux_get(const opal_process_name_t *id,
const char *key, opal_list_t *info,
opal_value_t **kv);
static int flux_publish(opal_list_t *info);
static int flux_lookup(opal_list_t *data, opal_list_t *info);
static int flux_unpublish(char **keys, opal_list_t *info);
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
static int flux_job_connect(opal_list_t *procs);
static int flux_job_disconnect(opal_list_t *procs);
static int flux_store_local(const opal_process_name_t *proc,
opal_value_t *val);
static const char *flux_get_nspace(opal_jobid_t jobid);
static void flux_register_jobid(opal_jobid_t jobid, const char *nspace);
const opal_pmix_base_module_t opal_pmix_flux_module = {
.init = flux_init,
.finalize = flux_fini,
.initialized = flux_initialized,
.abort = flux_abort,
.commit = flux_commit,
.fence = flux_fence,
.put = flux_put,
.get = flux_get,
.publish = flux_publish,
.lookup = flux_lookup,
.unpublish = flux_unpublish,
.spawn = flux_spawn,
.connect = flux_job_connect,
.disconnect = flux_job_disconnect,
.register_evhandler = opal_pmix_base_register_handler,
.deregister_evhandler = opal_pmix_base_deregister_handler,
.store_local = flux_store_local,
.get_nspace = flux_get_nspace,
.register_jobid = flux_register_jobid
};
// usage accounting
static int pmix_init_count = 0;
// PMI constant values:
static int pmix_kvslen_max = 0;
static int pmix_keylen_max = 0;
static int pmix_vallen_max = 0;
static int pmix_vallen_threshold = INT_MAX;
// Job environment description
static char *pmix_kvs_name = NULL;
static bool flux_committed = false;
static char* pmix_packed_data = NULL;
static int pmix_packed_data_offset = 0;
static char* pmix_packed_encoded_data = NULL;
static int pmix_packed_encoded_data_offset = 0;
static int pmix_pack_key = 0;
static opal_process_name_t flux_pname;
static int *lranks = NULL, nlranks;
static char* pmix_error(int pmix_err);
#define OPAL_PMI_ERROR(pmi_err, pmi_func) \
do { \
opal_output(0, "%s [%s:%d:%s]: %s\n", \
pmi_func, __FILE__, __LINE__, __func__, \
pmix_error(pmi_err)); \
} while(0)
#if !defined (HAVE_FLUX_PMI_LIBRARY)
//
// Wrapper functions for dlopened() PMI library.
//
#define PMI_SUCCESS 0
#define PMI_FAIL -1
#define PMI_ERR_INIT 1
#define PMI_ERR_NOMEM 2
#define PMI_ERR_INVALID_ARG 3
#define PMI_ERR_INVALID_KEY 4
#define PMI_ERR_INVALID_KEY_LENGTH 5
#define PMI_ERR_INVALID_VAL 6
#define PMI_ERR_INVALID_VAL_LENGTH 7
#define PMI_ERR_INVALID_LENGTH 8
#define PMI_ERR_INVALID_NUM_ARGS 9
#define PMI_ERR_INVALID_ARGS 10
#define PMI_ERR_INVALID_NUM_PARSED 11
#define PMI_ERR_INVALID_KEYVALP 12
#define PMI_ERR_INVALID_SIZE 13
static void *dso = NULL;
static int PMI_Init (int *spawned)
{
int (*f)(int *);
if (!dso) {
const char *path;
if ((path = getenv ("FLUX_PMI_LIBRARY_PATH")))
dso = dlopen (path, RTLD_NOW | RTLD_GLOBAL);
if (!dso)
return PMI_FAIL;
}
*(void **)(&f) = dlsym (dso, "PMI_Init");
return f ? f (spawned) : PMI_FAIL;
}
static int PMI_Initialized (int *initialized)
{
int (*f)(int *);
if (!dso) {
if (initialized)
*initialized = 0;
return PMI_SUCCESS;
}
*(void **)(&f) = dlsym (dso, "PMI_Initialized");
return f ? f (initialized) : PMI_FAIL;
}
static int PMI_Finalize (void)
{
int (*f)(void);
int rc;
if (!dso)
return PMI_SUCCESS;
*(void **)(&f) = dlsym (dso, "PMI_Finalize");
rc = f ? f () : PMI_FAIL;
dlclose (dso);
dso = NULL; /* avoid reusing a closed handle in later wrapper calls */
return rc;
}
static int PMI_Get_size (int *size)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_size") : NULL;
return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_rank (int *rank)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_rank") : NULL;
return f ? f (rank) : PMI_FAIL;
}
static int PMI_Get_universe_size (int *size)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_universe_size") : NULL;
return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_appnum (int *appnum)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_appnum") : NULL;
return f ? f (appnum) : PMI_FAIL;
}
static int PMI_Barrier (void)
{
int (*f)(void);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Barrier") : NULL;
return f ? f () : PMI_FAIL;
}
static int PMI_Abort (int exit_code, const char *error_msg)
{
int (*f)(int, const char *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Abort") : NULL;
return f ? f (exit_code, error_msg) : PMI_FAIL;
}
static int PMI_KVS_Get_my_name (char *kvsname, int length)
{
int (*f)(char *, int);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_my_name") : NULL;
return f ? f (kvsname, length) : PMI_FAIL;
}
static int PMI_KVS_Get_name_length_max (int *length)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_name_length_max") : NULL;
return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Get_key_length_max (int *length)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_key_length_max") : NULL;
return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Get_value_length_max (int *length)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get_value_length_max") : NULL;
return f ? f (length) : PMI_FAIL;
}
static int PMI_KVS_Put (const char *kvsname, const char *key, const char *value)
{
int (*f)(const char *, const char *, const char *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Put") : NULL;
return f ? f (kvsname, key, value) : PMI_FAIL;
}
static int PMI_KVS_Commit (const char *kvsname)
{
int (*f)(const char *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Commit") : NULL;
return f ? f (kvsname) : PMI_FAIL;
}
static int PMI_KVS_Get (const char *kvsname, const char *key,
char *value, int len)
{
int (*f)(const char *, const char *, char *, int);
*(void **)(&f) = dso ? dlsym (dso, "PMI_KVS_Get") : NULL;
return f ? f (kvsname, key, value, len) : PMI_FAIL;
}
static int PMI_Get_clique_size (int *size)
{
int (*f)(int *);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_size") : NULL;
return f ? f (size) : PMI_FAIL;
}
static int PMI_Get_clique_ranks (int *ranks, int length)
{
int (*f)(int *, int);
*(void **)(&f) = dso ? dlsym (dso, "PMI_Get_clique_ranks") : NULL;
return f ? f (ranks, length) : PMI_FAIL;
}
#endif /* !HAVE_FLUX_PMI_LIBRARY */
static int kvs_get(const char key[], char value [], int maxvalue)
{
int rc;
rc = PMI_KVS_Get(pmix_kvs_name, key, value, maxvalue);
if( PMI_SUCCESS != rc ){
/* silently return an error - might be okay */
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static int kvs_put(const char key[], const char value[])
{
int rc;
rc = PMI_KVS_Put(pmix_kvs_name, key, value);
if( PMI_SUCCESS != rc ){
OPAL_PMI_ERROR(rc, "PMI_KVS_Put");
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static int cache_put_uint(opal_process_name_t *id, int type,
const char key[], uint64_t val)
{
char *cpy;
opal_value_t kv;
int ret;
if (!(cpy = strdup (key))) {
ret = OPAL_ERR_OUT_OF_RESOURCE;
goto done;
}
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = cpy;
kv.type = type;
switch (type) {
case OPAL_UINT16:
kv.data.uint16 = val;
break;
case OPAL_UINT32:
kv.data.uint32 = val;
break;
case OPAL_UINT64:
kv.data.uint64 = val;
break;
default:
ret = OPAL_ERROR;
goto done_free;
}
ret = opal_pmix_base_store(id, &kv);
done_free:
OBJ_DESTRUCT(&kv);
done:
if (OPAL_SUCCESS != ret)
OPAL_ERROR_LOG(ret);
return ret;
}
static int cache_put_string (opal_process_name_t *id,
const char key[], char *val)
{
char *cpy;
opal_value_t kv;
int ret;
if (!(cpy = strdup (key))) {
ret = OPAL_ERR_OUT_OF_RESOURCE;
goto done;
}
OBJ_CONSTRUCT(&kv, opal_value_t);
kv.key = cpy;
kv.type = OPAL_STRING;
kv.data.string = val;
ret = opal_pmix_base_store(id, &kv);
OBJ_DESTRUCT(&kv);
done:
if (OPAL_SUCCESS != ret)
OPAL_ERROR_LOG(ret);
return ret;
}
static int flux_init(void)
{
int initialized;
int spawned;
int rc, ret = OPAL_ERROR;
int i, rank, lrank, nrank;
char tmp[64];
const char *jobid;
opal_process_name_t ldr;
char **localranks=NULL;
opal_process_name_t wildcard_rank;
char *str;
if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
OPAL_PMI_ERROR(rc, "PMI_Initialized");
return OPAL_ERROR;
}
if (!initialized && PMI_SUCCESS != (rc = PMI_Init(&spawned))) {
OPAL_PMI_ERROR(rc, "PMI_Init");
return OPAL_ERROR;
}
// setup hash table
opal_pmix_base_hash_init();
// Initialize space demands
rc = PMI_KVS_Get_value_length_max(&pmix_vallen_max);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_value_length_max");
goto err_exit;
}
pmix_vallen_threshold = pmix_vallen_max * 3;
pmix_vallen_threshold >>= 2;
rc = PMI_KVS_Get_name_length_max(&pmix_kvslen_max);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_name_length_max");
goto err_exit;
}
rc = PMI_KVS_Get_key_length_max(&pmix_keylen_max);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_key_length_max");
goto err_exit;
}
/* get our rank */
rc = PMI_Get_rank(&rank);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_Get_rank");
goto err_exit;
}
/* get integer job id */
if (!(jobid = getenv ("FLUX_JOB_ID"))) {
opal_output(0, "getenv FLUX_JOB_ID [%s:%d:%s]: failed\n",
__FILE__, __LINE__, __func__);
goto err_exit;
}
flux_pname.jobid = strtoul(jobid, NULL, 10);
ldr.jobid = flux_pname.jobid;
flux_pname.vpid = rank;
/* store our name in the opal_proc_t so that
* debug messages will make sense - an upper
* layer will eventually overwrite it, but that
* won't do any harm */
opal_proc_set_name(&flux_pname);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:flux: assigned tmp name",
OPAL_NAME_PRINT(flux_pname));
/* setup wildcard rank*/
wildcard_rank = OPAL_PROC_MY_NAME;
wildcard_rank.vpid = OPAL_VPID_WILDCARD;
if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
OPAL_UINT32,
OPAL_PMIX_JOBID,
flux_pname.jobid)))
goto err_exit;
if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
OPAL_UINT32,
OPAL_PMIX_RANK,
rank)))
goto err_exit;
pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
if (pmix_kvs_name == NULL) {
ret = OPAL_ERR_OUT_OF_RESOURCE;
goto err_exit;
}
rc = PMI_KVS_Get_my_name(pmix_kvs_name, pmix_kvslen_max);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Get_my_name");
goto err_exit;
}
/* get our local proc info to find our local rank */
if (PMI_SUCCESS != (rc = PMI_Get_clique_size(&nlranks))) {
OPAL_PMI_ERROR(rc, "PMI_Get_clique_size");
goto err_exit;
}
/* save the local size */
if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
OPAL_UINT32,
OPAL_PMIX_LOCAL_SIZE,
nlranks)))
goto err_exit;
lrank = 0;
nrank = 0;
ldr.vpid = rank;
if (0 < nlranks) {
/* now get the specific ranks */
lranks = (int*)calloc(nlranks, sizeof(int));
if (NULL == lranks) {
ret = OPAL_ERR_OUT_OF_RESOURCE;
OPAL_ERROR_LOG(ret);
goto err_exit;
}
if (PMI_SUCCESS != (rc = PMI_Get_clique_ranks(lranks, nlranks))) {
OPAL_PMI_ERROR(rc, "PMI_Get_clique_ranks");
free(lranks);
lranks = NULL;
goto err_exit;
}
/* note the local ldr */
ldr.vpid = lranks[0];
/* save this */
memset(tmp, 0, 64);
for (i=0; i < nlranks; i++) {
(void)snprintf(tmp, 64, "%d", lranks[i]);
opal_argv_append_nosize(&localranks, tmp);
if (rank == lranks[i]) {
lrank = i;
nrank = i;
}
}
str = opal_argv_join(localranks, ',');
opal_argv_free(localranks);
if (OPAL_SUCCESS != (ret = cache_put_string (&wildcard_rank,
OPAL_PMIX_LOCAL_PEERS,
str)))
goto err_exit;
}
/* save the local leader */
if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
OPAL_UINT64,
OPAL_PMIX_LOCALLDR,
*(uint64_t*)&ldr)))
goto err_exit;
/* save our local rank */
if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
OPAL_UINT16,
OPAL_PMIX_LOCAL_RANK,
lrank)))
goto err_exit;
/* and our node rank */
if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
OPAL_UINT16,
OPAL_PMIX_NODE_RANK,
nrank)))
goto err_exit;
/* get universe size */
rc = PMI_Get_universe_size(&i);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
goto err_exit;
}
/* push this into the dstore for subsequent fetches */
if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
OPAL_UINT32,
OPAL_PMIX_UNIV_SIZE,
i)))
goto err_exit;
if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
OPAL_UINT32,
OPAL_PMIX_MAX_PROCS,
i)))
goto err_exit;
/* get job size */
rc = PMI_Get_size(&i);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_Get_size");
goto err_exit;
}
if (OPAL_SUCCESS != (ret = cache_put_uint (&wildcard_rank,
OPAL_UINT32,
OPAL_PMIX_JOB_SIZE,
i)))
goto err_exit;
/* get appnum */
rc = PMI_Get_appnum(&i);
if (PMI_SUCCESS != rc) {
OPAL_PMI_ERROR(rc, "PMI_Get_appnum");
goto err_exit;
}
if (OPAL_SUCCESS != (ret = cache_put_uint (&OPAL_PROC_MY_NAME,
OPAL_UINT32,
OPAL_PMIX_APPNUM,
i)))
goto err_exit;
/* increment the init count */
++pmix_init_count;
return OPAL_SUCCESS;
err_exit:
PMI_Finalize();
return (ret == OPAL_SUCCESS ? OPAL_ERROR : ret);
}
static int flux_fini(void) {
if (0 == pmix_init_count) {
return OPAL_SUCCESS;
}
if (0 == --pmix_init_count) {
PMI_Finalize ();
}
// teardown hash table
opal_pmix_base_hash_finalize();
return OPAL_SUCCESS;
}
static int flux_initialized(void)
{
if (0 < pmix_init_count) {
return 1;
}
return 0;
}
static int flux_abort(int flag, const char msg[],
opal_list_t *procs)
{
PMI_Abort(flag, msg);
return OPAL_SUCCESS;
}
static int flux_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
/*
int rc;
size_t preput_vector_size;
const int info_keyval_sizes[1];
info_keyval_sizes[0] = (int)opal_list_get_size(info_keyval_vector);
//FIXME what's the size of array of lists?
preput_vector_size = opal_list_get_size(preput_keyval_vector);
rc = PMI_Spawn_multiple(count, cmds, argcs, argvs, maxprocs, info_keyval_sizes, info_keyval_vector, (int)preput_vector_size, preput_keyval_vector);
if( PMI_SUCCESS != rc ) {
OPAL_PMI_ERROR(rc, "PMI_Spawn_multiple");
return OPAL_ERROR;
}*/
return OPAL_ERR_NOT_IMPLEMENTED;
}
static int flux_put(opal_pmix_scope_t scope,
opal_value_t *kv)
{
int rc;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:flux put for key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key);
if (OPAL_SUCCESS != (rc = opal_pmix_base_store_encoded (kv->key, (void*)&kv->data, kv->type, &pmix_packed_data, &pmix_packed_data_offset))) {
OPAL_ERROR_LOG(rc);
return rc;
}
if (pmix_packed_data_offset == 0) {
/* nothing to write */
return OPAL_SUCCESS;
}
if (((pmix_packed_data_offset/3)*4) + pmix_packed_encoded_data_offset < pmix_vallen_max) {
/* this meta-key is still being filled,
* nothing to put yet
*/
return OPAL_SUCCESS;
}
rc = opal_pmix_base_partial_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
flux_committed = false;
return rc;
}
static int flux_commit(void)
{
int rc;
/* check if there is partially filled meta key and put them */
opal_pmix_base_commit_packed (&pmix_packed_data, &pmix_packed_data_offset,
&pmix_packed_encoded_data, &pmix_packed_encoded_data_offset,
pmix_vallen_max, &pmix_pack_key, kvs_put);
if (PMI_SUCCESS != (rc = PMI_KVS_Commit(pmix_kvs_name))) {
OPAL_PMI_ERROR(rc, "PMI_KVS_Commit");
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static int flux_fence(opal_list_t *procs, int collect_data)
{
int rc;
if (PMI_SUCCESS != (rc = PMI_Barrier())) {
OPAL_PMI_ERROR(rc, "PMI_Barrier");
return OPAL_ERROR;
}
return OPAL_SUCCESS;
}
static int flux_get(const opal_process_name_t *id,
const char *key, opal_list_t *info,
opal_value_t **kv)
{
int rc;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:flux called get for key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
/* Keys presumed stored directly to cache by flux_init() under the
* wildcard rank must not trigger PMI_KVS_Get() if not found. */
if (id->vpid == OPAL_VPID_WILDCARD) {
opal_list_t values;
OBJ_CONSTRUCT(&values, opal_list_t);
rc = opal_pmix_base_fetch (id, key, &values);
OPAL_LIST_DESTRUCT(&values);
if (OPAL_SUCCESS != rc) {
return rc;
}
}
rc = opal_pmix_base_cache_keys_locally(id, key, kv, pmix_kvs_name, pmix_vallen_max, kvs_get);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:flux got key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), key);
return rc;
}
static int flux_publish(opal_list_t *info)
{
return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_lookup(opal_list_t *data, opal_list_t *info)
{
// Allocate mem for port here? Otherwise we won't get success!
return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_unpublish(char **keys, opal_list_t *info)
{
return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_job_connect(opal_list_t *procs)
{
return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_job_disconnect(opal_list_t *procs)
{
return OPAL_ERR_NOT_SUPPORTED;
}
static int flux_store_local(const opal_process_name_t *proc,
opal_value_t *val)
{
opal_pmix_base_store(proc, val);
return OPAL_SUCCESS;
}
static const char *flux_get_nspace(opal_jobid_t jobid)
{
return "N/A";
}
static void flux_register_jobid(opal_jobid_t jobid, const char *nspace)
{
return;
}
static char* pmix_error(int pmix_err)
{
char * err_msg;
switch(pmix_err) {
case PMI_FAIL: err_msg = "Operation failed"; break;
case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
#if defined(PMI_ERR_INVALID_KVS)
/* pmix.h calls this a valid return code but mpich doesn't define it */
case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
#endif
case PMI_SUCCESS: err_msg = "Success"; break;
default: err_msg = "Unknown error";
}
return err_msg;
}

31
opal/mca/pmix/flux/pmix_flux.h Normal file

@ -0,0 +1,31 @@
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_PMIX_FLUX_H
#define MCA_PMIX_FLUX_H
#include "opal_config.h"
#include "opal/mca/mca.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/mca/pmix/base/pmix_base_fns.h"
BEGIN_C_DECLS
/*
* Globally exported variable
*/
OPAL_DECLSPEC extern opal_pmix_base_component_t mca_pmix_flux_component;
OPAL_DECLSPEC extern const opal_pmix_base_module_t opal_pmix_flux_module;
END_C_DECLS
#endif /* MCA_PMIX_FLUX_H */

104
opal/mca/pmix/flux/pmix_flux_component.c Normal file

@ -0,0 +1,104 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "opal_config.h"
#include "opal/constants.h"
#include "opal/mca/pmix/pmix.h"
#include "pmix_flux.h"
/*
* Public string showing the pmix flux component version number
*/
const char *opal_pmix_flux_component_version_string =
"OPAL flux pmix MCA component version " OPAL_VERSION;
/*
* Local function
*/
static int pmix_flux_component_query(mca_base_module_t **module, int *priority);
static int pmix_flux_component_register(void);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
opal_pmix_base_component_t mca_pmix_flux_component = {
/* First, the mca_component_t struct containing meta information
about the component itself */
.base_version = {
/* Indicate that we are a pmix v2.0.0 component (which also
implies a specific MCA version) */
OPAL_PMIX_BASE_VERSION_2_0_0,
/* Component name and version */
.mca_component_name = "flux",
MCA_BASE_MAKE_VERSION(component, OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION,
OPAL_RELEASE_VERSION),
/* Component open and close functions */
.mca_query_component = pmix_flux_component_query,
.mca_register_component_params = pmix_flux_component_register,
},
/* Next the MCA v1.0.0 component meta data */
.base_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
.priority = 10,
};
static int pmix_flux_component_register(void)
{
int ret;
mca_base_component_t *component = &mca_pmix_flux_component.base_version;
mca_pmix_flux_component.priority = 20;
ret = mca_base_component_var_register(component, "priority",
"Priority of the pmix flux component (default: 20)",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&mca_pmix_flux_component.priority);
if (0 > ret) {
return ret;
}
return OPAL_SUCCESS;
}
static int pmix_flux_component_query(mca_base_module_t **module, int *priority)
{
/* disqualify ourselves if we are not under Flux */
if (NULL == getenv("FLUX_JOB_ID")) {
*priority = 0;
*module = NULL;
return OPAL_ERROR;
}
/* we can be considered */
*priority = mca_pmix_flux_component.priority;
*module = (mca_base_module_t *)&opal_pmix_flux_module;
return OPAL_SUCCESS;
}

34
orte/mca/schizo/flux/Makefile.am Normal file

@ -0,0 +1,34 @@
#
# Copyright (c) 2016 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
schizo_flux_component.c \
schizo_flux.h \
schizo_flux.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_schizo_flux_DSO
component_noinst =
component_install = mca_schizo_flux.la
else
component_noinst = libmca_schizo_flux.la
component_install =
endif
mcacomponentdir = $(ortelibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_schizo_flux_la_SOURCES = $(sources)
mca_schizo_flux_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_schizo_flux_la_SOURCES = $(sources)
libmca_schizo_flux_la_LDFLAGS = -module -avoid-version

7
orte/mca/schizo/flux/owner.txt Normal file

@ -0,0 +1,7 @@
#
# owner/status file
# owner: institution that is responsible for this package
# status: e.g. active, maintenance, unmaintained
#
owner: INTEL
status: active

101
orte/mca/schizo/flux/schizo_flux.c Normal file

@ -0,0 +1,101 @@
/*
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <ctype.h>
#include "opal/util/argv.h"
#include "opal/util/basename.h"
#include "opal/util/opal_environ.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/mca/schizo/base/base.h"
#include "schizo_flux.h"
static orte_schizo_launch_environ_t check_launch_environment(void);
static void finalize(void);
orte_schizo_base_module_t orte_schizo_flux_module = {
.check_launch_environment = check_launch_environment,
.finalize = finalize
};
static char **pushed_envs = NULL;
static char **pushed_vals = NULL;
static orte_schizo_launch_environ_t myenv;
static bool myenvdefined = false;
static orte_schizo_launch_environ_t check_launch_environment(void)
{
char *bind, *list, *ptr;
int i;
if (myenvdefined) {
return myenv;
}
myenvdefined = true;
/* we were only selected because FLUX was detected
* and we are an app, so no need to further check
* that here. Instead, see if we were direct launched
* vs launched via mpirun */
if (NULL != orte_process_info.my_daemon_uri) {
/* nope */
myenv = ORTE_SCHIZO_NATIVE_LAUNCHED;
opal_argv_append_nosize(&pushed_envs, OPAL_MCA_PREFIX"ess");
opal_argv_append_nosize(&pushed_vals, "pmi");
goto setup;
}
myenv = ORTE_SCHIZO_DIRECT_LAUNCHED;
opal_argv_append_nosize(&pushed_envs, OPAL_MCA_PREFIX"ess");
opal_argv_append_nosize(&pushed_vals, "pmi");
/* if we are direct launched by FLUX, then we want
* to ensure that we do not override their binding
* options, so set that envar */
opal_argv_append_nosize(&pushed_envs, OPAL_MCA_PREFIX"hwloc_base_binding_policy");
opal_argv_append_nosize(&pushed_vals, "none");
/* indicate we are externally bound so we won't try to do it ourselves */
opal_argv_append_nosize(&pushed_envs, OPAL_MCA_PREFIX"orte_externally_bound");
opal_argv_append_nosize(&pushed_vals, "1");
setup:
opal_output_verbose(1, orte_schizo_base_framework.framework_output,
"schizo:flux DECLARED AS %s", orte_schizo_base_print_env(myenv));
if (NULL != pushed_envs) {
for (i=0; NULL != pushed_envs[i]; i++) {
opal_setenv(pushed_envs[i], pushed_vals[i], true, &environ);
}
}
return myenv;
}
static void finalize(void)
{
int i;
if (NULL != pushed_envs) {
for (i=0; NULL != pushed_envs[i]; i++) {
opal_unsetenv(pushed_envs[i], &environ);
}
opal_argv_free(pushed_envs);
opal_argv_free(pushed_vals);
}
}

28
orte/mca/schizo/flux/schizo_flux.h Normal file

@ -0,0 +1,28 @@
/*
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_SCHIZO_FLUX_H_
#define _MCA_SCHIZO_FLUX_H_
#include "orte_config.h"
#include "orte/types.h"
#include "opal/mca/base/base.h"
#include "orte/mca/schizo/schizo.h"
BEGIN_C_DECLS
ORTE_MODULE_DECLSPEC extern orte_schizo_base_component_t mca_schizo_flux_component;
extern orte_schizo_base_module_t orte_schizo_flux_module;
END_C_DECLS
#endif /* _MCA_SCHIZO_FLUX_H_ */

51
orte/mca/schizo/flux/schizo_flux_component.c Normal file

@ -0,0 +1,51 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#include "opal/util/show_help.h"
#include "orte/mca/schizo/schizo.h"
#include "schizo_flux.h"
static int component_query(mca_base_module_t **module, int *priority);
/*
* Struct of function pointers and all that to let us be initialized
*/
orte_schizo_base_component_t mca_schizo_flux_component = {
.base_version = {
MCA_SCHIZO_BASE_VERSION_1_0_0,
.mca_component_name = "flux",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
.mca_query_component = component_query,
},
.base_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
};
static int component_query(mca_base_module_t **module, int *priority)
{
/* disqualify ourselves if we are not an app or under flux */
if (!ORTE_PROC_IS_APP || NULL == getenv("FLUX_JOB_ID")) {
*priority = 0;
*module = NULL;
return OPAL_ERROR;
}
*module = (mca_base_module_t*)&orte_schizo_flux_module;
*priority = 60;
return ORTE_SUCCESS;
}