/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */
/** @file */

#include "ompi_config.h"

#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "opal/sys/atomic.h"
#include "coll_sm2.h"
/* debug 
extern int debug_print;
 end debug */

/**
 * Shared memory barrier.
 *
 * Tree-based algorithm for a barrier: a fan in to rank 0 followed by
 * a fan out using the barrier segments in the shared memory area.
 *
 * There are 2 sets of barrier buffers -- since there can only be, at
 * most, 2 outstanding barriers at any time, there is no need for more
 * than this.  The generalized in-use flags, control, and data
 * segments are not used.
 *
 * The general algorithm is for a given process to wait for its N
 * children to fan in by monitoring a uint32_t in its barrier "in"
 * buffer.  When this value reaches N (i.e., each of the children has
 * atomically incremented the value), the process atomically
 * increments the uint32_t in its parent's "in" buffer.  Then the
 * process waits for the parent to set a "1" in the process' "out"
 * buffer.  Once this happens, the process writes a "1" in each of its
 * children's "out" buffers, and returns.
 *
 * There are corner cases, of course, such as the root that has no
 * parent, and the leaves that have no children.  But that's the
 * general idea.
 *
 * NOTE(review): the tree-based routines in this file do not use
 * atomically incremented counters or separate "in"/"out" buffers.
 * Each process owns a single flag that it sets to the barrier tag to
 * signal arrival (fan in) and to the negated tag to signal release
 * (fan out); a parent polls its children's flags and a child polls
 * its parent's flag.  The description above may predate the current
 * implementation -- confirm.
 */

/**
 * Non-blocking barrier - initiation function.
 *
 * Bumps the module's non-blocking barrier tag, stores the new tag in the
 * request, and records the phase from which
 * mca_coll_sm2_nbbarrier_intra_progress() is to continue.  No completion
 * is detected here: the sm collective buffer allocation relies on
 * in-order completion, so the only shared-memory activity is a leaf
 * process posting its tag to signal that it has arrived.
 *
 * @param comm     communicator (not referenced by this routine)
 * @param request  private per-process request; its tag and
 *                 sm2_barrier_phase fields are written
 * @param module   collective module, used as a mca_coll_sm2_module_t
 *
 * @retval OMPI_SUCCESS always
 */
int mca_coll_sm2_nbbarrier_intra(struct ompi_communicator_t *comm,
        mca_coll_sm2_nb_request_process_private_mem_t *request,
        mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *)module;
    /* barrier region selected by the request's sm_index */
    int bank = request->sm_index;
    mca_coll_sm2_nb_request_process_shared_mem_t *region =
        (mca_coll_sm2_nb_request_process_shared_mem_t *)
        (request->barrier_base_address[bank]);
    mca_coll_sm2_nb_request_process_shared_mem_t *my_slot;
    long long barrier_tag;

    /* New tag for this barrier.  A plain increment is used: only one
     * collective per communicator is outstanding at a time.
     */
    sm2->nb_barrier_tag++;
    request->tag = sm2->nb_barrier_tag;
    barrier_tag = sm2->nb_barrier_tag;

    if( LEAF_NODE == sm2->sm_buffer_mgmt_barrier_tree.my_node_type ) {
        /* A leaf has no children to wait for: post the tag in this
         * process' own slot right away and go straight to waiting for
         * the fan-out.
         */
        my_slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
            ((char *)region +
             sm2->sm_buffer_mgmt_barrier_tree.my_rank *
             sm2->sm2_size_management_region_per_proc);
        my_slot->flag = barrier_tag;
        request->sm2_barrier_phase = NB_BARRIER_FAN_OUT;
    } else {
        /* Interior and root nodes both start in the fan-in phase; the
         * progress function polls their children.
         */
        request->sm2_barrier_phase = NB_BARRIER_FAN_IN;
    }

    return OMPI_SUCCESS;
}


/**
 * Non-blocking barrier - progress / completion function.
 *
 * Advances the barrier described by @p request as far as possible
 * without blocking, and stores the resulting phase in
 * request->sm2_barrier_phase:
 *
 *  - NB_BARRIER_FAN_IN  : still waiting for children to post the tag
 *  - NB_BARRIER_FAN_OUT : children are in, waiting for the parent to
 *                         post the negated tag
 *  - NB_BARRIER_DONE    : this process has completed the barrier
 *
 * Signalling goes through one flag per process in the barrier region:
 * the tag means "arrived", the negated tag means "released".  A leaf's
 * arrival is posted by mca_coll_sm2_nbbarrier_intra(), so a leaf only
 * polls its parent here.
 *
 * @param comm     communicator (not referenced by this routine)
 * @param request  private per-process request holding tag, phase and the
 *                 barrier region addresses
 * @param module   collective module, used as a mca_coll_sm2_module_t
 *
 * @retval OMPI_SUCCESS always; the caller must inspect
 *         request->sm2_barrier_phase to learn whether the barrier is done
 */
int mca_coll_sm2_nbbarrier_intra_progress(struct ompi_communicator_t *comm,
        mca_coll_sm2_nb_request_process_private_mem_t *request,
        mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *)module;
    int bank = request->sm_index;
    char *region = (char *)request->barrier_base_address[bank];
    long long barrier_tag = request->tag;
    int phase = request->sm2_barrier_phase;

    /* role of this process in the barrier tree */
    int is_leaf =
        ( LEAF_NODE == sm2->sm_buffer_mgmt_barrier_tree.my_node_type );
    int is_interior = !is_leaf &&
        ( INTERIOR_NODE == sm2->sm_buffer_mgmt_barrier_tree.my_node_type );
    int is_root = !is_leaf && !is_interior;

    /* only leaf and interior nodes ever resume in the fan-out phase;
     * the root treats every active phase as fan-in */
    int resume_fan_out = !is_root && ( NB_BARRIER_FAN_OUT == phase );

    mca_coll_sm2_nb_request_process_shared_mem_t *slot;
    int arrived;

    /* nothing to do for a barrier that is finished or not active */
    if( !resume_fan_out &&
        ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) ) {
        return OMPI_SUCCESS;
    }

    /*
     * Fan-in phase (interior and root nodes that are not resuming in
     * fan-out): every child must have posted the tag.
     */
    if( !is_leaf && !resume_fan_out ) {
        arrived = 0;
        while( arrived < sm2->sm_buffer_mgmt_barrier_tree.n_children ) {
            slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
                (region +
                 sm2->sm_buffer_mgmt_barrier_tree.children_ranks[arrived] *
                 sm2->sm2_size_management_region_per_proc);
            if( slot->flag != barrier_tag ) {
                /* first child that is not in yet ends the scan */
                break;
            }
            arrived++;
        }

        if( arrived != sm2->sm_buffer_mgmt_barrier_tree.n_children ) {
            /* incomplete - retry the fan-in on the next call */
            request->sm2_barrier_phase = NB_BARRIER_FAN_IN;
            return OMPI_SUCCESS;
        }

        if( is_interior ) {
            /* tell my parent that my whole subtree has arrived; the
             * root has nobody to tell and posts the release below */
            slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
                (region +
                 sm2->sm_buffer_mgmt_barrier_tree.my_rank *
                 sm2->sm2_size_management_region_per_proc);
            slot->flag = barrier_tag;
        }
    }

    /*
     * Fan-out phase: every node except the root waits for its parent to
     * post the negated tag.
     */
    if( !is_root ) {
        slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
            (region +
             sm2->sm_buffer_mgmt_barrier_tree.parent_rank *
             sm2->sm2_size_management_region_per_proc);
        if( slot->flag != -barrier_tag ) {
            /* parent has not released yet - resume here next time */
            request->sm2_barrier_phase = NB_BARRIER_FAN_OUT;
            return OMPI_SUCCESS;
        }
    }

    /* nodes with children pass the release on through their own flag */
    if( !is_leaf ) {
        slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
            (region +
             sm2->sm_buffer_mgmt_barrier_tree.my_rank *
             sm2->sm2_size_management_region_per_proc);
        slot->flag = -barrier_tag;
    }

    request->sm2_barrier_phase = NB_BARRIER_DONE;
    return OMPI_SUCCESS;
}

/**
 * Shared memory blocking barrier - fan-in / fan-out algorithm.
 *
 * Fan-in runs over the module's reduction tree: a non-leaf process spins
 * (calling opal_progress()) until each child's flag equals the tag of
 * this barrier, and every process then posts the tag in its own flag.
 * Fan-out runs over the module's fanout_read_tree: the root posts the
 * negated tag, every other process spins until its parent's flag equals
 * the negated tag, and non-leaf processes then post the negated tag
 * themselves.
 *
 * The control structures come from one of two banks of
 * ctl_blocking_barrier, toggled on every call.
 *
 * @param comm    communicator; supplies the calling process' rank
 * @param module  collective module, used as a mca_coll_sm2_module_t
 *
 * @retval OMPI_SUCCESS always
 */
int mca_coll_sm2_barrier_intra_fanin_fanout(
        struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    int me = ompi_comm_rank(comm);
    tree_node_t *fanin_node = &(sm2->reduction_tree[me]);
    tree_node_t *fanout_node = &(sm2->fanout_read_tree[me]);
    int n_kids = fanin_node->n_children;
    int fanout_parent = fanout_node->parent_rank;
    int bank, k, kid_rank;
    long long barrier_tag;
    mca_coll_sm2_nb_request_process_shared_mem_t *mine;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *peer;

    /* Unique tag for this barrier.  Only one collective per communicator
     * is assumed to be active at a time, so the update is not atomic.
     */
    barrier_tag = sm2->collective_tag;
    sm2->collective_tag++;

    /* switch to the other bank of barrier control structures */
    sm2->index_blocking_barrier_memory_bank ^= 1;
    bank = sm2->index_blocking_barrier_memory_bank;

    mine = sm2->ctl_blocking_barrier[bank][me];

    /***************************
     * Fan in to the root
     ***************************/
    if( LEAF_NODE != fanin_node->my_node_type ) {
        /* wait for every child to post the tag */
        for( k = 0 ; k < n_kids ; k++ ) {
            kid_rank = fanin_node->children_ranks[k];
            peer = sm2->ctl_blocking_barrier[bank][kid_rank];
            while( peer->flag != barrier_tag ) {
                opal_progress();
            }
        }
    }

    /* signal my parent that I (and my subtree, if any) have arrived */
    mine->flag = barrier_tag;

    /***************************
     * Fan out from the root
     ***************************/
    if( ROOT_NODE == fanout_node->my_node_type ) {
        /* the root starts the release */
        mine->flag = -barrier_tag;
    } else {
        /* everybody else waits for the parent's release ... */
        peer = sm2->ctl_blocking_barrier[bank][fanout_parent];
        while( peer->flag != -barrier_tag ) {
            opal_progress();
        }

        /* ... and interior nodes pass it on to their children */
        if( LEAF_NODE != fanout_node->my_node_type ) {
            mine->flag = -barrier_tag;
        }
    }

    return OMPI_SUCCESS;
}

/**
 * Shared memory blocking barrier - recursive doubling algorithm.
 *
 * Each call reserves n_tags consecutive tag values and signals through
 * the per-process flags of one of two banks of ctl_blocking_barrier
 * (toggled on every call):
 *
 *  1. If there are extra sources, a non-exchange node posts the first
 *     tag, while an EXCHANGE_NODE waits until its extra source's flag
 *     has reached that tag.
 *  2. Every process posts first tag + 1 and then, for each entry of
 *     rank_exchanges, posts the current tag, waits until the partner's
 *     flag has reached it, and moves on to the next tag.
 *  3. If there are extra sources, the last tag of the set is used for
 *     the final hand-shake: an EXTRA_NODE waits for its extra source to
 *     post exactly that tag and then posts it too; any other node posts
 *     it first and then waits until the extra source's flag has reached
 *     it.
 *
 * All waits spin on opal_progress().
 *
 * NOTE(review): free_sm2_shared_buffer() is called at the end although
 * no matching buffer allocation is visible in this routine, whereas the
 * fan-in/fan-out barrier has both its allocate and free calls commented
 * out -- confirm that the call is intended here.
 *
 * @param comm    communicator; supplies the calling process' rank
 * @param module  collective module, used as a mca_coll_sm2_module_t
 *
 * @return the value returned by free_sm2_shared_buffer()
 */
int mca_coll_sm2_barrier_intra_recursive_doubling( 
        struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    int rc = OMPI_SUCCESS;
    int me, bank, step, peer_rank;
    long long first_tag, cur_tag;
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    pair_exchange_node_t *xnode = &(sm2->recursive_doubling_tree);
    mca_coll_sm2_nb_request_process_shared_mem_t *mine;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *peer;

    me = ompi_comm_rank(comm);

    /* switch to the other bank of barrier control structures */
    sm2->index_blocking_barrier_memory_bank ^= 1;
    bank = sm2->index_blocking_barrier_memory_bank;

    /* Reserve a set of tags for this barrier.  Only one collective per
     * communicator is assumed to be active at a time, so the update is
     * not atomic.
     */
    first_tag = sm2->collective_tag;
    sm2->collective_tag += xnode->n_tags;

    mine = sm2->ctl_blocking_barrier[bank][me];

    /*
     * Step 1: initial hand-shake with the "extra" partner, if any
     */
    cur_tag = first_tag;
    if( 0 < xnode->n_extra_sources ) {
        if( EXCHANGE_NODE == xnode->node_type ) {
            /* wait for the extra source to check in */
            peer_rank = xnode->rank_extra_source;
            peer = sm2->ctl_blocking_barrier[bank][peer_rank];
            while( peer->flag < cur_tag ) {
                opal_progress();
            }
        } else {
            /* announce my arrival */
            mine->flag = cur_tag;
        }
    }

    /*
     * Step 2: pairwise exchanges
     */
    cur_tag = first_tag + 1;
    mine->flag = cur_tag;

    for( step = 0 ; step < xnode->n_exchanges ; step++ ) {
        peer_rank = xnode->rank_exchanges[step];
        peer = sm2->ctl_blocking_barrier[bank][peer_rank];

        /* post my progress, then wait for the partner to catch up */
        mine->flag = cur_tag;
        while( peer->flag < cur_tag ) {
            opal_progress();
        }

        /* next exchange uses the next tag */
        cur_tag++;
    }

    /*
     * Step 3: final hand-shake with the "extra" partner, if any
     */
    if( 0 < xnode->n_extra_sources ) {
        cur_tag = first_tag + xnode->n_tags - 1;
        peer_rank = xnode->rank_extra_source;
        peer = sm2->ctl_blocking_barrier[bank][peer_rank];

        if( EXTRA_NODE == xnode->node_type ) {
            /* wait for the partner to post exactly the final tag ... */
            while( peer->flag != cur_tag ) {
                opal_progress();
            }
            /* ... then signal that I am done */
            mine->flag = cur_tag;
        } else {
            /* release the extra partner */
            mine->flag = cur_tag;

            /* Wait for the partner to acknowledge before moving on: the
             * control structure is reused by a later barrier, so it must
             * not be overwritten too early.
             */
            while( peer->flag < cur_tag ) {
                opal_progress();
            }
        }
    }

    /* "free" the shared-memory working buffer; its status is the result */
    rc = free_sm2_shared_buffer(sm2);

    return rc;
}
/**
 * Shared memory blocking barrier - entry point.
 *
 * Delegates to the first entry of the module's barrier_functions table.
 *
 * @param comm    communicator, passed through to the selected routine
 * @param module  collective module, used as a mca_coll_sm2_module_t
 *
 * @return OMPI_SUCCESS when the selected routine succeeds, otherwise the
 *         error code that routine returned
 */
int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
                                mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    int status = sm2->barrier_functions[0](comm, module);

    /* success and failure are both reported by the delegate's status */
    return status;
}