Add a state that allows mpirun or other tools to be notified of a job's completion prior to terminating, so that alternative actions can be performed.
This commit was SVN r26716.
Этот коммит содержится в:
родитель
f96fa5ee2a
Коммит
b83fc41d54
@ -121,6 +121,8 @@ typedef int32_t orte_job_state_t;
|
|||||||
#define ORTE_JOB_STATE_TERMINATED 21 /* all processes have terminated and job is no longer running */
|
#define ORTE_JOB_STATE_TERMINATED 21 /* all processes have terminated and job is no longer running */
|
||||||
#define ORTE_JOB_STATE_ALL_JOBS_COMPLETE 22
|
#define ORTE_JOB_STATE_ALL_JOBS_COMPLETE 22
|
||||||
#define ORTE_JOB_STATE_DAEMONS_TERMINATED 23
|
#define ORTE_JOB_STATE_DAEMONS_TERMINATED 23
|
||||||
|
#define ORTE_JOB_STATE_NOTIFY_COMPLETED 24 /* callback to notify when job completes */
|
||||||
|
#define ORTE_JOB_STATE_NOTIFIED 25 /* set by the notify-completed callback once it has run; job may then be cleaned up */
|
||||||
|
|
||||||
/* Define a boundary so we can easily and quickly determine
|
/* Define a boundary so we can easily and quickly determine
|
||||||
* if a job abnormally terminated - leave a little room
|
* if a job abnormally terminated - leave a little room
|
||||||
|
@ -79,6 +79,7 @@ static void local_launch_complete(int fd, short argc, void *cbdata)
|
|||||||
static void track_procs(int fd, short argc, void *cbdata);
|
static void track_procs(int fd, short argc, void *cbdata);
|
||||||
static void check_all_complete(int fd, short argc, void *cbdata);
|
static void check_all_complete(int fd, short argc, void *cbdata);
|
||||||
static void report_progress(int fd, short argc, void *cbdata);
|
static void report_progress(int fd, short argc, void *cbdata);
|
||||||
|
static void cleanup_job(int fd, short argc, void *cbdata);
|
||||||
|
|
||||||
/* defined default state machine sequence - individual
|
/* defined default state machine sequence - individual
|
||||||
* plm's must add a state for launching daemons
|
* plm's must add a state for launching daemons
|
||||||
@ -97,6 +98,7 @@ static orte_job_state_t launch_states[] = {
|
|||||||
ORTE_JOB_STATE_REGISTERED,
|
ORTE_JOB_STATE_REGISTERED,
|
||||||
/* termination states */
|
/* termination states */
|
||||||
ORTE_JOB_STATE_TERMINATED,
|
ORTE_JOB_STATE_TERMINATED,
|
||||||
|
ORTE_JOB_STATE_NOTIFY_COMPLETED,
|
||||||
ORTE_JOB_STATE_ALL_JOBS_COMPLETE,
|
ORTE_JOB_STATE_ALL_JOBS_COMPLETE,
|
||||||
ORTE_JOB_STATE_DAEMONS_TERMINATED
|
ORTE_JOB_STATE_DAEMONS_TERMINATED
|
||||||
};
|
};
|
||||||
@ -113,6 +115,7 @@ static orte_state_cbfunc_t launch_callbacks[] = {
|
|||||||
orte_plm_base_post_launch,
|
orte_plm_base_post_launch,
|
||||||
orte_plm_base_registered,
|
orte_plm_base_registered,
|
||||||
check_all_complete,
|
check_all_complete,
|
||||||
|
cleanup_job,
|
||||||
orte_quit,
|
orte_quit,
|
||||||
orte_quit
|
orte_quit
|
||||||
};
|
};
|
||||||
@ -341,6 +344,23 @@ static void track_procs(int fd, short argc, void *cbdata)
|
|||||||
OBJ_RELEASE(caddy);
|
OBJ_RELEASE(caddy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void cleanup_job(int fd, short argc, void *cbdata)
|
||||||
|
{
|
||||||
|
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
|
||||||
|
orte_job_t *jdata = caddy->jdata;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
|
||||||
|
"%s state:hnp:cleanup on job %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid)));
|
||||||
|
|
||||||
|
/* flag that we were notified */
|
||||||
|
jdata->state = ORTE_JOB_STATE_NOTIFIED;
|
||||||
|
/* send us back thru job complete */
|
||||||
|
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
|
||||||
|
OBJ_RELEASE(caddy);
|
||||||
|
}
|
||||||
|
|
||||||
static void check_all_complete(int fd, short args, void *cbdata)
|
static void check_all_complete(int fd, short args, void *cbdata)
|
||||||
{
|
{
|
||||||
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
|
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
|
||||||
@ -361,12 +381,19 @@ static void check_all_complete(int fd, short args, void *cbdata)
|
|||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
(NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid)));
|
(NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid)));
|
||||||
|
|
||||||
if (NULL == jdata) {
|
if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||||
/* just check to see if the daemons are complete */
|
/* just check to see if the daemons are complete */
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
|
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
|
||||||
"%s state:hnp:check_job_complete - received NULL job, checking daemons",
|
"%s state:hnp:check_job_complete - received NULL job, checking daemons",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
goto CHECK_DAEMONS;
|
goto CHECK_DAEMONS;
|
||||||
|
} else {
|
||||||
|
/* mark the job as terminated, but don't override any
|
||||||
|
* abnormal termination flags
|
||||||
|
*/
|
||||||
|
if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) {
|
||||||
|
jdata->state = ORTE_JOB_STATE_TERMINATED;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* turn off any sensor monitors on this job */
|
/* turn off any sensor monitors on this job */
|
||||||
@ -480,7 +507,7 @@ static void check_all_complete(int fd, short args, void *cbdata)
|
|||||||
}
|
}
|
||||||
|
|
||||||
CHECK_ALIVE:
|
CHECK_ALIVE:
|
||||||
/* now check to see if all jobs are done - release this jdata
|
/* now check to see if all jobs are done - trigger notification of this jdata
|
||||||
* object when we find it
|
* object when we find it
|
||||||
*/
|
*/
|
||||||
one_still_alive = false;
|
one_still_alive = false;
|
||||||
@ -493,6 +520,9 @@ static void check_all_complete(int fd, short args, void *cbdata)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/* if this is the job we are checking AND it normally terminated,
|
/* if this is the job we are checking AND it normally terminated,
|
||||||
|
* then activate the "notify_completed" state - this will release
|
||||||
|
* the job state, but is provided so that the HNP main code can
|
||||||
|
* take alternative actions if desired. If the state is killed_by_cmd,
|
||||||
* then go ahead and release it. We cannot release it if it
|
* then go ahead and release it. We cannot release it if it
|
||||||
* abnormally terminated as mpirun needs the info so it can
|
* abnormally terminated as mpirun needs the info so it can
|
||||||
* report appropriately to the user
|
* report appropriately to the user
|
||||||
@ -500,16 +530,29 @@ static void check_all_complete(int fd, short args, void *cbdata)
|
|||||||
* NOTE: do not release the primary job (j=1) so we
|
* NOTE: do not release the primary job (j=1) so we
|
||||||
* can pretty-print completion message
|
* can pretty-print completion message
|
||||||
*/
|
*/
|
||||||
if (NULL != jdata && job->jobid == jdata->jobid &&
|
if (NULL != jdata && job->jobid == jdata->jobid) {
|
||||||
(jdata->state == ORTE_JOB_STATE_TERMINATED ||
|
opal_output(0, "CHECKING JOB %s", ORTE_JOBID_PRINT(jdata->jobid));
|
||||||
jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD)) {
|
if (jdata->state == ORTE_JOB_STATE_TERMINATED) {
|
||||||
/* release this object, ensuring that the
|
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
|
||||||
* pointer array internal accounting
|
"%s state:hnp:check_job_completed state is terminated - activating notify",
|
||||||
* is maintained!
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
*/
|
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED);
|
||||||
if (1 < j) {
|
one_still_alive = true;
|
||||||
opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */
|
} else if (jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD ||
|
||||||
OBJ_RELEASE(jdata);
|
jdata->state == ORTE_JOB_STATE_NOTIFIED) {
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_state_base_output,
|
||||||
|
"%s state:hnp:check_job_completed state is killed or notified - cleaning up",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
/* release this object, ensuring that the
|
||||||
|
* pointer array internal accounting
|
||||||
|
* is maintained!
|
||||||
|
*/
|
||||||
|
if (1 < j) {
|
||||||
|
opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */
|
||||||
|
OBJ_RELEASE(jdata);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
opal_output(0, "STATE WAS %s", orte_job_state_to_str(jdata->state));
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -233,6 +233,10 @@ const char *orte_job_state_to_str(orte_job_state_t state)
|
|||||||
return "UNTERMINATED";
|
return "UNTERMINATED";
|
||||||
case ORTE_JOB_STATE_TERMINATED:
|
case ORTE_JOB_STATE_TERMINATED:
|
||||||
return "NORMALLY TERMINATED";
|
return "NORMALLY TERMINATED";
|
||||||
|
case ORTE_JOB_STATE_NOTIFY_COMPLETED:
|
||||||
|
return "NOTIFY COMPLETED";
|
||||||
|
case ORTE_JOB_STATE_NOTIFIED:
|
||||||
|
return "NOTIFIED";
|
||||||
case ORTE_JOB_STATE_ALL_JOBS_COMPLETE:
|
case ORTE_JOB_STATE_ALL_JOBS_COMPLETE:
|
||||||
return "ALL JOBS COMPLETE";
|
return "ALL JOBS COMPLETE";
|
||||||
case ORTE_JOB_STATE_ERROR:
|
case ORTE_JOB_STATE_ERROR:
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user