compress the topology sent by the first orted
Refs open-mpi/ompi#3414 Signed-off-by: Gilles Gouaillardet <gilles@rist.or.jp>
Этот коммит содержится в:
родитель
28281190eb
Коммит
49cd40b2df
@ -14,7 +14,7 @@
|
||||
* et Automatique. All rights reserved.
|
||||
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
|
||||
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2014-2016 Research Organization for Information Science
|
||||
* Copyright (c) 2014-2017 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2016 IBM Corporation. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -1088,8 +1088,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
|
||||
/* rank=1 always sends its topology back */
|
||||
topo = NULL;
|
||||
if (1 == dname.vpid) {
|
||||
uint8_t flag;
|
||||
size_t inlen, cmplen;
|
||||
uint8_t *packed_data, *cmpdata;
|
||||
opal_buffer_t datbuf, *data;
|
||||
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
|
||||
/* unpack the flag to see if this payload is compressed */
|
||||
idx=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (flag) {
|
||||
/* unpack the data size */
|
||||
idx=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* unpack the unpacked data size */
|
||||
idx=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* allocate the space */
|
||||
packed_data = (uint8_t*)malloc(inlen);
|
||||
/* unpack the data blob */
|
||||
idx = inlen;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* decompress the data */
|
||||
if (orte_util_uncompress_block(&cmpdata, cmplen,
|
||||
packed_data, inlen)) {
|
||||
/* the data has been uncompressed */
|
||||
opal_dss.load(&datbuf, cmpdata, cmplen);
|
||||
data = &datbuf;
|
||||
} else {
|
||||
data = buffer;
|
||||
}
|
||||
free(packed_data);
|
||||
} else {
|
||||
data = buffer;
|
||||
}
|
||||
idx=1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
orted_failed_launch = true;
|
||||
goto CLEANUP;
|
||||
|
@ -76,6 +76,7 @@
|
||||
#include "orte/util/parse_options.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
#include "orte/util/pre_condition_transports.h"
|
||||
#include "orte/util/compress.h"
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[])
|
||||
/* if we are rank=1, then send our topology back - otherwise, mpirun
|
||||
* will request it if necessary */
|
||||
if (1 == ORTE_PROC_MY_NAME->vpid) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
|
||||
opal_buffer_t data;
|
||||
int8_t flag;
|
||||
uint8_t *cmpdata;
|
||||
size_t cmplen;
|
||||
|
||||
/* setup an intermediate buffer */
|
||||
OBJ_CONSTRUCT(&data, opal_buffer_t);
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
}
|
||||
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
|
||||
&cmpdata, &cmplen)) {
|
||||
/* the data was compressed - mark that we compressed it */
|
||||
flag = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
free(cmpdata);
|
||||
OBJ_DESTRUCT(&data);
|
||||
}
|
||||
/* pack the compressed length */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
free(cmpdata);
|
||||
OBJ_DESTRUCT(&data);
|
||||
}
|
||||
/* pack the uncompressed length */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
free(cmpdata);
|
||||
OBJ_DESTRUCT(&data);
|
||||
}
|
||||
/* pack the compressed info */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
free(cmpdata);
|
||||
OBJ_DESTRUCT(&data);
|
||||
}
|
||||
OBJ_DESTRUCT(&data);
|
||||
free(cmpdata);
|
||||
} else {
|
||||
/* mark that it was not compressed */
|
||||
flag = 0;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&data);
|
||||
free(cmpdata);
|
||||
}
|
||||
/* transfer the payload across */
|
||||
opal_dss.copy_payload(buffer, &data);
|
||||
OBJ_DESTRUCT(&data);
|
||||
}
|
||||
}
|
||||
|
||||
/* send it to the designated target */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user