1
1

Merge pull request #8297 from edgargabriel/pr/v4.1.x-ompio-sync

ompio: resync v4.1 branch to master
Этот коммит содержится в:
Raghu Raja 2020-12-18 08:40:08 -08:00 коммит произвёл GitHub
родитель 7fd4f3261d b6c0baccdd
Коммит 7de39931a3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
37 изменённых файлов: 1093 добавлений и 622 удалений

4
NEWS
Просмотреть файл

@ -64,6 +64,10 @@ included in the vX.Y.Z section and be denoted as:
Both components are off by default and can be enabled by specifying
"mpirun --mca coll_adapt_priority 100 --mca coll_han_priority 100 ...".
We intend to enable both by default in Open MPI 5.0.
- OMPIO is now the default for MPI-IO on all filesystems, including
Lustre (prior to this, ROMIO was the default for Lustre). Many
thanks to Mark Dixon for identifying MPI I/O issues and providing
access to Lustre systems for testing.
- Updates for macOS Big Sur. Thanks to FX Coudert for reporting this
issue and pointing to a solution.
- Minor MPI one-sided RDMA performance improvements.

Просмотреть файл

@ -25,6 +25,7 @@ headers = \
common_ompio_aggregators.h \
common_ompio_print_queue.h \
common_ompio_request.h \
common_ompio_buffer.h \
common_ompio.h
sources = \
@ -34,6 +35,7 @@ sources = \
common_ompio_file_open.c \
common_ompio_file_view.c \
common_ompio_file_read.c \
common_ompio_buffer.c \
common_ompio_file_write.c
@ -74,10 +76,6 @@ else
ompidir = $(includedir)
endif
if OPAL_cuda_support
headers += common_ompio_cuda.h
sources += common_ompio_cuda.c
endif
# These two rules will sym link the "noinst" libtool library filename
# to the installable libtool library filename in the case where we are

Просмотреть файл

@ -29,7 +29,6 @@
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "ompi/errhandler/errhandler.h"
#include "opal/threads/mutex.h"
#include "ompi/file/file.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/fs/fs.h"
@ -66,7 +65,8 @@
#define OMPIO_LOCK_ENTIRE_FILE 0x00000080
#define OMPIO_LOCK_NEVER 0x00000100
#define OMPIO_LOCK_NOT_THIS_OP 0x00000200
#define OMPIO_DATAREP_NATIVE 0x00000400
#define OMPIO_COLLECTIVE_OP 0x00000800
#define OMPIO_ROOT 0
@ -158,7 +158,8 @@ struct ompio_file_t {
ompi_communicator_t *f_comm;
const char *f_filename;
char *f_datarep;
opal_convertor_t *f_convertor;
opal_convertor_t *f_mem_convertor;
opal_convertor_t *f_file_convertor;
opal_info_t *f_info;
int32_t f_flags;
void *f_fs_ptr;
@ -255,10 +256,16 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at (ompio_file_t *fh, OMPI_MPI_O
const void *buf, int count, struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int mca_common_ompio_file_write_all (ompio_file_t *fh, const void *buf,
int count, struct ompi_datatype_t *datatype,
ompi_status_public_t *status);
OMPI_DECLSPEC int mca_common_ompio_file_write_at_all (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset, const void *buf,
int count, struct ompi_datatype_t *datatype,
ompi_status_public_t *status);
OMPI_DECLSPEC int mca_common_ompio_file_iwrite_all (ompio_file_t *fp, const void *buf,
int count, struct ompi_datatype_t *datatype, ompi_request_t **request);
OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MPI_OFFSET_TYPE offset, const void *buf,
int count, struct ompi_datatype_t *datatype, ompi_request_t **request);
@ -266,7 +273,8 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MP
OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
size_t bytes_per_cycle, size_t max_data, uint32_t iov_count,
struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
size_t *spc );
size_t *spc, mca_common_ompio_io_array_t **io_array,
int *num_io_entries );
OMPI_DECLSPEC int mca_common_ompio_file_read (ompio_file_t *fh, void *buf, int count,
@ -283,10 +291,16 @@ OMPI_DECLSPEC int mca_common_ompio_file_iread_at (ompio_file_t *fh, OMPI_MPI_OFF
void *buf, int count, struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int mca_common_ompio_file_read_all (ompio_file_t *fh, void *buf, int count, struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
OMPI_DECLSPEC int mca_common_ompio_file_read_at_all (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset,
void *buf, int count, struct ompi_datatype_t *datatype,
ompi_status_public_t * status);
OMPI_DECLSPEC int mca_common_ompio_file_iread_all (ompio_file_t *fp, void *buf, int count, struct ompi_datatype_t *datatype,
ompi_request_t **request);
OMPI_DECLSPEC int mca_common_ompio_file_iread_at_all (ompio_file_t *fp, OMPI_MPI_OFFSET_TYPE offset,
void *buf, int count, struct ompi_datatype_t *datatype,
ompi_request_t **request);
@ -319,6 +333,7 @@ OMPI_DECLSPEC int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
int count,
const void *buf,
size_t *max_data,
opal_convertor_t *convertor,
struct iovec **iov,
uint32_t *iov_count);

Просмотреть файл

@ -1303,12 +1303,14 @@ int mca_common_ompio_prepare_to_group(ompio_file_t *fh,
fh->f_comm);
if ( OMPI_SUCCESS != ret ) {
opal_output (1, "mca_common_ompio_prepare_to_group: error in ompi_fcoll_base_coll_allgather_array\n");
free(start_offsets_lens_tmp);
goto exit;
}
end_offsets_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
if (NULL == end_offsets_tmp) {
opal_output (1, "OUT OF MEMORY\n");
goto exit;
free(start_offsets_lens_tmp);
return OMPI_ERR_OUT_OF_RESOURCE;
}
for( k = 0 ; k < fh->f_init_procs_per_group; k++){
end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
@ -1333,14 +1335,12 @@ int mca_common_ompio_prepare_to_group(ompio_file_t *fh,
if (NULL == aggr_bytes_per_group_tmp) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
free(end_offsets_tmp);
goto exit;
}
decision_list_tmp = (int* )malloc (fh->f_init_num_aggrs * sizeof(int));
if (NULL == decision_list_tmp) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
free(end_offsets_tmp);
if (NULL != aggr_bytes_per_group_tmp) {
free(aggr_bytes_per_group_tmp);
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -27,18 +27,20 @@
#include "opal/mca/allocator/allocator.h"
#include "opal/mca/allocator/base/base.h"
#include "common_ompio.h"
#include "common_ompio_cuda.h"
#include "common_ompio_buffer.h"
static opal_mutex_t mca_common_ompio_cuda_mutex; /* lock for thread safety */
static opal_mutex_t mca_common_ompio_buffer_mutex; /* lock for thread safety */
static mca_allocator_base_component_t* mca_common_ompio_allocator_component=NULL;
static mca_allocator_base_module_t* mca_common_ompio_allocator=NULL;
static int32_t mca_common_ompio_cuda_init = 0;
//static opal_atomic_int32_t mca_common_ompio_buffer_init = 0;
static int32_t mca_common_ompio_buffer_init = 0;
static int32_t mca_common_ompio_pagesize=4096;
static void* mca_common_ompio_cuda_alloc_seg ( void *ctx, size_t *size );
static void mca_common_ompio_cuda_free_seg ( void *ctx, void *buf );
static void* mca_common_ompio_buffer_alloc_seg ( void *ctx, size_t *size );
static void mca_common_ompio_buffer_free_seg ( void *ctx, void *buf );
#if OPAL_CUDA_SUPPORT
void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf, int *is_gpu,
int *is_managed)
{
@ -57,8 +59,9 @@ void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf, int *is
return;
}
#endif
static void* mca_common_ompio_cuda_alloc_seg ( void*ctx, size_t *size )
static void* mca_common_ompio_buffer_alloc_seg ( void*ctx, size_t *size )
{
char *buf=NULL;
size_t realsize, numpages;
@ -67,64 +70,67 @@ static void* mca_common_ompio_cuda_alloc_seg ( void*ctx, size_t *size )
realsize = numpages * mca_common_ompio_pagesize;
buf = malloc ( realsize);
#if OPAL_CUDA_SUPPORT
if ( NULL != buf ) {
mca_common_cuda_register ( ( char *)buf, realsize, NULL );
}
#endif
*size = realsize;
return buf;
}
static void mca_common_ompio_cuda_free_seg ( void *ctx, void *buf )
/* Allocator segment-release callback, the counterpart of
 * mca_common_ompio_buffer_alloc_seg(). When Open MPI is built with CUDA
 * support, the segment was registered with the CUDA runtime at
 * allocation time, so it must be unregistered here before being
 * returned to the system. The allocator context (ctx) is unused. */
static void mca_common_ompio_buffer_free_seg ( void *ctx, void *buf )
{
if ( NULL != buf ) {
#if OPAL_CUDA_SUPPORT
/* undo the mca_common_cuda_register() done in the alloc callback */
mca_common_cuda_unregister ( (char *) buf, NULL );
#endif
free ( buf );
}
return;
}
int mca_common_ompio_cuda_alloc_init ( void )
int mca_common_ompio_buffer_alloc_init ( void )
{
bool thread_safe=true;
if(OPAL_THREAD_ADD_FETCH32(&mca_common_ompio_cuda_init, 1) > 1)
if(OPAL_THREAD_ADD_FETCH32(&mca_common_ompio_buffer_init, 1) > 1)
return OMPI_SUCCESS;
/* initialize static objects */
OBJ_CONSTRUCT(&mca_common_ompio_cuda_mutex, opal_mutex_t);
OBJ_CONSTRUCT(&mca_common_ompio_buffer_mutex, opal_mutex_t);
OPAL_THREAD_LOCK (&mca_common_ompio_cuda_mutex );
OPAL_THREAD_LOCK (&mca_common_ompio_buffer_mutex );
/* lookup name of the allocator to use */
if(NULL == (mca_common_ompio_allocator_component = mca_allocator_component_lookup("basic"))) {
OPAL_THREAD_UNLOCK(&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK(&mca_common_ompio_buffer_mutex);
return OMPI_ERR_BUFFER;
}
/* create an instance of the allocator */
mca_common_ompio_allocator = mca_common_ompio_allocator_component->allocator_init(thread_safe,
mca_common_ompio_cuda_alloc_seg,
mca_common_ompio_cuda_free_seg,
mca_common_ompio_buffer_alloc_seg,
mca_common_ompio_buffer_free_seg,
NULL);
if(NULL == mca_common_ompio_allocator) {
OPAL_THREAD_UNLOCK(&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK(&mca_common_ompio_buffer_mutex);
return OMPI_ERR_BUFFER;
}
// mca_common_ompio_pagesize = sysconf(_SC_PAGESIZE);
mca_common_ompio_pagesize = opal_getpagesize();
OPAL_THREAD_UNLOCK(&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK(&mca_common_ompio_buffer_mutex);
return OMPI_SUCCESS;
}
int mca_common_ompio_cuda_alloc_fini ( void )
int mca_common_ompio_buffer_alloc_fini ( void )
{
if ( NULL != mca_common_ompio_allocator ) {
OPAL_THREAD_LOCK (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_LOCK (&mca_common_ompio_buffer_mutex);
mca_common_ompio_allocator->alc_finalize(mca_common_ompio_allocator);
mca_common_ompio_allocator=NULL;
OPAL_THREAD_UNLOCK (&mca_common_ompio_cuda_mutex);
OBJ_DESTRUCT (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK (&mca_common_ompio_buffer_mutex);
OBJ_DESTRUCT (&mca_common_ompio_buffer_mutex);
}
return OMPI_SUCCESS;
@ -134,31 +140,31 @@ void *mca_common_ompio_alloc_buf ( ompio_file_t *fh, size_t bufsize )
{
char *tmp=NULL;
if ( !mca_common_ompio_cuda_init ){
mca_common_ompio_cuda_alloc_init ();
if ( !mca_common_ompio_buffer_init ){
mca_common_ompio_buffer_alloc_init ();
}
OPAL_THREAD_LOCK (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_LOCK (&mca_common_ompio_buffer_mutex);
tmp = mca_common_ompio_allocator->alc_alloc (mca_common_ompio_allocator,
bufsize, 0 );
OPAL_THREAD_UNLOCK (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK (&mca_common_ompio_buffer_mutex);
return tmp;
}
void mca_common_ompio_release_buf ( ompio_file_t *fh, void *buf )
{
if ( !mca_common_ompio_cuda_init ){
if ( !mca_common_ompio_buffer_init ){
/* Should not happen. You can not release a buf without
** having it allocated first.
*/
opal_output (1, "error in mca_common_ompio_release_buf: allocator not initialized\n");
}
OPAL_THREAD_LOCK (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_LOCK (&mca_common_ompio_buffer_mutex);
mca_common_ompio_allocator->alc_free (mca_common_ompio_allocator,
buf);
OPAL_THREAD_UNLOCK (&mca_common_ompio_cuda_mutex);
OPAL_THREAD_UNLOCK (&mca_common_ompio_buffer_mutex);
return;
}

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -22,9 +22,9 @@
#define MCA_COMMON_OMPIO_CUDA_H
#define OMPIO_CUDA_PREPARE_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_decoded_iov,_iov_count){ \
opal_convertor_clone ( _fh->f_convertor, _convertor, 0); \
opal_convertor_prepare_for_send ( _convertor, &(_datatype->super), _count, _buf );\
#define OMPIO_PREPARE_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_decoded_iov,_iov_count){ \
OBJ_CONSTRUCT( _convertor, opal_convertor_t); \
opal_convertor_copy_and_prepare_for_send ( _fh->f_file_convertor, &(_datatype->super), _count, _buf, CONVERTOR_SEND_CONVERSION, _convertor ); \
opal_convertor_get_packed_size( _convertor, &_max_data ); \
_tbuf = mca_common_ompio_alloc_buf (_fh, _max_data); \
if ( NULL == _tbuf ) { \
@ -40,11 +40,30 @@
_decoded_iov->iov_len = _max_data; \
_iov_count=1;}
#define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_decoded_iov,_iov_count){ \
OBJ_CONSTRUCT( _convertor, opal_convertor_t); \
opal_convertor_copy_and_prepare_for_recv ( _fh->f_file_convertor, &(_datatype->super), _count, _buf, 0, _convertor ); \
opal_convertor_get_packed_size( _convertor, &_max_data ); \
_tbuf = mca_common_ompio_alloc_buf (_fh, _max_data); \
if ( NULL == _tbuf ) { \
opal_output(1, "common_ompio: error allocating memory\n"); \
return OMPI_ERR_OUT_OF_RESOURCE; \
} \
_decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \
if ( NULL == _decoded_iov ) { \
opal_output(1, "common_ompio: could not allocate memory.\n"); \
return OMPI_ERR_OUT_OF_RESOURCE; \
} \
_decoded_iov->iov_base = _tbuf; \
_decoded_iov->iov_len = _max_data; \
_iov_count=1;}
#if OPAL_CUDA_SUPPORT
void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf,
int *is_gpu, int *is_managed);
int mca_common_ompio_cuda_alloc_init ( void );
int mca_common_ompio_cuda_alloc_fini ( void );
#endif
int mca_common_ompio_buffer_alloc_init ( void );
int mca_common_ompio_buffer_alloc_fini ( void );
void* mca_common_ompio_alloc_buf ( ompio_file_t *fh, size_t bufsize);

Просмотреть файл

@ -75,7 +75,8 @@ int mca_common_ompio_file_open (ompi_communicator_t *comm,
ompio_fh->f_rank = ompi_comm_rank (comm);
ompio_fh->f_size = ompi_comm_size (comm);
remote_arch = opal_local_arch;
ompio_fh->f_convertor = opal_convertor_create (remote_arch, 0);
ompio_fh->f_mem_convertor = opal_convertor_create (remote_arch, 0);
ompio_fh->f_file_convertor = opal_convertor_create (remote_arch, 0);
if ( true == use_sharedfp ) {
ret = ompi_comm_dup (comm, &ompio_fh->f_comm);
@ -323,17 +324,23 @@ int mca_common_ompio_file_close (ompio_file_t *ompio_fh)
ompio_fh->f_decoded_iov = NULL;
}
if (NULL != ompio_fh->f_convertor) {
free (ompio_fh->f_convertor);
ompio_fh->f_convertor = NULL;
if (NULL != ompio_fh->f_mem_convertor) {
opal_convertor_cleanup (ompio_fh->f_mem_convertor);
free (ompio_fh->f_mem_convertor);
ompio_fh->f_mem_convertor = NULL;
}
if (NULL != ompio_fh->f_file_convertor) {
opal_convertor_cleanup (ompio_fh->f_file_convertor);
free (ompio_fh->f_file_convertor);
ompio_fh->f_file_convertor = NULL;
}
if (NULL != ompio_fh->f_datarep) {
free (ompio_fh->f_datarep);
ompio_fh->f_datarep = NULL;
}
if ( NULL != ompio_fh->f_coll_write_time ) {
free ( ompio_fh->f_coll_write_time );
ompio_fh->f_coll_write_time = NULL;
@ -387,10 +394,10 @@ int mca_common_ompio_file_get_position (ompio_file_t *fh,
if ( 0 == fh->f_view_extent ||
0 == fh->f_view_size ||
0 == fh->f_etype_size ) {
/* not sure whether we should raise an error here */
*offset = 0;
return OMPI_SUCCESS;
}
/* No. of copies of the entire file view */
off = (fh->f_offset - fh->f_disp)/fh->f_view_extent;
@ -564,6 +571,7 @@ int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
int count,
const void *buf,
size_t *max_data,
opal_convertor_t *conv,
struct iovec **iov,
uint32_t *iovec_count)
{
@ -578,7 +586,7 @@ int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
size_t temp_data;
opal_convertor_clone (fh->f_convertor, &convertor, 0);
opal_convertor_clone (conv, &convertor, 0);
if (OMPI_SUCCESS != opal_convertor_prepare_for_send (&convertor,
&(datatype->super),
@ -674,7 +682,8 @@ int mca_common_ompio_decode_datatype (struct ompio_file_t *fh,
}
free (temp_iov);
opal_convertor_cleanup (&convertor);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -33,12 +33,10 @@
#include "common_ompio.h"
#include "common_ompio_request.h"
#include "common_ompio_buffer.h"
#include <unistd.h>
#include <math.h>
#if OPAL_CUDA_SUPPORT
#include "common_ompio_cuda.h"
#endif
/* Read and write routines are split into two interfaces.
** The
@ -90,39 +88,52 @@ int mca_common_ompio_file_read (ompio_file_t *fh,
return ret;
}
bool need_to_copy = false;
opal_convertor_t convertor;
#if OPAL_CUDA_SUPPORT
int is_gpu, is_managed;
opal_convertor_t convertor;
mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
if ( is_gpu && !is_managed ) {
need_to_copy = true;
}
#endif
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* only need to copy if any of these conditions are given:
1. buffer is an unmanaged CUDA buffer (checked above).
2. Datarepresentation is anything other than 'native' and
3. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
need_to_copy = true;
}
if ( need_to_copy ) {
char *tbuf=NULL;
OMPIO_CUDA_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
}
OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
}
else {
mca_common_ompio_decode_datatype (fh,
datatype,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
}
#else
mca_common_ompio_decode_datatype (fh,
datatype,
count,
buf,
&max_data,
&decoded_iov,
&iov_count);
#endif
if ( 0 < max_data && 0 == fh->f_iov_count ) {
if ( MPI_STATUS_IGNORE != status ) {
status->_ucount = 0;
}
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
return OMPI_SUCCESS;
}
@ -152,7 +163,9 @@ int mca_common_ompio_file_read (ompio_file_t *fh,
&i,
&j,
&total_bytes_read,
&spc);
&spc,
&fh->f_io_array,
&fh->f_num_of_io_entries);
if (fh->f_num_of_io_entries) {
ret_code = fh->f_fbtl->fbtl_preadv (fh);
@ -168,15 +181,14 @@ int mca_common_ompio_file_read (ompio_file_t *fh,
}
}
#if OPAL_CUDA_SUPPORT
if ( is_gpu && !is_managed ) {
if ( need_to_copy ) {
size_t pos=0;
opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
opal_convertor_cleanup (&convertor);
mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
}
#endif
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
@ -255,13 +267,32 @@ int mca_common_ompio_file_iread (ompio_file_t *fh,
int i = 0; /* index into the decoded iovec of the buffer */
int j = 0; /* index into the file view iovec */
bool need_to_copy = false;
#if OPAL_CUDA_SUPPORT
int is_gpu, is_managed;
mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
if ( is_gpu && !is_managed ) {
need_to_copy = true;
}
#endif
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* only need to copy if any of these conditions are given:
1. buffer is an unmanaged CUDA buffer (checked above).
2. Datarepresentation is anything other than 'native' and
3. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
need_to_copy = true;
}
if ( need_to_copy ) {
char *tbuf=NULL;
OMPIO_CUDA_PREPARE_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count);
OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count);
ompio_req->req_tbuf = tbuf;
ompio_req->req_size = max_data;
@ -272,23 +303,21 @@ int mca_common_ompio_file_iread (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
}
#else
mca_common_ompio_decode_datatype (fh,
datatype,
count,
buf,
&max_data,
&decoded_iov,
&iov_count);
#endif
if ( 0 < max_data && 0 == fh->f_iov_count ) {
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
ompio_req->req_ompi.req_status._ucount = 0;
ompi_request_complete (&ompio_req->req_ompi, false);
*request = (ompi_request_t *) ompio_req;
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
return OMPI_SUCCESS;
}
@ -305,7 +334,9 @@ int mca_common_ompio_file_iread (ompio_file_t *fh,
&i,
&j,
&total_bytes_read,
&spc);
&spc,
&fh->f_io_array,
&fh->f_num_of_io_entries);
if (fh->f_num_of_io_entries) {
fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
@ -372,6 +403,62 @@ int mca_common_ompio_file_iread_at (ompio_file_t *fh,
/* Infrastructure for collective operations */
/**
 * Blocking collective read.
 *
 * If the file view uses a non-native data representation (e.g.
 * external32) and the datatype actually requires conversion work
 * (i.e. it is not byte or char), the data is first read as raw bytes
 * into a temporary buffer and then unpacked into the user buffer via
 * the file convertor. Otherwise the fcoll component reads straight
 * into the user buffer.
 *
 * @param fh        open ompio file handle
 * @param buf       destination user buffer
 * @param count     number of elements of datatype to read
 * @param datatype  MPI datatype describing buf
 * @param status    status object (may be MPI_STATUS_IGNORE)
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_common_ompio_file_read_all (ompio_file_t *fh,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t * status)
{
int ret = OMPI_SUCCESS;
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* No need to check for GPU buffer for collective I/O.
Most algorithms copy data from aggregators, and send/recv
to/from GPU buffers works if ompi was compiled with GPU support.
If the individual fcoll component is used: there are no aggregators
in that concept. However, since they call common_ompio_file_write,
CUDA buffers are handled by that routine.
Thus, we only check for
1. Datarepresentation is anything other than 'native' and
2. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
size_t pos=0, max_data=0;
char *tbuf=NULL;
opal_convertor_t convertor;
struct iovec *decoded_iov = NULL;
uint32_t iov_count = 0;
/* NOTE: this macro allocates tbuf and decoded_iov and returns
OMPI_ERR_OUT_OF_RESOURCE from this function on allocation failure. */
OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
/* read the packed representation as raw bytes */
ret = fh->f_fcoll->fcoll_file_read_all (fh,
decoded_iov->iov_base,
decoded_iov->iov_len,
MPI_BYTE,
status);
/* unpack/convert the raw bytes into the user buffer */
opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
opal_convertor_cleanup (&convertor);
mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
}
else {
/* native representation: read directly into the user buffer */
ret = fh->f_fcoll->fcoll_file_read_all (fh,
buf,
count,
datatype,
status);
}
return ret;
}
int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
@ -384,16 +471,41 @@ int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
mca_common_ompio_file_get_position (fh, &prev_offset );
mca_common_ompio_set_explicit_offset (fh, offset);
ret = fh->f_fcoll->fcoll_file_read_all (fh,
buf,
count,
datatype,
status);
ret = mca_common_ompio_file_read_all (fh,
buf,
count,
datatype,
status);
mca_common_ompio_set_explicit_offset (fh, prev_offset);
return ret;
}
/**
 * Nonblocking collective read.
 *
 * Dispatches to the fcoll component's native nonblocking collective
 * implementation when one is available; otherwise falls back to an
 * individual nonblocking read, which provides the same semantics.
 *
 * @param fp        open ompio file handle
 * @param buf       destination user buffer
 * @param count     number of elements of datatype to read
 * @param datatype  MPI datatype describing buf
 * @param request   returns the request to wait/test on
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_common_ompio_file_iread_all (ompio_file_t *fp,
void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request)
{
if ( NULL == fp->f_fcoll->fcoll_file_iread_all ) {
/* This fcoll component does not support nonblocking collective
I/O operations; emulate them with an individual nonblocking
I/O operation. */
return mca_common_ompio_file_iread (fp, buf, count, datatype, request);
}
return fp->f_fcoll->fcoll_file_iread_all (fp,
buf,
count,
datatype,
request);
}
int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
OMPI_MPI_OFFSET_TYPE offset,
void *buf,
@ -407,25 +519,17 @@ int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
mca_common_ompio_file_get_position (fp, &prev_offset );
mca_common_ompio_set_explicit_offset (fp, offset);
if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
ret = fp->f_fcoll->fcoll_file_iread_all (fp,
buf,
count,
datatype,
request);
}
else {
/* this fcoll component does not support non-blocking
collective I/O operations. We fake it with
individual non-blocking I/O operations. */
ret = mca_common_ompio_file_iread ( fp, buf, count, datatype, request );
}
ret = mca_common_ompio_file_iread_all (fp,
buf,
count,
datatype,
request);
mca_common_ompio_set_explicit_offset (fp, prev_offset);
return ret;
}
int mca_common_ompio_set_explicit_offset (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset)
{

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* Copyright (c) 2017-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
@ -91,6 +91,12 @@ int mca_common_ompio_set_view (ompio_file_t *fh,
fh->f_datarep = NULL;
}
if (NULL != fh->f_file_convertor) {
opal_convertor_cleanup (fh->f_file_convertor);
free (fh->f_file_convertor);
fh->f_file_convertor = NULL;
}
/* Reset the flags first */
if ( fh->f_flags & OMPIO_CONTIGUOUS_FVIEW ) {
fh->f_flags &= ~OMPIO_CONTIGUOUS_FVIEW;
@ -98,9 +104,24 @@ int mca_common_ompio_set_view (ompio_file_t *fh,
if ( fh->f_flags & OMPIO_UNIFORM_FVIEW ) {
fh->f_flags &= ~OMPIO_UNIFORM_FVIEW;
}
if ( fh->f_flags & OMPIO_DATAREP_NATIVE ) {
fh->f_flags &= ~OMPIO_DATAREP_NATIVE;
}
fh->f_datarep = strdup (datarep);
datatype_duplicate (filetype, &fh->f_orig_filetype );
if ( !(strcmp(datarep, "external32") && strcmp(datarep, "EXTERNAL32"))) {
fh->f_file_convertor = malloc ( sizeof(struct opal_convertor_t) );
if ( NULL == fh->f_file_convertor ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
opal_convertor_clone (ompi_mpi_external32_convertor, fh->f_file_convertor, 0);
}
else {
fh->f_file_convertor = opal_convertor_create (opal_local_arch, 0);
fh->f_flags |= OMPIO_DATAREP_NATIVE;
}
datatype_duplicate (filetype, &fh->f_orig_filetype );
opal_datatype_get_extent(&filetype->super, &lb, &ftype_extent);
opal_datatype_type_size (&filetype->super, &ftype_size);
@ -129,6 +150,7 @@ int mca_common_ompio_set_view (ompio_file_t *fh,
1,
NULL,
&max_data,
fh->f_file_convertor,
&fh->f_decoded_iov,
&fh->f_iov_count);

Просмотреть файл

@ -31,13 +31,10 @@
#include "common_ompio.h"
#include "common_ompio_request.h"
#include "common_ompio_buffer.h"
#include <unistd.h>
#include <math.h>
#if OPAL_CUDA_SUPPORT
#include "common_ompio_cuda.h"
#endif
int mca_common_ompio_file_write (ompio_file_t *fh,
const void *buf,
int count,
@ -72,16 +69,34 @@ int mca_common_ompio_file_write (ompio_file_t *fh,
return ret;
}
bool need_to_copy = false;
#if OPAL_CUDA_SUPPORT
int is_gpu, is_managed;
mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
if ( is_gpu && !is_managed ) {
need_to_copy = true;
}
#endif
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* only need to copy if any of these conditions are given:
1. buffer is an unmanaged CUDA buffer (checked above).
2. Datarepresentation is anything other than 'native' and
3. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
need_to_copy = true;
}
if ( need_to_copy ) {
size_t pos=0;
char *tbuf=NULL;
opal_convertor_t convertor;
OMPIO_CUDA_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
opal_convertor_cleanup ( &convertor);
}
@ -91,22 +106,19 @@ int mca_common_ompio_file_write (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
}
#else
mca_common_ompio_decode_datatype (fh,
datatype,
count,
buf,
&max_data,
&decoded_iov,
&iov_count);
#endif
if ( 0 < max_data && 0 == fh->f_iov_count ) {
if ( MPI_STATUS_IGNORE != status ) {
status->_ucount = 0;
}
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
return OMPI_SUCCESS;
}
@ -134,7 +146,9 @@ int mca_common_ompio_file_write (ompio_file_t *fh,
&i,
&j,
&total_bytes_written,
&spc);
&spc,
&fh->f_io_array,
&fh->f_num_of_io_entries);
if (fh->f_num_of_io_entries) {
ret_code =fh->f_fbtl->fbtl_pwritev (fh);
@ -149,11 +163,11 @@ int mca_common_ompio_file_write (ompio_file_t *fh,
fh->f_io_array = NULL;
}
}
#if OPAL_CUDA_SUPPORT
if ( is_gpu && !is_managed ) {
if ( need_to_copy ) {
mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
}
#endif
if (NULL != decoded_iov) {
free (decoded_iov);
@ -228,16 +242,34 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh,
int i = 0; /* index into the decoded iovec of the buffer */
int j = 0; /* index into the file view iovec */
bool need_to_copy = false;
#if OPAL_CUDA_SUPPORT
int is_gpu, is_managed;
mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
if ( is_gpu && !is_managed ) {
need_to_copy = true;
}
#endif
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* only need to copy if any of these conditions are given:
1. buffer is an unmanaged CUDA buffer (checked above).
2. Datarepresentation is anything other than 'native' and
3. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
need_to_copy = true;
}
if ( need_to_copy ) {
size_t pos=0;
char *tbuf=NULL;
opal_convertor_t convertor;
OMPIO_CUDA_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
opal_convertor_cleanup (&convertor);
@ -250,23 +282,21 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
}
#else
mca_common_ompio_decode_datatype (fh,
datatype,
count,
buf,
&max_data,
&decoded_iov,
&iov_count);
#endif
if ( 0 < max_data && 0 == fh->f_iov_count ) {
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
ompio_req->req_ompi.req_status._ucount = 0;
ompi_request_complete (&ompio_req->req_ompi, false);
*request = (ompi_request_t *) ompio_req;
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
return OMPI_SUCCESS;
}
@ -283,7 +313,9 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh,
&i,
&j,
&total_bytes_written,
&spc);
&spc,
&fh->f_io_array,
&fh->f_num_of_io_entries);
if (fh->f_num_of_io_entries) {
fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *) ompio_req);
@ -348,6 +380,62 @@ int mca_common_ompio_file_iwrite_at (ompio_file_t *fh,
/* Collective operations */
/******************************************************************/
/**
 * Blocking collective write.
 *
 * If the file view uses a non-native data representation (e.g.
 * external32) and the datatype actually requires conversion work
 * (i.e. it is not byte or char), the user data is first packed into a
 * temporary buffer through the file convertor and the packed bytes are
 * written. Otherwise the fcoll component writes straight from the user
 * buffer.
 *
 * @param fh        open ompio file handle
 * @param buf       source user buffer
 * @param count     number of elements of datatype to write
 * @param datatype  MPI datatype describing buf
 * @param status    status object (may be MPI_STATUS_IGNORE)
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_common_ompio_file_write_all (ompio_file_t *fh,
const void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_status_public_t *status)
{
int ret = OMPI_SUCCESS;
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
!(datatype == &ompi_mpi_byte.dt ||
datatype == &ompi_mpi_char.dt )) {
/* No need to check for GPU buffer for collective I/O.
Most algorithms first copy data to aggregators, and send/recv
to/from GPU buffers works if ompi was compiled with GPU support.
If the individual fcoll component is used: there are no aggregators
in that concept. However, since they call common_ompio_file_write,
CUDA buffers are handled by that routine.
Thus, we only check for
1. Datarepresentation is anything other than 'native' and
2. datatype is not byte or char (i.e. it does require some actual
work to be done, e.g. for external32).
*/
size_t pos=0, max_data=0;
char *tbuf=NULL;
opal_convertor_t convertor;
struct iovec *decoded_iov = NULL;
uint32_t iov_count = 0;
/* NOTE: this macro allocates tbuf and decoded_iov and returns
OMPI_ERR_OUT_OF_RESOURCE from this function on allocation failure. */
OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
/* pack/convert the user data into the temporary buffer */
opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos );
opal_convertor_cleanup ( &convertor);
/* write the packed representation as raw bytes */
ret = fh->f_fcoll->fcoll_file_write_all (fh,
decoded_iov->iov_base,
decoded_iov->iov_len,
MPI_BYTE,
status);
mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
if (NULL != decoded_iov) {
free (decoded_iov);
decoded_iov = NULL;
}
}
else {
/* native representation: write directly from the user buffer */
ret = fh->f_fcoll->fcoll_file_write_all (fh,
buf,
count,
datatype,
status);
}
return ret;
}
int mca_common_ompio_file_write_at_all (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
@ -361,30 +449,23 @@ int mca_common_ompio_file_write_at_all (ompio_file_t *fh,
mca_common_ompio_file_get_position (fh, &prev_offset );
mca_common_ompio_set_explicit_offset (fh, offset);
ret = fh->f_fcoll->fcoll_file_write_all (fh,
buf,
count,
datatype,
status);
ret = mca_common_ompio_file_write_all (fh,
buf,
count,
datatype,
status);
mca_common_ompio_set_explicit_offset (fh, prev_offset);
return ret;
}
int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp,
OMPI_MPI_OFFSET_TYPE offset,
const void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request)
int mca_common_ompio_file_iwrite_all (ompio_file_t *fp,
const void *buf,
int count,
struct ompi_datatype_t *datatype,
ompi_request_t **request)
{
int ret = OMPI_SUCCESS;
OMPI_MPI_OFFSET_TYPE prev_offset;
mca_common_ompio_file_get_position (fp, &prev_offset );
mca_common_ompio_set_explicit_offset (fp, offset);
if ( NULL != fp->f_fcoll->fcoll_file_iwrite_all ) {
ret = fp->f_fcoll->fcoll_file_iwrite_all (fp,
@ -400,18 +481,40 @@ int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp,
ret = mca_common_ompio_file_iwrite ( fp, buf, count, datatype, request );
}
return ret;
}
/**
 * Non-blocking collective write at an explicit file offset.
 *
 * Saves the current position, seeks to @offset, delegates to
 * mca_common_ompio_file_iwrite_all, then restores the saved position so
 * the individual file pointer is unaffected by the explicit-offset
 * operation.
 *
 * @param fp       open OMPIO file handle
 * @param offset   explicit offset (in etunits) to write at
 * @param buf      user buffer to write from
 * @param count    number of datatype elements in buf
 * @param datatype element datatype
 * @param request  output: the I/O request to wait on
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp,
                                         OMPI_MPI_OFFSET_TYPE offset,
                                         const void *buf,
                                         int count,
                                         struct ompi_datatype_t *datatype,
                                         ompi_request_t **request)
{
    OMPI_MPI_OFFSET_TYPE saved_pos;
    int rc;

    mca_common_ompio_file_get_position (fp, &saved_pos);
    mca_common_ompio_set_explicit_offset (fp, offset);

    rc = mca_common_ompio_file_iwrite_all (fp, buf, count, datatype, request);

    /* Restore the file pointer regardless of the outcome. */
    mca_common_ompio_set_explicit_offset (fp, saved_pos);
    return rc;
}
/* Helper function used by both read and write operations */
/**************************************************************/
int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
size_t bytes_per_cycle, size_t max_data, uint32_t iov_count,
struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
size_t *spc)
size_t *spc, mca_common_ompio_io_array_t **io_array,
int *num_io_entries)
{
ptrdiff_t disp;
int block = 1;
@ -424,7 +527,9 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
int k = 0; /* index into the io_array */
int i = *ii;
int j = *jj;
mca_common_ompio_io_array_t *f_io_array=NULL;
int f_num_io_entries=0;
sum_previous_length = fh->f_position_in_file_view;
if ((index == cycles-1) && (max_data % bytes_per_cycle)) {
@ -434,9 +539,9 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
bytes_to_write_in_cycle = bytes_per_cycle;
}
fh->f_io_array = (mca_common_ompio_io_array_t *)malloc
f_io_array = (mca_common_ompio_io_array_t *)malloc
(OMPIO_IOVEC_INITIAL_SIZE * sizeof (mca_common_ompio_io_array_t));
if (NULL == fh->f_io_array) {
if (NULL == f_io_array) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -445,10 +550,10 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
/* reallocate if needed */
if (OMPIO_IOVEC_INITIAL_SIZE*block <= k) {
block ++;
fh->f_io_array = (mca_common_ompio_io_array_t *)realloc
(fh->f_io_array, OMPIO_IOVEC_INITIAL_SIZE *
f_io_array = (mca_common_ompio_io_array_t *)realloc
(f_io_array, OMPIO_IOVEC_INITIAL_SIZE *
block * sizeof (mca_common_ompio_io_array_t));
if (NULL == fh->f_io_array) {
if (NULL == f_io_array) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -462,15 +567,15 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
disp = (ptrdiff_t)decoded_iov[i].iov_base +
(total_bytes_written - sum_previous_counts);
fh->f_io_array[k].memory_address = (IOVBASE_TYPE *)disp;
f_io_array[k].memory_address = (IOVBASE_TYPE *)disp;
if (decoded_iov[i].iov_len -
(total_bytes_written - sum_previous_counts) >=
bytes_to_write_in_cycle) {
fh->f_io_array[k].length = bytes_to_write_in_cycle;
f_io_array[k].length = bytes_to_write_in_cycle;
}
else {
fh->f_io_array[k].length = decoded_iov[i].iov_len -
f_io_array[k].length = decoded_iov[i].iov_len -
(total_bytes_written - sum_previous_counts);
}
@ -492,36 +597,36 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
disp = (ptrdiff_t)fh->f_decoded_iov[j].iov_base +
(fh->f_total_bytes - sum_previous_length);
fh->f_io_array[k].offset = (IOVBASE_TYPE *)(intptr_t)(disp + fh->f_offset);
f_io_array[k].offset = (IOVBASE_TYPE *)(intptr_t)(disp + fh->f_offset);
if (! (fh->f_flags & OMPIO_CONTIGUOUS_FVIEW)) {
if (fh->f_decoded_iov[j].iov_len -
(fh->f_total_bytes - sum_previous_length)
< fh->f_io_array[k].length) {
fh->f_io_array[k].length = fh->f_decoded_iov[j].iov_len -
< f_io_array[k].length) {
f_io_array[k].length = fh->f_decoded_iov[j].iov_len -
(fh->f_total_bytes - sum_previous_length);
}
}
total_bytes_written += fh->f_io_array[k].length;
fh->f_total_bytes += fh->f_io_array[k].length;
bytes_to_write_in_cycle -= fh->f_io_array[k].length;
total_bytes_written += f_io_array[k].length;
fh->f_total_bytes += f_io_array[k].length;
bytes_to_write_in_cycle -= f_io_array[k].length;
k = k + 1;
}
fh->f_position_in_file_view = sum_previous_length;
fh->f_index_in_file_view = j;
fh->f_num_of_io_entries = k;
f_num_io_entries = k;
#if 0
if (fh->f_rank == 0) {
int d;
printf("*************************** %d\n", fh->f_num_of_io_entries);
printf("*************************** %d\n", f_num_io_entries);
for (d=0 ; d<fh->f_num_of_io_entries ; d++) {
for (d=0 ; d<f_num_of_entries ; d++) {
printf(" ADDRESS: %p OFFSET: %p LENGTH: %d prev_count=%ld prev_length=%ld\n",
fh->f_io_array[d].memory_address,
fh->f_io_array[d].offset,
fh->f_io_array[d].length,
f_io_array[d].memory_address,
f_io_array[d].offset,
f_io_array[d].length,
sum_previous_counts, sum_previous_length);
}
}
@ -530,7 +635,9 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
*jj = j;
*tbw = total_bytes_written;
*spc = sum_previous_counts;
*io_array = f_io_array;
*num_io_entries = f_num_io_entries;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -19,9 +19,7 @@
*/
#include "common_ompio_request.h"
#if OPAL_CUDA_SUPPORT
#include "common_ompio_cuda.h"
#endif
#include "common_ompio_buffer.h"
static void mca_common_ompio_request_construct(mca_ompio_request_t* req);
static void mca_common_ompio_request_destruct(mca_ompio_request_t *req);
@ -37,7 +35,6 @@ opal_list_t mca_common_ompio_pending_requests = {{0}};
static int mca_common_ompio_request_free ( struct ompi_request_t **req)
{
mca_ompio_request_t *ompio_req = ( mca_ompio_request_t *)*req;
#if OPAL_CUDA_SUPPORT
if ( NULL != ompio_req->req_tbuf ) {
if ( MCA_OMPIO_REQUEST_READ == ompio_req->req_type ){
struct iovec decoded_iov;
@ -50,7 +47,6 @@ static int mca_common_ompio_request_free ( struct ompi_request_t **req)
}
mca_common_ompio_release_buf ( NULL, ompio_req->req_tbuf );
}
#endif
if ( NULL != ompio_req->req_free_fn ) {
ompio_req->req_free_fn (ompio_req );
}
@ -77,10 +73,8 @@ void mca_common_ompio_request_construct(mca_ompio_request_t* req)
req->req_ompi.req_cancel = mca_common_ompio_request_cancel;
req->req_ompi.req_type = OMPI_REQUEST_IO;
req->req_data = NULL;
#if OPAL_CUDA_SUPPORT
req->req_tbuf = NULL;
req->req_size = 0;
#endif
req->req_progress_fn = NULL;
req->req_free_fn = NULL;

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -52,11 +52,9 @@ struct mca_ompio_request_t {
mca_ompio_request_type_t req_type;
void *req_data;
opal_list_item_t req_item;
#if OPAL_CUDA_SUPPORT
void *req_tbuf;
size_t req_size;
opal_convertor_t req_convertor;
#endif
mca_fbtl_base_module_progress_fn_t req_progress_fn;
mca_fbtl_base_module_request_free_fn_t req_free_fn;
};

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -29,6 +29,11 @@
#include "ompi/mca/common/ompio/common_ompio_request.h"
extern int mca_fbtl_posix_priority;
extern bool mca_fbtl_posix_read_datasieving;
extern bool mca_fbtl_posix_write_datasieving;
extern size_t mca_fbtl_posix_max_block_size;
extern size_t mca_fbtl_posix_max_gap_size;
extern size_t mca_fbtl_posix_max_tmpbuf_size;
BEGIN_C_DECLS

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
@ -37,6 +37,15 @@ const char *mca_fbtl_posix_component_version_string =
"OMPI/MPI posix FBTL MCA component version " OMPI_VERSION;
int mca_fbtl_posix_priority = 10;
bool mca_fbtl_posix_read_datasieving = true;
bool mca_fbtl_posix_write_datasieving = true;
size_t mca_fbtl_posix_max_block_size = 1048576; // 1MB
size_t mca_fbtl_posix_max_gap_size = 4096; // Size of a block in many linux fs
size_t mca_fbtl_posix_max_tmpbuf_size = 67108864; // 64 MB
/*
* Private functions
*/
static int register_component(void);
/*
* Instantiate the public struct with all of our public information
@ -54,6 +63,7 @@ mca_fbtl_base_component_2_0_0_t mca_fbtl_posix_component = {
.mca_component_name = "posix",
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION),
.mca_register_component_params = register_component,
},
.fbtlm_data = {
/* This component is checkpointable */
@ -63,3 +73,62 @@ mca_fbtl_base_component_2_0_0_t mca_fbtl_posix_component = {
.fbtlm_file_query = mca_fbtl_posix_component_file_query, /* get priority and actions */
.fbtlm_file_unquery = mca_fbtl_posix_component_file_unquery, /* undo what was done by previous function */
};
/*
 * Register the MCA parameters of the posix fbtl component.
 *
 * Each global is first reset to its compiled-in default and then
 * registered with the MCA variable system so users can override it
 * (e.g. "--mca fbtl_posix_max_block_size ...").  All parameters are
 * read-only after registration (MCA_BASE_VAR_SCOPE_READONLY).
 *
 * Returns OMPI_SUCCESS (registration failures are deliberately
 * ignored via the (void) casts, matching common MCA practice).
 */
static int register_component(void)
{
    /* Selection priority of this fbtl component. */
    mca_fbtl_posix_priority = 10;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "priority", "Priority of the fbtl posix component",
                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_priority);

    /* Data-sieving heuristic: disable sieving when the average block
     * in the iovec is larger than this threshold (bytes). */
    mca_fbtl_posix_max_block_size = 1048576;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "max_block_size", "Maximum average size in bytes of a data block in an iovec for data sieving. "
                                           "An average block size larger than this parameter will disable data sieving. Default: 1048576 bytes.",
                                           MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_max_block_size );

    /* Data-sieving heuristic: disable sieving when the average gap
     * between consecutive blocks exceeds this threshold (bytes). */
    mca_fbtl_posix_max_gap_size = 4096;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "max_gap_size", "Maximum average gap size between two blocks in an iovec for data sieving. "
                                           "An average gap size larger than this parameter will disable data sieving. Default: 4096 bytes. " ,
                                           MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_max_gap_size );

    /* Upper bound on the temporary buffer a single data-sieving pass
     * may allocate; larger requests are split into chunks. */
    mca_fbtl_posix_max_tmpbuf_size = 67108864;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "max_tmpbuf_size", "Maximum size of the temporary buffer used for data sieving in bytes. "
                                           "Default: 67108864 (64MB). " ,
                                           MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_max_tmpbuf_size );

    /* Master switch for data sieving on the read path. */
    mca_fbtl_posix_read_datasieving = true;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "read_datasieving", "Parameter indicating whether to perform data sieving for read operations. "
                                           "Default: true.",
                                           MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_read_datasieving );

    /* Master switch for data sieving on the write path. */
    mca_fbtl_posix_write_datasieving = true;
    (void) mca_base_component_var_register(&mca_fbtl_posix_component.fbtlm_version,
                                           "write_datasieving", "Parameter indicating whether to perform data sieving for write operations. "
                                           "Default: true.",
                                           MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_fbtl_posix_write_datasieving );

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -28,100 +28,260 @@
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
static ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh);
static ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh);
/**
 * Blocking positioned read of the file handle's current io_array.
 *
 * Fix: this span contained merged-diff residue — the pre-refactor
 * declarations (iov, iov_count, iov_offset, "int i, block=1, ret;")
 * were left in place next to the new "int ret;", redeclaring `ret`
 * in the same scope and leaving dead variables.  The residue is
 * removed; behavior of the new-side code is unchanged.
 *
 * Dispatch: with a single io_array entry, do one locked pread().
 * With multiple entries, compute the average block and gap size and
 * choose between the data-sieving path (few small blocks separated by
 * small gaps) and the generic readv path.
 *
 * @param fh open OMPIO file handle with f_io_array/f_num_of_io_entries set
 * @return total bytes read, or OMPI_ERROR on failure
 */
ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh )
{
    ssize_t bytes_read=0, ret_code=0;
    struct flock lock;
    int ret;

    if (NULL == fh->f_io_array) {
        return OMPI_ERROR;
    }

    if ( fh->f_num_of_io_entries > 1 ) {
        bool do_data_sieving = true;

        /* Average block length and inter-block gap decide whether data
         * sieving (one large read + scatter) is likely to pay off. */
        size_t avg_gap_size=0;
        size_t avg_block_size = 0;
        off_t prev_offset = (off_t)fh->f_io_array[0].offset;
        int i;

        for ( i=0; i< fh->f_num_of_io_entries; i++ ) {
            avg_block_size += fh->f_io_array[i].length;
            avg_gap_size += (size_t)((off_t)fh->f_io_array[i].offset - prev_offset);
            prev_offset = (off_t)fh->f_io_array[i].offset;
        }
        avg_block_size = avg_block_size / fh->f_num_of_io_entries;
        avg_gap_size = avg_gap_size / fh->f_num_of_io_entries;

        if ( false == mca_fbtl_posix_read_datasieving ||
             0 == avg_gap_size ||
             avg_block_size > mca_fbtl_posix_max_block_size ||
             avg_gap_size > mca_fbtl_posix_max_gap_size ) {
            do_data_sieving = false;
        }

        if ( do_data_sieving ) {
            return mca_fbtl_posix_preadv_datasieving (fh);
        }
        else {
            return mca_fbtl_posix_preadv_generic (fh);
        }
    }
    else {
        /* Exactly one entry: single pread() under a read lock. */
        ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, (off_t)fh->f_io_array[0].offset,
                                    (off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION );
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_preadv: error in mca_fbtl_posix_lock() ret=%d: %s",
                        ret, strerror(errno));
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            return OMPI_ERROR;
        }
        ret_code = pread(fh->fd, fh->f_io_array[0].memory_address, fh->f_io_array[0].length,
                         (off_t)fh->f_io_array[0].offset );
        mca_fbtl_posix_unlock ( &lock, fh );
        if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_preadv: error in (p)read(v):%s", strerror(errno));
            return OMPI_ERROR;
        }
        bytes_read += ret_code;
    }

    return bytes_read;
}
/*
 * Data-sieving read: instead of issuing one small read per io_array
 * entry, read the whole covering byte range [first offset, last
 * offset+length) into a temporary buffer with a single locked pread(),
 * then scatter the requested pieces into the user's memory addresses.
 * Assumes io_array entries are sorted by ascending file offset —
 * NOTE(review): not enforced here, relies on the caller/builder.
 *
 * Returns the number of user-visible bytes delivered, or an OMPI
 * error code on lock/alloc/read failure.
 */
ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh)
{
    size_t start, end, len;
    size_t bufsize = 0;                 /* current capacity of temp_buf */
    int ret, i, j;
    ssize_t bytes_read=0, ret_code=0;
    struct flock lock;
    char *temp_buf = NULL;              /* staging buffer, grown on demand */

    int startindex = 0;
    int endindex   = 0;
    bool done = false;

    while (!done) {
        // Break the io_array into chunks such that the size of the temporary
        // buffer does not exceed mca_fbtl_posix_max_tmpbuf_size bytes.
        // Each iteration will thus work in the half-open range [startindex, endindex)
        startindex = endindex;
        if ( startindex >= fh->f_num_of_io_entries ) {
            done = true;
            break;
        }

        size_t sstart = (size_t)fh->f_io_array[startindex].offset;
        size_t slen=0;

        /* Grow the chunk entry by entry until the covering span would
         * exceed the temp-buffer limit. */
        for ( j = startindex; j < fh->f_num_of_io_entries; j++ ) {
            endindex = j;
            slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - sstart;
            if (slen > mca_fbtl_posix_max_tmpbuf_size ) {
                endindex = j-1;
                break;
            }
        }
        // Need to increment the value of endindex
        // by one for the loop syntax to work correctly.
        endindex++;

        start = (size_t)fh->f_io_array[startindex].offset;
        end   = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length;
        len   = end - start;

        /* (Re)allocate the staging buffer only when this chunk is
         * larger than any previous one. */
        if ( len > bufsize ) {
            if ( NULL != temp_buf ) {
                free ( temp_buf);
            }
            temp_buf = (char *) malloc ( len );
            if ( NULL == temp_buf ) {
                opal_output(1, "OUT OF MEMORY\n");
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            bufsize = len;
        }

        // Read the entire covering block under a read lock.
        ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, start, len, OMPIO_LOCK_ENTIRE_REGION );
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s",
                        ret, strerror(errno));
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            free ( temp_buf);
            return OMPI_ERROR;
        }

        ret_code = pread (fh->fd, temp_buf, len, start);
        mca_fbtl_posix_unlock ( &lock, fh);
        if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(errno));
            free ( temp_buf);
            return OMPI_ERROR;
        }

        // Copy out the elements that were requested; a short read
        // (ret_code < len, e.g. EOF) truncates the last pieces.
        size_t pos = 0;
        size_t num_bytes;
        size_t start_offset = (size_t) fh->f_io_array[startindex].offset;

        for ( i = startindex ; i < endindex ; i++) {
            pos = (size_t) fh->f_io_array[i].offset - start_offset;
            if ( (ssize_t) pos > ret_code ) {
                break;          /* entirely past the short read */
            }
            num_bytes = fh->f_io_array[i].length;
            if ( ((ssize_t) pos + (ssize_t)num_bytes) > ret_code ) {
                num_bytes = ret_code - (ssize_t)pos;   /* partial piece */
            }

            memcpy (fh->f_io_array[i].memory_address, temp_buf + pos, num_bytes);
            bytes_read += num_bytes;
        }
    }

    free ( temp_buf);
    return bytes_read;
}
/**
 * Generic vectored read path (no data sieving).
 *
 * Fix: this span contained merged-diff residue — the iovec-fill block,
 * the realloc/contiguity-merge block, and the readv() call each
 * appeared twice (old and new revisions juxtaposed), which would not
 * compile.  The duplicates are removed; the surviving logic is the
 * new-side code, unchanged.
 *
 * Strategy: walk the io_array, coalescing file-contiguous entries into
 * one iovec (bounded by IOV_MAX), and flush each run with a single
 * locked preadv()/readv() at the run's starting offset.
 *
 * NOTE(review): in the merge branch end_offset is advanced using entry
 * [i] although entry [i+1] was just appended — both revisions in the
 * diff read this way, so it is preserved; verify the intended lock
 * length upstream.
 *
 * @param fh open OMPIO file handle
 * @return total bytes read, or OMPI_ERROR / OMPI_ERR_OUT_OF_RESOURCE
 */
ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh )
{
    ssize_t bytes_read=0, ret_code=0;
    struct iovec *iov = NULL;
    struct flock lock;
    int ret, i;

    int block=1;
    int iov_count = 0;
    OMPI_MPI_OFFSET_TYPE iov_offset = 0;
    off_t total_length, end_offset=0;

    iov = (struct iovec *) malloc (OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec));
    if (NULL == iov) {
        opal_output(1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
        /* Start a new run at the current entry. */
        if (0 == iov_count) {
            iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
            iov[iov_count].iov_len  = fh->f_io_array[i].length;
            iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
            end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
            iov_count ++;
        }

        /* Grow the iovec array in OMPIO_IOVEC_INITIAL_SIZE steps. */
        if (OMPIO_IOVEC_INITIAL_SIZE*block <= iov_count) {
            block ++;
            iov = (struct iovec *)realloc
                (iov, OMPIO_IOVEC_INITIAL_SIZE * block *
                 sizeof(struct iovec));
            if (NULL == iov) {
                opal_output(1, "OUT OF MEMORY\n");
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
        }

        /* If the next entry is file-contiguous with this one and we are
         * under IOV_MAX, append it to the current run and continue. */
        if (fh->f_num_of_io_entries != i+1) {
            if (((((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
                   (ptrdiff_t)fh->f_io_array[i].length) ==
                  (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset)) &&
                (iov_count < IOV_MAX ) ){
                iov[iov_count].iov_base =
                    fh->f_io_array[i+1].memory_address;
                iov[iov_count].iov_len = fh->f_io_array[i+1].length;
                end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
                iov_count ++;
                continue;
            }
        }

        /* Flush the accumulated run with one vectored read. */
        total_length = (end_offset - (off_t)iov_offset );

        ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in mca_fbtl_posix_lock() ret=%d: %s", ret, strerror(errno));
            free (iov);
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            return OMPI_ERROR;
        }
#if defined(HAVE_PREADV)
        ret_code = preadv (fh->fd, iov, iov_count, iov_offset);
#else
        if (-1 == lseek (fh->fd, iov_offset, SEEK_SET)) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in lseek:%s", strerror(errno));
            free(iov);
            mca_fbtl_posix_unlock ( &lock, fh );
            return OMPI_ERROR;
        }
        ret_code = readv (fh->fd, iov, iov_count);
#endif
        mca_fbtl_posix_unlock ( &lock, fh );
        if ( 0 < ret_code ) {
            bytes_read+=ret_code;
        }
        else if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in (p)readv:%s", strerror(errno));
            free(iov);
            return OMPI_ERROR;
        }
        else if ( 0 == ret_code ){
            /* end of file reached, no point in continue reading; */
            break;
        }
        iov_count = 0;
    }

    free (iov);
    return bytes_read;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2017 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -30,7 +30,187 @@
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
static ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh );
static ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh );
/**
 * Blocking positioned write of the file handle's current io_array.
 *
 * A single entry is written directly with one locked pwrite().  For
 * multiple entries, the average block length and inter-block gap are
 * computed; data sieving (read-modify-write of the covering range) is
 * chosen only when it is enabled, the layout is small-block/small-gap,
 * the run is single-threaded, and this is a collective operation —
 * otherwise the generic pwritev path is taken.
 *
 * @param fh open OMPIO file handle with f_io_array/f_num_of_io_entries set
 * @return total bytes written, or OMPI_ERROR on failure
 */
ssize_t mca_fbtl_posix_pwritev(ompio_file_t *fh )
{
    ssize_t total = 0, rc = 0;
    struct flock lock;
    int lk;

    if (NULL == fh->f_io_array) {
        return OMPI_ERROR;
    }

    if ( 1 == fh->f_num_of_io_entries ) {
        /* Single entry: one pwrite() under an exclusive lock. */
        lk = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, (off_t)fh->f_io_array[0].offset,
                                   (off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION );
        if ( 0 < lk ) {
            opal_output(1, "mca_fbtl_posix_pwritev: error in mca_fbtl_posix_lock() ret=%d: %s",
                        lk, strerror(errno));
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            return OMPI_ERROR;
        }
        rc = pwrite(fh->fd, fh->f_io_array[0].memory_address, fh->f_io_array[0].length,
                    (off_t)fh->f_io_array[0].offset );
        mca_fbtl_posix_unlock ( &lock, fh );
        if ( rc == -1 ) {
            opal_output(1, "mca_fbtl_posix_pwritev: error in (p)write(v):%s", strerror(errno));
            return OMPI_ERROR;
        }
        total += rc;
        return total;
    }

    /* Multiple entries: inspect the access pattern. */
    size_t len_sum = 0, gap_sum = 0;
    off_t last_off = (off_t)fh->f_io_array[0].offset;
    int idx;

    for ( idx = 0; idx < fh->f_num_of_io_entries; idx++ ) {
        len_sum += fh->f_io_array[idx].length;
        gap_sum += (size_t)((off_t)fh->f_io_array[idx].offset - last_off);
        last_off = (off_t)fh->f_io_array[idx].offset;
    }

    size_t avg_len = len_sum / fh->f_num_of_io_entries;
    size_t avg_gap = gap_sum / fh->f_num_of_io_entries;

    /* Positive form of the original disable-conditions. */
    bool use_sieving = mca_fbtl_posix_write_datasieving &&
                       ( 0 != avg_gap ) &&
                       ( avg_len <= mca_fbtl_posix_max_block_size ) &&
                       ( avg_gap <= mca_fbtl_posix_max_gap_size ) &&
                       !ompi_mpi_thread_multiple &&
                       ( fh->f_flags & OMPIO_COLLECTIVE_OP );

    return use_sieving ? mca_fbtl_posix_pwritev_datasieving (fh)
                       : mca_fbtl_posix_pwritev_generic (fh);
}
/**
 * Data-sieving write: read the covering byte range of a chunk of the
 * io_array into a temporary buffer, overwrite the requested pieces in
 * that buffer, and write the whole range back — all under a single
 * exclusive lock (read-modify-write must be atomic w.r.t. other
 * writers).
 *
 * Fixes in this revision:
 *  - the pread() failure was logged as "error in pwrite" (with a
 *    commented-out duplicate of the same line left behind); it now
 *    reports "error in pread",
 *  - a duplicated post-pwrite error block was removed; it re-checked
 *    ret_code after the region was already unlocked and called
 *    mca_fbtl_posix_unlock() a second time on the same lock,
 *  - redundant "if (NULL != temp_buf) free(temp_buf)" guard dropped
 *    (free(NULL) is a no-op).
 *
 * Assumes io_array entries are sorted by ascending file offset.
 *
 * @param fh open OMPIO file handle
 * @return bytes staged for writing, or an OMPI error code
 */
ssize_t mca_fbtl_posix_pwritev_datasieving (ompio_file_t *fh)
{
    size_t start, end, len;
    size_t bufsize = 0;                 /* current capacity of temp_buf */
    int ret, i, j;
    ssize_t bytes_written=0, ret_code=0;
    struct flock lock;
    char *temp_buf = NULL;              /* staging buffer, grown on demand */

    int startindex = 0;
    int endindex   = 0;
    bool done = false;

    while (!done) {
        // Break the io_array into chunks such that the size of the temporary
        // buffer does not exceed mca_fbtl_posix_max_tmpbuf_size bytes.
        // Each iteration will thus work in the half-open range [startindex, endindex)
        startindex = endindex;
        if ( startindex >= fh->f_num_of_io_entries ) {
            done = true;
            break;
        }

        size_t sstart = (size_t)fh->f_io_array[startindex].offset;
        size_t slen=0;

        for ( j = startindex; j < fh->f_num_of_io_entries; j++ ) {
            endindex = j;
            slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - sstart;
            if (slen > mca_fbtl_posix_max_tmpbuf_size ) {
                endindex = j-1;
                break;
            }
        }
        // Need to increment the value of endindex
        // by one for the loop syntax to work correctly.
        endindex++;

        start = (size_t)fh->f_io_array[startindex].offset;
        end   = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length;
        len   = end - start;

        if ( len > bufsize ) {
            free ( temp_buf );          /* no-op on first pass */
            temp_buf = (char *) malloc ( len );
            if ( NULL == temp_buf ) {
                opal_output(1, "OUT OF MEMORY\n");
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            bufsize = len;
        }

        /* Lock the whole range exclusively for the read-modify-write. */
        ret = mca_fbtl_posix_lock ( &lock, fh, F_WRLCK, start, len, OMPIO_LOCK_ENTIRE_REGION );
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s",
                        ret, strerror(errno));
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            free ( temp_buf);
            return OMPI_ERROR;
        }

        /* Read the entire covering block. */
        ret_code = pread (fh->fd, temp_buf, len, start);
        if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pread:%s", strerror(errno));
            mca_fbtl_posix_unlock ( &lock, fh);
            free ( temp_buf);
            return OMPI_ERROR;
        }

        // Copy the elements to be written into the temporary buffer.
        size_t pos = 0;
        size_t num_bytes;
        size_t start_offset = (size_t) fh->f_io_array[startindex].offset;

        for ( i = startindex ; i < endindex ; i++) {
            pos = (size_t) fh->f_io_array[i].offset - start_offset;
            num_bytes = fh->f_io_array[i].length;
            memcpy (temp_buf + pos, fh->f_io_array[i].memory_address, num_bytes);
            bytes_written += num_bytes;
        }

        /* Write the whole range back and release the lock. */
        ret_code = pwrite (fh->fd, temp_buf, len, start);
        mca_fbtl_posix_unlock ( &lock, fh);
        if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_pwritev_datasieving: error in pwrite:%s", strerror(errno));
            free ( temp_buf);
            return OMPI_ERROR;
        }
    }

    free ( temp_buf);
    return bytes_written;
}
ssize_t mca_fbtl_posix_pwritev_generic (ompio_file_t *fh )
{
/*int *fp = NULL;*/
int i, block = 1, ret;

Просмотреть файл

@ -130,6 +130,7 @@ mca_fcoll_dynamic_file_read_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret){

Просмотреть файл

@ -132,6 +132,7 @@ mca_fcoll_dynamic_file_write_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret ){

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2016 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -36,7 +36,6 @@ BEGIN_C_DECLS
extern int mca_fcoll_dynamic_gen2_priority;
extern int mca_fcoll_dynamic_gen2_num_groups;
extern int mca_fcoll_dynamic_gen2_write_chunksize;
OMPI_MODULE_DECLSPEC extern mca_fcoll_base_component_2_0_0_t mca_fcoll_dynamic_gen2_component;

Просмотреть файл

@ -11,7 +11,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2008-2016 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
@ -42,7 +42,6 @@ const char *mca_fcoll_dynamic_gen2_component_version_string =
*/
int mca_fcoll_dynamic_gen2_priority = 10;
int mca_fcoll_dynamic_gen2_num_groups = 1;
int mca_fcoll_dynamic_gen2_write_chunksize = -1;
/*
* Local function
@ -95,12 +94,5 @@ dynamic_gen2_register(void)
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_num_groups);
mca_fcoll_dynamic_gen2_write_chunksize = -1;
(void) mca_base_component_var_register(&mca_fcoll_dynamic_gen2_component.fcollm_version,
"write_chunksize", "Chunk size written at once. Default: stripe_size of the file system",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_dynamic_gen2_write_chunksize);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -130,6 +130,7 @@ mca_fcoll_dynamic_gen2_file_read_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret){

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2016 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
@ -92,7 +92,7 @@ typedef struct mca_io_ompio_aggregator_data {
static int shuffle_init ( int index, int cycles, int aggregator, int rank,
mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs );
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize );
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data );
int mca_fcoll_dynamic_gen2_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
@ -111,8 +111,7 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
int *sorted);
int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array,
int num_entries, int *last_array_pos, int *last_pos_in_field,
int chunk_size );
int num_entries, int *last_array_pos, int *last_pos_in_field );
int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
@ -145,7 +144,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
MPI_Aint *broken_total_lengths=NULL;
int *aggregators=NULL;
int write_chunksize, *result_counts=NULL;
int *result_counts=NULL;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
@ -170,6 +169,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret ){
@ -198,15 +198,9 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
if ( fh->f_stripe_size == 0 ) {
// EDGAR: just a quick heck for testing
//fh->f_stripe_size = 1048576;
fh->f_stripe_size = 65536;
}
if ( -1 == mca_fcoll_dynamic_gen2_write_chunksize ) {
write_chunksize = fh->f_stripe_size;
}
else {
write_chunksize = mca_fcoll_dynamic_gen2_write_chunksize;
}
ret = mca_fcoll_dynamic_gen2_get_configuration (fh, &dynamic_gen2_num_io_procs, &aggregators);
if (OMPI_SUCCESS != ret){
@ -607,7 +601,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
ret = write_init (fh, aggregators[i], aggr_data[i] );
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -636,7 +630,7 @@ int mca_fcoll_dynamic_gen2_file_write_all (ompio_file_t *fh,
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
ret = write_init (fh, aggregators[i], aggr_data[i] );
if (OMPI_SUCCESS != ret){
goto exit;
}
@ -734,7 +728,7 @@ exit :
}
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize )
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data )
{
int ret=OMPI_SUCCESS;
int last_array_pos=0;
@ -742,18 +736,36 @@ static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator
if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
while ( aggr_data->prev_bytes_to_write > 0 ) {
fh->f_flags |= OMPIO_COLLECTIVE_OP;
while ( aggr_data->prev_bytes_to_write > 0 ) {
ssize_t tret;
aggr_data->prev_bytes_to_write -= mca_fcoll_dynamic_gen2_split_iov_array (fh, aggr_data->prev_io_array,
aggr_data->prev_num_io_entries,
&last_array_pos, &last_pos,
write_chunksize );
if ( 0 > fh->f_fbtl->fbtl_pwritev (fh)) {
&last_array_pos, &last_pos );
tret = fh->f_fbtl->fbtl_pwritev (fh);
if ( 0 > tret ) {
free ( aggr_data->prev_io_array);
opal_output (1, "dynamic_gen2_write_all: fbtl_pwritev failed\n");
ret = OMPI_ERROR;
goto exit;
}
#if DEBUG_ON
printf("fh->f_num_of_io_entries=%d\n", fh->f_num_of_io_entries);
printf("[%d]: fh->f_io_array[0].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[0].offset,
fh->f_io_array[0].length);
if ( fh->f_num_of_io_entries > 1 )
printf("[%d]: fh->f_io_array[1].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[1].offset,
fh->f_io_array[1].length);
int n = fh->f_num_of_io_entries-1;
if ( fh->f_num_of_io_entries > 2 )
printf("[%d]: fh->f_io_array[n].offset = %ld .size = %ld\n", fh->f_rank, (long)fh->f_io_array[n].offset,
fh->f_io_array[n].length);
#endif
}
fh->f_flags &= ~OMPIO_COLLECTIVE_OP;
free ( fh->f_io_array );
free ( aggr_data->prev_io_array);
}
@ -800,7 +812,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
for(l=0;l<data->procs_per_group;l++){
data->disp_index[l] = 1;
data->disp_index[l] = 0;
if(data->max_disp_index[l] == 0) {
data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
@ -879,8 +891,8 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
/* The data fits completely into the block */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
(data->global_iov_array[data->sorted[data->current_index]].iov_len
- data->bytes_remaining);
@ -913,11 +925,12 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
/* the remaining data from the previous cycle is larger than the
data->bytes_to_write_in_cycle, so we have to segment again */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
(data->global_iov_array[data->sorted[data->current_index]].iov_len
- data->bytes_remaining);
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
@ -934,9 +947,10 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
(MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
/* This entry has more data than we can sendin one cycle */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
bytes_sent += data->bytes_to_write_in_cycle;
@ -950,9 +964,9 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
else {
/* Next data entry is less than data->bytes_to_write_in_cycle */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] =
data->global_iov_array[data->sorted[data->current_index]].iov_len;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t)
data->global_iov_array[data->sorted[data->current_index]].iov_base;
data->disp_index[data->n] += 1;
@ -1592,14 +1606,15 @@ int mca_fcoll_dynamic_gen2_get_configuration (ompio_file_t *fh, int *dynamic_gen
int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
int *ret_array_pos, int *ret_pos, int chunk_size )
int *ret_array_pos, int *ret_pos )
{
int array_pos = *ret_array_pos;
int pos = *ret_pos;
size_t bytes_written = 0;
size_t bytes_to_write = chunk_size;
off_t baseaddr = ((off_t)io_array[array_pos].offset + pos) - (((off_t)io_array[array_pos].offset + pos) % (off_t)fh->f_stripe_size);
off_t endaddr = baseaddr + fh->f_stripe_size;
if ( 0 == array_pos && 0 == pos ) {
fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
if ( NULL == fh->f_io_array ){
@ -1609,32 +1624,28 @@ int mca_fcoll_dynamic_gen2_split_iov_array ( ompio_file_t *fh, mca_common_ompio_
}
int i=0;
while (bytes_to_write > 0 ) {
fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
do {
fh->f_io_array[i].memory_address = (char *)io_array[array_pos].memory_address + pos;
fh->f_io_array[i].offset = (char *)io_array[array_pos].offset + pos;
if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
fh->f_io_array[i].length = bytes_to_write;
off_t length = io_array[array_pos].length - pos;
if ( ( (off_t)fh->f_io_array[i].offset + length) < endaddr ) {
fh->f_io_array[i].length = length;
}
else {
fh->f_io_array[i].length = io_array[array_pos].length - pos;
fh->f_io_array[i].length = endaddr - (size_t)fh->f_io_array[i].offset;
}
pos += fh->f_io_array[i].length;
bytes_written += fh->f_io_array[i].length;
bytes_to_write-= fh->f_io_array[i].length;
i++;
if ( pos == (int)io_array[array_pos].length ) {
pos = 0;
if ((array_pos + 1) < num_entries) {
array_pos++;
}
else {
break;
}
array_pos++;
}
}
} while ( (array_pos < num_entries) && (((off_t)io_array[array_pos].offset+pos ) < endaddr) );
fh->f_num_of_io_entries = i;
*ret_array_pos = array_pos;

Просмотреть файл

@ -155,6 +155,7 @@ mca_fcoll_two_phase_file_read_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&temp_iov,
&iov_count);
if (OMPI_SUCCESS != ret ){

Просмотреть файл

@ -185,6 +185,7 @@ mca_fcoll_two_phase_file_write_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&temp_iov,
&iov_count);
if (OMPI_SUCCESS != ret ){

Просмотреть файл

@ -129,6 +129,7 @@ mca_fcoll_vulcan_file_read_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret){

Просмотреть файл

@ -185,6 +185,7 @@ int mca_fcoll_vulcan_file_write_all (ompio_file_t *fh,
count,
buf,
&max_data,
fh->f_mem_convertor,
&decoded_iov,
&iov_count);
if (OMPI_SUCCESS != ret ){
@ -770,7 +771,9 @@ static int write_init (ompio_file_t *fh,
}
}
else {
fh->f_flags |= OMPIO_COLLECTIVE_OP;
ret_temp = fh->f_fbtl->fbtl_pwritev(fh);
fh->f_flags &= ~OMPIO_COLLECTIVE_OP;
if(0 > ret_temp) {
opal_output (1, "vulcan_write_all: fbtl_pwritev failed\n");
ret = ret_temp;
@ -835,7 +838,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
for(l=0;l<data->procs_per_group;l++){
data->disp_index[l] = 1;
data->disp_index[l] = 0;
if ( data->max_disp_index[l] == 0 ) {
data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int));
@ -914,8 +917,8 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
/* The data fits completely into the block */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_remaining;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
(data->global_iov_array[data->sorted[data->current_index]].iov_len
- data->bytes_remaining);
@ -949,11 +952,12 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
/* the remaining data from the previous cycle is larger than the
data->bytes_to_write_in_cycle, so we have to segment again */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
(data->global_iov_array[data->sorted[data->current_index]].iov_len
- data->bytes_remaining);
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
@ -970,9 +974,10 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
(MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
/* This entry has more data than we can sendin one cycle */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
data->displs_per_process[data->n][data->disp_index[data->n]] =
(ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
data->disp_index[data->n] += 1;
}
if (data->procs_in_group[data->n] == rank) {
bytes_sent += data->bytes_to_write_in_cycle;
@ -986,9 +991,9 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
else {
/* Next data entry is less than data->bytes_to_write_in_cycle */
if (aggregator == rank) {
data->blocklen_per_process[data->n][data->disp_index[data->n] - 1] =
data->blocklen_per_process[data->n][data->disp_index[data->n]] =
data->global_iov_array[data->sorted[data->current_index]].iov_len;
data->displs_per_process[data->n][data->disp_index[data->n] - 1] = (ptrdiff_t)
data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t)
data->global_iov_array[data->sorted[data->current_index]].iov_base;
data->disp_index[data->n] += 1;

Просмотреть файл

@ -64,22 +64,6 @@ int mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
struct opal_info_t *info,
ompio_file_t *fh);
int mca_fs_lustre_file_close (ompio_file_t *fh);
int mca_fs_lustre_file_delete (char *filename,
struct opal_info_t *info);
int mca_fs_lustre_file_set_size (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE size);
int mca_fs_lustre_file_get_size (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *size);
int mca_fs_lustre_file_sync (ompio_file_t *fh);
int mca_fs_lustre_file_seek (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
int whence);
/*
* ******************************************************************
* ************ functions implemented in this module end ************

Просмотреть файл

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* Copyright (c) 2015-2020 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* $COPYRIGHT$
@ -29,6 +29,7 @@
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fs/fs.h"
#include "ompi/mca/fs/base/base.h"
#include "ompi/communicator/communicator.h"
#include "ompi/info/info.h"
@ -63,8 +64,7 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
struct opal_info_t *info,
ompio_file_t *fh)
{
int amode, rank;
int old_mask, perm;
int amode, perm;
int rc, ret=OMPI_SUCCESS;
int flag;
int fs_lustre_stripe_size = -1;
@ -73,24 +73,8 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
struct lov_user_md *lump=NULL;
if (fh->f_perm == OMPIO_PERM_NULL) {
old_mask = umask(022);
umask(old_mask);
perm = old_mask ^ 0666;
}
else {
perm = fh->f_perm;
}
rank = fh->f_rank;
amode = 0;
if (access_mode & MPI_MODE_RDONLY)
amode = amode | O_RDONLY;
if (access_mode & MPI_MODE_WRONLY)
amode = amode | O_WRONLY;
if (access_mode & MPI_MODE_RDWR)
amode = amode | O_RDWR;
perm = mca_fs_base_get_file_perm(fh);
amode = mca_fs_base_get_file_amode(fh->f_rank, access_mode);
opal_info_get (info, "stripe_size", MPI_MAX_INFO_VAL, char_stripe, &flag);
if ( flag ) {
@ -113,13 +97,7 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
/* Reset errno */
errno = 0;
if (0 == fh->f_rank) {
/* MODE_CREATE and MODE_EXCL can only be set by one process */
if ( access_mode & MPI_MODE_CREATE )
amode = amode | O_CREAT;
if (access_mode & MPI_MODE_EXCL)
amode = amode | O_EXCL;
if (OMPIO_ROOT == fh->f_rank) {
if ( (fs_lustre_stripe_size>0 || fs_lustre_stripe_width>0) &&
( amode&O_CREAT) &&
( (amode&O_RDWR)|| amode&O_WRONLY) ) {
@ -134,28 +112,9 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
else {
fh->fd = open (filename, amode, perm);
}
if ( 0 > fh->fd ) {
if ( EACCES == errno ) {
ret = MPI_ERR_ACCESS;
}
else if ( ENAMETOOLONG == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( ENOENT == errno ) {
ret = MPI_ERR_NO_SUCH_FILE;
}
else if ( EISDIR == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( EROFS == errno ) {
ret = MPI_ERR_READ_ONLY;
}
else if ( EEXIST == errno ) {
ret = MPI_ERR_FILE_EXISTS;
}
else {
ret = MPI_ERR_OTHER;
}
ret = mca_fs_base_get_mpi_err(errno);
}
}
@ -165,39 +124,17 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
return ret;
}
if ( 0 != rank ) {
if (OMPIO_ROOT != fh->f_rank) {
fh->fd = open (filename, amode, perm);
if ( 0 > fh->fd) {
if ( EACCES == errno ) {
ret = MPI_ERR_ACCESS;
}
else if ( ENAMETOOLONG == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( ENOENT == errno ) {
ret = MPI_ERR_NO_SUCH_FILE;
}
else if ( EISDIR == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( EROFS == errno ) {
ret = MPI_ERR_READ_ONLY;
}
else if ( EEXIST == errno ) {
ret = MPI_ERR_FILE_EXISTS;
}
else {
ret = MPI_ERR_OTHER;
}
return mca_fs_base_get_mpi_err(errno);
}
}
lump = alloc_lum();
if (NULL == lump ){
fprintf(stderr,"Cannot allocate memory for extracting stripe size\n");
return OMPI_ERROR;
fprintf(stderr,"Cannot allocate memory for extracting stripe size\n");
return OMPI_ERROR;
}
rc = llapi_file_get_stripe(filename, lump);
if (rc != 0) {
@ -207,6 +144,7 @@ mca_fs_lustre_file_open (struct ompi_communicator_t *comm,
fh->f_stripe_size = lump->lmm_stripe_size;
fh->f_stripe_count = lump->lmm_stripe_count;
fh->f_fs_block_size = lump->lmm_stripe_size;
fh->f_flags |= OMPIO_LOCK_NEVER;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -89,9 +89,6 @@ int mca_fs_pvfs2_file_get_size (ompio_file_t *fh,
int mca_fs_pvfs2_file_sync (ompio_file_t *fh);
int mca_fs_pvfs2_file_seek (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
int whence);
/*
* ******************************************************************
* ************ functions implemented in this module end ************

Просмотреть файл

@ -60,22 +60,6 @@ int mca_fs_ufs_file_open (struct ompi_communicator_t *comm,
struct opal_info_t *info,
ompio_file_t *fh);
int mca_fs_ufs_file_close (ompio_file_t *fh);
int mca_fs_ufs_file_delete (char *filename,
struct opal_info_t *info);
int mca_fs_ufs_file_set_size (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE size);
int mca_fs_ufs_file_get_size (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE *size);
int mca_fs_ufs_file_sync (ompio_file_t *fh);
int mca_fs_ufs_file_seek (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
int whence);
/*
* ******************************************************************
* ************ functions implemented in this module end ************

Просмотреть файл

@ -48,114 +48,32 @@ mca_fs_ufs_file_open (struct ompi_communicator_t *comm,
struct opal_info_t *info,
ompio_file_t *fh)
{
int amode;
int old_mask, perm;
int rank, ret=OMPI_SUCCESS;
int amode, perm;
int ret=OMPI_SUCCESS;
rank = ompi_comm_rank ( comm );
if (fh->f_perm == OMPIO_PERM_NULL) {
old_mask = umask(022);
umask(old_mask);
perm = old_mask ^ 0666;
}
else {
perm = fh->f_perm;
}
amode = 0;
if (access_mode & MPI_MODE_RDONLY)
amode = amode | O_RDONLY;
if (access_mode & MPI_MODE_WRONLY)
amode = amode | O_WRONLY;
if (access_mode & MPI_MODE_RDWR)
amode = amode | O_RDWR;
perm = mca_fs_base_get_file_perm(fh);
amode = mca_fs_base_get_file_amode(fh->f_rank, access_mode);
/* Reset errno */
errno = 0;
if ( 0 == rank ) {
/* MODE_CREATE and MODE_EXCL can only be set by one process */
if ( access_mode & MPI_MODE_CREATE )
amode = amode | O_CREAT;
if (access_mode & MPI_MODE_EXCL)
amode = amode | O_EXCL;
fh->fd = open (filename, amode, perm);
if ( 0 > fh->fd ) {
if ( EACCES == errno ) {
ret = MPI_ERR_ACCESS;
}
else if ( ENAMETOOLONG == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( ENOENT == errno ) {
ret = MPI_ERR_NO_SUCH_FILE;
}
else if ( EISDIR == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( EROFS == errno ) {
ret = MPI_ERR_READ_ONLY;
}
else if ( EEXIST == errno ) {
ret = MPI_ERR_FILE_EXISTS;
}
else if ( ENOSPC == errno ) {
ret = MPI_ERR_NO_SPACE;
}
else if ( EDQUOT == errno ) {
ret = MPI_ERR_QUOTA;
}
else if ( ETXTBSY == errno ) {
ret = MPI_ERR_FILE_IN_USE;
}
else {
ret = MPI_ERR_OTHER;
}
if (OMPIO_ROOT == fh->f_rank) {
fh->fd = open (filename, amode, perm);
if ( 0 > fh->fd ) {
ret = mca_fs_base_get_mpi_err(errno);
}
}
comm->c_coll->coll_bcast ( &ret, 1, MPI_INT, 0, comm, comm->c_coll->coll_bcast_module);
if ( OMPI_SUCCESS != ret ) {
fh->fd = -1;
return ret;
fh->fd = -1;
return ret;
}
if ( 0 != rank ) {
fh->fd = open (filename, amode, perm);
if ( 0 > fh->fd) {
if ( EACCES == errno ) {
ret = MPI_ERR_ACCESS;
}
else if ( ENAMETOOLONG == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( ENOENT == errno ) {
ret = MPI_ERR_NO_SUCH_FILE;
}
else if ( EISDIR == errno ) {
ret = MPI_ERR_BAD_FILE;
}
else if ( EROFS == errno ) {
ret = MPI_ERR_READ_ONLY;
}
else if ( EEXIST == errno ) {
ret = MPI_ERR_FILE_EXISTS;
}
else if ( ENOSPC == errno ) {
ret = MPI_ERR_NO_SPACE;
}
else if ( EDQUOT == errno ) {
ret = MPI_ERR_QUOTA;
}
else if ( ETXTBSY == errno ) {
ret = MPI_ERR_FILE_IN_USE;
}
else {
ret = MPI_ERR_OTHER;
}
}
if (OMPIO_ROOT != fh->f_rank) {
fh->fd = open (filename, amode, perm);
if ( 0 > fh->fd) {
return mca_fs_base_get_mpi_err(errno);
}
}
fh->f_stripe_size=0;

Просмотреть файл

@ -29,7 +29,6 @@
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "ompi/errhandler/errhandler.h"
#include "opal/threads/mutex.h"
#include "ompi/file/file.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/fs/fs.h"

Просмотреть файл

@ -10,12 +10,13 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2018 DataDirect Networks. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -27,17 +28,18 @@
#include "mpi.h"
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "opal/mca/base/base.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/fs/base/base.h"
#include "io_ompio.h"
#include "ompi/mca/common/ompio/common_ompio_request.h"
#include "ompi/mca/common/ompio/common_ompio_buffer.h"
#if OPAL_CUDA_SUPPORT
#include "ompi/mca/common/ompio/common_ompio_cuda.h"
#ifdef HAVE_IME_NATIVE_H
#include "ompi/mca/fs/ime/fs_ime.h"
#endif
int mca_io_ompio_cycle_buffer_size = OMPIO_DEFAULT_CYCLE_BUF_SIZE;
int mca_io_ompio_bytes_per_agg = OMPIO_PREALLOC_MAX_BUF_SIZE;
int mca_io_ompio_num_aggregators = -1;
@ -275,13 +277,13 @@ static int open_component(void)
static int close_component(void)
{
mca_common_ompio_request_fini ();
#if OPAL_CUDA_SUPPORT
mca_common_ompio_cuda_alloc_fini();
#endif
mca_common_ompio_buffer_alloc_fini();
OBJ_DESTRUCT(&mca_io_ompio_mutex);
#ifdef HAVE_IME_NATIVE_H
mca_fs_ime_native_fini();
#endif
return OMPI_SUCCESS;
}
@ -299,42 +301,11 @@ file_query(struct ompi_file_t *file,
int *priority)
{
mca_common_ompio_data_t *data;
char *tmp;
int rank;
int is_lustre=0; //false
tmp = strchr (file->f_filename, ':');
rank = ompi_comm_rank ( file->f_comm);
if (!tmp) {
if ( 0 == rank) {
if (LUSTRE == mca_fs_base_get_fstype(file->f_filename)) {
is_lustre = 1; //true
}
}
file->f_comm->c_coll->coll_bcast (&is_lustre,
1,
MPI_INT,
0,
file->f_comm,
file->f_comm->c_coll->coll_bcast_module);
}
else {
if (!strncasecmp(file->f_filename, "lustre:", 7) ) {
is_lustre = 1;
}
}
if (is_lustre) {
*priority = 1;
}
else {
*priority = priority_param;
}
*priority = priority_param;
/* Allocate a space for this module to hang private data (e.g.,
the OMPIO file handle) */
data = calloc(1, sizeof(mca_common_ompio_data_t));
if (NULL == data) {
return NULL;
@ -343,7 +314,6 @@ file_query(struct ompi_file_t *file,
*private_data = (struct mca_io_base_file_t*) data;
/* All done */
return &mca_io_ompio_module;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* Copyright (c) 2017-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -137,12 +137,11 @@ int mca_io_ompio_file_read_all (ompi_file_t *fh,
data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
OPAL_THREAD_LOCK(&fh->f_lock);
ret = data->ompio_fh.
f_fcoll->fcoll_file_read_all (&data->ompio_fh,
buf,
count,
datatype,
status);
ret = mca_common_ompio_file_read_all (&data->ompio_fh,
buf,
count,
datatype,
status);
OPAL_THREAD_UNLOCK(&fh->f_lock);
if ( MPI_STATUS_IGNORE != status ) {
size_t size;
@ -162,25 +161,15 @@ int mca_io_ompio_file_iread_all (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_common_ompio_data_t *data=NULL;
ompio_file_t *fp=NULL;
data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
fp = &data->ompio_fh;
OPAL_THREAD_LOCK(&fh->f_lock);
if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
ret = fp->f_fcoll->fcoll_file_iread_all (&data->ompio_fh,
buf,
count,
datatype,
request);
}
else {
/* this fcoll component does not support non-blocking
collective I/O operations. WE fake it with
individual non-blocking I/O operations. */
ret = mca_common_ompio_file_iread ( fp, buf, count, datatype, request );
}
ret = mca_common_ompio_file_iread_all (&data->ompio_fh,
buf,
count,
datatype,
request);
OPAL_THREAD_UNLOCK(&fh->f_lock);
return ret;

Просмотреть файл

@ -66,7 +66,8 @@ int mca_io_ompio_file_set_view (ompi_file_t *fp,
mca_common_ompio_data_t *data;
ompio_file_t *fh;
if ( (strcmp(datarep, "native") && strcmp(datarep, "NATIVE"))) {
if ( (strcmp(datarep, "native") && strcmp(datarep, "NATIVE") &&
strcmp(datarep, "external32") && strcmp(datarep, "EXTERNAL32"))) {
return MPI_ERR_UNSUPPORTED_DATAREP;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2018 University of Houston. All rights reserved.
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -143,12 +143,11 @@ int mca_io_ompio_file_write_all (ompi_file_t *fh,
data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
OPAL_THREAD_LOCK(&fh->f_lock);
ret = data->ompio_fh.
f_fcoll->fcoll_file_write_all (&data->ompio_fh,
buf,
count,
datatype,
status);
ret = mca_common_ompio_file_write_all (&data->ompio_fh,
buf,
count,
datatype,
status);
OPAL_THREAD_UNLOCK(&fh->f_lock);
if ( MPI_STATUS_IGNORE != status ) {
size_t size;
@ -186,25 +185,15 @@ int mca_io_ompio_file_iwrite_all (ompi_file_t *fh,
{
int ret = OMPI_SUCCESS;
mca_common_ompio_data_t *data=NULL;
ompio_file_t *fp=NULL;
data = (mca_common_ompio_data_t *) fh->f_io_selected_data;
fp = &data->ompio_fh;
OPAL_THREAD_LOCK(&fh->f_lock);
if ( NULL != fp->f_fcoll->fcoll_file_iwrite_all ) {
ret = fp->f_fcoll->fcoll_file_iwrite_all (&data->ompio_fh,
buf,
count,
datatype,
request);
}
else {
/* this fcoll component does not support non-blocking
collective I/O operations. WE fake it with
individual non-blocking I/O operations. */
ret = mca_common_ompio_file_iwrite ( fp, buf, count, datatype, request );
}
ret = mca_common_ompio_file_iwrite_all (&data->ompio_fh,
buf,
count,
datatype,
request);
OPAL_THREAD_UNLOCK(&fh->f_lock);
return ret;

Просмотреть файл

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2016-2019 IBM Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -20,7 +20,6 @@
#include "ompi_config.h"
#include "mpi.h"
#include "opal/threads/mutex.h"
#include "ompi/mca/io/io.h"
#include "io_ompio.h"