Merge pull request #4559 from edgargabriel/topic/disable-amode-overwrite
io/ompio: introduce a new function to retrieve mca parameter values
Этот коммит содержится в:
Коммит
710fb72afa
@ -106,8 +106,7 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
|
||||
ompio_fh->f_decode_datatype=ompi_io_ompio_decode_datatype;
|
||||
ompio_fh->f_generate_current_file_view=ompi_io_ompio_generate_current_file_view;
|
||||
|
||||
ompio_fh->f_get_num_aggregators=mca_io_ompio_get_num_aggregators;
|
||||
ompio_fh->f_get_bytes_per_agg=mca_io_ompio_get_bytes_per_agg;
|
||||
ompio_fh->f_get_mca_parameter_value=mca_io_ompio_get_mca_parameter_value;
|
||||
ompio_fh->f_set_aggregator_props=mca_io_ompio_set_aggregator_props;
|
||||
|
||||
/* This fix is needed for data seiving to work with
|
||||
|
@ -144,7 +144,11 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_num_io_procs);
|
||||
dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_num_io_procs,
|
||||
max_data);
|
||||
@ -333,7 +337,12 @@ mca_fcoll_dynamic_file_read_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if ( my_aggregator == fh->f_rank) {
|
||||
|
@ -146,7 +146,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_num_io_procs );
|
||||
dynamic_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_num_io_procs,
|
||||
max_data);
|
||||
@ -357,7 +361,11 @@ mca_fcoll_dynamic_file_write_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if (my_aggregator == fh->f_rank) {
|
||||
|
@ -144,7 +144,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs);
|
||||
dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
dynamic_gen2_num_io_procs,
|
||||
max_data);
|
||||
@ -333,7 +337,11 @@ mca_fcoll_dynamic_gen2_file_read_all (mca_io_ompio_file_t *fh,
|
||||
*** 6. Determine the number of cycles required to execute this
|
||||
*** operation
|
||||
*************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
cycles = ceil((double)total_bytes/bytes_per_cycle);
|
||||
|
||||
if ( my_aggregator == fh->f_rank) {
|
||||
|
@ -160,7 +160,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
|
||||
/**************************************************************************
|
||||
** 1. In case the data is not contigous in memory, decode it into an iovec
|
||||
**************************************************************************/
|
||||
fh->f_get_bytes_per_agg ( (int *)&bytes_per_cycle );
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
/* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
|
||||
the user requested */
|
||||
bytes_per_cycle =bytes_per_cycle/2;
|
||||
@ -188,7 +192,11 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
|
||||
dynamic_gen2_num_io_procs = fh->f_stripe_count;
|
||||
}
|
||||
else {
|
||||
fh->f_get_num_aggregators ( &dynamic_gen2_num_io_procs );
|
||||
dynamic_gen2_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == dynamic_gen2_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -82,7 +82,8 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
MPI_Aint **displs_per_process=NULL, global_iov_count=0, global_count=0;
|
||||
MPI_Aint *memory_displacements=NULL;
|
||||
int bytes_to_read_in_cycle=0;
|
||||
size_t max_data=0, bytes_per_cycle=0;
|
||||
size_t max_data=0;
|
||||
MPI_Aint bytes_per_cycle=0;
|
||||
uint32_t iov_count=0, iov_index=0;
|
||||
struct iovec *decoded_iov=NULL, *iov=NULL;
|
||||
mca_fcoll_static_local_io_array *local_iov_array=NULL, *global_iov_array=NULL;
|
||||
@ -143,7 +144,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
}
|
||||
|
||||
|
||||
fh->f_get_num_aggregators ( &static_num_io_procs );
|
||||
static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == static_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *) fh,
|
||||
static_num_io_procs,
|
||||
max_data);
|
||||
@ -210,7 +215,11 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
ompi_datatype_commit (&io_array_type);
|
||||
|
||||
/* #########################################################*/
|
||||
fh->f_get_bytes_per_agg ( (int*) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
local_cycles = ceil((double)max_data*fh->f_procs_per_group/bytes_per_cycle);
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
@ -483,7 +492,7 @@ mca_fcoll_static_file_read_all (mca_io_ompio_file_t *fh,
|
||||
if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group))) {
|
||||
bytes_to_read_in_cycle = max_data - position;
|
||||
}
|
||||
else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
|
||||
else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
|
||||
bytes_to_read_in_cycle = max_data;
|
||||
}
|
||||
else {
|
||||
|
@ -69,7 +69,8 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
|
||||
|
||||
size_t max_data = 0, bytes_per_cycle=0;
|
||||
size_t max_data = 0;
|
||||
MPI_Aint bytes_per_cycle=0;
|
||||
struct iovec *iov=NULL, *decoded_iov=NULL;
|
||||
uint32_t iov_count=0, iov_index=0;
|
||||
int i=0,j=0,l=0, temp_index;
|
||||
@ -144,7 +145,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( & static_num_io_procs );
|
||||
static_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == static_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
static_num_io_procs,
|
||||
max_data);
|
||||
@ -202,7 +207,11 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( (int *) &bytes_per_cycle);
|
||||
bytes_per_cycle = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == bytes_per_cycle ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
local_cycles = ceil( ((double)max_data*fh->f_procs_per_group) /bytes_per_cycle);
|
||||
|
||||
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
|
||||
@ -475,7 +484,7 @@ mca_fcoll_static_file_write_all (mca_io_ompio_file_t *fh,
|
||||
if ((index == local_cycles-1) && (max_data % (bytes_per_cycle/fh->f_procs_per_group)) ) {
|
||||
bytes_to_write_in_cycle = max_data - total_bytes_written;
|
||||
}
|
||||
else if (max_data <= bytes_per_cycle/fh->f_procs_per_group) {
|
||||
else if (max_data <= (size_t) (bytes_per_cycle/fh->f_procs_per_group)) {
|
||||
bytes_to_write_in_cycle = max_data;
|
||||
}
|
||||
else {
|
||||
|
@ -186,7 +186,11 @@ mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators (&two_phase_num_io_procs);
|
||||
two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if (-1 == two_phase_num_io_procs ){
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
two_phase_num_io_procs,
|
||||
@ -575,7 +579,11 @@ static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size);
|
||||
two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
|
||||
two_phase_cycle_buffer_size);
|
||||
|
||||
|
@ -222,7 +222,11 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
|
||||
status->_ucount = max_data;
|
||||
}
|
||||
|
||||
fh->f_get_num_aggregators ( &two_phase_num_io_procs );
|
||||
two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
|
||||
if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if(-1 == two_phase_num_io_procs){
|
||||
ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
|
||||
two_phase_num_io_procs,
|
||||
@ -642,7 +646,11 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
|
||||
}
|
||||
}
|
||||
|
||||
fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size );
|
||||
two_phase_cycle_buffer_size = fh->f_get_mca_parameter_value ("bytes_per_agg", strlen ("bytes_per_agg"));
|
||||
if ( OMPI_ERR_MAX == two_phase_cycle_buffer_size ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
ntimes = (int) ((end_loc - st_loc + two_phase_cycle_buffer_size)/two_phase_cycle_buffer_size);
|
||||
|
||||
if ((st_loc == -1) && (end_loc == -1)) {
|
||||
@ -1073,13 +1081,18 @@ static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
|
||||
fh->f_io_array[0].length = size;
|
||||
fh->f_io_array[0].memory_address = write_buf;
|
||||
if (fh->f_num_of_io_entries){
|
||||
if ( fh->f_amode & MPI_MODE_WRONLY &&
|
||||
!mca_io_ompio_overwrite_amode ){
|
||||
int amode_overwrite;
|
||||
amode_overwrite = fh->f_get_mca_parameter_value ("overwrite_amode", strlen("overwrite_amode"));
|
||||
if ( OMPI_ERR_MAX == amode_overwrite ) {
|
||||
ret = OMPI_ERROR;
|
||||
goto exit;
|
||||
}
|
||||
if ( fh->f_amode & MPI_MODE_WRONLY && !amode_overwrite ){
|
||||
if ( 0 == fh->f_rank ) {
|
||||
printf("\n File not opened in RDWR mode, can not continue."
|
||||
"\n To resolve this problem, you can either \n"
|
||||
" a. open the file with MPI_MODE_RDWR instead of MPI_MODE_WRONLY\n"
|
||||
" b. ensure that the mca parameter mca_io_ompio_amode_overwrite is set to 1\n"
|
||||
" b. ensure that the mca parameter mca_io_ompio_overwrite_amode is set to 1\n"
|
||||
" c. use an fcoll component that does not use data sieving (e.g. dynamic)\n");
|
||||
}
|
||||
ret = MPI_ERR_FILE;
|
||||
|
@ -640,18 +640,38 @@ int ompi_io_ompio_sort_offlen (mca_io_ompio_offlen_array_t *io_array,
|
||||
}
|
||||
|
||||
|
||||
void mca_io_ompio_get_num_aggregators ( int *num_aggregators)
|
||||
int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length )
|
||||
{
|
||||
*num_aggregators = mca_io_ompio_num_aggregators;
|
||||
return;
|
||||
}
|
||||
if ( !strncmp ( mca_parameter_name, "num_aggregators", name_length )) {
|
||||
return mca_io_ompio_num_aggregators;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "bytes_per_agg", name_length )) {
|
||||
return mca_io_ompio_bytes_per_agg;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "overwrite_amode", name_length )) {
|
||||
return mca_io_ompio_overwrite_amode;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) {
|
||||
return mca_io_ompio_cycle_buffer_size;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) {
|
||||
return mca_io_ompio_max_aggregators_ratio;
|
||||
}
|
||||
else if ( !strncmp ( mca_parameter_name, "aggregators_cutoff_threshold", name_length )) {
|
||||
return mca_io_ompio_aggregators_cutoff_threshold;
|
||||
}
|
||||
else {
|
||||
opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
|
||||
}
|
||||
|
||||
void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg)
|
||||
{
|
||||
*bytes_per_agg = mca_io_ompio_bytes_per_agg;
|
||||
return;
|
||||
/* Using here OMPI_ERROR_MAX instead of OMPI_ERROR, since -1 (which is OMPI_ERROR)
|
||||
** is a valid value for some mca parameters, indicating that the user did not set
|
||||
** that parameter value
|
||||
*/
|
||||
return OMPI_ERR_MAX;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -196,8 +196,7 @@ typedef int (*mca_io_ompio_generate_current_file_view_fn_t) (struct mca_io_ompio
|
||||
|
||||
/* functions to retrieve the number of aggregators and the size of the
|
||||
temporary buffer on aggregators from the fcoll modules */
|
||||
typedef void (*mca_io_ompio_get_num_aggregators_fn_t) ( int *num_aggregators);
|
||||
typedef void (*mca_io_ompio_get_bytes_per_agg_fn_t) ( int *bytes_per_agg);
|
||||
typedef int (*mca_io_ompio_get_mca_parameter_value_fn_t) ( char *mca_parameter_name, int name_length );
|
||||
typedef int (*mca_io_ompio_set_aggregator_props_fn_t) (struct mca_io_ompio_file_t *fh,
|
||||
int num_aggregators,
|
||||
size_t bytes_per_proc);
|
||||
@ -291,8 +290,7 @@ struct mca_io_ompio_file_t {
|
||||
mca_io_ompio_decode_datatype_fn_t f_decode_datatype;
|
||||
mca_io_ompio_generate_current_file_view_fn_t f_generate_current_file_view;
|
||||
|
||||
mca_io_ompio_get_num_aggregators_fn_t f_get_num_aggregators;
|
||||
mca_io_ompio_get_bytes_per_agg_fn_t f_get_bytes_per_agg;
|
||||
mca_io_ompio_get_mca_parameter_value_fn_t f_get_mca_parameter_value;
|
||||
mca_io_ompio_set_aggregator_props_fn_t f_set_aggregator_props;
|
||||
};
|
||||
typedef struct mca_io_ompio_file_t mca_io_ompio_file_t;
|
||||
@ -308,8 +306,7 @@ typedef struct mca_io_ompio_data_t mca_io_ompio_data_t;
|
||||
|
||||
/* functions to retrieve the number of aggregators and the size of the
|
||||
temporary buffer on aggregators from the fcoll modules */
|
||||
OMPI_DECLSPEC void mca_io_ompio_get_num_aggregators ( int *num_aggregators);
|
||||
OMPI_DECLSPEC void mca_io_ompio_get_bytes_per_agg ( int *bytes_per_agg);
|
||||
OMPI_DECLSPEC int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_length);
|
||||
|
||||
/*
|
||||
* Function that takes in a datatype and buffer, and decodes that datatype
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user