diff --git a/orte/mca/odls/odls_types.h b/orte/mca/odls/odls_types.h
index 539f9a6ef5..aabbb34b3b 100644
--- a/orte/mca/odls/odls_types.h
+++ b/orte/mca/odls/odls_types.h
@@ -12,7 +12,7 @@
  * Copyright (c) 2011-2016 Cisco Systems, Inc. All rights reserved.
  * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
  *                         All rights reserved.
- * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2019 Intel, Inc. All rights reserved.
  * Copyright (c) 2018      Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * $COPYRIGHT$
@@ -93,6 +93,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
 /* tell DVM daemons to cleanup resources from job */
 #define ORTE_DAEMON_DVM_CLEANUP_JOB_CMD (orte_daemon_cmd_flag_t) 34
 
+/* pass node info */
+#define ORTE_DAEMON_PASS_NODE_INFO_CMD (orte_daemon_cmd_flag_t) 35
+
 /*
  * Struct written up the pipe from the child to the parent.
  */
diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c
index 363c006233..57f609bfc1 100644
--- a/orte/mca/plm/base/plm_base_launch_support.c
+++ b/orte/mca/plm/base/plm_base_launch_support.c
@@ -130,7 +130,11 @@ void orte_plm_base_daemons_reported(int fd, short args, void *cbdata)
     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
     orte_topology_t *t;
     orte_node_t *node;
-    int i;
+    int i, rc;
+    uint8_t u8;
+    opal_buffer_t buf;
+    orte_grpcomm_signature_t *sig;
+    orte_daemon_cmd_flag_t command = ORTE_DAEMON_PASS_NODE_INFO_CMD;
 
     ORTE_ACQUIRE_OBJECT(caddy);
 
@@ -177,6 +181,78 @@ void orte_plm_base_daemons_reported(int fd, short args, void *cbdata)
     /* ensure we update the routing plan */
     orte_routed.update_routing_plan(NULL);
 
+    /* prep the buffer */
+    OBJ_CONSTRUCT(&buf, opal_buffer_t);
+    /* load the command */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_DESTRUCT(&buf);
+        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+        OBJ_RELEASE(caddy);
+        return;
+    }
+
+
+    /* if we did not execute a tree-spawn, then the daemons do
+     * not currently have a nidmap for the job - in that case,
+     * send one to them */
+    if (!orte_nidmap_communicated) {
+        u8 = 1;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &u8, 1, OPAL_UINT8))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+            OBJ_RELEASE(caddy);
+            return;
+        }
+        if (OPAL_SUCCESS != (rc = orte_util_nidmap_create(orte_node_pool, &buf))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+            OBJ_RELEASE(caddy);
+            return;
+        }
+        orte_nidmap_communicated = true;
+    } else {
+        u8 = 0;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &u8, 1, OPAL_UINT8))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+            OBJ_RELEASE(caddy);
+            return;
+        }
+    }
+
+    /* we always send the topologies and the #slots on each node. Note
+     * that we cannot send the #slots until after the above step since,
+     * for unmanaged allocations, we might have just determined it!
+     */
+    if (OPAL_SUCCESS != (rc = orte_util_pass_node_info(&buf))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_DESTRUCT(&buf);
+        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+        OBJ_RELEASE(caddy);
+        return;
+    }
+
+    /* goes to all daemons */
+    sig = OBJ_NEW(orte_grpcomm_signature_t);
+    sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
+    sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
+    sig->signature[0].vpid = ORTE_VPID_WILDCARD;
+    sig->sz = 1;
+    if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, &buf))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_RELEASE(sig);
+        OBJ_DESTRUCT(&buf);
+        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
+        OBJ_RELEASE(caddy);
+        return;
+    }
+    OBJ_DESTRUCT(&buf);
+    /* maintain accounting */
+    OBJ_RELEASE(sig);
+
     /* progress the job */
     caddy->jdata->state = ORTE_JOB_STATE_DAEMONS_REPORTED;
     ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_VM_READY);
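For orientation, the xcast payload assembled above has the following layout. This summary is inferred from the pack calls in this patch and is not itself part of any header; the field names are illustrative only:

    /* ORTE_DAEMON_PASS_NODE_INFO_CMD message, in pack order:
     *
     *   ORTE_DAEMON_CMD    command      = ORTE_DAEMON_PASS_NODE_INFO_CMD
     *   OPAL_UINT8         have_nidmap  (1 => nidmap blob follows)
     *   [nidmap blob]      orte_util_nidmap_create() output, if have_nidmap == 1
     *   OPAL_INT8          ntopos       (>1 => ntopos pairs of OPAL_STRING
     *                                    signature + OPAL_HWLOC_TOPO follow)
     *   [topology/slots/flags sections written by orte_util_pass_node_info()
     *    and consumed, in the same order, by orte_util_parse_node_info()]
     */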
diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c
index 6cd1d5b15a..b07e86e6ab 100644
--- a/orte/orted/orted_comm.c
+++ b/orte/orted/orted_comm.c
@@ -59,6 +59,7 @@
 #include "orte/util/proc_info.h"
 #include "orte/util/session_dir.h"
 #include "orte/util/name_fns.h"
+#include "orte/util/nidmap.h"
 
 #include "orte/mca/errmgr/errmgr.h"
 #include "orte/mca/grpcomm/base/base.h"
@@ -126,7 +127,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
     char *coprocessors;
     orte_job_map_t *map;
     int8_t flag;
-    uint8_t *cmpdata;
+    uint8_t *cmpdata, u8;
     size_t cmplen;
 
     /* unpack the command */
@@ -241,6 +242,32 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
         }
         break;
 
+    case ORTE_DAEMON_PASS_NODE_INFO_CMD:
+        if (orte_debug_daemons_flag) {
+            opal_output(0, "%s orted_cmd: received pass_node_info",
+                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
+        }
+        if (!ORTE_PROC_IS_HNP) {
+            n = 1;
+            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &u8, &n, OPAL_UINT8))) {
+                ORTE_ERROR_LOG(ret);
+                goto CLEANUP;
+            }
+            if (1 == u8) {
+                if (ORTE_SUCCESS != (ret = orte_util_decode_nidmap(buffer))) {
+                    ORTE_ERROR_LOG(ret);
+                    goto CLEANUP;
+                }
+            }
+            if (ORTE_SUCCESS != (ret = orte_util_parse_node_info(buffer))) {
+                ORTE_ERROR_LOG(ret);
+                goto CLEANUP;
+            }
+        }
+        break;
+
+
         /**** ADD_LOCAL_PROCS ****/
     case ORTE_DAEMON_ADD_LOCAL_PROCS:
     case ORTE_DAEMON_DVM_ADD_PROCS:
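Every per-node section that orte_util_pass_node_info() emits in nidmap.c below follows a single compress-or-send-raw pattern, and orte_util_parse_node_info() must mirror the exact pack order. A sketch of that per-section contract, drawn from the pack/unpack calls in this patch (field names illustrative; the types are the actual opal_dss types used):

    /* per-section wire contract:
     *
     *   OPAL_INT8         flag   < 0 : uniform - the value is encoded in the
     *                                  flag itself, nothing else follows
     *                             1 : compressed byte object follows
     *                             0 : raw byte object follows
     *   OPAL_SIZE         size   uncompressed length (present only if flag == 1)
     *   OPAL_BYTE_OBJECT  data   (present only if flag >= 0)
     */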
diff --git a/orte/util/nidmap.c b/orte/util/nidmap.c
index 04eaad570d..73dfe518f6 100644
--- a/orte/util/nidmap.c
+++ b/orte/util/nidmap.c
@@ -24,7 +24,7 @@
 #include "opal/util/argv.h"
 
 #include "orte/mca/errmgr/errmgr.h"
-#include "orte/mca/rmaps/rmaps_types.h"
+#include "orte/mca/rmaps/base/base.h"
 #include "orte/mca/routed/routed.h"
 #include "orte/runtime/orte_globals.h"
 
@@ -34,11 +34,10 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
                             opal_buffer_t *buffer)
 {
     char *raw = NULL;
-    uint8_t *vpids=NULL, *flags=NULL, u8;
+    uint8_t *vpids=NULL, u8;
     uint16_t u16;
-    uint16_t *slots=NULL;
     uint32_t u32;
-    int n, ndaemons, rc, nbytes, nbitmap;
+    int n, ndaemons, rc, nbytes;
     bool compressed;
     char **names = NULL, **ranks = NULL;
     orte_node_t *nptr;
@@ -84,13 +83,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
     }
     vpids = (uint8_t*)malloc(nbytes * pool->size);
 
-    /* make room for the number of slots on each node */
-    slots = (uint16_t*)malloc(sizeof(uint16_t) * pool->size);
-
-    /* and for the flags for each node - only need one bit/node */
-    nbitmap = (pool->size / 8) + 1;
-    flags = (uint8_t*)calloc(1, nbitmap);
-
     ndaemons = 0;
     for (n=0; n < pool->size; n++) {
         if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(pool, n))) {
@@ -120,12 +112,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
             }
             memcpy(&vpids[nbytes*ndaemons], &u32, 4);
         }
-        /* store the number of slots */
-        slots[n] = nptr->slots;
-        /* store the flag */
-        if (ORTE_FLAG_TEST(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN)) {
-            flags[n/8] |= (1 << (7 - (n % 8)));
-        }
         ++ndaemons;
     }
 
@@ -215,76 +201,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
         free(bo.bytes);
     }
 
-    /* compress the slots */
-    if (opal_compress.compress_block((uint8_t*)slots, sizeof(uint16_t)*ndaemons,
-                                     (uint8_t**)&bo.bytes, &sz)) {
-        /* mark that this was compressed */
-        compressed = true;
-        bo.size = sz;
-    } else {
-        /* mark that this was not compressed */
-        compressed = false;
-        bo.bytes = (uint8_t*)slots;
-        bo.size = sizeof(uint16_t)*ndaemons;
-    }
-    /* indicate compression */
-    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
-        if (compressed) {
-            free(bo.bytes);
-        }
-        goto cleanup;
-    }
-    /* if compressed, provide the uncompressed size */
-    if (compressed) {
-        sz = sizeof(uint16_t)*ndaemons;
-        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
-            free(bo.bytes);
-            goto cleanup;
-        }
-    }
-    /* add the object */
-    boptr = &bo;
-    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
-        if (compressed) {
-            free(bo.bytes);
-        }
-        goto cleanup;
-    }
-    if (compressed) {
-        free(bo.bytes);
-    }
-
-    /* compress the flags */
-    if (opal_compress.compress_block(flags, nbitmap,
-                                     (uint8_t**)&bo.bytes, &sz)) {
-        /* mark that this was compressed */
-        compressed = true;
-        bo.size = sz;
-    } else {
-        /* mark that this was not compressed */
-        compressed = false;
-        bo.bytes = flags;
-        bo.size = nbitmap;
-    }
-    /* indicate compression */
-    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &compressed, 1, OPAL_BOOL))) {
-        if (compressed) {
-            free(bo.bytes);
-        }
-        goto cleanup;
-    }
-    /* if compressed, provide the uncompressed size */
-    if (compressed) {
-        sz = nbitmap;
-        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
-            free(bo.bytes);
-            goto cleanup;
-        }
-    }
-    /* add the object */
-    boptr = &bo;
-    rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
-
 cleanup:
     if (NULL != names) {
         opal_argv_free(names);
@@ -298,12 +214,6 @@ int orte_util_nidmap_create(opal_pointer_array_t *pool,
     if (NULL != vpids) {
         free(vpids);
     }
-    if (NULL != slots) {
-        free(slots);
-    }
-    if (NULL != flags) {
-        free(flags);
-    }
 
     return rc;
 }
@@ -574,10 +484,9 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
         if (1 == flags[n]) {
             ORTE_FLAG_SET(nd, ORTE_NODE_FLAG_SLOTS_GIVEN);
         }
-        /* set the topology */
-#if !OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+        /* set the topology - always default to homogeneous
+         * as that is the most common scenario */
         nd->topology = t;
-#endif
         /* see if it has a daemon on it */
         if (1 == nbytes && UINT8_MAX != vp8[n]) {
             vpid = vp8[n];
@@ -620,6 +529,504 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
     return rc;
 }
 
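The orte_tptr_trk_t tracker introduced below exists only to map each node's topology pointer to a small array index that can then be shipped as one byte per node. A minimal standalone sketch of the same pointer-to-index counting, using a plain array in place of the opal_list_t (all names here are hypothetical):

    #include <stddef.h>

    typedef struct topo { int id; } topo_t;   /* stand-in for orte_topology_t */

    /* count entries until the matching pointer is found - this mirrors the
     * OPAL_LIST_FOREACH counting loop in orte_util_pass_node_info() */
    static size_t topo_index(topo_t **tracked, size_t ntracked, topo_t *t)
    {
        size_t idx = 0;
        while (idx < ntracked && tracked[idx] != t) {
            idx++;
        }
        return idx;   /* == ntracked if absent; callers must ensure membership */
    }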
+typedef struct {
+    opal_list_item_t super;
+    orte_topology_t *t;
+} orte_tptr_trk_t;
+static OBJ_CLASS_INSTANCE(orte_tptr_trk_t,
+                          opal_list_item_t,
+                          NULL, NULL);
+
+int orte_util_pass_node_info(opal_buffer_t *buffer)
+{
+    uint16_t *slots=NULL, slot = UINT16_MAX;
+    uint8_t *flags=NULL, flag = UINT8_MAX, *topologies = NULL;
+    int8_t i8, ntopos;
+    int rc, n, nbitmap, nstart;
+    bool compressed = false, unislots = true, uniflags = true, unitopos = true;
+    orte_node_t *nptr;
+    opal_byte_object_t bo, *boptr;
+    size_t sz, nslots;
+    opal_buffer_t bucket;
+    orte_tptr_trk_t *trk;
+    opal_list_t topos;
+    orte_topology_t *t;
+
+    /* make room for the number of slots on each node */
+    nslots = sizeof(uint16_t) * orte_node_pool->size;
+    slots = (uint16_t*)malloc(nslots);
+    /* and for the flags for each node - only need one bit/node */
+    nbitmap = (orte_node_pool->size / 8) + 1;
+    flags = (uint8_t*)calloc(1, nbitmap);
+
+    /* handle the topologies - as the most common case by far
+     * is to have homogeneous topologies, we only send them
+     * if something is different. We know that the HNP is
+     * the first topology, and that any differing topology
+     * on the compute nodes must follow. So send the topologies
+     * if and only if:
+     *
+     * (a) the HNP is being used to house application procs and
+     *     there is more than one topology in our array; or
+     *
+     * (b) the HNP is not being used, but there are more than
+     *     two topologies in our array, thus indicating that
+     *     there are multiple topologies on the compute nodes
+     */
+    if (!orte_hnp_is_allocated || (ORTE_GET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping) & ORTE_MAPPING_NO_USE_LOCAL)) {
+        nstart = 1;
+    } else {
+        nstart = 0;
+    }
+    OBJ_CONSTRUCT(&topos, opal_list_t);
+    OBJ_CONSTRUCT(&bucket, opal_buffer_t);
+    for (n=nstart; n < orte_node_topologies->size; n++) {
+        if (NULL == (t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, n))) {
+            continue;
+        }
+        trk = OBJ_NEW(orte_tptr_trk_t);
+        trk->t = t;
+        opal_list_append(&topos, &trk->super);
+        /* pack this topology string */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->sig, 1, OPAL_STRING))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&bucket);
+            goto cleanup;
+        }
+        /* pack the topology itself */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &t->topo, 1, OPAL_HWLOC_TOPO))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&bucket);
+            goto cleanup;
+        }
+    }
+    /* pack the number of topologies in allocation */
+    ntopos = opal_list_get_size(&topos);
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &ntopos, 1, OPAL_INT8))) {
+        goto cleanup;
+    }
+    if (1 < ntopos) {
+        /* need to send them along */
+        opal_dss.copy_payload(buffer, &bucket);
+        /* allocate space to report them - one index per node-pool entry */
+        topologies = (uint8_t*)malloc(orte_node_pool->size);
+        unitopos = false;
+    }
+    OBJ_DESTRUCT(&bucket);
+
+    for (n=0; n < orte_node_pool->size; n++) {
+        if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+            continue;
+        }
+        /* store the topology, if required */
+        if (!unitopos) {
+            topologies[n] = 0;
+            if (0 == nstart || 0 < n) {
+                OPAL_LIST_FOREACH(trk, &topos, orte_tptr_trk_t) {
+                    if (trk->t == nptr->topology) {
+                        break;
+                    }
+                    topologies[n]++;
+                }
+            }
+        }
+        /* store the number of slots */
+        slots[n] = nptr->slots;
+        if (UINT16_MAX == slot) {
+            slot = nptr->slots;
+        } else if (slot != nptr->slots) {
+            unislots = false;
+        }
+        /* store the flag */
+        if (ORTE_FLAG_TEST(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN)) {
+            flags[n/8] |= (1 << (7 - (n % 8)));
+            if (UINT8_MAX == flag) {
+                flag = 1;
+            } else if (1 != flag) {
+                uniflags = false;
+            }
+        } else {
+            if (UINT8_MAX == flag) {
+                flag = 0;
+            } else if (0 != flag) {
+                uniflags = false;
+            }
+        }
+    }
+
+    /* deal with the topology assignments */
+    if (!unitopos) {
+        if (opal_compress.compress_block(topologies, (size_t)orte_node_pool->size,
+                                         (uint8_t**)&bo.bytes, &sz)) {
+            /* mark that this was compressed */
+            i8 = 1;
+            compressed = true;
+            bo.size = sz;
+        } else {
+            /* mark that this was not compressed */
+            i8 = 0;
+            compressed = false;
+            bo.bytes = topologies;
+            bo.size = orte_node_pool->size;
+        }
+        /* indicate compression */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
+            if (compressed) {
+                free(bo.bytes);
+            }
+            goto cleanup;
+        }
+        /* if compressed, provide the uncompressed size */
+        if (compressed) {
+            sz = (size_t)orte_node_pool->size;
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
+                free(bo.bytes);
+                goto cleanup;
+            }
+        }
+        /* add the object */
+        boptr = &bo;
+        rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
+    }
+    if (compressed) {
+        free(bo.bytes);
+        compressed = false;
+    }
+
+    /* if we have uniform #slots that fit in the int8_t sentinel,
+     * then just flag it - no need to pass anything */
+    if (unislots && 128 > slot) {
+        i8 = -1 * slot;
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
+            goto cleanup;
+        }
+    } else {
+        if (opal_compress.compress_block((uint8_t*)slots, nslots,
+                                         (uint8_t**)&bo.bytes, &sz)) {
+            /* mark that this was compressed */
+            i8 = 1;
+            compressed = true;
+            bo.size = sz;
+        } else {
+            /* mark that this was not compressed */
+            i8 = 0;
+            compressed = false;
+            bo.bytes = (uint8_t*)slots;
+            bo.size = nslots;
+        }
+        /* indicate compression */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
+            if (compressed) {
+                free(bo.bytes);
+            }
+            goto cleanup;
+        }
+        /* if compressed, provide the uncompressed size */
+        if (compressed) {
+            sz = nslots;
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
+                free(bo.bytes);
+                goto cleanup;
+            }
+        }
+        /* add the object */
+        boptr = &bo;
+        rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
+    }
+    if (compressed) {
+        free(bo.bytes);
+        compressed = false;
+    }
+
+    /* if we have uniform flags, then just flag it - no
+     * need to pass anything */
+    if (uniflags) {
+        if (1 == flag) {
+            i8 = -1;
+        } else {
+            i8 = -2;
+        }
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
+            goto cleanup;
+        }
+    } else {
+        if (opal_compress.compress_block(flags, nbitmap,
+                                         (uint8_t**)&bo.bytes, &sz)) {
+            /* mark that this was compressed */
+            i8 = 1;
+            compressed = true;
+            bo.size = sz;
+        } else {
+            /* mark that this was not compressed */
+            i8 = 0;
+            compressed = false;
+            bo.bytes = flags;
+            bo.size = nbitmap;
+        }
+        /* indicate compression */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &i8, 1, OPAL_INT8))) {
+            if (compressed) {
+                free(bo.bytes);
+            }
+            goto cleanup;
+        }
+        /* if compressed, provide the uncompressed size */
+        if (compressed) {
+            sz = nbitmap;
+            if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sz, 1, OPAL_SIZE))) {
+                free(bo.bytes);
+                goto cleanup;
+            }
+        }
+        /* add the object */
+        boptr = &bo;
+        rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT);
+    }
+    if (compressed) {
+        free(bo.bytes);
+    }
+
+  cleanup:
+    if (NULL != slots) {
+        free(slots);
+    }
+    if (NULL != flags) {
+        free(flags);
+    }
+    if (NULL != topologies) {
+        free(topologies);
+    }
+    return rc;
+}
+
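The SLOTS_GIVEN flags travel as one bit per node-pool index, most significant bit first within each byte. A self-contained illustration of the set/test arithmetic used on both sides (buffer size and indices are arbitrary examples):

    #include <assert.h>
    #include <stdint.h>
    #include <string.h>

    int main(void)
    {
        uint8_t bitmap[(32 / 8) + 1];             /* same sizing rule as nbitmap */
        memset(bitmap, 0, sizeof(bitmap));

        int n = 10;                               /* node-pool index to flag */
        bitmap[n / 8] |= (1 << (7 - (n % 8)));    /* set: MSB of a byte is index 0 */

        /* the receiver recovers the flag with the same arithmetic */
        assert(0 != (bitmap[10 / 8] & (1 << (7 - (10 % 8)))));
        assert(0 == (bitmap[11 / 8] & (1 << (7 - (11 % 8)))));
        return 0;
    }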
+int orte_util_parse_node_info(opal_buffer_t *buf)
+{
+    int8_t i8;
+    int rc = ORTE_SUCCESS, cnt, n;
+    orte_node_t *nptr;
+    size_t sz;
+    opal_byte_object_t *boptr;
+    uint16_t *slots = NULL;
+    uint8_t *flags = NULL;
+    uint8_t *topologies = NULL;
+    orte_topology_t *t2, **tps = NULL;
+    hwloc_topology_t topo;
+    char *sig;
+
+    /* check to see if we have uniform topologies */
+    cnt = 1;
+    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
+        ORTE_ERROR_LOG(rc);
+        goto cleanup;
+    }
+    /* we already defaulted to uniform topology, so only need to
+     * process this if it is non-uniform */
+    if (1 < i8) {
+        /* create an array to cache these */
+        tps = (orte_topology_t**)malloc(i8 * sizeof(orte_topology_t*));
+        for (n=0; n < i8; n++) {
+            cnt = 1;
+            if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &sig, &cnt, OPAL_STRING))) {
+                ORTE_ERROR_LOG(rc);
+                goto cleanup;
+            }
+            cnt = 1;
+            if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &topo, &cnt, OPAL_HWLOC_TOPO))) {
+                ORTE_ERROR_LOG(rc);
+                goto cleanup;
+            }
+            /* new topology - record it */
+            t2 = OBJ_NEW(orte_topology_t);
+            t2->sig = sig;
+            t2->topo = topo;
+            opal_pointer_array_add(orte_node_topologies, t2);
+            /* keep a cached copy */
+            tps[n] = t2;
+        }
+        /* now get the array of assigned topologies - first
+         * unpack the compression flag */
+        cnt = 1;
+        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+        /* if compressed, get the uncompressed size */
+        if (1 == i8) {
+            cnt = 1;
+            if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
+                ORTE_ERROR_LOG(rc);
+                goto cleanup;
+            }
+        }
+        /* unpack the topologies object */
+        cnt = 1;
+        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+        /* if compressed, decompress */
+        if (1 == i8) {
+            if (!opal_compress.decompress_block((uint8_t**)&topologies, sz,
+                                                boptr->bytes, boptr->size)) {
+                ORTE_ERROR_LOG(ORTE_ERROR);
+                if (NULL != boptr->bytes) {
+                    free(boptr->bytes);
+                }
+                free(boptr);
+                rc = ORTE_ERROR;
+                goto cleanup;
+            }
+        } else {
+            topologies = (uint8_t*)boptr->bytes;
+            boptr->bytes = NULL;
+            boptr->size = 0;
+        }
+        if (NULL != boptr->bytes) {
+            free(boptr->bytes);
+        }
+        free(boptr);
+        /* cycle across the node pool and assign the values,
+         * indexing by pool position to match the sender */
+        for (n=0; n < orte_node_pool->size; n++) {
+            if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+                nptr->topology = tps[topologies[n]];
+            }
+        }
+    }
+
+    /* check to see if we have uniform slot assignments */
+    cnt = 1;
+    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
+        ORTE_ERROR_LOG(rc);
+        goto cleanup;
+    }
+
+    /* if so, then make every node the same */
+    if (0 > i8) {
+        i8 = -1 * i8;
+        for (n=0; n < orte_node_pool->size; n++) {
+            if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+                nptr->slots = i8;
+            }
+        }
+    } else {
+        /* if compressed, get the uncompressed size */
+        if (1 == i8) {
+            cnt = 1;
+            if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
+                ORTE_ERROR_LOG(rc);
+                goto cleanup;
+            }
+        }
+        /* unpack the slots object */
+        cnt = 1;
+        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+        /* if compressed, decompress */
+        if (1 == i8) {
+            if (!opal_compress.decompress_block((uint8_t**)&slots, sz,
+                                                boptr->bytes, boptr->size)) {
+                ORTE_ERROR_LOG(ORTE_ERROR);
+                if (NULL != boptr->bytes) {
+                    free(boptr->bytes);
+                }
+                free(boptr);
+                rc = ORTE_ERROR;
+                goto cleanup;
+            }
+        } else {
+            slots = (uint16_t*)boptr->bytes;
+            boptr->bytes = NULL;
+            boptr->size = 0;
+        }
+        if (NULL != boptr->bytes) {
+            free(boptr->bytes);
+        }
+        free(boptr);
+        /* cycle across the node pool and assign the values */
+        for (n=0; n < orte_node_pool->size; n++) {
+            if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+                nptr->slots = slots[n];
+            }
+        }
+    }
+
+    /* check to see if we have uniform flag assignments */
+    cnt = 1;
+    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &i8, &cnt, OPAL_INT8))) {
+        ORTE_ERROR_LOG(rc);
+        goto cleanup;
+    }
+
+    /* if so, then make every node the same */
+    if (0 > i8) {
+        i8 += 2;
+        for (n=0; n < orte_node_pool->size; n++) {
+            if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+                if (i8) {
+                    ORTE_FLAG_SET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
+                } else {
+                    ORTE_FLAG_UNSET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
+                }
+            }
+        }
+    } else {
+        /* if compressed, get the uncompressed size */
+        if (1 == i8) {
+            cnt = 1;
+            if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sz, &cnt, OPAL_SIZE))) {
+                ORTE_ERROR_LOG(rc);
+                goto cleanup;
+            }
+        }
+        /* unpack the flags object */
+        cnt = 1;
+        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+        /* if compressed, decompress */
+        if (1 == i8) {
+            if (!opal_compress.decompress_block((uint8_t**)&flags, sz,
+                                                boptr->bytes, boptr->size)) {
+                ORTE_ERROR_LOG(ORTE_ERROR);
+                if (NULL != boptr->bytes) {
+                    free(boptr->bytes);
+                }
+                free(boptr);
+                rc = ORTE_ERROR;
+                goto cleanup;
+            }
+        } else {
+            flags = (uint8_t*)boptr->bytes;
+            boptr->bytes = NULL;
+            boptr->size = 0;
+        }
+        if (NULL != boptr->bytes) {
+            free(boptr->bytes);
+        }
+        free(boptr);
+        /* cycle across the node pool and assign the values - the
+         * flags arrive as the same one-bit-per-node bitmap the
+         * sender constructed */
+        for (n=0; n < orte_node_pool->size; n++) {
+            if (NULL != (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
+                if (flags[n/8] & (1 << (7 - (n % 8)))) {
+                    ORTE_FLAG_SET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
+                } else {
+                    ORTE_FLAG_UNSET(nptr, ORTE_NODE_FLAG_SLOTS_GIVEN);
+                }
+            }
+        }
+    }
+
+  cleanup:
+    if (NULL != slots) {
+        free(slots);
+    }
+    if (NULL != flags) {
+        free(flags);
+    }
+    if (NULL != tps) {
+        free(tps);
+    }
+    if (NULL != topologies) {
+        free(topologies);
+    }
+    return rc;
+}
+
+
 int orte_util_generate_ppn(orte_job_t *jdata,
                            opal_buffer_t *buf)
 {
diff --git a/orte/util/nidmap.h b/orte/util/nidmap.h
index ac935f58cc..ab728176aa 100644
--- a/orte/util/nidmap.h
+++ b/orte/util/nidmap.h
@@ -29,11 +29,20 @@
 #include "opal/dss/dss_types.h"
 #include "orte/runtime/orte_globals.h"
 
+/* pass info about the nodes in an allocation */
 ORTE_DECLSPEC int orte_util_nidmap_create(opal_pointer_array_t *pool,
                                           opal_buffer_t *buf);
 ORTE_DECLSPEC int orte_util_decode_nidmap(opal_buffer_t *buf);
+
+/* pass topology and #slots info */
+ORTE_DECLSPEC int orte_util_pass_node_info(opal_buffer_t *buf);
+
+ORTE_DECLSPEC int orte_util_parse_node_info(opal_buffer_t *buf);
+
+
+/* pass info about node assignments for a specific job */
 ORTE_DECLSPEC int orte_util_generate_ppn(orte_job_t *jdata,
                                          opal_buffer_t *buf);
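As a closing illustration of why the pass and parse sides must stay symmetric, here is a toy, dependency-free codec for just the slots section: a negative int8 sentinel for the uniform case, a raw array otherwise. Compression and the opal_dss framing are deliberately elided, and all names are hypothetical:

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* encode: one sentinel byte when all slot counts match (and fit in int8),
     * otherwise a 0 marker followed by the raw array */
    static size_t encode_slots(uint8_t *wire, const uint16_t *slots, size_t n)
    {
        int uniform = 1;
        for (size_t i = 1; i < n; i++) {
            if (slots[i] != slots[0]) {
                uniform = 0;
                break;
            }
        }
        if (uniform && slots[0] < 128) {
            wire[0] = (uint8_t)(int8_t)-(int8_t)slots[0];
            return 1;
        }
        wire[0] = 0;
        memcpy(&wire[1], slots, n * sizeof(uint16_t));
        return 1 + n * sizeof(uint16_t);
    }

    /* decode: must consume in exactly the order encode wrote */
    static void decode_slots(const uint8_t *wire, uint16_t *slots, size_t n)
    {
        int8_t flag = (int8_t)wire[0];
        if (0 > flag) {
            for (size_t i = 0; i < n; i++) {
                slots[i] = (uint16_t)-flag;   /* uniform: value is in the flag */
            }
        } else {
            memcpy(slots, &wire[1], n * sizeof(uint16_t));
        }
    }

    int main(void)
    {
        uint16_t in[4] = {4, 4, 4, 4}, out[4] = {0};
        uint8_t wire[1 + sizeof(in)];
        size_t len = encode_slots(wire, in, 4);
        assert(1 == len);                 /* uniform case costs a single byte */
        decode_slots(wire, out, 4);
        assert(0 == memcmp(in, out, sizeof(in)));
        printf("round-trip ok, %zu wire byte(s)\n", len);
        return 0;
    }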