1
1

Merge pull request #3448 from rhc54/topic/omp

Implement the changes required to support cross-library coordination.…
Этот коммит содержится в:
Ralph Castain 2017-05-08 11:08:36 -07:00 коммит произвёл GitHub
родитель 42d31454a5 0afcb1a448
Коммит 2f11d371cd
40 изменённых файлов: 1160 добавлений и 239 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -387,6 +387,7 @@ orte/test/mpi/segv
orte/test/mpi/simple_spawn
orte/test/mpi/slave
orte/test/mpi/spawn_multiple
orte/test/mpi/xlib
orte/test/mpi/ziaprobe
orte/test/mpi/ziatest
orte/test/mpi/*.dwarf

Просмотреть файл

@ -14,7 +14,7 @@
# Copyright (c) 2010-2011 Sandia National Laboratories. All rights reserved.
# Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
# reserved.
# Copyright (c) 2015 Intel, Inc. All rights reserved.
# Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
# Copyright (c) 2015 Research Organization for Information Science
# and Technology (RIST). All rights reserved.
# Copyright (c) 2016 IBM Corporation. All rights reserved.
@ -178,6 +178,7 @@ include errhandler/Makefile.am
include file/Makefile.am
include group/Makefile.am
include info/Makefile.am
include interlib/Makefile.am
include message/Makefile.am
include op/Makefile.am
include peruse/Makefile.am

29
ompi/interlib/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,29 @@
# -*- makefile -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2016 IBM Corporation. All rights reserved.
# Copyright (c) 2017 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# This makefile.am does not stand on its own - it is included from ompi/Makefile.am
headers += \
interlib/interlib.h
lib@OMPI_LIBMPI_NAME@_la_SOURCES += \
interlib/interlib.c

162
ompi/interlib/interlib.c Обычный файл
Просмотреть файл

@ -0,0 +1,162 @@
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "opal/mca/pmix/pmix.h"
#include "ompi/mca/rte/rte.h"
#include "ompi/interlib/interlib.h"
typedef struct {
int status;
volatile bool active;
} myreg_t;
/*
* errhandler id
*/
static size_t interlibhandler_id = SIZE_MAX;
static void model_registration_callback(int status,
size_t errhandler_ref,
void *cbdata)
{
myreg_t *trk = (myreg_t*)cbdata;
trk->status = status;
interlibhandler_id = errhandler_ref;
trk->active = false;
}
static void model_callback(int status,
const opal_process_name_t *source,
opal_list_t *info, opal_list_t *results,
opal_pmix_notification_complete_fn_t cbfunc,
void *cbdata)
{
opal_value_t *val;
/* we can ignore our own callback as we obviously
* know that we are MPI */
if (NULL != info) {
OPAL_LIST_FOREACH(val, info, opal_value_t) {
if (OPAL_STRING == val->type) {
#if 0
opal_output(0, "OMPI Model Callback Key: %s Val %s", val->key, val->data.string);
#else
if (0 == strcmp(val->key, OPAL_PMIX_MODEL_LIBRARY_NAME) &&
0 == strcmp(val->data.string, "OpenMPI")) {
goto cback;
}
#endif
}
}
}
/* otherwise, do something clever here */
cback:
/* we must NOT tell the event handler state machine that we
* are the last step as that will prevent it from notifying
* anyone else that might be listening for declarations */
if (NULL != cbfunc) {
cbfunc(OMPI_SUCCESS, NULL, NULL, NULL, cbdata);
}
}
int ompi_interlib_declare(int threadlevel, char *version)
{
opal_list_t info, directives;
opal_value_t *kv;
myreg_t trk;
int ret;
/* Register an event handler for library model declarations */
trk.status = OPAL_ERROR;
trk.active = true;
/* give it a name so we can distinguish it */
OBJ_CONSTRUCT(&directives, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
kv->type = OPAL_STRING;
kv->data.string = strdup("MPI-Model-Declarations");
opal_list_append(&directives, &kv->super);
/* specify the event code */
OBJ_CONSTRUCT(&info, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup("status"); // the key here is irrelevant
kv->type = OPAL_INT;
kv->data.integer = OPAL_ERR_MODEL_DECLARED;
opal_list_append(&info, &kv->super);
/* we could constrain the range to proc_local - technically, this
* isn't required so long as the code that generates
* the event stipulates its range as proc_local. We rely
* on that here */
opal_pmix.register_evhandler(&info, &directives, model_callback,
model_registration_callback,
(void*)&trk);
OMPI_LAZY_WAIT_FOR_COMPLETION(trk.active);
OPAL_LIST_DESTRUCT(&directives);
OPAL_LIST_DESTRUCT(&info);
if (OPAL_SUCCESS != trk.status) {
return trk.status;
}
/* declare that we are present and active */
OBJ_CONSTRUCT(&info, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_PROGRAMMING_MODEL);
kv->type = OPAL_STRING;
kv->data.string = strdup("MPI");
opal_list_append(&info, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_NAME);
kv->type = OPAL_STRING;
kv->data.string = strdup("OpenMPI");
opal_list_append(&info, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_VERSION);
kv->type = OPAL_STRING;
kv->data.string = strdup(version);
opal_list_append(&info, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_THREADING_MODEL);
kv->type = OPAL_STRING;
if (MPI_THREAD_SINGLE == threadlevel) {
kv->data.string = strdup("NONE");
} else {
kv->data.string = strdup("PTHREAD");
}
opal_list_append(&info, &kv->super);
/* call pmix to initialize these values */
if (OPAL_SUCCESS != (ret = opal_pmix.init(&info))) {
OPAL_LIST_DESTRUCT(&info);
return ret;
}
OPAL_LIST_DESTRUCT(&info);
return OMPI_SUCCESS;
}

45
ompi/interlib/interlib.h Обычный файл
Просмотреть файл

@ -0,0 +1,45 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2008-2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file **/
#ifndef OMPI_INTERLIB_H
#define OMPI_INTERLIB_H
#include "ompi_config.h"
BEGIN_C_DECLS
/* declare the presence of the OMPI library to other
* libraries that may be used in this application, and
* register for callbacks when any other such libraries
* declare themselves */
OMPI_DECLSPEC int ompi_interlib_declare(int threadlevel, char *version);
END_C_DECLS
#endif /* OMPI_INTERLIB_H */

Просмотреть файл

@ -1,7 +1,7 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2012-2014 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
@ -131,7 +131,7 @@ static void _register_fn(int status,
void ompi_rte_wait_for_debugger(void)
{
int debugger;
opal_list_t *codes;
opal_list_t *codes, directives;
opal_value_t *kv;
char *evar;
int time;
@ -179,9 +179,17 @@ void ompi_rte_wait_for_debugger(void)
kv->data.integer = ORTE_ERR_DEBUGGER_RELEASE;
opal_list_append(codes, &kv->super);
opal_pmix.register_evhandler(codes, NULL, _release_fn, _register_fn, codes);
OBJ_CONSTRUCT(&directives, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
kv->type = OPAL_STRING;
kv->data.string = strdup("MPI-DEBUGGER-ATTACH");
opal_list_append(&directives, &kv->super);
opal_pmix.register_evhandler(codes, &directives, _release_fn, _register_fn, codes);
/* let the MPI progress engine run while we wait for registration to complete */
OMPI_WAIT_FOR_COMPLETION(debugger_register_active);
OPAL_LIST_DESTRUCT(&directives);
/* let the MPI progress engine run while we wait for debugger release */
OMPI_WAIT_FOR_COMPLETION(debugger_event_active);

Просмотреть файл

@ -16,7 +16,7 @@
* Copyright (c) 2006 University of Houston. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
*
@ -276,6 +276,9 @@ int ompi_mpi_finalize(void)
}
}
/* account for our refcount on pmix_init */
opal_pmix.finalize();
/* check for timing request - get stop time and report elapsed
time if so */
//OPAL_TIMING_DELTAS(ompi_enable_timing, &tm);

Просмотреть файл

@ -70,6 +70,7 @@
#include "ompi/info/info.h"
#include "ompi/errhandler/errcode.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/interlib/interlib.h"
#include "ompi/request/request.h"
#include "ompi/message/message.h"
#include "ompi/op/op.h"
@ -315,7 +316,6 @@ static int _convert_process_name_to_string(char** name_string,
return ompi_rte_convert_process_name_to_string(name_string, name);
}
void ompi_mpi_thread_level(int requested, int *provided)
{
/**
@ -525,6 +525,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_ORDER_PREPEND);
opal_list_append(&info, &kv->super);
/* give it a name so we can distinguish it */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
kv->type = OPAL_STRING;
kv->data.string = strdup("MPI-Default");
opal_list_append(&info, &kv->super);
opal_pmix.register_evhandler(NULL, &info, ompi_errhandler_callback,
ompi_errhandler_registration_callback,
(void*)&errtrk);
@ -537,6 +543,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
/* declare our presence for interlib coordination, and
* register for callbacks when other libs declare */
if (OMPI_SUCCESS != (ret = ompi_interlib_declare(*provided, OMPI_IDENT_STRING))) {
error = "ompi_interlib_declare";
goto error;
}
/* determine the bitflag belonging to the threadlevel_support provided */
memset ( &threadlevel_bf, 0, sizeof(uint8_t));

Просмотреть файл

@ -96,10 +96,10 @@ enum {
OPAL_ERR_PROC_MIGRATE = (OPAL_ERR_BASE - 65),
OPAL_ERR_EVENT_REGISTRATION = (OPAL_ERR_BASE - 66),
OPAL_ERR_HEARTBEAT_ALERT = (OPAL_ERR_BASE - 67),
OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68)
OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68),
OPAL_ERR_MODEL_DECLARED = (OPAL_ERR_BASE - 69)
};
#define OPAL_ERR_MAX (OPAL_ERR_BASE - 100)
#endif /* OPAL_CONSTANTS_H */

Просмотреть файл

@ -34,7 +34,7 @@
static char cray_pmi_version[128];
static int cray_init(void);
static int cray_init(opal_list_t *ilist);
static int cray_fini(void);
static int cray_initialized(void);
static int cray_abort(int flat, const char *msg,
@ -282,7 +282,7 @@ static void cray_get_more_info(void)
return;
}
static int cray_init(void)
static int cray_init(opal_list_t *ilist)
{
int i, spawned, size, rank, appnum, my_node;
int rc, ret = OPAL_ERROR;

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016-2017 Research Organization for Information Science
@ -90,7 +90,7 @@ OBJ_CLASS_DECLARATION(pmix1_opalcaddy_t);
/**** CLIENT FUNCTIONS ****/
OPAL_MODULE_DECLSPEC int pmix1_client_init(void);
OPAL_MODULE_DECLSPEC int pmix1_client_init(opal_list_t *ilist);
OPAL_MODULE_DECLSPEC int pmix1_client_finalize(void);
OPAL_MODULE_DECLSPEC int pmix1_initialized(void);
OPAL_MODULE_DECLSPEC int pmix1_abort(int flag, const char *msg,

Просмотреть файл

@ -100,7 +100,7 @@ static void errreg_cbfunc (pmix_status_t status,
status, errhandler_ref);
}
int pmix1_client_init(void)
int pmix1_client_init(opal_list_t *ilist)
{
opal_process_name_t pname;
pmix_status_t rc;

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
@ -216,7 +216,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t);
} while(0)
/**** CLIENT FUNCTIONS ****/
OPAL_MODULE_DECLSPEC int pmix2x_client_init(void);
OPAL_MODULE_DECLSPEC int pmix2x_client_init(opal_list_t *ilist);
OPAL_MODULE_DECLSPEC int pmix2x_client_finalize(void);
OPAL_MODULE_DECLSPEC int pmix2x_initialized(void);
OPAL_MODULE_DECLSPEC int pmix2x_abort(int flag, const char *msg,

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Mellanox Technologies, Inc.
@ -56,7 +56,7 @@ static void errreg_cbfunc (pmix_status_t status,
status, (unsigned long)errhandler_ref);
}
int pmix2x_client_init(void)
int pmix2x_client_init(opal_list_t *ilist)
{
opal_process_name_t pname;
pmix_status_t rc;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
@ -35,7 +35,7 @@
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_flux.h"
static int flux_init(void);
static int flux_init(opal_list_t *ilist);
static int flux_fini(void);
static int flux_initialized(void);
static int flux_abort(int flag, const char msg[],
@ -359,7 +359,7 @@ done:
return ret;
}
static int flux_init(void)
static int flux_init(opal_list_t *ilist)
{
int initialized;
int spawned;
@ -372,6 +372,10 @@ static int flux_init(void)
opal_process_name_t wildcard_rank;
char *str;
if (0 < pmix_init_count) {
return OPAL_SUCCESS;
}
if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
OPAL_PMI_ERROR(rc, "PMI_Initialized");
return OPAL_ERROR;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
@ -38,47 +38,47 @@
#include "opal/mca/pmix/base/pmix_base_hash.h"
static int isolated_init(void);
static int isolated_init(opal_list_t *ilist);
static int isolated_fini(void);
static int isolated_initialized(void);
static int isolated_abort(int flat, const char *msg,
opal_list_t *procs);
opal_list_t *procs);
static int isolated_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
static int isolated_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
opal_pmix_spawn_cbfunc_t cbfunc,
void *cbdata);
opal_pmix_spawn_cbfunc_t cbfunc,
void *cbdata);
static int isolated_job_connect(opal_list_t *procs);
static int isolated_job_disconnect(opal_list_t *procs);
static int isolated_job_disconnect_nb(opal_list_t *procs,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata);
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata);
static int isolated_resolve_peers(const char *nodename,
opal_jobid_t jobid,
opal_list_t *procs);
opal_jobid_t jobid,
opal_list_t *procs);
static int isolated_resolve_nodes(opal_jobid_t jobid, char **nodelist);
static int isolated_put(opal_pmix_scope_t scope, opal_value_t *kv);
static int isolated_fence(opal_list_t *procs, int collect_data);
static int isolated_fence_nb(opal_list_t *procs, int collect_data,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static int isolated_commit(void);
static int isolated_get(const opal_process_name_t *id,
const char *key, opal_list_t *info,
opal_value_t **kv);
const char *key, opal_list_t *info,
opal_value_t **kv);
static int isolated_get_nb(const opal_process_name_t *id, const char *key,
opal_list_t *info,
opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
opal_list_t *info,
opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
static int isolated_publish(opal_list_t *info);
static int isolated_publish_nb(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static int isolated_lookup(opal_list_t *data, opal_list_t *info);
static int isolated_lookup_nb(char **keys, opal_list_t *info,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static int isolated_unpublish(char **keys, opal_list_t *info);
static int isolated_unpublish_nb(char **keys, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
static const char *isolated_get_version(void);
static int isolated_store_local(const opal_process_name_t *proc,
opal_value_t *val);
opal_value_t *val);
static const char *isolated_get_nspace(opal_jobid_t jobid);
static void isolated_register_jobid(opal_jobid_t jobid, const char *nspace);
@ -118,11 +118,15 @@ const opal_pmix_base_module_t opal_pmix_isolated_module = {
static int isolated_init_count = 0;
static opal_process_name_t isolated_pname;
static int isolated_init(void)
static int isolated_init(opal_list_t *ilist)
{
int rc;
opal_value_t kv;
if (0 < isolated_init_count) {
return OPAL_SUCCESS;
}
++isolated_init_count;
/* store our name in the opal_proc_t so that
@ -133,8 +137,8 @@ static int isolated_init(void)
isolated_pname.vpid = 0;
opal_proc_set_name(&isolated_pname);
opal_output_verbose(10, opal_pmix_base_framework.framework_output,
"%s pmix:isolated: assigned tmp name %d %d",
OPAL_NAME_PRINT(isolated_pname),isolated_pname.jobid,isolated_pname.vpid);
"%s pmix:isolated: assigned tmp name %d %d",
OPAL_NAME_PRINT(isolated_pname),isolated_pname.jobid,isolated_pname.vpid);
// setup hash table
opal_pmix_base_hash_init();
@ -145,9 +149,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT32;
kv.data.uint32 = 1;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -157,9 +161,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT32;
kv.data.uint32 = 0;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -168,9 +172,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT32;
kv.data.uint32 = 1;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -179,9 +183,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT32;
kv.data.uint32 = 1;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -191,9 +195,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT32;
kv.data.uint32 = 1;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -202,9 +206,9 @@ static int isolated_init(void)
kv.type = OPAL_STRING;
kv.data.string = strdup("0");
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
@ -214,9 +218,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT64;
kv.data.uint64 = 0;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
/* save our local rank */
@ -225,9 +229,9 @@ static int isolated_init(void)
kv.type = OPAL_UINT16;
kv.data.uint16 = 0;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
/* and our node rank */
@ -236,26 +240,26 @@ static int isolated_init(void)
kv.type = OPAL_UINT16;
kv.data.uint16 = 0;
if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
OPAL_ERROR_LOG(rc);
OBJ_DESTRUCT(&kv);
goto err_exit;
}
OBJ_DESTRUCT(&kv);
return OPAL_SUCCESS;
err_exit:
err_exit:
return rc;
}
static int isolated_fini(void)
{
if (0 == isolated_init_count) {
return OPAL_SUCCESS;
return OPAL_SUCCESS;
}
if (0 != --isolated_init_count) {
return OPAL_SUCCESS;
return OPAL_SUCCESS;
}
opal_pmix_base_hash_finalize();
return OPAL_SUCCESS;
@ -264,13 +268,13 @@ static int isolated_fini(void)
static int isolated_initialized(void)
{
if (0 < isolated_init_count) {
return 1;
return 1;
}
return 0;
}
static int isolated_abort(int flag, const char *msg,
opal_list_t *procs)
opal_list_t *procs)
{
return OPAL_SUCCESS;
}
@ -281,8 +285,8 @@ static int isolated_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t
}
static int isolated_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
opal_pmix_spawn_cbfunc_t cbfunc,
void *cbdata)
opal_pmix_spawn_cbfunc_t cbfunc,
void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
@ -298,15 +302,15 @@ static int isolated_job_disconnect(opal_list_t *procs)
}
static int isolated_job_disconnect_nb(opal_list_t *procs,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
static int isolated_resolve_peers(const char *nodename,
opal_jobid_t jobid,
opal_list_t *procs)
opal_jobid_t jobid,
opal_list_t *procs)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
@ -317,16 +321,16 @@ static int isolated_resolve_nodes(opal_jobid_t jobid, char **nodelist)
}
static int isolated_put(opal_pmix_scope_t scope,
opal_value_t *kv)
opal_value_t *kv)
{
int rc;
opal_output_verbose(10, opal_pmix_base_framework.framework_output,
"%s pmix:isolated isolated_put key %s scope %d\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);
"%s pmix:isolated isolated_put key %s scope %d\n",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);
if (!isolated_init_count) {
return OPAL_ERROR;
return OPAL_ERROR;
}
rc = opal_pmix_base_store(&isolated_pname, kv);
@ -345,39 +349,39 @@ static int isolated_fence(opal_list_t *procs, int collect_data)
}
static int isolated_fence_nb(opal_list_t *procs, int collect_data,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
static int isolated_get(const opal_process_name_t *id,
const char *key, opal_list_t *info,
opal_value_t **kv)
const char *key, opal_list_t *info,
opal_value_t **kv)
{
int rc;
opal_list_t vals;
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:isolated getting value for proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*id), key);
"%s pmix:isolated getting value for proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*id), key);
OBJ_CONSTRUCT(&vals, opal_list_t);
rc = opal_pmix_base_fetch(id, key, &vals);
if (OPAL_SUCCESS == rc) {
*kv = (opal_value_t*)opal_list_remove_first(&vals);
return OPAL_SUCCESS;
*kv = (opal_value_t*)opal_list_remove_first(&vals);
return OPAL_SUCCESS;
} else {
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:isolated fetch from dstore failed: %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc);
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
"%s pmix:isolated fetch from dstore failed: %d",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc);
}
OPAL_LIST_DESTRUCT(&vals);
return rc;
}
static int isolated_get_nb(const opal_process_name_t *id, const char *key,
opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
@ -388,7 +392,7 @@ static int isolated_publish(opal_list_t *info)
}
static int isolated_publish_nb(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
@ -399,7 +403,7 @@ static int isolated_lookup(opal_list_t *data, opal_list_t *info)
}
static int isolated_lookup_nb(char **keys, opal_list_t *info,
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
@ -410,7 +414,7 @@ static int isolated_unpublish(char **keys, opal_list_t *info)
}
static int isolated_unpublish_nb(char **keys, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return OPAL_ERR_NOT_SUPPORTED;
}
@ -421,7 +425,7 @@ static const char *isolated_get_version(void)
}
static int isolated_store_local(const opal_process_name_t *proc,
opal_value_t *val)
opal_value_t *val)
{
opal_pmix_base_store(proc, val);

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
@ -284,7 +284,7 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
* If the information is not found, or the server connection fails, then
* an appropriate error constant will be returned.
*/
typedef int (*opal_pmix_base_module_init_fn_t)(void);
typedef int (*opal_pmix_base_module_init_fn_t)(opal_list_t *ilist);
/* Finalize the PMIx client, closing the connection to the local server.
* An error code will be returned if, for some reason, the connection

Просмотреть файл

@ -131,6 +131,10 @@ typedef uint32_t pmix_rank_t;
#define PMIX_GRPID "pmix.egid" // (uint32_t) effective group id
#define PMIX_DSTPATH "pmix.dstpath" // (char*) path to dstore files
#define PMIX_VERSION_INFO "pmix.version" // (char*) PMIx version of contactor
#define PMIX_PROGRAMMING_MODEL "pmix.pgm.model" // (char*) programming model being initialized (e.g., "MPI" or "OpenMP")
#define PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH")
#define PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1")
#define PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads")
/* attributes for the USOCK rendezvous socket */
@ -531,6 +535,7 @@ typedef int pmix_status_t;
#define PMIX_ERR_EVENT_REGISTRATION (PMIX_ERR_OP_BASE - 14)
#define PMIX_ERR_JOB_TERMINATED (PMIX_ERR_OP_BASE - 15)
#define PMIX_ERR_UPDATE_ENDPOINTS (PMIX_ERR_OP_BASE - 16)
#define PMIX_MODEL_DECLARED (PMIX_ERR_OP_BASE - 17)
/* define a starting point for system error constants so
* we avoid renumbering when making additions */

Просмотреть файл

@ -57,7 +57,7 @@
#elif PMIX_CC_USE_IDENT
#ident PMIX_VERSION
#endif
static const char pmix_version_string[] = PMIX_VERSION;
static const char pmix_version_string[] = PMIX_VERSION;
#include "src/class/pmix_list.h"
@ -134,8 +134,8 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer,
goto error;
}
/* we always leave space for a callback object */
chain->ninfo = ninfo + 1;
/* we always leave space for the evhandler name plus a callback object */
chain->ninfo = ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (0 < ninfo) {
@ -145,8 +145,10 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer,
goto error;
}
}
/* put the evhandler name tag in its place */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING);
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] pmix:client_notify_recv - processing event %d, calling errhandler",
@ -236,6 +238,79 @@ static void evhandler_reg_callbk(pmix_status_t status,
*active = status;
}
typedef struct {
pmix_info_t *info;
size_t ninfo;
} mydata_t;
static void release_info(pmix_status_t status, void *cbdata)
{
mydata_t *cd = (mydata_t*)cbdata;
PMIX_INFO_FREE(cd->info, cd->ninfo);
free(cd);
}
static void _check_for_notify(pmix_info_t info[], size_t ninfo)
{
mydata_t *cd;
size_t n, m=0;
pmix_info_t *model=NULL, *library=NULL, *vers=NULL, *tmod=NULL;
for (n=0; n < ninfo; n++) {
if (0 == strncmp(info[n].key, PMIX_PROGRAMMING_MODEL, PMIX_MAX_KEYLEN)) {
/* we need to generate an event indicating that
* a programming model has been declared */
model = &info[n];
++m;
} else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_NAME, PMIX_MAX_KEYLEN)) {
library = &info[n];
++m;
} else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_VERSION, PMIX_MAX_KEYLEN)) {
vers = &info[n];
++m;
} else if (0 == strncmp(info[n].key, PMIX_THREADING_MODEL, PMIX_MAX_KEYLEN)) {
tmod = &info[n];
++m;
}
}
if (0 < m) {
/* notify anyone listening that a model has been declared */
cd = (mydata_t*)malloc(sizeof(mydata_t));
if (NULL == cd) {
/* nothing we can do */
return;
}
PMIX_INFO_CREATE(cd->info, m+1);
if (NULL == cd->info) {
free(cd);
return;
}
cd->ninfo = m+1;
n = 0;
if (NULL != model) {
PMIX_INFO_XFER(&cd->info[n], model);
++n;
}
if (NULL != library) {
PMIX_INFO_XFER(&cd->info[n], library);
++n;
}
if (NULL != vers) {
PMIX_INFO_XFER(&cd->info[n], vers);
++n;
}
if (NULL != tmod) {
PMIX_INFO_XFER(&cd->info[n], tmod);
++n;
}
/* mark that it is not to go to any default handlers */
PMIX_INFO_LOAD(&cd->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
PMIx_Notify_event(PMIX_MODEL_DECLARED,
&pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
cd->info, cd->ninfo, release_info, (void*)cd);
}
}
PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
pmix_info_t info[], size_t ninfo)
{
@ -263,6 +338,12 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
(void)strncpy(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
proc->rank = pmix_globals.myid.rank;
}
/* we also need to check the info keys to see if something need
* be done with them - e.g., to notify another library that we
* also have called init */
if (NULL != info) {
_check_for_notify(info, ninfo);
}
++pmix_globals.init_cntr;
return PMIX_SUCCESS;
}
@ -280,6 +361,8 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
}
/* setup the globals */
PMIX_CONSTRUCT(&pmix_globals.notifications, pmix_ring_buffer_t);
pmix_ring_buffer_init(&pmix_globals.notifications, 256);
PMIX_CONSTRUCT(&pmix_client_globals.pending_requests, pmix_list_t);
PMIX_CONSTRUCT(&pmix_client_globals.myserver, pmix_peer_t);
@ -381,6 +464,11 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc,
}
PMIX_INFO_DESTRUCT(&ginfo);
/* check to see if we need to notify anyone */
if (NULL != info) {
_check_for_notify(info, ninfo);
}
return PMIX_SUCCESS;
}

Просмотреть файл

@ -125,11 +125,14 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
pmix_event_chain_t *_ch; \
_ch = PMIX_NEW(pmix_event_chain_t); \
_ch->status = (e); \
_ch->ninfo = 1; \
_ch->ninfo = 2; \
_ch->final_cbfunc = (f); \
_ch->final_cbdata = _ch; \
PMIX_INFO_CREATE(_ch->info, _ch->ninfo); \
PMIX_INFO_LOAD(&_ch->info[0], \
PMIX_EVENT_HDLR_NAME, \
NULL, PMIX_STRING); \
PMIX_INFO_LOAD(&_ch->info[1], \
PMIX_EVENT_RETURN_OBJECT, \
NULL, PMIX_POINTER); \
pmix_invoke_local_event_hdlr(_ch); \

Просмотреть файл

@ -94,7 +94,7 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
pmix_cb_t *cb;
pmix_event_chain_t *chain;
size_t n;
pmix_notify_caddy_t *cd, *rbout;
pmix_output_verbose(2, pmix_globals.debug_output,
"client: notifying server %s:%d of status %s",
@ -104,36 +104,39 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
if (!pmix_globals.connected) {
return PMIX_ERR_UNREACH;
}
/* create the msg object */
msg = PMIX_NEW(pmix_buffer_t);
/* pack the command */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* pack the status */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* no need to pack the source as it is us */
if (PMIX_RANGE_PROC_LOCAL != range) {
/* create the msg object */
msg = PMIX_NEW(pmix_buffer_t);
/* pack the range */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* pack the info */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
if (0 < ninfo) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) {
/* pack the command */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* pack the status */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* no need to pack the source as it is us */
/* pack the range */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
/* pack the info */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
if (0 < ninfo) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
}
/* setup for our own local callbacks */
@ -141,8 +144,9 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
chain->status = status;
(void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
chain->source.rank = pmix_globals.myid.rank;
/* we always leave space for a callback object */
chain->ninfo = ninfo + 1;
/* we always leave space for a callback object and
* the evhandler name. */
chain->ninfo = ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (0 < ninfo) {
@ -151,29 +155,84 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
PMIX_INFO_XFER(&chain->info[n], &info[n]);
}
}
/* put the evhandler name tag in the next-to-last element - we
* will fill it in as each handler is called */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING);
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the server acks/nacks the register events request*/
cb = PMIX_NEW(pmix_cb_t);
cb->op_cbfunc = cbfunc;
cb->cbdata = cbdata;
/* send to the server */
pmix_output_verbose(2, pmix_globals.debug_output,
"client: notifying server %s:%d - sending",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(cb);
goto cleanup;
/* we need to cache this event so we can pass it into
* ourselves should someone later register for it */
cd = PMIX_NEW(pmix_notify_caddy_t);
cd->status = status;
if (NULL == source) {
(void)strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN);
cd->source.rank = PMIX_RANK_UNDEF;
} else {
(void)strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN);
cd->source.rank = source->rank;
}
cd->range = range;
/* check for directives */
if (NULL != info) {
cd->ninfo = chain->ninfo;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
for (n=0; n < chain->ninfo; n++) {
PMIX_INFO_XFER(&cd->info[n], &chain->info[n]);
if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
cd->nondefault = true;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
/* provides an array of pmix_proc_t identifying the procs
* that are to receive this notification, or a single pmix_proc_t */
if (PMIX_DATA_ARRAY == cd->info[n].value.type &&
NULL != cd->info[n].value.data.darray &&
NULL != cd->info[n].value.data.darray->array) {
cd->ntargets = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t));
} else if (PMIX_PROC == cd->info[n].value.type) {
cd->ntargets = 1;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
} else {
/* this is an error */
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
return PMIX_ERR_BAD_PARAM;
}
}
}
}
/* add to our cache */
rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd);
/* if an older event was bumped, release it */
if (NULL != rbout) {
PMIX_RELEASE(rbout);
}
if (PMIX_RANGE_PROC_LOCAL != range) {
/* create a callback object as we need to pass it to the
* recv routine so we know which callback to use when
* the server acks/nacks the register events request. The
* server will _not_ send this notification back to us,
* so we handle it locally */
cb = PMIX_NEW(pmix_cb_t);
cb->op_cbfunc = cbfunc;
cb->cbdata = cbdata;
/* send to the server */
pmix_output_verbose(2, pmix_globals.debug_output,
"client: notifying server %s:%d - sending",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(cb);
goto cleanup;
}
}
/* now notify any matching registered callbacks we have */
pmix_invoke_local_event_hdlr(chain);
PMIX_RELEASE(chain); // maintain accounting
return PMIX_SUCCESS;
@ -245,7 +304,7 @@ static void progress_local_event_hdlr(pmix_status_t status,
chain->nresults = cnt;
/* if the caller indicates that the chain is completed,
* or we completed the "last" event, then stop here */
* or we completed the "last" event */
if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) {
goto complete;
}
@ -261,6 +320,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
if (nxt->codes[0] == chain->status &&
check_range(&nxt->rng, &chain->source)) {
chain->evhdlr = nxt;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
nxt->evhdlr(nxt->index,
@ -294,6 +360,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
* the source fits within it */
if (nxt->codes[n] == chain->status) {
chain->evhdlr = nxt;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
nxt->evhdlr(nxt->index,
@ -321,6 +394,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
* the source fits within it */
if (check_range(&nxt->rng, &chain->source)) {
chain->evhdlr = nxt;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
nxt->evhdlr(nxt->index,
@ -341,6 +421,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
if (1 == pmix_globals.events.last->ncodes &&
pmix_globals.events.last->codes[0] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
chain->evhdlr->evhdlr(chain->evhdlr->index,
@ -354,6 +441,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
for (n=0; n < pmix_globals.events.last->ncodes; n++) {
if (pmix_globals.events.last->codes[n] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
chain->evhdlr->evhdlr(chain->evhdlr->index,
@ -367,6 +461,13 @@ static void progress_local_event_hdlr(pmix_status_t status,
} else {
/* gets run for all codes */
chain->evhdlr = pmix_globals.events.last;
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
chain->evhdlr->evhdlr(chain->evhdlr->index,
@ -411,8 +512,9 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
bool found;
pmix_output_verbose(2, pmix_globals.debug_output,
"%s:%d invoke_local_event_hdlr",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
"%s:%d invoke_local_event_hdlr for status %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
PMIx_Error_string(chain->status));
/* sanity check */
if (NULL == chain->info) {
@ -490,19 +592,42 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
}
}
/* if they didn't want it to go to a default handler, then we are done */
if (chain->nondefault) {
goto complete;
/* if they didn't want it to go to a default handler, then ignore them */
if (!chain->nondefault) {
/* pass it to any default handlers */
PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
if (check_range(&evhdlr->rng, &chain->source)) {
/* invoke the handler */
chain->evhdlr = evhdlr;
goto invk;
}
}
}
/* finally, pass it to any default handlers */
PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
if (check_range(&evhdlr->rng, &chain->source)) {
/* invoke the handler */
chain->evhdlr = evhdlr;
/* if we registered a "last" handler, and it fits the given range
* and code, then invoke it now */
if (NULL != pmix_globals.events.last &&
check_range(&pmix_globals.events.last->rng, &chain->source)) {
chain->endchain = true; // ensure we don't do this again
if (1 == pmix_globals.events.last->ncodes &&
pmix_globals.events.last->codes[0] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
goto invk;
} else if (NULL != pmix_globals.events.last->codes) {
/* need to check if this code is included in the array */
for (i=0; i < pmix_globals.events.last->ncodes; i++) {
if (pmix_globals.events.last->codes[i] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
goto invk;
}
}
} else {
/* gets run for all codes */
chain->evhdlr = pmix_globals.events.last;
goto invk;
}
}
/* if we got here, then nothing was found */
complete:
/* we still have to call their final callback */
@ -514,9 +639,18 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
invk:
/* invoke the handler */
/* add the handler name in case they want to reference it */
if (NULL != chain->info[chain->ninfo-2].value.data.string) {
free(chain->info[chain->ninfo-2].value.data.string);
}
if (NULL != chain->evhdlr->name) {
chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name);
}
chain->info[chain->ninfo-1].value.data.ptr = chain->evhdlr->cbobject;
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] INVOKING EVHDLR", __FILE__, __LINE__);
"[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__,
(NULL == chain->evhdlr->name) ?
"NULL" : chain->evhdlr->name);
chain->evhdlr->evhdlr(chain->evhdlr->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
@ -525,6 +659,15 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
return;
}
static void local_cbfunc(pmix_status_t status, void *cbdata)
{
pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata;
if (NULL != cd->cbfunc) {
cd->cbfunc(status, cd->cbdata);
}
PMIX_RELEASE(cd);
}
static void _notify_client_event(int sd, short args, void *cbdata)
{
@ -532,8 +675,9 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_notify_caddy_t *rbout;
pmix_regevents_info_t *reginfoptr;
pmix_peer_events_info_t *pr;
pmix_event_chain_t *chain;
size_t n;
bool matched;
bool matched, holdcd;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: _notify_error notifying clients of error %s",
@ -544,57 +688,102 @@ static void _notify_client_event(int sd, short args, void *cbdata)
* the message until all local procs have received it, or it ages to
* the point where it gets pushed out by more recent events */
PMIX_RETAIN(cd);
rbout = pmix_ring_buffer_push(&pmix_server_globals.notifications, cd);
rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd);
/* if an older event was bumped, release it */
if (NULL != rbout) {
PMIX_RELEASE(rbout);
}
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
cd->status == reginfoptr->code) {
PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
/* if this client was the source of the event, then
* don't send it back */
if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pr->peer->info->rank) {
continue;
}
/* if we were given specific targets, check if this is one */
if (NULL != cd->targets) {
matched = false;
for (n=0; n < cd->ntargets; n++) {
if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
pr->peer->info->rank == cd->targets[n].rank) {
matched = true;
break;
}
}
if (!matched) {
/* do not notify this one */
holdcd = false;
if (PMIX_RANGE_PROC_LOCAL != cd->range) {
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
cd->status == reginfoptr->code) {
PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
/* if this client was the source of the event, then
* don't send it back as they will have processed it
* when they generated it */
if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pr->peer->info->rank) {
continue;
}
/* if we were given specific targets, check if this is one */
if (NULL != cd->targets) {
matched = false;
for (n=0; n < cd->ntargets; n++) {
if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
pr->peer->info->rank == cd->targets[n].rank) {
matched = true;
break;
}
}
if (!matched) {
/* do not notify this one */
continue;
}
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%d",
pr->peer->info->nptr->nspace, pr->peer->info->rank);
PMIX_RETAIN(cd->buf);
PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf);
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%d",
pr->peer->info->nptr->nspace, pr->peer->info->rank);
PMIX_RETAIN(cd->buf);
PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf);
}
}
if (PMIX_RANGE_LOCAL != cd->range &&
0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pmix_globals.myid.rank) {
/* if we are the source, then we need to post this upwards as
* well so the host RM can broadcast it as necessary - we rely
* on the host RM to _not_ deliver this back to us! */
if (NULL != pmix_host_server.notify_event) {
/* mark that we sent it upstairs so we don't release
* the caddy until we return from the host RM */
holdcd = true;
pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
cd->info, cd->ninfo, local_cbfunc, cd);
}
}
}
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
/* we may also have registered for events, so be sure to check this
* against our registrations */
chain = PMIX_NEW(pmix_event_chain_t);
chain->status = cd->status;
(void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
chain->source.rank = cd->source.rank;
/* we always leave space for a callback object and
* the evhandler name. */
chain->ninfo = cd->ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (0 < cd->ninfo) {
/* need to copy the info */
for (n=0; n < cd->ninfo; n++) {
PMIX_INFO_XFER(&chain->info[n], &cd->info[n]);
}
}
/* put the evhandler name tag in the next-to-last element - we
* will fill it in as each handler is called */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING);
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
/* process it */
pmix_invoke_local_event_hdlr(chain);
if (!holdcd) {
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
}
PMIX_RELEASE(cd);
}
PMIX_RELEASE(cd);
}

Просмотреть файл

@ -325,20 +325,22 @@ static pmix_status_t _add_hdlr(pmix_rshift_caddy_t *cd, pmix_list_t *xfer)
static void reg_event_hdlr(int sd, short args, void *cbdata)
{
size_t index = 0, n;
pmix_status_t rc;
pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)cbdata;
size_t index = 0, n, i;
pmix_status_t rc;
pmix_event_hdlr_t *evhdlr, *ev;
uint8_t location = PMIX_EVENT_ORDER_NONE;
char *name = NULL, *locator = NULL;
bool firstoverall=false, lastoverall=false;
bool found;
bool found, matched;
pmix_list_t xfer;
pmix_info_caddy_t *ixfer;
void *cbobject = NULL;
pmix_data_range_t range = PMIX_RANGE_UNDEF;
pmix_proc_t *parray = NULL;
size_t nprocs;
pmix_notify_caddy_t *ncd;
pmix_event_chain_t *chain;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: register event_hdlr with %d infos", (int)cd->ninfo);
@ -672,6 +674,66 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
cd->evregcbfn(rc, index, cd->cbdata);
}
/* check if any matching notifications have been cached */
for (i=0; i < pmix_globals.notifications.size; i++) {
if (NULL == (ncd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) {
break;
}
found = false;
if (NULL == cd->codes) {
/* they registered a default event handler - always matches */
found = true;
} else {
for (n=0; n < cd->ncodes; n++) {
if (cd->codes[n] == ncd->status) {
found = true;
break;
}
}
}
if (found) {
/* if we were given specific targets, check if we are one */
if (NULL != ncd->targets) {
matched = false;
for (n=0; n < ncd->ntargets; n++) {
if (0 != strncmp(pmix_globals.myid.nspace, ncd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == ncd->targets[n].rank ||
pmix_globals.myid.rank == ncd->targets[n].rank) {
matched = true;
break;
}
}
if (!matched) {
/* do not notify this one */
continue;
}
}
/* all matches - notify */
chain = PMIX_NEW(pmix_event_chain_t);
chain->status = ncd->status;
(void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN);
chain->source.rank = pmix_globals.myid.rank;
/* we already left space for evhandler name plus
* a callback object when we cached the notification */
chain->ninfo = ncd->ninfo;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (0 < cd->ninfo) {
/* need to copy the info */
for (n=0; n < ncd->ninfo; n++) {
PMIX_INFO_XFER(&chain->info[n], &ncd->info[n]);
}
}
/* we don't want this chain to propagate, so indicate it
* should only be run as a single-shot */
chain->endchain = true;
/* now notify any matching registered callbacks we have */
pmix_invoke_local_event_hdlr(chain);
}
}
/* all done */
PMIX_RELEASE(cd);
}

Просмотреть файл

@ -36,6 +36,7 @@
#include "src/buffer_ops/types.h"
#include "src/class/pmix_hash_table.h"
#include "src/class/pmix_list.h"
#include "src/class/pmix_ring_buffer.h"
#include "src/event/pmix_event.h"
#include "src/mca/psec/psec.h"
@ -358,21 +359,22 @@ PMIX_CLASS_DECLARATION(pmix_info_caddy_t);
* between various parts of the code library. Both the client
* and server libraries must instance this structure */
typedef struct {
int init_cntr; // #times someone called Init - #times called Finalize
int init_cntr; // #times someone called Init - #times called Finalize
pmix_proc_t myid;
pmix_peer_t *mypeer; // my own peer object
pmix_peer_t *mypeer; // my own peer object
pmix_proc_type_t proc_type;
uid_t uid; // my effective uid
gid_t gid; // my effective gid
uid_t uid; // my effective uid
gid_t gid; // my effective gid
int pindex;
pmix_event_base_t *evbase;
bool external_evbase;
int debug_output;
pmix_events_t events; // my event handler registrations.
pmix_events_t events; // my event handler registrations.
bool connected;
pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about
pmix_buffer_t *cache_local; // data PUT by me to local scope
pmix_buffer_t *cache_remote; // data PUT by me to remote scope
pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about
pmix_buffer_t *cache_local; // data PUT by me to local scope
pmix_buffer_t *cache_remote; // data PUT by me to remote scope
pmix_ring_buffer_t notifications; // ring buffer of pending notifications
} pmix_globals_t;

Просмотреть файл

@ -105,6 +105,10 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module)
pmix_globals.myid.rank = strtol(evar, NULL, 10);
}
/* construct the global notification ring buffer */
PMIX_CONSTRUCT(&pmix_globals.notifications, pmix_ring_buffer_t);
pmix_ring_buffer_init(&pmix_globals.notifications, 256);
/* setup the server-specific globals */
PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t);
pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1);
@ -113,8 +117,6 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module)
PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_buffer_t);
PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t);
PMIX_CONSTRUCT(&pmix_server_globals.notifications, pmix_ring_buffer_t);
pmix_ring_buffer_init(&pmix_server_globals.notifications, 256);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server init called");
@ -261,7 +263,7 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void)
PMIX_LIST_DESTRUCT(&pmix_server_globals.remote_pnd);
PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs);
PMIX_DESTRUCT(&pmix_server_globals.gdata);
PMIX_DESTRUCT(&pmix_server_globals.notifications);
PMIX_DESTRUCT(&pmix_globals.notifications);
PMIX_LIST_DESTRUCT(&pmix_server_globals.events);
if (NULL != security_mode) {

Просмотреть файл

@ -1160,8 +1160,8 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
check:
/* check if any matching notifications have been cached */
for (i=0; i < pmix_server_globals.notifications.size; i++) {
if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_server_globals.notifications, i))) {
for (i=0; i < pmix_globals.notifications.size; i++) {
if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) {
break;
}
found = false;

Просмотреть файл

@ -111,7 +111,6 @@ typedef struct {
pmix_list_t local_reqs; // list of pmix_dmdx_local_t awaiting arrival of data from local neighbours
pmix_buffer_t gdata; // cache of data given to me for passing to all clients
pmix_list_t events; // list of pmix_regevents_info_t registered events
pmix_ring_buffer_t notifications; // ring buffer of pending notifications
bool tool_connections_allowed;
} pmix_server_globals_t;

Просмотреть файл

@ -167,6 +167,8 @@ PMIX_EXPORT const char* PMIx_Error_string(pmix_status_t errnum)
return "PMIX HEARTBEAT ALERT";
case PMIX_MONITOR_FILE_ALERT:
return "PMIX FILE MONITOR ALERT";
case PMIX_MODEL_DECLARED:
return "PMIX MODEL DECLARED";
case PMIX_SUCCESS:
return "SUCCESS";
default:

Просмотреть файл

@ -409,6 +409,9 @@ pmix_status_t pmix2x_convert_opalrc(int rc)
case OPAL_ERR_PARTIAL_SUCCESS:
return PMIX_QUERY_PARTIAL_SUCCESS;
case OPAL_ERR_MODEL_DECLARED:
return PMIX_MODEL_DECLARED;
case OPAL_ERROR:
return PMIX_ERROR;
case OPAL_SUCCESS:
@ -499,6 +502,10 @@ int pmix2x_convert_rc(pmix_status_t rc)
case PMIX_MONITOR_FILE_ALERT:
return OPAL_ERR_FILE_ALERT;
case PMIX_MODEL_DECLARED:
return OPAL_ERR_MODEL_DECLARED;
case PMIX_ERROR:
return OPAL_ERROR;
case PMIX_SUCCESS:
@ -1010,6 +1017,7 @@ static void _reg_hdlr(int sd, short args, void *cbdata)
n=0;
OPAL_LIST_FOREACH(kv, cd->event_codes, opal_value_t) {
op->pcodes[n] = pmix2x_convert_opalrc(kv->data.integer);
++n;
}
}

Просмотреть файл

@ -186,7 +186,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t);
} while(0)
/**** CLIENT FUNCTIONS ****/
OPAL_MODULE_DECLSPEC int pmix2x_client_init(void);
OPAL_MODULE_DECLSPEC int pmix2x_client_init(opal_list_t *ilist);
OPAL_MODULE_DECLSPEC int pmix2x_client_finalize(void);
OPAL_MODULE_DECLSPEC int pmix2x_initialized(void);
OPAL_MODULE_DECLSPEC int pmix2x_abort(int flag, const char *msg,

Просмотреть файл

@ -36,6 +36,8 @@
static pmix_proc_t my_proc;
static char *dbgvalue=NULL;
static volatile bool regactive;
static bool initialized = false;
#define PMIX_WAIT_FOR_COMPLETION(a) \
do { \
@ -55,28 +57,61 @@ static void errreg_cbfunc (pmix_status_t status,
opal_output_verbose(5, opal_pmix_base_framework.framework_output,
"PMIX client errreg_cbfunc - error handler registered status=%d, reference=%lu",
status, (unsigned long)errhandler_ref);
regactive = false;
}
int pmix2x_client_init(void)
int pmix2x_client_init(opal_list_t *ilist)
{
opal_process_name_t pname;
pmix_status_t rc;
int dbg;
opal_pmix2x_jobid_trkr_t *job;
opal_pmix2x_event_t *event;
pmix_info_t *pinfo;
size_t ninfo, n;
opal_value_t *ival;
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client init");
if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
putenv(dbgvalue);
if (!initialized) {
if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) {
asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg);
putenv(dbgvalue);
}
}
rc = PMIx_Init(&my_proc, NULL, 0);
/* convert the incoming list to info structs */
if (NULL != ilist) {
ninfo = opal_list_get_size(ilist);
if (0 < ninfo) {
PMIX_INFO_CREATE(pinfo, ninfo);
n=0;
OPAL_LIST_FOREACH(ival, ilist, opal_value_t) {
(void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN);
pmix2x_value_load(&pinfo[n].value, ival);
++n;
}
} else {
pinfo = NULL;
}
} else {
pinfo = NULL;
ninfo = 0;
}
rc = PMIx_Init(&my_proc, pinfo, ninfo);
if (PMIX_SUCCESS != rc) {
return pmix2x_convert_rc(rc);
}
if (0 < ninfo) {
PMIX_INFO_FREE(pinfo, ninfo);
}
if (initialized) {
return OPAL_SUCCESS;
}
initialized = true;
/* store our jobid and rank */
if (NULL != getenv(OPAL_MCA_PREFIX"orte_launch")) {
@ -102,7 +137,13 @@ int pmix2x_client_init(void)
/* register the default event handler */
event = OBJ_NEW(opal_pmix2x_event_t);
opal_list_append(&mca_pmix_pmix2x_component.events, &event->super);
PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, event);
PMIX_INFO_CREATE(pinfo, 1);
PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-DEFAULT", PMIX_STRING);
regactive = true;
PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, event);
PMIX_WAIT_FOR_COMPLETION(regactive);
PMIX_INFO_FREE(pinfo, 1);
return OPAL_SUCCESS;
}
@ -130,7 +171,7 @@ int pmix2x_initialized(void)
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"PMIx_client initialized");
return PMIx_Initialized();
return initialized;
}
int pmix2x_abort(int flag, const char *msg,

Просмотреть файл

@ -142,8 +142,11 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
/* register the default event handler */
active = true;
PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active);
PMIX_INFO_CREATE(pinfo, 1);
PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING);
PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active);
PMIX_WAIT_FOR_COMPLETION(active);
PMIX_INFO_FREE(pinfo, 1);
/* as we might want to use some client-side functions, be sure
* to register our own nspace */

Просмотреть файл

@ -67,7 +67,10 @@ BEGIN_C_DECLS
/* identification attributes */
#define OPAL_PMIX_USERID "pmix.euid" // (uint32_t) effective user id
#define OPAL_PMIX_GRPID "pmix.egid" // (uint32_t) effective group id
#define OPAL_PMIX_PROGRAMMING_MODEL "pmix.pgm.model" // (char*) programming model being initialized (e.g., "MPI" or "OpenMP")
#define OPAL_PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH")
#define OPAL_PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1")
#define OPAL_PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads")
/* attributes for the rendezvous socket */
#define OPAL_PMIX_USOCK_DISABLE "pmix.usock.disable" // (bool) disable legacy usock support

Просмотреть файл

@ -31,7 +31,7 @@
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_s1.h"
static int s1_init(void);
static int s1_init(opal_list_t *ilist);
static int s1_fini(void);
static int s1_initialized(void);
static int s1_abort(int flag, const char msg[],
@ -141,7 +141,7 @@ static int kvs_put(const char key[], const char value[])
return rc;
}
static int s1_init(void)
static int s1_init(opal_list_t *ilist)
{
PMI_BOOL initialized;
int spawned;
@ -155,6 +155,10 @@ static int s1_init(void)
char **localranks=NULL;
opal_process_name_t wildcard_rank;
if (0 < pmix_init_count) {
return OPAL_SUCCESS;
}
if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) {
OPAL_PMI_ERROR(rc, "PMI_Initialized");
return OPAL_ERROR;

Просмотреть файл

@ -36,7 +36,7 @@
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_s2.h"
static int s2_init(void);
static int s2_init(opal_list_t *ilist);
static int s2_fini(void);
static int s2_initialized(void);
static int s2_abort(int flag, const char msg[],
@ -158,7 +158,7 @@ static int kvs_get(const char key[], char value [], int maxvalue)
return OPAL_SUCCESS;
}
static int s2_init(void)
static int s2_init(opal_list_t *ilist)
{
int spawned, size, rank, appnum;
int rc, ret = OPAL_ERROR;
@ -174,6 +174,10 @@ static int s2_init(void)
char nmtmp[64];
opal_process_name_t wildcard_rank;
if (0 < pmix_init_count) {
return OPAL_SUCCESS;
}
/* if we can't startup PMI, we can't be used */
if ( PMI2_Initialized () ) {
return OPAL_SUCCESS;

Просмотреть файл

@ -9,7 +9,7 @@
* reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -33,6 +33,7 @@
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/state/state.h"
@ -74,7 +75,9 @@ static size_t myerrhandle = SIZE_MAX;
static void register_cbfunc(int status, size_t errhndler, void *cbdata)
{
volatile bool *active = (volatile bool*)cbdata;
myerrhandle = errhndler;
*active = false;
}
static void notify_cbfunc(int status,
@ -117,11 +120,24 @@ static void notify_cbfunc(int status,
************************/
static int init(void)
{
opal_list_t directives;
volatile bool active;
opal_value_t *kv;
/* setup state machine to trap proc errors */
orte_state.add_proc_state(ORTE_PROC_STATE_ERROR, proc_errors, ORTE_ERROR_PRI);
/* tie the default PMIx event handler back to us */
opal_pmix.register_evhandler(NULL, NULL, notify_cbfunc, register_cbfunc, NULL);
active = true;
OBJ_CONSTRUCT(&directives, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
kv->type = OPAL_STRING;
kv->data.string = strdup("ORTE-APP-DEFAULT");
opal_list_append(&directives, &kv->super);
opal_pmix.register_evhandler(NULL, &directives, notify_cbfunc, register_cbfunc, (void*)&active);
ORTE_WAIT_FOR_COMPLETION(active);
OPAL_LIST_DESTRUCT(&directives);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -124,7 +124,7 @@ static int rte_init(void)
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
/* initialize the selected module */
if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init()))) {
if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init(NULL)))) {
/* we cannot run */
error = "pmix init";
goto error;

Просмотреть файл

@ -189,7 +189,7 @@ static int rte_init(void)
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
/* initialize the selected module */
if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init()))) {
if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init(NULL)))) {
/* we cannot run */
error = "pmix init";
goto error;

Просмотреть файл

@ -1,4 +1,4 @@
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client paccept pconnect ring hello.sapp binding badcoll attach
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client paccept pconnect ring hello.sapp binding badcoll attach xlib
all: $(PROGS)
@ -10,6 +10,9 @@ hello_output: hello_output.c
hello_show_help: hello_show_help.c
$(CC) $(CFLAGS) $(CFLAGS_INTERNAL) $^ -o $@
xlib: xlib.c
$(CC) $(CFLAGS) $(CFLAGS_INTERNAL) $^ -o $@ -lpmix
CC = mpicc
CFLAGS = -g --openmpi:linkall
CFLAGS_INTERNAL = -I../../.. -I../../../orte/include -I../../../opal/include

Просмотреть файл

@ -12,6 +12,7 @@
# All rights reserved.
# Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
# Copyright (c) 2017 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -55,5 +56,5 @@ EXTRA_DIST += \
test/mpi/singleton_client_server.c \
test/mpi/spawn_tree.c \
test/mpi/info_spawn.c \
test/mpi/pmix.c
test/mpi/pmix.c \
test/mpi/xlib.c

217
orte/test/mpi/xlib.c Обычный файл
Просмотреть файл

@ -0,0 +1,217 @@
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <pmix.h>
#define SIZE 20
#define POS 10
#define INITIAL_VALUE 10
static pmix_proc_t myproc;
/* this is the event notification function we pass down below
* when registering for general events - i.e.,, the default
* handler. We don't technically need to register one, but it
* is usually good practice to catch any events that occur */
static void notification_fn(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo,
pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
/* this example doesn't do anything with default events */
fprintf(stderr, "Default event handler called with status %s\n", PMIx_Error_string(status));
if (NULL != cbfunc) {
cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
}
}
/* this is an event notification function that we explicitly request
* be called when the PMIX_MODEL_DECLARED notification is issued.
* We could catch it in the general event notification function and test
* the status to see if the status matched, but it often is simpler
* to declare a use-specific notification callback point. In this case,
* we are asking to know whenever a programming model library is
* instantiated */
static void model_callback(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo,
pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
size_t n;
fprintf(stderr, "Model event handler called with status %d(%s)\n", status, PMIx_Error_string(status));
/* check to see what model declared itself */
for (n=0; n < ninfo; n++) {
if (PMIX_STRING == info[n].value.type) {
fprintf(stderr, "\t%s:\t%s\n", info[n].key, info[n].value.data.string);
}
}
/* we must NOT tell the event handler state machine that we
* are the last step as that will prevent it from notifying
* anyone else that might be listening for declarations */
if (NULL != cbfunc) {
cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}
}
/* event handler registration is done asynchronously because it
* may involve the PMIx server registering with the host RM for
* external events. So we provide a callback function that returns
* the status of the request (success or an error), plus a numerical index
* to the registered event. The index is used later on to deregister
* an event handler - if we don't explicitly deregister it, then the
* PMIx server will do so when it see us exit */
static void model_registration_callback(pmix_status_t status,
size_t evhandler_ref,
void *cbdata)
{
volatile int *active = (volatile int*)cbdata;
if (PMIX_SUCCESS != status) {
fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref);
}
*active = status;
}
int main(int argc, char *argv[])
{
int i, rank, size, next, prev, tag = 201;
int array_size = SIZE;
int pos = POS;
int *send_array;
int *recv_array;
pmix_info_t *info;
size_t ninfo;
pmix_status_t code = PMIX_MODEL_DECLARED;
pmix_status_t rc;
volatile int active;
if (1 < argc) {
fprintf(stderr, "Declaring ourselves\n");
/* declare ourselves as a non-MPI library prior to MPI_Init */
ninfo = 4;
PMIX_INFO_CREATE(info, ninfo);
PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "EXAMPLE", PMIX_STRING);
PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "FOOL", PMIX_STRING);
PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3", PMIX_STRING);
PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "NONE", PMIX_STRING);
if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, info, ninfo))) {
fprintf(stderr, "PMIx Init failed: %s\n", PMIx_Error_string(rc));
exit(1);
}
PMIX_INFO_FREE(info, ninfo);
/* register a handler specifically for when models declare */
active = -1;
ninfo = 1;
PMIX_INFO_CREATE(info, ninfo);
PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "APP-MODEL", PMIX_STRING);
PMIx_Register_event_handler(&code, 1, info, ninfo,
model_callback, model_registration_callback, (void*)&active);
while (-1 == active) {
usleep(10);
}
PMIX_INFO_FREE(info, ninfo);
if (0 != active) {
exit(active);
}
}
/* initialize the MPI library - it will declare itself */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
if (argc <= 1) {
fprintf(stderr, "Registering handler\n");
/* register a handler specifically for when models declare */
active = -1;
ninfo = 1;
PMIX_INFO_CREATE(info, ninfo);
PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "APP-MODEL", PMIX_STRING);
PMIx_Register_event_handler(&code, 1, info, ninfo,
model_callback, model_registration_callback, (void*)&active);
while (-1 == active) {
usleep(10);
}
PMIX_INFO_FREE(info, ninfo);
if (0 != active) {
exit(active);
}
}
fprintf(stderr, "Rank %d has cleared MPI_Init\n", rank);
next = (rank + 1) % size;
prev = (rank + size - 1) % size;
send_array = malloc(sizeof(int) * SIZE);
recv_array = malloc(sizeof(int) * SIZE);
for (i = 0; i < array_size; ++i) {
send_array[i] = 17;
recv_array[i] = -1;
}
if (0 == rank) {
send_array[pos] = INITIAL_VALUE;
MPI_Send(send_array, array_size, MPI_INT, next, tag,
MPI_COMM_WORLD);
}
/* if we didn't already do it, declare another model now */
if (argc <= 1) {
fprintf(stderr, "Declaring ourselves\n");
/* declare ourselves as a non-MPI library after MPI_Init */
ninfo = 4;
PMIX_INFO_CREATE(info, ninfo);
PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "EXAMPLE", PMIX_STRING);
PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "FOOL", PMIX_STRING);
PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3", PMIX_STRING);
PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "NONE", PMIX_STRING);
if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, info, ninfo))) {
fprintf(stderr, "PMIx Init failed: %s\n", PMIx_Error_string(rc));
exit(1);
}
PMIX_INFO_FREE(info, ninfo);
}
while (1) {
recv_array[pos] = -1;
MPI_Recv(recv_array, array_size, MPI_INT, prev, tag,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
send_array[pos] = recv_array[pos];
if (rank == 0) {
--send_array[pos];
}
MPI_Send(send_array, array_size, MPI_INT, next, tag, MPI_COMM_WORLD);
if (0 == send_array[pos]) {
break;
}
}
if (rank == 0) {
MPI_Recv(recv_array, array_size, MPI_INT, prev, tag,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
fprintf(stderr, "Rank %d has completed ring\n", rank);
MPI_Barrier(MPI_COMM_WORLD);
fprintf(stderr, "Rank %d has completed MPI_Barrier\n", rank);
/* decrement the PMIx refcount */
PMIx_Finalize(NULL, 0);
MPI_Finalize();
return 0;
}