1
1

implementation of non-blocking read/write operations through aio

functions for the posix module. Som interface changes for the fbtl were
necessary for that.

This commit was SVN r32777.
Этот коммит содержится в:
Edgar Gabriel 2014-09-23 21:27:57 +00:00
родитель 8bd3160432
Коммит 05c34946f7
11 изменённых файлов: 277 добавлений и 111 удалений

Просмотреть файл

@ -122,13 +122,15 @@ typedef ssize_t (*mca_fbtl_base_module_pwritev_fn_t)
(struct mca_io_ompio_file_t *file );
typedef ssize_t (*mca_fbtl_base_module_ipreadv_fn_t)
(struct mca_io_ompio_file_t *file,
ompi_request_t **request);
ompi_request_t *request);
typedef ssize_t (*mca_fbtl_base_module_ipwritev_fn_t)
(struct mca_io_ompio_file_t *file,
ompi_request_t **request);
ompi_request_t *request);
typedef bool (*mca_fbtl_base_module_progress_fn_t)
( struct mca_ompio_request_t *request);
typedef void (*mca_fbtl_base_module_request_free_fn_t)
( struct mca_ompio_request_t *request);
/*
* ***********************************************************************
* *************************** module structure *************************
@ -144,11 +146,12 @@ struct mca_fbtl_base_module_1_0_0_t {
mca_fbtl_base_module_finalize_1_0_0_fn_t fbtl_module_finalize;
/* FBTL function pointers */
mca_fbtl_base_module_preadv_fn_t fbtl_preadv;
mca_fbtl_base_module_ipreadv_fn_t fbtl_ipreadv;
mca_fbtl_base_module_pwritev_fn_t fbtl_pwritev;
mca_fbtl_base_module_ipwritev_fn_t fbtl_ipwritev;
mca_fbtl_base_module_progress_fn_t fbtl_progress;
mca_fbtl_base_module_preadv_fn_t fbtl_preadv;
mca_fbtl_base_module_ipreadv_fn_t fbtl_ipreadv;
mca_fbtl_base_module_pwritev_fn_t fbtl_pwritev;
mca_fbtl_base_module_ipwritev_fn_t fbtl_ipwritev;
mca_fbtl_base_module_progress_fn_t fbtl_progress;
mca_fbtl_base_module_request_free_fn_t fbtl_request_free;
};
typedef struct mca_fbtl_base_module_1_0_0_t mca_fbtl_base_module_1_0_0_t;
typedef mca_fbtl_base_module_1_0_0_t mca_fbtl_base_module_t;

Просмотреть файл

@ -40,7 +40,8 @@ static mca_fbtl_base_module_1_0_0_t plfs = {
NULL, /* non-blocking read */
mca_fbtl_plfs_pwritev, /* blocking write */
NULL, /* non-blocking write */
NULL /* module specific progress */
NULL, /* module specific progress */
NULL /* free module specific data items on the request */
};
/*
* *******************************************************************

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -25,6 +25,13 @@
#include "ompi_config.h"
#include "mpi.h"
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
#include "ompi/mca/fbtl/fbtl.h"
#include "ompi/mca/fbtl/posix/fbtl_posix.h"
@ -37,10 +44,21 @@ static mca_fbtl_base_module_1_0_0_t posix = {
mca_fbtl_posix_module_init, /* initalise after being selected */
mca_fbtl_posix_module_finalize, /* close a module on a communicator */
mca_fbtl_posix_preadv, /* blocking read */
#if defined (FBTL_POSIX_HAVE_AIO)
mca_fbtl_posix_ipreadv, /* non-blocking read*/
#else
NULL, /* non-blocking read */
#endif
mca_fbtl_posix_pwritev, /* blocking write */
#if defined (FBTL_POSIX_HAVE_AIO)
mca_fbtl_posix_ipwritev, /* non-blocking write */
mca_fbtl_posix_progress, /* module specific progress */
mca_fbtl_posix_request_free /* free module specific data items on the request */
#else
NULL, /* non-blocking write */
NULL /* module specific progress */
NULL, /* module specific progress */
NULL /* free module specific data items on the request */
#endif
};
/*
* *******************************************************************
@ -84,3 +102,66 @@ int mca_fbtl_posix_module_init (mca_io_ompio_file_t *file) {
int mca_fbtl_posix_module_finalize (mca_io_ompio_file_t *file) {
return OMPI_SUCCESS;
}
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
{
bool ret=false;
#if defined (FBTL_POSIX_HAVE_AIO)
int i=0;
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
for (i=0; i < data->aio_req_count; i++ ) {
if ( EINPROGRESS == data->aio_req_status[i] ) {
data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
if ( 0 == data->aio_req_status[i]){
data->aio_open_reqs--;
/* assuming right now that aio_return will return
** the number of bytes written/read and not an error code,
** since aio_error should have returned an error in that
** case and not 0 ( which means request is complete)
*/
data->aio_total_len += aio_return (&data->aio_reqs[i]);
}
else if ( EINPROGRESS == data->aio_req_status[i]){
/* not yet done */
continue;
}
else {
/* an error occured. Mark the request done, but
set an error code in the status */
req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
req->req_ompi.req_status._ucount = data->aio_total_len;
ret = true;
break;
}
}
}
if ( 0 == data->aio_open_reqs ) {
/* all pending operations are finished for this request */
req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
req->req_ompi.req_status._ucount = data->aio_total_len;
ret = true;
}
#endif
return ret;
}
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
{
#if defined (FBTL_POSIX_HAVE_AIO)
/* Free the fbtl specific data structures */
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
if (NULL != data ) {
if ( NULL != data->aio_reqs ) {
free ( data->aio_reqs);
}
if ( NULL != data->aio_req_status ) {
free ( data->aio_req_status );
}
free ( data );
req->req_data = NULL;
}
#endif
return;
}

Просмотреть файл

@ -24,6 +24,7 @@
#include "opal/mca/mca.h"
#include "ompi/mca/fbtl/fbtl.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/ompio/io_ompio_request.h"
extern int mca_fbtl_posix_priority;
@ -48,9 +49,24 @@ OMPI_MODULE_DECLSPEC extern mca_fbtl_base_component_2_0_0_t mca_fbtl_posix_compo
ssize_t mca_fbtl_posix_preadv (mca_io_ompio_file_t *file );
ssize_t mca_fbtl_posix_pwritev (mca_io_ompio_file_t *file );
ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *file,
ompi_request_t **request);
ompi_request_t *request);
ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *file,
ompi_request_t **request);
ompi_request_t *request);
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req);
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req);
struct mca_fbtl_posix_request_data_t {
int aio_req_count;
int aio_open_reqs;
struct aiocb *aio_reqs;
int *aio_req_status;
ssize_t aio_total_len;
};
typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;
/* Right now statically defined, will become a configure check */
#define FBTL_POSIX_HAVE_AIO 1
/*
* ******************************************************************

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -21,13 +21,65 @@
#include "ompi_config.h"
#include "fbtl_posix.h"
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *file,
ompi_request_t **request)
ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh,
ompi_request_t *request)
{
printf ("POSIX IPREADV\n");
#if defined (FBTL_POSIX_HAVE_AIO)
mca_fbtl_posix_request_data_t *data;
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
int i=0;
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
if ( NULL == data ) {
opal_output (1,"could not allocate memory\n");
return 0;
}
data->aio_req_count = fh->f_num_of_io_entries;
data->aio_open_reqs = fh->f_num_of_io_entries;
data->aio_total_len = 0;
data->aio_reqs = (struct aiocb *) malloc (sizeof(struct aiocb) *
fh->f_num_of_io_entries);
if (NULL == data->aio_reqs) {
opal_output(1, "OUT OF MEMORY\n");
return 0;
}
data->aio_req_status = (int *) malloc (sizeof(int) * fh->f_num_of_io_entries);
if (NULL == data->aio_req_status) {
opal_output(1, "OUT OF MEMORY\n");
return 0;
}
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
fh->f_io_array[i].offset;
data->aio_reqs[i].aio_buf = fh->f_io_array[i].memory_address;
data->aio_reqs[i].aio_nbytes = fh->f_io_array[i].length;
data->aio_reqs[i].aio_fildes = fh->fd;
data->aio_reqs[i].aio_reqprio = 0;
data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
data->aio_req_status[i] = EINPROGRESS;
if (-1 == aio_read(&data->aio_reqs[i])) {
perror("aio_read() error");
return OMPI_ERROR;
}
}
req->req_data = data;
req->req_progress_fn = mca_fbtl_posix_progress;
req->req_free_fn = mca_fbtl_posix_request_free;
#endif
return OMPI_SUCCESS;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -20,97 +20,65 @@
#include "ompi_config.h"
#include "fbtl_posix.h"
#include "mpi.h"
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/mca/fbtl/fbtl.h"
ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh,
ompi_request_t **request)
ompi_request_t *request)
{
int i;
int num_req = 0;
int merge = 0;
size_t k;
char *merge_buf = NULL;
size_t merge_length = 0;
OMPI_MPI_OFFSET_TYPE merge_offset = 0;
struct aiocb *aiocbp;
#if defined(FBTL_POSIX_HAVE_AIO)
mca_fbtl_posix_request_data_t *data;
mca_ompio_request_t *req = (mca_ompio_request_t *) request;
int i=0;
aiocbp = (struct aiocb *) malloc (sizeof(struct aiocb) *
fh->f_num_of_io_entries);
if (NULL == aiocbp) {
data = (mca_fbtl_posix_request_data_t *) malloc ( sizeof (mca_fbtl_posix_request_data_t));
if ( NULL == data ) {
opal_output (1,"could not allocate memory\n");
return 0;
}
data->aio_req_count = fh->f_num_of_io_entries;
data->aio_open_reqs = fh->f_num_of_io_entries;
data->aio_total_len = 0;
data->aio_reqs = (struct aiocb *) malloc (sizeof(struct aiocb) *
fh->f_num_of_io_entries);
if (NULL == data->aio_reqs) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
return 0;
}
for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
if (fh->f_num_of_io_entries != i+1) {
if (((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
(OPAL_PTRDIFF_TYPE)fh->f_io_array[i].length) ==
(OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset) {
if (!merge) {
merge_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
fh->f_io_array[i].offset;
merge_length = fh->f_io_array[i].length;
}
merge_length += fh->f_io_array[i+1].length;
merge++;
continue;
}
}
if (merge) {
merge_buf = malloc (merge_length);
if (NULL == merge_buf) {
opal_output(1, "OUT OF MEMORY\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
k = 0;
while (merge >= 0) {
memcpy (merge_buf + k,
fh->f_io_array[i-merge].memory_address,
fh->f_io_array[i-merge].length);
k += fh->f_io_array[i-merge].length;
merge --;
}
aiocbp[num_req].aio_offset = merge_offset;
aiocbp[num_req].aio_buf = merge_buf;
aiocbp[num_req].aio_nbytes = merge_length;
aiocbp[num_req].aio_fildes = fh->fd;
aiocbp[num_req].aio_reqprio = 0;
aiocbp[num_req].aio_sigevent.sigev_notify = SIGEV_NONE;
if (-1 == aio_write(&aiocbp[num_req])) {
perror("aio_write() error");
return OMPI_ERROR;
}
merge = 0;
merge_offset = 0;
merge_length = 0;
if (NULL != merge_buf) {
free (merge_buf);
merge_buf = NULL;
}
}
else {
aiocbp[num_req].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
fh->f_io_array[i].offset;
aiocbp[num_req].aio_buf = fh->f_io_array[i].memory_address;
aiocbp[num_req].aio_nbytes = fh->f_io_array[i].length;
aiocbp[num_req].aio_fildes = fh->fd;
aiocbp[num_req].aio_reqprio = 0;
aiocbp[num_req].aio_sigevent.sigev_notify = SIGEV_NONE;
if (-1 == aio_write(&aiocbp[num_req])) {
perror("aio_write() error");
return OMPI_ERROR;
}
}
num_req ++;
data->aio_req_status = (int *) malloc (sizeof(int) * fh->f_num_of_io_entries);
if (NULL == data->aio_req_status) {
opal_output(1, "OUT OF MEMORY\n");
return 0;
}
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
fh->f_io_array[i].offset;
data->aio_reqs[i].aio_buf = fh->f_io_array[i].memory_address;
data->aio_reqs[i].aio_nbytes = fh->f_io_array[i].length;
data->aio_reqs[i].aio_fildes = fh->fd;
data->aio_reqs[i].aio_reqprio = 0;
data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
data->aio_req_status[i] = EINPROGRESS;
if (-1 == aio_write(&data->aio_reqs[i])) {
perror("aio_write() error");
return OMPI_ERROR;
}
}
req->req_data = data;
req->req_progress_fn = mca_fbtl_posix_progress;
req->req_free_fn = mca_fbtl_posix_request_free;
#endif
return OMPI_SUCCESS;
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2011 University of Houston. All rights reserved.
* Copyright (c) 2008-2014 University of Houston. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -40,7 +40,8 @@ static mca_fbtl_base_module_1_0_0_t pvfs2 = {
NULL, /* non-blocking read */
mca_fbtl_pvfs2_pwritev, /* blocking write */
NULL, /* non-blocking write */
NULL /* module specific progress */
NULL, /* module specific progress */
NULL /* free module specific data items on the request */
};
/*
* *******************************************************************

Просмотреть файл

@ -267,9 +267,16 @@ int ompio_io_ompio_file_iread (mca_io_ompio_file_t *fh,
&total_bytes_read);
if (fh->f_num_of_io_entries) {
fh->f_fbtl->fbtl_ipreadv (fh, request);
fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
}
if ( false == mca_io_ompio_progress_is_registered ) {
// Lazy initialization of progress function to minimize impact
// on other ompi functionality in case its not used.
opal_progress_register (mca_io_ompio_component_progress);
mca_io_ompio_progress_is_registered=true;
}
fh->f_num_of_io_entries = 0;
if (NULL != fh->f_io_array) {
free (fh->f_io_array);

Просмотреть файл

@ -255,7 +255,14 @@ int ompio_io_ompio_file_iwrite (mca_io_ompio_file_t *fh,
&total_bytes_written);
if (fh->f_num_of_io_entries) {
fh->f_fbtl->fbtl_ipwritev (fh, request);
fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *) ompio_req);
}
if ( false == mca_io_ompio_progress_is_registered ) {
// Lazy initialization of progress function to minimize impact
// on other ompi functionality in case its not used.
opal_progress_register (mca_io_ompio_component_progress);
mca_io_ompio_progress_is_registered=true;
}
fh->f_num_of_io_entries = 0;

Просмотреть файл

@ -23,10 +23,14 @@
static void mca_io_ompio_request_construct(mca_ompio_request_t* req);
static void mca_io_ompio_request_destruct(mca_ompio_request_t *req);
bool mca_io_ompio_progress_is_registered=false;
static int mca_io_ompio_request_free ( struct ompi_request_t **req)
{
mca_ompio_request_t *ompio_req = ( mca_ompio_request_t *)*req;
if ( NULL != ompio_req->req_free_fn ) {
ompio_req->req_free_fn (ompio_req );
}
opal_list_remove_item (&mca_io_ompio_pending_requests, &ompio_req->req_item);
OBJ_RELEASE (*req);
@ -49,6 +53,7 @@ void mca_io_ompio_request_construct(mca_ompio_request_t* req)
req->req_ompi.req_cancel = mca_io_ompio_request_cancel;
req->req_data = NULL;
req->req_progress_fn = NULL;
req->req_free_fn = NULL;
OBJ_CONSTRUCT(&req->req_item, opal_list_item_t);
opal_list_append (&mca_io_ompio_pending_requests, &req->req_item);
@ -64,3 +69,29 @@ void mca_io_ompio_request_destruct(mca_ompio_request_t* req)
return;
}
int mca_io_ompio_component_progress ( void )
{
mca_ompio_request_t *req=NULL;
opal_list_item_t *litem=NULL;
int completed=0;
OPAL_LIST_FOREACH(litem, &mca_io_ompio_pending_requests, opal_list_item_t) {
req = GET_OMPIO_REQ_FROM_ITEM(litem);
if ( true == req->req_ompi.req_complete ) {
continue;
}
if ( NULL != req->req_progress_fn ) {
if ( req->req_progress_fn(req) ) {
completed++;
ompi_request_complete (&req->req_ompi, 1);
/* The fbtl progress function is expected to set the
** status elements
*/
}
}
}
return completed;
}

Просмотреть файл

@ -29,6 +29,7 @@
BEGIN_C_DECLS
extern opal_list_t mca_io_ompio_pending_requests;
extern bool mca_io_ompio_progress_is_registered;
/**
* Type of request.
@ -45,22 +46,20 @@ typedef enum {
* Main structure for OMPIO requests
*/
struct mca_ompio_request_t {
ompi_request_t req_ompi;
mca_ompio_request_type_t req_type;
void *req_data;
opal_list_item_t req_item;
mca_fbtl_base_module_progress_fn_t *req_progress_fn;
ompi_request_t req_ompi;
mca_ompio_request_type_t req_type;
void *req_data;
opal_list_item_t req_item;
mca_fbtl_base_module_progress_fn_t req_progress_fn;
mca_fbtl_base_module_request_free_fn_t req_free_fn;
};
typedef struct mca_ompio_request_t mca_ompio_request_t;
OBJ_CLASS_DECLARATION(mca_ompio_request_t);
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
#define GET_OMPIO_REQ_FROMM_ITEM(ITEM) container_of((ITEM), mca_ompio_request_t, req_item)
#define GET_OMPIO_REQ_FROM_ITEM(ITEM) ((mca_ompio_request_t *)((char *)ITEM - offsetof(struct mca_ompio_request_t,req_item)))
OMPI_DECLSPEC int mca_io_ompio_component_progress ( void);
END_C_DECLS