From 5d58ce2113767016d5783c40c00a77488e3db2db Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Thu, 29 Jan 2015 10:23:36 -0600 Subject: [PATCH] handle the situation where you have an upper limit on the simultaneous number of pending aio operations. --- ompi/mca/fbtl/posix/fbtl_posix.c | 50 +++++++++++++++++++++-- ompi/mca/fbtl/posix/fbtl_posix.h | 22 +++++++--- ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c | 15 ++++++- ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c | 16 +++++++- 4 files changed, 90 insertions(+), 13 deletions(-) diff --git a/ompi/mca/fbtl/posix/fbtl_posix.c b/ompi/mca/fbtl/posix/fbtl_posix.c index 5854473d60..b38280e650 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix.c +++ b/ompi/mca/fbtl/posix/fbtl_posix.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -32,6 +32,8 @@ #include #endif +int fbtl_posix_max_aio_active_reqs=2048; + #include "ompi/mca/fbtl/fbtl.h" #include "ompi/mca/fbtl/posix/fbtl_posix.h" @@ -95,6 +97,13 @@ int mca_fbtl_posix_component_file_unquery (mca_io_ompio_file_t *file) { } int mca_fbtl_posix_module_init (mca_io_ompio_file_t *file) { + +#if defined (FBTL_POSIX_HAVE_AIO) + long val = sysconf(_SC_AIO_MAX); + if ( -1 != val ) { + fbtl_posix_max_aio_active_reqs = (int)val; + } +#endif return OMPI_SUCCESS; } @@ -107,14 +116,15 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req) { bool ret=false; #if defined (FBTL_POSIX_HAVE_AIO) - int i=0; + int i=0, lcount=0; mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data; - for (i=0; i < data->aio_req_count; i++ ) { + for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) { if ( EINPROGRESS == data->aio_req_status[i] ) { data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]); if ( 0 == data->aio_req_status[i]){ data->aio_open_reqs--; + lcount++; /* assuming right now that aio_return will return ** the number of bytes written/read and not an error code, ** since aio_error should have returned an error in that @@ -135,8 +145,42 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req) break; } } + else { + lcount++; + } } +#if 0 + printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs ); +#endif + if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) { + /* post the next batch of operations */ + data->aio_first_active_req = data->aio_last_active_req; + if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) { + data->aio_last_active_req += data->aio_req_chunks; + } + else { + data->aio_last_active_req = data->aio_req_count; + } + for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) { + if ( FBTL_POSIX_READ == data->aio_req_type ) { + if (-1 == aio_read(&data->aio_reqs[i])) { + 
perror("aio_read() error"); + return OMPI_ERROR; + } + } + else if ( FBTL_POSIX_WRITE == data->aio_req_type ) { + if (-1 == aio_write(&data->aio_reqs[i])) { + perror("aio_write() error"); + return OMPI_ERROR; + } + } + } +#if 0 + printf("posting new batch: first=%d last=%d\n", data->aio_first_active_req, data->aio_last_active_req ); +#endif + } + if ( 0 == data->aio_open_reqs ) { /* all pending operations are finished for this request */ req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; diff --git a/ompi/mca/fbtl/posix/fbtl_posix.h b/ompi/mca/fbtl/posix/fbtl_posix.h index 499a35f4d3..22d4de2e70 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix.h +++ b/ompi/mca/fbtl/posix/fbtl_posix.h @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -39,6 +39,8 @@ int mca_fbtl_posix_component_file_unquery (mca_io_ompio_file_t *file); int mca_fbtl_posix_module_init (mca_io_ompio_file_t *file); int mca_fbtl_posix_module_finalize (mca_io_ompio_file_t *file); +extern int fbtl_posix_max_aio_active_reqs; + OMPI_MODULE_DECLSPEC extern mca_fbtl_base_component_2_0_0_t mca_fbtl_posix_component; /* * ****************************************************************** @@ -57,17 +59,25 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req); void mca_fbtl_posix_request_free ( mca_ompio_request_t *req); struct mca_fbtl_posix_request_data_t { - int aio_req_count; - int aio_open_reqs; - struct aiocb *aio_reqs; - int *aio_req_status; - ssize_t aio_total_len; + int aio_req_count; /* total number of aio reqs */ + int aio_open_reqs; /* number of unfinished reqs */ + int aio_req_type; /* read or write */ + int aio_req_chunks; /* max. no. 
of aio reqs that can be posted at once*/ + int aio_first_active_req; /* first active posted req */ + int aio_last_active_req; /* last currently active posted req */ + struct aiocb *aio_reqs; /* pointer array of req structures */ + int *aio_req_status; /* array of statuses */ + ssize_t aio_total_len; /* total amount of data written */ }; typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t; /* Right now statically defined, will become a configure check */ #define FBTL_POSIX_HAVE_AIO 1 +/* define constants for AIO requests */ +#define FBTL_POSIX_READ 1 +#define FBTL_POSIX_WRITE 2 + /* * ****************************************************************** * ************ functions implemented in this module end ************ diff --git a/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c b/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c index 67d26d33f2..41cc861766 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -47,6 +47,8 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh, data->aio_req_count = fh->f_num_of_io_entries; data->aio_open_reqs = fh->f_num_of_io_entries; + data->aio_req_type = FBTL_POSIX_READ; + data->aio_req_chunks = fbtl_posix_max_aio_active_reqs; data->aio_total_len = 0; data->aio_reqs = (struct aiocb *) malloc (sizeof(struct aiocb) * fh->f_num_of_io_entries); @@ -70,7 +72,16 @@ ssize_t mca_fbtl_posix_ipreadv (mca_io_ompio_file_t *fh, data->aio_reqs[i].aio_reqprio = 0; data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE; data->aio_req_status[i] = EINPROGRESS; - + } + + data->aio_first_active_req = 0; + if ( data->aio_req_count > data->aio_req_chunks ) { + data->aio_last_active_req = data->aio_req_chunks; + } + else { + data->aio_last_active_req = data->aio_req_count; + } + for (i=0; i < data->aio_last_active_req; i++) { if (-1 == aio_read(&data->aio_reqs[i])) { perror("aio_read() error"); return OMPI_ERROR; diff --git a/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c b/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c index 856be64c82..acc1ffd789 100644 --- a/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c +++ b/ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2008-2014 University of Houston. All rights reserved. + * Copyright (c) 2008-2015 University of Houston. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -46,6 +46,8 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh, data->aio_req_count = fh->f_num_of_io_entries; data->aio_open_reqs = fh->f_num_of_io_entries; + data->aio_req_type = FBTL_POSIX_WRITE; + data->aio_req_chunks = fbtl_posix_max_aio_active_reqs; data->aio_total_len = 0; data->aio_reqs = (struct aiocb *) malloc (sizeof(struct aiocb) * fh->f_num_of_io_entries); @@ -69,7 +71,17 @@ ssize_t mca_fbtl_posix_ipwritev (mca_io_ompio_file_t *fh, data->aio_reqs[i].aio_reqprio = 0; data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE; data->aio_req_status[i] = EINPROGRESS; - + } + + data->aio_first_active_req = 0; + if ( data->aio_req_count > data->aio_req_chunks ) { + data->aio_last_active_req = data->aio_req_chunks; + } + else { + data->aio_last_active_req = data->aio_req_count; + } + + for (i=0; i < data->aio_last_active_req; i++) { if (-1 == aio_write(&data->aio_reqs[i])) { perror("aio_write() error"); return OMPI_ERROR;