/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2016 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2012 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2009      Sun Microsystems, Inc.  All rights reserved.
 * Copyright (c) 2018      Research Organization for Information Science
 *                         and Technology (RIST).  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/grequest.h"
#include "ompi/mpi/fortran/base/fint_2_int.h"
#include "ompi/request/grequestx.h"

static bool requests_initialized = false;
static opal_list_t requests;
static opal_atomic_int32_t active_requests = 0;
static bool in_progress = false;
static opal_mutex_t lock = OPAL_MUTEX_STATIC_INIT;

static int grequestx_progress(void) {
    ompi_grequest_t *request, *next;
    int completed = 0;

    OPAL_THREAD_LOCK(&lock);
    if (!in_progress) {
        in_progress = true;

        OPAL_LIST_FOREACH_SAFE(request, next, &requests, ompi_grequest_t) {
            MPI_Status status;
            OPAL_THREAD_UNLOCK(&lock);
            request->greq_poll.c_poll(request->greq_state, &status);
            OPAL_THREAD_LOCK(&lock);
            if (REQUEST_COMPLETE(&request->greq_base)) {
                opal_list_remove_item(&requests, &request->greq_base.super.super);
                OBJ_RELEASE(request);
                completed++;
            }
        }
        in_progress = false;
    }
    OPAL_THREAD_UNLOCK(&lock);

    return completed;
}

int ompi_grequestx_start(
    MPI_Grequest_query_function *gquery_fn,
    MPI_Grequest_free_function *gfree_fn,
    MPI_Grequest_cancel_function *gcancel_fn,
    ompi_grequestx_poll_function *gpoll_fn,
    void* extra_state,
    ompi_request_t** request)
{
    int rc;

    rc = ompi_grequest_start(gquery_fn, gfree_fn, gcancel_fn, extra_state, request);
    if (OMPI_SUCCESS != rc) {
        return rc;
    }
    ((ompi_grequest_t *)*request)->greq_poll.c_poll = gpoll_fn;

    /* prevent the request from being destroyed upon completion,
     * we first have to remove it from the list of active requests
     */
    OBJ_RETAIN(((ompi_grequest_t *)*request));

    OPAL_THREAD_LOCK(&lock);
    if (!requests_initialized) {
        OBJ_CONSTRUCT(&requests, opal_list_t);
        requests_initialized = true;
    }

    opal_list_append(&requests, &((*request)->super.super));
    OPAL_THREAD_UNLOCK(&lock);
    int32_t tmp = OPAL_THREAD_ADD_FETCH32(&active_requests, 1);
    if (1 == tmp) {
        opal_progress_register(grequestx_progress);
    }

    return OMPI_SUCCESS;
}


struct grequestx_class {
    opal_object_t super;
    MPI_Grequest_query_function *gquery_fn;
    MPI_Grequest_free_function *gfree_fn;
    MPI_Grequest_cancel_function *gcancel_fn;
    ompi_grequestx_poll_function *gpoll_fn;
    ompi_grequestx_wait_function *gwait_fn;
} ;

typedef struct grequestx_class grequestx_class;

static int next_class = 0;

static OBJ_CLASS_INSTANCE(grequestx_class, opal_object_t, NULL, NULL);

static opal_pointer_array_t classes;

int ompi_grequestx_class_create(
    MPI_Grequest_query_function *gquery_fn,
    MPI_Grequest_free_function *gfree_fn,
    MPI_Grequest_cancel_function *gcancel_fn,
    ompi_grequestx_poll_function *gpoll_fn,
    ompi_grequestx_wait_function *gwait_fn,
    ompi_grequestx_class *greq_class)
{
    grequestx_class * class = OBJ_NEW(grequestx_class);
    class->gquery_fn = gquery_fn;
    class->gfree_fn = gfree_fn;
    class->gcancel_fn = gcancel_fn;
    class->gpoll_fn = gpoll_fn;
    class->gwait_fn = gwait_fn;

    if (0 == next_class) {
        OBJ_CONSTRUCT(&classes, opal_pointer_array_t);
    }
    opal_pointer_array_add(&classes, class);
    next_class ++;

    return OMPI_SUCCESS;
}

int ompi_grequestx_class_allocate(
    ompi_grequestx_class greq_class,
    void *extra_state,
    ompi_request_t **request)
{
    grequestx_class *class = opal_pointer_array_get_item(&classes, greq_class);
    ompi_grequestx_start(class->gquery_fn, class->gfree_fn, class->gcancel_fn, class->gpoll_fn, extra_state, request);

    return OMPI_SUCCESS;
}