diff --git a/orte/mca/plm/base/base.h b/orte/mca/plm/base/base.h index d56cfdc507..a824e399af 100644 --- a/orte/mca/plm/base/base.h +++ b/orte/mca/plm/base/base.h @@ -86,8 +86,10 @@ ORTE_DECLSPEC void orte_plm_base_setup_job(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_setup_job_complete(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_complete_setup(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_daemons_reported(int fd, short args, void *cbdata); +ORTE_DECLSPEC void orte_plm_base_allocation_complete(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_daemons_launched(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_vm_ready(int fd, short args, void *cbdata); +ORTE_DECLSPEC void orte_plm_base_mapping_complete(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_launch_apps(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_post_launch(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_plm_base_registered(int fd, short args, void *cbdata); diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 00f8bbfac5..0f7b3e7ced 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -118,6 +118,18 @@ void orte_plm_base_daemons_reported(int fd, short args, void *cbdata) OBJ_RELEASE(caddy); } +void orte_plm_base_allocation_complete(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + + /* move the state machine along */ + caddy->jdata->state = ORTE_JOB_STATE_ALLOCATION_COMPLETE; + ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_LAUNCH_DAEMONS); + + /* cleanup */ + OBJ_RELEASE(caddy); +} + void orte_plm_base_daemons_launched(int fd, short args, void *cbdata) { orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; @@ -142,6 +154,19 @@ void orte_plm_base_vm_ready(int fd, short args, void 
*cbdata) OBJ_RELEASE(caddy); } +void orte_plm_base_mapping_complete(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + + /* move the state machine along */ + caddy->jdata->state = ORTE_JOB_STATE_MAP_COMPLETE; + ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_SYSTEM_PREP); + + /* cleanup */ + OBJ_RELEASE(caddy); +} + + void orte_plm_base_setup_job(int fd, short args, void *cbdata) { int rc; @@ -1101,6 +1126,72 @@ int orte_plm_base_setup_virtual_machine(orte_job_t *jdata) return ORTE_ERR_NOT_FOUND; } + /* if we are not working with a virtual machine, then we + * look across all jobs and ensure that the "VM" contains + * all nodes with application procs on them + */ + if (ORTE_JOB_CONTROL_NO_VM & daemons->controls) { + OBJ_CONSTRUCT(&nodes, opal_list_t); + if (NULL == daemons->map) { + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "%s plm:base:setup_vm creating map", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* this is the first time thru, so the vm is just getting + * defined - create a map for it + */ + daemons->map = OBJ_NEW(orte_job_map_t); + } + map = daemons->map; + /* loop across all nodes and include those that have + * num_procs > 0 && no daemon already on them + */ + for (i=1; i < orte_node_pool->size; i++) { + if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) { + continue; + } + /* ignore nodes that are marked as do-not-use for this mapping */ + if (ORTE_NODE_STATE_DO_NOT_USE == node->state) { + /* reset the state so it can be used another time */ + node->state = ORTE_NODE_STATE_UP; + continue; + } + if (ORTE_NODE_STATE_DOWN == node->state) { + continue; + } + if (ORTE_NODE_STATE_NOT_INCLUDED == node->state) { + /* not to be used */ + continue; + } + if (0 < node->num_procs) { + /* retain a copy for our use in case the item gets + * destructed along the way + */ + OBJ_RETAIN(node); + opal_list_append(&nodes, &node->super); + } + } + /* see if anybody had procs */ + if 
(0 == opal_list_get_size(&nodes)) { + /* if the HNP has some procs, then we are still good */ + node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0); + if (0 < node->num_procs) { + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "%s plm:base:setup_vm only HNP in use", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + OBJ_DESTRUCT(&nodes); + /* mark that the daemons have reported so we can proceed */ + daemons->state = ORTE_JOB_STATE_DAEMONS_REPORTED; + return ORTE_SUCCESS; + } + /* well, if the HNP doesn't have any procs, and neither did + * anyone else...then we have a big problem + */ + ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE); + return ORTE_ERR_FATAL; + } + goto process; + } + if (NULL == daemons->map) { OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:setup_vm creating map", @@ -1229,6 +1320,7 @@ int orte_plm_base_setup_virtual_machine(orte_job_t *jdata) return ORTE_SUCCESS; } + process: /* cycle thru all available nodes and find those that do not already * have a daemon on them - no need to include our own as we are * obviously already here! 
If a max vm size was given, then limit diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h index f7334043b2..cc805d7897 100644 --- a/orte/mca/plm/plm_types.h +++ b/orte/mca/plm/plm_types.h @@ -99,18 +99,20 @@ typedef int32_t orte_job_state_t; #define ORTE_JOB_STATE_INIT 1 /* ready to be assigned id */ #define ORTE_JOB_STATE_INIT_COMPLETE 2 /* jobid assigned and setup */ #define ORTE_JOB_STATE_ALLOCATE 3 /* ready to be allocated */ -#define ORTE_JOB_STATE_MAP 4 /* ready to be mapped */ -#define ORTE_JOB_STATE_SYSTEM_PREP 5 /* ready for final sanity check and system values updated */ -#define ORTE_JOB_STATE_LAUNCH_DAEMONS 6 /* ready to launch daemons */ -#define ORTE_JOB_STATE_DAEMONS_LAUNCHED 7 /* daemons for this job have been launched */ -#define ORTE_JOB_STATE_DAEMONS_REPORTED 8 /* all launched daemons have reported */ -#define ORTE_JOB_STATE_VM_READY 9 /* the VM is ready for operation */ -#define ORTE_JOB_STATE_LAUNCH_APPS 10 /* ready to launch apps */ -#define ORTE_JOB_STATE_RUNNING 11 /* all procs have been fork'd */ -#define ORTE_JOB_STATE_SUSPENDED 12 /* job has been suspended */ -#define ORTE_JOB_STATE_REGISTERED 13 /* all procs registered for sync */ -#define ORTE_JOB_STATE_READY_FOR_DEBUGGERS 14 /* job ready for debugger init after spawn */ -#define ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE 15 /* all local procs have attempted launch */ +#define ORTE_JOB_STATE_ALLOCATION_COMPLETE 4 /* allocation completed */ +#define ORTE_JOB_STATE_MAP 5 /* ready to be mapped */ +#define ORTE_JOB_STATE_MAP_COMPLETE 6 /* mapping complete */ +#define ORTE_JOB_STATE_SYSTEM_PREP 7 /* ready for final sanity check and system values updated */ +#define ORTE_JOB_STATE_LAUNCH_DAEMONS 8 /* ready to launch daemons */ +#define ORTE_JOB_STATE_DAEMONS_LAUNCHED 9 /* daemons for this job have been launched */ +#define ORTE_JOB_STATE_DAEMONS_REPORTED 10 /* all launched daemons have reported */ +#define ORTE_JOB_STATE_VM_READY 11 /* the VM is ready for operation */ +#define 
ORTE_JOB_STATE_LAUNCH_APPS 12 /* ready to launch apps */ +#define ORTE_JOB_STATE_RUNNING 13 /* all procs have been fork'd */ +#define ORTE_JOB_STATE_SUSPENDED 14 /* job has been suspended */ +#define ORTE_JOB_STATE_REGISTERED 15 /* all procs registered for sync */ +#define ORTE_JOB_STATE_READY_FOR_DEBUGGERS 16 /* job ready for debugger init after spawn */ +#define ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE 17 /* all local procs have attempted launch */ /* * Define a "boundary" so we can easily and quickly determine diff --git a/orte/mca/ras/base/ras_base_allocate.c b/orte/mca/ras/base/ras_base_allocate.c index b5fe8c07b1..c75113b1f9 100644 --- a/orte/mca/ras/base/ras_base_allocate.c +++ b/orte/mca/ras/base/ras_base_allocate.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. 
* $COPYRIGHT$ * @@ -434,7 +434,7 @@ void orte_ras_base_allocate(int fd, short args, void *cbdata) jdata->total_slots_alloc = orte_ras_base.total_slots_alloc; /* set the job state to the next position */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LAUNCH_DAEMONS); + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATION_COMPLETE); /* cleanup */ OBJ_RELEASE(caddy); diff --git a/orte/mca/rmaps/base/rmaps_base_map_job.c b/orte/mca/rmaps/base/rmaps_base_map_job.c index 1549ac0c90..e5ced7d729 100644 --- a/orte/mca/rmaps/base/rmaps_base_map_job.c +++ b/orte/mca/rmaps/base/rmaps_base_map_job.c @@ -302,7 +302,7 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata) } } /* set the job state to the next position */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SYSTEM_PREP); + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP_COMPLETE); /* cleanup */ OBJ_RELEASE(caddy); diff --git a/orte/mca/rmaps/base/rmaps_base_support_fns.c b/orte/mca/rmaps/base/rmaps_base_support_fns.c index 8f690af968..5bd2efbe8b 100644 --- a/orte/mca/rmaps/base/rmaps_base_support_fns.c +++ b/orte/mca/rmaps/base/rmaps_base_support_fns.c @@ -129,10 +129,17 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr orte_std_cntr_t num_slots; orte_std_cntr_t i; int rc; + orte_job_t *daemons; + bool novm; /** set default answer */ *total_num_slots = 0; + /* get the daemon job object */ + daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); + /* see is we have a vm or not */ + novm = ORTE_JOB_CONTROL_NO_VM & daemons->controls; + /* if the hnp was allocated, include it unless flagged not to */ if (orte_hnp_is_allocated && !(ORTE_GET_MAPPING_DIRECTIVE(policy) & ORTE_MAPPING_NO_USE_LOCAL)) { if (NULL != (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0))) { @@ -180,8 +187,10 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr /* not to be used */ continue; } - /* if this node wasn't included in the vm (e.g., by 
-host), ignore it */ - if (NULL == node->daemon) { + /* if this node wasn't included in the vm (e.g., by -host), ignore it, + * unless we are mapping prior to launching the vm + */ + if (NULL == node->daemon && !novm) { continue; } /* retain a copy for our use in case the item gets @@ -195,7 +204,8 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr */ node->mapped = false; } - if (NULL == nd || nd->daemon->name.vpid < node->daemon->name.vpid) { + if (NULL == nd || NULL == nd->daemon || + nd->daemon->name.vpid < node->daemon->name.vpid) { /* just append to end */ opal_list_append(allocated_nodes, &node->super); nd = node; diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index bb2772d65b..3b7efe20e7 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -16,7 +16,13 @@ #include "opal/mca/event/event.h" #include "orte/runtime/orte_globals.h" -#include "orte/mca/plm/plm_types.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/iof/iof.h" +#include "orte/mca/rmaps/rmaps_types.h" +#include "orte/mca/plm/plm.h" +#include "orte/mca/routed/routed.h" +#include "orte/mca/sensor/sensor.h" +#include "orte/util/session_dir.h" #include "orte/mca/state/base/base.h" #include "orte/mca/state/base/state_private.h" @@ -372,3 +378,422 @@ void orte_state_base_print_proc_state_machine(void) } } +static void cleanup_node(orte_proc_t *proc) +{ + orte_node_t *node; + orte_proc_t *p; + int i; + + if (NULL == (node = proc->node)) { + return; + } + node->num_procs--; + node->slots_inuse--; + for (i=0; i < node->procs->size; i++) { + if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { + continue; + } + if (p->name.jobid == proc->name.jobid && + p->name.vpid == proc->name.vpid) { + opal_pointer_array_set_item(node->procs, i, NULL); + OBJ_RELEASE(p); + break; + } + } +} + +void orte_state_base_local_launch_complete(int fd, short argc, void *cbdata) 
+{ + orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = state->jdata; + + if (orte_report_launch_progress) { + if (0 == jdata->num_daemons_reported % 100 || + jdata->num_daemons_reported == orte_process_info.num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS); + } + } + OBJ_RELEASE(state); +} + +void orte_state_base_cleanup_job(int fd, short argc, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:cleanup on job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); + + /* flag that we were notified */ + jdata->state = ORTE_JOB_STATE_NOTIFIED; + /* send us back thru job complete */ + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); + OBJ_RELEASE(caddy); +} + +void orte_state_base_report_progress(int fd, short argc, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + + opal_output(orte_clean_output, "App launch reported: %d (out of %d) daemons - %d (out of %d) procs", + (int)jdata->num_daemons_reported, (int)orte_process_info.num_procs, + (int)jdata->num_launched, (int)jdata->num_procs); + OBJ_RELEASE(caddy); +} + +void orte_state_base_track_procs(int fd, short argc, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_process_name_t *proc = &caddy->name; + orte_proc_state_t state = caddy->proc_state; + orte_job_t *jdata; + orte_proc_t *pdata; + + OPAL_OUTPUT_VERBOSE((5, orte_state_base_output, + "%s state:base:track_procs called for proc %s state %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(proc), + orte_proc_state_to_str(state))); + + /* get the job object for this proc */ + if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + goto cleanup; + } + pdata = 
(orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid); + + if (ORTE_PROC_STATE_RUNNING == state) { + /* update the proc state */ + pdata->state = state; + jdata->num_launched++; + if (jdata->num_launched == jdata->num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING); + } + } else if (ORTE_PROC_STATE_REGISTERED == state) { + /* update the proc state */ + pdata->state = state; + jdata->num_reported++; + if (jdata->num_reported == jdata->num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED); + } + } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) { + /* update the proc state */ + pdata->state = state; + /* Release only the stdin IOF file descriptor for this child, if one + * was defined. File descriptors for the other IOF channels - stdout, + * stderr, and stddiag - were released when their associated pipes + * were cleared and closed due to termination of the process + */ + if (NULL != orte_iof.close) { + orte_iof.close(proc, ORTE_IOF_STDIN); + } + pdata->iof_complete = true; + if (pdata->waitpid_recvd) { + /* the proc has terminated */ + pdata->alive = false; + pdata->state = ORTE_PROC_STATE_TERMINATED; + /* return the allocated slot for reuse */ + cleanup_node(pdata); + /* Clean up the session directory as if we were the process + * itself. This covers the case where the process died abnormally + * and didn't cleanup its own session directory. 
+ */ + orte_session_dir_finalize(proc); + /* track job status */ + jdata->num_terminated++; + if (jdata->num_terminated == jdata->num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); + } + } + } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) { + /* update the proc state */ + pdata->state = state; + pdata->waitpid_recvd = true; + if (pdata->iof_complete) { + /* the proc has terminated */ + pdata->alive = false; + pdata->state = ORTE_PROC_STATE_TERMINATED; + /* return the allocated slot for reuse */ + cleanup_node(pdata); + /* Clean up the session directory as if we were the process + * itself. This covers the case where the process died abnormally + * and didn't cleanup its own session directory. + */ + orte_session_dir_finalize(proc); + /* track job status */ + jdata->num_terminated++; + if (jdata->num_terminated == jdata->num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); + } + } + } else if (ORTE_PROC_STATE_TERMINATED == state) { + /* update the proc state */ + pdata->state = state; + if (pdata->local_proc) { + /* Clean up the session directory as if we were the process + * itself. This covers the case where the process died abnormally + * and didn't cleanup its own session directory. 
+ */ + orte_session_dir_finalize(proc); + } + /* return the allocated slot for reuse */ + cleanup_node(pdata); + /* track job status */ + jdata->num_terminated++; + if (jdata->num_terminated == jdata->num_procs) { + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); + } + } + + cleanup: + OBJ_RELEASE(caddy); +} + +void orte_state_base_check_all_complete(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = caddy->jdata; + + orte_proc_t *proc; + int i; + orte_std_cntr_t j; + orte_job_t *job; + orte_node_t *node; + orte_job_map_t *map; + orte_std_cntr_t index; + bool one_still_alive; + orte_vpid_t lowest=0; + + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_complete on job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); + + if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { + /* just check to see if the daemons are complete */ + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_complete - received NULL job, checking daemons", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + goto CHECK_DAEMONS; + } else { + /* mark the job as terminated, but don't override any + * abnormal termination flags + */ + if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) { + jdata->state = ORTE_JOB_STATE_TERMINATED; + } + } + + /* turn off any sensor monitors on this job */ + orte_sensor.stop(jdata->jobid); + + /* tell the IOF that the job is complete */ + if (NULL != orte_iof.complete) { + orte_iof.complete(jdata); + } + + if (0 < jdata->num_non_zero_exit && !orte_abort_non_zero_exit) { + if (!orte_report_child_jobs_separately || 1 == ORTE_LOCAL_JOBID(jdata->jobid)) { + /* update the exit code */ + ORTE_UPDATE_EXIT_STATUS(lowest); + } + + /* warn user */ + opal_output(orte_clean_output, + "-------------------------------------------------------\n" + "While %s job %s terminated normally, %d %s. 
Further examination may be required.\n" + "-------------------------------------------------------", + (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "the primary" : "child", + (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "" : ORTE_LOCAL_JOBID_PRINT(jdata->jobid), + jdata->num_non_zero_exit, + (1 == jdata->num_non_zero_exit) ? "process returned\na non-zero exit code." : + "processes returned\nnon-zero exit codes."); + } + + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed declared job %s normally terminated - checking all jobs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jdata->jobid))); + + /* if this job is a continuously operating one, then don't do + * anything further - just return here + */ + if (NULL != jdata && + (ORTE_JOB_CONTROL_CONTINUOUS_OP & jdata->controls || + ORTE_JOB_CONTROL_RECOVERABLE & jdata->controls)) { + goto CHECK_ALIVE; + } + + /* if the job that is being checked is the HNP, then we are + * trying to terminate the orteds. In that situation, we + * do -not- check all jobs - we simply notify the HNP + * that the orteds are complete. Also check special case + * if jdata is NULL - we want + * to definitely declare the job done if the orteds + * have completed, no matter what else may be happening. + * This can happen if a ctrl-c hits in the "wrong" place + * while launching + */ + CHECK_DAEMONS: + if (jdata == NULL || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { + if (0 == orte_routed.num_routes()) { + /* orteds are done! */ + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s orteds complete - exiting", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + if (NULL == jdata) { + jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); + } + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_TERMINATED); + OBJ_RELEASE(caddy); + return; + } + OBJ_RELEASE(caddy); + return; + } + + /* Release the resources used by this job. 
Since some errmgrs may want + * to continue using resources allocated to the job as part of their + * fault recovery procedure, we only do this once the job is "complete". + * Note that an aborted/killed job -is- flagged as complete and will + * therefore have its resources released. We need to do this after + * we call the errmgr so that any attempt to restart the job will + * avoid doing so in the exact same place as the current job + */ + if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) { + map = jdata->map; + for (index = 0; index < map->nodes->size; index++) { + if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) { + continue; + } + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s releasing procs from node %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + node->name)); + for (i = 0; i < node->procs->size; i++) { + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { + continue; + } + if (proc->name.jobid != jdata->jobid) { + /* skip procs from another job */ + continue; + } + node->slots_inuse--; + node->num_procs--; + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s releasing proc %s from node %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->name), node->name)); + /* set the entry in the node array to NULL */ + opal_pointer_array_set_item(node->procs, i, NULL); + /* release the proc once for the map entry */ + OBJ_RELEASE(proc); + } + } + OBJ_RELEASE(map); + jdata->map = NULL; + } + + CHECK_ALIVE: + /* now check to see if all jobs are done - trigger notification of this jdata + * object when we find it + */ + one_still_alive = false; + for (j=1; j < orte_job_data->size; j++) { + if (NULL == (job = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, j))) { + /* since we are releasing jdata objects as we + * go, we can no longer assume that the job_data + * array is left justified + */ + continue; + } + /* if this is the job we are checking AND it 
normally terminated, + * then activate the "notify_completed" state - this will release + * the job state, but is provided so that the HNP main code can + * take alternative actions if desired. If the state is killed_by_cmd, + * then go ahead and release it. We cannot release it if it + * abnormally terminated as mpirun needs the info so it can + * report appropriately to the user + * + * NOTE: do not release the primary job (j=1) so we + * can pretty-print completion message + */ + if (NULL != jdata && job->jobid == jdata->jobid) { + if (jdata->state == ORTE_JOB_STATE_TERMINATED) { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed state is terminated - activating notify", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED); + one_still_alive = true; + } else if (jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD || + jdata->state == ORTE_JOB_STATE_NOTIFIED) { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed state is killed or notified - cleaning up", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* release this object, ensuring that the + * pointer array internal accounting + * is maintained! + */ + if (1 < j) { + opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */ + OBJ_RELEASE(jdata); + } + } + continue; + } + /* if the job is flagged to not be monitored, skip it */ + if (ORTE_JOB_CONTROL_DO_NOT_MONITOR & job->controls) { + continue; + } + /* when checking for job termination, we must be sure to NOT check + * our own job as it - rather obviously - has NOT terminated! 
+ */ + if (job->num_terminated < job->num_procs) { + /* we have at least one job that is not done yet - we cannot + * just return, though, as we need to ensure we cleanout the + * job data for the job that just completed + */ + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed job %s is not terminated (%d:%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(job->jobid), + job->num_terminated, job->num_procs)); + one_still_alive = true; + } + else { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed job %s is terminated (%d vs %d [%s])", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(job->jobid), + job->num_terminated, job->num_procs, + (NULL == jdata) ? "UNKNOWN" : orte_job_state_to_str(jdata->state) )); + } + } + /* if a job is still alive, we just return */ + if (one_still_alive) { + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed at least one job is not terminated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + OBJ_RELEASE(caddy); + return; + } + /* if we get here, then all jobs are done, so terminate */ + OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, + "%s state:base:check_job_completed all jobs terminated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* set the exit status to 0 - this will only happen if it + * wasn't already set by an error condition + */ + ORTE_UPDATE_EXIT_STATUS(0); + + /* order daemon termination - this tells us to cleanup + * our local procs as well as telling remote daemons + * to die + */ + orte_plm.terminate_orteds(); + + OBJ_RELEASE(caddy); +} diff --git a/orte/mca/state/base/state_private.h b/orte/mca/state/base/state_private.h index d87e4b4cf7..3eaf8e207e 100644 --- a/orte/mca/state/base/state_private.h +++ b/orte/mca/state/base/state_private.h @@ -79,6 +79,13 @@ ORTE_DECLSPEC int orte_state_base_remove_proc_state(orte_proc_state_t state); ORTE_DECLSPEC void orte_util_print_proc_state_machine(void); +/* common 
state processing functions */ +ORTE_DECLSPEC void orte_state_base_local_launch_complete(int fd, short argc, void *cbdata); +ORTE_DECLSPEC void orte_state_base_cleanup_job(int fd, short argc, void *cbdata); +ORTE_DECLSPEC void orte_state_base_report_progress(int fd, short argc, void *cbdata); +ORTE_DECLSPEC void orte_state_base_track_procs(int fd, short argc, void *cbdata); +ORTE_DECLSPEC void orte_state_base_check_all_complete(int fd, short args, void *cbdata); + END_C_DECLS #endif diff --git a/orte/mca/state/hnp/state_hnp.c b/orte/mca/state/hnp/state_hnp.c index c52daaad96..4045578b84 100644 --- a/orte/mca/state/hnp/state_hnp.c +++ b/orte/mca/state/hnp/state_hnp.c @@ -62,25 +62,6 @@ orte_state_base_module_t orte_state_hnp_module = { orte_state_base_remove_proc_state }; -static void local_launch_complete(int fd, short argc, void *cbdata) -{ - orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = state->jdata; - - if (orte_report_launch_progress) { - if (0 == jdata->num_daemons_reported % 100 || - jdata->num_daemons_reported == orte_process_info.num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REPORT_PROGRESS); - } - } - OBJ_RELEASE(state); -} - -static void track_procs(int fd, short argc, void *cbdata); -static void check_all_complete(int fd, short argc, void *cbdata); -static void report_progress(int fd, short argc, void *cbdata); -static void cleanup_job(int fd, short argc, void *cbdata); - /* defined default state machine sequence - individual * plm's must add a state for launching daemons */ @@ -88,10 +69,12 @@ static orte_job_state_t launch_states[] = { ORTE_JOB_STATE_INIT, ORTE_JOB_STATE_INIT_COMPLETE, ORTE_JOB_STATE_ALLOCATE, + ORTE_JOB_STATE_ALLOCATION_COMPLETE, ORTE_JOB_STATE_DAEMONS_LAUNCHED, ORTE_JOB_STATE_DAEMONS_REPORTED, ORTE_JOB_STATE_VM_READY, ORTE_JOB_STATE_MAP, + ORTE_JOB_STATE_MAP_COMPLETE, ORTE_JOB_STATE_SYSTEM_PREP, ORTE_JOB_STATE_LAUNCH_APPS, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE, @@ -107,17 +90,19 @@ static 
orte_state_cbfunc_t launch_callbacks[] = { orte_plm_base_setup_job, orte_plm_base_setup_job_complete, orte_ras_base_allocate, + orte_plm_base_allocation_complete, orte_plm_base_daemons_launched, orte_plm_base_daemons_reported, orte_plm_base_vm_ready, orte_rmaps_base_map_job, + orte_plm_base_mapping_complete, orte_plm_base_complete_setup, orte_plm_base_launch_apps, - local_launch_complete, + orte_state_base_local_launch_complete, orte_plm_base_post_launch, orte_plm_base_registered, - check_all_complete, - cleanup_job, + orte_state_base_check_all_complete, + orte_state_base_cleanup_job, orte_quit, orte_quit }; @@ -130,11 +115,11 @@ static orte_proc_state_t proc_states[] = { ORTE_PROC_STATE_TERMINATED }; static orte_state_cbfunc_t proc_callbacks[] = { - track_procs, - track_procs, - track_procs, - track_procs, - track_procs + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs }; /************************ @@ -165,7 +150,7 @@ static int init(void) } /* add callback to report progress, if requested */ if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS, - report_progress, ORTE_ERROR_PRI))) { + orte_state_base_report_progress, ORTE_ERROR_PRI))) { ORTE_ERROR_LOG(rc); } if (5 < opal_output_get_verbosity(orte_state_base_output)) { @@ -202,409 +187,3 @@ static int finalize(void) return ORTE_SUCCESS; } - -static void cleanup_node(orte_proc_t *proc) -{ - orte_node_t *node; - orte_proc_t *p; - int i; - - if (NULL == (node = proc->node)) { - return; - } - node->num_procs--; - node->slots_inuse--; - for (i=0; i < node->procs->size; i++) { - if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { - continue; - } - if (p->name.jobid == proc->name.jobid && - p->name.vpid == proc->name.vpid) { - opal_pointer_array_set_item(node->procs, i, NULL); - OBJ_RELEASE(p); - break; - } - } -} - -static void report_progress(int fd, short argc, 
void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = caddy->jdata; - - opal_output(orte_clean_output, "App launch reported: %d (out of %d) daemons - %d (out of %d) procs", - (int)jdata->num_daemons_reported, (int)orte_process_info.num_procs, - (int)jdata->num_launched, (int)jdata->num_procs); - OBJ_RELEASE(caddy); -} - -static void track_procs(int fd, short argc, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_process_name_t *proc = &caddy->name; - orte_proc_state_t state = caddy->proc_state; - orte_job_t *jdata; - orte_proc_t *pdata; - - OPAL_OUTPUT_VERBOSE((5, orte_state_base_output, - "%s state:hnp:track_procs called for proc %s state %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(proc), - orte_proc_state_to_str(state))); - - /* get the job object for this proc */ - if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - goto cleanup; - } - pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid); - - if (ORTE_PROC_STATE_RUNNING == state) { - /* update the proc state */ - pdata->state = state; - jdata->num_launched++; - if (jdata->num_launched == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_RUNNING); - } - } else if (ORTE_PROC_STATE_REGISTERED == state) { - /* update the proc state */ - pdata->state = state; - jdata->num_reported++; - if (jdata->num_reported == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED); - } - } else if (ORTE_PROC_STATE_IOF_COMPLETE == state) { - /* update the proc state */ - pdata->state = state; - /* Release only the stdin IOF file descriptor for this child, if one - * was defined. 
File descriptors for the other IOF channels - stdout, - * stderr, and stddiag - were released when their associated pipes - * were cleared and closed due to termination of the process - */ - if (NULL != orte_iof.close) { - orte_iof.close(proc, ORTE_IOF_STDIN); - } - pdata->iof_complete = true; - if (pdata->waitpid_recvd) { - /* the proc has terminated */ - pdata->alive = false; - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. - */ - orte_session_dir_finalize(proc); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } - } - } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) { - /* update the proc state */ - pdata->state = state; - pdata->waitpid_recvd = true; - if (pdata->iof_complete) { - /* the proc has terminated */ - pdata->alive = false; - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. - */ - orte_session_dir_finalize(proc); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } - } - } else if (ORTE_PROC_STATE_TERMINATED == state) { - /* update the proc state */ - pdata->state = state; - if (pdata->local_proc) { - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. 
- */ - orte_session_dir_finalize(proc); - } - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } - } - - cleanup: - OBJ_RELEASE(caddy); -} - -static void cleanup_job(int fd, short argc, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = caddy->jdata; - - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:cleanup on job %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == jdata) ? "NULL" : ORTE_JOBID_PRINT(jdata->jobid))); - - /* flag that we were notified */ - jdata->state = ORTE_JOB_STATE_NOTIFIED; - /* send us back thru job complete */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - OBJ_RELEASE(caddy); -} - -static void check_all_complete(int fd, short args, void *cbdata) -{ - orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata; - orte_job_t *jdata = caddy->jdata; - - orte_proc_t *proc; - int i; - orte_std_cntr_t j; - orte_job_t *job; - orte_node_t *node; - orte_job_map_t *map; - orte_std_cntr_t index; - bool one_still_alive; - orte_vpid_t lowest=0; - - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_complete on job %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == jdata) ? 
"NULL" : ORTE_JOBID_PRINT(jdata->jobid))); - - if (NULL == jdata || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { - /* just check to see if the daemons are complete */ - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_complete - received NULL job, checking daemons", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - goto CHECK_DAEMONS; - } else { - /* mark the job as terminated, but don't override any - * abnormal termination flags - */ - if (jdata->state < ORTE_JOB_STATE_UNTERMINATED) { - jdata->state = ORTE_JOB_STATE_TERMINATED; - } - } - - /* turn off any sensor monitors on this job */ - orte_sensor.stop(jdata->jobid); - - /* tell the IOF that the job is complete */ - if (NULL != orte_iof.complete) { - orte_iof.complete(jdata); - } - - if (0 < jdata->num_non_zero_exit && !orte_abort_non_zero_exit) { - if (!orte_report_child_jobs_separately || 1 == ORTE_LOCAL_JOBID(jdata->jobid)) { - /* update the exit code */ - ORTE_UPDATE_EXIT_STATUS(lowest); - } - - /* warn user */ - opal_output(orte_clean_output, - "-------------------------------------------------------\n" - "While %s job %s terminated normally, %d %s. Further examination may be required.\n" - "-------------------------------------------------------", - (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "the primary" : "child", - (1 == ORTE_LOCAL_JOBID(jdata->jobid)) ? "" : ORTE_LOCAL_JOBID_PRINT(jdata->jobid), - jdata->num_non_zero_exit, - (1 == jdata->num_non_zero_exit) ? "process returned\na non-zero exit code." 
: - "processes returned\nnon-zero exit codes."); - } - - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed declared job %s normally terminated - checking all jobs", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(jdata->jobid))); - - /* if this job is a continuously operating one, then don't do - * anything further - just return here - */ - if (NULL != jdata && - (ORTE_JOB_CONTROL_CONTINUOUS_OP & jdata->controls || - ORTE_JOB_CONTROL_RECOVERABLE & jdata->controls)) { - goto CHECK_ALIVE; - } - - /* if the job that is being checked is the HNP, then we are - * trying to terminate the orteds. In that situation, we - * do -not- check all jobs - we simply notify the HNP - * that the orteds are complete. Also check special case - * if jdata is NULL - we want - * to definitely declare the job done if the orteds - * have completed, no matter what else may be happening. - * This can happen if a ctrl-c hits in the "wrong" place - * while launching - */ - CHECK_DAEMONS: - if (jdata == NULL || jdata->jobid == ORTE_PROC_MY_NAME->jobid) { - if (0 == orte_routed.num_routes()) { - /* orteds are done! */ - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s orteds complete - exiting", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - if (NULL == jdata) { - jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); - } - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_TERMINATED); - OBJ_RELEASE(caddy); - return; - } - OBJ_RELEASE(caddy); - return; - } - - /* Release the resources used by this job. Since some errmgrs may want - * to continue using resources allocated to the job as part of their - * fault recovery procedure, we only do this once the job is "complete". - * Note that an aborted/killed job -is- flagged as complete and will - * therefore have its resources released. 
We need to do this after - * we call the errmgr so that any attempt to restart the job will - * avoid doing so in the exact same place as the current job - */ - if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) { - map = jdata->map; - for (index = 0; index < map->nodes->size; index++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) { - continue; - } - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s releasing procs from node %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - node->name)); - for (i = 0; i < node->procs->size; i++) { - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) { - continue; - } - if (proc->name.jobid != jdata->jobid) { - /* skip procs from another job */ - continue; - } - node->slots_inuse--; - node->num_procs--; - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s releasing proc %s from node %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc->name), node->name)); - /* set the entry in the node array to NULL */ - opal_pointer_array_set_item(node->procs, i, NULL); - /* release the proc once for the map entry */ - OBJ_RELEASE(proc); - } - } - OBJ_RELEASE(map); - jdata->map = NULL; - } - - CHECK_ALIVE: - /* now check to see if all jobs are done - trigger notification of this jdata - * object when we find it - */ - one_still_alive = false; - for (j=1; j < orte_job_data->size; j++) { - if (NULL == (job = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, j))) { - /* since we are releasing jdata objects as we - * go, we can no longer assume that the job_data - * array is left justified - */ - continue; - } - /* if this is the job we are checking AND it normally terminated, - * then activate the "notify_completed" state - this will release - * the job state, but is provided so that the HNP main code can - * take alternative actions if desired. If the state is killed_by_cmd, - * then go ahead and release it. 
We cannot release it if it - * abnormally terminated as mpirun needs the info so it can - * report appropriately to the user - * - * NOTE: do not release the primary job (j=1) so we - * can pretty-print completion message - */ - if (NULL != jdata && job->jobid == jdata->jobid) { - if (jdata->state == ORTE_JOB_STATE_TERMINATED) { - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed state is terminated - activating notify", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED); - one_still_alive = true; - } else if (jdata->state == ORTE_JOB_STATE_KILLED_BY_CMD || - jdata->state == ORTE_JOB_STATE_NOTIFIED) { - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed state is killed or notified - cleaning up", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* release this object, ensuring that the - * pointer array internal accounting - * is maintained! - */ - if (1 < j) { - opal_pointer_array_set_item(orte_job_data, j, NULL); /* ensure the array has a NULL */ - OBJ_RELEASE(jdata); - } - } - continue; - } - /* if the job is flagged to not be monitored, skip it */ - if (ORTE_JOB_CONTROL_DO_NOT_MONITOR & job->controls) { - continue; - } - /* when checking for job termination, we must be sure to NOT check - * our own job as it - rather obviously - has NOT terminated! 
- */ - if (job->num_terminated < job->num_procs) { - /* we have at least one job that is not done yet - we cannot - * just return, though, as we need to ensure we cleanout the - * job data for the job that just completed - */ - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed job %s is not terminated (%d:%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(job->jobid), - job->num_terminated, job->num_procs)); - one_still_alive = true; - } - else { - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed job %s is terminated (%d vs %d [%s])", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(job->jobid), - job->num_terminated, job->num_procs, - (NULL == jdata) ? "UNKNOWN" : orte_job_state_to_str(jdata->state) )); - } - } - /* if a job is still alive, we just return */ - if (one_still_alive) { - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed at least one job is not terminated", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - OBJ_RELEASE(caddy); - return; - } - /* if we get here, then all jobs are done, so terminate */ - OPAL_OUTPUT_VERBOSE((2, orte_state_base_output, - "%s state:hnp:check_job_completed all jobs terminated", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* set the exit status to 0 - this will only happen if it - * wasn't already set by an error condition - */ - ORTE_UPDATE_EXIT_STATUS(0); - - /* order daemon termination - this tells us to cleanup - * our local procs as well as telling remote daemons - * to die - */ - orte_plm.terminate_orteds(); - - OBJ_RELEASE(caddy); -} diff --git a/orte/mca/state/hnp/state_hnp_component.c b/orte/mca/state/hnp/state_hnp_component.c index 0c6c147aaa..dae9b30472 100644 --- a/orte/mca/state/hnp/state_hnp_component.c +++ b/orte/mca/state/hnp/state_hnp_component.c @@ -57,7 +57,7 @@ orte_state_base_component_t mca_state_hnp_component = }, }; -static int my_priority=1000; +static int my_priority=60; static int 
state_hnp_open(void) { diff --git a/orte/mca/state/novm/Makefile.am b/orte/mca/state/novm/Makefile.am new file mode 100644 index 0000000000..8c4816ea78 --- /dev/null +++ b/orte/mca/state/novm/Makefile.am @@ -0,0 +1,35 @@ +# +# Copyright (c) 2011-2012 Los Alamos National Security, LLC. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + state_novm.h \ + state_novm_component.c \ + state_novm.c + +# Make the output library in this directory, and name it either +# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la +# (for static builds). + +if MCA_BUILD_orte_state_novm_DSO +component_noinst = +component_install = mca_state_novm.la +else +component_noinst = libmca_state_novm.la +component_install = +endif + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_state_novm_la_SOURCES = $(sources) +mca_state_novm_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_state_novm_la_SOURCES =$(sources) +libmca_state_novm_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/state/novm/configure.m4 b/orte/mca/state/novm/configure.m4 new file mode 100644 index 0000000000..4a73bc7b24 --- /dev/null +++ b/orte/mca/state/novm/configure.m4 @@ -0,0 +1,19 @@ +# -*- shell-script -*- +# +# Copyright (c) 2011-2012 Los Alamos National Security, LLC. +# All rights reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +# MCA_state_novm_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_orte_state_novm_CONFIG], [ + AC_CONFIG_FILES([orte/mca/state/novm/Makefile]) + + AS_IF([test "$orte_without_full_support" = 0], + [$1], + [$2]) +]) diff --git a/orte/mca/state/novm/state_novm.c b/orte/mca/state/novm/state_novm.c new file mode 100644 index 0000000000..2f46b8ec45 --- /dev/null +++ b/orte/mca/state/novm/state_novm.c @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" + +#include +#ifdef HAVE_UNISTD_H +#include +#endif /* HAVE_UNISTD_H */ +#ifdef HAVE_STRING_H +#include +#endif + +#include "opal/util/output.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/iof/iof.h" +#include "orte/mca/plm/base/base.h" +#include "orte/mca/ras/base/base.h" +#include "orte/mca/rmaps/base/base.h" +#include "orte/mca/routed/routed.h" +#include "orte/mca/sensor/sensor.h" +#include "orte/util/session_dir.h" +#include "orte/runtime/orte_quit.h" + +#include "orte/mca/state/state.h" +#include "orte/mca/state/base/base.h" +#include "orte/mca/state/base/state_private.h" +#include "state_novm.h" + +/* + * Module functions: Global + */ +static int init(void); +static int finalize(void); + +/****************** + * NOVM module - just uses base functions after + * initializing the proc state machine. Job state + * machine is unused by application procs at this + * time. 
+ ******************/ /* Module function table: init and finalize are defined in this file; every other entry is the shared orte_state_base_* implementation. NOTE(review): the banner above says the job state machine is unused, yet init() registers launch_states[] with it - that banner text looks stale. */ +orte_state_base_module_t orte_state_novm_module = { + init, + finalize, + orte_state_base_activate_job_state, + orte_state_base_add_job_state, + orte_state_base_set_job_state_callback, + orte_state_base_set_job_state_priority, + orte_state_base_remove_job_state, + orte_state_base_activate_proc_state, + orte_state_base_add_proc_state, + orte_state_base_set_proc_state_callback, + orte_state_base_set_proc_state_priority, + orte_state_base_remove_proc_state +}; + +static void allocation_complete(int fd, short args, void *cbdata); +static void map_complete(int fd, short args, void *cbdata); +static void vm_ready(int fd, short args, void *cbdata); + +/* defined state machine sequence for no VM - individual + * plm's must add a state for launching daemons + * + * Entry i of this array is registered together with entry i of + * launch_callbacks[] by init(), so the two arrays must keep the + * same length and order. The array order is not the order in + * which the states fire: allocation_complete() activates MAP, + * map_complete() activates LAUNCH_DAEMONS and vm_ready() + * activates SYSTEM_PREP. + */ +static orte_job_state_t launch_states[] = { + ORTE_JOB_STATE_INIT, + ORTE_JOB_STATE_INIT_COMPLETE, + ORTE_JOB_STATE_ALLOCATE, + ORTE_JOB_STATE_ALLOCATION_COMPLETE, + ORTE_JOB_STATE_DAEMONS_LAUNCHED, + ORTE_JOB_STATE_DAEMONS_REPORTED, + ORTE_JOB_STATE_VM_READY, + ORTE_JOB_STATE_MAP, + ORTE_JOB_STATE_MAP_COMPLETE, + ORTE_JOB_STATE_SYSTEM_PREP, + ORTE_JOB_STATE_LAUNCH_APPS, + ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE, + ORTE_JOB_STATE_RUNNING, + ORTE_JOB_STATE_REGISTERED, + /* termination states */ + ORTE_JOB_STATE_TERMINATED, + ORTE_JOB_STATE_NOTIFY_COMPLETED, + ORTE_JOB_STATE_ALL_JOBS_COMPLETE, + ORTE_JOB_STATE_DAEMONS_TERMINATED +}; /* callbacks, index-matched to launch_states[]: allocation_complete, vm_ready and map_complete are the static handlers declared above; the remaining entries are orte_plm_base_*, orte_ras_base_*, orte_rmaps_base_* and orte_state_base_* functions, or orte_quit */ +static orte_state_cbfunc_t launch_callbacks[] = { + orte_plm_base_setup_job, + orte_plm_base_setup_job_complete, + orte_ras_base_allocate, + allocation_complete, + orte_plm_base_daemons_launched, + orte_plm_base_daemons_reported, + vm_ready, + orte_rmaps_base_map_job, + map_complete, + orte_plm_base_complete_setup, + orte_plm_base_launch_apps, + orte_state_base_local_launch_complete, + orte_plm_base_post_launch, + orte_plm_base_registered, + orte_state_base_check_all_complete, + orte_state_base_cleanup_job, + orte_quit, + orte_quit +}; + +static orte_proc_state_t 
proc_states[] = { + ORTE_PROC_STATE_RUNNING, + ORTE_PROC_STATE_REGISTERED, + ORTE_PROC_STATE_IOF_COMPLETE, + ORTE_PROC_STATE_WAITPID_FIRED, + ORTE_PROC_STATE_TERMINATED +}; /* index-matched to proc_states[]: every tracked proc state is handled by the same callback */ +static orte_state_cbfunc_t proc_callbacks[] = { + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs, + orte_state_base_track_procs +}; + +/************************ + * API Definitions + ************************/ /* init: construct both state lists, register each launch_states[i]/launch_callbacks[i] pair at ORTE_SYS_PRI, add the FORCED_EXIT and REPORT_PROGRESS handlers at ORTE_ERROR_PRI, then register the proc states. A failed registration is only logged with ORTE_ERROR_LOG; the function returns ORTE_SUCCESS regardless. */ +static int init(void) +{ + int i, rc; + int num_states; + + /* setup the state machines */ + OBJ_CONSTRUCT(&orte_job_states, opal_list_t); + OBJ_CONSTRUCT(&orte_proc_states, opal_list_t); + + /* setup the job state machine - the entry count is taken from + * launch_states[]; launch_callbacks[] is indexed with the same i + */ + num_states = sizeof(launch_states) / sizeof(orte_job_state_t); + for (i=0; i < num_states; i++) { + if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i], + launch_callbacks[i], + ORTE_SYS_PRI))) { + ORTE_ERROR_LOG(rc); + } + } + /* add a default error response */ + if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT, + orte_quit, ORTE_ERROR_PRI))) { + ORTE_ERROR_LOG(rc); + } + /* add callback to report progress, if requested */ + if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS, + orte_state_base_report_progress, ORTE_ERROR_PRI))) { + ORTE_ERROR_LOG(rc); + } + if (5 < opal_output_get_verbosity(orte_state_base_output)) { + orte_state_base_print_job_state_machine(); + } + + /* populate the proc state machine to allow us to + * track proc lifecycle changes + */ + num_states = sizeof(proc_states) / sizeof(orte_proc_state_t); + for (i=0; i < num_states; i++) { + if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i], + proc_callbacks[i], + ORTE_SYS_PRI))) { + ORTE_ERROR_LOG(rc); + } + } + if (5 < opal_output_get_verbosity(orte_state_base_output)) { + orte_state_base_print_proc_state_machine(); + } + + return ORTE_SUCCESS; +} + /* finalize: release every item on orte_proc_states and destruct that list. NOTE(review): init() also constructs orte_job_states, but nothing here releases it - confirm whether that is intentional. */ +static int finalize(void) +{ + opal_list_item_t *item; + + /* cleanup the proc state 
machine */ + while (NULL != (item = opal_list_remove_first(&orte_proc_states))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&orte_proc_states); + + return ORTE_SUCCESS; +} + +/* after we allocate, we need to map the processes + * so we know what nodes will be used + * + * cbdata is the orte_state_caddy_t for the job. Records + * ALLOCATION_COMPLETE on the job, sets ORTE_JOB_CONTROL_NO_VM on + * the daemon job and activates ORTE_JOB_STATE_MAP. The caddy is + * released on every path, including the error exit through done:. + */ +static void allocation_complete(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = state->jdata; + orte_job_t *daemons; + + jdata->state = ORTE_JOB_STATE_ALLOCATION_COMPLETE; + + /* get the daemon job object - the job whose jobid is our own; + * if it cannot be found, log it, request termination and + * skip the mapping step + */ + if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE); + goto done; + } + /* mark that we are not using a VM */ + daemons->controls |= ORTE_JOB_CONTROL_NO_VM; + +#if OPAL_HAVE_HWLOC + { + hwloc_topology_t t; + orte_node_t *node; + int i; + + /* ensure that all nodes point to our topology - we + * cannot support hetero nodes with this state machine + * (t is entry 0 of orte_node_topologies; the loop starts + * at node pool index 1 - presumably index 0 is this + * node, TODO confirm) + */ + t = (hwloc_topology_t)opal_pointer_array_get_item(orte_node_topologies, 0); + for (i=1; i < orte_node_pool->size; i++) { + if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) { + continue; + } + node->topology = t; + } + } +#endif + + /* move to the map stage */ + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP); + + done: + /* cleanup */ + OBJ_RELEASE(state); +} + +/* after we map, we are ready to launch the daemons */ +static void map_complete(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = state->jdata; + + jdata->state = ORTE_JOB_STATE_MAP_COMPLETE; + /* mapping is finished - move to the daemon launch stage */ + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LAUNCH_DAEMONS); + + /* cleanup */ + OBJ_RELEASE(state); +} + /* vm_ready: record VM_READY on the job and activate SYSTEM_PREP directly; in this sequence MAP is activated by allocation_complete() and daemon launch by map_complete(), so no mapping step follows here. Releases the caddy. */ +static void vm_ready(int fd, short args, void *cbdata) +{ + orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata; + orte_job_t *jdata = state->jdata; + + /* now that the daemons 
are launched, we are ready + * to roll + */ + jdata->state = ORTE_JOB_STATE_VM_READY; + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SYSTEM_PREP); + + OBJ_RELEASE(state); +} diff --git a/orte/mca/state/novm/state_novm.h b/orte/mca/state/novm/state_novm.h new file mode 100644 index 0000000000..ddd153ecfe --- /dev/null +++ b/orte/mca/state/novm/state_novm.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. + * All rights reserved. + * + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/** + * @file + * + * Declarations for the novm state component: the component + * structure (mca_state_novm_component) and the state module + * (orte_state_novm_module). + */ + +#ifndef MCA_STATE_NOVM_EXPORT_H +#define MCA_STATE_NOVM_EXPORT_H + +#include "orte_config.h" + +#include "orte/mca/state/state.h" + +BEGIN_C_DECLS + +/* + * Component and module structures - declared extern here; + * the component is defined in state_novm_component.c and the + * module in state_novm.c + */ + +ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_novm_component; + +ORTE_DECLSPEC extern orte_state_base_module_t orte_state_novm_module; + +END_C_DECLS + +#endif /* MCA_STATE_NOVM_EXPORT_H */ diff --git a/orte/mca/state/novm/state_novm_component.c b/orte/mca/state/novm/state_novm_component.c new file mode 100644 index 0000000000..33b887c6ff --- /dev/null +++ b/orte/mca/state/novm/state_novm_component.c @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. + * All rights reserved. 
+ * + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "opal/util/output.h" + +#include "orte/mca/state/state.h" +#include "orte/mca/state/base/base.h" +#include "state_novm.h" + +/* + * Public string for version number + */ +const char *orte_state_novm_component_version_string = + "ORTE STATE novm MCA component version " ORTE_VERSION; + +/* + * Local functionality + */ +static int state_novm_open(void); +static int state_novm_close(void); +static int state_novm_component_query(mca_base_module_t **module, int *priority); + +/* + * Instantiate the public struct with all of our public information + * and pointer to our public functions in it + */ +orte_state_base_component_t mca_state_novm_component = +{ + /* Handle the general mca_component_t struct containing + * meta information about the component + */ + { + ORTE_STATE_BASE_VERSION_1_0_0, + /* Component name and version */ + "novm", + ORTE_MAJOR_VERSION, + ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION, + + /* Component open, close and query functions */ + state_novm_open, + state_novm_close, + state_novm_component_query + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, +}; + /* selection priority reported by state_novm_component_query(); its address is handed to mca_base_param_reg_int() in state_novm_open() */ +static int my_priority; + /* state_novm_open: register the "priority" parameter for this component, passing 50 as the value argument and &my_priority as storage - presumably 50 is the default used when the user sets nothing (confirm against the mca_base_param_reg_int documentation). Always returns ORTE_SUCCESS. */ +static int state_novm_open(void) +{ + mca_base_component_t *c=&mca_state_novm_component.base_version; + + mca_base_param_reg_int(c, "priority", + "Selection priority", + false, false, 50, &my_priority); + return ORTE_SUCCESS; +} + /* state_novm_close: nothing to release; always returns ORTE_SUCCESS */ +static int state_novm_close(void) +{ + return ORTE_SUCCESS; +} + /* state_novm_component_query: offer orte_state_novm_module at my_priority only when ORTE_PROC_IS_HNP is true; otherwise set *priority to -1, *module to NULL and return ORTE_ERROR */ +static int state_novm_component_query(mca_base_module_t **module, int *priority) +{ + if (ORTE_PROC_IS_HNP) { + /* set our priority mid-range so we'll be selected if user desires + * (the hnp state component is given priority 60 in this same + * change, so presumably novm is only picked when the user + * raises this value or selects it explicitly - confirm) + */ + *priority = my_priority; + *module = (mca_base_module_t *)&orte_state_novm_module; + return ORTE_SUCCESS; + } + + *priority = -1; + *module = NULL; + return ORTE_ERROR; +} diff --git a/orte/runtime/orte_globals.h b/orte/runtime/orte_globals.h 
index 232ca57a87..0ca96c026d 100644 --- a/orte/runtime/orte_globals.h +++ b/orte/runtime/orte_globals.h @@ -221,6 +221,7 @@ typedef uint16_t orte_job_controls_t; #define ORTE_JOB_CONTROL_MAPPER 0x0800 #define ORTE_JOB_CONTROL_REDUCER 0x1000 #define ORTE_JOB_CONTROL_COMBINER 0x2000 +#define ORTE_JOB_CONTROL_NO_VM 0x4000 /* global type definitions used by RTE - instanced in orte_globals.c */ diff --git a/orte/util/error_strings.c b/orte/util/error_strings.c index 63ac896516..d5e3a22ada 100644 --- a/orte/util/error_strings.c +++ b/orte/util/error_strings.c @@ -207,8 +207,12 @@ const char *orte_job_state_to_str(orte_job_state_t state) return "INIT_COMPLETE"; case ORTE_JOB_STATE_ALLOCATE: return "PENDING ALLOCATION"; + case ORTE_JOB_STATE_ALLOCATION_COMPLETE: + return "ALLOCATION COMPLETE"; case ORTE_JOB_STATE_MAP: return "PENDING MAPPING"; + case ORTE_JOB_STATE_MAP_COMPLETE: + return "MAP COMPLETE"; case ORTE_JOB_STATE_SYSTEM_PREP: return "PENDING FINAL SYSTEM PREP"; case ORTE_JOB_STATE_LAUNCH_DAEMONS: