diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c
index cb5a50a5c1..1b08917de4 100644
--- a/ompi/mca/io/ompio/io_ompio.c
+++ b/ompi/mca/io/ompio/io_ompio.c
@@ -1030,13 +1030,29 @@ int ompi_io_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
     fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;
 
     if (-1 == num_aggregators) {
-        mca_io_ompio_create_groups(fh,bytes_per_proc);
+        if ( SIMPLE == mca_io_ompio_grouping_option ||
+             NO_REFINEMENT == mca_io_ompio_grouping_option ) {
+            fh->f_aggregator_index = 0;
+            fh->f_final_num_aggrs  = fh->f_init_num_aggrs;
+            fh->f_procs_per_group  = fh->f_init_procs_per_group;
+
+            fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
+            if (NULL == fh->f_procs_in_group) {
+                opal_output (1, "OUT OF MEMORY\n");
+                return OMPI_ERR_OUT_OF_RESOURCE;
+            }
+
+            for (j=0 ; j<fh->f_procs_per_group ; j++) {
+                fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
+            }
+        }
+        else {
+            mca_io_ompio_create_groups(fh,bytes_per_proc);
+        }
         return OMPI_SUCCESS;
     }
 
     //Forced number of aggregators
-    else
-    {
         /* calculate the offset at which each group of processes will start */
         procs_per_group = ceil ((float)fh->f_size/num_aggregators);
@@ -1062,7 +1078,6 @@ int ompi_io_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
     fh->f_final_num_aggrs = num_aggregators;
 
     return OMPI_SUCCESS;
-    }
 }
 
diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h
index 703ed0bbc8..4f54e9af6d 100644
--- a/ompi/mca/io/ompio/io_ompio.h
+++ b/ompi/mca/io/ompio/io_ompio.h
@@ -101,15 +101,22 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;
 #define OMPIO_MERGE                    1
 #define OMPIO_SPLIT                    2
 #define OMPIO_RETAIN                   3
+
 #define DATA_VOLUME                    1
 #define UNIFORM_DISTRIBUTION           2
-#define OMPIO_UNIFORM_DIST_THRESHOLD   0.5
 #define CONTIGUITY                     3
-#define OMPIO_CONTG_THRESHOLD          1048576
 #define OPTIMIZE_GROUPING              4
-#define OMPIO_PROCS_PER_GROUP_TAG      0
-#define OMPIO_PROCS_IN_GROUP_TAG       1
-#define OMPIO_MERGE_THRESHOLD          0.5
+#define SIMPLE                         5
+#define NO_REFINEMENT                  6
+
+
+#define OMPIO_UNIFORM_DIST_THRESHOLD   0.5
+#define OMPIO_CONTG_THRESHOLD          1048576
+#define OMPIO_CONTG_FACTOR             8
+#define OMPIO_DEFAULT_STRIPE_SIZE      1048576
+#define OMPIO_PROCS_PER_GROUP_TAG      0
+#define OMPIO_PROCS_IN_GROUP_TAG       1
+#define OMPIO_MERGE_THRESHOLD          0.5
 
 /*---------------------------*/
@@ -543,6 +550,9 @@ int mca_io_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh);
 int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
                                       int *num_groups,
                                       contg *contg_groups);
+int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
+                                 int *num_groups,
+                                 contg *contg_groups);
 
 int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
                                            int num_groups,
diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c
index 4c2c086821..af8918985c 100644
--- a/ompi/mca/io/ompio/io_ompio_component.c
+++ b/ompi/mca/io/ompio/io_ompio_component.c
@@ -38,7 +38,7 @@
 int mca_io_ompio_record_offset_info = 0;
 int mca_io_ompio_coll_timing_info = 0;
 int mca_io_ompio_sharedfp_lazy_open = 1;
-int mca_io_ompio_grouping_option=0;
+int mca_io_ompio_grouping_option=5;
 
 /*
  * Private functions
@@ -202,10 +202,13 @@ static int register_component(void)
                                            MCA_BASE_VAR_SCOPE_READONLY,
                                            &mca_io_ompio_sharedfp_lazy_open);
 
-    mca_io_ompio_grouping_option = 0;
+    mca_io_ompio_grouping_option = 5;
     (void) mca_base_component_var_register(&mca_io_ompio_component.io_version,
                                            "grouping_option",
-                                           "Option for grouping of processes in the aggregator selection",
+                                           "Option for grouping of processes in the aggregator selection "
+                                           "1: Data volume based grouping 2: maximizing group size uniformity 3: maximize "
+                                           "data contiguity 4: hybrid optimization 5: simple (default) "
+                                           "6: skip refinement step",
                                            MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                            OPAL_INFO_LVL_9,
                                            MCA_BASE_VAR_SCOPE_READONLY,
diff --git a/ompi/mca/io/ompio/io_ompio_file_set_view.c b/ompi/mca/io/ompio/io_ompio_file_set_view.c
index 0f5d75a31c..0cf04b1373 100644
--- a/ompi/mca/io/ompio/io_ompio_file_set_view.c
+++ b/ompi/mca/io/ompio/io_ompio_file_set_view.c
@@ -148,19 +148,29 @@ int mca_io_ompio_set_view_internal(mca_io_ompio_file_t *fh,
         }
     }
 
-    if( OMPI_SUCCESS != mca_io_ompio_fview_based_grouping(fh,
+    if ( SIMPLE != mca_io_ompio_grouping_option ) {
+        if( OMPI_SUCCESS != mca_io_ompio_fview_based_grouping(fh,
                                                            &num_groups,
                                                            contg_groups)){
-        opal_output(1, "mca_io_ompio_fview_based_grouping() failed\n");
-        free(contg_groups);
-        return OMPI_ERROR;
+            opal_output(1, "mca_io_ompio_fview_based_grouping() failed\n");
+            free(contg_groups);
+            return OMPI_ERROR;
+        }
     }
-    if( !( (fh->f_comm->c_flags & OMPI_COMM_CART) &&
-           (num_groups == 1 || num_groups == fh->f_size)) ) {
-        mca_io_ompio_finalize_initial_grouping(fh,
-                                               num_groups,
-                                               contg_groups);
+    else {
+        if( OMPI_SUCCESS != mca_io_ompio_simple_grouping(fh,
+                                                         &num_groups,
+                                                         contg_groups)){
+            opal_output(1, "mca_io_ompio_simple_grouping() failed\n");
+            free(contg_groups);
+            return OMPI_ERROR;
+        }
     }
+
+
+    mca_io_ompio_finalize_initial_grouping(fh,
+                                           num_groups,
+                                           contg_groups);
     for( i = 0; i < fh->f_size; i++){
         free(contg_groups[i].procs_in_contg_group);
     }
@@ -231,7 +241,7 @@ int mca_io_ompio_file_get_view (struct ompi_file_t *fp,
 
 OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
 {
-    int uniform = 0, global_uniform = 0;
+    int uniform = 0;
     OMPI_MPI_OFFSET_TYPE avg[3] = {0,0,0};
     OMPI_MPI_OFFSET_TYPE global_avg[3] = {0,0,0};
     int i = 0;
@@ -268,6 +278,10 @@ OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
 
     global_avg[0] = global_avg[0]/fh->f_size;
     global_avg[1] = global_avg[1]/fh->f_size;
+#if 0
+    /* Disabling the feature since we are not using it anyway. Saves us one allreduce operation. */
+    int global_uniform=0;
+
     if ( global_avg[0] == avg[0] &&
          global_avg[1] == avg[1] &&
          0 == avg[2]             &&
@@ -293,10 +307,53 @@ OMPI_MPI_OFFSET_TYPE get_contiguous_chunk_size (mca_io_ompio_file_t *fh)
         /* yes, everybody agrees on having a uniform file view */
         fh->f_flags |= OMPIO_UNIFORM_FVIEW;
     }
-
+#endif
     return global_avg[0];
 }
 
+int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh,
+                                 int *num_groups,
+                                 contg *contg_groups)
+{
+    size_t stripe_size = (size_t) fh->f_stripe_size;
+    int group_size  = 0;
+    int k=0, p=0, g=0;
+    int total_procs = 0;
+
+    if ( 0 >= fh->f_stripe_size ) {
+        stripe_size = OMPIO_DEFAULT_STRIPE_SIZE;
+    }
+
+    if ( stripe_size > fh->f_cc_size ) {
+        group_size  = (((int)stripe_size/(int)fh->f_cc_size) > fh->f_size ) ? fh->f_size : ((int)stripe_size/(int)fh->f_cc_size);
+        *num_groups = fh->f_size / group_size;
+    }
+    else if ( fh->f_cc_size <= OMPIO_CONTG_FACTOR * stripe_size) {
+        *num_groups = fh->f_size/OMPIO_CONTG_FACTOR > 0 ? (fh->f_size/OMPIO_CONTG_FACTOR) : 1 ;
+        group_size  = OMPIO_CONTG_FACTOR;
+    }
+    else {
+        *num_groups = fh->f_size;
+        group_size  = 1;
+    }
+
+    for ( k=0, p=0; p<*num_groups; p++ ) {
+        if ( p == (*num_groups - 1) ) {
+            contg_groups[p].procs_per_contg_group = fh->f_size - total_procs;
+        }
+        else {
+            contg_groups[p].procs_per_contg_group = group_size;
+            total_procs += group_size;
+        }
+        for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
+            contg_groups[p].procs_in_contg_group[g] = k;
+            k++;
+        }
+    }
+
+    return OMPI_SUCCESS;
+}
+
@@ ... @@ int mca_io_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
     start_offset_len[2] = (OMPI_MPI_OFFSET_TYPE) fh->f_rank;
 
-    if( OMPIO_ROOT == fh->f_rank){
-        start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
-        if (NULL == start_offsets_lens) {
-            opal_output (1, "OUT OF MEMORY\n");
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-        end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
-        if (NULL == end_offsets) {
-            opal_output (1, "OUT OF MEMORY\n");
-            free(start_offsets_lens);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-
+    start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
+    if (NULL == start_offsets_lens) {
+        opal_output (1, "OUT OF MEMORY\n");
+        return OMPI_ERR_OUT_OF_RESOURCE;
     }
-    //Gather start offsets across processes in a group on aggregator
-    fh->f_comm->c_coll.coll_gather (start_offset_len,
-                                    3,
-                                    OMPI_OFFSET_DATATYPE,
-                                    start_offsets_lens,
-                                    3,
-                                    OMPI_OFFSET_DATATYPE,
-                                    OMPIO_ROOT,
-                                    fh->f_comm,
-                                    fh->f_comm->c_coll.coll_gather_module);
-
+    end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
+    if (NULL == end_offsets) {
+        opal_output (1, "OUT OF MEMORY\n");
+        free(start_offsets_lens);
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
+    //Allgather start offsets across processes in a group on aggregator
+    fh->f_comm->c_coll.coll_allgather (start_offset_len,
+                                       3,
+                                       OMPI_OFFSET_DATATYPE,
+                                       start_offsets_lens,
+                                       3,
+                                       OMPI_OFFSET_DATATYPE,
+                                       fh->f_comm,
+                                       fh->f_comm->c_coll.coll_allgather_module);
+
     //Calculate contg chunk size and contg subgroups
-    if(OMPIO_ROOT == fh->f_rank){
-        for( k = 0 ; k < fh->f_size; k++){
-            end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
-            contg_groups[k].contg_chunk_size = 0;
-        }
-        k = 0;
-        while( k < fh->f_size){
-            if( k == 0){
-                contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
-                contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
-                g++;
-                contg_groups[p].procs_per_contg_group = g;
-                k++;
-            }
-            else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
-                contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
-                contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
-                g++;
-                contg_groups[p].procs_per_contg_group = g;
-                k++;
-            }
-            else{
-                p++;
-                g = 0;
-                contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
-                contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
-                g++;
-                contg_groups[p].procs_per_contg_group = g;
-                k++;
-            }
-        }
-
-        *num_groups = p+1;
-        if (NULL != start_offsets_lens) {
-            free (start_offsets_lens);
-            start_offsets_lens = NULL;
-        }
-        if (NULL != end_offsets) {
-            free (end_offsets);
-            end_offsets = NULL;
-        }
-    }
-
-    //bcast num_groups to all procs
-    fh->f_comm->c_coll.coll_bcast (num_groups,
-                                   1,
-                                   MPI_INT,
-                                   OMPIO_ROOT,
-                                   fh->f_comm,
-                                   fh->f_comm->c_coll.coll_bcast_module);
-
-
+    for( k = 0 ; k < fh->f_size; k++){
+        end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
+        contg_groups[k].contg_chunk_size = 0;
+    }
+    k = 0;
+    while( k < fh->f_size){
+        if( k == 0){
+            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
+            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
+            g++;
+            contg_groups[p].procs_per_contg_group = g;
+            k++;
+        }
+        else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
+            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
+            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
+            g++;
+            contg_groups[p].procs_per_contg_group = g;
+            k++;
+        }
+        else{
+            p++;
+            g = 0;
+            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
+            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
+            g++;
+            contg_groups[p].procs_per_contg_group = g;
+            k++;
+        }
+    }
+
+    *num_groups = p+1;
+    free (start_offsets_lens);
+    free (end_offsets);
+
     return OMPI_SUCCESS;
 }
 
@@ -408,105 +445,34 @@ int mca_io_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
 
     int z = 0;
     int y = 0;
-    int r = 0;
-
-    MPI_Request *sendreq = NULL , *req = NULL;
-
-
-    req = (MPI_Request *)malloc (2* sizeof(MPI_Request));
-    if (NULL == req) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
 
     fh->f_init_num_aggrs = num_groups;
     fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
     if (NULL == fh->f_init_aggr_list) {
         opal_output (1, "OUT OF MEMORY\n");
-        free(req);
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    if(OMPIO_ROOT == fh->f_rank){
-        sendreq = (MPI_Request *)malloc ( 2 *fh->f_size * sizeof(MPI_Request));
-        if (NULL == sendreq) {
-            free(req);
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-
-        for( z = 0 ;z < num_groups; z++){
-            for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
-                MCA_PML_CALL(isend(&contg_groups[z].procs_per_contg_group,
-                                   1,
-                                   MPI_INT,
-                                   contg_groups[z].procs_in_contg_group[y],
-                                   OMPIO_PROCS_PER_GROUP_TAG,
-                                   MCA_PML_BASE_SEND_STANDARD,
-                                   fh->f_comm,
-                                   &sendreq[r++]));
-
-                //send initial grouping distribution to all processes in the group
-                MCA_PML_CALL(isend(contg_groups[z].procs_in_contg_group,
-                                   contg_groups[z].procs_per_contg_group,
-                                   MPI_INT,
-                                   contg_groups[z].procs_in_contg_group[y],
-                                   OMPIO_PROCS_IN_GROUP_TAG,
-                                   MCA_PML_BASE_SEND_STANDARD,
-                                   fh->f_comm,
-                                   &sendreq[r++]));
-            }
-        }
-    }
-
-    //All processes receive initial procs per group from OMPIO_ROOT
-    MCA_PML_CALL(irecv(&fh->f_init_procs_per_group,
-                       1,
-                       MPI_INT,
-                       OMPIO_ROOT,
-                       OMPIO_PROCS_PER_GROUP_TAG,
-                       fh->f_comm,
-                       &req[0]));
-
-    ompi_request_wait (&req[0], MPI_STATUS_IGNORE);
-    fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
-    if (NULL == fh->f_init_procs_in_group) {
-        opal_output (1, "OUT OF MEMORY\n");
-        free(req);
-        if (NULL != sendreq) {
-            free(sendreq);
+    for( z = 0 ;z < num_groups; z++){
+        for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
+            if ( fh->f_rank == contg_groups[z].procs_in_contg_group[y] ) {
+                fh->f_init_procs_per_group = contg_groups[z].procs_per_contg_group;
+                fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
+                if (NULL == fh->f_init_procs_in_group) {
+                    opal_output (1, "OUT OF MEMORY\n");
+                    return OMPI_ERR_OUT_OF_RESOURCE;
+                }
+                memcpy ( fh->f_init_procs_in_group, contg_groups[z].procs_in_contg_group,
+                         contg_groups[z].procs_per_contg_group * sizeof (int));
+
+            }
         }
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-    //All processes receive initial process distribution from OMPIO_ROOT
-    MCA_PML_CALL(irecv(fh->f_init_procs_in_group,
-                       fh->f_init_procs_per_group,
-                       MPI_INT,
-                       OMPIO_ROOT,
-                       OMPIO_PROCS_IN_GROUP_TAG,
-                       fh->f_comm,
-                       &req[1]));
-
-    ompi_request_wait (&req[1], MPI_STATUS_IGNORE);
-    free (req);
-    if(OMPIO_ROOT == fh->f_rank){
-        ompi_request_wait_all (r, sendreq, MPI_STATUSES_IGNORE);
-        free (sendreq);
     }
-
-    /*set initial aggregator list */
-    //OMPIO_ROOT broadcasts aggr list
-    if(OMPIO_ROOT == fh->f_rank){
-        for( z = 0 ;z < num_groups; z++){
-            fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
-        }
+    for( z = 0 ;z < num_groups; z++){
+        fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
     }
-    fh->f_comm->c_coll.coll_bcast (fh->f_init_aggr_list,
-                                   num_groups,
-                                   MPI_INT,
-                                   OMPIO_ROOT,
-                                   fh->f_comm,
-                                   fh->f_comm->c_coll.coll_bcast_module);
 
     return OMPI_SUCCESS;
 }
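For reference, the selection logic that the new mca_io_ompio_simple_grouping() routine introduces can be read in isolation from the OMPIO file handle. The sketch below is illustrative only and is not part of the patch: the helper name pick_aggregator_groups, its plain-integer interface, and the standalone main() are assumptions made for this example; comm_size stands in for fh->f_size, avg_chunk_size for fh->f_cc_size, and the two constants mirror OMPIO_CONTG_FACTOR and OMPIO_DEFAULT_STRIPE_SIZE from io_ompio.h.

#include <stdio.h>

#define CONTG_FACTOR        8        /* mirrors OMPIO_CONTG_FACTOR */
#define DEFAULT_STRIPE_SIZE 1048576  /* mirrors OMPIO_DEFAULT_STRIPE_SIZE (1 MB) */

/* Illustrative sketch of the simple grouping heuristic: pick how many
 * consecutive ranks share one aggregator, based on the ratio of the file
 * system stripe size to the average contiguous chunk each process writes.
 * Assumes avg_chunk_size > 0. Not the OMPIO implementation itself. */
static void pick_aggregator_groups(int comm_size, long stripe_size,
                                   long avg_chunk_size,
                                   int *num_groups, int *group_size)
{
    if (stripe_size <= 0) {
        stripe_size = DEFAULT_STRIPE_SIZE;   /* no stripe info available */
    }

    if (stripe_size > avg_chunk_size) {
        /* several processes are needed to fill one stripe */
        *group_size = (int)(stripe_size / avg_chunk_size);
        if (*group_size > comm_size) {
            *group_size = comm_size;
        }
        *num_groups = comm_size / *group_size;
    }
    else if (avg_chunk_size <= CONTG_FACTOR * stripe_size) {
        /* moderately large chunks: fixed fan-in of CONTG_FACTOR processes */
        *num_groups = (comm_size / CONTG_FACTOR) > 0 ? comm_size / CONTG_FACTOR : 1;
        *group_size = CONTG_FACTOR;
    }
    else {
        /* every process already writes large contiguous chunks: no grouping */
        *num_groups = comm_size;
        *group_size = 1;
    }
}

int main(void)
{
    int groups = 0, gsize = 0;
    /* e.g. 64 ranks, 1 MB stripes, 64 KB average contiguous chunk per rank */
    pick_aggregator_groups(64, 1048576, 65536, &groups, &gsize);
    printf("num_groups=%d group_size=%d\n", groups, gsize);   /* 4 groups of 16 */
    return 0;
}

In the patch itself, the last group absorbs any remainder when fh->f_size is not a multiple of the group size (the total_procs bookkeeping), and ranks are assigned to groups in consecutive order, which is why grouping options 5 and 6 bypass mca_io_ompio_create_groups() in io_ompio.c.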