1
1

First cut at updating the ccp launcher to use the state machine

This commit was SVN r26986.
Этот коммит содержится в:
Ralph Castain 2012-08-10 17:09:33 +00:00
родитель 908166d4f8
Коммит e3e9b7345d

Просмотреть файл

@ -4,7 +4,7 @@
* reserved. * reserved.
* Copyright (c) 2004-2010 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2010 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC. All rights * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
@ -53,6 +53,7 @@
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/rmaps.h" #include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/state/state.h"
#include "orte/mca/plm/plm.h" #include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h" #include "orte/mca/plm/base/plm_private.h"
@ -73,7 +74,7 @@
*/ */
static int plm_ccp_init(void); static int plm_ccp_init(void);
static int plm_ccp_launch_job(orte_job_t *jdata); static int plm_ccp_launch_job(orte_job_t *jdata);
static int plm_ccp_terminate_orteds(); static int plm_ccp_terminate_orteds(void);
static int plm_ccp_signal_job(orte_jobid_t jobid, int32_t signal); static int plm_ccp_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_ccp_finalize(void); static int plm_ccp_finalize(void);
@ -98,6 +99,7 @@ orte_plm_base_module_t orte_plm_ccp_module = {
plm_ccp_finalize plm_ccp_finalize
}; };
static void launch_daemons(int fd, short args, void *cbdata);
/** /**
* Init the module * Init the module
@ -106,6 +108,13 @@ static int plm_ccp_init(void)
{ {
int rc; int rc;
/* point to our launch command */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_LAUNCH_DAEMONS,
launch_daemons, ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_plm_base_comm_start())) { if (ORTE_SUCCESS != (rc = orte_plm_base_comm_start())) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
@ -117,12 +126,27 @@ static int plm_ccp_init(void)
} }
static int plm_ccp_launch_job(orte_job_t *jdata)
{
if (ORTE_JOB_CONTROL_RESTART & jdata->controls) {
/* this is a restart situation - skip to the mapping stage */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
} else {
/* new job - set it up */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_INIT);
}
return ORTE_SUCCESS;
}
/* When working in this function, ALWAYS jump to "cleanup" if /* When working in this function, ALWAYS jump to "cleanup" if
* you encounter an error so that orterun will be woken up and * you encounter an error so that orterun will be woken up and
* the job can cleanly terminate * the job can cleanly terminate
*/ */
static int plm_ccp_launch_job(orte_job_t *jdata) static void launch_daemons(int fd, short args, void *cbdata)
{ {
orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata = state->jdata;
orte_app_context_t *app; orte_app_context_t *app;
orte_node_t *node; orte_node_t *node;
orte_std_cntr_t launched = 0, i; orte_std_cntr_t launched = 0, i;
@ -167,14 +191,6 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
} }
} }
/* if we don't want to launch, then don't attempt to
* launch the daemons - the user really wants to just
* look at the proposed process map
*/
if (orte_do_not_launch) {
goto launch_apps;
}
/* start by launching the virtual machine */ /* start by launching the virtual machine */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_virtual_machine(jdata))) { if (ORTE_SUCCESS != (rc = orte_plm_base_setup_virtual_machine(jdata))) {
@ -182,6 +198,21 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
goto cleanup; goto cleanup;
} }
/* if we don't want to launch, then don't attempt to
* launch the daemons - the user really wants to just
* look at the proposed process map
*/
if (orte_do_not_launch) {
/* set the state to indicate the daemons reported - this
* will trigger the daemons_reported event and cause the
* job to move to the following step
*/
jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
OBJ_RELEASE(state);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:rsh: launching vm", "%s plm:rsh: launching vm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -194,8 +225,16 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
} }
if (0 == map->num_new_daemons) { if (0 == map->num_new_daemons) {
/* have all the daemons we need - launch app */ /* set the state to indicate the daemons reported - this
goto launch_apps; * will trigger the daemons_reported event and cause the
* job to move to the following step
*/
state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
if (ORTE_JOB_STATE_DAEMONS_REPORTED == daemons->state) {
ORTE_ACTIVATE_JOB_STATE(state->jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
}
OBJ_RELEASE(state);
return;
} }
/* add the daemon command (as specified by user) */ /* add the daemon command (as specified by user) */
@ -481,10 +520,13 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
goto cleanup; goto cleanup;
} }
#if 0
/* RHC: you definitely cannot do that here!! Use
* nanosleep if you need to wait a little while
*/
/* Allow some progress to occur */ /* Allow some progress to occur */
opal_event_loop(orte_event_base, OPAL_EVLOOP_NONBLOCK); opal_event_loop(orte_event_base, OPAL_EVLOOP_NONBLOCK);
#endif
launched++; launched++;
pTask->Release(); pTask->Release();
@ -503,55 +545,11 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
"%s plm:ccp:launch: finished spawning orteds", "%s plm:ccp:launch: finished spawning orteds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* wait for daemons to callback */ /* set the job state to indicate the daemons are launched */
if (ORTE_SUCCESS != (rc = orte_plm_base_daemon_callback(map->num_new_daemons))) { jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:ccp: daemon launch failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid), ORTE_ERROR_NAME(rc)));
goto cleanup;
}
launch_apps:
/* setup the job */
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_job(jdata))) {
ORTE_ERROR_LOG(rc);
failed_job = jdata->jobid;
goto cleanup;
}
failed_job = jdata->jobid;
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(jdata->jobid))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:ccp: launch of apps failed for job %s on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid), ORTE_ERROR_NAME(rc)));
goto cleanup;
}
/* if we get here, then everything launched okay - record that fact */
failed_launch = false;
/* check for timing request - get stop time for launch completion and report */
if (orte_timing) {
if (0 != gettimeofday(&completionstop, NULL)) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: could not obtain completion stop time"));
} else {
deltat = (launchstop.tv_sec - launchstart.tv_sec)*1000000 +
(launchstop.tv_usec - launchstart.tv_usec);
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: launch completion required %d usec", deltat));
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: Launch statistics:"));
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: Average time to launch an orted: %f usec", avgtime));
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: Max time to launch an orted: %d usec at iter %d", maxtime, maxiter));
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: Min time to launch an orted: %d usec at iter %d", mintime, miniter));
}
/* set the job state to indicate the daemons are launched */
state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
cleanup: cleanup:
if (NULL != argv) { if (NULL != argv) {
@ -576,35 +574,15 @@ static int plm_ccp_launch_job(orte_job_t *jdata)
/* check for failed launch - if so, force terminate */ /* check for failed launch - if so, force terminate */
if (failed_launch) { if (failed_launch) {
if (ORTE_ERR_SILENT == rc) { ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
orte_errmgr.update_state(failed_job, ORTE_JOB_STATE_SILENT_ABORT,
NULL, ORTE_PROC_STATE_UNDEF,
0, ORTE_ERROR_DEFAULT_EXIT_CODE);
} else {
orte_errmgr.update_state(failed_job, job_state,
NULL, ORTE_PROC_STATE_UNDEF,
0, ORTE_ERROR_DEFAULT_EXIT_CODE);
}
} }
/* check for timing request - get stop time and process if so */
if (orte_timing) {
if (0 != gettimeofday(&jobstop, NULL)) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: could not obtain stop time"));
} else {
deltat = (jobstop.tv_sec - jobstart.tv_sec)*1000000 +
(jobstop.tv_usec - jobstart.tv_usec);
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"plm_ccp: launch of entire job required %d usec", deltat));
}
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output, OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:ccp:launch: finished", "%s plm:ccp:launch: finished",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc; OBJ_RELEASE(state);
return;
} }