1
1

Merge pull request #3424 from ggouaillardet/topic/compress_hwloc_topo

compress the XML topology sent out-of-band
Этот коммит содержится в:
Gilles Gouaillardet 2017-04-28 11:05:58 +09:00 коммит произвёл GitHub
родитель 1707022f12 57b4144e57
Коммит c793dc8881
3 изменённых файлов: 210 добавлений и 16 удалений

Просмотреть файл

@ -14,7 +14,7 @@
* et Automatique. All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 IBM Corporation. All rights reserved.
* $COPYRIGHT$
@ -817,6 +817,10 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
int i;
uint32_t h;
orte_job_t *jdata;
uint8_t flag;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
opal_buffer_t datbuf, *data;
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:daemon_topology recvd for daemon %s",
@ -832,10 +836,55 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
orted_failed_launch = true;
goto CLEANUP;
}
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if (flag) {
/* unpack the data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* unpack the unpacked data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
idx = inlen;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
/* unpack the topology signature for this node */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -861,7 +910,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
/* unpack the topology */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -873,7 +922,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
/* unpack any coprocessors */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -900,7 +949,7 @@ void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
}
/* see if this daemon is on a coprocessor */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &coprocessors, &idx, OPAL_STRING))) {
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
@ -1088,8 +1137,57 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
/* rank=1 always sends its topology back */
topo = NULL;
if (1 == dname.vpid) {
uint8_t flag;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
opal_buffer_t datbuf, *data;
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &topo, &idx, OPAL_HWLOC_TOPO))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
if (flag) {
/* unpack the data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* unpack the unpacked data size */
idx=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
idx = inlen;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
idx=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;

Просмотреть файл

@ -15,7 +15,7 @@
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
* Copyright (c) 2016-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -59,6 +59,7 @@
#include "orte/util/session_dir.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/compress.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
@ -101,7 +102,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
int32_t signal;
orte_jobid_t job;
char *contact_info;
opal_buffer_t *answer;
opal_buffer_t data, *answer;
orte_job_t *jdata;
orte_process_name_t proc, proc2;
orte_process_name_t *return_addr;
@ -124,6 +125,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
char *rtmod;
char *coprocessors;
orte_job_map_t *map;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;
/* unpack the command */
n = 1;
@ -620,23 +624,23 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
/**** REPORT TOPOLOGY COMMAND ****/
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
answer = OBJ_NEW(opal_buffer_t);
OBJ_CONSTRUCT(&data, opal_buffer_t);
/* pack the topology signature */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &orte_topo_signature, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &orte_topo_signature, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}
/* pack the topology */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
OBJ_DESTRUCT(&data);
goto CLEANUP;
}
/* detect and add any coprocessors */
coprocessors = opal_hwloc_base_find_coprocessors(opal_hwloc_topology);
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL != coprocessors) {
@ -644,12 +648,54 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
/* see if I am on a coprocessor */
coprocessors = opal_hwloc_base_check_on_coprocessor();
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &coprocessors, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &coprocessors, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
}
if (NULL!= coprocessors) {
free(coprocessors);
}
answer = OBJ_NEW(opal_buffer_t);
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed info */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&data);
free(cmpdata);
}
/* transfer the payload across */
opal_dss.copy_payload(answer, &data);
OBJ_DESTRUCT(&data);
}
/* send the data */
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
sender, answer, ORTE_RML_TAG_TOPOLOGY_REPORT,

Просмотреть файл

@ -76,6 +76,7 @@
#include "orte/util/parse_options.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/util/pre_condition_transports.h"
#include "orte/util/compress.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
@ -793,9 +794,58 @@ int orte_daemon(int argc, char *argv[])
/* if we are rank=1, then send our topology back - otherwise, mpirun
* will request it if necessary */
if (1 == ORTE_PROC_MY_NAME->vpid) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
opal_buffer_t data;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;
/* setup an intermediate buffer */
OBJ_CONSTRUCT(&data, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&data, &opal_hwloc_topology, 1, OPAL_HWLOC_TOPO))) {
ORTE_ERROR_LOG(ret);
}
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
/* pack the compressed info */
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(cmpdata);
OBJ_DESTRUCT(&data);
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&data);
free(cmpdata);
}
/* transfer the payload across */
opal_dss.copy_payload(buffer, &data);
OBJ_DESTRUCT(&data);
}
}
/* send it to the designated target */