diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 8bedfef7d0..7554cd17d6 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -14,7 +14,7 @@ * et Automatique. All rights reserved. * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2014-2016 Research Organization for Information Science + * Copyright (c) 2014-2017 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. * $COPYRIGHT$ @@ -1088,8 +1088,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender, /* rank=1 always sends its topology back */ topo = NULL; if (1 == dname.vpid) { + uint8_t flag; + size_t inlen, cmplen; + uint8_t *packed_data, *cmpdata; + opal_buffer_t datbuf, *data; + OBJ_CONSTRUCT(&datbuf, opal_buffer_t); + /* unpack the flag to see if this payload is compressed */ idx=1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + if (flag) { + /* unpack the data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* unpack the unpacked data size */ + idx=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* allocate the space */ + packed_data = (uint8_t*)malloc(inlen); + /* unpack the data blob */ + idx = inlen; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + orted_failed_launch = true; + goto CLEANUP; + } + /* decompress the data */ + if (orte_util_uncompress_block(&cmpdata, cmplen, + packed_data, inlen)) { + /* the data has been uncompressed */ + opal_dss.load(&datbuf, cmpdata, cmplen); + data = &datbuf; + } else { + data = buffer; + } + free(packed_data); + } else { + data = buffer; + } + idx=1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 76b62f6d1e..c21e0f54f6 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -76,6 +76,7 @@ #include "orte/util/parse_options.h" #include "orte/mca/rml/base/rml_contact.h" #include "orte/util/pre_condition_transports.h" +#include "orte/util/compress.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" @@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[]) /* if we are rank=1, then send our topology back - otherwise, mpirun * will request it if necessary */ if (1 == ORTE_PROC_MY_NAME->vpid) { - if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { + opal_buffer_t data; + int8_t flag; + uint8_t *cmpdata; + size_t cmplen; + + /* setup an intermediate buffer */ + OBJ_CONSTRUCT(&data, opal_buffer_t); + + if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(ret); } + if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used, + &cmpdata, &cmplen)) { + /* the data was compressed - mark that we compressed it */ + flag = 1; + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the uncompressed length */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + /* pack the compressed info */ + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) { + ORTE_ERROR_LOG(ret); + free(cmpdata); + OBJ_DESTRUCT(&data); + } + OBJ_DESTRUCT(&data); + free(cmpdata); + } else { + /* mark that it was not compressed */ + flag = 0; + if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&data); + free(cmpdata); + } + /* transfer the payload across */ + opal_dss.copy_payload(buffer, &data); + OBJ_DESTRUCT(&data); + } } /* send it to the designated target */