Start reducing our dependency on the event library by removing at least one instance where we use it to redirect the program counter. Rolf reported occasional hangs of mpirun in very specific circumstances after all daemons were done. A review of MTT results indicates this may have been happening more generally in a small fraction of cases.
The problem was tracked to use of the grpcomm.onesided_barrier to control daemon/mpirun termination. This relied on messaging -and- required that the program counter jump from the errmgr back to grpcomm. On rare occasions, this jump did not occur, causing mpirun to hang.

This patch looks more invasive than it is - most of the affected files simply had one or two lines removed. The essence of the change is:

* pulled the job_complete and quit routines out of orterun and orted_main and put them in a common place
* modified the errmgr to directly call the new routines when termination is detected
* removed the grpcomm.onesided_barrier and its associated RML tag
* added a new "num_routes" API to the routed framework that reports back the number of dependent routes. When route_lost is called, the daemon's list of "children" is checked and adjusted if that route went to a "leaf" in the routing tree
* used connection termination between daemons to track rollup of the daemon tree. Daemons and HNP now terminate once num_routes returns zero

Also picked up in this commit is the addition of a new bool flag to the app_context struct, and increasing the job_control field from 8 to 16 bits. Both trivial.

This commit was SVN r23429.
This commit is contained in:
parent
acd990ffe5
commit
12cd07c9a9
@ -27,12 +27,6 @@
|
||||
#include "opal/util/opal_sos.h"
|
||||
#include "opal/dss/dss.h"
|
||||
|
||||
#include "orte/util/error_strings.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_locks.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/odls/odls.h"
|
||||
#include "orte/mca/odls/base/base.h"
|
||||
@ -43,6 +37,15 @@
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/mca/debugger/base/base.h"
|
||||
|
||||
#include "orte/util/error_strings.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_locks.h"
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/errmgr/base/base.h"
|
||||
#include "orte/mca/errmgr/base/errmgr_private.h"
|
||||
@ -296,8 +299,15 @@ static int update_state(orte_jobid_t job,
|
||||
|
||||
/* get the job object */
|
||||
if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
/* if the orteds are terminating, check job complete */
|
||||
if (orte_orteds_term_ordered) {
|
||||
opal_output(0, "TERM ORDERED - CHECKING COMPLETE");
|
||||
check_job_complete(NULL);
|
||||
return ORTE_SUCCESS;
|
||||
} else {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
}
|
||||
|
||||
/* update is for a specific proc */
|
||||
@ -390,38 +400,48 @@ static int update_state(orte_jobid_t job,
|
||||
break;
|
||||
|
||||
case ORTE_PROC_STATE_COMM_FAILED:
|
||||
/* delete the route */
|
||||
orte_routed.delete_route(proc);
|
||||
/* purge the oob */
|
||||
orte_rml.purge(proc);
|
||||
/* is this to a daemon? */
|
||||
if (ORTE_PROC_MY_NAME->jobid == proc->jobid) {
|
||||
/* if we have ordered orteds to terminate, see if this one failed to tell
|
||||
* us it had terminated
|
||||
*/
|
||||
/* if this is my own connection, ignore it */
|
||||
if (ORTE_PROC_MY_NAME->vpid == proc->vpid) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s My own connection - ignoring it",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
break;
|
||||
}
|
||||
/* if we have ordered orteds to terminate, record it */
|
||||
if (orte_orteds_term_ordered) {
|
||||
if (orte_orted_exit_with_barrier) {
|
||||
record_dead_daemon(jdata, proc->vpid, state, exit_code);
|
||||
check_job_complete(jdata);
|
||||
break;
|
||||
} else {
|
||||
record_dead_daemon(jdata, proc->vpid, state, 0);
|
||||
check_job_complete(jdata);
|
||||
break;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s Daemons terminating - recording daemon %s as gone",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(proc)));
|
||||
/* remove from dependent routes, if it is one */
|
||||
orte_routed.route_lost(proc);
|
||||
/* update daemon job */
|
||||
record_dead_daemon(jdata, proc->vpid, state, 0);
|
||||
/* check for complete */
|
||||
check_job_complete(jdata);
|
||||
break;
|
||||
}
|
||||
/* if abort is in progress, see if this one failed to tell
|
||||
* us it had terminated
|
||||
*/
|
||||
if (orte_abnormal_term_ordered) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s Abort in progress - recording daemon %s as gone",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(proc)));
|
||||
/* remove from dependent routes, if it is one */
|
||||
orte_routed.route_lost(proc);
|
||||
/* update daemon job */
|
||||
record_dead_daemon(jdata, proc->vpid, state, exit_code);
|
||||
/* check for complete */
|
||||
check_job_complete(jdata);
|
||||
break;
|
||||
}
|
||||
/* if this is my own connection, ignore it */
|
||||
if (ORTE_PROC_MY_NAME->vpid == proc->vpid) {
|
||||
break;
|
||||
}
|
||||
/* delete the route */
|
||||
orte_routed.delete_route(proc);
|
||||
/* purge the oob */
|
||||
orte_rml.purge(proc);
|
||||
|
||||
if (orte_enable_recovery) {
|
||||
/* relocate its processes */
|
||||
if (ORTE_SUCCESS != (rc = hnp_relocate(jdata, proc, state, exit_code))) {
|
||||
@ -755,7 +775,14 @@ static void check_job_complete(orte_job_t *jdata)
|
||||
/* Check if FileM is active. If so then keep processing. */
|
||||
OPAL_ACQUIRE_THREAD(&orte_filem_base_lock, &orte_filem_base_cond, &orte_filem_base_is_active);
|
||||
#endif
|
||||
|
||||
if (NULL == jdata) {
|
||||
/* just check to see if the daemons are complete */
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s errmgr:hnp:check_job_complete - received NULL job, checking daemons",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto CHECK_DAEMONS;
|
||||
}
|
||||
|
||||
for (i=0; i < jdata->procs->size && !jdata->abort; i++) {
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
|
||||
/* the proc array may no longer be left justified, so
|
||||
@ -978,14 +1005,21 @@ static void check_job_complete(orte_job_t *jdata)
|
||||
* This can happen if a ctrl-c hits in the "wrong" place
|
||||
* while launching
|
||||
*/
|
||||
CHECK_DAEMONS:
|
||||
if (jdata == NULL || jdata->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
|
||||
if (jdata->num_terminated >= jdata->num_procs) {
|
||||
if (0 == orte_routed.num_routes()) {
|
||||
/* orteds are done! */
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s orteds complete - exiting",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
if (NULL == jdata) {
|
||||
jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
|
||||
}
|
||||
jdata->state = ORTE_JOB_STATE_TERMINATED;
|
||||
orte_trigger_event(&orteds_exit);
|
||||
orte_quit();
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Release the resources used by this job. Since some errmgrs may want
|
||||
@ -1094,15 +1128,22 @@ static void check_job_complete(orte_job_t *jdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
return;
|
||||
}
|
||||
/* if we get here, then all jobs are done, so wakeup */
|
||||
/* if we get here, then all jobs are done, so terminate */
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s errmgr:hnp:check_job_completed all jobs terminated - waking up",
|
||||
"%s errmgr:hnp:check_job_completed all jobs terminated",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
/* set the exit status to 0 - this will only happen if it
|
||||
* wasn't already set by an error condition
|
||||
*/
|
||||
ORTE_UPDATE_EXIT_STATUS(0);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
/* if I am the only daemon alive, then I can exit now */
|
||||
if (0 == orte_routed.num_routes()) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s orteds complete - exiting",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
orte_quit();
|
||||
}
|
||||
}
|
||||
|
||||
static void killprocs(orte_jobid_t job, orte_vpid_t vpid)
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "orte/mca/plm/plm_types.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/mca/sensor/sensor.h"
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/errmgr/base/base.h"
|
||||
@ -244,16 +245,23 @@ static int update_state(orte_jobid_t job,
|
||||
ORTE_PROC_MY_NAME->vpid == proc->vpid) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
/* delete the route */
|
||||
orte_routed.delete_route(proc);
|
||||
/* purge the oob */
|
||||
orte_rml.purge(proc);
|
||||
/* see if this was a lifeline */
|
||||
if (ORTE_SUCCESS != orte_routed.route_lost(proc)) {
|
||||
/* kill our children */
|
||||
killprocs(ORTE_JOBID_WILDCARD, ORTE_VPID_WILDCARD);
|
||||
/* tell the caller we can't recover */
|
||||
return ORTE_ERR_UNRECOVERABLE;
|
||||
/* terminate - our routed children will see
|
||||
* us leave and automatically die
|
||||
*/
|
||||
orte_quit();
|
||||
}
|
||||
/* purge the oob */
|
||||
orte_rml.purge(proc);
|
||||
/* was it a daemon that failed? */
|
||||
if (proc->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
/* if all my routes are gone, then terminate ourselves */
|
||||
if (0 == orte_routed.num_routes()) {
|
||||
orte_quit();
|
||||
}
|
||||
}
|
||||
/* if not, then indicate we can continue */
|
||||
return ORTE_SUCCESS;
|
||||
@ -272,10 +280,17 @@ static int update_state(orte_jobid_t job,
|
||||
}
|
||||
}
|
||||
if (NULL == jobdat) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
/* must already be complete */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if there are no local procs for this job, we can
|
||||
* ignore this call
|
||||
*/
|
||||
if (0 == jobdat->num_local_procs) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_errmgr_base.output,
|
||||
"%s errmgr:orted got state %s for proc %s pid %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
|
@ -37,6 +37,7 @@
|
||||
#include "opal/mca/pstat/base/base.h"
|
||||
#include "opal/mca/paffinity/base/base.h"
|
||||
#include "opal/mca/sysinfo/base/base.h"
|
||||
#include "opal/util/os_path.h"
|
||||
|
||||
#include "orte/mca/rml/base/base.h"
|
||||
#include "orte/mca/routed/base/base.h"
|
||||
@ -66,18 +67,61 @@
|
||||
#include "orte/runtime/orte_cr.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
|
||||
#include "orte/mca/ess/base/base.h"
|
||||
|
||||
static bool plm_in_use;
|
||||
/* local globals */
|
||||
static bool plm_in_use=false;
|
||||
static bool signals_set=false;
|
||||
static struct opal_event term_handler;
|
||||
static struct opal_event int_handler;
|
||||
static struct opal_event epipe_handler;
|
||||
#ifndef __WINDOWS__
|
||||
static struct opal_event sigusr1_handler;
|
||||
static struct opal_event sigusr2_handler;
|
||||
#endif /* __WINDOWS__ */
|
||||
char *log_path = NULL;
|
||||
static void shutdown_signal(int fd, short flags, void *arg);
|
||||
static void signal_callback(int fd, short flags, void *arg);
|
||||
static void epipe_signal_callback(int fd, short flags, void *arg);
|
||||
|
||||
int orte_ess_base_orted_setup(char **hosts)
|
||||
{
|
||||
int ret;
|
||||
int fd;
|
||||
char log_file[PATH_MAX];
|
||||
char *jobidstring;
|
||||
char *error = NULL;
|
||||
char *plm_to_use;
|
||||
int value;
|
||||
|
||||
#ifndef __WINDOWS__
|
||||
/* setup callback for SIGPIPE */
|
||||
opal_signal_set(&epipe_handler, SIGPIPE,
|
||||
epipe_signal_callback, &epipe_handler);
|
||||
opal_signal_add(&epipe_handler, NULL);
|
||||
/* Set signal handlers to catch kill signals so we can properly clean up
|
||||
* after ourselves.
|
||||
*/
|
||||
opal_event_set(&term_handler, SIGTERM, OPAL_EV_SIGNAL,
|
||||
shutdown_signal, NULL);
|
||||
opal_event_add(&term_handler, NULL);
|
||||
opal_event_set(&int_handler, SIGINT, OPAL_EV_SIGNAL,
|
||||
shutdown_signal, NULL);
|
||||
opal_event_add(&int_handler, NULL);
|
||||
|
||||
/** setup callbacks for signals we should ignore */
|
||||
opal_signal_set(&sigusr1_handler, SIGUSR1,
|
||||
signal_callback, &sigusr1_handler);
|
||||
opal_signal_add(&sigusr1_handler, NULL);
|
||||
opal_signal_set(&sigusr2_handler, SIGUSR2,
|
||||
signal_callback, &sigusr2_handler);
|
||||
opal_signal_add(&sigusr2_handler, NULL);
|
||||
#endif /* __WINDOWS__ */
|
||||
|
||||
signals_set = true;
|
||||
|
||||
/* initialize the global list of local children and job data */
|
||||
OBJ_CONSTRUCT(&orte_local_children, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_local_jobdata, opal_list_t);
|
||||
@ -321,10 +365,48 @@ int orte_ess_base_orted_setup(char **hosts)
|
||||
goto error;
|
||||
}
|
||||
/* Once the session directory location has been established, set
|
||||
the opal_output env file location to be in the
|
||||
proc-specific session directory. */
|
||||
the opal_output env file location to be in the
|
||||
proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
/* setup stdout/stderr */
|
||||
if (orte_debug_daemons_file_flag) {
|
||||
/* if we are debugging to a file, then send stdout/stderr to
|
||||
* the orted log file
|
||||
*/
|
||||
|
||||
/* get my jobid */
|
||||
if (ORTE_SUCCESS != (ret = orte_util_convert_jobid_to_string(&jobidstring,
|
||||
ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "convert_jobid";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* define a log file name in the session directory */
|
||||
snprintf(log_file, PATH_MAX, "output-orted-%s-%s.log",
|
||||
jobidstring, orte_process_info.nodename);
|
||||
log_path = opal_os_path(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.top_session_dir,
|
||||
log_file,
|
||||
NULL);
|
||||
|
||||
fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640);
|
||||
if (fd < 0) {
|
||||
/* couldn't open the file for some reason, so
|
||||
* just connect everything to /dev/null
|
||||
*/
|
||||
fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666);
|
||||
} else {
|
||||
dup2(fd, STDOUT_FILENO);
|
||||
dup2(fd, STDERR_FILENO);
|
||||
if(fd != STDOUT_FILENO && fd != STDERR_FILENO) {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* setup the routed info - the selected routed component
|
||||
@ -434,7 +516,7 @@ int orte_ess_base_orted_setup(char **hosts)
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
|
||||
error:
|
||||
error:
|
||||
orte_show_help("help-orte-runtime.txt",
|
||||
"orte_init:startup:internal-failure",
|
||||
true, error, ORTE_ERROR_NAME(ret), ret);
|
||||
@ -447,14 +529,27 @@ int orte_ess_base_orted_finalize(void)
|
||||
/* stop the local sensors */
|
||||
orte_sensor.stop(ORTE_PROC_MY_NAME->jobid);
|
||||
|
||||
/* ensure all the orteds depart together */
|
||||
if (!orte_abnormal_term_ordered) {
|
||||
/* if we are abnormally terminating, don't attempt
|
||||
* to do a barrier as nobody else will be entering
|
||||
* that call
|
||||
*/
|
||||
orte_grpcomm.onesided_barrier();
|
||||
if (signals_set) {
|
||||
/* Release all local signal handlers */
|
||||
opal_event_del(&epipe_handler);
|
||||
opal_event_del(&term_handler);
|
||||
opal_event_del(&int_handler);
|
||||
#ifndef __WINDOWS__
|
||||
opal_signal_del(&sigusr1_handler);
|
||||
opal_signal_del(&sigusr2_handler);
|
||||
#endif /* __WINDOWS__ */
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
if (NULL != log_path) {
|
||||
unlink(log_path);
|
||||
}
|
||||
|
||||
/* make sure our local procs are dead */
|
||||
orte_odls.kill_local_procs(NULL);
|
||||
|
||||
/* whack any lingering session directory files from our jobs */
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
orte_sensor_base_close();
|
||||
orte_db_base_close();
|
||||
@ -493,3 +588,29 @@ int orte_ess_base_orted_finalize(void)
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void shutdown_signal(int fd, short flags, void *arg)
|
||||
{
|
||||
/* trigger the call to shutdown callback to protect
|
||||
* against race conditions - the trigger event will
|
||||
* check the one-time lock
|
||||
*/
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
orte_quit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Deal with sigpipe errors
|
||||
*/
|
||||
static void epipe_signal_callback(int fd, short flags, void *arg)
|
||||
{
|
||||
/* for now, we just announce and ignore them */
|
||||
opal_output(0, "%s reports a SIGPIPE error on fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
 * Event callback attached to the SIGUSR1 and SIGUSR2 handlers during
 * orted setup. It is intentionally empty so that those signals are
 * simply swallowed.
 */
static void signal_callback(int fd, short event, void *arg)
{
    /* nothing to do - these signals are ignored */
    (void)fd;
    (void)event;
    (void)arg;
}
|
||||
|
@ -43,7 +43,6 @@
|
||||
#include "opal/mca/paffinity/base/base.h"
|
||||
#include "opal/mca/sysinfo/base/base.h"
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/mca/rml/base/base.h"
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
#include "orte/mca/routed/base/base.h"
|
||||
@ -53,6 +52,7 @@
|
||||
#include "orte/mca/iof/base/base.h"
|
||||
#include "orte/mca/ras/base/base.h"
|
||||
#include "orte/mca/plm/base/base.h"
|
||||
#include "orte/mca/plm/plm.h"
|
||||
#include "orte/mca/odls/base/base.h"
|
||||
#include "orte/mca/notifier/base/base.h"
|
||||
#include "orte/mca/rmcast/base/base.h"
|
||||
@ -60,12 +60,14 @@
|
||||
#include "orte/mca/sensor/base/base.h"
|
||||
#include "orte/mca/sensor/sensor.h"
|
||||
#include "orte/mca/debugger/base/base.h"
|
||||
|
||||
#include "orte/mca/debugger/debugger.h"
|
||||
#include "orte/mca/rmaps/base/base.h"
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
#include "orte/mca/snapc/base/base.h"
|
||||
#endif
|
||||
#include "orte/mca/filem/base/base.h"
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/util/session_dir.h"
|
||||
#include "orte/util/hnp_contact.h"
|
||||
@ -76,8 +78,11 @@
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
#include "orte/runtime/orte_cr.h"
|
||||
#include "orte/runtime/orte_locks.h"
|
||||
#include "orte/runtime/orte_data_server.h"
|
||||
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/ess/base/base.h"
|
||||
#include "orte/mca/ess/hnp/ess_hnp.h"
|
||||
@ -108,6 +113,23 @@ orte_ess_base_module_t orte_ess_hnp_module = {
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
/* local globals */
|
||||
static bool signals_set=false;
|
||||
static struct opal_event term_handler;
|
||||
static struct opal_event int_handler;
|
||||
static struct opal_event epipe_handler;
|
||||
#ifndef __WINDOWS__
|
||||
static struct opal_event sigusr1_handler;
|
||||
static struct opal_event sigusr2_handler;
|
||||
static struct opal_event sigtstp_handler;
|
||||
static struct opal_event sigcont_handler;
|
||||
#endif /* __WINDOWS__ */
|
||||
|
||||
static void abort_signal_callback(int fd, short flags, void *arg);
|
||||
static void abort_exit_callback(int fd, short event, void *arg);
|
||||
static void epipe_signal_callback(int fd, short flags, void *arg);
|
||||
static void signal_forward_callback(int fd, short event, void *arg);
|
||||
|
||||
static int rte_init(void)
|
||||
{
|
||||
int ret;
|
||||
@ -124,6 +146,41 @@ static int rte_init(void)
|
||||
goto error;
|
||||
}
|
||||
|
||||
#ifndef __WINDOWS__
|
||||
/* setup callback for SIGPIPE */
|
||||
opal_signal_set(&epipe_handler, SIGPIPE,
|
||||
epipe_signal_callback, &epipe_handler);
|
||||
opal_signal_add(&epipe_handler, NULL);
|
||||
/** setup callbacks for abort signals - from this point
|
||||
* forward, we need to abort in a manner that allows us
|
||||
* to cleanup
|
||||
*/
|
||||
opal_signal_set(&term_handler, SIGTERM,
|
||||
abort_signal_callback, &term_handler);
|
||||
opal_signal_add(&term_handler, NULL);
|
||||
opal_signal_set(&int_handler, SIGINT,
|
||||
abort_signal_callback, &int_handler);
|
||||
opal_signal_add(&int_handler, NULL);
|
||||
|
||||
/** setup callbacks for signals we should forward */
|
||||
opal_signal_set(&sigusr1_handler, SIGUSR1,
|
||||
signal_forward_callback, &sigusr1_handler);
|
||||
opal_signal_add(&sigusr1_handler, NULL);
|
||||
opal_signal_set(&sigusr2_handler, SIGUSR2,
|
||||
signal_forward_callback, &sigusr2_handler);
|
||||
opal_signal_add(&sigusr2_handler, NULL);
|
||||
if (orte_forward_job_control) {
|
||||
opal_signal_set(&sigtstp_handler, SIGTSTP,
|
||||
signal_forward_callback, &sigtstp_handler);
|
||||
opal_signal_add(&sigtstp_handler, NULL);
|
||||
opal_signal_set(&sigcont_handler, SIGCONT,
|
||||
signal_forward_callback, &sigcont_handler);
|
||||
opal_signal_add(&sigcont_handler, NULL);
|
||||
}
|
||||
#endif /* __WINDOWS__ */
|
||||
|
||||
signals_set = true;
|
||||
|
||||
/* determine the topology info */
|
||||
if (0 == orte_default_num_sockets_per_board) {
|
||||
/* we weren't given a number, so try to determine it */
|
||||
@ -615,6 +672,24 @@ static int rte_finalize(void)
|
||||
orte_job_t *job;
|
||||
int i;
|
||||
|
||||
if (signals_set) {
|
||||
/* Remove the epipe handler */
|
||||
opal_signal_del(&epipe_handler);
|
||||
/* Remove the TERM and INT signal handlers */
|
||||
opal_signal_del(&term_handler);
|
||||
opal_signal_del(&int_handler);
|
||||
#ifndef __WINDOWS__
|
||||
/** Remove the USR signal handlers */
|
||||
opal_signal_del(&sigusr1_handler);
|
||||
opal_signal_del(&sigusr2_handler);
|
||||
if (orte_forward_job_control) {
|
||||
opal_signal_del(&sigtstp_handler);
|
||||
opal_signal_del(&sigcont_handler);
|
||||
}
|
||||
#endif /* __WINDOWS__ */
|
||||
signals_set = false;
|
||||
}
|
||||
|
||||
/* stop the debuggers */
|
||||
orte_debugger_base_close();
|
||||
|
||||
@ -879,3 +954,132 @@ static int update_nidmap(opal_byte_object_t *bo)
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool forcibly_die=false;
|
||||
|
||||
static void abort_exit_callback(int fd, short ign, void *arg)
|
||||
{
|
||||
int ret;
|
||||
|
||||
fprintf(stderr, "%s: killing job...\n\n", orte_basename);
|
||||
|
||||
/* since we are being terminated by a user's signal, be
|
||||
* sure to exit with a non-zero exit code - but don't
|
||||
* overwrite any error code from a proc that might have
|
||||
* failed, in case that is why the user ordered us
|
||||
* to terminate
|
||||
*/
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
|
||||
/* terminate the job - this will also wakeup orterun so
|
||||
* it can report to the user and kill all the orteds.
|
||||
* Check the jobid, though, just in case the user
|
||||
* hit ctrl-c before we had a chance to setup the
|
||||
* job in the system - in which case there is nothing
|
||||
* to terminate!
|
||||
*/
|
||||
if (!orte_never_launched) {
|
||||
/* if the debuggers were run, clean up */
|
||||
orte_debugger.finalize();
|
||||
|
||||
/*
|
||||
* Turn off the process recovery functionality, if it was enabled.
|
||||
* This keeps the errmgr from trying to recover from the shutdown
|
||||
* procedure.
|
||||
*/
|
||||
orte_enable_recovery = false;
|
||||
|
||||
/* terminate the orteds - they will automatically kill
|
||||
* their local procs
|
||||
*/
|
||||
ret = orte_plm.terminate_orteds();
|
||||
|
||||
} else {
|
||||
/* if the jobid is invalid or we never launched,
|
||||
* there is nothing to do but just clean ourselves
|
||||
* up and exit
|
||||
*/
|
||||
orte_quit();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to terminate the job and wait for callback indicating
|
||||
* the job has been aborted.
|
||||
*/
|
||||
static void abort_signal_callback(int fd, short flags, void *arg)
|
||||
{
|
||||
/* if we have already ordered this once, don't keep
|
||||
* doing it to avoid race conditions
|
||||
*/
|
||||
if (!opal_atomic_trylock(&orte_abort_inprogress_lock)) { /* returns 1 if already locked */
|
||||
if (forcibly_die) {
|
||||
/* kill any local procs */
|
||||
orte_odls.kill_local_procs(NULL);
|
||||
|
||||
/* whack any lingering session directory files from our jobs */
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
/* cleanup our data server */
|
||||
orte_data_server_finalize();
|
||||
|
||||
/* exit with a non-zero status */
|
||||
exit(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
}
|
||||
fprintf(stderr, "%s: abort is already in progress...hit ctrl-c again to forcibly terminate\n\n", orte_basename);
|
||||
forcibly_die = true;
|
||||
return;
|
||||
}
|
||||
|
||||
/* set the global abnormal exit flag so we know not to
|
||||
* use the standard xcast for terminating orteds
|
||||
*/
|
||||
orte_abnormal_term_ordered = true;
|
||||
/* ensure that the forwarding of stdin stops */
|
||||
orte_job_term_ordered = true;
|
||||
|
||||
/* tell us to be quiet - hey, the user killed us with a ctrl-c,
|
||||
* so need to tell them that!
|
||||
*/
|
||||
orte_execute_quiet = true;
|
||||
|
||||
/* We are in an event handler; the job completed procedure
|
||||
will delete the signal handler that is currently running
|
||||
(which is a Bad Thing), so we can't call it directly.
|
||||
Instead, we have to exit this handler and setup to call
|
||||
job_completed() after this. */
|
||||
ORTE_TIMER_EVENT(0, 0, abort_exit_callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deal with sigpipe errors
|
||||
*/
|
||||
static void epipe_signal_callback(int fd, short flags, void *arg)
|
||||
{
|
||||
/* for now, we just announce and ignore them */
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_verbosity,
|
||||
"%s reports a SIGPIPE error on fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd));
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pass user signals to the remote application processes
|
||||
*/
|
||||
static void signal_forward_callback(int fd, short event, void *arg)
|
||||
{
|
||||
struct opal_event *signal = (struct opal_event*)arg;
|
||||
int signum, ret;
|
||||
|
||||
signum = OPAL_EVENT_SIGNAL(signal);
|
||||
if (!orte_execute_quiet){
|
||||
fprintf(stderr, "%s: Forwarding signal %d to job\n",
|
||||
orte_basename, signum);
|
||||
}
|
||||
|
||||
/** send the signal out to the processes, including any descendants */
|
||||
if (ORTE_SUCCESS != (ret = orte_plm.signal_job(ORTE_JOBID_WILDCARD, signum))) {
|
||||
fprintf(stderr, "Signal %d could not be sent to the job (returned %d)",
|
||||
signum, ret);
|
||||
}
|
||||
}
|
||||
|
@ -164,8 +164,6 @@ static int rte_finalize(void)
|
||||
|
||||
/* if I am a daemon, finalize using the default procedure */
|
||||
if (ORTE_PROC_IS_DAEMON) {
|
||||
/* don't need to do the barrier */
|
||||
orte_orted_exit_with_barrier = false;
|
||||
if (ORTE_SUCCESS != (ret = orte_ess_base_orted_finalize())) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
}
|
||||
|
@ -52,7 +52,7 @@
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
|
||||
#include "orte/mca/filem/filem.h"
|
||||
#include "orte/mca/filem/base/base.h"
|
||||
@ -195,7 +195,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* sende
|
||||
if (NULL == (jdata = orte_get_job_data_object(name.jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
ORTE_UPDATE_EXIT_STATUS(1);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* get the proc object for it */
|
||||
@ -203,7 +203,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* sende
|
||||
if (NULL == procs[name.vpid] || NULL == procs[name.vpid]->node) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
ORTE_UPDATE_EXIT_STATUS(1);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
@ -213,7 +213,7 @@ static void filem_base_process_get_proc_node_name_cmd(orte_process_name_t* sende
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &(procs[name.vpid]->node->name), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(1);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
@ -299,13 +299,13 @@ static void filem_base_process_get_remote_path_cmd(orte_process_name_t* sender,
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &tmp_name, 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(1);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&answer, &file_type, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(1);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,6 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int bad_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int bad_barrier(void);
|
||||
static int bad_onesided_barrier(void);
|
||||
static int modex(opal_list_t *procs);
|
||||
|
||||
/* Module def */
|
||||
@ -62,7 +61,6 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
|
||||
bad_allgather,
|
||||
orte_grpcomm_base_allgather_list,
|
||||
bad_barrier,
|
||||
bad_onesided_barrier,
|
||||
orte_grpcomm_base_set_proc_attr,
|
||||
orte_grpcomm_base_get_proc_attr,
|
||||
modex,
|
||||
@ -70,7 +68,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
|
||||
};
|
||||
|
||||
/* Local variables */
|
||||
static orte_grpcomm_collective_t barrier, allgather, onesided_barrier;
|
||||
static orte_grpcomm_collective_t barrier, allgather;
|
||||
|
||||
/**
|
||||
* Initialize the module
|
||||
@ -87,7 +85,6 @@ static int init(void)
|
||||
/* setup global variables */
|
||||
OBJ_CONSTRUCT(&barrier, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&allgather, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&onesided_barrier, orte_grpcomm_collective_t);
|
||||
|
||||
/* if we are a daemon or the hnp, we need to post a
|
||||
* recv to catch any collective operations
|
||||
@ -115,7 +112,6 @@ static void finalize(void)
|
||||
/* destruct the globals */
|
||||
OBJ_DESTRUCT(&barrier);
|
||||
OBJ_DESTRUCT(&allgather);
|
||||
OBJ_DESTRUCT(&onesided_barrier);
|
||||
|
||||
/* if we are a daemon or the hnp, we need to cancel the
|
||||
* recv we posted
|
||||
@ -229,124 +225,6 @@ static int bad_barrier(void)
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void onesided_barrier_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_grpcomm_collective_t *coll = (orte_grpcomm_collective_t*)cbdata;
|
||||
|
||||
OPAL_THREAD_LOCK(&coll->lock);
|
||||
/* flag as recvd */
|
||||
coll->recvd += 1;
|
||||
if (orte_process_info.num_procs == coll->recvd) {
|
||||
opal_condition_broadcast(&coll->cond);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&coll->lock);
|
||||
}
|
||||
|
||||
/* quick timeout loop - flag raised by the timer callback below */
static bool timer_fired;

/*
 * Timer event callback: all it does is record that the timer went off.
 * None of the event arguments are used.
 */
static void quicktime_cb(int fd, short event, void *cbdata)
{
    (void)fd;
    (void)event;
    (void)cbdata;

    /* declare it fired */
    timer_fired = true;
}
|
||||
|
||||
static int bad_onesided_barrier(void)
|
||||
{
|
||||
opal_list_t daemon_tree;
|
||||
opal_list_item_t *item;
|
||||
opal_buffer_t buf;
|
||||
orte_process_name_t my_parent;
|
||||
opal_event_t *quicktime=NULL;
|
||||
struct timeval quicktimeval;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad: onesided barrier called",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if we are not to use the barrier, then just return */
|
||||
if (!orte_orted_exit_with_barrier) {
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are the HNP, we need to do a little delay to give
|
||||
* the orteds a chance to exit before we leave
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad: onesided barrier adding delay timer",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
quicktimeval.tv_sec = 0;
|
||||
quicktimeval.tv_usec = 100;
|
||||
timer_fired = false;
|
||||
ORTE_DETECT_TIMEOUT(&quicktime, orte_process_info.num_procs, 1000, 10000, quicktime_cb);
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* figure out how many participants we should be expecting */
|
||||
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
|
||||
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
my_parent.vpid = orte_routed.get_routing_tree(&daemon_tree);
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
onesided_barrier.recvd += orte_process_info.num_procs - opal_list_get_size(&daemon_tree);
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad: onesided barrier num_participating %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)(orte_process_info.num_procs - opal_list_get_size(&daemon_tree))));
|
||||
|
||||
/* disassemble the daemon tree */
|
||||
while (NULL != (item = opal_list_remove_first(&daemon_tree))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&daemon_tree);
|
||||
|
||||
/* set the recv */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_ONESIDED_BARRIER,
|
||||
ORTE_RML_PERSISTENT,
|
||||
onesided_barrier_recv,
|
||||
&onesided_barrier))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* wait to get all my inputs */
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
while (onesided_barrier.recvd < orte_process_info.num_procs) {
|
||||
opal_condition_wait(&onesided_barrier.cond, &onesided_barrier.lock);
|
||||
}
|
||||
/* reset the collective */
|
||||
onesided_barrier.recvd = 0;
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
|
||||
/* cancel the recv */
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ONESIDED_BARRIER);
|
||||
|
||||
/* if I am the HNP, then we are done */
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* send a zero-byte msg to my parent */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
/* send it */
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad:onsided:barrier not the HNP - sending to parent %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&my_parent)));
|
||||
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_ONESIDED_BARRIER, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
|
@ -55,7 +55,6 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int basic_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int basic_barrier(void);
|
||||
static int basic_onesided_barrier(void);
|
||||
static int modex(opal_list_t *procs);
|
||||
static int set_proc_attr(const char *attr_name, const void *data, size_t size);
|
||||
static int get_proc_attr(const orte_process_name_t proc,
|
||||
@ -70,7 +69,6 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||
basic_allgather,
|
||||
orte_grpcomm_base_allgather_list,
|
||||
basic_barrier,
|
||||
basic_onesided_barrier,
|
||||
set_proc_attr,
|
||||
get_proc_attr,
|
||||
modex,
|
||||
@ -78,7 +76,7 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||
};
|
||||
|
||||
/* Local variables */
|
||||
static orte_grpcomm_collective_t barrier, allgather, onesided_barrier;
|
||||
static orte_grpcomm_collective_t barrier, allgather;
|
||||
|
||||
static bool recv_on;
|
||||
static opal_buffer_t *profile_buf=NULL;
|
||||
@ -118,7 +116,6 @@ static int init(void)
|
||||
/* setup global variables */
|
||||
OBJ_CONSTRUCT(&barrier, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&allgather, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&onesided_barrier, orte_grpcomm_collective_t);
|
||||
|
||||
if (ORTE_PROC_IS_HNP && recv_on) {
|
||||
/* open the profile file for writing */
|
||||
@ -186,7 +183,6 @@ static void finalize(void)
|
||||
/* destruct the globals */
|
||||
OBJ_DESTRUCT(&barrier);
|
||||
OBJ_DESTRUCT(&allgather);
|
||||
OBJ_DESTRUCT(&onesided_barrier);
|
||||
|
||||
if (ORTE_PROC_IS_HNP && recv_on) {
|
||||
/* if we are profiling and I am the HNP, then stop the
|
||||
@ -311,124 +307,6 @@ static int basic_barrier(void)
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void onesided_barrier_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
orte_grpcomm_collective_t *coll = (orte_grpcomm_collective_t*)cbdata;
|
||||
|
||||
OPAL_THREAD_LOCK(&coll->lock);
|
||||
/* flag as recvd */
|
||||
coll->recvd += 1;
|
||||
if (orte_process_info.num_procs == coll->recvd) {
|
||||
opal_condition_broadcast(&coll->cond);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&coll->lock);
|
||||
}
|
||||
/* quick timeout loop */
|
||||
static bool timer_fired;
|
||||
|
||||
static void quicktime_cb(int fd, short event, void *cbdata)
|
||||
{
|
||||
/* declare it fired */
|
||||
timer_fired = true;
|
||||
}
|
||||
|
||||
static int basic_onesided_barrier(void)
|
||||
{
|
||||
opal_list_t daemon_tree;
|
||||
opal_list_item_t *item;
|
||||
opal_buffer_t buf;
|
||||
orte_process_name_t my_parent;
|
||||
opal_event_t *quicktime=NULL;
|
||||
struct timeval quicktimeval;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:basic: onesided barrier called",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if we are not to use the barrier, then just return */
|
||||
if (!orte_orted_exit_with_barrier) {
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are the HNP, we need to do a little delay to give
|
||||
* the orteds a chance to exit before we leave
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:basic: onesided barrier adding delay timer",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
quicktimeval.tv_sec = 0;
|
||||
quicktimeval.tv_usec = 100;
|
||||
timer_fired = false;
|
||||
ORTE_DETECT_TIMEOUT(&quicktime, orte_process_info.num_procs, 1000, 10000, quicktime_cb);
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* figure out how many participants we should be expecting */
|
||||
OBJ_CONSTRUCT(&daemon_tree, opal_list_t);
|
||||
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
my_parent.vpid = orte_routed.get_routing_tree(&daemon_tree);
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
onesided_barrier.recvd += orte_process_info.num_procs - opal_list_get_size(&daemon_tree);
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:basic: onesided barrier num_participating %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)(orte_process_info.num_procs - opal_list_get_size(&daemon_tree))));
|
||||
|
||||
/* disassemble the daemon tree */
|
||||
while (NULL != (item = opal_list_remove_first(&daemon_tree))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OBJ_DESTRUCT(&daemon_tree);
|
||||
|
||||
/* set the recv */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_ONESIDED_BARRIER,
|
||||
ORTE_RML_PERSISTENT,
|
||||
onesided_barrier_recv,
|
||||
&onesided_barrier))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* wait to get all my inputs */
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
while (onesided_barrier.recvd < orte_process_info.num_procs) {
|
||||
opal_condition_wait(&onesided_barrier.cond, &onesided_barrier.lock);
|
||||
}
|
||||
/* reset the collective */
|
||||
onesided_barrier.recvd = 0;
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
|
||||
/* cancel the recv */
|
||||
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ONESIDED_BARRIER);
|
||||
|
||||
/* if I am the HNP, then we are done */
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* send a zero-byte msg to my parent */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
/* send it */
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:basic:onsided:barrier not the HNP - sending to parent %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&my_parent)));
|
||||
if (0 > (rc = orte_rml.send_buffer(&my_parent, &buf, ORTE_RML_TAG_ONESIDED_BARRIER, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
|
@ -75,7 +75,6 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = {
|
||||
allgather,
|
||||
allgather_list,
|
||||
orte_grpcomm_cnos_barrier,
|
||||
orte_grpcomm_cnos_barrier,
|
||||
set_proc_attr,
|
||||
get_proc_attr,
|
||||
modex,
|
||||
|
@ -71,11 +71,6 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names,
|
||||
/* barrier function - pointer type for a module's barrier entry point;
 * it takes no arguments and returns an int (the barrier implementations
 * visible alongside this declaration return an ORTE status code)
 */
|
||||
typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void);
|
||||
|
||||
/* one-sided barrier function - process releases once its
 * contribution is complete. Pointer type for a module's one-sided
 * barrier entry point: no arguments, int return (the implementations
 * visible alongside this declaration return ORTE_SUCCESS or the error
 * code of the failing call).
 */
|
||||
typedef int (*orte_grpcomm_base_module_onesided_barrier_fn_t)(void);
|
||||
|
||||
|
||||
/** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION
|
||||
* OF HOW THIS ALL WORKS
|
||||
@ -108,7 +103,6 @@ struct orte_grpcomm_base_module_2_0_0_t {
|
||||
orte_grpcomm_base_module_allgather_fn_t allgather;
|
||||
orte_grpcomm_base_module_allgather_list_fn_t allgather_list;
|
||||
orte_grpcomm_base_module_barrier_fn_t barrier;
|
||||
orte_grpcomm_base_module_onesided_barrier_fn_t onesided_barrier;
|
||||
/* modex functions */
|
||||
orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr;
|
||||
orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr;
|
||||
|
@ -68,7 +68,6 @@ orte_grpcomm_base_module_t orte_grpcomm_hier_module = {
|
||||
hier_allgather,
|
||||
orte_grpcomm_base_allgather_list,
|
||||
hier_barrier,
|
||||
NULL, /* onesided barrier only used by daemons */
|
||||
set_proc_attr,
|
||||
get_proc_attr,
|
||||
modex,
|
||||
|
@ -48,7 +48,6 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int mcast_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int mcast_barrier(void);
|
||||
static int mcast_onesided_barrier(void);
|
||||
static int modex(opal_list_t *procs);
|
||||
static int get_proc_attr(const orte_process_name_t proc,
|
||||
const char * attribute_name, void **val,
|
||||
@ -62,7 +61,6 @@ orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
|
||||
mcast_allgather,
|
||||
orte_grpcomm_base_allgather_list,
|
||||
mcast_barrier,
|
||||
mcast_onesided_barrier,
|
||||
orte_grpcomm_base_set_proc_attr,
|
||||
get_proc_attr,
|
||||
modex,
|
||||
@ -77,7 +75,7 @@ static void daemon_recv(int status,
|
||||
opal_buffer_t *buf, void* cbdata);
|
||||
|
||||
/* Local variables */
|
||||
static orte_grpcomm_collective_t barrier, allgather, onesided_barrier;
|
||||
static orte_grpcomm_collective_t barrier, allgather;
|
||||
|
||||
/**
|
||||
* Initialize the module
|
||||
@ -93,7 +91,6 @@ static int init(void)
|
||||
/* setup global variables */
|
||||
OBJ_CONSTRUCT(&barrier, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&allgather, orte_grpcomm_collective_t);
|
||||
OBJ_CONSTRUCT(&onesided_barrier, orte_grpcomm_collective_t);
|
||||
|
||||
/* point to our collective function */
|
||||
orte_grpcomm_base.daemon_coll = orte_grpcomm_mcast_daemon_coll;
|
||||
@ -130,7 +127,6 @@ static void finalize(void)
|
||||
/* destruct the globals */
|
||||
OBJ_DESTRUCT(&barrier);
|
||||
OBJ_DESTRUCT(&allgather);
|
||||
OBJ_DESTRUCT(&onesided_barrier);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -285,73 +281,6 @@ static int mcast_barrier(void)
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
/* quick timeout loop */
|
||||
static bool timer_fired;
|
||||
|
||||
static void quicktime_cb(int fd, short event, void *cbdata)
|
||||
{
|
||||
/* declare it fired */
|
||||
timer_fired = true;
|
||||
}
|
||||
|
||||
static int mcast_onesided_barrier(void)
|
||||
{
|
||||
opal_event_t *quicktime=NULL;
|
||||
struct timeval quicktimeval;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:mcast: onesided barrier called",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am alone, just return */
|
||||
if (1 == orte_process_info.num_procs) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we are not to use the barrier, then just return */
|
||||
if (!orte_orted_exit_with_barrier) {
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are the HNP, we need to do a little delay to give
|
||||
* the orteds a chance to exit before we leave
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||
"%s grpcomm:mcast: onesided barrier adding delay timer",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
quicktimeval.tv_sec = 0;
|
||||
quicktimeval.tv_usec = 100;
|
||||
timer_fired = false;
|
||||
ORTE_DETECT_TIMEOUT(&quicktime, orte_process_info.num_procs, 1000, 10000, quicktime_cb);
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we are not the HNP, just send and leave */
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
if (ORTE_SUCCESS != (rc = xcast(ORTE_PROC_MY_NAME->jobid, NULL, ORTE_RML_TAG_ONESIDED_BARRIER))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* initialize things */
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
onesided_barrier.recvd += 1; /* account for me */
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
|
||||
/* wait to complete */
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
while (orte_process_info.num_procs <= onesided_barrier.recvd) {
|
||||
opal_condition_wait(&onesided_barrier.cond, &onesided_barrier.lock);
|
||||
}
|
||||
/* reset the collective */
|
||||
onesided_barrier.recvd = 0;
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
@ -551,16 +480,6 @@ static void daemon_recv(int status,
|
||||
ORTE_MESSAGE_EVENT(sender, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
break;
|
||||
|
||||
case ORTE_RML_TAG_ONESIDED_BARRIER:
|
||||
OPAL_THREAD_LOCK(&onesided_barrier.lock);
|
||||
onesided_barrier.recvd += 1;
|
||||
/* check for completion */
|
||||
if (orte_process_info.num_procs <= onesided_barrier.recvd) {
|
||||
opal_condition_broadcast(&onesided_barrier.cond);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&onesided_barrier.lock);
|
||||
break;
|
||||
|
||||
case ORTE_RML_TAG_BARRIER:
|
||||
OPAL_THREAD_LOCK(&barrier.lock);
|
||||
/* the recv is the trigger */
|
||||
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2007-2010 Cisco Systems, Inc. All rights reserved.
|
||||
* Copyright (c) 2009 Institut National de Recherche en Informatique
|
||||
* et Automatique. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -57,7 +57,7 @@
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/orte_locks.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/runtime/orte_quit.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/nidmap.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
@ -149,7 +149,7 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
if (NULL == crud) {
|
||||
orte_never_launched = true;
|
||||
ORTE_UPDATE_EXIT_STATUS(0);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
orte_util_nidmap_init(NULL);
|
||||
@ -173,7 +173,7 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
free(crud);
|
||||
orte_never_launched = true;
|
||||
ORTE_UPDATE_EXIT_STATUS(0);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
|
||||
@ -198,7 +198,7 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
if (orte_do_not_launch) {
|
||||
orte_never_launched = true;
|
||||
ORTE_UPDATE_EXIT_STATUS(0);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
return ORTE_ERR_SILENT;
|
||||
}
|
||||
|
||||
@ -214,7 +214,7 @@ int orte_plm_base_setup_job(orte_job_t *jdata)
|
||||
ORTE_VPID_PRINT(jdata->num_procs));
|
||||
orte_never_launched = true;
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
orte_trigger_event(&orte_exit);
|
||||
orte_jobs_complete();
|
||||
return ORTE_ERROR;
|
||||
}
|
||||