1
1
openmpi/ompi/mca/bcol/iboffload/bcol_iboffload_allreduce.c

1416 строки
54 KiB
C
Исходник Обычный вид История

coll/ml: add support for blocking and non-blocking allreduce, reduce, and allgather. The new collectives provide a signifigant performance increase over tuned for small and medium messages. We are initially setting the priority lower than tuned until this has had some time to soak in the trunk. Please set coll_ml_priority to 90 for MTT runs. Credit for this work goes to Manjunath Gorentla Venkata (ORNL), Pavel Shamis (ORNL), and Nathan Hjelm (LANL). Commit details (for reference): Import ORNL's collectives for MPI_Allreduce, MPI_Reduce, and MPI_Allgather. We need to take the basesmuma header into account when calculating the ptpcoll small message thresholds. Add a define to bcol.h indicating the maximum header size so we can take the header into account while not making ptpcoll dependent on information from basesmuma. This resolves an issue with allreduce where ptpcoll overwrites the header of the next buffer in the basesmuma bank. Fix reduce and make a sequential collective launcher in coll_ml_inlines.h The root calculation for reduce was wrong for any root != 0. There are four possibilities for the root: - The root is not the current process but is in the current hierarchy. In this case the root is the index of the global root as specified in the root vector. - The root is not the current process and is not in the next level of the hierarchy. In this case 0 must be the local root since this process will never communicate with the real root. - The root is not the current process but will be in next level of the hierarchy. In this case the current process must be the root. - I am the root. The root is my index. Tested with IMB which rotates the root on every call to MPI_Reduce. Consider IMB the reproducer for the issue this commit solves. Make the bcast algorithm decision an enumerated variable Resolve various asset failures when destructing coll ml requests. Two issues: - Always reset the request to be invalid before returning it to the free list. This will avoid an asset in ompi_request_t's destructor. OMPI_REQUEST_FINI does this (and also releases the fortran handle index). - Never explicitly construct or destruct the superclass of an opal object. This screws up the class function tables and will cause either an assert failure or a segmentation fault when destructing coll ml requests. Cleanup allgather. I removed the duplicate non-blocking and blocking functions and modeled the cleanup after what I found in allreduce. Also cleaned up the code somewhat. Don't bother copying from the send to the recieve buffer in bcol_basesmuma_allreduce_intra_fanin_fanout if the pointers are the same. The eliminates a warning about memcpy and aliasing and avoids an unnecessary call to memcpy. Alwasy call CHECK_AND_RELEASE on memsync collectives. There was a call to OBJ_RELEASE on the collective communicator but because CHECK_AND_RECYLCE was never called there was not matching call to OBJ_RELEASE. This caused coll ml to leak communicators. Make allreduce use the sequential collective launcher in coll_ml_inlines.h Just launch the next collective in the component progress. I am a little unsure about this patch. There appears to be some sort of race between collectives that causes buffer exhaustion in some cases (IMB Allreduce is a reproducer). Changing progress to only launch the next bcol seems to resolve the issue but might not be the best fix. Note that I see little-no performance penalty for this change. Fix allreduce when there are extra sources. There was an issue with the buffer offset calculation when there are extra sources. In the case of extra sources == 1 the offset was set to buffer_size (just past the header of the next buffer). I adjusted the buffer size to take into accoun the maximum header size (see the earlier commit that added this) and simplified the offset calculation. Make reduce/allreduce non-blocking. This is required for MPI_Comm_idup to work correctly. This has been tested with various layouts using the ibm testsuite and imb and appears to have the same performance as the old blocking version. Fix allgather for non-contiguous layouts and simplify parsing the topology. Some things in this patch: - There were several comments to the effect that level 0 of the hierarchy MUST contain all of the ranks. At least one function made this assumption but it was not true. I changed the sbgp components and the coll ml initization code to enforce this requirement. - Ensure that hierarchy level 0 has the ranks in the correct scatter gather order. This removes the need for a separate sort list and fixes the offset calculation for allgather. - There were several passes over the hierarchy to determine properties of the hierarchy. I eliminated these extra passes and the memory allocation associated with them and calculate the tree properties on the fly. The same DFS recursion also handles the re-order of level 0. All these changes have been verified with MPI_Allreduce, MPI_Reduce, and MPI_Allgather. All functions now pass all IBM/Open MPI, and IMB tests. coll/ml: correct pointer usage for MPI_BOTTOM Since contiguous datatypes are copied via memcpy (bypassing the convertor) we need to adjust for the lb of the datatype. This corrects problems found testing code that uses MPI_BOTTOM (NULL) as the send pointer. Add fallback collectives for allreduce and reduce. cmr=v1.7.5:reviewer=pasha This commit was SVN r30363.
2014-01-22 19:39:19 +04:00
/*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <inttypes.h>
#include "bcol_iboffload.h"
#include "bcol_iboffload_frag.h"
#include "bcol_iboffload_task.h"
#include "bcol_iboffload_collfrag.h"
#include "bcol_iboffload_endpoint.h"
#include "opal/include/opal/types.h"
static int mca_bcol_iboffload_calc_res_to_user(void *callback_data)
{
int rc;
uint64_t result = 0;
uint64_t l_operand = 0;
uint64_t r_operand = 0;
mca_bcol_iboffload_collfrag_t *coll_frag =
(mca_bcol_iboffload_collfrag_t *) callback_data;
mca_bcol_iboffload_collreq_t *coll_request = coll_frag->coll_full_req;
ompi_op_t *op = coll_request->op;
ompi_datatype_t *dtype = coll_request->dtype;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
struct ibv_context *ib_dev_context = coll_request->module->device->dev.ib_dev_context;
IBOFFLOAD_VERBOSE(10, ("Start calculating.\n"));
rc = unpack_data_from_calc(ib_dev_context,
cm->map_ompi_to_ib_calcs[op->op_type],
cm->map_ompi_to_ib_dt[dtype->id], false,
(void *) (uintptr_t) coll_request->l_operand,
NULL, (void *) &l_operand);
if (0 != rc) {
IBOFFLOAD_VERBOSE(10, ("unpack_data_from_calc for l_operand failed: op %s, type %s\n",
op->o_name, dtype->name));
return OMPI_ERROR;
}
rc = unpack_data_from_calc(ib_dev_context,
cm->map_ompi_to_ib_calcs[op->op_type],
cm->map_ompi_to_ib_dt[dtype->id], false,
(void *) (uintptr_t) coll_request->r_operand,
NULL, (void *) &r_operand);
if (0 != rc) {
IBOFFLOAD_VERBOSE(10, ("unpack_data_from_calc for r_operand failed: op %s, type %s\n",
op->o_name, dtype->name));
return OMPI_ERROR;
}
switch (op->op_type) {
case OMPI_OP_PROD:
break; /* ronni todo - ????? */
case OMPI_OP_LAND:
result = l_operand && r_operand;
break;
case OMPI_OP_BAND:
result = l_operand & r_operand;
break;
case OMPI_OP_LOR:
result = l_operand || r_operand;
break;
case OMPI_OP_BOR:
result = l_operand | r_operand;
break;
case OMPI_OP_LXOR:
result = ((l_operand && !r_operand) || (!l_operand && r_operand));
break;
case OMPI_OP_BXOR:
result = l_operand ^ r_operand;
break;
case OMPI_OP_MAXLOC:
case OMPI_OP_MINLOC:
break;
case OMPI_OP_MAX:
case OMPI_OP_MIN:
case OMPI_OP_SUM:
switch (cm->map_ompi_to_ib_dt[dtype->id]) {
case IBV_M_DATA_TYPE_INT8:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, char, l_operand, r_operand, result);
break;
case IBV_M_DATA_TYPE_INT16:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, int16_t, l_operand, r_operand, result);
break;
case IBV_M_DATA_TYPE_INT32:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, int32_t, l_operand, r_operand, result);
break;
case IBV_M_DATA_TYPE_INT64:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, int64_t, l_operand, r_operand, result);
break;
case IBV_M_DATA_TYPE_FLOAT32:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, float, l_operand, r_operand, result);
break;
case IBV_M_DATA_TYPE_FLOAT64:
MCA_BCOL_IBOFFLOAD_ALLREDUCE_DO_CALC(coll_request->op->op_type, double, l_operand, r_operand, result);
break;
default:
IBOFFLOAD_VERBOSE(10, ("Unsupported data type: %s.\n", dtype->name));
return OMPI_ERROR;
}
break;
default:
IBOFFLOAD_VERBOSE(10, ("Unsupported op: %s.\n", coll_request->op->o_name));
return OMPI_ERROR;
}
memcpy(coll_request->buffer_info[RBUF].buf, &result, coll_frag->unpack_size);
IBOFFLOAD_VERBOSE(10, ("The output data after calc is %lf, result %lf, l_operand %lf, r_operand %lf: "
"sbuf addr %p, rbuf addr %p.\n",
*(double *) coll_request->buffer_info[RBUF].buf, *(double *) &result,
*(double *) &l_operand, *(double *) &r_operand,
coll_request->buffer_info[SBUF].buf,
coll_request->buffer_info[RBUF].buf));
return OMPI_SUCCESS;
}
static int mca_bcol_iboffload_unpack_res_to_user(void *callback_data)
{
int rc;
mca_bcol_iboffload_collfrag_t *coll_frag =
(mca_bcol_iboffload_collfrag_t *) callback_data;
mca_bcol_iboffload_collreq_t *coll_request = coll_frag->coll_full_req;
mca_bcol_iboffload_task_t *task = (mca_bcol_iboffload_task_t *) coll_frag->signal_task_wr_id;
mca_bcol_iboffload_frag_t *recv_frag = task->frag;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
struct ibv_context *ib_dev_context = coll_request->module->device->dev.ib_dev_context;
rc = unpack_data_from_calc(ib_dev_context,
cm->map_ompi_to_ib_calcs[coll_request->op->op_type],
cm->map_ompi_to_ib_dt[coll_request->dtype->id],
false, (void*) (uintptr_t) recv_frag->sg_entry.addr,
NULL, coll_request->buffer_info[RBUF].buf);
if (0 != rc) {
IBOFFLOAD_VERBOSE(10, ("unpack_data_from_calc is failed: op %s, type %s\n",
coll_request->op->o_name, coll_request->dtype->name));
return OMPI_ERROR;
}
IBOFFLOAD_VERBOSE(10, ("The naitive output data is %" PRId64 ".\n"
"The output data is %" PRId64 ".\n",
*(uint64_t *) recv_frag->sg_entry.addr,
*(uint64_t *) coll_request->buffer_info[RBUF].buf));
return OMPI_SUCCESS;
}
static int
allreduce_extra_node(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request)
/* (EXTRA_NODE == my_exchange_node->node_type) */
{
/* local variables */
int rc, extra_rank;
mca_bcol_iboffload_frag_t *send_fragment,
*preposted_recv_frag;
mca_bcol_iboffload_task_t *send_task,
*wait_task;
struct mqe_task *last_wait, /* we need ask from completion on last wait */
*last_send;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
struct mqe_task **mqe_ptr_to_set;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
mqe_ptr_to_set = &coll_fragment->to_post;
if (OPAL_UNLIKELY(false == BCOL_IBOFFLOAD_MQ_HAVE_CREDITS(
iboffload, coll_fragment->mq_index, coll_fragment->mq_credits))) {
IBOFFLOAD_VERBOSE(10, ("There are not enough credits on MQ.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* I will NOT participate in the exchange - so just "register" as here */
extra_rank = my_exchange_node->rank_extra_source;
send_fragment = mca_bcol_iboffload_get_send_frag(coll_request,
extra_rank, coll_request->qp_index,
MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE, 0,
SBUF,
MCA_BCOL_IBOFFLOAD_SEND_FRAG_ML_CALC);
if (OPAL_UNLIKELY(NULL == send_fragment)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting and packing send frag.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* send my operand to EXCHANGE NODE */
send_task = mca_bcol_iboffload_get_send_task(iboffload, extra_rank,
coll_request->qp_index, send_fragment, coll_fragment, INLINE);
if (OPAL_UNLIKELY(NULL == send_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting send task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, send_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, send_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, extra_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for final result from EXCHANGE NODE */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, extra_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
*mqe_ptr_to_set = NULL;
/* finish initializing full message descriptor */
coll_request->n_fragments = 1;
coll_request->n_frags_sent = 1;
/* Pasha: need to set to true in upper layer */
coll_request->user_handle_freed = false;
last_wait->flags |= MQE_WR_FLAG_SIGNAL;
coll_fragment->signal_task_wr_id = last_wait->wr_id;
last_wait->wr_id = (uint64_t) (uintptr_t) coll_fragment;
/* post the mwr */
IBOFFLOAD_VERBOSE(10, ("Post tasks.\n"));
rc = mca_bcol_iboffload_post_mqe_tasks(iboffload, coll_fragment->to_post);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_ERROR(("MQE task posting failing.\n"));
/* Note: need to clean up */
return rc;
}
MCA_BCOL_UPDATE_ORDER_COUNTER(&iboffload->super, coll_request->order_info);
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Allreduce: adding collfrag to collfrag_pending.\n"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
/**
* Start allreduce
*/
static int do_exchange(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request,
struct mqe_task ***mqe_ptr_to_set,
struct mqe_task **last_wait,
struct ibv_sge **l_operand,
struct ibv_sge **r_operand)
{
int rc = OMPI_SUCCESS, exchange, pair_rank,
my_rank = ((mca_sbgp_base_module_t *) iboffload->ibnet)->my_index;
mca_bcol_iboffload_frag_t *preposted_recv_frag;
mca_bcol_iboffload_task_t *wait_task,
*calc_task;
struct mqe_task *last_send;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
size_t calc_size = MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE + MCA_IBOFFLOAD_CALC_SIZE_EXT;
pair_rank = my_exchange_node->rank_exchanges[0];
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, pair_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for send from first algorithm partner */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, pair_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
(*l_operand)->length = calc_size;
for (exchange = 1; exchange < my_exchange_node->n_exchanges; ++exchange) {
pair_rank = my_exchange_node->rank_exchanges[exchange];
(*r_operand) = &preposted_recv_frag->sg_entry;
(*r_operand)->length = calc_size;
/* Calc and send the result to the partner */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
pair_rank, coll_request->qp_index, NULL,
*l_operand, *r_operand,
coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
/* Calc and send the result to myself */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
*l_operand, *r_operand, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (NULL == wait_task) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
(*l_operand) = &preposted_recv_frag->sg_entry;
(*l_operand)->length = calc_size;
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, pair_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from the current algorithm partner */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, pair_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
}
(*r_operand) = &preposted_recv_frag->sg_entry;
(*r_operand)->length = calc_size;
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Adding collfrag to collfrag_pending"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
/* Power of 2 case */
static int
pure_recursive_doubling(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request)
{
/* local variables */
int rc = OMPI_SUCCESS, pair_rank,
my_rank = ((mca_sbgp_base_module_t *) iboffload->ibnet)->my_index;
struct mqe_task *last_send,
*last_wait;
mca_bcol_iboffload_task_t *send_task,
*wait_task,
*calc_task;
mca_bcol_iboffload_frag_t *send_fragment,
*preposted_recv_frag;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
struct ibv_sge *r_operand = NULL,
*l_operand = NULL;
struct mqe_task **mqe_ptr_to_set;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
mqe_ptr_to_set = &coll_fragment->to_post;
if (OPAL_UNLIKELY(false == BCOL_IBOFFLOAD_MQ_HAVE_CREDITS(
iboffload, coll_fragment->mq_index, coll_fragment->mq_credits))) {
IBOFFLOAD_VERBOSE(10, ("There are not enough credits on MQ.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
IBOFFLOAD_VERBOSE(10, ("Allreduce starting: type %d op %d, "
"n_extra_sources - %d.\n", cm->map_ompi_to_ib_dt[coll_request->dtype->id],
cm->map_ompi_to_ib_calcs[coll_request->op->op_type],
my_exchange_node->n_extra_sources));
pair_rank = my_exchange_node->rank_exchanges[0];
send_fragment = mca_bcol_iboffload_get_send_frag(coll_request,
pair_rank, coll_request->qp_index,
(MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE + MCA_IBOFFLOAD_CALC_SIZE_EXT), 0,
SBUF,
MCA_BCOL_IBOFFLOAD_SEND_FRAG_ML_CALC);
if (OPAL_UNLIKELY(NULL == send_fragment)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting and packing send frag.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Vasily: NO_INLINE ????? */
/* send my operand to the first algorithm partner */
send_task = mca_bcol_iboffload_get_send_task(iboffload, pair_rank,
coll_request->qp_index, send_fragment, coll_fragment, NO_INLINE);
if (OPAL_UNLIKELY(NULL == send_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting send task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, send_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, send_task);
l_operand = &send_fragment->sg_entry;
/* Recursive-doubling exchange */
rc = do_exchange(iboffload, coll_request, &mqe_ptr_to_set,
&last_wait, &l_operand, &r_operand);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
if (false == coll_request->do_calc_in_cpu) {
/* Calc and send the result to myself */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
l_operand,
r_operand, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
} else {
coll_request->l_operand = l_operand->addr;
coll_request->r_operand = r_operand->addr;
}
*mqe_ptr_to_set = NULL;
/* Vasily: TODO with MACRO */
/* finish initializing full message descriptor */
coll_request->n_fragments = 1;
coll_request->n_frags_sent = 1;
/* Pasha: need to set to true in upper layer */
coll_request->user_handle_freed = false;
last_wait->flags |= MQE_WR_FLAG_SIGNAL;
coll_fragment->signal_task_wr_id = last_wait->wr_id;
last_wait->wr_id = (uint64_t) (uintptr_t) coll_fragment;
/* post the mwr */
IBOFFLOAD_VERBOSE(10, ("Post tasks.\n"));
rc = mca_bcol_iboffload_post_mqe_tasks(iboffload, coll_fragment->to_post);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_ERROR(("MQE task posting failing.\n"));
/* Note: need to clean up */
return rc;
}
MCA_BCOL_UPDATE_ORDER_COUNTER(&iboffload->super, coll_request->order_info);
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Adding collfrag to collfrag_pending"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
static int rdma_do_exchange(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request,
struct mqe_task ***mqe_ptr_to_set,
struct mqe_task **last_wait,
struct ibv_sge **l_operand,
struct ibv_sge **r_operand)
{
int rc = OMPI_SUCCESS, exchange, pair_rank,
my_rank = ((mca_sbgp_base_module_t *) iboffload->ibnet)->my_index;
mca_bcol_iboffload_frag_t *preposted_recv_frag;
mca_bcol_iboffload_task_t *wait_task,
*calc_task;
struct mqe_task *last_send;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
const size_t calc_size = MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE + MCA_IBOFFLOAD_CALC_SIZE_EXT;
size_t remote_offset = calc_size;
size_t self_offset = 0;
pair_rank = my_exchange_node->rank_exchanges[0];
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, pair_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for send from first algorithm partner */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, pair_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
(*l_operand)->length = 2 * calc_size ;
for (exchange = 1; exchange < my_exchange_node->n_exchanges; ++exchange) {
pair_rank = my_exchange_node->rank_exchanges[exchange];
/* Pasha: Not used
(*r_operand) = &preposted_recv_frag->sg_entry;
(*r_operand)->length = calc_size;
*/
remote_offset += 2 * calc_size;
self_offset += 2 * calc_size;
/* Calc and send the result to the partner */
/*
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
pair_rank, coll_request->qp_index, NULL,
*l_operand, *r_operand,
coll_request, NO_INLINE);
*/
calc_task = mca_bcol_iboffload_get_rdma_calc_task(iboffload,
pair_rank, coll_request->qp_index, NULL,
*l_operand, NULL,
coll_request, remote_offset);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
/* Calc and send the result to myself */
/*
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
*l_operand, NULL,
coll_request, NO_INLINE);
*/
calc_task = mca_bcol_iboffload_get_rdma_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
*l_operand, NULL,
coll_request, self_offset);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (NULL == wait_task) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
/*
(*l_operand) = &preposted_recv_frag->sg_entry;
*/
/* (*l_operand)->length = 2 * calc_size; */
(*l_operand)->addr = (uint64_t) (uintptr_t) ((unsigned char *) (*l_operand)->addr + 2 * calc_size);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, pair_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
IBOFFLOAD_VERBOSE(10, ("Get prepost recv fag fail.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from the current algorithm partner */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, pair_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST((*mqe_ptr_to_set), wait_task, (*last_wait));
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
}
/* Pasha: not used
(*r_operand) = &preposted_recv_frag->sg_entry;
(*r_operand)->length = calc_size;
*/
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Adding collfrag to collfrag_pending"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
#define ALLREDUCE_BASE_OFFSET (MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE + MCA_IBOFFLOAD_CALC_SIZE_EXT)
/* RDMA Recursive doubling + cache friendly version */
static int
rdma_pure_recursive_doubling(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request)
{
/* local variables */
int rc = OMPI_SUCCESS, pair_rank,
my_rank = ((mca_sbgp_base_module_t *) iboffload->ibnet)->my_index;
struct mqe_task *last_send,
*last_wait;
mca_bcol_iboffload_task_t *send_task,
*wait_task,
*calc_task;
mca_bcol_iboffload_frag_t *send_fragment,
*preposted_recv_frag;
struct ibv_sge operand;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
struct ibv_sge *r_operand = NULL,
*l_operand = NULL;
struct mqe_task **mqe_ptr_to_set;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
mqe_ptr_to_set = &coll_fragment->to_post;
if (OPAL_UNLIKELY(false == BCOL_IBOFFLOAD_MQ_HAVE_CREDITS(
iboffload, coll_fragment->mq_index, coll_fragment->mq_credits))) {
IBOFFLOAD_VERBOSE(10, ("There are not enough credits on MQ.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
IBOFFLOAD_VERBOSE(10, ("Allreduce starting: type %d op %d, "
"n_extra_sources - %d.\n", cm->map_ompi_to_ib_dt[coll_request->dtype->id],
cm->map_ompi_to_ib_calcs[coll_request->op->op_type],
my_exchange_node->n_extra_sources));
pair_rank = my_exchange_node->rank_exchanges[0];
send_fragment = mca_bcol_iboffload_get_send_frag(coll_request,
pair_rank, coll_request->qp_index,
(MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE + MCA_IBOFFLOAD_CALC_SIZE_EXT),
0,
SBUF,
MCA_BCOL_IBOFFLOAD_SEND_FRAG_ML_CALC);
if (OPAL_UNLIKELY(NULL == send_fragment)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting and packing send frag.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Vasily: NO_INLINE ????? */
/* send my operand to the first algorithm partner */
/* send_task = mca_bcol_iboffload_get_send_task(iboffload, pair_rank,
coll_request->qp_index, send_fragment, coll_fragment, NO_INLINE); */
send_task = mca_bcol_iboffload_get_rdma_task(
pair_rank, ALLREDUCE_BASE_OFFSET,
send_fragment, iboffload, coll_fragment);
if (OPAL_UNLIKELY(NULL == send_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting send task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Pasha: ugly but faster, set inline on first send */
SENDWR(send_task)->send_flags |= IBV_SEND_INLINE;
APPEND_TO_TASKLIST(mqe_ptr_to_set, send_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, send_task);
/* l_operand = &send_fragment->sg_entry; */
operand = send_fragment->sg_entry;
l_operand = &operand;
/* Recursive-doubling exchange */
rc = rdma_do_exchange(iboffload, coll_request, &mqe_ptr_to_set,
&last_wait, &l_operand, &r_operand);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
/* Pasha: This flow is broken, print error */
if (false == coll_request->do_calc_in_cpu) {
ML_ERROR(("Calc in CPU must be enabled !!!"));
/* Calc and send the result to myself */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
l_operand,
r_operand, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
} else {
coll_request->l_operand = (uint64_t) (uintptr_t)
((unsigned char *)l_operand->addr);
coll_request->r_operand = (uint64_t) (uintptr_t)
((unsigned char *) (coll_request->l_operand) + ALLREDUCE_BASE_OFFSET);
}
*mqe_ptr_to_set = NULL;
/* Vasily: TODO with MACRO */
/* finish initializing full message descriptor */
coll_request->n_fragments = 1;
coll_request->n_frags_sent = 1;
/* Pasha: need to set to true in upper layer */
coll_request->user_handle_freed = false;
last_wait->flags |= MQE_WR_FLAG_SIGNAL;
coll_fragment->signal_task_wr_id = last_wait->wr_id;
last_wait->wr_id = (uint64_t) (uintptr_t) coll_fragment;
/* post the mwr */
IBOFFLOAD_VERBOSE(10, ("Post tasks.\n"));
rc = mca_bcol_iboffload_post_mqe_tasks(iboffload, coll_fragment->to_post);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_ERROR(("MQE task posting failing.\n"));
/* Note: need to clean up */
return rc;
}
MCA_BCOL_UPDATE_ORDER_COUNTER(&iboffload->super, coll_request->order_info);
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Adding collfrag to collfrag_pending"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
/*
* non power of 2 & EXCHANGE_NODE case,
* need to wait for message from "extra" proc.
*/
static int
non_pure_recursive_doubling(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request)
{
/* local variables */
int rc = OMPI_SUCCESS, extra_rank, pair_rank,
my_rank = ((mca_sbgp_base_module_t *) iboffload->ibnet)->my_index;
mca_bcol_iboffload_frag_t *calc_fragment,
*preposted_recv_frag;
mca_bcol_iboffload_task_t *wait_task,
*calc_task;
struct ibv_sge *r_operand = NULL,
*l_operand = NULL;
struct mqe_task *last_wait, /* we need ask from completion on last wait */
*last_send;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
struct mqe_task **mqe_ptr_to_set;
mca_bcol_iboffload_collfrag_t *coll_fragment = (mca_bcol_iboffload_collfrag_t *)
opal_list_get_last(&coll_request->work_requests);
mqe_ptr_to_set = &coll_fragment->to_post;
if (OPAL_UNLIKELY(false == BCOL_IBOFFLOAD_MQ_HAVE_CREDITS(
iboffload, coll_fragment->mq_index, coll_fragment->mq_credits))) {
IBOFFLOAD_VERBOSE(10, ("There are not enough credits on MQ.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
IBOFFLOAD_VERBOSE(10, ("Allreduce starting: type %d op %d, "
"n_extra_sources - %d.\n", cm->map_ompi_to_ib_dt[coll_request->dtype->id],
cm->map_ompi_to_ib_calcs[coll_request->op->op_type],
my_exchange_node->n_extra_sources));
extra_rank = my_exchange_node->rank_extra_source;
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, extra_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for data from extra node */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, extra_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
pair_rank = my_exchange_node->rank_exchanges[0];
calc_fragment = mca_bcol_iboffload_get_send_frag(coll_request,
pair_rank, coll_request->qp_index,
MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE +
MCA_IBOFFLOAD_CALC_SIZE_EXT, 0,
SBUF,
MCA_BCOL_IBOFFLOAD_SEND_FRAG_ML_CALC);
if (OPAL_UNLIKELY(NULL == calc_fragment)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting and packing send frag.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Calc extra node operand with mine and send the result
to the first algorithm partner */
preposted_recv_frag->sg_entry.length = MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE +
MCA_IBOFFLOAD_CALC_SIZE_EXT;
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
pair_rank, coll_request->qp_index, calc_fragment,
&preposted_recv_frag->sg_entry,
&calc_fragment->sg_entry, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
/* Calc extra node operand with mine and store the result on my buff */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
&preposted_recv_frag->sg_entry,
&calc_fragment->sg_entry, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
l_operand = &preposted_recv_frag->sg_entry;
l_operand->length = MCA_IBOFFLOAD_IB_DRIVER_OPERAND_SIZE +
MCA_IBOFFLOAD_CALC_SIZE_EXT;
/* Recursive-doubling exchange */
rc = do_exchange(iboffload, coll_request, &mqe_ptr_to_set,
&last_wait, &l_operand, &r_operand);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
}
/* Need to send message to "extra" proc =>
one more final result calc for extra node */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
extra_rank, coll_request->qp_index, NULL,
l_operand,
r_operand, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
if (false == coll_request->do_calc_in_cpu) {
/* Calc and send the result to myself */
calc_task = mca_bcol_iboffload_get_calc_task(iboffload,
my_rank, coll_request->qp_index, NULL,
l_operand,
r_operand, coll_request, NO_INLINE);
if (OPAL_UNLIKELY(NULL == calc_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting calc task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, calc_task, last_send);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, calc_task);
preposted_recv_frag =
mca_bcol_iboffload_get_preposted_recv_frag(
iboffload, my_rank, coll_request->qp_index);
if (OPAL_UNLIKELY(NULL == preposted_recv_frag)) {
/* RLG need cleanup */
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
/* Wait for calc from myself */
wait_task = mca_bcol_iboffload_get_wait_task(iboffload, my_rank, 1,
preposted_recv_frag, coll_request->qp_index, NULL);
if (OPAL_UNLIKELY(NULL == wait_task)) {
IBOFFLOAD_VERBOSE(10, ("Failing for getting wait task.\n"));
rc = OMPI_ERR_RESOURCE_BUSY;
goto out_of_resources;
}
APPEND_TO_TASKLIST(mqe_ptr_to_set, wait_task, last_wait);
MCA_BCOL_IBOFFLOAD_APPEND_TASK_TO_LIST(coll_fragment->task_next, wait_task);
} else {
coll_request->l_operand = l_operand->addr;
coll_request->r_operand = r_operand->addr;
}
*mqe_ptr_to_set = NULL;
/* finish initializing full message descriptor */
coll_request->n_fragments = 1;
coll_request->n_frags_sent = 1;
assert(NULL != last_wait);
last_wait->flags |= MQE_WR_FLAG_SIGNAL;
coll_fragment->signal_task_wr_id = last_wait->wr_id;
last_wait->wr_id = (uint64_t) (uintptr_t) coll_fragment;
/* post the mwr */
IBOFFLOAD_VERBOSE(10, ("Post tasks.\n"));
rc = mca_bcol_iboffload_post_mqe_tasks(iboffload, coll_fragment->to_post);
if(OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_ERROR(("MQE task posting failing.\n"));
/* Note: need to clean up */
return rc;
}
MCA_BCOL_UPDATE_ORDER_COUNTER(&iboffload->super, coll_request->order_info);
return OMPI_SUCCESS;
out_of_resources:
/* Release all resources */
IBOFFLOAD_VERBOSE(10, ("Adding collfrag to collfrag_pending"));
return mca_bcol_iboffload_free_resources_and_move_to_pending(coll_fragment, iboffload);
}
static int mca_bcol_iboffload_allreduce_init(
bcol_function_args_t *fn_arguments,
mca_bcol_iboffload_module_t *iboffload,
struct mca_bcol_iboffload_collreq_t **coll_request,
bool if_bcol_last)
{
int rc;
bool exclude_case;
ompi_free_list_item_t *item;
mca_bcol_iboffload_collfrag_t *coll_fragment;
mca_bcol_iboffload_component_t *cm = &mca_bcol_iboffload_component;
IBOFFLOAD_VERBOSE(10, ("Calling for mca_bcol_iboffload_allreduce_init.\n"));
OMPI_FREE_LIST_WAIT(&cm->collreqs_free, item, rc);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_VERBOSE(10, ("Failing for coll request free list waiting.\n"));
return rc;
}
(*coll_request) = (mca_bcol_iboffload_collreq_t *) item;
(*coll_request)->progress_fn = iboffload->allreduce_algth;
(*coll_request)->if_bcol_last = if_bcol_last;
exclude_case = (non_pure_recursive_doubling == iboffload->allreduce_algth &&
(OMPI_OP_SUM == fn_arguments->op->op_type &&
OMPI_DATATYPE_MPI_DOUBLE == fn_arguments->dtype->id));
(*coll_request)->do_calc_in_cpu = cm->last_calc_in_cpu && !exclude_case;
if (false == (*coll_request)->do_calc_in_cpu ||
allreduce_extra_node == iboffload->allreduce_algth) {
(*coll_request)->do_calc_in_cpu = false; /* Relevant for extra node only */
(*coll_request)->completion_cb_fn =
mca_bcol_iboffload_unpack_res_to_user;
} else {
(*coll_request)->completion_cb_fn =
mca_bcol_iboffload_calc_res_to_user;
}
(*coll_request)->module = iboffload;
(*coll_request)->op = fn_arguments->op;
(*coll_request)->dtype = fn_arguments->dtype;
(*coll_request)->count = fn_arguments->count;
(*coll_request)->ml_buffer_index = fn_arguments->buffer_index;
(*coll_request)->buffer_info[SBUF].lkey = iboffload->rdma_block.ib_info.lkey;
(*coll_request)->order_info = &fn_arguments->order_info;
/* ML buffer was provided, no need to pack the data.
* It is few assumption here:
* we CAN touch and change ML buffer
*/
(*coll_request)->buffer_info[SBUF].buf = (void *) (
(unsigned char *) fn_arguments->sbuf +
(size_t) fn_arguments->sbuf_offset);
(*coll_request)->buffer_info[SBUF].offset = fn_arguments->sbuf_offset;
(*coll_request)->buffer_info[RBUF].buf = (void *) (
(unsigned char *) fn_arguments->rbuf +
(size_t) fn_arguments->rbuf_offset);
(*coll_request)->buffer_info[RBUF].offset = fn_arguments->rbuf_offset;
if(mca_bcol_iboffload_component.enable_rdma_calc) {
(*coll_request)->qp_index = MCA_BCOL_IBOFFLOAD_QP_BARRIER;
} else {
(*coll_request)->qp_index = MCA_BCOL_IBOFFLOAD_QP_REGULAR;
}
(*coll_request)->n_frag_mpi_complete = 0;
(*coll_request)->n_frag_net_complete = 0;
fn_arguments->bcol_opaque_data = (void *) (*coll_request);
/*
* setup collective work request
*/
/* get collective frag */
coll_fragment = &((*coll_request)->first_collfrag);
mca_bcol_iboffload_collfrag_init(coll_fragment);
coll_fragment->mq_index = COLL_MQ;
coll_fragment->alg = RECURSIVE_DOUBLING_ALLREDUCE_ALG;
coll_fragment->mq_credits =
iboffload->alg_task_consump[RECURSIVE_DOUBLING_ALLREDUCE_ALG];
/* set pointers for (coll frag) <-> (coll full request) */
MCA_BCOL_IBOFFLOAD_SET_COLL_REQ_LINKS(*coll_request, coll_fragment);
coll_fragment->unpack_size =
mca_bcol_base_get_buff_length(fn_arguments->dtype, fn_arguments->count);
IBOFFLOAD_VERBOSE(10, ("The input data is %lf", *(double *) (*coll_request)->buffer_info[SBUF].buf));
return OMPI_SUCCESS;
}
static int mca_bcol_iboffload_allreduce_intra(bcol_function_args_t *fn_arguments,
struct coll_ml_function_t *const_args)
{
/* local variables */
int rc;
mca_bcol_iboffload_collreq_t *coll_request = NULL;
mca_bcol_iboffload_module_t *iboffload =
(mca_bcol_iboffload_module_t *) const_args->bcol_module;
/* Pasha: please do not touch this line, it used for ML buffer recycling barrier call */
bool if_bcol_last = ((const_args->index_of_this_type_in_collective + 1) ==
const_args->n_of_this_type_in_collective);
MCA_BCOL_CHECK_ORDER(const_args->bcol_module, fn_arguments);
IBOFFLOAD_VERBOSE(10, ("n_of_this_type_in_a_row %d, index_in_consecutive_same_bcol_calls %d",
const_args->n_of_this_type_in_a_row,
const_args->index_in_consecutive_same_bcol_calls + 1));
IBOFFLOAD_VERBOSE(10, ("Allreduce started.\n"));
fn_arguments->result_in_rbuf = true;
rc = mca_bcol_iboffload_allreduce_init(fn_arguments, iboffload,
&coll_request, if_bcol_last);
if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
IBOFFLOAD_VERBOSE(10, ("Get error from mca_bcol_iboffload_allreduce_init.\n"));
return rc;
}
/* Allreduce starting */
rc = iboffload->allreduce_algth(iboffload, coll_request);
if (OPAL_UNLIKELY(OMPI_ERROR == rc)) {
return BCOL_FN_NOT_STARTED;
}
IBOFFLOAD_VERBOSE(10, ("Wait for completions.\n"));
/* done */
return BCOL_FN_STARTED;
}
static int mca_bcol_iboffload_allreduce_progress(
bcol_function_args_t *input_args,
struct coll_ml_function_t *const_args)
{
mca_bcol_iboffload_collreq_t *coll_request =
(mca_bcol_iboffload_collreq_t *)
input_args->bcol_opaque_data;
if (BCOL_IS_COMPLETED(coll_request)) {
coll_request->user_handle_freed = true;
if (COLLREQ_IS_DONE(coll_request)) {
IBOFFLOAD_VERBOSE(10, ("Coll request already done.\n"));
RELEASE_COLLREQ(coll_request);
}
IBOFFLOAD_VERBOSE(10, ("Allreduce already done.\n"));
return BCOL_FN_COMPLETE;
}
return BCOL_FN_STARTED;
}
int mca_bcol_iboffload_allreduce_first_call(mca_bcol_iboffload_module_t *iboffload,
mca_bcol_iboffload_collreq_t *coll_request)
{
netpatterns_pair_exchange_node_t *my_exchange_node =
&iboffload->recursive_doubling_tree;
int i = 0, my_rank = iboffload->ibnet->super.my_index,
n_exchanges = my_exchange_node->n_exchanges,
*exchanges = my_exchange_node->rank_exchanges,
n_extra_src = my_exchange_node->n_extra_sources,
rank_extra_src = my_exchange_node->rank_extra_source;
mca_bcol_iboffload_endpoint_t *ep = iboffload->endpoints[my_rank];
/* Connecting to myself */
while (OMPI_SUCCESS !=
check_endpoint_state(ep, NULL, NULL)) {
opal_progress();
}
iboffload->alg_task_consump[RECURSIVE_DOUBLING_ALLREDUCE_ALG] = 0;
if (0 < n_extra_src) {
iboffload->alg_task_consump[RECURSIVE_DOUBLING_ALLREDUCE_ALG] += 4; /* Two CALCs and two WAITs tasks */
ep = iboffload->endpoints[rank_extra_src];
while (OMPI_SUCCESS !=
check_endpoint_state(ep, NULL, NULL)) {
opal_progress();
}
}
for (i = 0; i < n_exchanges; ++i) {
iboffload->alg_task_consump[RECURSIVE_DOUBLING_ALLREDUCE_ALG] += 4; /* Two CALCs and two WAITs tasks */
ep = iboffload->endpoints[exchanges[i]];
while (OMPI_SUCCESS !=
check_endpoint_state(ep, NULL, NULL)) {
opal_progress();
}
}
iboffload->alg_task_consump[RECURSIVE_DOUBLING_ALLREDUCE_ALG] += 4; /* Two CALCs and two WAITs tasks */
if (0 < my_exchange_node->n_extra_sources) {
iboffload->allreduce_algth =
(EXTRA_NODE == my_exchange_node->node_type)?
allreduce_extra_node:
non_pure_recursive_doubling;
} else {
if(mca_bcol_iboffload_component.enable_rdma_calc) {
iboffload->allreduce_algth =
rdma_pure_recursive_doubling;
} else {
iboffload->allreduce_algth =
pure_recursive_doubling;
}
}
return iboffload->allreduce_algth(iboffload, coll_request);
}
int mca_bcol_iboffload_allreduce_register(mca_bcol_base_module_t *super)
{
mca_bcol_base_coll_fn_comm_attributes_t comm_attribs;
mca_bcol_base_coll_fn_invoke_attributes_t inv_attribs;
IBOFFLOAD_VERBOSE(10, ("Register iboffload Allreduce.\n"));
comm_attribs.bcoll_type = BCOL_ALLREDUCE;
comm_attribs.comm_size_min = 0;
comm_attribs.comm_size_max = 1024 * 1024;
comm_attribs.waiting_semantics = NON_BLOCKING;
inv_attribs.bcol_msg_min = 0;
inv_attribs.bcol_msg_max = 20000; /* range 1 */
inv_attribs.datatype_bitmap = 0xffffffff;
inv_attribs.op_types_bitmap = 0xffffffff;
comm_attribs.data_src = DATA_SRC_KNOWN;
mca_bcol_base_set_attributes(super,
&comm_attribs, &inv_attribs,
mca_bcol_iboffload_allreduce_intra,
mca_bcol_iboffload_allreduce_progress);
return OMPI_SUCCESS;
}