Checkpoint
This commit was SVN r5740.
Этот коммит содержится в:
родитель
3232200e51
Коммит
ef07d67946
@ -38,6 +38,9 @@
|
||||
|
||||
|
||||
#include "include/orte_constants.h"
|
||||
#include "event/event.h"
|
||||
#include "threads/mutex.h"
|
||||
#include "threads/condition.h"
|
||||
#include "runtime/orte_wait.h"
|
||||
#include "util/argv.h"
|
||||
#include "util/output.h"
|
||||
@ -52,7 +55,9 @@
|
||||
#include "mca/base/mca_base_param.h"
|
||||
#include "mca/soh/soh.h"
|
||||
#include "mca/rml/rml.h"
|
||||
#include "mca/rds/rds_types.h"
|
||||
#include "mca/ns/ns.h"
|
||||
#include "mca/gpr/gpr.h"
|
||||
#include "mca/errmgr/errmgr.h"
|
||||
|
||||
#include "runtime/runtime.h"
|
||||
@ -69,6 +74,11 @@ typedef struct {
|
||||
orte_jobid_t jobid;
|
||||
} orte_setup_hnp_cb_data_t;
|
||||
|
||||
/* Local condition variables and mutex
|
||||
*/
|
||||
static ompi_mutex_t orte_setup_hnp_mutex;
|
||||
static ompi_condition_t orte_setup_hnp_condition;
|
||||
|
||||
static orte_setup_hnp_cb_data_t orte_setup_hnp_cbdata = {NULL, NULL, NULL, 0};
|
||||
|
||||
/*
|
||||
@ -94,15 +104,192 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
|
||||
char *path, *name_string, *orteprobe;
|
||||
int argc, rc=ORTE_SUCCESS, id;
|
||||
pid_t pid;
|
||||
bool can_launch=false, on_gpr=false;
|
||||
orte_cellid_t cellid=ORTE_CELLID_MAX;
|
||||
orte_jobid_t jobid;
|
||||
orte_vpid_t vpid;
|
||||
|
||||
size_t i, j, k, cnt=0;
|
||||
orte_gpr_value_t **values=NULL, *value;
|
||||
orte_gpr_keyval_t **keyvals;
|
||||
char *keys[4], *tokens[3], *cellname;
|
||||
struct timeval tv;
|
||||
struct timespec ts;
|
||||
|
||||
/* get the nodename for the headnode of the target cluster */
|
||||
if (NULL == headnode) { /* not provided, so try to look it up */
|
||||
tokens[0] = target_cluster;
|
||||
tokens[1] = NULL;
|
||||
keys[0] = ORTE_RDS_FE_NAME;
|
||||
keys[1] = ORTE_RDS_FE_SSH;
|
||||
keys[2] = ORTE_CELLID_KEY;
|
||||
keys[3] = NULL;
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
ORTE_RESOURCE_SEGMENT,
|
||||
tokens, keys, &cnt, &values))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */
|
||||
goto MOVEON;
|
||||
}
|
||||
on_gpr = true;
|
||||
/* need to decide what to do if more than one value is found. Some
|
||||
* clusters have more than one head node, so which one do
|
||||
* we choose? For now, just take the first one returned.
|
||||
*/
|
||||
keyvals = values[0]->keyvals;
|
||||
for (i=0; i < values[0]->cnt; i++) {
|
||||
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_NAME)) {
|
||||
hn = strdup(keyvals[i]->value.strptr);
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_SSH)) {
|
||||
can_launch = keyvals[i]->value.tf_flag;
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(keyvals[i]->key, ORTE_CELLID_KEY)) {
|
||||
cellid = keyvals[i]->value.cellid;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
goto MOVEON;
|
||||
|
||||
} else { /* lookup the headnode's cellid */
|
||||
hn = strdup(headnode);
|
||||
cellid = 0;
|
||||
keys[0] = ORTE_RDS_FE_NAME;
|
||||
keys[1] = ORTE_RDS_FE_SSH;
|
||||
keys[2] = ORTE_CELLID_KEY;
|
||||
keys[3] = NULL;
|
||||
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
|
||||
ORTE_RESOURCE_SEGMENT,
|
||||
NULL, keys, &cnt, &values))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */
|
||||
goto MOVEON;
|
||||
}
|
||||
on_gpr = true;
|
||||
for (i=0; i < cnt; i++) {
|
||||
keyvals = values[i]->keyvals;
|
||||
for (j=0; j < values[i]->cnt; j++) {
|
||||
if ((0 == strcmp(keyvals[j]->key, ORTE_RDS_FE_NAME)) &&
|
||||
0 == strcmp(keyvals[j]->value.strptr, headnode)) {
|
||||
/* okay, this is the right cell - now need to find
|
||||
* the ssh flag (if provided) and cellid
|
||||
*/
|
||||
for (k=0; k < values[i]->cnt; k++) {
|
||||
if (0 == strcmp(keyvals[k]->key, ORTE_RDS_FE_SSH)) {
|
||||
can_launch = keyvals[k]->value.tf_flag;
|
||||
continue;
|
||||
}
|
||||
if (0 == strcmp(keyvals[k]->key, ORTE_CELLID_KEY)) {
|
||||
cellid = keyvals[k]->value.cellid;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
goto MOVEON;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MOVEON:
|
||||
if (NULL != values) {
|
||||
for (i=0; i < cnt; i++) OBJ_RELEASE(values[i]);
|
||||
free(values);
|
||||
}
|
||||
|
||||
if (!on_gpr && (NULL != target_cluster || NULL != headnode)) {
|
||||
/* if we couldn't find anything about this cell on the gpr, then
|
||||
* we need to put the required headnode data on the registry. We need
|
||||
* it to be there so other functions/processes can find it, if needed.
|
||||
* User must provide either a target_cluster name (which then must be
|
||||
* synonymous with the headnode name), a headnode name (on a named or
|
||||
* unnamed target_cluster), or both.
|
||||
*/
|
||||
/* get new cellid for this site/resource */
|
||||
if (NULL != target_cluster) {
|
||||
cellname = strdup(target_cluster);
|
||||
} else {
|
||||
/* if the target_cluster was NULL, then headnode CAN'T be NULL
|
||||
* or else we wouldn't get here
|
||||
*/
|
||||
cellname = strdup(headnode);
|
||||
}
|
||||
/* can't know the site name, so it becomes "unknown" */
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.create_cellid(&cellid, "UNKNOWN",
|
||||
cellname))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(cellname);
|
||||
return rc;
|
||||
}
|
||||
/* now store the cell info on the resource segment of the registry */
|
||||
value = OBJ_NEW(orte_gpr_value_t);
|
||||
if (NULL == value) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
value->addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
|
||||
value->segment = strdup(ORTE_RESOURCE_SEGMENT);
|
||||
|
||||
value->cnt = 4;
|
||||
value->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*));
|
||||
if (NULL == value->keyvals) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
for (i=0; i < value->cnt; i++) {
|
||||
value->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
|
||||
if (NULL == value->keyvals[i]) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
value->keyvals[0]->key = strdup(ORTE_RDS_NAME);
|
||||
value->keyvals[0]->type = ORTE_STRING;
|
||||
value->keyvals[0]->value.strptr = strdup(cellname);
|
||||
value->keyvals[1]->key = strdup(ORTE_CELLID_KEY);
|
||||
value->keyvals[1]->type = ORTE_CELLID;
|
||||
value->keyvals[1]->value.cellid = cellid;
|
||||
value->keyvals[2]->key = strdup(ORTE_RDS_FE_NAME);
|
||||
value->keyvals[2]->type = ORTE_STRING;
|
||||
if (NULL == headnode) {
|
||||
value->keyvals[2]->value.strptr = strdup(cellname);
|
||||
} else {
|
||||
value->keyvals[2]->value.strptr = strdup(headnode);
|
||||
}
|
||||
value->keyvals[3]->key = strdup(ORTE_RDS_FE_SSH);
|
||||
value->keyvals[3]->type = ORTE_BOOL;
|
||||
value->keyvals[3]->value.tf_flag = true;
|
||||
|
||||
value->num_tokens = 3;
|
||||
value->tokens = (char**)malloc(3 * sizeof(char*));
|
||||
if (NULL == value->tokens) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.convert_cellid_to_string(&(value->tokens[0]), cellid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
value->tokens[1] = strdup("UNKNOWN"); /* site name is unknown */
|
||||
value->tokens[2] = strdup(cellname);
|
||||
|
||||
if (ORTE_SUCCESS != orte_gpr.put(1, &value)) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(value);
|
||||
return rc;
|
||||
}
|
||||
OBJ_RELEASE(value);
|
||||
free(cellname);
|
||||
can_launch = true;
|
||||
}
|
||||
|
||||
orte_gpr.dump_segments(0);
|
||||
|
||||
if (!can_launch || ORTE_CELLID_MAX == cellid) {
|
||||
return ORTE_ERR_UNREACH;
|
||||
}
|
||||
|
||||
/* get the user's name on the headnode */
|
||||
@ -114,6 +301,10 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
|
||||
|
||||
/* SETUP TO LAUNCH PROBE */
|
||||
|
||||
/* setup the conditioned wait and mutex variables */
|
||||
OBJ_CONSTRUCT(&orte_setup_hnp_mutex, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&orte_setup_hnp_condition, ompi_condition_t);
|
||||
|
||||
/* get a jobid for the probe */
|
||||
if (ORTE_SUCCESS != (rc = orte_ns.create_jobid(&jobid))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -182,6 +373,7 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
|
||||
}
|
||||
asprintf(&param, "\"%s\"", uri);
|
||||
ompi_argv_append(&argc, &argv, param);
|
||||
free(param);
|
||||
free(uri);
|
||||
|
||||
/* setup probe's gpr contact info */
|
||||
@ -193,12 +385,15 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
|
||||
}
|
||||
asprintf(&param, "\"%s\"", uri);
|
||||
ompi_argv_append(&argc, &argv, param);
|
||||
free(param);
|
||||
free(uri);
|
||||
|
||||
/* tell the probe who to report to */
|
||||
uri = orte_rml.get_uri();
|
||||
asprintf(&param, "\"%s\"", uri);
|
||||
ompi_argv_append(&argc, &argv, "--requestor");
|
||||
ompi_argv_append(&argc, &argv, uri);
|
||||
ompi_argv_append(&argc, &argv, param);
|
||||
free(param);
|
||||
free(uri);
|
||||
|
||||
/* issue the non-blocking recv to get the probe's findings */
|
||||
@ -225,10 +420,19 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username)
|
||||
return ORTE_ERROR;
|
||||
|
||||
} else { /* parent */
|
||||
|
||||
orte_wait_cb(pid, orte_setup_hnp_wait, &orte_setup_hnp_cbdata);
|
||||
}
|
||||
|
||||
/* block until a timeout occurs or probe dies/calls back */
|
||||
gettimeofday(&tv, NULL);
|
||||
ts.tv_sec = tv.tv_sec + 1000000;
|
||||
ts.tv_nsec = 0;
|
||||
|
||||
OMPI_THREAD_LOCK(&orte_setup_hnp_mutex);
|
||||
ompi_condition_timedwait(&orte_setup_hnp_condition, &orte_setup_hnp_mutex, &ts);
|
||||
OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
return rc;
|
||||
|
||||
@ -243,9 +447,10 @@ static void orte_setup_hnp_recv(int status, orte_process_name_t* sender,
|
||||
orte_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
OMPI_THREAD_LOCK(&orte_setup_hnp_mutex);
|
||||
ompi_output(0, "HE CALLED HOME!!");
|
||||
orte_finalize();
|
||||
exit(0);
|
||||
ompi_condition_signal(&orte_setup_hnp_condition);
|
||||
OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex);
|
||||
}
|
||||
|
||||
static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
|
||||
@ -253,6 +458,8 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
|
||||
int rc;
|
||||
orte_setup_hnp_cb_data_t *data;
|
||||
|
||||
OMPI_THREAD_LOCK(&orte_setup_hnp_mutex);
|
||||
|
||||
data = (orte_setup_hnp_cb_data_t*)cbdata;
|
||||
|
||||
/* if ssh exited abnormally, print something useful to the user and cleanup
|
||||
@ -261,11 +468,6 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
|
||||
don't really have a way to do that just yet.
|
||||
*/
|
||||
if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) {
|
||||
/* set the probe's state-of-health to aborted */
|
||||
if (ORTE_SUCCESS != (rc =
|
||||
orte_soh.set_proc_soh(data->name, ORTE_PROC_STATE_ABORTED, status))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
/* tell the user something went wrong */
|
||||
ompi_output(0, "ERROR: The probe on head node %s of the %s cluster failed to start as expected.",
|
||||
data->headnode, data->target_cluster);
|
||||
@ -289,5 +491,9 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata)
|
||||
ompi_output(0, "No extra status information is available: %d.", status);
|
||||
}
|
||||
}
|
||||
|
||||
ompi_condition_signal(&orte_setup_hnp_condition);
|
||||
OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex);
|
||||
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user