1
1
openmpi/orte/mca/plm/tm/plm_tm_module.c
2012-04-06 15:43:28 +00:00

563 строки
17 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2007-2012 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <signal.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <errno.h>
#include <tm.h>
#include "opal/mca/installdirs/installdirs.h"
#include "opal/mca/event/event.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "orte/util/show_help.h"
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/runtime/opal_progress.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/state/state.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h"
#include "plm_tm.h"
/*
* API functions
*/
static int plm_tm_init(void);
static int plm_tm_launch_job(orte_job_t *jdata);
static int plm_tm_terminate_orteds(void);
static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal);
static int plm_tm_finalize(void);
/*
* Local "global" variables
*/
static orte_std_cntr_t launched = 0;
static bool connected = false;
/*
* Global variable
*/
orte_plm_base_module_t orte_plm_tm_module = {
plm_tm_init,
orte_plm_base_set_hnp_name,
plm_tm_launch_job,
NULL,
orte_plm_base_orted_terminate_job,
plm_tm_terminate_orteds,
orte_plm_base_orted_kill_local_procs,
plm_tm_signal_job,
plm_tm_finalize
};
/* Local functions */
static int plm_tm_connect(void);
static void failed_start(int fd, short event, void *arg);
static void launch_daemons(int fd, short args, void *cbdata);
static void poll_spawns(int fd, short args, void *cbdata);
/**
* Init the module
*/
static int plm_tm_init(void)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_plm_base_comm_start())) {
ORTE_ERROR_LOG(rc);
}
/* we assign daemon nodes at launch */
orte_plm_globals.daemon_nodes_assigned_at_launch = true;
/* point to our launch command */
if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_LAUNCH_DAEMONS,
launch_daemons, ORTE_SYS_PRI))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* overwrite the daemons_launched state to point to
* our own local function
*/
if (ORTE_SUCCESS != (rc = orte_state.set_job_state_callback(ORTE_JOB_STATE_DAEMONS_LAUNCHED,
poll_spawns))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return rc;
}
static int plm_tm_launch_job(orte_job_t *jdata)
{
if (ORTE_JOB_CONTROL_RESTART & jdata->controls) {
/* this is a restart situation - skip to the mapping stage */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
} else {
/* new job - set it up */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_INIT);
}
return ORTE_SUCCESS;
}
/* When working in this function, ALWAYS jump to "cleanup" if
* you encounter an error so that orterun will be woken up and
* the job can cleanly terminate
*/
static void launch_daemons(int fd, short args, void *cbdata)
{
orte_job_map_t *map = NULL;
orte_app_context_t *app;
orte_node_t *node;
int proc_vpid_index;
char *param;
char **env = NULL;
char *var;
char **argv = NULL;
char **nodeargv;
int argc = 0;
int rc;
orte_std_cntr_t i;
char *bin_base = NULL, *lib_base = NULL;
tm_event_t *tm_events = NULL;
tm_task_id *tm_task_ids = NULL;
bool failed_launch = true;
mode_t current_umask;
char *nodelist;
char* vpid_string;
orte_job_t *daemons, *jdata;
orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
jdata = state->jdata;
/* if we are launching debugger daemons, then just go
* do it - no new daemons will be launched
*/
if (ORTE_JOB_CONTROL_DEBUGGER_DAEMON & jdata->controls) {
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
OBJ_RELEASE(state);
return;
}
/* setup the virtual machine */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
if (ORTE_SUCCESS != (rc = orte_plm_base_setup_virtual_machine(jdata))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if we don't want to launch, then don't attempt to
* launch the daemons - the user really wants to just
* look at the proposed process map
*/
if (orte_do_not_launch) {
/* set the state to indicate the daemons reported - this
* will trigger the daemons_reported event and cause the
* job to move to the following step
*/
jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
ORTE_ACTIVATE_JOB_STATE(daemons, ORTE_JOB_STATE_DAEMONS_REPORTED);
OBJ_RELEASE(state);
return;
}
/* Get the map for this job */
if (NULL == (map = daemons->map)) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto cleanup;
}
if (0 == map->num_new_daemons) {
/* set the state to indicate the daemons reported - this
* will trigger the daemons_reported event and cause the
* job to move to the following step
*/
jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
ORTE_ACTIVATE_JOB_STATE(daemons, ORTE_JOB_STATE_DAEMONS_REPORTED);
OBJ_RELEASE(state);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: launching vm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* Allocate a bunch of TM events to use for tm_spawn()ing */
tm_events = malloc(sizeof(tm_event_t) * map->num_new_daemons);
if (NULL == tm_events) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
tm_task_ids = malloc(sizeof(tm_task_id) * map->num_new_daemons);
if (NULL == tm_task_ids) {
rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* add the daemon command (as specified by user) */
orte_plm_base_setup_orted_cmd(&argc, &argv);
/* create a list of nodes in this launch */
nodeargv = NULL;
for (i = 0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
/* if this daemon already exists, don't launch it! */
if (node->daemon_launched) {
continue;
}
/* add to list */
opal_argv_append_nosize(&nodeargv, node->name);
}
nodelist = opal_argv_join(nodeargv, ',');
opal_argv_free(nodeargv);
/* Add basic orted command line options */
orte_plm_base_orted_append_basic_args(&argc, &argv, "tm",
&proc_vpid_index,
nodelist);
free(nodelist);
if (0 < opal_output_get_verbosity(orte_plm_globals.output)) {
param = opal_argv_join(argv, ' ');
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: final top-level argv:\n\t%s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == param) ? "NULL" : param));
if (NULL != param) free(param);
}
rc = plm_tm_connect();
if (ORTE_SUCCESS != rc) {
goto cleanup;
}
connected = true;
/* Figure out the basenames for the libdir and bindir. There is a
lengthy comment about this in plm_rsh_module.c explaining all
the rationale for how / why we're doing this. */
lib_base = opal_basename(opal_install_dirs.libdir);
bin_base = opal_basename(opal_install_dirs.bindir);
/* setup environment */
env = opal_argv_copy(orte_launch_environ);
/* enable local launch by the orteds */
var = mca_base_param_environ_variable("plm", NULL, NULL);
opal_setenv(var, "rsh", true, &env);
free(var);
/* add our umask -- see big note in orted.c */
current_umask = umask(0);
umask(current_umask);
asprintf(&var, "0%o", current_umask);
opal_setenv("ORTE_DAEMON_UMASK_VALUE", var, true, &env);
free(var);
/* If we have a prefix, then modify the PATH and
LD_LIBRARY_PATH environment variables. We only allow
a single prefix to be specified. Since there will
always be at least one app_context, we take it from
there
*/
app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
if (NULL != app->prefix_dir) {
char *newenv;
for (i = 0; NULL != env && NULL != env[i]; ++i) {
/* Reset PATH */
if (0 == strncmp("PATH=", env[i], 5)) {
asprintf(&newenv, "%s/%s:%s",
app->prefix_dir, bin_base, env[i] + 5);
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: resetting PATH: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
newenv));
opal_setenv("PATH", newenv, true, &env);
free(newenv);
}
/* Reset LD_LIBRARY_PATH */
else if (0 == strncmp("LD_LIBRARY_PATH=", env[i], 16)) {
asprintf(&newenv, "%s/%s:%s",
app->prefix_dir, lib_base, env[i] + 16);
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: resetting LD_LIBRARY_PATH: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
newenv));
opal_setenv("LD_LIBRARY_PATH", newenv, true, &env);
free(newenv);
}
}
}
/* Iterate through each of the nodes and spin
* up a daemon.
*/
for (i = 0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
/* if this daemon already exists, don't launch it! */
if (node->daemon_launched) {
continue;
}
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: launching on node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
node->name));
/* setup process name */
rc = orte_util_convert_vpid_to_string(&vpid_string, node->daemon->name.vpid);
if (ORTE_SUCCESS != rc) {
opal_output(0, "plm:tm: unable to get daemon vpid as string");
exit(-1);
}
free(argv[proc_vpid_index]);
argv[proc_vpid_index] = strdup(vpid_string);
free(vpid_string);
/* exec the daemon */
if (0 < opal_output_get_verbosity(orte_plm_globals.output)) {
param = opal_argv_join(argv, ' ');
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: executing:\n\t%s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == param) ? "NULL" : param));
if (NULL != param) free(param);
}
rc = tm_spawn(argc, argv, env, node->launch_id, tm_task_ids + launched, tm_events + launched);
if (TM_SUCCESS != rc) {
orte_show_help("help-plm-tm.txt", "tm-spawn-failed",
true, argv[0], node->name, node->launch_id);
rc = ORTE_ERROR;
goto cleanup;
}
launched++;
}
/* indicate that the daemons for this job were launched */
state->jdata->state = ORTE_JOB_STATE_DAEMONS_LAUNCHED;
/* flag that launch was successful, so far as we currently know */
failed_launch = false;
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm:launch: finished spawning orteds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
cleanup:
/* cleanup */
OBJ_RELEASE(state);
/* check for failed launch - if so, force terminate */
if (failed_launch) {
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
}
static void poll_spawns(int fd, short args, void *cbdata)
{
orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
int i, rc;
bool failed_launch = true;
int local_err;
tm_event_t event;
/* TM poll for all the spawns */
for (i = 0; i < launched; ++i) {
rc = tm_poll(TM_NULL_EVENT, &event, 1, &local_err);
if (TM_SUCCESS != rc) {
opal_output(0, "plm:tm: failed to poll for a spawned daemon, return status = %d", rc);
goto cleanup;
}
if (TM_SUCCESS != local_err) {
opal_output(0, "plm:tm: failed to spawn daemon, error code = %d", local_err );
goto cleanup;
}
}
failed_launch = false;
#if 0
/* set a timer to tell us if one or more daemon's fails to start - use the
* millisec/daemon timeout provided by the user to compute time
*/
if (0 < orte_startup_timeout) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
"%s plm:tm: setting startup timer for %d milliseconds",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
orte_startup_timeout));
ORTE_DETECT_TIMEOUT(map->num_new_daemons,
orte_startup_timeout*1000,
-1, failed_start, state->jdata);
}
#endif
cleanup:
/* cleanup */
OBJ_RELEASE(state);
/* check for failed launch - if so, force terminate */
if (failed_launch) {
ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
}
}
/**
* Terminate the orteds for a given job
*/
int plm_tm_terminate_orteds(void)
{
int rc;
/* now tell them to die */
if (orte_abnormal_term_ordered) {
/* cannot know if a daemon is able to
* tell us it died, so just ensure they
* all terminate
*/
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_HALT_VM_CMD))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* we need them to "phone home", though,
* so we can know that they have exited
*/
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_exit(ORTE_DAEMON_EXIT_CMD))) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
}
static int plm_tm_signal_job(orte_jobid_t jobid, int32_t signal)
{
int rc;
/* order them to pass this signal to their local procs */
if (ORTE_SUCCESS != (rc = orte_plm_base_orted_signal_local_procs(jobid, signal))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/*
* Free stuff
*/
static int plm_tm_finalize(void)
{
int rc;
/* cleanup any pending recvs */
if (ORTE_SUCCESS != (rc = orte_plm_base_comm_stop())) {
ORTE_ERROR_LOG(rc);
}
if (connected) {
tm_finalize();
connected = false;
}
return ORTE_SUCCESS;
}
static int plm_tm_connect(void)
{
int ret;
struct tm_roots tm_root;
int count, progress;
/* try a couple times to connect - might get busy signals every
now and then */
for (count = 0 ; count < 10; ++count) {
ret = tm_init(NULL, &tm_root);
if (TM_SUCCESS == ret) {
return ORTE_SUCCESS;
}
for (progress = 0 ; progress < 10 ; ++progress) {
opal_progress();
#if HAVE_SCHED_YIELD
sched_yield();
#endif
}
}
return ORTE_ERR_RESOURCE_BUSY;
}