From be02211b4f55d2bc6828b554ea7a484db593b50f Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 5 Aug 2008 15:09:29 +0000 Subject: [PATCH] Modify the wakeup system to make it more Windows-friendly. This allows Shiqing to consolidate the Windows-specific modifications into one location, and generalizes the wakeup procedure in case we hit other system-specific requirements. This needs some soak time to ensure we haven't opened any race conditions. I tried to loop everything in the shutdown procedure through that trigger event call to ensure it all goes through the one-time locks as it did before so that someone hitting ctrl-c when we are already shutting down shouldn't cause problems. Just want to let people use it for a while to verify. This commit was SVN r19159. --- orte/mca/errmgr/default/errmgr_default.c | 10 +-- orte/mca/filem/base/filem_base_receive.c | 12 ++-- orte/mca/plm/alps/plm_alps_module.c | 1 - orte/mca/plm/base/plm_base_heartbeat.c | 1 - orte/mca/plm/base/plm_base_launch_support.c | 8 +-- orte/mca/plm/base/plm_base_receive.c | 3 +- orte/mca/plm/lsf/plm_lsf_module.c | 1 - orte/mca/plm/process/plm_process_module.c | 1 - orte/mca/plm/rsh/plm_rsh_module.c | 1 - orte/mca/plm/slurm/plm_slurm_module.c | 1 - orte/mca/plm/submit/pls_submit_module.c | 1 - orte/mca/plm/tm/plm_tm_module.c | 1 - orte/mca/plm/tmd/plm_tmd_module.c | 1 - orte/mca/ras/base/ras_base_allocate.c | 4 +- orte/orted/orted_comm.c | 5 +- orte/orted/orted_main.c | 19 ++++-- orte/runtime/Makefile.am | 2 - orte/runtime/orte_globals.c | 3 +- orte/runtime/orte_globals.h | 35 +++++++++- orte/runtime/orte_locks.c | 2 - orte/runtime/orte_locks.h | 1 - orte/runtime/orte_wait.c | 47 ++++++++++--- orte/runtime/orte_wait.h | 24 ++++--- orte/runtime/orte_wakeup.c | 49 -------------- orte/runtime/orte_wakeup.h | 74 --------------------- orte/tools/orterun/orterun.c | 41 ++++-------- 26 files changed, 131 insertions(+), 217 deletions(-) delete mode 100644 orte/runtime/orte_wakeup.c delete mode 100644 
orte/runtime/orte_wakeup.h diff --git a/orte/mca/errmgr/default/errmgr_default.c b/orte/mca/errmgr/default/errmgr_default.c index ec5b08d463..94d7903f0b 100644 --- a/orte/mca/errmgr/default/errmgr_default.c +++ b/orte/mca/errmgr/default/errmgr_default.c @@ -29,7 +29,7 @@ #include "orte/runtime/runtime.h" #include "orte/runtime/orte_globals.h" -#include "orte/runtime/orte_wakeup.h" +#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_locks.h" #include "orte/mca/plm/plm.h" #include "orte/util/session_dir.h" @@ -101,9 +101,7 @@ void orte_errmgr_default_proc_aborted(orte_process_name_t *name, int exit_code) ORTE_UPDATE_EXIT_STATUS(exit_code); /* wakeup orterun so we can exit */ - if (ORTE_SUCCESS != (rc = orte_wakeup())) { - ORTE_ERROR_LOG(rc); - } + orte_trigger_event(&orte_exit); } /* @@ -144,9 +142,7 @@ void orte_errmgr_default_incomplete_start(orte_jobid_t job, int exit_code) ORTE_UPDATE_EXIT_STATUS(exit_code); /* wakeup orterun so we can exit */ - if (ORTE_SUCCESS != (rc = orte_wakeup())) { - ORTE_ERROR_LOG(rc); - } + orte_trigger_event(&orte_exit); } /* diff --git a/orte/mca/filem/base/filem_base_receive.c b/orte/mca/filem/base/filem_base_receive.c index 61cd351af8..734c294698 100644 --- a/orte/mca/filem/base/filem_base_receive.c +++ b/orte/mca/filem/base/filem_base_receive.c @@ -47,7 +47,7 @@ #include "orte/mca/rml/rml.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" -#include "orte/runtime/orte_wakeup.h" +#include "orte/runtime/orte_wait.h" #include "orte/mca/filem/filem.h" #include "orte/mca/filem/base/base.h" @@ -190,7 +190,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* sende if (NULL == (jdata = orte_get_job_data_object(name.jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_UPDATE_EXIT_STATUS(1); - orte_wakeup(); + orte_trigger_event(&orte_exit); goto CLEANUP; } /* get the proc object for it */ @@ -198,7 +198,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* 
sende if (NULL == procs[name.vpid] || NULL == procs[name.vpid]->node) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_UPDATE_EXIT_STATUS(1); - orte_wakeup(); + orte_trigger_event(&orte_exit); goto CLEANUP; } @@ -208,7 +208,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* sende if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &(procs[name.vpid]->node->name), 1, OPAL_STRING))) { ORTE_ERROR_LOG(rc); ORTE_UPDATE_EXIT_STATUS(1); - orte_wakeup(); + orte_trigger_event(&orte_exit); goto CLEANUP; } @@ -294,13 +294,13 @@ static void filem_base_process_get_remote_path_cmd(orte_process_name_t* sender, if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &tmp_name, 1, OPAL_STRING))) { ORTE_ERROR_LOG(rc); ORTE_UPDATE_EXIT_STATUS(1); - orte_wakeup(); + orte_trigger_event(&orte_exit); goto CLEANUP; } if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &file_type, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); ORTE_UPDATE_EXIT_STATUS(1); - orte_wakeup(); + orte_trigger_event(&orte_exit); goto CLEANUP; } diff --git a/orte/mca/plm/alps/plm_alps_module.c b/orte/mca/plm/alps/plm_alps_module.c index 84eefcad30..789357dc81 100644 --- a/orte/mca/plm/alps/plm_alps_module.c +++ b/orte/mca/plm/alps/plm_alps_module.c @@ -61,7 +61,6 @@ #include "orte/runtime/orte_globals.h" #include "orte/util/name_fns.h" #include "orte/runtime/runtime.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/rmaps.h" diff --git a/orte/mca/plm/base/plm_base_heartbeat.c b/orte/mca/plm/base/plm_base_heartbeat.c index 1c4635b4fb..d92a44d492 100644 --- a/orte/mca/plm/base/plm_base_heartbeat.c +++ b/orte/mca/plm/base/plm_base_heartbeat.c @@ -31,7 +31,6 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/runtime/orte_globals.h" #include "orte/util/show_help.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wait.h" #include "orte/util/name_fns.h" diff --git a/orte/mca/plm/base/plm_base_launch_support.c 
b/orte/mca/plm/base/plm_base_launch_support.c index e28f718379..a526785dad 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -43,7 +43,6 @@ #if OPAL_ENABLE_FT == 1 #include "orte/mca/snapc/snapc.h" #endif -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" #include "orte/runtime/orte_locks.h" @@ -299,7 +298,7 @@ void orte_plm_base_launch_failed(orte_jobid_t job, pid_t pid, WAKEUP: /* set orterun's exit code and wakeup so it can exit */ ORTE_UPDATE_EXIT_STATUS(status); - orte_wakeup(); + orte_trigger_event(&orte_exit); } static void process_orted_launch_report(int fd, short event, void *data) @@ -1035,9 +1034,8 @@ CHECK_ALL_JOBS: jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); if (jdata->num_terminated >= jdata->num_procs) { /* orteds are done! */ - int data=1; jdata->state = ORTE_JOB_STATE_TERMINATED; - write(orteds_exit, &data, sizeof(int)); + orte_trigger_event(&orteds_exit); return; } } @@ -1067,7 +1065,7 @@ CHECK_ALL_JOBS: OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:check_job_completed all jobs terminated - waking up", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - orte_wakeup(); + orte_trigger_event(&orte_exit); } } diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c index dfbf750ee3..88dda842ac 100644 --- a/orte/mca/plm/base/plm_base_receive.c +++ b/orte/mca/plm/base/plm_base_receive.c @@ -45,7 +45,6 @@ #include "orte/mca/routed/routed.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/plm/plm_types.h" @@ -279,7 +278,7 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data) /* see if an error occurred - if so, wakeup so we can exit */ if (ORTE_SUCCESS != rc) { - orte_wakeup(); + orte_trigger_event(&orte_exit); } } diff --git 
a/orte/mca/plm/lsf/plm_lsf_module.c b/orte/mca/plm/lsf/plm_lsf_module.c index 348544001d..17feb3e482 100644 --- a/orte/mca/plm/lsf/plm_lsf_module.c +++ b/orte/mca/plm/lsf/plm_lsf_module.c @@ -64,7 +64,6 @@ #include "orte/util/show_help.h" #include "orte/runtime/runtime.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/rml/rml.h" #include "orte/mca/errmgr/errmgr.h" diff --git a/orte/mca/plm/process/plm_process_module.c b/orte/mca/plm/process/plm_process_module.c index f0aefc4b93..5ac128d028 100644 --- a/orte/mca/plm/process/plm_process_module.c +++ b/orte/mca/plm/process/plm_process_module.c @@ -73,7 +73,6 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "opal/dss/dss.h" #include "orte/mca/rml/rml.h" diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c index 2ffce13f7d..2073a41939 100644 --- a/orte/mca/plm/rsh/plm_rsh_module.c +++ b/orte/mca/plm/rsh/plm_rsh_module.c @@ -74,7 +74,6 @@ #include "orte/util/show_help.h" #include "orte/util/session_dir.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_globals.h" #include "orte/util/name_fns.h" #include "orte/util/nidmap.h" diff --git a/orte/mca/plm/slurm/plm_slurm_module.c b/orte/mca/plm/slurm/plm_slurm_module.c index a2eded127a..d258e55627 100644 --- a/orte/mca/plm/slurm/plm_slurm_module.c +++ b/orte/mca/plm/slurm/plm_slurm_module.c @@ -61,7 +61,6 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/rmaps.h" diff --git a/orte/mca/plm/submit/pls_submit_module.c b/orte/mca/plm/submit/pls_submit_module.c index f07dece7de..734aeb29a8 100644 --- a/orte/mca/plm/submit/pls_submit_module.c +++ b/orte/mca/plm/submit/pls_submit_module.c @@ 
-66,7 +66,6 @@ #include "orte/util/session_dir.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/params.h" #include "opal/dss/dss.h" diff --git a/orte/mca/plm/tm/plm_tm_module.c b/orte/mca/plm/tm/plm_tm_module.c index b2c734e545..6c09839d66 100644 --- a/orte/mca/plm/tm/plm_tm_module.c +++ b/orte/mca/plm/tm/plm_tm_module.c @@ -65,7 +65,6 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/rmaps.h" diff --git a/orte/mca/plm/tmd/plm_tmd_module.c b/orte/mca/plm/tmd/plm_tmd_module.c index 9882fc545f..654a78bc66 100644 --- a/orte/mca/plm/tmd/plm_tmd_module.c +++ b/orte/mca/plm/tmd/plm_tmd_module.c @@ -65,7 +65,6 @@ #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/rmaps.h" diff --git a/orte/mca/ras/base/ras_base_allocate.c b/orte/mca/ras/base/ras_base_allocate.c index 50460b2b7c..6c1744a00c 100644 --- a/orte/mca/ras/base/ras_base_allocate.c +++ b/orte/mca/ras/base/ras_base_allocate.c @@ -29,7 +29,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" -#include "orte/runtime/orte_wakeup.h" +#include "orte/runtime/orte_wait.h" #include "orte/util/hostfile/hostfile.h" #include "orte/util/dash_host/dash_host.h" #include "orte/util/proc_info.h" @@ -118,7 +118,7 @@ int orte_ras_base_allocate(orte_job_t *jdata) OBJ_DESTRUCT(&nodes); orte_show_help("help-ras-base.txt", "ras-base:no-allocation", true); ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); - orte_wakeup(); + orte_trigger_event(&orte_exit); return ORTE_ERROR; } diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 263dca7992..28fc685ac4 100644 --- a/orte/orted/orted_comm.c +++ 
b/orte/orted/orted_comm.c @@ -74,7 +74,6 @@ #include "orte/runtime/runtime.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/orted/orted.h" @@ -640,7 +639,7 @@ static int process_commands(orte_process_name_t* sender, * NOTE: this event will fire -after- any zero-time events * so any pending relays -do- get sent first */ - orte_wakeup(); + orte_trigger_event(&orte_exit); return ORTE_SUCCESS; break; @@ -654,7 +653,7 @@ static int process_commands(orte_process_name_t* sender, * NOTE: this event will fire -after- any zero-time events * so any pending relays -do- get sent first */ - orte_wakeup(); + orte_trigger_event(&orte_exit); return ORTE_SUCCESS; break; diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index bc6273a72f..94fb3dc3d9 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -94,6 +94,7 @@ char *log_path = NULL; static opal_event_t *orted_exit_event; static void shutdown_callback(int fd, short flags, void *arg); +static void shutdown_signal(int fd, short flags, void *arg); static void signal_callback(int fd, short event, void *arg); static void clean_fail(int fd, short flags, void *arg); @@ -402,10 +403,10 @@ int orte_daemon(int argc, char *argv[]) * after ourselves. 
*/ opal_event_set(&term_handler, SIGTERM, OPAL_EV_SIGNAL, - shutdown_callback, NULL); + shutdown_signal, NULL); opal_event_add(&term_handler, NULL); opal_event_set(&int_handler, SIGINT, OPAL_EV_SIGNAL, - shutdown_callback, NULL); + shutdown_signal, NULL); opal_event_add(&int_handler, NULL); #ifndef __WINDOWS__ @@ -666,15 +667,19 @@ static void clean_fail(int fd, short flags, void *arg) exit(ORTE_ERROR_DEFAULT_EXIT_CODE); } +static void shutdown_signal(int fd, short flags, void *arg) +{ + /* trigger the call to shutdown callback to protect + * against race conditions - the trigger event will + * check the one-time lock + */ + orte_trigger_event(&orte_exit); +} + static void shutdown_callback(int fd, short flags, void *arg) { int ret; - /* protect against multiple calls */ - if (!opal_atomic_trylock(&orted_exit_lock)) { /* returns 1 if already locked */ - return; - } - if (NULL != arg) { /* it's the singleton pipe... remove that handler */ opal_event_del(&pipe_handler); diff --git a/orte/runtime/Makefile.am b/orte/runtime/Makefile.am index 2d6a0431ab..373e1cb4b2 100644 --- a/orte/runtime/Makefile.am +++ b/orte/runtime/Makefile.am @@ -36,7 +36,6 @@ if !ORTE_DISABLE_FULL_SUPPORT headers += \ runtime/runtime_internals.h \ runtime/orte_wait.h \ - runtime/orte_wakeup.h \ runtime/orte_globals.h \ runtime/orte_globals_class_instances.h \ runtime/orte_cr.h \ @@ -53,7 +52,6 @@ libopen_rte_la_SOURCES += \ runtime/data_type_support/orte_dt_packing_fns.c \ runtime/data_type_support/orte_dt_unpacking_fns.c \ runtime/orte_wait.c \ - runtime/orte_wakeup.c \ runtime/orte_cr.c \ runtime/orte_data_server.c endif diff --git a/orte/runtime/orte_globals.c b/orte/runtime/orte_globals.c index 9003cf8064..5074654e9a 100644 --- a/orte/runtime/orte_globals.c +++ b/orte/runtime/orte_globals.c @@ -68,7 +68,8 @@ bool orte_allocation_required; char *orte_launch_agent; char **orted_cmd_line=NULL; -int orte_exit, orteds_exit; + +orte_trigger_event_t orte_exit, orteds_exit; int orte_exit_status = 0; 
bool orte_abnormal_term_ordered = false; bool orte_shutdown_in_progress = false; diff --git a/orte/runtime/orte_globals.h b/orte/runtime/orte_globals.h index c05599dd16..9849ecf3ab 100644 --- a/orte/runtime/orte_globals.h +++ b/orte/runtime/orte_globals.h @@ -42,6 +42,7 @@ #include "orte/mca/rmaps/rmaps_types.h" #include "orte/util/proc_info.h" #include "orte/runtime/runtime.h" +#include "orte/runtime/orte_wait.h" #define ORTE_GLOBAL_ARRAY_BLOCK_SIZE 64 #define ORTE_GLOBAL_ARRAY_MAX_SIZE INT_MAX @@ -51,6 +52,36 @@ BEGIN_C_DECLS +/** + * Define a macro for updating the orte_exit_status + * The macro provides a convenient way of doing this + * so that we can add thread locking at some point + * since the orte_exit_status is a global variable. + * + * Ensure that we do not overwrite the exit status if it has + * already been set to some non-zero value. If we don't make + * this check, then different parts of the code could overwrite + * each other's exit status in the case of abnormal termination. + * + * For example, if a process aborts, we would record the initial + * exit code from the aborted process. However, subsequent processes + * will have been aborted by signal as we kill the job. We don't want + * the subsequent processes to overwrite the original exit code so + * we can tell the user the exit code from the process that caused + * the whole thing to happen. 
+ */ +#define ORTE_UPDATE_EXIT_STATUS(newstatus) \ + do { \ + if (0 == orte_exit_status && 0 != newstatus) { \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "%s:%s(%d) updating exit status to %d", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, newstatus)); \ + orte_exit_status = newstatus; \ + } \ + } while(0); + + ORTE_DECLSPEC extern bool orte_help_want_aggregate; ORTE_DECLSPEC extern char *orte_prohibited_session_dirs; @@ -353,7 +384,9 @@ ORTE_DECLSPEC extern bool orte_allocation_required; ORTE_DECLSPEC extern char *orte_launch_agent; ORTE_DECLSPEC extern char **orted_cmd_line; -ORTE_DECLSPEC extern int orte_exit, orteds_exit; + +/* exit triggers and flags */ +ORTE_DECLSPEC extern orte_trigger_event_t orte_exit, orteds_exit; ORTE_DECLSPEC extern int orte_exit_status; ORTE_DECLSPEC extern bool orte_abnormal_term_ordered; ORTE_DECLSPEC extern bool orte_shutdown_in_progress; diff --git a/orte/runtime/orte_locks.c b/orte/runtime/orte_locks.c index d9d792fc07..8f1317f55d 100644 --- a/orte/runtime/orte_locks.c +++ b/orte/runtime/orte_locks.c @@ -29,7 +29,6 @@ opal_atomic_lock_t orte_finalize_lock; opal_atomic_lock_t orted_exit_lock; /* for HNPs */ -opal_atomic_lock_t orte_wakeup_lock; opal_atomic_lock_t orte_job_complete_lock; opal_atomic_lock_t orte_terminate_lock; opal_atomic_lock_t orte_abort_inprogress_lock; @@ -44,7 +43,6 @@ int orte_locks_init(void) opal_atomic_init(&orted_exit_lock, OPAL_ATOMIC_UNLOCKED); /* for HNPs */ - opal_atomic_init(&orte_wakeup_lock, OPAL_ATOMIC_UNLOCKED); opal_atomic_init(&orte_job_complete_lock, OPAL_ATOMIC_UNLOCKED); opal_atomic_init(&orte_terminate_lock, OPAL_ATOMIC_UNLOCKED); opal_atomic_init(&orte_abort_inprogress_lock, OPAL_ATOMIC_UNLOCKED); diff --git a/orte/runtime/orte_locks.h b/orte/runtime/orte_locks.h index 5b2850add4..29b0b20bc9 100644 --- a/orte/runtime/orte_locks.h +++ b/orte/runtime/orte_locks.h @@ -38,7 +38,6 @@ ORTE_DECLSPEC extern opal_atomic_lock_t orte_finalize_lock; ORTE_DECLSPEC extern 
opal_atomic_lock_t orted_exit_lock; /* for HNPs */ -ORTE_DECLSPEC extern opal_atomic_lock_t orte_wakeup_lock; ORTE_DECLSPEC extern opal_atomic_lock_t orte_job_complete_lock; ORTE_DECLSPEC extern opal_atomic_lock_t orte_terminate_lock; ORTE_DECLSPEC extern opal_atomic_lock_t orte_abort_inprogress_lock; diff --git a/orte/runtime/orte_wait.c b/orte/runtime/orte_wait.c index f021fc4d3c..585a772ecb 100644 --- a/orte/runtime/orte_wait.c +++ b/orte/runtime/orte_wait.c @@ -48,13 +48,14 @@ #endif #include "opal/dss/dss_types.h" -#include "orte/util/show_help.h" #include "opal/class/opal_object.h" #include "opal/class/opal_list.h" #include "opal/event/event.h" #include "opal/threads/mutex.h" #include "opal/threads/condition.h" +#include "opal/sys/atomic.h" +#include "orte/util/show_help.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" @@ -95,6 +96,24 @@ OBJ_CLASS_INSTANCE(orte_message_event_t, message_event_constructor, message_event_destructor); + +static void trigger_event_destructor(orte_trigger_event_t *trig) +{ + if (0 <= trig->channel) { + close(trig->channel); + } +} + +static void trigger_event_constructor(orte_trigger_event_t *trig) +{ + trig->channel = -1; + opal_atomic_init(&trig->lock, OPAL_ATOMIC_UNLOCKED); +} +OBJ_CLASS_INSTANCE(orte_trigger_event_t, + opal_object_t, + trigger_event_constructor, + trigger_event_destructor); + #ifdef HAVE_WAITPID static volatile int cb_enabled = true; @@ -458,7 +477,7 @@ orte_wait_cb_enable() } -int orte_wait_event(opal_event_t **event, int *trig, +int orte_wait_event(opal_event_t **event, orte_trigger_event_t *trig, void (*cbfunc)(int, short, void*)) { int p[2]; @@ -471,8 +490,11 @@ int orte_wait_event(opal_event_t **event, int *trig, /* create the event */ *event = (opal_event_t*)malloc(sizeof(opal_event_t)); + /* setup the trigger and its associated lock */ + OBJ_CONSTRUCT(trig, orte_trigger_event_t); + /* pass back the write end of the pipe */ - *trig = p[1]; + 
trig->channel = p[1]; /* define the event to fire when someone writes to the pipe */ opal_event_set(*event, p[0], OPAL_EV_READ, cbfunc, NULL); @@ -485,11 +507,15 @@ int orte_wait_event(opal_event_t **event, int *trig, } -void orte_trigger_event(int trig) +void orte_trigger_event(orte_trigger_event_t *trig) { int data=1; - write(trig, &data, sizeof(int)); + if (!opal_atomic_trylock(&trig->lock)) { /* returns 1 if already locked */ + return; + } + + write(trig->channel, &data, sizeof(int)); opal_progress(); } @@ -792,11 +818,15 @@ orte_wait_finalize(void) return ORTE_SUCCESS; } -void orte_trigger_event(int trig) +void orte_trigger_event(orte_trigger_event_t *trig) { int data=1; - write(trig, &data, sizeof(int)); + if (!opal_atomic_trylock(&trig->lock)) { /* returns 1 if already locked */ + return; + } + + write(trig->channel, &data, sizeof(int)); opal_progress(); } @@ -1100,8 +1130,7 @@ orte_wait_cb_enable(void) return ORTE_ERR_NOT_SUPPORTED; } -void -orte_trigger_event(int trig) +void orte_trigger_event(orte_trigger_event_t *trig) { } diff --git a/orte/runtime/orte_wait.h b/orte/runtime/orte_wait.h index d871ed5439..23d4eae08e 100644 --- a/orte/runtime/orte_wait.h +++ b/orte/runtime/orte_wait.h @@ -38,15 +38,22 @@ #include "opal/dss/dss.h" #include "opal/class/opal_list.h" -#include "orte/util/show_help.h" +#include "opal/sys/atomic.h" #include "opal/event/event.h" #include "opal/runtime/opal_progress.h" +#include "orte/util/show_help.h" #include "orte/mca/rml/rml_types.h" -#include "orte/runtime/orte_globals.h" BEGIN_C_DECLS +typedef struct { + opal_object_t super; + int channel; + opal_atomic_lock_t lock; +} orte_trigger_event_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_trigger_event_t); + /** typedef for callback function used in \c ompi_rte_wait_cb */ typedef void (*orte_wait_fn_t)(pid_t wpid, int status, void *data); @@ -99,17 +106,18 @@ ORTE_DECLSPEC int orte_wait_cb_enable(void); /** * Setup to wait for an event * - * This function is used to setup a pipe 
that can be used elsewhere + * This function is used to setup a trigger event that can be used elsewhere * in the code base where we want to wait for some event to * happen. For example, orterun uses this function to setup an event * that is used to notify orterun of abnormal and normal termination * so it can wakeup and exit cleanly. * - * The event will be defined so that a write to the provided trigger - * pipe will cause the event to trigger and callback to the provided + * The event will be defined so that firing the provided trigger + * will cause the event to trigger and callback to the provided * function */ -ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig, +ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, + orte_trigger_event_t *trig, void (*cbfunc)(int, short, void*)); /** @@ -136,9 +144,9 @@ ORTE_DECLSPEC int orte_wait_event(opal_event_t **event, int *trig, * Trigger a defined event * * This function will trigger a previously-defined event - as setup - * by orte_wait_event - by sending a message to the provided pipe + * by orte_wait_event - by firing the provided trigger */ -ORTE_DECLSPEC void orte_trigger_event(int trig); +ORTE_DECLSPEC void orte_trigger_event(orte_trigger_event_t *trig); /** * Setup an event to process a message diff --git a/orte/runtime/orte_wakeup.c b/orte/runtime/orte_wakeup.c deleted file mode 100644 index 2c5254b3cb..0000000000 --- a/orte/runtime/orte_wakeup.c +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana - * University Research and Technology - * Corporation. All rights reserved. - * Copyright (c) 2004-2005 The University of Tennessee and The University - * of Tennessee Research Foundation. All rights - * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. - * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. 
- * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - * - */ - -#include "orte_config.h" -#include "orte/constants.h" - -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif - -#include "orte/util/show_help.h" -#include "opal/threads/condition.h" -#include "opal/runtime/opal_progress.h" - -#include "orte/runtime/orte_globals.h" -#include "orte/util/name_fns.h" -#include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_locks.h" - -#include "orte/runtime/orte_wakeup.h" - -int orte_wakeup(void) -{ - /* set the exit status and trigger the - * exit procedure - */ - if (!opal_atomic_trylock(&orte_wakeup_lock)) { /* returns 1 if already locked */ - return ORTE_SUCCESS; - } - - orte_trigger_event(orte_exit); - return ORTE_SUCCESS; -} diff --git a/orte/runtime/orte_wakeup.h b/orte/runtime/orte_wakeup.h deleted file mode 100644 index f62c525479..0000000000 --- a/orte/runtime/orte_wakeup.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana - * University Research and Technology - * Corporation. All rights reserved. - * Copyright (c) 2004-2006 The University of Tennessee and The University - * of Tennessee Research Foundation. All rights - * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. - * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. 
- */ -#ifndef ORTE_WAKEUP_H -#define ORTE_WAKEUP_H - -#include "orte_config.h" -#include "orte/types.h" - -#include "orte/util/show_help.h" - -#include "orte/runtime/orte_globals.h" -#include "orte/util/name_fns.h" - -BEGIN_C_DECLS - -/** - * Define a macro for updating the orte_exit_status - * The macro provides a convenient way of doing this - * so that we can add thread locking at some point - * since the orte_exit_status is a global variable. - * - * Ensure that we do not overwrite the exit status if it has - * already been set to some non-zero value. If we don't make - * this check, then different parts of the code could overwrite - * each other's exit status in the case of abnormal termination. - * - * For example, if a process aborts, we would record the initial - * exit code from the aborted process. However, subsequent processes - * will have been aborted by signal as we kill the job. We don't want - * the subsequent processes to overwrite the original exit code so - * we can tell the user the exit code from the process that caused - * the whole thing to happen. 
- */ -#define ORTE_UPDATE_EXIT_STATUS(newstatus) \ - do { \ - if (0 == orte_exit_status && 0 != newstatus) { \ - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ - "%s:%s(%d) updating exit status to %d", \ - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ - __FILE__, __LINE__, newstatus)); \ - orte_exit_status = newstatus; \ - } \ - } while(0); - - -/** - * Wakeup orterun by reporting the termination of all processes - */ -ORTE_DECLSPEC int orte_wakeup(void); - -END_C_DECLS - -#endif /* #ifndef ORTE_WAKEUP_H */ diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index cc9c170afc..20e8758d02 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -82,7 +82,6 @@ #include "orte/runtime/runtime.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" -#include "orte/runtime/orte_wakeup.h" #include "orte/runtime/orte_data_server.h" #include "orte/runtime/orte_locks.h" @@ -317,6 +316,7 @@ static opal_cmd_line_init_t cmd_line_init[] = { */ static void job_completed(int trigpipe, short event, void *arg); static void terminated(int trigpipe, short event, void *arg); +static void timeout_callback(int fd, short ign, void *arg); static void abort_signal_callback(int fd, short flags, void *arg); static void abort_exit_callback(int fd, short event, void *arg); static void signal_forward_callback(int fd, short event, void *arg); @@ -625,21 +625,14 @@ static void job_completed(int trigpipe, short event, void *arg) orte_job_state_t exit_state; orte_job_t *daemons; - /* flag that we are here to avoid doing it twice */ - if (!opal_atomic_trylock(&orte_job_complete_lock)) { /* returns 1 if already locked */ - return; - } - /* if the abort exit event is set, delete it */ if (NULL != abort_exit_event) { opal_evtimer_del(abort_exit_event); free(abort_exit_event); } - /* close the trigger pipe */ - if (0 <= trigpipe) { - close(trigpipe); - } + /* cleanup the trigger */ + OBJ_DESTRUCT(&orte_exit); exit_state = jdata->state; @@ -690,7 
+683,7 @@ static void job_completed(int trigpipe, short event, void *arg) } ORTE_DETECT_TIMEOUT(&timeout_ev, daemons->num_procs, orte_timeout_usec_per_proc, - orte_max_timeout, terminated); + orte_max_timeout, timeout_callback); } /* now wait to hear it has been done */ @@ -718,15 +711,8 @@ static void terminated(int trigpipe, short event, void *arg) orte_proc_t **procs; orte_vpid_t i; - /* flag that we are here to avoid doing it twice */ - if (!opal_atomic_trylock(&orte_terminate_lock)) { /* returns 1 if already locked */ - return; - } - - /* close the trigger pipe so it cannot be called again */ - if (0 <= trigpipe) { - close(trigpipe); - } + /* cleanup the trigger */ + OBJ_DESTRUCT(&orteds_exit); /* clear the event timer */ if (NULL != timeout_ev) { @@ -948,10 +934,10 @@ static void dump_aborted_procs(void) static void timeout_callback(int fd, short ign, void *arg) { - /* just call terminated so we don't loop back into - * trying to kill things + /* fire the trigger that takes us to terminated so we don't + * loop back into trying to kill things */ - terminated(-1, 0, NULL); + orte_trigger_event(&orteds_exit); } static void abort_exit_callback(int fd, short ign, void *arg) @@ -984,16 +970,13 @@ static void abort_exit_callback(int fd, short ign, void *arg) * need to explicitly wake ourselves up to exit */ ORTE_UPDATE_EXIT_STATUS(ret); - orte_wakeup(); + orte_trigger_event(&orte_exit); } /* give ourselves a time limit on how long to wait * for the job to die, just in case we can't make it go * away for some reason. Don't send us directly back - * to job_completed, though, as that function expects - * to be triggered via orte_wakeup - we could get into - * race conditions, and the timeout won't provide - * that function with the orte_exit pipe fd so it can - * be closed + * to job_completed, though, as that function may be + * what has failed */ ORTE_DETECT_TIMEOUT(&abort_exit_event, jdata->num_procs, orte_timeout_usec_per_proc,