1
1
openmpi/orte/mca/rmaps/resilient/rmaps_resilient.c
Ralph Castain b96a71b62e Enable restart of individual processes upon command via the errmgr callback function. It needs an external application to drive this capability, so normal operations shouldn't be affected.
Does not support MPI applications. More work coming to update daemon accounting on movement of procs across nodes.

This commit was SVN r21545.
2009-06-26 20:54:58 +00:00

485 строки
18 KiB
C

/*
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#ifdef HAVE_STRING_H
#include <string.h>
#endif /* HAVE_STRING_H */
#include <stdio.h>
#include <stdlib.h>
#include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/base/rmaps_private.h"
#include "orte/mca/rmaps/base/base.h"
#include "rmaps_resilient.h"
/*
 * Local variables
 */
/* node-list item handed to the base bynode/byslot mappers as the place to
 * start; refreshed from the job's bookmark on every rr_map_default() call
 */
static opal_list_item_t *cur_node_item = NULL;
/* next vpid to hand out while building an initial map: reset to zero at the
 * start of each initial mapping, advanced as procs are placed, and finally
 * stored as the job's total proc count
 */
static orte_vpid_t vpid_start = 0;
/* forward declaration - reads one line of the fault group file (defined at
 * the bottom of this file)
 */
static char *orte_getline(FILE *fp);
/*
 * Default round-robin mapper.
 *
 * Places num_procs procs of the given app_context onto node_list, using
 * the base by-node mapper when the job's policy has ORTE_RMAPS_BYNODE set
 * and the base by-slot mapper otherwise. Numbering begins at the
 * file-level vpid_start, which is advanced by num_procs on success.
 *
 * Returns ORTE_SUCCESS, or the (already logged) error code reported by
 * the base mapper.
 */
static int rr_map_default(orte_job_t *jdata, orte_app_context_t *app,
                          opal_list_t *node_list, orte_vpid_t num_procs)
{
    int status;

    /* pick up where any earlier mapping left its bookmark */
    cur_node_item = orte_rmaps_base_get_starting_point(node_list, jdata);

    /* hand the work to the base mapper that matches the policy */
    if (0 != (ORTE_RMAPS_BYNODE & jdata->map->policy)) {
        status = orte_rmaps_base_map_bynode(jdata, app, node_list,
                                            num_procs, vpid_start,
                                            cur_node_item);
    } else {
        status = orte_rmaps_base_map_byslot(jdata, app, node_list,
                                            num_procs, vpid_start,
                                            cur_node_item, 0);
    }

    if (ORTE_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
        return status;
    }

    /* the next caller starts numbering after the procs we just placed */
    vpid_start += num_procs;
    return ORTE_SUCCESS;
}
/*
 * Reset the bookkeeping flags on every known fault group, then mark a
 * group as "included" when at least one node in node_list has the same
 * name as one of the group's member nodes. The "used" flag is always
 * left cleared.
 */
static void flag_nodes(opal_list_t *node_list)
{
    opal_list_t *groups = &mca_rmaps_resilient_component.fault_grps;
    opal_list_item_t *grp_item, *node_item;
    orte_rmaps_res_ftgrp_t *grp;
    orte_node_t *candidate, *member;
    int idx;

    for (grp_item = opal_list_get_first(groups);
         grp_item != opal_list_get_end(groups);
         grp_item = opal_list_get_next(grp_item)) {
        grp = (orte_rmaps_res_ftgrp_t*)grp_item;

        /* start every group from a clean slate */
        grp->used = false;
        grp->included = false;

        /* walk the candidate nodes until one of them turns out to be
         * a member of this group
         */
        node_item = opal_list_get_first(node_list);
        while (!grp->included && node_item != opal_list_get_end(node_list)) {
            candidate = (orte_node_t*)node_item;
            for (idx = 0; idx < grp->nodes.size && !grp->included; idx++) {
                member = (orte_node_t*)opal_pointer_array_get_item(&grp->nodes, idx);
                /* the pointer array may contain empty slots */
                if (NULL != member && 0 == strcmp(member->name, candidate->name)) {
                    grp->included = true;
                }
            }
            node_item = opal_list_get_next(node_item);
        }
    }
}
/*
 * Map (or re-map) the procs of a job, spreading them across fault groups
 * when a fault group definition is available.
 *
 * The function works in three stages:
 *
 *  1. On first use, if a fault group file was configured, read it and
 *     build the component's list of fault groups. Each line of the file
 *     is a comma-separated list of node names forming one group; names
 *     that are not found in the node pool are silently ignored.
 *
 *  2. If the job's map already contains nodes, treat this as a restart
 *     request: every proc in state ORTE_PROC_STATE_RESTART is removed from
 *     its current node and placed on the least-loaded node of the
 *     least-loaded fault group that does not contain the old node. If no
 *     such group exists, the proc goes to the least-loaded other node in
 *     the current map, or back onto its old node as a last resort.
 *
 *  3. Otherwise build the initial map, one app_context at a time: each
 *     proc goes to a not-yet-used fault group (lowest average load first);
 *     once the groups are exhausted, or when no groups are defined, the
 *     remaining procs are mapped round-robin.
 *
 * Returns ORTE_SUCCESS on success, ORTE_ERROR if the fault group file
 * cannot be opened, or the error code returned by a failing base function.
 */
static int orte_rmaps_resilient_map(orte_job_t *jdata)
{
    orte_job_map_t *map;
    orte_app_context_t *app;
    int i, j, k, totnodes;
    opal_list_t node_list;
    opal_list_item_t *item;
    orte_std_cntr_t num_slots;
    int rc;
    float avgload, minload;
    orte_node_t *node, *nd=NULL, *oldnode;
    orte_rmaps_res_ftgrp_t *ftgrp, *target;
    orte_vpid_t totprocs, lowprocs;
    FILE *fp;
    char *ftinput;
    int grp;
    char **nodes;
    orte_proc_t *proc, *pc;

    /* have we already constructed the fault group list? */
    if (0 == opal_list_get_size(&mca_rmaps_resilient_component.fault_grps) &&
        NULL != mca_rmaps_resilient_component.fault_group_file) {
        /* construct it */
        fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r");
        if (NULL == fp) { /* not found */
            orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found",
                           true, mca_rmaps_resilient_component.fault_group_file);
            return ORTE_ERROR;
        }
        /* build list of fault groups - one group per line of the file */
        grp = 0;
        while (NULL != (ftinput = orte_getline(fp))) {
            ftgrp = OBJ_NEW(orte_rmaps_res_ftgrp_t);
            ftgrp->ftgrp = grp++;
            nodes = opal_argv_split(ftinput, ',');
            /* find the referenced nodes */
            for (k=0; k < opal_argv_count(nodes); k++) {
                for (i=0; i < orte_node_pool->size; i++) {
                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                        continue;
                    }
                    if (0 == strcmp(node->name, nodes[k])) {
                        /* the group holds its own reference to the node */
                        OBJ_RETAIN(node);
                        opal_pointer_array_add(&ftgrp->nodes, node);
                        break;
                    }
                }
            }
            /* the group is recorded even if none of its names matched,
             * so later code must cope with groups that have no nodes
             */
            opal_list_append(&mca_rmaps_resilient_component.fault_grps, &ftgrp->super);
            opal_argv_free(nodes);
            free(ftinput);
        }
        fclose(fp);
    }

    /* the map will never be NULL as we initialize it before getting here,
     * so check to see if any nodes are in the map - this will be our
     * indicator that this is the prior map for a failed job that
     * needs to be re-mapped
     */
    if (0 < jdata->map->num_nodes) {
        /* cycle through all the procs in this job to find the one(s) that failed */
        for (i=0; i < jdata->procs->size; i++) {
            /* get the proc object */
            if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
                continue;
            }
            /* is this proc to be restarted? */
            if (proc->state != ORTE_PROC_STATE_RESTART) {
                continue;
            }
            opal_output(0, "proc %s is to be restarted", ORTE_NAME_PRINT(&proc->name));
            /* it is to be restarted - remove the proc from its current node */
            oldnode = proc->node;
            oldnode->num_procs--;
            /* find this proc on node's pointer array */
            for (k=0; k < oldnode->procs->size; k++) {
                if (NULL == (pc = (orte_proc_t*)opal_pointer_array_get_item(oldnode->procs, k))) {
                    continue;
                }
                if (pc->name.jobid == proc->name.jobid &&
                    pc->name.vpid == proc->name.vpid) {
                    /* NULL that item */
                    opal_pointer_array_set_item(oldnode->procs, k, NULL);
                    break;
                }
            }
            /* if we have fault groups, flag all the fault groups that
             * include this node so we don't reuse them
             */
            target = NULL;
            minload = 1000000.0;
            for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
                 item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
                 item = opal_list_get_next(item)) {
                ftgrp = (orte_rmaps_res_ftgrp_t*)item;
                /* see if the node is in this fault group */
                ftgrp->included = true;
                ftgrp->used = false;
                for (k=0; k < ftgrp->nodes.size; k++) {
                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
                        continue;
                    }
                    if (0 == strcmp(node->name, oldnode->name)) {
                        /* yes - mark it to not be included */
                        ftgrp->included = false;
                        break;
                    }
                }
                /* if this ftgrp is not included, then skip it */
                if (!ftgrp->included) {
                    continue;
                }
                /* compute the load average on this fault group */
                totprocs = 0;
                totnodes = 0;
                for (k=0; k < ftgrp->nodes.size; k++) {
                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
                        continue;
                    }
                    totnodes++;
                    totprocs += node->num_procs;
                }
                /* a group without any nodes cannot host the proc, and
                 * computing its average would divide zero by zero
                 */
                if (0 == totnodes) {
                    continue;
                }
                avgload = (float)totprocs / (float)totnodes;
                /* now find the lightest loaded of the included fault groups */
                if (avgload < minload) {
                    minload = avgload;
                    target = ftgrp;
                }
            }
            /* if no ftgrps are available, then just map it on the lightest loaded
             * node in the current map, avoiding the current node if possible
             */
            if (NULL == target) {
                nd = oldnode;  /* put it back where it was if nothing else is found */
                totprocs = 1000000;
                map = jdata->map;
                for (k=0; k < map->nodes->size; k++) {
                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, k)) ||
                        node == oldnode) {
                        continue;
                    }
                    if (node->num_procs < totprocs) {
                        nd = node;
                        totprocs = node->num_procs;
                    }
                }
                /* put proc on the found node */
                OBJ_RETAIN(nd);  /* required to maintain bookkeeping */
                proc->node = nd;
                opal_pointer_array_add(nd->procs, (void*)proc);
                OBJ_RETAIN(proc);  /* required to maintain bookkeeping */
                nd->num_procs++;
                /* flag the proc state as non-launched so we'll know to launch it */
                proc->state = ORTE_PROC_STATE_INIT;
                /* update the node and local ranks so static ports can
                 * be properly selected if active
                 */
                orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
                OBJ_RELEASE(oldnode);  /* required to maintain bookkeeping */
                continue;
            }
            /* if we did find a target, re-map the proc to the lightest loaded
             * node in that group
             */
            lowprocs = 1000000;
            nd = NULL;
            for (k=0; k < target->nodes.size; k++) {
                if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
                    continue;
                }
                if (node->num_procs < lowprocs) {
                    lowprocs = node->num_procs;
                    nd = node;
                }
            }
            /* put proc on the found node */
            OBJ_RETAIN(nd);  /* required to maintain bookkeeping */
            proc->node = nd;
            /* NOTE(review): unlike the no-fault-group path above, the proc
             * itself is not retained here after being added to the node's
             * array - confirm which of the two is the intended accounting
             */
            opal_pointer_array_add(nd->procs, (void*)proc);
            nd->num_procs++;
            /* flag the proc state as non-launched so we'll know to launch it */
            proc->state = ORTE_PROC_STATE_INIT;
            /* update the node and local ranks so static ports can
             * be properly selected if active
             */
            orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
            OBJ_RELEASE(oldnode);  /* required to maintain bookkeeping */
        }
        /* define the daemons that we will use for this job */
        if (ORTE_SUCCESS != (rc = orte_rmaps_base_define_daemons(jdata->map))) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
        return ORTE_SUCCESS;
    }

    /* CREATE INITIAL MAP FOR A JOB */
    /* we map each app_context separately when creating an initial job map. For
     * each app_context, we get the list of available nodes as this can be
     * app_context specific based on hostfile and -host options. We then organize
     * that list into fault groups based on the fault group definitions, if
     * provided, and then divide the specified number of copies across them in
     * a load-balanced way
     */
    /* start at the beginning... */
    vpid_start = 0;
    jdata->num_procs = 0;
    map = jdata->map;
    for (i=0; i < jdata->apps->size; i++) {
        /* get the app_context */
        if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
            continue;
        }
        /* for each app_context, we have to get the list of nodes that it can
         * use since that can now be modified with a hostfile and/or -host
         * option
         */
        OBJ_CONSTRUCT(&node_list, opal_list_t);
        if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
                                                                   map->policy))) {
            ORTE_ERROR_LOG(rc);
            goto error;
        }
        /* were we given a fault group definition? */
        if (0 < opal_list_get_size(&mca_rmaps_resilient_component.fault_grps)) {
            /* flag the fault groups included by these nodes */
            flag_nodes(&node_list);
            /* map each copy to a different fault group - if more copies are
             * specified than fault groups, then overlap in a round-robin fashion
             */
            for (j=0; j < app->num_procs; j++) {
                /* find unused included fault group with lowest average load - if none
                 * found, then break
                 */
                target = NULL;
                minload = 1000000000.0;
                for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
                     item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
                     item = opal_list_get_next(item)) {
                    ftgrp = (orte_rmaps_res_ftgrp_t*)item;
                    /* if this ftgrp has already been used or is not included, then
                     * skip it
                     */
                    if (ftgrp->used || !ftgrp->included) {
                        continue;
                    }
                    /* compute the load average on this fault group */
                    totprocs = 0;
                    totnodes = 0;
                    for (k=0; k < ftgrp->nodes.size; k++) {
                        if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
                            continue;
                        }
                        totnodes++;
                        totprocs += node->num_procs;
                    }
                    /* never divide by zero, and never target an empty group */
                    if (0 == totnodes) {
                        continue;
                    }
                    avgload = (float)totprocs / (float)totnodes;
                    if (avgload < minload) {
                        minload = avgload;
                        target = ftgrp;
                    }
                }
                /* if we have more procs than fault groups, then we simply
                 * map the remaining procs on available nodes in a round-robin
                 * fashion - it doesn't matter where they go as they will not
                 * be contributing to fault tolerance by definition
                 */
                if (NULL == target) {
                    /* j procs of THIS app_context have been placed so far.
                     * vpid_start cannot be used for the remainder because it
                     * also counts the procs of earlier app_contexts
                     */
                    if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list,
                                                             (orte_vpid_t)(app->num_procs - j)))) {
                        goto error;
                    }
                    goto cleanup;
                }
                /* pick node with lowest load from within that group */
                totprocs = 1000000;
                for (k=0; k < target->nodes.size; k++) {
                    if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
                        continue;
                    }
                    if (node->num_procs < totprocs) {
                        totprocs = node->num_procs;
                        nd = node;
                    }
                }
                /* put proc on that node */
                if (ORTE_SUCCESS != (rc = orte_rmaps_base_claim_slot(jdata, nd, vpid_start, NULL, app->idx,
                                                                     &node_list, jdata->map->oversubscribe, false))) {
                    /** if the code is ORTE_ERR_NODE_FULLY_USED, then we know this
                     * really isn't an error
                     */
                    if (ORTE_ERR_NODE_FULLY_USED != rc) {
                        ORTE_ERROR_LOG(rc);
                        goto error;
                    }
                }
                /* track number of procs mapped */
                vpid_start++;
                /* flag this fault group as used */
                target->used = true;
            }
        } else {
            /* if we don't have a fault group definition, then just map the
             * procs in a round-robin manner
             */
            if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list, app->num_procs))) {
                goto error;
            }
        }

    cleanup:
        /* cleanup the node list - it can differ from one app_context
         * to another, so we have to get it every time
         */
        while (NULL != (item = opal_list_remove_first(&node_list))) {
            OBJ_RELEASE(item);
        }
        OBJ_DESTRUCT(&node_list);
    }

    /* update the number of procs in the job */
    jdata->num_procs = vpid_start;
    /* compute and save convenience values */
    if (ORTE_SUCCESS != (rc = orte_rmaps_base_compute_usage(jdata))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    /* define the daemons that we will use for this job */
    if (ORTE_SUCCESS != (rc = orte_rmaps_base_define_daemons(jdata->map))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    return ORTE_SUCCESS;

error:
    /* release whatever is still on the per-app node list before bailing out */
    while (NULL != (item = opal_list_remove_first(&node_list))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&node_list);
    return rc;
}
/*
 * Module instance for the resilient mapper: its only initialized entry
 * is the mapping function defined above.
 */
orte_rmaps_base_module_t orte_rmaps_resilient_module = {
orte_rmaps_resilient_map
};
/*
 * Read the next line from fp and return it as a newly allocated,
 * NUL-terminated string with the trailing newline (if any) removed.
 *
 * At most 1023 characters are read per call, so a longer line is
 * returned in pieces over successive calls. The trailing character is
 * only stripped when it really is a newline, so a final line without a
 * terminating newline - or a piece cut at the buffer limit - keeps all
 * of its characters.
 *
 * Returns NULL at end of file, on a read error, or if memory cannot be
 * allocated. The caller owns the returned string and must free() it.
 */
static char *orte_getline(FILE *fp)
{
    char input[1024];
    char *buff;
    size_t len;

    if (NULL == fgets(input, sizeof(input), fp)) {
        return NULL;
    }

    len = strlen(input);
    /* remove the newline, but only if one was actually read */
    if (0 < len && '\n' == input[len-1]) {
        input[--len] = '\0';
    }

    buff = (char*)malloc(len + 1);
    if (NULL != buff) {
        memcpy(buff, input, len + 1);  /* include the terminating NUL */
    }
    return buff;
}