1
1

Merge pull request #4532 from rhc54/topic/odls

Try adding local spawn threads by default to parallelize the fork/exec process.
Этот коммит содержится в:
Ralph Castain 2017-11-30 06:38:32 -08:00 коммит произвёл GitHub
родитель b310add995 335fc96f42
Коммит 0fcc996c41
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 253 добавлений и 130 удалений

Просмотреть файл

@ -46,6 +46,7 @@
#include "orte/mca/ess/ess.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_quit.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/data_type_support/orte_dt_support.h"
@ -327,6 +328,7 @@ static void proc_errors(int fd, short args, void *cbdata)
orte_plm_cmd_flag_t cmd;
int rc=ORTE_SUCCESS;
int i;
orte_wait_tracker_t *t2;
ORTE_ACQUIRE_OBJECT(caddy);
@ -412,7 +414,14 @@ static void proc_errors(int fd, short args, void *cbdata)
goto cleanup;
}
/* leave the exit code alone - process this as a waitpid */
ompi_odls_base_default_wait_local_proc(child, NULL);
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = orte_event_base;
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, orte_odls_base_default_wait_local_proc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, orte_errmgr_base_framework.framework_output,

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -44,5 +45,9 @@ ORTE_DECLSPEC extern mca_base_framework_t orte_odls_base_framework;
*/
ORTE_DECLSPEC int orte_odls_base_select(void);
ORTE_DECLSPEC void orte_odls_base_start_threads(orte_job_t *jdata);
ORTE_DECLSPEC void orte_odls_base_harvest_threads(void);
END_C_DECLS
#endif

Просмотреть файл

@ -15,7 +15,7 @@
* All rights reserved.
* Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Mellanox Technologies Ltd. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
@ -614,6 +614,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
goto REPORT_ERROR;
}
/* spin up the spawn threads */
orte_odls_base_start_threads(jdata);
/* to save memory, purge the job map of all procs other than
* our own - for daemons, this will completely release the
* proc structures. For the HNP, the proc structs will
@ -727,9 +730,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
int rc, i;
bool found;
orte_proc_state_t state;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;
ORTE_ACQUIRE_OBJECT(cd);
@ -772,44 +772,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
goto errorout;
}
/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}
/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}
rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
state = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
goto errorout;
}
/* did the user request we display output in xterms? */
if (NULL != orte_xterm && !ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
opal_list_item_t *nmitem;
@ -878,15 +840,14 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
cd->argv[0] = param;
}
if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
opal_output_verbose(5, orte_odls_base_framework.framework_output,
"%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
if (15 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
/* dump what is going to be exec'd */
if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
if (ORTE_SUCCESS != (rc = cd->fork_local(cd))) {
@ -923,6 +884,9 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
orte_odls_spawn_caddy_t *cd;
opal_event_base_t *evb;
char *effective_dir = NULL;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;
ORTE_ACQUIRE_OBJECT(caddy);
@ -1105,6 +1069,44 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}
/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}
rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
goto GETOUT;
}
/* tell all children that they are being launched via ORTE */
opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env);
@ -1186,10 +1188,17 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
/* determine the thread that will handle this child */
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on shortlived apps */
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);
/* dispatch this child to the next available launch thread */
cd = OBJ_NEW(orte_odls_spawn_caddy_t);
@ -1228,16 +1237,11 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
}
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
opal_output_verbose(1, orte_odls_base_framework.framework_output,
"%s odls:dispatch %s to thread %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name),
orte_odls_globals.next_base);
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
@ -1255,11 +1259,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
free(effective_dir);
effective_dir = NULL;
}
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
ERROR_OUT:
/* ensure we reset our working directory back to our default location */
@ -1323,8 +1322,10 @@ int orte_odls_base_default_signal_local_procs(const orte_process_name_t *proc, i
* Wait for a callback indicating the child has completed.
*/
void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata)
{
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
int i;
orte_job_t *jobdat;
orte_proc_state_t state=ORTE_PROC_STATE_WAITPID_FIRED;
@ -1528,6 +1529,8 @@ void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
/* cancel the wait as this proc has already terminated */
orte_wait_cb_cancel(proc);
ORTE_ACTIVATE_PROC_STATE(&proc->name, state);
/* cleanup the tracker */
OBJ_RELEASE(t2);
}
typedef struct {
@ -1903,17 +1906,17 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
goto CLEANUP;
}
}
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s restarting app %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app));
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);

Просмотреть файл

@ -13,7 +13,7 @@
* Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
@ -39,12 +39,13 @@
#include "opal/util/argv.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/plm/plm_types.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/show_help.h"
#include "orte/util/parse_options.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/plm/plm_types.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/util/threads.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/odls/base/base.h"
@ -78,14 +79,30 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.timeout_before_sigkill);
orte_odls_globals.num_threads = 0;
orte_odls_globals.max_threads = 4;
(void) mca_base_var_register("orte", "odls", "base", "max_threads",
"Maximum number of threads to use for spawning local procs",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.max_threads);
orte_odls_globals.num_threads = -1;
(void) mca_base_var_register("orte", "odls", "base", "num_threads",
"Number of threads to use for spawning local procs",
"Specific number of threads to use for spawning local procs",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.num_threads);
orte_odls_globals.cutoff = 32;
(void) mca_base_var_register("orte", "odls", "base", "cutoff",
"Minimum number of local procs before using thread pool for spawn",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&orte_odls_globals.cutoff);
orte_odls_globals.signal_direct_children_only = false;
(void) mca_base_var_register("orte", "odls", "base", "signal_direct_children_only",
"Whether to restrict signals (e.g., SIGTERM) to direct children, or "
@ -98,6 +115,73 @@ static int orte_odls_base_register(mca_base_register_flag_t flags)
return ORTE_SUCCESS;
}
/**
 * Stop the local-spawn progress threads (if any were requested) and
 * revert to dispatching spawn events on the default ORTE event base.
 *
 * Serialized against orte_odls_base_start_threads(), which takes the
 * same orte_odls_globals.lock.
 *
 * On return from the "threads configured" path, num_threads is 0 and
 * ev_bases is a one-slot array holding orte_event_base (or NULL if the
 * replacement array could not be allocated).
 */
void orte_odls_base_harvest_threads(void)
{
    int i;

    ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock);
    if (0 < orte_odls_globals.num_threads) {
        /* num_threads can be positive without a running pool (e.g. the
         * user set it but the job stayed below the cutoff), in which
         * case ev_threads is NULL and must not be indexed */
        if (NULL != orte_odls_globals.ev_threads) {
            /* stop the progress threads */
            for (i = 0; NULL != orte_odls_globals.ev_threads[i]; i++) {
                opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
            }
            opal_argv_free(orte_odls_globals.ev_threads);
            orte_odls_globals.ev_threads = NULL;
        }

        /* replace the per-thread array with a single slot that points
         * at the default event base */
        free(orte_odls_globals.ev_bases);
        orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
        if (NULL != orte_odls_globals.ev_bases) {
            orte_odls_globals.ev_bases[0] = orte_event_base;
        }
        orte_odls_globals.num_threads = 0;
    }
    ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
}
/**
 * Start the pool of progress threads used to fork/exec local procs.
 *
 * @param jdata  job being launched; only num_local_procs is read, to
 *               decide whether a pool is worthwhile and how large it
 *               should be.
 *
 * Policy:
 *   - if a pool is already running (ev_threads non-NULL), do nothing;
 *   - if num_threads is 0, or the job has fewer local procs than
 *     "cutoff", no threads are started and every slot of ev_bases
 *     points at orte_event_base;
 *   - if num_threads is -1 (no user value) it is derived as
 *     num_local_procs / 8, at least 1, capped at max_threads;
 *   - otherwise num_threads threads are started.
 *
 * Callers pick a base with "++next_base; wrap at num_threads", so
 * ev_bases must always hold at least max(1, num_threads) valid
 * entries. The no-pool path therefore sizes the array by num_threads
 * instead of allocating a single slot, and any previous array is freed
 * rather than overwritten.
 *
 * If only part of the pool can be started, num_threads is reduced to
 * the number of threads actually running. If none can be started, the
 * default event base is used and num_threads is left untouched so a
 * later call may retry.
 */
void orte_odls_base_start_threads(orte_job_t *jdata)
{
    int i;
    int nslots;
    char *tmp;
    opal_event_base_t **bases;

    ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock);

    /* only do this once - a non-NULL name list means the pool is up */
    if (NULL != orte_odls_globals.ev_threads) {
        ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
        return;
    }

    orte_odls_globals.next_base = 0;

    if (0 != orte_odls_globals.num_threads &&
        orte_odls_globals.cutoff <= (int)jdata->num_local_procs) {
        if (-1 == orte_odls_globals.num_threads) {
            /* user didn't specify anything, so default to some fraction of
             * the number of local procs, capping it at the max num threads
             * parameter value. */
            orte_odls_globals.num_threads = jdata->num_local_procs / 8;
            if (0 == orte_odls_globals.num_threads) {
                orte_odls_globals.num_threads = 1;
            } else if (orte_odls_globals.max_threads < orte_odls_globals.num_threads) {
                orte_odls_globals.num_threads = orte_odls_globals.max_threads;
            }
        }

        /* a non-positive count (e.g. max_threads <= 0) means "no pool" */
        bases = NULL;
        if (0 < orte_odls_globals.num_threads) {
            bases = (opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*));
        }
        if (NULL != bases) {
            for (i = 0; i < orte_odls_globals.num_threads; i++) {
                /* tmp is indeterminate if asprintf fails - don't touch it */
                if (0 > asprintf(&tmp, "ORTE-ODLS-%d", i)) {
                    break;
                }
                bases[i] = opal_progress_thread_init(tmp);
                if (NULL == bases[i]) {
                    free(tmp);
                    break;
                }
                opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
                free(tmp);
            }
            if (0 < i) {
                /* at least one thread is running: publish the pool,
                 * trimmed to the threads that actually started */
                free(orte_odls_globals.ev_bases);
                orte_odls_globals.ev_bases = bases;
                orte_odls_globals.num_threads = i;
                ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
                return;
            }
            free(bases);
        }
    }

    /* no pool: use the default event base. Provide one slot per
     * possible round-robin index so a positive num_threads cannot
     * index past the end of the array. */
    nslots = (0 < orte_odls_globals.num_threads) ? orte_odls_globals.num_threads : 1;
    bases = (opal_event_base_t**)malloc(nslots * sizeof(opal_event_base_t*));
    if (NULL == bases) {
        /* keep whatever array is already installed; it may be shorter
         * than num_threads, so pin the round-robin index to slot 0 */
        if (0 < orte_odls_globals.num_threads && NULL != orte_odls_globals.ev_bases) {
            orte_odls_globals.num_threads = 0;
        }
        ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
        return;
    }
    for (i = 0; i < nslots; i++) {
        bases[i] = orte_event_base;
    }
    free(orte_odls_globals.ev_bases);
    orte_odls_globals.ev_bases = bases;

    ORTE_RELEASE_THREAD(&orte_odls_globals.lock);
}
static int orte_odls_base_close(void)
{
int i;
@ -118,14 +202,9 @@ static int orte_odls_base_close(void)
}
OBJ_RELEASE(orte_local_children);
if (0 < orte_odls_globals.num_threads) {
/* stop the progress threads */
for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) {
opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]);
}
}
free(orte_odls_globals.ev_bases);
opal_argv_free(orte_odls_globals.ev_threads);
orte_odls_base_harvest_threads();
ORTE_DESTRUCT_LOCK(&orte_odls_globals.lock);
return mca_base_framework_components_close(&orte_odls_base_framework, NULL);
}
@ -141,6 +220,9 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
orte_namelist_t *nm;
bool xterm_hold;
ORTE_CONSTRUCT_LOCK(&orte_odls_globals.lock);
orte_odls_globals.lock.active = false; // start with nobody having the thread
/* initialize the global array of local children */
orte_local_children = OBJ_NEW(opal_pointer_array_t);
if (OPAL_SUCCESS != (rc = opal_pointer_array_init(orte_local_children,
@ -202,25 +284,6 @@ static int orte_odls_base_open(mca_base_open_flag_t flags)
opal_argv_append_nosize(&orte_odls_globals.xtermcmd, "-e");
}
/* setup the pool of worker threads */
orte_odls_globals.ev_threads = NULL;
orte_odls_globals.next_base = 0;
if (0 == orte_odls_globals.num_threads) {
orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*));
/* use the default event base */
orte_odls_globals.ev_bases[0] = orte_event_base;
} else {
orte_odls_globals.ev_bases =
(opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*));
for (i=0; i < orte_odls_globals.num_threads; i++) {
asprintf(&tmp, "ORTE-ODLS-%d", i);
orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp);
opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp);
free(tmp);
}
}
/* Open up all available components */
return mca_base_framework_components_open(&orte_odls_base_framework, flags);
}

Просмотреть файл

@ -14,6 +14,8 @@
* reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -40,7 +42,7 @@
#include "orte/mca/iof/base/iof_base_setup.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/threads.h"
#include "orte/mca/odls/odls_types.h"
BEGIN_C_DECLS
@ -59,11 +61,14 @@ typedef struct {
/* the xterm cmd to be used */
char **xtermcmd;
/* thread pool */
int max_threads;
int num_threads;
int cutoff;
opal_event_base_t **ev_bases; // event base array for progress threads
char** ev_threads; // event progress thread names
int next_base; // counter to load-level thread use
bool signal_direct_children_only;
orte_lock_t lock;
} orte_odls_globals_t;
ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals;
@ -127,7 +132,7 @@ OBJ_CLASS_DECLARATION(orte_odls_launch_local_t);
ORTE_DECLSPEC void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata);
ORTE_DECLSPEC void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata);
ORTE_DECLSPEC void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata);
/* define a function type to signal a local proc */
typedef int (*orte_odls_base_signal_local_fn_t)(pid_t pid, int signum);

Просмотреть файл

@ -14,6 +14,8 @@
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -517,7 +519,9 @@ static int plm_alps_finalize(void)
}
static void alps_wait_cb(orte_proc_t *proc, void* cbdata){
static void alps_wait_cb(int sd, short args, void *cbdata) {
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
orte_job_t *jdata;
/* According to the ALPS folks, alps always returns the highest exit
@ -550,6 +554,7 @@ static void alps_wait_cb(orte_proc_t *proc, void* cbdata){
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ABORTED);
}
}
OBJ_RELEASE(t2);
}
@ -575,7 +580,7 @@ static int plm_alps_start_proc(int argc, char **argv, char **env,
/* be sure to mark it as alive so we don't instantly fire */
ORTE_FLAG_SET(alpsrun, ORTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if alps succeeds! */
orte_wait_cb(alpsrun, alps_wait_cb, NULL);
orte_wait_cb(alpsrun, alps_wait_cb, orte_event_base, NULL);
if (0 == alps_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;

Просмотреть файл

@ -49,6 +49,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rmaps/base/base.h"
@ -56,7 +57,6 @@
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/odls/odls.h"
#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/snapc/base/base.h"
#endif

Просмотреть файл

@ -259,10 +259,11 @@ static int rsh_init(void)
/**
* Callback on daemon exit.
*/
static void rsh_wait_daemon(orte_proc_t *daemon, void* cbdata)
static void rsh_wait_daemon(int sd, short flags, void *cbdata)
{
orte_job_t *jdata;
orte_plm_rsh_caddy_t *caddy=(orte_plm_rsh_caddy_t*)cbdata;
orte_proc_t *daemon = caddy->daemon;
char *rtmod;
if (orte_orteds_term_ordered || orte_abnormal_term_ordered) {
@ -938,7 +939,7 @@ static void process_launch_list(int fd, short args, void *cbdata)
caddy = (orte_plm_rsh_caddy_t*)item;
/* register the sigchild callback */
ORTE_FLAG_SET(caddy->daemon, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(caddy->daemon, rsh_wait_daemon, (void*)caddy);
orte_wait_cb(caddy->daemon, rsh_wait_daemon, orte_event_base, (void*)caddy);
/* fork a child to exec the rsh/ssh session */
pid = fork();

Просмотреть файл

@ -524,7 +524,9 @@ static int plm_slurm_finalize(void)
}
static void srun_wait_cb(orte_proc_t *proc, void* cbdata){
static void srun_wait_cb(int sd, short fd, void *cbdata){
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
orte_job_t *jdata;
/* According to the SLURM folks, srun always returns the highest exit
@ -576,7 +578,7 @@ static void srun_wait_cb(orte_proc_t *proc, void* cbdata){
}
/* done with this dummy */
OBJ_RELEASE(proc);
OBJ_RELEASE(t2);
}
@ -613,7 +615,7 @@ static int plm_slurm_start_proc(int argc, char **argv, char **env,
/* be sure to mark it as alive so we don't instantly fire */
ORTE_FLAG_SET(dummy, ORTE_PROC_FLAG_ALIVE);
/* setup the waitpid so we can find out if srun succeeds! */
orte_wait_cb(dummy, srun_wait_cb, NULL);
orte_wait_cb(dummy, srun_wait_cb, orte_event_base, NULL);
if (0 == srun_pid) { /* child */
char *bin_base = NULL, *lib_base = NULL;

Просмотреть файл

@ -23,6 +23,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h"
@ -288,6 +289,13 @@ static void track_procs(int fd, short argc, void *cbdata)
/* update the proc state */
pdata->state = state;
jdata->num_launched++;
if (jdata->num_launched == jdata->num_local_procs) {
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);
}
/* don't update until we are told that all are done */
} else if (ORTE_PROC_STATE_REGISTERED == state) {
/* update the proc state */

Просмотреть файл

@ -80,14 +80,7 @@ OBJ_CLASS_INSTANCE(orte_timer_t,
timer_const,
timer_dest);
/* Local objects */
/* Tracks one pending waitpid callback for a child process. */
typedef struct {
opal_list_item_t super; /* list linkage; trackers are appended to pending_cbs */
opal_event_t ev;
orte_proc_t *child; /* proc being waited on; retained when the tracker is set up */
orte_wait_fn_t cbfunc; /* invoked as cbfunc(child, cbdata); may be NULL */
void *cbdata; /* opaque pointer handed back to cbfunc */
} orte_wait_tracker_t;
static void wccon(orte_wait_tracker_t *p)
{
p->child = NULL;
@ -100,9 +93,9 @@ static void wcdes(orte_wait_tracker_t *p)
OBJ_RELEASE(p->child);
}
}
static OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
opal_list_item_t,
wccon, wcdes);
OBJ_CLASS_INSTANCE(orte_wait_tracker_t,
opal_list_item_t,
wccon, wcdes);
/* Local Variables */
static opal_event_t handler;
@ -150,7 +143,8 @@ int orte_wait_finalize(void)
/* this function *must* always be called from
* within an event in the orte_event_base */
void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback,
opal_event_base_t *evb, void *data)
{
orte_wait_tracker_t *t2;
@ -162,8 +156,19 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
/* see if this proc is still alive */
if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) {
/* already heard this proc is dead, so just do the callback */
callback(child, data);
if (NULL != callback) {
/* already heard this proc is dead, so just do the callback */
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, t2->cbfunc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
}
return;
}
@ -179,6 +184,7 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data)
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = evb;
t2->cbfunc = callback;
t2->cbdata = data;
opal_list_append(&pending_cbs, &t2->super);
@ -254,11 +260,15 @@ static void wait_signal_callback(int fd, short event, void *arg)
if (pid == t2->child->pid) {
/* found it! */
t2->child->exit_code = status;
if (NULL != t2->cbfunc) {
t2->cbfunc(t2->child, t2->cbdata);
}
opal_list_remove_item(&pending_cbs, &t2->super);
OBJ_RELEASE(t2);
if (NULL != t2->cbfunc) {
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, t2->cbfunc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
} else {
OBJ_RELEASE(t2);
}
break;
}
}

Просмотреть файл

@ -53,7 +53,18 @@
BEGIN_C_DECLS
/** typedef for callback function used in \c orte_wait_cb */
typedef void (*orte_wait_fn_t)(orte_proc_t *proc, void *data);
typedef void (*orte_wait_cbfunc_t)(int fd, short args, void* cb);
/* define a tracker - one per pending waitpid callback. The tracker itself
 * is passed as the cbdata argument of the event callback, so the callback
 * recovers the proc via ->child and is responsible for releasing the tracker. */
typedef struct {
opal_list_item_t super; /* list linkage; trackers are appended to pending_cbs */
opal_event_t ev; /* event activated to deliver the callback */
opal_event_base_t *evb; /* event base on which ev is set */
orte_proc_t *child; /* proc being waited on; retained when the tracker is set up */
orte_wait_cbfunc_t cbfunc; /* event-style callback (fd, args, cbdata); may be NULL */
void *cbdata; /* caller-supplied data stored alongside the callback */
} orte_wait_tracker_t;
OBJ_CLASS_DECLARATION(orte_wait_tracker_t);
/**
* Disable / re-Enable SIGCHLD handler
@ -71,7 +82,8 @@ ORTE_DECLSPEC void orte_wait_disable(void);
* \c waitpid() will have already been called on the process at this
* time.
*/
ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_fn_t callback, void *data);
ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_cbfunc_t callback,
opal_event_base_t *evb, void *data);
ORTE_DECLSPEC void orte_wait_cb_cancel(orte_proc_t *proc);