1
1

Repair support for Bproc 4 on 64-bit systems. Update the SMR framework to actually support the begin_monitoring API. Implement the get/set_node_state APIs.

This commit was SVN r12864.
Этот коммит содержится в:
Ralph Castain 2006-12-15 02:34:14 +00:00
родитель b99e5a71d1
Коммит 64ec238b7b
20 изменённых файлов: 684 добавлений и 563 удалений

Просмотреть файл

@ -55,6 +55,8 @@ extern "C" {
opal_condition_t orted_cmd_cond; opal_condition_t orted_cmd_cond;
/** reuse daemons flag */ /** reuse daemons flag */
bool reuse_daemons; bool reuse_daemons;
/** request for timing measurement reports */
bool timing;
} orte_pls_base_t; } orte_pls_base_t;
/** /**

Просмотреть файл

@ -76,6 +76,16 @@ int orte_pls_base_open(void)
orte_pls_base.reuse_daemons = true; orte_pls_base.reuse_daemons = true;
} }
/* check for timing requests */
mca_base_param_reg_int_name("orte", "timing",
"Request that critical timing loops be measured",
false, false, 0, &value);
if (value != 0) {
orte_pls_base.timing = true;
} else {
orte_pls_base.timing = false;
}
/* Open up all the components that we can find */ /* Open up all the components that we can find */
if (ORTE_SUCCESS != if (ORTE_SUCCESS !=

Просмотреть файл

@ -76,9 +76,10 @@
/** /**
* Our current environment * Our current environment
*/ */
#if !defined(__WINDOWS__)
extern char **environ; extern char **environ;
#endif /* !defined(__WINDOWS__) */
static bool daemons_launched;
static bool bynode;
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS #if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
int orte_pls_bproc_launch_threaded(orte_jobid_t); int orte_pls_bproc_launch_threaded(orte_jobid_t);
@ -103,10 +104,8 @@ orte_pls_base_module_t orte_pls_bproc_module = {
}; };
static int orte_pls_bproc_node_array(orte_job_map_t* map, static int orte_pls_bproc_node_list(orte_job_map_t *map,
int ** node_array, int * node_array_len); int *node_array, int * num_nodes,
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
int ** node_list, int * num_nodes,
int num_procs); int num_procs);
static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io, static int orte_pls_bproc_setup_io(orte_jobid_t jobid, struct bproc_io_t * io,
int node_rank, int app_context); int node_rank, int app_context);
@ -121,92 +120,36 @@ static int bproc_vexecmove(int nnodes, int *nodes, int *pids, const char *cmd,
char * const argv[], char * envp[]); char * const argv[], char * envp[]);
#endif #endif
static void orte_pls_bproc_setup_env(char *** env); static void orte_pls_bproc_setup_env(char *** env);
static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp);
orte_job_map_t *map, static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
orte_vpid_t global_vpid_start, orte_vpid_t vpid_start, int app_context);
orte_jobid_t jobid, int* num_daemons);
static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
orte_job_map_t* map, int num_processes,
int num_slots,
orte_vpid_t vpid_start,
orte_vpid_t global_vpid_start,
int app_context,
int * node_array, int node_array_len);
/** /**
* creates an array that is indexed by the node number and each entry contains the * Creates a list of nodes from a job map that should participate in the next launch cycle.
* number of processes that will be launched on that node. * @param map a pointer to the job map
* * @param node_array a pointer to an integer array that will contain the node names
* @param map single context mapping * @param num_nodes a pointer to the place where we will store the number of nodes in the array
* @param node_array a pointer to put the node array into * @param num_procs the number of processes that a node must have to be placed on the list
* @param node_array_len returns the length of the array
* @retval >=0 the number of processes
* @retval <0 orte err
*/ */
static int orte_pls_bproc_node_array(orte_job_map_t* map, static int orte_pls_bproc_node_list(orte_job_map_t *map, int *node_array, int *num_nodes, int num_procs)
int ** node_array, int * node_array_len) { {
opal_list_item_t* item; opal_list_item_t *item;
int num_procs = 0; orte_mapped_node_t *node;
int num_on_node;
OPAL_TRACE(1);
*node_array_len = 0;
for(item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
item = opal_list_get_next(item)) {
if(*node_array_len < atol(((orte_mapped_node_t*)item)->nodename)) {
*node_array_len = atol(((orte_mapped_node_t*)item)->nodename);
}
}
(*node_array_len)++;
/* build the node array */
*node_array = (int*)malloc(sizeof(int) * *node_array_len);
if(NULL == *node_array) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
memset(*node_array, 0, sizeof(int) * *node_array_len);
for(item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
item = opal_list_get_next(item)) {
orte_mapped_node_t* node = (orte_mapped_node_t*)item;
num_on_node = opal_list_get_size(&node->procs);
(*node_array)[atol(node->nodename)] += num_on_node;
num_procs += num_on_node;
}
return num_procs;
}
/**
* Creates a bproc nodelist from a node array.
* @param node_array an array of bproc nodes that contains the number of processes
* to be launched on each node
* @param node_array_len the length of the node array
* @param node_list a pointer that the bproc node list will be returned in
* @param num_nodes a pointer to return the number of nodes in the node list
* @param num_procs the number of processes that a node must have to be on the
* node list
*/
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
int ** node_list, int * num_nodes,
int num_procs) {
int node;
OPAL_TRACE(1); OPAL_TRACE(1);
/* initialize all */
*num_nodes = 0; *num_nodes = 0;
*node_list = (int*)malloc(sizeof(int) * node_array_len); memset((void*)node_array, -1, sizeof(int) * map->num_nodes);
if(NULL == *node_list) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* build the node list */ /* build the node list */
for(node = 0; node < node_array_len; node++) { for(item = opal_list_get_first(&map->nodes);
if(node_array[node] >= num_procs) { item != opal_list_get_end(&map->nodes);
(*node_list)[(*num_nodes)++] = node; item = opal_list_get_next(item)) {
node = (orte_mapped_node_t*)item;
if (node->num_procs >= num_procs) {
node_array[(*num_nodes)++] = atoi(node->nodename);
} }
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -328,7 +271,7 @@ static void orte_pls_bproc_waitpid_daemon_cb(pid_t wpid, int status, void *data)
OPAL_TRACE(1); OPAL_TRACE(1);
if(!mca_pls_bproc_component.done_launching) { if(!daemons_launched) {
/* if a daemon exits before we are done launching the user apps we send a /* if a daemon exits before we are done launching the user apps we send a
* message to ourself so we will break out of the receive loop and exit */ * message to ourself so we will break out of the receive loop and exit */
orte_buffer_t ack; orte_buffer_t ack;
@ -438,12 +381,6 @@ static void orte_pls_bproc_setup_env(char *** env)
opal_setenv(var,orte_process_info.ns_replica_uri, true, env); opal_setenv(var,orte_process_info.ns_replica_uri, true, env);
free(var); free(var);
/* make sure the frontend hostname does not get pushed out to the backend */
var = mca_base_param_environ_variable("orte", "base", "nodename");
opal_unsetenv(var, env);
free(var);
opal_unsetenv("HOSTNAME", env);
/* make sure the username used to create the bproc directory is the same on /* make sure the username used to create the bproc directory is the same on
* the backend as the frontend */ * the backend as the frontend */
var = mca_base_param_environ_variable("pls","bproc","username"); var = mca_base_param_environ_variable("pls","bproc","username");
@ -467,13 +404,19 @@ static void orte_pls_bproc_setup_env(char *** env)
free(param); free(param);
free(var); free(var);
/* merge in environment */ /* merge in environment - merge ensures we don't overwrite anything we just set */
merged = opal_environ_merge(*env, environ); merged = opal_environ_merge(*env, environ);
opal_argv_free(*env); opal_argv_free(*env);
*env = merged; *env = merged;
/* make sure hostname doesn't get pushed to backend node */ /* make sure hostname doesn't get pushed to backend node */
opal_unsetenv("HOSTNAME", env); opal_unsetenv("HOSTNAME", env);
/* make sure the frontend hostname does not get pushed out to the backend */
var = mca_base_param_environ_variable("orte", "base", "nodename");
opal_unsetenv(var, env);
free(var);
} }
/** /**
@ -489,10 +432,7 @@ static void orte_pls_bproc_setup_env(char *** env)
* @retval ORTE_SUCCESS * @retval ORTE_SUCCESS
* @retval error * @retval error
*/ */
static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp, static int orte_pls_bproc_launch_daemons(orte_job_map_t *map, char ***envp) {
orte_job_map_t *map,
orte_vpid_t global_vpid_start,
orte_jobid_t jobid, int *num_launched) {
int * daemon_list = NULL; int * daemon_list = NULL;
int num_daemons = 0; int num_daemons = 0;
int rc, i; int rc, i;
@ -503,9 +443,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
char * var; char * var;
int stride; int stride;
char * orted_path; char * orted_path;
orte_jobid_t daemon_jobid; orte_vpid_t daemon_vpid_start;
orte_process_name_t * proc_name;
orte_vpid_t daemon_vpid_start = 0;
orte_std_cntr_t idx; orte_std_cntr_t idx;
struct stat buf; struct stat buf;
opal_list_t daemons; opal_list_t daemons;
@ -515,12 +453,15 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
OPAL_TRACE(1); OPAL_TRACE(1);
if (mca_pls_bproc_component.timing) { if (orte_pls_base.timing) {
if (0 != gettimeofday(&joblaunchstart, NULL)) { if (0 != gettimeofday(&joblaunchstart, NULL)) {
opal_output(0, "pls_bproc: could not obtain start time"); opal_output(0, "pls_bproc: could not obtain start time");
} }
} }
/* indicate that the daemons have not completely launched yet */
daemons_launched = false;
/* setup a list that will contain the info for all the daemons /* setup a list that will contain the info for all the daemons
* so we can store it on the registry when done * so we can store it on the registry when done
*/ */
@ -530,7 +471,13 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
* their names so we can pass that to bproc - populate the list * their names so we can pass that to bproc - populate the list
* with the node names * with the node names
*/ */
num_daemons = opal_list_get_size(&map->nodes); num_daemons = map->num_nodes;
if (0 == num_daemons) {
/* nothing to do */
OBJ_DESTRUCT(&daemons);
return ORTE_SUCCESS;
}
if(NULL == (daemon_list = (int*)malloc(sizeof(int) * num_daemons))) { if(NULL == (daemon_list = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup; goto cleanup;
@ -544,26 +491,28 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
daemon_list[i++] = atoi(node->nodename); daemon_list[i++] = atoi(node->nodename);
} }
/* allocate storage to save the daemon pids */ /* allocate storage for bproc to return the daemon pids */
if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) { if(NULL == (pids = (int*)malloc(sizeof(int) * num_daemons))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup; goto cleanup;
} }
/* allocate a range of vpids for the daemons */ /* allocate a range of vpids for the daemons */
daemon_jobid = orte_process_info.my_name->jobid; rc = orte_ns.reserve_range(0, num_daemons, &daemon_vpid_start);
rc = orte_ns.reserve_range(daemon_jobid, num_daemons, &daemon_vpid_start);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* setup the orted triggers for passing their launch info */ /* setup the orted triggers for passing their launch info */
if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_daemons, NULL, NULL))) { if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(map->job, num_daemons, NULL, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* setup the daemon environment */
orte_pls_bproc_setup_env(envp);
/* daemons calculate their process name using a "stride" of one, so /* daemons calculate their process name using a "stride" of one, so
* push that value into their environment */ * push that value into their environment */
stride = 1; stride = 1;
@ -574,8 +523,8 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
free(var); free(var);
/* set up the base environment so the daemons can get their names once launched */ /* set up the base environment so the daemons can get their names once launched */
rc = orte_ns_nds_bproc_put(cellid, daemon_jobid, daemon_vpid_start, rc = orte_ns_nds_bproc_put(ORTE_PROC_MY_NAME->cellid, 0, daemon_vpid_start,
global_vpid_start, num_daemons, envp); 0, num_daemons, envp);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
@ -592,7 +541,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
#endif #endif
opal_argv_append(&argc, &argv, "--bootproxy"); opal_argv_append(&argc, &argv, "--bootproxy");
orte_ns.convert_jobid_to_string(&param, jobid); orte_ns.convert_jobid_to_string(&param, map->job);
opal_argv_append(&argc, &argv, param); opal_argv_append(&argc, &argv, param);
free(param); free(param);
@ -634,17 +583,20 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
} }
/* launch the daemons */ /* launch the daemons */
mca_pls_bproc_component.num_daemons += num_daemons; if (orte_pls_base.timing) {
if (mca_pls_bproc_component.timing) {
if (0 != gettimeofday(&launchstart, NULL)) { if (0 != gettimeofday(&launchstart, NULL)) {
opal_output(0, "pls_bproc: could not obtain start time"); opal_output(0, "pls_bproc: could not obtain start time");
} }
} }
rc = bproc_vexecmove(num_daemons, daemon_list, pids, orted_path, argv, *envp); if (mca_pls_bproc_component.do_not_launch) {
for (i=0; i < num_daemons; i++) pids[i] = i+1;
rc = num_daemons;
} else {
rc = bproc_vexecmove(num_daemons, daemon_list, pids, orted_path, argv, *envp);
}
if (mca_pls_bproc_component.timing) { if (orte_pls_base.timing) {
if (0 != gettimeofday(&launchstop, NULL)) { if (0 != gettimeofday(&launchstop, NULL)) {
opal_output(0, "pls_bproc: could not obtain stop time"); opal_output(0, "pls_bproc: could not obtain stop time");
} else { } else {
@ -657,10 +609,10 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
if(rc != num_daemons) { if(rc != num_daemons) {
opal_show_help("help-pls-bproc.txt", "daemon-launch-number", true, opal_show_help("help-pls-bproc.txt", "daemon-launch-number", true,
num_daemons, rc, orted_path); num_daemons, rc, orted_path);
mca_pls_bproc_component.num_daemons -= num_daemons;
rc = ORTE_ERROR; rc = ORTE_ERROR;
goto cleanup; goto cleanup;
} }
if(0 < mca_pls_bproc_component.debug) { if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n", opal_output(0, "PLS_BPROC DEBUG: %d daemons launched. First pid: %d\n",
rc, *pids); rc, *pids);
@ -674,33 +626,27 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} else { } else {
rc = orte_ns.create_process_name(&proc_name, cellid, daemon_jobid,
daemon_vpid_start + i);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.daemon_names,
proc_name);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 > asprintf(&param, "%d", daemon_list[i])) { if (0 > asprintf(&param, "%d", daemon_list[i])) {
rc = ORTE_ERR_OUT_OF_RESOURCE; rc = ORTE_ERR_OUT_OF_RESOURCE;
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto cleanup; goto cleanup;
} }
rc = orte_pls_bproc_set_node_pid(cellid, param, jobid, pids[i]); rc = orte_pls_bproc_set_node_pid(ORTE_PROC_MY_NAME->cellid, param, map->job, pids[i]);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
dmn = OBJ_NEW(orte_pls_daemon_info_t); dmn = OBJ_NEW(orte_pls_daemon_info_t);
orte_dss.copy((void**)&(dmn->name), proc_name, ORTE_NAME); rc = orte_ns.create_process_name(&(dmn->name), ORTE_PROC_MY_NAME->cellid, 0,
dmn->cell = cellid; daemon_vpid_start + i);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
dmn->cell = dmn->name->cellid;
dmn->nodename = strdup(param); dmn->nodename = strdup(param);
dmn->active_job = jobid; dmn->active_job = map->job;
opal_list_append(&daemons, &dmn->super); opal_list_append(&daemons, &dmn->super);
free(param); free(param);
@ -711,22 +657,63 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons))) { if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
*num_launched = num_daemons;
/* setup the callbacks - this needs to be done *after* we store the /* setup the callbacks - this needs to be done *after* we store the
* daemon info so that short-lived apps don't cause mpirun to * daemon info so that short-lived apps don't cause mpirun to
* try and terminate the orteds before we record them * try and terminate the orteds before we record them
*/ */
for (i=0; i < num_daemons; i++) { if (!mca_pls_bproc_component.do_not_launch) {
rc = orte_wait_cb(pids[i], orte_pls_bproc_waitpid_daemon_cb, for (i=0; i < num_daemons; i++) {
&daemon_list[i]); rc = orte_wait_cb(pids[i], orte_pls_bproc_waitpid_daemon_cb,
if(ORTE_SUCCESS != rc) { &daemon_list[i]);
ORTE_ERROR_LOG(rc); if(ORTE_SUCCESS != rc) {
goto cleanup; ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* wait for communication back from the daemons, which indicates they have
* successfully set up the pty/pipes and IO forwarding which the user apps
* will use */
for(i = 0; i < num_daemons; i++) {
orte_buffer_t ack;
int src[4];
OBJ_CONSTRUCT(&ack, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &ack, ORTE_RML_TAG_BPROC);
if(0 > rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&ack);
goto cleanup;
}
idx = 4;
rc = orte_dss.unpack(&ack, &src, &idx, ORTE_INT);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&ack);
if(-1 == src[0]) {
/* one of the daemons has failed to properly launch. The error is sent
* by orte_pls_bproc_waitpid_daemon_cb */
if(-1 == src[1]) { /* did not die on a signal */
opal_show_help("help-pls-bproc.txt", "daemon-died-no-signal", true,
src[2], src[3]);
} else { /* died on a signal */
opal_show_help("help-pls-bproc.txt", "daemon-died-signal", true,
src[2], src[3], src[1]);
}
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
orte_pls_bproc_terminate_job(map->job, NULL);
goto cleanup;
}
} }
} }
if (mca_pls_bproc_component.timing) { /* indicate that the daemons have now launched */
daemons_launched = true;
if (orte_pls_base.timing) {
if (0 != gettimeofday(&launchstop, NULL)) { if (0 != gettimeofday(&launchstop, NULL)) {
opal_output(0, "pls_bproc: could not obtain stop time"); opal_output(0, "pls_bproc: could not obtain stop time");
} else { } else {
@ -755,126 +742,39 @@ cleanup:
} }
static void
orte_pls_bproc_check_node_state(orte_gpr_notify_data_t *notify_data,
void *user_tag)
{
orte_gpr_value_t **values;
bool dead_node = false;
char *dead_node_name;
orte_std_cntr_t i, j;
OPAL_TRACE(1);
/* first see if node is in
ORTE_NODE_STATE_DOWN or
ORTE_NODE_STATE_REBOOT */
values = (orte_gpr_value_t**)(notify_data->values)->addr;
for( j = 0; j < notify_data->cnt; j++) {
dead_node = false;
for( i = 0; i < values[j]->cnt; i++) {
orte_gpr_keyval_t* keyval = values[j]->keyvals[i];
if(strcmp(keyval->key, ORTE_NODE_STATE_KEY) == 0) {
orte_node_state_t *node_state;
int ret;
if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &node_state, keyval->value, ORTE_NODE_STATE))) {
return;
}
if( *node_state == ORTE_NODE_STATE_DOWN ||
*node_state == ORTE_NODE_STATE_REBOOT) {
dead_node = true;
printf("found a dead node state.. \n");
}
} else if(strcmp(keyval->key, ORTE_NODE_NAME_KEY) == 0) {
char* tmp_name;
int ret;
if( ORTE_SUCCESS != (ret = orte_dss.get( (void **) &tmp_name, keyval->value, ORTE_STRING))) {
return;
}
else {
dead_node_name = strdup(tmp_name);
printf("found a node named %s\n", dead_node_name);
}
}
}
printf("found a node named %s is dead? %d\n", dead_node_name, dead_node);
if(dead_node) {
/* gotta see if this node belongs to us... arg.. */
/* also, we know by order of creation that the node state */
/* comes before the node name.. see smr_bproc.c */
orte_std_cntr_t name_idx;
for (name_idx = 0;
name_idx < orte_pointer_array_get_size(mca_pls_bproc_component.active_node_names);
name_idx++) {
char* node_name = (char*) orte_pointer_array_get_item(mca_pls_bproc_component.active_node_names, name_idx);
if(strcmp(node_name, dead_node_name) == 0){
/* one of our nodes up and died... */
/* not much to do other than die.... */
int ret = ORTE_SUCCESS;
char *segment = NULL;
orte_gpr_value_t** seg_values = NULL;
orte_std_cntr_t num_values = 0;
/**********************
* Job Info segment
**********************/
segment = strdup(ORTE_JOBINFO_SEGMENT);
if( ORTE_SUCCESS != (ret = orte_gpr.get(ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_OR,
segment,
NULL,
NULL,
&num_values,
&seg_values ) ) ) {
}
/*
* kill all the jobids that are not zero
*/
for(i = 0; i < num_values; ++i) {
orte_gpr_value_t* value = values[i];
orte_jobid_t jobid;
orte_schema.extract_jobid_from_segment_name(&jobid, value->tokens[0]);
printf("killing jobid %d\n", jobid);
if(jobid != 0)
orte_pls_bproc_terminate_job(jobid, NULL);
}
/*
* and kill everyone else
*/
printf("and go bye-bye...\n");
orte_pls_bproc_terminate_job(0, NULL);
/* shouldn't ever get here.. */
exit(1);
}
}
}
}
}
static int static int
orte_pls_bproc_monitor_nodes(void) orte_pls_bproc_node_failed(orte_gpr_notify_msg_t *msg)
{ {
orte_gpr_subscription_id_t id; orte_jobid_t job;
OPAL_TRACE(1); /* respond to a node failure reported by the smr. We know that
* this function will only be called when one or more nodes in
* our allocation fails, so we just need to respond to it. The
* complication is that the failure could occur in any of several
* states:
* (a) before we start to launch the daemons
* (b) while we are launching the daemons
* (c) after the daemons are launched, while we are launching the app
* (d) during app launch
* (e) after app launch, but before completion
* (f) while the app is finalizing
* (g) while we are cleaning up after the app has finalized
*/
printf("mpirun has detected a dead node within the job and is terminating\n");
/* extract the jobid from the returned data */
orte_schema.extract_jobid_from_std_trigger_name(&job, msg->trigger);
/* terminate all jobs in the in the job family */
orte_pls_bproc_terminate_job(job, NULL);
/* kill the daemons */
orte_pls_bproc_terminate_job(0, NULL);
/* shouldn't ever get here.. */
exit(1);
return orte_gpr.subscribe_1(&id,
NULL,
NULL,
ORTE_GPR_NOTIFY_VALUE_CHG,
ORTE_GPR_TOKENS_OR |
ORTE_GPR_KEYS_OR,
ORTE_BPROC_NODE_SEGMENT,
NULL,
ORTE_NODE_STATE_KEY,
orte_pls_bproc_check_node_state,
NULL);
} }
@ -892,17 +792,13 @@ orte_pls_bproc_monitor_nodes(void)
* @retval ORTE_SUCCESS * @retval ORTE_SUCCESS
* @retval error * @retval error
*/ */
static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid, static int orte_pls_bproc_launch_app(orte_job_map_t* map, int num_slots,
orte_job_map_t* map, int num_processes, int num_slots, orte_vpid_t vpid_start, int app_context) {
orte_vpid_t vpid_start, int *node_array, num_nodes, cycle;
orte_vpid_t global_vpid_start,
int app_context, int * node_array,
int node_array_len) {
int * node_list = NULL;
int num_nodes, cycle;
int rc, i, j, stride; int rc, i, j, stride;
int * pids = NULL; orte_std_cntr_t num_processes;
char * var, * param; int *pids = NULL;
char *var, *param;
orte_process_name_t * proc_name; orte_process_name_t * proc_name;
struct bproc_io_t bproc_io[3]; struct bproc_io_t bproc_io[3];
char **env; char **env;
@ -910,13 +806,8 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
OPAL_TRACE(1); OPAL_TRACE(1);
if(NULL == (pids = (int*)malloc(sizeof(int) * node_array_len))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* point to the env array for this app_context */ /* point to the env array for this app_context */
env = map->apps[app_context]->env; env = opal_argv_copy(map->apps[app_context]->env);
/* set up app context */ /* set up app context */
asprintf(&param, "%d", app_context); asprintf(&param, "%d", app_context);
@ -926,7 +817,7 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
free(var); free(var);
/* set the vpid-to-vpid stride based on the mapping mode */ /* set the vpid-to-vpid stride based on the mapping mode */
if (mca_pls_bproc_component.bynode) { if (bynode) {
/* we are mapping by node, so we want to set the stride /* we are mapping by node, so we want to set the stride
* length (i.e., the step size between vpids that is used * length (i.e., the step size between vpids that is used
* to compute the process name) to 1 * to compute the process name) to 1
@ -946,6 +837,13 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
free(param); free(param);
free(var); free(var);
/* set up the node_array to handle the launch */
node_array = (int*)malloc(map->num_nodes * sizeof(int));
if (NULL == node_array) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* initialize the cycle count. Computing the process name under Bproc /* initialize the cycle count. Computing the process name under Bproc
* is a complex matter when mapping by slot as Bproc's inherent * is a complex matter when mapping by slot as Bproc's inherent
* methodology is to do everything by node. When mapping by slot, the * methodology is to do everything by node. When mapping by slot, the
@ -965,8 +863,9 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
/* launch the processes */ /* launch the processes */
i = 1; i = 1;
rc = orte_pls_bproc_node_list(node_array, node_array_len, &node_list, num_processes = map->vpid_range;
&num_nodes, i);
rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
@ -980,19 +879,19 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
"\tlaunching cycle %d", i); "\tlaunching cycle %d", i);
for (dbg=0; dbg<num_nodes; dbg++) { for (dbg=0; dbg<num_nodes; dbg++) {
opal_output_verbose(1, orte_pls_base.pls_output, opal_output_verbose(1, orte_pls_base.pls_output,
"\t\tlaunching on node %d", node_list[dbg]); "\t\tlaunching on node %d", node_array[dbg]);
} }
} }
/* setup environment so the procs can figure out their names */ /* setup environment so the procs can figure out their names */
rc = orte_ns_nds_bproc_put(cellid, jobid, vpid_start, global_vpid_start, rc = orte_ns_nds_bproc_put(ORTE_PROC_MY_NAME->cellid, map->job, vpid_start, map->vpid_start,
num_processes, &env); num_processes, &env);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
rc = orte_pls_bproc_setup_io(jobid, bproc_io, i - 1, app_context); rc = orte_pls_bproc_setup_io(map->job, bproc_io, i - 1, app_context);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
@ -1000,9 +899,24 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
if(0 < mca_pls_bproc_component.debug) { if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "pls_bproc: launching %d processes:", num_nodes); opal_output(0, "pls_bproc: launching %d processes:", num_nodes);
} }
rc = bproc_vexecmove_io(num_nodes, node_list, pids, bproc_io, 3,
map->apps[app_context]->app, /* allocate space for bproc to return the pids */
map->apps[app_context]->argv, env); pids = (int*)malloc(num_nodes * sizeof(int));
if (NULL == pids) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
if (mca_pls_bproc_component.do_not_launch) {
for (j=0; j < num_nodes; j++) pids[j] = j+1;
rc = num_nodes;
} else {
rc = bproc_vexecmove_io(num_nodes, node_array, pids, bproc_io, 3,
map->apps[app_context]->app,
map->apps[app_context]->argv, env);
}
if(0 < mca_pls_bproc_component.debug) { if(0 < mca_pls_bproc_component.debug) {
opal_output(0, "pls_bproc: %d processes launched. First pid: %d", opal_output(0, "pls_bproc: %d processes launched. First pid: %d",
rc, *pids); rc, *pids);
@ -1013,37 +927,39 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
rc = ORTE_ERROR; rc = ORTE_ERROR;
goto cleanup; goto cleanup;
} }
for(j = 0; j < num_nodes; j++) { for(j = 0; j < num_nodes; j++) {
if(0 >= pids[j]) { if(0 >= pids[j]) {
opal_show_help("help-pls-bproc.txt", "proc-launch-bad-pid", true, opal_show_help("help-pls-bproc.txt", "proc-launch-bad-pid", true,
node_list[j], pids[j], errno, map->apps[app_context]->app); node_array[j], pids[j], errno, map->apps[app_context]->app);
rc = ORTE_ERROR; rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} else { } else {
mca_pls_bproc_component.num_procs++; rc = orte_ns.create_process_name(&proc_name, ORTE_PROC_MY_NAME->cellid, map->job,
rc = orte_ns.create_process_name(&proc_name, cellid, jobid,
vpid_start + j); vpid_start + j);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
orte_pls_bproc_set_proc_pid(proc_name, pids[j], node_list[j]); orte_pls_bproc_set_proc_pid(proc_name, pids[j], node_array[j]);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name); if (!mca_pls_bproc_component.do_not_launch) {
if(ORTE_SUCCESS != rc) { rc = orte_wait_cb(pids[j], orte_pls_bproc_waitpid_cb, proc_name);
ORTE_ERROR_LOG(rc); if(ORTE_SUCCESS != rc) {
goto cleanup; ORTE_ERROR_LOG(rc);
goto cleanup;
}
} }
} }
} }
free(node_list); free(pids);
node_list = NULL; pids = NULL;
i++; i++;
if (mca_pls_bproc_component.bynode) { if (bynode) {
/* we are mapping by node, so the vpid_start must increment by /* we are mapping by node, so the vpid_start must increment by
* the number of nodes * the number of nodes
*/ */
@ -1064,8 +980,7 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
} }
} }
rc = orte_pls_bproc_node_list(node_array, node_array_len, &node_list, rc = orte_pls_bproc_node_list(map, node_array, &num_nodes, i);
&num_nodes, i);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
@ -1073,12 +988,13 @@ static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
} }
cleanup: cleanup:
if(NULL != node_list) {
free(node_list);
}
if(NULL != pids) { if(NULL != pids) {
free(pids); free(pids);
} }
free(node_array);
if (NULL != env) opal_argv_free(env);
return rc; return rc;
} }
@ -1096,25 +1012,16 @@ cleanup:
* @retval error * @retval error
*/ */
int orte_pls_bproc_launch(orte_jobid_t jobid) { int orte_pls_bproc_launch(orte_jobid_t jobid) {
opal_list_item_t* item;
orte_cellid_t cellid;
orte_job_map_t* map; orte_job_map_t* map;
orte_mapped_node_t *map_node; orte_mapped_node_t *map_node;
orte_vpid_t vpid_launch; orte_vpid_t vpid_launch;
orte_vpid_t vpid_range;
orte_vpid_t vpid_start;
int rc; int rc;
int src[4];
int ** node_array = NULL;
int * node_array_len = NULL;
int num_processes = 0;
int num_daemons;
int num_slots; int num_slots;
int context; int context;
int i, j; int i;
orte_std_cntr_t idx;
char cwd_save[OMPI_PATH_MAX + 1]; char cwd_save[OMPI_PATH_MAX + 1];
orte_ras_node_t *ras_node; orte_ras_node_t *ras_node;
char **daemon_env;
OPAL_TRACE(1); OPAL_TRACE(1);
@ -1137,15 +1044,13 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
return rc; return rc;
} }
if(ORTE_SUCCESS != (rc = orte_rmgr.get_vpid_range(jobid, &vpid_start, /* set the mapping mode */
&vpid_range))) { if (NULL != map->mapping_mode && 0 == strcmp("bynode", map->mapping_mode)) {
ORTE_ERROR_LOG(rc); bynode = true;
goto cleanup; } else {
bynode = false;
} }
/* get the cellid */
cellid = orte_process_info.my_name->cellid;
/* check all of the app_contexts for sanity */ /* check all of the app_contexts for sanity */
for (i=0; i < map->num_apps; i++) { for (i=0; i < map->num_apps; i++) {
/* Check that the cwd is sane. We have to chdir there in /* Check that the cwd is sane. We have to chdir there in
@ -1190,94 +1095,32 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
opal_output(0, "pls_bproc: --- starting to launch procs ---"); opal_output(0, "pls_bproc: --- starting to launch procs ---");
} }
/* create an array to hold the pointers to the node arrays for each app /* save the daemon environment */
* context. Also, create an array to hold the lengths of the node arrays */ daemon_env = opal_argv_copy(map->apps[0]->env);
node_array = malloc(map->num_apps * sizeof(int *));
node_array_len = malloc(map->num_apps * sizeof(int *));
/* for each application context - create a node array and setup its env */ /* for each application context, setup its env */
for(i=0; i < map->num_apps; i++) { for(i=0; i < map->num_apps; i++) {
rc = orte_pls_bproc_node_array(map, &node_array[i],
&node_array_len[i]);
if(0 > rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
orte_pls_bproc_setup_env(&map->apps[i]->env); orte_pls_bproc_setup_env(&map->apps[i]->env);
num_processes += rc;
} }
/* save the active node names */ /* tell the smr which nodes to monitor so we can be notified
idx = 0;
for (item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
item = opal_list_get_next(item)) {
orte_mapped_node_t* node = (orte_mapped_node_t*) item;
rc = orte_pointer_array_add(&idx, mca_pls_bproc_component.active_node_names,
strdup(node->nodename));
}
/* setup subscription for each node so we can detect
when the node's state changes, useful for aborting when when the node's state changes, useful for aborting when
a bproc node up and dies */ a bproc node up and dies */
if (ORTE_SUCCESS != (rc = orte_smr.begin_monitoring(map, orte_pls_bproc_node_failed, NULL))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_pls_bproc_monitor_nodes(); /* launch the daemons on all nodes which have processes assigned to them */
rc = orte_pls_bproc_launch_daemons(map, &daemon_env);
opal_argv_free(daemon_env);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* launch the daemons on all the nodes which have processes assigned to them. vpid_launch = map->vpid_start;
* We need to send along an appropriate environment for the daemons. Since
* there must be at least ONE app_context, we can just take that one
*/
rc = orte_pls_bproc_launch_daemons(cellid, &map->apps[0]->env, map,
vpid_start, jobid, &num_daemons);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* wait for communication back from the daemons, which indicates they have
* sucessfully set up the pty/pipes and IO forwarding which the user apps
* will use */
for(j = 0; j < num_daemons; j++) {
orte_buffer_t ack;
OBJ_CONSTRUCT(&ack, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &ack, ORTE_RML_TAG_BPROC);
if(0 > rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&ack);
goto cleanup;
}
idx = 4;
rc = orte_dss.unpack(&ack, &src, &idx, ORTE_INT);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&ack);
if(-1 == src[0]) {
/* one of the daemons has failed to properly launch. The error is sent
* by orte_pls_bproc_waitpid_daemon_cb */
if(-1 == src[1]) { /* did not die on a signal */
opal_show_help("help-pls-bproc.txt", "daemon-died-no-signal", true,
src[2], src[3]);
} else { /* died on a signal */
opal_show_help("help-pls-bproc.txt", "daemon-died-signal", true,
src[2], src[3], src[1]);
}
rc = ORTE_ERROR;
ORTE_ERROR_LOG(rc);
orte_pls_bproc_terminate_job(jobid, NULL);
goto cleanup;
}
}
vpid_launch = vpid_start;
/* for each application context launch the app */ /* for each application context launch the app */
for(context=0; context < map->num_apps; context++) { for(context=0; context < map->num_apps; context++) {
@ -1286,29 +1129,24 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
goto cleanup; goto cleanup;
} }
rc = orte_pls_bproc_launch_app(cellid, jobid, map, num_processes, num_slots, rc = orte_pls_bproc_launch_app(map, num_slots, vpid_launch, context);
vpid_launch, vpid_start, context,
node_array[context], node_array_len[context]);
if(ORTE_SUCCESS != rc) { if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
free(node_array[context]); vpid_launch += map->apps[context]->num_procs;
vpid_launch = vpid_start + mca_pls_bproc_component.num_procs;
} }
mca_pls_bproc_component.done_launching = true;
cleanup: cleanup:
chdir(cwd_save); chdir(cwd_save);
OBJ_RELEASE(map); OBJ_RELEASE(map);
if(NULL != node_array) { if (mca_pls_bproc_component.do_not_launch) {
free(node_array); /* abort the job */
} exit(-1);
if(NULL != node_array_len) {
free(node_array_len);
} }
return rc; return rc;
} }

Просмотреть файл

@ -107,43 +107,29 @@ void orte_pls_bproc_recv(int status, orte_process_name_t* sender,
struct orte_pls_bproc_component_t { struct orte_pls_bproc_component_t {
orte_pls_base_component_t super; orte_pls_base_component_t super;
/**< The base class */ /**< The base class */
bool done_launching;
/**< Is true if we are done launching the user's app. */
char * orted; char * orted;
/**< The orted executeable. This can be an absolute path, or if not found /**< The orted executable. This can be an absolute path, or if not found
* we will look for it in the user's path */ * we will look for it in the user's path */
int debug; int debug;
/**< If greater than 0 print debugging information */ /**< If greater than 0 print debugging information */
bool timing;
/**< If true, report launch timing info */
int num_procs;
/**< The number of processes that are running */
int priority; int priority;
/**< The priority of this component. This will be returned if we determine /**< The priority of this component. This will be returned if we determine
* that bproc is available and running on this node, */ * that bproc is available and running on this node, */
int terminate_sig; int terminate_sig;
/**< The signal that gets sent to a process to kill it. */ /**< The signal that gets sent to a process to kill it. */
size_t num_daemons;
/**< The number of daemons that are currently running. */
opal_mutex_t lock; opal_mutex_t lock;
/**< Lock used to prevent some race conditions */ /**< Lock used to prevent some race conditions */
opal_condition_t condition; opal_condition_t condition;
/**< Condition that is signaled when all the daemons have died */ /**< Condition that is signaled when all the daemons have died */
orte_pointer_array_t * daemon_names;
/**< Array of the process names of all the daemons. This is used to send
* the daemons a termonation signal when all the user processes are done */
orte_pointer_array_t* active_node_names;
/**< Array of the bproc node names of all the daemons. This is used to
* track which bproc nodes belong to us*/
bool bynode;
/**< Indicates whether or not this application is to be mapped by node
* (if set to true) or by slot (default)
*/
bool recv_issued; bool recv_issued;
/**< Indicates that the comm recv for reporting abnormal proc termination /**< Indicates that the comm recv for reporting abnormal proc termination
* has been issued * has been issued
*/ */
bool do_not_launch;
/**< for test purposes, do everything but the actual launch */
orte_std_cntr_t num_daemons;
/**< track the number of daemons being launched so we can tell when
* all have reported in */
}; };
/** /**
* Convenience typedef * Convenience typedef

Просмотреть файл

@ -54,8 +54,7 @@ orte_pls_bproc_component_t mca_pls_bproc_component = {
* finishes setting up the component struct. * finishes setting up the component struct.
*/ */
int orte_pls_bproc_component_open(void) { int orte_pls_bproc_component_open(void) {
int rc, tmp, value; int rc;
char *policy;
/* init parameters */ /* init parameters */
mca_base_component_t *c = &mca_pls_bproc_component.super.pls_version; mca_base_component_t *c = &mca_pls_bproc_component.super.pls_version;
@ -69,43 +68,19 @@ int orte_pls_bproc_component_open(void) {
false, 9, &mca_pls_bproc_component.terminate_sig); false, 9, &mca_pls_bproc_component.terminate_sig);
mca_base_param_reg_string(c, "orted", "Path to where orted is installed", mca_base_param_reg_string(c, "orted", "Path to where orted is installed",
false, false, "orted", &mca_pls_bproc_component.orted); false, false, "orted", &mca_pls_bproc_component.orted);
mca_pls_bproc_component.num_procs = 0; mca_base_param_reg_int(c, "nolaunch", NULL, false, false, (int)false,
mca_pls_bproc_component.num_daemons = 0; &rc);
mca_pls_bproc_component.done_launching = false; if ((int)false == rc) {
mca_pls_bproc_component.do_not_launch = false;
} else {
mca_pls_bproc_component.do_not_launch = true;
}
mca_pls_bproc_component.recv_issued = false; mca_pls_bproc_component.recv_issued = false;
OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_pls_bproc_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t); OBJ_CONSTRUCT(&mca_pls_bproc_component.condition, opal_condition_t);
/* we need to know the intended method for mapping processes return ORTE_SUCCESS;
* so we can properly direct the bproc processes on how to compute
* their name
*/
mca_base_param_reg_string_name("rmaps", "base_schedule_policy",
"Scheduling Policy for RMAPS. [slot | node]",
false, false, "slot", &policy);
if (0 == strcmp(policy, "node")) {
mca_pls_bproc_component.bynode = true;
} else {
mca_pls_bproc_component.bynode = false;
}
tmp = mca_base_param_reg_int_name("orte", "timing",
"Request that critical timing loops be measured",
false, false, 0, &value);
if (value != 0) {
mca_pls_bproc_component.timing = true;
} else {
mca_pls_bproc_component.timing = false;
}
/* init the list to hold the daemon names */
rc = orte_pointer_array_init(&mca_pls_bproc_component.daemon_names, 8, 200000, 8);
/* init the list to hold the daemon names */
rc = orte_pointer_array_init(&mca_pls_bproc_component.active_node_names, 8, 200000, 8);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
} }
/** /**
@ -114,7 +89,6 @@ int orte_pls_bproc_component_open(void) {
int orte_pls_bproc_component_close(void) { int orte_pls_bproc_component_close(void) {
OBJ_DESTRUCT(&mca_pls_bproc_component.lock); OBJ_DESTRUCT(&mca_pls_bproc_component.lock);
OBJ_DESTRUCT(&mca_pls_bproc_component.condition); OBJ_DESTRUCT(&mca_pls_bproc_component.condition);
OBJ_RELEASE(mca_pls_bproc_component.daemon_names);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -131,7 +105,7 @@ orte_pls_base_module_t* orte_pls_bproc_init(int *priority) {
return NULL; return NULL;
/* okay, we are in an HNP - now check to see if BProc is running here */ /* okay, we are in an HNP - now check to see if BProc is running here */
ret = bproc_version(&version); ret = bproc_version(&version);
if (ret != 0) { if (ret != 0) {
return NULL; return NULL;
} }

Просмотреть файл

@ -87,6 +87,13 @@ int orte_rmaps_base_get_job_map(orte_job_map_t **map, orte_jobid_t jobid)
/* set the jobid */ /* set the jobid */
mapping->job = jobid; mapping->job = jobid;
/* get the vpid start/range info */
if (ORTE_SUCCESS != (rc = orte_rmgr.get_vpid_range(jobid, &mapping->vpid_start, &mapping->vpid_range))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(mapping);
return rc;
}
/* get the job segment name */ /* get the job segment name */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) { if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -325,6 +332,12 @@ int orte_rmaps_base_put_job_map(orte_job_map_t *map)
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
/* store the vpid start/range info */
if (ORTE_SUCCESS != (rc = orte_rmgr.set_vpid_range(map->job, map->vpid_start, map->vpid_range))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/** /**
* allocate value array. We need to reserve one extra spot so we can set the counter * allocate value array. We need to reserve one extra spot so we can set the counter
* for the process INIT state to indicate that all procs are at that state. This will * for the process INIT state to indicate that all procs are at that state. This will

Просмотреть файл

@ -518,8 +518,10 @@ static int orte_rmaps_rr_map(orte_jobid_t jobid, opal_list_t *attributes)
/* Make assignments */ /* Make assignments */
if (mca_rmaps_round_robin_component.bynode) { if (mca_rmaps_round_robin_component.bynode) {
map->mapping_mode = strdup("bynode");
rc = map_app_by_node(app, map, jobid, vpid_start, working_node_list, &max_used_nodes); rc = map_app_by_node(app, map, jobid, vpid_start, working_node_list, &max_used_nodes);
} else { } else {
map->mapping_mode = strdup("byslot");
rc = map_app_by_slot(app, map, jobid, vpid_start, working_node_list, &max_used_nodes); rc = map_app_by_slot(app, map, jobid, vpid_start, working_node_list, &max_used_nodes);
} }

Просмотреть файл

@ -34,8 +34,8 @@
#include "orte/mca/errmgr/base/base.h" #include "orte/mca/errmgr/base/base.h"
int orte_ns_nds_env_put(const orte_process_name_t* name, int orte_ns_nds_env_put(const orte_process_name_t* name,
orte_vpid_t vpid_start, size_t num_procs, orte_vpid_t vpid_start, size_t num_procs,
char ***env) char ***env)
{ {
char* param; char* param;
char* cellid; char* cellid;

Просмотреть файл

@ -28,6 +28,8 @@ libmca_smr_la_SOURCES += \
base/smr_base_set_proc_state.c \ base/smr_base_set_proc_state.c \
base/smr_base_get_job_state.c \ base/smr_base_get_job_state.c \
base/smr_base_set_job_state.c \ base/smr_base_set_job_state.c \
base/smr_base_get_node_state.c \
base/smr_base_set_node_state.c \
base/smr_base_trig_init_fns.c \ base/smr_base_trig_init_fns.c \
base/smr_base_open.c \ base/smr_base_open.c \
base/data_type_support/smr_data_type_compare_fns.c \ base/data_type_support/smr_data_type_compare_fns.c \

103
orte/mca/smr/base/smr_base_get_node_state.c Обычный файл
Просмотреть файл

@ -0,0 +1,103 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*/
/*
* includes
*/
#include "orte_config.h"
#include <string.h>
#include "orte/mca/schema/schema.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/smr/base/smr_private.h"
int orte_smr_base_get_node_state(orte_node_state_t *state,
orte_cellid_t cell,
char *nodename)
{
orte_gpr_value_t **values=NULL;
int rc;
orte_std_cntr_t cnt, num_tokens, i;
char **tokens;
char *keys[] = {
ORTE_NODE_STATE_KEY,
NULL
};
orte_node_state_t *sptr;
if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&tokens, &num_tokens, cell, nodename))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.get(ORTE_GPR_TOKENS_XAND, ORTE_NODE_SEGMENT,
tokens, keys, &cnt, &values))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/** there should be one - and only one - value returned. if cnt is anything else,
* we have a problem
*/
if (1 != cnt) {
if (0 == cnt) { /** check for special case - didn't find the node's container */
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/** if not 0, then we have too many - report that */
ORTE_ERROR_LOG(ORTE_ERR_INDETERMINATE_STATE_INFO);
rc = ORTE_ERR_INDETERMINATE_STATE_INFO;
goto CLEANUP;
}
/* there should only be one keyval returned - if not, got a problem */
if (1 != values[0]->cnt) {
ORTE_ERROR_LOG(ORTE_ERR_INDETERMINATE_STATE_INFO);
rc = ORTE_ERR_INDETERMINATE_STATE_INFO;
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_dss.get((void**)&sptr, values[0]->keyvals[0]->value, ORTE_NODE_STATE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
*state = *sptr;
CLEANUP:
for (i=0; i < num_tokens; i++) {
if (NULL != tokens[i]) free(tokens[i]);
}
free(tokens);
if (NULL != values) {
for (i=0; i < cnt; i++) {
OBJ_RELEASE(values[i]);
}
if (NULL != values) free(values);
}
return rc;
}

Просмотреть файл

@ -36,21 +36,9 @@
* as some systems can call this without that support * as some systems can call this without that support
*/ */
int orte_smr_base_get_node_state_not_available(orte_node_state_t *state, int orte_smr_base_begin_monitoring_not_available(orte_job_map_t *map,
orte_cellid_t cell, orte_gpr_trigger_cb_fn_t cbfunc,
char *nodename) void *user_tag)
{
return ORTE_SUCCESS;
}
int orte_smr_base_set_node_state_not_available(orte_cellid_t cell,
char *nodename,
orte_node_state_t state)
{
return ORTE_SUCCESS;
}
int orte_smr_base_begin_monitoring_not_available(orte_jobid_t job)
{ {
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -57,8 +57,8 @@ orte_smr_base_module_t orte_smr = {
orte_smr_base_get_proc_state, orte_smr_base_get_proc_state,
orte_smr_base_set_proc_state, orte_smr_base_set_proc_state,
orte_smr_base_get_node_state_not_available, orte_smr_base_get_node_state,
orte_smr_base_set_node_state_not_available, orte_smr_base_set_node_state,
orte_smr_base_get_job_state, orte_smr_base_get_job_state,
orte_smr_base_set_job_state, orte_smr_base_set_job_state,
orte_smr_base_begin_monitoring_not_available, orte_smr_base_begin_monitoring_not_available,
@ -69,6 +69,26 @@ orte_smr_base_module_t orte_smr = {
orte_smr_base_module_finalize_not_available orte_smr_base_module_finalize_not_available
}; };
/*
* OBJ constructors/desctructors for SMR types
*/
static void orte_smr_node_tracker_construct(orte_smr_node_state_tracker_t* node)
{
node->cell = ORTE_CELLID_INVALID;
node->nodename = NULL;
node->state = ORTE_NODE_STATE_UNKNOWN;
}
static void orte_smr_node_tracker_destruct(orte_smr_node_state_tracker_t* node)
{
if (NULL != node->nodename) free(node->nodename);
}
OBJ_CLASS_INSTANCE(orte_smr_node_state_tracker_t,
opal_list_item_t,
orte_smr_node_tracker_construct,
orte_smr_node_tracker_destruct);
/** /**
* Function for finding and opening either all MCA components, or the one * Function for finding and opening either all MCA components, or the one
* that was specifically requested via a MCA parameter. * that was specifically requested via a MCA parameter.
@ -79,8 +99,6 @@ int orte_smr_base_open(void)
int param, value, rc; int param, value, rc;
orte_data_type_t tmp; orte_data_type_t tmp;
/* fprintf(stderr,"orte_smr_base_open:enter\n"); */
/* setup output for debug messages */ /* setup output for debug messages */
orte_smr_base.smr_output = opal_output_open(NULL); orte_smr_base.smr_output = opal_output_open(NULL);

Просмотреть файл

@ -82,9 +82,9 @@ int orte_smr_base_select(void)
/* If it's not the best one, finalize it */ /* If it's not the best one, finalize it */
/* else { */ else {
/* component->smr_finalize(); */ component->smr_finalize();
/* } */ }
} /* for each possible component */ } /* for each possible component */

64
orte/mca/smr/base/smr_base_set_node_state.c Обычный файл
Просмотреть файл

@ -0,0 +1,64 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/mca/schema/schema.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/smr/base/smr_private.h"
int orte_smr_base_set_node_state(orte_cellid_t cell,
char *nodename,
orte_node_state_t state)
{
orte_gpr_value_t *value;
int rc;
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
ORTE_NODE_SEGMENT, 1, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_node_tokens(&(value->tokens), &(value->num_tokens), cell, nodename))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]), ORTE_NODE_STATE_KEY, ORTE_NODE_STATE, &state))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &value))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(value);
return rc;
}

Просмотреть файл

@ -43,6 +43,17 @@
extern "C" { extern "C" {
#endif #endif
/*
* Define an object for internally tracking node states
*/
typedef struct {
opal_list_item_t super;
orte_cellid_t cell;
char *nodename;
orte_node_state_t state;
} orte_smr_node_state_tracker_t;
OBJ_CLASS_DECLARATION(orte_smr_node_state_tracker_t);
int orte_smr_base_get_proc_state(orte_proc_state_t *state, int orte_smr_base_get_proc_state(orte_proc_state_t *state,
int *status, int *status,
orte_process_name_t *proc); orte_process_name_t *proc);
@ -51,13 +62,13 @@ int orte_smr_base_set_proc_state(orte_process_name_t *proc,
orte_proc_state_t state, orte_proc_state_t state,
int status); int status);
int orte_smr_base_get_node_state_not_available(orte_node_state_t *state, int orte_smr_base_get_node_state(orte_node_state_t *state,
orte_cellid_t cell, orte_cellid_t cell,
char *nodename); char *nodename);
int orte_smr_base_set_node_state_not_available(orte_cellid_t cell, int orte_smr_base_set_node_state(orte_cellid_t cell,
char *nodename, char *nodename,
orte_node_state_t state); orte_node_state_t state);
int orte_smr_base_get_job_state(orte_job_state_t *state, int orte_smr_base_get_job_state(orte_job_state_t *state,
orte_jobid_t jobid); orte_jobid_t jobid);
@ -87,7 +98,9 @@ int orte_smr_base_job_stage_gate_subscribe(orte_jobid_t job,
orte_gpr_notify_cb_fn_t cbfunc, void* cbdata, orte_gpr_notify_cb_fn_t cbfunc, void* cbdata,
orte_proc_state_t cb_conditions); orte_proc_state_t cb_conditions);
int orte_smr_base_begin_monitoring_not_available(orte_jobid_t job); int orte_smr_base_begin_monitoring_not_available(orte_job_map_t *map,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
int orte_smr_base_module_finalize_not_available (void); int orte_smr_base_module_finalize_not_available (void);

Просмотреть файл

@ -28,6 +28,9 @@
#include "orte/orte_constants.h" #include "orte/orte_constants.h"
#include "orte/orte_types.h" #include "orte/orte_types.h"
#include "opal/util/output.h"
#include "opal/class/opal_list.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/mca/ns/ns.h" #include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
@ -36,7 +39,6 @@
#include "orte/mca/smr/base/smr_private.h" #include "orte/mca/smr/base/smr_private.h"
#include "orte/mca/smr/bproc/smr_bproc.h" #include "orte/mca/smr/bproc/smr_bproc.h"
#include "opal/util/output.h"
#define BIT_MASK(bit) (bit_set)(1 << (bit)) #define BIT_MASK(bit) (bit_set)(1 << (bit))
#define EMPTY_SET (bit_set)0 #define EMPTY_SET (bit_set)0
@ -53,7 +55,9 @@
| BIT_MASK(BIT_NODE_BPROC_USER) \ | BIT_MASK(BIT_NODE_BPROC_USER) \
| BIT_MASK(BIT_NODE_BPROC_GROUP)) | BIT_MASK(BIT_NODE_BPROC_GROUP))
/* define some local variables/types */
typedef unsigned int bit_set; typedef unsigned int bit_set;
static opal_list_t active_node_list;
static inline void set_bit(bit_set *set, int bit) static inline void set_bit(bit_set *set, int bit)
{ {
@ -82,8 +86,6 @@ static inline int empty_set(bit_set set)
return set == EMPTY_SET; return set == EMPTY_SET;
} }
static int orte_smr_bproc_get_proc_state(orte_proc_state_t *, int *, orte_process_name_t *);
static int orte_smr_bproc_set_proc_state(orte_process_name_t *, orte_proc_state_t, int);
static int orte_smr_bproc_finalize(void); static int orte_smr_bproc_finalize(void);
/** /**
@ -144,6 +146,8 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni)
struct group *grp; struct group *grp;
orte_gpr_value_t *value; orte_gpr_value_t *value;
int rc; int rc;
orte_smr_node_state_tracker_t *node;
opal_list_item_t *item;
cnt = num_bits(changes); cnt = num_bits(changes);
@ -153,6 +157,9 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni)
if (cnt == 0) if (cnt == 0)
return; return;
/* check and update the general cluster status segment - this segment has entries
* for every node in the cluster, not just the ones we want to monitor
*/
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND, if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
ORTE_BPROC_NODE_SEGMENT, cnt, 0))) { ORTE_BPROC_NODE_SEGMENT, cnt, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -233,7 +240,7 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni)
if (idx != cnt) { if (idx != cnt) {
opal_output(0, "smr_bproc: internal error %d != %d\n", idx, cnt); opal_output(0, "smr_bproc: internal error %d != %d\n", idx, cnt);
free(node_name); free(node_name);
OBJ_RELEASE(value); OBJ_RELEASE(value);
opal_event_del(&mca_smr_bproc_component.notify_event); opal_event_del(&mca_smr_bproc_component.notify_event);
return; return;
@ -257,11 +264,61 @@ static void update_registry(bit_set changes, struct bproc_node_info_t *ni)
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
opal_event_del(&mca_smr_bproc_component.notify_event); opal_event_del(&mca_smr_bproc_component.notify_event);
} }
free(node_name);
OBJ_RELEASE(value); OBJ_RELEASE(value);
/* now let's see if this is one of the nodes we are monitoring and
* update it IFF it the state changed to specified conditions. This
* action will trigger a callback to the right place to decide what
* to do about it
*/
if (mca_smr_bproc_component.monitoring &&
is_set(changes, BIT_NODE_STATE)) {
/* see if this is a node we are monitoring */
for (item = opal_list_get_first(&active_node_list);
item != opal_list_get_end(&active_node_list);
item = opal_list_get_next(item)) {
node = (orte_smr_node_state_tracker_t*)item;
if (0 == strcmp(node->nodename, node_name)) {
/* This is a node we are monitoring. If this is a state we care about,
* and the state has changed (so we only do this once) - trip the alert monitor
*/
if (state != node->state &&
(state == ORTE_NODE_STATE_DOWN || state == ORTE_NODE_STATE_REBOOT)) {
if (ORTE_SUCCESS != (rc = orte_gpr.create_value(&value, ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND,
ORTE_BPROC_NODE_SEGMENT, 1, 0))) {
ORTE_ERROR_LOG(rc);
return;
}
value->tokens[0] = strdup(ORTE_BPROC_NODE_GLOBALS);
if (ORTE_SUCCESS != (rc = orte_gpr.create_keyval(&(value->keyvals[0]),
ORTE_BPROC_NODE_ALERT_CNTR,
ORTE_UNDEF, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return;
}
if ((rc = orte_gpr.increment_value(value)) != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
opal_event_del(&mca_smr_bproc_component.notify_event);
}
OBJ_RELEASE(value);
}
/* update our local records */
node->state = state;
/* cleanup and return - no need to keep searching */
free(node_name);
return;
}
}
}
/* if this isn't someone we are monitoring, or it doesn't meet specified conditions,
* then just cleanup and leave
*/
free(node_name);
} }
static int do_update(struct bproc_node_set_t *ns) static int do_update(struct bproc_node_set_t *ns)
{ {
int i; int i;
@ -311,10 +368,9 @@ static void orte_smr_bproc_notify_handler(int fd, short flags, void *user)
/** /**
* Register a callback to receive BProc update notifications * Register a callback to receive BProc update notifications
*/ */
int orte_smr_bproc_module_init(void) static int orte_smr_bproc_module_init(void)
{ {
int rc; int rc;
struct bproc_node_set_t ns = BPROC_EMPTY_NODESET;
if (mca_smr_bproc_component.debug) if (mca_smr_bproc_component.debug)
opal_output(0, "init smr_bproc_module\n"); opal_output(0, "init smr_bproc_module\n");
@ -323,8 +379,48 @@ int orte_smr_bproc_module_init(void)
mca_smr_bproc_component.node_set.size = 0; mca_smr_bproc_component.node_set.size = 0;
/* construct the monitored node list so we can track who is being monitored */
OBJ_CONSTRUCT(&active_node_list, opal_list_t);
return ORTE_SUCCESS;
}
/*
* Setup to begin monitoring a job
*/
int orte_smr_bproc_begin_monitoring(orte_job_map_t *map, orte_gpr_trigger_cb_fn_t cbfunc, void *user_tag)
{
struct bproc_node_set_t ns = BPROC_EMPTY_NODESET;
/* if our internal structures haven't been initialized, then
* set them up
*/
if (!initialized) {
orte_smr_bproc_module_init();
initialized = true;
}
/* setup the local monitoring list */
for (item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
item = opal_list_get_next(item)) {
node = (orte_mapped_node_t*)item;
newnode = OBJ_NEW(orte_smr_node_state_tracker_t);
newnode->cell = node->cell;
newnode->nodename = strdup(node->nodename);
opal_list_append(&active_node_list, &newnode->super);
}
/* define the alert monitor to call the cbfunc if we trigger the alert */
orte_smr.define_alert_monitor(map->job, ORTE_BPROC_NODE_ALERT_TRIG,
ORTE_BPROC_NODE_ALERT_CNTR,
0, 1, true, cbfunc, user_tag);
/* /*
* Set initial node status * Set initial node status for all nodes in the local cell. We will
* receive reports from them all, but we will only provide alerts
* on those we are actively monitoring
*/ */
if (bproc_nodelist(&ns) < 0) if (bproc_nodelist(&ns) < 0)
@ -334,7 +430,7 @@ int orte_smr_bproc_module_init(void)
bproc_nodeset_free(&ns); bproc_nodeset_free(&ns);
/* /*
* Now regiser notify event * Now register notify event
*/ */
mca_smr_bproc_component.notify_fd = bproc_notifier(); mca_smr_bproc_component.notify_fd = bproc_notifier();
@ -344,42 +440,16 @@ int orte_smr_bproc_module_init(void)
memset(&mca_smr_bproc_component.notify_event, 0, sizeof(opal_event_t)); memset(&mca_smr_bproc_component.notify_event, 0, sizeof(opal_event_t));
opal_event_set( opal_event_set(
&mca_smr_bproc_component.notify_event, &mca_smr_bproc_component.notify_event,
mca_smr_bproc_component.notify_fd, mca_smr_bproc_component.notify_fd,
OPAL_EV_READ|OPAL_EV_PERSIST, OPAL_EV_READ|OPAL_EV_PERSIST,
orte_smr_bproc_notify_handler, orte_smr_bproc_notify_handler,
0); 0);
opal_event_add(&mca_smr_bproc_component.notify_event, 0); opal_event_add(&mca_smr_bproc_component.notify_event, 0);
return ORTE_SUCCESS;
} }
orte_smr_base_module_t orte_smr_bproc_module = {
orte_smr_bproc_get_proc_state,
orte_smr_bproc_set_proc_state,
orte_smr_base_get_node_state_not_available,
orte_smr_base_set_node_state_not_available,
orte_smr_base_get_job_state,
orte_smr_base_set_job_state,
orte_smr_base_begin_monitoring_not_available,
orte_smr_base_init_job_stage_gates,
orte_smr_base_init_orted_stage_gates,
orte_smr_base_define_alert_monitor,
orte_smr_base_job_stage_gate_subscribe,
orte_smr_bproc_finalize
};
static int orte_smr_bproc_get_proc_state(orte_proc_state_t *state, int *status, orte_process_name_t *proc)
{
return orte_smr_base_get_proc_state(state, status, proc);
}
static int orte_smr_bproc_set_proc_state(orte_process_name_t *proc, orte_proc_state_t state, int status)
{
return orte_smr_base_set_proc_state(proc, state, status);
}
/** /**
* Cleanup * Cleanup
*/ */

Просмотреть файл

@ -38,11 +38,18 @@ extern "C" {
#define ORTE_SMR_BPROC_NODE_USER "orte-node-bproc-user" #define ORTE_SMR_BPROC_NODE_USER "orte-node-bproc-user"
#define ORTE_SMR_BPROC_NODE_GROUP "orte-node-bproc-group" #define ORTE_SMR_BPROC_NODE_GROUP "orte-node-bproc-group"
#define ORTE_BPROC_NODE_ALERT_TRIG "orte-bproc-node-alert-trig"
#define ORTE_BPROC_NODE_ALERT_CNTR "orte-bproc-node-alert-cntr"
#define ORTE_BPROC_NODE_GLOBALS "orte-node-bproc-globals"
/** /**
* Module init/fini * Module init/fini
*/ */
int orte_smr_bproc_module_init(void); int orte_smr_bproc_init(void);
int orte_smr_bproc_module_finalize(void); int orte_smr_bproc_finalize(void);
int orte_smr_bproc_begin_monitoring(orte_job_map_t *map,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
struct orte_smr_bproc_component_t { struct orte_smr_bproc_component_t {
orte_smr_base_component_t super; orte_smr_base_component_t super;
@ -50,8 +57,8 @@ struct orte_smr_bproc_component_t {
int priority; int priority;
opal_event_t notify_event; opal_event_t notify_event;
int notify_fd; int notify_fd;
orte_cellid_t cellid;
struct bproc_node_set_t node_set; struct bproc_node_set_t node_set;
bool monitoring;
}; };
typedef struct orte_smr_bproc_component_t orte_smr_bproc_component_t; typedef struct orte_smr_bproc_component_t orte_smr_bproc_component_t;

Просмотреть файл

@ -62,6 +62,21 @@ orte_smr_bproc_component_t mca_smr_bproc_component = {
} }
}; };
orte_smr_base_module_t orte_smr_bproc_module = {
orte_smr_base_get_proc_state,
orte_smr_base_set_proc_state,
orte_smr_base_get_node_state_not_available,
orte_smr_base_set_node_state_not_available,
orte_smr_base_get_job_state,
orte_smr_base_set_job_state,
orte_smr_bproc_begin_monitoring,
orte_smr_base_init_job_stage_gates,
orte_smr_base_init_orted_stage_gates,
orte_smr_base_define_alert_monitor,
orte_smr_base_job_stage_gate_subscribe,
orte_smr_bproc_finalize
};
/** /**
* Utility function to register parameters * Utility function to register parameters
*/ */
@ -85,6 +100,8 @@ static int orte_smr_bproc_open(void)
orte_smr_bproc_param_register_int("debug", 0); orte_smr_bproc_param_register_int("debug", 0);
mca_smr_bproc_component.priority = mca_smr_bproc_component.priority =
orte_smr_bproc_param_register_int("priority", 1); orte_smr_bproc_param_register_int("priority", 1);
mca_smr_bproc_component.monitoring = false;
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -94,11 +111,11 @@ static int orte_smr_bproc_open(void)
static orte_smr_base_module_t* orte_smr_bproc_init(int *priority) static orte_smr_base_module_t* orte_smr_bproc_init(int *priority)
{ {
if (!orte_process_info.seed) if (!orte_process_info.seed) {
return NULL; return NULL;
}
*priority = mca_smr_bproc_component.priority; *priority = mca_smr_bproc_component.priority;
orte_smr_bproc_module_init();
return &orte_smr_bproc_module; return &orte_smr_bproc_module;
} }

Просмотреть файл

@ -37,6 +37,7 @@
#include "orte/mca/gpr/gpr_types.h" #include "orte/mca/gpr/gpr_types.h"
#include "orte/mca/ns/ns_types.h" #include "orte/mca/ns/ns_types.h"
#include "orte/mca/smr/smr_types.h" #include "orte/mca/smr/smr_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {
@ -145,12 +146,14 @@ typedef int (*orte_smr_base_module_define_alert_monitor_fn_t)(orte_jobid_t job,
/* /*
* Initiate monitoring of a job * Initiate monitoring of a job
* This function notifies the smr that it should initiate monitoring of the specified * This function notifies the smr that it should initiate monitoring of the specified
* jobid. It is called by the resource manager once a job has been launched. Calling * jobid. It is called by a PLS component at an appropriate point in the launch procedure. Calling
* the function, allows smr components (e.g., the BProc component that monitors daemons * the function allows smr components (e.g., the BProc component that monitors daemons
* via the BProc-provided centralized alerting system) to make the necessary connections * via the BProc-provided centralized alerting system) to make the necessary connections
* for monitoring the job. * for monitoring the job.
*/ */
typedef int (*orte_smr_base_module_begin_monitoring_fn_t)(orte_jobid_t job); typedef int (*orte_smr_base_module_begin_monitoring_fn_t)(orte_job_map_t *map,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
/* /*
* Subscribe to a job stage gate * Subscribe to a job stage gate

Просмотреть файл

@ -21,6 +21,12 @@
#include "orte_config.h" #include "orte_config.h"
#include "opal/class/opal_list.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/* /*
* Process exit codes * Process exit codes
*/ */
@ -92,4 +98,9 @@ typedef int8_t orte_node_state_t;
/** Node is rebooting (only some systems will support this; see /** Node is rebooting (only some systems will support this; see
orte_node_state_t) */ orte_node_state_t) */
#define ORTE_NODE_STATE_REBOOT 0x03 #define ORTE_NODE_STATE_REBOOT 0x03
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif #endif