From ef0e0171c917cb346f2ff1dcdf1ae8b0693ab549 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 4 May 2017 20:03:35 -0700 Subject: [PATCH 1/2] Implement the changes required to support cross-library coordination. Update PMIx to support intra-process notifications and ensure that we always notify ourselves for events. Add a new ompi/interlib directory where cross-lib coordination code can go, and put the code to declare ourselves there (called from ompi_mpi_init.c). Signed-off-by: Ralph Castain --- .gitignore | 1 + ompi/Makefile.am | 3 +- ompi/interlib/Makefile.am | 29 ++ ompi/interlib/interlib.c | 162 +++++++++++ ompi/interlib/interlib.h | 45 ++++ ompi/mca/rte/orte/rte_orte_module.c | 14 +- ompi/runtime/ompi_mpi_finalize.c | 5 +- ompi/runtime/ompi_mpi_init.c | 14 +- opal/include/opal/constants.h | 4 +- opal/mca/pmix/cray/pmix_cray.c | 4 +- opal/mca/pmix/ext1x/pmix1x.h | 4 +- opal/mca/pmix/ext1x/pmix1x_client.c | 2 +- opal/mca/pmix/ext2x/pmix2x.h | 4 +- opal/mca/pmix/ext2x/pmix2x_client.c | 4 +- opal/mca/pmix/flux/pmix_flux.c | 10 +- opal/mca/pmix/isolated/pmix_isolated.c | 162 +++++------ opal/mca/pmix/pmix.h | 4 +- .../pmix/pmix2x/pmix/include/pmix_common.h | 5 + .../pmix/pmix2x/pmix/src/client/pmix_client.c | 96 ++++++- .../pmix/pmix2x/pmix/src/event/pmix_event.h | 5 +- .../pmix/src/event/pmix_event_notification.c | 253 ++++++++++++++---- .../pmix/src/event/pmix_event_registration.c | 68 ++++- .../pmix2x/pmix/src/include/pmix_globals.h | 18 +- .../pmix/pmix2x/pmix/src/server/pmix_server.c | 8 +- .../pmix2x/pmix/src/server/pmix_server_ops.c | 4 +- .../pmix2x/pmix/src/server/pmix_server_ops.h | 1 - opal/mca/pmix/pmix2x/pmix/src/util/error.c | 2 + opal/mca/pmix/pmix2x/pmix2x.c | 8 + opal/mca/pmix/pmix2x/pmix2x.h | 2 +- opal/mca/pmix/pmix2x/pmix2x_client.c | 55 +++- opal/mca/pmix/pmix2x/pmix2x_server_south.c | 5 +- opal/mca/pmix/pmix_types.h | 5 +- opal/mca/pmix/s1/pmix_s1.c | 8 +- opal/mca/pmix/s2/pmix_s2.c | 8 +- .../errmgr/default_app/errmgr_default_app.c | 
20 +- orte/mca/ess/pmi/ess_pmi_module.c | 2 +- orte/mca/ess/singleton/ess_singleton_module.c | 2 +- orte/test/mpi/Makefile | 5 +- orte/test/mpi/Makefile.include | 5 +- orte/test/mpi/xlib.c | 217 +++++++++++++++ 40 files changed, 1070 insertions(+), 203 deletions(-) create mode 100644 ompi/interlib/Makefile.am create mode 100644 ompi/interlib/interlib.c create mode 100644 ompi/interlib/interlib.h create mode 100644 orte/test/mpi/xlib.c diff --git a/.gitignore b/.gitignore index 76c1ab5d15..36908c03f0 100644 --- a/.gitignore +++ b/.gitignore @@ -387,6 +387,7 @@ orte/test/mpi/segv orte/test/mpi/simple_spawn orte/test/mpi/slave orte/test/mpi/spawn_multiple +orte/test/mpi/xlib orte/test/mpi/ziaprobe orte/test/mpi/ziatest orte/test/mpi/*.dwarf diff --git a/ompi/Makefile.am b/ompi/Makefile.am index abe0f1da14..3adcb79a8a 100644 --- a/ompi/Makefile.am +++ b/ompi/Makefile.am @@ -14,7 +14,7 @@ # Copyright (c) 2010-2011 Sandia National Laboratories. All rights reserved. # Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights # reserved. -# Copyright (c) 2015 Intel, Inc. All rights reserved. +# Copyright (c) 2015-2017 Intel, Inc. All rights reserved. # Copyright (c) 2015 Research Organization for Information Science # and Technology (RIST). All rights reserved. # Copyright (c) 2016 IBM Corporation. All rights reserved. @@ -178,6 +178,7 @@ include errhandler/Makefile.am include file/Makefile.am include group/Makefile.am include info/Makefile.am +include interlib/Makefile.am include message/Makefile.am include op/Makefile.am include peruse/Makefile.am diff --git a/ompi/interlib/Makefile.am b/ompi/interlib/Makefile.am new file mode 100644 index 0000000000..1a40fe8b26 --- /dev/null +++ b/ompi/interlib/Makefile.am @@ -0,0 +1,29 @@ +# -*- makefile -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. 
+# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2016 IBM Corporation. All rights reserved. +# Copyright (c) 2017 Intel, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# This makefile.am does not stand on its own - it is included from ompi/Makefile.am + +headers += \ + interlib/interlib.h + +lib@OMPI_LIBMPI_NAME@_la_SOURCES += \ + interlib/interlib.c diff --git a/ompi/interlib/interlib.c b/ompi/interlib/interlib.c new file mode 100644 index 0000000000..9e01d189c3 --- /dev/null +++ b/ompi/interlib/interlib.c @@ -0,0 +1,162 @@ +/* -*- Mode: C; c-basic-offset:4 ; -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2017 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include <string.h>
+
+#include "opal/mca/pmix/pmix.h"
+#include "ompi/mca/rte/rte.h"
+#include "ompi/interlib/interlib.h"
+
+/* outcome of the handler registration and whether it is still pending */
+typedef struct {
+    int status;
+    volatile bool active;
+} myreg_t;
+
+/* id assigned to our model-declaration event handler */
+static size_t interlibhandler_id = SIZE_MAX;
+
+/* registration callback: record the result and release the waiter */
+static void model_registration_callback(int status,
+                                        size_t errhandler_ref,
+                                        void *cbdata)
+{
+    myreg_t *trk = (myreg_t*)cbdata;
+
+    trk->status = status;
+    interlibhandler_id = errhandler_ref;
+    /* clear the flag last: ompi_interlib_declare reads status once its wait ends */
+    trk->active = false;
+}
+
+/* event handler invoked when a programming model declares itself;
+ * a declaration naming the "OpenMPI" library is our own and is skipped */
+static void model_callback(int status,
+                           const opal_process_name_t *source,
+                           opal_list_t *info, opal_list_t *results,
+                           opal_pmix_notification_complete_fn_t cbfunc,
+                           void *cbdata)
+{
+    opal_value_t *val;
+
+    /* we can ignore our own callback as we obviously
+     * know that we are MPI */
+    if (NULL != info) {
+        OPAL_LIST_FOREACH(val, info, opal_value_t) {
+            /* only keyed string entries that carry a payload can match */
+            if (OPAL_STRING == val->type &&
+                NULL != val->key && NULL != val->data.string &&
+                0 == strcmp(val->key, OPAL_PMIX_MODEL_LIBRARY_NAME) &&
+                0 == strcmp(val->data.string, "OpenMPI")) {
+                goto cback;
+            }
+        }
+    }
+    /* otherwise, do something clever here */
+
+ cback:
+    /* we must NOT tell the event handler state machine that we
+     * are the last step as that will prevent it from notifying
+     * anyone else that might be listening for declarations */
+    if (NULL != cbfunc) {
+        cbfunc(OMPI_SUCCESS, NULL, NULL, NULL, cbdata);
+    }
+}
+
+/* Register for model declarations made by other libraries, then declare
+ * ourselves (MPI / OpenMPI, the given version, our threading model) via
+ * opal_pmix.init. Returns OMPI_SUCCESS, or the status of the failing step. */
+int ompi_interlib_declare(int threadlevel, char *version)
+{
+    opal_list_t info, directives;
+    opal_value_t *kv;
+    myreg_t trk;
+    int ret;
+
+    /* strdup(NULL) is undefined - reject it before registering anything */
+    if (NULL == version) {
+        return OMPI_ERR_BAD_PARAM;
+    }
+
+    /* Register an event handler for library model declarations */
+    trk.status = OPAL_ERROR;
+    trk.active = true;
+    /* give it a name so we can distinguish it */
+    OBJ_CONSTRUCT(&directives, opal_list_t);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME);
+    kv->type = OPAL_STRING;
+    kv->data.string = strdup("MPI-Model-Declarations");
+    opal_list_append(&directives, &kv->super);
+    /* specify the event code */
+    OBJ_CONSTRUCT(&info, opal_list_t);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup("status"); // the key here is irrelevant
+    kv->type = OPAL_INT;
+    kv->data.integer = OPAL_ERR_MODEL_DECLARED;
+    opal_list_append(&info, &kv->super);
+    /* we could constrain the range to proc_local - technically, this
+     * isn't required so long as the code that generates
+     * the event stipulates its range as proc_local. We rely
+     * on that here */
+    opal_pmix.register_evhandler(&info, &directives, model_callback,
+                                 model_registration_callback,
+                                 (void*)&trk);
+    OMPI_LAZY_WAIT_FOR_COMPLETION(trk.active);
+
+    OPAL_LIST_DESTRUCT(&directives);
+    OPAL_LIST_DESTRUCT(&info);
+    if (OPAL_SUCCESS != trk.status) {
+        return trk.status;
+    }
+
+    /* declare that we are present and active */
+    OBJ_CONSTRUCT(&info, opal_list_t);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup(OPAL_PMIX_PROGRAMMING_MODEL);
+    kv->type = OPAL_STRING;
+    kv->data.string = strdup("MPI");
+    opal_list_append(&info, &kv->super);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_NAME);
+    kv->type = OPAL_STRING;
+    kv->data.string = strdup("OpenMPI");
+    opal_list_append(&info, &kv->super);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup(OPAL_PMIX_MODEL_LIBRARY_VERSION);
+    kv->type = OPAL_STRING;
+    kv->data.string = strdup(version);
+    opal_list_append(&info, &kv->super);
+    kv = OBJ_NEW(opal_value_t);
+    kv->key = strdup(OPAL_PMIX_THREADING_MODEL);
+    kv->type = OPAL_STRING;
+    kv->data.string = strdup(MPI_THREAD_SINGLE == threadlevel ? "NONE" : "PTHREAD");
+    opal_list_append(&info, &kv->super);
+    /* call pmix to initialize these values - info is released on every path */
+    ret = opal_pmix.init(&info);
+    OPAL_LIST_DESTRUCT(&info);
+    return (OPAL_SUCCESS == ret) ? OMPI_SUCCESS : ret;
+}
diff --git
a/ompi/interlib/interlib.h b/ompi/interlib/interlib.h new file mode 100644 index 0000000000..404c3e5604 --- /dev/null +++ b/ompi/interlib/interlib.h @@ -0,0 +1,45 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2008-2009 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2016 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file **/ + +#ifndef OMPI_INTERLIB_H +#define OMPI_INTERLIB_H + +#include "ompi_config.h" + + +BEGIN_C_DECLS + +/* declare the presence of the OMPI library to other + * libraries that may be used in this application, and + * register for callbacks when any other such libraries + * declare themselves */ +OMPI_DECLSPEC int ompi_interlib_declare(int threadlevel, char *version); + + +END_C_DECLS + +#endif /* OMPI_INTERLIB_H */ diff --git a/ompi/mca/rte/orte/rte_orte_module.c b/ompi/mca/rte/orte/rte_orte_module.c index aa4f5ad5a4..91e86c9ea4 100644 --- a/ompi/mca/rte/orte/rte_orte_module.c +++ b/ompi/mca/rte/orte/rte_orte_module.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2012-2013 Los Alamos National Security, LLC. * All rights reserved. 
- * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. * Copyright (c) 2012-2014 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. @@ -131,7 +131,7 @@ static void _register_fn(int status, void ompi_rte_wait_for_debugger(void) { int debugger; - opal_list_t *codes; + opal_list_t *codes, directives; opal_value_t *kv; char *evar; int time; @@ -179,9 +179,17 @@ void ompi_rte_wait_for_debugger(void) kv->data.integer = ORTE_ERR_DEBUGGER_RELEASE; opal_list_append(codes, &kv->super); - opal_pmix.register_evhandler(codes, NULL, _release_fn, _register_fn, codes); + OBJ_CONSTRUCT(&directives, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME); + kv->type = OPAL_STRING; + kv->data.string = strdup("MPI-DEBUGGER-ATTACH"); + opal_list_append(&directives, &kv->super); + + opal_pmix.register_evhandler(codes, &directives, _release_fn, _register_fn, codes); /* let the MPI progress engine run while we wait for registration to complete */ OMPI_WAIT_FOR_COMPLETION(debugger_register_active); + OPAL_LIST_DESTRUCT(&directives); /* let the MPI progress engine run while we wait for debugger release */ OMPI_WAIT_FOR_COMPLETION(debugger_event_active); diff --git a/ompi/runtime/ompi_mpi_finalize.c b/ompi/runtime/ompi_mpi_finalize.c index 3425329038..2101232e74 100644 --- a/ompi/runtime/ompi_mpi_finalize.c +++ b/ompi/runtime/ompi_mpi_finalize.c @@ -16,7 +16,7 @@ * Copyright (c) 2006 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2011 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. 
* @@ -276,6 +276,9 @@ int ompi_mpi_finalize(void) } } + /* account for our refcount on pmix_init */ + opal_pmix.finalize(); + /* check for timing request - get stop time and report elapsed time if so */ //OPAL_TIMING_DELTAS(ompi_enable_timing, &tm); diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index ce99899bb8..1ba380974b 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -70,6 +70,7 @@ #include "ompi/info/info.h" #include "ompi/errhandler/errcode.h" #include "ompi/errhandler/errhandler.h" +#include "ompi/interlib/interlib.h" #include "ompi/request/request.h" #include "ompi/message/message.h" #include "ompi/op/op.h" @@ -315,7 +316,6 @@ static int _convert_process_name_to_string(char** name_string, return ompi_rte_convert_process_name_to_string(name_string, name); } - void ompi_mpi_thread_level(int requested, int *provided) { /** @@ -525,6 +525,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) kv = OBJ_NEW(opal_value_t); kv->key = strdup(OPAL_PMIX_EVENT_ORDER_PREPEND); opal_list_append(&info, &kv->super); + /* give it a name so we can distinguish it */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME); + kv->type = OPAL_STRING; + kv->data.string = strdup("MPI-Default"); + opal_list_append(&info, &kv->super); opal_pmix.register_evhandler(NULL, &info, ompi_errhandler_callback, ompi_errhandler_registration_callback, (void*)&errtrk); @@ -537,6 +543,12 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } + /* declare our presence for interlib coordination, and + * register for callbacks when other libs declare */ + if (OMPI_SUCCESS != (ret = ompi_interlib_declare(*provided, OMPI_IDENT_STRING))) { + error = "ompi_interlib_declare"; + goto error; + } /* determine the bitflag belonging to the threadlevel_support provided */ memset ( &threadlevel_bf, 0, sizeof(uint8_t)); diff --git a/opal/include/opal/constants.h 
b/opal/include/opal/constants.h index f8fd172dbe..e3e1cd2528 100644 --- a/opal/include/opal/constants.h +++ b/opal/include/opal/constants.h @@ -96,10 +96,10 @@ enum { OPAL_ERR_PROC_MIGRATE = (OPAL_ERR_BASE - 65), OPAL_ERR_EVENT_REGISTRATION = (OPAL_ERR_BASE - 66), OPAL_ERR_HEARTBEAT_ALERT = (OPAL_ERR_BASE - 67), - OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68) + OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68), + OPAL_ERR_MODEL_DECLARED = (OPAL_ERR_BASE - 69) }; #define OPAL_ERR_MAX (OPAL_ERR_BASE - 100) #endif /* OPAL_CONSTANTS_H */ - diff --git a/opal/mca/pmix/cray/pmix_cray.c b/opal/mca/pmix/cray/pmix_cray.c index 756128b069..00f32923f6 100644 --- a/opal/mca/pmix/cray/pmix_cray.c +++ b/opal/mca/pmix/cray/pmix_cray.c @@ -34,7 +34,7 @@ static char cray_pmi_version[128]; -static int cray_init(void); +static int cray_init(opal_list_t *ilist); static int cray_fini(void); static int cray_initialized(void); static int cray_abort(int flat, const char *msg, @@ -282,7 +282,7 @@ static void cray_get_more_info(void) return; } -static int cray_init(void) +static int cray_init(opal_list_t *ilist) { int i, spawned, size, rank, appnum, my_node; int rc, ret = OPAL_ERROR; diff --git a/opal/mca/pmix/ext1x/pmix1x.h b/opal/mca/pmix/ext1x/pmix1x.h index 28a6a9966c..3bcaa9c493 100644 --- a/opal/mca/pmix/ext1x/pmix1x.h +++ b/opal/mca/pmix/ext1x/pmix1x.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. * All rights reserved. 
* Copyright (c) 2016-2017 Research Organization for Information Science @@ -90,7 +90,7 @@ OBJ_CLASS_DECLARATION(pmix1_opalcaddy_t); /**** CLIENT FUNCTIONS ****/ -OPAL_MODULE_DECLSPEC int pmix1_client_init(void); +OPAL_MODULE_DECLSPEC int pmix1_client_init(opal_list_t *ilist); OPAL_MODULE_DECLSPEC int pmix1_client_finalize(void); OPAL_MODULE_DECLSPEC int pmix1_initialized(void); OPAL_MODULE_DECLSPEC int pmix1_abort(int flag, const char *msg, diff --git a/opal/mca/pmix/ext1x/pmix1x_client.c b/opal/mca/pmix/ext1x/pmix1x_client.c index 8f8bb83040..26ef030dbb 100644 --- a/opal/mca/pmix/ext1x/pmix1x_client.c +++ b/opal/mca/pmix/ext1x/pmix1x_client.c @@ -100,7 +100,7 @@ static void errreg_cbfunc (pmix_status_t status, status, errhandler_ref); } -int pmix1_client_init(void) +int pmix1_client_init(opal_list_t *ilist) { opal_process_name_t pname; pmix_status_t rc; diff --git a/opal/mca/pmix/ext2x/pmix2x.h b/opal/mca/pmix/ext2x/pmix2x.h index c849356d37..29aca672f3 100644 --- a/opal/mca/pmix/ext2x/pmix2x.h +++ b/opal/mca/pmix/ext2x/pmix2x.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. * All rights reserved. 
* Copyright (c) 2016 Research Organization for Information Science @@ -216,7 +216,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); } while(0) /**** CLIENT FUNCTIONS ****/ -OPAL_MODULE_DECLSPEC int pmix2x_client_init(void); +OPAL_MODULE_DECLSPEC int pmix2x_client_init(opal_list_t *ilist); OPAL_MODULE_DECLSPEC int pmix2x_client_finalize(void); OPAL_MODULE_DECLSPEC int pmix2x_initialized(void); OPAL_MODULE_DECLSPEC int pmix2x_abort(int flag, const char *msg, diff --git a/opal/mca/pmix/ext2x/pmix2x_client.c b/opal/mca/pmix/ext2x/pmix2x_client.c index 1589af9ba6..28485f170b 100644 --- a/opal/mca/pmix/ext2x/pmix2x_client.c +++ b/opal/mca/pmix/ext2x/pmix2x_client.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. @@ -56,7 +56,7 @@ static void errreg_cbfunc (pmix_status_t status, status, (unsigned long)errhandler_ref); } -int pmix2x_client_init(void) +int pmix2x_client_init(opal_list_t *ilist) { opal_process_name_t pname; pmix_status_t rc; diff --git a/opal/mca/pmix/flux/pmix_flux.c b/opal/mca/pmix/flux/pmix_flux.c index a110962bf7..187108bcc7 100644 --- a/opal/mca/pmix/flux/pmix_flux.c +++ b/opal/mca/pmix/flux/pmix_flux.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. 
@@ -35,7 +35,7 @@ #include "opal/mca/pmix/base/pmix_base_hash.h" #include "pmix_flux.h" -static int flux_init(void); +static int flux_init(opal_list_t *ilist); static int flux_fini(void); static int flux_initialized(void); static int flux_abort(int flag, const char msg[], @@ -359,7 +359,7 @@ done: return ret; } -static int flux_init(void) +static int flux_init(opal_list_t *ilist) { int initialized; int spawned; @@ -372,6 +372,10 @@ static int flux_init(void) opal_process_name_t wildcard_rank; char *str; + if (0 < pmix_init_count) { + return OPAL_SUCCESS; + } + if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) { OPAL_PMI_ERROR(rc, "PMI_Initialized"); return OPAL_ERROR; diff --git a/opal/mca/pmix/isolated/pmix_isolated.c b/opal/mca/pmix/isolated/pmix_isolated.c index 08860ef895..2680496bc3 100644 --- a/opal/mca/pmix/isolated/pmix_isolated.c +++ b/opal/mca/pmix/isolated/pmix_isolated.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * Copyright (c) 2011-2015 Los Alamos National Security, LLC. All * rights reserved. * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. 
@@ -38,47 +38,47 @@ #include "opal/mca/pmix/base/pmix_base_hash.h" -static int isolated_init(void); +static int isolated_init(opal_list_t *ilist); static int isolated_fini(void); static int isolated_initialized(void); static int isolated_abort(int flat, const char *msg, - opal_list_t *procs); + opal_list_t *procs); static int isolated_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid); static int isolated_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps, - opal_pmix_spawn_cbfunc_t cbfunc, - void *cbdata); + opal_pmix_spawn_cbfunc_t cbfunc, + void *cbdata); static int isolated_job_connect(opal_list_t *procs); static int isolated_job_disconnect(opal_list_t *procs); static int isolated_job_disconnect_nb(opal_list_t *procs, - opal_pmix_op_cbfunc_t cbfunc, - void *cbdata); + opal_pmix_op_cbfunc_t cbfunc, + void *cbdata); static int isolated_resolve_peers(const char *nodename, - opal_jobid_t jobid, - opal_list_t *procs); + opal_jobid_t jobid, + opal_list_t *procs); static int isolated_resolve_nodes(opal_jobid_t jobid, char **nodelist); static int isolated_put(opal_pmix_scope_t scope, opal_value_t *kv); static int isolated_fence(opal_list_t *procs, int collect_data); static int isolated_fence_nb(opal_list_t *procs, int collect_data, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata); + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static int isolated_commit(void); static int isolated_get(const opal_process_name_t *id, - const char *key, opal_list_t *info, - opal_value_t **kv); + const char *key, opal_list_t *info, + opal_value_t **kv); static int isolated_get_nb(const opal_process_name_t *id, const char *key, - opal_list_t *info, - opal_pmix_value_cbfunc_t cbfunc, void *cbdata); + opal_list_t *info, + opal_pmix_value_cbfunc_t cbfunc, void *cbdata); static int isolated_publish(opal_list_t *info); static int isolated_publish_nb(opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata); + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static int 
isolated_lookup(opal_list_t *data, opal_list_t *info); static int isolated_lookup_nb(char **keys, opal_list_t *info, - opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata); + opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata); static int isolated_unpublish(char **keys, opal_list_t *info); static int isolated_unpublish_nb(char **keys, opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata); + opal_pmix_op_cbfunc_t cbfunc, void *cbdata); static const char *isolated_get_version(void); static int isolated_store_local(const opal_process_name_t *proc, - opal_value_t *val); + opal_value_t *val); static const char *isolated_get_nspace(opal_jobid_t jobid); static void isolated_register_jobid(opal_jobid_t jobid, const char *nspace); @@ -118,11 +118,15 @@ const opal_pmix_base_module_t opal_pmix_isolated_module = { static int isolated_init_count = 0; static opal_process_name_t isolated_pname; -static int isolated_init(void) +static int isolated_init(opal_list_t *ilist) { int rc; opal_value_t kv; + if (0 < isolated_init_count) { + return OPAL_SUCCESS; + } + ++isolated_init_count; /* store our name in the opal_proc_t so that @@ -133,8 +137,8 @@ static int isolated_init(void) isolated_pname.vpid = 0; opal_proc_set_name(&isolated_pname); opal_output_verbose(10, opal_pmix_base_framework.framework_output, - "%s pmix:isolated: assigned tmp name %d %d", - OPAL_NAME_PRINT(isolated_pname),isolated_pname.jobid,isolated_pname.vpid); + "%s pmix:isolated: assigned tmp name %d %d", + OPAL_NAME_PRINT(isolated_pname),isolated_pname.jobid,isolated_pname.vpid); // setup hash table opal_pmix_base_hash_init(); @@ -145,9 +149,9 @@ static int isolated_init(void) kv.type = OPAL_UINT32; kv.data.uint32 = 1; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -157,9 +161,9 @@ static int isolated_init(void) kv.type = OPAL_UINT32; 
kv.data.uint32 = 0; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -168,9 +172,9 @@ static int isolated_init(void) kv.type = OPAL_UINT32; kv.data.uint32 = 1; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -179,9 +183,9 @@ static int isolated_init(void) kv.type = OPAL_UINT32; kv.data.uint32 = 1; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -191,9 +195,9 @@ static int isolated_init(void) kv.type = OPAL_UINT32; kv.data.uint32 = 1; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -202,9 +206,9 @@ static int isolated_init(void) kv.type = OPAL_STRING; kv.data.string = strdup("0"); if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); @@ -214,9 +218,9 @@ static int isolated_init(void) kv.type = OPAL_UINT64; kv.data.uint64 = 0; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } /* save our local rank */ @@ -225,9 +229,9 @@ static int isolated_init(void) kv.type = OPAL_UINT16; kv.data.uint16 = 0; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - 
OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } /* and our node rank */ @@ -236,26 +240,26 @@ static int isolated_init(void) kv.type = OPAL_UINT16; kv.data.uint16 = 0; if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - goto err_exit; + OPAL_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + goto err_exit; } OBJ_DESTRUCT(&kv); return OPAL_SUCCESS; -err_exit: + err_exit: return rc; } static int isolated_fini(void) { if (0 == isolated_init_count) { - return OPAL_SUCCESS; + return OPAL_SUCCESS; } if (0 != --isolated_init_count) { - return OPAL_SUCCESS; + return OPAL_SUCCESS; } opal_pmix_base_hash_finalize(); return OPAL_SUCCESS; @@ -264,13 +268,13 @@ static int isolated_fini(void) static int isolated_initialized(void) { if (0 < isolated_init_count) { - return 1; + return 1; } return 0; } static int isolated_abort(int flag, const char *msg, - opal_list_t *procs) + opal_list_t *procs) { return OPAL_SUCCESS; } @@ -281,8 +285,8 @@ static int isolated_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t } static int isolated_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps, - opal_pmix_spawn_cbfunc_t cbfunc, - void *cbdata) + opal_pmix_spawn_cbfunc_t cbfunc, + void *cbdata) { return OPAL_ERR_NOT_SUPPORTED; } @@ -298,15 +302,15 @@ static int isolated_job_disconnect(opal_list_t *procs) } static int isolated_job_disconnect_nb(opal_list_t *procs, - opal_pmix_op_cbfunc_t cbfunc, - void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, + void *cbdata) { return OPAL_ERR_NOT_SUPPORTED; } static int isolated_resolve_peers(const char *nodename, - opal_jobid_t jobid, - opal_list_t *procs) + opal_jobid_t jobid, + opal_list_t *procs) { return OPAL_ERR_NOT_IMPLEMENTED; } @@ -317,16 +321,16 @@ static int isolated_resolve_nodes(opal_jobid_t jobid, char **nodelist) } static int isolated_put(opal_pmix_scope_t scope, - opal_value_t *kv) + opal_value_t *kv) { int rc; 
opal_output_verbose(10, opal_pmix_base_framework.framework_output, - "%s pmix:isolated isolated_put key %s scope %d\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope); + "%s pmix:isolated isolated_put key %s scope %d\n", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope); if (!isolated_init_count) { - return OPAL_ERROR; + return OPAL_ERROR; } rc = opal_pmix_base_store(&isolated_pname, kv); @@ -345,39 +349,39 @@ static int isolated_fence(opal_list_t *procs, int collect_data) } static int isolated_fence_nb(opal_list_t *procs, int collect_data, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { return OPAL_ERR_NOT_IMPLEMENTED; } static int isolated_get(const opal_process_name_t *id, - const char *key, opal_list_t *info, - opal_value_t **kv) + const char *key, opal_list_t *info, + opal_value_t **kv) { int rc; opal_list_t vals; opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:isolated getting value for proc %s key %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*id), key); + "%s pmix:isolated getting value for proc %s key %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + OPAL_NAME_PRINT(*id), key); OBJ_CONSTRUCT(&vals, opal_list_t); rc = opal_pmix_base_fetch(id, key, &vals); if (OPAL_SUCCESS == rc) { - *kv = (opal_value_t*)opal_list_remove_first(&vals); - return OPAL_SUCCESS; + *kv = (opal_value_t*)opal_list_remove_first(&vals); + return OPAL_SUCCESS; } else { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:isolated fetch from dstore failed: %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc); + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s pmix:isolated fetch from dstore failed: %d", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc); } OPAL_LIST_DESTRUCT(&vals); return rc; } static int isolated_get_nb(const opal_process_name_t *id, const char *key, - opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata) + opal_list_t *info, 
opal_pmix_value_cbfunc_t cbfunc, void *cbdata) { return OPAL_ERR_NOT_IMPLEMENTED; } @@ -388,7 +392,7 @@ static int isolated_publish(opal_list_t *info) } static int isolated_publish_nb(opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { return OPAL_ERR_NOT_SUPPORTED; } @@ -399,7 +403,7 @@ static int isolated_lookup(opal_list_t *data, opal_list_t *info) } static int isolated_lookup_nb(char **keys, opal_list_t *info, - opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) + opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata) { return OPAL_ERR_NOT_SUPPORTED; } @@ -410,7 +414,7 @@ static int isolated_unpublish(char **keys, opal_list_t *info) } static int isolated_unpublish_nb(char **keys, opal_list_t *info, - opal_pmix_op_cbfunc_t cbfunc, void *cbdata) + opal_pmix_op_cbfunc_t cbfunc, void *cbdata) { return OPAL_ERR_NOT_SUPPORTED; } @@ -421,7 +425,7 @@ static const char *isolated_get_version(void) } static int isolated_store_local(const opal_process_name_t *proc, - opal_value_t *val) + opal_value_t *val) { opal_pmix_base_store(proc, val); diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h index 28da8fb916..a3940ae402 100644 --- a/opal/mca/pmix/pmix.h +++ b/opal/mca/pmix/pmix.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2015 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ @@ -284,7 +284,7 @@ extern int opal_pmix_base_exchange(opal_value_t *info, * If the information is not found, or the server connection fails, then * an appropriate error constant will be returned. */ -typedef int (*opal_pmix_base_module_init_fn_t)(void); +typedef int (*opal_pmix_base_module_init_fn_t)(opal_list_t *ilist); /* Finalize the PMIx client, closing the connection to the local server. 
* An error code will be returned if, for some reason, the connection diff --git a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h index 7bc9a8ce89..2be2f629b0 100644 --- a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h +++ b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h @@ -131,6 +131,10 @@ typedef uint32_t pmix_rank_t; #define PMIX_GRPID "pmix.egid" // (uint32_t) effective group id #define PMIX_DSTPATH "pmix.dstpath" // (char*) path to dstore files #define PMIX_VERSION_INFO "pmix.version" // (char*) PMIx version of contactor +#define PMIX_PROGRAMMING_MODEL "pmix.pgm.model" // (char*) programming model being initialized (e.g., "MPI" or "OpenMP") +#define PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH") +#define PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1") +#define PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads") /* attributes for the USOCK rendezvous socket */ @@ -531,6 +535,7 @@ typedef int pmix_status_t; #define PMIX_ERR_EVENT_REGISTRATION (PMIX_ERR_OP_BASE - 14) #define PMIX_ERR_JOB_TERMINATED (PMIX_ERR_OP_BASE - 15) #define PMIX_ERR_UPDATE_ENDPOINTS (PMIX_ERR_OP_BASE - 16) +#define PMIX_MODEL_DECLARED (PMIX_ERR_OP_BASE - 17) /* define a starting point for system error constants so * we avoid renumbering when making additions */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c index a1b9546bed..66801e0de9 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c @@ -57,7 +57,7 @@ #elif PMIX_CC_USE_IDENT #ident PMIX_VERSION #endif - static const char pmix_version_string[] = PMIX_VERSION; +static const char pmix_version_string[] = PMIX_VERSION; #include "src/class/pmix_list.h" @@ -134,8 +134,8 @@ static void 
pmix_client_notify_recv(struct pmix_peer_t *peer, goto error; } - /* we always leave space for a callback object */ - chain->ninfo = ninfo + 1; + /* we always leave space for the evhandler name plus a callback object */ + chain->ninfo = ninfo + 2; PMIX_INFO_CREATE(chain->info, chain->ninfo); if (0 < ninfo) { @@ -145,8 +145,10 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer, goto error; } } + /* put the evhandler name tag in its place */ + PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING); /* now put the callback object tag in the last element */ - PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] pmix:client_notify_recv - processing event %d, calling errhandler", @@ -236,6 +238,79 @@ static void evhandler_reg_callbk(pmix_status_t status, *active = status; } +typedef struct { + pmix_info_t *info; + size_t ninfo; +} mydata_t; + +static void release_info(pmix_status_t status, void *cbdata) +{ + mydata_t *cd = (mydata_t*)cbdata; + PMIX_INFO_FREE(cd->info, cd->ninfo); + free(cd); +} + +static void _check_for_notify(pmix_info_t info[], size_t ninfo) +{ + mydata_t *cd; + size_t n, m=0; + pmix_info_t *model=NULL, *library=NULL, *vers=NULL, *tmod=NULL; + + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_PROGRAMMING_MODEL, PMIX_MAX_KEYLEN)) { + /* we need to generate an event indicating that + * a programming model has been declared */ + model = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_NAME, PMIX_MAX_KEYLEN)) { + library = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_MODEL_LIBRARY_VERSION, PMIX_MAX_KEYLEN)) { + vers = &info[n]; + ++m; + } else if (0 == strncmp(info[n].key, PMIX_THREADING_MODEL, PMIX_MAX_KEYLEN)) { + tmod = &info[n]; + ++m; + } + } + if (0 < m) { + /* notify 
anyone listening that a model has been declared */ + cd = (mydata_t*)malloc(sizeof(mydata_t)); + if (NULL == cd) { + /* nothing we can do */ + return; + } + PMIX_INFO_CREATE(cd->info, m+1); + if (NULL == cd->info) { + free(cd); + return; + } + cd->ninfo = m+1; + n = 0; + if (NULL != model) { + PMIX_INFO_XFER(&cd->info[n], model); + ++n; + } + if (NULL != library) { + PMIX_INFO_XFER(&cd->info[n], library); + ++n; + } + if (NULL != vers) { + PMIX_INFO_XFER(&cd->info[n], vers); + ++n; + } + if (NULL != tmod) { + PMIX_INFO_XFER(&cd->info[n], tmod); + ++n; + } + /* mark that it is not to go to any default handlers */ + PMIX_INFO_LOAD(&cd->info[n], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL); + PMIx_Notify_event(PMIX_MODEL_DECLARED, + &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL, + cd->info, cd->ninfo, release_info, (void*)cd); + } +} + PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, pmix_info_t info[], size_t ninfo) { @@ -263,6 +338,12 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, (void)strncpy(proc->nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); proc->rank = pmix_globals.myid.rank; } + /* we also need to check the info keys to see if something need + * be done with them - e.g., to notify another library that we + * also have called init */ + if (NULL != info) { + _check_for_notify(info, ninfo); + } ++pmix_globals.init_cntr; return PMIX_SUCCESS; } @@ -280,6 +361,8 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, } /* setup the globals */ + PMIX_CONSTRUCT(&pmix_globals.notifications, pmix_ring_buffer_t); + pmix_ring_buffer_init(&pmix_globals.notifications, 256); PMIX_CONSTRUCT(&pmix_client_globals.pending_requests, pmix_list_t); PMIX_CONSTRUCT(&pmix_client_globals.myserver, pmix_peer_t); @@ -381,6 +464,11 @@ PMIX_EXPORT pmix_status_t PMIx_Init(pmix_proc_t *proc, } PMIX_INFO_DESTRUCT(&ginfo); + /* check to see if we need to notify anyone */ + if (NULL != info) { + _check_for_notify(info, ninfo); + } + return PMIX_SUCCESS; } diff --git 
a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h index e9ebd33318..2899faa9a6 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h @@ -125,11 +125,14 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, pmix_event_chain_t *_ch; \ _ch = PMIX_NEW(pmix_event_chain_t); \ _ch->status = (e); \ - _ch->ninfo = 1; \ + _ch->ninfo = 2; \ _ch->final_cbfunc = (f); \ _ch->final_cbdata = _ch; \ PMIX_INFO_CREATE(_ch->info, _ch->ninfo); \ PMIX_INFO_LOAD(&_ch->info[0], \ + PMIX_EVENT_HDLR_NAME, \ + NULL, PMIX_STRING); \ + PMIX_INFO_LOAD(&_ch->info[1], \ PMIX_EVENT_RETURN_OBJECT, \ NULL, PMIX_POINTER); \ pmix_invoke_local_event_hdlr(_ch); \ diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 83474169fd..38f93bd6f4 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -94,7 +94,7 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, pmix_cb_t *cb; pmix_event_chain_t *chain; size_t n; - + pmix_notify_caddy_t *cd, *rbout; pmix_output_verbose(2, pmix_globals.debug_output, "client: notifying server %s:%d of status %s", @@ -104,36 +104,39 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, if (!pmix_globals.connected) { return PMIX_ERR_UNREACH; } - /* create the msg object */ - msg = PMIX_NEW(pmix_buffer_t); - /* pack the command */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* pack the status */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* no need to pack the source as it is us */ + if (PMIX_RANGE_PROC_LOCAL != range) { + /* create the msg object */ + msg = PMIX_NEW(pmix_buffer_t); - /* 
pack the range */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - /* pack the info */ - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) { - PMIX_ERROR_LOG(rc); - goto cleanup; - } - if (0 < ninfo) { - if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) { + /* pack the command */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cmd, 1, PMIX_CMD))) { PMIX_ERROR_LOG(rc); goto cleanup; } + /* pack the status */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &status, 1, PMIX_STATUS))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + /* no need to pack the source as it is us */ + + /* pack the range */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &range, 1, PMIX_DATA_RANGE))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + /* pack the info */ + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &ninfo, 1, PMIX_SIZE))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + if (0 < ninfo) { + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, info, ninfo, PMIX_INFO))) { + PMIX_ERROR_LOG(rc); + goto cleanup; + } + } } /* setup for our own local callbacks */ @@ -141,8 +144,9 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, chain->status = status; (void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); chain->source.rank = pmix_globals.myid.rank; - /* we always leave space for a callback object */ - chain->ninfo = ninfo + 1; + /* we always leave space for a callback object and + * the evhandler name. 
*/ + chain->ninfo = ninfo + 2; PMIX_INFO_CREATE(chain->info, chain->ninfo); if (0 < ninfo) { @@ -151,29 +155,84 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, PMIX_INFO_XFER(&chain->info[n], &info[n]); } } + /* put the evhandler name tag in the next-to-last element - we + * will fill it in as each handler is called */ + PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING); /* now put the callback object tag in the last element */ - PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); - /* create a callback object as we need to pass it to the - * recv routine so we know which callback to use when - * the server acks/nacks the register events request*/ - cb = PMIX_NEW(pmix_cb_t); - cb->op_cbfunc = cbfunc; - cb->cbdata = cbdata; - /* send to the server */ - pmix_output_verbose(2, pmix_globals.debug_output, - "client: notifying server %s:%d - sending", - pmix_globals.myid.nspace, pmix_globals.myid.rank); - rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(cb); - goto cleanup; + /* we need to cache this event so we can pass it into + * ourselves should someone later register for it */ + cd = PMIX_NEW(pmix_notify_caddy_t); + cd->status = status; + if (NULL == source) { + (void)strncpy(cd->source.nspace, "UNDEF", PMIX_MAX_NSLEN); + cd->source.rank = PMIX_RANK_UNDEF; + } else { + (void)strncpy(cd->source.nspace, source->nspace, PMIX_MAX_NSLEN); + cd->source.rank = source->rank; + } + cd->range = range; + + /* check for directives */ + if (NULL != info) { + cd->ninfo = chain->ninfo; + PMIX_INFO_CREATE(cd->info, cd->ninfo); + for (n=0; n < chain->ninfo; n++) { + PMIX_INFO_XFER(&cd->info[n], &chain->info[n]); + if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { + 
cd->nondefault = true; + } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { + /* provides an array of pmix_proc_t identifying the procs + * that are to receive this notification, or a single pmix_proc_t */ + if (PMIX_DATA_ARRAY == cd->info[n].value.type && + NULL != cd->info[n].value.data.darray && + NULL != cd->info[n].value.data.darray->array) { + cd->ntargets = cd->info[n].value.data.darray->size; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t)); + } else if (PMIX_PROC == cd->info[n].value.type) { + cd->ntargets = 1; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); + } else { + /* this is an error */ + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + return PMIX_ERR_BAD_PARAM; + } + } + } + } + /* add to our cache */ + rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); + /* if an older event was bumped, release it */ + if (NULL != rbout) { + PMIX_RELEASE(rbout); + } + + if (PMIX_RANGE_PROC_LOCAL != range) { + /* create a callback object as we need to pass it to the + * recv routine so we know which callback to use when + * the server acks/nacks the register events request. 
The + * server will _not_ send this notification back to us, + * so we handle it locally */ + cb = PMIX_NEW(pmix_cb_t); + cb->op_cbfunc = cbfunc; + cb->cbdata = cbdata; + /* send to the server */ + pmix_output_verbose(2, pmix_globals.debug_output, + "client: notifying server %s:%d - sending", + pmix_globals.myid.nspace, pmix_globals.myid.rank); + rc = pmix_ptl.send_recv(&pmix_client_globals.myserver, msg, notify_event_cbfunc, cb); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(cb); + goto cleanup; + } } /* now notify any matching registered callbacks we have */ pmix_invoke_local_event_hdlr(chain); - PMIX_RELEASE(chain); // maintain accounting return PMIX_SUCCESS; @@ -245,7 +304,7 @@ static void progress_local_event_hdlr(pmix_status_t status, chain->nresults = cnt; /* if the caller indicates that the chain is completed, - * or we completed the "last" event, then stop here */ + * or we completed the "last" event */ if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) { goto complete; } @@ -261,6 +320,13 @@ static void progress_local_event_hdlr(pmix_status_t status, if (nxt->codes[0] == chain->status && check_range(&nxt->rng, &chain->source)) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -294,6 +360,13 @@ static void progress_local_event_hdlr(pmix_status_t status, * the source fits within it */ if (nxt->codes[n] == chain->status) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + 
free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -321,6 +394,13 @@ static void progress_local_event_hdlr(pmix_status_t status, * the source fits within it */ if (check_range(&nxt->rng, &chain->source)) { chain->evhdlr = nxt; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject; nxt->evhdlr(nxt->index, @@ -341,6 +421,13 @@ static void progress_local_event_hdlr(pmix_status_t status, if (1 == pmix_globals.events.last->ncodes && pmix_globals.events.last->codes[0] == chain->status) { chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -354,6 +441,13 @@ static void progress_local_event_hdlr(pmix_status_t status, for (n=0; n < pmix_globals.events.last->ncodes; n++) { if (pmix_globals.events.last->codes[n] == chain->status) { chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != 
chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -367,6 +461,13 @@ static void progress_local_event_hdlr(pmix_status_t status, } else { /* gets run for all codes */ chain->evhdlr = pmix_globals.events.last; + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } /* add any cbobject - the info struct for it is at the end */ chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject; chain->evhdlr->evhdlr(chain->evhdlr->index, @@ -411,8 +512,9 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) bool found; pmix_output_verbose(2, pmix_globals.debug_output, - "%s:%d invoke_local_event_hdlr", - pmix_globals.myid.nspace, pmix_globals.myid.rank); + "%s:%d invoke_local_event_hdlr for status %s", + pmix_globals.myid.nspace, pmix_globals.myid.rank, + PMIx_Error_string(chain->status)); /* sanity check */ if (NULL == chain->info) { @@ -490,19 +592,42 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) } } - /* if they didn't want it to go to a default handler, then we are done */ - if (chain->nondefault) { - goto complete; + /* if they didn't want it to go to a default handler, then ignore them */ + if (!chain->nondefault) { + /* pass it to any default handlers */ + PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) { + if (check_range(&evhdlr->rng, &chain->source)) { + /* invoke the handler */ + 
chain->evhdlr = evhdlr; + goto invk; + } + } } - /* finally, pass it to any default handlers */ - PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) { - if (check_range(&evhdlr->rng, &chain->source)) { - /* invoke the handler */ - chain->evhdlr = evhdlr; + /* if we registered a "last" handler, and it fits the given range + * and code, then invoke it now */ + if (NULL != pmix_globals.events.last && + check_range(&pmix_globals.events.last->rng, &chain->source)) { + chain->endchain = true; // ensure we don't do this again + if (1 == pmix_globals.events.last->ncodes && + pmix_globals.events.last->codes[0] == chain->status) { + chain->evhdlr = pmix_globals.events.last; + goto invk; + } else if (NULL != pmix_globals.events.last->codes) { + /* need to check if this code is included in the array */ + for (i=0; i < pmix_globals.events.last->ncodes; i++) { + if (pmix_globals.events.last->codes[i] == chain->status) { + chain->evhdlr = pmix_globals.events.last; + goto invk; + } + } + } else { + /* gets run for all codes */ + chain->evhdlr = pmix_globals.events.last; goto invk; } } + /* if we got here, then nothing was found */ complete: /* we still have to call their final callback */ @@ -514,9 +639,18 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) invk: /* invoke the handler */ + /* add the handler name in case they want to reference it */ + if (NULL != chain->info[chain->ninfo-2].value.data.string) { + free(chain->info[chain->ninfo-2].value.data.string); + } + if (NULL != chain->evhdlr->name) { + chain->info[chain->ninfo-2].value.data.string = strdup(chain->evhdlr->name); + } chain->info[chain->ninfo-1].value.data.ptr = chain->evhdlr->cbobject; pmix_output_verbose(2, pmix_globals.debug_output, - "[%s:%d] INVOKING EVHDLR", __FILE__, __LINE__); + "[%s:%d] INVOKING EVHDLR %s", __FILE__, __LINE__, + (NULL == chain->evhdlr->name) ? 
+ "NULL" : chain->evhdlr->name); chain->evhdlr->evhdlr(chain->evhdlr->index, chain->status, &chain->source, chain->info, chain->ninfo, @@ -544,7 +678,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) * the message until all local procs have received it, or it ages to * the point where it gets pushed out by more recent events */ PMIX_RETAIN(cd); - rbout = pmix_ring_buffer_push(&pmix_server_globals.notifications, cd); + rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); /* if an older event was bumped, release it */ if (NULL != rbout) { @@ -558,7 +692,8 @@ static void _notify_client_event(int sd, short args, void *cbdata) cd->status == reginfoptr->code) { PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) { /* if this client was the source of the event, then - * don't send it back */ + * don't send it back as they will have processed it + * when they generated it */ if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) && cd->source.rank == pr->peer->info->rank) { continue; diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_registration.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_registration.c index 134bece6ea..66ab6b21de 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_registration.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_registration.c @@ -325,20 +325,22 @@ static pmix_status_t _add_hdlr(pmix_rshift_caddy_t *cd, pmix_list_t *xfer) static void reg_event_hdlr(int sd, short args, void *cbdata) { - size_t index = 0, n; - pmix_status_t rc; pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)cbdata; + size_t index = 0, n, i; + pmix_status_t rc; pmix_event_hdlr_t *evhdlr, *ev; uint8_t location = PMIX_EVENT_ORDER_NONE; char *name = NULL, *locator = NULL; bool firstoverall=false, lastoverall=false; - bool found; + bool found, matched; pmix_list_t xfer; pmix_info_caddy_t *ixfer; void *cbobject = NULL; pmix_data_range_t range = PMIX_RANGE_UNDEF; pmix_proc_t
*parray = NULL; size_t nprocs; + pmix_notify_caddy_t *ncd; + pmix_event_chain_t *chain; pmix_output_verbose(2, pmix_globals.debug_output, "pmix: register event_hdlr with %d infos", (int)cd->ninfo); @@ -672,6 +674,66 @@ static void reg_event_hdlr(int sd, short args, void *cbdata) cd->evregcbfn(rc, index, cd->cbdata); } + /* check if any matching notifications have been cached */ + for (i=0; i < pmix_globals.notifications.size; i++) { + if (NULL == (ncd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) { + break; + } + found = false; + if (NULL == cd->codes) { + /* they registered a default event handler - always matches */ + found = true; + } else { + for (n=0; n < cd->ncodes; n++) { + if (cd->codes[n] == ncd->status) { + found = true; + break; + } + } + } + if (found) { + /* if we were given specific targets, check if we are one */ + if (NULL != ncd->targets) { + matched = false; + for (n=0; n < ncd->ntargets; n++) { + if (0 != strncmp(pmix_globals.myid.nspace, ncd->targets[n].nspace, PMIX_MAX_NSLEN)) { + continue; + } + if (PMIX_RANK_WILDCARD == ncd->targets[n].rank || + pmix_globals.myid.rank == ncd->targets[n].rank) { + matched = true; + break; + } + } + if (!matched) { + /* do not notify this one */ + continue; + } + } + /* all matches - notify */ + chain = PMIX_NEW(pmix_event_chain_t); + chain->status = ncd->status; + (void)strncpy(chain->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN); + chain->source.rank = pmix_globals.myid.rank; + /* we already left space for evhandler name plus + * a callback object when we cached the notification */ + chain->ninfo = ncd->ninfo; + PMIX_INFO_CREATE(chain->info, chain->ninfo); + if (0 < ncd->ninfo) { + /* copy the cached event's info - the guard is the cached count (ncd), not the registration's (cd) */ + for (n=0; n < ncd->ninfo; n++) { + PMIX_INFO_XFER(&chain->info[n], &ncd->info[n]); + } + } + /* we don't want this chain to propagate, so indicate it + * should only be run as a single-shot */ + chain->endchain = true; + /* now notify any matching registered
callbacks we have */ + pmix_invoke_local_event_hdlr(chain); + } + } + + /* all done */ PMIX_RELEASE(cd); } diff --git a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h index 1333cb24f1..300ea224dd 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix2x/pmix/src/include/pmix_globals.h @@ -36,6 +36,7 @@ #include "src/buffer_ops/types.h" #include "src/class/pmix_hash_table.h" #include "src/class/pmix_list.h" +#include "src/class/pmix_ring_buffer.h" #include "src/event/pmix_event.h" #include "src/mca/psec/psec.h" @@ -358,21 +359,22 @@ PMIX_CLASS_DECLARATION(pmix_info_caddy_t); * between various parts of the code library. Both the client * and server libraries must instance this structure */ typedef struct { - int init_cntr; // #times someone called Init - #times called Finalize + int init_cntr; // #times someone called Init - #times called Finalize pmix_proc_t myid; - pmix_peer_t *mypeer; // my own peer object + pmix_peer_t *mypeer; // my own peer object pmix_proc_type_t proc_type; - uid_t uid; // my effective uid - gid_t gid; // my effective gid + uid_t uid; // my effective uid + gid_t gid; // my effective gid int pindex; pmix_event_base_t *evbase; bool external_evbase; int debug_output; - pmix_events_t events; // my event handler registrations. + pmix_events_t events; // my event handler registrations. 
bool connected; - pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about - pmix_buffer_t *cache_local; // data PUT by me to local scope - pmix_buffer_t *cache_remote; // data PUT by me to remote scope + pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about + pmix_buffer_t *cache_local; // data PUT by me to local scope + pmix_buffer_t *cache_remote; // data PUT by me to remote scope + pmix_ring_buffer_t notifications; // ring buffer of pending notifications } pmix_globals_t; diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c index ed445a4a92..7046511180 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c @@ -105,6 +105,10 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module) pmix_globals.myid.rank = strtol(evar, NULL, 10); } + /* construct the global notification ring buffer */ + PMIX_CONSTRUCT(&pmix_globals.notifications, pmix_ring_buffer_t); + pmix_ring_buffer_init(&pmix_globals.notifications, 256); + /* setup the server-specific globals */ PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t); pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1); @@ -113,8 +117,6 @@ static pmix_status_t initialize_server_base(pmix_server_module_t *module) PMIX_CONSTRUCT(&pmix_server_globals.gdata, pmix_buffer_t); PMIX_CONSTRUCT(&pmix_server_globals.events, pmix_list_t); PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t); - PMIX_CONSTRUCT(&pmix_server_globals.notifications, pmix_ring_buffer_t); - pmix_ring_buffer_init(&pmix_server_globals.notifications, 256); pmix_output_verbose(2, pmix_globals.debug_output, "pmix:server init called"); @@ -261,7 +263,7 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) PMIX_LIST_DESTRUCT(&pmix_server_globals.remote_pnd); PMIX_LIST_DESTRUCT(&pmix_server_globals.local_reqs); 
PMIX_DESTRUCT(&pmix_server_globals.gdata); - PMIX_DESTRUCT(&pmix_server_globals.notifications); + PMIX_DESTRUCT(&pmix_globals.notifications); PMIX_LIST_DESTRUCT(&pmix_server_globals.events); if (NULL != security_mode) { diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c index 5add656abf..97fdd7cdfe 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c @@ -1160,8 +1160,8 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, check: /* check if any matching notifications have been cached */ - for (i=0; i < pmix_server_globals.notifications.size; i++) { - if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_server_globals.notifications, i))) { + for (i=0; i < pmix_globals.notifications.size; i++) { + if (NULL == (cd = (pmix_notify_caddy_t*)pmix_ring_buffer_poke(&pmix_globals.notifications, i))) { break; } found = false; diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.h index f502cd33a3..f978e058b3 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.h +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.h @@ -111,7 +111,6 @@ typedef struct { pmix_list_t local_reqs; // list of pmix_dmdx_local_t awaiting arrival of data from local neighbours pmix_buffer_t gdata; // cache of data given to me for passing to all clients pmix_list_t events; // list of pmix_regevents_info_t registered events - pmix_ring_buffer_t notifications; // ring buffer of pending notifications bool tool_connections_allowed; } pmix_server_globals_t; diff --git a/opal/mca/pmix/pmix2x/pmix/src/util/error.c b/opal/mca/pmix/pmix2x/pmix/src/util/error.c index d75bc2cd78..29ee09f129 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/util/error.c +++ b/opal/mca/pmix/pmix2x/pmix/src/util/error.c @@ -167,6 +167,8 @@ PMIX_EXPORT const char* 
PMIx_Error_string(pmix_status_t errnum) return "PMIX HEARTBEAT ALERT"; case PMIX_MONITOR_FILE_ALERT: return "PMIX FILE MONITOR ALERT"; + case PMIX_MODEL_DECLARED: + return "PMIX MODEL DECLARED"; case PMIX_SUCCESS: return "SUCCESS"; default: diff --git a/opal/mca/pmix/pmix2x/pmix2x.c b/opal/mca/pmix/pmix2x/pmix2x.c index efa8047d26..22a65a43df 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.c +++ b/opal/mca/pmix/pmix2x/pmix2x.c @@ -409,6 +409,9 @@ pmix_status_t pmix2x_convert_opalrc(int rc) case OPAL_ERR_PARTIAL_SUCCESS: return PMIX_QUERY_PARTIAL_SUCCESS; + case OPAL_ERR_MODEL_DECLARED: + return PMIX_MODEL_DECLARED; + case OPAL_ERROR: return PMIX_ERROR; case OPAL_SUCCESS: @@ -499,6 +502,10 @@ int pmix2x_convert_rc(pmix_status_t rc) case PMIX_MONITOR_FILE_ALERT: return OPAL_ERR_FILE_ALERT; + case PMIX_MODEL_DECLARED: + return OPAL_ERR_MODEL_DECLARED; + + case PMIX_ERROR: return OPAL_ERROR; case PMIX_SUCCESS: @@ -1010,6 +1017,7 @@ static void _reg_hdlr(int sd, short args, void *cbdata) n=0; OPAL_LIST_FOREACH(kv, cd->event_codes, opal_value_t) { op->pcodes[n] = pmix2x_convert_opalrc(kv->data.integer); + ++n; } } diff --git a/opal/mca/pmix/pmix2x/pmix2x.h b/opal/mca/pmix/pmix2x/pmix2x.h index 63506b19f1..720c6ac35f 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.h +++ b/opal/mca/pmix/pmix2x/pmix2x.h @@ -186,7 +186,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t); } while(0) /**** CLIENT FUNCTIONS ****/ -OPAL_MODULE_DECLSPEC int pmix2x_client_init(void); +OPAL_MODULE_DECLSPEC int pmix2x_client_init(opal_list_t *ilist); OPAL_MODULE_DECLSPEC int pmix2x_client_finalize(void); OPAL_MODULE_DECLSPEC int pmix2x_initialized(void); OPAL_MODULE_DECLSPEC int pmix2x_abort(int flag, const char *msg, diff --git a/opal/mca/pmix/pmix2x/pmix2x_client.c b/opal/mca/pmix/pmix2x/pmix2x_client.c index d758c8f6e3..70585af757 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_client.c +++ b/opal/mca/pmix/pmix2x/pmix2x_client.c @@ -36,6 +36,8 @@ static pmix_proc_t my_proc; static char *dbgvalue=NULL; +static volatile 
bool regactive; +static bool initialized = false; #define PMIX_WAIT_FOR_COMPLETION(a) \ do { \ @@ -55,28 +57,61 @@ static void errreg_cbfunc (pmix_status_t status, opal_output_verbose(5, opal_pmix_base_framework.framework_output, "PMIX client errreg_cbfunc - error handler registered status=%d, reference=%lu", status, (unsigned long)errhandler_ref); + regactive = false; } -int pmix2x_client_init(void) +int pmix2x_client_init(opal_list_t *ilist) { opal_process_name_t pname; pmix_status_t rc; int dbg; opal_pmix2x_jobid_trkr_t *job; opal_pmix2x_event_t *event; + pmix_info_t *pinfo; + size_t ninfo, n; + opal_value_t *ival; opal_output_verbose(1, opal_pmix_base_framework.framework_output, "PMIx_client init"); - if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) { - asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg); - putenv(dbgvalue); + if (!initialized) { + if (0 < (dbg = opal_output_get_verbosity(opal_pmix_base_framework.framework_output))) { + asprintf(&dbgvalue, "PMIX_DEBUG=%d", dbg); + putenv(dbgvalue); + } } - rc = PMIx_Init(&my_proc, NULL, 0); + /* convert the incoming list to info structs */ + if (NULL != ilist) { + ninfo = opal_list_get_size(ilist); + if (0 < ninfo) { + PMIX_INFO_CREATE(pinfo, ninfo); + n=0; + OPAL_LIST_FOREACH(ival, ilist, opal_value_t) { + (void)strncpy(pinfo[n].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&pinfo[n].value, ival); + ++n; + } + } else { + pinfo = NULL; + } + } else { + pinfo = NULL; + ninfo = 0; + } + + rc = PMIx_Init(&my_proc, pinfo, ninfo); if (PMIX_SUCCESS != rc) { return pmix2x_convert_rc(rc); } + if (0 < ninfo) { + PMIX_INFO_FREE(pinfo, ninfo); + + } + if (initialized) { + return OPAL_SUCCESS; + } + initialized = true; /* store our jobid and rank */ if (NULL != getenv(OPAL_MCA_PREFIX"orte_launch")) { @@ -102,7 +137,13 @@ int pmix2x_client_init(void) /* register the default event handler */ event = OBJ_NEW(opal_pmix2x_event_t); opal_list_append(&mca_pmix_pmix2x_component.events, 
&event->super); - PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, event); + PMIX_INFO_CREATE(pinfo, 1); + PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-DEFAULT", PMIX_STRING); + regactive = true; + PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, event); + PMIX_WAIT_FOR_COMPLETION(regactive); + PMIX_INFO_FREE(pinfo, 1); + return OPAL_SUCCESS; } @@ -130,7 +171,7 @@ int pmix2x_initialized(void) opal_output_verbose(1, opal_pmix_base_framework.framework_output, "PMIx_client initialized"); - return PMIx_Initialized(); + return initialized; } int pmix2x_abort(int flag, const char *msg, diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_south.c b/opal/mca/pmix/pmix2x/pmix2x_server_south.c index e1195da202..977194c545 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_south.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_south.c @@ -142,8 +142,11 @@ int pmix2x_server_init(opal_pmix_server_module_t *module, /* register the default event handler */ active = true; - PMIx_Register_event_handler(NULL, 0, NULL, 0, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active); + PMIX_INFO_CREATE(pinfo, 1); + PMIX_INFO_LOAD(&pinfo[0], PMIX_EVENT_HDLR_NAME, "OPAL-PMIX-2X-SERVER-DEFAULT", PMIX_STRING); + PMIx_Register_event_handler(NULL, 0, pinfo, 1, pmix2x_event_hdlr, errreg_cbfunc, (void*)&active); PMIX_WAIT_FOR_COMPLETION(active); + PMIX_INFO_FREE(pinfo, 1); /* as we might want to use some client-side functions, be sure * to register our own nspace */ diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 113ea02c33..1b8651fc3d 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -67,7 +67,10 @@ BEGIN_C_DECLS /* identification attributes */ #define OPAL_PMIX_USERID "pmix.euid" // (uint32_t) effective user id #define OPAL_PMIX_GRPID "pmix.egid" // (uint32_t) effective group id - +#define OPAL_PMIX_PROGRAMMING_MODEL "pmix.pgm.model" // (char*) programming model being initialized 
(e.g., "MPI" or "OpenMP") +#define OPAL_PMIX_MODEL_LIBRARY_NAME "pmix.mdl.name" // (char*) programming model implementation ID (e.g., "OpenMPI" or "MPICH") +#define OPAL_PMIX_MODEL_LIBRARY_VERSION "pmix.mld.vrs" // (char*) programming model version string (e.g., "2.1.1") +#define OPAL_PMIX_THREADING_MODEL "pmix.threads" // (char*) threading model used (e.g., "pthreads") /* attributes for the rendezvous socket */ #define OPAL_PMIX_USOCK_DISABLE "pmix.usock.disable" // (bool) disable legacy usock support diff --git a/opal/mca/pmix/s1/pmix_s1.c b/opal/mca/pmix/s1/pmix_s1.c index b04b247b3c..f68b427f71 100644 --- a/opal/mca/pmix/s1/pmix_s1.c +++ b/opal/mca/pmix/s1/pmix_s1.c @@ -31,7 +31,7 @@ #include "opal/mca/pmix/base/pmix_base_hash.h" #include "pmix_s1.h" -static int s1_init(void); +static int s1_init(opal_list_t *ilist); static int s1_fini(void); static int s1_initialized(void); static int s1_abort(int flag, const char msg[], @@ -141,7 +141,7 @@ static int kvs_put(const char key[], const char value[]) return rc; } -static int s1_init(void) +static int s1_init(opal_list_t *ilist) { PMI_BOOL initialized; int spawned; @@ -155,6 +155,10 @@ static int s1_init(void) char **localranks=NULL; opal_process_name_t wildcard_rank; + if (0 < pmix_init_count) { + return OPAL_SUCCESS; + } + if (PMI_SUCCESS != (rc = PMI_Initialized(&initialized))) { OPAL_PMI_ERROR(rc, "PMI_Initialized"); return OPAL_ERROR; diff --git a/opal/mca/pmix/s2/pmix_s2.c b/opal/mca/pmix/s2/pmix_s2.c index 130dedac5b..02d3beceb4 100644 --- a/opal/mca/pmix/s2/pmix_s2.c +++ b/opal/mca/pmix/s2/pmix_s2.c @@ -36,7 +36,7 @@ #include "opal/mca/pmix/base/pmix_base_hash.h" #include "pmix_s2.h" -static int s2_init(void); +static int s2_init(opal_list_t *ilist); static int s2_fini(void); static int s2_initialized(void); static int s2_abort(int flag, const char msg[], @@ -158,7 +158,7 @@ static int kvs_get(const char key[], char value [], int maxvalue) return OPAL_SUCCESS; } -static int s2_init(void) +static int 
s2_init(opal_list_t *ilist) { int spawned, size, rank, appnum; int rc, ret = OPAL_ERROR; @@ -174,6 +174,10 @@ static int s2_init(void) char nmtmp[64]; opal_process_name_t wildcard_rank; + if (0 < pmix_init_count) { + return OPAL_SUCCESS; + } + /* if we can't startup PMI, we can't be used */ if ( PMI2_Initialized () ) { return OPAL_SUCCESS; diff --git a/orte/mca/errmgr/default_app/errmgr_default_app.c b/orte/mca/errmgr/default_app/errmgr_default_app.c index 8e605bf173..c61f2d2241 100644 --- a/orte/mca/errmgr/default_app/errmgr_default_app.c +++ b/orte/mca/errmgr/default_app/errmgr_default_app.c @@ -9,7 +9,7 @@ * reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -33,6 +33,7 @@ #include "orte/util/name_fns.h" #include "orte/util/show_help.h" #include "orte/runtime/orte_globals.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/rml/rml.h" #include "orte/mca/odls/odls_types.h" #include "orte/mca/state/state.h" @@ -74,7 +75,9 @@ static size_t myerrhandle = SIZE_MAX; static void register_cbfunc(int status, size_t errhndler, void *cbdata) { + volatile bool *active = (volatile bool*)cbdata; myerrhandle = errhndler; + *active = false; } static void notify_cbfunc(int status, @@ -117,11 +120,24 @@ static void notify_cbfunc(int status, ************************/ static int init(void) { + opal_list_t directives; + volatile bool active; + opal_value_t *kv; + /* setup state machine to trap proc errors */ orte_state.add_proc_state(ORTE_PROC_STATE_ERROR, proc_errors, ORTE_ERROR_PRI); /* tie the default PMIx event handler back to us */ - opal_pmix.register_evhandler(NULL, NULL, notify_cbfunc, register_cbfunc, NULL); + active = true; + OBJ_CONSTRUCT(&directives, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_HDLR_NAME); + kv->type 
= OPAL_STRING; + kv->data.string = strdup("ORTE-APP-DEFAULT"); + opal_list_append(&directives, &kv->super); + opal_pmix.register_evhandler(NULL, &directives, notify_cbfunc, register_cbfunc, (void*)&active); + ORTE_WAIT_FOR_COMPLETION(active); + OPAL_LIST_DESTRUCT(&directives); return ORTE_SUCCESS; } diff --git a/orte/mca/ess/pmi/ess_pmi_module.c b/orte/mca/ess/pmi/ess_pmi_module.c index 6ed504f341..4ad414236a 100644 --- a/orte/mca/ess/pmi/ess_pmi_module.c +++ b/orte/mca/ess/pmi/ess_pmi_module.c @@ -124,7 +124,7 @@ static int rte_init(void) /* set the event base */ opal_pmix_base_set_evbase(orte_event_base); /* initialize the selected module */ - if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init()))) { + if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init(NULL)))) { /* we cannot run */ error = "pmix init"; goto error; diff --git a/orte/mca/ess/singleton/ess_singleton_module.c b/orte/mca/ess/singleton/ess_singleton_module.c index 2f2e5376ac..6ddca46124 100644 --- a/orte/mca/ess/singleton/ess_singleton_module.c +++ b/orte/mca/ess/singleton/ess_singleton_module.c @@ -189,7 +189,7 @@ static int rte_init(void) /* set the event base */ opal_pmix_base_set_evbase(orte_event_base); /* initialize the selected module */ - if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init()))) { + if (!opal_pmix.initialized() && (OPAL_SUCCESS != (ret = opal_pmix.init(NULL)))) { /* we cannot run */ error = "pmix init"; goto error; diff --git a/orte/test/mpi/Makefile b/orte/test/mpi/Makefile index 3a0074aa32..3bf63b8b0b 100644 --- a/orte/test/mpi/Makefile +++ b/orte/test/mpi/Makefile @@ -1,4 +1,4 @@ -PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 
parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client paccept pconnect ring hello.sapp binding badcoll attach +PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client paccept pconnect ring hello.sapp binding badcoll attach xlib all: $(PROGS) @@ -10,6 +10,9 @@ hello_output: hello_output.c hello_show_help: hello_show_help.c $(CC) $(CFLAGS) $(CFLAGS_INTERNAL) $^ -o $@ +xlib: xlib.c + $(CC) $(CFLAGS) $(CFLAGS_INTERNAL) $^ -o $@ -lpmix + CC = mpicc CFLAGS = -g --openmpi:linkall CFLAGS_INTERNAL = -I../../.. -I../../../orte/include -I../../../opal/include diff --git a/orte/test/mpi/Makefile.include b/orte/test/mpi/Makefile.include index 8f033e185a..45160a8f31 100644 --- a/orte/test/mpi/Makefile.include +++ b/orte/test/mpi/Makefile.include @@ -12,6 +12,7 @@ # All rights reserved. # Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. +# Copyright (c) 2017 Intel, Inc. All rights reserved. 
# $COPYRIGHT$ # # Additional copyrights may follow @@ -55,5 +56,5 @@ EXTRA_DIST += \ test/mpi/singleton_client_server.c \ test/mpi/spawn_tree.c \ test/mpi/info_spawn.c \ - test/mpi/pmix.c - + test/mpi/pmix.c \ + test/mpi/xlib.c diff --git a/orte/test/mpi/xlib.c b/orte/test/mpi/xlib.c new file mode 100644 index 0000000000..7e74f46b77 --- /dev/null +++ b/orte/test/mpi/xlib.c @@ -0,0 +1,217 @@ +#include <stdio.h> +#include <unistd.h> +#include <mpi.h> +#include <pmix.h> + +#define SIZE 20 +#define POS 10 +#define INITIAL_VALUE 10 + +static pmix_proc_t myproc; + +/* this is the event notification function we pass down below + * when registering for general events - i.e.,, the default + * handler. We don't technically need to register one, but it + * is usually good practice to catch any events that occur */ +static void notification_fn(size_t evhdlr_registration_id, + pmix_status_t status, + const pmix_proc_t *source, + pmix_info_t info[], size_t ninfo, + pmix_info_t results[], size_t nresults, + pmix_event_notification_cbfunc_fn_t cbfunc, + void *cbdata) +{ + /* this example doesn't do anything with default events */ + fprintf(stderr, "Default event handler called with status %s\n", PMIx_Error_string(status)); + + if (NULL != cbfunc) { + cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); + } +} + +/* this is an event notification function that we explicitly request + * be called when the PMIX_ERR_JOB_TERMINATED notification is issued. + * We could catch it in the general event notification function and test + * the status to see if it was "job terminated", but it often is simpler + * to declare a use-specific notification callback point.
In this case, + * we are asking to know whenever a job terminates, and we will then + * know we can exit */ +static void model_callback(size_t evhdlr_registration_id, + pmix_status_t status, + const pmix_proc_t *source, + pmix_info_t info[], size_t ninfo, + pmix_info_t results[], size_t nresults, + pmix_event_notification_cbfunc_fn_t cbfunc, + void *cbdata) +{ + size_t n; + + fprintf(stderr, "Model event handler called with status %d(%s)\n", status, PMIx_Error_string(status)); + + /* check to see what model declared itself */ + for (n=0; n < ninfo; n++) { + if (PMIX_STRING == info[n].value.type) { + fprintf(stderr, "\t%s:\t%s\n", info[n].key, info[n].value.data.string); + } + } + + /* we must NOT tell the event handler state machine that we + * are the last step as that will prevent it from notifying + * anyone else that might be listening for declarations */ + if (NULL != cbfunc) { + cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata); + } +} + +/* event handler registration is done asynchronously because it + * may involve the PMIx server registering with the host RM for + * external events. So we provide a callback function that returns + * the status of the request (success or an error), plus a numerical index + * to the registered event. 
The index is used later on to deregister + * an event handler - if we don't explicitly deregister it, then the + * PMIx server will do so when it see us exit */ +static void model_registration_callback(pmix_status_t status, + size_t evhandler_ref, + void *cbdata) +{ + volatile int *active = (volatile int*)cbdata; + + if (PMIX_SUCCESS != status) { + fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", + myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref); + } + *active = status; +} + +int main(int argc, char *argv[]) +{ + int i, rank, size, next, prev, tag = 201; + int array_size = SIZE; + int pos = POS; + int *send_array; + int *recv_array; + pmix_info_t *info; + size_t ninfo; + pmix_status_t code = PMIX_MODEL_DECLARED; + pmix_status_t rc; + volatile int active; + + + if (1 < argc) { + fprintf(stderr, "Declaring ourselves\n"); + /* declare ourselves as a non-MPI library prior to MPI_Init */ + ninfo = 4; + PMIX_INFO_CREATE(info, ninfo); + PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "EXAMPLE", PMIX_STRING); + PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "FOOL", PMIX_STRING); + PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3", PMIX_STRING); + PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "NONE", PMIX_STRING); + if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, info, ninfo))) { + fprintf(stderr, "PMIx Init failed: %s\n", PMIx_Error_string(rc)); + exit(1); + } + PMIX_INFO_FREE(info, ninfo); + + /* register a handler specifically for when models declare */ + active = -1; + ninfo = 1; + PMIX_INFO_CREATE(info, ninfo); + PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "APP-MODEL", PMIX_STRING); + PMIx_Register_event_handler(&code, 1, info, ninfo, + model_callback, model_registration_callback, (void*)&active); + while (-1 == active) { + usleep(10); + } + PMIX_INFO_FREE(info, ninfo); + if (0 != active) { + exit(active); + } + } + + /* initialize the MPI library - it will declare itself */ + MPI_Init(&argc, 
&argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + if (argc <= 1) { + fprintf(stderr, "Registering handler\n"); + /* register a handler specifically for when models declare */ + active = -1; + ninfo = 1; + PMIX_INFO_CREATE(info, ninfo); + PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "APP-MODEL", PMIX_STRING); + + PMIx_Register_event_handler(&code, 1, info, ninfo, + model_callback, model_registration_callback, (void*)&active); + while (-1 == active) { + usleep(10); + } + PMIX_INFO_FREE(info, ninfo); + if (0 != active) { + exit(active); + } + } + + fprintf(stderr, "Rank %d has cleared MPI_Init\n", rank); + + next = (rank + 1) % size; + prev = (rank + size - 1) % size; + send_array = malloc(sizeof(int) * SIZE); + recv_array = malloc(sizeof(int) * SIZE); + + for (i = 0; i < array_size; ++i) { + send_array[i] = 17; + recv_array[i] = -1; + } + + if (0 == rank) { + send_array[pos] = INITIAL_VALUE; + MPI_Send(send_array, array_size, MPI_INT, next, tag, + MPI_COMM_WORLD); + } + + /* if we didn't already do it, declare another model now */ + if (argc <= 1) { + fprintf(stderr, "Declaring ourselves\n"); + /* declare ourselves as a non-MPI library after MPI_Init */ + ninfo = 4; + PMIX_INFO_CREATE(info, ninfo); + PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "EXAMPLE", PMIX_STRING); + PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "FOOL", PMIX_STRING); + PMIX_INFO_LOAD(&info[2], PMIX_MODEL_LIBRARY_VERSION, "1.2.3", PMIX_STRING); + PMIX_INFO_LOAD(&info[3], PMIX_THREADING_MODEL, "NONE", PMIX_STRING); + + if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, info, ninfo))) { + fprintf(stderr, "PMIx Init failed: %s\n", PMIx_Error_string(rc)); + exit(1); + } + PMIX_INFO_FREE(info, ninfo); + } + + while (1) { + recv_array[pos] = -1; + MPI_Recv(recv_array, array_size, MPI_INT, prev, tag, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + send_array[pos] = recv_array[pos]; + if (rank == 0) { + --send_array[pos]; + } + MPI_Send(send_array, array_size, MPI_INT, 
next, tag, MPI_COMM_WORLD); + if (0 == send_array[pos]) { + break; + } + } + + if (rank == 0) { + MPI_Recv(recv_array, array_size, MPI_INT, prev, tag, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + + fprintf(stderr, "Rank %d has completed ring\n", rank); + MPI_Barrier(MPI_COMM_WORLD); + fprintf(stderr, "Rank %d has completed MPI_Barrier\n", rank); + + /* decrement the PMIx refcount */ + PMIx_Finalize(NULL, 0); + MPI_Finalize(); + return 0; +} From 0afcb1a448a5447d73b1d8c595622f52235b49d8 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 5 May 2017 09:16:02 -0700 Subject: [PATCH 2/2] Update to support server self-notifications Signed-off-by: Ralph Castain --- .../pmix/src/event/pmix_event_notification.c | 130 +++++++++++++----- orte/test/mpi/xlib.c | 8 +- 2 files changed, 96 insertions(+), 42 deletions(-) diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 38f93bd6f4..8b2fc65751 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -659,6 +659,15 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) return; } +static void local_cbfunc(pmix_status_t status, void *cbdata) +{ + pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata; + + if (NULL != cd->cbfunc) { + cd->cbfunc(status, cd->cbdata); + } + PMIX_RELEASE(cd); +} static void _notify_client_event(int sd, short args, void *cbdata) { @@ -666,8 +675,9 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_notify_caddy_t *rbout; pmix_regevents_info_t *reginfoptr; pmix_peer_events_info_t *pr; + pmix_event_chain_t *chain; size_t n; - bool matched; + bool matched, holdcd; pmix_output_verbose(2, pmix_globals.debug_output, "pmix_server: _notify_error notifying clients of error %s", @@ -685,51 +695,95 @@ static void _notify_client_event(int sd, short args, void *cbdata) PMIX_RELEASE(rbout); } - /* cycle 
across our registered events and send the message to - * any client who registered for it */ - PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) { - if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) || - cd->status == reginfoptr->code) { - PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) { - /* if this client was the source of the event, then - * don't send it back as they will have processed it - * when they generated it */ - if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) && - cd->source.rank == pr->peer->info->rank) { - continue; - } - /* if we were given specific targets, check if this is one */ - if (NULL != cd->targets) { - matched = false; - for (n=0; n < cd->ntargets; n++) { - if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) { - continue; - } - if (PMIX_RANK_WILDCARD == cd->targets[n].rank || - pr->peer->info->rank == cd->targets[n].rank) { - matched = true; - break; - } - } - if (!matched) { - /* do not notify this one */ + holdcd = false; + if (PMIX_RANGE_PROC_LOCAL != cd->range) { + /* cycle across our registered events and send the message to + * any client who registered for it */ + PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) { + if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) || + cd->status == reginfoptr->code) { + PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) { + /* if this client was the source of the event, then + * don't send it back as they will have processed it + * when they generated it */ + if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) && + cd->source.rank == pr->peer->info->rank) { continue; } + /* if we were given specific targets, check if this is one */ + if (NULL != cd->targets) { + matched = false; + for (n=0; n < cd->ntargets; n++) { + if (0 != strncmp(pr->peer->info->nptr->nspace,
cd->targets[n].nspace, PMIX_MAX_NSLEN)) { + continue; + } + if (PMIX_RANK_WILDCARD == cd->targets[n].rank || + pr->peer->info->rank == cd->targets[n].rank) { + matched = true; + break; + } + } + if (!matched) { + /* do not notify this one */ + continue; + } + } + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix_server: notifying client %s:%d", + pr->peer->info->nptr->nspace, pr->peer->info->rank); + PMIX_RETAIN(cd->buf); + PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf); } - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server: notifying client %s:%d", - pr->peer->info->nptr->nspace, pr->peer->info->rank); - PMIX_RETAIN(cd->buf); - PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf); } } + if (PMIX_RANGE_LOCAL != cd->range && + 0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) && + cd->source.rank == pmix_globals.myid.rank) { + /* if we are the source, then we need to post this upwards as + * well so the host RM can broadcast it as necessary - we rely + * on the host RM to _not_ deliver this back to us! */ + if (NULL != pmix_host_server.notify_event) { + /* mark that we sent it upstairs so we don't release + * the caddy until we return from the host RM */ + holdcd = true; + pmix_host_server.notify_event(cd->status, &cd->source, cd->range, + cd->info, cd->ninfo, local_cbfunc, cd); + } + + } } - /* notify the caller */ - if (NULL != cd->cbfunc) { - cd->cbfunc(PMIX_SUCCESS, cd->cbdata); + /* we may also have registered for events, so be sure to check this + * against our registrations */ + chain = PMIX_NEW(pmix_event_chain_t); + chain->status = cd->status; + (void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN); + chain->source.rank = cd->source.rank; + /* we always leave space for a callback object and + * the evhandler name. 
*/ + chain->ninfo = cd->ninfo + 2; + PMIX_INFO_CREATE(chain->info, chain->ninfo); + if (0 < cd->ninfo) { + /* need to copy the info */ + for (n=0; n < cd->ninfo; n++) { + PMIX_INFO_XFER(&chain->info[n], &cd->info[n]); + } + } + /* put the evhandler name tag in the next-to-last element - we + * will fill it in as each handler is called */ + PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING); + /* now put the callback object tag in the last element */ + PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + /* process it */ + pmix_invoke_local_event_hdlr(chain); + + if (!holdcd) { + /* notify the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(PMIX_SUCCESS, cd->cbdata); + } + PMIX_RELEASE(cd); } - PMIX_RELEASE(cd); } diff --git a/orte/test/mpi/xlib.c b/orte/test/mpi/xlib.c index 7e74f46b77..e75a874fab 100644 --- a/orte/test/mpi/xlib.c +++ b/orte/test/mpi/xlib.c @@ -30,12 +30,12 @@ static void notification_fn(size_t evhdlr_registration_id, } /* this is an event notification function that we explicitly request - * be called when the PMIX_ERR_JOB_TERMINATED notification is issued. + * be called when the PMIX_MODEL_DECLARED notification is issued. * We could catch it in the general event notification function and test - * the status to see if it was "job terminated", but it often is simpler + * the status to see if the status matched, but it often is simpler * to declare a use-specific notification callback point. In this case, - * we are asking to know whenever a job terminates, and we will then - * know we can exit */ + * we are asking to know whenever a programming model library is + * instantiated */ static void model_callback(size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source,