
Add topology support for hetero systems

Signed-off-by: Ralph Castain <rhc@pmix.org>
This commit is contained in:
Ralph Castain 2019-02-06 10:35:24 -08:00
parent 88ac05fca6
commit 01e9aca40f
5 changed files with 621 additions and 99 deletions

orte/mca/odls/odls_types.h

@@ -12,7 +12,7 @@
* Copyright (c) 2011-2016 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@@ -93,6 +93,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* tell DVM daemons to cleanup resources from job */
#define ORTE_DAEMON_DVM_CLEANUP_JOB_CMD (orte_daemon_cmd_flag_t) 34
/* pass node info */
#define ORTE_DAEMON_PASS_NODE_INFO_CMD (orte_daemon_cmd_flag_t) 35
/*
* Struct written up the pipe from the child to the parent.
*/

orte/mca/plm/base/plm_base_launch_support.c

@@ -130,7 +130,11 @@ void orte_plm_base_daemons_reported(int fd, short args, void *cbdata)
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_topology_t *t;
orte_node_t *node;
int i;
int i, rc;
uint8_t u8;
opal_buffer_t buf;
orte_grpcomm_signature_t *sig;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_PASS_NODE_INFO_CMD;
ORTE_ACQUIRE_OBJECT(caddy);
@@ -177,6 +181,78 @@ void orte_plm_base_daemons_reported(int fd, short args, void *cbdata)
/* ensure we update the routing plan */
orte_routed.update_routing_plan(NULL);
/* prep the buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* load the command */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
/* if we did not execute a tree-spawn, then the daemons do
* not currently have a nidmap for the job - in that case,
* send one to them */
if (!orte_nidmap_communicated) {
u8 = 1;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &u8, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
if (OPAL_SUCCESS != (rc = orte_util_nidmap_create(orte_node_pool, &buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
orte_nidmap_communicated = true;
} else {
u8 = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &u8, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
}
/* we always send the topologies and the #slots on each node. Note
* that we cannot send the #slots until after the above step since,
* for unmanaged allocations, we might have just determined it! */
if (OPAL_SUCCESS != (rc = orte_util_pass_node_info(&buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
sig->sz = 1;
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, &buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(sig);
OBJ_DESTRUCT(&buf);
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
OBJ_RELEASE(caddy);
return;
}
OBJ_DESTRUCT(&buf);
/* maintain accounting */
OBJ_RELEASE(sig);
/* progress the job */
caddy->jdata->state = ORTE_JOB_STATE_DAEMONS_REPORTED;
ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_VM_READY);
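For reference, the payload assembled above (and completed inside orte_util_pass_node_info() below) has the following wire layout; this is a summary sketch inferred from the pack calls in this commit, not a formal specification:

/* ORTE_DAEMON_PASS_NODE_INFO_CMD payload, in pack order:
 *   ORTE_DAEMON_CMD   the command itself
 *   OPAL_UINT8        1 => a nidmap blob follows; 0 => the daemons
 *                     already hold one (tree-spawn case)
 *   [nidmap]          present only when the flag above is 1
 *   OPAL_INT8         ntopos; when > 1, the signature/topology pairs
 *                     are sent, then a compression indicator (1/0),
 *                     the uncompressed size if compressed, and the
 *                     per-node topology-index byte object
 *   OPAL_INT8         slots indicator: negative => all nodes have
 *                     (-value) slots; 1/0 => a compressed/raw byte
 *                     object of uint16_t slot counts follows
 *   OPAL_INT8         flags indicator: -1/-2 => SLOTS_GIVEN uniformly
 *                     set/unset; 2/3 => a compressed/raw bitmap follows
 */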

orte/orted/orted_comm.c

@@ -59,6 +59,7 @@
#include "orte/util/proc_info.h"
#include "orte/util/session_dir.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
@@ -126,7 +127,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
char *coprocessors;
orte_job_map_t *map;
int8_t flag;
uint8_t *cmpdata;
uint8_t *cmpdata, u8;
size_t cmplen;
/* unpack the command */
@@ -241,6 +242,32 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
break;
case ORTE_DAEMON_PASS_NODE_INFO_CMD:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received pass_node_info",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
if (!ORTE_PROC_IS_HNP) {
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &u8, &n, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
if (1 == u8) {
if (ORTE_SUCCESS != (ret = orte_util_decode_nidmap(buffer))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
if (ORTE_SUCCESS != (ret = orte_util_parse_node_info(buffer))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
break;
/**** ADD_LOCAL_PROCS ****/
case ORTE_DAEMON_ADD_LOCAL_PROCS:
case ORTE_DAEMON_DVM_ADD_PROCS:

orte/util/nidmap.c

@@ -24,7 +24,7 @@
#include "opal/util/argv.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/routed/routed.h"
#include "orte/runtime/orte_globals.h"
@@ -34,11 +34,10 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
opal_buffer_t *buffer)
{
char *raw = NULL;
uint8_t *vpids=NULL, *flags=NULL, u8;
uint8_t *vpids=NULL, u8;
uint16_t u16;
uint16_t *slots=NULL;
uint32_t u32;
int n, ndaemons, rc, nbytes, nbitmap;
int n, ndaemons, rc, nbytes;
bool compressed;
char **names = NULL, **ranks = NULL;
orte_node_t *nptr;
@@ -84,13 +83,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
}
vpids = (uint8_t*)malloc(nbytes * pool->size);
/* make room for the number of slots on each node */
slots = (uint16_t*)malloc(sizeof(uint16_t) * pool->size);
/* and for the flags for each node - only need one bit/node */
nbitmap = (pool->size / 8) + 1;
flags = (uint8_t*)calloc(1, nbitmap);
ndaemons = 0;
for (n=0; n < pool->size; n++) {
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(pool, n))) {
@@ -120,12 +112,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
}
memcpy(&vpids[nbytes*ndaemons], &u32, 4);
}
/* store the number of slots */
slots[n] = nptr->slots;
/* store the flag */
if (ORTE_FLAG_TEST(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN)) {
flags[n/8] |= (1 << (7 - (n % 8)));
}
++ndaemons;
}
@@ -215,76 +201,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
free(bo.bytes);
}
/* compress the slots */
if (opal_compress.compress_block((uint8_t*)slots, sizeof(uint16_t)*ndaemons,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
compressed = false;
bo.bytes = (uint8_t*)slots;
bo.size = sizeof(uint16_t)*ndaemons;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
/* if compressed, provide the uncompressed size */
if (compressed) {
sz = sizeof(uint16_t)*ndaemons;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
free(bo.bytes);
goto cleanup;
}
}
/* add the object */
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
if (compressed) {
free(bo.bytes);
}
/* compress the flags */
if (opal_compress.compress_block(flags, nbitmap,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
compressed = false;
bo.bytes = flags;
bo.size = nbitmap;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
/* if compressed, provide the uncompressed size */
if (compressed) {
sz = nbitmap;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
free(bo.bytes);
goto cleanup;
}
}
/* add the object */
boptr = &bo;
rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
cleanup:
if (NULL != names) {
opal_argv_free(names);
@@ -298,12 +214,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
if (NULL != vpids) {
free(vpids);
}
if (NULL != slots) {
free(slots);
}
if (NULL != flags) {
free(flags);
}
return rc;
}
@@ -574,10 +484,9 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
if (1 == flags[n]) {
ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_SLOTS_GIVEN);
}
/* set the topology */
#if !OPAL_ENABLE_HETEROGENEOUS_SUPPORT
/* set the topology - always default to homogeneous
* as that is the most common scenario */
nd->topology = t;
#endif
/* see if it has a daemon on it */
if (1 == nbytes && UINT8_MAX != vp8[n]) {
vpid = vp8[n];
@@ -620,6 +529,504 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
return rc;
}
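/* tracker that maps each unique orte_topology_t pointer to a small
* index - only the index, not the full topology, is transmitted
* for each node */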
typedef struct {
opal_list_item_t super;
orte_topology_t *t;
} orte_tptr_trk_t;
static OBJ_CLASS_INSTANCE(orte_tptr_trk_t,
opal_list_item_t,
NULL, NULL);
int orte_util_pass_node_info(opal_buffer_t *buffer)
{
uint16_t *slots=NULL, slot = UINT16_MAX;
uint8_t *flags=NULL, flag = UINT8_MAX, *topologies = NULL;
int8_t i8, ntopos;
int rc, n, nbitmap, nstart;
bool compressed, unislots = true, uniflags = true, unitopos = true;
orte_node_t *nptr;
opal_byte_object_t bo, *boptr;
size_t sz, nslots;
opal_buffer_t bucket;
orte_tptr_trk_t *trk;
opal_list_t topos;
orte_topology_t *t;
/* make room for the number of slots on each node */
nslots = sizeof(uint16_t) * orte_node_pool->size;
slots = (uint16_t*)malloc(nslots);
/* and for the flags for each node - only need one bit/node */
nbitmap = (orte_node_pool->size / 8) + 1;
flags = (uint8_t*)calloc(1, nbitmap);
/* handle the topologies - as the most common case by far
* is to have homogeneous topologies, we only send them
* if something is different. We know that the HNP is
* the first topology, and that any differing topology
* on the compute nodes must follow. So send the topologies
* if and only if:
*
* (a) the HNP is being used to house application procs and
* there is more than one topology in our array; or
*
* (b) the HNP is not being used, but there are more than
* two topologies in our array, thus indicating that
* there are multiple topologies on the compute nodes
*/
if (!orte_hnp_is_allocated || (ORTE_GET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping) & ORTE_MAPPING_NO_USE_LOCAL)) {
nstart = 1;
} else {
nstart = 0;
}
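/* illustration (hypothetical allocation): with an HNP plus two
* compute nodes where the HNP hosts no procs, the HNP topology in
* slot 0 is skipped (nstart = 1); if the compute nodes share one
* topology, ntopos ends up 1 below and nothing extra is sent */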
OBJ_CONSTRUCT(&topos, opal_list_t);
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (n=nstart; n < orte_node_topologies->size; n++) {
if (NULL == (t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, n))) {
continue;
}
trk = OBJ_NEW(orte_tptr_trk_t);
trk->t = t;
opal_list_append(&topos, &trk->super);
/* pack this topology string */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->sig, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
/* pack the topology itself */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->topo, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&bucket);
goto cleanup;
}
}
/* pack the number of topologies in allocation */
ntopos = opal_list_get_size(&topos);
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &ntopos, 1, OPAL_INT8))) {
goto cleanup;
}
if (1 < ntopos) {
/* need to send them along */
opal_dss.copy_payload(buffer, &bucket);
/* allocate space to report them */
ntopos = orte_node_pool->size;
topologies = (uint8_t*)malloc(ntopos);
unitopos = false;
}
OBJ_DESTRUCT(&bucket);
for (n=0; n < orte_node_pool->size; n++) {
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
continue;
}
/* store the topology, if required */
if (!unitopos) {
topologies[n] = 0;
if (0 == nstart || 0 < n) {
OPAL_LIST_FOREACH(trk, &topos, orte_tptr_trk_t) {
if (trk->t == nptr->topology) {
break;
}
topologies[n]++;
}
}
}
/* store the number of slots */
slots[n] = nptr->slots;
if (UINT16_MAX == slot) {
slot = nptr->slots;
} else if (slot != nptr->slots) {
unislots = false;
}
/* store the flag */
if (ORTE_FLAG_TEST(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN)) {
flags[n/8] |= (1 << (7 - (n % 8)));
if (UINT8_MAX == flag) {
flag = 1;
} else if (1 != flag) {
uniflags = false;
}
} else {
if (UINT8_MAX == flag) {
flag = 0;
} else if (0 != flag) {
uniflags = false;
}
}
}
/* deal with the topology assignments */
if (!unitopos) {
if (opal_compress.compress_block((uint8_t*)topologies, ntopos,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
i8 = 1;
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
i8 = 0;
compressed = false;
bo.bytes = topologies;
bo.size = ntopos;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
/* if compressed, provide the uncompressed size */
if (compressed) {
sz = ntopos;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
free(bo.bytes);
goto cleanup;
}
}
/* add the object */
boptr = &bo;
rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
if (compressed) {
free(bo.bytes);
}
}
/* if we have uniform #slots, then just flag it - no
* need to pass anything */
if (unislots) {
i8 = -1 * slot;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
goto cleanup;
}
} else {
if (opal_compress.compress_block((uint8_t*)slots, nslots,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
i8 = 1;
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
i8 = 0;
compressed = false;
bo.bytes = (uint8_t*)slots;
bo.size = nslots;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
/* if compressed, provide the uncompressed size */
if (compressed) {
sz = nslots;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
free(bo.bytes);
goto cleanup;
}
}
/* add the object */
boptr = &bo;
rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
if (compressed) {
free(bo.bytes);
}
}
/* if we have uniform flags, then just flag it - no
* need to pass anything */
if (uniflags) {
if (1 == flag) {
i8 = -1;
} else {
i8 = -2;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
goto cleanup;
}
} else {
if (opal_compress.compress_block(flags, nbitmap,
(uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */
i8 = 2;
compressed = true;
bo.size = sz;
} else {
/* mark that this was not compressed */
i8 = 3;
compressed = false;
bo.bytes = flags;
bo.size = nbitmap;
}
/* indicate compression */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
if (compressed) {
free(bo.bytes);
}
goto cleanup;
}
/* if compressed, provide the uncompressed size */
if (compressed) {
sz = nbitmap;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
free(bo.bytes);
goto cleanup;
}
}
/* add the object */
boptr = &bo;
rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
if (compressed) {
free(bo.bytes);
}
}
cleanup:
if (NULL != slots) {
free(slots);
}
if (NULL != flags) {
free(flags);
}
return rc;
}
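The uniform-slots shortcut above folds the slot count into the indicator byte itself. A minimal standalone sketch of that convention (it shares the code's implicit assumption that a uniform slot count fits in an int8_t; larger counts would need the non-uniform path):

#include <assert.h>
#include <stdint.h>

/* encode: a uniform slot count s travels as the single value -s;
 * values >= 0 are reserved for the compression indicator of the
 * non-uniform byte object */
static int8_t encode_uniform_slots(uint16_t slots)
{
    return (int8_t)(-1 * (int)slots);
}

/* decode mirrors orte_util_parse_node_info(): negative => uniform */
static uint16_t decode_uniform_slots(int8_t i8)
{
    assert(0 > i8);
    return (uint16_t)(-1 * i8);
}

int main(void)
{
    assert(4 == decode_uniform_slots(encode_uniform_slots(4)));
    return 0;
}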
int orte_util_parse_node_info(opal_buffer_t *buf)
{
int8_t i8;
int rc = ORTE_SUCCESS, cnt, n, m;
orte_node_t *nptr;
size_t sz;
opal_byte_object_t *boptr;
uint16_t *slots = NULL;
uint8_t *flags = NULL;
uint8_t *topologies = NULL;
orte_topology_t *t2, **tps = NULL;
hwloc_topology_t topo;
char *sig;
/* check to see if we have uniform topologies */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* we already defaulted to uniform topology, so only need to
* process this if it is non-uniform */
if (1 < i8) {
/* create an array to cache these */
tps = (orte_topology_t**)malloc(sizeof(orte_topology_t*) * i8);
for (n=0; n < i8; n++) {
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &sig, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &topo, &cnt, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* new topology - record it */
t2 = OBJ_NEW(orte_topology_t);
t2->sig = sig;
t2->topo = topo;
opal_pointer_array_add(orte_node_topologies, t2);
/* keep a cached copy */
tps[n] = t2;
}
/* now get the array of assigned topologies - first
* unpack the compression indicator */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if compressed, get the uncompressed size */
if (1 == i8) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* unpack the topologies object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if compressed, decompress */
if (1 == i8) {
if (!opal_compress.decompress_block((uint8_t**)&topologies, sz,
boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR);
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
rc = ORTE_ERROR;
goto cleanup;
}
} else {
topologies = (uint8_t*)boptr->bytes;
boptr->bytes = NULL;
boptr->size = 0;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
/* cycle across the node pool and assign the values */
for (n=0, m=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
nptr->topology = tps[topologies[m]];
++m;
}
}
}
/* check to see if we have uniform slot assignments */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if so, then make every node the same */
if (0 > i8) {
i8 = -1 * i8;
for (n=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
nptr->slots = i8;
}
}
} else {
/* if compressed, get the uncompressed size */
if (1 == i8) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* unpack the slots object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if compressed, decompress */
if (1 == i8) {
if (!opal_compress.decompress_block((uint8_t**)&slots, sz,
boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR);
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
rc = ORTE_ERROR;
goto cleanup;
}
} else {
slots = (uint16_t*)boptr->bytes;
boptr->bytes = NULL;
boptr->size = 0;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
/* cycle across the node pool and assign the values */
for (n=0, m=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
nptr->slots = slots[m];
++m;
}
}
}
/* check to see if we have uniform flag assignments */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if so, then make every node the same */
if (0 > i8) {
i8 += 2;
for (n=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
if (i8) {
ORTE_FLAG_SET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
} else {
ORTE_FLAG_UNSET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
}
}
}
} else {
/* if compressed, get the uncompressed size */
if (2 == i8) {
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* unpack the flags object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* if compressed, decompress */
if (2 == i8) {
if (!opal_compress.decompress_block((uint8_t**)&flags, sz,
boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR);
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
rc = ORTE_ERROR;
goto cleanup;
}
} else {
flags = (uint8_t*)boptr->bytes;
boptr->bytes = NULL;
boptr->size = 0;
}
if (NULL != boptr->bytes) {
free(boptr->bytes);
}
free(boptr);
/* cycle across the node pool and assign the values */
for (n=0, m=0; n < orte_node_pool->size; n++) {
if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
if (flags[m]) {
ORTE_FLAG_SET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
} else {
ORTE_FLAG_UNSET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
}
++m;
}
}
}
cleanup:
if (NULL != slots) {
free(slots);
}
if (NULL != flags) {
free(flags);
}
if (NULL != tps) {
free(tps);
}
if (NULL != topologies) {
free(topologies);
}
return rc;
}
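The decompress-or-adopt sequence above appears three times (topologies, slots, flags). A hedged sketch of how it could be factored into one helper; the name and signature are illustrative only, not part of this commit:

static int orte_util_extract_block(opal_buffer_t *buf, bool compressed,
                                   size_t uncompressed_size, uint8_t **out)
{
    int rc = ORTE_SUCCESS, cnt = 1;
    opal_byte_object_t *boptr;

    /* unpack the byte object */
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
        return rc;
    }
    if (compressed) {
        /* expand into a freshly allocated buffer */
        if (!opal_compress.decompress_block(out, uncompressed_size,
                                            boptr->bytes, boptr->size)) {
            rc = ORTE_ERROR;
        }
    } else {
        /* adopt the raw bytes to avoid a copy */
        *out = boptr->bytes;
        boptr->bytes = NULL;
        boptr->size = 0;
    }
    if (NULL != boptr->bytes) {
        free(boptr->bytes);
    }
    free(boptr);
    return rc;
}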
int orte_util_generate_ppn(orte_job_t *jdata,
opal_buffer_t *buf)
{

orte/util/nidmap.h

@@ -29,11 +29,20 @@
#include "opal/dss/dss_types.h"
#include "orte/runtime/orte_globals.h"
/* pass info about the nodes in an allocation */
ORTE_DECLSPEC int orte_util_nidmap_create(opal_pointer_array_t *pool,
opal_buffer_t *buf);
ORTE_DECLSPEC int orte_util_decode_nidmap(opal_buffer_t *buf);
/* pass topology and #slots info */
ORTE_DECLSPEC int orte_util_pass_node_info(opal_buffer_t *buf);
ORTE_DECLSPEC int orte_util_parse_node_info(opal_buffer_t *buf);
/* pass info about node assignments for a specific job */
ORTE_DECLSPEC int orte_util_generate_ppn(orte_job_t *jdata,
opal_buffer_t *buf);
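A sketch of how these entry points pair up across the HNP and the daemons, mirroring the code paths touched by this commit (buffer management and error handling abbreviated):

/* HNP side - see orte_plm_base_daemons_reported() */
opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD);  /* PASS_NODE_INFO */
orte_util_nidmap_create(orte_node_pool, &buf);      /* only if not yet sent */
orte_util_pass_node_info(&buf);                     /* topologies, #slots, flags */
orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, &buf);

/* daemon side - see orte_daemon_recv() */
orte_util_decode_nidmap(buffer);                    /* only if the nidmap flag was set */
orte_util_parse_node_info(buffer);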