From d65480df3558f4803b9a6ae3e276cdb6b280f05d Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Wed, 16 Dec 2020 15:34:45 -0600 Subject: [PATCH] fbtl_posix_pwritev: add datasieving support for write its however restricted to collective I/O operations, at this point only from vulcan and dynamic_gen2. required some more infrastructure to be added to recognize individual I/O and multi-threaded environments. Signed-off-by: Edgar Gabriel --- ompi/mca/common/ompio/common_ompio.h | 1 + ompi/mca/fbtl/posix/fbtl_posix_preadv.c | 2 + ompi/mca/fbtl/posix/fbtl_posix_pwritev.c | 182 +++++++++++++++++- .../fcoll_dynamic_gen2_file_write_all.c | 8 +- .../vulcan/fcoll_vulcan_file_write_all.c | 2 + 5 files changed, 192 insertions(+), 3 deletions(-) diff --git a/ompi/mca/common/ompio/common_ompio.h b/ompi/mca/common/ompio/common_ompio.h index 33dd0dd402..1f2bbc585d 100644 --- a/ompi/mca/common/ompio/common_ompio.h +++ b/ompi/mca/common/ompio/common_ompio.h @@ -67,6 +67,7 @@ #define OMPIO_LOCK_NEVER 0x00000100 #define OMPIO_LOCK_NOT_THIS_OP 0x00000200 #define OMPIO_DATAREP_NATIVE 0x00000400 +#define OMPIO_COLLECTIVE_OP 0x00000800 #define OMPIO_ROOT 0 diff --git a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c index 15e9772981..89a819a6e2 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c @@ -158,6 +158,7 @@ ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh) ret, strerror(errno)); /* Just in case some part of the lock worked */ mca_fbtl_posix_unlock ( &lock, fh); + free ( temp_buf); return OMPI_ERROR; } @@ -165,6 +166,7 @@ ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh) mca_fbtl_posix_unlock ( &lock, fh); if ( ret_code == -1 ) { opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(errno)); + free ( temp_buf); return OMPI_ERROR; } diff --git a/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c b/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c index 7ad6e6d9d2..d54e9e0943 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2017 University of Houston. All rights reserved. + * Copyright (c) 2008-2020 University of Houston. All rights reserved. * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -30,7 +30,187 @@ #include "ompi/constants.h" #include "ompi/mca/fbtl/fbtl.h" +static ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh ); +static ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh ); + ssize_t mca_fbtl_posix_pwritev(ompio_file_t *fh ) +{ + ssize_t bytes_written=0, ret_code=0; + struct flock lock; + int ret; + + if (NULL == fh->f_io_array) { + return OMPI_ERROR; + } + + if ( fh->f_num_of_io_entries > 1 ) { + bool do_data_sieving = true; + + size_t avg_gap_size=0; + size_t avg_block_size = 0; + off_t prev_offset = (off_t)fh->f_io_array[0].offset; + int i; + for ( i=0; i< fh->f_num_of_io_entries; i++ ) { + avg_block_size += fh->f_io_array[i].length; + avg_gap_size += (size_t)((off_t)fh->f_io_array[i].offset - prev_offset); + prev_offset = (off_t)fh->f_io_array[i].offset; + } + avg_block_size = avg_block_size / fh->f_num_of_io_entries; + avg_gap_size = avg_gap_size / fh->f_num_of_io_entries; + + if ( false == mca_fbtl_posix_write_datasieving || + 0 == avg_gap_size || + avg_block_size > mca_fbtl_posix_max_block_size || + avg_gap_size > mca_fbtl_posix_max_gap_size || + ompi_mpi_thread_multiple || + !(fh->f_flags & OMPIO_COLLECTIVE_OP) ) { + do_data_sieving = false; + } + + if ( do_data_sieving) { + return mca_fbtl_posix_pwritev_datasieving (fh); + } + else { + return mca_fbtl_posix_pwritev_generic (fh); + } + } + else { + // i.e. fh->f_num_of_io_entries == 1 + ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, (off_t)fh->f_io_array[0].offset, + (off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION ); + if ( 0 < ret ) { + opal_output(1, "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() ret=%d: %s", + ret, strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + return OMPI_ERROR; + } + + ret_code = pwrite(fh->fd, fh->f_io_array[0].memory_address, fh->f_io_array[0].length, + (off_t)fh->f_io_array[0].offset ); + mca_fbtl_posix_unlock ( &lock, fh ); + if ( ret_code == -1 ) { + opal_output(1, "mca_fbtl_posix_pwritev: error in (p)write(v):%s", strerror(errno)); + return OMPI_ERROR; + } + + bytes_written += ret_code; + } + + return bytes_written; +} + +ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh) +{ + size_t start, end, len; + size_t bufsize = 0; + int ret, i, j; + ssize_t bytes_written=0, ret_code=0; + struct flock lock; + char *temp_buf = NULL; + + int startindex = 0; + int endindex = 0; + bool done = false; + + while (!done) { + // Break the io_array into chunks such that the size of the temporary + // buffer does not exceed mca_fbtl_posix_max_tmpbuf_size bytes. + // Each iteration will thus work in the range (startindex, endindex[ + startindex = endindex; + if ( startindex >= fh->f_num_of_io_entries ) { + done = true; + break; + } + + size_t sstart = (size_t)fh->f_io_array[startindex].offset; + size_t slen=0; + + for ( j = startindex; j < fh->f_num_of_io_entries; j++ ) { + endindex = j; + slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - sstart; + if (slen > mca_fbtl_posix_max_tmpbuf_size ) { + endindex = j-1; + break; + } + } + // Need to increment the value of endindex + // by one for the loop syntax to work correctly. + endindex++; + + start = (size_t)fh->f_io_array[startindex].offset; + end = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length; + len = end - start; + + if ( len > bufsize ) { + if ( NULL != temp_buf ) { + free ( temp_buf); + } + temp_buf = (char *) malloc ( len ); + if ( NULL == temp_buf ) { + opal_output(1, "OUT OF MEMORY\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + bufsize = len; + } + + // Read the entire block. + ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, start, len, OMPIO_LOCK_ENTIRE_REGION ); + if ( 0 < ret ) { + opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s", + ret, strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + free ( temp_buf); + return OMPI_ERROR; + } + + ret_code = pread (fh->fd, temp_buf, len, start); + if ( ret_code == -1 ) { + //opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno)); + opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + free ( temp_buf); + return OMPI_ERROR; + } + + // Copy out the elements to write into temporary buffer. + size_t pos = 0; + size_t num_bytes; + size_t start_offset = (size_t) fh->f_io_array[startindex].offset; + for ( i = startindex ; i < endindex ; i++) { + pos = (size_t) fh->f_io_array[i].offset - start_offset; + num_bytes = fh->f_io_array[i].length; + memcpy (temp_buf + pos, fh->f_io_array[i].memory_address, num_bytes); + bytes_written += num_bytes; + } + ret_code = pwrite (fh->fd, temp_buf, len, start); + if ( ret_code == -1 ) { + opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + free ( temp_buf); + return OMPI_ERROR; + } + + mca_fbtl_posix_unlock ( &lock, fh); + if ( ret_code == -1 ) { + opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + free ( temp_buf); + return OMPI_ERROR; + } + + } + + free ( temp_buf); + return bytes_written; +} + + +ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh ) { /*int *fp = NULL;*/ int i, block = 1, ret; diff --git a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c index 282b248f79..6e6f09a20c 100644 --- a/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c +++ b/ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c @@ -736,11 +736,15 @@ static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) { - while ( aggr_data->prev_bytes_to_write > 0 ) { + while ( aggr_data->prev_bytes_to_write > 0 ) { + ssize_t tret; aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array, aggr_data->prev_num_io_entries, &last_array_pos, &last_pos ); - if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) { + fh->f_flags |= OMPIO_COLLECTIVE_OP; + tret = fh->f_fbtl->fbtl_pwritev (fh); + fh->f_flags &= ~OMPIO_COLLECTIVE_OP; + if ( 0 > tret ) { free ( aggr_data->prev_io_array); opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n"); ret = OMPI_ERROR; diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index abf281a0a3..8b2ccaa183 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -771,7 +771,9 @@ static int write_init (ompio_file_t *fh, } } else { + fh->f_flags |= OMPIO_COLLECTIVE_OP; ret_temp = fh->f_fbtl->fbtl_pwritev(fh); + fh->f_flags &= ~OMPIO_COLLECTIVE_OP; if(0 > ret_temp) { opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n"); ret = ret_temp;