Fix a potential, albeit perhaps esoteric, race condition that can occur for fast HNP's, slow orteds, and fast apps. Under those conditions, it is possible for the orted to be caught in its original send of contact info back to the HNP, and thus for the progress stack never to recover back to a high level. In those circumstances, the orted can "hang" when trying to exit.
Add a new function to opal_progress that tells us our recursion depth to support that solution. Yes, I know this sounds picky, but good ol' Jeff managed to make it happen by driving his cluster near to death... Also ensure that we declare "failed" for the daemon job when daemons fail instead of the application job. This is important so that orte knows that it cannot use xcast to tell daemons to "exit", nor should it expect all daemons to respond. Otherwise, it is possible to hang. After lots of testing, decide to default (again) to slurm detecting failed orteds. This proved necessary to avoid rather annoying hangs that were difficult to recover from. There are conditions where slurm will fail to launch all daemons (slurm folks are working on it), and yet again, good ol' Jeff managed to find both of them. Thanks you Jeff! :-/ This commit was SVN r18611.
This commit is contained in:
parent
2aec094d56
commit
7bee71aa59
@ -70,6 +70,12 @@ static int32_t event_progress_delta = 0;
|
||||
be every time */
|
||||
static int32_t num_event_users = 0;
|
||||
|
||||
/* How deep are we in opal_progress recursion? */
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
volatile
|
||||
#endif
|
||||
uint32_t opal_progress_recursion_depth_counter = 0;
|
||||
|
||||
#if OMPI_ENABLE_DEBUG
|
||||
static int debug_output = -1;
|
||||
#endif
|
||||
@ -161,6 +167,11 @@ opal_progress(void)
|
||||
size_t i;
|
||||
int events = 0;
|
||||
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
opal_atomic_add(&opal_progress_recursion_depth_counter, 1);
|
||||
#else
|
||||
++opal_progress_recursion_depth_counter;
|
||||
#endif
|
||||
if( opal_progress_event_flag != 0 ) {
|
||||
#if (OMPI_ENABLE_PROGRESS_THREADS == 0) && OPAL_HAVE_WORKING_EVENTOPS
|
||||
#if OPAL_PROGRESS_USE_TIMERS
|
||||
@ -210,6 +221,11 @@ opal_progress(void)
|
||||
#endif /* defined(__WINDOWS__) */
|
||||
}
|
||||
#endif /* defined(__WINDOWS__) || defined(HAVE_SCHED_YIELD) */
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
opal_atomic_add(&opal_progress_recursion_depth_counter, -1);
|
||||
#else
|
||||
--opal_progress_recursion_depth_counter;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -202,6 +202,29 @@ static inline bool opal_progress_spin(volatile bool* complete)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* \internal
|
||||
* Don't use this variable; use the opal_progress_recursion_depth()
|
||||
* function.
|
||||
*/
|
||||
OPAL_DECLSPEC extern
|
||||
#if OMPI_HAVE_THREAD_SUPPORT
|
||||
volatile
|
||||
#endif
|
||||
uint32_t opal_progress_recursion_depth_counter;
|
||||
|
||||
/**
|
||||
* Return the current level of recursion -- 0 means that we are not
|
||||
* under an opal_progress() call at all. 1 means that you're in the
|
||||
* top-level opal_progress() function (i.e., not deep in recursion).
|
||||
* Higher values mean that you're that many levels deep in recursion.
|
||||
*/
|
||||
static inline uint32_t opal_progress_recursion_depth(void)
|
||||
{
|
||||
return opal_progress_recursion_depth_counter;
|
||||
}
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -149,6 +149,10 @@ static int plm_alps_launch_job(orte_job_t *jdata)
|
||||
orte_app_context_t **apps;
|
||||
orte_node_t **nodes;
|
||||
orte_std_cntr_t nnode;
|
||||
orte_jobid_t failed_job;
|
||||
|
||||
/* default to declaring the daemon launch failed */
|
||||
failed_job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
if (mca_plm_alps_component.timing) {
|
||||
if (0 != gettimeofday(&joblaunchstart, NULL)) {
|
||||
@ -356,6 +360,8 @@ static int plm_alps_launch_job(orte_job_t *jdata)
|
||||
}
|
||||
|
||||
launch_apps:
|
||||
/* if we get here, then daemons launched - change to declaring apps failed */
|
||||
failed_job = active_job;
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
@ -398,7 +404,7 @@ cleanup:
|
||||
|
||||
/* check for failed launch - if so, force terminate */
|
||||
if (failed_launch) {
|
||||
orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
|
||||
return rc;
|
||||
@ -494,13 +500,13 @@ static void alps_wait_cb(pid_t pid, int status, void* cbdata){
|
||||
/* report that the daemon has failed so we break out of the daemon
|
||||
* callback receive and exit
|
||||
*/
|
||||
orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
|
||||
} else {
|
||||
/* an orted must have died unexpectedly after launch - report
|
||||
* that the daemon has failed so we exit
|
||||
*/
|
||||
orte_plm_base_launch_failed(active_job, false, pid, status, ORTE_JOB_STATE_ABORTED);
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_ABORTED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,7 +117,7 @@ static void check_heartbeat(int fd, short dummy, void *arg)
|
||||
|
||||
/* if any daemon died, abort */
|
||||
if (died) {
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, false, -1,
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1,
|
||||
ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_ABORTED);
|
||||
return;
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -63,7 +63,7 @@ static int orte_plm_base_report_launched(orte_jobid_t job);
|
||||
|
||||
int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
{
|
||||
int rc;
|
||||
int rc, fd;
|
||||
orte_process_name_t name = {ORTE_JOBID_INVALID, 0};
|
||||
|
||||
ORTE_OUTPUT_VERBOSE((5, orte_plm_globals.output,
|
||||
@ -136,6 +136,16 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* IOF cannot currently handle multiple pulls to the same fd. So
|
||||
dup stderr to another fd. :-\ */
|
||||
fd = dup(2);
|
||||
if (fd >= 0 &&
|
||||
ORTE_SUCCESS != (rc = orte_iof.iof_pull(&name, ORTE_NS_CMP_JOBID,
|
||||
ORTE_IOF_INTERNAL, fd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
/*
|
||||
* Notify the Global SnapC component regarding new job
|
||||
@ -224,7 +234,7 @@ static bool orted_failed_launch;
|
||||
static orte_job_t *jdatorted;
|
||||
static orte_proc_t **pdatorted;
|
||||
|
||||
void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t pid,
|
||||
void orte_plm_base_launch_failed(orte_jobid_t job, pid_t pid,
|
||||
int status, orte_job_state_t state)
|
||||
{
|
||||
orte_job_t *jdata;
|
||||
@ -239,10 +249,9 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t
|
||||
}
|
||||
|
||||
ORTE_OUTPUT_VERBOSE((5, orte_plm_globals.output,
|
||||
"%s plm:base:launch_failed for job %s %s daemon launch",
|
||||
"%s plm:base:launch_failed for job %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(job),
|
||||
(daemons_launching) ? "during" : "after"));
|
||||
ORTE_JOBID_PRINT(job)));
|
||||
|
||||
/* if this is the daemon job that failed, set the flag indicating
|
||||
* that a daemon failed so we use the proper
|
||||
@ -261,26 +270,24 @@ void orte_plm_base_launch_failed(orte_jobid_t job, bool daemons_launching, pid_t
|
||||
*/
|
||||
pidstr = strdup("unknown");
|
||||
}
|
||||
if (daemons_launching) {
|
||||
if (WIFSIGNALED(status)) { /* died on signal */
|
||||
if (WIFSIGNALED(status)) { /* died on signal */
|
||||
#ifdef WCOREDUMP
|
||||
if (WCOREDUMP(status)) {
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal-core", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
} else {
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
}
|
||||
#else
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
#endif /* WCOREDUMP */
|
||||
if (WCOREDUMP(status)) {
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal-core", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
} else {
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-no-signal", true,
|
||||
pidstr, WEXITSTATUS(status));
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
}
|
||||
orted_failed_launch = true;
|
||||
#else
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-signal", true,
|
||||
pidstr, WTERMSIG(status));
|
||||
#endif /* WCOREDUMP */
|
||||
} else {
|
||||
orte_show_help("help-plm-base.txt", "daemon-died-no-signal", true,
|
||||
pidstr, WEXITSTATUS(status));
|
||||
}
|
||||
orted_failed_launch = true;
|
||||
free(pidstr);
|
||||
}
|
||||
|
||||
|
@ -65,8 +65,7 @@ ORTE_DECLSPEC int orte_plm_base_set_progress_sched(int sched);
|
||||
*/
|
||||
ORTE_DECLSPEC int orte_plm_base_setup_job(orte_job_t *jdata);
|
||||
ORTE_DECLSPEC int orte_plm_base_launch_apps(orte_jobid_t job);
|
||||
ORTE_DECLSPEC void orte_plm_base_launch_failed(orte_jobid_t job, bool callback_active,
|
||||
pid_t pid, int status, orte_job_state_t state);
|
||||
ORTE_DECLSPEC void orte_plm_base_launch_failed(orte_jobid_t job, pid_t pid, int status, orte_job_state_t state);
|
||||
|
||||
ORTE_DECLSPEC int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons);
|
||||
|
||||
|
@ -142,7 +142,11 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
|
||||
orte_app_context_t **apps;
|
||||
orte_node_t **nodes;
|
||||
orte_std_cntr_t nnode;
|
||||
orte_jobid_t failed_job;
|
||||
|
||||
/* default to declaring the daemons failed*/
|
||||
failed_job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
if (mca_plm_lsf_component.timing) {
|
||||
if (0 != gettimeofday(&joblaunchstart, NULL)) {
|
||||
orte_output(0, "plm_lsf: could not obtain job start time");
|
||||
@ -307,6 +311,8 @@ static int plm_lsf_launch_job(orte_job_t *jdata)
|
||||
}
|
||||
|
||||
launch_apps:
|
||||
/* daemons succeeded - any failure now would be from apps */
|
||||
failed_job = active_job;
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
@ -346,8 +352,7 @@ cleanup:
|
||||
|
||||
/* check for failed launch - if so, force terminate */
|
||||
if (failed_launch) {
|
||||
if (ORTE_SUCCESS !=
|
||||
orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
|
||||
return rc;
|
||||
|
@ -313,7 +313,7 @@ static void orte_plm_rsh_wait_daemon(pid_t pid, int status, void* cbdata)
|
||||
/* note that this daemon failed */
|
||||
daemon->state = ORTE_PROC_STATE_FAILED_TO_START;
|
||||
/* report that the daemon has failed so we can exit */
|
||||
orte_plm_base_launch_failed(active_job, true, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, pid, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
}
|
||||
|
||||
@ -857,6 +857,10 @@ int orte_plm_rsh_launch(orte_job_t *jdata)
|
||||
orte_app_context_t **apps;
|
||||
orte_node_t **nodes;
|
||||
orte_std_cntr_t nnode;
|
||||
orte_jobid_t failed_job;
|
||||
|
||||
/* default to declaring the daemon launch as having failed */
|
||||
failed_job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
if (orte_timing) {
|
||||
if (0 != gettimeofday(&joblaunchstart, NULL)) {
|
||||
@ -1124,6 +1128,10 @@ next_node:
|
||||
}
|
||||
|
||||
launch_apps:
|
||||
/* if we get here, then the daemons succeeded, so any failure would now be
|
||||
* for the application job
|
||||
*/
|
||||
failed_job = active_job;
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) {
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output,
|
||||
"%s plm:rsh: launch of apps failed for job %s on error %s",
|
||||
@ -1149,7 +1157,7 @@ launch_apps:
|
||||
|
||||
/* check for failed launch - if so, force terminate */
|
||||
if (failed_launch) {
|
||||
orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
|
||||
/* setup a "heartbeat" timer to periodically check on
|
||||
|
@ -31,7 +31,6 @@ BEGIN_C_DECLS
|
||||
int priority;
|
||||
char *orted;
|
||||
char *custom_args;
|
||||
bool detect_failure;
|
||||
};
|
||||
typedef struct orte_plm_slurm_component_t orte_plm_slurm_component_t;
|
||||
|
||||
|
@ -97,8 +97,6 @@ orte_plm_slurm_component_t mca_plm_slurm_component = {
|
||||
|
||||
static int plm_slurm_open(void)
|
||||
{
|
||||
int tmp;
|
||||
|
||||
mca_base_component_t *comp = &mca_plm_slurm_component.super.base_version;
|
||||
|
||||
mca_base_param_reg_int(comp, "priority", "Default selection priority",
|
||||
@ -115,12 +113,6 @@ static int plm_slurm_open(void)
|
||||
false, false, NULL,
|
||||
&mca_plm_slurm_component.custom_args);
|
||||
|
||||
mca_base_param_reg_int(comp, "detect_failure",
|
||||
"If set, have srun automatically detect failures and kill the job",
|
||||
false, false, (int)false,
|
||||
&tmp);
|
||||
mca_plm_slurm_component.detect_failure = OPAL_INT_TO_BOOL(tmp);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -147,6 +147,10 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
|
||||
char *cur_prefix;
|
||||
struct timeval launchstart, launchstop;
|
||||
int proc_vpid_index;
|
||||
orte_jobid_t failed_job;
|
||||
|
||||
/* flag the daemons as failing by default */
|
||||
failed_job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
if (orte_timing) {
|
||||
if (0 != gettimeofday(&launchstart, NULL)) {
|
||||
@ -231,9 +235,7 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
|
||||
free(tmp);
|
||||
|
||||
/* alert us if any orteds die during startup */
|
||||
if (mca_plm_slurm_component.detect_failure) {
|
||||
opal_argv_append(&argc, &argv, "--kill-on-bad-exit");
|
||||
}
|
||||
opal_argv_append(&argc, &argv, "--kill-on-bad-exit");
|
||||
|
||||
/* create nodelist */
|
||||
nodelist_argv = NULL;
|
||||
@ -367,6 +369,8 @@ static int plm_slurm_launch_job(orte_job_t *jdata)
|
||||
}
|
||||
|
||||
launch_apps:
|
||||
/* get here if daemons launch okay - any failures now by apps */
|
||||
failed_job = active_job;
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(active_job))) {
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output,
|
||||
"%s plm:slurm: launch of apps failed for job %s on error %s",
|
||||
@ -393,8 +397,6 @@ launch_apps:
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* JMS: short we stash the srun pid in the gpr somewhere for cleanup? */
|
||||
|
||||
cleanup:
|
||||
if (NULL != argv) {
|
||||
opal_argv_free(argv);
|
||||
@ -409,7 +411,7 @@ cleanup:
|
||||
|
||||
/* check for failed launch - if so, force terminate */
|
||||
if (failed_launch) {
|
||||
orte_plm_base_launch_failed(jdata->jobid, false, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(failed_job, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
|
||||
return rc;
|
||||
@ -506,13 +508,13 @@ static void srun_wait_cb(pid_t pid, int status, void* cbdata){
|
||||
if (failed_launch) {
|
||||
/* report that the daemon has failed so we can exit
|
||||
*/
|
||||
orte_plm_base_launch_failed(active_job, true, -1, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1, status, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
|
||||
} else {
|
||||
/* an orted must have died unexpectedly after launch - report
|
||||
* that the daemon has failed so we exit
|
||||
*/
|
||||
orte_plm_base_launch_failed(active_job, true, -1, status, ORTE_JOB_STATE_ABORTED);
|
||||
orte_plm_base_launch_failed(ORTE_PROC_MY_NAME->jobid, -1, status, ORTE_JOB_STATE_ABORTED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +148,10 @@ static int plm_tm_launch_job(orte_job_t *jdata)
|
||||
tm_event_t event;
|
||||
bool failed_launch = true;
|
||||
mode_t current_umask;
|
||||
orte_jobid_t failed_job;
|
||||
|
||||
/* default to declaring the daemons as failed */
|
||||
failed_job = ORTE_PROC_MY_NAME->jobid;
|
||||
|
||||
/* create a jobid for this job */
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(&jdata->jobid))) {
|
||||
@ -369,6 +372,10 @@ static int plm_tm_launch_job(orte_job_t *jdata)
|
||||
}
|
||||
|
||||
launch_apps:
|
||||
/* since the daemons have launched, any failures now will be for the
|
||||
* application job
|
||||
*/
|
||||
failed_job = jdata->jobid;
|
||||
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(jdata->jobid))) {
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_plm_globals.output,
|
||||
"%s plm:tm: launch of apps failed for job %s on error %s",
|
||||
@ -408,8 +415,7 @@ launch_apps:
|
||||
|
||||
/* check for failed launch - if so, force terminate */
|
||||
if (failed_launch) {
|
||||
orte_plm_base_launch_failed(jdata->jobid, true, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
return rc;
|
||||
orte_plm_base_launch_failed(failed_job, true, -1, ORTE_ERROR_DEFAULT_EXIT_CODE, ORTE_JOB_STATE_FAILED_TO_START);
|
||||
}
|
||||
|
||||
/* setup a "heartbeat" timer to periodically check on
|
||||
|
@ -281,7 +281,11 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
|
||||
"%s orted_recv_cmd: reissued recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
|
||||
static int num_recursions=0;
|
||||
static int wait_time=1;
|
||||
#define MAX_RECURSIONS 24
|
||||
|
||||
void orte_daemon_cmd_processor(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
@ -294,6 +298,47 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
|
||||
orte_std_cntr_t n;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
|
||||
/* check to see if we are in a progress recursion */
|
||||
if (orte_process_info.daemon && 1 < (ret = opal_progress_recursion_depth())) {
|
||||
/* if we are in a recursion, we want to repost the message event
|
||||
* so the progress engine can work its way back up to the top
|
||||
* of the stack. Given that this could happen multiple times,
|
||||
* we have to be careful to increase the time we wait so that
|
||||
* we provide enough time - but not more time than necessary - for
|
||||
* the stack to clear
|
||||
*/
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:cmd:processor in recursion depth %d\n\treposting %s for tag %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ret,
|
||||
ORTE_NAME_PRINT(sender),
|
||||
(long)(tag)));
|
||||
if (MAX_RECURSIONS < num_recursions) {
|
||||
/* we need to abort if we get too far down this path */
|
||||
opal_output(0, "%s ORTED_CMD_PROCESSOR: STUCK IN INFINITE LOOP - ABORTING",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
OBJ_RELEASE(mev);
|
||||
/* make sure our local procs are dead - but don't update their state
|
||||
* on the HNP as this may be redundant
|
||||
*/
|
||||
orte_odls.kill_local_procs(ORTE_JOBID_WILDCARD, false);
|
||||
|
||||
/* do -not- call finalize as this will send a message to the HNP
|
||||
* indicating clean termination! Instead, just forcibly cleanup
|
||||
* the local session_dir tree and abort
|
||||
*/
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
abort();
|
||||
}
|
||||
wait_time = wait_time * 2;
|
||||
++num_recursions;
|
||||
ORTE_MESSAGE_EVENT_DELAY(wait_time, mev);
|
||||
return;
|
||||
}
|
||||
wait_time = 1;
|
||||
num_recursions = 0;
|
||||
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:cmd:processor called by %s for tag %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
|
@ -201,6 +201,11 @@ int orte_daemon(int argc, char *argv[])
|
||||
char hostname[100];
|
||||
char *tmp_env_var = NULL;
|
||||
|
||||
/* JUST TO HELP JMS DEBUG RACE CONDITION - TO BE REMOVED!! */
|
||||
gethostname(hostname, 100);
|
||||
fprintf(stderr, "Daemon was launched on %s - starting up\n", hostname);
|
||||
fflush(stderr);
|
||||
|
||||
/* initialize the globals */
|
||||
memset(&orted_globals, 0, sizeof(orted_globals));
|
||||
/* initialize the singleton died pipe to an illegal value so we can detect it was set */
|
||||
@ -385,9 +390,7 @@ int orte_daemon(int argc, char *argv[])
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* setup our receive function - this will allow us to relay messages
|
||||
* during start for better scalability
|
||||
*/
|
||||
/* setup the primary daemon command receive function */
|
||||
ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON,
|
||||
ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL);
|
||||
if (ret != ORTE_SUCCESS && ret != ORTE_ERR_NOT_IMPLEMENTED) {
|
||||
|
@ -74,16 +74,27 @@ static opal_list_t registered_cb;
|
||||
********************************************************************/
|
||||
static void message_event_destructor(orte_message_event_t *ev)
|
||||
{
|
||||
OBJ_RELEASE(ev->buffer);
|
||||
if (NULL != ev->ev) {
|
||||
free(ev->ev);
|
||||
}
|
||||
if (NULL != ev->buffer) {
|
||||
OBJ_RELEASE(ev->buffer);
|
||||
}
|
||||
#if OMPI_ENABLE_DEBUG
|
||||
ev->file = NULL;
|
||||
if (NULL != ev->file) {
|
||||
free(ev->file);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void message_event_constructor(orte_message_event_t *ev)
|
||||
{
|
||||
ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t));
|
||||
ev->buffer = OBJ_NEW(opal_buffer_t);
|
||||
}
|
||||
#if OMPI_ENABLE_DEBUG
|
||||
ev->file = NULL;
|
||||
#endif
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_message_event_t,
|
||||
opal_object_t,
|
||||
message_event_constructor,
|
||||
|
@ -149,6 +149,7 @@ ORTE_DECLSPEC void orte_trigger_event(int trig);
|
||||
*/
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t *ev;
|
||||
orte_process_name_t sender;
|
||||
opal_buffer_t *buffer;
|
||||
orte_rml_tag_t tag;
|
||||
@ -159,17 +160,26 @@ typedef struct {
|
||||
} orte_message_event_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
|
||||
|
||||
#define ORTE_MESSAGE_EVENT_DELAY(delay, mev) \
|
||||
do { \
|
||||
struct timeval now; \
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"defining message event delay: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
now.tv_sec = delay/1000000; \
|
||||
now.tv_usec = delay%1000000; \
|
||||
opal_evtimer_add(mev->ev, &now); \
|
||||
} while(0);
|
||||
|
||||
#if OMPI_ENABLE_DEBUG
|
||||
|
||||
#define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \
|
||||
do { \
|
||||
orte_message_event_t *mev; \
|
||||
struct timeval now; \
|
||||
opal_event_t *tmp; \
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"defining message event: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
|
||||
mev = OBJ_NEW(orte_message_event_t); \
|
||||
mev->sender.jobid = (sndr)->jobid; \
|
||||
mev->sender.vpid = (sndr)->vpid; \
|
||||
@ -177,31 +187,30 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
|
||||
mev->tag = (tg); \
|
||||
mev->file = strdup((buf)->parent.cls_init_file_name); \
|
||||
mev->line = (buf)->parent.cls_init_lineno; \
|
||||
opal_evtimer_set(tmp, (cbfunc), mev); \
|
||||
opal_evtimer_set(mev->ev, (cbfunc), mev); \
|
||||
now.tv_sec = 0; \
|
||||
now.tv_usec = 0; \
|
||||
opal_evtimer_add(tmp, &now); \
|
||||
opal_evtimer_add(mev->ev, &now); \
|
||||
} while(0);
|
||||
|
||||
#else
|
||||
|
||||
#define ORTE_MESSAGE_EVENT(sndr, buf, tg, cbfunc) \
|
||||
do { \
|
||||
orte_message_event_t *mev; \
|
||||
struct timeval now; \
|
||||
opal_event_t *tmp; \
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"defining message event: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
|
||||
mev = OBJ_NEW(orte_message_event_t); \
|
||||
mev->sender.jobid = (sndr)->jobid; \
|
||||
mev->sender.vpid = (sndr)->vpid; \
|
||||
opal_dss.copy_payload(mev->buffer, (buf)); \
|
||||
mev->tag = (tg); \
|
||||
opal_evtimer_set(tmp, (cbfunc), mev); \
|
||||
opal_evtimer_set(mev->ev, (cbfunc), mev); \
|
||||
now.tv_sec = 0; \
|
||||
now.tv_usec = 0; \
|
||||
opal_evtimer_add(tmp, &now); \
|
||||
opal_evtimer_add(mev->ev, &now); \
|
||||
} while(0);
|
||||
|
||||
#endif
|
||||
@ -227,19 +236,15 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_message_event_t);
|
||||
do { \
|
||||
struct timeval now; \
|
||||
opal_event_t *tmp; \
|
||||
int timeout; \
|
||||
tmp = (opal_event_t*)malloc(sizeof(opal_event_t)); \
|
||||
opal_evtimer_set(tmp, (cbfunc), NULL); \
|
||||
now.tv_sec = 0; \
|
||||
now.tv_usec = (float)(deltat) * (float)(n); \
|
||||
if (maxwait > 0) { \
|
||||
if (now.tv_usec > (maxwait)) { \
|
||||
now.tv_usec = (maxwait); \
|
||||
} \
|
||||
} \
|
||||
if (now.tv_usec > 1000000.0) { \
|
||||
now.tv_sec = (float)((int)(now.tv_usec/1000000.0)); \
|
||||
now.tv_usec = now.tv_usec - 1000000.0*now.tv_sec; \
|
||||
timeout = (deltat) * (n); \
|
||||
if ((maxwait) > 0 && timeout > (maxwait)) { \
|
||||
timeout = (maxwait); \
|
||||
} \
|
||||
now.tv_sec = timeout/1000000; \
|
||||
now.tv_usec = timeout%1000000; \
|
||||
ORTE_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"defining timeout: %ld sec %ld usec", \
|
||||
(long)now.tv_sec, (long)now.tv_usec)); \
|
||||
|
Loading…
x
Reference in New Issue
Block a user