reduce the number of temp buffers used.
This commit was SVN r17828.
Этот коммит содержится в:
родитель
0f9d642d51
Коммит
3c2f1eb8bf
@ -658,6 +658,329 @@ Error:
|
||||
return rc;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* just storing various versions of the recursive doubling algorithm,
|
||||
* so can compare them later on.
|
||||
*/
|
||||
/**
|
||||
* Shared memory blocking allreduce.
|
||||
*/
|
||||
static
|
||||
int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
||||
int count, struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op, struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module)
|
||||
{
|
||||
/* local variables */
|
||||
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
|
||||
int pair_rank,exchange,extra_rank;
|
||||
int index_read,index_write;
|
||||
pair_exchange_node_t *my_exchange_node;
|
||||
int my_rank,count_processed,count_this_stripe;
|
||||
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
|
||||
long long tag, base_tag;
|
||||
sm_work_buffer_t *sm_buffer_desc;
|
||||
volatile char * my_tmp_data_buffer[2];
|
||||
volatile char * my_write_pointer;
|
||||
volatile char * my_read_pointer;
|
||||
volatile char * extra_rank_write_data_pointer;
|
||||
volatile char * extra_rank_read_data_pointer;
|
||||
volatile char * partner_read_pointer;
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
|
||||
partner_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
|
||||
extra_ctl_pointer;
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
/* debug */
|
||||
opal_timer_t t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
|
||||
/* end debug */
|
||||
|
||||
sm_module=(mca_coll_sm2_module_t *) module;
|
||||
|
||||
/* get size of data needed - same layout as user data, so that
|
||||
* we can apply the reudction routines directly on these buffers
|
||||
*/
|
||||
rc=ompi_ddt_type_size(dtype, &dt_extent);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
message_extent=dt_extent*count;
|
||||
|
||||
/* lenght of control and data regions */
|
||||
ctl_size=sm_module->ctl_memory_per_proc_per_segment;
|
||||
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
|
||||
|
||||
/* number of data types copies that the scratch buffer can hold */
|
||||
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
|
||||
if ( 0 == n_dts_per_buffer ) {
|
||||
rc=OMPI_ERROR;
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* need a read and a write buffer for a pair-wise exchange of data */
|
||||
n_dts_per_buffer/=2;
|
||||
len_data_buffer=n_dts_per_buffer*dt_extent;
|
||||
|
||||
/* compute number of stripes needed to process this collective */
|
||||
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
|
||||
|
||||
/* get my node for the reduction tree */
|
||||
my_exchange_node=&(sm_module->recursive_doubling_tree);
|
||||
my_rank=ompi_comm_rank(comm);
|
||||
|
||||
count_processed=0;
|
||||
|
||||
/* get a pointer to the shared-memory working buffer */
|
||||
/* NOTE: starting with a rather synchronous approach */
|
||||
|
||||
/* use the same set of buffers for a single reduction */
|
||||
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
|
||||
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
|
||||
/* debug */
|
||||
t0=opal_sys_timer_get_cycles();
|
||||
/* end debug */
|
||||
/* debug */
|
||||
t1=opal_sys_timer_get_cycles();
|
||||
/* end debug */
|
||||
/* get number of elements to process in this stripe */
|
||||
count_this_stripe=n_dts_per_buffer;
|
||||
if( count_processed + count_this_stripe > count )
|
||||
count_this_stripe=count-count_processed;
|
||||
|
||||
/* get unique set of tags for this stripe.
|
||||
* Assume only one collective
|
||||
* per communicator at a given time, so no locking needed
|
||||
* for atomic update of the tag */
|
||||
base_tag=sm_module->collective_tag;
|
||||
sm_module->collective_tag+=my_exchange_node->n_tags;
|
||||
|
||||
/* get pointers to my work buffers */
|
||||
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
|
||||
my_write_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
|
||||
my_read_pointer=my_write_pointer+len_data_buffer;
|
||||
my_tmp_data_buffer[0]=my_write_pointer;
|
||||
my_tmp_data_buffer[1]=my_read_pointer;
|
||||
/* debug */
|
||||
t2=opal_sys_timer_get_cycles();
|
||||
timers[0]+=(t2-t1);
|
||||
/* end debug */
|
||||
|
||||
/* copy data into the write buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_write_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
/* debug */
|
||||
t3=opal_sys_timer_get_cycles();
|
||||
timers[1]+=(t3-t2);
|
||||
/* end debug */
|
||||
|
||||
/* copy data in from the "extra" source, if need be */
|
||||
tag=base_tag;
|
||||
if(0 < my_exchange_node->n_extra_sources) {
|
||||
|
||||
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
|
||||
|
||||
extra_rank=my_exchange_node->rank_extra_source;
|
||||
extra_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[extra_rank].control_region;
|
||||
extra_rank_write_data_pointer=
|
||||
sm_buffer_desc->proc_memory[extra_rank].data_segment;
|
||||
|
||||
/* wait until remote data is read */
|
||||
while( extra_ctl_pointer->flag < tag ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* apply collective operation */
|
||||
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
|
||||
(void *)my_write_pointer, count_this_stripe,dtype);
|
||||
} else {
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
MB();
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
tag=base_tag+1;
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
/* loop over data exchanges */
|
||||
for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
|
||||
|
||||
/* debug */
|
||||
t4=opal_sys_timer_get_cycles();
|
||||
/* end debug */
|
||||
|
||||
index_read=(exchange&1);
|
||||
index_write=((exchange+1)&1);
|
||||
|
||||
my_write_pointer=my_tmp_data_buffer[index_write];
|
||||
my_read_pointer=my_tmp_data_buffer[index_read];
|
||||
|
||||
/* is the remote data read */
|
||||
pair_rank=my_exchange_node->rank_exchanges[exchange];
|
||||
partner_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[pair_rank].control_region;
|
||||
partner_read_pointer=
|
||||
sm_buffer_desc->proc_memory[pair_rank].data_segment;
|
||||
if( 1 == index_read ) {
|
||||
partner_read_pointer+=len_data_buffer;
|
||||
}
|
||||
|
||||
/* wait until remote data is read */
|
||||
while( partner_ctl_pointer->flag < tag ) {
|
||||
opal_progress();
|
||||
}
|
||||
/* debug */
|
||||
t5=opal_sys_timer_get_cycles();
|
||||
timers[2]+=(t5-t4);
|
||||
/* end debug */
|
||||
|
||||
/* reduce data into my write buffer */
|
||||
/* apply collective operation */
|
||||
/*
|
||||
ompi_op_reduce(op,(void *)partner_read_pointer,
|
||||
(void *)my_write_pointer, count_this_stripe,dtype);
|
||||
*/
|
||||
|
||||
/* test */
|
||||
|
||||
{
|
||||
int ii,n_ints;
|
||||
int * restrict my_read=(int *)my_read_pointer;
|
||||
int * restrict my_write=(int *)my_write_pointer;
|
||||
int * restrict exchange_read=(int *)partner_read_pointer;
|
||||
n_ints=count_this_stripe;
|
||||
for(ii=0 ; ii < n_ints ; ii++ ) {
|
||||
my_write[ii]=my_read[ii]+exchange_read[ii];
|
||||
}
|
||||
|
||||
}
|
||||
/* debug */
|
||||
t6=opal_sys_timer_get_cycles();
|
||||
timers[3]+=(t6-t5);
|
||||
/* end debug */
|
||||
|
||||
/* end test */
|
||||
|
||||
/* signal that I am done reading my peer's data */
|
||||
tag++;
|
||||
MB();
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
/* wait for my peer to finish - other wise buffers may be
|
||||
* reused too early */
|
||||
while( partner_ctl_pointer->flag < tag ) {
|
||||
opal_progress();
|
||||
}
|
||||
/* debug */
|
||||
t7=opal_sys_timer_get_cycles();
|
||||
timers[4]+=(t7-t6);
|
||||
/* end debug */
|
||||
|
||||
}
|
||||
|
||||
/* copy data in from the "extra" source, if need be */
|
||||
if(0 < my_exchange_node->n_extra_sources) {
|
||||
tag=base_tag+my_exchange_node->n_tags-1;
|
||||
|
||||
if ( EXTRA_NODE == my_exchange_node->node_type ) {
|
||||
|
||||
extra_rank=my_exchange_node->rank_extra_source;
|
||||
extra_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[extra_rank].control_region;
|
||||
extra_rank_read_data_pointer=
|
||||
sm_buffer_desc->proc_memory[extra_rank].data_segment;
|
||||
index_read=(my_exchange_node->log_2&1);
|
||||
if( index_read ) {
|
||||
extra_rank_read_data_pointer+=len_data_buffer;
|
||||
}
|
||||
|
||||
/* wait until remote data is read */
|
||||
while(! ( extra_ctl_pointer->flag == tag ) ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
|
||||
/* write the data into my read buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_write_pointer,
|
||||
(char *)extra_rank_read_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
tag=base_tag+my_exchange_node->n_tags-1;
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* debug */
|
||||
t8=opal_sys_timer_get_cycles();
|
||||
/* end debug */
|
||||
/* copy data into the destination buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)((char *)rbuf+dt_extent*count_processed),
|
||||
(char *)my_write_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
/* debug */
|
||||
t9=opal_sys_timer_get_cycles();
|
||||
timers[5]+=(t9-t8);
|
||||
/* end debug */
|
||||
|
||||
/* "free" the shared-memory working buffer */
|
||||
/* debug */
|
||||
t10=opal_sys_timer_get_cycles();
|
||||
timers[6]+=(t10-t9);
|
||||
/* end debug */
|
||||
|
||||
/* update the count of elements processed */
|
||||
count_processed+=count_this_stripe;
|
||||
}
|
||||
|
||||
rc=free_sm2_shared_buffer(sm_module);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return rc;
|
||||
|
||||
Error:
|
||||
return rc;
|
||||
}
|
||||
#endif
|
||||
/**
|
||||
* Shared memory blocking allreduce.
|
||||
*/
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user