1
1

To support comm_spawn in fully routed environments, daemons need to know the route to all procs in their job family. They already had this information, but were not retaining it. The infrastructure to do so has existed for some time - I just never had the time to complete it.

This commit does that by ensuring that daemons retain knowledge of proc location for all procs in their job family. It required a minor change to the ESS API to allow the daemons to update their pidmaps as data was received. In addition, the routed modules have been updated to take advantage of the newly available info, and the encode/decode pidmap utilities have been updated to communicate the required info in the launch message.

This commit was SVN r20022.
Этот коммит содержится в:
Ralph Castain 2008-11-18 15:35:50 +00:00
родитель 9ba78f6e5f
Коммит 9a57db4a81
16 изменённых файлов: 272 добавлений и 341 удалений

Просмотреть файл

@ -47,7 +47,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
orte_ess_base_module_t orte_ess_alps_module = {
@ -61,7 +61,7 @@ orte_ess_base_module_t orte_ess_alps_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
NULL /* ft_event */
};
@ -74,7 +74,6 @@ static int rte_init(char flags)
{
int ret;
char *error = NULL;
orte_jmap_t *jmap;
/* run the prolog */
if (ORTE_SUCCESS != (ret = orte_ess_base_std_prolog())) {
@ -121,13 +120,10 @@ static int rte_init(char flags)
/* setup array of jmaps */
OBJ_CONSTRUCT(&jobmap, opal_pointer_array_t);
opal_pointer_array_init(&jobmap, 1, INT32_MAX, 1);
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = ORTE_PROC_MY_NAME->jobid;
opal_pointer_array_add(&jobmap, jmap);
/* if one was provided, build my nidmap */
if (ORTE_SUCCESS != (ret = orte_ess_base_build_nidmap(orte_process_info.sync_buf,
&nidmap, jmap))) {
&nidmap, &jobmap))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_build_nidmap";
goto error;
@ -322,17 +318,12 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pmap->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
orte_jmap_t *jmap;
int ret;
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = job;
opal_pointer_array_add(&jobmap, jmap);
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jobmap))) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -85,7 +85,7 @@ ORTE_DECLSPEC int orte_ess_base_orted_finalize(void);
*/
ORTE_DECLSPEC int orte_ess_base_build_nidmap(opal_buffer_t *buffer,
opal_pointer_array_t *nidmap,
orte_jmap_t *jmap);
opal_pointer_array_t *jobmap);
ORTE_DECLSPEC orte_pmap_t* orte_ess_base_lookup_pmap(opal_pointer_array_t *jobmap, orte_process_name_t *proc);

Просмотреть файл

@ -35,7 +35,7 @@
int orte_ess_base_build_nidmap(opal_buffer_t *buffer,
opal_pointer_array_t *nidmap,
orte_jmap_t *jmap)
opal_pointer_array_t *jobmap)
{
int rc;
opal_byte_object_t *bo;
@ -71,7 +71,7 @@ int orte_ess_base_build_nidmap(opal_buffer_t *buffer,
return rc;
}
/* unpack the process map */
if (ORTE_SUCCESS != (rc = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (rc = orte_util_decode_pidmap(bo, jobmap))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -88,6 +88,10 @@ orte_pmap_t* orte_ess_base_lookup_pmap(opal_pointer_array_t *jobmap, orte_proces
jmaps = (orte_jmap_t**)jobmap->addr;
for (i=0; i < jobmap->size && NULL != jmaps[i]; i++) {
OPAL_OUTPUT_VERBOSE((10, orte_ess_base_output,
"%s ess:lookup:pmap: checking job %s for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jmaps[i]->job), ORTE_JOBID_PRINT(proc->jobid)));
if (proc->jobid == jmaps[i]->job) {
pmap = (orte_pmap_t*)opal_value_array_get_item(&jmaps[i]->pmap, proc->vpid);
return pmap;
@ -111,6 +115,10 @@ static orte_nid_t* find_daemon_node(opal_pointer_array_t *nidmap,
nids = (orte_nid_t**)nidmap->addr;
for (i=0; i < nidmap->size && NULL != nids[i]; i++) {
OPAL_OUTPUT_VERBOSE((10, orte_ess_base_output,
"%s ess:find:daemon:node: checking daemon %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_VPID_PRINT(nids[i]->daemon), ORTE_VPID_PRINT(proc->vpid)));
if (nids[i]->daemon == proc->vpid) {
return nids[i];
}
@ -123,22 +131,45 @@ orte_nid_t* orte_ess_base_lookup_nid(opal_pointer_array_t *nidmap,
opal_pointer_array_t *jobmap,
orte_process_name_t *proc)
{
orte_nid_t **nids;
orte_nid_t **nids, *nid;
orte_pmap_t *pmap;
OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
"%s ess:lookup:nid: looking for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(proc)));
/* if the proc is from a different job family, we always
* return NULL - we cannot know info for procs in other
* job families.
*/
if (ORTE_JOB_FAMILY(proc->jobid) !=
ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* this isn't an error - let the caller decide if an
* error message is required
*/
OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
"%s ess:lookup:nid: different job family",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return NULL;
}
if (ORTE_PROC_IS_DAEMON(proc->jobid)) {
if (ORTE_JOB_FAMILY(proc->jobid) !=
ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
return NULL;
}
/* looking for a daemon in my family */
return find_daemon_node(nidmap, proc);
if (NULL == (nid = find_daemon_node(nidmap, proc))) {
OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
"%s ess:lookup:nid: couldn't find daemon node",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
return nid;
}
/* looking for an application proc */
if (NULL == (pmap = orte_ess_base_lookup_pmap(jobmap, proc))) {
opal_output(0, "proc: %s not found", ORTE_NAME_PRINT(proc));
/* if the proc is in my job family, then this definitely is
* an error - we should always know the node of a proc
* in our job family
*/
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return NULL;
}

21
orte/mca/ess/env/ess_env_module.c поставляемый
Просмотреть файл

@ -88,7 +88,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
#if OPAL_ENABLE_FT == 1
@ -107,7 +107,7 @@ orte_ess_base_module_t orte_ess_env_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
#if OPAL_ENABLE_FT == 1
rte_ft_event
@ -123,7 +123,6 @@ static int rte_init(char flags)
{
int ret;
char *error = NULL;
orte_jmap_t *jmap;
/* run the prolog */
if (ORTE_SUCCESS != (ret = orte_ess_base_std_prolog())) {
@ -172,13 +171,10 @@ static int rte_init(char flags)
/* setup array of jmaps */
OBJ_CONSTRUCT(&jobmap, opal_pointer_array_t);
opal_pointer_array_init(&jobmap, 1, INT32_MAX, 1);
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = ORTE_PROC_MY_NAME->jobid;
opal_pointer_array_add(&jobmap, jmap);
/* if one was provided, build my nidmap */
if (ORTE_SUCCESS != (ret = orte_ess_base_build_nidmap(orte_process_info.sync_buf,
&nidmap, jmap))) {
&nidmap, &jobmap))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_build_nidmap";
goto error;
@ -373,17 +369,16 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pmap->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
orte_jmap_t *jmap;
int ret;
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = job;
opal_pointer_array_add(&jobmap, jmap);
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
"%s ess:env: updating pidmap",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jobmap))) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -121,14 +121,14 @@ typedef orte_node_rank_t (*orte_ess_base_module_proc_get_node_rank_fn_t)(orte_pr
typedef int (*orte_ess_base_module_update_arch_fn_t)(orte_process_name_t *proc, uint32_t arch);
/**
* Add a pidmap
* Update the pidmap
*
* When a job is dynamically launched via comm_spawn, the pre-existing daemons need to
* update their knowledge of the process map within the job so they can properly do
* things like route messages. This API allows daemons - and anyone else who wants to - to
* add a pidmap for a new job
*/
typedef int (*orte_ess_base_module_add_pidmap_fn_t)(orte_jobid_t job, opal_byte_object_t *bo);
typedef int (*orte_ess_base_module_update_pidmap_fn_t)(opal_byte_object_t *bo);
/**
* Update a nidmap
@ -165,7 +165,7 @@ struct orte_ess_base_module_1_0_0_t {
orte_ess_base_module_proc_get_local_rank_fn_t get_local_rank;
orte_ess_base_module_proc_get_node_rank_fn_t get_node_rank;
orte_ess_base_module_update_arch_fn_t update_arch;
orte_ess_base_module_add_pidmap_fn_t add_pidmap;
orte_ess_base_module_update_pidmap_fn_t update_pidmap;
orte_ess_base_module_update_nidmap_fn_t update_nidmap;
orte_ess_base_module_ft_event_fn_t ft_event;
};

Просмотреть файл

@ -80,7 +80,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
@ -95,7 +95,7 @@ orte_ess_base_module_t orte_ess_hnp_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
NULL /* ft_event */
};
@ -680,7 +680,7 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pdata->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
/* there is nothing to do here - the HNP can resolve
* all requests directly from its internal data. However,

Просмотреть файл

@ -56,7 +56,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
orte_ess_base_module_t orte_ess_lsf_module = {
@ -70,7 +70,7 @@ orte_ess_base_module_t orte_ess_lsf_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
NULL /* ft_event */
};
@ -131,13 +131,10 @@ static int rte_init(char flags)
/* setup array of jmaps */
OBJ_CONSTRUCT(&jobmap, opal_pointer_array_t);
opal_pointer_array_init(&jobmap, 1, INT32_MAX, 1);
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = ORTE_PROC_MY_NAME->jobid;
opal_pointer_array_add(&jobmap, jmap);
/* if one was provided, build my nidmap */
if (ORTE_SUCCESS != (ret = orte_ess_base_build_nidmap(orte_process_info.sync_buf,
&nidmap, jmap))) {
&nidmap, &jobmap))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_build_nidmap";
goto error;
@ -332,17 +329,12 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pmap->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
orte_jmap_t *jmap;
int ret;
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = job;
opal_pointer_array_add(&jobmap, jmap);
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jobmap))) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -75,7 +75,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
orte_ess_base_module_t orte_ess_singleton_module = {
@ -89,7 +89,7 @@ orte_ess_base_module_t orte_ess_singleton_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
NULL /* ft_event */
};
@ -562,17 +562,12 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pmap->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
orte_jmap_t *jmap;
int ret;
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = job;
opal_pointer_array_add(&jobmap, jmap);
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jobmap))) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -57,7 +57,7 @@ static uint32_t proc_get_arch(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_arch(orte_process_name_t *proc, uint32_t arch);
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
orte_ess_base_module_t orte_ess_slurm_module = {
@ -71,7 +71,7 @@ orte_ess_base_module_t orte_ess_slurm_module = {
proc_get_local_rank,
proc_get_node_rank,
update_arch,
add_pidmap,
update_pidmap,
update_nidmap,
NULL /* ft_event */
};
@ -84,7 +84,6 @@ static int rte_init(char flags)
{
int ret;
char *error = NULL;
orte_jmap_t *jmap;
/* run the prolog */
if (ORTE_SUCCESS != (ret = orte_ess_base_std_prolog())) {
@ -132,13 +131,10 @@ static int rte_init(char flags)
/* setup array of jmaps */
OBJ_CONSTRUCT(&jobmap, opal_pointer_array_t);
opal_pointer_array_init(&jobmap, 1, INT32_MAX, 1);
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = ORTE_PROC_MY_NAME->jobid;
opal_pointer_array_add(&jobmap, jmap);
/* if one was provided, build my nidmap */
if (ORTE_SUCCESS != (ret = orte_ess_base_build_nidmap(orte_process_info.sync_buf,
&nidmap, jmap))) {
&nidmap, &jobmap))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_build_nidmap";
goto error;
@ -229,6 +225,10 @@ static orte_vpid_t proc_get_daemon(orte_process_name_t *proc)
orte_nid_t *nid;
if (NULL == (nid = orte_ess_base_lookup_nid(&nidmap, &jobmap, proc))) {
/* don't generate an error message here - it could be a call to
* get a route to a proc in an unknown job. Let the caller decide
* if an error message is required
*/
return ORTE_VPID_INVALID;
}
@ -333,17 +333,16 @@ static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
return pmap->node_rank;
}
static int add_pidmap(orte_jobid_t job, opal_byte_object_t *bo)
static int update_pidmap(opal_byte_object_t *bo)
{
orte_jmap_t *jmap;
int ret;
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = job;
opal_pointer_array_add(&jobmap, jmap);
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
"%s ess:slurm: updating pidmap",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jmap->num_procs, &jmap->pmap))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo, &jobmap))) {
ORTE_ERROR_LOG(ret);
}

Просмотреть файл

@ -58,7 +58,7 @@ orte_ess_base_module_t orte_ess_tool_module = {
NULL, /* don't need a proc_get_local_rank fn */
NULL, /* don't need a proc_get_node_rank fn */
NULL, /* don't need to update_arch */
NULL, /* don't need to add_pidmap */
NULL, /* don't need to update_pidmap */
NULL, /* don't need to update_nidmap */
NULL /* ft_event */
};

Просмотреть файл

@ -230,7 +230,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
}
/* encode the pidmap */
if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(jdata, &bo))) {
if (ORTE_SUCCESS != (rc = orte_util_encode_pidmap(&bo))) {
ORTE_ERROR_LOG(rc);
return rc;
}
@ -507,7 +507,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* retain a copy for downloading to child processes */
opal_dss.copy((void**)&jobdat->pmap, bo, OPAL_BYTE_OBJECT);
/* decode the pidmap - this will also free the bytes in bo */
if (ORTE_SUCCESS != (rc = orte_ess.add_pidmap(jobdat->jobid, bo))) {
if (ORTE_SUCCESS != (rc = orte_ess.update_pidmap(bo))) {
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
@ -1462,6 +1462,7 @@ static void setup_singleton_jobdat(orte_jobid_t jobid)
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
/* need to setup a pidmap for it */
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
opal_dss.pack(&buffer, &jobid, 1, ORTE_JOBID); /* jobid */
opal_dss.pack(&buffer, &(ORTE_PROC_MY_NAME->vpid), 1, ORTE_VPID); /* num_procs */
one32 = 0;
opal_dss.pack(&buffer, &one32, 1, OPAL_INT32); /* node index */
@ -1478,7 +1479,7 @@ static void setup_singleton_jobdat(orte_jobid_t jobid)
/* save a copy to send back to the proc */
opal_dss.copy((void**)&jobdat->pmap, bo, OPAL_BYTE_OBJECT);
/* update our ess data - this will release the byte object's data */
if (ORTE_SUCCESS != (rc = orte_ess.add_pidmap(jobid, bo))) {
if (ORTE_SUCCESS != (rc = orte_ess.update_pidmap(bo))) {
ORTE_ERROR_LOG(rc);
}
free(bo);

Просмотреть файл

@ -358,16 +358,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == (daemon.vpid = orte_ess.proc_get_daemon(target))) {
/* we don't recognize this one - if we are the HNP, all
* we can do is abort
*/
if (orte_process_info.hnp) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
/* if we are not the HNP, send it to the wildcard location */
ret = &wildcard_route;
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
@ -618,17 +610,23 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
"%s routed_binomial: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if this is for my job family, then we send the buffer
* to the proper tag on the daemon
*/
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(job)) {
/* send the buffer to the proper tag on the daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, ndat,
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) != ORTE_JOB_FAMILY(job)) {
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_binomial_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the daemon acks the update to ensure that
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
@ -641,43 +639,10 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
"%s routed_binomial_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* we already have defined our routes to everyone to
* be through the local daemon, so nothing further to do
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_binomial_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_binomial_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}

Просмотреть файл

@ -335,16 +335,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == (daemon.vpid = orte_ess.proc_get_daemon(target))) {
/* we don't recognize this one or our nidmap has not yet
* been initialized - if we are the HNP, all we can do is abort
*/
if (orte_process_info.hnp) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
/* if we are not the HNP, send it to the wildcard location */
ret = &wildcard_route;
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
@ -611,17 +603,23 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
"%s routed_linear: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if this is for my job family, then we send the buffer
* to the proper tag on the daemon
*/
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(job)) {
/* send the buffer to the proper tag on the daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, ndat,
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) != ORTE_JOB_FAMILY(job)) {
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_linear_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the daemon acks the update to ensure that
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
@ -634,43 +632,10 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
"%s routed_linear_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* we already have defined our routes to everyone to
* be through the local daemon, so nothing further to do
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_linear_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_linear_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}

Просмотреть файл

@ -360,17 +360,10 @@ static orte_process_name_t get_route(orte_process_name_t *target)
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == (daemon.vpid = orte_ess.proc_get_daemon(target))) {
/* we don't recognize this one or our nidmap has not yet
* been initialized - if we are the HNP, all we can do is abort
*/
if (orte_process_info.hnp) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
/* if we are not the HNP, send it to the wildcard location */
ret = &wildcard_route;
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
@ -639,23 +632,28 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
*/
if (NULL != ndat) {
int rc;
#if 0
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_radix: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if this is for my job family, then we send the buffer
* to the proper tag on the daemon
*/
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(job)) {
/* send the buffer to the proper tag on the daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, ndat,
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) != ORTE_JOB_FAMILY(job)) {
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_radix_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the daemon acks the update to ensure that
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
@ -668,43 +666,10 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
"%s routed_radix_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* we already have defined our routes to everyone to
* be through the local daemon, so nothing further to do
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}
#endif
/* if this is for a different job family, then we route via our HNP
* to minimize connection counts to entities such as ompi-server, so
* start by sending the contact info to the HNP for update
*/
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_radix_init_routes: diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_radix_init_routes: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return ORTE_SUCCESS;
}

Просмотреть файл

@ -584,7 +584,7 @@ process_daemons:
return ORTE_SUCCESS;
}
int orte_util_encode_pidmap(orte_job_t *jdata, opal_byte_object_t *boptr)
int orte_util_encode_pidmap(opal_byte_object_t *boptr)
{
int32_t *nodes;
orte_proc_t **procs;
@ -592,57 +592,66 @@ int orte_util_encode_pidmap(orte_job_t *jdata, opal_byte_object_t *boptr)
opal_buffer_t buf;
orte_local_rank_t *lrank;
orte_node_rank_t *nrank;
orte_job_t **jobs, *jdata;
int j;
int rc;
/* setup the working buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pack the number of procs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->num_procs, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
jobs = (orte_job_t**)orte_job_data->addr;
/* for each job... */
for (j=0; j < orte_job_data->size && NULL != jobs[j]; j++) {
jdata = jobs[j];
/* pack the jobid */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of procs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &jdata->num_procs, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for the nodes */
nodes = (int32_t*)malloc(jdata->num_procs * 4);
/* transfer and pack the node info in one pack */
procs = (orte_proc_t**)jdata->procs->addr;
for (i=0; i < jdata->num_procs; i++) {
nodes[i] = procs[i]->node->index;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, nodes, jdata->num_procs, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* free node storage */
free(nodes);
/* transfer and pack the local_ranks in one pack */
lrank = (orte_local_rank_t*)malloc(jdata->num_procs*sizeof(orte_local_rank_t));
for (i=0; i < jdata->num_procs; i++) {
lrank[i] = procs[i]->local_rank;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, lrank, jdata->num_procs, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
free(lrank);
/* transfer and pack the node ranks in one pack */
nrank = (orte_node_rank_t*)malloc(jdata->num_procs*sizeof(orte_node_rank_t));
for (i=0; i < jdata->num_procs; i++) {
nrank[i] = procs[i]->node_rank;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, nrank, jdata->num_procs, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
free(nrank);
}
/* allocate memory for the nodes */
nodes = (int32_t*)malloc(jdata->num_procs * 4);
/* transfer and pack the node info in one pack */
procs = (orte_proc_t**)jdata->procs->addr;
for (i=0; i < jdata->num_procs; i++) {
nodes[i] = procs[i]->node->index;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, nodes, jdata->num_procs, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* free node storage */
free(nodes);
/* allocate memory for the local_ranks */
lrank = (orte_local_rank_t*)malloc(jdata->num_procs*sizeof(orte_local_rank_t));
/* transfer and pack them in one pack */
for (i=0; i < jdata->num_procs; i++) {
lrank[i] = procs[i]->local_rank;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, lrank, jdata->num_procs, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
free(lrank);
/* transfer and pack the node ranks in one pack */
nrank = (orte_node_rank_t*)malloc(jdata->num_procs*sizeof(orte_node_rank_t));
for (i=0; i < jdata->num_procs; i++) {
nrank[i] = procs[i]->node_rank;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, nrank, jdata->num_procs, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
free(nrank);
/* transfer the payload to the byte object */
opal_dss.unload(&buf, (void**)&boptr->bytes, &boptr->size);
OBJ_DESTRUCT(&buf);
@ -651,9 +660,9 @@ int orte_util_encode_pidmap(orte_job_t *jdata, opal_byte_object_t *boptr)
}
int orte_util_decode_pidmap(opal_byte_object_t *bo, orte_vpid_t *nprocs,
opal_value_array_t *procs)
int orte_util_decode_pidmap(opal_byte_object_t *bo, opal_pointer_array_t *jobmap)
{
orte_jobid_t jobid;
orte_vpid_t i, num_procs;
orte_pmap_t pmap;
int32_t *nodes;
@ -661,66 +670,90 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo, orte_vpid_t *nprocs,
orte_node_rank_t *node_rank;
orte_std_cntr_t n;
opal_buffer_t buf;
orte_jmap_t **jobs, *jmap;
bool already_present;
opal_value_array_t *procs;
int j;
int rc;
/* xfer the byte object to a buffer for unpacking */
/* load it into a buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.load(&buf, bo->bytes, bo->size))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of procs */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_procs, &n, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
/* cycle through the buffer */
jobs = (orte_jmap_t**)jobmap->addr;
n = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(&buf, &jobid, &n, ORTE_JOBID))) {
/* is this job already in the map? */
already_present = false;
for (j=0; j < jobmap->size && NULL != jobs[j]; j++) {
if (jobid == jobs[j]->job) {
already_present = true;
break;
}
}
/* unpack the number of procs */
n=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_procs, &n, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for the node info */
nodes = (int32_t*)malloc(num_procs * 4);
/* unpack it in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, nodes, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for local ranks */
local_rank = (orte_local_rank_t*)malloc(num_procs*sizeof(orte_local_rank_t));
/* unpack them in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, local_rank, &n, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for node ranks */
node_rank = (orte_node_rank_t*)malloc(num_procs*sizeof(orte_node_rank_t));
/* unpack node ranks in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, node_rank, &n, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we don't already have this data, store it */
if (!already_present) {
/* create and add an entry for the job */
jmap = OBJ_NEW(orte_jmap_t);
jmap->job = jobid;
jmap->num_procs = num_procs;
opal_pointer_array_add(jobmap, jmap);
/* allocate memory for the procs array */
procs = &jmap->pmap;
opal_value_array_set_size(procs, num_procs);
/* xfer the data */
for (i=0; i < num_procs; i++) {
pmap.node = nodes[i];
pmap.local_rank = local_rank[i];
pmap.node_rank = node_rank[i];
opal_value_array_set_item(procs, i, &pmap);
}
}
/* release data */
free(nodes);
free(local_rank);
free(node_rank);
}
*nprocs = num_procs;
/* allocate memory for the procs array */
opal_value_array_set_size(procs, num_procs);
/* allocate memory for the node info */
nodes = (int32_t*)malloc(num_procs * 4);
/* unpack it in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, nodes, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for local ranks */
local_rank = (orte_local_rank_t*)malloc(num_procs*sizeof(orte_local_rank_t));
/* unpack them in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, local_rank, &n, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* allocate memory for node ranks */
node_rank = (orte_node_rank_t*)malloc(num_procs*sizeof(orte_node_rank_t));
/* unpack node ranks in one shot */
n=num_procs;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, node_rank, &n, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* store the data */
for (i=0; i < num_procs; i++) {
pmap.node = nodes[i];
pmap.local_rank = local_rank[i];
pmap.node_rank = node_rank[i];
opal_value_array_set_item(procs, i, &pmap);
}
/* release data */
free(nodes);
free(local_rank);
free(node_rank);
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;

Просмотреть файл

@@ -47,9 +47,8 @@ BEGIN_C_DECLS
ORTE_DECLSPEC int orte_util_encode_nodemap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_nodemap(opal_byte_object_t *boptr, opal_pointer_array_t *nodes);
ORTE_DECLSPEC int orte_util_encode_pidmap(orte_job_t *jdata, opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_pidmap(opal_byte_object_t *boptr, orte_vpid_t *num_procs,
opal_value_array_t *procs);
ORTE_DECLSPEC int orte_util_encode_pidmap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_pidmap(opal_byte_object_t *boptr, opal_pointer_array_t *jobmap);
END_C_DECLS