/*
 * Source listing: openmpi/ompi/mca/coll/sm2/coll_sm2_reduce.c
 * (775 lines, 27 KiB, C)
 */

/*
* Copyright (c) 2007-2008 UT-Battelle, LLC
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "coll_sm2.h"
#include "ompi/op/op.h"
#include "ompi/datatype/datatype.h"
#include "ompi/communicator/communicator.h"
/* debug
#include "opal/sys/timer.h"
extern uint64_t timers[7];
end debug */
/**
* Shared memory blocking allreduce.
*/
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root,
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module)
{
/* local variables */
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
int my_rank, comm_size, child_rank, child, n_children;
int count_processed,count_this_stripe;
int process_shift,my_node_index;
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
long long tag;
volatile char * my_data_pointer;
volatile char * child_data_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
mca_coll_sm2_module_t *sm_module;
tree_node_t *my_reduction_node;
sm_work_buffer_t *sm_buffer_desc;
/* debug
last_root=root;
end debug */
sm_module=(mca_coll_sm2_module_t *) module;
/* compute process shift */
my_rank=ompi_comm_rank(comm);
comm_size=ompi_comm_size(comm);
process_shift=root;
my_node_index=my_rank-root;
/* wrap around */
if(0 > my_node_index ) {
my_node_index+=comm_size;
}
/* get size of data needed - same layout as user data, so that
* we can apply the reudction routines directly on these buffers
*/
rc=ompi_ddt_type_extent(dtype, &dt_extent);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
message_extent=dt_extent*count;
/* lenght of control and data regions */
ctl_size=sm_module->ctl_memory_per_proc_per_segment;
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
/* number of data types copies that the scratch buffer can hold */
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
if ( 0 == n_dts_per_buffer ) {
rc=OMPI_ERROR;
goto Error;
}
/* compute number of stripes needed to process this collective */
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
/* get my node for the reduction tree */
my_reduction_node=&(sm_module->reduction_tree[my_node_index]);
n_children=my_reduction_node->n_children;
/* debug
node_type=my_reduction_node->my_node_type;
end debug */
if( 1 == n_data_segments ) {
/* single data segment */
/* get unique tag for this stripe - assume only one collective
* per communicator at a given time, so no locking needed
* for atomic update of the tag */
tag=sm_module->collective_tag;
sm_module->collective_tag++;
/* debug
assert(tag);
end debug */
/* get a pointer to the shared-memory working buffer */
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
/* debug
free_buff_free_index=tag;
end debug */
/* get number of elements to process in this stripe */
count_this_stripe=count;
/* offset to data segment */
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
/***************************
* Fan into root phase
***************************/
if( ROOT_NODE == my_reduction_node->my_node_type ) {
/*
* copy local data from source buffer to result buffer
*/
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)rbuf,
(char *)sbuf);
if( 0 != rc ) {
return OMPI_ERROR;
}
/*
* Wait on children, and apply op to their data
*/
for( child=0 ; child < n_children ; child++ ) {
child_rank=my_reduction_node->children_ranks[child];
child_rank+=process_shift;
/* wrap around */
if( comm_size <= child_rank ){
child_rank-=comm_size;
}
child_ctl_pointer=
sm_buffer_desc->proc_memory[child_rank].control_region;
child_data_pointer=
sm_buffer_desc->proc_memory[child_rank].data_segment;
/* debug
if( 0 == child_ctl_pointer->flag ) {
fprintf(stderr,"TTT 2 count %d root %d child_rank %d \n",
count,root,child_rank);
debug_module();
}
end debug */
/* wait until child flag is set */
while(child_ctl_pointer->flag != tag) {
opal_progress();
}
/* apply collective operation */
ompi_op_reduce(op,(void *)child_data_pointer,
(void *)rbuf, count_this_stripe,dtype);
} /* end child loop */
} else if( INTERIOR_NODE == my_reduction_node->my_node_type ) {
/* copy segment into shared buffer - ompi_op_reduce
* provids only 2 buffers, so can't add from two
* into a third buffer.
*/
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)sbuf);
if( 0 != rc ) {
return OMPI_ERROR;
}
/*
* Wait on children, and apply op to their data
*/
for( child=0 ; child < n_children ; child++ ) {
child_rank=my_reduction_node->children_ranks[child];
child_rank+=process_shift;
/* wrap around */
if( comm_size <= child_rank ){
child_rank-=comm_size;
}
child_ctl_pointer=
sm_buffer_desc->proc_memory[child_rank].control_region;
child_data_pointer=
sm_buffer_desc->proc_memory[child_rank].data_segment;
/* wait until child flag is set */
/* debug
if( 0 == child_ctl_pointer->flag ) {
fprintf(stderr,"TTT 3 count %d root %d child_rank \n",
count,root,child_rank);
debug_module();
}
end debug */
while(child_ctl_pointer->flag != tag) {
opal_progress();
}
/* apply collective operation */
ompi_op_reduce(op,(void *)child_data_pointer,
(void *)my_data_pointer, count_this_stripe,dtype);
} /* end child loop */
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
} else {
/* leaf node */
/* copy segment into shared buffer - later on will optimize to
* eliminate extra copies.
*/
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)sbuf);
if( 0 != rc ) {
return OMPI_ERROR;
}
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
}
/* "free" the shared-memory working buffer */
rc=free_sm2_shared_buffer(sm_module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
} else {
count_processed=0;
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
/* get unique tag for this stripe - assume only one collective
* per communicator at a given time, so no locking needed
* for atomic update of the tag */
tag=sm_module->collective_tag;
sm_module->collective_tag++;
/* get a pointer to the shared-memory working buffer */
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
/* get number of elements to process in this stripe */
count_this_stripe=n_dts_per_buffer;
if( count_processed + count_this_stripe > count )
count_this_stripe=count-count_processed;
/* offset to data segment */
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
/***************************
* Fan into root phase
***************************/
if( LEAF_NODE != my_reduction_node->my_node_type ) {
/* copy segment into shared buffer - ompi_op_reduce
* provids only 2 buffers, so can't add from two
* into a third buffer.
*/
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
}
/*
* Wait on children, and apply op to their data
*/
for( child=0 ; child < n_children ; child++ ) {
child_rank=my_reduction_node->children_ranks[child];
child_rank+=process_shift;
/* wrap around */
if( comm_size <= child_rank ){
child_rank-=comm_size;
}
child_ctl_pointer=
sm_buffer_desc->proc_memory[child_rank].control_region;
child_data_pointer=
sm_buffer_desc->proc_memory[child_rank].data_segment;
/* wait until child flag is set */
/* debug
if( 0 == child_ctl_pointer->flag ) {
fprintf(stderr,"TTT 1 count %d root %d child_rank %d \n",
count,root,child_rank);
debug_module();
}
end debug */
while(child_ctl_pointer->flag != tag) {
opal_progress();
}
/* apply collective operation */
ompi_op_reduce(op,(void *)child_data_pointer,
(void *)my_data_pointer, count_this_stripe,dtype);
} /* end child loop */
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
/* copy data to destination */
if( ROOT_NODE == my_reduction_node->my_node_type ) {
/* copy data to user supplied buffer */
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)rbuf+dt_extent*count_processed,
(char *)my_data_pointer);
if( 0 != rc ) {
return OMPI_ERROR;
}
}
} else {
/* leaf node */
/* copy segment into shared buffer - later on will optimize to
* eliminate extra copies.
*/
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
}
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
}
/* "free" the shared-memory working buffer */
rc=free_sm2_shared_buffer(sm_module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
/* update the count of elements processed */
count_processed+=count_this_stripe;
}
}
/* return */
return rc;
Error:
return rc;
}
/**
* Shared memory blocking reduce.
*/
int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root,
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module)
{
/* local varibles */
int i,rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
int pair_rank,exchange,extra_rank,n_proc_data,tmp;
int starting_proc;
int n_elements_per_proc, n_residual_elements;
int cnt_offset,n_copy;
pair_exchange_node_t *my_exchange_node;
int my_rank,comm_size,count_processed,count_this_stripe;
int count_this_exchange;
int done_copy_tag,ok_to_copy_tag;
size_t len_data_buffer;
ptrdiff_t dt_extent;
long long tag, base_tag;
sm_work_buffer_t *sm_buffer_desc;
volatile char * extra_rank_write_data_pointer;
volatile char * partner_base_pointer;
volatile char * my_pointer;
volatile char * my_base_pointer;
volatile char * partner_pointer;
volatile char * source_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
partner_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
extra_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
source_ctl_pointer;
mca_coll_sm2_module_t *sm_module;
sm_module=(mca_coll_sm2_module_t *) module;
/* get size of data needed - same layout as user data, so that
* we can apply the reudction routines directly on these buffers
*/
rc=ompi_ddt_type_extent(dtype, &dt_extent);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
/* lenght of control and data regions */
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
/* number of data types copies that the scratch buffer can hold */
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
if ( 0 == n_dts_per_buffer ) {
rc=OMPI_ERROR;
goto Error;
}
len_data_buffer=n_dts_per_buffer*dt_extent;
/* compute number of stripes needed to process this collective */
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
/* get my node for the reduction tree */
my_exchange_node=&(sm_module->recursive_doubling_tree);
my_rank=ompi_comm_rank(comm);
comm_size=ompi_comm_size(comm);
/* get access to shared memory working buffer */
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
my_base_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
count_processed=0;
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
/* get number of elements to process in this stripe */
/* debug
t2=opal_sys_timer_get_cycles();
end debug */
count_this_stripe=n_dts_per_buffer;
if( count_processed + count_this_stripe > count )
count_this_stripe=count-count_processed;
/* compute the number of elements "owned" by each process */
n_elements_per_proc=(count_this_stripe/my_exchange_node->n_largest_pow_2);
n_residual_elements=count_this_stripe-
n_elements_per_proc*my_exchange_node->n_largest_pow_2;
for(i=0 ; i < my_exchange_node->n_largest_pow_2 ; i++ ) {
sm_module->scratch_space[i]=n_elements_per_proc;
if( i < n_residual_elements) {
sm_module->scratch_space[i]++;
}
}
/* get unique set of tags for this stripe.
* Assume only one collective
* per communicator at a given time, so no locking needed
* for atomic update of the tag */
base_tag=sm_module->collective_tag;
/* log_2 tags for recursive doubling, one for the non-power of 2
* initial send, 1 for first copy into shared memory, and
* one for completing the copyout.
*/
sm_module->collective_tag+=(my_exchange_node->log_2+3);
/* copy data into the write buffer */
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_base_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
}
/* debug
{ int *int_tmp=(int *)my_base_pointer;
int i;
fprintf(stderr," my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_stripe ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
/* debug
t3=opal_sys_timer_get_cycles();
timers[1]+=(t3-t2);
end debug */
/* copy data in from the "extra" source, if need be */
tag=base_tag;
if(0 < my_exchange_node->n_extra_sources) {
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_buffer_desc->proc_memory[extra_rank].control_region;
extra_rank_write_data_pointer=
sm_buffer_desc->proc_memory[extra_rank].data_segment;
/* wait until remote data is read */
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
/* apply collective operation */
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
(void *)my_base_pointer, count_this_stripe,dtype);
} else {
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
}
}
MB();
/*
* reduce-scatter
*/
/*
* Signal parent that data is ready
*/
tag=base_tag+1;
my_ctl_pointer->flag=tag;
/*
* loop over data exchanges
*/
/* set the number of procs whos's data I will manipulate - this starts
* at the number of procs in the exchange, so a divide by two at each
* iteration will give the right number of proc for the given iteration
*/
/* debug
{ int *int_tmp=(int *)my_base_pointer;
int i;
fprintf(stderr," GGG my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_stripe ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
n_proc_data=my_exchange_node->n_largest_pow_2;
starting_proc=0;
for(exchange=my_exchange_node->n_exchanges-1;exchange>=0;exchange--) {
/* is the remote data read */
pair_rank=my_exchange_node->rank_exchanges[exchange];
partner_ctl_pointer=
sm_buffer_desc->proc_memory[pair_rank].control_region;
partner_base_pointer=
sm_buffer_desc->proc_memory[pair_rank].data_segment;
/* wait until remote data is read */
while( partner_ctl_pointer->flag < tag ) {
opal_progress();
}
/* figure out the base address to use : the lower rank gets
* the upper data, with the higher rank getting the lower half
* of the current chunk */
n_proc_data=n_proc_data/2;
if(pair_rank < my_rank ) {
starting_proc+=n_proc_data;
}
/* figure out my staring pointer */
tmp=0;
for(i=0 ; i < starting_proc ; i++ ) {
tmp+=sm_module->scratch_space[i];
}
my_pointer=my_base_pointer+tmp*dt_extent;
/* figure out partner's staring pointer */
partner_pointer=partner_base_pointer+tmp*dt_extent;
/* figure out how much to read */
tmp=0;
for(i=starting_proc ; i < starting_proc+n_proc_data ; i++ ) {
tmp+=sm_module->scratch_space[i];
}
count_this_exchange=tmp;
/* reduce data into my write buffer */
/* apply collective operation */
ompi_op_reduce(op,(void *)partner_pointer,
(void *)my_pointer, count_this_exchange,dtype);
/* debug
{ int *int_tmp=(int *)my_pointer;
int i;
fprintf(stderr," result my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_exchange ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
int_tmp=(int *)partner_pointer;
fprintf(stderr," partner data my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_exchange ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
/* signal that I am done reading my peer's data */
tag++;
MB();
my_ctl_pointer->flag=tag;
} /* end exchange loop */
/* debug
t8=opal_sys_timer_get_cycles();
end debug */
/* copy data out to final destination. Could do some sort of
* recursive doubleing in the sm, then copy to process private,
* which reduces memory contention. However, this also almost
* doubles the number of copies.
*/
ok_to_copy_tag=base_tag+1+my_exchange_node->log_2;
/* only root reads the results */
if( root == my_rank) {
/* read from the result buffers directly to the final destinaion */
cnt_offset=0;
for(n_copy=0 ; n_copy < my_exchange_node->n_largest_pow_2 ; n_copy++ ) {
if( 0 >= sm_module->scratch_space[n_copy] )
continue;
source_ctl_pointer=
sm_buffer_desc->proc_memory[n_copy].control_region;
source_pointer=
sm_buffer_desc->proc_memory[n_copy].data_segment;
/* wait until remote data is read */
while( source_ctl_pointer->flag < ok_to_copy_tag ) {
opal_progress();
}
/* copy data into the destination buffer */
rc=ompi_ddt_copy_content_same_ddt(dtype,
sm_module->scratch_space[n_copy],
(char *)((char *)rbuf+
dt_extent*(count_processed+cnt_offset)),
(char *)((char *)source_pointer+
dt_extent*cnt_offset));
if( 0 != rc ) {
return OMPI_ERROR;
}
cnt_offset+=sm_module->scratch_space[n_copy];
}
}
done_copy_tag=base_tag+2+my_exchange_node->log_2;
my_ctl_pointer->flag=done_copy_tag;
/* wait for all to read the data, before re-using this buffer */
if( stripe_number < (n_data_segments-1) ) {
for(n_copy=0 ; n_copy < comm_size ; n_copy++ ) {
source_ctl_pointer=
sm_buffer_desc->proc_memory[n_copy].control_region;
while( source_ctl_pointer-> flag < done_copy_tag ) {
opal_progress();
}
}
}
/* update the count of elements processed */
count_processed+=count_this_stripe;
}
/* return */
return rc;
Error:
return rc;
}
/**
 * Shared memory blocking reduce - entry point.
 *
 * Computes the message size as count times the datatype extent and
 * dispatches to one of the reduce implementations registered in the module:
 * the SHORT_DATA_FN_REDUCE entry when the size does not exceed the module's
 * short_message_size, the LONG_DATA_FN_REDUCE entry otherwise.
 *
 * @param sbuf    send buffer
 * @param rbuf    receive buffer
 * @param count   number of elements
 * @param dtype   datatype of the elements
 * @param op      reduction operation
 * @param root    rank of the root process
 * @param comm    communicator
 * @param module  collective module; cast to mca_coll_sm2_module_t
 *
 * @return OMPI_SUCCESS on success, otherwise the error code returned by
 *         ompi_ddt_type_extent() or by the selected reduce function.
 */
int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
        struct ompi_datatype_t *dtype, struct ompi_op_t *op,
        int root, struct ompi_communicator_t *comm,
        struct mca_coll_base_module_1_1_0_t *module)
{
    mca_coll_sm2_module_t *sm2=(mca_coll_sm2_module_t *) module;
    ptrdiff_t extent;
    size_t message_bytes;
    int which_fn;
    int status;

    /* size of one element - needed to classify the message */
    status=ompi_ddt_type_extent(dtype, &extent);
    if( OMPI_SUCCESS != status ) {
        return status;
    }

    /* pick the algorithm by total message size */
    message_bytes=count*extent;
    if( message_bytes <= sm2->short_message_size ) {
        which_fn=SHORT_DATA_FN_REDUCE;
    } else {
        which_fn=LONG_DATA_FN_REDUCE;
    }

    /* the selected routine's status is the result of the collective */
    status=sm2->reduce_functions[which_fn]
        (sbuf, rbuf, count, dtype, op, root, comm, module);

    return status;
}