From 441fb9fb9e6eada8cc55f178d338513c59c3053f Mon Sep 17 00:00:00 2001 From: Rich Graham Date: Thu, 27 Mar 2008 01:16:32 +0000 Subject: [PATCH] checkpoint. This commit was SVN r17985. --- ompi/mca/coll/sm2/coll_sm2.h | 189 ++++------- ompi/mca/coll/sm2/coll_sm2_allreduce.c | 426 +++++++++++++++++++++++++ ompi/mca/coll/sm2/coll_sm2_component.c | 6 + ompi/mca/coll/sm2/coll_sm2_module.c | 1 + 4 files changed, 497 insertions(+), 125 deletions(-) diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h index ee1fa0006c..c747a47036 100644 --- a/ompi/mca/coll/sm2/coll_sm2.h +++ b/ompi/mca/coll/sm2/coll_sm2.h @@ -106,6 +106,11 @@ BEGIN_C_DECLS /** MCA parameter: order of fan-out read tree */ int order_fanout_read_tree; + /*I MCA paramenter: number of polling loops to run while waiting + * for children or parent to complete their work + */ + int n_poll_loops; + }; /** @@ -355,6 +360,11 @@ BEGIN_C_DECLS /* recursive-doubling tree node */ pair_exchange_node_t recursive_doubling_tree; + /* number of polling loops to run while waiting + * for children or parent to complete their work + */ + int n_poll_loops; + /* collective tag */ long long collective_tag; @@ -363,6 +373,60 @@ BEGIN_C_DECLS typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t; OBJ_CLASS_DECLARATION(mca_coll_sm2_module_t); + /* + * struct for manageing the allreduce pipeline. 
+ */ + struct mca_coll_sm2_module_allreduce_pipeline_t { + /* pointer to shared temporary working buffer */ + sm_work_buffer_t *shared_buffer; + + /* cached rank */ + int my_rank; + + /* cached reduction node */ + tree_node_t *my_reduction_node; + + /* cached fanout tree */ + tree_node_t *my_fanout_read_tree; + + + /* staus of the buffer - determines what next to do + * with this data + */ + int status; + + /* + * number of child loops completed - needed for + * async progress + */ + int n_child_loops_completed; + + /* + * number of data-type elements to process + */ + int count_this_stripe; + + /* + * offset into the data type buffer, in units of data-types + */ + int count_processed; + + /* + * tag + */ + int tag; + }; + typedef struct mca_coll_sm2_module_allreduce_pipeline_t + mca_coll_sm2_module_allreduce_pipeline_t; + OBJ_CLASS_DECLARATION(mca_coll_sm2_module_allreduce_pipeline_t); + + enum { + BUFFER_AVAILABLE, + STARTED, + FANIN, + FANOUT + }; + /** * Global component instance @@ -424,131 +488,6 @@ BEGIN_C_DECLS struct ompi_communicator_t *comm, struct mca_coll_base_module_1_1_0_t *module); - -/** - * Macro to setup flag usage - */ -#define FLAG_SETUP(flag_num, flag, data) \ - (flag) = (mca_coll_sm_in_use_flag_t*) \ - (((char *) (data)->mcb_in_use_flags) + \ - ((flag_num) * mca_coll_sm_component.sm_control_size)) - -/** - * Macro to wait for the in-use flag to become idle (used by the root) - */ -#define FLAG_WAIT_FOR_IDLE(flag) \ - while (0 != (flag)->mcsiuf_num_procs_using) SPIN - -/** - * Macro to wait for a flag to indicate that it's ready for this - * operation (used by non-root processes to know when FLAG_SET() has - * been called) - */ -#define FLAG_WAIT_FOR_OP(flag, op) \ - while ((op) != flag->mcsiuf_operation_count) SPIN - -/** - * Macro to set an in-use flag with relevant data to claim it - */ -#define FLAG_RETAIN(flag, num_procs, op_count) \ - (flag)->mcsiuf_num_procs_using = (num_procs); \ - (flag)->mcsiuf_operation_count = (op_count) - -/** - * 
Macro to release an in-use flag from this process - */ -#define FLAG_RELEASE(flag) \ - opal_atomic_add(&(flag)->mcsiuf_num_procs_using, -1) - -/** - * Macro to copy a single segment in from a user buffer to a shared - * segment - */ -#define COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data) \ - (iov).iov_base = \ - (index)->mcbmi_data + \ - ((rank) * mca_coll_sm_component.sm_fragment_size); \ - (max_data) = (iov).iov_len = mca_coll_sm_component.sm_fragment_size; \ - ompi_convertor_pack(&(convertor), &(iov), &mca_coll_sm_iov_size, \ - &(max_data) ) - -/** - * Macro to copy a single segment out from a shared segment to a user - * buffer - */ -#define COPY_FRAGMENT_OUT(convertor, src_rank, index, iov, max_data) \ - (iov).iov_base = (((char*) (index)->mcbmi_data) + \ - ((src_rank) * mca_coll_sm_component.sm_fragment_size)); \ - ompi_convertor_unpack(&(convertor), &(iov), &mca_coll_sm_iov_size, \ - &(max_data) ) - -/** - * Macro to memcpy a fragment between one shared segment and another - */ -#define COPY_FRAGMENT_BETWEEN(src_rank, dest_rank, index, len) \ - memcpy(((index)->mcbmi_data + \ - ((dest_rank) * mca_coll_sm_component.sm_fragment_size)), \ - ((index)->mcbmi_data + \ - ((src_rank) * \ - mca_coll_sm_component.sm_fragment_size)), \ - (len)) - -/** - * Macro to tell children that a segment is ready (normalize - * the child's ID based on the shift used to calculate the "me" node - * in the tree). Used in fan out opertations. - */ -#define PARENT_NOTIFY_CHILDREN(children, num_children, index, value) \ - do { \ - for (i = 0; i < (num_children); ++i) { \ - *((size_t*) \ - (((char*) index->mcbmi_control) + \ - (mca_coll_sm_component.sm_control_size * \ - (((children)[i]->mcstn_id + root) % size)))) = (value); \ - } \ - } while (0) - -/** - * Macro for childen to wait for parent notification (use real rank). - * Save the value passed and then reset it when done. Used in fan out - * operations. 
- */ -#define CHILD_WAIT_FOR_NOTIFY(rank, index, value) \ - do { \ - uint32_t volatile *ptr = ((uint32_t*) \ - (((char*) index->mcbmi_control) + \ - ((rank) * mca_coll_sm_component.sm_control_size))); \ - while (0 == *ptr) SPIN; \ - (value) = *ptr; \ - *ptr = 0; \ - } while (0) - -/** - * Macro for children to tell parent that the data is ready in their - * segment. Used for fan in operations. - */ -#define CHILD_NOTIFY_PARENT(child_rank, parent_rank, index, value) \ - ((size_t volatile *) \ - (((char*) (index)->mcbmi_control) + \ - (mca_coll_sm_component.sm_control_size * \ - (parent_rank))))[(child_rank)] = (value) - -/** - * Macro for parent to wait for a specific child to tell it that the - * data is in the child's segment. Save the value when done. Used - * for fan in operations. - */ -#define PARENT_WAIT_FOR_NOTIFY_SPECIFIC(child_rank, parent_rank, index, value) \ - do { \ - size_t volatile *ptr = ((size_t volatile *) \ - (((char*) index->mcbmi_control) + \ - (mca_coll_sm_component.sm_control_size * \ - (parent_rank)))) + child_rank; \ - while (0 == *ptr) SPIN; \ - (value) = *ptr; \ - *ptr = 0; \ - } while (0) - END_C_DECLS #endif /* MCA_COLL_SM2_EXPORT_H */ diff --git a/ompi/mca/coll/sm2/coll_sm2_allreduce.c b/ompi/mca/coll/sm2/coll_sm2_allreduce.c index 974fbac2de..416ab420eb 100644 --- a/ompi/mca/coll/sm2/coll_sm2_allreduce.c +++ b/ompi/mca/coll/sm2/coll_sm2_allreduce.c @@ -300,6 +300,432 @@ Error: return rc; } +/* + * fanin/fanout progress function. 
+ */
+
+static
+int progress_fanin_fanout( void *sbuf, void *rbuf,
+        struct ompi_datatype_t *dtype, struct ompi_op_t *op,
+        mca_coll_sm2_module_allreduce_pipeline_t *reduction_desc,
+        int n_poll_loops, int *completed)
+{
+    /* local variables */
+
+    int my_rank,cnt;
+    int rc=OMPI_SUCCESS;
+    int my_fanout_parent;
+    int child_rank,child;
+    int count_processed,count_this_stripe;
+    ptrdiff_t dt_extent;
+    long long tag;
+    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer;
+    volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
+    volatile char * my_data_pointer;
+    volatile char * parent_data_pointer;
+    volatile char * child_data_pointer;
+    sm_work_buffer_t *sm_buffer_desc;
+    tree_node_t *my_reduction_node;
+    tree_node_t *my_fanout_read_tree;
+
+    tag=reduction_desc->tag;
+    sm_buffer_desc=reduction_desc->shared_buffer;
+    my_rank=reduction_desc->my_rank;
+    my_reduction_node=reduction_desc->my_reduction_node;
+    my_fanout_read_tree=reduction_desc->my_fanout_read_tree;
+    my_fanout_parent=my_fanout_read_tree->parent_rank; /* must follow the cache of my_fanout_read_tree above */
+    /* initialize flag indicating that segment is still active in the
+     * reduction
+     */
+    *completed=0;
+
+    my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
+    my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
+
+    /* figure out where to proceed */
+    if( FANOUT == reduction_desc->status) {
+        goto REDUCTION_FANOUT;
+    }
+    /*
+     * fan in
+     */
+    switch (my_reduction_node->my_node_type) {
+    case LEAF_NODE:
+        /* leaf node */
+        /* copy segment into shared buffer - later on will optimize to
+         * eliminate extra copies.
+         */
+        count_processed=reduction_desc->count_processed;
+        count_this_stripe=reduction_desc->count_this_stripe;
+        /* error conditions already checked */
+        ompi_ddt_type_extent(dtype, &dt_extent);
+        rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+                (char *)my_data_pointer,
+                (char *)((char *)sbuf+dt_extent*count_processed));
+        if( 0 != rc ) {
+            return OMPI_ERROR;
+        }
+
+        /* set memory barrier to make sure data is in main memory before
+         * the completion flags are set.
+         */
+        MB();
+
+        /*
+         * Signal parent that data is ready
+         */
+        my_ctl_pointer->flag=tag;
+
+        break;
+
+    default:
+        /* ROOT_NODE and INTERIOR_NODE */
+        /* copy segment into shared buffer - ompi_op_reduce
+         * provides only 2 buffers, so can't add from two
+         * into a third buffer.
+         */
+        count_processed=reduction_desc->count_processed;
+        count_this_stripe=reduction_desc->count_this_stripe;
+        /* error conditions already checked */
+        ompi_ddt_type_extent(dtype, &dt_extent);
+        rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
+                (char *)my_data_pointer,
+                (char *)((char *)sbuf+dt_extent*count_processed));
+        if( 0 != rc ) {
+            return OMPI_ERROR;
+        }
+
+        /*
+         * Wait on children, and apply op to their data
+         */
+        for( child=reduction_desc->n_child_loops_completed ;
+                child < my_reduction_node->n_children ; child++ ) {
+            child_rank=my_reduction_node->children_ranks[child];
+
+            child_ctl_pointer=
+                sm_buffer_desc->proc_memory[child_rank].control_region;
+            child_data_pointer=
+                sm_buffer_desc->proc_memory[child_rank].data_segment;
+
+            /* wait until child flag is set */
+            cnt=0;
+            while( child_ctl_pointer->flag != tag ) {
+                opal_progress();
+                cnt++;
+                if( n_poll_loops == cnt ) {
+                    /* break out */
+                    reduction_desc->status=FANIN;
+                    reduction_desc->n_child_loops_completed=child;
+                    goto RETURN;
+                }
+            }
+
+            /* apply collective operation */
+            count_this_stripe=reduction_desc->count_this_stripe;
+            ompi_op_reduce(op,(void *)child_data_pointer,
+                    (void *)my_data_pointer, count_this_stripe,dtype);
+
+        } /* end child loop */
+
+ /* set memory barriet to make sure data is in main memory before + * the completion flgas are set. + */ + MB(); + + /* + * Signal parent that data is ready + */ + my_ctl_pointer->flag=tag; + + } + +REDUCTION_FANOUT: + switch (my_reduction_node->my_node_type) { + case LEAF_NODE: + + parent_data_pointer= + sm_buffer_desc->proc_memory[my_fanout_parent].data_segment; + parent_ctl_pointer= + sm_buffer_desc->proc_memory[my_fanout_parent].control_region; + + /* + * wait on Parent to signal that data is ready + */ + cnt=0; + while(parent_ctl_pointer->flag != -tag) { + opal_progress(); + cnt++; + if( n_poll_loops == cnt ) { + /* break out */ + reduction_desc->status=FANOUT; + goto RETURN; + } + } + + /* copy data to user supplied buffer */ + count_processed=reduction_desc->count_processed; + count_this_stripe=reduction_desc->count_this_stripe; + /* error conditions already checed */ + ompi_ddt_type_extent(dtype, &dt_extent); + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)rbuf+dt_extent*count_processed, + (char *)parent_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + + break; + + case INTERIOR_NODE: + + /* interior nodes */ + + parent_data_pointer= + sm_buffer_desc->proc_memory[my_fanout_parent].data_segment; + parent_ctl_pointer= + sm_buffer_desc->proc_memory[my_fanout_parent].control_region; + + /* + * wait on Parent to signal that data is ready + */ + cnt=0; + while(parent_ctl_pointer->flag != -tag) { + opal_progress(); + cnt++; + if( n_poll_loops == cnt ) { + /* break out */ + reduction_desc->status=FANOUT; + goto RETURN; + } + } + + /* copy the data to my shared buffer, for access by children */ + count_this_stripe=reduction_desc->count_this_stripe; + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)my_data_pointer,(char *)parent_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + + /* set memory barriet to make sure data is in main memory before + * the completion flgas are set. 
+ */ + MB(); + + /* signal children that they may read the result data */ + my_ctl_pointer->flag=-tag; + + /* copy data to user supplied buffer */ + count_processed=reduction_desc->count_processed; + count_this_stripe=reduction_desc->count_this_stripe; + /* error conditions already checed */ + ompi_ddt_type_extent(dtype, &dt_extent); + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)rbuf+dt_extent*count_processed, + (char *)my_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + + break; + + case ROOT_NODE: + + /* I am the root - so copy signal children, and then + * start reading + */ + MB(); + my_ctl_pointer->flag=-tag; + + /* copy data to user supplied buffer */ + count_processed=reduction_desc->count_processed; + count_this_stripe=reduction_desc->count_this_stripe; + /* error conditions already checed */ + ompi_ddt_type_extent(dtype, &dt_extent); + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)((char *)rbuf+dt_extent*count_processed), + (char *)my_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + } + + /* completed processing the data in this stripe */ + *completed=1; + + /* mark the descriptor as available */ + reduction_desc->status=BUFFER_AVAILABLE; + + /* return */ +RETURN: + return OMPI_SUCCESS; + +} + + +/** + * Shared memory blocking allreduce - pipeline algorithm. 
+ */ +#define depth_pipeline 2 + +static +int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline + (void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + struct mca_coll_base_module_1_1_0_t *module) +{ + + /* local variables */ + int i,buffer_index,stripe_number,my_rank,n_completed,completed; + int count_processed,count_this_stripe; + mca_coll_sm2_module_allreduce_pipeline_t working_buffers[depth_pipeline]; + int rc=OMPI_SUCCESS; + long long tag; + tree_node_t *my_reduction_node, *my_fanout_read_tree; + mca_coll_sm2_module_t *sm_module; + int n_dts_per_buffer,n_data_segments; + size_t len_data_buffer; + ptrdiff_t dt_extent; + + sm_module=(mca_coll_sm2_module_t *) module; + + /* get size of data needed - same layout as user data, so that + * we can apply the reudction routines directly on these buffers + */ + rc=ompi_ddt_type_extent(dtype, &dt_extent); + if( OMPI_SUCCESS != rc ) { + goto Error; + } + + /* lenght of control and data regions */ + len_data_buffer=sm_module->data_memory_per_proc_per_segment; + + /* number of data types copies that the scratch buffer can hold */ + n_dts_per_buffer=((int) len_data_buffer)/dt_extent; + if ( 0 == n_dts_per_buffer ) { + rc=OMPI_ERROR; + goto Error; + } + + /* compute number of stripes needed to process this collective */ + n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ; + + /* get my node for the reduction tree */ + my_rank=ompi_comm_rank(comm); + my_reduction_node=&(sm_module->reduction_tree[my_rank]); + my_fanout_read_tree=&(sm_module->fanout_read_tree[my_rank]); + count_processed=0; + + /* get the working data segments */ + /* NOTE: need to check at communicator creation that we have enough + * temporary buffes for this + */ + for(i=0 ; i < depth_pipeline ; i++ ) { + working_buffers[i].shared_buffer=alloc_sm2_shared_buffer(sm_module); + working_buffers[i].status=BUFFER_AVAILABLE; + working_buffers[i].my_rank=my_rank; + 
working_buffers[i].my_reduction_node=my_reduction_node;
+        working_buffers[i].my_fanout_read_tree=my_fanout_read_tree;
+    }
+
+    n_completed=0;
+    buffer_index=-1;
+    /* loop over data segments */
+    for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
+
+        /*
+         * allocate working buffer
+         */
+
+        /* get working_buffers index - this needs to be deterministic,
+         * as each process is getting this pointer on its own, so all
+         * need to point to the same data structure
+         */
+        buffer_index++;
+        /* wrap around */
+        if( buffer_index == depth_pipeline){
+            buffer_index=0;
+        }
+
+        /* wait for buffer to become available */
+        while ( working_buffers[buffer_index].status != BUFFER_AVAILABLE ) {
+            /* loop over working buffers, and progress the reduction */
+            for( i=0 ; i < depth_pipeline ; i++ ) {
+                if( working_buffers[i].status != BUFFER_AVAILABLE ){
+                    rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
+                            &(working_buffers[i]),
+                            sm_module->n_poll_loops, &completed);
+                    if( OMPI_SUCCESS != rc ) {
+                        goto Error;
+                    }
+                    /* update the number of completed segments */
+                    n_completed+=completed;
+                }
+            }
+
+            /* overall ompi progress */
+            opal_progress();
+        }
+
+        /* initialize working buffer for this stripe */
+        working_buffers[buffer_index].status=FANIN;
+        working_buffers[buffer_index].n_child_loops_completed=0;
+        count_processed=stripe_number*n_dts_per_buffer;
+        count_this_stripe=n_dts_per_buffer;
+        if( count_processed + count_this_stripe > count )
+            count_this_stripe=count-count_processed;
+        working_buffers[buffer_index].count_this_stripe=count_this_stripe;
+        working_buffers[buffer_index].count_processed=count_processed;
+        tag=sm_module->collective_tag;
+        sm_module->collective_tag++;
+        working_buffers[buffer_index].tag=tag;
+
+        /* progress this stripe */
+        rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
+                &(working_buffers[buffer_index]),
+                sm_module->n_poll_loops, &completed);
+        if( OMPI_SUCCESS != rc ) {
+            goto Error;
+        }
+        /* update the number of completed
segments */
+        n_completed+=completed;
+
+    }
+
+    /* progress remaining data stripes */
+    while( n_completed < n_data_segments ) {
+        for( i=0 ; i < depth_pipeline ; i++ ) {
+            if( working_buffers[i].status != BUFFER_AVAILABLE ){
+                rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
+                        &(working_buffers[i]),
+                        sm_module->n_poll_loops, &completed);
+                if( OMPI_SUCCESS != rc ) {
+                    goto Error;
+                }
+                /* update the number of completed segments */
+                n_completed+=completed;
+            }
+        }
+    }
+
+    /* free work buffers */
+    for(i=0 ; i < depth_pipeline ; i++ ) {
+        rc=free_sm2_shared_buffer(sm_module);
+    }
+
+    /* return */
+    return rc;
+
+Error:
+    /* free work buffers */
+    for(i=0 ; i < depth_pipeline ; i++ ) {
+        free_sm2_shared_buffer(sm_module); /* ignore return - preserve original error code in rc */
+    }
+    return rc;
+}
+#undef depth_pipeline
+
 /**
  * Shared memory blocking allreduce.
diff --git a/ompi/mca/coll/sm2/coll_sm2_component.c b/ompi/mca/coll/sm2/coll_sm2_component.c
index b83138e3df..ab61b27d25 100644
--- a/ompi/mca/coll/sm2/coll_sm2_component.c
+++ b/ompi/mca/coll/sm2/coll_sm2_component.c
@@ -170,6 +170,12 @@ static int sm2_open(void)
     cs->order_fanout_read_tree=
         mca_coll_sm2_param_register_int("order_fanout_read_tree",4);
 
+    /* number of polling loops to allow pending resources to
+     * complete their work
+     */
+    cs->n_poll_loops=
+        mca_coll_sm2_param_register_int("n_poll_loops",4);
+
     return OMPI_SUCCESS;
 }
 
diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c
index 1cdcc8eefa..52a49ab916 100644
--- a/ompi/mca/coll/sm2/coll_sm2_module.c
+++ b/ompi/mca/coll/sm2/coll_sm2_module.c
@@ -566,6 +566,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
 
     sm_module->module_comm=comm;
     sm_module->comm_size=group_size;
+    sm_module->n_poll_loops=mca_coll_sm2_component.n_poll_loops;
 
     /*
      * set memory region parameters