1
1

Update to support server self-notifications

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-05-05 09:16:02 -07:00
родитель ef0e0171c9
Коммит 0afcb1a448
2 изменённых файлов: 96 добавлений и 42 удалений

Просмотреть файл

@ -659,6 +659,15 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
return;
}
/* Completion callback passed to the host's notify_event entry point.
 *
 * status - result being reported back to this callback
 * cbdata - the pmix_notify_caddy_t that was supplied as the callback
 *          data when the notification was sent up
 *
 * Forwards the status to the original caller's callback, if one was
 * registered on the caddy, and then releases the caddy. */
static void local_cbfunc(pmix_status_t status, void *cbdata)
{
    pmix_notify_caddy_t *caddy = cbdata;

    /* the caller may not have asked to be told about completion */
    if (caddy->cbfunc) {
        caddy->cbfunc(status, caddy->cbdata);
    }

    /* the caddy is released here, after the caller has been told */
    PMIX_RELEASE(caddy);
}
static void _notify_client_event(int sd, short args, void *cbdata)
{
@ -666,8 +675,9 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_notify_caddy_t *rbout;
pmix_regevents_info_t *reginfoptr;
pmix_peer_events_info_t *pr;
pmix_event_chain_t *chain;
size_t n;
bool matched;
bool matched, holdcd;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: _notify_error notifying clients of error %s",
@ -685,51 +695,95 @@ static void _notify_client_event(int sd, short args, void *cbdata)
PMIX_RELEASE(rbout);
}
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
cd->status == reginfoptr->code) {
PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
/* if this client was the source of the event, then
* don't send it back as they will have processed it
* when they generated it */
if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pr->peer->info->rank) {
continue;
}
/* if we were given specific targets, check if this is one */
if (NULL != cd->targets) {
matched = false;
for (n=0; n < cd->ntargets; n++) {
if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
pr->peer->info->rank == cd->targets[n].rank) {
matched = true;
break;
}
}
if (!matched) {
/* do not notify this one */
holdcd = false;
if (PMIX_RANGE_PROC_LOCAL != cd->range) {
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
cd->status == reginfoptr->code) {
PMIX_LIST_FOREACH(pr, &reginfoptr->peers, pmix_peer_events_info_t) {
/* if this client was the source of the event, then
* don't send it back as they will have processed it
* when they generated it */
if (0 == strncmp(cd->source.nspace, pr->peer->info->nptr->nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pr->peer->info->rank) {
continue;
}
/* if we were given specific targets, check if this is one */
if (NULL != cd->targets) {
matched = false;
for (n=0; n < cd->ntargets; n++) {
if (0 != strncmp(pr->peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
pr->peer->info->rank == cd->targets[n].rank) {
matched = true;
break;
}
}
if (!matched) {
/* do not notify this one */
continue;
}
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%d",
pr->peer->info->nptr->nspace, pr->peer->info->rank);
PMIX_RETAIN(cd->buf);
PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf);
}
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%d",
pr->peer->info->nptr->nspace, pr->peer->info->rank);
PMIX_RETAIN(cd->buf);
PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf);
}
}
if (PMIX_RANGE_LOCAL != cd->range &&
0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) &&
cd->source.rank == pmix_globals.myid.rank) {
/* if we are the source, then we need to post this upwards as
* well so the host RM can broadcast it as necessary - we rely
* on the host RM to _not_ deliver this back to us! */
if (NULL != pmix_host_server.notify_event) {
/* mark that we sent it upstairs so we don't release
* the caddy until we return from the host RM */
holdcd = true;
pmix_host_server.notify_event(cd->status, &cd->source, cd->range,
cd->info, cd->ninfo, local_cbfunc, cd);
}
}
}
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
/* we may also have registered for events, so be sure to check this
* against our registrations */
chain = PMIX_NEW(pmix_event_chain_t);
chain->status = cd->status;
(void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN);
chain->source.rank = cd->source.rank;
/* we always leave space for a callback object and
* the evhandler name. */
chain->ninfo = cd->ninfo + 2;
PMIX_INFO_CREATE(chain->info, chain->ninfo);
if (0 < cd->ninfo) {
/* need to copy the info */
for (n=0; n < cd->ninfo; n++) {
PMIX_INFO_XFER(&chain->info[n], &cd->info[n]);
}
}
/* put the evhandler name tag in the next-to-last element - we
* will fill it in as each handler is called */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-2], PMIX_EVENT_HDLR_NAME, NULL, PMIX_STRING);
/* now put the callback object tag in the last element */
PMIX_INFO_LOAD(&chain->info[chain->ninfo-1], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER);
/* process it */
pmix_invoke_local_event_hdlr(chain);
if (!holdcd) {
/* notify the caller */
if (NULL != cd->cbfunc) {
cd->cbfunc(PMIX_SUCCESS, cd->cbdata);
}
PMIX_RELEASE(cd);
}
PMIX_RELEASE(cd);
}

Просмотреть файл

@ -30,12 +30,12 @@ static void notification_fn(size_t evhdlr_registration_id,
}
/* this is an event notification function that we explicitly request
* be called when the PMIX_ERR_JOB_TERMINATED notification is issued.
* be called when the PMIX_MODEL_DECLARED notification is issued.
* We could catch it in the general event notification function and test
* the status to see if it was "job terminated", but it often is simpler
* the status to see if the status matched, but it often is simpler
* to declare a use-specific notification callback point. In this case,
* we are asking to know whenever a job terminates, and we will then
* know we can exit */
* we are asking to know whenever a programming model library is
* instantiated */
static void model_callback(size_t evhdlr_registration_id,
pmix_status_t status,
const pmix_proc_t *source,