From 3adf96fdb811ee2495affdc86a7be6c091de4e10 Mon Sep 17 00:00:00 2001
From: Mikhail Kurnosov
Date: Tue, 5 Jun 2018 15:49:07 +0700
Subject: [PATCH] coll/base: add butterfly algorithm for MPI_Reduce_scatter

Implements a butterfly algorithm for MPI_Reduce_scatter. The algorithm
supports both commutative and non-commutative operations, and both
power-of-two and non-power-of-two numbers of processes.

Signed-off-by: Mikhail Kurnosov
---
 ompi/mca/coll/base/coll_base_functions.h      |   1 +
 ompi/mca/coll/base/coll_base_reduce_scatter.c | 275 ++++++++++++++++++
 .../base/coll_base_reduce_scatter_block.c     |  16 -
 ompi/mca/coll/base/coll_base_util.c           |  15 +
 ompi/mca/coll/base/coll_base_util.h           |   8 +
 .../coll_tuned_reduce_scatter_decision.c      |   7 +-
 6 files changed, 304 insertions(+), 18 deletions(-)
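
Note (editorial, not part of the commit): the Step 1 renumbering used throughout
the new code is easiest to see in isolation. The standalone C sketch below (my
addition, no OMPI headers) replays the rank-to-vrank mapping for the example in
the comment block of the new function (p = 6, so p' = 4 and r = 2); it prints
vranks -1, 0, -1, 1, 2, 3 for ranks 0..5.

#include <stdio.h>

/* Replay the Step 1 renumbering: even ranks below 2r feed their vector to
 * rank + 1 and drop out (vrank -1), odd ranks below 2r become vranks 0..r-1,
 * and the remaining ranks shift down by r. */
int main(void)
{
    const int comm_size = 6;                    /* p */
    int nprocs_pof2 = 1;
    while (nprocs_pof2 * 2 <= comm_size)
        nprocs_pof2 *= 2;                       /* p' = 4 */
    int nprocs_rem = comm_size - nprocs_pof2;   /* r = 2 */

    for (int rank = 0; rank < comm_size; rank++) {
        int vrank = (rank < 2 * nprocs_rem)
                        ? ((rank % 2 == 0) ? -1 : rank / 2)
                        : rank - nprocs_rem;
        printf("rank %d -> vrank %d\n", rank, vrank);
    }
    return 0;
}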
diff --git a/ompi/mca/coll/base/coll_base_functions.h b/ompi/mca/coll/base/coll_base_functions.h
index 3ea0c6dfe7..00efecf705 100644
--- a/ompi/mca/coll/base/coll_base_functions.h
+++ b/ompi/mca/coll/base/coll_base_functions.h
@@ -248,6 +248,7 @@ int ompi_coll_base_reduce_intra_redscat_gather(REDUCE_ARGS);
 int ompi_coll_base_reduce_scatter_intra_nonoverlapping(REDUCESCATTER_ARGS);
 int ompi_coll_base_reduce_scatter_intra_basic_recursivehalving(REDUCESCATTER_ARGS);
 int ompi_coll_base_reduce_scatter_intra_ring(REDUCESCATTER_ARGS);
+int ompi_coll_base_reduce_scatter_intra_butterfly(REDUCESCATTER_ARGS);
 
 /* Reduce_scatter_block */
 int ompi_coll_base_reduce_scatter_block_basic_linear(REDUCESCATTERBLOCK_ARGS);
diff --git a/ompi/mca/coll/base/coll_base_reduce_scatter.c b/ompi/mca/coll/base/coll_base_reduce_scatter.c
index f24211d355..948a17376c 100644
--- a/ompi/mca/coll/base/coll_base_reduce_scatter.c
+++ b/ompi/mca/coll/base/coll_base_reduce_scatter.c
@@ -36,6 +36,7 @@
 #include "ompi/op/op.h"
 #include "ompi/mca/coll/base/coll_base_functions.h"
 #include "coll_base_topo.h"
+#include "coll_base_util.h"
 
 /*******************************************************************************
  * ompi_coll_base_reduce_scatter_intra_nonoverlapping
@@ -620,3 +621,277 @@ ompi_coll_base_reduce_scatter_intra_ring( const void *sbuf, void *rbuf, const in
     if (NULL != inbuf_free[1]) free(inbuf_free[1]);
     return ret;
 }
+
+/*
+ * ompi_sum_counts: Returns sum of counts [lo, hi]
+ *                  lo, hi in {0, 1, ..., nprocs_pof2 - 1}
+ */
+static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
+{
+    /* Adjust lo and hi to take into account blocks of excluded processes */
+    lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
+    hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
+    return displs[hi] + counts[hi] - displs[lo];
+}
+
+/*
+ * ompi_coll_base_reduce_scatter_intra_butterfly
+ *
+ * Function:  Butterfly algorithm for reduce_scatter
+ * Accepts:   Same as MPI_Reduce_scatter
+ * Returns:   MPI_SUCCESS or error code
+ *
+ * Description: Implements butterfly algorithm for MPI_Reduce_scatter [*].
+ *              The algorithm can be used with both commutative and
+ *              non-commutative operations, for power-of-two and
+ *              non-power-of-two numbers of processes.
+ *
+ * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
+ *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
+ *
+ * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
+ *                  where m = sum of rcounts[], p = comm_size
+ * Memory requirements (per process): 2 * m * typesize + comm_size
+ *
+ * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
+ *
+ * Step 1. Reduce the number of processes to 4
+ * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
+ * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
+ * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
+ * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
+ * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
+ * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
+ *
+ * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
+ * Round 1 (mask=1, nblocks=2)
+ * 0: vrank -1
+ * 1: vrank 0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
+ * 2: vrank -1
+ * 3: vrank 1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
+ * 4: vrank 2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
+ * 5: vrank 3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
+ *
+ * Round 2 (mask=2, nblocks=1)
+ * 0: vrank -1
+ * 1: vrank 0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
+ * 2: vrank -1
+ * 3: vrank 1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
+ * 4: vrank 2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
+ * 5: vrank 3 [**|**|8|10]  : exch with 1: send [2], recv [3]: [**|**|*|30]
+ *
+ * Step 3. Exchange with remote process according to a mirror permutation:
+ *         mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
+ * 0: vrank -1: recv "0" from process 1
+ * 1: vrank 0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
+ * 2: vrank -1: recv result "12" from process 4
+ * 3: vrank 1 [**|**|24|*]: send "24" to 4, recv "18" from 4 into rbuf
+ * 4: vrank 2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
+ * 5: vrank 3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
+ */
+int
+ompi_coll_base_reduce_scatter_intra_butterfly(
+    const void *sbuf, void *rbuf, const int *rcounts, struct ompi_datatype_t *dtype,
+    struct ompi_op_t *op, struct ompi_communicator_t *comm,
+    mca_coll_base_module_t *module)
+{
+    char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
+    int *displs = NULL, index;
+    ptrdiff_t span, gap, totalcount, extent;
+    int err = MPI_SUCCESS;
+    int comm_size = ompi_comm_size(comm);
+    int rank = ompi_comm_rank(comm);
+
+    OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+                 "coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
+                 rank, comm_size));
+    if (comm_size < 2)
+        return MPI_SUCCESS;
+
+    displs = malloc(sizeof(*displs) * comm_size);
+    if (NULL == displs) {
+        err = OMPI_ERR_OUT_OF_RESOURCE;
+        goto cleanup_and_return;
+    }
+    displs[0] = 0;
+    for (int i = 1; i < comm_size; i++) {
+        displs[i] = displs[i - 1] + rcounts[i - 1];
+    }
+    totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
+
+    ompi_datatype_type_extent(dtype, &extent);
+    span = opal_datatype_span(&dtype->super, totalcount, &gap);
+    tmpbuf[0] = malloc(span);
+    tmpbuf[1] = malloc(span);
+    if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
+        err = OMPI_ERR_OUT_OF_RESOURCE;
+        goto cleanup_and_return;
+    }
+    psend = tmpbuf[0] - gap;
+    precv = tmpbuf[1] - gap;
+
+    if (sbuf != MPI_IN_PLACE) {
+        err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
+        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+    } else {
+        err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
+        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+    }
+
+    /*
+     * Step 1. Reduce the number of processes to the nearest lower power of two
+     * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
+     * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
+     * their input vector to their neighbor (rank + 1), and all the odd ranks
+     * recv it and perform a local reduction.
+     * The odd ranks (1 to 2r - 1) then hold the reduction of their own input
+     * vector with that of their even neighbor. The first r odd
+     * processes and the last p - 2r processes are renumbered from
+     * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
+     * rest of the algorithm.
+     */
+
+    /* Find nearest power-of-two less than or equal to comm_size */
+    int nprocs_pof2 = opal_next_poweroftwo(comm_size);
+    nprocs_pof2 >>= 1;
+    int nprocs_rem = comm_size - nprocs_pof2;
+    int log2_size = opal_cube_dim(nprocs_pof2);
+
+    int vrank = -1;
+    if (rank < 2 * nprocs_rem) {
+        if ((rank % 2) == 0) {
+            /* Even process */
+            err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
+                                    MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                    MCA_PML_BASE_SEND_STANDARD, comm));
+            if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
+            /* This process does not participate in the rest of the algorithm */
+            vrank = -1;
+        } else {
+            /* Odd process */
+            err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
+                                    MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                    comm, MPI_STATUS_IGNORE));
+            if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
+            ompi_op_reduce(op, precv, psend, totalcount, dtype);
+            /* Adjust rank to be the bottom "remain" ranks */
+            vrank = rank / 2;
+        }
+    } else {
+        /* Adjust rank to show that the bottom "even remain" ranks dropped out */
+        vrank = rank - nprocs_rem;
+    }
+
+    if (vrank != -1) {
+        /*
+         * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
+         * block 0: rcounts[0] and rcounts[1] -- for processes 0 and 1
+         * block 1: rcounts[2] and rcounts[3] -- for processes 2 and 3
+         * ...
+         * block r - 1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
+         * block r:     rcounts[r+r]
+         * block r + 1: rcounts[r+r+1]
+         * ...
+         * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
+         */
+        int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
+        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
+            int vpeer = vrank ^ mask;
+            int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+
+            nblocks /= 2;
+            if ((vrank & mask) == 0) {
+                /* Send the upper half of reduction buffer, recv the lower half */
+                send_index += nblocks;
+            } else {
+                /* Send the lower half of reduction buffer, recv the upper half */
+                recv_index += nblocks;
+            }
+
+            /* Send blocks: [send_index, send_index + nblocks - 1] */
+            int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
+                                             send_index, send_index + nblocks - 1);
+            index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
+            ptrdiff_t sdispl = displs[index];
+
+            /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
+            int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
+                                             recv_index, recv_index + nblocks - 1);
+            index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
+            ptrdiff_t rdispl = displs[index];
+
+            err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
+                                          dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                          precv + (ptrdiff_t)rdispl * extent, recv_count,
+                                          dtype, peer, MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                          comm, MPI_STATUS_IGNORE, rank);
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+
+            if (vrank < vpeer) {
+                /* precv = psend <op> precv */
+                ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
+                               precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
+                char *p = psend;
+                psend = precv;
+                precv = p;
+            } else {
+                /* psend = precv <op> psend */
+                ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
+                               psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
+            }
+            send_index = recv_index;
+        }
+        /*
+         * psend points to the result block [send_index].
+         * Exchange results with remote process according to a mirror permutation.
+         */
+        int vpeer = ompi_mirror_perm(vrank, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
+
+        if (vpeer < nprocs_rem) {
+            /*
+             * Process has two blocks: one for an excluded process and one for
+             * the peer itself. Send the first block to the excluded process.
+             */
+            err = MCA_PML_CALL(send(psend + (ptrdiff_t)displs[index] * extent,
+                                    rcounts[index], dtype, peer - 1,
+                                    MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                    MCA_PML_BASE_SEND_STANDARD, comm));
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+        }
+
+        /* If process has two blocks, then send the second one (the peer's own block) */
+        if (vpeer < nprocs_rem)
+            index++;
+        if (vpeer != vrank) {
+            err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)displs[index] * extent,
+                                          rcounts[index], dtype, peer,
+                                          MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                          rbuf, rcounts[rank], dtype, peer,
+                                          MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                          comm, MPI_STATUS_IGNORE, rank);
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+        } else {
+            err = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], rbuf,
+                                                      psend + (ptrdiff_t)displs[rank] * extent);
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+        }
+    } else {
+        /* Excluded process: receive result */
+        int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, peer,
+                                MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
+                                MPI_STATUS_IGNORE));
+        if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
+    }
+
+cleanup_and_return:
+    if (displs)
+        free(displs);
+    if (tmpbuf[0])
+        free(tmpbuf[0]);
+    if (tmpbuf[1])
+        free(tmpbuf[1]);
+    return err;
+}
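
Note (editorial, not part of the commit): the send_index/recv_index bookkeeping
above is the subtle part of the butterfly loop. The standalone replay below (my
addition, no MPI; names match the function above) runs just that arithmetic for
nprocs_pof2 = 4 and prints which block range each vrank sends and keeps per
round; it reproduces the block movements of the Step 2 example, and every vrank
ends on block mperm(vrank), which is what Step 3 then redistributes.

#include <stdio.h>

/* Replay of the index arithmetic in the butterfly loop of
 * ompi_coll_base_reduce_scatter_intra_butterfly() for nprocs_pof2 = 4. */
int main(void)
{
    const int nprocs_pof2 = 4;
    for (int vrank = 0; vrank < nprocs_pof2; vrank++) {
        int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
        printf("vrank %d:", vrank);
        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
            int vpeer = vrank ^ mask;
            nblocks /= 2;
            if ((vrank & mask) == 0) {
                send_index += nblocks;   /* keep lower half, send upper half */
            } else {
                recv_index += nblocks;   /* keep upper half, send lower half */
            }
            printf(" [mask %d: exch vrank %d, send %d..%d, keep %d..%d]",
                   mask, vpeer, send_index, send_index + nblocks - 1,
                   recv_index, recv_index + nblocks - 1);
            send_index = recv_index;     /* kept range is halved next round */
        }
        printf(" -> result block %d\n", send_index);
    }
    return 0;
}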
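
Note (editorial, not part of the commit): ompi_sum_counts() hides the second
subtlety, the mapping from contiguous virtual block numbers to the real
rcounts[] slots, which are interleaved with the excluded even ranks at the
front. A standalone check for the example layout (p = 6, r = 2, rcounts[i] = 1),
using a lightly adapted copy of the helper:

#include <assert.h>

/* Copy of ompi_sum_counts(): sum of counts over virtual blocks [lo, hi]. */
static int sum_counts(const int *counts, const int *displs, int nprocs_rem,
                      int lo, int hi)
{
    /* Virtual block b < r covers two real slots (2b and 2b+1); blocks
     * b >= r are shifted up by r past the excluded even ranks. */
    lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
    hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
    return displs[hi] + counts[hi] - displs[lo];
}

int main(void)
{
    const int counts[6] = {1, 1, 1, 1, 1, 1};
    const int displs[6] = {0, 1, 2, 3, 4, 5};
    const int r = 2;                                  /* nprocs_rem */

    assert(sum_counts(counts, displs, r, 0, 1) == 4); /* blocks 0-1: ranks 0..3 */
    assert(sum_counts(counts, displs, r, 2, 3) == 2); /* blocks 2-3: ranks 4..5 */
    assert(sum_counts(counts, displs, r, 0, 3) == 6); /* whole vector */
    return 0;
}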
diff --git a/ompi/mca/coll/base/coll_base_reduce_scatter_block.c b/ompi/mca/coll/base/coll_base_reduce_scatter_block.c
index 9bc04c9135..617a58cee8 100644
--- a/ompi/mca/coll/base/coll_base_reduce_scatter_block.c
+++ b/ompi/mca/coll/base/coll_base_reduce_scatter_block.c
@@ -511,22 +511,6 @@ cleanup_and_return:
     return err;
 }
 
-/*
- * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
- *                   of x [*].
- * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
- *     Chapter 7. Rearranging Bits and Bytes.
- */
-static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
-{
-    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
-    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
-    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
-    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
-    x = ((x >> 16) | (x << 16));
-    return x >> (sizeof(x) * CHAR_BIT - nbits);
-}
-
 static int ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
     const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
     struct ompi_op_t *op, struct ompi_communicator_t *comm,
diff --git a/ompi/mca/coll/base/coll_base_util.c b/ompi/mca/coll/base/coll_base_util.c
index d35c14173a..9ff9b0c156 100644
--- a/ompi/mca/coll/base/coll_base_util.c
+++ b/ompi/mca/coll/base/coll_base_util.c
@@ -78,3 +78,18 @@ int ompi_coll_base_sendrecv_actual( const void* sendbuf, size_t scount,
     return (err);
 }
 
+/*
+ * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
+ *                   of x [*].
+ * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
+ *     Chapter 7. Rearranging Bits and Bytes.
+ */
+unsigned int ompi_mirror_perm(unsigned int x, int nbits)
+{
+    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
+    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
+    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
+    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
+    x = ((x >> 16) | (x << 16));
+    return x >> (sizeof(x) * CHAR_BIT - nbits);
+}
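
Note (editorial, not part of the commit): a standalone copy of the now-shared
helper above with a few spot checks (my addition; compiles without OMPI
headers). The nbits = 2 values are exactly the mperm() table quoted in the
Step 3 example of the new reduce-scatter function.

#include <assert.h>
#include <limits.h>
#include <stdio.h>

/* Verbatim copy of ompi_mirror_perm(): reverse the nbits low-order bits of x. */
static unsigned int mirror_perm(unsigned int x, int nbits)
{
    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
    x = ((x >> 16) | (x << 16));
    return x >> (sizeof(x) * CHAR_BIT - nbits);
}

int main(void)
{
    assert(mirror_perm(0, 2) == 0);      /* 00 -> 00 */
    assert(mirror_perm(1, 2) == 2);      /* 01 -> 10 */
    assert(mirror_perm(2, 2) == 1);      /* 10 -> 01 */
    assert(mirror_perm(3, 2) == 3);      /* 11 -> 11 */
    assert(mirror_perm(0x1, 4) == 0x8);  /* 0001 -> 1000 */
    puts("ompi_mirror_perm spot checks passed");
    return 0;
}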
diff --git a/ompi/mca/coll/base/coll_base_util.h b/ompi/mca/coll/base/coll_base_util.h
index df1f7d18f4..5694ad78af 100644
--- a/ompi/mca/coll/base/coll_base_util.h
+++ b/ompi/mca/coll/base/coll_base_util.h
@@ -70,5 +70,13 @@ ompi_coll_base_sendrecv( void* sendbuf, size_t scount, ompi_datatype_t* sdatatyp
                          source, rtag, comm, status);
 }
 
+/**
+ * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
+ *                   of x [*].
+ * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
+ *     Chapter 7. Rearranging Bits and Bytes.
+ */
+unsigned int ompi_mirror_perm(unsigned int x, int nbits);
+
 END_C_DECLS
 #endif  /* MCA_COLL_BASE_UTIL_EXPORT_H */
diff --git a/ompi/mca/coll/tuned/coll_tuned_reduce_scatter_decision.c b/ompi/mca/coll/tuned/coll_tuned_reduce_scatter_decision.c
index d09c6a78be..d93a7d9f6e 100644
--- a/ompi/mca/coll/tuned/coll_tuned_reduce_scatter_decision.c
+++ b/ompi/mca/coll/tuned/coll_tuned_reduce_scatter_decision.c
@@ -36,8 +36,9 @@ static int coll_tuned_reduce_scatter_chain_fanout;
 static mca_base_var_enum_value_t reduce_scatter_algorithms[] = {
     {0, "ignore"},
     {1, "non-overlapping"},
-    {2, "recursive_halfing"},
+    {2, "recursive_halving"},
     {3, "ring"},
+    {4, "butterfly"},
     {0, NULL}
 };
 
@@ -76,7 +77,7 @@ int ompi_coll_tuned_reduce_scatter_intra_check_forced_init (coll_tuned_force_alg
     mca_param_indices->algorithm_param_index =
         mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
                                         "reduce_scatter_algorithm",
-                                        "Which reduce reduce_scatter algorithm is used. Can be locked down to choice of: 0 ignore, 1 non-overlapping (Reduce + Scatterv), 2 recursive halving, 3 ring",
+                                        "Which reduce_scatter algorithm is used. Can be locked down to choice of: 0 ignore, 1 non-overlapping (Reduce + Scatterv), 2 recursive halving, 3 ring, 4 butterfly",
                                         MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
                                         OPAL_INFO_LVL_5,
                                         MCA_BASE_VAR_SCOPE_ALL,
@@ -139,6 +140,8 @@ int ompi_coll_tuned_reduce_scatter_intra_do_this(const void *sbuf, void* rbuf,
                                                              dtype, op, comm, module);
     case (3): return ompi_coll_base_reduce_scatter_intra_ring(sbuf, rbuf, rcounts,
                                                               dtype, op, comm, module);
+    case (4): return ompi_coll_base_reduce_scatter_intra_butterfly(sbuf, rbuf, rcounts,
+                                                                   dtype, op, comm, module);
     }  /* switch */
     OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_scatter_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
                  algorithm, ompi_coll_tuned_forced_max_algorithms[REDUCESCATTER]));
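
Note (editorial, not part of the commit): to exercise the new path end to end,
the butterfly can be forced through the tuned component's parameters registered
above. A minimal correctness check follows; it assumes the standard MCA
parameter names coll_tuned_use_dynamic_rules and
coll_tuned_reduce_scatter_algorithm (the latter follows from the
"reduce_scatter_algorithm" registration in this patch) and that value 4 selects
the butterfly, per the enum above.

/*
 * Every rank contributes the same vector [0, 1, ..., p-1]; after the
 * reduce-scatter with MPI_SUM and rcounts[i] = 1, rank i must hold p * i.
 *
 * Build: mpicc reduce_scatter_test.c -o reduce_scatter_test
 * Run:   mpirun -np 6 --mca coll_tuned_use_dynamic_rules 1 \
 *               --mca coll_tuned_reduce_scatter_algorithm 4 ./reduce_scatter_test
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int *sendbuf = malloc(size * sizeof(int));
    int *rcounts = malloc(size * sizeof(int));
    for (int i = 0; i < size; i++) {
        sendbuf[i] = i;     /* same vector on every rank */
        rcounts[i] = 1;     /* rank i receives element i of the reduction */
    }
    int result = -1;
    MPI_Reduce_scatter(sendbuf, &result, rcounts, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    if (result != size * rank)
        printf("rank %d: got %d, expected %d\n", rank, result, size * rank);

    free(sendbuf);
    free(rcounts);
    MPI_Finalize();
    return 0;
}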