Merge pull request #2562 from rhc54/topic/pmix2
Update the PMIx2 support to include the latest shared memory optimizations
Этот коммит содержится в:
Коммит
15b6eaf2d4
@ -15,7 +15,7 @@
|
||||
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
|
||||
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved
|
||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -675,7 +675,7 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
|
||||
|
||||
/* copy over the name of the executable */
|
||||
app->cmd = strdup(array_of_commands[i]);
|
||||
opal_argv_append(&app->argc, &app->argv, app->cmd);
|
||||
opal_argv_append_nosize(&app->argv, app->cmd);
|
||||
|
||||
/* record the number of procs to be generated */
|
||||
app->maxprocs = array_of_maxprocs[i];
|
||||
@ -684,7 +684,7 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
|
||||
if (MPI_ARGVS_NULL != array_of_argv &&
|
||||
MPI_ARGV_NULL != array_of_argv[i]) {
|
||||
for (j=0; NULL != array_of_argv[i][j]; j++) {
|
||||
opal_argv_append(&app->argc, &app->argv, array_of_argv[i][j]);
|
||||
opal_argv_append_nosize(&app->argv, array_of_argv[i][j]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2015-2016 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -41,12 +41,12 @@ static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
|
||||
{
|
||||
opal_pmix_base_async_modex = false;
|
||||
(void) mca_base_var_register("opal", "pmix", "base", "async_modex", "Use asynchronous modex mode",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
|
||||
opal_pmix_collect_all_data = true;
|
||||
(void) mca_base_var_register("opal", "pmix", "base", "collect_data", "Collect all data during modex",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data);
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
@ -74,10 +74,10 @@ static int opal_pmix_base_frame_open(mca_base_open_flag_t flags)
|
||||
}
|
||||
|
||||
MCA_BASE_FRAMEWORK_DECLARE(opal, pmix, "OPAL PMI Client Framework",
|
||||
opal_pmix_base_frame_register,
|
||||
opal_pmix_base_frame_open,
|
||||
opal_pmix_base_frame_close,
|
||||
mca_pmix_base_static_components, 0);
|
||||
opal_pmix_base_frame_register,
|
||||
opal_pmix_base_frame_open,
|
||||
opal_pmix_base_frame_close,
|
||||
mca_pmix_base_static_components, 0);
|
||||
|
||||
/**** PMIX FRAMEWORK OBJECTS ****/
|
||||
static void lkcon(opal_pmix_pdata_t *p)
|
||||
@ -91,8 +91,8 @@ static void lkdes(opal_pmix_pdata_t *p)
|
||||
OBJ_DESTRUCT(&p->value);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(opal_pmix_pdata_t,
|
||||
opal_list_item_t,
|
||||
lkcon, lkdes);
|
||||
opal_list_item_t,
|
||||
lkcon, lkdes);
|
||||
|
||||
static void mdcon(opal_pmix_modex_data_t *p)
|
||||
{
|
||||
@ -104,38 +104,41 @@ static void mdcon(opal_pmix_modex_data_t *p)
|
||||
static void mddes(opal_pmix_modex_data_t *p)
|
||||
{
|
||||
if (NULL != p->blob) {
|
||||
free(p->blob);
|
||||
free(p->blob);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(opal_pmix_modex_data_t,
|
||||
opal_list_item_t,
|
||||
mdcon, mddes);
|
||||
opal_list_item_t,
|
||||
mdcon, mddes);
|
||||
|
||||
static void apcon(opal_pmix_app_t *p)
|
||||
{
|
||||
p->cmd = NULL;
|
||||
p->argc = 0;
|
||||
p->argv = NULL;
|
||||
p->env = NULL;
|
||||
p->cwd = NULL;
|
||||
p->maxprocs = 0;
|
||||
OBJ_CONSTRUCT(&p->info, opal_list_t);
|
||||
}
|
||||
static void apdes(opal_pmix_app_t *p)
|
||||
{
|
||||
if (NULL != p->cmd) {
|
||||
free(p->cmd);
|
||||
free(p->cmd);
|
||||
}
|
||||
if (NULL != p->argv) {
|
||||
opal_argv_free(p->argv);
|
||||
opal_argv_free(p->argv);
|
||||
}
|
||||
if (NULL != p->env) {
|
||||
opal_argv_free(p->env);
|
||||
opal_argv_free(p->env);
|
||||
}
|
||||
if (NULL != p->cwd) {
|
||||
free(p->cwd);
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&p->info);
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(opal_pmix_app_t,
|
||||
opal_list_item_t,
|
||||
apcon, apdes);
|
||||
opal_list_item_t,
|
||||
apcon, apdes);
|
||||
|
||||
static void qcon(opal_pmix_query_t *p)
|
||||
{
|
||||
|
@ -41,7 +41,7 @@ AC_DEFUN([MCA_opal_pmix_pmix2x_CONFIG],[
|
||||
[AC_HELP_STRING([--enable-pmix-dstore],
|
||||
[Enable PMIx shared memory data store (default: disabled)])])
|
||||
AC_MSG_CHECKING([if PMIx shared memory data store is enabled])
|
||||
if test "$enable_pmix2_dstore" != "no"; then
|
||||
if test "$enable_pmix_dstore" != "no"; then
|
||||
AC_MSG_RESULT([yes])
|
||||
opal_pmix_pmix2x_sm_flag=--enable-dstore
|
||||
else
|
||||
|
@ -30,7 +30,7 @@ greek=
|
||||
# command, or with the date (if "git describe" fails) in the form of
|
||||
# "date<date>".
|
||||
|
||||
repo_rev=git22b754e
|
||||
repo_rev=git9089b99
|
||||
|
||||
# If tarball_version is not empty, it is used as the version string in
|
||||
# the tarball filename, regardless of all other versions listed in
|
||||
@ -44,7 +44,7 @@ tarball_version=
|
||||
|
||||
# The date when this release was created
|
||||
|
||||
date="Dec 06, 2016"
|
||||
date="Dec 13, 2016"
|
||||
|
||||
# The shared library version of each of PMIx's public libraries.
|
||||
# These versions are maintained in accordance with the "Library
|
||||
|
@ -328,11 +328,11 @@ AC_DEFUN([PMIX_SETUP_CORE],[
|
||||
netdb.h ucred.h])
|
||||
|
||||
AC_CHECK_HEADERS([sys/mount.h], [], [],
|
||||
[AC_INCLUDES_DEFAULT
|
||||
#if HAVE_SYS_PARAM_H
|
||||
#include <sys/param.h>
|
||||
#endif
|
||||
])
|
||||
[AC_INCLUDES_DEFAULT
|
||||
#if HAVE_SYS_PARAM_H
|
||||
#include <sys/param.h>
|
||||
#endif
|
||||
])
|
||||
|
||||
AC_CHECK_HEADERS([sys/sysctl.h], [], [],
|
||||
[AC_INCLUDES_DEFAULT
|
||||
@ -358,6 +358,7 @@ AC_DEFUN([PMIX_SETUP_CORE],[
|
||||
#endif
|
||||
])
|
||||
|
||||
|
||||
# Note that sometimes we have <stdbool.h>, but it doesn't work (e.g.,
|
||||
# have both Portland and GNU installed; using pgcc will find GNU's
|
||||
# <stdbool.h>, which all it does -- by standard -- is define "bool" to
|
||||
|
@ -95,7 +95,6 @@ int main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
app->maxprocs = 2;
|
||||
app->argc = 1;
|
||||
app->argv = (char**)malloc(2 * sizeof(char*));
|
||||
if (0 > asprintf(&app->argv[0], "%s/client", dir)) {
|
||||
exit(1);
|
||||
|
@ -155,6 +155,7 @@ typedef uint32_t pmix_rank_t;
|
||||
#define PMIX_TDIR_RMCLEAN "pmix.tdir.rmclean" // (bool) Resource Manager will clean session directories
|
||||
|
||||
/* information about relative ranks as assigned by the RM */
|
||||
#define PMIX_NSPACE "pmix.nspace" // (char*) nspace of a job
|
||||
#define PMIX_JOBID "pmix.jobid" // (char*) jobid assigned by scheduler
|
||||
#define PMIX_APPNUM "pmix.appnum" // (uint32_t) app number within the job
|
||||
#define PMIX_RANK "pmix.rank" // (pmix_rank_t) process rank within the job
|
||||
@ -275,6 +276,8 @@ typedef uint32_t pmix_rank_t;
|
||||
// returns (pmix_data_array_t) an array of pmix_proc_info_t for
|
||||
// procs in job on same node
|
||||
#define PMIX_QUERY_AUTHORIZATIONS "pmix.qry.auths" // return operations the tool is authorized to perform
|
||||
#define PMIX_QUERY_SPAWN_SUPPORT "pmix.qry.spawn" // return a comma-delimited list of supported spawn attributes
|
||||
#define PMIX_QUERY_DEBUG_SUPPORT "pmix.qry.debug" // return a comma-delimited list of supported debug attributes
|
||||
|
||||
/* log attributes */
|
||||
#define PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr
|
||||
@ -282,9 +285,11 @@ typedef uint32_t pmix_rank_t;
|
||||
#define PMIX_LOG_SYSLOG "pmix.log.syslog" // (bool) log data to syslog - defaults to ERROR priority unless
|
||||
// modified by directive
|
||||
/* debugger attributes */
|
||||
#define PMIX_SPAWN_UNDER_DEBUGGER "pmix.dbg.pause" // (bool) job is being spawned under debugger - instruct it to pause on start
|
||||
#define PMIX_JOB_BEING_DEBUGGED "pmix.dbg.job" // (char*) nspace of the job to be debugged - the RM/PMIx server are
|
||||
// to provide the job-level info of that job to each debugger daemon
|
||||
#define PMIX_DEBUG_STOP_ON_EXEC "pmix.dbg.exec" // (bool) job is being spawned under debugger - instruct it to pause on start
|
||||
#define PMIX_DEBUG_STOP_IN_INIT "pmix.dbg.init" // (bool) instruct job to stop during PMIx init
|
||||
#define PMIX_DEBUG_WAIT_FOR_NOTIFY "pmix.dbg.notify" // (bool) block at desired point until receiving debugger release notification
|
||||
#define PMIX_DEBUG_JOB "pmix.dbg.job" // (char*) nspace of the job to be debugged - the RM/PMIx server are
|
||||
#define PMIX_DEBUG_WAITING_FOR_NOTIFY "pmix.dbg.waiting" // (bool) job to be debugged is waiting for a release
|
||||
|
||||
/**** PROCESS STATE DEFINITIONS ****/
|
||||
typedef uint8_t pmix_proc_state_t;
|
||||
@ -581,18 +586,17 @@ typedef struct pmix_proc_info {
|
||||
|
||||
|
||||
/**** PMIX VALUE STRUCT ****/
|
||||
typedef struct pmix_info_t pmix_info_t;
|
||||
|
||||
typedef struct pmix_data_array {
|
||||
pmix_data_type_t type;
|
||||
size_t size;
|
||||
void *array;
|
||||
} pmix_data_array_t;
|
||||
|
||||
/**** DEPRECATED ****/
|
||||
struct pmix_info;
|
||||
|
||||
typedef struct pmix_info_array {
|
||||
size_t size;
|
||||
struct pmix_info *array;
|
||||
pmix_info_t *array;
|
||||
} pmix_info_array_t;
|
||||
/********************/
|
||||
|
||||
@ -754,23 +758,29 @@ typedef struct pmix_value {
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/* expose two functions that are resolved in the
|
||||
/* expose some functions that are resolved in the
|
||||
* PMIx library, but part of a header that
|
||||
* includes internal functions - we don't
|
||||
* want to expose the entire header here
|
||||
*/
|
||||
void pmix_value_load(pmix_value_t *v, void *data, pmix_data_type_t type);
|
||||
pmix_status_t pmix_value_xfer(pmix_value_t *kv, pmix_value_t *src);
|
||||
pmix_status_t pmix_argv_append_nosize(char ***argv, const char *arg);
|
||||
pmix_status_t pmix_setenv(const char *name, const char *value,
|
||||
bool overwrite, char ***env);
|
||||
|
||||
|
||||
#define PMIX_ARGV_APPEND(a, b) \
|
||||
pmix_argv_append_nosize(&(a), (b))
|
||||
#define PMIX_SETENV(a, b, c) \
|
||||
pmix_setenv((a), (b), true, (c))
|
||||
|
||||
|
||||
/**** PMIX INFO STRUCT ****/
|
||||
typedef struct pmix_info {
|
||||
struct pmix_info_t {
|
||||
char key[PMIX_MAX_KEYLEN+1]; // ensure room for the NULL terminator
|
||||
pmix_info_directives_t flags; // bit-mask of flags
|
||||
pmix_value_t value;
|
||||
} pmix_info_t;
|
||||
};
|
||||
|
||||
/* utility macros for working with pmix_info_t structs */
|
||||
#define PMIX_INFO_CREATE(m, n) \
|
||||
@ -874,9 +884,9 @@ typedef struct pmix_pdata {
|
||||
/**** PMIX APP STRUCT ****/
|
||||
typedef struct pmix_app {
|
||||
char *cmd;
|
||||
int argc;
|
||||
char **argv;
|
||||
char **env;
|
||||
char *cwd;
|
||||
int maxprocs;
|
||||
pmix_info_t *info;
|
||||
size_t ninfo;
|
||||
@ -916,6 +926,9 @@ typedef struct pmix_app {
|
||||
} \
|
||||
free((m)->env); \
|
||||
} \
|
||||
if (NULL != (m)->cwd) { \
|
||||
free((m)->cwd); \
|
||||
} \
|
||||
if (NULL != (m)->info) { \
|
||||
for (_ii=0; _ii < (m)->ninfo; _ii++) { \
|
||||
PMIX_INFO_DESTRUCT(&(m)->info[_ii]); \
|
||||
|
@ -557,13 +557,15 @@ PMIX_EXPORT pmix_status_t pmix_value_xfer(pmix_value_t *p, pmix_value_t *src)
|
||||
if (NULL != sa[n].cmd) {
|
||||
pa[n].cmd = strdup(sa[n].cmd);
|
||||
}
|
||||
pa[n].argc = sa[n].argc;
|
||||
if (NULL != sa[n].argv) {
|
||||
pa[n].argv = pmix_argv_copy(sa[n].argv);
|
||||
}
|
||||
if (NULL != sa[n].env) {
|
||||
pa[n].env = pmix_argv_copy(sa[n].env);
|
||||
}
|
||||
if (NULL != sa[n].cwd) {
|
||||
pa[n].cwd = strdup(sa[n].cwd);
|
||||
}
|
||||
pa[n].maxprocs = sa[n].maxprocs;
|
||||
if (0 < sa[n].ninfo && NULL != sa[n].info) {
|
||||
PMIX_INFO_CREATE(pa[n].info, sa[n].ninfo);
|
||||
@ -844,9 +846,11 @@ pmix_status_t pmix_bfrop_copy_app(pmix_app_t **dest, pmix_app_t *src,
|
||||
|
||||
*dest = (pmix_app_t*)malloc(sizeof(pmix_app_t));
|
||||
(*dest)->cmd = strdup(src->cmd);
|
||||
(*dest)->argc = src->argc;
|
||||
(*dest)->argv = pmix_argv_copy(src->argv);
|
||||
(*dest)->env = pmix_argv_copy(src->env);
|
||||
if (NULL != src->cwd) {
|
||||
(*dest)->cwd = strdup(src->cwd);
|
||||
}
|
||||
(*dest)->maxprocs = src->maxprocs;
|
||||
(*dest)->ninfo = src->ninfo;
|
||||
(*dest)->info = (pmix_info_t*)malloc(src->ninfo * sizeof(pmix_info_t));
|
||||
@ -1174,13 +1178,15 @@ pmix_status_t pmix_bfrop_copy_darray(pmix_data_array_t **dest,
|
||||
if (NULL != sa[n].cmd) {
|
||||
pa[n].cmd = strdup(sa[n].cmd);
|
||||
}
|
||||
pa[n].argc = sa[n].argc;
|
||||
if (NULL != sa[n].argv) {
|
||||
pa[n].argv = pmix_argv_copy(sa[n].argv);
|
||||
}
|
||||
if (NULL != sa[n].env) {
|
||||
pa[n].env = pmix_argv_copy(sa[n].env);
|
||||
}
|
||||
if (NULL != sa[n].cwd) {
|
||||
pa[n].cwd = strdup(sa[n].cwd);
|
||||
}
|
||||
pa[n].maxprocs = sa[n].maxprocs;
|
||||
if (0 < sa[n].ninfo && NULL != sa[n].info) {
|
||||
PMIX_INFO_CREATE(pa[n].info, sa[n].ninfo);
|
||||
|
@ -753,10 +753,11 @@ pmix_status_t pmix_bfrop_pack_app(pmix_buffer_t *buffer, const void *src,
|
||||
return ret;
|
||||
}
|
||||
/* argv */
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_int(buffer, &app[i].argc, 1, PMIX_INT))) {
|
||||
nvals = pmix_argv_count(app[i].argv);
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_int(buffer, &nvals, 1, PMIX_INT32))) {
|
||||
return ret;
|
||||
}
|
||||
for (j=0; j < app[i].argc; j++) {
|
||||
for (j=0; j < nvals; j++) {
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_string(buffer, &app[i].argv[j], 1, PMIX_STRING))) {
|
||||
return ret;
|
||||
}
|
||||
@ -771,6 +772,10 @@ pmix_status_t pmix_bfrop_pack_app(pmix_buffer_t *buffer, const void *src,
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
/* cwd */
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_string(buffer, &app[i].cwd, 1, PMIX_STRING))) {
|
||||
return ret;
|
||||
}
|
||||
/* maxprocs */
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_pack_int(buffer, &app[i].maxprocs, 1, PMIX_INT))) {
|
||||
return ret;
|
||||
|
@ -939,11 +939,11 @@ pmix_status_t pmix_bfrop_unpack_app(pmix_buffer_t *buffer, void *dest,
|
||||
}
|
||||
/* unpack argc */
|
||||
m=1;
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_int(buffer, &ptr[i].argc, &m, PMIX_INT))) {
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_int(buffer, &nval, &m, PMIX_INT32))) {
|
||||
return ret;
|
||||
}
|
||||
/* unpack argv */
|
||||
for (k=0; k < ptr[i].argc; k++) {
|
||||
for (k=0; k < nval; k++) {
|
||||
m=1;
|
||||
tmp = NULL;
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_string(buffer, &tmp, &m, PMIX_STRING))) {
|
||||
@ -972,6 +972,11 @@ pmix_status_t pmix_bfrop_unpack_app(pmix_buffer_t *buffer, void *dest,
|
||||
pmix_argv_append_nosize(&ptr[i].env, tmp);
|
||||
free(tmp);
|
||||
}
|
||||
/* unpack cwd */
|
||||
m=1;
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_string(buffer, &ptr[i].cwd, &m, PMIX_STRING))) {
|
||||
return ret;
|
||||
}
|
||||
/* unpack maxprocs */
|
||||
m=1;
|
||||
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_int(buffer, &ptr[i].maxprocs, &m, PMIX_INT))) {
|
||||
|
@ -633,7 +633,6 @@ PMIX_EXPORT int PMI_Spawn_multiple(int count,
|
||||
apps[i].cmd = strdup(cmds[i]);
|
||||
apps[i].maxprocs = maxprocs[i];
|
||||
apps[i].argv = pmix_argv_copy((char**) argvs[i]);
|
||||
apps[i].argc = pmix_argv_count(apps[i].argv);
|
||||
apps[i].ninfo = info_keyval_sizesp[i];
|
||||
if (0 < apps[i].ninfo) {
|
||||
apps[i].info = (pmix_info_t*)malloc(apps[i].ninfo * sizeof(pmix_info_t));
|
||||
|
@ -184,7 +184,6 @@ PMIX_EXPORT int PMI2_Job_Spawn(int count, const char * cmds[],
|
||||
apps[i].cmd = strdup(cmds[i]);
|
||||
apps[i].maxprocs = maxprocs[i];
|
||||
apps[i].argv = pmix_argv_copy((char**)argvs[i]);
|
||||
apps[i].argc = pmix_argv_count(apps[i].argv);
|
||||
apps[i].ninfo = info_keyval_sizes[i];
|
||||
apps[i].info = (pmix_info_t*)malloc(apps[i].ninfo * sizeof(pmix_info_t));
|
||||
/* copy the info objects */
|
||||
|
@ -206,6 +206,28 @@ PMIX_EXPORT const char* PMIx_Get_version(void)
|
||||
return pmix_version_string;
|
||||
}
|
||||
|
||||
volatile bool waiting_for_debugger = true;
|
||||
static void notification_fn(size_t evhdlr_registration_id,
|
||||
pmix_status_t status,
|
||||
const pmix_proc_t *source,
|
||||
pmix_info_t info[], size_t ninfo,
|
||||
pmix_info_t results[], size_t nresults,
|
||||
pmix_event_notification_cbfunc_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
if (NULL != cbfunc) {
|
||||
cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
|
||||
}
|
||||
waiting_for_debugger = false;
|
||||
}
|
||||
static void evhandler_reg_callbk(pmix_status_t status,
|
||||
size_t evhandler_ref,
|
||||
void *cbdata)
|
||||
{
|
||||
volatile int *active = (volatile int*)cbdata;
|
||||
*active = status;
|
||||
}
|
||||
|
||||
PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
|
||||
pmix_info_t info[], size_t ninfo)
|
||||
{
|
||||
@ -215,6 +237,8 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
|
||||
pmix_cb_t cb;
|
||||
pmix_buffer_t *req;
|
||||
pmix_cmd_t cmd = PMIX_REQ_CMD;
|
||||
volatile int active;
|
||||
pmix_status_t code = PMIX_ERR_DEBUGGER_RELEASE;
|
||||
|
||||
if (NULL == proc) {
|
||||
return PMIX_ERR_BAD_PARAM;
|
||||
@ -247,6 +271,7 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
|
||||
/* setup the globals */
|
||||
PMIX_CONSTRUCT(&pmix_client_globals.pending_requests, pmix_list_t);
|
||||
PMIX_CONSTRUCT(&pmix_client_globals.myserver, pmix_peer_t);
|
||||
pmix_client_globals.wait_for_debugger = false;
|
||||
|
||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||
"pmix: init called");
|
||||
@ -320,8 +345,27 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
|
||||
|
||||
if (PMIX_SUCCESS == rc) {
|
||||
pmix_globals.init_cntr++;
|
||||
} else {
|
||||
return rc;
|
||||
}
|
||||
return rc;
|
||||
|
||||
/* check if we are to wait here for debugger attach */
|
||||
if (pmix_client_globals.wait_for_debugger) {
|
||||
/* register for the debugger release notification */
|
||||
active = -1;
|
||||
PMIx_Register_event_handler(&code, 1, NULL, 0,
|
||||
notification_fn, evhandler_reg_callbk, (void*)&active);
|
||||
while (-1 == active) {
|
||||
sleep(1);
|
||||
}
|
||||
if (0 != active) {
|
||||
return active;
|
||||
}
|
||||
/* wait for it to arrive */
|
||||
PMIX_WAIT_FOR_COMPLETION(waiting_for_debugger);
|
||||
}
|
||||
|
||||
return PMIX_SUCCESS;
|
||||
}
|
||||
|
||||
PMIX_EXPORT int PMIx_Initialized(void)
|
||||
|
@ -21,6 +21,7 @@ BEGIN_C_DECLS
|
||||
typedef struct {
|
||||
pmix_peer_t myserver; // messaging support to/from my server
|
||||
pmix_list_t pending_requests; // list of pmix_cb_t pending data requests
|
||||
bool wait_for_debugger; // stop at the end of PMIx_Init and wait for notification of debugger release
|
||||
} pmix_client_globals_t;
|
||||
|
||||
extern pmix_client_globals_t pmix_client_globals;
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <pmix_server.h>
|
||||
#include <pmix_common.h>
|
||||
#include "src/include/pmix_globals.h"
|
||||
#include "src/client/pmix_client_ops.h"
|
||||
#include "src/class/pmix_value_array.h"
|
||||
#include "src/util/error.h"
|
||||
#include "src/buffer_ops/internal.h"
|
||||
@ -308,6 +309,13 @@ static inline pmix_status_t _job_data_store(const char *nspace, void *cbdata)
|
||||
}
|
||||
/* cleanup */
|
||||
PMIX_DESTRUCT(&buf2);
|
||||
} else if (0 == strcmp(kptr->key, PMIX_DEBUG_STOP_IN_INIT)) {
|
||||
/* set the flag - we don't store this value */
|
||||
if (PMIX_UNDEF == kptr->value->type) {
|
||||
pmix_client_globals.wait_for_debugger = true;
|
||||
} else {
|
||||
pmix_client_globals.wait_for_debugger = kptr->value->data.flag;
|
||||
}
|
||||
} else {
|
||||
if (PMIX_SUCCESS != (rc = _add_key_for_rank(PMIX_RANK_WILDCARD, kptr, cb))) {
|
||||
PMIX_ERROR_LOG(rc);
|
||||
|
@ -63,8 +63,8 @@ pmix_dstore_base_module_t pmix_dstore_esh_module = {
|
||||
#define ESH_ENV_NS_DATA_SEG_SIZE "NS_DATA_SEG_SIZE"
|
||||
#define ESH_ENV_LINEAR "SM_USE_LINEAR_SEARCH"
|
||||
|
||||
#define EXT_SLOT_SIZE (PMIX_MAX_KEYLEN + 1 + 2*sizeof(size_t)) /* in ext slot new offset will be stored in case if new data were added for the same process during next commit */
|
||||
#define KVAL_SIZE(size) (PMIX_MAX_KEYLEN + 1 + sizeof(size_t) + size)
|
||||
#define EXT_SLOT_SIZE(key) (strlen(key) + 1 + 2*sizeof(size_t)) /* in ext slot new offset will be stored in case if new data were added for the same process during next commit */
|
||||
#define KVAL_SIZE(key, size) (strlen(key) + 1 + sizeof(size_t) + size)
|
||||
|
||||
#define _ESH_LOCK(lockfd, operation) \
|
||||
__extension__ ({ \
|
||||
@ -1056,18 +1056,18 @@ int _esh_fetch(const char *nspace, pmix_rank_t rank, const char *key, pmix_value
|
||||
* EXTENSION slot which has key = EXTENSION_SLOT and a size_t value for offset
|
||||
* to next data address for this process.
|
||||
*/
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_INVALIDATED, PMIX_MAX_KEYLEN+1)) {
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_INVALIDATED, strlen(ESH_REGION_INVALIDATED)+1)) {
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %s:%u, skip %s region",
|
||||
__FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_INVALIDATED));
|
||||
/*skip it */
|
||||
size_t size;
|
||||
memcpy(&size, addr + PMIX_MAX_KEYLEN + 1, sizeof(size_t));
|
||||
memcpy(&size, addr + strlen(ESH_REGION_INVALIDATED) + 1, sizeof(size_t));
|
||||
/* go to next item, updating address */
|
||||
addr += KVAL_SIZE(size);
|
||||
} else if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1)) {
|
||||
addr += KVAL_SIZE(ESH_REGION_INVALIDATED, size);
|
||||
} else if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, strlen(ESH_REGION_EXTENSION)+1)) {
|
||||
size_t offset;
|
||||
memcpy(&offset, addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), sizeof(size_t));
|
||||
memcpy(&offset, addr + strlen(ESH_REGION_EXTENSION) + 1 + sizeof(size_t), sizeof(size_t));
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %s:%u, reached %s with %lu value",
|
||||
__FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_EXTENSION, offset));
|
||||
@ -1087,14 +1087,14 @@ int _esh_fetch(const char *nspace, pmix_rank_t rank, const char *key, pmix_value
|
||||
__FILE__, __LINE__, __func__, cur_rank, key));
|
||||
break;
|
||||
}
|
||||
} else if (0 == strncmp((const char *)addr, key, PMIX_MAX_KEYLEN+1)) {
|
||||
} else if (0 == strncmp((const char *)addr, key, strlen(key)+1)) {
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %s:%u, found target key %s",
|
||||
__FILE__, __LINE__, __func__, nspace, cur_rank, key));
|
||||
/* target key is found, get value */
|
||||
size_t size;
|
||||
memcpy(&size, addr + PMIX_MAX_KEYLEN + 1, sizeof(size_t));
|
||||
addr += PMIX_MAX_KEYLEN + 1 + sizeof(size_t);
|
||||
memcpy(&size, addr + strlen(key) + 1, sizeof(size_t));
|
||||
addr += strlen(key) + 1 + sizeof(size_t);
|
||||
PMIX_CONSTRUCT(&buffer, pmix_buffer_t);
|
||||
PMIX_LOAD_BUFFER(&buffer, addr, size);
|
||||
int cnt = 1;
|
||||
@ -1116,13 +1116,13 @@ int _esh_fetch(const char *nspace, pmix_rank_t rank, const char *key, pmix_value
|
||||
goto done;
|
||||
} else {
|
||||
char ckey[PMIX_MAX_KEYLEN+1] = {0};
|
||||
strncpy(ckey, (const char *)addr, PMIX_MAX_KEYLEN);
|
||||
size_t size;
|
||||
memcpy(&size, addr + PMIX_MAX_KEYLEN + 1, sizeof(size_t));
|
||||
strncpy(ckey, (const char *)addr, strlen((const char *)addr)+1);
|
||||
memcpy(&size, addr + strlen(ckey) + 1, sizeof(size_t));
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %s:%u, skip key %s look for key %s", __FILE__, __LINE__, __func__, nspace, cur_rank, ckey, key));
|
||||
/* go to next item, updating address */
|
||||
addr += KVAL_SIZE(size);
|
||||
addr += KVAL_SIZE(ckey, size);
|
||||
kval_cnt--;
|
||||
}
|
||||
}
|
||||
@ -1898,19 +1898,19 @@ static int put_empty_ext_slot(seg_desc_t *dataseg)
|
||||
uint8_t *addr;
|
||||
global_offset = get_free_offset(dataseg);
|
||||
rel_offset = global_offset % _data_segment_size;
|
||||
if (rel_offset + EXT_SLOT_SIZE > _data_segment_size) {
|
||||
if (rel_offset + EXT_SLOT_SIZE(ESH_REGION_EXTENSION) > _data_segment_size) {
|
||||
PMIX_ERROR_LOG(PMIX_ERROR);
|
||||
return PMIX_ERROR;
|
||||
}
|
||||
addr = _get_data_region_by_offset(dataseg, global_offset);
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1);
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, strlen(ESH_REGION_EXTENSION)+1);
|
||||
val = 0;
|
||||
sz = sizeof(size_t);
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1, &sz, sz);
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), &val, sz);
|
||||
memcpy(addr + strlen(ESH_REGION_EXTENSION) + 1, &sz, sz);
|
||||
memcpy(addr + strlen(ESH_REGION_EXTENSION) + 1 + sizeof(size_t), &val, sz);
|
||||
|
||||
/* update offset at the beginning of current segment */
|
||||
data_ended = rel_offset + EXT_SLOT_SIZE;
|
||||
data_ended = rel_offset + EXT_SLOT_SIZE(ESH_REGION_EXTENSION);
|
||||
addr = (uint8_t*)(addr - rel_offset);
|
||||
memcpy(addr, &data_ended, sizeof(size_t));
|
||||
return PMIX_SUCCESS;
|
||||
@ -1938,15 +1938,15 @@ static size_t put_data_to_the_end(ns_track_elem_t *ns_info, seg_desc_t *dataseg,
|
||||
offset = global_offset % _data_segment_size;
|
||||
|
||||
/* Reserve additional space at the end of the segment for an EXTENSION_SLOT so that the data for this rank can be extended later. */
|
||||
if (sizeof(size_t) + KVAL_SIZE(size) + EXT_SLOT_SIZE > _data_segment_size) {
|
||||
if (sizeof(size_t) + KVAL_SIZE(key, size) + EXT_SLOT_SIZE(key) > _data_segment_size) {
|
||||
/* this is an error case: the segment is so small that it cannot hold even a single key-value pair.
|
||||
* warn a user about it and fail. */
|
||||
offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. */
|
||||
pmix_output(0, "PLEASE set NS_DATA_SEG_SIZE to value which is larger when %lu.",
|
||||
sizeof(size_t) + PMIX_MAX_KEYLEN + 1 + sizeof(size_t) + size + EXT_SLOT_SIZE);
|
||||
sizeof(size_t) + strlen(key) + 1 + sizeof(size_t) + size + EXT_SLOT_SIZE(key));
|
||||
return offset;
|
||||
}
|
||||
if (offset + KVAL_SIZE(size) + EXT_SLOT_SIZE > _data_segment_size) {
|
||||
if (offset + KVAL_SIZE(key, size) + EXT_SLOT_SIZE(key) > _data_segment_size) {
|
||||
id++;
|
||||
/* create a new data segment. */
|
||||
tmp = extend_segment(tmp, &ns_info->ns_map);
|
||||
@ -1968,13 +1968,13 @@ static size_t put_data_to_the_end(ns_track_elem_t *ns_info, seg_desc_t *dataseg,
|
||||
}
|
||||
global_offset = offset + id * _data_segment_size;
|
||||
addr = (uint8_t*)(tmp->seg_info.seg_base_addr)+offset;
|
||||
strncpy((char *)addr, key, PMIX_MAX_KEYLEN+1);
|
||||
strncpy((char *)addr, key, strlen(key)+1);
|
||||
sz = size;
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1, &sz, sizeof(size_t));
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), buffer, size);
|
||||
memcpy(addr + strlen(key) + 1, &sz, sizeof(size_t));
|
||||
memcpy(addr + strlen(key) + 1 + sizeof(size_t), buffer, size);
|
||||
|
||||
/* update offset at the beginning of current segment */
|
||||
data_ended = offset + KVAL_SIZE(size);
|
||||
data_ended = offset + KVAL_SIZE(key, size);
|
||||
addr = (uint8_t*)(tmp->seg_info.seg_base_addr);
|
||||
memcpy(addr, &data_ended, sizeof(size_t));
|
||||
PMIX_OUTPUT_VERBOSE((2, pmix_globals.debug_output,
|
||||
@ -2028,9 +2028,9 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
* put extension slot at the end of previous segment with a "reference" to a new_offset */
|
||||
size_t sz = sizeof(size_t);
|
||||
addr = _get_data_region_by_offset(datadesc, free_offset);
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1);
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1, &sz, sizeof(size_t));
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), &offset, sizeof(size_t));
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, strlen(ESH_REGION_EXTENSION)+1);
|
||||
memcpy(addr + strlen(ESH_REGION_EXTENSION) + 1, &sz, sizeof(size_t));
|
||||
memcpy(addr + strlen(ESH_REGION_EXTENSION) + 1 + sizeof(size_t), &offset, sizeof(size_t));
|
||||
}
|
||||
if (NULL == *rinfo) {
|
||||
*rinfo = (rank_meta_info*)malloc(sizeof(rank_meta_info));
|
||||
@ -2063,8 +2063,8 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
* .....
|
||||
* extension slot which has key = EXTENSION_SLOT and a size_t value for offset to next data address for this process.
|
||||
*/
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1)) {
|
||||
memcpy(&offset, addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), sizeof(size_t));
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, strlen(ESH_REGION_EXTENSION)+1)) {
|
||||
memcpy(&offset, addr + strlen(ESH_REGION_EXTENSION) + 1 + sizeof(size_t), sizeof(size_t));
|
||||
if (0 < offset) {
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %u, replace flag %d %s is filled with %lu value",
|
||||
@ -2079,22 +2079,22 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
} else {
|
||||
/* should not happen: we should already have left the loop when this occurs */
|
||||
}
|
||||
} else if (0 == strncmp((const char *)addr, kval->key, PMIX_MAX_KEYLEN+1)) {
|
||||
} else if (0 == strncmp((const char *)addr, kval->key, strlen(kval->key)+1)) {
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %u, replace flag %d found target key %s",
|
||||
__FILE__, __LINE__, __func__, rank, data_exist, kval->key));
|
||||
/* target key is found, compare value sizes */
|
||||
size_t cur_size;
|
||||
memcpy(&cur_size, addr + PMIX_MAX_KEYLEN + 1, sizeof(size_t));
|
||||
memcpy(&cur_size, addr + strlen(kval->key) + 1, sizeof(size_t));
|
||||
if (cur_size != size) {
|
||||
//if (1) { /* if we want to test replacing values for existing keys. */
|
||||
/* invalidate current value and store another one at the end of data region. */
|
||||
strncpy((char *)addr, ESH_REGION_INVALIDATED, PMIX_MAX_KEYLEN+1);
|
||||
strncpy((char *)addr, ESH_REGION_INVALIDATED, strlen(ESH_REGION_INVALIDATED)+1);
|
||||
/* decrementing count, it will be incremented back when we add a new value for this key at the end of region. */
|
||||
(*rinfo)->count--;
|
||||
kval_cnt--;
|
||||
/* go to next item, updating address */
|
||||
addr += KVAL_SIZE(cur_size);
|
||||
addr += KVAL_SIZE(ESH_REGION_INVALIDATED, cur_size);
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %u, replace flag %d mark key %s regions as invalidated. put new data at the end.",
|
||||
__FILE__, __LINE__, __func__, rank, data_exist, kval->key));
|
||||
@ -2103,7 +2103,7 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
"%s:%d:%s: for rank %u, replace flag %d replace data for key %s type %d in place",
|
||||
__FILE__, __LINE__, __func__, rank, data_exist, kval->key, kval->value->type));
|
||||
/* replace old data with new one. */
|
||||
addr += PMIX_MAX_KEYLEN + 1;
|
||||
addr += strlen(kval->key) + 1;
|
||||
memcpy(addr, &size, sizeof(size_t));
|
||||
addr += sizeof(size_t);
|
||||
memset(addr, 0, cur_size);
|
||||
@ -2114,19 +2114,19 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
}
|
||||
} else {
|
||||
char ckey[PMIX_MAX_KEYLEN+1] = {0};
|
||||
strncpy(ckey, (const char *)addr, PMIX_MAX_KEYLEN+1);
|
||||
strncpy(ckey, (const char *)addr, strlen(addr)+1);
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %u, replace flag %d skip %s key, look for %s key",
|
||||
__FILE__, __LINE__, __func__, rank, data_exist, ckey, kval->key));
|
||||
/* Skip it: key is "INVALIDATED" or key is valid but different from target one. */
|
||||
if (0 != strncmp(ESH_REGION_INVALIDATED, ckey, PMIX_MAX_KEYLEN+1)) {
|
||||
if (0 != strncmp(ESH_REGION_INVALIDATED, ckey, strlen(ckey)+1)) {
|
||||
/* count only valid items */
|
||||
kval_cnt--;
|
||||
}
|
||||
size_t size;
|
||||
memcpy(&size, addr + PMIX_MAX_KEYLEN + 1, sizeof(size_t));
|
||||
memcpy(&size, addr + strlen(ckey) + 1, sizeof(size_t));
|
||||
/* go to next item, updating address */
|
||||
addr += KVAL_SIZE(size);
|
||||
addr += KVAL_SIZE(ckey, size);
|
||||
}
|
||||
}
|
||||
if (1 == add_to_the_end) {
|
||||
@ -2148,11 +2148,11 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
* data for different ranks, and that's why next element is EXTENSION_SLOT.
|
||||
* We put new data to the end of data region and just update EXTENSION_SLOT value by new offset.
|
||||
*/
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1)) {
|
||||
if (0 == strncmp((const char *)addr, ESH_REGION_EXTENSION, strlen(ESH_REGION_EXTENSION)+1)) {
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
"%s:%d:%s: for rank %u, replace flag %d %s should be filled with offset %lu value",
|
||||
__FILE__, __LINE__, __func__, rank, data_exist, ESH_REGION_EXTENSION, offset));
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), &offset, sizeof(size_t));
|
||||
memcpy(addr + strlen(ESH_REGION_EXTENSION) + 1 + sizeof(size_t), &offset, sizeof(size_t));
|
||||
} else {
|
||||
/* (2) - we point to the first free offset, no more data is stored further in this segment.
|
||||
* There is no EXTENSION_SLOT by this addr since we continue pushing data for the same rank,
|
||||
@ -2163,9 +2163,10 @@ static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t
|
||||
if (free_offset != offset) {
|
||||
/* segment was extended, need to put extension slot by free_offset indicating new_offset */
|
||||
size_t sz = sizeof(size_t);
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, PMIX_MAX_KEYLEN+1);
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1, &sz, sz);
|
||||
memcpy(addr + PMIX_MAX_KEYLEN + 1 + sizeof(size_t), &offset, sz);
|
||||
size_t length = strlen(ESH_REGION_EXTENSION);
|
||||
strncpy((char *)addr, ESH_REGION_EXTENSION, length + 1);
|
||||
memcpy(addr + length + 1, &sz, sz);
|
||||
memcpy(addr + length + 1 + sizeof(size_t), &offset, sz);
|
||||
}
|
||||
}
|
||||
PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output,
|
||||
|
@ -435,6 +435,8 @@ static void _notify_client_event(int sd, short args, void *cbdata)
|
||||
pmix_notify_caddy_t *rbout;
|
||||
pmix_regevents_info_t *reginfoptr;
|
||||
pmix_peer_events_info_t *pr;
|
||||
size_t n;
|
||||
bool matched;
|
||||
|
||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||
"pmix_server: _notify_error notifying clients of error %d",
|
||||
@ -464,6 +466,24 @@ static void _notify_client_event(int sd, short args, void *cbdata)
|
||||
cd->source.rank == pr->peer->info->rank) {
|
||||
continue;
|
||||
}
|
||||
/* if we were given specific targets, check if this is one */
|
||||
if (NULL != cd->targets) {
|
||||
matched = false;
|
||||
for (n=0; n < cd->ntargets; n++) {
|
||||
if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
|
||||
continue;
|
||||
}
|
||||
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
|
||||
pr->peer->info->rank == cd->targets[n].rank) {
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!matched) {
|
||||
/* do not notify this one */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||
"pmix_server: notifying client %s:%d",
|
||||
pr->peer->info->nptr->nspace, pr->peer->info->rank);
|
||||
@ -515,6 +535,18 @@ static pmix_status_t notify_client_of_event(pmix_status_t status,
|
||||
for (n=0; n < ninfo; n++) {
|
||||
if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
|
||||
cd->nondefault = true;
|
||||
} else if (strncmp(info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
|
||||
/* provides an array of pmix_proc_t identifying the procs
|
||||
* that are to receive this notification */
|
||||
if (PMIX_DATA_ARRAY != info[n].value.type ||
|
||||
NULL == info[n].value.data.darray ||
|
||||
NULL == info[n].value.data.darray->array) {
|
||||
/* this is an error */
|
||||
return PMIX_ERR_BAD_PARAM;
|
||||
}
|
||||
cd->ntargets = info[n].value.data.darray->size;
|
||||
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
|
||||
memcpy(cd->targets, info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1009,7 +1009,7 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
|
||||
pmix_notify_caddy_t *cd;
|
||||
int i;
|
||||
bool enviro_events = false;
|
||||
bool found;
|
||||
bool found, matched;
|
||||
|
||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||
"recvd register events");
|
||||
@ -1173,7 +1173,25 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
/* have a match - notify */
|
||||
/* if we were given specific targets, check if this is one */
|
||||
if (NULL != cd->targets) {
|
||||
matched = false;
|
||||
for (n=0; n < cd->ntargets; n++) {
|
||||
if (0 != strncmp(peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
|
||||
continue;
|
||||
}
|
||||
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
|
||||
peer->info->rank == cd->targets[n].rank) {
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!matched) {
|
||||
/* do not notify this one */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
/* all matches - notify */
|
||||
PMIX_RETAIN(cd->buf);
|
||||
PMIX_SERVER_QUEUE_REPLY(peer, 0, cd->buf);
|
||||
}
|
||||
@ -1536,6 +1554,8 @@ static void ncon(pmix_notify_caddy_t *p)
|
||||
memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
|
||||
p->source.rank = PMIX_RANK_UNDEF;
|
||||
p->range = PMIX_RANGE_UNDEF;
|
||||
p->targets = NULL;
|
||||
p->ntargets = 0;
|
||||
p->nondefault = false;
|
||||
p->info = NULL;
|
||||
p->ninfo = 0;
|
||||
@ -1546,6 +1566,9 @@ static void ndes(pmix_notify_caddy_t *p)
|
||||
if (NULL != p->info) {
|
||||
PMIX_INFO_FREE(p->info, p->ninfo);
|
||||
}
|
||||
if (NULL != p->targets) {
|
||||
free(p->targets);
|
||||
}
|
||||
if (NULL != p->buf) {
|
||||
PMIX_RELEASE(p->buf);
|
||||
}
|
||||
|
@ -53,6 +53,8 @@ typedef struct {
|
||||
pmix_status_t status;
|
||||
pmix_proc_t source;
|
||||
pmix_data_range_t range;
|
||||
pmix_proc_t *targets;
|
||||
size_t ntargets;
|
||||
bool nondefault;
|
||||
pmix_info_t *info;
|
||||
size_t ninfo;
|
||||
|
@ -162,6 +162,14 @@ PMIX_EXPORT int PMIx_tool_init(pmix_proc_t *proc,
|
||||
return PMIX_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* if we were given an nspace in the environment, then we
|
||||
* must have been spawned by a PMIx server - so even though
|
||||
* we technically will operate as a tool, we are actually
|
||||
* a "client" of the PMIx server and should connect that way */
|
||||
if (NULL != getenv("PMIX_NAMESPACE")) {
|
||||
return PMIx_Init(proc, info, ninfo);
|
||||
}
|
||||
|
||||
if (0 < pmix_globals.init_cntr) {
|
||||
/* since we have been called before, the nspace and
|
||||
* rank should be known. So return them here if
|
||||
|
@ -88,9 +88,9 @@ int main(int argc, char **argv)
|
||||
PMIX_APP_CREATE(app, 1);
|
||||
app->cmd = strdup("gumby");
|
||||
app->maxprocs = 2;
|
||||
pmix_argv_append(&app->argc, &app->argv, "gumby");
|
||||
pmix_argv_append(&app->argc, &app->argv, "-n");
|
||||
pmix_argv_append(&app->argc, &app->argv, "2");
|
||||
pmix_argv_append_nosize(&app->argv, "gumby");
|
||||
pmix_argv_append_nosize(&app->argv, "-n");
|
||||
pmix_argv_append_nosize(&app->argv, "2");
|
||||
pmix_setenv("PMIX_ENV_VALUE", "3", true, &app->env);
|
||||
PMIX_INFO_CREATE(app->info, 2);
|
||||
(void)strncpy(app->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
|
||||
|
@ -892,7 +892,6 @@ int pmix2x_spawn(opal_list_t *job_info, opal_list_t *apps, opal_jobid_t *jobid)
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(app, apps, opal_pmix_app_t) {
|
||||
papps[n].cmd = strdup(app->cmd);
|
||||
papps[n].argc = app->argc;
|
||||
papps[n].argv = opal_argv_copy(app->argv);
|
||||
papps[n].env = opal_argv_copy(app->env);
|
||||
papps[n].maxprocs = app->maxprocs;
|
||||
@ -993,7 +992,6 @@ int pmix2x_spawnnb(opal_list_t *job_info, opal_list_t *apps,
|
||||
n=0;
|
||||
OPAL_LIST_FOREACH(app, apps, opal_pmix_app_t) {
|
||||
op->apps[n].cmd = strdup(app->cmd);
|
||||
op->apps[n].argc = app->argc;
|
||||
op->apps[n].argv = opal_argv_copy(app->argv);
|
||||
op->apps[n].env = opal_argv_copy(app->env);
|
||||
op->apps[n].maxprocs = app->maxprocs;
|
||||
|
@ -588,7 +588,6 @@ static pmix_status_t server_spawn_fn(const pmix_proc_t *p,
|
||||
if (NULL != apps[n].cmd) {
|
||||
app->cmd = strdup(apps[n].cmd);
|
||||
}
|
||||
app->argc = apps[n].argc;
|
||||
if (NULL != apps[n].argv) {
|
||||
app->argv = opal_argv_copy(apps[n].argv);
|
||||
}
|
||||
|
@ -88,6 +88,7 @@ BEGIN_C_DECLS
|
||||
#define OPAL_PMIX_TDIR_RMCLEAN "pmix.tdir.rmclean" // (bool) Resource Manager will clean session directories
|
||||
|
||||
/* information about relative ranks as assigned by the RM */
|
||||
#define OPAL_PMIX_NSPACE "pmix.nspace" // (char*) nspace of a job
|
||||
#define OPAL_PMIX_JOBID "pmix.jobid" // (uint32_t) jobid assigned by scheduler
|
||||
#define OPAL_PMIX_APPNUM "pmix.appnum" // (uint32_t) app number within the job
|
||||
#define OPAL_PMIX_RANK "pmix.rank" // (uint32_t) process rank within the job
|
||||
@ -200,6 +201,8 @@ BEGIN_C_DECLS
|
||||
#define OPAL_PMIX_FWD_STDOUT "pmix.fwd.stdout" // (bool) forward stdout from spawned procs to me
|
||||
#define OPAL_PMIX_FWD_STDERR "pmix.fwd.stderr" // (bool) forward stderr from spawned procs to me
|
||||
#define OPAL_PMIX_DEBUGGER_DAEMONS "pmix.debugger" // (bool) spawned app consists of debugger daemons
|
||||
#define OPAL_PMIX_COSPAWN_APP "pmix.cospawn" // (bool) designated app is to be spawned as a disconnected
|
||||
// job - i.e., not part of the "comm_world" of the job
|
||||
|
||||
/* query attributes */
|
||||
#define OPAL_PMIX_QUERY_NAMESPACES "pmix.qry.ns" // (char*) request a comma-delimited list of active nspaces
|
||||
@ -212,12 +215,22 @@ BEGIN_C_DECLS
|
||||
// returns (pmix_data_array_t) an array of pmix_proc_info_t for
|
||||
// procs in job on same node
|
||||
#define OPAL_PMIX_QUERY_AUTHORIZATIONS "pmix.qry.auths" // return operations tool is authorized to perform
|
||||
#define OPAL_PMIX_QUERY_SPAWN_SUPPORT "pmix.qry.spawn" // return a comma-delimited list of supported spawn attributes
|
||||
#define OPAL_PMIX_QUERY_DEBUG_SUPPORT "pmix.qry.debug" // return a comma-delimited list of supported debug attributes
|
||||
|
||||
/* log attributes */
|
||||
#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr
|
||||
#define OPAL_PMIX_LOG_STDOUT "pmix.log.stdout" // (bool) log data to stdout
|
||||
#define OPAL_PMIX_LOG_SYSLOG "pmix.log.syslog" // (bool) log data to syslog - defaults to ERROR priority unless
|
||||
|
||||
/* debugger attributes */
|
||||
#define OPAL_PMIX_DEBUG_STOP_ON_EXEC "pmix.dbg.exec" // (bool) job is being spawned under debugger - instruct it to pause on start
|
||||
#define OPAL_PMIX_DEBUG_STOP_IN_INIT "pmix.dbg.init" // (bool) instruct job to stop during PMIx init
|
||||
#define OPAL_PMIX_DEBUG_WAIT_FOR_NOTIFY "pmix.dbg.notify" // (bool) block at desired point until receiving debugger release notification
|
||||
#define OPAL_PMIX_DEBUG_JOB "pmix.dbg.job" // (char*) nspace of the job to be debugged - the RM/PMIx server are
|
||||
#define OPAL_PMIX_DEBUG_WAITING_FOR_NOTIFY "pmix.dbg.waiting" // (bool) job to be debugged is waiting for a release
|
||||
|
||||
|
||||
/* define a scope for data "put" by PMI per the following:
|
||||
*
|
||||
* OPAL_PMI_LOCAL - the data is intended only for other application
|
||||
@ -278,9 +291,9 @@ OBJ_CLASS_DECLARATION(opal_pmix_pdata_t);
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
char *cmd;
|
||||
int argc;
|
||||
char **argv;
|
||||
char **env;
|
||||
char *cwd;
|
||||
int maxprocs;
|
||||
opal_list_t info;
|
||||
} opal_pmix_app_t;
|
||||
|
@ -1554,7 +1554,7 @@ int orte_odls_base_default_kill_local_procs(opal_pointer_array_t *procs,
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
|
||||
"%s SENDING SIGTERM TO %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&child->name)));
|
||||
ORTE_NAME_PRINT(&cd->child->name)));
|
||||
kill_local(cd->child->pid, SIGTERM);
|
||||
}
|
||||
/* wait a little again */
|
||||
@ -1564,7 +1564,7 @@ int orte_odls_base_default_kill_local_procs(opal_pointer_array_t *procs,
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
|
||||
"%s SENDING SIGKILL TO %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&child->name)));
|
||||
ORTE_NAME_PRINT(&cd->child->name)));
|
||||
kill_local(cd->child->pid, SIGKILL);
|
||||
/* indicate the waitpid fired as this is effectively what
|
||||
* has happened
|
||||
|
@ -159,7 +159,8 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
|
||||
orte_job_t *jdata;
|
||||
orte_app_context_t *app;
|
||||
opal_pmix_app_t *papp;
|
||||
opal_value_t *info;
|
||||
opal_value_t *info, *next;
|
||||
opal_list_t *cache;
|
||||
int rc;
|
||||
char cwd[OPAL_PATH_MAX];
|
||||
|
||||
@ -173,7 +174,7 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
|
||||
jdata->map = OBJ_NEW(orte_job_map_t);
|
||||
|
||||
/* transfer the job info across */
|
||||
OPAL_LIST_FOREACH(info, job_info, opal_value_t) {
|
||||
OPAL_LIST_FOREACH_SAFE(info, next, job_info, opal_value_t) {
|
||||
if (0 == strcmp(info->key, OPAL_PMIX_PERSONALITY)) {
|
||||
jdata->personality = opal_argv_split(info->data.string, ',');
|
||||
} else if (0 == strcmp(info->key, OPAL_PMIX_MAPPER)) {
|
||||
@ -246,10 +247,21 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION,
|
||||
ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);
|
||||
}
|
||||
} else if (0 == strcmp(info->key, OPAL_PMIX_DEBUG_STOP_ON_EXEC)) {
|
||||
/* we don't know how to do this */
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
} else {
|
||||
/* unrecognized key */
|
||||
orte_show_help("help-orted.txt", "bad-key",
|
||||
true, "spawn", "job level", info->key);
|
||||
/* cache for inclusion with job info at registration */
|
||||
cache = NULL;
|
||||
opal_list_remove_item(job_info, &info->super);
|
||||
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, (void**)&cache, OPAL_PTR) &&
|
||||
NULL != cache) {
|
||||
opal_list_append(cache, &info->super);
|
||||
} else {
|
||||
cache = OBJ_NEW(opal_list_t);
|
||||
opal_list_append(cache, &info->super);
|
||||
orte_set_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, ORTE_ATTR_LOCAL, (void*)cache, OPAL_PTR);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* if the job is missing a personality setting, add it */
|
||||
@ -262,9 +274,16 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
|
||||
app = OBJ_NEW(orte_app_context_t);
|
||||
app->idx = opal_pointer_array_add(jdata->apps, app);
|
||||
jdata->num_apps++;
|
||||
app->app = strdup(papp->cmd);
|
||||
if (NULL != papp->cmd) {
|
||||
app->app = strdup(papp->cmd);
|
||||
} else {
|
||||
app->app = strdup(papp->argv[0]);
|
||||
}
|
||||
app->argv = opal_argv_copy(papp->argv);
|
||||
app->env = opal_argv_copy(papp->env);
|
||||
if (NULL != papp->cwd) {
|
||||
app->cwd = strdup(papp->cwd);
|
||||
}
|
||||
app->num_procs = papp->maxprocs;
|
||||
OPAL_LIST_FOREACH(info, &papp->info, opal_value_t) {
|
||||
if (0 == strcmp(info->key, OPAL_PMIX_HOST)) {
|
||||
|
@ -447,6 +447,7 @@ static void _query(int sd, short args, void *cbdata)
|
||||
uint32_t key;
|
||||
void *nptr;
|
||||
char **nspaces=NULL, nspace[512];
|
||||
char **ans = NULL;
|
||||
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s processing query",
|
||||
@ -457,6 +458,9 @@ static void _query(int sd, short args, void *cbdata)
|
||||
/* see what they wanted */
|
||||
OPAL_LIST_FOREACH(q, cd->info, opal_pmix_query_t) {
|
||||
for (n=0; NULL != q->keys[n]; n++) {
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s processing key %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), q->keys[n]);
|
||||
if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_NAMESPACES)) {
|
||||
/* get the current jobids */
|
||||
rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jdata, &nptr);
|
||||
@ -470,6 +474,7 @@ static void _query(int sd, short args, void *cbdata)
|
||||
}
|
||||
/* join the results into a single comma-delimited string */
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(OPAL_PMIX_QUERY_NAMESPACES);
|
||||
kv->type = OPAL_STRING;
|
||||
if (NULL != nspaces) {
|
||||
kv->data.string = opal_argv_join(nspaces, ',');
|
||||
@ -477,9 +482,42 @@ static void _query(int sd, short args, void *cbdata)
|
||||
kv->data.string = NULL;
|
||||
}
|
||||
opal_list_append(results, &kv->super);
|
||||
} else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_SPAWN_SUPPORT)) {
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_HOST);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_HOSTFILE);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOST);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_ADD_HOSTFILE);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_PREFIX);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_WDIR);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_MAPPER);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_PPR);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_MAPBY);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_RANKBY);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_BINDTO);
|
||||
/* create the return kv */
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(OPAL_PMIX_QUERY_SPAWN_SUPPORT);
|
||||
kv->type = OPAL_STRING;
|
||||
kv->data.string = opal_argv_join(ans, ',');
|
||||
opal_list_append(results, &kv->super);
|
||||
opal_argv_free(ans);
|
||||
ans = NULL;
|
||||
} else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_DEBUG_SUPPORT)) {
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_STOP_IN_INIT);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_JOB);
|
||||
opal_argv_append_nosize(&ans, OPAL_PMIX_DEBUG_WAIT_FOR_NOTIFY);
|
||||
/* create the return kv */
|
||||
kv = OBJ_NEW(opal_value_t);
|
||||
kv->key = strdup(OPAL_PMIX_QUERY_DEBUG_SUPPORT);
|
||||
kv->type = OPAL_STRING;
|
||||
kv->data.string = opal_argv_join(ans, ',');
|
||||
opal_list_append(results, &kv->super);
|
||||
opal_argv_free(ans);
|
||||
ans = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == opal_list_get_size(results)) {
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
} else if (opal_list_get_size(results) < opal_list_get_size(cd->info)) {
|
||||
|
@ -65,6 +65,7 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
|
||||
orte_app_context_t *app;
|
||||
uid_t uid;
|
||||
gid_t gid;
|
||||
opal_list_t *cache;
|
||||
|
||||
opal_output_verbose(2, orte_pmix_server_globals.output,
|
||||
"%s register nspace for %s",
|
||||
@ -90,6 +91,17 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
|
||||
kv->type = OPAL_UINT32;
|
||||
opal_list_append(info, &kv->super);
|
||||
|
||||
/* check for cached values to add to the job info */
|
||||
cache = NULL;
|
||||
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE, (void**)&cache, OPAL_PTR) &&
|
||||
NULL != cache) {
|
||||
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(cache))) {
|
||||
opal_list_append(info, &kv->super);
|
||||
}
|
||||
orte_remove_attribute(&jdata->attributes, ORTE_JOB_INFO_CACHE);
|
||||
OBJ_RELEASE(cache);
|
||||
}
|
||||
|
||||
/* assemble the node and proc map info */
|
||||
list = NULL;
|
||||
procs = NULL;
|
||||
|
@ -69,6 +69,8 @@ int orte_dt_pack_job(opal_buffer_t *buffer, const void *src,
|
||||
orte_app_context_t *app;
|
||||
orte_proc_t *proc;
|
||||
orte_attribute_t *kv;
|
||||
opal_list_t *cache;
|
||||
opal_value_t *val;
|
||||
|
||||
/* array of pointers to orte_job_t objects - need to pack the objects a set of fields at a time */
|
||||
jobs = (orte_job_t**) src;
|
||||
@ -215,6 +217,33 @@ int orte_dt_pack_job(opal_buffer_t *buffer, const void *src,
|
||||
}
|
||||
}
|
||||
}
|
||||
/* check for job info attribute */
|
||||
cache = NULL;
|
||||
if (orte_get_attribute(&jobs[i]->attributes, ORTE_JOB_INFO_CACHE, (void**)&cache, OPAL_PTR) &&
|
||||
NULL != cache) {
|
||||
/* we need to pack these as well, but they are composed
|
||||
* of opal_value_t's on a list. So first pack the number
|
||||
* of list elements */
|
||||
count = opal_list_get_size(cache);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_pack_buffer(buffer, (void*)(&count), 1, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* now pack each element on the list */
|
||||
OPAL_LIST_FOREACH(val, cache, opal_value_t) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_pack_buffer(buffer, (void*)&val, 1, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* pack a zero to indicate no job info is being passed */
|
||||
count = 0;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_pack_buffer(buffer, (void*)(&count), 1, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
@ -66,6 +66,8 @@ int orte_dt_unpack_job(opal_buffer_t *buffer, void *dest,
|
||||
orte_app_idx_t j;
|
||||
orte_attribute_t *kv;
|
||||
char *tmp;
|
||||
opal_value_t *val;
|
||||
opal_list_t *cache;
|
||||
|
||||
/* unpack into array of orte_job_t objects */
|
||||
jobs = (orte_job_t**) dest;
|
||||
@ -220,6 +222,26 @@ int orte_dt_unpack_job(opal_buffer_t *buffer, void *dest,
|
||||
kv->local = ORTE_ATTR_GLOBAL; // obviously not a local value
|
||||
opal_list_append(&jobs[i]->attributes, &kv->super);
|
||||
}
|
||||
/* unpack any job info */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_unpack_buffer(buffer, &count,
|
||||
&n, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
if (0 < count){
|
||||
cache = OBJ_NEW(opal_list_t);
|
||||
orte_set_attribute(&jobs[i]->attributes, ORTE_JOB_INFO_CACHE, ORTE_ATTR_LOCAL, (void*)cache, OPAL_PTR);
|
||||
for (k=0; k < count; k++) {
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss_unpack_buffer(buffer, &val,
|
||||
&n, OPAL_VALUE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
opal_list_append(cache, &val->super);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
|
@ -284,6 +284,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
|
||||
return "ORTE_JOB_NOTIFY_COMPLETION";
|
||||
case ORTE_JOB_TRANSPORT_KEY:
|
||||
return "ORTE_JOB_TRANSPORT_KEY";
|
||||
case ORTE_JOB_INFO_CACHE:
|
||||
return "ORTE_JOB_INFO_CACHE";
|
||||
|
||||
case ORTE_PROC_NOBARRIER:
|
||||
return "PROC-NOBARRIER";
|
||||
|
@ -143,6 +143,7 @@ typedef uint16_t orte_job_flags_t;
|
||||
#define ORTE_JOB_MULTI_DAEMON_SIM (ORTE_JOB_START_KEY + 49) // bool - multiple daemons/node to simulate large cluster
|
||||
#define ORTE_JOB_NOTIFY_COMPLETION (ORTE_JOB_START_KEY + 50) // bool - notify parent proc when spawned job terminates
|
||||
#define ORTE_JOB_TRANSPORT_KEY (ORTE_JOB_START_KEY + 51) // string - transport keys assigned to this job
|
||||
#define ORTE_JOB_INFO_CACHE (ORTE_JOB_START_KEY + 52) // opal_list_t - list of opal_value_t to be included in job_info
|
||||
|
||||
#define ORTE_JOB_MAX_KEY 300
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user