
dynamic_gen2/file_write_all: fix chunk assignment per stride

The dynamic_gen2_file_write_all component distinguishes between the amount of data communicated
to the aggregators and the amount of data written by an aggregator in a cycle (in contrast to, e.g., the vulcan component).
There was a bug in calculating which chunks have to be written in a cycle by an aggregator: we added elements to the
io_array until we had filled one stripe. Unfortunately, the metric used was the amount of data, instead of ensuring that all offsets
fall within a single stripe. This commit fixes that. Note that the bug did not cause a correctness problem, only a performance
problem when there were gaps in the file view.
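
To make the difference between the two metrics concrete, consider a minimal standalone
sketch (hypothetical offsets and lengths, not OMPIO code): two 32 KB chunks separated by
a gap add up to exactly one 64 KB stripe, so the old byte-count metric would assign both
to one write cycle, even though the second chunk crosses the stripe boundary:

    #include <stdio.h>

    int main (void)
    {
        long stripe_size = 65536;            /* 64 KB fallback stripe size                       */
        long offsets[2]  = { 0, 49152 };     /* hypothetical chunks; the second starts at 48 KB  */
        long lengths[2]  = { 32768, 32768 }; /* 32 KB each, so the byte count equals one stripe  */

        /* stripe window of the first chunk, aligned down to a stripe
           boundary as in the fixed split function                     */
        long baseaddr = offsets[0] - (offsets[0] % stripe_size);
        long endaddr  = baseaddr + stripe_size;

        for (int i = 0; i < 2; i++) {
            long chunk_end = offsets[i] + lengths[i];
            printf ("chunk %d [%ld, %ld) %s stripe [%ld, %ld)\n", i, offsets[i], chunk_end,
                    (offsets[i] >= baseaddr && chunk_end <= endaddr) ? "fits in" : "crosses out of",
                    baseaddr, endaddr);
        }
        return 0;
    }

The byte count (32768 + 32768 == stripe_size) says both chunks fit in one cycle, but chunk 1
ends at 81920, past endaddr = 65536; checking offsets against the stripe window, as the fix
does, defers the part beyond the boundary to the next cycle.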

Signed-off-by: Edgar Gabriel <egabriel@central.uh.edu>
This commit is contained in:
Edgar Gabriel 2020-12-10 08:53:11 -06:00
parent 47fb05f82a
commit f70bb4774a

@@ -9,7 +9,7 @@
  *                         University of Stuttgart.  All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
- * Copyright (c) 2008-2016 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2020 University of Houston. All rights reserved.
  * Copyright (c) 2015-2018 Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * Copyright (c) 2017      IBM Corporation. All rights reserved.
@@ -92,7 +92,7 @@ typedef struct mca_io_ompio_aggregator_data {
 static int shuffle_init ( int index, int cycles, int aggregator, int rank,
                           mca_io_ompio_aggregator_data *data,
                           ompi_request_t **reqs );
-static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
+static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data );
 
 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
                                              struct iovec *local_iov_array, int local_count,
@@ -111,8 +111,7 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                             int *sorted);
 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
-                                             int num_entries, int *last_array_pos, int *last_pos_in_field,
-                                             int chunk_size );
+                                             int num_entries, int *last_array_pos, int *last_pos_in_field );
 
 int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
@@ -145,7 +144,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
     MPI_Aint *broken_total_lengths=NULL;
     int *aggregators=NULL;
-    int write_chunksize, *result_counts=NULL;
+    int *result_counts=NULL;
 
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@@ -199,15 +198,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
     if ( fh->f_stripe_size == 0 ) {
         // EDGAR: just a quick heck for testing
         //fh->f_stripe_size = 1048576;
         fh->f_stripe_size = 65536;
     }
-    if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) {
-        write_chunksize = fh->f_stripe_size;
-    }
-    else {
-        write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
-    }
 
     ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
     if (OMPI_SUCCESS != ret){
@@ -608,7 +601,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
             start_write_time = MPI_Wtime();
 #endif
-            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            ret = write_init (fh, aggregators[i], aggr_data[i] );
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
@@ -637,7 +630,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
             start_write_time = MPI_Wtime();
 #endif
-            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            ret = write_init (fh, aggregators[i], aggr_data[i] );
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
@@ -735,7 +728,7 @@ exit :
 }
 
-static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
+static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data )
 {
     int ret=OMPI_SUCCESS;
     int last_array_pos=0;
@@ -746,14 +739,28 @@ static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator
     while ( aggr_data->prev_bytes_to_write > 0 ) {
         aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
                                                                                   aggr_data->prev_num_io_entries,
-                                                                                  &last_array_pos, &last_pos,
-                                                                                  write_chunksize );
+                                                                                  &last_array_pos, &last_pos );
         if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
             free ( aggr_data->prev_io_array);
             opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
             ret = OMPI_ERROR;
             goto exit;
         }
+#if DEBUG_ON
+        printf("fh->f_num_of_io_entries=%d\n", fh->f_num_of_io_entries);
+        printf("[%d]: fh->f_io_array[0].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[0].offset,
+               fh->f_io_array[0].length);
+        if ( fh->f_num_of_io_entries > 1 )
+            printf("[%d]: fh->f_io_array[1].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[1].offset,
+                   fh->f_io_array[1].length);
+        int n = fh->f_num_of_io_entries-1;
+        if ( fh->f_num_of_io_entries > 2 )
+            printf("[%d]: fh->f_io_array[n].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[n].offset,
+                   fh->f_io_array[n].length);
+#endif
     }
 
     free ( fh->f_io_array );
     free ( aggr_data->prev_io_array);
@@ -1595,14 +1602,15 @@ int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen
 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
-                                             int *ret_array_pos, int *ret_pos, int chunk_size )
+                                             int *ret_array_pos, int *ret_pos )
 {
     int array_pos = *ret_array_pos;
     int pos  = *ret_pos;
     size_t bytes_written = 0;
-    size_t bytes_to_write = chunk_size;
+    off_t baseaddr = ((off_t)io_array[array_pos].offset + pos) - (((off_t)io_array[array_pos].offset + pos) % (off_t)fh->f_stripe_size);
+    off_t endaddr = baseaddr + fh->f_stripe_size;
 
     if ( 0 == array_pos && 0 == pos ) {
         fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
         if ( NULL == fh->f_io_array ){
@@ -1612,20 +1620,21 @@ int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_
     }
 
     int i=0;
-    while (bytes_to_write > 0 ) {
-        fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
-        fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
+    do {
+        fh->f_io_array[i].memory_address = (char *)io_array[array_pos].memory_address + pos;
+        fh->f_io_array[i].offset = (char *)io_array[array_pos].offset + pos;
 
-        if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
-            fh->f_io_array[i].length = bytes_to_write;
+        off_t length = io_array[array_pos].length - pos;
+        if ( ( (off_t)fh->f_io_array[i].offset + length) < endaddr ) {
+            fh->f_io_array[i].length = length;
         }
         else {
-            fh->f_io_array[i].length = io_array[array_pos].length - pos;
+            fh->f_io_array[i].length = endaddr - (size_t)fh->f_io_array[i].offset;
         }
 
         pos += fh->f_io_array[i].length;
         bytes_written += fh->f_io_array[i].length;
-        bytes_to_write-= fh->f_io_array[i].length;
         i++;
 
         if ( pos == (int)io_array[array_pos].length ) {
@@ -1637,7 +1646,7 @@ int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_
                 break;
             }
         }
-    }
+    } while ( (array_pos < num_entries) && (((off_t)io_array[array_pos].offset+pos ) < endaddr) );
 
     fh->f_num_of_io_entries = i;
     *ret_array_pos = array_pos;