diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h index 54a95c3568..71a8ff87f9 100644 --- a/ompi/mca/coll/sm2/coll_sm2.h +++ b/ompi/mca/coll/sm2/coll_sm2.h @@ -177,6 +177,9 @@ BEGIN_C_DECLS /* log 2 of largest full power of 2 for this node set */ int log_2; + /* largest power of 2 that fits in this group */ + int n_largest_pow_2; + /* node type */ int node_type; @@ -386,6 +389,9 @@ BEGIN_C_DECLS long long barrier_bank_cntr; /* end debug */ + /* scratch space - one int per process */ + int *scratch_space; + }; typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t; diff --git a/ompi/mca/coll/sm2/coll_sm2_allreduce.c b/ompi/mca/coll/sm2/coll_sm2_allreduce.c index f07a535300..145dec2565 100644 --- a/ompi/mca/coll/sm2/coll_sm2_allreduce.c +++ b/ompi/mca/coll/sm2/coll_sm2_allreduce.c @@ -1106,6 +1106,340 @@ Error: return rc; } +/** + * Shared memory blocking allreduce. + */ +static +int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, + int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + struct mca_coll_base_module_1_1_0_t *module) +{ + /* local varibles */ + int i,rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number; + int pair_rank,exchange,extra_rank,n_proc_data,tmp; + int starting_proc,n_procs_to_read,base_block_proc,base_read_proc; + int n_elements_per_proc, n_residual_elements; + int cnt_offset,n_copy; + pair_exchange_node_t *my_exchange_node; + int my_rank,comm_size,count_processed,count_this_stripe; + int count_this_exchange; + int done_copy_tag,ok_to_copy_tag; + size_t len_data_buffer; + ptrdiff_t dt_extent; + long long tag, base_tag; + sm_work_buffer_t *sm_buffer_desc; + volatile char * my_write_pointer; + volatile char * extra_rank_write_data_pointer; + volatile char * extra_rank_read_data_pointer; + volatile char * partner_base_pointer; + volatile char * my_pointer; + volatile char * my_base_pointer; + volatile char * partner_pointer; + volatile char * source_pointer; + mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer; + volatile mca_coll_sm2_nb_request_process_shared_mem_t * + partner_ctl_pointer; + volatile mca_coll_sm2_nb_request_process_shared_mem_t * + extra_ctl_pointer; + volatile mca_coll_sm2_nb_request_process_shared_mem_t * + source_ctl_pointer; + mca_coll_sm2_module_t *sm_module; + + sm_module=(mca_coll_sm2_module_t *) module; + + /* get size of data needed - same layout as user data, so that + * we can apply the reudction routines directly on these buffers + */ + rc=ompi_ddt_type_extent(dtype, &dt_extent); + if( OMPI_SUCCESS != rc ) { + goto Error; + } + + /* lenght of control and data regions */ + len_data_buffer=sm_module->data_memory_per_proc_per_segment; + + /* number of data types copies that the scratch buffer can hold */ + n_dts_per_buffer=((int) len_data_buffer)/dt_extent; + if ( 0 == n_dts_per_buffer ) { + rc=OMPI_ERROR; + goto Error; + } + + len_data_buffer=n_dts_per_buffer*dt_extent; + + /* compute number of stripes needed to process this collective */ + n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ; + + /* get my node for the reduction tree */ + my_exchange_node=&(sm_module->recursive_doubling_tree); + my_rank=ompi_comm_rank(comm); + comm_size=ompi_comm_size(comm); + + /* get access to shared memory working buffer */ + sm_buffer_desc=alloc_sm2_shared_buffer(sm_module); + my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region; + my_base_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment; + + count_processed=0; + for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) { + /* get number of elements to process in this stripe */ + /* debug + t2=opal_sys_timer_get_cycles(); + end debug */ + count_this_stripe=n_dts_per_buffer; + if( count_processed + count_this_stripe > count ) + count_this_stripe=count-count_processed; + + /* compute the number of elements "owned" by each process */ + n_elements_per_proc=(count_this_stripe/my_exchange_node->n_largest_pow_2); + n_residual_elements=count_this_stripe- + n_elements_per_proc*my_exchange_node->n_largest_pow_2; + for(i=0 ; i < my_exchange_node->n_largest_pow_2 ; i++ ) { + sm_module->scratch_space[i]=n_elements_per_proc; + if( i < n_residual_elements) { + sm_module->scratch_space[i]++; + } + } + /* debug + fprintf(stderr," my_rank %d element list count_this_stripe %d : ",my_rank,count_this_stripe); + for(i=0 ; i < comm_size ; i++ ) { + fprintf(stderr," %d ",sm_module->scratch_space[i]); + } + fprintf(stderr," \n"); + fflush(stderr); + end debug */ + + /* get unique set of tags for this stripe. + * Assume only one collective + * per communicator at a given time, so no locking needed + * for atomic update of the tag */ + base_tag=sm_module->collective_tag; + /* log_2 tags for recursive doubling, one for the non-power of 2 + * initial send, 1 for first copy into shared memory, and + * one for completing the copyout. + */ + sm_module->collective_tag+=(my_exchange_node->log_2+3); + + + /* copy data into the write buffer */ + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)my_base_pointer, + (char *)((char *)sbuf+dt_extent*count_processed)); + if( 0 != rc ) { + return OMPI_ERROR; + } + /* debug + { int *int_tmp=(int *)my_base_pointer; + int i; + fprintf(stderr," my rank %d data in tmp :: ",my_rank); + for (i=0 ; i < count_this_stripe ; i++ ) { + fprintf(stderr," %d ",int_tmp[i]); + } + fprintf(stderr,"\n"); + fflush(stderr); + } + end debug */ + + /* debug + t3=opal_sys_timer_get_cycles(); + timers[1]+=(t3-t2); + end debug */ + + /* copy data in from the "extra" source, if need be */ + tag=base_tag; + if(0 < my_exchange_node->n_extra_sources) { + if ( EXCHANGE_NODE == my_exchange_node->node_type ) { + extra_rank=my_exchange_node->rank_extra_source; + extra_ctl_pointer= + sm_buffer_desc->proc_memory[extra_rank].control_region; + extra_rank_write_data_pointer= + sm_buffer_desc->proc_memory[extra_rank].data_segment; + + /* wait until remote data is read */ + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* apply collective operation */ + ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, + (void *)my_base_pointer, count_this_stripe,dtype); + } else { + + /* set memory barriet to make sure data is in main memory before + * the completion flgas are set. + */ + + MB(); + + /* + * Signal parent that data is ready + */ + my_ctl_pointer->flag=tag; + + } + } + MB(); + + /* + * reduce-scatter + */ + /* + * Signal parent that data is ready + */ + tag=base_tag+1; + my_ctl_pointer->flag=tag; + + /* + * loop over data exchanges + */ + /* set the number of procs whos's data I will manipulate - this starts + * at the number of procs in the exchange, so a divide by two at each + * iteration will give the right number of proc for the given iteration + */ + /* debug + { int *int_tmp=(int *)my_base_pointer; + int i; + fprintf(stderr," GGG my rank %d data in tmp :: ",my_rank); + for (i=0 ; i < count_this_stripe ; i++ ) { + fprintf(stderr," %d ",int_tmp[i]); + } + fprintf(stderr,"\n"); + fflush(stderr); + } + end debug */ + n_proc_data=my_exchange_node->n_largest_pow_2; + starting_proc=0; + for(exchange=my_exchange_node->n_exchanges-1;exchange>=0;exchange--) { + + /* is the remote data read */ + pair_rank=my_exchange_node->rank_exchanges[exchange]; + + partner_ctl_pointer= + sm_buffer_desc->proc_memory[pair_rank].control_region; + partner_base_pointer= + sm_buffer_desc->proc_memory[pair_rank].data_segment; + + /* wait until remote data is read */ + while( partner_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* figure out the base address to use : the lower rank gets + * the upper data, with the higher rank getting the lower half + * of the current chunk */ + n_proc_data=n_proc_data/2; + if(pair_rank < my_rank ) { + starting_proc+=n_proc_data; + } + + /* figure out my staring pointer */ + tmp=0; + for(i=0 ; i < starting_proc ; i++ ) { + tmp+=sm_module->scratch_space[i]; + } + my_pointer=my_base_pointer+tmp*dt_extent; + /* figure out partner's staring pointer */ + partner_pointer=partner_base_pointer+tmp*dt_extent; + + /* figure out how much to read */ + tmp=0; + for(i=starting_proc ; i < starting_proc+n_proc_data ; i++ ) { + tmp+=sm_module->scratch_space[i]; + } + count_this_exchange=tmp; + + /* reduce data into my write buffer */ + /* apply collective operation */ + ompi_op_reduce(op,(void *)partner_pointer, + (void *)my_pointer, count_this_exchange,dtype); + /* debug + { int *int_tmp=(int *)my_pointer; + int i; + fprintf(stderr," result my rank %d data in tmp :: ",my_rank); + for (i=0 ; i < count_this_exchange ; i++ ) { + fprintf(stderr," %d ",int_tmp[i]); + } + fprintf(stderr,"\n"); + int_tmp=(int *)partner_pointer; + fprintf(stderr," partner data my rank %d data in tmp :: ",my_rank); + for (i=0 ; i < count_this_exchange ; i++ ) { + fprintf(stderr," %d ",int_tmp[i]); + } + fprintf(stderr,"\n"); + fflush(stderr); + } + end debug */ + + /* signal that I am done reading my peer's data */ + tag++; + MB(); + my_ctl_pointer->flag=tag; + + + } /* end exchange loop */ + + /* debug + t8=opal_sys_timer_get_cycles(); + end debug */ + /* copy data out to final destination. Could do some sort of + * recursive doubleing in the sm, then copy to process private, + * which reduces memory contention. However, this also almost + * doubles the number of copies. + */ + ok_to_copy_tag=base_tag+1+my_exchange_node->log_2; + + /* read from the result buffers directly to the final destinaion */ + cnt_offset=0; + for(n_copy=0 ; n_copy < my_exchange_node->n_largest_pow_2 ; n_copy++ ) { + + if( 0 >= sm_module->scratch_space[n_copy] ) + continue; + + source_ctl_pointer= + sm_buffer_desc->proc_memory[n_copy].control_region; + source_pointer= + sm_buffer_desc->proc_memory[n_copy].data_segment; + + /* wait until remote data is read */ + while( source_ctl_pointer->flag < ok_to_copy_tag ) { + opal_progress(); + } + /* copy data into the destination buffer */ + rc=ompi_ddt_copy_content_same_ddt(dtype, + sm_module->scratch_space[n_copy], + (char *)((char *)rbuf+ + dt_extent*(count_processed+cnt_offset)), + (char *)((char *)source_pointer+ + dt_extent*cnt_offset)); + if( 0 != rc ) { + return OMPI_ERROR; + } + cnt_offset+=sm_module->scratch_space[n_copy]; + } + done_copy_tag=base_tag+2+my_exchange_node->log_2; + my_ctl_pointer->flag=done_copy_tag; + + /* wait for all to read the data, before re-using this buffer */ + if( stripe_number < (n_data_segments-1) ) { + for(n_copy=0 ; n_copy < comm_size ; n_copy++ ) { + source_ctl_pointer= + sm_buffer_desc->proc_memory[n_copy].control_region; + while( source_ctl_pointer-> flag < done_copy_tag ) { + opal_progress(); + } + } + } + + /* update the count of elements processed */ + count_processed+=count_this_stripe; + } + /* return */ + return rc; + +Error: + return rc; +} #if 0 /* just storing various versions of the recursive doubling algorithm, * so can compare them later on. @@ -1421,17 +1755,7 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf, if( 0 != rc ) { return OMPI_ERROR; } - /* debug - t9=opal_sys_timer_get_cycles(); - timers[5]+=(t9-t8); - end debug */ - /* "free" the shared-memory working buffer */ - /* debug - t10=opal_sys_timer_get_cycles(); - timers[6]+=(t10-t9); - end debug */ - /* update the count of elements processed */ count_processed+=count_this_stripe; } @@ -1462,13 +1786,18 @@ int mca_coll_sm2_allreduce_intra(void *sbuf, void *rbuf, int count, #if 0 if( 0 != (op->o_flags & OMPI_OP_FLAGS_COMMUTE)) { -#endif /* Commutative Operation */ rc= mca_coll_sm2_allreduce_intra_recursive_doubling(sbuf, rbuf, count, dtype, op, comm, module); if( OMPI_SUCCESS != rc ) { goto Error; } +#endif + rc= mca_coll_sm2_allreduce_intra_reducescatter_allgather(sbuf, rbuf, count, + dtype, op, comm, module); + if( OMPI_SUCCESS != rc ) { + goto Error; + } #if 0 } else { /* Non-Commutative Operation */ diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c index 5b6c7a46e0..884a5104a2 100644 --- a/ompi/mca/coll/sm2/coll_sm2_module.c +++ b/ompi/mca/coll/sm2/coll_sm2_module.c @@ -1022,6 +1022,12 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) } + /* allocate process private scratch space */ + sm_module->scratch_space=(int *)malloc(sizeof(int)*group_size); + if( NULL == sm_module->scratch_space) { + goto CLEANUP; + } + /* touch pages to apply memory affinity - Note: do we really need this or will * the algorithms do this */ @@ -1060,6 +1066,11 @@ CLEANUP: sm_module->sm_buffer_descriptor=NULL; } + if(sm_module->scratch_space) { + free(sm_module->scratch_space); + sm_module->scratch_space=NULL; + } + OBJ_RELEASE(sm_module); return NULL; diff --git a/ompi/mca/coll/sm2/coll_sm2_service.c b/ompi/mca/coll/sm2/coll_sm2_service.c index 17df871753..832f07772f 100644 --- a/ompi/mca/coll/sm2/coll_sm2_service.c +++ b/ompi/mca/coll/sm2/coll_sm2_service.c @@ -234,6 +234,12 @@ int setup_recursive_doubling_tree_node(int num_nodes, int node_rank, } exchange_node->log_2=n_exchanges; + tmp=1; + for(i=0 ; i < n_exchanges ; i++ ) { + tmp*=2; + } + exchange_node->n_largest_pow_2=tmp; + /* set node characteristics - node that is not within the largest * power of 2 will just send it's data to node that will participate * in the recursive doubling, and get the result back at the end.