From 50ec214d42f40ef5ef7bee5a570d4f71441b807e Mon Sep 17 00:00:00 2001 From: Mikhail Kurnosov Date: Fri, 30 Mar 2018 10:12:51 +0700 Subject: [PATCH] Add recursive doubling algorithm for MPI_Scan and MPI_Exscan to coll/base Signed-off-by: Mikhail Kurnosov --- ompi/mca/coll/base/Makefile.am | 4 +- ompi/mca/coll/base/coll_base_exscan.c | 138 +++++++++++++++++++++++ ompi/mca/coll/base/coll_base_functions.h | 2 + ompi/mca/coll/base/coll_base_scan.c | 130 +++++++++++++++++++++ 4 files changed, 273 insertions(+), 1 deletion(-) create mode 100644 ompi/mca/coll/base/coll_base_exscan.c create mode 100644 ompi/mca/coll/base/coll_base_scan.c diff --git a/ompi/mca/coll/base/Makefile.am b/ompi/mca/coll/base/Makefile.am index 21c144bf78..7d95406597 100644 --- a/ompi/mca/coll/base/Makefile.am +++ b/ompi/mca/coll/base/Makefile.am @@ -42,4 +42,6 @@ libmca_coll_la_SOURCES += \ base/coll_base_alltoallv.c \ base/coll_base_reduce.c \ base/coll_base_barrier.c \ - base/coll_base_reduce_scatter.c + base/coll_base_reduce_scatter.c \ + base/coll_base_exscan.c \ + base/coll_base_scan.c diff --git a/ompi/mca/coll/base/coll_base_exscan.c b/ompi/mca/coll/base/coll_base_exscan.c new file mode 100644 index 0000000000..762ae044f2 --- /dev/null +++ b/ompi/mca/coll/base/coll_base_exscan.c @@ -0,0 +1,138 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2018 Siberian State University of Telecommunications + * and Information Science. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/op/op.h" + +/* + * mca_coll_base_exscan_intra_recursivedoubling + * + * Function: Recursive doubling algorithm for exclusive scan. + * Accepts: Same as MPI_Exscan + * Returns: MPI_SUCCESS or error code + * + * Description: Implements recursive doubling algorithm for MPI_Exscan. + * The algorithm preserves order of operations so it can + * be used both by commutative and non-commutative operations. + * + * Example for 5 processes and commutative operation MPI_SUM: + * Process: 0 1 2 3 4 + * recvbuf: - - - - - + * psend: [0] [1] [2] [3] [4] + * + * Step 1: + * recvbuf: - [0] - [2] - + * psend: [1+0] [0+1] [3+2] [2+3] [4] + * + * Step 2: + * recvbuf: - [0] [1+0] [(0+1)+2] - + * psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4] + * + * Step 3: + * recvbuf: - [0] [1+0] [(0+1)+2] [(3+2)+(1+0)] + * psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4] + * + * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma) + * Memory requirements (per process): 2 * count * typesize = O(count) + * Limitations: intra-communicators only + */ +int mca_coll_base_exscan_intra_recursivedoubling( + const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + int err = MPI_SUCCESS; + char *tmpsend_raw = NULL, *tmprecv_raw = NULL; + int comm_size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + + OPAL_OUTPUT((ompi_coll_base_framework.framework_output, "coll:base:exscan_intra_recursivedoubling: rank %d/%d", + rank, comm_size)); + if (count == 0) + return MPI_SUCCESS; + if (comm_size < 2) + return MPI_SUCCESS; + + ptrdiff_t dsize, gap; + dsize = opal_datatype_span(&datatype->super, count, &gap); + tmpsend_raw = malloc(dsize); + tmprecv_raw = malloc(dsize); + if (NULL == tmpsend_raw || NULL == tmprecv_raw) { + err = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup_and_return; + } + char *psend = tmpsend_raw - gap; + char *precv = tmprecv_raw - gap; + if (sendbuf != MPI_IN_PLACE) { + err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, (char *)sendbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } else { + err = ompi_datatype_copy_content_same_ddt(datatype, count, psend, recvbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } + int is_commute = ompi_op_is_commute(op); + int is_first_block = 1; + + for (int mask = 1; mask < comm_size; mask <<= 1) { + int remote = rank ^ mask; + if (remote < comm_size) { + err = ompi_coll_base_sendrecv(psend, count, datatype, remote, + MCA_COLL_BASE_TAG_EXSCAN, + precv, count, datatype, remote, + MCA_COLL_BASE_TAG_EXSCAN, comm, + MPI_STATUS_IGNORE, rank); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + + if (rank > remote) { + /* Assertion: rank > 0 and rbuf is valid */ + if (is_first_block) { + err = ompi_datatype_copy_content_same_ddt(datatype, count, + recvbuf, precv); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + is_first_block = 0; + } else { + /* Accumulate prefix reduction: recvbuf = precv recvbuf */ + ompi_op_reduce(op, precv, recvbuf, count, datatype); + } + /* Partial result: psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, datatype); + } else { + if (is_commute) { + /* psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, datatype); + } else { + /* precv = psend precv */ + ompi_op_reduce(op, psend, precv, count, datatype); + char *tmp = psend; + psend = precv; + precv = tmp; + } + } + } + } + +cleanup_and_return: + if (NULL != tmpsend_raw) + free(tmpsend_raw); + if (NULL != tmprecv_raw) + free(tmprecv_raw); + return err; +} diff --git a/ompi/mca/coll/base/coll_base_functions.h b/ompi/mca/coll/base/coll_base_functions.h index 9e81e2bd18..b5bb4d60f4 100644 --- a/ompi/mca/coll/base/coll_base_functions.h +++ b/ompi/mca/coll/base/coll_base_functions.h @@ -222,6 +222,7 @@ int ompi_coll_base_bcast_intra_bintree(BCAST_ARGS, uint32_t segsize); int ompi_coll_base_bcast_intra_split_bintree(BCAST_ARGS, uint32_t segsize); /* Exscan */ +int mca_coll_base_exscan_intra_recursivedoubling(EXSCAN_ARGS); /* Gather */ int ompi_coll_base_gather_intra_basic_linear(GATHER_ARGS); @@ -245,6 +246,7 @@ int ompi_coll_base_reduce_scatter_intra_basic_recursivehalving(REDUCESCATTER_ARG int ompi_coll_base_reduce_scatter_intra_ring(REDUCESCATTER_ARGS); /* Scan */ +int mca_coll_base_scan_intra_recursivedoubling(SCAN_ARGS); /* Scatter */ int ompi_coll_base_scatter_intra_basic_linear(SCATTER_ARGS); diff --git a/ompi/mca/coll/base/coll_base_scan.c b/ompi/mca/coll/base/coll_base_scan.c new file mode 100644 index 0000000000..8e8cba2242 --- /dev/null +++ b/ompi/mca/coll/base/coll_base_scan.c @@ -0,0 +1,130 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2018 Siberian State University of Telecommunications + * and Information Science. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/op/op.h" + +/* + * mca_coll_base_scan_intra_recursivedoubling + * + * Function: Recursive doubling algorithm for inclusive scan. + * Accepts: Same as MPI_Scan + * Returns: MPI_SUCCESS or error code + * + * Description: Implements recursive doubling algorithm for MPI_Scan. + * The algorithm preserves order of operations so it can + * be used both by commutative and non-commutative operations. + * + * Example for 5 processes and commutative operation MPI_SUM: + * Process: 0 1 2 3 4 + * recvbuf: [0] [1] [2] [3] [4] + * psend: [0] [1] [2] [3] [4] + * + * Step 1: + * recvbuf: [0] [0+1] [2] [2+3] [4] + * psend: [1+0] [0+1] [3+2] [2+3] [4] + * + * Step 2: + * recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [4] + * psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4] + * + * Step 3: + * recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [((3+2)+(1+0))+4] + * psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4] + * + * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma) + * Memory requirements (per process): 2 * count * typesize = O(count) + * Limitations: intra-communicators only + */ +int mca_coll_base_scan_intra_recursivedoubling( + const void *sendbuf, void *recvbuf, int count, struct ompi_datatype_t *datatype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + int err = MPI_SUCCESS; + char *tmpsend_raw = NULL, *tmprecv_raw = NULL; + int comm_size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + + OPAL_OUTPUT((ompi_coll_base_framework.framework_output, + "coll:base:scan_intra_recursivedoubling: rank %d/%d", + rank, comm_size)); + if (count == 0) + return MPI_SUCCESS; + + if (sendbuf != MPI_IN_PLACE) { + err = ompi_datatype_copy_content_same_ddt(datatype, count, recvbuf, (char *)sendbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } + if (comm_size < 2) + return MPI_SUCCESS; + + ptrdiff_t dsize, gap; + dsize = opal_datatype_span(&datatype->super, count, &gap); + tmpsend_raw = malloc(dsize); + tmprecv_raw = malloc(dsize); + if (NULL == tmpsend_raw || NULL == tmprecv_raw) { + err = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup_and_return; + } + char *psend = tmpsend_raw - gap; + char *precv = tmprecv_raw - gap; + err = ompi_datatype_copy_content_same_ddt(dtype, count, psend, recvbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + int is_commute = ompi_op_is_commute(op); + + for (int mask = 1; mask < comm_size; mask <<= 1) { + int remote = rank ^ mask; + if (remote < comm_size) { + err = ompi_coll_base_sendrecv(psend, count, datatype, remote, + MCA_COLL_BASE_TAG_SCAN, + precv, count, datatype, remote, + MCA_COLL_BASE_TAG_SCAN, comm, + MPI_STATUS_IGNORE, rank); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + + if (rank > remote) { + /* Accumulate prefix reduction: recvbuf = precv recvbuf */ + ompi_op_reduce(op, precv, recvbuf, count, datatype); + /* Partial result: psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, datatype); + } else { + if (is_commute) { + /* psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, datatype); + } else { + /* precv = psend precv */ + ompi_op_reduce(op, psend, precv, count, datatype); + char *tmp = psend; + psend = precv; + precv = tmp; + } + } + } + } + +cleanup_and_return: + if (NULL != tmpsend_raw) + free(tmpsend_raw); + if (NULL != tmprecv_raw) + free(tmprecv_raw); + return err; +}