From da7df6d2571e1f7a809e779a73f2036949bfa0b2 Mon Sep 17 00:00:00 2001
From: Galen Shipman
Date: Thu, 3 Aug 2006 05:29:49 +0000
Subject: [PATCH] monitor bproc node state and terminate the job if a node in our job goes down..

This commit was SVN r11109.
---
 orte/mca/pls/bproc/pls_bproc.c           | 176 +++++++++++++++++++++++
 orte/mca/pls/bproc/pls_bproc.h           |   5 +
 orte/mca/pls/bproc/pls_bproc_component.c |   2 +
 3 files changed, 183 insertions(+)

diff --git a/orte/mca/pls/bproc/pls_bproc.c b/orte/mca/pls/bproc/pls_bproc.c
index fa74f3bda1..93a54c4a1f 100644
--- a/orte/mca/pls/bproc/pls_bproc.c
+++ b/orte/mca/pls/bproc/pls_bproc.c
@@ -54,6 +54,7 @@
 #include "orte/util/sys_info.h"
 #include "orte/mca/errmgr/errmgr.h"
 #include "orte/mca/iof/iof.h"
+#include "orte/mca/gpr/gpr.h"
 #include "orte/mca/ns/base/base.h"
 #include "orte/mca/sds/base/base.h"
 #include "orte/mca/oob/base/base.h"
@@ -661,6 +662,153 @@ cleanup:
     }
     return rc;
 }
+/**
+ * GPR notification callback that watches the state of bproc nodes.
+ *
+ * For every value in the notification, look for a node state keyval
+ * (ORTE_NODE_STATE_KEY) and a node name keyval (ORTE_NODE_NAME_KEY). If the
+ * state is ORTE_NODE_STATE_DOWN or ORTE_NODE_STATE_REBOOT and the name matches
+ * an entry of mca_pls_bproc_component.active_node_names, terminate every
+ * non-zero jobid found in ORTE_JOBINFO_SEGMENT, then job 0, and exit(1).
+ *
+ * @param notify_data  notification data handed to the callback
+ * @param user_tag     unused
+ */
+static void orte_pls_bproc_check_node_state(orte_gpr_notify_data_t *notify_data,
+                                            void *user_tag) {
+    orte_gpr_value_t **values;
+    bool dead_node = false;
+    char *dead_node_name = NULL;
+    int i,j;
+
+    printf("inside check node state... \n");
+
+    /* first see if node is in
+       ORTE_NODE_STATE_DOWN or
+       ORTE_NODE_STATE_REBOOT */
+
+    values = (orte_gpr_value_t**)(notify_data->values)->addr;
+    for( j = 0; j < notify_data->cnt; j++) {
+        dead_node = false;
+        for( i = 0; i < values[j]->cnt; i++) {
+            orte_gpr_keyval_t* keyval = values[j]->keyvals[i];
+            if(strcmp(keyval->key, ORTE_NODE_STATE_KEY) == 0) {
+                orte_node_state_t *node_state;
+                int ret;
+                if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &node_state, keyval->value, ORTE_NODE_STATE))) {
+                    ORTE_ERROR_LOG(ret);
+                    free(dead_node_name);
+                    return;
+                }
+                if( *node_state == ORTE_NODE_STATE_DOWN ||
+                    *node_state == ORTE_NODE_STATE_REBOOT) {
+                    dead_node = true;
+                    printf("found a dead node state.. \n");
+                }
+            } else if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) {
+                char* tmp_name;
+                int ret;
+                if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &tmp_name, keyval->value, ORTE_STRING))) {
+                    ORTE_ERROR_LOG(ret);
+                    free(dead_node_name);
+                    return;
+                }
+                /* keep a private copy; drop any name left by an earlier keyval */
+                free(dead_node_name);
+                dead_node_name = strdup(tmp_name);
+                if(NULL != dead_node_name) {
+                    printf("found a node named %s\n", dead_node_name);
+                }
+            }
+        }
+        printf("found a node named %s is dead? %d\n",
+               NULL != dead_node_name ? dead_node_name : "(unknown)", dead_node);
+        /* without a node name there is nothing to compare against */
+        if(dead_node && NULL != dead_node_name) {
+            /* gotta see if this node belongs to us... arg.. */
+            /* also, we know by order of creation that the node state */
+            /* comes before the node name.. see soh_bproc.c */
+            size_t name_idx;
+            for (name_idx = 0;
+                 name_idx < orte_pointer_array_get_size(mca_pls_bproc_component.active_node_names);
+                 name_idx++) {
+                char* node_name = (char*) orte_pointer_array_get_item(mca_pls_bproc_component.active_node_names, name_idx);
+                if(NULL != node_name && strcmp(node_name, dead_node_name) == 0){
+                    printf("this dead node %s belongs to us... \n", node_name);
+                    /* one of our nodes up and died... */
+                    /* not much to do other than die.... */
+                    int ret;
+                    char *segment = NULL;
+                    orte_gpr_value_t** seg_values = NULL;
+                    size_t k, num_values = 0;
+
+                    /**********************
+                     * Job Info segment
+                     **********************/
+                    segment = strdup(ORTE_JOBINFO_SEGMENT);
+
+                    if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
+                                                            segment,
+                                                            NULL,
+                                                            NULL,
+                                                            &num_values,
+                                                            &seg_values ) ) ) {
+                        /* cannot enumerate the jobs: log it and still kill job 0 below */
+                        ORTE_ERROR_LOG(ret);
+                        num_values = 0;
+                    }
+
+                    /*
+                     * kill all the jobids that are not zero
+                     */
+                    for(k = 0; k < num_values; ++k) {
+                        /* walk the job info values, not the notification values */
+                        orte_gpr_value_t* value = seg_values[k];
+                        orte_jobid_t jobid = 0;
+                        orte_schema.extract_jobid_from_segment_name(&jobid, value->tokens[0]);
+                        printf("killing jobid %d\n", jobid);
+                        if(jobid != 0) {
+                            orte_pls_bproc_terminate_job(jobid);
+                        }
+                    }
+                    /*
+                     * and kill everyone else
+                     */
+                    printf("and go bye-bye...\n");
+                    orte_pls_bproc_terminate_job(0);
+                    /* shouldn't ever get here.. */
+                    exit(1);
+                }
+            }
+        }
+        /* the name belongs to this value only; never carry it to the next one */
+        free(dead_node_name);
+        dead_node_name = NULL;
+    }
+}
+
+/**
+ * Subscribe to changes of ORTE_NODE_STATE_KEY on ORTE_NODE_SEGMENT, with
+ * orte_pls_bproc_check_node_state() registered as the notification callback.
+ *
+ * @return the return code of orte_gpr.subscribe_1
+ */
+static int orte_pls_bproc_monitor_nodes(void) {
+    orte_gpr_subscription_id_t id;
+    /* NOTE(review): the strdup'd key is never freed here; confirm whether
+     * subscribe_1 takes ownership of it or copies it */
+    return orte_gpr.subscribe_1(&id,
+                                NULL,
+                                NULL,
+                                ORTE_GPR_NOTIFY_VALUE_CHG,
+                                ORTE_GPR_TOKENS_OR |
+                                ORTE_GPR_KEYS_OR,
+                                ORTE_NODE_SEGMENT,
+                                NULL,
+                                strdup(ORTE_NODE_STATE_KEY),
+                                orte_pls_bproc_check_node_state,
+                                NULL);
+}
 
 /**
  * Launches the application processes
@@ -898,6 +1046,34 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
         num_processes += rc;
         context++;
     }
+
+    idx = 0;
+    for (item = opal_list_get_first(&mapping);
+         item != opal_list_get_end(&mapping);
+         item = opal_list_get_next(item)) {
+        orte_rmaps_base_map_t *map = (orte_rmaps_base_map_t*) item;
+        for (item2 = opal_list_get_first(&map->nodes);
+             item2 != opal_list_get_end(&map->nodes);
+             item2 = opal_list_get_next(item2)) {
+            orte_ras_node_t* node = (orte_ras_node_t*) item2;
+
+            rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.active_node_names,
+                                        strdup(node->node_name));
+            /* NOTE(review): rc from orte_pointer_array_add is overwritten before it is checked */
+
+        }
+    }
+
+    /* setup subscription for each node so we can detect
+       when the node's state changes, useful for aborting when
+       a bproc node up and dies */
+
+    rc = orte_pls_bproc_monitor_nodes();
+
+    if(ORTE_SUCCESS != rc) {
+        ORTE_ERROR_LOG(rc);
+        goto cleanup;
+    }
 
     /* launch the daemons on all the nodes which have processes assign to them */
     rc = orte_pls_bproc_launch_daemons(cellid, &map->app->env, node_array,
diff --git a/orte/mca/pls/bproc/pls_bproc.h b/orte/mca/pls/bproc/pls_bproc.h
index bf6696eae0..ca9ca18ae1 100644
--- a/orte/mca/pls/bproc/pls_bproc.h
+++ b/orte/mca/pls/bproc/pls_bproc.h
@@ -100,6 +100,11 @@ struct orte_pls_bproc_component_t {
     orte_pointer_array_t * daemon_names;
     /**< Array of the process names of all the daemons. This is used to send
      * the daemons a termonation signal when all the user processes are done */
+    orte_pointer_array_t* active_node_names;
+    /**< Array of the names of the bproc nodes in our job's mapping. This is used to
+     * track which bproc nodes belong to us */
+
+
 };
 /**
  * Convenience typedef
diff --git a/orte/mca/pls/bproc/pls_bproc_component.c b/orte/mca/pls/bproc/pls_bproc_component.c
index 8bf259629c..b5254e9530 100644
--- a/orte/mca/pls/bproc/pls_bproc_component.c
+++ b/orte/mca/pls/bproc/pls_bproc_component.c
@@ -74,6 +74,8 @@ int orte_pls_bproc_component_open(void) {
     OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t);
     /* init the list to hold the daemon names */
     rc = orte_pointer_array_init(&mca_pls_bproc_component.daemon_names, 8, 200000, 8);
+    /* init the list to hold the active node names (NOTE(review): overwrites the unchecked rc from the init above) */
+    rc = orte_pointer_array_init(&mca_pls_bproc_component.active_node_names, 8, 200000, 8);
     if(ORTE_SUCCESS != rc) {
         ORTE_ERROR_LOG(rc);
     }