diff --git a/orte/mca/odls/base/odls_base_open.c b/orte/mca/odls/base/odls_base_open.c index f64ba0cc40..d483391af5 100644 --- a/orte/mca/odls/base/odls_base_open.c +++ b/orte/mca/odls/base/odls_base_open.c @@ -109,14 +109,6 @@ int orte_odls_base_open(void) return rc; } - /* if we are NOT a daemon, then that is ALL we do! We just needed to ensure - * that the data type(s) got registered so we can send messages to the daemons - */ - if (!orte_process_info.daemon) { - orte_odls_base.components_available = false; - return ORTE_SUCCESS; - } - /* Open up all available components */ if (ORTE_SUCCESS != diff --git a/orte/mca/odls/bproc/odls_bproc.c b/orte/mca/odls/bproc/odls_bproc.c index f029b013f2..4e03bcb15a 100644 --- a/orte/mca/odls/bproc/odls_bproc.c +++ b/orte/mca/odls/bproc/odls_bproc.c @@ -55,6 +55,7 @@ */ orte_odls_base_module_t orte_odls_bproc_module = { orte_odls_bproc_subscribe_launch_data, + orte_odls_bproc_get_add_procs_data, orte_odls_bproc_launch_local_procs, orte_odls_bproc_kill_local_procs, orte_odls_bproc_signal_local_procs @@ -72,6 +73,14 @@ static int odls_bproc_setup_stdio(orte_process_name_t *proc_name, orte_std_cntr_t app_context, bool connect_stdin); +int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node) +{ + return ORTE_ERR_NOT_IMPLEMENTED; +} + + /** * Creates the passed directory. If the directory already exists, it and its * contents will be deleted then the directory will be created. diff --git a/orte/mca/odls/bproc/odls_bproc.h b/orte/mca/odls/bproc/odls_bproc.h index 016b882472..189071d888 100644 --- a/orte/mca/odls/bproc/odls_bproc.h +++ b/orte/mca/odls/bproc/odls_bproc.h @@ -57,6 +57,9 @@ int orte_odls_bproc_finalize(void); * Interface */ int orte_odls_bproc_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc); +int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node); int orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ); int orte_odls_bproc_kill_local_procs(orte_jobid_t job, bool set_state); int orte_odls_bproc_signal_local_procs(const orte_process_name_t* proc_name, int32_t signal); diff --git a/orte/mca/odls/default/odls_default.h b/orte/mca/odls/default/odls_default.h index 3000e969c4..1258faeb9f 100644 --- a/orte/mca/odls/default/odls_default.h +++ b/orte/mca/odls/default/odls_default.h @@ -53,6 +53,9 @@ int orte_odls_default_finalize(void); * Interface */ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc); +int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node); int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ); int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state); int orte_odls_default_signal_local_procs(const orte_process_name_t *proc, diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index d6526ea0d1..c44e508fd5 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -100,6 +100,7 @@ static void set_handler_default(int sig); orte_odls_base_module_t orte_odls_default_module = { orte_odls_default_subscribe_launch_data, + orte_odls_default_get_add_procs_data, orte_odls_default_launch_local_procs, orte_odls_default_kill_local_procs, orte_odls_default_signal_local_procs @@ -218,6 +219,128 @@ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb return rc; } +int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node) +{ + orte_gpr_notify_data_t *ndat; + orte_gpr_value_t **values, *value; + orte_std_cntr_t cnt; + char *glob_tokens[] = { + ORTE_JOB_GLOBALS, + NULL + }; + char *glob_keys[] = { + ORTE_JOB_APP_CONTEXT_KEY, + ORTE_JOB_VPID_START_KEY, + ORTE_JOB_VPID_RANGE_KEY, + NULL + }; + opal_list_item_t *item; + orte_mapped_proc_t *proc; + int rc; + char *segment; + + /* set default answer */ + *data = NULL; + + ndat = OBJ_NEW(orte_gpr_notify_data_t); + if (NULL == ndat) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + + /* construct a fake trigger name so that the we can extract the jobid from it later */ + if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&(ndat->target), "bogus", job))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + return rc; + } + + /* get the segment name */ + if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + return rc; + } + + /* get the info from the job globals container first */ + if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR, + segment, glob_tokens, glob_keys, &cnt, &values))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + return rc; + } + + /* there can only be one value here since we only specified a single container. + * Just transfer the returned value to the ndat structure + */ + if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, values[0]))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(values[0]); + return rc; + } + ndat->cnt = 1; + + /* the remainder of our required info is in the mapped_node object, so all we + * have to do is transfer it over + */ + for (item = opal_list_get_first(&node->procs); + item != opal_list_get_end(&node->procs); + item = opal_list_get_next(item)) { + proc = (orte_mapped_proc_t*)item; + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, segment, 3, 1))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(value); + return rc; + } + + value->tokens[0] = strdup("bogus"); /* must have at least one token */ + + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]), + ORTE_PROC_NAME_KEY, + ORTE_NAME, &proc->name))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(value); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]), + ORTE_PROC_APP_CONTEXT_KEY, + ORTE_STD_CNTR, &proc->app_idx))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(value); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]), + ORTE_NODE_NAME_KEY, + ORTE_STRING, node->nodename))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(value); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(ndat); + OBJ_RELEASE(values[0]); + return rc; + } + ndat->cnt += 1; + } + + *data = ndat; + return ORTE_SUCCESS; +} + + static bool odls_default_child_died(pid_t pid, unsigned int timeout, int *exit_status) { time_t end; @@ -386,8 +509,6 @@ static void odls_default_wait_local_proc(pid_t pid, int status, void* cbdata) item != opal_list_get_end(&orte_odls_default.children); item = opal_list_get_next(item)) { child = (orte_odls_child_t*)item; - opal_output(orte_odls_globals.output, "odls: checking child [%ld,%ld,%ld] alive %s", - ORTE_NAME_ARGS(child->name), (child->alive ? "true" : "dead")); if (child->alive && pid == child->pid) { /* found it */ goto GOTCHILD; } @@ -402,14 +523,8 @@ static void odls_default_wait_local_proc(pid_t pid, int status, void* cbdata) return; GOTCHILD: - opal_output(orte_odls_globals.output, "odls: flushing output for [%ld,%ld,%ld]", - ORTE_NAME_ARGS(child->name)); - orte_iof.iof_flush(); - opal_output(orte_odls_globals.output, "odls: output for [%ld,%ld,%ld] flushed", - ORTE_NAME_ARGS(child->name)); - /* determine the state of this process */ aborted = false; if(WIFEXITED(status)) { @@ -435,10 +550,7 @@ GOTCHILD: abort_file = opal_os_path(false, orte_process_info.universe_session_dir, job, vpid, "abort", NULL ); free(job); - free(vpid); - opal_output(orte_odls_globals.output, "odls: stat'ing file %s for [%ld,%ld,%ld]", - abort_file, ORTE_NAME_ARGS(child->name)); - + free(vpid); if (0 == stat(abort_file, &buf)) { /* the abort file must exist - there is nothing in it we need. It's * meer existence indicates that an abnormal termination occurred @@ -814,7 +926,7 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba value = values[j]; if (0 == strcmp(value->tokens[0], ORTE_JOB_GLOBALS)) { - /* this came from the globals container, so it must contain + /* this came from the globals container, so it must contain * the app_context(s), vpid_start, and vpid_range entries. Only one * value object should ever come from that container */ @@ -868,7 +980,7 @@ int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **ba /* Most C-compilers will bark if we try to directly compare the string in the * kval data area against a regular string, so we need to "get" the data * so we can access it */ - if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&node_name, kval->value, ORTE_STRING))) { + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&node_name, kval->value, ORTE_STRING))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/mca/odls/odls.h b/orte/mca/odls/odls.h index 6576bed903..915f11d8b0 100644 --- a/orte/mca/odls/odls.h +++ b/orte/mca/odls/odls.h @@ -33,6 +33,7 @@ #include "orte/mca/gpr/gpr_types.h" #include "orte/mca/ns/ns_types.h" +#include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/odls/odls_types.h" @@ -45,6 +46,17 @@ */ typedef int (*orte_odls_base_module_subscribe_launch_data_fn_t)(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc); +/* + * Construct a notify data object for use in adding local processes + * In order to reuse daemons, we need a way for the HNP to construct a notify_data object that + * contains the data needed by the active ODLS component to launch a local process. Since the + * only one that knows what a particular ODLS component needs is that component, we require an + * entry point that the HNP can call to get the required notify_data object + */ +typedef int (*orte_odls_base_module_get_add_procs_data_fn_t)(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node); + /** * Locally launch the provided processes */ @@ -66,6 +78,7 @@ typedef int (*orte_odls_base_module_signal_local_process_fn_t)(const orte_proces */ struct orte_odls_base_module_1_3_0_t { orte_odls_base_module_subscribe_launch_data_fn_t subscribe_launch_data; + orte_odls_base_module_get_add_procs_data_fn_t get_add_procs_data; orte_odls_base_module_launch_local_processes_fn_t launch_local_procs; orte_odls_base_module_kill_local_processes_fn_t kill_local_procs; orte_odls_base_module_signal_local_process_fn_t signal_local_procs; diff --git a/orte/mca/odls/process/odls_process_module.c b/orte/mca/odls/process/odls_process_module.c index e554d0726b..f21ef6c22a 100755 --- a/orte/mca/odls/process/odls_process_module.c +++ b/orte/mca/odls/process/odls_process_module.c @@ -175,6 +175,13 @@ static int orte_odls_process_subscribe_launch_data( orte_jobid_t job, return rc; } +static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data, + orte_jobid_t job, + orte_mapped_node_t *node) +{ + return ORTE_ERR_NOT_IMPLEMENTED; +} + static bool orte_odls_process_child_died( pid_t pid, unsigned int timeout, int* exit_status ) { @@ -878,6 +885,7 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc, orte_odls_base_module_1_3_0_t orte_odls_process_module = { orte_odls_process_subscribe_launch_data, + orte_odls_process_get_add_procs_data, orte_odls_process_launch_local_procs, orte_odls_process_kill_local_procs, orte_odls_process_signal_local_proc diff --git a/orte/mca/pls/base/Makefile.am b/orte/mca/pls/base/Makefile.am index 7df8bf1463..1fbce08e01 100644 --- a/orte/mca/pls/base/Makefile.am +++ b/orte/mca/pls/base/Makefile.am @@ -29,4 +29,5 @@ libmca_pls_la_SOURCES += \ base/pls_base_receive.c \ base/pls_base_select.c \ base/pls_base_dmn_registry_fns.c \ + base/pls_base_reuse_daemon_launch.c \ base/pls_base_orted_cmds.c diff --git a/orte/mca/pls/base/base.h b/orte/mca/pls/base/base.h index 053ec00cf3..ab9c59fb38 100644 --- a/orte/mca/pls/base/base.h +++ b/orte/mca/pls/base/base.h @@ -53,6 +53,8 @@ extern "C" { opal_mutex_t orted_cmd_lock; /* orted cmd cond */ opal_condition_t orted_cmd_cond; + /** reuse daemons flag */ + bool reuse_daemons; } orte_pls_base_t; /** diff --git a/orte/mca/pls/base/pls_base_dmn_registry_fns.c b/orte/mca/pls/base/pls_base_dmn_registry_fns.c index 829ccd9610..b882e316a7 100644 --- a/orte/mca/pls/base/pls_base_dmn_registry_fns.c +++ b/orte/mca/pls/base/pls_base_dmn_registry_fns.c @@ -38,6 +38,7 @@ static void orte_pls_daemon_info_construct(orte_pls_daemon_info_t* ptr) ptr->nodename = NULL; ptr->name = NULL; ptr->active_job = ORTE_JOBID_INVALID; + ptr->ndat = NULL; } /* destructor - used to free any resources held by instance */ @@ -45,6 +46,7 @@ static void orte_pls_daemon_info_destructor(orte_pls_daemon_info_t* ptr) { if (NULL != ptr->nodename) free(ptr->nodename); if (NULL != ptr->name) free(ptr->name); + if (NULL != ptr->ndat) OBJ_RELEASE(ptr->ndat); } OBJ_CLASS_INSTANCE(orte_pls_daemon_info_t, /* type name */ opal_list_item_t, /* parent "class" name */ @@ -144,8 +146,9 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job) orte_cellid_t *cell; char *nodename; orte_process_name_t *name; - orte_pls_daemon_info_t *dmn; + orte_pls_daemon_info_t *dmn, *dmn2; bool found_name, found_node, found_cell; + opal_list_item_t *item; int rc; /* setup the key */ @@ -203,10 +206,19 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job) continue; } } - /* if we found everything, then this is a valid entry - create - * it and add it to the list - */ + /* if we found everything, then this is a valid entry */ if (found_name && found_node && found_cell) { + /* see if this daemon is already on the list - if so, then we don't add it */ + for (item = opal_list_get_first(daemons); + item != opal_list_get_end(daemons); + item = opal_list_get_next(item)) { + dmn2 = (orte_pls_daemon_info_t*)item; + + if (ORTE_EQUAL == orte_dss.compare(dmn2->name, name, ORTE_NAME)) { + /* already on list - ignore it */ + goto MOVEON; + } + } dmn = OBJ_NEW(orte_pls_daemon_info_t); if (NULL == dmn) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); @@ -226,6 +238,7 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job) /* add this daemon to the list */ opal_list_append(daemons, &dmn->super); } +MOVEON: OBJ_RELEASE(values[i]); } @@ -295,3 +308,35 @@ int orte_pls_base_remove_daemon(orte_pls_daemon_info_t *info) return ORTE_SUCCESS; } + + +/* + * Check for available daemons we can re-use + */ +int orte_pls_base_check_avail_daemons(opal_list_t *daemons, + orte_jobid_t job) +{ + orte_jobid_t parent; + int rc; + + /* check for daemons belonging to the parent job */ + if (ORTE_SUCCESS != (rc = orte_ns.get_parent_job(&parent, job))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(daemons, parent, NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* now add in any persistent daemons - they are tagged as bootproxies + * for jobid = 0 */ + if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(daemons, 0, NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + return ORTE_SUCCESS; +} + diff --git a/orte/mca/pls/base/pls_base_open.c b/orte/mca/pls/base/pls_base_open.c index 2a6d507679..16585d1dcf 100644 --- a/orte/mca/pls/base/pls_base_open.c +++ b/orte/mca/pls/base/pls_base_open.c @@ -53,6 +53,8 @@ orte_pls_base_module_t orte_pls; */ int orte_pls_base_open(void) { + int value; + /* Debugging / verbose output. Always have stream open, with verbose set by the mca open system... */ orte_pls_base.pls_output = opal_output_open(NULL); @@ -64,6 +66,16 @@ int orte_pls_base_open(void) OBJ_CONSTRUCT(&orte_pls_base.orted_cmd_lock, opal_mutex_t); OBJ_CONSTRUCT(&orte_pls_base.orted_cmd_cond, opal_condition_t); + /* check for reuse of daemons */ + mca_base_param_reg_int_name("pls", "base_reuse_daemons", + "If nonzero, reuse daemons to launch dynamically spawned processes. If zero, do not reuse daemons (default)", + false, false, (int)false, &value); + if (false == value) { + orte_pls_base.reuse_daemons = false; + } else { + orte_pls_base.reuse_daemons = true; + } + /* Open up all the components that we can find */ if (ORTE_SUCCESS != diff --git a/orte/mca/pls/base/pls_base_orted_cmds.c b/orte/mca/pls/base/pls_base_orted_cmds.c index 0e6520843c..b538cc4011 100644 --- a/orte/mca/pls/base/pls_base_orted_cmds.c +++ b/orte/mca/pls/base/pls_base_orted_cmds.c @@ -65,7 +65,7 @@ static void orte_pls_base_cmd_ack(int status, orte_process_name_t* sender, ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL); if (ret != ORTE_SUCCESS) { ORTE_ERROR_LOG(ret); - return ret; + return; } } @@ -97,7 +97,7 @@ int orte_pls_base_orted_exit(opal_list_t *daemons) item != opal_list_get_end(daemons); item = opal_list_get_next(item)) { dmn = (orte_pls_daemon_info_t*)item; - + if (0 > orte_rml.send_buffer_nb(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED, 0, orte_pls_base_orted_send_cb, NULL)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); @@ -258,7 +258,7 @@ CLEANUP: } -int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_data_t *ndat) +int orte_pls_base_orted_add_local_procs(opal_list_t *daemons) { int rc; orte_buffer_t cmd; @@ -268,32 +268,36 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_da OPAL_TRACE(1); - OBJ_CONSTRUCT(&cmd, orte_buffer_t); - - /* pack the command */ - if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) { - ORTE_ERROR_LOG(rc); - goto CLEANUP; - } - - /* pack the jobid */ - if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &ndat, 1, ORTE_GPR_NOTIFY_DATA))) { - ORTE_ERROR_LOG(rc); - goto CLEANUP; - } - - /* send the commands as fast as we can */ + /* pack and send the commands as fast as we can - we have to + * pack each time as the launch data could be different for + * the various nodes + */ for (item = opal_list_get_first(daemons); item != opal_list_get_end(daemons); item = opal_list_get_next(item)) { dmn = (orte_pls_daemon_info_t*)item; + OBJ_CONSTRUCT(&cmd, orte_buffer_t); + + /* pack the command */ + if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + + /* pack the launch data for this daemon */ + if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &(dmn->ndat), 1, ORTE_GPR_NOTIFY_DATA))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (0 > orte_rml.send_buffer_nb(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED, 0, orte_pls_base_orted_send_cb, NULL)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); OBJ_DESTRUCT(&cmd); return rc; } + OBJ_DESTRUCT(&cmd); orted_cmd_num_active++; } @@ -306,17 +310,18 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_da return rc; } - /* wait for all commands to have been received */ + /* wait for the command to have been received */ OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock); if (orted_cmd_num_active > 0) { opal_condition_wait(&orte_pls_base.orted_cmd_cond, &orte_pls_base.orted_cmd_lock); } OPAL_THREAD_UNLOCK(&orte_pls_base.orted_cmd_lock); + + return ORTE_SUCCESS; CLEANUP: OBJ_DESTRUCT(&cmd); - /* we're done! */ - return ORTE_SUCCESS; + return rc; } diff --git a/orte/mca/pls/base/pls_base_reuse_daemon_launch.c b/orte/mca/pls/base/pls_base_reuse_daemon_launch.c new file mode 100644 index 0000000000..490f8b187e --- /dev/null +++ b/orte/mca/pls/base/pls_base_reuse_daemon_launch.c @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "orte_config.h" +#include "orte/orte_constants.h" + +#include "opal/util/argv.h" +#include "opal/util/opal_environ.h" +#include "opal/mca/base/mca_base_param.h" + +#include "orte/dss/dss.h" +#include "orte/mca/odls/odls.h" +#include "orte/mca/rmaps/rmaps_types.h" +#include "orte/mca/gpr/gpr_types.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ns/ns_types.h" + +#include "orte/mca/pls/base/pls_private.h" + +int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job) +{ + opal_list_t avail_daemons; + opal_list_item_t *item, *item2, *next; + orte_pls_daemon_info_t *dmn, *newdmn; + orte_mapped_node_t *node; + opal_list_t used_daemons; + orte_gpr_notify_data_t *ndat; + int rc; + + OBJ_CONSTRUCT(&avail_daemons, opal_list_t); + OBJ_CONSTRUCT(&used_daemons, opal_list_t); + + /* check for available daemons we could use */ + if (ORTE_SUCCESS != (rc = orte_pls_base_check_avail_daemons(&avail_daemons, job))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* go through the list, checking nodenames against what is in the + * map. If nodes match, then construct and send an appropriate command + * to that daemon to launch the local procs - remove that node structure + * from the map so that the main launcher doesn't also try to start procs + * on that node! + */ + + while (NULL != (item = opal_list_remove_first(&avail_daemons))) { + dmn = (orte_pls_daemon_info_t*)item; + + item2 = opal_list_get_first(&map->nodes); + while (item2 != opal_list_get_end(&map->nodes)) { + node = (orte_mapped_node_t*)item2; + + /* save the next position in case we remove this one */ + next = opal_list_get_next(item2); + + if (0 == strcmp(node->nodename, dmn->nodename)) { + newdmn = OBJ_NEW(orte_pls_daemon_info_t); + newdmn->cell = dmn->cell; + newdmn->nodename = strdup(dmn->nodename); + newdmn->active_job = job; + orte_dss.copy((void**)&(newdmn->name), dmn->name, ORTE_NAME); + if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&(newdmn->ndat), job, node))) { + ORTE_ERROR_LOG(rc); + return rc; + } + opal_list_append(&used_daemons, &newdmn->super); + /* procs on this node are taken care of, so remove it from + * the map list so the main launcher won't try to launch them + */ + opal_list_remove_item(&map->nodes, item2); + OBJ_RELEASE(item2); + } + + /* move to next position */ + item2 = next; + } + } + + if (0 >= opal_list_get_size(&used_daemons)) { + /* if none were used, then just return */ + OBJ_DESTRUCT(&used_daemons); + return ORTE_SUCCESS; + } + + /* store the bootproxy records */ + orte_pls_base_store_active_daemons(&used_daemons); + + /* launch any procs that are using existing daemons */ + if (ORTE_SUCCESS != (rc = orte_pls_base_orted_add_local_procs(&used_daemons))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* cleanup */ + while (NULL != (item = opal_list_remove_first(&used_daemons))) OBJ_RELEASE(item); + OBJ_DESTRUCT(&used_daemons); + + return ORTE_SUCCESS; +} diff --git a/orte/mca/pls/base/pls_private.h b/orte/mca/pls/base/pls_private.h index 34b0b3ec96..f035657a35 100644 --- a/orte/mca/pls/base/pls_private.h +++ b/orte/mca/pls/base/pls_private.h @@ -32,6 +32,7 @@ #include "orte/mca/gpr/gpr_types.h" #include "orte/mca/ns/ns_types.h" #include "orte/mca/ras/ras_types.h" +#include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/rmgr/rmgr_types.h" #include "orte/mca/rml/rml_types.h" @@ -61,7 +62,8 @@ extern "C" { char *nodename; orte_process_name_t *name; orte_jobid_t active_job; - } orte_pls_daemon_info_t; + orte_gpr_notify_data_t *ndat; + } orte_pls_daemon_info_t; OBJ_CLASS_DECLARATION(orte_pls_daemon_info_t); @@ -77,12 +79,15 @@ extern "C" { int orte_pls_base_orted_exit(opal_list_t *daemons); int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job); int orte_pls_base_orted_signal_local_procs(opal_list_t *daemons, int32_t signal); - int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_data_t *ndat); + int orte_pls_base_orted_add_local_procs(opal_list_t *dmnlist); int orte_pls_base_get_active_daemons(opal_list_t *daemons, orte_jobid_t job, opal_list_t *attrs); int orte_pls_base_store_active_daemons(opal_list_t *daemons); int orte_pls_base_remove_daemon(orte_pls_daemon_info_t *info); + int orte_pls_base_check_avail_daemons(opal_list_t *daemons, orte_jobid_t job); + int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job); + /* * communications utilities */ diff --git a/orte/mca/pls/rsh/pls_rsh_module.c b/orte/mca/pls/rsh/pls_rsh_module.c index ad0ab530b8..35da010565 100644 --- a/orte/mca/pls/rsh/pls_rsh_module.c +++ b/orte/mca/pls/rsh/pls_rsh_module.c @@ -82,6 +82,7 @@ #include "orte/mca/smr/smr.h" #include "orte/mca/pls/pls.h" +#include "orte/mca/pls/base/base.h" #include "orte/mca/pls/base/pls_private.h" #include "orte/mca/pls/rsh/pls_rsh.h" @@ -443,9 +444,9 @@ int orte_pls_rsh_launch(orte_jobid_t jobid) int node_name_index2; int proc_name_index; int local_exec_index, local_exec_index_end; - char *jobid_string; + char *jobid_string = NULL; char *uri, *param; - char **argv, **tmp; + char **argv = NULL, **tmp; char *prefix_dir; int argc; int rc; @@ -481,7 +482,21 @@ int orte_pls_rsh_launch(orte_jobid_t jobid) goto cleanup; } + /* if the user requested that we re-use daemons, + * launch the procs on any existing, re-usable daemons */ + if (orte_pls_base.reuse_daemons) { + if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + num_nodes = (orte_std_cntr_t)opal_list_get_size(&map->nodes); + if (0 >= num_nodes) { + /* nothing left to do - just return */ + rc = ORTE_SUCCESS; + goto cleanup; + } if (mca_pls_rsh_component.debug_daemons && mca_pls_rsh_component.num_concurrent < num_nodes) { @@ -1094,8 +1109,8 @@ cleanup: free(bin_base); } - free(jobid_string); /* done with this variable */ - opal_argv_free(argv); + if (NULL != jobid_string) free(jobid_string); /* done with this variable */ + if (NULL != argv) opal_argv_free(argv); return rc; } diff --git a/orte/tools/orted/orted.c b/orte/tools/orted/orted.c index 8fb8255fbe..3a7c63703a 100644 --- a/orte/tools/orted/orted.c +++ b/orte/tools/orted/orted.c @@ -669,6 +669,7 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender, ORTE_NAME_ARGS(orte_process_info.my_name)); } /* unpack the notify data object */ + n = 1; if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) { ORTE_ERROR_LOG(ret); goto CLEANUP; diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index 3adde8e905..53e567309f 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -114,6 +114,7 @@ struct globals_t { bool debugger; bool no_local_schedule; bool displaymapatlaunch; + bool reuse_daemons; int num_procs; int exit_status; char *hostfile; @@ -262,6 +263,10 @@ opal_cmd_line_init_t cmd_line_init[] = { &orte_process_info.tmpdir_base, OPAL_CMD_LINE_TYPE_STRING, "Set the root for the session directory tree for orterun ONLY" }, + { NULL, NULL, NULL, '\0', "reuse-daemons", "reuse-daemons", 0, + &orterun_globals.reuse_daemons, OPAL_CMD_LINE_TYPE_BOOL, + "If set, reuse daemons to launch dynamically spawned processes"}, + { NULL, NULL, NULL, '\0', NULL, "prefix", 1, NULL, OPAL_CMD_LINE_TYPE_STRING, "Prefix where Open MPI is installed on remote nodes" }, @@ -413,7 +418,6 @@ int orterun(int argc, char *argv[]) } free(tmp); } - /* pre-condition any network transports that require it */ if (ORTE_SUCCESS != (rc = orte_pre_condition_transports(apps, num_apps))) { @@ -1000,6 +1004,18 @@ static int parse_globals(int argc, char* argv[]) mca_base_param_set_int(id, 1); } + if (orterun_globals.reuse_daemons) { + id = mca_base_param_reg_int_name("pls", "base_reuse_daemons", + "If nonzero, reuse daemons to launch dynamically spawned processes. If zero, do not reuse daemons (default)", + false, false, 0, &ret); + + if (orterun_globals.reuse_daemons) { + mca_base_param_set_int(id, (int)true); + } else { + mca_base_param_set_int(id, (int)false); + } + } + /* If we don't want to wait, we don't want to wait */ if (orterun_globals.no_wait_for_job_completion) {