checkpoint.
This commit was SVN r17985.
Этот коммит содержится в:
родитель
8e6da2ee76
Коммит
441fb9fb9e
@ -106,6 +106,11 @@ BEGIN_C_DECLS
|
||||
/** MCA parameter: order of fan-out read tree */
|
||||
int order_fanout_read_tree;
|
||||
|
||||
/** MCA parameter: number of polling loops to run while waiting
|
||||
* for children or parent to complete their work
|
||||
*/
|
||||
int n_poll_loops;
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
@ -355,6 +360,11 @@ BEGIN_C_DECLS
|
||||
/* recursive-doubling tree node */
|
||||
pair_exchange_node_t recursive_doubling_tree;
|
||||
|
||||
/* number of polling loops to run while waiting
|
||||
* for children or parent to complete their work
|
||||
*/
|
||||
int n_poll_loops;
|
||||
|
||||
/* collective tag */
|
||||
long long collective_tag;
|
||||
|
||||
@ -363,6 +373,60 @@ BEGIN_C_DECLS
|
||||
typedef struct mca_coll_sm2_module_t mca_coll_sm2_module_t;
|
||||
OBJ_CLASS_DECLARATION(mca_coll_sm2_module_t);
|
||||
|
||||
/*
 * Struct for managing the allreduce pipeline.  One descriptor exists
 * per in-flight data stripe; the progress routine uses the cached
 * fields below to resume a partially completed reduction without
 * re-deriving any state.
 */
struct mca_coll_sm2_module_allreduce_pipeline_t {
    /* pointer to shared temporary working buffer */
    sm_work_buffer_t *shared_buffer;

    /* cached rank of this process in the communicator */
    int my_rank;

    /* cached node of this process in the reduction (fan-in) tree */
    tree_node_t *my_reduction_node;

    /* cached node of this process in the fan-out read tree */
    tree_node_t *my_fanout_read_tree;


    /* status of the buffer - determines what next to do
     * with this data (one of the BUFFER_AVAILABLE/STARTED/FANIN/FANOUT
     * values declared below)
     */
    int status;

    /*
     * number of child loops completed - needed for
     * async progress (fan-in resumes from this child index)
     */
    int n_child_loops_completed;

    /*
     * number of data-type elements to process in this stripe
     */
    int count_this_stripe;

    /*
     * offset into the data type buffer, in units of data-types
     */
    int count_processed;

    /*
     * tag identifying this collective operation
     */
    int tag;
};
typedef struct mca_coll_sm2_module_allreduce_pipeline_t
    mca_coll_sm2_module_allreduce_pipeline_t;
OBJ_CLASS_DECLARATION(mca_coll_sm2_module_allreduce_pipeline_t);

/* pipeline-stage states stored in the status field above */
enum {
    BUFFER_AVAILABLE,
    STARTED,
    FANIN,
    FANOUT
};
|
||||
|
||||
|
||||
/**
|
||||
* Global component instance
|
||||
@ -424,131 +488,6 @@ BEGIN_C_DECLS
|
||||
struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module);
|
||||
|
||||
|
||||
/**
 * Macro to setup flag usage: point (flag) at in-use flag number
 * (flag_num) within the shared control area of (data).
 */
#define FLAG_SETUP(flag_num, flag, data) \
    (flag) = (mca_coll_sm_in_use_flag_t*) \
        (((char *) (data)->mcb_in_use_flags) + \
        ((flag_num) * mca_coll_sm_component.sm_control_size))

/**
 * Macro to wait for the in-use flag to become idle (used by the root)
 */
#define FLAG_WAIT_FOR_IDLE(flag) \
    while (0 != (flag)->mcsiuf_num_procs_using) SPIN

/**
 * Macro to wait for a flag to indicate that it's ready for this
 * operation (used by non-root processes to know when FLAG_SET() has
 * been called)
 */
#define FLAG_WAIT_FOR_OP(flag, op) \
    while ((op) != flag->mcsiuf_operation_count) SPIN

/**
 * Macro to set an in-use flag with relevant data to claim it
 */
#define FLAG_RETAIN(flag, num_procs, op_count) \
    (flag)->mcsiuf_num_procs_using = (num_procs); \
    (flag)->mcsiuf_operation_count = (op_count)

/**
 * Macro to release an in-use flag from this process
 * (atomic decrement of the usage count)
 */
#define FLAG_RELEASE(flag) \
    opal_atomic_add(&(flag)->mcsiuf_num_procs_using, -1)

/**
 * Macro to copy a single segment in from a user buffer to a shared
 * segment
 */
#define COPY_FRAGMENT_IN(convertor, index, rank, iov, max_data) \
    (iov).iov_base = \
        (index)->mcbmi_data + \
        ((rank) * mca_coll_sm_component.sm_fragment_size); \
    (max_data) = (iov).iov_len = mca_coll_sm_component.sm_fragment_size; \
    ompi_convertor_pack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
                        &(max_data) )

/**
 * Macro to copy a single segment out from a shared segment to a user
 * buffer
 */
#define COPY_FRAGMENT_OUT(convertor, src_rank, index, iov, max_data) \
    (iov).iov_base = (((char*) (index)->mcbmi_data) + \
                      ((src_rank) * mca_coll_sm_component.sm_fragment_size)); \
    ompi_convertor_unpack(&(convertor), &(iov), &mca_coll_sm_iov_size, \
                          &(max_data) )

/**
 * Macro to memcpy a fragment between one shared segment and another
 */
#define COPY_FRAGMENT_BETWEEN(src_rank, dest_rank, index, len) \
    memcpy(((index)->mcbmi_data + \
            ((dest_rank) * mca_coll_sm_component.sm_fragment_size)), \
           ((index)->mcbmi_data + \
            ((src_rank) * \
             mca_coll_sm_component.sm_fragment_size)), \
           (len))

/**
 * Macro to tell children that a segment is ready (normalize
 * the child's ID based on the shift used to calculate the "me" node
 * in the tree).  Used in fan out operations.
 * NOTE: relies on 'i', 'root' and 'size' being in scope at the
 * call site - use with care.
 */
#define PARENT_NOTIFY_CHILDREN(children, num_children, index, value) \
    do { \
        for (i = 0; i < (num_children); ++i) { \
            *((size_t*) \
              (((char*) index->mcbmi_control) + \
               (mca_coll_sm_component.sm_control_size * \
                (((children)[i]->mcstn_id + root) % size)))) = (value); \
        } \
    } while (0)

/**
 * Macro for children to wait for parent notification (use real rank).
 * Save the value passed and then reset it when done.  Used in fan out
 * operations.
 */
#define CHILD_WAIT_FOR_NOTIFY(rank, index, value) \
    do { \
        uint32_t volatile *ptr = ((uint32_t*) \
                                  (((char*) index->mcbmi_control) + \
                                   ((rank) * mca_coll_sm_component.sm_control_size))); \
        while (0 == *ptr) SPIN; \
        (value) = *ptr; \
        *ptr = 0; \
    } while (0)

/**
 * Macro for children to tell parent that the data is ready in their
 * segment.  Used for fan in operations.
 */
#define CHILD_NOTIFY_PARENT(child_rank, parent_rank, index, value) \
    ((size_t volatile *) \
     (((char*) (index)->mcbmi_control) + \
      (mca_coll_sm_component.sm_control_size * \
       (parent_rank))))[(child_rank)] = (value)

/**
 * Macro for parent to wait for a specific child to tell it that the
 * data is in the child's segment.  Save the value when done.  Used
 * for fan in operations.
 */
#define PARENT_WAIT_FOR_NOTIFY_SPECIFIC(child_rank, parent_rank, index, value) \
    do { \
        size_t volatile *ptr = ((size_t volatile *) \
                                (((char*) index->mcbmi_control) + \
                                 (mca_coll_sm_component.sm_control_size * \
                                  (parent_rank)))) + child_rank; \
        while (0 == *ptr) SPIN; \
        (value) = *ptr; \
        *ptr = 0; \
    } while (0)
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* MCA_COLL_SM2_EXPORT_H */
|
||||
|
@ -300,6 +300,432 @@ Error:
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* fanin/fanout progress function.
|
||||
*/
|
||||
|
||||
static
|
||||
int progress_fanin_fanout( void *sbuf, void *rbuf,
|
||||
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
|
||||
mca_coll_sm2_module_allreduce_pipeline_t *reduction_desc,
|
||||
int n_poll_loops, int *completed)
|
||||
{
|
||||
/* local variables */
|
||||
|
||||
int my_rank,cnt;
|
||||
int rc=OMPI_SUCCESS;
|
||||
int my_fanout_parent;
|
||||
int child_rank,n_children,child;
|
||||
int count_processed,count_this_stripe;
|
||||
ptrdiff_t dt_extent;
|
||||
long long tag;
|
||||
mca_coll_sm2_nb_request_process_shared_mem_t *my_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * parent_ctl_pointer;
|
||||
volatile mca_coll_sm2_nb_request_process_shared_mem_t * child_ctl_pointer;
|
||||
volatile char * my_data_pointer;
|
||||
volatile char * parent_data_pointer;
|
||||
volatile char * child_data_pointer;
|
||||
sm_work_buffer_t *sm_buffer_desc;
|
||||
tree_node_t *my_reduction_node;
|
||||
tree_node_t *my_fanout_read_tree;
|
||||
|
||||
my_fanout_parent=my_fanout_read_tree->parent_rank;
|
||||
tag=reduction_desc->tag;
|
||||
sm_buffer_desc=reduction_desc->shared_buffer;
|
||||
my_rank=reduction_desc->my_rank;
|
||||
my_reduction_node=reduction_desc->my_reduction_node;
|
||||
my_fanout_read_tree=reduction_desc->my_fanout_read_tree;
|
||||
/* initialize flag indicating that segment is still active in the
|
||||
* reduction
|
||||
*/
|
||||
*completed=0;
|
||||
|
||||
my_ctl_pointer=sm_buffer_desc->proc_memory[my_rank].control_region;
|
||||
my_data_pointer=sm_buffer_desc->proc_memory[my_rank].data_segment;
|
||||
|
||||
/* figure out where to proceed */
|
||||
if( FANOUT == reduction_desc->status) {
|
||||
goto REDUCTION_FANOUT;
|
||||
}
|
||||
/*
|
||||
* fan in
|
||||
*/
|
||||
switch (my_reduction_node->my_node_type) {
|
||||
case LEAF_NODE:
|
||||
/* leaf node */
|
||||
/* copy segment into shared buffer - later on will optimize to
|
||||
* eliminate extra copies.
|
||||
*/
|
||||
count_processed=reduction_desc->count_processed;
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
/* error conditions already checed */
|
||||
ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
/* ROOT_NODE and INTERIOR_NODE */
|
||||
/* copy segment into shared buffer - ompi_op_reduce
|
||||
* provids only 2 buffers, so can't add from two
|
||||
* into a third buffer.
|
||||
*/
|
||||
count_processed=reduction_desc->count_processed;
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
/* error conditions already checed */
|
||||
ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,
|
||||
(char *)((char *)sbuf+dt_extent*count_processed));
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait on children, and apply op to their data
|
||||
*/
|
||||
for( child=reduction_desc->n_child_loops_completed ;
|
||||
child < n_children ; child++ ) {
|
||||
child_rank=my_reduction_node->children_ranks[child];
|
||||
|
||||
child_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[child_rank].control_region;
|
||||
child_data_pointer=
|
||||
sm_buffer_desc->proc_memory[child_rank].data_segment;
|
||||
|
||||
/* wait until child flag is set */
|
||||
cnt=0;
|
||||
while( child_ctl_pointer->flag != tag ) {
|
||||
opal_progress();
|
||||
cnt++;
|
||||
if( n_poll_loops == cnt ) {
|
||||
/* break out */
|
||||
reduction_desc->status=FANIN;
|
||||
reduction_desc->n_child_loops_completed=child;
|
||||
goto RETURN;
|
||||
}
|
||||
}
|
||||
|
||||
/* apply collective operation */
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
ompi_op_reduce(op,(void *)child_data_pointer,
|
||||
(void *)my_data_pointer, count_this_stripe,dtype);
|
||||
|
||||
} /* end child loop */
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/*
|
||||
* Signal parent that data is ready
|
||||
*/
|
||||
my_ctl_pointer->flag=tag;
|
||||
|
||||
}
|
||||
|
||||
REDUCTION_FANOUT:
|
||||
switch (my_reduction_node->my_node_type) {
|
||||
case LEAF_NODE:
|
||||
|
||||
parent_data_pointer=
|
||||
sm_buffer_desc->proc_memory[my_fanout_parent].data_segment;
|
||||
parent_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[my_fanout_parent].control_region;
|
||||
|
||||
/*
|
||||
* wait on Parent to signal that data is ready
|
||||
*/
|
||||
cnt=0;
|
||||
while(parent_ctl_pointer->flag != -tag) {
|
||||
opal_progress();
|
||||
cnt++;
|
||||
if( n_poll_loops == cnt ) {
|
||||
/* break out */
|
||||
reduction_desc->status=FANOUT;
|
||||
goto RETURN;
|
||||
}
|
||||
}
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
count_processed=reduction_desc->count_processed;
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
/* error conditions already checed */
|
||||
ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)rbuf+dt_extent*count_processed,
|
||||
(char *)parent_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case INTERIOR_NODE:
|
||||
|
||||
/* interior nodes */
|
||||
|
||||
parent_data_pointer=
|
||||
sm_buffer_desc->proc_memory[my_fanout_parent].data_segment;
|
||||
parent_ctl_pointer=
|
||||
sm_buffer_desc->proc_memory[my_fanout_parent].control_region;
|
||||
|
||||
/*
|
||||
* wait on Parent to signal that data is ready
|
||||
*/
|
||||
cnt=0;
|
||||
while(parent_ctl_pointer->flag != -tag) {
|
||||
opal_progress();
|
||||
cnt++;
|
||||
if( n_poll_loops == cnt ) {
|
||||
/* break out */
|
||||
reduction_desc->status=FANOUT;
|
||||
goto RETURN;
|
||||
}
|
||||
}
|
||||
|
||||
/* copy the data to my shared buffer, for access by children */
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)my_data_pointer,(char *)parent_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* set memory barriet to make sure data is in main memory before
|
||||
* the completion flgas are set.
|
||||
*/
|
||||
MB();
|
||||
|
||||
/* signal children that they may read the result data */
|
||||
my_ctl_pointer->flag=-tag;
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
count_processed=reduction_desc->count_processed;
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
/* error conditions already checed */
|
||||
ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)rbuf+dt_extent*count_processed,
|
||||
(char *)my_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case ROOT_NODE:
|
||||
|
||||
/* I am the root - so copy signal children, and then
|
||||
* start reading
|
||||
*/
|
||||
MB();
|
||||
my_ctl_pointer->flag=-tag;
|
||||
|
||||
/* copy data to user supplied buffer */
|
||||
count_processed=reduction_desc->count_processed;
|
||||
count_this_stripe=reduction_desc->count_this_stripe;
|
||||
/* error conditions already checed */
|
||||
ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
rc=ompi_ddt_copy_content_same_ddt(dtype, count_this_stripe,
|
||||
(char *)((char *)rbuf+dt_extent*count_processed),
|
||||
(char *)my_data_pointer);
|
||||
if( 0 != rc ) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/* completed processing the data in this stripe */
|
||||
*completed=1;
|
||||
|
||||
/* mark the descriptor as available */
|
||||
reduction_desc->status=BUFFER_AVAILABLE;
|
||||
|
||||
/* return */
|
||||
RETURN:
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shared memory blocking allreduce - pipeline algorithm.
|
||||
*/
|
||||
#define depth_pipeline 2
|
||||
|
||||
static
|
||||
int mca_coll_sm2_allreduce_intra_fanin_fanout_pipeline
|
||||
(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op, struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_module_1_1_0_t *module)
|
||||
{
|
||||
|
||||
/* local variables */
|
||||
int i,buffer_index,stripe_number,my_rank,n_completed,completed;
|
||||
int count_processed,count_this_stripe;
|
||||
mca_coll_sm2_module_allreduce_pipeline_t working_buffers[depth_pipeline];
|
||||
int rc=OMPI_SUCCESS;
|
||||
long long tag;
|
||||
tree_node_t *my_reduction_node, *my_fanout_read_tree;
|
||||
mca_coll_sm2_module_t *sm_module;
|
||||
int n_dts_per_buffer,n_data_segments;
|
||||
size_t len_data_buffer;
|
||||
ptrdiff_t dt_extent;
|
||||
|
||||
sm_module=(mca_coll_sm2_module_t *) module;
|
||||
|
||||
/* get size of data needed - same layout as user data, so that
|
||||
* we can apply the reudction routines directly on these buffers
|
||||
*/
|
||||
rc=ompi_ddt_type_extent(dtype, &dt_extent);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* lenght of control and data regions */
|
||||
len_data_buffer=sm_module->data_memory_per_proc_per_segment;
|
||||
|
||||
/* number of data types copies that the scratch buffer can hold */
|
||||
n_dts_per_buffer=((int) len_data_buffer)/dt_extent;
|
||||
if ( 0 == n_dts_per_buffer ) {
|
||||
rc=OMPI_ERROR;
|
||||
goto Error;
|
||||
}
|
||||
|
||||
/* compute number of stripes needed to process this collective */
|
||||
n_data_segments=(count+n_dts_per_buffer -1 ) / n_dts_per_buffer ;
|
||||
|
||||
/* get my node for the reduction tree */
|
||||
my_rank=ompi_comm_rank(comm);
|
||||
my_reduction_node=&(sm_module->reduction_tree[my_rank]);
|
||||
my_fanout_read_tree=&(sm_module->fanout_read_tree[my_rank]);
|
||||
count_processed=0;
|
||||
|
||||
/* get the working data segments */
|
||||
/* NOTE: need to check at communicator creation that we have enough
|
||||
* temporary buffes for this
|
||||
*/
|
||||
for(i=0 ; i < depth_pipeline ; i++ ) {
|
||||
working_buffers[i].shared_buffer=alloc_sm2_shared_buffer(sm_module);
|
||||
working_buffers[i].status=BUFFER_AVAILABLE;
|
||||
working_buffers[i].my_rank=my_rank;
|
||||
working_buffers[i].my_reduction_node=my_reduction_node;
|
||||
working_buffers[i].my_fanout_read_tree=my_fanout_read_tree;
|
||||
}
|
||||
|
||||
n_completed=0;
|
||||
buffer_index=-1;
|
||||
/* loop over data segments */
|
||||
for( stripe_number=0 ; stripe_number < n_data_segments ; stripe_number++ ) {
|
||||
|
||||
/*
|
||||
* allocate working buffer
|
||||
*/
|
||||
|
||||
/* get working_buffers index - this needs to be deterministic,
|
||||
* as each process is getting this pointer on it's own, so all
|
||||
* need to point to the same data structure
|
||||
*/
|
||||
buffer_index++;
|
||||
/* wrap around */
|
||||
if( buffer_index == depth_pipeline){
|
||||
buffer_index=0;
|
||||
}
|
||||
|
||||
/* wait for buffer to become available */
|
||||
while ( working_buffers[buffer_index].status != BUFFER_AVAILABLE ) {
|
||||
/* loop over working buffers, and progress the reduction */
|
||||
for( i=0 ; i < depth_pipeline ; i++ ) {
|
||||
if( working_buffers[i].status != BUFFER_AVAILABLE ){
|
||||
rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
|
||||
&(working_buffers[buffer_index]),
|
||||
sm_module->n_poll_loops, &completed);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
/* update the number of completed segments */
|
||||
n_completed+=completed;
|
||||
}
|
||||
}
|
||||
|
||||
/* overall ompi progress */
|
||||
opal_progress();
|
||||
}
|
||||
|
||||
/* initialize working buffer for this stripe */
|
||||
working_buffers[buffer_index].status=FANIN;
|
||||
working_buffers[buffer_index].n_child_loops_completed=0;
|
||||
count_processed=stripe_number*n_dts_per_buffer;
|
||||
count_this_stripe=n_dts_per_buffer;
|
||||
if( count_processed + count_this_stripe > count )
|
||||
count_this_stripe=count-count_processed;
|
||||
working_buffers[buffer_index].count_this_stripe=count_this_stripe;
|
||||
working_buffers[buffer_index].count_processed=count_processed;
|
||||
tag=sm_module->collective_tag;
|
||||
sm_module->collective_tag++;
|
||||
working_buffers[buffer_index].tag=tag;
|
||||
|
||||
/* progress this stripe */
|
||||
rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
|
||||
&(working_buffers[buffer_index]),
|
||||
sm_module->n_poll_loops, &completed);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
/* update the number of completed segments */
|
||||
n_completed+=completed;
|
||||
|
||||
}
|
||||
|
||||
/* progress remaining data stripes */
|
||||
while( n_completed < n_data_segments ) {
|
||||
for( i=0 ; i < depth_pipeline ; i++ ) {
|
||||
if( working_buffers[i].status != BUFFER_AVAILABLE ){
|
||||
rc=progress_fanin_fanout( sbuf, rbuf, dtype, op,
|
||||
&(working_buffers[buffer_index]),
|
||||
sm_module->n_poll_loops, &completed);
|
||||
if( OMPI_SUCCESS != rc ) {
|
||||
goto Error;
|
||||
}
|
||||
/* update the number of completed segments */
|
||||
n_completed+=completed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* free work buffers */
|
||||
for(i=0 ; i < depth_pipeline ; i++ ) {
|
||||
rc=free_sm2_shared_buffer(sm_module);
|
||||
}
|
||||
|
||||
/* return */
|
||||
return rc;
|
||||
|
||||
Error:
|
||||
/* free work buffers */
|
||||
for(i=0 ; i < depth_pipeline ; i++ ) {
|
||||
rc=free_sm2_shared_buffer(sm_module);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
#undef depth_pipeline
|
||||
|
||||
|
||||
/**
|
||||
* Shared memory blocking allreduce.
|
||||
|
@ -170,6 +170,12 @@ static int sm2_open(void)
|
||||
cs->order_fanout_read_tree=
|
||||
mca_coll_sm2_param_register_int("order_fanout_read_tree",4);
|
||||
|
||||
/* number of polling loops to allow pending resources to
|
||||
* complete their work
|
||||
*/
|
||||
cs->n_poll_loops=
|
||||
mca_coll_sm2_param_register_int("n_poll_loops",4);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -566,6 +566,7 @@ mca_coll_sm2_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
|
||||
sm_module->module_comm=comm;
|
||||
sm_module->comm_size=group_size;
|
||||
sm_module->n_poll_loops=mca_coll_sm2_component.n_poll_loops;
|
||||
|
||||
/*
|
||||
* set memory region parameters
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user