diff --git a/orte/mca/errmgr/default_orted/errmgr_default_orted.c b/orte/mca/errmgr/default_orted/errmgr_default_orted.c index 7d131559a9..7583836712 100644 --- a/orte/mca/errmgr/default_orted/errmgr_default_orted.c +++ b/orte/mca/errmgr/default_orted/errmgr_default_orted.c @@ -46,6 +46,7 @@ #include "orte/mca/ess/ess.h" #include "orte/mca/state/state.h" +#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_quit.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/data_type_support/orte_dt_support.h" @@ -327,6 +328,7 @@ static void proc_errors(int fd, short args, void *cbdata) orte_plm_cmd_flag_t cmd; int rc=ORTE_SUCCESS; int i; + orte_wait_tracker_t *t2; ORTE_ACQUIRE_OBJECT(caddy); @@ -412,7 +414,14 @@ static void proc_errors(int fd, short args, void *cbdata) goto cleanup; } /* leave the exit code alone - process this as a waitpid */ - ompi_odls_base_default_wait_local_proc(child, NULL); + t2 = OBJ_NEW(orte_wait_tracker_t); + OBJ_RETAIN(child); // protect against race conditions + t2->child = child; + t2->evb = orte_event_base; + opal_event_set(t2->evb, &t2->ev, -1, + OPAL_EV_WRITE, orte_odls_base_default_wait_local_proc, t2); + opal_event_set_priority(&t2->ev, ORTE_MSG_PRI); + opal_event_active(&t2->ev, OPAL_EV_WRITE, 1); goto cleanup; } OPAL_OUTPUT_VERBOSE((2, orte_errmgr_base_framework.framework_output, diff --git a/orte/mca/odls/base/base.h b/orte/mca/odls/base/base.h index 83e382b2c5..2f86eb69ba 100644 --- a/orte/mca/odls/base/base.h +++ b/orte/mca/odls/base/base.h @@ -11,6 +11,7 @@ * All rights reserved. * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -44,5 +45,9 @@ ORTE_DECLSPEC extern mca_base_framework_t orte_odls_base_framework; */ ORTE_DECLSPEC int orte_odls_base_select(void); +ORTE_DECLSPEC void orte_odls_base_start_threads(orte_job_t *jdata); + +ORTE_DECLSPEC void orte_odls_base_harvest_threads(void); + END_C_DECLS #endif diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 3e80217e5f..1818a94fa7 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -15,7 +15,7 @@ * All rights reserved. * Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science + * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2017 Mellanox Technologies Ltd. All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. @@ -614,6 +614,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer, goto REPORT_ERROR; } + /* spin up the spawn threads */ + orte_odls_base_start_threads(jdata); + /* to save memory, purge the job map of all procs other than * our own - for daemons, this will completely release the * proc structures. For the HNP, the proc structs will @@ -727,9 +730,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata) int rc, i; bool found; orte_proc_state_t state; - char **argvptr; - char *pathenv = NULL, *mpiexec_pathenv = NULL; - char *full_search; ORTE_ACQUIRE_OBJECT(cd); @@ -772,44 +772,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata) goto errorout; } - /* Search for the OMPI_exec_path and PATH settings in the environment. */ - for (argvptr = app->env; *argvptr != NULL; argvptr++) { - if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) { - mpiexec_pathenv = *argvptr + 15; - } - if (0 == strncmp("PATH=", *argvptr, 5)) { - pathenv = *argvptr + 5; - } - } - - /* If OMPI_exec_path is set (meaning --path was used), then create a - temporary environment to be used in the search for the executable. - The PATH setting in this temporary environment is a combination of - the OMPI_exec_path and PATH values. If OMPI_exec_path is not set, - then just use existing environment with PATH in it. */ - if (NULL != mpiexec_pathenv) { - argvptr = NULL; - if (pathenv != NULL) { - asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv); - } else { - asprintf(&full_search, "%s", mpiexec_pathenv); - } - opal_setenv("PATH", full_search, true, &argvptr); - free(full_search); - } else { - argvptr = app->env; - } - - rc = orte_util_check_context_app(app, argvptr); - /* do not ERROR_LOG - it will be reported elsewhere */ - if (NULL != mpiexec_pathenv) { - opal_argv_free(argvptr); - } - if (ORTE_SUCCESS != rc) { - state = ORTE_PROC_STATE_FAILED_TO_LAUNCH; - goto errorout; - } - /* did the user request we display output in xterms? */ if (NULL != orte_xterm && !ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) { opal_list_item_t *nmitem; @@ -878,15 +840,14 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata) cd->argv[0] = param; } - if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) { - opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&child->name)); + opal_output_verbose(5, orte_odls_base_framework.framework_output, + "%s odls:launch spawning child %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&child->name)); + if (15 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) { /* dump what is going to be exec'd */ - if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) { - opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT); - } + opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT); } if (ORTE_SUCCESS != (rc = cd->fork_local(cd))) { @@ -923,6 +884,9 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) orte_odls_spawn_caddy_t *cd; opal_event_base_t *evb; char *effective_dir = NULL; + char **argvptr; + char *pathenv = NULL, *mpiexec_pathenv = NULL; + char *full_search; ORTE_ACQUIRE_OBJECT(caddy); @@ -1105,6 +1069,44 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) goto GETOUT; } + /* Search for the OMPI_exec_path and PATH settings in the environment. */ + for (argvptr = app->env; *argvptr != NULL; argvptr++) { + if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) { + mpiexec_pathenv = *argvptr + 15; + } + if (0 == strncmp("PATH=", *argvptr, 5)) { + pathenv = *argvptr + 5; + } + } + + /* If OMPI_exec_path is set (meaning --path was used), then create a + temporary environment to be used in the search for the executable. + The PATH setting in this temporary environment is a combination of + the OMPI_exec_path and PATH values. If OMPI_exec_path is not set, + then just use existing environment with PATH in it. */ + if (NULL != mpiexec_pathenv) { + argvptr = NULL; + if (pathenv != NULL) { + asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv); + } else { + asprintf(&full_search, "%s", mpiexec_pathenv); + } + opal_setenv("PATH", full_search, true, &argvptr); + free(full_search); + } else { + argvptr = app->env; + } + + rc = orte_util_check_context_app(app, argvptr); + /* do not ERROR_LOG - it will be reported elsewhere */ + if (NULL != mpiexec_pathenv) { + opal_argv_free(argvptr); + } + if (ORTE_SUCCESS != rc) { + goto GETOUT; + } + + /* tell all children that they are being launched via ORTE */ opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env); @@ -1186,10 +1188,17 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&child->name))); + /* determine the thread that will handle this child */ + ++orte_odls_globals.next_base; + if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) { + orte_odls_globals.next_base = 0; + } + evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base]; + /* set the waitpid callback here for thread protection and * to ensure we can capture the callback on shortlived apps */ ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE); - orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL); + orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL); /* dispatch this child to the next available launch thread */ cd = OBJ_NEW(orte_odls_spawn_caddy_t); @@ -1228,16 +1237,11 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) goto GETOUT; } } - ++orte_odls_globals.next_base; - if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) { - orte_odls_globals.next_base = 0; - } opal_output_verbose(1, orte_odls_base_framework.framework_output, "%s odls:dispatch %s to thread %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&child->name), orte_odls_globals.next_base); - evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base]; opal_event_set(evb, &cd->ev, -1, OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd); opal_event_set_priority(&cd->ev, ORTE_MSG_PRI); @@ -1255,11 +1259,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) free(effective_dir); effective_dir = NULL; } - /* tell the state machine that all local procs for this job - * were launched so that it can do whatever it needs to do, - * like send a state update message for all procs to the HNP - */ - ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE); ERROR_OUT: /* ensure we reset our working directory back to our default location */ @@ -1323,8 +1322,10 @@ int orte_odls_base_default_signal_local_procs(const orte_process_name_t *proc, i * Wait for a callback indicating the child has completed. */ -void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata) +void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata) { + orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata; + orte_proc_t *proc = t2->child; int i; orte_job_t *jobdat; orte_proc_state_t state=ORTE_PROC_STATE_WAITPID_FIRED; @@ -1528,6 +1529,8 @@ void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata) /* cancel the wait as this proc has already terminated */ orte_wait_cb_cancel(proc); ORTE_ACTIVATE_PROC_STATE(&proc->name, state); + /* cleanup the tracker */ + OBJ_RELEASE(t2); } typedef struct { @@ -1903,17 +1906,17 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child, goto CLEANUP; } } - orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL); - ++orte_odls_globals.next_base; if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) { orte_odls_globals.next_base = 0; } + evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base]; + orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL); + OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output, "%s restarting app %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app)); - evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base]; opal_event_set(evb, &cd->ev, -1, OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd); opal_event_set_priority(&cd->ev, ORTE_MSG_PRI); diff --git a/orte/mca/odls/base/odls_base_frame.c b/orte/mca/odls/base/odls_base_frame.c index 810cf43131..cb1cb3ae69 100644 --- a/orte/mca/odls/base/odls_base_frame.c +++ b/orte/mca/odls/base/odls_base_frame.c @@ -13,7 +13,7 @@ * Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2014-2015 Research Organization for Information Science + * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ @@ -39,12 +39,13 @@ #include "opal/util/argv.h" #include "orte/mca/errmgr/errmgr.h" -#include "orte/mca/plm/plm_types.h" -#include "orte/util/name_fns.h" -#include "orte/runtime/orte_globals.h" -#include "orte/util/show_help.h" -#include "orte/util/parse_options.h" #include "orte/mca/ess/ess.h" +#include "orte/mca/plm/plm_types.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/util/parse_options.h" +#include "orte/util/show_help.h" +#include "orte/util/threads.h" #include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/base/base.h" @@ -78,14 +79,30 @@ static int orte_odls_base_register(mca_base_register_flag_t flags) MCA_BASE_VAR_SCOPE_READONLY, &orte_odls_globals.timeout_before_sigkill); - orte_odls_globals.num_threads = 0; + orte_odls_globals.max_threads = 4; + (void) mca_base_var_register("orte", "odls", "base", "max_threads", + "Maximum number of threads to use for spawning local procs", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &orte_odls_globals.max_threads); + + orte_odls_globals.num_threads = -1; (void) mca_base_var_register("orte", "odls", "base", "num_threads", - "Number of threads to use for spawning local procs", + "Specific number of threads to use for spawning local procs", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &orte_odls_globals.num_threads); + orte_odls_globals.cutoff = 32; + (void) mca_base_var_register("orte", "odls", "base", "cutoff", + "Minimum number of local procs before using thread pool for spawn", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &orte_odls_globals.cutoff); + orte_odls_globals.signal_direct_children_only = false; (void) mca_base_var_register("orte", "odls", "base", "signal_direct_children_only", "Whether to restrict signals (e.g., SIGTERM) to direct children, or " @@ -98,6 +115,73 @@ static int orte_odls_base_register(mca_base_register_flag_t flags) return ORTE_SUCCESS; } +void orte_odls_base_harvest_threads(void) +{ + int i; + + ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock); + if (0 < orte_odls_globals.num_threads) { + /* stop the progress threads */ + for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) { + opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]); + } + free(orte_odls_globals.ev_bases); + orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*)); + /* use the default event base */ + orte_odls_globals.ev_bases[0] = orte_event_base; + orte_odls_globals.num_threads = 0; + if (NULL != orte_odls_globals.ev_threads) { + opal_argv_free(orte_odls_globals.ev_threads); + orte_odls_globals.ev_threads = NULL; + } + } + ORTE_RELEASE_THREAD(&orte_odls_globals.lock); +} + +void orte_odls_base_start_threads(orte_job_t *jdata) +{ + int i; + char *tmp; + + ORTE_ACQUIRE_THREAD(&orte_odls_globals.lock); + /* only do this once */ + if (NULL != orte_odls_globals.ev_threads) { + ORTE_RELEASE_THREAD(&orte_odls_globals.lock); + return; + } + + /* setup the pool of worker threads */ + orte_odls_globals.ev_threads = NULL; + orte_odls_globals.next_base = 0; + if (0 == orte_odls_globals.num_threads || + (int)jdata->num_local_procs < orte_odls_globals.cutoff) { + orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*)); + /* use the default event base */ + orte_odls_globals.ev_bases[0] = orte_event_base; + } else { + if (-1 == orte_odls_globals.num_threads) { + /* user didn't specify anything, so default to some fraction of + * the number of local procs, capping it at the max num threads + * parameter value. */ + orte_odls_globals.num_threads = jdata->num_local_procs / 8; + if (0 == orte_odls_globals.num_threads) { + orte_odls_globals.num_threads = 1; + } else if (orte_odls_globals.max_threads < orte_odls_globals.num_threads) { + orte_odls_globals.num_threads = orte_odls_globals.max_threads; + } + } + orte_odls_globals.ev_bases = + (opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*)); + for (i=0; i < orte_odls_globals.num_threads; i++) { + asprintf(&tmp, "ORTE-ODLS-%d", i); + orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp); + opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp); + free(tmp); + } + } + ORTE_RELEASE_THREAD(&orte_odls_globals.lock); +} + static int orte_odls_base_close(void) { int i; @@ -118,14 +202,9 @@ static int orte_odls_base_close(void) } OBJ_RELEASE(orte_local_children); - if (0 < orte_odls_globals.num_threads) { - /* stop the progress threads */ - for (i=0; NULL != orte_odls_globals.ev_threads[i]; i++) { - opal_progress_thread_finalize(orte_odls_globals.ev_threads[i]); - } - } - free(orte_odls_globals.ev_bases); - opal_argv_free(orte_odls_globals.ev_threads); + orte_odls_base_harvest_threads(); + + ORTE_DESTRUCT_LOCK(&orte_odls_globals.lock); return mca_base_framework_components_close(&orte_odls_base_framework, NULL); } @@ -141,6 +220,9 @@ static int orte_odls_base_open(mca_base_open_flag_t flags) orte_namelist_t *nm; bool xterm_hold; + ORTE_CONSTRUCT_LOCK(&orte_odls_globals.lock); + orte_odls_globals.lock.active = false; // start with nobody having the thread + /* initialize the global array of local children */ orte_local_children = OBJ_NEW(opal_pointer_array_t); if (OPAL_SUCCESS != (rc = opal_pointer_array_init(orte_local_children, @@ -202,25 +284,6 @@ static int orte_odls_base_open(mca_base_open_flag_t flags) opal_argv_append_nosize(&orte_odls_globals.xtermcmd, "-e"); } - /* setup the pool of worker threads */ - orte_odls_globals.ev_threads = NULL; - orte_odls_globals.next_base = 0; - if (0 == orte_odls_globals.num_threads) { - orte_odls_globals.ev_bases = (opal_event_base_t**)malloc(sizeof(opal_event_base_t*)); - /* use the default event base */ - orte_odls_globals.ev_bases[0] = orte_event_base; - } else { - orte_odls_globals.ev_bases = - (opal_event_base_t**)malloc(orte_odls_globals.num_threads * sizeof(opal_event_base_t*)); - for (i=0; i < orte_odls_globals.num_threads; i++) { - asprintf(&tmp, "ORTE-ODLS-%d", i); - orte_odls_globals.ev_bases[i] = opal_progress_thread_init(tmp); - opal_argv_append_nosize(&orte_odls_globals.ev_threads, tmp); - free(tmp); - } - - } - /* Open up all available components */ return mca_base_framework_components_open(&orte_odls_base_framework, flags); } diff --git a/orte/mca/odls/base/odls_private.h b/orte/mca/odls/base/odls_private.h index de7df05ff3..8590137ade 100644 --- a/orte/mca/odls/base/odls_private.h +++ b/orte/mca/odls/base/odls_private.h @@ -14,6 +14,8 @@ * reserved. * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. + * Copyright (c) 2017 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -40,7 +42,7 @@ #include "orte/mca/iof/base/iof_base_setup.h" #include "orte/mca/rml/rml_types.h" #include "orte/runtime/orte_globals.h" - +#include "orte/util/threads.h" #include "orte/mca/odls/odls_types.h" BEGIN_C_DECLS @@ -59,11 +61,14 @@ typedef struct { /* the xterm cmd to be used */ char **xtermcmd; /* thread pool */ + int max_threads; int num_threads; + int cutoff; opal_event_base_t **ev_bases; // event base array for progress threads char** ev_threads; // event progress thread names int next_base; // counter to load-level thread use bool signal_direct_children_only; + orte_lock_t lock; } orte_odls_globals_t; ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals; @@ -127,7 +132,7 @@ OBJ_CLASS_DECLARATION(orte_odls_launch_local_t); ORTE_DECLSPEC void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata); -ORTE_DECLSPEC void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata); +ORTE_DECLSPEC void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata); /* define a function type to signal a local proc */ typedef int (*orte_odls_base_signal_local_fn_t)(pid_t pid, int signum); diff --git a/orte/mca/plm/alps/plm_alps_module.c b/orte/mca/plm/alps/plm_alps_module.c index c77704e6da..860409eca9 100644 --- a/orte/mca/plm/alps/plm_alps_module.c +++ b/orte/mca/plm/alps/plm_alps_module.c @@ -14,6 +14,8 @@ * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2017 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -517,7 +519,9 @@ static int plm_alps_finalize(void) } -static void alps_wait_cb(orte_proc_t *proc, void* cbdata){ +static void alps_wait_cb(int sd, short args, void *cbdata) { + orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata; + orte_proc_t *proc = t2->child; orte_job_t *jdata; /* According to the ALPS folks, alps always returns the highest exit @@ -550,6 +554,7 @@ static void alps_wait_cb(orte_proc_t *proc, void* cbdata){ ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ABORTED); } } + OBJ_RELEASE(t2); } @@ -575,7 +580,7 @@ static int plm_alps_start_proc(int argc, char **argv, char **env, /* be sure to mark it as alive so we don't instantly fire */ ORTE_FLAG_SET(alpsrun, ORTE_PROC_FLAG_ALIVE); /* setup the waitpid so we can find out if alps succeeds! */ - orte_wait_cb(alpsrun, alps_wait_cb, NULL); + orte_wait_cb(alpsrun, alps_wait_cb, orte_event_base, NULL); if (0 == alps_pid) { /* child */ char *bin_base = NULL, *lib_base = NULL; diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index fbed5caa29..ad698d138f 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -49,6 +49,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" #include "orte/mca/iof/base/base.h" +#include "orte/mca/odls/base/base.h" #include "orte/mca/ras/base/base.h" #include "orte/mca/rmaps/rmaps.h" #include "orte/mca/rmaps/base/base.h" @@ -56,7 +57,6 @@ #include "orte/mca/rml/rml_types.h" #include "orte/mca/routed/routed.h" #include "orte/mca/grpcomm/base/base.h" -#include "orte/mca/odls/odls.h" #if OPAL_ENABLE_FT_CR == 1 #include "orte/mca/snapc/base/base.h" #endif diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c index 92ee33e21d..7b5ae93a1a 100644 --- a/orte/mca/plm/rsh/plm_rsh_module.c +++ b/orte/mca/plm/rsh/plm_rsh_module.c @@ -259,10 +259,11 @@ static int rsh_init(void) /** * Callback on daemon exit. */ -static void rsh_wait_daemon(orte_proc_t *daemon, void* cbdata) +static void rsh_wait_daemon(int sd, short flags, void *cbdata) { orte_job_t *jdata; orte_plm_rsh_caddy_t *caddy=(orte_plm_rsh_caddy_t*)cbdata; + orte_proc_t *daemon = caddy->daemon; char *rtmod; if (orte_orteds_term_ordered || orte_abnormal_term_ordered) { @@ -938,7 +939,7 @@ static void process_launch_list(int fd, short args, void *cbdata) caddy = (orte_plm_rsh_caddy_t*)item; /* register the sigchild callback */ ORTE_FLAG_SET(caddy->daemon, ORTE_PROC_FLAG_ALIVE); - orte_wait_cb(caddy->daemon, rsh_wait_daemon, (void*)caddy); + orte_wait_cb(caddy->daemon, rsh_wait_daemon, orte_event_base, (void*)caddy); /* fork a child to exec the rsh/ssh session */ pid = fork(); diff --git a/orte/mca/plm/slurm/plm_slurm_module.c b/orte/mca/plm/slurm/plm_slurm_module.c index 5ac4fed36a..593b80e1a9 100644 --- a/orte/mca/plm/slurm/plm_slurm_module.c +++ b/orte/mca/plm/slurm/plm_slurm_module.c @@ -524,7 +524,9 @@ static int plm_slurm_finalize(void) } -static void srun_wait_cb(orte_proc_t *proc, void* cbdata){ +static void srun_wait_cb(int sd, short fd, void *cbdata){ + orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata; + orte_proc_t *proc = t2->child; orte_job_t *jdata; /* According to the SLURM folks, srun always returns the highest exit @@ -576,7 +578,7 @@ static void srun_wait_cb(orte_proc_t *proc, void* cbdata){ } /* done with this dummy */ - OBJ_RELEASE(proc); + OBJ_RELEASE(t2); } @@ -613,7 +615,7 @@ static int plm_slurm_start_proc(int argc, char **argv, char **env, /* be sure to mark it as alive so we don't instantly fire */ ORTE_FLAG_SET(dummy, ORTE_PROC_FLAG_ALIVE); /* setup the waitpid so we can find out if srun succeeds! */ - orte_wait_cb(dummy, srun_wait_cb, NULL); + orte_wait_cb(dummy, srun_wait_cb, orte_event_base, NULL); if (0 == srun_pid) { /* child */ char *bin_base = NULL, *lib_base = NULL; diff --git a/orte/mca/state/orted/state_orted.c b/orte/mca/state/orted/state_orted.c index 39b0248588..417399d3b2 100644 --- a/orte/mca/state/orted/state_orted.c +++ b/orte/mca/state/orted/state_orted.c @@ -23,6 +23,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/iof/base/base.h" +#include "orte/mca/odls/base/base.h" #include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/rml/rml.h" #include "orte/mca/routed/routed.h" @@ -288,6 +289,13 @@ static void track_procs(int fd, short argc, void *cbdata) /* update the proc state */ pdata->state = state; jdata->num_launched++; + if (jdata->num_launched == jdata->num_local_procs) { + /* tell the state machine that all local procs for this job + * were launched so that it can do whatever it needs to do, + * like send a state update message for all procs to the HNP + */ + ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE); + } /* don't update until we are told that all are done */ } else if (ORTE_PROC_STATE_REGISTERED == state) { /* update the proc state */ diff --git a/orte/runtime/orte_wait.c b/orte/runtime/orte_wait.c index 2e10e8770d..bed373d615 100644 --- a/orte/runtime/orte_wait.c +++ b/orte/runtime/orte_wait.c @@ -80,14 +80,7 @@ OBJ_CLASS_INSTANCE(orte_timer_t, timer_const, timer_dest); -/* Local objects */ -typedef struct { - opal_list_item_t super; - opal_event_t ev; - orte_proc_t *child; - orte_wait_fn_t cbfunc; - void *cbdata; -} orte_wait_tracker_t; + static void wccon(orte_wait_tracker_t *p) { p->child = NULL; @@ -100,9 +93,9 @@ static void wcdes(orte_wait_tracker_t *p) OBJ_RELEASE(p->child); } } -static OBJ_CLASS_INSTANCE(orte_wait_tracker_t, - opal_list_item_t, - wccon, wcdes); +OBJ_CLASS_INSTANCE(orte_wait_tracker_t, + opal_list_item_t, + wccon, wcdes); /* Local Variables */ static opal_event_t handler; @@ -150,7 +143,8 @@ int orte_wait_finalize(void) /* this function *must* always be called from * within an event in the orte_event_base */ -void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data) +void orte_wait_cb(orte_proc_t *child, orte_wait_cbfunc_t callback, + opal_event_base_t *evb, void *data) { orte_wait_tracker_t *t2; @@ -162,8 +156,19 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data) /* see if this proc is still alive */ if (!ORTE_FLAG_TEST(child, ORTE_PROC_FLAG_ALIVE)) { - /* already heard this proc is dead, so just do the callback */ - callback(child, data); + if (NULL != callback) { + /* already heard this proc is dead, so just do the callback */ + t2 = OBJ_NEW(orte_wait_tracker_t); + OBJ_RETAIN(child); // protect against race conditions + t2->child = child; + t2->evb = evb; + t2->cbfunc = callback; + t2->cbdata = data; + opal_event_set(t2->evb, &t2->ev, -1, + OPAL_EV_WRITE, t2->cbfunc, t2); + opal_event_set_priority(&t2->ev, ORTE_MSG_PRI); + opal_event_active(&t2->ev, OPAL_EV_WRITE, 1); + } return; } @@ -179,6 +184,7 @@ void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data) t2 = OBJ_NEW(orte_wait_tracker_t); OBJ_RETAIN(child); // protect against race conditions t2->child = child; + t2->evb = evb; t2->cbfunc = callback; t2->cbdata = data; opal_list_append(&pending_cbs, &t2->super); @@ -254,11 +260,15 @@ static void wait_signal_callback(int fd, short event, void *arg) if (pid == t2->child->pid) { /* found it! */ t2->child->exit_code = status; - if (NULL != t2->cbfunc) { - t2->cbfunc(t2->child, t2->cbdata); - } opal_list_remove_item(&pending_cbs, &t2->super); - OBJ_RELEASE(t2); + if (NULL != t2->cbfunc) { + opal_event_set(t2->evb, &t2->ev, -1, + OPAL_EV_WRITE, t2->cbfunc, t2); + opal_event_set_priority(&t2->ev, ORTE_MSG_PRI); + opal_event_active(&t2->ev, OPAL_EV_WRITE, 1); + } else { + OBJ_RELEASE(t2); + } break; } } diff --git a/orte/runtime/orte_wait.h b/orte/runtime/orte_wait.h index b8283f15ba..922f936bbf 100644 --- a/orte/runtime/orte_wait.h +++ b/orte/runtime/orte_wait.h @@ -53,7 +53,18 @@ BEGIN_C_DECLS /** typedef for callback function used in \c orte_wait_cb */ -typedef void (*orte_wait_fn_t)(orte_proc_t *proc, void *data); +typedef void (*orte_wait_cbfunc_t)(int fd, short args, void* cb); + +/* define a tracker */ +typedef struct { + opal_list_item_t super; + opal_event_t ev; + opal_event_base_t *evb; + orte_proc_t *child; + orte_wait_cbfunc_t cbfunc; + void *cbdata; +} orte_wait_tracker_t; +OBJ_CLASS_DECLARATION(orte_wait_tracker_t); /** * Disable / re-Enable SIGCHLD handler @@ -71,7 +82,8 @@ ORTE_DECLSPEC void orte_wait_disable(void); * \c waitpid() will have already been called on the process at this * time. */ -ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_fn_t callback, void *data); +ORTE_DECLSPEC void orte_wait_cb(orte_proc_t *proc, orte_wait_cbfunc_t callback, + opal_event_base_t *evb, void *data); ORTE_DECLSPEC void orte_wait_cb_cancel(orte_proc_t *proc);