/* * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "orte_config.h" #include #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ #ifdef HAVE_STRING_H #include #endif #include "opal/util/output.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/iof/iof.h" #include "orte/mca/plm/base/base.h" #include "orte/mca/ras/base/base.h" #include "orte/mca/rmaps/base/base.h" #include "orte/mca/routed/routed.h" #include "orte/mca/notifier/notifier.h" #include "orte/mca/sensor/sensor.h" #include "orte/util/session_dir.h" #include "orte/runtime/orte_quit.h" #include "orte/mca/state/state.h" #include "orte/mca/state/base/base.h" #include "orte/mca/state/base/state_private.h" #include "state_hnp.h" /* * Module functions: Global */ static int init(void); static int finalize(void); /****************** * HNP module - just uses base functions after * initializing the proc state machine. Job state * machine is unused by application procs at this * time. 
******************/ orte_state_base_module_t orte_state_hnp_module = { init, finalize, orte_state_base_activate_job_state, orte_state_base_add_job_state, orte_state_base_set_job_state_callback, orte_state_base_set_job_state_priority, orte_state_base_remove_job_state, orte_state_base_activate_proc_state, orte_state_base_add_proc_state, orte_state_base_set_proc_state_callback, orte_state_base_set_proc_state_priority, orte_state_base_remove_proc_state }; static void local_launch_complete(int fd, short argc, void *cbdata) { orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; orte_job_t *jdata = state->jdata; if (orte_report_launch_progress) { if (0 == jdata->num_daemons_reported % 100 || jdata->num_daemons_reported == orte_process_info.num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS); } } OBJ_RELEASE(state); } static void track_procs(int fd, short argc, void *cbdata); static void check_all_complete(int fd, short argc, void *cbdata); static void report_progress(int fd, short argc, void *cbdata); /* defined default state machine sequence - individual * plm's must add a state for launching daemons */ static orte_job_state_t launch_states[] = { ORTE_JOB_STATE_INIT, ORTE_JOB_STATE_ALLOCATE, ORTE_JOB_STATE_DAEMONS_LAUNCHED, ORTE_JOB_STATE_DAEMONS_REPORTED, ORTE_JOB_STATE_MAP, ORTE_JOB_STATE_SYSTEM_PREP, ORTE_JOB_STATE_LAUNCH_APPS, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE, ORTE_JOB_STATE_RUNNING, ORTE_JOB_STATE_REGISTERED, /* termination states */ ORTE_JOB_STATE_TERMINATED, ORTE_JOB_STATE_ALL_JOBS_COMPLETE, ORTE_JOB_STATE_DAEMONS_TERMINATED }; static orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_setup_job, orte_ras_base_allocate, orte_plm_base_daemons_launched, orte_plm_base_daemons_reported, orte_rmaps_base_map_job, orte_plm_base_complete_setup, orte_plm_base_launch_apps, local_launch_complete, orte_plm_base_post_launch, orte_plm_base_registered, check_all_complete, orte_quit, orte_quit }; static orte_proc_state_t proc_states[] = { 
ORTE_PROC_STATE_RUNNING, ORTE_PROC_STATE_REGISTERED, ORTE_PROC_STATE_IOF_COMPLETE, ORTE_PROC_STATE_WAITPID_FIRED, ORTE_PROC_STATE_TERMINATED }; static orte_state_cbfunc_t proc_callbacks[] = { track_procs, track_procs, track_procs, track_procs, track_procs }; /************************ * API Definitions ************************/ static int init(void) { int i, rc; int num_states; /* setup the state machines */ OBJ_CONSTRUCT(&orte_job_states, opal_list_t); OBJ_CONSTRUCT(&orte_proc_states, opal_list_t); /* setup the job state machine */ num_states = sizeof(launch_states) / sizeof(orte_job_state_t); for (i=0; i < num_states; i++) { if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i], launch_callbacks[i], ORTE_SYS_PRI))) { ORTE_ERROR_LOG(rc); } } /* add a default error response */ if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT, orte_quit, ORTE_ERROR_PRI))) { ORTE_ERROR_LOG(rc); } /* add callback to report progress, if requested */ if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS, report_progress, ORTE_INFO_PRI))) { ORTE_ERROR_LOG(rc); } if (5 < opal_output_get_verbosity(orte_state_base_output)) { orte_state_base_print_job_state_machine(); } /* populate the proc state machine to allow us to * track proc lifecycle changes */ num_states = sizeof(proc_states) / sizeof(orte_proc_state_t); for (i=0; i < num_states; i++) { if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i], proc_callbacks[i], ORTE_SYS_PRI))) { ORTE_ERROR_LOG(rc); } } if (5 < opal_output_get_verbosity(orte_state_base_output)) { orte_state_base_print_proc_state_machine(); } return ORTE_SUCCESS; } static int finalize(void) { opal_list_item_t *item; /* cleanup the proc state machine */ while (NULL != (item = opal_list_remove_first(&orte_proc_states))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&orte_proc_states); return ORTE_SUCCESS; } static void cleanup_node(orte_proc_t *proc) { orte_node_t *node; orte_proc_t *p; int i; if 
(NULL == (node = proc->node)) { return; } node->num_procs--; node->slots_inuse--; for (i=0; i < node->procs->size; i++) { if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { continue; } if (p->name.jobid == proc->name.jobid && p->name.vpid == proc->name.vpid) { opal_pointer_array_set_item(node->procs, i, NULL); OBJ_RELEASE(p); break; } } } static void report_progress(int fd, short argc, void *cbdata) { orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; orte_job_t *jdata = caddy->jdata; opal_output(orte_clean_output, "App launch reported: %d (out of %d) daemons - %d (out of %d) procs", (int)jdata->num_daemons_reported, (int)orte_process_info.num_procs, (int)jdata->num_launched, (int)jdata->num_procs); OBJ_RELEASE(caddy); } static void track_procs(int fd, short argc, void *cbdata) { orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; orte_process_name_t *proc = &caddy->name; orte_proc_state_t state = caddy->proc_state; orte_job_t *jdata; orte_proc_t *pdata; OPAL_OUTPUT_VERBOSE((5, orte_state_base_output, "%s state:hnp:track_procs called for proc %s state %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(proc), orte_proc_state_to_str(state))); /* get the job object for this proc */ if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid); if (ORTE_PROC_STATE_RUNNING == state) { /* update the proc state */ pdata->state = state; jdata->num_launched++; if (jdata->num_launched == jdata->num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING); } } else if (ORTE_PROC_STATE_REGISTERED == state) { /* update the proc state */ pdata->state = state; jdata->num_reported++; if (jdata->num_reported == jdata->num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED); } } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) { /* update the proc state */ pdata->state = state; /* Release 
only the stdin IOF file descriptor for this child, if one * was defined. File descriptors for the other IOF channels - stdout, * stderr, and stddiag - were released when their associated pipes * were cleared and closed due to termination of the process */ if (NULL != orte_iof.close) { orte_iof.close(proc, ORTE_IOF_STDIN); } pdata->iof_complete = true; if (pdata->waitpid_recvd) { /* the proc has terminated */ pdata->alive = false; pdata->state = ORTE_PROC_STATE_TERMINATED; /* return the allocated slot for reuse */ cleanup_node(pdata); /* Clean up the session directory as if we were the process * itself. This covers the case where the process died abnormally * and didn't cleanup its own session directory. */ orte_session_dir_finalize(proc); /* track job status */ jdata->num_terminated++; if (jdata->num_terminated == jdata->num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); } } } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) { /* update the proc state */ pdata->state = state; pdata->waitpid_recvd = true; if (pdata->iof_complete) { /* the proc has terminated */ pdata->alive = false; pdata->state = ORTE_PROC_STATE_TERMINATED; /* return the allocated slot for reuse */ cleanup_node(pdata); /* Clean up the session directory as if we were the process * itself. This covers the case where the process died abnormally * and didn't cleanup its own session directory. */ orte_session_dir_finalize(proc); /* track job status */ jdata->num_terminated++; if (jdata->num_terminated == jdata->num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); } } } else if (ORTE_PROC_STATE_TERMINATED == state) { /* update the proc state */ pdata->state = state; if (pdata->local_proc) { /* Clean up the session directory as if we were the process * itself. This covers the case where the process died abnormally * and didn't cleanup its own session directory. 
*/ orte_session_dir_finalize(proc); } /* return the allocated slot for reuse */ cleanup_node(pdata); /* track job status */ jdata->num_terminated++; if (jdata->num_terminated == jdata->num_procs) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); } } cleanup: OBJ_RELEASE(caddy); } static void check_all_complete(int fd, short args, void *cbdata) { orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; orte_job_t *jdata = caddy->jdata; orte_proc_t *proc; int i; orte_std_cntr_t j; orte_job_t *job; orte_node_t *node; orte_job_map_t *map; orte_std_cntr_t index; bool one_still_alive; orte_vpid_t lowest=0; char *msg; OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_complete on job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); if (NULL == jdata) { /* just check to see if the daemons are complete */ OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_complete - received NULL job, checking daemons", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); goto CHECK_DAEMONS; } /* turn off any sensor monitors on this job */ orte_sensor.stop(jdata->jobid); if (0 < jdata->num_non_zero_exit && !orte_abort_non_zero_exit) { if (!orte_report_child_jobs_separately || 1 == ORTE_LOCAL_JOBID(jdata->jobid)) { /* update the exit code */ ORTE_UPDATE_EXIT_STATUS(lowest); } /* warn user */ opal_output(orte_clean_output, "-------------------------------------------------------\n" "While %s job %s terminated normally, %d %s. Further examination may be required.\n" "-------------------------------------------------------", (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "the primary" : "child", (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "" : ORTE_LOCAL_JOBID_PRINT(jdata->jobid), jdata->num_non_zero_exit, (1 == jdata->num_non_zero_exit) ? "process returned\na non-zero exit code." 
: "processes returned\nnon-zero exit codes."); } OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_completed declared job %s normally terminated - checking all jobs", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdata->jobid))); /* if this job is a continuously operating one, then don't do * anything further - just return here */ if (NULL != jdata && (ORTE_JOB_CONTROL_CONTINUOUS_OP & jdata->controls || ORTE_JOB_CONTROL_RECOVERABLE & jdata->controls)) { goto CHECK_ALIVE; } /* if the job that is being checked is the HNP, then we are * trying to terminate the orteds. In that situation, we * do -not- check all jobs - we simply notify the HNP * that the orteds are complete. Also check special case * if jdata is NULL - we want * to definitely declare the job done if the orteds * have completed, no matter what else may be happening. * This can happen if a ctrl-c hits in the "wrong" place * while launching */ CHECK_DAEMONS: if (jdata == NULL || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { if (0 == orte_routed.num_routes()) { /* orteds are done! */ OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s orteds complete - exiting", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); if (NULL == jdata) { jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); } ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_TERMINATED); OBJ_RELEASE(caddy); return; } OBJ_RELEASE(caddy); return; } /* Release the resources used by this job. Since some errmgrs may want * to continue using resources allocated to the job as part of their * fault recovery procedure, we only do this once the job is "complete". * Note that an aborted/killed job -is- flagged as complete and will * therefore have its resources released. 
We need to do this after * we call the errmgr so that any attempt to restart the job will * avoid doing so in the exact same place as the current job */ if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) { map = jdata->map; for (index = 0; index < map->nodes->size; index++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) { continue; } OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s releasing procs from node %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), node->name)); for (i = 0; i < node->procs->size; i++) { if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { continue; } if (proc->name.jobid != jdata->jobid) { /* skip procs from another job */ continue; } node->slots_inuse--; node->num_procs--; OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s releasing proc %s from node %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), node->name)); /* set the entry in the node array to NULL */ opal_pointer_array_set_item(node->procs, i, NULL); /* release the proc once for the map entry */ OBJ_RELEASE(proc); } } OBJ_RELEASE(map); jdata->map = NULL; } CHECK_ALIVE: /* now check to see if all jobs are done - release this jdata * object when we find it */ one_still_alive = false; for (j=1; j < orte_job_data->size; j++) { if (NULL == (job = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, j))) { /* since we are releasing jdata objects as we * go, we can no longer assume that the job_data * array is left justified */ continue; } /* if this is the job we are checking AND it normally terminated, * then go ahead and release it. 
We cannot release it if it * abnormally terminated as mpirun needs the info so it can * report appropriately to the user * * NOTE: do not release the primary job (j=1) so we * can pretty-print completion message */ if (NULL != jdata && job->jobid == jdata->jobid && (jdata->state == ORTE_JOB_STATE_TERMINATED || jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD)) { /* release this object, ensuring that the * pointer array internal accounting * is maintained! */ if (1 < j) { opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */ OBJ_RELEASE(jdata); } continue; } /* if the job is flagged to not be monitored, skip it */ if (ORTE_JOB_CONTROL_DO_NOT_MONITOR & job->controls) { continue; } /* when checking for job termination, we must be sure to NOT check * our own job as it - rather obviously - has NOT terminated! */ if (job->num_terminated < job->num_procs) { /* we have at least one job that is not done yet - we cannot * just return, though, as we need to ensure we cleanout the * job data for the job that just completed */ OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_completed job %s is not terminated (%d:%d)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job->jobid), job->num_terminated, job->num_procs)); one_still_alive = true; } else { OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_completed job %s is terminated (%d vs %d [%s])", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job->jobid), job->num_terminated, job->num_procs, (NULL == jdata) ? 
"UNKNOWN" : orte_job_state_to_str(jdata->state) )); } } /* if a job is still alive, we just return */ if (one_still_alive) { OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_completed at least one job is not terminated", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); OBJ_RELEASE(caddy); return; } /* if we get here, then all jobs are done, so terminate */ OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, "%s state:hnp:check_job_completed all jobs terminated", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); /* set the exit status to 0 - this will only happen if it * wasn't already set by an error condition */ ORTE_UPDATE_EXIT_STATUS(0); /* provide a notifier message if that framework is active - ignored otherwise */ if (NULL != (job = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, 1))) { if (0 == orte_exit_status) { asprintf(&msg, "Job %s complete", ORTE_JOBID_PRINT(job->jobid)); orte_notifier.log(ORTE_NOTIFIER_INFO, 0, msg); } else { asprintf(&msg, "Job %s terminated abnormally", ORTE_JOBID_PRINT(job->jobid)); orte_notifier.log(ORTE_NOTIFIER_ALERT, orte_exit_status, msg); } free(msg); /* this job object will be release during finalize */ } /* order daemon termination - this tells us to cleanup * our local procs as well as telling remote daemons * to die */ orte_plm.terminate_orteds(); OBJ_RELEASE(caddy); }