1
1

fbtl_posix_pwritev: add datasieving support for write

its however restricted to collective I/O operations, at this point
only from vulcan and dynamic_gen2. required some more infrastructure
to be added to recognize individual I/O and multi-threaded environments.

Signed-off-by: Edgar Gabriel <egabriel@central.uh.edu>
Этот коммит содержится в:
Edgar Gabriel 2020-12-16 15:34:45 -06:00
родитель 90d8c8c39c
Коммит d65480df35
5 изменённых файлов: 192 добавлений и 3 удалений

Просмотреть файл

@ -67,6 +67,7 @@
#define OMPIO_LOCK_NEVER 0x00000100
#define OMPIO_LOCK_NOT_THIS_OP 0x00000200
#define OMPIO_DATAREP_NATIVE 0x00000400
#define OMPIO_COLLECTIVE_OP 0x00000800
#define OMPIO_ROOT 0

Просмотреть файл

@ -158,6 +158,7 @@ ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh)
ret, strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
free ( temp_buf);
return OMPI_ERROR;
}
@ -165,6 +166,7 @@ ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh)
mca_fbtl_posix_unlock ( &lock, fh);
if ( ret_code == -1 ) {
opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(errno));
free ( temp_buf);
return OMPI_ERROR;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -30,7 +30,187 @@
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
static ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh );
static ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh );
ssize_t mca_fbtl_posix_pwritev(ompio_file_t *fh )
{
ssize_t bytes_written=0, ret_code=0;
struct flock lock;
int ret;
if (NULL == fh->f_io_array) {
return OMPI_ERROR;
}
if ( fh->f_num_of_io_entries > 1 ) {
bool do_data_sieving = true;
size_t avg_gap_size=0;
size_t avg_block_size = 0;
off_t prev_offset = (off_t)fh->f_io_array[0].offset;
int i;
for ( i=0; i< fh->f_num_of_io_entries; i++ ) {
avg_block_size += fh->f_io_array[i].length;
avg_gap_size += (size_t)((off_t)fh->f_io_array[i].offset - prev_offset);
prev_offset = (off_t)fh->f_io_array[i].offset;
}
avg_block_size = avg_block_size / fh->f_num_of_io_entries;
avg_gap_size = avg_gap_size / fh->f_num_of_io_entries;
if ( false == mca_fbtl_posix_write_datasieving ||
0 == avg_gap_size ||
avg_block_size > mca_fbtl_posix_max_block_size ||
avg_gap_size > mca_fbtl_posix_max_gap_size ||
ompi_mpi_thread_multiple ||
!(fh->f_flags & OMPIO_COLLECTIVE_OP) ) {
do_data_sieving = false;
}
if ( do_data_sieving) {
return mca_fbtl_posix_pwritev_datasieving (fh);
}
else {
return mca_fbtl_posix_pwritev_generic (fh);
}
}
else {
// i.e. fh->f_num_of_io_entries == 1
ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, (off_t)fh->f_io_array[0].offset,
(off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION );
if ( 0 < ret ) {
opal_output(1, "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() ret=%d: %s",
ret, strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
return OMPI_ERROR;
}
ret_code = pwrite(fh->fd, fh->f_io_array[0].memory_address, fh->f_io_array[0].length,
(off_t)fh->f_io_array[0].offset );
mca_fbtl_posix_unlock ( &lock, fh );
if ( ret_code == -1 ) {
opal_output(1, "mca_fbtl_posix_pwritev: error in (p)write(v):%s", strerror(errno));
return OMPI_ERROR;
}
bytes_written += ret_code;
}
return bytes_written;
}
ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh)
{
size_t start, end, len;
size_t bufsize = 0;
int ret, i, j;
ssize_t bytes_written=0, ret_code=0;
struct flock lock;
char *temp_buf = NULL;
int startindex = 0;
int endindex = 0;
bool done = false;
while (!done) {
// Break the io_array into chunks such that the size of the temporary
// buffer does not exceed mca_fbtl_posix_max_tmpbuf_size bytes.
// Each iteration will thus work in the range (startindex, endindex[
startindex = endindex;
if ( startindex >= fh->f_num_of_io_entries ) {
done = true;
break;
}
size_t sstart = (size_t)fh->f_io_array[startindex].offset;
size_t slen=0;
for ( j = startindex; j < fh->f_num_of_io_entries; j++ ) {
endindex = j;
slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - sstart;
if (slen > mca_fbtl_posix_max_tmpbuf_size ) {
endindex = j-1;
break;
}
}
// Need to increment the value of endindex
// by one for the loop syntax to work correctly.
endindex++;
start = (size_t)fh->f_io_array[startindex].offset;
end = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length;
len = end - start;
if ( len > bufsize ) {
if ( NULL != temp_buf ) {
free ( temp_buf);
}
temp_buf = (char *) malloc ( len );
if ( NULL == temp_buf ) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
bufsize = len;
}
// Read the entire block.
ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, start, len, OMPIO_LOCK_ENTIRE_REGION );
if ( 0 < ret ) {
opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s",
ret, strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
free ( temp_buf);
return OMPI_ERROR;
}
ret_code = pread (fh->fd, temp_buf, len, start);
if ( ret_code == -1 ) {
//opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
free ( temp_buf);
return OMPI_ERROR;
}
// Copy out the elements to write into temporary buffer.
size_t pos = 0;
size_t num_bytes;
size_t start_offset = (size_t) fh->f_io_array[startindex].offset;
for ( i = startindex ; i < endindex ; i++) {
pos = (size_t) fh->f_io_array[i].offset - start_offset;
num_bytes = fh->f_io_array[i].length;
memcpy (temp_buf + pos, fh->f_io_array[i].memory_address, num_bytes);
bytes_written += num_bytes;
}
ret_code = pwrite (fh->fd, temp_buf, len, start);
if ( ret_code == -1 ) {
opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
free ( temp_buf);
return OMPI_ERROR;
}
mca_fbtl_posix_unlock ( &lock, fh);
if ( ret_code == -1 ) {
opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
/* Just in case some part of the lock worked */
mca_fbtl_posix_unlock ( &lock, fh);
free ( temp_buf);
return OMPI_ERROR;
}
}
free ( temp_buf);
return bytes_written;
}
ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh )
{
/*int *fp = NULL;*/
int i, block = 1, ret;

Просмотреть файл

@ -736,11 +736,15 @@ static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator
if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
while ( aggr_data->prev_bytes_to_write > 0 ) {
while ( aggr_data->prev_bytes_to_write > 0 ) {
ssize_t tret;
aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
aggr_data->prev_num_io_entries,
&last_array_pos, &last_pos );
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
fh->f_flags |= OMPIO_COLLECTIVE_OP;
tret = fh->f_fbtl->fbtl_pwritev (fh);
fh->f_flags &= ~OMPIO_COLLECTIVE_OP;
if ( 0 > tret ) {
free ( aggr_data->prev_io_array);
opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
ret = OMPI_ERROR;

Просмотреть файл

@ -771,7 +771,9 @@ static int write_init (ompio_file_t *fh,
}
}
else {
fh->f_flags |= OMPIO_COLLECTIVE_OP;
ret_temp = fh->f_fbtl->fbtl_pwritev(fh);
fh->f_flags &= ~OMPIO_COLLECTIVE_OP;
if(0 > ret_temp) {
opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
ret = ret_temp;