diff --git a/opal/runtime/opal_progress.c b/opal/runtime/opal_progress.c index 3d93662039..ff1ae63899 100644 --- a/opal/runtime/opal_progress.c +++ b/opal/runtime/opal_progress.c @@ -70,6 +70,12 @@ static int32_t event_progress_delta = 0; be every time */ static int32_t num_event_users = 0; +/* How deep are we in opal_progress recursion? */ +#if OMPI_HAVE_THREAD_SUPPORT +volatile +#endif +uint32_t opal_progress_recursion_depth_counter = 0; + #if OMPI_ENABLE_DEBUG static int debug_output = -1; #endif @@ -161,6 +167,11 @@ opal_progress(void) size_t i; int events = 0; +#if OMPI_HAVE_THREAD_SUPPORT + opal_atomic_add(&opal_progress_recursion_depth_counter, 1); +#else + ++opal_progress_recursion_depth_counter; +#endif if( opal_progress_event_flag != 0 ) { #if (OMPI_ENABLE_PROGRESS_THREADS == 0) && OPAL_HAVE_WORKING_EVENTOPS #if OPAL_PROGRESS_USE_TIMERS @@ -210,6 +221,11 @@ opal_progress(void) #endif /* defined(__WINDOWS__) */ } #endif /* defined(__WINDOWS__) || defined(HAVE_SCHED_YIELD) */ +#if OMPI_HAVE_THREAD_SUPPORT + opal_atomic_add(&opal_progress_recursion_depth_counter, -1); +#else + --opal_progress_recursion_depth_counter; +#endif } diff --git a/opal/runtime/opal_progress.h b/opal/runtime/opal_progress.h index 29d4ffb02b..f1224caa2e 100644 --- a/opal/runtime/opal_progress.h +++ b/opal/runtime/opal_progress.h @@ -202,6 +202,29 @@ static inline bool opal_progress_spin(volatile bool* complete) } +/** + * \internal + * Don't use this variable; use the opal_progress_recursion_depth() + * function. + */ +OPAL_DECLSPEC extern +#if OMPI_HAVE_THREAD_SUPPORT +volatile +#endif +uint32_t opal_progress_recursion_depth_counter; + +/** + * Return the current level of recursion -- 0 means that we are not + * under an opal_progress() call at all. 1 means that you're in the + * top-level opal_progress() function (i.e., not deep in recursion). + * Higher values mean that you're that many levels deep in recursion. 
+ */ +static inline uint32_t opal_progress_recursion_depth(void) +{ + return opal_progress_recursion_depth_counter; +} + + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/orte/mca/plm/alps/plm_alps_module.c b/orte/mca/plm/alps/plm_alps_module.c index e45404e25a..57f2e59dd9 100644 --- a/orte/mca/plm/alps/plm_alps_module.c +++ b/orte/mca/plm/alps/plm_alps_module.c @@ -149,6 +149,10 @@ static int plm_alps_launch_job(orte_job_t *jdata) orte_app_context_t **apps; orte_node_t **nodes; orte_std_cntr_t nnode; + orte_jobid_t failed_job; + + /* default to declaring the daemon launch failed */ + failed_job = ORTE_PROC_MY_NAME->jobid; if (mca_plm_alps_component.timing) { if (0 != gettimeofday(&joblaunchstart, NULL)) { @@ -356,6 +360,8 @@ static int plm_alps_launch_job(orte_job_t *jdata) } launch_apps: + /* if we get here, then daemons launched - change to declaring apps failed */ + failed_job = active_job; if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -398,7 +404,7 @@ cleanup: /* check for failed launch - if so, force terminate */ if (failed_launch) { - orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); } return rc; @@ -494,13 +500,13 @@ static void alps_wait_cb(pid_t pid, int status, void* cbdata){ /* report that the daemon has failed so we break out of the daemon * callback receive and exit */ - orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_FAILED_TO_START); } else { /* an orted must have died unexpectedly after launch - report * that the daemon has failed so we exit */ - orte_plm_base_launch_failed(active_job, false, pid, status, ORTE_JOB_STATE_ABORTED); + 
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_ABORTED); } } diff --git a/orte/mca/plm/base/plm_base_heartbeat.c b/orte/mca/plm/base/plm_base_heartbeat.c index 1ae4fcbddb..fdaf42570b 100644 --- a/orte/mca/plm/base/plm_base_heartbeat.c +++ b/orte/mca/plm/base/plm_base_heartbeat.c @@ -117,7 +117,7 @@ static void check_heartbeat(int fd, short dummy, void *arg) /* if any daemon died, abort */ if (died) { - orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, false, -1, + orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_ABORTED); return; } diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index eb276e9545..f8056292e6 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -63,7 +63,7 @@ static int orte_plm_base_report_launched(orte_jobid_t job); int orte_plm_base_setup_job(orte_job_t *jdata) { - int rc; + int rc, fd; orte_process_name_t name = {ORTE_JOBID_INVALID, 0}; ORTE_OUTPUT_VERBOSE((5, orte_plm_globals.output, @@ -136,6 +136,16 @@ int orte_plm_base_setup_job(orte_job_t *jdata) return rc; } + /* IOF cannot currently handle multiple pulls to the same fd. So + dup stderr to another fd. 
:-\ */ + fd = dup(2); + if (fd >= 0 && + ORTE_SUCCESS != (rc = orte_iof.iof_pull(&name, ORTE_NS_CMP_JOBID, + ORTE_IOF_INTERNAL, fd))) { + ORTE_ERROR_LOG(rc); + return rc; + } + #if OPAL_ENABLE_FT == 1 /* * Notify the Global SnapC component regarding new job @@ -224,7 +234,7 @@ static bool orted_failed_launch; static orte_job_t *jdatorted; static orte_proc_t **pdatorted; -void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t pid, +void orte_plm_base_launch_failed(orte_jobid_t job, pid_t pid, int status, orte_job_state_t state) { orte_job_t *jdata; @@ -239,10 +249,9 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t } ORTE_OUTPUT_VERBOSE((5, orte_plm_globals.output, - "%s plm:base:launch_failed for job %s %s daemon launch", + "%s plm:base:launch_failed for job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(job), - (daemons_launching) ? "during" : "after")); + ORTE_JOBID_PRINT(job))); /* if this is the daemon job that failed, set the flag indicating * that a daemon failed so we use the proper @@ -261,26 +270,24 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t */ pidstr = strdup("unknown"); } - if (daemons_launching) { - if (WIFSIGNALED(status)) { /* died on signal */ + if (WIFSIGNALED(status)) { /* died on signal */ #ifdef WCOREDUMP - if (WCOREDUMP(status)) { - orte_show_help("help-plm-base.txt", "daemon-died-signal-core", true, - pidstr, WTERMSIG(status)); - } else { - orte_show_help("help-plm-base.txt", "daemon-died-signal", true, - pidstr, WTERMSIG(status)); - } -#else - orte_show_help("help-plm-base.txt", "daemon-died-signal", true, - pidstr, WTERMSIG(status)); -#endif /* WCOREDUMP */ + if (WCOREDUMP(status)) { + orte_show_help("help-plm-base.txt", "daemon-died-signal-core", true, + pidstr, WTERMSIG(status)); } else { - orte_show_help("help-plm-base.txt", "daemon-died-no-signal", true, - pidstr, WEXITSTATUS(status)); + orte_show_help("help-plm-base.txt", 
"daemon-died-signal", true, + pidstr, WTERMSIG(status)); } - orted_failed_launch = true; +#else + orte_show_help("help-plm-base.txt", "daemon-died-signal", true, + pidstr, WTERMSIG(status)); +#endif /* WCOREDUMP */ + } else { + orte_show_help("help-plm-base.txt", "daemon-died-no-signal", true, + pidstr, WEXITSTATUS(status)); } + orted_failed_launch = true; free(pidstr); } diff --git a/orte/mca/plm/base/plm_private.h b/orte/mca/plm/base/plm_private.h index 30431619a7..f544b6ac3a 100644 --- a/orte/mca/plm/base/plm_private.h +++ b/orte/mca/plm/base/plm_private.h @@ -65,8 +65,7 @@ ORTE_DECLSPEC int orte_plm_base_set_progress_sched(int sched); */ ORTE_DECLSPEC int orte_plm_base_setup_job(orte_job_t *jdata); ORTE_DECLSPEC int orte_plm_base_launch_apps(orte_jobid_t job); -ORTE_DECLSPEC void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active, - pid_t pid, int status, orte_job_state_t state); +ORTE_DECLSPEC void orte_plm_base_launch_failed(orte_jobid_t job, pid_t pid, int status, orte_job_state_t state); ORTE_DECLSPEC int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons); diff --git a/orte/mca/plm/lsf/plm_lsf_module.c b/orte/mca/plm/lsf/plm_lsf_module.c index f5ed1d0bba..5cb4891a3b 100644 --- a/orte/mca/plm/lsf/plm_lsf_module.c +++ b/orte/mca/plm/lsf/plm_lsf_module.c @@ -142,7 +142,11 @@ static int plm_lsf_launch_job(orte_job_t *jdata) orte_app_context_t **apps; orte_node_t **nodes; orte_std_cntr_t nnode; + orte_jobid_t failed_job; + /* default to declaring the daemons failed*/ + failed_job = ORTE_PROC_MY_NAME->jobid; + if (mca_plm_lsf_component.timing) { if (0 != gettimeofday(&joblaunchstart, NULL)) { orte_output(0, "plm_lsf: could not obtain job start time"); @@ -307,6 +311,8 @@ static int plm_lsf_launch_job(orte_job_t *jdata) } launch_apps: + /* daemons succeeded - any failure now would be from apps */ + failed_job = active_job; if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -346,8 +352,7 
@@ cleanup: /* check for failed launch - if so, force terminate */ if (failed_launch) { - if (ORTE_SUCCESS != - orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); } return rc; diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c index 9d222f34ba..721e32acbc 100644 --- a/orte/mca/plm/rsh/plm_rsh_module.c +++ b/orte/mca/plm/rsh/plm_rsh_module.c @@ -313,7 +313,7 @@ static void orte_plm_rsh_wait_daemon(pid_t pid, int status, void* cbdata) /* note that this daemon failed */ daemon->state = ORTE_PROC_STATE_FAILED_TO_START; /* report that the daemon has failed so we can exit */ - orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_FAILED_TO_START); } } @@ -857,6 +857,10 @@ int orte_plm_rsh_launch(orte_job_t *jdata) orte_app_context_t **apps; orte_node_t **nodes; orte_std_cntr_t nnode; + orte_jobid_t failed_job; + + /* default to declaring the daemon launch as having failed */ + failed_job = ORTE_PROC_MY_NAME->jobid; if (orte_timing) { if (0 != gettimeofday(&joblaunchstart, NULL)) { @@ -1124,6 +1128,10 @@ next_node: } launch_apps: + /* if we get here, then the daemons succeeded, so any failure would now be + * for the application job + */ + failed_job = active_job; if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) { ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output, "%s plm:rsh: launch of apps failed for job %s on error %s", @@ -1149,7 +1157,7 @@ launch_apps: /* check for failed launch - if so, force terminate */ if (failed_launch) { - orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, 
ORTE_JOB_STATE_FAILED_TO_START); } /* setup a "heartbeat" timer to periodically check on diff --git a/orte/mca/plm/slurm/plm_slurm.h b/orte/mca/plm/slurm/plm_slurm.h index c5eb14af0c..c543999806 100644 --- a/orte/mca/plm/slurm/plm_slurm.h +++ b/orte/mca/plm/slurm/plm_slurm.h @@ -31,7 +31,6 @@ BEGIN_C_DECLS int priority; char *orted; char *custom_args; - bool detect_failure; }; typedef struct orte_plm_slurm_component_t orte_plm_slurm_component_t; diff --git a/orte/mca/plm/slurm/plm_slurm_component.c b/orte/mca/plm/slurm/plm_slurm_component.c index 2ececebb48..1f4564c9c3 100644 --- a/orte/mca/plm/slurm/plm_slurm_component.c +++ b/orte/mca/plm/slurm/plm_slurm_component.c @@ -97,8 +97,6 @@ orte_plm_slurm_component_t mca_plm_slurm_component = { static int plm_slurm_open(void) { - int tmp; - mca_base_component_t *comp = &mca_plm_slurm_component.super.base_version; mca_base_param_reg_int(comp, "priority", "Default selection priority", @@ -115,12 +113,6 @@ static int plm_slurm_open(void) false, false, NULL, &mca_plm_slurm_component.custom_args); - mca_base_param_reg_int(comp, "detect_failure", - "If set, have srun automatically detect failures and kill the job", - false, false, (int)false, - &tmp); - mca_plm_slurm_component.detect_failure = OPAL_INT_TO_BOOL(tmp); - return ORTE_SUCCESS; } diff --git a/orte/mca/plm/slurm/plm_slurm_module.c b/orte/mca/plm/slurm/plm_slurm_module.c index a962e3b59c..078bf80476 100644 --- a/orte/mca/plm/slurm/plm_slurm_module.c +++ b/orte/mca/plm/slurm/plm_slurm_module.c @@ -147,6 +147,10 @@ static int plm_slurm_launch_job(orte_job_t *jdata) char *cur_prefix; struct timeval launchstart, launchstop; int proc_vpid_index; + orte_jobid_t failed_job; + + /* flag the daemons as failing by default */ + failed_job = ORTE_PROC_MY_NAME->jobid; if (orte_timing) { if (0 != gettimeofday(&launchstart, NULL)) { @@ -231,9 +235,7 @@ static int plm_slurm_launch_job(orte_job_t *jdata) free(tmp); /* alert us if any orteds die during startup */ - if 
(mca_plm_slurm_component.detect_failure) { - opal_argv_append(&argc, &argv, "--kill-on-bad-exit"); - } + opal_argv_append(&argc, &argv, "--kill-on-bad-exit"); /* create nodelist */ nodelist_argv = NULL; @@ -367,6 +369,8 @@ static int plm_slurm_launch_job(orte_job_t *jdata) } launch_apps: + /* get here if daemons launch okay - any failures now by apps */ + failed_job = active_job; if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) { ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output, "%s plm:slurm: launch of apps failed for job %s on error %s", @@ -393,8 +397,6 @@ launch_apps: goto cleanup; } - /* JMS: short we stash the srun pid in the gpr somewhere for cleanup? */ - cleanup: if (NULL != argv) { opal_argv_free(argv); @@ -409,7 +411,7 @@ cleanup: /* check for failed launch - if so, force terminate */ if (failed_launch) { - orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); } return rc; @@ -506,13 +508,13 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){ if (failed_launch) { /* report that the daemon has failed so we can exit */ - orte_plm_base_launch_failed(active_job, true, -1, status, ORTE_JOB_STATE_FAILED_TO_START); + orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1, status, ORTE_JOB_STATE_FAILED_TO_START); } else { /* an orted must have died unexpectedly after launch - report * that the daemon has failed so we exit */ - orte_plm_base_launch_failed(active_job, true, -1, status, ORTE_JOB_STATE_ABORTED); + orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1, status, ORTE_JOB_STATE_ABORTED); } } diff --git a/orte/mca/plm/tm/plm_tm_module.c b/orte/mca/plm/tm/plm_tm_module.c index be6abca62f..b88eb849ce 100644 --- a/orte/mca/plm/tm/plm_tm_module.c +++ b/orte/mca/plm/tm/plm_tm_module.c @@ -148,7 +148,10 @@ static int plm_tm_launch_job(orte_job_t *jdata) 
tm_event_t event; bool failed_launch = true; mode_t current_umask; + orte_jobid_t failed_job; + /* default to declaring the daemons as failed */ + failed_job = ORTE_PROC_MY_NAME->jobid; /* create a jobid for this job */ if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) { @@ -369,6 +372,10 @@ static int plm_tm_launch_job(orte_job_t *jdata) } launch_apps: + /* since the daemons have launched, any failures now will be for the + * application job + */ + failed_job = jdata->jobid; if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(jdata->jobid))) { ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output, "%s plm:tm: launch of apps failed for job %s on error %s", @@ -408,8 +415,7 @@ launch_apps: /* check for failed launch - if so, force terminate */ if (failed_launch) { - orte_plm_base_launch_failed(jdata->jobid, true, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); - return rc; + orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START); } /* setup a "heartbeat" timer to periodically check on diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index c84ba27e53..9a5c304af6 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -281,7 +281,11 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, "%s orted_recv_cmd: reissued recv", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); } - + +static int num_recursions=0; +static int wait_time=1; +#define MAX_RECURSIONS 24 + void orte_daemon_cmd_processor(int fd, short event, void *data) { orte_message_event_t *mev = (orte_message_event_t*)data; @@ -294,6 +298,47 @@ void orte_daemon_cmd_processor(int fd, short event, void *data) orte_std_cntr_t n; orte_daemon_cmd_flag_t command; + /* check to see if we are in a progress recursion */ + if (orte_process_info.daemon && 1 < (ret = opal_progress_recursion_depth())) { + /* if we are in a recursion, we want to repost the message event + * so the progress engine can work its 
way back up to the top + * of the stack. Given that this could happen multiple times, + * we have to be careful to increase the time we wait so that + * we provide enough time - but not more time than necessary - for + * the stack to clear + */ + ORTE_OUTPUT_VERBOSE((1, orte_debug_output, + "%s orte:daemon:cmd:processor in recursion depth %d\n\treposting %s for tag %ld", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ret, + ORTE_NAME_PRINT(sender), + (long)(tag))); + if (MAX_RECURSIONS < num_recursions) { + /* we need to abort if we get too far down this path */ + opal_output(0, "%s ORTED_CMD_PROCESSOR: STUCK IN INFINITE LOOP - ABORTING", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + OBJ_RELEASE(mev); + /* make sure our local procs are dead - but don't update their state + * on the HNP as this may be redundant + */ + orte_odls.kill_local_procs(ORTE_JOBID_WILDCARD, false); + + /* do -not- call finalize as this will send a message to the HNP + * indicating clean termination! Instead, just forcibly cleanup + * the local session_dir tree and abort + */ + orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); + + abort(); + } + wait_time = wait_time * 2; + ++num_recursions; + ORTE_MESSAGE_EVENT_DELAY(wait_time, mev); + return; + } + wait_time = 1; + num_recursions = 0; + ORTE_OUTPUT_VERBOSE((1, orte_debug_output, "%s orte:daemon:cmd:processor called by %s for tag %ld", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index dcc986c794..1bb82b4940 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -201,6 +201,11 @@ int orte_daemon(int argc, char *argv[]) char hostname[100]; char *tmp_env_var = NULL; + /* JUST TO HELP JMS DEBUG RACE CONDITION - TO BE REMOVED!! 
*/ + gethostname(hostname, 100); + fprintf(stderr, "Daemon was launched on %s - starting up\n", hostname); + fflush(stderr); + /* initialize the globals */ memset(&orted_globals, 0, sizeof(orted_globals)); /* initialize the singleton died pipe to an illegal value so we can detect it was set */ @@ -385,9 +390,7 @@ int orte_daemon(int argc, char *argv[]) return ret; } - /* setup our receive function - this will allow us to relay messages - * during start for better scalability - */ + /* setup the primary daemon command receive function */ ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL); if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) { diff --git a/orte/runtime/orte_wait.c b/orte/runtime/orte_wait.c index fd2df3b9c9..3f538acd8b 100644 --- a/orte/runtime/orte_wait.c +++ b/orte/runtime/orte_wait.c @@ -74,16 +74,27 @@ static opal_list_t registered_cb; ********************************************************************/ static void message_event_destructor(orte_message_event_t *ev) { - OBJ_RELEASE(ev->buffer); + if (NULL != ev->ev) { + free(ev->ev); + } + if (NULL != ev->buffer) { + OBJ_RELEASE(ev->buffer); + } #if OMPI_ENABLE_DEBUG - ev->file = NULL; + if (NULL != ev->file) { + free(ev->file); + } #endif } static void message_event_constructor(orte_message_event_t *ev) { + ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t)); ev->buffer = OBJ_NEW(opal_buffer_t); -} +#if OMPI_ENABLE_DEBUG + ev->file = NULL; +#endif + } OBJ_CLASS_INSTANCE(orte_message_event_t, opal_object_t, message_event_constructor, diff --git a/orte/runtime/orte_wait.h b/orte/runtime/orte_wait.h index 7dd87903d7..4ef9eed884 100644 --- a/orte/runtime/orte_wait.h +++ b/orte/runtime/orte_wait.h @@ -149,6 +149,7 @@ ORTE_DECLSPEC void orte_trigger_event(int trig); */ typedef struct { opal_object_t super; + opal_event_t *ev; orte_process_name_t sender; opal_buffer_t *buffer; orte_rml_tag_t tag; @@ -159,17 +160,26 @@ typedef 
struct { } orte_message_event_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t); +#define ORTE_MESSAGE_EVENT_DELAY(delay, mev) \ + do { \ + struct timeval now; \ + ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \ + "defining message event delay: %s %d", \ + __FILE__, __LINE__)); \ + now.tv_sec = delay/1000000; \ + now.tv_usec = delay%1000000; \ + opal_evtimer_add(mev->ev, &now); \ + } while(0); + #if OMPI_ENABLE_DEBUG #define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \ do { \ orte_message_event_t *mev; \ struct timeval now; \ - opal_event_t *tmp; \ ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \ "defining message event: %s %d", \ __FILE__, __LINE__)); \ - tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ mev = OBJ_NEW(orte_message_event_t); \ mev->sender.jobid = (sndr)->jobid; \ mev->sender.vpid = (sndr)->vpid; \ @@ -177,31 +187,30 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t); mev->tag = (tg); \ mev->file = strdup((buf)->parent.cls_init_file_name); \ mev->line = (buf)->parent.cls_init_lineno; \ - opal_evtimer_set(tmp, (cbfunc), mev); \ + opal_evtimer_set(mev->ev, (cbfunc), mev); \ now.tv_sec = 0; \ now.tv_usec = 0; \ - opal_evtimer_add(tmp, &now); \ + opal_evtimer_add(mev->ev, &now); \ } while(0); + #else #define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \ do { \ orte_message_event_t *mev; \ struct timeval now; \ - opal_event_t *tmp; \ ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \ "defining message event: %s %d", \ __FILE__, __LINE__)); \ - tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ mev = OBJ_NEW(orte_message_event_t); \ mev->sender.jobid = (sndr)->jobid; \ mev->sender.vpid = (sndr)->vpid; \ opal_dss.copy_payload(mev->buffer, (buf)); \ mev->tag = (tg); \ - opal_evtimer_set(tmp, (cbfunc), mev); \ + opal_evtimer_set(mev->ev, (cbfunc), mev); \ now.tv_sec = 0; \ now.tv_usec = 0; \ - opal_evtimer_add(tmp, &now); \ + opal_evtimer_add(mev->ev, &now); \ } while(0); #endif @@ -227,19 +236,15 @@ ORTE_DECLSPEC 
OBJ_CLASS_DECLARATION(orte_message_event_t); do { \ struct timeval now; \ opal_event_t *tmp; \ + int timeout; \ tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \ opal_evtimer_set(tmp, (cbfunc), NULL); \ - now.tv_sec = 0; \ - now.tv_usec = (float)(deltat) * (float)(n); \ - if (maxwait > 0) { \ - if (now.tv_usec > (maxwait)) { \ - now.tv_usec = (maxwait); \ - } \ - } \ - if (now.tv_usec > 1000000.0) { \ - now.tv_sec = (float)((int)(now.tv_usec/1000000.0)); \ - now.tv_usec = now.tv_usec - 1000000.0*now.tv_sec; \ + timeout = (deltat) * (n); \ + if ((maxwait) > 0 && timeout > (maxwait)) { \ + timeout = (maxwait); \ } \ + now.tv_sec = timeout/1000000; \ + now.tv_usec = timeout%1000000; \ ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \ "defining timeout: %ld sec %ld usec", \ (long)now.tv_sec, (long)now.tv_usec)); \