1
1

add selection logic for barrier and reduce.

This commit was SVN r18215.
Этот коммит содержится в:
Rich Graham 2008-04-19 22:40:04 +00:00
родитель bee8b42f29
Коммит df35223603
6 изменённых файлов: 396 добавлений и 59 удалений

Просмотреть файл

@ -106,11 +106,32 @@ BEGIN_C_DECLS
/** MCA parameter: order of fan-out read tree */
int order_fanout_read_tree;
/*I MCA paramenter: number of polling loops to run while waiting
/** MCA parameter: number of polling loops to run while waiting
* for children or parent to complete their work
*/
int n_poll_loops;
/** MCA parameter: message size cutoff for switching between
* short and long protocol
*/
size_t short_message_size;
/*
* Parameters to control methods used
*/
/** MCA parameter: method to force a given barrier method to be used.
* 0 - FANIN_FAN_OUT_BARRIER_FN
* 1 - RECURSIVE_DOUBLING_BARRIER_FN
*/
int force_barrier;
/** MCA parameter: method to force a given reduce method to be used.
* 0 - FANIN_REDUCE_FN
* 1 - REDUCE_SCATTER_GATHER_FN
*/
int force_reduce;
};
/**
@ -118,6 +139,30 @@ BEGIN_C_DECLS
*/
typedef struct mca_coll_sm2_component_t mca_coll_sm2_component_t;
/*
 * Index lists for the implemented variants of each collective.
 * In every list the trailing N_* enumerator equals the number of
 * entries that precede it.
 */

/* barrier algorithms */
enum {
    FANIN_FAN_OUT_BARRIER_FN = 0,
    RECURSIVE_DOUBLING_BARRIER_FN = 1,
    N_BARRIER_FNS = 2
};

/* reduce algorithms */
enum {
    FANIN_REDUCE_FN = 0,
    REDUCE_SCATTER_GATHER_FN = 1,
    N_REDUCE_FNS = 2
};

/* reduce dispatch slots: one for short messages, one for long messages */
enum {
    SHORT_DATA_FN = 0,
    LONG_DATA_FN = 1,
    N_REDUCE_FNS_USED = 2
};
/* enum for node type */
enum{
ROOT_NODE,
@ -326,7 +371,7 @@ BEGIN_C_DECLS
int index_blocking_barrier_memory_bank;
/* pointers to blocking memory control regions */
mca_coll_sm2_nb_request_process_shared_mem_t ***ctl_blocking_barrier;
volatile mca_coll_sm2_nb_request_process_shared_mem_t ***ctl_blocking_barrier;
/* description of allocated temp buffers - one struct per
* buffer. Each buffer has space "owned" by each process
@ -403,16 +448,22 @@ BEGIN_C_DECLS
/* collective tag */
long long collective_tag;
/* debug flag RLG */
int blocked_on_barrier;
long long barrier_bank_list[BARRIER_BANK_LIST_SIZE];
long long barrier_bank_cntr;
/* end debug */
/* scratch space - one int per process */
int *scratch_space;
/* message size cutoff for switching between short and long
* protocol
*/
size_t short_message_size;
/*
* function table for variants of a given collective
* function.
*/
mca_coll_base_module_barrier_fn_t barrier_functions[N_BARRIER_FNS];
mca_coll_base_module_reduce_fn_t list_reduce_functions[N_REDUCE_FNS];
mca_coll_base_module_reduce_fn_t reduce_functions[N_REDUCE_FNS_USED];
};
typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t;
@ -541,6 +592,16 @@ BEGIN_C_DECLS
int root, struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
int root, struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
/**
* Shared memory blocking broadcast.
*/
@ -555,6 +616,14 @@ BEGIN_C_DECLS
int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
int mca_coll_sm2_barrier_intra_fanin_fanout(
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
int mca_coll_sm2_barrier_intra_recursive_doubling(
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module);
END_C_DECLS
#endif /* MCA_COLL_SM2_EXPORT_H */

Просмотреть файл

@ -1118,7 +1118,7 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf,
/* local variables */
int i,rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
int pair_rank,exchange,extra_rank,n_proc_data,tmp;
int starting_proc,n_procs_to_read,base_block_proc,base_read_proc;
int starting_proc;
int n_elements_per_proc, n_residual_elements;
int cnt_offset,n_copy;
pair_exchange_node_t *my_exchange_node;
@ -1129,7 +1129,6 @@ int mca_coll_sm2_allreduce_intra_reducescatter_allgather(void *sbuf, void *rbuf,
ptrdiff_t dt_extent;
long long tag, base_tag;
sm_work_buffer_t *sm_buffer_desc;
volatile char * my_write_pointer;
volatile char * extra_rank_write_data_pointer;
volatile char * extra_rank_read_data_pointer;
volatile char * partner_base_pointer;

Просмотреть файл

@ -314,13 +314,12 @@ DONE:
/**
* Shared memory blocking barrier - fan-in/fan-out algorithm.
*/
static
int mca_coll_sm2_barrier_intra_fanin_fanout(
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module)
{
/* local variables */
int rc=OMPI_SUCCESS;
int rc=OMPI_SUCCESS,bar_buff_index;
int my_rank, child_rank, child, n_parents, n_children;
int my_fanin_parent;
int my_fanout_parent;
@ -334,8 +333,6 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
sm_module=(mca_coll_sm2_module_t *) module;
/* get my node for the reduction tree */
my_rank=ompi_comm_rank(comm);
my_reduction_node=&(sm_module->reduction_tree[my_rank]);
@ -351,10 +348,17 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
tag=sm_module->collective_tag;
sm_module->collective_tag++;
/*
sm_buffer_desc=alloc_sm2_shared_buffer(sm_module);
*/
sm_module->index_blocking_barrier_memory_bank^=1;
bar_buff_index=sm_module->index_blocking_barrier_memory_bank;
/* offset to data segment */
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
my_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][my_rank];
/*
sm_buffer_desc->proc_memory[my_rank].control_region;
*/
/***************************
* Fan into root phase
@ -369,11 +373,13 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
child_rank=my_reduction_node->children_ranks[child];
child_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][child_rank];
/*
sm_buffer_desc->proc_memory[child_rank].control_region;
*/
/* wait until child flag is set */
while( child_ctl_pointer->flag != tag ) {
/* Note: Actually need to make progress here */
opal_progress();
}
@ -384,7 +390,9 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
/* set memory barrier to make sure data is in main memory before
* the completion flags are set.
*/
/*
MB();
*/
/*
* Signal parent that data is ready
@ -396,7 +404,9 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
/* set memory barrier to make sure data is in main memory before
* the completion flags are set.
*/
/*
MB();
*/
/*
* Signal parent that data is ready
@ -415,14 +425,19 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
/* I am the root - so copy signal children, and then
* start reading
*/
/*
MB();
*/
my_ctl_pointer->flag=-tag;
} else if( LEAF_NODE == my_fanout_read_tree->my_node_type ) {
parent_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][my_fanout_parent];
/*
sm_buffer_desc->proc_memory[my_fanout_parent].control_region;
*/
/*
* wait on Parent to signal that data is ready
@ -435,7 +450,10 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
/* interior nodes */
parent_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][my_fanout_parent];
/*
sm_buffer_desc->proc_memory[my_fanout_parent].control_region;
*/
/*
* wait on Parent to signal that data is ready
@ -447,13 +465,216 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
/* set memory barrier to make sure data is in main memory before
* the completion flags are set.
*/
/*
MB();
*/
/* signal children that they may read the result data */
my_ctl_pointer->flag=-tag;
}
/* "free" the shared-memory working buffer */
/*
rc=free_sm2_shared_buffer(sm_module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
*/
/* return */
return rc;
Error:
return rc;
}
/**
* Shared memory blocking barrier.
*/
int mca_coll_sm2_barrier_intra_recursive_doubling(
struct ompi_communicator_t *comm,
struct mca_coll_base_module_1_1_0_t *module)
{
/* local variables */
int rc=OMPI_SUCCESS;
int pair_rank,exchange,extra_rank;
pair_exchange_node_t *my_exchange_node;
int my_rank,bar_buff_index;
long long tag, base_tag;
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
partner_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
extra_ctl_pointer;
mca_coll_sm2_module_t *sm_module;
/* debug
opal_timer_t t0,t1,t2,t3,t4,t5,t6,t7,t8,t9,t10;
end debug */
sm_module=(mca_coll_sm2_module_t *) module;
/* get my node for the reduction tree */
my_exchange_node=&(sm_module->recursive_doubling_tree);
my_rank=ompi_comm_rank(comm);
/* get pointer to barrier structure */
sm_module->index_blocking_barrier_memory_bank^=1;
bar_buff_index=sm_module->index_blocking_barrier_memory_bank;
/* get unique set of tags for this stripe.
* Assume only one collective
* per communicator at a given time, so no locking needed
* for atomic update of the tag */
base_tag=sm_module->collective_tag;
sm_module->collective_tag+=my_exchange_node->n_tags;
/* get pointers to my work buffers */
my_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][my_rank];
/*
sm_buffer_desc->proc_memory[my_rank].control_region;
*/
/* copy data in from the "extra" source, if need be */
tag=base_tag;
if(0 < my_exchange_node->n_extra_sources) {
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
/*
sm_buffer_desc->proc_memory[extra_rank].control_region;
*/
/* wait until remote data is read */
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
} else {
/*
MB();
*/
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
}
}
/*
MB();
*/
/*
* Signal parent that data is ready
*/
tag=base_tag+1;
my_ctl_pointer->flag=tag;
/* loop over data exchanges */
for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
/* debug
t4=opal_sys_timer_get_cycles();
end debug */
/* is the remote data read */
pair_rank=my_exchange_node->rank_exchanges[exchange];
partner_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][pair_rank];
/*
sm_buffer_desc->proc_memory[pair_rank].control_region;
*/
/*
MB();
*/
my_ctl_pointer->flag=tag;
/* wait until remote data is read */
while( partner_ctl_pointer->flag < tag ) {
opal_progress();
}
/* end test */
/* signal that I am done reading my peer's data */
tag++;
}
/* copy data in from the "extra" source, if need be */
if(0 < my_exchange_node->n_extra_sources) {
tag=base_tag+my_exchange_node->n_tags-1;
if ( EXTRA_NODE == my_exchange_node->node_type ) {
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
/*
sm_buffer_desc->proc_memory[extra_rank].control_region;
*/
/* wait until remote data is read */
while(! ( extra_ctl_pointer->flag == tag ) ) {
opal_progress();
}
/* signal that I am done */
my_ctl_pointer->flag=tag;
} else {
tag=base_tag+my_exchange_node->n_tags-1;
/* set memory barrier to make sure data is in main memory before
* the completion flags are set.
*/
/*
MB();
*/
/*
* Signal parent that data is ready
*/
my_ctl_pointer->flag=tag;
/* wait until child is done to move on - this buffer will
* be reused for the next stripe, so don't want to move
* on too quick.
*/
extra_rank=my_exchange_node->rank_extra_source;
extra_ctl_pointer=
sm_module->ctl_blocking_barrier[bar_buff_index][extra_rank];
/*
sm_buffer_desc->proc_memory[extra_rank].control_region;
*/
/* wait until remote data is read */
while( extra_ctl_pointer->flag < tag ) {
opal_progress();
}
}
}
/* debug
t9=opal_sys_timer_get_cycles();
timers[5]+=(t9-t8);
end debug */
/* "free" the shared-memory working buffer */
rc=free_sm2_shared_buffer(sm_module);
if( OMPI_SUCCESS != rc ) {
@ -466,7 +687,6 @@ int mca_coll_sm2_barrier_intra_fanin_fanout(
Error:
return rc;
}
/**
* Shared memory blocking barrier
*/
@ -475,8 +695,11 @@ int mca_coll_sm2_barrier_intra( struct ompi_communicator_t *comm,
{
/* local variables */
int rc;
mca_coll_sm2_module_t *sm_module;
rc= mca_coll_sm2_barrier_intra_fanin_fanout(comm, module);
sm_module=(mca_coll_sm2_module_t *) module;
rc= sm_module->barrier_functions[0](comm, module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}

Просмотреть файл

@ -140,7 +140,7 @@ static int sm2_open(void)
/* set component priority */
cs->sm2_priority=
mca_coll_sm2_param_register_int("sm2_priority",0);
mca_coll_sm2_param_register_int("sm2_priority",90);
/* set control region size (bytes), per proc */
cs->sm2_ctl_size_per_proc=
@ -155,7 +155,7 @@ static int sm2_open(void)
/* Min data Segment size (bytes) - per proc */
cs->sm2_data_seg_size=
mca_coll_sm2_param_register_int("sm2_data_seg_size",0);
mca_coll_sm2_param_register_int("sm2_data_seg_size",32768);
/* Max data Segment size (bytes) - per proc */
cs->sm2_max_data_seg_size=
@ -194,6 +194,20 @@ static int sm2_open(void)
cs->n_poll_loops=
mca_coll_sm2_param_register_int("n_poll_loops",4);
/* Size of message for switching between short and long protocol.
* This should probably be the segment size for several algorithms,
* though not all.
*/
cs->short_message_size=
mca_coll_sm2_param_register_int("short_message_size",32768);
/* collective ops to use */
cs->force_barrier=
mca_coll_sm2_param_register_int("force_barrier",(-1));
cs->force_reduce=
mca_coll_sm2_param_register_int("force_reduce",(-1));
/* debug */
/*
new_sigact.sa_handler=dbg_handler;

Просмотреть файл

@ -726,6 +726,42 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
sm_module->super.coll_scatter = NULL;
sm_module->super.coll_scatterv = NULL;
/*
 * Set up the specific function variants to be used by each collective.
 */

/* barrier: register each implemented variant under its own index */
sm_module->barrier_functions[FANIN_FAN_OUT_BARRIER_FN]=
    mca_coll_sm2_barrier_intra_fanin_fanout;
sm_module->barrier_functions[RECURSIVE_DOUBLING_BARRIER_FN]=
    mca_coll_sm2_barrier_intra_recursive_doubling;
if( ( 0 <= mca_coll_sm2_component.force_barrier ) &&
    ( N_BARRIER_FNS > mca_coll_sm2_component.force_barrier ) ) {
    /* force_barrier holds a valid variant index: install the
     * user-specified variant in every slot, so that it is the one
     * invoked regardless of which slot the caller dispatches through.
     * Out-of-range values (such as a negative one) leave the table
     * untouched. */
    mca_coll_base_module_barrier_fn_t tmp_fn=
        sm_module->barrier_functions[mca_coll_sm2_component.force_barrier];
    sm_module->barrier_functions[FANIN_FAN_OUT_BARRIER_FN]=tmp_fn;
    sm_module->barrier_functions[RECURSIVE_DOUBLING_BARRIER_FN]=tmp_fn;
}

/* reduce: list_reduce_functions holds every implemented variant,
 * reduce_functions holds the variant used for short and for long
 * messages */
sm_module->list_reduce_functions[FANIN_REDUCE_FN]=
    mca_coll_sm2_reduce_intra_fanin;
sm_module->list_reduce_functions[REDUCE_SCATTER_GATHER_FN]=
    mca_coll_sm2_reduce_intra_reducescatter_gather;
sm_module->reduce_functions[SHORT_DATA_FN]=
    sm_module->list_reduce_functions[FANIN_REDUCE_FN];
sm_module->reduce_functions[LONG_DATA_FN]=
    sm_module->list_reduce_functions[REDUCE_SCATTER_GATHER_FN];
if( ( 0 <= mca_coll_sm2_component.force_reduce ) &&
    ( N_REDUCE_FNS > mca_coll_sm2_component.force_reduce ) ) {
    /* force_reduce is an index into the list of implemented variants
     * (range-checked against N_REDUCE_FNS above), so look the function
     * up there and use it for both message-size classes. */
    mca_coll_base_module_reduce_fn_t tmp_fn=
        sm_module->list_reduce_functions[mca_coll_sm2_component.force_reduce];
    sm_module->reduce_functions[SHORT_DATA_FN]=tmp_fn;
    sm_module->reduce_functions[LONG_DATA_FN]=tmp_fn;
}
/*
* Some initialization
*/
@ -1059,7 +1095,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
sm_module->index_blocking_barrier_memory_bank=0;
sm_module->ctl_blocking_barrier=
(mca_coll_sm2_nb_request_process_shared_mem_t ***)
(volatile mca_coll_sm2_nb_request_process_shared_mem_t ***)
malloc(2*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t **));
if( NULL == sm_module->ctl_blocking_barrier ) {
goto CLEANUP;
@ -1077,11 +1113,6 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
goto CLEANUP;
}
/* debug */
fprintf(stderr," sizeof(mca_coll_sm2_nb_request_process_shared_mem_t) %lx \n",
sizeof(mca_coll_sm2_nb_request_process_shared_mem_t));
fflush(stderr);
/* end debug */
for( j= 0 ; j < 2 ; j++ ) {
for( i=0 ; i < group_size ; i++ ) {
sm_module->ctl_blocking_barrier[j][i]=
@ -1091,18 +1122,12 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
j*sizeof(mca_coll_sm2_nb_request_process_shared_mem_t)+
i*sm_module->per_proc_size_of_blocking_barrier_region )
;
/* debug */
fprintf(stderr," i %d j %d %p base %p pp %lx\n",i,j,
sm_module->ctl_blocking_barrier[j][i],
sm_module->sm_blocking_barrier_region,
sm_module->per_proc_size_of_blocking_barrier_region);
fflush(stderr);
/* end debug */
sm_module->ctl_blocking_barrier[j][i]->flag=0;
}
}
/* set the switch-over parameter */
sm_module->short_message_size=mca_coll_sm2_component.short_message_size;
/* touch pages to apply memory affinity - Note: do we really need this or will
* the algorithms do this */

Просмотреть файл

@ -20,22 +20,10 @@
extern uint64_t timers[7];
end debug */
/* debug
#include <assert.h>
extern void debug_module(void);
extern int last_root;
extern int node_type;
extern long long free_buff_free_index;
int last_root;
int node_type;
end debug */
/**
* Shared memory blocking reduce - fan-in algorithm.
*/
static
int mca_coll_sm2_reduce_intra_fanin(void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
@ -400,8 +388,7 @@ Error:
/**
* Shared memory blocking reduce.
*/
static
int mca_coll_sm2_reduce_intra_reducescatter_allgather(void *sbuf, void *rbuf,
int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
int root,
@ -492,14 +479,6 @@ int mca_coll_sm2_reduce_intra_reducescatter_allgather(void *sbuf, void *rbuf,
sm_module->scratch_space[i]++;
}
}
/* debug
fprintf(stderr," my_rank %d element list count_this_stripe %d : ",my_rank,count_this_stripe);
for(i=0 ; i < comm_size ; i++ ) {
fprintf(stderr," %d ",sm_module->scratch_space[i]);
}
fprintf(stderr," \n");
fflush(stderr);
end debug */
/* get unique set of tags for this stripe.
* Assume only one collective
@ -745,6 +724,34 @@ int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
{
/* local variables */
int rc;
mca_coll_sm2_module_t *sm_module;
ptrdiff_t dt_extent;
size_t len_data_buffer;
sm_module=(mca_coll_sm2_module_t *) module;
/* get size of data needed - same layout as user data, so that
* we can apply the reduction routines directly on these buffers
*/
rc=ompi_ddt_type_extent(dtype, &dt_extent);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
len_data_buffer=count*dt_extent;
if( len_data_buffer <= sm_module->short_message_size) {
rc=sm_module->reduce_functions[SHORT_DATA_FN]
(sbuf, rbuf, count, dtype, op, root, comm, module);
}
else {
rc=sm_module->reduce_functions[LONG_DATA_FN]
(sbuf, rbuf, count, dtype, op, root, comm, module);
}
if( OMPI_SUCCESS != rc ) {
goto Error;
}
#if 0
rc= mca_coll_sm2_reduce_intra_fanin(sbuf, rbuf, count,
@ -752,13 +759,13 @@ int mca_coll_sm2_reduce_intra(void *sbuf, void *rbuf, int count,
if( OMPI_SUCCESS != rc ) {
goto Error;
}
#endif
rc= mca_coll_sm2_reduce_intra_reducescatter_allgather(sbuf, rbuf, count,
rc= mca_coll_sm2_reduce_intra_reducescatter_gather(sbuf, rbuf, count,
dtype, op, root, comm, module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
#endif
return OMPI_SUCCESS;