From 5900415a25f3310ddc09dd1c1e2e4b8f383969e8 Mon Sep 17 00:00:00 2001 From: Rich Graham Date: Thu, 22 May 2008 18:50:53 +0000 Subject: [PATCH] for non-powers of 2, distribute the work on the first step among all the procs doing the work. This commit was SVN r18480. --- ompi/mca/coll/sm2/coll_sm2_allreduce.c | 236 ++++++++++++++++++------- 1 file changed, 171 insertions(+), 65 deletions(-) diff --git a/ompi/mca/coll/sm2/coll_sm2_allreduce.c b/ompi/mca/coll/sm2/coll_sm2_allreduce.c index c468d56cca..19d797ea18 100644 --- a/ompi/mca/coll/sm2/coll_sm2_allreduce.c +++ b/ompi/mca/coll/sm2/coll_sm2_allreduce.c @@ -138,7 +138,6 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, while(! ( (child_ctl_pointer->flag == tag) & (child_ctl_pointer->index== stripe_number) ) ) { - /* Note: Actually need to make progress here */ opal_progress(); } @@ -785,6 +784,7 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf, sm_work_buffer_t *sm_buffer_desc; volatile char * my_tmp_data_buffer[2]; volatile char * my_write_pointer; + volatile char * my_extra_write_pointer; volatile char * my_read_pointer; volatile char * extra_rank_write_data_pointer; volatile char * extra_rank_read_data_pointer; @@ -883,23 +883,62 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf, /* copy data in from the "extra" source, if need be */ tag=base_tag; if(0 < my_exchange_node->n_extra_sources) { + int n_my_count; if ( EXCHANGE_NODE == my_exchange_node->node_type ) { + /* signal to partner that I am ready */ + MB(); + /* + * Signal extra node that data is ready + */ + my_ctl_pointer->flag=tag; + + /* figure out my portion of the reduction */ + n_my_count=count_this_stripe/2; + extra_rank=my_exchange_node->rank_extra_source; extra_ctl_pointer= sm_buffer_desc->proc_memory[extra_rank].control_region; - extra_rank_write_data_pointer= - sm_buffer_desc->proc_memory[extra_rank].data_segment; + extra_rank_write_data_pointer= + sm_buffer_desc->proc_memory[extra_rank].data_segment; - /* wait until remote data is read */ - while( extra_ctl_pointer->flag < tag ) { - opal_progress(); - } - - /* apply collective operation */ - ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, - (void *)my_write_pointer, count_this_stripe,dtype); + /* wait until remote data is read */ + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* apply collective operation to first half of the data */ + ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, + (void *)my_write_pointer, n_my_count,dtype); + + + /* wait for my partner to finish reducing the data */ + tag=base_tag+1; + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + + /* read my partner's data */ + + /* adjust read an write pointers */ + extra_rank_write_data_pointer+= + (count_this_stripe-n_my_count)*dt_extent; + + rc=ompi_ddt_copy_content_same_ddt(dtype, + count_this_stripe-n_my_count, + (char *)(my_write_pointer+ + (count_this_stripe-n_my_count)*dt_extent), + (char *)extra_rank_write_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + + /* now we are ready for the power of 2 portion of the + * algorithm + */ + } else { /* set memory barriet to make sure data is in main memory before @@ -909,10 +948,39 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf, MB(); /* - * Signal parent that data is ready + * Signal extra node that data is ready */ my_ctl_pointer->flag=tag; + /* figure out my portion of the reduction */ + n_my_count=count_this_stripe-(count_this_stripe/2); + + /* get the pointer to the partners data that needs to be reduced */ + extra_rank=my_exchange_node->rank_extra_source; + extra_ctl_pointer= + sm_buffer_desc->proc_memory[extra_rank].control_region; + extra_rank_write_data_pointer= + sm_buffer_desc->proc_memory[extra_rank].data_segment; + /* offset into my half of the data */ + extra_rank_write_data_pointer+= + (count_this_stripe/2)*dt_extent; + my_extra_write_pointer=my_write_pointer+ + (count_this_stripe/2)*dt_extent; + + /* wait until remote data is read */ + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* apply collective operation to second half of the data */ + ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, + (void *)my_extra_write_pointer, n_my_count,dtype); + + /* signal that I am done, so my partner can read my data */ + MB(); + tag=base_tag+1; + my_ctl_pointer->flag=tag; + } } @@ -957,28 +1025,10 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf, /* reduce data into my write buffer */ /* apply collective operation */ - /* - ompi_op_reduce(op,(void *)partner_read_pointer, - (void *)my_write_pointer, count_this_stripe,dtype); - */ - - /* test */ ompi_3buff_op_reduce(op,my_read_pointer,partner_read_pointer, my_write_pointer,count_this_stripe,dtype); - /* - { - int ii,n_ints; - int * restrict my_read=(int *)my_read_pointer; - int * restrict my_write=(int *)my_write_pointer; - int * restrict exchange_read=(int *)partner_read_pointer; - n_ints=count_this_stripe; - for(ii=0 ; ii < n_ints ; ii++ ) { - my_write[ii]=my_read[ii]+exchange_read[ii]; - } - } - */ /* debug t6=opal_sys_timer_get_cycles(); timers[3]+=(t6-t5); @@ -1134,6 +1184,7 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, volatile char * partner_base_pointer; volatile char * my_pointer; volatile char * my_base_pointer; + volatile char * my_extra_write_pointer; volatile char * partner_pointer; volatile char * source_pointer; mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer; @@ -1200,14 +1251,6 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, sm_module->scratch_space[i]++; } } - /* debug - fprintf(stderr," my_rank %d element list count_this_stripe %d : ",my_rank,count_this_stripe); - for(i=0 ; i < comm_size ; i++ ) { - fprintf(stderr," %d ",sm_module->scratch_space[i]); - } - fprintf(stderr," \n"); - fflush(stderr); - end debug */ /* get unique set of tags for this stripe. * Assume only one collective @@ -1228,17 +1271,6 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, if( 0 != rc ) { return OMPI_ERROR; } - /* debug - { int *int_tmp=(int *)my_base_pointer; - int i; - fprintf(stderr," my rank %d data in tmp :: ",my_rank); - for (i=0 ; i < count_this_stripe ; i++ ) { - fprintf(stderr," %d ",int_tmp[i]); - } - fprintf(stderr,"\n"); - fflush(stderr); - } - end debug */ /* debug t3=opal_sys_timer_get_cycles(); @@ -1247,22 +1279,65 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, /* copy data in from the "extra" source, if need be */ tag=base_tag; - if(0 < my_exchange_node->n_extra_sources) { - if ( EXCHANGE_NODE == my_exchange_node->node_type ) { - extra_rank=my_exchange_node->rank_extra_source; - extra_ctl_pointer= - sm_buffer_desc->proc_memory[extra_rank].control_region; - extra_rank_write_data_pointer= - sm_buffer_desc->proc_memory[extra_rank].data_segment; + if(0 < my_exchange_node->n_extra_sources) { + int n_my_count; + + if ( EXCHANGE_NODE == my_exchange_node->node_type ) { + + /* signal to partner that I am ready */ + MB(); + /* + * Signal extra node that data is ready + */ + my_ctl_pointer->flag=tag; + + /* figure out my portion of the reduction */ + n_my_count=count_this_stripe/2; + + extra_rank=my_exchange_node->rank_extra_source; + extra_ctl_pointer= + sm_buffer_desc->proc_memory[extra_rank].control_region; + extra_rank_write_data_pointer= + sm_buffer_desc->proc_memory[extra_rank].data_segment; + + /* wait until remote data is read */ + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* apply collective operation to first half of the data */ + if( 0 < n_my_count ) { + ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, + (void *)my_base_pointer, n_my_count,dtype); + } + + + /* wait for my partner to finish reducing the data */ + tag=base_tag+1; + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + + /* read my partner's data */ - /* wait until remote data is read */ - while( extra_ctl_pointer->flag < tag ) { - opal_progress(); - } - - /* apply collective operation */ - ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, - (void *)my_base_pointer, count_this_stripe,dtype); + /* adjust read an write pointers */ + extra_rank_write_data_pointer+=(n_my_count*dt_extent); + + if( 0 < (count_this_stripe-n_my_count) ) { + rc=ompi_ddt_copy_content_same_ddt(dtype, + count_this_stripe-n_my_count, + (char *)(my_base_pointer+n_my_count*dt_extent), + (char *)extra_rank_write_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + } + + /* now we are ready for the power of 2 portion of the + * algorithm + */ + } else { /* set memory barriet to make sure data is in main memory before @@ -1272,10 +1347,41 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf, MB(); /* - * Signal parent that data is ready + * Signal extra node that data is ready */ my_ctl_pointer->flag=tag; + /* figure out my portion of the reduction */ + n_my_count=count_this_stripe-(count_this_stripe/2); + + /* get the pointer to the partners data that needs to be reduced */ + extra_rank=my_exchange_node->rank_extra_source; + extra_ctl_pointer= + sm_buffer_desc->proc_memory[extra_rank].control_region; + extra_rank_write_data_pointer= + sm_buffer_desc->proc_memory[extra_rank].data_segment; + /* offset into my half of the data */ + extra_rank_write_data_pointer+= + ((count_this_stripe/2)*dt_extent); + my_extra_write_pointer=my_base_pointer+ + ((count_this_stripe/2)*dt_extent); + + /* wait until remote data is read */ + while( extra_ctl_pointer->flag < tag ) { + opal_progress(); + } + + /* apply collective operation to second half of the data */ + if( 0 < n_my_count ) { + ompi_op_reduce(op,(void *)extra_rank_write_data_pointer, + (void *)my_extra_write_pointer, n_my_count,dtype); + } + + /* signal that I am done, so my partner can read my data */ + MB(); + tag=base_tag+1; + my_ctl_pointer->flag=tag; + } } MB();