diff --git a/ompi/mca/coll/tuned/coll_tuned.h b/ompi/mca/coll/tuned/coll_tuned.h index 363c1dda05..5fd2d35c04 100644 --- a/ompi/mca/coll/tuned/coll_tuned.h +++ b/ompi/mca/coll/tuned/coll_tuned.h @@ -201,6 +201,7 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_gatherv_inter_dec_dynamic(GATHER_ARGS); /* Reduce */ + int ompi_coll_tuned_reduce_generic( REDUCE_ARGS, ompi_coll_tree_t* tree, int count_by_segment ); int ompi_coll_tuned_reduce_intra_dec_fixed(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_dec_dynamic(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_do_forced(REDUCE_ARGS); @@ -209,6 +210,8 @@ extern int ompi_coll_tuned_forced_max_algorithms[COLLCOUNT]; int ompi_coll_tuned_reduce_intra_basic_linear(REDUCE_ARGS); int ompi_coll_tuned_reduce_intra_chain(REDUCE_ARGS, uint32_t segsize, int fanout); int ompi_coll_tuned_reduce_intra_pipeline(REDUCE_ARGS, uint32_t segsize); + int ompi_coll_tuned_reduce_intra_binary(REDUCE_ARGS, uint32_t segsize); + int ompi_coll_tuned_reduce_intra_binomial(REDUCE_ARGS, uint32_t segsize); int ompi_coll_tuned_reduce_inter_dec_fixed(REDUCE_ARGS); int ompi_coll_tuned_reduce_inter_dec_dynamic(REDUCE_ARGS); @@ -338,4 +341,59 @@ struct mca_coll_base_comm_t { #if defined(c_plusplus) || defined(__cplusplus) } #endif + +#define COLL_TUNED_UPDATE_BINTREE( OMPI_COMM, ROOT ) \ +do { \ + mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \ + if( !( (coll_comm->cached_bintree) \ + && (coll_comm->cached_bintree_root == (ROOT)) ) ) { \ + if( coll_comm->cached_bintree ) { /* destroy previous binomial if defined */ \ + ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_bintree) ); \ + } \ + coll_comm->cached_bintree = ompi_coll_tuned_topo_build_tree(2,(OMPI_COMM),(ROOT)); \ + coll_comm->cached_bintree_root = (ROOT); \ + } \ +} while (0) + +#define COLL_TUNED_UPDATE_BMTREE( OMPI_COMM, ROOT ) \ +do { \ + mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \ + if( !( 
(coll_comm->cached_bmtree) \ + && (coll_comm->cached_bmtree_root == (ROOT)) ) ) { \ + if( coll_comm->cached_bmtree ) { /* destroy previous binomial if defined */ \ + ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_bmtree) ); \ + } \ + coll_comm->cached_bmtree = ompi_coll_tuned_topo_build_bmtree( (OMPI_COMM), (ROOT) ); \ + coll_comm->cached_bmtree_root = (ROOT); \ + } \ +} while (0) + +#define COLL_TUNED_UPDATE_PIPELINE( OMPI_COMM, ROOT ) \ +do { \ + mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \ + if( !( (coll_comm->cached_pipeline) \ + && (coll_comm->cached_pipeline_root == (ROOT)) ) ) { \ + if (coll_comm->cached_pipeline) { /* destroy previous pipeline if defined */ \ + ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_pipeline) ); \ + } \ + coll_comm->cached_pipeline = ompi_coll_tuned_topo_build_chain( 1, (OMPI_COMM), (ROOT) ); \ + coll_comm->cached_pipeline_root = (ROOT); \ + } \ +} while (0) + +#define COLL_TUNED_UPDATE_CHAIN( OMPI_COMM, ROOT, FANOUT ) \ +do { \ + mca_coll_base_comm_t* coll_comm = (OMPI_COMM)->c_coll_selected_data; \ + if( !( (coll_comm->cached_chain) \ + && (coll_comm->cached_chain_root == (ROOT)) \ + && (coll_comm->cached_chain_fanout == (FANOUT)) ) ) { \ + if( coll_comm->cached_chain) { /* destroy previous chain if defined */ \ + ompi_coll_tuned_topo_destroy_tree( &(coll_comm->cached_chain) ); \ + } \ + coll_comm->cached_chain = ompi_coll_tuned_topo_build_chain((FANOUT),(OMPI_COMM),(ROOT)); \ + coll_comm->cached_chain_root = (ROOT); \ + coll_comm->cached_chain_fanout = (FANOUT); \ + } \ +} while (0) + #endif /* MCA_COLL_TUNED_EXPORT_H */ diff --git a/ompi/mca/coll/tuned/coll_tuned_bcast.c b/ompi/mca/coll/tuned/coll_tuned_bcast.c index 269acc8923..a392707738 100644 --- a/ompi/mca/coll/tuned/coll_tuned_bcast.c +++ b/ompi/mca/coll/tuned/coll_tuned_bcast.c @@ -57,24 +57,8 @@ ompi_coll_tuned_bcast_intra_chain ( void *buff, int count, return MPI_SUCCESS; } - /* - * setup the chain topology. 
- * if the previous chain topology is the same, then use this cached copy - * other wise recreate it. - */ - - if ((comm->c_coll_selected_data->cached_chain) && (comm->c_coll_selected_data->cached_chain_root == root) - && (comm->c_coll_selected_data->cached_chain_fanout == chains)) { - chain = comm->c_coll_selected_data->cached_chain; - } - else { - if (comm->c_coll_selected_data->cached_chain) { /* destroy previous chain if defined */ - ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_chain); - } - comm->c_coll_selected_data->cached_chain = chain = ompi_coll_tuned_topo_build_chain( chains, comm, root ); - comm->c_coll_selected_data->cached_chain_root = root; - comm->c_coll_selected_data->cached_chain_fanout = chains; - } + /* setup the chain topology. */ + COLL_TUNED_UPDATE_CHAIN( comm, root, chains ); ompi_ddt_type_size( datatype, &typelng ); @@ -257,23 +241,8 @@ ompi_coll_tuned_bcast_intra_split_bintree ( void* buffer, return MPI_SUCCESS; } - /* - * setup the tree topology. - * if the previous tree topology is the same, then use this cached copy - * other wise recreate it. - */ - - if ((comm->c_coll_selected_data->cached_bintree) && (comm->c_coll_selected_data->cached_bintree_root == root)) { - tree = comm->c_coll_selected_data->cached_bintree; - } - else { - if (comm->c_coll_selected_data->cached_bintree) { /* destroy previous tree if defined */ - ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_bintree); - } - comm->c_coll_selected_data->cached_bintree = tree = ompi_coll_tuned_topo_build_tree( 2, comm, root ); - comm->c_coll_selected_data->cached_bintree_root = root; - } - + /* setup the binary tree topology. */ + COLL_TUNED_UPDATE_BINTREE( comm, root ); err = ompi_ddt_type_size( datatype, &type_size ); @@ -517,22 +486,8 @@ ompi_coll_tuned_bcast_intra_bintree ( void* buffer, return MPI_SUCCESS; } - /* - * setup the tree topology. 
- * if the previous tree topology is the same, then use this cached copy - * other wise recreate it. - */ - - if ((comm->c_coll_selected_data->cached_bintree) && (comm->c_coll_selected_data->cached_bintree_root == root)) { - tree = comm->c_coll_selected_data->cached_bintree; - } - else { - if (comm->c_coll_selected_data->cached_bintree) { /* destroy previous bintree if defined */ - ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_bintree); - } - comm->c_coll_selected_data->cached_bintree = tree = ompi_coll_tuned_topo_build_tree( 2, comm, root ); - comm->c_coll_selected_data->cached_bintree_root = root; - } + /* setup the tree topology. */ + COLL_TUNED_UPDATE_BINTREE( comm, root ); err = ompi_ddt_type_size( datatype, &type_size ); diff --git a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c index f08298fd41..be830e91b8 100644 --- a/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c +++ b/ompi/mca/coll/tuned/coll_tuned_decision_fixed.c @@ -61,16 +61,16 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm) { - int comsize, rank, err; + int communicator_size, rank, err; size_t dsize, total_dsize; OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_alltoall_intra_dec_fixed")); - comsize = ompi_comm_size(comm); + communicator_size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); /* special case */ - if (comsize==2) { + if (communicator_size==2) { return ompi_coll_tuned_alltoall_intra_two_procs (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm); } @@ -81,9 +81,9 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount, return (err); } - total_dsize = dsize * scount * comsize; /* needed for decision */ + total_dsize = dsize * scount * communicator_size; /* needed for decision */ - if (comsize >= 12 && total_dsize <= 768) { + if (communicator_size >= 12 && total_dsize <= 768) { return 
ompi_coll_tuned_alltoall_intra_bruck (sbuf, scount, sdtype, rbuf, rcount, rdtype, comm); } if (total_dsize <= 131072) { @@ -102,13 +102,13 @@ int ompi_coll_tuned_alltoall_intra_dec_fixed(void *sbuf, int scount, */ int ompi_coll_tuned_barrier_intra_dec_fixed(struct ompi_communicator_t *comm) { - int comsize; + int communicator_size; OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_barrier_intra_dec_fixed")); - comsize = ompi_comm_size(comm); + communicator_size = ompi_comm_size(comm); - if( 2 == comsize ) + if( 2 == communicator_size ) return ompi_coll_tuned_barrier_intra_two_procs(comm); /** * Basic optimisation. If we have a power of 2 number of nodes @@ -117,8 +117,8 @@ int ompi_coll_tuned_barrier_intra_dec_fixed(struct ompi_communicator_t *comm) */ { bool has_one = false; - for( ; comsize > 0; comsize >>= 1 ) { - if( comsize & 0x1 ) { + for( ; communicator_size > 0; communicator_size >>= 1 ) { + if( communicator_size & 0x1 ) { if( has_one ) return ompi_coll_tuned_barrier_intra_bruck(comm); has_one = true; @@ -142,13 +142,19 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count, struct ompi_datatype_t *datatype, int root, struct ompi_communicator_t *comm) { - int comsize, rank, err; + const double a0 = -7.8710; + const double b0 = 41.1613; + const double a1 = 0.0150; + const double b1 = 11.2445; + const double a2 = 0.0023; + const double b2 = 3.8074; + int communicator_size, rank, err; int segsize = 0; - size_t msgsize, dsize; + size_t message_size, dsize; OPAL_OUTPUT((ompi_coll_tuned_stream,"ompi_coll_tuned_bcast_intra_dec_fixed")); - comsize = ompi_comm_size(comm); + communicator_size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); /* else we need data size for decision function */ @@ -158,26 +164,64 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count, return (err); } - msgsize = dsize * (unsigned long)count; /* needed for decision */ + message_size = dsize * (unsigned long)count; /* needed for decision */ + if 
((message_size <= 1024) && (communicator_size < 12)) { + /* Linear_0K */ + return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm); + } else if (message_size < 8192) { + if ((communicator_size < 12) || + (communicator_size < (a0 * (message_size / 1024.0) + b0))) { + /* Binary_0K */ + segsize = 0; + } else { + /* Binary_1K */ + segsize = 1024; + } + return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize); + } else if (message_size <= 35000) { + if (communicator_size <= 12) { + /* Binary_8K */ + segsize = 1024 << 3; + return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize); + } else { + /* SplittedBinary_1K */ + segsize = 1024; + return ompi_coll_tuned_bcast_intra_split_bintree(buff, count, datatype, root, comm, segsize); + } + + } else if (communicator_size > (a1 * (message_size / 1024.0) + b1)) { + /* SplittedBinary_8K */ + segsize = 1024 << 3; + return ompi_coll_tuned_bcast_intra_split_bintree(buff, count, datatype, root, comm, segsize); + } + if (communicator_size > (a2 * (message_size / 1024.0) + b2)) { + /* Pipeline_8K */ + segsize = 1024 << 3; + } else { + /* Pipeline_64K */ + segsize = 1024 << 6; + } + return ompi_coll_tuned_bcast_intra_pipeline (buff, count, datatype, root, comm, segsize); +#if 0 /* this is based on gige measurements */ - if (comsize < 4) { + if (communicator_size < 4) { return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm); } - if (comsize == 4) { - if (msgsize < 524288) segsize = 0; + if (communicator_size == 4) { + if (message_size < 524288) segsize = 0; else segsize = 16384; return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize); } - if (comsize <= 8 && msgsize < 4096) { + if (communicator_size <= 8 && message_size < 4096) { return ompi_coll_tuned_bcast_intra_basic_linear (buff, count, datatype, root, comm); } - if (comsize > 8 && msgsize >= 32768 && msgsize < 524288) { + if 
(communicator_size > 8 && message_size >= 32768 && message_size < 524288) { segsize = 16384; return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize); } - if (msgsize >= 524288) { + if (message_size >= 524288) { segsize = 16384; return ompi_coll_tuned_bcast_intra_pipeline (buff, count, datatype, root, comm, segsize); } @@ -185,6 +229,7 @@ int ompi_coll_tuned_bcast_intra_dec_fixed(void *buff, int count, /* once tested can swap this back in */ /* return ompi_coll_tuned_bcast_intra_bmtree (buff, count, datatype, root, comm, segsize); */ return ompi_coll_tuned_bcast_intra_bintree (buff, count, datatype, root, comm, segsize); +#endif /* 0 */ } /* @@ -200,8 +245,16 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, struct ompi_op_t* op, int root, struct ompi_communicator_t* comm) { - int comsize, rank, err, segsize = 0, fanout = 0; - size_t msgsize, dsize; + int communicator_size, rank, err, segsize = 0; + size_t message_size, dsize; + const double a1 = 0.6016 / 1024.0; /* [1/B] */ + const double b1 = 1.3496; + const double a2 = 0.0410 / 1024.0; /* [1/B] */ + const double b2 = 9.7128; + const double a3 = 0.0422 / 1024.0; /* [1/B] */ + const double b3 = 1.1614; + const double a4 = 0.0033 / 1024.0; /* [1/B] */ + const double b4 = 1.6761; OPAL_OUTPUT((ompi_coll_tuned_stream, "ompi_coll_tuned_reduce_intra_dec_fixed")); @@ -212,7 +265,7 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); } - comsize = ompi_comm_size(comm); + communicator_size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); /* need data size for decision function */ @@ -222,24 +275,58 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, return (err); } - msgsize = dsize * count; /* needed for decision */ + message_size = dsize * count; /* needed for decision */ - /* for small messages use linear 
algorithm */ - if (msgsize <= 4096) { + if (((communicator_size < 20) && (message_size < 512)) || + ((communicator_size < 10) && (message_size <= 1024))){ + /* Linear_0K */ + return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); + } else if ((communicator_size < 8) && (message_size < 20480)) { + /* Binomial_0K */ segsize = 0; - fanout = comsize - 1; + return ompi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + } else if (message_size < 2048) { + /* Binary_0K */ + segsize = 0; + return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + } else if (communicator_size > (a1 * message_size + b1)) { + /* Binary_1K */ + segsize = 1024; + return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + } else if (communicator_size > (a2 * message_size + b2)) { + /* Pipeline_1K */ + segsize = 1024; + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + } else if (communicator_size > (a3 * message_size + b3)) { + /* Binary_32K */ + segsize = 32*1024; + return ompi_coll_tuned_reduce_intra_binary(sendbuf, recvbuf, count, datatype, op, root, comm, segsize); + } + if (communicator_size > (a4 * message_size + b4)) { + /* Pipeline_32K */ + segsize = 32*1024; + } else { + /* Pipeline_64K */ + segsize = 64*1024; + } + return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); +#if 0 + /* for small messages use linear algorithm */ + if (message_size <= 4096) { + segsize = 0; + fanout = communicator_size - 1; /* when linear implemented or taken from basic put here, right now using chain as a linear system */ /* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! 
*/ return ompi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); /* return ompi_coll_tuned_reduce_intra_chain (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */ } - if (msgsize < 524288) { - if (msgsize <= 65536 ) { + if (message_size < 524288) { + if (message_size <= 65536 ) { segsize = 32768; fanout = 8; } else { segsize = 1024; - fanout = comsize/2; + fanout = communicator_size/2; } /* later swap this for a binary tree */ /* fanout = 2; */ @@ -247,4 +334,5 @@ int ompi_coll_tuned_reduce_intra_dec_fixed( void *sendbuf, void *recvbuf, } segsize = 1024; return ompi_coll_tuned_reduce_intra_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm, segsize); +#endif /* 0 */ } diff --git a/ompi/mca/coll/tuned/coll_tuned_reduce.c b/ompi/mca/coll/tuned/coll_tuned_reduce.c index f0284a7b8e..80c7605b90 100644 --- a/ompi/mca/coll/tuned/coll_tuned_reduce.c +++ b/ompi/mca/coll/tuned/coll_tuned_reduce.c @@ -29,73 +29,44 @@ #include "coll_tuned.h" #include "coll_tuned_topo.h" -/* Attention: this version of the reduce operations does not - work for: - - non-commutative operations - - segment sizes which are not multiplies of the extent of the datatype - meaning that at least one datatype must fit in the segment ! -*/ - -int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, - ompi_datatype_t* datatype, ompi_op_t* op, - int root, ompi_communicator_t* comm, uint32_t segsize, - int fanout) +/** + * This is a generic implementation of the reduce protocol. It used the tree + * provided as an argument and execute all operations using a segment of + * count times a datatype. 
+ * For the last communication it will update the count in order to limit + * the number of datatypes to the original count (original_count) + */ +int ompi_coll_tuned_reduce_generic( void* sendbuf, void* recvbuf, int original_count, + ompi_datatype_t* datatype, ompi_op_t* op, + int root, ompi_communicator_t* comm, + ompi_coll_tree_t* tree, int count_by_segment ) { - int ret, line, rank, size, i = 0; - int recvcount, sendcount, prevcount, inbi, previnbi; - int segcount, segindex, num_segments; char *inbuf[2] = {(char*)NULL, (char*)NULL}; - char *accumbuf = (char*)NULL; - char *sendtmpbuf = (char*)NULL; - ptrdiff_t ext, lb; + char *local_op_buffer, *accumbuf, *sendtmpbuf; + ptrdiff_t extent, lower_bound; size_t typelng, realsegsize; ompi_request_t* reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL}; - ompi_coll_tree_t* chain; - - size = ompi_comm_size(comm); - rank = ompi_comm_rank(comm); - - OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", rank, fanout, segsize)); - - /* setup the chain topology. - * if the previous chain topology is the same, then use this cached copy - * other wise recreate it. 
- */ - if ((comm->c_coll_selected_data->cached_chain) && (comm->c_coll_selected_data->cached_chain_root == root) - && (comm->c_coll_selected_data->cached_chain_fanout == fanout)) { - chain = comm->c_coll_selected_data->cached_chain; - } else { - if (comm->c_coll_selected_data->cached_chain) { /* destroy previous chain if defined */ - ompi_coll_tuned_topo_destroy_tree (&comm->c_coll_selected_data->cached_chain); - } - comm->c_coll_selected_data->cached_chain = chain = ompi_coll_tuned_topo_build_chain(fanout,comm,root); - comm->c_coll_selected_data->cached_chain_root = root; - comm->c_coll_selected_data->cached_chain_fanout = fanout; - } + int num_segments, line, ret, segindex, i, rank; + int recvcount, prevcount, inbi, previnbi; /** * Determine number of segments and number of elements * sent per operation */ - ompi_ddt_get_extent( datatype, &lb, &ext ); + ompi_ddt_get_extent( datatype, &lower_bound, &extent ); ompi_ddt_type_size( datatype, &typelng ); - if( segsize > typelng ) { - segcount = (int)(segsize / typelng); - num_segments = count/segcount; - if( (count % segcount) != 0 ) num_segments++; - } else { - segcount = count; - num_segments = 1; - } - realsegsize = segcount * ext; + num_segments = (original_count + count_by_segment - 1) / count_by_segment; + realsegsize = count_by_segment * extent; sendtmpbuf = (char*) sendbuf; if( sendbuf == MPI_IN_PLACE ) { sendtmpbuf = (char *)recvbuf; } + rank = ompi_comm_rank(comm); + /* non-leaf nodes - wait for children to send me data & forward up (if needed) */ - if( chain->tree_nextsize > 0 ) { + if( tree->tree_nextsize > 0 ) { /* handle non existant recv buffer (i.e. its NULL.. like basic allreduce uses!) 
*/ accumbuf = (char*)recvbuf; if( NULL == accumbuf ) { @@ -108,7 +79,7 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, if( inbuf[0] == NULL ) { line = __LINE__; ret = -1; goto error_hndl; } /* if there is chance to overlap communication - allocate second buffer */ - if( (num_segments > 1) || (chain->tree_nextsize > 1) ) { + if( (num_segments > 1) || (tree->tree_nextsize > 1) ) { inbuf[1] = (char*) malloc(realsegsize); if( inbuf[1] == NULL ) { line = __LINE__; ret = -1; goto error_hndl;} } else { @@ -122,12 +93,12 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, for( segindex = 0; segindex <= num_segments; segindex++ ) { prevcount = recvcount; /* recvcount - number of elements in current segment */ - recvcount = segcount; + recvcount = count_by_segment; if( segindex == (num_segments-1) ) - recvcount = count - segcount*segindex; + recvcount = original_count - count_by_segment * segindex; /* for each child */ - for( i = 0; i < chain->tree_nextsize; i++ ) { + for( i = 0; i < tree->tree_nextsize; i++ ) { /** * We try to overlap communication: * either with next segment or with the next child @@ -144,11 +115,11 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, * BUT if we are root and are USING MPI_IN_PLACE this is wrong ek! 
* check for root might not be needed as it should be checked higher up */ - if( !((MPI_IN_PLACE==sendbuf) && (rank==root)) ) { + if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { local_recvbuf = accumbuf + segindex * realsegsize; } } - ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount,datatype, chain->tree_next[i], + ret = MCA_PML_CALL(irecv(local_recvbuf, recvcount,datatype, tree->tree_next[i], MCA_COLL_BASE_TAG_REDUCE, comm, &reqs[inbi])); if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } } @@ -157,25 +128,24 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, /* wait on data from last child for previous segment */ ret = ompi_request_wait_all( 1, &reqs[previnbi], MPI_STATUSES_IGNORE ); if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + local_op_buffer = inbuf[previnbi]; if( i > 0 ) { /* our first operation is to combine our own [sendbuf] data with the data * we recvd from down stream (but only if we are not root and not using * MPI_IN_PLACE) */ - void* local_op_buffer = inbuf[previnbi]; if( 1 == i ) { - if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) { + if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { local_op_buffer = sendtmpbuf + segindex * realsegsize; } } /* apply operation */ ompi_op_reduce(op, local_op_buffer, accumbuf+segindex*realsegsize, recvcount, datatype ); } else if ( segindex > 0 ) { - void* local_op_buffer = inbuf[previnbi]; void* accumulator = accumbuf + (segindex-1) * realsegsize; - if( chain->tree_nextsize <= 1 ) { - if( !((MPI_IN_PLACE == sendbuf) && (rank == root)) ) { + if( tree->tree_nextsize <= 1 ) { + if( !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { local_op_buffer = sendtmpbuf+(segindex-1)*realsegsize; } } @@ -184,10 +154,10 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, /* all reduced on available data this step (i) complete, pass to * the next process unless your the root */ - if (rank != 
root) { + if (rank != tree->tree_root) { /* send combined/accumulated data to parent */ ret = MCA_PML_CALL( send( accumulator, prevcount, datatype, - chain->tree_prev, MCA_COLL_BASE_TAG_REDUCE, + tree->tree_prev, MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD, comm) ); if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } } @@ -210,20 +180,20 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, /* leaf nodes */ else { /* Send segmented data to parents */ - for (segindex = 0; segindex < num_segments; segindex++) { - if (segindex < num_segments-1) sendcount = segcount; - else sendcount = count - segindex*segcount; - ret = MCA_PML_CALL( send((char*)sendbuf+segindex*realsegsize, sendcount, - datatype, chain->tree_prev, + segindex = 0; + while( original_count > 0 ) { + if( original_count < count_by_segment ) count_by_segment = original_count; + ret = MCA_PML_CALL( send((char*)sendbuf + segindex * realsegsize, count_by_segment, + datatype, tree->tree_prev, MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD, comm) ); if (ret != MPI_SUCCESS) { line = __LINE__; goto error_hndl; } + segindex++; + original_count -= count_by_segment; } } + return OMPI_SUCCESS; - return MPI_SUCCESS; - - /* error handler */ - error_hndl: + error_hndl: /* error handler */ OPAL_OUTPUT (( ompi_coll_tuned_stream, "ERROR_HNDL: node %d file %s line %d error %d\n", rank, __FILE__, line, ret )); if( inbuf[0] != NULL ) free(inbuf[0]); if( inbuf[1] != NULL ) free(inbuf[1]); @@ -231,23 +201,123 @@ int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, return ret; } +/* Attention: this version of the reduce operations does not + work for: + - non-commutative operations + - segment sizes which are not multiplies of the extent of the datatype + meaning that at least one datatype must fit in the segment ! 
+*/ + +int ompi_coll_tuned_reduce_intra_chain( void *sendbuf, void *recvbuf, int count, + ompi_datatype_t* datatype, ompi_op_t* op, + int root, ompi_communicator_t* comm, uint32_t segsize, + int fanout) +{ + int segcount; + size_t typelng; + + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", ompi_comm_rank(comm), fanout, segsize)); + + COLL_TUNED_UPDATE_CHAIN( comm, root, fanout ); + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, + comm->c_coll_selected_data->cached_chain, segcount ); +} + int ompi_coll_tuned_reduce_intra_pipeline( void *sendbuf, void *recvbuf, int count, ompi_datatype_t* datatype, ompi_op_t* op, int root, ompi_communicator_t* comm, uint32_t segsize ) { - int rank; + int segcount; + size_t typelng; - rank = ompi_comm_rank(comm); + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_pipeline rank %d ss %5d", + ompi_comm_rank(comm), segsize)); - OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_pipeline rank %d ss %5d", rank, segsize)); + COLL_TUNED_UPDATE_PIPELINE( comm, root ); - return ompi_coll_tuned_reduce_intra_chain( sendbuf,recvbuf, count, - datatype, op, root, comm, - segsize, 1 ); + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, + comm->c_coll_selected_data->cached_pipeline, segcount ); } +int ompi_coll_tuned_reduce_intra_binary( void *sendbuf, void *recvbuf, + int count, ompi_datatype_t* datatype, + ompi_op_t* op, int root, + 
ompi_communicator_t* comm, uint32_t segsize ) +{ + int segcount; + size_t typelng; + + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_binary rank %d ss %5d", + ompi_comm_rank(comm), segsize)); + + COLL_TUNED_UPDATE_BINTREE( comm, root ); + + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, + comm->c_coll_selected_data->cached_bintree, segcount ); +} + +int ompi_coll_tuned_reduce_intra_binomial( void *sendbuf, void *recvbuf, + int count, ompi_datatype_t* datatype, + ompi_op_t* op, int root, + ompi_communicator_t* comm, uint32_t segsize ) +{ + int segcount; + size_t typelng; + + OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_binomial rank %d ss %5d", + ompi_comm_rank(comm), segsize)); + + COLL_TUNED_UPDATE_BMTREE( comm, root ); + + /** + * Determine number of segments and number of elements + * sent per operation + */ + ompi_ddt_type_size( datatype, &typelng ); + if( segsize > typelng ) { + segcount = (int)(segsize / typelng); + } else { + segcount = count; + } + + return ompi_coll_tuned_reduce_generic( sendbuf, recvbuf, count, datatype, op, root, comm, + comm->c_coll_selected_data->cached_bmtree, segcount ); +} /* * Linear functions are copied from the BASIC coll module @@ -358,7 +428,6 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count, } /* Perform the reduction */ - ompi_op_reduce(op, inbuf, rbuf, count, dtype); } @@ -371,22 +440,22 @@ ompi_coll_tuned_reduce_intra_basic_linear(void *sbuf, void *rbuf, int count, } /* All done */ - return MPI_SUCCESS; } /* copied function (with appropriate renaming) ends here */ -/* The following are used by dynamic and forced rules */ - -/* publish details of each algorithm and if its 
forced/fixed/locked in */ -/* as you add methods/algorithms you must update this and the query/map routines */ - -/* this routine is called by the component only */ -/* this makes sure that the mca parameters are set to their initial values and perms */ -/* module does not call this they call the forced_getvalues routine instead */ - +/** + * The following are used by dynamic and forced rules + * + * publish details of each algorithm and if its forced/fixed/locked in + * as you add methods/algorithms you must update this and the query/map routines + * + * this routine is called by the component only + * this makes sure that the mca parameters are set to their initial values and perms + * module does not call this they call the forced_getvalues routine instead + */ int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices) { int rc; diff --git a/ompi/mca/coll/tuned/coll_tuned_topo.c b/ompi/mca/coll/tuned/coll_tuned_topo.c index c5c4e616e8..dff6d9a2b5 100644 --- a/ompi/mca/coll/tuned/coll_tuned_topo.c +++ b/ompi/mca/coll/tuned/coll_tuned_topo.c @@ -169,6 +169,128 @@ ompi_coll_tuned_topo_build_tree( int fanout, return tree; } +/* + * Constructs in-order binary tree which can be used for non-commutative reduce + * operations. + * Root of this tree is always rank (size-1) and fanout is 2. 
+ * Here are some of the examples of this tree: + * size == 2 size = 4 size = 9 + * 1 3 8 + * / / \ / \ + * 0 2 1 7 3 + * / / \ / \ + * 0 6 5 2 1 + * / / + * 4 0 + */ +ompi_coll_tree_t* +ompi_coll_tuned_topo_build_in_order_bintree( struct ompi_communicator_t* comm ) +{ + int rank, size; + int myrank, rightsize, delta; + int parent, lchild, rchild; + ompi_coll_tree_t* tree; + + /* + * Get size and rank of the process in this communicator + */ + size = ompi_comm_size(comm); + rank = ompi_comm_rank(comm); + + tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); + if (!tree) { + OPAL_OUTPUT((ompi_coll_tuned_stream, + "coll:tuned:topo_build_tree PANIC::out of memory")); + return NULL; + } + + tree->tree_root = MPI_UNDEFINED; + tree->tree_nextsize = MPI_UNDEFINED; + + /* + * Initialize tree + */ + tree->tree_fanout = 2; + tree->tree_bmtree = 0; + tree->tree_root = size - 1; + tree->tree_prev = -1; + tree->tree_nextsize = 0; + tree->tree_next[0] = -1; + tree->tree_next[1] = -1; + OPAL_OUTPUT((ompi_coll_tuned_stream, + "coll:tuned:topo_build_in_order_tree Building fo %d rt %d", + tree->tree_fanout, tree->tree_root)); + + /* + * Build the tree + */ + myrank = rank; + parent = size - 1; + delta = 0; + + while ( 1 ) { + /* Compute the size of the right subtree */ + rightsize = size >> 1; + + /* Determine the left and right child of this parent */ + lchild = -1; + rchild = -1; + if (size - 1 > 0) { + lchild = parent - 1; + if (lchild > 0) { + rchild = rightsize - 1; + } + } + + /* The following cases are possible: myrank can be + - a parent, + - belong to the left subtree, or + - belong to the right subtree + Each of the cases needs to be handled differently. + */ + + if (myrank == parent) { + /* I am the parent: + - compute real ranks of my children, and exit the loop. 
*/ + if (lchild >= 0) tree->tree_next[0] = lchild + delta; + if (rchild >= 0) tree->tree_next[1] = rchild + delta; + break; + } + if (myrank > rchild) { + /* I belong to the left subtree: + - If I am the left child, compute real rank of my parent + - Iterate down through tree: + compute new size, shift ranks down, and update delta. + */ + if (myrank == lchild) { + tree->tree_prev = parent + delta; + } + size = size - rightsize - 1; + delta = delta + rightsize; + myrank = myrank - rightsize; + parent = size - 1; + + } else { + /* I belong to the right subtree: + - If I am the right child, compute real rank of my parent + - Iterate down through tree: + compute new size and parent, + but the delta and rank do not need to change. + */ + if (myrank == rchild) { + tree->tree_prev = parent + delta; + } + size = rightsize; + parent = rchild; + } + } + + if (tree->tree_next[0] >= 0) { tree->tree_nextsize = 1; } + if (tree->tree_next[1] >= 0) { tree->tree_nextsize += 1; } + + return tree; +} + int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree ) { ompi_coll_tree_t *ptr; diff --git a/ompi/mca/coll/tuned/coll_tuned_topo.h b/ompi/mca/coll/tuned/coll_tuned_topo.h index e1e79682e7..a1c0817334 100644 --- a/ompi/mca/coll/tuned/coll_tuned_topo.h +++ b/ompi/mca/coll/tuned/coll_tuned_topo.h @@ -40,7 +40,8 @@ extern "C" { ompi_coll_tuned_topo_build_tree( int fanout, struct ompi_communicator_t* com, int root ); - int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree ); + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_in_order_bintree( struct ompi_communicator_t* comm ); ompi_coll_tree_t* ompi_coll_tuned_topo_build_bmtree( struct ompi_communicator_t* comm, @@ -51,6 +52,8 @@ extern "C" { struct ompi_communicator_t* com, int root ); + int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree ); + /* debugging stuff, will be removed later */ int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank);