From d57ae62dee7cfaf6843a2a33574f299cbc719f36 Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Fri, 22 Jun 2018 15:03:01 +0300 Subject: [PATCH 1/3] MCA/UCX: added common module - implemented non-blocking routines for flush operations Signed-off-by: Sergey Oblomov --- VERSION | 1 + configure.ac | 1 + ompi/mca/osc/ucx/Makefile.am | 4 +- ompi/mca/osc/ucx/osc_ucx_active_target.c | 7 +- ompi/mca/osc/ucx/osc_ucx_comm.c | 15 ++-- ompi/mca/osc/ucx/osc_ucx_passive_target.c | 9 ++- ompi/mca/pml/ucx/Makefile.am | 4 +- opal/mca/common/ucx/Makefile.am | 92 +++++++++++++++++++++++ opal/mca/common/ucx/common_ucx.c | 34 +++++++++ opal/mca/common/ucx/common_ucx.h | 60 +++++++++++++++ opal/mca/common/ucx/configure.m4 | 33 ++++++++ opal/mca/common/ucx/owner.txt | 8 ++ oshmem/mca/atomic/ucx/atomic_ucx.h | 24 +----- oshmem/mca/spml/ucx/Makefile.am | 3 +- oshmem/mca/spml/ucx/spml_ucx.c | 3 +- 15 files changed, 255 insertions(+), 43 deletions(-) create mode 100644 opal/mca/common/ucx/Makefile.am create mode 100644 opal/mca/common/ucx/common_ucx.c create mode 100644 opal/mca/common/ucx/common_ucx.h create mode 100644 opal/mca/common/ucx/configure.m4 create mode 100644 opal/mca/common/ucx/owner.txt diff --git a/VERSION b/VERSION index f92b41d35e..6fadf03012 100644 --- a/VERSION +++ b/VERSION @@ -112,5 +112,6 @@ libmca_orte_common_alps_so_version=0:0:0 libmca_opal_common_cuda_so_version=0:0:0 libmca_opal_common_ofi_so_version=0:0:0 libmca_opal_common_sm_so_version=0:0:0 +libmca_opal_common_ucx_so_version=0:0:0 libmca_opal_common_ugni_so_version=0:0:0 libmca_opal_common_verbs_so_version=0:0:0 diff --git a/configure.ac b/configure.ac index 7984974665..92d661c305 100644 --- a/configure.ac +++ b/configure.ac @@ -162,6 +162,7 @@ AC_SUBST(libmca_opal_common_verbs_so_version) AC_SUBST(libmca_orte_common_alps_so_version) AC_SUBST(libmca_ompi_common_ompio_so_version) AC_SUBST(libmca_ompi_common_monitoring_so_version) +AC_SUBST(libmca_opal_common_ucx_so_version) # # Get the versions of the autotools that were used to bootstrap us diff --git a/ompi/mca/osc/ucx/Makefile.am b/ompi/mca/osc/ucx/Makefile.am index e301686c3b..a7dae148e8 100644 --- a/ompi/mca/osc/ucx/Makefile.am +++ b/ompi/mca/osc/ucx/Makefile.am @@ -34,8 +34,8 @@ endif mcacomponentdir = $(pkglibdir) mcacomponent_LTLIBRARIES = $(component_install) mca_osc_ucx_la_SOURCES = $(ucx_sources) -mca_osc_ucx_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ - $(osc_ucx_LIBS) +mca_osc_ucx_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la $(osc_ucx_LIBS) \ + $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la mca_osc_ucx_la_LDFLAGS = -module -avoid-version $(osc_ucx_LDFLAGS) noinst_LTLIBRARIES = $(component_noinst) diff --git a/ompi/mca/osc/ucx/osc_ucx_active_target.c b/ompi/mca/osc/ucx/osc_ucx_active_target.c index 50eebdb19f..348d1cf701 100644 --- a/ompi/mca/osc/ucx/osc_ucx_active_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_active_target.c @@ -28,6 +28,7 @@ #include "ompi/mca/osc/osc.h" #include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "osc_ucx.h" @@ -73,7 +74,7 @@ int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) { } if (!(assert & MPI_MODE_NOPRECEDE)) { - status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_worker_flush failed: %d\n", @@ -175,7 +176,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) { module->epoch_type.access = NONE_EPOCH; - status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_worker_flush failed: %d\n", @@ -200,7 +201,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) { __FILE__, __LINE__, status); } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 22f4ce1e94..35a6af55b8 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -12,6 +12,7 @@ #include "ompi/mca/osc/osc.h" #include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "osc_ucx.h" #include "osc_ucx_request.h" @@ -65,9 +66,7 @@ static inline int incr_and_check_ops_num(ompi_osc_ucx_module_t *module, int targ module->global_ops_num++; module->per_target_ops_nums[target]++; if (module->global_ops_num >= OSC_UCX_OPS_THRESHOLD) { - /* TODO: ucp_ep_flush needs to be replaced with its non-blocking counterpart - * when it is implemented in UCX */ - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -348,7 +347,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module return OMPI_ERROR; } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -550,7 +549,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, return ret; } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -607,7 +606,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, return ret; } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -801,7 +800,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, return ret; } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -857,7 +856,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, return ret; } - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", diff --git a/ompi/mca/osc/ucx/osc_ucx_passive_target.c b/ompi/mca/osc/ucx/osc_ucx_passive_target.c index 9f2fe98b63..432b2b7edf 100644 --- a/ompi/mca/osc/ucx/osc_ucx_passive_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_passive_target.c @@ -12,6 +12,7 @@ #include "ompi/mca/osc/osc.h" #include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "osc_ucx.h" @@ -179,7 +180,7 @@ int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) { (uint32_t)target); ep = OSC_UCX_GET_EP(module->comm, target); - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -258,7 +259,7 @@ int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) { assert(module->lock_count == 0); - status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_worker_flush failed: %d\n", @@ -314,7 +315,7 @@ int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) { } ep = OSC_UCX_GET_EP(module->comm, target); - status = ucp_ep_flush(ep); + status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_ep_flush failed: %d\n", @@ -337,7 +338,7 @@ int ompi_osc_ucx_flush_all(struct ompi_win_t *win) { return OMPI_ERR_RMA_SYNC; } - status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_worker_flush failed: %d\n", diff --git a/ompi/mca/pml/ucx/Makefile.am b/ompi/mca/pml/ucx/Makefile.am index 54e590438e..6b8355031f 100644 --- a/ompi/mca/pml/ucx/Makefile.am +++ b/ompi/mca/pml/ucx/Makefile.am @@ -37,8 +37,8 @@ endif mcacomponentdir = $(ompilibdir) mcacomponent_LTLIBRARIES = $(component_install) mca_pml_ucx_la_SOURCES = $(local_sources) -mca_pml_ucx_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ - $(pml_ucx_LIBS) +mca_pml_ucx_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la $(pml_ucx_LIBS) \ + $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la mca_pml_ucx_la_LDFLAGS = -module -avoid-version $(pml_ucx_LDFLAGS) noinst_LTLIBRARIES = $(component_noinst) diff --git a/opal/mca/common/ucx/Makefile.am b/opal/mca/common/ucx/Makefile.am new file mode 100644 index 0000000000..9ac8222743 --- /dev/null +++ b/opal/mca/common/ucx/Makefile.am @@ -0,0 +1,92 @@ +# +# Copyright (c) 2018 Mellanox Technologies. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Note that building this common component statically and linking +# against other dynamic components is *not* supported! + +# Header files + +headers = \ + common_ucx.h + +# Source files + +sources = \ + common_ucx.c + +# Help file + +# As per above, we'll either have an installable or noinst result. +# The installable one should follow the same MCA prefix naming rules +# (i.e., libmca__.la). The noinst one can be named +# whatever it wants, although libmca___noinst.la is +# recommended. + +# To simplify components that link to this library, we will *always* +# have an output libtool library named libmca__.la -- even +# for case 2) described above (i.e., so there's no conditional logic +# necessary in component Makefile.am's that link to this library). +# Hence, if we're creating a noinst version of this library (i.e., +# case 2), we sym link it to the libmca__.la name +# (libtool will do the Right Things under the covers). See the +# all-local and clean-local rules, below, for how this is effected. + +lib_LTLIBRARIES = +noinst_LTLIBRARIES = +comp_inst = lib@OPAL_LIB_PREFIX@mca_common_ucx.la +comp_noinst = lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst.la + +if MCA_BUILD_opal_common_ucx_DSO +lib_LTLIBRARIES += $(comp_inst) +else +noinst_LTLIBRARIES += $(comp_noinst) +endif + +lib@OPAL_LIB_PREFIX@mca_common_ucx_la_SOURCES = \ + $(headers) $(sources) +lib@OPAL_LIB_PREFIX@mca_common_ucx_la_LDFLAGS = \ + -version-info $(libmca_opal_common_ucx_so_version) \ + $(common_ucx_LDFLAGS) +lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LDFLAGS = \ + $(common_ucx_LDFLAGS) +lib@OPAL_LIB_PREFIX@mca_common_ucx_la_LIBADD = \ + $(common_ucx_LIBS) \ + $(OMPI_TOP_BUILDDIR)/opal/lib@OPAL_LIB_PREFIX@open-pal.la +lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LIBADD = \ + $(common_ucx_LIBS) +lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_SOURCES = \ + $(headers) $(sources) + +# Conditionally install the header files + +if WANT_INSTALL_HEADERS +opaldir = $(opalincludedir)/$(subdir) +opal_HEADERS = $(headers) +endif + +# These two rules will sym link the "noinst" libtool library filename +# to the installable libtool library filename in the case where we are +# compiling this component statically (case 2), described above). + +# See Makefile.ompi-rules for an explanation of the "V" macros, below +V=0 +OMPI_V_LN_SCOMP = $(ompi__v_LN_SCOMP_$V) +ompi__v_LN_SCOMP_ = $(ompi__v_LN_SCOMP_$AM_DEFAULT_VERBOSITY) +ompi__v_LN_SCOMP_0 = @echo " LN_S " `basename $(comp_inst)`; + +all-local: + $(OMPI_V_LN_SCOMP) if test -z "$(lib_LTLIBRARIES)"; then \ + rm -f "$(comp_inst)"; \ + $(LN_S) "$(comp_noinst)" "$(comp_inst)"; \ + fi + +clean-local: + if test -z "$(lib_LTLIBRARIES)"; then \ + rm -f "$(comp_inst)"; \ + fi diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c new file mode 100644 index 0000000000..77dc9211e5 --- /dev/null +++ b/opal/mca/common/ucx/common_ucx.c @@ -0,0 +1,34 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2018. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "opal_config.h" + +#include "common_ucx.h" + +/***********************************************************************/ + +static void opal_common_ucp_send_cb(void *request, ucs_status_t status) +{ +} + +ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker) +{ + ucs_status_ptr_t status; + + status = ucp_ep_flush_nb(ep, 0, opal_common_ucp_send_cb); + return opal_common_ucx_wait_request(status, worker); +} + +ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker) +{ + ucs_status_ptr_t status; + + status = ucp_worker_flush_nb(worker, 0, opal_common_ucp_send_cb); + return opal_common_ucx_wait_request(status, worker); +} diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h new file mode 100644 index 0000000000..08ffb09ac0 --- /dev/null +++ b/opal/mca/common/ucx/common_ucx.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2018 Mellanox Technologies. All rights reserved. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _COMMON_UCX_H_ +#define _COMMON_UCX_H_ + +#include "opal_config.h" + +#include + +#include + +#include "opal/mca/mca.h" +#include "opal/runtime/opal_progress.h" +#include "opal/class/opal_list.h" + +BEGIN_C_DECLS + +OPAL_DECLSPEC ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker); +OPAL_DECLSPEC ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker); + +static inline +ucs_status_t opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker) +{ + ucs_status_t status; + int i; + + /* check for request completed or failed */ + if (UCS_OK == request) { + return UCS_OK; + } else if (UCS_PTR_IS_ERR(request)) { + return UCS_PTR_STATUS(request); + } + + while (1) { + /* call UCX progress */ + for (i = 0; i < 100; i++) { + if (UCS_INPROGRESS != (status = ucp_request_check_status(request))) { + ucp_request_free(request); + return status; + } + ucp_worker_progress(worker); + } + /* call OPAL progress on every 100 call to UCX progress */ + opal_progress(); + } +} + +END_C_DECLS + +#endif + + diff --git a/opal/mca/common/ucx/configure.m4 b/opal/mca/common/ucx/configure.m4 new file mode 100644 index 0000000000..6d79f04275 --- /dev/null +++ b/opal/mca/common/ucx/configure.m4 @@ -0,0 +1,33 @@ +# -*- shell-script -*- +# +# Copyright (c) 2018 Mellanox Technologies. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_opal_common_sm_CONFIG([action-if-can-compile], +# [action-if-cant-compile]) +# ------------------------------------------------ +AC_DEFUN([MCA_opal_common_ucx_CONFIG],[ + AC_CONFIG_FILES([opal/mca/common/ucx/Makefile]) + common_ucx_happy="no" + OMPI_CHECK_UCX([common_ucx], + [common_ucx_happy="yes"], + [common_ucx_happy="no"]) + + AS_IF([test "$common_ucx_happy" = "yes"], + [$1], + [$2]) + + + # substitute in the things needed to build openib + AC_SUBST([common_ucx_CFLAGS]) + AC_SUBST([common_ucx_CPPFLAGS]) + AC_SUBST([common_ucx_LDFLAGS]) + AC_SUBST([common_ucx_LIBS]) +])dnl + + diff --git a/opal/mca/common/ucx/owner.txt b/opal/mca/common/ucx/owner.txt new file mode 100644 index 0000000000..b0b06c9ab5 --- /dev/null +++ b/opal/mca/common/ucx/owner.txt @@ -0,0 +1,8 @@ +# +# owner/status file +# owner: institution that is responsible for this package +# status: e.g. active, maintenance, unmaintained +# +owner: MELLANOX +status: maintenance + diff --git a/oshmem/mca/atomic/ucx/atomic_ucx.h b/oshmem/mca/atomic/ucx/atomic_ucx.h index 3b5ba03b73..3fda14c94e 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx.h +++ b/oshmem/mca/atomic/ucx/atomic_ucx.h @@ -13,6 +13,7 @@ #include "oshmem_config.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "opal/mca/mca.h" #include "oshmem/mca/atomic/atomic.h" #include "oshmem/util/oshmem_util.h" @@ -66,28 +67,7 @@ void mca_atomic_ucx_complete_cb(void *request, ucs_status_t status); static inline ucs_status_t mca_atomic_ucx_wait_request(ucs_status_ptr_t request) { - ucs_status_t status; - int i; - - /* check for request completed or failed */ - if (UCS_OK == request) { - return UCS_OK; - } else if (UCS_PTR_IS_ERR(request)) { - return UCS_PTR_STATUS(request); - } - - while (1) { - /* call UCX progress */ - for (i = 0; i < 100; i++) { - if (UCS_INPROGRESS != (status = ucp_request_check_status(request))) { - ucp_request_free(request); - return status; - } - ucp_worker_progress(mca_spml_self->ucp_worker); - } - /* call OPAL progress on every 100 call to UCX progress */ - opal_progress(); - } + return opal_common_ucx_wait_request(request, mca_spml_self->ucp_worker); } END_C_DECLS diff --git a/oshmem/mca/spml/ucx/Makefile.am b/oshmem/mca/spml/ucx/Makefile.am index 84d8a74925..94a48a58e7 100644 --- a/oshmem/mca/spml/ucx/Makefile.am +++ b/oshmem/mca/spml/ucx/Makefile.am @@ -34,7 +34,8 @@ mcacomponentdir = $(ompilibdir) mcacomponent_LTLIBRARIES = $(component_install) mca_spml_ucx_la_SOURCES = $(ucx_sources) mca_spml_ucx_la_LIBADD = $(top_builddir)/oshmem/liboshmem.la \ - $(spml_ucx_LIBS) + $(spml_ucx_LIBS) \ + $(OPAL_TOP_BUILDDIR)/opal/mca/common/ucx/lib@OPAL_LIB_PREFIX@mca_common_ucx.la mca_spml_ucx_la_LDFLAGS = -module -avoid-version $(spml_ucx_LDFLAGS) noinst_LTLIBRARIES = $(component_noinst) diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 463b0f0a00..7c5fcb119d 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -20,6 +20,7 @@ #include "oshmem_config.h" #include "opal/datatype/opal_convertor.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "orte/include/orte/types.h" #include "orte/runtime/orte_globals.h" #include "ompi/datatype/ompi_datatype.h" @@ -611,7 +612,7 @@ int mca_spml_ucx_quiet(void) { ucs_status_t err; - err = ucp_worker_flush(mca_spml_ucx.ucp_worker); + err = opal_common_ucx_worker_flush(mca_spml_ucx.ucp_worker); if (UCS_OK != err) { SPML_ERROR("quiet failed: %s", ucs_status_string(err)); oshmem_shmem_abort(-1); From 63e7ba6843afd921ce2cc6b69eb17b201b203edd Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Mon, 25 Jun 2018 11:00:12 +0300 Subject: [PATCH 2/3] MCA/COMMON/UCX: added parameter for UCX/opal progress - added parameter to set UCX/opal progresses - minor refactoring of request wait routines Signed-off-by: Sergey Oblomov --- ompi/mca/osc/ucx/osc_ucx_component.c | 2 ++ opal/mca/common/ucx/Makefile.am | 8 +++--- opal/mca/common/ucx/common_ucx.c | 32 +++++++++++---------- opal/mca/common/ucx/common_ucx.h | 34 +++++++++++++++++------ oshmem/mca/atomic/ucx/atomic_ucx.h | 8 ------ oshmem/mca/atomic/ucx/atomic_ucx_cswap.c | 10 ++++--- oshmem/mca/atomic/ucx/atomic_ucx_fadd.c | 5 ++-- oshmem/mca/atomic/ucx/atomic_ucx_module.c | 5 ---- oshmem/mca/spml/ucx/spml_ucx.c | 1 + 9 files changed, 60 insertions(+), 45 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c index 0c518e371e..8b4123758a 100644 --- a/ompi/mca/osc/ucx/osc_ucx_component.c +++ b/ompi/mca/osc/ucx/osc_ucx_component.c @@ -12,6 +12,7 @@ #include "ompi/mca/osc/osc.h" #include "ompi/mca/osc/base/base.h" #include "ompi/mca/osc/base/osc_base_obj_convert.h" +#include "opal/mca/common/ucx/common_ucx.h" #include "osc_ucx.h" #include "osc_ucx_request.h" @@ -179,6 +180,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads) goto error; } + opal_common_ucx_mca_register(); return ret; error: if (progress_registered) opal_progress_unregister(progress_callback); diff --git a/opal/mca/common/ucx/Makefile.am b/opal/mca/common/ucx/Makefile.am index 9ac8222743..1f24f3e8b0 100644 --- a/opal/mca/common/ucx/Makefile.am +++ b/opal/mca/common/ucx/Makefile.am @@ -53,15 +53,15 @@ lib@OPAL_LIB_PREFIX@mca_common_ucx_la_SOURCES = \ lib@OPAL_LIB_PREFIX@mca_common_ucx_la_LDFLAGS = \ -version-info $(libmca_opal_common_ucx_so_version) \ $(common_ucx_LDFLAGS) -lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LDFLAGS = \ - $(common_ucx_LDFLAGS) lib@OPAL_LIB_PREFIX@mca_common_ucx_la_LIBADD = \ $(common_ucx_LIBS) \ $(OMPI_TOP_BUILDDIR)/opal/lib@OPAL_LIB_PREFIX@open-pal.la -lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LIBADD = \ - $(common_ucx_LIBS) lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_SOURCES = \ $(headers) $(sources) +lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LDFLAGS = \ + $(common_ucx_LDFLAGS) +lib@OPAL_LIB_PREFIX@mca_common_ucx_noinst_la_LIBADD = \ + $(common_ucx_LIBS) # Conditionally install the header files diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c index 77dc9211e5..5d30fbe47e 100644 --- a/opal/mca/common/ucx/common_ucx.c +++ b/opal/mca/common/ucx/common_ucx.c @@ -10,25 +10,29 @@ #include "opal_config.h" #include "common_ucx.h" +#include "opal/mca/base/mca_base_var.h" /***********************************************************************/ -static void opal_common_ucp_send_cb(void *request, ucs_status_t status) +int opal_common_ucx_progress_iterations = 100; + +OPAL_DECLSPEC void opal_common_ucx_mca_register(void) { + static int registered = 0; + + if (registered) { + /* process once */ + return; + } + + registered = 1; + mca_base_var_register("opal", "opal_common", "ucx", "progress_iterations", + "Set number of calls of internal UCX progress calls per opal_progress call", + MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, + &opal_common_ucx_progress_iterations); } -ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker) +void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status) { - ucs_status_ptr_t status; - - status = ucp_ep_flush_nb(ep, 0, opal_common_ucp_send_cb); - return opal_common_ucx_wait_request(status, worker); -} - -ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker) -{ - ucs_status_ptr_t status; - - status = ucp_worker_flush_nb(worker, 0, opal_common_ucp_send_cb); - return opal_common_ucx_wait_request(status, worker); } diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 08ffb09ac0..d1bbcbb8c9 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -23,8 +23,10 @@ BEGIN_C_DECLS -OPAL_DECLSPEC ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker); -OPAL_DECLSPEC ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker); +extern int opal_common_ucx_progress_iterations; + +OPAL_DECLSPEC void opal_common_ucx_mca_register(void); +OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status); static inline ucs_status_t opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker) @@ -33,28 +35,44 @@ ucs_status_t opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h int i; /* check for request completed or failed */ - if (UCS_OK == request) { + if (OPAL_LIKELY(UCS_OK == request)) { return UCS_OK; - } else if (UCS_PTR_IS_ERR(request)) { + } else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) { return UCS_PTR_STATUS(request); } while (1) { /* call UCX progress */ - for (i = 0; i < 100; i++) { + for (i = 0; i < opal_common_ucx_progress_iterations; i++) { if (UCS_INPROGRESS != (status = ucp_request_check_status(request))) { ucp_request_free(request); return status; } ucp_worker_progress(worker); } - /* call OPAL progress on every 100 call to UCX progress */ + /* call OPAL progress on every opal_common_ucx_progress_iterations + * calls to UCX progress */ opal_progress(); } } +static inline +ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker) +{ + ucs_status_ptr_t status; + + status = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb); + return opal_common_ucx_wait_request(status, worker); +} + +static inline +ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker) +{ + ucs_status_ptr_t status; + + status = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb); + return opal_common_ucx_wait_request(status, worker); +} END_C_DECLS #endif - - diff --git a/oshmem/mca/atomic/ucx/atomic_ucx.h b/oshmem/mca/atomic/ucx/atomic_ucx.h index 3fda14c94e..ff8281515e 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx.h +++ b/oshmem/mca/atomic/ucx/atomic_ucx.h @@ -61,14 +61,6 @@ struct mca_atomic_ucx_module_t { typedef struct mca_atomic_ucx_module_t mca_atomic_ucx_module_t; OBJ_CLASS_DECLARATION(mca_atomic_ucx_module_t); - -void mca_atomic_ucx_complete_cb(void *request, ucs_status_t status); - -static inline -ucs_status_t mca_atomic_ucx_wait_request(ucs_status_ptr_t request) -{ - return opal_common_ucx_wait_request(request, mca_spml_self->ucp_worker); -} END_C_DECLS #endif /* MCA_ATOMIC_UCX_H */ diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c index 57723cf0ae..f6740f3897 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c @@ -41,15 +41,17 @@ int mca_atomic_ucx_cswap_inner(void *target, if (NULL == cond) { status_ptr = ucp_atomic_fetch_nb(mca_spml_self->ucp_peers[pe].ucp_conn, UCP_ATOMIC_FETCH_OP_SWAP, val, prev, nlong, - rva, ucx_mkey->rkey, mca_atomic_ucx_complete_cb); - status = mca_atomic_ucx_wait_request(status_ptr); + rva, ucx_mkey->rkey, + opal_common_ucx_empty_complete_cb); + status = opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker); } else { cmp = (4 == nlong) ? *(uint32_t*)cond : *(uint64_t*)cond; status_ptr = ucp_atomic_fetch_nb(mca_spml_self->ucp_peers[pe].ucp_conn, UCP_ATOMIC_FETCH_OP_CSWAP, cmp, &val, nlong, - rva, ucx_mkey->rkey, mca_atomic_ucx_complete_cb); - status = mca_atomic_ucx_wait_request(status_ptr); + rva, ucx_mkey->rkey, + opal_common_ucx_empty_complete_cb); + status = opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker); if (UCS_OK == status) { assert(NULL != prev); memcpy(prev, &val, nlong); diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c index b8639d2d00..f92c53ed22 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c @@ -49,8 +49,9 @@ int mca_atomic_ucx_fadd(void *target, else { status_ptr = ucp_atomic_fetch_nb(mca_spml_self->ucp_peers[pe].ucp_conn, UCP_ATOMIC_FETCH_OP_FADD, val, prev, nlong, - rva, ucx_mkey->rkey, mca_atomic_ucx_complete_cb); - status = mca_atomic_ucx_wait_request(status_ptr); + rva, ucx_mkey->rkey, + opal_common_ucx_empty_complete_cb); + status = opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker); } return ucx_status_to_oshmem(status); diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_module.c b/oshmem/mca/atomic/ucx/atomic_ucx_module.c index a59783d186..753e77bdda 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_module.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_module.c @@ -48,8 +48,3 @@ mca_atomic_ucx_query(int *priority) return NULL ; } - -void mca_atomic_ucx_complete_cb(void *request, ucs_status_t status) -{ -} - diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 7c5fcb119d..09032ff95f 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -506,6 +506,7 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr, mkeys[0].va_base = addr; *count = 1; mca_spml_ucx_cache_mkey(&mkeys[0], segno, my_pe); + opal_common_ucx_mca_register(); return mkeys; error_unmap: From bf7fd480e9556ed79ba64a45ab373eaeec16ec66 Mon Sep 17 00:00:00 2001 From: Sergey Oblomov Date: Mon, 25 Jun 2018 12:25:31 +0300 Subject: [PATCH 3/3] MCA/COMMON/UCX: added non-blocking implementations of atomics - added implementation of swap/cswap/fadd operations - blocking add64 is replaced by non-blocking routine Signed-off-by: Sergey Oblomov --- ompi/mca/osc/ucx/osc_ucx_active_target.c | 9 ++++-- ompi/mca/osc/ucx/osc_ucx_comm.c | 12 ++++--- ompi/mca/osc/ucx/osc_ucx_passive_target.c | 22 ++++++++----- opal/mca/common/ucx/common_ucx.c | 8 ++--- opal/mca/common/ucx/common_ucx.h | 38 +++++++++++++++++++++++ 5 files changed, 69 insertions(+), 20 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_active_target.c b/ompi/mca/osc/ucx/osc_ucx_active_target.c index 348d1cf701..49c72b4a50 100644 --- a/ompi/mca/osc/ucx/osc_ucx_active_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_active_target.c @@ -260,7 +260,9 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t uint64_t curr_idx = 0, result = 0; /* do fop first to get an post index */ - status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result); + status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1, + &result, sizeof(result), + remote_addr, rkey, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_fadd64 failed: %d\n", @@ -273,8 +275,9 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t /* do cas to send post message */ do { - status = ucp_atomic_cswap64(ep, 0, (uint64_t)myrank + 1, - remote_addr, rkey, &result); + status = opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result, + sizeof(result), remote_addr, rkey, + mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_cswap64 failed: %d\n", diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 35a6af55b8..99fb8af44d 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -290,9 +290,10 @@ static inline int start_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, in ucs_status_t status; while (result_value != TARGET_LOCK_UNLOCKED) { - status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED, - TARGET_LOCK_EXCLUSIVE, - remote_addr, rkey, &result_value); + status = opal_common_ucx_atomic_cswap(ep, TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE, + &result_value, sizeof(result_value), + remote_addr, rkey, + mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_cswap64 failed: %d\n", @@ -310,8 +311,9 @@ static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET; ucs_status_t status; - status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED, - remote_addr, rkey, &result_value); + status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, + &result_value, sizeof(result_value), + remote_addr, rkey, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_swap64 failed: %d\n", diff --git a/ompi/mca/osc/ucx/osc_ucx_passive_target.c b/ompi/mca/osc/ucx/osc_ucx_passive_target.c index 432b2b7edf..e0781da81c 100644 --- a/ompi/mca/osc/ucx/osc_ucx_passive_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_passive_target.c @@ -26,7 +26,9 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) { ucs_status_t status; while (true) { - status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result_value); + status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1, + &result_value, sizeof(result_value), + remote_addr, rkey, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_fadd64 failed: %d\n", @@ -35,7 +37,8 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) { } assert(result_value >= 0); if (result_value >= TARGET_LOCK_EXCLUSIVE) { - status = ucp_atomic_add64(ep, (-1), remote_addr, rkey); + status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, (-1), sizeof(uint64_t), + remote_addr, rkey); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_add64 failed: %d\n", @@ -56,7 +59,8 @@ static inline int end_shared(ompi_osc_ucx_module_t *module, int target) { uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; ucs_status_t status; - status = ucp_atomic_add64(ep, (-1), remote_addr, rkey); + status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, (-1), sizeof(uint64_t), + remote_addr, rkey); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_add64 failed: %d\n", @@ -75,9 +79,10 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) { ucs_status_t status; while (result_value != TARGET_LOCK_UNLOCKED) { - status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED, - TARGET_LOCK_EXCLUSIVE, - remote_addr, rkey, &result_value); + status = opal_common_ucx_atomic_cswap(ep, TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE, + &result_value, sizeof(result_value), + remote_addr, rkey, + mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_cswap64 failed: %d\n", @@ -96,8 +101,9 @@ static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) { uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; ucs_status_t status; - status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED, - remote_addr, rkey, &result_value); + status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, + &result_value, sizeof(result_value), + remote_addr, rkey, mca_osc_ucx_component.ucp_worker); if (status != UCS_OK) { opal_output_verbose(1, ompi_osc_base_framework.framework_output, "%s:%d: ucp_atomic_swap64 failed: %d\n", diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c index 5d30fbe47e..85b96a92cd 100644 --- a/opal/mca/common/ucx/common_ucx.c +++ b/opal/mca/common/ucx/common_ucx.c @@ -27,10 +27,10 @@ OPAL_DECLSPEC void opal_common_ucx_mca_register(void) registered = 1; mca_base_var_register("opal", "opal_common", "ucx", "progress_iterations", - "Set number of calls of internal UCX progress calls per opal_progress call", - MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, - OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, - &opal_common_ucx_progress_iterations); + "Set number of calls of internal UCX progress calls per opal_progress call", + MCA_BASE_VAR_TYPE_INT, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, + &opal_common_ucx_progress_iterations); } void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status) diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index d1bbcbb8c9..3a2d208dc8 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -73,6 +73,44 @@ ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker) status = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb); return opal_common_ucx_wait_request(status, worker); } + +static inline +ucs_status_t opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode, + uint64_t value, void *result, size_t op_size, + uint64_t remote_addr, ucp_rkey_h rkey, + ucp_worker_h worker) +{ + ucs_status_ptr_t request; + + request = ucp_atomic_fetch_nb(ep, opcode, value, result, op_size, + remote_addr, rkey, opal_common_ucx_empty_complete_cb); + return opal_common_ucx_wait_request(request, worker); +} + +static inline +ucs_status_t opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare, + uint64_t value, void *result, size_t op_size, + uint64_t remote_addr, ucp_rkey_h rkey, + ucp_worker_h worker) +{ + uint64_t tmp = value; + ucs_status_t status; + + status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp, + op_size, remote_addr, rkey, worker); + if (OPAL_LIKELY(UCS_OK == status)) { + /* in case if op_size is constant (like sizeof(type)) then this condition + * is evaluated in compile time */ + if (op_size == sizeof(uint64_t)) { + *(uint64_t*)result = tmp; + } else { + assert(op_size == sizeof(uint32_t)); + *(uint32_t*)result = tmp; + } + } + return status; +} + END_C_DECLS #endif