1
1
openmpi/orte/runtime/orte_setup_hnp.c
Josh Hursey 4eefb33182 Some param changes:
- Change orte_base_infrastructure to orte_infrastructure to conform with 
  ompi_info's needs
- Move MCA Param registration in ORTE to a centralized function that is 
  called first in orte_init_stage1
- Set the infrastructure flag as an argument to orte_init
- Adjust initialization functions to properly pass down the infrastructure
  flag.

This commit was SVN r7053.
2005-08-26 20:13:35 +00:00

612 строки
20 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Establish a Head Node Process on a cluster's front end
*/
#include "orte_config.h"
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#include <fcntl.h>
#include "include/orte_constants.h"
#include "dps/dps.h"
#include "opal/event/event.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "runtime/orte_wait.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/path.h"
#include "util/univ_info.h"
#include "util/sys_info.h"
#include "util/proc_info.h"
#include "opal/util/os_path.h"
#include "util/session_dir.h"
#include "util/universe_setup_file_io.h"
#include "mca/base/mca_base_param.h"
#include "mca/soh/soh.h"
#include "mca/rml/rml.h"
#include "mca/rds/rds_types.h"
#include "mca/ns/ns.h"
#include "mca/gpr/gpr.h"
#include "mca/errmgr/errmgr.h"
#include "runtime/runtime.h"
#include "runtime/orte_setup_hnp.h"
/* Process environment: searched for the launch agent by opal_path_findv()
 * and modified through opal_setenv()/opal_unsetenv() in orte_setup_hnp().
 */
extern char **environ;
/* Local condition variable and mutex: orte_setup_hnp() blocks on the
 * condition while holding the mutex; both callbacks below take the mutex
 * and signal the condition when they have something to report.
*/
static opal_mutex_t orte_setup_hnp_mutex;
static opal_condition_t orte_setup_hnp_condition;
/* Local return code: reset by orte_setup_hnp() before the probe is
 * launched, written by the receive callback, and read back by
 * orte_setup_hnp() once it wakes up.
 */
static int orte_setup_hnp_rc;
/* Local uri storage: the string unpacked from the probe's report by the
 * receive callback; orte_setup_hnp() exports it as the gpr/ns/universe
 * replica uri.
 */
static char *orte_setup_hnp_orted_uri;
/* Data handed to the pid-wait callback via orte_wait_cb() (probe name,
 * target cluster, headnode and jobid are filled in by orte_setup_hnp()).
 */
static orte_setup_hnp_cb_data_t orte_setup_hnp_cbdata;
/*
* NON-BLOCKING RECEIVER
* Registered with orte_rml.recv_buffer_nb() on ORTE_RML_TAG_PROBE to
* collect the probe's report.
*/
static void orte_setup_hnp_recv(int status, orte_process_name_t* sender,
orte_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/*
* PID WAIT CALLBACK
* Registered with orte_wait_cb() for the forked rsh/ssh child.
*/
static void orte_setup_hnp_wait(pid_t wpid, int status, void *data);
/*
* ORTE_SETUP_HNP
*/
int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
{
#ifndef WIN32
char **argv, *param, *uri, *uid, *hn=NULL;
char *path, *name_string, *orteprobe;
int argc, rc=ORTE_SUCCESS, id, intparam;
pid_t pid;
bool can_launch=false, on_gpr=false;
orte_cellid_t cellid=ORTE_CELLID_MAX;
orte_jobid_t jobid;
orte_vpid_t vpid;
size_t i, j, k, cnt=0;
orte_gpr_value_t **values=NULL, *value;
orte_gpr_keyval_t **keyvals;
char *keys[4], *tokens[3], *cellname;
struct timeval tv;
struct timespec ts;
bool infrastructure = true;
/* get the nodename for the headnode of the target cluster */
if (NULL == headnode) { /* not provided, so try to look it up */
tokens[0] = target_cluster;
tokens[1] = NULL;
keys[0] = ORTE_RDS_FE_NAME;
keys[1] = ORTE_RDS_FE_SSH;
keys[2] = ORTE_CELLID_KEY;
keys[3] = NULL;
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_RESOURCE_SEGMENT,
tokens, keys, &cnt, &values))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */
goto MOVEON;
}
on_gpr = true;
/* need to decide what to do if more than value found. Some
* clusters have more than one head node, so which one do
* we choose? For now, just take the first one returned.
*/
keyvals = values[0]->keyvals;
for (i=0; i < values[0]->cnt; i++) {
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_NAME)) {
hn = strdup(keyvals[i]->value.strptr);
continue;
}
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_SSH)) {
can_launch = keyvals[i]->value.tf_flag;
continue;
}
if (0 == strcmp(keyvals[i]->key, ORTE_CELLID_KEY)) {
cellid = keyvals[i]->value.cellid;
continue;
}
}
goto MOVEON;
} else { /* lookup the headnode's cellid */
hn = strdup(headnode);
keys[0] = ORTE_RDS_FE_NAME;
keys[1] = ORTE_RDS_FE_SSH;
keys[2] = ORTE_CELLID_KEY;
keys[3] = NULL;
rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_RESOURCE_SEGMENT,
NULL, keys, &cnt, &values);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* Nothing found */
if (0 == cnt || 0 == values[0]->cnt) {
goto MOVEON;
}
on_gpr = true;
for (i=0; i < cnt; i++) {
keyvals = values[i]->keyvals;
for (j=0; j < values[i]->cnt; j++) {
if ((0 == strcmp(keyvals[j]->key, ORTE_RDS_FE_NAME)) &&
0 == strcmp(keyvals[j]->value.strptr, headnode)) {
/* okay, this is the right cell - now need to find
* the ssh flag (if provided) and cellid
*/
for (k=0; k < values[i]->cnt; k++) {
if (0 == strcmp(keyvals[k]->key, ORTE_RDS_FE_SSH)) {
can_launch = keyvals[k]->value.tf_flag;
continue;
}
if (0 == strcmp(keyvals[k]->key, ORTE_CELLID_KEY)) {
cellid = keyvals[k]->value.cellid;
continue;
}
}
goto MOVEON;
}
}
}
}
MOVEON:
if (NULL != values) {
for (i=0; i < cnt; i++)
OBJ_RELEASE(values[i]);
free(values);
}
if (!on_gpr && (NULL != target_cluster || NULL != headnode)) {
/* if we couldn't find anything about this cell on the gpr, then
* we need to put the required headnode data on the registry. We need
* it to be there so other functions/processes can find it, if needed.
* User must provide either a target_cluster name (which then must be
* synonymous with the headnode name), a headnode name (on a named or
* unnamed target_cluster), or both.
*/
/* get new cellid for this site/resource */
if (NULL != target_cluster) {
cellname = strdup(target_cluster);
} else {
/* if the target_cluster was NULL, then headnode CAN'T be NULL
* or else we wouldn't get here
*/
cellname = strdup(headnode);
}
/* can't know the site name, so it becomes "unknown" */
rc = orte_ns.create_cellid(&cellid, "unknown", cellname);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
free(cellname);
return rc;
}
/*
* Store the cell info on the resource segment of the registry
*/
value = OBJ_NEW(orte_gpr_value_t);
if (NULL == value) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value->addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
value->segment = strdup(ORTE_RESOURCE_SEGMENT);
value->cnt = 4;
value->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*));
if (NULL == value->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i=0; i < value->cnt; i++) {
value->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value->keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
}
/* Set Cell Name */
value->keyvals[0]->key = strdup(ORTE_RDS_NAME);
value->keyvals[0]->type = ORTE_STRING;
value->keyvals[0]->value.strptr = strdup(cellname);
/* Set Cell ID */
value->keyvals[1]->key = strdup(ORTE_CELLID_KEY);
value->keyvals[1]->type = ORTE_CELLID;
value->keyvals[1]->value.cellid = cellid;
/* Set Front End Name */
value->keyvals[2]->key = strdup(ORTE_RDS_FE_NAME);
value->keyvals[2]->type = ORTE_STRING;
if (NULL == headnode) {
value->keyvals[2]->value.strptr = strdup(cellname);
} else {
value->keyvals[2]->value.strptr = strdup(headnode);
}
/* Asssume ability to ssh to front end node*/
value->keyvals[3]->key = strdup(ORTE_RDS_FE_SSH);
value->keyvals[3]->type = ORTE_BOOL;
value->keyvals[3]->value.tf_flag = true;
value->num_tokens = 3;
value->tokens = (char**)malloc(3 * sizeof(char*));
if (NULL == value->tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
rc = orte_schema.get_node_tokens(&value->tokens, &value->num_tokens, cellid, cellname);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* Place tokens in GPR */
rc = orte_gpr.put(1, &value);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
OBJ_RELEASE(value);
free(cellname);
can_launch = true;
}
if (!can_launch || ORTE_CELLID_MAX == cellid) {
return ORTE_ERR_UNREACH;
}
/* get the user's name on the headnode */
if (NULL == username) {
uid = strdup(orte_system_info.user);
} else {
uid = strdup(username);
}
/* SETUP TO LAUNCH PROBE */
/* setup the conditioned wait and mutex variables */
OBJ_CONSTRUCT(&orte_setup_hnp_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&orte_setup_hnp_condition, opal_condition_t);
/* get a jobid for the probe */
rc = orte_ns.create_jobid(&jobid);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get a vpid for the probe */
rc = orte_ns.reserve_range(jobid, 1, &vpid);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* initialize probe's process name... */
rc = orte_ns.create_process_name(&(orte_setup_hnp_cbdata.name), cellid, jobid, vpid);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* ...and get string representation */
rc = orte_ns.get_proc_name_string(&name_string, orte_setup_hnp_cbdata.name);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* setup callback data on sigchild */
if (NULL != target_cluster) {
orte_setup_hnp_cbdata.target_cluster = strdup(target_cluster);
} else {
orte_setup_hnp_cbdata.target_cluster = NULL;
}
orte_setup_hnp_cbdata.headnode = strdup(headnode);
orte_setup_hnp_cbdata.jobid = jobid;
/* get name of probe application - just in case user specified something different */
id = mca_base_param_register_string("orteprobe",NULL,NULL,NULL,"orteprobe");
mca_base_param_lookup_string(id, &orteprobe);
/* get rsh/ssh launch mechanism parameters */
id = mca_base_param_register_string("pls","rsh","agent",NULL,"ssh");
mca_base_param_lookup_string(id, &param);
/* Initialize the argv array */
argv = opal_argv_split(param, ' ');
argc = opal_argv_count(argv);
if (argc <= 0) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
rc = ORTE_ERR_BAD_PARAM;
goto CLEANUP;
}
free(param);
/* setup the path */
path = opal_path_findv(argv[0], 0, environ, NULL);
/* add the username and nodename */
opal_argv_append(&argc, &argv, "-l");
opal_argv_append(&argc, &argv, uid);
opal_argv_append(&argc, &argv, hn);
/* add the probe application */
opal_argv_append(&argc, &argv, orteprobe);
/* tell the probe it's name */
opal_argv_append(&argc, &argv, "--name");
opal_argv_append(&argc, &argv, name_string);
/* setup probe's ns contact info */
opal_argv_append(&argc, &argv, "--nsreplica");
if(NULL != orte_process_info.ns_replica_uri) {
uri = strdup(orte_process_info.ns_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* setup probe's gpr contact info */
opal_argv_append(&argc, &argv, "--gprreplica");
if(NULL != orte_process_info.gpr_replica_uri) {
uri = strdup(orte_process_info.gpr_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* tell the probe who to report to */
uri = orte_rml.get_uri();
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, "--requestor");
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* pass along any parameters for the head node process
* in case one needs to be created
*/
id = mca_base_param_register_string("scope",NULL,NULL,NULL,"public");
mca_base_param_lookup_string(id, &param);
opal_argv_append(&argc, &argv, "--scope");
opal_argv_append(&argc, &argv, param);
free(param);
id = mca_base_param_register_int("persistent",NULL,NULL,NULL,(int)false);
mca_base_param_lookup_int(id, &intparam);
if (intparam) {
opal_argv_append(&argc, &argv, "--persistent");
}
/* issue the non-blocking recv to get the probe's findings */
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PROBE,
0, orte_setup_hnp_recv, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* fork a child to exec the rsh/ssh session */
orte_setup_hnp_rc = ORTE_SUCCESS;
pid = fork();
if (pid < 0) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
if (pid == 0) { /* child */
/* exec the probe launch */
execv(path, argv);
ORTE_ERROR_LOG(ORTE_ERROR);
opal_output(0, "orte_setup_hnp: execv failed with errno=%d\n", errno);
return ORTE_ERROR;
} else { /* parent */
orte_wait_cb(pid, orte_setup_hnp_wait, &orte_setup_hnp_cbdata);
/* block until a timeout occurs or probe dies/calls back */
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + 1000000;
ts.tv_nsec = 0;
OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);
opal_condition_timedwait(&orte_setup_hnp_condition, &orte_setup_hnp_mutex, &ts);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
if (ORTE_SUCCESS == orte_setup_hnp_rc) {
/* Remember if we were infrastructre or not */
id = mca_base_param_find("orte", NULL, "infrastructure");
mca_base_param_lookup_int(id, &intparam);
if ( ((int)true) != intparam) {
infrastructure = false;
}
/* need to restart the local system so it can connect to the remote daemon.
* we only want to clear the run-time itself - we cannot close the OPAL
* utilities, though, or we will lose all of our MCA parameters
*/
orte_system_finalize();
/*
* now set the relevant MCA parameters to point us at the remote daemon...
*/
rc = opal_setenv("OMPI_MCA_gpr_replica_uri",
orte_setup_hnp_orted_uri, true, &environ);
if (ORTE_SUCCESS != rc) {
fprintf(stderr, "orte_setup_hnp: could not set gpr_replica_uri in environ\n");
return rc;
}
rc = opal_setenv("OMPI_MCA_ns_replica_uri",
orte_setup_hnp_orted_uri, true, &environ);
if (ORTE_SUCCESS != rc) {
fprintf(stderr, "orte_setup_hnp: could not set ns_replica_uri in environ\n");
return rc;
}
opal_unsetenv("OMPI_MCA_seed", &environ);
rc = opal_setenv("OMPI_MCA_universe_uri",
orte_setup_hnp_orted_uri, true, &environ);
if (ORTE_SUCCESS != rc) {
fprintf(stderr, "orte_setup_hnp: could not set universe_uri in environ\n");
return rc;
}
/*
* ...re-init ourselves...
*/
rc = orte_system_init(infrastructure);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/*
* ...and we are now ready to go!
*/
return ORTE_SUCCESS;
}
return orte_setup_hnp_rc;
}
CLEANUP:
return rc;
#else
printf ("This function has not been implemented in windows yet, file %s line %d\n", __FILE__, __LINE__);
abort();
#endif
}
/*
 * Receive callback for the probe's report.
 *
 * Unpacks one string from the incoming buffer into
 * orte_setup_hnp_orted_uri, stores the outcome of the unpack in
 * orte_setup_hnp_rc, and signals the condition orte_setup_hnp() waits on.
 * The status, sender, tag and cbdata arguments are not used.
 */
static void orte_setup_hnp_recv(int status, orte_process_name_t* sender,
                                orte_buffer_t* buffer, orte_rml_tag_t tag,
                                void* cbdata)
{
    size_t count = 1;
    int ret;

    OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);

    ret = orte_dps.unpack(buffer, &orte_setup_hnp_orted_uri, &count, ORTE_STRING);
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
    }
    /* success or failure, the waiter learns the result through this code */
    orte_setup_hnp_rc = ret;

    opal_condition_signal(&orte_setup_hnp_condition);
    OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
}
static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
{
orte_setup_hnp_cb_data_t *data;
OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);
data = (orte_setup_hnp_cb_data_t*)cbdata;
/* if ssh exited abnormally, print something useful to the user and cleanup
* the registry entries for the HNP jobid.
This should somehow be pushed up to the calling level, but we
don't really have a way to do that just yet.
*/
if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) {
/* tell the user something went wrong */
opal_output(0, "ERROR: The probe on head node %s of the %s cluster failed to start as expected.",
data->headnode, data->target_cluster);
opal_output(0, "ERROR: There may be more information available from");
opal_output(0, "ERROR: the remote shell (see above).");
if (WIFEXITED(status)) {
opal_output(0, "ERROR: The probe exited unexpectedly with status %d.",
WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
#ifdef WCOREDUMP
if (WCOREDUMP(status)) {
opal_output(0, "The probe received a signal %d (with core).",
WTERMSIG(status));
} else {
opal_output(0, "The probe received a signal %d.", WTERMSIG(status));
}
#else
opal_output(0, "The probe received a signal %d.", WTERMSIG(status));
#endif /* WCOREDUMP */
} else {
opal_output(0, "No extra status information is available: %d.", status);
}
}
opal_condition_signal(&orte_setup_hnp_condition);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
}