diff --git a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c index 5e19080105..15e9772981 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c @@ -29,8 +29,8 @@ #include "ompi/mca/fbtl/fbtl.h" -static int mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh); -static int mca_fbtl_posix_preadv_generic (ompio_file_t *fh); +static ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh); +static ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh); ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh ) { @@ -57,8 +57,8 @@ ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh ) avg_block_size = avg_block_size / fh->f_num_of_io_entries; avg_gap_size = avg_gap_size / fh->f_num_of_io_entries; - if ( mca_fbtl_posix_read_datasieving == false || - avg_gap_size == 0 || + if ( false == mca_fbtl_posix_read_datasieving || + 0 == avg_gap_size || avg_block_size > mca_fbtl_posix_max_block_size || avg_gap_size > mca_fbtl_posix_max_gap_size ) { do_data_sieving = false; @@ -76,7 +76,8 @@ ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh ) ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, (off_t)fh->f_io_array[0].offset, (off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION ); if ( 0 < ret ) { - opal_output(1, "mca_fbtl_posix_preadv: error in mca_fbtl_posix_lock() ret=%d: %s", ret, strerror(errno)); + opal_output(1, "mca_fbtl_posix_preadv: error in mca_fbtl_posix_lock() ret=%d: %s", + ret, strerror(errno)); /* Just in case some part of the lock worked */ mca_fbtl_posix_unlock ( &lock, fh); return OMPI_ERROR; @@ -96,62 +97,101 @@ ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh ) return bytes_read; } -int mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh) +ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh) { size_t start, end, len; - int ret, i; + size_t bufsize = 0; + int ret, i, j; ssize_t bytes_read=0, ret_code=0; struct flock lock; + char *temp_buf = NULL; - start = 
(off_t)fh->f_io_array[0].offset; - end = (off_t)fh->f_io_array[fh->f_num_of_io_entries-1].offset + fh->f_io_array[fh->f_num_of_io_entries-1].length; - len = end - start; + int startindex = 0; + int endindex = 0; + bool done = false; - char *temp_buf = (char *) malloc ( len ); - if ( NULL == temp_buf ) { - opal_output(1, "OUT OF MEMORY\n"); - return OMPI_ERR_OUT_OF_RESOURCE; - } - - // Read the entire block. - ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, start, len, OMPIO_LOCK_ENTIRE_REGION ); - if ( 0 < ret ) { - opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s", ret, strerror(errno)); - /* Just in case some part of the lock worked */ - mca_fbtl_posix_unlock ( &lock, fh); - return OMPI_ERROR; - } - - ret_code = pread (fh->fd, temp_buf, len, start); - mca_fbtl_posix_unlock ( &lock, fh); - if ( ret_code == -1 ) { - opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(errno)); - return OMPI_ERROR; - } - - // Copy out the elements that were requested. - size_t pos = 0; - size_t num_bytes; - size_t start_offset = (size_t) fh->f_io_array[0].offset; - for (i=0 ; i<fh->f_num_of_io_entries ; i++) { - pos = (size_t) fh->f_io_array[i].offset - start_offset; - if ( (ssize_t) pos > ret_code ) { + while (!done) { + // Break the io_array into chunks such that the size of the temporary + // buffer does not exceed mca_fbtl_posix_max_tmpbuf_size bytes. 
+ // Each iteration will thus work in the range (startindex, endindex[ + startindex = endindex; + if ( startindex >= fh->f_num_of_io_entries ) { + done = true; break; } - num_bytes = fh->f_io_array[i].length; - if ( ((ssize_t) pos + (ssize_t)num_bytes) > ret_code ) { - num_bytes = ret_code - (ssize_t)pos; + + size_t sstart = (size_t)fh->f_io_array[startindex].offset; + size_t slen=0; + + for ( j = startindex; j < fh->f_num_of_io_entries; j++ ) { + endindex = j; + slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - sstart; + if (slen > mca_fbtl_posix_max_tmpbuf_size ) { + endindex = j-1; + break; + } + } + // Need to increment the value of endindex + // by one for the loop syntax to work correctly. + endindex++; + + start = (size_t)fh->f_io_array[startindex].offset; + end = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length; + len = end - start; + + if ( len > bufsize ) { + if ( NULL != temp_buf ) { + free ( temp_buf); + } + temp_buf = (char *) malloc ( len ); + if ( NULL == temp_buf ) { + opal_output(1, "OUT OF MEMORY\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + bufsize = len; } - memcpy (fh->f_io_array[i].memory_address, temp_buf + pos, num_bytes); - bytes_read += num_bytes; + // Read the entire block. + ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, start, len, OMPIO_LOCK_ENTIRE_REGION ); + if ( 0 < ret ) { + opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s", + ret, strerror(errno)); + /* Just in case some part of the lock worked */ + mca_fbtl_posix_unlock ( &lock, fh); + return OMPI_ERROR; + } + + ret_code = pread (fh->fd, temp_buf, len, start); + mca_fbtl_posix_unlock ( &lock, fh); + if ( ret_code == -1 ) { + opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(errno)); + return OMPI_ERROR; + } + + // Copy out the elements that were requested. 
+ size_t pos = 0; + size_t num_bytes; + size_t start_offset = (size_t) fh->f_io_array[startindex].offset; + for ( i = startindex ; i < endindex ; i++) { + pos = (size_t) fh->f_io_array[i].offset - start_offset; + if ( (ssize_t) pos > ret_code ) { + break; + } + num_bytes = fh->f_io_array[i].length; + if ( ((ssize_t) pos + (ssize_t)num_bytes) > ret_code ) { + num_bytes = ret_code - (ssize_t)pos; + } + + memcpy (fh->f_io_array[i].memory_address, temp_buf + pos, num_bytes); + bytes_read += num_bytes; + } } free ( temp_buf); return bytes_read; } -int mca_fbtl_posix_preadv_generic (ompio_file_t *fh ) +ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh ) { ssize_t bytes_read=0, ret_code=0; struct iovec *iov = NULL;