implement allreduce as reduce-scatter, followed by an allgather.
This commit was SVN r18132.
This commit is contained in:
parent 08ead87604
commit a6bdbfab97
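
In message-passing terms the change is: each rank first obtains the reduced result for one block of the vector (reduce-scatter), then all blocks are gathered so every rank holds the full result (allgather). A minimal sketch of that equivalence using standard MPI calls -- an illustration only, not the shared-memory machinery added below; the block split mirrors the scratch_space[] bookkeeping in the new function:

    #include <mpi.h>
    #include <stdlib.h>

    /* allreduce expressed as reduce-scatter + allgather (doubles, MPI_SUM) */
    static int allreduce_rsag(const double *sbuf, double *rbuf, int count,
                              MPI_Comm comm)
    {
        int rank, size, i;
        MPI_Comm_rank(comm, &rank);
        MPI_Comm_size(comm, &size);

        /* per-rank block sizes; residual elements go to the low ranks */
        int *counts = (int *) malloc(size * sizeof(int));
        int *displs = (int *) malloc(size * sizeof(int));
        for (i = 0; i < size; i++) {
            counts[i] = count / size + ((i < count % size) ? 1 : 0);
            displs[i] = (0 == i) ? 0 : displs[i - 1] + counts[i - 1];
        }

        /* phase 1: after this, rank i holds the reduced block i */
        MPI_Reduce_scatter(sbuf, rbuf + displs[rank], counts,
                           MPI_DOUBLE, MPI_SUM, comm);

        /* phase 2: gather all reduced blocks so every rank has the result */
        MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                       rbuf, counts, displs, MPI_DOUBLE, comm);

        free(displs);
        free(counts);
        return MPI_SUCCESS;
    }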
@@ -177,6 +177,9 @@ BEGIN_C_DECLS
     /* log 2 of largest full power of 2 for this node set */
     int log_2;
 
+    /* largest power of 2 that fits in this group */
+    int n_largest_pow_2;
+
     /* node type */
     int node_type;
 
@@ -386,6 +389,9 @@ BEGIN_C_DECLS
     long long barrier_bank_cntr;
     /* end debug */
 
+    /* scratch space - one int per process */
+    int *scratch_space;
+
 };
 
 typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t;
@@ -1106,6 +1106,340 @@ Error:
     return rc;
 }
 
+/**
+ * Shared memory blocking allreduce.
+ */
+static
+int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf,
+    int count, struct ompi_datatype_t *dtype,
+    struct ompi_op_t *op, struct ompi_communicator_t *comm,
+    struct mca_coll_base_module_1_1_0_t *module)
+{
+    /* local variables */
+    int i,rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
+    int pair_rank,exchange,extra_rank,n_proc_data,tmp;
+    int starting_proc,n_procs_to_read,base_block_proc,base_read_proc;
+    int n_elements_per_proc, n_residual_elements;
+    int cnt_offset,n_copy;
+    pair_exchange_node_t *my_exchange_node;
+    int my_rank,comm_size,count_processed,count_this_stripe;
+    int count_this_exchange;
+    int done_copy_tag,ok_to_copy_tag;
+    size_t len_data_buffer;
+    ptrdiff_t dt_extent;
+    long long tag, base_tag;
+    sm_work_buffer_t *sm_buffer_desc;
+    volatile char * my_write_pointer;
+    volatile char * extra_rank_write_data_pointer;
+    volatile char * extra_rank_read_data_pointer;
+    volatile char * partner_base_pointer;
+    volatile char * my_pointer;
+    volatile char * my_base_pointer;
+    volatile char * partner_pointer;
+    volatile char * source_pointer;
+    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t *
+        partner_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t *
+        extra_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t *
+        source_ctl_pointer;
+    mca_coll_sm2_module_t *sm_module;
+
+    sm_module=(mca_coll_sm2_module_t *) module;
+
+    /* get size of data needed - same layout as user data, so that
+     * we can apply the reduction routines directly on these buffers
+     */
+    rc=ompi_ddt_type_extent(dtype, &dt_extent);
+    if( OMPI_SUCCESS != rc ) {
+        goto Error;
+    }
+
+    /* length of control and data regions */
+    len_data_buffer=sm_module->data_memory_per_proc_per_segment;
+
+    /* number of datatype copies that the scratch buffer can hold */
+    n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
+    if ( 0 == n_dts_per_buffer ) {
+        rc=OMPI_ERROR;
+        goto Error;
+    }
+
+    len_data_buffer=n_dts_per_buffer*dt_extent;
+
+    /* compute number of stripes needed to process this collective */
+    n_data_segments=(count+n_dts_per_buffer-1)/n_dts_per_buffer;
+
+    /* get my node for the reduction tree */
+    my_exchange_node=&(sm_module->recursive_doubling_tree);
+    my_rank=ompi_comm_rank(comm);
+    comm_size=ompi_comm_size(comm);
+
+    /* get access to shared memory working buffer */
+    sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
+    my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
+    my_base_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
+
+    count_processed=0;
+    for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
+        /* get number of elements to process in this stripe */
+        /* debug
+        t2=opal_sys_timer_get_cycles();
+        end debug */
+        count_this_stripe=n_dts_per_buffer;
+        if( count_processed + count_this_stripe > count )
+            count_this_stripe=count-count_processed;
+
+        /* compute the number of elements "owned" by each process */
+        n_elements_per_proc=(count_this_stripe/my_exchange_node->n_largest_pow_2);
+        n_residual_elements=count_this_stripe-
+            n_elements_per_proc*my_exchange_node->n_largest_pow_2;
+        for(i=0 ; i < my_exchange_node->n_largest_pow_2 ; i++ ) {
+            sm_module->scratch_space[i]=n_elements_per_proc;
+            if( i < n_residual_elements) {
+                sm_module->scratch_space[i]++;
+            }
+        }
+        /* debug
+        fprintf(stderr," my_rank %d element list count_this_stripe %d : ",my_rank,count_this_stripe);
+        for(i=0 ; i < comm_size ; i++ ) {
+            fprintf(stderr," %d ",sm_module->scratch_space[i]);
+        }
+        fprintf(stderr," \n");
+        fflush(stderr);
+        end debug */
+
+        /* get unique set of tags for this stripe.
+         * Assume only one collective
+         * per communicator at a given time, so no locking needed
+         * for atomic update of the tag */
+        base_tag=sm_module->collective_tag;
+        /* log_2 tags for recursive doubling, one for the non-power of 2
+         * initial send, 1 for first copy into shared memory, and
+         * one for completing the copyout.
+         */
+        sm_module->collective_tag+=(my_exchange_node->log_2+3);
+
+        /* copy data into the write buffer */
+        rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+            (char *)my_base_pointer,
+            (char *)((char *)sbuf+dt_extent*count_processed));
+        if( 0 != rc ) {
+            return OMPI_ERROR;
+        }
+        /* debug
+        { int *int_tmp=(int *)my_base_pointer;
+        int i;
+        fprintf(stderr," my rank %d data in tmp :: ",my_rank);
+        for (i=0 ; i < count_this_stripe ; i++ ) {
+            fprintf(stderr," %d ",int_tmp[i]);
+        }
+        fprintf(stderr,"\n");
+        fflush(stderr);
+        }
+        end debug */
+
+        /* debug
+        t3=opal_sys_timer_get_cycles();
+        timers[1]+=(t3-t2);
+        end debug */
+
+        /* copy data in from the "extra" source, if need be */
+        tag=base_tag;
+        if(0 < my_exchange_node->n_extra_sources) {
+            if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
+                extra_rank=my_exchange_node->rank_extra_source;
+                extra_ctl_pointer=
+                    sm_buffer_desc->proc_memory[extra_rank].control_region;
+                extra_rank_write_data_pointer=
+                    sm_buffer_desc->proc_memory[extra_rank].data_segment;
+
+                /* wait until remote data is ready */
+                while( extra_ctl_pointer->flag < tag ) {
+                    opal_progress();
+                }
+
+                /* apply collective operation */
+                ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
+                    (void *)my_base_pointer, count_this_stripe,dtype);
+            } else {
+
+                /* set memory barrier to make sure data is in main memory before
+                 * the completion flags are set.
+                 */
+                MB();
+
+                /*
+                 * Signal parent that data is ready
+                 */
+                my_ctl_pointer->flag=tag;
+
+            }
+        }
+        MB();
+
+        /*
+         * reduce-scatter
+         */
+        /*
+         * Signal parent that data is ready
+         */
+        tag=base_tag+1;
+        my_ctl_pointer->flag=tag;
+
+        /*
+         * loop over data exchanges
+         */
+        /* set the number of procs whose data I will manipulate - this starts
+         * at the number of procs in the exchange, so a divide by two at each
+         * iteration will give the right number of procs for the given iteration
+         */
+        /* debug
+        { int *int_tmp=(int *)my_base_pointer;
+        int i;
+        fprintf(stderr," GGG my rank %d data in tmp :: ",my_rank);
+        for (i=0 ; i < count_this_stripe ; i++ ) {
+            fprintf(stderr," %d ",int_tmp[i]);
+        }
+        fprintf(stderr,"\n");
+        fflush(stderr);
+        }
+        end debug */
+        n_proc_data=my_exchange_node->n_largest_pow_2;
+        starting_proc=0;
+        for(exchange=my_exchange_node->n_exchanges-1;exchange>=0;exchange--) {
+
+            /* is the remote data ready */
+            pair_rank=my_exchange_node->rank_exchanges[exchange];
+
+            partner_ctl_pointer=
+                sm_buffer_desc->proc_memory[pair_rank].control_region;
+            partner_base_pointer=
+                sm_buffer_desc->proc_memory[pair_rank].data_segment;
+
+            /* wait until remote data is ready */
+            while( partner_ctl_pointer->flag < tag ) {
+                opal_progress();
+            }
+
+            /* figure out the base address to use : the lower rank gets
+             * the upper data, with the higher rank getting the lower half
+             * of the current chunk */
+            n_proc_data=n_proc_data/2;
+            if(pair_rank < my_rank ) {
+                starting_proc+=n_proc_data;
+            }
+
+            /* figure out my starting pointer */
+            tmp=0;
+            for(i=0 ; i < starting_proc ; i++ ) {
+                tmp+=sm_module->scratch_space[i];
+            }
+            my_pointer=my_base_pointer+tmp*dt_extent;
+            /* figure out partner's starting pointer */
+            partner_pointer=partner_base_pointer+tmp*dt_extent;
+
+            /* figure out how much to read */
+            tmp=0;
+            for(i=starting_proc ; i < starting_proc+n_proc_data ; i++ ) {
+                tmp+=sm_module->scratch_space[i];
+            }
+            count_this_exchange=tmp;
+
+            /* reduce data into my write buffer */
+            /* apply collective operation */
+            ompi_op_reduce(op,(void *)partner_pointer,
+                (void *)my_pointer, count_this_exchange,dtype);
+            /* debug
+            { int *int_tmp=(int *)my_pointer;
+            int i;
+            fprintf(stderr," result my rank %d data in tmp :: ",my_rank);
+            for (i=0 ; i < count_this_exchange ; i++ ) {
+                fprintf(stderr," %d ",int_tmp[i]);
+            }
+            fprintf(stderr,"\n");
+            int_tmp=(int *)partner_pointer;
+            fprintf(stderr," partner data my rank %d data in tmp :: ",my_rank);
+            for (i=0 ; i < count_this_exchange ; i++ ) {
+                fprintf(stderr," %d ",int_tmp[i]);
+            }
+            fprintf(stderr,"\n");
+            fflush(stderr);
+            }
+            end debug */
+
+            /* signal that I am done reading my peer's data */
+            tag++;
+            MB();
+            my_ctl_pointer->flag=tag;
+
+        } /* end exchange loop */
+
+        /* debug
+        t8=opal_sys_timer_get_cycles();
+        end debug */
+        /* copy data out to final destination. Could do some sort of
+         * recursive doubling in the sm, then copy to process private,
+         * which reduces memory contention. However, this also almost
+         * doubles the number of copies.
+         */
+        ok_to_copy_tag=base_tag+1+my_exchange_node->log_2;
+
+        /* read from the result buffers directly to the final destination */
+        cnt_offset=0;
+        for(n_copy=0 ; n_copy < my_exchange_node->n_largest_pow_2 ; n_copy++ ) {
+
+            if( 0 >= sm_module->scratch_space[n_copy] )
+                continue;
+
+            source_ctl_pointer=
+                sm_buffer_desc->proc_memory[n_copy].control_region;
+            source_pointer=
+                sm_buffer_desc->proc_memory[n_copy].data_segment;
+
+            /* wait until the result data is ready to be read */
+            while( source_ctl_pointer->flag < ok_to_copy_tag ) {
+                opal_progress();
+            }
+            /* copy data into the destination buffer */
+            rc=ompi_ddt_copy_content_same_ddt(dtype,
+                sm_module->scratch_space[n_copy],
+                (char *)((char *)rbuf+
+                    dt_extent*(count_processed+cnt_offset)),
+                (char *)((char *)source_pointer+
+                    dt_extent*cnt_offset));
+            if( 0 != rc ) {
+                return OMPI_ERROR;
+            }
+            cnt_offset+=sm_module->scratch_space[n_copy];
+        }
+        done_copy_tag=base_tag+2+my_exchange_node->log_2;
+        my_ctl_pointer->flag=done_copy_tag;
+
+        /* wait for all to read the data, before re-using this buffer */
+        if( stripe_number < (n_data_segments-1) ) {
+            for(n_copy=0 ; n_copy < comm_size ; n_copy++ ) {
+                source_ctl_pointer=
+                    sm_buffer_desc->proc_memory[n_copy].control_region;
+                while( source_ctl_pointer->flag < done_copy_tag ) {
+                    opal_progress();
+                }
+            }
+        }
+
+        /* update the count of elements processed */
+        count_processed+=count_this_stripe;
+    }
+    /* return */
+    return rc;
+
+Error:
+    return rc;
+}
 #if 0
 /* just storing various versions of the recursive doubling algorithm,
  * so can compare them later on.
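
The block-ownership arithmetic in the exchange loop above, isolated as a standalone sketch (the pair_rank_per_round schedule is a hypothetical stand-in for my_exchange_node->rank_exchanges[]):

    /* Per round, each rank halves the range of per-process blocks it is
     * responsible for; when paired with a lower rank it takes the upper half. */
    static void ownership_per_round(int my_rank, const int *pair_rank_per_round,
                                    int n_exchanges, int n_largest_pow_2)
    {
        int n_proc_data = n_largest_pow_2;  /* blocks reduced this round */
        int starting_proc = 0;              /* index of my first block   */
        int exchange;

        for (exchange = n_exchanges - 1; exchange >= 0; exchange--) {
            n_proc_data = n_proc_data / 2;
            if (pair_rank_per_round[exchange] < my_rank) {
                starting_proc += n_proc_data;
            }
            /* this round reduces blocks
             * [starting_proc, starting_proc + n_proc_data) */
        }
        /* on exit n_proc_data == 1: starting_proc is the single block this
         * rank contributes during the copy-out (allgather) phase */
    }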
@@ -1421,16 +1755,6 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
     if( 0 != rc ) {
         return OMPI_ERROR;
     }
-    /* debug
-    t9=opal_sys_timer_get_cycles();
-    timers[5]+=(t9-t8);
-    end debug */
-
-    /* "free" the shared-memory working buffer */
-    /* debug
-    t10=opal_sys_timer_get_cycles();
-    timers[6]+=(t10-t9);
-    end debug */
 
     /* update the count of elements processed */
     count_processed+=count_this_stripe;
@@ -1462,13 +1786,18 @@ int mca_coll_sm2_allreduce_intra(void *sbuf, void *rbuf, int count,
 
 #if 0
     if( 0 != (op->o_flags & OMPI_OP_FLAGS_COMMUTE)) {
-#endif
         /* Commutative Operation */
         rc= mca_coll_sm2_allreduce_intra_recursive_doubling(sbuf, rbuf, count,
                 dtype, op, comm, module);
         if( OMPI_SUCCESS != rc ) {
             goto Error;
         }
+#endif
+    rc= mca_coll_sm2_allreduce_intra_reducescatter_allgather(sbuf, rbuf, count,
+            dtype, op, comm, module);
+    if( OMPI_SUCCESS != rc ) {
+        goto Error;
+    }
 #if 0
     } else {
         /* Non-Commutative Operation */
@@ -1022,6 +1022,12 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
 
     }
 
+    /* allocate process private scratch space */
+    sm_module->scratch_space=(int *)malloc(sizeof(int)*group_size);
+    if( NULL == sm_module->scratch_space) {
+        goto CLEANUP;
+    }
+
     /* touch pages to apply memory affinity - Note: do we really need this or will
      * the algorithms do this */
 
@@ -1060,6 +1066,11 @@ CLEANUP:
         sm_module->sm_buffer_descriptor=NULL;
     }
 
+    if(sm_module->scratch_space) {
+        free(sm_module->scratch_space);
+        sm_module->scratch_space=NULL;
+    }
+
     OBJ_RELEASE(sm_module);
 
     return NULL;
@@ -234,6 +234,12 @@ int setup_recursive_doubling_tree_node(int num_nodes, int node_rank,
     }
     exchange_node->log_2=n_exchanges;
 
+    tmp=1;
+    for(i=0 ; i < n_exchanges ; i++ ) {
+        tmp*=2;
+    }
+    exchange_node->n_largest_pow_2=tmp;
+
     /* set node characteristics - node that is not within the largest
      * power of 2 will just send it's data to node that will participate
      * in the recursive doubling, and get the result back at the end.
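
For reference, what this last hunk computes: n_largest_pow_2 = 2^n_exchanges, the largest power of two that fits in the group. A small self-contained sketch with an assumed helper name; e.g. a 6-rank group gives n_exchanges = 2 and n_largest_pow_2 = 4, leaving two "extra" ranks that feed their data in before the exchange rounds and read the result back afterwards.

    /* hypothetical helper matching the tmp-doubling loop in this hunk */
    static int largest_pow_2(int num_nodes, int *n_exchanges)
    {
        int tmp = 1;
        *n_exchanges = 0;
        while (2 * tmp <= num_nodes) {
            tmp *= 2;
            (*n_exchanges)++;
        }
        return tmp;  /* num_nodes = 6 -> *n_exchanges = 2, returns 4 */
    }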
|
Загрузка…
x
Ссылка в новой задаче
Block a user