1
1
openmpi/orte/runtime/orte_setup_hnp.c
George Bosilca f52c10d18e And ORTE is ready for prime-time. All Windows tricks are in:
- use the OPAL functions for PATH and environment variables
- make all headers C++ friendly
- no unamed structures
- no implicit cast.

Plus a full implementation for the orte_wait functions.

This commit was SVN r11347.
2006-08-23 03:32:36 +00:00

589 строки
19 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Establish a Head Node Process on a cluster's front end
*/
#include "orte_config.h"
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifdef HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#include <fcntl.h>
#include "orte/orte_constants.h"
#include "opal/event/event.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/path.h"
#include "opal/util/os_path.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/dss/dss.h"
#include "orte/runtime/orte_wait.h"
#include "orte/util/univ_info.h"
#include "orte/util/sys_info.h"
#include "orte/util/proc_info.h"
#include "orte/util/session_dir.h"
#include "orte/util/universe_setup_file_io.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rds/rds_types.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_setup_hnp.h"
#if !defined(__WINDOWS__)
extern char **environ;
#endif /* !defined(__WINDOWS__) */
/* Local condition variables and mutex
*/
static opal_mutex_t orte_setup_hnp_mutex;
static opal_condition_t orte_setup_hnp_condition;
/* Local return code */
static int orte_setup_hnp_rc;
/* Local uri storage */
static char *orte_setup_hnp_orted_uri;
static orte_setup_hnp_cb_data_t orte_setup_hnp_cbdata;
/*
* NON-BLOCKING RECEIVER
*/
static void orte_setup_hnp_recv(int status, orte_process_name_t* sender,
orte_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/*
* PID WAIT CALLBACK
*/
static void orte_setup_hnp_wait(pid_t wpid, int status, void *data);
/*
* ORTE_SETUP_HNP
*/
int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
{
char **argv, *param, *uri, *uid, *hn=NULL;
char *path, *name_string, *orteprobe;
int argc, rc=ORTE_SUCCESS, id, intparam;
pid_t pid;
bool can_launch=false, on_gpr=false;
orte_cellid_t cellid=ORTE_CELLID_MAX, *cptr;
orte_jobid_t jobid;
orte_vpid_t vpid;
orte_std_cntr_t i, j, k, cnt=0;
orte_gpr_value_t **values=NULL, *value;
orte_gpr_keyval_t **keyvals;
char *keys[4], *tokens[3], *cellname;
struct timeval tv;
struct timespec ts;
bool infrastructure = true, *bptr, tf_flag;
/* get the nodename for the headnode of the target cluster */
if (NULL == headnode) { /* not provided, so try to look it up */
tokens[0] = target_cluster;
tokens[1] = NULL;
keys[0] = ORTE_RDS_FE_NAME;
keys[1] = ORTE_RDS_FE_SSH;
keys[2] = ORTE_CELLID_KEY;
keys[3] = NULL;
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_RESOURCE_SEGMENT,
tokens, keys, &cnt, &values))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */
goto MOVEON;
}
on_gpr = true;
/* need to decide what to do if more than value found. Some
* clusters have more than one head node, so which one do
* we choose? For now, just take the first one returned.
*/
keyvals = values[0]->keyvals;
for (i=0; i < values[0]->cnt; i++) {
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_NAME)) {
hn = strdup((const char*)keyvals[i]->value->data);
continue;
}
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_SSH)) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, keyvals[i]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
can_launch = *bptr;
continue;
}
if (0 == strcmp(keyvals[i]->key, ORTE_CELLID_KEY)) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyvals[i]->value, ORTE_CELLID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
cellid = *cptr;
continue;
}
}
goto MOVEON;
} else { /* lookup the headnode's cellid */
hn = strdup(headnode);
keys[0] = ORTE_RDS_FE_NAME;
keys[1] = ORTE_RDS_FE_SSH;
keys[2] = ORTE_CELLID_KEY;
keys[3] = NULL;
rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
ORTE_RESOURCE_SEGMENT,
NULL, keys, &cnt, &values);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* Nothing found */
if (0 == cnt || 0 == values[0]->cnt) {
goto MOVEON;
}
on_gpr = true;
for (i=0; i < cnt; i++) {
keyvals = values[i]->keyvals;
for (j=0; j < values[i]->cnt; j++) {
if ((0 == strcmp(keyvals[j]->key, ORTE_RDS_FE_NAME)) &&
0 == strcmp((const char*)keyvals[j]->value->data, headnode)) {
/* okay, this is the right cell - now need to find
* the ssh flag (if provided) and cellid
*/
for (k=0; k < values[i]->cnt; k++) {
if (0 == strcmp(keyvals[k]->key, ORTE_RDS_FE_SSH)) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&bptr, keyvals[i]->value, ORTE_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
can_launch = *bptr;
continue;
}
if (0 == strcmp(keyvals[k]->key, ORTE_CELLID_KEY)) {
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&cptr, keyvals[i]->value, ORTE_CELLID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
cellid = *cptr;
continue;
}
}
goto MOVEON;
}
}
}
}
MOVEON:
if (NULL != values) {
for (i=0; i < cnt; i++)
OBJ_RELEASE(values[i]);
free(values);
}
if (!on_gpr && (NULL != target_cluster || NULL != headnode)) {
/* if we couldn't find anything about this cell on the gpr, then
* we need to put the required headnode data on the registry. We need
* it to be there so other functions/processes can find it, if needed.
* User must provide either a target_cluster name (which then must be
* synonymous with the headnode name), a headnode name (on a named or
* unnamed target_cluster), or both.
*/
/* get new cellid for this site/resource */
if (NULL != target_cluster) {
cellname = strdup(target_cluster);
} else {
/* if the target_cluster was NULL, then headnode CAN'T be NULL
* or else we wouldn't get here
*/
cellname = strdup(headnode);
}
/* can't know the site name, so it becomes "unknown" */
rc = orte_ns.create_cellid(&cellid, "unknown", cellname);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
free(cellname);
return rc;
}
/*
* Store the cell info on the resource segment of the registry
*/
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR,
ORTE_RESOURCE_SEGMENT, 4, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = orte_schema.get_node_tokens(&(value->tokens), &(value->num_tokens), cellid, cellname);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
/* Set Cell Name */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]), ORTE_RDS_NAME, ORTE_STRING, cellname))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
/* Set Cell ID */
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[1]), ORTE_CELLID_KEY, ORTE_CELLID, &cellid))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
/* Set Front End Name */
if (NULL == headnode) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]), ORTE_RDS_FE_NAME, ORTE_STRING, cellname))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
} else {
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[2]), ORTE_RDS_FE_NAME, ORTE_STRING, headnode))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
}
/* Asssume ability to ssh to front end node*/
tf_flag = true;
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[3]), ORTE_RDS_FE_SSH, ORTE_BOOL, &tf_flag))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
/* Place value in GPR */
rc = orte_gpr.put(1, &value);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
OBJ_RELEASE(value);
free(cellname);
can_launch = true;
}
if (!can_launch || ORTE_CELLID_MAX == cellid) {
return ORTE_ERR_UNREACH;
}
/* get the user's name on the headnode */
if (NULL == username) {
uid = strdup(orte_system_info.user);
} else {
uid = strdup(username);
}
/* SETUP TO LAUNCH PROBE */
/* setup the conditioned wait and mutex variables */
OBJ_CONSTRUCT(&orte_setup_hnp_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&orte_setup_hnp_condition, opal_condition_t);
/* get a jobid for the probe */
rc = orte_ns.create_jobid(&jobid);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* get a vpid for the probe */
rc = orte_ns.reserve_range(jobid, 1, &vpid);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* initialize probe's process name... */
rc = orte_ns.create_process_name(&(orte_setup_hnp_cbdata.name), cellid, jobid, vpid);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* ...and get string representation */
rc = orte_ns.get_proc_name_string(&name_string, orte_setup_hnp_cbdata.name);
if (ORTE_SUCCESS != rc ) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* setup callback data on sigchild */
if (NULL != target_cluster) {
orte_setup_hnp_cbdata.target_cluster = strdup(target_cluster);
} else {
orte_setup_hnp_cbdata.target_cluster = NULL;
}
orte_setup_hnp_cbdata.headnode = strdup(headnode);
orte_setup_hnp_cbdata.jobid = jobid;
/* get name of probe application - just in case user specified something different */
id = mca_base_param_register_string("orteprobe",NULL,NULL,NULL,"orteprobe");
mca_base_param_lookup_string(id, &orteprobe);
/* get rsh/ssh launch mechanism parameters */
id = mca_base_param_register_string("pls","rsh","agent",NULL,"ssh");
mca_base_param_lookup_string(id, &param);
/* Initialize the argv array */
argv = opal_argv_split(param, ' ');
argc = opal_argv_count(argv);
if (argc <= 0) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
rc = ORTE_ERR_BAD_PARAM;
goto CLEANUP;
}
free(param);
/* setup the path */
path = opal_path_findv(argv[0], 0, environ, NULL);
/* add the username and nodename */
opal_argv_append(&argc, &argv, "-l");
opal_argv_append(&argc, &argv, uid);
opal_argv_append(&argc, &argv, hn);
/* add the probe application */
opal_argv_append(&argc, &argv, orteprobe);
/* tell the probe it's name */
opal_argv_append(&argc, &argv, "--name");
opal_argv_append(&argc, &argv, name_string);
/* setup probe's ns contact info */
opal_argv_append(&argc, &argv, "--nsreplica");
if(NULL != orte_process_info.ns_replica_uri) {
uri = strdup(orte_process_info.ns_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* setup probe's gpr contact info */
opal_argv_append(&argc, &argv, "--gprreplica");
if(NULL != orte_process_info.gpr_replica_uri) {
uri = strdup(orte_process_info.gpr_replica_uri);
} else {
uri = orte_rml.get_uri();
}
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* tell the probe who to report to */
uri = orte_rml.get_uri();
asprintf(&param, "\"%s\"", uri);
opal_argv_append(&argc, &argv, "--requestor");
opal_argv_append(&argc, &argv, param);
free(param);
free(uri);
/* pass along any parameters for the head node process
* in case one needs to be created
*/
id = mca_base_param_register_string("scope",NULL,NULL,NULL,"public");
mca_base_param_lookup_string(id, &param);
opal_argv_append(&argc, &argv, "--scope");
opal_argv_append(&argc, &argv, param);
free(param);
id = mca_base_param_register_int("persistent",NULL,NULL,NULL,(int)false);
mca_base_param_lookup_int(id, &intparam);
if (intparam) {
opal_argv_append(&argc, &argv, "--persistent");
}
/* issue the non-blocking recv to get the probe's findings */
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_PROBE,
0, orte_setup_hnp_recv, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
#ifndef __WINDOWS__
/* fork a child to exec the rsh/ssh session */
orte_setup_hnp_rc = ORTE_SUCCESS;
pid = fork();
if (pid < 0) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
if (pid == 0) { /* child */
/* exec the probe launch */
execv(path, argv);
ORTE_ERROR_LOG(ORTE_ERROR);
opal_output(0, "orte_setup_hnp: execv failed with errno=%d\n", errno);
return ORTE_ERROR;
} else { /* parent */
orte_wait_cb(pid, orte_setup_hnp_wait, &orte_setup_hnp_cbdata);
/* block until a timeout occurs or probe dies/calls back */
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + 1000000;
ts.tv_nsec = 0;
OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);
opal_condition_timedwait(&orte_setup_hnp_condition, &orte_setup_hnp_mutex, &ts);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
if (ORTE_SUCCESS == orte_setup_hnp_rc) {
/* Remember if we were infrastructre or not */
id = mca_base_param_find("orte", NULL, "infrastructure");
mca_base_param_lookup_int(id, &intparam);
if ( ((int)true) != intparam) {
infrastructure = false;
}
/* need to restart the local system so it can connect to the remote daemon. */
if (ORTE_SUCCESS != (rc = orte_restart(orte_setup_hnp_cbdata.name, orte_setup_hnp_orted_uri))) {
/** can't use ORTE_ERROR_LOG here as it may no longer be valid. Since we may
* have gotten part way through the shutdown/restart process, we can't have
* any idea of our current state - all we can really do at this point is
* abort
*/
fprintf(stderr, "orte_setup_hnp: aborted during restart of local process\n");
}
/*
* ...and we are now ready to go!
*/
return ORTE_SUCCESS;
}
return orte_setup_hnp_rc;
}
#else
ORTE_ERROR_LOG(ORTE_ERROR);
opal_output(0, "This function has not been implemented in windows yet, file %s line %d\n", __FILE__, __LINE__);
abort();
#endif
CLEANUP:
return rc;
}
static void orte_setup_hnp_recv(int status, orte_process_name_t* sender,
orte_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_std_cntr_t n=1;
int rc;
OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);
if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &orte_setup_hnp_orted_uri, &n, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
orte_setup_hnp_rc = rc;
opal_condition_signal(&orte_setup_hnp_condition);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
return;
}
orte_setup_hnp_rc = ORTE_SUCCESS;
opal_condition_signal(&orte_setup_hnp_condition);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
}
static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
{
orte_setup_hnp_cb_data_t *data;
OPAL_THREAD_LOCK(&orte_setup_hnp_mutex);
data = (orte_setup_hnp_cb_data_t*)cbdata;
/* if ssh exited abnormally, print something useful to the user and cleanup
* the registry entries for the HNP jobid.
This should somehow be pushed up to the calling level, but we
don't really have a way to do that just yet.
*/
if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) {
/* tell the user something went wrong */
opal_output(0, "ERROR: The probe on head node %s of the %s cluster failed to start as expected.",
data->headnode, data->target_cluster);
opal_output(0, "ERROR: There may be more information available from");
opal_output(0, "ERROR: the remote shell (see above).");
if (WIFEXITED(status)) {
opal_output(0, "ERROR: The probe exited unexpectedly with status %d.",
WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
#ifdef WCOREDUMP
if (WCOREDUMP(status)) {
opal_output(0, "The probe received a signal %d (with core).",
WTERMSIG(status));
} else {
opal_output(0, "The probe received a signal %d.", WTERMSIG(status));
}
#else
opal_output(0, "The probe received a signal %d.", WTERMSIG(status));
#endif /* WCOREDUMP */
} else {
opal_output(0, "No extra status information is available: %d.", status);
}
}
opal_condition_signal(&orte_setup_hnp_condition);
OPAL_THREAD_UNLOCK(&orte_setup_hnp_mutex);
}