
Create the ability to re-use existing daemons. Included in the commit:

1. new functionality in the pls base to check for reusable daemons and launch new processes on them

2. an extension of the odls API to allow each odls component to build a notify message with the "correct" data in it for adding processes to the local daemon. This means that the odls now opens components on the HNP as well as on daemons - but that's the price of allowing so much flexibility. Only the default odls has this functionality enabled - the others just return NOT_IMPLEMENTED (a minimal stub sketch follows this list)

3. addition of a new command line option "--reuse-daemons" to orterun. The default, for now, is to NOT reuse daemons. Once we have more time to test this capability, we may choose to reverse the default. For one thing, we probably want to investigate the tradeoffs in start time for comm_spawn'd processes that reuse daemons versus launching their own. On some systems, though, having another daemon show up can cause problems - so users there may want to set the default to "reuse".
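
For odls components that don't implement the new entry point, the stub is trivial - the bproc and process components in this commit do exactly this. A minimal sketch, with the component name "mycomp" purely hypothetical:

int orte_odls_mycomp_get_add_procs_data(orte_gpr_notify_data_t **data,
                                        orte_jobid_t job,
                                        orte_mapped_node_t *node)
{
    /* this component cannot yet build the notify message needed
     * to add procs to an already-running daemon */
    return ORTE_ERR_NOT_IMPLEMENTED;
}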

This is ONLY enabled for rsh launch, at the moment. The code needing to be added to each launcher is about three lines long, so I'll be doing that as I get access to machines I can test it on.
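
For reference, the per-launcher addition is essentially the guarded call visible in the rsh diff below - a sketch:

/* if the user requested that we re-use daemons, launch the procs
 * on any existing, re-usable daemons; matching nodes are removed
 * from the map so we don't also start fresh daemons there */
if (orte_pls_base.reuse_daemons) {
    if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
}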

This commit was SVN r12608.
This commit is contained in:
Ralph Castain 2006-11-15 21:12:27 +00:00
parent 3f550c3a83
commit f7fc19a2ca
17 changed files with 411 additions and 54 deletions

View file

@@ -109,14 +109,6 @@ int orte_odls_base_open(void)
return rc;
}
/* if we are NOT a daemon, then that is ALL we do! We just needed to ensure
* that the data type(s) got registered so we can send messages to the daemons
*/
if (!orte_process_info.daemon) {
orte_odls_base.components_available = false;
return ORTE_SUCCESS;
}
/* Open up all available components */
if (ORTE_SUCCESS !=

View file

@@ -55,6 +55,7 @@
*/
orte_odls_base_module_t orte_odls_bproc_module = {
orte_odls_bproc_subscribe_launch_data,
orte_odls_bproc_get_add_procs_data,
orte_odls_bproc_launch_local_procs,
orte_odls_bproc_kill_local_procs,
orte_odls_bproc_signal_local_procs
@@ -72,6 +73,14 @@ static int odls_bproc_setup_stdio(orte_process_name_t *proc_name,
orte_std_cntr_t app_context, bool connect_stdin);
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
{
return ORTE_ERR_NOT_IMPLEMENTED;
}
/**
* Creates the passed directory. If the directory already exists, it and its
* contents will be deleted then the directory will be created.

View file

@@ -57,6 +57,9 @@ int orte_odls_bproc_finalize(void);
* Interface
*/
int orte_odls_bproc_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc);
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
int orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ);
int orte_odls_bproc_kill_local_procs(orte_jobid_t job, bool set_state);
int orte_odls_bproc_signal_local_procs(const orte_process_name_t* proc_name, int32_t signal);

View file

@@ -53,6 +53,9 @@ int orte_odls_default_finalize(void);
* Interface
*/
int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc);
int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ);
int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state);
int orte_odls_default_signal_local_procs(const orte_process_name_t *proc,

View file

@@ -100,6 +100,7 @@ static void set_handler_default(int sig);
orte_odls_base_module_t orte_odls_default_module = {
orte_odls_default_subscribe_launch_data,
orte_odls_default_get_add_procs_data,
orte_odls_default_launch_local_procs,
orte_odls_default_kill_local_procs,
orte_odls_default_signal_local_procs
@@ -218,6 +219,128 @@ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb
return rc;
}
int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
{
orte_gpr_notify_data_t *ndat;
orte_gpr_value_t **values, *value;
orte_std_cntr_t cnt;
char *glob_tokens[] = {
ORTE_JOB_GLOBALS,
NULL
};
char *glob_keys[] = {
ORTE_JOB_APP_CONTEXT_KEY,
ORTE_JOB_VPID_START_KEY,
ORTE_JOB_VPID_RANGE_KEY,
NULL
};
opal_list_item_t *item;
orte_mapped_proc_t *proc;
int rc;
char *segment;
/* set default answer */
*data = NULL;
ndat = OBJ_NEW(orte_gpr_notify_data_t);
if (NULL == ndat) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* construct a fake trigger name so that we can extract the jobid from it later */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&(ndat->target), "bogus", job))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* get the segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* get the info from the job globals container first */
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, glob_tokens, glob_keys, &cnt, &values))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* there can only be one value here since we only specified a single container.
* Just transfer the returned value to the ndat structure
*/
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, values[0]))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
ndat->cnt = 1;
/* the remainder of our required info is in the mapped_node object, so all we
* have to do is transfer it over
*/
for (item = opal_list_get_first(&node->procs);
item != opal_list_get_end(&node->procs);
item = opal_list_get_next(item)) {
proc = (orte_mapped_proc_t*)item;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, segment, 3, 1))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
value->tokens[0] = strdup("bogus"); /* must have at least one token */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_PROC_NAME_KEY,
ORTE_NAME, &proc->name))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
ORTE_PROC_APP_CONTEXT_KEY,
ORTE_STD_CNTR, &proc->app_idx))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
ORTE_NODE_NAME_KEY,
ORTE_STRING, node->nodename))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
ndat->cnt += 1;
}
*data = ndat;
return ORTE_SUCCESS;
}
static bool odls_default_child_died(pid_t pid, unsigned int timeout, int *exit_status)
{
time_t end;
@@ -386,8 +509,6 @@ static void odls_default_wait_local_proc(pid_t pid, int status, void* cbdata)
item != opal_list_get_end(&orte_odls_default.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
opal_output(orte_odls_globals.output, "odls: checking child [%ld,%ld,%ld] alive %s",
ORTE_NAME_ARGS(child->name), (child->alive ? "true" : "dead"));
if (child->alive && pid == child->pid) { /* found it */
goto GOTCHILD;
}
@@ -402,14 +523,8 @@ static void odls_default_wait_local_proc(pid_t pid, int status, void* cbdata)
return;
GOTCHILD:
opal_output(orte_odls_globals.output, "odls: flushing output for [%ld,%ld,%ld]",
ORTE_NAME_ARGS(child->name));
orte_iof.iof_flush();
opal_output(orte_odls_globals.output, "odls: output for [%ld,%ld,%ld] flushed",
ORTE_NAME_ARGS(child->name));
/* determine the state of this process */
aborted = false;
if(WIFEXITED(status)) {
@@ -436,9 +551,6 @@ GOTCHILD:
job, vpid, "abort", NULL );
free(job);
free(vpid);
opal_output(orte_odls_globals.output, "odls: stat'ing file %s for [%ld,%ld,%ld]",
abort_file, ORTE_NAME_ARGS(child->name));
if (0 == stat(abort_file, &buf)) {
/* the abort file must exist - there is nothing in it we need. Its
* mere existence indicates that an abnormal termination occurred

View file

@@ -33,6 +33,7 @@
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/odls/odls_types.h"
@@ -45,6 +46,17 @@
*/
typedef int (*orte_odls_base_module_subscribe_launch_data_fn_t)(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc);
/*
* Construct a notify data object for use in adding local processes
* In order to reuse daemons, we need a way for the HNP to construct a notify_data object that
* contains the data needed by the active ODLS component to launch a local process. Since the
* only one that knows what a particular ODLS component needs is that component, we require an
* entry point that the HNP can call to get the required notify_data object
*/
typedef int (*orte_odls_base_module_get_add_procs_data_fn_t)(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
/**
* Locally launch the provided processes
*/
@@ -66,6 +78,7 @@ typedef int (*orte_odls_base_module_signal_local_process_fn_t)(const orte_proces
*/
struct orte_odls_base_module_1_3_0_t {
orte_odls_base_module_subscribe_launch_data_fn_t subscribe_launch_data;
orte_odls_base_module_get_add_procs_data_fn_t get_add_procs_data;
orte_odls_base_module_launch_local_processes_fn_t launch_local_procs;
orte_odls_base_module_kill_local_processes_fn_t kill_local_procs;
orte_odls_base_module_signal_local_process_fn_t signal_local_procs;
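
On the HNP side, a caller just asks the selected odls component for the per-node payload through this module interface. A sketch of the call pattern (job and node are assumed to be in scope, as in the pls base reuse code later in this commit):

orte_gpr_notify_data_t *ndat;
int rc;

/* ask the active odls component to build the notify data for
 * the procs mapped onto this node */
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&ndat, job, node))) {
    ORTE_ERROR_LOG(rc);
    return rc;
}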

View file

@@ -175,6 +175,13 @@ static int orte_odls_process_subscribe_launch_data( orte_jobid_t job,
return rc;
}
static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
{
return ORTE_ERR_NOT_IMPLEMENTED;
}
static bool orte_odls_process_child_died( pid_t pid, unsigned int timeout,
int* exit_status )
{
@@ -878,6 +885,7 @@ static int orte_odls_process_signal_local_proc(const orte_process_name_t *proc,
orte_odls_base_module_1_3_0_t orte_odls_process_module = {
orte_odls_process_subscribe_launch_data,
orte_odls_process_get_add_procs_data,
orte_odls_process_launch_local_procs,
orte_odls_process_kill_local_procs,
orte_odls_process_signal_local_proc

View file

@@ -29,4 +29,5 @@ libmca_pls_la_SOURCES += \
base/pls_base_receive.c \
base/pls_base_select.c \
base/pls_base_dmn_registry_fns.c \
base/pls_base_reuse_daemon_launch.c \
base/pls_base_orted_cmds.c

View file

@@ -53,6 +53,8 @@ extern "C" {
opal_mutex_t orted_cmd_lock;
/* orted cmd cond */
opal_condition_t orted_cmd_cond;
/** reuse daemons flag */
bool reuse_daemons;
} orte_pls_base_t;
/**

View file

@@ -38,6 +38,7 @@ static void orte_pls_daemon_info_construct(orte_pls_daemon_info_t* ptr)
ptr->nodename = NULL;
ptr->name = NULL;
ptr->active_job = ORTE_JOBID_INVALID;
ptr->ndat = NULL;
}
/* destructor - used to free any resources held by instance */
@@ -45,6 +46,7 @@
{
if (NULL != ptr->nodename) free(ptr->nodename);
if (NULL != ptr->name) free(ptr->name);
if (NULL != ptr->ndat) OBJ_RELEASE(ptr->ndat);
}
OBJ_CLASS_INSTANCE(orte_pls_daemon_info_t, /* type name */
opal_list_item_t, /* parent "class" name */
@@ -144,8 +146,9 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job)
orte_cellid_t *cell;
char *nodename;
orte_process_name_t *name;
orte_pls_daemon_info_t *dmn;
orte_pls_daemon_info_t *dmn, *dmn2;
bool found_name, found_node, found_cell;
opal_list_item_t *item;
int rc;
/* setup the key */
@@ -203,10 +206,19 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job)
continue;
}
}
/* if we found everything, then this is a valid entry - create
* it and add it to the list
*/
/* if we found everything, then this is a valid entry */
if (found_name && found_node && found_cell) {
/* see if this daemon is already on the list - if so, then we don't add it */
for (item = opal_list_get_first(daemons);
item != opal_list_get_end(daemons);
item = opal_list_get_next(item)) {
dmn2 = (orte_pls_daemon_info_t*)item;
if (ORTE_EQUAL == orte_dss.compare(dmn2->name, name, ORTE_NAME)) {
/* already on list - ignore it */
goto MOVEON;
}
}
dmn = OBJ_NEW(orte_pls_daemon_info_t);
if (NULL == dmn) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@@ -226,6 +238,7 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job)
/* add this daemon to the list */
opal_list_append(daemons, &dmn->super);
}
MOVEON:
OBJ_RELEASE(values[i]);
}
@@ -295,3 +308,35 @@ int orte_pls_base_remove_daemon(orte_pls_daemon_info_t *info)
return ORTE_SUCCESS;
}
/*
* Check for available daemons we can re-use
*/
int orte_pls_base_check_avail_daemons(opal_list_t *daemons,
orte_jobid_t job)
{
orte_jobid_t parent;
int rc;
/* check for daemons belonging to the parent job */
if (ORTE_SUCCESS != (rc = orte_ns.get_parent_job(&parent, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(daemons, parent, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* now add in any persistent daemons - they are tagged as bootproxies
* for jobid = 0 */
if (ORTE_SUCCESS != (rc = orte_pls_base_get_active_daemons(daemons, 0, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}

View file

@@ -53,6 +53,8 @@ orte_pls_base_module_t orte_pls;
*/
int orte_pls_base_open(void)
{
int value;
/* Debugging / verbose output. Always have stream open, with
verbose set by the mca open system... */
orte_pls_base.pls_output = opal_output_open(NULL);
@@ -64,6 +66,16 @@ int orte_pls_base_open(void)
OBJ_CONSTRUCT(&orte_pls_base.orted_cmd_lock, opal_mutex_t);
OBJ_CONSTRUCT(&orte_pls_base.orted_cmd_cond, opal_condition_t);
/* check for reuse of daemons */
mca_base_param_reg_int_name("pls", "base_reuse_daemons",
"If nonzero, reuse daemons to launch dynamically spawned processes. If zero, do not reuse daemons (default)",
false, false, (int)false, &value);
if (false == value) {
orte_pls_base.reuse_daemons = false;
} else {
orte_pls_base.reuse_daemons = true;
}
/* Open up all the components that we can find */
if (ORTE_SUCCESS !=

View file

@@ -65,7 +65,7 @@ static void orte_pls_base_cmd_ack(int status, orte_process_name_t* sender,
ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL);
if (ret != ORTE_SUCCESS) {
ORTE_ERROR_LOG(ret);
return ret;
return;
}
}
@@ -258,7 +258,7 @@ CLEANUP:
}
int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_data_t *ndat)
int orte_pls_base_orted_add_local_procs(opal_list_t *daemons)
{
int rc;
orte_buffer_t cmd;
@@ -268,6 +268,15 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_da
OPAL_TRACE(1);
/* pack and send the commands as fast as we can - we have to
* pack each time as the launch data could be different for
* the various nodes
*/
for (item = opal_list_get_first(daemons);
item != opal_list_get_end(daemons);
item = opal_list_get_next(item)) {
dmn = (orte_pls_daemon_info_t*)item;
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
/* pack the command */
@@ -276,24 +285,19 @@
goto CLEANUP;
}
/* pack the jobid */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &ndat, 1, ORTE_GPR_NOTIFY_DATA))) {
/* pack the launch data for this daemon */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &(dmn->ndat), 1, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* send the commands as fast as we can */
for (item = opal_list_get_first(daemons);
item != opal_list_get_end(daemons);
item = opal_list_get_next(item)) {
dmn = (orte_pls_daemon_info_t*)item;
if (0 > orte_rml.send_buffer_nb(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED,
0, orte_pls_base_orted_send_cb, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&cmd);
return rc;
}
OBJ_DESTRUCT(&cmd);
orted_cmd_num_active++;
}
@@ -306,17 +310,18 @@
return rc;
}
/* wait for all commands to have been received */
/* wait for the command to have been received */
OPAL_THREAD_LOCK(&orte_pls_base.orted_cmd_lock);
if (orted_cmd_num_active > 0) {
opal_condition_wait(&orte_pls_base.orted_cmd_cond, &orte_pls_base.orted_cmd_lock);
}
OPAL_THREAD_UNLOCK(&orte_pls_base.orted_cmd_lock);
return ORTE_SUCCESS;
CLEANUP:
OBJ_DESTRUCT(&cmd);
/* we're done! */
return ORTE_SUCCESS;
return rc;
}

View file

@@ -0,0 +1,115 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/orte_constants.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/dss/dss.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/pls/base/pls_private.h"
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job)
{
opal_list_t avail_daemons;
opal_list_item_t *item, *item2, *next;
orte_pls_daemon_info_t *dmn, *newdmn;
orte_mapped_node_t *node;
opal_list_t used_daemons;
orte_gpr_notify_data_t *ndat;
int rc;
OBJ_CONSTRUCT(&avail_daemons, opal_list_t);
OBJ_CONSTRUCT(&used_daemons, opal_list_t);
/* check for available daemons we could use */
if (ORTE_SUCCESS != (rc = orte_pls_base_check_avail_daemons(&avail_daemons, job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* go through the list, checking nodenames against what is in the
* map. If nodes match, then construct and send an appropriate command
* to that daemon to launch the local procs - remove that node structure
* from the map so that the main launcher doesn't also try to start procs
* on that node!
*/
while (NULL != (item = opal_list_remove_first(&avail_daemons))) {
dmn = (orte_pls_daemon_info_t*)item;
item2 = opal_list_get_first(&map->nodes);
while (item2 != opal_list_get_end(&map->nodes)) {
node = (orte_mapped_node_t*)item2;
/* save the next position in case we remove this one */
next = opal_list_get_next(item2);
if (0 == strcmp(node->nodename, dmn->nodename)) {
newdmn = OBJ_NEW(orte_pls_daemon_info_t);
newdmn->cell = dmn->cell;
newdmn->nodename = strdup(dmn->nodename);
newdmn->active_job = job;
orte_dss.copy((void**)&(newdmn->name), dmn->name, ORTE_NAME);
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&(newdmn->ndat), job, node))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_append(&used_daemons, &newdmn->super);
/* procs on this node are taken care of, so remove it from
* the map list so the main launcher won't try to launch them
*/
opal_list_remove_item(&map->nodes, item2);
OBJ_RELEASE(item2);
}
/* move to next position */
item2 = next;
}
}
if (0 >= opal_list_get_size(&used_daemons)) {
/* if none were used, then just return */
OBJ_DESTRUCT(&used_daemons);
return ORTE_SUCCESS;
}
/* store the bootproxy records */
orte_pls_base_store_active_daemons(&used_daemons);
/* launch any procs that are using existing daemons */
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_add_local_procs(&used_daemons))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* cleanup */
while (NULL != (item = opal_list_remove_first(&used_daemons))) OBJ_RELEASE(item);
OBJ_DESTRUCT(&used_daemons);
return ORTE_SUCCESS;
}

View file

@@ -32,6 +32,7 @@
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/ras/ras_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rmgr/rmgr_types.h"
#include "orte/mca/rml/rml_types.h"
@@ -61,6 +62,7 @@ extern "C" {
char *nodename;
orte_process_name_t *name;
orte_jobid_t active_job;
orte_gpr_notify_data_t *ndat;
} orte_pls_daemon_info_t;
OBJ_CLASS_DECLARATION(orte_pls_daemon_info_t);
@@ -77,11 +79,14 @@ extern "C" {
int orte_pls_base_orted_exit(opal_list_t *daemons);
int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job);
int orte_pls_base_orted_signal_local_procs(opal_list_t *daemons, int32_t signal);
int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_data_t *ndat);
int orte_pls_base_orted_add_local_procs(opal_list_t *dmnlist);
int orte_pls_base_get_active_daemons(opal_list_t *daemons, orte_jobid_t job, opal_list_t *attrs);
int orte_pls_base_store_active_daemons(opal_list_t *daemons);
int orte_pls_base_remove_daemon(orte_pls_daemon_info_t *info);
int orte_pls_base_check_avail_daemons(opal_list_t *daemons, orte_jobid_t job);
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job);
/*
* communications utilities

View file

@@ -82,6 +82,7 @@
#include "orte/mca/smr/smr.h"
#include "orte/mca/pls/pls.h"
#include "orte/mca/pls/base/base.h"
#include "orte/mca/pls/base/pls_private.h"
#include "orte/mca/pls/rsh/pls_rsh.h"
@@ -443,9 +444,9 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
int node_name_index2;
int proc_name_index;
int local_exec_index, local_exec_index_end;
char *jobid_string;
char *jobid_string = NULL;
char *uri, *param;
char **argv, **tmp;
char **argv = NULL, **tmp;
char *prefix_dir;
int argc;
int rc;
@@ -481,7 +482,21 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
goto cleanup;
}
/* if the user requested that we re-use daemons,
* launch the procs on any existing, re-usable daemons */
if (orte_pls_base.reuse_daemons) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
num_nodes = (orte_std_cntr_t)opal_list_get_size(&map->nodes);
if (0 >= num_nodes) {
/* nothing left to do - just return */
rc = ORTE_SUCCESS;
goto cleanup;
}
if (mca_pls_rsh_component.debug_daemons &&
mca_pls_rsh_component.num_concurrent < num_nodes) {
@@ -1094,8 +1109,8 @@ cleanup:
free(bin_base);
}
free(jobid_string); /* done with this variable */
opal_argv_free(argv);
if (NULL != jobid_string) free(jobid_string); /* done with this variable */
if (NULL != argv) opal_argv_free(argv);
return rc;
}

View file

@@ -669,6 +669,7 @@ static void orte_daemon_recv_pls(int status, orte_process_name_t* sender,
ORTE_NAME_ARGS(orte_process_info.my_name));
}
/* unpack the notify data object */
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &ndat, &n, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;

View file

@@ -114,6 +114,7 @@ struct globals_t {
bool debugger;
bool no_local_schedule;
bool displaymapatlaunch;
bool reuse_daemons;
int num_procs;
int exit_status;
char *hostfile;
@@ -262,6 +263,10 @@ opal_cmd_line_init_t cmd_line_init[] = {
&orte_process_info.tmpdir_base, OPAL_CMD_LINE_TYPE_STRING,
"Set the root for the session directory tree for orterun ONLY" },
{ NULL, NULL, NULL, '\0', "reuse-daemons", "reuse-daemons", 0,
&orterun_globals.reuse_daemons, OPAL_CMD_LINE_TYPE_BOOL,
"If set, reuse daemons to launch dynamically spawned processes"},
{ NULL, NULL, NULL, '\0', NULL, "prefix", 1,
NULL, OPAL_CMD_LINE_TYPE_STRING,
"Prefix where Open MPI is installed on remote nodes" },
@@ -414,7 +419,6 @@ int orterun(int argc, char *argv[])
free(tmp);
}
/* pre-condition any network transports that require it */
if (ORTE_SUCCESS != (rc = orte_pre_condition_transports(apps, num_apps))) {
ORTE_ERROR_LOG(rc);
@@ -1000,6 +1004,18 @@ static int parse_globals(int argc, char* argv[])
mca_base_param_set_int(id, 1);
}
if (orterun_globals.reuse_daemons) {
id = mca_base_param_reg_int_name("pls", "base_reuse_daemons",
"If nonzero, reuse daemons to launch dynamically spawned processes. If zero, do not reuse daemons (default)",
false, false, 0, &ret);
if (orterun_globals.reuse_daemons) {
mca_base_param_set_int(id, (int)true);
} else {
mca_base_param_set_int(id, (int)false);
}
}
/* If we don't want to wait, we don't want to wait */
if (orterun_globals.no_wait_for_job_completion) {