diff --git a/ompi/mca/coll/sm2/Makefile.am b/ompi/mca/coll/sm2/Makefile.am
index 40e9b922aa..0e967f4077 100644
--- a/ompi/mca/coll/sm2/Makefile.am
+++ b/ompi/mca/coll/sm2/Makefile.am
@@ -22,6 +22,7 @@ sources = \
         coll_sm2.h \
         coll_sm2_component.c \
         coll_sm2_module.c \
+        coll_sm2_reduce.c \
         coll_sm2_allreduce.c \
         coll_sm2_barrier.c \
         coll_sm2_service.c
diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h
index c747a47036..19f525911c 100644
--- a/ompi/mca/coll/sm2/coll_sm2.h
+++ b/ompi/mca/coll/sm2/coll_sm2.h
@@ -240,6 +240,11 @@ BEGIN_C_DECLS
      * a per process basis
      */
     sm_memory_region_desc_t *proc_memory;
+
+    /*
+     * bank index
+     */
+    int bank_index;
 };
 typedef struct sm_work_buffer_t sm_work_buffer_t;
 
@@ -327,12 +332,6 @@ BEGIN_C_DECLS
      */
     int sm2_first_buffer_index_next_bank;
 
-    /* index of last buffer in this memory bank -
-     * We start the non-blocking barrier after allocating
-     * this buffer.
-     */
-    int sm2_last_buffer_index_this_bank;
-
     /* communicator - there is a one-to-one association between
      * the communicator and the module
      */
@@ -488,6 +487,15 @@ BEGIN_C_DECLS
             struct ompi_communicator_t *comm,
             struct mca_coll_base_module_1_1_0_t *module);
 
+    /**
+     * Shared memory blocking reduce
+     */
+    int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
+            struct ompi_datatype_t *dtype, struct ompi_op_t *op,
+            int root, struct ompi_communicator_t *comm,
+            struct mca_coll_base_module_1_1_0_t *module);
+
+
 END_C_DECLS
 
 #endif /* MCA_COLL_SM2_EXPORT_H */
diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c
index e1e5362d9b..c73030404d 100644
--- a/ompi/mca/coll/sm2/coll_sm2_module.c
+++ b/ompi/mca/coll/sm2/coll_sm2_module.c
@@ -543,7 +543,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
     sm_module->super.coll_exscan = NULL;
     sm_module->super.coll_gather = NULL;
     sm_module->super.coll_gatherv = NULL;
-    sm_module->super.coll_reduce = NULL;
+    sm_module->super.coll_reduce = mca_coll_sm2_reduce_intra;
     sm_module->super.coll_reduce_scatter = NULL;
     sm_module->super.coll_scan = NULL;
     sm_module->super.coll_scatter = NULL;
@@ -793,8 +793,6 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
     /* NOTE: need to fix this if we have only one memory bank */
     sm_module->sm2_first_buffer_index_next_bank=
         sm_module->sm2_module_num_regions_per_bank;
-    sm_module->sm2_last_buffer_index_this_bank=
-        sm_module->sm2_module_num_regions_per_bank-1;
     if(sm_module->sm2_module_num_memory_banks > 1 ) {
         sm_module->sm2_first_buffer_index_next_bank=
             mca_coll_sm2_component.sm2_num_regions_per_bank;
@@ -816,6 +814,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
             i*sm_module->segment_size;
         sm_module->sm_buffer_descriptor[i].base_segment_address=base_buffer;
 
+
         /* allocate array to keep data on each segment in the buffer.
          * One segment per process in the group.
          */
@@ -858,6 +857,11 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
         if( NULL == sm_module->sm_buffer_descriptor[i].proc_memory ) {
             goto CLEANUP;
         }
+
+        /* set bank index */
+        sm_module->sm_buffer_descriptor[i].bank_index=
+            i/sm_module->sm2_module_num_regions_per_bank;
+
         for(j=0 ; j < group_size ; j++ ) {
             ctl_ptr=(volatile mca_coll_sm2_nb_request_process_shared_mem_t *)
                 (base_buffer+j* sm_module->segement_size_per_process);
@@ -922,6 +926,7 @@ sm2_module_enable(struct mca_coll_base_module_1_1_0_t *module,
 {
     /* local variables */
     char output_buffer[2*MPI_MAX_OBJECT_NAME];
+    int bank_index;
 
     memset(&output_buffer[0],0,sizeof(output_buffer));
     snprintf(output_buffer,sizeof(output_buffer),"%s (cid %d)", comm->c_name,
@@ -937,7 +942,8 @@ sm2_module_enable(struct mca_coll_base_module_1_1_0_t *module,
 sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
 {
     /* local variables */
-    int rc,buffer_index, memory_bank_index;
+    int rc,buffer_index, memory_bank_index,i_request,bank_index;
+    int request_index;
     mca_coll_sm2_nb_request_process_private_mem_t *request;
 
     /* check to see if need to progress the current nb-barrier, which
@@ -979,21 +985,22 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
         module->sm2_allocated_buffer_index=0;
     }
 
-    /* do I need to complete non-blocking barrier ? The barrier will
-     * be initiated when a process is done with the buffer */
-    if( module->sm2_allocated_buffer_index ==
-            module->sm2_first_buffer_index_next_bank) {
+    /* If this is the first buffer in the bank, see if the barrier
+     * needs to be completed
+     */
 
-        memory_bank_index= module->sm2_allocated_buffer_index /
-            module->sm2_module_num_regions_per_bank;
+    bank_index=module->
+        sm_buffer_descriptor[module->sm2_allocated_buffer_index].bank_index;
+    if( NB_BARRIER_INACTIVE !=
+            module->barrier_request[bank_index].sm2_barrier_phase ) {
 
-        if ( NB_BARRIER_INACTIVE !=
-                module->barrier_request[memory_bank_index].sm2_barrier_phase) {
-            /*
-             * complete non-blocking barrier, so this memory bank will
-             * be available for use.
-             */
-            request=&(module->barrier_request[memory_bank_index]);
+        request_index=module->current_request_index;
+        /* complete barrier requests in order */
+        for(i_request=0 ; i_request< module->sm2_module_num_memory_banks ;
+                i_request++ ) {
+
+            /* complete requests in order */
+            request=&(module->barrier_request[module->current_request_index]);
             while ( NB_BARRIER_DONE != request->sm2_barrier_phase ) {
                 rc=mca_coll_sm2_nbbarrier_intra_progress(module->module_comm,
                         request,
@@ -1001,28 +1008,31 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
                 if( OMPI_SUCCESS != rc ) {
                     return NULL;
                 }
-                /* set the reqeust to inactive, and point current_request_index
-                 * to the request for the next memory bank
-                 */
-                /* set request to inactive */
-                request->sm2_barrier_phase=NB_BARRIER_INACTIVE;
-                /* move pointer to next request that needs to be completed */
-                module->current_request_index=memory_bank_index+1;
-                /* wrap around */
-                if( module->current_request_index ==
-                        module->sm2_module_num_memory_banks ) {
-                    module->current_request_index=0;
-                }
+                opal_progress();
             }
-
-            /* re-set counter for next bank */
-            module->sm2_first_buffer_index_next_bank +=
-                module->sm2_module_num_regions_per_bank;
-            if( module->sm2_first_buffer_index_next_bank ==
+
+            /* set the request to inactive, and point current_request_index
+             * to the request for the next memory bank
+             */
+
+            /* set request to inactive */
+            request->sm2_barrier_phase=NB_BARRIER_INACTIVE;
+
+            /* move pointer to next request that needs to be completed */
+            module->current_request_index++;
+
+            /* wrap around */
+            if( module->current_request_index ==
                 module->sm2_module_num_memory_banks ) {
-                module->sm2_module_num_memory_banks=0;
+                module->current_request_index=0;
             }
+
+            /* if current bank is free - break out */
+            if( request_index == bank_index)
+                break;
+
+        }
     }
 
     buffer_index=module->sm2_allocated_buffer_index;
@@ -1038,7 +1048,7 @@ sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
 {
     /* local variables */
-    int rc,memory_bank_index;
+    int rc,memory_bank_index,bank_index;
     mca_coll_sm2_nb_request_process_private_mem_t *request;
 
     /* check to see if need to progress the current nb-barrier, which
@@ -1074,15 +1084,23 @@ int free_sm2_shared_buffer(mca_coll_sm2_module_t *module)
     /* get next buffer index */
     module->sm2_freed_buffer_index++;
 
+    /* check for wrap-around */
+    if( module->sm2_freed_buffer_index == module->sm2_module_num_buffers ) {
+        module->sm2_freed_buffer_index=0;
+    }
+    bank_index=module->
+        sm_buffer_descriptor[module->sm2_freed_buffer_index].bank_index;
+
+    /* do I need to initiate non-blocking barrier - do this when last
+     * buffer in the pool is used
+     */
-    /* do I need to initiate non-blocking barrier */
     if( module->sm2_freed_buffer_index ==
-            module->sm2_last_buffer_index_this_bank) {
+            ( module->sm2_module_num_regions_per_bank * (bank_index+1) -1 )
+            ) {
 
-        /* complete non-blocking barrier */
-        memory_bank_index= module->sm2_freed_buffer_index /
-            module->sm2_module_num_regions_per_bank;
-        request=&(module->barrier_request[memory_bank_index]);
+        /* start non-blocking barrier */
+        request=&(module->barrier_request[bank_index]);
         rc=mca_coll_sm2_nbbarrier_intra(module->module_comm,
                 request,(mca_coll_base_module_1_1_0_t *)module);
         if( OMPI_SUCCESS !=rc ) {
@@ -1104,22 +1122,8 @@ int free_sm2_shared_buffer(mca_coll_sm2_module_t *module)
         }
 
-        /* need to use buffer out of next memory bank */
-        module->sm2_last_buffer_index_this_bank +=
-            module->sm2_module_num_regions_per_bank;
-
-        /* wrap around */
-        if( module->sm2_last_buffer_index_this_bank >=
-                module->sm2_module_num_memory_banks ) {
-            module->sm2_last_buffer_index_this_bank=
-                module->sm2_module_num_regions_per_bank-1;
-        }
     }
 
-    /* check for wrap-around */
-    if( module->sm2_freed_buffer_index == module->sm2_module_num_buffers ) {
-        module->sm2_freed_buffer_index=0;
-    }
 
     /* return */
     return OMPI_SUCCESS;
diff --git a/ompi/mca/coll/sm2/coll_sm2_reduce.c b/ompi/mca/coll/sm2/coll_sm2_reduce.c
new file mode 100644
index 0000000000..84c18a0b89
--- /dev/null
+++ b/ompi/mca/coll/sm2/coll_sm2_reduce.c
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2007-2008 UT-Battelle, LLC
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+/** @file */
+
+#include "ompi_config.h"
+
+#include "ompi/constants.h"
+#include "coll_sm2.h"
+#include "ompi/op/op.h"
+#include "ompi/datatype/datatype.h"
+#include "ompi/communicator/communicator.h"
+/* debug
+#include "opal/sys/timer.h"
+
+extern uint64_t timers[7];
+ end debug */
+
+
+
+/**
+ * Shared memory blocking reduce - fan-in phase.
+ */
+static
+int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
+        struct ompi_datatype_t *dtype,
+        struct ompi_op_t *op,
+        int root,
+        struct ompi_communicator_t *comm,
+        struct mca_coll_base_module_1_1_0_t *module)
+{
+    /* local variables */
+    int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
+    int my_rank, comm_size, child_rank, child, n_children;
+    int count_processed,count_this_stripe;
+    int process_shift,my_node_index;
+    size_t message_extent,dt_extent,ctl_size,len_data_buffer;
+    long long tag;
+    volatile char * my_data_pointer;
+    volatile char * child_data_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
+    mca_coll_sm2_module_t *sm_module;
+    tree_node_t *my_reduction_node;
+    sm_work_buffer_t *sm_buffer_desc;
+
+    sm_module=(mca_coll_sm2_module_t *) module;
+
+    /* compute process shift */
+    my_rank=ompi_comm_rank(comm);
+    comm_size=ompi_comm_size(comm);
+    process_shift=root;
+    my_node_index=my_rank-root;
+    /* wrap around */
+    if(0 > my_node_index ) {
+        my_node_index+=comm_size;
+    }
+
+    /* get size of data needed - same layout as user data, so that
+     * we can apply the reduction routines directly on these buffers
+     */
+    rc=ompi_ddt_type_extent(dtype, &dt_extent);
+    if( OMPI_SUCCESS != rc ) {
+        goto Error;
+    }
+    message_extent=dt_extent*count;
+
+    /* length of control and data regions */
+    ctl_size=sm_module->ctl_memory_per_proc_per_segment;
+    len_data_buffer=sm_module->data_memory_per_proc_per_segment;
+
+    /* number of datatype copies that the scratch buffer can hold */
+    n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
+    if ( 0 == n_dts_per_buffer ) {
+        rc=OMPI_ERROR;
+        goto Error;
+    }
+
+    /* compute number of stripes needed to process this collective */
+    n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
+
+    /* get my node for the reduction tree */
+    my_reduction_node=&(sm_module->reduction_tree[my_node_index]);
+    n_children=my_reduction_node->n_children;
+    count_processed=0;
+
+    /* get a pointer to the shared-memory working buffer */
+    /* NOTE: starting with a rather synchronous approach */
+    for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
+
+        /* get unique tag for this stripe - assume only one collective
+         * per communicator at a given time, so no locking needed
+         * for atomic update of the tag */
+        tag=sm_module->collective_tag;
+        sm_module->collective_tag++;
+
+        sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
+
+        /* get number of elements to process in this stripe */
+        count_this_stripe=n_dts_per_buffer;
+        if( count_processed + count_this_stripe > count )
+            count_this_stripe=count-count_processed;
+
+        /* offset to data segment */
+        my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
+        my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
+
+        /***************************
+         * Fan into root phase
+         ***************************/
+
+        if( LEAF_NODE != my_reduction_node->my_node_type ) {
+            /* copy segment into shared buffer - ompi_op_reduce
+             * provides only 2 buffers, so can't add from two
+             * into a third buffer.
+             */
+            rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+                    (char *)my_data_pointer,
+                    (char *)((char *)sbuf+dt_extent*count_processed));
+            if( 0 != rc ) {
+                return OMPI_ERROR;
+            }
+
+            /*
+             * Wait on children, and apply op to their data
+             */
+            for( child=0 ; child < n_children ; child++ ) {
+                child_rank=my_reduction_node->children_ranks[child];
+                child_rank+=process_shift;
+                /* wrap around */
+                if( comm_size <= child_rank ){
+                    child_rank-=comm_size;
+                }
+
+                child_ctl_pointer=
+                    sm_buffer_desc->proc_memory[child_rank].control_region;
+                child_data_pointer=
+                    sm_buffer_desc->proc_memory[child_rank].data_segment;
+
+                /* wait until child flag is set */
+                while(child_ctl_pointer->flag != tag) {
+                    opal_progress();
+                }
+
+                /* apply collective operation */
+                ompi_op_reduce(op,(void *)child_data_pointer,
+                        (void *)my_data_pointer, count_this_stripe,dtype);
+
+            } /* end child loop */
+
+            /* set memory barrier to make sure data is in main memory before
+             * the completion flags are set.
+             */
+            MB();
+
+            /*
+             * Signal parent that data is ready
+             */
+            my_ctl_pointer->flag=tag;
+            my_ctl_pointer->index=stripe_number;
+
+            /* copy data to destination */
+            if( ROOT_NODE == my_reduction_node->my_node_type ) {
+                /* copy data to user supplied buffer */
+                rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+                        (char *)rbuf+dt_extent*count_processed,
+                        (char *)my_data_pointer);
+                if( 0 != rc ) {
+                    return OMPI_ERROR;
+                }
+            }
+
+        } else {
+            /* leaf node */
+            /* copy segment into shared buffer - later on will optimize to
+             * eliminate extra copies.
+             */
+            rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+                    (char *)my_data_pointer,
+                    (char *)((char *)sbuf+dt_extent*count_processed));
+            if( 0 != rc ) {
+                return OMPI_ERROR;
+            }
+
+            /* set memory barrier to make sure data is in main memory before
+             * the completion flags are set.
+             */
+            MB();
+
+            /*
+             * Signal parent that data is ready
+             */
+            my_ctl_pointer->flag=tag;
+        }
+
+        /* "free" the shared-memory working buffer */
+        rc=free_sm2_shared_buffer(sm_module);
+        if( OMPI_SUCCESS != rc ) {
+            goto Error;
+        }
+
+        /* update the count of elements processed */
+        count_processed+=count_this_stripe;
+    }
+
+    /* return */
+    return rc;
+
+Error:
+    return rc;
+}
+
+/**
+ * Shared memory blocking reduce.
+ */
+int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
+        struct ompi_datatype_t *dtype, struct ompi_op_t *op,
+        int root, struct ompi_communicator_t *comm,
+        struct mca_coll_base_module_1_1_0_t *module)
+{
+    /* local variables */
+    int rc;
+
+    rc= mca_coll_sm2_reduce_intra_fanin(sbuf, rbuf, count,
+            dtype, op, root, comm, module);
+    if( OMPI_SUCCESS != rc ) {
+        goto Error;
+    }
+
+    return OMPI_SUCCESS;
+
+Error:
+    return rc;
+}
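
Note on the buffer management changes above (not part of the patch): coll_sm2_module.c now tags each work buffer with the bank it belongs to, starts a non-blocking barrier when the last buffer of a bank is freed, and completes outstanding barrier requests in order before the first buffer of a bank is handed out again. The following is a minimal, single-process sketch of that recycling discipline; the types, the bank_ready[] flags, and the sizes are hypothetical stand-ins for the module's non-blocking barrier state, not the Open MPI API.

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

#define NUM_BANKS        2
#define REGIONS_PER_BANK 4
#define NUM_BUFFERS      (NUM_BANKS * REGIONS_PER_BANK)

/* stand-in for the per-bank non-blocking barrier: true means "safe to reuse" */
static bool bank_ready[NUM_BANKS] = { true, true };
static int  alloc_index = 0;   /* next buffer to hand out (circular) */

/* Hand out the next buffer; reusing the first buffer of a bank requires
 * that the bank's "barrier" has completed. */
static int alloc_buffer(void)
{
    int buffer = alloc_index;
    int bank   = buffer / REGIONS_PER_BANK;

    if (buffer % REGIONS_PER_BANK == 0) {
        assert(bank_ready[bank]);  /* the real code completes the nb-barrier here */
        bank_ready[bank] = false;  /* bank is in use again */
    }
    alloc_index = (alloc_index + 1) % NUM_BUFFERS;
    return buffer;
}

/* Return a buffer; freeing the last buffer of a bank is the point where the
 * real code initiates mca_coll_sm2_nbbarrier_intra() for that bank. */
static void free_buffer(int buffer)
{
    int bank = buffer / REGIONS_PER_BANK;

    if (buffer == REGIONS_PER_BANK * (bank + 1) - 1) {
        bank_ready[bank] = true;   /* toy model: the "barrier" finishes at once */
    }
}

int main(void)
{
    for (int i = 0; i < 3 * NUM_BUFFERS; i++) {
        int buf = alloc_buffer();
        free_buffer(buf);
    }
    printf("cycled %d buffer allocations over %d banks\n",
           3 * NUM_BUFFERS, NUM_BANKS);
    return 0;
}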
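Note on the reduction algorithm in coll_sm2_reduce.c (not part of the patch): each non-leaf process copies its contribution into its own segment of the shared work buffer, spins until every child's control flag reaches the stripe's tag, folds the child data in with ompi_op_reduce(), and then publishes its own flag so its parent can proceed. The sketch below models that parent-side wait-and-reduce loop in plain C for a single stripe of an integer sum; segment_t, fanin_reduce(), and the pre-set child flags are illustrative simplifications, not the actual shared-memory layout.

#include <stdint.h>
#include <stdio.h>

#define COUNT 4

typedef struct {
    volatile int64_t flag;   /* set to the collective tag when data is ready */
    int              data[COUNT];
} segment_t;

/* Parent side: wait for each child's flag to reach `tag`, then fold its data in. */
static void fanin_reduce(segment_t *mine, segment_t *children, int n_children,
                         int64_t tag)
{
    for (int c = 0; c < n_children; c++) {
        while (children[c].flag != tag) {
            /* in the real code this spin calls opal_progress() */
        }
        for (int i = 0; i < COUNT; i++) {
            mine->data[i] += children[c].data[i];   /* MPI_SUM on ints */
        }
    }
    /* in the real code an MB() memory barrier precedes setting the flag */
    mine->flag = tag;   /* signal our own parent that the partial result is ready */
}

int main(void)
{
    segment_t me = { .flag = 0, .data = {1, 1, 1, 1} };
    segment_t kids[2] = {
        { .flag = 7, .data = {2, 2, 2, 2} },   /* children already tagged 7 */
        { .flag = 7, .data = {3, 3, 3, 3} },
    };

    fanin_reduce(&me, kids, 2, 7);
    printf("reduced: %d %d %d %d\n",
           me.data[0], me.data[1], me.data[2], me.data[3]);
    return 0;
}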