fix several race conditions.
This commit was SVN r17645.
Этот коммит содержится в:
родитель
5dc64cea6a
Коммит
5df6c6d043
@ -37,7 +37,8 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
volatile char * my_data_pointer;
|
||||
volatile char * child_data_pointer;
|
||||
volatile char * parent_data_pointer;
|
||||
char *my_base_temp_pointer, * volatile child_base_temp_pointer;
|
||||
char *my_base_temp_pointer;
|
||||
volatile char * child_base_temp_pointer;
|
||||
char * volatile parent_base_temp_pointer, * volatile root_base_temp_pointer;
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
|
||||
@ -45,10 +46,6 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
tree_node_t *my_reduction_node, *my_fanout_read_tree;
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG sm2 allreduce called r %d \n",ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
|
||||
sm_module=(mca_coll_sm2_module_t *) module;
|
||||
|
||||
@ -94,17 +91,11 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
/* get a pointer to the shared-memory working buffer */
|
||||
/* NOTE: starting with a rather synchronous approach */
|
||||
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG strip_number %d r %d \n",
|
||||
stripe_number,ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
sm_buffer=alloc_sm2_shared_buffer(sm_module);
|
||||
if( NULL == sm_buffer) {
|
||||
rc=OMPI_ERR_OUT_OF_RESOURCE;
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* get number of elements to process in this stripe */
|
||||
count_this_stripe=n_dts_per_buffer;
|
||||
if( count_processed + count_this_stripe > count )
|
||||
@ -119,31 +110,17 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
my_ctl_pointer=(mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
my_base_temp_pointer;
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG before fan in r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
/*
|
||||
* Fan into root phase
|
||||
/* * Fan into root phase
|
||||
*/
|
||||
|
||||
/* copy segment into shared buffer - later on will optimize to
|
||||
* eliminate extra copies.
|
||||
*/
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
my_data_pointer, sbuf+dt_extent*count_processed);
|
||||
my_data_pointer, (char *)sbuf+dt_extent*count_processed);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG copied my data to sm r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fprintf(stderr," GGGG tag %lld index %d r %d \n",
|
||||
tag,stripe_number,ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
/*
|
||||
* Wait on children, and apply op to their data
|
||||
*/
|
||||
@ -160,12 +137,6 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||
child_base_temp_pointer;
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG before wait tag %lld index %d p %p r %d \n",
|
||||
tag,stripe_number,&(child_ctl_pointer->flag),
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
/* wait until child flag is set */
|
||||
while(!
|
||||
(child_ctl_pointer->flag == tag &
|
||||
@ -173,21 +144,43 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
/* Note: Actually need to make progress here */
|
||||
;
|
||||
}
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG after wait %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
|
||||
if( count_this_stripe >= 3 ) {
|
||||
fprintf(stderr," HHHH child: %d %d %d me: %d %d %d\n",
|
||||
((int *)child_data_pointer)[7],
|
||||
((int *)child_data_pointer)[6],
|
||||
((int *)child_data_pointer)[5],
|
||||
((int *)my_data_pointer)[7],
|
||||
((int *)my_data_pointer)[6],
|
||||
((int *)my_data_pointer)[5]
|
||||
/*
|
||||
((int *)child_data_pointer)[count_this_stripe-3],
|
||||
((int *)child_data_pointer)[count_this_stripe-2],
|
||||
((int *)child_data_pointer)[count_this_stripe-1],
|
||||
((int *)my_data_pointer)[count_this_stripe-3],
|
||||
((int *)my_data_pointer)[count_this_stripe-2],
|
||||
((int *)my_data_pointer)[count_this_stripe-1]
|
||||
*/
|
||||
);
|
||||
}
|
||||
else if( count_this_stripe >= 2 ) {
|
||||
fprintf(stderr," HHHH child: %d %d me: %d %d \n",
|
||||
((int *)child_data_pointer)[count_this_stripe-2],
|
||||
((int *)child_data_pointer)[count_this_stripe-1],
|
||||
((int *)my_data_pointer)[count_this_stripe-2],
|
||||
((int *)my_data_pointer)[count_this_stripe-1]
|
||||
);
|
||||
}
|
||||
else {
|
||||
fprintf(stderr," HHHH child: %d me: %d \n",
|
||||
((int *)child_data_pointer)[count_this_stripe-1],
|
||||
((int *)my_data_pointer)[count_this_stripe-1]
|
||||
);
|
||||
}
|
||||
/* apply collective operation */
|
||||
ompi_op_reduce(op,child_data_pointer,my_data_pointer,
|
||||
count,dtype);
|
||||
}
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG got data from kids r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
count_this_stripe,dtype);
|
||||
} /* end child loop */
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
@ -200,14 +193,6 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
my_ctl_pointer->flag=tag;
|
||||
my_ctl_pointer->index=stripe_number;
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG signaled parent p %p val %lld - r %d \n",
|
||||
&(my_ctl_pointer->flag),
|
||||
my_ctl_pointer->flag,
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
|
||||
/*
|
||||
* Fan out from root - let the memory copies at each
|
||||
* stage help reduce memory contention.
|
||||
@ -216,13 +201,8 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
/* I am the root - so copy signal children, and then
|
||||
* start reading
|
||||
*/
|
||||
MB();
|
||||
my_ctl_pointer->flag=-tag;
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG reset flag to %lld - r %d \n",
|
||||
my_ctl_pointer->flag,
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
@ -240,37 +220,18 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
parent_ctl_pointer=parent_base_temp_pointer;
|
||||
|
||||
child_ctl_pointer=
|
||||
( mca_coll_sm2_nb_request_process_shared_mem_t * volatile)
|
||||
(volatile mca_coll_sm2_nb_request_process_shared_mem_t *)
|
||||
parent_data_pointer;
|
||||
|
||||
/*
|
||||
* wait on Parent to signal that data is ready
|
||||
*/
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG waiting on parent - r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
while(!
|
||||
/* in fan-in, index was already set correctly, so
|
||||
* no need to check it
|
||||
*/
|
||||
(parent_ctl_pointer->flag == -tag ) ) {
|
||||
/* debug */
|
||||
fprintf(stderr," VVVV flag %lld index %lld - r %d \n",
|
||||
parent_ctl_pointer->flag,
|
||||
parent_ctl_pointer->index,
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
(parent_ctl_pointer->flag == -tag &
|
||||
parent_ctl_pointer->index== stripe_number) ) {
|
||||
/* Note: Actually need to make progress here */
|
||||
;
|
||||
}
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG done waiting on parent - r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
|
||||
/* copy the data to my shared buffer, for access by children */
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
@ -284,11 +245,6 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout(void *sbuf, void *rbuf, int count,
|
||||
*/
|
||||
MB();
|
||||
|
||||
/* debug */
|
||||
fprintf(stderr," GGGG about to signal children - r %d \n",
|
||||
ompi_comm_rank(comm));
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
/* signal children that they may read the result data */
|
||||
my_ctl_pointer->flag=-tag;
|
||||
|
||||
@ -350,9 +306,5 @@ int mca_coll_sm2_allreduce_intra(void *sbuf, void *rbuf, int count,
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
Error:
|
||||
/* debug */
|
||||
fprintf(stderr," EEEError from allredcue : %d \n",rc);
|
||||
fflush(stderr);
|
||||
/* end debug */
|
||||
return rc;
|
||||
}
|
||||
|
@ -130,13 +130,10 @@ static int sm2_open(void)
|
||||
/* set component priority */
|
||||
cs->sm2_priority=
|
||||
mca_coll_sm2_param_register_int("sm2_priority",0);
|
||||
/* debug */
|
||||
fprintf(stderr," DDDD cs->sm2_priority %d \n",cs->sm2_priority);
|
||||
/* end debub */
|
||||
|
||||
/* set control region size (bytes), per proc */
|
||||
cs->sm2_ctl_size_per_proc=
|
||||
mca_coll_sm2_param_register_int("sm2_ctl_size_per_proc",sizeof(int));
|
||||
mca_coll_sm2_param_register_int("sm2_ctl_size_per_proc",2*sizeof(long long));
|
||||
|
||||
/* initialize control region allocted */
|
||||
cs->sm2_ctl_size_allocated=0;
|
||||
|
@ -458,9 +458,6 @@ struct mca_coll_base_module_1_1_0_t *
|
||||
mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
{
|
||||
/* local variables */
|
||||
/* debug */
|
||||
int i,j;
|
||||
/* end debug */
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
int group_size,ret;
|
||||
size_t alignment,size,size_tot,size_tot_per_proc_per_seg;
|
||||
@ -548,8 +545,8 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
/*
|
||||
* get control region size
|
||||
*/
|
||||
/* just enough place for one flag per process */
|
||||
ctl_memory_per_proc_per_segment=sizeof(long long);
|
||||
/* just enough place for two flags per process */
|
||||
ctl_memory_per_proc_per_segment=2*sizeof(long long);
|
||||
if( mca_coll_sm2_component.sm2_ctl_size_per_proc > ctl_memory_per_proc_per_segment )
|
||||
ctl_memory_per_proc_per_segment=mca_coll_sm2_component.sm2_ctl_size_per_proc;
|
||||
ctl_memory_per_proc_per_segment=ctl_memory_per_proc_per_segment * group_size ;
|
||||
@ -834,7 +831,6 @@ char *alloc_sm2_shared_buffer(mca_coll_sm2_module_t *module)
|
||||
return_buffer=module->collective_buffer_region+
|
||||
buffer_index*module->segment_size;
|
||||
|
||||
/* return */
|
||||
return return_buffer;
|
||||
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user