- Remove some silly compiler warnings
- Move the "process 0" logic out of the main loop in reduce to make the code a bit less complex (at the price of slight code duplication, but it is now significantly easier to read)
- Fix problem with the uniqueness guarantee in the bootstrap mpool -- using the CID alone was not sufficient to guarantee uniqueness; now use a (CID, rank 0 process name) tuple to check for uniqueness (sketched just below)
- Made a few debugging help changes in coll_sm.h; especially helps debugging on uniprocessors

This commit was SVN r7599.
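To make the uniqueness fix concrete, here is a minimal, self-contained C sketch of the widened key. proc_name_t and the values in main() are illustrative stand-ins; the real code stores an orte_process_name_t and compares names with orte_ns.compare(ORTE_NS_CMP_ALL, ...), as the diff further down shows.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-in for orte_process_name_t */
typedef struct { uint32_t jobid; uint32_t vpid; } proc_name_t;

/* Bootstrap segment key: a CID alone can repeat across unrelated
   groups, so pair it with the process name of rank 0 in the group. */
typedef struct {
    uint32_t    cid;
    proc_name_t rank0_name;
} bootstrap_key_t;

static bool keys_equal(const bootstrap_key_t *a, const bootstrap_key_t *b)
{
    return a->cid == b->cid &&
           a->rank0_name.jobid == b->rank0_name.jobid &&
           a->rank0_name.vpid == b->rank0_name.vpid;
}

int main(void)
{
    /* Two communicators that happen to share CID 5 but have different
       rank-0 processes no longer collide on the same segment. */
    bootstrap_key_t a = { 5, { 1, 0 } };
    bootstrap_key_t b = { 5, { 2, 3 } };
    printf("same segment? %s\n", keys_equal(&a, &b) ? "yes" : "no");
    return 0;
}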
This commit is contained in:
Parent: 2cedfeec53
Commit: c7fe54ba44
@@ -22,6 +22,7 @@
 #include "mpi.h"
 #include "opal/mca/mca.h"
+#include "orte/mca/ns/ns_types.h"
 #include "ompi/mca/coll/coll.h"
 #include "ompi/mca/mpool/mpool.h"
 #include "ompi/mca/common/sm/common_sm_mmap.h"

@@ -36,6 +37,13 @@
 #define D(foo)
 #endif
 
+#if 0
+#include <sched.h>
+#define SPIN sched_yield()
+#else
+#define SPIN continue
+#endif
+
 
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {

@@ -66,6 +74,23 @@ extern "C" {
     typedef struct mca_coll_sm_bootstrap_comm_setup_t
         mca_coll_sm_bootstrap_comm_setup_t;
 
+    /**
+     * Structure that acts as a key for the bootstrap area -- a
+     * segment in the bootstrap mpool is uniquely identified by its
+     * CID and the process name of rank 0 in the group.
+     */
+    struct mca_coll_sm_bootstrap_comm_key_t {
+        /** CID of the communicator using a bootstrap segment */
+        uint32_t mcsbck_cid;
+        /** Process name of rank 0 in this communicator */
+        orte_process_name_t mcsbck_rank0_name;
+    };
+    /**
+     * Convenience typedef
+     */
+    typedef struct mca_coll_sm_bootstrap_comm_key_t
+        mca_coll_sm_bootstrap_comm_key_t;
+
     /**
      * Extension to the control structure in the bootstrap mmap file
      */

@@ -79,12 +104,13 @@ extern "C" {
         mca_coll_sm_bootstrap_comm_setup_t *smbhe_segments;
 
         /** Pointer to array containing
-            component.sm_bootstrap_num_segments CIDs for use in
-            bootstrap phase -- will always point immediately after the
-            end of this struct (i.e., still within this header, but
-            since it's variable size (set by MCA param), we can't just
-            have it here in the struct. Bonk). */
-        uint32_t *smbhe_cids;
+            component.sm_bootstrap_num_segments (CID, rank 0 process
+            name) tuples for use in bootstrap phase -- will always
+            point immediately after the end of this struct (i.e.,
+            still within this header, but since it's variable size
+            (set by MCA param), we can't just have it here in the
+            struct. Bonk). */
+        mca_coll_sm_bootstrap_comm_key_t *smbhe_keys;
     };
     /**
      * Convenience typedef

@@ -410,7 +436,7 @@ extern int32_t mca_coll_sm_bogus_free_after;
  * Macro to wait for the in-use flag to become idle (used by the root)
  */
 #define FLAG_WAIT_FOR_IDLE(flag) \
-    while (0 != (flag)->mcsiuf_num_procs_using) continue
+    while (0 != (flag)->mcsiuf_num_procs_using) SPIN
 
 /**
  * Macro to wait for a flag to indicate that it's ready for this

@@ -418,7 +444,7 @@ extern int32_t mca_coll_sm_bogus_free_after;
  * been called)
  */
 #define FLAG_WAIT_FOR_OP(flag, op) \
-    while ((op) != flag->mcsiuf_operation_count) continue
+    while ((op) != flag->mcsiuf_operation_count) SPIN
 
 /**
  * Macro to set an in-use flag with relevant data to claim it

@@ -491,7 +517,7 @@ extern int32_t mca_coll_sm_bogus_free_after;
         volatile uint32_t *ptr = ((uint32_t*) \
             (((char*) index->mcbmi_control) + \
             ((rank) * mca_coll_sm_component.sm_control_size))); \
-        while (0 == *ptr) continue; \
+        while (0 == *ptr) SPIN; \
         (value) = *ptr; \
         *ptr = 0; \
     } while (0)

@@ -517,7 +543,7 @@ extern int32_t mca_coll_sm_bogus_free_after;
             (((char*) index->mcbmi_control) + \
             (mca_coll_sm_component.sm_control_size * \
             (parent_rank)))) + child_rank; \
-        while (0 == *ptr) continue; \
+        while (0 == *ptr) SPIN; \
         (value) = *ptr; \
         *ptr = 0; \
     } while (0)
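The hunks above (which appear to be from coll_sm.h, per the commit message) introduce a SPIN macro so the busy-wait macros can optionally call sched_yield() instead of hard-spinning; on a uniprocessor a hard spin can starve the peer that would set the flag, which is why the yielding variant helps debugging. A self-contained sketch of the same toggle is below; wait_for_flag is a hypothetical helper, not code from the component.

/* Flip "#if 0" to "#if 1" to yield the CPU instead of hard-spinning. */
#if 0
#include <sched.h>
#define SPIN sched_yield()
#else
#define SPIN continue
#endif

/* Hypothetical busy-wait helper: block until a peer sets *flag.  On a
   uniprocessor, yielding gives that peer a chance to run. */
void wait_for_flag(volatile int *flag)
{
    while (0 == *flag) SPIN;
}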
@@ -36,6 +36,7 @@
 #include "mpi.h"
 #include "opal/mca/maffinity/maffinity.h"
 #include "opal/mca/maffinity/base/base.h"
+#include "orte/mca/ns/ns.h"
 #include "ompi/communicator/communicator.h"
 #include "ompi/mca/coll/coll.h"
 #include "ompi/mca/coll/base/base.h"

@@ -509,7 +510,8 @@ static int bootstrap_init(void)
         sizeof(mca_coll_sm_bootstrap_header_extension_t) +
         (mca_coll_sm_component.sm_bootstrap_num_segments *
          sizeof(mca_coll_sm_bootstrap_comm_setup_t)) +
-        (sizeof(uint32_t) * mca_coll_sm_component.sm_bootstrap_num_segments);
+        (sizeof(mca_coll_sm_bootstrap_comm_key_t) *
+         mca_coll_sm_component.sm_bootstrap_num_segments);
 
     mca_coll_sm_component.sm_bootstrap_meta = meta =
         mca_common_sm_mmap_init(size, fullpath,

@@ -535,10 +537,12 @@ static int bootstrap_init(void)
             sizeof(mca_coll_sm_bootstrap_header_extension_t) +
             (sizeof(uint32_t) *
              mca_coll_sm_component.sm_bootstrap_num_segments));
-        bshe->smbhe_cids = (uint32_t *)
+        bshe->smbhe_keys = (mca_coll_sm_bootstrap_comm_key_t *)
             (((char *) bshe) + sizeof(*bshe));
         for (i = 0; i < mca_coll_sm_component.sm_bootstrap_num_segments; ++i) {
-            bshe->smbhe_cids[i] = INT_MAX;
+            bshe->smbhe_keys[i].mcsbck_cid = INT_MAX;
+            memset(&bshe->smbhe_keys[i].mcsbck_rank0_name, 0,
+                   sizeof(orte_process_name_t));
         }
 
         bshe->super.seg_inited = true;

@@ -565,6 +569,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
     int num_in_use = c->sm_comm_num_in_use_flags;
     int frag_size = c->sm_fragment_size;
     int control_size = c->sm_control_size;
+    orte_process_name_t *rank0;
 
     /* Is our CID in the CIDs array? If not, loop until we can find
        an open slot in the array to use in the bootstrap to setup our

@@ -574,15 +579,20 @@ static int bootstrap_comm(ompi_communicator_t *comm)
         c->sm_bootstrap_meta->map_seg;
     bscs = bshe->smbhe_segments;
     opal_atomic_lock(&bshe->super.seg_lock);
+    rank0 = &(comm->c_local_group->grp_proc_pointers[0]->proc_name);
     while (1) {
         opal_atomic_wmb();
         found = false;
         empty_index = -1;
         for (i = 0; i < mca_coll_sm_component.sm_bootstrap_num_segments; ++i) {
-            if (comm->c_contextid == bshe->smbhe_cids[i]) {
+            if (comm->c_contextid == bshe->smbhe_keys[i].mcsbck_cid &&
+                0 == orte_ns.compare(ORTE_NS_CMP_ALL,
+                                     rank0,
+                                     &bshe->smbhe_keys[i].mcsbck_rank0_name)) {
                 found = true;
                 break;
-            } else if (INT_MAX == bshe->smbhe_cids[i] && -1 == empty_index) {
+            } else if (INT_MAX == bshe->smbhe_keys[i].mcsbck_cid &&
+                       -1 == empty_index) {
                 empty_index = i;
             }
         }

@@ -603,7 +613,9 @@ static int bootstrap_comm(ompi_communicator_t *comm)
             size_t size;
 
             i = empty_index;
-            bshe->smbhe_cids[i] = comm->c_contextid;
+            bshe->smbhe_keys[i].mcsbck_cid = comm->c_contextid;
+            /* JMS better assignment? */
+            bshe->smbhe_keys[i].mcsbck_rank0_name = *rank0;
 
             bscs[i].smbcs_count = comm_size;
 

@@ -701,7 +713,9 @@ static int bootstrap_comm(ompi_communicator_t *comm)
     --bscs[i].smbcs_count;
     if (0 == bscs[i].smbcs_count) {
         bscs[i].smbcs_data_mpool_offset = 0;
-        bshe->smbhe_cids[i] = INT_MAX;
+        bshe->smbhe_keys[i].mcsbck_cid = INT_MAX;
+        memset(&bshe->smbhe_keys[i].mcsbck_rank0_name, 0,
+               sizeof(orte_process_name_t));
     }
 
     /* All done */
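The bootstrap_init()/bootstrap_comm() hunks above switch the shared bootstrap header from an array of bare CIDs to an array of (CID, rank 0 name) keys: every slot starts out empty (CID = INT_MAX, zeroed name), a communicator either finds its existing key or claims the first empty slot under the segment lock, and the last process out resets the slot. A simplified, single-process sketch of that slot lifecycle follows; proc_name_t, slot_key_t, and the helper names are illustrative stand-ins, and the memcmp stands in for the orte_ns.compare(ORTE_NS_CMP_ALL, ...) call (and locking) used in the real code.

#include <limits.h>
#include <stdint.h>
#include <string.h>

typedef struct { uint32_t jobid; uint32_t vpid; } proc_name_t;   /* stand-in */
typedef struct { uint32_t cid; proc_name_t rank0; } slot_key_t;  /* stand-in */

/* Mark every slot empty, as the new bootstrap_init() does with
   INT_MAX plus a zeroed rank-0 name. */
void init_slots(slot_key_t *keys, int nkeys)
{
    for (int i = 0; i < nkeys; ++i) {
        keys[i].cid = INT_MAX;
        memset(&keys[i].rank0, 0, sizeof(proc_name_t));
    }
}

/* Find the slot already claimed for (cid, rank0), or claim the first
   empty one.  Returns -1 if the table is full (the real code holds the
   segment lock and retries in a loop instead). */
int claim_or_find_slot(slot_key_t *keys, int nkeys,
                       uint32_t cid, const proc_name_t *rank0)
{
    int empty_index = -1;
    for (int i = 0; i < nkeys; ++i) {
        if (cid == keys[i].cid &&
            0 == memcmp(rank0, &keys[i].rank0, sizeof(proc_name_t))) {
            return i;                       /* already claimed by a peer */
        } else if (INT_MAX == keys[i].cid && -1 == empty_index) {
            empty_index = i;                /* remember first free slot */
        }
    }
    if (empty_index >= 0) {
        keys[empty_index].cid = cid;
        keys[empty_index].rank0 = *rank0;
    }
    return empty_index;
}

/* Last process out resets the slot so it can be reused, mirroring the
   INT_MAX assignment plus memset in bootstrap_comm()'s teardown path. */
void release_slot(slot_key_t *keys, int i)
{
    keys[i].cid = INT_MAX;
    memset(&keys[i].rank0, 0, sizeof(proc_name_t));
}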
@@ -199,7 +199,7 @@ static int reduce_inorder(void *sbuf, void* rbuf, int count,
     ompi_ddt_get_extent(dtype, &lb, &extent);
     ompi_ddt_get_true_extent(dtype, &true_lb, &true_extent);
     if (ompi_ddt_is_contiguous_memory_layout(dtype, count)) {
-        free_buffer = NULL;
+        reduce_temp_buffer = free_buffer = NULL;
     } else {
         OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
 

@@ -256,12 +256,13 @@ static int reduce_inorder(void *sbuf, void* rbuf, int count,
 
         do {
 
-            flag_num = (data->mcb_operation_count++ %
+            flag_num = (data->mcb_operation_count %
                         mca_coll_sm_component.sm_comm_num_in_use_flags);
 
             FLAG_SETUP(flag_num, flag, data);
             FLAG_WAIT_FOR_IDLE(flag);
-            FLAG_RETAIN(flag, size, data->mcb_operation_count - 1);
+            FLAG_RETAIN(flag, size, data->mcb_operation_count);
+            ++data->mcb_operation_count;
 
             /* Loop over all the segments in this set */
 

@@ -272,45 +273,74 @@ static int reduce_inorder(void *sbuf, void* rbuf, int count,
             reduce_target = (((char*) rbuf) + (frag_num * segment_ddt_bytes));
             do {
 
-                /* Loop over the processes, receiving and reducing
-                   from them in order */
-                for (peer = 0; peer < size; ++peer) {
+                /* Process 0 (special case) */
+
+                if (rank == 0) {
+                    /* If we're the root *and* the first process to be
+                       combined *and* this is the first segment in the
+                       entire algorithm, then just copy the whole
+                       buffer. That way, we never need to copy from
+                       this process again (i.e., do the copy all at
+                       once since all the data is local, and then
+                       don't worry about it for the rest of the
+                       algorithm) */
+                    if (first_operation) {
+                        first_operation = false;
+                        if (MPI_IN_PLACE != sbuf) {
+                            ompi_ddt_copy_content_same_ddt(dtype,
+                                                           count,
+                                                           reduce_target, sbuf);
+                            D(("root copied entire buffer to rbuf (contig ddt, count %d) FIRST OPERATION\n", count));
+                        }
+                    }
+                } else {
+                    index = &(data->mcb_mpool_index[segment_num]);
+                    PARENT_WAIT_FOR_NOTIFY_SPECIFIC(0, rank, index, max_data);
+
+                    /* If we don't need an extra buffer, memcpy the
+                       fragment straight to the output buffer.
+                       Otherwise, unpack. */
+
+                    if (NULL == free_buffer) {
+                        D(("root: special case -- copy from rank 0 shemem to reduce_target (%d bytes)\n", max_data));
+                        memcpy(reduce_target, index->mcbmi_data, max_data);
+                    } else {
+                        /* This is somethat inefficient -- should be
+                           able to avoid one of the memory copies
+                           here, but doing so would violate an
+                           abstraction barrier in the convertor (i.e.,
+                           directly manipulate some of the private
+                           data on the convertor struct) */
+                        D(("root: special case -- unpack and copy from rank 0 to reduce_target\n"));
+                        COPY_FRAGMENT_OUT(convertor, 0, index,
+                                          iov, max_data);
+                        ompi_convertor_set_position(&convertor, &zero);
+
+                        ompi_ddt_copy_content_same_ddt(dtype,
+                                                       max_data / ddt_size,
+                                                       reduce_target,
+                                                       iov.iov_base);
+                    }
+                }
+
+                /* Loop over all the remaining processes, receiving
+                   and reducing them in order */
+
+                for (peer = 1; peer < size; ++peer) {
 
                     /* Handle the case where the source is this process */
 
                     if (rank == peer) {
-                        if (peer == 0) {
-                            /* If we're the root *and* the first
-                               process to be combined *and* this is
-                               the first segment in the entire
-                               algorithm, then just copy the whole
-                               buffer. That way, we never need to
-                               copy from this process again (i.e., do
-                               the copy all at once since all the data
-                               is local, and then don't worry about it
-                               for the rest of the algorithm) */
-                            if (first_operation) {
-                                first_operation = false;
-                                if (MPI_IN_PLACE != sbuf) {
-                                    ompi_ddt_copy_content_same_ddt(dtype,
-                                                                   count,
-                                                                   reduce_target, sbuf);
-                                    D(("root copied entire buffer to rbuf (contig ddt, count %d) FIRST OPERATION\n", count));
-                                }
-                            }
-                        } else {
-                            /* Otherwise, I'm not the first process,
-                               so instead of copying, combine in the
-                               next fragment */
-                            D(("root combiningn fragment from shmem (contig ddt): count %d (left %d, seg %d)\n", min(count_left, segment_ddt_count), count_left, segment_ddt_count));
-                            ompi_op_reduce(op,
-                                           ((char *) sbuf) +
-                                           frag_num * segment_ddt_bytes,
-                                           reduce_target,
-                                           min(count_left, segment_ddt_count),
-                                           dtype);
-                        }
+                        /* Otherwise, I'm not the first process, so
+                           instead of copying, combine in the next
+                           fragment */
+                        D(("root combiningn fragment from shmem (contig ddt): count %d (left %d, seg %d)\n", min(count_left, segment_ddt_count), count_left, segment_ddt_count));
+                        ompi_op_reduce(op,
+                                       ((char *) sbuf) +
+                                       frag_num * segment_ddt_bytes,
+                                       reduce_target,
+                                       min(count_left, segment_ddt_count),
+                                       dtype);
                     }
 
                     /* Now handle the case where the source is not

@@ -327,66 +357,32 @@ static int reduce_inorder(void *sbuf, void* rbuf, int count,
                           from the shmem. */
 
                         if (NULL == free_buffer) {
-                            /* If this is the first process, just copy */
-                            if (0 == peer) {
-                                D(("root: special case -- copy from rank 0 shemem to reduce_target (%d bytes)\n", max_data));
-                                memcpy(reduce_target, index->mcbmi_data,
-                                       max_data);
-                            }
-
-                            /* If this is not the first process, do
-                               the reduction */
-                            else {
-
-                                D(("root combining %d elements in shmem from peer %d\n",
-                                   max_data / ddt_size, peer));
-                                ompi_op_reduce(op,
-                                               (index->mcbmi_data +
-                                                (peer * mca_coll_sm_component.sm_fragment_size)),
-                                               reduce_target, max_data / ddt_size,
-                                               dtype);
-                            }
+                            D(("root combining %d elements in shmem from peer %d\n",
+                               max_data / ddt_size, peer));
+                            ompi_op_reduce(op,
+                                           (index->mcbmi_data +
+                                            (peer * mca_coll_sm_component.sm_fragment_size)),
+                                           reduce_target, max_data / ddt_size,
+                                           dtype);
                         }
 
                         /* Otherwise, unpack the fragment to the temporary
                            buffer and then do the reduction from there */
 
                         else {
-                            /* If this is the first process, then just
-                               copy out to the target buffer */
-                            if (0 == peer) {
-                                /* JMS: this is clearly inefficient --
-                                   can avoid one of the memory copies
-                                   here; have a pending question to
-                                   george about this */
-                                D(("root: special case -- unpack and copy from rank 0 to reduce_target\n"));
-                                COPY_FRAGMENT_OUT(convertor, peer, index,
-                                                  iov, max_data);
-                                ompi_convertor_set_position(&convertor, &zero);
-
-                                ompi_ddt_copy_content_same_ddt(dtype,
-                                                               max_data / ddt_size,
-                                                               reduce_target,
-                                                               iov.iov_base);
-                            }
-
-                            /* Otherwise, copy to the temp buffer and
-                               then do the reduction */
-                            else {
-                                D(("root combining %d elements in copy out buffer from peer %d\n",
-                                   max_data / ddt_size, peer));
-                                /* Unpack the fragment into my temporary
-                                   buffer */
-                                COPY_FRAGMENT_OUT(convertor, peer, index,
-                                                  iov, max_data);
-                                ompi_convertor_set_position(&convertor, &zero);
-
-                                /* Do the reduction on this fragment */
-                                ompi_op_reduce(op, reduce_temp_buffer,
-                                               reduce_target,
-                                               max_data / ddt_size,
-                                               dtype);
-                            }
+                            D(("root combining %d elements in copy out buffer from peer %d\n",
+                               max_data / ddt_size, peer));
+                            /* Unpack the fragment into my temporary
+                               buffer */
+                            COPY_FRAGMENT_OUT(convertor, peer, index,
+                                              iov, max_data);
+                            ompi_convertor_set_position(&convertor, &zero);
+
+                            /* Do the reduction on this fragment */
+                            ompi_op_reduce(op, reduce_temp_buffer,
+                                           reduce_target,
+                                           max_data / ddt_size,
+                                           dtype);
                         }
                     } /* whether this process was me or not */
                 } /* loop over all proceses */
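The reduce_inorder() hunks above make two separate changes. First, the in-use flag is now retained with the current mcb_operation_count and the counter is incremented afterwards, instead of post-incrementing inside the flag_num computation and retaining count - 1. Second, the "process 0" handling is hoisted out of the per-peer loop so the loop can start at peer = 1, at the cost of some duplicated copy/unpack code. A simplified, self-contained C sketch of the restructured per-fragment flow follows; the function names and printouts are hypothetical stand-ins for the real memcpy/convertor/ompi_op_reduce logic.

#include <stdio.h>

/* Hypothetical stand-ins for the memcpy/convertor/ompi_op_reduce logic
   that the real reduce_inorder() performs per fragment. */
static void copy_own_buffer(void)        { puts("root copies its whole sbuf once"); }
static void copy_rank0_fragment(void)    { puts("root pulls rank 0's fragment from shmem"); }
static void reduce_own_fragment(void)    { puts("root combines its own sbuf fragment"); }
static void reduce_peer_fragment(int p)  { printf("root combines peer %d's shmem fragment\n", p); }

/* Skeleton of the root's per-fragment flow after the restructuring:
   process 0 is handled once, up front, so the main loop can start at
   peer 1 and stay free of the peer == 0 special case. */
static void reduce_one_fragment(int rank, int size, int *first_operation)
{
    /* Process 0 (special case) */
    if (0 == rank) {
        if (*first_operation) {
            *first_operation = 0;
            copy_own_buffer();      /* all of rank 0's data is local */
        }
    } else {
        copy_rank0_fragment();
    }

    /* Remaining processes, combined in rank order */
    for (int peer = 1; peer < size; ++peer) {
        if (peer == rank) {
            reduce_own_fragment();
        } else {
            reduce_peer_fragment(peer);
        }
    }
}

int main(void)
{
    int first_operation = 1;
    reduce_one_fragment(0 /* rank */, 4 /* size */, &first_operation);
    return 0;
}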