From f70bb4774ad6bca0b65b9b03f07610528b25633a Mon Sep 17 00:00:00 2001
From: Edgar Gabriel
Date: Thu, 10 Dec 2020 08:53:11 -0600
Subject: [PATCH] dynamic_gen2/file_write_all: fix chunk assignment per stripe

The dynamic_gen2_file_write_all component distinguishes between the amount
of data communicated to aggregators and the amount of data written in a
cycle by an aggregator (in contrast to, e.g., the vulcan component).

There was a bug in calculating which chunks an aggregator has to write in
a cycle: we added elements into the io_array until one stripe worth of
data was accumulated. Unfortunately, the metric used was the amount of
data instead of ensuring that all offsets fall within a single stripe.
This commit fixes that.

Note that the bug did not cause a correctness problem, only a performance
problem when the file view contained gaps.

Signed-off-by: Edgar Gabriel
---
 .../fcoll_dynamic_gen2_file_write_all.c       | 67 +++++++++++--------
 1 file changed, 38 insertions(+), 29 deletions(-)

diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
index 9763140990..282b248f79 100644
--- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
+++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
@@ -9,7 +9,7 @@
  * University of Stuttgart. All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
- * Copyright (c) 2008-2016 University of Houston. All rights reserved.
+ * Copyright (c) 2008-2020 University of Houston. All rights reserved.
  * Copyright (c) 2015-2018 Research Organization for Information Science
  *                         and Technology (RIST). All rights reserved.
  * Copyright (c) 2017      IBM Corporation. All rights reserved.
@@ -92,7 +92,7 @@ typedef struct mca_io_ompio_aggregator_data {
 
 static int shuffle_init ( int index, int cycles, int aggregator, int rank,
                           mca_io_ompio_aggregator_data *data, ompi_request_t **reqs );
-static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
+static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data );
 
 int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
                                              struct iovec *local_iov_array, int local_count,
@@ -111,8 +111,7 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
                             int *sorted);
 
 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
-                                             int num_entries, int *last_array_pos, int *last_pos_in_field,
-                                             int chunk_size );
+                                             int num_entries, int *last_array_pos, int *last_pos_in_field );
 
 
 int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
@@ -145,7 +144,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
     MPI_Aint *broken_total_lengths=NULL;
 
     int *aggregators=NULL;
-    int write_chunksize, *result_counts=NULL;
+    int *result_counts=NULL;
 
 
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@@ -199,15 +198,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 
     if ( fh->f_stripe_size == 0 ) {
         // EDGAR: just a quick heck for testing
+        //fh->f_stripe_size = 1048576;
         fh->f_stripe_size = 65536;
     }
 
-    if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) {
-        write_chunksize = fh->f_stripe_size;
-    }
-    else {
-        write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
-    }
-
     ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
     if (OMPI_SUCCESS != ret){
@@ -608,7 +601,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
             start_write_time = MPI_Wtime();
 #endif
-            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            ret = write_init (fh, aggregators[i], aggr_data[i] );
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
@@ -637,7 +630,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
             start_write_time = MPI_Wtime();
 #endif
-            ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
+            ret = write_init (fh, aggregators[i], aggr_data[i] );
             if (OMPI_SUCCESS != ret){
                 goto exit;
             }
@@ -735,7 +728,7 @@ exit :
 }
 
 
-static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
+static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data )
 {
     int ret=OMPI_SUCCESS;
     int last_array_pos=0;
@@ -746,14 +739,28 @@ static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator
 
     while ( aggr_data->prev_bytes_to_write > 0 ) {
         aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array, aggr_data->prev_num_io_entries,
-                                                                                  &last_array_pos, &last_pos,
-                                                                                  write_chunksize );
+                                                                                  &last_array_pos, &last_pos );
         if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
             free ( aggr_data->prev_io_array);
             opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
             ret = OMPI_ERROR;
            goto exit;
         }
+
+#if DEBUG_ON
+        printf("fh->f_num_of_io_entries=%d\n", fh->f_num_of_io_entries);
+        printf("[%d]: fh->f_io_array[0].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[0].offset,
+               fh->f_io_array[0].length);
+        if ( fh->f_num_of_io_entries > 1 )
+            printf("[%d]: fh->f_io_array[1].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[1].offset,
+                   fh->f_io_array[1].length);
+
+
+        int n = fh->f_num_of_io_entries-1;
+        if ( fh->f_num_of_io_entries > 2 )
+            printf("[%d]: fh->f_io_array[n].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[n].offset,
+                   fh->f_io_array[n].length);
+#endif
     }
     free ( fh->f_io_array );
     free ( aggr_data->prev_io_array);
@@ -1595,14 +1602,15 @@ int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen
 
 
 int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
-                                             int *ret_array_pos, int *ret_pos, int chunk_size )
+                                             int *ret_array_pos, int *ret_pos )
 {
     int array_pos = *ret_array_pos;
     int pos = *ret_pos;
     size_t bytes_written = 0;
-    size_t bytes_to_write = chunk_size;
-
+    off_t baseaddr = ((off_t)io_array[array_pos].offset + pos) - (((off_t)io_array[array_pos].offset + pos) % (off_t)fh->f_stripe_size);
+    off_t endaddr = baseaddr + fh->f_stripe_size;
+
     if ( 0 == array_pos && 0 == pos ) {
         fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
         if ( NULL == fh->f_io_array ){
@@ -1612,20 +1620,21 @@ int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_
     }
 
 
     int i=0;
-    while (bytes_to_write > 0 ) {
-        fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
-        fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
+    do {
+        fh->f_io_array[i].memory_address = (char *)io_array[array_pos].memory_address + pos;
+        fh->f_io_array[i].offset = (char *)io_array[array_pos].offset + pos;
 
-        if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
-            fh->f_io_array[i].length = bytes_to_write;
+        off_t length = io_array[array_pos].length - pos;
+
+        if ( ( (off_t)fh->f_io_array[i].offset + length) < endaddr ) {
+            fh->f_io_array[i].length = length;
         }
         else {
-            fh->f_io_array[i].length = io_array[array_pos].length - pos;
+            fh->f_io_array[i].length = endaddr - (size_t)fh->f_io_array[i].offset;
         }
-
+
         pos += fh->f_io_array[i].length;
         bytes_written += fh->f_io_array[i].length;
-        bytes_to_write-= fh->f_io_array[i].length;
        i++;
 
        if ( pos == (int)io_array[array_pos].length ) {
@@ -1637,7 +1646,7 @@ int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_
             break;
         }
     }
-    }
+    } while ( (array_pos < num_entries) && (((off_t)io_array[array_pos].offset+pos ) < endaddr) );
 
     fh->f_num_of_io_entries = i;
     *ret_array_pos = array_pos;
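
To illustrate the difference between the two cut criteria, here is a minimal
standalone sketch of the stripe-aligned splitting the patch introduces. It is
not OMPIO code: segment_t, split_at_stripe, and the example offsets are
hypothetical stand-ins for mca_common_ompio_io_array_t and
mca_fcoll_dynamic_gen2_split_iov_array, and memory addresses and error
handling are omitted. As in the new do/while loop above, the cut point is
derived from the stripe containing the current file offset (the
baseaddr/endaddr computation), not from a running byte count.

#include <stdio.h>
#include <stddef.h>
#include <sys/types.h>      /* off_t */

typedef struct {
    off_t  offset;          /* file offset of the segment     */
    size_t length;          /* number of bytes in the segment */
} segment_t;

/* Emit one batch of chunks that all fall within the stripe containing the
 * current position; returns the number of bytes consumed.  segs[] is
 * assumed sorted by offset, with (*array_pos, *pos) tracking the resume
 * point across calls, as the real function does via ret_array_pos/ret_pos. */
static size_t split_at_stripe (const segment_t *segs, int num,
                               int *array_pos, size_t *pos,
                               off_t stripe_size)
{
    /* stripe boundaries of the current offset (cf. baseaddr/endaddr) */
    off_t  cur  = segs[*array_pos].offset + (off_t)*pos;
    off_t  base = cur - (cur % stripe_size);
    off_t  end  = base + stripe_size;
    size_t written = 0;

    do {
        off_t  off = segs[*array_pos].offset + (off_t)*pos;
        size_t len = segs[*array_pos].length - *pos;

        if (off + (off_t)len > end) {
            len = (size_t)(end - off);      /* cut at the stripe boundary */
        }
        printf ("  chunk: offset=%lld length=%zu\n", (long long)off, len);

        written += len;
        *pos    += len;
        if (*pos == segs[*array_pos].length) {   /* segment fully consumed */
            *pos = 0;
            (*array_pos)++;
        }
    } while (*array_pos < num &&
             segs[*array_pos].offset + (off_t)*pos < end);

    return written;
}

int main (void)
{
    /* file view with a gap: the second segment crosses the 64k stripe
     * boundary, the third one starts two stripes further on */
    segment_t segs[] = { {      0, 49152 },
                         {  61440, 16384 },
                         { 131072, 16384 } };
    int    num = (int)(sizeof(segs) / sizeof(segs[0]));
    int    array_pos = 0;
    size_t pos = 0, remaining = 49152 + 16384 + 16384;

    while (remaining > 0) {
        printf ("one write call:\n");
        remaining -= split_at_stripe (segs, num, &array_pos, &pos, 65536);
    }
    return 0;
}

With the old write_chunksize metric, the first call in this example would
have consumed 49152 bytes of the first segment plus 16384 bytes of the
second, i.e. a single pwritev spanning two stripes; with the offset-based
cut it stops at the 65536 boundary and the tail of the second segment moves
to the next call.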