1
1

Try automatically adding local spawn threads to parallelize the fork/exec process to speed up the launch on large SMPs. Harvest the threads after initial spawn to minimize any impact on running jobs.

Change the determination of the number of spawn threads so that it is based on the number of local procs in the first job being spawned. Someone can look at an optimization that handles subsequent dynamic spawns that might be larger in size.

Leave the threads running, but blocked, for the life of the daemon, and use them to harvest the local procs as they terminate. This helps short-lived jobs in particular.

Add MCA params to set:
  * max number of spawn threads (default: 4)
  * a specific number of spawn threads (default: -1, indicating no set number)
  * cutoff - minimum number of local procs before using spawn threads (default: 32)

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-11-24 13:29:02 -08:00
родитель 45db3637af
Коммит 8f496b01b7
12 изменённых файлов: 251 добавлений и 129 удалений

Просмотреть файл

@ -46,6 +46,7 @@
#include "orte/mca/ess/ess.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_quit.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/data_type_support/orte_dt_support.h"
@ -327,6 +328,7 @@ static void proc_errors(int fd, short args, void *cbdata)
orte_plm_cmd_flag_t cmd;
int rc=ORTE_SUCCESS;
int i;
orte_wait_tracker_t *t2;
ORTE_ACQUIRE_OBJECT(caddy);
@ -412,7 +414,14 @@ static void proc_errors(int fd, short args, void *cbdata)
goto cleanup;
}
/* leave the exit code alone - process this as a waitpid */
ompi_odls_base_default_wait_local_proc(child, NULL);
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = orte_event_base;
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, orte_odls_base_default_wait_local_proc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, orte_errmgr_base_framework.framework_output,

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -44,5 +45,9 @@ ORTE_DECLSPEC extern mca_base_framework_t orte_odls_base_framework;
*/
ORTE_DECLSPEC int orte_odls_base_select(void);
ORTE_DECLSPEC void orte_odls_base_start_threads(orte_job_t *jdata);
ORTE_DECLSPEC void orte_odls_base_harvest_threads(void);
END_C_DECLS
#endif

Просмотреть файл

@ -15,7 +15,7 @@
* All rights reserved.
* Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Mellanox Technologies Ltd. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
@ -614,6 +614,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
goto REPORT_ERROR;
}
/* spin up the spawn threads */
orte_odls_base_start_threads(jdata);
/* to save memory, purge the job map of all procs other than
* our own - for daemons, this will completely release the
* proc structures. For the HNP, the proc structs will
@ -727,9 +730,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
int rc, i;
bool found;
orte_proc_state_t state;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;
ORTE_ACQUIRE_OBJECT(cd);
@ -772,44 +772,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
goto errorout;
}
/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}
/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}
rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
state = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
goto errorout;
}
/* did the user request we display output in xterms? */
if (NULL != orte_xterm && !ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
opal_list_item_t *nmitem;
@ -878,15 +840,14 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
cd->argv[0] = param;
}
if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
opal_output_verbose(5, orte_odls_base_framework.framework_output,
"%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
if (15 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
/* dump what is going to be exec'd */
if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
if (ORTE_SUCCESS != (rc = cd->fork_local(cd))) {
@ -923,6 +884,9 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
orte_odls_spawn_caddy_t *cd;
opal_event_base_t *evb;
char *effective_dir = NULL;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;
ORTE_ACQUIRE_OBJECT(caddy);
@ -1105,6 +1069,44 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}
/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}
rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
goto GETOUT;
}
/* tell all children that they are being launched via ORTE */
opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env);
@ -1186,10 +1188,17 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
/* determine the thread that will handle this child */
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on shortlived apps */
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);
/* dispatch this child to the next available launch thread */
cd = OBJ_NEW(orte_odls_spawn_caddy_t);
@ -1228,16 +1237,11 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
}
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
opal_output_verbose(1, orte_odls_base_framework.framework_output,
"%s odls:dispatch %s to thread %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name),
orte_odls_globals.next_base);
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
@ -1255,11 +1259,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
free(effective_dir);
effective_dir = NULL;
}
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
ERROR_OUT:
/* ensure we reset our working directory back to our default location */
@ -1323,8 +1322,10 @@ int orte_odls_base_default_signal_local_procs(const orte_process_name_t *proc, i
* Wait for a callback indicating the child has completed.
*/
void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata)
{
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
int i;
orte_job_t *jobdat;
orte_proc_state_t state=ORTE_PROC_STATE_WAITPID_FIRED;
@ -1528,6 +1529,8 @@ void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
/* cancel the wait as this proc has already terminated */
orte_wait_cb_cancel(proc);
ORTE_ACTIVATE_PROC_STATE(&proc->name, state);
/* cleanup the tracker */
OBJ_RELEASE(t2);
}
typedef struct {
@ -1903,17 +1906,17 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
goto CLEANUP;
}
}
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s restarting app %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app));
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);

Просмотреть файл

@ -13,7 +13,7 @@
* Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
@ -39,12 +39,13 @@
#include "opal/util/argv.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/plm/plm_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/show_help.h"
#include "orte/util/parse_options.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/plm/plm_types.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/util/threads.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/odls/base/base.h"
@ -78,14 +79,30 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.timeout_before_sigkill);
orte_odls_globals.num_threads = 0;
orte_odls_globals.max_threads = 4;
(void) mca_base_var_register("orte", "odls", "base", "max_threads",
"Maximum number of threads to use for spawning local procs",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.max_threads);
orte_odls_globals.num_threads = -1;
(void) mca_base_var_register("orte", "odls", "base", "num_threads",
"Number of threads to use for spawning local procs",
"Specific number of threads to use for spawning local procs",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.num_threads);
orte_odls_globals.cutoff = 32;
(void) mca_base_var_register("orte", "odls", "base", "cutoff",
"Minimum number of local procs before using thread pool for spawn",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.cutoff);
orte_odls_globals.signal_direct_children_only = false;
(void) mca_base_var_register("orte", "odls", "base", "signal_direct_children_only",
"Whether to restrict signals (e.g., SIGTERM) to direct children, or "
@ -98,12 +115,80 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
return ORTE_SUCCESS;
}
/**
 * Stop the spawn threads and fall back to the default event base.
 *
 * Nothing is done unless orte_odls_globals.num_threads is positive. In that
 * case every progress thread named in orte_odls_globals.ev_threads is
 * finalized, the name list is released, ev_bases is replaced by a one-entry
 * array holding orte_event_base, and num_threads is reset to 0. The whole
 * operation runs while holding orte_odls_globals.lock.
 */
void orte_odls_base_harvest_threads(void)
{
    int i;

    ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock);
    if (0 < orte_odls_globals.num_threads) {
        /* num_threads can be positive even though no thread was ever
         * started (e.g. a value was requested but the pool was never
         * created), so check the name list before walking it */
        if (NULL != orte_odls_globals.ev_threads) {
            /* stop the progress threads */
            for (i = 0; NULL != orte_odls_globals.ev_threads[i]; i++) {
                opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
            }
            opal_argv_free(orte_odls_globals.ev_threads);
            orte_odls_globals.ev_threads = NULL;
        }
        free(orte_odls_globals.ev_bases);
        orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
        if (NULL != orte_odls_globals.ev_bases) {
            /* use the default event base */
            orte_odls_globals.ev_bases[0] = orte_event_base;
        }
        orte_odls_globals.num_threads = 0;
    }
    ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
}
/**
 * Create the set of event bases used to spawn local procs.
 *
 * The set is built once, from the first job passed in; later calls return
 * immediately. The number of threads is chosen as follows:
 *   - 0 when orte_odls_globals.num_threads is 0 or the job has fewer local
 *     procs than orte_odls_globals.cutoff;
 *   - num_local_procs / 8 (at least 1, at most max_threads) when
 *     num_threads is negative, i.e. no specific number was requested;
 *   - otherwise the requested num_threads.
 * When the result is not positive, or no thread could be started, ev_bases
 * holds a single entry pointing at orte_event_base. On return
 * orte_odls_globals.num_threads equals the number of threads actually
 * running, so it is always a valid bound for indexing ev_bases.
 *
 * @param jdata  job whose num_local_procs drives the sizing decision
 */
void orte_odls_base_start_threads(orte_job_t *jdata)
{
    int i;
    int nthreads;
    char *tmp;
    opal_event_base_t *evb;

    ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock);

    /* only do this once - key off ev_bases, which is set on every path
     * below, rather than ev_threads, which stays NULL when the default
     * event base is used and would let us re-enter and leak ev_bases */
    if (NULL != orte_odls_globals.ev_bases) {
        ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
        return;
    }

    /* setup the pool of worker threads */
    orte_odls_globals.ev_threads = NULL;
    orte_odls_globals.next_base = 0;

    nthreads = orte_odls_globals.num_threads;
    if ((int)jdata->num_local_procs < orte_odls_globals.cutoff) {
        /* too few procs to be worth the threads */
        nthreads = 0;
    } else if (nthreads < 0) {
        /* user didn't specify anything, so default to some fraction of
         * the number of local procs, capping it at the max num threads
         * parameter value. */
        nthreads = jdata->num_local_procs / 8;
        if (0 == nthreads) {
            nthreads = 1;
        }
        if (orte_odls_globals.max_threads < nthreads) {
            nthreads = orte_odls_globals.max_threads;
        }
    }

    if (nthreads <= 0) {
        orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
        if (NULL != orte_odls_globals.ev_bases) {
            /* use the default event base */
            orte_odls_globals.ev_bases[0] = orte_event_base;
        }
        /* keep the count in step with the one-entry array so the
         * round-robin selection can never index past it */
        orte_odls_globals.num_threads = 0;
        ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
        return;
    }

    orte_odls_globals.ev_bases =
        (opal_event_base_t**)calloc((size_t)nthreads, sizeof(opal_event_base_t*));
    if (NULL == orte_odls_globals.ev_bases) {
        /* nothing was created - leave ev_bases NULL so a later call can retry */
        orte_odls_globals.num_threads = 0;
        ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
        return;
    }

    for (i = 0; i < nthreads; i++) {
        if (0 > asprintf(&tmp, "ORTE-ODLS-%d", i)) {
            break;
        }
        evb = opal_progress_thread_init(tmp);
        if (NULL == evb) {
            free(tmp);
            break;
        }
        orte_odls_globals.ev_bases[i] = evb;
        opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
        free(tmp);
    }
    if (0 == i) {
        /* no thread could be started - fall back to the default event base */
        orte_odls_globals.ev_bases[0] = orte_event_base;
    }
    /* record how many threads are really running */
    orte_odls_globals.num_threads = i;

    ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
}
static int orte_odls_base_close(void)
{
int i;
orte_proc_t *proc;
opal_list_item_t *item;
opal_output(0, "CLOSE");
/* cleanup ODLS globals */
while (NULL != (item = opal_list_remove_first(&orte_odls_globals.xterm_ranks))) {
OBJ_RELEASE(item);
@ -118,14 +203,9 @@ static int orte_odls_base_close(void)
}
OBJ_RELEASE(orte_local_children);
if (0 < orte_odls_globals.num_threads) {
/* stop the progress threads */
for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) {
opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
}
}
free(orte_odls_globals.ev_bases);
opal_argv_free(orte_odls_globals.ev_threads);
orte_odls_base_harvest_threads();
ORTE_DESTRUCT_LOCK(&orte_odls_globals.lock);
return mca_base_framework_components_close(&orte_odls_base_framework, NULL);
}
@ -141,6 +221,9 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
orte_namelist_t *nm;
bool xterm_hold;
ORTE_CONSTRUCT_LOCK(&orte_odls_globals.lock);
orte_odls_globals.lock.active = false; // start with nobody having the thread
/* initialize the global array of local children */
orte_local_children = OBJ_NEW(opal_pointer_array_t);
if (OPAL_SUCCESS != (rc = opal_pointer_array_init(orte_local_children,
@ -202,25 +285,6 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
opal_argv_append_nosize(&orte_odls_globals.xtermcmd, "-e");
}
/* setup the pool of worker threads */
orte_odls_globals.ev_threads = NULL;
orte_odls_globals.next_base = 0;
if (0 == orte_odls_globals.num_threads) {
orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
/* use the default event base */
orte_odls_globals.ev_bases[0] = orte_event_base;
} else {
orte_odls_globals.ev_bases =
(opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*));
for (i=0; i < orte_odls_globals.num_threads; i++) {
asprintf(&tmp, "ORTE-ODLS-%d", i);
orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp);
opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
free(tmp);
}
}
/* Open up all available components */
return mca_base_framework_components_open(&orte_odls_base_framework, flags);
}

Просмотреть файл

@ -14,6 +14,8 @@
* reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -40,7 +42,7 @@
#include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/threads.h"
#include "orte/mca/odls/odls_types.h"
BEGIN_C_DECLS
@ -59,11 +61,14 @@ typedef struct {
/* the xterm cmd to be used */
char **xtermcmd;
/* thread pool */
int max_threads;
int num_threads;
int cutoff;
opal_event_base_t **ev_bases; // event base array for progress threads
char** ev_threads; // event progress thread names
int next_base; // counter to load-level thread use
bool signal_direct_children_only;
orte_lock_t lock;
} orte_odls_globals_t;
ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals;
@ -127,7 +132,7 @@ OBJ_CLASS_DECLARATION(orte_odls_launch_local_t);
ORTE_DECLSPEC void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata);
ORTE_DECLSPEC void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata);
ORTE_DECLSPEC void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata);
/* define a function type to signal a local proc */
typedef int (*orte_odls_base_signal_local_fn_t)(pid_t pid, int signum);

Просмотреть файл

@ -517,7 +517,9 @@ static int plm_alps_finalize(void)
}
static void alps_wait_cb(orte_proc_t *proc, void* cbdata){
static void alps_wait_cb(int sd, short args, void *cbdata) {
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
orte_job_t *jdata;
/* According to the ALPS folks, alps always returns the highest exit
@ -550,6 +552,7 @@ static void alps_wait_cb(orte_proc_t *proc, void* cbdata){
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ABORTED);
}
}
OBJ_RELEASE(t2);
}

Просмотреть файл

@ -49,6 +49,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rmaps/base/base.h"
@ -56,7 +57,6 @@
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/odls/odls.h"
#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/snapc/base/base.h"
#endif

Просмотреть файл

@ -259,10 +259,11 @@ static int rsh_init(void)
/**
* Callback on daemon exit.
*/
static void rsh_wait_daemon(orte_proc_t *daemon, void* cbdata)
static void rsh_wait_daemon(int sd, short flags, void *cbdata)
{
orte_job_t *jdata;
orte_plm_rsh_caddy_t *caddy=(orte_plm_rsh_caddy_t*)cbdata;
orte_proc_t *daemon = caddy->daemon;
char *rtmod;
if (orte_orteds_term_ordered || orte_abnormal_term_ordered) {
@ -938,7 +939,7 @@ static void process_launch_list(int fd, short args, void *cbdata)
caddy = (orte_plm_rsh_caddy_t*)item;
/* register the sigchild callback */
ORTE_FLAG_SET(caddy->daemon, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(caddy->daemon, rsh_wait_daemon, (void*)caddy);
orte_wait_cb(caddy->daemon, rsh_wait_daemon, orte_event_base, (void*)caddy);
/* fork a child to exec the rsh/ssh session */
pid = fork();

Просмотреть файл

@ -524,7 +524,9 @@ static int plm_slurm_finalize(void)
}
static void srun_wait_cb(orte_proc_t *proc, void* cbdata){
static void srun_wait_cb(int sd, short fd, void *cbdata){
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
orte_job_t *jdata;
/* According to the SLURM folks, srun always returns the highest exit
@ -576,7 +578,7 @@ static void srun_wait_cb(orte_proc_t *proc, void* cbdata){
}
/* done with this dummy */
OBJ_RELEASE(proc);
OBJ_RELEASE(t2);
}
@ -613,7 +615,7 @@ static int plm_slurm_start_proc(int argc, char **argv, char **env,
/* be sure to mark it as alive so we don't instantly fire */
ORTE_FLAG_SET(dummy, ORTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if srun succeeds! */
orte_wait_cb(dummy, srun_wait_cb, NULL);
orte_wait_cb(dummy, srun_wait_cb, orte_event_base, NULL);
if (0 == srun_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;

Просмотреть файл

@ -23,6 +23,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h"
@ -288,6 +289,13 @@ static void track_procs(int fd, short argc, void *cbdata)
/* update the proc state */
pdata->state = state;
jdata->num_launched++;
if (jdata->num_launched == jdata->num_local_procs) {
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
}
/* don't update until we are told that all are done */
} else if (ORTE_PROC_STATE_REGISTERED == state) {
/* update the proc state */

Просмотреть файл

@ -80,14 +80,7 @@ OBJ_CLASS_INSTANCE(orte_timer_t,
timer_const,
timer_dest);
/* Local objects */
typedef struct {
opal_list_item_t super;
opal_event_t ev;
orte_proc_t *child;
orte_wait_fn_t cbfunc;
void *cbdata;
} orte_wait_tracker_t;
static void wccon(orte_wait_tracker_t *p)
{
p->child = NULL;
@ -100,9 +93,9 @@ static void wcdes(orte_wait_tracker_t *p)
OBJ_RELEASE(p->child);
}
}
static OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
opal_list_item_t,
wccon, wcdes);
OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
opal_list_item_t,
wccon, wcdes);
/* Local Variables */
static opal_event_t handler;
@ -150,7 +143,8 @@ int orte_wait_finalize(void)
/* this function *must* always be called from
* within an event in the orte_event_base */
void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback,
opal_event_base_t *evb, void *data)
{
orte_wait_tracker_t *t2;
@ -162,8 +156,19 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
/* see if this proc is still alive */
if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
/* already heard this proc is dead, so just do the callback */
callback(child, data);
if (NULL != callback) {
/* already heard this proc is dead, so just do the callback */
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, t2->cbfunc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
}
return;
}
@ -179,6 +184,7 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
opal_list_append(&pending_cbs, &t2->super);
@ -254,11 +260,15 @@ static void wait_signal_callback(int fd, short event, void *arg)
if (pid == t2->child->pid) {
/* found it! */
t2->child->exit_code = status;
if (NULL != t2->cbfunc) {
t2->cbfunc(t2->child, t2->cbdata);
}
opal_list_remove_item(&pending_cbs, &t2->super);
OBJ_RELEASE(t2);
if (NULL != t2->cbfunc) {
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, t2->cbfunc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
} else {
OBJ_RELEASE(t2);
}
break;
}
}

Просмотреть файл

@ -53,7 +53,18 @@
BEGIN_C_DECLS
/** typedef for callback function used in \c orte_wait_cb */
typedef void (*orte_wait_fn_t)(orte_proc_t *proc, void *data);
typedef void (*orte_wait_cbfunc_t)(int fd, short args, void* cb);
/* Tracker tying a child proc to the callback that is to run once that
 * proc has been reaped. One is created per orte_wait_cb registration;
 * the callback is not invoked directly but delivered as an event on the
 * tracker's event base, with the tracker itself as the handler argument. */
typedef struct {
/* list linkage - allows the tracker to sit on the pending-callback list */
opal_list_item_t super;
/* event used to deliver the callback */
opal_event_t ev;
/* event base on which ev is set, i.e. where the callback will execute */
opal_event_base_t *evb;
/* proc being tracked - retained when stored here and released by the
 * class destructor */
orte_proc_t *child;
/* handler installed on ev; when NULL no event is raised for the proc */
orte_wait_cbfunc_t cbfunc;
/* data supplied by the caller of orte_wait_cb; the handler reaches it
 * through the tracker it receives as its argument */
void *cbdata;
} orte_wait_tracker_t;
OBJ_CLASS_DECLARATION(orte_wait_tracker_t);
/**
* Disable / re-Enable SIGCHLD handler
@ -71,7 +82,8 @@ ORTE_DECLSPEC void orte_wait_disable(void);
* \c waitpid() will have already been called on the process at this
* time.
*/
ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_fn_t callback, void *data);
ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_cbfunc_t callback,
opal_event_base_t *evb, void *data);
ORTE_DECLSPEC void orte_wait_cb_cancel(orte_proc_t *proc);