170327e575
This commit was SVN r21424.
477 строки
18 KiB
C
477 строки
18 KiB
C
/*
|
|
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
|
|
*
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
#include "orte/constants.h"
|
|
#include "orte/types.h"
|
|
|
|
#include <errno.h>
|
|
#ifdef HAVE_UNISTD_H
|
|
#include <unistd.h>
|
|
#endif /* HAVE_UNISTD_H */
|
|
#ifdef HAVE_STRING_H
|
|
#include <string.h>
|
|
#endif /* HAVE_STRING_H */
|
|
#include <stdio.h>
|
|
|
|
#include "opal/mca/base/mca_base_param.h"
|
|
#include "opal/util/argv.h"
|
|
#include "opal/class/opal_pointer_array.h"
|
|
|
|
#include "orte/util/show_help.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
|
|
#include "orte/mca/rmaps/base/rmaps_private.h"
|
|
#include "orte/mca/rmaps/base/base.h"
|
|
#include "rmaps_resilient.h"
|
|
|
|
|
|
/*
|
|
* Local variable
|
|
*/
|
|
static opal_list_item_t *cur_node_item = NULL;
|
|
static orte_vpid_t vpid_start = 0;
|
|
|
|
static char *orte_getline(FILE *fp);
|
|
|
|
/* default round-robin mapper */
|
|
static int rr_map_default(orte_job_t *jdata, orte_app_context_t *app,
|
|
opal_list_t *node_list, orte_vpid_t num_procs)
|
|
{
|
|
int rc;
|
|
|
|
/* if a bookmark exists from some prior mapping, set us to start there */
|
|
cur_node_item = orte_rmaps_base_get_starting_point(node_list, jdata);
|
|
|
|
/* now perform the mapping */
|
|
if (ORTE_RMAPS_BYNODE & jdata->map->policy) {
|
|
if (ORTE_SUCCESS != (rc = orte_rmaps_base_map_bynode(jdata, app, node_list,
|
|
num_procs, vpid_start,
|
|
cur_node_item))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
} else {
|
|
if (ORTE_SUCCESS != (rc = orte_rmaps_base_map_byslot(jdata, app, node_list,
|
|
num_procs, vpid_start,
|
|
cur_node_item, 0))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
/* update the starting vpid */
|
|
vpid_start += num_procs;
|
|
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
/*
 * Reset the per-mapping flags on every known fault group and mark as
 * "included" each group that has at least one member whose name matches
 * a node in node_list.  The "used" flag of every group is cleared.
 */
static void flag_nodes(opal_list_t *node_list)
{
    opal_list_item_t *grp_item, *node_item;
    orte_rmaps_res_ftgrp_t *group;
    orte_node_t *candidate, *member;
    int idx;

    for (grp_item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
         grp_item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
         grp_item = opal_list_get_next(grp_item)) {
        group = (orte_rmaps_res_ftgrp_t*)grp_item;

        /* start from a clean slate */
        group->used = false;
        group->included = false;

        /* walk the candidate nodes until one of them is found in this group */
        node_item = opal_list_get_first(node_list);
        while (!group->included && node_item != opal_list_get_end(node_list)) {
            candidate = (orte_node_t*)node_item;
            for (idx = 0; idx < group->nodes.size; idx++) {
                member = (orte_node_t*)opal_pointer_array_get_item(&group->nodes, idx);
                /* skip empty array slots; a name match is all that is needed */
                if (NULL != member && 0 == strcmp(member->name, candidate->name)) {
                    group->included = true;
                    break;
                }
            }
            node_item = opal_list_get_next(node_item);
        }
    }
}
|
|
|
|
|
|
/*
|
|
* Loadbalance the cluster
|
|
*/
|
|
static int orte_rmaps_resilient_map(orte_job_t *jdata)
|
|
{
|
|
orte_job_map_t *map;
|
|
orte_app_context_t *app;
|
|
int i, j, k, totnodes;
|
|
opal_list_t node_list;
|
|
opal_list_item_t *item;
|
|
orte_std_cntr_t num_slots;
|
|
int rc;
|
|
float avgload, minload;
|
|
orte_node_t *node, *nd, *oldnode;
|
|
orte_rmaps_res_ftgrp_t *ftgrp, *target;
|
|
orte_vpid_t totprocs, lowprocs;
|
|
FILE *fp;
|
|
char *ftinput;
|
|
int grp;
|
|
char **nodes;
|
|
bool found;
|
|
orte_proc_t *proc, *pc;
|
|
|
|
/* have we already constructed the fault group list? */
|
|
if (0 == opal_list_get_size(&mca_rmaps_resilient_component.fault_grps) &&
|
|
NULL != mca_rmaps_resilient_component.fault_group_file) {
|
|
/* construct it */
|
|
fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r");
|
|
if (NULL == fp) { /* not found */
|
|
orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found",
|
|
true, mca_rmaps_resilient_component.fault_group_file);
|
|
return ORTE_ERROR;
|
|
}
|
|
/* build list of fault groups */
|
|
grp = 0;
|
|
while (NULL != (ftinput = orte_getline(fp))) {
|
|
ftgrp = OBJ_NEW(orte_rmaps_res_ftgrp_t);
|
|
ftgrp->ftgrp = grp++;
|
|
nodes = opal_argv_split(ftinput, ',');
|
|
/* find the referenced nodes */
|
|
for (k=0; k < opal_argv_count(nodes); k++) {
|
|
found = false;
|
|
for (i=0; i < orte_node_pool->size && !found; i++) {
|
|
if (NULL == (node = opal_pointer_array_get_item(orte_node_pool, i))) {
|
|
continue;
|
|
}
|
|
if (0 == strcmp(node->name, nodes[k])) {
|
|
OBJ_RETAIN(node);
|
|
opal_pointer_array_add(&ftgrp->nodes, node);
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
opal_list_append(&mca_rmaps_resilient_component.fault_grps, &ftgrp->super);
|
|
opal_argv_free(nodes);
|
|
free(ftinput);
|
|
}
|
|
fclose(fp);
|
|
}
|
|
|
|
/* the map will never be NULL as we initialize it before getting here,
|
|
* so check to see if any nodes are in the map - this will be our
|
|
* indicator that this is the prior map for a failed job that
|
|
* needs to be re-mapped
|
|
*/
|
|
if (0 < jdata->map->num_nodes) {
|
|
/* cycle through all the procs in this job to find the one(s) that failed */
|
|
for (i=0; i < jdata->procs->size; i++) {
|
|
/* get the proc object */
|
|
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
|
|
continue;
|
|
}
|
|
/* is this proc to be restarted? */
|
|
if (proc->state != ORTE_PROC_STATE_RESTART) {
|
|
continue;
|
|
}
|
|
|
|
/* it is to be restarted - remove the proc from its current node */
|
|
oldnode = proc->node;
|
|
oldnode->num_procs--;
|
|
/* find this proc on node's pointer array */
|
|
for (k=0; k < oldnode->procs->size; k++) {
|
|
if (NULL == (pc = (orte_proc_t*)opal_pointer_array_get_item(oldnode->procs, k))) {
|
|
continue;
|
|
}
|
|
if (pc->name.jobid == proc->name.jobid &&
|
|
pc->name.vpid == proc->name.vpid) {
|
|
/* NULL that item */
|
|
opal_pointer_array_set_item(oldnode->procs, k, NULL);
|
|
break;
|
|
}
|
|
}
|
|
/* if we have fault groups, flag all the fault groups that
|
|
* include this node so we don't reuse them
|
|
*/
|
|
target = NULL;
|
|
minload = 1000000.0;
|
|
for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
|
|
item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
|
|
item = opal_list_get_next(item)) {
|
|
ftgrp = (orte_rmaps_res_ftgrp_t*)item;
|
|
/* see if the node is in this fault group */
|
|
ftgrp->included = true;
|
|
ftgrp->used = false;
|
|
for (k=0; k < ftgrp->nodes.size; k++) {
|
|
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
|
|
continue;
|
|
}
|
|
if (0 == strcmp(node->name, proc->node->name)) {
|
|
/* yes - mark it to not be included */
|
|
ftgrp->included = false;
|
|
}
|
|
}
|
|
/* if this ftgrp is not included, then skip it */
|
|
if (!ftgrp->included) {
|
|
continue;
|
|
}
|
|
/* compute the load average on this fault group */
|
|
totprocs = 0;
|
|
totnodes = 0;
|
|
for (k=0; k < ftgrp->nodes.size; k++) {
|
|
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
|
|
continue;
|
|
}
|
|
totnodes++;
|
|
totprocs += node->num_procs;
|
|
if (node->num_procs < lowprocs) {
|
|
lowprocs = node->num_procs;
|
|
nd = node;
|
|
}
|
|
}
|
|
avgload = (float)totprocs / (float)totnodes;
|
|
/* now find the lightest loaded of the included fault groups */
|
|
if (avgload < minload) {
|
|
minload = avgload;
|
|
target = ftgrp;
|
|
}
|
|
}
|
|
/* if no ftgrps are available, then just map it on the lightest loaded
|
|
* node in the current map, avoiding the current node if possible
|
|
*/
|
|
if (NULL == target) {
|
|
nd = oldnode; /* put it back where it was if nothing else is found */
|
|
totprocs = 1000000;
|
|
map = jdata->map;
|
|
for (k=0; k < map->nodes->size; k++) {
|
|
if (NULL == (node = opal_pointer_array_get_item(map->nodes, k)) ||
|
|
node == oldnode) {
|
|
continue;
|
|
}
|
|
if (node->num_procs < totprocs) {
|
|
nd = node;
|
|
totprocs = node->num_procs;
|
|
}
|
|
}
|
|
/* put proc on the found node */
|
|
OBJ_RETAIN(nd); /* required to maintain bookeeping */
|
|
proc->node = nd;
|
|
opal_pointer_array_add(nd->procs, (void*)proc);
|
|
nd->num_procs++;
|
|
/* flag the proc state as non-launched so we'll know to launch it */
|
|
proc->state = ORTE_PROC_STATE_INIT;
|
|
/* update the node and local ranks so static ports can
|
|
* be properly selected if active
|
|
*/
|
|
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
|
|
OBJ_RELEASE(oldnode); /* required to maintain bookeeping */
|
|
continue;
|
|
}
|
|
/* if we did find a target, re-map the proc to the lightest loaded
|
|
* node in that group
|
|
*/
|
|
lowprocs = 1000000;
|
|
nd = NULL;
|
|
for (k=0; k < target->nodes.size; k++) {
|
|
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
|
|
continue;
|
|
}
|
|
if (node->num_procs < lowprocs) {
|
|
lowprocs = node->num_procs;
|
|
nd = node;
|
|
}
|
|
}
|
|
/* put proc on the found node */
|
|
OBJ_RETAIN(nd); /* required to maintain bookeeping */
|
|
proc->node = nd;
|
|
opal_pointer_array_add(nd->procs, (void*)proc);
|
|
nd->num_procs++;
|
|
/* flag the proc state as non-launched so we'll know to launch it */
|
|
proc->state = ORTE_PROC_STATE_INIT;
|
|
/* update the node and local ranks so static ports can
|
|
* be properly selected if active
|
|
*/
|
|
orte_rmaps_base_update_usage(jdata, oldnode, nd, proc);
|
|
OBJ_RELEASE(oldnode); /* required to maintain bookeeping */
|
|
}
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
|
|
/* CREATE INITIAL MAP FOR A JOB */
|
|
/* we map each app_context separately when creating an initial job map. For
|
|
* each app_context, we get the list of available nodes as this can be
|
|
* app_context specific based on hostfile and -host options. We then organize
|
|
* that list into fault groups based on the fault group definitions, if
|
|
* provided, and then divide the specified number of copies across them in
|
|
* a load-balanced way
|
|
*/
|
|
|
|
/* start at the beginning... */
|
|
vpid_start = 0;
|
|
jdata->num_procs = 0;
|
|
map = jdata->map;
|
|
|
|
for (i=0; i < jdata->apps->size; i++) {
|
|
/* get the app_context */
|
|
if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
|
|
continue;
|
|
}
|
|
/* for each app_context, we have to get the list of nodes that it can
|
|
* use since that can now be modified with a hostfile and/or -host
|
|
* option
|
|
*/
|
|
OBJ_CONSTRUCT(&node_list, opal_list_t);
|
|
if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
|
|
map->policy))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto error;
|
|
}
|
|
/* were we given a fault group definition? */
|
|
if (0 < opal_list_get_size(&mca_rmaps_resilient_component.fault_grps)) {
|
|
/* flag the fault groups included by these nodes */
|
|
flag_nodes(&node_list);
|
|
/* map each copy to a different fault group - if more copies are
|
|
* specified than fault groups, then overlap in a round-robin fashion
|
|
*/
|
|
for (j=0; j < app->num_procs; j++) {
|
|
/* find unused included fault group with lowest average load - if none
|
|
* found, then break
|
|
*/
|
|
target = NULL;
|
|
minload = 1000000000.0;
|
|
for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps);
|
|
item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps);
|
|
item = opal_list_get_next(item)) {
|
|
ftgrp = (orte_rmaps_res_ftgrp_t*)item;
|
|
/* if this ftgrp has already been used or is not included, then
|
|
* skip it
|
|
*/
|
|
if (ftgrp->used || !ftgrp->included) {
|
|
continue;
|
|
}
|
|
/* compute the load average on this fault group */
|
|
totprocs = 0;
|
|
totnodes = 0;
|
|
for (k=0; k < ftgrp->nodes.size; k++) {
|
|
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) {
|
|
continue;
|
|
}
|
|
totnodes++;
|
|
totprocs += node->num_procs;
|
|
}
|
|
avgload = (float)totprocs / (float)totnodes;
|
|
if (avgload < minload) {
|
|
minload = avgload;
|
|
target = ftgrp;
|
|
}
|
|
}
|
|
/* if we have more procs than fault groups, then we simply
|
|
* map the remaining procs on available nodes in a round-robin
|
|
* fashion - it doesn't matter where they go as they will not
|
|
* be contributing to fault tolerance by definition
|
|
*/
|
|
if (NULL == target) {
|
|
if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list, app->num_procs-vpid_start))) {
|
|
goto error;
|
|
}
|
|
goto cleanup;
|
|
}
|
|
/* pick node with lowest load from within that group */
|
|
totprocs = 1000000;
|
|
for (k=0; k < target->nodes.size; k++) {
|
|
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) {
|
|
continue;
|
|
}
|
|
if (node->num_procs < totprocs) {
|
|
totprocs = node->num_procs;
|
|
nd = node;
|
|
}
|
|
}
|
|
/* put proc on that node */
|
|
if (ORTE_SUCCESS != (rc = orte_rmaps_base_claim_slot(jdata, nd, vpid_start, NULL, app->idx,
|
|
&node_list, jdata->map->oversubscribe, false))) {
|
|
/** if the code is ORTE_ERR_NODE_FULLY_USED, then we know this
|
|
* really isn't an error
|
|
*/
|
|
if (ORTE_ERR_NODE_FULLY_USED != rc) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto error;
|
|
}
|
|
}
|
|
/* track number of procs mapped */
|
|
vpid_start++;
|
|
|
|
/* flag this fault group as used */
|
|
target->used = true;
|
|
}
|
|
} else {
|
|
/* if we don't have a fault group definition, then just map the
|
|
* procs in a round-robin manner
|
|
*/
|
|
if (ORTE_SUCCESS != (rc = rr_map_default(jdata, app, &node_list, app->num_procs))) {
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
cleanup:
|
|
/* cleanup the node list - it can differ from one app_context
|
|
* to another, so we have to get it every time
|
|
*/
|
|
while (NULL != (item = opal_list_remove_first(&node_list))) {
|
|
OBJ_RELEASE(item);
|
|
}
|
|
OBJ_DESTRUCT(&node_list);
|
|
}
|
|
|
|
/* update the number of procs in the job */
|
|
jdata->num_procs = vpid_start;
|
|
|
|
/* compute and save convenience values */
|
|
if (ORTE_SUCCESS != (rc = orte_rmaps_base_compute_usage(jdata))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return rc;
|
|
}
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
error:
|
|
while (NULL != (item = opal_list_remove_first(&node_list))) {
|
|
OBJ_RELEASE(item);
|
|
}
|
|
OBJ_DESTRUCT(&node_list);
|
|
|
|
return rc;
|
|
}
|
|
|
|
orte_rmaps_base_module_t orte_rmaps_resilient_module = {
|
|
orte_rmaps_resilient_map
|
|
};
|
|
|
|
/*
 * Read the next line from fp and return it as a newly allocated string
 * with the line terminator ("\n" or "\r\n") removed.
 *
 * The terminator is stripped only when it is actually present, so the
 * last line of a file that does not end in a newline is returned intact.
 * Lines longer than the internal buffer (1023 characters) are returned
 * in consecutive pieces across successive calls.
 *
 * Returns NULL at end of file, on a read error, or if memory cannot be
 * allocated.  The caller owns the returned string and must free() it.
 */
static char *orte_getline(FILE *fp)
{
    char input[1024];
    char *buff;
    size_t len;

    if (NULL == fgets(input, sizeof(input), fp)) {
        return NULL;
    }

    /* length of the text up to, but not including, any line terminator */
    len = strcspn(input, "\r\n");

    buff = (char*)malloc(len + 1);
    if (NULL == buff) {
        return NULL;
    }
    memcpy(buff, input, len);
    buff[len] = '\0';

    return buff;
}
|
|
|