Correctly handle non-blocking collective tags
As it is possible to have multiple outstanding non-blocking collectives provided by different collective modules, we need a consistent mechanism to allow them to select unique tags for each instance of a collective. Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
Этот коммит содержится в:
родитель
8582e10d2b
Коммит
c2970a3695
@ -40,6 +40,7 @@
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/coll/base/base.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/topo/base/base.h"
|
||||
#include "ompi/runtime/params.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
@ -382,9 +383,8 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
|
||||
comm->c_pml_comm = NULL;
|
||||
comm->c_topo = NULL;
|
||||
comm->c_coll = NULL;
|
||||
comm->c_ibcast_tag = 0;
|
||||
comm->c_ireduce_tag = 0;
|
||||
|
||||
comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
|
||||
/* A keyhash will be created if/when an attribute is cached on
|
||||
this communicator */
|
||||
comm->c_keyhash = NULL;
|
||||
|
@ -188,13 +188,12 @@ struct ompi_communicator_t {
|
||||
/* Collectives module interface and data */
|
||||
mca_coll_base_comm_coll_t *c_coll;
|
||||
|
||||
/* Non-blocking collective tag. These are added here as they should be
|
||||
* shared between all non-blocking collective modules (to avoid message
|
||||
* collisions between them in the case where multiple outstanding
|
||||
* non-blocking collective coexists using multiple backends).
|
||||
/* Non-blocking collective tag. These tags might be shared between
|
||||
* all non-blocking collective modules (to avoid message collision
|
||||
* between them in the case where multiple outstanding non-blocking
|
||||
* collectives coexist using multiple backends).
|
||||
*/
|
||||
opal_atomic_int32_t c_ibcast_tag;
|
||||
opal_atomic_int32_t c_ireduce_tag;
|
||||
opal_atomic_int32_t c_nbc_tag;
|
||||
};
|
||||
typedef struct ompi_communicator_t ompi_communicator_t;
|
||||
|
||||
|
@ -25,21 +25,14 @@ int ompi_coll_adapt_ibcast_fini(void);
|
||||
int ompi_coll_adapt_bcast(BCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size, int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS,
|
||||
int ibcast_tag);
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
int ompi_coll_adapt_ibcast_binomial(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_in_order_binomial(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_binary(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_pipeline(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_chain(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_linear(IBCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast_tuned(IBCAST_ARGS);
|
||||
|
||||
/* Reduce */
|
||||
int ompi_coll_adapt_ireduce_register(void);
|
||||
@ -47,18 +40,11 @@ int ompi_coll_adapt_ireduce_fini(void);
|
||||
int ompi_coll_adapt_reduce(REDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size, int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS,
|
||||
int ireduce_tag);
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
int ompi_coll_adapt_ireduce_tuned(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_binomial(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_in_order_binomial(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_binary(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_pipeline(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_chain(IREDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce_linear(IREDUCE_ARGS);
|
||||
|
@ -14,7 +14,7 @@
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
#include "coll_adapt_context.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
#include "opal/sys/atomic.h"
|
||||
@ -27,8 +27,7 @@ typedef int (*ompi_coll_adapt_ibcast_fn_t) (void *buff,
|
||||
int root,
|
||||
struct ompi_communicator_t * comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module,
|
||||
int ibcast_tag);
|
||||
mca_coll_base_module_t * module);
|
||||
|
||||
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ibcast_algorithm_index[] = {
|
||||
{0, (uintptr_t) ompi_coll_adapt_ibcast_tuned},
|
||||
@ -158,11 +157,11 @@ static int send_cb(ompi_request_t * req)
|
||||
"[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n",
|
||||
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
|
||||
send_context->peer, (void *) send_context->buff, send_count,
|
||||
(send_context->con->ibcast_tag << 16) + new_id));
|
||||
send_context->con->ibcast_tag - new_id));
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_buff, send_count, send_context->con->datatype, send_context->peer,
|
||||
(send_context->con->ibcast_tag << 16) + new_id,
|
||||
send_context->con->ibcast_tag - new_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
@ -245,10 +244,10 @@ static int recv_cb(ompi_request_t * req)
|
||||
"[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n",
|
||||
ompi_comm_rank(context->con->comm), context->frag_id, context->peer,
|
||||
(void *) recv_buff, recv_count,
|
||||
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id));
|
||||
recv_context->con->ibcast_tag - recv_context->frag_id));
|
||||
MCA_PML_CALL(irecv
|
||||
(recv_buff, recv_count, recv_context->con->datatype, recv_context->peer,
|
||||
(recv_context->con->ibcast_tag << 16) + recv_context->frag_id,
|
||||
recv_context->con->ibcast_tag - recv_context->frag_id,
|
||||
recv_context->con->comm, &recv_req));
|
||||
|
||||
/* Invoke receive callback */
|
||||
@ -282,12 +281,12 @@ static int recv_cb(ompi_request_t * req)
|
||||
"[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n",
|
||||
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
|
||||
send_context->peer, (void *) send_context->buff, send_count,
|
||||
(send_context->con->ibcast_tag << 16) + send_context->frag_id));
|
||||
send_context->con->ibcast_tag - send_context->frag_id));
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_buff, send_count, send_context->con->datatype,
|
||||
send_context->peer,
|
||||
(send_context->con->ibcast_tag << 16) + send_context->frag_id,
|
||||
send_context->con->ibcast_tag - send_context->frag_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
@ -344,12 +343,10 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
|
||||
*request = temp_request;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
int ibcast_tag = opal_atomic_add_fetch_32(&(comm->c_ibcast_tag), 1);
|
||||
ibcast_tag = ibcast_tag % 4096;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
|
||||
"ibcast tag %d root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
|
||||
ibcast_tag, root, mca_coll_adapt_component.adapt_ibcast_algorithm,
|
||||
"ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
|
||||
root, mca_coll_adapt_component.adapt_ibcast_algorithm,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
mca_coll_adapt_component.adapt_ibcast_max_send_requests,
|
||||
mca_coll_adapt_component.adapt_ibcast_max_recv_requests));
|
||||
@ -358,89 +355,82 @@ int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *dataty
|
||||
(ompi_coll_adapt_ibcast_fn_t)
|
||||
ompi_coll_adapt_ibcast_algorithm_index[mca_coll_adapt_component.adapt_ibcast_algorithm].
|
||||
algorithm_fn_ptr;
|
||||
return bcast_func(buff, count, datatype, root, comm, request, module, ibcast_tag);
|
||||
return bcast_func(buff, count, datatype, root, comm, request, module);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ibcast functions with different algorithms
|
||||
*/
|
||||
int ompi_coll_adapt_ibcast_tuned(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module, int ibcast_tag)
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module,
|
||||
int ibcast_tag)
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_in_order_binomial(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ibcast_tag)
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ibcast_binary(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ibcast_tag)
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_pipeline(void *buff, int count, struct ompi_datatype_t *datatype,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module,
|
||||
int ibcast_tag)
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request, mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ibcast_chain(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ibcast_tag)
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ibcast_tag)
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
int fanout = ompi_comm_size(comm) - 1;
|
||||
ompi_coll_tree_t *tree;
|
||||
@ -453,16 +443,15 @@ int ompi_coll_adapt_ibcast_linear(void *buff, int count, struct ompi_datatype_t
|
||||
}
|
||||
int err =
|
||||
ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
ibcast_tag);
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
|
||||
size_t seg_size, int ibcast_tag)
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
|
||||
size_t seg_size)
|
||||
{
|
||||
int i, j, rank, err;
|
||||
/* The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */
|
||||
@ -555,11 +544,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
con->mutex = mutex;
|
||||
con->request = temp_request;
|
||||
con->tree = tree;
|
||||
con->ibcast_tag = ibcast_tag;
|
||||
con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: Ibcast, root %d, tag %d\n", rank, root,
|
||||
ibcast_tag));
|
||||
con->ibcast_tag));
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n",
|
||||
rank, (void *) con->mutex, tree->tree_nextsize, num_segs,
|
||||
@ -610,11 +599,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n",
|
||||
rank, context->frag_id, context->peer,
|
||||
(void *) send_buff, send_count, (ibcast_tag << 16) + i));
|
||||
(void *) send_buff, send_count, con->ibcast_tag - i));
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_buff, send_count, datatype, context->peer,
|
||||
(ibcast_tag << 16) + i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
|
||||
con->ibcast_tag - i, MCA_PML_BASE_SEND_SYNCHRONOUS, comm,
|
||||
&send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
@ -668,11 +657,11 @@ int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t
|
||||
"[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n",
|
||||
ompi_comm_rank(context->con->comm), context->frag_id,
|
||||
context->peer, (void *) recv_buff, recv_count,
|
||||
(ibcast_tag << 16) + i));
|
||||
con->ibcast_tag - i));
|
||||
err =
|
||||
MCA_PML_CALL(irecv
|
||||
(recv_buff, recv_count, datatype, context->peer,
|
||||
(ibcast_tag << 16) + i, comm, &recv_req));
|
||||
con->ibcast_tag - i, comm, &recv_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
#include "coll_adapt_item.h"
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
@ -32,7 +32,7 @@ typedef int (*ompi_coll_adapt_ireduce_fn_t) (const void *sbuf,
|
||||
int root,
|
||||
struct ompi_communicator_t * comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag);
|
||||
mca_coll_base_module_t * module);
|
||||
|
||||
static ompi_coll_adapt_algorithm_index_t ompi_coll_adapt_ireduce_algorithm_index[] = {
|
||||
{0, (uintptr_t)ompi_coll_adapt_ireduce_tuned},
|
||||
@ -263,14 +263,14 @@ static int send_cb(ompi_request_t * req)
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n",
|
||||
send_context->con->rank, send_context->frag_id, send_context->peer,
|
||||
(send_context->con->ireduce_tag << 16) + send_context->frag_id));
|
||||
send_context->con->ireduce_tag - send_context->frag_id));
|
||||
|
||||
ompi_request_t *send_req;
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_context->buff, send_count, send_context->con->datatype,
|
||||
send_context->peer,
|
||||
(context->con->ireduce_tag << 16) + send_context->frag_id,
|
||||
context->con->ireduce_tag - send_context->frag_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
@ -355,13 +355,13 @@ static int recv_cb(ompi_request_t * req)
|
||||
"[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n",
|
||||
context->con->rank, recv_context->frag_id, recv_context->peer,
|
||||
(void *) inbuf,
|
||||
(recv_context->con->ireduce_tag << 16) + recv_context->frag_id));
|
||||
recv_context->con->ireduce_tag - recv_context->frag_id));
|
||||
ompi_request_t *recv_req;
|
||||
err =
|
||||
MCA_PML_CALL(irecv
|
||||
(temp_recv_buf, recv_count, recv_context->con->datatype,
|
||||
recv_context->peer,
|
||||
(recv_context->con->ireduce_tag << 16) + recv_context->frag_id,
|
||||
recv_context->con->ireduce_tag - recv_context->frag_id,
|
||||
recv_context->con->comm, &recv_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
@ -460,14 +460,14 @@ static int recv_cb(ompi_request_t * req)
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n",
|
||||
send_context->con->rank, send_context->frag_id, send_context->peer,
|
||||
(send_context->con->ireduce_tag << 16) + send_context->frag_id));
|
||||
send_context->con->ireduce_tag - send_context->frag_id));
|
||||
|
||||
ompi_request_t *send_req;
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(send_context->buff, send_count, send_context->con->datatype,
|
||||
send_context->peer,
|
||||
(send_context->con->ireduce_tag << 16) + send_context->frag_id,
|
||||
send_context->con->ireduce_tag - send_context->frag_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
@ -524,12 +524,10 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
|
||||
if (count == 0) {
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
int ireduce_tag = opal_atomic_add_fetch_32(&(comm->c_ireduce_tag), 1);
|
||||
ireduce_tag = (ireduce_tag % 4096) + 4096;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
|
||||
"ireduce tag %d root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
|
||||
ireduce_tag, root, mca_coll_adapt_component.adapt_ireduce_algorithm,
|
||||
"ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
|
||||
root, mca_coll_adapt_component.adapt_ireduce_algorithm,
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
mca_coll_adapt_component.adapt_ireduce_max_send_requests,
|
||||
mca_coll_adapt_component.adapt_ireduce_max_recv_requests));
|
||||
@ -538,93 +536,78 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
|
||||
(ompi_coll_adapt_ireduce_fn_t)
|
||||
ompi_coll_adapt_ireduce_algorithm_index[mca_coll_adapt_component.
|
||||
adapt_ireduce_algorithm].algorithm_fn_ptr;
|
||||
return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module, ireduce_tag);
|
||||
return reduce_func(sbuf, rbuf, count, dtype, op, root, comm, request, module);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ireduce functions with different algorithms
|
||||
*/
|
||||
int ompi_coll_adapt_ireduce_tuned(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_bmtree(comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_in_order_binomial(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_in_order_bmtree(comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_in_order_bmtree(comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_binary(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_tree(2, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_tree(2, comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_pipeline(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(1, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_chain(1, comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ireduce_chain(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_coll_tree_t *tree = ompi_coll_base_topo_build_chain(4, comm, root);
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, ompi_coll_base_topo_build_chain(4, comm, root),
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
int fanout = ompi_comm_size(comm) - 1;
|
||||
ompi_coll_tree_t *tree;
|
||||
@ -635,19 +618,17 @@ int ompi_coll_adapt_ireduce_linear(const void *sbuf, void *rbuf, int count,
|
||||
} else {
|
||||
tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root);
|
||||
}
|
||||
int err =
|
||||
ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
|
||||
tree, mca_coll_adapt_component.adapt_ireduce_segment_size,
|
||||
ireduce_tag);
|
||||
return err;
|
||||
return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm,
|
||||
request, module, tree,
|
||||
mca_coll_adapt_component.adapt_ireduce_segment_size);
|
||||
}
|
||||
|
||||
|
||||
int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
|
||||
size_t seg_size, int ireduce_tag)
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
|
||||
size_t seg_size)
|
||||
{
|
||||
|
||||
ptrdiff_t extent, lower_bound, segment_increment;
|
||||
@ -777,12 +758,12 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
con->rbuf = (char *) rbuf;
|
||||
con->root = root;
|
||||
con->distance = distance;
|
||||
con->ireduce_tag = ireduce_tag;
|
||||
con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);
|
||||
con->real_seg_size = real_seg_size;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root,
|
||||
ireduce_tag));
|
||||
con->ireduce_tag));
|
||||
|
||||
/* If the current process is not leaf node */
|
||||
if (tree->tree_nextsize > 0) {
|
||||
@ -849,14 +830,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
"[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n",
|
||||
context->con->rank, context->frag_id, context->peer,
|
||||
recv_count, (void *) inbuf,
|
||||
(ireduce_tag << 16) + seg_index));
|
||||
con->ireduce_tag - seg_index));
|
||||
|
||||
/* Create a recv request */
|
||||
ompi_request_t *recv_req;
|
||||
err =
|
||||
MCA_PML_CALL(irecv
|
||||
(temp_recv_buf, recv_count, dtype, tree->tree_next[i],
|
||||
(ireduce_tag << 16) + seg_index, comm, &recv_req));
|
||||
con->ireduce_tag - seg_index, comm, &recv_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
}
|
||||
@ -908,14 +889,14 @@ int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n",
|
||||
context->con->rank, context->frag_id, context->peer,
|
||||
send_count, (ireduce_tag << 16) + context->frag_id));
|
||||
send_count, con->ireduce_tag - context->frag_id));
|
||||
|
||||
/* Create send request */
|
||||
ompi_request_t *send_req;
|
||||
err =
|
||||
MCA_PML_CALL(isend
|
||||
(context->buff, send_count, dtype, tree->tree_prev,
|
||||
(ireduce_tag << 16) + context->frag_id,
|
||||
con->ireduce_tag - context->frag_id,
|
||||
MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
return err;
|
||||
|
@ -27,6 +27,8 @@
|
||||
#include "ompi/mca/mca.h"
|
||||
#include "ompi/datatype/ompi_datatype.h"
|
||||
#include "ompi/request/request.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
|
||||
@ -60,6 +62,22 @@ struct ompi_coll_base_nbc_request_t {
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t);
|
||||
|
||||
static inline int32_t
|
||||
ompi_coll_base_nbc_reserve_tags(ompi_communicator_t* comm, int32_t reserve)
|
||||
{
|
||||
int32_t tag, old_tag;
|
||||
assert( reserve > 0 );
|
||||
reread_tag: /* In case we fail to atomically update the tag */
|
||||
tag = old_tag = comm->c_nbc_tag;
|
||||
if ((tag - reserve) < MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
}
|
||||
if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_32(&comm->c_nbc_tag, &old_tag, tag - reserve) ) {
|
||||
goto reread_tag;
|
||||
}
|
||||
return tag;
|
||||
}
|
||||
|
||||
typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t;
|
||||
|
||||
/**
|
||||
|
@ -94,7 +94,6 @@ struct ompi_coll_libnbc_module_t {
|
||||
mca_coll_base_module_t super;
|
||||
opal_mutex_t mutex;
|
||||
bool comm_registered;
|
||||
int tag;
|
||||
#ifdef NBC_CACHE_SCHEDULE
|
||||
void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct
|
||||
hb_tree, but since this is a
|
||||
|
@ -25,7 +25,7 @@
|
||||
* Additional copyrights may follow
|
||||
*/
|
||||
#include "nbc_internal.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
|
||||
@ -595,7 +595,6 @@ void NBC_Return_handle(ompi_coll_libnbc_request_t *request) {
|
||||
}
|
||||
|
||||
int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) {
|
||||
comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
|
||||
#ifdef NBC_CACHE_SCHEDULE
|
||||
/* initialize the NBC_ALLTOALL SchedCache tree */
|
||||
@ -672,7 +671,7 @@ int NBC_Start(NBC_Handle *handle) {
|
||||
int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
ompi_coll_libnbc_module_t *module, bool persistent,
|
||||
ompi_request_t **request, void *tmpbuf) {
|
||||
int ret, tmp_tag;
|
||||
int ret;
|
||||
bool need_register = false;
|
||||
ompi_coll_libnbc_request_t *handle;
|
||||
|
||||
@ -685,13 +684,7 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
|
||||
/* update the module->tag here because other processes may have operations
|
||||
* and they may update the module->tag */
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
tmp_tag = module->tag--;
|
||||
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->mutex);
|
||||
(void)ompi_coll_base_nbc_reserve_tags(comm, 1);
|
||||
|
||||
OBJ_RELEASE(schedule);
|
||||
free(tmpbuf);
|
||||
@ -712,20 +705,15 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
|
||||
/******************** Do the tag and shadow comm administration ... ***************/
|
||||
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
tmp_tag = module->tag--;
|
||||
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
handle->tag = ompi_coll_base_nbc_reserve_tags(comm, 1);
|
||||
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
if (true != module->comm_registered) {
|
||||
module->comm_registered = true;
|
||||
need_register = true;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->mutex);
|
||||
|
||||
handle->tag = tmp_tag;
|
||||
|
||||
/* register progress */
|
||||
if (need_register) {
|
||||
@ -737,7 +725,6 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
}
|
||||
|
||||
handle->comm=comm;
|
||||
/*printf("got module: %lu tag: %i\n", module, module->tag);*/
|
||||
|
||||
/******************** end of tag and shadow comm administration ... ***************/
|
||||
handle->comminfo = module;
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user