/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2015 University of Houston. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * These symbols are in a file by themselves to provide nice linker
 * semantics. Since linkers generally pull in symbols by object files,
 * keeping these symbols as the only symbols in this file prevents
 * utility programs such as "ompi_info" from having to import entire
 * modules just to query their version and parameters.
 */

#include "ompi_config.h"
#include "mpi.h"

#include <limits.h>
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif

/* Process-wide cap on active aio requests for this component. Starts at
 * 2048; mca_fbtl_posix_module_init() may overwrite it with the value of
 * sysconf(_SC_AIO_MAX) when aio support is compiled in. */
int fbtl_posix_max_aio_active_reqs = 2048;

#include "ompi/mca/fbtl/fbtl.h"
#include "ompi/mca/fbtl/posix/fbtl_posix.h"

/*
 * *******************************************************************
 * ************************ actions structure ************************
 * *******************************************************************
 */
/*
 * Function table handed to OMPIO by mca_fbtl_posix_component_file_query().
 * The initializer is positional, so the entries must stay in the member
 * order of mca_fbtl_base_module_1_0_0_t. When FBTL_POSIX_HAVE_AIO is not
 * defined, the non-blocking read/write, progress and request-free slots
 * are NULL, so only the blocking paths are available.
 */
static mca_fbtl_base_module_1_0_0_t posix =  {
    mca_fbtl_posix_module_init,     /* initialise after being selected */
    mca_fbtl_posix_module_finalize, /* close a module on a communicator */
    mca_fbtl_posix_preadv,          /* blocking read */
#if defined (FBTL_POSIX_HAVE_AIO)
    mca_fbtl_posix_ipreadv,         /* non-blocking read*/
#else
    NULL,                           /* non-blocking read */
#endif
    mca_fbtl_posix_pwritev,         /* blocking write */
#if defined (FBTL_POSIX_HAVE_AIO)
    mca_fbtl_posix_ipwritev,        /* non-blocking write */
    mca_fbtl_posix_progress,        /* module specific progress */
    mca_fbtl_posix_request_free     /* free module specific data items on the request */
#else
    NULL,                           /* non-blocking write */
    NULL,                           /* module specific progress */
    NULL                            /* free module specific data items on the request */
#endif
};
/*
 * *******************************************************************
 * ************************* structure ends **************************
 * *******************************************************************
 */

/*
 * Component-level init query. The posix fbtl keeps no component-wide
 * state that depends on the threading configuration, so it always agrees
 * to be used.
 *
 * @param enable_progress_threads  ignored
 * @param enable_mpi_threads       ignored
 * @return OMPI_SUCCESS, unconditionally
 */
int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
                                        bool enable_mpi_threads)
{
    (void) enable_progress_threads;
    (void) enable_mpi_threads;

    return OMPI_SUCCESS;
}

/*
 * Per-file selection query.
 *
 * Reports the configured mca_fbtl_posix_priority. On a UFS file system
 * that priority is raised to at least 50 so this component is favoured.
 *
 * @param fh        file being opened; only f_fstype is inspected
 * @param priority  out: selection priority for this component
 * @return the posix fbtl function table
 */
struct mca_fbtl_base_module_1_0_0_t *
mca_fbtl_posix_component_file_query (mca_io_ompio_file_t *fh, int *priority)
{
    int prio = mca_fbtl_posix_priority;
    const int ufs_floor = 50;

    if (UFS == fh->f_fstype && prio < ufs_floor) {
        prio = ufs_floor;
    }
    *priority = prio;

    return &posix;
}

/*
 * Undo hook for a file_query whose module was not selected.
 * mca_fbtl_posix_component_file_query() acquires no resources, so there is
 * nothing to roll back; the hook exists for future use.
 *
 * @param file  the file that was queried (ignored)
 * @return OMPI_SUCCESS, unconditionally
 */
int mca_fbtl_posix_component_file_unquery (mca_io_ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}

/*
 * Module initialisation, run after this module is selected for a file.
 *
 * When asynchronous I/O support is compiled in, refresh the global cap
 * fbtl_posix_max_aio_active_reqs from sysconf(_SC_AIO_MAX).
 * - A result of -1 (no determinate limit, or unsupported) keeps the
 *   current value.
 * - Any other non-positive result is also ignored, so the cap never drops
 *   to zero.
 * - Results above INT_MAX are clamped so the narrowing from long to int
 *   cannot yield an implementation-defined (possibly negative) value.
 *
 * @param file  the file being opened (not used)
 * @return OMPI_SUCCESS, unconditionally
 */
int mca_fbtl_posix_module_init (mca_io_ompio_file_t *file) {
    (void) file;

#if defined (FBTL_POSIX_HAVE_AIO)
    long val = sysconf(_SC_AIO_MAX);
    if ( 0 < val ) {
        fbtl_posix_max_aio_active_reqs = ( val > INT_MAX ) ? INT_MAX : (int)val;
    }
#endif
    return OMPI_SUCCESS;
}


/*
 * Module teardown for a file. mca_fbtl_posix_module_init() only updates a
 * process-wide global and allocates nothing per file, so nothing needs
 * releasing here.
 *
 * @param file  the file being closed (ignored)
 * @return OMPI_SUCCESS, unconditionally
 */
int mca_fbtl_posix_module_finalize (mca_io_ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}

/*
 * Drive an outstanding non-blocking (aio based) read or write request.
 *
 * The request's aio control blocks are posted in windows
 * [aio_first_active_req, aio_last_active_req).
 *
 * Each call does the following:
 *   1. Polls the active window with aio_error(). A request that finished
 *      successfully decrements aio_open_reqs and adds its aio_return()
 *      byte count to aio_total_len.
 *   2. If every request in the window is done and requests remain open,
 *      releases the file lock for the finished window. It then locks the
 *      byte range of the next window (at most aio_req_chunks requests)
 *      and posts it with aio_read()/aio_write().
 *   3. Once aio_open_reqs reaches zero, records success plus the total
 *      byte count in the request status and releases the lock.
 *
 * @param req  the OMPIO request; req->req_data is assumed to hold the
 *             mca_fbtl_posix_request_data_t prepared by the ipreadv/ipwritev
 *             path (not visible here)
 * @return true when the request is finished, whether it succeeded or
 *         failed (a failure is recorded in req->req_ompi.req_status.MPI_ERROR
 *         as OMPI_ERROR). Returns false while work is still pending, and
 *         always returns false when built without FBTL_POSIX_HAVE_AIO.
 */
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
{
    bool ret=false;
#if defined (FBTL_POSIX_HAVE_AIO)
    /* ret_code starts at 0 so an unexpected aio_req_type (neither READ nor
     * WRITE) does not read an indeterminate value below. */
    int i=0, lcount=0, ret_code=0;
    mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
    off_t start_offset, end_offset, total_length;

    for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
        if ( EINPROGRESS == data->aio_req_status[i] ) {
            data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
            if ( 0 == data->aio_req_status[i]){
                data->aio_open_reqs--;
                lcount++;
                /* aio_error() returned 0, i.e. the operation completed
                 * successfully, so aio_return() yields the number of bytes
                 * transferred rather than an error indicator. */
                data->aio_total_len += aio_return (&data->aio_reqs[i]);
            }
            else if ( EINPROGRESS == data->aio_req_status[i]){
                /* not yet done */
                continue;
            }
            else {
                /* An error occurred. Mark the request done, but record the
                 * error and the bytes transferred so far in the status. */
                req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
                req->req_ompi.req_status._ucount = data->aio_total_len;
                ret = true;
                break;
            }
        }
        else {
            /* already completed during an earlier progress call */
            lcount++;
        }
    }

    if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
        /* release the lock of the previous operations */
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );

        /* post the next batch of operations */
        data->aio_first_active_req = data->aio_last_active_req;
        if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
            data->aio_last_active_req += data->aio_req_chunks;
        }
        else {
            data->aio_last_active_req = data->aio_req_count;
        }

        /* lock the contiguous byte range spanned by the new window */
        start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
        end_offset   = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
        total_length = (end_offset - start_offset);

        if ( FBTL_POSIX_READ == data->aio_req_type ) {
            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
        }
        else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
        }
        /* Treat any non-zero result as failure. A fcntl()-style -1 would
         * slip past a "0 < ret_code" test. */
        if ( 0 != ret_code ) {
            opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
            goto error_unlock;
        }

        for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
            if ( FBTL_POSIX_READ == data->aio_req_type ) {
                if (-1 == aio_read(&data->aio_reqs[i])) {
                    opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
                    goto error_unlock;
                }
            }
            else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
                if (-1 == aio_write(&data->aio_reqs[i])) {
                    opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
                    goto error_unlock;
                }
            }
        }
    }

    if ( 0 == data->aio_open_reqs ) {
        /* all pending operations are finished for this request */
        req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        req->req_ompi.req_status._ucount = data->aio_total_len;
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
        ret = true;
    }
    return ret;

error_unlock:
    /* Release whatever part of the lock may be held. Then report the
     * request as finished with an error. This function returns bool, so
     * returning OMPI_ERROR would merely convert to 'true' and leave the
     * status without an error code.
     * NOTE(review): aio operations already posted in this window may still
     * be in flight at this point; confirm the caller does not free their
     * control blocks before they settle. */
    mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
    req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
    req->req_ompi.req_status._ucount = data->aio_total_len;
    return true;
#else
    (void) req;
    return ret;
#endif
}

/*
 * Release the fbtl-private data attached to a non-blocking request.
 *
 * When aio support is compiled in and req->req_data is set, this function:
 *   - calls mca_fbtl_posix_unlock() on the request's lock;
 *   - frees the aio control-block and status arrays and the data record;
 *   - clears req->req_data so a repeated call is harmless.
 * Without aio support it does nothing.
 *
 * @param req  the OMPIO request whose fbtl data should be released
 */
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
{
#if defined (FBTL_POSIX_HAVE_AIO)
    mca_fbtl_posix_request_data_t *priv = (mca_fbtl_posix_request_data_t *) req->req_data;

    if ( NULL == priv ) {
        return;
    }

    mca_fbtl_posix_unlock ( &priv->aio_lock, priv->aio_fh );

    /* free(NULL) is a no-op, so the arrays need no NULL checks. */
    free ( priv->aio_reqs );
    free ( priv->aio_req_status );
    free ( priv );

    req->req_data = NULL;
#else
    (void) req;
#endif
}