change reduce-scatter/gather for non-power of 2. Spreading out the
load for the non-power of 2 phase of the reduction. This commit was SVN r18486.
Этот коммит содержится в:
родитель
f2a4b67809
Коммит
b08839f9f5
@ -410,6 +410,7 @@ int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
|
|||||||
long long tag, base_tag;
|
long long tag, base_tag;
|
||||||
sm_work_buffer_t *sm_buffer_desc;
|
sm_work_buffer_t *sm_buffer_desc;
|
||||||
volatile char * extra_rank_write_data_pointer;
|
volatile char * extra_rank_write_data_pointer;
|
||||||
|
volatile char * my_extra_write_pointer;
|
||||||
volatile char * partner_base_pointer;
|
volatile char * partner_base_pointer;
|
||||||
volatile char * my_pointer;
|
volatile char * my_pointer;
|
||||||
volatile char * my_base_pointer;
|
volatile char * my_base_pointer;
|
||||||
@ -518,22 +519,65 @@ int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
|
|||||||
|
|
||||||
/* copy data in from the "extra" source, if need be */
|
/* copy data in from the "extra" source, if need be */
|
||||||
tag=base_tag;
|
tag=base_tag;
|
||||||
if(0 < my_exchange_node->n_extra_sources) {
|
if(0 < my_exchange_node->n_extra_sources) {
|
||||||
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
|
int n_my_count;
|
||||||
extra_rank=my_exchange_node->rank_extra_source;
|
|
||||||
extra_ctl_pointer=
|
if ( EXCHANGE_NODE == my_exchange_node->node_type ) {
|
||||||
sm_buffer_desc->proc_memory[extra_rank].control_region;
|
|
||||||
extra_rank_write_data_pointer=
|
/* signal to partner that I am ready */
|
||||||
sm_buffer_desc->proc_memory[extra_rank].data_segment;
|
MB();
|
||||||
|
/*
|
||||||
|
* Signal extra node that data is ready
|
||||||
|
*/
|
||||||
|
my_ctl_pointer->flag=tag;
|
||||||
|
|
||||||
|
/* figure out my portion of the reduction */
|
||||||
|
n_my_count=count_this_stripe/2;
|
||||||
|
|
||||||
|
extra_rank=my_exchange_node->rank_extra_source;
|
||||||
|
extra_ctl_pointer=
|
||||||
|
sm_buffer_desc->proc_memory[extra_rank].control_region;
|
||||||
|
extra_rank_write_data_pointer=
|
||||||
|
sm_buffer_desc->proc_memory[extra_rank].data_segment;
|
||||||
|
|
||||||
|
/* wait until remote data is read */
|
||||||
|
while( extra_ctl_pointer->flag < tag ) {
|
||||||
|
opal_progress();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* apply collective operation to first half of the data */
|
||||||
|
if( 0 < n_my_count ) {
|
||||||
|
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
|
||||||
|
(void *)my_base_pointer, n_my_count,dtype);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* wait for my partner to finish reducing the data */
|
||||||
|
tag=base_tag+1;
|
||||||
|
while( extra_ctl_pointer->flag < tag ) {
|
||||||
|
opal_progress();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* read my partner's data */
|
||||||
|
|
||||||
/* wait until remote data is read */
|
/* adjust read an write pointers */
|
||||||
while( extra_ctl_pointer->flag < tag ) {
|
extra_rank_write_data_pointer+=(n_my_count*dt_extent);
|
||||||
opal_progress();
|
|
||||||
}
|
if( 0 < (count_this_stripe-n_my_count) ) {
|
||||||
|
rc=ompi_ddt_copy_content_same_ddt(dtype,
|
||||||
/* apply collective operation */
|
count_this_stripe-n_my_count,
|
||||||
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
|
(char *)(my_base_pointer+n_my_count*dt_extent),
|
||||||
(void *)my_base_pointer, count_this_stripe,dtype);
|
(char *)extra_rank_write_data_pointer);
|
||||||
|
if( 0 != rc ) {
|
||||||
|
return OMPI_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* now we are ready for the power of 2 portion of the
|
||||||
|
* algorithm
|
||||||
|
*/
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
/* set memory barriet to make sure data is in main memory before
|
/* set memory barriet to make sure data is in main memory before
|
||||||
@ -543,10 +587,41 @@ int mca_coll_sm2_reduce_intra_reducescatter_gather(void *sbuf, void *rbuf,
|
|||||||
MB();
|
MB();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Signal parent that data is ready
|
* Signal extra node that data is ready
|
||||||
*/
|
*/
|
||||||
my_ctl_pointer->flag=tag;
|
my_ctl_pointer->flag=tag;
|
||||||
|
|
||||||
|
/* figure out my portion of the reduction */
|
||||||
|
n_my_count=count_this_stripe-(count_this_stripe/2);
|
||||||
|
|
||||||
|
/* get the pointer to the partners data that needs to be reduced */
|
||||||
|
extra_rank=my_exchange_node->rank_extra_source;
|
||||||
|
extra_ctl_pointer=
|
||||||
|
sm_buffer_desc->proc_memory[extra_rank].control_region;
|
||||||
|
extra_rank_write_data_pointer=
|
||||||
|
sm_buffer_desc->proc_memory[extra_rank].data_segment;
|
||||||
|
/* offset into my half of the data */
|
||||||
|
extra_rank_write_data_pointer+=
|
||||||
|
((count_this_stripe/2)*dt_extent);
|
||||||
|
my_extra_write_pointer=my_base_pointer+
|
||||||
|
((count_this_stripe/2)*dt_extent);
|
||||||
|
|
||||||
|
/* wait until remote data is read */
|
||||||
|
while( extra_ctl_pointer->flag < tag ) {
|
||||||
|
opal_progress();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* apply collective operation to second half of the data */
|
||||||
|
if( 0 < n_my_count ) {
|
||||||
|
ompi_op_reduce(op,(void *)extra_rank_write_data_pointer,
|
||||||
|
(void *)my_extra_write_pointer, n_my_count,dtype);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* signal that I am done, so my partner can read my data */
|
||||||
|
MB();
|
||||||
|
tag=base_tag+1;
|
||||||
|
my_ctl_pointer->flag=tag;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MB();
|
MB();
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user