From e65fa4ff5ce0d1259f24d0329d015acfaaf0e801 Mon Sep 17 00:00:00 2001 From: Xi Luo Date: Sun, 24 Jun 2018 21:33:57 -0400 Subject: [PATCH] Bring ADAPT collective to 4.1 This is a meta commit, that encapsulate all the ADAPT commits in the master into a single PR for 4.1. The master commits included here are: fe73586, a4be3bb, d712645, c2970a3, e59bde9, ee592f3 and c98e387. Here is a detailed list of added capabilities: * coll/adapt: Fix naming conventions and C11 atomic use * coll/adapt: Remove unused component field in module * Consistent handling of zero counts in the MPI API. * Correctly handle non-blocking collectives tags * As it is possible to have multiple outstanding non-blocking collectives provided by different collective modules, we need a consistent mechanism to allow them to select unique tags for each instance of a collective. * Add support for fallback to previous coll module on non-commutative operations (#30) * Replace mutexes by atomic operations. * Use the correct nbc request type (for both ibcast and ireduce) * coll/base: document type casts in ompi_coll_base_retain_* * add module-wide topology cache * use standard instead of synchronous send and add mca parameter to control mode of initial send in ireduce/ibcast * reduce number of memory allocations * call the default request completion. * Remove the requests from the Fortran lookup conversion tables before completing and free it. * piggybacking Bull functionalities Signed-off-by: Xi Luo Signed-off-by: George Bosilca Signed-off-by: Marc Sergent Co-authored-by: Joseph Schuchart Co-authored-by: Lemarinier, Pierre Co-authored-by: pierrele <31764860+pierrele@users.noreply.github.com> --- ompi/communicator/comm_init.c | 4 +- ompi/communicator/communicator.h | 9 +- ompi/mca/coll/adapt/Makefile.am | 51 ++ ompi/mca/coll/adapt/coll_adapt.h | 145 ++++ ompi/mca/coll/adapt/coll_adapt_algorithms.h | 38 + ompi/mca/coll/adapt/coll_adapt_bcast.c | 26 + ompi/mca/coll/adapt/coll_adapt_component.c | 155 ++++ ompi/mca/coll/adapt/coll_adapt_context.c | 42 ++ ompi/mca/coll/adapt/coll_adapt_context.h | 128 ++++ ompi/mca/coll/adapt/coll_adapt_ibcast.c | 575 +++++++++++++++ ompi/mca/coll/adapt/coll_adapt_inbuf.c | 16 + ompi/mca/coll/adapt/coll_adapt_inbuf.h | 26 + ompi/mca/coll/adapt/coll_adapt_ireduce.c | 771 ++++++++++++++++++++ ompi/mca/coll/adapt/coll_adapt_item.c | 15 + ompi/mca/coll/adapt/coll_adapt_item.h | 25 + ompi/mca/coll/adapt/coll_adapt_module.c | 200 +++++ ompi/mca/coll/adapt/coll_adapt_reduce.c | 40 + ompi/mca/coll/adapt/coll_adapt_topocache.c | 105 +++ ompi/mca/coll/adapt/coll_adapt_topocache.h | 35 + ompi/mca/coll/base/coll_base_util.h | 37 + ompi/mca/coll/libnbc/coll_libnbc.h | 1 - ompi/mca/coll/libnbc/nbc.c | 23 +- ompi/mca/coll/tuned/coll_tuned_component.c | 13 +- ompi/mpi/c/ibcast.c | 10 +- ompi/mpi/c/ireduce_scatter_block.c | 7 +- ompi/mpi/c/reduce_scatter_block.c | 5 +- ompi/request/request.h | 23 +- 27 files changed, 2487 insertions(+), 38 deletions(-) create mode 100644 ompi/mca/coll/adapt/Makefile.am create mode 100644 ompi/mca/coll/adapt/coll_adapt.h create mode 100644 ompi/mca/coll/adapt/coll_adapt_algorithms.h create mode 100644 ompi/mca/coll/adapt/coll_adapt_bcast.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_component.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_context.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_context.h create mode 100644 ompi/mca/coll/adapt/coll_adapt_ibcast.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_inbuf.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_inbuf.h create mode 100644 ompi/mca/coll/adapt/coll_adapt_ireduce.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_item.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_item.h create mode 100644 ompi/mca/coll/adapt/coll_adapt_module.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_reduce.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_topocache.c create mode 100644 ompi/mca/coll/adapt/coll_adapt_topocache.h diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index 75aac4d49e..b85da3a49d 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2017 The University of Tennessee and The University + * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -39,6 +39,7 @@ #include "ompi/constants.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/coll/base/base.h" +#include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/topo/base/base.h" #include "ompi/runtime/params.h" #include "ompi/communicator/communicator.h" @@ -378,6 +379,7 @@ static void ompi_comm_construct(ompi_communicator_t* comm) comm->c_pml_comm = NULL; comm->c_topo = NULL; comm->c_coll = NULL; + comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; /* A keyhash will be created if/when an attribute is cached on this communicator */ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 4fe4721244..b7a31f1dfc 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2017 The University of Tennessee and The University + * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -187,6 +187,13 @@ struct ompi_communicator_t { /* Collectives module interface and data */ mca_coll_base_comm_coll_t *c_coll; + + /* Non-blocking collective tag. These tags might be shared between + * all non-blocking collective modules (to avoid message collision + * between them in the case where multiple outstanding non-blocking + * collective coexists using multiple backends). + */ + volatile int32_t c_nbc_tag; }; typedef struct ompi_communicator_t ompi_communicator_t; diff --git a/ompi/mca/coll/adapt/Makefile.am b/ompi/mca/coll/adapt/Makefile.am new file mode 100644 index 0000000000..5b69d3fded --- /dev/null +++ b/ompi/mca/coll/adapt/Makefile.am @@ -0,0 +1,51 @@ +# +# Copyright (c) 2014-2020 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + + +sources = \ + coll_adapt_component.c \ + coll_adapt_module.c \ + coll_adapt_bcast.c \ + coll_adapt_ibcast.c \ + coll_adapt_reduce.c \ + coll_adapt_ireduce.c \ + coll_adapt.h \ + coll_adapt_algorithms.h \ + coll_adapt_context.h \ + coll_adapt_context.c \ + coll_adapt_inbuf.c \ + coll_adapt_inbuf.h \ + coll_adapt_item.c \ + coll_adapt_item.h \ + coll_adapt_topocache.c \ + coll_adapt_topocache.h + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +component_noinst = +component_install = +if MCA_BUILD_ompi_coll_adapt_DSO +component_install += mca_coll_adapt.la +else +component_noinst += libmca_coll_adapt.la +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_coll_adapt_la_SOURCES = $(sources) +mca_coll_adapt_la_LDFLAGS = -module -avoid-version +mca_coll_adapt_la_LIBADD = + +noinst_LTLIBRARIES = $(component_noinst) +libmca_coll_adapt_la_SOURCES =$(sources) +libmca_coll_adapt_la_LDFLAGS = -module -avoid-version diff --git a/ompi/mca/coll/adapt/coll_adapt.h b/ompi/mca/coll/adapt/coll_adapt.h new file mode 100644 index 0000000000..79e90174d4 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt.h @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#ifndef MCA_COLL_ADAPT_EXPORT_H +#define MCA_COLL_ADAPT_EXPORT_H + +#include "ompi_config.h" + +#include "mpi.h" +#include "opal/mca/mca.h" +#include "opal/datatype/opal_convertor.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_topo.h" + +BEGIN_C_DECLS + +typedef struct mca_coll_adapt_module_t mca_coll_adapt_module_t; + +typedef enum { + OMPI_COLL_ADAPT_ALGORITHM_TUNED = 0, + OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL, + OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL, + OMPI_COLL_ADAPT_ALGORITHM_BINARY, + OMPI_COLL_ADAPT_ALGORITHM_PIPELINE, + OMPI_COLL_ADAPT_ALGORITHM_CHAIN, + OMPI_COLL_ADAPT_ALGORITHM_LINEAR, + OMPI_COLL_ADAPT_ALGORITHM_COUNT /* number of algorithms, keep last! */ +} ompi_coll_adapt_algorithm_t; + +/* + * Structure to hold the adapt coll component. First it holds the + * base coll component, and then holds a bunch of + * adapt-coll-component-specific stuff (e.g., current MCA param + * values). + */ +typedef struct mca_coll_adapt_component_t { + /* Base coll component */ + mca_coll_base_component_2_0_0_t super; + + /* MCA parameter: Priority of this component */ + int adapt_priority; + + /* MCA parameter: Output stream and verbose level */ + int adapt_output; + int adapt_verbose; + + /* MCA parameter: Maximum number of segment in context free list */ + int adapt_context_free_list_max; + + /* MCA parameter: Minimum number of segment in context free list */ + int adapt_context_free_list_min; + + /* MCA parameter: Increasement number of segment in context free list */ + int adapt_context_free_list_inc; + + /* Bcast MCA parameter */ + int adapt_ibcast_algorithm; + size_t adapt_ibcast_segment_size; + int adapt_ibcast_max_send_requests; + int adapt_ibcast_max_recv_requests; + bool adapt_ibcast_synchronous_send; + /* Bcast free list */ + opal_free_list_t *adapt_ibcast_context_free_list; + + /* Reduce MCA parameter */ + int adapt_ireduce_algorithm; + size_t adapt_ireduce_segment_size; + int adapt_ireduce_max_send_requests; + int adapt_ireduce_max_recv_requests; + int adapt_inbuf_free_list_min; + int adapt_inbuf_free_list_max; + int adapt_inbuf_free_list_inc; + bool adapt_ireduce_synchronous_send; + + /* Reduce free list */ + opal_free_list_t *adapt_ireduce_context_free_list; + +} mca_coll_adapt_component_t; + +/* + * Structure used to store what is necessary for the collective operations + * routines in case of fallback. + */ +typedef struct mca_coll_adapt_collective_fallback_s { + union { + mca_coll_base_module_reduce_fn_t reduce; + mca_coll_base_module_ireduce_fn_t ireduce; + } previous_routine; + mca_coll_base_module_t *previous_module; +} mca_coll_adapt_collective_fallback_t; + + +typedef enum mca_coll_adapt_colltype { + ADAPT_REDUCE = 0, + ADAPT_IREDUCE = 1, + ADAPT_COLLCOUNT +} mca_coll_adapt_colltype_t; + +/* + * Some defines to stick to the naming used in the other components in terms of + * fallback routines + */ +#define previous_reduce previous_routines[ADAPT_REDUCE].previous_routine.reduce +#define previous_ireduce previous_routines[ADAPT_IREDUCE].previous_routine.ireduce + +#define previous_reduce_module previous_routines[ADAPT_REDUCE].previous_module +#define previous_ireduce_module previous_routines[ADAPT_IREDUCE].previous_module + + +/* Coll adapt module per communicator*/ +struct mca_coll_adapt_module_t { + /* Base module */ + mca_coll_base_module_t super; + + /* To be able to fallback when the cases are not supported */ + struct mca_coll_adapt_collective_fallback_s previous_routines[ADAPT_COLLCOUNT]; + + /* cached topologies */ + opal_list_t *topo_cache; + + /* Whether this module has been lazily initialized or not yet */ + bool adapt_enabled; +}; +OBJ_CLASS_DECLARATION(mca_coll_adapt_module_t); + +/* Global component instance */ +OMPI_MODULE_DECLSPEC extern mca_coll_adapt_component_t mca_coll_adapt_component; + +/* ADAPT module functions */ +int ompi_coll_adapt_init_query(bool enable_progress_threads, bool enable_mpi_threads); +mca_coll_base_module_t * ompi_coll_adapt_comm_query(struct ompi_communicator_t *comm, int *priority); + +/* ADAPT request free */ +int ompi_coll_adapt_request_free(ompi_request_t **request); + +#endif /* MCA_COLL_ADAPT_EXPORT_H */ diff --git a/ompi/mca/coll/adapt/coll_adapt_algorithms.h b/ompi/mca/coll/adapt/coll_adapt_algorithms.h new file mode 100644 index 0000000000..16d365cc60 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_algorithms.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_topo.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include + +typedef int (*ompi_mca_coll_adapt_ibcast_function_t)(IBCAST_ARGS); +typedef int (*ompi_mca_coll_adapt_ireduce_function_t)(IREDUCE_ARGS); + +typedef struct ompi_coll_adapt_algorithm_index_s { + int algorithm_index; + union { + ompi_mca_coll_adapt_ibcast_function_t ibcast_fn_ptr; + ompi_mca_coll_adapt_ireduce_function_t ireduce_fn_ptr; + }; +} ompi_coll_adapt_algorithm_index_t; + +/* Bcast */ +int ompi_coll_adapt_ibcast_register(void); +int ompi_coll_adapt_ibcast_fini(void); +int ompi_coll_adapt_bcast(BCAST_ARGS); +int ompi_coll_adapt_ibcast(IBCAST_ARGS); + +/* Reduce */ +int ompi_coll_adapt_ireduce_register(void); +int ompi_coll_adapt_ireduce_fini(void); +int ompi_coll_adapt_reduce(REDUCE_ARGS); +int ompi_coll_adapt_ireduce(IREDUCE_ARGS); diff --git a/ompi/mca/coll/adapt/coll_adapt_bcast.c b/ompi/mca/coll/adapt/coll_adapt_bcast.c new file mode 100644 index 0000000000..9cfebd9785 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_bcast.c @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_adapt.h" +#include "coll_adapt_algorithms.h" + +int ompi_coll_adapt_bcast(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, mca_coll_base_module_t * module) +{ + ompi_request_t *request = NULL; + int err = ompi_coll_adapt_ibcast(buff, count, datatype, root, comm, &request, module); + if( MPI_SUCCESS != err ) { + if( NULL == request ) + return err; + } + ompi_request_wait(&request, MPI_STATUS_IGNORE); + return err; +} diff --git a/ompi/mca/coll/adapt/coll_adapt_component.c b/ompi/mca/coll/adapt/coll_adapt_component.c new file mode 100644 index 0000000000..6ec9964e90 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_component.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "opal/util/show_help.h" +#include "ompi/constants.h" +#include "ompi/mca/coll/coll.h" +#include "coll_adapt.h" +#include "coll_adapt_algorithms.h" + +/* + * Public string showing the coll ompi_adapt component version number + */ +const char *mca_coll_adapt_component_version_string = + "Open MPI ADAPT collective MCA component version " OMPI_VERSION; + +/* + * Local functions + */ +static int adapt_open(void); +static int adapt_close(void); +static int adapt_register(void); + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ + +mca_coll_adapt_component_t mca_coll_adapt_component = { + /* First, fill in the super */ + { + /* First, the mca_component_t struct containing meta + information about the component itself */ + .collm_version = { + MCA_COLL_BASE_VERSION_2_0_0, + + /* Component name and version */ + .mca_component_name = "adapt", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + + /* Component functions */ + .mca_open_component = adapt_open, + .mca_close_component = adapt_close, + .mca_register_component_params = adapt_register, + }, + .collm_data = { + /* The component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + + /* Initialization / querying functions */ + .collm_init_query = ompi_coll_adapt_init_query, + .collm_comm_query = ompi_coll_adapt_comm_query, + }, + + /* adapt-component specific information */ + + 0, /* (default) priority */ + + 0, /* (default) output stream */ + 0, /* (default) verbose level */ + + /* default values for non-MCA parameters */ + /* Not specifying values here gives us all 0's */ +}; + +/* Open the component */ +static int adapt_open(void) +{ + mca_coll_adapt_component_t *cs = &mca_coll_adapt_component; + + if (cs->adapt_verbose > 0) { + cs->adapt_output = opal_output_open(NULL); + opal_output_set_verbosity(cs->adapt_output, cs->adapt_verbose); + } + + return OMPI_SUCCESS; +} + + +/* Shut down the component */ +static int adapt_close(void) +{ + ompi_coll_adapt_ibcast_fini(); + ompi_coll_adapt_ireduce_fini(); + + return OMPI_SUCCESS; +} + +static int adapt_verify_mca_variables(void) +{ + return OMPI_SUCCESS; +} + +/* + * Register MCA params + */ +static int adapt_register(void) +{ + mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version; + mca_coll_adapt_component_t *cs = &mca_coll_adapt_component; + + /* If we want to be selected (i.e., all procs on one node), then + we should have a high priority */ + cs->adapt_priority = 0; + (void) mca_base_component_var_register(c, "priority", "Priority of the adapt coll component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->adapt_priority); + + cs->adapt_verbose = ompi_coll_base_framework.framework_verbose; + (void) mca_base_component_var_register(c, "verbose", + "Verbose level (default set to the collective framework verbosity)", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->adapt_verbose); + + cs->adapt_context_free_list_min = 64; + (void) mca_base_component_var_register(c, "context_free_list_min", + "Minimum number of segments in context free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &cs->adapt_context_free_list_min); + + cs->adapt_context_free_list_max = 1024; + (void) mca_base_component_var_register(c, "context_free_list_max", + "Maximum number of segments in context free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &cs->adapt_context_free_list_max); + + cs->adapt_context_free_list_inc = 32; + (void) mca_base_component_var_register(c, "context_free_list_inc", + "Increasement number of segments in context free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &cs->adapt_context_free_list_inc); + ompi_coll_adapt_ibcast_register(); + ompi_coll_adapt_ireduce_register(); + + return adapt_verify_mca_variables(); +} diff --git a/ompi/mca/coll/adapt/coll_adapt_context.c b/ompi/mca/coll/adapt/coll_adapt_context.c new file mode 100644 index 0000000000..a28960ebe4 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_context.c @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi/mca/coll/coll.h" +#include "coll_adapt_context.h" + + +static void adapt_constant_reduce_context_construct(ompi_coll_adapt_constant_reduce_context_t *context) +{ + OBJ_CONSTRUCT(&context->recv_list, opal_list_t); + OBJ_CONSTRUCT(&context->mutex_recv_list, opal_mutex_t); + OBJ_CONSTRUCT(&context->inbuf_list, opal_free_list_t); +} + +static void adapt_constant_reduce_context_destruct(ompi_coll_adapt_constant_reduce_context_t *context) +{ + OBJ_DESTRUCT(&context->mutex_recv_list); + OBJ_DESTRUCT(&context->recv_list); + OBJ_DESTRUCT(&context->inbuf_list); +} + + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_bcast_context_t, opal_free_list_item_t, + NULL, NULL); + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_constant_bcast_context_t, opal_object_t, + NULL, NULL); + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_reduce_context_t, opal_free_list_item_t, + NULL, NULL); + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_constant_reduce_context_t, opal_object_t, + &adapt_constant_reduce_context_construct, + &adapt_constant_reduce_context_destruct); diff --git a/ompi/mca/coll/adapt/coll_adapt_context.h b/ompi/mca/coll/adapt/coll_adapt_context.h new file mode 100644 index 0000000000..5d729423fb --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_context.h @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi/mca/coll/coll.h" +#include "opal/class/opal_free_list.h" +#include "opal/class/opal_list.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/op/op.h" +#include "ompi/mca/coll/base/coll_base_topo.h" +#include "coll_adapt_inbuf.h" + +/* Bcast constant context in bcast context */ +struct ompi_coll_adapt_constant_bcast_context_s { + opal_object_t super; + int root; + size_t count; + size_t seg_count; + ompi_datatype_t *datatype; + ompi_communicator_t *comm; + int real_seg_size; + int num_segs; + ompi_request_t *request; + opal_mutex_t *mutex; + int *recv_array; + int *send_array; + /* Length of the fragment array, which is the number of recevied segments */ + int num_recv_segs; + /* Number of segments that is finishing recving */ + int num_recv_fini; + /* Store the number of sent segments */ + int num_sent_segs; + ompi_coll_tree_t *tree; + int ibcast_tag; +}; + +typedef struct ompi_coll_adapt_constant_bcast_context_s ompi_coll_adapt_constant_bcast_context_t; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_constant_bcast_context_t); + + +/* Bcast context of each segment*/ +typedef struct ompi_coll_adapt_bcast_context_s ompi_coll_adapt_bcast_context_t; + +typedef int (*ompi_coll_adapt_bcast_cuda_callback_fn_t) (ompi_coll_adapt_bcast_context_t * context); + +struct ompi_coll_adapt_bcast_context_s { + opal_free_list_item_t super; + char *buff; + int frag_id; + int child_id; + int peer; + ompi_coll_adapt_constant_bcast_context_t *con; +}; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_bcast_context_t); + +/* Reduce constant context in reduce context */ +struct ompi_coll_adapt_constant_reduce_context_s { + opal_object_t super; + size_t count; + size_t seg_count; + ompi_datatype_t *datatype; + ompi_communicator_t *comm; + size_t real_seg_size; + /* Increment of each segment */ + int segment_increment; + int num_segs; + int rank; + int root; + /* The distance between the address of inbuf->buff and the address of inbuf */ + int distance; + int ireduce_tag; + /* How many sends are posted but not finished */ + int32_t ongoing_send; + /* Length of the fragment array, which is the number of recevied segments */ + int32_t num_recv_segs; + /* Number of sent segments */ + int32_t num_sent_segs; + /* Next seg need to be received for every children */ + int32_t *next_recv_segs; + /* Mutex to protect each segment when do the reduce op */ + opal_mutex_t *mutex_op_list; + /* Reduce operation */ + ompi_op_t *op; + ompi_coll_tree_t *tree; + /* Accumulate buff */ + char **accumbuf; + ptrdiff_t lower_bound; + char *sbuf; + char *rbuf; + opal_free_list_t inbuf_list; + /* Mutex to protect recv_list */ + opal_mutex_t mutex_recv_list; + /* A list to store the segments which are received and not yet be sent */ + opal_list_t recv_list; + ompi_request_t *request; +}; + +typedef struct ompi_coll_adapt_constant_reduce_context_s ompi_coll_adapt_constant_reduce_context_t; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_constant_reduce_context_t); + +/* Reduce context of each segment */ +typedef struct ompi_coll_adapt_reduce_context_s ompi_coll_adapt_reduce_context_t; + +typedef int (*ompi_coll_adapt_reduce_cuda_callback_fn_t) (ompi_coll_adapt_reduce_context_t * context); + +struct ompi_coll_adapt_reduce_context_s { + opal_free_list_item_t super; + char *buff; + int seg_index; + int child_id; + int peer; + ompi_coll_adapt_constant_reduce_context_t *con; + /* store the incoming segment */ + ompi_coll_adapt_inbuf_t *inbuf; +}; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_reduce_context_t); diff --git a/ompi/mca/coll/adapt/coll_adapt_ibcast.c b/ompi/mca/coll/adapt/coll_adapt_ibcast.c new file mode 100644 index 0000000000..4091be7ada --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_ibcast.c @@ -0,0 +1,575 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "ompi/mca/pml/pml.h" +#include "coll_adapt.h" +#include "coll_adapt_algorithms.h" +#include "coll_adapt_context.h" +#include "coll_adapt_topocache.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "opal/util/bit_ops.h" +#include "opal/sys/atomic.h" +#include "ompi/mca/pml/ob1/pml_ob1.h" + +static int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS, + ompi_coll_tree_t * tree, size_t seg_size); + +/* + * Set up MCA parameters of MPI_Bcast and MPI_IBcast + */ +int ompi_coll_adapt_ibcast_register(void) +{ + mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version; + + mca_coll_adapt_component.adapt_ibcast_algorithm = 1; + mca_base_component_var_register(c, "bcast_algorithm", + "Algorithm of broadcast, 0: tuned, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_algorithm); + if( (mca_coll_adapt_component.adapt_ibcast_algorithm < 0) || + (mca_coll_adapt_component.adapt_ibcast_algorithm >= OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) { + mca_coll_adapt_component.adapt_ibcast_algorithm = 1; + } + + mca_coll_adapt_component.adapt_ibcast_segment_size = 0; + mca_base_component_var_register(c, "bcast_segment_size", + "Segment size in bytes used by default for bcast algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_segment_size); + + mca_coll_adapt_component.adapt_ibcast_max_send_requests = 2; + mca_base_component_var_register(c, "bcast_max_send_requests", + "Maximum number of send requests", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_max_send_requests); + + mca_coll_adapt_component.adapt_ibcast_max_recv_requests = 3; + mca_base_component_var_register(c, "bcast_max_recv_requests", + "Maximum number of receive requests", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_max_recv_requests); + + mca_coll_adapt_component.adapt_ibcast_synchronous_send = true; + (void) mca_base_component_var_register(c, "bcast_synchronous_send", + "Whether to use synchronous send operations during setup of bcast operations", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ibcast_synchronous_send); + + mca_coll_adapt_component.adapt_ibcast_context_free_list = NULL; + return OMPI_SUCCESS; +} + +/* + * Release the free list created in ompi_coll_adapt_ibcast_generic + */ +int ompi_coll_adapt_ibcast_fini(void) +{ + if (NULL != mca_coll_adapt_component.adapt_ibcast_context_free_list) { + OBJ_RELEASE(mca_coll_adapt_component.adapt_ibcast_context_free_list); + mca_coll_adapt_component.adapt_ibcast_context_free_list = NULL; + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "ibcast fini\n")); + } + return OMPI_SUCCESS; +} + +/* + * Finish a ibcast request + */ +static int ibcast_request_fini(ompi_coll_adapt_bcast_context_t * context) +{ + ompi_request_t *temp_req = context->con->request; + if (context->con->tree->tree_nextsize != 0) { + free(context->con->send_array); + } + if (context->con->num_segs != 0) { + free(context->con->recv_array); + } + OBJ_RELEASE(context->con->mutex); + OBJ_RELEASE(context->con); + ompi_request_complete(temp_req, 1); + + return OMPI_SUCCESS; +} + +/* + * Callback function of isend + */ +static int send_cb(ompi_request_t * req) +{ + ompi_coll_adapt_bcast_context_t *context = + (ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data; + + int err; + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Send(cb): segment %d to %d at buff %p root %d\n", + ompi_comm_rank(context->con->comm), context->frag_id, + context->peer, (void *) context->buff, context->con->root)); + + OPAL_THREAD_LOCK(context->con->mutex); + int sent_id = context->con->send_array[context->child_id]; + /* If the current process has fragments in recv_array can be sent */ + if (sent_id < context->con->num_recv_segs) { + ompi_request_t *send_req; + ompi_coll_adapt_bcast_context_t *send_context; + int new_id = context->con->recv_array[sent_id]; + ++(context->con->send_array[context->child_id]); + OPAL_THREAD_UNLOCK(context->con->mutex); + + send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); + send_context->buff = + context->buff + (new_id - context->frag_id) * context->con->real_seg_size; + send_context->frag_id = new_id; + send_context->child_id = context->child_id; + send_context->peer = context->peer; + send_context->con = context->con; + int send_count = send_context->con->seg_count; + if (new_id == (send_context->con->num_segs - 1)) { + send_count = send_context->con->count - new_id * send_context->con->seg_count; + } + char *send_buff = send_context->buff; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n", + ompi_comm_rank(send_context->con->comm), send_context->frag_id, + send_context->peer, (void *) send_context->buff, send_count, + send_context->con->ibcast_tag - new_id)); + err = MCA_PML_CALL(isend + (send_buff, send_count, send_context->con->datatype, send_context->peer, + send_context->con->ibcast_tag - new_id, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); + if (MPI_SUCCESS != err) { + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *)send_context); + OBJ_RELEASE(context->con); + return err; + } + /* Set send callback */ + ompi_request_set_callback(send_req, send_cb, send_context); + OPAL_THREAD_LOCK(context->con->mutex); + } else { + /* No future send here, we can release the ref */ + OBJ_RELEASE(context->con); + } + int num_sent = ++(context->con->num_sent_segs); + int num_recv_fini = context->con->num_recv_fini; + int rank = ompi_comm_rank(context->con->comm); + OPAL_THREAD_UNLOCK(context->con->mutex); + /* Check whether signal the condition */ + if ((rank == context->con->root + && num_sent == context->con->tree->tree_nextsize * context->con->num_segs) + || (context->con->tree->tree_nextsize > 0 && rank != context->con->root + && num_sent == context->con->tree->tree_nextsize * context->con->num_segs + && num_recv_fini == context->con->num_segs)) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n", + ompi_comm_rank(context->con->comm))); + ibcast_request_fini(context); + } + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *) context); + req->req_free(&req); + /* Call back function return 1 to signal that request has been free'd */ + return 1; +} + +/* + * Callback function of irecv + */ +static int recv_cb(ompi_request_t * req) +{ + /* Get necessary info from request */ + ompi_coll_adapt_bcast_context_t *context = + (ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data; + + int err, i; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Recv(cb): segment %d from %d at buff %p root %d\n", + ompi_comm_rank(context->con->comm), context->frag_id, + context->peer, (void *) context->buff, context->con->root)); + + /* Store the frag_id to seg array */ + OPAL_THREAD_LOCK(context->con->mutex); + int num_recv_segs = ++(context->con->num_recv_segs); + context->con->recv_array[num_recv_segs - 1] = context->frag_id; + OPAL_THREAD_UNLOCK(context->con->mutex); + + int new_id = num_recv_segs + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1; + /* Receive new segment */ + if (new_id < context->con->num_segs) { + ompi_request_t *recv_req; + ompi_coll_adapt_bcast_context_t *recv_context; + /* Get new context item from free list */ + recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); + recv_context->buff = + context->buff + (new_id - context->frag_id) * context->con->real_seg_size; + recv_context->frag_id = new_id; + recv_context->child_id = context->child_id; + recv_context->peer = context->peer; + recv_context->con = context->con; + OBJ_RETAIN(context->con); + int recv_count = recv_context->con->seg_count; + if (new_id == (recv_context->con->num_segs - 1)) { + recv_count = recv_context->con->count - new_id * recv_context->con->seg_count; + } + char *recv_buff = recv_context->buff; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n", + ompi_comm_rank(context->con->comm), context->frag_id, context->peer, + (void *) recv_buff, recv_count, + recv_context->con->ibcast_tag - recv_context->frag_id)); + MCA_PML_CALL(irecv + (recv_buff, recv_count, recv_context->con->datatype, recv_context->peer, + recv_context->con->ibcast_tag - recv_context->frag_id, + recv_context->con->comm, &recv_req)); + + /* Set the receive callback */ + ompi_request_set_callback(recv_req, recv_cb, recv_context); + } + + OPAL_THREAD_LOCK(context->con->mutex); + /* Propagate segment to all children */ + for (i = 0; i < context->con->tree->tree_nextsize; i++) { + /* If the current process can send the segment now, which means the only segment need to be sent is the just arrived one */ + if (num_recv_segs - 1 == context->con->send_array[i]) { + ompi_request_t *send_req; + + ++(context->con->send_array[i]); + + /* release mutex to avoid deadlock in case a callback is triggered below */ + OPAL_THREAD_UNLOCK(context->con->mutex); + + int send_count = context->con->seg_count; + if (context->frag_id == (context->con->num_segs - 1)) { + send_count = context->con->count - context->frag_id * context->con->seg_count; + } + + ompi_coll_adapt_bcast_context_t *send_context; + send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list); + send_context->buff = context->buff; + send_context->frag_id = context->frag_id; + send_context->child_id = i; + send_context->peer = context->con->tree->tree_next[i]; + send_context->con = context->con; + OBJ_RETAIN(context->con); + char *send_buff = send_context->buff; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n", + ompi_comm_rank(send_context->con->comm), send_context->frag_id, + send_context->peer, (void *) send_context->buff, send_count, + send_context->con->ibcast_tag - send_context->frag_id)); + err = + MCA_PML_CALL(isend + (send_buff, send_count, send_context->con->datatype, + send_context->peer, + send_context->con->ibcast_tag - send_context->frag_id, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); + if (MPI_SUCCESS != err) { + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *)send_context); + OBJ_RELEASE(context->con); + return err; + } + /* Set send callback */ + ompi_request_set_callback(send_req, send_cb, send_context); + + /* retake the mutex for next iteration */ + OPAL_THREAD_LOCK(context->con->mutex); + } + } + OBJ_RELEASE(context->con); + + int num_sent = context->con->num_sent_segs; + int num_recv_fini = ++(context->con->num_recv_fini); + + OPAL_THREAD_UNLOCK(context->con->mutex); + /* If this process is leaf and has received all the segments */ + if ((context->con->tree->tree_nextsize > 0 + && num_sent == context->con->tree->tree_nextsize * context->con->num_segs + && num_recv_fini == context->con->num_segs) || (context->con->tree->tree_nextsize == 0 + && num_recv_fini == context->con->num_segs)) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n", + ompi_comm_rank(context->con->comm))); + ibcast_request_fini(context); + } + opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list, + (opal_free_list_item_t *) context); + req->req_free(&req); + + /* Call back function return 1 to signal that request has been free'd */ + return 1; +} + +int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module) +{ + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, + "ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n", + root, mca_coll_adapt_component.adapt_ibcast_algorithm, + mca_coll_adapt_component.adapt_ibcast_segment_size, + mca_coll_adapt_component.adapt_ibcast_max_send_requests, + mca_coll_adapt_component.adapt_ibcast_max_recv_requests)); + + if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ibcast_algorithm) { + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); + return OMPI_ERR_NOT_IMPLEMENTED; + } + + return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module, + adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ibcast_algorithm), + mca_coll_adapt_component.adapt_ibcast_segment_size); +} + + +int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module, ompi_coll_tree_t * tree, + size_t seg_size) +{ + int i, j, rank, err; + /* The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */ + int min; + + /* Number of datatype in a segment */ + int seg_count = count; + /* Size of a datatype */ + size_t type_size; + /* Real size of a segment */ + size_t real_seg_size; + ptrdiff_t extent, lb; + /* Number of segments */ + int num_segs; + + mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ibcast_synchronous_send) + ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD; + + /* The request passed outside */ + ompi_coll_base_nbc_request_t *temp_request = NULL; + opal_mutex_t *mutex; + /* Store the segments which are received */ + int *recv_array = NULL; + /* Record how many isends have been issued for every child */ + int *send_array = NULL; + + /* Atomically set up free list */ + if (NULL == mca_coll_adapt_component.adapt_ibcast_context_free_list) { + opal_free_list_t* fl = OBJ_NEW(opal_free_list_t); + opal_free_list_init(fl, + sizeof(ompi_coll_adapt_bcast_context_t), + opal_cache_line_size, + OBJ_CLASS(ompi_coll_adapt_bcast_context_t), + 0, opal_cache_line_size, + mca_coll_adapt_component.adapt_context_free_list_min, + mca_coll_adapt_component.adapt_context_free_list_max, + mca_coll_adapt_component.adapt_context_free_list_inc, + NULL, 0, NULL, NULL, NULL); + if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&mca_coll_adapt_component.adapt_ibcast_context_free_list, + &(intptr_t){0}, fl) ) { + OBJ_RELEASE(fl); + } + } + + /* Set up request */ + temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t); + OMPI_REQUEST_INIT(&temp_request->super, false); + temp_request->super.req_state = OMPI_REQUEST_ACTIVE; + temp_request->super.req_type = OMPI_REQUEST_COLL; + temp_request->super.req_free = ompi_coll_adapt_request_free; + temp_request->super.req_status.MPI_SOURCE = 0; + temp_request->super.req_status.MPI_TAG = 0; + temp_request->super.req_status.MPI_ERROR = 0; + temp_request->super.req_status._cancelled = 0; + temp_request->super.req_status._ucount = 0; + *request = (ompi_request_t*)temp_request; + + /* Set up mutex */ + mutex = OBJ_NEW(opal_mutex_t); + + rank = ompi_comm_rank(comm); + + /* Determine number of elements sent per operation */ + ompi_datatype_type_size(datatype, &type_size); + COLL_BASE_COMPUTED_SEGCOUNT(seg_size, type_size, seg_count); + + ompi_datatype_get_extent(datatype, &lb, &extent); + num_segs = (count + seg_count - 1) / seg_count; + real_seg_size = (ptrdiff_t) seg_count *extent; + + /* Set memory for recv_array and send_array, created on heap becasue they are needed to be accessed by other functions (callback functions) */ + if (num_segs != 0) { + recv_array = (int *) malloc(sizeof(int) * num_segs); + } + if (tree->tree_nextsize != 0) { + send_array = (int *) malloc(sizeof(int) * tree->tree_nextsize); + } + + /* Set constant context for send and recv call back */ + ompi_coll_adapt_constant_bcast_context_t *con = OBJ_NEW(ompi_coll_adapt_constant_bcast_context_t); + con->root = root; + con->count = count; + con->seg_count = seg_count; + con->datatype = datatype; + con->comm = comm; + con->real_seg_size = real_seg_size; + con->num_segs = num_segs; + con->recv_array = recv_array; + con->num_recv_segs = 0; + con->num_recv_fini = 0; + con->send_array = send_array; + con->num_sent_segs = 0; + con->mutex = mutex; + con->request = (ompi_request_t*)temp_request; + con->tree = tree; + con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Ibcast, root %d, tag %d\n", rank, root, + con->ibcast_tag)); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n", + rank, (void *) con->mutex, tree->tree_nextsize, num_segs, + (int) real_seg_size, seg_count, (void *) con->tree)); + + OPAL_THREAD_LOCK(mutex); + + /* If the current process is root, it sends segment to every children */ + if (rank == root) { + /* Handle the situation when num_segs < SEND_NUM */ + if (num_segs <= mca_coll_adapt_component.adapt_ibcast_max_send_requests) { + min = num_segs; + } else { + min = mca_coll_adapt_component.adapt_ibcast_max_send_requests; + } + + /* Set recv_array, root has already had all the segments */ + for (i = 0; i < num_segs; i++) { + recv_array[i] = i; + } + con->num_recv_segs = num_segs; + /* Set send_array, will send ompi_coll_adapt_ibcast_max_send_requests segments */ + for (i = 0; i < tree->tree_nextsize; i++) { + send_array[i] = mca_coll_adapt_component.adapt_ibcast_max_send_requests; + } + + ompi_request_t *send_req; + /* Number of datatypes in each send */ + int send_count = seg_count; + for (i = 0; i < min; i++) { + if (i == (num_segs - 1)) { + send_count = count - i * seg_count; + } + for (j = 0; j < tree->tree_nextsize; j++) { + ompi_coll_adapt_bcast_context_t *context = + (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component. + adapt_ibcast_context_free_list); + context->buff = (char *) buff + i * real_seg_size; + context->frag_id = i; + /* The id of peer in in children_list */ + context->child_id = j; + /* Actural rank of the peer */ + context->peer = tree->tree_next[j]; + context->con = con; + OBJ_RETAIN(con); + + char *send_buff = context->buff; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n", + rank, context->frag_id, context->peer, + (void *) send_buff, send_count, con->ibcast_tag - i)); + err = + MCA_PML_CALL(isend + (send_buff, send_count, datatype, context->peer, + con->ibcast_tag - i, sendmode, comm, + &send_req)); + if (MPI_SUCCESS != err) { + return err; + } + /* Set send callback */ + OPAL_THREAD_UNLOCK(mutex); + ompi_request_set_callback(send_req, send_cb, context); + OPAL_THREAD_LOCK(mutex); + } + } + + } + + /* If the current process is not root, it receives data from parent in the tree. */ + else { + /* Handle the situation when num_segs < RECV_NUM */ + if (num_segs <= mca_coll_adapt_component.adapt_ibcast_max_recv_requests) { + min = num_segs; + } else { + min = mca_coll_adapt_component.adapt_ibcast_max_recv_requests; + } + + /* Set recv_array, recv_array is empty */ + for (i = 0; i < num_segs; i++) { + recv_array[i] = 0; + } + /* Set send_array to empty */ + for (i = 0; i < tree->tree_nextsize; i++) { + send_array[i] = 0; + } + + /* Create a recv request */ + ompi_request_t *recv_req; + + /* Recevice some segments from its parent */ + int recv_count = seg_count; + for (i = 0; i < min; i++) { + if (i == (num_segs - 1)) { + recv_count = count - i * seg_count; + } + ompi_coll_adapt_bcast_context_t *context = + (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component. + adapt_ibcast_context_free_list); + context->buff = (char *) buff + i * real_seg_size; + context->frag_id = i; + context->peer = tree->tree_prev; + context->con = con; + OBJ_RETAIN(con); + char *recv_buff = context->buff; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n", + ompi_comm_rank(context->con->comm), context->frag_id, + context->peer, (void *) recv_buff, recv_count, + con->ibcast_tag - i)); + err = + MCA_PML_CALL(irecv + (recv_buff, recv_count, datatype, context->peer, + con->ibcast_tag - i, comm, &recv_req)); + if (MPI_SUCCESS != err) { + return err; + } + /* Set receive callback */ + OPAL_THREAD_UNLOCK(mutex); + ompi_request_set_callback(recv_req, recv_cb, context); + OPAL_THREAD_LOCK(mutex); + } + + } + + OPAL_THREAD_UNLOCK(mutex); + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: End of Ibcast\n", rank)); + + return MPI_SUCCESS; +} diff --git a/ompi/mca/coll/adapt/coll_adapt_inbuf.c b/ompi/mca/coll/adapt/coll_adapt_inbuf.c new file mode 100644 index 0000000000..aed2f309e3 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_inbuf.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_adapt.h" +#include "coll_adapt_inbuf.h" + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_inbuf_t, opal_free_list_item_t, + NULL, NULL); diff --git a/ompi/mca/coll/adapt/coll_adapt_inbuf.h b/ompi/mca/coll/adapt/coll_adapt_inbuf.h new file mode 100644 index 0000000000..d339256b85 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_inbuf.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COLL_ADAPT_INBUF_H +#define MCA_COLL_ADAPT_INBUF_H + +#include "opal/class/opal_free_list.h" + +struct ompi_coll_adapt_inbuf_s { + opal_free_list_item_t super; + char buff[]; +}; + +typedef struct ompi_coll_adapt_inbuf_s ompi_coll_adapt_inbuf_t; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_inbuf_t); + +#endif /* MCA_COLL_ADAPT_INBUF_H */ diff --git a/ompi/mca/coll/adapt/coll_adapt_ireduce.c b/ompi/mca/coll/adapt/coll_adapt_ireduce.c new file mode 100644 index 0000000000..8689e38128 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_ireduce.c @@ -0,0 +1,771 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "ompi/communicator/communicator.h" +#include "coll_adapt.h" +#include "coll_adapt_algorithms.h" +#include "coll_adapt_context.h" +#include "coll_adapt_item.h" +#include "coll_adapt_topocache.h" +#include "ompi/constants.h" +#include "ompi/mca/coll/base/coll_base_util.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/mca/coll/base/coll_base_topo.h" + +static int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS, + ompi_coll_tree_t * tree, size_t seg_size); + +/* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */ + +/* + * Set up MCA parameters of MPI_Reduce and MPI_Ireduce + */ +int ompi_coll_adapt_ireduce_register(void) +{ + mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version; + + mca_coll_adapt_component.adapt_ireduce_algorithm = 1; + mca_base_component_var_register(c, "reduce_algorithm", + "Algorithm of reduce, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_algorithm); + if( (mca_coll_adapt_component.adapt_ireduce_algorithm < 0) || + (mca_coll_adapt_component.adapt_ireduce_algorithm > OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) { + mca_coll_adapt_component.adapt_ireduce_algorithm = 1; + } + + mca_coll_adapt_component.adapt_ireduce_segment_size = 163740; + mca_base_component_var_register(c, "reduce_segment_size", + "Segment size in bytes used by default for reduce algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.", + MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_segment_size); + + mca_coll_adapt_component.adapt_ireduce_max_send_requests = 2; + mca_base_component_var_register(c, "reduce_max_send_requests", + "Maximum number of send requests", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_max_send_requests); + + mca_coll_adapt_component.adapt_ireduce_max_recv_requests = 3; + mca_base_component_var_register(c, "reduce_max_recv_requests", + "Maximum number of receive requests per peer", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_max_recv_requests); + + mca_coll_adapt_component.adapt_inbuf_free_list_min = 10; + mca_base_component_var_register(c, "inbuf_free_list_min", + "Minimum number of segment in inbuf free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_inbuf_free_list_min); + + mca_coll_adapt_component.adapt_inbuf_free_list_max = 10000; + mca_base_component_var_register(c, "inbuf_free_list_max", + "Maximum number of segment in inbuf free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_inbuf_free_list_max); + + + mca_coll_adapt_component.adapt_inbuf_free_list_inc = 10; + mca_base_component_var_register(c, "inbuf_free_list_inc", + "Number of segments to allocate when growing the inbuf free list", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_inbuf_free_list_inc); + + mca_coll_adapt_component.adapt_ireduce_synchronous_send = true; + (void) mca_base_component_var_register(c, "reduce_synchronous_send", + "Whether to use synchronous send operations during setup of reduce operations", + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_coll_adapt_component.adapt_ireduce_synchronous_send); + + mca_coll_adapt_component.adapt_ireduce_context_free_list = NULL; + return OMPI_SUCCESS; +} + +/* + * Release the free list created in ompi_coll_adapt_ireduce_generic + */ +int ompi_coll_adapt_ireduce_fini(void) +{ + if (NULL != mca_coll_adapt_component.adapt_ireduce_context_free_list) { + OBJ_RELEASE(mca_coll_adapt_component.adapt_ireduce_context_free_list); + mca_coll_adapt_component.adapt_ireduce_context_free_list = NULL; + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "ireduce fini\n")); + } + return OMPI_SUCCESS; +} + +/* + * Functions to access list + */ +static ompi_coll_adapt_item_t *get_next_ready_item(ompi_coll_adapt_constant_reduce_context_t *con, int num_children) +{ + ompi_coll_adapt_item_t *item = NULL, *temp_item; + if (opal_list_is_empty(&con->recv_list)) { + return NULL; + } + OPAL_THREAD_LOCK(&con->mutex_recv_list); + OPAL_LIST_FOREACH(temp_item, &con->recv_list, ompi_coll_adapt_item_t) { + if (temp_item->count == num_children) { + item = temp_item; + opal_list_remove_item(&con->recv_list, (opal_list_item_t *) temp_item); + break; + } + } + OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + return item; +} + +static int add_to_recv_list(ompi_coll_adapt_constant_reduce_context_t *con, int id) +{ + ompi_coll_adapt_item_t *item; + + OPAL_THREAD_LOCK(&con->mutex_recv_list); + OPAL_LIST_FOREACH(item, &con->recv_list, ompi_coll_adapt_item_t) { + if (item->id == id) { + (item->count)++; + OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 1\n")); + return 1; + } + } + + /* Add a new object to the list with count set to 1 */ + item = OBJ_NEW(ompi_coll_adapt_item_t); + item->id = id; + item->count = 1; + opal_list_append(&con->recv_list, (opal_list_item_t *) item); + OPAL_THREAD_UNLOCK(&con->mutex_recv_list); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 2\n")); + return 2; +} + +/* + * Get the inbuf address + */ +static ompi_coll_adapt_inbuf_t *to_inbuf(char *buf, int distance) +{ + return (ompi_coll_adapt_inbuf_t *) (buf - distance); +} + +/* + * Finish a ireduce request + */ +static int ireduce_request_fini(ompi_coll_adapt_reduce_context_t * context) +{ + /* Return the allocated recourses */ + ompi_request_t *temp_req = context->con->request; + if (context->con->accumbuf != NULL) { + if (context->con->rank != context->con->root) { + for (int i = 0; i < context->con->num_segs; i++) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: Return accumbuf %d %p\n", + ompi_comm_rank(context->con->comm), i, + (void *) to_inbuf(context->con->accumbuf[i], + context->con->distance))); + opal_free_list_return_st(&context->con->inbuf_list, + (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[i], + context->con->distance)); + } + } + free(context->con->accumbuf); + } + for (int i = 0; i < context->con->num_segs; i++) { + OBJ_DESTRUCT(&context->con->mutex_op_list[i]); + } + free(context->con->mutex_op_list); + if (context->con->tree->tree_nextsize > 0) { + free(context->con->next_recv_segs); + } + OBJ_RELEASE(context->con); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n")); + opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, + (opal_free_list_item_t *) context); + /* Complete the request */ + ompi_request_complete(temp_req, 1); + return OMPI_SUCCESS; +} + +/* + * Callback function of isend + */ +static int send_cb(ompi_request_t * req) +{ + ompi_coll_adapt_reduce_context_t *context = + (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: ireduce_send_cb, peer %d, seg_id %d\n", context->con->rank, + context->peer, context->seg_index)); + int err; + + opal_atomic_sub_fetch_32(&(context->con->ongoing_send), 1); + + /* Send a new segment */ + ompi_coll_adapt_item_t *item = + get_next_ready_item(context->con, context->con->tree->tree_nextsize); + + if (item != NULL) { + /* Get new context item from free list */ + ompi_coll_adapt_reduce_context_t *send_context = + (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component. + adapt_ireduce_context_free_list); + if (context->con->tree->tree_nextsize > 0) { + send_context->buff = context->con->accumbuf[item->id]; + } else { + send_context->buff = + context->buff + (item->id - context->seg_index) * context->con->segment_increment; + } + send_context->seg_index = item->id; + send_context->peer = context->peer; + send_context->con = context->con; + + opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1); + + int send_count = send_context->con->seg_count; + if (item->id == (send_context->con->num_segs - 1)) { + send_count = send_context->con->count - item->id * send_context->con->seg_count; + } + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n", + send_context->con->rank, send_context->seg_index, send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index)); + + ompi_request_t *send_req; + err = MCA_PML_CALL(isend + (send_context->buff, send_count, send_context->con->datatype, + send_context->peer, + context->con->ireduce_tag - send_context->seg_index, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); + if (MPI_SUCCESS != err) { + return err; + } + + /* Release the item */ + OBJ_RELEASE(item); + + /* Set the send call back */ + ompi_request_set_callback(send_req, send_cb, send_context); + } + + int32_t num_sent = opal_atomic_add_fetch_32(&(context->con->num_sent_segs), 1); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In send_cb, root = %d, num_sent = %d, num_segs = %d\n", + context->con->rank, context->con->tree->tree_root, num_sent, + context->con->num_segs)); + /* Check whether signal the condition, non root and sent all the segments */ + if (num_sent == context->con->num_segs && + context->con->num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize) { + ireduce_request_fini(context); + } else { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n")); + opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, + (opal_free_list_item_t *) context); + } + /* Call back function return 1, which means successful */ + req->req_free(&req); + return 1; +} + +/* + * Callback function of irecv + */ +static int recv_cb(ompi_request_t * req) +{ + ompi_coll_adapt_reduce_context_t *context = (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data; + int32_t new_id = opal_atomic_add_fetch_32(&(context->con->next_recv_segs[context->child_id]), 1); + int err; + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: ireduce_recv_cb, peer %d, seg_id %d\n", context->con->rank, + context->peer, context->seg_index)); + + /* Did we still need to receive subsequent fragments from this child ? */ + if (new_id < context->con->num_segs) { + char *temp_recv_buf = NULL; + ompi_coll_adapt_inbuf_t *inbuf = NULL; + /* Set inbuf, if it it first child, recv on rbuf, else recv on inbuf */ + if (context->child_id == 0 && context->con->sbuf != MPI_IN_PLACE + && context->con->root == context->con->rank) { + temp_recv_buf = (char *) context->con->rbuf + + (ptrdiff_t) new_id *(ptrdiff_t) context->con->segment_increment; + } else { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In recv_cb, alloc inbuf\n", context->con->rank)); + inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&context->con->inbuf_list); + temp_recv_buf = inbuf->buff - context->con->lower_bound; + } + /* Get new context item from free list */ + ompi_coll_adapt_reduce_context_t *recv_context = + (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component. + adapt_ireduce_context_free_list); + recv_context->buff = temp_recv_buf; + recv_context->seg_index = new_id; + recv_context->child_id = context->child_id; + recv_context->peer = context->peer; + recv_context->con = context->con; + recv_context->inbuf = inbuf; + int recv_count = recv_context->con->seg_count; + if (new_id == (recv_context->con->num_segs - 1)) { + recv_count = recv_context->con->count - new_id * recv_context->con->seg_count; + } + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n", + context->con->rank, recv_context->seg_index, recv_context->peer, + (void *) inbuf, + recv_context->con->ireduce_tag - recv_context->seg_index)); + ompi_request_t *recv_req; + err = MCA_PML_CALL(irecv(temp_recv_buf, recv_count, recv_context->con->datatype, + recv_context->peer, + recv_context->con->ireduce_tag - recv_context->seg_index, + recv_context->con->comm, &recv_req)); + if (MPI_SUCCESS != err) { + return err; + } + /* Set the receive call back */ + ompi_request_set_callback(recv_req, recv_cb, recv_context); + } + + /* Do the op */ + int op_count = context->con->seg_count; + if (context->seg_index == (context->con->num_segs - 1)) { + op_count = context->con->count - context->seg_index * context->con->seg_count; + } + + int keep_inbuf = 0; + OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->seg_index]); + if (NULL == context->con->accumbuf[context->seg_index]) { + if (NULL == context->inbuf) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: set accumbuf to rbuf\n", context->con->rank)); + context->con->accumbuf[context->seg_index] = context->buff; + } else { + keep_inbuf = 1; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: set accumbuf to inbuf\n", context->con->rank)); + context->con->accumbuf[context->seg_index] = context->inbuf->buff - context->con->lower_bound; + } + /* Op sbuf and accmbuf to accumbuf */ + ompi_op_reduce(context->con->op, + context->con->sbuf + (ptrdiff_t) context->seg_index * (ptrdiff_t) context->con->segment_increment, + context->con->accumbuf[context->seg_index], op_count, context->con->datatype); + + } else { + if (NULL == context->inbuf) { + /* Op rbuf and accumbuf to rbuf */ + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: op rbuf and accumbuf to rbuf\n", context->con->rank)); + ompi_op_reduce(context->con->op, context->con->accumbuf[context->seg_index], + context->buff, op_count, context->con->datatype); + /* Free old accumbuf */ + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: free old accumbuf %p\n", context->con->rank, + (void *) to_inbuf(context->con->accumbuf[context->seg_index], + context->con->distance))); + opal_free_list_return(&context->con->inbuf_list, + (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[context->seg_index], + context->con->distance)); + /* Set accumbut to rbuf */ + context->con->accumbuf[context->seg_index] = context->buff; + } else { + /* Op inbuf and accmbuf to accumbuf */ + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: op inbuf and accmbuf to accumbuf\n", context->con->rank)); + ompi_op_reduce(context->con->op, context->inbuf->buff - context->con->lower_bound, + context->con->accumbuf[context->seg_index], op_count, + context->con->datatype); + } + } + OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->seg_index]); + + /* Set recv list */ + if (context->con->rank != context->con->tree->tree_root) { + add_to_recv_list(context->con, context->seg_index); + } + + /* Send to parent */ + if (context->con->rank != context->con->tree->tree_root + && context->con->ongoing_send < mca_coll_adapt_component.adapt_ireduce_max_send_requests) { + ompi_coll_adapt_item_t *item = get_next_ready_item(context->con, context->con->tree->tree_nextsize); + + if (NULL != item) { + /* Get new context item from free list */ + ompi_coll_adapt_reduce_context_t *send_context = + (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component. + adapt_ireduce_context_free_list); + send_context->buff = context->con->accumbuf[context->seg_index]; + send_context->seg_index = item->id; + send_context->peer = context->con->tree->tree_prev; + send_context->con = context->con; + opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1); + + int send_count = send_context->con->seg_count; + if (item->id == (send_context->con->num_segs - 1)) { + send_count = send_context->con->count - item->id * send_context->con->seg_count; + } + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n", + send_context->con->rank, send_context->seg_index, send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index)); + + ompi_request_t *send_req; + err = MCA_PML_CALL(isend(send_context->buff, send_count, send_context->con->datatype, + send_context->peer, + send_context->con->ireduce_tag - send_context->seg_index, + MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req)); + if (MPI_SUCCESS != err) { + return err; + } + OBJ_RELEASE(item); + + /* Set the send call back */ + ompi_request_set_callback(send_req, send_cb, send_context); + } + } + + int32_t num_recv_segs = opal_atomic_add_fetch_32(&(context->con->num_recv_segs), 1); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In recv_cb, tree = %p, root = %d, num_recv = %d, num_segs = %d, num_child = %d\n", + context->con->rank, (void *) context->con->tree, + context->con->tree->tree_root, num_recv_segs, context->con->num_segs, + context->con->tree->tree_nextsize)); + /* Prepare for releasing all acquired resources */ + if (!keep_inbuf && NULL != context->inbuf) { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: root free context inbuf %p", context->con->rank, + (void *) context->inbuf)); + opal_free_list_return(&context->con->inbuf_list, + (opal_free_list_item_t *) context->inbuf); + } + /* If this is root and has received all the segments */ + if (num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize && + (context->con->tree->tree_root == context->con->rank || context->con->num_sent_segs == context->con->num_segs)) { + ireduce_request_fini(context); + } else { + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: return context_list", + context->con->rank)); + opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list, + (opal_free_list_item_t *) context); + } + req->req_free(&req); + return 1; +} + +int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, + ompi_request_t ** request, mca_coll_base_module_t * module) +{ + + /* Fall-back if operation is commutative */ + if (!ompi_op_is_commute(op)){ + mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "ADAPT cannot handle reduce with this (non-commutative) operation. It needs to fall back on another component\n")); + return adapt_module->previous_ireduce(sbuf, rbuf, count, dtype, op, root, + comm, request, + adapt_module->previous_reduce_module); + } + + + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, + "ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n", + root, mca_coll_adapt_component.adapt_ireduce_algorithm, + mca_coll_adapt_component.adapt_ireduce_segment_size, + mca_coll_adapt_component.adapt_ireduce_max_send_requests, + mca_coll_adapt_component.adapt_ireduce_max_recv_requests)); + + if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ireduce_algorithm) { + OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n")); + return OMPI_ERR_NOT_IMPLEMENTED; + } + + + return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module, + adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ireduce_algorithm), + mca_coll_adapt_component.adapt_ireduce_segment_size); + +} + + +int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + struct ompi_communicator_t *comm, ompi_request_t ** request, + mca_coll_base_module_t * module, ompi_coll_tree_t * tree, + size_t seg_size) +{ + + ptrdiff_t extent, lower_bound, segment_increment; + ptrdiff_t true_lower_bound, true_extent, real_seg_size; + size_t typelng; + int seg_count = count, num_segs, rank, recv_count, send_count, err, min; + /* Used to store the accumuate result, pointer to every segment */ + char **accumbuf = NULL; + opal_mutex_t *mutex_op_list; + /* A list to store the segments need to be sent */ + opal_list_t *recv_list; + mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ireduce_synchronous_send) + ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD; + + /* Determine number of segments and number of elements sent per operation */ + rank = ompi_comm_rank(comm); + ompi_datatype_get_extent(dtype, &lower_bound, &extent); + ompi_datatype_type_size(dtype, &typelng); + COLL_BASE_COMPUTED_SEGCOUNT(seg_size, typelng, seg_count); + num_segs = (count + seg_count - 1) / seg_count; + segment_increment = (ptrdiff_t) seg_count *extent; + ompi_datatype_get_true_extent(dtype, &true_lower_bound, &true_extent); + real_seg_size = true_extent + (ptrdiff_t) (seg_count - 1) * extent; + + /* Atomically set up free list */ + if (NULL == mca_coll_adapt_component.adapt_ireduce_context_free_list) { + opal_free_list_t* fl = OBJ_NEW(opal_free_list_t); + opal_free_list_init(fl, + sizeof(ompi_coll_adapt_reduce_context_t), + opal_cache_line_size, + OBJ_CLASS(ompi_coll_adapt_reduce_context_t), + 0, opal_cache_line_size, + mca_coll_adapt_component.adapt_context_free_list_min, + mca_coll_adapt_component.adapt_context_free_list_max, + mca_coll_adapt_component.adapt_context_free_list_inc, + NULL, 0, NULL, NULL, NULL); + if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&mca_coll_adapt_component.adapt_ireduce_context_free_list, + &(intptr_t){0}, fl) ) { + OBJ_RELEASE(fl); + } + } + + ompi_coll_base_nbc_request_t *temp_request = NULL; + /* Set up request */ + temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t); + OMPI_REQUEST_INIT(&temp_request->super, false); + temp_request->super.req_state = OMPI_REQUEST_ACTIVE; + temp_request->super.req_type = OMPI_REQUEST_COLL; + temp_request->super.req_free = ompi_coll_adapt_request_free; + temp_request->super.req_status.MPI_SOURCE = 0; + temp_request->super.req_status.MPI_TAG = 0; + temp_request->super.req_status.MPI_ERROR = 0; + temp_request->super.req_status._cancelled = 0; + temp_request->super.req_status._ucount = 0; + *request = (ompi_request_t*)temp_request; + + /* Set up mutex */ + mutex_op_list = (opal_mutex_t *) malloc(sizeof(opal_mutex_t) * num_segs); + for (int32_t i = 0; i < num_segs; i++) { + OBJ_CONSTRUCT(&mutex_op_list[i], opal_mutex_t); + } + + /* Set constant context for send and recv call back */ + ompi_coll_adapt_constant_reduce_context_t *con = + OBJ_NEW(ompi_coll_adapt_constant_reduce_context_t); + con->count = count; + con->seg_count = seg_count; + con->datatype = dtype; + con->comm = comm; + con->segment_increment = segment_increment; + con->num_segs = num_segs; + con->request = (ompi_request_t*)temp_request; + con->rank = rank; + con->num_recv_segs = 0; + con->num_sent_segs = 0; + con->ongoing_send = 0; + con->mutex_op_list = mutex_op_list; + con->op = op; + con->tree = tree; + con->lower_bound = lower_bound; + con->sbuf = (char *) sbuf; + con->rbuf = (char *) rbuf; + con->root = root; + con->distance = 0; + con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs); + con->real_seg_size = real_seg_size; + + /* If the current process is not leaf */ + if (tree->tree_nextsize > 0) { + size_t num_allocate_elems = mca_coll_adapt_component.adapt_inbuf_free_list_min; + if (tree->tree_nextsize * num_segs < num_allocate_elems) { + num_allocate_elems = tree->tree_nextsize * num_segs; + } + opal_free_list_init(&con->inbuf_list, + sizeof(ompi_coll_adapt_inbuf_t) + real_seg_size, + opal_cache_line_size, + OBJ_CLASS(ompi_coll_adapt_inbuf_t), + 0, opal_cache_line_size, + num_allocate_elems, + mca_coll_adapt_component.adapt_inbuf_free_list_max, + mca_coll_adapt_component.adapt_inbuf_free_list_inc, + NULL, 0, NULL, NULL, NULL); + /* Set up next_recv_segs */ + con->next_recv_segs = (int32_t *) malloc(sizeof(int32_t) * tree->tree_nextsize); + ompi_coll_adapt_inbuf_t *temp_inbuf = + (ompi_coll_adapt_inbuf_t *) opal_free_list_wait_st(&con->inbuf_list); + con->distance = (char *) temp_inbuf->buff - lower_bound - (char *) temp_inbuf; //address of inbuf->buff to address of inbuf + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: distance %d, inbuf %p, inbuf->buff %p, inbuf->buff-lb %p, to_inbuf %p, inbuf_list %p\n", + rank, con->distance, (void *) temp_inbuf, (void *) temp_inbuf->buff, + (char *) temp_inbuf->buff - lower_bound, + (void *) to_inbuf((char *) temp_inbuf->buff - lower_bound, con->distance), + (void *) &con->inbuf_list)); + opal_free_list_return_st(&con->inbuf_list, (opal_free_list_item_t *) temp_inbuf); + } else { + con->next_recv_segs = NULL; + } + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root, + con->ireduce_tag)); + + /* If the current process is not leaf node */ + if (tree->tree_nextsize > 0) { + /* Set up accumbuf */ + accumbuf = (char **) malloc(sizeof(char *) * num_segs); + if (root == rank && sbuf == MPI_IN_PLACE) { + for (int32_t i = 0; i < num_segs; i++) { + accumbuf[i] = (char *) rbuf + (ptrdiff_t) i *(ptrdiff_t) segment_increment; + } + } else { + for (int32_t i = 0; i < num_segs; i++) { + accumbuf[i] = NULL; + } + } + + con->accumbuf = accumbuf; + + /* For the first batch of segments */ + min = mca_coll_adapt_component.adapt_ireduce_max_recv_requests; + if (num_segs < mca_coll_adapt_component.adapt_ireduce_max_recv_requests) { + min = num_segs; + } + for (int32_t i = 0; i < tree->tree_nextsize; i++) { + con->next_recv_segs[i] = min - 1; + } + + int num_recvs = 0; + for (int32_t seg_index = 0; seg_index < min; seg_index++) + { + /* For each child */ + for (int32_t i = 0; i < tree->tree_nextsize; i++) { + recv_count = seg_count; + if (seg_index == (num_segs - 1)) { + recv_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index; + } + char *temp_recv_buf = NULL; + ompi_coll_adapt_inbuf_t *inbuf = NULL; + /* Set inbuf, if it it first child, recv on rbuf, else recv on inbuf */ + if (i == 0 && sbuf != MPI_IN_PLACE && root == rank) { + temp_recv_buf = (char *) rbuf + (ptrdiff_t) seg_index *(ptrdiff_t) segment_increment; + } else { + inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&con->inbuf_list); + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In ireduce, alloc inbuf %p\n", rank, + (void *) inbuf)); + temp_recv_buf = inbuf->buff - lower_bound; + } + /* Get context */ + ompi_coll_adapt_reduce_context_t *context = + (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component. + adapt_ireduce_context_free_list); + context->buff = temp_recv_buf; + context->seg_index = seg_index; + context->child_id = i; //the id of peer in in the tree + context->peer = tree->tree_next[i]; //the actual rank of the peer + context->con = con; + context->inbuf = inbuf; + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n", + context->con->rank, context->seg_index, context->peer, + recv_count, (void *) inbuf, + con->ireduce_tag - seg_index)); + + /* Create a recv request */ + ompi_request_t *recv_req; + err = MCA_PML_CALL(irecv + (temp_recv_buf, recv_count, dtype, tree->tree_next[i], + con->ireduce_tag - seg_index, comm, &recv_req)); + if (MPI_SUCCESS != err) { + return err; + } + /* Set the recv callback */ + ompi_request_set_callback(recv_req, recv_cb, context); + + ++num_recvs; + } + } + } + + /* Leaf nodes */ + else { + /* Set up recv_list */ + min = mca_coll_adapt_component.adapt_ireduce_max_send_requests; + if (num_segs <= mca_coll_adapt_component.adapt_ireduce_max_send_requests) { + min = num_segs; + } + /* put all items into the recv_list that won't be sent immediately */ + for (int32_t seg_index = min; seg_index < num_segs; seg_index++) { + ompi_coll_adapt_item_t *item; + item = OBJ_NEW(ompi_coll_adapt_item_t); + item->id = seg_index; + item->count = tree->tree_nextsize; + opal_list_append(&con->recv_list, (opal_list_item_t *) item); + } + con->accumbuf = accumbuf; + con->ongoing_send = min; + for (int32_t seg_index = 0; seg_index < min; seg_index++) { + send_count = seg_count; + if (seg_index == (num_segs - 1)) { + send_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index; + } + ompi_coll_adapt_reduce_context_t *context = + (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component.adapt_ireduce_context_free_list); + context->buff = (char *) sbuf + (ptrdiff_t) seg_index * (ptrdiff_t) segment_increment; + context->seg_index = seg_index; + /* Actural rank of the peer */ + context->peer = tree->tree_prev; + context->con = con; + context->inbuf = NULL; + + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n", + context->con->rank, context->seg_index, context->peer, + send_count, con->ireduce_tag - context->seg_index)); + + /* Create send request */ + ompi_request_t *send_req; + err = MCA_PML_CALL(isend + (context->buff, send_count, dtype, tree->tree_prev, + con->ireduce_tag - context->seg_index, + sendmode, comm, &send_req)); + if (MPI_SUCCESS != err) { + return err; + } + + /* Set the send callback */ + ompi_request_set_callback(send_req, send_cb, context); + } + + } + + return MPI_SUCCESS; +} diff --git a/ompi/mca/coll/adapt/coll_adapt_item.c b/ompi/mca/coll/adapt/coll_adapt_item.c new file mode 100644 index 0000000000..1cb144b309 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_item.c @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_adapt_item.h" + +OBJ_CLASS_INSTANCE(ompi_coll_adapt_item_t, opal_list_item_t, + NULL, NULL); diff --git a/ompi/mca/coll/adapt/coll_adapt_item.h b/ompi/mca/coll/adapt/coll_adapt_item.h new file mode 100644 index 0000000000..0eb129704d --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_item.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "opal/class/opal_list.h" +#include "coll_adapt_inbuf.h" + +struct ompi_coll_adapt_item_s { + opal_list_item_t super; + /* Fragment id */ + int id; + /* The number of children which have received the current segment */ + int count; +}; + +typedef struct ompi_coll_adapt_item_s ompi_coll_adapt_item_t; + +OBJ_CLASS_DECLARATION(ompi_coll_adapt_item_t); diff --git a/ompi/mca/coll/adapt/coll_adapt_module.c b/ompi/mca/coll/adapt/coll_adapt_module.c new file mode 100644 index 0000000000..fd8c448f20 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_module.c @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include +#ifdef HAVE_STRING_H +#include +#endif /* HAVE_STRING_H */ +#ifdef HAVE_SCHED_H +#include +#endif /* HAVE_SCHED_H */ +#include +#ifdef HAVE_SYS_MMAN_H +#include +#endif /* HAVE_SYS_MMAN_H */ +#ifdef HAVE_UNISTD_H +#include +#endif /* HAVE_UNISTD_H */ + +#include "mpi.h" +#include "opal_stdint.h" +#include "opal/util/os_path.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/group/group.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/base.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/proc/proc.h" +#include "coll_adapt.h" + +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/pml/pml.h" +#include "coll_adapt_algorithms.h" +#include "coll_adapt_topocache.h" + + +/* + * Local functions + */ + +/* + * Module constructor + */ +static void adapt_module_construct(mca_coll_adapt_module_t * module) +{ + module->topo_cache = NULL; + module->adapt_enabled = false; +} + +/* + * Module destructor + */ +static void adapt_module_destruct(mca_coll_adapt_module_t * module) +{ + if (NULL != module->topo_cache) { + adapt_topology_cache_item_t *item; + while (NULL != (item = (adapt_topology_cache_item_t*)opal_list_remove_first(module->topo_cache))) { + OBJ_RELEASE(item); + } + OBJ_RELEASE(module->topo_cache); + module->topo_cache = NULL; + } + module->adapt_enabled = false; +} + + +OBJ_CLASS_INSTANCE(mca_coll_adapt_module_t, + mca_coll_base_module_t, + adapt_module_construct, + adapt_module_destruct); + +/* + * In this macro, the following variables are supposed to have been declared + * in the caller: + * . ompi_communicator_t *comm + * . mca_coll_adapt_module_t *adapt_module + */ +#define ADAPT_SAVE_PREV_COLL_API(__api) \ + do { \ + adapt_module->previous_ ## __api = comm->c_coll->coll_ ## __api; \ + adapt_module->previous_ ## __api ## _module = comm->c_coll->coll_ ## __api ## _module; \ + if (!comm->c_coll->coll_ ## __api || !comm->c_coll->coll_ ## __api ## _module) { \ + opal_output_verbose(1, ompi_coll_base_framework.framework_output, \ + "(%d/%s): no underlying " # __api"; disqualifying myself", \ + comm->c_contextid, comm->c_name); \ + return OMPI_ERROR; \ + } \ + OBJ_RETAIN(adapt_module->previous_ ## __api ## _module); \ + } while(0) + + +/* + * Init module on the communicator + */ +static int adapt_module_enable(mca_coll_base_module_t * module, + struct ompi_communicator_t *comm) +{ + mca_coll_adapt_module_t * adapt_module = (mca_coll_adapt_module_t*) module; + + ADAPT_SAVE_PREV_COLL_API(reduce); + ADAPT_SAVE_PREV_COLL_API(ireduce); + + return OMPI_SUCCESS; +} + +/* + * Initial query function that is invoked during MPI_INIT, allowing + * this component to disqualify itself if it doesn't support the + * required level of thread support. This function is invoked exactly + * once. + */ +int ompi_coll_adapt_init_query(bool enable_progress_threads, bool enable_mpi_threads) +{ + return OMPI_SUCCESS; +} + +/* + * Invoked when there's a new communicator that has been created. + * Look at the communicator and decide which set of functions and + * priority we want to return. + */ +mca_coll_base_module_t *ompi_coll_adapt_comm_query(struct ompi_communicator_t * comm, + int *priority) +{ + mca_coll_adapt_module_t *adapt_module; + + /* If we're intercomm, or if there's only one process in the communicator */ + if (OMPI_COMM_IS_INTER(comm) || 1 == ompi_comm_size(comm)) { + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:adapt:comm_query (%d/%s): intercomm, " + "comm is too small; disqualifying myself", + comm->c_contextid, comm->c_name); + return NULL; + } + + /* Get the priority level attached to this module. + If priority is less than or equal to 0, then the module is unavailable. */ + *priority = mca_coll_adapt_component.adapt_priority; + if (mca_coll_adapt_component.adapt_priority <= 0) { + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:adapt:comm_query (%d/%s): priority too low; " + "disqualifying myself", + comm->c_contextid, comm->c_name); + return NULL; + } + + adapt_module = OBJ_NEW(mca_coll_adapt_module_t); + if (NULL == adapt_module) { + return NULL; + } + + /* All is good -- return a module */ + adapt_module->super.coll_module_enable = adapt_module_enable; + adapt_module->super.ft_event = NULL; + adapt_module->super.coll_allgather = NULL; + adapt_module->super.coll_allgatherv = NULL; + adapt_module->super.coll_allreduce = NULL; + adapt_module->super.coll_alltoall = NULL; + adapt_module->super.coll_alltoallw = NULL; + adapt_module->super.coll_barrier = NULL; + adapt_module->super.coll_bcast = ompi_coll_adapt_bcast; + adapt_module->super.coll_exscan = NULL; + adapt_module->super.coll_gather = NULL; + adapt_module->super.coll_gatherv = NULL; + adapt_module->super.coll_reduce = ompi_coll_adapt_reduce; + adapt_module->super.coll_reduce_scatter = NULL; + adapt_module->super.coll_scan = NULL; + adapt_module->super.coll_scatter = NULL; + adapt_module->super.coll_scatterv = NULL; + adapt_module->super.coll_ibcast = ompi_coll_adapt_ibcast; + adapt_module->super.coll_ireduce = ompi_coll_adapt_ireduce; + adapt_module->super.coll_iallreduce = NULL; + + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:adapt:comm_query (%d/%s): pick me! pick me!", + comm->c_contextid, comm->c_name); + return &(adapt_module->super); +} + +/* + * Free ADAPT request + */ +int ompi_coll_adapt_request_free(ompi_request_t ** request) +{ + OMPI_REQUEST_FINI(*request); + (*request)->req_state = OMPI_REQUEST_INVALID; + OBJ_RELEASE(*request); + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} diff --git a/ompi/mca/coll/adapt/coll_adapt_reduce.c b/ompi/mca/coll/adapt/coll_adapt_reduce.c new file mode 100644 index 0000000000..d0ad26d6e6 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_reduce.c @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "ompi/op/op.h" +#include "coll_adapt.h" +#include "coll_adapt_algorithms.h" + +/* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */ +int ompi_coll_adapt_reduce(const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, + struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + /* Fall-back if operation is commutative */ + if (!ompi_op_is_commute(op)){ + mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module; + OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, + "ADAPT cannot handle reduce with this (commutative) operation. It needs to fall back on another component\n")); + return adapt_module->previous_reduce(sbuf, rbuf, count, dtype, op, root, + comm, + adapt_module->previous_reduce_module); + } + + ompi_request_t *request = NULL; + int err = ompi_coll_adapt_ireduce(sbuf, rbuf, count, dtype, op, root, comm, &request, module); + if( MPI_SUCCESS != err ) { + if( NULL == request ) + return err; + } + ompi_request_wait(&request, MPI_STATUS_IGNORE); + return err; +} diff --git a/ompi/mca/coll/adapt/coll_adapt_topocache.c b/ompi/mca/coll/adapt/coll_adapt_topocache.c new file mode 100644 index 0000000000..93c9a6043d --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_topocache.c @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_adapt.h" +#include "coll_adapt_topocache.h" + +#include "ompi/communicator/communicator.h" + +static void destruct_topology_cache(adapt_topology_cache_item_t *item) +{ + if (NULL != item->tree) { + ompi_coll_base_topo_destroy_tree(&item->tree); + } +} + +OBJ_CLASS_INSTANCE(adapt_topology_cache_item_t, opal_list_item_t, + NULL, &destruct_topology_cache); + +static ompi_coll_tree_t *create_topology( + ompi_coll_adapt_algorithm_t algorithm, + int root, + struct ompi_communicator_t *comm) +{ + switch(algorithm) { + case OMPI_COLL_ADAPT_ALGORITHM_TUNED: + { + return NULL; + } + case OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL: + { + return ompi_coll_base_topo_build_bmtree(comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL: + { + return ompi_coll_base_topo_build_in_order_bmtree(comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_BINARY: + { + return ompi_coll_base_topo_build_tree(2, comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_PIPELINE: + { + return ompi_coll_base_topo_build_chain(1, comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_CHAIN: + { + return ompi_coll_base_topo_build_chain(4, comm, root); + } + case OMPI_COLL_ADAPT_ALGORITHM_LINEAR: + { + int fanout = ompi_comm_size(comm) - 1; + ompi_coll_tree_t *tree; + if (fanout < 1) { + tree = ompi_coll_base_topo_build_chain(1, comm, root); + } else if (fanout <= MAXTREEFANOUT) { + tree = ompi_coll_base_topo_build_tree(ompi_comm_size(comm) - 1, comm, root); + } else { + tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root); + } + return tree; + } + default: + printf("WARN: unknown topology %d\n", algorithm); + return NULL; + } +} + +ompi_coll_tree_t* adapt_module_cached_topology( + mca_coll_base_module_t *module, + struct ompi_communicator_t *comm, + int root, + ompi_coll_adapt_algorithm_t algorithm) +{ + mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t*)module; + adapt_topology_cache_item_t *item; + ompi_coll_tree_t * tree; + if (NULL != adapt_module->topo_cache) { + OPAL_LIST_FOREACH(item, adapt_module->topo_cache, adapt_topology_cache_item_t) { + if (item->root == root && item->algorithm == algorithm) { + return item->tree; + } + } + } else { + adapt_module->topo_cache = OBJ_NEW(opal_list_t); + } + + /* topology not found, create one */ + tree = create_topology(algorithm, root, comm); + + item = OBJ_NEW(adapt_topology_cache_item_t); + item->tree = tree; + item->root = root; + item->algorithm = algorithm; + opal_list_prepend(adapt_module->topo_cache, &item->super); + return tree; +} + diff --git a/ompi/mca/coll/adapt/coll_adapt_topocache.h b/ompi/mca/coll/adapt/coll_adapt_topocache.h new file mode 100644 index 0000000000..6910089ee0 --- /dev/null +++ b/ompi/mca/coll/adapt/coll_adapt_topocache.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2020 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COLL_ADAPT_TOPOCACHE_H +#define MCA_COLL_ADAPT_TOPOCACHE_H + +#include "opal/class/opal_list.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/coll_base_topo.h" + +typedef struct adapt_topology_cache_item_t { + opal_list_item_t super; + ompi_coll_tree_t *tree; + int root; + int algorithm; +} adapt_topology_cache_item_t; + +OBJ_CLASS_DECLARATION(adapt_topology_cache_item_t); + + +OMPI_DECLSPEC ompi_coll_tree_t* adapt_module_cached_topology( + mca_coll_base_module_t *module, + struct ompi_communicator_t *comm, + int root, + ompi_coll_adapt_algorithm_t algorithm); + +#endif /* MCA_COLL_ADAPT_TOPOCACHE_H */ diff --git a/ompi/mca/coll/base/coll_base_util.h b/ompi/mca/coll/base/coll_base_util.h index a5b8016124..555b40bd46 100644 --- a/ompi/mca/coll/base/coll_base_util.h +++ b/ompi/mca/coll/base/coll_base_util.h @@ -27,11 +27,17 @@ #include "ompi/mca/mca.h" #include "ompi/datatype/ompi_datatype.h" #include "ompi/request/request.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/coll/base/coll_tags.h" #include "ompi/op/op.h" #include "ompi/mca/pml/pml.h" BEGIN_C_DECLS +/** + * Request structure to be returned by non-blocking + * collective operations. + */ struct ompi_coll_base_nbc_request_t { ompi_request_t super; union { @@ -60,6 +66,22 @@ struct ompi_coll_base_nbc_request_t { OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t); +static inline int32_t +ompi_coll_base_nbc_reserve_tags(ompi_communicator_t* comm, int32_t reserve) +{ + int32_t tag, old_tag; + assert( reserve > 0 ); + reread_tag: /* In case we fail to atomically update the tag */ + tag = old_tag = comm->c_nbc_tag; + if ((tag - reserve) < MCA_COLL_BASE_TAG_NONBLOCKING_END) { + tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; + } + if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_32(&comm->c_nbc_tag, &old_tag, tag - reserve) ) { + goto reread_tag; + } + return tag; +} + typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t; /** @@ -115,14 +137,29 @@ unsigned int ompi_mirror_perm(unsigned int x, int nbits); */ int ompi_rounddown(int num, int factor); +/** + * If necessary, retain op and store it in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). + */ int ompi_coll_base_retain_op( ompi_request_t *request, ompi_op_t *op, ompi_datatype_t *type); +/** + * If necessary, retain the datatypes and store them in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). + */ int ompi_coll_base_retain_datatypes( ompi_request_t *request, ompi_datatype_t *stype, ompi_datatype_t *rtype); +/** + * If necessary, retain the datatypes and store them in the + * request object, which should be of type ompi_coll_base_nbc_request_t + * (will be cast internally). + */ int ompi_coll_base_retain_datatypes_w( ompi_request_t *request, ompi_datatype_t *stypes[], ompi_datatype_t *rtypes[]); diff --git a/ompi/mca/coll/libnbc/coll_libnbc.h b/ompi/mca/coll/libnbc/coll_libnbc.h index badc187077..bbd346e9c1 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc.h +++ b/ompi/mca/coll/libnbc/coll_libnbc.h @@ -94,7 +94,6 @@ struct ompi_coll_libnbc_module_t { mca_coll_base_module_t super; opal_mutex_t mutex; bool comm_registered; - int tag; #ifdef NBC_CACHE_SCHEDULE void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct hb_tree, but since this is a diff --git a/ompi/mca/coll/libnbc/nbc.c b/ompi/mca/coll/libnbc/nbc.c index 171f5a37e9..35e02fe87b 100644 --- a/ompi/mca/coll/libnbc/nbc.c +++ b/ompi/mca/coll/libnbc/nbc.c @@ -25,7 +25,7 @@ * Additional copyrights may follow */ #include "nbc_internal.h" -#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/coll/base/coll_base_util.h" #include "ompi/op/op.h" #include "ompi/mca/pml/pml.h" @@ -595,7 +595,6 @@ void NBC_Return_handle(ompi_coll_libnbc_request_t *request) { } int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) { - comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE; #ifdef NBC_CACHE_SCHEDULE /* initialize the NBC_ALLTOALL SchedCache tree */ @@ -672,7 +671,7 @@ int NBC_Start(NBC_Handle *handle) { int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, ompi_coll_libnbc_module_t *module, bool persistent, ompi_request_t **request, void *tmpbuf) { - int ret, tmp_tag; + int ret; bool need_register = false; ompi_coll_libnbc_request_t *handle; @@ -685,13 +684,7 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, /* update the module->tag here because other processes may have operations * and they may update the module->tag */ - OPAL_THREAD_LOCK(&module->mutex); - tmp_tag = module->tag--; - if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) { - tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; - NBC_DEBUG(2,"resetting tags ...\n"); - } - OPAL_THREAD_UNLOCK(&module->mutex); + (void)ompi_coll_base_nbc_reserve_tags(comm, 1); OBJ_RELEASE(schedule); free(tmpbuf); @@ -712,20 +705,15 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, /******************** Do the tag and shadow comm administration ... ***************/ - OPAL_THREAD_LOCK(&module->mutex); - tmp_tag = module->tag--; - if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) { - tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE; - NBC_DEBUG(2,"resetting tags ...\n"); - } + handle->tag = ompi_coll_base_nbc_reserve_tags(comm, 1); + OPAL_THREAD_LOCK(&module->mutex); if (true != module->comm_registered) { module->comm_registered = true; need_register = true; } OPAL_THREAD_UNLOCK(&module->mutex); - handle->tag = tmp_tag; /* register progress */ if (need_register) { @@ -737,7 +725,6 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm, } handle->comm=comm; - /*printf("got module: %lu tag: %i\n", module, module->tag);*/ /******************** end of tag and shadow comm administration ... ***************/ handle->comminfo = module; diff --git a/ompi/mca/coll/tuned/coll_tuned_component.c b/ompi/mca/coll/tuned/coll_tuned_component.c index a17cfacb12..7f6764d5f9 100644 --- a/ompi/mca/coll/tuned/coll_tuned_component.c +++ b/ompi/mca/coll/tuned/coll_tuned_component.c @@ -215,17 +215,8 @@ static int tuned_open(void) int rc; #if OPAL_ENABLE_DEBUG - { - int param; - - param = mca_base_var_find("ompi", "coll", "base", "verbose"); - if (param >= 0) { - const int *verbose = NULL; - mca_base_var_get_value(param, &verbose, NULL, NULL); - if (verbose && verbose[0] > 0) { - ompi_coll_tuned_stream = opal_output_open(NULL); - } - } + if (ompi_coll_base_framework.framework_verbose) { + ompi_coll_tuned_stream = opal_output_open(NULL); } #endif /* OPAL_ENABLE_DEBUG */ diff --git a/ompi/mpi/c/ibcast.c b/ompi/mpi/c/ibcast.c index 2dcdbb9633..33a05154e1 100644 --- a/ompi/mpi/c/ibcast.c +++ b/ompi/mpi/c/ibcast.c @@ -2,7 +2,7 @@ * Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved. * Copyright (c) 2015-2019 Research Organization for Information Science * and Technology (RIST). All rights reserved. - * Copyright (c) 2017-2018 The University of Tennessee and The University + * Copyright (c) 2017-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * $COPYRIGHT$ @@ -80,6 +80,14 @@ int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, } } + /* If there's only one node, or if the count is 0, we're done */ + + if ((OMPI_COMM_IS_INTRA(comm) && ompi_comm_size(comm) <= 1) || + 0 == count) { + *request = &ompi_request_empty; + return MPI_SUCCESS; + } + OPAL_CR_ENTER_LIBRARY(); /* Invoke the coll component to perform the back-end operation */ diff --git a/ompi/mpi/c/ireduce_scatter_block.c b/ompi/mpi/c/ireduce_scatter_block.c index ce43ab3cd4..1e974bed3f 100644 --- a/ompi/mpi/c/ireduce_scatter_block.c +++ b/ompi/mpi/c/ireduce_scatter_block.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2018 The University of Tennessee and The University + * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, @@ -97,6 +97,11 @@ int MPI_Ireduce_scatter_block(const void *sendbuf, void *recvbuf, int recvcount, OMPI_ERRHANDLER_CHECK(err, comm, err, FUNC_NAME); } + if (0 == recvcount) { + *request = &ompi_request_empty; + return MPI_SUCCESS; + } + OPAL_CR_ENTER_LIBRARY(); /* Invoke the coll component to perform the back-end operation */ diff --git a/ompi/mpi/c/reduce_scatter_block.c b/ompi/mpi/c/reduce_scatter_block.c index 96b991f5cc..9172c1aac9 100644 --- a/ompi/mpi/c/reduce_scatter_block.c +++ b/ompi/mpi/c/reduce_scatter_block.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2017 The University of Tennessee and The University + * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, @@ -94,6 +94,9 @@ int MPI_Reduce_scatter_block(const void *sendbuf, void *recvbuf, int recvcount, OMPI_CHECK_DATATYPE_FOR_SEND(err, datatype, recvcount); OMPI_ERRHANDLER_CHECK(err, comm, err, FUNC_NAME); } + if (0 == recvcount) { + return MPI_SUCCESS; + } OPAL_CR_ENTER_LIBRARY(); diff --git a/ompi/request/request.h b/ompi/request/request.h index 6460fbe4fa..9a0cc33bbf 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2016 The University of Tennessee and The University + * Copyright (c) 2004-2020 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -434,9 +434,11 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa { int rc = 0; - if( NULL != request->req_complete_cb) { - rc = request->req_complete_cb( request ); + if(NULL != request->req_complete_cb) { + /* Set the request cb to NULL to allow resetting in the callback */ + ompi_request_complete_fn_t fct = request->req_complete_cb; request->req_complete_cb = NULL; + rc = fct( request ); } if (0 == rc) { @@ -457,6 +459,21 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa return OMPI_SUCCESS; } +static inline int ompi_request_set_callback(ompi_request_t* request, + ompi_request_complete_fn_t cb, + void* cb_data) +{ + request->req_complete_cb_data = cb_data; + request->req_complete_cb = cb; + /* If request is completed and the callback is not called, need to call callback */ + if ((NULL != request->req_complete_cb) && (request->req_complete == REQUEST_COMPLETED)) { + ompi_request_complete_fn_t fct = request->req_complete_cb; + request->req_complete_cb = NULL; + return fct( request ); + } + return OMPI_SUCCESS; +} + END_C_DECLS #endif