diff --git a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h index cf222422b3..de86c4ae49 100644 --- a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h +++ b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h @@ -244,6 +244,7 @@ typedef uint32_t pmix_rank_t; #define PMIX_HWLOC_XML_V1 "pmix.hwlocxml1" // (char*) XML representation of local topology using HWLOC v1.x format #define PMIX_HWLOC_XML_V2 "pmix.hwlocxml2" // (char*) XML representation of local topology using HWLOC v2.x format + /* request-related info */ #define PMIX_COLLECT_DATA "pmix.collect" // (bool) collect data and return it at the end of the operation #define PMIX_TIMEOUT "pmix.timeout" // (int) time in sec before specified operation should time out (0 => infinite) @@ -263,6 +264,9 @@ typedef uint32_t pmix_rank_t; // not request data from the server if not found #define PMIX_EMBED_BARRIER "pmix.embed.barrier" // (bool) execute a blocking fence operation before executing the // specified operation +#define PMIX_JOB_TERM_STATUS "pmix.job.term.status" // (pmix_status_t) status returned upon job termination +#define PMIX_PROC_STATE_STATUS "pmix.proc.state" // (pmix_proc_state_t) process state + /* attributes used by host server to pass data to the server convenience library - the * data will then be parsed and provided to the local clients */ @@ -274,10 +278,12 @@ typedef uint32_t pmix_rank_t; #define PMIX_APP_MAP_TYPE "pmix.apmap.type" // (char*) type of mapping used to layout the application (e.g., cyclic) #define PMIX_APP_MAP_REGEX "pmix.apmap.regex" // (char*) regex describing the result of the mapping + /* attributes used internally to communicate data from the server to the client */ #define PMIX_PROC_BLOB "pmix.pblob" // (pmix_byte_object_t) packed blob of process data #define PMIX_MAP_BLOB "pmix.mblob" // (pmix_byte_object_t) packed blob of process location + /* event handler registration and notification info keys */ #define PMIX_EVENT_HDLR_NAME "pmix.evname" // (char*) string name identifying this handler #define PMIX_EVENT_JOB_LEVEL "pmix.evjob" // (bool) register for job-specific events only @@ -297,6 +303,9 @@ typedef uint32_t pmix_rank_t; #define PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked // NOTE: the object will _only_ be returned to the process that // registered it +#define PMIX_EVENT_DO_NOT_CACHE "pmix.evnocache" // (bool) instruct the PMIx server not to cache the event +#define PMIX_EVENT_SILENT_TERMINATION "pmix.evsilentterm" // (bool) do not generate an event when this job normally terminates + /* fault tolerance-related events */ #define PMIX_EVENT_TERMINATE_SESSION "pmix.evterm.sess" // (bool) RM intends to terminate session diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 2b2e44ab34..ad68c291a1 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -807,16 +807,33 @@ static void _notify_client_event(int sd, short args, void *cbdata) PMIx_Data_range_string(cd->range), cd->nondefault ? "NONDEFAULT" : "OPEN"); - /* we cannot know if everyone who wants this notice has had a chance - * to register for it - the notice may be coming too early. So cache - * the message until all local procs have received it, or it ages to - * the point where it gets pushed out by more recent events */ - PMIX_RETAIN(cd); - rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); + /* check for caching instructions */ + holdcd = true; + if (0 < cd->ninfo) { + /* check for caching instructions */ + for (n=0; n < cd->ninfo; n++) { + if (0 == strncmp(cd->info[n].key, PMIX_EVENT_DO_NOT_CACHE, PMIX_MAX_KEYLEN)) { + if (PMIX_UNDEF == cd->info[n].value.type || + cd->info[n].value.data.flag) { + holdcd = false; + break; + } + } + } + } - /* if an older event was bumped, release it */ - if (NULL != rbout) { - PMIX_RELEASE(rbout); + if (holdcd) { + /* we cannot know if everyone who wants this notice has had a chance + * to register for it - the notice may be coming too early. So cache + * the message until all local procs have received it, or it ages to + * the point where it gets pushed out by more recent events */ + PMIX_RETAIN(cd); + rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd); + + /* if an older event was bumped, release it */ + if (NULL != rbout) { + PMIX_RELEASE(rbout); + } } holdcd = false; @@ -984,8 +1001,15 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, cd->source.rank = source->rank; } cd->range = range; - cd->info = info; - cd->ninfo = ninfo; + /* have to copy the info to preserve it for future when cached */ + if (0 < ninfo) { + cd->ninfo = ninfo; + PMIX_INFO_CREATE(cd->info, cd->ninfo); + /* need to copy the info */ + for (n=0; n < cd->ninfo; n++) { + PMIX_INFO_XFER(&cd->info[n], &info[n]); + } + } /* check for directives */ if (NULL != info) { diff --git a/opal/mca/pmix/pmix2x/pmix2x.c b/opal/mca/pmix/pmix2x/pmix2x.c index 4a3a58fca0..6edf9a320f 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.c +++ b/opal/mca/pmix/pmix2x/pmix2x.c @@ -1155,6 +1155,8 @@ static int notify_event(int status, } op = OBJ_NEW(pmix2x_opcaddy_t); + op->opcbfunc = cbfunc; + op->cbdata = cbdata; /* convert the status */ pstatus = pmix2x_convert_opalrc(status); @@ -1183,7 +1185,14 @@ static int notify_event(int status, n=0; OPAL_LIST_FOREACH(kv, info, opal_value_t) { (void)strncpy(op->info[n].key, kv->key, PMIX_MAX_KEYLEN); - pmix2x_value_load(&op->info[n].value, kv); + /* little dicey here as we need to convert a status, if + * provided, and it will be an int coming down to us */ + if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) { + op->info[n].value.type = PMIX_STATUS; + op->info[n].value.data.status = pmix2x_convert_opalrc(kv->data.integer); + } else { + pmix2x_value_load(&op->info[n].value, kv); + } ++n; } } diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index fb18fa048d..e1104dc9dd 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -192,6 +192,9 @@ BEGIN_C_DECLS // not request data from the server if not found #define OPAL_PMIX_EMBED_BARRIER "pmix.embed.barrier" // (bool) execute a blocking fence operation before executing the // specified operation +#define OPAL_PMIX_JOB_TERM_STATUS "pmix.job.term.status" // (int) status returned upon job termination +#define OPAL_PMIX_PROC_STATE_STATUS "pmix.proc.state" // (int) process state + /* attribute used by host server to pass data to the server convenience library - the @@ -218,6 +221,9 @@ BEGIN_C_DECLS #define OPAL_PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked // NOTE: the object will _only_ be returned to the process that // registered it +#define OPAL_PMIX_EVENT_DO_NOT_CACHE "pmix.evnocache" // (bool) instruct the PMIx server not to cache the event +#define OPAL_PMIX_EVENT_SILENT_TERMINATION "pmix.evsilentterm" // (bool) do not generate an event when this job normally terminates + /* fault tolerance-related events */ #define OPAL_PMIX_EVENT_TERMINATE_SESSION "pmix.evterm.sess" // (bool) RM intends to terminate session diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index cf885bf5f7..b0cc89b3e1 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -1000,7 +1000,7 @@ int get_ofi_prov_id(opal_list_t *attributes) opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - get_ofi_prov_id() -> comparing sockets != %s to choose first available fabric provider", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - providers[i], cur_fi->fabric_attr->prov_name); + cur_fi->fabric_attr->prov_name); if (0 != strcmp("sockets", cur_fi->fabric_attr->prov_name)) { ofi_prov_id = prov_num; opal_output_verbose(20,orte_rml_base_framework.framework_output, diff --git a/orte/mca/state/dvm/state_dvm.c b/orte/mca/state/dvm/state_dvm.c index 6293d4af2f..fd04e9fe7f 100644 --- a/orte/mca/state/dvm/state_dvm.c +++ b/orte/mca/state/dvm/state_dvm.c @@ -50,6 +50,7 @@ static int finalize(void); static void init_complete(int fd, short args, void *cbdata); static void vm_ready(int fd, short args, void *cbata); static void check_complete(int fd, short args, void *cbdata); +static void cleanup_job(int fd, short args, void *cbdata); /****************** * DVM module - used when mpirun is persistent @@ -89,6 +90,7 @@ static orte_job_state_t launch_states[] = { ORTE_JOB_STATE_REGISTERED, /* termination states */ ORTE_JOB_STATE_TERMINATED, + ORTE_JOB_STATE_NOTIFY_COMPLETED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE }; static orte_state_cbfunc_t launch_callbacks[] = { @@ -107,6 +109,7 @@ static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_post_launch, orte_plm_base_registered, check_complete, + cleanup_job, orte_quit }; @@ -501,11 +504,24 @@ static void check_complete(int fd, short args, void *cbdata) OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output, "%s state:dvm:check_job_completed state is terminated - activating notify", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL, - OPAL_PMIX_RANGE_GLOBAL, NULL, - NULL, NULL); - opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED); + /* mark the job as notified */ + jdata->state = ORTE_JOB_STATE_NOTIFIED; } OBJ_RELEASE(caddy); } + +static void cleanup_job(int sd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata; + + ORTE_ACQUIRE_OBJECT(caddy); + jdata = caddy->jdata; + + /* remove this object from the job array */ + opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); + + OBJ_RELEASE(caddy); +} diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index e501a39782..c99e9845a4 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -577,7 +577,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, if (NULL == (jdata = orte_get_job_data_object(job))) { /* we can safely ignore this request as the job * was already cleaned up */ - opal_output(0, "NULL JOB"); goto CLEANUP; } @@ -585,7 +584,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, * can ignore this request as we would have already * dealt with it */ if (0 < jdata->num_local_procs) { - opal_output(0, "NO PROCS"); goto CLEANUP; } @@ -622,7 +620,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, OBJ_RELEASE(map); jdata->map = NULL; } - opal_output(0, "CLEANUP COMPLETE"); break; diff --git a/orte/orted/pmix/pmix_server_gen.c b/orte/orted/pmix/pmix_server_gen.c index f67d0a67b7..d91e2aa88f 100644 --- a/orte/orted/pmix/pmix_server_gen.c +++ b/orte/orted/pmix/pmix_server_gen.c @@ -725,6 +725,8 @@ static void _toolconn(int sd, short args, void *cbdata) orte_node_t *node; orte_process_name_t tool; int rc; + opal_value_t *val; + bool flag; ORTE_ACQUIRE_OBJECT(cd); @@ -788,6 +790,22 @@ static void _toolconn(int sd, short args, void *cbdata) proc->app_idx = 0; ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_LOCAL); + /* check for directives */ + if (NULL != cd->info) { + OPAL_LIST_FOREACH(val, cd->info, opal_value_t) { + if (0 == strcmp(val->key, OPAL_PMIX_EVENT_SILENT_TERMINATION)) { + if (OPAL_UNDEF == val->type || val->data.flag) { + flag = true; + orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, + ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL); + } + } + } + } + flag = true; + orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, + ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL); + /* pass back the assigned jobid */ tool.jobid = jdata->jobid; tool.vpid = 0; diff --git a/orte/tools/orte-dvm/orte-dvm.c b/orte/tools/orte-dvm/orte-dvm.c index 3aa6413b40..08dc2319d6 100644 --- a/orte/tools/orte-dvm/orte-dvm.c +++ b/orte/tools/orte-dvm/orte-dvm.c @@ -57,6 +57,7 @@ #include "opal/mca/event/event.h" #include "opal/mca/installdirs/installdirs.h" #include "opal/mca/base/base.h" +#include "opal/mca/pmix/pmix.h" #include "opal/util/argv.h" #include "opal/util/output.h" #include "opal/util/basename.h" @@ -466,17 +467,10 @@ int main(int argc, char *argv[]) exit(orte_exit_status); } -static void send_callback(int status, orte_process_name_t *peer, - opal_buffer_t* buffer, orte_rml_tag_t tag, - void* cbdata) - +static void notify_complete(int status, void *cbdata) { - orte_job_t *jdata = (orte_job_t*)cbdata; - - OBJ_RELEASE(buffer); - /* cleanup the job object */ - opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); - OBJ_RELEASE(jdata); + opal_list_t *info = (opal_list_t*)cbdata; + OPAL_LIST_RELEASE(info); } static void notify_requestor(int sd, short args, void *cbdata) @@ -484,15 +478,13 @@ static void notify_requestor(int sd, short args, void *cbdata) orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; orte_job_t *jdata = caddy->jdata; orte_proc_t *pptr; - int ret, id, *idptr; + int ret; opal_buffer_t *reply; orte_daemon_cmd_flag_t command; orte_grpcomm_signature_t *sig; - -opal_output(0, "NOTIFY JOB COMPLETE"); - - /* notify the requestor */ - reply = OBJ_NEW(opal_buffer_t); + bool notify = true; + opal_list_t *info; + opal_value_t *val; /* see if there was any problem */ if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) { @@ -503,31 +495,37 @@ opal_output(0, "NOTIFY JOB COMPLETE"); } else { ret = 0; } - /* return the completion status */ - opal_dss.pack(reply, &ret, 1, OPAL_INT); - /* pack the jobid to be returned */ - opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID); - - /* return the tracker ID */ - idptr = &id; - if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&idptr, OPAL_INT)) { - /* pack the sender's index to the tracking object */ - opal_dss.pack(reply, idptr, 1, OPAL_INT); + if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) { + notify = false; } - /* if there was a problem, we need to send the requestor more info about what happened */ - if (0 < ret) { - opal_dss.pack(reply, &jdata->state, 1, ORTE_JOB_STATE_T); - opal_dss.pack(reply, &pptr, 1, ORTE_PROC); - opal_dss.pack(reply, &pptr->node, 1, ORTE_NODE); + if (notify) { + info = OBJ_NEW(opal_list_t); + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE); + val->type = OPAL_BOOL; + val->data.flag = true; + opal_list_append(info, &val->super); + /* provide the status */ + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS); + val->type = OPAL_INT; + val->data.integer = ret; + opal_list_append(info, &val->super); + /* if there was a problem, we need to send the requestor more info about what happened */ + if (0 < ret) { + val = OBJ_NEW(opal_value_t); + val->key = strdup(OPAL_PMIX_PROCID); + val->type = OPAL_NAME; + val->data.name = pptr->name; + opal_list_append(info, &val->super); + } + opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL, + OPAL_PMIX_RANGE_GLOBAL, info, + notify_complete, info); } - orte_rml.send_buffer_nb(orte_mgmt_conduit, - &jdata->originator, reply, - ORTE_RML_TAG_NOTIFY_COMPLETE, - send_callback, jdata); - /* now ensure that _all_ daemons know that this job has terminated so even * those that did not participate in it will know to cleanup the resources * they assigned to the job. This is necessary now that the mapping function @@ -545,9 +543,4 @@ opal_output(0, "NOTIFY JOB COMPLETE"); orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply); OBJ_RELEASE(reply); OBJ_RELEASE(sig); - - /* we cannot cleanup the job object as we might - * hit an error during transmission, so clean it - * up in the send callback */ - OBJ_RELEASE(caddy); } diff --git a/orte/tools/prun/prun.c b/orte/tools/prun/prun.c index 5ca703fcd2..38f5555dcd 100644 --- a/orte/tools/prun/prun.c +++ b/orte/tools/prun/prun.c @@ -506,6 +506,15 @@ static void evhandler(size_t evhdlr_registration_id, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { + size_t n; + + if (NULL != info) { + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_JOB_TERM_STATUS, PMIX_MAX_KEYLEN)) { + opal_output(0, "JOB COMPLETED WITH STATUS %s", PMIx_Error_string(info[n].value.data.status)); + } + } + } if (NULL != cbfunc) { cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata); } diff --git a/orte/util/attr.c b/orte/util/attr.c index a2d6ed48a7..19d644bf6a 100644 --- a/orte/util/attr.c +++ b/orte/util/attr.c @@ -288,6 +288,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key) return "ORTE_JOB_INFO_CACHE"; case ORTE_JOB_FULLY_DESCRIBED: return "ORTE_JOB_FULLY_DESCRIBED"; + case ORTE_JOB_SILENT_TERMINATION: + return "ORTE_JOB_SILENT_TERMINATION"; case ORTE_PROC_NOBARRIER: return "PROC-NOBARRIER"; diff --git a/orte/util/attr.h b/orte/util/attr.h index 817581e38b..b1b9b224ea 100644 --- a/orte/util/attr.h +++ b/orte/util/attr.h @@ -144,6 +144,8 @@ typedef uint16_t orte_job_flags_t; #define ORTE_JOB_TRANSPORT_KEY (ORTE_JOB_START_KEY + 51) // string - transport keys assigned to this job #define ORTE_JOB_INFO_CACHE (ORTE_JOB_START_KEY + 52) // opal_list_t - list of opal_value_t to be included in job_info #define ORTE_JOB_FULLY_DESCRIBED (ORTE_JOB_START_KEY + 53) // bool - job is fully described in launch msg +#define ORTE_JOB_SILENT_TERMINATION (ORTE_JOB_START_KEY + 54) // bool - do not generate an event notification when job + // normally terminates #define ORTE_JOB_MAX_KEY 300