diff --git a/orte/mca/pls/base/base.h b/orte/mca/pls/base/base.h index ab9c59fb38..91cefc2b3e 100644 --- a/orte/mca/pls/base/base.h +++ b/orte/mca/pls/base/base.h @@ -55,6 +55,8 @@ extern "C" { opal_condition_t orted_cmd_cond; /** reuse daemons flag */ bool reuse_daemons; + /** request for timing measurement reports */ + bool timing; } orte_pls_base_t; /** diff --git a/orte/mca/pls/base/pls_base_open.c b/orte/mca/pls/base/pls_base_open.c index 16585d1dcf..a7074aeb65 100644 --- a/orte/mca/pls/base/pls_base_open.c +++ b/orte/mca/pls/base/pls_base_open.c @@ -76,6 +76,16 @@ int orte_pls_base_open(void) orte_pls_base.reuse_daemons = true; } + /* check for timing requests */ + mca_base_param_reg_int_name("orte", "timing", + "Request that critical timing loops be measured", + false, false, 0, &value); + if (value != 0) { + orte_pls_base.timing = true; + } else { + orte_pls_base.timing = false; + } + /* Open up all the components that we can find */ if (ORTE_SUCCESS != diff --git a/orte/mca/pls/bproc/pls_bproc.c b/orte/mca/pls/bproc/pls_bproc.c index 31bab70dd1..c32b9028d9 100644 --- a/orte/mca/pls/bproc/pls_bproc.c +++ b/orte/mca/pls/bproc/pls_bproc.c @@ -76,9 +76,10 @@ /** * Our current evironment */ -#if !defined(__WINDOWS__) extern char **environ; -#endif /* !defined(__WINDOWS__) */ + +static bool daemons_launched; +static bool bynode; #if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS int orte_pls_bproc_launch_threaded(orte_jobid_t); @@ -103,10 +104,8 @@ orte_pls_base_module_t orte_pls_bproc_module = { }; -static int orte_pls_bproc_node_array(orte_job_map_t* map, - int ** node_array, int * node_array_len); -static int orte_pls_bproc_node_list(int * node_array, int node_array_len, - int ** node_list, int * num_nodes, +static int orte_pls_bproc_node_list(orte_job_map_t *map, + int *node_array, int * num_nodes, int num_procs); static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io, int node_rank, int app_context); @@ -121,92 +120,36 @@ 
static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd, char * const argv[], char * envp[]); #endif static void orte_pls_bproc_setup_env(char *** env); -static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, - orte_job_map_t *map, - orte_vpid_t global_vpid_start, - orte_jobid_t jobid, int* num_daemons); -static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, - orte_job_map_t* map, int num_processes, - int num_slots, - orte_vpid_t vpid_start, - orte_vpid_t global_vpid_start, - int app_context, - int * node_array, int node_array_len); +static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp); +static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots, + orte_vpid_t vpid_start, int app_context); /** - * creates an array that is indexed by the node number and each entry contains the - * number of processes that will be launched on that node. - * - * @param map single context mapping - * @param node_array a pointer to put the node array into - * @param node_array_len returns the length of the array - * @retval >=0 the number of processes - * @retval <0 orte err + * Creates a list of nodes from a job map that should participate in the next launch cycle. 
+ * @param map a pointer to the job map + * @param node_array a pointer to an integer array that will contain the node names + * @param num_nodes a pointer to the place where we will store the number of nodes in the array + * @param num_procs the number of processes that a node must have to be placed on the list */ -static int orte_pls_bproc_node_array(orte_job_map_t* map, - int ** node_array, int * node_array_len) { - opal_list_item_t* item; - int num_procs = 0; - int num_on_node; - - OPAL_TRACE(1); - - *node_array_len = 0; - for(item = opal_list_get_first(&map->nodes); - item != opal_list_get_end(&map->nodes); - item = opal_list_get_next(item)) { - if(*node_array_len < atol(((orte_mapped_node_t*)item)->nodename)) { - *node_array_len = atol(((orte_mapped_node_t*)item)->nodename); - } - } - (*node_array_len)++; - /* build the node array */ - *node_array = (int*)malloc(sizeof(int) * *node_array_len); - if(NULL == *node_array) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - memset(*node_array, 0, sizeof(int) * *node_array_len); - - for(item = opal_list_get_first(&map->nodes); - item != opal_list_get_end(&map->nodes); - item = opal_list_get_next(item)) { - orte_mapped_node_t* node = (orte_mapped_node_t*)item; - num_on_node = opal_list_get_size(&node->procs); - (*node_array)[atol(node->nodename)] += num_on_node; - num_procs += num_on_node; - } - return num_procs; -} - -/** - * Creates a bproc nodelist from a node array. 
- * @param node_array an array of bproc nodes that contains the number of processes - * to be launched on each node - * @param node_array_len the length of the node array - * @param node_list a pointer that the bproc node list will be returned in - * @param num_nodes a pointer to return the number of nodes in the node list - * @param num_procs the number of processes that a node must have to be on the - * node list - */ -static int orte_pls_bproc_node_list(int * node_array, int node_array_len, - int ** node_list, int * num_nodes, - int num_procs) { - int node; - +static int orte_pls_bproc_node_list(orte_job_map_t *map, int *node_array, int *num_nodes, int num_procs) +{ + opal_list_item_t *item; + orte_mapped_node_t *node; + OPAL_TRACE(1); + /* initialize all */ *num_nodes = 0; - *node_list = (int*)malloc(sizeof(int) * node_array_len); - if(NULL == *node_list) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } + memset((void*)node_array, -1, sizeof(int) * map->num_nodes); /* build the node list */ - for(node = 0; node < node_array_len; node++) { - if(node_array[node] >= num_procs) { - (*node_list)[(*num_nodes)++] = node; + for(item = opal_list_get_first(&map->nodes); + item != opal_list_get_end(&map->nodes); + item = opal_list_get_next(item)) { + node = (orte_mapped_node_t*)item; + + if (node->num_procs >= num_procs) { + node_array[(*num_nodes)++] = atoi(node->nodename); } } return ORTE_SUCCESS; @@ -328,7 +271,7 @@ static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data) OPAL_TRACE(1); - if(!mca_pls_bproc_component.done_launching) { + if(!daemons_launched) { /* if a daemon exits before we are done launching the user apps we send a * message to ourself so we will break out of the receive loop and exit */ orte_buffer_t ack; @@ -438,12 +381,6 @@ static void orte_pls_bproc_setup_env(char *** env) opal_setenv(var,orte_process_info.ns_replica_uri, true, env); free(var); - /* make sure the frontend hostname does not 
get pushed out to the backend */ - var = mca_base_param_environ_variable("orte", "base", "nodename"); - opal_unsetenv(var, env); - free(var); - opal_unsetenv("HOSTNAME", env); - /* make sure the username used to create the bproc directory is the same on * the backend as the frontend */ var = mca_base_param_environ_variable("pls","bproc","username"); @@ -467,13 +404,19 @@ static void orte_pls_bproc_setup_env(char *** env) free(param); free(var); - /* merge in environment */ + /* merge in environment - merge ensures we don't overwrite anything we just set */ merged = opal_environ_merge(*env, environ); opal_argv_free(*env); *env = merged; - + /* make sure hostname doesn't get pushed to backend node */ opal_unsetenv("HOSTNAME", env); + + /* make sure the frontend hostname does not get pushed out to the backend */ + var = mca_base_param_environ_variable("orte", "base", "nodename"); + opal_unsetenv(var, env); + free(var); + } /** @@ -489,10 +432,7 @@ static void orte_pls_bproc_setup_env(char *** env) * @retval ORTE_SUCCESS * @retval error */ -static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, - orte_job_map_t *map, - orte_vpid_t global_vpid_start, - orte_jobid_t jobid, int *num_launched) { +static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp) { int * daemon_list = NULL; int num_daemons = 0; int rc, i; @@ -503,9 +443,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, char * var; int stride; char * orted_path; - orte_jobid_t daemon_jobid; - orte_process_name_t * proc_name; - orte_vpid_t daemon_vpid_start = 0; + orte_vpid_t daemon_vpid_start; orte_std_cntr_t idx; struct stat buf; opal_list_t daemons; @@ -515,12 +453,15 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, OPAL_TRACE(1); - if (mca_pls_bproc_component.timing) { + if (orte_pls_base.timing) { if (0 != gettimeofday(&joblaunchstart, NULL)) { opal_output(0, "pls_bproc: could not obtain start time"); } } + 
/* indicate that the daemons have not completely launched yet */ + daemons_launched = false; + /* setup a list that will contain the info for all the daemons * so we can store it on the registry when done */ @@ -530,7 +471,13 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, * their names so we can pass that to bproc - populate the list * with the node names */ - num_daemons = opal_list_get_size(&map->nodes); + num_daemons = map->num_nodes; + if (0 == num_daemons) { + /* nothing to do */ + OBJ_DESTRUCT(&daemons); + return ORTE_SUCCESS; + } + if(NULL == (daemon_list = (int*)malloc(sizeof(int) * num_daemons))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto cleanup; @@ -544,26 +491,28 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, daemon_list[i++] = atoi(node->nodename); } - /* allocate storage to save the daemon pids */ + /* allocate storage for bproc to return the daemon pids */ if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto cleanup; } /* allocate a range of vpids for the daemons */ - daemon_jobid = orte_process_info.my_name->jobid; - rc = orte_ns.reserve_range(daemon_jobid, num_daemons, &daemon_vpid_start); + rc = orte_ns.reserve_range(0, num_daemons, &daemon_vpid_start); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } /* setup the orted triggers for passing their launch info */ - if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_daemons, NULL, NULL))) { + if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(map->job, num_daemons, NULL, NULL))) { ORTE_ERROR_LOG(rc); goto cleanup; } + /* setup the daemon environment */ + orte_pls_bproc_setup_env(envp); + /* daemons calculate their process name using a "stride" of one, so * push that value into their environment */ stride = 1; @@ -574,8 +523,8 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, free(var); /* set up the base 
environment so the daemons can get their names once launched */ - rc = orte_ns_nds_bproc_put(cellid, daemon_jobid, daemon_vpid_start, - global_vpid_start, num_daemons, envp); + rc = orte_ns_nds_bproc_put(ORTE_PROC_MY_NAME->cellid, 0, daemon_vpid_start, + 0, num_daemons, envp); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -592,7 +541,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, #endif opal_argv_append(&argc, &argv, "--bootproxy"); - orte_ns.convert_jobid_to_string(&param, jobid); + orte_ns.convert_jobid_to_string(&param, map->job); opal_argv_append(&argc, &argv, param); free(param); @@ -634,17 +583,20 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, } /* launch the daemons */ - mca_pls_bproc_component.num_daemons += num_daemons; - - if (mca_pls_bproc_component.timing) { + if (orte_pls_base.timing) { if (0 != gettimeofday(&launchstart, NULL)) { opal_output(0, "pls_bproc: could not obtain start time"); } } - rc = bproc_vexecmove(num_daemons, daemon_list, pids, orted_path, argv, *envp); + if (mca_pls_bproc_component.do_not_launch) { + for (i=0; i < num_daemons; i++) pids[i] = i+1; + rc = num_daemons; + } else { + rc = bproc_vexecmove(num_daemons, daemon_list, pids, orted_path, argv, *envp); + } - if (mca_pls_bproc_component.timing) { + if (orte_pls_base.timing) { if (0 != gettimeofday(&launchstop, NULL)) { opal_output(0, "pls_bproc: could not obtain stop time"); } else { @@ -657,10 +609,10 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, if(rc != num_daemons) { opal_show_help("help-pls-bproc.txt", "daemon-launch-number", true, num_daemons, rc, orted_path); - mca_pls_bproc_component.num_daemons -= num_daemons; rc = ORTE_ERROR; goto cleanup; } + if(0 < mca_pls_bproc_component.debug) { opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. 
First pid: %d\n", rc, *pids); @@ -674,33 +626,27 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, ORTE_ERROR_LOG(rc); goto cleanup; } else { - rc = orte_ns.create_process_name(&proc_name, cellid, daemon_jobid, - daemon_vpid_start + i); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.daemon_names, - proc_name); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } if (0 > asprintf(&param, "%d", daemon_list[i])) { rc = ORTE_ERR_OUT_OF_RESOURCE; ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); goto cleanup; } - rc = orte_pls_bproc_set_node_pid(cellid, param, jobid, pids[i]); + rc = orte_pls_bproc_set_node_pid(ORTE_PROC_MY_NAME->cellid, param, map->job, pids[i]); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } + dmn = OBJ_NEW(orte_pls_daemon_info_t); - orte_dss.copy((void**)&(dmn->name), proc_name, ORTE_NAME); - dmn->cell = cellid; + rc = orte_ns.create_process_name(&(dmn->name), ORTE_PROC_MY_NAME->cellid, 0, + daemon_vpid_start + i); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + dmn->cell = dmn->name->cellid; dmn->nodename = strdup(param); - dmn->active_job = jobid; + dmn->active_job = map->job; opal_list_append(&daemons, &dmn->super); free(param); @@ -711,22 +657,63 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons))) { ORTE_ERROR_LOG(rc); } - *num_launched = num_daemons; /* setup the callbacks - this needs to be done *after* we store the * daemon info so that short-lived apps don't cause mpirun to * try and terminate the orteds before we record them */ - for (i=0; i < num_daemons; i++) { - rc = orte_wait_cb(pids[i], orte_pls_bproc_waitpid_daemon_cb, - &daemon_list[i]); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; + if (!mca_pls_bproc_component.do_not_launch) { + for (i=0; i < num_daemons; i++) { 
+ rc = orte_wait_cb(pids[i], orte_pls_bproc_waitpid_daemon_cb, + &daemon_list[i]); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* wait for communication back from the daemons, which indicates they have + * successfully set up the pty/pipes and IO forwarding which the user apps + * will use */ + for(i = 0; i < num_daemons; i++) { + orte_buffer_t ack; + int src[4]; + OBJ_CONSTRUCT(&ack, orte_buffer_t); + rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &ack, ORTE_RML_TAG_BPROC); + if(0 > rc) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&ack); + goto cleanup; + } + idx = 4; + rc = orte_dss.unpack(&ack, &src, &idx, ORTE_INT); + if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + } + OBJ_DESTRUCT(&ack); + + if(-1 == src[0]) { + /* one of the daemons has failed to properly launch. The error is sent + * by orte_pls_bproc_waitpid_daemon_cb */ + if(-1 == src[1]) { /* did not die on a signal */ + opal_show_help("help-pls-bproc.txt", "daemon-died-no-signal", true, + src[2], src[3]); + } else { /* died on a signal */ + opal_show_help("help-pls-bproc.txt", "daemon-died-signal", true, + src[2], src[3], src[1]); + } + rc = ORTE_ERROR; + ORTE_ERROR_LOG(rc); + orte_pls_bproc_terminate_job(map->job, NULL); + goto cleanup; + } } } - - if (mca_pls_bproc_component.timing) { + + /* indicate that the daemons have now launched */ + daemons_launched = true; + + if (orte_pls_base.timing) { if (0 != gettimeofday(&launchstop, NULL)) { opal_output(0, "pls_bproc: could not obtain stop time"); } else { @@ -750,131 +737,44 @@ cleanup: OBJ_RELEASE(item); } OBJ_DESTRUCT(&daemons); - + return rc; } -static void -orte_pls_bproc_check_node_state(orte_gpr_notify_data_t *notify_data, - void *user_tag) -{ - orte_gpr_value_t **values; - bool dead_node = false; - char *dead_node_name; - orte_std_cntr_t i, j; - - OPAL_TRACE(1); - - /* first see if node is in - ORTE_NODE_STATE_DOWN or - ORTE_NODE_STATE_REBOOT */ - - values = (orte_gpr_value_t**)(notify_data->values)->addr; - for( j = 0; j 
< notify_data->cnt; j++) { - dead_node = false; - for( i = 0; i < values[j]->cnt; i++) { - orte_gpr_keyval_t* keyval = values[j]->keyvals[i]; - if(strcmp(keyval->key, ORTE_NODE_STATE_KEY) == 0) { - orte_node_state_t *node_state; - int ret; - if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &node_state, keyval->value, ORTE_NODE_STATE))) { - return; - } - if( *node_state == ORTE_NODE_STATE_DOWN || - *node_state == ORTE_NODE_STATE_REBOOT) { - dead_node = true; - printf("found a dead node state.. \n"); - } - } else if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) { - char* tmp_name; - int ret; - if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &tmp_name, keyval->value, ORTE_STRING))) { - return; - } - else { - dead_node_name = strdup(tmp_name); - printf("found a node named %s\n", dead_node_name); - } - } - } - printf("found a node named %s is dead? %d\n", dead_node_name, dead_node); - if(dead_node) { - /* gotta see if this node belongs to us... arg.. */ - /* also, we know by order of creation that the node state */ - /* comes before the node name.. see smr_bproc.c */ - orte_std_cntr_t name_idx; - for (name_idx = 0; - name_idx < orte_pointer_array_get_size(mca_pls_bproc_component.active_node_names); - name_idx++) { - char* node_name = (char*) orte_pointer_array_get_item(mca_pls_bproc_component.active_node_names, name_idx); - if(strcmp(node_name, dead_node_name) == 0){ - /* one of our nodes up and died... */ - /* not much to do other than die.... 
*/ - int ret = ORTE_SUCCESS; - char *segment = NULL; - orte_gpr_value_t** seg_values = NULL; - orte_std_cntr_t num_values = 0; - - /********************** - * Job Info segment - **********************/ - segment = strdup(ORTE_JOBINFO_SEGMENT); - - if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR, - segment, - NULL, - NULL, - &num_values, - &seg_values ) ) ) { - - } - - /* - * kill all the jobids that are not zero - */ - for(i = 0; i < num_values; ++i) { - orte_gpr_value_t* value = values[i]; - orte_jobid_t jobid; - orte_schema.extract_jobid_from_segment_name(&jobid, value->tokens[0]); - printf("killing jobid %d\n", jobid); - if(jobid != 0) - orte_pls_bproc_terminate_job(jobid, NULL); - } - /* - * and kill everyone else - */ - printf("and go bye-bye...\n"); - orte_pls_bproc_terminate_job(0, NULL); - /* shouldn't ever get here.. */ - exit(1); - } - - - } - } - } -} - - static int -orte_pls_bproc_monitor_nodes(void) +orte_pls_bproc_node_failed(orte_gpr_notify_msg_t *msg) { - orte_gpr_subscription_id_t id; - - OPAL_TRACE(1); - - return orte_gpr.subscribe_1(&id, - NULL, - NULL, - ORTE_GPR_NOTIFY_VALUE_CHG, - ORTE_GPR_TOKENS_OR | - ORTE_GPR_KEYS_OR, - ORTE_BPROC_NODE_SEGMENT, - NULL, - ORTE_NODE_STATE_KEY, - orte_pls_bproc_check_node_state, - NULL); + orte_jobid_t job; + + /* respond to a node failure reported by the smr. We know that + * this function will only be called when one or more nodes in + * our allocation fails, so we just need to respond to it. 
The + * complication is that the failure could occur in any of several + * states: + * (a) before we start to launch the daemons + * (b) while we are launching the daemons + * (c) after the daemons are launched, while we are launching the app + * (d) during app launch + * (e) after app launch, but before completion + * (f) while the app is finalizing + * (g) while we are cleaning up after the app has finalized + */ + + printf("mpirun has detected a dead node within the job and is terminating\n"); + + /* extract the jobid from the returned data */ + orte_schema.extract_jobid_from_std_trigger_name(&job, msg->trigger); + + /* terminate all jobs in the job family */ + orte_pls_bproc_terminate_job(job, NULL); + + /* kill the daemons */ + orte_pls_bproc_terminate_job(0, NULL); + + /* shouldn't ever get here.. */ + exit(1); + } @@ -892,17 +792,13 @@ orte_pls_bproc_monitor_nodes(void) * @retval ORTE_SUCCESS * @retval error */ -static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, - orte_job_map_t* map, int num_processes, int num_slots, - orte_vpid_t vpid_start, - orte_vpid_t global_vpid_start, - int app_context, int * node_array, - int node_array_len) { - int * node_list = NULL; - int num_nodes, cycle; +static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots, + orte_vpid_t vpid_start, int app_context) { + int *node_array, num_nodes, cycle; int rc, i, j, stride; - int * pids = NULL; - char * var, * param; + orte_std_cntr_t num_processes; + int *pids = NULL; + char *var, *param; orte_process_name_t * proc_name; struct bproc_io_t bproc_io[3]; char **env; @@ -910,13 +806,8 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, OPAL_TRACE(1); - if(NULL == (pids = (int*)malloc(sizeof(int) * node_array_len))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - /* point to the env array for this app_context */ - env = 
opal_argv_copy(map->apps[app_context]->env); /* set up app context */ asprintf(&param, "%d", app_context); @@ -926,7 +817,7 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, free(var); /* set the vpid-to-vpid stride based on the mapping mode */ - if (mca_pls_bproc_component.bynode) { + if (bynode) { /* we are mapping by node, so we want to set the stride * length (i.e., the step size between vpids that is used * to compute the process name) to 1 @@ -946,6 +837,13 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, free(param); free(var); + /* set up the node_array to handle the launch */ + node_array = (int*)malloc(map->num_nodes * sizeof(int)); + if (NULL == node_array) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* initialize the cycle count. Computing the process name under Bproc * is a complex matter when mapping by slot as Bproc's inherent * methodology is to do everything by node. When mapping by slot, the @@ -965,8 +863,9 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, /* launch the processes */ i = 1; - rc = orte_pls_bproc_node_list(node_array, node_array_len, &node_list, - &num_nodes, i); + num_processes = map->vpid_range; + + rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -980,19 +879,19 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, "\tlaunching cycle %d", i); for (dbg=0; dbgcellid, map->job, vpid_start, map->vpid_start, num_processes, &env); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } - rc = orte_pls_bproc_setup_io(jobid, bproc_io, i - 1, app_context); + rc = orte_pls_bproc_setup_io(map->job, bproc_io, i - 1, app_context); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -1000,9 +899,24 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, if(0 < 
mca_pls_bproc_component.debug) { opal_output(0, "pls_bproc: launching %d processes:", num_nodes); } - rc = bproc_vexecmove_io(num_nodes, node_list, pids, bproc_io, 3, - map->apps[app_context]->app, - map->apps[app_context]->argv, env); + + /* allocate space for bproc to return the pids */ + pids = (int*)malloc(num_nodes * sizeof(int)); + if (NULL == pids) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + + if (mca_pls_bproc_component.do_not_launch) { + for (j=0; j < num_nodes; j++) pids[j] = j+1; + rc = num_nodes; + } else { + rc = bproc_vexecmove_io(num_nodes, node_array, pids, bproc_io, 3, + map->apps[app_context]->app, + map->apps[app_context]->argv, env); + } + if(0 < mca_pls_bproc_component.debug) { opal_output(0, "pls_bproc: %d processes launched. First pid: %d", rc, *pids); @@ -1013,37 +927,39 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, rc = ORTE_ERROR; goto cleanup; } + for(j = 0; j < num_nodes; j++) { if(0 >= pids[j]) { opal_show_help("help-pls-bproc.txt", "proc-launch-bad-pid", true, - node_list[j], pids[j], errno, map->apps[app_context]->app); + node_array[j], pids[j], errno, map->apps[app_context]->app); rc = ORTE_ERROR; ORTE_ERROR_LOG(rc); goto cleanup; } else { - mca_pls_bproc_component.num_procs++; - rc = orte_ns.create_process_name(&proc_name, cellid, jobid, + rc = orte_ns.create_process_name(&proc_name, ORTE_PROC_MY_NAME->cellid, map->job, vpid_start + j); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } - orte_pls_bproc_set_proc_pid(proc_name, pids[j], node_list[j]); + orte_pls_bproc_set_proc_pid(proc_name, pids[j], node_array[j]); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } - rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; + if (!mca_pls_bproc_component.do_not_launch) { + rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name); + 
if(ORTE_SUCCESS != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } } } } - free(node_list); - node_list = NULL; + free(pids); + pids = NULL; i++; - if (mca_pls_bproc_component.bynode) { + if (bynode) { /* we are mapping by node, so the vpid_start must increment by * the number of nodes */ @@ -1064,8 +980,7 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, } } - rc = orte_pls_bproc_node_list(node_array, node_array_len, &node_list, - &num_nodes, i); + rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -1073,12 +988,13 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, } cleanup: - if(NULL != node_list) { - free(node_list); - } if(NULL != pids) { free(pids); } + + free(node_array); + + if (NULL != env) opal_argv_free(env); return rc; } @@ -1096,25 +1012,16 @@ cleanup: * @retval error */ int orte_pls_bproc_launch(orte_jobid_t jobid) { - opal_list_item_t* item; - orte_cellid_t cellid; orte_job_map_t* map; orte_mapped_node_t *map_node; orte_vpid_t vpid_launch; - orte_vpid_t vpid_range; - orte_vpid_t vpid_start; int rc; - int src[4]; - int ** node_array = NULL; - int * node_array_len = NULL; - int num_processes = 0; - int num_daemons; int num_slots; int context; - int i, j; - orte_std_cntr_t idx; + int i; char cwd_save[OMPI_PATH_MAX + 1]; orte_ras_node_t *ras_node; + char **daemon_env; OPAL_TRACE(1); @@ -1137,14 +1044,12 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) { return rc; } - if(ORTE_SUCCESS != (rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, - &vpid_range))) { - ORTE_ERROR_LOG(rc); - goto cleanup; + /* set the mapping mode */ + if (NULL != map->mapping_mode && 0 == strcmp("bynode", map->mapping_mode)) { + bynode = true; + } else { + bynode = false; } - - /* get the cellid */ - cellid = orte_process_info.my_name->cellid; /* check all of the app_contexts for sanity */ for (i=0; i < map->num_apps; i++) { @@ -1190,94 +1095,32 @@ 
int orte_pls_bproc_launch(orte_jobid_t jobid) { opal_output(0, "pls_bproc: --- starting to launch procs ---"); } - /* create an array to hold the pointers to the node arrays for each app - * context. Also, create an array to hold the lengths of the node arrays */ - node_array = malloc(map->num_apps * sizeof(int *)); - node_array_len = malloc(map->num_apps * sizeof(int *)); + /* save the daemon environment */ + daemon_env = opal_argv_copy(map->apps[0]->env); - /* for each application context - create a node array and setup its env */ + /* for each application context, setup its env */ for(i=0; i < map->num_apps; i++) { - rc = orte_pls_bproc_node_array(map, &node_array[i], - &node_array_len[i]); - if(0 > rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - orte_pls_bproc_setup_env(&map->apps[i]->env); - num_processes += rc; } - /* save the active node names */ - idx = 0; - for (item = opal_list_get_first(&map->nodes); - item != opal_list_get_end(&map->nodes); - item = opal_list_get_next(item)) { - orte_mapped_node_t* node = (orte_mapped_node_t*) item; - - rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.active_node_names, - strdup(node->nodename)); - } - - /* setup subscription for each node so we can detect + /* tell the smr which nodes to monitor so we can be notified when the node's state changes, useful for aborting when - a bproc node up and dies */ - - rc = orte_pls_bproc_monitor_nodes(); + a bproc node up and dies */ + if (ORTE_SUCCESS != (rc = orte_smr.begin_monitoring(map, orte_pls_bproc_node_failed, NULL))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* launch the daemons on all nodes which have processes assigned to them */ + rc = orte_pls_bproc_launch_daemons(map, &daemon_env); + opal_argv_free(daemon_env); + if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } - - /* launch the daemons on all the nodes which have processes assigned to them. - * We need to send along an appropriate environment for the daemons. 
Since - * there must be at least ONE app_context, we can just take that one - */ - rc = orte_pls_bproc_launch_daemons(cellid, &map->apps[0]->env, map, - vpid_start, jobid, &num_daemons); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* wait for communication back from the daemons, which indicates they have - * sucessfully set up the pty/pipes and IO forwarding which the user apps - * will use */ - for(j = 0; j < num_daemons; j++) { - orte_buffer_t ack; - OBJ_CONSTRUCT(&ack, orte_buffer_t); - rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &ack, ORTE_RML_TAG_BPROC); - if(0 > rc) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&ack); - goto cleanup; - } - idx = 4; - rc = orte_dss.unpack(&ack, &src, &idx, ORTE_INT); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - } - OBJ_DESTRUCT(&ack); - - if(-1 == src[0]) { - /* one of the daemons has failed to properly launch. The error is sent - * by orte_pls_bproc_waitpid_daemon_cb */ - if(-1 == src[1]) { /* did not die on a signal */ - opal_show_help("help-pls-bproc.txt", "daemon-died-no-signal", true, - src[2], src[3]); - } else { /* died on a signal */ - opal_show_help("help-pls-bproc.txt", "daemon-died-signal", true, - src[2], src[3], src[1]); - } - rc = ORTE_ERROR; - ORTE_ERROR_LOG(rc); - orte_pls_bproc_terminate_job(jobid, NULL); - goto cleanup; - } - } - - vpid_launch = vpid_start; + + vpid_launch = map->vpid_start; /* for each application context launch the app */ for(context=0; context < map->num_apps; context++) { @@ -1286,29 +1129,24 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) { goto cleanup; } - rc = orte_pls_bproc_launch_app(cellid, jobid, map, num_processes, num_slots, - vpid_launch, vpid_start, context, - node_array[context], node_array_len[context]); + rc = orte_pls_bproc_launch_app(map, num_slots, vpid_launch, context); if(ORTE_SUCCESS != rc) { ORTE_ERROR_LOG(rc); goto cleanup; } - free(node_array[context]); - vpid_launch = vpid_start + mca_pls_bproc_component.num_procs; + vpid_launch += 
map->apps[context]->num_procs; } - mca_pls_bproc_component.done_launching = true; cleanup: chdir(cwd_save); OBJ_RELEASE(map); - if(NULL != node_array) { - free(node_array); - } - if(NULL != node_array_len) { - free(node_array_len); + if (mca_pls_bproc_component.do_not_launch) { + /* abort the job */ + exit(-1); } + return rc; } diff --git a/orte/mca/pls/bproc/pls_bproc.h b/orte/mca/pls/bproc/pls_bproc.h index 55e7635f8d..3207912f85 100644 --- a/orte/mca/pls/bproc/pls_bproc.h +++ b/orte/mca/pls/bproc/pls_bproc.h @@ -107,43 +107,29 @@ void orte_pls_bproc_recv(int status, orte_process_name_t* sender, struct orte_pls_bproc_component_t { orte_pls_base_component_t super; /**< The base class */ - bool done_launching; - /**< Is true if we are done launching the user's app. */ char * orted; - /**< The orted executeable. This can be an absolute path, or if not found + /**< The orted executable. This can be an absolute path, or if not found * we will look for it in the user's path */ int debug; /**< If greater than 0 print debugging information */ - bool timing; - /**< If true, report launch timing info */ - int num_procs; - /**< The number of processes that are running */ int priority; /**< The priority of this component. This will be returned if we determine * that bproc is available and running on this node, */ int terminate_sig; /**< The signal that gets sent to a process to kill it. */ - size_t num_daemons; - /**< The number of daemons that are currently running. */ opal_mutex_t lock; /**< Lock used to prevent some race conditions */ opal_condition_t condition; /**< Condition that is signaled when all the daemons have died */ - orte_pointer_array_t * daemon_names; - /**< Array of the process names of all the daemons. This is used to send - * the daemons a termonation signal when all the user processes are done */ - orte_pointer_array_t* active_node_names; - /**< Array of the bproc node names of all the daemons. 
This is used to - * track which bproc nodes belong to us*/ - bool bynode; - /**< Indicates whether or not this application is to be mapped by node - * (if set to true) or by slot (default) - */ bool recv_issued; /**< Indicates that the comm recv for reporting abnormal proc termination * has been issued */ - + bool do_not_launch; + /**< for test purposes, do everything but the actual launch */ + orte_std_cntr_t num_daemons; + /**< track the number of daemons being launched so we can tell when + * all have reported in */ }; /** * Convenience typedef diff --git a/orte/mca/pls/bproc/pls_bproc_component.c b/orte/mca/pls/bproc/pls_bproc_component.c index 8f3ae3c180..a72e32463b 100644 --- a/orte/mca/pls/bproc/pls_bproc_component.c +++ b/orte/mca/pls/bproc/pls_bproc_component.c @@ -54,8 +54,7 @@ orte_pls_bproc_component_t mca_pls_bproc_component = { * finishes setting up the component struct. */ int orte_pls_bproc_component_open(void) { - int rc, tmp, value; - char *policy; + int rc; /* init parameters */ mca_base_component_t *c = &mca_pls_bproc_component.super.pls_version; @@ -69,43 +68,19 @@ int orte_pls_bproc_component_open(void) { false, 9, &mca_pls_bproc_component.terminate_sig); mca_base_param_reg_string(c, "orted", "Path to where orted is installed", false, false, "orted", &mca_pls_bproc_component.orted); - mca_pls_bproc_component.num_procs = 0; - mca_pls_bproc_component.num_daemons = 0; - mca_pls_bproc_component.done_launching = false; + mca_base_param_reg_int(c, "nolaunch", NULL, false, false, (int)false, + &rc); + if ((int)false == rc) { + mca_pls_bproc_component.do_not_launch = false; + } else { + mca_pls_bproc_component.do_not_launch = true; + } + mca_pls_bproc_component.recv_issued = false; OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t); - - /* we need to know the intended method for mapping processes - * so we can properly direct the bproc processes on how to compute - * their 
name - */ - mca_base_param_reg_string_name("rmaps", "base_schedule_policy", - "Scheduling Policy for RMAPS. [slot | node]", - false, false, "slot", &policy); - if (0 == strcmp(policy, "node")) { - mca_pls_bproc_component.bynode = true; - } else { - mca_pls_bproc_component.bynode = false; - } - tmp = mca_base_param_reg_int_name("orte", "timing", - "Request that critical timing loops be measured", - false, false, 0, &value); - if (value != 0) { - mca_pls_bproc_component.timing = true; - } else { - mca_pls_bproc_component.timing = false; - } - - /* init the list to hold the daemon names */ - rc = orte_pointer_array_init(&mca_pls_bproc_component.daemon_names, 8, 200000, 8); - /* init the list to hold the daemon names */ - rc = orte_pointer_array_init(&mca_pls_bproc_component.active_node_names, 8, 200000, 8); - if(ORTE_SUCCESS != rc) { - ORTE_ERROR_LOG(rc); - } - return rc; + return ORTE_SUCCESS; } /** @@ -114,7 +89,6 @@ int orte_pls_bproc_component_open(void) { int orte_pls_bproc_component_close(void) { OBJ_DESTRUCT(&mca_pls_bproc_component.lock); OBJ_DESTRUCT(&mca_pls_bproc_component.condition); - OBJ_RELEASE(mca_pls_bproc_component.daemon_names); return ORTE_SUCCESS; } @@ -131,7 +105,7 @@ orte_pls_base_module_t* orte_pls_bproc_init(int *priority) { return NULL; /* okay, we are in an HNP - now check to see if BProc is running here */ - ret = bproc_version(&version); + ret = bproc_version(&version); if (ret != 0) { return NULL; } diff --git a/orte/mca/rmaps/base/rmaps_base_registry_fns.c b/orte/mca/rmaps/base/rmaps_base_registry_fns.c index 0d27522af1..ef70666286 100644 --- a/orte/mca/rmaps/base/rmaps_base_registry_fns.c +++ b/orte/mca/rmaps/base/rmaps_base_registry_fns.c @@ -87,6 +87,13 @@ int orte_rmaps_base_get_job_map(orte_job_map_t **map, orte_jobid_t jobid) /* set the jobid */ mapping->job = jobid; + /* get the vpid start/range info */ + if (ORTE_SUCCESS != (rc = orte_rmgr.get_vpid_range(jobid, &mapping->vpid_start, &mapping->vpid_range))) { + ORTE_ERROR_LOG(rc); 
+ OBJ_RELEASE(mapping); + return rc; + } + /* get the job segment name */ if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { ORTE_ERROR_LOG(rc); @@ -325,6 +332,12 @@ int orte_rmaps_base_put_job_map(orte_job_map_t *map) return ORTE_ERR_BAD_PARAM; } + /* store the vpid start/range info */ + if (ORTE_SUCCESS != (rc = orte_rmgr.set_vpid_range(map->job, map->vpid_start, map->vpid_range))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /** * allocate value array. We need to reserve one extra spot so we can set the counter * for the process INIT state to indicate that all procs are at that state. This will diff --git a/orte/mca/rmaps/round_robin/rmaps_rr.c b/orte/mca/rmaps/round_robin/rmaps_rr.c index 914d16e0b2..7fccd4fda6 100644 --- a/orte/mca/rmaps/round_robin/rmaps_rr.c +++ b/orte/mca/rmaps/round_robin/rmaps_rr.c @@ -518,8 +518,10 @@ static int orte_rmaps_rr_map(orte_jobid_t jobid, opal_list_t *attributes) /* Make assignments */ if (mca_rmaps_round_robin_component.bynode) { + map->mapping_mode = strdup("bynode"); rc = map_app_by_node(app, map, jobid, vpid_start, working_node_list, &max_used_nodes); } else { + map->mapping_mode = strdup("byslot"); rc = map_app_by_slot(app, map, jobid, vpid_start, working_node_list, &max_used_nodes); } diff --git a/orte/mca/sds/base/sds_base_put.c b/orte/mca/sds/base/sds_base_put.c index 3b49a19e4a..b6e31b36e0 100644 --- a/orte/mca/sds/base/sds_base_put.c +++ b/orte/mca/sds/base/sds_base_put.c @@ -34,8 +34,8 @@ #include "orte/mca/errmgr/base/base.h" int orte_ns_nds_env_put(const orte_process_name_t* name, - orte_vpid_t vpid_start, size_t num_procs, - char ***env) + orte_vpid_t vpid_start, size_t num_procs, + char ***env) { char* param; char* cellid; diff --git a/orte/mca/smr/base/Makefile.am b/orte/mca/smr/base/Makefile.am index 2044ea63f5..ac93af08be 100644 --- a/orte/mca/smr/base/Makefile.am +++ b/orte/mca/smr/base/Makefile.am @@ -28,6 +28,8 @@ libmca_smr_la_SOURCES += \ base/smr_base_set_proc_state.c \ 
base/smr_base_get_job_state.c \ base/smr_base_set_job_state.c \ + base/smr_base_get_node_state.c \ + base/smr_base_set_node_state.c \ base/smr_base_trig_init_fns.c \ base/smr_base_open.c \ base/data_type_support/smr_data_type_compare_fns.c \ diff --git a/orte/mca/smr/base/smr_base_get_node_state.c b/orte/mca/smr/base/smr_base_get_node_state.c new file mode 100644 index 0000000000..d8bcb182b6 --- /dev/null +++ b/orte/mca/smr/base/smr_base_get_node_state.c @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + */ + +/* + * includes + */ +#include "orte_config.h" + +#include + +#include "orte/mca/schema/schema.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/gpr/gpr.h" +#include "orte/mca/ns/ns.h" + +#include "orte/mca/smr/base/smr_private.h" + +int orte_smr_base_get_node_state(orte_node_state_t *state, + orte_cellid_t cell, + char *nodename) +{ + orte_gpr_value_t **values=NULL; + int rc; + orte_std_cntr_t cnt, num_tokens, i; + char **tokens; + char *keys[] = { + ORTE_NODE_STATE_KEY, + NULL + }; + orte_node_state_t *sptr; + + if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&tokens, &num_tokens, cell, nodename))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_XAND, ORTE_NODE_SEGMENT, + tokens, keys, &cnt, &values))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + + /** there should be one - and only one - value returned. 
if cnt is anything else, + * we have a problem + */ + if (1 != cnt) { + if (0 == cnt) { /** check for special case - didn't find the node's container */ + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + rc = ORTE_ERR_NOT_FOUND; + goto CLEANUP; + } + /** if not 0, then we have too many - report that */ + ORTE_ERROR_LOG(ORTE_ERR_INDETERMINATE_STATE_INFO); + rc = ORTE_ERR_INDETERMINATE_STATE_INFO; + goto CLEANUP; + } + + /* there should only be one keyval returned - if not, got a problem */ + if (1 != values[0]->cnt) { + ORTE_ERROR_LOG(ORTE_ERR_INDETERMINATE_STATE_INFO); + rc = ORTE_ERR_INDETERMINATE_STATE_INFO; + goto CLEANUP; + } + + if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, values[0]->keyvals[0]->value, ORTE_NODE_STATE))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + *state = *sptr; + +CLEANUP: + for (i=0; i < num_tokens; i++) { + if (NULL != tokens[i]) free(tokens[i]); + } + free(tokens); + + if (NULL != values) { + for (i=0; i < cnt; i++) { + OBJ_RELEASE(values[i]); + } + if (NULL != values) free(values); + } + + return rc; +} diff --git a/orte/mca/smr/base/smr_base_local_functions.c b/orte/mca/smr/base/smr_base_local_functions.c index 805a555115..9591bfad94 100644 --- a/orte/mca/smr/base/smr_base_local_functions.c +++ b/orte/mca/smr/base/smr_base_local_functions.c @@ -36,21 +36,9 @@ * as some systems can call this without that support */ -int orte_smr_base_get_node_state_not_available(orte_node_state_t *state, - orte_cellid_t cell, - char *nodename) -{ - return ORTE_SUCCESS; -} - -int orte_smr_base_set_node_state_not_available(orte_cellid_t cell, - char *nodename, - orte_node_state_t state) -{ - return ORTE_SUCCESS; -} - -int orte_smr_base_begin_monitoring_not_available(orte_jobid_t job) +int orte_smr_base_begin_monitoring_not_available(orte_job_map_t *map, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag) { return ORTE_SUCCESS; } diff --git a/orte/mca/smr/base/smr_base_open.c b/orte/mca/smr/base/smr_base_open.c index 1f8d2b2982..9fb2f29e6f 100644 --- 
a/orte/mca/smr/base/smr_base_open.c +++ b/orte/mca/smr/base/smr_base_open.c @@ -57,8 +57,8 @@ orte_smr_base_module_t orte_smr = { orte_smr_base_get_proc_state, orte_smr_base_set_proc_state, - orte_smr_base_get_node_state_not_available, - orte_smr_base_set_node_state_not_available, + orte_smr_base_get_node_state, + orte_smr_base_set_node_state, orte_smr_base_get_job_state, orte_smr_base_set_job_state, orte_smr_base_begin_monitoring_not_available, @@ -69,6 +69,26 @@ orte_smr_base_module_t orte_smr = { orte_smr_base_module_finalize_not_available }; +/* + * OBJ constructors/destructors for SMR types + */ +static void orte_smr_node_tracker_construct(orte_smr_node_state_tracker_t* node) +{ + node->cell = ORTE_CELLID_INVALID; + node->nodename = NULL; + node->state = ORTE_NODE_STATE_UNKNOWN; +} + +static void orte_smr_node_tracker_destruct(orte_smr_node_state_tracker_t* node) +{ + if (NULL != node->nodename) free(node->nodename); +} + +OBJ_CLASS_INSTANCE(orte_smr_node_state_tracker_t, + opal_list_item_t, + orte_smr_node_tracker_construct, + orte_smr_node_tracker_destruct); + /** * Function for finding and opening either all MCA components, or the one * that was specifically requested via a MCA parameter. 
@@ -79,8 +99,6 @@ int orte_smr_base_open(void) int param, value, rc; orte_data_type_t tmp; -/* fprintf(stderr,"orte_smr_base_open:enter\n"); */ - /* setup output for debug messages */ orte_smr_base.smr_output = opal_output_open(NULL); diff --git a/orte/mca/smr/base/smr_base_select.c b/orte/mca/smr/base/smr_base_select.c index db7773112d..ce432fb1cc 100644 --- a/orte/mca/smr/base/smr_base_select.c +++ b/orte/mca/smr/base/smr_base_select.c @@ -82,9 +82,9 @@ int orte_smr_base_select(void) /* If it's not the best one, finalize it */ -/* else { */ -/* component->smr_finalize(); */ -/* } */ + else { + component->smr_finalize(); + } } /* for each possible component */ diff --git a/orte/mca/smr/base/smr_base_set_node_state.c b/orte/mca/smr/base/smr_base_set_node_state.c new file mode 100644 index 0000000000..fcc81cef82 --- /dev/null +++ b/orte/mca/smr/base/smr_base_set_node_state.c @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + */ + +/* + * includes + */ +#include "orte_config.h" + +#include "orte/mca/schema/schema.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/gpr/gpr.h" + +#include "orte/mca/smr/base/smr_private.h" + +int orte_smr_base_set_node_state(orte_cellid_t cell, + char *nodename, + orte_node_state_t state) +{ + orte_gpr_value_t *value; + int rc; + + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, + ORTE_NODE_SEGMENT, 1, 0))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&(value->tokens), &(value->num_tokens), cell, nodename))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(value); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]), ORTE_NODE_STATE_KEY, ORTE_NODE_STATE, &state))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(value); + return rc; + } + + if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &value))) { + ORTE_ERROR_LOG(rc); + } + OBJ_RELEASE(value); + + return rc; +} diff --git a/orte/mca/smr/base/smr_private.h b/orte/mca/smr/base/smr_private.h index f0c0f479b2..2358552d74 100644 --- a/orte/mca/smr/base/smr_private.h +++ b/orte/mca/smr/base/smr_private.h @@ -43,6 +43,17 @@ extern "C" { #endif +/* + * Define an object for internally tracking node states + */ +typedef struct { + opal_list_item_t super; + orte_cellid_t cell; + char *nodename; + orte_node_state_t state; +} orte_smr_node_state_tracker_t; +OBJ_CLASS_DECLARATION(orte_smr_node_state_tracker_t); + int orte_smr_base_get_proc_state(orte_proc_state_t *state, int *status, orte_process_name_t *proc); @@ -51,13 +62,13 @@ int orte_smr_base_set_proc_state(orte_process_name_t *proc, orte_proc_state_t state, int status); -int orte_smr_base_get_node_state_not_available(orte_node_state_t *state, - orte_cellid_t cell, - char *nodename); +int orte_smr_base_get_node_state(orte_node_state_t *state, + 
orte_cellid_t cell, + char *nodename); -int orte_smr_base_set_node_state_not_available(orte_cellid_t cell, - char *nodename, - orte_node_state_t state); +int orte_smr_base_set_node_state(orte_cellid_t cell, + char *nodename, + orte_node_state_t state); int orte_smr_base_get_job_state(orte_job_state_t *state, orte_jobid_t jobid); @@ -87,7 +98,9 @@ int orte_smr_base_job_stage_gate_subscribe(orte_jobid_t job, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata, orte_proc_state_t cb_conditions); -int orte_smr_base_begin_monitoring_not_available(orte_jobid_t job); +int orte_smr_base_begin_monitoring_not_available(orte_job_map_t *map, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag); int orte_smr_base_module_finalize_not_available (void); diff --git a/orte/mca/smr/bproc/smr_bproc.c b/orte/mca/smr/bproc/smr_bproc.c index 90f0f1d4df..12df074ff6 100644 --- a/orte/mca/smr/bproc/smr_bproc.c +++ b/orte/mca/smr/bproc/smr_bproc.c @@ -28,6 +28,9 @@ #include "orte/orte_constants.h" #include "orte/orte_types.h" +#include "opal/util/output.h" +#include "opal/class/opal_list.h" + #include "orte/util/proc_info.h" #include "orte/mca/ns/ns.h" #include "orte/mca/errmgr/errmgr.h" @@ -36,7 +39,6 @@ #include "orte/mca/smr/base/smr_private.h" #include "orte/mca/smr/bproc/smr_bproc.h" -#include "opal/util/output.h" #define BIT_MASK(bit) (bit_set)(1 << (bit)) #define EMPTY_SET (bit_set)0 @@ -53,7 +55,9 @@ | BIT_MASK(BIT_NODE_BPROC_USER) \ | BIT_MASK(BIT_NODE_BPROC_GROUP)) +/* define some local variables/types */ typedef unsigned int bit_set; +static opal_list_t active_node_list; static inline void set_bit(bit_set *set, int bit) { @@ -82,8 +86,6 @@ static inline int empty_set(bit_set set) return set == EMPTY_SET; } -static int orte_smr_bproc_get_proc_state(orte_proc_state_t *, int *, orte_process_name_t *); -static int orte_smr_bproc_set_proc_state(orte_process_name_t *, orte_proc_state_t, int); static int orte_smr_bproc_finalize(void); /** @@ -144,7 +146,9 @@ static void update_registry(bit_set 
changes, struct bproc_node_info_t *ni) struct group *grp; orte_gpr_value_t *value; int rc; - + orte_smr_node_state_tracker_t *node; + opal_list_item_t *item; + cnt = num_bits(changes); /* @@ -153,6 +157,9 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni) if (cnt == 0) return; + /* check and update the general cluster status segment - this segment has entries + * for every node in the cluster, not just the ones we want to monitor + */ if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, ORTE_BPROC_NODE_SEGMENT, cnt, 0))) { ORTE_ERROR_LOG(rc); @@ -233,7 +240,7 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni) if (idx != cnt) { opal_output(0, "smr_bproc: internal error %d != %d\n", idx, cnt); - free(node_name); + free(node_name); OBJ_RELEASE(value); opal_event_del(&mca_smr_bproc_component.notify_event); return; @@ -257,11 +264,61 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni) ORTE_ERROR_LOG(ret); opal_event_del(&mca_smr_bproc_component.notify_event); } - - free(node_name); OBJ_RELEASE(value); + + /* now let's see if this is one of the nodes we are monitoring and + * update it IFF the state changed to specified conditions. This + * action will trigger a callback to the right place to decide what + * to do about it + */ + if (mca_smr_bproc_component.monitoring && + is_set(changes, BIT_NODE_STATE)) { + /* see if this is a node we are monitoring */ + for (item = opal_list_get_first(&active_node_list); + item != opal_list_get_end(&active_node_list); + item = opal_list_get_next(item)) { + node = (orte_smr_node_state_tracker_t*)item; + if (0 == strcmp(node->nodename, node_name)) { + /* This is a node we are monitoring. 
If this is a state we care about, + * and the state has changed (so we only do this once) - trip the alert monitor + */ + if (state != node->state && + (state == ORTE_NODE_STATE_DOWN || state == ORTE_NODE_STATE_REBOOT)) { + if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, + ORTE_BPROC_NODE_SEGMENT, 1, 0))) { + ORTE_ERROR_LOG(rc); + return; + } + value->tokens[0] = strdup(ORTE_BPROC_NODE_GLOBALS); + if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]), + ORTE_BPROC_NODE_ALERT_CNTR, + ORTE_UNDEF, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(value); + return; + } + if ((rc = orte_gpr.increment_value(value)) != ORTE_SUCCESS) { + ORTE_ERROR_LOG(rc); + opal_event_del(&mca_smr_bproc_component.notify_event); + } + OBJ_RELEASE(value); + } + /* update our local records */ + node->state = state; + /* cleanup and return - no need to keep searching */ + free(node_name); + return; + } + } + } + + /* if this isn't someone we are monitoring, or it doesn't meet specified conditions, + * then just cleanup and leave + */ + free(node_name); } + static int do_update(struct bproc_node_set_t *ns) { int i; @@ -311,10 +368,9 @@ static void orte_smr_bproc_notify_handler(int fd, short flags, void *user) /** * Register a callback to receive BProc update notifications */ -int orte_smr_bproc_module_init(void) +static int orte_smr_bproc_module_init(void) { int rc; - struct bproc_node_set_t ns = BPROC_EMPTY_NODESET; if (mca_smr_bproc_component.debug) opal_output(0, "init smr_bproc_module\n"); @@ -323,63 +379,77 @@ int orte_smr_bproc_module_init(void) mca_smr_bproc_component.node_set.size = 0; - /* - * Set initial node status - */ - - if (bproc_nodelist(&ns) < 0) - return ORTE_ERROR; - - if (!do_update(&ns)) - bproc_nodeset_free(&ns); - - /* - * Now regiser notify event - */ - - mca_smr_bproc_component.notify_fd = bproc_notifier(); - if (mca_smr_bproc_component.notify_fd < 0) - return ORTE_ERROR; - - 
memset(&mca_smr_bproc_component.notify_event, 0, sizeof(opal_event_t)); - - opal_event_set( - &mca_smr_bproc_component.notify_event, - mca_smr_bproc_component.notify_fd, - OPAL_EV_READ|OPAL_EV_PERSIST, - orte_smr_bproc_notify_handler, - 0); - - opal_event_add(&mca_smr_bproc_component.notify_event, 0); - + /* construct the monitored node list so we can track who is being monitored */ + OBJ_CONSTRUCT(&active_node_list, opal_list_t); + return ORTE_SUCCESS; } -orte_smr_base_module_t orte_smr_bproc_module = { - orte_smr_bproc_get_proc_state, - orte_smr_bproc_set_proc_state, - orte_smr_base_get_node_state_not_available, - orte_smr_base_set_node_state_not_available, - orte_smr_base_get_job_state, - orte_smr_base_set_job_state, - orte_smr_base_begin_monitoring_not_available, - orte_smr_base_init_job_stage_gates, - orte_smr_base_init_orted_stage_gates, - orte_smr_base_define_alert_monitor, - orte_smr_base_job_stage_gate_subscribe, - orte_smr_bproc_finalize -}; - -static int orte_smr_bproc_get_proc_state(orte_proc_state_t *state, int *status, orte_process_name_t *proc) +/* + * Setup to begin monitoring a job + */ +int orte_smr_bproc_begin_monitoring(orte_job_map_t *map, orte_gpr_trigger_cb_fn_t cbfunc, void *user_tag) { - return orte_smr_base_get_proc_state(state, status, proc); -} + struct bproc_node_set_t ns = BPROC_EMPTY_NODESET; -static int orte_smr_bproc_set_proc_state(orte_process_name_t *proc, orte_proc_state_t state, int status) -{ - return orte_smr_base_set_proc_state(proc, state, status); + /* if our internal structures haven't been initialized, then + * set them up + */ + if (!initialized) { + orte_smr_bproc_module_init(); + initialized = true; + } + + /* setup the local monitoring list */ + for (item = opal_list_get_first(&map->nodes); + item != opal_list_get_end(&map->nodes); + item = opal_list_get_next(item)) { + node = (orte_mapped_node_t*)item; + + newnode = OBJ_NEW(orte_smr_node_state_tracker_t); + newnode->cell = node->cell; + newnode->nodename = 
strdup(node->nodename); + opal_list_append(&active_node_list, &newnode->super); + } + + /* define the alert monitor to call the cbfunc if we trigger the alert */ + orte_smr.define_alert_monitor(map->job, ORTE_BPROC_NODE_ALERT_TRIG, + ORTE_BPROC_NODE_ALERT_CNTR, + 0, 1, true, cbfunc, user_tag); + + /* + * Set initial node status for all nodes in the local cell. We will + * receive reports from them all, but we will only provide alerts + * on those we are actively monitoring + */ + + if (bproc_nodelist(&ns) < 0) + return ORTE_ERROR; + + if (!do_update(&ns)) + bproc_nodeset_free(&ns); + + /* + * Now register notify event + */ + + mca_smr_bproc_component.notify_fd = bproc_notifier(); + if (mca_smr_bproc_component.notify_fd < 0) + return ORTE_ERROR; + + memset(&mca_smr_bproc_component.notify_event, 0, sizeof(opal_event_t)); + + opal_event_set( + &mca_smr_bproc_component.notify_event, + mca_smr_bproc_component.notify_fd, + OPAL_EV_READ|OPAL_EV_PERSIST, + orte_smr_bproc_notify_handler, + 0); + + opal_event_add(&mca_smr_bproc_component.notify_event, 0); + + } - /** * Cleanup */ diff --git a/orte/mca/smr/bproc/smr_bproc.h b/orte/mca/smr/bproc/smr_bproc.h index 99288549be..49029c0fb7 100644 --- a/orte/mca/smr/bproc/smr_bproc.h +++ b/orte/mca/smr/bproc/smr_bproc.h @@ -38,11 +38,18 @@ extern "C" { #define ORTE_SMR_BPROC_NODE_USER "orte-node-bproc-user" #define ORTE_SMR_BPROC_NODE_GROUP "orte-node-bproc-group" +#define ORTE_BPROC_NODE_ALERT_TRIG "orte-bproc-node-alert-trig" +#define ORTE_BPROC_NODE_ALERT_CNTR "orte-bproc-node-alert-cntr" +#define ORTE_BPROC_NODE_GLOBALS "orte-node-bproc-globals" + /** * Module init/fini */ -int orte_smr_bproc_module_init(void); -int orte_smr_bproc_module_finalize(void); +int orte_smr_bproc_init(void); +int orte_smr_bproc_finalize(void); +int orte_smr_bproc_begin_monitoring(orte_job_map_t *map, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag); struct orte_smr_bproc_component_t { orte_smr_base_component_t super; @@ -50,8 +57,8 @@ struct 
orte_smr_bproc_component_t { int priority; opal_event_t notify_event; int notify_fd; - orte_cellid_t cellid; struct bproc_node_set_t node_set; + bool monitoring; }; typedef struct orte_smr_bproc_component_t orte_smr_bproc_component_t; diff --git a/orte/mca/smr/bproc/smr_bproc_component.c b/orte/mca/smr/bproc/smr_bproc_component.c index e0ef6e2af5..950ad878a1 100644 --- a/orte/mca/smr/bproc/smr_bproc_component.c +++ b/orte/mca/smr/bproc/smr_bproc_component.c @@ -62,6 +62,21 @@ orte_smr_bproc_component_t mca_smr_bproc_component = { } }; +orte_smr_base_module_t orte_smr_bproc_module = { + orte_smr_base_get_proc_state, + orte_smr_base_set_proc_state, + orte_smr_base_get_node_state_not_available, + orte_smr_base_set_node_state_not_available, + orte_smr_base_get_job_state, + orte_smr_base_set_job_state, + orte_smr_bproc_begin_monitoring, + orte_smr_base_init_job_stage_gates, + orte_smr_base_init_orted_stage_gates, + orte_smr_base_define_alert_monitor, + orte_smr_base_job_stage_gate_subscribe, + orte_smr_bproc_finalize +}; + /** * Utility function to register parameters */ @@ -85,6 +100,8 @@ static int orte_smr_bproc_open(void) orte_smr_bproc_param_register_int("debug", 0); mca_smr_bproc_component.priority = orte_smr_bproc_param_register_int("priority", 1); + mca_smr_bproc_component.monitoring = false; + return ORTE_SUCCESS; } @@ -94,11 +111,11 @@ static int orte_smr_bproc_open(void) static orte_smr_base_module_t* orte_smr_bproc_init(int *priority) { - if (!orte_process_info.seed) - return NULL; + if (!orte_process_info.seed) { + return NULL; + } *priority = mca_smr_bproc_component.priority; - orte_smr_bproc_module_init(); return &orte_smr_bproc_module; } diff --git a/orte/mca/smr/smr.h b/orte/mca/smr/smr.h index 0851d29f9f..ea2005a950 100644 --- a/orte/mca/smr/smr.h +++ b/orte/mca/smr/smr.h @@ -37,6 +37,7 @@ #include "orte/mca/gpr/gpr_types.h" #include "orte/mca/ns/ns_types.h" #include "orte/mca/smr/smr_types.h" +#include "orte/mca/rmaps/rmaps_types.h" #if 
defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -145,12 +146,14 @@ typedef int (*orte_smr_base_module_define_alert_monitor_fn_t)(orte_jobid_t job, /* * Initiate monitoring of a job * This function notifies the smr that it should initiate monitoring of the specified - * jobid. It is called by the resource manager once a job has been launched. Calling - * the function, allows smr components (e.g., the BProc component that monitors daemons + * jobid. It is called by a PLS component at an appropriate point in the launch procedure. Calling + * the function allows smr components (e.g., the BProc component that monitors daemons * via the BProc-provided centralized alerting system) to make the necessary connections * for monitoring the job. */ -typedef int (*orte_smr_base_module_begin_monitoring_fn_t)(orte_jobid_t job); +typedef int (*orte_smr_base_module_begin_monitoring_fn_t)(orte_job_map_t *map, + orte_gpr_trigger_cb_fn_t cbfunc, + void *user_tag); /* * Subscribe to a job stage gate diff --git a/orte/mca/smr/smr_types.h b/orte/mca/smr/smr_types.h index ea8de07f0e..87c7d79942 100644 --- a/orte/mca/smr/smr_types.h +++ b/orte/mca/smr/smr_types.h @@ -21,6 +21,12 @@ #include "orte_config.h" +#include "opal/class/opal_list.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + /* * Process exit codes */ @@ -92,4 +98,9 @@ typedef int8_t orte_node_state_t; /** Node is rebooting (only some systems will support this; see orte_node_state_t) */ #define ORTE_NODE_STATE_REBOOT 0x03 + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + #endif