From 1f151be6d244ff6d88590b31a1316567d00f6a46 Mon Sep 17 00:00:00 2001
From: Edgar Gabriel
Date: Fri, 1 Dec 2017 10:00:23 -0600
Subject: [PATCH] io/ompio: introduce a new function to retrieve mca parameter
 values

ompio has the unique problem that mca parameters set in the io/ompio
component have to be accessible from other frameworks as well. This is
done mostly to avoid replicating parameter names and to reduce the
number of mca parameters that an end-user has to worry about.

This commit introduces a generic function to retrieve ompio mca
parameter values; the function pointer is stored on the file handle. It
replaces two functions that already used the same concept, one
parameter each.

Signed-off-by: Edgar Gabriel
---
 .../mca/common/ompio/common_ompio_file_open.c |  3 +-
 .../dynamic/fcoll_dynamic_file_read_all.c     | 13 +++++--
 .../dynamic/fcoll_dynamic_file_write_all.c    | 12 +++++--
 .../fcoll_dynamic_gen2_file_read_all.c        | 12 +++++--
 .../fcoll_dynamic_gen2_file_write_all.c       | 12 +++++--
 .../fcoll/static/fcoll_static_file_read_all.c | 17 ++++++---
 .../static/fcoll_static_file_write_all.c      | 17 ++++++---
 .../two_phase/fcoll_two_phase_file_read_all.c | 12 +++++--
 .../fcoll_two_phase_file_write_all.c          | 23 +++++++++---
 ompi/mca/io/ompio/io_ompio.c                  | 36 ++++++++++++++-----
 ompi/mca/io/ompio/io_ompio.h                  |  9 ++---
 11 files changed, 127 insertions(+), 39 deletions(-)

diff --git a/ompi/mca/common/ompio/common_ompio_file_open.c b/ompi/mca/common/ompio/common_ompio_file_open.c
index 5a026e8c80..a67afe2d95 100644
--- a/ompi/mca/common/ompio/common_ompio_file_open.c
+++ b/ompi/mca/common/ompio/common_ompio_file_open.c
@@ -106,8 +106,7 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
     ompio_fh->f_decode_datatype=ompi_io_ompio_decode_datatype;
     ompio_fh->f_generate_current_file_view=ompi_io_ompio_generate_current_file_view;
 
-    ompio_fh->f_get_num_aggregators=mca_io_ompio_get_num_aggregators;
-    ompio_fh->f_get_bytes_per_agg=mca_io_ompio_get_bytes_per_agg;
+    ompio_fh->f_get_mca_parameter_value=mca_io_ompio_get_mca_parameter_value;
     ompio_fh->f_set_aggregator_props=mca_io_ompio_set_aggregator_props;
 
     /* This fix is needed for data seiving to work with
diff --git a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
index bfaad026a4..608d231f70 100644
--- a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
+++ b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_read_all.c
@@ -144,7 +144,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators ( &dynamic_num_io_procs);
+    dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
                                       dynamic_num_io_procs,
                                       max_data);
@@ -333,7 +337,12 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
     *** 6. Determine the number of cycles required to execute this
     ***    operation
     *************************************************************/
-    fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
+
     cycles = ceil((double)total_bytes/bytes_per_cycle);
 
     if ( my_aggregator == fh->f_rank) {
diff --git a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
index bc94068a14..86f30d7df1 100644
--- a/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic/fcoll_dynamic_file_write_all.c
@@ -146,7 +146,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators ( &dynamic_num_io_procs );
+    dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
                                       dynamic_num_io_procs,
                                       max_data);
@@ -357,7 +361,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
     *** 6. Determine the number of cycles required to execute this
     ***    operation
     *************************************************************/
-    fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     cycles = ceil((double)total_bytes/bytes_per_cycle);
 
     if (my_aggregator == fh->f_rank) {
diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
index b4a5492db2..c80cbd36dc 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_read_all.c
@@ -144,7 +144,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs);
+    dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
                                       dynamic_gen2_num_io_procs,
                                       max_data);
@@ -333,7 +337,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
     *** 6. Determine the number of cycles required to execute this
     ***    operation
     *************************************************************/
-    fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     cycles = ceil((double)total_bytes/bytes_per_cycle);
 
     if ( my_aggregator == fh->f_rank) {
diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
index a72817d7a5..f8e328a566 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
@@ -160,7 +160,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
     /**************************************************************************
     ** 1.  In case the data is not contigous in memory, decode it into an iovec
     **************************************************************************/
-    fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
        the user requested */
     bytes_per_cycle =bytes_per_cycle/2;
@@ -188,7 +192,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
         dynamic_gen2_num_io_procs = fh->f_stripe_count;
     }
     else {
-        fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
+        dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+        if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
+            ret = OMPI_ERROR;
+            goto exit;
+        }
     }
 
 
diff --git a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
index 6afed7f29f..c6410c7209 100644
--- a/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
+++ b/ompi/mca/fcoll/static/fcoll_static_file_read_all.c
@@ -82,7 +82,8 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     MPI_Aint **displs_per_process=NULL, global_iov_count=0, global_count=0;
     MPI_Aint *memory_displacements=NULL;
     int bytes_to_read_in_cycle=0;
-    size_t max_data=0, bytes_per_cycle=0;
+    size_t max_data=0;
+    MPI_Aint bytes_per_cycle=0;
     uint32_t iov_count=0, iov_index=0;
     struct iovec *decoded_iov=NULL, *iov=NULL;
     mca_fcoll_static_local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
@@ -143,7 +144,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     }
 
 
-    fh->f_get_num_aggregators ( &static_num_io_procs );
+    static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == static_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
                                 static_num_io_procs,
                                 max_data);
@@ -210,7 +215,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
     ompi_datatype_commit (&io_array_type);
     /* #########################################################*/
 
-    fh->f_get_bytes_per_agg ( (int*) &bytes_per_cycle);
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     local_cycles = ceil((double)max_data*fh->f_procs_per_group/bytes_per_cycle);
 
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@@ -483,7 +492,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
         if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group))) {
             bytes_to_read_in_cycle = max_data - position;
         }
-        else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
+        else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
             bytes_to_read_in_cycle = max_data;
         }
         else {
diff --git a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
index 99255b0589..0f16422129 100644
--- a/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
+++ b/ompi/mca/fcoll/static/fcoll_static_file_write_all.c
@@ -69,7 +69,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
 
 
 
-    size_t max_data = 0, bytes_per_cycle=0;
+    size_t max_data = 0;
+    MPI_Aint bytes_per_cycle=0;
     struct iovec *iov=NULL, *decoded_iov=NULL;
     uint32_t iov_count=0, iov_index=0;
     int i=0,j=0,l=0, temp_index;
@@ -144,7 +145,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators ( & static_num_io_procs );
+    static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == static_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
                                 static_num_io_procs,
                                 max_data);
@@ -202,7 +207,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
     }
 
 
-    fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
+    bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == bytes_per_cycle ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     local_cycles = ceil( ((double)max_data*fh->f_procs_per_group) /bytes_per_cycle);
 
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@@ -475,7 +484,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
         if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group)) ) {
             bytes_to_write_in_cycle = max_data - total_bytes_written;
         }
-        else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
+        else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
             bytes_to_write_in_cycle = max_data;
         }
         else {
diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c
index 70353179b3..22bb1baa36 100644
--- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c
+++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_read_all.c
@@ -186,7 +186,11 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators (&two_phase_num_io_procs);
+    two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     if (-1 == two_phase_num_io_procs ){
         ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
                                           two_phase_num_io_procs,
@@ -575,7 +579,11 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
         }
     }
 
-    fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size);
+    two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
                    two_phase_cycle_buffer_size);
 
diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
index 8c62099ccc..b06b4e0e0c 100644
--- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
+++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
@@ -222,7 +222,11 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
         status->_ucount = max_data;
     }
 
-    fh->f_get_num_aggregators ( &two_phase_num_io_procs );
+    two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
+    if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     if(-1 == two_phase_num_io_procs){
         ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
                                           two_phase_num_io_procs,
@@ -642,7 +646,11 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
         }
     }
 
-    fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size );
+    two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
+    if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
+        ret = OMPI_ERROR;
+        goto exit;
+    }
     ntimes = (int) ((end_loc - st_loc + two_phase_cycle_buffer_size)/two_phase_cycle_buffer_size);
 
     if ((st_loc == -1) && (end_loc == -1)) {
@@ -1073,13 +1081,18 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
         fh->f_io_array[0].length = size;
         fh->f_io_array[0].memory_address = write_buf;
         if (fh->f_num_of_io_entries){
-            if ( fh->f_amode & MPI_MODE_WRONLY &&
-                 !mca_io_ompio_overwrite_amode ){
+            int amode_overwrite;
+            amode_overwrite = fh->f_get_mca_parameter_value ("overwrite_amode", strlen("overwrite_amode"));
+            if ( OMPI_ERR_MAX == amode_overwrite ) {
+                ret = OMPI_ERROR;
+                goto exit;
+            }
+            if ( fh->f_amode & MPI_MODE_WRONLY && !amode_overwrite ){
                 if ( 0 == fh->f_rank ) {
                     printf("\n File not opened in RDWR mode, can not continue."
                            "\n To resolve this problem, you can either \n"
                            " a. open the file with MPI_MODE_RDWR instead of MPI_MODE_WRONLY\n"
-                           " b. ensure that the mca parameter mca_io_ompio_amode_overwrite is set to 1\n"
+                           " b. ensure that the mca parameter mca_io_ompio_overwrite_amode is set to 1\n"
                            " c. use an fcoll component that does not use data sieving (e.g. dynamic)\n");
                 }
                 ret = MPI_ERR_FILE;
diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c
index 6d3b5726e7..05188ad764 100644
--- a/ompi/mca/io/ompio/io_ompio.c
+++ b/ompi/mca/io/ompio/io_ompio.c
@@ -640,18 +640,38 @@ int ompi_io_ompio_sort_offlen (mca_io_ompio_offlen_array_t *io_array,
 }
 
 
-void mca_io_ompio_get_num_aggregators ( int *num_aggregators)
+int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length )
 {
-    *num_aggregators = mca_io_ompio_num_aggregators;
-    return;
-}
+    if ( !strncmp ( mca_parameter_name, "num_aggregators", name_length )) {
+        return mca_io_ompio_num_aggregators;
+    }
+    else if ( !strncmp ( mca_parameter_name, "bytes_per_agg", name_length )) {
+        return mca_io_ompio_bytes_per_agg;
+    }
+    else if ( !strncmp ( mca_parameter_name, "overwrite_amode", name_length )) {
+        return mca_io_ompio_overwrite_amode;
+    }
+    else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) {
+        return mca_io_ompio_cycle_buffer_size;
+    }
+    else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) {
+        return mca_io_ompio_max_aggregators_ratio;
+    }
+    else if ( !strncmp ( mca_parameter_name, "aggregators_cutoff_threshold", name_length )) {
+        return mca_io_ompio_aggregators_cutoff_threshold;
+    }
+    else {
+        opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
+    }
 
-void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg)
-{
-    *bytes_per_agg = mca_io_ompio_bytes_per_agg;
-    return;
+    /* Using here OMPI_ERR_MAX instead of OMPI_ERROR, since -1 (which is OMPI_ERROR)
+    ** is a valid value for some mca parameters, indicating that the user did not set
+    ** that parameter value
+    */
+    return OMPI_ERR_MAX;
 }
+
 
 
 
 
diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h
index ccb97f5b1b..4e01b30b8f 100644
--- a/ompi/mca/io/ompio/io_ompio.h
+++ b/ompi/mca/io/ompio/io_ompio.h
@@ -196,8 +196,7 @@ typedef int (*mca_io_ompio_generate_current_file_view_fn_t) (struct mca_io_ompio
 
 /* functions to retrieve the number of aggregators and the size of the
    temporary buffer on aggregators from the fcoll modules */
-typedef void (*mca_io_ompio_get_num_aggregators_fn_t) ( int *num_aggregators);
-typedef void (*mca_io_ompio_get_bytes_per_agg_fn_t) ( int *bytes_per_agg);
+typedef int (*mca_io_ompio_get_mca_parameter_value_fn_t) ( char *mca_parameter_name, int name_length );
 typedef int (*mca_io_ompio_set_aggregator_props_fn_t) (struct mca_io_ompio_file_t *fh,
                                                        int num_aggregators,
                                                        size_t bytes_per_proc);
@@ -291,8 +290,7 @@ struct mca_io_ompio_file_t {
     mca_io_ompio_decode_datatype_fn_t          f_decode_datatype;
     mca_io_ompio_generate_current_file_view_fn_t f_generate_current_file_view;
 
-    mca_io_ompio_get_num_aggregators_fn_t      f_get_num_aggregators;
-    mca_io_ompio_get_bytes_per_agg_fn_t        f_get_bytes_per_agg;
+    mca_io_ompio_get_mca_parameter_value_fn_t  f_get_mca_parameter_value;
     mca_io_ompio_set_aggregator_props_fn_t     f_set_aggregator_props;
 };
 typedef struct mca_io_ompio_file_t mca_io_ompio_file_t;
@@ -308,8 +306,7 @@ typedef struct mca_io_ompio_data_t mca_io_ompio_data_t;
 
 /* functions to retrieve the number of aggregators and the size of the
    temporary buffer on aggregators from the fcoll modules */
-OMPI_DECLSPEC void mca_io_ompio_get_num_aggregators ( int *num_aggregators);
-OMPI_DECLSPEC void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg);
+OMPI_DECLSPEC int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length);
 
 /*
  * Function that takes in a datatype and buffer, and decodes that datatype
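
Reviewer note (not part of the patch): a minimal sketch of the calling
convention the new accessor establishes for fcoll components. The wrapper
function and its name below are hypothetical; the handle field, the parameter
name, and the OMPI_ERR_MAX sentinel are the ones introduced by this commit.
OMPI_SUCCESS/OMPI_ERROR are assumed to come in via the usual ompi headers.

    #include <string.h>
    #include "ompi/mca/io/ompio/io_ompio.h"   /* assumed include path */

    /* Query "bytes_per_agg" through the function pointer on the file handle.
     * The parameter value is returned directly, so failure is signaled with
     * OMPI_ERR_MAX rather than OMPI_ERROR: -1 (which is OMPI_ERROR) is a
     * legal value for some of these parameters, meaning "not set". */
    static int example_get_bytes_per_agg (mca_io_ompio_file_t *fh, int *value)
    {
        int v = fh->f_get_mca_parameter_value ("bytes_per_agg",
                                               strlen ("bytes_per_agg"));
        if ( OMPI_ERR_MAX == v ) {
            return OMPI_ERROR;
        }
        *value = v;
        return OMPI_SUCCESS;
    }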