checkpoint - compiles, now neeed to debug.
This commit was SVN r17775.
Этот коммит содержится в:
родитель
18d5673d6b
Коммит
70157166f9
@ -166,6 +166,9 @@ BEGIN_C_DECLS
|
||||
/* rank of the extra source */
|
||||
int rank_extra_source;
|
||||
|
||||
/* number of tags needed per stripe */
|
||||
int n_tags;
|
||||
|
||||
/* node type */
|
||||
int node_type;
|
||||
|
||||
|
@ -53,8 +53,8 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
/* get unique tag for this collective - assume only one collective
|
||||
* per communicator at a given time, so no locking needed
|
||||
* for atomic update of the tag */
|
||||
sm_module->collective_tag++;
|
||||
tag=sm_module->collective_tag;
|
||||
sm_module->collective_tag++;
|
||||
|
||||
/* get size of data needed - same layout as user data, so that
|
||||
* we can apply the reudction routines directly on these buffers
|
||||
@ -338,33 +338,30 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
||||
{
|
||||
/* local variables */
|
||||
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
|
||||
int my_rank, child_rank, child, n_parents, n_children;
|
||||
int my_fanin_parent,count_processed,count_this_stripe;
|
||||
int my_fanout_parent;
|
||||
int pair_rank,exchange,extra_rank;
|
||||
pair_exchange_node_t *my_exchange_node;
|
||||
int my_rank,count_processed,count_this_stripe;
|
||||
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
|
||||
long long tag;
|
||||
long long tag, base_tag;
|
||||
volatile char * sm_buffer;
|
||||
volatile char * my_data_pointer;
|
||||
volatile char * child_data_pointer;
|
||||
volatile char * parent_data_pointer;
|
||||
volatile char * my_write_pointer;
|
||||
volatile char * my_read_pointer;
|
||||
volatile char * extra_rank_write_data_pointer;
|
||||
volatile char * partner_read_pointer;
|
||||
volatile char * partner_write_pointer;
|
||||
char *my_base_temp_pointer;
|
||||
volatile char * child_base_temp_pointer;
|
||||
volatile char * parent_base_temp_pointer;
|
||||
volatile char * partner_base_temp_pointer;
|
||||
volatile char * extra_rank_base_temp_pointer;
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
|
||||
partner_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t *
|
||||
extra_ctl_pointer;
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
tree_node_t *my_reduction_node, *my_fanout_read_tree;
|
||||
|
||||
|
||||
sm_module=(mca_coll_sm2_module_t *) module;
|
||||
|
||||
/* get unique tag for this collective - assume only one collective
|
||||
* per communicator at a given time, so no locking needed
|
||||
* for atomic update of the tag */
|
||||
sm_module->collective_tag++;
|
||||
tag=sm_module->collective_tag;
|
||||
|
||||
/* get size of data needed - same layout as user data, so that
|
||||
* we can apply the reudction routines directly on these buffers
|
||||
*/
|
||||
@ -385,17 +382,17 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* need a read and a write buffer for a pair-wise exchange of data */
|
||||
n_dts_per_buffer/=2;
|
||||
len_data_buffer=n_dts_per_buffer*dt_extent;
|
||||
|
||||
/* compute number of stripes needed to process this collective */
|
||||
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
|
||||
|
||||
/* get my node for the reduction tree */
|
||||
my_exchange_node=&(sm_module->recursive_doubling_tree);
|
||||
my_rank=ompi_comm_rank(comm);
|
||||
my_reduction_node=&(sm_module->reduction_tree[my_rank]);
|
||||
my_fanout_read_tree=&(sm_module->fanout_read_tree[my_rank]);
|
||||
n_children=my_reduction_node->n_children;
|
||||
n_parents=my_reduction_node->n_parents;
|
||||
my_fanin_parent=my_reduction_node->parent_rank;
|
||||
my_fanout_parent=my_fanout_read_tree->parent_rank;
|
||||
|
||||
count_processed=0;
|
||||
|
||||
/* get a pointer to the shared-memory working buffer */
|
||||
@ -411,211 +408,202 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
||||
if( count_processed + count_this_stripe > count )
|
||||
count_this_stripe=count-count_processed;
|
||||
|
||||
/* get unique set of tags for this stripe.
|
||||
* Assume only one collective
|
||||
* per communicator at a given time, so no locking needed
|
||||
* for atomic update of the tag */
|
||||
base_tag=sm_module->collective_tag;
|
||||
sm_module->collective_tag+=my_exchange_node->n_tags;
|
||||
|
||||
/* get base address to "my" memory segment */
|
||||
my_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+sm_module->sm_buffer_mgmt_barrier_tree.my_rank*
|
||||
sm_module->segement_size_per_process);
|
||||
/* offset to data segment */
|
||||
my_data_pointer=my_base_temp_pointer+ctl_size;
|
||||
my_ctl_pointer=(mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
my_base_temp_pointer;
|
||||
/* my partner will read my data, as I am reducing it's data into
|
||||
* my buffer
|
||||
*/
|
||||
my_write_pointer=my_base_temp_pointer+ctl_size;
|
||||
my_read_pointer=my_write_pointer+len_data_buffer;
|
||||
|
||||
/***************************
|
||||
* Fan into root phase
|
||||
***************************/
|
||||
/* copy data into the write buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_write_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* copy data in from the "extra" source, if need be */
|
||||
tag=base_tag;
|
||||
if(0 < my_exchange_node->n_extra_sources) {
|
||||
|
||||
if( LEAF_NODE != my_reduction_node->my_node_type ) {
|
||||
/* copy segment into shared buffer - ompi_op_reduce
|
||||
* provids only 2 buffers, so can't add from two
|
||||
* into a third buffer.
|
||||
*/
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
|
||||
|
||||
/*
|
||||
* Wait on children, and apply op to their data
|
||||
*/
|
||||
for( child=0 ; child < n_children ; child++ ) {
|
||||
child_rank=my_reduction_node->children_ranks[child];
|
||||
|
||||
/* get base address of child process */
|
||||
child_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+child_rank*
|
||||
extra_rank=my_exchange_node->rank_extra_source;
|
||||
extra_rank_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+extra_rank*
|
||||
sm_module->segement_size_per_process);
|
||||
|
||||
child_data_pointer=child_base_temp_pointer+ctl_size;
|
||||
child_ctl_pointer=
|
||||
|
||||
extra_ctl_pointer=
|
||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||
child_base_temp_pointer;
|
||||
|
||||
/* wait until child flag is set */
|
||||
extra_rank_base_temp_pointer;
|
||||
extra_rank_write_data_pointer=extra_rank_base_temp_pointer+
|
||||
ctl_size;
|
||||
|
||||
/* wait until remote data is read */
|
||||
while(!
|
||||
( (child_ctl_pointer->flag == tag) &
|
||||
(child_ctl_pointer->index== stripe_number) ) ) {
|
||||
/* Note: Actually need to make progress here */
|
||||
( extra_ctl_pointer->flag == tag ) ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* apply collective operation */
|
||||
ompi_op_reduce(op,(void *)child_data_pointer,
|
||||
(void *)my_data_pointer, count_this_stripe,dtype);
|
||||
/* test
|
||||
{
|
||||
int ii,n_ints;
|
||||
int *my_int=(int *)my_data_pointer;
|
||||
int *child_int=(int *)child_data_pointer;
|
||||
n_ints=count_this_stripe/4;
|
||||
for(ii=0 ; ii < n_ints ; ii++ ) {
|
||||
my_int[ii]+=child_data_pointer[ii];
|
||||
}
|
||||
}
|
||||
end test */
|
||||
|
||||
/* end test */
|
||||
} /* end child loop */
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
my_ctl_pointer->index=stripe_number;
|
||||
} else {
|
||||
/* leaf node */
|
||||
/* copy segment into shared buffer - later on will optimize to
|
||||
* eliminate extra copies.
|
||||
*/
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
my_ctl_pointer->index=stripe_number;
|
||||
}
|
||||
|
||||
/***************************
|
||||
* Fan into root phase
|
||||
***************************/
|
||||
/*
|
||||
* Fan out from root - let the memory copies at each
|
||||
* stage help reduce memory contention.
|
||||
*/
|
||||
if( ROOT_NODE == my_fanout_read_tree->my_node_type ) {
|
||||
/* I am the root - so copy signal children, and then
|
||||
* start reading
|
||||
*/
|
||||
MB();
|
||||
my_ctl_pointer->flag=-tag;
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)((char *)rbuf+dt_extent*count_processed),
|
||||
(char *)my_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
} else if( LEAF_NODE == my_fanout_read_tree->my_node_type ) {
|
||||
|
||||
parent_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+my_fanout_parent*
|
||||
sm_module->segement_size_per_process);
|
||||
|
||||
parent_data_pointer=(volatile char *)
|
||||
((char *)parent_base_temp_pointer+ctl_size);
|
||||
parent_ctl_pointer=(volatile
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
parent_base_temp_pointer;
|
||||
|
||||
child_ctl_pointer=
|
||||
(volatile mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
parent_data_pointer;
|
||||
|
||||
/*
|
||||
* wait on Parent to signal that data is ready
|
||||
*/
|
||||
while(!
|
||||
( (parent_ctl_pointer->flag == -tag) &
|
||||
(parent_ctl_pointer->index== stripe_number) ) ) {
|
||||
/* Note: Actually need to make progress here */
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)rbuf+dt_extent*count_processed,
|
||||
(char *)parent_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
} else {
|
||||
/* interior nodes */
|
||||
parent_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+my_fanout_parent*
|
||||
sm_module->segement_size_per_process);
|
||||
|
||||
parent_data_pointer=(volatile char *)
|
||||
((char *)parent_base_temp_pointer+ctl_size);
|
||||
parent_ctl_pointer=(volatile
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
parent_base_temp_pointer;
|
||||
|
||||
child_ctl_pointer=
|
||||
(volatile mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
parent_data_pointer;
|
||||
|
||||
/*
|
||||
* wait on Parent to signal that data is ready
|
||||
*/
|
||||
while(!
|
||||
( (parent_ctl_pointer->flag == -tag) &
|
||||
(parent_ctl_pointer->index== stripe_number) ) ) {
|
||||
/* Note: Actually need to make progress here */
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* copy the data to my shared buffer, for access by children */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,(char *)parent_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
|
||||
(void *)my_write_pointer, count_this_stripe,dtype);
|
||||
} else {
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
|
||||
/* signal children that they may read the result data */
|
||||
my_ctl_pointer->flag=-tag;
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
}
|
||||
}
|
||||
|
||||
/* fill in read buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_read_pointer, (char *)my_write_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
tag++;
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
/* loop over data exchanges */
|
||||
for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
|
||||
|
||||
/* is the remote data read */
|
||||
pair_rank=my_exchange_node->rank_exchanges[exchange];
|
||||
partner_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+pair_rank*
|
||||
sm_module->segement_size_per_process);
|
||||
|
||||
partner_ctl_pointer=
|
||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||
partner_base_temp_pointer;
|
||||
partner_write_pointer=(char *)partner_ctl_pointer+ctl_size;
|
||||
partner_read_pointer=partner_write_pointer+
|
||||
len_data_buffer;
|
||||
|
||||
/* wait until remote data is read */
|
||||
while( partner_ctl_pointer->flag < tag ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* reduce data into my write buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)rbuf+dt_extent*count_processed,
|
||||
(char *)my_data_pointer);
|
||||
(char *)my_write_pointer,
|
||||
(char *)partner_read_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* signal that I am done reading my peer's data */
|
||||
tag++;
|
||||
MB();
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
/* can I modify my read buffer - is my pair done readin my data */
|
||||
while( partner_ctl_pointer->flag < tag ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* write the data into my read buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_read_pointer, (char *)my_write_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* signal that I am done writing my data into the read buffer */
|
||||
tag++;
|
||||
MB();
|
||||
my_ctl_pointer->flag=tag;
|
||||
}
|
||||
|
||||
|
||||
/* copy data in from the "extra" source, if need be */
|
||||
tag=base_tag;
|
||||
if(0 < my_exchange_node->n_extra_sources) {
|
||||
tag=base_tag+my_exchange_node->n_tags-1;
|
||||
|
||||
if ( EXTRA_NODE == my_exchange_node->node_type ) {
|
||||
|
||||
extra_rank=my_exchange_node->rank_extra_source;
|
||||
extra_rank_base_temp_pointer=(char *)
|
||||
((char *)sm_buffer+extra_rank*
|
||||
sm_module->segement_size_per_process);
|
||||
|
||||
extra_ctl_pointer=
|
||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||
extra_rank_base_temp_pointer;
|
||||
extra_rank_write_data_pointer=extra_rank_base_temp_pointer+
|
||||
ctl_size;
|
||||
|
||||
/* wait until remote data is read */
|
||||
while(! ( extra_ctl_pointer->flag == tag ) ) {
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
|
||||
/* write the data into my read buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_write_pointer,
|
||||
(char *)extra_rank_write_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* copy data into the destination buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)((char *)rbuf+dt_extent*count_processed),
|
||||
(char *)my_write_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* "free" the shared-memory working buffer */
|
||||
|
@ -438,8 +438,8 @@ static int init_sm2_barrier(struct ompi_communicator_t *comm,
|
||||
/* set the pointer to the request that needs to be completed first */
|
||||
module->current_request_index=0;
|
||||
|
||||
/* set collective tag */
|
||||
module->collective_tag=0;
|
||||
/* set starting collective tag */
|
||||
module->collective_tag=1;
|
||||
|
||||
/* return - successful */
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -296,6 +296,11 @@ int setup_recursive_doubling_tree_node(int num_nodes, int node_rank,
|
||||
|
||||
}
|
||||
|
||||
/* set the number of tags needed per stripe - this must be the
|
||||
* same across all procs in the communicator.
|
||||
*/
|
||||
exchange_node->n_tags=2*n_exchanges+1;
|
||||
|
||||
/* successful return */
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user