move some test code to another machine.
This commit was SVN r17785.
Этот коммит содержится в:
родитель
0fb6cf0916
Коммит
9131461511
@ -169,6 +169,9 @@ BEGIN_C_DECLS
|
|||||||
/* number of tags needed per stripe */
|
/* number of tags needed per stripe */
|
||||||
int n_tags;
|
int n_tags;
|
||||||
|
|
||||||
|
/* log 2 of largest full power of 2 for this node set */
|
||||||
|
int log_2;
|
||||||
|
|
||||||
/* node type */
|
/* node type */
|
||||||
int node_type;
|
int node_type;
|
||||||
|
|
||||||
|
@ -339,14 +339,17 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
/* local variables */
|
/* local variables */
|
||||||
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
|
int rc=OMPI_SUCCESS,n_dts_per_buffer,n_data_segments,stripe_number;
|
||||||
int pair_rank,exchange,extra_rank;
|
int pair_rank,exchange,extra_rank;
|
||||||
|
int index_read,index_write;
|
||||||
pair_exchange_node_t *my_exchange_node;
|
pair_exchange_node_t *my_exchange_node;
|
||||||
int my_rank,count_processed,count_this_stripe;
|
int my_rank,count_processed,count_this_stripe;
|
||||||
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
|
size_t message_extent,dt_extent,ctl_size,len_data_buffer;
|
||||||
long long tag, base_tag;
|
long long tag, base_tag;
|
||||||
volatile char * sm_buffer;
|
volatile char * sm_buffer;
|
||||||
|
volatile char * my_tmp_data_buffer[2];
|
||||||
volatile char * my_write_pointer;
|
volatile char * my_write_pointer;
|
||||||
volatile char * my_read_pointer;
|
volatile char * my_read_pointer;
|
||||||
volatile char * extra_rank_write_data_pointer;
|
volatile char * extra_rank_write_data_pointer;
|
||||||
|
volatile char * extra_rank_read_data_pointer;
|
||||||
volatile char * partner_read_pointer;
|
volatile char * partner_read_pointer;
|
||||||
volatile char * partner_write_pointer;
|
volatile char * partner_write_pointer;
|
||||||
char *my_base_temp_pointer;
|
char *my_base_temp_pointer;
|
||||||
@ -426,6 +429,8 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
*/
|
*/
|
||||||
my_write_pointer=my_base_temp_pointer+ctl_size;
|
my_write_pointer=my_base_temp_pointer+ctl_size;
|
||||||
my_read_pointer=my_write_pointer+len_data_buffer;
|
my_read_pointer=my_write_pointer+len_data_buffer;
|
||||||
|
my_tmp_data_buffer[0]=my_write_pointer;
|
||||||
|
my_tmp_data_buffer[1]=my_read_pointer;
|
||||||
|
|
||||||
/* copy data into the write buffer */
|
/* copy data into the write buffer */
|
||||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||||
@ -476,28 +481,22 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* fill in read buffer */
|
|
||||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
|
||||||
(char *)my_read_pointer, (char *)my_write_pointer);
|
|
||||||
if( 0 != rc ) {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
tag++;
|
|
||||||
|
|
||||||
/* set memory barriet to make sure data is in main memory before
|
|
||||||
* the completion flgas are set.
|
|
||||||
*/
|
|
||||||
|
|
||||||
MB();
|
MB();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Signal parent that data is ready
|
* Signal parent that data is ready
|
||||||
*/
|
*/
|
||||||
|
tag=base_tag+1;
|
||||||
my_ctl_pointer->flag=tag;
|
my_ctl_pointer->flag=tag;
|
||||||
|
|
||||||
/* loop over data exchanges */
|
/* loop over data exchanges */
|
||||||
for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
|
for(exchange=0 ; exchange < my_exchange_node->n_exchanges ; exchange++) {
|
||||||
|
|
||||||
|
index_read=(exchange&1);
|
||||||
|
index_write=((exchange+1)&1);
|
||||||
|
|
||||||
|
my_write_pointer=my_tmp_data_buffer[index_write];
|
||||||
|
my_read_pointer=my_tmp_data_buffer[index_read];
|
||||||
|
|
||||||
/* is the remote data read */
|
/* is the remote data read */
|
||||||
pair_rank=my_exchange_node->rank_exchanges[exchange];
|
pair_rank=my_exchange_node->rank_exchanges[exchange];
|
||||||
partner_base_temp_pointer=(char *)
|
partner_base_temp_pointer=(char *)
|
||||||
@ -507,9 +506,10 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
partner_ctl_pointer=
|
partner_ctl_pointer=
|
||||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||||
partner_base_temp_pointer;
|
partner_base_temp_pointer;
|
||||||
partner_write_pointer=(char *)partner_ctl_pointer+ctl_size;
|
partner_read_pointer=(char *)partner_ctl_pointer+ctl_size;
|
||||||
partner_read_pointer=partner_write_pointer+
|
if( 1 == index_read ) {
|
||||||
len_data_buffer;
|
partner_read_pointer+=len_data_buffer;
|
||||||
|
}
|
||||||
|
|
||||||
/* wait until remote data is read */
|
/* wait until remote data is read */
|
||||||
while( partner_ctl_pointer->flag < tag ) {
|
while( partner_ctl_pointer->flag < tag ) {
|
||||||
@ -518,35 +518,41 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
|
|
||||||
/* reduce data into my write buffer */
|
/* reduce data into my write buffer */
|
||||||
/* apply collective operation */
|
/* apply collective operation */
|
||||||
|
/*
|
||||||
ompi_op_reduce(op,(void *)partner_read_pointer,
|
ompi_op_reduce(op,(void *)partner_read_pointer,
|
||||||
(void *)my_write_pointer, count_this_stripe,dtype);
|
(void *)my_write_pointer, count_this_stripe,dtype);
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* test */
|
||||||
|
|
||||||
|
{
|
||||||
|
int ii,n_ints;
|
||||||
|
int *my_read=(int *)my_read_pointer;
|
||||||
|
int *my_write=(int *)my_write_pointer;
|
||||||
|
int *exchange_read=(int *)partner_read_pointer;
|
||||||
|
n_ints=count_this_stripe;
|
||||||
|
for(ii=0 ; ii < n_ints ; ii++ ) {
|
||||||
|
my_write[ii]=my_read[ii]+exchange_read[ii];
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* end test */
|
||||||
|
|
||||||
/* signal that I am done reading my peer's data */
|
/* signal that I am done reading my peer's data */
|
||||||
tag++;
|
tag++;
|
||||||
MB();
|
MB();
|
||||||
my_ctl_pointer->flag=tag;
|
my_ctl_pointer->flag=tag;
|
||||||
|
|
||||||
/* can I modify my read buffer - is my pair done readin my data */
|
/* wait for my peer to finish - other wise buffers may be
|
||||||
|
* reused too early */
|
||||||
while( partner_ctl_pointer->flag < tag ) {
|
while( partner_ctl_pointer->flag < tag ) {
|
||||||
opal_progress();
|
opal_progress();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* write the data into my read buffer */
|
|
||||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
|
||||||
(char *)my_read_pointer, (char *)my_write_pointer);
|
|
||||||
if( 0 != rc ) {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* signal that I am done writing my data into the read buffer */
|
|
||||||
tag++;
|
|
||||||
MB();
|
|
||||||
my_ctl_pointer->flag=tag;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* copy data in from the "extra" source, if need be */
|
/* copy data in from the "extra" source, if need be */
|
||||||
tag=base_tag;
|
|
||||||
if(0 < my_exchange_node->n_extra_sources) {
|
if(0 < my_exchange_node->n_extra_sources) {
|
||||||
tag=base_tag+my_exchange_node->n_tags-1;
|
tag=base_tag+my_exchange_node->n_tags-1;
|
||||||
|
|
||||||
@ -560,8 +566,12 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
extra_ctl_pointer=
|
extra_ctl_pointer=
|
||||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||||
extra_rank_base_temp_pointer;
|
extra_rank_base_temp_pointer;
|
||||||
extra_rank_write_data_pointer=extra_rank_base_temp_pointer+
|
index_read=(my_exchange_node->log_2&1);
|
||||||
|
extra_rank_read_data_pointer=extra_rank_base_temp_pointer+
|
||||||
ctl_size;
|
ctl_size;
|
||||||
|
if( index_read ) {
|
||||||
|
extra_rank_read_data_pointer+=len_data_buffer;
|
||||||
|
}
|
||||||
|
|
||||||
/* wait until remote data is read */
|
/* wait until remote data is read */
|
||||||
while(! ( extra_ctl_pointer->flag == tag ) ) {
|
while(! ( extra_ctl_pointer->flag == tag ) ) {
|
||||||
@ -572,13 +582,14 @@ int mca_coll_sm2_allreduce_intra_recursive_doubling(void *sbuf, void *rbuf,
|
|||||||
/* write the data into my read buffer */
|
/* write the data into my read buffer */
|
||||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||||
(char *)my_write_pointer,
|
(char *)my_write_pointer,
|
||||||
(char *)extra_rank_write_data_pointer);
|
(char *)extra_rank_read_data_pointer);
|
||||||
if( 0 != rc ) {
|
if( 0 != rc ) {
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
tag=base_tag+my_exchange_node->n_tags-1;
|
||||||
/* set memory barriet to make sure data is in main memory before
|
/* set memory barriet to make sure data is in main memory before
|
||||||
* the completion flgas are set.
|
* the completion flgas are set.
|
||||||
*/
|
*/
|
||||||
|
@ -233,6 +233,7 @@ int setup_recursive_doubling_tree_node(int num_nodes, int node_rank,
|
|||||||
cnt/=tree_order;
|
cnt/=tree_order;
|
||||||
n_exchanges--;
|
n_exchanges--;
|
||||||
}
|
}
|
||||||
|
exchange_node->log_2=n_exchanges;
|
||||||
|
|
||||||
/* set node characteristics - node that is not within the largest
|
/* set node characteristics - node that is not within the largest
|
||||||
* power of 2 will just send it's data to node that will participate
|
* power of 2 will just send it's data to node that will participate
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user