1
1
Artem Polyakov 68167ec879 ompi/comm: Improve MPI_Comm_create algorithm
Force only procs that are participating in the new Comm to decide what
    CID is appropriate. This will have 2 advantages:
    * Speedup Comm creation for small communicators: non-participating procs
      will not interfere
    * Reduce CID fragmentation: non-overlapping groups will be allowed to use
      same CID.

Signed-off-by: Artem Polyakov <artpol84@gmail.com>
2017-04-21 08:33:29 +07:00

1147 строки
42 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2007 Voltaire All rights reserved.
* Copyright (c) 2006-2010 University of Houston. All rights reserved.
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 IBM Corporation. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/proc/proc.h"
#include "ompi/communicator/communicator.h"
#include "ompi/op/op.h"
#include "ompi/constants.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/class/opal_list.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/rte/rte.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/request/request.h"
#include "ompi/runtime/mpiruntime.h"
/* Forward declaration so the callback type below can refer to the CID
 * context before the structure itself is defined. */
struct ompi_comm_cid_context_t;
/**
 * Signature shared by every allreduce implementation in this file.
 *
 * Reduces count ints from inbuf into outbuf with op, takes all remaining
 * parameters (communicators, leaders, tags) from cid_context and returns
 * the request tracking the operation through req.
 */
typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
struct ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/**
 * State carried through one CID selection or one communicator activation.
 * All fields after super are zeroed by the constructor.
 */
struct ompi_comm_cid_context_t {
opal_object_t super;
/** communicator that receives the new CID / is being activated */
ompi_communicator_t *newcomm;
/** caller's handle to newcomm (set by the activate path) so it can be
 * reset to MPI_COMM_NULL on failure */
ompi_communicator_t **newcommp;
/** communicator the operation is executed on; its CID is part of the
 * request id used to order concurrent CID requests */
ompi_communicator_t *comm;
/** communicator used for the leader-to-leader exchange in bridge mode */
ompi_communicator_t *bridgecomm;
/** allreduce implementation selected from the mode */
ompi_comm_allreduce_impl_fn_t allreduce_fn;
/** result of the MPI_MAX allreduce over nextlocal_cid */
int nextcid;
/** CID reserved locally for the current round */
int nextlocal_cid;
/** index at which the local search for a free CID starts */
int start;
/** flag: whether nextcid is usable locally; rflag: MPI_MIN allreduce of flag */
int flag, rflag;
/** rank in comm that acts as reduce/broadcast root (bridge and PMIx modes) */
int local_leader;
/** rank in bridgecomm the local leader exchanges data with (bridge mode) */
int remote_leader;
/** round counter; also made part of the PMIx exchange keys */
int iter;
/** storage for activate barrier */
int ok;
/** copy of the port string used to build PMIx keys (PMIx mode), owned by the context */
char *port_string;
/** selects which of the "send"/"recv" PMIx keys this side publishes */
bool send_first;
/** tag for the point-to-point messages of the group allreduce; also the
 * low 32 bits of the request id */
int pml_tag;
/** copy of the operation label used in PMIx keys, owned by the context */
char *pmix_tag;
};
typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t;
/* Constructor: zero every byte of the context that follows the embedded
 * opal_object_t header, leaving the header itself untouched. */
static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context)
{
    unsigned char *payload = (unsigned char *) context + sizeof (context->super);

    memset (payload, 0, sizeof (*context) - sizeof (context->super));
}
/* Destructor: release the two strings owned by the context. Both start out
 * NULL (the constructor zeroes them), and free(NULL) is a no-op. */
static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context)
{
free (context->port_string);
free (context->pmix_tag);
}
/* Register the class with its constructor/destructor pair. */
OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t,
mca_comm_cid_context_construct,
mca_comm_cid_context_destruct);
/**
 * Per-call state of one multi-step allreduce (inter, bridge, PMIx or group
 * variant). All fields after super are zeroed by the constructor.
 */
struct ompi_comm_allreduce_context_t {
opal_object_t super;
/** caller's input buffer (count ints) */
int *inbuf;
/** caller's output buffer (count ints) */
int *outbuf;
/** number of ints to reduce */
int count;
/** reduction operation */
struct ompi_op_t *op;
/** CID context this allreduce belongs to (not retained here) */
ompi_comm_cid_context_t *cid_context;
/** scratch buffer owned by this context; freed by the destructor */
int *tmpbuf;
/* for group allreduce */
/** [0] is the parent, [1] and [2] are the children, all expressed as ranks
 * in cid_context->comm (MPI_PROC_NULL when absent) */
int peers_comm[3];
};
typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t;
/* Constructor: clear everything behind the opal_object_t header so that all
 * pointers (notably tmpbuf) start out NULL. */
static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context)
{
    unsigned char *payload = (unsigned char *) context + sizeof (context->super);

    memset (payload, 0, sizeof (*context) - sizeof (context->super));
}
/* Destructor: release the scratch buffer. It is NULL when no buffer was
 * allocated (the constructor zeroes it), and free(NULL) is a no-op. */
static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context)
{
free (context->tmpbuf);
}
/* Register the class with its constructor/destructor pair. */
OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t,
ompi_comm_allreduce_context_construct,
ompi_comm_allreduce_context_destruct);
/**
* These functions determine the global result over an
* intra-communicator (the simple case), an inter-communicator, and a
* pseudo inter-communicator described by two separate intra-comms
* and a bridge-comm (the intercomm-create scenario).
*/
/* non-blocking intracommunicator allreduce */
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count,
struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/* non-blocking intercommunicator allreduce */
static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count,
struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/* non-blocking allreduce over the local group of the new communicator,
 * built from point-to-point messages on the parent communicator */
static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/* allreduce in which the local leader exchanges its data through PMIx */
static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count,
struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/* non-blocking allreduce in which the local leader exchanges its data with a
 * remote leader over the bridge communicator */
static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count,
struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
ompi_request_t **req);
/* Guards the CID selection steps below; always taken with a trylock so a
 * busy lock re-queues the step instead of blocking. */
static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT;
/* Initialization hook of the CID module. Nothing needs to be set up here;
 * always returns OMPI_SUCCESS. */
int ompi_comm_cid_init (void)
{
return OMPI_SUCCESS;
}
/**
 * Allocate a CID context and configure it for the given mode.
 *
 * @param newcomm     communicator being created / activated
 * @param comm        communicator the operation runs on
 * @param bridgecomm  bridge communicator (used in OMPI_COMM_CID_INTRA_BRIDGE mode)
 * @param arg0        mode dependent: pointer to the pml tag (GROUP) or to the
 *                    local leader rank (INTRA_PMIX, INTRA_BRIDGE)
 * @param arg1        mode dependent: port string (INTRA_PMIX, may be NULL) or
 *                    pointer to the remote leader rank (INTRA_BRIDGE)
 * @param pmix_tag    label copied into the context in INTRA_PMIX mode
 * @param send_first  stored in the context for the PMIx exchange
 * @param mode        one of the OMPI_COMM_CID_* modes
 *
 * @return the new context, or NULL if the mode is unknown or memory could
 *         not be allocated (including the string copies made in INTRA_PMIX
 *         mode).
 */
static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
                                                            ompi_communicator_t *bridgecomm, const void *arg0,
                                                            const void *arg1, const char *pmix_tag, bool send_first,
                                                            int mode)
{
    ompi_comm_cid_context_t *context;

    context = OBJ_NEW(ompi_comm_cid_context_t);
    if (OPAL_UNLIKELY(NULL == context)) {
        return NULL;
    }

    context->newcomm    = newcomm;
    context->comm       = comm;
    context->bridgecomm = bridgecomm;
    context->pml_tag    = 0;

    /* Determine which implementation of allreduce we have to use
     * for the current mode. */
    switch (mode) {
    case OMPI_COMM_CID_INTRA:
        context->allreduce_fn = ompi_comm_allreduce_intra_nb;
        break;
    case OMPI_COMM_CID_INTER:
        context->allreduce_fn = ompi_comm_allreduce_inter_nb;
        break;
    case OMPI_COMM_CID_GROUP:
        context->allreduce_fn = ompi_comm_allreduce_group_nb;
        context->pml_tag = ((const int *) arg0)[0];
        break;
    case OMPI_COMM_CID_INTRA_PMIX:
        context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb;
        context->local_leader = ((const int *) arg0)[0];
        if (NULL != arg1) {
            context->port_string = strdup ((const char *) arg1);
            if (OPAL_UNLIKELY(NULL == context->port_string)) {
                /* the destructor frees whatever has been copied so far */
                OBJ_RELEASE(context);
                return NULL;
            }
        }
        context->pmix_tag = strdup (pmix_tag);
        if (OPAL_UNLIKELY(NULL == context->pmix_tag)) {
            OBJ_RELEASE(context);
            return NULL;
        }
        break;
    case OMPI_COMM_CID_INTRA_BRIDGE:
        context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb;
        context->local_leader = ((const int *) arg0)[0];
        context->remote_leader = ((const int *) arg1)[0];
        break;
    default:
        OBJ_RELEASE(context);
        return NULL;
    }

    context->send_first = send_first;
    context->iter = 0;
    context->ok = 1;

    return context;
}
/**
 * Create an allreduce context that records the caller's buffers, element
 * count, operation and owning CID context.
 *
 * @return the new context, or NULL when the object could not be allocated.
 */
static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf,
                                                                         int count, struct ompi_op_t *op,
                                                                         ompi_comm_cid_context_t *cid_context)
{
    ompi_comm_allreduce_context_t *reduce_ctx = OBJ_NEW(ompi_comm_allreduce_context_t);

    if (OPAL_LIKELY(NULL != reduce_ctx)) {
        reduce_ctx->inbuf       = inbuf;
        reduce_ctx->outbuf      = outbuf;
        reduce_ctx->count       = count;
        reduce_ctx->op          = op;
        reduce_ctx->cid_context = cid_context;
    }

    return reduce_ctx;
}
/* find the next available local cid and start an allreduce */
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
/* verify that the maximum cid is locally available and start an allreduce */
static int ompi_comm_checkcid (ompi_comm_request_t *request);
/* verify that the cid was available globally */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
/* Id ((CID of the parent communicator << 32) | pml tag) of the CID request
 * that is currently allowed to make progress, or INT64_MAX when no request
 * is active. A request only proceeds while this value is not lower than its
 * own id; otherwise it re-queues itself. Modified with ompi_cid_lock held. */
static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
/**
 * Start the non-blocking selection of a context id for newcomm.
 *
 * Builds the CID context for the requested mode, remembers where the search
 * for a free CID should begin and schedules the first selection step on a
 * freshly obtained communicator request.
 *
 * @param req  receives the request to wait on
 * @return OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE when the context or the
 *         request could not be obtained.
 */
int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
                          ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
                          bool send_first, int mode, ompi_request_t **req)
{
    ompi_comm_request_t *cid_request;
    ompi_comm_cid_context_t *cid_ctx =
        mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1,
                                    "nextcid", send_first, mode);

    if (NULL == cid_ctx) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* begin the search at the lowest free slot of the communicator table */
    cid_ctx->start = ompi_mpi_communicators.lowest_free;

    cid_request = ompi_comm_request_get ();
    if (NULL == cid_request) {
        OBJ_RELEASE(cid_ctx);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    cid_request->context = &cid_ctx->super;

    ompi_comm_request_schedule_append (cid_request, ompi_comm_allreduce_getnextcid, NULL, 0);
    ompi_comm_request_start (cid_request);

    *req = &cid_request->super;

    return OMPI_SUCCESS;
}
/**
 * Blocking CID selection: start ompi_comm_nextcid_nb(), wait for it to
 * complete and report the status stored in the request.
 */
int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
                       ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
                       bool send_first, int mode)
{
    ompi_request_t *pending;
    int status = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1,
                                       send_first, mode, &pending);

    if (OMPI_SUCCESS == status) {
        ompi_request_wait_completion (pending);
        status = pending->req_status.MPI_ERROR;
        ompi_comm_request_return ((ompi_comm_request_t *) pending);
    }

    return status;
}
/**
 * First step of a CID selection round.
 *
 * Reserves the lowest locally free CID (participants only; processes that
 * are not part of the new communicator contribute 0) and starts an MPI_MAX
 * allreduce over the local candidates. The step re-queues itself while the
 * CID lock is busy or while a request with a lower id is in progress.
 *
 * @return the result of scheduling the next step, or an error code. On
 *         error the locally reserved CID slot is released again and the
 *         CID generator is unlocked for other requests.
 */
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
{
    ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
    int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
    ompi_request_t *subreq;
    bool flag = false;   /* true once a slot has been reserved locally */
    int ret;
    int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    }

    /* a request with a lower id is active: let it finish first */
    if (ompi_comm_cid_lowest_id < my_id) {
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);
        return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    }

    ompi_comm_cid_lowest_id = my_id;

    /**
     * This is the real algorithm described in the doc
     */
    if (participate) {
        /* pml_max_contextid doubles as the "nothing free" marker */
        context->nextlocal_cid = mca_pml.pml_max_contextid;
        for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
            flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
                                                         context->comm);
            if (true == flag) {
                context->nextlocal_cid = i;
                break;
            }
        }
    } else {
        /* neutral element for MPI_MAX */
        context->nextlocal_cid = 0;
    }

    ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
                                 context, &subreq);
    /* there was a failure during non-blocking collective
     * all we can do is abort
     */
    if (OMPI_SUCCESS != ret) {
        goto err_exit;
    }

    if ((unsigned int) context->nextlocal_cid == mca_pml.pml_max_contextid) {
        /* Our local CID space is out, others already aware (allreduce above) */
        /* NOTE(review): the allreduce request started above is not waited
         * on or handed to the schedule on this path. */
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto err_exit;
    }
    OPAL_THREAD_UNLOCK(&ompi_cid_lock);

    /* next we want to verify that the resulting commid is ok */
    return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);

err_exit:
    if (participate && flag) {
        /* Give the reserved slot back. test_and_set_item() only writes to
         * empty slots and therefore cannot be used to clear an entry. */
        opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextlocal_cid, NULL);
    }
    ompi_comm_cid_lowest_id = INT64_MAX;
    OPAL_THREAD_UNLOCK(&ompi_cid_lock);
    return ret;
}
/**
 * Second step of a CID selection round.
 *
 * Checks whether the agreed maximum (nextcid) can be used locally. If it
 * differs from the locally reserved CID, that slot is released and nextcid
 * is reserved instead when it is free. The outcome is combined across
 * processes with an MPI_MIN allreduce. Processes that are not part of the
 * new communicator always vote 1.
 *
 * @return OMPI_SUCCESS when the allreduce was started, otherwise its error
 *         code; on error the slot held by this process is released and the
 *         CID generator is unlocked for other requests.
 */
static int ompi_comm_checkcid (ompi_comm_request_t *request)
{
    ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
    ompi_request_t *subreq;
    int ret;
    int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
    }

    if (!participate) {
        context->flag = 1;
    } else {
        context->flag = (context->nextcid == context->nextlocal_cid);
        if (!context->flag) {
            /* drop our own candidate and try to grab the agreed one */
            opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextlocal_cid, NULL);

            context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators,
                                                                  context->nextcid, context->comm);
        }
    }

    ++context->iter;

    ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq);
    if (OMPI_SUCCESS == ret) {
        ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
    } else {
        if (participate && context->flag) {
            /* When flag is set the slot we hold is nextcid (either it equals
             * nextlocal_cid or it was reserved above). test_and_set_item()
             * cannot clear an occupied slot, so use set_item(). */
            opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
        }
        ompi_comm_cid_lowest_id = INT64_MAX;
    }

    OPAL_THREAD_UNLOCK(&ompi_cid_lock);
    return ret;
}
/**
 * Final step of a CID selection round.
 *
 * If every process voted 1 (rflag == 1) the agreed CID is stored in the new
 * communicator, the communicator is entered into the communicator table and
 * the CID generator is unlocked. Otherwise a slot reserved by this process
 * is released, the search start is moved past the rejected CID and a new
 * round is started by calling ompi_comm_allreduce_getnextcid() directly.
 *
 * Re-queues itself while the CID lock is busy.
 */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
{
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
int participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED);
if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
}
/* global agreement reached */
if (1 == context->rflag) {
if( !participate ) {
/* we need to provide something sane here
* but we cannot use `nextcid` as we may have it
* in-use, go ahead with next locally-available CID
*/
context->nextlocal_cid = mca_pml.pml_max_contextid;
for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
bool flag;
flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
context->comm);
if (true == flag) {
context->nextlocal_cid = i;
break;
}
}
/* NOTE(review): if no slot is free, nextlocal_cid stays at
* pml_max_contextid and is used as the CID below -- confirm this
* cannot happen or is handled by the caller. */
context->nextcid = context->nextlocal_cid;
}
/* set the according values to the newcomm */
context->newcomm->c_contextid = context->nextcid;
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
/* unlock the cid generator */
ompi_comm_cid_lowest_id = INT64_MAX;
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
/* done! */
return OMPI_SUCCESS;
}
if (participate && (1 == context->flag)) {
/* we could use this cid, but other don't agree */
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
context->start = context->nextcid + 1; /* that's where we can start the next round */
}
++context->iter;
/* the lock is dropped here; the next round takes it again itself */
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
/* try again */
return ompi_comm_allreduce_getnextcid (request);
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* This routine serves two purposes:
* - the allreduce acts as a kind of Barrier,
* which avoids, that we have incoming fragments
* on the new communicator before everybody has set
* up the comm structure.
* - some components (e.g. the collective MagPIe component
* might want to generate new communicators and communicate
* using the new comm. Thus, it can just be called after
* the 'barrier'.
*
* The reason that this routine is in comm_cid and not in
* comm.c is, that this file contains the allreduce implementations
* which are required, and thus we avoid having duplicate code...
*/
/* Non-blocking version of ompi_comm_activate */
/* Completion step, run once the activation barrier has finished. */
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
/**
 * Start the non-blocking activation of *newcomm.
 *
 * Processes that are members of the new communicator register it with the
 * PML. Afterwards all callers take part in an allreduce that acts as a
 * barrier; ompi_comm_activate_nb_complete() runs once it has finished.
 *
 * @param newcomm  in/out handle of the communicator being activated; it is
 *                 released and set to MPI_COMM_NULL if the PML registration
 *                 fails
 * @param req      receives the request to wait on
 * @return OMPI_SUCCESS, OMPI_ERR_OUT_OF_RESOURCE, or the error reported by
 *         the PML / the allreduce.
 */
int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
                           ompi_communicator_t *bridgecomm, const void *arg0,
                           const void *arg1, bool send_first, int mode, ompi_request_t **req)
{
    ompi_comm_cid_context_t *context;
    ompi_comm_request_t *request;
    ompi_request_t *subreq;
    int ret = 0;

    context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate",
                                          send_first, mode);
    if (NULL == context) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */
    context->newcommp = newcomm;

    request = ompi_comm_request_get ();
    if (NULL == request) {
        OBJ_RELEASE(context);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* from here on the request carries the context */
    request->context = &context->super;

    if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
        /* Initialize the PML stuff in the newcomm */
        ret = MCA_PML_CALL(add_comm(*newcomm));
        if (OMPI_SUCCESS != ret) {
            /* release the communicator itself, not the caller's handle */
            OBJ_RELEASE(*newcomm);
            /* Hand the request back instead of leaking it with a dangling
             * context pointer. As in the allreduce failure path below,
             * returning the request is relied upon to drop the attached
             * context as well. */
            ompi_comm_request_return (request);
            *newcomm = MPI_COMM_NULL;
            return ret;
        }
        OMPI_COMM_SET_PML_ADDED(*newcomm);
    }

    /* Step 1: the barrier, after which it is allowed to
     * send messages over the new communicator
     */
    ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
                                 &subreq);
    if (OMPI_SUCCESS != ret) {
        ompi_comm_request_return (request);
        return ret;
    }

    ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
    ompi_comm_request_start (request);

    *req = &request->super;

    return OMPI_SUCCESS;
}
/**
 * Blocking activation: start ompi_comm_activate_nb(), wait for the request
 * to complete and report the status stored in it.
 */
int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm,
                        ompi_communicator_t *bridgecomm, const void *arg0,
                        const void *arg1, bool send_first, int mode)
{
    ompi_request_t *pending;
    int status = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1,
                                        send_first, mode, &pending);

    if (OMPI_SUCCESS == status) {
        ompi_request_wait_completion (pending);
        status = pending->req_status.MPI_ERROR;
        ompi_comm_request_return ((ompi_comm_request_t *) pending);
    }

    return status;
}
/**
 * Completion step of the activation, run after the barrier allreduce.
 *
 * Members of the new communicator select the collective module for it; on
 * failure the communicator is released and the caller's handle is set to
 * MPI_COMM_NULL. Non-members return immediately with OMPI_SUCCESS.
 */
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
{
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
int ret;
/**
* Check to see if this process is in the new communicator.
*
* Specifically, this function is invoked by all processes in the
* old communicator, regardless of whether they are in the new
* communicator or not. This is because it is far simpler to use
* MPI collective functions on the old communicator to determine
* some data for the new communicator (e.g., remote_leader) than
* to kludge up our own pseudo-collective routines over just the
* processes in the new communicator. Hence, *all* processes in
* the old communicator need to invoke this function.
*
* That being said, only processes in the new communicator need to
* select a coll module for the new communicator. More
* specifically, processes who are not in the new communicator
* should *not* select a coll module -- for example,
* ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who
* are not in the new communicator. This can cause errors in the
* selection / initialization of a coll module. Plus, it's
* wasteful -- processes in the new communicator will end up
* freeing the new communicator anyway, so we might as well leave
* the coll selection as NULL (the coll base comm unselect code
* handles that case properly).
*/
if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) {
return OMPI_SUCCESS;
}
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) {
OBJ_RELEASE(context->newcomm);
*context->newcommp = MPI_COMM_NULL;
return ret;
}
/* For an inter communicator, we have to deal with the potential
* problem of what is happening if the local_comm that we created
* has a lower CID than the parent comm. This is not a problem
* as long as the user calls MPI_Comm_free on the inter communicator.
* However, if the communicators are not freed by the user but released
* by Open MPI in MPI_Finalize, we walk through the list of still available
* communicators and free them one by one. Thus, local_comm is freed before
* the actual inter-communicator. However, the local_comm pointer in the
* inter communicator will still contain the 'previous' address of the local_comm
* and thus this will lead to a segmentation violation. In order to prevent
* that from happening, we increase the reference counter of local_comm
* by one if its CID is lower than the parent. We cannot increase however
* its reference counter if the CID of local_comm is larger than
* the CID of the inter communicators, since a regular MPI_Comm_free would
* leave in that case the local_comm hanging around and thus we would not
* recycle CID's properly, which was the reason and the cause for this trouble.
*
* NOTE(review): the code below retains context->newcomm, while the text
* above talks about local_comm -- confirm which object is meant.
*/
if (OMPI_COMM_IS_INTER(context->newcomm)) {
if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) {
OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm);
OBJ_RETAIN (context->newcomm);
}
}
/* done */
return OMPI_SUCCESS;
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/**
 * Intra-communicator variant: hand the reduction straight to the
 * non-blocking allreduce of the collective module selected for the
 * context's communicator.
 */
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op,
                                         ompi_comm_cid_context_t *context, ompi_request_t **req)
{
    return context->comm->c_coll->coll_iallreduce (inbuf, outbuf, count, MPI_INT, op,
                                                   context->comm, req,
                                                   context->comm->c_coll->coll_iallreduce_module);
}
/* Non-blocking version of ompi_comm_allreduce_inter */
/* step run by the local root: exchange the local result with the remote root */
static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request);
/* step run by the local root: combine the local and the remote result */
static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request);
/* final step: broadcast the result within the local group */
static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request);
/**
 * Inter-communicator variant of the allreduce.
 *
 * Every process reduces to rank 0 of its local communicator. Rank 0 then
 * continues with the leader exchange, all other ranks go straight to the
 * final broadcast step.
 *
 * @return OMPI_SUCCESS, MPI_ERR_COMM if the context's communicator is not an
 *         inter-communicator, OMPI_ERR_OUT_OF_RESOURCE, or the error of the
 *         local reduce.
 */
static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
                                         int count, struct ompi_op_t *op,
                                         ompi_comm_cid_context_t *cid_context,
                                         ompi_request_t **req)
{
    ompi_communicator_t *intercomm = cid_context->comm;
    ompi_comm_allreduce_context_t *reduce_ctx;
    ompi_comm_request_t *request;
    ompi_request_t *reduce_req;
    bool is_root;
    int rc;

    if (!OMPI_COMM_IS_INTER (cid_context->comm)) {
        return MPI_ERR_COMM;
    }

    request = ompi_comm_request_get ();
    if (OPAL_UNLIKELY(NULL == request)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    reduce_ctx = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
    if (OPAL_UNLIKELY(NULL == reduce_ctx)) {
        ompi_comm_request_return (request);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request->context = &reduce_ctx->super;

    /* only the local root needs a buffer for the locally reduced data */
    is_root = (0 == ompi_comm_rank (intercomm));
    if (is_root) {
        reduce_ctx->tmpbuf = (int *) calloc (count, sizeof(int));
        if (OPAL_UNLIKELY (NULL == reduce_ctx->tmpbuf)) {
            ompi_comm_request_return (request);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    /* Execute the inter-allreduce: the result from the local will be in the
     * buffer of the remote group and vice versa. */
    rc = intercomm->c_local_comm->c_coll->coll_ireduce (inbuf, reduce_ctx->tmpbuf, count, MPI_INT, op, 0,
                                                        intercomm->c_local_comm, &reduce_req,
                                                        intercomm->c_local_comm->c_coll->coll_ireduce_module);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        ompi_comm_request_return (request);
        return rc;
    }

    ompi_comm_request_schedule_append (request,
                                       is_root ? ompi_comm_allreduce_inter_leader_exchange
                                               : ompi_comm_allreduce_inter_bcast,
                                       &reduce_req, 1);

    ompi_comm_request_start (request);
    *req = &request->super;

    return OMPI_SUCCESS;
}
/**
 * Leader step of the inter-communicator allreduce: swap the locally reduced
 * data with rank 0 of the remote group. The remote data lands in outbuf, the
 * local data is sent from tmpbuf; the reduce step runs once both transfers
 * have completed.
 */
static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_communicator_t *intercomm = reduce_ctx->cid_context->comm;
    ompi_request_t *xfer[2];
    int rc;

    /* local leader exchange their data and determine the overall result
       for both groups */
    rc = MCA_PML_CALL(irecv (reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
                             intercomm, &xfer[0]));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    rc = MCA_PML_CALL(isend (reduce_ctx->tmpbuf, reduce_ctx->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
                             MCA_PML_BASE_SEND_STANDARD, intercomm, &xfer[1]));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, xfer, 2);
}
/**
 * Leader step of the inter-communicator allreduce: fold the local result
 * (tmpbuf) into the data received from the remote leader (outbuf), then
 * continue with the local broadcast.
 */
static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;

    ompi_op_reduce (reduce_ctx->op, reduce_ctx->tmpbuf, reduce_ctx->outbuf,
                    reduce_ctx->count, MPI_INT);

    return ompi_comm_allreduce_inter_bcast (request);
}
/**
 * Last step of the inter-communicator allreduce: broadcast outbuf from
 * rank 0 of the local communicator and make the broadcast the final item of
 * the request's schedule.
 */
static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_communicator_t *local_comm = reduce_ctx->cid_context->comm->c_local_comm;
    ompi_request_t *bcast_req;
    int rc;

    /* both roots have the same result. broadcast to the local group */
    rc = local_comm->c_coll->coll_ibcast (reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, 0,
                                          local_comm, &bcast_req,
                                          local_comm->c_coll->coll_ibcast_module);
    if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
        rc = ompi_comm_request_schedule_append (request, NULL, &bcast_req, 1);
    }

    return rc;
}
/**
 * Broadcast outbuf from the local leader over the context's communicator
 * and make the broadcast the final item of the request's schedule. Shared
 * by the bridge and the PMIx allreduce variants.
 */
static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_comm_cid_context_t *cid_ctx = reduce_ctx->cid_context;
    ompi_communicator_t *comm = cid_ctx->comm;
    ompi_request_t *bcast_req;
    int rc;

    rc = comm->c_coll->coll_ibcast (reduce_ctx->outbuf, reduce_ctx->count, MPI_INT,
                                    cid_ctx->local_leader, comm,
                                    &bcast_req, comm->c_coll->coll_ibcast_module);
    if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
        rc = ompi_comm_request_schedule_append (request, NULL, &bcast_req, 1);
    }

    return rc;
}
/**
 * Bridge allreduce, step 3 (local leader only): combine the local result
 * (tmpbuf) with the data received from the remote leader (outbuf) and start
 * the broadcast to the local peers.
 */
static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;

    /* step 3: reduce leader data */
    ompi_op_reduce (reduce_ctx->op, reduce_ctx->tmpbuf, reduce_ctx->outbuf,
                    reduce_ctx->count, MPI_INT);

    /* schedule the broadcast to local peers */
    return ompi_comm_allreduce_bridged_schedule_bcast (request);
}
/**
 * Bridge allreduce, step 2 (local leader only): exchange the locally reduced
 * data with the remote leader over the bridge communicator. The remote data
 * is received into outbuf, the local data is sent from tmpbuf.
 */
static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_comm_cid_context_t *cid_ctx = reduce_ctx->cid_context;
    ompi_communicator_t *bridgecomm = cid_ctx->bridgecomm;
    ompi_request_t *xfer[2];   /* [0] send, [1] receive */
    int rc;

    /* step 2: leader exchange */
    rc = MCA_PML_CALL(irecv (reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, cid_ctx->remote_leader,
                             OMPI_COMM_ALLREDUCE_TAG, bridgecomm, &xfer[1]));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    rc = MCA_PML_CALL(isend (reduce_ctx->tmpbuf, reduce_ctx->count, MPI_INT, cid_ctx->remote_leader,
                             OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm,
                             &xfer[0]));
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, xfer, 2);
}
/**
 * Bridge variant of the allreduce.
 *
 * Step 1 reduces to the local leader. The leader then exchanges its result
 * with the remote leader over the bridge communicator and combines both;
 * finally the result is broadcast from the local leader. Non-leaders
 * schedule the broadcast right behind the reduce.
 *
 * @return OMPI_SUCCESS, OMPI_ERR_OUT_OF_RESOURCE, or the error reported by
 *         the reduce, the broadcast or the scheduling calls.
 */
static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf,
                                                int count, struct ompi_op_t *op,
                                                ompi_comm_cid_context_t *cid_context,
                                                ompi_request_t **req)
{
    ompi_communicator_t *comm = cid_context->comm;
    ompi_comm_allreduce_context_t *context;
    int local_rank = ompi_comm_rank (comm);
    ompi_comm_request_t *request;
    ompi_request_t *subreq;
    int rc;

    context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
    if (OPAL_UNLIKELY(NULL == context)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* only the local leader needs a buffer for the reduced data */
    if (local_rank == cid_context->local_leader) {
        context->tmpbuf = (int *) calloc (count, sizeof (int));
        if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
            OBJ_RELEASE(context);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    request = ompi_comm_request_get ();
    if (OPAL_UNLIKELY(NULL == request)) {
        OBJ_RELEASE(context);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request->context = &context->super;

    if (cid_context->local_leader == local_rank) {
        memcpy (context->tmpbuf, inbuf, count * sizeof (int));
    }

    /* step 1: reduce to the local leader */
    rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
                                     cid_context->local_leader, comm, &subreq,
                                     comm->c_coll->coll_ireduce_module);
    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    if (cid_context->local_leader == local_rank) {
        rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete,
                                                &subreq, 1);
    } else {
        /* go ahead and schedule the broadcast; do not continue if the
         * reduce could not be put on the schedule */
        rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
        if (OMPI_SUCCESS == rc) {
            rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
        }
    }

    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    ompi_comm_request_start (request);
    *req = &request->super;

    return OMPI_SUCCESS;
}
/**
 * PMIx allreduce, leader step: publish the locally reduced data under this
 * side's key, fetch the peer's data under the opposite key, combine both
 * and start the broadcast to the local peers.
 *
 * The keys have the form "<port>:<tag>:send|recv:<iter>"; send_first
 * decides which of the two names this side publishes.
 *
 * @return the result of scheduling the broadcast, OMPI_ERR_OUT_OF_RESOURCE
 *         if a key could not be built, or the error reported by
 *         pack / exchange / unpack.
 */
static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context;
    ompi_comm_cid_context_t *cid_context = context->cid_context;
    int32_t size_count = context->count;
    opal_value_t info;
    opal_pmix_pdata_t pdat;
    opal_buffer_t sbuf;
    int rc;
    int bytes_written;
    const int output_id = 0;
    const int verbosity_level = 1;

    OBJ_CONSTRUCT(&sbuf, opal_buffer_t);

    if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) {
        OBJ_DESTRUCT(&sbuf);
        opal_output_verbose (verbosity_level, output_id, "pack failed. rc %d\n", rc);
        return rc;
    }

    OBJ_CONSTRUCT(&info, opal_value_t);
    OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);

    info.type = OPAL_BYTE_OBJECT;
    pdat.value.type = OPAL_BYTE_OBJECT;

    /* the packed bytes now belong to info */
    opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size);
    OBJ_DESTRUCT(&sbuf);

    bytes_written = asprintf(&info.key,
                             cid_context->send_first ? "%s:%s:send:%d"
                                                     : "%s:%s:recv:%d",
                             cid_context->port_string,
                             cid_context->pmix_tag,
                             cid_context->iter);

    if (bytes_written == -1) {
        /* the pointer is unspecified after a failed asprintf */
        info.key = NULL;
        opal_output_verbose (verbosity_level, output_id, "writing info.key failed\n");
    } else {
        bytes_written = asprintf(&pdat.value.key,
                                 cid_context->send_first ? "%s:%s:recv:%d"
                                                         : "%s:%s:send:%d",
                                 cid_context->port_string,
                                 cid_context->pmix_tag,
                                 cid_context->iter);

        if (bytes_written == -1) {
            pdat.value.key = NULL;
            opal_output_verbose (verbosity_level, output_id, "writing pdat.value.key failed\n");
        }
    }

    if (bytes_written == -1) {
        // write with separate calls,
        // just in case the args are the cause of failure
        opal_output_verbose (verbosity_level, output_id, "send first: %d\n", cid_context->send_first);
        opal_output_verbose (verbosity_level, output_id, "port string: %s\n", cid_context->port_string);
        opal_output_verbose (verbosity_level, output_id, "pmix tag: %s\n", cid_context->pmix_tag);
        opal_output_verbose (verbosity_level, output_id, "iter: %d\n", cid_context->iter);
        /* do not leak the unloaded payload and whichever key was built */
        OBJ_DESTRUCT(&info);
        OBJ_DESTRUCT(&pdat);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* this macro is not actually non-blocking. if a non-blocking version becomes available this function
     * needs to be reworked to take advantage of it. */
    OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 600); // give them 10 minutes
    OBJ_DESTRUCT(&info);
    if (OPAL_SUCCESS != rc) {
        OBJ_DESTRUCT(&pdat);
        return rc;
    }

    /* move the received bytes into a buffer; clear them in pdat so they are
     * not released together with it */
    OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
    opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
    pdat.value.data.bo.bytes = NULL;
    pdat.value.data.bo.size = 0;
    OBJ_DESTRUCT(&pdat);

    rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT);
    OBJ_DESTRUCT(&sbuf);
    if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
        return rc;
    }

    ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT);

    return ompi_comm_allreduce_bridged_schedule_bcast (request);
}
/**
 * PMIx variant of the allreduce.
 *
 * Reduces to the local leader, which then exchanges its result through PMIx
 * (see ompi_comm_allreduce_pmix_reduce_complete). The result is broadcast
 * from the local leader with the same helper the bridge variant uses;
 * non-leaders schedule that broadcast right behind the reduce.
 *
 * @return OMPI_SUCCESS, OMPI_ERR_OUT_OF_RESOURCE, or the error reported by
 *         the reduce, the broadcast or the scheduling calls.
 */
static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf,
                                              int count, struct ompi_op_t *op,
                                              ompi_comm_cid_context_t *cid_context,
                                              ompi_request_t **req)
{
    ompi_communicator_t *comm = cid_context->comm;
    ompi_comm_allreduce_context_t *context;
    int local_rank = ompi_comm_rank (comm);
    ompi_comm_request_t *request;
    ompi_request_t *subreq;
    int rc;

    context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
    if (OPAL_UNLIKELY(NULL == context)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* only the local leader needs a buffer for the reduced data */
    if (cid_context->local_leader == local_rank) {
        context->tmpbuf = (int *) calloc (count, sizeof(int));
        if (OPAL_UNLIKELY(NULL == context->tmpbuf)) {
            OBJ_RELEASE(context);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }

    request = ompi_comm_request_get ();
    if (NULL == request) {
        OBJ_RELEASE(context);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request->context = &context->super;

    /* comm is an intra-communicator */
    rc = comm->c_coll->coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op,
                                     cid_context->local_leader, comm,
                                     &subreq, comm->c_coll->coll_ireduce_module);
    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    if (cid_context->local_leader == local_rank) {
        rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete,
                                                &subreq, 1);
    } else {
        /* go ahead and schedule the broadcast; do not let the result of the
         * broadcast step hide a failure to schedule the reduce */
        rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1);
        if (OMPI_SUCCESS == rc) {
            /* use the same function as bridged to schedule the broadcast */
            rc = ompi_comm_allreduce_bridged_schedule_bcast (request);
        }
    }

    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    ompi_comm_request_start (request);
    *req = (ompi_request_t *) request;

    return OMPI_SUCCESS;
}
/**
 * Group allreduce, downward phase: forward the final result in outbuf to
 * each existing child and make those sends the last item of the request's
 * schedule.
 */
static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_comm_cid_context_t *cid_ctx = reduce_ctx->cid_context;
    ompi_request_t *sends[2];
    int nsends = 0;

    /* peers_comm[1] and peers_comm[2] are the children */
    for (int child = 1 ; child <= 2 ; ++child) {
        int rc;

        if (MPI_PROC_NULL == reduce_ctx->peers_comm[child]) {
            continue;
        }

        rc = MCA_PML_CALL(isend(reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, reduce_ctx->peers_comm[child],
                                cid_ctx->pml_tag, MCA_PML_BASE_SEND_STANDARD,
                                cid_ctx->comm, sends + nsends++));
        if (OMPI_SUCCESS != rc) {
            return rc;
        }
    }

    return ompi_comm_request_schedule_append (request, NULL, sends, nsends);
}
/**
 * Group allreduce, upward phase: fold the contributions received from the
 * children into outbuf. The root then starts the downward phase at once;
 * every other process sends its partial result to the parent, posts the
 * receive for the final result and continues with the downward phase once
 * both have completed.
 */
static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request)
{
    ompi_comm_allreduce_context_t *reduce_ctx = (ompi_comm_allreduce_context_t *) request->context;
    ompi_comm_cid_context_t *cid_ctx = reduce_ctx->cid_context;
    int *incoming = reduce_ctx->tmpbuf;
    ompi_request_t *xfer[2];
    int rc;

    /* the children's data was stored back to back in tmpbuf */
    for (int child = 1 ; child <= 2 ; ++child) {
        if (MPI_PROC_NULL == reduce_ctx->peers_comm[child]) {
            continue;
        }
        ompi_op_reduce (reduce_ctx->op, incoming, reduce_ctx->outbuf, reduce_ctx->count, MPI_INT);
        incoming += reduce_ctx->count;
    }

    if (MPI_PROC_NULL == reduce_ctx->peers_comm[0]) {
        /* root */
        return ompi_comm_allreduce_group_broadcast (request);
    }

    /* interior node */
    rc = MCA_PML_CALL(isend(reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, reduce_ctx->peers_comm[0],
                            cid_ctx->pml_tag, MCA_PML_BASE_SEND_STANDARD,
                            cid_ctx->comm, &xfer[0]));
    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    rc = MCA_PML_CALL(irecv(reduce_ctx->outbuf, reduce_ctx->count, MPI_INT, reduce_ctx->peers_comm[0],
                            cid_ctx->pml_tag, cid_ctx->comm, &xfer[1]));
    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, xfer, 2);
}
/**
 * Group variant of the allreduce, built from point-to-point messages on the
 * parent communicator among the members of the new communicator's local
 * group.
 *
 * The members form a binary tree by group rank: the parent of rank r is
 * (r - 1) / 2 and its children are 2r + 1 and 2r + 2. Data is reduced up
 * the tree (ompi_comm_allreduce_group_recv_complete) and the result is sent
 * back down (ompi_comm_allreduce_group_broadcast).
 *
 * @return OMPI_SUCCESS, OMPI_ERR_OUT_OF_RESOURCE, or the error of a receive
 *         that could not be posted.
 */
static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count,
                                         struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                         ompi_request_t **req)
{
    ompi_group_t *newgroup = cid_context->newcomm->c_local_group;
    const int nprocs = ompi_group_size (newgroup);
    const int me = ompi_group_rank (newgroup);
    ompi_communicator_t *parent_comm = cid_context->comm;
    ompi_comm_allreduce_context_t *context;
    ompi_comm_request_t *request;
    ompi_request_t *recvs[3];
    int tree_peers[3];
    int nrecvs = 0;
    int *slot;

    context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context);
    if (NULL == context) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* landing area for the children's contributions */
    slot = context->tmpbuf = calloc (sizeof (int), count * 3);
    if (NULL == context->tmpbuf) {
        OBJ_RELEASE(context);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request = ompi_comm_request_get ();
    if (NULL == request) {
        OBJ_RELEASE(context);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request->context = &context->super;

    /* tree neighbours in group ranks: [0] parent, [1] and [2] children */
    tree_peers[0] = (0 != me) ? ((me - 1) >> 1) : MPI_PROC_NULL;
    tree_peers[1] = (me * 2 + 1) < nprocs ? me * 2 + 1 : MPI_PROC_NULL;
    tree_peers[2] = (me * 2 + 2) < nprocs ? me * 2 + 2 : MPI_PROC_NULL;

    /* translate the ranks into the ranks of the parent communicator */
    ompi_group_translate_ranks (newgroup, 3, tree_peers, parent_comm->c_local_group, context->peers_comm);

    /* start from the local contribution ... */
    memmove (outbuf, inbuf, sizeof (int) * count);

    /* ... and post one receive per existing child */
    for (int child = 1 ; child <= 2 ; ++child) {
        int rc;

        if (MPI_PROC_NULL == context->peers_comm[child]) {
            continue;
        }

        rc = MCA_PML_CALL(irecv(slot, count, MPI_INT, context->peers_comm[child],
                                cid_context->pml_tag, parent_comm, recvs + nrecvs++));
        if (OMPI_SUCCESS != rc) {
            ompi_comm_request_return (request);
            return rc;
        }

        slot += count;
    }

    ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, recvs, nrecvs);

    ompi_comm_request_start (request);
    *req = &request->super;

    return OMPI_SUCCESS;
}