
Correctly handle non-blocking collective tags

Since it is possible to have multiple outstanding non-blocking collectives
provided by different collective modules, we need a consistent
mechanism that allows them to select unique tags for each instance of a
collective.

Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
This commit is contained in:
George Bosilca 2020-05-07 23:59:56 -04:00, committed by Jeff Squyres
parent 8582e10d2b
commit c2970a3695
8 changed files with 151 additions and 192 deletions
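
In essence, the change replaces the per-module tag counters (ADAPT's c_ibcast_tag/c_ireduce_tag and libnbc's module->tag) with a single per-communicator counter, c_nbc_tag, from which any collective module atomically reserves a contiguous block of tags. Below is a minimal standalone sketch of that reservation scheme (the real implementation is added to coll_base_util.h in this commit; TAG_BASE and TAG_END are made-up stand-ins for the MCA_COLL_BASE_TAG_NONBLOCKING_BASE/_END constants):

#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define TAG_BASE (-100)   /* stand-in for MCA_COLL_BASE_TAG_NONBLOCKING_BASE */
#define TAG_END  (-1000)  /* stand-in for MCA_COLL_BASE_TAG_NONBLOCKING_END  */

static _Atomic int32_t nbc_tag = TAG_BASE;

/* Atomically reserve `reserve` consecutive tags and return the first one;
 * the caller then uses tag, tag - 1, ..., tag - (reserve - 1). */
static int32_t reserve_tags(int32_t reserve)
{
    int32_t tag, old_tag;
    assert(reserve > 0);
    do {
        tag = old_tag = atomic_load(&nbc_tag);
        if ((tag - reserve) < TAG_END) {  /* tag space exhausted: wrap */
            tag = TAG_BASE;
        }
    } while (!atomic_compare_exchange_strong(&nbc_tag, &old_tag, tag - reserve));
    return tag;
}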

View file

@@ -40,6 +40,7 @@
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/topo/base/base.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
@@ -382,9 +383,8 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
comm->c_pml_comm = NULL;
comm->c_topo = NULL;
comm->c_coll = NULL;
comm->c_ibcast_tag = 0;
comm->c_ireduce_tag = 0;
comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
/* A keyhash will be created if/when an attribute is cached on
this communicator */
comm->c_keyhash = NULL;

View file

@@ -188,13 +188,12 @@ struct ompi_communicator_t {
/* Collectives module interface and data */
mca_coll_base_comm_coll_t *c_coll;
/* Non-blocking collective tag. These are added here as they should be
* shared between all non-blocking collective modules (to avoid message
* collisions between them in the case where multiple outstanding
* non-blocking collective coexists using multiple backends).
/* Non-blocking collective tag. These tags might be shared between
* all non-blocking collective modules (to avoid message collisions
* between them in the case where multiple outstanding non-blocking
* collectives coexist using multiple backends).
*/
opal_atomic_int32_t c_ibcast_tag;
opal_atomic_int32_t c_ireduce_tag;
opal_atomic_int32_t c_nbc_tag;
};
typedef struct ompi_communicator_t ompi_communicator_t;

View file

@@ -25,21 +25,14 @@ int ompi_coll_adapt_ibcast_fini(void);
int ompi_coll_adapt_bcast(BCAST_ARGS);
int ompi_coll_adapt_ibcast(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
ompi_coll_tree_t * tree, size_t seg_size, int ibcast_tag);
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS,
int ibcast_tag);
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS,
int ibcast_tag);
ompi_coll_tree_t * tree, size_t seg_size);
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS);
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS);
/* Reduce */
int ompi_coll_adapt_ireduce_register(void);
@@ -47,18 +40,11 @@ int ompi_coll_adapt_ireduce_fini(void);
int ompi_coll_adapt_reduce(REDUCE_ARGS);
int ompi_coll_adapt_ireduce(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
ompi_coll_tree_t * tree, size_t seg_size, int ireduce_tag);
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS,
int ireduce_tag);
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS,
int ireduce_tag);
ompi_coll_tree_t * tree, size_t seg_size);
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS);
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS);

View file

@@ -14,7 +14,7 @@
#include "coll_adapt.h"
#include "coll_adapt_algorithms.h"
#include "coll_adapt_context.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "opal/util/bit_ops.h"
#include "opal/sys/atomic.h"
@@ -27,8 +27,7 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff,
int root,
struct ompi_communicator_t * comm,
ompi_request_t ** request,
mca_coll_base_module_t * module,
int ibcast_tag);
mca_coll_base_module_t * module);
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = {
{0, (uintptr_t) ompi_coll_adapt_ibcast_tuned},
@@ -158,11 +157,11 @@ static int send_cb(ompi_request_t * req)
"[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n",
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
send_context->peer, (void *) send_context->buff, send_count,
(send_context->con->ibcast_tag << 16) + new_id));
send_context->con->ibcast_tag - new_id));
err =
MCA_PML_CALL(isend
(send_buff, send_count, send_context->con->datatype, send_context->peer,
(send_context->con->ibcast_tag << 16) + new_id,
send_context->con->ibcast_tag - new_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
OPAL_THREAD_UNLOCK(context->con->mutex);
@@ -245,10 +244,10 @@ static int recv_cb(ompi_request_t * req)
"[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n",
ompi_comm_rank(context->con->comm), context->frag_id, context->peer,
(void *) recv_buff, recv_count,
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id));
recv_context->con->ibcast_tag - recv_context->frag_id));
MCA_PML_CALL(irecv
(recv_buff, recv_count, recv_context->con->datatype, recv_context->peer,
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id,
recv_context->con->ibcast_tag - recv_context->frag_id,
recv_context->con->comm, &recv_req));
/* Invoke receive callback */
@@ -282,12 +281,12 @@ static int recv_cb(ompi_request_t * req)
"[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n",
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
send_context->peer, (void *) send_context->buff, send_count,
(send_context->con->ibcast_tag << 16) + send_context->frag_id));
send_context->con->ibcast_tag - send_context->frag_id));
err =
MCA_PML_CALL(isend
(send_buff, send_count, send_context->con->datatype,
send_context->peer,
(send_context->con->ibcast_tag << 16) + send_context->frag_id,
send_context->con->ibcast_tag - send_context->frag_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
OPAL_THREAD_UNLOCK(context->con->mutex);
@@ -344,12 +343,10 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
*request = temp_request;
return MPI_SUCCESS;
}
int ibcast_tag = opal_atomic_add_fetch_32(&(comm->c_ibcast_tag), 1);
ibcast_tag = ibcast_tag % 4096;
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
"ibcast tag %d root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
ibcast_tag, root, mca_coll_adapt_component.adapt_ibcast_algorithm,
"ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
root, mca_coll_adapt_component.adapt_ibcast_algorithm,
mca_coll_adapt_component.adapt_ibcast_segment_size,
mca_coll_adapt_component.adapt_ibcast_max_send_requests,
mca_coll_adapt_component.adapt_ibcast_max_recv_requests));
@@ -358,89 +355,82 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
(ompi_coll_adapt_ibcast_fn_t)
ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].
algorithm_fn_ptr;
return bcast_func(buff, count, datatype, root, comm, request, module, ibcast_tag);
return bcast_func(buff, count, datatype, root, comm, request, module);
}
/*
* Ibcast functions with different algorithms
*/
int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module, int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module)
{
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
return OMPI_ERR_NOT_IMPLEMENTED;
}
int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module,
int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module,
int ibcast_tag)
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request, mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
int fanout = ompi_comm_size(comm) - 1;
ompi_coll_tree_t *tree;
@@ -453,16 +443,15 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t
}
int err =
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
mca_coll_adapt_component.adapt_ibcast_segment_size,
ibcast_tag);
mca_coll_adapt_component.adapt_ibcast_segment_size);
return err;
}
int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size, int ibcast_tag)
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size)
{
int i, j, rank, err;
/* The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */
@@ -555,11 +544,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
con->mutex = mutex;
con->request = temp_request;
con->tree = tree;
con->ibcast_tag = ibcast_tag;
con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: Ibcast, root %d, tag %d\n", rank, root,
ibcast_tag));
con->ibcast_tag));
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n",
rank, (void *) con->mutex, tree->tree_nextsize, num_segs,
@@ -610,11 +599,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n",
rank, context->frag_id, context->peer,
(void *) send_buff, send_count, (ibcast_tag << 16) + i));
(void *) send_buff, send_count, con->ibcast_tag - i));
err =
MCA_PML_CALL(isend
(send_buff, send_count, datatype, context->peer,
(ibcast_tag << 16) + i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
con->ibcast_tag - i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
&send_req));
if (MPI_SUCCESS != err) {
return err;
@@ -668,11 +657,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
"[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n",
ompi_comm_rank(context->con->comm), context->frag_id,
context->peer, (void *) recv_buff, recv_count,
(ibcast_tag << 16) + i));
con->ibcast_tag - i));
err =
MCA_PML_CALL(irecv
(recv_buff, recv_count, datatype, context->peer,
(ibcast_tag << 16) + i, comm, &recv_req));
con->ibcast_tag - i, comm, &recv_req));
if (MPI_SUCCESS != err) {
return err;
}
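
The tag arithmetic in the PML calls changes accordingly: instead of packing a 12-bit per-communicator counter and a segment id into a tag as (ibcast_tag << 16) + frag_id, ADAPT now reserves num_segs tags up front and addresses segment i as con->ibcast_tag - i. A worked example, using a made-up value of -100 for the first tag handed out on this communicator:

int32_t tag = ompi_coll_base_nbc_reserve_tags(comm, 4);  /* -> -100 (made-up) */
/* Segment i is sent and received with tag - i:
 *   segment 0: -100, segment 1: -101, segment 2: -102, segment 3: -103.
 * The next reservation on the same communicator, by whichever module
 * performs it (e.g. an overlapping ireduce), starts at -104, so two
 * outstanding non-blocking collectives can never match each other's
 * messages. */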

View file

@@ -17,7 +17,7 @@
#include "coll_adapt_item.h"
#include "ompi/constants.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_base_topo.h"
@@ -32,7 +32,7 @@ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf,
int root,
struct ompi_communicator_t * comm,
ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag);
mca_coll_base_module_t * module);
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = {
{0, (uintptr_t)ompi_coll_adapt_ireduce_tuned},
@@ -263,14 +263,14 @@ static int send_cb(ompi_request_t * req)
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n",
send_context->con->rank, send_context->frag_id, send_context->peer,
(send_context->con->ireduce_tag << 16) + send_context->frag_id));
send_context->con->ireduce_tag - send_context->frag_id));
ompi_request_t *send_req;
err =
MCA_PML_CALL(isend
(send_context->buff, send_count, send_context->con->datatype,
send_context->peer,
(context->con->ireduce_tag << 16) + send_context->frag_id,
context->con->ireduce_tag - send_context->frag_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
return err;
@@ -355,13 +355,13 @@ static int recv_cb(ompi_request_t * req)
"[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n",
context->con->rank, recv_context->frag_id, recv_context->peer,
(void *) inbuf,
(recv_context->con->ireduce_tag << 16) + recv_context->frag_id));
recv_context->con->ireduce_tag - recv_context->frag_id));
ompi_request_t *recv_req;
err =
MCA_PML_CALL(irecv
(temp_recv_buf, recv_count, recv_context->con->datatype,
recv_context->peer,
(recv_context->con->ireduce_tag << 16) + recv_context->frag_id,
recv_context->con->ireduce_tag - recv_context->frag_id,
recv_context->con->comm, &recv_req));
if (MPI_SUCCESS != err) {
return err;
@@ -460,14 +460,14 @@ static int recv_cb(ompi_request_t * req)
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n",
send_context->con->rank, send_context->frag_id, send_context->peer,
(send_context->con->ireduce_tag << 16) + send_context->frag_id));
send_context->con->ireduce_tag - send_context->frag_id));
ompi_request_t *send_req;
err =
MCA_PML_CALL(isend
(send_context->buff, send_count, send_context->con->datatype,
send_context->peer,
(send_context->con->ireduce_tag << 16) + send_context->frag_id,
send_context->con->ireduce_tag - send_context->frag_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
if (MPI_SUCCESS != err) {
return err;
@@ -524,12 +524,10 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
if (count == 0) {
return MPI_SUCCESS;
}
int ireduce_tag = opal_atomic_add_fetch_32(&(comm->c_ireduce_tag), 1);
ireduce_tag = (ireduce_tag % 4096) + 4096;
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
"ireduce tag %d root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
ireduce_tag, root, mca_coll_adapt_component.adapt_ireduce_algorithm,
"ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
root, mca_coll_adapt_component.adapt_ireduce_algorithm,
mca_coll_adapt_component.adapt_ireduce_segment_size,
mca_coll_adapt_component.adapt_ireduce_max_send_requests,
mca_coll_adapt_component.adapt_ireduce_max_recv_requests));
@@ -538,93 +536,78 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
(ompi_coll_adapt_ireduce_fn_t)
ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.
adapt_ireduce_algorithm].algorithm_fn_ptr;
return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module, ireduce_tag);
return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module);
}
/*
* Ireduce functions with different algorithms
*/
int ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t *module)
{
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
return OMPI_ERR_NOT_IMPLEMENTED;
}
int ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, ompi_coll_base_topo_build_bmtree(comm, root),
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root),
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, ompi_coll_base_topo_build_tree(2, comm, root),
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, ompi_coll_base_topo_build_chain(1, comm, root),
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, ompi_coll_base_topo_build_chain(4, comm, root),
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module)
{
int fanout = ompi_comm_size(comm) - 1;
ompi_coll_tree_t *tree;
@@ -635,19 +618,17 @@ int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
} else {
tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root);
}
int err =
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
ireduce_tag);
return err;
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
request, module, tree,
mca_coll_adapt_component.adapt_ireduce_segment_size);
}
int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size, int ireduce_tag)
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
struct ompi_communicator_t *comm, ompi_request_t ** request,
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
size_t seg_size)
{
ptrdiff_t extent, lower_bound, segment_increment;
@@ -777,12 +758,12 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
con->rbuf = (char *) rbuf;
con->root = root;
con->distance = distance;
con->ireduce_tag = ireduce_tag;
con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);
con->real_seg_size = real_seg_size;
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root,
ireduce_tag));
con->ireduce_tag));
/* If the current process is not leaf node */
if (tree->tree_nextsize > 0) {
@@ -849,14 +830,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
"[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n",
context->con->rank, context->frag_id, context->peer,
recv_count, (void *) inbuf,
(ireduce_tag << 16) + seg_index));
con->ireduce_tag - seg_index));
/* Create a recv request */
ompi_request_t *recv_req;
err =
MCA_PML_CALL(irecv
(temp_recv_buf, recv_count, dtype, tree->tree_next[i],
(ireduce_tag << 16) + seg_index, comm, &recv_req));
con->ireduce_tag - seg_index, comm, &recv_req));
if (MPI_SUCCESS != err) {
return err;
}
@@ -908,14 +889,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
"[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n",
context->con->rank, context->frag_id, context->peer,
send_count, (ireduce_tag << 16) + context->frag_id));
send_count, con->ireduce_tag - context->frag_id));
/* Create send request */
ompi_request_t *send_req;
err =
MCA_PML_CALL(isend
(context->buff, send_count, dtype, tree->tree_prev,
(ireduce_tag << 16) + context->frag_id,
con->ireduce_tag - context->frag_id,
MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &send_req));
if (MPI_SUCCESS != err) {
return err;

View file

@@ -27,6 +27,8 @@
#include "ompi/mca/mca.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/request/request.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/op/op.h"
#include "ompi/mca/pml/pml.h"
@@ -60,6 +62,22 @@ struct ompi_coll_base_nbc_request_t {
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t);
static inline int32_t
ompi_coll_base_nbc_reserve_tags(ompi_communicator_t* comm, int32_t reserve)
{
int32_t tag, old_tag;
assert( reserve > 0 );
reread_tag: /* In case we fail to atomically update the tag */
tag = old_tag = comm->c_nbc_tag;
if ((tag - reserve) < MCA_COLL_BASE_TAG_NONBLOCKING_END) {
tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
}
if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_32(&comm->c_nbc_tag, &old_tag, tag - reserve) ) {
goto reread_tag;
}
return tag;
}
typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t;
/**

View file

@@ -94,7 +94,6 @@ struct ompi_coll_libnbc_module_t {
mca_coll_base_module_t super;
opal_mutex_t mutex;
bool comm_registered;
int tag;
#ifdef NBC_CACHE_SCHEDULE
void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct
hb_tree, but since this is a

View file

@@ -25,7 +25,7 @@
* Additional copyrights may follow
*/
#include "nbc_internal.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/op/op.h"
#include "ompi/mca/pml/pml.h"
@@ -595,7 +595,6 @@ void NBC_Return_handle(ompi_coll_libnbc_request_t *request) {
}
int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) {
comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
#ifdef NBC_CACHE_SCHEDULE
/* initialize the NBC_ALLTOALL SchedCache tree */
@@ -672,7 +671,7 @@ int NBC_Start(NBC_Handle *handle) {
int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
ompi_coll_libnbc_module_t *module, bool persistent,
ompi_request_t **request, void *tmpbuf) {
int ret, tmp_tag;
int ret;
bool need_register = false;
ompi_coll_libnbc_request_t *handle;
@@ -685,13 +684,7 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
/* update the module->tag here because other processes may have operations
* and they may update the module->tag */
OPAL_THREAD_LOCK(&module->mutex);
tmp_tag = module->tag--;
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
NBC_DEBUG(2,"resetting tags ...\n");
}
OPAL_THREAD_UNLOCK(&module->mutex);
(void)ompi_coll_base_nbc_reserve_tags(comm, 1);
OBJ_RELEASE(schedule);
free(tmpbuf);
@@ -712,20 +705,15 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
/******************** Do the tag and shadow comm administration ... ***************/
OPAL_THREAD_LOCK(&module->mutex);
tmp_tag = module->tag--;
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
NBC_DEBUG(2,"resetting tags ...\n");
}
handle->tag = ompi_coll_base_nbc_reserve_tags(comm, 1);
OPAL_THREAD_LOCK(&module->mutex);
if (true != module->comm_registered) {
module->comm_registered = true;
need_register = true;
}
OPAL_THREAD_UNLOCK(&module->mutex);
handle->tag = tmp_tag;
/* register progress */
if (need_register) {
@@ -737,7 +725,6 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
}
handle->comm=comm;
/*printf("got module: %lu tag: %i\n", module, module->tag);*/
/******************** end of tag and shadow comm administration ... ***************/
handle->comminfo = module;