From 2d8c2420e8f4a1162bb6fb22f436b6c3441edf0f Mon Sep 17 00:00:00 2001 From: Rich Graham Date: Sun, 24 Feb 2008 20:54:16 +0000 Subject: [PATCH] checkpoint. This commit was SVN r17571. --- ompi/mca/coll/sm2/coll_sm2.h | 9 +++ ompi/mca/coll/sm2/coll_sm2_allreduce.c | 87 ++++++++++++++++++++++---- ompi/mca/coll/sm2/coll_sm2_module.c | 22 ++----- 3 files changed, 89 insertions(+), 29 deletions(-) diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h index a8c0201355..6f33349f6f 100644 --- a/ompi/mca/coll/sm2/coll_sm2.h +++ b/ompi/mca/coll/sm2/coll_sm2.h @@ -219,6 +219,12 @@ BEGIN_C_DECLS /* size, per process, of each memory segment */ size_t segement_size_per_process; + /* size, per process and segment , of control region */ + size_t ctl_memory_per_proc_per_segment; + + /* size, per process and segment , of data region */ + size_t data_memory_per_proc_per_segment; + /* number of memory banks */ int sm2_module_num_memory_banks; @@ -268,6 +274,9 @@ BEGIN_C_DECLS /* multinumial reduction tree */ tree_node_t *reduction_tree; + /* collective tag */ + long long collective_tag; + }; typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t; diff --git a/ompi/mca/coll/sm2/coll_sm2_allreduce.c b/ompi/mca/coll/sm2/coll_sm2_allreduce.c index 2078b26c58..7630869f3e 100644 --- a/ompi/mca/coll/sm2/coll_sm2_allreduce.c +++ b/ompi/mca/coll/sm2/coll_sm2_allreduce.c @@ -26,34 +26,95 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, struct mca_coll_base_module_1_1_0_t *module) { /* local variables */ - int rc=OMPI_SUCCESS; - size_t message_extent; - char *sm_buffer; + int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number; + size_t message_extent,dt_extent,ctl_size,len_data_buffer; + long long tag; + volatile char *sm_buffer; mca_coll_sm2_module_t *sm_module; sm_module=(mca_coll_sm2_module_t *) module; - /* get a pointer to the shared-memory working buffer */ - sm_buffer=alloc_sm2_shared_buffer(sm_module); - if( NULL == sm_buffer) { - rc=OMPI_ERR_OUT_OF_RESOURCE; - goto Error; - } + /* get unique tag for this collective - assume only one collective + * per communicator at a given time, so no locking needed + * for atomic update of the tag */ + sm_module->collective_tag++; + tag=sm_module->collective_tag; /* get size of data needed - same layout as user data, so that * we can apply the reudction routines directly on these buffers */ - rc=ompi_ddt_type_size(dtype, &message_extent); + rc=ompi_ddt_type_size(dtype, &dt_extent); if( OMPI_SUCCESS != rc ) { goto Error; } + message_extent=dt_extent*count; - /* "free" the shared-memory working buffer */ - rc=free_sm2_shared_buffer(sm_module); - if( OMPI_SUCCESS != rc ) { + /* lenght of control and data regions */ + ctl_size=sm_module->ctl_memory_per_proc_per_segment; + len_data_buffer=sm_module->data_memory_per_proc_per_segment; + + /* number of data types copies that the scratch buffer can hold */ + n_dts_per_buffer=((int) len_data_buffer)/dt_extent; + if ( 0 == n_dts_per_buffer ) { + rc=OMPI_ERROR; goto Error; } + /* compute number of stripes needed to process this collective */ + n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ; + + /* get a pointer to the shared-memory working buffer */ + /* NOTE: starting with a rather synchronous approach */ + for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) { + sm_buffer=alloc_sm2_shared_buffer(sm_module); + if( NULL == sm_buffer) { + rc=OMPI_ERR_OUT_OF_RESOURCE; + goto Error; + } + /* + * Fan into root phase + */ + + /* copy segment into shared buffer - later on will optimize to + * eliminate extra copies. + */ + + /* + * Wait on children, and apply op to their data + */ + + /* + * Signal parent that data is ready + */ + + + /* + * Fan out from root phase + */ + + /* + * wait on Parent to signal that data is ready + */ + + /* + * Copy data to shared buffer + */ + + /* + * Signal children that Data is ready for reading + */ + + /* + * Copy data out to destination + */ + + /* "free" the shared-memory working buffer */ + rc=free_sm2_shared_buffer(sm_module); + if( OMPI_SUCCESS != rc ) { + goto Error; + } + } + /* return */ return rc; diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c index a7830ca22e..9cd279ccd5 100644 --- a/ompi/mca/coll/sm2/coll_sm2_module.c +++ b/ompi/mca/coll/sm2/coll_sm2_module.c @@ -425,6 +425,9 @@ static int init_sm2_barrier(struct ompi_communicator_t *comm, /* set the pointer to the request that needs to be completed first */ module->current_request_index=0; + /* set collective tag */ + module->collective_tag=0; + /* return - successful */ return OMPI_SUCCESS; @@ -546,6 +549,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) (alignment + ctl_memory_per_proc_per_segment -1) / alignment; ctl_memory_per_proc_per_segment*=alignment; mca_coll_sm2_component.sm2_ctl_size_allocated=ctl_memory_per_proc_per_segment; + sm_module->ctl_memory_per_proc_per_segment=ctl_memory_per_proc_per_segment; /* get data region size - allocation happens on a page granularity, with * a minimum of a page allocated per proc, so adjust to this @@ -566,6 +570,8 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) sm_module->segement_size_per_process=size_tot_per_proc_per_seg; sm_module->segment_size=size_tot_per_segment; + sm_module->data_memory_per_proc_per_segment=size_tot_per_proc_per_seg- + ctl_memory_per_proc_per_segment; /* compute memory per bank */ tot_size_per_bank=size_tot_per_segment*mca_coll_sm2_component.sm2_num_regions_per_bank; @@ -640,22 +646,6 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) if( MPI_SUCCESS != ret ) { goto CLEANUP; } - /* debug */ - if( 0 == ompi_comm_rank(comm) ) { - fprintf(stderr," my rank %d \n",ompi_comm_rank(comm)); - for( i=0 ; i < ompi_comm_size(comm) ; i++ ) { - fprintf(stderr," DDDD i %d parent %d children :: ", - i,sm_module->reduction_tree[i].parent_rank); - for (j=0 ; j < sm_module->reduction_tree[i].n_children ; j++ ) { - - fprintf(stderr," %d ", - sm_module->reduction_tree[i].children_ranks[j]); - } - fprintf(stderr," \n"); - fflush(stderr); - } - } - /* initialize local counters */ sm_module->sm2_allocated_buffer_index=-1;