diff --git a/ompi/mca/fbtl/posix/fbtl_posix.c b/ompi/mca/fbtl/posix/fbtl_posix.c index 4c6d21ab01..29dbdb4197 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix.c +++ b/ompi/mca/fbtl/posix/fbtl_posix.c @@ -118,6 +118,7 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req) #if defined (FBTL_POSIX_HAVE_AIO) int i=0, lcount=0; mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data; + off_t start_offset, end_offset, total_length; for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) { if ( EINPROGRESS == data->aio_req_status[i] ) { @@ -154,6 +155,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req) #endif if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) { + /* release the lock of the previous operations */ + mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh ); + /* post the next batch of operations */ data->aio_first_active_req = data->aio_last_active_req; if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) { @@ -162,16 +166,30 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req) else { data->aio_last_active_req = data->aio_req_count; } + + start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset; + end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes; + total_length = (end_offset - start_offset); + + if ( FBTL_POSIX_READ == data->aio_req_type ) { + mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION ); + } + else if ( FBTL_POSIX_WRITE == data->aio_req_type ) { + mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION ); + } + for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) { if ( FBTL_POSIX_READ == data->aio_req_type ) { if (-1 == aio_read(&data->aio_reqs[i])) { perror("aio_read() error"); + mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh ); return OMPI_ERROR; } } else if ( FBTL_POSIX_WRITE == data->aio_req_type ) { if (-1 == aio_write(&data->aio_reqs[i])) { perror("aio_write() error"); + mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh ); return OMPI_ERROR; } } diff --git a/ompi/mca/fbtl/posix/fbtl_posix.h b/ompi/mca/fbtl/posix/fbtl_posix.h index b9353a921a..b4d029dd96 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix.h +++ b/ompi/mca/fbtl/posix/fbtl_posix.h @@ -73,6 +73,8 @@ struct mca_fbtl_posix_request_data_t { struct aiocb *aio_reqs; /* pointer array of req structures */ int *aio_req_status; /* array of statuses */ ssize_t aio_total_len; /* total amount of data written */ + struct flock aio_lock; /* lock used for certain file systems */ + mca_io_ompio_file_t *aio_fh; /* pointer back to the mca_io_ompio_fh structure */ }; typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t; diff --git a/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c b/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c index 00eaedeaf7..6ac9d03703 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c @@ -40,6 +40,7 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh, mca_fbtl_posix_request_data_t *data; mca_ompio_request_t *req = (mca_ompio_request_t *) request; int i=0; + off_t start_offset, end_offset, total_length; data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t)); if ( NULL == data ) { @@ -67,6 +68,7 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh, free(data); return 0; } + data->aio_fh = fh; for ( i=0; if_num_of_io_entries; i++ ) { data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t) @@ -86,9 +88,16 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh, else { data->aio_last_active_req = data->aio_req_count; } + + start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset; + end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes; + total_length = (end_offset - start_offset); + mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION ); + for (i=0; i < data->aio_last_active_req; i++) { if (-1 == aio_read(&data->aio_reqs[i])) { opal_output(1, "aio_read() error: %s", strerror(errno)); + mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh ); free(data->aio_reqs); free(data->aio_req_status); free(data); diff --git a/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c b/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c index 1d869c2a75..2bd04ff747 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c @@ -39,6 +39,7 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh, mca_fbtl_posix_request_data_t *data; mca_ompio_request_t *req = (mca_ompio_request_t *) request; int i=0; + off_t start_offset, end_offset, total_length; data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t)); if ( NULL == data ) { @@ -66,6 +67,7 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh, free(data); return 0; } + data->aio_fh = fh; for ( i=0; if_num_of_io_entries; i++ ) { data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t) @@ -85,10 +87,16 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh, else { data->aio_last_active_req = data->aio_req_count; } + + start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset; + end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes; + total_length = (end_offset - start_offset); + mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION ); for (i=0; i < data->aio_last_active_req; i++) { if (-1 == aio_write(&data->aio_reqs[i])) { opal_output(1, "aio_write() error: %s", strerror(errno)); + mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh ); free(data->aio_req_status); free(data->aio_reqs); free(data); diff --git a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c index cbf9d34fb9..b6360b3d97 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_preadv.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_preadv.c @@ -55,7 +55,6 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh ) iov[iov_count].iov_base = fh->f_io_array[i].memory_address; iov[iov_count].iov_len = fh->f_io_array[i].length; iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset; - total_length = iov[iov_count].iov_len; iov_count ++; } @@ -78,11 +77,12 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh ) iov[iov_count].iov_base = fh->f_io_array[i+1].memory_address; iov[iov_count].iov_len = fh->f_io_array[i+1].length; - total_length += iov[iov_count].iov_len; iov_count ++; continue; } } + + total_length = ((off_t)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len - (off_t)iov_offset ); mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE ); #if defined(HAVE_PREADV) ret_code = preadv (fh->fd, iov, iov_count, iov_offset); diff --git a/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c b/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c index da7c31e959..61d8008355 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_pwritev.c @@ -57,7 +57,6 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh ) iov[iov_count].iov_base = fh->f_io_array[i].memory_address; iov[iov_count].iov_len = fh->f_io_array[i].length; iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset; - total_length = iov[iov_count].iov_len; iov_count ++; } @@ -79,7 +78,6 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh ) (iov_count < IOV_MAX )) { iov[iov_count].iov_base = fh->f_io_array[i+1].memory_address; iov[iov_count].iov_len = fh->f_io_array[i+1].length; - total_length += iov[iov_count].iov_len; iov_count ++; continue; } @@ -97,6 +95,7 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh ) */ + total_length = ((off_t)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len - (off_t)iov_offset); mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE ); #if defined (HAVE_PWRITEV) ret_code = pwritev (fh->fd, iov, iov_count, iov_offset);