Simplify the bcast code by factoring its abstract actions out into
macros -- this will help with the other algorithms. This commit was SVN r7214.
Этот коммит содержится в:
родитель
3e002203a0
Коммит
9302f924ea
@ -284,7 +284,6 @@ extern "C" {
|
|||||||
*/
|
*/
|
||||||
extern mca_coll_sm_component_t mca_coll_sm_component;
|
extern mca_coll_sm_component_t mca_coll_sm_component;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* coll module functions
|
* coll module functions
|
||||||
*/
|
*/
|
||||||
@ -385,4 +384,109 @@ extern "C" {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global variables used in the macros (essentially constants, so
|
||||||
|
* these are thread safe)
|
||||||
|
*/
|
||||||
|
extern uint32_t mca_coll_sm_iov_size;
|
||||||
|
extern int32_t mca_coll_sm_bogus_free_after;
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Macro to setup flag usage: point (flag) at in-use flag number
 * (flag_num) of the communicator data (data).
 *
 * Flag number n is located n * mca_coll_sm_component.sm_control_size
 * bytes past (data)->mcb_in_use_flags, so the address is computed with
 * byte arithmetic rather than array indexing.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and is safe in an unbraced if/else.  Invoke it
 * with a trailing semicolon.
 */
#define FLAG_SETUP(flag_num, flag, data) \
    do { \
        (flag) = (mca_coll_sm_in_use_flag_t*) \
            (((char *) (data)->mcb_in_use_flags) + \
             ((flag_num) * mca_coll_sm_component.sm_control_size)); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to wait for the in-use flag to become idle (used by the root).
 *
 * Busy-waits until (flag)->mcsiuf_num_procs_using reads as 0.
 *
 * NOTE(review): the spin only observes updates made elsewhere if the
 * field is re-read on every iteration (e.g. it is declared volatile) --
 * confirm against the declaration of mca_coll_sm_in_use_flag_t.
 *
 * Wrapped in do { } while (0) so that it is a single statement and is
 * safe in an unbraced if/else.  Invoke it with a trailing semicolon.
 */
#define FLAG_WAIT_FOR_IDLE(flag) \
    do { \
        while (0 != (flag)->mcsiuf_num_procs_using) { \
            continue; \
        } \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to wait for a flag to indicate that it's ready for this
 * operation (used by non-root processes to know when FLAG_RETAIN() has
 * been called).
 *
 * Busy-waits until (flag)->mcsiuf_operation_count equals (op).
 *
 * (flag) is parenthesized so that any pointer-valued expression may be
 * passed, and the body is wrapped in do { } while (0) so that the macro
 * is a single statement.  Invoke it with a trailing semicolon.
 *
 * NOTE(review): as with any spin loop, this relies on the field being
 * re-read on every iteration -- confirm it is declared volatile.
 */
#define FLAG_WAIT_FOR_OP(flag, op) \
    do { \
        while ((op) != (flag)->mcsiuf_operation_count) { \
            continue; \
        } \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to set an in-use flag with relevant data to claim it.
 *
 * Stores (num_procs) into (flag)->mcsiuf_num_procs_using and then
 * (op_count) into (flag)->mcsiuf_operation_count, in that order.
 *
 * Both stores are wrapped in do { } while (0) so that they form one
 * statement; without the wrapper only the first store would be guarded
 * by an unbraced if.  Invoke it with a trailing semicolon.
 */
#define FLAG_RETAIN(flag, num_procs, op_count) \
    do { \
        (flag)->mcsiuf_num_procs_using = (num_procs); \
        (flag)->mcsiuf_operation_count = (op_count); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to release an in-use flag from this process.
 *
 * Adds -1 to (flag)->mcsiuf_num_procs_using via opal_atomic_add().
 *
 * Wrapped in do { } while (0) so that it is a single statement and is
 * safe in an unbraced if/else.  Invoke it with a trailing semicolon.
 */
#define FLAG_RELEASE(flag) \
    do { \
        opal_atomic_add(&(flag)->mcsiuf_num_procs_using, -1); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to copy a single segment in from a user buffer to a shared
 * segment.
 *
 * Points (iov) at this process's fragment inside (index)->mcbmi_data,
 * i.e. rank * mca_coll_sm_component.sm_fragment_size bytes from its
 * start, sets both (iov).iov_len and (max_data) to sm_fragment_size,
 * and then calls ompi_convertor_pack() on (convertor).  The return
 * value of ompi_convertor_pack() is not examined.
 *
 * NOTE: "rank" is not a macro parameter; a variable of that name must
 * be in scope at the point of use.
 *
 * The offset is applied to a char* (as in COPY_FRAGMENT_OUT) so that it
 * is always counted in bytes.  The three statements are wrapped in
 * do { } while (0) so that the macro is a single statement; invoke it
 * with a trailing semicolon.
 */
#define COPY_FRAGMENT_IN(convertor, index, iov, max_data) \
    do { \
        (iov).iov_base = \
            (((char*) (index)->mcbmi_data) + \
             (rank * mca_coll_sm_component.sm_fragment_size)); \
        (max_data) = (iov).iov_len = \
            mca_coll_sm_component.sm_fragment_size; \
        ompi_convertor_pack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
                            &(max_data), &mca_coll_sm_bogus_free_after); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to copy a single segment out from a shared segment to a user
 * buffer.
 *
 * Points (iov) at the fragment belonging to (src_rank) inside
 * (index)->mcbmi_data, i.e. (src_rank) *
 * mca_coll_sm_component.sm_fragment_size bytes from its start, and then
 * calls ompi_convertor_unpack() on (convertor).  The return value of
 * ompi_convertor_unpack() is not examined.
 *
 * On entry (max_data) must hold the number of bytes available in the
 * fragment; it is also stored into (iov).iov_len so that the iovec
 * handed to the convertor describes exactly that many bytes instead of
 * whatever length was previously left in (iov).
 *
 * The statements are wrapped in do { } while (0) so that the macro is
 * a single statement; invoke it with a trailing semicolon.
 */
#define COPY_FRAGMENT_OUT(convertor, src_rank, index, iov, max_data) \
    do { \
        (iov).iov_base = (((char*) (index)->mcbmi_data) + \
                          ((src_rank) * \
                           mca_coll_sm_component.sm_fragment_size)); \
        (iov).iov_len = (max_data); \
        ompi_convertor_unpack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
                              &(max_data), &mca_coll_sm_bogus_free_after); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to memcpy a fragment between one shared segment and another.
 *
 * Copies (len) bytes from the fragment of (src_rank) to the fragment of
 * (dest_rank); both live inside (index)->mcbmi_data, each at
 * rank * mca_coll_sm_component.sm_fragment_size bytes from its start.
 *
 * memcpy() is used, so the source and destination must not overlap:
 * (src_rank) and (dest_rank) must differ and (len) must not exceed
 * sm_fragment_size.
 *
 * The offsets are applied to a char* (as in COPY_FRAGMENT_OUT) so that
 * they are always counted in bytes.  Wrapped in do { } while (0) so
 * that the macro is a single statement and is safe in an unbraced
 * if/else; invoke it with a trailing semicolon.
 */
#define COPY_FRAGMENT_BETWEEN(src_rank, dest_rank, index, len) \
    do { \
        memcpy((((char*) (index)->mcbmi_data) + \
                ((dest_rank) * mca_coll_sm_component.sm_fragment_size)), \
               (((char*) (index)->mcbmi_data) + \
                ((src_rank) * mca_coll_sm_component.sm_fragment_size)), \
               (len)); \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro to tell children that a segment is ready (normalize the
 * child's ID based on the shift used to calculate the "me" node in
 * the tree).
 *
 * For each of the first (num_children) entries of (children), stores
 * (value) into that child's control slot inside
 * (index)->mcbmi_control.  The slot of a child is located
 * mca_coll_sm_component.sm_control_size *
 * ((child->mcstn_id + root) % size) bytes from the start.
 *
 * The store is done through a volatile uint32_t pointer, the same
 * width that CHILD_WAIT_FOR_NOTIFY uses to poll and reset the slot.
 * Storing through a wider type would only be seen by the child on
 * hosts where the low-order bytes come first, and would touch bytes
 * the child never resets.  (value) is therefore converted to uint32_t.
 *
 * NOTE: "i", "root" and "size" are not macro parameters; variables of
 * those names must be in scope at the point of use, and "i" is
 * overwritten (it equals (num_children) afterwards).
 *
 * Wrapped in do { } while (0) so that the macro is a single statement;
 * invoke it with a trailing semicolon.
 */
#define PARENT_NOTIFY_CHILDREN(children, num_children, index, value) \
    do { \
        for (i = 0; i < (num_children); ++i) { \
            *((volatile uint32_t*) \
              (((char*) (index)->mcbmi_control) + \
               (mca_coll_sm_component.sm_control_size * \
                (((children)[i]->mcstn_id + root) % size)))) = \
                (uint32_t) (value); \
        } \
    } while (0)
|
||||||
|
|
||||||
|
/**
 * Macro for children to wait for parent notification (use real rank).
 * Save the value passed and then reset it when done.
 *
 * The control slot of (rank) is the uint32_t located
 * (rank) * mca_coll_sm_component.sm_control_size bytes from the start
 * of (index)->mcbmi_control.  The macro busy-waits until that slot is
 * non-zero, copies its contents into (value), and then stores 0 back
 * into the slot.
 *
 * The slot address is computed once and every access, including the
 * reset, goes through the same volatile pointer.  The statements are
 * wrapped in do { } while (0) so that the macro is a single statement;
 * invoke it with a trailing semicolon.
 */
#define CHILD_WAIT_FOR_NOTIFY(rank, index, value) \
    do { \
        volatile uint32_t *cwfn_control_ = (volatile uint32_t*) \
            (((char*) (index)->mcbmi_control) + \
             ((rank) * mca_coll_sm_component.sm_control_size)); \
        while (0 == *cwfn_control_) { \
            continue; \
        } \
        (value) = *cwfn_control_; \
        *cwfn_control_ = 0; \
    } while (0)
|
||||||
|
|
||||||
#endif /* MCA_COLL_SM_EXPORT_H */
|
#endif /* MCA_COLL_SM_EXPORT_H */
|
||||||
|
@ -46,17 +46,15 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
struct ompi_communicator_t *comm)
|
struct ompi_communicator_t *comm)
|
||||||
{
|
{
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
uint32_t iov_size = 1;
|
|
||||||
mca_coll_base_comm_t *data = comm->c_coll_selected_data;
|
mca_coll_base_comm_t *data = comm->c_coll_selected_data;
|
||||||
int i, ret, rank, size, num_children;
|
int i, ret, rank, size, num_children, src_rank;
|
||||||
int flag_num, segment_num, max_segment_num;
|
int flag_num, segment_num, max_segment_num;
|
||||||
int parent_rank, child_rank;
|
int parent_rank;
|
||||||
size_t total_size, max_data, bytes;
|
size_t total_size, max_data, bytes;
|
||||||
volatile uint32_t *my_control;
|
|
||||||
mca_coll_sm_in_use_flag_t *flag;
|
mca_coll_sm_in_use_flag_t *flag;
|
||||||
ompi_convertor_t convertor;
|
ompi_convertor_t convertor;
|
||||||
mca_coll_sm_tree_node_t *me, *parent, **children;
|
mca_coll_sm_tree_node_t *me, *parent, **children;
|
||||||
int32_t bogus_free_after = 0;
|
mca_coll_base_mpool_index_t *index;
|
||||||
|
|
||||||
/* Setup some identities */
|
/* Setup some identities */
|
||||||
|
|
||||||
@ -106,27 +104,9 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
flag_num = (data->mcb_operation_count++ %
|
flag_num = (data->mcb_operation_count++ %
|
||||||
mca_coll_sm_component.sm_comm_num_in_use_flags);
|
mca_coll_sm_component.sm_comm_num_in_use_flags);
|
||||||
|
|
||||||
/* Wait for the set of segments to become available */
|
FLAG_SETUP(flag_num, flag, data);
|
||||||
flag = (mca_coll_sm_in_use_flag_t*)
|
FLAG_WAIT_FOR_IDLE(flag);
|
||||||
(((char *) data->mcb_in_use_flags) +
|
FLAG_RETAIN(flag, size - 1, data->mcb_operation_count - 1);
|
||||||
(flag_num * mca_coll_sm_component.sm_control_size));
|
|
||||||
D(("root waiting for in_use flag %d (value %d), %p\n",
|
|
||||||
flag_num, flag->mcsiuf_num_procs_using, flag));
|
|
||||||
while (0 != flag->mcsiuf_num_procs_using) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
D(("root got in_use flag %d (value %d), %p\n",
|
|
||||||
flag_num, flag->mcsiuf_num_procs_using, flag));
|
|
||||||
|
|
||||||
/* Now that the set of segments is availble, mark it as
|
|
||||||
used. No need to include the root in the count (we'd
|
|
||||||
only have to decrement it later). Don't need a write
|
|
||||||
barrier here -- we have another later that will
|
|
||||||
guarantee that the write has completed, if
|
|
||||||
necessary. */
|
|
||||||
|
|
||||||
flag->mcsiuf_num_procs_using = size - 1;
|
|
||||||
flag->mcsiuf_operation_count = data->mcb_operation_count - 1;
|
|
||||||
|
|
||||||
/* Loop over all the segments in this set */
|
/* Loop over all the segments in this set */
|
||||||
|
|
||||||
@ -135,40 +115,19 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
max_segment_num = (flag_num + 1) *
|
max_segment_num = (flag_num + 1) *
|
||||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||||
do {
|
do {
|
||||||
|
index = &(data->mcb_mpool_index[segment_num]);
|
||||||
|
|
||||||
/* Copy the fragment from the user buffer to my fragment
|
/* Copy the fragment from the user buffer to my fragment
|
||||||
in the current segment */
|
in the current segment */
|
||||||
iov.iov_base =
|
COPY_FRAGMENT_IN(convertor, index, iov, max_data);
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_data +
|
|
||||||
(rank * mca_coll_sm_component.sm_fragment_size);
|
|
||||||
max_data = iov.iov_len;
|
|
||||||
D(("root copying %lu bytes to data fan out, seg %d: %p\n",
|
|
||||||
(unsigned long) iov.iov_len, segment_num, iov.iov_base));
|
|
||||||
ompi_convertor_pack(&convertor, &iov, &iov_size,
|
|
||||||
&max_data, &bogus_free_after);
|
|
||||||
bytes += max_data;
|
bytes += max_data;
|
||||||
|
|
||||||
/* Wait for the write to absolutely complete */
|
/* Wait for the write to absolutely complete */
|
||||||
opal_atomic_wmb();
|
opal_atomic_wmb();
|
||||||
|
|
||||||
/* Tell my children that this fragment is ready (be
|
/* Tell my children that this fragment is ready */
|
||||||
sure to normalize the child's ID based on the shift
|
PARENT_NOTIFY_CHILDREN(children, num_children, index,
|
||||||
we did above to calculate the "me" node in the
|
max_data);
|
||||||
tree) */
|
|
||||||
for (i = 0; i < num_children; ++i) {
|
|
||||||
child_rank = (children[i]->mcstn_id + root) % size;
|
|
||||||
*((size_t*)
|
|
||||||
(((char*)
|
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_control) +
|
|
||||||
(mca_coll_sm_component.sm_control_size *
|
|
||||||
child_rank))) = max_data;
|
|
||||||
D(("root sent notice to child %d (vrank %d), control to %p\n",
|
|
||||||
i, children[i]->mcstn_id,
|
|
||||||
(((char*)
|
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_control) +
|
|
||||||
(mca_coll_sm_component.sm_control_size *
|
|
||||||
child_rank))));
|
|
||||||
}
|
|
||||||
|
|
||||||
++segment_num;
|
++segment_num;
|
||||||
} while (bytes < total_size && segment_num < max_segment_num);
|
} while (bytes < total_size && segment_num < max_segment_num);
|
||||||
@ -207,13 +166,8 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
|
|
||||||
/* Wait for the root to mark this set of segments as
|
/* Wait for the root to mark this set of segments as
|
||||||
ours */
|
ours */
|
||||||
flag = (mca_coll_sm_in_use_flag_t*)
|
FLAG_SETUP(flag_num, flag, data);
|
||||||
(((char *) data->mcb_in_use_flags) +
|
FLAG_WAIT_FOR_OP(flag, data->mcb_operation_count);
|
||||||
(flag_num * mca_coll_sm_component.sm_control_size));
|
|
||||||
D(("rank %d waiting for root to claim in-use flag %d, %p (op count %d)\n", rank, flag_num, flag, data->mcb_operation_count));
|
|
||||||
while (data->mcb_operation_count != flag->mcsiuf_operation_count) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
++data->mcb_operation_count;
|
++data->mcb_operation_count;
|
||||||
|
|
||||||
/* Loop over all the segments in this set */
|
/* Loop over all the segments in this set */
|
||||||
@ -224,71 +178,32 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
mca_coll_sm_component.sm_comm_num_in_use_flags;
|
||||||
do {
|
do {
|
||||||
|
|
||||||
/* Pre-calculate some pointers */
|
/* Pre-calculate some values */
|
||||||
|
|
||||||
parent_rank = (parent->mcstn_id + root) % size;
|
parent_rank = (parent->mcstn_id + root) % size;
|
||||||
D(("rank %d parent rank is %d\n", rank, parent_rank));
|
index = &(data->mcb_mpool_index[segment_num]);
|
||||||
my_control = (uint32_t *)
|
|
||||||
(((char*)
|
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_control) +
|
|
||||||
(rank * mca_coll_sm_component.sm_control_size));
|
|
||||||
|
|
||||||
/* Wait for the fragment: the parent will mark the segment
|
/* Wait for my parent to tell me that the segment is ready */
|
||||||
as ready */
|
CHILD_WAIT_FOR_NOTIFY(rank, index, max_data);
|
||||||
D(("rank %d waiting for fragment in segment %d (control %p)\n",
|
|
||||||
rank, segment_num, (char*) my_control));
|
|
||||||
while (0 == *my_control) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
max_data = *my_control;
|
|
||||||
D(("rank %d: fragment ready in segment %d\n", rank, segment_num));
|
|
||||||
|
|
||||||
/* If I have children, send the data to them */
|
/* If I have children, send the data to them */
|
||||||
if (num_children > 0) {
|
if (num_children > 0) {
|
||||||
/* No need to wait for the segment to become
|
/* Copy the fragment from the parent's portion in
|
||||||
available -- the root has already claimed it
|
the segment to my portion in the segment. */
|
||||||
and we're all already using it. So copy the
|
COPY_FRAGMENT_BETWEEN(parent_rank, rank, index, max_data);
|
||||||
fragment from the parent's portion in the
|
|
||||||
segment to my portion in the segment. This is
|
|
||||||
a simply memcpy because it's already been
|
|
||||||
packed into the parent's segment. */
|
|
||||||
memcpy(/* my data fan out section in the segment */
|
|
||||||
(data->mcb_mpool_index[segment_num].mcbmi_data +
|
|
||||||
(rank * mca_coll_sm_component.sm_fragment_size)),
|
|
||||||
/* parent's fan out section in the segment */
|
|
||||||
(data->mcb_mpool_index[segment_num].mcbmi_data +
|
|
||||||
(parent_rank *
|
|
||||||
mca_coll_sm_component.sm_fragment_size)),
|
|
||||||
/* length */
|
|
||||||
*my_control);
|
|
||||||
D(("rank %d memcopy'ed fragment (%p) to my data fan out (%lu bytes)\n", rank,
|
|
||||||
(data->mcb_mpool_index[segment_num].mcbmi_data +
|
|
||||||
(parent_rank * mca_coll_sm_component.sm_fragment_size)),
|
|
||||||
(unsigned long) *my_control));
|
|
||||||
|
|
||||||
/* Wait for the write to absolutely complete */
|
/* Wait for the write to absolutely complete */
|
||||||
opal_atomic_wmb();
|
opal_atomic_wmb();
|
||||||
|
|
||||||
/* Tell my children that this fragment is ready */
|
/* Tell my children that this fragment is ready */
|
||||||
for (i = 0; i < num_children; ++i) {
|
PARENT_NOTIFY_CHILDREN(children, num_children, index,
|
||||||
child_rank = (children[i]->mcstn_id + root) % size;
|
max_data);
|
||||||
*((size_t*)
|
|
||||||
(((char*)
|
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_control) +
|
|
||||||
(mca_coll_sm_component.sm_control_size *
|
|
||||||
child_rank))) = *my_control;
|
|
||||||
D(("rank %d notifying child %d (vrank %d, rank %d)\n",
|
|
||||||
rank, i, children[i]->mcstn_id, child_rank));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Set the "copy from buffer" to be my local
|
/* Set the "copy from buffer" to be my local
|
||||||
segment buffer so that we don't potentially
|
segment buffer so that we don't potentially
|
||||||
incur a non-local memory copy from the parent's
|
incur a non-local memory copy from the parent's
|
||||||
fan out data segment [again] when copying to
|
fan out data segment [again] when copying to
|
||||||
the user's buffer */
|
the user's buffer */
|
||||||
iov.iov_base =
|
src_rank = rank;
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_data +
|
|
||||||
(rank * mca_coll_sm_component.sm_fragment_size);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If I don't have any children, set the "copy from
|
/* If I don't have any children, set the "copy from
|
||||||
@ -296,20 +211,12 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
directly from my parent */
|
directly from my parent */
|
||||||
|
|
||||||
else {
|
else {
|
||||||
iov.iov_base =
|
src_rank = parent_rank;
|
||||||
(((char*)
|
|
||||||
data->mcb_mpool_index[segment_num].mcbmi_data) +
|
|
||||||
(parent_rank *
|
|
||||||
mca_coll_sm_component.sm_fragment_size));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Copy to my output buffer */
|
/* Copy to my output buffer */
|
||||||
D(("rank %d convertor copied from parent data %p to user buffer (%lu bytes)\n",
|
COPY_FRAGMENT_OUT(convertor, src_rank, index, iov, max_data);
|
||||||
rank, iov.iov_base, (unsigned long) iov.iov_len));
|
|
||||||
ompi_convertor_unpack(&convertor, &iov, &iov_size,
|
|
||||||
&max_data, &bogus_free_after);
|
|
||||||
|
|
||||||
*my_control = 0;
|
|
||||||
bytes += max_data;
|
bytes += max_data;
|
||||||
++segment_num;
|
++segment_num;
|
||||||
} while (bytes < total_size && segment_num < max_segment_num);
|
} while (bytes < total_size && segment_num < max_segment_num);
|
||||||
@ -319,10 +226,7 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
|
|||||||
opal_atomic_wmb();
|
opal_atomic_wmb();
|
||||||
|
|
||||||
/* We're finished with this set of segments */
|
/* We're finished with this set of segments */
|
||||||
|
FLAG_RELEASE(flag);
|
||||||
D(("rank %d done with in-use flag %d (value %d)\n",
|
|
||||||
rank, flag_num, flag->mcsiuf_num_procs_using));
|
|
||||||
opal_atomic_add(&flag->mcsiuf_num_procs_using, -1);
|
|
||||||
} while (bytes < total_size);
|
} while (bytes < total_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,13 @@
|
|||||||
#include "coll_sm.h"
|
#include "coll_sm.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Global variables
|
||||||
|
*/
|
||||||
|
uint32_t mca_coll_sm_iov_size = 1;
|
||||||
|
int32_t mca_coll_sm_bogus_free_after = 0;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local functions
|
* Local functions
|
||||||
*/
|
*/
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user