1
1

The SM2 collective component has not been updated in a long

time. Rich, the original developer, agrees with this removal.

This commit was SVN r25368.
Этот коммит содержится в:
George Bosilca 2011-10-25 22:07:09 +00:00
родитель e887d595c7
Коммит 72f731f25f
12 изменённых файлов: 0 добавлений и 6635 удалений

Просмотреть файл

Просмотреть файл

@ -1,2 +0,0 @@
rg6
rlgraham

Просмотреть файл

@ -1,57 +0,0 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
not_used_yet =
# Source files making up the sm2 collective component.
sources = \
coll_sm2.h \
coll_sm2_component.c \
coll_sm2_module.c \
coll_sm2_bcast.c \
coll_sm2_reduce.c \
coll_sm2_allreduce.c \
coll_sm2_barrier.c \
coll_sm2_service.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
component_noinst =
component_install =
if MCA_BUILD_ompi_coll_sm2_DSO
component_install += mca_coll_sm2.la
else
component_noinst += libmca_coll_sm2.la
endif
# See ompi/mca/btl/sm/Makefile.am for an explanation of
# libmca_common_sm.la.
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_coll_sm2_la_SOURCES = $(sources)
mca_coll_sm2_la_LDFLAGS = -module -avoid-version
# Link against the common shared-memory support library.
mca_coll_sm2_la_LIBADD = \
$(top_ompi_builddir)/ompi/mca/common/sm/libmca_common_sm.la
noinst_LTLIBRARIES = $(component_noinst)
libmca_coll_sm2_la_SOURCES =$(sources)
libmca_coll_sm2_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,690 +0,0 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#ifndef MCA_COLL_SM2_EXPORT_H
#define MCA_COLL_SM2_EXPORT_H
#include "ompi_config.h"
#include "mpi.h"
#include "opal/mca/mca.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/common/sm/common_sm_mmap.h"
#include "ompi/request/request.h"
BEGIN_C_DECLS
#ifdef HAVE_SCHED_YIELD
# include <sched.h>
# define SPIN sched_yield()
#elif defined(__WINDOWS__)
# define SPIN SwitchToThread()
#else /* no switch available */
# define SPIN
#endif
/*
 * Memory Management
 * - All memory allocation will be done on a per-communicator basis
 * - At least two banks of memory will be used
 * - Each bank of memory will have M buffers (or segments)
 * - These buffers will be used in a circular buffer order
 * - Each buffer will be contiguous in virtual memory, and will have page-aligned
 * regions belonging to each process in the communicator
 * - The memory associated with each process will have a control region, and
 * a data region.
 * - First touch will be used to enforce memory locality, and thus relies on
 * processor affinity to be set.
 * - A non-blocking collective will be issued when all buffers in a bank have
 * been used. This will be completed before this bank is re-used.
 */
/**
* Structure to hold the sm coll component. First it holds the
* base coll component, and then holds a bunch of
* sm-coll-component-specific stuff (e.g., current MCA param
* values).
*/
struct mca_coll_sm2_component_t {
/** Base coll component */
mca_coll_base_component_2_0_0_t super;
/** MCA parameter: Priority of this component */
int sm2_priority;
/** MCA parameter: control region size (bytes), per proc */
size_t sm2_ctl_size_per_proc;
/** MCA parameter: control region size (bytes) actually allocated - per proc*/
size_t sm2_ctl_size_allocated;
/** MCA parameter: control region alignment */
size_t sm2_ctl_alignment;
/** MCA parameter: Max data Segment size */
size_t sm2_max_data_seg_size;
/** MCA parameter: Min data Segment size */
size_t sm2_data_seg_size;
/** MCA parameter: control data size (bytes) actually allocated - per proc*/
size_t sm2_data_size_allocated;
/** MCA parameter: data region alignment */
int sm2_data_alignment;
/** MCA parameter: number of memory banks */
int sm2_num_mem_banks;
/** MCA parameter: number of regions per memory bank */
int sm2_num_regions_per_bank;
/** MCA parameter: order of buffer management barrier tree */
int order_barrier_tree;
/** MCA parameter: order of reduction tree */
int order_reduction_tree;
/** MCA parameter: order of fan-out read tree */
int order_fanout_read_tree;
/** MCA parameter: number of polling loops to run while waiting
 * for children or parent to complete their work
 */
int n_poll_loops;
/** MCA parameter: message size cutoff for switching between
 * short and long protocol
 */
size_t short_message_size;
/*
 * Parameters to control methods used
 */
/** MCA parameter: method to force a given barrier method to be used.
 * 0 - FANIN_FAN_OUT_BARRIER_FN
 * 1 - RECURSIVE_DOUBLING_BARRIER_FN
 */
int force_barrier;
/** MCA parameter: method to force a given reduce method to be used.
 * 0 - FANIN_FAN_OUT_REDUCE_FN
 * 1 - REDUCE_SCATTER_GATHER_FN
 */
int force_reduce;
/** MCA parameter: method to force a given allreduce method to be used.
 * 0 - FANIN_FANOUT_ALLREDUCE_FN
 * 1 - REDUCE_SCATTER_ALLGATHER_FN
 */
int force_allreduce;
};
/**
 * Convenience typedef
 */
typedef struct mca_coll_sm2_component_t mca_coll_sm2_component_t;
/*
 * Implemented function index list.  These enums index the per-module
 * function tables (see mca_coll_sm2_module_t below).
 */
/* barrier algorithm variants */
enum{
FANIN_FAN_OUT_BARRIER_FN,
RECURSIVE_DOUBLING_BARRIER_FN,
N_BARRIER_FNS
};
/* reduce algorithm variants */
enum{
FANIN_REDUCE_FN,
REDUCE_SCATTER_GATHER_FN,
N_REDUCE_FNS
};
/* reduce protocol selection by message size */
enum{
SHORT_DATA_FN_REDUCE,
LONG_DATA_FN_REDUCE,
N_REDUCE_FNS_USED
};
/* all-reduce algorithm variants */
enum{
FANIN_FANOUT_ALLREDUCE_FN,
REDUCE_SCATTER_ALLGATHER_FN,
N_ALLREDUCE_FNS
};
/* all-reduce protocol selection by message size */
enum{
SHORT_DATA_FN_ALLREDUCE,
LONG_DATA_FN_ALLREDUCE,
N_ALLREDUCE_FNS_USED
};
/* enum for node type within a tree (root / leaf / interior) */
enum{
ROOT_NODE,
LEAF_NODE,
INTERIOR_NODE
};
/*
 * N-order tree node description.  One instance per rank describes that
 * rank's position in a multi-nomial tree.
 */
struct tree_node_t {
/* my rank within the group */
int my_rank;
/* my node type - root, leaf, or interior */
int my_node_type;
/* number of nodes in the tree */
int tree_size;
/* number of parents (0/1) */
int n_parents;
/* number of children */
int n_children;
/* parent rank within the group */
int parent_rank;
/* children ranks within the group */
int *children_ranks;
};
typedef struct tree_node_t tree_node_t;
/*
 * Pair-wise data exchange (recursive doubling) node description.
 */
/* enum for node type: EXCHANGE_NODE ranks participate in the power-of-2
 * exchange; EXTRA_NODE ranks are outside the largest power of 2 and
 * hand off their data to an exchange node. */
enum{
EXCHANGE_NODE,
EXTRA_NODE
};
struct pair_exchange_node_t {
/* number of nodes this node will exchange data with */
int n_exchanges;
/* ranks of nodes involved in data exchange */
int *rank_exchanges;
/* number of extra sources of data - outside largest power of 2 in
 * this group */
int n_extra_sources;
/* rank of the extra source */
int rank_extra_source;
/* number of tags needed per stripe */
int n_tags;
/* log 2 of largest full power of 2 for this node set */
int log_2;
/* largest power of 2 that fits in this group */
int n_largest_pow_2;
/* node type */
int node_type;
};
typedef struct pair_exchange_node_t pair_exchange_node_t;
/*
 * Barrier request objects
 */
/* shared memory data structures: per-process control region, padded to
 * avoid false sharing between processes on adjacent cache lines */
struct mca_coll_sm2_nb_request_process_shared_mem_t {
/* flag used to indicate the status of this memory region */
volatile long long flag;
volatile long long index;
/* padding */
/* Note: need to change this so it takes less memory */
char padding[2*opal_cache_line_size-2*sizeof(long long)];
};
typedef struct mca_coll_sm2_nb_request_process_shared_mem_t
mca_coll_sm2_nb_request_process_shared_mem_t;
/* enum for phase at which the nb barrier is in */
enum{
NB_BARRIER_INACTIVE,
NB_BARRIER_FAN_IN,
NB_BARRIER_FAN_OUT,
/* done and not started are the same for all practical
 * purposes, as the init function always sets this flag
 */
NB_BARRIER_DONE
};
/* forward declaration */
struct mca_coll_sm2_module_t;
/*
 * shared memory region descriptor: per-process view of one segment
 */
struct sm_memory_region_desc_t {
/* pointer to control structures */
volatile mca_coll_sm2_nb_request_process_shared_mem_t *control_region;
/* pointer to data segment, and lower half of data segment */
volatile char *data_segment;
};
typedef struct sm_memory_region_desc_t sm_memory_region_desc_t;
/*
 * Shared memory buffer management structure - one per working buffer.
 */
struct sm_work_buffer_t {
/* pointer to segment base */
volatile char * base_segment_address;
/* description of how the memory segment is mapped on
 * a per process basis
 */
sm_memory_region_desc_t *proc_memory;
/*
 * bank index
 */
int bank_index;
/*
 * first buffer in the bank - if the barrier corresponding to
 * this bank is active when trying to allocate this buffer,
 * can't proceed until it complete
 */
int index_first_buffer_in_bank;
/* last buffer in the bank - nb barrier is started after this
 * buffer is freed.
 */
int index_last_buffer_in_bank;
};
typedef struct sm_work_buffer_t sm_work_buffer_t;
/* process private barrier request object - extends ompi_request_t so the
 * non-blocking barrier can be progressed like any other request */
struct mca_coll_sm2_nb_request_process_private_mem_t {
struct ompi_request_t super;
/* tag that will be used as unique barrier identifier */
long long tag;
/* barrier phase (one of the NB_BARRIER_* enum values above) */
int sm2_barrier_phase;
/* shared memory structure index - will be flip-flopping between structures */
int sm_index;
/* this processes base address of the barrier shared memory region */
mca_coll_sm2_nb_request_process_shared_mem_t *barrier_base_address[2];
/* module pointer */
struct mca_coll_sm2_module_t *coll_sm2_module;
};
typedef struct mca_coll_sm2_nb_request_process_private_mem_t
mca_coll_sm2_nb_request_process_private_mem_t;
/* debug */
#define BARRIER_BANK_LIST_SIZE 32
/* end debug */
/* Per-communicator module state: shared-memory layout, trees used by the
 * collective algorithms, and the buffer-management bookkeeping. */
struct mca_coll_sm2_module_t {
/* base structure */
mca_coll_base_module_t super;
/* size of the communicator this module serves */
int comm_size;
/* Shared Memory file name */
char *coll_sm2_file_name;
/* size of shared memory backing file */
size_t size_sm2_backing_file;
/* Memory pointer to shared file */
char *shared_memory_region;
/* size of memory banks control regions */
size_t size_mem_banks_ctl_region;
/* Pointer to the collective buffers */
char *collective_buffer_region;
/* size of collective buffer region */
size_t size_of_collective_buffer_region;
/* pointer to memory for blocking collectives */
char *sm_blocking_barrier_region;
/* size of memory for blocking collectives */
size_t size_of_blocking_barrier_region;
/* per proc size of memory for blocking collectives */
size_t per_proc_size_of_blocking_barrier_region;
/* index of blocking barrier memory region to use (flip-flops 0/1) */
int index_blocking_barrier_memory_bank;
/* pointers to blocking memory control regions, indexed
 * [bank][rank] */
volatile mca_coll_sm2_nb_request_process_shared_mem_t ***ctl_blocking_barrier;
/* description of allocated temp buffers - one struct per
 * buffer. Each buffer has space "owned" by each process
 * in the group.
 */
sm_work_buffer_t *sm_buffer_descriptor;
/* size of memory region, per process, for memory bank management */
size_t sm2_size_management_region_per_proc;
/* size of each memory segment */
size_t segment_size;
/* size, per process, of each memory segment */
size_t segement_size_per_process;
/* size, per process and segment , of control region */
size_t ctl_memory_per_proc_per_segment;
/* size, per process and segment , of data region */
size_t data_memory_per_proc_per_segment;
/* data structures used to manage the memory buffers */
long long num_nb_barriers_started;
long long num_nb_barriers_completed;
/* number of memory banks */
int sm2_module_num_memory_banks;
/* number of buffers per memory bank */
int sm2_module_num_regions_per_bank;
/* total number of working buffers */
int sm2_module_num_buffers;
/* allocated buffer index - local counter */
int sm2_allocated_buffer_index;
/* freed allocated buffer index - local counter */
int sm2_freed_buffer_index;
/* communicator - there is a one-to-one association between
 * the communicator and the module
 */
struct ompi_communicator_t *module_comm;
/* non-blocking barrier structures used for managing the shared
 * buffers */
tree_node_t sm_buffer_mgmt_barrier_tree;
/* request objects for the non-blocking barrier */
mca_coll_sm2_nb_request_process_private_mem_t *barrier_request;
/* barrier request to progress */
int current_request_index;
/* unique tag used for non-blocking collectives */
long long nb_barrier_tag;
/* multi-nomial reduction tree */
tree_node_t *reduction_tree;
/* multi-nomial fan-out read tree */
tree_node_t *fanout_read_tree;
/* recursive-doubling tree node */
pair_exchange_node_t recursive_doubling_tree;
/* number of polling loops to run while waiting
 * for children or parent to complete their work
 */
int n_poll_loops;
/* collective tag */
long long collective_tag;
/* scratch space - one int per process */
int *scratch_space;
/* message size cutoff for switching between short and long
 * protocol
 */
size_t short_message_size;
/*
 * flag indicating if have socket layout for the procs
 */
int have_socket_information;
/*
 * socket index
 */
int *socket_index;
/*
 * number of processes per socket
 */
int *n_procs_per_socket;
/*
 * sockets in use
 */
int *sockets_in_use;
/*
 * index of my socket within the list of sockets in use
 */
int my_socket_group;
/*
 * per-socket list of ranks on that socket
 */
int **list_of_ranks_per_socket;
/*
 * function table for variants of a given collective
 * function.
 */
mca_coll_base_module_barrier_fn_t barrier_functions[N_BARRIER_FNS];
mca_coll_base_module_reduce_fn_t list_reduce_functions[N_REDUCE_FNS];
mca_coll_base_module_reduce_fn_t reduce_functions[N_REDUCE_FNS_USED];
mca_coll_base_module_allreduce_fn_t
list_allreduce_functions[N_ALLREDUCE_FNS];
mca_coll_base_module_allreduce_fn_t
allreduce_functions[N_ALLREDUCE_FNS_USED];
};
typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t;
OBJ_CLASS_DECLARATION(mca_coll_sm2_module_t);
/*
 * struct for managing the allreduce pipeline.
 */
struct mca_coll_sm2_module_allreduce_pipeline_t {
/* pointer to shared temporary working buffer */
sm_work_buffer_t *shared_buffer;
/* cached rank */
int my_rank;
/* cached reduction node */
tree_node_t *my_reduction_node;
/* cached fanout tree */
tree_node_t *my_fanout_read_tree;
/* status of the buffer - determines what next to do
 * with this data (one of BUFFER_AVAILABLE/STARTED/FANIN/FANOUT below)
 */
int status;
/*
 * number of child loops completed - needed for
 * async progress
 */
int n_child_loops_completed;
/*
 * number of data-type elements to process
 */
int count_this_stripe;
/*
 * offset into the data type buffer, in units of data-types
 */
int count_processed;
/*
 * tag
 */
long long tag;
};
typedef struct mca_coll_sm2_module_allreduce_pipeline_t
mca_coll_sm2_module_allreduce_pipeline_t;
OBJ_CLASS_DECLARATION(mca_coll_sm2_module_allreduce_pipeline_t);
/* pipeline-stage status values for the struct above */
enum {
BUFFER_AVAILABLE,
STARTED,
FANIN,
FANOUT
};
/**
 * Global component instance
 */
OMPI_MODULE_DECLSPEC extern mca_coll_sm2_component_t mca_coll_sm2_component;
/*
 * coll module functions
 */
/* query to see if the component is available for use, and can
 * satisfy the thread and progress requirements
 */
int mca_coll_sm2_init_query(bool enable_progress_threads,
bool enable_mpi_threads);
/* query to see if the module is available for use on the given
 * communicator, and if so, what it's priority is.
 */
mca_coll_base_module_t *
mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority);
/* setup a multi-nomial tree - for each node in the tree
 * this returns it's parent, and it's children
 */
int setup_multinomial_tree(int tree_order, int num_nodes,
tree_node_t *tree_nodes);
/* setup recursive doubling tree node */
int setup_recursive_doubling_tree_node(int num_nodes, int node_rank,
pair_exchange_node_t *tree_node);
/* non-blocking barrier - init function */
int mca_coll_sm2_nbbarrier_intra(struct ompi_communicator_t *comm,
mca_coll_sm2_nb_request_process_private_mem_t *request,
mca_coll_base_module_t *module);
/* non-blocking barrier - completion function */
int mca_coll_sm2_nbbarrier_intra_progress(struct ompi_communicator_t *comm,
mca_coll_sm2_nb_request_process_private_mem_t *request,
mca_coll_base_module_t *module);
/* allocate working buffer */
sm_work_buffer_t *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module);
/* free working buffer - it is assumed that buffers are released in
 * the order they are allocated. We can assume this because each
 * communicator will have only one outstanding collective at a given
 * time, and we ensure that operations are completed in order. */
int free_sm2_shared_buffer(mca_coll_sm2_module_t *module);
/**
 * Shared memory blocking allreduce.
 */
int mca_coll_sm2_allreduce_intra(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_allreduce_intra_reducescatter_allgather(
void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/**
 * Shared memory blocking reduce
 */
int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/**
 * Shared memory blocking broadcast.
 */
int mca_coll_sm2_bcast_intra(void *buf, int count,
struct ompi_datatype_t *dtype, int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/**
 * Shared memory blocking barrier
 */
int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_barrier_intra_fanin_fanout(
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_sm2_barrier_intra_recursive_doubling(
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
END_C_DECLS
#endif /* MCA_COLL_SM2_EXPORT_H */

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,711 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "opal/sys/atomic.h"
#include "coll_sm2.h"
/* debug
extern int debug_print;
end debug */
/**
* Shared memory barrier.
*
* Tree-based algorithm for a barrier: a fan in to rank 0 followed by
* a fan out using the barrier segments in the shared memory area.
*
* There are 2 sets of barrier buffers -- since there can only be, at
* most, 2 outstanding barriers at any time, there is no need for more
* than this. The generalized in-use flags, control, and data
* segments are not used.
*
* The general algorithm is for a given process to wait for its N
* children to fan in by monitoring a uint32_t in its barrier "in"
* buffer. When this value reaches N (i.e., each of the children have
* atomically incremented the value), then the process atomically
* increases the uint32_t in its parent's "in" buffer. Then the
* process waits for the parent to set a "1" in the process' "out"
* buffer. Once this happens, the process writes a "1" in each of its
* children's "out" buffers, and returns.
*
* There's corner cases, of course, such as the root that has no
* parent, and the leaves that have no children. But that's the
* general idea.
*/
/* non-blocking barrier - init function */
/* non-blocking barrier - init function.  Assigns the request a fresh tag
 * and, for leaf nodes only, publishes arrival by writing the tag into the
 * process' shared control flag; all further progress happens in
 * mca_coll_sm2_nbbarrier_intra_progress().  Always returns OMPI_SUCCESS. */
int mca_coll_sm2_nbbarrier_intra(struct ompi_communicator_t *comm,
mca_coll_sm2_nb_request_process_private_mem_t *request,
mca_coll_base_module_t *module)
{
/* since completion must be in-order for the sm collective buffer allocation
 * to work correctly, no barrier completion will happen here. The most
 * that will be done is for the leaf processes, to signal their presence.
 */
/* local variables */
int index;
long long tag;
mca_coll_sm2_module_t *sm_module;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_barrier_region;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_address;
/* get pointer to nb-barrier structure (flip-flops between two banks) */
index=request->sm_index;
sm_barrier_region=(mca_coll_sm2_nb_request_process_shared_mem_t *)
(request->barrier_base_address[index]);
/* set barrier tag - no atomicity needed as only one outstanding
 * collective per communicator exists
 */
sm_module=(mca_coll_sm2_module_t *)module;
sm_module->nb_barrier_tag++;
request->tag=sm_module->nb_barrier_tag;
tag=sm_module->nb_barrier_tag;
if( LEAF_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
/*
 * Fan-in phase: a leaf has no children to wait for, so it can
 * check in immediately and move straight to the fan-out phase.
 */
/* Set my completion flag at my slot in the management region */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank*
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=tag;
request->sm2_barrier_phase=NB_BARRIER_FAN_OUT;
} else if( INTERIOR_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
/*
 * Fan-in phase: interior nodes must wait for children, so just
 * record the phase and defer the work to the progress function.
 */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
} else {
/*
 * Fan-in phase: the root likewise waits for its children in the
 * progress function (same handling as an interior node).
 */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
}
/* return - successful completion */
return OMPI_SUCCESS;
}
/* non-blocking barrier - completion function */
/* non-blocking barrier - completion function.  Resumable state machine:
 * each call advances the barrier as far as it can without blocking, and
 * records the phase in request->sm2_barrier_phase so a later call can
 * resume (the gotos re-enter the fan-out stage).  Fan-in is signalled by
 * writing +tag into a process' shared flag; fan-out by writing -tag.
 * Always returns OMPI_SUCCESS; completion is indicated by the phase
 * reaching NB_BARRIER_DONE. */
int mca_coll_sm2_nbbarrier_intra_progress(struct ompi_communicator_t *comm,
mca_coll_sm2_nb_request_process_private_mem_t *request,
mca_coll_base_module_t *module)
{
/* local variables */
int index;
int child,cnt,phase;
long long tag;
mca_coll_sm2_module_t *sm_module;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_barrier_region;
mca_coll_sm2_nb_request_process_shared_mem_t *sm_address;
/* get pointer to nb-barrier structure */
index=request->sm_index;
sm_barrier_region=request->barrier_base_address[index];
/* set barrier tag - no atomicity needed as only one outstanding
 * collective per communicator exists
 */
sm_module=(mca_coll_sm2_module_t *)module;
tag=request->tag;
if( LEAF_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
phase=request->sm2_barrier_phase;
if( NB_BARRIER_FAN_OUT == phase ) {
goto FANOUT_LEAF;
} else if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
goto DONE;
}
/* default - NB_BARRIER_FAN_IN */
/*
 * Fan-in phase (a leaf already checked in during init, so this
 * falls straight through to the fan-out check)
 */
FANOUT_LEAF:
/*
 * Fan-out phase
 */
/*
 * check to see if parent has checked in (parent writes -tag)
 */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.parent_rank*
sm_module->sm2_size_management_region_per_proc);
if( sm_address->flag != -tag ) {
/* if parent has not checked in - set parameters for async
 * completion, incomplete barrier flag, and bail
 */
request->sm2_barrier_phase=NB_BARRIER_FAN_OUT;
return OMPI_SUCCESS;
}
/*
 * set my completion flag
 */
request->sm2_barrier_phase=NB_BARRIER_DONE;
} else if( INTERIOR_NODE == sm_module->sm_buffer_mgmt_barrier_tree.my_node_type ) {
phase=request->sm2_barrier_phase;
if( NB_BARRIER_FAN_OUT == phase ) {
goto FANOUT_INTERIOR;
} else if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
goto DONE;
}
/* default - NB_BARRIER_FAN_IN */
/*
 * Fan-in phase
 */
/* check to see if children have checked in */
cnt=0;
for( child=0 ; child < sm_module->sm_buffer_mgmt_barrier_tree.n_children ; child++ ) {
/* compute flag address */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.children_ranks[child] *
sm_module->sm2_size_management_region_per_proc);
if(sm_address->flag == tag ) {
/* child arrived */
cnt++;
} else {
/* child not arrived, just break out */
break;
}
}
/* if children have not checked in - set parameters for async
 * completion, incomplete barrier flag, and bail
 */
if( cnt != sm_module->sm_buffer_mgmt_barrier_tree.n_children ) {
/* set restart parameters, and exit */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
return OMPI_SUCCESS;
}
/* Set my completion flag */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=tag;
/* don't need memory barrier here, as we are not setting any other sm
 * data for someone else to read
 */
FANOUT_INTERIOR:
/*
 * Fan-out phase
 */
/*
 * check to see if parent has checked in (parent writes -tag)
 */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.parent_rank*
sm_module->sm2_size_management_region_per_proc);
if( sm_address->flag != -tag ) {
/* if parent has not checked in - set parameters for async
 * completion, incomplete barrier flag, and bail
 */
request->sm2_barrier_phase=NB_BARRIER_FAN_OUT;
return OMPI_SUCCESS;
}
/* propagate the fan-out signal to my children by negating my flag */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=-tag;
/*
 * set my completion flag
 */
request->sm2_barrier_phase=NB_BARRIER_DONE;
} else {
/* root node */
phase=request->sm2_barrier_phase;
if ( (NB_BARRIER_DONE == phase) || (NB_BARRIER_INACTIVE == phase) ) {
goto DONE;
}
/* default - NB_BARRIER_FAN_IN */
/*
 * Fan-in phase
 */
/* check to see if children have checked in */
cnt=0;
for( child=0 ; child < sm_module->sm_buffer_mgmt_barrier_tree.n_children ; child++ ) {
/* compute flag address */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.children_ranks[child] *
sm_module->sm2_size_management_region_per_proc);
if(sm_address->flag == tag ) {
/* child arrived */
cnt++;
} else {
/* child not arrived, just break out */
break;
}
}
/* if children have not checked in - set parameters for async
 * completion, incomplete barrier flag, and bail
 */
if( cnt != sm_module->sm_buffer_mgmt_barrier_tree.n_children ) {
/* set restart parameters, and exit */
request->sm2_barrier_phase=NB_BARRIER_FAN_IN;
return OMPI_SUCCESS;
}
/* Root starts the fan-out directly: write -tag so children see it */
sm_address=(mca_coll_sm2_nb_request_process_shared_mem_t *)
((char *)sm_barrier_region+
sm_module->sm_buffer_mgmt_barrier_tree.my_rank *
sm_module->sm2_size_management_region_per_proc);
sm_address->flag=-tag;
/*
 * set my completion flag
 */
request->sm2_barrier_phase=NB_BARRIER_DONE;
}
DONE:
/* return - successful completion */
return OMPI_SUCCESS;
}
/**
* Shared memory blocking allreduce.
*/
/**
 * Shared memory blocking barrier - fan-in/fan-out algorithm.
 *
 * Fan-in: each process waits for all of its children in the reduction
 * tree to publish the current collective tag in their shared control
 * flags, then publishes the tag itself.  Fan-out: the root writes the
 * negated tag, and each non-root process waits for its fan-out parent's
 * negated tag before (if interior) propagating it to its own children.
 *
 * Relies on the single-outstanding-collective-per-communicator rule, so
 * the tag update needs no atomicity.
 *
 * NOTE(review): the original code had its MB() memory barriers commented
 * out; this is assumed safe here because a barrier carries no payload
 * data alongside the flag - confirm before reusing this pattern for
 * data-carrying collectives.
 *
 * @param comm   communicator over which the barrier runs
 * @param module sm2 collective module associated with comm
 * @return OMPI_SUCCESS (the polling loops do not fail)
 */
int mca_coll_sm2_barrier_intra_fanin_fanout(
    struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    /* local variables */
    int bar_buff_index;
    int my_rank, child_rank, child, n_children;
    int my_fanout_parent;
    long long tag;
    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *child_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *parent_ctl_pointer;
    mca_coll_sm2_module_t *sm_module;
    tree_node_t *my_reduction_node, *my_fanout_read_tree;

    sm_module = (mca_coll_sm2_module_t *) module;

    /* get my node in the reduction (fan-in) and fan-out read trees */
    my_rank = ompi_comm_rank(comm);
    my_reduction_node = &(sm_module->reduction_tree[my_rank]);
    my_fanout_read_tree = &(sm_module->fanout_read_tree[my_rank]);
    n_children = my_reduction_node->n_children;
    my_fanout_parent = my_fanout_read_tree->parent_rank;

    /* get unique tag for this barrier - assume only one collective
     * per communicator at a given time, so no locking needed
     * for atomic update of the tag */
    tag = sm_module->collective_tag;
    sm_module->collective_tag++;

    /* flip-flop between the two blocking-barrier memory banks */
    sm_module->index_blocking_barrier_memory_bank ^= 1;
    bar_buff_index = sm_module->index_blocking_barrier_memory_bank;
    my_ctl_pointer =
        sm_module->ctl_blocking_barrier[bar_buff_index][my_rank];

    /***************************
     * Fan into root phase
     ***************************/
    if (LEAF_NODE != my_reduction_node->my_node_type) {
        /* wait until every child has published the current tag */
        for (child = 0; child < n_children; child++) {
            child_rank = my_reduction_node->children_ranks[child];
            child_ctl_pointer =
                sm_module->ctl_blocking_barrier[bar_buff_index][child_rank];
            while (child_ctl_pointer->flag != tag) {
                opal_progress();
            }
        }
    }
    /* signal my parent (or, for the root, record arrival) */
    my_ctl_pointer->flag = tag;

    /***************************
     * Fan out from root
     ***************************/
    if (ROOT_NODE == my_fanout_read_tree->my_node_type) {
        /* I am the root - release my children */
        my_ctl_pointer->flag = -tag;
    } else {
        /* wait on my fan-out parent's release signal */
        parent_ctl_pointer =
            sm_module->ctl_blocking_barrier[bar_buff_index][my_fanout_parent];
        while (parent_ctl_pointer->flag != -tag) {
            opal_progress();
        }
        if (LEAF_NODE != my_fanout_read_tree->my_node_type) {
            /* interior node: propagate the release to my children */
            my_ctl_pointer->flag = -tag;
        }
    }

    return OMPI_SUCCESS;
}
/**
* Shared memory blocking barrier.
*/
/**
 * Shared memory blocking barrier based on recursive doubling.
 *
 * Ranks synchronize by publishing monotonically increasing tags in a
 * shared-memory control region and spinning (with opal_progress()) on
 * their partners' flags.  Non-power-of-two communicator sizes are
 * handled by folding the "extra" ranks in before and after the
 * power-of-two exchange phase.
 *
 * @param comm    communicator over which the barrier runs
 * @param module  sm2 collective module (cast to mca_coll_sm2_module_t)
 * @return OMPI_SUCCESS, or an error code from buffer management
 */
int mca_coll_sm2_barrier_intra_recursive_doubling(
    struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    /* local variables */
    int rc=OMPI_SUCCESS;
    int pair_rank,exchange,extra_rank;
    pair_exchange_node_t *my_exchange_node;
    int my_rank,bar_buff_index;
    long long tag, base_tag;
    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *
        partner_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *
        extra_ctl_pointer;
    mca_coll_sm2_module_t *sm_module;
    /* debug
    opal_timer_t t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
    end debug */

    sm_module=(mca_coll_sm2_module_t *) module;

    /* get my node for the recursive-doubling exchange pattern */
    my_exchange_node=&(sm_module->recursive_doubling_tree);
    my_rank=ompi_comm_rank(comm);

    /* get pointer to barrier structure - alternate between the two
     * memory banks so that back-to-back barriers do not race on the
     * same set of flags */
    sm_module->index_blocking_barrier_memory_bank^=1;
    bar_buff_index=sm_module->index_blocking_barrier_memory_bank;

    /* get unique set of tags for this stripe.
     * Assume only one collective
     * per communicator at a given time, so no locking needed
     * for atomic update of the tag */
    base_tag=sm_module->collective_tag;
    sm_module->collective_tag+=my_exchange_node->n_tags;

    /* get pointers to my work buffers */
    my_ctl_pointer=
        sm_module->ctl_blocking_barrier[bar_buff_index][my_rank];
    /*
    sm_buffer_desc->proc_memory[my_rank].control_region;
    */

    /* pre-phase: fold in the "extra" (non power-of-two) rank, if any */
    tag=base_tag;
    if(0 < my_exchange_node->n_extra_sources) {
        if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
            extra_rank=my_exchange_node->rank_extra_source;
            extra_ctl_pointer=
                sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
            /*
            sm_buffer_desc->proc_memory[extra_rank].control_region;
            */
            /* wait until the extra rank has checked in */
            while( extra_ctl_pointer->flag < tag ) {
                opal_progress();
            }
        } else {
            /*
            MB();
            */
            /*
             * Signal my exchange partner that I have arrived
             */
            my_ctl_pointer->flag=tag;
        }
    }

    /*
    MB();
    */
    /*
     * Announce arrival for the exchange phase
     */
    tag=base_tag+1;
    my_ctl_pointer->flag=tag;

    /* recursive-doubling exchange loop: log2(n) rounds of pairwise
     * flag handshakes, incrementing the tag each round */
    for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {

        /* debug
        t4=opal_timer_base_get_cycles();
        end debug */

        /* locate this round's partner */
        pair_rank=my_exchange_node->rank_exchanges[exchange];
        partner_ctl_pointer=
            sm_module->ctl_blocking_barrier[bar_buff_index][pair_rank];
        /*
        sm_buffer_desc->proc_memory[pair_rank].control_region;
        */

        /*
        MB();
        */
        my_ctl_pointer->flag=tag;

        /* wait until partner reaches this round */
        while( partner_ctl_pointer->flag < tag ) {
            opal_progress();
        }
        /* end test */

        /* advance the tag for the next round */
        tag++;
    }

    /* post-phase: release the "extra" rank, if any */
    if(0 < my_exchange_node->n_extra_sources) {
        tag=base_tag+my_exchange_node->n_tags-1;
        if ( EXTRA_NODE == my_exchange_node->node_type ) {
            extra_rank=my_exchange_node->rank_extra_source;
            extra_ctl_pointer=
                sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
            /*
            sm_buffer_desc->proc_memory[extra_rank].control_region;
            */
            /* wait until my partner finished the exchange phase */
            while(! ( extra_ctl_pointer->flag == tag ) ) {
                opal_progress();
            }
            /* signal that I am done */
            my_ctl_pointer->flag=tag;
        } else {
            tag=base_tag+my_exchange_node->n_tags-1;
            /* memory barrier to make sure data is in main memory before
             * the completion flags are set.
             */
            /*
            MB();
            */
            /*
             * Signal the extra rank that the exchange phase is complete
             */
            my_ctl_pointer->flag=tag;

            /* wait until child is done to move on - this buffer will
             * be reused for the next stripe, so don't want to move
             * on too quick.
             */
            extra_rank=my_exchange_node->rank_extra_source;
            extra_ctl_pointer=
                sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
            /*
            sm_buffer_desc->proc_memory[extra_rank].control_region;
            */
            /* wait for the extra rank's acknowledgement */
            while( extra_ctl_pointer->flag < tag ) {
                opal_progress();
            }
        }
    }

    /* debug
    t9=opal_timer_base_get_cycles();
    timers[5]+=(t9-t8);
    end debug */

    /* "free" the shared-memory working buffer */
    rc=free_sm2_shared_buffer(sm_module);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }

    /* return */
    return rc;

Error:
    return rc;
}
/**
* Shared memory blocking barrier
*/
/**
 * Shared memory blocking barrier entry point.
 *
 * Dispatches to whichever barrier implementation was selected at
 * module setup time (slot 0 of the barrier function table) and
 * propagates its return code unchanged.
 */
int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2_module = (mca_coll_sm2_module_t *) module;

    /* forward to the configured barrier implementation */
    return sm2_module->barrier_functions[0](comm, module);
}

Просмотреть файл

@ -1,234 +0,0 @@
/*
* Copyright (c) 2007-2008 UT-Battelle, LLC
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "coll_sm2.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
/* debug
#include "opal/sys/timer.h"
extern uint64_t timers[7];
end debug */
/**
* Shared memory blocking allreduce.
*/
/**
 * Fan data out from the root of the fan-out read tree.
 *
 * The message is processed in stripes sized to the shared-memory
 * working buffer.  For each stripe the root copies its data into the
 * shared buffer and raises a tag; interior nodes spin on their parent's
 * tag, re-copy the stripe into their own shared segment for their
 * children, and copy it out to the user buffer; leaves copy straight
 * from the parent's shared segment.
 *
 * Fix vs. original: dt_extent was declared size_t but
 * ompi_datatype_type_extent() takes a ptrdiff_t* (the sibling reduce
 * code declares it ptrdiff_t); also check the result of
 * alloc_sm2_shared_buffer() before dereferencing it.  Unused locals
 * message_extent and ctl_size removed.
 *
 * @param buf    user buffer (read at root, written at non-root ranks)
 * @param count  number of dtype elements in buf
 * @param dtype  element datatype
 * @param root   rank sourcing the data
 * @param comm   communicator
 * @param module sm2 collective module
 * @return OMPI_SUCCESS or an OMPI error code
 */
static
int mca_coll_sm2_fanout(void *buf, int count,
    struct ompi_datatype_t *dtype, int root,
    struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    /* local variables */
    int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
    int comm_size,process_shift,my_node_index;
    int my_rank;
    int count_processed,count_this_stripe;
    int my_fanout_parent;
    size_t len_data_buffer;
    ptrdiff_t dt_extent;  /* ompi_datatype_type_extent() takes ptrdiff_t* */
    long long tag;
    volatile char * my_data_pointer;
    volatile char * parent_data_pointer;
    mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer;
    mca_coll_sm2_module_t *sm_module;
    tree_node_t *my_fanout_read_tree;
    sm_work_buffer_t *sm_buffer_desc;

    sm_module=(mca_coll_sm2_module_t *) module;

    /* rotate ranks so that "root" becomes node 0 of the fan-out tree */
    my_rank=ompi_comm_rank(comm);
    comm_size=ompi_comm_size(comm);
    process_shift=root;
    my_node_index=my_rank-root;
    /* wrap around */
    if(0 > my_node_index ) {
        my_node_index+=comm_size;
    }

    /* get size of data needed - same layout as user data, so that
     * copies can be applied directly on these buffers
     */
    rc=ompi_datatype_type_extent(dtype, &dt_extent);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }

    /* length of the per-process data region */
    len_data_buffer=sm_module->data_memory_per_proc_per_segment;

    /* number of datatype copies that the scratch buffer can hold */
    n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
    if ( 0 == n_dts_per_buffer ) {
        /* not even one element fits the shared buffer */
        rc=OMPI_ERROR;
        goto Error;
    }

    /* compute number of stripes needed to process this collective */
    n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;

    /* get my node for the fan-out read tree */
    my_fanout_read_tree=&(sm_module->fanout_read_tree[my_node_index]);
    my_fanout_parent=my_fanout_read_tree->parent_rank+process_shift;
    if( comm_size <= my_fanout_parent ){
        my_fanout_parent-=comm_size;
    }
    count_processed=0;

    /* get a pointer to the shared-memory working buffer */
    /* NOTE: starting with a rather synchronous approach */
    for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
        /* get unique tag for this stripe - assume only one collective
         * per communicator at a given time, so no locking needed
         * for atomic update of the tag */
        tag=sm_module->collective_tag;
        sm_module->collective_tag++;

        sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
        if( NULL == sm_buffer_desc ) {
            /* do not dereference a failed allocation */
            rc=OMPI_ERR_OUT_OF_RESOURCE;
            goto Error;
        }

        /* get number of elements to process in this stripe */
        count_this_stripe=n_dts_per_buffer;
        if( count_processed + count_this_stripe > count )
            count_this_stripe=count-count_processed;

        /* offset to data segment */
        my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
        my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;

        /*
         * Fan out from root - let the memory copies at each
         * stage help reduce memory contention.
         */
        if( ROOT_NODE == my_fanout_read_tree->my_node_type ) {
            /* copy data from the user buffer into the shared buffer */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)my_data_pointer,
                (char *)((char *)buf+dt_extent*count_processed));
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

            /* I am the root - so copy signal children, and then
             * start reading
             */
            MB();
            my_ctl_pointer->flag=tag;

        } else if( LEAF_NODE == my_fanout_read_tree->my_node_type ) {

            parent_data_pointer=
                sm_buffer_desc->proc_memory[my_fanout_parent].data_segment;
            parent_ctl_pointer=
                sm_buffer_desc->proc_memory[my_fanout_parent].control_region;

            /*
             * wait on parent to signal that data is ready
             */
            while( parent_ctl_pointer->flag != tag) {
                opal_progress();
            }

            /* copy data to user supplied buffer */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)buf+dt_extent*count_processed,
                (char *)parent_data_pointer);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

        } else {
            /* interior nodes */

            parent_data_pointer=
                sm_buffer_desc->proc_memory[my_fanout_parent].data_segment;
            parent_ctl_pointer=
                sm_buffer_desc->proc_memory[my_fanout_parent].control_region;

            /*
             * wait on parent to signal that data is ready
             */
            while( parent_ctl_pointer->flag != tag) {
                opal_progress();
            }

            /* copy the data to my shared buffer, for access by children */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)my_data_pointer,(char *)parent_data_pointer);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

            /* memory barrier to make sure data is in main memory before
             * the completion flags are set.
             */
            MB();

            /* signal children that they may read the result data */
            my_ctl_pointer->flag=tag;

            /* copy data to user supplied buffer */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)buf+dt_extent*count_processed,
                (char *)my_data_pointer);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }
        }

        /* "free" the shared-memory working buffer */
        rc=free_sm2_shared_buffer(sm_module);
        if( OMPI_SUCCESS != rc ) {
            goto Error;
        }

        /* update the count of elements processed */
        count_processed+=count_this_stripe;
    }

    /* return */
    return rc;

Error:
    return rc;
}
/**
* Shared memory blocking broadcast.
*/
/**
 * Shared memory blocking broadcast.
 *
 * Thin wrapper: the whole broadcast is implemented by the fan-out
 * routine; its return code is propagated to the caller unchanged.
 */
int mca_coll_sm2_bcast_intra(void *buf, int count,
    struct ompi_datatype_t *dtype, int root,
    struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    /* delegate to the tree-based fan-out implementation */
    return mca_coll_sm2_fanout(buf, count, dtype, root, comm, module);
}

Просмотреть файл

@ -1,208 +0,0 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
* Most of the description of the data layout is in the
* coll_sm_module.c file.
*/
#include "ompi_config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#include "ompi/constants.h"
#include "coll_sm2.h"
#include "ompi/mca/coll/base/base.h"
/*
* Public string showing the coll ompi_sm V2 component version number
*/
const char *mca_coll_sm2_component_version_string =
"Open MPI sm-V2 collective MCA component version " OMPI_VERSION;
/*
* Local functions
*/
static int sm2_open(void);
static int sm2_close(void);
/*
 * Register an integer MCA parameter under the "coll_sm2" prefix and
 * return its effective value (the default, unless overridden by the
 * user's MCA configuration).
 */
static inline int mca_coll_sm2_param_register_int(
    const char* param_name, int default_value)
{
    int value = default_value;
    int param_id;

    param_id = mca_base_param_register_int("coll","sm2",param_name,
                                           NULL,default_value);
    /* look up any user-supplied override; value keeps the default
     * if the lookup does not change it */
    mca_base_param_lookup_int(param_id, &value);
    return value;
}
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
/*
 * Public component structure for the sm2 collective component.
 * NOTE: positional initializer - field order must match
 * mca_coll_sm2_component_t (declared in coll_sm2.h).
 */
mca_coll_sm2_component_t mca_coll_sm2_component = {

    /* First, fill in the super */

    {
        /* First, the mca_component_t struct containing meta
           information about the component itself */

        {
            MCA_COLL_BASE_VERSION_2_0_0,

            /* Component name and version */

            "sm-v2",
            OMPI_MAJOR_VERSION,
            OMPI_MINOR_VERSION,
            OMPI_RELEASE_VERSION,

            /* Component open and close functions */

            sm2_open,
            sm2_close,
        },
        {
            /* The component is not checkpoint ready */
            MCA_BASE_METADATA_PARAM_NONE
        },

        /* Initialization / querying functions */

        mca_coll_sm2_init_query,
        mca_coll_sm2_comm_query,
    },

    /* sm-component specific information */

    /* (default) priority - overwritten from MCA params in sm2_open() */
    0,
};
/*
* Open the component
*/
/*
 * Component open: register all sm2 MCA parameters and record their
 * effective values in the component structure.  Registration order,
 * parameter names, and default values are part of the component's
 * user-visible configuration surface.
 */
static int sm2_open(void)
{
    mca_coll_sm2_component_t *component = &mca_coll_sm2_component;

    /* component selection priority */
    component->sm2_priority =
        mca_coll_sm2_param_register_int("sm2_priority",90);

    /* per-process control region: size (bytes), running total of
     * allocated control memory, and alignment (bytes) */
    component->sm2_ctl_size_per_proc =
        mca_coll_sm2_param_register_int("sm2_ctl_size_per_proc",2*sizeof(long long));
    component->sm2_ctl_size_allocated = 0;
    component->sm2_ctl_alignment =
        mca_coll_sm2_param_register_int("sm2_ctl_alignment",getpagesize());

    /* per-process data segment: minimum and maximum sizes (bytes),
     * running total of allocated data memory, and alignment (bytes) */
    component->sm2_data_seg_size =
        mca_coll_sm2_param_register_int("sm2_data_seg_size",32768);
    component->sm2_max_data_seg_size =
        mca_coll_sm2_param_register_int("sm2_max_data_seg_size",20*getpagesize());
    component->sm2_data_size_allocated = 0;
    component->sm2_data_alignment =
        mca_coll_sm2_param_register_int("sm2_data_alignment",opal_cache_line_size);

    /* shared-memory buffer layout: bank count and regions per bank */
    component->sm2_num_mem_banks =
        mca_coll_sm2_param_register_int("sm2_num_mem_banks",2);
    component->sm2_num_regions_per_bank =
        mca_coll_sm2_param_register_int("sm2_num_regions_per_bank",8);

    /* fan-out degrees of the three internal trees */
    component->order_barrier_tree =
        mca_coll_sm2_param_register_int("order_barrier_tree",2);
    component->order_reduction_tree =
        mca_coll_sm2_param_register_int("order_reduction_tree",2);
    component->order_fanout_read_tree =
        mca_coll_sm2_param_register_int("order_fanout_read_tree",4);

    /* number of polling loops granted to pending resources so they
     * can complete their work */
    component->n_poll_loops =
        mca_coll_sm2_param_register_int("n_poll_loops",4);

    /* message size at which the short protocol gives way to the long
     * protocol - roughly the segment size for several algorithms */
    component->short_message_size =
        mca_coll_sm2_param_register_int("short_message_size",32768);

    /* forced algorithm selection for individual collectives
     * (-1 means "no forcing") */
    component->force_barrier =
        mca_coll_sm2_param_register_int("force_barrier",(-1));
    component->force_reduce =
        mca_coll_sm2_param_register_int("force_reduce",(-1));
    component->force_allreduce =
        mca_coll_sm2_param_register_int("force_allreduce",(-1));

    return OMPI_SUCCESS;
}
/*
* Close the component
*/
/*
 * Component close: nothing to release - all sm2 state is per-module
 * and torn down elsewhere.
 */
static int sm2_close(void)
{
    return OMPI_SUCCESS;
}
/* query to see if the component is available for use, and can
* satisfy the thread and progress requirements
*/
/*
 * Query whether the component is available for use and can satisfy
 * the requested thread and progress requirements.
 *
 * @param enable_progress_threads  whether progress threads are enabled
 * @param enable_mpi_threads       whether MPI threading is enabled
 * @return OMPI_SUCCESS always - at this stage there is no reason to
 *         disqualify this component
 */
int mca_coll_sm2_init_query(bool enable_progress_threads,
        bool enable_mpi_threads)
{
    /* at this stage there is no reason to disqualify this component */
    /* done */
    return OMPI_SUCCESS;
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,852 +0,0 @@
/*
* Copyright (c) 2007-2008 UT-Battelle, LLC
* Copyright (c) 2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "coll_sm2.h"
#include "ompi/op/op.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
/* debug
#include "opal/sys/timer.h"
extern uint64_t timers[7];
end debug */
/**
* Shared memory blocking allreduce.
*/
/**
 * Shared memory blocking fan-in reduce.
 *
 * Data flows up a reduction tree rooted (after rank rotation) at
 * "root": leaves copy their contribution into the shared buffer and
 * raise a tag; interior nodes wait on each child's tag, fold the
 * child's data in with ompi_op_reduce(), then raise their own tag;
 * the root reduces directly into rbuf.  Messages larger than the
 * shared working buffer are processed in stripes; the single-segment
 * case is special-cased (the root then reduces in place in rbuf with
 * no final copy-out).
 *
 * @param sbuf   send buffer (every rank's contribution)
 * @param rbuf   receive buffer (significant at root only)
 * @param count  number of dtype elements
 * @param dtype  element datatype
 * @param op     reduction operation
 * @param root   rank receiving the result
 * @param comm   communicator
 * @param module sm2 collective module
 * @return OMPI_SUCCESS or an OMPI error code
 */
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
    struct ompi_datatype_t *dtype,
    struct ompi_op_t *op,
    int root,
    struct ompi_communicator_t *comm,
    mca_coll_base_module_t *module)
{
    /* local variables */
    int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
    int my_rank, comm_size, child_rank, child, n_children;
    int count_processed,count_this_stripe;
    int process_shift,my_node_index;
    /* NOTE(review): dt_extent is size_t here but
     * ompi_datatype_type_extent() takes a ptrdiff_t* - confirm against
     * the datatype API (the reduce-scatter variant uses ptrdiff_t) */
    size_t message_extent,dt_extent,ctl_size,len_data_buffer;
    long long tag;
    volatile char * my_data_pointer;
    volatile char * child_data_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
    volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
    mca_coll_sm2_module_t *sm_module;
    tree_node_t *my_reduction_node;
    sm_work_buffer_t *sm_buffer_desc;

    /* debug
    last_root=root;
    end debug */

    sm_module=(mca_coll_sm2_module_t *) module;

    /* compute process shift - rotate ranks so root is node 0 */
    my_rank=ompi_comm_rank(comm);
    comm_size=ompi_comm_size(comm);
    process_shift=root;
    my_node_index=my_rank-root;
    /* wrap around */
    if(0 > my_node_index ) {
        my_node_index+=comm_size;
    }

    /* get size of data needed - same layout as user data, so that
     * we can apply the reduction routines directly on these buffers
     */
    rc=ompi_datatype_type_extent(dtype, &dt_extent);
    if( OMPI_SUCCESS != rc ) {
        goto Error;
    }
    message_extent=dt_extent*count;

    /* length of control and data regions */
    ctl_size=sm_module->ctl_memory_per_proc_per_segment;
    len_data_buffer=sm_module->data_memory_per_proc_per_segment;

    /* number of datatype copies that the scratch buffer can hold */
    n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
    if ( 0 == n_dts_per_buffer ) {
        rc=OMPI_ERROR;
        goto Error;
    }

    /* compute number of stripes needed to process this collective */
    n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;

    /* get my node for the reduction tree */
    my_reduction_node=&(sm_module->reduction_tree[my_node_index]);
    n_children=my_reduction_node->n_children;

    /* debug
    node_type=my_reduction_node->my_node_type;
    end debug */

    if( 1 == n_data_segments ) {
        /* single data segment - reduce directly into rbuf at the root */

        /* get unique tag for this stripe - assume only one collective
         * per communicator at a given time, so no locking needed
         * for atomic update of the tag */
        tag=sm_module->collective_tag;
        sm_module->collective_tag++;
        /* debug
        assert(tag);
        end debug */

        /* get a pointer to the shared-memory working buffer */
        sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
        /* debug
        free_buff_free_index=tag;
        end debug */

        /* get number of elements to process in this stripe */
        count_this_stripe=count;

        /* offset to data segment */
        my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
        my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;

        /***************************
         * Fan into root phase
         ***************************/

        if( ROOT_NODE == my_reduction_node->my_node_type ) {
            /*
             * copy local data from source buffer to result buffer
             */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)rbuf,
                (char *)sbuf);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

            /*
             * Wait on children, and apply op to their data
             */
            for( child=0 ; child < n_children ; child++ ) {
                child_rank=my_reduction_node->children_ranks[child];
                child_rank+=process_shift;
                /* wrap around */
                if( comm_size <= child_rank ){
                    child_rank-=comm_size;
                }

                child_ctl_pointer=
                    sm_buffer_desc->proc_memory[child_rank].control_region;
                child_data_pointer=
                    sm_buffer_desc->proc_memory[child_rank].data_segment;

                /* debug
                if( 0 == child_ctl_pointer->flag ) {
                    fprintf(stderr,"TTT 2 count %d root %d child_rank %d \n",
                            count,root,child_rank);
                    debug_module();
                }
                end debug */

                /* wait until child flag is set */
                while(child_ctl_pointer->flag != tag) {
                    opal_progress();
                }

                /* apply collective operation */
                ompi_op_reduce(op,(void *)child_data_pointer,
                        (void *)rbuf, count_this_stripe,dtype);
            } /* end child loop */

        } else if( INTERIOR_NODE == my_reduction_node->my_node_type ) {

            /* copy segment into shared buffer - ompi_op_reduce
             * provides only 2 buffers, so can't add from two
             * into a third buffer.
             */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)my_data_pointer,
                (char *)sbuf);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

            /*
             * Wait on children, and apply op to their data
             */
            for( child=0 ; child < n_children ; child++ ) {
                child_rank=my_reduction_node->children_ranks[child];
                child_rank+=process_shift;
                /* wrap around */
                if( comm_size <= child_rank ){
                    child_rank-=comm_size;
                }

                child_ctl_pointer=
                    sm_buffer_desc->proc_memory[child_rank].control_region;
                child_data_pointer=
                    sm_buffer_desc->proc_memory[child_rank].data_segment;

                /* wait until child flag is set */
                /* debug
                if( 0 == child_ctl_pointer->flag ) {
                    fprintf(stderr,"TTT 3 count %d root %d child_rank \n",
                            count,root,child_rank);
                    debug_module();
                }
                end debug */
                while(child_ctl_pointer->flag != tag) {
                    opal_progress();
                }

                /* apply collective operation */
                ompi_op_reduce(op,(void *)child_data_pointer,
                        (void *)my_data_pointer, count_this_stripe,dtype);
            } /* end child loop */

            /* memory barrier to make sure data is in main memory before
             * the completion flags are set.
             */
            MB();

            /*
             * Signal parent that data is ready
             */
            my_ctl_pointer->flag=tag;

        } else {
            /* leaf node */
            /* copy segment into shared buffer - later on will optimize to
             * eliminate extra copies.
             */
            rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                (char *)my_data_pointer,
                (char *)sbuf);
            if( 0 != rc ) {
                return OMPI_ERROR;
            }

            /* memory barrier to make sure data is in main memory before
             * the completion flags are set.
             */
            MB();

            /*
             * Signal parent that data is ready
             */
            my_ctl_pointer->flag=tag;
        }

        /* "free" the shared-memory working buffer */
        rc=free_sm2_shared_buffer(sm_module);
        if( OMPI_SUCCESS != rc ) {
            goto Error;
        }

    } else {
        /* multiple stripes - reduce into the shared buffer and copy
         * out to rbuf at the root for each stripe */
        count_processed=0;
        for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
            /* get unique tag for this stripe - assume only one collective
             * per communicator at a given time, so no locking needed
             * for atomic update of the tag */
            tag=sm_module->collective_tag;
            sm_module->collective_tag++;

            /* get a pointer to the shared-memory working buffer */
            sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);

            /* get number of elements to process in this stripe */
            count_this_stripe=n_dts_per_buffer;
            if( count_processed + count_this_stripe > count )
                count_this_stripe=count-count_processed;

            /* offset to data segment */
            my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
            my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;

            /***************************
             * Fan into root phase
             ***************************/

            if( LEAF_NODE != my_reduction_node->my_node_type ) {
                /* copy segment into shared buffer - ompi_op_reduce
                 * provides only 2 buffers, so can't add from two
                 * into a third buffer.
                 */
                rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                    (char *)my_data_pointer,
                    (char *)((char *)sbuf+dt_extent*count_processed));
                if( 0 != rc ) {
                    return OMPI_ERROR;
                }

                /*
                 * Wait on children, and apply op to their data
                 */
                for( child=0 ; child < n_children ; child++ ) {
                    child_rank=my_reduction_node->children_ranks[child];
                    child_rank+=process_shift;
                    /* wrap around */
                    if( comm_size <= child_rank ){
                        child_rank-=comm_size;
                    }

                    child_ctl_pointer=
                        sm_buffer_desc->proc_memory[child_rank].control_region;
                    child_data_pointer=
                        sm_buffer_desc->proc_memory[child_rank].data_segment;

                    /* wait until child flag is set */
                    /* debug
                    if( 0 == child_ctl_pointer->flag ) {
                        fprintf(stderr,"TTT 1 count %d root %d child_rank %d \n",
                                count,root,child_rank);
                        debug_module();
                    }
                    end debug */
                    while(child_ctl_pointer->flag != tag) {
                        opal_progress();
                    }

                    /* apply collective operation */
                    ompi_op_reduce(op,(void *)child_data_pointer,
                            (void *)my_data_pointer, count_this_stripe,dtype);
                } /* end child loop */

                /* memory barrier to make sure data is in main memory before
                 * the completion flags are set.
                 */
                MB();

                /*
                 * Signal parent that data is ready
                 */
                my_ctl_pointer->flag=tag;

                /* copy data to destination */
                if( ROOT_NODE == my_reduction_node->my_node_type ) {
                    /* copy data to user supplied buffer */
                    rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                        (char *)rbuf+dt_extent*count_processed,
                        (char *)my_data_pointer);
                    if( 0 != rc ) {
                        return OMPI_ERROR;
                    }
                }

            } else {
                /* leaf node */
                /* copy segment into shared buffer - later on will optimize to
                 * eliminate extra copies.
                 */
                rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
                    (char *)my_data_pointer,
                    (char *)((char *)sbuf+dt_extent*count_processed));
                if( 0 != rc ) {
                    return OMPI_ERROR;
                }

                /* memory barrier to make sure data is in main memory before
                 * the completion flags are set.
                 */
                MB();

                /*
                 * Signal parent that data is ready
                 */
                my_ctl_pointer->flag=tag;
            }

            /* "free" the shared-memory working buffer */
            rc=free_sm2_shared_buffer(sm_module);
            if( OMPI_SUCCESS != rc ) {
                goto Error;
            }

            /* update the count of elements processed */
            count_processed+=count_this_stripe;
        }
    }

    /* return */
    return rc;

Error:
    return rc;
}
/**
* Shared memory blocking reduce.
*/
int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
/* local varibles */
int i,rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
int pair_rank,exchange,extra_rank,n_proc_data,tmp;
int starting_proc;
int n_elements_per_proc, n_residual_elements;
int cnt_offset,n_copy;
pair_exchange_node_t *my_exchange_node;
int my_rank,comm_size,count_processed,count_this_stripe;
int count_this_exchange;
int done_copy_tag,ok_to_copy_tag;
size_t len_data_buffer;
ptrdiff_t dt_extent;
long long tag, base_tag;
sm_work_buffer_t *sm_buffer_desc;
volatile char * extra_rank_write_data_pointer;
volatile char * my_extra_write_pointer;
volatile char * partner_base_pointer;
volatile char * my_pointer;
volatile char * my_base_pointer;
volatile char * partner_pointer;
volatile char * source_pointer;
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
partner_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
extra_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
source_ctl_pointer;
mca_coll_sm2_module_t *sm_module;
sm_module=(mca_coll_sm2_module_t *) module;
/* get size of data needed - same layout as user data, so that
* we can apply the reudction routines directly on these buffers
*/
rc=ompi_datatype_type_extent(dtype, &dt_extent);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
/* lenght of control and data regions */
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
/* number of data types copies that the scratch buffer can hold */
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
if ( 0 == n_dts_per_buffer ) {
rc=OMPI_ERROR;
goto Error;
}
len_data_buffer=n_dts_per_buffer*dt_extent;
/* compute number of stripes needed to process this collective */
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
/* get my node for the reduction tree */
my_exchange_node=&(sm_module->recursive_doubling_tree);
my_rank=ompi_comm_rank(comm);
comm_size=ompi_comm_size(comm);
/* get access to shared memory working buffer */
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
my_base_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
count_processed=0;
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
/* get number of elements to process in this stripe */
/* debug
t2=opal_timer_base_get_cycles();
end debug */
count_this_stripe=n_dts_per_buffer;
if( count_processed + count_this_stripe > count )
count_this_stripe=count-count_processed;
/* compute the number of elements "owned" by each process */
n_elements_per_proc=(count_this_stripe/my_exchange_node->n_largest_pow_2);
n_residual_elements=count_this_stripe-
n_elements_per_proc*my_exchange_node->n_largest_pow_2;
for(i=0 ; i < my_exchange_node->n_largest_pow_2 ; i++ ) {
sm_module->scratch_space[i]=n_elements_per_proc;
if( i < n_residual_elements) {
sm_module->scratch_space[i]++;
}
}
/* get unique set of tags for this stripe.
* Assume only one collective
* per communicator at a given time, so no locking needed
* for atomic update of the tag */
base_tag=sm_module->collective_tag;
/* log_2 tags for recursive doubling, one for the non-power of 2
* initial send, 1 for first copy into shared memory, and
* one for completing the copyout.
*/
sm_module->collective_tag+=(my_exchange_node->log_2+3);
/* copy data into the write buffer */
rc=ompi_datatype_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_base_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
}
/* debug
{ int *int_tmp=(int *)my_base_pointer;
int i;
fprintf(stderr," my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_stripe ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
/* debug
t3=opal_timer_base_get_cycles();
timers[1]+=(t3-t2);
end debug */
/* copy data in from the "extra" source, if need be */
tag=base_tag;
if(0 < my_exchange_node->n_extra_sources) {
int n_my_count;
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
/* signal to partner that I am ready */
MB();
/*
* Signal extra node that data is ready
*/
my_ctl_pointer->flag=tag;
/* figure out my portion of the reduction */
n_my_count=count_this_stripe/2;
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_buffer_desc->proc_memory[extra_rank].control_region;
extra_rank_write_data_pointer=
sm_buffer_desc->proc_memory[extra_rank].data_segment;
/* wait until remote data is read */
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
/* apply collective operation to first half of the data */
if( 0 < n_my_count ) {
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
(void *)my_base_pointer, n_my_count,dtype);
}
/* wait for my partner to finish reducing the data */
tag=base_tag+1;
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
/* read my partner's data */
/* adjust read an write pointers */
extra_rank_write_data_pointer+=(n_my_count*dt_extent);
if( 0 < (count_this_stripe-n_my_count) ) {
rc=ompi_datatype_copy_content_same_ddt(dtype,
count_this_stripe-n_my_count,
(char *)(my_base_pointer+n_my_count*dt_extent),
(char *)extra_rank_write_data_pointer);
if( 0 != rc ) {
return OMPI_ERROR;
}
}
/* now we are ready for the power of 2 portion of the
* algorithm
*/
} else {
/* set memory barriet to make sure data is in main memory before
* the completion flgas are set.
*/
MB();
/*
* Signal extra node that data is ready
*/
my_ctl_pointer->flag=tag;
/* figure out my portion of the reduction */
n_my_count=count_this_stripe-(count_this_stripe/2);
/* get the pointer to the partners data that needs to be reduced */
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_buffer_desc->proc_memory[extra_rank].control_region;
extra_rank_write_data_pointer=
sm_buffer_desc->proc_memory[extra_rank].data_segment;
/* offset into my half of the data */
extra_rank_write_data_pointer+=
((count_this_stripe/2)*dt_extent);
my_extra_write_pointer=my_base_pointer+
((count_this_stripe/2)*dt_extent);
/* wait until remote data is read */
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
/* apply collective operation to second half of the data */
if( 0 < n_my_count ) {
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
(void *)my_extra_write_pointer, n_my_count,dtype);
}
/* signal that I am done, so my partner can read my data */
MB();
tag=base_tag+1;
my_ctl_pointer->flag=tag;
}
}
MB();
/*
* reduce-scatter
*/
/*
* Signal parent that data is ready
*/
tag=base_tag+1;
my_ctl_pointer->flag=tag;
/*
* loop over data exchanges
*/
/* set the number of procs whos's data I will manipulate - this starts
* at the number of procs in the exchange, so a divide by two at each
* iteration will give the right number of proc for the given iteration
*/
/* debug
{ int *int_tmp=(int *)my_base_pointer;
int i;
fprintf(stderr," GGG my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_stripe ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
n_proc_data=my_exchange_node->n_largest_pow_2;
starting_proc=0;
for(exchange=my_exchange_node->n_exchanges-1;exchange>=0;exchange--) {
/* is the remote data read */
pair_rank=my_exchange_node->rank_exchanges[exchange];
partner_ctl_pointer=
sm_buffer_desc->proc_memory[pair_rank].control_region;
partner_base_pointer=
sm_buffer_desc->proc_memory[pair_rank].data_segment;
/* wait until remote data is read */
while( partner_ctl_pointer->flag < tag ) {
opal_progress();
}
/* figure out the base address to use : the lower rank gets
* the upper data, with the higher rank getting the lower half
* of the current chunk */
n_proc_data=n_proc_data/2;
if(pair_rank < my_rank ) {
starting_proc+=n_proc_data;
}
/* figure out my staring pointer */
tmp=0;
for(i=0 ; i < starting_proc ; i++ ) {
tmp+=sm_module->scratch_space[i];
}
my_pointer=my_base_pointer+tmp*dt_extent;
/* figure out partner's staring pointer */
partner_pointer=partner_base_pointer+tmp*dt_extent;
/* figure out how much to read */
tmp=0;
for(i=starting_proc ; i < starting_proc+n_proc_data ; i++ ) {
tmp+=sm_module->scratch_space[i];
}
count_this_exchange=tmp;
/* reduce data into my write buffer */
/* apply collective operation */
ompi_op_reduce(op,(void *)partner_pointer,
(void *)my_pointer, count_this_exchange,dtype);
/* debug
{ int *int_tmp=(int *)my_pointer;
int i;
fprintf(stderr," result my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_exchange ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
int_tmp=(int *)partner_pointer;
fprintf(stderr," partner data my rank %d data in tmp :: ",my_rank);
for (i=0 ; i < count_this_exchange ; i++ ) {
fprintf(stderr," %d ",int_tmp[i]);
}
fprintf(stderr,"\n");
fflush(stderr);
}
end debug */
/* signal that I am done reading my peer's data */
tag++;
MB();
my_ctl_pointer->flag=tag;
} /* end exchange loop */
/* debug
t8=opal_timer_base_get_cycles();
end debug */
/* copy data out to final destination. Could do some sort of
* recursive doubleing in the sm, then copy to process private,
* which reduces memory contention. However, this also almost
* doubles the number of copies.
*/
ok_to_copy_tag=base_tag+1+my_exchange_node->log_2;
/* only root reads the results */
if( root == my_rank) {
/* read from the result buffers directly to the final destinaion */
cnt_offset=0;
for(n_copy=0 ; n_copy < my_exchange_node->n_largest_pow_2 ; n_copy++ ) {
if( 0 >= sm_module->scratch_space[n_copy] )
continue;
source_ctl_pointer=
sm_buffer_desc->proc_memory[n_copy].control_region;
source_pointer=
sm_buffer_desc->proc_memory[n_copy].data_segment;
/* wait until remote data is read */
while( source_ctl_pointer->flag < ok_to_copy_tag ) {
opal_progress();
}
/* copy data into the destination buffer */
rc=ompi_datatype_copy_content_same_ddt(dtype,
sm_module->scratch_space[n_copy],
(char *)((char *)rbuf+
dt_extent*(count_processed+cnt_offset)),
(char *)((char *)source_pointer+
dt_extent*cnt_offset));
if( 0 != rc ) {
return OMPI_ERROR;
}
cnt_offset+=sm_module->scratch_space[n_copy];
}
}
done_copy_tag=base_tag+2+my_exchange_node->log_2;
my_ctl_pointer->flag=done_copy_tag;
/* wait for all to read the data, before re-using this buffer */
if( stripe_number < (n_data_segments-1) ) {
for(n_copy=0 ; n_copy < comm_size ; n_copy++ ) {
source_ctl_pointer=
sm_buffer_desc->proc_memory[n_copy].control_region;
while( source_ctl_pointer-> flag < done_copy_tag ) {
opal_progress();
}
}
}
/* update the count of elements processed */
count_processed+=count_this_stripe;
}
/* return */
return rc;
Error:
return rc;
}
/**
 * Shared memory blocking reduce.
 *
 * Dispatches to the module's short- or long-message reduction routine
 * based on the total payload size (count * datatype extent) relative to
 * the module's short_message_size threshold.
 *
 * @return OMPI_SUCCESS, or the error code of the failing step
 */
int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
                              struct ompi_datatype_t *dtype, struct ompi_op_t *op,
                              int root, struct ompi_communicator_t *comm,
                              mca_coll_base_module_t *module)
{
    mca_coll_sm2_module_t *sm2_module = (mca_coll_sm2_module_t *) module;
    ptrdiff_t extent;
    size_t payload_bytes;
    int ret;

    /* the work buffers share the user data layout, so the reduction
     * routines can be applied to them directly - size the payload from
     * the datatype extent */
    ret = ompi_datatype_type_extent(dtype, &extent);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }
    payload_bytes = count * extent;

    if (payload_bytes <= sm2_module->short_message_size) {
        ret = sm2_module->reduce_functions[SHORT_DATA_FN_REDUCE]
            (sbuf, rbuf, count, dtype, op, root, comm, module);
    } else {
        ret = sm2_module->reduce_functions[LONG_DATA_FN_REDUCE]
            (sbuf, rbuf, count, dtype, op, root, comm, module);
    }

    return ret;
}

Просмотреть файл

@ -1,314 +0,0 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*
 * Most of the description of the data layout is in the
 * coll_sm2_module.c file.
*/
#include "ompi_config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#include "ompi/constants.h"
#include "coll_sm2.h"
#include "ompi/mca/coll/base/base.h"
/* setup a multi-nomial tree - for each node in the tree
 * this returns it's parent, and it's children */

/**
 * Fill in tree_nodes[0..num_nodes-1] with each rank's parent rank,
 * child ranks and node type (ROOT_NODE / INTERIOR_NODE / LEAF_NODE)
 * for a multinomial tree of degree tree_order.  The last (possibly
 * partial) level is striped round-robin over the previous level so
 * that its load is balanced.
 *
 * @param tree_order  tree degree; must be > 1
 * @param num_nodes   number of ranks in the tree
 * @param tree_nodes  caller-provided array of num_nodes descriptors;
 *                    children_ranks arrays are malloc()ed here and
 *                    are owned by the caller on success
 * @return OMPI_SUCCESS, or OMPI_ERROR on bad arguments or failed
 *         malloc (any children_ranks already allocated are freed)
 */
int setup_multinomial_tree(int tree_order, int num_nodes,
        tree_node_t *tree_nodes)
{
    /* local variables */
    int i,result;
    int cnt,n_nodes_in_this_level,node_index;
    int n_cum_nodes,current_level,node,n_nodes_prev_level,rank,parent_rank;
    int n_nodes_in_last_level,n_full_stripes,n_in_partial_stipe,n_children;
    int n_lvls_in_tree;

    /* NULL every children pointer up front: the error path frees all
     * non-NULL children_ranks, and without this a goto Error taken
     * before a node has been visited would free an uninitialized
     * (indeterminate) pointer - undefined behavior. */
    for(i=0 ; i < num_nodes ; i++ ) {
        tree_nodes[i].children_ranks=NULL;
    }

    /* sanity check */
    if( 1 >= tree_order ) {
        goto Error;
    }

    /* figure out number of levels in the tree */
    n_lvls_in_tree=0;
    result=num_nodes;
    /* cnt - number of ranks in given level */
    cnt=1;
    while( 0 < result ) {
        result-=cnt;
        cnt*=tree_order;
        n_lvls_in_tree++;
    }

    /* loop over tree levels */
    n_nodes_in_this_level=1;
    node_index=-1;
    n_cum_nodes=0;
    for( current_level = 0 ; current_level < n_lvls_in_tree ; current_level++) {

        /* loop over nodes in current level */
        for ( node=0 ; node < n_nodes_in_this_level ; node++ ) {
            /* get node index */
            node_index++;

            /* break if reach group size */
            if( node_index == num_nodes) {
                break;
            }

            tree_nodes[node_index].my_rank=node_index;

            /*
             * Parents
             */
            if( 0 == current_level ) {
                /* the root has no parent */
                tree_nodes[node_index].n_parents=0;
                tree_nodes[node_index].parent_rank=-1;
            } else {
                tree_nodes[node_index].n_parents=1;
                /* get parent index */
                n_nodes_prev_level=n_nodes_in_this_level/tree_order;
                if( current_level == n_lvls_in_tree -1 ) {
                    /* load balance the lowest level */
                    parent_rank=node-
                        (node/n_nodes_prev_level)*n_nodes_prev_level;
                    parent_rank=n_cum_nodes-n_nodes_prev_level+
                        parent_rank;
                    tree_nodes[node_index].parent_rank=parent_rank;
                } else {
                    tree_nodes[node_index].parent_rank=
                        (n_cum_nodes-n_nodes_prev_level)+node/tree_order;
                }
            }

            /*
             * Children
             */
            /* get number of children */
            if( (n_lvls_in_tree-1) == current_level ) {
                /* leaves have no children */
                tree_nodes[node_index].n_children=0;
                tree_nodes[node_index].children_ranks=NULL;
            } else {
                /* take into account last level being incomplete */
                if( (n_lvls_in_tree-2) == current_level ) {
                    /* last level is load balanced */
                    n_nodes_in_last_level=num_nodes-
                        (n_cum_nodes+n_nodes_in_this_level);
                    n_full_stripes=n_nodes_in_last_level/n_nodes_in_this_level;
                    n_in_partial_stipe=n_nodes_in_last_level-
                        n_full_stripes*n_nodes_in_this_level;
                    n_children=n_full_stripes;
                    if( n_full_stripes < tree_order ) {
                        if( node <= n_in_partial_stipe-1 ) {
                            n_children++;
                        }
                    }
                    tree_nodes[node_index].n_children=n_children;
                    if( 0 < n_children ) {
                        tree_nodes[node_index].children_ranks=(int *)
                            malloc(sizeof(int)*n_children);
                        if( NULL == tree_nodes[node_index].children_ranks) {
                            goto Error;
                        }
                    } else {
                        tree_nodes[node_index].children_ranks=NULL;
                    }
                    /* fill in list */
                    for( rank=0 ; rank < n_children ; rank++ ) {
                        tree_nodes[node_index].children_ranks[rank]=
                            node+rank*n_nodes_in_this_level;
                        tree_nodes[node_index].children_ranks[rank]+=
                            (n_cum_nodes+n_nodes_in_this_level);
                    }
                } else {
                    /* interior levels are complete: exactly tree_order
                     * children per node */
                    n_children=tree_order;
                    tree_nodes[node_index].n_children=tree_order;
                    tree_nodes[node_index].children_ranks=(int *)
                        malloc(sizeof(int)*n_children);
                    if( NULL == tree_nodes[node_index].children_ranks) {
                        goto Error;
                    }
                    for( rank=0 ; rank < n_children ; rank++ ) {
                        tree_nodes[node_index].children_ranks[rank]=
                            rank+tree_order*node;
                        tree_nodes[node_index].children_ranks[rank]+=
                            (n_cum_nodes+n_nodes_in_this_level);
                    }
                }
            }

        } /* end node loop */

        /* update helper counters */
        n_cum_nodes+=n_nodes_in_this_level;
        n_nodes_in_this_level*=tree_order;
    }

    /* set node type */
    for(i=0 ; i < num_nodes ; i++ ) {
        if( 0 == tree_nodes[i].n_parents ) {
            tree_nodes[i].my_node_type=ROOT_NODE;
        } else if ( 0 == tree_nodes[i].n_children ) {
            tree_nodes[i].my_node_type=LEAF_NODE;
        } else {
            tree_nodes[i].my_node_type=INTERIOR_NODE;
        }
    }

    /* successful return */
    return OMPI_SUCCESS;

Error:
    /* free allocated memory - safe even on early failure, because
     * every children_ranks was NULLed before the first possible
     * goto Error */
    for( i=0 ; i < num_nodes ; i++ ) {
        if( NULL != tree_nodes[i].children_ranks ) {
            free(tree_nodes[i].children_ranks);
        }
    }
    /* error return */
    return OMPI_ERROR;
}
/* setup recursive doubling tree node */

/**
 * Describe node_rank's role in a recursive-doubling exchange over
 * num_nodes ranks.  Ranks beyond the largest power of two become
 * EXTRA_NODEs: they hand their data to a partner inside the
 * power-of-two set before the doubling and collect the result after.
 * EXCHANGE_NODEs get the list of partners, one per doubling step
 * (the partner at step i differs from node_rank in bit i).
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR if the partner list cannot be
 *         allocated
 */
int setup_recursive_doubling_tree_node(int num_nodes, int node_rank,
        pair_exchange_node_t *exchange_node)
{
    int step, mask, pow2, n_steps, n_over;

    /* n_steps = number of doubling steps; pow2 tracks 2^n_steps */
    pow2 = 1;
    n_steps = 0;
    while (pow2 < num_nodes) {
        pow2 *= 2;
        n_steps++;
    }
    /* overshoot: back off to the largest power of two <= num_nodes */
    if (pow2 > num_nodes) {
        pow2 /= 2;
        n_steps--;
    }
    exchange_node->log_2 = n_steps;

    /* recompute 2^n_steps as an explicit product */
    mask = 1;
    for (step = 0; step < n_steps; step++) {
        mask *= 2;
    }
    exchange_node->n_largest_pow_2 = mask;

    /* ranks outside the power-of-two set sit out of the doubling and
     * are serviced by a partner inside it */
    if (node_rank >= pow2) {
        exchange_node->node_type = EXTRA_NODE;
    } else {
        exchange_node->node_type = EXCHANGE_NODE;
    }

    /* initial/final data movement with the ranks that were left out */
    n_over = num_nodes - pow2;
    if (EXCHANGE_NODE == exchange_node->node_type) {
        if (node_rank < n_over) {
            exchange_node->n_extra_sources = 1;
            exchange_node->rank_extra_source = pow2 + node_rank;
        } else {
            exchange_node->n_extra_sources = 0;
            exchange_node->rank_extra_source = -1;
        }
    } else {
        exchange_node->n_extra_sources = 1;
        exchange_node->rank_extra_source = node_rank - pow2;
    }

    /* build the per-step partner list */
    if (EXCHANGE_NODE == exchange_node->node_type) {
        exchange_node->n_exchanges = n_steps;
        exchange_node->rank_exchanges =
            (int *) malloc(n_steps * sizeof(int));
        if (NULL == exchange_node->rank_exchanges) {
            /* error return */
            return OMPI_ERROR;
        }
        mask = 1;
        for (step = 0; step < n_steps; step++) {
            /* partner differs from node_rank in bit 'step' */
            exchange_node->rank_exchanges[step] =
                (node_rank & mask) ? (node_rank - mask)
                                   : (node_rank + mask);
            mask *= 2;
        }
    } else {
        exchange_node->n_exchanges = 0;
        exchange_node->rank_exchanges = NULL;
    }

    /* number of tags needed per stripe - this must be the same across
     * all procs in the communicator */
    exchange_node->n_tags = 2 * n_steps + 1;

    /* successful return */
    return OMPI_SUCCESS;
}

Просмотреть файл

@ -1,34 +0,0 @@
# -*- text -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# This is the US/English general help file for Open MPI's Shared memory
# collective component.
#
[tree-degree-larger-than-control]
The specified shared memory collective tree degree
(coll_sm_tree_degree = %d) is too large. It must be less than or
equal to the control size (coll_sm_control_size = %d).
Automatically adjusting the tree degree to be equal to the control
size and continuing...
[tree-degree-larger-than-255]
The specified shared memory collective tree degree
(coll_sm_tree_degree = %d) is too large. It must be less than or
equal to 255.
Automatically adjusting the tree degree to be 255 and continuing...