diff --git a/orte/mca/plm/tm/plm_tm_module.c b/orte/mca/plm/tm/plm_tm_module.c
index 04bfbd1770..2067d41ff2 100644
--- a/orte/mca/plm/tm/plm_tm_module.c
+++ b/orte/mca/plm/tm/plm_tm_module.c
@@ -85,13 +85,19 @@ static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
 static int plm_tm_finalize(void);
 
 static int plm_tm_connect(void);
-static int plm_tm_disconnect(void);
 static void failed_start(int fd, short event, void *arg);
+static int obit_submit(int tid);
 
 /*
  * Local "global" variables
  */
 static opal_event_t *ev=NULL;
+static bool connected;
+static tm_event_t *events_spawn = NULL;
+static tm_event_t *events_obit = NULL;
+static tm_task_id *tm_task_ids = NULL;
+static int *evs = NULL;
+static bool time_is_up;
 
 /*
  * Global variable
@@ -107,6 +113,20 @@ orte_plm_base_module_t orte_plm_tm_module = {
     plm_tm_finalize
 };
 
+/* catch timeout to allow cmds to progress */
+static void timer_cb(int fd, short event, void *cbdata)
+{
+    opal_event_t *timer = (opal_event_t*)cbdata; /* not named "ev": that would shadow the file-scope ev */
+
+    /* free event */
+    if (NULL != timer) {
+        free(timer);
+    }
+    /* declare time is up */
+    time_is_up = true;
+}
+
+
 /**
  * Init the module
  */
@@ -127,6 +147,7 @@ static int plm_tm_init(void)
  */
 static int plm_tm_launch_job(orte_job_t *jdata)
 {
+    orte_job_t *jdatorted;
     orte_job_map_t *map = NULL;
     orte_app_context_t **apps;
     orte_node_t **nodes;
@@ -137,21 +158,23 @@ static int plm_tm_launch_job(orte_job_t *jdata)
     char **argv = NULL;
     int argc = 0;
     int rc;
-    bool connected = false;
     orte_std_cntr_t launched = 0, i;
     char *bin_base = NULL, *lib_base = NULL;
-    tm_event_t *tm_events = NULL;
-    tm_task_id *tm_task_ids = NULL;
     int local_err;
-    tm_event_t event;
     bool failed_launch = true;
     mode_t current_umask;
     orte_jobid_t failed_job;
     orte_job_state_t job_state = ORTE_JOB_NEVER_LAUNCHED;
-    
+    int offset;
+    tm_event_t eventpolled;
+    orte_std_cntr_t num_daemons;
+    opal_event_t *timerev;
+    int j;
+
     /* default to declaring the daemons as failed */
     failed_job = ORTE_PROC_MY_NAME->jobid;
-    
+    connected = false;
+
     /* create a jobid for this job */
     if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
         ORTE_ERROR_LOG(rc);
@@ -183,20 +206,107 @@ static int plm_tm_launch_job(orte_job_t *jdata)
         goto launch_apps;
     }
 
-    /* Allocate a bunch of TM events to use for tm_spawn()ing */
-    tm_events = malloc(sizeof(tm_event_t) * map->num_new_daemons);
-    if (NULL == tm_events) {
-        rc = ORTE_ERR_OUT_OF_RESOURCE;
-        ORTE_ERROR_LOG(rc);
+    /* lookup the daemon job object - must do this -after- the job is
+     * setup so the number of required daemons has been updated
+     */
+    if (NULL == (jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
+        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+        rc = ORTE_ERR_NOT_FOUND;
         goto cleanup;
     }
-    tm_task_ids = malloc(sizeof(tm_task_id) * map->num_new_daemons);
-    if (NULL == tm_task_ids) {
-        rc = ORTE_ERR_OUT_OF_RESOURCE;
-        ORTE_ERROR_LOG(rc);
+    num_daemons = jdatorted->num_procs - 1; /* do not include myself as I am already here! */
+    if (0 >= num_daemons) {
+        /* this won't work */
+        rc = ORTE_ERR_BAD_PARAM;
         goto cleanup;
     }
+
+    /* Allocate a bunch of TM events to use */
+    if (NULL == events_spawn) {
+        /* spawn events for first launch */
+        events_spawn = (tm_event_t*)malloc(num_daemons * sizeof(tm_event_t));
+        if (NULL == events_spawn) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+    } else {
+        /* comm_spawn launch */
+        events_spawn = (tm_event_t*)realloc(events_spawn, sizeof(tm_event_t) * num_daemons);
+        if (NULL == events_spawn) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+    }
+    if (NULL == events_obit) {
+        /* obit events for first launch */
+        events_obit = (tm_event_t*)malloc(num_daemons * sizeof(tm_event_t));
+        if (NULL == events_obit) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+    } else {
+        /* comm_spawn launch */
+        events_obit = (tm_event_t*)realloc(events_obit, sizeof(tm_event_t) * num_daemons);
+        if (NULL == events_obit) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+    }
+    if (NULL == evs) {
+        /* evs for first launch - one int of exit status per daemon */
+        evs = (int*)malloc(num_daemons * sizeof(int));
+        if (NULL == evs) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+    } else {
+        /* comm_spawn launch */
+        evs = (int*)realloc(evs, sizeof(int) * num_daemons);
+        if (NULL == evs) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+    }
+    /* allocate task ids for the orteds */
+    if (NULL == tm_task_ids) {
+        /* first launch */
+        tm_task_ids = (tm_task_id*)malloc(num_daemons * sizeof(tm_task_id));
+        if (NULL == tm_task_ids) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+    } else {
+        /* comm_spawn launch */
+        tm_task_ids = (tm_task_id*)realloc(tm_task_ids, sizeof(tm_task_id) * num_daemons);
+        if (NULL == tm_task_ids) {
+            rc = ORTE_ERR_OUT_OF_RESOURCE;
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+    }
+
+    /* compute the offset into the event/task arrays */
+    offset = num_daemons - map->num_new_daemons;
+
+    /* initialize them */
+    for (i=0; i < map->num_new_daemons; i++) {
+        *(tm_task_ids + offset + i) = TM_NULL_TASK;
+        *(events_spawn + offset + i) = TM_NULL_EVENT;
+        *(events_obit + offset + i) = TM_NULL_EVENT;
+        *(evs + offset + i) = 0;
+    }
 
+
     /* add the daemon command (as specified by user) */
     orte_plm_base_setup_orted_cmd(&argc, &argv);
 
@@ -296,7 +406,7 @@ static int plm_tm_launch_job(orte_job_t *jdata)
         rc = orte_util_convert_vpid_to_string(&vpid_string, nodes[i]->daemon->name.vpid);
         if (ORTE_SUCCESS != rc) {
             opal_output(0, "plm:tm: unable to get daemon vpid as string");
-            exit(-1);
+            goto cleanup;
         }
         free(argv[proc_vpid_index]);
         argv[proc_vpid_index] = strdup(vpid_string);
@@ -312,7 +422,7 @@ static int plm_tm_launch_job(orte_job_t *jdata)
             if (NULL != param) free(param);
         }
 
-        rc = tm_spawn(argc, argv, env, node->launch_id, tm_task_ids + launched, tm_events + launched);
+        rc = tm_spawn(argc, argv, env, node->launch_id, tm_task_ids + offset + launched, events_spawn + offset + launched);
         if (TM_SUCCESS != rc) {
             orte_show_help("help-plm-tm.txt", "tm-spawn-failed",
                            true, argv[0], node->name, node->launch_id);
@@ -330,14 +440,54 @@ static int plm_tm_launch_job(orte_job_t *jdata)
                          "%s plm:tm:launch: finished spawning orteds",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 
+    /* setup a timer to give the cmd a chance to be sent */
+    time_is_up = false;
+    ORTE_DETECT_TIMEOUT(&timerev, launched,
+                        100, -1, timer_cb);
+
+    ORTE_PROGRESSED_WAIT(time_is_up, 0, 1);
+
     /* TM poll for all the spawns */
-    for (i = 0; i < launched; ++i) {
-        rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
+    while (0 < launched) {
+        rc = tm_poll(TM_NULL_EVENT, &eventpolled, (int)false, &local_err);
         if (TM_SUCCESS != rc) {
-            errno = local_err;
-            opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
+            opal_output(0, "plm:tm: event poll for spawned daemon failed, return status = %d", rc);
+            rc = ORTE_ERROR;
             goto cleanup;
         }
+        /* if we get back the NULL event, then just continue */
+        if (eventpolled == TM_NULL_EVENT) {
+            continue;
+        }
+        /* look for the spawned event */
+        for (j=0; j < map->num_new_daemons; j++) {
+            if (eventpolled == *(events_spawn + offset + j)) {
+                /* got the event - check returned code */
+                if (local_err) {
+                    /* this orted failed to launch! */
+                    orte_show_help("help-plm-tm.txt", "tm-spawn-failed",
+                                   true, argv[0], nodes[j]->name, nodes[j]->launch_id);
+                    rc = ORTE_ERROR;
+                    goto cleanup;
+                }
+                /* register the corresponding obit so we can detect when this
+                 * orted terminates
+                 */
+                if (ORTE_SUCCESS != (rc = obit_submit(offset+j))) {
+                    ORTE_ERROR_LOG(rc);
+                    goto cleanup;
+                }
+                /* all done with this event */
+                goto MOVEON;
+            }
+        }
+        /* if we get here, then we failed to find the event */
+        opal_output(0, "TM FAILED TO FIND SPAWN EVENT WHEN LAUNCHING");
+        rc = ORTE_ERROR;
+        goto cleanup;
+
+    MOVEON:
+        launched--;
     }
 
     /* set a timer to tell us if one or more daemon's fails to start - use the
@@ -392,16 +542,6 @@ launch_apps:
         opal_argv_free(env);
     }
 
-    if (connected) {
-        plm_tm_disconnect();
-    }
-    if (NULL != tm_events) {
-        free(tm_events);
-    }
-    if (NULL != tm_task_ids) {
-        free(tm_task_ids);
-    }
-
     if (NULL != lib_base) {
         free(lib_base);
     }
@@ -442,6 +582,14 @@ static int plm_tm_terminate_job(orte_jobid_t jobid)
     return rc;
 }
 
+/* quick timeout loop */
+static bool timer_fired;
+
+static void quicktime_cb(int fd, short event, void *cbdata)
+{
+    /* declare it fired */
+    timer_fired = true;
+}
 
 /**
  * Terminate the orteds for a given job
@@ -449,12 +597,152 @@ static int plm_tm_terminate_job(orte_jobid_t jobid)
 int plm_tm_terminate_orteds(void)
 {
     int rc;
+    orte_job_t *jdata;
+    orte_proc_t **daemons;
+    tm_event_t eventpolled;
+    orte_vpid_t j, alive;
+    int local_err;
+    opal_event_t *timerev=NULL;
+    opal_event_t *quicktime=NULL;
+    struct timeval quicktimeval;
+    bool aborted;
+
+    OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
+                         "%s plm:tm: terminating orteds",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+    /* lookup the daemon job object */
+    if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
+        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+        return ORTE_ERR_NOT_FOUND; /* nothing below can work without the job object */
+    }
+    alive = jdata->num_procs - 1; /* do not include myself! */
+    daemons = (orte_proc_t**)jdata->procs->addr;
+    aborted = false;
 
-    /* now tell them to die! */
-    if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_WITH_REPLY_CMD))) {
+    /* tell them to die! */
+    if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_NO_REPLY_CMD))) {
         ORTE_ERROR_LOG(rc);
     }
 
+    /* if there are more than just me... */
+    if (0 < alive) {
+        /* setup a max time for the daemons to die */
+        time_is_up = false;
+        ORTE_DETECT_TIMEOUT(&timerev, alive,
+                            1000000, 60000000, timer_cb);
+
+        /* give the cmds a chance to get out */
+        quicktimeval.tv_sec = 0;
+        quicktimeval.tv_usec = 100;
+        timer_fired = false;
+        ORTE_DETECT_TIMEOUT(&quicktime, alive, 1000, 10000, quicktime_cb);
+        ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
+
+        /* now begin polling to see if daemons have terminated */
+        while (!time_is_up && 0 < alive) {
+            OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
+                                 "%s plm:tm: polling for daemon termination",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+            rc = tm_poll(TM_NULL_EVENT, &eventpolled, (int)false, &local_err);
+            if (TM_SUCCESS != rc) {
+                errno = local_err;
+                opal_output(0, "plm:tm: event poll for daemon termination failed, return status = %d", rc);
+                /* progress the event library, otherwise the timeout can never fire and end this loop */
+                timer_fired = false;
+                opal_evtimer_add(quicktime, &quicktimeval);
+                ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
+                continue; /* we will wait for timeout to tell us to quit */
+            }
+            /* if we get back the NULL event, then just continue */
+            if (eventpolled == TM_NULL_EVENT) {
+                OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
+                                     "%s plm:tm: got null event",
+                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+                /* give system a little time to progress */
+                timer_fired = false;
+                opal_evtimer_add(quicktime, &quicktimeval);
+                ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
+                continue;
+            }
+            /* look for the obit event */
+            for (j=0; j < jdata->num_procs-1; j++) {
+                if (eventpolled == *(events_obit + j)) {
+                    /* got the event - check returned code */
+                    if (local_err == TM_ESYSTEM) {
+                        OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
+                                             "%s plm:tm: got TM_ESYSTEM on obit - resubmitting",
+                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+                        if (ORTE_SUCCESS != (rc = obit_submit(j))) {
+                            ORTE_ERROR_LOG(rc);
+                            goto MOVEON;
+                        }
+                        /* give system a little time to progress */
+                        timer_fired = false;
+                        opal_evtimer_add(quicktime, &quicktimeval);
+                        ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
+                        goto POLLAGAIN; /* obit resubmitted - this daemon is still being tracked */
+                    }
+                    if (0 != local_err) {
+                        OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
+                                             "%s plm:tm: got error %d on obit for task %d",
+                                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_err, (int)j));
+                        rc = ORTE_ERROR;
+                        goto MOVEON;
+                    }
+                    /* this daemon has terminated */
+                    *(tm_task_ids+j) = TM_NULL_TASK;
+                    *(events_obit+j) = TM_NULL_EVENT;
+                    OPAL_OUTPUT_VERBOSE((10, orte_plm_globals.output,
+                                         "%s plm:tm: task %d exited with status %d",
+                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)j, *(evs+j)));
+                    /* update the termination status for this daemon */
+                    daemons[j+1]->exit_code = *(evs+j);
+                    if (0 != daemons[j+1]->exit_code) {
+                        daemons[j+1]->state = ORTE_PROC_STATE_ABORTED;
+                        aborted = true;
+                    } else {
+                        daemons[j+1]->state = ORTE_PROC_STATE_TERMINATED;
+                    }
+                    jdata->num_terminated++;
+                    /* all done with this event */
+                    goto MOVEON;
+                }
+            }
+            /* if we get here, then we failed to find the event */
+            opal_output(0, "TM FAILED TO FIND OBIT EVENT");
+            continue; /* not one of our obits - do not count it as a terminated daemon */
+
+        MOVEON:
+            alive--;
+        POLLAGAIN:
+            continue;
+        }
+
+        /* release event if not already done */
+        if (NULL != quicktime) {
+            free(quicktime);
+        }
+        if (!time_is_up && NULL != timerev) { /* timer_cb frees the event it is handed when it fires (assumed to be timerev) */
+            opal_event_del(timerev);
+            free(timerev);
+        }
+    } else {
+        /* still need to give the cmds a chance to get out so I can process
+         * them myself!
+         */
+        timer_fired = false;
+        ORTE_DETECT_TIMEOUT(&quicktime, 1, 1000, 10000, quicktime_cb);
+        ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
+    }
+
+    /* declare the daemons done */
+    if (aborted || 0 < alive) {
+        jdata->state = ORTE_JOB_STATE_ABORTED;
+    } else {
+        jdata->state = ORTE_JOB_STATE_TERMINATED;
+    }
+    orte_trigger_event(&orteds_exit);
     return rc;
 }
 
@@ -483,6 +771,24 @@ static int plm_tm_finalize(void)
         ORTE_ERROR_LOG(rc);
     }
 
+    if (connected) {
+        tm_finalize();
+    }
+
+    /* cleanup data arrays */
+    if (NULL != events_spawn) {
+        free(events_spawn);
+    }
+    if (NULL != events_obit) {
+        free(events_obit);
+    }
+    if (NULL != tm_task_ids) {
+        free(tm_task_ids);
+    }
+    if (NULL != evs) {
+        free(evs);
+    }
+
     return ORTE_SUCCESS;
 }
 
@@ -513,13 +819,6 @@ static int plm_tm_connect(void)
 }
 
 
-static int plm_tm_disconnect(void)
-{
-    tm_finalize();
-
-    return ORTE_SUCCESS;
-}
-
 /* call this function if the timer fires indicating that one
  * or more daemons failed to start
  */
@@ -540,3 +839,22 @@ static void failed_start(int fd, short dummy, void *arg)
     orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1,
                                 ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
 }
+
+/* request a TM obit for the task at index tid of the task/event arrays */
+static int obit_submit(int tid)
+{
+    int rc;
+
+    if (TM_SUCCESS != (rc = tm_obit(*(tm_task_ids+tid), evs+tid, events_obit+tid))) {
+        opal_output(0, "failed to register termination notice for task %d", tid);
+        rc = ORTE_ERROR;
+        return rc;
+    }
+    if (*(events_obit+tid) == TM_NULL_EVENT) {
+        opal_output(0, "task %d is already dead", tid);
+    } else if (*(events_obit+tid) == TM_ERROR_EVENT) {
+        opal_output(0, "Error on obit return - got error event for task %d", tid);
+    }
+
+    return ORTE_SUCCESS;
+}