
Big collective commit. It is only lightly tested, but it should be quite stable. Anyway,

the default decision functions (for broadcast, reduce and barrier) are tuned for a
high performance network (not TCP). They should give really good performance on any
network with the following characteristics: low latency (around 5 microseconds) and good
bandwidth (more than 1 Gb/s).
+ Cleanup of the reduce algorithms, plus 2 new algorithms (binary and binomial). Most of
  the reduce algorithms now use a generic tree-based function for completing the reduce
  (a short sketch of the segmentation it relies on follows this list).
+ Added macros for computing the trees (they are used for bcast and reduce right now).
+ Allow the usage of all 5 topologies.
+ Jelena's implementation of a binary tree that can be used for non-commutative operations.
  Right now only the tree-building function is there; it will get activated soon.
+ Some other minor cleanups.
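
For context, here is a minimal sketch of the segmentation arithmetic the new generic
reduce path relies on (plain C; the helper names and standalone form are illustrative
only, but the expressions mirror what the diff below does with ompi_ddt_type_size()
and count_by_segment):

#include <stddef.h>

/* Illustrative only: turn a per-segment byte budget (segsize) into an element
 * count, the way the new reduce entry points do before calling
 * ompi_coll_tuned_reduce_generic(). */
static int elements_per_segment(size_t segsize, size_t type_size, int count)
{
    if (segsize > type_size) {
        return (int)(segsize / type_size);  /* whole elements per segment */
    }
    return count;                           /* no segmentation: one segment */
}

/* The generic function then ceil-divides the total count into segments, e.g.
 * original_count = 1000 with count_by_segment = 256 gives 4 segments, the
 * last one carrying the remaining 232 elements. */
static int number_of_segments(int original_count, int count_by_segment)
{
    return (original_count + count_by_segment - 1) / count_by_segment;
}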

This commit was SVN r12326.
This commit is contained in:
George Bosilca 2006-10-26 22:53:05 +00:00
parent 894b220fbb
commit ba3c247f2a
6 changed files with 466 additions and 171 deletions

View file

@ -201,6 +201,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_gatherv_inter_dec_dynamic(GATHER_ARGS);
/* Reduce */
int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment );
int ompi_coll_tuned_reduce_intra_dec_fixed(REDUCE_ARGS);
int ompi_coll_tuned_reduce_intra_dec_dynamic(REDUCE_ARGS);
int ompi_coll_tuned_reduce_intra_do_forced(REDUCE_ARGS);
@ -209,6 +210,8 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT];
int ompi_coll_tuned_reduce_intra_basic_linear(REDUCE_ARGS);
int ompi_coll_tuned_reduce_intra_chain(REDUCE_ARGS, uint32_t segsize, int fanout);
int ompi_coll_tuned_reduce_intra_pipeline(REDUCE_ARGS, uint32_t segsize);
int ompi_coll_tuned_reduce_intra_binary(REDUCE_ARGS, uint32_t segsize);
int ompi_coll_tuned_reduce_intra_binomial(REDUCE_ARGS, uint32_t segsize);
int ompi_coll_tuned_reduce_inter_dec_fixed(REDUCE_ARGS);
int ompi_coll_tuned_reduce_inter_dec_dynamic(REDUCE_ARGS);
@ -338,4 +341,59 @@ struct mca_coll_base_comm_t {
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#define COLL_TUNED_UPDATE_BINTREE( OMPI_COMM, ROOT ) \
do { \
mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \
if( !( (coll_comm->cached_bintree) \
&& (coll_comm->cached_bintree_root == (ROOT)) ) ) { \
if( coll_comm->cached_bintree ) { /* destroy previous binary tree if defined */ \
ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_bintree) ); \
} \
coll_comm->cached_bintree = ompi_coll_tuned_topo_build_tree(2,(OMPI_COMM),(ROOT)); \
coll_comm->cached_bintree_root = (ROOT); \
} \
} while (0)
#define COLL_TUNED_UPDATE_BMTREE( OMPI_COMM, ROOT ) \
do { \
mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \
if( !( (coll_comm->cached_bmtree) \
&& (coll_comm->cached_bmtree_root == (ROOT)) ) ) { \
if( coll_comm->cached_bmtree ) { /* destroy previous binomial if defined */ \
ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_bmtree) ); \
} \
coll_comm->cached_bmtree = ompi_coll_tuned_topo_build_bmtree( (OMPI_COMM), (ROOT) ); \
coll_comm->cached_bmtree_root = (ROOT); \
} \
} while (0)
#define COLL_TUNED_UPDATE_PIPELINE( OMPI_COMM, ROOT ) \
do { \
mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \
if( !( (coll_comm->cached_pipeline) \
&& (coll_comm->cached_pipeline_root == (ROOT)) ) ) { \
if (coll_comm->cached_pipeline) { /* destroy previous pipeline if defined */ \
ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_pipeline) ); \
} \
coll_comm->cached_pipeline = ompi_coll_tuned_topo_build_chain( 1, (OMPI_COMM), (ROOT) ); \
coll_comm->cached_pipeline_root = (ROOT); \
} \
} while (0)
#define COLL_TUNED_UPDATE_CHAIN( OMPI_COMM, ROOT, FANOUT ) \
do { \
mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \
if( !( (coll_comm->cached_chain) \
&& (coll_comm->cached_chain_root == (ROOT)) \
&& (coll_comm->cached_chain_fanout == (FANOUT)) ) ) { \
if( coll_comm->cached_chain) { /* destroy previous chain if defined */ \
ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_chain) ); \
} \
coll_comm->cached_chain = ompi_coll_tuned_topo_build_chain((FANOUT),(OMPI_COMM),(ROOT)); \
coll_comm->cached_chain_root = (ROOT); \
coll_comm->cached_chain_fanout = (FANOUT); \
} \
} while (0)
#endif /* MCA_COLL_TUNED_EXPORT_H */
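
Usage note: the intent (applied to bcast and reduce later in this commit) is that a
collective calls the matching update macro once per invocation and then reads the
cached topology back from the selected-data structure. The snippet below is condensed
from the new reduce_intra_chain path in this same commit and assumes the usual
REDUCE_ARGS variables are in scope:

    /* Build or reuse the cached chain for this (comm, root, fanout), then hand
     * the cached tree to the generic reduce implementation. */
    COLL_TUNED_UPDATE_CHAIN( comm, root, fanout );
    return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op,
                                           root, comm,
                                           comm->c_coll_selected_data->cached_chain,
                                           segcount );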

View file

@ -57,24 +57,8 @@ ompi_coll_tuned_bcast_intra_chain ( void *buff, int count,
return MPI_SUCCESS;
}
/*
* setup the chain topology.
* if the previous chain topology is the same, then use this cached copy
* other wise recreate it.
*/
if ((comm->c_coll_selected_data->cached_chain) && (comm->c_coll_selected_data->cached_chain_root == root)
&& (comm->c_coll_selected_data->cached_chain_fanout == chains)) {
chain = comm->c_coll_selected_data->cached_chain;
}
else {
if (comm->c_coll_selected_data->cached_chain) { /* destroy previous chain if defined */
ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_chain);
}
comm->c_coll_selected_data->cached_chain = chain = ompi_coll_tuned_topo_build_chain( chains, comm, root );
comm->c_coll_selected_data->cached_chain_root = root;
comm->c_coll_selected_data->cached_chain_fanout = chains;
}
/* setup the chain topology. */
COLL_TUNED_UPDATE_CHAIN( comm, root, chains );
ompi_ddt_type_size( datatype, &typelng );
@ -257,23 +241,8 @@ ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer,
return MPI_SUCCESS;
}
/*
* setup the tree topology.
* if the previous tree topology is the same, then use this cached copy
* other wise recreate it.
*/
if ((comm->c_coll_selected_data->cached_bintree) && (comm->c_coll_selected_data->cached_bintree_root == root)) {
tree = comm->c_coll_selected_data->cached_bintree;
}
else {
if (comm->c_coll_selected_data->cached_bintree) { /* destroy previous tree if defined */
ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_bintree);
}
comm->c_coll_selected_data->cached_bintree = tree = ompi_coll_tuned_topo_build_tree( 2, comm, root );
comm->c_coll_selected_data->cached_bintree_root = root;
}
/* setup the binary tree topology. */
COLL_TUNED_UPDATE_BINTREE( comm, root );
err = ompi_ddt_type_size( datatype, &type_size );
@ -517,22 +486,8 @@ ompi_coll_tuned_bcast_intra_bintree ( void* buffer,
return MPI_SUCCESS;
}
/*
* setup the tree topology.
* if the previous tree topology is the same, then use this cached copy
* other wise recreate it.
*/
if ((comm->c_coll_selected_data->cached_bintree) && (comm->c_coll_selected_data->cached_bintree_root == root)) {
tree = comm->c_coll_selected_data->cached_bintree;
}
else {
if (comm->c_coll_selected_data->cached_bintree) { /* destroy previous bintree if defined */
ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_bintree);
}
comm->c_coll_selected_data->cached_bintree = tree = ompi_coll_tuned_topo_build_tree( 2, comm, root );
comm->c_coll_selected_data->cached_bintree_root = root;
}
/* setup the tree topology. */
COLL_TUNED_UPDATE_BINTREE( comm, root );
err = ompi_ddt_type_size( datatype, &type_size );

View file

@ -61,16 +61,16 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm)
{
int comsize, rank, err;
int communicator_size, rank, err;
size_t dsize, total_dsize;
OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_alltoall_intra_dec_fixed"));
comsize = ompi_comm_size(comm);
communicator_size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
/* special case */
if (comsize==2) {
if (communicator_size==2) {
return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
}
@ -81,9 +81,9 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount,
return (err);
}
total_dsize = dsize * scount * comsize; /* needed for decision */
total_dsize = dsize * scount * communicator_size; /* needed for decision */
if (comsize >= 12 && total_dsize <= 768) {
if (communicator_size >= 12 && total_dsize <= 768) {
return ompi_coll_tuned_alltoall_intra_bruck (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
}
if (total_dsize <= 131072) {
@ -102,13 +102,13 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount,
*/
int ompi_coll_tuned_barrier_intra_dec_fixed(struct ompi_communicator_t *comm)
{
int comsize;
int communicator_size;
OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_barrier_intra_dec_fixed"));
comsize = ompi_comm_size(comm);
communicator_size = ompi_comm_size(comm);
if( 2 == comsize )
if( 2 == communicator_size )
return ompi_coll_tuned_barrier_intra_two_procs(comm);
/**
* Basic optimisation. If we have a power of 2 number of nodes
@ -117,8 +117,8 @@ int ompi_coll_tuned_barrier_intra_dec_fixed(struct ompi_communicator_t *comm)
*/
{
bool has_one = false;
for( ; comsize > 0; comsize >>= 1 ) {
if( comsize & 0x1 ) {
for( ; communicator_size > 0; communicator_size >>= 1 ) {
if( communicator_size & 0x1 ) {
if( has_one )
return ompi_coll_tuned_barrier_intra_bruck(comm);
has_one = true;
@ -142,13 +142,19 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count,
struct ompi_datatype_t *datatype, int root,
struct ompi_communicator_t *comm)
{
int comsize, rank, err;
const double a0 = -7.8710;
const double b0 = 41.1613;
const double a1 = 0.0150;
const double b1 = 11.2445;
const double a2 = 0.0023;
const double b2 = 3.8074;
int communicator_size, rank, err;
int segsize = 0;
size_t msgsize, dsize;
size_t message_size, dsize;
OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_dec_fixed"));
comsize = ompi_comm_size(comm);
communicator_size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
/* else we need data size for decision function */
@ -158,26 +164,64 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count,
return (err);
}
msgsize = dsize * (unsigned long)count; /* needed for decision */
message_size = dsize * (unsigned long)count; /* needed for decision */
if ((message_size <= 1024) && (communicator_size < 12)) {
/* Linear_0K */
return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm);
} else if (message_size < 8192) {
if ((communicator_size < 12) ||
(communicator_size < (a0 * (message_size / 1024.0) + b0))) {
/* Binary_0K */
segsize = 0;
} else {
/* Binary_1K */
segsize = 1024;
}
return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize);
} else if (message_size <= 35000) {
if (communicator_size <= 12) {
/* Binary_8K */
segsize = 1024 << 3;
return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize);
} else {
/* SplittedBinary_1K */
segsize = 1024;
return ompi_coll_tuned_bcast_intra_split_bintree(buff, count, datatype, root, comm, segsize);
}
} else if (communicator_size > (a1 * (message_size / 1024.0) + b1)) {
/* SplittedBinary_8K */
segsize = 1024 << 3;
return ompi_coll_tuned_bcast_intra_split_bintree(buff, count, datatype, root, comm, segsize);
}
if (communicator_size > (a2 * (message_size / 1024.0) + b2)) {
/* Pipeline_8K */
segsize = 1024 << 3;
} else {
/* Pipeline_64K */
segsize = 1024 << 6;
}
return ompi_coll_tuned_bcast_intra_pipeline (buff, count, datatype, root, comm, segsize);
#if 0
/* this is based on gige measurements */
if (comsize < 4) {
if (communicator_size < 4) {
return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm);
}
if (comsize == 4) {
if (msgsize < 524288) segsize = 0;
if (communicator_size == 4) {
if (message_size < 524288) segsize = 0;
else segsize = 16384;
return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize);
}
if (comsize <= 8 && msgsize < 4096) {
if (communicator_size <= 8 && message_size < 4096) {
return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm);
}
if (comsize > 8 && msgsize >= 32768 && msgsize < 524288) {
if (communicator_size > 8 && message_size >= 32768 && message_size < 524288) {
segsize = 16384;
return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize);
}
if (msgsize >= 524288) {
if (message_size >= 524288) {
segsize = 16384;
return ompi_coll_tuned_bcast_intra_pipeline (buff, count, datatype, root, comm, segsize);
}
@ -185,6 +229,7 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count,
/* once tested can swap this back in */
/* return ompi_coll_tuned_bcast_intra_bmtree (buff, count, datatype, root, comm, segsize); */
return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize);
#endif /* 0 */
}
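A quick sanity check of the thresholds above (my own arithmetic, not part of the commit): the a0/b0 cutoff is evaluated in KiB, so for a 4096-byte message (which falls in the message_size < 8192 branch) it works out to -7.8710 * 4 + 41.1613 ≈ 9.68. A 10-process communicator therefore still takes the Binary_0K branch (it already satisfies communicator_size < 12), while a 16-process communicator fails both tests and gets Binary_1K with segsize = 1024.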
/*
@ -200,8 +245,16 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
struct ompi_op_t* op, int root,
struct ompi_communicator_t* comm)
{
int comsize, rank, err, segsize = 0, fanout = 0;
size_t msgsize, dsize;
int communicator_size, rank, err, segsize = 0;
size_t message_size, dsize;
const double a1 = 0.6016 / 1024.0; /* [1/B] */
const double b1 = 1.3496;
const double a2 = 0.0410 / 1024.0; /* [1/B] */
const double b2 = 9.7128;
const double a3 = 0.0422 / 1024.0; /* [1/B] */
const double b3 = 1.1614;
const double a4 = 0.0033 / 1024.0; /* [1/B] */
const double b4 = 1.6761;
OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_reduce_intra_dec_fixed"));
@ -212,7 +265,7 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm);
}
comsize = ompi_comm_size(comm);
communicator_size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
/* need data size for decision function */
@ -222,24 +275,58 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
return (err);
}
msgsize = dsize * count; /* needed for decision */
message_size = dsize * count; /* needed for decision */
/* for small messages use linear algorithm */
if (msgsize <= 4096) {
if (((communicator_size < 20) && (message_size < 512)) ||
((communicator_size < 10) && (message_size <= 1024))){
/* Linear_0K */
return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm);
} else if ((communicator_size < 8) && (message_size < 20480)) {
/* Binomial_0K */
segsize = 0;
fanout = comsize - 1;
return ompi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
} else if (message_size < 2048) {
/* Binary_0K */
segsize = 0;
return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
} else if (communicator_size > (a1 * message_size + b1)) {
/* Binary_1K */
segsize = 1024;
return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
} else if (communicator_size > (a2 * message_size + b2)) {
/* Pipeline_1K */
segsize = 1024;
return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
} else if (communicator_size > (a3 * message_size + b3)) {
/* Binary_32K */
segsize = 32*1024;
return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
}
if (communicator_size > (a4 * message_size + b4)) {
/* Pipeline_32K */
segsize = 32*1024;
} else {
/* Pipeline_64K */
segsize = 64*1024;
}
return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
#if 0
/* for small messages use linear algorithm */
if (message_size <= 4096) {
segsize = 0;
fanout = communicator_size - 1;
/* when linear implemented or taken from basic put here, right now using chain as a linear system */
/* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! */
return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm);
/* return ompi_coll_tuned_reduce_intra_chain (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */
}
if (msgsize < 524288) {
if (msgsize <= 65536 ) {
if (message_size < 524288) {
if (message_size <= 65536 ) {
segsize = 32768;
fanout = 8;
} else {
segsize = 1024;
fanout = comsize/2;
fanout = communicator_size/2;
}
/* later swap this for a binary tree */
/* fanout = 2; */
@ -247,4 +334,5 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf,
}
segsize = 1024;
return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize);
#endif /* 0 */
}
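A similar spot check for the reduce thresholds (again my own arithmetic): here each aN is pre-divided by 1024, so the comparisons take message_size in bytes directly. For message_size = 16384 the a1/b1 cutoff is (0.6016 / 1024) * 16384 + 1.3496 = 9.6256 + 1.3496 ≈ 10.98, so a 16-process communicator selects Binary_1K (segsize = 1024), while an 8-process communicator fails that test and continues down to the a2 and a3 comparisons.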

View file

@ -29,73 +29,44 @@
#include "coll_tuned.h"
#include "coll_tuned_topo.h"
/* Attention: this version of the reduce operations does not
work for:
- non-commutative operations
- segment sizes which are not multiples of the extent of the datatype,
meaning that at least one datatype must fit in the segment!
*/
int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
ompi_datatype_t* datatype, ompi_op_t* op,
int root, ompi_communicator_t* comm, uint32_t segsize,
int fanout)
/**
* This is a generic implementation of the reduce protocol. It uses the tree
* provided as an argument and executes all operations on segments of
* count_by_segment datatype elements.
* For the last communication it will update the count in order to limit
* the number of datatype elements to the original count (original_count).
*/
int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_count,
ompi_datatype_t* datatype, ompi_op_t* op,
int root, ompi_communicator_t* comm,
ompi_coll_tree_t* tree, int count_by_segment )
{
int ret, line, rank, size, i = 0;
int recvcount, sendcount, prevcount, inbi, previnbi;
int segcount, segindex, num_segments;
char *inbuf[2] = {(char*)NULL, (char*)NULL};
char *accumbuf = (char*)NULL;
char *sendtmpbuf = (char*)NULL;
ptrdiff_t ext, lb;
char *local_op_buffer, *accumbuf, *sendtmpbuf;
ptrdiff_t extent, lower_bound;
size_t typelng, realsegsize;
ompi_request_t* reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
ompi_coll_tree_t* chain;
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", rank, fanout, segsize));
/* setup the chain topology.
* if the previous chain topology is the same, then use this cached copy
* other wise recreate it.
*/
if ((comm->c_coll_selected_data->cached_chain) && (comm->c_coll_selected_data->cached_chain_root == root)
&& (comm->c_coll_selected_data->cached_chain_fanout == fanout)) {
chain = comm->c_coll_selected_data->cached_chain;
} else {
if (comm->c_coll_selected_data->cached_chain) { /* destroy previous chain if defined */
ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_chain);
}
comm->c_coll_selected_data->cached_chain = chain = ompi_coll_tuned_topo_build_chain(fanout,comm,root);
comm->c_coll_selected_data->cached_chain_root = root;
comm->c_coll_selected_data->cached_chain_fanout = fanout;
}
int num_segments, line, ret, segindex, i, rank;
int recvcount, prevcount, inbi, previnbi;
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_get_extent( datatype, &lb, &ext );
ompi_ddt_get_extent( datatype, &lower_bound, &extent );
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
num_segments = count/segcount;
if( (count % segcount) != 0 ) num_segments++;
} else {
segcount = count;
num_segments = 1;
}
realsegsize = segcount * ext;
num_segments = (original_count + count_by_segment - 1) / count_by_segment;
realsegsize = count_by_segment * extent;
sendtmpbuf = (char*) sendbuf;
if( sendbuf == MPI_IN_PLACE ) {
sendtmpbuf = (char *)recvbuf;
}
rank = ompi_comm_rank(comm);
/* non-leaf nodes - wait for children to send me data & forward up (if needed) */
if( chain->tree_nextsize > 0 ) {
if( tree->tree_nextsize > 0 ) {
/* handle a non-existent recv buffer (i.e. it is NULL, like basic allreduce uses!) */
accumbuf = (char*)recvbuf;
if( NULL == accumbuf ) {
@ -108,7 +79,7 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
if( inbuf[0] == NULL ) { line = __LINE__; ret = -1; goto error_hndl; }
/* if there is chance to overlap communication -
allocate second buffer */
if( (num_segments > 1) || (chain->tree_nextsize > 1) ) {
if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
inbuf[1] = (char*) malloc(realsegsize);
if( inbuf[1] == NULL ) { line = __LINE__; ret = -1; goto error_hndl;}
} else {
@ -122,12 +93,12 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
for( segindex = 0; segindex <= num_segments; segindex++ ) {
prevcount = recvcount;
/* recvcount - number of elements in current segment */
recvcount = segcount;
recvcount = count_by_segment;
if( segindex == (num_segments-1) )
recvcount = count - segcount*segindex;
recvcount = original_count - count_by_segment * segindex;
/* for each child */
for( i = 0; i < chain->tree_nextsize; i++ ) {
for( i = 0; i < tree->tree_nextsize; i++ ) {
/**
* We try to overlap communication:
* either with next segment or with the next child
@ -144,11 +115,11 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
* BUT if we are root and are USING MPI_IN_PLACE this is wrong ek!
* check for root might not be needed as it should be checked higher up
*/
if( !((MPI_IN_PLACE==sendbuf) && (rank==root)) ) {
if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_recvbuf = accumbuf + segindex * realsegsize;
}
}
ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount,datatype, chain->tree_next[i],
ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount,datatype, tree->tree_next[i],
MCA_COLL_BASE_TAG_REDUCE, comm, &reqs[inbi]));
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
@ -157,25 +128,24 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
/* wait on data from last child for previous segment */
ret = ompi_request_wait_all( 1, &reqs[previnbi], MPI_STATUSES_IGNORE );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
local_op_buffer = inbuf[previnbi];
if( i > 0 ) {
/* our first operation is to combine our own [sendbuf] data with the data
* we received from downstream (but only if we are not root and not using
* MPI_IN_PLACE)
*/
void* local_op_buffer = inbuf[previnbi];
if( 1 == i ) {
if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) {
if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_op_buffer = sendtmpbuf + segindex * realsegsize;
}
}
/* apply operation */
ompi_op_reduce(op, local_op_buffer, accumbuf+segindex*realsegsize, recvcount, datatype );
} else if ( segindex > 0 ) {
void* local_op_buffer = inbuf[previnbi];
void* accumulator = accumbuf + (segindex-1) * realsegsize;
if( chain->tree_nextsize <= 1 ) {
if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) {
if( tree->tree_nextsize <= 1 ) {
if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
local_op_buffer = sendtmpbuf+(segindex-1)*realsegsize;
}
}
@ -184,10 +154,10 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
/* all reduces on the data available at this step (i) are complete; pass the
* result to the next process unless we are the root
*/
if (rank != root) {
if (rank != tree->tree_root) {
/* send combined/accumulated data to parent */
ret = MCA_PML_CALL( send( accumulator, prevcount, datatype,
chain->tree_prev, MCA_COLL_BASE_TAG_REDUCE,
tree->tree_prev, MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm) );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
}
@ -210,20 +180,20 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
/* leaf nodes */
else {
/* Send segmented data to parents */
for (segindex = 0; segindex < num_segments; segindex++) {
if (segindex < num_segments-1) sendcount = segcount;
else sendcount = count - segindex*segcount;
ret = MCA_PML_CALL( send((char*)sendbuf+segindex*realsegsize, sendcount,
datatype, chain->tree_prev,
segindex = 0;
while( original_count > 0 ) {
if( original_count < count_by_segment ) count_by_segment = original_count;
ret = MCA_PML_CALL( send((char*)sendbuf + segindex * realsegsize, count_by_segment,
datatype, tree->tree_prev,
MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD, comm) );
if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; }
segindex++;
original_count -= count_by_segment;
}
}
return OMPI_SUCCESS;
return MPI_SUCCESS;
/* error handler */
error_hndl:
error_hndl: /* error handler */
OPAL_OUTPUT (( ompi_coll_tuned_stream, "ERROR_HNDL: node %d file %s line %d error %d\n", rank, __FILE__, line, ret ));
if( inbuf[0] != NULL ) free(inbuf[0]);
if( inbuf[1] != NULL ) free(inbuf[1]);
@ -231,23 +201,123 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
return ret;
}
/* Attention: this version of the reduce operations does not
work for:
- non-commutative operations
- segment sizes which are not multiples of the extent of the datatype,
meaning that at least one datatype must fit in the segment!
*/
int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count,
ompi_datatype_t* datatype, ompi_op_t* op,
int root, ompi_communicator_t* comm, uint32_t segsize,
int fanout)
{
int segcount;
size_t typelng;
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), fanout, segsize));
COLL_TUNED_UPDATE_CHAIN( comm, root, fanout );
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
} else {
segcount = count;
}
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_chain, segcount );
}
int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
{
int rank;
int segcount;
size_t typelng;
rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_pipeline rank %d ss %5d",
ompi_comm_rank(comm), segsize));
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_pipeline rank %d ss %5d", rank, segsize));
COLL_TUNED_UPDATE_PIPELINE( comm, root );
return ompi_coll_tuned_reduce_intra_chain( sendbuf,recvbuf, count,
datatype, op, root, comm,
segsize, 1 );
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
} else {
segcount = count;
}
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_pipeline, segcount );
}
int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
{
int segcount;
size_t typelng;
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_binary rank %d ss %5d",
ompi_comm_rank(comm), segsize));
COLL_TUNED_UPDATE_BINTREE( comm, root );
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
} else {
segcount = count;
}
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_bintree, segcount );
}
int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf,
int count, ompi_datatype_t* datatype,
ompi_op_t* op, int root,
ompi_communicator_t* comm, uint32_t segsize )
{
int segcount;
size_t typelng;
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_binomial rank %d ss %5d",
ompi_comm_rank(comm), segsize));
COLL_TUNED_UPDATE_BMTREE( comm, root );
/**
* Determine number of segments and number of elements
* sent per operation
*/
ompi_ddt_type_size( datatype, &typelng );
if( segsize > typelng ) {
segcount = (int)(segsize / typelng);
} else {
segcount = count;
}
return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm,
comm->c_coll_selected_data->cached_bmtree, segcount );
}
/*
* Linear functions are copied from the BASIC coll module
@ -358,7 +428,6 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
}
/* Perform the reduction */
ompi_op_reduce(op, inbuf, rbuf, count, dtype);
}
@ -371,22 +440,22 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count,
}
/* All done */
return MPI_SUCCESS;
}
/* copied function (with appropriate renaming) ends here */
/* The following are used by dynamic and forced rules */
/* publish details of each algorithm and if its forced/fixed/locked in */
/* as you add methods/algorithms you must update this and the query/map routines */
/* this routine is called by the component only */
/* this makes sure that the mca parameters are set to their initial values and perms */
/* module does not call this they call the forced_getvalues routine instead */
/**
* The following are used by dynamic and forced rules
*
* publish details of each algorithm and whether it is forced/fixed/locked in
* as you add methods/algorithms you must update this and the query/map routines
*
* this routine is called by the component only
* it makes sure that the mca parameters are set to their initial values and perms
* the module does not call this; it calls the forced_getvalues routine instead
*/
int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
int rc;

View file

@ -169,6 +169,128 @@ ompi_coll_tuned_topo_build_tree( int fanout,
return tree;
}
/*
* Constructs in-order binary tree which can be used for non-commutative reduce
* operations.
* Root of this tree is always rank (size-1) and fanout is 2.
* Here are some of the examples of this tree:
* size == 2     size == 4                  size == 9
*    1              3                          8
*   /             /   \                      /   \
*  0             2     1                    7     3
*                     /                    / \   / \
*                    0                    6   5 2   1
*                                            /     /
*                                           4     0
*/
ompi_coll_tree_t*
ompi_coll_tuned_topo_build_in_order_bintree( struct ompi_communicator_t* comm )
{
int rank, size;
int myrank, rightsize, delta;
int parent, lchild, rchild;
ompi_coll_tree_t* tree;
/*
* Get size and rank of the process in this communicator
*/
size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);
tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
if (!tree) {
OPAL_OUTPUT((ompi_coll_tuned_stream,
"coll:tuned:topo_build_tree PANIC::out of memory"));
return NULL;
}
tree->tree_root = MPI_UNDEFINED;
tree->tree_nextsize = MPI_UNDEFINED;
/*
* Initialize tree
*/
tree->tree_fanout = 2;
tree->tree_bmtree = 0;
tree->tree_root = size - 1;
tree->tree_prev = -1;
tree->tree_nextsize = 0;
tree->tree_next[0] = -1;
tree->tree_next[1] = -1;
OPAL_OUTPUT((ompi_coll_tuned_stream,
"coll:tuned:topo_build_in_order_tree Building fo %d rt %d",
tree->tree_fanout, tree->tree_root));
/*
* Build the tree
*/
myrank = rank;
parent = size - 1;
delta = 0;
while ( 1 ) {
/* Compute the size of the right subtree */
rightsize = size >> 1;
/* Determine the left and right child of this parent */
lchild = -1;
rchild = -1;
if (size - 1 > 0) {
lchild = parent - 1;
if (lchild > 0) {
rchild = rightsize - 1;
}
}
/* The following cases are possible: myrank can be
- a parent,
- belong to the left subtree, or
- belong to the right subtree.
Each of these cases needs to be handled differently.
*/
if (myrank == parent) {
/* I am the parent:
- compute real ranks of my children, and exit the loop. */
if (lchild >= 0) tree->tree_next[0] = lchild + delta;
if (rchild >= 0) tree->tree_next[1] = rchild + delta;
break;
}
if (myrank > rchild) {
/* I belong to the left subtree:
- If I am the left child, compute real rank of my parent
- Iterate down through tree:
compute new size, shift ranks down, and update delta.
*/
if (myrank == lchild) {
tree->tree_prev = parent + delta;
}
size = size - rightsize - 1;
delta = delta + rightsize;
myrank = myrank - rightsize;
parent = size - 1;
} else {
/* I belong to the right subtree:
- If I am the right child, compute real rank of my parent
- Iterate down through tree:
compute new size and parent,
but the delta and rank do not need to change.
*/
if (myrank == rchild) {
tree->tree_prev = parent + delta;
}
size = rightsize;
parent = rchild;
}
}
if (tree->tree_next[0] >= 0) { tree->tree_nextsize = 1; }
if (tree->tree_next[1] >= 0) { tree->tree_nextsize += 1; }
return tree;
}
int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree )
{
ompi_coll_tree_t *ptr;

View file

@ -40,7 +40,8 @@ extern "C" {
ompi_coll_tuned_topo_build_tree( int fanout,
struct ompi_communicator_t* com,
int root );
int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree );
ompi_coll_tree_t*
ompi_coll_tuned_topo_build_in_order_bintree( struct ompi_communicator_t* comm );
ompi_coll_tree_t*
ompi_coll_tuned_topo_build_bmtree( struct ompi_communicator_t* comm,
@ -51,6 +52,8 @@ extern "C" {
struct ompi_communicator_t* com,
int root );
int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree );
/* debugging stuff, will be removed later */
int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank);