Merge pull request #1279 from edgargabriel/pr/plfs-fixes-2
fixes for the PLFS file system
Этот коммит содержится в:
Коммит
d827da5736
@ -28,74 +28,19 @@
|
|||||||
ssize_t mca_fbtl_plfs_preadv (mca_io_ompio_file_t *fh )
|
ssize_t mca_fbtl_plfs_preadv (mca_io_ompio_file_t *fh )
|
||||||
{
|
{
|
||||||
|
|
||||||
Plfs_fd *pfd = NULL;
|
Plfs_fd *pfd = fh->f_fs_ptr;
|
||||||
plfs_error_t plfs_ret;
|
plfs_error_t plfs_ret;
|
||||||
pfd = fh->f_fs_ptr;
|
|
||||||
ssize_t total_bytes_read=0;
|
ssize_t total_bytes_read=0;
|
||||||
|
int i;
|
||||||
int i, block=1;
|
ssize_t bytes_read;
|
||||||
struct iovec *iov = NULL;
|
|
||||||
int iov_count = 0;
|
|
||||||
OMPI_MPI_OFFSET_TYPE iov_offset = 0;
|
|
||||||
|
|
||||||
if (NULL == fh->f_io_array) {
|
if (NULL == fh->f_io_array) {
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
iov = (struct iovec *) malloc
|
|
||||||
(OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec));
|
|
||||||
if (NULL == iov) {
|
|
||||||
opal_output(1, "OUT OF MEMORY\n");
|
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
|
for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
|
||||||
if (0 == iov_count) {
|
plfs_ret = plfs_read( pfd, fh->f_io_array[i].memory_address, fh->f_io_array[i].length,
|
||||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
(off_t )fh->f_io_array[i].offset, &bytes_read );
|
||||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
|
||||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
|
||||||
iov_count ++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OMPIO_IOVEC_INITIAL_SIZE*block <= iov_count) {
|
|
||||||
block ++;
|
|
||||||
iov = (struct iovec *)realloc
|
|
||||||
(iov, OMPIO_IOVEC_INITIAL_SIZE * block *
|
|
||||||
sizeof(struct iovec));
|
|
||||||
if (NULL == iov) {
|
|
||||||
opal_output(1, "OUT OF MEMORY\n");
|
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fh->f_num_of_io_entries != i+1) {
|
|
||||||
if (((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
|
|
||||||
(OPAL_PTRDIFF_TYPE)fh->f_io_array[i].length) ==
|
|
||||||
(OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset) {
|
|
||||||
iov[iov_count].iov_base =
|
|
||||||
fh->f_io_array[i+1].memory_address;
|
|
||||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
|
||||||
iov_count ++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the total number of bytes to be read.
|
|
||||||
size_t bytes = 0;
|
|
||||||
for (int i = 0; i < iov_count; ++i) {
|
|
||||||
bytes += iov[i].iov_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allocate a temporary buffer to hold the data
|
|
||||||
char *buffer;
|
|
||||||
buffer = (char *) malloc (bytes);
|
|
||||||
if (buffer == NULL) {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the data
|
|
||||||
ssize_t bytes_read;
|
|
||||||
plfs_ret = plfs_read( pfd, buffer, bytes, iov_offset, &bytes_read );
|
|
||||||
if (PLFS_SUCCESS != plfs_ret) {
|
if (PLFS_SUCCESS != plfs_ret) {
|
||||||
opal_output(0, "fbtl_plfs_preadv: Error in plfs_read:\n%s\n", strplfserr(plfs_ret));
|
opal_output(0, "fbtl_plfs_preadv: Error in plfs_read:\n%s\n", strplfserr(plfs_ret));
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
@ -104,27 +49,6 @@ ssize_t mca_fbtl_plfs_preadv (mca_io_ompio_file_t *fh )
|
|||||||
if (bytes_read < 0)
|
if (bytes_read < 0)
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
total_bytes_read += bytes_read;
|
total_bytes_read += bytes_read;
|
||||||
// Copy the data from BUFFER into the memory specified by IOV
|
|
||||||
bytes = bytes_read;
|
|
||||||
for (int i = 0; i < iov_count; ++i) {
|
|
||||||
size_t copy = MIN (iov[i].iov_len, bytes);
|
|
||||||
(void) memcpy ((void *) iov[i].iov_base, (void *) buffer, copy);
|
|
||||||
buffer += copy;
|
|
||||||
bytes -= copy;
|
|
||||||
if (bytes == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
iov_count = 0;
|
|
||||||
if ( NULL != buffer ) {
|
|
||||||
free (buffer);
|
|
||||||
buffer=NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL != iov) {
|
|
||||||
free (iov);
|
|
||||||
iov = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return total_bytes_read;
|
return total_bytes_read;
|
||||||
|
@ -28,102 +28,26 @@
|
|||||||
|
|
||||||
ssize_t mca_fbtl_plfs_pwritev (mca_io_ompio_file_t *fh )
|
ssize_t mca_fbtl_plfs_pwritev (mca_io_ompio_file_t *fh )
|
||||||
{
|
{
|
||||||
Plfs_fd *pfd = NULL;
|
Plfs_fd *pfd = fh->f_fs_ptr;
|
||||||
plfs_error_t plfs_ret;
|
plfs_error_t plfs_ret;
|
||||||
pfd = fh->f_fs_ptr;
|
|
||||||
ssize_t total_bytes_written=0;
|
ssize_t total_bytes_written=0;
|
||||||
|
ssize_t bytes_written;
|
||||||
int i, block = 1;
|
int i;
|
||||||
struct iovec *iov = NULL;
|
|
||||||
int iov_count = 0;
|
|
||||||
OMPI_MPI_OFFSET_TYPE iov_offset = 0;
|
|
||||||
|
|
||||||
if (NULL == fh->f_io_array) {
|
if (NULL == fh->f_io_array) {
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
iov = (struct iovec *) malloc
|
|
||||||
(OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec));
|
|
||||||
if (NULL == iov) {
|
|
||||||
opal_output(1, "OUT OF MEMORY\n");
|
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
|
for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
|
||||||
if (0 == iov_count) {
|
plfs_ret = plfs_write( pfd, fh->f_io_array[i].memory_address,
|
||||||
iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
|
fh->f_io_array[i].length,
|
||||||
iov[iov_count].iov_len = fh->f_io_array[i].length;
|
(off_t) fh->f_io_array[i].offset,
|
||||||
iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
|
fh->f_rank, &bytes_written );
|
||||||
iov_count ++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OMPIO_IOVEC_INITIAL_SIZE*block <= iov_count) {
|
|
||||||
block ++;
|
|
||||||
iov = (struct iovec *)realloc
|
|
||||||
(iov, OMPIO_IOVEC_INITIAL_SIZE * block *
|
|
||||||
sizeof(struct iovec));
|
|
||||||
if (NULL == iov) {
|
|
||||||
opal_output(1, "OUT OF MEMORY\n");
|
|
||||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fh->f_num_of_io_entries != i+1) {
|
|
||||||
if (((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
|
|
||||||
(OPAL_PTRDIFF_TYPE)fh->f_io_array[i].length) ==
|
|
||||||
(OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset) {
|
|
||||||
iov[iov_count].iov_base =
|
|
||||||
fh->f_io_array[i+1].memory_address;
|
|
||||||
iov[iov_count].iov_len = fh->f_io_array[i+1].length;
|
|
||||||
iov_count ++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the total number of bytes to be written.
|
|
||||||
size_t bytes = 0;
|
|
||||||
for (int i = 0; i < iov_count; ++i) {
|
|
||||||
bytes += iov[i].iov_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allocate a temporary buffer to hold the data
|
|
||||||
char *buffer=NULL;
|
|
||||||
buffer = (char *) malloc (bytes);
|
|
||||||
if (buffer == NULL) {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy the data into BUFFER.
|
|
||||||
size_t to_copy = bytes;
|
|
||||||
char *bp = buffer;
|
|
||||||
for (int i = 0; i < iov_count; ++i) {
|
|
||||||
size_t copy = MIN (iov[i].iov_len, to_copy);
|
|
||||||
bp = mempcpy ((void *) bp, (void *) iov[i].iov_base, copy);
|
|
||||||
to_copy -= copy;
|
|
||||||
if (to_copy == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write the data
|
|
||||||
ssize_t bytes_written;
|
|
||||||
|
|
||||||
plfs_ret = plfs_write( pfd, buffer, bytes, iov_offset, 0, &bytes_written );
|
|
||||||
if (PLFS_SUCCESS != plfs_ret) {
|
if (PLFS_SUCCESS != plfs_ret) {
|
||||||
opal_output(0, "fbtl_plfs_pwritev: Error in plfs_write:\n%s\n", strplfserr(plfs_ret));
|
opal_output(0, "fbtl_plfs_pwritev: Error in plfs_write:\n%s\n", strplfserr(plfs_ret));
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
total_bytes_written += bytes_written;
|
total_bytes_written += bytes_written;
|
||||||
iov_count = 0;
|
|
||||||
if ( NULL != buffer ) {
|
|
||||||
free ( buffer );
|
|
||||||
buffer=NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL != iov) {
|
|
||||||
free (iov);
|
|
||||||
iov = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return total_bytes_written;
|
return total_bytes_written;
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
#include <plfs.h>
|
#include <plfs.h>
|
||||||
|
|
||||||
extern int mca_fs_plfs_priority;
|
extern int mca_fs_plfs_priority;
|
||||||
extern int mca_fs_plfs_num_hostdir;
|
|
||||||
|
|
||||||
BEGIN_C_DECLS
|
BEGIN_C_DECLS
|
||||||
|
|
||||||
|
@ -39,7 +39,6 @@ const char *mca_fs_plfs_component_version_string =
|
|||||||
static int plfs_register(void);
|
static int plfs_register(void);
|
||||||
|
|
||||||
int mca_fs_plfs_priority = 20;
|
int mca_fs_plfs_priority = 20;
|
||||||
int mca_fs_plfs_num_hostdir = -1;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Instantiate the public struct with all of our public information
|
* Instantiate the public struct with all of our public information
|
||||||
@ -77,12 +76,6 @@ plfs_register(void)
|
|||||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||||
OPAL_INFO_LVL_9,
|
OPAL_INFO_LVL_9,
|
||||||
MCA_BASE_VAR_SCOPE_READONLY, &mca_fs_plfs_priority);
|
MCA_BASE_VAR_SCOPE_READONLY, &mca_fs_plfs_priority);
|
||||||
mca_fs_plfs_num_hostdir = -1;
|
|
||||||
(void) mca_base_component_var_register(&mca_fs_plfs_component.fsm_version,
|
|
||||||
"num_hostdir", "number of host directories of a file over plfs",
|
|
||||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
|
||||||
OPAL_INFO_LVL_9,
|
|
||||||
MCA_BASE_VAR_SCOPE_READONLY, &mca_fs_plfs_num_hostdir);
|
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,9 @@ mca_fs_plfs_file_close (mca_io_ompio_file_t *fh)
|
|||||||
getcwd( wpath, sizeof(wpath) );
|
getcwd( wpath, sizeof(wpath) );
|
||||||
sprintf( wpath,"%s/%s",wpath,fh->f_filename );
|
sprintf( wpath,"%s/%s",wpath,fh->f_filename );
|
||||||
|
|
||||||
if(-1 == access(fh->f_filename, F_OK)) {
|
plfs_ret = plfs_access(wpath, F_OK);
|
||||||
|
if ( PLFS_SUCCESS != plfs_ret ) {
|
||||||
|
opal_output(0, "fs_plfs_file_close: Error in plfs_access:\n%s\n", strplfserr(plfs_ret));
|
||||||
return OMPI_ERROR; // file doesn't exist
|
return OMPI_ERROR; // file doesn't exist
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,7 +68,14 @@ mca_fs_plfs_file_close (mca_io_ompio_file_t *fh)
|
|||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
plfs_ret = plfs_close(fh->f_fs_ptr, 0, 0, amode ,NULL, &flags);
|
plfs_ret = plfs_sync(fh->f_fs_ptr);
|
||||||
|
if (PLFS_SUCCESS != plfs_ret) {
|
||||||
|
opal_output(0, "fs_plfs_file_close: Error in plfs_sync:\n%s\n", strplfserr(plfs_ret));
|
||||||
|
return OMPI_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
plfs_ret = plfs_close(fh->f_fs_ptr, fh->f_rank, 0, amode ,NULL, &flags);
|
||||||
if (PLFS_SUCCESS != plfs_ret) {
|
if (PLFS_SUCCESS != plfs_ret) {
|
||||||
opal_output(0, "fs_plfs_file_close: Error in plfs_close:\n%s\n", strplfserr(plfs_ret));
|
opal_output(0, "fs_plfs_file_close: Error in plfs_close:\n%s\n", strplfserr(plfs_ret));
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
|
@ -53,8 +53,6 @@ mca_fs_plfs_file_open (struct ompi_communicator_t *comm,
|
|||||||
plfs_error_t plfs_ret;
|
plfs_error_t plfs_ret;
|
||||||
Plfs_fd *pfd = NULL;
|
Plfs_fd *pfd = NULL;
|
||||||
char wpath[1024];
|
char wpath[1024];
|
||||||
size_t len = sizeof(int);
|
|
||||||
char key[] = "num_hostdirs";
|
|
||||||
|
|
||||||
rank = ompi_comm_rank ( comm );
|
rank = ompi_comm_rank ( comm );
|
||||||
|
|
||||||
@ -89,7 +87,7 @@ mca_fs_plfs_file_open (struct ompi_communicator_t *comm,
|
|||||||
if (access_mode & MPI_MODE_CREATE)
|
if (access_mode & MPI_MODE_CREATE)
|
||||||
amode = amode | O_CREAT;
|
amode = amode | O_CREAT;
|
||||||
|
|
||||||
plfs_ret = plfs_open( &pfd, wpath, amode, 0, perm, NULL );
|
plfs_ret = plfs_open( &pfd, wpath, amode, fh->f_rank, perm, NULL );
|
||||||
fh->f_fs_ptr = pfd;
|
fh->f_fs_ptr = pfd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +97,7 @@ mca_fs_plfs_file_open (struct ompi_communicator_t *comm,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (0 != rank) {
|
if (0 != rank) {
|
||||||
plfs_ret = plfs_open( &pfd, wpath, amode, 0, perm, NULL );
|
plfs_ret = plfs_open( &pfd, wpath, amode, fh->f_rank, perm, NULL );
|
||||||
if (PLFS_SUCCESS != plfs_ret) {
|
if (PLFS_SUCCESS != plfs_ret) {
|
||||||
opal_output(0, "fs_plfs_file_open: Error in plfs_open:\n%s\n", strplfserr(plfs_ret));
|
opal_output(0, "fs_plfs_file_open: Error in plfs_open:\n%s\n", strplfserr(plfs_ret));
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
@ -109,12 +107,5 @@ mca_fs_plfs_file_open (struct ompi_communicator_t *comm,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mca_fs_plfs_num_hostdir > 0) {
|
|
||||||
plfs_ret = plfs_setxattr( pfd, &mca_fs_plfs_num_hostdir, key, len );
|
|
||||||
if (PLFS_SUCCESS != plfs_ret) {
|
|
||||||
opal_output(0, "fs_plfs_file_open: Error in plfs_setxattr:\n%s\n", strplfserr(plfs_ret));
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user