From 256b5adac56595091e3b2ab36989b30add51583f Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 19 Dec 2016 00:34:27 -0800 Subject: [PATCH] Transfer across final fixes from debugger attach work Signed-off-by: Ralph Castain --- .../pmix/src/event/pmix_event_notification.c | 8 +- opal/mca/pmix/pmix2x/pmix2x.c | 34 +++- opal/mca/pmix/pmix2x/pmix2x_server_north.c | 15 +- opal/mca/pmix/pmix2x/pmix2x_server_south.c | 1 + orte/mca/state/base/state_base_fns.c | 160 +++++++++--------- 5 files changed, 132 insertions(+), 86 deletions(-) diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index d3646e72e5..0d7702bcea 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -501,8 +501,9 @@ static void _notify_client_event(int sd, short args, void *cbdata) } } pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server: notifying client %s:%d", - pr->peer->info->nptr->nspace, pr->peer->info->rank); + "pmix_server: notifying client %s:%d of status %s", + pr->peer->info->nptr->nspace, pr->peer->info->rank, + PMIx_Error_string(cd->status)); PMIX_RETAIN(cd->buf); PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf); } @@ -555,7 +556,7 @@ static pmix_status_t notify_client_of_event(pmix_status_t status, for (n=0; n < ninfo; n++) { if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { cd->nondefault = true; - } else if (strncmp(info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { + } else if (0 == strncmp(info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { /* provides an array of pmix_proc_t identifying the procs * that are to receive this notification */ if (PMIX_DATA_ARRAY == info[n].value.type && @@ -570,6 +571,7 @@ static pmix_status_t notify_client_of_event(pmix_status_t status, memcpy(cd->targets, info[n].value.data.proc, sizeof(pmix_proc_t)); } else { /* this is an error */ + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); return PMIX_ERR_BAD_PARAM; } } diff --git a/opal/mca/pmix/pmix2x/pmix2x.c b/opal/mca/pmix/pmix2x/pmix2x.c index 10533c4f40..0481aceb2c 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.c +++ b/opal/mca/pmix/pmix2x/pmix2x.c @@ -733,6 +733,9 @@ pmix_persistence_t pmix2x_convert_opalpersist(opal_pmix_persistence_t persist) void pmix2x_value_load(pmix_value_t *v, opal_value_t *kv) { + opal_pmix2x_jobid_trkr_t *job; + bool found; + switch(kv->type) { case OPAL_UNDEF: v->type = PMIX_UNDEF; @@ -829,7 +832,18 @@ void pmix2x_value_load(pmix_value_t *v, v->type = PMIX_PROC; /* have to stringify the jobid */ PMIX_PROC_CREATE(v->data.proc, 1); - (void)opal_snprintf_jobid(v->data.proc->nspace, PMIX_MAX_NSLEN, kv->data.name.vpid); + /* see if this job is in our list of known nspaces */ + found = false; + OPAL_LIST_FOREACH(job, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (job->jobid == kv->data.name.jobid) { + (void)strncpy(v->data.proc->nspace, job->nspace, PMIX_MAX_NSLEN); + found = true; + break; + } + } + if (!found) { + (void)opal_snprintf_jobid(v->data.proc->nspace, PMIX_MAX_NSLEN, kv->data.name.vpid); + } v->data.proc->rank = pmix2x_convert_opalrank(kv->data.name.vpid); break; case OPAL_BYTE_OBJECT: @@ -875,7 +889,8 @@ int pmix2x_value_unload(opal_value_t *kv, const pmix_value_t *v) { int rc=OPAL_SUCCESS; - + bool found; + opal_pmix2x_jobid_trkr_t *job; switch(v->type) { case PMIX_UNDEF: @@ -969,8 +984,19 @@ int pmix2x_value_unload(opal_value_t *kv, break; case PMIX_PROC: kv->type = OPAL_NAME; - if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&kv->data.name.jobid, v->data.proc->nspace))) { - return pmix2x_convert_opalrc(rc); + /* see if this job is in our list of known nspaces */ + found = false; + OPAL_LIST_FOREACH(job, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) { + if (0 == strncmp(job->nspace, v->data.proc->nspace, PMIX_MAX_NSLEN)) { + kv->data.name.jobid = job->jobid; + found = true; + break; + } + } + if (!found) { + if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&kv->data.name.jobid, v->data.proc->nspace))) { + return pmix2x_convert_opalrc(rc); + } } kv->data.name.vpid = pmix2x_convert_rank(v->data.proc->rank); break; diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_north.c b/opal/mca/pmix/pmix2x/pmix2x_server_north.c index 10d62c2847..743953b509 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_north.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_north.c @@ -920,13 +920,22 @@ static void toolcbfunc(int status, pmix2x_opalcaddy_t *opalcaddy = (pmix2x_opalcaddy_t*)cbdata; pmix_status_t rc; pmix_proc_t p; + opal_pmix2x_jobid_trkr_t *job; /* convert the status */ rc = pmix2x_convert_opalrc(status); - /* convert the process name */ - (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc.jobid); - p.rank = pmix2x_convert_opalrank(proc.vpid); + memset(&p, 0, sizeof(pmix_proc_t)); + if (OPAL_SUCCESS == status) { + /* convert the process name */ + (void)opal_snprintf_jobid(p.nspace, PMIX_MAX_NSLEN, proc.jobid); + p.rank = pmix2x_convert_opalrank(proc.vpid); + /* store this job in our list of known nspaces */ + job = OBJ_NEW(opal_pmix2x_jobid_trkr_t); + (void)strncpy(job->nspace, p.nspace, PMIX_MAX_NSLEN); + job->jobid = proc.jobid; + opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super); + } /* pass it down */ if (NULL != opalcaddy->toolcbfunc) { diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_south.c b/opal/mca/pmix/pmix2x/pmix2x_server_south.c index 6256eb560e..f18dadb49f 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_south.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_south.c @@ -499,6 +499,7 @@ int pmix2x_server_notify_event(int status, OPAL_LIST_FOREACH(kv, info, opal_value_t) { (void)strncpy(pinfo[n].key, kv->key, PMIX_MAX_KEYLEN); pmix2x_value_load(&pinfo[n].value, kv); + ++n; } } else { sz = 0; diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 98a3b8df13..3cdf7c5059 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -459,74 +459,25 @@ void orte_state_base_report_progress(int fd, short argc, void *cbdata) OBJ_RELEASE(caddy); } -static void _send_notification(int status, orte_process_name_t *proc) -{ - opal_buffer_t buf; - orte_grpcomm_signature_t sig; - int rc; - opal_value_t kv, *kvptr; - - OBJ_CONSTRUCT(&buf, opal_buffer_t); - - /* pack the status */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &status, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&buf); - return; - } - - /* the source is me */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&buf); - return; - } - - /* pass along the affected proc (one opal_value_t) */ - rc = 1; - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rc, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&buf); - return; - } - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC); - kv.type = OPAL_NAME; - kv.data.name.jobid = proc->jobid; - kv.data.name.vpid = proc->vpid; - kvptr = &kv; - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &kvptr, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&kv); - OBJ_DESTRUCT(&buf); - return; - } - OBJ_DESTRUCT(&kv); - - - /* xcast it to everyone */ - OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t); - sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); - sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid; - sig.signature[0].vpid = ORTE_VPID_WILDCARD; - sig.sz = 1; - - if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, &buf))) { - ORTE_ERROR_LOG(rc); - } - OBJ_DESTRUCT(&sig); - OBJ_DESTRUCT(&buf); -} - -static void _send_direct_notify(int status, orte_process_name_t *proc) +static void _send_notification(int status, + orte_process_name_t *proc, + orte_process_name_t *target) { opal_buffer_t *buf; + orte_grpcomm_signature_t sig; int rc; opal_value_t kv, *kvptr; orte_process_name_t daemon; buf = OBJ_NEW(opal_buffer_t); + opal_output_verbose(5, orte_state_base_framework.framework_output, + "%s state:base:sending notification %s proc %s target %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_ERROR_NAME(status), + ORTE_NAME_PRINT(proc), + ORTE_NAME_PRINT(target)); + /* pack the status */ if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &status, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); @@ -541,15 +492,17 @@ static void _send_direct_notify(int status, orte_process_name_t *proc) return; } - /* pass along the proc to be notified (one opal_value_t) */ - rc = 1; + /* we are going to pass three opal_value_t's */ + rc = 3; if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rc, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); return; } + + /* pass along the affected proc(s) */ OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE); + kv.key = strdup(OPAL_PMIX_EVENT_AFFECTED_PROC); kv.type = OPAL_NAME; kv.data.name.jobid = proc->jobid; kv.data.name.vpid = proc->vpid; @@ -562,17 +515,66 @@ static void _send_direct_notify(int status, orte_process_name_t *proc) } OBJ_DESTRUCT(&kv); - - /* get the daemon hosting the proc to be notified */ - daemon.jobid = ORTE_PROC_MY_NAME->jobid; - daemon.vpid = orte_get_proc_daemon_vpid(proc); - /* send the notification to that daemon */ - if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, - &daemon, buf, - ORTE_RML_TAG_NOTIFICATION, - orte_rml_send_callback, NULL))) { + /* pass along the proc(s) to be notified */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_PMIX_EVENT_CUSTOM_RANGE); + kv.type = OPAL_NAME; + kv.data.name.jobid = target->jobid; + kv.data.name.vpid = target->vpid; + kvptr = &kv; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); OBJ_RELEASE(buf); + return; + } + OBJ_DESTRUCT(&kv); + + /* mark this as intended for non-default event handlers */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); + kv.type = OPAL_BOOL; + kv.data.flag = true; + kvptr = &kv; + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kvptr, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&kv); + OBJ_RELEASE(buf); + return; + } + OBJ_DESTRUCT(&kv); + + /* if the targets are a wildcard, then xcast it to everyone */ + if (ORTE_VPID_WILDCARD == target->vpid) { + OBJ_CONSTRUCT(&sig, orte_grpcomm_signature_t); + sig.signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t)); + sig.signature[0].jobid = ORTE_PROC_MY_NAME->jobid; + sig.signature[0].vpid = ORTE_VPID_WILDCARD; + sig.sz = 1; + + if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(&sig, ORTE_RML_TAG_NOTIFICATION, buf))) { + ORTE_ERROR_LOG(rc); + } + OBJ_DESTRUCT(&sig); + OBJ_RELEASE(buf); + } else { + /* get the daemon hosting the proc to be notified */ + daemon.jobid = ORTE_PROC_MY_NAME->jobid; + daemon.vpid = orte_get_proc_daemon_vpid(target); + /* send the notification to that daemon */ + opal_output_verbose(5, orte_state_base_framework.framework_output, + "%s state:base:sending notification %s to proc %s at daemon %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_ERROR_NAME(status), + ORTE_NAME_PRINT(target), + ORTE_NAME_PRINT(&daemon)); + if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, + &daemon, buf, + ORTE_RML_TAG_NOTIFICATION, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + } } } @@ -585,7 +587,7 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata) orte_proc_t *pdata; int i; char *rtmod; - orte_process_name_t parent, *npptr; + orte_process_name_t parent, target, *npptr; opal_output_verbose(5, orte_state_base_framework.framework_output, "%s state:base:track_procs called for proc %s state %s", @@ -699,15 +701,21 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata) npptr = &parent; if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&npptr, OPAL_NAME)) { /* notify everyone who asked for it */ - _send_direct_notify(OPAL_ERR_JOB_TERMINATED, ORTE_NAME_WILDCARD); + target.jobid = jdata->jobid; + target.vpid = ORTE_VPID_WILDCARD; + _send_notification(OPAL_ERR_JOB_TERMINATED, &target, ORTE_NAME_WILDCARD); } else { - _send_direct_notify(OPAL_ERR_JOB_TERMINATED, &parent); + target.jobid = jdata->jobid; + target.vpid = ORTE_VPID_WILDCARD; + _send_notification(OPAL_ERR_JOB_TERMINATED, &target, &parent); } } } else if (ORTE_PROC_STATE_TERMINATED < pdata->state && !orte_job_term_ordered) { /* if this was an abnormal term, notify the other procs of the termination */ - _send_notification(OPAL_ERR_PROC_ABORTED, &pdata->name); + parent.jobid = jdata->jobid; + parent.vpid = ORTE_VPID_WILDCARD; + _send_notification(OPAL_ERR_PROC_ABORTED, &pdata->name, &parent); } }