From bd12e2b1c674cad060faf35e34ed6450b7441c36 Mon Sep 17 00:00:00 2001 From: Mikhail Kurnosov Date: Wed, 28 Mar 2018 16:27:11 +0700 Subject: [PATCH] Add recursive doubling algorithm for Scan and Exscan Implements recursive doubling algorithm for MPI_Scan and MPI_Exscan. The algorithm preserves order of operations so it can be used both by commutative and non-commutative operations. Signed-off-by: Mikhail Kurnosov --- ompi/mca/coll/spacc/Makefile.am | 4 +- ompi/mca/coll/spacc/coll_spacc.h | 10 ++ ompi/mca/coll/spacc/coll_spacc_exscan.c | 136 ++++++++++++++++++++++++ ompi/mca/coll/spacc/coll_spacc_module.c | 9 +- ompi/mca/coll/spacc/coll_spacc_scan.c | 127 ++++++++++++++++++++++ 5 files changed, 283 insertions(+), 3 deletions(-) create mode 100644 ompi/mca/coll/spacc/coll_spacc_exscan.c create mode 100644 ompi/mca/coll/spacc/coll_spacc_scan.c diff --git a/ompi/mca/coll/spacc/Makefile.am b/ompi/mca/coll/spacc/Makefile.am index b400922f02..f456e01784 100644 --- a/ompi/mca/coll/spacc/Makefile.am +++ b/ompi/mca/coll/spacc/Makefile.am @@ -12,7 +12,9 @@ sources = \ coll_spacc_component.c \ coll_spacc_module.c \ coll_spacc_allreduce.c \ - coll_spacc_reduce.c + coll_spacc_exscan.c \ + coll_spacc_reduce.c \ + coll_spacc_scan.c # Make the output library in this directory, and name it either # mca__.la (for DSO builds) or libmca__.la diff --git a/ompi/mca/coll/spacc/coll_spacc.h b/ompi/mca/coll/spacc/coll_spacc.h index a10f0a9fd4..7fc89538f7 100644 --- a/ompi/mca/coll/spacc/coll_spacc.h +++ b/ompi/mca/coll/spacc/coll_spacc.h @@ -41,6 +41,16 @@ int mca_coll_spacc_reduce_intra_redscat_gather( struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, mca_coll_base_module_t *module); +int mca_coll_spacc_exscan_intra_recursivedoubling( + const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); + +int mca_coll_spacc_scan_intra_recursivedoubling( + const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); + /* * coll API functions */ diff --git a/ompi/mca/coll/spacc/coll_spacc_exscan.c b/ompi/mca/coll/spacc/coll_spacc_exscan.c new file mode 100644 index 0000000000..fc9c410bdc --- /dev/null +++ b/ompi/mca/coll/spacc/coll_spacc_exscan.c @@ -0,0 +1,136 @@ +/* + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "coll_spacc.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "opal/util/bit_ops.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/op/op.h" + +/* + * mca_coll_spacc_exscan_intra_recursivedoubling + * + * Function: Recursive doubling algorithm for exclusive scan. + * Accepts: Same as MPI_Exscan + * Returns: MPI_SUCCESS or error code + * + * Description: Implements recursive doubling algorithm for MPI_Exscan. + * The algorithm preserves order of operations so it can + * be used both by commutative and non-commutative operations. + * + * Example for 5 processes and commutative operation MPI_SUM: + * Process: 0 1 2 3 4 + * rbuf: - - - - - + * psend: [0] [1] [2] [3] [4] + * + * Step 1: + * rbuf: - [0] - [2] - + * psend: [1+0] [0+1] [3+2] [2+3] [4] + * + * Step 2: + * rbuf: - [0] [1+0] [(0+1)+2] - + * psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4] + * + * Step 3: + * rbuf - [0] [1+0] [(0+1)+2] [(3+2)+(1+0)] + * psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4] + * + * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma) + * Memory requirements (per process): 2 * count * typesize = O(count) + * Limitations: intra-communicators only + */ +int mca_coll_spacc_exscan_intra_recursivedoubling( + const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + int err = MPI_SUCCESS; + char *tmpsend_raw = NULL, *tmprecv_raw = NULL; + int comm_size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + + OPAL_OUTPUT((mca_coll_spacc_stream, "coll:spacc:exscan_intra_recursivedoubling: rank %d/%d", + rank, comm_size)); + if (count == 0) + return MPI_SUCCESS; + if (comm_size < 2) + return MPI_SUCCESS; + + ptrdiff_t dsize, gap; + dsize = opal_datatype_span(&dtype->super, count, &gap); + tmpsend_raw = malloc(dsize); + tmprecv_raw = malloc(dsize); + if (NULL == tmpsend_raw || NULL == tmprecv_raw) { + err = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup_and_return; + } + char *psend = tmpsend_raw - gap; + char *precv = tmprecv_raw - gap; + if (sbuf != MPI_IN_PLACE) { + err = ompi_datatype_copy_content_same_ddt(dtype, count, psend, sbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } else { + err = ompi_datatype_copy_content_same_ddt(dtype, count, psend, rbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } + int is_commute = ompi_op_is_commute(op); + int is_first_block = 1; + + for (int mask = 1; mask < comm_size; mask <<= 1) { + int remote = rank ^ mask; + if (remote < comm_size) { + err = ompi_coll_base_sendrecv(psend, count, dtype, remote, + MCA_COLL_BASE_TAG_SCAN, + precv, count, dtype, remote, + MCA_COLL_BASE_TAG_SCAN, comm, + MPI_STATUS_IGNORE, rank); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + + if (rank > remote) { + /* Assertion: rank > 0 and rbuf is valid */ + if (is_first_block) { + err = ompi_datatype_copy_content_same_ddt(dtype, count, rbuf, precv); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + is_first_block = 0; + } else { + /* Accumulate prefix reduction: rbuf = precv rbuf */ + ompi_op_reduce(op, precv, rbuf, count, dtype); + } + /* Partial result: psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, dtype); + } else { + if (is_commute) { + /* psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, dtype); + } else { + /* precv = psend precv */ + ompi_op_reduce(op, psend, precv, count, dtype); + char *tmp = psend; + psend = precv; + precv = tmp; + } + } + } + } + +cleanup_and_return: + if (NULL != tmpsend_raw) + free(tmpsend_raw); + if (NULL != tmprecv_raw) + free(tmprecv_raw); + return err; +} diff --git a/ompi/mca/coll/spacc/coll_spacc_module.c b/ompi/mca/coll/spacc/coll_spacc_module.c index bd83b1e3b2..43b25c9115 100644 --- a/ompi/mca/coll/spacc/coll_spacc_module.c +++ b/ompi/mca/coll/spacc/coll_spacc_module.c @@ -11,6 +11,7 @@ #include "mpi.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/coll/base/base.h" +#include "ompi/mca/coll/base/coll_base_functions.h" #include "ompi/mca/coll/coll.h" #include "coll_spacc.h" @@ -67,13 +68,13 @@ mca_coll_base_module_t *ompi_coll_spacc_comm_query( spacc_module->super.coll_alltoallw = NULL; spacc_module->super.coll_barrier = NULL; spacc_module->super.coll_bcast = NULL; - spacc_module->super.coll_exscan = NULL; + spacc_module->super.coll_exscan = mca_coll_spacc_exscan_intra_recursivedoubling; spacc_module->super.coll_gather = NULL; spacc_module->super.coll_gatherv = NULL; spacc_module->super.coll_reduce = mca_coll_spacc_reduce_intra_redscat_gather; spacc_module->super.coll_reduce_scatter_block = NULL; spacc_module->super.coll_reduce_scatter = NULL; - spacc_module->super.coll_scan = NULL; + spacc_module->super.coll_scan = mca_coll_spacc_scan_intra_recursivedoubling; spacc_module->super.coll_scatter = NULL; spacc_module->super.coll_scatterv = NULL; @@ -87,6 +88,10 @@ static int spacc_module_enable(mca_coll_base_module_t *module, struct ompi_communicator_t *comm) { opal_output_verbose(30, mca_coll_spacc_stream, "coll:spacc:module_enable called"); + /* prepare the placeholder for the array of request* */ + module->base_data = OBJ_NEW(mca_coll_base_comm_t); + if (NULL == module->base_data) + return OMPI_ERROR; return OMPI_SUCCESS; } diff --git a/ompi/mca/coll/spacc/coll_spacc_scan.c b/ompi/mca/coll/spacc/coll_spacc_scan.c new file mode 100644 index 0000000000..d028185b28 --- /dev/null +++ b/ompi/mca/coll/spacc/coll_spacc_scan.c @@ -0,0 +1,127 @@ +/* + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "coll_spacc.h" + +#include "mpi.h" +#include "ompi/constants.h" +#include "opal/util/bit_ops.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/op/op.h" + +/* + * mca_coll_spacc_scan_intra_recursivedoubling + * + * Function: Recursive doubling algorithm for inclusive scan. + * Accepts: Same as MPI_Scan + * Returns: MPI_SUCCESS or error code + * + * Description: Implements recursive doubling algorithm for MPI_Scan. + * The algorithm preserves order of operations so it can + * be used both by commutative and non-commutative operations. + * + * Example for 5 processes and commutative operation MPI_SUM: + * Process: 0 1 2 3 4 + * rbuf: [0] [1] [2] [3] [4] + * psend: [0] [1] [2] [3] [4] + * Step 1: + * rbuf: [0] [0+1] [2] [2+3] [4] + * psend: [1+0] [0+1] [3+2] [2+3] [4] + * + * Step 2: + * rbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [4] + * psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4] + * + * Step 3: + * rbuf [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [((3+2)+(1+0))+4] + * psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4] + * + * Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma) + * Memory requirements (per process): 2 * count * typesize = O(count) + * Limitations: intra-communicators only + */ +int mca_coll_spacc_scan_intra_recursivedoubling( + const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + int err = MPI_SUCCESS; + char *tmpsend_raw = NULL, *tmprecv_raw = NULL; + int comm_size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + + OPAL_OUTPUT((mca_coll_spacc_stream, "coll:spacc:scan_intra_recursivedoubling: rank %d/%d", + rank, comm_size)); + if (count == 0) + return MPI_SUCCESS; + + if (sbuf != MPI_IN_PLACE) { + err = ompi_datatype_copy_content_same_ddt(dtype, count, rbuf, sbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } + if (comm_size < 2) + return MPI_SUCCESS; + + ptrdiff_t dsize, gap; + dsize = opal_datatype_span(&dtype->super, count, &gap); + tmpsend_raw = malloc(dsize); + tmprecv_raw = malloc(dsize); + if (NULL == tmpsend_raw || NULL == tmprecv_raw) { + err = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup_and_return; + } + char *psend = tmpsend_raw - gap; + char *precv = tmprecv_raw - gap; + err = ompi_datatype_copy_content_same_ddt(dtype, count, psend, rbuf); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + int is_commute = ompi_op_is_commute(op); + + for (int mask = 1; mask < comm_size; mask <<= 1) { + int remote = rank ^ mask; + if (remote < comm_size) { + err = ompi_coll_base_sendrecv(psend, count, dtype, remote, + MCA_COLL_BASE_TAG_SCAN, + precv, count, dtype, remote, + MCA_COLL_BASE_TAG_SCAN, comm, + MPI_STATUS_IGNORE, rank); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + + if (rank > remote) { + /* Accumulate prefix reduction: rbuf = precv rbuf */ + ompi_op_reduce(op, precv, rbuf, count, dtype); + /* Partial result: psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, dtype); + } else { + if (is_commute) { + /* psend = precv psend */ + ompi_op_reduce(op, precv, psend, count, dtype); + } else { + /* precv = psend precv */ + ompi_op_reduce(op, psend, precv, count, dtype); + char *tmp = psend; + psend = precv; + precv = tmp; + } + } + } + } + +cleanup_and_return: + if (NULL != tmpsend_raw) + free(tmpsend_raw); + if (NULL != tmprecv_raw) + free(tmprecv_raw); + return err; +}