1
1
openmpi/ompi/mca/coll/sm2/coll_sm2_barrier.c
Jeff Squyres 0af7ac53f2 Fixes trac:1392, #1400
* add "register" function to mca_base_component_t
   * converted coll:basic and paffinity:linux and paffinity:solaris to
     use this function
   * we'll convert the rest over time (I'll file a ticket once all
     this is committed)
 * add 32 bytes of "reserved" space to the end of mca_base_component_t
   and mca_base_component_data_2_0_0_t to make future upgrades
   [slightly] easier
   * new mca_base_component_t size: 196 bytes
   * new mca_base_component_data_2_0_0_t size: 36 bytes
 * MCA base version bumped to v2.0
   * '''We now refuse to load components that are not MCA v2.0.x'''
 * all MCA frameworks versions bumped to v2.0
 * be a little more explicit about version numbers in the MCA base
   * add big comment in mca.h about versioning philosophy

This commit was SVN r19073.

The following Trac tickets were found above:
  Ticket 1392 --> https://svn.open-mpi.org/trac/ompi/ticket/1392
2008-07-28 22:40:57 +00:00

712 строки
22 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "opal/sys/atomic.h"
#include "coll_sm2.h"
/* debug
extern int debug_print;
end debug */
/**
* Shared memory barrier.
*
* Tree-based algorithm for a barrier: a fan in to rank 0 followed by
* a fan out using the barrier segments in the shared memory area.
*
* There are 2 sets of barrier buffers -- since there can only be, at
* most, 2 outstanding barriers at any time, there is no need for more
* than this. The generalized in-use flags, control, and data
* segments are not used.
*
* The general algorithm is for a given process to wait for its N
* children to fan in by monitoring a uint32_t in its barrier "in"
* buffer. When this value reaches N (i.e., each of the children have
* atomically incremented the value), then the process atomically
* increases the uint32_t in its parent's "in" buffer. Then the
* process waits for the parent to set a "1" in the process' "out"
* buffer. Once this happens, the process writes a "1" in each of its
* children's "out" buffers, and returns.
*
* There are corner cases, of course, such as the root that has no
* parent, and the leaves that have no children. But that's the
* general idea.
*/
/**
 * Non-blocking barrier - initiation function.
 *
 * Completion must happen in order for the sm collective buffer allocation
 * to work correctly, so no barrier completion is attempted here.  The only
 * shared-memory activity is that a leaf process announces its arrival.
 *
 * @param comm     communicator (not referenced in this function)
 * @param request  private per-process request; receives the new tag and
 *                 the phase at which the progress function must resume
 * @param module   collective module, used as mca_coll_sm2_module_t
 *
 * @return OMPI_SUCCESS always
 */
int mca_coll_sm2_nbbarrier_intra(struct ompi_communicator_t *comm,
        mca_coll_sm2_nb_request_process_private_mem_t *request,
        mca_coll_base_module_t *module)
{
    /* base of the nb-barrier region selected by this request */
    char *region_base =
        (char *) (request->barrier_base_address[request->sm_index]);
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    mca_coll_sm2_nb_request_process_shared_mem_t *my_slot;
    long long barrier_tag;

    /* Advance the per-module tag and record it in the request.  No
     * atomicity is needed, as only one outstanding collective per
     * communicator exists.
     */
    ++sm2->nb_barrier_tag;
    request->tag = sm2->nb_barrier_tag;
    barrier_tag = sm2->nb_barrier_tag;

    if (LEAF_NODE != sm2->sm_buffer_mgmt_barrier_tree.my_node_type) {
        /* interior and root nodes: nothing to publish yet, the progress
         * function starts with the fan-in phase */
        request->sm2_barrier_phase = NB_BARRIER_FAN_IN;
        return OMPI_SUCCESS;
    }

    /* Leaf: it has no children to wait for, so its part of the fan-in is
     * to store the tag in its own slot of the region (slots are
     * sm2_size_management_region_per_proc bytes apart, indexed by rank).
     */
    my_slot = (mca_coll_sm2_nb_request_process_shared_mem_t *)
        (region_base +
         sm2->sm_buffer_mgmt_barrier_tree.my_rank *
         sm2->sm2_size_management_region_per_proc);
    my_slot->flag = barrier_tag;

    /* the progress function resumes directly in the fan-out phase */
    request->sm2_barrier_phase = NB_BARRIER_FAN_OUT;

    return OMPI_SUCCESS;
}
/**
* Non-blocking barrier - progress (completion) function.
*
* Resumable, non-spinning state machine: each call inspects the flags it
* needs exactly once.  If the barrier cannot advance, the phase to resume
* from is stored in request->sm2_barrier_phase and the function returns
* immediately.  When the barrier is complete for this process the phase is
* set to NB_BARRIER_DONE.
*
* Flag protocol, as implemented below (each process has one slot in the
* region, located at rank * sm2_size_management_region_per_proc bytes from
* the region base):
*   - fan-in:  a process is considered arrived when its flag equals tag;
*   - fan-out: a process is released when its parent's flag equals -tag.
*
* @param comm     communicator (not referenced in this function)
* @param request  private per-process request holding the region index,
*                 the tag and the current phase
* @param module   collective module, used as mca_coll_sm2_module_t
*
* @return OMPI_SUCCESS in every case; completion is reported through
*         request->sm2_barrier_phase, not through the return value
*/
int mca_coll_sm2_nbbarrier_intra_progress(struct ompi_communicator_t *comm,
mca_coll_sm2_nb_request_process_private_mem_t *request,
mca_coll_base_module_t *module)
{
/* local variables */
int index;
int child,cnt,phase;
long long tag;
mca_coll_sm2_module_t *sm_module;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_barrier_region;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_address;
/* get pointer to the nb-barrier region selected by this request */
index=request->sm_index;
sm_barrier_region=request->barrier_base_address[index];
/* pick up the tag already stored in the request (it is only read here,
* never modified)
*/
sm_module=(mca_coll_sm2_module_t *)module;
tag=request->tag;
if( LEAF_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
/* leaf node */
phase=request->sm2_barrier_phase;
if( NB_BARRIER_FAN_OUT == phase ) {
goto FANOUT_LEAF;
} else if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
/* nothing left to do for this request */
goto DONE;
}
/* default - NB_BARRIER_FAN_IN.  A leaf has no children to poll, so
* there is no fan-in work in this function and control falls straight
* through to the fan-out check.
*/
FANOUT_LEAF:
/*
* Fan-out phase
*/
/*
* check to see if parent has checked in (parent's flag == -tag)
*/
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.parent_rank*
sm_module->sm2_size_management_region_per_proc);
if( sm_address->flag != -tag ) {
/* if parent has not checked in - set parameters for async
* completion (resume in the fan-out phase), and bail
*/
request->sm2_barrier_phase=NB_BARRIER_FAN_OUT;
return OMPI_SUCCESS;
}
/*
* parent has released us - mark the request as complete.  A leaf has
* no children, so no shared flag is written here.
*/
request->sm2_barrier_phase=NB_BARRIER_DONE;
} else if( INTERIOR_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
/* interior node */
phase=request->sm2_barrier_phase;
if( NB_BARRIER_FAN_OUT == phase ) {
/* fan-in already completed on an earlier call */
goto FANOUT_INTERIOR;
} else if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
/* nothing left to do for this request */
goto DONE;
}
/* default - NB_BARRIER_FAN_IN */
/*
* Fan-in phase
*/
/* check to see if children have checked in (child's flag == tag);
* cnt counts arrived children in order and stops at the first one
* that is missing
*/
cnt=0;
for( child=0 ; child < sm_module->sm_buffer_mgmt_barrier_tree.n_children ; child++ ) {
/* compute flag address */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.children_ranks[child] *
sm_module->sm2_size_management_region_per_proc);
if(sm_address->flag == tag ) {
/* child arrived */
cnt++;
} else {
/* child not arrived, just break out */
break;
}
}
/* if children have not checked in - set parameters for async
* completion (resume in the fan-in phase), and bail
*/
if( cnt != sm_module->sm_buffer_mgmt_barrier_tree.n_children ) {
/* set restart parameters, and exit */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
return OMPI_SUCCESS;
}
/* all children arrived - publish my own arrival (flag = tag) in my
* slot of the region
*/
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=tag;
/* don't need memory barrier here, as we are not setting any other sm
* data for someone else to read
*/
FANOUT_INTERIOR:
/*
* Fan-out phase
*/
/*
* check to see if parent has checked in (parent's flag == -tag)
*/
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.parent_rank*
sm_module->sm2_size_management_region_per_proc);
if( sm_address->flag != -tag ) {
/* if parent has not checked in - set parameters for async
* completion (resume in the fan-out phase), and bail
*/
request->sm2_barrier_phase=NB_BARRIER_FAN_OUT;
return OMPI_SUCCESS;
}
/* parent has released us - release my own children by storing -tag in
* my slot
*/
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=-tag;
/*
* mark the request as complete
*/
request->sm2_barrier_phase=NB_BARRIER_DONE;
} else {
/* root node */
phase=request->sm2_barrier_phase;
if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
/* nothing left to do for this request */
goto DONE;
}
/* default - NB_BARRIER_FAN_IN (the root has no parent, so it has no
* separate fan-out wait)
*/
/*
* Fan-in phase
*/
/* check to see if children have checked in (child's flag == tag);
* cnt counts arrived children in order and stops at the first one
* that is missing
*/
cnt=0;
for( child=0 ; child < sm_module->sm_buffer_mgmt_barrier_tree.n_children ; child++ ) {
/* compute flag address */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.children_ranks[child] *
sm_module->sm2_size_management_region_per_proc);
if(sm_address->flag == tag ) {
/* child arrived */
cnt++;
} else {
/* child not arrived, just break out */
break;
}
}
/* if children have not checked in - set parameters for async
* completion (resume in the fan-in phase), and bail
*/
if( cnt != sm_module->sm_buffer_mgmt_barrier_tree.n_children ) {
/* set restart parameters, and exit */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
return OMPI_SUCCESS;
}
/* all children arrived - start the fan-out by storing -tag in my slot,
* which is the value my children poll for
*/
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=-tag;
/*
* mark the request as complete
*/
request->sm2_barrier_phase=NB_BARRIER_DONE;
}
DONE:
/* return - successful completion */
return OMPI_SUCCESS;
}
/**
 * Shared memory blocking barrier - fan-in / fan-out algorithm.
 *
 * Fan-in: every non-leaf process of the reduction tree spins (calling
 * opal_progress()) until each of its children has stored this barrier's
 * tag in its control flag; every process, leaf or not, then stores the
 * tag in its own control flag.
 *
 * Fan-out: every non-root process of the fan-out tree spins until its
 * parent's control flag holds the negated tag; every non-leaf process
 * then stores the negated tag in its own control flag, which is the value
 * its children are polling for.
 *
 * The control flags live in one of two banks
 * (ctl_blocking_barrier[bank][rank]); the bank is toggled on every call.
 *
 * @param comm    communicator, used to obtain the rank of the caller
 * @param module  collective module, used as mca_coll_sm2_module_t
 *
 * @return OMPI_SUCCESS (this function has no failure path)
 */
int mca_coll_sm2_barrier_intra_fanin_fanout(
        struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    /* local variables */
    int my_rank, bar_buff_index;
    int child, child_rank, n_children;
    int my_fanout_parent;
    long long tag;
    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *child_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *parent_ctl_pointer;
    mca_coll_sm2_module_t *sm_module;
    tree_node_t *my_reduction_node, *my_fanout_read_tree;

    sm_module = (mca_coll_sm2_module_t *) module;

    /* get my nodes in the reduction (fan-in) and fan-out trees */
    my_rank = ompi_comm_rank(comm);
    my_reduction_node = &(sm_module->reduction_tree[my_rank]);
    my_fanout_read_tree = &(sm_module->fanout_read_tree[my_rank]);
    n_children = my_reduction_node->n_children;
    my_fanout_parent = my_fanout_read_tree->parent_rank;

    /* get unique tag for this barrier - assume only one collective
     * per communicator at a given time, so no locking needed
     * for atomic update of the tag */
    tag = sm_module->collective_tag;
    sm_module->collective_tag++;

    /* switch to the other bank of control flags and locate my own flag */
    sm_module->index_blocking_barrier_memory_bank ^= 1;
    bar_buff_index = sm_module->index_blocking_barrier_memory_bank;
    my_ctl_pointer =
        sm_module->ctl_blocking_barrier[bar_buff_index][my_rank];

    /***************************
     * Fan into root phase
     ***************************/
    if (LEAF_NODE != my_reduction_node->my_node_type) {
        /* wait until every child has stored the tag in its flag */
        for (child = 0; child < n_children; child++) {
            child_rank = my_reduction_node->children_ranks[child];
            child_ctl_pointer =
                sm_module->ctl_blocking_barrier[bar_buff_index][child_rank];
            while (child_ctl_pointer->flag != tag) {
                opal_progress();
            }
        }
    }

    /* Signal my arrival.  No memory barrier is issued before this store:
     * the flag is the only shared datum written by the barrier.
     * NOTE(review): my_ctl_pointer is not volatile-qualified, unlike the
     * pointers used for polling - confirm that is intended.
     */
    my_ctl_pointer->flag = tag;

    /***************************
     * Fan out from root
     ***************************/
    if (ROOT_NODE != my_fanout_read_tree->my_node_type) {
        /* leaf and interior nodes: wait for the parent's release */
        parent_ctl_pointer =
            sm_module->ctl_blocking_barrier[bar_buff_index][my_fanout_parent];
        while (parent_ctl_pointer->flag != -tag) {
            opal_progress();
        }
    }

    if (LEAF_NODE != my_fanout_read_tree->my_node_type) {
        /* root and interior nodes: release my children */
        my_ctl_pointer->flag = -tag;
    }

    return OMPI_SUCCESS;
}
/**
 * Shared memory blocking barrier - recursive doubling algorithm.
 *
 * A range of n_tags consecutive tags, starting at the module's current
 * collective_tag, is reserved for this call.  The calling process
 * publishes progress by storing increasing tags in its own control flag
 * and polls the control flags of its partners (calling opal_progress()
 * while waiting):
 *   1. If extra sources exist: an EXCHANGE_NODE waits for its extra
 *      partner to reach the first tag; any other node type stores the
 *      first tag in its own flag.
 *   2. For each exchange, store the current tag, wait for the exchange
 *      partner's flag to reach it, then advance the tag by one.
 *   3. If extra sources exist, the last tag of the range is used: an
 *      EXTRA_NODE waits for its partner's flag to equal it and then
 *      stores it; any other node type stores it first and then waits for
 *      the extra partner to reach it.
 *
 * The control flags live in one of two banks
 * (ctl_blocking_barrier[bank][rank]); the bank is toggled on every call.
 *
 * @param comm    communicator, used to obtain the rank of the caller
 * @param module  collective module, used as mca_coll_sm2_module_t
 *
 * @return the value returned by free_sm2_shared_buffer()
 */
int mca_coll_sm2_barrier_intra_recursive_doubling(
        struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    pair_exchange_node_t *xchg = &(sm2->recursive_doubling_tree);
    int me = ompi_comm_rank(comm);
    int bank, step, status;
    long long first_tag, cur_tag;
    mca_coll_sm2_nb_request_process_shared_mem_t *mine;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *partner;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *extra;

    /* switch to the other bank of control flags */
    sm2->index_blocking_barrier_memory_bank ^= 1;
    bank = sm2->index_blocking_barrier_memory_bank;

    /* Reserve the tags for this call.  Only one collective per
     * communicator is assumed to be active at a time, so the update
     * needs no locking. */
    first_tag = sm2->collective_tag;
    sm2->collective_tag += xchg->n_tags;

    /* my own control flag */
    mine = sm2->ctl_blocking_barrier[bank][me];

    /*
     * Step 1: initial hand-shake with the "extra" partner, if any
     */
    cur_tag = first_tag;
    if (0 < xchg->n_extra_sources) {
        if (EXCHANGE_NODE == xchg->node_type) {
            extra = sm2->ctl_blocking_barrier[bank][xchg->rank_extra_source];
            /* wait until the extra partner has reached the first tag */
            while (extra->flag < cur_tag) {
                opal_progress();
            }
        } else {
            /* announce my arrival to my partner */
            mine->flag = cur_tag;
        }
    }

    /*
     * Step 2: pairwise exchanges
     */
    cur_tag = first_tag + 1;
    mine->flag = cur_tag;
    for (step = 0; step < xchg->n_exchanges; step++, cur_tag++) {
        partner =
            sm2->ctl_blocking_barrier[bank][xchg->rank_exchanges[step]];

        /* publish the tag of this exchange ... */
        mine->flag = cur_tag;

        /* ... and wait until the partner has reached it as well */
        while (partner->flag < cur_tag) {
            opal_progress();
        }
    }

    /*
     * Step 3: final hand-shake with the "extra" partner, if any
     */
    if (0 < xchg->n_extra_sources) {
        cur_tag = first_tag + xchg->n_tags - 1;
        extra = sm2->ctl_blocking_barrier[bank][xchg->rank_extra_source];

        if (EXTRA_NODE == xchg->node_type) {
            /* wait for my partner to publish the final tag ... */
            while (extra->flag != cur_tag) {
                opal_progress();
            }
            /* ... then signal that I am done */
            mine->flag = cur_tag;
        } else {
            /* publish the final tag for the extra partner */
            mine->flag = cur_tag;

            /* Wait until the extra partner is done before moving on:
             * this flag will be reused, so don't want to move on too
             * quick.
             */
            while (extra->flag < cur_tag) {
                opal_progress();
            }
        }
    }

    /* "free" the shared-memory working buffer.
     * NOTE(review): no buffer allocation call is visible in this
     * function - confirm that this release is balanced elsewhere.
     */
    status = free_sm2_shared_buffer(sm2);

    return status;
}
/**
 * Shared memory blocking barrier - entry point.
 *
 * Forwards the call to the first entry of the module's barrier function
 * table.
 *
 * @param comm    communicator, passed through unchanged
 * @param module  collective module, used as mca_coll_sm2_module_t to
 *                reach the function table and passed through unchanged
 *
 * @return OMPI_SUCCESS, or the error code returned by the selected
 *         barrier implementation
 */
int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
        mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2 = (mca_coll_sm2_module_t *) module;
    int status = sm2->barrier_functions[0](comm, module);

    /* success and failure codes are both handed back as-is */
    return status;
}