1
1

Per request from Nathan, add an offset value to the job struct so we can construct a "global rank" that spans multiple jobs during dynamic launch operations. Store a new ORTE_DB_GLOBAL_RANK value for each process in the database, and ensure that we share our own value during connect_accept so both sides can see it.

This isn't being used yet - just enabling Nathan to do what he needs.

***** NOTE: any use of the OMPI_DB_GLOBAL_RANK database key must be protected by #ifdef OMPI_DB_GLOBAL_RANK, as not all RTEs will define this key. *****

This commit was SVN r29708.
Этот коммит содержится в:
Ralph Castain 2013-11-14 17:01:43 +00:00
родитель c4e4871ee5
Коммит 7480beb7f0
6 изменённых файлов: 64 добавлений и 9 удалений

Просмотреть файл

@ -102,6 +102,7 @@ OMPI_DECLSPEC int ompi_rte_db_remove(const ompi_process_name_t *nm,
const char *key); const char *key);
#define OMPI_DB_HOSTNAME ORTE_DB_HOSTNAME #define OMPI_DB_HOSTNAME ORTE_DB_HOSTNAME
#define OMPI_DB_LOCALITY ORTE_DB_LOCALITY #define OMPI_DB_LOCALITY ORTE_DB_LOCALITY
#define OMPI_DB_GLOBAL_RANK ORTE_DB_GLOBAL_RANK
/* Communications */ /* Communications */
typedef orte_rml_tag_t ompi_rml_tag_t; typedef orte_rml_tag_t ompi_rml_tag_t;

Просмотреть файл

@ -212,6 +212,13 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
} }
#endif #endif
/* set the offset so shared memory components can potentially
* connect to any spawned jobs
*/
jdata->offset = orte_total_procs;
/* track the total number of procs launched by us */
orte_total_procs += jdata->num_procs;
/* if it is a dynamic spawn, save the bookmark on the parent's job too */ /* if it is a dynamic spawn, save the bookmark on the parent's job too */
if (ORTE_JOBID_INVALID != jdata->originator.jobid) { if (ORTE_JOBID_INVALID != jdata->originator.jobid) {
if (NULL != (parent = orte_get_job_data_object(jdata->originator.jobid))) { if (NULL != (parent = orte_get_job_data_object(jdata->originator.jobid))) {
@ -223,7 +230,7 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
* daemon job * daemon job
*/ */
if (jdata->map->display_map) { if (jdata->map->display_map) {
char *output; char *output=NULL;
int i, j; int i, j;
orte_node_t *node; orte_node_t *node;
orte_proc_t *proc; orte_proc_t *proc;
@ -236,7 +243,8 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
* the output a line at a time here * the output a line at a time here
*/ */
/* display just the procs in a diffable format */ /* display just the procs in a diffable format */
opal_output(orte_clean_output, "<map>"); opal_output(orte_clean_output, "<map>\n\t<jobid=%s>\n\t<offset=%s>",
ORTE_JOBID_PRINT(jdata->jobid), ORTE_VPID_PRINT(jdata->offset));
fflush(stderr); fflush(stderr);
/* loop through nodes */ /* loop through nodes */
for (i=0; i < jdata->map->nodes->size; i++) { for (i=0; i < jdata->map->nodes->size; i++) {
@ -302,6 +310,7 @@ void orte_rmaps_base_map_job(int fd, short args, void *cbdata)
fflush(stderr); fflush(stderr);
#endif #endif
} else { } else {
opal_output(orte_clean_output, " Data for JOB %s offset %s", ORTE_JOBID_PRINT(jdata->jobid), ORTE_VPID_PRINT(jdata->offset));
opal_dss.print(&output, NULL, jdata->map, ORTE_JOB_MAP); opal_dss.print(&output, NULL, jdata->map, ORTE_JOB_MAP);
if (orte_xml_output) { if (orte_xml_output) {
fprintf(orte_xml_fp, "%s\n", output); fprintf(orte_xml_fp, "%s\n", output);

Просмотреть файл

@ -251,7 +251,7 @@ int orte_dt_print_job(char **output, char *prefix, orte_job_t *src, opal_data_ty
tmp = tmp2; tmp = tmp2;
} }
asprintf(&tmp2, "%s\n%sNum procs: %ld", tmp, pfx, (long)src->num_procs); asprintf(&tmp2, "%s\n%sNum procs: %ld\tOffset: %ld", tmp, pfx, (long)src->num_procs, (long)src->offset);
free(tmp); free(tmp);
tmp = tmp2; tmp = tmp2;

Просмотреть файл

@ -134,7 +134,7 @@ opal_pointer_array_t *orte_job_data;
opal_pointer_array_t *orte_node_pool; opal_pointer_array_t *orte_node_pool;
opal_pointer_array_t *orte_node_topologies; opal_pointer_array_t *orte_node_topologies;
opal_pointer_array_t *orte_local_children; opal_pointer_array_t *orte_local_children;
uint16_t orte_num_jobs = 0; orte_vpid_t orte_total_procs = 0;
/* IOF controls */ /* IOF controls */
bool orte_tag_output; bool orte_tag_output;
@ -694,6 +694,7 @@ OBJ_CLASS_INSTANCE(orte_app_context_t,
static void orte_job_construct(orte_job_t* job) static void orte_job_construct(orte_job_t* job)
{ {
job->jobid = ORTE_JOBID_INVALID; job->jobid = ORTE_JOBID_INVALID;
job->offset = 0;
job->updated = true; job->updated = true;
job->apps = OBJ_NEW(opal_pointer_array_t); job->apps = OBJ_NEW(opal_pointer_array_t);
opal_pointer_array_init(job->apps, opal_pointer_array_init(job->apps,

Просмотреть файл

@ -124,9 +124,10 @@ ORTE_DECLSPEC extern int orte_exit_status;
#define ORTE_DB_LOCALRANK "orte.local.rank" #define ORTE_DB_LOCALRANK "orte.local.rank"
#define ORTE_DB_ARCH "orte.arch" #define ORTE_DB_ARCH "orte.arch"
#define ORTE_DB_NPROCS "orte.nprocs" #define ORTE_DB_NPROCS "orte.nprocs"
#define ORTE_DB_NPROC_OFFSET "orte.nproc.offset"
#define ORTE_DB_RMLURI "orte.rmluri" #define ORTE_DB_RMLURI "orte.rmluri"
#define ORTE_DB_HOSTID "orte.hostid" #define ORTE_DB_HOSTID "orte.hostid"
#define ORTE_DB_GLOBAL_RANK "orte.global.rank"
/* State Machine lists */ /* State Machine lists */
ORTE_DECLSPEC extern opal_list_t orte_job_states; ORTE_DECLSPEC extern opal_list_t orte_job_states;
@ -376,6 +377,8 @@ typedef struct {
opal_list_item_t super; opal_list_item_t super;
/* jobid for this job */ /* jobid for this job */
orte_jobid_t jobid; orte_jobid_t jobid;
/* offset to the total number of procs */
orte_vpid_t offset;
/* flag indicating that the job has been updated /* flag indicating that the job has been updated
* and needs to be included in the pidmap message * and needs to be included in the pidmap message
*/ */
@ -673,7 +676,7 @@ ORTE_DECLSPEC extern opal_pointer_array_t *orte_job_data;
ORTE_DECLSPEC extern opal_pointer_array_t *orte_node_pool; ORTE_DECLSPEC extern opal_pointer_array_t *orte_node_pool;
ORTE_DECLSPEC extern opal_pointer_array_t *orte_node_topologies; ORTE_DECLSPEC extern opal_pointer_array_t *orte_node_topologies;
ORTE_DECLSPEC extern opal_pointer_array_t *orte_local_children; ORTE_DECLSPEC extern opal_pointer_array_t *orte_local_children;
ORTE_DECLSPEC extern uint16_t orte_num_jobs; ORTE_DECLSPEC extern orte_vpid_t orte_total_procs;
/* whether or not to forward SIGTSTP and SIGCONT signals */ /* whether or not to forward SIGTSTP and SIGCONT signals */
ORTE_DECLSPEC extern bool orte_forward_job_control; ORTE_DECLSPEC extern bool orte_forward_job_control;

Просмотреть файл

@ -783,6 +783,11 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup_and_return; goto cleanup_and_return;
} }
/* pack the offset */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->offset, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
/* cycle thru the job's procs, including only those that have /* cycle thru the job's procs, including only those that have
* been updated so we minimize the amount of info being sent * been updated so we minimize the amount of info being sent
*/ */
@ -824,7 +829,7 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup_and_return; goto cleanup_and_return;
} }
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->do_not_barrier, 1, OPAL_BOOL))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &proc->do_not_barrier, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup_and_return; goto cleanup_and_return;
} }
@ -870,7 +875,7 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr, bool update)
/* only APPS call this function - daemons have their own */ /* only APPS call this function - daemons have their own */
int orte_util_decode_pidmap(opal_byte_object_t *bo) int orte_util_decode_pidmap(opal_byte_object_t *bo)
{ {
orte_vpid_t num_procs, hostid, *vptr; orte_vpid_t num_procs, offset, hostid, *vptr, global_rank;
orte_local_rank_t local_rank; orte_local_rank_t local_rank;
orte_node_rank_t node_rank; orte_node_rank_t node_rank;
#if OPAL_HAVE_HWLOC #if OPAL_HAVE_HWLOC
@ -927,7 +932,18 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* unpack and store the offset */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &offset, &n, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* only of possible use to ourselves */
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&proc, OPAL_SCOPE_INTERNAL,
ORTE_DB_NPROC_OFFSET, &offset, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* cycle thru the data until we hit an INVALID vpid indicating /* cycle thru the data until we hit an INVALID vpid indicating
* all data for this job has been read * all data for this job has been read
*/ */
@ -1062,6 +1078,23 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* store this procs global rank - only used by us */
global_rank = proc.vpid + offset;
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&proc, OPAL_SCOPE_INTERNAL,
ORTE_DB_GLOBAL_RANK, &global_rank, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
/* update our own global rank - this is something we will need
* to share with non-peers
*/
global_rank = proc.vpid + offset;
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&proc, OPAL_SCOPE_NON_PEER,
ORTE_DB_GLOBAL_RANK, &global_rank, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} }
} }
/* see if there is a file map */ /* see if there is a file map */
@ -1159,6 +1192,14 @@ int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
} }
jdata->num_procs = num_procs; jdata->num_procs = num_procs;
/* unpack the offset */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_procs, &n, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
jdata->offset = num_procs;
/* cycle thru the data until we hit an INVALID vpid indicating /* cycle thru the data until we hit an INVALID vpid indicating
* all data for this job has been read * all data for this job has been read
*/ */