1
1

Get staged execution working on multi-node setups. Improve efficiency by remapping a job only when some of its procs have not yet been mapped.

This commit was SVN r27181.
This commit is contained in:
Ralph Castain 2012-08-29 20:35:52 +00:00
parent dd5bd99942
commit 1b659de132
9 changed files with 95 additions and 69 deletions

View File

@ -34,6 +34,7 @@ BEGIN_C_DECLS
#define ORTE_DB_ARCH "orte.arch"
#define ORTE_DB_NPROCS "orte.nprocs"
#define ORTE_DB_RMLURI "orte.rmluri"
#define ORTE_DB_BIND_BITMAP "orte.bind.bitmap"
END_C_DECLS

View File

@ -98,7 +98,6 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
{
int rc;
orte_job_t *jdata=NULL;
orte_proc_t *proc;
orte_job_map_t *map=NULL;
opal_buffer_t *wireup;
opal_byte_object_t bo, *boptr;
@ -259,18 +258,6 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
return rc;
}
/* pack the binding bitmaps */
for (j=0; j < jdata->procs->size; j++) {
if (NULL == (proc = (orte_proc_t *) opal_pointer_array_get_item(jdata->procs, j))) {
continue;
}
/* okay to pack NULL strings */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &proc->cpu_bitmap, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* pack the collective ids */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata->peer_modex, 1, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
@ -324,7 +311,8 @@ static int check_local_proc(orte_job_t *jdata, orte_proc_t *pptr)
if (!pptr->local_proc) {
/* not on the local list */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"adding proc %s to my local list",
"%s adding proc %s to my local list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pptr->name)));
/* keep tabs of the number of local procs */
jdata->num_local_procs++;
@ -351,7 +339,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_vpid_t j;
orte_std_cntr_t cnt;
orte_job_t *jdata=NULL;
orte_proc_t *proc;
opal_byte_object_t *bo;
int8_t flag;
int32_t n;
@ -539,20 +526,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
goto REPORT_ERROR;
}
/* unpack the binding bitmaps */
for (j=0; j < jdata->num_procs; j++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &proc->cpu_bitmap, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
}
/* unpack the collective ids */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jdata->peer_modex, &cnt, ORTE_GRPCOMM_COLL_ID_T))) {

View File

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Los Alamos National Security, LLC.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*

View File

@ -8,7 +8,9 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow

View File

@ -48,6 +48,7 @@ static int staged_mapper(orte_job_t *jdata)
orte_proc_t *proc;
orte_node_t *node;
bool work_to_do = false;
opal_list_item_t *item;
/* only use this mapper if it was specified */
if (NULL == jdata->map->req_mapper ||
@ -100,6 +101,9 @@ static int staged_mapper(orte_job_t *jdata)
}
/* if nothing is available, then move on */
if (0 == num_slots || 0 == opal_list_get_size(&node_list)) {
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: no nodes available for this app",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
OBJ_DESTRUCT(&node_list);
continue;
}
@ -110,20 +114,46 @@ static int staged_mapper(orte_job_t *jdata)
}
if (ORTE_PROC_STATE_UNDEF != proc->state) {
/* this proc has already been mapped or executed */
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: proc %s has already been mapped",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name));
continue;
}
/* flag that there is at least one proc still to
* be executed
*/
work_to_do = true;
/* track number mapped */
jdata->num_mapped++;
/* map this proc to the first available slot */
node = (orte_node_t*)opal_list_get_first(&node_list);
OBJ_RETAIN(node); /* maintain accounting on object */
opal_output_verbose(5, orte_rmaps_base.rmaps_output,
"%s mca:rmaps:staged: assigning proc %s to node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name), node->name);
proc->node = node;
proc->nodename = node->name;
/* the local rank is the number of procs
* on this node from this job - we don't
* directly track this number, so it must
* be found by looping across the node->procs
* array and counting it each time. For now,
* since we don't use this value in this mode
* of operation, just set it to something arbitrary
*/
proc->local_rank = node->num_procs;
/* the node rank is simply the number of procs
* on the node at this time
*/
proc->node_rank = node->num_procs;
/* track number of procs on node and number of slots used */
node->num_procs++;
node->slots_inuse++;
if (node->slots_inuse == node->slots_alloc) {
opal_output(0, "%s slots on node %s are fully used",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), node->name);
opal_list_remove_item(&node_list, &node->super);
OBJ_RELEASE(node);
}
@ -154,6 +184,11 @@ static int staged_mapper(orte_job_t *jdata)
break;
}
}
/* clear the list */
while (NULL != (item = opal_list_remove_first(&node_list))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&node_list);
}
/* if there isn't at least one proc that can be launched,

View File

@ -253,6 +253,7 @@ static void setup_job_complete(int fd, short args, void *cbdata)
jdata->map = OBJ_NEW(orte_job_map_t);
jdata->map->req_mapper = strdup("staged");
ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_STAGED);
ORTE_SET_MAPPING_DIRECTIVE(jdata->map->mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
jdata->map->display_map = orte_rmaps_base.display_map;
}
orte_plm_base_setup_job_complete(0, 0, (void*)caddy);
@ -347,12 +348,13 @@ static void track_procs(int fd, short args, void *cbdata)
if (jdata->num_terminated == jdata->num_procs) {
/* no other procs are waiting, so end this job */
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
} else {
/* schedule the job for re-mapping so that any procs
} else if (jdata->num_mapped < jdata->num_procs) {
/* schedule the job for re-mapping so that procs
* waiting for resources can execute
*/
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
}
/* otherwise, do nothing until more procs terminate */
OBJ_RELEASE(caddy);
return;
}

View File

@ -710,6 +710,7 @@ static void orte_job_construct(orte_job_t* job)
job->state = ORTE_JOB_STATE_UNDEF;
job->restart = false;
job->num_mapped = 0;
job->num_launched = 0;
job->num_reported = 0;
job->num_terminated = 0;

View File

@ -406,6 +406,8 @@ typedef struct {
orte_job_state_t state;
/* some procs in this job are being restarted */
bool restart;
/* number of procs mapped */
orte_vpid_t num_mapped;
/* number of procs launched */
orte_vpid_t num_launched;
/* number of procs reporting contact info */

View File

@ -602,6 +602,10 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->cpu_bitmap, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
#endif
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->state, 1, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc);
@ -642,6 +646,7 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
opal_hwloc_level_t bind_level = OPAL_HWLOC_NODE_LEVEL, pbind, *lvptr;
unsigned int bind_idx, pbidx, *uiptr;
opal_hwloc_locality_t locality;
char *cpu_bitmap;
#endif
orte_std_cntr_t n;
opal_buffer_t buf;
@ -742,29 +747,19 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc);
goto cleanup;
}
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &cpu_bitmap, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#endif
if (proc.jobid == ORTE_PROC_MY_NAME->jobid &&
proc.vpid == ORTE_PROC_MY_NAME->vpid) {
/* set mine */
orte_process_info.my_local_rank = local_rank;
if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_LOCALRANK,
&orte_process_info.my_local_rank, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
orte_process_info.my_node_rank = node_rank;
if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_NODERANK,
&orte_process_info.my_node_rank, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#if OPAL_HAVE_HWLOC
orte_process_info.bind_idx = bind_idx;
if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_BIND_INDEX,
&orte_process_info.bind_idx, OPAL_UINT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#endif
}
/* apps don't need the rest of the data in the buffer for this proc,
@ -785,27 +780,7 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* we don't need to store data for ourself in the database
* as we already did so
*/
if (proc.jobid == ORTE_PROC_MY_NAME->jobid &&
proc.vpid == ORTE_PROC_MY_NAME->vpid) {
continue;
}
/* store the data for this proc */
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_DAEMON_VPID, &dmn.vpid, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* lookup and store the hostname for this proc */
if (ORTE_SUCCESS != (rc = orte_db.fetch_pointer(&dmn, ORTE_DB_HOSTNAME, (void**)&hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* store the values in the database */
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
@ -819,7 +794,32 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_BIND_BITMAP, cpu_bitmap, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#endif
/* we don't need to store the rest of the values
* for ourself in the database
* as we already did so during startup
*/
if (proc.jobid != ORTE_PROC_MY_NAME->jobid ||
proc.vpid != ORTE_PROC_MY_NAME->vpid) {
/* store the data for this proc */
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_DAEMON_VPID, &dmn.vpid, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* lookup and store the hostname for this proc */
if (ORTE_SUCCESS != (rc = orte_db.fetch_pointer(&dmn, ORTE_DB_HOSTNAME, (void**)&hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
}
/* setup for next cycle */
n = 1;
@ -929,6 +929,7 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
#if OPAL_HAVE_HWLOC
opal_hwloc_level_t bind_level = OPAL_HWLOC_NODE_LEVEL;
unsigned int bind_idx;
char *cpu_bitmap;
#endif
orte_std_cntr_t n;
opal_buffer_t buf;
@ -1014,6 +1015,11 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc);
goto cleanup;
}
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &cpu_bitmap, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
#endif
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &state, &n, ORTE_PROC_STATE))) {
@ -1111,6 +1117,10 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
proc->app_idx = app_idx;
proc->restarts = restarts;
proc->state = state;
#if OPAL_HAVE_HWLOC
proc->bind_idx = bind_idx;
proc->cpu_bitmap = cpu_bitmap;
#endif
}
/* setup for next cycle */
n = 1;