1
1

Got it all, except for the memory copies. Also need to make sure the volatile declarations are all in place, as well as the memory barriers.

This commit was SVN r17572.
Этот коммит содержится в:
Rich Graham 2008-02-25 00:16:21 +00:00
родитель 2d8c2420e8
Коммит b4bbb70bb7
2 изменённых файлов: 98 добавлений и 14 удалений

Просмотреть файл

@ -147,9 +147,11 @@ BEGIN_C_DECLS
/*
 * Control structure that the allreduce code shown later on this page
 * overlays on the start of a process's shared-memory segment (the segment
 * base address is cast to a pointer to this type).
 *
 * NOTE(review): two members named "padding" appear below.  This text looks
 * like a rendered diff with the +/- markers stripped, so these are presumably
 * the pre-change and post-change versions of the same line and only one of
 * them exists in the real header -- confirm against the repository.
 */
struct mca_coll_sm2_nb_request_process_shared_mem_t {
/* flag used to indicate the status of this memory region; the allreduce
 * code writes tag / -tag here and polls it in other processes' segments */
long long flag;
/* written with the current stripe number by the allreduce code and polled
 * together with flag */
long long index;
/* padding: sized as one CACHE_LINE_SIZE minus a single long long */
char padding[CACHE_LINE_SIZE-sizeof(long long)];
/* Note: need to change this so it takes less memory.
 * Sized as two CACHE_LINE_SIZE minus both long long members. */
char padding[2*CACHE_LINE_SIZE-2*sizeof(long long)];
};
typedef struct mca_coll_sm2_nb_request_process_shared_mem_t

Просмотреть файл

@ -14,6 +14,7 @@
#include "coll_sm2.h"
#include "ompi/op/op.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
/**
@ -27,10 +28,21 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
{
/* local variables */
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
int my_rank, child_rank, parent_rank, child, n_parents, n_children;
int my_parent;
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
long long tag;
volatile char *sm_buffer;
char *my_data_pointer, * volatile child_data_pointer;
char * volatile parent_data_pointer, * volatile root_data_pointer;
char *my_base_temp_pointer, * volatile child_base_temp_pointer;
char * volatile parent_base_temp_pointer, * volatile root_base_temp_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t * volatile child_ctl_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t * volatile parent_ctl_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t * volatile root_ctl_pointer;
mca_coll_sm2_module_t *sm_module;
tree_node_t *my_reduction_node;
sm_module=(mca_coll_sm2_module_t *) module;
@ -63,6 +75,13 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
/* compute number of stripes needed to process this collective */
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
/* get my node for the reduction tree */
my_rank=ompi_comm_rank(comm);
my_reduction_node=&(sm_module->reduction_tree[my_rank]);
n_children=my_reduction_node->n_children;
n_parents=my_reduction_node->n_parents;
my_parent=my_reduction_node->parent_rank;
/* get a pointer to the shared-memory working buffer */
/* NOTE: starting with a rather synchronous approach */
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
@ -71,6 +90,16 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
rc=OMPI_ERR_OUT_OF_RESOURCE;
goto Error;
}
/* get base address to "my" memory segment */
my_base_temp_pointer=(char *)
((char *)sm_buffer+sm_module->sm_buffer_mgmt_barrier_tree.my_rank*
sm_module->segement_size_per_process);
/* offset to data segment */
my_data_pointer=my_base_temp_pointer+ctl_size;
my_ctl_pointer=(mca_coll_sm2_nb_request_process_shared_mem_t *)
my_base_temp_pointer;
/*
* Fan into root phase
*/
@ -82,31 +111,84 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
/*
* Wait on children, and apply op to their data
*/
for( child=0 ; child < n_children ; child++ ) {
child_rank=my_reduction_node->children_ranks[child];
/* get base address of child process */
child_base_temp_pointer=(char *)
((char *)sm_buffer+child_rank*
sm_module->segement_size_per_process);
child_data_pointer=child_base_temp_pointer+ctl_size;
child_ctl_pointer=
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
child_base_temp_pointer;
/* wait until child flag is set */
while(!
(child_ctl_pointer->flag == tag &
child_ctl_pointer->index== stripe_number) ) {
/* Note: Actually need to make progress here */
;
}
/* apply collective operation */
ompi_op_reduce(op,child_data_pointer,my_data_pointer,
count,dtype);
}
/* set memory barrier to make sure data is in main memory before
* the completion flags are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
my_ctl_pointer->index=stripe_number;
/*
* Fan out from root phase
* Fan out from root phase - let the memory copies at each
* stage help reduce memory contention.
*/
if( 0 < my_reduction_node->n_parents ) {
/* I am the root - so copy signal children, and then
* start reading
*/
my_ctl_pointer->flag=-tag;
/*
* wait on Parent to signal that data is ready
*/
/* copy data to user supplied buffer */
/*
* Copy data to shared buffer
*/
} else {
parent_data_pointer=(char *)
((char *)sm_buffer+my_parent*
sm_module->segement_size_per_process);
/*
* Signal children that Data is ready for reading
*/
parent_ctl_pointer=parent_data_pointer+ctl_size;
child_ctl_pointer=
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
parent_data_pointer;
/*
* wait on Parent to signal that data is ready
*/
while(!
(parent_ctl_pointer->flag == -tag &
parent_ctl_pointer->index== stripe_number) ) {
/* Note: Actually need to make progress here */
;
}
/*
* Copy data out to destination
*/
/* copy data to user supplied buffer */
root_base_temp_pointer=(char *)sm_buffer;
root_data_pointer=child_base_temp_pointer+ctl_size;
/* signal children that they may read the result data */
my_ctl_pointer->flag=-tag;
}
/* "free" the shared-memory working buffer */
rc=free_sm2_shared_buffer(sm_module);