Merge pull request #5226 from mkurnosov/base-reduce-scatter-butterfly

coll/base: add butterfly algorithm for MPI_Reduce_scatter
Этот коммит содержится в:
Nathan Hjelm 2018-06-05 09:38:39 -06:00 коммит произвёл GitHub
родитель 8192eb9aa6 3adf96fdb8
Коммит 948e38d260
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 304 добавлений и 18 удалений

Просмотреть файл

@ -248,6 +248,7 @@ int ompi_coll_base_reduce_intra_redscat_gather(REDUCE_ARGS);
int ompi_coll_base_reduce_scatter_intra_nonoverlapping(REDUCESCATTER_ARGS);
int ompi_coll_base_reduce_scatter_intra_basic_recursivehalving(REDUCESCATTER_ARGS);
int ompi_coll_base_reduce_scatter_intra_ring(REDUCESCATTER_ARGS);
int ompi_coll_base_reduce_scatter_intra_butterfly(REDUCESCATTER_ARGS);
/* Reduce_scatter_block */
int ompi_coll_base_reduce_scatter_block_basic_linear(REDUCESCATTERBLOCK_ARGS);

Просмотреть файл

@ -36,6 +36,7 @@
#include "ompi/op/op.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"
* ompi_coll_base_reduce_scatter_intra_nonoverlapping
@ -620,3 +621,277 @@ ompi_coll_base_reduce_scatter_intra_ring( const void *sbuf, void *rbuf, const in
if (NULL != inbuf_free[1]) free(inbuf_free[1]);
return ret;
/*
 * ompi_sum_counts: Returns sum of counts [lo, hi]
 *                  lo, hi in {0, 1, ..., nprocs_pof2 - 1}
 *
 * counts:     per-rank element counts
 * displs:     per-rank displacements; the result is computed as
 *             displs[hi'] + counts[hi'] - displs[lo'], so displs is expected
 *             to hold the running (prefix) sum of counts
 * nprocs_rem: number of excluded processes; each of the first nprocs_rem
 *             blocks spans two consecutive ranks (2*i and 2*i + 1), every
 *             later block i maps to the single rank i + nprocs_rem
 * lo, hi:     first and last block index of the range (inclusive)
 *
 * Returns the number of elements covered by blocks lo..hi.
 */
static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
{
    /* Adjust lo and hi for taking into account blocks of excluded processes */
    lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
    /* A two-rank block ends at its odd rank, hence the "+ 1" */
    hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
    return displs[hi] + counts[hi] - displs[lo];
}
* ompi_coll_base_reduce_scatter_intra_butterfly
* Function: Butterfly algorithm for reduce_scatter
* Accepts: Same as MPI_Reduce_scatter
* Returns: MPI_SUCCESS or error code
* Description: Implements butterfly algorithm for MPI_Reduce_scatter [*].
* The algorithm can be used both by commutative and non-commutative
* operations, for power-of-two and non-power-of-two number of processes.
* [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
* with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
* Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
* where m = sum of rcounts[], p = comm_size
* Memory requirements (per process): 2 * m * typesize + comm_size
* Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
* Step 1. Reduce the number of processes to 4
* rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
* rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
* rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
* rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
* rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
* rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
* Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
* Round 1 (mask=1, nblocks=2)
* 0: vrank -1
* 1: vrank 0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
* 2: vrank -1
* 3: vrank 1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
* 4: vrank 2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
* 5: vrank 3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
* Round 2 (mask=2, nblocks=1)
* 0: vrank -1
* 1: vrank 0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
* 2: vrank -1
* 3: vrank 1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
* 4: vrank 2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
* 5: vrank 3 [**|**|8|10] : exch with 1: send [2], recv [3]: [**|**|*|30]
* Step 3. Exchange with remote process according to a mirror permutation:
* mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
* 0: vrank -1: recv "0" from process 1
* 1: vrank 0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
* 2: vrank -1: recv result "12" from process 4
* 3: vrank 1 [**|**|24|*]: send "24" to 4, recv "18" from 4
* 4: vrank 2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
* 5: vrank 3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
const void *sbuf, void *rbuf, const int *rcounts, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
int *displs = NULL, index;
ptrdiff_t span, gap, totalcount, extent;
int err = MPI_SUCCESS;
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
"coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
rank, comm_size));
if (comm_size < 2)
displs = malloc(sizeof(*displs) * comm_size);
if (NULL == displs) {
goto cleanup_and_return;
displs[0] = 0;
for (int i = 1; i < comm_size; i++) {
displs[i] = displs[i - 1] + rcounts[i - 1];
totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
ompi_datatype_type_extent(dtype, &extent);
span = opal_datatype_span(&dtype->super, totalcount, &gap);
tmpbuf[0] = malloc(span);
tmpbuf[1] = malloc(span);
if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
goto cleanup_and_return;
psend = tmpbuf[0] - gap;
precv = tmpbuf[1] - gap;
if (sbuf != MPI_IN_PLACE) {
err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, (char *)sbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
err = ompi_datatype_copy_content_same_ddt(dtype, totalcount, psend, rbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
* Step 1. Reduce the number of processes to the nearest lower power of two
* p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
* In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
* the input vector to their neighbor (rank + 1) and all the odd ranks recv
* the input vector and perform local reduction.
* The odd ranks (0 to 2r - 1) contain the reduction with the input
* vector on their neighbors (the even ranks). The first r odd
* processes and the p - 2r last processes are renumbered from
* 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
* rest of the algorithm.
/* Find nearest power-of-two less than or equal to comm_size */
int nprocs_pof2 = opal_next_poweroftwo(comm_size);
nprocs_pof2 >>= 1;
int nprocs_rem = comm_size - nprocs_pof2;
int log2_size = opal_cube_dim(nprocs_pof2);
int vrank = -1;
if (rank < 2 * nprocs_rem) {
if ((rank % 2) == 0) {
/* Even process */
err = MCA_PML_CALL(send(psend, totalcount, dtype, rank + 1,
if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
/* This process does not participate in the rest of the algorithm */
vrank = -1;
} else {
/* Odd process */
err = MCA_PML_CALL(recv(precv, totalcount, dtype, rank - 1,
if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
ompi_op_reduce(op, precv, psend, totalcount, dtype);
/* Adjust rank to be the bottom "remain" ranks */
vrank = rank / 2;
} else {
/* Adjust rank to show that the bottom "even remain" ranks dropped out */
vrank = rank - nprocs_rem;
if (vrank != -1) {
* Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
* block 0: rcounts[0] and rcounts[1] -- for process 0 and 1
* block 1: rcounts[2] and rcounts[3] -- for process 2 and 3
* ...
* block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
* block r: rcounts[r+r]
* block r+1: rcounts[r+r+1]
* ...
* block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
int vpeer = vrank ^ mask;
int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
nblocks /= 2;
if ((vrank & mask) == 0) {
/* Send the upper half of reduction buffer, recv the lower half */
send_index += nblocks;
} else {
/* Send the lower half of reduction buffer, recv the upper half */
recv_index += nblocks;
/* Send blocks: [send_index, send_index + nblocks - 1] */
int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
send_index, send_index + nblocks - 1);
index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
ptrdiff_t sdispl = displs[index];
/* Recv blocks: [recv_index, recv_index + nblocks - 1] */
int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
recv_index, recv_index + nblocks - 1);
index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
ptrdiff_t rdispl = displs[index];
err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
precv + (ptrdiff_t)rdispl * extent, recv_count,
comm, MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
if (vrank < vpeer) {
/* precv = psend <op> precv */
ompi_op_reduce(op, psend + (ptrdiff_t)rdispl * extent,
precv + (ptrdiff_t)rdispl * extent, recv_count, dtype);
char *p = psend;
psend = precv;
precv = p;
} else {
/* psend = precv <op> psend */
ompi_op_reduce(op, precv + (ptrdiff_t)rdispl * extent,
psend + (ptrdiff_t)rdispl * extent, recv_count, dtype);
send_index = recv_index;
* psend points to the result block [send_index]
* Exchange results with remote process according to a mirror permutation.
int vpeer = ompi_mirror_perm(vrank, log2_size);
int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
if (vpeer < nprocs_rem) {
* Process has two blocks: for excluded process and own.
* Send the first block to excluded process.
err = MCA_PML_CALL(send(psend + (ptrdiff_t)displs[index] * extent,
rcounts[index], dtype, peer - 1,
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* If process has two blocks, then send the second block (own block) */
if (vpeer < nprocs_rem)
if (vpeer != vrank) {
err = ompi_coll_base_sendrecv(psend + (ptrdiff_t)displs[index] * extent,
rcounts[index], dtype, peer,
rbuf, rcounts[rank], dtype, peer,
comm, MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
err = ompi_datatype_copy_content_same_ddt(dtype, rcounts[rank], rbuf,
psend + (ptrdiff_t)displs[rank] * extent);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
/* Excluded process: receive result */
int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
err = MCA_PML_CALL(recv(rbuf, rcounts[rank], dtype, peer,
if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
if (displs)
if (tmpbuf[0])
if (tmpbuf[1])
return err;

Просмотреть файл

@ -511,22 +511,6 @@ cleanup_and_return:
return err;
/*
 * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
 *                   of x [*].
 * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
 *     Chapter 7. Rearranging Bits and Bytes.
 *
 * x:     value whose low-order bits are mirrored
 * nbits: number of low-order bits to mirror
 *
 * The swap network below reverses a 32-bit word; the reversed word is then
 * shifted right so that only the mirrored nbits low-order bits remain.
 * Returns 0 when nbits <= 0; nbits larger than the word width is treated
 * as the full width.
 */
static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
{
    const int width = (int)(sizeof(x) * CHAR_BIT);

    /* Shifting by the full width (or more) is undefined behavior in C */
    if (nbits <= 0) {
        return 0;
    }
    if (nbits > width) {
        nbits = width;
    }

    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
    x = ((x >> 16) | (x << 16));
    return x >> (width - nbits);
}
static int ompi_coll_base_reduce_scatter_block_intra_butterfly_pof2(
const void *sbuf, void *rbuf, int rcount, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,

Просмотреть файл

@ -78,3 +78,18 @@ int ompi_coll_base_sendrecv_actual( const void* sendbuf, size_t scount,
return (err);
/*
 * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
 *                   of x [*].
 * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
 *     Chapter 7. Rearranging Bits and Bytes.
 *
 * x:     value whose low-order bits are mirrored
 * nbits: number of low-order bits to mirror
 *
 * The swap network below reverses a 32-bit word; the reversed word is then
 * shifted right so that only the mirrored nbits low-order bits remain.
 * Returns 0 when nbits <= 0; nbits larger than the word width is treated
 * as the full width.
 */
unsigned int ompi_mirror_perm(unsigned int x, int nbits)
{
    const int width = (int)(sizeof(x) * CHAR_BIT);

    /* Shifting by the full width (or more) is undefined behavior in C */
    if (nbits <= 0) {
        return 0;
    }
    if (nbits > width) {
        nbits = width;
    }

    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
    x = ((x >> 16) | (x << 16));
    return x >> (width - nbits);
}

Просмотреть файл

@ -70,5 +70,13 @@ ompi_coll_base_sendrecv( void* sendbuf, size_t scount, ompi_datatype_t* sdatatyp
source, rtag, comm, status);
* ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
* of x [*].
* [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
* Chapter 7. Rearranging Bits and Bytes.
unsigned int ompi_mirror_perm(unsigned int x, int nbits);

Просмотреть файл

@ -36,8 +36,9 @@ static int coll_tuned_reduce_scatter_chain_fanout;
static mca_base_var_enum_value_t reduce_scatter_algorithms[] = {
{0, "ignore"},
{1, "non-overlapping"},
{2, "recursive_halfing"},
{2, "recursive_halving"},
{3, "ring"},
{4, "butterfly"},
{0, NULL}
@ -76,7 +77,7 @@ int ompi_coll_tuned_reduce_scatter_intra_check_forced_init (coll_tuned_force_alg
mca_param_indices->algorithm_param_index =
"Which reduce reduce_scatter algorithm is used. Can be locked down to choice of: 0 ignore, 1 non-overlapping (Reduce + Scatterv), 2 recursive halving, 3 ring",
"Which reduce reduce_scatter algorithm is used. Can be locked down to choice of: 0 ignore, 1 non-overlapping (Reduce + Scatterv), 2 recursive halving, 3 ring, 4 butterfly",
@ -139,6 +140,8 @@ int ompi_coll_tuned_reduce_scatter_intra_do_this(const void *sbuf, void* rbuf,
dtype, op, comm, module);
case (3): return ompi_coll_base_reduce_scatter_intra_ring(sbuf, rbuf, rcounts,
dtype, op, comm, module);
case (4): return ompi_coll_base_reduce_scatter_intra_butterfly(sbuf, rbuf, rcounts,
dtype, op, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_scatter_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[REDUCESCATTER]));