io/ompio: introduce a new function to retrieve mca parameter values
ompio has the unique problem, that mca parameters set in the io/ompio component have to be accessible from other frameworks as well. This is mostly done to avoid a replication in the parameter names and to reduce the number of mca parameters that and end-user has to worry about. This commit introduces a generic function to retrieve ompio mca parameters, the function pointer is stored on the file handle. It replaces two functions that used the same concept already for one parameter each. Signed-off-by: Edgar Gabriel <egabriel@central.uh.edu>
Этот коммит содержится в:
родитель
a4755b694b
Коммит
1f151be6d2
@ -106,8 +106,7 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
|
||||
ompio_fh->f_decode_datatype=ompi_io_ompio_decode_datatype;
|
||||
ompio_fh->f_generate_current_file_view=ompi_io_ompio_generate_current_file_view;
|
||||
|
||||
ompio_fh->f_get_num_aggregators=mca_io_ompio_get_num_aggregators;
|
||||
ompio_fh->f_get_bytes_per_agg=mca_io_ompio_get_bytes_per_agg;
|
||||
ompio_fh->f_get_mca_parameter_value=mca_io_ompio_get_mca_parameter_value;
|
||||
ompio_fh->f_set_aggregator_props=mca_io_ompio_set_aggregator_props;
|
||||
|
||||
/* This fix is needed for data seiving to work with
|
||||
|
@ -144,7 +144,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_num_io_procs);
|
||||
dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_num_io_procs,
|
||||
max_data);
|
||||
@ -333,7 +337,12 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if ( my_aggregator == fh->f_rank) {
|
||||
|
@ -146,7 +146,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_num_io_procs );
|
||||
dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_num_io_procs,
|
||||
max_data);
|
||||
@ -357,7 +361,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
|
@ -144,7 +144,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs);
|
||||
dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_gen2_num_io_procs,
|
||||
max_data);
|
||||
@ -333,7 +337,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if ( my_aggregator == fh->f_rank) {
|
||||
|
@ -160,7 +160,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
|
||||
/**************************************************************************
|
||||
** 1. In case the data is not contigous in memory, decode it into an iovec
|
||||
**************************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
/* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
|
||||
the user requested */
|
||||
bytes_per_cycle =bytes_per_cycle/2;
|
||||
@ -188,7 +192,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
|
||||
dynamic_gen2_num_io_procs = fh->f_stripe_count;
|
||||
}
|
||||
else {
|
||||
fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
|
||||
dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -82,7 +82,8 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
MPI_Aint **displs_per_process=NULL, global_iov_count=0, global_count=0;
|
||||
MPI_Aint *memory_displacements=NULL;
|
||||
int bytes_to_read_in_cycle=0;
|
||||
size_t max_data=0, bytes_per_cycle=0;
|
||||
size_t max_data=0;
|
||||
MPI_Aint bytes_per_cycle=0;
|
||||
uint32_t iov_count=0, iov_index=0;
|
||||
struct iovec *decoded_iov=NULL, *iov=NULL;
|
||||
mca_fcoll_static_local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
|
||||
@ -143,7 +144,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
|
||||
fh->f_get_num_aggregators ( &static_num_io_procs );
|
||||
static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == static_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
static_num_io_procs,
|
||||
max_data);
|
||||
@ -210,7 +215,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
ompi_datatype_commit (&io_array_type);
|
||||
|
||||
/* #########################################################*/
|
||||
fh->f_get_bytes_per_agg ( (int*) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
local_cycles = ceil((double)max_data*fh->f_procs_per_group/bytes_per_cycle);
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
@ -483,7 +492,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group))) {
|
||||
bytes_to_read_in_cycle = max_data - position;
|
||||
}
|
||||
else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
|
||||
else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
|
||||
bytes_to_read_in_cycle = max_data;
|
||||
}
|
||||
else {
|
||||
|
@ -69,7 +69,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
|
||||
|
||||
size_t max_data = 0, bytes_per_cycle=0;
|
||||
size_t max_data = 0;
|
||||
MPI_Aint bytes_per_cycle=0;
|
||||
struct iovec *iov=NULL, *decoded_iov=NULL;
|
||||
uint32_t iov_count=0, iov_index=0;
|
||||
int i=0,j=0,l=0, temp_index;
|
||||
@ -144,7 +145,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( & static_num_io_procs );
|
||||
static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == static_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
static_num_io_procs,
|
||||
max_data);
|
||||
@ -202,7 +207,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
local_cycles = ceil( ((double)max_data*fh->f_procs_per_group) /bytes_per_cycle);
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
@ -475,7 +484,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group)) ) {
|
||||
bytes_to_write_in_cycle = max_data - total_bytes_written;
|
||||
}
|
||||
else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
|
||||
else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
|
||||
bytes_to_write_in_cycle = max_data;
|
||||
}
|
||||
else {
|
||||
|
@ -186,7 +186,11 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators (&two_phase_num_io_procs);
|
||||
two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if (-1 == two_phase_num_io_procs ){
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
two_phase_num_io_procs,
|
||||
@ -575,7 +579,11 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size);
|
||||
two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
|
||||
two_phase_cycle_buffer_size);
|
||||
|
||||
|
@ -222,7 +222,11 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &two_phase_num_io_procs );
|
||||
two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if(-1 == two_phase_num_io_procs){
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
two_phase_num_io_procs,
|
||||
@ -642,7 +646,11 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size );
|
||||
two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ntimes = (int) ((end_loc - st_loc + two_phase_cycle_buffer_size)/two_phase_cycle_buffer_size);
|
||||
|
||||
if ((st_loc == -1) && (end_loc == -1)) {
|
||||
@ -1073,13 +1081,18 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
||||
fh->f_io_array[0].length = size;
|
||||
fh->f_io_array[0].memory_address = write_buf;
|
||||
if (fh->f_num_of_io_entries){
|
||||
if ( fh->f_amode & MPI_MODE_WRONLY &&
|
||||
!mca_io_ompio_overwrite_amode ){
|
||||
int amode_overwrite;
|
||||
amode_overwrite = fh->f_get_mca_parameter_value ("overwrite_amode", strlen("overwrite_amode"));
|
||||
if ( OMPI_ERR_MAX == amode_overwrite ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if ( fh->f_amode & MPI_MODE_WRONLY && !amode_overwrite ){
|
||||
if ( 0 == fh->f_rank ) {
|
||||
printf("\n File not opened in RDWR mode, can not continue."
|
||||
"\n To resolve this problem, you can either \n"
|
||||
" a. open the file with MPI_MODE_RDWR instead of MPI_MODE_WRONLY\n"
|
||||
" b. ensure that the mca parameter mca_io_ompio_amode_overwrite is set to 1\n"
|
||||
" b. ensure that the mca parameter mca_io_ompio_overwrite_amode is set to 1\n"
|
||||
" c. use an fcoll component that does not use data sieving (e.g. dynamic)\n");
|
||||
}
|
||||
ret = MPI_ERR_FILE;
|
||||
|
@ -640,18 +640,38 @@ int ompi_io_ompio_sort_offlen (mca_io_ompio_offlen_array_t *io_array,
|
||||
}
|
||||
|
||||
|
||||
void mca_io_ompio_get_num_aggregators ( int *num_aggregators)
|
||||
int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length )
|
||||
{
|
||||
*num_aggregators = mca_io_ompio_num_aggregators;
|
||||
return;
|
||||
}
|
||||
if ( !strncmp ( mca_parameter_name, "num_aggregators", name_length )) {
|
||||
return mca_io_ompio_num_aggregators;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "bytes_per_agg", name_length )) {
|
||||
return mca_io_ompio_bytes_per_agg;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "overwrite_amode", name_length )) {
|
||||
return mca_io_ompio_overwrite_amode;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) {
|
||||
return mca_io_ompio_cycle_buffer_size;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) {
|
||||
return mca_io_ompio_max_aggregators_ratio;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "aggregators_cutoff_threshold", name_length )) {
|
||||
return mca_io_ompio_aggregators_cutoff_threshold;
|
||||
}
|
||||
else {
|
||||
opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
|
||||
}
|
||||
|
||||
void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg)
|
||||
{
|
||||
*bytes_per_agg = mca_io_ompio_bytes_per_agg;
|
||||
return;
|
||||
/* Using here OMPI_ERROR_MAX instead of OMPI_ERROR, since -1 (which is OMPI_ERROR)
|
||||
** is a valid value for some mca parameters, indicating that the user did not set
|
||||
** that parameter value
|
||||
*/
|
||||
return OMPI_ERR_MAX;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -196,8 +196,7 @@ typedef int (*mca_io_ompio_generate_current_file_view_fn_t) (struct mca_io_ompio
|
||||
|
||||
/* functions to retrieve the number of aggregators and the size of the
|
||||
temporary buffer on aggregators from the fcoll modules */
|
||||
typedef void (*mca_io_ompio_get_num_aggregators_fn_t) ( int *num_aggregators);
|
||||
typedef void (*mca_io_ompio_get_bytes_per_agg_fn_t) ( int *bytes_per_agg);
|
||||
typedef int (*mca_io_ompio_get_mca_parameter_value_fn_t) ( char *mca_parameter_name, int name_length );
|
||||
typedef int (*mca_io_ompio_set_aggregator_props_fn_t) (struct mca_io_ompio_file_t *fh,
|
||||
int num_aggregators,
|
||||
size_t bytes_per_proc);
|
||||
@ -291,8 +290,7 @@ struct mca_io_ompio_file_t {
|
||||
mca_io_ompio_decode_datatype_fn_t f_decode_datatype;
|
||||
mca_io_ompio_generate_current_file_view_fn_t f_generate_current_file_view;
|
||||
|
||||
mca_io_ompio_get_num_aggregators_fn_t f_get_num_aggregators;
|
||||
mca_io_ompio_get_bytes_per_agg_fn_t f_get_bytes_per_agg;
|
||||
mca_io_ompio_get_mca_parameter_value_fn_t f_get_mca_parameter_value;
|
||||
mca_io_ompio_set_aggregator_props_fn_t f_set_aggregator_props;
|
||||
};
|
||||
typedef struct mca_io_ompio_file_t mca_io_ompio_file_t;
|
||||
@ -308,8 +306,7 @@ typedef struct mca_io_ompio_data_t mca_io_ompio_data_t;
|
||||
|
||||
/* functions to retrieve the number of aggregators and the size of the
|
||||
temporary buffer on aggregators from the fcoll modules */
|
||||
OMPI_DECLSPEC void mca_io_ompio_get_num_aggregators ( int *num_aggregators);
|
||||
OMPI_DECLSPEC void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg);
|
||||
OMPI_DECLSPEC int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length);
|
||||
|
||||
/*
|
||||
* Function that takes in a datatype and buffer, and decodes that datatype
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user