1
1

Modify the reuse daemons procedure so we only generate the add_local_procs message once. Revise the display-map-at-launch option so the RMAPS framework takes responsibility for implementation of that option.

Modify the RMAPS framework so we eliminate communicating a map to a backend node when certain attributes are set. The proxy functions are now implemented in the base, and a check for HNP/non-HNP operation is made in the map_job function prior to execution.

This commit was SVN r12619.
Этот коммит содержится в:
Ralph Castain 2006-11-17 19:06:10 +00:00
родитель d248f608b4
Коммит f771cc4fbd
26 изменённых файлов: 330 добавлений и 417 удалений

Просмотреть файл

@ -73,9 +73,7 @@ static int odls_bproc_setup_stdio(orte_process_name_t *proc_name,
orte_std_cntr_t app_context, bool connect_stdin);
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data, orte_job_map_t *map)
{
return ORTE_ERR_NOT_IMPLEMENTED;
}

Просмотреть файл

@ -34,6 +34,9 @@
#include "opal/mca/mca.h"
#include "opal/threads/condition.h"
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/odls/odls.h"
#if defined(c_plusplus) || defined(__cplusplus)
@ -57,9 +60,7 @@ int orte_odls_bproc_finalize(void);
* Interface
*/
int orte_odls_bproc_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc);
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
int orte_odls_bproc_get_add_procs_data(orte_gpr_notify_data_t **data, orte_job_map_t *map);
int orte_odls_bproc_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ);
int orte_odls_bproc_kill_local_procs(orte_jobid_t job, bool set_state);
int orte_odls_bproc_signal_local_procs(const orte_process_name_t* proc_name, int32_t signal);

Просмотреть файл

@ -30,6 +30,7 @@
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/odls/odls.h"
@ -53,9 +54,7 @@ int orte_odls_default_finalize(void);
* Interface
*/
int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc);
int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data, orte_job_map_t *map);
int orte_odls_default_launch_local_procs(orte_gpr_notify_data_t *data, char **base_environ);
int orte_odls_default_kill_local_procs(orte_jobid_t job, bool set_state);
int orte_odls_default_signal_local_procs(const orte_process_name_t *proc,

Просмотреть файл

@ -220,8 +220,7 @@ int orte_odls_default_subscribe_launch_data(orte_jobid_t job, orte_gpr_notify_cb
}
int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
orte_job_map_t *map)
{
orte_gpr_notify_data_t *ndat;
orte_gpr_value_t **values, *value;
@ -236,7 +235,8 @@ int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
ORTE_JOB_VPID_RANGE_KEY,
NULL
};
opal_list_item_t *item;
opal_list_item_t *item, *m_item;
orte_mapped_node_t *node;
orte_mapped_proc_t *proc;
int rc;
char *segment;
@ -251,14 +251,14 @@ int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
}
/* construct a fake trigger name so that we can extract the jobid from it later */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&(ndat->target), "bogus", job))) {
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&(ndat->target), "bogus", map->job))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
/* get the segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, job))) {
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, map->job))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
@ -283,57 +283,63 @@ int orte_odls_default_get_add_procs_data(orte_gpr_notify_data_t **data,
}
ndat->cnt = 1;
/* the remainder of our required info is in the mapped_node object, so all we
/* the remainder of our required info is in the mapped_node objects, so all we
* have to do is transfer it over
*/
for (item = opal_list_get_first(&node->procs);
item != opal_list_get_end(&node->procs);
item = opal_list_get_next(item)) {
proc = (orte_mapped_proc_t*)item;
for (m_item = opal_list_get_first(&map->nodes);
m_item != opal_list_get_end(&map->nodes);
m_item = opal_list_get_next(m_item)) {
node = (orte_mapped_node_t*)m_item;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, segment, 3, 1))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
for (item = opal_list_get_first(&node->procs);
item != opal_list_get_end(&node->procs);
item = opal_list_get_next(item)) {
proc = (orte_mapped_proc_t*)item;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, 0, segment, 3, 1))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
value->tokens[0] = strdup("bogus"); /* must have at least one token */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_PROC_NAME_KEY,
ORTE_NAME, &proc->name))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
ORTE_PROC_APP_CONTEXT_KEY,
ORTE_STD_CNTR, &proc->app_idx))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
ORTE_NODE_NAME_KEY,
ORTE_STRING, node->nodename))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
ndat->cnt += 1;
}
value->tokens[0] = strdup("bogus"); /* must have at least one token */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_PROC_NAME_KEY,
ORTE_NAME, &proc->name))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]),
ORTE_PROC_APP_CONTEXT_KEY,
ORTE_STD_CNTR, &proc->app_idx))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]),
ORTE_NODE_NAME_KEY,
ORTE_STRING, node->nodename))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_pointer_array_add(&cnt, ndat->values, value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
OBJ_RELEASE(values[0]);
return rc;
}
ndat->cnt += 1;
}
*data = ndat;

Просмотреть файл

@ -51,11 +51,12 @@ typedef int (*orte_odls_base_module_subscribe_launch_data_fn_t)(orte_jobid_t job
* In order to reuse daemons, we need a way for the HNP to construct a notify_data object that
* contains the data needed by the active ODLS component to launch a local process. Since the
* only one that knows what a particular ODLS component needs is that component, we require an
* entry point that the HNP can call to get the required notify_data object
* entry point that the HNP can call to get the required notify_data object. This is constructed
* for *all* nodes - the individual orteds then parse that data to find the specific launch info
* for procs on their node
*/
typedef int (*orte_odls_base_module_get_add_procs_data_fn_t)(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node);
orte_job_map_t *map);
/**
* Locally launch the provided processes

Просмотреть файл

@ -175,9 +175,7 @@ static int orte_odls_process_subscribe_launch_data( orte_jobid_t job,
return rc;
}
static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data,
orte_jobid_t job,
orte_mapped_node_t *node)
static int orte_odls_process_get_add_procs_data(orte_gpr_notify_data_t **data, orte_job_map_t *map)
{
return ORTE_ERR_NOT_IMPLEMENTED;
}

Просмотреть файл

@ -38,7 +38,6 @@ static void orte_pls_daemon_info_construct(orte_pls_daemon_info_t* ptr)
ptr->nodename = NULL;
ptr->name = NULL;
ptr->active_job = ORTE_JOBID_INVALID;
ptr->ndat = NULL;
}
/* destructor - used to free any resources held by instance */
@ -46,7 +45,6 @@ static void orte_pls_daemon_info_destructor(orte_pls_daemon_info_t* ptr)
{
if (NULL != ptr->nodename) free(ptr->nodename);
if (NULL != ptr->name) free(ptr->name);
if (NULL != ptr->ndat) OBJ_RELEASE(ptr->ndat);
}
OBJ_CLASS_INSTANCE(orte_pls_daemon_info_t, /* type name */
opal_list_item_t, /* parent "class" name */
@ -219,6 +217,11 @@ static int get_daemons(opal_list_t *daemons, orte_jobid_t job)
}
/* if we found everything, then this is a valid entry */
if (found_name && found_node && found_cell) {
/* first check if this name is ourself - if so, ignore it */
if (ORTE_EQUAL == orte_dss.compare(name, ORTE_PROC_MY_NAME, ORTE_NAME)) {
goto MOVEON;
}
if (check_dups) {
/* see if this daemon is already on the list - if so, then we don't add it */
for (item = opal_list_get_first(daemons);

Просмотреть файл

@ -258,7 +258,7 @@ CLEANUP:
}
int orte_pls_base_orted_add_local_procs(opal_list_t *daemons)
int orte_pls_base_orted_add_local_procs(opal_list_t *daemons, orte_gpr_notify_data_t *ndat)
{
int rc;
orte_buffer_t cmd;
@ -268,40 +268,36 @@ int orte_pls_base_orted_add_local_procs(opal_list_t *daemons)
OPAL_TRACE(1);
/* pack and send the commands as fast as we can - we have to
* pack each time as the launch data could be different for
* the various nodes
*/
/* pack the command */
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* pack the launch data for the daemons */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &ndat, 1, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
for (item = opal_list_get_first(daemons);
item != opal_list_get_end(daemons);
item = opal_list_get_next(item)) {
dmn = (orte_pls_daemon_info_t*)item;
OBJ_CONSTRUCT(&cmd, orte_buffer_t);
/* pack the command */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* pack the launch data for this daemon */
if (ORTE_SUCCESS != (rc = orte_dss.pack(&cmd, &(dmn->ndat), 1, ORTE_GPR_NOTIFY_DATA))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (0 > orte_rml.send_buffer_nb(dmn->name, &cmd, ORTE_RML_TAG_PLS_ORTED,
0, orte_pls_base_orted_send_cb, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&cmd);
return rc;
}
OBJ_DESTRUCT(&cmd);
orted_cmd_num_active++;
}
OBJ_DESTRUCT(&cmd);
/* post the receive for the ack's */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_PLS_ORTED_ACK,
ORTE_RML_NON_PERSISTENT, orte_pls_base_cmd_ack, NULL);

Просмотреть файл

@ -33,20 +33,22 @@
#include "orte/mca/pls/base/pls_private.h"
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job)
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map)
{
opal_list_t avail_daemons;
opal_list_item_t *item, *item2, *next;
orte_pls_daemon_info_t *dmn, *newdmn;
orte_mapped_node_t *node;
opal_list_t used_daemons;
orte_gpr_notify_data_t *ndat;
bool found;
int rc;
OBJ_CONSTRUCT(&avail_daemons, opal_list_t);
OBJ_CONSTRUCT(&used_daemons, opal_list_t);
/* check for available daemons we could use */
if (ORTE_SUCCESS != (rc = orte_pls_base_check_avail_daemons(&avail_daemons, job))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_check_avail_daemons(&avail_daemons, map->job))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -58,6 +60,7 @@ int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t j
* on that node!
*/
found = false;
while (NULL != (item = opal_list_remove_first(&avail_daemons))) {
dmn = (orte_pls_daemon_info_t*)item;
@ -72,14 +75,21 @@ int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t j
newdmn = OBJ_NEW(orte_pls_daemon_info_t);
newdmn->cell = dmn->cell;
newdmn->nodename = strdup(dmn->nodename);
newdmn->active_job = job;
newdmn->active_job = map->job;
orte_dss.copy((void**)&(newdmn->name), dmn->name, ORTE_NAME);
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&(newdmn->ndat), job, node))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_append(&used_daemons, &newdmn->super);
/* procs on this node are taken care of, so remove it from
/* get the launch message only once - do it the first time
* through so all the nodes are still on the map!
*/
if (!found) {
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&ndat, map))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(ndat);
return rc;
}
found = true;
}
/* procs on this node will be taken care of, so remove it from
* the map list so the main launcher won't try to launch them
*/
opal_list_remove_item(&map->nodes, item2);
@ -91,8 +101,8 @@ int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t j
}
}
if (0 >= opal_list_get_size(&used_daemons)) {
/* if none were used, then just return */
if (!found) {
/* if no daemons were reused, then just return */
OBJ_DESTRUCT(&used_daemons);
return ORTE_SUCCESS;
}
@ -101,10 +111,11 @@ int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t j
orte_pls_base_store_active_daemons(&used_daemons);
/* launch any procs that are using existing daemons */
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_add_local_procs(&used_daemons))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_orted_add_local_procs(&used_daemons, ndat))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_RELEASE(ndat);
/* cleanup */
while (NULL != (item = opal_list_remove_first(&used_daemons))) OBJ_RELEASE(item);

Просмотреть файл

@ -62,7 +62,6 @@ extern "C" {
char *nodename;
orte_process_name_t *name;
orte_jobid_t active_job;
orte_gpr_notify_data_t *ndat;
} orte_pls_daemon_info_t;
OBJ_CLASS_DECLARATION(orte_pls_daemon_info_t);
@ -79,14 +78,14 @@ extern "C" {
int orte_pls_base_orted_exit(opal_list_t *daemons);
int orte_pls_base_orted_kill_local_procs(opal_list_t *daemons, orte_jobid_t job);
int orte_pls_base_orted_signal_local_procs(opal_list_t *daemons, int32_t signal);
int orte_pls_base_orted_add_local_procs(opal_list_t *dmnlist);
int orte_pls_base_orted_add_local_procs(opal_list_t *dmnlist, orte_gpr_notify_data_t *ndat);
int orte_pls_base_get_active_daemons(opal_list_t *daemons, orte_jobid_t job, opal_list_t *attrs);
int orte_pls_base_store_active_daemons(opal_list_t *daemons);
int orte_pls_base_remove_daemon(orte_pls_daemon_info_t *info);
int orte_pls_base_check_avail_daemons(opal_list_t *daemons, orte_jobid_t job);
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map, orte_jobid_t job);
int orte_pls_base_launch_on_existing_daemons(orte_job_map_t *map);
/*
* communications utilities

Просмотреть файл

@ -235,7 +235,7 @@ int orte_pls_gridengine_launch_job(orte_jobid_t jobid)
* launch the procs on any existing, re-usable daemons
*/
if (orte_pls_base.reuse_daemons) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(map);
OBJ_DESTRUCT(&daemons);

Просмотреть файл

@ -488,7 +488,7 @@ int orte_pls_rsh_launch(orte_jobid_t jobid)
* launch the procs on any existing, re-usable daemons
*/
if (orte_pls_base.reuse_daemons) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(map);
OBJ_DESTRUCT(&active_daemons);

Просмотреть файл

@ -166,7 +166,7 @@ static int pls_slurm_launch_job(orte_jobid_t jobid)
* launch the procs on any existing, re-usable daemons
*/
if (orte_pls_base.reuse_daemons) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(map);
OBJ_DESTRUCT(&daemons);

Просмотреть файл

@ -172,7 +172,7 @@ static int pls_tm_launch_job(orte_jobid_t jobid)
* launch the procs on any existing, re-usable daemons
*/
if (orte_pls_base.reuse_daemons) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map, jobid))) {
if (ORTE_SUCCESS != (rc = orte_pls_base_launch_on_existing_daemons(map))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(map);
return rc;

Просмотреть файл

@ -27,6 +27,7 @@ libmca_rmaps_la_SOURCES += \
base/rmaps_base_close.c \
base/rmaps_base_registry_fns.c \
base/rmaps_base_map_job.c \
base/rmaps_base_proxy_map_job.c \
base/rmaps_base_support_fns.c \
base/rmaps_base_open.c \
base/rmaps_base_receive.c \

Просмотреть файл

@ -63,6 +63,8 @@ extern "C" {
bool per_node;
/* do we not allow use of the localhost */
bool no_use_local;
/* display the map after it is computed */
bool display_map;
} orte_rmaps_base_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_rmaps_base_t);
/**

Просмотреть файл

@ -55,6 +55,11 @@ int orte_rmaps_base_map_job(orte_jobid_t job, opal_list_t *attributes)
orte_job_map_t *map;
int rc;
/* if we are not on the head node, use the proxy component */
if (!orte_process_info.seed) {
return orte_rmaps_base_proxy_map_job(job, attributes);
}
/* check the attributes to see if anything in the environment
* has been overridden. If not, then install the environment
* values to correctly control the behavior of the RMAPS component.
@ -202,7 +207,8 @@ int orte_rmaps_base_map_job(orte_jobid_t job, opal_list_t *attributes)
}
/* if we wanted to display the map, now is the time to do it */
if (NULL != orte_rmgr.find_attribute(attributes, ORTE_RMAPS_DISPLAY_AFTER_MAP)) {
if (orte_rmaps_base.display_map ||
NULL != orte_rmgr.find_attribute(attributes, ORTE_RMAPS_DISPLAY_AFTER_MAP)) {
orte_rmaps.get_job_map(&map, job);
orte_dss.dump(0, map, ORTE_JOB_MAP);
}

Просмотреть файл

@ -130,6 +130,16 @@ int orte_rmaps_base_open(void)
orte_rmaps_base.oversubscribe = false;
}
/* should we display the map after determining it? */
mca_base_param_reg_int_name("rmaps_base", "display_map",
"Whether to display the process map after it is computed",
false, false, (int)false, &value);
if ((int)false == value) {
orte_rmaps_base.display_map = false;
} else {
orte_rmaps_base.display_map = true;
}
/** register the base system types with the DSS */
tmp = ORTE_JOB_MAP;
if (ORTE_SUCCESS != (rc = orte_dss.register_type(orte_rmaps_base_pack_map,

Просмотреть файл

@ -21,19 +21,18 @@
#include "orte/orte_constants.h"
#include "orte/dss/dss.h"
#include "orte/runtime/runtime.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/base/rmaps_private.h"
#include "orte/mca/rmaps/proxy/rmaps_proxy.h"
/*
* Map a job
*/
int orte_rmaps_proxy_map(orte_jobid_t job, opal_list_t *attributes)
int orte_rmaps_base_proxy_map_job(orte_jobid_t job, opal_list_t *attributes)
{
orte_buffer_t* cmd;
orte_buffer_t* answer;
@ -71,7 +70,7 @@ int orte_rmaps_proxy_map(orte_jobid_t job, opal_list_t *attributes)
}
/* send the request */
if (0 > orte_rml.send_buffer(orte_rmaps_proxy_globals.replica, cmd, ORTE_RML_TAG_RMAPS, 0)) {
if (0 > orte_rml.send_buffer(ORTE_PROC_MY_HNP, cmd, ORTE_RML_TAG_RMAPS, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(cmd);
return ORTE_ERR_COMM_FAILURE;
@ -86,7 +85,7 @@ int orte_rmaps_proxy_map(orte_jobid_t job, opal_list_t *attributes)
}
/* enter a blocking receive until we hear back */
if (0 > orte_rml.recv_buffer(orte_rmaps_proxy_globals.replica, answer, ORTE_RML_TAG_RMAPS)) {
if (0 > orte_rml.recv_buffer(ORTE_PROC_MY_HNP, answer, ORTE_RML_TAG_RMAPS)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(answer);
return ORTE_ERR_COMM_FAILURE;

Просмотреть файл

@ -177,6 +177,8 @@ ORTE_DECLSPEC int orte_rmaps_base_claim_slot(orte_job_map_t *map,
opal_list_t *fully_used_nodes,
bool oversubscribe);
ORTE_DECLSPEC int orte_rmaps_base_proxy_map_job(orte_jobid_t job, opal_list_t *attributes);
/** Local data type functions */
void orte_rmaps_base_std_obj_release(orte_data_value_t *value);

Просмотреть файл

@ -1,46 +0,0 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
rmaps_proxy.h \
rmaps_proxy_component.c \
rmaps_proxy.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_rmaps_proxy_DSO
component_noinst =
component_install = mca_rmaps_proxy.la
else
component_noinst = libmca_rmaps_proxy.la
component_install =
endif
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)
mca_rmaps_proxy_la_SOURCES = $(sources)
mca_rmaps_proxy_la_LDFLAGS = -module -avoid-version
mca_rmaps_proxy_la_LIBADD = \
$(top_ompi_builddir)/orte/liborte.la \
$(top_ompi_builddir)/opal/libopal.la
noinst_LTLIBRARIES = $(component_noinst)
libmca_rmaps_proxy_la_SOURCES =$(sources)
libmca_rmaps_proxy_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,23 +0,0 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_INIT_FILE=rmaps_proxy_component.c
PARAM_CONFIG_FILES="Makefile"

Просмотреть файл

@ -1,75 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#ifndef ORTE_RMAPS_PROXY_H
#define ORTE_RMAPS_PROXY_H
#include "orte_config.h"
#include "orte/orte_types.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/rmaps/rmaps.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/*
* Module open / close
*/
int orte_rmaps_proxy_open(void);
int orte_rmaps_proxy_close(void);
/*
* Startup / Shutdown
*/
orte_rmaps_base_module_t*
orte_rmaps_proxy_component_init(int *priority);
int orte_rmaps_proxy_finalize(void);
/*
* Globals used within the rmaps proxy component.
*/
typedef struct {
/* debug flag: when nonzero, the component emits diagnostic output */
int debug;
/* name of the process that map requests are sent to */
orte_process_name_t *replica;
} orte_rmaps_proxy_globals_t;
/* the component's instance of these globals */
extern orte_rmaps_proxy_globals_t orte_rmaps_proxy_globals;
/*
* Component API functions
*/
int orte_rmaps_proxy_map(orte_jobid_t job, opal_list_t *attributes);
/*
* Global component.
*/
ORTE_MODULE_DECLSPEC extern orte_rmaps_base_component_t mca_rmaps_proxy_component;
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

Просмотреть файл

@ -1,153 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open RTE Resource Mapping Subsystem (RMAPS) - Proxy component
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/orte_constants.h"
#include "orte/orte_types.h"
#include "opal/util/output.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns_types.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rmaps/base/rmaps_private.h"
#include "rmaps_proxy.h"
/*
* Component descriptor for the rmaps "proxy" component: version and naming
* information, the open/close entry points, the checkpoint/restart flag,
* and the init function that decides whether this component may be used.
*/
orte_rmaps_base_component_t mca_rmaps_proxy_component = {
{
ORTE_RMAPS_BASE_VERSION_1_3_0,
"proxy", /* MCA module name */
ORTE_MAJOR_VERSION, /* MCA module major version */
ORTE_MINOR_VERSION, /* MCA module minor version */
ORTE_RELEASE_VERSION, /* MCA module release version */
orte_rmaps_proxy_open, /* module open */
orte_rmaps_proxy_close /* module close */
},
{
false /* checkpoint / restart */
},
orte_rmaps_proxy_component_init /* module init */
};
/*
* Function table returned by orte_rmaps_proxy_component_init() when this
* component is selected. Only the map and finalize entries are specific to
* the proxy; the two map-retrieval entries reuse the base implementations.
* NOTE(review): slot meanings are inferred from the function names - confirm
* against the orte_rmaps_base_module_t definition.
*/
static orte_rmaps_base_module_t orte_rmaps_proxy = {
orte_rmaps_proxy_map,
orte_rmaps_base_get_job_map,
orte_rmaps_base_get_node_map,
orte_rmaps_proxy_finalize
};
/*
* Whether or not we allowed this component to be selected
*/
static bool initialized = false;
/* local globals */
orte_rmaps_proxy_globals_t orte_rmaps_proxy_globals;
/*
 * Component open: register the "debug" MCA parameter of the rmaps proxy
 * component (default 0), look up its current value and cache it in the
 * component globals.  Always returns ORTE_SUCCESS.
 */
int orte_rmaps_proxy_open(void)
{
    int debug_value;
    int param_index = mca_base_param_register_int("rmaps", "proxy", "debug", NULL, 0);

    mca_base_param_lookup_int(param_index, &debug_value);

    /* collapse any nonzero setting to true */
    orte_rmaps_proxy_globals.debug = (0 != debug_value) ? true : false;

    return ORTE_SUCCESS;
}
/*
 * Component close: nothing is released here, so simply report success.
 */
int orte_rmaps_proxy_close(void)
{
    return ORTE_SUCCESS;
}
orte_rmaps_base_module_t*
orte_rmaps_proxy_component_init(int *priority)
{
    /*
     * Decide whether the proxy component may be used by this process.
     * Returns the proxy module (and sets *priority) for ordinary processes;
     * returns NULL, leaving *priority untouched, on an HNP or an orted.
     */
    orte_rmaps_base_module_t *selected = NULL;

    if (orte_rmaps_proxy_globals.debug) {
        opal_output(0, "rmaps_proxy_init called");
    }

    /* only processes that are neither the HNP nor a daemon may use the proxy */
    if (!orte_process_info.seed && !orte_process_info.daemon) {
        /* an arbitrary positive priority - it only matters relative
         * to the priorities of other components
         */
        *priority = 10;

        /* for now, the replica we talk to is simply the name service replica */
        orte_rmaps_proxy_globals.replica = orte_process_info.ns_replica;

        initialized = true;
        selected = &orte_rmaps_proxy;
    }

    return selected;
}
/*
 * Module finalize: optionally report the call when debugging is enabled,
 * then mark the component as no longer initialized.  Always returns
 * ORTE_SUCCESS.
 */
int orte_rmaps_proxy_finalize(void)
{
    if (0 != orte_rmaps_proxy_globals.debug) {
        opal_output(0, "[%lu,%lu,%lu] rmaps_proxy_finalize called",
                    ORTE_NAME_ARGS(orte_process_info.my_name));
    }

    /* record that the component is no longer in use */
    initialized = false;

    return ORTE_SUCCESS;
}

Просмотреть файл

@ -47,6 +47,7 @@
#include "opal/util/argv.h"
#include "opal/util/basename.h"
#include "opal/util/cmd_line.h"
#include "opal/util/daemon_init.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
@ -69,6 +70,8 @@
#include "orte/mca/schema/schema.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
@ -78,6 +81,8 @@
*/
static orte_jobid_t jobid = ORTE_JOBID_INVALID;
static char *orteboot_basename = NULL;
static struct opal_event term_handler;
static struct opal_event int_handler;
/*
* setup globals for catching orteboot command line options
@ -88,6 +93,7 @@ struct globals_t {
bool verbose;
bool quiet;
bool exit;
bool debug;
char *hostfile;
char *wdir;
opal_mutex_t lock;
@ -171,6 +177,12 @@ extern char** environ;
/*
* Local functions
*/
static void orte_daemon_recv(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata);
static void abort_signal_callback(int fd, short event, void *arg);
static void exit_callback(int fd, short event, void *arg);
int main(int argc, char *argv[])
{
@ -188,6 +200,7 @@ int main(int argc, char *argv[])
orteboot_globals.version = false;
orteboot_globals.verbose = false;
orteboot_globals.exit = false;
orteboot_globals.debug = false;
/* Setup MCA params */
mca_base_param_init();
@ -275,7 +288,10 @@ int main(int argc, char *argv[])
return rc;
}
free(tmp);
/* set the local debug flag too! */
orteboot_globals.debug = true;
}
id = mca_base_param_reg_int_name("orte_debug", "daemons_file",
"Whether want stdout/stderr of daemons to go to a file or not",
false, false, 0, &iparam);
@ -304,6 +320,13 @@ int main(int argc, char *argv[])
free(tmp);
}
/* detach from controlling terminal
* otherwise, remain attached so output can get to the user
*/
if(orteboot_globals.debug == false) {
opal_daemon_init(NULL);
}
/* Initialize our Open RTE environment */
/* Set the flag telling orte_init that I am NOT a
* singleton, but am "infrastructure" - prevents setting
@ -316,6 +339,21 @@ int main(int argc, char *argv[])
return rc;
}
/** setup callbacks for abort signals */
opal_signal_set(&term_handler, SIGTERM,
abort_signal_callback, &term_handler);
opal_signal_add(&term_handler, NULL);
opal_signal_set(&int_handler, SIGINT,
abort_signal_callback, &int_handler);
opal_signal_add(&int_handler, NULL);
/* issue the non-blocking receive */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* Prep to start the virtual machine */
/* construct the list of attributes */
OBJ_CONSTRUCT(&attributes, opal_list_t);
@ -341,8 +379,153 @@ int main(int argc, char *argv[])
OBJ_DESTRUCT(&attributes);
/* setup and enter the event monitor */
OPAL_THREAD_LOCK(&orteboot_globals.lock);
while (false == orteboot_globals.exit) {
opal_condition_wait(&orteboot_globals.cond, &orteboot_globals.lock);
}
OPAL_THREAD_UNLOCK(&orteboot_globals.lock);
orte_finalize();
free(orteboot_basename);
return rc;
}
/*
 * Timer callback scheduled by abort_signal_callback(): unhook the abort
 * signal handlers and tell the waiter on orteboot_globals.cond that it is
 * time to exit.  The fd, event and arg parameters are not used.
 */
static void exit_callback(int fd, short event, void *arg)
{
    struct opal_event *handlers[] = { &term_handler, &int_handler };
    int i;

    /* drop the TERM handler first, then the INT handler */
    for (i = 0; i < (int)(sizeof(handlers) / sizeof(handlers[0])); i++) {
        opal_signal_del(handlers[i]);
    }

    /* raise the exit flag and wake up whoever is waiting on it */
    orteboot_globals.exit = true;
    opal_condition_signal(&orteboot_globals.cond);
}
static void abort_signal_callback(int fd, short flags, void *arg)
{
int ret;
struct timeval tv = { 1, 0 };
opal_event_t* event;
opal_list_t attrs;
opal_list_item_t *item;
static int signalled = 0;
OPAL_TRACE(1);
if (0 != signalled++) {
return;
}
fprintf(stderr, "%s: killing job...\n\n", orteboot_basename);
/* terminate the vm - this will also wake us up so we can exit */
OBJ_CONSTRUCT(&attrs, opal_list_t);
orte_rmgr.add_attribute(&attrs, ORTE_NS_INCLUDE_DESCENDANTS, ORTE_UNDEF, NULL, ORTE_RMGR_ATTR_OVERRIDE);
ret = orte_pls.terminate_orteds(0, &attrs);
while (NULL != (item = opal_list_remove_first(&attrs))) OBJ_RELEASE(item);
OBJ_DESTRUCT(&attrs);
/* setup a delay to give the orteds time to complete their departure */
if (NULL != (event = (opal_event_t*)malloc(sizeof(opal_event_t)))) {
opal_evtimer_set(event, exit_callback, NULL);
opal_evtimer_add(event, &tv);
}
}
/*
 * Receive callback for messages arriving on ORTE_RML_TAG_DAEMON.
 *
 * Unpacks one daemon command from the incoming buffer and acts on it while
 * holding orteboot_globals.lock:
 *   - EXIT:          raise the exit flag and signal the condition variable
 *   - CONTACT QUERY: send our contact URI back to the sender on the same tag
 *   - HOSTFILE / SCRIPTFILE / HEARTBEAT: logged as not implemented
 * Any other command is ignored.  Before returning, the receive is posted
 * again, since it is registered as ORTE_RML_NON_PERSISTENT.
 * The status and cbdata parameters are not used.
 */
static void orte_daemon_recv(int status, orte_process_name_t* sender,
                             orte_buffer_t *buffer, orte_rml_tag_t tag,
                             void* cbdata)
{
    orte_buffer_t *reply;
    orte_daemon_cmd_flag_t cmd;
    orte_std_cntr_t count = 1;
    char *uri;
    int rc;

    OPAL_TRACE(1);

    OPAL_THREAD_LOCK(&orteboot_globals.lock);

    if (orteboot_globals.debug) {
        opal_output(0, "orteboot: received message from [%ld,%ld,%ld]", ORTE_NAME_ARGS(sender));
    }

    /* the reply buffer is created up front, whether or not it gets used */
    reply = OBJ_NEW(orte_buffer_t);
    if (NULL == reply) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        goto DONE;
    }

    rc = orte_dss.unpack(buffer, &cmd, &count, ORTE_DAEMON_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto CLEANUP;
    }

    if (ORTE_DAEMON_EXIT_CMD == cmd) {
        /**** EXIT COMMAND ****/
        if (orteboot_globals.debug) {
            opal_output(0, "orteboot: received exit");
        }
        orteboot_globals.exit = true;
        opal_condition_signal(&orteboot_globals.cond);
    } else if (ORTE_DAEMON_CONTACT_QUERY_CMD == cmd) {
        /**** CONTACT QUERY COMMAND ****/
        /* NOTE(review): the string returned by get_uri() is not freed
         * here - confirm who owns it
         */
        uri = orte_rml.get_uri();
        if (NULL == uri) {
            ORTE_ERROR_LOG(ORTE_ERROR);
        } else if (ORTE_SUCCESS != (rc = orte_dss.pack(reply, &uri, 1, ORTE_STRING))) {
            ORTE_ERROR_LOG(rc);
        } else if (0 > orte_rml.send_buffer(sender, reply, tag, 0)) {
            ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        }
    } else if (ORTE_DAEMON_HOSTFILE_CMD == cmd ||
               ORTE_DAEMON_SCRIPTFILE_CMD == cmd ||
               ORTE_DAEMON_HEARTBEAT_CMD == cmd) {
        /**** HOSTFILE, SCRIPTFILE and HEARTBEAT COMMANDS ****/
        ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
    }

CLEANUP:
    OBJ_RELEASE(reply);

DONE:
    OPAL_THREAD_UNLOCK(&orteboot_globals.lock);

    /* reissue the non-blocking receive */
    rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON, ORTE_RML_NON_PERSISTENT, orte_daemon_recv, NULL);
    if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
        ORTE_ERROR_LOG(rc);
    }

    return;
}

Просмотреть файл

@ -199,7 +199,7 @@ opal_cmd_line_init_t cmd_line_init[] = {
{ NULL, NULL, NULL, '\0', "nooversubscribe", "nooversubscribe", 0,
&orterun_globals.no_oversubscribe, OPAL_CMD_LINE_TYPE_BOOL,
"Nodes are not to be oversubscribed, even if the system supports such operation"},
{ NULL, NULL, NULL, '\0', "display-map-at-launch", "display-map-at-launch", 0,
{ "rmaps", "base", "display_map", '\0', "display-map-at-launch", "display-map-at-launch", 0,
NULL, OPAL_CMD_LINE_TYPE_BOOL,
"Display the process map just before launch"},
@ -792,12 +792,7 @@ static void abort_signal_callback(int fd, short flags, void *arg)
}
}
/* setup a delay - if the timer fires, then we assume that the orteds
* failed to properly kill the job. This can happen for a variety of
* reasons - for example, if an application couldn't be found, then
* the local orted may not be able to tell us it "terminated" since
* it never actually started
*/
/* setup a delay to give the orteds time to complete their departure */
if (NULL != (event = (opal_event_t*)malloc(sizeof(opal_event_t)))) {
opal_evtimer_set(event, exit_callback, NULL);
opal_evtimer_add(event, &tv);