
We've been fighting the battle of trying to create a regex generator and parser that can handle arbitrary hostname schemes - without long-term success. The worst of it is that there is no way of checking to see if the computed regex is correct short of parsing it and doing a character-by-character comparison with the original string. Ugh...there has to be a better solution. One option is to investigate using 3rd-party regex libraries as those are coming from communities whose sole focus is resolving that problem. However, someone would need to spend the time to investigate it, and we'd have to find a license-friendly implementation. Another option is to quit beating our heads against the wall and just compress the information. It won't be as much of a reduction, but we also won't keep hitting scenarios where things break. In this case, it seems that "perfection" is definitely the enemy of "good enough". This PR implements the compression option while retaining the possibility of people adding regex-generating components. The compression code used in ORTE is consolidated into the opal/compress framework. That framework currently held bzip and gzip components for use in compressing checkpoint files - since we no longer support C/R, I have .opal_ignore'd those components. However, I have left the original framework APIs alone in case someone ever decides to redo C/R. The APIs of interest here are added to the framework - specifically, the "compress_block" and "decompress_block" functions. I then moved the ORTE zlib compression code into a new component in this framework. Unfortunately, the framework currently is a single-select one - i.e., only one active component at a time. Since I .opal_ignore'd the other two and made the priority of zlib high, this isn't a problem. However, if someone wants to re-enable bzip/gzip or add another component, they might need to transition opal/compress to a multi-select framework. Included changes: * Consolidate the compression code into the opal/compress framework * Move the ORTE zlib compression code into a new opal/compress/zlib component * Ignore the bzip and gzip components in opal/compress framework * Add a "compress_base_limit" MCA param to set the threshold above which we compress data - defaults to 4096 bytes * Delete stale brucks and rcd components from orte/grpcomm framework * Delete the orte/regx framework * Update the launch system to use opal/compress instead of string regex * Provide a default module if no zlib is available * Fix some misc multi-node issues * Properly generate the nidmap in response to a "connection warmup" message so the remote daemon knows the children it needs to launch. * Remove stale references to orte_node_regex * opal_byte_object_t's are not OPAL objects - properly release allocated memory. * Set the topology * Currently only handling homogeneous case * Update the compress framework files to conform * Consolidate open/close into one "frame" file. Ensure we open/close the framework Signed-off-by: Ralph Castain <rhc@pmix.org>
570 строки
20 KiB
C
570 строки
20 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2016-2019 Intel, Inc. All rights reserved.
|
|
* Copyright (c) 2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
/** @file:
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* includes
|
|
*/
|
|
#include "orte_config.h"
|
|
|
|
|
|
#include "opal/dss/dss.h"
|
|
|
|
#include "opal/mca/compress/compress.h"
|
|
#include "orte/util/proc_info.h"
|
|
#include "orte/util/error_strings.h"
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/odls/base/base.h"
|
|
#include "orte/mca/rmaps/rmaps_types.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/routed/routed.h"
|
|
#include "orte/mca/state/state.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/threads.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/grpcomm/grpcomm.h"
|
|
#include "orte/mca/grpcomm/base/base.h"
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag);
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns);
|
|
|
|
typedef struct {
|
|
opal_object_t super;
|
|
opal_event_t ev;
|
|
orte_grpcomm_signature_t *sig;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_cbfunc_t cbfunc;
|
|
void *cbdata;
|
|
} orte_grpcomm_caddy_t;
|
|
static void gccon(orte_grpcomm_caddy_t *p)
|
|
{
|
|
p->sig = NULL;
|
|
p->buf = NULL;
|
|
p->cbfunc = NULL;
|
|
p->cbdata = NULL;
|
|
}
|
|
static void gcdes(orte_grpcomm_caddy_t *p)
|
|
{
|
|
if (NULL != p->buf) {
|
|
OBJ_RELEASE(p->buf);
|
|
}
|
|
}
|
|
static OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
|
|
opal_object_t,
|
|
gccon, gcdes);
|
|
|
|
int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
|
|
orte_rml_tag_t tag,
|
|
opal_buffer_t *msg)
|
|
{
|
|
int rc = ORTE_ERROR;
|
|
opal_buffer_t *buf;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_vpid_t *dmns;
|
|
size_t ndmns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:xcast sending %u bytes to tag %ld",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == msg) ? 0 : (unsigned int)msg->bytes_used, (long)tag));
|
|
|
|
/* this function does not access any framework-global data, and
|
|
* so it does not require us to push it into the event library */
|
|
|
|
/* prep the output buffer */
|
|
buf = OBJ_NEW(opal_buffer_t);
|
|
|
|
/* create the array of participating daemons */
|
|
if (ORTE_SUCCESS != (rc = create_dmns(sig, &dmns, &ndmns))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
return rc;
|
|
}
|
|
|
|
/* setup the payload */
|
|
if (ORTE_SUCCESS != (rc = pack_xcast(sig, buf, msg, tag))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
/* cycle thru the actives and see who can send it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->xcast) {
|
|
if (ORTE_SUCCESS == (rc = active->module->xcast(dmns, ndmns, buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(buf); // if the module needs to keep the buf, it should OBJ_RETAIN it
|
|
if (NULL != dmns) {
|
|
free(dmns);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
static void allgather_stub(int fd, short args, void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
|
|
int ret = OPAL_SUCCESS;
|
|
int rc;
|
|
orte_grpcomm_base_active_t *active;
|
|
orte_grpcomm_coll_t *coll;
|
|
uint32_t *seq_number;
|
|
|
|
ORTE_ACQUIRE_OBJECT(cd);
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather stub",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* retrieve an existing tracker, create it if not
|
|
* already found. The allgather module is responsible
|
|
* for releasing it upon completion of the collective */
|
|
ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void **)&seq_number);
|
|
if (OPAL_ERR_NOT_FOUND == ret) {
|
|
seq_number = (uint32_t *)malloc(sizeof(uint32_t));
|
|
*seq_number = 0;
|
|
} else if (OPAL_SUCCESS == ret) {
|
|
*seq_number = *seq_number + 1;
|
|
} else {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot get signature from hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)seq_number);
|
|
if (OPAL_SUCCESS != ret) {
|
|
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
|
|
"%s rpcomm:base:allgather cannot add new signature to hash table",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
coll = orte_grpcomm_base_get_tracker(cd->sig, true);
|
|
if (NULL == coll) {
|
|
OBJ_RELEASE(cd->sig);
|
|
OBJ_RELEASE(cd);
|
|
return;
|
|
}
|
|
OBJ_RELEASE(cd->sig);
|
|
coll->cbfunc = cd->cbfunc;
|
|
coll->cbdata = cd->cbdata;
|
|
|
|
/* cycle thru the actives and see who can process it */
|
|
OPAL_LIST_FOREACH(active, &orte_grpcomm_base.actives, orte_grpcomm_base_active_t) {
|
|
if (NULL != active->module->allgather) {
|
|
if (ORTE_SUCCESS == (rc = active->module->allgather(coll, cd->buf))) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
OBJ_RELEASE(cd);
|
|
}
|
|
|
|
int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buf,
|
|
orte_grpcomm_cbfunc_t cbfunc,
|
|
void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *cd;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:allgather",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* must push this into the event library to ensure we can
|
|
* access framework-global data safely */
|
|
cd = OBJ_NEW(orte_grpcomm_caddy_t);
|
|
/* ensure the data doesn't go away */
|
|
OBJ_RETAIN(buf);
|
|
opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
|
|
cd->buf = buf;
|
|
cd->cbfunc = cbfunc;
|
|
cd->cbdata = cbdata;
|
|
opal_event_set(orte_event_base, &cd->ev, -1, OPAL_EV_WRITE, allgather_stub, cd);
|
|
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
|
|
ORTE_POST_OBJECT(cd);
|
|
opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig, bool create)
|
|
{
|
|
orte_grpcomm_coll_t *coll;
|
|
int rc;
|
|
orte_namelist_t *nm;
|
|
opal_list_t children;
|
|
size_t n;
|
|
char *routed;
|
|
|
|
/* search the existing tracker list to see if this already exists */
|
|
OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
|
|
if (NULL == sig->signature) {
|
|
if (NULL == coll->sig->signature) {
|
|
/* only one collective can operate at a time
|
|
* across every process in the system */
|
|
return coll;
|
|
}
|
|
/* if only one is NULL, then we can't possibly match */
|
|
break;
|
|
}
|
|
if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:returning existing collective",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
return coll;
|
|
}
|
|
}
|
|
/* if we get here, then this is a new collective - so create
|
|
* the tracker for it */
|
|
if (!create) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base: not creating new coll",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
return NULL;
|
|
}
|
|
coll = OBJ_NEW(orte_grpcomm_coll_t);
|
|
opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
|
|
|
|
if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
|
|
char *tmp=NULL;
|
|
(void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
|
|
opal_output(0, "%s grpcomm:base: creating new coll for%s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
|
|
free(tmp);
|
|
}
|
|
|
|
opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
|
|
|
|
/* now get the daemons involved */
|
|
if (ORTE_SUCCESS != (rc = create_dmns(sig, &coll->dmns, &coll->ndmns))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return NULL;
|
|
}
|
|
|
|
/* get the routed module for our conduit */
|
|
routed = orte_rml.get_routed(orte_coll_conduit);
|
|
if (NULL == routed) {
|
|
/* this conduit is not routed, so we expect all daemons
|
|
* to directly participate */
|
|
coll->nexpected = coll->ndmns;
|
|
} else {
|
|
/* cycle thru the array of daemons and compare them to our
|
|
* children in the routing tree, counting the ones that match
|
|
* so we know how many daemons we should receive contributions from */
|
|
OBJ_CONSTRUCT(&children, opal_list_t);
|
|
orte_routed.get_routing_list(routed, &children);
|
|
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
|
|
for (n=0; n < coll->ndmns; n++) {
|
|
if (nm->name.vpid == coll->dmns[n]) {
|
|
coll->nexpected++;
|
|
break;
|
|
}
|
|
}
|
|
OBJ_RELEASE(nm);
|
|
}
|
|
OPAL_LIST_DESTRUCT(&children);
|
|
|
|
/* see if I am in the array of participants - note that I may
|
|
* be in the rollup tree even though I'm not participating
|
|
* in the collective itself */
|
|
for (n=0; n < coll->ndmns; n++) {
|
|
if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
|
|
coll->nexpected++;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return coll;
|
|
}
|
|
|
|
static int create_dmns(orte_grpcomm_signature_t *sig,
|
|
orte_vpid_t **dmns, size_t *ndmns)
|
|
{
|
|
size_t n;
|
|
orte_job_t *jdata;
|
|
orte_proc_t *proc;
|
|
orte_node_t *node;
|
|
int i;
|
|
opal_list_t ds;
|
|
orte_namelist_t *nm;
|
|
orte_vpid_t vpid;
|
|
bool found;
|
|
size_t nds;
|
|
orte_vpid_t *dns;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called with %s signature",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(NULL == sig->signature) ? "NULL" : "NON-NULL"));
|
|
|
|
/* if NULL == procs, or the target jobid is our own,
|
|
* then all daemons are participating */
|
|
if (NULL == sig->signature || ORTE_PROC_MY_NAME->jobid == sig->signature[0].jobid) {
|
|
*ndmns = orte_process_info.num_procs;
|
|
*dmns = NULL;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
if (ORTE_VPID_WILDCARD == sig->signature[0].vpid) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns called for all procs in job %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_JOBID_PRINT(sig->signature[0].jobid)));
|
|
/* all daemons hosting this jobid are participating */
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == jdata->map || 0 == jdata->map->num_nodes) {
|
|
/* we haven't generated a job map yet - if we are the HNP,
|
|
* then we should only involve ourselves. Otherwise, we have
|
|
* no choice but to abort to avoid hangs */
|
|
if (ORTE_PROC_IS_HNP) {
|
|
dns = (orte_vpid_t*)malloc(sizeof(vpid));
|
|
dns[0] = ORTE_PROC_MY_NAME->vpid;
|
|
*ndmns = 1;
|
|
*dmns = dns;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(jdata->map->num_nodes * sizeof(vpid));
|
|
nds = 0;
|
|
for (i=0; i < jdata->map->nodes->size && (int)nds < jdata->map->num_nodes; i++) {
|
|
if (NULL == (node = opal_pointer_array_get_item(jdata->map->nodes, i))) {
|
|
continue;
|
|
}
|
|
if (NULL == node->daemon) {
|
|
/* should never happen */
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
free(dns);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&node->daemon->name)));
|
|
dns[nds++] = node->daemon->name.vpid;
|
|
}
|
|
} else {
|
|
/* lookup the daemon for each proc and add it to the list, checking to
|
|
* ensure any daemon only gets added once. Yes, this isn't a scalable
|
|
* algo - someone can come up with something better! */
|
|
OBJ_CONSTRUCT(&ds, opal_list_t);
|
|
for (n=0; n < sig->sz; n++) {
|
|
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s sign: GETTING PROC OBJECT FOR %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&sig->signature[n])));
|
|
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
if (NULL == proc->node || NULL == proc->node->daemon) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
vpid = proc->node->daemon->name.vpid;
|
|
found = false;
|
|
OPAL_LIST_FOREACH(nm, &ds, orte_namelist_t) {
|
|
if (nm->name.vpid == vpid) {
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!found) {
|
|
nm = OBJ_NEW(orte_namelist_t);
|
|
nm->name.vpid = vpid;
|
|
opal_list_append(&ds, &nm->super);
|
|
}
|
|
}
|
|
if (0 == opal_list_get_size(&ds)) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
|
|
*ndmns = 0;
|
|
*dmns = NULL;
|
|
return ORTE_ERR_NOT_FOUND;
|
|
}
|
|
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
|
|
nds = 0;
|
|
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&ds))) {
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:create_dmns adding daemon %s to array",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&nm->name)));
|
|
dns[nds++] = nm->name.vpid;
|
|
OBJ_RELEASE(nm);
|
|
}
|
|
OPAL_LIST_DESTRUCT(&ds);
|
|
}
|
|
*dmns = dns;
|
|
*ndmns = nds;
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
static int pack_xcast(orte_grpcomm_signature_t *sig,
|
|
opal_buffer_t *buffer,
|
|
opal_buffer_t *message,
|
|
orte_rml_tag_t tag)
|
|
{
|
|
int rc;
|
|
opal_buffer_t data;
|
|
int8_t flag;
|
|
uint8_t *cmpdata;
|
|
size_t cmplen;
|
|
|
|
/* setup an intermediate buffer */
|
|
OBJ_CONSTRUCT(&data, opal_buffer_t);
|
|
|
|
/* pass along the signature */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pass the final tag */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&data, &tag, 1, ORTE_RML_TAG))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* copy the payload into the new buffer - this is non-destructive, so our
|
|
* caller is still responsible for releasing any memory in the buffer they
|
|
* gave to us
|
|
*/
|
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&data, message))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
|
|
/* see if we want to compress this message */
|
|
if (opal_compress.compress_block((uint8_t*)data.base_ptr, data.bytes_used,
|
|
&cmpdata, &cmplen)) {
|
|
/* the data was compressed - mark that we compressed it */
|
|
flag = 1;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &cmplen, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the uncompressed length */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &data.bytes_used, 1, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
/* pack the compressed info */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, cmpdata, cmplen, OPAL_UINT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
free(cmpdata);
|
|
OBJ_DESTRUCT(&data);
|
|
return rc;
|
|
}
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
} else {
|
|
/* mark that it was not compressed */
|
|
flag = 0;
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &flag, 1, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_DESTRUCT(&data);
|
|
free(cmpdata);
|
|
return rc;
|
|
}
|
|
/* transfer the payload across */
|
|
opal_dss.copy_payload(buffer, &data);
|
|
OBJ_DESTRUCT(&data);
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"MSG SIZE: %lu", buffer->bytes_used));
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
void orte_grpcomm_base_mark_distance_recv (orte_grpcomm_coll_t *coll,
|
|
uint32_t distance) {
|
|
opal_bitmap_set_bit (&coll->distance_mask_recv, distance);
|
|
}
|
|
|
|
unsigned int orte_grpcomm_base_check_distance_recv (orte_grpcomm_coll_t *coll,
|
|
uint32_t distance) {
|
|
return opal_bitmap_is_set_bit (&coll->distance_mask_recv, distance);
|
|
}
|