Bring ADAPT collective to 4.1
This is a meta commit, that encapsulate all the ADAPT commits in the master into a single PR for 4.1. The master commits included here are: fe73586, a4be3bb, d712645, c2970a3, e59bde9, ee592f3 and c98e387. Here is a detailed list of added capabilities: * coll/adapt: Fix naming conventions and C11 atomic use * coll/adapt: Remove unused component field in module * Consistent handling of zero counts in the MPI API. * Correctly handle non-blocking collectives tags * As it is possible to have multiple outstanding non-blocking collectives provided by different collective modules, we need a consistent mechanism to allow them to select unique tags for each instance of a collective. * Add support for fallback to previous coll module on non-commutative operations (#30) * Replace mutexes by atomic operations. * Use the correct nbc request type (for both ibcast and ireduce) * coll/base: document type casts in ompi_coll_base_retain_* * add module-wide topology cache * use standard instead of synchronous send and add mca parameter to control mode of initial send in ireduce/ibcast * reduce number of memory allocations * call the default request completion. * Remove the requests from the Fortran lookup conversion tables before completing and free it. * piggybacking Bull functionalities Signed-off-by: Xi Luo <xluo12@vols.utk.edu> Signed-off-by: George Bosilca <bosilca@icl.utk.edu> Signed-off-by: Marc Sergent <marc.sergent@atos.net> Co-authored-by: Joseph Schuchart <schuchart@hlrs.de> Co-authored-by: Lemarinier, Pierre <pierre.lemarinier@atos.net> Co-authored-by: pierrele <31764860+pierrele@users.noreply.github.com>
This commit is contained in:
parent
57044aa0b0
Commit
e65fa4ff5c
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2017 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
@ -39,6 +39,7 @@
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/coll/base/base.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/topo/base/base.h"
|
||||
#include "ompi/runtime/params.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
@ -378,6 +379,7 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
|
||||
comm->c_pml_comm = NULL;
|
||||
comm->c_topo = NULL;
|
||||
comm->c_coll = NULL;
|
||||
comm->c_nbc_tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
|
||||
/* A keyhash will be created if/when an attribute is cached on
|
||||
this communicator */
|
||||
|
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2017 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
@ -187,6 +187,13 @@ struct ompi_communicator_t {
|
||||
|
||||
/* Collectives module interface and data */
|
||||
mca_coll_base_comm_coll_t *c_coll;
|
||||
|
||||
/* Non-blocking collective tag. These tags might be shared between
|
||||
* all non-blocking collective modules (to avoid message collision
|
||||
* between them in the case where multiple outstanding non-blocking
|
||||
* collective coexists using multiple backends).
|
||||
*/
|
||||
volatile int32_t c_nbc_tag;
|
||||
};
|
||||
typedef struct ompi_communicator_t ompi_communicator_t;
|
||||
|
||||
|
51
ompi/mca/coll/adapt/Makefile.am
Обычный файл
51
ompi/mca/coll/adapt/Makefile.am
Обычный файл
@ -0,0 +1,51 @@
|
||||
#
|
||||
# Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
# of Tennessee Research Foundation. All rights
|
||||
# reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
|
||||
sources = \
|
||||
coll_adapt_component.c \
|
||||
coll_adapt_module.c \
|
||||
coll_adapt_bcast.c \
|
||||
coll_adapt_ibcast.c \
|
||||
coll_adapt_reduce.c \
|
||||
coll_adapt_ireduce.c \
|
||||
coll_adapt.h \
|
||||
coll_adapt_algorithms.h \
|
||||
coll_adapt_context.h \
|
||||
coll_adapt_context.c \
|
||||
coll_adapt_inbuf.c \
|
||||
coll_adapt_inbuf.h \
|
||||
coll_adapt_item.c \
|
||||
coll_adapt_item.h \
|
||||
coll_adapt_topocache.c \
|
||||
coll_adapt_topocache.h
|
||||
|
||||
# Make the output library in this directory, and name it either
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
# (for static builds).
|
||||
|
||||
component_noinst =
|
||||
component_install =
|
||||
if MCA_BUILD_ompi_coll_adapt_DSO
|
||||
component_install += mca_coll_adapt.la
|
||||
else
|
||||
component_noinst += libmca_coll_adapt.la
|
||||
endif
|
||||
|
||||
mcacomponentdir = $(ompilibdir)
|
||||
mcacomponent_LTLIBRARIES = $(component_install)
|
||||
mca_coll_adapt_la_SOURCES = $(sources)
|
||||
mca_coll_adapt_la_LDFLAGS = -module -avoid-version
|
||||
mca_coll_adapt_la_LIBADD =
|
||||
|
||||
noinst_LTLIBRARIES = $(component_noinst)
|
||||
libmca_coll_adapt_la_SOURCES =$(sources)
|
||||
libmca_coll_adapt_la_LDFLAGS = -module -avoid-version
|
145
ompi/mca/coll/adapt/coll_adapt.h
Обычный файл
145
ompi/mca/coll/adapt/coll_adapt.h
Обычный файл
@ -0,0 +1,145 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
|
||||
#ifndef MCA_COLL_ADAPT_EXPORT_H
|
||||
#define MCA_COLL_ADAPT_EXPORT_H
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "mpi.h"
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/datatype/opal_convertor.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
typedef struct mca_coll_adapt_module_t mca_coll_adapt_module_t;
|
||||
|
||||
typedef enum {
|
||||
OMPI_COLL_ADAPT_ALGORITHM_TUNED = 0,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_BINARY,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_PIPELINE,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_CHAIN,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_LINEAR,
|
||||
OMPI_COLL_ADAPT_ALGORITHM_COUNT /* number of algorithms, keep last! */
|
||||
} ompi_coll_adapt_algorithm_t;
|
||||
|
||||
/*
|
||||
* Structure to hold the adapt coll component. First it holds the
|
||||
* base coll component, and then holds a bunch of
|
||||
* adapt-coll-component-specific stuff (e.g., current MCA param
|
||||
* values).
|
||||
*/
|
||||
typedef struct mca_coll_adapt_component_t {
|
||||
/* Base coll component */
|
||||
mca_coll_base_component_2_0_0_t super;
|
||||
|
||||
/* MCA parameter: Priority of this component */
|
||||
int adapt_priority;
|
||||
|
||||
/* MCA parameter: Output stream and verbose level */
|
||||
int adapt_output;
|
||||
int adapt_verbose;
|
||||
|
||||
/* MCA parameter: Maximum number of segment in context free list */
|
||||
int adapt_context_free_list_max;
|
||||
|
||||
/* MCA parameter: Minimum number of segment in context free list */
|
||||
int adapt_context_free_list_min;
|
||||
|
||||
/* MCA parameter: Increasement number of segment in context free list */
|
||||
int adapt_context_free_list_inc;
|
||||
|
||||
/* Bcast MCA parameter */
|
||||
int adapt_ibcast_algorithm;
|
||||
size_t adapt_ibcast_segment_size;
|
||||
int adapt_ibcast_max_send_requests;
|
||||
int adapt_ibcast_max_recv_requests;
|
||||
bool adapt_ibcast_synchronous_send;
|
||||
/* Bcast free list */
|
||||
opal_free_list_t *adapt_ibcast_context_free_list;
|
||||
|
||||
/* Reduce MCA parameter */
|
||||
int adapt_ireduce_algorithm;
|
||||
size_t adapt_ireduce_segment_size;
|
||||
int adapt_ireduce_max_send_requests;
|
||||
int adapt_ireduce_max_recv_requests;
|
||||
int adapt_inbuf_free_list_min;
|
||||
int adapt_inbuf_free_list_max;
|
||||
int adapt_inbuf_free_list_inc;
|
||||
bool adapt_ireduce_synchronous_send;
|
||||
|
||||
/* Reduce free list */
|
||||
opal_free_list_t *adapt_ireduce_context_free_list;
|
||||
|
||||
} mca_coll_adapt_component_t;
|
||||
|
||||
/*
|
||||
* Structure used to store what is necessary for the collective operations
|
||||
* routines in case of fallback.
|
||||
*/
|
||||
typedef struct mca_coll_adapt_collective_fallback_s {
|
||||
union {
|
||||
mca_coll_base_module_reduce_fn_t reduce;
|
||||
mca_coll_base_module_ireduce_fn_t ireduce;
|
||||
} previous_routine;
|
||||
mca_coll_base_module_t *previous_module;
|
||||
} mca_coll_adapt_collective_fallback_t;
|
||||
|
||||
|
||||
typedef enum mca_coll_adapt_colltype {
|
||||
ADAPT_REDUCE = 0,
|
||||
ADAPT_IREDUCE = 1,
|
||||
ADAPT_COLLCOUNT
|
||||
} mca_coll_adapt_colltype_t;
|
||||
|
||||
/*
|
||||
* Some defines to stick to the naming used in the other components in terms of
|
||||
* fallback routines
|
||||
*/
|
||||
#define previous_reduce previous_routines[ADAPT_REDUCE].previous_routine.reduce
|
||||
#define previous_ireduce previous_routines[ADAPT_IREDUCE].previous_routine.ireduce
|
||||
|
||||
#define previous_reduce_module previous_routines[ADAPT_REDUCE].previous_module
|
||||
#define previous_ireduce_module previous_routines[ADAPT_IREDUCE].previous_module
|
||||
|
||||
|
||||
/* Coll adapt module per communicator*/
|
||||
struct mca_coll_adapt_module_t {
|
||||
/* Base module */
|
||||
mca_coll_base_module_t super;
|
||||
|
||||
/* To be able to fallback when the cases are not supported */
|
||||
struct mca_coll_adapt_collective_fallback_s previous_routines[ADAPT_COLLCOUNT];
|
||||
|
||||
/* cached topologies */
|
||||
opal_list_t *topo_cache;
|
||||
|
||||
/* Whether this module has been lazily initialized or not yet */
|
||||
bool adapt_enabled;
|
||||
};
|
||||
OBJ_CLASS_DECLARATION(mca_coll_adapt_module_t);
|
||||
|
||||
/* Global component instance */
|
||||
OMPI_MODULE_DECLSPEC extern mca_coll_adapt_component_t mca_coll_adapt_component;
|
||||
|
||||
/* ADAPT module functions */
|
||||
int ompi_coll_adapt_init_query(bool enable_progress_threads, bool enable_mpi_threads);
|
||||
mca_coll_base_module_t * ompi_coll_adapt_comm_query(struct ompi_communicator_t *comm, int *priority);
|
||||
|
||||
/* ADAPT request free */
|
||||
int ompi_coll_adapt_request_free(ompi_request_t **request);
|
||||
|
||||
#endif /* MCA_COLL_ADAPT_EXPORT_H */
|
38
ompi/mca/coll/adapt/coll_adapt_algorithms.h
Обычный файл
38
ompi/mca/coll/adapt/coll_adapt_algorithms.h
Обычный файл
@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include <math.h>
|
||||
|
||||
typedef int (*ompi_mca_coll_adapt_ibcast_function_t)(IBCAST_ARGS);
|
||||
typedef int (*ompi_mca_coll_adapt_ireduce_function_t)(IREDUCE_ARGS);
|
||||
|
||||
typedef struct ompi_coll_adapt_algorithm_index_s {
|
||||
int algorithm_index;
|
||||
union {
|
||||
ompi_mca_coll_adapt_ibcast_function_t ibcast_fn_ptr;
|
||||
ompi_mca_coll_adapt_ireduce_function_t ireduce_fn_ptr;
|
||||
};
|
||||
} ompi_coll_adapt_algorithm_index_t;
|
||||
|
||||
/* Bcast */
|
||||
int ompi_coll_adapt_ibcast_register(void);
|
||||
int ompi_coll_adapt_ibcast_fini(void);
|
||||
int ompi_coll_adapt_bcast(BCAST_ARGS);
|
||||
int ompi_coll_adapt_ibcast(IBCAST_ARGS);
|
||||
|
||||
/* Reduce */
|
||||
int ompi_coll_adapt_ireduce_register(void);
|
||||
int ompi_coll_adapt_ireduce_fini(void);
|
||||
int ompi_coll_adapt_reduce(REDUCE_ARGS);
|
||||
int ompi_coll_adapt_ireduce(IREDUCE_ARGS);
|
26
ompi/mca/coll/adapt/coll_adapt_bcast.c
Обычный файл
26
ompi/mca/coll/adapt/coll_adapt_bcast.c
Обычный файл
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
|
||||
int ompi_coll_adapt_bcast(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, mca_coll_base_module_t * module)
|
||||
{
|
||||
ompi_request_t *request = NULL;
|
||||
int err = ompi_coll_adapt_ibcast(buff, count, datatype, root, comm, &request, module);
|
||||
if( MPI_SUCCESS != err ) {
|
||||
if( NULL == request )
|
||||
return err;
|
||||
}
|
||||
ompi_request_wait(&request, MPI_STATUS_IGNORE);
|
||||
return err;
|
||||
}
|
155
ompi/mca/coll/adapt/coll_adapt_component.c
Обычный файл
155
ompi/mca/coll/adapt/coll_adapt_component.c
Обычный файл
@ -0,0 +1,155 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "opal/util/show_help.h"
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
|
||||
/*
|
||||
* Public string showing the coll ompi_adapt component version number
|
||||
*/
|
||||
const char *mca_coll_adapt_component_version_string =
|
||||
"Open MPI ADAPT collective MCA component version " OMPI_VERSION;
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
static int adapt_open(void);
|
||||
static int adapt_close(void);
|
||||
static int adapt_register(void);
|
||||
|
||||
/*
|
||||
* Instantiate the public struct with all of our public information
|
||||
* and pointers to our public functions in it
|
||||
*/
|
||||
|
||||
mca_coll_adapt_component_t mca_coll_adapt_component = {
|
||||
/* First, fill in the super */
|
||||
{
|
||||
/* First, the mca_component_t struct containing meta
|
||||
information about the component itself */
|
||||
.collm_version = {
|
||||
MCA_COLL_BASE_VERSION_2_0_0,
|
||||
|
||||
/* Component name and version */
|
||||
.mca_component_name = "adapt",
|
||||
MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION,
|
||||
OMPI_RELEASE_VERSION),
|
||||
|
||||
/* Component functions */
|
||||
.mca_open_component = adapt_open,
|
||||
.mca_close_component = adapt_close,
|
||||
.mca_register_component_params = adapt_register,
|
||||
},
|
||||
.collm_data = {
|
||||
/* The component is not checkpoint ready */
|
||||
MCA_BASE_METADATA_PARAM_NONE
|
||||
},
|
||||
|
||||
/* Initialization / querying functions */
|
||||
.collm_init_query = ompi_coll_adapt_init_query,
|
||||
.collm_comm_query = ompi_coll_adapt_comm_query,
|
||||
},
|
||||
|
||||
/* adapt-component specific information */
|
||||
|
||||
0, /* (default) priority */
|
||||
|
||||
0, /* (default) output stream */
|
||||
0, /* (default) verbose level */
|
||||
|
||||
/* default values for non-MCA parameters */
|
||||
/* Not specifying values here gives us all 0's */
|
||||
};
|
||||
|
||||
/* Open the component */
|
||||
static int adapt_open(void)
|
||||
{
|
||||
mca_coll_adapt_component_t *cs = &mca_coll_adapt_component;
|
||||
|
||||
if (cs->adapt_verbose > 0) {
|
||||
cs->adapt_output = opal_output_open(NULL);
|
||||
opal_output_set_verbosity(cs->adapt_output, cs->adapt_verbose);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/* Shut down the component */
|
||||
static int adapt_close(void)
|
||||
{
|
||||
ompi_coll_adapt_ibcast_fini();
|
||||
ompi_coll_adapt_ireduce_fini();
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int adapt_verify_mca_variables(void)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register MCA params
|
||||
*/
|
||||
static int adapt_register(void)
|
||||
{
|
||||
mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version;
|
||||
mca_coll_adapt_component_t *cs = &mca_coll_adapt_component;
|
||||
|
||||
/* If we want to be selected (i.e., all procs on one node), then
|
||||
we should have a high priority */
|
||||
cs->adapt_priority = 0;
|
||||
(void) mca_base_component_var_register(c, "priority", "Priority of the adapt coll component",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &cs->adapt_priority);
|
||||
|
||||
cs->adapt_verbose = ompi_coll_base_framework.framework_verbose;
|
||||
(void) mca_base_component_var_register(c, "verbose",
|
||||
"Verbose level (default set to the collective framework verbosity)",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY, &cs->adapt_verbose);
|
||||
|
||||
cs->adapt_context_free_list_min = 64;
|
||||
(void) mca_base_component_var_register(c, "context_free_list_min",
|
||||
"Minimum number of segments in context free list",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&cs->adapt_context_free_list_min);
|
||||
|
||||
cs->adapt_context_free_list_max = 1024;
|
||||
(void) mca_base_component_var_register(c, "context_free_list_max",
|
||||
"Maximum number of segments in context free list",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&cs->adapt_context_free_list_max);
|
||||
|
||||
cs->adapt_context_free_list_inc = 32;
|
||||
(void) mca_base_component_var_register(c, "context_free_list_inc",
|
||||
"Increasement number of segments in context free list",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&cs->adapt_context_free_list_inc);
|
||||
ompi_coll_adapt_ibcast_register();
|
||||
ompi_coll_adapt_ireduce_register();
|
||||
|
||||
return adapt_verify_mca_variables();
|
||||
}
|
42
ompi/mca/coll/adapt/coll_adapt_context.c
Обычный файл
42
ompi/mca/coll/adapt/coll_adapt_context.c
Обычный файл
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "coll_adapt_context.h"
|
||||
|
||||
|
||||
static void adapt_constant_reduce_context_construct(ompi_coll_adapt_constant_reduce_context_t *context)
|
||||
{
|
||||
OBJ_CONSTRUCT(&context->recv_list, opal_list_t);
|
||||
OBJ_CONSTRUCT(&context->mutex_recv_list, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&context->inbuf_list, opal_free_list_t);
|
||||
}
|
||||
|
||||
static void adapt_constant_reduce_context_destruct(ompi_coll_adapt_constant_reduce_context_t *context)
|
||||
{
|
||||
OBJ_DESTRUCT(&context->mutex_recv_list);
|
||||
OBJ_DESTRUCT(&context->recv_list);
|
||||
OBJ_DESTRUCT(&context->inbuf_list);
|
||||
}
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_coll_adapt_bcast_context_t, opal_free_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_coll_adapt_constant_bcast_context_t, opal_object_t,
|
||||
NULL, NULL);
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_coll_adapt_reduce_context_t, opal_free_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_coll_adapt_constant_reduce_context_t, opal_object_t,
|
||||
&adapt_constant_reduce_context_construct,
|
||||
&adapt_constant_reduce_context_destruct);
|
128
ompi/mca/coll/adapt/coll_adapt_context.h
Обычный файл
128
ompi/mca/coll/adapt/coll_adapt_context.h
Обычный файл
@ -0,0 +1,128 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "opal/class/opal_free_list.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "ompi/datatype/ompi_datatype.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
#include "coll_adapt_inbuf.h"
|
||||
|
||||
/* Bcast constant context in bcast context */
|
||||
struct ompi_coll_adapt_constant_bcast_context_s {
|
||||
opal_object_t super;
|
||||
int root;
|
||||
size_t count;
|
||||
size_t seg_count;
|
||||
ompi_datatype_t *datatype;
|
||||
ompi_communicator_t *comm;
|
||||
int real_seg_size;
|
||||
int num_segs;
|
||||
ompi_request_t *request;
|
||||
opal_mutex_t *mutex;
|
||||
int *recv_array;
|
||||
int *send_array;
|
||||
/* Length of the fragment array, which is the number of recevied segments */
|
||||
int num_recv_segs;
|
||||
/* Number of segments that is finishing recving */
|
||||
int num_recv_fini;
|
||||
/* Store the number of sent segments */
|
||||
int num_sent_segs;
|
||||
ompi_coll_tree_t *tree;
|
||||
int ibcast_tag;
|
||||
};
|
||||
|
||||
typedef struct ompi_coll_adapt_constant_bcast_context_s ompi_coll_adapt_constant_bcast_context_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(ompi_coll_adapt_constant_bcast_context_t);
|
||||
|
||||
|
||||
/* Bcast context of each segment*/
|
||||
typedef struct ompi_coll_adapt_bcast_context_s ompi_coll_adapt_bcast_context_t;
|
||||
|
||||
typedef int (*ompi_coll_adapt_bcast_cuda_callback_fn_t) (ompi_coll_adapt_bcast_context_t * context);
|
||||
|
||||
struct ompi_coll_adapt_bcast_context_s {
|
||||
opal_free_list_item_t super;
|
||||
char *buff;
|
||||
int frag_id;
|
||||
int child_id;
|
||||
int peer;
|
||||
ompi_coll_adapt_constant_bcast_context_t *con;
|
||||
};
|
||||
|
||||
OBJ_CLASS_DECLARATION(ompi_coll_adapt_bcast_context_t);
|
||||
|
||||
/* Reduce constant context in reduce context */
|
||||
struct ompi_coll_adapt_constant_reduce_context_s {
|
||||
opal_object_t super;
|
||||
size_t count;
|
||||
size_t seg_count;
|
||||
ompi_datatype_t *datatype;
|
||||
ompi_communicator_t *comm;
|
||||
size_t real_seg_size;
|
||||
/* Increment of each segment */
|
||||
int segment_increment;
|
||||
int num_segs;
|
||||
int rank;
|
||||
int root;
|
||||
/* The distance between the address of inbuf->buff and the address of inbuf */
|
||||
int distance;
|
||||
int ireduce_tag;
|
||||
/* How many sends are posted but not finished */
|
||||
int32_t ongoing_send;
|
||||
/* Length of the fragment array, which is the number of recevied segments */
|
||||
int32_t num_recv_segs;
|
||||
/* Number of sent segments */
|
||||
int32_t num_sent_segs;
|
||||
/* Next seg need to be received for every children */
|
||||
int32_t *next_recv_segs;
|
||||
/* Mutex to protect each segment when do the reduce op */
|
||||
opal_mutex_t *mutex_op_list;
|
||||
/* Reduce operation */
|
||||
ompi_op_t *op;
|
||||
ompi_coll_tree_t *tree;
|
||||
/* Accumulate buff */
|
||||
char **accumbuf;
|
||||
ptrdiff_t lower_bound;
|
||||
char *sbuf;
|
||||
char *rbuf;
|
||||
opal_free_list_t inbuf_list;
|
||||
/* Mutex to protect recv_list */
|
||||
opal_mutex_t mutex_recv_list;
|
||||
/* A list to store the segments which are received and not yet be sent */
|
||||
opal_list_t recv_list;
|
||||
ompi_request_t *request;
|
||||
};
|
||||
|
||||
typedef struct ompi_coll_adapt_constant_reduce_context_s ompi_coll_adapt_constant_reduce_context_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(ompi_coll_adapt_constant_reduce_context_t);
|
||||
|
||||
/* Reduce context of each segment */
|
||||
typedef struct ompi_coll_adapt_reduce_context_s ompi_coll_adapt_reduce_context_t;
|
||||
|
||||
typedef int (*ompi_coll_adapt_reduce_cuda_callback_fn_t) (ompi_coll_adapt_reduce_context_t * context);
|
||||
|
||||
struct ompi_coll_adapt_reduce_context_s {
|
||||
opal_free_list_item_t super;
|
||||
char *buff;
|
||||
int seg_index;
|
||||
int child_id;
|
||||
int peer;
|
||||
ompi_coll_adapt_constant_reduce_context_t *con;
|
||||
/* store the incoming segment */
|
||||
ompi_coll_adapt_inbuf_t *inbuf;
|
||||
};
|
||||
|
||||
OBJ_CLASS_DECLARATION(ompi_coll_adapt_reduce_context_t);
|
575
ompi/mca/coll/adapt/coll_adapt_ibcast.c
Обычный файл
575
ompi/mca/coll/adapt/coll_adapt_ibcast.c
Обычный файл
@ -0,0 +1,575 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
#include "coll_adapt_context.h"
|
||||
#include "coll_adapt_topocache.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "ompi/mca/pml/ob1/pml_ob1.h"
|
||||
|
||||
static int ompi_coll_adapt_ibcast_generic(IBCAST_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
|
||||
/*
|
||||
* Set up MCA parameters of MPI_Bcast and MPI_IBcast
|
||||
*/
|
||||
int ompi_coll_adapt_ibcast_register(void)
|
||||
{
|
||||
mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version;
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_algorithm = 1;
|
||||
mca_base_component_var_register(c, "bcast_algorithm",
|
||||
"Algorithm of broadcast, 0: tuned, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_algorithm);
|
||||
if( (mca_coll_adapt_component.adapt_ibcast_algorithm < 0) ||
|
||||
(mca_coll_adapt_component.adapt_ibcast_algorithm >= OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) {
|
||||
mca_coll_adapt_component.adapt_ibcast_algorithm = 1;
|
||||
}
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size = 0;
|
||||
mca_base_component_var_register(c, "bcast_segment_size",
|
||||
"Segment size in bytes used by default for bcast algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_max_send_requests = 2;
|
||||
mca_base_component_var_register(c, "bcast_max_send_requests",
|
||||
"Maximum number of send requests",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_max_send_requests);
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_max_recv_requests = 3;
|
||||
mca_base_component_var_register(c, "bcast_max_recv_requests",
|
||||
"Maximum number of receive requests",
|
||||
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_5,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_max_recv_requests);
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_synchronous_send = true;
|
||||
(void) mca_base_component_var_register(c, "bcast_synchronous_send",
|
||||
"Whether to use synchronous send operations during setup of bcast operations",
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
&mca_coll_adapt_component.adapt_ibcast_synchronous_send);
|
||||
|
||||
mca_coll_adapt_component.adapt_ibcast_context_free_list = NULL;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Release the free list created in ompi_coll_adapt_ibcast_generic
|
||||
*/
|
||||
int ompi_coll_adapt_ibcast_fini(void)
|
||||
{
|
||||
if (NULL != mca_coll_adapt_component.adapt_ibcast_context_free_list) {
|
||||
OBJ_RELEASE(mca_coll_adapt_component.adapt_ibcast_context_free_list);
|
||||
mca_coll_adapt_component.adapt_ibcast_context_free_list = NULL;
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "ibcast fini\n"));
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Finish a ibcast request
|
||||
*/
|
||||
static int ibcast_request_fini(ompi_coll_adapt_bcast_context_t * context)
|
||||
{
|
||||
ompi_request_t *temp_req = context->con->request;
|
||||
if (context->con->tree->tree_nextsize != 0) {
|
||||
free(context->con->send_array);
|
||||
}
|
||||
if (context->con->num_segs != 0) {
|
||||
free(context->con->recv_array);
|
||||
}
|
||||
OBJ_RELEASE(context->con->mutex);
|
||||
OBJ_RELEASE(context->con);
|
||||
ompi_request_complete(temp_req, 1);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback function of isend
|
||||
*/
|
||||
static int send_cb(ompi_request_t * req)
|
||||
{
|
||||
ompi_coll_adapt_bcast_context_t *context =
|
||||
(ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data;
|
||||
|
||||
int err;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: Send(cb): segment %d to %d at buff %p root %d\n",
|
||||
ompi_comm_rank(context->con->comm), context->frag_id,
|
||||
context->peer, (void *) context->buff, context->con->root));
|
||||
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
int sent_id = context->con->send_array[context->child_id];
|
||||
/* If the current process has fragments in recv_array can be sent */
|
||||
if (sent_id < context->con->num_recv_segs) {
|
||||
ompi_request_t *send_req;
|
||||
ompi_coll_adapt_bcast_context_t *send_context;
|
||||
int new_id = context->con->recv_array[sent_id];
|
||||
++(context->con->send_array[context->child_id]);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
|
||||
send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
|
||||
send_context->buff =
|
||||
context->buff + (new_id - context->frag_id) * context->con->real_seg_size;
|
||||
send_context->frag_id = new_id;
|
||||
send_context->child_id = context->child_id;
|
||||
send_context->peer = context->peer;
|
||||
send_context->con = context->con;
|
||||
int send_count = send_context->con->seg_count;
|
||||
if (new_id == (send_context->con->num_segs - 1)) {
|
||||
send_count = send_context->con->count - new_id * send_context->con->seg_count;
|
||||
}
|
||||
char *send_buff = send_context->buff;
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"[%d]: Send(start in send cb): segment %d to %d at buff %p send_count %d tag %d\n",
|
||||
ompi_comm_rank(send_context->con->comm), send_context->frag_id,
|
||||
send_context->peer, (void *) send_context->buff, send_count,
|
||||
send_context->con->ibcast_tag - new_id));
|
||||
err = MCA_PML_CALL(isend
|
||||
(send_buff, send_count, send_context->con->datatype, send_context->peer,
|
||||
send_context->con->ibcast_tag - new_id,
|
||||
MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req));
|
||||
if (MPI_SUCCESS != err) {
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *)send_context);
|
||||
OBJ_RELEASE(context->con);
|
||||
return err;
|
||||
}
|
||||
/* Set send callback */
|
||||
ompi_request_set_callback(send_req, send_cb, send_context);
|
||||
OPAL_THREAD_LOCK(context->con->mutex);
|
||||
} else {
|
||||
/* No future send here, we can release the ref */
|
||||
OBJ_RELEASE(context->con);
|
||||
}
|
||||
int num_sent = ++(context->con->num_sent_segs);
|
||||
int num_recv_fini = context->con->num_recv_fini;
|
||||
int rank = ompi_comm_rank(context->con->comm);
|
||||
OPAL_THREAD_UNLOCK(context->con->mutex);
|
||||
/* Check whether signal the condition */
|
||||
if ((rank == context->con->root
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs)
|
||||
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
|
||||
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
|
||||
&& num_recv_fini == context->con->num_segs)) {
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n",
|
||||
ompi_comm_rank(context->con->comm)));
|
||||
ibcast_request_fini(context);
|
||||
}
|
||||
opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
|
||||
(opal_free_list_item_t *) context);
|
||||
req->req_free(&req);
|
||||
/* Call back function return 1 to signal that request has been free'd */
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback function of irecv
|
||||
*/
|
||||
/*
 * Callback function of irecv: records the arrived segment, posts the next
 * pipelined receive from the parent, forwards the newly available segment to
 * any child that is caught up, and finalizes the request when everything has
 * been received and forwarded.
 *
 * Fixes vs. previous version:
 *  - the return value of the pipelined MCA_PML_CALL(irecv) is now checked,
 *    consistent with the isend error handling in this same function (on
 *    failure the context goes back to the free list and the refcount taken
 *    for it is dropped);
 *  - "Singal" typo corrected to "Signal" in the verbose output.
 *
 * Returns 1 to tell the request machinery that the request has been freed.
 */
static int recv_cb(ompi_request_t * req)
{
    /* Get necessary info from request */
    ompi_coll_adapt_bcast_context_t *context =
        (ompi_coll_adapt_bcast_context_t *) req->req_complete_cb_data;

    int err, i;
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: Recv(cb): segment %d from %d at buff %p root %d\n",
                         ompi_comm_rank(context->con->comm), context->frag_id,
                         context->peer, (void *) context->buff, context->con->root));

    /* Store the frag_id to seg array (under the shared mutex: recv_array and
     * num_recv_segs are shared with the other callbacks) */
    OPAL_THREAD_LOCK(context->con->mutex);
    int num_recv_segs = ++(context->con->num_recv_segs);
    context->con->recv_array[num_recv_segs - 1] = context->frag_id;
    OPAL_THREAD_UNLOCK(context->con->mutex);

    /* Keep at most adapt_ibcast_max_recv_requests receives in flight:
     * completing one receive frees a slot for segment new_id */
    int new_id = num_recv_segs + mca_coll_adapt_component.adapt_ibcast_max_recv_requests - 1;
    /* Receive new segment */
    if (new_id < context->con->num_segs) {
        ompi_request_t *recv_req;
        ompi_coll_adapt_bcast_context_t *recv_context;
        /* Get new context item from free list */
        recv_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
        recv_context->buff =
            context->buff + (new_id - context->frag_id) * context->con->real_seg_size;
        recv_context->frag_id = new_id;
        recv_context->child_id = context->child_id;
        recv_context->peer = context->peer;
        recv_context->con = context->con;
        OBJ_RETAIN(context->con);       /* one reference per outstanding context */
        int recv_count = recv_context->con->seg_count;
        if (new_id == (recv_context->con->num_segs - 1)) {
            /* Last segment may be shorter than seg_count */
            recv_count = recv_context->con->count - new_id * recv_context->con->seg_count;
        }
        char *recv_buff = recv_context->buff;
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "[%d]: Recv(start in recv cb): segment %d from %d at buff %p recv_count %d tag %d\n",
                             ompi_comm_rank(context->con->comm), context->frag_id, context->peer,
                             (void *) recv_buff, recv_count,
                             recv_context->con->ibcast_tag - recv_context->frag_id));
        err = MCA_PML_CALL(irecv
                           (recv_buff, recv_count, recv_context->con->datatype, recv_context->peer,
                            recv_context->con->ibcast_tag - recv_context->frag_id,
                            recv_context->con->comm, &recv_req));
        if (MPI_SUCCESS != err) {
            /* Mirror the isend error path below: give the context back and
             * drop the reference taken for it */
            opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
                                  (opal_free_list_item_t *) recv_context);
            OBJ_RELEASE(context->con);
            return err;
        }

        /* Set the receive callback */
        ompi_request_set_callback(recv_req, recv_cb, recv_context);
    }

    OPAL_THREAD_LOCK(context->con->mutex);
    /* Propagate segment to all children */
    for (i = 0; i < context->con->tree->tree_nextsize; i++) {
        /* If the current process can send the segment now, which means the only segment need to be sent is the just arrived one */
        if (num_recv_segs - 1 == context->con->send_array[i]) {
            ompi_request_t *send_req;

            ++(context->con->send_array[i]);

            /* release mutex to avoid deadlock in case a callback is triggered below */
            OPAL_THREAD_UNLOCK(context->con->mutex);

            int send_count = context->con->seg_count;
            if (context->frag_id == (context->con->num_segs - 1)) {
                send_count = context->con->count - context->frag_id * context->con->seg_count;
            }

            ompi_coll_adapt_bcast_context_t *send_context;
            send_context = (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.adapt_ibcast_context_free_list);
            send_context->buff = context->buff;
            send_context->frag_id = context->frag_id;
            send_context->child_id = i;
            send_context->peer = context->con->tree->tree_next[i];
            send_context->con = context->con;
            OBJ_RETAIN(context->con);
            char *send_buff = send_context->buff;
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: Send(start in recv cb): segment %d to %d at buff %p send_count %d tag %d\n",
                                 ompi_comm_rank(send_context->con->comm), send_context->frag_id,
                                 send_context->peer, (void *) send_context->buff, send_count,
                                 send_context->con->ibcast_tag - send_context->frag_id));
            err =
                MCA_PML_CALL(isend
                             (send_buff, send_count, send_context->con->datatype,
                              send_context->peer,
                              send_context->con->ibcast_tag - send_context->frag_id,
                              MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req));
            if (MPI_SUCCESS != err) {
                opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
                                      (opal_free_list_item_t *)send_context);
                OBJ_RELEASE(context->con);
                return err;
            }
            /* Set send callback */
            ompi_request_set_callback(send_req, send_cb, send_context);

            /* retake the mutex for next iteration */
            OPAL_THREAD_LOCK(context->con->mutex);
        }
    }
    /* Drop the reference held by this (completed) receive context; con stays
     * alive through the reference owned by the collective itself until
     * ibcast_request_fini runs */
    OBJ_RELEASE(context->con);

    int num_sent = context->con->num_sent_segs;
    int num_recv_fini = ++(context->con->num_recv_fini);

    OPAL_THREAD_UNLOCK(context->con->mutex);
    /* If this process is leaf and has received all the segments */
    if ((context->con->tree->tree_nextsize > 0
         && num_sent == context->con->tree->tree_nextsize * context->con->num_segs
         && num_recv_fini == context->con->num_segs) || (context->con->tree->tree_nextsize == 0
                                                         && num_recv_fini == context->con->num_segs)) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in recv\n",
                             ompi_comm_rank(context->con->comm)));
        ibcast_request_fini(context);
    }
    opal_free_list_return(mca_coll_adapt_component.adapt_ibcast_context_free_list,
                          (opal_free_list_item_t *) context);
    req->req_free(&req);

    /* Call back function return 1 to signal that request has been free'd */
    return 1;
}
|
||||
|
||||
int ompi_coll_adapt_ibcast(void *buff, int count, struct ompi_datatype_t *datatype, int root,
|
||||
struct ompi_communicator_t *comm, ompi_request_t ** request,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
|
||||
"ibcast root %d, algorithm %d, coll_adapt_ibcast_segment_size %zu, coll_adapt_ibcast_max_send_requests %d, coll_adapt_ibcast_max_recv_requests %d\n",
|
||||
root, mca_coll_adapt_component.adapt_ibcast_algorithm,
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size,
|
||||
mca_coll_adapt_component.adapt_ibcast_max_send_requests,
|
||||
mca_coll_adapt_component.adapt_ibcast_max_recv_requests));
|
||||
|
||||
if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ibcast_algorithm) {
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
|
||||
return OMPI_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
return ompi_coll_adapt_ibcast_generic(buff, count, datatype, root, comm, request, module,
|
||||
adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ibcast_algorithm),
|
||||
mca_coll_adapt_component.adapt_ibcast_segment_size);
|
||||
}
|
||||
|
||||
|
||||
/*
 * Generic segmented non-blocking broadcast over an arbitrary tree.
 *
 * The message is split into num_segs segments of seg_count datatypes each
 * (computed from seg_size).  The root pre-posts up to
 * adapt_ibcast_max_send_requests isends per child; every other rank pre-posts
 * up to adapt_ibcast_max_recv_requests irecvs from its tree parent.  All
 * further progress happens in send_cb/recv_cb, which share state through the
 * refcounted "con" context.  Completion is signalled on *request by
 * ibcast_request_fini.
 *
 * @param buff      data buffer (root: source, others: destination)
 * @param count     number of elements of datatype in buff
 * @param root      broadcast root rank in comm
 * @param request   [out] receives the collective's request object
 * @param tree      broadcast topology (root's children/parent per rank)
 * @param seg_size  requested segment size in bytes (0 => single segment)
 * @return MPI_SUCCESS, or the PML error from a failed isend/irecv
 */
int ompi_coll_adapt_ibcast_generic(void *buff, int count, struct ompi_datatype_t *datatype, int root,
                                   struct ompi_communicator_t *comm, ompi_request_t ** request,
                                   mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
                                   size_t seg_size)
{
    int i, j, rank, err;
    /* The min of num_segs and SEND_NUM or RECV_NUM, in case the num_segs is less than SEND_NUM or RECV_NUM */
    int min;

    /* Number of datatype in a segment */
    int seg_count = count;
    /* Size of a datatype */
    size_t type_size;
    /* Real size of a segment */
    size_t real_seg_size;
    ptrdiff_t extent, lb;
    /* Number of segments */
    int num_segs;

    /* Initial sends can optionally be synchronous (flow control), see the
     * coll_adapt_bcast_synchronous_send MCA parameter */
    mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ibcast_synchronous_send)
        ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD;

    /* The request passed outside */
    ompi_coll_base_nbc_request_t *temp_request = NULL;
    opal_mutex_t *mutex;
    /* Store the segments which are received */
    int *recv_array = NULL;
    /* Record how many isends have been issued for every child */
    int *send_array = NULL;

    /* Atomically set up free list: racing threads may each build one; the CAS
     * loser releases its copy */
    if (NULL == mca_coll_adapt_component.adapt_ibcast_context_free_list) {
        opal_free_list_t* fl = OBJ_NEW(opal_free_list_t);
        opal_free_list_init(fl,
                            sizeof(ompi_coll_adapt_bcast_context_t),
                            opal_cache_line_size,
                            OBJ_CLASS(ompi_coll_adapt_bcast_context_t),
                            0, opal_cache_line_size,
                            mca_coll_adapt_component.adapt_context_free_list_min,
                            mca_coll_adapt_component.adapt_context_free_list_max,
                            mca_coll_adapt_component.adapt_context_free_list_inc,
                            NULL, 0, NULL, NULL, NULL);
        if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&mca_coll_adapt_component.adapt_ibcast_context_free_list,
                                                     &(intptr_t){0}, fl) ) {
            OBJ_RELEASE(fl);
        }
    }

    /* Set up request: a coll-type NBC request completed by the callbacks,
     * freed through ompi_coll_adapt_request_free */
    temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t);
    OMPI_REQUEST_INIT(&temp_request->super, false);
    temp_request->super.req_state = OMPI_REQUEST_ACTIVE;
    temp_request->super.req_type = OMPI_REQUEST_COLL;
    temp_request->super.req_free = ompi_coll_adapt_request_free;
    temp_request->super.req_status.MPI_SOURCE = 0;
    temp_request->super.req_status.MPI_TAG = 0;
    temp_request->super.req_status.MPI_ERROR = 0;
    temp_request->super.req_status._cancelled = 0;
    temp_request->super.req_status._ucount = 0;
    *request = (ompi_request_t*)temp_request;

    /* Set up mutex */
    mutex = OBJ_NEW(opal_mutex_t);

    rank = ompi_comm_rank(comm);

    /* Determine number of elements sent per operation */
    ompi_datatype_type_size(datatype, &type_size);
    COLL_BASE_COMPUTED_SEGCOUNT(seg_size, type_size, seg_count);

    ompi_datatype_get_extent(datatype, &lb, &extent);
    num_segs = (count + seg_count - 1) / seg_count;
    real_seg_size = (ptrdiff_t) seg_count *extent;

    /* Set memory for recv_array and send_array, created on heap because they
     * are needed to be accessed by other functions (callback functions).
     * NOTE(review): the malloc results are not checked before use. */
    if (num_segs != 0) {
        recv_array = (int *) malloc(sizeof(int) * num_segs);
    }
    if (tree->tree_nextsize != 0) {
        send_array = (int *) malloc(sizeof(int) * tree->tree_nextsize);
    }

    /* Set constant context for send and recv call back; refcounted, one
     * additional reference is taken per outstanding send/recv context */
    ompi_coll_adapt_constant_bcast_context_t *con = OBJ_NEW(ompi_coll_adapt_constant_bcast_context_t);
    con->root = root;
    con->count = count;
    con->seg_count = seg_count;
    con->datatype = datatype;
    con->comm = comm;
    con->real_seg_size = real_seg_size;
    con->num_segs = num_segs;
    con->recv_array = recv_array;
    con->num_recv_segs = 0;
    con->num_recv_fini = 0;
    con->send_array = send_array;
    con->num_sent_segs = 0;
    con->mutex = mutex;
    con->request = (ompi_request_t*)temp_request;
    con->tree = tree;
    /* Reserve one tag per segment so concurrent NBCs do not collide;
     * per-segment tags are derived as ibcast_tag - seg_index */
    con->ibcast_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);

    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: Ibcast, root %d, tag %d\n", rank, root,
                         con->ibcast_tag));
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: con->mutex = %p, num_children = %d, num_segs = %d, real_seg_size = %d, seg_count = %d, tree_adreess = %p\n",
                         rank, (void *) con->mutex, tree->tree_nextsize, num_segs,
                         (int) real_seg_size, seg_count, (void *) con->tree));

    OPAL_THREAD_LOCK(mutex);

    /* If the current process is root, it sends segment to every children */
    if (rank == root) {
        /* Handle the situation when num_segs < SEND_NUM */
        if (num_segs <= mca_coll_adapt_component.adapt_ibcast_max_send_requests) {
            min = num_segs;
        } else {
            min = mca_coll_adapt_component.adapt_ibcast_max_send_requests;
        }

        /* Set recv_array, root has already had all the segments */
        for (i = 0; i < num_segs; i++) {
            recv_array[i] = i;
        }
        con->num_recv_segs = num_segs;
        /* Set send_array, will send ompi_coll_adapt_ibcast_max_send_requests segments */
        for (i = 0; i < tree->tree_nextsize; i++) {
            send_array[i] = mca_coll_adapt_component.adapt_ibcast_max_send_requests;
        }

        ompi_request_t *send_req;
        /* Number of datatypes in each send */
        int send_count = seg_count;
        for (i = 0; i < min; i++) {
            if (i == (num_segs - 1)) {
                /* Last segment may be shorter */
                send_count = count - i * seg_count;
            }
            for (j = 0; j < tree->tree_nextsize; j++) {
                ompi_coll_adapt_bcast_context_t *context =
                    (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.
                                                                            adapt_ibcast_context_free_list);
                context->buff = (char *) buff + i * real_seg_size;
                context->frag_id = i;
                /* The id of peer in in children_list */
                context->child_id = j;
                /* Actural rank of the peer */
                context->peer = tree->tree_next[j];
                context->con = con;
                OBJ_RETAIN(con);

                char *send_buff = context->buff;
                OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                     "[%d]: Send(start in main): segment %d to %d at buff %p send_count %d tag %d\n",
                                     rank, context->frag_id, context->peer,
                                     (void *) send_buff, send_count, con->ibcast_tag - i));
                err =
                    MCA_PML_CALL(isend
                                 (send_buff, send_count, datatype, context->peer,
                                  con->ibcast_tag - i, sendmode, comm,
                                  &send_req));
                if (MPI_SUCCESS != err) {
                    /* NOTE(review): error return while still holding the
                     * mutex, and without releasing context/con — TODO confirm
                     * whether cleanup is expected here */
                    return err;
                }
                /* Set send callback; drop the lock first in case the callback
                 * fires immediately and re-acquires it */
                OPAL_THREAD_UNLOCK(mutex);
                ompi_request_set_callback(send_req, send_cb, context);
                OPAL_THREAD_LOCK(mutex);
            }
        }

    }

    /* If the current process is not root, it receives data from parent in the tree. */
    else {
        /* Handle the situation when num_segs < RECV_NUM */
        if (num_segs <= mca_coll_adapt_component.adapt_ibcast_max_recv_requests) {
            min = num_segs;
        } else {
            min = mca_coll_adapt_component.adapt_ibcast_max_recv_requests;
        }

        /* Set recv_array, recv_array is empty */
        for (i = 0; i < num_segs; i++) {
            recv_array[i] = 0;
        }
        /* Set send_array to empty */
        for (i = 0; i < tree->tree_nextsize; i++) {
            send_array[i] = 0;
        }

        /* Create a recv request */
        ompi_request_t *recv_req;

        /* Receive some segments from its parent */
        int recv_count = seg_count;
        for (i = 0; i < min; i++) {
            if (i == (num_segs - 1)) {
                recv_count = count - i * seg_count;
            }
            ompi_coll_adapt_bcast_context_t *context =
                (ompi_coll_adapt_bcast_context_t *) opal_free_list_wait(mca_coll_adapt_component.
                                                                        adapt_ibcast_context_free_list);
            context->buff = (char *) buff + i * real_seg_size;
            context->frag_id = i;
            context->peer = tree->tree_prev;
            context->con = con;
            OBJ_RETAIN(con);
            char *recv_buff = context->buff;
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: Recv(start in main): segment %d from %d at buff %p recv_count %d tag %d\n",
                                 ompi_comm_rank(context->con->comm), context->frag_id,
                                 context->peer, (void *) recv_buff, recv_count,
                                 con->ibcast_tag - i));
            err =
                MCA_PML_CALL(irecv
                             (recv_buff, recv_count, datatype, context->peer,
                              con->ibcast_tag - i, comm, &recv_req));
            if (MPI_SUCCESS != err) {
                /* NOTE(review): same as the send path — returns with the
                 * mutex held and context/con not released */
                return err;
            }
            /* Set receive callback */
            OPAL_THREAD_UNLOCK(mutex);
            ompi_request_set_callback(recv_req, recv_cb, context);
            OPAL_THREAD_LOCK(mutex);
        }

    }

    OPAL_THREAD_UNLOCK(mutex);

    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: End of Ibcast\n", rank));

    return MPI_SUCCESS;
}
|
16
ompi/mca/coll/adapt/coll_adapt_inbuf.c
Обычный файл
16
ompi/mca/coll/adapt/coll_adapt_inbuf.c
Обычный файл
@ -0,0 +1,16 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_inbuf.h"
|
||||
|
||||
/* OPAL class instance for ompi_coll_adapt_inbuf_t: a plain free-list item
 * (no extra constructor/destructor needed — the trailing buffer is raw). */
OBJ_CLASS_INSTANCE(ompi_coll_adapt_inbuf_t, opal_free_list_item_t,
                   NULL, NULL);
|
26
ompi/mca/coll/adapt/coll_adapt_inbuf.h
Обычный файл
26
ompi/mca/coll/adapt/coll_adapt_inbuf.h
Обычный файл
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef MCA_COLL_ADAPT_INBUF_H
|
||||
#define MCA_COLL_ADAPT_INBUF_H
|
||||
|
||||
#include "opal/class/opal_free_list.h"
|
||||
|
||||
/* Free-list item used by the ADAPT reduce to hold one incoming segment.
 * The payload is a C99 flexible array member; the usable size is set when
 * the enclosing opal_free_list is initialized. */
struct ompi_coll_adapt_inbuf_s {
    opal_free_list_item_t super;   /* must be first: items live on an opal_free_list */
    char buff[];                   /* segment payload, allocated past the struct */
};

typedef struct ompi_coll_adapt_inbuf_s ompi_coll_adapt_inbuf_t;

OBJ_CLASS_DECLARATION(ompi_coll_adapt_inbuf_t);
|
||||
|
||||
#endif /* MCA_COLL_ADAPT_INBUF_H */
|
771
ompi/mca/coll/adapt/coll_adapt_ireduce.c
Обычный файл
771
ompi/mca/coll/adapt/coll_adapt_ireduce.c
Обычный файл
@ -0,0 +1,771 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
#include "coll_adapt_context.h"
|
||||
#include "coll_adapt_item.h"
|
||||
#include "coll_adapt_topocache.h"
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
|
||||
static int ompi_coll_adapt_ireduce_generic(IREDUCE_ARGS,
|
||||
ompi_coll_tree_t * tree, size_t seg_size);
|
||||
|
||||
/* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */
|
||||
|
||||
/*
|
||||
* Set up MCA parameters of MPI_Reduce and MPI_Ireduce
|
||||
*/
|
||||
/*
 * Register the MCA parameters controlling MPI_Ireduce in the ADAPT component
 * and set their defaults.  Also resets the ireduce context free list pointer,
 * which is lazily allocated on first use.
 *
 * @return OMPI_SUCCESS (registration failures are not propagated)
 */
int ompi_coll_adapt_ireduce_register(void)
{
    mca_base_component_t *c = &mca_coll_adapt_component.super.collm_version;

    /* Tree shape used by the reduce */
    mca_coll_adapt_component.adapt_ireduce_algorithm = 1;
    mca_base_component_var_register(c, "reduce_algorithm",
                                    "Algorithm of reduce, 1: binomial, 2: in_order_binomial, 3: binary, 4: pipeline, 5: chain, 6: linear", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_ireduce_algorithm);
    /* Clamp out-of-range user values back to the default */
    if( (mca_coll_adapt_component.adapt_ireduce_algorithm < 0) ||
        (mca_coll_adapt_component.adapt_ireduce_algorithm > OMPI_COLL_ADAPT_ALGORITHM_COUNT) ) {
        mca_coll_adapt_component.adapt_ireduce_algorithm = 1;
    }

    /* Segmentation granularity in bytes */
    mca_coll_adapt_component.adapt_ireduce_segment_size = 163740;
    mca_base_component_var_register(c, "reduce_segment_size",
                                    "Segment size in bytes used by default for reduce algorithms. Only has meaning if algorithm is forced and supports segmenting. 0 bytes means no segmentation.",
                                    MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_ireduce_segment_size);

    /* Pipelining depth on the send side */
    mca_coll_adapt_component.adapt_ireduce_max_send_requests = 2;
    mca_base_component_var_register(c, "reduce_max_send_requests",
                                    "Maximum number of send requests",
                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_ireduce_max_send_requests);

    /* Pipelining depth on the receive side, per child */
    mca_coll_adapt_component.adapt_ireduce_max_recv_requests = 3;
    mca_base_component_var_register(c, "reduce_max_recv_requests",
                                    "Maximum number of receive requests per peer",
                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_ireduce_max_recv_requests);

    /* Sizing of the inbuf free list (segment staging buffers) */
    mca_coll_adapt_component.adapt_inbuf_free_list_min = 10;
    mca_base_component_var_register(c, "inbuf_free_list_min",
                                    "Minimum number of segment in inbuf free list",
                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_inbuf_free_list_min);

    mca_coll_adapt_component.adapt_inbuf_free_list_max = 10000;
    mca_base_component_var_register(c, "inbuf_free_list_max",
                                    "Maximum number of segment in inbuf free list",
                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_inbuf_free_list_max);


    mca_coll_adapt_component.adapt_inbuf_free_list_inc = 10;
    mca_base_component_var_register(c, "inbuf_free_list_inc",
                                    "Number of segments to allocate when growing the inbuf free list",
                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                    OPAL_INFO_LVL_5,
                                    MCA_BASE_VAR_SCOPE_READONLY,
                                    &mca_coll_adapt_component.adapt_inbuf_free_list_inc);

    /* Send mode of the initial sends (synchronous sends give flow control) */
    mca_coll_adapt_component.adapt_ireduce_synchronous_send = true;
    (void) mca_base_component_var_register(c, "reduce_synchronous_send",
                                           "Whether to use synchronous send operations during setup of reduce operations",
                                           MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           &mca_coll_adapt_component.adapt_ireduce_synchronous_send);

    /* Allocated lazily in ompi_coll_adapt_ireduce_generic, freed in _fini */
    mca_coll_adapt_component.adapt_ireduce_context_free_list = NULL;
    return OMPI_SUCCESS;
}
|
||||
|
||||
/*
|
||||
* Release the free list created in ompi_coll_adapt_ireduce_generic
|
||||
*/
|
||||
int ompi_coll_adapt_ireduce_fini(void)
|
||||
{
|
||||
if (NULL != mca_coll_adapt_component.adapt_ireduce_context_free_list) {
|
||||
OBJ_RELEASE(mca_coll_adapt_component.adapt_ireduce_context_free_list);
|
||||
mca_coll_adapt_component.adapt_ireduce_context_free_list = NULL;
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "ireduce fini\n"));
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Functions to access list
|
||||
*/
|
||||
/*
 * Pop the first item from con->recv_list whose count has reached num_children,
 * i.e. a segment for which contributions from all children have arrived and
 * which is therefore ready to be sent up the tree.
 *
 * The initial emptiness test is a deliberate unlocked fast path; a racing
 * insert is caught on the caller's next invocation.
 *
 * @return the detached ready item (caller owns it), or NULL if none is ready
 */
static ompi_coll_adapt_item_t *get_next_ready_item(ompi_coll_adapt_constant_reduce_context_t *con, int num_children)
{
    ompi_coll_adapt_item_t *item = NULL, *temp_item;
    if (opal_list_is_empty(&con->recv_list)) {
        return NULL;
    }
    OPAL_THREAD_LOCK(&con->mutex_recv_list);
    OPAL_LIST_FOREACH(temp_item, &con->recv_list, ompi_coll_adapt_item_t) {
        if (temp_item->count == num_children) {
            item = temp_item;
            /* Detach under the lock; ownership transfers to the caller */
            opal_list_remove_item(&con->recv_list, (opal_list_item_t *) temp_item);
            break;
        }
    }
    OPAL_THREAD_UNLOCK(&con->mutex_recv_list);
    return item;
}
|
||||
|
||||
/*
 * Record that one more child contribution for segment `id` has arrived.
 * If the segment is already tracked in con->recv_list its count is bumped;
 * otherwise a new item with count 1 is appended.  All list access is
 * serialized by con->mutex_recv_list.
 *
 * @return 1 if an existing item was updated, 2 if a new item was created
 */
static int add_to_recv_list(ompi_coll_adapt_constant_reduce_context_t *con, int id)
{
    ompi_coll_adapt_item_t *item;

    OPAL_THREAD_LOCK(&con->mutex_recv_list);
    OPAL_LIST_FOREACH(item, &con->recv_list, ompi_coll_adapt_item_t) {
        if (item->id == id) {
            (item->count)++;
            OPAL_THREAD_UNLOCK(&con->mutex_recv_list);
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 1\n"));
            return 1;
        }
    }

    /* Add a new object to the list with count set to 1 */
    item = OBJ_NEW(ompi_coll_adapt_item_t);
    item->id = id;
    item->count = 1;
    opal_list_append(&con->recv_list, (opal_list_item_t *) item);
    OPAL_THREAD_UNLOCK(&con->mutex_recv_list);
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "add_to_recv_list_return 2\n"));
    return 2;
}
|
||||
|
||||
/*
|
||||
* Get the inbuf address
|
||||
*/
|
||||
static ompi_coll_adapt_inbuf_t *to_inbuf(char *buf, int distance)
|
||||
{
|
||||
return (ompi_coll_adapt_inbuf_t *) (buf - distance);
|
||||
}
|
||||
|
||||
/*
|
||||
* Finish a ireduce request
|
||||
*/
|
||||
/*
 * Finish a ireduce request: return all resources acquired by the collective
 * (per-segment inbufs on non-root ranks, accumbuf pointer array, per-segment
 * op mutexes, the per-child receive counters and the final context), then
 * complete the user-visible request.
 *
 * Called exactly once, by whichever callback observes the termination
 * condition.  Returns OMPI_SUCCESS.
 */
static int ireduce_request_fini(ompi_coll_adapt_reduce_context_t * context)
{
    /* Return the allocated resources */
    ompi_request_t *temp_req = context->con->request;
    if (context->con->accumbuf != NULL) {
        /* On non-root ranks the accumulation buffers are inbuf free-list
         * items; map each payload pointer back to its inbuf and return it */
        if (context->con->rank != context->con->root) {
            for (int i = 0; i < context->con->num_segs; i++) {
                OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                     "[%d]: Return accumbuf %d %p\n",
                                     ompi_comm_rank(context->con->comm), i,
                                     (void *) to_inbuf(context->con->accumbuf[i],
                                                       context->con->distance)));
                opal_free_list_return_st(&context->con->inbuf_list,
                                         (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[i],
                                                                            context->con->distance));
            }
        }
        free(context->con->accumbuf);
    }
    /* Per-segment mutexes protecting the reduction op */
    for (int i = 0; i < context->con->num_segs; i++) {
        OBJ_DESTRUCT(&context->con->mutex_op_list[i]);
    }
    free(context->con->mutex_op_list);
    /* next_recv_segs only exists when this rank has children */
    if (context->con->tree->tree_nextsize > 0) {
        free(context->con->next_recv_segs);
    }
    OBJ_RELEASE(context->con);
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n"));
    opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list,
                          (opal_free_list_item_t *) context);
    /* Complete the request */
    ompi_request_complete(temp_req, 1);
    return OMPI_SUCCESS;
}
|
||||
|
||||
/*
|
||||
* Callback function of isend
|
||||
*/
|
||||
/*
 * Callback function of isend (ireduce): one upward send completed.  If
 * another fully-reduced segment is ready, issue the next isend for it; then
 * update the atomic completion counters and finalize the request when all
 * segments have been both received and forwarded.
 *
 * Shared state is coordinated with C11-style atomics (ongoing_send,
 * num_sent_segs) rather than a mutex.
 *
 * Returns 1 to tell the request machinery that the request has been freed.
 */
static int send_cb(ompi_request_t * req)
{
    ompi_coll_adapt_reduce_context_t *context =
        (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data;
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: ireduce_send_cb, peer %d, seg_id %d\n", context->con->rank,
                         context->peer, context->seg_index));
    int err;

    /* One in-flight send finished */
    opal_atomic_sub_fetch_32(&(context->con->ongoing_send), 1);

    /* Send a new segment */
    ompi_coll_adapt_item_t *item =
        get_next_ready_item(context->con, context->con->tree->tree_nextsize);

    if (item != NULL) {
        /* Get new context item from free list */
        ompi_coll_adapt_reduce_context_t *send_context =
            (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component.
                                                                     adapt_ireduce_context_free_list);
        /* Interior ranks send from the accumulation buffer; leaves send
         * directly from the user buffer at the segment's offset */
        if (context->con->tree->tree_nextsize > 0) {
            send_context->buff = context->con->accumbuf[item->id];
        } else {
            send_context->buff =
                context->buff + (item->id - context->seg_index) * context->con->segment_increment;
        }
        send_context->seg_index = item->id;
        send_context->peer = context->peer;
        send_context->con = context->con;

        opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1);

        int send_count = send_context->con->seg_count;
        if (item->id == (send_context->con->num_segs - 1)) {
            /* Last segment may be shorter than seg_count */
            send_count = send_context->con->count - item->id * send_context->con->seg_count;
        }

        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "[%d]: In send_cb, create isend to seg %d, peer %d, tag %d\n",
                             send_context->con->rank, send_context->seg_index, send_context->peer,
                             send_context->con->ireduce_tag - send_context->seg_index));

        ompi_request_t *send_req;
        err = MCA_PML_CALL(isend
                           (send_context->buff, send_count, send_context->con->datatype,
                            send_context->peer,
                            context->con->ireduce_tag - send_context->seg_index,
                            MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req));
        if (MPI_SUCCESS != err) {
            /* NOTE(review): unlike the ibcast callbacks, this error path does
             * not return send_context to the free list or release `item` —
             * TODO confirm intended cleanup */
            return err;
        }

        /* Release the item */
        OBJ_RELEASE(item);

        /* Set the send call back */
        ompi_request_set_callback(send_req, send_cb, send_context);
    }

    int32_t num_sent = opal_atomic_add_fetch_32(&(context->con->num_sent_segs), 1);
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: In send_cb, root = %d, num_sent = %d, num_segs = %d\n",
                         context->con->rank, context->con->tree->tree_root, num_sent,
                         context->con->num_segs));
    /* Check whether signal the condition, non root and sent all the segments */
    if (num_sent == context->con->num_segs &&
        context->con->num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize) {
        ireduce_request_fini(context);
    } else {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "return context_list\n"));
        opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list,
                              (opal_free_list_item_t *) context);
    }
    /* Call back function return 1, which means successful */
    req->req_free(&req);
    return 1;
}
|
||||
|
||||
/*
|
||||
* Callback function of irecv
|
||||
*/
|
||||
/*
 * Completion callback for one segment received from a child in the ireduce
 * tree.  Posts the next receive from the same child (if segments remain),
 * folds the received segment into the per-segment accumulation buffer under
 * that segment's mutex, forwards ready segments to the parent, and finalizes
 * the collective when every segment has been received (and, for non-root
 * ranks, sent).  Returns 1 so the PML treats the request as fully handled;
 * returns an MPI error code if posting a send/recv fails.
 */
static int recv_cb(ompi_request_t * req)
{
    ompi_coll_adapt_reduce_context_t *context = (ompi_coll_adapt_reduce_context_t *) req->req_complete_cb_data;
    /* Atomically claim the next segment index to receive from this child */
    int32_t new_id = opal_atomic_add_fetch_32(&(context->con->next_recv_segs[context->child_id]), 1);
    int err;

    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: ireduce_recv_cb, peer %d, seg_id %d\n", context->con->rank,
                         context->peer, context->seg_index));

    /* Did we still need to receive subsequent fragments from this child ? */
    if (new_id < context->con->num_segs) {
        char *temp_recv_buf = NULL;
        ompi_coll_adapt_inbuf_t *inbuf = NULL;
        /* Set inbuf: if it is the first child, recv directly into rbuf (root,
         * not-in-place case); otherwise recv into a freshly acquired inbuf */
        if (context->child_id == 0 && context->con->sbuf != MPI_IN_PLACE
            && context->con->root == context->con->rank) {
            temp_recv_buf = (char *) context->con->rbuf +
                (ptrdiff_t) new_id *(ptrdiff_t) context->con->segment_increment;
        } else {
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: In recv_cb, alloc inbuf\n", context->con->rank));
            inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&context->con->inbuf_list);
            /* Offset by the datatype lower bound so the payload lands at buff */
            temp_recv_buf = inbuf->buff - context->con->lower_bound;
        }
        /* Get new context item from free list */
        ompi_coll_adapt_reduce_context_t *recv_context =
            (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component.
                                                                     adapt_ireduce_context_free_list);
        recv_context->buff = temp_recv_buf;
        recv_context->seg_index = new_id;
        recv_context->child_id = context->child_id;
        recv_context->peer = context->peer;
        recv_context->con = context->con;
        recv_context->inbuf = inbuf;
        /* The last segment may carry fewer elements than seg_count */
        int recv_count = recv_context->con->seg_count;
        if (new_id == (recv_context->con->num_segs - 1)) {
            recv_count = recv_context->con->count - new_id * recv_context->con->seg_count;
        }
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "[%d]: In recv_cb, create irecv for seg %d, peer %d, inbuf %p, tag %d\n",
                             context->con->rank, recv_context->seg_index, recv_context->peer,
                             (void *) inbuf,
                             recv_context->con->ireduce_tag - recv_context->seg_index));
        ompi_request_t *recv_req;
        /* Per-segment tag: reserved base tag minus the segment index */
        err = MCA_PML_CALL(irecv(temp_recv_buf, recv_count, recv_context->con->datatype,
                                 recv_context->peer,
                                 recv_context->con->ireduce_tag - recv_context->seg_index,
                                 recv_context->con->comm, &recv_req));
        if (MPI_SUCCESS != err) {
            return err;
        }
        /* Set the receive call back */
        ompi_request_set_callback(recv_req, recv_cb, recv_context);
    }

    /* Do the op: fold the just-received segment into the accumulation buffer */
    int op_count = context->con->seg_count;
    if (context->seg_index == (context->con->num_segs - 1)) {
        op_count = context->con->count - context->seg_index * context->con->seg_count;
    }

    int keep_inbuf = 0;
    /* One mutex per segment serializes concurrent reductions of that segment */
    OPAL_THREAD_LOCK(&context->con->mutex_op_list[context->seg_index]);
    if (NULL == context->con->accumbuf[context->seg_index]) {
        /* First contribution for this segment: adopt the receive buffer as
         * the accumulation buffer, then fold in the local sbuf contribution */
        if (NULL == context->inbuf) {
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: set accumbuf to rbuf\n", context->con->rank));
            context->con->accumbuf[context->seg_index] = context->buff;
        } else {
            /* The inbuf now backs the accumbuf; do not return it below */
            keep_inbuf = 1;
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: set accumbuf to inbuf\n", context->con->rank));
            context->con->accumbuf[context->seg_index] = context->inbuf->buff - context->con->lower_bound;
        }
        /* Op sbuf and accmbuf to accumbuf */
        ompi_op_reduce(context->con->op,
                       context->con->sbuf + (ptrdiff_t) context->seg_index * (ptrdiff_t) context->con->segment_increment,
                       context->con->accumbuf[context->seg_index], op_count, context->con->datatype);

    } else {
        if (NULL == context->inbuf) {
            /* Op rbuf and accumbuf to rbuf, then retire the old inbuf-backed
             * accumbuf (recovered from the data pointer via to_inbuf) */
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: op rbuf and accumbuf to rbuf\n", context->con->rank));
            ompi_op_reduce(context->con->op, context->con->accumbuf[context->seg_index],
                           context->buff, op_count, context->con->datatype);
            /* Free old accumbuf */
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: free old accumbuf %p\n", context->con->rank,
                                 (void *) to_inbuf(context->con->accumbuf[context->seg_index],
                                                   context->con->distance)));
            opal_free_list_return(&context->con->inbuf_list,
                                  (opal_free_list_item_t *) to_inbuf(context->con->accumbuf[context->seg_index],
                                                                     context->con->distance));
            /* Set accumbuf to rbuf */
            context->con->accumbuf[context->seg_index] = context->buff;
        } else {
            /* Op inbuf and accmbuf to accumbuf */
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: op inbuf and accmbuf to accumbuf\n", context->con->rank));
            ompi_op_reduce(context->con->op, context->inbuf->buff - context->con->lower_bound,
                           context->con->accumbuf[context->seg_index], op_count,
                           context->con->datatype);
        }
    }
    OPAL_THREAD_UNLOCK(&context->con->mutex_op_list[context->seg_index]);

    /* Non-root: mark this segment as a candidate to forward to the parent */
    if (context->con->rank != context->con->tree->tree_root) {
        add_to_recv_list(context->con, context->seg_index);
    }

    /* Send to parent, bounded by the max outstanding-send limit */
    if (context->con->rank != context->con->tree->tree_root
        && context->con->ongoing_send < mca_coll_adapt_component.adapt_ireduce_max_send_requests) {
        /* A segment is "ready" once all children have contributed to it */
        ompi_coll_adapt_item_t *item = get_next_ready_item(context->con, context->con->tree->tree_nextsize);

        if (NULL != item) {
            /* Get new context item from free list */
            ompi_coll_adapt_reduce_context_t *send_context =
                (ompi_coll_adapt_reduce_context_t *) opal_free_list_wait(mca_coll_adapt_component.
                                                                         adapt_ireduce_context_free_list);
            send_context->buff = context->con->accumbuf[context->seg_index];
            send_context->seg_index = item->id;
            send_context->peer = context->con->tree->tree_prev;
            send_context->con = context->con;
            opal_atomic_add_fetch_32(&(context->con->ongoing_send), 1);

            int send_count = send_context->con->seg_count;
            if (item->id == (send_context->con->num_segs - 1)) {
                send_count = send_context->con->count - item->id * send_context->con->seg_count;
            }
            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: In recv_cb, create isend to seg %d, peer %d, tag %d\n",
                                 send_context->con->rank, send_context->seg_index, send_context->peer,
                                 send_context->con->ireduce_tag - send_context->seg_index));

            ompi_request_t *send_req;
            err = MCA_PML_CALL(isend(send_context->buff, send_count, send_context->con->datatype,
                                     send_context->peer,
                                     send_context->con->ireduce_tag - send_context->seg_index,
                                     MCA_PML_BASE_SEND_STANDARD, send_context->con->comm, &send_req));
            if (MPI_SUCCESS != err) {
                return err;
            }
            OBJ_RELEASE(item);

            /* Set the send call back */
            ompi_request_set_callback(send_req, send_cb, send_context);
        }
    }

    int32_t num_recv_segs = opal_atomic_add_fetch_32(&(context->con->num_recv_segs), 1);
    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: In recv_cb, tree = %p, root = %d, num_recv = %d, num_segs = %d, num_child = %d\n",
                         context->con->rank, (void *) context->con->tree,
                         context->con->tree->tree_root, num_recv_segs, context->con->num_segs,
                         context->con->tree->tree_nextsize));
    /* Prepare for releasing all acquired resources: return the inbuf unless
     * it was adopted as the segment's accumulation buffer above */
    if (!keep_inbuf && NULL != context->inbuf) {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "[%d]: root free context inbuf %p", context->con->rank,
                             (void *) context->inbuf));
        opal_free_list_return(&context->con->inbuf_list,
                              (opal_free_list_item_t *) context->inbuf);
    }
    /* Completion: all child segments received, and (for non-root) all
     * segments already forwarded to the parent */
    if (num_recv_segs == context->con->num_segs * context->con->tree->tree_nextsize &&
        (context->con->tree->tree_root == context->con->rank || context->con->num_sent_segs == context->con->num_segs)) {
        ireduce_request_fini(context);
    } else {
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: return context_list",
                             context->con->rank));
        opal_free_list_return(mca_coll_adapt_component.adapt_ireduce_context_free_list,
                              (opal_free_list_item_t *) context);
    }
    /* Call back function return 1, which means successful */
    req->req_free(&req);
    return 1;
}
|
||||
|
||||
/*
 * Non-blocking reduce entry point for the ADAPT component.
 *
 * Falls back on the previously-selected coll module for non-commutative
 * operations (ADAPT applies the operation to segments in arrival order,
 * which is only valid when the operation is commutative), then dispatches
 * to the generic implementation with the module-cached topology.
 */
int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
                            struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
                            ompi_request_t ** request, mca_coll_base_module_t * module)
{

    /* Fall back on the previous module if the operation is non-commutative */
    if (!ompi_op_is_commute(op)){
        mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module;
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "ADAPT cannot handle reduce with this (non-commutative) operation. It needs to fall back on another component\n"));
        return adapt_module->previous_ireduce(sbuf, rbuf, count, dtype, op, root,
                                              comm, request,
                                              adapt_module->previous_reduce_module);
    }


    OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
                         "ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
                         root, mca_coll_adapt_component.adapt_ireduce_algorithm,
                         mca_coll_adapt_component.adapt_ireduce_segment_size,
                         mca_coll_adapt_component.adapt_ireduce_max_send_requests,
                         mca_coll_adapt_component.adapt_ireduce_max_recv_requests));

    if (OMPI_COLL_ADAPT_ALGORITHM_TUNED == mca_coll_adapt_component.adapt_ireduce_algorithm) {
        OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output, "tuned not implemented\n"));
        return OMPI_ERR_NOT_IMPLEMENTED;
    }


    /* Topology is retrieved from (or built into) the per-module cache */
    return ompi_coll_adapt_ireduce_generic(sbuf, rbuf, count, dtype, op, root, comm, request, module,
                                           adapt_module_cached_topology(module, comm, root, mca_coll_adapt_component.adapt_ireduce_algorithm),
                                           mca_coll_adapt_component.adapt_ireduce_segment_size);

}
|
||||
|
||||
|
||||
/*
 * Generic segmented, pipelined ireduce over an arbitrary tree topology.
 *
 * The message is split into num_segs segments of seg_count elements (the
 * last segment may be shorter).  Non-leaf ranks post a first batch of
 * receives from every child; leaves start a first batch of sends to their
 * parent.  All further progress is driven by the send_cb/recv_cb request
 * callbacks.  The request returned through *request completes when
 * ireduce_request_fini() fires in one of the callbacks.
 *
 * Returns MPI_SUCCESS, or a PML error if posting an initial send/recv fails.
 * NOTE(review): on such an early error, already-allocated resources (con,
 * temp_request, mutex_op_list) are not torn down here.
 */
int ompi_coll_adapt_ireduce_generic(const void *sbuf, void *rbuf, int count,
                                    struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root,
                                    struct ompi_communicator_t *comm, ompi_request_t ** request,
                                    mca_coll_base_module_t * module, ompi_coll_tree_t * tree,
                                    size_t seg_size)
{

    ptrdiff_t extent, lower_bound, segment_increment;
    ptrdiff_t true_lower_bound, true_extent, real_seg_size;
    size_t typelng;
    int seg_count = count, num_segs, rank, recv_count, send_count, err, min;
    /* Used to store the accumulate result, one pointer per segment */
    char **accumbuf = NULL;
    opal_mutex_t *mutex_op_list;
    /* NOTE(review): recv_list is unused here; the list actually used is
     * con->recv_list, populated in the leaf branch below */
    opal_list_t *recv_list;
    /* Initial sends use synchronous mode only when the MCA parameter asks
     * for it; standard mode otherwise */
    mca_pml_base_send_mode_t sendmode = (mca_coll_adapt_component.adapt_ireduce_synchronous_send)
        ? MCA_PML_BASE_SEND_SYNCHRONOUS : MCA_PML_BASE_SEND_STANDARD;

    /* Determine number of segments and number of elements sent per operation */
    rank = ompi_comm_rank(comm);
    ompi_datatype_get_extent(dtype, &lower_bound, &extent);
    ompi_datatype_type_size(dtype, &typelng);
    COLL_BASE_COMPUTED_SEGCOUNT(seg_size, typelng, seg_count);
    num_segs = (count + seg_count - 1) / seg_count;
    segment_increment = (ptrdiff_t) seg_count *extent;
    ompi_datatype_get_true_extent(dtype, &true_lower_bound, &true_extent);
    real_seg_size = true_extent + (ptrdiff_t) (seg_count - 1) * extent;

    /* Atomically set up the component-wide context free list: build a
     * candidate list, then publish it with a CAS; the loser of a
     * concurrent race releases its candidate */
    if (NULL == mca_coll_adapt_component.adapt_ireduce_context_free_list) {
        opal_free_list_t* fl = OBJ_NEW(opal_free_list_t);
        opal_free_list_init(fl,
                            sizeof(ompi_coll_adapt_reduce_context_t),
                            opal_cache_line_size,
                            OBJ_CLASS(ompi_coll_adapt_reduce_context_t),
                            0, opal_cache_line_size,
                            mca_coll_adapt_component.adapt_context_free_list_min,
                            mca_coll_adapt_component.adapt_context_free_list_max,
                            mca_coll_adapt_component.adapt_context_free_list_inc,
                            NULL, 0, NULL, NULL, NULL);
        if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&mca_coll_adapt_component.adapt_ireduce_context_free_list,
                                                     &(intptr_t){0}, fl) ) {
            OBJ_RELEASE(fl);
        }
    }

    ompi_coll_base_nbc_request_t *temp_request = NULL;
    /* Set up the non-blocking-collective request handed back to the caller */
    temp_request = OBJ_NEW(ompi_coll_base_nbc_request_t);
    OMPI_REQUEST_INIT(&temp_request->super, false);
    temp_request->super.req_state = OMPI_REQUEST_ACTIVE;
    temp_request->super.req_type = OMPI_REQUEST_COLL;
    temp_request->super.req_free = ompi_coll_adapt_request_free;
    temp_request->super.req_status.MPI_SOURCE = 0;
    temp_request->super.req_status.MPI_TAG = 0;
    temp_request->super.req_status.MPI_ERROR = 0;
    temp_request->super.req_status._cancelled = 0;
    temp_request->super.req_status._ucount = 0;
    *request = (ompi_request_t*)temp_request;

    /* Set up one mutex per segment, protecting that segment's reduction */
    mutex_op_list = (opal_mutex_t *) malloc(sizeof(opal_mutex_t) * num_segs);
    for (int32_t i = 0; i < num_segs; i++) {
        OBJ_CONSTRUCT(&mutex_op_list[i], opal_mutex_t);
    }

    /* Set constant context shared by all send and recv callbacks */
    ompi_coll_adapt_constant_reduce_context_t *con =
        OBJ_NEW(ompi_coll_adapt_constant_reduce_context_t);
    con->count = count;
    con->seg_count = seg_count;
    con->datatype = dtype;
    con->comm = comm;
    con->segment_increment = segment_increment;
    con->num_segs = num_segs;
    con->request = (ompi_request_t*)temp_request;
    con->rank = rank;
    con->num_recv_segs = 0;
    con->num_sent_segs = 0;
    con->ongoing_send = 0;
    con->mutex_op_list = mutex_op_list;
    con->op = op;
    con->tree = tree;
    con->lower_bound = lower_bound;
    con->sbuf = (char *) sbuf;
    con->rbuf = (char *) rbuf;
    con->root = root;
    con->distance = 0;
    /* Reserve a block of num_segs tags so concurrent non-blocking
     * collectives on this communicator cannot collide */
    con->ireduce_tag = ompi_coll_base_nbc_reserve_tags(comm, num_segs);
    con->real_seg_size = real_seg_size;

    /* If the current process is not a leaf, it needs inbufs to receive into */
    if (tree->tree_nextsize > 0) {
        size_t num_allocate_elems = mca_coll_adapt_component.adapt_inbuf_free_list_min;
        if (tree->tree_nextsize * num_segs < num_allocate_elems) {
            num_allocate_elems = tree->tree_nextsize * num_segs;
        }
        opal_free_list_init(&con->inbuf_list,
                            sizeof(ompi_coll_adapt_inbuf_t) + real_seg_size,
                            opal_cache_line_size,
                            OBJ_CLASS(ompi_coll_adapt_inbuf_t),
                            0, opal_cache_line_size,
                            num_allocate_elems,
                            mca_coll_adapt_component.adapt_inbuf_free_list_max,
                            mca_coll_adapt_component.adapt_inbuf_free_list_inc,
                            NULL, 0, NULL, NULL, NULL);
        /* Set up next_recv_segs: per-child index of the next segment to recv */
        con->next_recv_segs = (int32_t *) malloc(sizeof(int32_t) * tree->tree_nextsize);
        /* Probe one inbuf to learn the constant offset between an inbuf's
         * data pointer and the inbuf object itself (used by to_inbuf()) */
        ompi_coll_adapt_inbuf_t *temp_inbuf =
            (ompi_coll_adapt_inbuf_t *) opal_free_list_wait_st(&con->inbuf_list);
        con->distance = (char *) temp_inbuf->buff - lower_bound - (char *) temp_inbuf;  //address of inbuf->buff to address of inbuf
        OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                             "[%d]: distance %d, inbuf %p, inbuf->buff %p, inbuf->buff-lb %p, to_inbuf %p, inbuf_list %p\n",
                             rank, con->distance, (void *) temp_inbuf, (void *) temp_inbuf->buff,
                             (char *) temp_inbuf->buff - lower_bound,
                             (void *) to_inbuf((char *) temp_inbuf->buff - lower_bound, con->distance),
                             (void *) &con->inbuf_list));
        opal_free_list_return_st(&con->inbuf_list, (opal_free_list_item_t *) temp_inbuf);
    } else {
        con->next_recv_segs = NULL;
    }

    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                         "[%d]: start ireduce root %d tag %d\n", rank, tree->tree_root,
                         con->ireduce_tag));

    /* Non-leaf: post the first batch of receives from every child */
    if (tree->tree_nextsize > 0) {
        /* Set up accumbuf (one accumulation pointer per segment) */
        accumbuf = (char **) malloc(sizeof(char *) * num_segs);
        if (root == rank && sbuf == MPI_IN_PLACE) {
            /* In-place root accumulates directly into rbuf */
            for (int32_t i = 0; i < num_segs; i++) {
                accumbuf[i] = (char *) rbuf + (ptrdiff_t) i *(ptrdiff_t) segment_increment;
            }
        } else {
            /* Buffers are chosen lazily in recv_cb as segments arrive */
            for (int32_t i = 0; i < num_segs; i++) {
                accumbuf[i] = NULL;
            }
        }

        con->accumbuf = accumbuf;

        /* For the first batch of segments, bounded by max_recv_requests */
        min = mca_coll_adapt_component.adapt_ireduce_max_recv_requests;
        if (num_segs < mca_coll_adapt_component.adapt_ireduce_max_recv_requests) {
            min = num_segs;
        }
        /* recv_cb pre-increments before use, hence min - 1 */
        for (int32_t i = 0; i < tree->tree_nextsize; i++) {
            con->next_recv_segs[i] = min - 1;
        }

        int num_recvs = 0;
        for (int32_t seg_index = 0; seg_index < min; seg_index++)
        {
            /* For each child */
            for (int32_t i = 0; i < tree->tree_nextsize; i++) {
                recv_count = seg_count;
                if (seg_index == (num_segs - 1)) {
                    recv_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index;
                }
                char *temp_recv_buf = NULL;
                ompi_coll_adapt_inbuf_t *inbuf = NULL;
                /* Set inbuf: the first child of a non-in-place root receives
                 * straight into rbuf; everyone else receives into an inbuf */
                if (i == 0 && sbuf != MPI_IN_PLACE && root == rank) {
                    temp_recv_buf = (char *) rbuf + (ptrdiff_t) seg_index *(ptrdiff_t) segment_increment;
                } else {
                    inbuf = (ompi_coll_adapt_inbuf_t *) opal_free_list_wait(&con->inbuf_list);
                    OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                         "[%d]: In ireduce, alloc inbuf %p\n", rank,
                                         (void *) inbuf));
                    temp_recv_buf = inbuf->buff - lower_bound;
                }
                /* Get context */
                ompi_coll_adapt_reduce_context_t *context =
                    (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component.
                                                                            adapt_ireduce_context_free_list);
                context->buff = temp_recv_buf;
                context->seg_index = seg_index;
                context->child_id = i;  //the id of the peer in the tree
                context->peer = tree->tree_next[i];     //the actual rank of the peer
                context->con = con;
                context->inbuf = inbuf;

                OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                     "[%d]: In ireduce, create irecv for seg %d, peer %d, recv_count %d, inbuf %p tag %d\n",
                                     context->con->rank, context->seg_index, context->peer,
                                     recv_count, (void *) inbuf,
                                     con->ireduce_tag - seg_index));

                /* Create a recv request */
                ompi_request_t *recv_req;
                err = MCA_PML_CALL(irecv
                                   (temp_recv_buf, recv_count, dtype, tree->tree_next[i],
                                    con->ireduce_tag - seg_index, comm, &recv_req));
                if (MPI_SUCCESS != err) {
                    return err;
                }
                /* Set the recv callback */
                ompi_request_set_callback(recv_req, recv_cb, context);

                ++num_recvs;
            }
        }
    }

    /* Leaf nodes: start sending segments to the parent */
    else {
        /* First batch bounded by max_send_requests */
        min = mca_coll_adapt_component.adapt_ireduce_max_send_requests;
        if (num_segs <= mca_coll_adapt_component.adapt_ireduce_max_send_requests) {
            min = num_segs;
        }
        /* put all items into the recv_list that won't be sent immediately;
         * send_cb drains this list as outstanding sends complete */
        for (int32_t seg_index = min; seg_index < num_segs; seg_index++) {
            ompi_coll_adapt_item_t *item;
            item = OBJ_NEW(ompi_coll_adapt_item_t);
            item->id = seg_index;
            item->count = tree->tree_nextsize;
            opal_list_append(&con->recv_list, (opal_list_item_t *) item);
        }
        con->accumbuf = accumbuf;
        con->ongoing_send = min;
        for (int32_t seg_index = 0; seg_index < min; seg_index++) {
            send_count = seg_count;
            if (seg_index == (num_segs - 1)) {
                send_count = count - (ptrdiff_t) seg_count *(ptrdiff_t) seg_index;
            }
            ompi_coll_adapt_reduce_context_t *context =
                (ompi_coll_adapt_reduce_context_t *)opal_free_list_wait(mca_coll_adapt_component.adapt_ireduce_context_free_list);
            context->buff = (char *) sbuf + (ptrdiff_t) seg_index * (ptrdiff_t) segment_increment;
            context->seg_index = seg_index;
            /* Actual rank of the peer */
            context->peer = tree->tree_prev;
            context->con = con;
            context->inbuf = NULL;

            OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
                                 "[%d]: In ireduce, create isend to seg %d, peer %d, send_count %d tag %d\n",
                                 context->con->rank, context->seg_index, context->peer,
                                 send_count, con->ireduce_tag - context->seg_index));

            /* Create send request */
            ompi_request_t *send_req;
            err = MCA_PML_CALL(isend
                               (context->buff, send_count, dtype, tree->tree_prev,
                                con->ireduce_tag - context->seg_index,
                                sendmode, comm, &send_req));
            if (MPI_SUCCESS != err) {
                return err;
            }

            /* Set the send callback */
            ompi_request_set_callback(send_req, send_cb, context);
        }

    }

    return MPI_SUCCESS;
}
|
15
ompi/mca/coll/adapt/coll_adapt_item.c
Обычный файл
15
ompi/mca/coll/adapt/coll_adapt_item.c
Обычный файл
@ -0,0 +1,15 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "coll_adapt_item.h"

/* Plain list item used to track per-segment send bookkeeping;
 * no per-instance construction or destruction is needed. */
OBJ_CLASS_INSTANCE(ompi_coll_adapt_item_t, opal_list_item_t,
                   NULL, NULL);
|
25
ompi/mca/coll/adapt/coll_adapt_item.h
Обычный файл
25
ompi/mca/coll/adapt/coll_adapt_item.h
Обычный файл
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "coll_adapt_inbuf.h"
|
||||
|
||||
/* List entry describing one fragment (segment) of an ADAPT collective,
 * queued until the segment is ready to be forwarded. */
struct ompi_coll_adapt_item_s {
    opal_list_item_t super;
    /* Fragment id (segment index within the collective) */
    int id;
    /* The number of children which have received the current segment */
    int count;
};

typedef struct ompi_coll_adapt_item_s ompi_coll_adapt_item_t;

OBJ_CLASS_DECLARATION(ompi_coll_adapt_item_t);
|
200
ompi/mca/coll/adapt/coll_adapt_module.c
Обычный файл
200
ompi/mca/coll/adapt/coll_adapt_module.c
Обычный файл
@ -0,0 +1,200 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#ifdef HAVE_STRING_H
|
||||
#include <string.h>
|
||||
#endif /* HAVE_STRING_H */
|
||||
#ifdef HAVE_SCHED_H
|
||||
#include <sched.h>
|
||||
#endif /* HAVE_SCHED_H */
|
||||
#include <sys/types.h>
|
||||
#ifdef HAVE_SYS_MMAN_H
|
||||
#include <sys/mman.h>
|
||||
#endif /* HAVE_SYS_MMAN_H */
|
||||
#ifdef HAVE_UNISTD_H
|
||||
#include <unistd.h>
|
||||
#endif /* HAVE_UNISTD_H */
|
||||
|
||||
#include "mpi.h"
|
||||
#include "opal_stdint.h"
|
||||
#include "opal/util/os_path.h"
|
||||
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/group/group.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "ompi/mca/coll/base/base.h"
|
||||
#include "ompi/mca/coll/base/coll_base_functions.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
#include "coll_adapt.h"
|
||||
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
#include "coll_adapt_topocache.h"
|
||||
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
|
||||
/*
|
||||
* Module constructor
|
||||
*/
|
||||
static void adapt_module_construct(mca_coll_adapt_module_t * module)
{
    /* Start disabled and with no cached topologies; both are set up lazily. */
    module->adapt_enabled = false;
    module->topo_cache = NULL;
}
|
||||
|
||||
/*
|
||||
* Module destructor
|
||||
*/
|
||||
static void adapt_module_destruct(mca_coll_adapt_module_t * module)
{
    opal_list_t *cache = module->topo_cache;

    module->adapt_enabled = false;
    if (NULL == cache) {
        return;
    }

    /* Drain and release every cached topology before dropping the list. */
    adapt_topology_cache_item_t *entry;
    for (;;) {
        entry = (adapt_topology_cache_item_t *) opal_list_remove_first(cache);
        if (NULL == entry) {
            break;
        }
        OBJ_RELEASE(entry);
    }
    OBJ_RELEASE(cache);
    module->topo_cache = NULL;
}
|
||||
|
||||
|
||||
/* Module class: constructor zeroes the lazy state, destructor drains the
 * per-module topology cache. */
OBJ_CLASS_INSTANCE(mca_coll_adapt_module_t,
                   mca_coll_base_module_t,
                   adapt_module_construct,
                   adapt_module_destruct);
|
||||
|
||||
/*
 * Save the communicator's current implementation of a collective (function
 * pointer and owning module) into the ADAPT module so it can be used as a
 * fallback later, retaining the previous module.  Disqualifies ADAPT (by
 * returning OMPI_ERROR from the caller) when no underlying implementation
 * exists.
 *
 * In this macro, the following variables are supposed to have been declared
 * in the caller:
 * . ompi_communicator_t *comm
 * . mca_coll_adapt_module_t *adapt_module
 */
#define ADAPT_SAVE_PREV_COLL_API(__api)                                                     \
    do {                                                                                    \
        adapt_module->previous_ ## __api            = comm->c_coll->coll_ ## __api;         \
        adapt_module->previous_ ## __api ## _module = comm->c_coll->coll_ ## __api ## _module; \
        if (!comm->c_coll->coll_ ## __api || !comm->c_coll->coll_ ## __api ## _module) {    \
            opal_output_verbose(1, ompi_coll_base_framework.framework_output,               \
                                "(%d/%s): no underlying " # __api"; disqualifying myself",  \
                                comm->c_contextid, comm->c_name);                           \
            return OMPI_ERROR;                                                              \
        }                                                                                   \
        OBJ_RETAIN(adapt_module->previous_ ## __api ## _module);                            \
    } while(0)
|
||||
|
||||
|
||||
/*
|
||||
* Init module on the communicator
|
||||
*/
|
||||
static int adapt_module_enable(mca_coll_base_module_t * module,
                               struct ompi_communicator_t *comm)
{
    mca_coll_adapt_module_t * adapt_module = (mca_coll_adapt_module_t*) module;

    /* Only reduce/ireduce need a saved fallback: ADAPT cannot handle
     * non-commutative operations and defers those to the previous module. */
    ADAPT_SAVE_PREV_COLL_API(reduce);
    ADAPT_SAVE_PREV_COLL_API(ireduce);

    return OMPI_SUCCESS;
}
|
||||
|
||||
/*
|
||||
* Initial query function that is invoked during MPI_INIT, allowing
|
||||
* this component to disqualify itself if it doesn't support the
|
||||
* required level of thread support. This function is invoked exactly
|
||||
* once.
|
||||
*/
|
||||
int ompi_coll_adapt_init_query(bool enable_progress_threads, bool enable_mpi_threads)
{
    /* ADAPT supports any thread level; never disqualify at init time. */
    return OMPI_SUCCESS;
}
|
||||
|
||||
/*
|
||||
* Invoked when there's a new communicator that has been created.
|
||||
* Look at the communicator and decide which set of functions and
|
||||
* priority we want to return.
|
||||
*/
|
||||
/*
 * Per-communicator query: decide whether ADAPT can serve this communicator
 * and, if so, return a new module advertising the collectives ADAPT
 * implements (bcast/ibcast, reduce/ireduce); all others are left NULL so
 * the framework picks another component for them.
 */
mca_coll_base_module_t *ompi_coll_adapt_comm_query(struct ompi_communicator_t * comm,
                                                   int *priority)
{
    mca_coll_adapt_module_t *adapt_module;

    /* If we're intercomm, or if there's only one process in the communicator */
    if (OMPI_COMM_IS_INTER(comm) || 1 == ompi_comm_size(comm)) {
        opal_output_verbose(10, ompi_coll_base_framework.framework_output,
                            "coll:adapt:comm_query (%d/%s): intercomm, "
                            "comm is too small; disqualifying myself",
                            comm->c_contextid, comm->c_name);
        return NULL;
    }

    /* Get the priority level attached to this module.
       If priority is less than or equal to 0, then the module is unavailable. */
    *priority = mca_coll_adapt_component.adapt_priority;
    if (mca_coll_adapt_component.adapt_priority <= 0) {
        opal_output_verbose(10, ompi_coll_base_framework.framework_output,
                            "coll:adapt:comm_query (%d/%s): priority too low; "
                            "disqualifying myself",
                            comm->c_contextid, comm->c_name);
        return NULL;
    }

    adapt_module = OBJ_NEW(mca_coll_adapt_module_t);
    if (NULL == adapt_module) {
        return NULL;
    }

    /* All is good -- return a module. NULL entries mean "not provided by
     * ADAPT"; the framework falls back to other components for those. */
    adapt_module->super.coll_module_enable = adapt_module_enable;
    adapt_module->super.ft_event = NULL;
    adapt_module->super.coll_allgather = NULL;
    adapt_module->super.coll_allgatherv = NULL;
    adapt_module->super.coll_allreduce = NULL;
    adapt_module->super.coll_alltoall = NULL;
    adapt_module->super.coll_alltoallw = NULL;
    adapt_module->super.coll_barrier = NULL;
    adapt_module->super.coll_bcast = ompi_coll_adapt_bcast;
    adapt_module->super.coll_exscan = NULL;
    adapt_module->super.coll_gather = NULL;
    adapt_module->super.coll_gatherv = NULL;
    adapt_module->super.coll_reduce = ompi_coll_adapt_reduce;
    adapt_module->super.coll_reduce_scatter = NULL;
    adapt_module->super.coll_scan = NULL;
    adapt_module->super.coll_scatter = NULL;
    adapt_module->super.coll_scatterv = NULL;
    adapt_module->super.coll_ibcast = ompi_coll_adapt_ibcast;
    adapt_module->super.coll_ireduce = ompi_coll_adapt_ireduce;
    adapt_module->super.coll_iallreduce = NULL;

    opal_output_verbose(10, ompi_coll_base_framework.framework_output,
                        "coll:adapt:comm_query (%d/%s): pick me! pick me!",
                        comm->c_contextid, comm->c_name);
    return &(adapt_module->super);
}
|
||||
|
||||
/*
|
||||
* Free ADAPT request
|
||||
*/
|
||||
int ompi_coll_adapt_request_free(ompi_request_t ** request)
{
    /* OMPI_REQUEST_FINI must run before the release so the request is
     * removed from the Fortran handle translation tables first. */
    OMPI_REQUEST_FINI(*request);
    (*request)->req_state = OMPI_REQUEST_INVALID;
    OBJ_RELEASE(*request);
    /* Hand the caller back a null handle, per MPI request-free semantics. */
    *request = MPI_REQUEST_NULL;
    return OMPI_SUCCESS;
}
|
40
ompi/mca/coll/adapt/coll_adapt_reduce.c
Обычный файл
40
ompi/mca/coll/adapt/coll_adapt_reduce.c
Обычный файл
@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
|
||||
#include "ompi/op/op.h"
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_algorithms.h"
|
||||
|
||||
/* MPI_Reduce and MPI_Ireduce in the ADAPT module only work for commutative operations */
|
||||
int ompi_coll_adapt_reduce(const void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype,
|
||||
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
|
||||
mca_coll_base_module_t * module)
|
||||
{
|
||||
/* Fall-back if operation is commutative */
|
||||
if (!ompi_op_is_commute(op)){
|
||||
mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module;
|
||||
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
|
||||
"ADAPT cannot handle reduce with this (commutative) operation. It needs to fall back on another component\n"));
|
||||
return adapt_module->previous_reduce(sbuf, rbuf, count, dtype, op, root,
|
||||
comm,
|
||||
adapt_module->previous_reduce_module);
|
||||
}
|
||||
|
||||
ompi_request_t *request = NULL;
|
||||
int err = ompi_coll_adapt_ireduce(sbuf, rbuf, count, dtype, op, root, comm, &request, module);
|
||||
if( MPI_SUCCESS != err ) {
|
||||
if( NULL == request )
|
||||
return err;
|
||||
}
|
||||
ompi_request_wait(&request, MPI_STATUS_IGNORE);
|
||||
return err;
|
||||
}
|
105
ompi/mca/coll/adapt/coll_adapt_topocache.c
Обычный файл
105
ompi/mca/coll/adapt/coll_adapt_topocache.c
Обычный файл
@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "coll_adapt.h"
|
||||
#include "coll_adapt_topocache.h"
|
||||
|
||||
#include "ompi/communicator/communicator.h"
|
||||
|
||||
static void destruct_topology_cache(adapt_topology_cache_item_t *item)
|
||||
{
|
||||
if (NULL != item->tree) {
|
||||
ompi_coll_base_topo_destroy_tree(&item->tree);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(adapt_topology_cache_item_t, opal_list_item_t,
|
||||
NULL, &destruct_topology_cache);
|
||||
|
||||
static ompi_coll_tree_t *create_topology(
|
||||
ompi_coll_adapt_algorithm_t algorithm,
|
||||
int root,
|
||||
struct ompi_communicator_t *comm)
|
||||
{
|
||||
switch(algorithm) {
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_TUNED:
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_BINOMIAL:
|
||||
{
|
||||
return ompi_coll_base_topo_build_bmtree(comm, root);
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_IN_ORDER_BINOMIAL:
|
||||
{
|
||||
return ompi_coll_base_topo_build_in_order_bmtree(comm, root);
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_BINARY:
|
||||
{
|
||||
return ompi_coll_base_topo_build_tree(2, comm, root);
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_PIPELINE:
|
||||
{
|
||||
return ompi_coll_base_topo_build_chain(1, comm, root);
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_CHAIN:
|
||||
{
|
||||
return ompi_coll_base_topo_build_chain(4, comm, root);
|
||||
}
|
||||
case OMPI_COLL_ADAPT_ALGORITHM_LINEAR:
|
||||
{
|
||||
int fanout = ompi_comm_size(comm) - 1;
|
||||
ompi_coll_tree_t *tree;
|
||||
if (fanout < 1) {
|
||||
tree = ompi_coll_base_topo_build_chain(1, comm, root);
|
||||
} else if (fanout <= MAXTREEFANOUT) {
|
||||
tree = ompi_coll_base_topo_build_tree(ompi_comm_size(comm) - 1, comm, root);
|
||||
} else {
|
||||
tree = ompi_coll_base_topo_build_tree(MAXTREEFANOUT, comm, root);
|
||||
}
|
||||
return tree;
|
||||
}
|
||||
default:
|
||||
printf("WARN: unknown topology %d\n", algorithm);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
ompi_coll_tree_t* adapt_module_cached_topology(
|
||||
mca_coll_base_module_t *module,
|
||||
struct ompi_communicator_t *comm,
|
||||
int root,
|
||||
ompi_coll_adapt_algorithm_t algorithm)
|
||||
{
|
||||
mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t*)module;
|
||||
adapt_topology_cache_item_t *item;
|
||||
ompi_coll_tree_t * tree;
|
||||
if (NULL != adapt_module->topo_cache) {
|
||||
OPAL_LIST_FOREACH(item, adapt_module->topo_cache, adapt_topology_cache_item_t) {
|
||||
if (item->root == root && item->algorithm == algorithm) {
|
||||
return item->tree;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
adapt_module->topo_cache = OBJ_NEW(opal_list_t);
|
||||
}
|
||||
|
||||
/* topology not found, create one */
|
||||
tree = create_topology(algorithm, root, comm);
|
||||
|
||||
item = OBJ_NEW(adapt_topology_cache_item_t);
|
||||
item->tree = tree;
|
||||
item->root = root;
|
||||
item->algorithm = algorithm;
|
||||
opal_list_prepend(adapt_module->topo_cache, &item->super);
|
||||
return tree;
|
||||
}
|
||||
|
35
ompi/mca/coll/adapt/coll_adapt_topocache.h
Обычный файл
35
ompi/mca/coll/adapt/coll_adapt_topocache.h
Обычный файл
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright (c) 2014-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef MCA_COLL_ADAPT_TOPOCACHE_H
|
||||
#define MCA_COLL_ADAPT_TOPOCACHE_H
|
||||
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "ompi/mca/coll/base/coll_base_topo.h"
|
||||
|
||||
typedef struct adapt_topology_cache_item_t {
|
||||
opal_list_item_t super;
|
||||
ompi_coll_tree_t *tree;
|
||||
int root;
|
||||
int algorithm;
|
||||
} adapt_topology_cache_item_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(adapt_topology_cache_item_t);
|
||||
|
||||
|
||||
OMPI_DECLSPEC ompi_coll_tree_t* adapt_module_cached_topology(
|
||||
mca_coll_base_module_t *module,
|
||||
struct ompi_communicator_t *comm,
|
||||
int root,
|
||||
ompi_coll_adapt_algorithm_t algorithm);
|
||||
|
||||
#endif /* MCA_COLL_ADAPT_TOPOCACHE_H */
|
@ -27,11 +27,17 @@
|
||||
#include "ompi/mca/mca.h"
|
||||
#include "ompi/datatype/ompi_datatype.h"
|
||||
#include "ompi/request/request.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/**
|
||||
* Request structure to be returned by non-blocking
|
||||
* collective operations.
|
||||
*/
|
||||
struct ompi_coll_base_nbc_request_t {
|
||||
ompi_request_t super;
|
||||
union {
|
||||
@ -60,6 +66,22 @@ struct ompi_coll_base_nbc_request_t {
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t);
|
||||
|
||||
static inline int32_t
|
||||
ompi_coll_base_nbc_reserve_tags(ompi_communicator_t* comm, int32_t reserve)
|
||||
{
|
||||
int32_t tag, old_tag;
|
||||
assert( reserve > 0 );
|
||||
reread_tag: /* In case we fail to atomically update the tag */
|
||||
tag = old_tag = comm->c_nbc_tag;
|
||||
if ((tag - reserve) < MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
}
|
||||
if( !OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_32(&comm->c_nbc_tag, &old_tag, tag - reserve) ) {
|
||||
goto reread_tag;
|
||||
}
|
||||
return tag;
|
||||
}
|
||||
|
||||
typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t;
|
||||
|
||||
/**
|
||||
@ -115,14 +137,29 @@ unsigned int ompi_mirror_perm(unsigned int x, int nbits);
|
||||
*/
|
||||
int ompi_rounddown(int num, int factor);
|
||||
|
||||
/**
|
||||
* If necessary, retain op and store it in the
|
||||
* request object, which should be of type ompi_coll_base_nbc_request_t
|
||||
* (will be cast internally).
|
||||
*/
|
||||
int ompi_coll_base_retain_op( ompi_request_t *request,
|
||||
ompi_op_t *op,
|
||||
ompi_datatype_t *type);
|
||||
|
||||
/**
|
||||
* If necessary, retain the datatypes and store them in the
|
||||
* request object, which should be of type ompi_coll_base_nbc_request_t
|
||||
* (will be cast internally).
|
||||
*/
|
||||
int ompi_coll_base_retain_datatypes( ompi_request_t *request,
|
||||
ompi_datatype_t *stype,
|
||||
ompi_datatype_t *rtype);
|
||||
|
||||
/**
|
||||
* If necessary, retain the datatypes and store them in the
|
||||
* request object, which should be of type ompi_coll_base_nbc_request_t
|
||||
* (will be cast internally).
|
||||
*/
|
||||
int ompi_coll_base_retain_datatypes_w( ompi_request_t *request,
|
||||
ompi_datatype_t *stypes[],
|
||||
ompi_datatype_t *rtypes[]);
|
||||
|
@ -94,7 +94,6 @@ struct ompi_coll_libnbc_module_t {
|
||||
mca_coll_base_module_t super;
|
||||
opal_mutex_t mutex;
|
||||
bool comm_registered;
|
||||
int tag;
|
||||
#ifdef NBC_CACHE_SCHEDULE
|
||||
void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct
|
||||
hb_tree, but since this is a
|
||||
|
@ -25,7 +25,7 @@
|
||||
* Additional copyrights may follow
|
||||
*/
|
||||
#include "nbc_internal.h"
|
||||
#include "ompi/mca/coll/base/coll_tags.h"
|
||||
#include "ompi/mca/coll/base/coll_base_util.h"
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
|
||||
@ -595,7 +595,6 @@ void NBC_Return_handle(ompi_coll_libnbc_request_t *request) {
|
||||
}
|
||||
|
||||
int NBC_Init_comm(MPI_Comm comm, NBC_Comminfo *comminfo) {
|
||||
comminfo->tag= MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
|
||||
#ifdef NBC_CACHE_SCHEDULE
|
||||
/* initialize the NBC_ALLTOALL SchedCache tree */
|
||||
@ -672,7 +671,7 @@ int NBC_Start(NBC_Handle *handle) {
|
||||
int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
ompi_coll_libnbc_module_t *module, bool persistent,
|
||||
ompi_request_t **request, void *tmpbuf) {
|
||||
int ret, tmp_tag;
|
||||
int ret;
|
||||
bool need_register = false;
|
||||
ompi_coll_libnbc_request_t *handle;
|
||||
|
||||
@ -685,13 +684,7 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
|
||||
/* update the module->tag here because other processes may have operations
|
||||
* and they may update the module->tag */
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
tmp_tag = module->tag--;
|
||||
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->mutex);
|
||||
(void)ompi_coll_base_nbc_reserve_tags(comm, 1);
|
||||
|
||||
OBJ_RELEASE(schedule);
|
||||
free(tmpbuf);
|
||||
@ -712,20 +705,15 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
|
||||
/******************** Do the tag and shadow comm administration ... ***************/
|
||||
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
tmp_tag = module->tag--;
|
||||
if (tmp_tag == MCA_COLL_BASE_TAG_NONBLOCKING_END) {
|
||||
tmp_tag = module->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
handle->tag = ompi_coll_base_nbc_reserve_tags(comm, 1);
|
||||
|
||||
OPAL_THREAD_LOCK(&module->mutex);
|
||||
if (true != module->comm_registered) {
|
||||
module->comm_registered = true;
|
||||
need_register = true;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->mutex);
|
||||
|
||||
handle->tag = tmp_tag;
|
||||
|
||||
/* register progress */
|
||||
if (need_register) {
|
||||
@ -737,7 +725,6 @@ int NBC_Schedule_request(NBC_Schedule *schedule, ompi_communicator_t *comm,
|
||||
}
|
||||
|
||||
handle->comm=comm;
|
||||
/*printf("got module: %lu tag: %i\n", module, module->tag);*/
|
||||
|
||||
/******************** end of tag and shadow comm administration ... ***************/
|
||||
handle->comminfo = module;
|
||||
|
@ -215,17 +215,8 @@ static int tuned_open(void)
|
||||
int rc;
|
||||
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
{
|
||||
int param;
|
||||
|
||||
param = mca_base_var_find("ompi", "coll", "base", "verbose");
|
||||
if (param >= 0) {
|
||||
const int *verbose = NULL;
|
||||
mca_base_var_get_value(param, &verbose, NULL, NULL);
|
||||
if (verbose && verbose[0] > 0) {
|
||||
ompi_coll_tuned_stream = opal_output_open(NULL);
|
||||
}
|
||||
}
|
||||
if (ompi_coll_base_framework.framework_verbose) {
|
||||
ompi_coll_tuned_stream = opal_output_open(NULL);
|
||||
}
|
||||
#endif /* OPAL_ENABLE_DEBUG */
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
* Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved.
|
||||
* Copyright (c) 2015-2019 Research Organization for Information Science
|
||||
* and Technology (RIST). All rights reserved.
|
||||
* Copyright (c) 2017-2018 The University of Tennessee and The University
|
||||
* Copyright (c) 2017-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* $COPYRIGHT$
|
||||
@ -80,6 +80,14 @@ int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype,
|
||||
}
|
||||
}
|
||||
|
||||
/* If there's only one node, or if the count is 0, we're done */
|
||||
|
||||
if ((OMPI_COMM_IS_INTRA(comm) && ompi_comm_size(comm) <= 1) ||
|
||||
0 == count) {
|
||||
*request = &ompi_request_empty;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_CR_ENTER_LIBRARY();
|
||||
|
||||
/* Invoke the coll component to perform the back-end operation */
|
||||
|
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2018 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
|
||||
@ -97,6 +97,11 @@ int MPI_Ireduce_scatter_block(const void *sendbuf, void *recvbuf, int recvcount,
|
||||
OMPI_ERRHANDLER_CHECK(err, comm, err, FUNC_NAME);
|
||||
}
|
||||
|
||||
if (0 == recvcount) {
|
||||
*request = &ompi_request_empty;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_CR_ENTER_LIBRARY();
|
||||
|
||||
/* Invoke the coll component to perform the back-end operation */
|
||||
|
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2017 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
|
||||
@ -94,6 +94,9 @@ int MPI_Reduce_scatter_block(const void *sendbuf, void *recvbuf, int recvcount,
|
||||
OMPI_CHECK_DATATYPE_FOR_SEND(err, datatype, recvcount);
|
||||
OMPI_ERRHANDLER_CHECK(err, comm, err, FUNC_NAME);
|
||||
}
|
||||
if (0 == recvcount) {
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_CR_ENTER_LIBRARY();
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2016 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2020 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
@ -434,9 +434,11 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa
|
||||
{
|
||||
int rc = 0;
|
||||
|
||||
if( NULL != request->req_complete_cb) {
|
||||
rc = request->req_complete_cb( request );
|
||||
if(NULL != request->req_complete_cb) {
|
||||
/* Set the request cb to NULL to allow resetting in the callback */
|
||||
ompi_request_complete_fn_t fct = request->req_complete_cb;
|
||||
request->req_complete_cb = NULL;
|
||||
rc = fct( request );
|
||||
}
|
||||
|
||||
if (0 == rc) {
|
||||
@ -457,6 +459,21 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int ompi_request_set_callback(ompi_request_t* request,
|
||||
ompi_request_complete_fn_t cb,
|
||||
void* cb_data)
|
||||
{
|
||||
request->req_complete_cb_data = cb_data;
|
||||
request->req_complete_cb = cb;
|
||||
/* If request is completed and the callback is not called, need to call callback */
|
||||
if ((NULL != request->req_complete_cb) && (request->req_complete == REQUEST_COMPLETED)) {
|
||||
ompi_request_complete_fn_t fct = request->req_complete_cb;
|
||||
request->req_complete_cb = NULL;
|
||||
return fct( request );
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user