1
1

orte: use compression for ORTE_DAEMON_REPORT_TOPOLOGY_CMD answer

Refs open-mpi/ompi#3414

Signed-off-by: Gilles Gouaillardet <gilles@rist.or.jp>
Этот коммит содержится в:
Gilles Gouaillardet 2017-04-27 17:21:59 +09:00
родитель 49cd40b2df
Коммит 57b4144e57
2 изменённых файлов: 108 добавлений и 13 удалений

Просмотреть файл

@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
int i;
uint32_t h;
orte_job_t *jdata;
uint8_t flag;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
opal_buffer_t datbuf, *data;
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:daemon_topology recvd for daemon %s",
@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
orted_failed_launch = true;
goto CLEANUP;
}
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if (flag) {
/* unpack the compressed data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* unpack the uncompressed data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
idx = inlen;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
/* unpack the topology signature for this node */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
/* unpack the topology */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
/* unpack any coprocessors */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
}
/* see if this daemon is on a coprocessor */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;

Просмотреть файл

@ -15,7 +15,7 @@
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
* Copyright (c) 2016-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -59,6 +59,7 @@
#include "orte/util/session_dir.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/compress.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
int32_t signal;
orte_jobid_t job;
char *contact_info;
opal_buffer_t *answer;
opal_buffer_t data, *answer;
orte_job_t *jdata;
orte_process_name_t proc, proc2;
orte_process_name_t *return_addr;
@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
char *rtmod;
char *coprocessors;
orte_job_map_t *map;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;
/* unpack the command */
n = 1;
@ -620,23 +624,23 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
/**** REPORT TOPOLOGY COMMAND ****/
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
answer = OBJ_NEW(opal_buffer_t);
OBJ_CONSTRUCT(&data, opal_buffer_t);
/* pack the topology signature */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}
/* pack the topology */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}
/* detect and add any coprocessors */
coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology);
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL != coprocessors) {
@ -644,12 +648,54 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
/* see if I am on a coprocessor */
coprocessors = opal_hwloc_base_check_on_coprocessor();
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL!= coprocessors) {
free(coprocessors);
}
answer = OBJ_NEW(opal_buffer_t);
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed info */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&data);
free(cmpdata);
}
/* transfer the payload across */
opal_dss.copy_payload(answer, &data);
OBJ_DESTRUCT(&data);
}
/* send the data */
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT,