1
1

Merge pull request #2766 from rhc54/topic/zlib

Compress the xcast message if bigger than a defined size to further improve launch performance at scale
Этот коммит содержится в:
Ralph Castain 2017-01-19 23:14:04 -08:00 коммит произвёл GitHub
родитель 37ee823a0f 668421b6ec
Коммит 33d97b22bc
9 изменённых файлов: 418 добавлений и 45 удалений

Просмотреть файл

@ -17,7 +17,7 @@ dnl Copyright (c) 2009 Los Alamos National Security, LLC. All rights
dnl reserved. dnl reserved.
dnl Copyright (c) 2009-2011 Oak Ridge National Labs. All rights reserved. dnl Copyright (c) 2009-2011 Oak Ridge National Labs. All rights reserved.
dnl Copyright (c) 2011-2013 NVIDIA Corporation. All rights reserved. dnl Copyright (c) 2011-2013 NVIDIA Corporation. All rights reserved.
dnl Copyright (c) 2013 Intel, Inc. All rights reserved dnl Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
dnl Copyright (c) 2015 Research Organization for Information Science dnl Copyright (c) 2015 Research Organization for Information Science
dnl and Technology (RIST). All rights reserved. dnl and Technology (RIST). All rights reserved.
dnl dnl
@ -508,4 +508,8 @@ AC_DEFINE([OPAL_ENABLE_PROGRESS_THREADS],
[0], [0],
[Whether we want BTL progress threads enabled]) [Whether we want BTL progress threads enabled])
dnl Check for zlib support
OPAL_ZLIB_CONFIG
])dnl ])dnl

85
config/opal_setup_zlib.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,85 @@
# -*- shell-script -*-
#
# Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
# Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# OPAL_ZLIB_CONFIG
# --------------------------------------------------------------------
# Probe for zlib (zlib.h plus the deflate symbol in libz), honoring
# --with-zlib=DIR and --with-zlib-libdir=DIR.  Takes no arguments.
# When zlib is usable, -lz is appended to LIBS and any non-default
# search paths are appended to CPPFLAGS and LDFLAGS.  OPAL_HAVE_ZLIB is
# ALWAYS defined, to either 1 or 0, so C code must test it with "#if"
# and never with "#ifdef".  Configure aborts when zlib was explicitly
# requested but could not be found.
AC_DEFUN([OPAL_ZLIB_CONFIG],[
    OPAL_VAR_SCOPE_PUSH([opal_zlib_dir opal_zlib_libdir])

    AC_ARG_WITH([zlib],
                [AS_HELP_STRING([--with-zlib=DIR],
                                [Search for zlib headers and libraries in DIR ])])

    AC_ARG_WITH([zlib-libdir],
                [AS_HELP_STRING([--with-zlib-libdir=DIR],
                                [Search for zlib libraries in DIR ])])

    opal_zlib_support=0
    if test "$with_zlib" != "no"; then
        AC_MSG_CHECKING([for zlib in])
        if test ! -z "$with_zlib" && test "$with_zlib" != "yes"; then
            opal_zlib_dir=$with_zlib
            opal_zlib_standard_header_location=no
            # quote the paths so an install prefix containing blanks still works
            if test -d "$with_zlib/lib"; then
                opal_zlib_libdir=$with_zlib/lib
            elif test -d "$with_zlib/lib64"; then
                opal_zlib_libdir=$with_zlib/lib64
            else
                AC_MSG_RESULT([Could not find $with_zlib/lib or $with_zlib/lib64])
                AC_MSG_ERROR([Can not continue])
            fi
            AC_MSG_RESULT([$opal_zlib_dir and $opal_zlib_libdir])
        else
            AC_MSG_RESULT([(default search paths)])
            opal_zlib_standard_header_location=yes
        fi

        # an explicitly given library directory overrides the one derived
        # from the install prefix
        AS_IF([test ! -z "$with_zlib_libdir" && test "$with_zlib_libdir" != "yes"],
              [opal_zlib_libdir="$with_zlib_libdir"])

        # the library sits in a default location only when no directory was
        # given or derived above - otherwise the linker flags found by the
        # package check must be propagated or the -lz added below will not
        # resolve to the requested installation
        AS_IF([test -z "$opal_zlib_libdir"],
              [opal_zlib_standard_lib_location=yes],
              [opal_zlib_standard_lib_location=no])

        OPAL_CHECK_PACKAGE([opal_zlib],
                           [zlib.h],
                           [z],
                           [deflate],
                           [-lz],
                           [$opal_zlib_dir],
                           [$opal_zlib_libdir],
                           [opal_zlib_support=1],
                           [opal_zlib_support=0])

        # POSIX test only specifies a single equals sign for string equality
        if test "$opal_zlib_support" = "1"; then
            LIBS="$LIBS -lz"
            if test "$opal_zlib_standard_header_location" != "yes"; then
                CPPFLAGS="$CPPFLAGS $opal_zlib_CPPFLAGS"
            fi
            if test "$opal_zlib_standard_lib_location" != "yes"; then
                LDFLAGS="$LDFLAGS $opal_zlib_LDFLAGS"
            fi
        fi
    fi

    # an explicit request that cannot be satisfied is fatal
    if test ! -z "$with_zlib" && test "$with_zlib" != "no" && test "$opal_zlib_support" != "1"; then
        AC_MSG_WARN([ZLIB SUPPORT REQUESTED AND NOT FOUND])
        AC_MSG_ERROR([CANNOT CONTINUE])
    fi

    AC_MSG_CHECKING([will zlib support be built])
    if test "$opal_zlib_support" != "1"; then
        AC_MSG_RESULT([no])
    else
        AC_MSG_RESULT([yes])
    fi

    AC_DEFINE_UNQUOTED([OPAL_HAVE_ZLIB], [$opal_zlib_support],
                       [Whether or not we have zlib support])

    OPAL_VAR_SCOPE_POP
])dnl

Просмотреть файл

@ -12,7 +12,7 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved. * Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science * Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved. * and Technology (RIST). All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
@ -33,6 +33,7 @@
#include "opal/dss/dss.h" #include "opal/dss/dss.h"
#include "orte/util/compress.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
#include "orte/util/error_strings.h" #include "orte/util/error_strings.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
@ -465,28 +466,85 @@ static int pack_xcast(orte_grpcomm_signature_t *sig,
orte_rml_tag_t tag) orte_rml_tag_t tag)
{ {
int rc; int rc;
opal_buffer_t data;
int8_t flag;
uint8_t *cmpdata;
size_t cmplen;
/* setup an intermediate buffer */
OBJ_CONSTRUCT(&data, opal_buffer_t);
/* pass along the signature */ /* pass along the signature */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &sig, 1, ORTE_SIGNATURE))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto CLEANUP; OBJ_DESTRUCT(&data);
return rc;
} }
/* pass the final tag */ /* pass the final tag */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &tag, 1, ORTE_RML_TAG))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto CLEANUP; OBJ_DESTRUCT(&data);
return rc;
} }
/* copy the payload into the new buffer - this is non-destructive, so our /* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they * caller is still responsible for releasing any memory in the buffer they
* gave to us * gave to us
*/ */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buffer, message))) { if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto CLEANUP; OBJ_DESTRUCT(&data);
return rc;
}
/* see if we want to compress this message */
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
&cmpdata, &cmplen)) {
/* the data was compressed - mark that we compressed it */
flag = 1;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
free(cmpdata);
OBJ_DESTRUCT(&data);
return rc;
}
/* pack the compressed length */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
free(cmpdata);
OBJ_DESTRUCT(&data);
return rc;
}
/* pack the uncompressed length */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
free(cmpdata);
OBJ_DESTRUCT(&data);
return rc;
}
/* pack the compressed info */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
free(cmpdata);
OBJ_DESTRUCT(&data);
return rc;
}
OBJ_DESTRUCT(&data);
free(cmpdata);
} else {
/* mark that it was not compressed */
flag = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&data);
free(cmpdata);
return rc;
}
/* transfer the payload across */
opal_dss.copy_payload(buffer, &data);
OBJ_DESTRUCT(&data);
} }
CLEANUP:
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -29,6 +29,7 @@
#include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/routed.h" #include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h" #include "orte/mca/state/state.h"
#include "orte/util/compress.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/util/nidmap.h" #include "orte/util/nidmap.h"
#include "orte/util/proc_info.h" #include "orte/util/proc_info.h"
@ -261,7 +262,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
int ret, cnt; int ret, cnt;
opal_buffer_t *relay, *rly; opal_buffer_t *relay, *rly;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD; orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
opal_buffer_t wireup; opal_buffer_t wireup, datbuf, *data;
opal_byte_object_t *bo; opal_byte_object_t *bo;
int8_t flag; int8_t flag;
orte_job_t *jdata; orte_job_t *jdata;
@ -270,20 +271,71 @@ static void xcast_recv(int status, orte_process_name_t* sender,
orte_grpcomm_signature_t *sig; orte_grpcomm_signature_t *sig;
orte_rml_tag_t tag; orte_rml_tag_t tag;
char *rtmod; char *rtmod;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast:recv: with %d bytes", "%s grpcomm:direct:xcast:recv: with %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)buffer->bytes_used)); (int)buffer->bytes_used));
/* we need a passthru buffer to send to our children */ /* we need a passthru buffer to send to our children - we leave it
* as compressed data */
rly = OBJ_NEW(opal_buffer_t); rly = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(rly, buffer); opal_dss.copy_payload(rly, buffer);
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* unpack the flag to see if this payload is compressed */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
return;
}
if (flag) {
/* unpack the data size */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
return;
}
/* unpack the unpacked data size */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
return;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
cnt = inlen;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, packed_data, &cnt, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(packed_data);
ORTE_FORCED_TERMINATE(ret);
return;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
/* get the signature that we do not need */ /* get the signature that we do not need */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) { if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
ORTE_FORCED_TERMINATE(ret); ORTE_FORCED_TERMINATE(ret);
return; return;
} }
@ -291,8 +343,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
/* get the target tag */ /* get the target tag */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &tag, &cnt, ORTE_RML_TAG))) { if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
ORTE_FORCED_TERMINATE(ret); ORTE_FORCED_TERMINATE(ret);
return; return;
} }
@ -300,7 +353,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
/* setup a buffer we can pass to ourselves - this just contains /* setup a buffer we can pass to ourselves - this just contains
* the initial message, minus the headers inserted by xcast itself */ * the initial message, minus the headers inserted by xcast itself */
relay = OBJ_NEW(opal_buffer_t); relay = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(relay, buffer); opal_dss.copy_payload(relay, data);
/* setup the relay list */ /* setup the relay list */
OBJ_CONSTRUCT(&coll, opal_list_t); OBJ_CONSTRUCT(&coll, opal_list_t);
@ -313,13 +366,13 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_RML_TAG_DAEMON == tag) { if (ORTE_RML_TAG_DAEMON == tag) {
/* peek at the command */ /* peek at the command */
cnt=1; cnt=1;
if (ORTE_SUCCESS == (ret = opal_dss.unpack(buffer, &command, &cnt, ORTE_DAEMON_CMD))) { if (ORTE_SUCCESS == (ret = opal_dss.unpack(data, &command, &cnt, ORTE_DAEMON_CMD))) {
/* if it is add_procs, then... */ /* if it is add_procs, then... */
if (ORTE_DAEMON_ADD_LOCAL_PROCS == command || if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
ORTE_DAEMON_DVM_NIDMAP_CMD == command) { ORTE_DAEMON_DVM_NIDMAP_CMD == command) {
/* extract the byte object holding the daemonmap */ /* extract the byte object holding the daemonmap */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) { if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto relay; goto relay;
} }
@ -351,7 +404,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
/* see if we have wiring info as well */ /* see if we have wiring info as well */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) { if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto relay; goto relay;
} }
@ -366,7 +419,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
} }
if (0 == flag) { if (0 == flag) {
/* copy the remainder of the payload */ /* copy the remainder of the payload */
opal_dss.copy_payload(relay, buffer); opal_dss.copy_payload(relay, data);
/* no - just return */ /* no - just return */
goto relay; goto relay;
} }
@ -374,7 +427,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
/* unpack the byte object */ /* unpack the byte object */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) { if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
goto relay; goto relay;
} }
@ -394,7 +447,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
free(bo); free(bo);
if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) { if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) {
/* copy the remainder of the payload */ /* copy the remainder of the payload */
opal_dss.copy_payload(relay, buffer); opal_dss.copy_payload(relay, data);
} }
} }
} else { } else {
@ -472,6 +525,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
relay->bytes_used = 0; relay->bytes_used = 0;
} }
OBJ_RELEASE(relay); OBJ_RELEASE(relay);
OBJ_DESTRUCT(&datbuf);
} }
static void barrier_release(int status, orte_process_name_t* sender, static void barrier_release(int status, orte_process_name_t* sender,

Просмотреть файл

@ -99,7 +99,7 @@
/* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN /* IT IS CRITICAL THAT ANY CHANGE IN THE ORDER OF THE INFO PACKED IN
* THIS FUNCTION BE REFLECTED IN THE CONSTRUCT_CHILD_LIST PARSER BELOW * THIS FUNCTION BE REFLECTED IN THE CONSTRUCT_CHILD_LIST PARSER BELOW
*/ */
int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data, int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
orte_jobid_t job) orte_jobid_t job)
{ {
int rc; int rc;
@ -127,12 +127,12 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
* info as we don't need it - just pack the job itself */ * info as we don't need it - just pack the job itself */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FIXED_DVM, NULL, OPAL_BOOL)) { if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FIXED_DVM, NULL, OPAL_BOOL)) {
numjobs = 0; numjobs = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numjobs, 1, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
/* pack the job struct */ /* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata, 1, ORTE_JOB))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
} }
return rc; return rc;
@ -146,7 +146,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
/* store it */ /* store it */
boptr = &bo; boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &boptr, 1, OPAL_BYTE_OBJECT))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
@ -157,7 +157,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
if (!orte_static_ports) { if (!orte_static_ports) {
/* pack a flag indicating wiring info is provided */ /* pack a flag indicating wiring info is provided */
flag = 1; flag = 1;
opal_dss.pack(data, &flag, 1, OPAL_INT8); opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
/* get wireup info for daemons per the selected routing module */ /* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t); wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) { if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
@ -170,7 +170,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
/* pack the byte object - zero-byte objects are fine */ /* pack the byte object - zero-byte objects are fine */
bo.size = numbytes; bo.size = numbytes;
boptr = &bo; boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &boptr, 1, OPAL_BYTE_OBJECT))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup); OBJ_RELEASE(wireup);
return rc; return rc;
@ -183,7 +183,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
} else { } else {
/* pack a flag indicating no wireup data is provided */ /* pack a flag indicating no wireup data is provided */
flag = 0; flag = 0;
opal_dss.pack(data, &flag, 1, OPAL_INT8); opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
} }
/* check if this job caused daemons to be spawned - if it did, /* check if this job caused daemons to be spawned - if it did,
@ -203,6 +203,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
/* pack the job struct */ /* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &jptr, 1, ORTE_JOB))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(&jobdata, &jptr, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
return rc; return rc;
} }
++numjobs; ++numjobs;
@ -210,14 +211,15 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jptr, nptr, &nptr); rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jptr, nptr, &nptr);
} }
/* pack the number of jobs */ /* pack the number of jobs */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numjobs, 1, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata);
return rc; return rc;
} }
if (0 < numjobs) { if (0 < numjobs) {
/* pack the jobdata buffer */ /* pack the jobdata buffer */
wireup = &jobdata; wireup = &jobdata;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &wireup, 1, OPAL_BUFFER))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &wireup, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&jobdata); OBJ_DESTRUCT(&jobdata);
return rc; return rc;
@ -226,7 +228,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
} }
} else { } else {
numjobs = 0; numjobs = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numjobs, 1, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
@ -234,7 +236,7 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
/* pack the job struct */ /* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata, 1, ORTE_JOB))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
@ -249,7 +251,7 @@ static void fm_release(void *cbdata)
OBJ_RELEASE(bptr); OBJ_RELEASE(bptr);
} }
int orte_odls_base_default_construct_child_list(opal_buffer_t *data, int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
orte_jobid_t *job) orte_jobid_t *job)
{ {
int rc; int rc;
@ -271,7 +273,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* unpack the flag to see if additional jobs are included in the data */ /* unpack the flag to see if additional jobs are included in the data */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &n, &cnt, OPAL_INT32))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &n, &cnt, OPAL_INT32))) {
*job = ORTE_JOBID_INVALID; *job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto REPORT_ERROR; goto REPORT_ERROR;
@ -283,7 +285,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
if (0 < n) { if (0 < n) {
/* unpack the buffer containing the info */ /* unpack the buffer containing the info */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &bptr, &cnt, OPAL_BUFFER))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
*job = ORTE_JOBID_INVALID; *job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto REPORT_ERROR; goto REPORT_ERROR;
@ -307,7 +309,8 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
} }
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) { if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND; rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
} }
OBJ_RETAIN(dmn->node); OBJ_RETAIN(dmn->node);
pptr->node = dmn->node; pptr->node = dmn->node;
@ -329,7 +332,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* unpack the job we are to launch */ /* unpack the job we are to launch */
cnt=1; cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jdata, &cnt, ORTE_JOB))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &jdata, &cnt, ORTE_JOB))) {
*job = ORTE_JOBID_INVALID; *job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto REPORT_ERROR; goto REPORT_ERROR;
@ -433,13 +436,14 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
ORTE_VPID_PRINT(pptr->parent)); ORTE_VPID_PRINT(pptr->parent));
if (ORTE_VPID_INVALID == pptr->parent) { if (ORTE_VPID_INVALID == pptr->parent) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
ORTE_FORCED_TERMINATE(ORTE_ERR_BAD_PARAM); rc = ORTE_ERR_BAD_PARAM;
return ORTE_ERR_BAD_PARAM; goto REPORT_ERROR;
} }
/* connect the proc to its node object */ /* connect the proc to its node object */
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) { if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND; rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
} }
OBJ_RETAIN(dmn->node); OBJ_RETAIN(dmn->node);
pptr->node = dmn->node; pptr->node = dmn->node;
@ -502,7 +506,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto REPORT_ERROR; goto REPORT_ERROR;
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
REPORT_ERROR: REPORT_ERROR:
@ -513,7 +516,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
* deal with the hang! * deal with the hang!
*/ */
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_NEVER_LAUNCHED); ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_NEVER_LAUNCHED);
return rc; return rc;
} }

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2006-2013 Los Alamos National Security, LLC. * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *

Просмотреть файл

@ -11,7 +11,7 @@
# All rights reserved. # All rights reserved.
# Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. # Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
# Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2014-2016 Intel, Inc. All rights reserved. # Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
# Copyright (c) 2016 Research Organization for Information Science # Copyright (c) 2016 Research Organization for Information Science
# and Technology (RIST). All rights reserved. # and Technology (RIST). All rights reserved.
# $COPYRIGHT$ # $COPYRIGHT$
@ -59,7 +59,8 @@ headers += \
util/nidmap.h \ util/nidmap.h \
util/regex.h \ util/regex.h \
util/attr.h \ util/attr.h \
util/listener.h util/listener.h \
util/compress.h
lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \ lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \
util/error_strings.c \ util/error_strings.c \
@ -78,7 +79,8 @@ lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \
util/nidmap.c \ util/nidmap.c \
util/regex.c \ util/regex.c \
util/attr.c \ util/attr.c \
util/listener.c util/listener.c \
util/compress.c
# Remove the generated man pages # Remove the generated man pages
distclean-local: distclean-local:

115
orte/util/compress.c Обычный файл
Просмотреть файл

@ -0,0 +1,115 @@
/*
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include <orte_config.h>
#include <stdlib.h>
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#ifdef HAVE_ZLIB_H
#include <zlib.h>
#endif
#include "opal/util/output.h"
#include "compress.h"
/**
 * Compress a block of bytes with zlib at the highest compression level.
 *
 * @param inbytes   data to be compressed
 * @param inlen     number of bytes in inbytes
 * @param outbytes  set to a malloc'd buffer holding the compressed data when
 *                  true is returned (the caller must free it); set to NULL
 *                  whenever false is returned
 * @param olen      set to the number of compressed bytes in *outbytes, or to
 *                  0 whenever false is returned
 *
 * @return true if the data was compressed; false if the input is shorter
 *         than ORTE_COMPRESS_LIMIT, if zlib support was not built, or if
 *         compression failed for any reason
 *
 * NOTE: OPAL_HAVE_ZLIB is always defined (to 0 or 1), so it must be tested
 * with "#if" - an "#ifdef" would select the zlib code even without zlib.
 */
#if OPAL_HAVE_ZLIB
bool orte_util_compress_block(uint8_t *inbytes,
                              size_t inlen,
                              uint8_t **outbytes,
                              size_t *olen)
{
    z_stream strm;
    size_t len;
    uint8_t *tmp;
    int rc;

    /* set the default output before ANY return so that a caller can
     * unconditionally free(*outbytes) */
    *outbytes = NULL;
    *olen = 0;

    if (inlen < ORTE_COMPRESS_LIMIT) {
        return false;
    }

    /* setup the stream */
    memset(&strm, 0, sizeof(strm));
    if (Z_OK != deflateInit(&strm, 9)) {
        return false;
    }

    /* get an upper bound on the required output storage */
    len = deflateBound(&strm, inlen);
    if (NULL == (tmp = (uint8_t*)malloc(len))) {
        /* release the internal zlib state - it was leaked before */
        deflateEnd(&strm);
        return false;
    }

    strm.next_in = inbytes;
    strm.avail_in = (uInt)inlen;
    /* allocating the upper bound guarantees zlib can compress into
     * the available space in a single call */
    strm.next_out = tmp;
    strm.avail_out = (uInt)len;

    /* zlib counts in (typically 32-bit) uInt - refuse anything that
     * would be silently truncated rather than emit a partial stream */
    if ((size_t)strm.avail_in != inlen || (size_t)strm.avail_out != len) {
        deflateEnd(&strm);
        free(tmp);
        return false;
    }

    rc = deflate(&strm, Z_FINISH);
    len -= strm.avail_out;
    deflateEnd(&strm);

    /* anything but Z_STREAM_END means the stream is incomplete */
    if (Z_STREAM_END != rc) {
        free(tmp);
        return false;
    }

    *outbytes = tmp;
    *olen = len;
    return true;  /* we did the compression */
}
#else
bool orte_util_compress_block(uint8_t *inbytes,
                              size_t inlen,
                              uint8_t **outbytes,
                              size_t *olen)
{
    (void)inbytes;
    (void)inlen;
    /* give the caller well-defined outputs even though nothing was done */
    *outbytes = NULL;
    *olen = 0;
    return false;  /* we did not compress */
}
#endif
/**
 * Decompress a zlib stream into a newly allocated buffer.
 *
 * @param outbytes  set to a malloc'd buffer of olen bytes holding the
 *                  decompressed data when true is returned (the caller must
 *                  free it); set to NULL whenever false is returned
 * @param olen      exact size, in bytes, of the decompressed data
 * @param inbytes   the compressed data
 * @param len       number of bytes in inbytes
 *
 * @return true if the stream was fully decompressed into exactly olen
 *         bytes; false if zlib support was not built, on allocation
 *         failure, or if the stream is corrupt or of the wrong size
 */
#if OPAL_HAVE_ZLIB
bool orte_util_uncompress_block(uint8_t **outbytes, size_t olen,
                                uint8_t *inbytes, size_t len)
{
    uint8_t *dest;
    z_stream strm;
    int rc;

    /* set the default error answer */
    *outbytes = NULL;

    /* setting destination to the fully decompressed size */
    dest = (uint8_t*)malloc(olen);
    if (NULL == dest) {
        return false;
    }

    memset(&strm, 0, sizeof(strm));
    if (Z_OK != inflateInit(&strm)) {
        free(dest);
        return false;
    }

    strm.next_in = inbytes;
    strm.avail_in = (uInt)len;
    strm.next_out = dest;
    strm.avail_out = (uInt)olen;

    /* zlib counts in (typically 32-bit) uInt - refuse sizes that would
     * be silently truncated */
    if ((size_t)strm.avail_in != len || (size_t)strm.avail_out != olen) {
        inflateEnd(&strm);
        free(dest);
        return false;
    }

    rc = inflate(&strm, Z_FINISH);
    if (Z_STREAM_END != rc || 0 != strm.avail_out) {
        /* strm.msg may legitimately be NULL - never hand that to %s */
        opal_output(0, "\tDECOMPRESS FAILED: %s",
                    (NULL != strm.msg) ? strm.msg : "corrupt stream or size mismatch");
        inflateEnd(&strm);
        /* do not hand a partially filled buffer back as if it were valid */
        free(dest);
        return false;
    }
    inflateEnd(&strm);

    *outbytes = dest;
    return true;
}
#else
bool orte_util_uncompress_block(uint8_t **outbytes, size_t olen,
                                uint8_t *inbytes, size_t len)
{
    (void)olen;
    (void)inbytes;
    (void)len;
    /* give the caller a well-defined output even though nothing was done */
    *outbytes = NULL;
    return false;
}
#endif

53
orte/util/compress.h Обычный файл
Просмотреть файл

@ -0,0 +1,53 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Compress/decompress long data blocks
*/
#ifndef ORTE_COMPRESS_H
#define ORTE_COMPRESS_H
#include <orte_config.h>
BEGIN_C_DECLS
/* define a limit for compression - blocks shorter than this many
 * bytes are never compressed */
#define ORTE_COMPRESS_LIMIT 4096
/**
 * Compress a block of bytes using zlib.
 *
 * @param inbytes   data to be compressed
 * @param inlen     number of bytes in inbytes
 * @param outbytes  on a true return, receives a malloc'd buffer holding
 *                  the compressed data; the function does not keep a
 *                  reference, so the caller releases it with free()
 * @param olen      on a true return, receives the number of compressed
 *                  bytes stored in *outbytes
 *
 * @return true if the block was compressed. false if nothing was
 *         compressed, e.g. because inlen is below ORTE_COMPRESS_LIMIT,
 *         the output buffer could not be allocated, or zlib support was
 *         not built. Callers should not rely on *outbytes or *olen
 *         after a false return.
 */
ORTE_DECLSPEC bool orte_util_compress_block(uint8_t *inbytes,
size_t inlen,
uint8_t **outbytes,
size_t *olen);
/**
 * Decompress a block of bytes produced by orte_util_compress_block.
 *
 * @param outbytes  on a true return, receives a malloc'd buffer of olen
 *                  bytes holding the decompressed data; the caller
 *                  releases it with free()
 * @param olen      size, in bytes, of the decompressed data - this is the
 *                  size of the buffer that gets allocated, so the sender
 *                  must transmit it along with the compressed block
 * @param inbytes   the compressed data
 * @param len       number of bytes in inbytes
 *
 * @return true if a decompressed buffer is handed back in *outbytes.
 *         false if no buffer was produced, e.g. because memory could not
 *         be allocated, zlib could not be initialized, or zlib support
 *         was not built.
 */
ORTE_DECLSPEC bool orte_util_uncompress_block(uint8_t **outbytes, size_t olen,
uint8_t *inbytes, size_t len);
END_C_DECLS
#endif /* ORTE_COMPRESS_H */