1
1

Merge pull request #4985 from mkurnosov/spacc-scan-exscan

coll/spacc: Add recursive doubling algorithm for Scan and Exscan
Этот коммит содержится в:
Nathan Hjelm 2018-05-01 09:21:23 -06:00 коммит произвёл GitHub
родитель 591b174434 82a3a5bdb5
Коммит e9ef7aa256
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
23 изменённых файлов: 1529 добавлений и 1254 удалений

Просмотреть файл

@ -42,4 +42,6 @@ libmca_coll_la_SOURCES += \
base/coll_base_alltoallv.c \
base/coll_base_reduce.c \
base/coll_base_barrier.c \
base/coll_base_reduce_scatter.c
base/coll_base_reduce_scatter.c \
base/coll_base_exscan.c \
base/coll_base_scan.c

Просмотреть файл

@ -15,6 +15,8 @@
* reserved.
* Copyright (c) 2015-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2018 Siberian State University of Telecommunications
* and Information Science. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -907,5 +909,335 @@ ompi_coll_base_allreduce_intra_basic_linear(const void *sbuf, void *rbuf, int co
return ompi_coll_base_bcast_intra_basic_linear(rbuf, count, dtype, 0, comm, module);
}
/*
* ompi_coll_base_allreduce_intra_redscat_allgather
*
* Function: Allreduce using Rabenseifner's algorithm.
* Accepts: Same arguments as MPI_Allreduce
* Returns: MPI_SUCCESS or error code
*
* Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
* [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
* Optimization of Collective Communication Operations in MPICH //
* The Int. Journal of High Performance Computing Applications. Vol 19,
* Issue 1, pp. 49--66.
* [2] http://www.hlrs.de/mpi/myreduce.html.
*
* This algorithm is a combination of a reduce-scatter implemented with
* recursive vector halving and recursive distance doubling, followed either
* by an allgather implemented with recursive doubling [1].
*
* Step 1. If the number of processes is not a power of two, reduce it to
* the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
* by removing r = p - p' extra processes as follows. In the first 2r processes
* (ranks 0 to 2r - 1), all the even ranks send the second half of the input
* vector to their right neighbor (rank + 1), and all the odd ranks send
* the first half of the input vector to their left neighbor (rank - 1).
* The even ranks compute the reduction on the first half of the vector and
* the odd ranks compute the reduction on the second half. The odd ranks then
* send the result to their left neighbors (the even ranks). As a result,
* the even ranks among the first 2r processes now contain the reduction with
* the input vector on their right neighbors (the odd ranks). These odd ranks
* do not participate in the rest of the algorithm, which leaves behind
* a power-of-two number of processes. The first r even-ranked processes and
* the last p - 2r processes are now renumbered from 0 to p' - 1.
*
* Step 2. The remaining processes now perform a reduce-scatter by using
* recursive vector halving and recursive distance doubling. The even-ranked
* processes send the second half of their buffer to rank + 1 and the odd-ranked
* processes send the first half of their buffer to rank - 1. All processes
* then compute the reduction between the local buffer and the received buffer.
* In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
* distance is doubled. At the end, each of the p' processes has 1 / p' of the
* total reduction result.
*
* Step 3. An allgather is performed by using recursive vector doubling and
* distance halving. All exchanges are executed in reverse order relative
* to recursive doubling on previous step. If the number of processes is not
* a power of two, the total result vector must be sent to the r processes
* that were removed in the first step.
*
* Limitations:
* count >= 2^{\floor{\log_2 p}}
* commutative operations only
* intra-communicators only
*
* Memory requirements (per process):
* count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
*/
int ompi_coll_base_allreduce_intra_redscat_allgather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"coll:base:allreduce_intra_redscat_allgather: rank %d/%d",
rank, comm_size));
/* Find nearest power-of-two less than or equal to comm_size */
int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
assert(nsteps >= 0);
int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"coll:base:allreduce_intra_redscat_allgather: rank %d/%d "
"count %d switching to basic linear allreduce",
rank, comm_size, count));
return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
op, comm, module);
}
int err = MPI_SUCCESS;
ptrdiff_t lb, extent, dsize, gap = 0;
ompi_datatype_get_extent(dtype, &lb, &extent);
dsize = opal_datatype_span(&dtype->super, count, &gap);
/* Temporary buffer for receiving messages */
char *tmp_buf = NULL;
char *tmp_buf_raw = (char *)malloc(dsize);
if (NULL == tmp_buf_raw)
return OMPI_ERR_OUT_OF_RESOURCE;
tmp_buf = tmp_buf_raw - gap;
if (sbuf != MPI_IN_PLACE) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
(char *)sbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
/*
* Step 1. Reduce the number of processes to the nearest lower power of two
* p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
* 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
* the second half of the input vector to their right neighbor (rank + 1)
* and all the odd ranks send the first half of the input vector to their
* left neighbor (rank - 1).
* 2. All 2r processes compute the reduction on their half.
* 3. The odd ranks then send the result to their left neighbors
* (the even ranks).
*
* The even ranks (0 to 2r - 1) now contain the reduction with the input
* vector on their right neighbors (the odd ranks). The first r even
* processes and the p - 2r last processes are renumbered from
* 0 to 2^{\floor{\log_2 p}} - 1.
*/
int vrank, step, wsize;
int nprocs_rem = comm_size - nprocs_pof2;
if (rank < 2 * nprocs_rem) {
int count_lhalf = count / 2;
int count_rhalf = count - count_lhalf;
if (rank % 2 != 0) {
/*
* Odd process -- exchange with rank - 1
* Send the left half of the input vector to the left neighbor,
* Recv the right half of the input vector from the left neighbor
*/
err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
(char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
/* Send the right half to the left neighbor */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* This process does not pariticipate in recursive doubling phase */
vrank = -1;
} else {
/*
* Even process -- exchange with rank + 1
* Send the right half of the input vector to the right neighbor,
* Recv the left half of the input vector from the right neighbor
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
tmp_buf, count_lhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
/* Recv the right half from the right neighbor */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = rank / 2;
}
} else { /* rank >= 2 * nprocs_rem */
vrank = rank - nprocs_rem;
}
/*
* Step 2. Reduce-scatter implemented with recursive vector halving and
* recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
* power-of-two number of processes with new ranks (vrank) and result in rbuf.
*
* The even-ranked processes send the right half of their buffer to rank + 1
* and the odd-ranked processes send the left half of their buffer to
* rank - 1. All processes then compute the reduction between the local
* buffer and the received buffer. In the next \log_2(p') - 1 steps, the
* buffers are recursively halved, and the distance is doubled. At the end,
* each of the p' processes has 1 / p' of the total reduction result.
*/
rindex = malloc(sizeof(*rindex) * nsteps);
sindex = malloc(sizeof(*sindex) * nsteps);
rcount = malloc(sizeof(*rcount) * nsteps);
scount = malloc(sizeof(*scount) * nsteps);
if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
if (vrank != -1) {
step = 0;
wsize = count;
sindex[0] = rindex[0] = 0;
for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
/*
* On each iteration: rindex[step] = sindex[step] -- begining of the
* current window. Length of the current window is storded in wsize.
*/
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if (rank < dest) {
/*
* Recv into the left half of the current window, send the right
* half of the window to the peer (perform reduce on the left
* half of the current window)
*/
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
sindex[step] = rindex[step] + rcount[step];
} else {
/*
* Recv into the right half of the current window, send the left
* half of the window to the peer (perform reduce on the right
* half of the current window)
*/
scount[step] = wsize / 2;
rcount[step] = wsize - scount[step];
rindex[step] = sindex[step] + scount[step];
}
/* Send part of data from the rbuf, recv into the tmp_buf */
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
(char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype);
/* Move the current window to the received message */
if (step + 1 < nsteps) {
rindex[step + 1] = rindex[step];
sindex[step + 1] = rindex[step];
wsize = rcount[step];
step++;
}
}
/*
* Assertion: each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
*/
/*
* Step 3. Allgather by the recursive doubling algorithm.
* Each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
* All exchanges are executed in reverse order relative
* to recursive doubling (previous step).
*/
step = nsteps - 1;
for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
/*
* Send rcount[step] elements from rbuf[rindex[step]...]
* Recv scount[step] elements to rbuf[sindex[step]...]
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
step--;
}
}
/*
* Step 4. Send total result to excluded odd ranks.
*/
if (rank < 2 * nprocs_rem) {
if (rank % 2 != 0) {
/* Odd process -- recv result from rank - 1 */
err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
/* Even process -- send result to rank + 1 */
err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
}
cleanup_and_return:
if (NULL != tmp_buf_raw)
free(tmp_buf_raw);
if (NULL != rindex)
free(rindex);
if (NULL != sindex)
free(sindex);
if (NULL != rcount)
free(rcount);
if (NULL != scount)
free(scount);
return err;
}
/* copied function (with appropriate renaming) ends here */

223
ompi/mca/coll/base/coll_base_exscan.c Обычный файл
Просмотреть файл

@ -0,0 +1,223 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018 Siberian State University of Telecommunications
* and Information Science. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
/*
* ompi_coll_base_exscan_intra_linear
*
* Function: Linear algorithm for exclusive scan.
* Accepts: Same as MPI_Exscan
* Returns: MPI_SUCCESS or error code
*/
int
ompi_coll_base_exscan_intra_linear(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int size, rank, err;
ptrdiff_t dsize, gap;
char *free_buffer = NULL;
char *reduce_buffer = NULL;
rank = ompi_comm_rank(comm);
size = ompi_comm_size(comm);
/* For MPI_IN_PLACE, just adjust send buffer to point to
* receive buffer. */
if (MPI_IN_PLACE == sbuf) {
sbuf = rbuf;
}
/* If we're rank 0, then just send our sbuf to the next rank, and
* we are done. */
if (0 == rank) {
return MCA_PML_CALL(send(sbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_EXSCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
}
/* If we're the last rank, then just receive the result from the
* prior rank, and we are done. */
else if ((size - 1) == rank) {
return MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_EXSCAN, comm,
MPI_STATUS_IGNORE));
}
/* Otherwise, get the result from the prior rank, combine it with my
* data, and send it to the next rank */
/* Get a temporary buffer to perform the reduction into. Rationale
* for malloc'ing this size is provided in coll_basic_reduce.c. */
dsize = opal_datatype_span(&dtype->super, count, &gap);
free_buffer = (char*)malloc(dsize);
if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
reduce_buffer = free_buffer - gap;
err = ompi_datatype_copy_content_same_ddt(dtype, count,
reduce_buffer, (char*)sbuf);
/* Receive the reduced value from the prior rank */
err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_EXSCAN, comm, MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
goto error;
}
/* Now reduce the prior rank's result with my source buffer. The source
* buffer had been previously copied into the temporary reduce_buffer. */
ompi_op_reduce(op, rbuf, reduce_buffer, count, dtype);
/* Send my result off to the next rank */
err = MCA_PML_CALL(send(reduce_buffer, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_EXSCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
/* Error */
error:
free(free_buffer);
/* All done */
return err;
}
/*
* ompi_coll_base_exscan_intra_recursivedoubling
*
* Function: Recursive doubling algorithm for exclusive scan.
* Accepts: Same as MPI_Exscan
* Returns: MPI_SUCCESS or error code
*
* Description: Implements recursive doubling algorithm for MPI_Exscan.
* The algorithm preserves order of operations so it can
* be used both by commutative and non-commutative operations.
*
* Example for 5 processes and commutative operation MPI_SUM:
* Process: 0 1 2 3 4
* recvbuf: - - - - -
* psend: [0] [1] [2] [3] [4]
*
* Step 1:
* recvbuf: - [0] - [2] -
* psend: [1+0] [0+1] [3+2] [2+3] [4]
*
* Step 2:
* recvbuf: - [0] [1+0] [(0+1)+2] -
* psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4]
*
* Step 3:
* recvbuf: - [0] [1+0] [(0+1)+2] [(3+2)+(1+0)]
* psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4]
*
* Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma)
* Memory requirements (per process): 2 * count * typesize = O(count)
* Limitations: intra-communicators only
*/
int ompi_coll_base_exscan_intra_recursivedoubling(
const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int err = MPI_SUCCESS;
char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:exscan_intra_recursivedoubling: rank %d/%d",
rank, comm_size));
if (count == 0)
return MPI_SUCCESS;
if (comm_size < 2)
return MPI_SUCCESS;
ptrdiff_t dsize, gap;
dsize = opal_datatype_span(&datatype->super, count, &gap);
tmpsend_raw = malloc(dsize);
tmprecv_raw = malloc(dsize);
if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
char *psend = tmpsend_raw - gap;
char *precv = tmprecv_raw - gap;
if (sendbuf != MPI_IN_PLACE) {
err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, (char *)sendbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
int is_commute = ompi_op_is_commute(op);
int is_first_block = 1;
for (int mask = 1; mask < comm_size; mask <<= 1) {
int remote = rank ^ mask;
if (remote < comm_size) {
err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
MCA_COLL_BASE_TAG_EXSCAN,
precv, count, datatype, remote,
MCA_COLL_BASE_TAG_EXSCAN, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
if (rank > remote) {
/* Assertion: rank > 0 and rbuf is valid */
if (is_first_block) {
err = ompi_datatype_copy_content_same_ddt(datatype, count,
recvbuf, precv);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
is_first_block = 0;
} else {
/* Accumulate prefix reduction: recvbuf = precv <op> recvbuf */
ompi_op_reduce(op, precv, recvbuf, count, datatype);
}
/* Partial result: psend = precv <op> psend */
ompi_op_reduce(op, precv, psend, count, datatype);
} else {
if (is_commute) {
/* psend = precv <op> psend */
ompi_op_reduce(op, precv, psend, count, datatype);
} else {
/* precv = psend <op> precv */
ompi_op_reduce(op, psend, precv, count, datatype);
char *tmp = psend;
psend = precv;
precv = tmp;
}
}
}
}
cleanup_and_return:
if (NULL != tmpsend_raw)
free(tmpsend_raw);
if (NULL != tmprecv_raw)
free(tmprecv_raw);
return err;
}

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017 FUJITSU LIMITED. All rights reserved.
@ -182,6 +182,7 @@ int ompi_coll_base_allreduce_intra_recursivedoubling(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_ring(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_ring_segmented(ALLREDUCE_ARGS, uint32_t segsize);
int ompi_coll_base_allreduce_intra_basic_linear(ALLREDUCE_ARGS);
int ompi_coll_base_allreduce_intra_redscat_allgather(ALLREDUCE_ARGS);
/* AlltoAll */
int ompi_coll_base_alltoall_intra_pairwise(ALLTOALL_ARGS);
@ -222,6 +223,9 @@ int ompi_coll_base_bcast_intra_bintree(BCAST_ARGS, uint32_t segsize);
int ompi_coll_base_bcast_intra_split_bintree(BCAST_ARGS, uint32_t segsize);
/* Exscan */
int ompi_coll_base_exscan_intra_recursivedoubling(EXSCAN_ARGS);
int ompi_coll_base_exscan_intra_linear(EXSCAN_ARGS);
int ompi_coll_base_exscan_intra_recursivedoubling(EXSCAN_ARGS);
/* Gather */
int ompi_coll_base_gather_intra_basic_linear(GATHER_ARGS);
@ -238,6 +242,7 @@ int ompi_coll_base_reduce_intra_pipeline(REDUCE_ARGS, uint32_t segsize, int max_
int ompi_coll_base_reduce_intra_binary(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs );
int ompi_coll_base_reduce_intra_binomial(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs );
int ompi_coll_base_reduce_intra_in_order_binary(REDUCE_ARGS, uint32_t segsize, int max_outstanding_reqs );
int ompi_coll_base_reduce_intra_redscat_gather(REDUCE_ARGS);
/* Reduce_scatter */
int ompi_coll_base_reduce_scatter_intra_nonoverlapping(REDUCESCATTER_ARGS);
@ -245,6 +250,9 @@ int ompi_coll_base_reduce_scatter_intra_basic_recursivehalving(REDUCESCATTER_ARG
int ompi_coll_base_reduce_scatter_intra_ring(REDUCESCATTER_ARGS);
/* Scan */
int ompi_coll_base_scan_intra_recursivedoubling(SCAN_ARGS);
int ompi_coll_base_scan_intra_linear(SCAN_ARGS);
int ompi_coll_base_scan_intra_recursivedoubling(SCAN_ARGS);
/* Scatter */
int ompi_coll_base_scatter_intra_basic_linear(SCATTER_ARGS);

Просмотреть файл

@ -15,6 +15,8 @@
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2018 Siberian State University of Telecommunications
* and Information Science. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -25,6 +27,7 @@
#include "ompi_config.h"
#include "mpi.h"
#include "opal/util/bit_ops.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
@ -34,6 +37,7 @@
#include "ompi/op/op.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "coll_base_topo.h"
#include "coll_base_util.h"
int mca_coll_base_reduce_local(const void *inbuf, void *inoutbuf, int count,
struct ompi_datatype_t * dtype, struct ompi_op_t * op,
@ -706,3 +710,395 @@ ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count
return MPI_SUCCESS;
}
/*
* ompi_coll_base_reduce_intra_redscat_gather
*
* Function: Reduce using Rabenseifner's algorithm.
* Accepts: Same arguments as MPI_Reduce
* Returns: MPI_SUCCESS or error code
*
* Description: an implementation of Rabenseifner's reduce algorithm [1, 2].
* [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
* Optimization of Collective Communication Operations in MPICH //
* The Int. Journal of High Performance Computing Applications. Vol 19,
* Issue 1, pp. 49--66.
* [2] http://www.hlrs.de/mpi/myreduce.html.
*
* This algorithm is a combination of a reduce-scatter implemented with
* recursive vector halving and recursive distance doubling, followed either
* by a binomial tree gather [1].
*
* Step 1. If the number of processes is not a power of two, reduce it to
* the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
* by removing r = p - p' extra processes as follows. In the first 2r processes
* (ranks 0 to 2r - 1), all the even ranks send the second half of the input
* vector to their right neighbor (rank + 1), and all the odd ranks send
* the first half of the input vector to their left neighbor (rank - 1).
* The even ranks compute the reduction on the first half of the vector and
* the odd ranks compute the reduction on the second half. The odd ranks then
* send the result to their left neighbors (the even ranks). As a result,
* the even ranks among the first 2r processes now contain the reduction with
* the input vector on their right neighbors (the odd ranks). These odd ranks
* do not participate in the rest of the algorithm, which leaves behind
* a power-of-two number of processes. The first r even-ranked processes and
* the last p - 2r processes are now renumbered from 0 to p' - 1.
*
* Step 2. The remaining processes now perform a reduce-scatter by using
* recursive vector halving and recursive distance doubling. The even-ranked
* processes send the second half of their buffer to rank + 1 and the odd-ranked
* processes send the first half of their buffer to rank - 1. All processes
* then compute the reduction between the local buffer and the received buffer.
* In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
* distance is doubled. At the end, each of the p' processes has 1 / p' of the
* total reduction result.
*
* Step 3. A binomial tree gather is performed by using recursive vector
* doubling and distance halving. In the non-power-of-two case, if the root
* happens to be one of those odd-ranked processes that would normally
* be removed in the first step, then the role of this process and process 0
* are interchanged.
*
* Limitations:
* count >= 2^{\floor{\log_2 p}}
* commutative operations only
* intra-communicators only
*
* Memory requirements (per process):
* rank != root: 2 * count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
* rank == root: count * typesize + 4 * \log_2(p) * sizeof(int) = O(count)
*
* Recommendations: root = 0, otherwise it is required additional steps
* in the root process.
*/
int ompi_coll_base_reduce_intra_redscat_gather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"coll:base:reduce_intra_redscat_gather: rank %d/%d, root %d",
rank, comm_size, root));
/* Find nearest power-of-two less than or equal to comm_size */
int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
assert(nsteps >= 0);
int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"coll:base:reduce_intra_redscat_gather: rank %d/%d count %d "
"switching to basic linear reduce", rank, comm_size, count));
return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
op, root, comm, module);
}
int err = MPI_SUCCESS;
int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
ptrdiff_t lb, extent, dsize, gap;
ompi_datatype_get_extent(dtype, &lb, &extent);
dsize = opal_datatype_span(&dtype->super, count, &gap);
/* Temporary buffers */
char *tmp_buf_raw = NULL, *rbuf_raw = NULL;
tmp_buf_raw = malloc(dsize);
if (NULL == tmp_buf_raw) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
char *tmp_buf = tmp_buf_raw - gap;
if (rank != root) {
rbuf_raw = malloc(dsize);
if (NULL == rbuf_raw) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
rbuf = rbuf_raw - gap;
}
if ((rank != root) || (sbuf != MPI_IN_PLACE)) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, rbuf,
(char *)sbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
/*
* Step 1. Reduce the number of processes to the nearest lower power of two
* p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
* 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
* the second half of the input vector to their right neighbor (rank + 1)
* and all the odd ranks send the first half of the input vector to their
* left neighbor (rank - 1).
* 2. All 2r processes compute the reduction on their half.
* 3. The odd ranks then send the result to their left neighbors
* (the even ranks).
*
* The even ranks (0 to 2r - 1) now contain the reduction with the input
* vector on their right neighbors (the odd ranks). The first r even
* processes and the p - 2r last processes are renumbered from
* 0 to 2^{\floor{\log_2 p}} - 1. These odd ranks do not participate in the
* rest of the algorithm.
*/
int vrank, step, wsize;
int nprocs_rem = comm_size - nprocs_pof2;
if (rank < 2 * nprocs_rem) {
int count_lhalf = count / 2;
int count_rhalf = count - count_lhalf;
if (rank % 2 != 0) {
/*
* Odd process -- exchange with rank - 1
* Send the left half of the input vector to the left neighbor,
* Recv the right half of the input vector from the left neighbor
*/
err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE,
(char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
(char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
/* Send the right half to the left neighbor */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* This process does not pariticipate in recursive doubling phase */
vrank = -1;
} else {
/*
* Even process -- exchange with rank + 1
* Send the right half of the input vector to the right neighbor,
* Recv the left half of the input vector from the right neighbor
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE,
tmp_buf, count_lhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
/* Recv the right half from the right neighbor */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = rank / 2;
}
} else { /* rank >= 2 * nprocs_rem */
vrank = rank - nprocs_rem;
}
/*
* Step 2. Reduce-scatter implemented with recursive vector halving and
* recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
* power-of-two number of processes with new ranks (vrank) and result in rbuf.
*
* The even-ranked processes send the right half of their buffer to rank + 1
* and the odd-ranked processes send the left half of their buffer to
* rank - 1. All processes then compute the reduction between the local
* buffer and the received buffer. In the next \log_2(p') - 1 steps, the
* buffers are recursively halved, and the distance is doubled. At the end,
* each of the p' processes has 1 / p' of the total reduction result.
*/
rindex = malloc(sizeof(*rindex) * nsteps); /* O(\log_2(p)) */
sindex = malloc(sizeof(*sindex) * nsteps);
rcount = malloc(sizeof(*rcount) * nsteps);
scount = malloc(sizeof(*scount) * nsteps);
if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
if (vrank != -1) {
step = 0;
wsize = count;
sindex[0] = rindex[0] = 0;
for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
/*
* On each iteration: rindex[step] = sindex[step] -- begining of the
* current window. Length of the current window is storded in wsize.
*/
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if (rank < dest) {
/*
* Recv into the left half of the current window, send the right
* half of the window to the peer (perform reduce on the left
* half of the current window)
*/
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
sindex[step] = rindex[step] + rcount[step];
} else {
/*
* Recv into the right half of the current window, send the left
* half of the window to the peer (perform reduce on the right
* half of the current window)
*/
scount[step] = wsize / 2;
rcount[step] = wsize - scount[step];
rindex[step] = sindex[step] + scount[step];
}
/* Send part of data from the rbuf, recv into the tmp_buf */
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE,
(char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
(char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype);
/* Move the current window to the received message */
if (step + 1 < nsteps) {
rindex[step + 1] = rindex[step];
sindex[step + 1] = rindex[step];
wsize = rcount[step];
step++;
}
}
}
/*
* Assertion: each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
*/
/*
* Setup the root process for gather operation.
* Case 1: root < 2r and root is odd -- root process was excluded on step 1
* Recv data from process 0, vroot = 0, vrank = 0
* Case 2: root < 2r and root is even: vroot = root / 2
* Case 3: root >= 2r: vroot = root - r
*/
int vroot = 0;
if (root < 2 * nprocs_rem) {
if (root % 2 != 0) {
vroot = 0;
if (rank == root) {
/*
* Case 1: root < 2r and root is odd -- root process was
* excluded on step 1 (newrank == -1).
* Recv a data from the process 0.
*/
rindex[0] = 0;
step = 0, wsize = count;
for (int mask = 1; mask < nprocs_pof2; mask *= 2) {
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
rindex[step] = 0;
sindex[step] = rcount[step];
step++;
wsize /= 2;
}
err = MCA_PML_CALL(recv(rbuf, rcount[nsteps - 1], dtype, 0,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = 0;
} else if (vrank == 0) {
/* Send a data to the root */
err = MCA_PML_CALL(send(rbuf, rcount[nsteps - 1], dtype, root,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = -1;
}
} else {
/* Case 2: root < 2r and a root is even: vroot = root / 2 */
vroot = root / 2;
}
} else {
/* Case 3: root >= 2r: newroot = root - r */
vroot = root - nprocs_rem;
}
/*
* Step 3. Gather result at the vroot by the binomial tree algorithm.
* Each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
* All exchanges are executed in reverse order relative
* to recursive doubling (previous step).
*/
if (vrank != -1) {
int vdest_tree, vroot_tree;
step = nsteps - 1; /* step = ilog2(p') - 1 */
for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if ((vdest == 0) && (root < 2 * nprocs_rem) && (root % 2 != 0))
dest = root;
vdest_tree = vdest >> step;
vdest_tree <<= step;
vroot_tree = vroot >> step;
vroot_tree <<= step;
if (vdest_tree == vroot_tree) {
/* Send data from rbuf and exit */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
break;
} else {
/* Recv and continue */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
step--;
}
}
cleanup_and_return:
if (NULL != tmp_buf_raw)
free(tmp_buf_raw);
if (NULL != rbuf_raw)
free(rbuf_raw);
if (NULL != rindex)
free(rindex);
if (NULL != sindex)
free(sindex);
if (NULL != rcount)
free(rcount);
if (NULL != scount)
free(scount);
return err;
}

230
ompi/mca/coll/base/coll_base_scan.c Обычный файл
Просмотреть файл

@ -0,0 +1,230 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018 Siberian State University of Telecommunications
* and Information Science. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
/*
* ompi_coll_base_scan_intra_linear
*
* Function: Linear algorithm for inclusive scan.
* Accepts: Same as MPI_Scan
* Returns: MPI_SUCCESS or error code
*/
int
ompi_coll_base_scan_intra_linear(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int size, rank, err;
ptrdiff_t dsize, gap;
char *free_buffer = NULL;
char *pml_buffer = NULL;
/* Initialize */
rank = ompi_comm_rank(comm);
size = ompi_comm_size(comm);
/* If I'm rank 0, just copy into the receive buffer */
if (0 == rank) {
if (MPI_IN_PLACE != sbuf) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (MPI_SUCCESS != err) {
return err;
}
}
}
/* Otherwise receive previous buffer and reduce. */
else {
/* Allocate a temporary buffer. Rationale for this size is
* listed in coll_basic_reduce.c. Use this temporary buffer to
* receive into, later. */
dsize = opal_datatype_span(&dtype->super, count, &gap);
free_buffer = malloc(dsize);
if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = free_buffer - gap;
/* Copy the send buffer into the receive buffer. */
if (MPI_IN_PLACE != sbuf) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
}
/* Receive the prior answer */
err = MCA_PML_CALL(recv(pml_buffer, count, dtype,
rank - 1, MCA_COLL_BASE_TAG_SCAN, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
/* Perform the operation */
ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
/* All done */
if (NULL != free_buffer) {
free(free_buffer);
}
}
/* Send result to next process. */
if (rank < (size - 1)) {
return MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_SCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
}
/* All done */
return MPI_SUCCESS;
}
/*
* ompi_coll_base_scan_intra_recursivedoubling
*
* Function: Recursive doubling algorithm for inclusive scan.
* Accepts: Same as MPI_Scan
* Returns: MPI_SUCCESS or error code
*
* Description: Implements recursive doubling algorithm for MPI_Scan.
* The algorithm preserves order of operations so it can
* be used both by commutative and non-commutative operations.
*
* Example for 5 processes and commutative operation MPI_SUM:
* Process: 0 1 2 3 4
* recvbuf: [0] [1] [2] [3] [4]
* psend: [0] [1] [2] [3] [4]
*
* Step 1:
* recvbuf: [0] [0+1] [2] [2+3] [4]
* psend: [1+0] [0+1] [3+2] [2+3] [4]
*
* Step 2:
* recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [4]
* psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4]
*
* Step 3:
* recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [((3+2)+(1+0))+4]
* psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4]
*
* Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma)
* Memory requirements (per process): 2 * count * typesize = O(count)
* Limitations: intra-communicators only
*/
int ompi_coll_base_scan_intra_recursivedoubling(
const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int err = MPI_SUCCESS;
char *tmpsend_raw = NULL, *tmprecv_raw = NULL;
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"coll:base:scan_intra_recursivedoubling: rank %d/%d",
rank, comm_size));
if (count == 0)
return MPI_SUCCESS;
if (sendbuf != MPI_IN_PLACE) {
err = ompi_datatype_copy_content_same_ddt(datatype, count, recvbuf, (char *)sendbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
if (comm_size < 2)
return MPI_SUCCESS;
ptrdiff_t dsize, gap;
dsize = opal_datatype_span(&datatype->super, count, &gap);
tmpsend_raw = malloc(dsize);
tmprecv_raw = malloc(dsize);
if (NULL == tmpsend_raw || NULL == tmprecv_raw) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
char *psend = tmpsend_raw - gap;
char *precv = tmprecv_raw - gap;
err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
int is_commute = ompi_op_is_commute(op);
for (int mask = 1; mask < comm_size; mask <<= 1) {
int remote = rank ^ mask;
if (remote < comm_size) {
err = ompi_coll_base_sendrecv(psend, count, datatype, remote,
MCA_COLL_BASE_TAG_SCAN,
precv, count, datatype, remote,
MCA_COLL_BASE_TAG_SCAN, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
if (rank > remote) {
/* Accumulate prefix reduction: recvbuf = precv <op> recvbuf */
ompi_op_reduce(op, precv, recvbuf, count, datatype);
/* Partial result: psend = precv <op> psend */
ompi_op_reduce(op, precv, psend, count, datatype);
} else {
if (is_commute) {
/* psend = precv <op> psend */
ompi_op_reduce(op, precv, psend, count, datatype);
} else {
/* precv = psend <op> precv */
ompi_op_reduce(op, psend, precv, count, datatype);
char *tmp = psend;
psend = precv;
precv = tmp;
}
}
}
}
cleanup_and_return:
if (NULL != tmpsend_raw)
free(tmpsend_raw);
if (NULL != tmprecv_raw)
free(tmprecv_raw);
return err;
}

Просмотреть файл

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2011 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -48,72 +48,7 @@ mca_coll_basic_exscan_intra(const void *sbuf, void *rbuf, int count,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int size, rank, err;
ptrdiff_t dsize, gap;
char *free_buffer = NULL;
char *reduce_buffer = NULL;
rank = ompi_comm_rank(comm);
size = ompi_comm_size(comm);
/* For MPI_IN_PLACE, just adjust send buffer to point to
* receive buffer. */
if (MPI_IN_PLACE == sbuf) {
sbuf = rbuf;
}
/* If we're rank 0, then just send our sbuf to the next rank, and
* we are done. */
if (0 == rank) {
return MCA_PML_CALL(send(sbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_EXSCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
}
/* If we're the last rank, then just receive the result from the
* prior rank, and we are done. */
else if ((size - 1) == rank) {
return MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_EXSCAN, comm,
MPI_STATUS_IGNORE));
}
/* Otherwise, get the result from the prior rank, combine it with my
* data, and send it to the next rank */
/* Get a temporary buffer to perform the reduction into. Rationale
* for malloc'ing this size is provided in coll_basic_reduce.c. */
dsize = opal_datatype_span(&dtype->super, count, &gap);
free_buffer = (char*)malloc(dsize);
if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
reduce_buffer = free_buffer - gap;
err = ompi_datatype_copy_content_same_ddt(dtype, count,
reduce_buffer, (char*)sbuf);
/* Receive the reduced value from the prior rank */
err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_EXSCAN, comm, MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
goto error;
}
/* Now reduce the prior rank's result with my source buffer. The source
* buffer had been previously copied into the temporary reduce_buffer. */
ompi_op_reduce(op, rbuf, reduce_buffer, count, dtype);
/* Send my result off to the next rank */
err = MCA_PML_CALL(send(reduce_buffer, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_EXSCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
/* Error */
error:
free(free_buffer);
/* All done */
return err;
return ompi_coll_base_exscan_intra_linear(sbuf, rbuf, count, dtype, op, comm, module);
}

Просмотреть файл

@ -9,7 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -46,85 +46,5 @@ mca_coll_basic_scan_intra(const void *sbuf, void *rbuf, int count,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int size, rank, err;
ptrdiff_t dsize, gap;
char *free_buffer = NULL;
char *pml_buffer = NULL;
/* Initialize */
rank = ompi_comm_rank(comm);
size = ompi_comm_size(comm);
/* If I'm rank 0, just copy into the receive buffer */
if (0 == rank) {
if (MPI_IN_PLACE != sbuf) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (MPI_SUCCESS != err) {
return err;
}
}
}
/* Otherwise receive previous buffer and reduce. */
else {
/* Allocate a temporary buffer. Rationale for this size is
* listed in coll_basic_reduce.c. Use this temporary buffer to
* receive into, later. */
dsize = opal_datatype_span(&dtype->super, count, &gap);
free_buffer = malloc(dsize);
if (NULL == free_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
pml_buffer = free_buffer - gap;
/* Copy the send buffer into the receive buffer. */
if (MPI_IN_PLACE != sbuf) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)rbuf, (char*)sbuf);
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
}
/* Receive the prior answer */
err = MCA_PML_CALL(recv(pml_buffer, count, dtype,
rank - 1, MCA_COLL_BASE_TAG_SCAN, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
if (NULL != free_buffer) {
free(free_buffer);
}
return err;
}
/* Perform the operation */
ompi_op_reduce(op, pml_buffer, rbuf, count, dtype);
/* All done */
if (NULL != free_buffer) {
free(free_buffer);
}
}
/* Send result to next process. */
if (rank < (size - 1)) {
return MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_SCAN,
MCA_PML_BASE_SEND_STANDARD, comm));
}
/* All done */
return MPI_SUCCESS;
return ompi_coll_base_scan_intra_linear(sbuf, rbuf, count, dtype, op, comm, module);
}

Просмотреть файл

@ -1,37 +0,0 @@
#
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
coll_spacc.h \
coll_spacc_component.c \
coll_spacc_module.c \
coll_spacc_allreduce.c \
coll_spacc_reduce.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_ompi_coll_spacc_DSO
component_noinst =
component_install = mca_coll_spacc.la
else
component_noinst = libmca_coll_spacc.la
component_install =
endif
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_coll_spacc_la_SOURCES = $(sources)
mca_coll_spacc_la_LDFLAGS = -module -avoid-version
mca_coll_spacc_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la
noinst_LTLIBRARIES = $(component_noinst)
libmca_coll_spacc_la_SOURCES =$(sources)
libmca_coll_spacc_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,84 +0,0 @@
/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_COLL_SPACC_EXPORT_H
#define MCA_COLL_SPACC_EXPORT_H
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/mca/coll/coll.h"
BEGIN_C_DECLS
/* Globally exported variables */
extern int mca_coll_spacc_stream;
extern int mca_coll_spacc_priority;
extern int mca_coll_spacc_verbose;
/* API functions */
int mca_coll_spacc_init_query(bool enable_progress_threads,
bool enable_mpi_threads);
mca_coll_base_module_t
*mca_coll_spacc_comm_query(struct ompi_communicator_t *comm, int *priority);
int mca_coll_spacc_module_enable(mca_coll_base_module_t *module,
struct ompi_communicator_t *comm);
int mca_coll_spacc_allreduce_intra_redscat_allgather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
int mca_coll_spacc_reduce_intra_redscat_gather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);
/*
* coll API functions
*/
/* API functions */
int ompi_coll_spacc_init_query(bool enable_progress_threads,
bool enable_mpi_threads);
mca_coll_base_module_t *
ompi_coll_spacc_comm_query(struct ompi_communicator_t *comm, int *priority);
struct mca_coll_spacc_component_t {
/* Base coll component */
mca_coll_base_component_2_0_0_t super;
/* MCA parameter: priority of this component */
int spacc_priority;
/* global stuff that I need the component to store */
/* MCA parameters first */
};
/*
* Convenience typedef
*/
typedef struct mca_coll_spacc_component_t mca_coll_spacc_component_t;
/*
* Global component instance
*/
OMPI_MODULE_DECLSPEC extern mca_coll_spacc_component_t mca_coll_spacc_component;
struct mca_coll_spacc_module_t {
mca_coll_base_module_t super;
};
typedef struct mca_coll_spacc_module_t mca_coll_spacc_module_t;
OBJ_CLASS_DECLARATION(mca_coll_spacc_module_t);
#endif /* MCA_COLL_SPACC_EXPORT_H */

Просмотреть файл

@ -1,355 +0,0 @@
/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "coll_spacc.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "opal/util/bit_ops.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
/*
* mca_coll_spacc_allreduce_intra_redscat_gather
*
* Function: Allreduce using Rabenseifner's algorithm.
* Accepts: Same arguments as MPI_Allreduce
* Returns: MPI_SUCCESS or error code
*
* Description: an implementation of Rabenseifner's allreduce algorithm [1, 2].
* [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
* Optimization of Collective Communication Operations in MPICH //
* The Int. Journal of High Performance Computing Applications. Vol 19,
* Issue 1, pp. 49--66.
* [2] http://www.hlrs.de/mpi/myreduce.html.
*
* This algorithm is a combination of a reduce-scatter implemented with
* recursive vector halving and recursive distance doubling, followed either
* by an allgather implemented with recursive doubling [1].
*
* Step 1. If the number of processes is not a power of two, reduce it to
* the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
* by removing r = p - p' extra processes as follows. In the first 2r processes
* (ranks 0 to 2r - 1), all the even ranks send the second half of the input
* vector to their right neighbor (rank + 1), and all the odd ranks send
* the first half of the input vector to their left neighbor (rank - 1).
* The even ranks compute the reduction on the first half of the vector and
* the odd ranks compute the reduction on the second half. The odd ranks then
* send the result to their left neighbors (the even ranks). As a result,
* the even ranks among the first 2r processes now contain the reduction with
* the input vector on their right neighbors (the odd ranks). These odd ranks
* do not participate in the rest of the algorithm, which leaves behind
* a power-of-two number of processes. The first r even-ranked processes and
* the last p - 2r processes are now renumbered from 0 to p' - 1.
*
* Step 2. The remaining processes now perform a reduce-scatter by using
* recursive vector halving and recursive distance doubling. The even-ranked
* processes send the second half of their buffer to rank + 1 and the odd-ranked
* processes send the first half of their buffer to rank - 1. All processes
* then compute the reduction between the local buffer and the received buffer.
* In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
* distance is doubled. At the end, each of the p' processes has 1 / p' of the
* total reduction result.
*
* Step 3. An allgather is performed by using recursive vector doubling and
* distance halving. All exchanges are executed in reverse order relative
* to recursive doubling on previous step. If the number of processes is not
* a power of two, the total result vector must be sent to the r processes
* that were removed in the first step.
*
* Limitations:
* count >= 2^{\floor{\log_2 p}}
* commutative operations only
* intra-communicators only
*
* Memory requirements (per process):
* count * typesize + 4 * log_2(p) * sizeof(int) = O(count)
*/
int mca_coll_spacc_allreduce_intra_redscat_allgather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
opal_output_verbose(30, mca_coll_spacc_stream,
"coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d",
rank, comm_size);
/* Find nearest power-of-two less than or equal to comm_size */
int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
assert(nsteps >= 0);
int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
opal_output_verbose(20, mca_coll_spacc_stream,
"coll:spacc:allreduce_intra_redscat_allgather: rank %d/%d count %d switching to base allreduce",
rank, comm_size, count);
return ompi_coll_base_allreduce_intra_basic_linear(sbuf, rbuf, count, dtype,
op, comm, module);
}
int err = MPI_SUCCESS;
ptrdiff_t lb, extent, dsize, gap = 0;
ompi_datatype_get_extent(dtype, &lb, &extent);
dsize = opal_datatype_span(&dtype->super, count, &gap);
/* Temporary buffer for receiving messages */
char *tmp_buf = NULL;
char *tmp_buf_raw = (char *)malloc(dsize);
if (NULL == tmp_buf_raw)
return OMPI_ERR_OUT_OF_RESOURCE;
tmp_buf = tmp_buf_raw - gap;
if (sbuf != MPI_IN_PLACE) {
/* Copy sbuf to rbuf */
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
(char *)sbuf);
}
/*
* Step 1. Reduce the number of processes to the nearest lower power of two
* p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
* 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
* the second half of the input vector to their right neighbor (rank + 1)
* and all the odd ranks send the first half of the input vector to their
* left neighbor (rank - 1).
* 2. All 2r processes compute the reduction on their half.
* 3. The odd ranks then send the result to their left neighbors
* (the even ranks).
*
* The even ranks (0 to 2r - 1) now contain the reduction with the input
* vector on their right neighbors (the odd ranks). The first r even
* processes and the p - 2r last processes are renumbered from
* 0 to 2^{\floor{\log_2 p}} - 1.
*/
int vrank, step, wsize;
int nprocs_rem = comm_size - nprocs_pof2;
if (rank < 2 * nprocs_rem) {
int count_lhalf = count / 2;
int count_rhalf = count - count_lhalf;
if (rank % 2 != 0) {
/*
* Odd process -- exchange with rank - 1
* Send the left half of the input vector to the left neighbor,
* Recv the right half of the input vector from the left neighbor
*/
err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
(char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
/* Send the right half to the left neighbor */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* This process does not pariticipate in recursive doubling phase */
vrank = -1;
} else {
/*
* Even process -- exchange with rank + 1
* Send the right half of the input vector to the right neighbor,
* Recv the left half of the input vector from the right neighbor
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
tmp_buf, count_lhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
/* Recv the right half from the right neighbor */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = rank / 2;
}
} else { /* rank >= 2 * nprocs_rem */
vrank = rank - nprocs_rem;
}
/*
* Step 2. Reduce-scatter implemented with recursive vector halving and
* recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
* power-of-two number of processes with new ranks (vrank) and result in rbuf.
*
* The even-ranked processes send the right half of their buffer to rank + 1
* and the odd-ranked processes send the left half of their buffer to
* rank - 1. All processes then compute the reduction between the local
* buffer and the received buffer. In the next \log_2(p') - 1 steps, the
* buffers are recursively halved, and the distance is doubled. At the end,
* each of the p' processes has 1 / p' of the total reduction result.
*/
rindex = malloc(sizeof(*rindex) * nsteps);
sindex = malloc(sizeof(*sindex) * nsteps);
rcount = malloc(sizeof(*rcount) * nsteps);
scount = malloc(sizeof(*scount) * nsteps);
if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
if (vrank != -1) {
step = 0;
wsize = count;
sindex[0] = rindex[0] = 0;
for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
/*
* On each iteration: rindex[step] = sindex[step] -- begining of the
* current window. Length of the current window is storded in wsize.
*/
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if (rank < dest) {
/*
* Recv into the left half of the current window, send the right
* half of the window to the peer (perform reduce on the left
* half of the current window)
*/
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
sindex[step] = rindex[step] + rcount[step];
} else {
/*
* Recv into the right half of the current window, send the left
* half of the window to the peer (perform reduce on the right
* half of the current window)
*/
scount[step] = wsize / 2;
rcount[step] = wsize - scount[step];
rindex[step] = sindex[step] + scount[step];
}
/* Send part of data from the rbuf, recv into the tmp_buf */
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
(char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype);
/* Move the current window to the received message */
if (step + 1 < nsteps) {
rindex[step + 1] = rindex[step];
sindex[step + 1] = rindex[step];
wsize = rcount[step];
step++;
}
}
/*
* Assertion: each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
*/
/*
* Step 3. Allgather by the recursive doubling algorithm.
* Each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
* All exchanges are executed in reverse order relative
* to recursive doubling (previous step).
*/
step--;
for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
/*
* Send rcount[step] elements from rbuf[rindex[step]...]
* Recv scount[step] elements to rbuf[sindex[step]...]
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE,
(char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
step--;
}
}
/*
* Step 4. Send total result to excluded odd ranks.
*/
if (rank < 2 * nprocs_rem) {
if (rank % 2 != 0) {
/* Odd process -- recv result from rank - 1 */
err = MCA_PML_CALL(recv(rbuf, count, dtype, rank - 1,
MCA_COLL_BASE_TAG_ALLREDUCE, comm,
MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != err) { goto cleanup_and_return; }
} else {
/* Even process -- send result to rank + 1 */
err = MCA_PML_CALL(send(rbuf, count, dtype, rank + 1,
MCA_COLL_BASE_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
}
cleanup_and_return:
if (NULL != tmp_buf_raw)
free(tmp_buf_raw);
if (NULL != rindex)
free(rindex);
if (NULL != sindex)
free(sindex);
if (NULL != rcount)
free(rcount);
if (NULL != scount)
free(scount);
return err;
}

Просмотреть файл

@ -1,100 +0,0 @@
/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/mca/coll/coll.h"
#include "coll_spacc.h"
/*
* Public string showing the coll ompi_spacc component version number
*/
const char *ompi_coll_spacc_component_version_string =
"Open MPI SPACC collective MCA component version " OMPI_VERSION;
/*
* Global variable
*/
int mca_coll_spacc_priority = 5;
int mca_coll_spacc_stream = -1;
int mca_coll_spacc_verbose = 0;
/*
* Local function
*/
static int spacc_register(void);
static int spacc_open(void);
static int spacc_close(void);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
mca_coll_spacc_component_t mca_coll_spacc_component = {
/* First, fill in the super */
{
/* First, the mca_component_t struct containing meta information
about the component itself */
.collm_version = {
MCA_COLL_BASE_VERSION_2_0_0,
/* Component name and version */
.mca_component_name = "spacc",
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
OMPI_RELEASE_VERSION),
/* Component open and close functions */
.mca_open_component = spacc_open,
.mca_close_component = spacc_close,
.mca_register_component_params = spacc_register,
},
.collm_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
/* Initialization / querying functions */
.collm_init_query = ompi_coll_spacc_init_query,
.collm_comm_query = ompi_coll_spacc_comm_query,
}
};
static int spacc_register(void)
{
/* Use a low priority, but allow other components to be lower */
mca_coll_spacc_priority = 5;
(void)mca_base_component_var_register(&mca_coll_spacc_component.super.collm_version,
"priority", "Priority of the spacc coll component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_spacc_priority);
(void)mca_base_component_var_register(&mca_coll_spacc_component.super.collm_version,
"verbose", "Verbose level of the spacc coll component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_spacc_verbose);
return OMPI_SUCCESS;
}
static int spacc_open(void)
{
mca_coll_spacc_stream = opal_output_open(NULL);
opal_output_set_verbosity(mca_coll_spacc_stream, mca_coll_spacc_verbose);
opal_output_verbose(30, mca_coll_spacc_stream, "coll:spacc:component_open: done");
return OMPI_SUCCESS;
}
static int spacc_close(void)
{
opal_output_verbose(30, mca_coll_spacc_stream, "coll:spacc:component_close: done");
return OMPI_SUCCESS;
}

Просмотреть файл

@ -1,99 +0,0 @@
/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/coll.h"
#include "coll_spacc.h"
static int spacc_module_enable(mca_coll_base_module_t *module,
struct ompi_communicator_t *comm);
/*
* Initial query function that is invoked during MPI_INIT, allowing
* this component to disqualify itself if it doesn't support the
* required level of thread support.
*/
int ompi_coll_spacc_init_query(bool enable_progress_threads,
bool enable_mpi_threads)
{
return OMPI_SUCCESS;
}
/*
* Invoked when there's a new communicator that has been created.
* Look at the communicator and decide which set of functions and
* priority we want to return.
*/
mca_coll_base_module_t *ompi_coll_spacc_comm_query(
struct ompi_communicator_t *comm, int *priority)
{
mca_coll_spacc_module_t *spacc_module;
opal_output_verbose(30, mca_coll_spacc_stream, "coll:spacc:module_comm_query called");
if (OMPI_COMM_IS_INTER(comm)) {
opal_output_verbose(20, mca_coll_spacc_stream,
"coll:spacc:module_comm_query: spacc does not support inter-communicators");
*priority = 0;
return NULL;
}
if (OMPI_COMM_IS_INTRA(comm) && ompi_comm_size(comm) < 2) {
*priority = 0;
return NULL;
}
spacc_module = OBJ_NEW(mca_coll_spacc_module_t);
if (NULL == spacc_module)
return NULL;
*priority = mca_coll_spacc_priority;
spacc_module->super.coll_module_enable = spacc_module_enable;
spacc_module->super.ft_event = NULL;
spacc_module->super.coll_allgather = NULL;
spacc_module->super.coll_allgatherv = NULL;
spacc_module->super.coll_allreduce = mca_coll_spacc_allreduce_intra_redscat_allgather;
spacc_module->super.coll_alltoall = NULL;
spacc_module->super.coll_alltoallv = NULL;
spacc_module->super.coll_alltoallw = NULL;
spacc_module->super.coll_barrier = NULL;
spacc_module->super.coll_bcast = NULL;
spacc_module->super.coll_exscan = NULL;
spacc_module->super.coll_gather = NULL;
spacc_module->super.coll_gatherv = NULL;
spacc_module->super.coll_reduce = mca_coll_spacc_reduce_intra_redscat_gather;
spacc_module->super.coll_reduce_scatter_block = NULL;
spacc_module->super.coll_reduce_scatter = NULL;
spacc_module->super.coll_scan = NULL;
spacc_module->super.coll_scatter = NULL;
spacc_module->super.coll_scatterv = NULL;
return &(spacc_module->super);
}
/*
* Init module on the communicator
*/
static int spacc_module_enable(mca_coll_base_module_t *module,
struct ompi_communicator_t *comm)
{
opal_output_verbose(30, mca_coll_spacc_stream, "coll:spacc:module_enable called");
return OMPI_SUCCESS;
}
static void mca_coll_spacc_module_construct(mca_coll_spacc_module_t *module)
{
/* mca_coll_spacc_module_t *spacc_module = (mca_coll_spacc_module_t*)module; */
}
OBJ_CLASS_INSTANCE(mca_coll_spacc_module_t, mca_coll_base_module_t,
mca_coll_spacc_module_construct, NULL);

Просмотреть файл

@ -1,416 +0,0 @@
/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "coll_spacc.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "opal/util/bit_ops.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_functions.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/coll/base/coll_base_util.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
/*
* mca_coll_spacc_reduce_intra_redscat_gather
*
* Function: Reduce using Rabenseifner's algorithm.
* Accepts: Same arguments as MPI_Reduce
* Returns: MPI_SUCCESS or error code
*
* Description: an implementation of Rabenseifner's reduce algorithm [1, 2].
* [1] Rajeev Thakur, Rolf Rabenseifner and William Gropp.
* Optimization of Collective Communication Operations in MPICH //
* The Int. Journal of High Performance Computing Applications. Vol 19,
* Issue 1, pp. 49--66.
* [2] http://www.hlrs.de/mpi/myreduce.html.
*
* This algorithm is a combination of a reduce-scatter implemented with
* recursive vector halving and recursive distance doubling, followed either
* by a binomial tree gather [1].
*
* Step 1. If the number of processes is not a power of two, reduce it to
* the nearest lower power of two (p' = 2^{\floor{\log_2 p}})
* by removing r = p - p' extra processes as follows. In the first 2r processes
* (ranks 0 to 2r - 1), all the even ranks send the second half of the input
* vector to their right neighbor (rank + 1), and all the odd ranks send
* the first half of the input vector to their left neighbor (rank - 1).
* The even ranks compute the reduction on the first half of the vector and
* the odd ranks compute the reduction on the second half. The odd ranks then
* send the result to their left neighbors (the even ranks). As a result,
* the even ranks among the first 2r processes now contain the reduction with
* the input vector on their right neighbors (the odd ranks). These odd ranks
* do not participate in the rest of the algorithm, which leaves behind
* a power-of-two number of processes. The first r even-ranked processes and
* the last p - 2r processes are now renumbered from 0 to p' - 1.
*
* Step 2. The remaining processes now perform a reduce-scatter by using
* recursive vector halving and recursive distance doubling. The even-ranked
* processes send the second half of their buffer to rank + 1 and the odd-ranked
* processes send the first half of their buffer to rank - 1. All processes
* then compute the reduction between the local buffer and the received buffer.
* In the next log_2(p') - 1 steps, the buffers are recursively halved, and the
* distance is doubled. At the end, each of the p' processes has 1 / p' of the
* total reduction result.
*
* Step 3. A binomial tree gather is performed by using recursive vector
* doubling and distance halving. In the non-power-of-two case, if the root
* happens to be one of those odd-ranked processes that would normally
* be removed in the first step, then the role of this process and process 0
* are interchanged.
*
* Limitations:
* count >= 2^{\floor{\log_2 p}}
* commutative operations only
* intra-communicators only
*
* Memory requirements (per process):
* rank != root: 2 * count * typesize + 4 * log_2(p) * sizeof(int) = O(count)
* rank == root: count * typesize + 4 * log_2(p) * sizeof(int) = O(count)
*
* Recommendations: root = 0, otherwise it is required additional steps
* in the root process.
*/
int mca_coll_spacc_reduce_intra_redscat_gather(
const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int comm_size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
opal_output_verbose(30, mca_coll_spacc_stream,
"coll:spacc:reduce_intra_redscat_gather: rank %d/%d, root %d",
rank, comm_size, root);
/* Find nearest power-of-two less than or equal to comm_size */
int nsteps = opal_hibit(comm_size, comm->c_cube_dim + 1); /* ilog2(comm_size) */
assert(nsteps >= 0);
int nprocs_pof2 = 1 << nsteps; /* flp2(comm_size) */
if (count < nprocs_pof2 || !ompi_op_is_commute(op)) {
opal_output_verbose(20, mca_coll_spacc_stream,
"coll:spacc:reduce_intra_redscat_gather: rank %d/%d count %d switching to base reduce",
rank, comm_size, count);
return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype,
op, root, comm, module);
}
int err = MPI_SUCCESS;
int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL;
ptrdiff_t lb, extent, dsize, gap;
ompi_datatype_get_extent(dtype, &lb, &extent);
dsize = opal_datatype_span(&dtype->super, count, &gap);
/* Temporary buffer for receiving messages */
char *tmp_buf = NULL;
char *tmp_buf_raw = (char *)malloc(dsize);
if (NULL == tmp_buf_raw)
return OMPI_ERR_OUT_OF_RESOURCE;
tmp_buf = tmp_buf_raw - gap;
char *rbuf_raw = NULL;
if (rank != root) {
rbuf_raw = (char *)malloc(dsize);
if (NULL == rbuf_raw) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
rbuf = rbuf_raw - gap;
}
if ((rank != root) || (sbuf != MPI_IN_PLACE)) {
/* Copy sbuf to rbuf */
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char *)rbuf,
(char *)sbuf);
}
/*
* Step 1. Reduce the number of processes to the nearest lower power of two
* p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
* 1. In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
* the second half of the input vector to their right neighbor (rank + 1)
* and all the odd ranks send the first half of the input vector to their
* left neighbor (rank - 1).
* 2. All 2r processes compute the reduction on their half.
* 3. The odd ranks then send the result to their left neighbors
* (the even ranks).
*
* The even ranks (0 to 2r - 1) now contain the reduction with the input
* vector on their right neighbors (the odd ranks). The first r even
* processes and the p - 2r last processes are renumbered from
* 0 to 2^{\floor{\log_2 p}} - 1. These odd ranks do not participate in the
* rest of the algorithm.
*/
int vrank, step, wsize;
int nprocs_rem = comm_size - nprocs_pof2;
if (rank < 2 * nprocs_rem) {
int count_lhalf = count / 2;
int count_rhalf = count - count_lhalf;
if (rank % 2 != 0) {
/*
* Odd process -- exchange with rank - 1
* Send the left half of the input vector to the left neighbor,
* Recv the right half of the input vector from the left neighbor
*/
err = ompi_coll_base_sendrecv(rbuf, count_lhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE,
(char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)count_lhalf * extent,
(char *)rbuf + count_lhalf * extent, count_rhalf, dtype);
/* Send the right half to the left neighbor */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank - 1,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* This process does not pariticipate in recursive doubling phase */
vrank = -1;
} else {
/*
* Even process -- exchange with rank + 1
* Send the right half of the input vector to the right neighbor,
* Recv the left half of the input vector from the right neighbor
*/
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE,
tmp_buf, count_lhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Reduce on the right half of the buffers (result in rbuf) */
ompi_op_reduce(op, tmp_buf, rbuf, count_lhalf, dtype);
/* Recv the right half from the right neighbor */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)count_lhalf * extent,
count_rhalf, dtype, rank + 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = rank / 2;
}
} else { /* rank >= 2 * nprocs_rem */
vrank = rank - nprocs_rem;
}
/*
* Step 2. Reduce-scatter implemented with recursive vector halving and
* recursive distance doubling. We have p' = 2^{\floor{\log_2 p}}
* power-of-two number of processes with new ranks (vrank) and result in rbuf.
*
* The even-ranked processes send the right half of their buffer to rank + 1
* and the odd-ranked processes send the left half of their buffer to
* rank - 1. All processes then compute the reduction between the local
* buffer and the received buffer. In the next \log_2(p') - 1 steps, the
* buffers are recursively halved, and the distance is doubled. At the end,
* each of the p' processes has 1 / p' of the total reduction result.
*/
rindex = malloc(sizeof(*rindex) * nsteps); /* O(\log_2(p)) */
sindex = malloc(sizeof(*sindex) * nsteps);
rcount = malloc(sizeof(*rcount) * nsteps);
scount = malloc(sizeof(*scount) * nsteps);
if (NULL == rindex || NULL == sindex || NULL == rcount || NULL == scount) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup_and_return;
}
if (vrank != -1) {
step = 0;
wsize = count;
sindex[0] = rindex[0] = 0;
for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
/*
* On each iteration: rindex[step] = sindex[step] -- begining of the
* current window. Length of the current window is storded in wsize.
*/
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if (rank < dest) {
/*
* Recv into the left half of the current window, send the right
* half of the window to the peer (perform reduce on the left
* half of the current window)
*/
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
sindex[step] = rindex[step] + rcount[step];
} else {
/*
* Recv into the right half of the current window, send the left
* half of the window to the peer (perform reduce on the right
* half of the current window)
*/
scount[step] = wsize / 2;
rcount[step] = wsize - scount[step];
rindex[step] = sindex[step] + scount[step];
}
/* Send part of data from the rbuf, recv into the tmp_buf */
err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE,
(char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE, rank);
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
/* Local reduce: rbuf[] = tmp_buf[] <op> rbuf[] */
ompi_op_reduce(op, (char *)tmp_buf + (ptrdiff_t)rindex[step] * extent,
(char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype);
/* Move the current window to the received message */
if (step + 1 < nsteps) {
rindex[step + 1] = rindex[step];
sindex[step + 1] = rindex[step];
wsize = rcount[step];
step++;
}
}
}
/*
* Assertion: each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
*/
/*
* Setup the root process for gather operation.
* Case 1: root < 2r and root is odd -- root process was excluded on step 1
* Recv data from process 0, vroot = 0, vrank = 0
* Case 2: root < 2r and root is even: vroot = root / 2
* Case 3: root >= 2r: vroot = root - r
*/
int vroot = 0;
if (root < 2 * nprocs_rem) {
if (root % 2 != 0) {
vroot = 0;
if (rank == root) {
/*
* Case 1: root < 2r and root is odd -- root process was
* excluded on step 1 (newrank == -1).
* Recv a data from the process 0.
*/
rindex[0] = 0;
step = 0, wsize = count;
for (int mask = 1; mask < nprocs_pof2; mask *= 2) {
rcount[step] = wsize / 2;
scount[step] = wsize - rcount[step];
rindex[step] = 0;
sindex[step] = rcount[step];
step++;
wsize /= 2;
}
err = MCA_PML_CALL(recv(rbuf, rcount[nsteps - 1], dtype, 0,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = 0;
} else if (vrank == 0) {
/* Send a data to the root */
err = MCA_PML_CALL(send(rbuf, rcount[nsteps - 1], dtype, root,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
vrank = -1;
}
} else {
/* Case 2: root < 2r and a root is even: vroot = root / 2 */
vroot = root / 2;
}
} else {
/* Case 3: root >= 2r: newroot = root - r */
vroot = root - nprocs_rem;
}
/*
* Step 3. Gather result at the vroot by the binomial tree algorithm.
* Each process has 1 / p' of the total reduction result:
* rcount[nsteps - 1] elements in the rbuf[rindex[nsteps - 1], ...].
* All exchanges are executed in reverse order relative
* to recursive doubling (previous step).
*/
if (vrank != -1) {
int vdest_tree, vroot_tree;
step = nsteps - 1; /* step = ilog2(p') - 1 */
for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) {
int vdest = vrank ^ mask;
/* Translate vdest virtual rank to real rank */
int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem;
if ((vdest == 0) && (root < 2 * nprocs_rem) && (root % 2 != 0))
dest = root;
vdest_tree = vdest >> step;
vdest_tree <<= step;
vroot_tree = vroot >> step;
vroot_tree <<= step;
if (vdest_tree == vroot_tree) {
/* Send data from rbuf and exit */
err = MCA_PML_CALL(send((char *)rbuf + (ptrdiff_t)rindex[step] * extent,
rcount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
break;
} else {
/* Recv and continue */
err = MCA_PML_CALL(recv((char *)rbuf + (ptrdiff_t)sindex[step] * extent,
scount[step], dtype, dest,
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) { goto cleanup_and_return; }
}
step--;
}
}
cleanup_and_return:
if (NULL != tmp_buf_raw)
free(tmp_buf_raw);
if (NULL != rbuf_raw)
free(rbuf_raw);
if (NULL != rindex)
free(rindex);
if (NULL != sindex)
free(sindex);
if (NULL != rcount)
free(rcount);
if (NULL != scount)
free(scount);
return err;
}

Просмотреть файл

@ -11,6 +11,8 @@
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2017 IBM Corporation. All rights reserved.
# Copyright (c) 2018 Research Organization for Information Science
# and Technology (RIST). All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -38,7 +40,9 @@ sources = \
coll_tuned_reduce_decision.c \
coll_tuned_bcast_decision.c \
coll_tuned_reduce_scatter_decision.c \
coll_tuned_scatter_decision.c
coll_tuned_scatter_decision.c \
coll_tuned_exscan_decision.c \
coll_tuned_scan_decision.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

Просмотреть файл

@ -3,8 +3,8 @@
* Copyright (c) 2004-2015 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -156,6 +156,18 @@ int ompi_coll_tuned_scatter_intra_dec_dynamic(SCATTER_ARGS);
int ompi_coll_tuned_scatter_intra_do_this(SCATTER_ARGS, int algorithm, int faninout, int segsize);
int ompi_coll_tuned_scatter_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
/* Exscan */
int ompi_coll_tuned_exscan_intra_dec_fixed(EXSCAN_ARGS);
int ompi_coll_tuned_exscan_intra_dec_dynamic(EXSCAN_ARGS);
int ompi_coll_tuned_exscan_intra_do_this(EXSCAN_ARGS, int algorithm);
int ompi_coll_tuned_exscan_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
/* Scan */
int ompi_coll_tuned_scan_intra_dec_fixed(SCAN_ARGS);
int ompi_coll_tuned_scan_intra_dec_dynamic(SCAN_ARGS);
int ompi_coll_tuned_scan_intra_do_this(SCAN_ARGS, int algorithm);
int ompi_coll_tuned_scan_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices);
int mca_coll_tuned_ft_event(int state);
struct mca_coll_tuned_component_t {

Просмотреть файл

@ -3,8 +3,8 @@
* Copyright (c) 2004-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -41,6 +41,7 @@ static mca_base_var_enum_value_t allreduce_algorithms[] = {
{3, "recursive_doubling"},
{4, "ring"},
{5, "segmented_ring"},
{6, "rabenseifner"},
{0, NULL}
};
@ -142,6 +143,8 @@ int ompi_coll_tuned_allreduce_intra_do_this(const void *sbuf, void *rbuf, int co
return ompi_coll_base_allreduce_intra_ring(sbuf, rbuf, count, dtype, op, comm, module);
case (5):
return ompi_coll_base_allreduce_intra_ring_segmented(sbuf, rbuf, count, dtype, op, comm, module, segsize);
case (6):
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:allreduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[ALLREDUCE]));

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -189,6 +189,8 @@ static int tuned_register(void)
ompi_coll_tuned_reduce_scatter_intra_check_forced_init(&ompi_coll_tuned_forced_params[REDUCESCATTER]);
ompi_coll_tuned_gather_intra_check_forced_init(&ompi_coll_tuned_forced_params[GATHER]);
ompi_coll_tuned_scatter_intra_check_forced_init(&ompi_coll_tuned_forced_params[SCATTER]);
ompi_coll_tuned_exscan_intra_check_forced_init(&ompi_coll_tuned_forced_params[EXSCAN]);
ompi_coll_tuned_scan_intra_check_forced_init(&ompi_coll_tuned_forced_params[SCAN]);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
@ -610,3 +610,89 @@ int ompi_coll_tuned_scatter_intra_dec_dynamic(const void *sbuf, int scount,
rbuf, rcount, rdtype,
root, comm, module);
}
int ompi_coll_tuned_exscan_intra_dec_dynamic(const void *sbuf, void* rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
OPAL_OUTPUT((ompi_coll_tuned_stream,
"ompi_coll_tuned_exscan_intra_dec_dynamic"));
/**
* check to see if we have some filebased rules.
*/
if (tuned_module->com_rules[EXSCAN]) {
int comsize, alg, faninout, segsize, max_requests;
size_t dsize;
comsize = ompi_comm_size(comm);
ompi_datatype_type_size (dtype, &dsize);
dsize *= comsize;
alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[EXSCAN],
dsize, &faninout, &segsize, &max_requests);
if (alg) {
/* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_exscan_intra_do_this (sbuf, rbuf, count, dtype,
op, comm, module,
alg);
} /* found a method */
} /*end if any com rules to check */
if (tuned_module->user_forced[EXSCAN].algorithm) {
return ompi_coll_tuned_exscan_intra_do_this(sbuf, rbuf, count, dtype,
op, comm, module,
tuned_module->user_forced[EXSCAN].algorithm);
}
return ompi_coll_base_exscan_intra_linear(sbuf, rbuf, count, dtype,
op, comm, module);
}
int ompi_coll_tuned_scan_intra_dec_dynamic(const void *sbuf, void* rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
OPAL_OUTPUT((ompi_coll_tuned_stream,
"ompi_coll_tuned_scan_intra_dec_dynamic"));
/**
* check to see if we have some filebased rules.
*/
if (tuned_module->com_rules[SCAN]) {
int comsize, alg, faninout, segsize, max_requests;
size_t dsize;
comsize = ompi_comm_size(comm);
ompi_datatype_type_size (dtype, &dsize);
dsize *= comsize;
alg = ompi_coll_tuned_get_target_method_params (tuned_module->com_rules[SCAN],
dsize, &faninout, &segsize, &max_requests);
if (alg) {
/* we have found a valid choice from the file based rules for this message size */
return ompi_coll_tuned_scan_intra_do_this (sbuf, rbuf, count, dtype,
op, comm, module,
alg);
} /* found a method */
} /*end if any com rules to check */
if (tuned_module->user_forced[SCAN].algorithm) {
return ompi_coll_tuned_scan_intra_do_this(sbuf, rbuf, count, dtype,
op, comm, module,
tuned_module->user_forced[SCAN].algorithm);
}
return ompi_coll_base_scan_intra_linear(sbuf, rbuf, count, dtype,
op, comm, module);
}

Просмотреть файл

@ -0,0 +1,104 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_topo.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
/* exscan algorithm variables */
static int coll_tuned_exscan_forced_algorithm = 0;
/* valid values for coll_tuned_exscan_forced_algorithm */
static mca_base_var_enum_value_t exscan_algorithms[] = {
{0, "ignore"},
{1, "linear"},
{2, "recursive_doubling"},
{0, NULL}
};
/**
* The following are used by dynamic and forced rules
*
* publish details of each algorithm and if its forced/fixed/locked in
* as you add methods/algorithms you must update this and the query/map routines
*
* this routine is called by the component only
* this makes sure that the mca parameters are set to their initial values and
* perms module does not call this they call the forced_getvalues routine
* instead.
*/
int ompi_coll_tuned_exscan_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
mca_base_var_enum_t*new_enum;
int cnt;
for( cnt = 0; NULL != exscan_algorithms[cnt].string; cnt++ );
ompi_coll_tuned_forced_max_algorithms[EXSCAN] = cnt;
(void) mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"exscan_algorithm_count",
"Number of exscan algorithms available",
MCA_BASE_VAR_TYPE_INT, NULL, 0,
MCA_BASE_VAR_FLAG_DEFAULT_ONLY,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_CONSTANT,
&ompi_coll_tuned_forced_max_algorithms[EXSCAN]);
/* MPI_T: This variable should eventually be bound to a communicator */
coll_tuned_exscan_forced_algorithm = 0;
(void) mca_base_var_enum_create("coll_tuned_exscan_algorithms", exscan_algorithms, &new_enum);
mca_param_indices->algorithm_param_index =
mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"exscan_algorithm",
"Which exscan algorithm is used. Can be locked down to choice of: 0 ignore, 1 linear, 2 recursive_doubling",
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_ALL,
&coll_tuned_exscan_forced_algorithm);
OBJ_RELEASE(new_enum);
if (mca_param_indices->algorithm_param_index < 0) {
return mca_param_indices->algorithm_param_index;
}
return (MPI_SUCCESS);
}
int ompi_coll_tuned_exscan_intra_do_this(const void *sbuf, void* rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module,
int algorithm)
{
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:exscan_intra_do_this selected algorithm %d",
algorithm));
switch (algorithm) {
case (0):
case (1): return ompi_coll_base_exscan_intra_linear(sbuf, rbuf, count, dtype,
op, comm, module);
case (2): return ompi_coll_base_exscan_intra_recursivedoubling(sbuf, rbuf, count, dtype,
op, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:exscan_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[EXSCAN]));
return (MPI_ERR_ARG);
}

Просмотреть файл

@ -11,6 +11,8 @@
* All rights reserved.
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -229,7 +231,7 @@ tuned_module_enable( mca_coll_base_module_t *module,
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, BCAST,
tuned_module->super.coll_bcast = ompi_coll_tuned_bcast_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, EXSCAN,
tuned_module->super.coll_exscan = NULL);
tuned_module->super.coll_exscan = ompi_coll_tuned_exscan_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, GATHER,
tuned_module->super.coll_gather = ompi_coll_tuned_gather_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, GATHERV,
@ -239,7 +241,7 @@ tuned_module_enable( mca_coll_base_module_t *module,
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, REDUCESCATTER,
tuned_module->super.coll_reduce_scatter = ompi_coll_tuned_reduce_scatter_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, SCAN,
tuned_module->super.coll_scan = NULL);
tuned_module->super.coll_scan = ompi_coll_tuned_scan_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, SCATTER,
tuned_module->super.coll_scatter = ompi_coll_tuned_scatter_intra_dec_dynamic);
COLL_TUNED_EXECUTE_IF_DYNAMIC(tuned_module, SCATTERV,

Просмотреть файл

@ -3,8 +3,8 @@
* Copyright (c) 2004-2017 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -41,6 +41,7 @@ static mca_base_var_enum_value_t reduce_algorithms[] = {
{4, "binary"},
{5, "binomial"},
{6, "in-order_binary"},
{7, "rabenseifner"},
{0, NULL}
};
@ -79,7 +80,7 @@ int ompi_coll_tuned_reduce_intra_check_forced_init (coll_tuned_force_algorithm_m
mca_param_indices->algorithm_param_index =
mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"reduce_algorithm",
"Which reduce algorithm is used. Can be locked down to choice of: 0 ignore, 1 linear, 2 chain, 3 pipeline, 4 binary, 5 binomial, 6 in-order binary",
"Which reduce algorithm is used. Can be locked down to choice of: 0 ignore, 1 linear, 2 chain, 3 pipeline, 4 binary, 5 binomial, 6 in-order binary, 7 rabenseifner",
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_ALL,
@ -173,6 +174,8 @@ int ompi_coll_tuned_reduce_intra_do_this(const void *sbuf, void* rbuf, int count
case (6): return ompi_coll_base_reduce_intra_in_order_binary(sbuf, rbuf, count, dtype,
op, root, comm, module,
segsize, max_requests);
case (7): return ompi_coll_base_reduce_intra_redscat_gather(sbuf, rbuf, count, dtype,
op, root, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:reduce_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[REDUCE]));

Просмотреть файл

@ -0,0 +1,104 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/coll_base_topo.h"
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/op/op.h"
#include "coll_tuned.h"
/* scan algorithm variables */
static int coll_tuned_scan_forced_algorithm = 0;
/* valid values for coll_tuned_scan_forced_algorithm */
static mca_base_var_enum_value_t scan_algorithms[] = {
{0, "ignore"},
{1, "linear"},
{2, "recursive_doubling"},
{0, NULL}
};
/**
* The following are used by dynamic and forced rules
*
* publish details of each algorithm and if its forced/fixed/locked in
* as you add methods/algorithms you must update this and the query/map routines
*
* this routine is called by the component only
* this makes sure that the mca parameters are set to their initial values and
* perms module does not call this they call the forced_getvalues routine
* instead.
*/
int ompi_coll_tuned_scan_intra_check_forced_init (coll_tuned_force_algorithm_mca_param_indices_t *mca_param_indices)
{
mca_base_var_enum_t*new_enum;
int cnt;
for( cnt = 0; NULL != scan_algorithms[cnt].string; cnt++ );
ompi_coll_tuned_forced_max_algorithms[SCAN] = cnt;
(void) mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"scan_algorithm_count",
"Number of scan algorithms available",
MCA_BASE_VAR_TYPE_INT, NULL, 0,
MCA_BASE_VAR_FLAG_DEFAULT_ONLY,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_CONSTANT,
&ompi_coll_tuned_forced_max_algorithms[SCAN]);
/* MPI_T: This variable should eventually be bound to a communicator */
coll_tuned_scan_forced_algorithm = 0;
(void) mca_base_var_enum_create("coll_tuned_scan_algorithms", scan_algorithms, &new_enum);
mca_param_indices->algorithm_param_index =
mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"scan_algorithm",
"Which scan algorithm is used. Can be locked down to choice of: 0 ignore, 1 linear, 2 recursive_doubling",
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_ALL,
&coll_tuned_scan_forced_algorithm);
OBJ_RELEASE(new_enum);
if (mca_param_indices->algorithm_param_index < 0) {
return mca_param_indices->algorithm_param_index;
}
return (MPI_SUCCESS);
}
int ompi_coll_tuned_scan_intra_do_this(const void *sbuf, void* rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module,
int algorithm)
{
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:scan_intra_do_this selected algorithm %d",
algorithm));
switch (algorithm) {
case (0):
case (1): return ompi_coll_base_scan_intra_linear(sbuf, rbuf, count, dtype,
op, comm, module);
case (2): return ompi_coll_base_scan_intra_recursivedoubling(sbuf, rbuf, count, dtype,
op, comm, module);
} /* switch */
OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:scan_intra_do_this attempt to select algorithm %d when only 0-%d is valid?",
algorithm, ompi_coll_tuned_forced_max_algorithms[SCAN]));
return (MPI_ERR_ARG);
}