diff --git a/src/runtime/orte_setup_hnp.c b/src/runtime/orte_setup_hnp.c index bda8e8e7ea..bfa77d668f 100644 --- a/src/runtime/orte_setup_hnp.c +++ b/src/runtime/orte_setup_hnp.c @@ -38,6 +38,9 @@ #include "include/orte_constants.h" +#include "event/event.h" +#include "threads/mutex.h" +#include "threads/condition.h" #include "runtime/orte_wait.h" #include "util/argv.h" #include "util/output.h" @@ -52,7 +55,9 @@ #include "mca/base/mca_base_param.h" #include "mca/soh/soh.h" #include "mca/rml/rml.h" +#include "mca/rds/rds_types.h" #include "mca/ns/ns.h" +#include "mca/gpr/gpr.h" #include "mca/errmgr/errmgr.h" #include "runtime/runtime.h" @@ -69,6 +74,11 @@ typedef struct { orte_jobid_t jobid; } orte_setup_hnp_cb_data_t; +/* Local condition variables and mutex + */ +static ompi_mutex_t orte_setup_hnp_mutex; +static ompi_condition_t orte_setup_hnp_condition; + static orte_setup_hnp_cb_data_t orte_setup_hnp_cbdata = {NULL, NULL, NULL, 0}; /* @@ -94,15 +104,192 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) char *path, *name_string, *orteprobe; int argc, rc=ORTE_SUCCESS, id; pid_t pid; + bool can_launch=false, on_gpr=false; orte_cellid_t cellid=ORTE_CELLID_MAX; orte_jobid_t jobid; orte_vpid_t vpid; - + size_t i, j, k, cnt=0; + orte_gpr_value_t **values=NULL, *value; + orte_gpr_keyval_t **keyvals; + char *keys[4], *tokens[3], *cellname; + struct timeval tv; + struct timespec ts; + /* get the nodename for the headnode of the target cluster */ if (NULL == headnode) { /* not provided, so try to look it up */ + tokens[0] = target_cluster; + tokens[1] = NULL; + keys[0] = ORTE_RDS_FE_NAME; + keys[1] = ORTE_RDS_FE_SSH; + keys[2] = ORTE_CELLID_KEY; + keys[3] = NULL; + if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + ORTE_RESOURCE_SEGMENT, + tokens, keys, &cnt, &values))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */ + goto MOVEON; + } + on_gpr = true; + /* need to decide what to do if more than value found. Some + * clusters have more than one head node, so which one do + * we choose? For now, just take the first one returned. + */ + keyvals = values[0]->keyvals; + for (i=0; i < values[0]->cnt; i++) { + if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_NAME)) { + hn = strdup(keyvals[i]->value.strptr); + continue; + } + if (0 == strcmp(keyvals[i]->key, ORTE_RDS_FE_SSH)) { + can_launch = keyvals[i]->value.tf_flag; + continue; + } + if (0 == strcmp(keyvals[i]->key, ORTE_CELLID_KEY)) { + cellid = keyvals[i]->value.cellid; + continue; + } + } + goto MOVEON; + } else { /* lookup the headnode's cellid */ hn = strdup(headnode); - cellid = 0; + keys[0] = ORTE_RDS_FE_NAME; + keys[1] = ORTE_RDS_FE_SSH; + keys[2] = ORTE_CELLID_KEY; + keys[3] = NULL; + if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR, + ORTE_RESOURCE_SEGMENT, + NULL, keys, &cnt, &values))) { + ORTE_ERROR_LOG(rc); + return rc; + } + if (0 == cnt || 0 == values[0]->cnt) { /* nothing found */ + goto MOVEON; + } + on_gpr = true; + for (i=0; i < cnt; i++) { + keyvals = values[i]->keyvals; + for (j=0; j < values[i]->cnt; j++) { + if ((0 == strcmp(keyvals[j]->key, ORTE_RDS_FE_NAME)) && + 0 == strcmp(keyvals[j]->value.strptr, headnode)) { + /* okay, this is the right cell - now need to find + * the ssh flag (if provided) and cellid + */ + for (k=0; k < values[i]->cnt; k++) { + if (0 == strcmp(keyvals[k]->key, ORTE_RDS_FE_SSH)) { + can_launch = keyvals[k]->value.tf_flag; + continue; + } + if (0 == strcmp(keyvals[k]->key, ORTE_CELLID_KEY)) { + cellid = keyvals[k]->value.cellid; + continue; + } + } + goto MOVEON; + } + } + } + } + +MOVEON: + if (NULL != values) { + for (i=0; i < cnt; i++) OBJ_RELEASE(values[i]); + free(values); + } + + if (!on_gpr && (NULL != target_cluster || NULL != headnode)) { + /* if we couldn't find anything about this cell on the gpr, then + * we need to put the required headnode data on the registry. We need + * it to be there so other functions/processes can find it, if needed. + * User must provide either a target_cluster name (which then must be + * synonymous with the headnode name), a headnode name (on a named or + * unnamed target_cluster), or both. + */ + /* get new cellid for this site/resource */ + if (NULL != target_cluster) { + cellname = strdup(target_cluster); + } else { + /* if the target_cluster was NULL, then headnode CAN'T be NULL + * or else we wouldn't get here + */ + cellname = strdup(headnode); + } + /* can't know the site name, so it becomes "unknown" */ + if (ORTE_SUCCESS != (rc = orte_ns.create_cellid(&cellid, "UNKNOWN", + cellname))) { + ORTE_ERROR_LOG(rc); + free(cellname); + return rc; + } + /* now store the cell info on the resource segment of the registry */ + value = OBJ_NEW(orte_gpr_value_t); + if (NULL == value) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + value->addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR; + value->segment = strdup(ORTE_RESOURCE_SEGMENT); + + value->cnt = 4; + value->keyvals = (orte_gpr_keyval_t**)malloc(value->cnt * sizeof(orte_gpr_keyval_t*)); + if (NULL == value->keyvals) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + for (i=0; i < value->cnt; i++) { + value->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t); + if (NULL == value->keyvals[i]) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + } + value->keyvals[0]->key = strdup(ORTE_RDS_NAME); + value->keyvals[0]->type = ORTE_STRING; + value->keyvals[0]->value.strptr = strdup(cellname); + value->keyvals[1]->key = strdup(ORTE_CELLID_KEY); + value->keyvals[1]->type = ORTE_CELLID; + value->keyvals[1]->value.cellid = cellid; + value->keyvals[2]->key = strdup(ORTE_RDS_FE_NAME); + value->keyvals[2]->type = ORTE_STRING; + if (NULL == headnode) { + value->keyvals[2]->value.strptr = strdup(cellname); + } else { + value->keyvals[2]->value.strptr = strdup(headnode); + } + value->keyvals[3]->key = strdup(ORTE_RDS_FE_SSH); + value->keyvals[3]->type = ORTE_BOOL; + value->keyvals[3]->value.tf_flag = true; + + value->num_tokens = 3; + value->tokens = (char**)malloc(3 * sizeof(char*)); + if (NULL == value->tokens) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + if (ORTE_SUCCESS != (rc = orte_ns.convert_cellid_to_string(&(value->tokens[0]), cellid))) { + ORTE_ERROR_LOG(rc); + return rc; + } + value->tokens[1] = strdup("UNKNOWN"); /* site name is unknown */ + value->tokens[2] = strdup(cellname); + + if (ORTE_SUCCESS != orte_gpr.put(1, &value)) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(value); + return rc; + } + OBJ_RELEASE(value); + free(cellname); + can_launch = true; + } + + orte_gpr.dump_segments(0); + + if (!can_launch || ORTE_CELLID_MAX == cellid) { + return ORTE_ERR_UNREACH; } /* get the user's name on the headnode */ @@ -114,6 +301,10 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) /* SETUP TO LAUNCH PROBE */ + /* setup the conditioned wait and mutex variables */ + OBJ_CONSTRUCT(&orte_setup_hnp_mutex, ompi_mutex_t); + OBJ_CONSTRUCT(&orte_setup_hnp_condition, ompi_condition_t); + /* get a jobid for the probe */ if (ORTE_SUCCESS != (rc = orte_ns.create_jobid(&jobid))) { ORTE_ERROR_LOG(rc); @@ -182,6 +373,7 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) } asprintf(¶m, "\"%s\"", uri); ompi_argv_append(&argc, &argv, param); + free(param); free(uri); /* setup probe's gpr contact info */ @@ -193,12 +385,15 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) } asprintf(¶m, "\"%s\"", uri); ompi_argv_append(&argc, &argv, param); + free(param); free(uri); /* tell the probe who to report to */ uri = orte_rml.get_uri(); + asprintf(¶m, "\"%s\"", uri); ompi_argv_append(&argc, &argv, "--requestor"); - ompi_argv_append(&argc, &argv, uri); + ompi_argv_append(&argc, &argv, param); + free(param); free(uri); /* issue the non-blocking recv to get the probe's findings */ @@ -225,10 +420,19 @@ int orte_setup_hnp(char *target_cluster, char *headnode, char *username) return ORTE_ERROR; } else { /* parent */ - orte_wait_cb(pid, orte_setup_hnp_wait, &orte_setup_hnp_cbdata); - } + /* block until a timeout occurs or probe dies/calls back */ + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + 1000000; + ts.tv_nsec = 0; + + OMPI_THREAD_LOCK(&orte_setup_hnp_mutex); + ompi_condition_timedwait(&orte_setup_hnp_condition, &orte_setup_hnp_mutex, &ts); + OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex); + return ORTE_SUCCESS; + } + CLEANUP: return rc; @@ -243,9 +447,10 @@ static void orte_setup_hnp_recv(int status, orte_process_name_t* sender, orte_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata) { + OMPI_THREAD_LOCK(&orte_setup_hnp_mutex); ompi_output(0, "HE CALLED HOME!!"); - orte_finalize(); - exit(0); + ompi_condition_signal(&orte_setup_hnp_condition); + OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex); } static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata) @@ -253,6 +458,8 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata) int rc; orte_setup_hnp_cb_data_t *data; + OMPI_THREAD_LOCK(&orte_setup_hnp_mutex); + data = (orte_setup_hnp_cb_data_t*)cbdata; /* if ssh exited abnormally, print something useful to the user and cleanup @@ -261,11 +468,6 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata) don't really have a way to do that just yet. */ if (! WIFEXITED(status) || ! WEXITSTATUS(status) == 0) { - /* set the probe's state-of-health to aborted */ - if (ORTE_SUCCESS != (rc = - orte_soh.set_proc_soh(data->name, ORTE_PROC_STATE_ABORTED, status))) { - ORTE_ERROR_LOG(rc); - } /* tell the user something went wrong */ ompi_output(0, "ERROR: The probe on head node %s of the %s cluster failed to start as expected.", data->headnode, data->target_cluster); @@ -289,5 +491,9 @@ static void orte_setup_hnp_wait(pid_t wpid, int status, void *cbdata) ompi_output(0, "No extra status information is available: %d.", status); } } + + ompi_condition_signal(&orte_setup_hnp_condition); + OMPI_THREAD_UNLOCK(&orte_setup_hnp_mutex); + }