
* version of the tm pls that uses the proxy orteds, avoiding all the nasty
  multi-client issues the old version had.  Also, ignore the NULL iof
  component, since we shouldn't use it when using the proxy orteds

This commit was SVN r6939.
This commit is contained in:
Brian Barrett 2005-08-19 16:49:59 +00:00
parent 80f27b5d87
commit e737bba753
10 changed files with 439 additions and 1068 deletions
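
For context, both the old and new versions of this component drive the PBS/Torque Task Management (TM) API that appears throughout the diffs below. Here is a minimal standalone sketch (not part of this commit) of the launch sequence that the new module's pls_tm_start_proc() boils down to -- tm_init, tm_nodeinfo, tm_spawn, tm_poll, tm_finalize. The spawned command is a placeholder and error handling is trimmed, so treat it as illustration only:

/* Illustrative sketch only (not part of this commit): the TM call
   sequence used to start one process, as in pls_tm_start_proc().
   Must run inside a PBS job, linked against Torque's TM library. */
#include <stdio.h>
#include <tm.h>

int main(void)
{
    struct tm_roots roots;
    tm_node_id *node_ids;                    /* allocated by tm_nodeinfo() */
    int num_nodes, local_errno;
    tm_task_id task_id;
    tm_event_t event, completed;
    char *cmd_argv[] = { "/bin/hostname", NULL };  /* placeholder command */
    char *cmd_env[]  = { NULL };

    /* Connect to the local MOM; fails outside of a PBS job */
    if (TM_SUCCESS != tm_init(NULL, &roots)) {
        fprintf(stderr, "tm_init failed -- not running inside a PBS job?\n");
        return 1;
    }

    /* Ask for the TM node ids allocated to this job */
    if (TM_SUCCESS != tm_nodeinfo(&node_ids, &num_nodes) || num_nodes < 1) {
        tm_finalize();
        return 1;
    }

    /* tm_spawn() is asynchronous: it posts an event that a later
       blocking tm_poll() (wait = 1) reaps */
    if (TM_SUCCESS == tm_spawn(1, cmd_argv, cmd_env, node_ids[0],
                               &task_id, &event) &&
        TM_SUCCESS == tm_poll(TM_NULL_EVENT, &completed, 1, &local_errno)) {
        printf("spawned task %d on TM node %d\n",
               (int) task_id, (int) node_ids[0]);
    }

    tm_finalize();
    return 0;
}

The key design shift in this commit is visible against that sketch: the new module runs this sequence once per node to start a proxy orted, instead of once per MPI process from a forked "shepherd" child as the old version did.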

View file

View file

@@ -1,2 +0,0 @@
jsquyres
brbarret

View file

@@ -22,10 +22,8 @@ AM_CPPFLAGS = $(pls_tm_CPPFLAGS)
sources = \
pls_tm.h \
pls_tm_child.c \
pls_tm_component.c \
pls_tm_module.c \
pls_tm_registry.c
pls_tm_module.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

View file

@@ -22,43 +22,23 @@
#include "mca/mca.h"
#include "mca/pls/pls.h"
#include "tm.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
struct orte_pls_tm_component_t {
orte_pls_base_component_t super;
int priority;
int debug;
char *orted;
};
typedef struct orte_pls_tm_component_t orte_pls_tm_component_t;
/* Globally exported variables */
OMPI_COMP_EXPORT extern orte_pls_base_component_1_0_0_t
orte_pls_tm_component;
OMPI_COMP_EXPORT extern orte_pls_base_module_1_0_0_t
orte_pls_tm_module;
OMPI_COMP_EXPORT extern orte_pls_tm_component_t mca_pls_tm_component;
extern orte_pls_base_module_1_0_0_t orte_pls_tm_module;
/* Global, but not exported variables */
extern bool orte_pls_tm_connected;
/* Internal struct */
typedef struct pls_tm_proc_state_t {
tm_task_id tid;
uint32_t state;
} pls_tm_proc_state_t;
/* Local functions */
int orte_pls_tm_put_tid(const orte_process_name_t* name,
tm_task_id tid, int state);
int orte_pls_tm_get_tids(orte_jobid_t jobid, tm_task_id **tids,
orte_process_name_t **names, size_t *size);
/* Child process functions */
int orte_pls_tm_child_init(void);
int orte_pls_tm_child_launch(orte_jobid_t jobid);
int orte_pls_tm_child_wait(orte_jobid_t jobid);
int orte_pls_tm_child_finalize(void);
#if defined(c_plusplus) || defined(__cplusplus)
}

View file

@@ -1,592 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "orte_config.h"
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <signal.h>
#include "include/orte_constants.h"
#include "include/orte_types.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "runtime/runtime.h"
#include "runtime/orte_wait.h"
#include "mca/base/mca_base_param.h"
#include "mca/rmgr/base/base.h"
#include "mca/rmaps/base/rmaps_base_map.h"
#include "mca/pls/pls.h"
#include "mca/pls/base/base.h"
#include "mca/errmgr/errmgr.h"
#include "mca/soh/soh_types.h"
#include "mca/gpr/gpr.h"
#include "orte/mca/sds/base/base.h"
#include "mca/soh/soh.h"
#include "mca/rml/rml.h"
#include "mca/ns/ns.h"
#include "pls_tm.h"
/*
* Local functions
*/
static int do_tm_resolve(char *hostnames, tm_node_id *tnodeid);
static int query_tm_hostnames(void);
static char* get_tm_hostname(tm_node_id node);
/*
* Local variables. Note that these are only used *per child
* process*, so we're guaranteed that only one thread will be using
* these -- no need for locking.
*/
static char **tm_hostnames = NULL;
static tm_node_id *tm_node_ids = NULL;
static tm_task_id *task_ids = NULL;
static size_t num_spawned = 0;
static int num_tm_hostnames, num_node_ids;
static orte_process_name_t *names = NULL;
int orte_pls_tm_child_init(void)
{
int ret;
char* uri;
orte_cellid_t new_cellid;
orte_jobid_t new_jobid;
orte_vpid_t new_vpid;
orte_process_name_t *new_child_name;
/* Re-start us as a new ORTE process */
opal_set_using_threads(false);
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: starting");
if (NULL == (uri = orte_rml.get_uri())) {
ORTE_ERROR_LOG(ORTE_ERROR);
exit(-1);
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: got uri: %s", uri);
orte_ns.get_cellid(&new_cellid, orte_process_info.my_name);
orte_ns.get_jobid(&new_jobid, orte_process_info.my_name);
new_vpid = 1;
orte_ns.reserve_range(new_jobid, 1, &new_vpid);
if (ORTE_JOBID_MAX == new_jobid ||
ORTE_CELLID_MAX == new_cellid ||
ORTE_VPID_MAX == new_vpid) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
exit(-1);
}
ret = orte_ns.create_process_name(&new_child_name, new_cellid,
new_jobid, new_vpid);
if (ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
exit(-1);
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: restarting ORTE");
ret = orte_restart(new_child_name, uri);
if (ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
exit(-1);
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: am now a new ORTE process");
/* All done */
return ORTE_SUCCESS;
}
int orte_pls_tm_child_launch(orte_jobid_t jobid)
{
int ret, local_errno;
size_t i, j;
tm_event_t event;
char *flat;
char old_cwd[OMPI_PATH_MAX];
opal_list_t mapping;
bool mapping_valid = false;
opal_list_item_t *item;
char **mca_env = NULL, **tmp_env, **local_env;
char *path, *new_path;
int num_mca_env;
orte_rmaps_base_proc_t *proc;
orte_app_context_t *app;
bool failure;
tm_node_id tnodeid;
struct tm_roots tm_root;
/* Open up our connection to tm */
ret = tm_init(NULL, &tm_root);
if (TM_SUCCESS != ret) {
return ORTE_ERR_RESOURCE_BUSY;
}
orte_pls_tm_connected = true;
/* Get the hostnames from the output of the mapping. Since we
have to cross reference against TM, it's much more efficient to
do all the nodes in the entire map all at once. */
OBJ_CONSTRUCT(&mapping, opal_list_t);
if (ORTE_SUCCESS != (ret = orte_rmaps_base_get_map(jobid, &mapping))) {
goto cleanup;
}
mapping_valid = true;
/* Count how many processes we're starting so that we can allocate
space for all the tid's */
for (failure = false, i = 0, item = opal_list_get_first(&mapping);
!failure && item != opal_list_get_end(&mapping);
item = opal_list_get_next(item)) {
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*) item;
i += map->num_procs;
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: found a total of %d procs", i);
task_ids = malloc((sizeof(tm_task_id) * i) +
(sizeof(orte_process_name_t) * i));
if (NULL == task_ids) {
ret = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
names = (orte_process_name_t*) (task_ids + i);
memset(names, 0, sizeof(orte_process_name_t) * i);
/* Make up an environment for all the job processes. */
mca_env = NULL;
num_mca_env = 0;
mca_base_param_build_env(&mca_env, &num_mca_env, true);
/* While we're traversing these data structures, also setup the
proc_status array for a later "put" to the registry */
getcwd(old_cwd, OMPI_PATH_MAX);
failure = false;
for (num_spawned = i = 0, item = opal_list_get_first(&mapping);
!failure && item != opal_list_get_end(&mapping);
item = opal_list_get_next(item), ++i) {
orte_rmaps_base_map_t* map = (orte_rmaps_base_map_t*) item;
app = map->app;
/* See if the app cwd exists; try changing to the cwd and then
changing back */
if (0 != chdir(app->cwd)) {
ret = ORTE_ERR_NOT_FOUND;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: app %d cwd (%s) exists",
i, app->cwd);
/* Get a full pathname for argv[0] -- tm won't spawn without
an absolute pathname. :-( app->app is already an absolute
pathname, so don't even bother to check -- just replace
argv[0] with app->app. */
free(app->argv[0]);
app->argv[0] = strdup(app->app);
flat = opal_argv_join(app->argv, ' ');
/* Make a global env for the app */
tmp_env = opal_environ_merge(app->env, mca_env);
local_env = opal_environ_merge(environ, tmp_env);
if (NULL != tmp_env) {
opal_argv_free(tmp_env);
}
/* Ensure "." is in the PATH. If it's not there, add it at
the end */
for (j = 0; NULL != local_env[j]; ++j) {
if (0 == strncmp("PATH=", local_env[j], 5)) {
path = local_env[j] + 5;
if (0 != strcmp(".", path) &&
0 != strncmp(".:", path, 2) &&
NULL == strstr(path, ":.:") &&
0 != strncmp(":.", path + strlen(path) - 2, 2)) {
asprintf(&new_path, "PATH=%s:.", path);
free(local_env[j]);
local_env[j] = new_path;
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: appended \".\" to PATH");
break;
}
}
}
/* Now iterate through all the procs in this app and launch them */
for (j = 0; j < map->num_procs; ++j, ++num_spawned) {
proc = map->procs[j];
/* Get a TM node ID for the node for this proc */
if (ORTE_SUCCESS != do_tm_resolve(proc->proc_node->node_name,
&tnodeid)) {
ret = ORTE_ERR_NOT_FOUND;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/* Set the job name in the environment */
orte_ns_nds_env_put(&proc->proc_name, num_spawned, 1, &local_env);
/* Launch it */
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: starting process %d (%s) on %s (TM node id %d)",
num_spawned, flat, proc->proc_node->node_name,
tnodeid);
if (TM_SUCCESS != tm_spawn(app->argc, app->argv,
local_env, tnodeid,
&task_ids[num_spawned], &event)) {
ret = ORTE_ERR_RESOURCE_BUSY;
ORTE_ERROR_LOG(ret);
goto loop_error;
}
ret = tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
if (TM_SUCCESS != ret) {
ret = ORTE_ERR_RESOURCE_BUSY;
ORTE_ERROR_LOG(ret);
goto loop_error;
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: launch successful (tid %d); posting to registry", task_ids[num_spawned]);
/* Write this proc's TID to the registry (so that we can
kill it if we need to) */
orte_pls_tm_put_tid(&(proc->proc_name), task_ids[num_spawned],
ORTE_PROC_STATE_LAUNCHED);
/* Bastardize this function to set our state to
ORTE_PROC_STATE_LAUNCHED with a bogus PID (make it
equal this proc's index in the overall job -- i.e.,
rank in MPI_COMM_WORLD) */
ret = orte_pls_base_set_proc_pid(&(proc->proc_name), num_spawned);
if (ORTE_SUCCESS != ret) {
ret = ORTE_ERR_RESOURCE_BUSY;
ORTE_ERROR_LOG(ret);
goto loop_error;
}
/* Save the name so that we can use it later */
names[num_spawned] = proc->proc_name;
/* Ok, we succeeded in launching that process. Loop around
to get the next. */
continue;
loop_error:
/* Hack so that we don't have to make the
pls_tm_terminate_job globally scoped */
orte_pls_tm_module.terminate_job(jobid);
failure = true;
break;
}
/* Now go back to the original cwd */
if (0 != chdir(old_cwd)) {
ret = ORTE_ERR_NOT_FOUND;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
/* Free things from the last app */
opal_argv_free(local_env);
free(flat);
}
/* All done */
cleanup:
opal_output(orte_pls_base.pls_output,
"pls:tm:launch:child: launched %d processes", num_spawned);
if (NULL != mca_env) {
opal_argv_free(mca_env);
}
if (mapping_valid) {
while (NULL != (item = opal_list_remove_first(&mapping))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&mapping);
}
tm_finalize();
orte_pls_tm_connected = false;
return ret;
}
/*
* Waiting for the death of all the tm_spawn'ed processes.
*/
int orte_pls_tm_child_wait(orte_jobid_t jobid)
{
size_t i, j;
int ret, local_errno, *exit_statuses;
tm_event_t event, *events;
struct tm_roots tm_root;
opal_output(orte_pls_base.pls_output,
"pls:tm:wait:child: waiting for processes to exit");
/* Open up our connection to tm */
ret = tm_init(NULL, &tm_root);
if (TM_SUCCESS != ret) {
ret = ORTE_ERR_RESOURCE_BUSY;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
orte_pls_tm_connected = true;
/* Setup to wait for all the tid's to die */
events = malloc((sizeof(tm_event_t) * num_spawned) +
(sizeof(int) * num_spawned));
if (NULL == events) {
ret = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
exit_statuses = (int*) (events + num_spawned);
/* Ask for all obituaries */
for (i = 0; i < num_spawned; ++i) {
ret = tm_obit(task_ids[i], &exit_statuses[i], &events[i]);
if (TM_SUCCESS != ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tm_obit failed with %d", ret);
ret = ORTE_ERROR;
ORTE_ERROR_LOG(ret);
goto cleanup;
}
}
/* Poll until we get all obituaries */
for (i = 0; i < num_spawned; ++i) {
tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
for (j = 0; j < num_spawned; ++j) {
if (event == events[j]) {
opal_output(orte_pls_base.pls_output,
"pls:tm:wait:child: caught obit for tid %d",
task_ids[j]);
ret = orte_soh.set_proc_soh(&names[j],
ORTE_PROC_STATE_TERMINATED,
exit_statuses[j]);
events[j] = TM_NULL_EVENT;
break;
}
}
if (j >= num_spawned) {
fprintf(stderr, "Whoops! Didn't find return event!\n");
}
}
cleanup:
opal_output(orte_pls_base.pls_output,
"pls:tm:wait:child: done waiting for process obits");
if (NULL != events) {
free(events);
}
/* All done */
if (orte_pls_tm_connected) {
tm_finalize();
}
orte_pls_tm_connected = false;
return ORTE_SUCCESS;
}
int orte_pls_tm_child_finalize(void)
{
if (NULL != tm_hostnames) {
opal_argv_free(tm_hostnames);
tm_hostnames = NULL;
}
if (NULL != tm_node_ids) {
free(tm_node_ids);
tm_node_ids = NULL;
}
/* All done */
opal_output(orte_pls_base.pls_output,
"pls:tm:finalize:child: all done -- exiting");
orte_finalize();
return ORTE_SUCCESS;
}
/***********************************************************************/
/*
* Take a list of hostnames and return their corresponding TM node
* ID's. This is not the most efficient method of doing this, but
* it's not much of an issue here (this is not a performance-critical
* section of code)
*/
static int do_tm_resolve(char *hostname, tm_node_id *tnodeid)
{
int i, ret;
/* Have we already queried TM for all the node info? */
if (NULL == tm_hostnames) {
ret = query_tm_hostnames();
if (ORTE_SUCCESS != ret) {
return ret;
}
}
/* Find the TM ID of the hostname that we're looking for */
for (i = 0; i < num_tm_hostnames; ++i) {
if (0 == strcmp(hostname, tm_hostnames[i])) {
*tnodeid = tm_node_ids[i];
opal_output(orte_pls_base.pls_output,
"pls:tm:launch: resolved host %s to node ID %d",
hostname, tm_node_ids[i]);
break;
}
}
/* All done */
if (i < num_tm_hostnames) {
ret = ORTE_SUCCESS;
} else {
ret = ORTE_ERR_NOT_FOUND;
}
return ret;
}
static int query_tm_hostnames(void)
{
char *h;
int i, ret;
/* Get the list of nodes allocated in this PBS job */
ret = tm_nodeinfo(&tm_node_ids, &num_node_ids);
if (TM_SUCCESS != ret) {
return ORTE_ERR_NOT_FOUND;
}
/* TM "nodes" may actually correspond to PBS "VCPUs", which means
there may be multiple "TM nodes" that correspond to the same
physical node. This doesn't really affect what we're doing
here (we actually ignore the fact that they're duplicates --
slightly inefficient, but no big deal); just mentioned for
completeness... */
tm_hostnames = NULL;
num_tm_hostnames = 0;
for (i = 0; i < num_node_ids; ++i) {
h = get_tm_hostname(tm_node_ids[i]);
opal_argv_append(&num_tm_hostnames, &tm_hostnames, h);
free(h);
}
/* All done */
return ORTE_SUCCESS;
}
/*
* For a given TM node ID, get the string hostname corresponding to
* it.
*/
static char* get_tm_hostname(tm_node_id node)
{
int ret, local_errno;
char *hostname;
tm_event_t event;
char buffer[256];
char **argv;
/* Get the info string corresponding to this TM node ID */
ret = tm_rescinfo(node, buffer, sizeof(buffer) - 1, &event);
if (TM_SUCCESS != ret) {
return NULL;
}
/* Now wait for that event to happen */
ret = tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
if (TM_SUCCESS != ret) {
return NULL;
}
/* According to the TM man page, we get back a space-separated
string array. The hostname is the second item. Use a cheap
trick to get it. */
buffer[sizeof(buffer) - 1] = '\0';
argv = opal_argv_split(buffer, ' ');
if (NULL == argv) {
return NULL;
}
hostname = strdup(argv[1]);
opal_argv_free(argv);
/* All done */
return hostname;
}
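
One pattern worth calling out from orte_pls_tm_child_wait() above: TM delivers exit notifications by letting you post one tm_obit() request per task and then matching each event returned by tm_poll() against the requests you posted. A condensed, hypothetical sketch of just that matching loop (assuming tm_init() has succeeded and tids[] came from earlier tm_spawn() calls):

/* Hypothetical condensed sketch of the obituary-matching loop in
   orte_pls_tm_child_wait() above; assumes an open TM session and that
   tids[0..num-1] were returned by tm_spawn(). */
#include <stdio.h>
#include <stdlib.h>
#include <tm.h>

static int wait_for_obits(tm_task_id *tids, size_t num)
{
    tm_event_t *events, event;
    int *statuses, local_errno;
    size_t i, j;

    events = malloc(num * sizeof(tm_event_t));
    statuses = malloc(num * sizeof(int));
    if (NULL == events || NULL == statuses) {
        free(events);
        free(statuses);
        return -1;
    }

    /* Post one obituary request per task */
    for (i = 0; i < num; ++i) {
        if (TM_SUCCESS != tm_obit(tids[i], &statuses[i], &events[i])) {
            free(events);
            free(statuses);
            return -1;
        }
    }

    /* Each blocking tm_poll() hands back one completed event; find
       which posted obit it belongs to */
    for (i = 0; i < num; ++i) {
        tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
        for (j = 0; j < num; ++j) {
            if (event == events[j]) {
                printf("task %d exited with status %d\n",
                       (int) tids[j], statuses[j]);
                events[j] = TM_NULL_EVENT;   /* don't match it again */
                break;
            }
        }
    }

    free(events);
    free(statuses);
    return 0;
}

The inner matching loop is needed because tm_poll() returns events in whatever order the tasks happen to complete, not in the order the obits were posted.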

View file

@@ -36,11 +36,6 @@ const char *mca_pls_tm_component_version_string =
"Open MPI tm pls MCA component version " ORTE_VERSION;
/*
* Local variable
*/
static int param_priority = -1;
/*
* Local function
@@ -54,48 +49,51 @@ static struct orte_pls_base_module_1_0_0_t *pls_tm_init(int *priority);
* and pointers to our public functions in it
*/
orte_pls_base_component_1_0_0_t mca_pls_tm_component = {
/* First, the mca_component_t struct containing meta information
about the component itself */
orte_pls_tm_component_t mca_pls_tm_component = {
{
/* Indicate that we are a pls v1.0.0 component (which also
implies a specific MCA version) */
/* First, the mca_component_t struct containing meta information
about the component itself */
ORTE_PLS_BASE_VERSION_1_0_0,
{
/* Indicate that we are a pls v1.0.0 component (which also
implies a specific MCA version) */
ORTE_PLS_BASE_VERSION_1_0_0,
/* Component name and version */
/* Component name and version */
"tm",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
"tm",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
pls_tm_open,
NULL
},
/* Component open and close functions */
/* Next the MCA v1.0.0 component meta data */
{
/* Whether the component is checkpointable or not */
true
},
pls_tm_open,
NULL
},
/* Next the MCA v1.0.0 component meta data */
{
/* Whether the component is checkpointable or not */
true
},
/* Initialization / querying functions */
pls_tm_init
/* Initialization / querying functions */
pls_tm_init
}
};
static int pls_tm_open(void)
{
param_priority =
mca_base_param_register_int("pls", "tm", "priority", NULL, 75);
mca_base_component_t *comp = &mca_pls_tm_component.super.pls_version;
mca_base_param_reg_int(comp, "debug", "Enable debugging of TM pls",
false, false, 0, &mca_pls_tm_component.debug);
mca_base_param_reg_int(comp, "priority", "Default selection priority",
false, false, 75, &mca_pls_tm_component.priority);
mca_base_param_reg_string(comp, "orted", "Command to use to start proxy orted",
false, false, "orted", &mca_pls_tm_component.orted);
return ORTE_SUCCESS;
}
@@ -107,10 +105,7 @@ static struct orte_pls_base_module_1_0_0_t *pls_tm_init(int *priority)
if (NULL != getenv("PBS_ENVIRONMENT") &&
NULL != getenv("PBS_JOBID")) {
mca_base_param_lookup_int(param_priority, priority);
opal_output(orte_pls_base.pls_output,
"pls:tm: available for selection, priority %d", *priority);
*priority = mca_pls_tm_component.priority;
return &orte_pls_tm_module;
}

View file

@@ -28,7 +28,13 @@
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#include <errno.h>
#include <tm.h>
#include "pls_tm.h"
#include "include/orte_constants.h"
#include "include/orte_types.h"
#include "opal/util/argv.h"
@@ -48,7 +54,7 @@
#include "mca/soh/soh.h"
#include "mca/rml/rml.h"
#include "mca/ns/ns.h"
#include "pls_tm.h"
#include "opal/runtime/opal_progress.h"
/*
@@ -59,10 +65,9 @@ static int pls_tm_terminate_job(orte_jobid_t jobid);
static int pls_tm_terminate_proc(const orte_process_name_t *name);
static int pls_tm_finalize(void);
static void do_wait_proc(pid_t pid, int status, void* cbdata);
static int kill_tids(tm_task_id *tids, orte_process_name_t *names,
size_t num_tids);
static int pls_tm_connect(void);
static int pls_tm_disconnect(void);
static int pls_tm_start_proc(char *nodename, int argc, char **argv, char **env);
/*
* Global variable
@@ -73,129 +78,261 @@ orte_pls_base_module_1_0_0_t orte_pls_tm_module = {
pls_tm_terminate_proc,
pls_tm_finalize
};
bool orte_pls_tm_connected = false;
extern char **environ;
#define NUM_SIGNAL_POLL_ITERS 50
/*
* Local variables
*/
static bool wait_cb_set = false;
static pid_t child_pid = -1;
static int pls_tm_launch(orte_jobid_t jobid)
static int
pls_tm_launch(orte_jobid_t jobid)
{
orte_jobid_t *save;
opal_list_t nodes;
opal_list_item_t* item;
size_t num_nodes;
orte_vpid_t vpid;
int node_name_index;
int proc_name_index;
char *jobid_string;
char *uri, *param;
char **argv;
int argc;
int rc;
int id;
/* query the list of nodes allocated to the job - don't need the entire
* mapping - as the daemon/proxy is responsible for determining the apps
* to launch on each node.
*/
OBJ_CONSTRUCT(&nodes, opal_list_t);
/* Copy the jobid */
save = malloc(sizeof(orte_jobid_t));
if (NULL == save) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
rc = orte_ras_base_node_query_alloc(&nodes, jobid);
if (ORTE_SUCCESS != rc) {
goto cleanup;
}
memcpy(save, &jobid, sizeof(orte_jobid_t));
/* Child */
/*
* Allocate a range of vpids for the daemons.
*/
num_nodes = opal_list_get_size(&nodes);
if (num_nodes == 0) {
return ORTE_ERR_BAD_PARAM;
}
rc = orte_ns.reserve_range(0, num_nodes, &vpid);
if (ORTE_SUCCESS != rc) {
goto cleanup;
}
opal_output(orte_pls_base.pls_output,
"pls:tm:launch: launching child to do the work");
child_pid = fork();
if (0 == child_pid) {
if (ORTE_SUCCESS != orte_pls_tm_child_init() ||
ORTE_SUCCESS != orte_pls_tm_child_launch(jobid) ||
ORTE_SUCCESS != orte_pls_tm_child_wait(jobid) ||
ORTE_SUCCESS != orte_pls_tm_child_finalize()) {
/* Bogus logic just to stop at the first failure */
child_pid++;
/* need integer value for command line parameter */
asprintf(&jobid_string, "%lu", (unsigned long) jobid);
/*
* start building argv array
*/
argv = NULL;
argc = 0;
/* add the daemon command (as specified by user) */
opal_argv_append(&argc, &argv, mca_pls_tm_component.orted);
opal_argv_append(&argc, &argv, "--no-daemonize");
/* check for debug flags */
id = mca_base_param_register_int("orte","debug",NULL,NULL,0);
mca_base_param_lookup_int(id,&rc);
if (rc) {
opal_argv_append(&argc, &argv, "--debug");
}
id = mca_base_param_register_int("orte","debug","daemons",NULL,0);
mca_base_param_lookup_int(id,&rc);
if (rc) {
opal_argv_append(&argc, &argv, "--debug-daemons");
}
id = mca_base_param_register_int("orte","debug","daemons_file",NULL,0);
mca_base_param_lookup_int(id,&rc);
if (rc) {
opal_argv_append(&argc, &argv, "--debug-daemons-file");
}
/* proxy information */
opal_argv_append(&argc, &argv, "--bootproxy");
opal_argv_append(&argc, &argv, jobid_string);
opal_argv_append(&argc, &argv, "--name");
proc_name_index = argc;
opal_argv_append(&argc, &argv, "");
/* tell the daemon how many procs are in the daemon's job */
opal_argv_append(&argc, &argv, "--num_procs");
asprintf(&param, "%lu", (unsigned long)(vpid + num_nodes));
opal_argv_append(&argc, &argv, param);
free(param);
/* tell the daemon the starting vpid of the daemon's job */
opal_argv_append(&argc, &argv, "--vpid_start");
opal_argv_append(&argc, &argv, "0");
opal_argv_append(&argc, &argv, "--nodename");
node_name_index = argc;
opal_argv_append(&argc, &argv, "");
/* pass along the universe name and location info */
opal_argv_append(&argc, &argv, "--universe");
asprintf(&param, "%s@%s:%s", orte_universe_info.uid,
orte_universe_info.host, orte_universe_info.name);
opal_argv_append(&argc, &argv, param);
free(param);
/* setup ns contact info */
opal_argv_append(&argc, &argv, "--nsreplica");
if (NULL != orte_process_info.ns_replica_uri) {
uri = strdup(orte_process_info.ns_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(uri);
free(param);
/* setup gpr contact info */
opal_argv_append(&argc, &argv, "--gprreplica");
if (NULL != orte_process_info.gpr_replica_uri) {
uri = strdup(orte_process_info.gpr_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(uri);
free(param);
if (mca_pls_tm_component.debug) {
param = opal_argv_join(argv, ' ');
if (NULL != param) {
opal_output(0, "pls:tm: final top-level argv:");
opal_output(0, "pls:tm: %s", param);
free(param);
}
exit(0);
}
printf("tm child PID: %d\n", child_pid);
fflush(stdout);
/* Parent */
rc = pls_tm_connect();
if (ORTE_SUCCESS != rc) {
goto cleanup;
}
orte_wait_cb(child_pid, do_wait_proc, save);
wait_cb_set = true;
/*
* Iterate through each of the nodes and spin
* up a daemon.
*/
for(item = opal_list_get_first(&nodes);
item != opal_list_get_end(&nodes);
item = opal_list_get_next(item)) {
orte_ras_node_t* node = (orte_ras_node_t*)item;
orte_process_name_t* name;
char* name_string;
char** env;
char* var;
return ORTE_SUCCESS;
/* setup node name */
argv[node_name_index] = node->node_name;
/* initialize the daemon's process name */
rc = orte_ns.create_process_name(&name, node->node_cellid, 0, vpid);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* setup per-node options */
if (mca_pls_tm_component.debug) {
opal_output(0, "pls:tm: launching on node %s\n",
node->node_name);
}
/* setup process name */
rc = orte_ns.get_proc_name_string(&name_string, name);
if (ORTE_SUCCESS != rc) {
opal_output(0, "orte_pls_tm: unable to create process name");
exit(-1);
}
argv[proc_name_index] = name_string;
/* setup environment */
env = opal_argv_copy(environ);
var = mca_base_param_environ_variable("seed",NULL,NULL);
opal_setenv(var, "0", true, &env);
/* set the progress engine schedule for this node.
* if node_slots is set to zero, then we default to
* NOT being oversubscribed
*/
if (node->node_slots > 0 &&
node->node_slots_inuse > node->node_slots) {
if (mca_pls_tm_component.debug) {
opal_output(0, "pls:tm: oversubscribed -- setting mpi_yield_when_idle to 1");
}
var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
opal_setenv(var, "1", true, &env);
} else {
if (mca_pls_tm_component.debug) {
opal_output(0, "pls:tm: not oversubscribed -- setting mpi_yield_when_idle to 0");
}
var = mca_base_param_environ_variable("mpi", NULL, "yield_when_idle");
opal_setenv(var, "0", true, &env);
}
free(var);
/* save the daemon's name on the node */
if (ORTE_SUCCESS != (rc = orte_pls_base_proxy_set_node_name(node,jobid,name))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* exec the daemon */
if (mca_pls_tm_component.debug) {
param = opal_argv_join(argv, ' ');
if (NULL != param) {
opal_output(0, "pls:tm: executing: %s", param);
free(param);
}
}
/* BWB - fill me in */
rc = pls_tm_start_proc(node->node_name, argc, argv, env);
if (ORTE_SUCCESS != rc) {
opal_output(0, "pls:tm: start_procs returned error %d", rc);
goto cleanup;
}
vpid++;
free(name);
}
rc = pls_tm_disconnect();
cleanup:
while (NULL != (item = opal_list_remove_first(&nodes))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&nodes);
return rc;
}
static int pls_tm_terminate_job(orte_jobid_t jobid)
static int
pls_tm_terminate_job(orte_jobid_t jobid)
{
struct tm_roots tm_root;
tm_task_id *tids;
orte_process_name_t *names;
size_t size;
int ret;
/* If we have a child, that child is potentially sitting inside
tm_poll(), and we won't be able to tm_init(). Sigh. So kill
the child. */
if (child_pid > 0) {
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate_job: killing tm shephard");
kill(child_pid, SIGKILL);
waitpid(child_pid, NULL, 0);
child_pid = -1;
sleep(1);
}
/* Open up our connection to tm. Note that we may be called from
launch, above, in which case we don't need to tm_init */
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate_job: killing jobid %d", jobid);
if (!orte_pls_tm_connected) {
ret = tm_init(NULL, &tm_root);
if (TM_SUCCESS != ret) {
ret = ORTE_ERR_RESOURCE_BUSY;
ORTE_ERROR_LOG(ret);
return ret;
}
}
/* Get the TIDs from the registry */
ret = orte_pls_tm_get_tids(jobid, &tids, &names, &size);
if (ORTE_SUCCESS == ret && size > 0) {
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate_job: got %d tids from registry", size);
ret = kill_tids(tids, names, size);
if (NULL != names) {
free(names);
}
if (NULL != tids) {
free(tids);
}
} else {
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate_job: got no tids from registry -- nothing to kill");
}
/* All done */
if (!orte_pls_tm_connected) {
tm_finalize();
}
return ret;
return orte_pls_base_proxy_terminate_job(jobid);
}
/*
* TM can't kill individual processes -- PBS will kill the entire job
*/
static int pls_tm_terminate_proc(const orte_process_name_t *name)
static int
pls_tm_terminate_proc(const orte_process_name_t *name)
{
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate_proc: not supported");
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
return ORTE_ERR_NOT_SUPPORTED;
}
@@ -203,176 +340,190 @@ static int pls_tm_terminate_proc(const orte_process_name_t *name)
/*
* Free stuff
*/
static int pls_tm_finalize(void)
static int
pls_tm_finalize(void)
{
if (wait_cb_set) {
orte_wait_cb_cancel(child_pid);
}
/* cleanup any pending recvs */
orte_rml.recv_cancel(ORTE_RML_NAME_ANY, ORTE_RML_TAG_RMGR_CLNT);
return ORTE_SUCCESS;
}
static void do_wait_proc(pid_t pid, int status, void *cbdata)
static int
pls_tm_connect(void)
{
orte_jobid_t *jobid = (orte_jobid_t *) cbdata;
int ret;
struct tm_roots tm_root;
int count, progress;
printf("Child TM proc has exited!\n");
fflush(stdout);
/* try a couple times to connect - might get busy signals every
now and then */
for (count = 0 ; count < 10; ++count) {
ret = tm_init(NULL, &tm_root);
if (TM_SUCCESS == ret) {
return ORTE_SUCCESS;
}
free(cbdata);
for (progress = 0 ; progress < 10 ; ++progress) {
opal_progress();
#if HAVE_SCHED_YIELD
sched_yield();
#endif
}
}
return ORTE_ERR_RESOURCE_BUSY;
}
/*
* Kill a bunch of tids. Don't care about errors here -- just make a
* best attempt to kill kill kill; if we fail, oh well.
*/
static int kill_tids(tm_task_id *tids, orte_process_name_t *names, size_t size)
static int
pls_tm_disconnect(void)
{
size_t i;
int j, ret, local_errno, exit_status;
tm_finalize();
return ORTE_SUCCESS;
}
static char **tm_hostnames = NULL;
static tm_node_id *tm_node_ids = NULL;
static int num_tm_hostnames, num_node_ids;
/*
* For a given TM node ID, get the string hostname corresponding to
* it.
*/
static char*
get_tm_hostname(tm_node_id node)
{
int ret, local_errno;
char *hostname;
tm_event_t event;
bool died;
char buffer[256];
char **argv;
for (i = 0; i < size; ++i) {
died = false;
/* Get the info string corresponding to this TM node ID */
/* First, kill with SIGTERM */
ret = tm_rescinfo(node, buffer, sizeof(buffer) - 1, &event);
if (TM_SUCCESS != ret) {
return NULL;
}
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate:kill_tids: killing tid %d", tids[i]);
ret = tm_kill(tids[i], SIGTERM, &event);
/* Now wait for that event to happen */
/* If we didn't find the tid, then just continue -- it may
have exited on its own */
ret = tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
if (TM_SUCCESS != ret) {
return NULL;
}
if (TM_ENOTFOUND == ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:terminate:kill_tids: tid %d not found (already dead?)",
tids[i]);
died = true;
} else if (TM_SUCCESS != ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tm_kill failed with %d", ret);
ret = ORTE_ERROR;
ORTE_ERROR_LOG(ret);
return ret;
}
if (!died) {
tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: killed tid %d with SIGTERM", tids[i]);
/* According to the TM man page, we get back a space-separated
string array. The hostname is the second item. Use a cheap
trick to get it. */
/* Did it die? */
buffer[sizeof(buffer) - 1] = '\0';
argv = opal_argv_split(buffer, ' ');
if (NULL == argv) {
return NULL;
}
hostname = strdup(argv[1]);
opal_argv_free(argv);
ret = tm_obit(tids[i], &exit_status, &event);
if (TM_SUCCESS != ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tm_obit failed with %d", ret);
ret = ORTE_ERROR;
ORTE_ERROR_LOG(ret);
return ret;
}
/* All done */
tm_poll(TM_NULL_EVENT, &event, 0, &local_errno);
return hostname;
}
/* If it's dead, save the state */
if (TM_NULL_EVENT != event) {
died = true;
}
/* It didn't seem to die right away; poll a few times */
else {
for (j = 0; j < NUM_SIGNAL_POLL_ITERS; ++j) {
tm_poll(TM_NULL_EVENT, &event, 0, &local_errno);
if (TM_NULL_EVENT != event) {
died = true;
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tid %d died", tids[i]);
break;
}
#if defined(WIN32)
sleep(1);
#else
usleep(1);
#endif
}
/* No, it did not die. Try with SIGKILL */
if (!died) {
ret = tm_kill(tids[i], SIGKILL, &event);
if (TM_SUCCESS != ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tm_kill failed with %d",
ret);
ret = ORTE_ERROR;
ORTE_ERROR_LOG(ret);
return ret;
}
tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: killed tid %d with SIGKILL",
tids[i]);
/* Did it die this time? */
ret = tm_obit(tids[i], &exit_status, &event);
if (TM_SUCCESS != ret) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tm_obit failed with %d",
ret);
ret = ORTE_ERROR;
ORTE_ERROR_LOG(ret);
return ret;
}
tm_poll(TM_NULL_EVENT, &event, 0, &local_errno);
/* No -- poll a few times -- just to try to clean it
up... If we don't get it here, oh well. Just let
the resources hang; TM will clean them up when the
job completes */
if (TM_NULL_EVENT == event) {
for (j = 0; j < NUM_SIGNAL_POLL_ITERS; ++j) {
tm_poll(TM_NULL_EVENT, &event, 0, &local_errno);
if (TM_NULL_EVENT != event) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tid %d (finally) died",
tids[i]);
died = true;
break;
}
#if defined(WIN32)
sleep(1);
#else
usleep(1);
#endif
}
if (j >= NUM_SIGNAL_POLL_ITERS) {
opal_output(orte_pls_base.pls_output,
"pls:tm:kill: tid %d did not die!",
tids[i]);
}
}
}
}
}
static int
query_tm_hostnames(void)
{
char *h;
int i, ret;
/* If it's dead, update the registry */
/* Get the list of nodes allocated in this PBS job */
if (died) {
ret = orte_soh.set_proc_soh(&names[i],
ORTE_PROC_STATE_TERMINATED,
exit_status);
}
ret = tm_nodeinfo(&tm_node_ids, &num_node_ids);
if (TM_SUCCESS != ret) {
return ORTE_ERR_NOT_FOUND;
}
/* TM "nodes" may actually correspond to PBS "VCPUs", which means
there may be multiple "TM nodes" that correspond to the same
physical node. This doesn't really affect what we're doing
here (we actually ignore the fact that they're duplicates --
slightly inefficient, but no big deal); just mentioned for
completeness... */
tm_hostnames = NULL;
num_tm_hostnames = 0;
for (i = 0; i < num_node_ids; ++i) {
h = get_tm_hostname(tm_node_ids[i]);
opal_argv_append(&num_tm_hostnames, &tm_hostnames, h);
free(h);
}
/* All done */
return ORTE_SUCCESS;
}
static int
do_tm_resolve(char *hostname, tm_node_id *tnodeid)
{
int i, ret;
/* Have we already queried TM for all the node info? */
if (NULL == tm_hostnames) {
ret = query_tm_hostnames();
if (ORTE_SUCCESS != ret) {
return ret;
}
}
/* Find the TM ID of the hostname that we're looking for */
for (i = 0; i < num_tm_hostnames; ++i) {
if (0 == strcmp(hostname, tm_hostnames[i])) {
*tnodeid = tm_node_ids[i];
opal_output(orte_pls_base.pls_output,
"pls:tm:launch: resolved host %s to node ID %d",
hostname, tm_node_ids[i]);
break;
}
}
/* All done */
if (i < num_tm_hostnames) {
ret = ORTE_SUCCESS;
} else {
ret = ORTE_ERR_NOT_FOUND;
}
return ret;
}
static int
pls_tm_start_proc(char *nodename, int argc, char **argv, char **env)
{
int ret, local_err;
tm_node_id node_id;
tm_task_id task_id;
tm_event_t event;
/* get the tm node id for this node */
ret = do_tm_resolve(nodename, &node_id);
if (ORTE_SUCCESS != ret) return ret;
ret = tm_spawn(argc, argv, env, node_id, &task_id, &event);
if (TM_SUCCESS != ret) return ORTE_ERROR;
ret = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
if (TM_SUCCESS != ret) {
errno = local_err;
return ORTE_ERR_IN_ERRNO;
}
return ORTE_SUCCESS;
}
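
The kill_tids() logic being removed above encodes a two-stage escalation that is easy to miss in the interleaved diff: send SIGTERM via tm_kill(), poll briefly for the obituary without blocking (wait = 0), and only fall back to SIGKILL if the task never reports death. A compressed, hypothetical single-task version of that escalation, under the same assumption of an open TM session:

/* Hypothetical compressed form of the SIGTERM-then-SIGKILL escalation
   from the removed kill_tids(); assumes tm_init() has succeeded. */
#include <signal.h>
#include <unistd.h>
#include <tm.h>

#define NUM_SIGNAL_POLL_ITERS 50

static int kill_one_tid(tm_task_id tid)
{
    tm_event_t event;
    int sigs[] = { SIGTERM, SIGKILL };
    int s, i, local_errno, exit_status;

    for (s = 0; s < 2; ++s) {
        /* Send the signal and reap the tm_kill() event itself */
        if (TM_SUCCESS != tm_kill(tid, sigs[s], &event)) {
            return -1;                 /* tid unknown -- already dead? */
        }
        tm_poll(TM_NULL_EVENT, &event, 1, &local_errno);

        /* Post an obituary request, then poll without blocking: a
           TM_NULL_EVENT result means "no obituary yet" */
        if (TM_SUCCESS != tm_obit(tid, &exit_status, &event)) {
            return -1;
        }
        for (i = 0; i < NUM_SIGNAL_POLL_ITERS; ++i) {
            tm_poll(TM_NULL_EVENT, &event, 0, &local_errno);
            if (TM_NULL_EVENT != event) {
                return 0;              /* obituary seen: it died */
            }
            usleep(1000);
        }
        /* No obituary after SIGTERM; loop around and try SIGKILL */
    }
    return -1;  /* never died; let TM reclaim it when the job completes */
}

After this commit, the module no longer needs any of this itself: pls_tm_terminate_job() defers to orte_pls_base_proxy_terminate_job(), letting the proxy orteds kill their own local processes.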

View file

@@ -1,157 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "include/orte_constants.h"
#include "opal/util/output.h"
#include "mca/mca.h"
#include "mca/base/base.h"
#include "mca/pls/base/base.h"
#include "mca/ns/ns.h"
#include "mca/gpr/gpr.h"
#include "mca/soh/soh_types.h"
#include "mca/errmgr/errmgr.h"
#include "pls_tm.h"
#define TID_KEY "orte-pls-tm-tid"
/**
* Save the TID of a given process
*/
int orte_pls_tm_put_tid(const orte_process_name_t* name,
tm_task_id tid, int state)
{
orte_gpr_value_t* values[1];
orte_gpr_value_t value;
orte_gpr_keyval_t kv_tid = {{OBJ_CLASS(orte_gpr_keyval_t),0},TID_KEY,ORTE_UINT32};
orte_gpr_keyval_t kv_state = {{OBJ_CLASS(orte_gpr_keyval_t),0},ORTE_PROC_STATE_KEY,ORTE_PROC_STATE};
orte_gpr_keyval_t* keyvals[2];
int i, rc;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&value.segment, name->jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&value.tokens, &value.num_tokens, (orte_process_name_t*) name))) {
ORTE_ERROR_LOG(rc);
return rc;
}
kv_tid.value.ui32 = tid;
kv_state.value.proc_state = state;
keyvals[0] = &kv_tid;
keyvals[1] = &kv_state;
value.keyvals = keyvals;
value.cnt = 2;
value.addr_mode = ORTE_GPR_OVERWRITE;
values[0] = &value;
rc = orte_gpr.put(1, values);
free(value.segment);
for (i = 0; i < value.num_tokens; ++i) {
free(value.tokens[i]);
}
free(value.tokens);
return rc;
}
/**
* Retrieve all process tids for the specified job.
*/
#include <unistd.h>
int orte_pls_tm_get_tids(orte_jobid_t jobid, tm_task_id **tids,
orte_process_name_t **names, size_t* size)
{
char *segment = NULL;
char *keys[3];
orte_gpr_value_t** values = NULL;
int i, j, num_values = 0;
int rc;
/* Zero out in case of error */
*tids = NULL;
*names = NULL;
*size = 0;
/* Query the job segment on the registry */
if (ORTE_SUCCESS !=
(rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
keys[0] = TID_KEY;
keys[1] = ORTE_PROC_NAME_KEY;
keys[2] = NULL;
rc = orte_gpr.get(
ORTE_GPR_KEYS_AND,
segment,
NULL,
keys,
&num_values,
&values
);
if (rc != ORTE_SUCCESS) {
free(segment);
return rc;
}
/* If we got values back (both TID and the process names have to
exist), then process them */
if (num_values > 0) {
*tids = malloc(sizeof(tm_task_id) * num_values);
*names = malloc(sizeof(orte_process_name_t) * num_values);
if (NULL == *tids || NULL == *names) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
for (i = 0; i < num_values; ++i) {
for (j = 0; j < values[i]->cnt; ++j) {
if (0 == strcmp(values[i]->keyvals[j]->key, TID_KEY)) {
(*tids)[i] = values[i]->keyvals[j]->value.ui32;
} else if (0 == strcmp(values[i]->keyvals[j]->key,
ORTE_PROC_NAME_KEY)) {
(*names)[i] = values[i]->keyvals[j]->value.proc;
}
}
}
*size = num_values;
}
cleanup:
if (NULL != values) {
for (i = 0; i < num_values; ++i) {
OBJ_RELEASE(values[i]);
}
free(values);
}
if (NULL != segment) {
free(segment);
}
return rc;
}

View file

View file

@@ -1,2 +0,0 @@
jsquyres
brbarret