diff --git a/orte/mca/odls/process/odls_process.h b/orte/mca/odls/process/odls_process.h index d9c15b7914..991ee080a5 100755 --- a/orte/mca/odls/process/odls_process.h +++ b/orte/mca/odls/process/odls_process.h @@ -32,35 +32,47 @@ orte_odls_base_module_t* orte_odls_process_component_init(int *priority); /* * Startup / Shutdown */ -int orte_odls_process_finalize(void); - - -/* - * Interface - */ -int orte_odls_process_launch(orte_jobid_t); -int orte_odls_process_terminate_job(orte_jobid_t); -int orte_odls_process_terminate_proc(const orte_process_name_t* proc_name); -int orte_odls_process_signal_job(orte_jobid_t, int32_t); -int orte_odls_process_signal_proc(const orte_process_name_t* proc_name, int32_t signal); +int orte_odls_process_component_finalize(void); /** - * ODLS Component + * ODLS Process globals */ -struct orte_odls_process_component_t { - orte_odls_base_component_t super; - int debug; - int priority; - int reap; - int timeout_before_sigkill; - int num_children; - opal_mutex_t lock; +typedef struct orte_odls_process_globals_t { + opal_mutex_t mutex; opal_condition_t cond; -}; -typedef struct orte_odls_process_component_t orte_odls_process_component_t; + opal_list_t children; +} orte_odls_process_globals_t; +extern orte_odls_process_globals_t orte_odls_process; -ORTE_MODULE_DECLSPEC extern orte_odls_process_component_t mca_odls_process_component; +/* + * List object to locally store the process names and pids of + * our children. This can subsequently be used to order termination + * or pass signals without looking the info up again. + */ +typedef struct odls_process_child_t { + opal_list_item_t super; /* required to place this on a list */ + orte_process_name_t *name; /* the OpenRTE name of the proc */ + pid_t pid; /* local pid of the proc */ + orte_std_cntr_t app_idx; /* index of the app_context for this proc */ + bool alive; /* is this proc alive? */ +} odls_process_child_t; +OBJ_CLASS_DECLARATION(odls_process_child_t); + +/* + * List object to locally store app_contexts returned by the + * registry subscription. Since we don't know how many app_contexts will + * be returned, we need to store them on a list. + */ +typedef struct odls_process_app_context_t { + opal_list_item_t super; /* required to place this on a list */ + orte_app_context_t *app_context; +} odls_process_app_context_t; +OBJ_CLASS_DECLARATION(odls_process_app_context_t); + +/* + * ODLS Process module + */ extern orte_odls_base_module_t orte_odls_process_module; diff --git a/orte/mca/odls/process/odls_process_component.c b/orte/mca/odls/process/odls_process_component.c index 19fa07af7e..772ea69216 100755 --- a/orte/mca/odls/process/odls_process_component.c +++ b/orte/mca/odls/process/odls_process_component.c @@ -24,12 +24,32 @@ #include "orte/mca/odls/odls.h" #include "orte/mca/odls/process/odls_process.h" +/* Instantiate the component globals */ +orte_odls_process_globals_t orte_odls_process; -/* - * Public string showing the odls ompi_process component version number - */ -const char *mca_odls_process_component_version_string = - "Open MPI process odls MCA component version " ORTE_VERSION; + +/* instance the child list object */ +static void odls_process_child_constructor(odls_process_child_t *ptr) +{ + ptr->name = NULL; + ptr->pid = 0; + ptr->app_idx = -1; + ptr->alive = false; +} +static void odls_process_child_destructor(odls_process_child_t *ptr) +{ + if (NULL != ptr->name) free(ptr->name); +} + +OBJ_CLASS_INSTANCE(odls_process_child_t, + opal_list_item_t, + odls_process_child_constructor, + odls_process_child_destructor); + +/* instance the app_context list object */ +OBJ_CLASS_INSTANCE(odls_process_app_context_t, + opal_list_item_t, + NULL, NULL); /* @@ -37,17 +57,14 @@ const char *mca_odls_process_component_version_string = * and pointers to our public functions in it */ -orte_odls_process_component_t mca_odls_process_component = { - { +orte_odls_base_component_t mca_odls_process_component = { /* First, the mca_component_t struct containing meta information - about the component itself */ - + about the component itself */ { - /* Indicate that we are a odls v1.0.0 component (which also - implies a specific MCA version) */ - - ORTE_ODLS_BASE_VERSION_1_0_0, - + /* Indicate that we are a odls v1.3.0 component (which also + implies a specific MCA version) */ + + ORTE_ODLS_BASE_VERSION_1_3_0, /* Component name and version */ "process", @@ -71,63 +88,47 @@ orte_odls_process_component_t mca_odls_process_component = { /* Initialization / querying functions */ - orte_odls_process_component_init - } + orte_odls_process_component_init, + orte_odls_process_component_finalize }; - - int orte_odls_process_component_open(void) { - mca_base_component_t *c = &mca_odls_process_component.super.odls_version; - /* initialize globals */ - OBJ_CONSTRUCT(&mca_odls_process_component.lock, opal_mutex_t); - OBJ_CONSTRUCT(&mca_odls_process_component.cond, opal_condition_t); + OBJ_CONSTRUCT(&orte_odls_process.mutex, opal_mutex_t); + OBJ_CONSTRUCT(&orte_odls_process.cond, opal_condition_t); + OBJ_CONSTRUCT(&orte_odls_process.children, opal_list_t); - /* lookup parameters */ - mca_base_param_reg_int(c, "reap", - "Whether to wait to reap all children before finalizing or not", - false, false, 1, &mca_odls_process_component.reap); - mca_base_param_reg_int(c, "reap_timeout", - "When killing children processes, first send a SIGTERM, then wait at least this timeout (in seconds), then send a SIGKILL", - false, false, 0, &mca_odls_process_component.timeout_before_sigkill); - mca_base_param_reg_int(c, "priority", - "Priority of this component", - false, false, 1, &mca_odls_process_component.priority); - mca_base_param_reg_int(c, "debug", - "Whether to enable debugging output or not", - false, false, 0, &mca_odls_process_component.debug); - if (mca_odls_process_component.debug == 0) { - int id = mca_base_param_register_int("debug",NULL,NULL,NULL,0); - int value; - mca_base_param_lookup_int(id,&value); - mca_odls_process_component.debug = (value > 0) ? 1 : 0; - } return ORTE_SUCCESS; } - orte_odls_base_module_t *orte_odls_process_component_init(int *priority) { - /* Only return a module if we're in the orted */ -#if 0 - if (orte_process_info.daemon) { - *priority = mca_odls_process_component.priority; - return &orte_odls_process_module; - } else { - return NULL; - } -#endif - *priority = mca_odls_process_component.priority; + /* the base open/select logic protects us against operation when + * we are NOT in a daemon, so we don't have to check that here + */ + + *priority = 1; + return &orte_odls_process_module; } - int orte_odls_process_component_close(void) { - OBJ_DESTRUCT(&mca_odls_process_component.lock); - OBJ_DESTRUCT(&mca_odls_process_component.cond); + OBJ_DESTRUCT(&orte_odls_process.mutex); + OBJ_DESTRUCT(&orte_odls_process.cond); + OBJ_DESTRUCT(&orte_odls_process.children); return ORTE_SUCCESS; } +int orte_odls_process_component_finalize(void) +{ + opal_list_item_t *item; + + /* cleanup state */ + while (NULL != (item = opal_list_remove_first(&orte_odls_process.children))) { + OBJ_RELEASE(item); + } + + return ORTE_SUCCESS; +} diff --git a/orte/mca/odls/process/odls_process_module.c b/orte/mca/odls/process/odls_process_module.c index aaf047dad2..4b41496084 100755 --- a/orte/mca/odls/process/odls_process_module.c +++ b/orte/mca/odls/process/odls_process_module.c @@ -18,6 +18,7 @@ #endif #include #include +#include #ifdef HAVE_SYS_WAIT_H #include #endif @@ -26,91 +27,160 @@ #include #endif #include -#ifdef HAVE_SYS_PARAM_H -#include -#endif -#ifdef HAVE_NETDB_H -#include -#endif -#include "orte/orte_constants.h" #include "opal/event/event.h" #include "opal/util/argv.h" #include "opal/util/output.h" -#include "opal/mca/paffinity/base/base.h" +#include "opal/util/os_path.h" #include "opal/util/show_help.h" #include "opal/util/path.h" -#include "opal/class/opal_value_array.h" +#include "opal/util/basename.h" +#include "opal/util/opal_environ.h" +#include "opal/mca/base/mca_base_param.h" +#include "opal/mca/paffinity/base/base.h" + +#include "orte/dss/dss.h" #include "orte/util/sys_info.h" #include "orte/util/univ_info.h" -#include "opal/util/opal_environ.h" #include "orte/util/session_dir.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/base/base.h" #include "orte/mca/iof/iof.h" #include "orte/mca/iof/base/iof_base_setup.h" -#include "opal/mca/base/mca_base_param.h" #include "orte/mca/ns/ns.h" #include "orte/mca/sds/base/base.h" -#include "orte/mca/odls/odls.h" -#include "orte/mca/odls/base/base.h" +#include "orte/mca/rmgr/rmgr.h" #include "orte/mca/rml/rml.h" #include "orte/mca/gpr/gpr.h" #include "orte/mca/rmaps/base/base.h" -#include "orte/mca/rmaps/base/rmaps_base_map.h" #include "orte/mca/smr/smr.h" -#include "orte/mca/smr/base/base.h" + +#include "orte/mca/odls/base/odls_private.h" #include "orte/mca/odls/process/odls_process.h" -#if !defined(__WINDOWS__) -extern char **environ; -#endif /* !defined(__WINDOWS__) */ - -#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS -static int orte_odls_process_launch_threaded(orte_jobid_t); -#endif - - -orte_odls_base_module_1_0_0_t orte_odls_process_module = { -#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS - orte_odls_process_launch_threaded, -#else - orte_odls_process_launch, -#endif - orte_odls_process_terminate_job, - orte_odls_process_terminate_proc, - orte_odls_process_signal_job, - orte_odls_process_signal_proc, - orte_odls_process_finalize -}; - static void set_handler_default(int sig); - -static bool orte_odls_process_child_died(pid_t pid, unsigned int timeout) +/* this entire function gets called within a GPR compound command, + * so the subscription actually doesn't get done until the orted + * executes the compound command + */ +static int orte_odls_process_subscribe_launch_data( orte_jobid_t job, + orte_gpr_notify_cb_fn_t cbfunc ) { -#if NOT_YET_AVAILABLE - time_t end; - pid_t ret; - end = time(NULL) + timeout; - do { - ret = waitpid(pid, NULL, WNOHANG); - if (pid == ret) { - /* It died -- return success */ - return true; - } else if (-1 == ret && ECHILD == errno) { - /* The pid no longer exists, so we'll call this "good - enough for government work" */ - return true; + char *segment; + orte_gpr_value_t *values[2]; + orte_gpr_subscription_t *subs, sub=ORTE_GPR_SUBSCRIPTION_EMPTY; + orte_gpr_trigger_t *trigs, trig=ORTE_GPR_TRIGGER_EMPTY; + char *glob_keys[] = { + ORTE_JOB_APP_CONTEXT_KEY, + ORTE_JOB_VPID_START_KEY, + ORTE_JOB_VPID_RANGE_KEY + }; + int num_glob_keys = 3; + char* keys[] = { + ORTE_PROC_NAME_KEY, + ORTE_PROC_APP_CONTEXT_KEY, + ORTE_NODE_NAME_KEY, + }; + int num_keys = 3; + int i, rc; + + /* get the job segment name */ + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* attach ourselves to the "standard" orted trigger */ + if (ORTE_SUCCESS != + (rc = orte_schema.get_std_trigger_name(&(trig.name), + ORTED_LAUNCH_STAGE_GATE_TRIGGER, job))) { + ORTE_ERROR_LOG(rc); + free(segment); + return rc; + } + + /* ask for return of all data required for launching local processes */ + subs = ⊂ + sub.action = ORTE_GPR_NOTIFY_DELETE_AFTER_TRIG; + if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&(sub.name), + ORTED_LAUNCH_STG_SUB, + job))) { + ORTE_ERROR_LOG(rc); + free(segment); + free(trig.name); + return rc; + } + sub.cnt = 2; + sub.values = values; + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[0]), ORTE_GPR_TOKENS_OR, segment, + num_glob_keys, 1))) { + ORTE_ERROR_LOG(rc); + free(segment); + free(sub.name); + free(trig.name); + return rc; + } + values[0]->tokens[0] = strdup(ORTE_JOB_GLOBALS); + for (i=0; i < num_glob_keys; i++) { + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[0]->keyvals[i]), + glob_keys[i], ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + free(segment); + free(sub.name); + free(trig.name); + OBJ_RELEASE(values[0]); + return rc; } + } + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&(values[1]), ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR, + segment, num_keys, 0))) { + ORTE_ERROR_LOG(rc); + free(segment); + free(sub.name); + free(trig.name); + OBJ_RELEASE(values[0]); + return rc; + } + for (i=0; i < num_keys; i++) { + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(values[1]->keyvals[i]), + keys[i], ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + free(segment); + free(sub.name); + free(trig.name); + OBJ_RELEASE(values[0]); + OBJ_RELEASE(values[1]); + return rc; + } + } + + sub.cbfunc = cbfunc; + + trigs = &trig; + + /* do the subscription */ + if (ORTE_SUCCESS != (rc = orte_gpr.subscribe(1, &subs, 1, &trigs))) { + ORTE_ERROR_LOG(rc); + } + free(segment); + free(sub.name); + free(trig.name); + OBJ_RELEASE(values[0]); + OBJ_RELEASE(values[1]); - /* Sleep for a second */ - sleep(1); - } while (time(NULL) < end); -#endif /* NOT_YET_AVAILABLE */ + return rc; +} + +static bool orte_odls_process_child_died( pid_t pid, unsigned int timeout, + int* exit_status ) +{ int error; - HANDLE handle = OpenProcess( PROCESS_TERMINATE | SYNCHRONIZE, FALSE, (DWORD)pid ); + HANDLE handle = OpenProcess( PROCESS_TERMINATE | SYNCHRONIZE, FALSE, + (DWORD)pid ); if( INVALID_HANDLE_VALUE == handle ) { error = GetLastError(); /* Let's suppose that the process dissapear ... by now */ @@ -121,94 +191,230 @@ static bool orte_odls_process_child_died(pid_t pid, unsigned int timeout) return false; } -static void orte_odls_process_kill_processes(opal_value_array_t *pids) +static int orte_odls_process_kill_local( pid_t pid, int sig_num ) { - size_t i; - pid_t pid; + if( false == TerminateProcess( (HANDLE)pid, 1 ) ) { + return (int)GetLastError(); + } + return 0; +} - for (i = 0; i < opal_value_array_get_size(pids); ++i) { - pid = OPAL_VALUE_ARRAY_GET_ITEM(pids, pid_t, i); +static int orte_odls_process_kill_local_procs(orte_jobid_t job, bool set_state) +{ + odls_process_child_t *child; + opal_list_item_t *item; + int rc, exit_status; + opal_list_t procs_killed; + orte_namelist_t *proc; + + OBJ_CONSTRUCT(&procs_killed, opal_list_t); + + /* since we are going to be working with the global list of + * children, we need to protect that list from modification + * by other threads + */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); + + for (item = opal_list_get_first(&orte_odls_process.children); + item != opal_list_get_end(&orte_odls_process.children); + item = opal_list_get_next(item)) { + child = (odls_process_child_t*)item; + + /* is this process alive? if not, then nothing for us + * to do to it + */ + if (!child->alive) { + continue; + } + + /* do we have a child from the specified job? Because the + * job could be given as a WILDCARD value, we must use + * the dss.compare function to check for equality. + */ + if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) { + continue; + } /* de-register the SIGCHILD callback for this pid */ - orte_wait_cb_cancel(pid); + orte_wait_cb_cancel(child->pid); - /* Send a sigterm to the process. If we get ESRCH back, that - means the process is already dead, so just proceed on to - the reaping of it. If we get any other error back, just - skip it and go on to the next process. */ - if( false == TerminateProcess( (HANDLE)pid, 1 ) ) { - DWORD err = GetLastError(); - char hostname[MAXHOSTNAMELEN]; - gethostname(hostname, sizeof(hostname)); - - opal_show_help("help-orte-odls-process.txt", - "orte-odls-process:could-not-send-kill", - true, hostname, pid, err); - - continue; + /* Send a sigterm to the process. */ + if (0 != orte_odls_process_kill_local(child->pid, SIGTERM)) { + int err = GetLastError(); + opal_show_help("help-odls-default.txt", + "odls-default:could-not-send-kill", + true, orte_system_info.nodename, child->pid, err); + goto MOVEON; } /* The kill succeeded. Wait up to timeout_before_sigkill seconds to see if it died. */ - if (!orte_odls_process_child_died(pid, mca_odls_process_component.timeout_before_sigkill)) { - char hostname[MAXHOSTNAMELEN]; - gethostname(hostname, sizeof(hostname)); - - opal_show_help("help-orte-odls-process.txt", - "orte-odls-process:could-not-kill", - true, hostname, pid); + if (!orte_odls_process_child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) { + /* try killing it again */ + orte_odls_process_kill_local(child->pid, SIGKILL); + /* Double check that it actually died this time */ + if (!orte_odls_process_child_died(child->pid, orte_odls_globals.timeout_before_sigkill, &exit_status)) { + opal_show_help("help-odls-default.txt", + "odls-default:could-not-kill", + true, orte_system_info.nodename, child->pid); + } } +MOVEON: + /* set the process to "not alive" */ + child->alive = false; + + /* add this proc to the local list */ + proc = OBJ_NEW(orte_namelist_t); + if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(proc->name), child->name, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + return rc; + } + opal_list_append(&procs_killed, &proc->item); } + + /* we are done with the global list, so we can now release + * any waiting threads - this also allows any callbacks to work + */ + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + + /* deconstruct the local list and update the process states on the registry, if indicated */ + while (NULL != (item = opal_list_remove_first(&procs_killed))) { + proc = (orte_namelist_t*)item; + if (set_state) { + if (ORTE_SUCCESS != (rc = orte_smr.set_proc_state(proc->name, ORTE_PROC_STATE_TERMINATED, exit_status))) { + ORTE_ERROR_LOG(rc); + /* don't exit out even if this didn't work - we still might need to kill more + * processes, so just keep trucking + */ + } + } + OBJ_RELEASE(proc); + } + + OBJ_DESTRUCT(&procs_killed); - /* Release any waiting threads from this process */ - OPAL_THREAD_LOCK(&mca_odls_process_component.lock); - mca_odls_process_component.num_children = 0; - opal_condition_signal(&mca_odls_process_component.cond); - OPAL_THREAD_UNLOCK(&mca_odls_process_component.lock); + return ORTE_SUCCESS; } /* * Wait for a callback indicating the child has completed. */ - -static void orte_odls_process_wait_proc(pid_t pid, int status, void* cbdata) +static void odls_process_wait_local_proc(pid_t pid, int status, void* cbdata) { - orte_rmaps_base_proc_t* proc = (orte_rmaps_base_proc_t*)cbdata; + odls_process_child_t *child; + opal_list_item_t *item; + bool aborted; + char *job, *vpid, *abort_file; + struct _stat buf; int rc; - /* Clean up the session directory as if we were the process - itself. This covers the case where the process died abnormally - and didn't cleanup its own session directory. */ - orte_session_dir_finalize(&proc->proc_name); + /* since we are going to be working with the global list of + * children, we need to protect that list from modification + * by other threads. This will also be used to protect us + * from race conditions on any abort situation + */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); + + /* find this child */ + for (item = opal_list_get_first(&orte_odls_process.children); + item != opal_list_get_end(&orte_odls_process.children); + item = opal_list_get_next(item)) { + child = (odls_process_child_t*)item; + if (child->alive && pid == child->pid) { /* found it */ + goto GOTCHILD; + } + } + /* get here if we didn't find the child, or if the specified child is already + * dead. If the latter, then we have a problem as it means we are detecting + * it exiting multiple times + */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + return; + +GOTCHILD: orte_iof.iof_flush(); - /* set the state of this process */ + /* determine the state of this process */ + aborted = false; if(WIFEXITED(status)) { - rc = orte_smr.set_proc_state(&proc->proc_name, ORTE_PROC_STATE_TERMINATED, status); + /* even though the process exited "normally", it is quite + * possible that this happened via an orte_abort call - in + * which case, we need to indicate this was an "abnormal" + * termination. See the note in "orte_abort.c" for + * an explanation of this process. + * + * For our purposes here, we need to check for the existence + * of an "abort" file in this process' session directory. If + * we find it, then we know that this was an abnormal termination. + */ + if (ORTE_SUCCESS != (rc = orte_ns.convert_jobid_to_string(&job, child->name->jobid))) { + ORTE_ERROR_LOG(rc); + goto MOVEON; + } + if (ORTE_SUCCESS != (rc = orte_ns.convert_vpid_to_string(&vpid, child->name->vpid))) { + ORTE_ERROR_LOG(rc); + free(job); + goto MOVEON; + } + abort_file = opal_os_path(false, orte_process_info.universe_session_dir, + job, vpid, "abort", NULL ); + free(job); + free(vpid); + if (0 == _stat(abort_file, &buf)) { + /* the abort file must exist - there is nothing in it we need. It's + * meer existence indicates that an abnormal termination occurred + */ + aborted = true; + free(abort_file); + } } else { - rc = orte_smr.set_proc_state(&proc->proc_name, ORTE_PROC_STATE_ABORTED, status); + /* the process was terminated with a signal! That's definitely + * abnormal, so indicate that condition + */ + aborted = true; } - if(ORTE_SUCCESS != rc) { + +MOVEON: + /* set this proc to "not alive" */ + child->alive = false; + + /* Clean up the session directory as if we were the process + * itself. This covers the case where the process died abnormally + * and didn't cleanup its own session directory. + */ + orte_session_dir_finalize(child->name); + + /* Need to unlock before we call set_proc_state as this is going to generate + * a trigger that will eventually callback to us + */ + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + + if (aborted) { + rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, status); + } else { + rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_TERMINATED, status); + } + + if (ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); } - OBJ_RELEASE(proc); - - /* release any waiting threads */ - OPAL_THREAD_LOCK(&mca_odls_process_component.lock); - mca_odls_process_component.num_children--; - opal_condition_signal(&mca_odls_process_component.cond); - OPAL_THREAD_UNLOCK(&mca_odls_process_component.lock); } /** * Fork/exec the specified processes */ -static int orte_odls_process_proc( +static int orte_odls_process_fork_local_proc( orte_app_context_t* context, - orte_rmaps_base_proc_t* proc, + odls_process_child_t *child, orte_vpid_t vpid_start, orte_vpid_t vpid_range, bool want_processor, @@ -217,7 +423,6 @@ static int orte_odls_process_proc( pid_t pid; orte_iof_base_io_conf_t opts; int rc; - orte_vpid_t vpid; int i = 0; char** environ_copy; char *param, *param2; @@ -229,20 +434,26 @@ static int orte_odls_process_proc( /* BWB - Fix post beta. Should setup stdin in orterun and make part of the app_context */ - if (ORTE_SUCCESS == orte_ns.get_vpid(&vpid, &proc->proc_name) && - vpid == 0) { + if( 0 == child->name->vpid ) { opts.connect_stdin = true; } else { opts.connect_stdin = false; } /* Try to change to the context cwd and check that the app - * exists and is executable - */ - if (ORTE_SUCCESS != orte_odls_base_check_context_cwd(context, true) || - ORTE_SUCCESS != orte_odls_base_check_context_app(context)) { - opal_show_help("help-orte-odls-process.txt", "orte-odls-process:execv-error", - true, context->app, strerror(errno)); + exists and is executable */ + if (ORTE_SUCCESS != (i = orte_rmgr.check_context_cwd(context, true))) { + opal_show_help("help-odls-default.txt", + "odls-default:chdir-error", + true, orte_system_info.nodename, context->cwd); + /* Tell the parent that Badness happened */ + return ORTE_ERR_FATAL; + } + if (ORTE_SUCCESS != (i = orte_rmgr.check_context_app(context))) { + opal_show_help("help-odls-default.txt", + "odls-default:argv0-not-accessible", + true, orte_system_info.nodename, context->app); + /* Tell the parent that Badness happened */ return ORTE_ERR_FATAL; } @@ -274,8 +485,20 @@ static int orte_odls_process_proc( opal_setenv("PATH", newenv, true, &environ_copy); free(newenv); } + + /* Reset LD_LIBRARY_PATH */ + else if (0 == strncmp("LD_LIBRARY_PATH=", context->env[i], 16)) { + asprintf(&newenv, "%s/lib:%s", + context->prefix_dir, context->env[i] + 16); + opal_setenv("LD_LIBRARY_PATH", newenv, true, &environ_copy); + free(newenv); + } } + param = mca_base_param_environ_variable("rmgr","bootproxy","jobid"); + opal_unsetenv(param, &environ_copy); + free(param); + if (want_processor) { param = mca_base_param_environ_variable("mpi", NULL, "paffinity_processor"); @@ -323,13 +546,15 @@ static int orte_odls_process_proc( opal_setenv(param, orte_system_info.nodename, true, &environ_copy); free(param); - /* push name into environment */ - orte_ns_nds_env_put(&proc->proc_name, vpid_start, vpid_range, + /* push name into environment */ + orte_ns_nds_env_put(child->name, vpid_start, vpid_range, &environ_copy); - param = mca_base_param_environ_variable("rmgr","bootproxy","jobid"); - opal_unsetenv(param, &environ_copy); - free(param); + if (context->argv == NULL) { + context->argv = (char**)malloc(sizeof(char*)*2); + context->argv[0] = strdup(context->app); + context->argv[1] = NULL; + } /* Flush all standard handles (stdin, stdout & stderr). */ _flushall(); @@ -338,27 +563,32 @@ static int orte_odls_process_proc( intptr_t handle = _spawnve( _P_NOWAIT, context->app, context->argv, environ_copy ); if( -1 == handle ) { + opal_show_help("help-orted-launcer.txt", "orted-launcher:execv-error", + true, context->app, "TODO: some error"); + orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, -1); return ORTE_ERROR; } pid = handle; } - /* save the pid in the registry */ - if (ORTE_SUCCESS != (rc = orte_odls_base_set_proc_pid(&proc->proc_name, pid))) { + /* set the proc state to LAUNCHED and increment that counter so the trigger can fire + */ + if (ORTE_SUCCESS != + (rc = orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_LAUNCHED, 0))) { ORTE_ERROR_LOG(rc); return rc; } + + /* save the pid and indicate we've been launched */ + child->pid = pid; + child->alive = true; /* wait for the child process - dont register for wait * callback until after I/O is setup and the pid registered - * otherwise can receive the wait callback before the above is * ever completed */ - OPAL_THREAD_LOCK(&mca_odls_process_component.lock); - mca_odls_process_component.num_children++; - OPAL_THREAD_UNLOCK(&mca_odls_process_component.lock); - OBJ_RETAIN(proc); - orte_wait_cb(pid, orte_odls_process_wait_proc, proc); + orte_wait_cb(pid, odls_process_wait_local_proc, NULL); return ORTE_SUCCESS; } @@ -367,352 +597,290 @@ static int orte_odls_process_proc( * Launch all processes allocated to the current node. */ -int orte_odls_process_launch(orte_jobid_t jobid) +static int orte_odls_process_launch_local_procs(orte_gpr_notify_data_t *data) { - opal_list_t map; - opal_list_item_t* item; - orte_vpid_t vpid_start; - orte_vpid_t vpid_range; int rc; - size_t num_processors, num_processes; + orte_std_cntr_t i, j, kv, kv2, *sptr; + orte_gpr_value_t *value, **values; + orte_gpr_keyval_t *kval; + orte_app_context_t *app; + orte_jobid_t job; + orte_vpid_t *vptr, start, range; + char *node_name; + opal_list_t app_context_list; + odls_process_child_t *child; + odls_process_app_context_t *app_item; + size_t num_processors; + bool want_processor; + opal_list_item_t *item, *item2; - /* query the allocation for this node */ - OBJ_CONSTRUCT(&map, opal_list_t); - rc = orte_rmaps_base_get_node_map( - orte_process_info.my_name->cellid,jobid,orte_system_info.nodename,&map); - if (ORTE_SUCCESS != rc) { + /* parse the returned data to create the required structures + * for a fork launch. Since the data will contain information + * on procs for ALL nodes, we first have to find the value + * struct that contains info for our node. + */ + + /* first, retrieve the job number we are to launch from the + * returned data - we can extract the jobid directly from the + * subscription name we created + */ + if (ORTE_SUCCESS != (rc = orte_schema.extract_jobid_from_std_trigger_name(&job, data->target))) { ORTE_ERROR_LOG(rc); - goto cleanup; + return rc; + } + /* We need to create a list of the app_contexts + * so we can know what to launch - the process info only gives + * us an index into the app_context array, not the app_context + * info itself. + */ + + OBJ_CONSTRUCT(&app_context_list, opal_list_t); + + /* set the default values to INVALID */ + start = ORTE_VPID_INVALID; + range = ORTE_VPID_INVALID; + + values = (orte_gpr_value_t**)(data->values)->addr; + for (j=0, i=0; i < data->cnt && j < (data->values)->size; j++) { /* loop through all returned values */ + if (NULL != values[j]) { + i++; + value = values[j]; + + if (0 == strcmp(value->tokens[0], ORTE_JOB_GLOBALS)) { + /* this came from the globals container, so it must contain + * the app_context(s), vpid_start, and vpid_range entries. Only one + * value object should ever come from that container + */ + for (kv=0; kv < value->cnt; kv++) { + kval = value->keyvals[kv]; + if (strcmp(kval->key, ORTE_JOB_VPID_START_KEY) == 0) { + /* this can only occur once, so just store it */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, kval->value, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + return rc; + } + start = *vptr; + continue; + } + if (strcmp(kval->key, ORTE_JOB_VPID_RANGE_KEY) == 0) { + /* this can only occur once, so just store it */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&vptr, kval->value, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + return rc; + } + range = *vptr; + continue; + } + if (strcmp(kval->key, ORTE_JOB_APP_CONTEXT_KEY) == 0) { + /* this can occur multiple times since we allow multiple + * app_contexts on the orterun command line. Add them + * to the list + */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&app, kval->value, ORTE_APP_CONTEXT))) { + ORTE_ERROR_LOG(rc); + return rc; + } + app_item = OBJ_NEW(odls_process_app_context_t); + if (NULL == app_item) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + app_item->app_context = app; + opal_list_append(&app_context_list, &app_item->super); + kval->value->data = NULL; /* protect the data storage from later release */ + } + } /* end for loop to process global data */ + } else { + /* this must have come from one of the process containers, so it must + * contain data for a proc structure - see if it + * belongs to this node + */ + for (kv=0; kv < value->cnt; kv++) { + kval = value->keyvals[kv]; + if (strcmp(kval->key, ORTE_NODE_NAME_KEY) == 0) { + /* Most C-compilers will bark if we try to directly compare the string in the + * kval data area against a regular string, so we need to "get" the data + * so we can access it */ + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&node_name, kval->value, ORTE_STRING))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* if this is our node...must also protect against a zero-length string */ + if (NULL != node_name && 0 == strcmp(node_name, orte_system_info.nodename)) { + /* ...harvest the info into a new child structure */ + child = OBJ_NEW(odls_process_child_t); + for (kv2 = 0; kv2 < value->cnt; kv2++) { + kval = value->keyvals[kv2]; + if(strcmp(kval->key, ORTE_PROC_NAME_KEY) == 0) { + /* copy the name into the child object */ + if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(child->name), kval->value->data, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + return rc; + } + continue; + } + if(strcmp(kval->key, ORTE_PROC_APP_CONTEXT_KEY) == 0) { + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, kval->value, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + child->app_idx = *sptr; /* save the index into the app_context objects */ + continue; + } + } /* kv2 */ + /* protect operation on the global list of children */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); + opal_list_append(&orte_odls_process.children, &child->super); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + + } + } + } /* for kv */ + } + } /* for j */ } - rc = orte_rmaps_base_get_vpid_range(jobid, &vpid_start, &vpid_range); - if (ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* are we oversubscribing? */ + /* determine if we are oversubscribed */ + want_processor = true; /* default to taking it for ourselves */ opal_paffinity_base_get_num_processors(&rc); - num_processors = (size_t) rc; - for (num_processes = 0, item = opal_list_get_first(&map); - item != opal_list_get_end(&map); - item = opal_list_get_next(item)) { - orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)item; - num_processes += map->num_procs; + num_processors = (size_t)rc; + if (opal_list_get_size(&orte_odls_process.children) > num_processors) { /* oversubscribed */ + want_processor = false; } - /* attempt to launch each of the apps */ - for (item = opal_list_get_first(&map); - item != opal_list_get_end(&map); - item = opal_list_get_next(item)) { - orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*)item; - orte_std_cntr_t i; - for (i=0; inum_procs; i++) { - rc = orte_odls_process_proc(map->app, map->procs[i], vpid_start, - vpid_range, - (num_processes > num_processors) ? - false : true, i); + /* okay, now let's launch our local procs using a fork/exec */ + i = 0; + /* protect operations involving the global list of children */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); - if (ORTE_SUCCESS != rc) { - /* Set the state of this process, and all remaining - processes to be launched to ABORTED. This will - cause the entire job to abort. */ - for (; i < map->num_procs; ++i) { - orte_smr.set_proc_state(&map->procs[i]->proc_name, - ORTE_PROC_STATE_ABORTED, 0); - } + for (item = opal_list_get_first(&orte_odls_process.children); + item != opal_list_get_end(&orte_odls_process.children); + item = opal_list_get_next(item)) { + child = (odls_process_child_t*)item; - /* Propagate the error up the stack */ - ORTE_ERROR_LOG(rc); - goto cleanup; + /* is this child already alive? This can happen if + * we are asked to launch additional processes. + * If it has been launched, then do nothing + */ + if (child->alive) { + continue; + } + + /* do we have a child from the specified job. Because the + * job could be given as a WILDCARD value, we must use + * the dss.compare function to check for equality. + */ + if (ORTE_EQUAL != orte_dss.compare(&job, &(child->name->jobid), ORTE_JOBID)) { + continue; + } + + /* find the indicated app_context in the list */ + for (item2 = opal_list_get_first(&app_context_list); + item2 != opal_list_get_end(&app_context_list); + item2 = opal_list_get_next(item2)) { + app_item = (odls_process_app_context_t*)item2; + if (child->app_idx == app_item->app_context->idx) { + app = app_item->app_context; + goto DOFORK; } } + /* get here if we couldn't find the app_context */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + return ORTE_ERR_NOT_FOUND; + +DOFORK: + /* must unlock prior to fork to keep things clean in the + * event library + */ + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + + if (ORTE_SUCCESS != (rc = orte_odls_process_fork_local_proc(app, child, start, + range, want_processor, i))) { + ORTE_ERROR_LOG(rc); + orte_smr.set_proc_state(child->name, ORTE_PROC_STATE_ABORTED, 0); + opal_condition_signal(&orte_odls_process.cond); + return rc; + } + /* reaquire lock so we don't double unlock... */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); + i++; } -cleanup: - while(NULL != (item = opal_list_remove_first(&map))) { + /* cleanup */ + while (NULL != (item = opal_list_remove_first(&app_context_list))) { OBJ_RELEASE(item); } - OBJ_DESTRUCT(&map); - return rc; -} + OBJ_DESTRUCT(&app_context_list); -/** - * Query for all processes allocated to the job and terminate - * those on the current node. - */ - -int orte_odls_process_terminate_job(orte_jobid_t jobid) -{ - /* query for the pids allocated on this node */ - char *segment; - char *keys[3]; - orte_gpr_value_t** values = NULL; - orte_std_cntr_t i, k; - orte_std_cntr_t num_values = 0; - int rc; - opal_value_array_t pids; - - /* setup the pid array */ - OBJ_CONSTRUCT(&pids, opal_value_array_t); - opal_value_array_init(&pids, sizeof(pid_t)); - - /* query the job segment on the registry */ - if(ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - keys[0] = ORTE_NODE_NAME_KEY; - keys[1] = ORTE_PROC_PID_KEY; - keys[2] = NULL; - - rc = orte_gpr.get( - ORTE_GPR_KEYS_AND|ORTE_GPR_TOKENS_OR, - segment, - NULL, - keys, - &num_values, - &values - ); - if(rc != ORTE_SUCCESS) { - free(segment); - return rc; - } - - for(i=0; icnt; k++) { - orte_gpr_keyval_t* keyval = value->keyvals[k]; - if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) { - if(orte_dss.compare(keyval->value->data, orte_system_info.nodename, ORTE_STRING) != ORTE_EQUAL) { - break; - } - } else if (strcmp(keyval->key, ORTE_PROC_PID_KEY) == 0) { - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&pidptr, keyval->value, ORTE_PID))) { - ORTE_ERROR_LOG(rc); - free(segment); - return rc; - } - pid = *pidptr; - } - } - if (0 != pid) { - opal_value_array_append_item(&pids, &pid); - } - OBJ_RELEASE(value); - } - - /* If we have processes to kill, go kill them */ - if (opal_value_array_get_size(&pids) > 0) { - orte_odls_process_kill_processes(&pids); - } - OBJ_DESTRUCT(&pids); - - if(NULL != values) { - free(values); - } - free(segment); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); return ORTE_SUCCESS; } - -int orte_odls_process_terminate_proc(const orte_process_name_t* proc) +static int send_signal(pid_t pid, int signal) { - return ORTE_ERR_NOT_IMPLEMENTED; + return ORTE_ERROR; } -/** - * Query for all processes allocated to the job and signal - * those on the current node. - */ - -int orte_odls_process_signal_job(orte_jobid_t jobid, int32_t signal) +static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc, int32_t signal) { - /* query for the pids allocated on this node */ - char *segment; - char *keys[3]; - orte_gpr_value_t** values = NULL; - orte_std_cntr_t i, k; - orte_std_cntr_t num_values = 0; int rc; - opal_value_array_t pids; + opal_list_item_t *item; + odls_process_child_t *child; + + /* protect operations involving the global list of children */ + OPAL_THREAD_LOCK(&orte_odls_process.mutex); - /* setup the pid array */ - OBJ_CONSTRUCT(&pids, opal_value_array_t); - opal_value_array_init(&pids, sizeof(pid_t)); - - /* query the job segment on the registry */ - if(ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - keys[0] = ORTE_NODE_NAME_KEY; - keys[1] = ORTE_PROC_PID_KEY; - keys[2] = NULL; - - rc = orte_gpr.get( - ORTE_GPR_KEYS_AND|ORTE_GPR_TOKENS_OR, - segment, - NULL, - keys, - &num_values, - &values - ); - if(rc != ORTE_SUCCESS) { - free(segment); - return rc; - } - - for(i=0; icnt; k++) { - orte_gpr_keyval_t* keyval = value->keyvals[k]; - if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) { - if(orte_dss.compare(keyval->value->data, orte_system_info.nodename, ORTE_STRING) != ORTE_EQUAL) { - break; - } - } else if (strcmp(keyval->key, ORTE_PROC_PID_KEY) == 0) { - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&pidptr, keyval->value, ORTE_PID))) { - ORTE_ERROR_LOG(rc); - free(segment); - return rc; - } - pid = *pidptr; + /* if procs is NULL, then we want to signal all + * of the local procs, so just do that case + */ + if (NULL == proc) { + rc = ORTE_SUCCESS; /* pre-set this as an empty list causes us to drop to bottom */ + for (item = opal_list_get_first(&orte_odls_process.children); + item != opal_list_get_end(&orte_odls_process.children); + item = opal_list_get_next(item)) { + child = (odls_process_child_t*)item; + if (ORTE_SUCCESS != (rc = send_signal(child->pid, (int)signal))) { + ORTE_ERROR_LOG(rc); } } - if (0 != pid) { - opal_value_array_append_item(&pids, &pid); - } - OBJ_RELEASE(value); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + return rc; } - - rc = ORTE_SUCCESS; - opal_output( 0, "Win32 do not allow us to deliver a signal to a process\n" ); -#if 0 - /* If we have processes to signal, go signal them */ - for (i = 0; i < opal_value_array_get_size(&pids); ++i) { - pid = OPAL_VALUE_ARRAY_GET_ITEM(&pids, pid_t, i); - if(kill(pid, (int)signal) != 0) { - switch(errno) { - case EINVAL: - ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); - rc = ORTE_ERR_BAD_PARAM; - break; - case ESRCH: - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - rc = ORTE_ERR_NOT_FOUND; - break; - case EPERM: - ORTE_ERROR_LOG(ORTE_ERR_PERM); - rc = ORTE_ERR_PERM; - break; - default: - ORTE_ERROR_LOG(ORTE_ERROR); - rc = ORTE_ERROR; + + /* we want it sent to some specified process, so find it */ + for (item = opal_list_get_first(&orte_odls_process.children); + item != opal_list_get_end(&orte_odls_process.children); + item = opal_list_get_next(item)) { + child = (odls_process_child_t*)item; + if (ORTE_EQUAL == orte_dss.compare(&(child->name), (void*)proc, ORTE_NAME)) { + /* unlock before signaling as this may generate a callback */ + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + if (ORTE_SUCCESS != (rc = send_signal(child->pid, (int)signal))) { + ORTE_ERROR_LOG(rc); } + return rc; } } -#endif - OBJ_DESTRUCT(&pids); - - if(NULL != values) { - free(values); - } - free(segment); - - return rc; + + /* only way to get here is if we couldn't find the specified proc. + * report that as an error and return it + */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + opal_condition_signal(&orte_odls_process.cond); + OPAL_THREAD_UNLOCK(&orte_odls_process.mutex); + return ORTE_ERR_NOT_FOUND; } - -int orte_odls_process_signal_proc(const orte_process_name_t* proc, int32_t signal) -{ - return ORTE_ERR_NOT_IMPLEMENTED; -} - -int orte_odls_process_finalize(void) -{ - if(mca_odls_process_component.reap) { - OPAL_THREAD_LOCK(&mca_odls_process_component.lock); - while(mca_odls_process_component.num_children > 0) { - opal_condition_wait(&mca_odls_process_component.cond, - &mca_odls_process_component.lock); - } - OPAL_THREAD_UNLOCK(&mca_odls_process_component.lock); - } - return ORTE_SUCCESS; -} - - -/** - * Handle threading issues. - */ - -#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS && OMPI_ENABLE_PROGRESS_THREADS - -struct orte_odls_process_stack_t { - opal_condition_t cond; - opal_mutex_t mutex; - bool complete; - orte_jobid_t jobid; - int rc; +orte_odls_base_module_1_3_0_t orte_odls_process_module = { + orte_odls_process_subscribe_launch_data, + orte_odls_process_launch_local_procs, + orte_odls_process_kill_local_procs, + orte_odls_process_signal_local_proc }; -typedef struct orte_odls_process_stack_t orte_odls_process_stack_t; - -static void orte_odls_process_stack_construct(orte_odls_process_stack_t* stack) -{ - OBJ_CONSTRUCT(&stack->mutex, opal_mutex_t); - OBJ_CONSTRUCT(&stack->cond, opal_condition_t); - stack->rc = 0; - stack->complete = false; -} - -static void orte_odls_process_stack_destruct(orte_odls_process_stack_t* stack) -{ - OBJ_DESTRUCT(&stack->mutex); - OBJ_DESTRUCT(&stack->cond); -} - -static OBJ_CLASS_INSTANCE( - orte_odls_process_stack_t, - opal_object_t, - orte_odls_process_stack_construct, - orte_odls_process_stack_destruct); - - -static void orte_odls_process_launch_cb(int fd, short event, void* args) -{ - orte_odls_process_stack_t *stack = (orte_odls_process_stack_t*)args; - OPAL_THREAD_LOCK(&stack->mutex); - stack->rc = orte_odls_process_launch(stack->jobid); - stack->complete = true; - opal_condition_signal(&stack->cond); - OPAL_THREAD_UNLOCK(&stack->mutex); -} - -static int orte_odls_process_launch_threaded(orte_jobid_t jobid) -{ - - struct timeval tv = { 0, 0 }; - struct opal_event event; - struct orte_odls_process_stack_t stack; - - OBJ_CONSTRUCT(&stack, orte_odls_process_stack_t); - - stack.jobid = jobid; - opal_evtimer_set(&event, orte_odls_process_launch_cb, &stack); - opal_evtimer_add(&event, &tv); - - OPAL_THREAD_LOCK(&stack.mutex); - while(false == stack.complete) { - opal_condition_wait(&stack.cond, &stack.mutex); - } - OPAL_THREAD_UNLOCK(&stack.mutex); - OBJ_DESTRUCT(&stack); - return stack.rc; -} - -#endif