diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index 9016611d33..39e99d7ade 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -171,6 +171,12 @@ int orte_ess_base_orted_setup(char **hosts) } #endif + /* setup the global nidmap/pidmap object */ + orte_nidmap.bytes = NULL; + orte_nidmap.size = 0; + orte_pidmap.bytes = NULL; + orte_pidmap.size = 0; + /* open and setup the opal_pstat framework so we can provide * process stats if requested */ diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c index 426be0ff8c..c3f6cd4562 100644 --- a/orte/mca/ess/hnp/ess_hnp_module.c +++ b/orte/mca/ess/hnp/ess_hnp_module.c @@ -224,6 +224,12 @@ static int rte_init(void) fflush(orte_xml_fp); } + /* setup the global nidmap/pidmap object */ + orte_nidmap.bytes = NULL; + orte_nidmap.size = 0; + orte_pidmap.bytes = NULL; + orte_pidmap.size = 0; + /* open and setup the opal_pstat framework so we can provide * process stats if requested */ diff --git a/orte/mca/grpcomm/base/grpcomm_base_xcast.c b/orte/mca/grpcomm/base/grpcomm_base_xcast.c index 55928c2383..f5780eceb8 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_xcast.c +++ b/orte/mca/grpcomm/base/grpcomm_base_xcast.c @@ -83,7 +83,8 @@ void orte_grpcomm_base_xcast_recv(int status, orte_process_name_t* sender, } /* update our local nidmap, if required - the decode function - * knows what to do - it will also free the bytes in the bo + * knows what to do - it will also free the bytes in the bo. 
Decode + * also updates our global nidmap object for sending to our local procs */ OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, "%s grpcomm:base:xcast updating daemon nidmap", diff --git a/orte/mca/odls/base/odls_base_close.c b/orte/mca/odls/base/odls_base_close.c index 3f12632050..036496b3a9 100644 --- a/orte/mca/odls/base/odls_base_close.c +++ b/orte/mca/odls/base/odls_base_close.c @@ -44,10 +44,6 @@ int orte_odls_base_close(void) OBJ_RELEASE(item); } OBJ_DESTRUCT(&orte_odls_globals.xterm_ranks); - if (NULL != orte_odls_globals.dmap && NULL != orte_odls_globals.dmap->bytes) { - free(orte_odls_globals.dmap->bytes); - free(orte_odls_globals.dmap); - } /* cleanup the global list of local children and job data */ for (i=0; i < orte_local_children->size; i++) { diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index bbe6397a44..08cda9a6b4 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -120,8 +120,8 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data, return ORTE_SUCCESS; } - /* construct a nodemap */ - if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&bo))) { + /* construct a nodemap - only want updated items */ + if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&bo, true))) { ORTE_ERROR_LOG(rc); return rc; } @@ -240,8 +240,8 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data, } } - /* encode the pidmap */ - if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(&bo))) { + /* encode the pidmap, taking only the updated procs */ + if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(&bo, true))) { ORTE_ERROR_LOG(rc); return rc; } @@ -251,13 +251,14 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data, ORTE_ERROR_LOG(rc); return rc; } - /* save it on the job data object as we won't be unpacking the buffer - * on our end - */ - opal_dss.copy((void**)&jdata->pmap, &bo, OPAL_BYTE_OBJECT); /* release the data 
since it has now been copied into our buffer */ free(bo.bytes); - + /* update our own version in case we have local procs */ + if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(&orte_pidmap, false))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* pack the binding bitmaps */ for (j=0; j < jdata->procs->size; j++) { if (NULL == (proc = (orte_proc_t *) opal_pointer_array_get_item(jdata->procs, j))) { @@ -363,20 +364,14 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, *job = ORTE_JOBID_INVALID; - /* extract the byte object holding the daemon map */ + /* extract the byte object holding the daemon map - we dealt with it + * during the xcast, so we can ignore it here + */ cnt=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) { ORTE_ERROR_LOG(rc); goto REPORT_ERROR; } - /* retain a copy for downloading to child processes */ - if (NULL != orte_odls_globals.dmap) { - free(orte_odls_globals.dmap->bytes); - free(orte_odls_globals.dmap); - orte_odls_globals.dmap = NULL; - } - orte_odls_globals.dmap = bo; - bo = NULL; /* unpack the wireup info flag */ cnt=1; @@ -435,6 +430,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) { continue; } + if (ORTE_PROC_STATE_UNDEF == pptr->state) { + /* not ready for use yet */ + continue; + } if (ORTE_SUCCESS != (rc = check_local_proc(jdata, pptr))) { ORTE_ERROR_LOG(rc); goto REPORT_ERROR; @@ -532,15 +531,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, ORTE_ERROR_LOG(rc); goto REPORT_ERROR; } - /* retain a copy for downloading to child processes */ - if (NULL != jdata->pmap) { - if (NULL != jdata->pmap->bytes) { - free(jdata->pmap->bytes); - } - free(jdata->pmap); - } - opal_dss.copy((void**)&jdata->pmap, bo, OPAL_BYTE_OBJECT); - /* decode the pidmap - this will also free the bytes in bo */ + /* decode the pidmap - this will also free the bytes in bo, and + * 
update our global pidmap object + */ if (ORTE_SUCCESS != (rc = orte_util_decode_daemon_pidmap(bo))) { ORTE_ERROR_LOG(rc); goto REPORT_ERROR; @@ -582,6 +575,10 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) { continue; } + if (ORTE_PROC_STATE_UNDEF == pptr->state) { + /* not ready for use yet */ + continue; + } /* see if it belongs to us */ if (ORTE_SUCCESS != (rc = check_local_proc(jdata, pptr))) { ORTE_ERROR_LOG(rc); @@ -1337,7 +1334,13 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata) continue; } - + /* is this child a candidate to start? it may not be alive + * because it already executed + */ + if (ORTE_PROC_STATE_INIT != child->state && + ORTE_PROC_STATE_RESTART != child->state) { + continue; + } /* do we have a child from the specified job. Because the * job could be given as a WILDCARD value, we must use * the dss.compare function to check for equality. 
@@ -1698,6 +1701,8 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, int rc=ORTE_SUCCESS, i; bool found=false, registering=false; orte_job_t *jobdat; + uint8_t flag; + opal_byte_object_t *boptr; OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls: require sync on child %s", @@ -1748,6 +1753,16 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, cnt = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &(child->rml_uri), &cnt, OPAL_STRING))) { ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + /* unpack the flag indicating MPI proc or not */ + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &flag, &cnt, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } + if (1 == flag) { + child->mpi_proc = true; } } @@ -1761,29 +1776,22 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc, goto CLEANUP; } OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:sync nidmap requested for job %s: dmap %s pmap %s", + "%s odls:sync nidmap requested for job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_JOBID_PRINT(jobdat->jobid), - (NULL == orte_odls_globals.dmap) ? "NULL" : "READY", - (NULL == jobdat->pmap) ? 
"NULL" : "READY")); - /* the proc needs a copy of both the daemon/node map, and - * the process map for its peers + ORTE_JOBID_PRINT(jobdat->jobid))); + /* the proc needs a copy of both the daemon/node map and + * the process map */ - if (NULL != orte_odls_globals.dmap && - NULL != jobdat->pmap) { - /* the data is in the local byte objects - send them */ - OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, - "%s odls:sync sending byte object", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); #if OPAL_HAVE_HWLOC - /* send the local topology so the individual apps - * don't hammer the system to collect it themselves - */ - opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO); + /* send the local topology so the individual apps + * don't hammer the system to collect it themselves + */ + opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO); #endif - opal_dss.pack(buffer, &orte_odls_globals.dmap, 1, OPAL_BYTE_OBJECT); - opal_dss.pack(buffer, &jobdat->pmap, 1, OPAL_BYTE_OBJECT); - } + boptr = &orte_nidmap; + opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT); + boptr = &orte_pidmap; + opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT); } OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, diff --git a/orte/mca/odls/base/odls_base_open.c b/orte/mca/odls/base/odls_base_open.c index 47838699dd..7d93a4286a 100644 --- a/orte/mca/odls/base/odls_base_open.c +++ b/orte/mca/odls/base/odls_base_open.c @@ -116,7 +116,6 @@ int orte_odls_base_open(void) /* initialize ODLS globals */ OBJ_CONSTRUCT(&orte_odls_globals.xterm_ranks, opal_list_t); orte_odls_globals.xtermcmd = NULL; - orte_odls_globals.dmap = NULL; /* check if the user requested that we display output in xterms */ if (NULL != orte_xterm) { diff --git a/orte/mca/odls/base/odls_private.h b/orte/mca/odls/base/odls_private.h index 3ce5364313..070f9bc855 100644 --- a/orte/mca/odls/base/odls_private.h +++ b/orte/mca/odls/base/odls_private.h @@ -52,8 +52,6 @@ typedef struct { int output; /** Time to allow process to 
forcibly die */ int timeout_before_sigkill; - /* byte object to store daemon map for later xmit to procs */ - opal_byte_object_t *dmap; /* list of ranks to be displayed on separate xterms */ opal_list_t xterm_ranks; /* the xterm cmd to be used */ diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c index a2f57d91eb..9e479287c2 100644 --- a/orte/mca/plm/base/plm_base_receive.c +++ b/orte/mca/plm/base/plm_base_receive.c @@ -139,7 +139,8 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender, orte_process_name_t name; pid_t pid; bool running; - + int8_t flag; + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:receive processing msg", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -328,13 +329,14 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender, count=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &job, &count, ORTE_JOBID))) { ORTE_ERROR_LOG(rc); - goto CLEANUP; + goto DEPART; } name.jobid = job; /* get the job object */ if (NULL == (jdata = orte_get_job_data_object(job))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - goto CLEANUP; + rc = ORTE_ERR_NOT_FOUND; + goto DEPART; } count=1; while (ORTE_SUCCESS == opal_dss.unpack(buffer, &vpid, &count, ORTE_VPID)) { @@ -342,6 +344,23 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender, break; } name.vpid = vpid; + /* unpack the mpi proc flag */ + count=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &count, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + goto DEPART; + } + /* get the proc data object */ + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + rc = ORTE_ERR_NOT_FOUND; + goto DEPART; + } + if (1 == flag) { + proc->mpi_proc = true; + } else { + proc->mpi_proc = false; + } ORTE_ACTIVATE_PROC_STATE(&name, ORTE_PROC_STATE_REGISTERED); count=1; } diff --git a/orte/mca/plm/plm_types.h b/orte/mca/plm/plm_types.h index cc805d7897..79d143520d 100644 --- 
a/orte/mca/plm/plm_types.h +++ b/orte/mca/plm/plm_types.h @@ -86,6 +86,17 @@ typedef uint32_t orte_proc_state_t; */ #define ORTE_PROC_STATE_DYNAMIC 100 +/* + * App_context state codes + */ +typedef int32_t orte_app_state_t; +#define ORTE_APP_STATE_T OPAL_INT32 + +#define ORTE_APP_STATE_UNDEF 0 +#define ORTE_APP_STATE_INIT 1 +#define ORTE_APP_STATE_ALL_MAPPED 2 +#define ORTE_APP_STATE_RUNNING 3 +#define ORTE_APP_STATE_COMPLETED 4 /* * Job state codes diff --git a/orte/mca/plm/rsh/plm_rsh_module.c b/orte/mca/plm/rsh/plm_rsh_module.c index d8987f4c86..7d8b3e7c5f 100644 --- a/orte/mca/plm/rsh/plm_rsh_module.c +++ b/orte/mca/plm/rsh/plm_rsh_module.c @@ -1102,8 +1102,8 @@ static void launch_daemons(int fd, short args, void *cbdata) OBJ_RELEASE(orte_tree_launch_cmd); goto cleanup; } - /* construct a nodemap */ - if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&bo))) { + /* construct a nodemap of all daemons we know about */ + if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&bo, false))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(orte_tree_launch_cmd); goto cleanup; diff --git a/orte/mca/rmaps/base/rmaps_base_map_job.c b/orte/mca/rmaps/base/rmaps_base_map_job.c index e5ced7d729..670ba3a7a4 100644 --- a/orte/mca/rmaps/base/rmaps_base_map_job.c +++ b/orte/mca/rmaps/base/rmaps_base_map_job.c @@ -56,6 +56,7 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata) /* convenience */ jdata = caddy->jdata; + jdata->state = ORTE_JOB_STATE_MAP; /* NOTE: NO PROXY COMPONENT REQUIRED - REMOTE PROCS ARE NOT * ALLOWED TO CALL RMAPS INDEPENDENTLY. 
ONLY THE PLM CAN @@ -155,7 +156,8 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata) item != opal_list_get_end(&orte_rmaps_base.selected_modules); item = opal_list_get_next(item)) { mod = (orte_rmaps_base_selected_module_t*)item; - if (ORTE_SUCCESS == (rc = mod->module->map_job(jdata))) { + if (ORTE_SUCCESS == (rc = mod->module->map_job(jdata)) || + ORTE_ERR_RESOURCE_BUSY == rc) { did_map = true; break; } @@ -169,6 +171,14 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata) return; } } + if (did_map && ORTE_ERR_RESOURCE_BUSY == rc) { + /* the map was done but nothing could be mapped + * for launch as all the resources were busy + */ + OBJ_RELEASE(caddy); + return; + } + /* if we get here without doing the map, or with zero procs in * the map, then that's an error */ diff --git a/orte/mca/rmaps/rmaps_types.h b/orte/mca/rmaps/rmaps_types.h index 70a89ebbe4..cf5c436b32 100644 --- a/orte/mca/rmaps/rmaps_types.h +++ b/orte/mca/rmaps/rmaps_types.h @@ -114,6 +114,8 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_job_map_t); #define ORTE_MAPPING_RR 0x000f /* sequential policy */ #define ORTE_MAPPING_SEQ 20 +/* staged execution mapping */ +#define ORTE_MAPPING_STAGED 21 /* rank file and other user-defined mapping */ #define ORTE_MAPPING_BYUSER 22 /* macro to separate out the mapping policy diff --git a/orte/mca/rmaps/staged/Makefile.am b/orte/mca/rmaps/staged/Makefile.am new file mode 100644 index 0000000000..b7cd9bc9f3 --- /dev/null +++ b/orte/mca/rmaps/staged/Makefile.am @@ -0,0 +1,35 @@ +# +# Copyright (c) 2012 Los Alamos National Security, LLC. +# All rights reserved +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + rmaps_staged.c \ + rmaps_staged.h \ + rmaps_staged_component.c + +# Make the output library in this directory, and name it either +# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la +# (for static builds). 
+ +if MCA_BUILD_orte_rmaps_staged_DSO +component_noinst = +component_install = mca_rmaps_staged.la +else +component_noinst = libmca_rmaps_staged.la +component_install = +endif + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_rmaps_staged_la_SOURCES = $(sources) +mca_rmaps_staged_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_rmaps_staged_la_SOURCES =$(sources) +libmca_rmaps_staged_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/rmaps/staged/rmaps_staged.c b/orte/mca/rmaps/staged/rmaps_staged.c new file mode 100644 index 0000000000..9a286884fe --- /dev/null +++ b/orte/mca/rmaps/staged/rmaps_staged.c @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2012 Los Alamos National Security, LLC. + * All rights reserved + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_UNISTD_H +#include +#endif /* HAVE_UNISTD_H */ +#ifdef HAVE_STRING_H +#include +#endif /* HAVE_STRING_H */ + +#include "opal/mca/base/mca_base_param.h" +#include "opal/mca/hwloc/base/base.h" +#include "opal/dss/dss.h" +#include "opal/util/output.h" + +#include "orte/util/show_help.h" +#include "orte/mca/errmgr/errmgr.h" + +#include "orte/mca/rmaps/base/rmaps_private.h" +#include "orte/mca/rmaps/base/base.h" +#include "rmaps_staged.h" + +static int staged_mapper(orte_job_t *jdata); + +orte_rmaps_base_module_t orte_rmaps_staged_module = { + staged_mapper +}; + +static int staged_mapper(orte_job_t *jdata) +{ + mca_base_component_t *c=&mca_rmaps_staged_component.base_version; + char *output; + int i, j, rc; + orte_app_context_t *app; + opal_list_t node_list; + orte_std_cntr_t num_slots; + orte_proc_t *proc; + orte_node_t *node; + bool work_to_do = false; + + /* only use this mapper if it was specified */ + if (NULL == jdata->map->req_mapper || + 0 != strcasecmp(jdata->map->req_mapper, 
c->mca_component_name) || + !(ORTE_MAPPING_STAGED & ORTE_GET_MAPPING_POLICY(jdata->map->mapping))) { + /* I wasn't specified */ + opal_output_verbose(5, orte_rmaps_base.rmaps_output, + "mca:rmaps:staged: job %s not using staged mapper", + ORTE_JOBID_PRINT(jdata->jobid)); + return ORTE_ERR_TAKE_NEXT_OPTION; + } + + opal_output_verbose(5, orte_rmaps_base.rmaps_output, + "%s mca:rmaps:staged: mapping job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jdata->jobid)); + + /* flag that I did the mapping */ + if (NULL != jdata->map->last_mapper) { + free(jdata->map->last_mapper); + } + jdata->map->last_mapper = strdup(c->mca_component_name); + + /* we assume that the app_contexts are in priority order, + * with the highest priority being the first entry in the + * job's app_context array. Loop across the app_contexts + * in order, looking for apps that have not been + * fully mapped + */ + for (i=0; i < jdata->apps->size; i++) { + if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) { + continue; + } + /* has it been fully mapped? 
*/ + if (ORTE_APP_STATE_ALL_MAPPED <= app->state) { + continue; + } + opal_output_verbose(5, orte_rmaps_base.rmaps_output, + "%s mca:rmaps:staged: working app %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app); + + /* find nodes that meet any constraints provided in the form of + * -hostfile or -host directives + */ + OBJ_CONSTRUCT(&node_list, opal_list_t); + if (ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app, + jdata->map->mapping, false))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* if nothing is available, then move on */ + if (0 == num_slots || 0 == opal_list_get_size(&node_list)) { + OBJ_DESTRUCT(&node_list); + continue; + } + /* assign any unmapped procs to an available slot */ + for (j=0; j < app->procs.size; j++) { + if (NULL == (proc = opal_pointer_array_get_item(&app->procs, j))) { + continue; + } + if (ORTE_PROC_STATE_UNDEF != proc->state) { + /* this proc has already been mapped or executed */ + continue; + } + /* flag that there is at least one proc still to + * be executed + */ + work_to_do = true; + /* map this proc to the first available slot */ + node = (orte_node_t*)opal_list_get_first(&node_list); + OBJ_RETAIN(node); /* maintain accounting on object */ + proc->node = node; + proc->nodename = node->name; + node->num_procs++; + node->slots_inuse++; + if (node->slots_inuse == node->slots_alloc) { + opal_list_remove_item(&node_list, &node->super); + OBJ_RELEASE(node); + } + if (0 > (rc = opal_pointer_array_add(node->procs, (void*)proc))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(proc); + return rc; + } + /* retain the proc struct so that we correctly track its release */ + OBJ_RETAIN(proc); + proc->state = ORTE_PROC_STATE_INIT; + /* flag the proc as updated so it will be included + * in the next pidmap message + */ + proc->updated =true; + /* add the node to the map, if needed */ + if (!node->mapped) { + if (ORTE_SUCCESS > (rc = opal_pointer_array_add(jdata->map->nodes, (void*)node))) { + ORTE_ERROR_LOG(rc); + 
return rc; + } + node->mapped = true; + OBJ_RETAIN(node); /* maintain accounting on object */ + jdata->map->num_nodes++; + } + if (0 == opal_list_get_size(&node_list)) { + /* nothing more we can do */ + break; + } + } + } + opal_dss.print(&output, NULL, jdata->map, ORTE_JOB_MAP); + opal_output(orte_clean_output, "%s", output); + + /* if there isn't at least one proc that can be launched, + * then indicate that we don't need to proceed with the + * launch sequence + */ + if (!work_to_do) { + return ORTE_ERR_RESOURCE_BUSY; + } + + /* flag that the job was updated so it will be + * included in the pidmap message + */ + jdata->updated = true; + + return ORTE_SUCCESS; +} diff --git a/orte/mca/rmaps/staged/rmaps_staged.h b/orte/mca/rmaps/staged/rmaps_staged.h new file mode 100644 index 0000000000..9c81886b24 --- /dev/null +++ b/orte/mca/rmaps/staged/rmaps_staged.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2012 Los Alamos National Security, LLC. + * All rights reserved + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef ORTE_RMAPS_STAGED_H +#define ORTE_RMAPS_STAGED_H + +#include "orte_config.h" + +#include "orte/mca/rmaps/rmaps.h" + +BEGIN_C_DECLS + +ORTE_MODULE_DECLSPEC extern orte_rmaps_base_component_t mca_rmaps_staged_component; +extern orte_rmaps_base_module_t orte_rmaps_staged_module; + + +END_C_DECLS + +#endif diff --git a/orte/mca/rmaps/staged/rmaps_staged_component.c b/orte/mca/rmaps/staged/rmaps_staged_component.c new file mode 100644 index 0000000000..4f1bc507b4 --- /dev/null +++ b/orte/mca/rmaps/staged/rmaps_staged_component.c @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2012 Los Alamos National Security, LLC. 
+ * All rights reserved + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/mca/base/base.h" +#include "opal/mca/base/mca_base_param.h" + +#include "orte/util/show_help.h" + +#include "orte/mca/rmaps/base/base.h" +#include "rmaps_staged.h" + +/* + * Local functions + */ + +static int orte_rmaps_staged_open(void); +static int orte_rmaps_staged_close(void); +static int orte_rmaps_staged_query(mca_base_module_t **module, int *priority); + +orte_rmaps_base_component_t mca_rmaps_staged_component = { + { + ORTE_RMAPS_BASE_VERSION_2_0_0, + + "staged", /* MCA component name */ + ORTE_MAJOR_VERSION, /* MCA component major version */ + ORTE_MINOR_VERSION, /* MCA component minor version */ + ORTE_RELEASE_VERSION, /* MCA component release version */ + orte_rmaps_staged_open, /* component open */ + orte_rmaps_staged_close, /* component close */ + orte_rmaps_staged_query /* component query */ + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + } +}; + + +/** + * component open/close/init function + */ +static int orte_rmaps_staged_open(void) +{ + return ORTE_SUCCESS; +} + + +static int orte_rmaps_staged_query(mca_base_module_t **module, int *priority) +{ + *priority = 5; + *module = (mca_base_module_t *)&orte_rmaps_staged_module; + return ORTE_SUCCESS; +} + +/** + * Close all subsystems. 
+ */ + +static int orte_rmaps_staged_close(void) +{ + return ORTE_SUCCESS; +} + + diff --git a/orte/mca/routed/base/routed_base_fns.c b/orte/mca/routed/base/routed_base_fns.c index 232227112c..c1bd2b208d 100644 --- a/orte/mca/routed/base/routed_base_fns.c +++ b/orte/mca/routed/base/routed_base_fns.c @@ -252,7 +252,8 @@ int orte_routed_base_register_sync(bool setup) int rc; orte_daemon_cmd_flag_t command=ORTE_DAEMON_SYNC_BY_PROC; char *rml_uri; - + uint8_t flag; + OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output, "%s registering sync to daemon %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -290,6 +291,18 @@ int orte_routed_base_register_sync(bool setup) } if (NULL != rml_uri) free(rml_uri); + /* tell the daemon if we are an MPI proc */ + if (ORTE_PROC_IS_MPI) { + flag = 1; + } else { + flag = 0; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buffer); + return rc; + } + /* send the sync command to our daemon */ if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer, ORTE_RML_TAG_DAEMON, 0, diff --git a/orte/mca/state/base/state_base_fns.c b/orte/mca/state/base/state_base_fns.c index 841bbe131f..8e6bc79558 100644 --- a/orte/mca/state/base/state_base_fns.c +++ b/orte/mca/state/base/state_base_fns.c @@ -500,45 +500,18 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata) } pdata->iof_complete = true; if (pdata->waitpid_recvd) { - /* the proc has terminated */ - pdata->alive = false; - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. 
- */ - orte_session_dir_finalize(proc); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } + ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED); } } else if (ORTE_PROC_STATE_WAITPID_FIRED == state) { /* update the proc state */ pdata->state = state; pdata->waitpid_recvd = true; if (pdata->iof_complete) { - /* the proc has terminated */ - pdata->alive = false; - pdata->state = ORTE_PROC_STATE_TERMINATED; - /* return the allocated slot for reuse */ - cleanup_node(pdata); - /* Clean up the session directory as if we were the process - * itself. This covers the case where the process died abnormally - * and didn't cleanup its own session directory. - */ - orte_session_dir_finalize(proc); - /* track job status */ - jdata->num_terminated++; - if (jdata->num_terminated == jdata->num_procs) { - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); - } + ORTE_ACTIVATE_PROC_STATE(proc, ORTE_PROC_STATE_TERMINATED); } } else if (ORTE_PROC_STATE_TERMINATED == state) { /* update the proc state */ + pdata->alive = false; pdata->state = state; if (pdata->local_proc) { /* Clean up the session directory as if we were the process diff --git a/orte/mca/state/hnp/configure.m4 b/orte/mca/state/hnp/configure.m4 deleted file mode 100644 index 9f40b478a1..0000000000 --- a/orte/mca/state/hnp/configure.m4 +++ /dev/null @@ -1,19 +0,0 @@ -# -*- shell-script -*- -# -# Copyright (c) 2011 Los Alamos National Security, LLC. -# All rights reserved. 
-# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# -# MCA_state_hnp_CONFIG([action-if-found], [action-if-not-found]) -# ----------------------------------------------------------- -AC_DEFUN([MCA_orte_state_hnp_CONFIG], [ - AC_CONFIG_FILES([orte/mca/state/hnp/Makefile]) - - AS_IF([test "$orte_without_full_support" = 0], - [$1], - [$2]) -]) diff --git a/orte/mca/state/novm/configure.m4 b/orte/mca/state/novm/configure.m4 deleted file mode 100644 index 4a73bc7b24..0000000000 --- a/orte/mca/state/novm/configure.m4 +++ /dev/null @@ -1,19 +0,0 @@ -# -*- shell-script -*- -# -# Copyright (c) 2011-2012 Los Alamos National Security, LLC. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# -# MCA_state_novm_CONFIG([action-if-found], [action-if-not-found]) -# ----------------------------------------------------------- -AC_DEFUN([MCA_orte_state_novm_CONFIG], [ - AC_CONFIG_FILES([orte/mca/state/novm/Makefile]) - - AS_IF([test "$orte_without_full_support" = 0], - [$1], - [$2]) -]) diff --git a/orte/mca/state/orted/configure.m4 b/orte/mca/state/orted/configure.m4 deleted file mode 100644 index 0a5965372b..0000000000 --- a/orte/mca/state/orted/configure.m4 +++ /dev/null @@ -1,19 +0,0 @@ -# -*- shell-script -*- -# -# Copyright (c) 2011 Los Alamos National Security, LLC. -# All rights reserved. 
-# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# -# MCA_state_orted_CONFIG([action-if-found], [action-if-not-found]) -# ----------------------------------------------------------- -AC_DEFUN([MCA_orte_state_orted_CONFIG], [ - AC_CONFIG_FILES([orte/mca/state/orted/Makefile]) - - AS_IF([test "$orte_without_full_support" = 0], - [$1], - [$2]) -]) diff --git a/orte/mca/state/orted/state_orted.c b/orte/mca/state/orted/state_orted.c index 952800711c..071051fece 100644 --- a/orte/mca/state/orted/state_orted.c +++ b/orte/mca/state/orted/state_orted.c @@ -203,6 +203,7 @@ static void track_procs(int fd, short argc, void *cbdata) int rc, i; orte_plm_cmd_flag_t cmd; orte_vpid_t null=ORTE_VPID_INVALID; + int8_t flag; OPAL_OUTPUT_VERBOSE((5, orte_state_base_output, "%s state:orted:track_procs called for proc %s state %s", @@ -258,6 +259,15 @@ static void track_procs(int fd, short argc, void *cbdata) ORTE_ERROR_LOG(rc); goto cleanup; } + if (pptr->mpi_proc) { + flag = 1; + } else { + flag = 0; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } } } /* pack an invalid marker */ diff --git a/orte/mca/state/staged/Makefile.am b/orte/mca/state/staged/Makefile.am new file mode 100644 index 0000000000..ff02ed135d --- /dev/null +++ b/orte/mca/state/staged/Makefile.am @@ -0,0 +1,37 @@ +# +# Copyright (c) 2012 Los Alamos National Security, LLC. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +dist_pkgdata_DATA = help-state-staged.txt + +sources = \ + state_staged.h \ + state_staged_component.c \ + state_staged.c + +# Make the output library in this directory, and name it either +# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la +# (for static builds). 
+ +if MCA_BUILD_orte_state_staged_DSO +component_noinst = +component_install = mca_state_staged.la +else +component_noinst = libmca_state_staged.la +component_install = +endif + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_state_staged_la_SOURCES = $(sources) +mca_state_staged_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_state_staged_la_SOURCES =$(sources) +libmca_state_staged_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/state/staged/help-state-staged.txt b/orte/mca/state/staged/help-state-staged.txt new file mode 100644 index 0000000000..016cea4fbc --- /dev/null +++ b/orte/mca/state/staged/help-state-staged.txt @@ -0,0 +1,18 @@ +# -*- text -*- +# +# Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +# +[mpi-procs-not-supported] +The staged execution system cannot support MPI applications +because MPI requires that all processes execute simultaneously. + +Please re-run your job without the --staged option. +[no-np] +You must specify the number of procs for each app_context when +using staged execution diff --git a/orte/mca/state/staged/state_staged.c b/orte/mca/state/staged/state_staged.c new file mode 100644 index 0000000000..89300409dd --- /dev/null +++ b/orte/mca/state/staged/state_staged.c @@ -0,0 +1,359 @@ +/* + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. + * All rights reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "orte_config.h"
+
+/* NOTE(review): the header names on the next three directives were
+ * missing. unistd.h and string.h follow from their HAVE_* guards;
+ * sys/types.h is presumed for the unguarded one - confirm.
+ */
+#include <sys/types.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif  /* HAVE_UNISTD_H */
+#ifdef HAVE_STRING_H
+#include <string.h>
+#endif
+
+#include "opal/util/output.h"
+
+#include "orte/mca/errmgr/errmgr.h"
+#include "orte/mca/iof/iof.h"
+#include "orte/mca/plm/base/base.h"
+#include "orte/mca/ras/base/base.h"
+#include "orte/mca/rmaps/base/base.h"
+#include "orte/mca/routed/routed.h"
+#include "orte/mca/sensor/sensor.h"
+#include "orte/util/session_dir.h"
+#include "orte/util/show_help.h"
+#include "orte/runtime/orte_quit.h"
+
+#include "orte/mca/state/state.h"
+#include "orte/mca/state/base/base.h"
+#include "orte/mca/state/base/state_private.h"
+#include "state_staged.h"
+
+/*
+ * Module functions: Global
+ */
+static int init(void);
+static int finalize(void);
+
+/******************
+ * STAGED module
+ ******************
+ * Only init and finalize are specific to this module; every other
+ * entry is the shared orte_state_base_* implementation.
+ */
+orte_state_base_module_t orte_state_staged_module = {
+    init,
+    finalize,
+    orte_state_base_activate_job_state,
+    orte_state_base_add_job_state,
+    orte_state_base_set_job_state_callback,
+    orte_state_base_set_job_state_priority,
+    orte_state_base_remove_job_state,
+    orte_state_base_activate_proc_state,
+    orte_state_base_add_proc_state,
+    orte_state_base_set_proc_state_callback,
+    orte_state_base_set_proc_state_priority,
+    orte_state_base_remove_proc_state
+};
+
+static void setup_job_complete(int fd, short args, void *cbdata);
+
+/* defined state machine sequence - individual
+ * plm's must add a state for launching daemons
+ *
+ * launch_states[i] is registered with launch_callbacks[i], so the
+ * two tables must be kept the same length and in the same order
+ */
+static orte_job_state_t launch_states[] = {
+    ORTE_JOB_STATE_INIT,
+    ORTE_JOB_STATE_INIT_COMPLETE,
+    ORTE_JOB_STATE_ALLOCATE,
+    ORTE_JOB_STATE_ALLOCATION_COMPLETE,
+    ORTE_JOB_STATE_DAEMONS_LAUNCHED,
+    ORTE_JOB_STATE_DAEMONS_REPORTED,
+    ORTE_JOB_STATE_VM_READY,
+    ORTE_JOB_STATE_MAP,
+    ORTE_JOB_STATE_MAP_COMPLETE,
+    ORTE_JOB_STATE_SYSTEM_PREP,
+    ORTE_JOB_STATE_LAUNCH_APPS,
+    ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE,
+    ORTE_JOB_STATE_RUNNING,
+    ORTE_JOB_STATE_REGISTERED,
+    /* termination states */
+    ORTE_JOB_STATE_TERMINATED,
+    ORTE_JOB_STATE_NOTIFY_COMPLETED,
+    ORTE_JOB_STATE_ALL_JOBS_COMPLETE,
+    ORTE_JOB_STATE_DAEMONS_TERMINATED
+};
+static orte_state_cbfunc_t launch_callbacks[] = {
+    orte_plm_base_setup_job,
+    /* local hook: builds the proc arrays and selects the staged mapper */
+    setup_job_complete,
+    orte_ras_base_allocate,
+    orte_plm_base_allocation_complete,
+    orte_plm_base_daemons_launched,
+    orte_plm_base_daemons_reported,
+    orte_plm_base_vm_ready,
+    orte_rmaps_base_map_job,
+    orte_plm_base_mapping_complete,
+    orte_plm_base_complete_setup,
+    orte_plm_base_launch_apps,
+    orte_state_base_local_launch_complete,
+    orte_plm_base_post_launch,
+    orte_plm_base_registered,
+    orte_state_base_check_all_complete,
+    orte_state_base_cleanup_job,
+    orte_quit,
+    orte_quit
+};
+
+/* staged execution requires that we start as many
+ * procs initially as we have resources - if we have
+ * adequate resources, then we behave just like the
+ * default HNP module. If we don't, then we will have
+ * some procs left over - whenever a proc completes,
+ * we then initiate execution of one of the remaining
+ * procs. This continues until all procs are complete.
+ *
+ * NOTE: MPI DOESN'T KNOW HOW TO WORK WITH THIS SCENARIO,
+ * so detection of a call to MPI_Init from a proc while
+ * this module is active is cause for abort!
+ */
+static void track_procs(int fd, short args, void *cbdata);
+
+/* proc states this module registers; proc_states[i] is paired with
+ * proc_callbacks[i], so the two tables must stay the same length
+ * and in the same order
+ */
+static orte_proc_state_t proc_states[] = {
+    ORTE_PROC_STATE_RUNNING,
+    ORTE_PROC_STATE_REGISTERED,
+    ORTE_PROC_STATE_IOF_COMPLETE,
+    ORTE_PROC_STATE_WAITPID_FIRED,
+    ORTE_PROC_STATE_TERMINATED
+};
+static orte_state_cbfunc_t proc_callbacks[] = {
+    orte_state_base_track_procs,
+    track_procs,
+    orte_state_base_track_procs,
+    orte_state_base_track_procs,
+    track_procs
+};
+
+/************************
+ * API Definitions
+ ************************/
+/* Construct the job and proc state lists and register every entry
+ * of the launch and proc tables, plus the FORCED_EXIT and
+ * REPORT_PROGRESS job states. A failed registration is logged but
+ * does not stop initialization, so this always returns ORTE_SUCCESS.
+ */
+static int init(void)
+{
+    int i, rc;
+    int num_states;
+
+    /* setup the state machines */
+    OBJ_CONSTRUCT(&orte_job_states, opal_list_t);
+    OBJ_CONSTRUCT(&orte_proc_states, opal_list_t);
+
+    /* setup the job state machine */
+    num_states = sizeof(launch_states) / sizeof(orte_job_state_t);
+    for (i=0; i < num_states; i++) {
+        if (ORTE_SUCCESS != (rc = orte_state.add_job_state(launch_states[i],
+                                                           launch_callbacks[i],
+                                                           ORTE_SYS_PRI))) {
+            ORTE_ERROR_LOG(rc);
+        }
+    }
+    /* add a default error response */
+    if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_FORCED_EXIT,
+                                                       orte_quit, ORTE_ERROR_PRI))) {
+        ORTE_ERROR_LOG(rc);
+    }
+    /* add callback to report progress, if requested */
+    if (ORTE_SUCCESS != (rc = orte_state.add_job_state(ORTE_JOB_STATE_REPORT_PROGRESS,
+                                                       orte_state_base_report_progress, ORTE_ERROR_PRI))) {
+        ORTE_ERROR_LOG(rc);
+    }
+    if (5 < opal_output_get_verbosity(orte_state_base_output)) {
+        orte_state_base_print_job_state_machine();
+    }
+
+    /* populate the proc state machine to allow us to
+     * track proc lifecycle changes
+     */
+    num_states = sizeof(proc_states) / sizeof(orte_proc_state_t);
+    for (i=0; i < num_states; i++) {
+        if (ORTE_SUCCESS != (rc = orte_state.add_proc_state(proc_states[i],
+                                                            proc_callbacks[i],
+                                                            ORTE_SYS_PRI))) {
+            ORTE_ERROR_LOG(rc);
+        }
+    }
+    if (5 < opal_output_get_verbosity(orte_state_base_output)) {
+        orte_state_base_print_proc_state_machine();
+    }
+
+    return ORTE_SUCCESS;
+}
+
+/* Release every entry of the proc state list and destruct the list.
+ *
+ * NOTE(review): orte_job_states is constructed in init() but is not
+ * released here - confirm it is torn down elsewhere or that this is
+ * intentional.
+ */
+static int finalize(void)
+{
+    opal_list_item_t *item;
+
+    /* cleanup the proc state machine */
+    while (NULL != (item = opal_list_remove_first(&orte_proc_states))) {
+        OBJ_RELEASE(item);
+    }
+    OBJ_DESTRUCT(&orte_proc_states);
+
+    return ORTE_SUCCESS;
+}
+
+/* Job-state callback for ORTE_JOB_STATE_INIT_COMPLETE.
+ *
+ * Verifies that every app_context has a positive num_procs, creates
+ * one orte_proc_t per requested proc (stored in both the job's and
+ * the app_context's proc arrays), requests the "staged" mapper when
+ * the job has no map yet, and then hands the caddy on to
+ * orte_plm_base_setup_job_complete.
+ *
+ * cbdata is the orte_state_caddy_t for the job; it is released here
+ * only on the abort path.
+ */
+static void setup_job_complete(int fd, short args, void *cbdata)
+{
+    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
+    orte_job_t *jdata = caddy->jdata;
+    int i, j;
+    orte_app_context_t *app;
+    orte_proc_t *proc;
+    orte_vpid_t vpid;
+
+    /* check that the job meets our requirements */
+    vpid = 0;
+    for (i=0; i < jdata->apps->size; i++) {
+        if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
+            continue;
+        }
+        if (app->num_procs <= 0) {
+            /* must specify -np for staged execution */
+            orte_show_help("help-state-staged.txt", "no-np", true);
+            ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT);
+            OBJ_RELEASE(caddy);
+            return;
+        }
+        /* build the proc arrays - we'll need them later */
+        for (j=0; j < app->num_procs; j++) {
+            proc = OBJ_NEW(orte_proc_t);
+            proc->name.jobid = jdata->jobid;
+            proc->name.vpid = vpid;
+            proc->app_idx = i;
+            proc->app_rank = j;
+            /* flag that the proc is NOT to be included
+             * in a pidmap message so we don't do it until
+             * the proc is actually scheduled for launch
+             */
+            proc->updated = false;
+            /* procs must not barrier when executing in stages */
+            proc->do_not_barrier = true;
+            /* add it to the job */
+            opal_pointer_array_set_item(jdata->procs, vpid, proc);
+            jdata->num_procs++;
+            vpid++;
+            /* add it to the app - the extra retain gives the
+             * app_context its own reference to the proc
+             */
+            OBJ_RETAIN(proc);
+            opal_pointer_array_set_item(&app->procs, j, proc);
+        }
+    }
+
+    /* set the job map to use the staged mapper */
+    if (NULL == jdata->map) {
+        jdata->map = OBJ_NEW(orte_job_map_t);
+        jdata->map->req_mapper = strdup("staged");
+        ORTE_SET_MAPPING_POLICY(jdata->map->mapping, ORTE_MAPPING_STAGED);
+        jdata->map->display_map = orte_rmaps_base.display_map;
+    }
+    orte_plm_base_setup_job_complete(0, 0, (void*)caddy);
+}
+
+/* Give back the slot a terminated proc was using: decrement the
+ * node's proc and slot counters and drop the node's reference to
+ * the proc. Does nothing if the proc was never assigned a node.
+ */
+static void cleanup_node(orte_proc_t *proc)
+{
+    orte_node_t *node;
+    orte_proc_t *p;
+    int i;
+
+    if (NULL == (node = proc->node)) {
+        return;
+    }
+    node->num_procs--;
+    node->slots_inuse--;
+    for (i=0; i < node->procs->size; i++) {
+        if (NULL == (p = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
+            continue;
+        }
+        if (p->name.jobid == proc->name.jobid &&
+            p->name.vpid == proc->name.vpid) {
+            opal_pointer_array_set_item(node->procs, i, NULL);
+            OBJ_RELEASE(p);
+            break;
+        }
+    }
+}
+
+/* Proc-state callback for ORTE_PROC_STATE_REGISTERED and
+ * ORTE_PROC_STATE_TERMINATED.
+ *
+ * REGISTERED: aborts the job if the proc registered from inside
+ * MPI; otherwise records the state and activates
+ * ORTE_JOB_STATE_REGISTERED once every proc has reported.
+ *
+ * TERMINATED: marks the proc dead, frees its slot, and either ends
+ * the job (all procs done) or re-activates ORTE_JOB_STATE_MAP so
+ * that procs still waiting for resources can be placed.
+ *
+ * cbdata is the orte_state_caddy_t for the proc; it is always
+ * released before returning.
+ */
+static void track_procs(int fd, short args, void *cbdata)
+{
+    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
+    orte_process_name_t *proc = &caddy->name;
+    orte_proc_state_t state = caddy->proc_state;
+    orte_job_t *jdata;
+    orte_proc_t *pdata;
+
+    OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
+                         "%s state:staged:track_procs called for proc %s state %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_NAME_PRINT(proc),
+                         orte_proc_state_to_str(state)));
+
+    /* get the job object for this proc */
+    if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
+        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+        ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+        goto cleanup;
+    }
+    /* the lookup yields NULL when no proc is stored at this vpid -
+     * treat that like an unknown job instead of dereferencing it
+     */
+    if (NULL == (pdata = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid))) {
+        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+        ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+        goto cleanup;
+    }
+
+    /* if this is a registration, check to see if it came from
+     * inside MPI_Init - if it did, that is not acceptable
+     */
+    if (ORTE_PROC_STATE_REGISTERED == state) {
+        if (pdata->mpi_proc) {
+            /* we can't support this - issue an error and abort */
+            orte_show_help("help-state-staged.txt", "mpi-procs-not-supported", true);
+            ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_SILENT_ABORT);
+            /* the job is being aborted, so do not count this proc
+             * as registered or declare the job registered
+             */
+            goto cleanup;
+        }
+        /* update the proc state */
+        pdata->state = state;
+        jdata->num_reported++;
+        if (jdata->num_reported == jdata->num_procs) {
+            ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_REGISTERED);
+        }
+        goto cleanup;
+    }
+
+    /* if the proc terminated, see if any other procs are
+     * waiting to run. We assume that the app_contexts are
+     * in priority order, with the highest priority being
+     * at position 0 in the app_context array for this job
+     */
+    if (ORTE_PROC_STATE_TERMINATED == state) {
+        /* update the proc state */
+        pdata->alive = false;
+        pdata->state = state;
+        if (pdata->local_proc) {
+            /* Clean up the session directory as if we were the process
+             * itself. This covers the case where the process died abnormally
+             * and didn't cleanup its own session directory.
+             */
+            orte_session_dir_finalize(proc);
+        }
+        /* return the allocated slot for reuse */
+        cleanup_node(pdata);
+        /* track job status */
+        jdata->num_terminated++;
+        if (jdata->num_terminated == jdata->num_procs) {
+            /* no other procs are waiting, so end this job */
+            ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
+        } else {
+            /* schedule the job for re-mapping so that any procs
+             * waiting for resources can execute
+             */
+            ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
+        }
+    }
+
+ cleanup:
+    /* single exit so the caddy is released on every path, including
+     * a state this callback does not handle
+     */
+    OBJ_RELEASE(caddy);
+}
diff --git a/orte/mca/state/staged/state_staged.h b/orte/mca/state/staged/state_staged.h
new file mode 100644
index 0000000000..ab2cc2107b
--- /dev/null
+++ b/orte/mca/state/staged/state_staged.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2012 Los Alamos National Security, LLC.
+ * All rights reserved.
+ *
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+/**
+ * @file
+ *
+ * Declarations of the staged state component and its module
+ * structure.
+ */
+
+#ifndef MCA_STATE_STAGED_EXPORT_H
+#define MCA_STATE_STAGED_EXPORT_H
+
+#include "orte_config.h"
+
+#include "orte/mca/state/state.h"
+
+BEGIN_C_DECLS
+
+/*
+ * Local Component structures
+ */
+
+ORTE_MODULE_DECLSPEC extern orte_state_base_component_t mca_state_staged_component;
+
+ORTE_DECLSPEC extern orte_state_base_module_t orte_state_staged_module;
+
+END_C_DECLS
+
+#endif /* MCA_STATE_STAGED_EXPORT_H */
diff --git a/orte/mca/state/staged/state_staged_component.c b/orte/mca/state/staged/state_staged_component.c
new file mode 100644
index 0000000000..ba9522a5b6
--- /dev/null
+++ b/orte/mca/state/staged/state_staged_component.c
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2012 Los Alamos National Security, LLC.
+ * All rights reserved.
+ *
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "orte_config.h"
+#include "opal/util/output.h"
+
+#include "orte/mca/state/state.h"
+#include "orte/mca/state/base/base.h"
+#include "state_staged.h"
+
+/*
+ * Public string for version number
+ */
+const char *orte_state_staged_component_version_string =
+    "ORTE STATE staged MCA component version " ORTE_VERSION;
+
+/*
+ * Local functionality
+ */
+static int state_staged_open(void);
+static int state_staged_close(void);
+static int state_staged_component_query(mca_base_module_t **module, int *priority);
+
+/*
+ * Instantiate the public struct with all of our public information
+ * and pointer to our public functions in it
+ */
+orte_state_base_component_t mca_state_staged_component =
+{
+    /* Handle the general mca_component_t struct containing
+     * meta information about the component
+     */
+    {
+        ORTE_STATE_BASE_VERSION_1_0_0,
+        /* Component name and version */
+        "staged",
+        ORTE_MAJOR_VERSION,
+        ORTE_MINOR_VERSION,
+        ORTE_RELEASE_VERSION,
+
+        /* Component open and close functions */
+        state_staged_open,
+        state_staged_close,
+        /* Selection hook - see state_staged_component_query() below */
+        state_staged_component_query
+    },
+    {
+        /* The component is checkpoint ready */
+        MCA_BASE_METADATA_PARAM_CHECKPOINT
+    },
+};
+
+/* Cached value of this component's "select" MCA parameter; set in
+ * state_staged_open() and read in state_staged_component_query()
+ */
+static bool select_me = false;
+
+/* Component open: register the integer MCA parameter "select"
+ * (default false) and cache its value in select_me.
+ * Always returns ORTE_SUCCESS.
+ */
+static int state_staged_open(void)
+{
+    int tmp;
+    mca_base_component_t *c=&mca_state_staged_component.base_version;
+
+    mca_base_param_reg_int(c, "select",
+                           "Use this component",
+                           false, false, (int)false, &tmp);
+    select_me = OPAL_INT_TO_BOOL(tmp);
+
+    return ORTE_SUCCESS;
+}
+
+/* Component close: nothing to release. Always returns ORTE_SUCCESS. */
+static int state_staged_close(void)
+{
+    return ORTE_SUCCESS;
+}
+
+/* Component query: offer the staged module at priority 1000 only
+ * when this process is the HNP and the "select" parameter was set.
+ * Otherwise decline: priority -1, NULL module, return ORTE_ERROR.
+ */
+static int state_staged_component_query(mca_base_module_t **module, int *priority)
+{
+    if (ORTE_PROC_IS_HNP && select_me) {
+        *priority = 1000;
+        *module = (mca_base_module_t *)&orte_state_staged_module;
+        return ORTE_SUCCESS;
+    }
+
+    *priority = -1;
+    *module = NULL;
+    return ORTE_ERROR;
+}
diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c
index 85775a9bc6..db5caae15e 100644
--- a/orte/orted/orted_main.c
+++ b/orte/orted/orted_main.c
@@ -557,23 +557,17 @@ int orte_daemon(int argc, char *argv[])
         orte_grpcomm_base.coll_id += 3;
 
         /* need to setup a pidmap for it */
-        jdata->pmap = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
-        if (ORTE_SUCCESS != (ret = orte_util_encode_pidmap(jdata->pmap))) {
+        if (ORTE_SUCCESS != (ret = orte_util_encode_pidmap(&orte_pidmap, false))) {
             ORTE_ERROR_LOG(ret);
             goto DONE;
         }
-
-
         /* if we don't yet have a daemon map, then we have to generate one
          * to pass back to it
          */
-        if (NULL == orte_odls_globals.dmap) {
-            orte_odls_globals.dmap = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
-            /* construct a nodemap */
-            if (ORTE_SUCCESS != (ret = orte_util_encode_nodemap(orte_odls_globals.dmap))) {
-                ORTE_ERROR_LOG(ret);
-                goto DONE;
-            }
+        if (ORTE_SUCCESS != (ret = orte_util_encode_nodemap(&orte_nidmap, false))) {
+            ORTE_ERROR_LOG(ret);
+            goto DONE;
         }
 
         /* create a string that contains our uri + the singleton's name + sysinfo */
diff --git a/orte/runtime/orte_globals.c b/orte/runtime/orte_globals.c
index
85c4a85a9c..5b139addbf 100644 --- a/orte/runtime/orte_globals.c +++ b/orte/runtime/orte_globals.c @@ -190,6 +190,10 @@ opal_thread_t orte_progress_thread; opal_event_t orte_finalize_event; #endif +/* global nidmap/pidmap for daemons to give to apps */ +opal_byte_object_t orte_nidmap; +opal_byte_object_t orte_pidmap; + char *orte_selected_oob_component = NULL; #endif /* !ORTE_DISABLE_FULL_RTE */ @@ -224,7 +228,7 @@ int orte_dt_init(void) opal_output_set_verbosity(orte_debug_output, 1); } } - + /** register the base system types with the DSS */ tmp = ORTE_STD_CNTR; if (ORTE_SUCCESS != (rc = opal_dss.register_type(orte_dt_pack_std_cntr, @@ -568,6 +572,12 @@ static void orte_app_context_construct(orte_app_context_t* app_context) app_context->idx=0; app_context->app=NULL; app_context->num_procs=0; + OBJ_CONSTRUCT(&app_context->procs, opal_pointer_array_t); + opal_pointer_array_init(&app_context->procs, + 1, + ORTE_GLOBAL_ARRAY_MAX_SIZE, + 16); + app_context->state = ORTE_APP_STATE_UNDEF; app_context->first_rank = 0; app_context->argv=NULL; app_context->env=NULL; @@ -594,12 +604,21 @@ static void orte_app_context_construct(orte_app_context_t* app_context) static void orte_app_context_destructor(orte_app_context_t* app_context) { opal_list_item_t *item; + int i; + orte_proc_t *proc; if (NULL != app_context->app) { free (app_context->app); app_context->app = NULL; } + for (i=0; i < app_context->procs.size; i++) { + if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(&app_context->procs, i))) { + OBJ_RELEASE(proc); + } + } + OBJ_DESTRUCT(&app_context->procs); + /* argv and env lists created by util/argv copy functions */ if (NULL != app_context->argv) { opal_argv_free(app_context->argv); @@ -669,6 +688,7 @@ OBJ_CLASS_INSTANCE(orte_app_context_t, static void orte_job_construct(orte_job_t* job) { job->jobid = ORTE_JOBID_INVALID; + job->updated = true; job->apps = OBJ_NEW(opal_pointer_array_t); opal_pointer_array_init(job->apps, 1, @@ -685,7 +705,6 @@ static void 
orte_job_construct(orte_job_t* job) ORTE_GLOBAL_ARRAY_BLOCK_SIZE, ORTE_GLOBAL_ARRAY_MAX_SIZE, ORTE_GLOBAL_ARRAY_BLOCK_SIZE); - job->map = NULL; job->bookmark = NULL; job->state = ORTE_JOB_STATE_UNDEF; @@ -706,8 +725,6 @@ static void orte_job_construct(orte_job_t* job) job->enable_recovery = false; job->num_local_procs = 0; - job->pmap = NULL; - #if OPAL_ENABLE_FT_CR == 1 job->ckpt_state = 0; job->ckpt_snapshot_ref = NULL; @@ -753,13 +770,6 @@ static void orte_job_destruct(orte_job_t* job) } OBJ_RELEASE(job->procs); - if (NULL != job->pmap) { - if (NULL != job->pmap->bytes) { - free(job->pmap->bytes); - } - free(job->pmap); - } - #if OPAL_ENABLE_FT_CR == 1 if (NULL != job->ckpt_snapshot_ref) { free(job->ckpt_snapshot_ref); @@ -801,6 +811,7 @@ static void orte_node_construct(orte_node_t* node) node->daemon_launched = false; node->location_verified = false; node->launch_id = -1; + node->mapped = false; node->num_procs = 0; node->procs = OBJ_NEW(opal_pointer_array_t); @@ -891,6 +902,7 @@ static void orte_proc_construct(orte_proc_t* proc) proc->state = ORTE_PROC_STATE_UNDEF; proc->alive = false; proc->aborted = false; + proc->updated = true; proc->app_idx = 0; #if OPAL_HAVE_HWLOC proc->locale = NULL; @@ -913,6 +925,7 @@ static void orte_proc_construct(orte_proc_t* proc) OBJ_CONSTRUCT(&proc->stats, opal_ring_buffer_t); opal_ring_buffer_init(&proc->stats, orte_stat_history_size); proc->registered = false; + proc->mpi_proc = false; proc->deregistered = false; proc->iof_complete = false; proc->waitpid_recvd = false; diff --git a/orte/runtime/orte_globals.h b/orte/runtime/orte_globals.h index 24b69a3c8b..95639d09ec 100644 --- a/orte/runtime/orte_globals.h +++ b/orte/runtime/orte_globals.h @@ -245,6 +245,12 @@ typedef struct { char *app; /** Number of copies of this process that are to be launched */ orte_std_cntr_t num_procs; + /** Array of pointers to the proc objects for procs of this app_context + * NOTE - not always used + */ + opal_pointer_array_t procs; + /** State of 
the app_context */ + orte_app_state_t state; /** First MPI rank of this app_context in the job */ orte_vpid_t first_rank; /** Standard argv-style array, including a final NULL pointer */ @@ -362,6 +368,10 @@ typedef struct { opal_list_item_t super; /* jobid for this job */ orte_jobid_t jobid; + /* flag indicating that the job has been updated + * and needs to be included in the pidmap message + */ + bool updated; /* app_context array for this job */ opal_pointer_array_t *apps; /* number of app_contexts in the array */ @@ -423,8 +433,6 @@ typedef struct { /* max time for launch msg to be received */ struct timeval max_launch_msg_recvd; orte_vpid_t num_local_procs; - /* pidmap for delivery to procs */ - opal_byte_object_t *pmap; #if OPAL_ENABLE_FT_CR == 1 /* ckpt state */ size_t ckpt_state; @@ -471,6 +479,10 @@ struct orte_proc_t { bool alive; /* flag if it called abort */ bool aborted; + /* flag that the proc has been updated and need to be + * included in the next pidmap message + */ + bool updated; /* exit code */ orte_exit_code_t exit_code; /* the app_context that generated this proc */ @@ -514,6 +526,7 @@ struct orte_proc_t { opal_ring_buffer_t stats; /* track finalization */ bool registered; + bool mpi_proc; bool deregistered; bool iof_complete; bool waitpid_recvd; @@ -682,6 +695,10 @@ ORTE_DECLSPEC extern int orte_max_vm_size; /* record the selected oob component */ ORTE_DECLSPEC extern char *orte_selected_oob_component; +/* global nidmap/pidmap for daemons to give to apps */ +ORTE_DECLSPEC extern opal_byte_object_t orte_nidmap; +ORTE_DECLSPEC extern opal_byte_object_t orte_pidmap; + #endif /* ORTE_DISABLE_FULL_SUPPORT */ END_C_DECLS diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index 68b4dfdb1c..0d16fe3cb6 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -521,6 +521,10 @@ static opal_cmd_line_init_t cmd_line_init[] = { NULL, OPAL_CMD_LINE_TYPE_BOOL, "Execute without creating an allocation-spanning 
virtual machine (only start daemons on nodes hosting application procs)" }, + { "state", "staged", "select", '\0', "staged", "staged", 0, + NULL, OPAL_CMD_LINE_TYPE_BOOL, + "Used staged execution if inadequate resources are present (cannot support MPI jobs)" }, + /* End of list */ { NULL, NULL, NULL, '\0', NULL, NULL, 0, NULL, OPAL_CMD_LINE_TYPE_NULL, NULL } diff --git a/orte/util/error_strings.c b/orte/util/error_strings.c index d5e3a22ada..f391330647 100644 --- a/orte/util/error_strings.c +++ b/orte/util/error_strings.c @@ -288,6 +288,24 @@ const char *orte_job_state_to_str(orte_job_state_t state) } } +const char *orte_app_ctx_state_to_str(orte_app_state_t state) +{ + switch(state) { + case ORTE_APP_STATE_UNDEF: + return "UNDEFINED"; + case ORTE_APP_STATE_INIT: + return "PENDING INIT"; + case ORTE_APP_STATE_ALL_MAPPED: + return "ALL MAPPED"; + case ORTE_APP_STATE_RUNNING: + return "RUNNING"; + case ORTE_APP_STATE_COMPLETED: + return "COMPLETED"; + default: + return "UNKNOWN STATE!"; + } +} + const char *orte_proc_state_to_str(orte_proc_state_t state) { switch(state) { diff --git a/orte/util/error_strings.h b/orte/util/error_strings.h index 9ca5ca9656..c8abebfda0 100644 --- a/orte/util/error_strings.h +++ b/orte/util/error_strings.h @@ -10,7 +10,8 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, LLC. 
All rights reserved * $COPYRIGHT$ * * Additional copyrights may follow @@ -37,6 +38,8 @@ ORTE_DECLSPEC int orte_err2str(int errnum, const char **errmsg); ORTE_DECLSPEC const char *orte_job_state_to_str(orte_job_state_t state); +ORTE_DECLSPEC const char *orte_app_ctx_state_to_str(orte_app_state_t state); + ORTE_DECLSPEC const char *orte_proc_state_to_str(orte_proc_state_t state); END_C_DECLS diff --git a/orte/util/nidmap.c b/orte/util/nidmap.c index 5f44385079..f42bf8d49b 100644 --- a/orte/util/nidmap.c +++ b/orte/util/nidmap.c @@ -238,45 +238,53 @@ int orte_util_build_daemon_nidmap(char **nodes) } #endif -int orte_util_encode_nodemap(opal_byte_object_t *boptr) +int orte_util_encode_nodemap(opal_byte_object_t *boptr, bool update) { - orte_vpid_t vpid; orte_node_t *node; - int32_t i, num_nodes; + int32_t i; int rc; opal_buffer_t buf; char *ptr, *nodename; + orte_job_t *daemons; + orte_proc_t *dmn; + + /* if the daemon job has not been updated, then there is + * nothing to send + */ + daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); + if (update && !daemons->updated) { + boptr->bytes = NULL; + boptr->size = 0; + return ORTE_SUCCESS; + } /* setup a buffer for tmp use */ OBJ_CONSTRUCT(&buf, opal_buffer_t); - /* determine the number of nodes in the global node array */ - num_nodes = 0; - for (i=0; i < orte_node_pool->size; i++) { - if (NULL == opal_pointer_array_get_item(orte_node_pool, i)) { + /* only send info on nodes that have daemons on them, and + * only regarding daemons that have changed - i.e., new + * daemons since the last time we sent the info - so we + * minimize the size of the nidmap message. 
The daemon + * will maintain a global picture of the overall nidmap + * as it receives updates, and pass that down to the procs + */ + for (i=0; i < daemons->procs->size; i++) { + if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, i))) { continue; } - ++num_nodes; - } - - /* pack number of nodes */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_nodes, 1, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* pack the data for each node by daemon */ - for (i=0; i < orte_node_pool->size; i++) { - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) { + /* if we want an update nidmap and this daemon hasn't + * been updated, then skip it + */ + if (update && !dmn->updated) { continue; } - if (NULL == node->daemon) { - /* some nodes may not have daemons on them */ - vpid = ORTE_VPID_INVALID; - } else { - vpid = node->daemon->name.vpid; + /* if the daemon doesn't have a node, that's an error */ + if (NULL == (node = dmn->node)) { + opal_output(0, "DAEMON %s HAS NO NODE", ORTE_NAME_PRINT(&dmn->name)); + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return ORTE_ERR_NOT_FOUND; } - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &vpid, 1, ORTE_VPID))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &dmn->name.vpid, 1, ORTE_VPID))) { ORTE_ERROR_LOG(rc); return rc; } @@ -315,10 +323,10 @@ int orte_util_encode_nodemap(opal_byte_object_t *boptr) int orte_util_decode_nodemap(opal_byte_object_t *bo) { int n; - int32_t num_nodes, i, num_daemons; + int32_t num_daemons; orte_process_name_t daemon; opal_buffer_t buf; - int rc; + int rc=ORTE_SUCCESS; uint8_t oversub; char *nodename; @@ -330,31 +338,13 @@ int orte_util_decode_nodemap(opal_byte_object_t *bo) OBJ_CONSTRUCT(&buf, opal_buffer_t); opal_dss.load(&buf, bo->bytes, bo->size); - /* unpack number of nodes */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_nodes, &n, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - OPAL_OUTPUT_VERBOSE((1, 
orte_nidmap_output, - "%s decode:nidmap decoding %d nodes", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_nodes)); - /* set the daemon jobid */ daemon.jobid = ORTE_DAEMON_JOBID(ORTE_PROC_MY_NAME->jobid); num_daemons = 0; - for (i=0; i < num_nodes; i++) { - /* unpack the daemon vpid */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &daemon.vpid, &n, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - return rc; - } - if (ORTE_VPID_INVALID != daemon.vpid) { - ++num_daemons; - } + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &daemon.vpid, &n, ORTE_VPID))) { + ++num_daemons; /* unpack and store the node's name */ n=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &nodename, &n, OPAL_STRING))) { @@ -387,23 +377,27 @@ int orte_util_decode_nodemap(opal_byte_object_t *bo) return rc; } } - + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + } else { + rc = ORTE_SUCCESS; + } + /* update num_daemons */ orte_process_info.num_daemons = num_daemons; OBJ_DESTRUCT(&buf); - return ORTE_SUCCESS; + return rc; } /* decode a nodemap for a daemon */ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) { int n; - int32_t num_nodes, i; orte_vpid_t vpid; orte_node_t *node; opal_buffer_t buf; - int rc; + int rc=ORTE_SUCCESS; uint8_t *oversub; char *name; orte_job_t *daemons; @@ -417,34 +411,12 @@ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) OBJ_CONSTRUCT(&buf, opal_buffer_t); opal_dss.load(&buf, bo->bytes, bo->size); - /* unpack number of nodes */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_nodes, &n, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - OPAL_OUTPUT_VERBOSE((1, orte_nidmap_output, - "%s decode:nidmap decoding %d nodes", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_nodes)); - - /* set the size of the node pool storage so we minimize realloc's */ - if (ORTE_SUCCESS != (rc = opal_pointer_array_set_size(orte_node_pool, num_nodes))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* transfer the data to the 
nodes, counting the number of * daemons in the system */ daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); - for (i=0; i < num_nodes; i++) { - /* unpack the daemon vpid */ - n=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &vpid, &n, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - return rc; - } + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &vpid, &n, ORTE_VPID))) { /* unpack and store the node's name */ n=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &name, &n, OPAL_STRING))) { @@ -465,10 +437,6 @@ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) ORTE_ERROR_LOG(rc); return rc; } - if (ORTE_VPID_INVALID == vpid) { - /* no daemon on this node */ - continue; - } if (NULL == (dptr = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, vpid))) { dptr = OBJ_NEW(orte_proc_t); dptr->name.jobid = ORTE_PROC_MY_NAME->jobid; @@ -492,7 +460,13 @@ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) node->oversubscribed = true; } } - + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&buf); + return rc; + } + rc = ORTE_SUCCESS; + orte_process_info.num_procs = daemons->num_procs; if (orte_process_info.max_procs < orte_process_info.num_procs) { @@ -502,8 +476,19 @@ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) /* update num_daemons */ orte_process_info.num_daemons = daemons->num_procs; + /* update the global nidmap object for sending to + * application procs + */ + if (NULL != orte_nidmap.bytes) { + free(orte_nidmap.bytes); + } + if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(&orte_nidmap, false))) { + ORTE_ERROR_LOG(rc); + } + if (0 < opal_output_get_verbosity(orte_nidmap_output)) { - for (i=0; i < num_nodes; i++) { + int i; + for (i=0; i < orte_node_pool->size; i++) { if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) { continue; } @@ -515,24 +500,15 @@ int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo) } OBJ_DESTRUCT(&buf); - 
return ORTE_SUCCESS; + return rc; } -int orte_util_encode_pidmap(opal_byte_object_t *boptr) +int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update) { orte_proc_t *proc; opal_buffer_t buf; - orte_local_rank_t *lrank = NULL; - orte_node_rank_t *nrank = NULL; - orte_job_t *jdata = NULL; - orte_vpid_t *daemons = NULL; - int i, j, k, rc = ORTE_SUCCESS; -#if OPAL_HAVE_HWLOC - unsigned int *bind_idx=NULL; -#endif - orte_proc_state_t *states=NULL; - orte_app_idx_t *app_idx=NULL; - int32_t *restarts=NULL; + int i, j, rc = ORTE_SUCCESS; + orte_job_t *jdata; /* setup the working buffer */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -551,6 +527,16 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr) if (NULL == jdata->map) { continue; } + /* if this job has already terminated, then ignore it */ + if (ORTE_JOB_STATE_TERMINATED < jdata->state) { + continue; + } + /* if we want an update version and there is nothing to update, ignore it */ + if (update && !jdata->updated) { + continue; + } + /* flag that we included it so we don't do so again */ + jdata->updated = false; /* pack the jobid */ if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->jobid, 1, ORTE_JOBID))) { ORTE_ERROR_LOG(rc); @@ -569,70 +555,54 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr) } #endif - /* allocate memory for the nodes, local ranks, node ranks, and bind_idx */ - daemons = (orte_vpid_t*)malloc(jdata->num_procs * sizeof(orte_vpid_t)); - lrank = (orte_local_rank_t*)malloc(jdata->num_procs*sizeof(orte_local_rank_t)); - nrank = (orte_node_rank_t*)malloc(jdata->num_procs*sizeof(orte_node_rank_t)); - states = (orte_proc_state_t*)malloc(jdata->num_procs*sizeof(orte_proc_state_t)); - app_idx = (orte_app_idx_t*)malloc(jdata->num_procs*sizeof(orte_app_idx_t)); - restarts = (int32_t*)malloc(jdata->num_procs*sizeof(int32_t)); -#if OPAL_HAVE_HWLOC - bind_idx = (unsigned int*)malloc(jdata->num_procs*sizeof(unsigned int)); -#endif - /* transfer and pack the node info in one pack */ - for (i=0, 
k=0; i < jdata->procs->size; i++) { + /* cycle thru the job's procs, including only those that have + * been updated so we minimize the amount of info being sent + */ + for (i=0; i < jdata->procs->size; i++) { if (NULL == (proc = (orte_proc_t *) opal_pointer_array_get_item(jdata->procs, i))) { continue; } - if( k >= (int)jdata->num_procs ) { - orte_show_help("help-orte-runtime.txt", "orte_nidmap:too_many_nodes", - true, jdata->num_procs); - break; + if (!proc->updated) { + continue; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->name.vpid, 1, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &(proc->node->daemon->name.vpid), 1, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->local_rank, 1, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->node_rank, 1, ORTE_NODE_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; } - daemons[k] = proc->node->daemon->name.vpid; - lrank[k] = proc->local_rank; - nrank[k] = proc->node_rank; - states[k] = proc->state; - app_idx[k] = proc->app_idx; - restarts[k] = proc->restarts; #if OPAL_HAVE_HWLOC - bind_idx[k] = proc->bind_idx; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->bind_idx, 1, OPAL_UINT))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } #endif - ++k; + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->state, 1, ORTE_PROC_STATE))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->app_idx, 1, ORTE_APP_IDX))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->restarts, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup_and_return; + } } - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, daemons, jdata->num_procs, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - 
goto cleanup_and_return; - } - /* transfer and pack the local_ranks in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, lrank, jdata->num_procs, ORTE_LOCAL_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup_and_return; - } - /* transfer and pack the node ranks in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, nrank, jdata->num_procs, ORTE_NODE_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup_and_return; - } -#if OPAL_HAVE_HWLOC - /* transfer and pack the bind_idx in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, bind_idx, jdata->num_procs, OPAL_UINT))) { - ORTE_ERROR_LOG(rc); - goto cleanup_and_return; - } -#endif - /* transfer and pack the states in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, states, jdata->num_procs, ORTE_PROC_STATE))) { - ORTE_ERROR_LOG(rc); - goto cleanup_and_return; - } - /* transfer and pack the app_idx's in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, app_idx, jdata->num_procs, ORTE_APP_IDX))) { - ORTE_ERROR_LOG(rc); - goto cleanup_and_return; - } - /* transfer and pack the restarts in one pack */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, restarts, jdata->num_procs, OPAL_INT32))) { + /* pack an invalid vpid to flag the end of this job data */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ORTE_NAME_INVALID->vpid, 1, ORTE_VPID))) { ORTE_ERROR_LOG(rc); goto cleanup_and_return; } @@ -642,30 +612,6 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr) opal_dss.unload(&buf, (void**)&boptr->bytes, &boptr->size); cleanup_and_return: - - if( NULL != lrank ) { - free(lrank); - } - if( NULL != nrank ) { - free(nrank); - } - if( NULL != daemons ) { - free(daemons); - } -#if OPAL_HAVE_HWLOC - if( NULL != bind_idx ) { - free(bind_idx); - } -#endif - if (NULL != states) { - free(states); - } - if (NULL != app_idx) { - free(app_idx); - } - if (NULL != restarts) { - free(restarts); - } OBJ_DESTRUCT(&buf); return rc; @@ -675,20 +621,19 @@ int orte_util_encode_pidmap(opal_byte_object_t 
*boptr) int orte_util_decode_pidmap(opal_byte_object_t *bo) { orte_vpid_t i, num_procs, *vptr, daemon; - orte_vpid_t *daemons=NULL; - orte_local_rank_t *local_rank=NULL; - orte_node_rank_t *node_rank=NULL; + orte_local_rank_t local_rank; + orte_node_rank_t node_rank; #if OPAL_HAVE_HWLOC opal_hwloc_level_t bind_level = OPAL_HWLOC_NODE_LEVEL, pbind, *lvptr; - unsigned int *bind_idx=NULL, pbidx, *uiptr; + unsigned int bind_idx, pbidx, *uiptr; opal_hwloc_locality_t locality; #endif orte_std_cntr_t n; opal_buffer_t buf; int rc; - orte_proc_state_t *states = NULL; - orte_app_idx_t *app_idx = NULL; - int32_t *restarts = NULL; + orte_proc_state_t state; + orte_app_idx_t app_idx; + int32_t restarts; orte_process_name_t proc, dmn; orte_namelist_t *nm; opal_list_t jobs; @@ -701,6 +646,9 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) goto cleanup; } + /* set the daemon jobid */ + dmn.jobid = ORTE_DAEMON_JOBID(ORTE_PROC_MY_NAME->jobid); + n = 1; /* cycle through the buffer */ OBJ_CONSTRUCT(&jobs, opal_list_t); @@ -750,123 +698,91 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) ORTE_VPID_PRINT(num_procs), opal_hwloc_base_print_level(bind_level))); - /* allocate memory for the daemon info */ - daemons = (orte_vpid_t*)malloc(num_procs * sizeof(orte_vpid_t)); - /* unpack it in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, daemons, &n, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* allocate memory for local ranks */ - local_rank = (orte_local_rank_t*)malloc(num_procs*sizeof(orte_local_rank_t)); - /* unpack them in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, local_rank, &n, ORTE_LOCAL_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (proc.jobid == ORTE_PROC_MY_NAME->jobid) { - /* set mine */ - orte_process_info.my_local_rank = local_rank[ORTE_PROC_MY_NAME->vpid]; - if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_LOCALRANK, - &orte_process_info.my_local_rank, 
ORTE_LOCAL_RANK))) { + /* cycle thru the data until we hit an INVALID vpid indicating + * all data for this job has been read + */ + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &proc.vpid, &n, ORTE_VPID))) { + if (ORTE_VPID_INVALID == proc.vpid) { + break; + } + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &dmn.vpid, &n, ORTE_VPID))) { ORTE_ERROR_LOG(rc); goto cleanup; } - } - - /* allocate memory for node ranks */ - node_rank = (orte_node_rank_t*)malloc(num_procs*sizeof(orte_node_rank_t)); - /* unpack node ranks in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, node_rank, &n, ORTE_NODE_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (proc.jobid == ORTE_PROC_MY_NAME->jobid) { - /* set mine */ - orte_process_info.my_node_rank = node_rank[ORTE_PROC_MY_NAME->vpid]; - if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_NODERANK, - &orte_process_info.my_node_rank, ORTE_NODE_RANK))) { + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &local_rank, &n, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &node_rank, &n, ORTE_NODE_RANK))) { ORTE_ERROR_LOG(rc); goto cleanup; } - } - #if OPAL_HAVE_HWLOC - /* allocate memory for bind_idx */ - bind_idx = (unsigned int*)malloc(num_procs*sizeof(unsigned int)); - /* unpack bind_idx in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, bind_idx, &n, OPAL_UINT))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - if (proc.jobid == ORTE_PROC_MY_NAME->jobid) { - /* set mine */ - orte_process_info.bind_idx = bind_idx[ORTE_PROC_MY_NAME->vpid]; - if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_BIND_INDEX, - &orte_process_info.bind_idx, OPAL_UINT))) { + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bind_idx, &n, OPAL_UINT))) { ORTE_ERROR_LOG(rc); goto cleanup; } - } #endif - - /* allocate memory for states */ - states = 
(orte_proc_state_t*)malloc(num_procs*sizeof(orte_proc_state_t)); - /* unpack states in one shot */ - n=num_procs; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, states, &n, ORTE_PROC_STATE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - /* dump this info - apps don't need it */ - free(states); - states = NULL; - - /* allocate memory for app_idx's */ - app_idx = (orte_app_idx_t*)malloc(num_procs*sizeof(orte_app_idx_t)); - /* unpack app_idx's in one shot */ - n=num_procs; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, app_idx, &n, ORTE_APP_IDX))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - /* dump this info - apps don't need it */ - free(app_idx); - app_idx = NULL; - - /* allocate memory for restarts */ - restarts = (int32_t*)malloc(num_procs*sizeof(int32_t)); - /* unpack restarts in one shot */ - n=num_procs; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, restarts, &n, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - /* dump this info - apps don't need it */ - free(restarts); - restarts = NULL; - - /* set the daemon jobid */ - dmn.jobid = ORTE_DAEMON_JOBID(ORTE_PROC_MY_NAME->jobid); - - /* xfer the data */ - for (i=0; i < num_procs; i++) { if (proc.jobid == ORTE_PROC_MY_NAME->jobid && - i == ORTE_PROC_MY_NAME->vpid) { + proc.vpid == ORTE_PROC_MY_NAME->vpid) { + /* set mine */ + orte_process_info.my_local_rank = local_rank; + if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_LOCALRANK, + &orte_process_info.my_local_rank, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + orte_process_info.my_node_rank = node_rank; + if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_NODERANK, + &orte_process_info.my_node_rank, ORTE_NODE_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } +#if OPAL_HAVE_HWLOC + orte_process_info.bind_idx = bind_idx; + if (ORTE_SUCCESS != (rc = orte_db.store(ORTE_PROC_MY_NAME, ORTE_DB_BIND_INDEX, + &orte_process_info.bind_idx, OPAL_UINT))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } 
+#endif + } + /* apps don't need the rest of the data in the buffer for this proc, + * but we have to unpack it anyway to stay in sync + */ + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &state, &n, ORTE_PROC_STATE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &app_idx, &n, ORTE_APP_IDX))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* we don't need to store data for ourself in the database + * as we already did so + */ + if (proc.jobid == ORTE_PROC_MY_NAME->jobid && + proc.vpid == ORTE_PROC_MY_NAME->vpid) { continue; } - proc.vpid = i; - if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_DAEMON_VPID, &daemons[i], ORTE_VPID))) { + /* store the data for this proc */ + if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_DAEMON_VPID, &dmn.vpid, ORTE_VPID))) { ORTE_ERROR_LOG(rc); goto cleanup; } /* lookup and store the hostname for this proc */ - dmn.vpid = daemons[i]; if (ORTE_SUCCESS != (rc = orte_db.fetch_pointer(&dmn, ORTE_DB_HOSTNAME, (void**)&hostname, OPAL_STRING))) { ORTE_ERROR_LOG(rc); goto cleanup; @@ -875,37 +791,21 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) ORTE_ERROR_LOG(rc); goto cleanup; } - if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_LOCALRANK, &local_rank[i], ORTE_LOCAL_RANK))) { + if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) { ORTE_ERROR_LOG(rc); goto cleanup; } - if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_NODERANK, &node_rank[i], ORTE_NODE_RANK))) { + if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) { ORTE_ERROR_LOG(rc); goto cleanup; } #if OPAL_HAVE_HWLOC - if (ORTE_SUCCESS != (rc = orte_db.store(&proc, ORTE_DB_BIND_INDEX, &bind_idx[i], OPAL_UINT))) { + if (ORTE_SUCCESS != (rc = orte_db.store(&proc, 
ORTE_DB_BIND_INDEX, &bind_idx, OPAL_UINT))) { ORTE_ERROR_LOG(rc); goto cleanup; } #endif - OPAL_OUTPUT_VERBOSE((10, orte_nidmap_output, - "%s orte:util:decode:pidmap proc %s host %s lrank %d nrank %d bindidx %u", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc), hostname, - (int)local_rank[i], (int)node_rank[i], bind_idx[i])); } - /* release data */ - free(daemons); - daemons = NULL; - free(local_rank); - local_rank = NULL; - free(node_rank); - node_rank = NULL; -#if OPAL_HAVE_HWLOC - free(bind_idx); - bind_idx = NULL; -#endif /* setup for next cycle */ n = 1; } @@ -979,7 +879,7 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) } else { /* we don't share a node */ OPAL_OUTPUT_VERBOSE((2, orte_nidmap_output, - "%s orte:util:decode:pidmap proc %s does NOT node [my daemon %s, their daemon %s]", + "%s orte:util:decode:pidmap proc %s does NOT share my node [my daemon %s, their daemon %s]", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&proc), ORTE_VPID_PRINT(ORTE_PROC_MY_DAEMON->vpid), @@ -1001,29 +901,6 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) cleanup: - if (NULL != daemons) { - free(daemons); - } - if (NULL != local_rank) { - free(local_rank); - } - if (NULL != node_rank) { - free(node_rank); - } -#if OPAL_HAVE_HWLOC - if (NULL != bind_idx) { - free(bind_idx); - } -#endif - if (NULL != states) { - free(states); - } - if (NULL != app_idx) { - free(app_idx); - } - if (NULL != restarts) { - free(restarts); - } OBJ_DESTRUCT(&buf); return rc; } @@ -1031,13 +908,12 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo) int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) { orte_jobid_t jobid; - orte_vpid_t i, num_procs; - orte_vpid_t *nodes=NULL; - orte_local_rank_t *local_rank=NULL; - orte_node_rank_t *node_rank=NULL; + orte_vpid_t vpid, num_procs, dmn; + orte_local_rank_t local_rank; + orte_node_rank_t node_rank; #if OPAL_HAVE_HWLOC opal_hwloc_level_t bind_level = OPAL_HWLOC_NODE_LEVEL; - unsigned int *bind_idx=NULL; + unsigned 
int bind_idx; #endif orte_std_cntr_t n; opal_buffer_t buf; @@ -1045,9 +921,9 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) orte_job_t *jdata, *daemons; orte_proc_t *proc, *pptr; orte_node_t *node, *nptr; - orte_proc_state_t *states=NULL; - orte_app_idx_t *app_idx=NULL; - int32_t *restarts=NULL; + orte_proc_state_t state; + orte_app_idx_t app_idx; + int32_t restarts; orte_job_map_t *map; bool found; @@ -1087,98 +963,78 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) goto cleanup; } #endif - - /* allocate memory for the node info */ - nodes = (orte_vpid_t*)malloc(num_procs * 4); - /* unpack it in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, nodes, &n, ORTE_VPID))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* allocate memory for local ranks */ - local_rank = (orte_local_rank_t*)malloc(num_procs*sizeof(orte_local_rank_t)); - /* unpack them in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, local_rank, &n, ORTE_LOCAL_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* allocate memory for node ranks */ - node_rank = (orte_node_rank_t*)malloc(num_procs*sizeof(orte_node_rank_t)); - /* unpack node ranks in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, node_rank, &n, ORTE_NODE_RANK))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - -#if OPAL_HAVE_HWLOC - /* allocate memory for bind_idx */ - bind_idx = (unsigned int*)malloc(num_procs*sizeof(unsigned int)); - /* unpack bind_idx in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, bind_idx, &n, OPAL_UINT))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } -#endif - - /* allocate memory for states */ - states = (orte_proc_state_t*)malloc(num_procs*sizeof(orte_proc_state_t)); - /* unpack states in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, states, &n, ORTE_PROC_STATE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* allocate memory for 
app_idx's */ - app_idx = (orte_app_idx_t*)malloc(num_procs*sizeof(orte_app_idx_t)); - /* unpack app_idx's in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, app_idx, &n, ORTE_APP_IDX))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* allocate memory for restarts */ - restarts = (int32_t*)malloc(num_procs*sizeof(int32_t)); - /* unpack restarts in one shot */ - n=num_procs; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, restarts, &n, OPAL_INT32))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* xfer the data */ + /* setup the map */ map = jdata->map; if (NULL == map) { jdata->map = OBJ_NEW(orte_job_map_t); map = jdata->map; } - for (i=0; i < num_procs; i++) { - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) { + + /* cycle thru the data until we hit an INVALID vpid indicating + * all data for this job has been read + */ + n=1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &vpid, &n, ORTE_VPID))) { + if (ORTE_VPID_INVALID == vpid) { + break; + } + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &dmn, &n, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &local_rank, &n, ORTE_LOCAL_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &node_rank, &n, ORTE_NODE_RANK))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } +#if OPAL_HAVE_HWLOC + n=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bind_idx, &n, OPAL_UINT))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } +#endif + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &state, &n, ORTE_PROC_STATE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &app_idx, &n, ORTE_APP_IDX))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + n=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &restarts, &n, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + /* store the 
data for this proc */ + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) { proc = OBJ_NEW(orte_proc_t); proc->name.jobid = jdata->jobid; - proc->name.vpid = i; - opal_pointer_array_set_item(jdata->procs, i, proc); + proc->name.vpid = vpid; + opal_pointer_array_set_item(jdata->procs, vpid, proc); } /* lookup the node - should always be present */ - if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, nodes[i]))) { + if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, dmn))) { /* this should never happen, but protect ourselves anyway */ node = OBJ_NEW(orte_node_t); /* get the daemon */ - if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, nodes[i]))) { + if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, dmn))) { pptr = OBJ_NEW(orte_proc_t); pptr->name.jobid = ORTE_PROC_MY_NAME->jobid; - pptr->name.vpid = nodes[i]; - opal_pointer_array_set_item(daemons->procs, nodes[i], pptr); + pptr->name.vpid = dmn; + opal_pointer_array_set_item(daemons->procs, dmn, pptr); } node->daemon = pptr; - opal_pointer_array_add(orte_node_pool, node); + opal_pointer_array_set_item(orte_node_pool, dmn, node); } if (NULL != proc->node) { if (node != proc->node) { @@ -1235,61 +1091,32 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo) OBJ_RETAIN(proc); opal_pointer_array_add(node->procs, proc); /* update proc values */ - proc->local_rank = local_rank[i]; - proc->node_rank = node_rank[i]; - proc->app_idx = app_idx[i]; - proc->restarts = restarts[i]; - proc->state = states[i]; + proc->local_rank = local_rank; + proc->node_rank = node_rank; + proc->app_idx = app_idx; + proc->restarts = restarts; + proc->state = state; } - - /* release data */ - free(nodes); - nodes = NULL; - free(local_rank); - local_rank = NULL; - free(node_rank); - node_rank = NULL; -#if OPAL_HAVE_HWLOC - free(bind_idx); - bind_idx = NULL; -#endif - free(states); - states = NULL; 
- free(app_idx); - app_idx = NULL; - free(restarts); - restarts = NULL; /* setup for next cycle */ n = 1; } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - rc = ORTE_SUCCESS; + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + goto cleanup; } + rc = ORTE_SUCCESS; + /* update our global pidmap object for sending + * to procs + */ + if (NULL != orte_pidmap.bytes) { + free(orte_pidmap.bytes); + } + if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(&orte_pidmap, false))) { + ORTE_ERROR_LOG(rc); + } + cleanup: - if (NULL != nodes) { - free(nodes); - } - if (NULL != local_rank) { - free(local_rank); - } - if (NULL != node_rank) { - free(node_rank); - } -#if OPAL_HAVE_HWLOC - if (NULL != bind_idx) { - free(bind_idx); - } -#endif - if (NULL != states) { - free(states); - } - if (NULL != app_idx) { - free(app_idx); - } - if (NULL != restarts) { - free(restarts); - } OBJ_DESTRUCT(&buf); return rc; } diff --git a/orte/util/nidmap.h b/orte/util/nidmap.h index 8e10b44f2d..a8d716d702 100644 --- a/orte/util/nidmap.h +++ b/orte/util/nidmap.h @@ -43,11 +43,11 @@ BEGIN_C_DECLS ORTE_DECLSPEC int orte_util_nidmap_init(opal_buffer_t *buffer); ORTE_DECLSPEC void orte_util_nidmap_finalize(void); -ORTE_DECLSPEC int orte_util_encode_nodemap(opal_byte_object_t *boptr); +ORTE_DECLSPEC int orte_util_encode_nodemap(opal_byte_object_t *boptr, bool update); ORTE_DECLSPEC int orte_util_decode_nodemap(opal_byte_object_t *boptr); ORTE_DECLSPEC int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo); -ORTE_DECLSPEC int orte_util_encode_pidmap(opal_byte_object_t *boptr); +ORTE_DECLSPEC int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update); ORTE_DECLSPEC int orte_util_decode_pidmap(opal_byte_object_t *boptr); ORTE_DECLSPEC int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo);