diff --git a/config/ompi_check_ucx.m4 b/config/ompi_check_ucx.m4 new file mode 100644 index 0000000000..3a7b8035cd --- /dev/null +++ b/config/ompi_check_ucx.m4 @@ -0,0 +1,80 @@ +# -*- shell-script -*- +# +# Copyright (C) Mellanox Technologies Ltd. 2015. ALL RIGHTS RESERVED. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_CHECK_UCX(prefix, [action-if-found], [action-if-not-found]) +# -------------------------------------------------------- +# check if UCX support can be found. sets prefix_{CPPFLAGS, +# LDFLAGS, LIBS} as needed and runs action-if-found if there is +# support, otherwise executes action-if-not-found +AC_DEFUN([OMPI_CHECK_UCX],[ + AC_ARG_WITH([ucx], + [AC_HELP_STRING([--with-ucx(=DIR)], + [Build with Unified Communication X library support])]) + OPAL_CHECK_WITHDIR([ucx], [$with_ucx], [include/ucp/api/ucp.h]) + AC_ARG_WITH([ucx-libdir], + [AC_HELP_STRING([--with-ucx-libdir=DIR], + [Search for Unified Communication X libraries in DIR])]) + OPAL_CHECK_WITHDIR([ucx-libdir], [$with_ucx_libdir], [libucp.*]) + + ompi_check_ucx_$1_save_CPPFLAGS="$CPPFLAGS" + ompi_check_ucx_$1_save_LDFLAGS="$LDFLAGS" + ompi_check_ucx_$1_save_LIBS="$LIBS" + + AS_IF([test "$with_ucx" != "no"], + [AS_IF([test ! -z "$with_ucx" -a "$with_ucx" != "yes"], + [ + ompi_check_ucx_dir="$with_ucx" + ompi_check_ucx_libdir="$with_ucx/lib" + ]) + AS_IF([test ! -z "$with_ucx_libdir" -a "$with_ucx_libdir" != "yes"], + [ompi_check_ucx_libdir="$with_ucx_libdir"]) + + ompi_check_ucx_extra_libs="-L$ompi_check_ucx_libdir" + + OPAL_CHECK_PACKAGE([$1], + [ucp/api/ucp.h], + [ucp], + [ucp_cleanup], + [$ompi_check_ucx_extra_libs], + [$ompi_check_ucx_dir], + [$ompi_check_ucx_libdir], + [ompi_check_ucx_happy="yes"], + [ompi_check_ucx_happy="no"])], + [ompi_check_ucx_happy="no"]) + + + + CPPFLAGS="$ompi_check_ucx_$1_save_CPPFLAGS" + LDFLAGS="$ompi_check_ucx_$1_save_LDFLAGS" + LIBS="$ompi_check_ucx_$1_save_LIBS" + + AC_MSG_CHECKING(for UCX version compatibility) + AC_REQUIRE_CPP + old_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS -I$ompi_check_ucx_dir/include" + AC_COMPILE_IFELSE( + [AC_LANG_PROGRAM([[#include ]], + [[ + ]])], + [ompi_ucx_version_ok="yes"], + [ompi_ucx_version_ok="no"]) + + AC_MSG_RESULT([$ompi_ucx_version_ok]) + CFLAGS=$old_CFLAGS + + AS_IF([test "$ompi_ucx_version_ok" = "no"], [ompi_check_ucx_happy="no"]) + + AS_IF([test "$ompi_check_ucx_happy" = "yes"], + [$2], + [AS_IF([test ! -z "$with_ucx" -a "$with_ucx" != "no"], + [AC_MSG_ERROR([UCX support requested but not found. Aborting])]) + $3]) +]) + diff --git a/ompi/datatype/ompi_datatype.h b/ompi/datatype/ompi_datatype.h index 2deb4fcb9e..17e1632e07 100644 --- a/ompi/datatype/ompi_datatype.h +++ b/ompi/datatype/ompi_datatype.h @@ -73,6 +73,7 @@ struct ompi_datatype_t { void* args; /**< Data description for the user */ void* packed_description; /**< Packed description of the datatype */ + uint64_t pml_data; /**< PML-specific information */ /* --- cacheline 6 boundary (384 bytes) --- */ char name[MPI_MAX_OBJECT_NAME];/**< Externally visible name */ /* --- cacheline 7 boundary (448 bytes) --- */ diff --git a/ompi/datatype/ompi_datatype_create.c b/ompi/datatype/ompi_datatype_create.c index d97d9c942a..8c942ba4ba 100644 --- a/ompi/datatype/ompi_datatype_create.c +++ b/ompi/datatype/ompi_datatype_create.c @@ -35,6 +35,7 @@ static void __ompi_datatype_allocate( ompi_datatype_t* datatype ) datatype->d_keyhash = NULL; datatype->name[0] = '\0'; datatype->packed_description = NULL; + datatype->pml_data = 0; } static void __ompi_datatype_release(ompi_datatype_t * datatype) diff --git a/ompi/mca/pml/ucx/Makefile.am b/ompi/mca/pml/ucx/Makefile.am new file mode 100644 index 0000000000..0fdd85e272 --- /dev/null +++ b/ompi/mca/pml/ucx/Makefile.am @@ -0,0 +1,45 @@ +# +# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +AM_CPPFLAGS = $(pml_ucx_CPPFLAGS) + +local_sources = \ + pml_ucx.h \ + pml_ucx.c \ + pml_ucx_request.h \ + pml_ucx_request.c \ + pml_ucx_datatype.h \ + pml_ucx_datatype.c \ + pml_ucx_freelist.h \ + pml_ucx_component.c + +if MCA_BUILD_ompi_pml_ucx_DSO +component_noinst = +component_install = mca_pml_ucx.la +else +component_noinst = libmca_pml_ucx.la +component_install = +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_pml_ucx_la_SOURCES = $(local_sources) +mca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS) +mca_pml_ucx_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_pml_ucx_la_SOURCES = $(local_sources) +libmca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS) +libmca_pml_ucx_la_LDFLAGS = -module -avoid-version + diff --git a/ompi/mca/pml/ucx/configure.m4 b/ompi/mca/pml/ucx/configure.m4 new file mode 100644 index 0000000000..9ee0273b39 --- /dev/null +++ b/ompi/mca/pml/ucx/configure.m4 @@ -0,0 +1,30 @@ +# +# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + + +AC_DEFUN([MCA_ompi_pml_ucx_POST_CONFIG], [ + AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([PML])]) +]) + +AC_DEFUN([MCA_ompi_pml_ucx_CONFIG], [ + AC_CONFIG_FILES([ompi/mca/pml/ucx/Makefile]) + + OMPI_CHECK_UCX([pml_ucx], + [pml_ucx_happy="yes"], + [pml_ucx_happy="no"]) + + AS_IF([test "$pml_ucx_happy" = "yes"], + [$1], + [$2]) + + # substitute in the things needed to build ucx + AC_SUBST([pml_ucx_CPPFLAGS]) + AC_SUBST([pml_ucx_LDFLAGS]) + AC_SUBST([pml_ucx_LIBS]) +]) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c new file mode 100644 index 0000000000..6f01a2007e --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -0,0 +1,712 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "pml_ucx.h" + +#include "opal/runtime/opal.h" +#include "opal/mca/pmix/pmix.h" +#include "ompi/message/message.h" +#include "pml_ucx_request.h" + +#include + + +#define PML_UCX_TRACE_SEND(_msg, _buf, _count, _datatype, _dst, _tag, _mode, _comm, ...) \ + PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' dst %d tag %d mode %s comm %d '%s'", \ + __VA_ARGS__, \ + (_buf), (_count), (_datatype)->name, (_dst), (_tag), \ + mca_pml_ucx_send_mode_name(_mode), (_comm)->c_contextid, \ + (_comm)->c_name); + +#define PML_UCX_TRACE_RECV(_msg, _buf, _count, _datatype, _src, _tag, _comm, ...) \ + PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' src %d tag %d comm %d '%s'", \ + __VA_ARGS__, \ + (_buf), (_count), (_datatype)->name, (_src), (_tag), \ + (_comm)->c_contextid, (_comm)->c_name); + +#define PML_UCX_TRACE_PROBE(_msg, _src, _tag, _comm) \ + PML_UCX_VERBOSE(8, _msg " src %d tag %d comm %d '%s'", \ + _src, (_tag), (_comm)->c_contextid, (_comm)->c_name); + +#define PML_UCX_TRACE_MRECV(_msg, _buf, _count, _datatype, _message) \ + PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' msg *%p=%p (%p)", \ + (_buf), (_count), (_datatype)->name, (void*)(_message), \ + (void*)*(_message), (*(_message))->req_ptr); + +#define MODEX_KEY "pml-ucx" + +mca_pml_ucx_module_t ompi_pml_ucx = { + { + mca_pml_ucx_add_procs, + mca_pml_ucx_del_procs, + mca_pml_ucx_enable, + NULL, + mca_pml_ucx_add_comm, + mca_pml_ucx_del_comm, + mca_pml_ucx_irecv_init, + mca_pml_ucx_irecv, + mca_pml_ucx_recv, + mca_pml_ucx_isend_init, + mca_pml_ucx_isend, + mca_pml_ucx_send, + mca_pml_ucx_iprobe, + mca_pml_ucx_probe, + mca_pml_ucx_start, + mca_pml_ucx_improbe, + mca_pml_ucx_mprobe, + mca_pml_ucx_imrecv, + mca_pml_ucx_mrecv, + mca_pml_ucx_dump, + NULL, /* FT */ + 1ul << (PML_UCX_TAG_BITS - 1), + 1ul << (PML_UCX_CONTEXT_BITS), + }, + NULL, + NULL +}; + +static int mca_pml_ucx_send_worker_address(void) +{ + ucp_address_t *address; + ucs_status_t status; + size_t addrlen; + int rc; + + status = ucp_worker_get_address(ompi_pml_ucx.ucp_worker, &address, &addrlen); + if (UCS_OK != status) { + PML_UCX_ERROR("Failed to get worker address"); + return OMPI_ERROR; + } + + OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL, + &mca_pml_ucx_component.pmlm_version, (void*)address, addrlen); + if (OMPI_SUCCESS != rc) { + PML_UCX_ERROR("Open MPI couldn't distribute EP connection details"); + return OMPI_ERROR; + } + + ucp_worker_release_address(ompi_pml_ucx.ucp_worker, address); + + return OMPI_SUCCESS; +} + +static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc, + ucp_address_t **address_p, + size_t *addrlen_p) +{ + int ret; + + *address_p = NULL; + OPAL_MODEX_RECV(ret, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name, + (void**)address_p, addrlen_p); + if (ret < 0) { + PML_UCX_ERROR("Failed to receive EP address"); + } + return ret; +} + +int mca_pml_ucx_open(void) +{ + ucp_params_t params; + ucp_config_t *config; + ucs_status_t status; + + PML_UCX_VERBOSE(1, "mca_pml_ucx_open"); + + /* Read options */ + status = ucp_config_read("MPI", NULL, &config); + if (UCS_OK != status) { + return OMPI_ERROR; + } + + params.features = UCP_FEATURE_TAG; + params.request_size = sizeof(ompi_request_t); + params.request_init = mca_pml_ucx_request_init; + params.request_cleanup = mca_pml_ucx_request_cleanup; + + status = ucp_init(¶ms, config, &ompi_pml_ucx.ucp_context); + ucp_config_release(config); + + if (UCS_OK != status) { + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +int mca_pml_ucx_close(void) +{ + PML_UCX_VERBOSE(1, "mca_pml_ucx_close"); + + if (ompi_pml_ucx.ucp_context != NULL) { + ucp_cleanup(ompi_pml_ucx.ucp_context); + ompi_pml_ucx.ucp_context = NULL; + } + return OMPI_SUCCESS; +} + +int mca_pml_ucx_init(void) +{ + ucs_status_t status; + int rc; + + PML_UCX_VERBOSE(1, "mca_pml_ucx_init"); + + /* TODO check MPI thread mode */ + status = ucp_worker_create(ompi_pml_ucx.ucp_context, UCS_THREAD_MODE_SINGLE, + &ompi_pml_ucx.ucp_worker); + if (UCS_OK != status) { + return OMPI_ERROR; + } + + rc = mca_pml_ucx_send_worker_address(); + if (rc < 0) { + return rc; + } + + /* Initialize the free lists */ + OBJ_CONSTRUCT(&ompi_pml_ucx.persistent_reqs, mca_pml_ucx_freelist_t); + OBJ_CONSTRUCT(&ompi_pml_ucx.convs, mca_pml_ucx_freelist_t); + + /* Create a completed request to be returned from isend */ + mca_pml_ucx_completed_request_init(&ompi_pml_ucx.completed_send_req); + + opal_progress_register(mca_pml_ucx_progress); + + PML_UCX_VERBOSE(2, "created ucp context %p, worker %p", + (void *)ompi_pml_ucx.ucp_context, + (void *)ompi_pml_ucx.ucp_worker); + return OMPI_SUCCESS; +} + +int mca_pml_ucx_cleanup(void) +{ + PML_UCX_VERBOSE(1, "mca_pml_ucx_cleanup"); + + opal_progress_unregister(mca_pml_ucx_progress); + + OMPI_REQUEST_FINI(&ompi_pml_ucx.completed_send_req); + OBJ_DESTRUCT(&ompi_pml_ucx.convs); + OBJ_DESTRUCT(&ompi_pml_ucx.persistent_reqs); + + if (ompi_pml_ucx.ucp_worker) { + ucp_worker_destroy(ompi_pml_ucx.ucp_worker); + ompi_pml_ucx.ucp_worker = NULL; + } + + return OMPI_SUCCESS; +} + +int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs) +{ + ucp_address_t *address; + ucs_status_t status; + size_t addrlen; + ucp_ep_h ep; + size_t i; + int ret; + + if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx", + procs, + nprocs))) { + return ret; + } + + for (i = 0; i < nprocs; ++i) { + ret = mca_pml_ucx_recv_worker_address(procs[i], &address, &addrlen); + if (ret < 0) { + return ret; + } + + if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) { + PML_UCX_VERBOSE(3, "already connected to proc. %d", procs[i]->super.proc_name.vpid); + continue; + } + + PML_UCX_VERBOSE(2, "connecting to proc. %d", procs[i]->super.proc_name.vpid); + status = ucp_ep_create(ompi_pml_ucx.ucp_worker, address, &ep); + free(address); + + if (UCS_OK != status) { + PML_UCX_ERROR("Failed to connect"); + return OMPI_ERROR; + } + + procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep; + } + + return OMPI_SUCCESS; +} + +int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs) +{ + ucp_ep_h ep; + size_t i; + + for (i = 0; i < nprocs; ++i) { + PML_UCX_VERBOSE(2, "disconnecting from rank %d", procs[i]->super.proc_name.vpid); + ep = procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]; + if (ep != NULL) { + ucp_ep_destroy(ep); + } + procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL; + } + return OMPI_SUCCESS; +} + +int mca_pml_ucx_enable(bool enable) +{ + PML_UCX_FREELIST_INIT(&ompi_pml_ucx.persistent_reqs, + mca_pml_ucx_persistent_request_t, + 128, -1, 128); + PML_UCX_FREELIST_INIT(&ompi_pml_ucx.convs, + mca_pml_ucx_convertor_t, + 128, -1, 128); + return OMPI_SUCCESS; +} + +int mca_pml_ucx_progress(void) +{ + static int inprogress = 0; + if (inprogress != 0) { + return 0; + } + + ++inprogress; + ucp_worker_progress(ompi_pml_ucx.ucp_worker); + --inprogress; + return OMPI_SUCCESS; +} + +int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm) +{ + return OMPI_SUCCESS; +} + +int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm) +{ + return OMPI_SUCCESS; +} + +int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_ucx_persistent_request_t *req; + + req = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs); + if (req == NULL) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + PML_UCX_TRACE_RECV("irecv_init request *%p=%p", buf, count, datatype, src, + tag, comm, (void*)request, (void*)req); + + req->ompi.req_state = OMPI_REQUEST_INACTIVE; + req->flags = 0; + req->buffer = buf; + req->count = count; + req->datatype = mca_pml_ucx_get_datatype(datatype); + + PML_UCX_MAKE_RECV_TAG(req->tag, req->recv.tag_mask, tag, src, comm); + + *request = &req->ompi; + return OMPI_SUCCESS; +} + +int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ompi_request_t *req; + + PML_UCX_TRACE_RECV("irecv request *%p", buf, count, datatype, src, tag, comm, + (void*)request); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + mca_pml_ucx_get_datatype(datatype), + ucp_tag, ucp_tag_mask, + mca_pml_ucx_recv_completion); + if (UCS_PTR_IS_ERR(req)) { + PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } + + PML_UCX_VERBOSE(8, "got request %p", (void*)req); + *request = req; + return OMPI_SUCCESS; +} + +static void +mca_pml_ucx_blocking_recv_completion(void *request, ucs_status_t status, + ucp_tag_recv_info_t *info) +{ + ompi_request_t *req = request; + + PML_UCX_VERBOSE(8, "blocking receive request %p completed with status %s tag %"PRIx64" len %zu", + (void*)req, ucs_status_string(status), info->sender_tag, + info->length); + + OPAL_THREAD_LOCK(&ompi_request_lock); + mca_pml_ucx_set_recv_status(&req->req_status, status, info); + PML_UCX_ASSERT(!req->req_complete); + req->req_complete = true; + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src, + int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* mpi_status) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ompi_request_t *req; + + PML_UCX_TRACE_RECV("%s", buf, count, datatype, src, tag, comm, "recv"); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + mca_pml_ucx_get_datatype(datatype), + ucp_tag, ucp_tag_mask, + mca_pml_ucx_blocking_recv_completion); + if (UCS_PTR_IS_ERR(req)) { + PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } + + while (!req->req_complete) { + opal_progress(); + } + + if (mpi_status != MPI_STATUS_IGNORE) { + *mpi_status = req->req_status; + } + + req->req_complete = false; + ucp_request_release(req); + return OMPI_SUCCESS; +} + +static inline const char *mca_pml_ucx_send_mode_name(mca_pml_base_send_mode_t mode) +{ + switch (mode) { + case MCA_PML_BASE_SEND_SYNCHRONOUS: + return "sync"; + case MCA_PML_BASE_SEND_COMPLETE: + return "complete"; + case MCA_PML_BASE_SEND_BUFFERED: + return "buffered"; + case MCA_PML_BASE_SEND_READY: + return "ready"; + case MCA_PML_BASE_SEND_STANDARD: + return "standard"; + case MCA_PML_BASE_SEND_SIZE: + return "size"; + default: + return "unknown"; + } +} + +int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_ucx_persistent_request_t *req; + + + req = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs); + if (req == NULL) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + PML_UCX_TRACE_SEND("isend_init request *%p=%p", buf, count, datatype, dst, + tag, mode, comm, (void*)request, (void*)req) + + req->ompi.req_state = OMPI_REQUEST_INACTIVE; + req->flags = MCA_PML_UCX_REQUEST_FLAG_SEND; + req->buffer = (void *)buf; + req->count = count; + req->datatype = mca_pml_ucx_get_datatype(datatype); + req->tag = PML_UCX_MAKE_SEND_TAG(tag, comm); + req->send.mode = mode; + req->send.ep = mca_pml_ucx_get_ep(comm, dst); + + *request = &req->ompi; + return OMPI_SUCCESS; +} + +int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + ompi_request_t *req; + + PML_UCX_TRACE_SEND("isend request *%p", buf, count, datatype, dst, tag, mode, + comm, (void*)request) + + /* TODO special care to sync/buffered send */ + + req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count, + mca_pml_ucx_get_datatype(datatype), + PML_UCX_MAKE_SEND_TAG(tag, comm), + mca_pml_ucx_send_completion); + if (req == NULL) { + PML_UCX_VERBOSE(8, "returning completed request"); + *request = &ompi_pml_ucx.completed_send_req; + return OMPI_SUCCESS; + } else if (!UCS_PTR_IS_ERR(req)) { + PML_UCX_VERBOSE(8, "got request %p", (void*)req); + *request = req; + return OMPI_SUCCESS; + } else { + PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } +} + +int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst, + int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm) +{ + ompi_request_t *req; + + PML_UCX_TRACE_SEND("%s", buf, count, datatype, dst, tag, mode, comm, "send"); + + /* TODO special care to sync/buffered send */ + + req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count, + mca_pml_ucx_get_datatype(datatype), + PML_UCX_MAKE_SEND_TAG(tag, comm), + mca_pml_ucx_send_completion); + if (req == NULL) { + return OMPI_SUCCESS; + } else if (!UCS_PTR_IS_ERR(req)) { + PML_UCX_VERBOSE(8, "got request %p", (void*)req); + ompi_request_wait(&req, MPI_STATUS_IGNORE); + return OMPI_SUCCESS; + } else { + PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } +} + +int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, ompi_status_public_t* mpi_status) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ucp_tag_recv_info_t info; + ucp_tag_message_h ucp_msg; + + PML_UCX_TRACE_PROBE("iprobe", src, tag, comm); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + 0, &info); + if (ucp_msg != NULL) { + *matched = 1; + mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); + } else { + *matched = 0; + } + return OMPI_SUCCESS; +} + +int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* mpi_status) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ucp_tag_recv_info_t info; + ucp_tag_message_h ucp_msg; + + PML_UCX_TRACE_PROBE("probe", src, tag, comm); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + for (;;) { + ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + 0, &info); + if (ucp_msg != NULL) { + mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); + return OMPI_SUCCESS; + } + + opal_progress(); + } +} + +int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, struct ompi_message_t **message, + ompi_status_public_t* mpi_status) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ucp_tag_recv_info_t info; + ucp_tag_message_h ucp_msg; + + PML_UCX_TRACE_PROBE("improbe", src, tag, comm); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + 1, &info); + if (ucp_msg != NULL) { + PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message); + PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, ucp_msg); + *matched = 1; + mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); + } else if (UCS_PTR_STATUS(ucp_msg) == UCS_ERR_NO_MESSAGE) { + *matched = 0; + } + return OMPI_SUCCESS; +} + +int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm, + struct ompi_message_t **message, + ompi_status_public_t* mpi_status) +{ + ucp_tag_t ucp_tag, ucp_tag_mask; + ucp_tag_recv_info_t info; + ucp_tag_message_h ucp_msg; + + PML_UCX_TRACE_PROBE("mprobe", src, tag, comm); + + PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); + for (;;) { + ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, + 1, &info); + if (ucp_msg != NULL) { + PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message); + PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, ucp_msg); + mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); + return OMPI_SUCCESS; + } + + opal_progress(); + } +} + +int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + struct ompi_request_t **request) +{ + ompi_request_t *req; + + PML_UCX_TRACE_MRECV("imrecv", buf, count, datatype, message); + + req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + mca_pml_ucx_get_datatype(datatype), + (*message)->req_ptr, + mca_pml_ucx_recv_completion); + if (UCS_PTR_IS_ERR(req)) { + PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } + + PML_UCX_VERBOSE(8, "got request %p", (void*)req); + PML_UCX_MESSAGE_RELEASE(message); + *request = req; + return OMPI_SUCCESS; +} + +int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + ompi_status_public_t* status) +{ + ompi_request_t *req; + + PML_UCX_TRACE_MRECV("mrecv", buf, count, datatype, message); + + req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count, + mca_pml_ucx_get_datatype(datatype), + (*message)->req_ptr, + mca_pml_ucx_recv_completion); + if (UCS_PTR_IS_ERR(req)) { + PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); + return OMPI_ERROR; + } + + PML_UCX_MESSAGE_RELEASE(message); + + ompi_request_wait(&req, status); + return OMPI_SUCCESS; +} + +int mca_pml_ucx_start(size_t count, ompi_request_t** requests) +{ + mca_pml_ucx_persistent_request_t *preq; + ompi_request_t *tmp_req; + size_t i; + + for (i = 0; i < count; ++i) { + preq = (mca_pml_ucx_persistent_request_t *)requests[i]; + + if ((preq == NULL) || (OMPI_REQUEST_PML != preq->ompi.req_type)) { + /* Skip irrelevant requests */ + continue; + } + + PML_UCX_ASSERT(preq->ompi.req_state != OMPI_REQUEST_INVALID); + preq->ompi.req_state = OMPI_REQUEST_ACTIVE; + mca_pml_ucx_request_reset(&preq->ompi); + + if (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) { + /* TODO special care to sync/buffered send */ + PML_UCX_VERBOSE(8, "start send request %p", (void*)preq); + tmp_req = (ompi_request_t*)ucp_tag_send_nb(preq->send.ep, preq->buffer, + preq->count, preq->datatype, + preq->tag, + mca_pml_ucx_psend_completion); + } else { + PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq); + tmp_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, + preq->buffer, preq->count, + preq->datatype, preq->tag, + preq->recv.tag_mask, + mca_pml_ucx_precv_completion); + } + + if (tmp_req == NULL) { + /* Only send can complete immediately */ + PML_UCX_ASSERT(preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND); + + PML_UCX_VERBOSE(8, "send completed immediately, completing persistent request %p", + (void*)preq); + OPAL_THREAD_LOCK(&ompi_request_lock); + mca_pml_ucx_set_send_status(&preq->ompi.req_status, UCS_OK); + ompi_request_complete(&preq->ompi, true); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + } else if (!UCS_PTR_IS_ERR(tmp_req)) { + OPAL_THREAD_LOCK(&ompi_request_lock); + if (tmp_req->req_complete) { + /* tmp_req is already completed */ + PML_UCX_VERBOSE(8, "completing persistent request %p", (void*)preq); + mca_pml_ucx_persistent_requset_complete(preq, tmp_req); + } else { + /* tmp_req would be completed by callback and trigger completion + * of preq */ + PML_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p", + (void*)tmp_req, (void*)preq); + tmp_req->req_complete_cb_data = preq; + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + } else { + PML_UCX_ERROR("ucx %s failed: %s", + (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) ? "send" : "recv", + ucs_status_string(UCS_PTR_STATUS(tmp_req))); + return OMPI_ERROR; + } + } + + return OMPI_SUCCESS; +} + +int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose) +{ + return OMPI_SUCCESS; +} diff --git a/ompi/mca/pml/ucx/pml_ucx.h b/ompi/mca/pml/ucx/pml_ucx.h new file mode 100644 index 0000000000..8a696e8111 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx.h @@ -0,0 +1,147 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PML_UCX_H_ +#define PML_UCX_H_ + +#include "ompi_config.h" +#include "ompi/request/request.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/mca/pml/base/base.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/request/request.h" + +#include +#include "pml_ucx_freelist.h" + + +typedef struct mca_pml_ucx_module mca_pml_ucx_module_t; +typedef struct pml_ucx_persistent_request mca_pml_ucx_persistent_request_t; +typedef struct pml_ucx_convertor mca_pml_ucx_convertor_t; + +/* + * TODO version check + */ + +struct mca_pml_ucx_module { + mca_pml_base_module_t super; + + /* UCX global objects */ + ucp_context_h ucp_context; + ucp_worker_h ucp_worker; + + /* Requests */ + mca_pml_ucx_freelist_t persistent_reqs; + ompi_request_t completed_send_req; + + /* Convertors pool */ + mca_pml_ucx_freelist_t convs; + + int priority; + int verbose; + int output; +}; + +extern mca_pml_base_component_2_0_0_t mca_pml_ucx_component; +extern mca_pml_ucx_module_t ompi_pml_ucx; + + +/* Debugging */ +#define PML_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG +#if PML_UCX_ENABLE_DEBUG +# define PML_UCX_MAX_VERBOSE 9 +# define PML_UCX_ASSERT(_x) assert(_x) +#else +# define PML_UCX_MAX_VERBOSE 2 +# define PML_UCX_ASSERT(_x) +#endif + +#define _PML_UCX_QUOTE(_x) \ + # _x +#define PML_UCX_QUOTE(_x) \ + _PML_UCX_QUOTE(_x) + +#define PML_UCX_ERROR(...) \ + opal_output_verbose(0, ompi_pml_ucx.output, "Error: " __FILE__ ":" \ + PML_UCX_QUOTE(__LINE__) __VA_ARGS__) + +#define PML_UCX_VERBOSE(_level, ... ) \ + if (((_level) <= PML_UCX_MAX_VERBOSE) && ((_level) <= ompi_pml_ucx.verbose)) { \ + opal_output_verbose(_level, ompi_pml_ucx.output, __FILE__ ":" \ + PML_UCX_QUOTE(__LINE__) __VA_ARGS__); \ + } + +int mca_pml_ucx_open(void); +int mca_pml_ucx_close(void); +int mca_pml_ucx_init(void); +int mca_pml_ucx_cleanup(void); + +int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs); +int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs); + +int mca_pml_ucx_enable(bool enable); +int mca_pml_ucx_progress(void); + +int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm); +int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm); + +int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request); + +int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request); + +int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src, + int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* status); + +int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request); + +int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request); + +int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst, + int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm); + +int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, ompi_status_public_t* status); + +int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* status); + +int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm, + struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + struct ompi_request_t **request); + +int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_ucx_start(size_t count, ompi_request_t** requests); + +int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose); + +#endif /* PML_UCX_H_ */ diff --git a/ompi/mca/pml/ucx/pml_ucx_component.c b/ompi/mca/pml/ucx/pml_ucx_component.c new file mode 100644 index 0000000000..107476d242 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_component.c @@ -0,0 +1,107 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "pml_ucx.h" + + +static int mca_pml_ucx_component_register(void); +static int mca_pml_ucx_component_open(void); +static int mca_pml_ucx_component_close(void); + +static mca_pml_base_module_t* +mca_pml_ucx_component_init(int* priority, bool enable_progress_threads, + bool enable_mpi_threads); +static int mca_pml_ucx_component_fini(void); + + +mca_pml_base_component_2_0_0_t mca_pml_ucx_component = { + + /* First, the mca_base_component_t struct containing meta + * information about the component itself */ + { + MCA_PML_BASE_VERSION_2_0_0, + + "ucx", /* MCA component name */ + OMPI_MAJOR_VERSION, /* MCA component major version */ + OMPI_MINOR_VERSION, /* MCA component minor version */ + OMPI_RELEASE_VERSION, /* MCA component release version */ + mca_pml_ucx_component_open, /* component open */ + mca_pml_ucx_component_close, /* component close */ + NULL, + mca_pml_ucx_component_register, + }, + { + /* This component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + + mca_pml_ucx_component_init, /* component init */ + mca_pml_ucx_component_fini /* component finalize */ +}; + +static int mca_pml_ucx_component_register(void) +{ + ompi_pml_ucx.verbose = 0; + (void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "verbose", + "Verbose level of the UCX component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_LOCAL, + &ompi_pml_ucx.verbose); + + ompi_pml_ucx.priority = 50; + (void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "priority", + "Priority of the UCX component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_LOCAL, + &ompi_pml_ucx.priority); + + return 0; +} + +static int mca_pml_ucx_component_open(void) +{ + ompi_pml_ucx.output = opal_output_open(NULL); + opal_output_set_verbosity(ompi_pml_ucx.output, ompi_pml_ucx.verbose); + return mca_pml_ucx_open(); +} + +static int mca_pml_ucx_component_close(void) +{ + int rc; + + rc = mca_pml_ucx_close(); + if (rc != 0) { + return rc; + } + + opal_output_close(ompi_pml_ucx.output); + return 0; +} + +static mca_pml_base_module_t* +mca_pml_ucx_component_init(int* priority, bool enable_progress_threads, + bool enable_mpi_threads) +{ + int ret; + + if ( (ret = mca_pml_ucx_init()) != 0) { + return NULL; + } + + *priority = ompi_pml_ucx.priority; + return &ompi_pml_ucx.super; +} + +static int mca_pml_ucx_component_fini(void) +{ + return mca_pml_ucx_cleanup(); +} + diff --git a/ompi/mca/pml/ucx/pml_ucx_datatype.c b/ompi/mca/pml/ucx/pml_ucx_datatype.c new file mode 100644 index 0000000000..488642fcda --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_datatype.c @@ -0,0 +1,157 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "pml_ucx_datatype.h" + +#include "ompi/runtime/mpiruntime.h" + +#include + + +static void* pml_ucx_generic_datatype_start_pack(void *context, const void *buffer, + size_t count) +{ + ompi_datatype_t *datatype = context; + mca_pml_ucx_convertor_t *convertor; + + convertor = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs); + + OBJ_RETAIN(datatype); + convertor->datatype = datatype; + opal_convertor_copy_and_prepare_for_send(ompi_proc_local_proc->super.proc_convertor, + &datatype->super, count, buffer, 0, + &convertor->opal_conv); + return convertor; +} + +static void* pml_ucx_generic_datatype_start_unpack(void *context, void *buffer, + size_t count) +{ + ompi_datatype_t *datatype = context; + mca_pml_ucx_convertor_t *convertor; + + convertor = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs); + + OBJ_RETAIN(datatype); + convertor->datatype = datatype; + opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor, + &datatype->super, count, buffer, 0, + &convertor->opal_conv); + return convertor; +} + +static size_t pml_ucx_generic_datatype_packed_size(void *state) +{ + mca_pml_ucx_convertor_t *convertor = state; + size_t size; + + opal_convertor_get_packed_size(&convertor->opal_conv, &size); + return size; +} + +static size_t pml_ucx_generic_datatype_pack(void *state, size_t offset, + void *dest, size_t max_length) +{ + mca_pml_ucx_convertor_t *convertor = state; + uint32_t iov_count; + struct iovec iov; + size_t length; + + iov_count = 1; + iov.iov_base = dest; + iov.iov_len = max_length; + + opal_convertor_set_position(&convertor->opal_conv, &offset); + length = max_length; + opal_convertor_pack(&convertor->opal_conv, &iov, &iov_count, &length); + return length; +} + +static ucs_status_t pml_ucx_generic_datatype_unpack(void *state, size_t offset, + const void *src, size_t length) +{ + mca_pml_ucx_convertor_t *convertor = state; + + uint32_t iov_count; + struct iovec iov; + + iov_count = 1; + iov.iov_base = (void*)src; + iov.iov_len = length; + + opal_convertor_set_position(&convertor->opal_conv, &offset); + opal_convertor_unpack(&convertor->opal_conv, &iov, &iov_count, &length); + return UCS_OK; +} + +static void pml_ucx_generic_datatype_finish(void *state) +{ + mca_pml_ucx_convertor_t *convertor = state; + + opal_convertor_cleanup(&convertor->opal_conv); + OBJ_RELEASE(convertor->datatype); + PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.convs, &convertor->super); +} + +static ucp_generic_dt_ops_t pml_ucx_generic_datatype_ops = { + .start_pack = pml_ucx_generic_datatype_start_pack, + .start_unpack = pml_ucx_generic_datatype_start_unpack, + .packed_size = pml_ucx_generic_datatype_packed_size, + .pack = pml_ucx_generic_datatype_pack, + .unpack = pml_ucx_generic_datatype_unpack, + .finish = pml_ucx_generic_datatype_finish +}; + +ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype) +{ + ucp_datatype_t ucp_datatype; + ucs_status_t status; + ptrdiff_t lb; + size_t size; + + ompi_datatype_type_lb(datatype, &lb); + + if ((datatype->super.flags & OPAL_DATATYPE_FLAG_CONTIGUOUS) && + (datatype->super.flags & OPAL_DATATYPE_FLAG_NO_GAPS) && + (lb == 0)) + { + ompi_datatype_type_size(datatype, &size); + PML_UCX_ASSERT(size > 0); + datatype->pml_data = ucp_dt_make_contig(size); + return datatype->pml_data; + } + + status = ucp_dt_create_generic(&pml_ucx_generic_datatype_ops, + datatype, &ucp_datatype); + if (status != UCS_OK) { + PML_UCX_ERROR("Failed to create UCX datatype for %s", datatype->name); + ompi_mpi_abort(&ompi_mpi_comm_world.comm, 1); + } + + PML_UCX_VERBOSE(7, "created generic UCX datatype 0x%"PRIx64, ucp_datatype) + // TODO put this on a list to be destroyed later + + datatype->pml_data = ucp_datatype; + return ucp_datatype; +} + +static void mca_pml_ucx_convertor_construct(mca_pml_ucx_convertor_t *convertor) +{ + OBJ_CONSTRUCT(&convertor->opal_conv, opal_convertor_t); +} + +static void mca_pml_ucx_convertor_destruct(mca_pml_ucx_convertor_t *convertor) +{ + OBJ_DESTRUCT(&convertor->opal_conv); +} + +OBJ_CLASS_INSTANCE(mca_pml_ucx_convertor_t, + opal_free_list_item_t, + mca_pml_ucx_convertor_construct, + mca_pml_ucx_convertor_destruct); diff --git a/ompi/mca/pml/ucx/pml_ucx_datatype.h b/ompi/mca/pml/ucx/pml_ucx_datatype.h new file mode 100644 index 0000000000..79dce36cc8 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_datatype.h @@ -0,0 +1,39 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PML_UCX_DATATYPE_H_ +#define PML_UCX_DATATYPE_H_ + +#include "pml_ucx.h" + + +struct pml_ucx_convertor { + opal_free_list_item_t super; + ompi_datatype_t *datatype; + opal_convertor_t opal_conv; +}; + + +ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype); + +OBJ_CLASS_DECLARATION(mca_pml_ucx_convertor_t); + + +static inline ucp_datatype_t mca_pml_ucx_get_datatype(ompi_datatype_t *datatype) +{ + ucp_datatype_t ucp_type = datatype->pml_data; + + if (OPAL_LIKELY(ucp_type != 0)) { + return ucp_type; + } + + return mca_pml_ucx_init_datatype(datatype); +} + +#endif /* PML_UCX_DATATYPE_H_ */ diff --git a/ompi/mca/pml/ucx/pml_ucx_freelist.h b/ompi/mca/pml/ucx/pml_ucx_freelist.h new file mode 100644 index 0000000000..8c16d6e5a4 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_freelist.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PML_UCX_FREELIST_H_ +#define PML_UCX_FREELIST_H_ + +#include "ompi_config.h" +#include "opal/class/opal_free_list.h" + + +#define mca_pml_ucx_freelist_t opal_free_list_t + +#define PML_UCX_FREELIST_GET(_freelist) \ + opal_free_list_get (_freelist) + +#define PML_UCX_FREELIST_RETURN(_freelist, _item) \ + opal_free_list_return(_freelist, _item) + +#define PML_UCX_FREELIST_INIT(_fl, _type, _initial, _max, _batch) \ + opal_free_list_init(_fl, sizeof(_type), 8, OBJ_CLASS(_type), \ + 0, 0, _initial, _max, _batch, NULL, 0, NULL, NULL, NULL) + + +#endif /* PML_UCX_FREELIST_H_ */ diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c new file mode 100644 index 0000000000..e1ad331c90 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -0,0 +1,178 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "pml_ucx_request.h" +#include "ompi/mca/pml/base/pml_base_bsend.h" +#include "ompi/message/message.h" +#include + + +static int mca_pml_ucx_request_free(ompi_request_t **rptr) +{ + ompi_request_t *req = *rptr; + + PML_UCX_VERBOSE(9, "free request *%p=%p", (void*)rptr, (void*)req); + + *rptr = MPI_REQUEST_NULL; + mca_pml_ucx_request_reset(req); + ucp_request_release(req); + return OMPI_SUCCESS; +} + +void mca_pml_ucx_send_completion(void *request, ucs_status_t status) +{ + ompi_request_t *req = request; + + PML_UCX_VERBOSE(8, "send request %p completed with status %s", (void*)req, + ucs_status_string(status)); + + OPAL_THREAD_LOCK(&ompi_request_lock); + mca_pml_ucx_set_send_status(&req->req_status, status); + PML_UCX_ASSERT(!req->req_complete); + ompi_request_complete(req, true); + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +void mca_pml_ucx_recv_completion(void *request, ucs_status_t status, + ucp_tag_recv_info_t *info) +{ + ompi_request_t *req = request; + + PML_UCX_VERBOSE(8, "receive request %p completed with status %s tag %"PRIx64" len %zu", + (void*)req, ucs_status_string(status), info->sender_tag, + info->length); + + OPAL_THREAD_LOCK(&ompi_request_lock); + mca_pml_ucx_set_recv_status(&req->req_status, status, info); + PML_UCX_ASSERT(!req->req_complete); + ompi_request_complete(req, true); + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq, + ompi_request_t *tmp_req) +{ + preq->ompi.req_status = tmp_req->req_status; + ompi_request_complete(&preq->ompi, true); + tmp_req->req_complete_cb_data = NULL; + mca_pml_ucx_request_reset(tmp_req); + ucp_request_release(tmp_req); +} + +static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req) +{ + mca_pml_ucx_persistent_request_t *preq; + + OPAL_THREAD_LOCK(&ompi_request_lock); + ompi_request_complete(tmp_req, false); + preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data; + if (preq != NULL) { + mca_pml_ucx_persistent_requset_complete(preq, tmp_req); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +void mca_pml_ucx_psend_completion(void *request, ucs_status_t status) +{ + ompi_request_t *tmp_req = request; + + PML_UCX_VERBOSE(8, "persistent send request %p completed with status %s", + (void*)tmp_req, ucs_status_string(status)); + + mca_pml_ucx_set_send_status(&tmp_req->req_status, status); + mca_pml_ucx_preq_completion(tmp_req); +} + +void mca_pml_ucx_precv_completion(void *request, ucs_status_t status, + ucp_tag_recv_info_t *info) +{ + ompi_request_t *tmp_req = request; + + PML_UCX_VERBOSE(8, "persistent receive request %p completed with status %s tag %"PRIx64" len %zu", + (void*)tmp_req, ucs_status_string(status), info->sender_tag, + info->length); + + mca_pml_ucx_set_recv_status(&tmp_req->req_status, status, info); + mca_pml_ucx_preq_completion(tmp_req); +} + +static void mca_pml_ucx_request_init_common(ompi_request_t* ompi_req, + bool req_persistent, + ompi_request_state_t state, + ompi_request_free_fn_t req_free, + ompi_request_cancel_fn_t req_cancel) +{ + OMPI_REQUEST_INIT(ompi_req, req_persistent); + ompi_req->req_type = OMPI_REQUEST_PML; + ompi_req->req_state = state; + ompi_req->req_complete_cb = NULL; + ompi_req->req_complete_cb_data = NULL; + ompi_req->req_free = req_free; + ompi_req->req_cancel = req_cancel; +} + +void mca_pml_ucx_request_init(void *request) +{ + ompi_request_t* ompi_req = request; + mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE, + mca_pml_ucx_request_free, NULL); +} + +void mca_pml_ucx_request_cleanup(void *request) +{ + ompi_request_t* ompi_req = request; + OMPI_REQUEST_FINI(ompi_req); +} + +static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr) +{ + mca_pml_ucx_persistent_request_t* req = (mca_pml_ucx_persistent_request_t*)*rptr; + + *rptr = MPI_REQUEST_NULL; + req->ompi.req_state = OMPI_REQUEST_INVALID; + PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &req->ompi.super); + return OMPI_SUCCESS; +} + +static void mca_pml_ucx_persisternt_request_construct(mca_pml_ucx_persistent_request_t* req) +{ + mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE, + mca_pml_ucx_persistent_request_free, NULL); +} + +static void mca_pml_ucx_persisternt_request_destruct(mca_pml_ucx_persistent_request_t* req) +{ + OMPI_REQUEST_FINI(&req->ompi); +} + +OBJ_CLASS_INSTANCE(mca_pml_ucx_persistent_request_t, + ompi_request_t, + mca_pml_ucx_persisternt_request_construct, + mca_pml_ucx_persisternt_request_destruct); + +static int mca_pml_completed_request_free(struct ompi_request_t** rptr) +{ + *rptr = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +static int mca_pml_completed_request_cancel(struct ompi_request_t* ompi_req, int flag) +{ + return OMPI_SUCCESS; +} + +void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req) +{ + mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE, + mca_pml_completed_request_free, + mca_pml_completed_request_cancel); + ompi_request_complete(ompi_req, false); + +} + diff --git a/ompi/mca/pml/ucx/pml_ucx_request.h b/ompi/mca/pml/ucx/pml_ucx_request.h new file mode 100644 index 0000000000..dfd91f31e4 --- /dev/null +++ b/ompi/mca/pml/ucx/pml_ucx_request.h @@ -0,0 +1,183 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PML_UCX_REQUEST_H_ +#define PML_UCX_REQUEST_H_ + +#include "pml_ucx.h" +#include "pml_ucx_datatype.h" + + +enum { + MCA_PML_UCX_REQUEST_FLAG_SEND = (1 << 0), /* Persistent send */ + MCA_PML_UCX_REQUEST_FLAG_FREE_CALLED = (1 << 1), + MCA_PML_UCX_REQUEST_FLAG_COMPLETED = (1 << 2) +}; + +/* + * UCX tag structure: + * + * 01234567 01234567 01234567 01234567 01234567 01234567 01234567 01234567 + * | | + * message tag (24) | source rank (24) | context id (16) + * | | + */ +#define PML_UCX_TAG_BITS 24 +#define PML_UCX_RANK_BITS 24 +#define PML_UCX_CONTEXT_BITS 16 + + +#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \ + ((((uint64_t) (_tag) ) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) | \ + (((uint64_t)(_comm)->c_my_rank ) << PML_UCX_CONTEXT_BITS) | \ + ((uint64_t)(_comm)->c_contextid)) + + +#define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \ + { \ + if ((_src) == MPI_ANY_SOURCE) { \ + _ucp_tag_mask = 0x800000000000fffful; \ + } else { \ + _ucp_tag_mask = 0x800000fffffffffful; \ + } \ + \ + _ucp_tag = (((uint64_t)(_src) & UCS_MASK(PML_UCX_RANK_BITS)) << PML_UCX_CONTEXT_BITS) | \ + (_comm)->c_contextid; \ + \ + if ((_tag) != MPI_ANY_TAG) { \ + _ucp_tag_mask |= 0x7fffff0000000000ul; \ + _ucp_tag |= ((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS); \ + } \ + } + +#define PML_UCX_TAG_GET_SOURCE(_tag) \ + (((_tag) >> PML_UCX_CONTEXT_BITS) & UCS_MASK(PML_UCX_RANK_BITS)) + + +#define PML_UCX_TAG_GET_MPI_TAG(_tag) \ + ((_tag) >> (PML_UCX_CONTEXT_BITS + PML_UCX_RANK_BITS)) + + +#define PML_UCX_MESSAGE_NEW(_comm, _ucp_msg, _info, _message) \ + { \ + struct ompi_message_t *msg = ompi_message_alloc(); \ + if (msg == NULL) { \ + /* TODO release UCP message */ \ + return OMPI_ERR_OUT_OF_RESOURCE; \ + } \ + \ + msg->comm = (_comm); \ + msg->req_ptr = (_ucp_msg); \ + msg->peer = PML_UCX_TAG_GET_SOURCE((_info)->sender_tag); \ + msg->count = (_info)->length; \ + *(_message) = msg; \ + } + + +#define PML_UCX_MESSAGE_RELEASE(_message) \ + { \ + ompi_message_return(*(_message)); \ + *(_message) = NULL; \ + } + + +struct pml_ucx_persistent_request { + ompi_request_t ompi; + unsigned flags; + void *buffer; + size_t count; + ucp_datatype_t datatype; + ucp_tag_t tag; + struct { + mca_pml_base_send_mode_t mode; + ucp_ep_h ep; + } send; + struct { + ucp_tag_t tag_mask; + } recv; +}; + + +void mca_pml_ucx_send_completion(void *request, ucs_status_t status); + +void mca_pml_ucx_recv_completion(void *request, ucs_status_t status, + ucp_tag_recv_info_t *info); + +void mca_pml_ucx_psend_completion(void *request, ucs_status_t status); + +void mca_pml_ucx_precv_completion(void *request, ucs_status_t status, + ucp_tag_recv_info_t *info); + +void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq, + ompi_request_t *tmp_req); + +void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req); + +void mca_pml_ucx_request_init(void *request); + +void mca_pml_ucx_request_cleanup(void *request); + + +static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst) +{ + return ompi_comm_peer_lookup(comm, dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]; +} + +static inline void mca_pml_ucx_request_reset(ompi_request_t *req) +{ + req->req_complete = false; + req->req_status._cancelled = false; +} + +static void mca_pml_ucx_set_send_status(ompi_status_public_t* mpi_status, + ucs_status_t status) +{ + if (status == UCS_OK) { + mpi_status->MPI_ERROR = MPI_SUCCESS; + } else if (status == UCS_ERR_CANCELED) { + mpi_status->_cancelled = true; + } else { + mpi_status->MPI_ERROR = MPI_ERR_INTERN; + } +} + +static inline void mca_pml_ucx_set_recv_status(ompi_status_public_t* mpi_status, + ucs_status_t ucp_status, + const ucp_tag_recv_info_t *info) +{ + int64_t tag; + + if (ucp_status == UCS_OK) { + tag = info->sender_tag; + mpi_status->MPI_ERROR = MPI_SUCCESS; + mpi_status->MPI_SOURCE = PML_UCX_TAG_GET_SOURCE(tag); + mpi_status->MPI_TAG = PML_UCX_TAG_GET_MPI_TAG(tag); + mpi_status->_ucount = info->length; + } else if (ucp_status == UCS_ERR_MESSAGE_TRUNCATED) { + mpi_status->MPI_ERROR = MPI_ERR_TRUNCATE; + } else if (ucp_status == UCS_ERR_CANCELED) { + mpi_status->_cancelled = true; + } else { + mpi_status->MPI_ERROR = MPI_ERR_INTERN; + } +} + +static inline void mca_pml_ucx_set_recv_status_safe(ompi_status_public_t* mpi_status, + ucs_status_t ucp_status, + const ucp_tag_recv_info_t *info) +{ + if (mpi_status != MPI_STATUS_IGNORE) { + mca_pml_ucx_set_recv_status(mpi_status, ucp_status, info); + } +} + +OBJ_CLASS_DECLARATION(mca_pml_ucx_persistent_request_t); + + +#endif /* PML_UCX_REQUEST_H_ */ diff --git a/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c b/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c index dd85f64e09..b8bdc4f4d7 100644 --- a/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c +++ b/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c @@ -17,6 +17,7 @@ #include "oshmem/mca/atomic/atomic.h" #include "oshmem/mca/atomic/base/base.h" #include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" #include "oshmem/runtime/runtime.h" #include "atomic_mxm.h" @@ -75,10 +76,7 @@ int mca_atomic_mxm_cswap(void *target, if (MXM_PTL_SHM == ptl_id) { ptl_id = MXM_PTL_RDMA; } - r_mkey = mca_memheap.memheap_get_cached_mkey(pe, - target, - ptl_id, - &remote_addr); + r_mkey = mca_memheap_base_get_cached_mkey(pe, target, ptl_id, &remote_addr); if (!r_mkey) { ATOMIC_ERROR("[#%d] %p is not address of symmetric variable", my_pe, target); diff --git a/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c b/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c index a58804fa50..eec2183465 100644 --- a/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c +++ b/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c @@ -18,6 +18,7 @@ #include "oshmem/mca/atomic/atomic.h" #include "oshmem/mca/atomic/base/base.h" #include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" #include "oshmem/runtime/runtime.h" #include "atomic_mxm.h" @@ -77,10 +78,7 @@ int mca_atomic_mxm_fadd(void *target, if (MXM_PTL_SHM == ptl_id) { ptl_id = MXM_PTL_RDMA; } - r_mkey = mca_memheap.memheap_get_cached_mkey(pe, - target, - ptl_id, - &remote_addr); + r_mkey = mca_memheap_base_get_cached_mkey(pe, target, ptl_id, &remote_addr); if (!r_mkey) { ATOMIC_ERROR("[#%d] %p is not address of symmetric variable", my_pe, target); diff --git a/oshmem/mca/atomic/ucx/Makefile.am b/oshmem/mca/atomic/ucx/Makefile.am new file mode 100644 index 0000000000..57f8b552b7 --- /dev/null +++ b/oshmem/mca/atomic/ucx/Makefile.am @@ -0,0 +1,42 @@ +# +# Copyright (c) 2013 Mellanox Technologies, Inc. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +AM_CPPFLAGS = $(atomic_ucx_CPPFLAGS) + +ucx_sources = \ + atomic_ucx.h \ + atomic_ucx_module.c \ + atomic_ucx_component.c \ + atomic_ucx_fadd.c \ + atomic_ucx_cswap.c + + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if MCA_BUILD_oshmem_atomic_ucx_DSO +component_noinst = +component_install = mca_atomic_ucx.la +else +component_noinst = libmca_atomic_ucx.la +component_install = +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_atomic_ucx_la_SOURCES = $(ucx_sources) +mca_atomic_ucx_la_LIBADD = $(atomic_ucx_LIBS) +mca_atomic_ucx_la_LDFLAGS = -module -avoid-version $(atomic_ucx_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_atomic_ucx_la_SOURCES =$(ucx_sources) +libmca_atomic_ucx_la_LDFLAGS = -module -avoid-version $(atomic_ucx_LDFLAGS) + diff --git a/oshmem/mca/atomic/ucx/atomic_ucx.h b/oshmem/mca/atomic/ucx/atomic_ucx.h new file mode 100644 index 0000000000..4db6008c1f --- /dev/null +++ b/oshmem/mca/atomic/ucx/atomic_ucx.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_ATOMIC_UCX_H +#define MCA_ATOMIC_UCX_H + +#include "oshmem_config.h" + +#include "opal/mca/mca.h" +#include "oshmem/mca/atomic/atomic.h" +#include "oshmem/util/oshmem_util.h" + +/* This component does uses SPML:UCX */ +#include "oshmem/mca/spml/ucx/spml_ucx.h" + + +BEGIN_C_DECLS + +/* Globally exported variables */ + +OSHMEM_MODULE_DECLSPEC extern mca_atomic_base_component_1_0_0_t +mca_atomic_ucx_component; + +/* this component works with spml:ucx only */ +extern mca_spml_ucx_t *mca_spml_self; + +OSHMEM_DECLSPEC void atomic_ucx_lock(int pe); +OSHMEM_DECLSPEC void atomic_ucx_unlock(int pe); + +/* API functions */ + +int mca_atomic_ucx_init(bool enable_progress_threads, bool enable_threads); +int mca_atomic_ucx_finalize(void); +mca_atomic_base_module_t* +mca_atomic_ucx_query(int *priority); + +int mca_atomic_ucx_fadd(void *target, + void *prev, + const void *value, + size_t nlong, + int pe, + struct oshmem_op_t *op); +int mca_atomic_ucx_cswap(void *target, + void *prev, + const void *cond, + const void *value, + size_t nlong, + int pe); + +struct mca_atomic_ucx_module_t { + mca_atomic_base_module_t super; +}; +typedef struct mca_atomic_ucx_module_t mca_atomic_ucx_module_t; +OBJ_CLASS_DECLARATION(mca_atomic_ucx_module_t); + +END_C_DECLS + +#endif /* MCA_ATOMIC_MXM_H */ diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_component.c b/oshmem/mca/atomic/ucx/atomic_ucx_component.c new file mode 100644 index 0000000000..f5580e3269 --- /dev/null +++ b/oshmem/mca/atomic/ucx/atomic_ucx_component.c @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "oshmem_config.h" + +#include "oshmem/constants.h" +#include "oshmem/mca/atomic/atomic.h" +#include "oshmem/mca/atomic/base/base.h" +#include "oshmem/mca/spml/base/base.h" + +#include "atomic_ucx.h" + + +/* + * Public string showing the scoll ucx component version number + */ +const char *mca_atomic_ucx_component_version_string = +"Open SHMEM ucx atomic MCA component version " OSHMEM_VERSION; + +/* + * Global variable + */ +mca_spml_ucx_t *mca_spml_self = NULL; + +/* + * Local function + */ +static int ucx_register(void); +static int ucx_open(void); + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ + +mca_atomic_base_component_t mca_atomic_ucx_component = { + + /* First, the mca_component_t struct containing meta information + about the component itself */ + + { + MCA_ATOMIC_BASE_VERSION_2_0_0, + + /* Component name and version */ + "ucx", + OSHMEM_MAJOR_VERSION, + OSHMEM_MINOR_VERSION, + OSHMEM_RELEASE_VERSION, + + /* component open */ + ucx_open, + /* component close */ + NULL, + /* component query */ + NULL, + /* component register */ + ucx_register + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, + + /* Initialization / querying functions */ + + mca_atomic_ucx_init, + mca_atomic_ucx_finalize, + mca_atomic_ucx_query +}; + +static int ucx_register(void) +{ + mca_atomic_ucx_component.priority = 100; + mca_base_component_var_register (&mca_atomic_ucx_component.atomic_version, + "priority", "Priority of the atomic:ucx " + "component (default: 100)", MCA_BASE_VAR_TYPE_INT, + NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, + OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_ALL_EQ, + &mca_atomic_ucx_component.priority); + + return OSHMEM_SUCCESS; +} + +static int ucx_open(void) +{ + /* + * This component is able to work using spml:ikrit component only + * (this check is added instead of !mca_spml_ikrit.enabled) + */ + if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) { + ATOMIC_VERBOSE(5, + "Can not use atomic/ucx because spml ikrit component disabled"); + return OSHMEM_ERR_NOT_AVAILABLE; + } + mca_spml_self = (mca_spml_ucx_t *)mca_spml.self; + + return OSHMEM_SUCCESS; +} + +OBJ_CLASS_INSTANCE(mca_atomic_ucx_module_t, + mca_atomic_base_module_t, + NULL, + NULL); + diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c new file mode 100644 index 0000000000..b2d028c44d --- /dev/null +++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2013 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "oshmem_config.h" +#include +#include + +#include "oshmem/constants.h" +#include "oshmem/mca/spml/spml.h" +#include "oshmem/mca/atomic/atomic.h" +#include "oshmem/mca/atomic/base/base.h" +#include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" +#include "oshmem/runtime/runtime.h" + +#include "atomic_ucx.h" + +int mca_atomic_ucx_cswap(void *target, + void *prev, + const void *cond, + const void *value, + size_t nlong, + int pe) +{ + return OSHMEM_SUCCESS; +} + diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c new file mode 100644 index 0000000000..f3ea2e49a6 --- /dev/null +++ b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2013 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "oshmem_config.h" +#include +#include + +#include "oshmem/constants.h" +#include "oshmem/op/op.h" +#include "oshmem/mca/spml/spml.h" +#include "oshmem/mca/atomic/atomic.h" +#include "oshmem/mca/atomic/base/base.h" +#include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" +#include "oshmem/runtime/runtime.h" + +#include "atomic_ucx.h" + +int mca_atomic_ucx_fadd(void *target, + void *prev, + const void *value, + size_t nlong, + int pe, + struct oshmem_op_t *op) +{ + /* TODO: actual code */ + return OSHMEM_SUCCESS; +} diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_module.c b/oshmem/mca/atomic/ucx/atomic_ucx_module.c new file mode 100644 index 0000000000..0b570043a6 --- /dev/null +++ b/oshmem/mca/atomic/ucx/atomic_ucx_module.c @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "oshmem_config.h" +#include + +#include "oshmem/constants.h" +#include "oshmem/mca/atomic/atomic.h" +#include "oshmem/mca/spml/spml.h" +#include "oshmem/mca/memheap/memheap.h" +#include "oshmem/proc/proc.h" +#include "atomic_ucx.h" + +/* + * Initial query function that is invoked during initialization, allowing + * this module to indicate what level of thread support it provides. + */ +int mca_atomic_ucx_init(bool enable_progress_threads, bool enable_threads) +{ + return OSHMEM_SUCCESS; +} + +int mca_atomic_ucx_finalize(void) +{ + return OSHMEM_SUCCESS; +} + +mca_atomic_base_module_t * +mca_atomic_ucx_query(int *priority) +{ + mca_atomic_ucx_module_t *module; + + *priority = mca_atomic_ucx_component.priority; + + module = OBJ_NEW(mca_atomic_ucx_module_t); + if (module) { + module->super.atomic_fadd = mca_atomic_ucx_fadd; + module->super.atomic_cswap = mca_atomic_ucx_cswap; + return &(module->super); + } + + return NULL ; +} + diff --git a/oshmem/mca/atomic/ucx/configure.m4 b/oshmem/mca/atomic/ucx/configure.m4 new file mode 100644 index 0000000000..53b057da3f --- /dev/null +++ b/oshmem/mca/atomic/ucx/configure.m4 @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +# MCA_oshmem_atomic_ucx_CONFIG([action-if-can-compile], +# [action-if-cant-compile]) +# ------------------------------------------------ +AC_DEFUN([MCA_oshmem_atomic_ucx_CONFIG],[ + AC_CONFIG_FILES([oshmem/mca/atomic/ucx/Makefile]) + + OMPI_CHECK_UCX([atomic_ucx], + [atomic_ucx_happy="yes"], + [atomic_ucx_happy="no"]) + + AS_IF([test "$atomic_ucx_happy" = "yes"], + [$1], + [$2]) + + + # substitute in the things needed to build ucx + AC_SUBST([atomic_ucx_CFLAGS]) + AC_SUBST([atomic_ucx_CPPFLAGS]) + AC_SUBST([atomic_ucx_LDFLAGS]) + AC_SUBST([atomic_ucx_LIBS]) +])dnl + diff --git a/oshmem/mca/memheap/base/base.h b/oshmem/mca/memheap/base/base.h index 1a9cbad5ef..34e92346d5 100644 --- a/oshmem/mca/memheap/base/base.h +++ b/oshmem/mca/memheap/base/base.h @@ -72,10 +72,11 @@ OSHMEM_DECLSPEC uint64_t mca_memheap_base_find_offset(int pe, OSHMEM_DECLSPEC int mca_memheap_base_is_symmetric_addr(const void* va); OSHMEM_DECLSPEC sshmem_mkey_t *mca_memheap_base_get_mkey(void* va, int tr_id); -OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe, - void* va, - int btl_id, - void** rva); +OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, + int pe, + void* va, + int btl_id, + void** rva); OSHMEM_DECLSPEC void mca_memheap_modex_recv_all(void); /* This function is for internal usage only @@ -147,6 +148,84 @@ OSHMEM_DECLSPEC extern mca_base_framework_t oshmem_memheap_base_framework; oshmem_output_verbose(0, oshmem_memheap_base_framework.framework_output, \ "Warning %s:%d - %s()", __SPML_FILE__, __LINE__, __func__, __VA_ARGS__) +extern int mca_memheap_seg_cmp(const void *k, const void *v); + +/* Turn ON/OFF debug output from build (default 0) */ +#ifndef MEMHEAP_BASE_DEBUG +#define MEMHEAP_BASE_DEBUG 0 +#endif +#define MEMHEAP_VERBOSE_FASTPATH(...) + +extern mca_memheap_map_t* memheap_map; + +static inline map_segment_t *memheap_find_va(const void* va) +{ + map_segment_t *s; + + if (OPAL_LIKELY((uintptr_t)va >= (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].seg_base_addr && + (uintptr_t)va < (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].end)) { + s = &memheap_map->mem_segs[HEAP_SEG_INDEX]; + } else { + s = bsearch(va, + &memheap_map->mem_segs[SYMB_SEG_INDEX], + memheap_map->n_segments - 1, + sizeof(*s), + mca_memheap_seg_cmp); + } + +#if MEMHEAP_BASE_DEBUG == 1 + if (s) { + MEMHEAP_VERBOSE(5, "match seg#%02ld: 0x%llX - 0x%llX %llu bytes va=%p", + s - memheap_map->mem_segs, + (long long)s->seg_base_addr, + (long long)s->end, + (long long)(s->end - s->seg_base_addr), + (void *)va); + } +#endif + return s; +} + +static inline void* memheap_va2rva(void* va, void* local_base, void* remote_base) +{ + return (void*) (remote_base > local_base ? + (uintptr_t)va + ((uintptr_t)remote_base - (uintptr_t)local_base) : + (uintptr_t)va - ((uintptr_t)local_base - (uintptr_t)remote_base)); +} + +static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe, + void* va, + int btl_id, + void** rva) +{ + map_segment_t *s; + sshmem_mkey_t *mkey; + + MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p", pe, va); + s = memheap_find_va(va); + if (OPAL_UNLIKELY(NULL == s)) + return NULL ; + + if (OPAL_UNLIKELY(!MAP_SEGMENT_IS_VALID(s))) + return NULL ; + + if (OPAL_UNLIKELY(pe == oshmem_my_proc_id())) { + *rva = va; + MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (local) %lx %p", pe, va, + s->mkeys[btl_id].u.key, *rva); + return &s->mkeys[btl_id]; + } + + if (OPAL_LIKELY(s->mkeys_cache[pe])) { + mkey = &s->mkeys_cache[pe][btl_id]; + *rva = memheap_va2rva(va, s->seg_base_addr, mkey->va_base); + MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (cached) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva); + return mkey; + } + + return mca_memheap_base_get_cached_mkey_slow(s, pe, va, btl_id, rva); +} + END_C_DECLS #endif /* MCA_MEMHEAP_BASE_H */ diff --git a/oshmem/mca/memheap/base/memheap_base_mkey.c b/oshmem/mca/memheap/base/memheap_base_mkey.c index 3a6b544bb1..95b770e6d4 100644 --- a/oshmem/mca/memheap/base/memheap_base_mkey.c +++ b/oshmem/mca/memheap/base/memheap_base_mkey.c @@ -55,9 +55,7 @@ struct oob_comm { opal_list_t req_list; }; -#define MEMHEAP_VERBOSE_FASTPATH(...) - -static mca_memheap_map_t* memheap_map = NULL; +mca_memheap_map_t* memheap_map = NULL; struct oob_comm memheap_oob = {{{0}}}; @@ -70,12 +68,12 @@ static int memheap_oob_get_mkeys(int pe, uint32_t va_seg_num, sshmem_mkey_t *mkey); -static inline void* __seg2base_va(int seg) +static inline void* mca_memheap_seg2base_va(int seg) { return memheap_map->mem_segs[seg].seg_base_addr; } -static int _seg_cmp(const void *k, const void *v) +int mca_memheap_seg_cmp(const void *k, const void *v) { uintptr_t va = (uintptr_t) k; map_segment_t *s = (map_segment_t *) v; @@ -88,34 +86,6 @@ static int _seg_cmp(const void *k, const void *v) return 0; } -static inline map_segment_t *__find_va(const void* va) -{ - map_segment_t *s; - - if (OPAL_LIKELY((uintptr_t)va >= (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].seg_base_addr && - (uintptr_t)va < (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].end)) { - s = &memheap_map->mem_segs[HEAP_SEG_INDEX]; - } else { - s = bsearch(va, - &memheap_map->mem_segs[SYMB_SEG_INDEX], - memheap_map->n_segments - 1, - sizeof(*s), - _seg_cmp); - } - -#if MEMHEAP_BASE_DEBUG == 1 - if (s) { - MEMHEAP_VERBOSE(5, "match seg#%02ld: 0x%llX - 0x%llX %llu bytes va=%p", - s - memheap_map->mem_segs, - (long long)s->seg_base_addr, - (long long)s->end, - (long long)(s->end - s->seg_base_addr), - (void *)va); - } -#endif - return s; -} - /** * @param all_trs * 0 - pack mkeys for transports to given pe @@ -146,7 +116,7 @@ static int pack_local_mkeys(opal_buffer_t *msg, int pe, int seg, int all_trs) else { tr_id = i; } - mkey = mca_memheap_base_get_mkey(__seg2base_va(seg), tr_id); + mkey = mca_memheap_base_get_mkey(mca_memheap_seg2base_va(seg), tr_id); if (!mkey) { MEMHEAP_ERROR("seg#%d tr_id: %d failed to find local mkey", seg, tr_id); @@ -232,6 +202,7 @@ static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe) } cnt = memheap_oob.mkeys[tr_id].len; opal_dss.unpack(msg, memheap_oob.mkeys[tr_id].u.data, &cnt, OPAL_BYTE); + MCA_SPML_CALL(rmkey_unpack(&memheap_oob.mkeys[tr_id], remote_pe)); } else { memheap_oob.mkeys[tr_id].u.key = MAP_SEGMENT_SHM_INVALID; } @@ -510,7 +481,7 @@ static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(pe, seg, mkeys))) { for (i = 0; i < memheap_map->num_transports; i++) { - mkeys[i].va_base = __seg2base_va(seg); + mkeys[i].va_base = mca_memheap_seg2base_va(seg); MEMHEAP_VERBOSE(5, "MKEY CALCULATED BY LOCAL SPML: pe: %d tr_id: %d %s", pe, @@ -720,46 +691,15 @@ exit_fatal: } } -static inline void* va2rva(void* va, - void* local_base, - void* remote_base) +sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, + int pe, + void* va, + int btl_id, + void** rva) { - return (void*) (remote_base > local_base ? - (uintptr_t)va + ((uintptr_t)remote_base - (uintptr_t)local_base) : - (uintptr_t)va - ((uintptr_t)local_base - (uintptr_t)remote_base)); -} - -sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe, - void* va, - int btl_id, - void** rva) -{ - map_segment_t *s; int rc; sshmem_mkey_t *mkey; - MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p", pe, va); - s = __find_va(va); - if (NULL == s) - return NULL ; - - if (!MAP_SEGMENT_IS_VALID(s)) - return NULL ; - - if (pe == oshmem_my_proc_id()) { - *rva = va; - MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (local) %lx %p", pe, va, - s->mkeys[btl_id].u.key, *rva); - return &s->mkeys[btl_id]; - } - - if (OPAL_LIKELY(s->mkeys_cache[pe])) { - mkey = &s->mkeys_cache[pe][btl_id]; - *rva = va2rva(va, s->seg_base_addr, mkey->va_base); - MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (cached) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva); - return mkey; - } - s->mkeys_cache[pe] = (sshmem_mkey_t *) calloc(memheap_map->num_transports, sizeof(sshmem_mkey_t)); if (!s->mkeys_cache[pe]) @@ -772,7 +712,7 @@ sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe, return NULL ; mkey = &s->mkeys_cache[pe][btl_id]; - *rva = va2rva(va, s->seg_base_addr, mkey->va_base); + *rva = memheap_va2rva(va, s->seg_base_addr, mkey->va_base); MEMHEAP_VERBOSE_FASTPATH(5, "rkey: pe=%d va=%p -> (remote lookup) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva); return mkey; @@ -782,7 +722,7 @@ sshmem_mkey_t *mca_memheap_base_get_mkey(void* va, int tr_id) { map_segment_t *s; - s = __find_va(va); + s = memheap_find_va(va); return ((s && MAP_SEGMENT_IS_VALID(s)) ? &s->mkeys[tr_id] : NULL ); } @@ -795,7 +735,7 @@ uint64_t mca_memheap_base_find_offset(int pe, map_segment_t *s; int my_pe = oshmem_my_proc_id(); - s = __find_va(va); + s = memheap_find_va(va); if (my_pe == pe) { return (uintptr_t)va - (uintptr_t)s->seg_base_addr; @@ -807,7 +747,7 @@ uint64_t mca_memheap_base_find_offset(int pe, int mca_memheap_base_is_symmetric_addr(const void* va) { - return (__find_va(va) ? 1 : 0); + return (memheap_find_va(va) ? 1 : 0); } int mca_memheap_base_detect_addr_type(void* va) @@ -815,7 +755,7 @@ int mca_memheap_base_detect_addr_type(void* va) int addr_type = ADDR_INVALID; map_segment_t *s; - s = __find_va(va); + s = memheap_find_va(va); if (s) { if (s->type == MAP_SEGMENT_STATIC) { diff --git a/oshmem/mca/memheap/base/memheap_base_register.c b/oshmem/mca/memheap/base/memheap_base_register.c index 04b8b62037..18da1790f5 100644 --- a/oshmem/mca/memheap/base/memheap_base_register.c +++ b/oshmem/mca/memheap/base/memheap_base_register.c @@ -86,6 +86,7 @@ static int _dereg_segment(map_segment_t *s) continue; if (s->mkeys_cache[j]) { if (s->mkeys_cache[j]->len) { + MCA_SPML_CALL(rmkey_free(s->mkeys_cache[j])); free(s->mkeys_cache[j]->u.data); s->mkeys_cache[j]->len = 0; } diff --git a/oshmem/mca/memheap/buddy/memheap_buddy.c b/oshmem/mca/memheap/buddy/memheap_buddy.c index 9f152b6d8a..a44efc8c8a 100644 --- a/oshmem/mca/memheap/buddy/memheap_buddy.c +++ b/oshmem/mca/memheap/buddy/memheap_buddy.c @@ -33,7 +33,6 @@ mca_memheap_buddy_module_t memheap_buddy = { mca_memheap_buddy_private_alloc, mca_memheap_buddy_private_free, - mca_memheap_base_get_cached_mkey, mca_memheap_base_get_mkey, mca_memheap_base_find_offset, mca_memheap_base_is_symmetric_addr, diff --git a/oshmem/mca/memheap/memheap.h b/oshmem/mca/memheap/memheap.h index 0d0dfa916a..4af948edca 100644 --- a/oshmem/mca/memheap/memheap.h +++ b/oshmem/mca/memheap/memheap.h @@ -66,14 +66,6 @@ typedef uint64_t (*mca_memheap_base_module_find_offset_fn_t)(int pe, void* va, void* rva); -/** - * @return mkey suitable to access pe via given transport id. rva is set to virtual address mapping of (va) - * on remote pe. - */ -typedef sshmem_mkey_t * (*mca_memheap_base_module_get_cached_mkey_fn_t)(int pe, - void* va, - int transport_id, - void** rva); typedef sshmem_mkey_t * (*mca_memheap_base_module_get_local_mkey_fn_t)(void* va, int transport_id); @@ -118,7 +110,6 @@ struct mca_memheap_base_module_t { mca_memheap_base_module_alloc_fn_t memheap_private_alloc; mca_memheap_base_module_free_fn_t memheap_private_free; - mca_memheap_base_module_get_cached_mkey_fn_t memheap_get_cached_mkey; mca_memheap_base_module_get_local_mkey_fn_t memheap_get_local_mkey; mca_memheap_base_module_find_offset_fn_t memheap_find_offset; mca_memheap_base_is_memheap_addr_fn_t memheap_is_symmetric_addr; diff --git a/oshmem/mca/memheap/ptmalloc/memheap_ptmalloc.c b/oshmem/mca/memheap/ptmalloc/memheap_ptmalloc.c index 7c2cf62375..0a993eaddc 100644 --- a/oshmem/mca/memheap/ptmalloc/memheap_ptmalloc.c +++ b/oshmem/mca/memheap/ptmalloc/memheap_ptmalloc.c @@ -31,7 +31,6 @@ mca_memheap_ptmalloc_module_t memheap_ptmalloc = { mca_memheap_ptmalloc_alloc, mca_memheap_ptmalloc_free, - mca_memheap_base_get_cached_mkey, mca_memheap_base_get_mkey, mca_memheap_base_find_offset, mca_memheap_base_is_symmetric_addr, diff --git a/oshmem/mca/spml/base/base.h b/oshmem/mca/spml/base/base.h index 8617c1dfbf..7c1d7a104c 100644 --- a/oshmem/mca/spml/base/base.h +++ b/oshmem/mca/spml/base/base.h @@ -69,6 +69,9 @@ OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys); +OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, int pe); +OSHMEM_DECLSPEC void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey); + /* * MCA framework */ diff --git a/oshmem/mca/spml/base/spml_base.c b/oshmem/mca/spml/base/spml_base.c index 12c923d985..20a4a49f77 100644 --- a/oshmem/mca/spml/base/spml_base.c +++ b/oshmem/mca/spml/base/spml_base.c @@ -158,3 +158,10 @@ int mca_spml_base_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) return OSHMEM_ERROR; } +void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, int pe) +{ +} + +void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey) +{ +} diff --git a/oshmem/mca/spml/ikrit/spml_ikrit.c b/oshmem/mca/spml/ikrit/spml_ikrit.c index 274b09b0d1..7bda408b55 100644 --- a/oshmem/mca/spml/ikrit/spml_ikrit.c +++ b/oshmem/mca/spml/ikrit/spml_ikrit.c @@ -204,6 +204,9 @@ mca_spml_ikrit_t mca_spml_ikrit = { mca_spml_base_wait, mca_spml_base_wait_nb, mca_spml_ikrit_fence, + mca_spml_base_rmkey_unpack, + mca_spml_base_rmkey_free, + (void*)&mca_spml_ikrit } }; @@ -776,10 +779,7 @@ static int mca_spml_ikrit_get_helper(mxm_send_req_t *sreq, /** * Get the address to the remote rkey. **/ - r_mkey = mca_memheap.memheap_get_cached_mkey(src, - src_addr, - ptl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, ptl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", src, src_addr); @@ -826,10 +826,7 @@ static inline int mca_spml_ikrit_get_shm(void *src_addr, if (ptl_id != MXM_PTL_SHM) return OSHMEM_ERROR; - r_mkey = mca_memheap.memheap_get_cached_mkey(src, - src_addr, - ptl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, ptl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", src, src_addr); @@ -1064,10 +1061,7 @@ static inline int mca_spml_ikrit_put_internal(void* dst_addr, ptl_id = get_ptl_id(dst); /* Get rkey of remote PE (dst proc) which must be on memheap */ - r_mkey = mca_memheap.memheap_get_cached_mkey(dst, - dst_addr, - ptl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", dst, dst_addr); @@ -1091,10 +1085,7 @@ static inline int mca_spml_ikrit_put_internal(void* dst_addr, } /* segment not mapped - fallback to rmda */ ptl_id = MXM_PTL_RDMA; - r_mkey = mca_memheap.memheap_get_cached_mkey(dst, - dst_addr, - ptl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", dst, dst_addr); @@ -1206,11 +1197,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr, ptl_id = get_ptl_id(dst); /* Get rkey of remote PE (dst proc) which must be on memheap */ - r_mkey = mca_memheap.memheap_get_cached_mkey(dst, - //(unsigned long) dst_addr, - dst_addr, - ptl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", dst, dst_addr); @@ -1233,7 +1220,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr, } /* segment not mapped - fallback to rmda */ ptl_id = MXM_PTL_RDMA; - r_mkey = mca_memheap.memheap_get_cached_mkey(dst, + r_mkey = mca_memheap_base_get_cached_mkey(dst, //(unsigned long) dst_addr, dst_addr, ptl_id, diff --git a/oshmem/mca/spml/ikrit/spml_ikrit_component.h b/oshmem/mca/spml/ikrit/spml_ikrit_component.h index 3c8ef8ffe3..62785f5919 100644 --- a/oshmem/mca/spml/ikrit/spml_ikrit_component.h +++ b/oshmem/mca/spml/ikrit/spml_ikrit_component.h @@ -11,8 +11,8 @@ * @file */ -#ifndef MCA_SPML_YODA_COMPONENT_H -#define MCA_SPML_YODA_COMPONENT_H +#ifndef MCA_SPML_IKRIT_COMPONENT_H +#define MCA_SPML_IKRIT_COMPONENT_H BEGIN_C_DECLS diff --git a/oshmem/mca/spml/spml.h b/oshmem/mca/spml/spml.h index 4f9940456e..bd9e996d23 100644 --- a/oshmem/mca/spml/spml.h +++ b/oshmem/mca/spml/spml.h @@ -111,6 +111,20 @@ typedef int (*mca_spml_base_module_wait_fn_t)(void* addr, void* value, int datatype); +/** + * deserialize remote mkey + * + * @param mkey remote mkey + */ +typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(sshmem_mkey_t *, int remote_pe); + +/** + * free resources used by deserialized remote mkey + * + * @param mkey remote mkey + */ +typedef void (*mca_spml_base_module_mkey_free_fn_t)(sshmem_mkey_t *); + /** * Register (Pinn) a buffer of 'size' bits starting in address addr * @@ -267,6 +281,8 @@ struct mca_spml_base_module_1_0_0_t { mca_spml_base_module_wait_nb_fn_t spml_wait_nb; mca_spml_base_module_fence_fn_t spml_fence; + mca_spml_base_module_mkey_unpack_fn_t spml_rmkey_unpack; + mca_spml_base_module_mkey_free_fn_t spml_rmkey_free; void *self; }; diff --git a/oshmem/mca/spml/ucx/Makefile.am b/oshmem/mca/spml/ucx/Makefile.am new file mode 100644 index 0000000000..fdb86de492 --- /dev/null +++ b/oshmem/mca/spml/ucx/Makefile.am @@ -0,0 +1,42 @@ +# +# Copyright (c) 2015 Mellanox Technologies, Inc. +# All rights reserved. +# +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +dist_ompidata_DATA = + +AM_CPPFLAGS = $(spml_ucx_CPPFLAGS) + +ucx_sources = \ + spml_ucx_component.h \ + spml_ucx_component.c \ + spml_ucx.h \ + spml_ucx.c + + + +if MCA_BUILD_oshmem_spml_ucx_DSO +component_noinst = +component_install = mca_spml_ucx.la +else +component_noinst = libmca_spml_ucx.la +component_install = +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_spml_ucx_la_SOURCES = $(ucx_sources) +mca_spml_ucx_la_LIBADD = $(spml_ucx_LIBS) +mca_spml_ucx_la_LDFLAGS = -module -avoid-version $(spml_ucx_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_spml_ucx_la_SOURCES = $(ucx_sources) +libmca_spml_ucx_la_LIBADD = $(spml_ucx_LIBS) +libmca_spml_ucx_la_LDFLAGS = -module -avoid-version $(spml_ucx_LDFLAGS) + diff --git a/oshmem/mca/spml/ucx/configure.m4 b/oshmem/mca/spml/ucx/configure.m4 new file mode 100644 index 0000000000..135d2c9842 --- /dev/null +++ b/oshmem/mca/spml/ucx/configure.m4 @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +# MCA_oshmem_spml_ucx_CONFIG([action-if-can-compile], +# [action-if-cant-compile]) +# ------------------------------------------------ +AC_DEFUN([MCA_oshmem_spml_ucx_CONFIG],[ + AC_CONFIG_FILES([oshmem/mca/spml/ucx/Makefile]) + + OMPI_CHECK_UCX([spml_ucx], + [spml_ucx_happy="yes"], + [spml_ucx_happy="no"]) + + AS_IF([test "$spml_ucx_happy" = "yes"], + [$1], + [$2]) + + + # substitute in the things needed to build ucx + AC_SUBST([spml_ucx_CFLAGS]) + AC_SUBST([spml_ucx_CPPFLAGS]) + AC_SUBST([spml_ucx_LDFLAGS]) + AC_SUBST([spml_ucx_LIBS]) +])dnl + diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c new file mode 100644 index 0000000000..b34e2561b5 --- /dev/null +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -0,0 +1,481 @@ +/* + * Copyright (c) 2013 Mellanox Technologies, Inc. + * All rights reserved. + * Copyright (c) 2014 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#define _GNU_SOURCE +#include + +#include +#include +#include + +#include "oshmem_config.h" +#include "opal/datatype/opal_convertor.h" +#include "orte/include/orte/types.h" +#include "orte/runtime/orte_globals.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/mca/pml/pml.h" + + +#include "oshmem/mca/spml/ucx/spml_ucx.h" +#include "oshmem/include/shmem.h" +#include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" +#include "oshmem/proc/proc.h" +#include "oshmem/mca/spml/base/base.h" +#include "oshmem/mca/spml/base/spml_base_putreq.h" +#include "oshmem/runtime/runtime.h" +#include "orte/util/show_help.h" + +#include "oshmem/mca/spml/ucx/spml_ucx_component.h" + +/* Turn ON/OFF debug output from build (default 0) */ +#ifndef SPML_UCX_PUT_DEBUG +#define SPML_UCX_PUT_DEBUG 0 +#endif + + +mca_spml_ucx_t mca_spml_ucx = { + { + /* Init mca_spml_base_module_t */ + mca_spml_ucx_add_procs, + mca_spml_ucx_del_procs, + mca_spml_ucx_enable, + mca_spml_ucx_register, + mca_spml_ucx_deregister, + mca_spml_base_oob_get_mkeys, + mca_spml_ucx_put, + NULL, //mca_spml_ucx_put_nb, + mca_spml_ucx_get, + mca_spml_ucx_recv, + mca_spml_ucx_send, + mca_spml_base_wait, + mca_spml_base_wait_nb, + mca_spml_ucx_quiet, /* At the moment fence is the same as quite for + every spml */ + mca_spml_ucx_rmkey_unpack, + mca_spml_ucx_rmkey_free, + (void*)&mca_spml_ucx + } +}; + +int mca_spml_ucx_enable(bool enable) +{ + SPML_VERBOSE(50, "*** ucx ENABLED ****"); + if (false == enable) { + return OSHMEM_SUCCESS; + } + + mca_spml_ucx.enabled = true; + + return OSHMEM_SUCCESS; +} + +int mca_spml_ucx_del_procs(oshmem_proc_t** procs, size_t nprocs) +{ + size_t i, n; + int my_rank = oshmem_my_proc_id(); + + oshmem_shmem_barrier(); + + if (!mca_spml_ucx.ucp_peers) { + return OSHMEM_SUCCESS; + } + + for (n = 0; n < nprocs; n++) { + i = (my_rank + n) % nprocs; + if (mca_spml_ucx.ucp_peers[i].ucp_conn) { + ucp_ep_destroy(mca_spml_ucx.ucp_peers[i].ucp_conn); + } + } + + free(mca_spml_ucx.ucp_peers); + return OSHMEM_SUCCESS; +} + +/* TODO: move func into common place, use it with rkey exchng too */ +static int oshmem_shmem_xchng( + void *local_data, int local_size, int nprocs, + void **rdata_p, int **roffsets_p, int **rsizes_p) +{ + int *rcv_sizes = NULL; + int *rcv_offsets = NULL; + void *rcv_buf = NULL; + int rc; + int i; + + /* do llgatherv */ + rcv_offsets = malloc(nprocs * sizeof(*rcv_offsets)); + if (NULL == rcv_offsets) { + goto err; + } + + /* todo: move into separate function. do allgatherv */ + rcv_sizes = malloc(nprocs * sizeof(*rcv_sizes)); + if (NULL == rcv_sizes) { + goto err; + } + + rc = oshmem_shmem_allgather(&local_size, rcv_sizes, sizeof(int)); + if (MPI_SUCCESS != rc) { + goto err; + } + + /* calculate displacements */ + rcv_offsets[0] = 0; + for (i = 1; i < nprocs; i++) { + rcv_offsets[i] = rcv_offsets[i - 1] + rcv_sizes[i - 1]; + } + + rcv_buf = malloc(rcv_offsets[nprocs - 1] + rcv_sizes[nprocs - 1]); + if (NULL == rcv_buf) { + goto err; + } + + rc = oshmem_shmem_allgatherv(local_data, rcv_buf, local_size, rcv_sizes, rcv_offsets); + if (MPI_SUCCESS != rc) { + goto err; + } + + *rdata_p = rcv_buf; + *roffsets_p = rcv_offsets; + *rsizes_p = rcv_sizes; + return OSHMEM_SUCCESS; + +err: + if (rcv_buf) + free(rcv_buf); + if (rcv_offsets) + free(rcv_offsets); + if (rcv_sizes) + free(rcv_sizes); + return OSHMEM_ERROR; +} + +static void dump_address(int pe, char *addr, size_t len) +{ +#ifdef SPML_UCX_DEBUG + int my_rank = oshmem_my_proc_id(); + unsigned i; + + printf("me=%d dest_pe=%d addr=%p len=%d\n", my_rank, pe, addr, len); + for (i = 0; i < len; i++) { + printf("%02X ", (unsigned)0xFF&addr[i]); + } + printf("\n"); +#endif +} + +int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs) +{ + size_t i, n; + int rc = OSHMEM_ERROR; + int my_rank = oshmem_my_proc_id(); + ucs_status_t err; + ucp_address_t *wk_local_addr; + size_t wk_addr_len; + int *wk_roffs, *wk_rsizes; + char *wk_raddrs; + + + mca_spml_ucx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(mca_spml_ucx.ucp_peers))); + if (NULL == mca_spml_ucx.ucp_peers) { + goto error; + } + + err = ucp_worker_get_address(mca_spml_ucx.ucp_worker, &wk_local_addr, &wk_addr_len); + if (err == UCS_OK) { + goto error; + } + dump_address(my_rank, (char *)wk_local_addr, wk_addr_len); + + rc = oshmem_shmem_xchng(wk_local_addr, wk_addr_len, nprocs, + (void **)&wk_raddrs, &wk_roffs, &wk_rsizes); + if (rc != OSHMEM_SUCCESS) { + goto error; + } + + opal_progress_register(spml_ucx_progress); + + /* Get the EP connection requests for all the processes from modex */ + for (n = 0; n < nprocs; ++n) { + i = (my_rank + n) % nprocs; + //if (i == my_rank) continue; + dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]); + err = ucp_ep_create(mca_spml_ucx.ucp_worker, + (ucp_address_t *)(wk_raddrs + wk_roffs[i]), + &mca_spml_ucx.ucp_peers[i].ucp_conn); + if (UCS_OK != err) { + SPML_ERROR("ucp_ep_create failed!!!\n"); + goto error2; + } + } + + ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr); + free(wk_raddrs); + free(wk_rsizes); + free(wk_roffs); + + SPML_VERBOSE(50, "*** ADDED PROCS ***"); + return OSHMEM_SUCCESS; + +error2: + for (i = 0; i < nprocs; ++i) { + if (mca_spml_ucx.ucp_peers[i].ucp_conn) { + ucp_ep_destroy(mca_spml_ucx.ucp_peers[i].ucp_conn); + } + } + if (mca_spml_ucx.ucp_peers) + free(mca_spml_ucx.ucp_peers); + if (wk_raddrs) + free(wk_raddrs); + if (wk_rsizes) + free(wk_rsizes); + if (wk_roffs) + free(wk_roffs); + if (mca_spml_ucx.ucp_peers) + free(mca_spml_ucx.ucp_peers); +error: + rc = OSHMEM_ERR_OUT_OF_RESOURCE; + SPML_ERROR("add procs FAILED rc=%d", rc); + return rc; + +} + +void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey) +{ + spml_ucx_mkey_t *ucx_mkey; + + if (!mkey->spml_context) { + return; + } + ucx_mkey = (spml_ucx_mkey_t *)(mkey->spml_context); + ucp_rkey_destroy(ucx_mkey->rkey); + free(ucx_mkey); +} + +void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, int pe) +{ + spml_ucx_mkey_t *ucx_mkey; + ucs_status_t err; + + ucx_mkey = (spml_ucx_mkey_t *)malloc(sizeof(*ucx_mkey)); + if (!ucx_mkey) { + SPML_ERROR("not enough memory to allocate mkey"); + goto error_fatal; + } + + err = ucp_ep_rkey_unpack(mca_spml_ucx.ucp_peers[pe].ucp_conn, + mkey->u.data, + &ucx_mkey->rkey); + if (UCS_OK != err) { + SPML_ERROR("failed to unpack rkey"); + goto error_fatal; + } + + mkey->spml_context = ucx_mkey; + return; + +error_fatal: + oshmem_shmem_abort(-1); + return; +} + +sshmem_mkey_t *mca_spml_ucx_register(void* addr, + size_t size, + uint64_t shmid, + int *count) +{ + sshmem_mkey_t *mkeys; + ucs_status_t err; + spml_ucx_mkey_t *ucx_mkey; + size_t len; + + *count = 0; + mkeys = (sshmem_mkey_t *) calloc(1, sizeof(*mkeys)); + if (!mkeys) { + return NULL ; + } + + ucx_mkey = (spml_ucx_mkey_t *)malloc(sizeof(*ucx_mkey)); + if (!ucx_mkey) { + goto error_out; + } + + mkeys[0].spml_context = ucx_mkey; + err = ucp_mem_map(mca_spml_ucx.ucp_context, + &addr, size, 0, &ucx_mkey->mem_h); + if (UCS_OK != err) { + goto error_out1; + } + + err = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h, + &mkeys[0].u.data, &len); + if (UCS_OK != err) { + goto error_unmap; + } + if (len >= 0xffff) { + SPML_ERROR("packed rkey is too long: %llu >= %d", + (unsigned long long)len, + 0xffff); + oshmem_shmem_abort(-1); + } + + err = ucp_ep_rkey_unpack(mca_spml_ucx.ucp_peers[oshmem_group_self->my_pe].ucp_conn, + mkeys[0].u.data, + &ucx_mkey->rkey); + if (UCS_OK != err) { + SPML_ERROR("failed to unpack rkey"); + goto error_unmap; + } + + mkeys[0].len = len; + mkeys[0].va_base = addr; + *count = 1; + return mkeys; + +error_unmap: + ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h); +error_out1: + free(ucx_mkey); +error_out: + free(mkeys); + + return NULL ; +} + +int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) +{ + spml_ucx_mkey_t *ucx_mkey; + + MCA_SPML_CALL(fence()); + if (!mkeys) + return OSHMEM_SUCCESS; + + if (!mkeys[0].spml_context) + return OSHMEM_SUCCESS; + + ucx_mkey = (spml_ucx_mkey_t *)mkeys[0].spml_context; + ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h); + + if (0 < mkeys[0].len) { + ucp_rkey_buffer_release(mkeys[0].u.data); + } + + free(ucx_mkey); + return OSHMEM_SUCCESS; +} + +int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src) +{ + void *rva; + sshmem_mkey_t *r_mkey; + ucs_status_t err; + spml_ucx_mkey_t *ucx_mkey; + + r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva); + if (OPAL_UNLIKELY(!r_mkey)) { + SPML_ERROR("pe=%d: %p is not address of shared variable", + src, src_addr); + oshmem_shmem_abort(-1); + return OSHMEM_ERROR; + } + + ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context); + err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size, + (uint64_t)rva, ucx_mkey->rkey); + + return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR; +} + +int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst) +{ + void *rva; + sshmem_mkey_t *r_mkey; + ucs_status_t err; + spml_ucx_mkey_t *ucx_mkey; + + r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva); + if (OPAL_UNLIKELY(!r_mkey)) { + SPML_ERROR("pe=%d: %p is not address of shared variable", + dst, dst_addr); + oshmem_shmem_abort(-1); + return OSHMEM_ERROR; + } + + ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context); + + err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size, + (uint64_t)rva, ucx_mkey->rkey); + + return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR; +} + +int mca_spml_ucx_fence(void) +{ + ucs_status_t err; + + err = ucp_worker_flush(mca_spml_ucx.ucp_worker); + if (UCS_OK != err) { + SPML_ERROR("fence failed"); + oshmem_shmem_abort(-1); + return OSHMEM_ERROR; + } + return OSHMEM_SUCCESS; +} + +int mca_spml_ucx_quiet(void) +{ + ucs_status_t err; + + err = ucp_worker_flush(mca_spml_ucx.ucp_worker); + if (UCS_OK != err) { + SPML_ERROR("fence failed"); + oshmem_shmem_abort(-1); + return OSHMEM_ERROR; + } + return OSHMEM_SUCCESS; +} + +/* blocking receive */ +int mca_spml_ucx_recv(void* buf, size_t size, int src) +{ + int rc = OSHMEM_SUCCESS; + + rc = MCA_PML_CALL(recv(buf, + size, + &(ompi_mpi_unsigned_char.dt), + src, + 0, + &(ompi_mpi_comm_world.comm), + NULL)); + + return rc; +} + +/* for now only do blocking copy send */ +int mca_spml_ucx_send(void* buf, + size_t size, + int dst, + mca_spml_base_put_mode_t mode) +{ + int rc = OSHMEM_SUCCESS; + + rc = MCA_PML_CALL(send(buf, + size, + &(ompi_mpi_unsigned_char.dt), + dst, + 0, + (mca_pml_base_send_mode_t)mode, + &(ompi_mpi_comm_world.comm))); + + return rc; +} diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h new file mode 100644 index 0000000000..5bc46fe87b --- /dev/null +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2013 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_SPML_UCX_H +#define MCA_SPML_UCX_H + +#include "oshmem_config.h" +#include "oshmem/request/request.h" +#include "oshmem/mca/spml/spml.h" +#include "oshmem/util/oshmem_util.h" +#include "oshmem/mca/spml/base/spml_base_putreq.h" +#include "oshmem/proc/proc.h" +#include "oshmem/mca/spml/base/spml_base_request.h" +#include "oshmem/mca/spml/base/spml_base_getreq.h" + +#include "orte/runtime/orte_globals.h" + +#include + +BEGIN_C_DECLS + +/** + * UCX SPML module + */ +struct ucp_peer { + ucp_ep_h ucp_conn; +}; + +typedef struct ucp_peer ucp_peer_t; + +struct mca_spml_ucx { + mca_spml_base_module_t super; + ucp_context_h ucp_context; + ucp_worker_h ucp_worker; + ucp_peer_t *ucp_peers; + + int priority; /* component priority */ + bool enabled; +}; + +typedef struct mca_spml_ucx mca_spml_ucx_t; + +struct spml_ucx_mkey { + ucp_rkey_h rkey; + ucp_mem_h mem_h; +}; + +typedef struct spml_ucx_mkey spml_ucx_mkey_t; + + +extern mca_spml_ucx_t mca_spml_ucx; + +extern int mca_spml_ucx_enable(bool enable); +extern int mca_spml_ucx_get(void* dst_addr, + size_t size, + void* src_addr, + int src); + +extern int mca_spml_ucx_put(void* dst_addr, + size_t size, + void* src_addr, + int dst); + +extern int mca_spml_ucx_put_nb(void* dst_addr, + size_t size, + void* src_addr, + int dst, + void **handle); + +extern int mca_spml_ucx_recv(void* buf, size_t size, int src); +extern int mca_spml_ucx_send(void* buf, + size_t size, + int dst, + mca_spml_base_put_mode_t mode); + +extern sshmem_mkey_t *mca_spml_ucx_register(void* addr, + size_t size, + uint64_t shmid, + int *count); +extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys); + +extern void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, int pe); +extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey); + +extern int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs); +extern int mca_spml_ucx_del_procs(oshmem_proc_t** procs, size_t nprocs); +extern int mca_spml_ucx_fence(void); +extern int mca_spml_ucx_quiet(void); +extern int spml_ucx_progress(void); + +END_C_DECLS + +#endif + diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c new file mode 100644 index 0000000000..c6d4547418 --- /dev/null +++ b/oshmem/mca/spml/ucx/spml_ucx_component.c @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#define _GNU_SOURCE +#include + +#include +#include + +#include "oshmem_config.h" +#include "orte/util/show_help.h" +#include "shmem.h" +#include "oshmem/runtime/params.h" +#include "oshmem/mca/spml/spml.h" +#include "oshmem/mca/spml/base/base.h" +#include "spml_ucx_component.h" +#include "oshmem/mca/spml/ucx/spml_ucx.h" + +#include "orte/util/show_help.h" +#include "opal/util/opal_environ.h" + +static int mca_spml_ucx_component_register(void); +static int mca_spml_ucx_component_open(void); +static int mca_spml_ucx_component_close(void); +static mca_spml_base_module_t* +mca_spml_ucx_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads); +static int mca_spml_ucx_component_fini(void); +mca_spml_base_component_2_0_0_t mca_spml_ucx_component = { + + /* First, the mca_base_component_t struct containing meta + information about the component itself */ + + { + MCA_SPML_BASE_VERSION_2_0_0, + + "ucx", /* MCA component name */ + OSHMEM_MAJOR_VERSION, /* MCA component major version */ + OSHMEM_MINOR_VERSION, /* MCA component minor version */ + OSHMEM_RELEASE_VERSION, /* MCA component release version */ + mca_spml_ucx_component_open, /* component open */ + mca_spml_ucx_component_close, /* component close */ + NULL, + mca_spml_ucx_component_register + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, + + mca_spml_ucx_component_init, /* component init */ + mca_spml_ucx_component_fini /* component finalize */ + +}; + + +static inline void mca_spml_ucx_param_register_int(const char* param_name, + int default_value, + const char *help_msg, + int *storage) +{ + *storage = default_value; + (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version, + param_name, + help_msg, + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + storage); +} + +static inline void mca_spml_ucx_param_register_string(const char* param_name, + char* default_value, + const char *help_msg, + char **storage) +{ + *storage = default_value; + (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version, + param_name, + help_msg, + MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + storage); +} + +static int mca_spml_ucx_component_register(void) +{ + mca_spml_ucx_param_register_int("priority", 20, + "[integer] ucx priority", + &mca_spml_ucx.priority); + + return OSHMEM_SUCCESS; +} + +int spml_ucx_progress(void) +{ + ucp_worker_progress(mca_spml_ucx.ucp_worker); + return 1; +} + +static int mca_spml_ucx_component_open(void) +{ + ucs_status_t err; + ucp_config_t *ucp_config; + ucp_params_t params; + + err = ucp_config_read("OSHMEM", NULL, &ucp_config); + if (UCS_OK != err) { + return OSHMEM_ERROR; + } + + memset(¶ms, 0, sizeof(params)); + params.features = UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64; + + err = ucp_init(¶ms, ucp_config, &mca_spml_ucx.ucp_context); + ucp_config_release(ucp_config); + if (UCS_OK != err) { + return OSHMEM_ERROR; + } + + return OSHMEM_SUCCESS; +} + +static int mca_spml_ucx_component_close(void) +{ + ucp_cleanup(mca_spml_ucx.ucp_context); + return OSHMEM_SUCCESS; +} + +static int spml_ucx_init(void) +{ + ucs_status_t err; + + err = ucp_worker_create(mca_spml_ucx.ucp_context, UCS_THREAD_MODE_SINGLE, + &mca_spml_ucx.ucp_worker); + if (UCS_OK != err) { + return OSHMEM_ERROR; + } + + return OSHMEM_SUCCESS; +} + +static mca_spml_base_module_t* +mca_spml_ucx_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads) +{ + SPML_VERBOSE( 10, "in ucx, my priority is %d\n", mca_spml_ucx.priority); + + if ((*priority) > mca_spml_ucx.priority) { + *priority = mca_spml_ucx.priority; + return NULL ; + } + *priority = mca_spml_ucx.priority; + + if (OSHMEM_SUCCESS != spml_ucx_init()) + return NULL ; + + SPML_VERBOSE(50, "*** ucx initialized ****"); + return &mca_spml_ucx.super; +} + +static int mca_spml_ucx_component_fini(void) +{ + opal_progress_unregister(spml_ucx_progress); + + if (mca_spml_ucx.ucp_worker) { + ucp_worker_destroy(mca_spml_ucx.ucp_worker); + } + if(!mca_spml_ucx.enabled) + return OSHMEM_SUCCESS; /* never selected.. return success.. */ + + mca_spml_ucx.enabled = false; /* not anymore */ + return OSHMEM_SUCCESS; +} + diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.h b/oshmem/mca/spml/ucx/spml_ucx_component.h new file mode 100644 index 0000000000..9c532ad477 --- /dev/null +++ b/oshmem/mca/spml/ucx/spml_ucx_component.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2015 Mellanox Technologies, Inc. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_SPML_UCX_COMPONENT_H +#define MCA_SPML_UCX_COMPONENT_H + +BEGIN_C_DECLS + +/* + * SPML module functions. + */ +OSHMEM_MODULE_DECLSPEC extern mca_spml_base_component_2_0_0_t mca_spml_ucx_component; +END_C_DECLS + +#endif diff --git a/oshmem/mca/spml/yoda/spml_yoda.c b/oshmem/mca/spml/yoda/spml_yoda.c index e850fae7f4..a3eba1e921 100644 --- a/oshmem/mca/spml/yoda/spml_yoda.c +++ b/oshmem/mca/spml/yoda/spml_yoda.c @@ -62,6 +62,8 @@ mca_spml_yoda_module_t mca_spml_yoda = { mca_spml_base_wait, mca_spml_base_wait_nb, mca_spml_yoda_fence, + mca_spml_base_rmkey_unpack, + mca_spml_base_rmkey_free, (void *)&mca_spml_yoda } @@ -759,10 +761,7 @@ static inline int mca_spml_yoda_put_internal(void *dst_addr, put_via_send = !(bml_btl->btl->btl_flags & MCA_BTL_FLAGS_PUT); /* Get rkey of remote PE (dst proc) which must be on memheap*/ - r_mkey = mca_memheap.memheap_get_cached_mkey(dst, - dst_addr, - btl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, btl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", dst, dst_addr); @@ -1033,10 +1032,7 @@ int mca_spml_yoda_get(void* src_addr, size_t size, void* dst_addr, int src) (bml_btl->btl->btl_flags & (MCA_BTL_FLAGS_PUT)) ); /* Get rkey of remote PE (src proc) which must be on memheap*/ - r_mkey = mca_memheap.memheap_get_cached_mkey(src, - src_addr, - btl_id, - &rva); + r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, btl_id, &rva); if (!r_mkey) { SPML_ERROR("pe=%d: %p is not address of shared variable", src, src_addr); diff --git a/oshmem/shmem/c/shmem_addr_accessible.c b/oshmem/shmem/c/shmem_addr_accessible.c index ad9c083de6..8fc8ff6076 100644 --- a/oshmem/shmem/c/shmem_addr_accessible.c +++ b/oshmem/shmem/c/shmem_addr_accessible.c @@ -14,6 +14,7 @@ #include "oshmem/runtime/runtime.h" #include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" #if OSHMEM_PROFILING #include "oshmem/include/pshmem.h" @@ -28,8 +29,7 @@ int shmem_addr_accessible(void *addr, int pe) RUNTIME_CHECK_INIT(); - mkey = MCA_MEMHEAP_CALL(get_cached_mkey(pe, addr, - oshmem_get_transport_id(pe), &rva)); + mkey = mca_memheap_base_get_cached_mkey(pe, addr, oshmem_get_transport_id(pe), &rva); return mkey ? 1 : 0; }