1
1

Revamp the TM plm module so that we detect orted termination without requiring a callback message by using the TM native capabilities. This allows TM to function with fully routed OOB comm, and to tell us what node failed to spawn a daemon.

This commit was SVN r20027.
Этот коммит содержится в:
Ralph Castain 2008-11-20 18:57:35 +00:00
родитель 5e6536eeda
Коммит 7213c109ac

Просмотреть файл

@ -85,13 +85,19 @@ static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_tm_finalize(void);
static int plm_tm_connect(void);
static int plm_tm_disconnect(void);
static void failed_start(int fd, short event, void *arg);
static int obit_submit(int tid);
/*
* Local "global" variables
*/
static opal_event_t *ev=NULL;
static bool connected;
static tm_event_t *events_spawn = NULL;
static tm_event_t *events_obit = NULL;
static tm_task_id *tm_task_ids = NULL;
static int *evs = NULL;
static bool time_is_up;
/*
* Global variable
@ -107,6 +113,20 @@ orte_plm_base_module_t orte_plm_tm_module = {
plm_tm_finalize
};
/* catch timeout to allow cmds to progress */
static void timer_cb(int fd, short event, void *cbdata)
{
opal_event_t *ev = (opal_event_t*)cbdata;
/* free event */
if (NULL != ev) {
free(ev);
}
/* declare time is up */
time_is_up = true;
}
/**
* Init the module
*/
@ -127,6 +147,7 @@ static int plm_tm_init(void)
*/
static int plm_tm_launch_job(orte_job_t *jdata)
{
orte_job_t *jdatorted;
orte_job_map_t *map = NULL;
orte_app_context_t **apps;
orte_node_t **nodes;
@ -137,21 +158,23 @@ static int plm_tm_launch_job(orte_job_t *jdata)
char **argv = NULL;
int argc = 0;
int rc;
bool connected = false;
orte_std_cntr_t launched = 0, i;
char *bin_base = NULL, *lib_base = NULL;
tm_event_t *tm_events = NULL;
tm_task_id *tm_task_ids = NULL;
int local_err;
tm_event_t event;
bool failed_launch = true;
mode_t current_umask;
orte_jobid_t failed_job;
orte_job_state_t job_state = ORTE_JOB_NEVER_LAUNCHED;
int offset;
tm_event_t eventpolled;
orte_std_cntr_t num_daemons;
opal_event_t *timerev;
int j;
/* default to declaring the daemons as failed */
failed_job = ORTE_PROC_MY_NAME->jobid;
connected = false;
/* create a jobid for this job */
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
ORTE_ERROR_LOG(rc);
@ -183,20 +206,107 @@ static int plm_tm_launch_job(orte_job_t *jdata)
goto launch_apps;
}
/* Allocate a bunch of TM events to use for tm_spawn()ing */
tm_events = malloc(sizeof(tm_event_t) * map->num_new_daemons);
if (NULL == tm_events) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
/* lookup the daemon job object - must do this -after- the job is
* setup so the number of required daemons has been updated
*/
if (NULL == (jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto cleanup;
}
tm_task_ids = malloc(sizeof(tm_task_id) * map->num_new_daemons);
if (NULL == tm_task_ids) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
num_daemons = jdatorted->num_procs - 1; /* do not include myself as I am already here! */
if (0 >= num_daemons) {
/* this won't work */
rc = ORTE_ERR_BAD_PARAM;
goto cleanup;
}
/* Allocate a bunch of TM events to use */
if (NULL == events_spawn) {
/* spawn events for first launch */
events_spawn = (tm_event_t*)malloc(num_daemons * sizeof(tm_event_t));
if (NULL == events_spawn) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
/* comm_spawn launch */
events_spawn = (tm_event_t*)realloc(events_spawn, sizeof(tm_event_t) * num_daemons);
if (NULL == events_spawn) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (NULL == events_obit) {
/* obit events for first launch */
events_obit = (tm_event_t*)malloc(num_daemons * sizeof(tm_event_t));
if (NULL == events_obit) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
/* comm_spawn launch */
events_obit = (tm_event_t*)realloc(events_obit, sizeof(tm_event_t) * num_daemons);
if (NULL == events_obit) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (NULL == evs) {
/* evs for first launch */
evs = (int*)malloc(num_daemons * sizeof(tm_event_t));
if (NULL == evs) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
/* comm_spawn launch */
evs = (int*)realloc(evs, sizeof(int) * num_daemons);
if (NULL == evs) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* allocate task ids for the orteds */
if (NULL == tm_task_ids) {
/* first launch */
tm_task_ids = (tm_task_id*)malloc(num_daemons * sizeof(tm_task_id));
if (NULL == tm_task_ids) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
/* comm_spawn launch */
tm_task_ids = (tm_task_id*)realloc(tm_task_ids, sizeof(tm_task_id) * num_daemons);
if (NULL == tm_task_ids) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* compute the offset into the event/task arrays */
offset = num_daemons - map->num_new_daemons;
/* initialize them */
for (i=0; i < map->num_new_daemons; i++) {
*(tm_task_ids + offset + i) = TM_NULL_TASK;
*(events_spawn + offset + i) = TM_NULL_EVENT;
*(events_obit + offset + i) = TM_NULL_EVENT;
*(evs + offset + i) = 0;
}
/* add the daemon command (as specified by user) */
orte_plm_base_setup_orted_cmd(&argc, &argv);
@ -296,7 +406,7 @@ static int plm_tm_launch_job(orte_job_t *jdata)
rc = orte_util_convert_vpid_to_string(&vpid_string, nodes[i]->daemon->name.vpid);
if (ORTE_SUCCESS != rc) {
opal_output(0, "plm:tm: unable to get daemon vpid as string");
exit(-1);
goto cleanup;
}
free(argv[proc_vpid_index]);
argv[proc_vpid_index] = strdup(vpid_string);
@ -312,7 +422,7 @@ static int plm_tm_launch_job(orte_job_t *jdata)
if (NULL != param) free(param);
}
rc = tm_spawn(argc, argv, env, node->launch_id, tm_task_ids + launched, tm_events + launched);
rc = tm_spawn(argc, argv, env, node->launch_id, tm_task_ids + offset + launched, events_spawn + offset + launched);
if (TM_SUCCESS != rc) {
orte_show_help("help-plm-tm.txt", "tm-spawn-failed",
true, argv[0], node->name, node->launch_id);
@ -330,14 +440,54 @@ static int plm_tm_launch_job(orte_job_t *jdata)
"%s plm:tm:launch: finished spawning orteds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup a timer to give the cmd a chance to be sent */
time_is_up = false;
ORTE_DETECT_TIMEOUT(&timerev, launched,
100, -1, timer_cb);
ORTE_PROGRESSED_WAIT(time_is_up, 0, 1);
/* TM poll for all the spawns */
for (i = 0; i < launched; ++i) {
rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
while (0 < launched) {
rc = tm_poll(TM_NULL_EVENT, &eventpolled, (int)false, &local_err);
if (TM_SUCCESS != rc) {
errno = local_err;
opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
opal_output(0, "plm:tm: event poll for spawned daemon failed, return status = %d", rc);
rc = ORTE_ERROR;
goto cleanup;
}
/* if we get back the NULL event, then just continue */
if (eventpolled == TM_NULL_EVENT) {
continue;
}
/* look for the spawned event */
for (j=0; j < map->num_new_daemons; j++) {
if (eventpolled == *(events_spawn + offset + j)) {
/* got the event - check returned code */
if (local_err) {
/* this orted failed to launch! */
orte_show_help("help-plm-tm.txt", "tm-spawn-failed",
true, argv[0], nodes[j]->name, nodes[j]->launch_id);
rc = ORTE_ERROR;
goto cleanup;
}
/* register the corresponding obit so we can detect when this
* orted terminates
*/
if (ORTE_SUCCESS != (rc = obit_submit(offset+j))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* all done with this event */
goto MOVEON;
}
}
/* if we get here, then we failed to find the event */
opal_output(0, "TM FAILED TO FIND SPAWN EVENT WHEN LAUNCHING");
rc = ORTE_ERROR;
goto cleanup;
MOVEON:
launched--;
}
/* set a timer to tell us if one or more daemon's fails to start - use the
@ -392,16 +542,6 @@ launch_apps:
opal_argv_free(env);
}
if (connected) {
plm_tm_disconnect();
}
if (NULL != tm_events) {
free(tm_events);
}
if (NULL != tm_task_ids) {
free(tm_task_ids);
}
if (NULL != lib_base) {
free(lib_base);
}
@ -442,6 +582,14 @@ static int plm_tm_terminate_job(orte_jobid_t jobid)
return rc;
}
/* quick timeout loop */
static bool timer_fired;
static void quicktime_cb(int fd, short event, void *cbdata)
{
/* declare it fired */
timer_fired = true;
}
/**
* Terminate the orteds for a given job
@ -449,12 +597,143 @@ static int plm_tm_terminate_job(orte_jobid_t jobid)
int plm_tm_terminate_orteds(void)
{
int rc;
orte_job_t *jdata;
orte_proc_t **daemons;
tm_event_t eventpolled;
orte_vpid_t j, alive;
int local_err;
opal_event_t *timerev=NULL;
opal_event_t *quicktime=NULL;
struct timeval quicktimeval;
bool aborted;
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: terminating orteds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* lookup the daemon job object */
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
}
alive = jdata->num_procs - 1; /* do not include myself! */
daemons = (orte_proc_t**)jdata->procs->addr;
aborted = false;
/* now tell them to die! */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_WITH_REPLY_CMD))) {
/* tell them to die! */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_NO_REPLY_CMD))) {
ORTE_ERROR_LOG(rc);
}
/* if there are more than just me... */
if (0 < alive) {
/* setup a max time for the daemons to die */
time_is_up = false;
ORTE_DETECT_TIMEOUT(&timerev, alive,
1000000, 60000000, timer_cb);
/* give the cmds a chance to get out */
quicktimeval.tv_sec = 0;
quicktimeval.tv_usec = 100;
timer_fired = false;
ORTE_DETECT_TIMEOUT(&quicktime, alive, 1000, 10000, quicktime_cb);
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
/* now begin polling to see if daemons have terminated */
while (!time_is_up && 0 < alive) {
OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
"%s plm:tm: polling for daemon termination",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
rc = tm_poll(TM_NULL_EVENT, &eventpolled, (int)false, &local_err);
if (TM_SUCCESS != rc) {
errno = local_err;
opal_output(0, "plm:tm: event poll for daemon termination failed, return status = %d", rc);
continue; /* we will wait for timeout to tell us to quit */
}
/* if we get back the NULL event, then just continue */
if (eventpolled == TM_NULL_EVENT) {
OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
"%s plm:tm: got null event",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* give system a little time to progress */
timer_fired = false;
opal_evtimer_add(quicktime, &quicktimeval);
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
continue;
}
/* look for the obit event */
for (j=0; j < jdata->num_procs-1; j++) {
if (eventpolled == *(events_obit + j)) {
/* got the event - check returned code */
if (local_err == TM_ESYSTEM) {
OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
"%s plm:tm: got TM_ESYSTEM on obit - resubmitting",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (rc = obit_submit(j))) {
ORTE_ERROR_LOG(rc);
goto MOVEON;
}
/* give system a little time to progress */
timer_fired = false;
opal_evtimer_add(quicktime, &quicktimeval);
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
}
if (0 != local_err) {
OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
"%s plm:tm: got error %d on obit for task %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_err, j));
rc = ORTE_ERROR;
goto MOVEON;
}
/* this daemon has terminated */
*(tm_task_ids+j) = TM_NULL_TASK;
*(events_obit+j) = TM_NULL_EVENT;
OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
"%s plm:tm: task %d exited with status %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j, *(evs+j)));
/* update the termination status for this daemon */
daemons[j+1]->exit_code = *(evs+j);
if (0 != daemons[j+1]->exit_code) {
daemons[j+1]->state = ORTE_PROC_STATE_ABORTED;
aborted = true;
} else {
daemons[j+1]->state = ORTE_PROC_STATE_TERMINATED;
}
jdata->num_terminated++;
/* all done with this event */
goto MOVEON;
}
}
/* if we get here, then we failed to find the event */
opal_output(0, "TM FAILED TO FIND OBIT EVENT");
MOVEON:
alive--;
}
/* release event if not already done */
if (NULL != quicktime) {
free(quicktime);
}
if (NULL != timerev) {
opal_event_del(timerev);
free(timerev);
}
} else {
/* still need to give the cmds a chance to get out so I can process
* them myself!
*/
timer_fired = false;
ORTE_DETECT_TIMEOUT(&quicktime, 1, 1000, 10000, quicktime_cb);
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
}
/* declare the daemons done */
if (aborted || 0 < alive) {
jdata->state = ORTE_JOB_STATE_ABORTED;
} else {
jdata->state = ORTE_JOB_STATE_TERMINATED;
}
orte_trigger_event(&orteds_exit);
return rc;
}
@ -483,6 +762,24 @@ static int plm_tm_finalize(void)
ORTE_ERROR_LOG(rc);
}
if (connected) {
tm_finalize();
}
/* cleanup data arrays */
if (NULL != events_spawn) {
free(events_spawn);
}
if (NULL != events_obit) {
free(events_obit);
}
if (NULL != tm_task_ids) {
free(tm_task_ids);
}
if (NULL != evs) {
free(evs);
}
return ORTE_SUCCESS;
}
@ -513,13 +810,6 @@ static int plm_tm_connect(void)
}
static int plm_tm_disconnect(void)
{
tm_finalize();
return ORTE_SUCCESS;
}
/* call this function if the timer fires indicating that one
* or more daemons failed to start
*/
@ -540,3 +830,21 @@ static void failed_start(int fd, short dummy, void *arg)
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1,
ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
}
static int obit_submit(int tid)
{
int rc;
if (TM_SUCCESS != (rc = tm_obit(*(tm_task_ids+tid), evs+tid, events_obit+tid))) {
opal_output(0, "failed to register termination notice for task %d", tid);
rc = ORTE_ERROR;
return rc;
}
if (*(events_obit+tid) == TM_NULL_EVENT) {
opal_output(0, "task %d is already dead", tid);
} else if (*(events_obit+tid) == TM_ERROR_EVENT) {
opal_output(0, "Error on obit return - got error event for task %d", tid);
}
return ORTE_SUCCESS;
}