From 68aa691171f3223cf668b2e3af18a9c3eba97a89 Mon Sep 17 00:00:00 2001 From: Rich Graham Date: Wed, 27 Feb 2008 14:56:36 +0000 Subject: [PATCH] checkpoint work. This commit was SVN r17620. --- ompi/mca/coll/sm2/coll_sm2.h | 10 +- ompi/mca/coll/sm2/coll_sm2_allreduce.c | 152 +++++++++++++++++++++---- ompi/mca/coll/sm2/coll_sm2_component.c | 4 + ompi/mca/coll/sm2/coll_sm2_module.c | 28 ++++- 4 files changed, 172 insertions(+), 22 deletions(-) diff --git a/ompi/mca/coll/sm2/coll_sm2.h b/ompi/mca/coll/sm2/coll_sm2.h index 6c542f5a0e..1147acad9b 100644 --- a/ompi/mca/coll/sm2/coll_sm2.h +++ b/ompi/mca/coll/sm2/coll_sm2.h @@ -103,6 +103,9 @@ BEGIN_C_DECLS /** MCA parameter: order of reduction tree */ int order_reduction_tree; + /** MCA parameter: order of fan-out read tree */ + int order_fanout_read_tree; + }; /** @@ -146,8 +149,8 @@ BEGIN_C_DECLS /* shared memory data strucutures */ struct mca_coll_sm2_nb_request_process_shared_mem_t { /* flag used to indicate the status of this memory region */ - long long flag; - long long index; + volatile long long flag; + volatile long long index; /* pading */ /* Note: need to change this so it takes less memory */ @@ -276,6 +279,9 @@ BEGIN_C_DECLS /* multinumial reduction tree */ tree_node_t *reduction_tree; + /* multinumial fan-out read tree */ + tree_node_t *fanout_read_tree; + /* collective tag */ long long collective_tag; diff --git a/ompi/mca/coll/sm2/coll_sm2_allreduce.c b/ompi/mca/coll/sm2/coll_sm2_allreduce.c index e2f598062c..ac94dedae0 100644 --- a/ompi/mca/coll/sm2/coll_sm2_allreduce.c +++ b/ompi/mca/coll/sm2/coll_sm2_allreduce.c @@ -29,20 +29,26 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, /* local variables */ int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number; int my_rank, child_rank, parent_rank, child, n_parents, n_children; - int my_parent; + int my_fanin_parent,count_processed,count_this_stripe; + int my_fanout_parent; size_t message_extent,dt_extent,ctl_size,len_data_buffer; long long tag; - volatile char *sm_buffer; - char *my_data_pointer, * volatile child_data_pointer; - char * volatile parent_data_pointer, * volatile root_data_pointer; + volatile char * sm_buffer; + volatile char * my_data_pointer; + volatile char * child_data_pointer; + volatile char * parent_data_pointer; char *my_base_temp_pointer, * volatile child_base_temp_pointer; char * volatile parent_base_temp_pointer, * volatile root_base_temp_pointer; mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer; - mca_coll_sm2_nb_request_process_shared_mem_t * volatile child_ctl_pointer; - mca_coll_sm2_nb_request_process_shared_mem_t * volatile parent_ctl_pointer; - mca_coll_sm2_nb_request_process_shared_mem_t * volatile root_ctl_pointer; + volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer; + volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer; mca_coll_sm2_module_t *sm_module; - tree_node_t *my_reduction_node; + tree_node_t *my_reduction_node, *my_fanout_read_tree; + + /* debug */ + fprintf(stderr," GGGG sm2 allreduce called r %d \n",ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ sm_module=(mca_coll_sm2_module_t *) module; @@ -78,18 +84,31 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, /* get my node for the reduction tree */ my_rank=ompi_comm_rank(comm); my_reduction_node=&(sm_module->reduction_tree[my_rank]); + my_fanout_read_tree=&(sm_module->fanout_read_tree[my_rank]); n_children=my_reduction_node->n_children; n_parents=my_reduction_node->n_parents; - my_parent=my_reduction_node->parent_rank; + my_fanin_parent=my_reduction_node->parent_rank; + my_fanout_parent=my_fanout_read_tree->parent_rank; + count_processed=0; /* get a pointer to the shared-memory working buffer */ /* NOTE: starting with a rather synchronous approach */ for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) { + /* debug */ + fprintf(stderr," GGGG strip_number %d r %d \n", + stripe_number,ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ sm_buffer=alloc_sm2_shared_buffer(sm_module); if( NULL == sm_buffer) { rc=OMPI_ERR_OUT_OF_RESOURCE; goto Error; } + + /* get number of elements to process in this stripe */ + count_this_stripe=n_dts_per_buffer; + if( count_processed + count_this_stripe > count ) + count_this_stripe=count-count_processed; /* get base address to "my" memory segment */ my_base_temp_pointer=(char *) @@ -100,6 +119,11 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, my_ctl_pointer=(mca_coll_sm2_nb_request_process_shared_mem_t *) my_base_temp_pointer; + /* debug */ + fprintf(stderr," GGGG before fan in r %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* * Fan into root phase */ @@ -107,7 +131,19 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, /* copy segment into shared buffer - later on will optimize to * eliminate extra copies. */ + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + my_data_pointer, sbuf+dt_extent*count_processed); + if( 0 != rc ) { + return OMPI_ERROR; + } + /* debug */ + fprintf(stderr," GGGG copied my data to sm r %d \n", + ompi_comm_rank(comm)); + fprintf(stderr," GGGG tag %lld index %d r %d \n", + tag,stripe_number,ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* * Wait on children, and apply op to their data */ @@ -124,6 +160,12 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, ( mca_coll_sm2_nb_request_process_shared_mem_t * volatile) child_base_temp_pointer; + /* debug */ + fprintf(stderr," GGGG before wait tag %lld index %d p %p r %d \n", + tag,stripe_number,&(child_ctl_pointer->flag), + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* wait until child flag is set */ while(! (child_ctl_pointer->flag == tag & @@ -131,11 +173,21 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, /* Note: Actually need to make progress here */ ; } + /* debug */ + fprintf(stderr," GGGG after wait %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* apply collective operation */ ompi_op_reduce(op,child_data_pointer,my_data_pointer, count,dtype); } + /* debug */ + fprintf(stderr," GGGG got data from kids r %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* set memory barriet to make sure data is in main memory before * the completion flgas are set. @@ -148,25 +200,45 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, my_ctl_pointer->flag=tag; my_ctl_pointer->index=stripe_number; + /* debug */ + fprintf(stderr," GGGG signaled parent p %p val %lld - r %d \n", + &(my_ctl_pointer->flag), + my_ctl_pointer->flag, + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* - * Fan out from root phase - let the memory copies at each + * Fan out from root - let the memory copies at each * stage help reduce memory contention. */ - if( 0 < my_reduction_node->n_parents ) { + if( 0 == my_fanout_read_tree->n_parents ) { /* I am the root - so copy signal children, and then * start reading */ my_ctl_pointer->flag=-tag; + /* debug */ + fprintf(stderr," GGGG reset flag to %lld - r %d \n", + my_ctl_pointer->flag, + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* copy data to user supplied buffer */ + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)rbuf+dt_extent*count_processed,my_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } } else { - parent_data_pointer=(char *) - ((char *)sm_buffer+my_parent* + parent_base_temp_pointer=(char *) + ((char *)sm_buffer+my_fanout_parent* sm_module->segement_size_per_process); + + parent_data_pointer=parent_base_temp_pointer+ctl_size; + parent_ctl_pointer=parent_base_temp_pointer; - parent_ctl_pointer=parent_data_pointer+ctl_size; child_ctl_pointer= ( mca_coll_sm2_nb_request_process_shared_mem_t * volatile) parent_data_pointer; @@ -174,20 +246,59 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, /* * wait on Parent to signal that data is ready */ + /* debug */ + fprintf(stderr," GGGG waiting on parent - r %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ while(! - (parent_ctl_pointer->flag == -tag & - parent_ctl_pointer->index== stripe_number) ) { + /* in fan-in, index was already set correctly, so + * no need to check it + */ + (parent_ctl_pointer->flag == -tag ) ) { + /* debug */ + fprintf(stderr," VVVV flag %lld index %lld - r %d \n", + parent_ctl_pointer->flag, + parent_ctl_pointer->index, + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* Note: Actually need to make progress here */ ; } + /* debug */ + fprintf(stderr," GGGG done waiting on parent - r %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ - /* copy data to user supplied buffer */ - root_base_temp_pointer=(char *)sm_buffer; - root_data_pointer=child_base_temp_pointer+ctl_size; + /* copy the data to my shared buffer, for access by children */ + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + my_data_pointer,parent_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + + /* set memory barriet to make sure data is in main memory before + * the completion flgas are set. + */ + MB(); + /* debug */ + fprintf(stderr," GGGG about to signal children - r %d \n", + ompi_comm_rank(comm)); + fflush(stderr); + /* end debug */ /* signal children that they may read the result data */ my_ctl_pointer->flag=-tag; + /* copy data to user supplied buffer */ + rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe, + (char *)rbuf+dt_extent*count_processed,my_data_pointer); + if( 0 != rc ) { + return OMPI_ERROR; + } + } /* "free" the shared-memory working buffer */ @@ -195,6 +306,9 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count, if( OMPI_SUCCESS != rc ) { goto Error; } + + /* update the count of elements processed */ + count_processed+=count_this_stripe; } /* return */ diff --git a/ompi/mca/coll/sm2/coll_sm2_component.c b/ompi/mca/coll/sm2/coll_sm2_component.c index 2f7d93de49..ed4b697c01 100644 --- a/ompi/mca/coll/sm2/coll_sm2_component.c +++ b/ompi/mca/coll/sm2/coll_sm2_component.c @@ -176,6 +176,10 @@ static int sm2_open(void) cs->order_reduction_tree= mca_coll_sm2_param_register_int("order_reduction_tree",2); + /* Order of fan-out read Tree */ + cs->order_fanout_read_tree= + mca_coll_sm2_param_register_int("order_fanout_read_tree",4); + return OMPI_SUCCESS; } diff --git a/ompi/mca/coll/sm2/coll_sm2_module.c b/ompi/mca/coll/sm2/coll_sm2_module.c index 9cd279ccd5..477cf49bdb 100644 --- a/ompi/mca/coll/sm2/coll_sm2_module.c +++ b/ompi/mca/coll/sm2/coll_sm2_module.c @@ -82,6 +82,19 @@ mca_coll_sm2_module_destruct(mca_coll_sm2_module_t *module) } free(module->reduction_tree); } + + /* free fan-out read tree */ + if( NULL != module->fanout_read_tree ) { + for( i=0 ; i < module->comm_size ; i++ ) { + if( NULL != module->fanout_read_tree[i].children_ranks) { + free(module->fanout_read_tree[i].children_ranks); + } + } + free(module->fanout_read_tree); + } + + + /* done */ } static bool have_local_peers(ompi_group_t *group, size_t size) @@ -510,6 +523,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) * Some initialization */ sm_module->reduction_tree=NULL; + sm_module->fanout_read_tree=NULL; /* * create backing file @@ -634,7 +648,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) goto CLEANUP; } - /* initialize barrier reduction tree */ + /* initialize reduction tree */ sm_module->reduction_tree=(tree_node_t *) malloc( sizeof(tree_node_t )*group_size); if( NULL == sm_module->reduction_tree ) { @@ -647,6 +661,18 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority) goto CLEANUP; } + /* initialize fan-out read tree */ + sm_module->fanout_read_tree=(tree_node_t *) malloc( + sizeof(tree_node_t )*group_size); + if( NULL == sm_module->fanout_read_tree ) { + goto CLEANUP; + } + + ret=setup_multinomial_tree(mca_coll_sm2_component.order_fanout_read_tree, + group_size,sm_module->fanout_read_tree); + if( MPI_SUCCESS != ret ) { + goto CLEANUP; + } /* initialize local counters */ sm_module->sm2_allocated_buffer_index=-1; sm_module->sm2_freed_buffer_index=-1;