fbtl/posix: add support for file locking for the non-blocking operations
Signed-off-by: Edgar Gabriel <egabriel@central.uh.edu>
Этот коммит содержится в:
родитель
415e76514d
Коммит
a3c638bc38
@ -118,6 +118,7 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
#if defined (FBTL_POSIX_HAVE_AIO)
|
||||
int i=0, lcount=0;
|
||||
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
|
||||
if ( EINPROGRESS == data->aio_req_status[i] ) {
|
||||
@ -154,6 +155,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
#endif
|
||||
|
||||
if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
|
||||
/* release the lock of the previous operations */
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
|
||||
/* post the next batch of operations */
|
||||
data->aio_first_active_req = data->aio_last_active_req;
|
||||
if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
|
||||
@ -162,16 +166,30 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
|
||||
if ( FBTL_POSIX_READ == data->aio_req_type ) {
|
||||
mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
}
|
||||
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
|
||||
mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
}
|
||||
|
||||
for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
|
||||
if ( FBTL_POSIX_READ == data->aio_req_type ) {
|
||||
if (-1 == aio_read(&data->aio_reqs[i])) {
|
||||
perror("aio_read() error");
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
|
||||
if (-1 == aio_write(&data->aio_reqs[i])) {
|
||||
perror("aio_write() error");
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
@ -73,6 +73,8 @@ struct mca_fbtl_posix_request_data_t {
|
||||
struct aiocb *aio_reqs; /* pointer array of req structures */
|
||||
int *aio_req_status; /* array of statuses */
|
||||
ssize_t aio_total_len; /* total amount of data written */
|
||||
struct flock aio_lock; /* lock used for certain file systems */
|
||||
mca_io_ompio_file_t *aio_fh; /* pointer back to the mca_io_ompio_fh structure */
|
||||
};
|
||||
typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
|
||||
|
||||
|
@ -40,6 +40,7 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
mca_fbtl_posix_request_data_t *data;
|
||||
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
|
||||
int i=0;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
|
||||
if ( NULL == data ) {
|
||||
@ -67,6 +68,7 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
free(data);
|
||||
return 0;
|
||||
}
|
||||
data->aio_fh = fh;
|
||||
|
||||
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
|
||||
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
|
||||
@ -86,9 +88,16 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
|
||||
for (i=0; i < data->aio_last_active_req; i++) {
|
||||
if (-1 == aio_read(&data->aio_reqs[i])) {
|
||||
opal_output(1, "aio_read() error: %s", strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_reqs);
|
||||
free(data->aio_req_status);
|
||||
free(data);
|
||||
|
@ -39,6 +39,7 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
mca_fbtl_posix_request_data_t *data;
|
||||
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
|
||||
int i=0;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
|
||||
if ( NULL == data ) {
|
||||
@ -66,6 +67,7 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
free(data);
|
||||
return 0;
|
||||
}
|
||||
data->aio_fh = fh;
|
||||
|
||||
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
|
||||
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
|
||||
@ -85,10 +87,16 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
|
||||
for (i=0; i < data->aio_last_active_req; i++) {
|
||||
if (-1 == aio_write(&data->aio_reqs[i])) {
|
||||
opal_output(1, "aio_write() error: %s", strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_req_status);
|
||||
free(data->aio_reqs);
|
||||
free(data);
|
||||
|
@ -55,7 +55,6 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
||||
total_length = iov[iov_count].iov_len;
|
||||
iov_count ++;
|
||||
}
|
||||
|
||||
@ -78,11 +77,12 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base =
|
||||
fh->f_io_array[i+1].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
||||
total_length += iov[iov_count].iov_len;
|
||||
iov_count ++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
total_length = ((off_t)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len - (off_t)iov_offset );
|
||||
mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
|
||||
#if defined(HAVE_PREADV)
|
||||
ret_code = preadv (fh->fd, iov, iov_count, iov_offset);
|
||||
|
@ -57,7 +57,6 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
||||
total_length = iov[iov_count].iov_len;
|
||||
iov_count ++;
|
||||
}
|
||||
|
||||
@ -79,7 +78,6 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
(iov_count < IOV_MAX )) {
|
||||
iov[iov_count].iov_base = fh->f_io_array[i+1].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
||||
total_length += iov[iov_count].iov_len;
|
||||
iov_count ++;
|
||||
continue;
|
||||
}
|
||||
@ -97,6 +95,7 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
|
||||
*/
|
||||
|
||||
total_length = ((off_t)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len - (off_t)iov_offset);
|
||||
mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
|
||||
#if defined (HAVE_PWRITEV)
|
||||
ret_code = pwritev (fh->fd, iov, iov_count, iov_offset);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user