1
1

From long-term discussions, have the daemons use the node_t and proc_t structs and arrays instead of the pidmap and nidmap arrays. Sets the stage for future work.

This commit was SVN r26359.
Этот коммит содержится в:
Ralph Castain 2012-04-29 00:10:01 +00:00
родитель 47a5e30095
Коммит 289f9f41ec
7 изменённых файлов: 607 добавлений и 219 удалений

Просмотреть файл

@ -83,8 +83,6 @@ ORTE_DECLSPEC int orte_ess_base_tool_finalize(void);
ORTE_DECLSPEC int orte_ess_base_orted_setup(char **hosts);
ORTE_DECLSPEC int orte_ess_base_orted_finalize(void);
ORTE_DECLSPEC int orte_ess_base_query_sys_info(char *node, char **keys, opal_list_t *values);
ORTE_DECLSPEC opal_paffinity_locality_t orte_ess_base_proc_get_locality(orte_process_name_t *proc);
ORTE_DECLSPEC orte_vpid_t orte_ess_base_proc_get_daemon(orte_process_name_t *proc);
ORTE_DECLSPEC char* orte_ess_base_proc_get_hostname(orte_process_name_t *proc);

Просмотреть файл

@ -39,6 +39,18 @@
#include "orte/mca/ess/base/base.h"
/*
 * Look up the proc object for a process name in the local job data
 * (used by daemons).
 *
 * Returns the entry stored at index proc->vpid of the owning job's procs
 * array, or NULL (after logging ORTE_ERR_NOT_FOUND) when the job itself is
 * unknown.  The caller must pass a non-NULL name.
 */
static orte_proc_t* find_proc(orte_process_name_t *proc)   /* used by daemons */
{
    orte_job_t *job = orte_get_job_data_object(proc->jobid);

    if (NULL != job) {
        return (orte_proc_t*)opal_pointer_array_get_item(job->procs, proc->vpid);
    }

    /* no job data object is registered for this jobid */
    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
    return NULL;
}
opal_paffinity_locality_t orte_ess_base_proc_get_locality(orte_process_name_t *proc)
{
orte_pmap_t *pmap;
@ -58,92 +70,156 @@ opal_paffinity_locality_t orte_ess_base_proc_get_locality(orte_process_name_t *p
/*
 * Return the vpid of the daemon hosting the given process.
 *
 * @param proc  name of the process to look up (may be NULL)
 * @return      the hosting daemon's vpid, or ORTE_VPID_INVALID when proc is
 *              NULL or the lookup fails
 *
 * A name that belongs to the daemon job is its own host, so its vpid is
 * returned directly.  Application processes resolve the answer through the
 * nidmap; every other process type resolves it through the proc/node
 * objects held in the local job data.
 */
orte_vpid_t orte_ess_base_proc_get_daemon(orte_process_name_t *proc)
{
    orte_nid_t *nid;
    orte_proc_t *pdata;
    orte_vpid_t vpid;

    /* must be checked before proc is dereferenced below */
    if (NULL == proc) {
        return ORTE_VPID_INVALID;
    }

    if (ORTE_JOBID_IS_DAEMON(proc->jobid)) {
        return proc->vpid;
    }

    if (ORTE_PROC_IS_APP) {
        if (NULL == (nid = orte_util_lookup_nid(proc))) {
            return ORTE_VPID_INVALID;
        }
        vpid = nid->daemon;
    } else {
        /* get the job data */
        if (NULL == (pdata = find_proc(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_VPID_INVALID;
        }
        if (NULL == pdata->node || NULL == pdata->node->daemon) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_VPID_INVALID;
        }
        vpid = pdata->node->daemon->name.vpid;
    }

    /* report the value resolved above - nid is only valid on the app path */
    OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
                         "%s ess:base: proc %s is hosted by daemon %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         ORTE_VPID_PRINT(vpid)));

    return vpid;
}
/*
 * Return the name of the node hosting the given process.
 *
 * @param proc  name of the process to look up (may be NULL)
 * @return      the node name stored in the nidmap entry (application
 *              processes) or in the proc's node object (all other process
 *              types); NULL when proc is NULL or the lookup fails
 *
 * The returned pointer refers to the string stored in the map/node object;
 * no copy is made here.
 */
char* orte_ess_base_proc_get_hostname(orte_process_name_t *proc)
{
    orte_nid_t *nid;
    orte_proc_t *pdata;
    char *hostname;

    if (NULL == proc) {
        return NULL;
    }

    if (ORTE_PROC_IS_APP) {
        if (NULL == (nid = orte_util_lookup_nid(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
                                 "%s LOOKING FOR PROC %s",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(proc)));
            return NULL;
        }
        hostname = nid->name;
    } else {
        if (NULL == (pdata = find_proc(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return NULL;
        }
        /* a proc that has not been assigned a node has no hostname yet */
        if (NULL == pdata->node) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return NULL;
        }
        hostname = pdata->node->name;
    }

    /* report the value resolved above - nid is only valid on the app path */
    OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
                         "%s ess:base: proc %s is on host %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         hostname));

    return hostname;
}
/*
 * Return the local rank recorded for the given process.
 *
 * @param proc  name of the process to look up (may be NULL)
 * @return      the local rank from the pidmap entry (application processes)
 *              or from the proc object in the local job data (all other
 *              process types); ORTE_LOCAL_RANK_INVALID when proc is NULL or
 *              the lookup fails
 */
orte_local_rank_t orte_ess_base_proc_get_local_rank(orte_process_name_t *proc)
{
    orte_pmap_t *pmap;
    orte_proc_t *pdata;
    orte_local_rank_t lrank;

    if (NULL == proc) {
        return ORTE_LOCAL_RANK_INVALID;
    }

    if (ORTE_PROC_IS_APP) {
        if (NULL == (pmap = orte_util_lookup_pmap(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_LOCAL_RANK_INVALID;
        }
        lrank = pmap->local_rank;
    } else {
        if (NULL == (pdata = find_proc(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_LOCAL_RANK_INVALID;
        }
        lrank = pdata->local_rank;
    }

    /* report the value resolved above - pmap is only valid on the app path */
    OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
                         "%s ess:base: proc %s has local rank %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         (int)lrank));

    return lrank;
}
/*
 * Return the node rank recorded for the given process.
 *
 * @param proc  name of the process to look up (may be NULL)
 * @return      the node rank, or ORTE_NODE_RANK_INVALID when proc is NULL
 *              or the lookup fails
 *
 * Application processes asking about themselves are answered from
 * orte_process_info without consulting the pidmap; other application
 * queries use the pidmap.  All other process types use the proc object in
 * the local job data.
 */
orte_node_rank_t orte_ess_base_proc_get_node_rank(orte_process_name_t *proc)
{
    orte_pmap_t *pmap;
    orte_proc_t *pdata;
    orte_node_rank_t nrank;

    /* must be checked before proc is dereferenced below */
    if (NULL == proc) {
        return ORTE_NODE_RANK_INVALID;
    }

    if (ORTE_PROC_IS_APP) {
        /* is this me? */
        if (proc->jobid == ORTE_PROC_MY_NAME->jobid &&
            proc->vpid == ORTE_PROC_MY_NAME->vpid) {
            /* yes it is - reply with my rank. This is necessary
             * because the pidmap will not have arrived when I
             * am starting up, and if we use static ports, then
             * I need to know my node rank during init
             */
            return orte_process_info.my_node_rank;
        }
        if (NULL == (pmap = orte_util_lookup_pmap(proc))) {
            return ORTE_NODE_RANK_INVALID;
        }
        nrank = pmap->node_rank;
    } else {
        if (NULL == (pdata = find_proc(proc))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return ORTE_NODE_RANK_INVALID;
        }
        nrank = pdata->node_rank;
    }

    /* report the value resolved above - pmap is only valid on the app path */
    OPAL_OUTPUT_VERBOSE((5, orte_ess_base_output,
                         "%s ess:base: proc %s has node rank %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         (int)nrank));

    return nrank;
}
int orte_ess_base_update_pidmap(opal_byte_object_t *bo)
@ -155,8 +231,14 @@ int orte_ess_base_update_pidmap(opal_byte_object_t *bo)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* build the pmap */
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo))) {
ORTE_ERROR_LOG(ret);
if (ORTE_PROC_IS_APP) {
if (ORTE_SUCCESS != (ret = orte_util_decode_pidmap(bo))) {
ORTE_ERROR_LOG(ret);
}
} else {
if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_pidmap(bo))) {
ORTE_ERROR_LOG(ret);
}
}
return ret;
@ -165,10 +247,18 @@ int orte_ess_base_update_pidmap(opal_byte_object_t *bo)
/*
 * Update the local node map from a packed byte object.
 *
 * @param bo  byte object holding the encoded nodemap
 * @return    the status returned by the decoder that was invoked
 *
 * The map is decoded exactly once: application processes use
 * orte_util_decode_nodemap, every other process type uses
 * orte_util_decode_daemon_nodemap.  A decode failure is logged and its
 * code is returned to the caller.
 */
int orte_ess_base_update_nidmap(opal_byte_object_t *bo)
{
    int rc;

    /* decode the nidmap - pick the decoder that matches our role */
    if (ORTE_PROC_IS_APP) {
        if (ORTE_SUCCESS != (rc = orte_util_decode_nodemap(bo))) {
            ORTE_ERROR_LOG(rc);
        }
    } else {
        if (ORTE_SUCCESS != (rc = orte_util_decode_daemon_nodemap(bo))) {
            ORTE_ERROR_LOG(rc);
        }
    }

    return rc;
}

Просмотреть файл

@ -108,6 +108,7 @@ int orte_ess_base_orted_setup(char **hosts)
orte_job_t *jdata;
orte_proc_t *proc;
orte_app_context_t *app;
orte_node_t *node;
#ifndef __WINDOWS__
/* setup callback for SIGPIPE */
@ -406,17 +407,36 @@ int orte_ess_base_orted_setup(char **hosts)
}
}
/* setup the global job array */
/* setup the global job and node arrays */
orte_job_data = OBJ_NEW(opal_pointer_array_t);
if (ORTE_SUCCESS != (ret = opal_pointer_array_init(orte_job_data,
1,
ORTE_GLOBAL_ARRAY_MAX_SIZE,
1))) {
1,
ORTE_GLOBAL_ARRAY_MAX_SIZE,
1))) {
ORTE_ERROR_LOG(ret);
error = "setup job array";
goto error;
}
orte_node_pool = OBJ_NEW(opal_pointer_array_t);
if (ORTE_SUCCESS != (ret = opal_pointer_array_init(orte_node_pool,
ORTE_GLOBAL_ARRAY_BLOCK_SIZE,
ORTE_GLOBAL_ARRAY_MAX_SIZE,
ORTE_GLOBAL_ARRAY_BLOCK_SIZE))) {
ORTE_ERROR_LOG(ret);
error = "setup node array";
goto error;
}
orte_node_topologies = OBJ_NEW(opal_pointer_array_t);
if (ORTE_SUCCESS != (ret = opal_pointer_array_init(orte_node_topologies,
ORTE_GLOBAL_ARRAY_BLOCK_SIZE,
ORTE_GLOBAL_ARRAY_MAX_SIZE,
ORTE_GLOBAL_ARRAY_BLOCK_SIZE))) {
ORTE_ERROR_LOG(ret);
error = "setup node topologies array";
goto error;
}
/* Setup the job data object for the daemons */
/* create and store the job data object */
jdata = OBJ_NEW(orte_job_t);
@ -428,6 +448,15 @@ int orte_ess_base_orted_setup(char **hosts)
opal_pointer_array_set_item(jdata->apps, 0, app);
jdata->num_apps++;
/* create and store a node object where we are */
node = OBJ_NEW(orte_node_t);
node->name = strdup(orte_process_info.nodename);
node->index = opal_pointer_array_set_item(orte_node_pool, ORTE_PROC_MY_NAME->vpid, node);
#if OPAL_HAVE_HWLOC
/* point our topology to the one detected locally */
node->topology = opal_hwloc_topology;
#endif
/* create and store a proc object for us */
proc = OBJ_NEW(orte_proc_t);
proc->name.jobid = ORTE_PROC_MY_NAME->jobid;
@ -438,6 +467,17 @@ int orte_ess_base_orted_setup(char **hosts)
proc->state = ORTE_PROC_STATE_RUNNING;
opal_pointer_array_set_item(jdata->procs, proc->name.vpid, proc);
/* record that the daemon (i.e., us) is on this node
* NOTE: we do not add the proc object to the node's
* proc array because we are not an application proc.
* Instead, we record it in the daemon field of the
* node object
*/
OBJ_RETAIN(proc); /* keep accounting straight */
node->daemon = proc;
node->daemon_launched = true;
node->state = ORTE_NODE_STATE_UP;
/* record that the daemon job is running */
jdata->num_procs = 1;
jdata->state = ORTE_JOB_STATE_RUNNING;

Просмотреть файл

@ -92,11 +92,6 @@
static int rte_init(void);
static int rte_finalize(void);
static void rte_abort(int status, bool report) __opal_attribute_noreturn__;
static opal_paffinity_locality_t proc_get_locality(orte_process_name_t *proc);
static orte_vpid_t proc_get_daemon(orte_process_name_t *proc);
static char* proc_get_hostname(orte_process_name_t *proc);
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc);
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc);
static int update_pidmap(opal_byte_object_t *bo);
static int update_nidmap(opal_byte_object_t *bo);
@ -104,11 +99,11 @@ orte_ess_base_module_t orte_ess_hnp_module = {
rte_init,
rte_finalize,
rte_abort,
proc_get_locality,
proc_get_daemon,
proc_get_hostname,
proc_get_local_rank,
proc_get_node_rank,
NULL,
orte_ess_base_proc_get_daemon,
orte_ess_base_proc_get_hostname,
orte_ess_base_proc_get_local_rank,
orte_ess_base_proc_get_node_rank,
update_pidmap,
update_nidmap,
NULL /* ft_event */
@ -740,138 +735,6 @@ static void rte_abort(int status, bool report)
exit(status);
}
/*
 * Report whether the given process is on the same node as the HNP.
 *
 * @param proc  name of the process to classify
 * @return      OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER
 *              when the name matches a proc in node 0's proc array,
 *              OPAL_PROC_NON_LOCAL otherwise (including when node 0 is not
 *              available)
 */
static opal_paffinity_locality_t proc_get_locality(orte_process_name_t *proc)
{
    orte_node_t *node;
    orte_proc_t *myproc;
    int i;
    orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;

    /* the HNP is always on node=0 of the node array */
    node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0);
    if (NULL == node || NULL == node->procs) {
        /* nothing to compare against, so the proc cannot be shown local */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return OPAL_PROC_NON_LOCAL;
    }

    /* cycle through the array of local procs */
    for (i=0; i < node->procs->size; i++) {
        if (NULL == (myproc = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
            continue;
        }
        if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &myproc->name, proc)) {
            OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                                 "%s ess:hnp: proc %s is LOCAL",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(proc)));
            return (OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER);
        }
    }

    OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                         "%s ess:hnp: proc %s is REMOTE",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc)));

    return OPAL_PROC_NON_LOCAL;
}
/*
 * Look up the proc object for a process name in the global job data.
 *
 * @param proc  name of the process to find (may be NULL)
 * @return      the entry stored at index proc->vpid of the owning job's
 *              procs array, or NULL when proc is NULL or the job is unknown
 */
static orte_proc_t* find_proc(orte_process_name_t *proc)
{
    orte_job_t *jdata;

    /* several callers pass their argument straight through unchecked */
    if (NULL == proc) {
        return NULL;
    }

    if (NULL == (jdata = orte_get_job_data_object(proc->jobid))) {
        return NULL;
    }

    return (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, proc->vpid);
}
/*
 * Return the vpid of the daemon hosting the given process.
 *
 * A NULL name yields ORTE_VPID_INVALID.  A name in the daemon job is its
 * own host.  Otherwise the answer comes from the daemon recorded on the
 * proc's node; ORTE_VPID_INVALID is returned when the proc, its node, or
 * that node's daemon is missing.
 */
static orte_vpid_t proc_get_daemon(orte_process_name_t *proc)
{
    orte_proc_t *entry;
    orte_vpid_t host;

    if (NULL == proc) {
        return ORTE_VPID_INVALID;
    }

    if (ORTE_JOBID_IS_DAEMON(proc->jobid)) {
        return proc->vpid;
    }

    /* walk proc -> node -> daemon, bailing out at the first missing link */
    entry = find_proc(proc);
    if (NULL == entry) {
        return ORTE_VPID_INVALID;
    }
    if (NULL == entry->node) {
        return ORTE_VPID_INVALID;
    }
    if (NULL == entry->node->daemon) {
        return ORTE_VPID_INVALID;
    }
    host = entry->node->daemon->name.vpid;

    OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                         "%s ess:hnp: proc %s is hosted by daemon %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         ORTE_VPID_PRINT(host)));

    return host;
}
/*
 * Return the name of the node hosting the given process.
 *
 * @param proc  name of the process to look up (may be NULL)
 * @return      the name stored in the proc's node object, or NULL when proc
 *              is NULL, the proc is unknown, or it has no node assigned
 *
 * The returned pointer refers to the string held by the node object; no
 * copy is made here.
 */
static char* proc_get_hostname(orte_process_name_t *proc)
{
    orte_proc_t *pdata;

    if (NULL == proc) {
        return NULL;
    }

    if (NULL == (pdata = find_proc(proc))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return NULL;
    }

    /* same guard as proc_get_daemon: the node pointer may not be set */
    if (NULL == pdata->node) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return NULL;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                         "%s ess:hnp: proc %s is on host %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(proc),
                         pdata->node->name));

    return pdata->node->name;
}
/*
 * Return the local rank stored on the proc object for the given name.
 *
 * When the proc cannot be found, ORTE_ERR_NOT_FOUND is logged and
 * ORTE_LOCAL_RANK_INVALID is returned.
 */
static orte_local_rank_t proc_get_local_rank(orte_process_name_t *proc)
{
    orte_proc_t *entry = find_proc(proc);

    if (NULL != entry) {
        OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                             "%s ess:hnp: proc %s has local rank %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(proc),
                             (int)entry->local_rank));
        return entry->local_rank;
    }

    /* unknown proc */
    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
    return ORTE_LOCAL_RANK_INVALID;
}
/*
 * Return the node rank stored on the proc object for the given name.
 *
 * When the proc cannot be found, ORTE_ERR_NOT_FOUND is logged and
 * ORTE_NODE_RANK_INVALID is returned.
 */
static orte_node_rank_t proc_get_node_rank(orte_process_name_t *proc)
{
    orte_proc_t *entry = find_proc(proc);

    if (NULL != entry) {
        OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
                             "%s ess:hnp: proc %s has node rank %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(proc),
                             (int)entry->node_rank));
        return entry->node_rank;
    }

    /* unknown proc */
    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
    return ORTE_NODE_RANK_INVALID;
}
static int update_pidmap(opal_byte_object_t *bo)
{
/* there is nothing to do here - the HNP can resolve

Просмотреть файл

@ -96,7 +96,6 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
{
int rc;
orte_job_t *jdata=NULL;
orte_proc_t *proc;
orte_job_map_t *map=NULL;
opal_buffer_t *wireup;
opal_byte_object_t bo, *boptr;
@ -322,17 +321,6 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
return rc;
}
/* pack the procs for this job */
for (j=0; j < jdata->procs->size; j++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &proc, 1, ORTE_PROC))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}
@ -400,7 +388,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_jobid_t debugger;
int32_t n;
orte_app_context_t *app;
orte_proc_t *pptr, *p2;
orte_proc_t *pptr;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:constructing child list",
@ -653,25 +641,14 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
return rc;
}
/* unpack the procs */
for (j=0; j < jdata->num_procs; j++) {
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &pptr, &cnt, ORTE_PROC))) {
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
/* check the procs */
for (n=0; n < jdata->procs->size; n++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, n))) {
continue;
}
/* add it to our global jdata object since
* many parts of the system will look for it there
*/
if (NULL != (p2 = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, pptr->name.vpid))) {
OBJ_RELEASE(p2);
}
opal_pointer_array_set_item(jdata->procs, pptr->name.vpid, pptr);
/* see if it belongs to us */
if (ORTE_SUCCESS != (rc = check_local_proc(jdata, pptr))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(pptr);
goto REPORT_ERROR;
}
}

Просмотреть файл

@ -58,6 +58,7 @@
#include "orte/util/regex.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/state/state.h"
#include "orte/util/nidmap.h"
@ -520,6 +521,145 @@ int orte_util_decode_nodemap(opal_byte_object_t *bo)
return ORTE_SUCCESS;
}
/*
 * Decode a packed nodemap into the daemon-side node and proc objects.
 *
 * @param bo  byte object holding the encoded nodemap
 * @return    ORTE_SUCCESS, or the error code of the step that failed
 *
 * The payload is unpacked in this order: the number of nodes, one name per
 * node, one daemon vpid per node, one oversubscribed flag per node.  Each
 * node is stored in orte_node_pool at its position in the payload, linked
 * to its daemon proc object (and vice versa), and the daemon count in
 * orte_process_info is updated.
 *
 * All exits go through a single cleanup point so the unpack buffer and the
 * temporary arrays are released on error paths as well as on success.
 */
int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo)
{
    int n;
    int32_t num_nodes, i, num_daemons;
    orte_vpid_t *vpids = NULL;
    uint8_t *oversub = NULL;
    orte_node_t *node;
    opal_buffer_t buf;
    int rc;
    char *name;
    orte_job_t *daemons;
    orte_proc_t *dptr;

    OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
                         "%s decode:nidmap decoding daemon nodemap",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* xfer the byte object to a buffer for unpacking */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    if (ORTE_SUCCESS != (rc = opal_dss.load(&buf, bo->bytes, bo->size))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* unpack number of nodes */
    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_nodes, &n, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
                         "%s decode:nidmap decoding %d nodes",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_nodes));

    /* set the size of the nidmap storage so we minimize realloc's */
    if (ORTE_SUCCESS != (rc = opal_pointer_array_set_size(orte_node_pool, num_nodes))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* loop over nodes and unpack the raw nodename */
    for (i=0; i < num_nodes; i++) {
        /* unpack the node's name */
        n=1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &name, &n, OPAL_STRING))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* do we already have this node? Nodes don't change
         * position in the node_pool array, so take advantage
         * of that fact
         */
        if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
            node = OBJ_NEW(orte_node_t);
            /* the node object takes ownership of the unpacked string */
            node->name = name;
            opal_pointer_array_set_item(orte_node_pool, i, node);
        } else {
            free(name);
        }
    }

    /* unpack the daemon vpids */
    vpids = (orte_vpid_t*)malloc(num_nodes * sizeof(orte_vpid_t));
    if (NULL == vpids && 0 < num_nodes) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
    n=num_nodes;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, vpids, &n, ORTE_VPID))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* unpack the oversubscribed flags */
    oversub = (uint8_t*)malloc(num_nodes * sizeof(uint8_t));
    if (NULL == oversub && 0 < num_nodes) {
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
    n=num_nodes;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, oversub, &n, OPAL_UINT8))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* transfer the data to the nodes, counting the number of
     * daemons in the system
     */
    num_daemons = 0;
    if (NULL == (daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
        /* without our own job object there is nowhere to look up daemons */
        rc = ORTE_ERR_NOT_FOUND;
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
    for (i=0; i < num_nodes; i++) {
        if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
            /* this is an error */
            rc = ORTE_ERR_NOT_FOUND;
            ORTE_ERROR_LOG(rc);
            ORTE_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
            goto cleanup;
        }
        if (NULL == (dptr = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, vpids[i]))) {
            /* NOTE(review): the new proc object is not stored in
             * daemons->procs here, so a later lookup by vpid will not
             * find it - confirm whether it should be registered
             */
            dptr = OBJ_NEW(orte_proc_t);
            dptr->name.jobid = ORTE_PROC_MY_NAME->jobid;
            dptr->name.vpid = vpids[i];
        }
        /* take the new reference before dropping the old one so the
         * object cannot be destroyed when both are the same
         */
        OBJ_RETAIN(dptr);
        if (NULL != node->daemon) {
            OBJ_RELEASE(node->daemon);
        }
        node->daemon = dptr;
        OBJ_RETAIN(node);
        if (NULL != dptr->node) {
            OBJ_RELEASE(dptr->node);
        }
        dptr->node = node;
        node->oversubscribed = (0 != oversub[i]);
        if (ORTE_VPID_INVALID != vpids[i]) {
            ++num_daemons;
        }
    }

    orte_process_info.num_procs = num_daemons;

    if (orte_process_info.max_procs < orte_process_info.num_procs) {
        orte_process_info.max_procs = orte_process_info.num_procs;
    }

    /* update num_daemons */
    orte_process_info.num_daemons = num_daemons;

    if (0 < opal_output_get_verbosity(orte_debug_output)) {
        for (i=0; i < num_nodes; i++) {
            if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                continue;
            }
            opal_output(5, "%s node[%d].name %s daemon %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), i,
                        (NULL == node->name) ? "NULL" : node->name,
                        ORTE_VPID_PRINT(node->daemon->name.vpid));
        }
    }

    rc = ORTE_SUCCESS;

 cleanup:
    free(vpids);
    free(oversub);
    OBJ_DESTRUCT(&buf);
    return rc;
}
int orte_util_encode_pidmap(opal_byte_object_t *boptr)
{
orte_proc_t *proc;
@ -532,6 +672,9 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr)
#if OPAL_HAVE_HWLOC
unsigned int *bind_idx=NULL;
#endif
orte_proc_state_t *states=NULL;
orte_app_idx_t *app_idx=NULL;
int32_t *restarts=NULL;
/* setup the working buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
@ -572,6 +715,9 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr)
nodes = (int32_t*)malloc(jdata->num_procs * sizeof(int32_t));
lrank = (orte_local_rank_t*)malloc(jdata->num_procs*sizeof(orte_local_rank_t));
nrank = (orte_node_rank_t*)malloc(jdata->num_procs*sizeof(orte_node_rank_t));
states = (orte_proc_state_t*)malloc(jdata->num_procs*sizeof(orte_proc_state_t));
app_idx = (orte_app_idx_t*)malloc(jdata->num_procs*sizeof(orte_app_idx_t));
restarts = (int32_t*)malloc(jdata->num_procs*sizeof(int32_t));
#if OPAL_HAVE_HWLOC
bind_idx = (unsigned int*)malloc(jdata->num_procs*sizeof(unsigned int));
#endif
@ -588,6 +734,9 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr)
nodes[k] = proc->node->index;
lrank[k] = proc->local_rank;
nrank[k] = proc->node_rank;
states[k] = proc->state;
app_idx[k] = proc->app_idx;
restarts[k] = proc->restarts;
#if OPAL_HAVE_HWLOC
bind_idx[k] = proc->bind_idx;
#endif
@ -614,6 +763,21 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr)
goto cleanup_and_return;
}
#endif
/* transfer and pack the states in one pack */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, states, jdata->num_procs, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
/* transfer and pack the app_idx's in one pack */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, app_idx, jdata->num_procs, ORTE_APP_IDX))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
/* transfer and pack the restarts in one pack */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, restarts, jdata->num_procs, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup_and_return;
}
}
/* transfer the payload to the byte object */
@ -635,6 +799,15 @@ int orte_util_encode_pidmap(opal_byte_object_t *boptr)
free(bind_idx);
}
#endif
if (NULL != states) {
free(states);
}
if (NULL != app_idx) {
free(app_idx);
}
if (NULL != restarts) {
free(restarts);
}
OBJ_DESTRUCT(&buf);
return rc;
@ -659,6 +832,9 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
bool already_present;
int j;
int rc;
orte_proc_state_t *states = NULL;
orte_app_idx_t *app_idx = NULL;
int32_t *restarts = NULL;
/* xfer the byte object to a buffer for unpacking */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
@ -677,6 +853,7 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
* jobs are cleaned up as they complete, check the
* entire array
*/
jmap = NULL;
already_present = false;
for (j=0; j < orte_jobmap.size; j++) {
@ -749,6 +926,42 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
}
#endif
/* allocate memory for states */
states = (orte_proc_state_t*)malloc(num_procs*sizeof(orte_proc_state_t));
/* unpack states in one shot */
n=num_procs;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, states, &n, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* dump this info - apps don't need it */
free(states);
states = NULL;
/* allocate memory for app_idx's */
app_idx = (orte_app_idx_t*)malloc(num_procs*sizeof(orte_app_idx_t));
/* unpack app_idx's in one shot */
n=num_procs;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, app_idx, &n, ORTE_APP_IDX))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* dump this info - apps don't need it */
free(app_idx);
app_idx = NULL;
/* allocate memory for restarts */
restarts = (int32_t*)malloc(num_procs*sizeof(int32_t));
/* unpack restarts in one shot */
n=num_procs;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, restarts, &n, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* dump this info - apps don't need it */
free(restarts);
restarts = NULL;
/* if we already know about this job, we need to check the data to see
* if something has changed - e.g., a proc that is being restarted somewhere
* other than where it previously was
@ -864,10 +1077,215 @@ int orte_util_decode_pidmap(opal_byte_object_t *bo)
free(bind_idx);
}
#endif
if (NULL != states) {
free(states);
}
if (NULL != app_idx) {
free(app_idx);
}
if (NULL != restarts) {
free(restarts);
}
OBJ_DESTRUCT(&buf);
return rc;
}
/*
 * Decode a packed pidmap into the daemon-side job and proc objects.
 *
 * @param bo  byte object holding the encoded pidmap
 * @return    ORTE_SUCCESS when the whole buffer was consumed, otherwise
 *            the error code of the step that failed
 *
 * The buffer holds one record per job.  Each record is unpacked in this
 * order: jobid, number of procs, binding level (hwloc builds only), then
 * one array per attribute - node index, local rank, node rank, bind index
 * (hwloc builds only), state, app index, restarts.  Job, proc and node
 * objects that do not exist yet are created and stored in the global
 * arrays.  Running off the end of the buffer while looking for the next
 * jobid is the normal termination condition.
 */
int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo)
{
    orte_jobid_t jobid;
    orte_vpid_t i, num_procs;
    int32_t *nodes=NULL;
    orte_local_rank_t *local_rank=NULL;
    orte_node_rank_t *node_rank=NULL;
#if OPAL_HAVE_HWLOC
    opal_hwloc_level_t bind_level = OPAL_HWLOC_NODE_LEVEL;
    unsigned int *bind_idx=NULL;
#endif
    orte_std_cntr_t n;
    opal_buffer_t buf;
    int rc;
    orte_job_t *jdata;
    orte_proc_t *proc;
    orte_node_t *node;
    orte_proc_state_t *states=NULL;
    orte_app_idx_t *app_idx=NULL;
    int32_t *restarts=NULL;
    bool alloc_failed;

    /* xfer the byte object to a buffer for unpacking */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    if (ORTE_SUCCESS != (rc = opal_dss.load(&buf, bo->bytes, bo->size))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    n = 1;
    /* cycle through the buffer */
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(&buf, &jobid, &n, ORTE_JOBID))) {
        /* see if we have this job object - could be a restart scenario */
        if (NULL == (jdata = orte_get_job_data_object(jobid))) {
            /* need to create this job */
            jdata = OBJ_NEW(orte_job_t);
            jdata->jobid = jobid;
            opal_pointer_array_set_item(orte_job_data, ORTE_LOCAL_JOBID(jobid), jdata);
        }

        /* unpack the number of procs */
        n=1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &num_procs, &n, ORTE_VPID))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        jdata->num_procs = num_procs;

#if OPAL_HAVE_HWLOC
        /* unpack the binding level - read to stay in step with the
         * packed layout; the value is not applied to any object here
         */
        n=1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bind_level, &n, OPAL_HWLOC_LEVEL_T))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
#endif

        /* allocate every per-proc array up front so a single check
         * covers all of them
         */
        nodes = (int32_t*)malloc(num_procs * sizeof(int32_t));
        local_rank = (orte_local_rank_t*)malloc(num_procs*sizeof(orte_local_rank_t));
        node_rank = (orte_node_rank_t*)malloc(num_procs*sizeof(orte_node_rank_t));
        states = (orte_proc_state_t*)malloc(num_procs*sizeof(orte_proc_state_t));
        app_idx = (orte_app_idx_t*)malloc(num_procs*sizeof(orte_app_idx_t));
        restarts = (int32_t*)malloc(num_procs*sizeof(int32_t));
        alloc_failed = (NULL == nodes || NULL == local_rank || NULL == node_rank ||
                        NULL == states || NULL == app_idx || NULL == restarts);
#if OPAL_HAVE_HWLOC
        bind_idx = (unsigned int*)malloc(num_procs*sizeof(unsigned int));
        alloc_failed = alloc_failed || NULL == bind_idx;
#endif
        /* a zero-byte request may legitimately return NULL */
        if (0 < num_procs && alloc_failed) {
            rc = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack the node info in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, nodes, &n, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack local ranks in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, local_rank, &n, ORTE_LOCAL_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack node ranks in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, node_rank, &n, ORTE_NODE_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

#if OPAL_HAVE_HWLOC
        /* unpack bind_idx in one shot - read to stay in step with the
         * packed layout; the values are not applied to the procs here
         */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, bind_idx, &n, OPAL_UINT))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
#endif

        /* unpack states in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, states, &n, ORTE_PROC_STATE))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack app_idx's in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, app_idx, &n, ORTE_APP_IDX))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack restarts in one shot */
        n=num_procs;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, restarts, &n, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* xfer the data */
        for (i=0; i < num_procs; i++) {
            if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
                proc = OBJ_NEW(orte_proc_t);
                proc->name.jobid = jdata->jobid;
                proc->name.vpid = i;
                opal_pointer_array_set_item(jdata->procs, i, proc);
            }
            if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, nodes[i]))) {
                node = OBJ_NEW(orte_node_t);
                opal_pointer_array_set_item(orte_node_pool, nodes[i], node);
            }
            /* take the new reference before dropping the old one so the
             * node cannot be destroyed when both are the same object
             */
            OBJ_RETAIN(node);
            if (NULL != proc->node) {
                OBJ_RELEASE(proc->node);
            }
            proc->node = node;
            proc->local_rank = local_rank[i];
            proc->node_rank = node_rank[i];
            proc->app_idx = app_idx[i];
            proc->restarts = restarts[i];
            proc->state = states[i];
        }

        /* release data - reset the pointers so the cleanup block below
         * does not free them a second time
         */
        free(nodes);
        nodes = NULL;
        free(local_rank);
        local_rank = NULL;
        free(node_rank);
        node_rank = NULL;
#if OPAL_HAVE_HWLOC
        free(bind_idx);
        bind_idx = NULL;
#endif
        free(states);
        states = NULL;
        free(app_idx);
        app_idx = NULL;
        free(restarts);
        restarts = NULL;
        /* setup for next cycle */
        n = 1;
    }
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) {
        /* ran out of records - this is the normal way out of the loop */
        rc = ORTE_SUCCESS;
    } else {
        /* the loop only ends on a failed unpack, so anything else is real */
        ORTE_ERROR_LOG(rc);
    }

 cleanup:
    free(nodes);
    free(local_rank);
    free(node_rank);
#if OPAL_HAVE_HWLOC
    free(bind_idx);
#endif
    free(states);
    free(app_idx);
    free(restarts);
    OBJ_DESTRUCT(&buf);
    return rc;
}
/*** NIDMAP UTILITIES ***/
orte_jmap_t* orte_util_lookup_jmap(orte_jobid_t job)

Просмотреть файл

@ -52,9 +52,11 @@ ORTE_DECLSPEC int orte_util_set_proc_state(orte_process_name_t *proc, orte_proc_
ORTE_DECLSPEC int orte_util_encode_nodemap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_nodemap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_daemon_nodemap(opal_byte_object_t *bo);
ORTE_DECLSPEC int orte_util_encode_pidmap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_pidmap(opal_byte_object_t *boptr);
ORTE_DECLSPEC int orte_util_decode_daemon_pidmap(opal_byte_object_t *bo);
ORTE_DECLSPEC int orte_util_build_daemon_nidmap(char **nodes);