
passed to make it all flow thru the opal/pmix "put/get" operations. Update the PMIx code to latest master to pickup some required behaviors. Remove the no-longer-required get_contact_info and set_contact_info from the RML layer. Add an MCA param to allow the ofi/rml component to route messages if desired. This is mainly for experimentation at this point as we aren't sure if routing wi ll be beneficial at large scales. Leave it "off" by default. Signed-off-by: Ralph Castain <rhc@open-mpi.org>
646 строки
24 KiB
C
646 строки
24 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; -*- */
|
|
/*
|
|
* Copyright (c) 2007 The Trustees of Indiana University.
|
|
* All rights reserved.
|
|
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
|
|
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
|
|
* rights reserved.
|
|
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
|
* Copyright (c) 2014-2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
#include "orte/constants.h"
|
|
#include "orte/types.h"
|
|
|
|
#include <string.h>
|
|
|
|
#include "opal/dss/dss.h"
|
|
#include "opal/class/opal_list.h"
|
|
#include "opal/mca/pmix/pmix.h"
|
|
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/rml/base/base.h"
|
|
#include "orte/mca/rml/base/rml_contact.h"
|
|
#include "orte/mca/routed/base/base.h"
|
|
#include "orte/mca/state/state.h"
|
|
#include "orte/util/compress.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/nidmap.h"
|
|
#include "orte/util/proc_info.h"
|
|
|
|
#include "orte/mca/grpcomm/base/base.h"
|
|
#include "grpcomm_direct.h"
|
|
|
|
|
|
/* Static API's */
|
|
static int init(void);
|
|
static void finalize(void);
|
|
static int xcast(orte_vpid_t *vpids,
|
|
size_t nprocs,
|
|
opal_buffer_t *buf);
|
|
static int allgather(orte_grpcomm_coll_t *coll,
|
|
opal_buffer_t *buf);
|
|
|
|
/* Module def */
|
|
orte_grpcomm_base_module_t orte_grpcomm_direct_module = {
|
|
init,
|
|
finalize,
|
|
xcast,
|
|
allgather
|
|
};
|
|
|
|
/* internal functions */
|
|
static void xcast_recv(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
static void allgather_recv(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
static void barrier_release(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
|
|
/* internal variables */
|
|
static opal_list_t tracker;
|
|
|
|
/**
|
|
* Initialize the module
|
|
*/
|
|
static int init(void)
|
|
{
|
|
OBJ_CONSTRUCT(&tracker, opal_list_t);
|
|
|
|
/* post the receives */
|
|
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
ORTE_RML_TAG_XCAST,
|
|
ORTE_RML_PERSISTENT,
|
|
xcast_recv, NULL);
|
|
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
ORTE_RML_TAG_ALLGATHER_DIRECT,
|
|
ORTE_RML_PERSISTENT,
|
|
allgather_recv, NULL);
|
|
/* setup recv for barrier release */
|
|
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
ORTE_RML_TAG_COLL_RELEASE,
|
|
ORTE_RML_PERSISTENT,
|
|
barrier_release, NULL);
|
|
|
|
return OPAL_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* Finalize the module
|
|
*/
|
|
static void finalize(void)
|
|
{
|
|
OPAL_LIST_DESTRUCT(&tracker);
|
|
return;
|
|
}
|
|
|
|
static int xcast(orte_vpid_t *vpids,
|
|
size_t nprocs,
|
|
opal_buffer_t *buf)
|
|
{
|
|
int rc;
|
|
|
|
/* send it to the HNP (could be myself) for relay */
|
|
OBJ_RETAIN(buf); // we'll let the RML release it
|
|
if (0 > (rc = orte_rml.send_buffer_nb(orte_coll_conduit,
|
|
ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_XCAST,
|
|
orte_rml_send_callback, NULL))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(buf);
|
|
return rc;
|
|
}
|
|
return ORTE_SUCCESS;
|
|
}
|
|
|
|
static int allgather(orte_grpcomm_coll_t *coll,
|
|
opal_buffer_t *buf)
|
|
{
|
|
int rc;
|
|
opal_buffer_t *relay;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct: allgather",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* the base functions pushed us into the event library
|
|
* before calling us, so we can safely access global data
|
|
* at this point */
|
|
|
|
relay = OBJ_NEW(opal_buffer_t);
|
|
/* pack the signature */
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &coll->sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(relay);
|
|
return rc;
|
|
}
|
|
|
|
/* pass along the payload */
|
|
opal_dss.copy_payload(relay, buf);
|
|
|
|
/* send this to ourselves for processing */
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct:allgather sending to ourself",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* send the info to ourselves for tracking */
|
|
rc = orte_rml.send_buffer_nb(orte_coll_conduit,
|
|
ORTE_PROC_MY_NAME, relay,
|
|
ORTE_RML_TAG_ALLGATHER_DIRECT,
|
|
orte_rml_send_callback, NULL);
|
|
return rc;
|
|
}
|
|
|
|
static void allgather_recv(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata)
|
|
{
|
|
int32_t cnt;
|
|
int rc, ret;
|
|
orte_grpcomm_signature_t *sig;
|
|
opal_buffer_t *reply;
|
|
orte_grpcomm_coll_t *coll;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct allgather recvd from %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(sender)));
|
|
|
|
/* unpack the signature */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return;
|
|
}
|
|
|
|
/* check for the tracker and create it if not found */
|
|
if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, true))) {
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
OBJ_RELEASE(sig);
|
|
return;
|
|
}
|
|
|
|
/* increment nprocs reported for collective */
|
|
coll->nreported++;
|
|
/* capture any provided content */
|
|
opal_dss.copy_payload(&coll->bucket, buffer);
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct allgather recv nexpected %d nrep %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(int)coll->nexpected, (int)coll->nreported));
|
|
|
|
/* see if everyone has reported */
|
|
if (coll->nreported == coll->nexpected) {
|
|
if (ORTE_PROC_IS_HNP) {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct allgather HNP reports complete",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
/* the allgather is complete - send the xcast */
|
|
reply = OBJ_NEW(opal_buffer_t);
|
|
/* pack the signature */
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(reply);
|
|
OBJ_RELEASE(sig);
|
|
return;
|
|
}
|
|
/* pack the status - success since the allgather completed. This
|
|
* would be an error if we timeout instead */
|
|
ret = ORTE_SUCCESS;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(reply);
|
|
OBJ_RELEASE(sig);
|
|
return;
|
|
}
|
|
/* transfer the collected bucket */
|
|
opal_dss.copy_payload(reply, &coll->bucket);
|
|
/* send the release via xcast */
|
|
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
|
|
OBJ_RELEASE(reply);
|
|
} else {
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct allgather rollup complete - sending to %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
|
|
/* relay the bucket upward */
|
|
reply = OBJ_NEW(opal_buffer_t);
|
|
/* pack the signature */
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
OBJ_RELEASE(reply);
|
|
OBJ_RELEASE(sig);
|
|
return;
|
|
}
|
|
/* transfer the collected bucket */
|
|
opal_dss.copy_payload(reply, &coll->bucket);
|
|
/* send the info to our parent */
|
|
rc = orte_rml.send_buffer_nb(orte_coll_conduit,
|
|
ORTE_PROC_MY_PARENT, reply,
|
|
ORTE_RML_TAG_ALLGATHER_DIRECT,
|
|
orte_rml_send_callback, NULL);
|
|
}
|
|
}
|
|
OBJ_RELEASE(sig);
|
|
}
|
|
|
|
static void xcast_recv(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tg,
|
|
void* cbdata)
|
|
{
|
|
opal_list_item_t *item;
|
|
orte_namelist_t *nm;
|
|
int ret, cnt;
|
|
opal_buffer_t *relay=NULL, *rly;
|
|
orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
|
|
opal_buffer_t wireup, datbuf, *data;
|
|
opal_byte_object_t *bo;
|
|
int8_t flag;
|
|
orte_job_t *jdata;
|
|
orte_proc_t *rec;
|
|
opal_list_t coll;
|
|
orte_grpcomm_signature_t *sig;
|
|
orte_rml_tag_t tag;
|
|
char *rtmod, *nidmap;
|
|
size_t inlen, cmplen;
|
|
uint8_t *packed_data, *cmpdata;
|
|
int32_t nvals, i;
|
|
opal_value_t *kv;
|
|
orte_process_name_t dmn;
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct:xcast:recv: with %d bytes",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(int)buffer->bytes_used));
|
|
|
|
/* we need a passthru buffer to send to our children - we leave it
|
|
* as compressed data */
|
|
rly = OBJ_NEW(opal_buffer_t);
|
|
opal_dss.copy_payload(rly, buffer);
|
|
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
|
|
/* setup the relay list */
|
|
OBJ_CONSTRUCT(&coll, opal_list_t);
|
|
|
|
/* unpack the flag to see if this payload is compressed */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
return;
|
|
}
|
|
if (flag) {
|
|
/* unpack the data size */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
return;
|
|
}
|
|
/* unpack the unpacked data size */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
return;
|
|
}
|
|
/* allocate the space */
|
|
packed_data = (uint8_t*)malloc(inlen);
|
|
/* unpack the data blob */
|
|
cnt = inlen;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, packed_data, &cnt, OPAL_UINT8))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
free(packed_data);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
return;
|
|
}
|
|
/* decompress the data */
|
|
if (orte_util_uncompress_block(&cmpdata, cmplen,
|
|
packed_data, inlen)) {
|
|
/* the data has been uncompressed */
|
|
opal_dss.load(&datbuf, cmpdata, cmplen);
|
|
data = &datbuf;
|
|
} else {
|
|
data = buffer;
|
|
}
|
|
free(packed_data);
|
|
} else {
|
|
data = buffer;
|
|
}
|
|
|
|
/* get the signature that we do not need */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
return;
|
|
}
|
|
OBJ_RELEASE(sig);
|
|
|
|
/* get the target tag */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_DESTRUCT(&datbuf);
|
|
OBJ_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly);
|
|
ORTE_FORCED_TERMINATE(ret);
|
|
return;
|
|
}
|
|
|
|
/* get our conduit's routed module name */
|
|
rtmod = orte_rml.get_routed(orte_coll_conduit);
|
|
|
|
/* if this is headed for the daemon command processor,
|
|
* then we first need to check for add_local_procs
|
|
* as that command includes some needed wireup info */
|
|
if (ORTE_RML_TAG_DAEMON == tag) {
|
|
/* peek at the command */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS == (ret = opal_dss.unpack(data, &command, &cnt, ORTE_DAEMON_CMD))) {
|
|
/* if it is an exit cmd, then flag that we are quitting so we will properly
|
|
* handle connection losses from our downstream peers */
|
|
if (ORTE_DAEMON_EXIT_CMD == command ||
|
|
ORTE_DAEMON_HALT_VM_CMD == command) {
|
|
orte_orteds_term_ordered = true;
|
|
if (ORTE_DAEMON_HALT_VM_CMD == command) {
|
|
/* this is an abnormal termination */
|
|
orte_abnormal_term_ordered = true;
|
|
}
|
|
/* copy the msg for relay to ourselves */
|
|
relay = OBJ_NEW(opal_buffer_t);
|
|
/* repack the command */
|
|
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
opal_dss.copy_payload(relay, data);
|
|
} else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
|
|
ORTE_DAEMON_DVM_NIDMAP_CMD == command ||
|
|
ORTE_DAEMON_DVM_ADD_PROCS == command) {
|
|
/* setup our internal relay buffer */
|
|
relay = OBJ_NEW(opal_buffer_t);
|
|
/* repack the command */
|
|
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
/* unpack the nidmap string - may be NULL */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &nidmap, &cnt, OPAL_STRING))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
if (NULL != nidmap) {
|
|
if (ORTE_SUCCESS != (ret = orte_util_nidmap_parse(nidmap))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
free(nidmap);
|
|
}
|
|
/* see if they included info on node capabilities */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
if (0 != flag) {
|
|
/* update our local nidmap, if required - the decode function
|
|
* knows what to do
|
|
*/
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct:xcast updating daemon nidmap",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
|
|
if (!ORTE_PROC_IS_HNP) {
|
|
/* update the routing plan - the HNP already did
|
|
* it when it computed the VM, so don't waste time
|
|
* re-doing it here */
|
|
orte_routed.update_routing_plan(rtmod);
|
|
}
|
|
/* routing is now possible */
|
|
orte_routed_base.routing_enabled = true;
|
|
|
|
/* unpack the byte object */
|
|
cnt=1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
if (0 < bo->size) {
|
|
/* load it into a buffer */
|
|
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
|
|
opal_dss.load(&wireup, bo->bytes, bo->size);
|
|
/* decode it, pushing the info into our database */
|
|
cnt=1;
|
|
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
break;
|
|
}
|
|
for (i=0; i < nvals; i++) {
|
|
cnt = 1;
|
|
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv, &cnt, OPAL_VALUE))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
break;
|
|
}
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s STORING MODEX DATA FOR PROC %s KEY %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(&dmn), kv->key));
|
|
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kv))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(kv);
|
|
break;
|
|
}
|
|
OBJ_RELEASE(kv);
|
|
}
|
|
}
|
|
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
|
|
ORTE_ERROR_LOG(ret);
|
|
}
|
|
/* done with the wireup buffer - dump it */
|
|
OBJ_DESTRUCT(&wireup);
|
|
}
|
|
free(bo);
|
|
}
|
|
/* copy the remainder of the payload - we don't pass wiring info
|
|
* to the odls */
|
|
opal_dss.copy_payload(relay, data);
|
|
} else {
|
|
relay = OBJ_NEW(opal_buffer_t);
|
|
/* repack the command */
|
|
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto relay;
|
|
}
|
|
/* copy the msg for relay to ourselves */
|
|
opal_dss.copy_payload(relay, data);
|
|
}
|
|
} else {
|
|
ORTE_ERROR_LOG(ret);
|
|
goto CLEANUP;
|
|
}
|
|
} else {
|
|
/* copy the msg for relay to ourselves */
|
|
relay = OBJ_NEW(opal_buffer_t);
|
|
opal_dss.copy_payload(relay, data);
|
|
}
|
|
|
|
relay:
|
|
if (!orte_do_not_launch) {
|
|
/* get the list of next recipients from the routed module */
|
|
orte_routed.get_routing_list(rtmod, &coll);
|
|
|
|
/* if list is empty, no relay is required */
|
|
if (opal_list_is_empty(&coll)) {
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct:send_relay - recipient list is empty!",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
goto CLEANUP;
|
|
}
|
|
|
|
/* send the message to each recipient on list, deconstructing it as we go */
|
|
while (NULL != (item = opal_list_remove_first(&coll))) {
|
|
nm = (orte_namelist_t*)item;
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
|
|
ORTE_NAME_PRINT(&nm->name)));
|
|
OBJ_RETAIN(rly);
|
|
/* check the state of the recipient - no point
|
|
* sending to someone not alive
|
|
*/
|
|
jdata = orte_get_job_data_object(nm->name.jobid);
|
|
if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
|
|
if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
|
|
opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
|
|
}
|
|
OBJ_RELEASE(rly);
|
|
OBJ_RELEASE(item);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
|
|
continue;
|
|
}
|
|
if ((ORTE_PROC_STATE_RUNNING < rec->state &&
|
|
ORTE_PROC_STATE_CALLED_ABORT != rec->state) ||
|
|
!ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
|
|
if (!orte_abnormal_term_ordered && !orte_orteds_term_ordered) {
|
|
opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay: %s ",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name),
|
|
ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE) ? orte_proc_state_to_str(rec->state) : "NOT ALIVE");
|
|
}
|
|
OBJ_RELEASE(rly);
|
|
OBJ_RELEASE(item);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
|
|
continue;
|
|
}
|
|
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
|
|
&nm->name, rly, ORTE_RML_TAG_XCAST,
|
|
orte_rml_send_callback, NULL))) {
|
|
ORTE_ERROR_LOG(ret);
|
|
OBJ_RELEASE(rly);
|
|
OBJ_RELEASE(item);
|
|
ORTE_FORCED_TERMINATE(ORTE_ERR_UNREACH);
|
|
continue;
|
|
}
|
|
OBJ_RELEASE(item);
|
|
}
|
|
}
|
|
|
|
CLEANUP:
|
|
/* cleanup */
|
|
OPAL_LIST_DESTRUCT(&coll);
|
|
OBJ_RELEASE(rly); // retain accounting
|
|
|
|
/* now pass the relay buffer to myself for processing - don't
|
|
* inject it into the RML system via send as that will compete
|
|
* with the relay messages down in the OOB. Instead, pass it
|
|
* directly to the RML message processor */
|
|
if (ORTE_DAEMON_DVM_NIDMAP_CMD != command) {
|
|
ORTE_RML_POST_MESSAGE(ORTE_PROC_MY_NAME, tag, 1,
|
|
relay->base_ptr, relay->bytes_used);
|
|
relay->base_ptr = NULL;
|
|
relay->bytes_used = 0;
|
|
}
|
|
if (NULL != relay) {
|
|
OBJ_RELEASE(relay);
|
|
}
|
|
OBJ_DESTRUCT(&datbuf);
|
|
}
|
|
|
|
static void barrier_release(int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata)
|
|
{
|
|
int32_t cnt;
|
|
int rc, ret;
|
|
orte_grpcomm_signature_t *sig;
|
|
orte_grpcomm_coll_t *coll;
|
|
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:direct: barrier release called with %d bytes",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buffer->bytes_used));
|
|
|
|
/* unpack the signature */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return;
|
|
}
|
|
|
|
/* unpack the return status */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
return;
|
|
}
|
|
|
|
/* check for the tracker - it is not an error if not
|
|
* found as that just means we wre not involved
|
|
* in the collective */
|
|
if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, false))) {
|
|
OBJ_RELEASE(sig);
|
|
return;
|
|
}
|
|
|
|
/* execute the callback */
|
|
if (NULL != coll->cbfunc) {
|
|
coll->cbfunc(ret, buffer, coll->cbdata);
|
|
}
|
|
opal_list_remove_item(&orte_grpcomm_base.ongoing, &coll->super);
|
|
OBJ_RELEASE(coll);
|
|
OBJ_RELEASE(sig);
|
|
}
|