/* * Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2009-2010 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "orte_config.h" #include "orte/constants.h" #include "orte/types.h" #include #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ #ifdef HAVE_STRING_H #include #endif /* HAVE_STRING_H */ #include #include "opal/util/argv.h" #include "opal/class/opal_pointer_array.h" #include "orte/util/error_strings.h" #include "orte/util/show_help.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmaps/base/rmaps_private.h" #include "orte/mca/rmaps/base/base.h" #include "rmaps_resilient.h" /* * Local variable */ static char *orte_getline(FILE *fp); static bool have_ftgrps=false, made_ftgrps=false; static int construct_ftgrps(void); static int get_ftgrp_target(orte_proc_t *proc, orte_rmaps_res_ftgrp_t **target, orte_node_t **nd); static int get_new_node(orte_proc_t *proc, orte_app_context_t *app, orte_job_map_t *map, orte_node_t **ndret); static int map_to_ftgrps(orte_job_t *jdata); /* * Loadbalance the cluster */ static int orte_rmaps_resilient_map(orte_job_t *jdata) { orte_app_context_t *app; int i, j; int rc = ORTE_SUCCESS; orte_node_t *nd=NULL, *oldnode, *node, *nptr; orte_rmaps_res_ftgrp_t *target = NULL; orte_proc_t *proc; orte_vpid_t totprocs; opal_list_t node_list; orte_std_cntr_t num_slots; opal_list_item_t *item; mca_base_component_t *c = &mca_rmaps_resilient_component.super.base_version; bool found; if (!(ORTE_JOB_CONTROL_RESTART & jdata->controls)) { if (NULL != jdata->map->req_mapper && 0 != strcasecmp(jdata->map->req_mapper, c->mca_component_name)) { /* a mapper has been specified, and it isn't me */ opal_output_verbose(5, orte_rmaps_base_framework.framework_output, "mca:rmaps:resilient: job %s not using resilient mapper", ORTE_JOBID_PRINT(jdata->jobid)); return ORTE_ERR_TAKE_NEXT_OPTION; } if (NULL == mca_rmaps_resilient_component.fault_group_file) { opal_output_verbose(5, orte_rmaps_base_framework.framework_output, "mca:rmaps:resilient: cannot perform initial map of job %s - no fault groups", ORTE_JOBID_PRINT(jdata->jobid)); return ORTE_ERR_TAKE_NEXT_OPTION; } } else if (!(ORTE_JOB_CONTROL_PROCS_MIGRATING & jdata->controls)) { opal_output_verbose(5, orte_rmaps_base_framework.framework_output, "mca:rmaps:resilient: cannot map job %s - not in restart or migrating", ORTE_JOBID_PRINT(jdata->jobid)); return ORTE_ERR_TAKE_NEXT_OPTION; } opal_output_verbose(5, orte_rmaps_base_framework.framework_output, "mca:rmaps:resilient: mapping job %s", ORTE_JOBID_PRINT(jdata->jobid)); /* flag that I did the mapping */ if (NULL != jdata->map->last_mapper) { free(jdata->map->last_mapper); } jdata->map->last_mapper = strdup(c->mca_component_name); /* have we already constructed the fault group list? */ if (!made_ftgrps) { construct_ftgrps(); } if (ORTE_JOB_STATE_INIT == jdata->state) { /* this is an initial map - let the fault group mapper * handle it */ return map_to_ftgrps(jdata); } /* * NOTE: if a proc is being ADDED to an existing job, then its * node field will be NULL. */ OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: remapping job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdata->jobid))); /* cycle through all the procs in this job to find the one(s) that failed */ for (i=0; i < jdata->procs->size; i++) { /* get the proc object */ if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { continue; } OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output, "%s PROC %s STATE %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), orte_proc_state_to_str(proc->state))); /* is this proc to be restarted? */ if (proc->state != ORTE_PROC_STATE_RESTART) { continue; } /* save the current node */ oldnode = proc->node; /* point to the app */ app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proc->app_idx); if( NULL == app ) { ORTE_ERROR_LOG(ORTE_ERR_FAILED_TO_MAP); rc = ORTE_ERR_FAILED_TO_MAP; goto error; } if (NULL == oldnode) { OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: proc %s is to be started", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name))); } else { OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: proc %s from node %s[%s] is to be restarted", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), (NULL == oldnode->name) ? "NULL" : oldnode->name, (NULL == oldnode->daemon) ? "--" : ORTE_VPID_PRINT(oldnode->daemon->name.vpid))); } if (NULL == oldnode) { /* this proc was not previously running - likely it is being added * to the job. So place it on the node with the fewest procs to * balance the load */ OBJ_CONSTRUCT(&node_list, opal_list_t); if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app, jdata->map->mapping, false, false))) { ORTE_ERROR_LOG(rc); while (NULL != (item = opal_list_remove_first(&node_list))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&node_list); goto error; } if (opal_list_is_empty(&node_list)) { /* put the proc on "hold" until resources are available */ OBJ_DESTRUCT(&node_list); proc->state = ORTE_PROC_STATE_MIGRATING; rc = ORTE_ERR_OUT_OF_RESOURCE; goto error; } totprocs = 1000000; nd = NULL; while (NULL != (item = opal_list_remove_first(&node_list))) { node = (orte_node_t*)item; if (node->num_procs < totprocs) { nd = node; totprocs = node->num_procs; } OBJ_RELEASE(item); /* maintain accounting */ } OBJ_DESTRUCT(&node_list); /* we already checked to ensure there was at least one node, * so we couldn't have come out of the loop with nd=NULL */ OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: Placing new process on node %s[%s] (no ftgrp)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nd->name, (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid))); } else { OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: proc %s from node %s is to be restarted", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), (NULL == proc->node) ? "NULL" : proc->node->name)); /* if we have fault groups, use them */ if (have_ftgrps) { if (ORTE_SUCCESS != (rc = get_ftgrp_target(proc, &target, &nd))) { ORTE_ERROR_LOG(rc); goto error; } OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: placing proc %s into fault group %d node %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), target->ftgrp, nd->name)); } else { if (ORTE_SUCCESS != (rc = get_new_node(proc, app, jdata->map, &nd))) { ORTE_ERROR_LOG(rc); return rc; } } } /* add node to map if necessary - nothing we can do here * but search for it */ found = false; for (j=0; j < jdata->map->nodes->size; j++) { if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, j))) { continue; } if (nptr == nd) { found = true; break; } } if (!found) { OBJ_RETAIN(nd); opal_pointer_array_add(jdata->map->nodes, nd); nd->mapped = true; } OBJ_RETAIN(nd); /* maintain accounting on object */ proc->node = nd; proc->nodename = nd->name; nd->num_procs++; opal_pointer_array_add(nd->procs, (void*)proc); /* retain the proc struct so that we correctly track its release */ OBJ_RETAIN(proc); /* flag the proc state as non-launched so we'll know to launch it */ proc->state = ORTE_PROC_STATE_INIT; /* update the node and local ranks so static ports can * be properly selected if active */ orte_rmaps_base_update_local_ranks(jdata, oldnode, nd, proc); } error: return rc; } orte_rmaps_base_module_t orte_rmaps_resilient_module = { orte_rmaps_resilient_map }; static char *orte_getline(FILE *fp) { char *ret, *buff; char input[1024]; ret = fgets(input, 1024, fp); if (NULL != ret) { input[strlen(input)-1] = '\0'; /* remove newline */ buff = strdup(input); return buff; } return NULL; } static int construct_ftgrps(void) { orte_rmaps_res_ftgrp_t *ftgrp; orte_node_t *node; FILE *fp; char *ftinput; int grp; char **nodes; bool found; int i, k; /* flag that we did this */ made_ftgrps = true; if (NULL == mca_rmaps_resilient_component.fault_group_file) { /* nothing to build */ return ORTE_SUCCESS; } /* construct it */ OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: constructing fault groups", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); fp = fopen(mca_rmaps_resilient_component.fault_group_file, "r"); if (NULL == fp) { /* not found */ orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:file-not-found", true, mca_rmaps_resilient_component.fault_group_file); return ORTE_ERR_FAILED_TO_MAP; } /* build list of fault groups */ grp = 0; while (NULL != (ftinput = orte_getline(fp))) { ftgrp = OBJ_NEW(orte_rmaps_res_ftgrp_t); ftgrp->ftgrp = grp++; nodes = opal_argv_split(ftinput, ','); /* find the referenced nodes */ for (k=0; k < opal_argv_count(nodes); k++) { found = false; for (i=0; i < orte_node_pool->size && !found; i++) { if (NULL == (node = opal_pointer_array_get_item(orte_node_pool, i))) { continue; } if (0 == strcmp(node->name, nodes[k])) { OBJ_RETAIN(node); OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: adding node %s to fault group %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), node->name, ftgrp->ftgrp)); opal_pointer_array_add(&ftgrp->nodes, node); found = true; break; } } } opal_list_append(&mca_rmaps_resilient_component.fault_grps, &ftgrp->super); opal_argv_free(nodes); free(ftinput); } fclose(fp); /* flag that we have fault grps */ have_ftgrps = true; return ORTE_SUCCESS; } static int get_ftgrp_target(orte_proc_t *proc, orte_rmaps_res_ftgrp_t **tgt, orte_node_t **ndret) { opal_list_item_t *item; int k, totnodes; orte_node_t *node, *nd; orte_rmaps_res_ftgrp_t *target, *ftgrp; float avgload, minload; orte_vpid_t totprocs, lowprocs; /* set defaults */ *tgt = NULL; *ndret = NULL; /* flag all the fault groups that * include this node so we don't reuse them */ minload = 1000000.0; target = NULL; for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps); item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps); item = opal_list_get_next(item)) { ftgrp = (orte_rmaps_res_ftgrp_t*)item; /* see if the node is in this fault group */ ftgrp->included = true; ftgrp->used = false; for (k=0; k < ftgrp->nodes.size; k++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) { continue; } if (NULL != proc->node && 0 == strcmp(node->name, proc->node->name)) { /* yes - mark it to not be included */ OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: node %s is in fault group %d, which will be excluded", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), proc->node->name, ftgrp->ftgrp)); ftgrp->included = false; break; } } /* if this ftgrp is not included, then skip it */ if (!ftgrp->included) { continue; } /* compute the load average on this fault group */ totprocs = 0; totnodes = 0; for (k=0; k < ftgrp->nodes.size; k++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) { continue; } totnodes++; totprocs += node->num_procs; } avgload = (float)totprocs / (float)totnodes; /* now find the lightest loaded of the included fault groups */ if (avgload < minload) { minload = avgload; target = ftgrp; OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: found new min load ftgrp %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ftgrp->ftgrp)); } } if (NULL == target) { /* nothing found */ return ORTE_ERR_NOT_FOUND; } /* if we did find a target, re-map the proc to the lightest loaded * node in that group */ lowprocs = 1000000; nd = NULL; for (k=0; k < target->nodes.size; k++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) { continue; } if (node->num_procs < lowprocs) { lowprocs = node->num_procs; nd = node; } } /* return the results */ *tgt = target; *ndret = nd; return ORTE_SUCCESS; } static int get_new_node(orte_proc_t *proc, orte_app_context_t *app, orte_job_map_t *map, orte_node_t **ndret) { orte_node_t *nd, *oldnode, *node; orte_proc_t *pptr; int rc, j; opal_list_t node_list, candidates; opal_list_item_t *item, *next; orte_std_cntr_t num_slots; bool found; /* set defaults */ *ndret = NULL; nd = NULL; oldnode = proc->node; /* * Get a list of all nodes */ OBJ_CONSTRUCT(&node_list, opal_list_t); if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app, map->mapping, false, false))) { ORTE_ERROR_LOG(rc); goto release; } if (opal_list_is_empty(&node_list)) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto release; } if (1 == opal_list_get_size(&node_list)) { /* if we have only one node, all we can do is put the proc on that * node, even if it is the same one - better than not restarting at * all */ nd = (orte_node_t*)opal_list_get_first(&node_list); proc->prior_node = oldnode; OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: Placing process %s on node %s[%s] (only one avail node)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), nd->name, (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid))); goto release; } /* * Cycle thru the list, transferring * all available nodes to the candidate list * so we can get them in the right order * */ OBJ_CONSTRUCT(&candidates, opal_list_t); while (NULL != (item = opal_list_remove_first(&node_list))) { node = (orte_node_t*)item; /* don't put it back on current node */ if (node == oldnode) { OBJ_RELEASE(item); continue; } if (0 == node->num_procs) { OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output, "%s PREPENDING EMPTY NODE %s[%s] TO CANDIDATES", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == node->name) ? "NULL" : node->name, (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid))); opal_list_prepend(&candidates, item); } else { OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output, "%s APPENDING NON-EMPTY NODE %s[%s] TO CANDIDATES", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == node->name) ? "NULL" : node->name, (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid))); opal_list_append(&candidates, item); } } /* search the candidates * try to use a semi-intelligent selection logic here that: * * (a) avoids putting the proc on a node where a peer is already * located as this degrades our fault tolerance * * (b) avoids "ricochet effect" where a process would ping-pong * between two nodes as it fails */ nd = NULL; item = opal_list_get_first(&candidates); while (item != opal_list_get_end(&candidates)) { node = (orte_node_t*)item; next = opal_list_get_next(item); /* don't return to our prior location to avoid * "ricochet" effect */ if (NULL != proc->prior_node && node == proc->prior_node) { OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output, "%s REMOVING PRIOR NODE %s[%s] FROM CANDIDATES", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == node->name) ? "NULL" : node->name, (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid))); opal_list_remove_item(&candidates, item); OBJ_RELEASE(item); /* maintain acctg */ item = next; continue; } /* if this node is empty, then it is the winner */ if (0 == node->num_procs) { nd = node; proc->prior_node = oldnode; break; } /* if this node has someone from my job, then skip it * to avoid (a) */ found = false; for (j=0; j < node->procs->size; j++) { if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(node->procs, j))) { continue; } if (pptr->name.jobid == proc->name.jobid) { OPAL_OUTPUT_VERBOSE((7, orte_rmaps_base_framework.framework_output, "%s FOUND PEER %s ON NODE %s[%s]", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pptr->name), (NULL == node->name) ? "NULL" : node->name, (NULL == node->daemon) ? "--" : ORTE_VPID_PRINT(node->daemon->name.vpid))); found = true; break; } } if (found) { item = next; continue; } /* get here if all tests pass - take this node */ nd = node; proc->prior_node = oldnode; break; } if (NULL == nd) { /* didn't find anything */ if (NULL != proc->prior_node) { nd = proc->prior_node; OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: Placing process %s on prior node %s[%s] (no ftgrp)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), (NULL == nd->name) ? "NULL" : nd->name, (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid))); } else { nd = oldnode; proc->prior_node = oldnode; OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: Placing process %s back on old node %s[%s] (no ftgrp)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc->name), (NULL == nd->name) ? "NULL" : nd->name, (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid))); } } /* cleanup candidate list */ while (NULL != (item = opal_list_remove_first(&candidates))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&candidates); release: OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: Placing process on node %s[%s] (no ftgrp)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == nd->name) ? "NULL" : nd->name, (NULL == nd->daemon) ? "--" : ORTE_VPID_PRINT(nd->daemon->name.vpid))); while (NULL != (item = opal_list_remove_first(&node_list))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&node_list); *ndret = nd; return rc; } static void flag_nodes(opal_list_t *node_list) { opal_list_item_t *item, *nitem; orte_node_t *node, *nd; orte_rmaps_res_ftgrp_t *ftgrp; int k; for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps); item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps); item = opal_list_get_next(item)) { ftgrp = (orte_rmaps_res_ftgrp_t*)item; /* reset the flags */ ftgrp->used = false; ftgrp->included = false; /* if at least one node in our list is included in this * ftgrp, then flag it as included */ for (nitem = opal_list_get_first(node_list); !ftgrp->included && nitem != opal_list_get_end(node_list); nitem = opal_list_get_next(nitem)) { node = (orte_node_t*)nitem; for (k=0; k < ftgrp->nodes.size; k++) { if (NULL == (nd = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) { continue; } if (0 == strcmp(nd->name, node->name)) { ftgrp->included = true; break; } } } } } static int map_to_ftgrps(orte_job_t *jdata) { orte_job_map_t *map; orte_app_context_t *app; int i, j, k, totnodes; opal_list_t node_list; opal_list_item_t *item, *next, *curitem; orte_std_cntr_t num_slots; int rc = ORTE_SUCCESS; float avgload, minload; orte_node_t *node, *nd=NULL; orte_rmaps_res_ftgrp_t *ftgrp, *target = NULL; orte_vpid_t totprocs, num_assigned; bool initial_map=true; OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: creating initial map for job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdata->jobid))); /* start at the beginning... */ jdata->num_procs = 0; map = jdata->map; for (i=0; i < jdata->apps->size; i++) { /* get the app_context */ if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) { continue; } /* you cannot use this mapper unless you specify the number of procs to * launch for each app */ if (0 == app->num_procs) { orte_show_help("help-orte-rmaps-resilient.txt", "orte-rmaps-resilient:num-procs", true); return ORTE_ERR_SILENT; } num_assigned = 0; /* for each app_context, we have to get the list of nodes that it can * use since that can now be modified with a hostfile and/or -host * option */ OBJ_CONSTRUCT(&node_list, opal_list_t); if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app, map->mapping, initial_map, false))) { ORTE_ERROR_LOG(rc); return rc; } /* flag that all subsequent requests should not reset the node->mapped flag */ initial_map = false; /* remove all nodes that are not "up" or do not have a running daemon on them */ item = opal_list_get_first(&node_list); while (item != opal_list_get_end(&node_list)) { next = opal_list_get_next(item); node = (orte_node_t*)item; if (ORTE_NODE_STATE_UP != node->state || NULL == node->daemon || ORTE_PROC_STATE_RUNNING != node->daemon->state) { opal_list_remove_item(&node_list, item); OBJ_RELEASE(item); } item = next; } curitem = opal_list_get_first(&node_list); /* flag the fault groups included by these nodes */ flag_nodes(&node_list); /* map each copy to a different fault group - if more copies are * specified than fault groups, then overlap in a round-robin fashion */ for (j=0; j < app->num_procs; j++) { /* find unused included fault group with lowest average load - if none * found, then break */ target = NULL; minload = 1000000000.0; for (item = opal_list_get_first(&mca_rmaps_resilient_component.fault_grps); item != opal_list_get_end(&mca_rmaps_resilient_component.fault_grps); item = opal_list_get_next(item)) { ftgrp = (orte_rmaps_res_ftgrp_t*)item; OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: fault group %d used: %s included %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ftgrp->ftgrp, ftgrp->used ? "YES" : "NO", ftgrp->included ? "YES" : "NO" )); /* if this ftgrp has already been used or is not included, then * skip it */ if (ftgrp->used || !ftgrp->included) { continue; } /* compute the load average on this fault group */ totprocs = 0; totnodes = 0; for (k=0; k < ftgrp->nodes.size; k++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&ftgrp->nodes, k))) { continue; } totnodes++; totprocs += node->num_procs; } avgload = (float)totprocs / (float)totnodes; if (avgload < minload) { minload = avgload; target = ftgrp; OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: found new min load ftgrp %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ftgrp->ftgrp)); } } /* if we have more procs than fault groups, then we simply * map the remaining procs on available nodes in a round-robin * fashion - it doesn't matter where they go as they will not * be contributing to fault tolerance by definition */ if (NULL == target) { OPAL_OUTPUT_VERBOSE((2, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: more procs than fault groups - mapping excess rr", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); nd = (orte_node_t*)curitem; curitem = opal_list_get_next(curitem); if (curitem == opal_list_get_end(&node_list)) { curitem = opal_list_get_first(&node_list); } } else { /* pick node with lowest load from within that group */ totprocs = 1000000; for (k=0; k < target->nodes.size; k++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(&target->nodes, k))) { continue; } if (node->num_procs < totprocs) { totprocs = node->num_procs; nd = node; } } } OPAL_OUTPUT_VERBOSE((1, orte_rmaps_base_framework.framework_output, "%s rmaps:resilient: placing proc into fault group %d node %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (NULL == target) ? -1 : target->ftgrp, nd->name)); /* if the node isn't in the map, add it */ if (!nd->mapped) { OBJ_RETAIN(nd); opal_pointer_array_add(map->nodes, nd); nd->mapped = true; } if (NULL == orte_rmaps_base_setup_proc(jdata, nd, app->idx)) { ORTE_ERROR_LOG(ORTE_ERROR); return ORTE_ERROR; } if ((nd->slots < (int)nd->num_procs) || (0 < nd->slots_max && nd->slots_max < (int)nd->num_procs)) { if (ORTE_MAPPING_NO_OVERSUBSCRIBE & ORTE_GET_MAPPING_DIRECTIVE(jdata->map->mapping)) { orte_show_help("help-orte-rmaps-base.txt", "orte-rmaps-base:alloc-error", true, nd->num_procs, app->app); return ORTE_ERR_SILENT; } /* flag the node as oversubscribed so that sched-yield gets * properly set */ nd->oversubscribed = true; } /* track number of procs mapped */ num_assigned++; /* flag this fault group as used */ if (NULL != target) { target->used = true; } } /* track number of procs */ jdata->num_procs += app->num_procs; /* compute vpids and add proc objects to the job - this has to be * done after each app_context is mapped in order to keep the * vpids contiguous within an app_context */ if (ORTE_SUCCESS != (rc = orte_rmaps_base_compute_vpids(jdata, app, &node_list))) { ORTE_ERROR_LOG(rc); return rc; } /* cleanup the node list - it can differ from one app_context * to another, so we have to get it every time */ while (NULL != (item = opal_list_remove_first(&node_list))) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&node_list); } /* compute and save local ranks */ if (ORTE_SUCCESS != (rc = orte_rmaps_base_compute_local_ranks(jdata))) { ORTE_ERROR_LOG(rc); return rc; } return ORTE_SUCCESS; }