48fc339718
onto the backend daemons. By default, let mpirun only pack the app_context info and send that to the backend daemons where the mapping will be done. This significantly reduces the computational time on mpirun as it isn't running up/down the topology tree computing thousands of binding locations, and it reduces the launch message to a very small number of bytes. When running -novm, fall back to the old way of doing things where mpirun computes the entire map and binding, and then sends the full info to the backend daemon. Add a new cmd line option/mca param --fwd-mpirun-port that allows mpirun to dynamically select a port, but then passes that back to all the other daemons so they will use that port as a static port for their own wireup. In this mode, we no longer "phone home" directly to mpirun, but instead use the static port to wireup at daemon start. We then use the routing tree to rollup the initial launch report, and limit the number of open sockets on mpirun's node. Update ras simulator to track the new nidmap code Cleanup some bugs in the nidmap regex code, and enhance the error message for not enough slots to include the host on which the problem is found. Update gadget platform file Initialize the range count when starting a new range Fix the no-np case in managed allocation Ensure DVM node usage gets cleaned up after each job Update scaling.pl script to use --fwd-mpirun-port. Pre-connect the daemon to its parent during launch while we are otherwise waiting for the daemon's children to send their "phone home" rollup messages Signed-off-by: Ralph Castain <rhc@open-mpi.org>
562 строки
20 KiB
C
562 строки
20 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
|
|
* Copyright (c) 2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
/** @file:
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* includes
|
|
*/
|
|
#include "orte_config.h"
|
|
|
|
|
|
#include "opal/dss/dss.h"
|
|
|
|
#include "orte/util/compress.h"
|
|
#include "orte/util/proc_info.h"
|
|
#include "orte/util/error_strings.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/odls/base/base.h"
|
|
#include "orte/mca/rmaps/rmaps_types.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/routed/routed.h"
|
|
#include "orte/mca/state/state.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/nidmap.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/grpcomm/grpcomm.h"
|
|
#include "orte/mca/grpcomm/base/base.h"
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag);
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns);
|
|
|
|
typedef struct {
|
|
opal_object_t super;
|
|
opal_event_t ev;
|
|
orte_grpcomm_signature_t *sig;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_cbfunc_t cbfunc;
|
|
void *cbdata;
|
|
} orte_grpcomm_caddy_t;
|
|
/* Constructor: start every caddy with all of its payload fields cleared. */
static void gccon(orte_grpcomm_caddy_t *p)
{
    p->cbdata = NULL;
    p->cbfunc = NULL;
    p->buf = NULL;
    p->sig = NULL;
}
|
|
/* Destructor: drop the reference held on the data buffer, if any.
 * Note that the signature is NOT released here. */
static void gcdes(orte_grpcomm_caddy_t *p)
{
    if (NULL == p->buf) {
        return;
    }
    OBJ_RELEASE(p->buf);
}
|
|
/* Register the caddy class with its constructor/destructor pair */
static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t, opal_object_t, gccon, gcdes);
|
|
|
|
int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
|
|
orte_rml_tag_t tag,
|
|
opal_buffer_t *msg)
|
|
{
|
|
int rc = ORTE_ERROR;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_vpid_t *dmns;
|
|
size_t ndmns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:xcast sending %u bytes to tag %ld",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
|
|
|
|
/* this function does not access any framework-global data, and
|
|
* so it does not require us to push it into the event library */
|
|
|
|
/* prep the output buffer */
|
|
buf = OBJ_NEW(opal_buffer_t);
|
|
|
|
/* create the array of participating daemons */
|
|
if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
return rc;
|
|
}
|
|
|
|
/* setup the payload */
|
|
if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
/* cycle thru the actives and see who can send it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->xcast) {
|
|
if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(buf); // if the module needs to keep the buf, it should OBJ_RETAIN it
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
static void allgather_stub(int fd, short args, void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
|
|
int ret = OPAL_SUCCESS;
|
|
int rc;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_grpcomm_coll_t *coll;
|
|
uint32_t *seq_number;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather stub",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* retrieve an existing tracker, create it if not
|
|
* already found. The allgather module is responsible
|
|
* for releasing it upon completion of the collective */
|
|
ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
|
|
if (OPAL_ERR_NOT_FOUND == ret) {
|
|
seq_number = (uint32_t *)malloc(sizeof(uint32_t));
|
|
*seq_number = 0;
|
|
} else if (OPAL_SUCCESS == ret) {
|
|
*seq_number = *seq_number + 1;
|
|
} else {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot get signature from hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
|
|
if (OPAL_SUCCESS != ret) {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot add new signature to hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
coll = orte_grpcomm_base_get_tracker(cd->sig, true);
|
|
OBJ_RELEASE(cd->sig);
|
|
coll->cbfunc = cd->cbfunc;
|
|
coll->cbdata = cd->cbdata;
|
|
|
|
/* cycle thru the actives and see who can process it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->allgather) {
|
|
if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(cd);
|
|
}
|
|
|
|
int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buf,
|
|
orte_grpcomm_cbfunc_t cbfunc,
|
|
void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* must push this into the event library to ensure we can
|
|
* access framework-global data safely */
|
|
cd = OBJ_NEW(orte_grpcomm_caddy_t);
|
|
/* ensure the data doesn't go away */
|
|
OBJ_RETAIN(buf);
|
|
opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
|
|
cd->buf = buf;
|
|
cd->cbfunc = cbfunc;
|
|
cd->cbdata = cbdata;
|
|
opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
|
|
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
|
|
opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
/**
 * Find the tracker for an ongoing collective, optionally creating it.
 *
 * The list of ongoing collectives is searched for a tracker whose
 * signature matches "sig". If none is found and "create" is true, a new
 * tracker is built: the signature is copied, the participating daemons are
 * computed, and the number of expected contributions is derived from the
 * routing tree of the collective conduit (or set to the number of daemons
 * when the conduit is not routed). The tracker is only added to the
 * ongoing list once it has been fully set up.
 *
 * @param sig     signature identifying the collective
 * @param create  whether to create a tracker when none exists
 *
 * @return the matching or newly created tracker, or NULL if none exists
 *         and create is false, or if the tracker could not be built.
 */
orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
{
    orte_grpcomm_coll_t *coll;
    int rc;
    orte_namelist_t *nm;
    opal_list_t children;
    size_t n;
    char *routed;

    /* search the existing tracker list to see if this already exists */
    OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
        if (NULL == sig->signature) {
            if (NULL == coll->sig->signature) {
                /* only one collective can operate at a time
                 * across every process in the system */
                return coll;
            }
            /* only one of the two is NULL, so this tracker can't possibly
             * match - but a later one still might, so keep looking */
            continue;
        }
        if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
            OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
                                 "%s grpcomm:base:returning existing collective",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
            return coll;
        }
    }

    /* if we get here, then this is a new collective - so create
     * the tracker for it */
    if (!create) {
        OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
                             "%s grpcomm:base: not creating new coll",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        return NULL;
    }

    coll = OBJ_NEW(orte_grpcomm_coll_t);
    if (NULL == coll) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return NULL;
    }
    /* a tracker without a signature would break the search loop above,
     * so refuse to create one if the copy fails */
    if (ORTE_SUCCESS != (rc = opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(coll);
        return NULL;
    }

    if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
        char *tmp=NULL;
        (void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
        /* never hand a NULL pointer to %s */
        opal_output(0, "%s grpcomm:base: creating new coll for%s",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    (NULL == tmp) ? "" : tmp);
        free(tmp);
    }

    /* now get the daemons involved - do this BEFORE adding the tracker
     * to the ongoing list so that a failure doesn't leave a half-built
     * tracker behind for later lookups to trip over */
    if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(coll);
        return NULL;
    }

    opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);

    /* get the routed module for our conduit */
    routed = orte_rml.get_routed(orte_coll_conduit);
    if (NULL == routed) {
        /* this conduit is not routed, so we expect all daemons
         * to directly participate */
        coll->nexpected = coll->ndmns;
    } else {
        /* cycle thru the array of daemons and compare them to our
         * children in the routing tree, counting the ones that match
         * so we know how many daemons we should receive contributions from */
        OBJ_CONSTRUCT(&children, opal_list_t);
        orte_routed.get_routing_list(routed, &children);
        while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
            for (n=0; n < coll->ndmns; n++) {
                if (nm->name.vpid == coll->dmns[n]) {
                    coll->nexpected++;
                    break;
                }
            }
            OBJ_RELEASE(nm);
        }
        OPAL_LIST_DESTRUCT(&children);

        /* see if I am in the array of participants - note that I may
         * be in the rollup tree even though I'm not participating
         * in the collective itself */
        for (n=0; n < coll->ndmns; n++) {
            if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
                coll->nexpected++;
                break;
            }
        }
    }

    return coll;
}
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns)
|
|
{
|
|
size_t n;
|
|
orte_job_t *jdata;
|
|
orte_proc_t *proc;
|
|
orte_node_t *node;
|
|
int i;
|
|
opal_list_t ds;
|
|
orte_namelist_t *nm;
|
|
orte_vpid_t vpid;
|
|
bool found;
|
|
size_t nds;
|
|
orte_vpid_t *dns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called with %s signature",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == sig->signature) ? "NULL" : "NON-NULL"));
|
|
|
|
/* if NULL == procs, or the target jobid is our own,
|
|
* then all daemons are participating */
|
|
if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
|
|
*ndmns = orte_process_info.num_procs;
|
|
*dmns = NULL;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called for all procs in job %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_JOBID_PRINT(sig->signature[0].jobid)));
|
|
/* all daemons hosting this jobid are participating */
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
|
|
/* we haven't generated a job map yet - if we are the HNP,
|
|
* then we should only involve ourselves. Otherwise, we have
|
|
* no choice but to abort to avoid hangs */
|
|
if (ORTE_PROC_IS_HNP) {
|
|
dns = (orte_vpid_t*)malloc(sizeof(vpid));
|
|
dns[0] = ORTE_PROC_MY_NAME->vpid;
|
|
*ndmns = 1;
|
|
*dmns = dns;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
|
|
nds = 0;
|
|
for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
|
|
if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
|
|
continue;
|
|
}
|
|
if (NULL == node->daemon) {
|
|
/* should never happen */
|
|
ORTE_ERROR_LOG(ORTE_ERROR);
|
|
free(dns);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&node->daemon->name)));
|
|
dns[nds++] = node->daemon->name.vpid;
|
|
}
|
|
} else {
|
|
/* lookup the daemon for each proc and add it to the list, checking to
|
|
* ensure any daemon only gets added once. Yes, this isn't a scalable
|
|
* algo - someone can come up with something better! */
|
|
OBJ_CONSTRUCT(&ds, opal_list_t);
|
|
for (n=0; n < sig->sz; n++) {
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s sign: GETTING PROC OBJECT FOR %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&sig->signature[n])));
|
|
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == proc->node || NULL == proc->node->daemon) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
vpid = proc->node->daemon->name.vpid;
|
|
found = false;
|
|
OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
|
|
if (nm->name.vpid == vpid) {
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!found) {
|
|
nm = OBJ_NEW(orte_namelist_t);
|
|
nm->name.vpid = vpid;
|
|
opal_list_append(&ds, &nm->super);
|
|
}
|
|
}
|
|
if (0 == opal_list_get_size(&ds)) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
|
|
nds = 0;
|
|
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&nm->name)));
|
|
dns[nds++] = nm->name.vpid;
|
|
OBJ_RELEASE(nm);
|
|
}
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
}
|
|
*dmns = dns;
|
|
*ndmns = nds;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag)
|
|
{
|
|
int rc;
|
|
opal_buffer_t data;
|
|
int8_t flag;
|
|
uint8_t *cmpdata;
|
|
size_t cmplen;
|
|
|
|
/* setup an intermediate buffer */
|
|
OBJ_CONSTRUCT(&data, opal_buffer_t);
|
|
|
|
/* pass along the signature */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pass the final tag */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* copy the payload into the new buffer - this is non-destructive, so our
|
|
* caller is still responsible for releasing any memory in the buffer they
|
|
* gave to us
|
|
*/
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* see if we want to compress this message */
|
|
if (orte_util_compress_block((uint8_t*)data.base_ptr, data.bytes_used,
|
|
&cmpdata, &cmplen)) {
|
|
/* the data was compressed - mark that we compressed it */
|
|
flag = 1;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the uncompressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed info */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
} else {
|
|
/* mark that it was not compressed */
|
|
flag = 0;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
return rc;
|
|
}
|
|
/* transfer the payload across */
|
|
opal_dss.copy_payload(buffer, &data);
|
|
OBJ_DESTRUCT(&data);
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"MSG SIZE: %lu", buffer->bytes_used));
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
/* Set the bit for "distance" in the tracker's distance_mask_recv bitmap.
 * The status returned by the bitmap call is not examined. */
void orte_grpcomm_base_mark_distance_recv(orte_grpcomm_coll_t *coll,
                                          uint32_t distance)
{
    opal_bitmap_t *mask = &coll->distance_mask_recv;

    opal_bitmap_set_bit(mask, distance);
}
|
|
|
|
/* Report whether the bit for "distance" is set in the tracker's
 * distance_mask_recv bitmap. */
unsigned int orte_grpcomm_base_check_distance_recv(orte_grpcomm_coll_t *coll,
                                                   uint32_t distance)
{
    unsigned int is_set;

    is_set = opal_bitmap_is_set_bit(&coll->distance_mask_recv, distance);
    return is_set;
}
|