This commit was SVN r18008.
This commit is contained in:
Rich Graham 2008-03-28 15:10:07 +00:00
parent 77653ac787
commit 90e53ca9ee
2 changed files with 69 additions and 24 deletions

View file

@@ -319,7 +319,7 @@ int progress_fanin_fanout( void *sbuf, void *rbuf,
int count_processed,count_this_stripe;
ptrdiff_t dt_extent;
long long tag;
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer;
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
volatile char * my_data_pointer;
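The hunk above makes my_ctl_pointer volatile, matching the parent and child control pointers: these control structures sit in a shared-memory segment and are written by other processes, so the compiler must not cache reads of their fields. A minimal sketch of the polling pattern this protects, using illustrative names rather than the actual mca_coll_sm2 fields:

    #include <stdio.h>

    /* Illustrative stand-in for a control block that lives in shared memory
     * and is written by a peer process. */
    typedef struct {
        long long tag;   /* peer sets this to announce that data for tag N is ready */
    } demo_ctl_t;

    /* Spin until the peer publishes the expected tag.  Without the volatile
     * qualifier the compiler may hoist the load out of the loop and never
     * observe the peer's update. */
    static void wait_for_tag(volatile demo_ctl_t *ctl, long long expected)
    {
        while (ctl->tag != expected) {
            ;   /* the real code bounds this with n_poll_loops and returns,
                 * so other pipeline stripes can be progressed meanwhile */
        }
    }

    int main(void)
    {
        demo_ctl_t blk = { .tag = 7 };
        wait_for_tag(&blk, 7);     /* returns immediately in this one-process demo */
        printf("tag observed\n");
        return 0;
    }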
@@ -329,7 +329,6 @@ int progress_fanin_fanout( void *sbuf, void *rbuf,
tree_node_t *my_reduction_node;
tree_node_t *my_fanout_read_tree;
my_fanout_parent=my_fanout_read_tree->parent_rank;
tag=reduction_desc->tag;
sm_buffer_desc=reduction_desc->shared_buffer;
my_rank=reduction_desc->my_rank;
@@ -385,20 +384,24 @@ int progress_fanin_fanout( void *sbuf, void *rbuf,
* provides only 2 buffers, so can't add from two
* into a third buffer.
*/
count_processed=reduction_desc->count_processed;
count_this_stripe=reduction_desc->count_this_stripe;
/* error conditions already checked */
ompi_ddt_type_extent(dtype, &dt_extent);
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
if( STARTED == reduction_desc->status) {
/* copy-in only the first time through */
count_processed=reduction_desc->count_processed;
/* error conditions already checked */
ompi_ddt_type_extent(dtype, &dt_extent);
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
(char *)my_data_pointer,
(char *)((char *)sbuf+dt_extent*count_processed));
if( 0 != rc ) {
return OMPI_ERROR;
}
}
/*
* Wait on children, and apply op to their data
*/
n_children=my_reduction_node->n_children;
for( child=reduction_desc->n_child_loops_completed ;
child < n_children ; child++ ) {
child_rank=my_reduction_node->children_ranks[child];
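This hunk makes the fan-in re-entrant: the copy of the caller's source buffer into the shared segment happens only on the first pass (status is still STARTED), and the loop over children resumes at n_child_loops_completed so contributions already absorbed are not reduced twice. A hedged sketch of that resumable-progress shape, with simplified stand-in types rather than the real reduction descriptor:

    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    enum demo_status { DEMO_STARTED, DEMO_FANIN };

    /* Simplified per-stripe tracker, loosely modeled on the descriptor in the diff. */
    typedef struct {
        enum demo_status status;
        int n_children;
        int n_child_loops_completed;   /* where the child loop resumes on the next call */
    } demo_stripe_t;

    /* One progress pass: copy in only the first time through, then consume as
     * many children as are ready, remembering where we stopped. */
    static bool demo_progress(demo_stripe_t *d, const char *sbuf, char *shared,
                              size_t len, bool (*child_ready)(int child))
    {
        if (DEMO_STARTED == d->status) {
            memcpy(shared, sbuf, len);       /* copy-in only on first entry */
            d->status = DEMO_FANIN;
        }
        for (int child = d->n_child_loops_completed; child < d->n_children; child++) {
            if (!child_ready(child)) {
                return false;                /* child not ready: resume here later */
            }
            /* ... apply the reduction op to the child's contribution ... */
            d->n_child_loops_completed = child + 1;
        }
        return true;                         /* fan-in finished for this stripe */
    }

    static bool always_ready(int child) { (void)child; return true; }

    int main(void)
    {
        char src[8] = "abc", dst[8] = "";
        demo_stripe_t d = { DEMO_STARTED, 2, 0 };
        bool done = demo_progress(&d, src, dst, sizeof src, always_ready);
        printf("done=%d data=%s\n", (int)done, dst);
        return 0;
    }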
@@ -441,6 +444,7 @@ int progress_fanin_fanout( void *sbuf, void *rbuf,
}
REDUCTION_FANOUT:
my_fanout_parent=my_fanout_read_tree->parent_rank;
switch (my_reduction_node->my_node_type) {
case LEAF_NODE:
@@ -621,8 +625,11 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
/* NOTE: need to check at communicator creation that we have enough
* temporary buffers for this
*/
for(i=0 ; i < depth_pipeline ; i++ ) {
/*
working_buffers[i].shared_buffer=alloc_sm2_shared_buffer(sm_module);
*/
working_buffers[i].status=BUFFER_AVAILABLE;
working_buffers[i].my_rank=my_rank;
working_buffers[i].my_reduction_node=my_reduction_node;
@@ -654,13 +661,21 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
for( i=0 ; i < depth_pipeline ; i++ ) {
if( working_buffers[i].status != BUFFER_AVAILABLE ){
rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
&(working_buffers[buffer_index]),
&(working_buffers[i]),
sm_module->n_poll_loops, &completed);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
/* update the number of completed segments */
n_completed+=completed;
if( completed ) {
n_completed+=completed;
/* release of resources may be out of order, but allocation
* is ordered, and a buffer is released only after the pipeline
* tracker (working_buffers[]) indicates that it is complete, so
* resources will not be re-used too early
*/
rc=free_sm2_shared_buffer(sm_module);
}
}
}
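Besides returning a shared buffer each time a stripe finishes, the loop above fixes an indexing bug: each in-flight entry is now progressed as working_buffers[i] instead of repeatedly progressing working_buffers[buffer_index]. A rough sketch of that drive loop over the pipeline window; the progress and release callbacks are placeholders for the real routines:

    #define DEMO_DEPTH 4

    enum demo_buf_status { DEMO_AVAILABLE, DEMO_IN_FLIGHT };

    typedef struct { enum demo_buf_status status; } demo_buf_t;

    /* Walk every slot in the pipeline window and progress the ones in flight.
     * The loop index i selects the entry being progressed. */
    static int demo_drive(demo_buf_t bufs[DEMO_DEPTH],
                          int (*progress)(demo_buf_t *buf, int *completed),
                          void (*release)(void))
    {
        int n_completed = 0;
        for (int i = 0; i < DEMO_DEPTH; i++) {
            if (bufs[i].status != DEMO_AVAILABLE) {
                int completed = 0;
                if (0 != progress(&bufs[i], &completed)) {  /* progress THIS entry */
                    return -1;
                }
                if (completed) {
                    n_completed += completed;
                    bufs[i].status = DEMO_AVAILABLE;
                    release();   /* hand one shared buffer back per completed stripe */
                }
            }
        }
        return n_completed;
    }

    static int demo_progress_stub(demo_buf_t *buf, int *completed) { (void)buf; *completed = 1; return 0; }
    static void demo_release_stub(void) { }

    int main(void)
    {
        demo_buf_t bufs[DEMO_DEPTH] =
            { {DEMO_IN_FLIGHT}, {DEMO_AVAILABLE}, {DEMO_IN_FLIGHT}, {DEMO_AVAILABLE} };
        return demo_drive(bufs, demo_progress_stub, demo_release_stub) == 2 ? 0 : 1;
    }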
@@ -669,7 +684,9 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
}
/* initialize working buffer for this stripe */
working_buffers[buffer_index].status=FANIN;
working_buffers[buffer_index].shared_buffer=
alloc_sm2_shared_buffer(sm_module);
working_buffers[buffer_index].status=STARTED;
working_buffers[buffer_index].n_child_loops_completed=0;
count_processed=stripe_number*n_dts_per_buffer;
count_this_stripe=n_dts_per_buffer;
@@ -689,7 +706,15 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
goto Error;
}
/* update the number of completed segments */
n_completed+=completed;
if( completed ) {
n_completed+=completed;
/* release of resources may be out of order, but allocation
* is ordered, and a buffer is released only after the pipeline
* tracker (working_buffers[]) indicates that it is complete, so
* resources will not be re-used too early
*/
rc=free_sm2_shared_buffer(sm_module);
}
}
@@ -698,30 +723,42 @@ int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
for( i=0 ; i < depth_pipeline ; i++ ) {
if( working_buffers[i].status != BUFFER_AVAILABLE ){
rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
&(working_buffers[buffer_index]),
&(working_buffers[i]),
sm_module->n_poll_loops, &completed);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
/* update the number of completed segments */
n_completed+=completed;
if( completed ) {
n_completed+=completed;
/* release of resources may be out of order, but allocation
* is ordered, and a buffer is released only after the pipeline
* tracker (working_buffers[]) indicates that it is complete, so
* resources will not be re-used too early
*/
rc=free_sm2_shared_buffer(sm_module);
}
}
}
}
/* free work buffers */
/*
for(i=0 ; i < depth_pipeline ; i++ ) {
rc=free_sm2_shared_buffer(sm_module);
}
*/
/* return */
return rc;
Error:
/* free work buffers */
/*
for(i=0 ; i < depth_pipeline ; i++ ) {
rc=free_sm2_shared_buffer(sm_module);
}
*/
return rc;
}
#undef depth_pipeline
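Buffer management in this function also changes shape: rather than grabbing depth_pipeline shared buffers up front and releasing them all at the end (both loops are now commented out), a buffer is taken from alloc_sm2_shared_buffer() when a stripe is issued and handed back through free_sm2_shared_buffer() when that stripe completes. A small sketch of the resulting per-stripe lifecycle, with hypothetical stand-ins for the allocator:

    #include <assert.h>

    /* Hypothetical stand-ins for the shared-buffer allocator.  Allocation is
     * ordered; releases may happen out of order, but there is exactly one
     * release per completed stripe. */
    static int n_alloc = 0, n_free = 0;
    static void *demo_alloc_buffer(void) { n_alloc++; return (void *)&n_alloc; }
    static void  demo_free_buffer(void)  { n_free++; }

    int main(void)
    {
        const int n_stripes = 10;
        for (int stripe = 0; stripe < n_stripes; stripe++) {
            void *buf = demo_alloc_buffer();   /* acquired when the stripe is issued */
            (void)buf;                         /* ... fan-in / fan-out progress ...  */
            demo_free_buffer();                /* returned when the stripe completes */
        }
        assert(n_alloc == n_free);             /* every issued stripe returned its buffer */
        return 0;
    }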
@@ -1433,12 +1470,20 @@ int mca_coll_sm2_allreduce_intra(void *sbuf, void *rbuf, int count,
}
} else {
#endif /* testing */
/* Non-Commutative Operation */
rc= mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline(
sbuf, rbuf, count,dtype, op, comm, module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
#if 0
/* Non-Commutative Operation */
rc= mca_coll_sm2_allreduce_intra_fanin_fanout(sbuf, rbuf, count,
dtype, op, comm, module);
if( OMPI_SUCCESS != rc ) {
goto Error;
}
#endif
#if 0 /* just for some testing */
}
#endif

View file

@@ -153,8 +153,8 @@ static int allocate_shared_file(size_t size, char **file_name,
*/
unique_comm_id=(int)getpid();
len=asprintf(&f_name,
"%s"OPAL_PATH_SEP"sm_coll_v2%s_%0d_%0d",orte_process_info.job_session_dir,
orte_process_info.proc_session_dir,ompi_comm_get_cid(comm),unique_comm_id);
"%s"OPAL_PATH_SEP"sm_coll_v2_%0d_%0d",orte_process_info.job_session_dir,
ompi_comm_get_cid(comm),unique_comm_id);
if( 0 > len ) {
return OMPI_ERROR;
}
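The backing-file name is now built from the job session directory alone: the extra %s and the per-process session directory were dropped from the format string, while the name still carries the communicator id plus the pid of the initializing process, so successive communicators that reuse a cid get distinct files. A hedged sketch of that construction; the directory string below is a made-up example, not how orte_process_info is actually populated:

    #define _GNU_SOURCE              /* for asprintf on glibc */
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    int main(void)
    {
        /* stand-ins for orte_process_info.job_session_dir and ompi_comm_get_cid(comm) */
        const char *job_session_dir = "/tmp/openmpi-sessions/job.0";
        int cid = 3;
        int unique_comm_id = (int)getpid();   /* same trick as the diff: the pid disambiguates */

        char *f_name = NULL;
        int len = asprintf(&f_name, "%s/sm_coll_v2_%0d_%0d",
                           job_session_dir, cid, unique_comm_id);
        if (0 > len) {
            return 1;                         /* mirrors the OMPI_ERROR return in the diff */
        }
        printf("backing file: %s (len %d)\n", f_name, len);
        free(f_name);
        return 0;
    }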
@@ -163,8 +163,8 @@ static int allocate_shared_file(size_t size, char **file_name,
/* process initializing the file */
fd = open(*file_name, O_CREAT|O_RDWR, 0600);
if (fd < 0) {
opal_output(0,"mca_common_sm_mmap_init: open %s failed with errno=%d\n",
*file_name, errno);
opal_output(0,"mca_common_sm_mmap_init: open %s len %d failed with errno=%d\n",
*file_name, len, errno);
goto file_opened;
}
/* map the file and initialize segment state */
@@ -238,8 +238,8 @@ static int allocate_shared_file(size_t size, char **file_name,
* communicators that could have the same communicator id
*/
len=asprintf(&f_name,
"%s"OPAL_PATH_SEP"sm_coll_v2%s_%0d_%0d",orte_process_info.job_session_dir,
orte_process_info.proc_session_dir,ompi_comm_get_cid(comm),unique_comm_id);
"%s"OPAL_PATH_SEP"sm_coll_v2_%0d_%0d",orte_process_info.job_session_dir,
ompi_comm_get_cid(comm),unique_comm_id);
if( 0 > len ) {
return OMPI_ERROR;
}
@@ -248,8 +248,8 @@ static int allocate_shared_file(size_t size, char **file_name,
/* open backing file */
fd = open(*file_name, O_RDWR, 0600);
if (fd < 0) {
opal_output(0,"mca_common_sm_mmap_init: open %s failed with errno=%d\n",
*file_name, errno);
opal_output(0,"mca_common_sm_mmap_init: open %s len %d failed with errno=%d\n",
*file_name, len, errno);
goto return_error;
}