Merge pull request #4362 from edgargabriel/topic/fbtl-locking-support
Add file locking support in posix fbtl
Этот коммит содержится в:
Коммит
defe73984a
@ -90,6 +90,7 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
|
||||
ompio_fh->f_amode = amode;
|
||||
ompio_fh->f_info = info;
|
||||
ompio_fh->f_atomicity = 0;
|
||||
ompio_fh->f_fs_block_size = 4096;
|
||||
|
||||
mca_common_ompio_set_file_defaults (ompio_fh);
|
||||
ompio_fh->f_filename = filename;
|
||||
|
@ -9,7 +9,7 @@
|
||||
# University of Stuttgart. All rights reserved.
|
||||
# Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
# All rights reserved.
|
||||
# Copyright (c) 2008-2011 University of Houston. All rights reserved.
|
||||
# Copyright (c) 2008-2017 University of Houston. All rights reserved.
|
||||
# Copyright (c) 2017 IBM Corporation. All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
@ -49,4 +49,5 @@ sources = \
|
||||
fbtl_posix_preadv.c \
|
||||
fbtl_posix_ipreadv.c \
|
||||
fbtl_posix_pwritev.c \
|
||||
fbtl_posix_ipwritev.c
|
||||
fbtl_posix_ipwritev.c \
|
||||
fbtl_posix_lock.c
|
||||
|
@ -116,8 +116,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
{
|
||||
bool ret=false;
|
||||
#if defined (FBTL_POSIX_HAVE_AIO)
|
||||
int i=0, lcount=0;
|
||||
int i=0, lcount=0, ret_code;
|
||||
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
|
||||
if ( EINPROGRESS == data->aio_req_status[i] ) {
|
||||
@ -154,6 +155,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
#endif
|
||||
|
||||
if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
|
||||
/* release the lock of the previous operations */
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
|
||||
/* post the next batch of operations */
|
||||
data->aio_first_active_req = data->aio_last_active_req;
|
||||
if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
|
||||
@ -162,16 +166,36 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
|
||||
if ( FBTL_POSIX_READ == data->aio_req_type ) {
|
||||
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
}
|
||||
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
|
||||
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
}
|
||||
if ( 0 < ret_code ) {
|
||||
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
|
||||
/* Just in case some part of the lock actually succeeded. */
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
|
||||
if ( FBTL_POSIX_READ == data->aio_req_type ) {
|
||||
if (-1 == aio_read(&data->aio_reqs[i])) {
|
||||
perror("aio_read() error");
|
||||
opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
|
||||
if (-1 == aio_write(&data->aio_reqs[i])) {
|
||||
perror("aio_write() error");
|
||||
opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
@ -185,6 +209,7 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
|
||||
/* all pending operations are finished for this request */
|
||||
req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
req->req_ompi.req_status._ucount = data->aio_total_len;
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
ret = true;
|
||||
}
|
||||
#endif
|
||||
@ -197,6 +222,7 @@ void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
|
||||
/* Free the fbtl specific data structures */
|
||||
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
|
||||
if (NULL != data ) {
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
if ( NULL != data->aio_reqs ) {
|
||||
free ( data->aio_reqs);
|
||||
}
|
||||
|
@ -58,6 +58,11 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *file,
|
||||
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req);
|
||||
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req);
|
||||
|
||||
int mca_fbtl_posix_lock ( struct flock *lock, mca_io_ompio_file_t *fh, int op,
|
||||
OMPI_MPI_OFFSET_TYPE iov_offset, off_t len, int flags);
|
||||
void mca_fbtl_posix_unlock ( struct flock *lock, mca_io_ompio_file_t *fh );
|
||||
|
||||
|
||||
struct mca_fbtl_posix_request_data_t {
|
||||
int aio_req_count; /* total number of aio reqs */
|
||||
int aio_open_reqs; /* number of unfinished reqs */
|
||||
@ -68,6 +73,8 @@ struct mca_fbtl_posix_request_data_t {
|
||||
struct aiocb *aio_reqs; /* pointer array of req structures */
|
||||
int *aio_req_status; /* array of statuses */
|
||||
ssize_t aio_total_len; /* total amount of data written */
|
||||
struct flock aio_lock; /* lock used for certain file systems */
|
||||
mca_io_ompio_file_t *aio_fh; /* pointer back to the mca_io_ompio_fh structure */
|
||||
};
|
||||
typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
|
||||
|
||||
@ -78,6 +85,7 @@ typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
|
||||
#define FBTL_POSIX_READ 1
|
||||
#define FBTL_POSIX_WRITE 2
|
||||
|
||||
|
||||
/*
|
||||
* ******************************************************************
|
||||
* ************ functions implemented in this module end ************
|
||||
|
@ -39,7 +39,8 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
#if defined (FBTL_POSIX_HAVE_AIO)
|
||||
mca_fbtl_posix_request_data_t *data;
|
||||
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
|
||||
int i=0;
|
||||
int i=0, ret;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
|
||||
if ( NULL == data ) {
|
||||
@ -67,6 +68,7 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
free(data);
|
||||
return 0;
|
||||
}
|
||||
data->aio_fh = fh;
|
||||
|
||||
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
|
||||
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
|
||||
@ -86,9 +88,24 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
if ( 0 < ret ) {
|
||||
opal_output(1, "mca_fbtl_posix_ipreadv: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_reqs);
|
||||
free(data->aio_req_status);
|
||||
free(data);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
for (i=0; i < data->aio_last_active_req; i++) {
|
||||
if (-1 == aio_read(&data->aio_reqs[i])) {
|
||||
opal_output(1, "aio_read() error: %s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_ipreadv: error in aio_read(): %s", strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_reqs);
|
||||
free(data->aio_req_status);
|
||||
free(data);
|
||||
|
@ -38,7 +38,8 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
#if defined(FBTL_POSIX_HAVE_AIO)
|
||||
mca_fbtl_posix_request_data_t *data;
|
||||
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
|
||||
int i=0;
|
||||
int i=0, ret;
|
||||
off_t start_offset, end_offset, total_length;
|
||||
|
||||
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
|
||||
if ( NULL == data ) {
|
||||
@ -66,6 +67,7 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
free(data);
|
||||
return 0;
|
||||
}
|
||||
data->aio_fh = fh;
|
||||
|
||||
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
|
||||
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
|
||||
@ -85,10 +87,24 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
|
||||
else {
|
||||
data->aio_last_active_req = data->aio_req_count;
|
||||
}
|
||||
|
||||
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
|
||||
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
|
||||
total_length = (end_offset - start_offset);
|
||||
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
|
||||
if ( 0 < ret ) {
|
||||
opal_output(1, "mca_fbtl_posix_ipwritev: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_reqs);
|
||||
free(data->aio_req_status);
|
||||
free(data);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
for (i=0; i < data->aio_last_active_req; i++) {
|
||||
if (-1 == aio_write(&data->aio_reqs[i])) {
|
||||
opal_output(1, "aio_write() error: %s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_ipwritev: error in aio_write(): %s", strerror(errno));
|
||||
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
|
||||
free(data->aio_req_status);
|
||||
free(data->aio_reqs);
|
||||
free(data);
|
||||
|
149
ompi/mca/fbtl/posix/fbtl_posix_lock.c
Обычный файл
149
ompi/mca/fbtl/posix/fbtl_posix_lock.c
Обычный файл
@ -0,0 +1,149 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2011 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2017 University of Houston. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
#include "fbtl_posix.h"
|
||||
|
||||
#include "mpi.h"
|
||||
#include <unistd.h>
|
||||
#include <sys/uio.h>
|
||||
#include <errno.h>
|
||||
#include <limits.h>
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/fbtl/fbtl.h"
|
||||
|
||||
#define MAX_ERRCOUNT 100
|
||||
|
||||
/*
|
||||
op: can be F_WRLCK or F_RDLCK
|
||||
flags: can be OMPIO_LOCK_ENTIRE_REGION or OMPIO_LOCK_SELECTIVE. This is typically set by the operation, not the fs component.
|
||||
e.g. a collective and an individual component might require different level of protection through locking,
|
||||
also one might need to do different things for blocking (pwritev,preadv) operations and non-blocking (aio) operations.
|
||||
|
||||
fh->f_flags can contain similar sounding flags, those were set by the fs component and/or user requests.
|
||||
|
||||
Support for MPI atomicity operations are envisioned, but not yet tested.
|
||||
*/
|
||||
|
||||
int mca_fbtl_posix_lock ( struct flock *lock, mca_io_ompio_file_t *fh, int op,
|
||||
OMPI_MPI_OFFSET_TYPE offset, off_t len, int flags)
|
||||
{
|
||||
off_t lmod, bmod;
|
||||
int ret, err_count;
|
||||
|
||||
lock->l_type = op;
|
||||
lock->l_whence = SEEK_SET;
|
||||
lock->l_start =-1;
|
||||
lock->l_len =-1;
|
||||
if ( 0 == len ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( fh->f_flags & OMPIO_LOCK_ENTIRE_FILE ) {
|
||||
lock->l_start = (off_t) 0;
|
||||
lock->l_len = 0;
|
||||
}
|
||||
else {
|
||||
if ( (fh->f_flags & OMPIO_LOCK_NEVER) ||
|
||||
(fh->f_flags & OMPIO_LOCK_NOT_THIS_OP )){
|
||||
/* OMPIO_LOCK_NEVER:
|
||||
ompio tells us not to worry about locking. This can be due to three
|
||||
reasons:
|
||||
1. user enforced
|
||||
2. single node job where the locking is handled already in the kernel
|
||||
3. file view is set to distinct regions such that multiple processes
|
||||
do not collide on the block level. ( not entirely sure yet how
|
||||
to check for this except in trivial cases).
|
||||
OMPI_LOCK_NOT_THIS_OP:
|
||||
will typically be set by fcoll components indicating that the file partitioning
|
||||
ensures no overlap in blocks.
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
if ( flags == OMPIO_LOCK_ENTIRE_REGION ) {
|
||||
lock->l_start = (off_t) offset;
|
||||
lock->l_len = len;
|
||||
}
|
||||
else {
|
||||
/* We only try to lock the first block in the data range if
|
||||
the starting offset is not the starting offset of a file system
|
||||
block. And the last block in the data range if the offset+len
|
||||
is not equal to the end of a file system block.
|
||||
If we need to lock both beginning + end, we combine
|
||||
the two into a single lock.
|
||||
*/
|
||||
bmod = offset % fh->f_fs_block_size;
|
||||
if ( bmod ) {
|
||||
lock->l_start = (off_t) offset;
|
||||
lock->l_len = bmod;
|
||||
}
|
||||
lmod = (offset+len)%fh->f_fs_block_size;
|
||||
if ( lmod ) {
|
||||
if ( !bmod ) {
|
||||
lock->l_start = (offset+len-lmod );
|
||||
lock->l_len = lmod;
|
||||
}
|
||||
else {
|
||||
lock->l_len = len;
|
||||
}
|
||||
}
|
||||
if ( -1 == lock->l_start && -1 == lock->l_len ) {
|
||||
/* no need to lock in this instance */
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef OMPIO_DEBUG
|
||||
printf("%d: acquiring lock for offset %ld length %ld requested offset %ld request len %ld \n",
|
||||
fh->f_rank, lock->l_start, lock->l_len, offset, len);
|
||||
#endif
|
||||
errno=0;
|
||||
err_count=0;
|
||||
do {
|
||||
ret = fcntl ( fh->fd, F_SETLKW, lock);
|
||||
if ( ret ) {
|
||||
#ifdef OMPIO_DEBUG
|
||||
printf("[%d] ret = %d errno=%d %s\n", fh->f_rank, ret, errno, strerror(errno) );
|
||||
#endif
|
||||
err_count++;
|
||||
}
|
||||
} while ( ret && ((errno == EINTR) || ((errno == EINPROGRESS) && err_count < MAX_ERRCOUNT )));
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void mca_fbtl_posix_unlock ( struct flock *lock, mca_io_ompio_file_t *fh )
|
||||
{
|
||||
if ( -1 == lock->l_start && -1 == lock->l_len ) {
|
||||
return;
|
||||
}
|
||||
|
||||
lock->l_type = F_UNLCK;
|
||||
#ifdef OMPIO_DEBUG
|
||||
printf("%d: releasing lock for offset %ld length %ld\n", fh->f_rank, lock->l_start, lock->l_len);
|
||||
#endif
|
||||
fcntl ( fh->fd, F_SETLK, lock);
|
||||
lock->l_start = -1;
|
||||
lock->l_len = -1;
|
||||
|
||||
return;
|
||||
}
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
|
||||
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
|
||||
* Copyright (c) 2015-2017 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -31,11 +31,13 @@
|
||||
ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh )
|
||||
{
|
||||
/*int *fp = NULL;*/
|
||||
int i, block=1;
|
||||
int i, block=1, ret;
|
||||
struct iovec *iov = NULL;
|
||||
int iov_count = 0;
|
||||
OMPI_MPI_OFFSET_TYPE iov_offset = 0;
|
||||
ssize_t bytes_read=0, ret_code=0;
|
||||
struct flock lock;
|
||||
off_t total_length, end_offset=0;
|
||||
|
||||
if (NULL == fh->f_io_array) {
|
||||
return OMPI_ERROR;
|
||||
@ -53,6 +55,7 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
||||
end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
|
||||
iov_count ++;
|
||||
}
|
||||
|
||||
@ -75,29 +78,39 @@ ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base =
|
||||
fh->f_io_array[i+1].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
||||
end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
|
||||
iov_count ++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
total_length = (end_offset - (off_t)iov_offset );
|
||||
|
||||
ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
|
||||
if ( 0 < ret ) {
|
||||
opal_output(1, "mca_fbtl_posix_preadv: error in mca_fbtl_posix_lock() ret=%d: %s", ret, strerror(errno));
|
||||
free (iov);
|
||||
/* Just in case some part of the lock worked */
|
||||
mca_fbtl_posix_unlock ( &lock, fh);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
#if defined(HAVE_PREADV)
|
||||
ret_code = preadv (fh->fd, iov, iov_count, iov_offset);
|
||||
if ( 0 < ret_code ) {
|
||||
bytes_read+=ret_code;
|
||||
}
|
||||
#else
|
||||
if (-1 == lseek (fh->fd, iov_offset, SEEK_SET)) {
|
||||
opal_output(1, "lseek:%s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_preadv: error in lseek:%s", strerror(errno));
|
||||
free(iov);
|
||||
mca_fbtl_posix_unlock ( &lock, fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
ret_code = readv (fh->fd, iov, iov_count);
|
||||
#endif
|
||||
mca_fbtl_posix_unlock ( &lock, fh );
|
||||
if ( 0 < ret_code ) {
|
||||
bytes_read+=ret_code;
|
||||
}
|
||||
#endif
|
||||
else if ( ret_code == -1 ) {
|
||||
opal_output(1, "readv:%s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_preadv: error in (p)readv:%s", strerror(errno));
|
||||
free(iov);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
|
||||
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
|
||||
* Copyright (c) 2015-2017 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -33,11 +33,13 @@
|
||||
ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
{
|
||||
/*int *fp = NULL;*/
|
||||
int i, block = 1;
|
||||
int i, block = 1, ret;
|
||||
struct iovec *iov = NULL;
|
||||
int iov_count = 0;
|
||||
OMPI_MPI_OFFSET_TYPE iov_offset = 0;
|
||||
ssize_t ret_code=0, bytes_written=0;
|
||||
struct flock lock;
|
||||
off_t total_length, end_offset=0;
|
||||
|
||||
if (NULL == fh->f_io_array) {
|
||||
return OMPI_ERROR;
|
||||
@ -55,6 +57,7 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
||||
end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
|
||||
iov_count ++;
|
||||
}
|
||||
|
||||
@ -74,10 +77,10 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
(ptrdiff_t)fh->f_io_array[i].length) ==
|
||||
(OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset) &&
|
||||
(iov_count < IOV_MAX )) {
|
||||
iov[iov_count].iov_base =
|
||||
fh->f_io_array[i+1].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
||||
iov_count ++;
|
||||
iov[iov_count].iov_base = fh->f_io_array[i+1].memory_address;
|
||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
||||
end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
|
||||
iov_count ++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -93,25 +96,33 @@ ssize_t mca_fbtl_posix_pwritev(mca_io_ompio_file_t *fh )
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
total_length = (end_offset - (off_t)iov_offset);
|
||||
ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
|
||||
if ( 0 < ret ) {
|
||||
opal_output(1, "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
|
||||
free (iov);
|
||||
/* just in case some part of the lock worked */
|
||||
mca_fbtl_posix_unlock ( &lock, fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
#if defined (HAVE_PWRITEV)
|
||||
ret_code = pwritev (fh->fd, iov, iov_count, iov_offset);
|
||||
if ( 0 < ret_code ) {
|
||||
bytes_written += ret_code;
|
||||
}
|
||||
|
||||
#else
|
||||
if (-1 == lseek (fh->fd, iov_offset, SEEK_SET)) {
|
||||
opal_output(1, "lseek:%s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_pwritev: error in lseek:%s", strerror(errno));
|
||||
free(iov);
|
||||
mca_fbtl_posix_unlock ( &lock, fh );
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
ret_code = writev (fh->fd, iov, iov_count);
|
||||
#endif
|
||||
mca_fbtl_posix_unlock ( &lock, fh );
|
||||
if ( 0 < ret_code ) {
|
||||
bytes_written += ret_code;
|
||||
}
|
||||
#endif
|
||||
else if (-1 == ret_code ) {
|
||||
opal_output(1, "writev:%s", strerror(errno));
|
||||
opal_output(1, "mca_fbtl_posix_pwritev: error in writev:%s", strerror(errno));
|
||||
free (iov);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
@ -29,6 +29,12 @@
|
||||
#include "ompi/mca/common/ompio/common_ompio.h"
|
||||
|
||||
extern int mca_fs_ufs_priority;
|
||||
extern int mca_fs_ufs_lock_algorithm;
|
||||
|
||||
#define FS_UFS_LOCK_AUTO 0
|
||||
#define FS_UFS_LOCK_NEVER 1
|
||||
#define FS_UFS_LOCK_ENTIRE_FILE 2
|
||||
#define FS_UFS_LOCK_RANGES 3
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
|
@ -31,6 +31,12 @@
|
||||
#include "mpi.h"
|
||||
|
||||
int mca_fs_ufs_priority = 10;
|
||||
int mca_fs_ufs_lock_algorithm=0; /* auto */
|
||||
/*
|
||||
* Private functions
|
||||
*/
|
||||
static int register_component(void);
|
||||
|
||||
|
||||
/*
|
||||
* Public string showing the fs ufs component version number
|
||||
@ -54,6 +60,7 @@ mca_fs_base_component_2_0_0_t mca_fs_ufs_component = {
|
||||
.mca_component_name = "ufs",
|
||||
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
|
||||
OMPI_RELEASE_VERSION),
|
||||
.mca_register_component_params = register_component,
|
||||
},
|
||||
.fsm_data = {
|
||||
/* This component is checkpointable */
|
||||
@ -63,3 +70,26 @@ mca_fs_base_component_2_0_0_t mca_fs_ufs_component = {
|
||||
.fsm_file_query = mca_fs_ufs_component_file_query, /* get priority and actions */
|
||||
.fsm_file_unquery = mca_fs_ufs_component_file_unquery, /* undo what was done by previous function */
|
||||
};
|
||||
|
||||
static int register_component(void)
|
||||
{
|
||||
mca_fs_ufs_priority = 10;
|
||||
(void) mca_base_component_var_register(&mca_fs_ufs_component.fsm_version,
|
||||
"priority", "Priority of the fs ufs component",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_fs_ufs_priority);
|
||||
|
||||
mca_fs_ufs_lock_algorithm = 0;
|
||||
(void) mca_base_component_var_register(&mca_fs_ufs_component.fsm_version,
|
||||
"lock_algorithm", "Locking algorithm used by the fs ufs component. "
|
||||
" 0: auto (default), 1: skip locking, 2: always lock entire file, "
|
||||
"3: lock only specific ranges",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_fs_ufs_lock_algorithm );
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -28,9 +28,11 @@
|
||||
#include <sys/stat.h>
|
||||
#include "mpi.h"
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/fs/base/base.h"
|
||||
#include "ompi/mca/fs/fs.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/info/info.h"
|
||||
#include "opal/util/path.h"
|
||||
|
||||
/*
|
||||
* file_open_ufs
|
||||
@ -97,5 +99,54 @@ mca_fs_ufs_file_open (struct ompi_communicator_t *comm,
|
||||
fh->f_stripe_size=0;
|
||||
fh->f_stripe_count=1;
|
||||
|
||||
/* Need to check for NFS here. If the file system is not NFS but a regular UFS file system,
|
||||
we do not need to enforce locking. A regular XFS or EXT4 file system can only be used
|
||||
within a single node, local environment, and in this case the OS will already ensure correct
|
||||
handling of file system blocks;
|
||||
*/
|
||||
|
||||
if ( FS_UFS_LOCK_AUTO == mca_fs_ufs_lock_algorithm ) {
|
||||
char *fstype=NULL;
|
||||
bool bret = opal_path_nfs ( (char *)filename, &fstype );
|
||||
|
||||
if ( false == bret ) {
|
||||
char *dir;
|
||||
mca_fs_base_get_parent_dir ( (char *)filename, &dir );
|
||||
bret = opal_path_nfs (dir, &fstype);
|
||||
free(dir);
|
||||
}
|
||||
|
||||
if ( true == bret ) {
|
||||
if ( 0 == strncasecmp(fstype, "nfs", sizeof("nfs")) ) {
|
||||
/* Based on my tests, only locking the entire file for all operations
|
||||
guarantueed for the entire teststuite to pass correctly. I would not
|
||||
be surprised, if depending on the NFS configuration that might not always
|
||||
be necessary, and the user can change that with an MCA parameter of this
|
||||
component. */
|
||||
fh->f_flags |= OMPIO_LOCK_ENTIRE_FILE;
|
||||
}
|
||||
else {
|
||||
fh->f_flags |= OMPIO_LOCK_NEVER;
|
||||
}
|
||||
}
|
||||
else {
|
||||
fh->f_flags |= OMPIO_LOCK_NEVER;
|
||||
}
|
||||
free (fstype);
|
||||
}
|
||||
else if ( FS_UFS_LOCK_NEVER == mca_fs_ufs_lock_algorithm ) {
|
||||
fh->f_flags |= OMPIO_LOCK_NEVER;
|
||||
}
|
||||
else if ( FS_UFS_LOCK_ENTIRE_FILE == mca_fs_ufs_lock_algorithm ) {
|
||||
fh->f_flags |= OMPIO_LOCK_ENTIRE_FILE;
|
||||
}
|
||||
else if ( FS_UFS_LOCK_RANGES == mca_fs_ufs_lock_algorithm ) {
|
||||
/* Nothing to be done. This is what the posix fbtl component would do
|
||||
anyway without additional information . */
|
||||
}
|
||||
else {
|
||||
opal_output ( 1, "Invalid value for mca_fs_ufs_lock_algorithm %d", mca_fs_ufs_lock_algorithm );
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -64,6 +64,10 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;
|
||||
#define OMPIO_CONTIGUOUS_FVIEW 0x00000010
|
||||
#define OMPIO_AGGREGATOR_IS_SET 0x00000020
|
||||
#define OMPIO_SHAREDFP_IS_SET 0x00000040
|
||||
#define OMPIO_LOCK_ENTIRE_FILE 0x00000080
|
||||
#define OMPIO_LOCK_NEVER 0x00000100
|
||||
#define OMPIO_LOCK_NOT_THIS_OP 0x00000200
|
||||
|
||||
|
||||
#define QUEUESIZE 2048
|
||||
#define MCA_IO_DEFAULT_FILE_VIEW_SIZE 4*1024*1024
|
||||
@ -121,6 +125,10 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;
|
||||
#define OMPIO_PROCS_IN_GROUP_TAG 1
|
||||
#define OMPIO_MERGE_THRESHOLD 0.5
|
||||
|
||||
|
||||
#define OMPIO_LOCK_ENTIRE_REGION 10
|
||||
#define OMPIO_LOCK_SELECTIVE 11
|
||||
|
||||
/*---------------------------*/
|
||||
|
||||
BEGIN_C_DECLS
|
||||
@ -216,6 +224,7 @@ struct mca_io_ompio_file_t {
|
||||
opal_info_t *f_info;
|
||||
int32_t f_flags;
|
||||
void *f_fs_ptr;
|
||||
int f_fs_block_size;
|
||||
int f_atomicity;
|
||||
size_t f_stripe_size;
|
||||
int f_stripe_count;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user