From b83fc41d548504cde6ca0916875741eeffc54b66 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 2 Jul 2012 22:16:32 +0000 Subject: [PATCH] Add a state that allows mpirun or other tools to be notified of a job completion prior to terminating so that alternative actions can be performed. This commit was SVN r26716. --- orte/mca/plm/plm_types.h | 2 + orte/mca/state/hnp/state_hnp.c | 67 ++++++++++++++++++++++++++++------ orte/util/error_strings.c | 4 ++ 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h index d536509f12..99da8e4540 100644 --- a/orte/mca/plm/plm_types.h +++ b/orte/mca/plm/plm_types.h @@ -121,6 +121,8 @@ typedef int32_t orte_job_state_t; #define ORTE_JOB_STATE_TERMINATED 21 /* all processes have terminated and job is no longer running */ #define ORTE_JOB_STATE_ALL_JOBS_COMPLETE 22 #define ORTE_JOB_STATE_DAEMONS_TERMINATED 23 +#define ORTE_JOB_STATE_NOTIFY_COMPLETED 24 /* callback to notify when job completes */ +#define ORTE_JOB_STATE_NOTIFIED 25 /* Define a boundary so we can easily and quickly determine * if a job abnormally terminated - leave a little room diff --git a/orte/mca/state/hnp/state_hnp.c b/orte/mca/state/hnp/state_hnp.c index 858a55530d..61913046b4 100644 --- a/orte/mca/state/hnp/state_hnp.c +++ b/orte/mca/state/hnp/state_hnp.c @@ -79,6 +79,7 @@ static void local_launch_complete(int fd, short argc, void *cbdata) static void track_procs(int fd, short argc, void *cbdata); static void check_all_complete(int fd, short argc, void *cbdata); static void report_progress(int fd, short argc, void *cbdata); +static void cleanup_job(int fd, short argc, void *cbdata); /* defined default state machine sequence - individual * plm's must add a state for launching daemons @@ -97,6 +98,7 @@ static orte_job_state_t launch_states[] = { ORTE_JOB_STATE_REGISTERED, /* termination states */ ORTE_JOB_STATE_TERMINATED, + ORTE_JOB_STATE_NOTIFY_COMPLETED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE, ORTE_JOB_STATE_DAEMONS_TERMINATED }; @@ -113,6 +115,7 @@ static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_post_launch, orte_plm_base_registered, check_all_complete, + cleanup_job, orte_quit, orte_quit }; @@ -341,6 +344,23 @@ static void track_procs(int fd, short argc, void *cbdata) OBJ_RELEASE(caddy); } +static void cleanup_job(int fd, short argc, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:hnp:cleanup on job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); + + /* flag that we were notified */ + jdata->state = ORTE_JOB_STATE_NOTIFIED; + /* send us back thru job complete */ + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); + OBJ_RELEASE(caddy); +} + static void check_all_complete(int fd, short args, void *cbdata) { orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; @@ -361,12 +381,19 @@ static void check_all_complete(int fd, short args, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); - if (NULL == jdata) { + if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { /* just check to see if the daemons are complete */ OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_complete - received NULL job, checking daemons", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); goto CHECK_DAEMONS; + } else { + /* mark the job as terminated, but don't override any + * abnormal termination flags + */ + if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) { + jdata->state = ORTE_JOB_STATE_TERMINATED; + } } /* turn off any sensor monitors on this job */ @@ -480,7 +507,7 @@ static void check_all_complete(int fd, short args, void *cbdata) } CHECK_ALIVE: - /* now check to see if all jobs are done - release this jdata + /* now check to see if all jobs are done - trigger notification of this jdata * object when we find it */ one_still_alive = false; @@ -493,6 +520,9 @@ static void check_all_complete(int fd, short args, void *cbdata) continue; } /* if this is the job we are checking AND it normally terminated, + * then activate the "notify_completed" state - this will release + * the job state, but is provided so that the HNP main code can + * take alternative actions if desired. If the state is killed_by_cmd, * then go ahead and release it. We cannot release it if it * abnormally terminated as mpirun needs the info so it can * report appropriately to the user @@ -500,16 +530,29 @@ static void check_all_complete(int fd, short args, void *cbdata) * NOTE: do not release the primary job (j=1) so we * can pretty-print completion message */ - if (NULL != jdata && job->jobid == jdata->jobid && - (jdata->state == ORTE_JOB_STATE_TERMINATED || - jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD)) { - /* release this object, ensuring that the - * pointer array internal accounting - * is maintained! - */ - if (1 < j) { - opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */ - OBJ_RELEASE(jdata); + if (NULL != jdata && job->jobid == jdata->jobid) { + opal_output(0, "CHECKING JOB %s", ORTE_JOBID_PRINT(jdata->jobid)); + if (jdata->state == ORTE_JOB_STATE_TERMINATED) { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:hnp:check_job_completed state is terminated - activating notify", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED); + one_still_alive = true; + } else if (jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD || + jdata->state == ORTE_JOB_STATE_NOTIFIED) { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:hnp:check_job_completed state is killed or notified - cleaning up", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* release this object, ensuring that the + * pointer array internal accounting + * is maintained! + */ + if (1 < j) { + opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */ + OBJ_RELEASE(jdata); + } + } else { + opal_output(0, "STATE WAS %s", orte_job_state_to_str(jdata->state)); } continue; } diff --git a/orte/util/error_strings.c b/orte/util/error_strings.c index 26c75db9a2..9af759466e 100644 --- a/orte/util/error_strings.c +++ b/orte/util/error_strings.c @@ -233,6 +233,10 @@ const char *orte_job_state_to_str(orte_job_state_t state) return "UNTERMINATED"; case ORTE_JOB_STATE_TERMINATED: return "NORMALLY TERMINATED"; + case ORTE_JOB_STATE_NOTIFY_COMPLETED: + return "NOTIFY COMPLETED"; + case ORTE_JOB_STATE_NOTIFIED: + return "NOTIFIED"; case ORTE_JOB_STATE_ALL_JOBS_COMPLETE: return "ALL JOBS COMPLETE"; case ORTE_JOB_STATE_ERROR: