1
1
openmpi/orte/mca/grpcomm/direct/grpcomm_direct.c
Ralph Castain 230d15f0d9 Cleanup the ras simulator capability, and the relay route thru grpcomm
direct. Don't resend wireup info if nothing has changed

Fix release of buffer

Correct the unpacking order

Fix the DVM - now minimized data transfer to it

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
2017-02-01 15:01:58 -08:00

603 строки
22 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* Copyright (c) 2007 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#include "opal/dss/dss.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/base/base.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/base/base.h"
#include "orte/mca/state/state.h"
#include "orte/util/compress.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/proc_info.h"
#include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_direct.h"
/* Static API's */
static int init(void);
static void finalize(void);
static int xcast(orte_vpid_t *vpids,
size_t nprocs,
opal_buffer_t *buf);
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_direct_module = {
init,
finalize,
xcast,
allgather
};
/* internal functions */
static void xcast_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void allgather_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void barrier_release(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/* internal variables */
static opal_list_t tracker;
/**
* Initialize the module
*/
static int init(void)
{
OBJ_CONSTRUCT(&tracker, opal_list_t);
/* post the receives */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_XCAST,
ORTE_RML_PERSISTENT,
xcast_recv, NULL);
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_ALLGATHER_DIRECT,
ORTE_RML_PERSISTENT,
allgather_recv, NULL);
/* setup recv for barrier release */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_COLL_RELEASE,
ORTE_RML_PERSISTENT,
barrier_release, NULL);
return OPAL_SUCCESS;
}
/**
* Finalize the module
*/
static void finalize(void)
{
OPAL_LIST_DESTRUCT(&tracker);
return;
}
static int xcast(orte_vpid_t *vpids,
size_t nprocs,
opal_buffer_t *buf)
{
int rc;
/* send it to the HNP (could be myself) for relay */
OBJ_RETAIN(buf); // we'll let the RML release it
if (0 > (rc = orte_rml.send_buffer_nb(orte_coll_conduit,
ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_XCAST,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
return ORTE_SUCCESS;
}
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf)
{
int rc;
opal_buffer_t *relay;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct: allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the base functions pushed us into the event library
* before calling us, so we can safely access global data
* at this point */
relay = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &coll->sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return rc;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
/* send this to ourselves for processing */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:allgather sending to ourself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* send the info to ourselves for tracking */
rc = orte_rml.send_buffer_nb(orte_coll_conduit,
ORTE_PROC_MY_NAME, relay,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
return rc;
}
static void allgather_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int32_t cnt;
int rc, ret;
orte_grpcomm_signature_t *sig;
opal_buffer_t *reply;
orte_grpcomm_coll_t *coll;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the signature */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
return;
}
/* check for the tracker and create it if not found */
if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, true))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(sig);
return;
}
/* increment nprocs reported for collective */
coll->nreported++;
/* capture any provided content */
opal_dss.copy_payload(&coll->bucket, buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather recv nexpected %d nrep %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->nexpected, (int)coll->nreported));
/* see if everyone has reported */
if (coll->nreported == coll->nexpected) {
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather HNP reports complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the allgather is complete - send the xcast */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
} else {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather rollup complete - sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
/* relay the bucket upward */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the info to our parent */
rc = orte_rml.send_buffer_nb(orte_coll_conduit,
ORTE_PROC_MY_PARENT, reply,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
}
}
OBJ_RELEASE(sig);
}
static void xcast_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tg,
void* cbdata)
{
opal_list_item_t *item;
orte_namelist_t *nm;
int ret, cnt;
opal_buffer_t *relay=NULL, *rly;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
opal_buffer_t wireup, datbuf, *data;
opal_byte_object_t *bo;
int8_t flag;
orte_job_t *jdata;
orte_proc_t *rec;
opal_list_t coll;
orte_grpcomm_signature_t *sig;
orte_rml_tag_t tag;
char *rtmod;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast:recv: with %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)buffer->bytes_used));
/* we need a passthru buffer to send to our children - we leave it
* as compressed data */
rly = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(rly, buffer);
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* setup the relay list */
OBJ_CONSTRUCT(&coll, opal_list_t);
/* unpack the flag to see if this payload is compressed */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
if (flag) {
/* unpack the data size */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* unpack the unpacked data size */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* allocate the space */
packed_data = (uint8_t*)malloc(inlen);
/* unpack the data blob */
cnt = inlen;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, packed_data, &cnt, OPAL_UINT8))) {
ORTE_ERROR_LOG(ret);
free(packed_data);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* decompress the data */
if (orte_util_uncompress_block(&cmpdata, cmplen,
packed_data, inlen)) {
/* the data has been uncompressed */
opal_dss.load(&datbuf, cmpdata, cmplen);
data = &datbuf;
} else {
data = buffer;
}
free(packed_data);
} else {
data = buffer;
}
/* get the signature that we do not need */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
ORTE_FORCED_TERMINATE(ret);
return;
}
OBJ_RELEASE(sig);
/* get the target tag */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
ORTE_FORCED_TERMINATE(ret);
return;
}
/* get our conduit's routed module name */
rtmod = orte_rml.get_routed(orte_coll_conduit);
/* if this is headed for the daemon command processor,
* then we first need to check for add_local_procs
* as that command includes some needed wireup info */
if (ORTE_RML_TAG_DAEMON == tag) {
/* peek at the command */
cnt=1;
if (ORTE_SUCCESS == (ret = opal_dss.unpack(data, &command, &cnt, ORTE_DAEMON_CMD))) {
/* if it is an exit cmd, then flag that we are quitting so we will properly
* handle connection losses from our downstream peers */
if (ORTE_DAEMON_EXIT_CMD == command ||
ORTE_DAEMON_HALT_VM_CMD == command) {
orte_orteds_term_ordered = true;
/* copy the msg for relay to ourselves */
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
opal_dss.copy_payload(relay, data);
} else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
ORTE_DAEMON_DVM_NIDMAP_CMD == command) {
/* setup our internal relay buffer */
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
/* see if any daemons were launched */
cnt = 1;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
/* add it to our relay buffer as we will need it later */
opal_dss.pack(relay, &flag, 1, OPAL_INT8);
if (0 != flag) {
/* update our local nidmap, if required - the decode function
* knows what to do
*/
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast updating daemon nidmap",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (!ORTE_PROC_IS_HNP) {
/* update the routing plan - the HNP already did
* it when it computed the VM, so don't waste time
* re-doing it here */
orte_routed.update_routing_plan(rtmod);
}
/* routing is now possible */
orte_routed_base.routing_enabled = true;
/* see if we have wiring info as well */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 != flag) {
/* unpack the byte object */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 < bo->size) {
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* pass it for processing */
if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&wireup);
goto relay;
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
free(bo);
}
}
/* copy the remainder of the payload - we don't pass wiring info
* to the odls */
opal_dss.copy_payload(relay, data);
} else {
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
/* copy the msg for relay to ourselves */
opal_dss.copy_payload(relay, data);
}
} else {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
} else {
/* copy the msg for relay to ourselves */
relay = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(relay, data);
}
relay:
if (!orte_do_not_launch) {
/* get the list of next recipients from the routed module */
orte_routed.get_routing_list(rtmod, &coll);
/* if list is empty, no relay is required */
if (opal_list_is_empty(&coll)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay - recipient list is empty!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto CLEANUP;
}
/* send the message to each recipient on list, deconstructing it as we go */
while (NULL != (item = opal_list_remove_first(&coll))) {
nm = (orte_namelist_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
ORTE_NAME_PRINT(&nm->name)));
OBJ_RETAIN(rly);
/* check the state of the recipient - no point
* sending to someone not alive
*/
jdata = orte_get_job_data_object(nm->name.jobid);
if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_PROC_STATE_RUNNING < rec->state ||
!ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
&nm->name, rly, ORTE_RML_TAG_XCAST,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
OBJ_RELEASE(item);
}
}
CLEANUP:
/* cleanup */
OPAL_LIST_DESTRUCT(&coll);
OBJ_RELEASE(rly); // retain accounting
/* now pass the relay buffer to myself for processing - don't
* inject it into the RML system via send as that will compete
* with the relay messages down in the OOB. Instead, pass it
* directly to the orted command processor */
if (ORTE_DAEMON_DVM_NIDMAP_CMD != command) {
ORTE_RML_POST_MESSAGE(ORTE_PROC_MY_NAME, tag, 1,
relay->base_ptr, relay->bytes_used);
relay->base_ptr = NULL;
relay->bytes_used = 0;
}
if (NULL != relay) {
OBJ_RELEASE(relay);
}
OBJ_DESTRUCT(&datbuf);
}
static void barrier_release(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int32_t cnt;
int rc, ret;
orte_grpcomm_signature_t *sig;
orte_grpcomm_coll_t *coll;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct: barrier release called with %d bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)buffer->bytes_used));
/* unpack the signature */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &cnt, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the return status */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* check for the tracker - it is not an error if not
* found as that just means we wre not involved
* in the collective */
if (NULL == (coll = orte_grpcomm_base_get_tracker(sig, false))) {
OBJ_RELEASE(sig);
return;
}
/* execute the callback */
if (NULL != coll->cbfunc) {
coll->cbfunc(ret, buffer, coll->cbdata);
}
opal_list_remove_item(&orte_grpcomm_base.ongoing, &coll->super);
OBJ_RELEASE(coll);
OBJ_RELEASE(sig);
}