1
1

Cleanup of generic reduce function and possible (low probability) bug fix.

- fixing line lengths and some of the comments
- possible bug fix (but I do not think we exposed it in any tests so far)
  temporary buffers were allocated as multiples of extent instead of 
  true_extent + (count -1) * extent.
Everything is still passing Intel tests over tcp and btl mx up to 64 nodes.

This commit was SVN r13956.
Этот коммит содержится в:
Jelena Pjesivac-Grbovic 2007-03-08 00:54:52 +00:00
родитель 57cbafafd5
Коммит 9780a000ba

Просмотреть файл

@ -45,9 +45,10 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
int root, ompi_communicator_t* comm,
ompi_coll_tree_t* tree, int count_by_segment )
{
char *inbuf[2] = {(char*)NULL, (char*)NULL};
char *local_op_buffer = NULL, *accumbuf = NULL, *sendtmpbuf = NULL;
ptrdiff_t extent, lower_bound, realsegsize;
char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL};
char *accumbuf = NULL, *accumbuf_free = NULL;
char *local_op_buffer = NULL, *sendtmpbuf = NULL;
ptrdiff_t extent, lower_bound, segment_increment;
size_t typelng;
ompi_request_t* reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
int num_segments, line, ret, segindex, i, rank;
@ -60,7 +61,7 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
ompi_ddt_get_extent( datatype, &lower_bound, &extent );
ompi_ddt_type_size( datatype, &typelng );
num_segments = (original_count + count_by_segment - 1) / count_by_segment;
realsegsize = count_by_segment * extent;
segment_increment = count_by_segment * extent;
sendtmpbuf = (char*) sendbuf;
if( sendbuf == MPI_IN_PLACE ) {
@ -69,19 +70,24 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
rank = ompi_comm_rank(comm);
/* non-leaf nodes - wait for children to send me data & forward up (if needed) */
/* non-leaf nodes - wait for children to send me data & forward up
(if needed) */
if( tree->tree_nextsize > 0 ) {
ptrdiff_t true_lower_bound, true_extent, real_segment_size;
ompi_ddt_get_true_extent( datatype, &true_lower_bound,
&true_extent );
/* handle non existant recv buffer (i.e. its NULL) and
protect the recv buffer on non-root nodes */
accumbuf = (char*)recvbuf;
if( (NULL == accumbuf) || (root != rank) ) {
ptrdiff_t true_lower_bound, true_extent;
ompi_ddt_get_true_extent( datatype, &true_lower_bound,
&true_extent );
/* Allocate temporary accumulator buffer.
TODO: The size of the buffer can be optimized to be segment size */
accumbuf = (char*)malloc(true_extent + (original_count - 1) * extent);
if (accumbuf == NULL) { line = __LINE__; ret = -1; goto error_hndl; }
/* Allocate temporary accumulator buffer. */
accumbuf_free = (char*)malloc(true_extent +
(original_count - 1) * extent);
if (accumbuf_free == NULL) {
line = __LINE__; ret = -1; goto error_hndl;
}
accumbuf = accumbuf_free - lower_bound;
}
/* If this is a non-commutative operation we must copy
@ -92,16 +98,22 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
(char*)sendtmpbuf);
}
/* Allocate two buffers for incoming segments */
inbuf[0] = (char*) malloc(realsegsize);
if( inbuf[0] == NULL ) { line = __LINE__; ret = -1; goto error_hndl; }
real_segment_size = true_extent + (count_by_segment - 1) * extent;
inbuf_free[0] = (char*) malloc(real_segment_size);
if( inbuf_free[0] == NULL ) {
line = __LINE__; ret = -1; goto error_hndl;
}
inbuf[0] = inbuf_free[0] - lower_bound;
/* if there is chance to overlap communication -
allocate second buffer */
if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
inbuf[1] = (char*) malloc(realsegsize);
if( inbuf[1] == NULL ) { line = __LINE__; ret = -1; goto error_hndl;}
} else {
inbuf[1] = NULL;
inbuf_free[1] = (char*) malloc(real_segment_size);
if( inbuf_free[1] == NULL ) {
line = __LINE__; ret = -1; goto error_hndl;
}
inbuf[1] = inbuf_free[1] - lower_bound;
}
/* reset input buffer index and receive count */
inbi = 0;
recvcount = 0;
@ -135,7 +147,7 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
*/
if( (ompi_op_is_commute(op)) &&
!((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_recvbuf = accumbuf + segindex * realsegsize;
local_recvbuf = accumbuf + segindex * segment_increment;
}
}
@ -143,12 +155,14 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
tree->tree_next[i],
MCA_COLL_BASE_TAG_REDUCE, comm,
&reqs[inbi]));
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl;}
}
/* wait for previous req to complete, if any.
if there are no requests reqs[inbi ^1] will be MPI_REQUEST_NULL. */
if there are no requests reqs[inbi ^1] will be
MPI_REQUEST_NULL. */
/* wait on data from last child for previous segment */
ret = ompi_request_wait_all( 1, &reqs[inbi ^ 1], MPI_STATUSES_IGNORE );
ret = ompi_request_wait_all( 1, &reqs[inbi ^ 1],
MPI_STATUSES_IGNORE );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
local_op_buffer = inbuf[inbi ^ 1];
if( i > 0 ) {
@ -160,38 +174,41 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
if( 1 == i ) {
if( (ompi_op_is_commute(op)) &&
!((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_op_buffer = sendtmpbuf + segindex * realsegsize;
local_op_buffer = sendtmpbuf + segindex * segment_increment;
}
}
/* apply operation */
ompi_op_reduce(op, local_op_buffer,
accumbuf + segindex * realsegsize, recvcount,
datatype );
accumbuf + segindex * segment_increment,
recvcount, datatype );
} else if ( segindex > 0 ) {
void* accumulator = accumbuf + (segindex-1) * realsegsize;
void* accumulator = accumbuf + (segindex-1) * segment_increment;
if( tree->tree_nextsize <= 1 ) {
if( (ompi_op_is_commute(op)) &&
!((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_op_buffer = sendtmpbuf + (segindex-1) * realsegsize;
local_op_buffer = sendtmpbuf + (segindex-1) * segment_increment;
}
}
ompi_op_reduce(op, local_op_buffer, accumulator, prevcount,
datatype );
/* all reduced on available data this step (i) complete, pass to
* the next process unless your the root
/* all reduced on available data this step (i) complete,
* pass to the next process unless you are the root.
*/
if (rank != tree->tree_root) {
/* send combined/accumulated data to parent */
ret = MCA_PML_CALL( send( accumulator, prevcount, datatype,
tree->tree_prev,
ret = MCA_PML_CALL( send( accumulator, prevcount,
datatype, tree->tree_prev,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm) );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
MCA_PML_BASE_SEND_STANDARD,
comm) );
if (ret != MPI_SUCCESS) {
line = __LINE__; goto error_hndl;
}
}
/* we stop when segindex = number of segments
(i.e. we do num_segment+1 steps to allow for pipelining */
(i.e. we do num_segment+1 steps for pipelining */
if (segindex == num_segments) break;
}
@ -201,9 +218,9 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
} /* end of for each segment */
/* clean up */
if( inbuf[0] != NULL) free(inbuf[0]);
if( inbuf[1] != NULL) free(inbuf[1]);
if( (NULL == recvbuf) || (root != rank) ) free(accumbuf);
if( inbuf_free[0] != NULL) free(inbuf_free[0]);
if( inbuf_free[1] != NULL) free(inbuf_free[1]);
if( accumbuf_free != NULL ) free(accumbuf_free);
}
/* leaf nodes */
@ -211,10 +228,14 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
/* Send segmented data to parents */
segindex = 0;
while( original_count > 0 ) {
if( original_count < count_by_segment ) count_by_segment = original_count;
ret = MCA_PML_CALL( send((char*)sendbuf + segindex * realsegsize, count_by_segment,
datatype, tree->tree_prev,
MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD, comm) );
if( original_count < count_by_segment ) {
count_by_segment = original_count;
}
ret = MCA_PML_CALL( send((char*)sendbuf +
segindex * segment_increment,
count_by_segment, datatype,
tree->tree_prev, MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm) );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
segindex++;
original_count -= count_by_segment;
@ -223,11 +244,12 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
return OMPI_SUCCESS;
error_hndl: /* error handler */
OPAL_OUTPUT (( ompi_coll_tuned_stream, "ERROR_HNDL: node %d file %s line %d error %d\n",
OPAL_OUTPUT (( ompi_coll_tuned_stream,
"ERROR_HNDL: node %d file %s line %d error %d\n",
rank, __FILE__, line, ret ));
if( inbuf[0] != NULL ) free(inbuf[0]);
if( inbuf[1] != NULL ) free(inbuf[1]);
if( (NULL == recvbuf) || ((root != rank) && (NULL != accumbuf)) ) free(accumbuf);
if( inbuf_free[0] != NULL ) free(inbuf_free[0]);
if( inbuf_free[1] != NULL ) free(inbuf_free[1]);
if( accumbuf_free != NULL ) free(accumbuf);
return ret;
}
@ -239,9 +261,10 @@ int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_c
*/
int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
ompi_datatype_t* datatype, ompi_op_t* op,
int root, ompi_communicator_t* comm, uint32_t segsize,
int fanout)
ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm,
uint32_t segsize, int fanout)
{
int segcount = count;
size_t typelng;
@ -256,15 +279,18 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
ompi_ddt_type_size( datatype, &typelng );
COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_chain, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype,
op, root, comm,
comm->c_coll_selected_data->cached_chain,
segcount );
}
int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
ompi_communicator_t* comm,
uint32_t segsize )
{
int segcount = count;
size_t typelng;
@ -281,14 +307,17 @@ int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf,
ompi_ddt_type_size( datatype, &typelng );
COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_pipeline, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype,
op, root, comm,
comm->c_coll_selected_data->cached_pipeline,
segcount );
}
int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
ompi_communicator_t* comm,
uint32_t segsize )
{
int segcount = count;
size_t typelng;
@ -305,14 +334,17 @@ int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf,
ompi_ddt_type_size( datatype, &typelng );
COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_bintree, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype,
op, root, comm,
comm->c_coll_selected_data->cached_bintree,
segcount );
}
int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
ompi_communicator_t* comm,
uint32_t segsize )
{
int segcount = count;
size_t typelng;
@ -329,8 +361,10 @@ int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf,
ompi_ddt_type_size( datatype, &typelng );
COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_bmtree, segcount );
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype,
op, root, comm,
comm->c_coll_selected_data->cached_bmtree,
segcount );
}
/*
@ -484,7 +518,8 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
return err;
}
/* see discussion in ompi_coll_basic_reduce_lin_intra about extent and true extend */
/* see discussion in ompi_coll_basic_reduce_lin_intra about
extent and true extent */
/* for reducing buffer allocation lengths.... */
ompi_ddt_get_extent(dtype, &lb, &extent);
@ -510,7 +545,8 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
/* Initialize the receive buffer. */
if (rank == (size - 1)) {
err = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
err = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)rbuf,
(char*)sbuf);
} else {
err = MCA_PML_CALL(recv(rbuf, count, dtype, size - 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
@ -547,7 +583,8 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
}
if (NULL != inplace_temp) {
err = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)sbuf, inplace_temp);
err = ompi_ddt_copy_content_same_ddt(dtype, count, (char*)sbuf,
inplace_temp);
free(inplace_temp);
}
if (NULL != free_buffer) {
@ -568,8 +605,9 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
* as you add methods/algorithms you must update this and the query/map routines
*
* this routine is called by the component only
* this makes sure that the mca parameters are set to their initial values and perms
* module does not call this they call the forced_getvalues routine instead
* this makes sure that the mca parameters are set to their initial values and
* perms module does not call this they call the forced_getvalues routine
* instead.
*/
int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
@ -630,18 +668,34 @@ int ompi_coll_tuned_reduce_intra_do_forced(void *sbuf, void* rbuf, int count,
comm->c_coll_selected_data->user_forced[REDUCE].algorithm));
switch (comm->c_coll_selected_data->user_forced[REDUCE].algorithm) {
case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, root, comm);
case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, root, comm);
case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, dtype, op, root, comm,
case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf,
count, dtype,
op, root, comm);
case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf,
count, dtype,
op, root,
comm);
case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count,
dtype, op, root,
comm,
comm->c_coll_selected_data->user_forced[REDUCE].segsize,
comm->c_coll_selected_data->user_forced[REDUCE].chain_fanout);
case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, dtype, op, root, comm,
case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count,
dtype, op, root,
comm,
comm->c_coll_selected_data->user_forced[REDUCE].segsize);
case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, dtype, op, root, comm,
case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count,
dtype, op, root,
comm,
comm->c_coll_selected_data->user_forced[REDUCE].segsize);
case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, dtype, op, root, comm,
case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count,
dtype, op, root,
comm,
comm->c_coll_selected_data->user_forced[REDUCE].segsize);
case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, count, dtype, op, root, comm,
case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf,
count,
dtype, op,
root, comm,
comm->c_coll_selected_data->user_forced[REDUCE].segsize);
default:
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_forced attempt to select algorithm %d when only 0-%d is valid?",
@ -661,17 +715,30 @@ int ompi_coll_tuned_reduce_intra_do_this(void *sbuf, void* rbuf, int count,
algorithm, faninout, segsize));
switch (algorithm) {
case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf, count, dtype, op, root, comm);
case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf, count, dtype, op, root, comm);
case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count, dtype, op, root, comm,
case (0): return ompi_coll_tuned_reduce_intra_dec_fixed (sbuf, rbuf,
count, dtype, op,
root, comm);
case (1): return ompi_coll_tuned_reduce_intra_basic_linear (sbuf, rbuf,
count, dtype,
op, root,
comm);
case (2): return ompi_coll_tuned_reduce_intra_chain (sbuf, rbuf, count,
dtype, op, root,
comm,
segsize, faninout);
case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count, dtype, op, root, comm,
segsize);
case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count, dtype, op, root, comm,
segsize);
case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count, dtype, op, root, comm,
segsize);
case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf, count, dtype, op, root, comm,
case (3): return ompi_coll_tuned_reduce_intra_pipeline (sbuf, rbuf, count,
dtype, op, root,
comm, segsize);
case (4): return ompi_coll_tuned_reduce_intra_binary (sbuf, rbuf, count,
dtype, op, root,
comm, segsize);
case (5): return ompi_coll_tuned_reduce_intra_binomial (sbuf, rbuf, count,
dtype, op, root,
comm, segsize);
case (6): return ompi_coll_tuned_reduce_intra_in_order_binary(sbuf, rbuf,
count,
dtype, op,
root, comm,
segsize);
default: