1
1
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2018-09-12 13:00:17 -07:00
родитель 3a584fee53
Коммит 131ea01320
18 изменённых файлов: 370 добавлений и 308 удалений

Просмотреть файл

@ -21,6 +21,18 @@ example, a bug might be fixed in the master, and then moved to the
current release as well as the "stable" bug fix release branch.
3.0.2 -- 18 Sept 2018
----------------------
- Ensure we cleanup any active sensors when a peer departs. Allow the
heartbeat monitor to "reset" if a process stops beating and subsequently
returns.
- Fix a few bugs in the event notification system and provide some
missing implementation (support for specifying target procs to
receive the event).
- Add PMIX_PROC_TERMINATED constant
- Properly deal with EOPNOTSUPP from getsockopt() on ARM
3.0.1 -- 23 Aug 2018
----------------------
**** DEPRECATION WARNING: The pmix_info_array_t struct was
@ -77,6 +89,18 @@ current release as well as the "stable" bug fix release branch.
- Fix several memory and file descriptor leaks
2.1.4 -- 18 Sep 2018
----------------------
- Updated configury to silence warnings on older compilers
- Implement job control and sensor APIs
- Update sensor support
- Fix a few bugs in the event notification system and provide some
missing implementation (support for specifying target procs to
receive the event).
- Add PMIX_PROC_TERMINATED constant
- Properly deal with EOPNOTSUPP from getsockopt() on ARM
2.1.3 -- 23 Aug 2018
----------------------
- Fixed memory corruption bug in event notification

Просмотреть файл

@ -15,7 +15,7 @@
major=3
minor=0
release=1
release=2
# greek is used for alpha or beta release tags. If it is non-empty,
# it will be appended to the version number. It does not have to be
@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=gitbf30a5f
repo_rev=gite574b10d
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in
@ -44,7 +44,7 @@ tarball_version=
# The date when this release was created
date="Aug 20, 2018"
date="Sep 18, 2018"
# The shared library version of each of PMIx's public libraries.
# These versions are maintained in accordance with the "Library
@ -75,6 +75,6 @@ date="Aug 20, 2018"
# Version numbers are described in the Libtool current:revision:age
# format.
libpmix_so_version=4:1:2
libpmix_so_version=4:2:2
libpmi_so_version=1:0:0
libpmi2_so_version=1:0:0

Просмотреть файл

@ -192,7 +192,7 @@
Summary: An extended/exascale implementation of PMI
Name: %{?_name:%{_name}}%{!?_name:pmix}
Version: 3.0.1
Version: 3.0.2
Release: 1%{?dist}
License: BSD
Group: Development/Libraries

Просмотреть файл

@ -762,6 +762,7 @@ typedef int pmix_status_t;
/* monitoring */
#define PMIX_MONITOR_HEARTBEAT_ALERT (PMIX_ERR_V2X_BASE - 9)
#define PMIX_MONITOR_FILE_ALERT (PMIX_ERR_V2X_BASE - 10)
#define PMIX_PROC_TERMINATED (PMIX_ERR_V2X_BASE - 11)
/* define a starting point for operational error constants so
* we avoid renumbering when making additions */
@ -782,6 +783,7 @@ typedef int pmix_status_t;
#define PMIX_LAUNCHER_READY (PMIX_ERR_OP_BASE - 25)
#define PMIX_OPERATION_IN_PROGRESS (PMIX_ERR_OP_BASE - 26)
#define PMIX_OPERATION_SUCCEEDED (PMIX_ERR_OP_BASE - 27)
/* gap for group codes */
/* define a starting point for system error constants so
@ -951,6 +953,19 @@ typedef uint16_t pmix_iof_channel_t;
#define PMIX_FWD_ALL_CHANNELS 0x00ff
/* declare a convenience macro for checking keys: evaluates to true when
 * the "key" field of the struct pointed to by (a) matches the string (b)
 * within the first PMIX_MAX_KEYLEN characters */
#define PMIX_CHECK_KEY(a, b) \
(0 == strncmp((a)->key, (b), PMIX_MAX_KEYLEN))
/* define a convenience macro for checking nspaces: evaluates to true when
 * the two nspace strings match within the first PMIX_MAX_NSLEN characters */
#define PMIX_CHECK_NSPACE(a, b) \
(0 == strncmp((a), (b), PMIX_MAX_NSLEN))
/* define a convenience macro for checking names: (a) and (b) are pointers
 * to structs carrying "nspace" and "rank" fields. The names match when the
 * nspaces match AND either the ranks are equal or at least one of the two
 * ranks is PMIX_RANK_WILDCARD. Note that each argument is expanded more
 * than once, so callers should not pass expressions with side effects */
#define PMIX_CHECK_PROCID(a, b) \
(PMIX_CHECK_NSPACE((a)->nspace, (b)->nspace) && ((a)->rank == (b)->rank || (PMIX_RANK_WILDCARD == (a)->rank || PMIX_RANK_WILDCARD == (b)->rank)))
/**** PMIX BYTE OBJECT ****/
typedef struct pmix_byte_object {
char *bytes;
@ -1456,13 +1471,13 @@ struct pmix_info_t {
(m)->flags = 0; \
pmix_value_load(&((m)->value), (v), (t)); \
} while (0)
#define PMIX_INFO_XFER(d, s) \
do { \
if (NULL != (s)->key) { \
(void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \
} \
(d)->flags = (s)->flags; \
pmix_value_xfer(&(d)->value, &(s)->value); \
#define PMIX_INFO_XFER(d, s) \
do { \
if (NULL != (s)->key) { \
(void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \
} \
(d)->flags = (s)->flags; \
pmix_value_xfer(&(d)->value, (pmix_value_t*)&(s)->value); \
} while(0)
#define PMIX_INFO_REQUIRED(m) \

Просмотреть файл

@ -169,14 +169,9 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer,
PMIX_RELEASE(chain);
goto error;
}
/* check for non-default flag */
for (cnt=0; cnt < (int)ninfo; cnt++) {
if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]);
break;
}
}
}
/* prep the chain for processing */
pmix_prep_event_chain(chain, chain->info, ninfo, false);
pmix_output_verbose(2, pmix_client_globals.base_output,
"[%s:%d] pmix:client_notify_recv - processing event %s, calling errhandler",

Просмотреть файл

@ -39,6 +39,13 @@
#define PMIX_EVENT_ORDER_PREPEND 0x10
#define PMIX_EVENT_ORDER_APPEND 0x20
/* define an internal attribute for marking that the
* server processed an event before passing it up
* to its host in case it comes back down - avoids
* infinite loop */
#define PMIX_SERVER_INTERNAL_NOTIFY "pmix.srvr.internal.notify"
/* define a struct for tracking registration ranges */
typedef struct {
pmix_data_range_t range;
@ -117,8 +124,15 @@ typedef struct pmix_event_chain_t {
bool endchain;
pmix_proc_t source;
pmix_data_range_t range;
/* When generating events, callers can specify
* the range of targets to receive notifications.
*/
pmix_proc_t *targets;
size_t ntargets;
/* the processes that were affected by the event */
pmix_proc_t *affected;
size_t naffected;
/* any info provided by the event generator */
pmix_info_t *info;
size_t ninfo;
size_t nallocated;
@ -130,6 +144,13 @@ typedef struct pmix_event_chain_t {
} pmix_event_chain_t;
PMIX_CLASS_DECLARATION(pmix_event_chain_t);
/* prepare a chain for processing by cycling across provided
* info structs and translating those supported by the event
* system into the chain object */
pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain,
const pmix_info_t *info, size_t ninfo,
bool xfer);
/* invoke the error handler that is registered against the given
* status, passing it the provided info on the procs that were
* affected, plus any additional info provided by the server */
@ -146,7 +167,7 @@ bool pmix_notify_check_affected(pmix_proc_t *interested, size_t ninterested,
pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
const pmix_proc_t *source,
pmix_data_range_t range,
pmix_info_t info[], size_t ninfo,
const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata);
void pmix_event_timeout_cb(int fd, short flags, void *arg);

Просмотреть файл

@ -30,7 +30,7 @@
static pmix_status_t notify_server_of_event(pmix_status_t status,
const pmix_proc_t *source,
pmix_data_range_t range,
pmix_info_t info[], size_t ninfo,
const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata);
/* if we are a client, we call this function to notify the server of
@ -55,13 +55,16 @@ PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status,
if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer) &&
!PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) {
PMIX_RELEASE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_server_globals.event_output,
"pmix_server_notify_event source = %s:%d event_status = %d",
(NULL == source) ? "UNKNOWN" : source->nspace,
(NULL == source) ? PMIX_RANK_WILDCARD : source->rank, status);
rc = pmix_server_notify_client_of_event(status, source, range,
info, ninfo,
cbfunc, cbdata);
pmix_output_verbose(2, pmix_server_globals.event_output,
"pmix_server_notify_event source = %s:%d event_status = %d, rc= %d",
(NULL == source) ? "UNKNOWN" : source->nspace,
(NULL == source) ? PMIX_RANK_WILDCARD : source->rank, status, rc);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
return rc;
}
@ -71,14 +74,17 @@ PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status,
return PMIX_ERR_UNREACH;
}
PMIX_RELEASE_THREAD(&pmix_global_lock);
pmix_output_verbose(2, pmix_client_globals.event_output,
"pmix_client_notify_event source = %s:%d event_status =%d",
(NULL == source) ? pmix_globals.myid.nspace : source->nspace,
(NULL == source) ? pmix_globals.myid.rank : source->rank, status);
rc = notify_server_of_event(status, source, range,
info, ninfo,
cbfunc, cbdata);
pmix_output_verbose(2, pmix_client_globals.event_output,
"pmix_client_notify_event source = %s:%d event_status =%d, rc=%d",
(NULL == source) ? pmix_globals.myid.nspace : source->nspace,
(NULL == source) ? pmix_globals.myid.rank : source->rank, status, rc);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
return rc;
}
@ -106,7 +112,7 @@ static void notify_event_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr,
static pmix_status_t notify_server_of_event(pmix_status_t status,
const pmix_proc_t *source,
pmix_data_range_t range,
pmix_info_t info[], size_t ninfo,
const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
pmix_status_t rc;
@ -171,14 +177,8 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
/* we always leave space for event hdlr name and a callback object */
chain->nallocated = ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->nallocated);
if (0 < ninfo) {
chain->ninfo = ninfo;
/* need to copy the info */
for (n=0; n < ninfo; n++) {
PMIX_INFO_XFER(&chain->info[n], &info[n]);
}
}
/* prep the chain for processing */
pmix_prep_event_chain(chain, info, ninfo, true);
/* we need to cache this event so we can pass it into
* ourselves should someone later register for it */
@ -195,65 +195,25 @@ static pmix_status_t notify_server_of_event(pmix_status_t status,
if (0 < chain->ninfo) {
cd->ninfo = chain->ninfo;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
cd->nondefault = chain->nondefault;
/* need to copy the info */
for (n=0; n < cd->ninfo; n++) {
PMIX_INFO_XFER(&cd->info[n], &chain->info[n]);
if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
cd->nondefault = true;
chain->nondefault = true;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
/* provides an array of pmix_proc_t identifying the procs
* that are to receive this notification, or a single pmix_proc_t */
if (PMIX_DATA_ARRAY == cd->info[n].value.type &&
NULL != cd->info[n].value.data.darray &&
NULL != cd->info[n].value.data.darray->array) {
cd->ntargets = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t));
} else if (PMIX_PROC == cd->info[n].value.type) {
cd->ntargets = 1;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
} else {
/* this is an error */
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
return PMIX_ERR_BAD_PARAM;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) {
PMIX_PROC_CREATE(cd->affected, 1);
if (NULL == cd->affected) {
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
cd->naffected = 1;
memcpy(cd->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
/* need to do the same for chain so it can be correctly processed */
PMIX_PROC_CREATE(chain->affected, 1);
if (NULL == chain->affected) {
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
chain->naffected = 1;
memcpy(chain->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) {
cd->naffected = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->affected, cd->naffected);
if (NULL == cd->affected) {
cd->naffected = 0;
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
memcpy(cd->affected, cd->info[n].value.data.darray->array, cd->naffected * sizeof(pmix_proc_t));
/* need to do the same for chain so it can be correctly processed */
chain->naffected = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(chain->affected, chain->naffected);
if (NULL == chain->affected) {
chain->naffected = 0;
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
memcpy(chain->affected, cd->info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
}
if (NULL != chain->targets) {
cd->ntargets = chain->ntargets;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
}
if (NULL != chain->affected) {
cd->naffected = chain->naffected;
PMIX_PROC_CREATE(cd->affected, cd->naffected);
if (NULL == cd->affected) {
cd->naffected = 0;
rc = PMIX_ERR_NOMEM;
goto cleanup;
}
memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
}
}
@ -425,7 +385,7 @@ static void progress_local_event_hdlr(pmix_status_t status,
}
while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) {
nxt = (pmix_event_hdlr_t*)item;
if (!pmix_notify_check_range(&nxt->rng, &chain->source) &&
if (!pmix_notify_check_range(&nxt->rng, &chain->source) ||
!pmix_notify_check_affected(nxt->affected, nxt->naffected,
chain->affected, chain->naffected)) {
continue;
@ -623,10 +583,17 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
goto complete;
}
/* check for directives */
for (i=0; i < chain->ninfo; i++) {
if (0 == strncmp(chain->info[i].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
chain->nondefault = true;
/* if we are not a target, then we can simply ignore this event */
if (NULL != chain->targets) {
found = false;
for (i=0; i < chain->ntargets; i++) {
if (PMIX_CHECK_PROCID(&chain->targets[i], &pmix_globals.myid)) {
found = true;
break;
}
}
if (!found) {
goto complete;
}
}
@ -815,7 +782,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
if (0 < cd->ninfo) {
/* check for caching instructions */
for (n=0; n < cd->ninfo; n++) {
if (0 == strncmp(cd->info[n].key, PMIX_EVENT_DO_NOT_CACHE, PMIX_MAX_KEYLEN)) {
if (PMIX_CHECK_KEY(&cd->info[n], PMIX_EVENT_DO_NOT_CACHE)) {
if (PMIX_INFO_TRUE(&cd->info[n])) {
holdcd = false;
}
@ -837,6 +804,59 @@ static void _notify_client_event(int sd, short args, void *cbdata)
}
}
/* we may also have registered for events, so setup to check this
* against our registrations */
chain = PMIX_NEW(pmix_event_chain_t);
chain->status = cd->status;
(void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
chain->source.rank = cd->source.rank;
/* we always leave space for a callback object and
* the evhandler name. */
chain->nallocated = cd->ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->nallocated);
/* prep the chain for processing */
pmix_prep_event_chain(chain, cd->info, cd->ninfo, true);
if (0 < cd->ninfo) {
/* copy setup to the cd object */
cd->nondefault = chain->nondefault;
if (NULL != chain->targets) {
cd->ntargets = chain->ntargets;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t));
}
if (NULL != chain->affected) {
cd->naffected = chain->naffected;
PMIX_PROC_CREATE(cd->affected, cd->naffected);
if (NULL == cd->affected) {
cd->naffected = 0;
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_ERR_NOMEM, cd->cbdata);
}
PMIX_RELEASE(cd);
PMIX_RELEASE(chain);
return;
}
memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t));
}
}
/* if they provided a PMIX_EVENT_CUSTOM_RANGE info object but
* specified a range other than PMIX_RANGE_CUSTOM, then this
* is an error */
if (PMIX_RANGE_CUSTOM != cd->range && NULL != cd->targets) {
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata);
}
PMIX_RELEASE(cd);
PMIX_RELEASE(chain);
return;
}
holdcd = false;
if (PMIX_RANGE_PROC_LOCAL != cd->range) {
PMIX_CONSTRUCT(&trk, pmix_list_t);
@ -849,8 +869,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
/* if this client was the source of the event, then
* don't send it back as they will have processed it
* when they generated it */
if (0 == strncmp(cd->source.nspace, pr->peer->info->pname.nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pr->peer->info->pname.rank) {
if (PMIX_CHECK_PROCID(&cd->source, &pr->peer->info->pname)) {
continue;
}
/* if we have already notified this client, then don't do it again */
@ -868,11 +887,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
if (NULL != cd->targets) {
matched = false;
for (n=0; n < cd->ntargets; n++) {
if (0 != strncmp(pr->peer->info->pname.nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
pr->peer->info->pname.rank == cd->targets[n].rank) {
if (PMIX_CHECK_PROCID(&pr->peer->info->pname, &cd->targets[n])) {
matched = true;
break;
}
@ -940,9 +955,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
}
}
PMIX_LIST_DESTRUCT(&trk);
if (PMIX_RANGE_LOCAL != cd->range &&
0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pmix_globals.myid.rank) {
if (PMIX_RANGE_LOCAL != cd->range && PMIX_CHECK_PROCID(&cd->source, &pmix_globals.myid)) {
/* if we are the source, then we need to post this upwards as
* well so the host RM can broadcast it as necessary - we rely
* on the host RM to _not_ deliver this back to us! */
@ -953,85 +966,10 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
cd->info, cd->ninfo, local_cbfunc, cd);
}
}
}
/* we may also have registered for events, so be sure to check this
* against our registrations */
chain = PMIX_NEW(pmix_event_chain_t);
chain->status = cd->status;
(void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
chain->source.rank = cd->source.rank;
/* we always leave space for a callback object and
* the evhandler name. */
chain->nallocated = cd->ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->nallocated);
if (0 < cd->ninfo) {
chain->ninfo = cd->ninfo;
/* need to copy the info */
for (n=0; n < cd->ninfo; n++) {
PMIX_INFO_XFER(&chain->info[n], &cd->info[n]);
if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
cd->nondefault = true;
chain->nondefault = true;
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
/* provides an array of pmix_proc_t identifying the procs
* that are to receive this notification, or a single pmix_proc_t */
if (PMIX_DATA_ARRAY == cd->info[n].value.type &&
NULL != cd->info[n].value.data.darray &&
NULL != cd->info[n].value.data.darray->array) {
cd->ntargets = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t));
} else if (PMIX_PROC == cd->info[n].value.type) {
cd->ntargets = 1;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
} else {
/* this is an error */
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
PMIX_RELEASE(chain);
return;
}
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) {
PMIX_PROC_CREATE(cd->affected, 1);
if (NULL == cd->affected) {
PMIX_RELEASE(chain);
return;
}
cd->naffected = 1;
memcpy(cd->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
/* need to do the same for chain so it can be correctly processed */
PMIX_PROC_CREATE(chain->affected, 1);
if (NULL == chain->affected) {
PMIX_RELEASE(chain);
return;
}
chain->naffected = 1;
memcpy(chain->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t));
} else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) {
cd->naffected = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->affected, cd->naffected);
if (NULL == cd->affected) {
cd->naffected = 0;
PMIX_RELEASE(chain);
return;
}
memcpy(cd->affected, cd->info[n].value.data.darray->array, cd->naffected * sizeof(pmix_proc_t));
/* need to do the same for chain so it can be correctly processed */
chain->naffected = cd->info[n].value.data.darray->size;
PMIX_PROC_CREATE(chain->affected, chain->naffected);
if (NULL == chain->affected) {
chain->naffected = 0;
PMIX_RELEASE(chain);
return;
}
memcpy(chain->affected, cd->info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
}
}
}
/* process it */
/* process it ourselves */
pmix_invoke_local_event_hdlr(chain);
if (!holdcd) {
@ -1054,7 +992,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
const pmix_proc_t *source,
pmix_data_range_t range,
pmix_info_t info[], size_t ninfo,
const pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
pmix_notify_caddy_t *cd;
@ -1064,6 +1002,11 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
"pmix_server: notify client of event %s",
PMIx_Error_string(status));
/* check for prior processing - when the server has already handled this
 * event, the internal marker is stored in the LAST element of the info
 * array, i.e. at index ninfo-1. Guard against an empty array so we never
 * index before (or past) the storage we were given */
if (NULL != info && 0 < ninfo &&
    PMIX_CHECK_KEY(&info[ninfo-1], PMIX_SERVER_INTERNAL_NOTIFY)) {
    return PMIX_OPERATION_SUCCEEDED;
}
cd = PMIX_NEW(pmix_notify_caddy_t);
cd->status = status;
if (NULL == source) {
@ -1084,51 +1027,6 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
}
}
/* check for directives */
if (NULL != info) {
for (n=0; n < ninfo; n++) {
if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
cd->nondefault = true;
} else if (0 == strncmp(info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) {
/* provides an array of pmix_proc_t identifying the procs
* that are to receive this notification, or a single pmix_proc_t */
if (PMIX_DATA_ARRAY == info[n].value.type &&
NULL != info[n].value.data.darray &&
NULL != info[n].value.data.darray->array) {
cd->ntargets = info[n].value.data.darray->size;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t));
} else if (PMIX_PROC == info[n].value.type) {
cd->ntargets = 1;
PMIX_PROC_CREATE(cd->targets, cd->ntargets);
memcpy(cd->targets, info[n].value.data.proc, sizeof(pmix_proc_t));
} else {
/* this is an error */
PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
return PMIX_ERR_BAD_PARAM;
}
}
}
}
/*
* If the range is PMIX_RANGE_NAMESPACE, then they should not have set a
* PMIX_EVENT_CUSTOM_RANGE info object or at least we should ignore it
*/
if (PMIX_RANGE_NAMESPACE == cd->range) {
if (cd->targets) {
PMIX_PROC_FREE(cd->targets, cd->ntargets);
}
PMIX_PROC_CREATE(cd->targets, 1);
cd->ntargets = 1;
cd->targets[0].rank = PMIX_RANK_WILDCARD;
if (NULL == source) {
strncpy(cd->targets[0].nspace, "UNDEF", PMIX_MAX_NSLEN);
} else {
strncpy(cd->targets[0].nspace, source->nspace, PMIX_MAX_NSLEN);
}
}
/* track the eventual callback info */
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;
@ -1246,6 +1144,65 @@ void pmix_event_timeout_cb(int fd, short flags, void *arg)
}
}
/* Prepare an event chain for processing by cycling across the provided
 * info structs and translating the directives understood by the event
 * system into fields of the chain object.
 *
 * chain - the chain to be populated
 * info  - array of ninfo info structs (may be NULL)
 * ninfo - number of elements in info
 * xfer  - true if the info must be copied into chain->info; false if the
 *         chain already holds its own copy of the info
 *
 * Directives recognized:
 *   PMIX_EVENT_NON_DEFAULT    -> chain->nondefault
 *   PMIX_EVENT_CUSTOM_RANGE   -> chain->targets / chain->ntargets
 *   PMIX_EVENT_AFFECTED_PROC  -> chain->affected / chain->naffected (one proc)
 *   PMIX_EVENT_AFFECTED_PROCS -> chain->affected / chain->naffected (array)
 *
 * Returns PMIX_SUCCESS when all info was processed (or none was given),
 * PMIX_ERR_BAD_PARAM if a directive carries a value of the wrong type or
 * a NULL payload, and PMIX_ERR_NOMEM if an allocation fails. Processing
 * stops at the first error.
 */
pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain,
                                    const pmix_info_t *info, size_t ninfo,
                                    bool xfer)
{
    size_t n;

    if (NULL == info || 0 == ninfo) {
        /* nothing to translate */
        return PMIX_SUCCESS;
    }

    chain->ninfo = ninfo;
    if (NULL == chain->info) {
        PMIX_INFO_CREATE(chain->info, chain->ninfo);
        if (NULL == chain->info) {
            /* do not advertise entries we could not allocate */
            chain->ninfo = 0;
            return PMIX_ERR_NOMEM;
        }
    }

    for (n=0; n < ninfo; n++) {
        if (xfer) {
            /* chain doesn't already have a copy of the info */
            PMIX_INFO_XFER(&chain->info[n], &info[n]);
        }
        /* look for specific directives */
        if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_NON_DEFAULT)) {
            chain->nondefault = PMIX_INFO_TRUE(&info[n]);
        } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_CUSTOM_RANGE)) {
            /* provides an array of pmix_proc_t identifying the procs
             * that are to receive this notification, or a single pmix_proc_t */
            if (PMIX_DATA_ARRAY == info[n].value.type &&
                NULL != info[n].value.data.darray &&
                NULL != info[n].value.data.darray->array) {
                chain->ntargets = info[n].value.data.darray->size;
                PMIX_PROC_CREATE(chain->targets, chain->ntargets);
                if (NULL == chain->targets) {
                    chain->ntargets = 0;
                    return PMIX_ERR_NOMEM;
                }
                memcpy(chain->targets, info[n].value.data.darray->array, chain->ntargets * sizeof(pmix_proc_t));
            } else if (PMIX_PROC == info[n].value.type &&
                       NULL != info[n].value.data.proc) {
                chain->ntargets = 1;
                PMIX_PROC_CREATE(chain->targets, chain->ntargets);
                if (NULL == chain->targets) {
                    chain->ntargets = 0;
                    return PMIX_ERR_NOMEM;
                }
                memcpy(chain->targets, info[n].value.data.proc, sizeof(pmix_proc_t));
            } else {
                /* this is an error */
                PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
                return PMIX_ERR_BAD_PARAM;
            }
        } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) {
            /* the payload is read through value.data.proc, so make sure
             * that union member is the one that was actually set */
            if (PMIX_PROC != info[n].value.type ||
                NULL == info[n].value.data.proc) {
                PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
                return PMIX_ERR_BAD_PARAM;
            }
            PMIX_PROC_CREATE(chain->affected, 1);
            if (NULL == chain->affected) {
                return PMIX_ERR_NOMEM;
            }
            chain->naffected = 1;
            memcpy(chain->affected, info[n].value.data.proc, sizeof(pmix_proc_t));
        } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) {
            /* same validation as for the custom range array above */
            if (PMIX_DATA_ARRAY != info[n].value.type ||
                NULL == info[n].value.data.darray ||
                NULL == info[n].value.data.darray->array) {
                PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
                return PMIX_ERR_BAD_PARAM;
            }
            chain->naffected = info[n].value.data.darray->size;
            PMIX_PROC_CREATE(chain->affected, chain->naffected);
            if (NULL == chain->affected) {
                chain->naffected = 0;
                return PMIX_ERR_NOMEM;
            }
            memcpy(chain->affected, info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
        }
    }
    return PMIX_SUCCESS;
}
/**** CLASS INSTANTIATIONS ****/
static void sevcon(pmix_event_hdlr_t *p)
@ -1328,6 +1285,8 @@ static void chcon(pmix_event_chain_t *p)
p->source.rank = PMIX_RANK_UNDEF;
p->nondefault = false;
p->endchain = false;
p->targets = NULL;
p->ntargets = 0;
p->range = PMIX_RANGE_UNDEF;
p->affected = NULL;
p->naffected = 0;
@ -1345,6 +1304,9 @@ static void chdes(pmix_event_chain_t *p)
if (p->timer_active) {
pmix_event_del(&p->ev);
}
if (NULL != p->targets) {
PMIX_PROC_FREE(p->targets, p->ntargets);
}
if (NULL != p->affected) {
PMIX_PROC_FREE(p->affected, p->naffected);
}

Просмотреть файл

@ -6,7 +6,7 @@
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved.
*
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017-2018 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -258,7 +258,9 @@ static pmix_status_t stop(pmix_peer_t *requestor, char *id)
cd = PMIX_NEW(file_caddy_t);
PMIX_RETAIN(requestor);
cd->requestor = requestor;
cd->id = strdup(id);
if (NULL != id) {
cd->id = strdup(id);
}
/* need to push into our event base to add this to our trackers */
pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,

Просмотреть файл

@ -63,6 +63,7 @@ typedef struct {
pmix_data_range_t range;
pmix_info_t *info;
size_t ninfo;
bool stopped;
} pmix_heartbeat_trkr_t;
static void ft_constructor(pmix_heartbeat_trkr_t *ft)
@ -79,6 +80,7 @@ static void ft_constructor(pmix_heartbeat_trkr_t *ft)
ft->range = PMIX_RANGE_NAMESPACE;
ft->info = NULL;
ft->ninfo = 0;
ft->stopped = false;
}
static void ft_destructor(pmix_heartbeat_trkr_t *ft)
{
@ -251,7 +253,9 @@ static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id)
cd = PMIX_NEW(heartbeat_caddy_t);
PMIX_RETAIN(requestor);
cd->requestor = requestor;
cd->id = strdup(id);
if (NULL != id) {
cd->id = strdup(id);
}
/* need to push into our event base to remove this from our trackers */
pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
@ -266,7 +270,7 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
{
pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
PMIX_RELEASE(ft);
PMIX_RELEASE(ft); // maintain accounting
}
/* this function automatically gets periodically called
@ -286,23 +290,25 @@ static void check_heartbeat(int fd, short dummy, void *cbdata)
pmix_globals.myid.nspace, pmix_globals.myid.rank,
ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
if (0 == ft->nbeats) {
if (0 == ft->nbeats && !ft->stopped) {
/* no heartbeat recvd in last window */
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sensor:check_heartbeat failed for proc %s:%d",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank));
/* stop monitoring this client */
pmix_list_remove_item(&mca_psensor_heartbeat_component.trackers, &ft->super);
/* generate an event */
(void)strncpy(source.nspace, ft->requestor->info->pname.nspace, PMIX_MAX_NSLEN);
source.rank = ft->requestor->info->pname.rank;
/* ensure the tracker remains throughout the process */
PMIX_RETAIN(ft);
/* mark that the process appears stopped so we don't
* continue to report it */
ft->stopped = true;
rc = PMIx_Notify_event(PMIX_MONITOR_HEARTBEAT_ALERT, &source,
ft->range, ft->info, ft->ninfo, opcbfunc, ft);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
return;
} else {
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sensor:check_heartbeat detected %d beats for proc %s:%d",
@ -328,6 +334,8 @@ static void add_beat(int sd, short args, void *cbdata)
if (ft->requestor == b->peer) {
/* increment the beat count */
++ft->nbeats;
/* ensure we know that the proc is alive */
ft->stopped = false;
break;
}
}

Просмотреть файл

@ -45,6 +45,7 @@
#include "src/server/pmix_server_ops.h"
#include "src/util/error.h"
#include "src/util/show_help.h"
#include "src/mca/psensor/psensor.h"
#include "src/mca/ptl/base/base.h"
@ -148,6 +149,9 @@ void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err)
}
}
}
/* cleanup any sensors that are monitoring them */
pmix_psensor.stop(peer, NULL);
if (!peer->finalized && !PMIX_PROC_IS_TOOL(peer)) {
/* if this peer already called finalize, then
* we are just seeing their connection go away

Просмотреть файл

@ -1004,7 +1004,7 @@ static pmix_status_t recv_connect_ack(int sd)
/* get the current timeout value so we can reset to it */
sz = sizeof(save);
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
if (ENOPROTOOPT == errno) {
if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
sockopt = false;
} else {
return PMIX_ERR_UNREACH;

Просмотреть файл

@ -393,7 +393,7 @@ static pmix_status_t recv_connect_ack(int sd)
/* get the current timeout value so we can reset to it */
sz = sizeof(save);
if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) {
if (ENOPROTOOPT == errno) {
if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) {
sockopt = false;
} else {
return PMIX_ERR_UNREACH;

Просмотреть файл

@ -1071,6 +1071,7 @@ static void _deregister_client(int sd, short args, void *cbdata)
* for tools, so don't clean them up */
if (!PMIX_PROC_IS_TOOL(peer)) {
pmix_pnet.child_finalized(&cd->proc);
pmix_psensor.stop(peer, NULL);
}
}
if (nptr->nlocalprocs == nptr->nfinalized) {

Просмотреть файл

@ -1933,6 +1933,47 @@ static void local_cbfunc(pmix_status_t status, void *cbdata)
PMIX_RELEASE(cd);
}
/* Completion callback invoked once the local server has finished its own
 * processing of an event received from a client. Decides whether the event
 * must also be handed to the host RM for wider distribution, and otherwise
 * answers the original requestor and releases the caddy.
 *
 * status - result of the local processing step
 * cbdata - the pmix_notify_caddy_t tracking this notification
 */
static void intermed_step(pmix_status_t status, void *cbdata)
{
    pmix_notify_caddy_t *caddy = (pmix_notify_caddy_t*)cbdata;
    pmix_status_t result;

    if (PMIX_SUCCESS != status) {
        /* local processing failed - report that status as-is */
        result = status;
    } else if (PMIX_RANGE_LOCAL == caddy->range) {
        /* a LOCAL range never leaves this server, so we are done */
        result = PMIX_SUCCESS;
    } else if (NULL == pmix_host_server.notify_event) {
        /* the host gave us no way to disseminate the event */
        result = PMIX_ERR_NOT_SUPPORTED;
    } else {
        /* pass it to our host RM for distribution */
        result = pmix_host_server.notify_event(caddy->status, &caddy->source,
                                               caddy->range,
                                               caddy->info, caddy->ninfo,
                                               local_cbfunc, caddy);
        if (PMIX_SUCCESS == result) {
            /* local_cbfunc will respond for us */
            return;
        }
        if (PMIX_OPERATION_SUCCEEDED == result) {
            /* completed atomically - local_cbfunc will not be called,
             * so report success ourselves */
            result = PMIX_SUCCESS;
        }
    }

    /* answer the requestor (if they asked for it) and clean up */
    if (NULL != caddy->cbfunc) {
        caddy->cbfunc(result, caddy->cbdata);
    }
    PMIX_RELEASE(caddy);
}
pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer,
pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc,
@ -1941,13 +1982,11 @@ pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer,
int32_t cnt;
pmix_status_t rc;
pmix_notify_caddy_t *cd;
size_t ninfo;
pmix_output_verbose(2, pmix_server_globals.event_output,
"recvd event notification from client");
if (NULL == pmix_host_server.notify_event) {
return PMIX_ERR_NOT_SUPPORTED;
}
"%s:%d recvd event notification from client",
pmix_globals.myid.nspace, pmix_globals.myid.rank);
cd = PMIX_NEW(pmix_notify_caddy_t);
if (NULL == cd) {
@ -1977,44 +2016,36 @@ pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer,
/* unpack the info keys */
cnt = 1;
PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE);
PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto exit;
}
if (0 < cd->ninfo) {
PMIX_INFO_CREATE(cd->info, cd->ninfo);
if (NULL == cd->info) {
rc = PMIX_ERR_NOMEM;
goto exit;
}
cnt = cd->ninfo;
cd->ninfo = ninfo + 1;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
if (NULL == cd->info) {
rc = PMIX_ERR_NOMEM;
goto exit;
}
if (0 < ninfo) {
cnt = ninfo;
PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
goto exit;
}
}
/* check the range directive - if it is LOCAL, then we just
* process it ourselves. Otherwise, it needs to go up to our
* host for dissemination */
if (PMIX_RANGE_LOCAL == cd->range) {
if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status,
&cd->source,
cd->range,
cd->info, cd->ninfo,
local_cbfunc, cd))) {
goto exit;
}
return PMIX_SUCCESS;
/* add an info object to mark that we recvd this internally */
PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_SERVER_INTERNAL_NOTIFY, NULL, PMIX_BOOL);
/* process it */
if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status,
&cd->source,
cd->range,
cd->info, cd->ninfo,
intermed_step, cd))) {
goto exit;
}
/* when we receive an event from a client, we just pass it to
* our host RM for distribution - if any targeted recipients
* are local to us, the host RM will let us know */
pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
cd->info, cd->ninfo, local_cbfunc, cd);
/* tell the switchyard we will handle it from here */
return PMIX_SUCCESS;
exit:

Просмотреть файл

@ -162,30 +162,9 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer,
PMIX_RELEASE(chain);
goto error;
}
/* check for directives */
for (cnt=0; cnt < (int)ninfo; cnt++) {
if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) {
chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]);
} else if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) {
PMIX_PROC_CREATE(chain->affected, 1);
if (NULL == chain->affected) {
PMIX_RELEASE(chain);
goto error;
}
chain->naffected = 1;
memcpy(chain->affected, chain->info[cnt].value.data.proc, sizeof(pmix_proc_t));
} else if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) {
chain->naffected = chain->info[cnt].value.data.darray->size;
PMIX_PROC_CREATE(chain->affected, chain->naffected);
if (NULL == chain->affected) {
chain->naffected = 0;
PMIX_RELEASE(chain);
goto error;
}
memcpy(chain->affected, chain->info[cnt].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t));
}
}
}
/* prep the chain for processing */
pmix_prep_event_chain(chain, chain->info, ninfo, false);
pmix_output_verbose(2, pmix_client_globals.event_output,
"[%s:%d] pmix:tool_notify_recv - processing event %s from source %s:%d, calling errhandler",

Просмотреть файл

@ -202,7 +202,8 @@ PMIX_EXPORT const char* PMIx_Error_string(pmix_status_t errnum)
return "LAUNCHER-READY";
case PMIX_OPERATION_IN_PROGRESS:
return "OPERATION-IN-PROGRESS";
case PMIX_PROC_TERMINATED:
return "PROC-TERMINATED";
case PMIX_ERR_NODE_DOWN:
return "NODE-DOWN";

Просмотреть файл

@ -15,6 +15,8 @@
* All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 University of Houston. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -54,9 +56,6 @@
#ifdef HAVE_SYS_STATVFS_H
#include <sys/statvfs.h>
#endif
#ifdef HAVE_SYS_MOUNT_H
#include <sys/mount.h>
#endif
#ifdef HAVE_MNTENT_H
#include <mntent.h>
#endif

Просмотреть файл

@ -317,6 +317,17 @@ static void model_registration_callback(pmix_status_t status,
DEBUG_WAKEUP_THREAD(lock);
}
/* Restore the default disposition (SIG_DFL) for the given signal.
 *
 * sig - signal number whose handler is reset
 *
 * The installed action has an empty signal mask and no flags; the
 * previous action is not retrieved and the result of sigaction() is
 * not checked.
 */
static void set_handler_default(int sig)
{
    struct sigaction dflt;

    /* no extra signals blocked while handling, no special flags */
    sigemptyset(&dflt.sa_mask);
    dflt.sa_flags = 0;
    dflt.sa_handler = SIG_DFL;

    /* we do not care about the old action, so pass no output struct */
    sigaction(sig, &dflt, NULL);
}
int main(int argc, char **argv)
{
char **client_env=NULL;
@ -570,14 +581,22 @@ int main(int argc, char **argv)
PMIx_server_finalize();
return -1;
}
child = PMIX_NEW(wait_tracker_t);
child->pid = pid;
pmix_list_append(&children, &child->super);
if (pid == 0) {
sigset_t sigs;
set_handler_default(SIGTERM);
set_handler_default(SIGINT);
set_handler_default(SIGHUP);
set_handler_default(SIGPIPE);
set_handler_default(SIGCHLD);
sigprocmask(0, 0, &sigs);
sigprocmask(SIG_UNBLOCK, &sigs, 0);
execve(executable, client_argv, client_env);
/* Does not return */
exit(0);
} else {
child = PMIX_NEW(wait_tracker_t);
child->pid = pid;
pmix_list_append(&children, &child->super);
}
}
free(executable);
@ -596,8 +615,7 @@ int main(int argc, char **argv)
n=0;
PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
if (0 != child->exit_code) {
fprintf(stderr, "Child %d exited with status %d - test FAILED\n", n, child->exit_code);
goto done;
fprintf(stderr, "Child %d [%d] exited with status %d - test FAILED\n", n, child->pid, child->exit_code);
}
++n;
}
@ -1024,7 +1042,8 @@ static pmix_status_t notify_event(pmix_status_t code,
pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return PMIX_SUCCESS;
pmix_output(0, "SERVER: NOTIFY EVENT");
return PMIX_OPERATION_SUCCEEDED;
}
typedef struct query_data_t {
@ -1147,8 +1166,9 @@ static void wait_signal_callback(int fd, short event, void *arg)
exit_code = status;
}
--wakeup;
break;
return;
}
}
}
fprintf(stderr, "ENDLOOP\n");
}