1
1

Merge pull request #3273 from rhc54/topic/pmix2.0

Update to PMIx v2.0alpha
Этот коммит содержится в:
Ralph Castain 2017-04-03 11:07:08 -07:00 коммит произвёл GitHub
родитель b3736701c4 2cc5fea8be
Коммит 9850832dbd
18 изменённых файлов: 1141 добавлений и 473 удалений

Просмотреть файл

@ -1,4 +1,5 @@
Copyright (c) 2015-2016 Intel, Inc. All rights reserved. Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
Copyright (c) 2017 IBM Corporation. All rights reserved.
$COPYRIGHT$ $COPYRIGHT$
Additional copyrights may follow Additional copyrights may follow
@ -23,6 +24,32 @@ current release as well as the "stable" bug fix release branch.
Master (not on release branches yet) Master (not on release branches yet)
------------------------------------ ------------------------------------
1.2.2 -- 21 March 2017
----------------------
- Compiler fix for Sun/Oracle CC (PR #322)
- Fix missing include (PR #326)
- Improve error checking around posix_fallocate (PR #329)
- Fix possible memory corruption (PR #331)
1.2.1 -- 21 Feb. 2017
----------------------
- dstore: Fix data corruption bug in key overwrite cases
- dstore: Performance and scalability fixes
- sm: Use posix_fallocate() before mmap
- pmi1/pmi2: Restore support
- dstore: Fix extension slot size allocation (Issue #280)
1.2.0 -- 14 Dec. 2016
----------------------
- Add shared memory data storage (dstore) option. Default: enabled
Configure option: --disable-dstore
- PMIx_Commit performance improvements
- Disable errhandler support
- Keep job info in the shared memory dstore
- PMIx_Get performance and memory improvements
1.1.5 1.1.5
----- -----
- Add pmix_version.h to support direct detection of PMIx library version - Add pmix_version.h to support direct detection of PMIx library version

Просмотреть файл

@ -23,14 +23,14 @@ release=0
# The only requirement is that it must be entirely printable ASCII # The only requirement is that it must be entirely printable ASCII
# characters and have no white space. # characters and have no white space.
greek= greek=a1
# If repo_rev is empty, then the repository version number will be # If repo_rev is empty, then the repository version number will be
# obtained during "make dist" via the "git describe --tags --always" # obtained during "make dist" via the "git describe --tags --always"
# command, or with the date (if "git describe" fails) in the form of # command, or with the date (if "git describe" fails) in the form of
# "date<date>". # "date<date>".
repo_rev=git4cdd5e0 repo_rev=gitc442ba8
# If tarball_version is not empty, it is used as the version string in # If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in # the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created # The date when this release was created
date="Mar 11, 2017" date="Apr 02, 2017"
# The shared library version of each of PMIx's public libraries. # The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library # These versions are maintained in accordance with the "Library

Просмотреть файл

@ -13,7 +13,9 @@ dnl All rights reserved.
dnl Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. dnl Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
dnl Copyright (c) 2009 Oak Ridge National Labs. All rights reserved. dnl Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
dnl Copyright (c) 2009-2016 Cisco Systems, Inc. All rights reserved. dnl Copyright (c) 2009-2016 Cisco Systems, Inc. All rights reserved.
dnl Copyright (c) 2013-2016 Intel, Inc. All rights reserved. dnl Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
dnl Copyright (c) 2017 Research Organization for Information Science
dnl and Technology (RIST). All rights reserved.
dnl dnl
dnl $COPYRIGHT$ dnl $COPYRIGHT$
dnl dnl
@ -278,7 +280,7 @@ for val in ${$1}; do
# http://www.open-mpi.org/community/lists/devel/2012/08/11362.php). # http://www.open-mpi.org/community/lists/devel/2012/08/11362.php).
case $val in case $val in
-Xclang) -Xclang|-Xg)
pmix_found=0 pmix_found=0
pmix_i=`expr $pmix_count + 1` pmix_i=`expr $pmix_count + 1`
;; ;;
@ -366,7 +368,7 @@ AC_DEFUN([PMIX_FLAGS_UNIQ],[
# https://github.com/open-mpi/ompi/issues/324). # https://github.com/open-mpi/ompi/issues/324).
case $val in case $val in
-Xclang) -Xclang|-Xg)
pmix_found=0 pmix_found=0
pmix_i=`expr $pmix_count + 1` pmix_i=`expr $pmix_count + 1`
;; ;;

Просмотреть файл

@ -21,7 +21,7 @@
AM_CPPFLAGS = -I$(top_builddir)/src -I$(top_builddir)/src/include -I$(top_builddir)/include -I$(top_builddir)/include/pmix AM_CPPFLAGS = -I$(top_builddir)/src -I$(top_builddir)/src/include -I$(top_builddir)/include -I$(top_builddir)/include/pmix
noinst_PROGRAMS = client dmodex dynamic fault pub tool debugger debuggerd alloc noinst_PROGRAMS = client dmodex dynamic fault pub tool debugger debuggerd alloc jctrl
if !WANT_HIDDEN if !WANT_HIDDEN
# these examples use internal symbols # these examples use internal symbols
# use --disable-visibility # use --disable-visibility
@ -40,11 +40,14 @@ debuggerd_SOURCES = debuggerd.c
debuggerd_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) debuggerd_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS)
debuggerd_LDADD = $(top_builddir)/src/libpmix.la debuggerd_LDADD = $(top_builddir)/src/libpmix.la
alloc_SOURCES = alloc.c alloc_SOURCES = alloc.c
alloc_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) alloc_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS)
alloc_LDADD = $(top_builddir)/src/libpmix.la alloc_LDADD = $(top_builddir)/src/libpmix.la
jctrl_SOURCES = jctrl.c
jctrl_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS)
jctrl_LDADD = $(top_builddir)/src/libpmix.la
dmodex_SOURCES = dmodex.c dmodex_SOURCES = dmodex.c
dmodex_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) dmodex_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS)
dmodex_LDADD = $(top_builddir)/src/libpmix.la dmodex_LDADD = $(top_builddir)/src/libpmix.la

229
opal/mca/pmix/pmix2x/pmix/examples/jctrl.c Обычный файл
Просмотреть файл

@ -0,0 +1,229 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#define _GNU_SOURCE
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <pmix.h>
static pmix_proc_t myproc;
/* this is the event notification function we pass down below
* when registering for general events - i.e.,, the default
* handler. We don't technically need to register one, but it
* is usually good practice to catch any events that occur */
static void notification_fn(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo,
pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
if (NULL != cbfunc) {
cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata);
}
}
/* event handler registration is done asynchronously because it
* may involve the PMIx server registering with the host RM for
* external events. So we provide a callback function that returns
* the status of the request (success or an error), plus a numerical index
* to the registered event. The index is used later on to deregister
* an event handler - if we don't explicitly deregister it, then the
* PMIx server will do so when it see us exit */
static void evhandler_reg_callbk(pmix_status_t status,
size_t evhandler_ref,
void *cbdata)
{
volatile int *active = (volatile int*)cbdata;
if (PMIX_SUCCESS != status) {
fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref);
}
*active = status;
}
static void infocbfunc(pmix_status_t status,
pmix_info_t *info, size_t ninfo,
void *cbdata,
pmix_release_cbfunc_t release_fn,
void *release_cbdata)
{
volatile int *active = (volatile int*)cbdata;
/* release the caller */
if (NULL != release_fn) {
release_fn(release_cbdata);
}
*active = status;
}
int main(int argc, char **argv)
{
int rc;
pmix_value_t value;
pmix_value_t *val = &value;
pmix_proc_t proc;
uint32_t nprocs, n;
pmix_info_t *info, *iptr;
bool flag;
volatile int active;
pmix_data_array_t *dptr;
/* init us - note that the call to "init" includes the return of
* any job-related info provided by the RM. */
if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %d\n", myproc.nspace, myproc.rank, rc);
exit(0);
}
fprintf(stderr, "Client ns %s rank %d: Running\n", myproc.nspace, myproc.rank);
/* register our default event handler - again, this isn't strictly
* required, but is generally good practice */
active = -1;
PMIx_Register_event_handler(NULL, 0, NULL, 0,
notification_fn, evhandler_reg_callbk, (void*)&active);
while (-1 == active) {
sleep(1);
}
if (0 != active) {
fprintf(stderr, "[%s:%d] Default handler registration failed\n", myproc.nspace, myproc.rank);
exit(active);
}
/* job-related info is found in our nspace, assigned to the
* wildcard rank as it doesn't relate to a specific rank. Setup
* a name to retrieve such values */
PMIX_PROC_CONSTRUCT(&proc);
(void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
/* get our universe size */
if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Get universe size failed: %d\n", myproc.nspace, myproc.rank, rc);
goto done;
}
nprocs = val->data.uint32;
PMIX_VALUE_RELEASE(val);
fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs);
/* inform the RM that we are preemptible, and that our checkpoint methods are
* "signal" on SIGUSR2 and event on PMIX_JCTRL_CHECKPOINT */
PMIX_INFO_CREATE(info, 2);
flag = true;
PMIX_INFO_LOAD(&info[0], PMIX_JOB_CTRL_PREEMPTIBLE, (void*)&flag, PMIX_BOOL);
/* can't use "load" to load a pmix_data_array_t */
(void)strncpy(info[1].key, PMIX_JOB_CTRL_CHECKPOINT_METHOD, PMIX_MAX_KEYLEN);
info[1].value.type = PMIX_DATA_ARRAY;
dptr = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t));
info[1].value.data.darray = dptr;
dptr->type = PMIX_INFO;
dptr->size = 2;
PMIX_INFO_CREATE(dptr->array, dptr->size);
rc = SIGUSR2;
iptr = (pmix_info_t*)dptr->array;
PMIX_INFO_LOAD(&iptr[0], PMIX_JOB_CTRL_CHECKPOINT_SIGNAL, &rc, PMIX_INT);
rc = PMIX_JCTRL_CHECKPOINT;
PMIX_INFO_LOAD(&iptr[1], PMIX_JOB_CTRL_CHECKPOINT_EVENT, &rc, PMIX_STATUS);
/* since this is informational and not a requested operation, the target parameter
* doesn't mean anything and can be ignored */
active = -1;
if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(NULL, 0, info, 2, infocbfunc, (void*)&active))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, rc);
goto done;
}
while (-1 == active) {
sleep(1);
}
PMIX_INFO_FREE(info, 2);
if (0 != active) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, rc);
exit(active);
}
/* now request that this process be monitored using heartbeats */
PMIX_INFO_CREATE(iptr, 1);
PMIX_INFO_LOAD(&iptr[0], PMIX_MONITOR_HEARTBEAT, NULL, PMIX_POINTER);
PMIX_INFO_CREATE(info, 3);
PMIX_INFO_LOAD(&info[0], PMIX_MONITOR_ID, "MONITOR1", PMIX_STRING);
n = 5; // require a heartbeat every 5 seconds
PMIX_INFO_LOAD(&info[1], PMIX_MONITOR_HEARTBEAT_TIME, &n, PMIX_UINT32);
n = 2; // two heartbeats can be missed before declaring us "stalled"
PMIX_INFO_LOAD(&info[2], PMIX_MONITOR_HEARTBEAT_DROPS, &n, PMIX_UINT32);
/* make the request */
active = -1;
if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(iptr, PMIX_MONITOR_HEARTBEAT_ALERT,
info, 3, infocbfunc, (void*)&active))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, rc);
goto done;
}
while (-1 == active) {
sleep(1);
}
PMIX_INFO_FREE(iptr, 1);
PMIX_INFO_FREE(info, 3);
if (0 != active) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, rc);
exit(active);
}
/* send a heartbeat */
PMIx_Heartbeat();
/* call fence to synchronize with our peers - no need to
* collect any info as we didn't "put" anything */
PMIX_INFO_CREATE(info, 1);
flag = false;
PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, info, 1))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc);
goto done;
}
PMIX_INFO_FREE(info, 1);
done:
/* finalize us */
fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank);
if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) {
fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %d\n", myproc.nspace, myproc.rank, rc);
} else {
fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank);
}
fflush(stderr);
return(0);
}

Просмотреть файл

@ -523,8 +523,14 @@ pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pmix_status_t
pmix_info_cbfunc_t cbfunc, void *cbdata); pmix_info_cbfunc_t cbfunc, void *cbdata);
/* define a special macro to simplify sending of a heartbeat */ /* define a special macro to simplify sending of a heartbeat */
#define PMIx_Heartbeat() \ #define PMIx_Heartbeat() \
PMIx_Process_monitor_nb(PMIX_SEND_HEARTBEAT, NULL, 0, NULL, NULL) do { \
pmix_info_t _in; \
PMIX_INFO_CONSTRUCT(&_in); \
PMIX_INFO_LOAD(&_in, PMIX_SEND_HEARTBEAT, NULL, PMIX_POINTER); \
PMIx_Process_monitor_nb(&_in, PMIX_SUCCESS, NULL, 0, NULL, NULL); \
PMIX_INFO_DESTRUCT(&_in); \
} while(0)
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }

Просмотреть файл

@ -245,10 +245,17 @@ typedef uint32_t pmix_rank_t;
#define PMIX_EVENT_HDLR_NAME "pmix.evname" // (char*) string name identifying this handler #define PMIX_EVENT_HDLR_NAME "pmix.evname" // (char*) string name identifying this handler
#define PMIX_EVENT_JOB_LEVEL "pmix.evjob" // (bool) register for job-specific events only #define PMIX_EVENT_JOB_LEVEL "pmix.evjob" // (bool) register for job-specific events only
#define PMIX_EVENT_ENVIRO_LEVEL "pmix.evenv" // (bool) register for environment events only #define PMIX_EVENT_ENVIRO_LEVEL "pmix.evenv" // (bool) register for environment events only
#define PMIX_EVENT_ORDER_PREPEND "pmix.evprepend" // (bool) prepend this handler to the precedence list #define PMIX_EVENT_HDLR_FIRST "pmix.evfirst" // (bool) invoke this event handler before any other handlers
#define PMIX_EVENT_CUSTOM_RANGE "pmix.evrange" // (pmix_proc_t*) array of pmix_proc_t defining range of event notification #define PMIX_EVENT_HDLR_LAST "pmix.evlast" // (bool) invoke this event handler after all other handlers have been called
#define PMIX_EVENT_HDLR_FIRST_IN_CATEGORY "pmix.evfirstcat" // (bool) invoke this event handler before any other handlers in this category
#define PMIX_EVENT_HDLR_LAST_IN_CATEGORY "pmix.evlastcat" // (bool) invoke this event handler after all other handlers in this category have been called
#define PMIX_EVENT_HDLR_BEFORE "pmix.evbefore" // (char*) put this event handler immediately before the one specified in the (char*) value
#define PMIX_EVENT_HDLR_AFTER "pmix.evafter" // (char*) put this event handler immediately after the one specified in the (char*) value
#define PMIX_EVENT_HDLR_PREPEND "pmix.evprepend" // (bool) prepend this handler to the precedence list within its category
#define PMIX_EVENT_HDLR_APPEND "pmix.evappend" // (bool) append this handler to the precedence list within its category
#define PMIX_EVENT_CUSTOM_RANGE "pmix.evrange" // (pmix_data_array_t*) array of pmix_proc_t defining range of event notification
#define PMIX_EVENT_AFFECTED_PROC "pmix.evproc" // (pmix_proc_t) single proc that was affected #define PMIX_EVENT_AFFECTED_PROC "pmix.evproc" // (pmix_proc_t) single proc that was affected
#define PMIX_EVENT_AFFECTED_PROCS "pmix.evaffected" // (pmix_proc_t*) array of pmix_proc_t defining affected procs #define PMIX_EVENT_AFFECTED_PROCS "pmix.evaffected" // (pmix_data_array_t*) array of pmix_proc_t defining affected procs
#define PMIX_EVENT_NON_DEFAULT "pmix.evnondef" // (bool) event is not to be delivered to default event handlers #define PMIX_EVENT_NON_DEFAULT "pmix.evnondef" // (bool) event is not to be delivered to default event handlers
#define PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked #define PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked
// NOTE: the object will _only_ be returned to the process that // NOTE: the object will _only_ be returned to the process that
@ -260,6 +267,10 @@ typedef uint32_t pmix_rank_t;
#define PMIX_EVENT_TERMINATE_NODE "pmix.evterm.node" // (bool) RM intends to terminate all procs on this node #define PMIX_EVENT_TERMINATE_NODE "pmix.evterm.node" // (bool) RM intends to terminate all procs on this node
#define PMIX_EVENT_TERMINATE_PROC "pmix.evterm.proc" // (bool) RM intends to terminate just this process #define PMIX_EVENT_TERMINATE_PROC "pmix.evterm.proc" // (bool) RM intends to terminate just this process
#define PMIX_EVENT_ACTION_TIMEOUT "pmix.evtimeout" // (int) time in sec before RM will execute error response #define PMIX_EVENT_ACTION_TIMEOUT "pmix.evtimeout" // (int) time in sec before RM will execute error response
#define PMIX_EVENT_NO_TERMINATION "pmix.evnoterm" // (bool) indicates that the handler has satisfactorily handled
// the event and believes termination of the application is not required
#define PMIX_EVENT_WANT_TERMINATION "pmix.evterm" // (bool) indicates that the handler has determined that the application should be terminated
/* attributes used to describe "spawn" attributes */ /* attributes used to describe "spawn" attributes */
#define PMIX_PERSONALITY "pmix.pers" // (char*) name of personality to use #define PMIX_PERSONALITY "pmix.pers" // (char*) name of personality to use
@ -363,25 +374,31 @@ typedef uint32_t pmix_rank_t;
#define PMIX_JOB_CTRL_CHECKPOINT_EVENT "pmix.jctrl.ckptev" // (bool) use event notification to trigger process checkpoint #define PMIX_JOB_CTRL_CHECKPOINT_EVENT "pmix.jctrl.ckptev" // (bool) use event notification to trigger process checkpoint
#define PMIX_JOB_CTRL_CHECKPOINT_SIGNAL "pmix.jctrl.ckptsig" // (int) use the given signal to trigger process checkpoint #define PMIX_JOB_CTRL_CHECKPOINT_SIGNAL "pmix.jctrl.ckptsig" // (int) use the given signal to trigger process checkpoint
#define PMIX_JOB_CTRL_CHECKPOINT_TIMEOUT "pmix.jctrl.ckptsig" // (int) time in seconds to wait for checkpoint to complete #define PMIX_JOB_CTRL_CHECKPOINT_TIMEOUT "pmix.jctrl.ckptsig" // (int) time in seconds to wait for checkpoint to complete
#define PMIX_JOB_CTRL_CHECKPOINT_METHOD "pmix.jctrl.ckmethod" // (pmix_data_array_t) array of pmix_info_t declaring each
// method and value supported by this application
#define PMIX_JOB_CTRL_SIGNAL "pmix.jctrl.sig" // (int) send given signal to specified processes #define PMIX_JOB_CTRL_SIGNAL "pmix.jctrl.sig" // (int) send given signal to specified processes
#define PMIX_JOB_CTRL_PROVISION "pmix.jctrl.pvn" // (char*) regex identifying nodes that are to be provisioned #define PMIX_JOB_CTRL_PROVISION "pmix.jctrl.pvn" // (char*) regex identifying nodes that are to be provisioned
#define PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned #define PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned
#define PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted #define PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted
/* monitoring attributes */ /* monitoring attributes */
#define PMIX_MONITOR_ID "pmix.monitor.id" // (char*) provide a string identifier for this request
#define PMIX_MONITOR_CANCEL "pmix.monitor.cancel" // (char*) identifier to be canceled (NULL = cancel all
// monitoring for this process)
#define PMIX_MONITOR_APP_CONTROL "pmix.monitor.appctrl" // (bool) the application desires to control the response to
// a monitoring event
#define PMIX_MONITOR_HEARTBEAT "pmix.monitor.mbeat" // (void) register to have the server monitor the requestor for heartbeats #define PMIX_MONITOR_HEARTBEAT "pmix.monitor.mbeat" // (void) register to have the server monitor the requestor for heartbeats
#define PMIX_SEND_HEARTBEAT "pmix.monitor.beat" // (void) send heartbeat to local server #define PMIX_SEND_HEARTBEAT "pmix.monitor.beat" // (void) send heartbeat to local server
#define PMIX_MONITOR_HEARTBEAT_TIME "pmix.monitor.btime" // (uint32_t) time in seconds before declaring heartbeat missed #define PMIX_MONITOR_HEARTBEAT_TIME "pmix.monitor.btime" // (uint32_t) time in seconds before declaring heartbeat missed
#define PMIX_MONITOR_HEARTBEAT_DROPS "pmix.monitor.bdrop" // (uint32_t) number of heartbeats that can be missed before taking #define PMIX_MONITOR_HEARTBEAT_DROPS "pmix.monitor.bdrop" // (uint32_t) number of heartbeats that can be missed before
// specified action // generating the event
#define PMIX_MONITOR_FILE "pmix.monitor.fmon" // (char*) register to monitor file for signs of life #define PMIX_MONITOR_FILE "pmix.monitor.fmon" // (char*) register to monitor file for signs of life
#define PMIX_MONITOR_FILE_SIZE "pmix.monitor.fsize" // (bool) monitor size of given file is growing to determine app is running #define PMIX_MONITOR_FILE_SIZE "pmix.monitor.fsize" // (bool) monitor size of given file is growing to determine app is running
#define PMIX_MONITOR_FILE_ACCESS "pmix.monitor.faccess" // (char*) monitor time since last access of given file to determine app is running #define PMIX_MONITOR_FILE_ACCESS "pmix.monitor.faccess" // (char*) monitor time since last access of given file to determine app is running
#define PMIX_MONITOR_FILE_MODIFY "pmix.monitor.fmod" // (char*) monitor time since last modified of given file to determine app is running #define PMIX_MONITOR_FILE_MODIFY "pmix.monitor.fmod" // (char*) monitor time since last modified of given file to determine app is running
#define PMIX_MONITOR_FILE_CHECK_TIME "pmix.monitor.ftime" // (uint32_t) time in seconds between checking file #define PMIX_MONITOR_FILE_CHECK_TIME "pmix.monitor.ftime" // (uint32_t) time in seconds between checking file
#define PMIX_MONITOR_FILE_DROPS "pmix.monitor.fdrop" // (uint32_t) number of file checks that can be missed before taking #define PMIX_MONITOR_FILE_DROPS "pmix.monitor.fdrop" // (uint32_t) number of file checks that can be missed before
// specified action // generating the event
/**** PROCESS STATE DEFINITIONS ****/ /**** PROCESS STATE DEFINITIONS ****/
typedef uint8_t pmix_proc_state_t; typedef uint8_t pmix_proc_state_t;
@ -490,19 +507,21 @@ typedef int pmix_status_t;
#define PMIX_ERR_V2X_BASE -100 #define PMIX_ERR_V2X_BASE -100
/* v2.x communication errors */ /* v2.x communication errors */
#define PMIX_ERR_LOST_CONNECTION_TO_SERVER (PMIX_ERR_V2X_BASE - 1) #define PMIX_ERR_LOST_CONNECTION_TO_SERVER (PMIX_ERR_V2X_BASE - 1)
#define PMIX_ERR_LOST_PEER_CONNECTION (PMIX_ERR_V2X_BASE - 2) #define PMIX_ERR_LOST_PEER_CONNECTION (PMIX_ERR_V2X_BASE - 2)
#define PMIX_ERR_LOST_CONNECTION_TO_CLIENT (PMIX_ERR_V2X_BASE - 3) #define PMIX_ERR_LOST_CONNECTION_TO_CLIENT (PMIX_ERR_V2X_BASE - 3)
/* used by the query system */ /* used by the query system */
#define PMIX_QUERY_PARTIAL_SUCCESS (PMIX_ERR_V2X_BASE - 4) #define PMIX_QUERY_PARTIAL_SUCCESS (PMIX_ERR_V2X_BASE - 4)
/* request responses */ /* request responses */
#define PMIX_NOTIFY_ALLOC_COMPLETE (PMIX_ERR_V2X_BASE - 5) #define PMIX_NOTIFY_ALLOC_COMPLETE (PMIX_ERR_V2X_BASE - 5)
/* job control */ /* job control */
#define PMIX_JCTRL_CHECKPOINT (PMIX_ERR_V2X_BASE - 6) #define PMIX_JCTRL_CHECKPOINT (PMIX_ERR_V2X_BASE - 6) // monitored by client to trigger checkpoint operation
#define PMIX_JCTRL_PREEMPT_ALERT (PMIX_ERR_V2X_BASE - 7) #define PMIX_JCTRL_CHECKPOINT_COMPLETE (PMIX_ERR_V2X_BASE - 7) // sent by client and monitored by server to notify that requested
// checkpoint operation has completed
#define PMIX_JCTRL_PREEMPT_ALERT (PMIX_ERR_V2X_BASE - 8) // monitored by client to detect RM intends to preempt
/* monitoring */ /* monitoring */
#define PMIX_MONITOR_HEARTBEAT_ALERT (PMIX_ERR_V2X_BASE - 8) #define PMIX_MONITOR_HEARTBEAT_ALERT (PMIX_ERR_V2X_BASE - 9)
#define PMIX_MONITOR_FILE_ALERT (PMIX_ERR_V2X_BASE - 9) #define PMIX_MONITOR_FILE_ALERT (PMIX_ERR_V2X_BASE - 10)
/* define a starting point for operational error constants so /* define a starting point for operational error constants so
* we avoid renumbering when making additions */ * we avoid renumbering when making additions */
@ -627,6 +646,7 @@ typedef uint8_t pmix_data_range_t;
#define PMIX_RANGE_SESSION 4 // data available to all procs in session #define PMIX_RANGE_SESSION 4 // data available to all procs in session
#define PMIX_RANGE_GLOBAL 5 // data available to all procs #define PMIX_RANGE_GLOBAL 5 // data available to all procs
#define PMIX_RANGE_CUSTOM 6 // range is specified in a pmix_info_t #define PMIX_RANGE_CUSTOM 6 // range is specified in a pmix_info_t
#define PMIX_RANGE_PROC_LOCAL 7 // restrict range to the local proc
/* define a "persistence" policy for data published by clients */ /* define a "persistence" policy for data published by clients */
typedef uint8_t pmix_persistence_t; typedef uint8_t pmix_persistence_t;

Просмотреть файл

@ -335,7 +335,8 @@ typedef pmix_status_t (*pmix_server_job_control_fn_t)(const pmix_proc_t *request
pmix_info_cbfunc_t cbfunc, void *cbdata); pmix_info_cbfunc_t cbfunc, void *cbdata);
/* Request that a client be monitored for activity */ /* Request that a client be monitored for activity */
typedef pmix_status_t (*pmix_server_monitor_fn_t)(const pmix_proc_t *requestor, pmix_status_t error, typedef pmix_status_t (*pmix_server_monitor_fn_t)(const pmix_proc_t *requestor,
const pmix_info_t *monitor, pmix_status_t error,
const pmix_info_t directives[], size_t ndirs, const pmix_info_t directives[], size_t ndirs,
pmix_info_cbfunc_t cbfunc, void *cbdata); pmix_info_cbfunc_t cbfunc, void *cbdata);

Просмотреть файл

@ -211,7 +211,7 @@ PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pm
} }
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:monitor handed to RM"); "pmix:monitor handed to RM");
rc = pmix_host_server.monitor(&pmix_globals.myid, error, rc = pmix_host_server.monitor(&pmix_globals.myid, monitor, error,
directives, ndirs, cbfunc, cbdata); directives, ndirs, cbfunc, cbdata);
return rc; return rc;
} }
@ -231,6 +231,13 @@ PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pm
return rc; return rc;
} }
/* pack the monitor */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, monitor, 1, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
PMIX_RELEASE(msg);
return rc;
}
/* pack the error */ /* pack the error */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &error, 1, PMIX_STATUS))) { if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &error, 1, PMIX_STATUS))) {
PMIX_ERROR_LOG(rc); PMIX_ERROR_LOG(rc);

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved * Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -29,47 +29,42 @@
BEGIN_C_DECLS BEGIN_C_DECLS
/* define an object for tracking event handlers focused on a #define PMIX_EVENT_ORDER_NONE 0x00
* single status code */ #define PMIX_EVENT_ORDER_FIRST 0x01
#define PMIX_EVENT_ORDER_LAST 0x02
#define PMIX_EVENT_ORDER_BEFORE 0x04
#define PMIX_EVENT_ORDER_AFTER 0x08
#define PMIX_EVENT_ORDER_PREPEND 0x10
#define PMIX_EVENT_ORDER_APPEND 0x20
/* define a struct for tracking registration ranges */
typedef struct {
pmix_data_range_t range;
pmix_proc_t *procs;
size_t nprocs;
} pmix_range_trkr_t;
/* define a common struct for tracking event handlers */
typedef struct { typedef struct {
pmix_list_item_t super; pmix_list_item_t super;
char *name; char *name;
size_t index; size_t index;
pmix_status_t code; uint8_t precedence;
char *locator;
pmix_range_trkr_t rng;
pmix_notification_fn_t evhdlr; pmix_notification_fn_t evhdlr;
void *cbobject; void *cbobject;
} pmix_single_event_t;
PMIX_CLASS_DECLARATION(pmix_single_event_t);
/* define an object for tracking event handlers registered
* on multiple status codes, generally corresponding to a
* functional group */
typedef struct {
pmix_list_item_t super;
char *name;
size_t index;
pmix_status_t *codes; pmix_status_t *codes;
size_t ncodes; size_t ncodes;
pmix_notification_fn_t evhdlr; } pmix_event_hdlr_t;
void *cbobject; PMIX_CLASS_DECLARATION(pmix_event_hdlr_t);
} pmix_multi_event_t;
PMIX_CLASS_DECLARATION(pmix_multi_event_t);
/* define an object for tracking default event handlers */
typedef struct {
pmix_list_item_t super;
char *name;
size_t index;
pmix_notification_fn_t evhdlr;
void *cbobject;
} pmix_default_event_t;
PMIX_CLASS_DECLARATION(pmix_default_event_t);
/* define an object for tracking status codes we are actively /* define an object for tracking status codes we are actively
* registered to receive */ * registered to receive */
typedef struct { typedef struct {
pmix_list_item_t super; pmix_list_item_t super;
pmix_status_t code; pmix_status_t code;
size_t nregs;
} pmix_active_code_t; } pmix_active_code_t;
PMIX_CLASS_DECLARATION(pmix_active_code_t); PMIX_CLASS_DECLARATION(pmix_active_code_t);
@ -79,6 +74,8 @@ PMIX_CLASS_DECLARATION(pmix_active_code_t);
typedef struct { typedef struct {
pmix_object_t super; pmix_object_t super;
size_t nhdlrs; size_t nhdlrs;
pmix_event_hdlr_t *first;
pmix_event_hdlr_t *last;
pmix_list_t actives; pmix_list_t actives;
pmix_list_t single_events; pmix_list_t single_events;
pmix_list_t multi_events; pmix_list_t multi_events;
@ -98,15 +95,14 @@ typedef struct pmix_event_chain_t {
pmix_object_t super; pmix_object_t super;
pmix_status_t status; pmix_status_t status;
bool nondefault; bool nondefault;
bool endchain;
pmix_proc_t source; pmix_proc_t source;
pmix_data_range_t range; pmix_data_range_t range;
pmix_info_t *info; pmix_info_t *info;
size_t ninfo; size_t ninfo;
pmix_info_t *results; pmix_info_t *results;
size_t nresults; size_t nresults;
pmix_single_event_t *sing; pmix_event_hdlr_t *evhdlr;
pmix_multi_event_t *multi;
pmix_default_event_t *def;
pmix_op_cbfunc_t final_cbfunc; pmix_op_cbfunc_t final_cbfunc;
void *final_cbdata; void *final_cbdata;
} pmix_event_chain_t; } pmix_event_chain_t;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/* /*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science * Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved. * and Technology (RIST). All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
@ -29,6 +29,8 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
pmix_info_t info[], size_t ninfo, pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata); pmix_op_cbfunc_t cbfunc, void *cbdata);
static bool check_range(pmix_range_trkr_t *range, const pmix_proc_t *proc);
/* if we are a client, we call this function to notify the server of /* if we are a client, we call this function to notify the server of
* an event. If we are a server, our host RM will call this function * an event. If we are a server, our host RM will call this function
* to notify us of an event */ * to notify us of an event */
@ -190,47 +192,49 @@ static void progress_local_event_hdlr(pmix_status_t status,
void *notification_cbdata) void *notification_cbdata)
{ {
pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata; pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata;
size_t n, nsave; size_t n, nsave, cnt;
pmix_info_t *newinfo; pmix_info_t *newinfo;
pmix_list_item_t *nxt; pmix_list_item_t *item;
pmix_single_event_t *sing; pmix_event_hdlr_t *nxt;
pmix_multi_event_t *multi;
pmix_default_event_t *def;
/* if the caller indicates that the chain is completed, then stop here */ /* aggregate the results per RFC0018 - first search the
if (PMIX_EVENT_ACTION_COMPLETE == status) { * prior chained results to see if any keys have been NULL'd
goto complete; * as this indicates that info struct should be removed */
} nsave = 0;
/* save the current number of results */
nsave = chain->nresults;
/* create the new space */
PMIX_INFO_CREATE(newinfo, chain->nresults + nresults + 1);
/* transfer over the prior data */
for (n=0; n < chain->nresults; n++) { for (n=0; n < chain->nresults; n++) {
PMIX_INFO_XFER(&newinfo[n], &chain->results[n]); if (NULL != chain->results[n].key) {
++nsave;
}
} }
/* save this handler's response */ /* we have to at least record the status returned by each
if (NULL != chain->sing) { * stage of the event handler chain, so we have to reallocate
if (NULL != chain->sing->name) { * the array to make space */
(void)strncpy(newinfo[nsave].key, chain->sing->name, PMIX_MAX_KEYLEN);
} /* add in any new results plus space for the returned status */
} else if (NULL != chain->multi) { nsave += nresults + 1;
if (NULL != chain->multi->name) { /* create the new space */
(void)strncpy(newinfo[nsave].key, chain->multi->name, PMIX_MAX_KEYLEN); PMIX_INFO_CREATE(newinfo, nsave);
} /* transfer over the prior data */
} else if (NULL != chain->def) { cnt = 0;
if (NULL != chain->def->name) { for (n=0; n < chain->nresults; n++) {
(void)strncpy(newinfo[nsave].key, chain->def->name, PMIX_MAX_KEYLEN); if (NULL != chain->results[n].key) {
PMIX_INFO_XFER(&newinfo[cnt], &chain->results[n]);
++cnt;
} }
}
/* save this handler's returned status */
if (NULL != chain->evhdlr->name) {
(void)strncpy(newinfo[cnt].key, chain->evhdlr->name, PMIX_MAX_KEYLEN);
} else { } else {
(void)strncpy(newinfo[nsave].key, "UNKNOWN", PMIX_MAX_KEYLEN); (void)strncpy(newinfo[cnt].key, "UNKNOWN", PMIX_MAX_KEYLEN);
} }
newinfo[nsave].value.type = PMIX_STATUS; newinfo[cnt].value.type = PMIX_STATUS;
newinfo[nsave].value.data.status = status; newinfo[cnt].value.data.status = status;
++cnt;
/* transfer across the new results */ /* transfer across the new results */
for (n=0; n < nresults; n++) { for (n=0; n < nresults; n++) {
PMIX_INFO_XFER(&newinfo[n+nsave+1], &results[n]); PMIX_INFO_XFER(&newinfo[cnt], &results[n]);
++cnt;
} }
/* release the prior results */ /* release the prior results */
if (0 < chain->nresults) { if (0 < chain->nresults) {
@ -238,76 +242,139 @@ static void progress_local_event_hdlr(pmix_status_t status,
} }
/* pass along the new ones */ /* pass along the new ones */
chain->results = newinfo; chain->results = newinfo;
chain->nresults = nsave + nresults; chain->nresults = cnt;
/* if the caller indicates that the chain is completed,
* or we completed the "last" event, then stop here */
if (PMIX_EVENT_ACTION_COMPLETE == status || chain->endchain) {
goto complete;
}
item = NULL;
/* see if we need to continue, starting with the single code events */ /* see if we need to continue, starting with the single code events */
if (NULL != chain->sing) { if (1 == chain->evhdlr->ncodes) {
/* the last handler was for a single code - see if there are /* the last handler was for a single code - see if there are
* any others that match this event */ * any others that match this event */
while (pmix_list_get_end(&pmix_globals.events.single_events) != (nxt = pmix_list_get_next(&chain->sing->super))) { item = &chain->evhdlr->super;
sing = (pmix_single_event_t*)nxt; while (pmix_list_get_end(&pmix_globals.events.single_events) != (item = pmix_list_get_next(item))) {
if (sing->code == chain->status) { nxt = (pmix_event_hdlr_t*)item;
chain->sing = sing; if (nxt->codes[0] == chain->status &&
check_range(&nxt->rng, &chain->source)) {
chain->evhdlr = nxt;
/* add any cbobject - the info struct for it is at the end */ /* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = sing->cbobject; chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
sing->evhdlr(sing->index, nxt->evhdlr(nxt->index,
chain->status, &chain->source, chain->status, &chain->source,
chain->info, chain->ninfo, chain->info, chain->ninfo,
chain->results, chain->nresults, chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain); progress_local_event_hdlr, (void*)chain);
goto complete; return;
} }
} }
/* if we get here, then there are no more single code /* if we get here, then there are no more single code
* events that match */ * events that match */
chain->sing = NULL; item = pmix_list_get_begin(&pmix_globals.events.multi_events);
/* pickup the beginning of the multi-code event list */
chain->multi = (pmix_multi_event_t*)pmix_list_get_begin(&pmix_globals.events.multi_events);
} }
/* see if we need to continue with the multi code events */ /* see if we need to continue with the multi code events */
if (NULL != chain->multi) { if (NULL != chain->evhdlr->codes || NULL != item) {
while (pmix_list_get_end(&pmix_globals.events.multi_events) != (nxt = pmix_list_get_next(&chain->multi->super))) { /* the last handler was for a multi-code event, or we exhausted
multi = (pmix_multi_event_t*)nxt; * all the single code events */
for (n=0; n < multi->ncodes; n++) { if (NULL == item) {
if (multi->codes[n] == chain->status) { /* if the last handler was multi-code, then start from that point */
/* found it - invoke the handler, pointing its item = &chain->evhdlr->super;
* callback function to our progression function */ }
chain->multi = multi; while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) {
nxt = (pmix_event_hdlr_t*)item;
if (!check_range(&nxt->rng, &chain->source)) {
continue;
}
for (n=0; n < nxt->ncodes; n++) {
/* if this event handler provided a range, check to see if
* the source fits within it */
if (nxt->codes[n] == chain->status) {
chain->evhdlr = nxt;
/* add any cbobject - the info struct for it is at the end */ /* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = multi->cbobject; chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
multi->evhdlr(multi->index, nxt->evhdlr(nxt->index,
chain->status, &chain->source, chain->status, &chain->source,
chain->info, chain->ninfo, chain->info, chain->ninfo,
chain->results, chain->nresults, chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain); progress_local_event_hdlr, (void*)chain);
goto complete; return;
} }
} }
} }
/* if we get here, then there are no more multi-mode /* if we get here, then there are no more multi-mode
* events that match */ * events that match */
chain->multi = NULL; item = pmix_list_get_begin(&pmix_globals.events.default_events);
/* pickup the beginning of the default event list */
chain->def = (pmix_default_event_t*)pmix_list_get_begin(&pmix_globals.events.default_events);
} }
/* if they didn't want it to go to a default handler, then we are done */ /* if they didn't want it to go to a default handler, then ignore them */
if (chain->nondefault) { if (!chain->nondefault) {
goto complete; if (NULL == item) {
item = &chain->evhdlr->super;
}
if (pmix_list_get_end(&pmix_globals.events.default_events) != (item = pmix_list_get_next(item))) {
nxt = (pmix_event_hdlr_t*)item;
/* if this event handler provided a range, check to see if
* the source fits within it */
if (check_range(&nxt->rng, &chain->source)) {
chain->evhdlr = nxt;
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = nxt->cbobject;
nxt->evhdlr(nxt->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain);
return;
}
}
} }
if (NULL != chain->def) { /* if we registered a "last" handler, and it fits the given range
if (pmix_list_get_end(&pmix_globals.events.default_events) != (nxt = pmix_list_get_next(&chain->def->super))) { * and code, then invoke it now */
def = (pmix_default_event_t*)nxt; if (NULL != pmix_globals.events.last &&
chain->def = def; check_range(&pmix_globals.events.last->rng, &chain->source)) {
chain->endchain = true; // ensure we don't do this again
if (1 == pmix_globals.events.last->ncodes &&
pmix_globals.events.last->codes[0] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
/* add any cbobject - the info struct for it is at the end */ /* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = def->cbobject; chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
def->evhdlr(def->index, chain->evhdlr->evhdlr(chain->evhdlr->index,
chain->status, &chain->source, chain->status, &chain->source,
chain->info, chain->ninfo, chain->info, chain->ninfo,
chain->results, chain->nresults, chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain); progress_local_event_hdlr, (void*)chain);
return;
} else if (NULL != pmix_globals.events.last->codes) {
/* need to check if this code is included in the array */
for (n=0; n < pmix_globals.events.last->ncodes; n++) {
if (pmix_globals.events.last->codes[n] == chain->status) {
chain->evhdlr = pmix_globals.events.last;
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
chain->evhdlr->evhdlr(chain->evhdlr->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain);
return;
}
}
} else {
/* gets run for all codes */
chain->evhdlr = pmix_globals.events.last;
/* add any cbobject - the info struct for it is at the end */
chain->info[chain->ninfo-1].value.data.ptr = pmix_globals.events.last->cbobject;
chain->evhdlr->evhdlr(chain->evhdlr->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
chain->results, chain->nresults,
progress_local_event_hdlr, (void*)chain);
return;
} }
} }
@ -339,10 +406,9 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
/* We need to parse thru each registered handler and determine /* We need to parse thru each registered handler and determine
* which one(s) to call for the specific error */ * which one(s) to call for the specific error */
size_t i; size_t i;
pmix_single_event_t *sing; pmix_event_hdlr_t *evhdlr;
pmix_multi_event_t *multi;
pmix_default_event_t *def;
pmix_status_t rc = PMIX_SUCCESS; pmix_status_t rc = PMIX_SUCCESS;
bool found;
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"%s:%d invoke_local_event_hdlr", "%s:%d invoke_local_event_hdlr",
@ -363,45 +429,63 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
} }
} }
/* if we registered a "first" handler, and it fits the given range,
* then invoke it first */
if (NULL != pmix_globals.events.first) {
if (1 == pmix_globals.events.first->ncodes &&
pmix_globals.events.first->codes[0] == chain->status &&
check_range(&pmix_globals.events.first->rng, &chain->source)) {
/* invoke the handler */
chain->evhdlr = pmix_globals.events.first;
goto invk;
} else if (NULL != pmix_globals.events.first->codes) {
/* need to check if this code is included in the array */
found = false;
for (i=0; i < pmix_globals.events.first->ncodes; i++) {
if (pmix_globals.events.first->codes[i] == chain->status) {
found = true;
break;
}
}
/* if this event handler provided a range, check to see if
* the source fits within it */
if (found && check_range(&pmix_globals.events.first->rng, &chain->source)) {
/* invoke the handler */
chain->evhdlr = pmix_globals.events.first;
goto invk;
}
} else {
/* take all codes for a default handler */
if (check_range(&pmix_globals.events.first->rng, &chain->source)) {
/* invoke the handler */
chain->evhdlr = pmix_globals.events.first;
goto invk;
}
}
/* get here if there is no match, so fall thru */
}
/* cycle thru the single-event registrations first */ /* cycle thru the single-event registrations first */
PMIX_LIST_FOREACH(sing, &pmix_globals.events.single_events, pmix_single_event_t) { PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
if (sing->code == chain->status) { if (evhdlr->codes[0] == chain->status) {
/* found it - invoke the handler, pointing its if (check_range(&evhdlr->rng, &chain->source)) {
* callback function to our progression function */ /* invoke the handler */
chain->sing = sing; chain->evhdlr = evhdlr;
/* add any cbobject - the info struct for it is at the end */ goto invk;
chain->info[chain->ninfo-1].value.data.ptr = sing->cbobject; }
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] CALLING SINGLE EVHDLR",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
sing->evhdlr(sing->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
NULL, 0,
progress_local_event_hdlr, (void*)chain);
return;
} }
} }
/* if we didn't find any match in the single-event registrations, /* if we didn't find any match in the single-event registrations,
* then cycle thru the multi-event registrations next */ * then cycle thru the multi-event registrations next */
PMIX_LIST_FOREACH(multi, &pmix_globals.events.multi_events, pmix_multi_event_t) { PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
for (i=0; i < multi->ncodes; i++) { for (i=0; i < evhdlr->ncodes; i++) {
if (multi->codes[i] == chain->status) { if (evhdlr->codes[i] == chain->status) {
/* found it - invoke the handler, pointing its if (check_range(&evhdlr->rng, &chain->source)) {
* callback function to our progression function */ /* invoke the handler */
chain->multi = multi; chain->evhdlr = evhdlr;
/* add any cbobject - the info struct for it is at the end */ goto invk;
chain->info[chain->ninfo-1].value.data.ptr = multi->cbobject; }
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] CALLING MULTI EVHDLR",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
multi->evhdlr(multi->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
NULL, 0,
progress_local_event_hdlr, (void*)chain);
return;
} }
} }
} }
@ -412,26 +496,33 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
} }
/* finally, pass it to any default handlers */ /* finally, pass it to any default handlers */
PMIX_LIST_FOREACH(def, &pmix_globals.events.default_events, pmix_default_event_t) { PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
chain->def = def; if (check_range(&evhdlr->rng, &chain->source)) {
/* add any cbobject - the info struct for it is at the end */ /* invoke the handler */
chain->info[chain->ninfo-1].value.data.ptr = def->cbobject; chain->evhdlr = evhdlr;
pmix_output_verbose(2, pmix_globals.debug_output, goto invk;
"[%s:%d] CALLING DEFAULT EVHDLR", __FILE__, __LINE__); }
def->evhdlr(def->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
NULL, 0,
progress_local_event_hdlr, (void*)chain);
return;
} }
/* if we got here, then nothing was found */
complete: complete:
/* we still have to call their final callback */ /* we still have to call their final callback */
if (NULL != chain->final_cbfunc) { if (NULL != chain->final_cbfunc) {
chain->final_cbfunc(rc, chain->final_cbdata); chain->final_cbfunc(rc, chain->final_cbdata);
} }
return; return;
invk:
/* invoke the handler */
chain->info[chain->ninfo-1].value.data.ptr = chain->evhdlr->cbobject;
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] INVOKING EVHDLR", __FILE__, __LINE__);
chain->evhdlr->evhdlr(chain->evhdlr->index,
chain->status, &chain->source,
chain->info, chain->ninfo,
NULL, 0,
progress_local_event_hdlr, (void*)chain);
return;
} }
@ -617,66 +708,104 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
return PMIX_SUCCESS; return PMIX_SUCCESS;
} }
static void sevcon(pmix_single_event_t *p) static bool check_range(pmix_range_trkr_t *rng,
const pmix_proc_t *proc)
{
size_t n;
if (PMIX_RANGE_UNDEF == rng->range ||
PMIX_RANGE_GLOBAL == rng->range ||
PMIX_RANGE_SESSION == rng->range ||
PMIX_RANGE_LOCAL == rng->range) { // assume RM took care of session & local for now
return true;
}
if (PMIX_RANGE_NAMESPACE == rng->range) {
if (0 == strncmp(pmix_globals.myid.nspace, proc->nspace, PMIX_MAX_NSLEN)) {
return true;
}
return false;
}
if (PMIX_RANGE_PROC_LOCAL == rng->range) {
if (0 == strncmp(pmix_globals.myid.nspace, proc->nspace, PMIX_MAX_NSLEN) &&
pmix_globals.myid.rank == proc->rank) {
return true;
}
return false;
}
if (PMIX_RANGE_CUSTOM == rng->range) {
if (NULL != rng->procs) {
/* see if this proc was included */
for (n=0; n < rng->nprocs; n++) {
if (0 != strncmp(rng->procs[n].nspace, proc->nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == rng->procs[n].rank ||
rng->procs[n].rank == proc->rank) {
return true;
}
}
/* if we get here, then this proc isn't in range */
return false;
} else {
/* if they didn't give us a list, then assume
* everyone included */
return true;
}
}
/* if it is anything else, then reject it */
return false;
}
/**** CLASS INSTANTIATIONS ****/
static void sevcon(pmix_event_hdlr_t *p)
{ {
p->name = NULL; p->name = NULL;
p->index = UINT_MAX;
p->precedence = PMIX_EVENT_ORDER_NONE;
p->locator = NULL;
p->rng.range = PMIX_RANGE_UNDEF;
p->rng.procs = NULL;
p->rng.nprocs = 0;
p->evhdlr = NULL; p->evhdlr = NULL;
p->cbobject = NULL; p->cbobject = NULL;
p->codes = NULL;
p->ncodes = 0;
} }
static void sevdes(pmix_single_event_t *p) static void sevdes(pmix_event_hdlr_t *p)
{ {
if (NULL != p->name) { if (NULL != p->name) {
free(p->name); free(p->name);
} }
} if (NULL != p->locator) {
PMIX_CLASS_INSTANCE(pmix_single_event_t, free(p->locator);
pmix_list_item_t, }
sevcon, sevdes); if (NULL != p->rng.procs) {
free(p->rng.procs);
static void mevcon(pmix_multi_event_t *p)
{
p->name = NULL;
p->codes = NULL;
p->ncodes = 0;
p->evhdlr = NULL;
p->cbobject = NULL;
}
static void mevdes(pmix_multi_event_t *p)
{
if (NULL != p->name) {
free(p->name);
} }
if (NULL != p->codes) { if (NULL != p->codes) {
free(p->codes); free(p->codes);
} }
} }
PMIX_CLASS_INSTANCE(pmix_multi_event_t, PMIX_CLASS_INSTANCE(pmix_event_hdlr_t,
pmix_list_item_t, pmix_list_item_t,
mevcon, mevdes); sevcon, sevdes);
static void devcon(pmix_default_event_t *p) static void accon(pmix_active_code_t *p)
{ {
p->name = NULL; p->nregs = 0;
p->evhdlr = NULL;
p->cbobject = NULL;
} }
static void devdes(pmix_default_event_t *p)
{
if (NULL != p->name) {
free(p->name);
}
}
PMIX_CLASS_INSTANCE(pmix_default_event_t,
pmix_list_item_t,
devcon, devdes);
PMIX_CLASS_INSTANCE(pmix_active_code_t, PMIX_CLASS_INSTANCE(pmix_active_code_t,
pmix_list_item_t, pmix_list_item_t,
NULL, NULL); accon, NULL);
static void evcon(pmix_events_t *p) static void evcon(pmix_events_t *p)
{ {
p->nhdlrs = 0; p->nhdlrs = 0;
p->first = NULL;
p->last = NULL;
PMIX_CONSTRUCT(&p->actives, pmix_list_t); PMIX_CONSTRUCT(&p->actives, pmix_list_t);
PMIX_CONSTRUCT(&p->single_events, pmix_list_t); PMIX_CONSTRUCT(&p->single_events, pmix_list_t);
PMIX_CONSTRUCT(&p->multi_events, pmix_list_t); PMIX_CONSTRUCT(&p->multi_events, pmix_list_t);
@ -684,6 +813,12 @@ static void evcon(pmix_events_t *p)
} }
static void evdes(pmix_events_t *p) static void evdes(pmix_events_t *p)
{ {
if (NULL != p->first) {
PMIX_RELEASE(p->first);
}
if (NULL != p->last) {
PMIX_RELEASE(p->last);
}
PMIX_LIST_DESTRUCT(&p->actives); PMIX_LIST_DESTRUCT(&p->actives);
PMIX_LIST_DESTRUCT(&p->single_events); PMIX_LIST_DESTRUCT(&p->single_events);
PMIX_LIST_DESTRUCT(&p->multi_events); PMIX_LIST_DESTRUCT(&p->multi_events);
@ -698,14 +833,13 @@ static void chcon(pmix_event_chain_t *p)
memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1); memset(p->source.nspace, 0, PMIX_MAX_NSLEN+1);
p->source.rank = PMIX_RANK_UNDEF; p->source.rank = PMIX_RANK_UNDEF;
p->nondefault = false; p->nondefault = false;
p->endchain = false;
p->range = PMIX_RANGE_UNDEF; p->range = PMIX_RANGE_UNDEF;
p->info = NULL; p->info = NULL;
p->ninfo = 0; p->ninfo = 0;
p->results = NULL; p->results = NULL;
p->nresults = 0; p->nresults = 0;
p->sing = NULL; p->evhdlr = NULL;
p->multi = NULL;
p->def = NULL;
p->final_cbfunc = NULL; p->final_cbfunc = NULL;
p->final_cbdata = NULL; p->final_cbdata = NULL;
} }

Просмотреть файл

@ -22,39 +22,46 @@
#include "src/client/pmix_client_ops.h" #include "src/client/pmix_client_ops.h"
#include "src/server/pmix_server_ops.h" #include "src/server/pmix_server_ops.h"
#include "src/include/pmix_globals.h" #include "src/include/pmix_globals.h"
#include "src/event/pmix_event.h"
typedef struct { typedef struct {
pmix_object_t super; pmix_object_t super;
volatile bool active;
pmix_event_t ev;
size_t index; size_t index;
bool firstoverall;
bool enviro;
pmix_list_t *list; pmix_list_t *list;
pmix_list_item_t *item; pmix_event_hdlr_t *hdlr;
pmix_shift_caddy_t *cd; void *cd;
pmix_status_t *codes; pmix_status_t *codes;
size_t ncodes; size_t ncodes;
pmix_info_t *info; pmix_info_t *info;
size_t ninfo; size_t ninfo;
pmix_notification_fn_t evhdlr;
pmix_evhdlr_reg_cbfunc_t evregcbfn;
void *cbdata;
} pmix_rshift_caddy_t; } pmix_rshift_caddy_t;
static void rscon(pmix_rshift_caddy_t *p) static void rscon(pmix_rshift_caddy_t *p)
{ {
p->firstoverall = false;
p->enviro = false;
p->list = NULL; p->list = NULL;
p->item = NULL; p->hdlr = NULL;
p->cd = NULL; p->cd = NULL;
p->codes = NULL; p->codes = NULL;
p->ncodes = 0; p->ncodes = 0;
p->info = NULL; p->info = NULL;
p->ninfo = 0; p->ninfo = 0;
p->evhdlr = NULL;
p->evregcbfn = NULL;
p->cbdata = NULL;
} }
static void rsdes(pmix_rshift_caddy_t *p) static void rsdes(pmix_rshift_caddy_t *p)
{ {
if (NULL != p->cd) { if (NULL != p->cd) {
PMIX_RELEASE(p->cd); PMIX_RELEASE(p->cd);
} }
if (NULL != p->codes) {
free(p->codes);
}
if (NULL != p->info) {
PMIX_INFO_FREE(p->info, p->ninfo);
}
} }
PMIX_CLASS_INSTANCE(pmix_rshift_caddy_t, PMIX_CLASS_INSTANCE(pmix_rshift_caddy_t,
pmix_object_t, pmix_object_t,
@ -65,6 +72,7 @@ static void regevents_cbfunc(struct pmix_peer_t *peer, pmix_ptl_hdr_t *hdr,
pmix_buffer_t *buf, void *cbdata) pmix_buffer_t *buf, void *cbdata)
{ {
pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata; pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata;
pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rb->cd;
pmix_status_t rc, ret; pmix_status_t rc, ret;
int cnt; int cnt;
size_t index = rb->index; size_t index = rb->index;
@ -78,17 +86,34 @@ static void regevents_cbfunc(struct pmix_peer_t *peer, pmix_ptl_hdr_t *hdr,
(PMIX_SUCCESS != ret)) { (PMIX_SUCCESS != ret)) {
PMIX_ERROR_LOG(rc); PMIX_ERROR_LOG(rc);
/* remove the err handler and call the error handler reg completion callback fn.*/ /* remove the err handler and call the error handler reg completion callback fn.*/
if (NULL != rb->list && NULL != rb->item) { if (NULL == rb->list) {
pmix_list_remove_item(rb->list, rb->item); if (NULL != rb->hdlr) {
PMIX_RELEASE(rb->item); PMIX_RELEASE(rb->hdlr);
}
if (rb->firstoverall) {
pmix_globals.events.first = NULL;
} else {
pmix_globals.events.last = NULL;
}
} else if (NULL != rb->hdlr) {
pmix_list_remove_item(rb->list, &rb->hdlr->super);
PMIX_RELEASE(rb->hdlr);
} }
ret = PMIX_ERR_SERVER_FAILED_REQUEST; ret = PMIX_ERR_SERVER_FAILED_REQUEST;
index = UINT_MAX; index = UINT_MAX;
} }
/* call the callback */ /* call the callback */
if (NULL != rb->cd && NULL != rb->cd->cbfunc.evregcbfn) { if (NULL != cd && NULL != cd->evregcbfn) {
rb->cd->cbfunc.evregcbfn(ret, index, rb->cd->cbdata); cd->evregcbfn(ret, index, cd->cbdata);
}
/* release any info we brought along as they are
* internally generated and not provided by the caller */
if (NULL!= rb->info) {
PMIX_INFO_FREE(rb->info, rb->ninfo);
}
if (NULL != rb->codes) {
free(rb->codes);
} }
PMIX_RELEASE(rb); PMIX_RELEASE(rb);
} }
@ -96,29 +121,47 @@ static void regevents_cbfunc(struct pmix_peer_t *peer, pmix_ptl_hdr_t *hdr,
static void reg_cbfunc(pmix_status_t status, void *cbdata) static void reg_cbfunc(pmix_status_t status, void *cbdata)
{ {
pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata; pmix_rshift_caddy_t *rb = (pmix_rshift_caddy_t*)cbdata;
pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rb->cd;
pmix_status_t rc = status; pmix_status_t rc = status;
size_t index = rb->index; size_t index = rb->index;
if (PMIX_SUCCESS != status) { if (PMIX_SUCCESS != status) {
/* if we failed to register, then remove this event */ /* if we failed to register, then remove this event */
if (NULL != rb->list && NULL != rb->item) { if (NULL == rb->list) {
pmix_list_remove_item(rb->list, rb->item); if (NULL != rb->hdlr) {
PMIX_RELEASE(rb->item); PMIX_RELEASE(rb->hdlr);
rc = PMIX_ERR_SERVER_FAILED_REQUEST; }
index = UINT_MAX; if (rb->firstoverall) {
pmix_globals.events.first = NULL;
} else {
pmix_globals.events.last = NULL;
}
} else if (NULL != rb->hdlr) {
pmix_list_remove_item(rb->list, &rb->hdlr->super);
PMIX_RELEASE(rb->hdlr);
} }
rc = PMIX_ERR_SERVER_FAILED_REQUEST;
index = UINT_MAX;
} }
if (NULL != rb->cd && NULL != rb->cd->cbfunc.evregcbfn) { if (NULL != cd && NULL != cd->evregcbfn) {
/* pass back our local index */ /* pass back our local index */
rb->cd->cbfunc.evregcbfn(rc, index, rb->cd->cbdata); cd->evregcbfn(rc, index, cd->cbdata);
}
/* release any info we brought along as they are
* internally generated and not provided by the caller */
if (NULL!= rb->info) {
PMIX_INFO_FREE(rb->info, rb->ninfo);
}
if (NULL != rb->codes) {
free(rb->codes);
} }
PMIX_RELEASE(rb); PMIX_RELEASE(rb);
} }
static pmix_status_t _send_to_server(pmix_rshift_caddy_t *rcd) static pmix_status_t _send_to_server(pmix_rshift_caddy_t *rcd)
{ {
pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)rcd->cd;
pmix_status_t rc; pmix_status_t rc;
pmix_buffer_t *msg; pmix_buffer_t *msg;
pmix_cmd_t cmd=PMIX_REGEVENTS_CMD; pmix_cmd_t cmd=PMIX_REGEVENTS_CMD;
@ -130,13 +173,13 @@ static pmix_status_t _send_to_server(pmix_rshift_caddy_t *rcd)
return rc; return rc;
} }
/* pack the number of codes */ /* pack the number of codes */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &rcd->cd->ncodes, 1, PMIX_SIZE))) { if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &cd->ncodes, 1, PMIX_SIZE))) {
PMIX_ERROR_LOG(rc); PMIX_ERROR_LOG(rc);
return rc; return rc;
} }
/* pack any provided codes - may be NULL */ /* pack any provided codes - may be NULL */
if (NULL != rcd->cd->codes && 0 < rcd->cd->ncodes) { if (NULL != cd->codes && 0 < cd->ncodes) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, rcd->cd->codes, rcd->cd->ncodes, PMIX_STATUS))) { if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, cd->codes, cd->ncodes, PMIX_STATUS))) {
PMIX_ERROR_LOG(rc); PMIX_ERROR_LOG(rc);
return rc; return rc;
} }
@ -163,9 +206,7 @@ static pmix_status_t _send_to_server(pmix_rshift_caddy_t *rcd)
return rc; return rc;
} }
static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item, static pmix_status_t _add_hdlr(pmix_rshift_caddy_t *cd, pmix_list_t *xfer)
size_t index, bool prepend, pmix_list_t *xfer,
pmix_shift_caddy_t *cd)
{ {
pmix_rshift_caddy_t *cd2; pmix_rshift_caddy_t *cd2;
pmix_info_caddy_t *ixfer; pmix_info_caddy_t *ixfer;
@ -177,12 +218,6 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: _add_hdlr"); "pmix: _add_hdlr");
if (prepend) {
pmix_list_prepend(list, item);
} else {
pmix_list_append(list, item);
}
/* check to see if we have an active registration on these codes */ /* check to see if we have an active registration on these codes */
if (NULL == cd->codes) { if (NULL == cd->codes) {
registered = false; registered = false;
@ -190,15 +225,15 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
if (PMIX_MAX_ERR_CONSTANT == active->code) { if (PMIX_MAX_ERR_CONSTANT == active->code) {
/* we have registered a default */ /* we have registered a default */
registered = true; registered = true;
++active->nregs;
break; break;
} }
} }
if (!registered) { if (!registered) {
active = PMIX_NEW(pmix_active_code_t); active = PMIX_NEW(pmix_active_code_t);
active->code = PMIX_MAX_ERR_CONSTANT; active->code = PMIX_MAX_ERR_CONSTANT;
active->nregs = 1;
pmix_list_append(&pmix_globals.events.actives, &active->super); pmix_list_append(&pmix_globals.events.actives, &active->super);
/* ensure we register it */
need_register = true;
} }
} else { } else {
for (n=0; n < cd->ncodes; n++) { for (n=0; n < cd->ncodes; n++) {
@ -206,12 +241,14 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) { PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
if (active->code == cd->codes[n]) { if (active->code == cd->codes[n]) {
registered = true; registered = true;
++active->nregs;
break; break;
} }
} }
if (!registered) { if (!registered) {
active = PMIX_NEW(pmix_active_code_t); active = PMIX_NEW(pmix_active_code_t);
active->code = cd->codes[n]; active->code = cd->codes[n];
active->nregs = 1;
pmix_list_append(&pmix_globals.events.actives, &active->super); pmix_list_append(&pmix_globals.events.actives, &active->super);
/* ensure we register it */ /* ensure we register it */
need_register = true; need_register = true;
@ -221,9 +258,10 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
/* prep next step */ /* prep next step */
cd2 = PMIX_NEW(pmix_rshift_caddy_t); cd2 = PMIX_NEW(pmix_rshift_caddy_t);
cd2->index = index; cd2->index = cd->index;
cd2->list = list; cd2->firstoverall = cd->firstoverall;
cd2->item = item; cd2->list = cd->list;
cd2->hdlr = cd->hdlr;
PMIX_RETAIN(cd); PMIX_RETAIN(cd);
cd2->cd = cd; cd2->cd = cd;
cd2->ninfo = pmix_list_get_size(xfer); cd2->ninfo = pmix_list_get_size(xfer);
@ -249,9 +287,10 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
if (PMIX_SUCCESS != (rc = _send_to_server(cd2))) { if (PMIX_SUCCESS != (rc = _send_to_server(cd2))) {
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: add_hdlr - pack send_to_server failed status=%d", rc); "pmix: add_hdlr - pack send_to_server failed status=%d", rc);
if (NULL != cd2->info) {
PMIX_INFO_FREE(cd2->info, cd2->ninfo);
}
PMIX_RELEASE(cd2); PMIX_RELEASE(cd2);
pmix_list_remove_item(list, item);
PMIX_RELEASE(item);
return rc; return rc;
} }
return PMIX_ERR_WOULD_BLOCK; return PMIX_ERR_WOULD_BLOCK;
@ -267,13 +306,17 @@ static pmix_status_t _add_hdlr(pmix_list_t *list, pmix_list_item_t *item,
if (PMIX_SUCCESS != (rc = pmix_host_server.register_events(cd->codes, cd->ncodes, if (PMIX_SUCCESS != (rc = pmix_host_server.register_events(cd->codes, cd->ncodes,
cd2->info, cd2->ninfo, cd2->info, cd2->ninfo,
reg_cbfunc, cd2))) { reg_cbfunc, cd2))) {
if (NULL != cd2->info) {
PMIX_INFO_FREE(cd2->info, cd2->ninfo);
}
PMIX_RELEASE(cd2); PMIX_RELEASE(cd2);
pmix_list_remove_item(list, item);
PMIX_RELEASE(item);
return rc; return rc;
} }
return PMIX_ERR_WOULD_BLOCK; return PMIX_ERR_WOULD_BLOCK;
} else { } else {
if (NULL != cd2->info) {
PMIX_INFO_FREE(cd2->info, cd2->ninfo);
}
PMIX_RELEASE(cd2); PMIX_RELEASE(cd2);
} }
@ -284,15 +327,18 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
{ {
size_t index = 0, n; size_t index = 0, n;
pmix_status_t rc; pmix_status_t rc;
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata; pmix_rshift_caddy_t *cd = (pmix_rshift_caddy_t*)cbdata;
pmix_single_event_t *sing; pmix_event_hdlr_t *evhdlr, *ev;
pmix_multi_event_t *multi; uint8_t location = PMIX_EVENT_ORDER_NONE;
pmix_default_event_t *def; char *name = NULL, *locator = NULL;
bool prepend = false; bool firstoverall=false, lastoverall=false;
char *name = NULL; bool found;
pmix_list_t xfer; pmix_list_t xfer;
pmix_info_caddy_t *ixfer; pmix_info_caddy_t *ixfer;
void *cbobject = NULL; void *cbobject = NULL;
pmix_data_range_t range = PMIX_RANGE_UNDEF;
pmix_proc_t *parray = NULL;
size_t nprocs;
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: register event_hdlr with %d infos", (int)cd->ninfo); "pmix: register event_hdlr with %d infos", (int)cd->ninfo);
@ -302,16 +348,60 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
/* if directives were included */ /* if directives were included */
if (NULL != cd->info) { if (NULL != cd->info) {
for (n=0; n < cd->ninfo; n++) { for (n=0; n < cd->ninfo; n++) {
if (0 == strcmp(cd->info[n].key, PMIX_EVENT_ORDER_PREPEND)) { if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_FIRST, PMIX_MAX_KEYLEN)) {
/* flag if they asked to prepend this event /* flag if they asked to put this one first overall */
* on the precedence order */ if (PMIX_UNDEF == cd->info[n].value.type ||
prepend = true; cd->info[n].value.data.flag) {
} else if (0 == strcmp(cd->info[n].key, PMIX_EVENT_HDLR_NAME)) { firstoverall = true;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_LAST, PMIX_MAX_KEYLEN)) {
/* flag if they asked to put this one last overall */
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
lastoverall = true;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_PREPEND, PMIX_MAX_KEYLEN)) {
/* flag if they asked to prepend this handler */
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
location = PMIX_EVENT_ORDER_PREPEND;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_APPEND, PMIX_MAX_KEYLEN)) {
/* flag if they asked to append this handler */
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
location = PMIX_EVENT_ORDER_APPEND;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_NAME, PMIX_MAX_KEYLEN)) {
name = cd->info[n].value.data.string; name = cd->info[n].value.data.string;
} else if (0 == strcmp(cd->info[n].key, PMIX_EVENT_ENVIRO_LEVEL)) { } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_ENVIRO_LEVEL, PMIX_MAX_KEYLEN)) {
cd->enviro = cd->info[n].value.data.flag; if (PMIX_UNDEF == cd->info[n].value.type ||
} else if (0 == strcmp(cd->info[n].key, PMIX_EVENT_RETURN_OBJECT)) { cd->info[n].value.data.flag) {
cd->enviro = true;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) {
cbobject = cd->info[n].value.data.ptr; cbobject = cd->info[n].value.data.ptr;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_FIRST_IN_CATEGORY, PMIX_MAX_KEYLEN)) {
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
location = PMIX_EVENT_ORDER_FIRST;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_LAST_IN_CATEGORY, PMIX_MAX_KEYLEN)) {
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
location = PMIX_EVENT_ORDER_LAST;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_BEFORE, PMIX_MAX_KEYLEN)) {
location = PMIX_EVENT_ORDER_BEFORE;
locator = cd->info[n].value.data.string;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_HDLR_AFTER, PMIX_MAX_KEYLEN)) {
location = PMIX_EVENT_ORDER_AFTER;
locator = cd->info[n].value.data.string;
} else if (0 == strncmp(cd->info[n].key, PMIX_RANGE, PMIX_MAX_KEYLEN)) {
range = cd->info[n].value.data.range;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
parray = (pmix_proc_t*)cd->info[n].value.data.darray->array;
nprocs = cd->info[n].value.data.darray->size;
} else { } else {
ixfer = PMIX_NEW(pmix_info_caddy_t); ixfer = PMIX_NEW(pmix_info_caddy_t);
ixfer->info = &cd->info[n]; ixfer->info = &cd->info[n];
@ -320,51 +410,62 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
} }
} }
/* if the code array is NULL, then this is a default event /* if they indicated this is to be the "first" or "last" event, then
* registration request */ * first check to ensure they didn't already direct some
if (NULL == cd->codes) { * other event into the same cherished position */
def = PMIX_NEW(pmix_default_event_t); if (firstoverall || lastoverall) {
if (NULL != name) { if ((firstoverall && NULL != pmix_globals.events.first) ||
def->name = strdup(name); (lastoverall && NULL != pmix_globals.events.last)) {
} /* oops - someone already took that position */
index = pmix_globals.events.nhdlrs;
++pmix_globals.events.nhdlrs;
def->index = index;
def->evhdlr = cd->evhdlr;
def->cbobject = cbobject;
rc = _add_hdlr(&pmix_globals.events.default_events, &def->super,
index, prepend, &xfer, cd);
PMIX_LIST_DESTRUCT(&xfer);
if (PMIX_SUCCESS != rc &&
PMIX_ERR_WOULD_BLOCK != rc) {
/* unable to register */
--pmix_globals.events.nhdlrs;
rc = PMIX_ERR_EVENT_REGISTRATION;
index = UINT_MAX; index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
goto ack; goto ack;
} }
if (PMIX_ERR_WOULD_BLOCK == rc) { evhdlr = PMIX_NEW(pmix_event_hdlr_t);
/* the callback will provide our response */ if (NULL == evhdlr) {
PMIX_RELEASE(cd); index = UINT_MAX;
return; rc = PMIX_ERR_EVENT_REGISTRATION;
goto ack;
} }
goto ack;
}
/* if there is only one code, then this is a single event registration */
if (1 == cd->ncodes) {
sing = PMIX_NEW(pmix_single_event_t);
if (NULL != name) { if (NULL != name) {
sing->name = strdup(name); evhdlr->name = strdup(name);
} }
sing->code = cd->codes[0];
index = pmix_globals.events.nhdlrs; index = pmix_globals.events.nhdlrs;
sing->index = index; evhdlr->index = index;
sing->evhdlr = cd->evhdlr; evhdlr->rng.range = range;
++pmix_globals.events.nhdlrs; if (NULL != parray) {
sing->cbobject = cbobject; evhdlr->rng.nprocs = nprocs;
rc = _add_hdlr(&pmix_globals.events.single_events, &sing->super, PMIX_PROC_CREATE(evhdlr->rng.procs, nprocs);
index, prepend, &xfer, cd); if (NULL == evhdlr->rng.procs) {
index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
PMIX_RELEASE(evhdlr);
goto ack;
}
memcpy(evhdlr->rng.procs, parray, nprocs * sizeof(pmix_proc_t));
}
evhdlr->evhdlr = cd->evhdlr;
evhdlr->cbobject = cbobject;
if (NULL != cd->codes) {
evhdlr->codes = (pmix_status_t*)malloc(cd->ncodes * sizeof(pmix_status_t));
if (NULL == evhdlr->codes) {
PMIX_RELEASE(evhdlr);
index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
goto ack;
}
memcpy(evhdlr->codes, cd->codes, cd->ncodes * sizeof(pmix_status_t));
}
if (firstoverall) {
pmix_globals.events.first = evhdlr;
} else {
pmix_globals.events.last = evhdlr;
}
cd->index = index;
cd->list = NULL;
cd->hdlr = evhdlr;
cd->firstoverall = firstoverall;
rc = _add_hdlr(cd, &xfer);
PMIX_LIST_DESTRUCT(&xfer); PMIX_LIST_DESTRUCT(&xfer);
if (PMIX_SUCCESS != rc && if (PMIX_SUCCESS != rc &&
PMIX_ERR_WOULD_BLOCK != rc) { PMIX_ERR_WOULD_BLOCK != rc) {
@ -372,6 +473,12 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
--pmix_globals.events.nhdlrs; --pmix_globals.events.nhdlrs;
rc = PMIX_ERR_EVENT_REGISTRATION; rc = PMIX_ERR_EVENT_REGISTRATION;
index = UINT_MAX; index = UINT_MAX;
if (firstoverall) {
pmix_globals.events.first = NULL;
} else {
pmix_globals.events.last = NULL;
}
PMIX_RELEASE(evhdlr);
goto ack; goto ack;
} }
if (PMIX_ERR_WOULD_BLOCK == rc) { if (PMIX_ERR_WOULD_BLOCK == rc) {
@ -382,30 +489,164 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
goto ack; goto ack;
} }
/* must be a multi-code registration */ /* get here if this isn't an overall first or last event - start
multi = PMIX_NEW(pmix_multi_event_t); * by creating an event */
if (NULL != name) { evhdlr = PMIX_NEW(pmix_event_hdlr_t);
multi->name = strdup(name); if (NULL == evhdlr) {
index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
goto ack;
}
if (NULL != name) {
evhdlr->name = strdup(name);
} }
multi->codes = (pmix_status_t*)malloc(cd->ncodes * sizeof(pmix_status_t));
multi->ncodes = cd->ncodes;
memcpy(multi->codes, cd->codes, cd->ncodes * sizeof(pmix_status_t));
index = pmix_globals.events.nhdlrs; index = pmix_globals.events.nhdlrs;
multi->index = index; evhdlr->index = index;
multi->evhdlr = cd->evhdlr; evhdlr->precedence = location;
++pmix_globals.events.nhdlrs; evhdlr->locator = locator;
multi->cbobject = cbobject; evhdlr->rng.range = range;
rc = _add_hdlr(&pmix_globals.events.multi_events, &multi->super, if (NULL != parray) {
index, prepend, &xfer, cd); evhdlr->rng.nprocs = nprocs;
PMIX_PROC_CREATE(evhdlr->rng.procs, nprocs);
if (NULL == evhdlr->rng.procs) {
index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
PMIX_RELEASE(evhdlr);
goto ack;
}
memcpy(evhdlr->rng.procs, parray, nprocs * sizeof(pmix_proc_t));
}
evhdlr->evhdlr = cd->evhdlr;
evhdlr->cbobject = cbobject;
if (NULL == cd->codes) {
/* this is a default handler */
cd->list = &pmix_globals.events.default_events;
} else {
evhdlr->codes = (pmix_status_t*)malloc(cd->ncodes * sizeof(pmix_status_t));
if (NULL == evhdlr->codes) {
PMIX_RELEASE(evhdlr);
index = UINT_MAX;
rc = PMIX_ERR_EVENT_REGISTRATION;
goto ack;
}
memcpy(evhdlr->codes, cd->codes, cd->ncodes * sizeof(pmix_status_t));
if (1 == cd->ncodes) {
cd->list = &pmix_globals.events.single_events;
} else {
cd->list = &pmix_globals.events.multi_events;
}
}
/* setup to add the handler */
cd->index = index;
cd->hdlr = evhdlr;
cd->firstoverall = false;
/* tell the server about it, if necessary - any actions
* will be deferred until after this event completes */
if (PMIX_RANGE_PROC_LOCAL == range) {
rc = PMIX_SUCCESS;
} else {
rc = _add_hdlr(cd, &xfer);
}
PMIX_LIST_DESTRUCT(&xfer); PMIX_LIST_DESTRUCT(&xfer);
if (PMIX_SUCCESS != rc && if (PMIX_SUCCESS != rc &&
PMIX_ERR_WOULD_BLOCK != rc) { PMIX_ERR_WOULD_BLOCK != rc) {
/* unable to register */ /* unable to register */
--pmix_globals.events.nhdlrs; --pmix_globals.events.nhdlrs;
rc = PMIX_ERR_EVENT_REGISTRATION; rc = PMIX_ERR_EVENT_REGISTRATION;
index = UINT_MAX; index = UINT_MAX;
PMIX_RELEASE(evhdlr);
goto ack; goto ack;
} }
/* now add this event to the appropriate list - if the registration
* subsequently fails, it will be removed */
/* if the list is empty, or no location was specified, just put this on it */
if (0 == pmix_list_get_size(cd->list) ||
PMIX_EVENT_ORDER_NONE == location) {
pmix_list_prepend(cd->list, &evhdlr->super);
} else if (PMIX_EVENT_ORDER_FIRST == location) {
/* see if the first handler on the list was also declared as "first" */
ev = (pmix_event_hdlr_t*)pmix_list_get_first(cd->list);
if (PMIX_EVENT_ORDER_FIRST == ev->precedence) {
/* this is an error */
--pmix_globals.events.nhdlrs;
rc = PMIX_ERR_EVENT_REGISTRATION;
index = UINT_MAX;
PMIX_RELEASE(evhdlr);
goto ack;
}
/* prepend it to the list */
pmix_list_prepend(cd->list, &evhdlr->super);
} else if (PMIX_EVENT_ORDER_LAST == location) {
/* see if the last handler on the list was also declared as "last" */
ev = (pmix_event_hdlr_t*)pmix_list_get_last(cd->list);
if (PMIX_EVENT_ORDER_LAST == ev->precedence) {
/* this is an error */
--pmix_globals.events.nhdlrs;
rc = PMIX_ERR_EVENT_REGISTRATION;
index = UINT_MAX;
PMIX_RELEASE(evhdlr);
goto ack;
}
/* append it to the list */
pmix_list_append(cd->list, &evhdlr->super);
} else if (PMIX_EVENT_ORDER_PREPEND == location) {
/* we know the list isn't empty - check the first element to see if
* it is designated to be "first". If so, then we need to put this
* right after it */
ev = (pmix_event_hdlr_t*)pmix_list_get_first(cd->list);
if (PMIX_EVENT_ORDER_FIRST == ev->precedence) {
ev = (pmix_event_hdlr_t*)pmix_list_get_next(&ev->super);
if (NULL != ev) {
pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
} else {
/* we are at the end of the list */
pmix_list_append(cd->list, &evhdlr->super);
}
} else {
pmix_list_prepend(cd->list, &evhdlr->super);
}
} else if (PMIX_EVENT_ORDER_APPEND == location) {
/* we know the list isn't empty - check the last element to see if
* it is designated to be "last". If so, then we need to put this
* right before it */
ev = (pmix_event_hdlr_t*)pmix_list_get_last(cd->list);
if (PMIX_EVENT_ORDER_LAST == ev->precedence) {
pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
} else {
pmix_list_append(cd->list, &evhdlr->super);
}
} else {
/* find the named event */
found = false;
PMIX_LIST_FOREACH(ev, cd->list, pmix_event_hdlr_t) {
if (NULL == ev->name) {
continue;
}
if (0 == strcmp(ev->name, name)) {
if (PMIX_EVENT_ORDER_BEFORE == location) {
/* put it before this handler */
pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
} else {
/* put it after this handler */
ev = (pmix_event_hdlr_t*)pmix_list_get_next(&ev->super);
if (NULL != ev) {
pmix_list_insert_pos(cd->list, &ev->super, &evhdlr->super);
} else {
/* we are at the end of the list */
pmix_list_append(cd->list, &evhdlr->super);
}
}
found = true;
break;
}
}
/* if the handler wasn't found, then it may show up later - so
* for now just prepend it to the list */
if (!found) {
pmix_list_prepend(cd->list, &evhdlr->super);
}
}
if (PMIX_ERR_WOULD_BLOCK == rc) { if (PMIX_ERR_WOULD_BLOCK == rc) {
/* the callback will provide our response */ /* the callback will provide our response */
PMIX_RELEASE(cd); PMIX_RELEASE(cd);
@ -415,7 +656,9 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
ack: ack:
/* acknowledge the registration so the caller can release /* acknowledge the registration so the caller can release
* their data */ * their data */
cd->cbfunc.evregcbfn(rc, index, cd->cbdata); if (NULL != cd->evregcbfn) {
cd->evregcbfn(rc, index, cd->cbdata);
}
PMIX_RELEASE(cd); PMIX_RELEASE(cd);
} }
@ -426,17 +669,17 @@ PMIX_EXPORT void PMIx_Register_event_handler(pmix_status_t codes[], size_t ncode
pmix_evhdlr_reg_cbfunc_t cbfunc, pmix_evhdlr_reg_cbfunc_t cbfunc,
void *cbdata) void *cbdata)
{ {
pmix_shift_caddy_t *cd; pmix_rshift_caddy_t *cd;
/* need to thread shift this request so we can access /* need to thread shift this request so we can access
* our global data to register this *local* event handler */ * our global data to register this *local* event handler */
cd = PMIX_NEW(pmix_shift_caddy_t); cd = PMIX_NEW(pmix_rshift_caddy_t);
cd->codes = codes; cd->codes = codes;
cd->ncodes = ncodes; cd->ncodes = ncodes;
cd->info = info; cd->info = info;
cd->ninfo = ninfo; cd->ninfo = ninfo;
cd->evhdlr = event_hdlr; cd->evhdlr = event_hdlr;
cd->cbfunc.errregcbfn = cbfunc; cd->evregcbfn = cbfunc;
cd->cbdata = cbdata; cd->cbdata = cbdata;
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
@ -449,14 +692,12 @@ static void dereg_event_hdlr(int sd, short args, void *cbdata)
{ {
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata; pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
pmix_buffer_t *msg = NULL; pmix_buffer_t *msg = NULL;
pmix_single_event_t *sing, *s2; pmix_event_hdlr_t *evhdlr, *ev;
pmix_multi_event_t *multi, *m2;
pmix_default_event_t *def;
pmix_cmd_t cmd = PMIX_DEREGEVENTS_CMD; pmix_cmd_t cmd = PMIX_DEREGEVENTS_CMD;
pmix_status_t rc = PMIX_SUCCESS; pmix_status_t rc = PMIX_SUCCESS;
pmix_status_t wildcard = PMIX_MAX_ERR_CONSTANT; pmix_status_t wildcard = PMIX_MAX_ERR_CONSTANT;
size_t n; size_t n;
bool found, foundcode; pmix_active_code_t *active;
/* if I am not the server, then I need to notify the server /* if I am not the server, then I need to notify the server
* to remove my registration */ * to remove my registration */
@ -468,101 +709,130 @@ static void dereg_event_hdlr(int sd, short args, void *cbdata)
} }
} }
/* the registration can be in any of three places, so check them all */ /* check the first and last locations */
PMIX_LIST_FOREACH(def, &pmix_globals.events.default_events, pmix_default_event_t) { if (NULL != pmix_globals.events.first ||
if (def->index == cd->ref) { NULL != pmix_globals.events.last) {
if (pmix_globals.events.first->index == cd->ref ||
pmix_globals.events.last->index == cd->ref) {
/* found it */ /* found it */
pmix_list_remove_item(&pmix_globals.events.default_events, &def->super); if (pmix_globals.events.first->index == cd->ref) {
ev = pmix_globals.events.first;
} else {
ev = pmix_globals.events.last;
}
if (NULL != msg) {
/* if this is a default handler, see if any other default
* handlers remain */
if (NULL == ev->codes) {
if (0 == pmix_list_get_size(&pmix_globals.events.default_events)) {
/* tell the server to dereg our default handler */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &wildcard, 1, PMIX_STATUS))) {
PMIX_RELEASE(msg);
goto cleanup;
}
}
} else {
for (n=0; n < ev->ncodes; n++) {
/* see if this is the last registration we have for this code */
PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
if (active->code == ev->codes[n]) {
--active->nregs;
if (0 == active->nregs) {
pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
/* tell the server to dereg this code */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &active->code, 1, PMIX_STATUS))) {
PMIX_RELEASE(active);
PMIX_RELEASE(msg);
goto cleanup;
}
PMIX_RELEASE(active);
}
break;
}
}
}
}
}
if (pmix_globals.events.first->index == cd->ref) {
pmix_globals.events.first = NULL;
} else {
pmix_globals.events.last = NULL;
}
PMIX_RELEASE(ev);
goto cleanup;
}
}
/* the registration can be in any of three places, so check each of them */
PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.default_events, pmix_event_hdlr_t) {
if (evhdlr->index == cd->ref) {
/* found it */
pmix_list_remove_item(&pmix_globals.events.default_events, &evhdlr->super);
if (NULL != msg) { if (NULL != msg) {
/* if there are no more default handlers registered, tell /* if there are no more default handlers registered, tell
* the server to dereg the default handler */ * the server to dereg the default handler */
if (0 == pmix_list_get_size(&pmix_globals.events.default_events)) { if (0 == pmix_list_get_size(&pmix_globals.events.default_events)) {
n = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &n, 1, PMIX_SIZE))) {
PMIX_RELEASE(msg);
goto cleanup;
}
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &wildcard, 1, PMIX_STATUS))) { if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &wildcard, 1, PMIX_STATUS))) {
PMIX_RELEASE(msg); PMIX_RELEASE(msg);
goto cleanup; goto cleanup;
} }
} }
} }
PMIX_RELEASE(def); PMIX_RELEASE(evhdlr);
goto report; goto report;
} }
} }
PMIX_LIST_FOREACH(sing, &pmix_globals.events.single_events, pmix_single_event_t) { PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.single_events, pmix_event_hdlr_t) {
if (sing->index == cd->ref) { if (evhdlr->index == cd->ref) {
/* found it */ /* found it */
pmix_list_remove_item(&pmix_globals.events.single_events, &sing->super); pmix_list_remove_item(&pmix_globals.events.single_events, &evhdlr->super);
if (NULL != msg) { if (NULL != msg) {
/* if there are no more handlers registered for this code, tell /* see if this is the last registration we have for this code */
* the server to dereg the handler for this code */ PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
found = false; if (active->code == evhdlr->codes[0]) {
PMIX_LIST_FOREACH(s2, &pmix_globals.events.single_events, pmix_single_event_t) { --active->nregs;
if (s2->code == sing->code) { if (0 == active->nregs) {
found = true; pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
break; /* tell the server to dereg this code */
} if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &active->code, 1, PMIX_STATUS))) {
} PMIX_RELEASE(active);
if (!found) { PMIX_RELEASE(msg);
n = 1; goto cleanup;
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &n, 1, PMIX_SIZE))) { }
PMIX_RELEASE(msg); PMIX_RELEASE(active);
PMIX_RELEASE(sing);
goto cleanup;
}
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &sing->code, 1, PMIX_STATUS))) {
PMIX_RELEASE(msg);
PMIX_RELEASE(sing);
goto cleanup;
}
}
}
PMIX_RELEASE(sing);
goto report;
}
}
PMIX_LIST_FOREACH(multi, &pmix_globals.events.multi_events, pmix_multi_event_t) {
if (multi->index == cd->ref) {
/* found it */
pmix_list_remove_item(&pmix_globals.events.multi_events, &multi->super);
if (NULL != msg) {
/* if there are no more handlers registered for this code, tell
* the server to dereg the handler for this code */
found = false;
PMIX_LIST_FOREACH(m2, &pmix_globals.events.multi_events, pmix_multi_event_t) {
if (m2->ncodes != multi->ncodes) {
continue;
}
foundcode = true;
for (n=0; n < multi->ncodes; n++) {
if (m2->codes[n] != multi->codes[n]) {
foundcode = false;
break;
} }
}
if (foundcode) {
found = true;
break; break;
} }
} }
if (!found) { }
n = multi->ncodes; PMIX_RELEASE(evhdlr);
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &n, 1, PMIX_SIZE))) { goto report;
PMIX_RELEASE(msg); }
PMIX_RELEASE(multi); }
goto cleanup; PMIX_LIST_FOREACH(evhdlr, &pmix_globals.events.multi_events, pmix_event_hdlr_t) {
} if (evhdlr->index == cd->ref) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &multi->codes, n, PMIX_STATUS))) { /* found it */
PMIX_RELEASE(msg); pmix_list_remove_item(&pmix_globals.events.multi_events, &evhdlr->super);
PMIX_RELEASE(multi); for (n=0; n < evhdlr->ncodes; n++) {
goto cleanup; /* see if this is the last registration we have for this code */
PMIX_LIST_FOREACH(active, &pmix_globals.events.actives, pmix_active_code_t) {
if (active->code == evhdlr->codes[n]) {
--active->nregs;
if (0 == active->nregs) {
pmix_list_remove_item(&pmix_globals.events.actives, &active->super);
/* tell the server to dereg this code */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msg, &active->code, 1, PMIX_STATUS))) {
PMIX_RELEASE(active);
PMIX_RELEASE(msg);
goto cleanup;
}
PMIX_RELEASE(active);
}
break;
} }
} }
} }
PMIX_RELEASE(multi); PMIX_RELEASE(evhdlr);
goto report; goto report;
} }
} }

Просмотреть файл

@ -42,7 +42,7 @@ pmix_psensor_base_module_t pmix_psensor = {
pmix_psensor_base_start, pmix_psensor_base_start,
pmix_psensor_base_stop pmix_psensor_base_stop
}; };
pmix_psensor_base_t pmix_psensor_base = {{{0}}};; pmix_psensor_base_t pmix_psensor_base = {{{0}}};
static bool use_separate_thread = false; static bool use_separate_thread = false;

Просмотреть файл

@ -18,8 +18,6 @@
#include "src/mca/psensor/base/base.h" #include "src/mca/psensor/base/base.h"
static bool mods_active = false;
pmix_status_t pmix_psensor_base_start(pmix_peer_t *requestor, pmix_status_t error, pmix_status_t pmix_psensor_base_start(pmix_peer_t *requestor, pmix_status_t error,
const pmix_info_t *monitor, const pmix_info_t *monitor,
const pmix_info_t directives[], size_t ndirs) const pmix_info_t directives[], size_t ndirs)
@ -27,7 +25,7 @@ pmix_status_t pmix_psensor_base_start(pmix_peer_t *requestor, pmix_status_t erro
pmix_psensor_active_module_t *mod; pmix_psensor_active_module_t *mod;
pmix_status_t rc; pmix_status_t rc;
opal_output_verbose(5, pmix_psensor_base_framework.framework_output, pmix_output_verbose(5, pmix_psensor_base_framework.framework_output,
"%s:%d sensor:base: starting sensors", "%s:%d sensor:base: starting sensors",
pmix_globals.myid.nspace, pmix_globals.myid.rank); pmix_globals.myid.nspace, pmix_globals.myid.rank);
@ -50,7 +48,7 @@ pmix_status_t pmix_psensor_base_stop(pmix_peer_t *requestor,
pmix_psensor_active_module_t *mod; pmix_psensor_active_module_t *mod;
pmix_status_t rc; pmix_status_t rc;
opal_output_verbose(5, pmix_psensor_base_framework.framework_output, pmix_output_verbose(5, pmix_psensor_base_framework.framework_output,
"%s:%d sensor:base: stopping sensors", "%s:%d sensor:base: stopping sensors",
pmix_globals.myid.nspace, pmix_globals.myid.rank); pmix_globals.myid.nspace, pmix_globals.myid.rank);

Просмотреть файл

@ -176,8 +176,7 @@ static pmix_status_t start(pmix_peer_t *requestor, pmix_status_t error,
const pmix_info_t directives[], size_t ndirs) const pmix_info_t directives[], size_t ndirs)
{ {
file_tracker_t *ft; file_tracker_t *ft;
pmix_info_t *ptr; size_t n;
size_t n, n2;
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] checking file monitoring for requestor %s:%d", "[%s:%d] checking file monitoring for requestor %s:%d",
@ -278,7 +277,7 @@ static void file_sample(int sd, short args, void *cbdata)
pmix_status_t rc; pmix_status_t rc;
pmix_proc_t source; pmix_proc_t source;
OPAL_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sampling file %s", "[%s:%d] sampling file %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank, pmix_globals.myid.nspace, pmix_globals.myid.rank,
ft->file)); ft->file));
@ -301,7 +300,7 @@ static void file_sample(int sd, short args, void *cbdata)
(unsigned long)buf.st_size, ctime(&buf.st_atime), ctime(&buf.st_mtime))); (unsigned long)buf.st_size, ctime(&buf.st_atime), ctime(&buf.st_mtime)));
if (ft->file_size) { if (ft->file_size) {
if (buf.st_size == ft->last_size) { if (buf.st_size == (int64_t)ft->last_size) {
ft->nmisses++; ft->nmisses++;
} else { } else {
ft->nmisses = 0; ft->nmisses = 0;
@ -323,7 +322,6 @@ static void file_sample(int sd, short args, void *cbdata)
} }
} }
CHECK:
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sampled file %s misses %d", "[%s:%d] sampled file %s misses %d",
pmix_globals.myid.nspace, pmix_globals.myid.rank, pmix_globals.myid.nspace, pmix_globals.myid.rank,

Просмотреть файл

@ -165,7 +165,7 @@ static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error
const pmix_info_t directives[], size_t ndirs) const pmix_info_t directives[], size_t ndirs)
{ {
pmix_heartbeat_trkr_t *ft; pmix_heartbeat_trkr_t *ft;
size_t n, n2; size_t n;
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] checking heartbeat monitoring for requestor %s:%d", "[%s:%d] checking heartbeat monitoring for requestor %s:%d",

Просмотреть файл

@ -156,12 +156,10 @@ static void lost_connection(pmix_peer_t *peer, pmix_status_t err)
PMIX_CONSTRUCT(&buf, pmix_buffer_t); PMIX_CONSTRUCT(&buf, pmix_buffer_t);
hdr.nbytes = 0; // initialize the hdr to something safe hdr.nbytes = 0; // initialize the hdr to something safe
PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) { PMIX_LIST_FOREACH(rcv, &pmix_ptl_globals.posted_recvs, pmix_ptl_posted_recv_t) {
if (PMIX_PTL_TAG_DYNAMIC <= rcv->tag && UINT_MAX != rcv->tag) { if (UINT_MAX != rcv->tag && NULL != rcv->cbfunc) {
if (NULL != rcv->cbfunc) { /* construct and load the buffer */
/* construct and load the buffer */ hdr.tag = rcv->tag;
hdr.tag = rcv->tag; rcv->cbfunc(pmix_globals.mypeer, &hdr, &buf, rcv->cbdata);
rcv->cbfunc(pmix_globals.mypeer, &hdr, &buf, rcv->cbdata);
}
} }
} }
PMIX_DESTRUCT(&buf); PMIX_DESTRUCT(&buf);

Просмотреть файл

@ -1219,9 +1219,7 @@ void pmix_server_deregister_events(pmix_peer_t *peer,
pmix_buffer_t *buf) pmix_buffer_t *buf)
{ {
int32_t cnt; int32_t cnt;
pmix_status_t rc, *codes = NULL, *cdptr, maxcode = PMIX_MAX_ERR_CONSTANT; pmix_status_t rc, code;
pmix_info_t *info = NULL;
size_t ninfo=0, ncodes, ncds, n;
pmix_regevents_info_t *reginfo = NULL; pmix_regevents_info_t *reginfo = NULL;
pmix_regevents_info_t *reginfo_next; pmix_regevents_info_t *reginfo_next;
pmix_peer_events_info_t *prev; pmix_peer_events_info_t *prev;
@ -1229,34 +1227,11 @@ void pmix_server_deregister_events(pmix_peer_t *peer,
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"recvd deregister events"); "recvd deregister events");
/* unpack the number of codes */ /* unpack codes and process until done */
cnt=1; cnt=1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ncodes, &cnt, PMIX_SIZE))) { while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(buf, &code, &cnt, PMIX_STATUS))) {
/* it is okay if there aren't any - equivalent to a wildcard */
ncodes = 0;
}
/* unpack the array of codes */
if (0 < ncodes) {
codes = (pmix_status_t*)malloc(ncodes * sizeof(pmix_status_t));
cnt=ncodes;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, codes, &cnt, PMIX_STATUS))) {
PMIX_ERROR_LOG(rc);
goto cleanup;
}
}
/* find the event registration info so we can delete them */
if (NULL == codes) {
cdptr = &maxcode;
ncds = 1;
} else {
cdptr = codes;
ncds = ncodes;
}
for (n=0; n < ncds; n++) {
PMIX_LIST_FOREACH_SAFE(reginfo, reginfo_next, &pmix_server_globals.events, pmix_regevents_info_t) { PMIX_LIST_FOREACH_SAFE(reginfo, reginfo_next, &pmix_server_globals.events, pmix_regevents_info_t) {
if (cdptr[n] == reginfo->code) { if (code == reginfo->code) {
/* found it - remove this peer from the list */ /* found it - remove this peer from the list */
PMIX_LIST_FOREACH(prev, &reginfo->peers, pmix_peer_events_info_t) { PMIX_LIST_FOREACH(prev, &reginfo->peers, pmix_peer_events_info_t) {
if (prev->peer == peer) { if (prev->peer == peer) {
@ -1275,15 +1250,9 @@ void pmix_server_deregister_events(pmix_peer_t *peer,
} }
} }
} }
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
cleanup: PMIX_ERROR_LOG(rc);
if (NULL != codes) {
free(codes);
} }
if (NULL != info) {
PMIX_INFO_FREE(info, ninfo);
}
return;
} }
@ -1636,6 +1605,7 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
void *cbdata) void *cbdata)
{ {
int32_t cnt; int32_t cnt;
pmix_info_t monitor;
pmix_status_t rc, error; pmix_status_t rc, error;
pmix_query_caddy_t *cd; pmix_query_caddy_t *cd;
pmix_proc_t proc; pmix_proc_t proc;
@ -1650,6 +1620,14 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
cd = PMIX_NEW(pmix_query_caddy_t); cd = PMIX_NEW(pmix_query_caddy_t);
cd->cbdata = cbdata; cd->cbdata = cbdata;
/* unpack what is to be monitored */
PMIX_INFO_CONSTRUCT(&monitor);
cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &monitor, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
goto exit;
}
/* unpack the error code */ /* unpack the error code */
cnt = 1; cnt = 1;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &error, &cnt, PMIX_STATUS))) { if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &error, &cnt, PMIX_STATUS))) {
@ -1678,7 +1656,7 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
proc.rank = peer->info->rank; proc.rank = peer->info->rank;
/* ask the host to execute the request */ /* ask the host to execute the request */
if (PMIX_SUCCESS != (rc = pmix_host_server.monitor(&proc, error, if (PMIX_SUCCESS != (rc = pmix_host_server.monitor(&proc, &monitor, error,
cd->info, cd->ninfo, cd->info, cd->ninfo,
cbfunc, cd))) { cbfunc, cd))) {
goto exit; goto exit;
@ -1686,6 +1664,7 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer,
return PMIX_SUCCESS; return PMIX_SUCCESS;
exit: exit:
PMIX_INFO_DESTRUCT(&monitor);
PMIX_RELEASE(cd); PMIX_RELEASE(cd);
return rc; return rc;
} }