add reduction routine - fix buffer recycling logic which was totally
broken. This commit was SVN r18065.
Этот коммит содержится в:
родитель
50433bf833
Коммит
eb5d6096f1
@ -22,6 +22,7 @@ sources = \
|
||||
coll_sm2.h \
|
||||
coll_sm2_component.c \
|
||||
coll_sm2_module.c \
|
||||
coll_sm2_reduce.c \
|
||||
coll_sm2_allreduce.c \
|
||||
coll_sm2_barrier.c \
|
||||
coll_sm2_service.c
|
||||
|
@ -240,6 +240,11 @@ BEGIN_C_DECLS
|
||||
* a per process basis
|
||||
*/
|
||||
sm_memory_region_desc_t *proc_memory;
|
||||
|
||||
/*
|
||||
* bank index
|
||||
*/
|
||||
int bank_index;
|
||||
};
|
||||
typedef struct sm_work_buffer_t sm_work_buffer_t;
|
||||
|
||||
@ -327,12 +332,6 @@ BEGIN_C_DECLS
|
||||
*/
|
||||
int sm2_first_buffer_index_next_bank;
|
||||
|
||||
/* index of last buffer in this memory bank -
|
||||
* We start the non-blocking barrier after allocating
|
||||
* this buffer.
|
||||
*/
|
||||
int sm2_last_buffer_index_this_bank;
|
||||
|
||||
/* communicator - there is a one-to-one association between
|
||||
* the communicator and the module
|
||||
*/
|
||||
@ -488,6 +487,15 @@ BEGIN_C_DECLS
|
||||
struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module);
|
||||
|
||||
/**
|
||||
* Shared memory blocking reduce
|
||||
*/
|
||||
int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module);
|
||||
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* MCA_COLL_SM2_EXPORT_H */
|
||||
|
@ -543,7 +543,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
sm_module->super.coll_exscan = NULL;
|
||||
sm_module->super.coll_gather = NULL;
|
||||
sm_module->super.coll_gatherv = NULL;
|
||||
sm_module->super.coll_reduce = NULL;
|
||||
sm_module->super.coll_reduce = mca_coll_sm2_reduce_intra;
|
||||
sm_module->super.coll_reduce_scatter = NULL;
|
||||
sm_module->super.coll_scan = NULL;
|
||||
sm_module->super.coll_scatter = NULL;
|
||||
@ -793,8 +793,6 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
/* NOTE: need to fix this if we have only one memory bank */
|
||||
sm_module->sm2_first_buffer_index_next_bank=
|
||||
sm_module->sm2_module_num_regions_per_bank;
|
||||
sm_module->sm2_last_buffer_index_this_bank=
|
||||
sm_module->sm2_module_num_regions_per_bank-1;
|
||||
if(sm_module->sm2_module_num_memory_banks > 1 ) {
|
||||
sm_module->sm2_first_buffer_index_next_bank=
|
||||
mca_coll_sm2_component.sm2_num_regions_per_bank;
|
||||
@ -816,6 +814,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
i*sm_module->segment_size;
|
||||
sm_module->sm_buffer_descriptor[i].base_segment_address=base_buffer;
|
||||
|
||||
|
||||
/* allocate array to keep data on each segment in the buffer.
|
||||
* One segment per process in the group.
|
||||
*/
|
||||
@ -858,6 +857,11 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
if( NULL == sm_module->sm_buffer_descriptor[i].proc_memory ) {
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* set bank index */
|
||||
sm_module->sm_buffer_descriptor[i].bank_index=
|
||||
i/sm_module->sm2_module_num_regions_per_bank;
|
||||
|
||||
for(j=0 ; j < group_size ; j++ ) {
|
||||
ctl_ptr=(volatile mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
(base_buffer+j* sm_module->segement_size_per_process);
|
||||
@ -922,6 +926,7 @@ sm2_module_enable(struct mca_coll_base_module_1_1_0_t *module,
|
||||
{
|
||||
/* local variables */
|
||||
char output_buffer[2*MPI_MAX_OBJECT_NAME];
|
||||
int bank_index;
|
||||
|
||||
memset(&output_buffer[0],0,sizeof(output_buffer));
|
||||
snprintf(output_buffer,sizeof(output_buffer),"%s (cid %d)", comm->c_name,
|
||||
@ -937,7 +942,8 @@ sm2_module_enable(struct mca_coll_base_module_1_1_0_t *module,
|
||||
sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
{
|
||||
/* local variables */
|
||||
int rc,buffer_index, memory_bank_index;
|
||||
int rc,buffer_index, memory_bank_index,i_request,bank_index;
|
||||
int request_index;
|
||||
mca_coll_sm2_nb_request_process_private_mem_t *request;
|
||||
|
||||
/* check to see if need to progress the current nb-barrier, which
|
||||
@ -979,21 +985,22 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
module->sm2_allocated_buffer_index=0;
|
||||
}
|
||||
|
||||
/* do I need to complete non-blocking barrier ? The barrier will
|
||||
* be initiated when a process is done with the buffer */
|
||||
if( module->sm2_allocated_buffer_index ==
|
||||
module->sm2_first_buffer_index_next_bank) {
|
||||
/* If this is the first buffer in the bank, see if the barrier
|
||||
* needs to be completed
|
||||
*/
|
||||
|
||||
memory_bank_index= module->sm2_allocated_buffer_index /
|
||||
module->sm2_module_num_regions_per_bank;
|
||||
bank_index=module->
|
||||
sm_buffer_descriptor[module->sm2_allocated_buffer_index].bank_index;
|
||||
if( NB_BARRIER_INACTIVE !=
|
||||
module->barrier_request[bank_index].sm2_barrier_phase ) {
|
||||
|
||||
if ( NB_BARRIER_INACTIVE !=
|
||||
module->barrier_request[memory_bank_index].sm2_barrier_phase) {
|
||||
/*
|
||||
* complete non-blocking barrier, so this memory bank will
|
||||
* be available for use.
|
||||
*/
|
||||
request=&(module->barrier_request[memory_bank_index]);
|
||||
request_index=module->current_request_index;
|
||||
/* complete barrier requests in order */
|
||||
for(i_request=0 ; i_request< module->sm2_module_num_memory_banks ;
|
||||
i_request++ ) {
|
||||
|
||||
/* complete requests in order */
|
||||
request=&(module->barrier_request[module->current_request_index]);
|
||||
while ( NB_BARRIER_DONE != request->sm2_barrier_phase ) {
|
||||
rc=mca_coll_sm2_nbbarrier_intra_progress(module->module_comm,
|
||||
request,
|
||||
@ -1001,28 +1008,31 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
return NULL;
|
||||
}
|
||||
/* set the reqeust to inactive, and point current_request_index
|
||||
* to the request for the next memory bank
|
||||
*/
|
||||
/* set request to inactive */
|
||||
request->sm2_barrier_phase=NB_BARRIER_INACTIVE;
|
||||
/* move pointer to next request that needs to be completed */
|
||||
module->current_request_index=memory_bank_index+1;
|
||||
/* wrap around */
|
||||
if( module->current_request_index ==
|
||||
module->sm2_module_num_memory_banks ) {
|
||||
module->current_request_index=0;
|
||||
}
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* re-set counter for next bank */
|
||||
module->sm2_first_buffer_index_next_bank +=
|
||||
module->sm2_module_num_regions_per_bank;
|
||||
if( module->sm2_first_buffer_index_next_bank ==
|
||||
|
||||
/* set the reqeust to inactive, and point current_request_index
|
||||
* to the request for the next memory bank
|
||||
*/
|
||||
|
||||
/* set request to inactive */
|
||||
request->sm2_barrier_phase=NB_BARRIER_INACTIVE;
|
||||
|
||||
/* move pointer to next request that needs to be completed */
|
||||
module->current_request_index++;
|
||||
|
||||
/* wrap around */
|
||||
if( module->current_request_index ==
|
||||
module->sm2_module_num_memory_banks ) {
|
||||
module->sm2_module_num_memory_banks=0;
|
||||
module->current_request_index=0;
|
||||
}
|
||||
|
||||
/* if current bank is free - break out */
|
||||
if( request_index == bank_index)
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
buffer_index=module->sm2_allocated_buffer_index;
|
||||
@ -1038,7 +1048,7 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
int free_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
{
|
||||
/* local variables */
|
||||
int rc,memory_bank_index;
|
||||
int rc,memory_bank_index,bank_index;
|
||||
mca_coll_sm2_nb_request_process_private_mem_t *request;
|
||||
|
||||
/* check to see if need to progress the current nb-barrier, which
|
||||
@ -1074,15 +1084,23 @@ int free_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
|
||||
/* get next buffer index */
|
||||
module->sm2_freed_buffer_index++;
|
||||
/* check for wrap-around */
|
||||
if( module->sm2_freed_buffer_index == module->sm2_module_num_buffers ) {
|
||||
module->sm2_freed_buffer_index=0;
|
||||
}
|
||||
bank_index=module->
|
||||
sm_buffer_descriptor[module->sm2_freed_buffer_index].bank_index;
|
||||
|
||||
/* do I need to initiate non-blocking barrier - do this when last
|
||||
* buffer in the pool is used
|
||||
*/
|
||||
|
||||
/* do I need to initiate non-blocking barrier */
|
||||
if( module->sm2_freed_buffer_index ==
|
||||
module->sm2_last_buffer_index_this_bank) {
|
||||
( module->sm2_module_num_regions_per_bank * (bank_index+1) -1 )
|
||||
) {
|
||||
|
||||
/* complete non-blocking barrier */
|
||||
memory_bank_index= module->sm2_freed_buffer_index /
|
||||
module->sm2_module_num_regions_per_bank;
|
||||
request=&(module->barrier_request[memory_bank_index]);
|
||||
/* start non-blocking barrier */
|
||||
request=&(module->barrier_request[bank_index]);
|
||||
rc=mca_coll_sm2_nbbarrier_intra(module->module_comm,
|
||||
request,(mca_coll_base_module_1_1_0_t *)module);
|
||||
if( OMPI_SUCCESS !=rc ) {
|
||||
@ -1104,22 +1122,8 @@ int free_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
|
||||
}
|
||||
|
||||
/* need to use buffer out of next memory bank */
|
||||
module->sm2_last_buffer_index_this_bank +=
|
||||
module->sm2_module_num_regions_per_bank;
|
||||
|
||||
/* wrap around */
|
||||
if( module->sm2_last_buffer_index_this_bank >=
|
||||
module->sm2_module_num_memory_banks ) {
|
||||
module->sm2_last_buffer_index_this_bank=
|
||||
module->sm2_module_num_regions_per_bank-1;
|
||||
}
|
||||
}
|
||||
|
||||
/* check for wrap-around */
|
||||
if( module->sm2_freed_buffer_index == module->sm2_module_num_buffers ) {
|
||||
module->sm2_freed_buffer_index=0;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return OMPI_SUCCESS;
|
||||
|
239
ompi/mca/coll/sm2/coll_sm2_reduce.c
Обычный файл
239
ompi/mca/coll/sm2/coll_sm2_reduce.c
Обычный файл
@ -0,0 +1,239 @@
|
||||
/*
|
||||
* Copyright (c) 2007-2008 UT-Battelle, LLC
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
/** @file */
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/constants.h"
|
||||
#include "coll_sm2.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/datatype/datatype.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
/* debug
|
||||
#include "opal/sys/timer.h"
|
||||
|
||||
extern uint64_t timers[7];
|
||||
end debug */
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Shared memory blocking allreduce.
|
||||
*/
|
||||
static
|
||||
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op,
|
||||
int root,
|
||||
struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module)
|
||||
{
|
||||
/* local variables */
|
||||
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
|
||||
int my_rank, comm_size, child_rank, child, n_children;
|
||||
int count_processed,count_this_stripe;
|
||||
int process_shift,my_node_index;
|
||||
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
|
||||
long long tag;
|
||||
volatile char * my_data_pointer;
|
||||
volatile char * child_data_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
tree_node_t *my_reduction_node;
|
||||
sm_work_buffer_t *sm_buffer_desc;
|
||||
|
||||
sm_module=(mca_coll_sm2_module_t *) module;
|
||||
|
||||
/* compute process shift */
|
||||
my_rank=ompi_comm_rank(comm);
|
||||
comm_size=ompi_comm_size(comm);
|
||||
process_shift=root;
|
||||
my_node_index=my_rank-root;
|
||||
/* wrap around */
|
||||
if(0 > my_node_index ) {
|
||||
my_node_index+=comm_size;
|
||||
}
|
||||
|
||||
/* get size of data needed - same layout as user data, so that
|
||||
* we can apply the reudction routines directly on these buffers
|
||||
*/
|
||||
rc=ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
message_extent=dt_extent*count;
|
||||
|
||||
/* lenght of control and data regions */
|
||||
ctl_size=sm_module->ctl_memory_per_proc_per_segment;
|
||||
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
|
||||
|
||||
/* number of data types copies that the scratch buffer can hold */
|
||||
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
|
||||
if ( 0 == n_dts_per_buffer ) {
|
||||
rc=OMPI_ERROR;
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* compute number of stripes needed to process this collective */
|
||||
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
|
||||
|
||||
/* get my node for the reduction tree */
|
||||
my_reduction_node=&(sm_module->reduction_tree[my_node_index]);
|
||||
n_children=my_reduction_node->n_children;
|
||||
count_processed=0;
|
||||
|
||||
/* get a pointer to the shared-memory working buffer */
|
||||
/* NOTE: starting with a rather synchronous approach */
|
||||
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
|
||||
|
||||
/* get unique tag for this stripe - assume only one collective
|
||||
* per communicator at a given time, so no locking needed
|
||||
* for atomic update of the tag */
|
||||
tag=sm_module->collective_tag;
|
||||
sm_module->collective_tag++;
|
||||
|
||||
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
|
||||
|
||||
/* get number of elements to process in this stripe */
|
||||
count_this_stripe=n_dts_per_buffer;
|
||||
if( count_processed + count_this_stripe > count )
|
||||
count_this_stripe=count-count_processed;
|
||||
|
||||
/* offset to data segment */
|
||||
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
|
||||
my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
|
||||
|
||||
/***************************
|
||||
* Fan into root phase
|
||||
***************************/
|
||||
|
||||
if( LEAF_NODE != my_reduction_node->my_node_type ) {
|
||||
/* copy segment into shared buffer - ompi_op_reduce
|
||||
* provids only 2 buffers, so can't add from two
|
||||
* into a third buffer.
|
||||
*/
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait on children, and apply op to their data
|
||||
*/
|
||||
for( child=0 ; child < n_children ; child++ ) {
|
||||
child_rank=my_reduction_node->children_ranks[child];
|
||||
child_rank+=process_shift;
|
||||
/* wrap around */
|
||||
if( comm_size <= child_rank ){
|
||||
child_rank-=comm_size;
|
||||
}
|
||||
|
||||
child_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[child_rank].control_region;
|
||||
child_data_pointer=
|
||||
sm_buffer_desc->proc_memory[child_rank].data_segment;
|
||||
|
||||
/* wait until child flag is set */
|
||||
while(child_ctl_pointer->flag != tag) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* apply collective operation */
|
||||
ompi_op_reduce(op,(void *)child_data_pointer,
|
||||
(void *)my_data_pointer, count_this_stripe,dtype);
|
||||
|
||||
} /* end child loop */
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
my_ctl_pointer->index=stripe_number;
|
||||
|
||||
/* copy data to destination */
|
||||
if( ROOT_NODE == my_reduction_node->my_node_type ) {
|
||||
/* copy data to user supplied buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)rbuf+dt_extent*count_processed,
|
||||
(char *)my_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
/* leaf node */
|
||||
/* copy segment into shared buffer - later on will optimize to
|
||||
* eliminate extra copies.
|
||||
*/
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
}
|
||||
|
||||
/* "free" the shared-memory working buffer */
|
||||
rc=free_sm2_shared_buffer(sm_module);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* update the count of elements processed */
|
||||
count_processed+=count_this_stripe;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return rc;
|
||||
|
||||
Error:
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shared memory blocking reduce.
|
||||
*/
|
||||
int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
int root, struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module)
|
||||
{
|
||||
/* local variables */
|
||||
int rc;
|
||||
|
||||
rc= mca_coll_sm2_reduce_intra_fanin(sbuf, rbuf, count,
|
||||
dtype, op, root, comm, module);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
Error:
|
||||
return rc;
|
||||
}
|
Загрузка…
x
Ссылка в новой задаче
Block a user