1
1
- bcast now works properly for root!=0 and multi-fragment messages
- destroy mpool when communicator is destroyed
Still need to implement:
- "in use" flags for groups of fragments so that "wrapping around" in
  the data segment doesn't overwrite not-yet-read data
- ensure that shared memory isn't removed before all processes have
  finished with it (e.g., during COMM_FREE)

This commit was SVN r7172.
Этот коммит содержится в:
Jeff Squyres 2005-09-03 11:49:46 +00:00
родитель e53fa399dc
Коммит bc72a7722b
5 изменённых файлов: 150 добавлений и 108 удалений

Просмотреть файл

@ -28,6 +28,17 @@
#include "ompi/mca/mpool/mpool.h"
#include "ompi/mca/common/sm/common_sm_mmap.h"
/*
* Horrid debugging macro
*/
#if 0
#include <stdio.h>
#define D(foo) { printf foo ; fflush(stdout); }
#else
#define D(foo)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
@ -45,7 +56,7 @@ extern "C" {
/** Number of segments in the data mpool area for this
communicator */
int smbcs_communicator_num_segments;
int smbcs_comm_num_segments;
/** Number of processes in this communicator who have seen
this value already. */
@ -114,9 +125,13 @@ extern "C" {
use */
char *sm_mpool_name;
/** MCA parameter: Number of "in use" flags in each
communicator's area in the data mpool */
int sm_comm_num_in_use_flags;
/** MCA parameter: Number of segments for each communicator in
the data mpool */
int sm_communicator_num_segments;
int sm_comm_num_segments;
/** MCA parameter: Fragment size for data */
int sm_fragment_size;
@ -124,6 +139,10 @@ extern "C" {
/** MCA parameter: Degree of tree for tree-based collectives */
int sm_tree_degree;
/** MCA parameter: Number of processes to use in the
calculation of the "info" MCA parameter */
int sm_info_comm_size;
/** Size of the bootstrap area -- calculated in
coll_sm_component.c */
size_t sm_bootstrap_size;
@ -176,6 +195,8 @@ extern "C" {
struct mca_coll_base_mpool_index_t {
/** Pointer to beginning of control data */
uint32_t *mcbmi_control;
/** Pointer to the "in use" buffer for this segment */
uint32_t *mcbmi_in_use;
/** Pointer to beginning of message fragment data */
char *mcbmi_data;
};

Просмотреть файл

@ -23,12 +23,6 @@
#include "opal/include/sys/atomic.h"
#include "coll_sm.h"
#if 0
#define D(foo) { printf foo ; fflush(stdout); }
#else
#define D(foo)
#endif
/**
* Shared memory barrier.
*

Просмотреть файл

@ -24,13 +24,6 @@
#include "opal/include/sys/atomic.h"
#include "coll_sm.h"
#if 0
#define D(foo) { printf foo ; fflush(stdout); }
#else
#define D(foo)
#endif
/**
* Shared memory broadcast.
*
@ -55,7 +48,8 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
struct iovec iov;
uint32_t iov_size = 1;
mca_coll_base_comm_t *data = comm->c_coll_selected_data;
int i, ret, rank, size, segment, num_children;
int i, ret, rank, vrank, size, segment, num_children;
int parent_rank, child_rank;
size_t total_size, max_data, bytes;
uint32_t *my_control, *parent_control;
ompi_convertor_t send_convertor;
@ -67,8 +61,10 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
rank = ompi_comm_rank(comm);
size = ompi_comm_size(comm);
vrank = (rank + size - root) % size;
me = &data->mcb_tree[(rank + size - root) % size];
me = &data->mcb_tree[vrank];
D(("rank %d: virtual rank %d\n", rank, vrank));
parent = me->mcstn_parent;
children = me->mcstn_children;
num_children = me->mcstn_num_children;
@ -119,7 +115,7 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
/* Loop over the fragments */
do {
segment = (++data->mcb_operation_count %
segment = (data->mcb_operation_count++ %
data->mcb_mpool_num_segments);
/* Root */
@ -129,30 +125,36 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
/* Wait for the segment to become available */
/* JMS */
/* Copy the fragment from the user buffer to the segment */
iov.iov_base = data->mcb_mpool_index[segment].mcbmi_data +
(root * mca_coll_sm_component.sm_fragment_size);
/* Copy the fragment from the user buffer to my fragment
in the current segment */
iov.iov_base =
data->mcb_mpool_index[segment].mcbmi_data +
(rank * mca_coll_sm_component.sm_fragment_size);
max_data = iov.iov_len;
D(("root copying %lu bytes to data fan out, seg %d: %p\n",
(unsigned long) iov.iov_len, segment, iov.iov_base));
ompi_convertor_pack(&send_convertor, &iov, &iov_size,
&max_data, &bogus_free_after);
D(("root sent %lu bytes to data fan out: %p\n",
(unsigned long) max_data,
data->mcb_mpool_index[segment].mcbmi_data +
(root * mca_coll_sm_component.sm_fragment_size)));
/* Wait for the write to absolutely complete */
opal_atomic_wmb();
/* Tell my children that this fragment is ready */
/* Tell my children that this fragment is ready (be sure
to normalize the child's ID based on the shift we did
above to calculate the "me" node in the tree) */
for (i = 0; i < num_children; ++i) {
/* JMS: TEMPORARILY HARDWIRED FOR ROOT==0 */
child_rank = (children[i]->mcstn_id + root) % size;
*((size_t*)
(((char*)
data->mcb_mpool_index[segment].mcbmi_control) +
(mca_coll_sm_component.sm_control_size *
children[i]->mcstn_id))) = max_data;
D(("root sent notice to child %d (rank %d)\n",
i, children[i]->mcstn_id));
child_rank))) = max_data;
D(("root sent notice to child %d (vrank %d), control to %p\n",
i, children[i]->mcstn_id,
(((char*)
data->mcb_mpool_index[segment].mcbmi_control) +
(mca_coll_sm_component.sm_control_size *
child_rank))));
}
}
@ -161,7 +163,9 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
else {
/* Pre-calculate some pointers */
parent_rank = (parent->mcstn_id + root) % size;
D(("rank %d parent rank is %d\n", rank, parent_rank));
my_control = (uint32_t *)
(((char*)
data->mcb_mpool_index[segment].mcbmi_control) +
@ -169,42 +173,40 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
parent_control = (uint32_t *)
(((char*)
data->mcb_mpool_index[segment].mcbmi_control) +
(parent->mcstn_id * mca_coll_sm_component.sm_control_size));
(parent_rank * mca_coll_sm_component.sm_control_size));
/* Wait for the fragment: the parent will mark the segment
as ready */
D(("rank %d waiting for fragment in segment %d\n", rank, segment));
D(("rank %d waiting for fragment in segment %d (control %p)\n",
rank, segment, (char*) my_control));
while (0 == *my_control) {
continue;
}
max_data = *my_control;
D(("rank %d: fragment ready in segment %d\n", rank, segment));
/* If I have children, send the data to them */
if (num_children > 0) {
max_data = iov.iov_len;
/* Wait for the segment to become available */
/* JMS */
/* Copy the fragment from the parent's portion in the
segment to my portion in the segment. This is a
simply memcpy because it's already been packed
into the parent's segment. */
/* No need to wait for the segment to become available
-- the root has already claimed it and we're all
already using it. So copy the fragment from the
parent's portion in the segment to my portion in
the segment. This is a simply memcpy because it's
already been packed into the parent's segment. */
memcpy(/* my data fan out section in the segment */
(data->mcb_mpool_index[segment].mcbmi_data +
(me->mcstn_id *
mca_coll_sm_component.sm_fragment_size)),
(rank * mca_coll_sm_component.sm_fragment_size)),
/* parent's fan out section in the segment */
(data->mcb_mpool_index[segment].mcbmi_data +
(parent->mcstn_id *
(parent_rank *
mca_coll_sm_component.sm_fragment_size)),
/* length */
*my_control);
D(("rank %d copied fragment (%p) to my data fan out (%lu bytes)\n", rank,
D(("rank %d memcopy'ed fragment (%p) to my data fan out (%lu bytes)\n", rank,
(data->mcb_mpool_index[segment].mcbmi_data +
(parent->mcstn_id *
mca_coll_sm_component.sm_fragment_size)),
(parent_rank * mca_coll_sm_component.sm_fragment_size)),
(unsigned long) *my_control));
/* Wait for the write to absolutely complete */
@ -212,14 +214,14 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
/* Tell my children that this fragment is ready */
for (i = 0; i < num_children; ++i) {
/* JMS: TEMPORARILY HARDWIRED FOR ROOT==0 */
*((uint32_t*)
child_rank = (children[i]->mcstn_id + root) % size;
*((size_t*)
(((char*)
data->mcb_mpool_index[segment].mcbmi_control) +
(mca_coll_sm_component.sm_control_size *
children[i]->mcstn_id))) = 1;
D(("rank %d notifying child %d (rank %d)\n",
rank, i, children[i]->mcstn_id));
child_rank))) = *my_control;
D(("rank %d notifying child %d (vrank %d, rank %d)\n",
rank, i, children[i]->mcstn_id, child_rank));
}
/* Set the "copy from buffer" to be my local segment
@ -229,9 +231,7 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
buffer */
iov.iov_base =
data->mcb_mpool_index[segment].mcbmi_data +
(me->mcstn_id *
mca_coll_sm_component.sm_fragment_size);
D(("rank %d convertor copying from my data fan out\n", rank));
(rank * mca_coll_sm_component.sm_fragment_size);
}
/* If I don't have any children, set the "copy from
@ -242,14 +242,14 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
iov.iov_base =
(((char*)
data->mcb_mpool_index[segment].mcbmi_data) +
(parent->mcstn_id *
mca_coll_sm_component.sm_fragment_size));
(parent_rank * mca_coll_sm_component.sm_fragment_size));
}
/* Copy to my output buffer */
D(("rank %d convertor copied from parent data %p to user buffer (%lu bytes)\n",
rank, iov.iov_base, (unsigned long) iov.iov_len));
ompi_convertor_unpack(&recv_convertor, &iov, &iov_size,
&max_data, &bogus_free_after);
D(("rank %d convertor copied into user buffer (%lu bytes)\n", rank, max_data));
}
/* It's ok to only look at the max_data from the last
@ -257,13 +257,10 @@ int mca_coll_sm_bcast_intra(void *buff, int count,
them */
bytes += max_data;
} while (bytes < total_size);
D(("rank %d done sending/receiving\n", rank));
if (root == rank) {
D(("root destroying send convertor\n"));
OBJ_DESTRUCT(&send_convertor);
} else {
D(("rank %d destroying recv_convertor\n", rank));
OBJ_DESTRUCT(&recv_convertor);
}
D(("rank %d done with bcast\n", rank));

Просмотреть файл

@ -12,12 +12,12 @@
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
* Most of the description of the data layout is in the
* coll_sm_module.c file.
*/
#include "ompi_config.h"
@ -96,8 +96,8 @@ mca_coll_sm_component_t mca_coll_sm_component = {
/* (default) priority */
75,
/* (default) control unit size (bytes) */
64,
/* (default) control size (bytes) */
4096,
/* (default) bootstrap filename */
"coll-sm-bootstrap",
@ -108,9 +108,13 @@ mca_coll_sm_component_t mca_coll_sm_component = {
/* (default) mpool name to use */
"sm",
/* (default) number of "in use" flags for each communicator's area
in the mpool */
2,
/* (default) number of segments for each communicator in the mpool
area */
2,
8,
/* (default) fragment size */
8192,
@ -119,6 +123,10 @@ mca_coll_sm_component_t mca_coll_sm_component = {
control unit size) */
4,
/* (default) number of processes in coll_sm_shared_mem_size
information variable */
4,
/* default values for non-MCA parameters */
0, /* bootstrap size -- filled in below */
0, /* mpool data size -- filled in below */
@ -147,7 +155,7 @@ static int sm_open(void)
&cs->sm_priority);
mca_base_param_reg_int(c, "control_size",
"Length of the control data -- should usually be either a cache line on most SMPs, or a page on machine where pages that support direct memory affinity placement (in bytes)",
"Length of the control data -- should usually be either the length of a cache line on most SMPs, or the size of a page on machines that support direct memory affinity page placement (in bytes)",
false, false,
cs->sm_control_size,
&cs->sm_control_size);
@ -165,7 +173,7 @@ static int sm_open(void)
&cs->sm_bootstrap_num_segments);
mca_base_param_reg_int(c, "fragment_size",
"Fragment size (in bytes) used for passing data through shared memory (will be rounded up to the nearest control_size size)",
"Fragment size (in bytes) used for passing data through shared memory (bytes, will be rounded up to the nearest control_size size)",
false, false,
cs->sm_fragment_size,
&cs->sm_fragment_size);
@ -180,17 +188,30 @@ static int sm_open(void)
cs->sm_mpool_name,
&cs->sm_mpool_name);
mca_base_param_reg_int(c, "communicator_num_segments",
"Number of shared memory collective segments on each communicator (must be >= 2)",
mca_base_param_reg_int(c, "comm_in_use_flags",
"Number of \"in use\" flags, used to mark a message passing area segment as currently being used or not (must be >= 2 and <= comm_num_segments)",
false, false,
cs->sm_communicator_num_segments,
&cs->sm_communicator_num_segments);
if (cs->sm_communicator_num_segments < 2) {
cs->sm_communicator_num_segments = 2;
cs->sm_comm_num_in_use_flags,
&cs->sm_comm_num_in_use_flags);
if (cs->sm_comm_num_in_use_flags < 2) {
cs->sm_comm_num_in_use_flags = 2;
}
mca_base_param_reg_int(c, "comm_num_segments",
"Number of segments in each communicator's shared memory message passing area (must be >= 2, and must be a multiple of comm_in_use_flags)",
false, false,
cs->sm_comm_num_segments,
&cs->sm_comm_num_segments);
if (cs->sm_comm_num_segments < 2) {
cs->sm_comm_num_segments = 2;
}
if (0 != (cs->sm_comm_num_segments % cs->sm_comm_num_in_use_flags)) {
cs->sm_comm_num_segments += cs->sm_comm_num_in_use_flags -
(cs->sm_comm_num_segments % cs->sm_comm_num_in_use_flags);
}
mca_base_param_reg_int(c, "tree_degree",
"Degree of the tree for tree-based operations (must be <= control size and <= 255)",
"Degree of the tree for tree-based operations (must be => 1 and <= min(control_size, 255))",
false, false,
cs->sm_tree_degree,
&cs->sm_tree_degree);
@ -238,11 +259,15 @@ static int sm_open(void)
Which reduces to:
num_segs * num_procs * 2 * (control_size + frag)
For this example, assume num_procs = 1.
*/
size2 = cs->sm_communicator_num_segments * 2 *
mca_base_param_reg_int(c, "info_num_procs",
"Number of processes to use for the calculation of the shared_mem_size MCA information parameter (must be => 2)",
false, false,
cs->sm_info_comm_size,
&cs->sm_info_comm_size);
size2 = cs->sm_comm_num_segments * cs->sm_info_comm_size *
(cs->sm_control_size + cs->sm_fragment_size);
mca_base_param_reg_int(c, "shared_mem_used_data",
"Amount of shared memory used in the shared memory data area for one process (in bytes)",

Просмотреть файл

@ -13,6 +13,10 @@
*
* $HEADER$
*/
/**
* @file
*
*/
#include "ompi_config.h"
@ -197,7 +201,7 @@ sm_module_init(struct ompi_communicator_t *comm)
mca_coll_sm_component_t *c = &mca_coll_sm_component;
opal_maffinity_base_segment_t *segments;
int parent, min_child, max_child, num_children;
char *barrier_base;
char *base;
const int num_barrier_buffers = 2;
/* Get some space to setup memory affinity (just easier to try to
@ -250,16 +254,6 @@ sm_module_init(struct ompi_communicator_t *comm)
data->mcb_tree[i - 1].mcstn_children + c->sm_tree_degree;
}
/* Bootstrap this communicator; find the shared memory in the main
mpool that has been allocated among my peers for this
communicator. */
if (OMPI_SUCCESS != bootstrap_comm(comm)) {
free(data);
comm->c_coll_selected_data = NULL;
return NULL;
}
/* Pre-compute a tree for a given number of processes and degree.
We'll re-use this tree for all possible values of root (i.e.,
shift everyone's process to be the "0"/root in this tree. */
@ -299,6 +293,16 @@ sm_module_init(struct ompi_communicator_t *comm)
}
}
/* Bootstrap this communicator; find the shared memory in the main
mpool that has been allocated among my peers for this
communicator. */
if (OMPI_SUCCESS != bootstrap_comm(comm)) {
free(data);
comm->c_coll_selected_data = NULL;
return NULL;
}
/* Once the communicator is bootstrapped, setup the pointers into
the data mpool area. First, setup the barrier buffers. There
are 2 sets of barrier buffers (because there can never be more
@ -309,12 +313,12 @@ sm_module_init(struct ompi_communicator_t *comm)
data is sufficient). */
control_size = c->sm_control_size;
barrier_base = (char*) (data->mcb_mpool_base + data->mcb_mpool_offset);
base = (char*) (data->mcb_mpool_base + data->mcb_mpool_offset);
data->mcb_barrier_control_me = (uint32_t*)
(barrier_base + (rank * control_size * num_barrier_buffers * 2));
(base + (rank * control_size * num_barrier_buffers * 2));
if (data->mcb_tree[rank].mcstn_parent) {
data->mcb_barrier_control_parent = (uint32_t*)
(barrier_base +
(base +
(data->mcb_tree[rank].mcstn_parent->mcstn_id * control_size *
num_barrier_buffers * 2));
} else {
@ -322,7 +326,7 @@ sm_module_init(struct ompi_communicator_t *comm)
}
if (data->mcb_tree[rank].mcstn_num_children > 0) {
data->mcb_barrier_control_children = (uint32_t*)
(barrier_base +
(base +
(data->mcb_tree[rank].mcstn_children[0]->mcstn_id * control_size *
num_barrier_buffers * 2));
} else {
@ -335,12 +339,13 @@ sm_module_init(struct ompi_communicator_t *comm)
control_size = size * c->sm_control_size;
frag_size = size * c->sm_fragment_size;
base += (control_size * num_barrier_buffers * 2);
for (j = i = 0; i < data->mcb_mpool_num_segments; ++i) {
data->mcb_mpool_index[i].mcbmi_control = (uint32_t*)
(barrier_base + (control_size * num_barrier_buffers * 2));
(base + (i * (control_size + frag_size)));
data->mcb_mpool_index[i].mcbmi_data =
((char*) data->mcb_mpool_index[i].mcbmi_control) +
control_size;
(((char*) data->mcb_mpool_index[i].mcbmi_control) +
control_size);
/* Memory affinity: control */
@ -504,7 +509,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
mca_coll_sm_bootstrap_comm_setup_t *bscs;
mca_coll_base_comm_t *data = comm->c_coll_selected_data;
int comm_size = ompi_comm_size(comm);
int num_segments = c->sm_communicator_num_segments;
int num_segments = c->sm_comm_num_segments;
int frag_size = c->sm_fragment_size;
int control_size = c->sm_control_size;
@ -547,7 +552,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
i = empty_index;
bshe->smbhe_cids[i] = comm->c_contextid;
bscs[i].smbcs_communicator_num_segments = num_segments;
bscs[i].smbcs_comm_num_segments = num_segments;
bscs[i].smbcs_count = comm_size;
/* Calculate how much space we need in the data mpool.
@ -556,9 +561,10 @@ static int bootstrap_comm(ompi_communicator_t *comm)
- size of the barrier data (2 of these):
- fan-in data (num_procs * control_size)
- fan-out data (num_procs * control_size)
- size of the control data (one for each segment):
- size of the "in use" buffers:
- num_in_use_buffers * control_size
- size of the message fragment area (one for each segment):
- control (num_procs * control_size)
- size of message fragment data (one for each segment):
- fragment data (num_procs * (frag_size))
So it's:
@ -585,7 +591,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
component would have been chosen), so we don't need
to do that cleanup. */
bscs[i].smbcs_data_mpool_offset = 0;
bscs[i].smbcs_communicator_num_segments = 0;
bscs[i].smbcs_comm_num_segments = 0;
--bscs[i].smbcs_count;
opal_atomic_unlock(&bshe->super.seg_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
@ -618,7 +624,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
/* Check to see if there was an error while allocating the shared
memory */
if (0 == bscs[i].smbcs_communicator_num_segments) {
if (0 == bscs[i].smbcs_comm_num_segments) {
err = OMPI_ERR_OUT_OF_RESOURCE;
}
@ -630,7 +636,7 @@ static int bootstrap_comm(ompi_communicator_t *comm)
data->mcb_mpool_base = c->sm_data_mpool->mpool_base(c->sm_data_mpool);
data->mcb_mpool_offset = bscs[i].smbcs_data_mpool_offset;
data->mcb_mpool_area = data->mcb_mpool_base + data->mcb_mpool_offset;
data->mcb_mpool_num_segments = bscs[i].smbcs_communicator_num_segments;
data->mcb_mpool_num_segments = bscs[i].smbcs_comm_num_segments;
data->mcb_operation_count = 0;
}
@ -665,8 +671,7 @@ int mca_coll_sm_bootstrap_finalize(void)
/* Free the area in the mpool that we were using */
if (mca_coll_sm_component.sm_data_mpool_created) {
/* JMS: there does not yet seem to be any opposite to
mca_mpool_base_module_create()... */
mca_mpool_base_module_destroy(mca_coll_sm_component.sm_data_mpool);
}
/* Free the entire bootstrap area (no need to zero out