Bunches of changes
- remove redundant OBJ_CONSTRUCT in bcast - fix up some macros in coll_sm.h - check to ensure that if there are too many processes in the communicator (i.e., if we couldn't fit a flag for each of them in the control segment), then fail selection - setup the in_use flags properly - adapt to new mpool API - first working copy of reduce -- not tree-baed (but still NUMA-aware), and only processes in order from process 0 to process N-1 -- do not have a tree-based and/or commutative version yet (i.e., process the results in whatever order they arrive) Reduce now passes the new ibm reduce_big.c test. Woo hoo! Time to declare success for the evening (and run the intel test tomorrow). This commit was SVN r7379.
Этот коммит содержится в:
родитель
ec621c2145
Коммит
068b9c72a2
@ -398,13 +398,13 @@ extern int32_t mca_coll_sm_bogus_free_after;
|
||||
#define FLAG_SETUP(flag_num, flag, data) \
|
||||
(flag) = (mca_coll_sm_in_use_flag_t*) \
|
||||
(((char *) (data)->mcb_in_use_flags) + \
|
||||
((flag_num) * mca_coll_sm_component.sm_control_size));
|
||||
((flag_num) * mca_coll_sm_component.sm_control_size))
|
||||
|
||||
/**
|
||||
* Macro to wait for the in-use flag to become idle (used by the root)
|
||||
*/
|
||||
#define FLAG_WAIT_FOR_IDLE(flag) \
|
||||
while (0 != (flag)->mcsiuf_num_procs_using) continue;
|
||||
while (0 != (flag)->mcsiuf_num_procs_using) continue
|
||||
|
||||
/**
|
||||
* Macro to wait for a flag to indicate that it's ready for this
|
||||
@ -412,32 +412,32 @@ extern int32_t mca_coll_sm_bogus_free_after;
|
||||
* been called)
|
||||
*/
|
||||
#define FLAG_WAIT_FOR_OP(flag, op) \
|
||||
while ((op) != flag->mcsiuf_operation_count) continue;
|
||||
while ((op) != flag->mcsiuf_operation_count) continue
|
||||
|
||||
/**
|
||||
* Macro to set an in-use flag with relevant data to claim it
|
||||
*/
|
||||
#define FLAG_RETAIN(flag, num_procs, op_count) \
|
||||
(flag)->mcsiuf_num_procs_using = (num_procs); \
|
||||
(flag)->mcsiuf_operation_count = (op_count);
|
||||
(flag)->mcsiuf_operation_count = (op_count)
|
||||
|
||||
/**
|
||||
* Macro to release an in-use flag from this process
|
||||
*/
|
||||
#define FLAG_RELEASE(flag) \
|
||||
opal_atomic_add(&(flag)->mcsiuf_num_procs_using, -1);
|
||||
opal_atomic_add(&(flag)->mcsiuf_num_procs_using, -1)
|
||||
|
||||
/**
|
||||
* Macro to copy a single segment in from a user buffer to a shared
|
||||
* segment
|
||||
*/
|
||||
#define COPY_FRAGMENT_IN(convertor, index, iov, max_data) \
|
||||
#define COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data) \
|
||||
(iov).iov_base = \
|
||||
(index)->mcbmi_data + \
|
||||
(rank * mca_coll_sm_component.sm_fragment_size); \
|
||||
((rank) * mca_coll_sm_component.sm_fragment_size); \
|
||||
(max_data) = (iov).iov_len = mca_coll_sm_component.sm_fragment_size; \
|
||||
ompi_convertor_pack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
|
||||
&(max_data), &mca_coll_sm_bogus_free_after);
|
||||
&(max_data), &mca_coll_sm_bogus_free_after)
|
||||
|
||||
/**
|
||||
* Macro to copy a single segment out from a shared segment to a user
|
||||
@ -447,7 +447,7 @@ extern int32_t mca_coll_sm_bogus_free_after;
|
||||
(iov).iov_base = (((char*) (index)->mcbmi_data) + \
|
||||
((src_rank) * mca_coll_sm_component.sm_fragment_size)); \
|
||||
ompi_convertor_unpack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
|
||||
&(max_data), &mca_coll_sm_bogus_free_after);
|
||||
&(max_data), &mca_coll_sm_bogus_free_after)
|
||||
|
||||
/**
|
||||
* Macro to memcpy a fragment between one shared segment and another
|
||||
@ -458,35 +458,62 @@ extern int32_t mca_coll_sm_bogus_free_after;
|
||||
((index)->mcbmi_data + \
|
||||
((src_rank) * \
|
||||
mca_coll_sm_component.sm_fragment_size)), \
|
||||
(len));
|
||||
(len))
|
||||
|
||||
/**
|
||||
* Macro to tell children that a segment is ready (normalize the
|
||||
* child's ID based on the shift used to calculate the "me" node in
|
||||
* the tree)
|
||||
* Macro to tell children that a segment is ready (normalize
|
||||
* the child's ID based on the shift used to calculate the "me" node
|
||||
* in the tree). Used in fan out opertations.
|
||||
*/
|
||||
#define PARENT_NOTIFY_CHILDREN(children, num_children, index, value) \
|
||||
for (i = 0; i < (num_children); ++i) { \
|
||||
*((size_t*) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
(mca_coll_sm_component.sm_control_size * \
|
||||
(((children)[i]->mcstn_id + root) % size)))) = (value); \
|
||||
}
|
||||
do { \
|
||||
for (i = 0; i < (num_children); ++i) { \
|
||||
*((size_t*) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
(mca_coll_sm_component.sm_control_size * \
|
||||
(((children)[i]->mcstn_id + root) % size)))) = (value); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/**
|
||||
* Macro for childen to wait for parent notification (use real rank).
|
||||
* Save the value passed and then reset it when done.
|
||||
* Save the value passed and then reset it when done. Used in fan out
|
||||
* operations.
|
||||
*/
|
||||
#define CHILD_WAIT_FOR_NOTIFY(rank, index, value) \
|
||||
while (0 == *((volatile uint32_t*) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
((rank) * mca_coll_sm_component.sm_control_size)))) { \
|
||||
continue; \
|
||||
} \
|
||||
(value) = *((volatile uint32_t*) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
((rank) * mca_coll_sm_component.sm_control_size))); \
|
||||
*((uint32_t*) (((char*) index->mcbmi_control) + \
|
||||
((rank) * mca_coll_sm_component.sm_control_size))) = 0;
|
||||
do { \
|
||||
volatile uint32_t *ptr = ((uint32_t*) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
((rank) * mca_coll_sm_component.sm_control_size))); \
|
||||
while (0 == *ptr) continue; \
|
||||
(value) = *ptr; \
|
||||
*ptr = 0; \
|
||||
} while (0)
|
||||
|
||||
/**
|
||||
* Macro for children to tell parent that the data is ready in their
|
||||
* segment. Used for fan in operations.
|
||||
*/
|
||||
#define CHILD_NOTIFY_PARENT(child_rank, parent_rank, index, value) \
|
||||
((size_t*) \
|
||||
(((char*) (index)->mcbmi_control) + \
|
||||
(mca_coll_sm_component.sm_control_size * \
|
||||
(parent_rank))))[(child_rank)] = (value)
|
||||
|
||||
/**
|
||||
* Macro for parent to wait for a specific child to tell it that the
|
||||
* data is in the child's segment. Save the value when done. Used
|
||||
* for fan in operations.
|
||||
*/
|
||||
#define PARENT_WAIT_FOR_NOTIFY_SPECIFIC(child_rank, parent_rank, index, value) \
|
||||
do { \
|
||||
volatile size_t *ptr = ((size_t *) \
|
||||
(((char*) index->mcbmi_control) + \
|
||||
(mca_coll_sm_component.sm_control_size * \
|
||||
(parent_rank)))) + child_rank; \
|
||||
while (0 == *ptr) continue; \
|
||||
(value) = *ptr; \
|
||||
*ptr = 0; \
|
||||
} while (0)
|
||||
|
||||
#endif /* MCA_COLL_SM_EXPORT_H */
|
||||
|
@ -126,7 +126,7 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
||||
|
||||
/* Copy the fragment from the user buffer to my fragment
|
||||
in the current segment */
|
||||
COPY_FRAGMENT_IN(convertor, index, iov, max_data);
|
||||
COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data);
|
||||
bytes += max_data;
|
||||
|
||||
/* Wait for the write to absolutely complete */
|
||||
@ -150,7 +150,6 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
||||
/* Non-root processes need a receive convertor to unpack from
|
||||
shared mmory to the user's buffer */
|
||||
|
||||
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
|
||||
if (OMPI_SUCCESS !=
|
||||
(ret =
|
||||
ompi_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor,
|
||||
|
@ -18,7 +18,7 @@
|
||||
*
|
||||
* Warning: this is not for the faint of heart -- don't even bother
|
||||
* reading this source code if you don't have a strong understanding
|
||||
* of nested data structures and pointer math (remeber that
|
||||
* of nested data structures and pointer math (remember that
|
||||
* associativity and order of C operations is *critical* in terms of
|
||||
* pointer math!).
|
||||
*/
|
||||
@ -41,6 +41,7 @@
|
||||
#include "ompi/mca/coll/base/base.h"
|
||||
#include "ompi/mca/mpool/mpool.h"
|
||||
#include "ompi/mca/mpool/base/base.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
#include "coll_sm.h"
|
||||
|
||||
|
||||
@ -179,6 +180,15 @@ mca_coll_sm_comm_query(struct ompi_communicator_t *comm, int *priority,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* If the number of processes in this communicator is larger than
|
||||
(mca_coll_sm_component.sm_control_size / sizeof(uint32_t)),
|
||||
then we can't handle it. */
|
||||
|
||||
if (((unsigned) ompi_comm_size(comm)) >
|
||||
mca_coll_sm_component.sm_control_size / sizeof(uint32_t)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Get our priority */
|
||||
|
||||
*priority = mca_coll_sm_component.sm_priority;
|
||||
@ -362,7 +372,15 @@ sm_module_init(struct ompi_communicator_t *comm)
|
||||
maffinity[j].mbs_start_addr = base;
|
||||
maffinity[j].mbs_len = c->sm_control_size *
|
||||
c->sm_comm_num_in_use_flags;
|
||||
memset(maffinity[j].mbs_start_addr, 0, maffinity[j].mbs_len);
|
||||
/* Set the op counts to 1 (actually any nonzero value will do)
|
||||
so that the first time children/leaf processes come
|
||||
through, they don't see a value of 0 and think that the
|
||||
root/parent has already set the count to their op number
|
||||
(i.e., 0 is the first op count value). */
|
||||
for (i = 0; i < mca_coll_sm_component.sm_comm_num_in_use_flags; ++i) {
|
||||
((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_operation_count = 1;
|
||||
((mca_coll_sm_in_use_flag_t *)base)[i].mcsiuf_num_procs_using = 0;
|
||||
}
|
||||
D(("rank 0 zeroed in-use flags (num %d, len %d): %p - %p\n",
|
||||
c->sm_comm_num_in_use_flags,
|
||||
maffinity[j].mbs_len,
|
||||
@ -617,7 +635,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
|
||||
|
||||
data->mcb_data_mpool_malloc_addr = tmp =
|
||||
c->sm_data_mpool->mpool_alloc(c->sm_data_mpool, size,
|
||||
c->sm_control_size, NULL);
|
||||
c->sm_control_size, 0, NULL);
|
||||
if (NULL == tmp) {
|
||||
/* Cleanup before returning; allow other processes in
|
||||
this communicator to learn of the failure. Note
|
||||
|
@ -17,21 +17,489 @@
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/include/constants.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/datatype/convertor.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "opal/include/sys/atomic.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "coll_sm.h"
|
||||
|
||||
|
||||
/*
|
||||
* reduce
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
static int reduce_inorder(void *sbuf, void* rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm);
|
||||
#define WANT_REDUCE_NO_ORDER 0
|
||||
#if WANT_REDUCE_NO_ORDER
|
||||
static int reduce_no_order(void *sbuf, void* rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Useful utility routine
|
||||
*/
|
||||
static inline int min(int a, int b)
|
||||
{
|
||||
return (a < b) ? a : b;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shared memory reduction.
|
||||
*
|
||||
* Function: - shared memory reduce
|
||||
* Accepts: - same as MPI_Reduce()
|
||||
* Returns: - MPI_SUCCESS or error code
|
||||
* Simply farms out to the associative or non-associative functions.
|
||||
*/
|
||||
int mca_coll_sm_reduce_intra(void *sbuf, void* rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm)
|
||||
{
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
int32_t size;
|
||||
|
||||
/* There are several possibilities:
|
||||
*
|
||||
|
||||
* 0. If the datatype is larger than a segment, fall back to basic
|
||||
* 1. If the op is user-defined, use the strict order
|
||||
* 2. If the op is intrinsic:
|
||||
* a. If the op is float-associative, use the unordered
|
||||
* b. If the op is not float-asociative:
|
||||
* i. if the data is floating point, use the strict order
|
||||
* ii. if the data is not floating point, use the unordered
|
||||
*/
|
||||
|
||||
ompi_ddt_type_size(dtype, &size);
|
||||
if (size > mca_coll_sm_component.sm_control_size) {
|
||||
return comm->c_coll_basic_module->coll_reduce(sbuf, rbuf, count,
|
||||
dtype, op, root, comm);
|
||||
}
|
||||
#if WANT_REDUCE_NO_ORDER
|
||||
else if (!ompi_op_is_intrinsic(op) ||
|
||||
(ompi_op_is_intrinsic(op) && !ompi_op_is_float_assoc(op) &&
|
||||
0 != (dtype->flags & DT_FLAG_DATA_FLOAT))) {
|
||||
return reduce_inorder(sbuf, rbuf, count, dtype, op, root, comm);
|
||||
} else {
|
||||
return reduce_no_order(sbuf, rbuf, count, dtype, op, root, comm);
|
||||
}
|
||||
#else
|
||||
else {
|
||||
return reduce_inorder(sbuf, rbuf, count, dtype, op, root, comm);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* In-order shared memory reduction.
|
||||
*
|
||||
* This function performs the reduction in order -- combining elements
|
||||
* starting with (0 operation 1), then (result operation 2), then
|
||||
* (result operation 3), etc.
|
||||
*
|
||||
* Root's algorithm:
|
||||
*
|
||||
* If our datatype is "friendly" (i.e., the representation of the
|
||||
* buffer is the same packed as it is unpacked), then the root doesn't
|
||||
* need a temporary buffer -- we can combine the operands directly
|
||||
* from the shared memory segments to the root's rbuf. Otherwise, we
|
||||
* need a receive convertor and receive each fragment into a temporary
|
||||
* buffer where we can combine that operan with the root's rbuf.
|
||||
*
|
||||
* In general, there are two loops:
|
||||
*
|
||||
* 1. loop over all fragments (which must be done in units of an
|
||||
* integer number of datatypes -- remember that if this function is
|
||||
* called, we know that the datattype is smaller than the max size of
|
||||
* a fragment, so this is definitely possible)
|
||||
*
|
||||
* 2. loop over all the processes -- 0 to (comm_size-1).
|
||||
* For process 0:
|
||||
* - if the root==0, copy the *entire* buffer (i.e., don't copy
|
||||
* fragment by fragment -- might as well copy the entire thing) the
|
||||
* first time through the algorithm, and no-op every other time
|
||||
* - else, copy from the shmem fragment to the out buffer
|
||||
* For all other proceses:
|
||||
* - if root==i, combine the relevant fragment from the sbuf to the
|
||||
* relevant fragment on the rbuf
|
||||
* - else, if the datatype is friendly, combine relevant fragment from
|
||||
* the shmem segment to the relevant fragment in the rbuf. Otherwise,
|
||||
* use the convertor to copy the fragment out of shmem into a temp
|
||||
* buffer and do the combination from there to the rbuf.
|
||||
*
|
||||
* If we don't have a friendly datatype, then free the temporary
|
||||
* buffer at the end.
|
||||
*/
|
||||
static int reduce_inorder(void *sbuf, void* rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm)
|
||||
{
|
||||
struct iovec iov;
|
||||
mca_coll_base_comm_t *data = comm->c_coll_selected_data;
|
||||
int ret, rank, size;
|
||||
int flag_num, segment_num, max_segment_num;
|
||||
size_t total_size, max_data, bytes;
|
||||
mca_coll_sm_in_use_flag_t *flag;
|
||||
ompi_convertor_t convertor;
|
||||
mca_coll_base_mpool_index_t *index;
|
||||
mca_coll_sm_tree_node_t *me;
|
||||
int32_t ddt_size;
|
||||
size_t segment_ddt_count, segment_ddt_bytes, zero = 0;
|
||||
|
||||
/* Setup some identities */
|
||||
|
||||
rank = ompi_comm_rank(comm);
|
||||
size = ompi_comm_size(comm);
|
||||
|
||||
me = &data->mcb_tree[(rank + size - root) % size];
|
||||
|
||||
/* Figure out how much we should have the convertor copy. We need
|
||||
to have it be in units of a datatype -- i.e., we only want to
|
||||
copy a whole datatype worth of data or none at all (we've
|
||||
already guaranteed above that the datatype is not larger than a
|
||||
segment, so we'll at least get 1). */
|
||||
|
||||
ompi_ddt_type_size(dtype, &ddt_size);
|
||||
segment_ddt_count = mca_coll_sm_component.sm_fragment_size / ddt_size;
|
||||
iov.iov_len = segment_ddt_bytes = segment_ddt_count * ddt_size;
|
||||
total_size = ddt_size * count;
|
||||
|
||||
bytes = 0;
|
||||
|
||||
/* Only have one top-level decision as to whether I'm the root or
|
||||
not. Do this at the slight expense of repeating a little logic
|
||||
-- but it's better than a conditional branch in every loop
|
||||
iteration. */
|
||||
|
||||
/*********************************************************************
|
||||
* Root
|
||||
*********************************************************************/
|
||||
|
||||
if (root == rank) {
|
||||
char *reduce_temp_buffer, *free_buffer, *reduce_target;
|
||||
long true_lb, true_extent, lb, extent;
|
||||
char *inplace_temp;
|
||||
int peer;
|
||||
int count_left = count;
|
||||
int frag_num = 0;
|
||||
bool first_operation = true;
|
||||
|
||||
/* If the datatype is the same packed as it is unpacked, we
|
||||
can save a memory copy and just do the reduction operation
|
||||
directly from the shared memory segment. However, if the
|
||||
representation is not the same, then we need to get a
|
||||
receive convertor and a temporary buffer to receive
|
||||
into. */
|
||||
|
||||
ompi_ddt_get_extent(dtype, &lb, &extent);
|
||||
ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
|
||||
if (ompi_ddt_is_contiguous_memory_layout(dtype, count)) {
|
||||
free_buffer = NULL;
|
||||
} else {
|
||||
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
|
||||
|
||||
/* See lengthy comment in coll basic reduce about
|
||||
explanation for how to malloc the extra buffer. Note
|
||||
that we do not need a buffer big enough to hold "count"
|
||||
instances of the datatype (i.e., big enough to hold the
|
||||
entire user buffer) -- we only need to be able to hold
|
||||
"segment_ddt_count" instances (i.e., the number of
|
||||
instances that can be held in a single fragment) */
|
||||
|
||||
free_buffer = malloc(true_extent +
|
||||
(segment_ddt_count - 1) * extent);
|
||||
if (NULL == free_buffer) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
reduce_temp_buffer = free_buffer - lb;
|
||||
|
||||
/* Trickery here: we use a potentially smaller count than
|
||||
the user count -- use the largest count that is <=
|
||||
user's count that will fit within a single segment. */
|
||||
|
||||
if (OMPI_SUCCESS !=
|
||||
(ret = ompi_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor,
|
||||
dtype,
|
||||
segment_ddt_count,
|
||||
reduce_temp_buffer,
|
||||
&convertor))) {
|
||||
free(free_buffer);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* If we're a) doing MPI_IN_PLACE (which means we're the root
|
||||
-- wouldn't have gotten down here with MPI_IN_PLACE if we
|
||||
weren't the root), and b) we're not rank 0, then we need to
|
||||
copy the rbuf into a temporary buffer and use that as the
|
||||
sbuf */
|
||||
|
||||
if (MPI_IN_PLACE == sbuf && 0 != rank) {
|
||||
inplace_temp = malloc(true_extent + (count - 1) * extent);
|
||||
if (NULL == inplace_temp) {
|
||||
if (NULL != free_buffer) {
|
||||
free(free_buffer);
|
||||
}
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
sbuf = inplace_temp - lb;
|
||||
} else {
|
||||
inplace_temp = NULL;
|
||||
}
|
||||
|
||||
/* Main loop over receiving / reducing fragments */
|
||||
|
||||
do {
|
||||
|
||||
flag_num = (data->mcb_operation_count++ %
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags);
|
||||
|
||||
FLAG_SETUP(flag_num, flag, data);
|
||||
FLAG_WAIT_FOR_IDLE(flag);
|
||||
FLAG_RETAIN(flag, size, data->mcb_operation_count - 1);
|
||||
|
||||
/* Loop over all the segments in this set */
|
||||
|
||||
segment_num = flag_num *
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||
max_segment_num = (flag_num + 1) *
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||
reduce_target = (((char*) rbuf) + (frag_num * segment_ddt_bytes));
|
||||
|
||||
do {
|
||||
|
||||
/* Loop over the processes, receiving and reducing
|
||||
from them in order */
|
||||
|
||||
for (peer = 0; peer < size; ++peer) {
|
||||
|
||||
/* Handle the case where the source is this process */
|
||||
|
||||
if (rank == peer) {
|
||||
if (peer == 0) {
|
||||
/* If we're the root *and* the first
|
||||
process to be combined *and* this is
|
||||
the first segment in the entire
|
||||
algorithm, then just copy the whole
|
||||
buffer. That way, we never need to
|
||||
copy from this process again (i.e., do
|
||||
the copy all at once since all the data
|
||||
is local, and then don't worry about it
|
||||
for the rest of the algorithm) */
|
||||
if (first_operation) {
|
||||
first_operation = false;
|
||||
if (MPI_IN_PLACE != sbuf) {
|
||||
ompi_ddt_copy_content_same_ddt(dtype,
|
||||
count,
|
||||
reduce_target, sbuf);
|
||||
D(("root copied entire buffer to rbuf (contig ddt, count %d) FIRST OPERATION\n", count));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* Otherwise, I'm not the first process,
|
||||
so instead of copying, combine in the
|
||||
next fragment */
|
||||
D(("root combiningn fragment from shmem (contig ddt): count %d (left %d, seg %d)\n", min(count_left, segment_ddt_count), count_left, segment_ddt_count));
|
||||
ompi_op_reduce(op,
|
||||
((char *) sbuf) +
|
||||
frag_num * segment_ddt_bytes,
|
||||
reduce_target,
|
||||
min(count_left, segment_ddt_count),
|
||||
dtype);
|
||||
}
|
||||
}
|
||||
|
||||
/* Now handle the case where the source is not
|
||||
this process. Wait for the process to copy to
|
||||
the segment. */
|
||||
|
||||
else {
|
||||
index = &(data->mcb_mpool_index[segment_num]);
|
||||
PARENT_WAIT_FOR_NOTIFY_SPECIFIC(peer, rank,
|
||||
index, max_data);
|
||||
|
||||
/* If we don't need an extra buffer, then do the
|
||||
reduction operation on the fragment straight
|
||||
from the shmem. */
|
||||
|
||||
if (NULL == free_buffer) {
|
||||
/* If this is the first process, just copy */
|
||||
if (0 == peer) {
|
||||
D(("root: special case -- copy from rank 0 shemem to reduce_target (%d bytes)\n", max_data));
|
||||
memcpy(reduce_target, index->mcbmi_data,
|
||||
max_data);
|
||||
}
|
||||
|
||||
/* If this is not the first process, do
|
||||
the reduction */
|
||||
else {
|
||||
|
||||
D(("root combining %d elements in shmem from peer %d\n",
|
||||
max_data / ddt_size, peer));
|
||||
ompi_op_reduce(op,
|
||||
(index->mcbmi_data +
|
||||
(peer * mca_coll_sm_component.sm_fragment_size)),
|
||||
reduce_target, max_data / ddt_size,
|
||||
dtype);
|
||||
}
|
||||
}
|
||||
|
||||
/* Otherwise, unpack the fragment to the temporary
|
||||
buffer and then do the reduction from there */
|
||||
|
||||
else {
|
||||
/* If this is the first process, then just
|
||||
copy out to the target buffer */
|
||||
if (0 == peer) {
|
||||
/* JMS: this is clearly inefficient --
|
||||
can avoid one of the memory copies
|
||||
here; have a pending question to
|
||||
george about this */
|
||||
D(("root: special case -- unpack and copy from rank 0 to reduce_target\n"));
|
||||
COPY_FRAGMENT_OUT(convertor, peer, index,
|
||||
iov, max_data);
|
||||
ompi_convertor_set_position(&convertor, &zero);
|
||||
|
||||
ompi_ddt_copy_content_same_ddt(dtype,
|
||||
max_data / ddt_size,
|
||||
reduce_target,
|
||||
iov.iov_base);
|
||||
}
|
||||
|
||||
/* Otherwise, copy to the temp buffer and
|
||||
then do the reduction */
|
||||
else {
|
||||
D(("root combining %d elements in copy out buffer from peer %d\n",
|
||||
max_data / ddt_size, peer));
|
||||
/* Unpack the fragment into my temporary
|
||||
buffer */
|
||||
COPY_FRAGMENT_OUT(convertor, peer, index,
|
||||
iov, max_data);
|
||||
ompi_convertor_set_position(&convertor, &zero);
|
||||
|
||||
/* Do the reduction on this fragment */
|
||||
ompi_op_reduce(op, reduce_temp_buffer,
|
||||
reduce_target,
|
||||
max_data / ddt_size,
|
||||
dtype);
|
||||
}
|
||||
}
|
||||
} /* whether this process was me or not */
|
||||
} /* loop over all proceses */
|
||||
|
||||
/* We've iterated through all the processes -- now we
|
||||
move on to the next segment */
|
||||
|
||||
count_left -= segment_ddt_count;
|
||||
bytes += segment_ddt_bytes;
|
||||
++segment_num;
|
||||
++frag_num;
|
||||
reduce_target += segment_ddt_bytes;
|
||||
} while (bytes < total_size && segment_num < max_segment_num);
|
||||
|
||||
/* Root is now done with this set of segments */
|
||||
FLAG_RELEASE(flag);
|
||||
} while (bytes < total_size);
|
||||
|
||||
/* Kill the convertor, if we had one */
|
||||
|
||||
if (NULL != free_buffer) {
|
||||
OBJ_DESTRUCT(&convertor);
|
||||
free(free_buffer);
|
||||
}
|
||||
if (NULL != inplace_temp) {
|
||||
free(inplace_temp);
|
||||
}
|
||||
}
|
||||
|
||||
/*********************************************************************
|
||||
* Non-root
|
||||
*********************************************************************/
|
||||
|
||||
else {
|
||||
int parent_rank = (me->mcstn_parent->mcstn_id + root) % size;
|
||||
|
||||
/* Here we get a convertor for the full count that the user
|
||||
provided (as opposed to the convertor that the root got) */
|
||||
|
||||
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
|
||||
if (OMPI_SUCCESS !=
|
||||
(ret =
|
||||
ompi_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
|
||||
dtype,
|
||||
count,
|
||||
sbuf,
|
||||
&convertor))) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Loop over sending fragments to the root */
|
||||
|
||||
do {
|
||||
flag_num = (data->mcb_operation_count %
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags);
|
||||
|
||||
/* Wait for the root to mark this set of segments as
|
||||
ours */
|
||||
FLAG_SETUP(flag_num, flag, data);
|
||||
FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count);
|
||||
++data->mcb_operation_count;
|
||||
|
||||
/* Loop over all the segments in this set */
|
||||
|
||||
segment_num = flag_num *
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||
max_segment_num = (flag_num + 1) *
|
||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||
do {
|
||||
index = &(data->mcb_mpool_index[segment_num]);
|
||||
|
||||
/* Copy from the user's buffer to the shared mem
|
||||
segment */
|
||||
COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data);
|
||||
bytes += max_data;
|
||||
|
||||
/* Wait for the write to absolutely complete */
|
||||
opal_atomic_wmb();
|
||||
|
||||
/* Tell my parent that this fragment is ready */
|
||||
CHILD_NOTIFY_PARENT(rank, parent_rank, index, max_data);
|
||||
|
||||
++segment_num;
|
||||
} while (bytes < total_size && segment_num < max_segment_num);
|
||||
|
||||
/* We're finished with this set of segments */
|
||||
FLAG_RELEASE(flag);
|
||||
} while (bytes < total_size);
|
||||
|
||||
/* Kill the convertor */
|
||||
|
||||
OBJ_DESTRUCT(&convertor);
|
||||
}
|
||||
|
||||
/* All done */
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
#if WANT_REDUCE_NO_ORDER
|
||||
/**
|
||||
* Unordered shared memory reduction.
|
||||
*
|
||||
* This function performs the reduction in whatever order the operands
|
||||
* arrive.
|
||||
*/
|
||||
static int reduce_no_order(void *sbuf, void* rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm)
|
||||
{
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user