From b3f59c76e151c86042811c1c5aaf75aad2432591 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Mon, 7 Aug 2017 13:39:47 -0500 Subject: [PATCH] io/ompio: new simple aggr. selection algorithm add a new aggregator selection algorithm based on the performance model described in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in Collective I/O Operations' Proceedings of the 17th IEEE/ACM Symposium on Cluster, Cloud and Grid Computing, Workshop on Theoretical Approaches to Performance Evaluation, Modeling and Simulation, 2017. Signed-off-by: Edgar Gabriel --- ompi/mca/io/ompio/io_ompio.h | 3 + ompi/mca/io/ompio/io_ompio_aggregators.c | 185 ++++++++++++++++++++--- ompi/mca/io/ompio/io_ompio_component.c | 27 ++++ 3 files changed, 195 insertions(+), 20 deletions(-) diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 417863a0bf..0268a02244 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -49,6 +49,9 @@ extern int mca_io_ompio_num_aggregators; extern int mca_io_ompio_record_offset_info; extern int mca_io_ompio_sharedfp_lazy_open; extern int mca_io_ompio_grouping_option; +extern int mca_io_ompio_max_aggregators_ratio; +extern int mca_io_ompio_aggregators_cutoff_threshold; + OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info; /* diff --git a/ompi/mca/io/ompio/io_ompio_aggregators.c b/ompi/mca/io/ompio/io_ompio_aggregators.c index 9bf568235c..1f674793c3 100644 --- a/ompi/mca/io/ompio/io_ompio_aggregators.c +++ b/ompi/mca/io/ompio/io_ompio_aggregators.c @@ -47,41 +47,115 @@ ** ** The first group functions determines the number of aggregators based on various characteristics ** -** 1. simple_grouping:aA simple heuristic based on the amount of data written and size of +** 1. simple_grouping: A simple heuristic based on the amount of data written and size of ** of the temporary buffer used by aggregator processes ** 2. fview_based_grouping: analysis the fileview to detect regular patterns ** 3. 
cart_based_grouping: uses a cartesian communicator to derive certain (probable) properties ** of the access pattern */ +int mca_io_base_check_params ( size_t, size_t, int, int); +static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim ); +#define DIM1 1 +#define DIM2 2 + int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh, - int *num_groups, + int *num_groups_out, mca_io_ompio_contg *contg_groups) { - size_t stripe_size = (size_t) fh->f_stripe_size; int group_size = 0; int k=0, p=0, g=0; int total_procs = 0; + int num_groups=1; - if ( 0 >= fh->f_stripe_size ) { - stripe_size = OMPIO_DEFAULT_STRIPE_SIZE; - } + double time1=0.0, time2=0.0, dtime=0.0, dtime2=0.0, dtime_diff=0.0; + double dtime_threshold=0.0; + int mode=1; + int P_a, P_a_prev; - if ( 0 != fh->f_cc_size && stripe_size > fh->f_cc_size ) { - group_size = (((int)stripe_size/(int)fh->f_cc_size) > fh->f_size ) ? fh->f_size : ((int)stripe_size/(int)fh->f_cc_size); - *num_groups = fh->f_size / group_size; - } - else if ( fh->f_cc_size <= OMPIO_CONTG_FACTOR * stripe_size) { - *num_groups = fh->f_size/OMPIO_CONTG_FACTOR > 0 ? (fh->f_size/OMPIO_CONTG_FACTOR) : 1 ; - group_size = OMPIO_CONTG_FACTOR; - } - else { - *num_groups = fh->f_size; - group_size = 1; - } + /* The aggregator selection algorithm is based on the formulas described + ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in + ** Collective I/O operations', Proceedings of the 17th IEEE/ACM Symposium + ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical + ** Approaches to Performance Evaluation, Modeling and Simulation, 2017. + ** + ** The current implementation is based on the 1-D and 2-D models derived for the even + ** file partitioning strategy in the paper. Note, that the formulas currently only model + ** the communication aspect of collective I/O operations. There are two extensions in this + ** implementation: + ** + ** 1. 
Since the resulting formula has an asymptotic behavior w.r.t. the + ** no. of aggregators, this version determines the no. of aggregators to + ** be used iteratively and stops increasing the no. of aggregators if the + ** benefits of increasing the aggregators is below a certain threshold + ** value relative to the last number tested. The aggressiveness of cutting off + ** the increase in the number of aggregators is controlled by the new mca + ** parameter mca_io_ompio_aggregators_cutoff_threshold. Lower values for + ** this parameter will lead to higher number of aggregators (useful e.g. + ** for PVFS2 and GPFS file systems), while higher values will lead to + ** lower no. of aggregators (useful for regular UNIX or NFS file systems). + ** + ** 2. The algorithm further caps the maximum no. of aggregators used to not exceed + ** (no. of processes / mca_io_ompio_max_aggregators_ratio), i.e. a higher value + ** for mca_io_ompio_max_aggregators_ratio will decrease the maximum number of aggregators + ** allowed for the given no. of processes. + */ + dtime_threshold = (double) mca_io_ompio_aggregators_cutoff_threshold / 100.0; + if ( fh->f_rank == 0 ) printf ("%d %lf\n", mca_io_ompio_aggregators_cutoff_threshold, dtime_threshold ); - 
1 : 2; + + for ( P_a = 1; P_a <= fh->f_size; P_a *= 2 ) { + time1 = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode ); + if ( P_a != 1 ) { + dtime = (time2 - time1) / time2; + dtime_diff = fabs(dtime2 - dtime); +#ifdef OMPIO_DEBUG + printf(" d_p = %ld P_a = %d time1 = %lf dtime = %lf dtime_diff=%lf\n", fh->f_view_size, P_a, time1, dtime, dtime_diff ); +#endif + if ( dtime_diff < dtime_threshold ) { +#ifdef OMPIO_DEBUG + printf(" For P=%d d_p=%ld b_c=%d chosen P_a = %d \n", fh->f_size, fh->f_view_size, fh->f_bytes_per_agg, P_a_prev); +#endif + num_groups = P_a_prev; + break; + } + } + else { + time2 = time1; + } + dtime2 = dtime; + P_a_prev = P_a; + } + +#ifdef OMPIO_DEBUG + if ( fh->f_rank == 0 ) { + if ( mca_io_base_check_params ( fh->f_view_size, fh->f_cc_size, fh->f_bytes_per_agg, -1 ) ) { + if ( fh->f_view_size == MCA_IO_DEFAULT_FILE_VIEW_SIZE && MCA_IO_DEFAULT_FILE_VIEW_SIZE == fh->f_cc_size ) { + /* This is the default file view, not interested in it */ + } + else { + printf("fstype=%d view_size=%ld cc_size=%ld stripe_size=%ld\n", fh->f_fstype, fh->f_view_size, + fh->f_cc_size, fh->f_stripe_size); + } + } + } +#endif + + /* Cap the maximum number of aggregators.*/ + if ( num_groups > (fh->f_size/mca_io_ompio_max_aggregators_ratio)) { + num_groups = (fh->f_size/mca_io_ompio_max_aggregators_ratio); + } + if ( 1 >= num_groups ) { + num_groups = 1; + } + group_size = fh->f_size / num_groups; + + for ( k=0, p=0; pf_size - total_procs; } else { @@ -93,6 +167,8 @@ int mca_io_ompio_simple_grouping(mca_io_ompio_file_t *fh, k++; } } + + *num_groups_out = num_groups; return OMPI_SUCCESS; } @@ -1296,3 +1372,72 @@ exit: } +static double cost_calc (int P, int P_a, size_t d_p, size_t b_c, int dim ) +{ + int n_as, m_s, n_s; + int n_ar; + double t_send, t_recv, t_tot; + + /* LogGP parameters based on DDR InfiniBand values */ + double L=.00000184; + double o=.00000149; + double g=.0000119; + double G=.00000000067; + + long file_domain = (P * 
d_p) / P_a; + int n_r = ceil ((float)file_domain/(float) b_c); +// printf("p=%d, p_a =%d, d_p= %d, b_c=%d, iter=%d\n", +// P, P_a, d_p, b_c, iteration); + switch (dim) { + case DIM1: + { + if( d_p > b_c ){ + //printf("case 1\n"); + n_ar = 1; + n_as = 1; + m_s = b_c; + n_s = ceil((float)d_p/(float)b_c); + } + else { + n_ar = ceil((float)b_c/(float)d_p); + n_as = 1; + m_s = d_p; + n_s = 1; + } + break; + } + case DIM2: + { + int P_x, P_y, c; + + P_x = P_y = (int) sqrt(P); + c = ceil((float)P_a / (float)P_x); + + n_ar = P_y; + n_as = c; + if ( d_p > (P_a*b_c/P )) { + m_s = (int)fmin(b_c / P_y, d_p); + } + else { + m_s = (int)fmin(d_p * P_x / P_a, d_p); + } + break; + } + default : + printf("stop putting random values\n"); + break; + } + + n_s = ceil(((float) d_p / (float)(n_as * m_s))); +// printf("n_r=%d \t n_ar = %d \t n_as =%d \t n_s=%d \t m_s= %d\n",n_r, n_ar, n_as, n_s, m_s); + + if(m_s < 33554432) + g = .00000108; + + t_send = n_s * (L + 2 * o + (n_as -1) * g + (m_s - 1) * n_as * G); + t_recv= n_r * (L + 2 * o + (n_ar -1) * g + (m_s - 1) * n_ar * G);; + t_tot = t_send + t_recv; + +// printf("%lf\t%lf\t%lf\n", t_send, t_recv, t_tot); + return t_tot; +} diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c index e0b89ab008..5a93a5f354 100644 --- a/ompi/mca/io/ompio/io_ompio_component.c +++ b/ompi/mca/io/ompio/io_ompio_component.c @@ -39,6 +39,8 @@ int mca_io_ompio_num_aggregators = -1; int mca_io_ompio_record_offset_info = 0; int mca_io_ompio_coll_timing_info = 0; int mca_io_ompio_sharedfp_lazy_open = 0; +int mca_io_ompio_max_aggregators_ratio=8; +int mca_io_ompio_aggregators_cutoff_threshold=3; int mca_io_ompio_grouping_option=5; @@ -216,6 +218,31 @@ static int register_component(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_io_ompio_grouping_option); + mca_io_ompio_max_aggregators_ratio = 8; + (void) mca_base_component_var_register(&mca_io_ompio_component.io_version, + "max_aggregators_ratio", + "Maximum number of processes that 
can be an aggregator expressed as " + "the ratio to the number of processes used to open the file" + " i.e. 1 out of n processes can be an aggregator, with n being specified" + " by this mca parameter.", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_io_ompio_max_aggregators_ratio); + + + mca_io_ompio_aggregators_cutoff_threshold=3; + (void) mca_base_component_var_register(&mca_io_ompio_component.io_version, + "aggregators_cutoff_threshold", + "Relative cutoff threshold for incrementing the number of aggregators " + "in the simple aggregator selection algorithm (5). Lower values " + "for this parameter will lead to higher no. of aggregators.", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_io_ompio_aggregators_cutoff_threshold); + + return OMPI_SUCCESS; }