1
1

Merge pull request #1008 from alex-mikheev/topic/ucx_support

UCX support for ompi and oshmem
This commit is contained in:
Mike Dubman 2015-10-21 09:33:33 +03:00
parent 3b0b929883 a313588337
commit 4ea13f10f6
41 changed files: 3096 additions and 133 deletions

80
config/ompi_check_ucx.m4 Normal file
View file

@ -0,0 +1,80 @@
# -*- shell-script -*-
#
# Copyright (C) Mellanox Technologies Ltd. 2015. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

# OMPI_CHECK_UCX(prefix, [action-if-found], [action-if-not-found])
# --------------------------------------------------------
# check if UCX support can be found. sets prefix_{CPPFLAGS,
# LDFLAGS, LIBS} as needed and runs action-if-found if there is
# support, otherwise executes action-if-not-found
AC_DEFUN([OMPI_CHECK_UCX],[
    AC_ARG_WITH([ucx],
        [AC_HELP_STRING([--with-ucx(=DIR)],
             [Build with Unified Communication X library support])])
    OPAL_CHECK_WITHDIR([ucx], [$with_ucx], [include/ucp/api/ucp.h])
    AC_ARG_WITH([ucx-libdir],
        [AC_HELP_STRING([--with-ucx-libdir=DIR],
             [Search for Unified Communication X libraries in DIR])])
    OPAL_CHECK_WITHDIR([ucx-libdir], [$with_ucx_libdir], [libucp.*])

    ompi_check_ucx_$1_save_CPPFLAGS="$CPPFLAGS"
    ompi_check_ucx_$1_save_LDFLAGS="$LDFLAGS"
    ompi_check_ucx_$1_save_LIBS="$LIBS"

    AS_IF([test "$with_ucx" != "no"],
          [AS_IF([test ! -z "$with_ucx" -a "$with_ucx" != "yes"],
                 [ompi_check_ucx_dir="$with_ucx"
                  ompi_check_ucx_libdir="$with_ucx/lib"])
           AS_IF([test ! -z "$with_ucx_libdir" -a "$with_ucx_libdir" != "yes"],
                 [ompi_check_ucx_libdir="$with_ucx_libdir"])
           ompi_check_ucx_extra_libs="-L$ompi_check_ucx_libdir"

           OPAL_CHECK_PACKAGE([$1],
                              [ucp/api/ucp.h],
                              [ucp],
                              [ucp_cleanup],
                              [$ompi_check_ucx_extra_libs],
                              [$ompi_check_ucx_dir],
                              [$ompi_check_ucx_libdir],
                              [ompi_check_ucx_happy="yes"],
                              [ompi_check_ucx_happy="no"])],
          [ompi_check_ucx_happy="no"])

    CPPFLAGS="$ompi_check_ucx_$1_save_CPPFLAGS"
    LDFLAGS="$ompi_check_ucx_$1_save_LDFLAGS"
    LIBS="$ompi_check_ucx_$1_save_LIBS"

    # Run the UCX version-compatibility compile test only when the library
    # itself was found.  Previously this ran unconditionally, which used an
    # unset $ompi_check_ucx_dir (yielding "-I/include") whenever --with-ucx
    # was not given or UCX was not found.
    AS_IF([test "$ompi_check_ucx_happy" = "yes"],
          [AC_MSG_CHECKING(for UCX version compatibility)
           AC_REQUIRE_CPP
           old_CFLAGS="$CFLAGS"
           CFLAGS="$CFLAGS -I$ompi_check_ucx_dir/include"
           AC_COMPILE_IFELSE(
               [AC_LANG_PROGRAM([[#include <uct/api/version.h>]],
                                [[
                                ]])],
               [ompi_ucx_version_ok="yes"],
               [ompi_ucx_version_ok="no"])
           AC_MSG_RESULT([$ompi_ucx_version_ok])
           CFLAGS=$old_CFLAGS

           AS_IF([test "$ompi_ucx_version_ok" = "no"],
                 [ompi_check_ucx_happy="no"])])

    AS_IF([test "$ompi_check_ucx_happy" = "yes"],
          [$2],
          [AS_IF([test ! -z "$with_ucx" -a "$with_ucx" != "no"],
                 [AC_MSG_ERROR([UCX support requested but not found. Aborting])])
           $3])
])

Просмотреть файл

@ -73,6 +73,7 @@ struct ompi_datatype_t {
void* args; /**< Data description for the user */
void* packed_description; /**< Packed description of the datatype */
uint64_t pml_data; /**< PML-specific information */
/* --- cacheline 6 boundary (384 bytes) --- */
char name[MPI_MAX_OBJECT_NAME];/**< Externally visible name */
/* --- cacheline 7 boundary (448 bytes) --- */

Просмотреть файл

@ -35,6 +35,7 @@ static void __ompi_datatype_allocate( ompi_datatype_t* datatype )
datatype->d_keyhash = NULL;
datatype->name[0] = '\0';
datatype->packed_description = NULL;
datatype->pml_data = 0;
}
static void __ompi_datatype_release(ompi_datatype_t * datatype)

45
ompi/mca/pml/ucx/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,45 @@
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).

# Compiler flags discovered by configure (see config/ompi_check_ucx.m4,
# substituted via ompi/mca/pml/ucx/configure.m4).
AM_CPPFLAGS = $(pml_ucx_CPPFLAGS)

# Sources shared by both the DSO and the static build of the component.
local_sources = \
    pml_ucx.h \
    pml_ucx.c \
    pml_ucx_request.h \
    pml_ucx_request.c \
    pml_ucx_datatype.h \
    pml_ucx_datatype.c \
    pml_ucx_freelist.h \
    pml_ucx_component.c

# Exactly one of the two library targets below is populated, depending on
# whether this component is built as a loadable DSO or linked statically.
if MCA_BUILD_ompi_pml_ucx_DSO
component_noinst =
component_install = mca_pml_ucx.la
else
component_noinst = libmca_pml_ucx.la
component_install =
endif

# Installable (DSO) flavor.
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_pml_ucx_la_SOURCES = $(local_sources)
mca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS)
mca_pml_ucx_la_LDFLAGS = -module -avoid-version

# Static (convenience-library) flavor.
noinst_LTLIBRARIES = $(component_noinst)
libmca_pml_ucx_la_SOURCES = $(local_sources)
libmca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS)
libmca_pml_ucx_la_LDFLAGS = -module -avoid-version

30
ompi/mca/pml/ucx/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,30 @@
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Post-configure hook: when the component was selected ($1 == 1), reserve a
# PML endpoint tag so the component can cache per-proc UCX endpoints
# (see OMPI_PROC_ENDPOINT_TAG_PML usage in pml_ucx.c).
AC_DEFUN([MCA_ompi_pml_ucx_POST_CONFIG], [
    AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([PML])])
])

# Main configure hook: probe for UCX (OMPI_CHECK_UCX sets pml_ucx_{CPPFLAGS,
# LDFLAGS,LIBS} on success) and run $1 when usable, $2 otherwise.
AC_DEFUN([MCA_ompi_pml_ucx_CONFIG], [
    AC_CONFIG_FILES([ompi/mca/pml/ucx/Makefile])

    OMPI_CHECK_UCX([pml_ucx],
                   [pml_ucx_happy="yes"],
                   [pml_ucx_happy="no"])

    AS_IF([test "$pml_ucx_happy" = "yes"],
          [$1],
          [$2])

    # substitute in the things needed to build ucx
    AC_SUBST([pml_ucx_CPPFLAGS])
    AC_SUBST([pml_ucx_LDFLAGS])
    AC_SUBST([pml_ucx_LIBS])
])

712
ompi/mca/pml/ucx/pml_ucx.c Обычный файл
Просмотреть файл

@ -0,0 +1,712 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx.h"
#include "opal/runtime/opal.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/message/message.h"
#include "pml_ucx_request.h"
#include <inttypes.h>
/* Tracing helpers: all expand to PML_UCX_VERBOSE at level 8 and print the
 * buffer, count, datatype name, peer, tag and communicator of the call. */
#define PML_UCX_TRACE_SEND(_msg, _buf, _count, _datatype, _dst, _tag, _mode, _comm, ...) \
    PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' dst %d tag %d mode %s comm %d '%s'", \
                    __VA_ARGS__, \
                    (_buf), (_count), (_datatype)->name, (_dst), (_tag), \
                    mca_pml_ucx_send_mode_name(_mode), (_comm)->c_contextid, \
                    (_comm)->c_name);

#define PML_UCX_TRACE_RECV(_msg, _buf, _count, _datatype, _src, _tag, _comm, ...) \
    PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' src %d tag %d comm %d '%s'", \
                    __VA_ARGS__, \
                    (_buf), (_count), (_datatype)->name, (_src), (_tag), \
                    (_comm)->c_contextid, (_comm)->c_name);

#define PML_UCX_TRACE_PROBE(_msg, _src, _tag, _comm) \
    PML_UCX_VERBOSE(8, _msg " src %d tag %d comm %d '%s'", \
                    _src, (_tag), (_comm)->c_contextid, (_comm)->c_name);

#define PML_UCX_TRACE_MRECV(_msg, _buf, _count, _datatype, _message) \
    PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' msg *%p=%p (%p)", \
                    (_buf), (_count), (_datatype)->name, (void*)(_message), \
                    (void*)*(_message), (*(_message))->req_ptr);

/* NOTE(review): this key appears unused in this file; the modex send/recv
 * below key off &mca_pml_ucx_component.pmlm_version instead. */
#define MODEX_KEY "pml-ucx"

/* Component-global module instance: the PML function table followed by the
 * UCX state (context/worker are filled in by open()/init() respectively). */
mca_pml_ucx_module_t ompi_pml_ucx = {
    {
        mca_pml_ucx_add_procs,
        mca_pml_ucx_del_procs,
        mca_pml_ucx_enable,
        NULL,        /* unused slot — see mca_pml_base_module_t for meaning */
        mca_pml_ucx_add_comm,
        mca_pml_ucx_del_comm,
        mca_pml_ucx_irecv_init,
        mca_pml_ucx_irecv,
        mca_pml_ucx_recv,
        mca_pml_ucx_isend_init,
        mca_pml_ucx_isend,
        mca_pml_ucx_send,
        mca_pml_ucx_iprobe,
        mca_pml_ucx_probe,
        mca_pml_ucx_start,
        mca_pml_ucx_improbe,
        mca_pml_ucx_mprobe,
        mca_pml_ucx_imrecv,
        mca_pml_ucx_mrecv,
        mca_pml_ucx_dump,
        NULL, /* FT */
        1ul << (PML_UCX_TAG_BITS - 1),    /* presumably pml_max_tag — TODO confirm */
        1ul << (PML_UCX_CONTEXT_BITS),    /* presumably pml_max_contextid — TODO confirm */
    },
    NULL,   /* ucp_context — created in mca_pml_ucx_open() */
    NULL    /* ucp_worker  — created in mca_pml_ucx_init() */
};
/*
 * Publish this process' UCP worker address via the modex so peers can
 * create endpoints to it in mca_pml_ucx_add_procs().
 *
 * Returns OMPI_SUCCESS on success, OMPI_ERROR otherwise.
 */
static int mca_pml_ucx_send_worker_address(void)
{
    ucp_address_t *address;
    ucs_status_t status;
    size_t addrlen;
    int rc;

    status = ucp_worker_get_address(ompi_pml_ucx.ucp_worker, &address, &addrlen);
    if (UCS_OK != status) {
        PML_UCX_ERROR("Failed to get worker address");
        return OMPI_ERROR;
    }

    OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL,
                    &mca_pml_ucx_component.pmlm_version, (void*)address, addrlen);

    /* The address buffer is owned by UCX and must be released on every
     * path — it was previously leaked when the modex send failed. */
    ucp_worker_release_address(ompi_pml_ucx.ucp_worker, address);

    if (OMPI_SUCCESS != rc) {
        PML_UCX_ERROR("Open MPI couldn't distribute EP connection details");
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}
/*
 * Fetch the UCP worker address that 'proc' published via the modex.
 * On success *address_p points to a buffer the caller must free()
 * (see mca_pml_ucx_add_procs) and *addrlen_p holds its length.
 * Returns a negative value on failure.
 */
static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc,
                                           ucp_address_t **address_p,
                                           size_t *addrlen_p)
{
    int ret;

    *address_p = NULL;
    OPAL_MODEX_RECV(ret, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name,
                    (void**)address_p, addrlen_p);
    if (ret < 0) {
        PML_UCX_ERROR("Failed to receive EP address");
    }
    return ret;
}
/*
 * Component open: read the UCX configuration (environment prefix "MPI")
 * and create the global UCP context with tag-matching enabled.
 */
int mca_pml_ucx_open(void)
{
    ucp_params_t params;
    ucp_config_t *config;
    ucs_status_t status;

    PML_UCX_VERBOSE(1, "mca_pml_ucx_open");

    /* Read options */
    status = ucp_config_read("MPI", NULL, &config);
    if (UCS_OK != status) {
        return OMPI_ERROR;
    }

    /* Every UCX request carries an ompi_request_t in its private space;
     * the init/cleanup hooks construct/destruct that OMPI state. */
    params.features        = UCP_FEATURE_TAG;
    params.request_size    = sizeof(ompi_request_t);
    params.request_init    = mca_pml_ucx_request_init;
    params.request_cleanup = mca_pml_ucx_request_cleanup;

    status = ucp_init(&params, config, &ompi_pml_ucx.ucp_context);
    ucp_config_release(config);

    if (UCS_OK != status) {
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}
/*
 * Component close: tear down the UCP context created in mca_pml_ucx_open().
 * Safe to call when open() failed or never ran (context is NULL).
 */
int mca_pml_ucx_close(void)
{
    PML_UCX_VERBOSE(1, "mca_pml_ucx_close");

    if (NULL == ompi_pml_ucx.ucp_context) {
        return OMPI_SUCCESS;
    }

    ucp_cleanup(ompi_pml_ucx.ucp_context);
    ompi_pml_ucx.ucp_context = NULL;
    return OMPI_SUCCESS;
}
int mca_pml_ucx_init(void)
{
ucs_status_t status;
int rc;
PML_UCX_VERBOSE(1, "mca_pml_ucx_init");
/* TODO check MPI thread mode */
status = ucp_worker_create(ompi_pml_ucx.ucp_context, UCS_THREAD_MODE_SINGLE,
&ompi_pml_ucx.ucp_worker);
if (UCS_OK != status) {
return OMPI_ERROR;
}
rc = mca_pml_ucx_send_worker_address();
if (rc < 0) {
return rc;
}
/* Initialize the free lists */
OBJ_CONSTRUCT(&ompi_pml_ucx.persistent_reqs, mca_pml_ucx_freelist_t);
OBJ_CONSTRUCT(&ompi_pml_ucx.convs, mca_pml_ucx_freelist_t);
/* Create a completed request to be returned from isend */
mca_pml_ucx_completed_request_init(&ompi_pml_ucx.completed_send_req);
opal_progress_register(mca_pml_ucx_progress);
PML_UCX_VERBOSE(2, "created ucp context %p, worker %p",
(void *)ompi_pml_ucx.ucp_context,
(void *)ompi_pml_ucx.ucp_worker);
return OMPI_SUCCESS;
}
/*
 * Component cleanup: mirror of mca_pml_ucx_init() — unregister the progress
 * hook, release the pre-completed send request and free lists, and destroy
 * the UCP worker.
 */
int mca_pml_ucx_cleanup(void)
{
    PML_UCX_VERBOSE(1, "mca_pml_ucx_cleanup");

    opal_progress_unregister(mca_pml_ucx_progress);

    OMPI_REQUEST_FINI(&ompi_pml_ucx.completed_send_req);
    OBJ_DESTRUCT(&ompi_pml_ucx.convs);
    OBJ_DESTRUCT(&ompi_pml_ucx.persistent_reqs);

    if (ompi_pml_ucx.ucp_worker) {
        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
        ompi_pml_ucx.ucp_worker = NULL;
    }

    return OMPI_SUCCESS;
}
/*
 * Create UCX endpoints for all given processes: fetch each peer's worker
 * address from the modex and connect, caching the endpoint under the PML
 * endpoint tag.
 */
int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    ucp_address_t *address;
    ucs_status_t status;
    size_t addrlen;
    ucp_ep_h ep;
    size_t i;
    int ret;

    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
                                                               procs,
                                                               nprocs))) {
        return ret;
    }

    for (i = 0; i < nprocs; ++i) {
        /* Check for an existing endpoint *before* fetching the address:
         * the previous order leaked the modex-allocated address buffer on
         * the "already connected" path. */
        if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
            PML_UCX_VERBOSE(3, "already connected to proc. %d",
                            procs[i]->super.proc_name.vpid);
            continue;
        }

        ret = mca_pml_ucx_recv_worker_address(procs[i], &address, &addrlen);
        if (ret < 0) {
            return ret;
        }

        PML_UCX_VERBOSE(2, "connecting to proc. %d", procs[i]->super.proc_name.vpid);

        status = ucp_ep_create(ompi_pml_ucx.ucp_worker, address, &ep);
        free(address);   /* modex buffer is owned by us on all paths */
        if (UCS_OK != status) {
            PML_UCX_ERROR("Failed to connect");
            return OMPI_ERROR;
        }

        procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
    }

    return OMPI_SUCCESS;
}
/*
 * Disconnect from the given processes and drop their cached PML endpoints.
 */
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    size_t idx;

    for (idx = 0; idx < nprocs; ++idx) {
        ucp_ep_h endpoint;

        PML_UCX_VERBOSE(2, "disconnecting from rank %d",
                        procs[idx]->super.proc_name.vpid);

        endpoint = procs[idx]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
        if (NULL != endpoint) {
            ucp_ep_destroy(endpoint);
        }
        procs[idx]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
    }
    return OMPI_SUCCESS;
}
/*
 * Enable the PML: size the free lists for persistent requests and datatype
 * convertors (initial 128 elements, unbounded max, grow by 128).  The
 * 'enable' flag is ignored.
 */
int mca_pml_ucx_enable(bool enable)
{
    PML_UCX_FREELIST_INIT(&ompi_pml_ucx.persistent_reqs,
                          mca_pml_ucx_persistent_request_t,
                          128, -1, 128);
    PML_UCX_FREELIST_INIT(&ompi_pml_ucx.convs,
                          mca_pml_ucx_convertor_t,
                          128, -1, 128);
    return OMPI_SUCCESS;
}

/*
 * opal_progress callback: advance the UCP worker.  The static flag guards
 * against re-entrance (code run from UCX completion callbacks may call
 * opal_progress again).
 * NOTE(review): the guard is not atomic — assumes single-threaded progress.
 */
int mca_pml_ucx_progress(void)
{
    static int inprogress = 0;
    if (inprogress != 0) {
        return 0;
    }

    ++inprogress;
    ucp_worker_progress(ompi_pml_ucx.ucp_worker);
    --inprogress;
    return OMPI_SUCCESS;
}

/* No per-communicator state is kept by this PML. */
int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm)
{
    return OMPI_SUCCESS;
}

/* No per-communicator state to release. */
int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm)
{
    return OMPI_SUCCESS;
}
/*
 * Create a persistent (inactive) receive request.  Nothing is posted to
 * UCX here; the actual receive is issued by mca_pml_ucx_start().
 */
int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
                           int src, int tag, struct ompi_communicator_t* comm,
                           struct ompi_request_t **request)
{
    mca_pml_ucx_persistent_request_t *req;

    req = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs);
    if (req == NULL) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    PML_UCX_TRACE_RECV("irecv_init request *%p=%p", buf, count, datatype, src,
                       tag, comm, (void*)request, (void*)req);

    req->ompi.req_state = OMPI_REQUEST_INACTIVE;
    req->flags          = 0;        /* no SEND flag => receive request */
    req->buffer         = buf;
    req->count          = count;
    req->datatype       = mca_pml_ucx_get_datatype(datatype);

    /* Encode (tag, src, comm) into a UCP tag + matching mask */
    PML_UCX_MAKE_RECV_TAG(req->tag, req->recv.tag_mask, tag, src, comm);

    *request = &req->ompi;
    return OMPI_SUCCESS;
}
/*
 * Non-blocking receive: post directly to UCX tag matching.  The returned
 * UCX request doubles as the ompi_request_t handed back to the caller
 * (see request_size/request_init in mca_pml_ucx_open).
 */
int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      int src, int tag, struct ompi_communicator_t* comm,
                      struct ompi_request_t **request)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ompi_request_t *req;

    PML_UCX_TRACE_RECV("irecv request *%p", buf, count, datatype, src, tag, comm,
                       (void*)request);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                           mca_pml_ucx_get_datatype(datatype),
                                           ucp_tag, ucp_tag_mask,
                                           mca_pml_ucx_recv_completion);
    if (UCS_PTR_IS_ERR(req)) {
        PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }

    PML_UCX_VERBOSE(8, "got request %p", (void*)req);
    *request = req;
    return OMPI_SUCCESS;
}
/*
 * Completion callback used by the blocking mca_pml_ucx_recv(): record the
 * receive status into the request and flip req_complete, which the caller
 * is polling under opal_progress().
 */
static void
mca_pml_ucx_blocking_recv_completion(void *request, ucs_status_t status,
                                     ucp_tag_recv_info_t *info)
{
    ompi_request_t *req = request;

    PML_UCX_VERBOSE(8, "blocking receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)req, ucs_status_string(status), info->sender_tag,
                    info->length);

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_recv_status(&req->req_status, status, info);
    PML_UCX_ASSERT(!req->req_complete);  /* must complete exactly once */
    req->req_complete = true;
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
 * Blocking receive: post a non-blocking UCX receive, then spin on
 * opal_progress() until the completion callback flips req_complete.
 * The flag is reset before handing the request back to UCX for reuse.
 */
int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
                     int tag, struct ompi_communicator_t* comm,
                     ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ompi_request_t *req;

    PML_UCX_TRACE_RECV("%s", buf, count, datatype, src, tag, comm, "recv");

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                           mca_pml_ucx_get_datatype(datatype),
                                           ucp_tag, ucp_tag_mask,
                                           mca_pml_ucx_blocking_recv_completion);
    if (UCS_PTR_IS_ERR(req)) {
        PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }

    /* Busy-wait; progress drives the UCX worker which fires the callback */
    while (!req->req_complete) {
        opal_progress();
    }

    if (mpi_status != MPI_STATUS_IGNORE) {
        *mpi_status = req->req_status;
    }

    req->req_complete = false;   /* reset for request reuse by UCX */
    ucp_request_release(req);

    return OMPI_SUCCESS;
}
/* Map a PML send mode to a short human-readable label (tracing only). */
static inline const char *mca_pml_ucx_send_mode_name(mca_pml_base_send_mode_t mode)
{
    if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
        return "sync";
    }
    if (MCA_PML_BASE_SEND_COMPLETE == mode) {
        return "complete";
    }
    if (MCA_PML_BASE_SEND_BUFFERED == mode) {
        return "buffered";
    }
    if (MCA_PML_BASE_SEND_READY == mode) {
        return "ready";
    }
    if (MCA_PML_BASE_SEND_STANDARD == mode) {
        return "standard";
    }
    if (MCA_PML_BASE_SEND_SIZE == mode) {
        return "size";
    }
    return "unknown";
}
/*
 * Create a persistent (inactive) send request; the actual send is posted by
 * mca_pml_ucx_start().
 */
int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
                           int dst, int tag, mca_pml_base_send_mode_t mode,
                           struct ompi_communicator_t* comm,
                           struct ompi_request_t **request)
{
    mca_pml_ucx_persistent_request_t *req;

    req = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs);
    if (req == NULL) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* No trailing ';' needed: the TRACE macro expands to a full statement */
    PML_UCX_TRACE_SEND("isend_init request *%p=%p", buf, count, datatype, dst,
                       tag, mode, comm, (void*)request, (void*)req)

    req->ompi.req_state = OMPI_REQUEST_INACTIVE;
    req->flags          = MCA_PML_UCX_REQUEST_FLAG_SEND;
    req->buffer         = (void *)buf;
    req->count          = count;
    req->datatype       = mca_pml_ucx_get_datatype(datatype);
    req->tag            = PML_UCX_MAKE_SEND_TAG(tag, comm);
    req->send.mode      = mode;
    req->send.ep        = mca_pml_ucx_get_ep(comm, dst);

    *request = &req->ompi;
    return OMPI_SUCCESS;
}
/*
 * Non-blocking send.  ucp_tag_send_nb() has a three-way result:
 *   NULL            -> completed inline; return the shared pre-completed req
 *   valid pointer   -> in flight; the UCX request is the ompi_request_t
 *   error pointer   -> failure
 */
int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
                      int dst, int tag, mca_pml_base_send_mode_t mode,
                      struct ompi_communicator_t* comm,
                      struct ompi_request_t **request)
{
    ompi_request_t *req;

    PML_UCX_TRACE_SEND("isend request *%p", buf, count, datatype, dst, tag, mode,
                       comm, (void*)request)

    /* TODO special care to sync/buffered send */

    req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count,
                                           mca_pml_ucx_get_datatype(datatype),
                                           PML_UCX_MAKE_SEND_TAG(tag, comm),
                                           mca_pml_ucx_send_completion);
    if (req == NULL) {
        PML_UCX_VERBOSE(8, "returning completed request");
        *request = &ompi_pml_ucx.completed_send_req;
        return OMPI_SUCCESS;
    } else if (!UCS_PTR_IS_ERR(req)) {
        PML_UCX_VERBOSE(8, "got request %p", (void*)req);
        *request = req;
        return OMPI_SUCCESS;
    } else {
        PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }
}

/*
 * Blocking send: same three-way result as isend, but an in-flight request
 * is waited on with ompi_request_wait() before returning.
 */
int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
                     int tag, mca_pml_base_send_mode_t mode,
                     struct ompi_communicator_t* comm)
{
    ompi_request_t *req;

    PML_UCX_TRACE_SEND("%s", buf, count, datatype, dst, tag, mode, comm, "send");

    /* TODO special care to sync/buffered send */

    req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count,
                                           mca_pml_ucx_get_datatype(datatype),
                                           PML_UCX_MAKE_SEND_TAG(tag, comm),
                                           mca_pml_ucx_send_completion);
    if (req == NULL) {
        return OMPI_SUCCESS;    /* completed inline */
    } else if (!UCS_PTR_IS_ERR(req)) {
        PML_UCX_VERBOSE(8, "got request %p", (void*)req);
        ompi_request_wait(&req, MPI_STATUS_IGNORE);
        return OMPI_SUCCESS;
    } else {
        PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }
}
/*
 * Non-blocking probe (remove=0: the message stays queued).  Drives
 * opal_progress() when nothing matched so that a caller looping on iprobe
 * eventually observes newly arrived messages; previously no progress was
 * made on the miss path.
 */
int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
                       int *matched, ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ucp_tag_recv_info_t info;
    ucp_tag_message_h ucp_msg;

    PML_UCX_TRACE_PROBE("iprobe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
                               0, &info);
    if (ucp_msg != NULL) {
        *matched = 1;
        mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
    } else {
        opal_progress();
        *matched = 0;
    }
    return OMPI_SUCCESS;
}
/*
 * Blocking probe: busy-wait on ucp_tag_probe_nb (remove=0), driving
 * opal_progress() between attempts until a matching message arrives.
 */
int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm,
                      ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ucp_tag_recv_info_t info;
    ucp_tag_message_h ucp_msg;

    PML_UCX_TRACE_PROBE("probe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    for (;;) {
        ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
                                   0, &info);
        if (ucp_msg != NULL) {
            mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
            return OMPI_SUCCESS;
        }
        opal_progress();
    }
}
/*
 * Non-blocking matched probe (remove=1: on a hit the message is dequeued
 * and wrapped in an ompi_message_t for a later imrecv/mrecv).
 */
int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
                        int *matched, struct ompi_message_t **message,
                        ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ucp_tag_recv_info_t info;
    ucp_tag_message_h ucp_msg;

    PML_UCX_TRACE_PROBE("improbe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
                               1, &info);
    if (ucp_msg != NULL) {
        PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message);
        PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, ucp_msg);
        *matched = 1;
        mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
    } else {
        /* Covers UCS_ERR_NO_MESSAGE as well as any other error status:
         * previously *matched was left uninitialized on non-NO_MESSAGE
         * errors, leaking garbage to the caller. */
        *matched = 0;
    }
    return OMPI_SUCCESS;
}
/*
 * Blocking matched probe: loop on ucp_tag_probe_nb (remove=1), driving
 * opal_progress() until a message matches; the dequeued message is wrapped
 * for a later imrecv/mrecv.
 */
int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
                       struct ompi_message_t **message,
                       ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ucp_tag_recv_info_t info;
    ucp_tag_message_h ucp_msg;

    PML_UCX_TRACE_PROBE("mprobe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    for (;;) {
        ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
                                   1, &info);
        if (ucp_msg != NULL) {
            PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message);
            PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, ucp_msg);
            mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
            return OMPI_SUCCESS;
        }
        opal_progress();
    }
}
/*
 * Non-blocking receive of a message previously matched by improbe/mprobe.
 * Releases the ompi message wrapper once the UCX receive is posted.
 */
int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                       struct ompi_message_t **message,
                       struct ompi_request_t **request)
{
    ompi_request_t *req;

    PML_UCX_TRACE_MRECV("imrecv", buf, count, datatype, message);

    req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                               mca_pml_ucx_get_datatype(datatype),
                                               (*message)->req_ptr,
                                               mca_pml_ucx_recv_completion);
    if (UCS_PTR_IS_ERR(req)) {
        PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }

    PML_UCX_VERBOSE(8, "got request %p", (void*)req);
    PML_UCX_MESSAGE_RELEASE(message);
    *request = req;
    return OMPI_SUCCESS;
}

/*
 * Blocking variant of imrecv: post the matched receive and wait on the
 * resulting request.
 */
int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      struct ompi_message_t **message,
                      ompi_status_public_t* status)
{
    ompi_request_t *req;

    PML_UCX_TRACE_MRECV("mrecv", buf, count, datatype, message);

    req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                               mca_pml_ucx_get_datatype(datatype),
                                               (*message)->req_ptr,
                                               mca_pml_ucx_recv_completion);
    if (UCS_PTR_IS_ERR(req)) {
        PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
        return OMPI_ERROR;
    }

    PML_UCX_MESSAGE_RELEASE(message);
    ompi_request_wait(&req, status);
    return OMPI_SUCCESS;
}
/*
 * Start persistent requests created by isend_init/irecv_init.  Each start
 * posts a fresh ("temporary") UCX request; the persistent request completes
 * either immediately (inline send), via the already-fired tmp_req callback,
 * or later through the p{send,recv}_completion callback using the
 * req_complete_cb_data link set here.
 */
int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
{
    mca_pml_ucx_persistent_request_t *preq;
    ompi_request_t *tmp_req;
    size_t i;

    for (i = 0; i < count; ++i) {
        preq = (mca_pml_ucx_persistent_request_t *)requests[i];

        if ((preq == NULL) || (OMPI_REQUEST_PML != preq->ompi.req_type)) {
            /* Skip irrelevant requests */
            continue;
        }

        PML_UCX_ASSERT(preq->ompi.req_state != OMPI_REQUEST_INVALID);
        preq->ompi.req_state = OMPI_REQUEST_ACTIVE;
        mca_pml_ucx_request_reset(&preq->ompi);

        if (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) {
            /* TODO special care to sync/buffered send */
            PML_UCX_VERBOSE(8, "start send request %p", (void*)preq);
            tmp_req = (ompi_request_t*)ucp_tag_send_nb(preq->send.ep, preq->buffer,
                                                       preq->count, preq->datatype,
                                                       preq->tag,
                                                       mca_pml_ucx_psend_completion);
        } else {
            PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq);
            tmp_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker,
                                                       preq->buffer, preq->count,
                                                       preq->datatype, preq->tag,
                                                       preq->recv.tag_mask,
                                                       mca_pml_ucx_precv_completion);
        }

        if (tmp_req == NULL) {
            /* Only send can complete immediately */
            PML_UCX_ASSERT(preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND);

            PML_UCX_VERBOSE(8, "send completed immediately, completing persistent request %p",
                            (void*)preq);
            OPAL_THREAD_LOCK(&ompi_request_lock);
            mca_pml_ucx_set_send_status(&preq->ompi.req_status, UCS_OK);
            ompi_request_complete(&preq->ompi, true);
            OPAL_THREAD_UNLOCK(&ompi_request_lock);
        } else if (!UCS_PTR_IS_ERR(tmp_req)) {
            /* Lock is needed to close the race between the completion
             * callback and linking tmp_req to preq below. */
            OPAL_THREAD_LOCK(&ompi_request_lock);
            if (tmp_req->req_complete) {
                /* tmp_req is already completed */
                PML_UCX_VERBOSE(8, "completing persistent request %p", (void*)preq);
                /* NOTE(review): 'requset' typo matches the helper's
                 * declaration elsewhere in this component. */
                mca_pml_ucx_persistent_requset_complete(preq, tmp_req);
            } else {
                /* tmp_req would be completed by callback and trigger completion
                 * of preq */
                PML_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p",
                                (void*)tmp_req, (void*)preq);
                tmp_req->req_complete_cb_data = preq;
            }
            OPAL_THREAD_UNLOCK(&ompi_request_lock);
        } else {
            PML_UCX_ERROR("ucx %s failed: %s",
                          (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) ? "send" : "recv",
                          ucs_status_string(UCS_PTR_STATUS(tmp_req)));
            return OMPI_ERROR;
        }
    }

    return OMPI_SUCCESS;
}
/* Diagnostic dump hook: not implemented for the UCX PML (no-op). */
int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose)
{
    return OMPI_SUCCESS;
}

147
ompi/mca/pml/ucx/pml_ucx.h Обычный файл
Просмотреть файл

@ -0,0 +1,147 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#ifndef PML_UCX_H_
#define PML_UCX_H_

#include "ompi_config.h"
#include "ompi/request/request.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"

#include <ucp/api/ucp.h>
#include <assert.h>   /* PML_UCX_ASSERT expands to assert() in debug builds */

#include "pml_ucx_freelist.h"

typedef struct mca_pml_ucx_module         mca_pml_ucx_module_t;
typedef struct pml_ucx_persistent_request mca_pml_ucx_persistent_request_t;
typedef struct pml_ucx_convertor          mca_pml_ucx_convertor_t;

/*
 * TODO version check
 */

/* Component-global state: PML function table + UCX objects + MCA params. */
struct mca_pml_ucx_module {
    mca_pml_base_module_t  super;

    /* UCX global objects */
    ucp_context_h          ucp_context;
    ucp_worker_h           ucp_worker;

    /* Requests */
    mca_pml_ucx_freelist_t persistent_reqs;
    ompi_request_t         completed_send_req;  /* returned by isend when UCX
                                                   completes the send inline */

    /* Convertors pool */
    mca_pml_ucx_freelist_t convs;

    int                    priority;  /* MCA: component selection priority */
    int                    verbose;   /* MCA: verbosity level */
    int                    output;    /* opal_output stream id */
};

extern mca_pml_base_component_2_0_0_t mca_pml_ucx_component;
extern mca_pml_ucx_module_t ompi_pml_ucx;

/* Debugging */
#define PML_UCX_ENABLE_DEBUG   OPAL_ENABLE_DEBUG
#if PML_UCX_ENABLE_DEBUG
#  define PML_UCX_MAX_VERBOSE  9
#  define PML_UCX_ASSERT(_x)   assert(_x)
#else
#  define PML_UCX_MAX_VERBOSE  2
#  define PML_UCX_ASSERT(_x)
#endif

/* Stringification helpers used to embed __LINE__ in log output */
#define _PML_UCX_QUOTE(_x) \
    # _x
#define PML_UCX_QUOTE(_x) \
    _PML_UCX_QUOTE(_x)

#define PML_UCX_ERROR(...) \
    opal_output_verbose(0, ompi_pml_ucx.output, "Error: " __FILE__ ":" \
                        PML_UCX_QUOTE(__LINE__) __VA_ARGS__)

/* Wrapped in do/while(0) so the macro is a single statement and cannot
 * mis-bind a following 'else' (the original expanded to a bare 'if'). */
#define PML_UCX_VERBOSE(_level, ... ) \
    do { \
        if (((_level) <= PML_UCX_MAX_VERBOSE) && ((_level) <= ompi_pml_ucx.verbose)) { \
            opal_output_verbose(_level, ompi_pml_ucx.output, __FILE__ ":" \
                                PML_UCX_QUOTE(__LINE__) __VA_ARGS__); \
        } \
    } while (0)

/* Component lifecycle */
int mca_pml_ucx_open(void);
int mca_pml_ucx_close(void);
int mca_pml_ucx_init(void);
int mca_pml_ucx_cleanup(void);

/* PML interface (see pml_ucx.c for per-function documentation) */
int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs);
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs);

int mca_pml_ucx_enable(bool enable);
int mca_pml_ucx_progress(void);

int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm);
int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm);

int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
                           int src, int tag, struct ompi_communicator_t* comm,
                           struct ompi_request_t **request);

int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      int src, int tag, struct ompi_communicator_t* comm,
                      struct ompi_request_t **request);

int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
                     int tag, struct ompi_communicator_t* comm,
                     ompi_status_public_t* status);

int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
                           int dst, int tag, mca_pml_base_send_mode_t mode,
                           struct ompi_communicator_t* comm,
                           struct ompi_request_t **request);

int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
                      int dst, int tag, mca_pml_base_send_mode_t mode,
                      struct ompi_communicator_t* comm,
                      struct ompi_request_t **request);

int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
                     int tag, mca_pml_base_send_mode_t mode,
                     struct ompi_communicator_t* comm);

int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
                       int *matched, ompi_status_public_t* status);

int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm,
                      ompi_status_public_t* status);

int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
                        int *matched, struct ompi_message_t **message,
                        ompi_status_public_t* status);

int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
                       struct ompi_message_t **message,
                       ompi_status_public_t* status);

int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                       struct ompi_message_t **message,
                       struct ompi_request_t **request);

int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      struct ompi_message_t **message,
                      ompi_status_public_t* status);

int mca_pml_ucx_start(size_t count, ompi_request_t** requests);

int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose);

#endif /* PML_UCX_H_ */

107
ompi/mca/pml/ucx/pml_ucx_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,107 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx.h"
/* MCA component entry points (defined below) */
static int mca_pml_ucx_component_register(void);
static int mca_pml_ucx_component_open(void);
static int mca_pml_ucx_component_close(void);

static mca_pml_base_module_t*
mca_pml_ucx_component_init(int* priority, bool enable_progress_threads,
                           bool enable_mpi_threads);
static int mca_pml_ucx_component_fini(void);

/* Component descriptor registered with the MCA framework. */
mca_pml_base_component_2_0_0_t mca_pml_ucx_component = {

    /* First, the mca_base_component_t struct containing meta
     * information about the component itself */
    {
        MCA_PML_BASE_VERSION_2_0_0,

        "ucx",                       /* MCA component name */
        OMPI_MAJOR_VERSION,          /* MCA component major version */
        OMPI_MINOR_VERSION,          /* MCA component minor version */
        OMPI_RELEASE_VERSION,        /* MCA component release version */
        mca_pml_ucx_component_open,  /* component open */
        mca_pml_ucx_component_close, /* component close */
        NULL,                        /* query hook not used */
        mca_pml_ucx_component_register,
    },
    {
        /* This component is not checkpoint ready */
        MCA_BASE_METADATA_PARAM_NONE
    },

    mca_pml_ucx_component_init,  /* component init */
    mca_pml_ucx_component_fini   /* component finalize */
};
/*
 * Register MCA parameters: pml_ucx_verbose (default 0) and
 * pml_ucx_priority (default 50, used for PML selection).
 */
static int mca_pml_ucx_component_register(void)
{
    ompi_pml_ucx.verbose = 0;
    (void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "verbose",
                                           "Verbose level of the UCX component",
                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_LOCAL,
                                           &ompi_pml_ucx.verbose);

    ompi_pml_ucx.priority = 50;
    (void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "priority",
                                           "Priority of the UCX component",
                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                           OPAL_INFO_LVL_3,
                                           MCA_BASE_VAR_SCOPE_LOCAL,
                                           &ompi_pml_ucx.priority);

    return 0;
}
/* Open: set up the component's output stream, then create the UCP context. */
static int mca_pml_ucx_component_open(void)
{
    ompi_pml_ucx.output = opal_output_open(NULL);
    opal_output_set_verbosity(ompi_pml_ucx.output, ompi_pml_ucx.verbose);
    return mca_pml_ucx_open();
}

/* Close: destroy the UCP context, then release the output stream. */
static int mca_pml_ucx_component_close(void)
{
    int rc;

    rc = mca_pml_ucx_close();
    if (rc != 0) {
        return rc;
    }

    opal_output_close(ompi_pml_ucx.output);
    return 0;
}

/*
 * Init: create the UCP worker and return the module, or NULL on failure so
 * the PML framework falls back to another component.  The thread-support
 * flags are currently ignored (see the TODO in mca_pml_ucx_init).
 */
static mca_pml_base_module_t*
mca_pml_ucx_component_init(int* priority, bool enable_progress_threads,
                           bool enable_mpi_threads)
{
    int ret;

    if ( (ret = mca_pml_ucx_init()) != 0) {
        return NULL;
    }

    *priority = ompi_pml_ucx.priority;
    return &ompi_pml_ucx.super;
}

/* Finalize: tear down worker/free lists (mirror of component_init). */
static int mca_pml_ucx_component_fini(void)
{
    return mca_pml_ucx_cleanup();
}

157
ompi/mca/pml/ucx/pml_ucx_datatype.c Обычный файл
Просмотреть файл

@ -0,0 +1,157 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx_datatype.h"
#include "ompi/runtime/mpiruntime.h"
#include <inttypes.h>
/*
 * UCX generic-datatype hook: begin packing 'count' elements from 'buffer'.
 * Returns an opaque per-operation state (a convertor from the free list);
 * the datatype is retained until _finish releases it.
 */
static void* pml_ucx_generic_datatype_start_pack(void *context, const void *buffer,
                                                 size_t count)
{
    ompi_datatype_t *datatype = context;
    mca_pml_ucx_convertor_t *convertor;

    convertor = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs);

    OBJ_RETAIN(datatype);
    convertor->datatype = datatype;
    opal_convertor_copy_and_prepare_for_send(ompi_proc_local_proc->super.proc_convertor,
                                             &datatype->super, count, buffer, 0,
                                             &convertor->opal_conv);
    return convertor;
}

/*
 * UCX generic-datatype hook: begin unpacking into 'buffer'.  Mirror of
 * start_pack but prepares the convertor for the receive direction.
 */
static void* pml_ucx_generic_datatype_start_unpack(void *context, void *buffer,
                                                   size_t count)
{
    ompi_datatype_t *datatype = context;
    mca_pml_ucx_convertor_t *convertor;

    convertor = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs);

    OBJ_RETAIN(datatype);
    convertor->datatype = datatype;
    opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor,
                                             &datatype->super, count, buffer, 0,
                                             &convertor->opal_conv);
    return convertor;
}
/* UCX generic-datatype hook: total packed size of the pending operation. */
static size_t pml_ucx_generic_datatype_packed_size(void *state)
{
    size_t packed_len;
    mca_pml_ucx_convertor_t *conv = state;

    opal_convertor_get_packed_size(&conv->opal_conv, &packed_len);
    return packed_len;
}
/*
 * UCP generic-datatype callback: pack up to `max_length` bytes starting at
 * stream position `offset` into `dest`.  Returns the number of bytes
 * actually packed (opal_convertor_pack updates `length` in place).
 */
static size_t pml_ucx_generic_datatype_pack(void *state, size_t offset,
                                            void *dest, size_t max_length)
{
    mca_pml_ucx_convertor_t *convertor = state;
    uint32_t iov_count;
    struct iovec iov;
    size_t length;

    iov_count    = 1;
    iov.iov_base = dest;
    iov.iov_len  = max_length;

    /* Position must be set before packing: UCX may call us out of order. */
    opal_convertor_set_position(&convertor->opal_conv, &offset);
    length = max_length;
    opal_convertor_pack(&convertor->opal_conv, &iov, &iov_count, &length);
    return length;
}
/*
 * UCP generic-datatype callback: unpack `length` bytes from `src` at stream
 * position `offset` into the receive buffer prepared in start_unpack.
 * Always reports UCS_OK; conversion errors are not propagated here.
 */
static ucs_status_t pml_ucx_generic_datatype_unpack(void *state, size_t offset,
                                                    const void *src, size_t length)
{
    mca_pml_ucx_convertor_t *convertor = state;
    uint32_t iov_count;
    struct iovec iov;

    iov_count    = 1;
    iov.iov_base = (void*)src;  /* opal_convertor_unpack reads, never writes, src */
    iov.iov_len  = length;

    opal_convertor_set_position(&convertor->opal_conv, &offset);
    opal_convertor_unpack(&convertor->opal_conv, &iov, &iov_count, &length);
    return UCS_OK;
}
/*
 * UCP generic-datatype callback: end of a pack/unpack operation.  Cleans up
 * the OPAL convertor, drops the datatype reference taken in start_pack /
 * start_unpack and returns the convertor to the freelist.
 */
static void pml_ucx_generic_datatype_finish(void *state)
{
    mca_pml_ucx_convertor_t *convertor = state;

    opal_convertor_cleanup(&convertor->opal_conv);
    OBJ_RELEASE(convertor->datatype);
    PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.convs, &convertor->super);
}
/* Callback table handed to ucp_dt_create_generic() for non-contiguous
 * datatypes (see mca_pml_ucx_init_datatype below). */
static ucp_generic_dt_ops_t pml_ucx_generic_datatype_ops = {
    .start_pack   = pml_ucx_generic_datatype_start_pack,
    .start_unpack = pml_ucx_generic_datatype_start_unpack,
    .packed_size  = pml_ucx_generic_datatype_packed_size,
    .pack         = pml_ucx_generic_datatype_pack,
    .unpack       = pml_ucx_generic_datatype_unpack,
    .finish       = pml_ucx_generic_datatype_finish
};
/*
 * Build (and cache in datatype->pml_data) the UCP datatype handle for an
 * OMPI datatype.  Contiguous, gap-free types with zero lower bound map to a
 * UCP contiguous descriptor; everything else gets a generic datatype driven
 * by the pack/unpack callbacks above.  Aborts the job if UCX cannot create
 * the generic datatype.
 *
 * Fix: the PML_UCX_VERBOSE statement was missing its terminating semicolon
 * (every other PML_UCX_* macro call in this file is terminated).
 */
ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype)
{
    ucp_datatype_t ucp_datatype;
    ucs_status_t status;
    ptrdiff_t lb;
    size_t size;

    ompi_datatype_type_lb(datatype, &lb);

    /* Fast path: a contiguous type with no gaps and lb == 0 is fully
     * described by its packed size. */
    if ((datatype->super.flags & OPAL_DATATYPE_FLAG_CONTIGUOUS) &&
        (datatype->super.flags & OPAL_DATATYPE_FLAG_NO_GAPS) &&
        (lb == 0))
    {
        ompi_datatype_type_size(datatype, &size);
        PML_UCX_ASSERT(size > 0);
        datatype->pml_data = ucp_dt_make_contig(size);
        return datatype->pml_data;
    }

    status = ucp_dt_create_generic(&pml_ucx_generic_datatype_ops,
                                   datatype, &ucp_datatype);
    if (status != UCS_OK) {
        PML_UCX_ERROR("Failed to create UCX datatype for %s", datatype->name);
        ompi_mpi_abort(&ompi_mpi_comm_world.comm, 1);
    }

    PML_UCX_VERBOSE(7, "created generic UCX datatype 0x%"PRIx64, ucp_datatype);

    /* TODO put this on a list to be destroyed later */
    datatype->pml_data = ucp_datatype;
    return ucp_datatype;
}
/* Freelist constructor: set up the embedded OPAL convertor once per item. */
static void mca_pml_ucx_convertor_construct(mca_pml_ucx_convertor_t *convertor)
{
    OBJ_CONSTRUCT(&convertor->opal_conv, opal_convertor_t);
}

/* Freelist destructor: tear down the embedded OPAL convertor. */
static void mca_pml_ucx_convertor_destruct(mca_pml_ucx_convertor_t *convertor)
{
    OBJ_DESTRUCT(&convertor->opal_conv);
}

OBJ_CLASS_INSTANCE(mca_pml_ucx_convertor_t,
                   opal_free_list_item_t,
                   mca_pml_ucx_convertor_construct,
                   mca_pml_ucx_convertor_destruct);

39
ompi/mca/pml/ucx/pml_ucx_datatype.h Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_DATATYPE_H_
#define PML_UCX_DATATYPE_H_
#include "pml_ucx.h"
/*
 * Pack/unpack state for UCX generic datatypes.  Lives on the
 * ompi_pml_ucx.convs freelist (hence the free-list-item base); holds a
 * reference on the datatype for the duration of one pack/unpack operation.
 * NOTE(review): the typedef name mca_pml_ucx_convertor_t is presumably
 * declared in pml_ucx.h — confirm.
 */
struct pml_ucx_convertor {
    opal_free_list_item_t   super;      /* freelist linkage */
    ompi_datatype_t         *datatype;  /* retained in start_pack/start_unpack */
    opal_convertor_t        opal_conv;  /* underlying OPAL convertor */
};
ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype);
OBJ_CLASS_DECLARATION(mca_pml_ucx_convertor_t);
/*
 * Fetch the UCP datatype handle cached in datatype->pml_data, building and
 * caching it on first use.
 */
static inline ucp_datatype_t mca_pml_ucx_get_datatype(ompi_datatype_t *datatype)
{
    ucp_datatype_t cached = datatype->pml_data;

    if (OPAL_UNLIKELY(cached == 0)) {
        /* First use of this datatype: create and cache the UCP handle. */
        return mca_pml_ucx_init_datatype(datatype);
    }
    return cached;
}
#endif /* PML_UCX_DATATYPE_H_ */

30
ompi/mca/pml/ucx/pml_ucx_freelist.h Обычный файл
Просмотреть файл

@ -0,0 +1,30 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_FREELIST_H_
#define PML_UCX_FREELIST_H_

#include "ompi_config.h"
#include "opal/class/opal_free_list.h"

/* Thin aliases over the OPAL free list, so the PML code reads in its own
 * vocabulary. */
#define mca_pml_ucx_freelist_t opal_free_list_t

/* Pop one item from the freelist (may grow the list by a batch). */
#define PML_UCX_FREELIST_GET(_freelist) \
    opal_free_list_get (_freelist)

/* Push an item back onto the freelist. */
#define PML_UCX_FREELIST_RETURN(_freelist, _item) \
    opal_free_list_return(_freelist, _item)

/* Initialize a freelist of _type items.
 * NOTE(review): the hard-coded 8 is presumably the item alignment — confirm
 * against opal_free_list_init()'s signature. */
#define PML_UCX_FREELIST_INIT(_fl, _type, _initial, _max, _batch) \
    opal_free_list_init(_fl, sizeof(_type), 8, OBJ_CLASS(_type), \
                        0, 0, _initial, _max, _batch, NULL, 0, NULL, NULL, NULL)

#endif /* PML_UCX_FREELIST_H_ */

178
ompi/mca/pml/ucx/pml_ucx_request.c Обычный файл
Просмотреть файл

@ -0,0 +1,178 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx_request.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/message/message.h"
#include <inttypes.h>
/*
 * req_free hook for non-persistent requests: nulls the caller's handle,
 * resets the request's completion/cancel flags and hands the request
 * storage back to UCX.
 */
static int mca_pml_ucx_request_free(ompi_request_t **rptr)
{
    ompi_request_t *req = *rptr;

    PML_UCX_VERBOSE(9, "free request *%p=%p", (void*)rptr, (void*)req);

    *rptr = MPI_REQUEST_NULL;
    mca_pml_ucx_request_reset(req);
    ucp_request_release(req);
    return OMPI_SUCCESS;
}
/*
 * UCX send-completion callback: translate the UCS status into the MPI
 * status and mark the OMPI request complete (under the global request
 * lock, matching the ompi_request_complete contract).
 */
void mca_pml_ucx_send_completion(void *request, ucs_status_t status)
{
    ompi_request_t *req = request;

    PML_UCX_VERBOSE(8, "send request %p completed with status %s", (void*)req,
                    ucs_status_string(status));

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_send_status(&req->req_status, status);
    PML_UCX_ASSERT(!req->req_complete);  /* completion must fire exactly once */
    ompi_request_complete(req, true);
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
 * UCX receive-completion callback: decode source/tag/length from the UCX
 * tag-recv info into the MPI status, then mark the request complete under
 * the global request lock.
 */
void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
                                 ucp_tag_recv_info_t *info)
{
    ompi_request_t *req = request;

    PML_UCX_VERBOSE(8, "receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)req, ucs_status_string(status), info->sender_tag,
                    info->length);

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_recv_status(&req->req_status, status, info);
    PML_UCX_ASSERT(!req->req_complete);  /* completion must fire exactly once */
    ompi_request_complete(req, true);
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
 * Complete one started instance of a persistent request: copy the temporary
 * request's status onto the persistent request, complete the latter, then
 * recycle the temporary request back to UCX.
 * NOTE(review): "requset" is a typo, but the name is part of the public
 * prototype in pml_ucx_request.h — rename only together with all callers.
 */
void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq,
                                             ompi_request_t *tmp_req)
{
    preq->ompi.req_status = tmp_req->req_status;
    ompi_request_complete(&preq->ompi, true);
    /* Detach before release so a late lookup cannot see a stale pointer. */
    tmp_req->req_complete_cb_data = NULL;
    mca_pml_ucx_request_reset(tmp_req);
    ucp_request_release(tmp_req);
}
/*
 * Common tail of the persistent send/recv completion callbacks: complete
 * the temporary request and, if its owning persistent request is already
 * attached via req_complete_cb_data, finish that one too.  If the pointer
 * is still NULL the start path will finish the persistent request itself.
 */
static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req)
{
    mca_pml_ucx_persistent_request_t *preq;

    OPAL_THREAD_LOCK(&ompi_request_lock);
    ompi_request_complete(tmp_req, false);
    preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data;
    if (preq != NULL) {
        mca_pml_ucx_persistent_requset_complete(preq, tmp_req);
    }
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/* UCX completion callback for the temporary request behind a persistent
 * send: record the send status and run the shared persistent-completion
 * logic. */
void mca_pml_ucx_psend_completion(void *request, ucs_status_t status)
{
    ompi_request_t *tmp_req = request;

    PML_UCX_VERBOSE(8, "persistent send request %p completed with status %s",
                    (void*)tmp_req, ucs_status_string(status));

    mca_pml_ucx_set_send_status(&tmp_req->req_status, status);
    mca_pml_ucx_preq_completion(tmp_req);
}
/* UCX completion callback for the temporary request behind a persistent
 * receive: record the receive status (source/tag/length) and run the shared
 * persistent-completion logic. */
void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
                                  ucp_tag_recv_info_t *info)
{
    ompi_request_t *tmp_req = request;

    PML_UCX_VERBOSE(8, "persistent receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)tmp_req, ucs_status_string(status), info->sender_tag,
                    info->length);

    mca_pml_ucx_set_recv_status(&tmp_req->req_status, status, info);
    mca_pml_ucx_preq_completion(tmp_req);
}
/*
 * Shared initializer for every flavor of PML/UCX request: runs the base
 * OMPI request init, tags the request as a PML request in the given state,
 * clears the completion callback and installs the free/cancel hooks.
 */
static void mca_pml_ucx_request_init_common(ompi_request_t* ompi_req,
                                            bool req_persistent,
                                            ompi_request_state_t state,
                                            ompi_request_free_fn_t req_free,
                                            ompi_request_cancel_fn_t req_cancel)
{
    OMPI_REQUEST_INIT(ompi_req, req_persistent);
    ompi_req->req_type             = OMPI_REQUEST_PML;
    ompi_req->req_state            = state;
    ompi_req->req_complete_cb      = NULL;
    ompi_req->req_complete_cb_data = NULL;
    ompi_req->req_free             = req_free;
    ompi_req->req_cancel           = req_cancel;
}
/* Initialize a UCX-allocated request as a non-persistent, active PML
 * request.  NOTE(review): presumably registered with UCX as the per-request
 * init hook — confirm against the ucp_params in pml_ucx.c. */
void mca_pml_ucx_request_init(void *request)
{
    ompi_request_t* ompi_req = request;
    mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
                                    mca_pml_ucx_request_free, NULL);
}

/* Counterpart cleanup hook: release base OMPI request state before UCX
 * frees the underlying memory. */
void mca_pml_ucx_request_cleanup(void *request)
{
    ompi_request_t* ompi_req = request;
    OMPI_REQUEST_FINI(ompi_req);
}
/*
 * req_free hook for persistent requests: invalidate the request and return
 * it to the persistent-request freelist (it was never owned by UCX).
 */
static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr)
{
    mca_pml_ucx_persistent_request_t* req = (mca_pml_ucx_persistent_request_t*)*rptr;

    *rptr = MPI_REQUEST_NULL;
    req->ompi.req_state = OMPI_REQUEST_INVALID;
    PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &req->ompi.super);
    return OMPI_SUCCESS;
}
/*
 * Freelist constructor for persistent requests: starts life INACTIVE with
 * the persistent free hook installed.
 * Fix: the constructor/destructor were misspelled "persisternt"; they are
 * static and referenced only by the OBJ_CLASS_INSTANCE below, so the rename
 * is file-local and safe.
 */
static void mca_pml_ucx_persistent_request_construct(mca_pml_ucx_persistent_request_t* req)
{
    mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE,
                                    mca_pml_ucx_persistent_request_free, NULL);
}

/* Freelist destructor: release base OMPI request state. */
static void mca_pml_ucx_persistent_request_destruct(mca_pml_ucx_persistent_request_t* req)
{
    OMPI_REQUEST_FINI(&req->ompi);
}

OBJ_CLASS_INSTANCE(mca_pml_ucx_persistent_request_t,
                   ompi_request_t,
                   mca_pml_ucx_persistent_request_construct,
                   mca_pml_ucx_persistent_request_destruct);
/* free hook for the pre-completed request: nothing to release, just null
 * the caller's handle. */
static int mca_pml_completed_request_free(struct ompi_request_t** rptr)
{
    *rptr = MPI_REQUEST_NULL;
    return OMPI_SUCCESS;
}

/* cancel hook for the pre-completed request: cancellation of an already
 * completed request is trivially successful. */
static int mca_pml_completed_request_cancel(struct ompi_request_t* ompi_req, int flag)
{
    return OMPI_SUCCESS;
}

/*
 * Initialize a request that is born complete (e.g. for zero-work
 * operations): standard init followed by an immediate completion.
 */
void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req)
{
    mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
                                    mca_pml_completed_request_free,
                                    mca_pml_completed_request_cancel);
    ompi_request_complete(ompi_req, false);
}

183
ompi/mca/pml/ucx/pml_ucx_request.h Обычный файл
Просмотреть файл

@ -0,0 +1,183 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_REQUEST_H_
#define PML_UCX_REQUEST_H_
#include "pml_ucx.h"
#include "pml_ucx_datatype.h"
/* Per-request flag bits. */
enum {
    MCA_PML_UCX_REQUEST_FLAG_SEND         = (1 << 0), /* Persistent send */
    MCA_PML_UCX_REQUEST_FLAG_FREE_CALLED  = (1 << 1), /* presumably: user freed before completion — confirm at use sites */
    MCA_PML_UCX_REQUEST_FLAG_COMPLETED    = (1 << 2)  /* presumably: operation already completed — confirm at use sites */
};
/*
* UCX tag structure:
*
* 01234567 01234567 01234567 01234567 01234567 01234567 01234567 01234567
* | |
* message tag (24) | source rank (24) | context id (16)
* | |
*/
/* Field widths of the 64-bit UCX tag: [ mpi tag : 24 | rank : 24 | ctx : 16 ]
 * (see the layout diagram above). */
#define PML_UCX_TAG_BITS             24
#define PML_UCX_RANK_BITS            24
#define PML_UCX_CONTEXT_BITS         16

/* Compose the full 64-bit tag for a send: user tag | our rank | context id. */
#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \
    ((((uint64_t) (_tag) ) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) | \
     (((uint64_t)(_comm)->c_my_rank ) << PML_UCX_CONTEXT_BITS) | \
     ((uint64_t)(_comm)->c_contextid))

/* Compose tag and tag-mask for a receive.  MPI_ANY_SOURCE masks out the
 * rank field; MPI_ANY_TAG leaves the tag field unmasked.
 * NOTE(review): the masks keep the top bit of the tag field set — presumably
 * so negative (internal) tags never match wildcard receives; confirm. */
#define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \
    { \
        if ((_src) == MPI_ANY_SOURCE) { \
            _ucp_tag_mask = 0x800000000000fffful; \
        } else { \
            _ucp_tag_mask = 0x800000fffffffffful; \
        } \
        \
        _ucp_tag = (((uint64_t)(_src) & UCS_MASK(PML_UCX_RANK_BITS)) << PML_UCX_CONTEXT_BITS) | \
                   (_comm)->c_contextid; \
        \
        if ((_tag) != MPI_ANY_TAG) { \
            _ucp_tag_mask |= 0x7fffff0000000000ul; \
            _ucp_tag |= ((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS); \
        } \
    }

/* Extract the sender rank from a received 64-bit tag. */
#define PML_UCX_TAG_GET_SOURCE(_tag) \
    (((_tag) >> PML_UCX_CONTEXT_BITS) & UCS_MASK(PML_UCX_RANK_BITS))

/* Extract the MPI tag from a received 64-bit tag. */
#define PML_UCX_TAG_GET_MPI_TAG(_tag) \
    ((_tag) >> (PML_UCX_CONTEXT_BITS + PML_UCX_RANK_BITS))

/* Allocate and fill an ompi_message_t for a matched-probe result.
 * Returns OMPI_ERR_OUT_OF_RESOURCE from the *enclosing function* on
 * allocation failure. */
#define PML_UCX_MESSAGE_NEW(_comm, _ucp_msg, _info, _message) \
    { \
        struct ompi_message_t *msg = ompi_message_alloc(); \
        if (msg == NULL) { \
            /* TODO release UCP message */ \
            return OMPI_ERR_OUT_OF_RESOURCE; \
        } \
        \
        msg->comm    = (_comm); \
        msg->req_ptr = (_ucp_msg); \
        msg->peer    = PML_UCX_TAG_GET_SOURCE((_info)->sender_tag); \
        msg->count   = (_info)->length; \
        *(_message) = msg; \
    }

/* Return a message object to the pool and null the caller's handle. */
#define PML_UCX_MESSAGE_RELEASE(_message) \
    { \
        ompi_message_return(*(_message)); \
        *(_message) = NULL; \
    }
/*
 * A persistent send or receive request: captures all the arguments of the
 * MPI_*_init call so each MPI_Start can re-issue the operation.
 * NOTE(review): the typedef mca_pml_ucx_persistent_request_t is presumably
 * declared in pml_ucx.h — confirm.
 */
struct pml_ucx_persistent_request {
    ompi_request_t                    ompi;      /* base OMPI request (must be first) */
    unsigned                          flags;     /* MCA_PML_UCX_REQUEST_FLAG_* bits */
    void                              *buffer;   /* user buffer */
    size_t                            count;     /* element count */
    ucp_datatype_t                    datatype;  /* resolved UCP datatype */
    ucp_tag_t                         tag;       /* precomputed UCX tag */
    struct {
        mca_pml_base_send_mode_t      mode;      /* send mode (standard/sync/buffered/...) */
        ucp_ep_h                      ep;        /* destination endpoint */
    } send;                                      /* valid for persistent sends */
    struct {
        ucp_tag_t                     tag_mask;  /* precomputed UCX tag mask */
    } recv;                                      /* valid for persistent receives */
};
void mca_pml_ucx_send_completion(void *request, ucs_status_t status);
void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info);
void mca_pml_ucx_psend_completion(void *request, ucs_status_t status);
void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info);
void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq,
ompi_request_t *tmp_req);
void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req);
void mca_pml_ucx_request_init(void *request);
void mca_pml_ucx_request_cleanup(void *request);
/* Look up the UCX endpoint for rank `dst` in `comm`, stored on the proc's
 * endpoint array under the PML tag. */
static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
{
    return ompi_comm_peer_lookup(comm, dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
}

/* Re-arm a request for reuse: clear the completion and cancellation flags. */
static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
{
    req->req_complete          = false;
    req->req_status._cancelled = false;
}
/*
 * Translate a UCX send completion status into MPI status fields: success,
 * cancellation, or MPI_ERR_INTERN for anything else.
 * Fix: declared `static inline` like every other helper in this header —
 * a plain `static` function defined in a header is duplicated in each
 * translation unit and triggers unused-function warnings.
 */
static inline void mca_pml_ucx_set_send_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t status)
{
    if (status == UCS_OK) {
        mpi_status->MPI_ERROR  = MPI_SUCCESS;
    } else if (status == UCS_ERR_CANCELED) {
        mpi_status->_cancelled = true;
    } else {
        mpi_status->MPI_ERROR  = MPI_ERR_INTERN;
    }
}
/*
 * Translate a UCX receive completion into MPI status fields: on success
 * decode source rank and MPI tag from the 64-bit UCX tag and record the
 * received length; map truncation and cancellation to their MPI
 * equivalents; anything else becomes MPI_ERR_INTERN.
 */
static inline void mca_pml_ucx_set_recv_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t ucp_status,
                                               const ucp_tag_recv_info_t *info)
{
    int64_t tag;

    if (ucp_status == UCS_OK) {
        tag = info->sender_tag;
        mpi_status->MPI_ERROR  = MPI_SUCCESS;
        mpi_status->MPI_SOURCE = PML_UCX_TAG_GET_SOURCE(tag);
        mpi_status->MPI_TAG    = PML_UCX_TAG_GET_MPI_TAG(tag);
        mpi_status->_ucount    = info->length;
    } else if (ucp_status == UCS_ERR_MESSAGE_TRUNCATED) {
        mpi_status->MPI_ERROR  = MPI_ERR_TRUNCATE;
    } else if (ucp_status == UCS_ERR_CANCELED) {
        mpi_status->_cancelled = true;
    } else {
        mpi_status->MPI_ERROR  = MPI_ERR_INTERN;
    }
}
/* As mca_pml_ucx_set_recv_status(), but a no-op when the caller passed
 * MPI_STATUS_IGNORE. */
static inline void mca_pml_ucx_set_recv_status_safe(ompi_status_public_t* mpi_status,
                                                    ucs_status_t ucp_status,
                                                    const ucp_tag_recv_info_t *info)
{
    if (mpi_status == MPI_STATUS_IGNORE) {
        return;
    }
    mca_pml_ucx_set_recv_status(mpi_status, ucp_status, info);
}
OBJ_CLASS_DECLARATION(mca_pml_ucx_persistent_request_t);
#endif /* PML_UCX_REQUEST_H_ */

Просмотреть файл

@ -17,6 +17,7 @@
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_mxm.h"
@ -75,10 +76,7 @@ int mca_atomic_mxm_cswap(void *target,
if (MXM_PTL_SHM == ptl_id) {
ptl_id = MXM_PTL_RDMA;
}
r_mkey = mca_memheap.memheap_get_cached_mkey(pe,
target,
ptl_id,
&remote_addr);
r_mkey = mca_memheap_base_get_cached_mkey(pe, target, ptl_id, &remote_addr);
if (!r_mkey) {
ATOMIC_ERROR("[#%d] %p is not address of symmetric variable",
my_pe, target);

Просмотреть файл

@ -18,6 +18,7 @@
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_mxm.h"
@ -77,10 +78,7 @@ int mca_atomic_mxm_fadd(void *target,
if (MXM_PTL_SHM == ptl_id) {
ptl_id = MXM_PTL_RDMA;
}
r_mkey = mca_memheap.memheap_get_cached_mkey(pe,
target,
ptl_id,
&remote_addr);
r_mkey = mca_memheap_base_get_cached_mkey(pe, target, ptl_id, &remote_addr);
if (!r_mkey) {
ATOMIC_ERROR("[#%d] %p is not address of symmetric variable",
my_pe, target);

42
oshmem/mca/atomic/ucx/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,42 @@
#
# Copyright (c) 2013 Mellanox Technologies, Inc.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Compiler flags discovered by configure's OMPI_CHECK_UCX probe.
AM_CPPFLAGS = $(atomic_ucx_CPPFLAGS)

# Sources shared by the DSO and static builds of the atomic:ucx component.
ucx_sources = \
	atomic_ucx.h \
	atomic_ucx_module.c \
	atomic_ucx_component.c \
	atomic_ucx_fadd.c \
	atomic_ucx_cswap.c

# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_oshmem_atomic_ucx_DSO
component_noinst =
component_install = mca_atomic_ucx.la
else
component_noinst = libmca_atomic_ucx.la
component_install =
endif

mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_atomic_ucx_la_SOURCES = $(ucx_sources)
mca_atomic_ucx_la_LIBADD = $(atomic_ucx_LIBS)
mca_atomic_ucx_la_LDFLAGS = -module -avoid-version $(atomic_ucx_LDFLAGS)

# Static (no-install) variant built from the same source list.
noinst_LTLIBRARIES = $(component_noinst)
libmca_atomic_ucx_la_SOURCES =$(ucx_sources)
libmca_atomic_ucx_la_LDFLAGS = -module -avoid-version $(atomic_ucx_LDFLAGS)

65
oshmem/mca/atomic/ucx/atomic_ucx.h Обычный файл
Просмотреть файл

@ -0,0 +1,65 @@
/*
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_ATOMIC_UCX_H
#define MCA_ATOMIC_UCX_H

#include "oshmem_config.h"

#include "opal/mca/mca.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/util/oshmem_util.h"

/* This component uses SPML:UCX */
#include "oshmem/mca/spml/ucx/spml_ucx.h"

BEGIN_C_DECLS

/* Globally exported variables */
OSHMEM_MODULE_DECLSPEC extern mca_atomic_base_component_1_0_0_t
mca_atomic_ucx_component;

/* this component works with spml:ucx only */
extern mca_spml_ucx_t *mca_spml_self;

OSHMEM_DECLSPEC void atomic_ucx_lock(int pe);
OSHMEM_DECLSPEC void atomic_ucx_unlock(int pe);

/* API functions */
int mca_atomic_ucx_init(bool enable_progress_threads, bool enable_threads);
int mca_atomic_ucx_finalize(void);
mca_atomic_base_module_t*
mca_atomic_ucx_query(int *priority);

/* Atomic fetch-and-add on a remote symmetric variable. */
int mca_atomic_ucx_fadd(void *target,
                        void *prev,
                        const void *value,
                        size_t nlong,
                        int pe,
                        struct oshmem_op_t *op);

/* Atomic conditional swap on a remote symmetric variable. */
int mca_atomic_ucx_cswap(void *target,
                         void *prev,
                         const void *cond,
                         const void *value,
                         size_t nlong,
                         int pe);

struct mca_atomic_ucx_module_t {
    mca_atomic_base_module_t super;
};
typedef struct mca_atomic_ucx_module_t mca_atomic_ucx_module_t;
OBJ_CLASS_DECLARATION(mca_atomic_ucx_module_t);

END_C_DECLS

#endif /* MCA_ATOMIC_UCX_H */

112
oshmem/mca/atomic/ucx/atomic_ucx_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,112 @@
/*
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "oshmem_config.h"
#include "oshmem/constants.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/spml/base/base.h"
#include "atomic_ucx.h"
/*
 * Public string showing the atomic ucx component version number
 */
const char *mca_atomic_ucx_component_version_string =
    "Open SHMEM ucx atomic MCA component version " OSHMEM_VERSION;

/*
 * Global variable: cached pointer to the selected spml:ucx module
 * (set in ucx_open()).
 */
mca_spml_ucx_t *mca_spml_self = NULL;

/*
 * Local functions
 */
static int ucx_register(void);
static int ucx_open(void);

/*
 * Instantiate the public struct with all of our public information
 * and pointers to our public functions in it
 */
mca_atomic_base_component_t mca_atomic_ucx_component = {
    /* First, the mca_component_t struct containing meta information
       about the component itself */
    {
        MCA_ATOMIC_BASE_VERSION_2_0_0,

        /* Component name and version */
        "ucx",
        OSHMEM_MAJOR_VERSION,
        OSHMEM_MINOR_VERSION,
        OSHMEM_RELEASE_VERSION,

        /* component open */
        ucx_open,
        /* component close */
        NULL,
        /* component query */
        NULL,
        /* component register */
        ucx_register
    },
    {
        /* The component is checkpoint ready */
        MCA_BASE_METADATA_PARAM_CHECKPOINT
    },

    /* Initialization / querying functions */
    mca_atomic_ucx_init,
    mca_atomic_ucx_finalize,
    mca_atomic_ucx_query
};
/* Register the atomic_ucx_priority MCA variable (default 100). */
static int ucx_register(void)
{
    mca_atomic_ucx_component.priority = 100;
    mca_base_component_var_register (&mca_atomic_ucx_component.atomic_version,
                                     "priority", "Priority of the atomic:ucx "
                                     "component (default: 100)", MCA_BASE_VAR_TYPE_INT,
                                     NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE,
                                     OPAL_INFO_LVL_3,
                                     MCA_BASE_VAR_SCOPE_ALL_EQ,
                                     &mca_atomic_ucx_component.priority);
    return OSHMEM_SUCCESS;
}
/*
 * Component open hook: atomic/ucx can only run on top of the spml:ucx
 * component, so refuse to open when any other SPML was selected.
 * Fix: the comment and the diagnostic were copy-pasted from the mxm/ikrit
 * component and referred to "ikrit" even though the code checks for "ucx";
 * the message now names the actual requirement.
 */
static int ucx_open(void)
{
    /*
     * This component is able to work only on top of the spml:ucx component
     * (this check is used instead of an "enabled" flag on spml:ucx)
     */
    if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) {
        ATOMIC_VERBOSE(5,
                       "Can not use atomic/ucx because spml ucx component was not selected");
        return OSHMEM_ERR_NOT_AVAILABLE;
    }

    /* Cache the selected spml:ucx module for the atomic operations. */
    mca_spml_self = (mca_spml_ucx_t *)mca_spml.self;
    return OSHMEM_SUCCESS;
}
/* Class registration: plain module object, no custom constructor/destructor. */
OBJ_CLASS_INSTANCE(mca_atomic_ucx_module_t,
                   mca_atomic_base_module_t,
                   NULL,
                   NULL);

34
oshmem/mca/atomic/ucx/atomic_ucx_cswap.c Обычный файл
Просмотреть файл

@ -0,0 +1,34 @@
/*
* Copyright (c) 2013 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "oshmem_config.h"
#include <stdio.h>
#include <stdlib.h>
#include "oshmem/constants.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_ucx.h"
/*
 * Atomic conditional swap — placeholder.
 * TODO: actual code (stub currently ignores its arguments and reports
 * success, same as the fadd stub in atomic_ucx_fadd.c).
 */
int mca_atomic_ucx_cswap(void *target,
                         void *prev,
                         const void *cond,
                         const void *value,
                         size_t nlong,
                         int pe)
{
    return OSHMEM_SUCCESS;
}

35
oshmem/mca/atomic/ucx/atomic_ucx_fadd.c Обычный файл
Просмотреть файл

@ -0,0 +1,35 @@
/*
* Copyright (c) 2013 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "oshmem_config.h"
#include <stdio.h>
#include <stdlib.h>
#include "oshmem/constants.h"
#include "oshmem/op/op.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_ucx.h"
/*
 * Atomic fetch-and-add — placeholder (see TODO below); currently ignores
 * its arguments and reports success.
 */
int mca_atomic_ucx_fadd(void *target,
                        void *prev,
                        const void *value,
                        size_t nlong,
                        int pe,
                        struct oshmem_op_t *op)
{
    /* TODO: actual code */
    return OSHMEM_SUCCESS;
}

51
oshmem/mca/atomic/ucx/atomic_ucx_module.c Обычный файл
Просмотреть файл

@ -0,0 +1,51 @@
/*
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "oshmem_config.h"
#include <stdio.h>
#include "oshmem/constants.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/proc/proc.h"
#include "atomic_ucx.h"
/*
 * Initialization function invoked during framework startup.  The
 * thread-support arguments are currently unused; the component always
 * reports success.
 */
int mca_atomic_ucx_init(bool enable_progress_threads, bool enable_threads)
{
    return OSHMEM_SUCCESS;
}

/* Finalize hook: the module holds no state, so nothing to release. */
int mca_atomic_ucx_finalize(void)
{
    return OSHMEM_SUCCESS;
}
/*
 * Query hook: report the component's priority and hand back a freshly
 * allocated module with the fadd/cswap entry points wired up, or NULL if
 * allocation failed.
 */
mca_atomic_base_module_t *
mca_atomic_ucx_query(int *priority)
{
    mca_atomic_ucx_module_t *module = OBJ_NEW(mca_atomic_ucx_module_t);

    *priority = mca_atomic_ucx_component.priority;
    if (NULL == module) {
        return NULL;
    }

    module->super.atomic_fadd  = mca_atomic_ucx_fadd;
    module->super.atomic_cswap = mca_atomic_ucx_cswap;
    return &module->super;
}

32
oshmem/mca/atomic/ucx/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,32 @@
# -*- shell-script -*-
#
# Copyright (c) 2015      Mellanox Technologies, Inc.
#                         All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Fix: the header comment used C-style /* */ syntax, which m4/autoconf does
# not treat as a comment; `#` comments match the project's other .m4 files
# (e.g. config/ompi_check_ucx.m4).

# MCA_oshmem_atomic_ucx_CONFIG([action-if-can-compile],
#                              [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_oshmem_atomic_ucx_CONFIG],[
    AC_CONFIG_FILES([oshmem/mca/atomic/ucx/Makefile])

    OMPI_CHECK_UCX([atomic_ucx],
                   [atomic_ucx_happy="yes"],
                   [atomic_ucx_happy="no"])

    AS_IF([test "$atomic_ucx_happy" = "yes"],
          [$1],
          [$2])

    # substitute in the things needed to build ucx
    AC_SUBST([atomic_ucx_CFLAGS])
    AC_SUBST([atomic_ucx_CPPFLAGS])
    AC_SUBST([atomic_ucx_LDFLAGS])
    AC_SUBST([atomic_ucx_LIBS])
])dnl

Просмотреть файл

@ -72,10 +72,11 @@ OSHMEM_DECLSPEC uint64_t mca_memheap_base_find_offset(int pe,
OSHMEM_DECLSPEC int mca_memheap_base_is_symmetric_addr(const void* va);
OSHMEM_DECLSPEC sshmem_mkey_t *mca_memheap_base_get_mkey(void* va,
int tr_id);
OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe,
void* va,
int btl_id,
void** rva);
OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s,
int pe,
void* va,
int btl_id,
void** rva);
OSHMEM_DECLSPEC void mca_memheap_modex_recv_all(void);
/* This function is for internal usage only
@ -147,6 +148,84 @@ OSHMEM_DECLSPEC extern mca_base_framework_t oshmem_memheap_base_framework;
oshmem_output_verbose(0, oshmem_memheap_base_framework.framework_output, \
"Warning %s:%d - %s()", __SPML_FILE__, __LINE__, __func__, __VA_ARGS__)
extern int mca_memheap_seg_cmp(const void *k, const void *v);
/* Turn ON/OFF debug output from build (default 0) */
#ifndef MEMHEAP_BASE_DEBUG
#define MEMHEAP_BASE_DEBUG 0
#endif
#define MEMHEAP_VERBOSE_FASTPATH(...)
extern mca_memheap_map_t* memheap_map;
/*
 * Map a virtual address to its symmetric-memory segment, or NULL if the
 * address is not symmetric.  The heap segment is tested first as the common
 * case; other segments are found by binary search over the sorted segment
 * table (excluding the heap slot).
 */
static inline map_segment_t *memheap_find_va(const void* va)
{
    map_segment_t *s;

    /* Fast path: most lookups hit the main symmetric heap segment. */
    if (OPAL_LIKELY((uintptr_t)va >= (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].seg_base_addr &&
                    (uintptr_t)va < (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].end)) {
        s = &memheap_map->mem_segs[HEAP_SEG_INDEX];
    } else {
        s = bsearch(va,
                    &memheap_map->mem_segs[SYMB_SEG_INDEX],
                    memheap_map->n_segments - 1,
                    sizeof(*s),
                    mca_memheap_seg_cmp);
    }

#if MEMHEAP_BASE_DEBUG == 1
    if (s) {
        MEMHEAP_VERBOSE(5, "match seg#%02ld: 0x%llX - 0x%llX %llu bytes va=%p",
                        s - memheap_map->mem_segs,
                        (long long)s->seg_base_addr,
                        (long long)s->end,
                        (long long)(s->end - s->seg_base_addr),
                        (void *)va);
    }
#endif
    return s;
}
/*
 * Translate a local symmetric address into the remote PE's address space by
 * applying the offset between the two segment base addresses.  The branch
 * keeps the uintptr_t arithmetic non-negative in both directions.
 */
static inline void* memheap_va2rva(void* va, void* local_base, void* remote_base)
{
    if (remote_base > local_base) {
        return (void*)((uintptr_t)va + ((uintptr_t)remote_base - (uintptr_t)local_base));
    }
    return (void*)((uintptr_t)va - ((uintptr_t)local_base - (uintptr_t)remote_base));
}
/*
 * Fast-path lookup of the memory key for (pe, va) over transport `btl_id`;
 * on success *rva receives the peer's virtual address for va.  The local
 * PE short-circuits to the segment's own mkey; a cached remote key is
 * translated with memheap_va2rva(); a cache miss falls through to the slow
 * path (OOB exchange) in memheap_base_mkey.c.  Returns NULL when va is not
 * a valid symmetric address.
 */
static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe,
                                                              void* va,
                                                              int btl_id,
                                                              void** rva)
{
    map_segment_t *s;
    sshmem_mkey_t *mkey;

    MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p", pe, va);
    s = memheap_find_va(va);
    if (OPAL_UNLIKELY(NULL == s))
        return NULL ;

    if (OPAL_UNLIKELY(!MAP_SEGMENT_IS_VALID(s)))
        return NULL ;

    /* Local access needs no translation. */
    if (OPAL_UNLIKELY(pe == oshmem_my_proc_id())) {
        *rva = va;
        MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (local) %lx %p", pe, va,
                                 s->mkeys[btl_id].u.key, *rva);
        return &s->mkeys[btl_id];
    }

    /* Cached remote keys for this PE: translate va into the peer's space. */
    if (OPAL_LIKELY(s->mkeys_cache[pe])) {
        mkey = &s->mkeys_cache[pe][btl_id];
        *rva = memheap_va2rva(va, s->seg_base_addr, mkey->va_base);
        MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (cached) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva);
        return mkey;
    }

    return mca_memheap_base_get_cached_mkey_slow(s, pe, va, btl_id, rva);
}
END_C_DECLS
#endif /* MCA_MEMHEAP_BASE_H */

Просмотреть файл

@ -55,9 +55,7 @@ struct oob_comm {
opal_list_t req_list;
};
#define MEMHEAP_VERBOSE_FASTPATH(...)
static mca_memheap_map_t* memheap_map = NULL;
mca_memheap_map_t* memheap_map = NULL;
struct oob_comm memheap_oob = {{{0}}};
@ -70,12 +68,12 @@ static int memheap_oob_get_mkeys(int pe,
uint32_t va_seg_num,
sshmem_mkey_t *mkey);
static inline void* __seg2base_va(int seg)
static inline void* mca_memheap_seg2base_va(int seg)
{
return memheap_map->mem_segs[seg].seg_base_addr;
}
static int _seg_cmp(const void *k, const void *v)
int mca_memheap_seg_cmp(const void *k, const void *v)
{
uintptr_t va = (uintptr_t) k;
map_segment_t *s = (map_segment_t *) v;
@ -88,34 +86,6 @@ static int _seg_cmp(const void *k, const void *v)
return 0;
}
static inline map_segment_t *__find_va(const void* va)
{
map_segment_t *s;
if (OPAL_LIKELY((uintptr_t)va >= (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].seg_base_addr &&
(uintptr_t)va < (uintptr_t)memheap_map->mem_segs[HEAP_SEG_INDEX].end)) {
s = &memheap_map->mem_segs[HEAP_SEG_INDEX];
} else {
s = bsearch(va,
&memheap_map->mem_segs[SYMB_SEG_INDEX],
memheap_map->n_segments - 1,
sizeof(*s),
_seg_cmp);
}
#if MEMHEAP_BASE_DEBUG == 1
if (s) {
MEMHEAP_VERBOSE(5, "match seg#%02ld: 0x%llX - 0x%llX %llu bytes va=%p",
s - memheap_map->mem_segs,
(long long)s->seg_base_addr,
(long long)s->end,
(long long)(s->end - s->seg_base_addr),
(void *)va);
}
#endif
return s;
}
/**
* @param all_trs
* 0 - pack mkeys for transports to given pe
@ -146,7 +116,7 @@ static int pack_local_mkeys(opal_buffer_t *msg, int pe, int seg, int all_trs)
else {
tr_id = i;
}
mkey = mca_memheap_base_get_mkey(__seg2base_va(seg), tr_id);
mkey = mca_memheap_base_get_mkey(mca_memheap_seg2base_va(seg), tr_id);
if (!mkey) {
MEMHEAP_ERROR("seg#%d tr_id: %d failed to find local mkey",
seg, tr_id);
@ -232,6 +202,7 @@ static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe)
}
cnt = memheap_oob.mkeys[tr_id].len;
opal_dss.unpack(msg, memheap_oob.mkeys[tr_id].u.data, &cnt, OPAL_BYTE);
MCA_SPML_CALL(rmkey_unpack(&memheap_oob.mkeys[tr_id], remote_pe));
} else {
memheap_oob.mkeys[tr_id].u.key = MAP_SEGMENT_SHM_INVALID;
}
@ -510,7 +481,7 @@ static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys)
if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(pe, seg, mkeys))) {
for (i = 0; i < memheap_map->num_transports; i++) {
mkeys[i].va_base = __seg2base_va(seg);
mkeys[i].va_base = mca_memheap_seg2base_va(seg);
MEMHEAP_VERBOSE(5,
"MKEY CALCULATED BY LOCAL SPML: pe: %d tr_id: %d %s",
pe,
@ -720,46 +691,15 @@ exit_fatal:
}
}
static inline void* va2rva(void* va,
void* local_base,
void* remote_base)
sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s,
int pe,
void* va,
int btl_id,
void** rva)
{
return (void*) (remote_base > local_base ?
(uintptr_t)va + ((uintptr_t)remote_base - (uintptr_t)local_base) :
(uintptr_t)va - ((uintptr_t)local_base - (uintptr_t)remote_base));
}
sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe,
void* va,
int btl_id,
void** rva)
{
map_segment_t *s;
int rc;
sshmem_mkey_t *mkey;
MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p", pe, va);
s = __find_va(va);
if (NULL == s)
return NULL ;
if (!MAP_SEGMENT_IS_VALID(s))
return NULL ;
if (pe == oshmem_my_proc_id()) {
*rva = va;
MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (local) %lx %p", pe, va,
s->mkeys[btl_id].u.key, *rva);
return &s->mkeys[btl_id];
}
if (OPAL_LIKELY(s->mkeys_cache[pe])) {
mkey = &s->mkeys_cache[pe][btl_id];
*rva = va2rva(va, s->seg_base_addr, mkey->va_base);
MEMHEAP_VERBOSE_FASTPATH(10, "rkey: pe=%d va=%p -> (cached) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva);
return mkey;
}
s->mkeys_cache[pe] = (sshmem_mkey_t *) calloc(memheap_map->num_transports,
sizeof(sshmem_mkey_t));
if (!s->mkeys_cache[pe])
@ -772,7 +712,7 @@ sshmem_mkey_t * mca_memheap_base_get_cached_mkey(int pe,
return NULL ;
mkey = &s->mkeys_cache[pe][btl_id];
*rva = va2rva(va, s->seg_base_addr, mkey->va_base);
*rva = memheap_va2rva(va, s->seg_base_addr, mkey->va_base);
MEMHEAP_VERBOSE_FASTPATH(5, "rkey: pe=%d va=%p -> (remote lookup) %lx %p", pe, (void *)va, mkey->u.key, (void *)*rva);
return mkey;
@ -782,7 +722,7 @@ sshmem_mkey_t *mca_memheap_base_get_mkey(void* va, int tr_id)
{
map_segment_t *s;
s = __find_va(va);
s = memheap_find_va(va);
return ((s && MAP_SEGMENT_IS_VALID(s)) ? &s->mkeys[tr_id] : NULL );
}
@ -795,7 +735,7 @@ uint64_t mca_memheap_base_find_offset(int pe,
map_segment_t *s;
int my_pe = oshmem_my_proc_id();
s = __find_va(va);
s = memheap_find_va(va);
if (my_pe == pe) {
return (uintptr_t)va - (uintptr_t)s->seg_base_addr;
@ -807,7 +747,7 @@ uint64_t mca_memheap_base_find_offset(int pe,
int mca_memheap_base_is_symmetric_addr(const void* va)
{
return (__find_va(va) ? 1 : 0);
return (memheap_find_va(va) ? 1 : 0);
}
int mca_memheap_base_detect_addr_type(void* va)
@ -815,7 +755,7 @@ int mca_memheap_base_detect_addr_type(void* va)
int addr_type = ADDR_INVALID;
map_segment_t *s;
s = __find_va(va);
s = memheap_find_va(va);
if (s) {
if (s->type == MAP_SEGMENT_STATIC) {

Просмотреть файл

@ -86,6 +86,7 @@ static int _dereg_segment(map_segment_t *s)
continue;
if (s->mkeys_cache[j]) {
if (s->mkeys_cache[j]->len) {
MCA_SPML_CALL(rmkey_free(s->mkeys_cache[j]));
free(s->mkeys_cache[j]->u.data);
s->mkeys_cache[j]->len = 0;
}

Просмотреть файл

@ -33,7 +33,6 @@ mca_memheap_buddy_module_t memheap_buddy = {
mca_memheap_buddy_private_alloc,
mca_memheap_buddy_private_free,
mca_memheap_base_get_cached_mkey,
mca_memheap_base_get_mkey,
mca_memheap_base_find_offset,
mca_memheap_base_is_symmetric_addr,

Просмотреть файл

@ -66,14 +66,6 @@ typedef uint64_t (*mca_memheap_base_module_find_offset_fn_t)(int pe,
void* va,
void* rva);
/**
* @return mkey suitable to access pe via given transport id. rva is set to virtual address mapping of (va)
* on remote pe.
*/
typedef sshmem_mkey_t * (*mca_memheap_base_module_get_cached_mkey_fn_t)(int pe,
void* va,
int transport_id,
void** rva);
typedef sshmem_mkey_t * (*mca_memheap_base_module_get_local_mkey_fn_t)(void* va,
int transport_id);
@ -118,7 +110,6 @@ struct mca_memheap_base_module_t {
mca_memheap_base_module_alloc_fn_t memheap_private_alloc;
mca_memheap_base_module_free_fn_t memheap_private_free;
mca_memheap_base_module_get_cached_mkey_fn_t memheap_get_cached_mkey;
mca_memheap_base_module_get_local_mkey_fn_t memheap_get_local_mkey;
mca_memheap_base_module_find_offset_fn_t memheap_find_offset;
mca_memheap_base_is_memheap_addr_fn_t memheap_is_symmetric_addr;

Просмотреть файл

@ -31,7 +31,6 @@ mca_memheap_ptmalloc_module_t memheap_ptmalloc = {
mca_memheap_ptmalloc_alloc,
mca_memheap_ptmalloc_free,
mca_memheap_base_get_cached_mkey,
mca_memheap_base_get_mkey,
mca_memheap_base_find_offset,
mca_memheap_base_is_symmetric_addr,

Просмотреть файл

@ -69,6 +69,9 @@ OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(int pe,
uint32_t seg,
sshmem_mkey_t *mkeys);
OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, int pe);
OSHMEM_DECLSPEC void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey);
/*
* MCA framework
*/

Просмотреть файл

@ -158,3 +158,10 @@ int mca_spml_base_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys)
return OSHMEM_ERROR;
}
void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, int pe)
{
}
void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey)
{
}

Просмотреть файл

@ -204,6 +204,9 @@ mca_spml_ikrit_t mca_spml_ikrit = {
mca_spml_base_wait,
mca_spml_base_wait_nb,
mca_spml_ikrit_fence,
mca_spml_base_rmkey_unpack,
mca_spml_base_rmkey_free,
(void*)&mca_spml_ikrit
}
};
@ -776,10 +779,7 @@ static int mca_spml_ikrit_get_helper(mxm_send_req_t *sreq,
/**
* Get the address to the remote rkey.
**/
r_mkey = mca_memheap.memheap_get_cached_mkey(src,
src_addr,
ptl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, ptl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);
@ -826,10 +826,7 @@ static inline int mca_spml_ikrit_get_shm(void *src_addr,
if (ptl_id != MXM_PTL_SHM)
return OSHMEM_ERROR;
r_mkey = mca_memheap.memheap_get_cached_mkey(src,
src_addr,
ptl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, ptl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);
@ -1064,10 +1061,7 @@ static inline int mca_spml_ikrit_put_internal(void* dst_addr,
ptl_id = get_ptl_id(dst);
/* Get rkey of remote PE (dst proc) which must be on memheap */
r_mkey = mca_memheap.memheap_get_cached_mkey(dst,
dst_addr,
ptl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
@ -1091,10 +1085,7 @@ static inline int mca_spml_ikrit_put_internal(void* dst_addr,
}
/* segment not mapped - fallback to rmda */
ptl_id = MXM_PTL_RDMA;
r_mkey = mca_memheap.memheap_get_cached_mkey(dst,
dst_addr,
ptl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
@ -1206,11 +1197,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr,
ptl_id = get_ptl_id(dst);
/* Get rkey of remote PE (dst proc) which must be on memheap */
r_mkey = mca_memheap.memheap_get_cached_mkey(dst,
//(unsigned long) dst_addr,
dst_addr,
ptl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, ptl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
@ -1233,7 +1220,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr,
}
/* segment not mapped - fallback to rmda */
ptl_id = MXM_PTL_RDMA;
r_mkey = mca_memheap.memheap_get_cached_mkey(dst,
r_mkey = mca_memheap_base_get_cached_mkey(dst,
//(unsigned long) dst_addr,
dst_addr,
ptl_id,

Просмотреть файл

@ -11,8 +11,8 @@
* @file
*/
#ifndef MCA_SPML_YODA_COMPONENT_H
#define MCA_SPML_YODA_COMPONENT_H
#ifndef MCA_SPML_IKRIT_COMPONENT_H
#define MCA_SPML_IKRIT_COMPONENT_H
BEGIN_C_DECLS

Просмотреть файл

@ -111,6 +111,20 @@ typedef int (*mca_spml_base_module_wait_fn_t)(void* addr,
void* value,
int datatype);
/**
* deserialize remote mkey
*
* @param mkey remote mkey
*/
typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(sshmem_mkey_t *, int remote_pe);
/**
* free resources used by deserialized remote mkey
*
* @param mkey remote mkey
*/
typedef void (*mca_spml_base_module_mkey_free_fn_t)(sshmem_mkey_t *);
/**
* Register (Pinn) a buffer of 'size' bits starting in address addr
*
@ -267,6 +281,8 @@ struct mca_spml_base_module_1_0_0_t {
mca_spml_base_module_wait_nb_fn_t spml_wait_nb;
mca_spml_base_module_fence_fn_t spml_fence;
mca_spml_base_module_mkey_unpack_fn_t spml_rmkey_unpack;
mca_spml_base_module_mkey_free_fn_t spml_rmkey_free;
void *self;
};

42
oshmem/mca/spml/ucx/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,42 @@
#
# Copyright (c) 2015 Mellanox Technologies, Inc.
# All rights reserved.
#
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
dist_ompidata_DATA =
AM_CPPFLAGS = $(spml_ucx_CPPFLAGS)
ucx_sources = \
spml_ucx_component.h \
spml_ucx_component.c \
spml_ucx.h \
spml_ucx.c
if MCA_BUILD_oshmem_spml_ucx_DSO
component_noinst =
component_install = mca_spml_ucx.la
else
component_noinst = libmca_spml_ucx.la
component_install =
endif
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_spml_ucx_la_SOURCES = $(ucx_sources)
mca_spml_ucx_la_LIBADD = $(spml_ucx_LIBS)
mca_spml_ucx_la_LDFLAGS = -module -avoid-version $(spml_ucx_LDFLAGS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_spml_ucx_la_SOURCES = $(ucx_sources)
libmca_spml_ucx_la_LIBADD = $(spml_ucx_LIBS)
libmca_spml_ucx_la_LDFLAGS = -module -avoid-version $(spml_ucx_LDFLAGS)

32
oshmem/mca/spml/ucx/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,32 @@
# Copyright (c) 2015 Mellanox Technologies, Inc.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_oshmem_spml_ucx_CONFIG([action-if-can-compile],
# [action-if-cant-compile])
# ------------------------------------------------
AC_DEFUN([MCA_oshmem_spml_ucx_CONFIG],[
AC_CONFIG_FILES([oshmem/mca/spml/ucx/Makefile])
OMPI_CHECK_UCX([spml_ucx],
[spml_ucx_happy="yes"],
[spml_ucx_happy="no"])
AS_IF([test "$spml_ucx_happy" = "yes"],
[$1],
[$2])
# substitute in the things needed to build ucx
AC_SUBST([spml_ucx_CFLAGS])
AC_SUBST([spml_ucx_CPPFLAGS])
AC_SUBST([spml_ucx_LDFLAGS])
AC_SUBST([spml_ucx_LIBS])
])dnl

481
oshmem/mca/spml/ucx/spml_ucx.c Обычный файл
Просмотреть файл

@ -0,0 +1,481 @@
/*
* Copyright (c) 2013 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#define _GNU_SOURCE
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include "oshmem_config.h"
#include "opal/datatype/opal_convertor.h"
#include "orte/include/orte/types.h"
#include "orte/runtime/orte_globals.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/mca/pml/pml.h"
#include "oshmem/mca/spml/ucx/spml_ucx.h"
#include "oshmem/include/shmem.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/base.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/runtime/runtime.h"
#include "orte/util/show_help.h"
#include "oshmem/mca/spml/ucx/spml_ucx_component.h"
/* Turn ON/OFF debug output from build (default 0) */
#ifndef SPML_UCX_PUT_DEBUG
#define SPML_UCX_PUT_DEBUG 0
#endif
/*
 * UCX SPML module instance.
 * Field order must match the function-pointer layout of
 * mca_spml_base_module_1_0_0_t (see oshmem/mca/spml/spml.h); do not reorder.
 */
mca_spml_ucx_t mca_spml_ucx = {
{
/* Init mca_spml_base_module_t */
mca_spml_ucx_add_procs,
mca_spml_ucx_del_procs,
mca_spml_ucx_enable,
mca_spml_ucx_register,
mca_spml_ucx_deregister,
mca_spml_base_oob_get_mkeys,
mca_spml_ucx_put,
NULL, //mca_spml_ucx_put_nb,  /* non-blocking put not implemented yet */
mca_spml_ucx_get,
mca_spml_ucx_recv,
mca_spml_ucx_send,
mca_spml_base_wait,
mca_spml_base_wait_nb,
mca_spml_ucx_quiet, /* At the moment fence is the same as quiet for
every spml */
mca_spml_ucx_rmkey_unpack,
mca_spml_ucx_rmkey_free,
(void*)&mca_spml_ucx  /* self pointer back to this module */
}
};
/**
 * Enable (select) the UCX SPML module.
 *
 * @param enable  true when this module has been selected; false is a no-op.
 * @return        OSHMEM_SUCCESS always.
 */
int mca_spml_ucx_enable(bool enable)
{
    SPML_VERBOSE(50, "*** ucx ENABLED ****");
    if (enable) {
        mca_spml_ucx.enabled = true;
    }
    return OSHMEM_SUCCESS;
}
/**
 * Tear down UCX endpoints to all peers.
 *
 * Destroys every created endpoint (iterating from my_rank to spread the
 * teardown load across PEs) and releases the peer table.
 *
 * @param procs   peer process list (unused; the peer table is indexed by rank)
 * @param nprocs  number of peers
 * @return        OSHMEM_SUCCESS always.
 */
int mca_spml_ucx_del_procs(oshmem_proc_t** procs, size_t nprocs)
{
    size_t i, n;
    int my_rank = oshmem_my_proc_id();

    /* make sure no PE is still targeting us before endpoints go away */
    oshmem_shmem_barrier();

    if (!mca_spml_ucx.ucp_peers) {
        return OSHMEM_SUCCESS;
    }

    for (n = 0; n < nprocs; n++) {
        i = (my_rank + n) % nprocs;
        if (mca_spml_ucx.ucp_peers[i].ucp_conn) {
            ucp_ep_destroy(mca_spml_ucx.ucp_peers[i].ucp_conn);
            mca_spml_ucx.ucp_peers[i].ucp_conn = NULL;
        }
    }

    free(mca_spml_ucx.ucp_peers);
    /* BUG FIX: reset the pointer so a second del_procs call (or a late
     * finalize path) cannot double-free the peer table */
    mca_spml_ucx.ucp_peers = NULL;
    return OSHMEM_SUCCESS;
}
/* TODO: move func into common place, use it with rkey exchng too */
/*
 * All-to-all exchange of variable-sized data blocks.
 *
 * Every PE contributes `local_size` bytes at `local_data`; on success each PE
 * gets the concatenation of all contributions plus per-PE offset/size tables.
 *
 * @param local_data  this PE's contribution
 * @param local_size  its length in bytes
 * @param nprocs      number of PEs (assumed > 0 — TODO confirm callers)
 * @param rdata_p     out: malloc'ed concatenated buffer (caller frees)
 * @param roffsets_p  out: malloc'ed per-PE byte offsets (caller frees)
 * @param rsizes_p    out: malloc'ed per-PE sizes (caller frees)
 * @return OSHMEM_SUCCESS, or OSHMEM_ERROR (all allocations released).
 */
static int oshmem_shmem_xchng(
void *local_data, int local_size, int nprocs,
void **rdata_p, int **roffsets_p, int **rsizes_p)
{
int *rcv_sizes = NULL;
int *rcv_offsets = NULL;
void *rcv_buf = NULL;
int rc;
int i;
/* do allgatherv */
rcv_offsets = malloc(nprocs * sizeof(*rcv_offsets));
if (NULL == rcv_offsets) {
goto err;
}
/* todo: move into separate function. do allgatherv */
rcv_sizes = malloc(nprocs * sizeof(*rcv_sizes));
if (NULL == rcv_sizes) {
goto err;
}
/* first gather everyone's size so displacements can be computed */
rc = oshmem_shmem_allgather(&local_size, rcv_sizes, sizeof(int));
if (MPI_SUCCESS != rc) {
goto err;
}
/* calculate displacements */
rcv_offsets[0] = 0;
for (i = 1; i < nprocs; i++) {
rcv_offsets[i] = rcv_offsets[i - 1] + rcv_sizes[i - 1];
}
/* total size = last offset + last size */
rcv_buf = malloc(rcv_offsets[nprocs - 1] + rcv_sizes[nprocs - 1]);
if (NULL == rcv_buf) {
goto err;
}
rc = oshmem_shmem_allgatherv(local_data, rcv_buf, local_size, rcv_sizes, rcv_offsets);
if (MPI_SUCCESS != rc) {
goto err;
}
/* ownership of all three buffers transfers to the caller */
*rdata_p = rcv_buf;
*roffsets_p = rcv_offsets;
*rsizes_p = rcv_sizes;
return OSHMEM_SUCCESS;
err:
if (rcv_buf)
free(rcv_buf);
if (rcv_offsets)
free(rcv_offsets);
if (rcv_sizes)
free(rcv_sizes);
return OSHMEM_ERROR;
}
/* Debug helper: hex-dump a packed UCX worker/endpoint address.
 * Compiled to a no-op unless SPML_UCX_DEBUG is defined. */
static void dump_address(int pe, char *addr, size_t len)
{
#ifdef SPML_UCX_DEBUG
    int my_rank = oshmem_my_proc_id();
    size_t i;  /* size_t avoids a signed/unsigned mismatch against len */

    /* BUG FIX: len is size_t — the original printed it with %d, which is a
     * printf format/argument mismatch (undefined behavior); use %zu.
     * Also cast addr to void* for %p as required by the standard. */
    printf("me=%d dest_pe=%d addr=%p len=%zu\n", my_rank, pe, (void *)addr, len);
    for (i = 0; i < len; i++) {
        printf("%02X ", (unsigned)0xFF & addr[i]);
    }
    printf("\n");
#endif
    (void)pe;
    (void)addr;
    (void)len;
}
/**
 * Establish UCX endpoints to all peers.
 *
 * Publishes this worker's address, exchanges addresses with all PEs via
 * oshmem_shmem_xchng(), then creates one endpoint per peer (starting from
 * my_rank to spread connection load).
 *
 * @param procs   peer process list (unused; peers are addressed by rank)
 * @param nprocs  number of peers
 * @return OSHMEM_SUCCESS, or OSHMEM_ERR_OUT_OF_RESOURCE on any failure
 *         (all partially-created resources are released).
 */
int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
{
    size_t i, n;
    int rc = OSHMEM_ERROR;
    int my_rank = oshmem_my_proc_id();
    ucs_status_t err;
    ucp_address_t *wk_local_addr;
    size_t wk_addr_len;
    int *wk_roffs, *wk_rsizes;
    char *wk_raddrs;

    mca_spml_ucx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(mca_spml_ucx.ucp_peers)));
    if (NULL == mca_spml_ucx.ucp_peers) {
        goto error;
    }

    err = ucp_worker_get_address(mca_spml_ucx.ucp_worker, &wk_local_addr, &wk_addr_len);
    /* BUG FIX: the original tested "err == UCS_OK" and jumped to the error
     * path on SUCCESS, so add_procs could never work */
    if (UCS_OK != err) {
        goto error;
    }
    dump_address(my_rank, (char *)wk_local_addr, wk_addr_len);

    rc = oshmem_shmem_xchng(wk_local_addr, wk_addr_len, nprocs,
                            (void **)&wk_raddrs, &wk_roffs, &wk_rsizes);
    if (rc != OSHMEM_SUCCESS) {
        /* release the published address before unwinding (was leaked) */
        ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
        goto error;
    }

    opal_progress_register(spml_ucx_progress);

    /* Get the EP connection requests for all the processes from modex */
    for (n = 0; n < nprocs; ++n) {
        i = (my_rank + n) % nprocs;
        dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]);
        err = ucp_ep_create(mca_spml_ucx.ucp_worker,
                            (ucp_address_t *)(wk_raddrs + wk_roffs[i]),
                            &mca_spml_ucx.ucp_peers[i].ucp_conn);
        if (UCS_OK != err) {
            SPML_ERROR("ucp_ep_create failed!!!\n");
            goto error2;
        }
    }

    ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
    free(wk_raddrs);
    free(wk_rsizes);
    free(wk_roffs);

    SPML_VERBOSE(50, "*** ADDED PROCS ***");
    return OSHMEM_SUCCESS;

error2:
    /* endpoint creation failed: destroy what was created, release the
     * exchange buffers and the published worker address */
    for (i = 0; i < nprocs; ++i) {
        if (mca_spml_ucx.ucp_peers[i].ucp_conn) {
            ucp_ep_destroy(mca_spml_ucx.ucp_peers[i].ucp_conn);
        }
    }
    ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
    free(wk_raddrs);
    free(wk_rsizes);
    free(wk_roffs);
error:
    /* BUG FIX: the original freed mca_spml_ucx.ucp_peers twice on the
     * error2 path; free exactly once and reset the pointer */
    free(mca_spml_ucx.ucp_peers);
    mca_spml_ucx.ucp_peers = NULL;
    rc = OSHMEM_ERR_OUT_OF_RESOURCE;
    SPML_ERROR("add procs FAILED rc=%d", rc);
    return rc;
}
/**
 * Release the UCX resources attached to a deserialized remote mkey.
 * No-op when the mkey was never unpacked (spml_context == NULL).
 */
void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey)
{
    spml_ucx_mkey_t *key_ctx = (spml_ucx_mkey_t *)(mkey->spml_context);

    if (NULL == key_ctx) {
        return;
    }
    ucp_rkey_destroy(key_ctx->rkey);
    free(key_ctx);
}
/**
 * Deserialize a remote mkey received from PE `pe`.
 *
 * Unpacks the rkey blob in mkey->u.data against our endpoint to that PE and
 * stashes the resulting spml_ucx_mkey_t in mkey->spml_context. Any failure
 * is fatal: the job is aborted (SHMEM has no way to continue without the
 * remote key).
 */
void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, int pe)
{
    spml_ucx_mkey_t *ucx_mkey;
    ucs_status_t err;

    ucx_mkey = (spml_ucx_mkey_t *)malloc(sizeof(*ucx_mkey));
    if (!ucx_mkey) {
        SPML_ERROR("not enough memory to allocate mkey");
        goto error_fatal;
    }

    err = ucp_ep_rkey_unpack(mca_spml_ucx.ucp_peers[pe].ucp_conn,
                             mkey->u.data,
                             &ucx_mkey->rkey);
    if (UCS_OK != err) {
        SPML_ERROR("failed to unpack rkey");
        free(ucx_mkey);  /* fix leak on the unpack-failure path */
        goto error_fatal;
    }

    mkey->spml_context = ucx_mkey;
    return;

error_fatal:
    oshmem_shmem_abort(-1);
    return;
}
/*
 * Register (map) a memory region with UCX and produce its mkey.
 *
 * Maps [addr, addr+size) into the UCP context, packs the rkey for remote
 * peers, and unpacks a local copy of it for loopback access.
 *
 * @param addr   base of the region to register
 * @param size   region length in bytes
 * @param shmid  segment id (unused by this transport)
 * @param count  out: number of mkeys produced (1 on success, 0 on failure)
 * @return a calloc'ed array of one mkey (caller passes it to deregister),
 *         or NULL on failure with *count == 0.
 */
sshmem_mkey_t *mca_spml_ucx_register(void* addr,
size_t size,
uint64_t shmid,
int *count)
{
sshmem_mkey_t *mkeys;
ucs_status_t err;
spml_ucx_mkey_t *ucx_mkey;
size_t len;
*count = 0;
mkeys = (sshmem_mkey_t *) calloc(1, sizeof(*mkeys));
if (!mkeys) {
return NULL ;
}
ucx_mkey = (spml_ucx_mkey_t *)malloc(sizeof(*ucx_mkey));
if (!ucx_mkey) {
goto error_out;
}
mkeys[0].spml_context = ucx_mkey;
/* &addr is in/out — presumably UCX may adjust the mapped base, and
 * va_base below takes the possibly-updated value — TODO confirm against
 * the ucp_mem_map() documentation for this UCX version */
err = ucp_mem_map(mca_spml_ucx.ucp_context,
&addr, size, 0, &ucx_mkey->mem_h);
if (UCS_OK != err) {
goto error_out1;
}
/* serialize the rkey so peers can unpack it on their side */
err = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h,
&mkeys[0].u.data, &len);
if (UCS_OK != err) {
goto error_unmap;
}
/* mkey length is carried in a 16-bit field; a longer blob cannot be
 * transferred, so abort rather than truncate */
if (len >= 0xffff) {
SPML_ERROR("packed rkey is too long: %llu >= %d",
(unsigned long long)len,
0xffff);
oshmem_shmem_abort(-1);
}
/* unpack our own rkey over the self endpoint for local access */
err = ucp_ep_rkey_unpack(mca_spml_ucx.ucp_peers[oshmem_group_self->my_pe].ucp_conn,
mkeys[0].u.data,
&ucx_mkey->rkey);
if (UCS_OK != err) {
SPML_ERROR("failed to unpack rkey");
goto error_unmap;
}
mkeys[0].len = len;
mkeys[0].va_base = addr;
*count = 1;
return mkeys;
/* unwind in reverse order of acquisition */
error_unmap:
ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
error_out1:
free(ucx_mkey);
error_out:
free(mkeys);
return NULL ;
}
/**
 * Deregister a region previously registered with mca_spml_ucx_register().
 * Fences first so no RMA is in flight, then unmaps the region and releases
 * the packed rkey buffer and the mkey context.
 */
int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
{
    spml_ucx_mkey_t *key_ctx;

    MCA_SPML_CALL(fence());

    if (NULL == mkeys || NULL == mkeys[0].spml_context) {
        return OSHMEM_SUCCESS;
    }

    key_ctx = (spml_ucx_mkey_t *)mkeys[0].spml_context;
    ucp_mem_unmap(mca_spml_ucx.ucp_context, key_ctx->mem_h);
    if (mkeys[0].len > 0) {
        ucp_rkey_buffer_release(mkeys[0].u.data);
    }
    free(key_ctx);

    return OSHMEM_SUCCESS;
}
/**
 * Blocking get: copy `size` bytes from `src_addr` on PE `src` into local
 * `dst_addr`. Aborts the job if src_addr is not a symmetric address.
 */
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
{
    void *rva;
    ucs_status_t err;
    spml_ucx_mkey_t *ucx_mkey;
    sshmem_mkey_t *r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva);

    if (OPAL_UNLIKELY(NULL == r_mkey)) {
        SPML_ERROR("pe=%d: %p is not address of shared variable",
                   src, src_addr);
        oshmem_shmem_abort(-1);
        return OSHMEM_ERROR;
    }

    ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
    err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
                  (uint64_t)rva, ucx_mkey->rkey);

    return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
/**
 * Blocking put: copy `size` bytes from local `src_addr` to `dst_addr` on
 * PE `dst`. Aborts the job if dst_addr is not a symmetric address.
 */
int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
{
    void *rva;
    ucs_status_t err;
    spml_ucx_mkey_t *ucx_mkey;
    sshmem_mkey_t *r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva);

    if (OPAL_UNLIKELY(NULL == r_mkey)) {
        SPML_ERROR("pe=%d: %p is not address of shared variable",
                   dst, dst_addr);
        oshmem_shmem_abort(-1);
        return OSHMEM_ERROR;
    }

    ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
    err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
                  (uint64_t)rva, ucx_mkey->rkey);

    return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
/**
 * Fence: flush all outstanding RMA on the worker. Aborts on failure.
 */
int mca_spml_ucx_fence(void)
{
    ucs_status_t status = ucp_worker_flush(mca_spml_ucx.ucp_worker);

    if (UCS_OK == status) {
        return OSHMEM_SUCCESS;
    }

    SPML_ERROR("fence failed");
    oshmem_shmem_abort(-1);
    return OSHMEM_ERROR;
}
/**
 * Quiet: flush all outstanding RMA on the worker so remote completion is
 * guaranteed. Aborts on failure.
 */
int mca_spml_ucx_quiet(void)
{
    ucs_status_t err;

    err = ucp_worker_flush(mca_spml_ucx.ucp_worker);
    if (UCS_OK != err) {
        /* BUG FIX: the message said "fence failed" — copy-paste from
         * mca_spml_ucx_fence(); report the failing operation correctly */
        SPML_ERROR("quiet failed");
        oshmem_shmem_abort(-1);
        return OSHMEM_ERROR;
    }
    return OSHMEM_SUCCESS;
}
/**
 * Blocking receive of `size` raw bytes from PE `src`, implemented on top of
 * the MPI PML (tag 0, MPI_COMM_WORLD, unsigned-char datatype).
 */
int mca_spml_ucx_recv(void* buf, size_t size, int src)
{
    return MCA_PML_CALL(recv(buf,
                             size,
                             &(ompi_mpi_unsigned_char.dt),
                             src,
                             0,
                             &(ompi_mpi_comm_world.comm),
                             NULL));
}
/**
 * Blocking send of `size` raw bytes to PE `dst`, implemented on top of the
 * MPI PML (tag 0, MPI_COMM_WORLD, unsigned-char datatype). `mode` maps
 * directly onto the PML send mode. For now only blocking copy send is done.
 */
int mca_spml_ucx_send(void* buf,
                      size_t size,
                      int dst,
                      mca_spml_base_put_mode_t mode)
{
    return MCA_PML_CALL(send(buf,
                             size,
                             &(ompi_mpi_unsigned_char.dt),
                             dst,
                             0,
                             (mca_pml_base_send_mode_t)mode,
                             &(ompi_mpi_comm_world.comm)));
}

104
oshmem/mca/spml/ucx/spml_ucx.h Обычный файл
Просмотреть файл

@ -0,0 +1,104 @@
/*
* Copyright (c) 2013 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_SPML_UCX_H
#define MCA_SPML_UCX_H
#include "oshmem_config.h"
#include "oshmem/request/request.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/util/oshmem_util.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/spml_base_request.h"
#include "oshmem/mca/spml/base/spml_base_getreq.h"
#include "orte/runtime/orte_globals.h"
#include <ucp/api/ucp.h>
BEGIN_C_DECLS
/**
* UCX SPML module
*/
/* One remote peer: the UCX endpoint used for all RMA to that PE. */
struct ucp_peer {
ucp_ep_h ucp_conn;
};
typedef struct ucp_peer ucp_peer_t;
/* UCX SPML module state: base module vtable plus the UCP context/worker
 * and the per-PE endpoint table (indexed by rank, sized in add_procs). */
struct mca_spml_ucx {
mca_spml_base_module_t super;
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
ucp_peer_t *ucp_peers;
int priority; /* component priority */
bool enabled;
};
typedef struct mca_spml_ucx mca_spml_ucx_t;
/* Per-registration key context: unpacked rkey for access, and the local
 * memory-map handle (mem_h) for regions mapped by this PE. */
struct spml_ucx_mkey {
ucp_rkey_h rkey;
ucp_mem_h mem_h;
};
typedef struct spml_ucx_mkey spml_ucx_mkey_t;
extern mca_spml_ucx_t mca_spml_ucx;
extern int mca_spml_ucx_enable(bool enable);
extern int mca_spml_ucx_get(void* dst_addr,
size_t size,
void* src_addr,
int src);
extern int mca_spml_ucx_put(void* dst_addr,
size_t size,
void* src_addr,
int dst);
extern int mca_spml_ucx_put_nb(void* dst_addr,
size_t size,
void* src_addr,
int dst,
void **handle);
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
extern int mca_spml_ucx_send(void* buf,
size_t size,
int dst,
mca_spml_base_put_mode_t mode);
extern sshmem_mkey_t *mca_spml_ucx_register(void* addr,
size_t size,
uint64_t shmid,
int *count);
extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys);
extern void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, int pe);
extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey);
extern int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs);
extern int mca_spml_ucx_del_procs(oshmem_proc_t** procs, size_t nprocs);
extern int mca_spml_ucx_fence(void);
extern int mca_spml_ucx_quiet(void);
extern int spml_ucx_progress(void);
END_C_DECLS
#endif

184
oshmem/mca/spml/ucx/spml_ucx_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,184 @@
/*
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#define _GNU_SOURCE
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include "oshmem_config.h"
#include "orte/util/show_help.h"
#include "shmem.h"
#include "oshmem/runtime/params.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/spml/base/base.h"
#include "spml_ucx_component.h"
#include "oshmem/mca/spml/ucx/spml_ucx.h"
#include "orte/util/show_help.h"
#include "opal/util/opal_environ.h"
static int mca_spml_ucx_component_register(void);
static int mca_spml_ucx_component_open(void);
static int mca_spml_ucx_component_close(void);
static mca_spml_base_module_t*
mca_spml_ucx_component_init(int* priority,
bool enable_progress_threads,
bool enable_mpi_threads);
static int mca_spml_ucx_component_fini(void);
/* Component descriptor for the UCX SPML; field order follows
 * mca_spml_base_component_2_0_0_t. */
mca_spml_base_component_2_0_0_t mca_spml_ucx_component = {
/* First, the mca_base_component_t struct containing meta
information about the component itself */
{
MCA_SPML_BASE_VERSION_2_0_0,
"ucx", /* MCA component name */
OSHMEM_MAJOR_VERSION, /* MCA component major version */
OSHMEM_MINOR_VERSION, /* MCA component minor version */
OSHMEM_RELEASE_VERSION, /* MCA component release version */
mca_spml_ucx_component_open, /* component open */
mca_spml_ucx_component_close, /* component close */
NULL, /* NOTE(review): presumably the component-query slot, unused here — confirm against mca_base_component_t */
mca_spml_ucx_component_register
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
mca_spml_ucx_component_init, /* component init */
mca_spml_ucx_component_fini /* component finalize */
};
/**
 * Register an integer MCA variable for this component.
 * Writes the default into *storage; the MCA base updates it when the user
 * overrides the parameter.
 */
static inline void mca_spml_ucx_param_register_int(const char* param_name,
                                                   int default_value,
                                                   const char *help_msg,
                                                   int *storage)
{
    *storage = default_value;
    (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
                                           param_name, help_msg,
                                           MCA_BASE_VAR_TYPE_INT,
                                           NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           storage);
}
/**
 * Register a string MCA variable for this component.
 * Writes the default into *storage; the MCA base updates it when the user
 * overrides the parameter.
 */
static inline void mca_spml_ucx_param_register_string(const char* param_name,
                                                      char* default_value,
                                                      const char *help_msg,
                                                      char **storage)
{
    *storage = default_value;
    (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
                                           param_name, help_msg,
                                           MCA_BASE_VAR_TYPE_STRING,
                                           NULL, 0, 0,
                                           OPAL_INFO_LVL_9,
                                           MCA_BASE_VAR_SCOPE_READONLY,
                                           storage);
}
/* Register this component's MCA parameters (currently only "priority"). */
static int mca_spml_ucx_component_register(void)
{
/* default priority 20; user-tunable via spml_ucx_priority */
mca_spml_ucx_param_register_int("priority", 20,
"[integer] ucx priority",
&mca_spml_ucx.priority);
return OSHMEM_SUCCESS;
}
/**
 * opal_progress callback: advance the UCX worker.
 * Always reports one completed "event" to the progress engine.
 */
int spml_ucx_progress(void)
{
    (void) ucp_worker_progress(mca_spml_ucx.ucp_worker);
    return 1;
}
/**
 * Component open: read the UCP configuration (environment prefix "OSHMEM")
 * and create the UCP context with RMA + 32/64-bit atomics features.
 */
static int mca_spml_ucx_component_open(void)
{
    ucp_params_t params;
    ucp_config_t *ucp_config;
    ucs_status_t status;

    status = ucp_config_read("OSHMEM", NULL, &ucp_config);
    if (UCS_OK != status) {
        return OSHMEM_ERROR;
    }

    memset(&params, 0, sizeof(params));
    params.features = UCP_FEATURE_RMA |
                      UCP_FEATURE_AMO32 |
                      UCP_FEATURE_AMO64;

    status = ucp_init(&params, ucp_config, &mca_spml_ucx.ucp_context);
    ucp_config_release(ucp_config);

    return (UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
/* Component close: tear down the UCP context created in open. */
static int mca_spml_ucx_component_close(void)
{
ucp_cleanup(mca_spml_ucx.ucp_context);
return OSHMEM_SUCCESS;
}
/**
 * Create the single-threaded UCP worker used for all communication.
 * Called from component_init once this component is selected.
 */
static int spml_ucx_init(void)
{
    ucs_status_t status;

    status = ucp_worker_create(mca_spml_ucx.ucp_context, UCS_THREAD_MODE_SINGLE,
                               &mca_spml_ucx.ucp_worker);

    return (UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
/**
 * Component init: compete on priority with other SPML components.
 * Returns the module when this component wins and its worker initializes,
 * NULL otherwise. *priority is always set to this component's priority.
 */
static mca_spml_base_module_t*
mca_spml_ucx_component_init(int* priority,
                            bool enable_progress_threads,
                            bool enable_mpi_threads)
{
    SPML_VERBOSE( 10, "in ucx, my priority is %d\n", mca_spml_ucx.priority);

    if ((*priority) > mca_spml_ucx.priority) {
        /* another component already outbid us */
        *priority = mca_spml_ucx.priority;
        return NULL;
    }
    *priority = mca_spml_ucx.priority;

    if (OSHMEM_SUCCESS != spml_ucx_init()) {
        return NULL;
    }

    SPML_VERBOSE(50, "*** ucx initialized ****");
    return &mca_spml_ucx.super;
}
/* Component finalize: stop progressing and destroy the worker.
 * NOTE(review): spml_ucx_progress is registered only in add_procs; this
 * unregisters unconditionally — presumably opal_progress_unregister
 * tolerates a never-registered callback. Confirm. */
static int mca_spml_ucx_component_fini(void)
{
opal_progress_unregister(spml_ucx_progress);
/* worker exists only if component_init ran spml_ucx_init successfully */
if (mca_spml_ucx.ucp_worker) {
ucp_worker_destroy(mca_spml_ucx.ucp_worker);
}
if(!mca_spml_ucx.enabled)
return OSHMEM_SUCCESS; /* never selected.. return success.. */
mca_spml_ucx.enabled = false; /* not anymore */
return OSHMEM_SUCCESS;
}

25
oshmem/mca/spml/ucx/spml_ucx_component.h Обычный файл
Просмотреть файл

@ -0,0 +1,25 @@
/*
* Copyright (c) 2015 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_SPML_UCX_COMPONENT_H
#define MCA_SPML_UCX_COMPONENT_H
BEGIN_C_DECLS
/*
* SPML module functions.
*/
OSHMEM_MODULE_DECLSPEC extern mca_spml_base_component_2_0_0_t mca_spml_ucx_component;
END_C_DECLS
#endif

Просмотреть файл

@ -62,6 +62,8 @@ mca_spml_yoda_module_t mca_spml_yoda = {
mca_spml_base_wait,
mca_spml_base_wait_nb,
mca_spml_yoda_fence,
mca_spml_base_rmkey_unpack,
mca_spml_base_rmkey_free,
(void *)&mca_spml_yoda
}
@ -759,10 +761,7 @@ static inline int mca_spml_yoda_put_internal(void *dst_addr,
put_via_send = !(bml_btl->btl->btl_flags & MCA_BTL_FLAGS_PUT);
/* Get rkey of remote PE (dst proc) which must be on memheap*/
r_mkey = mca_memheap.memheap_get_cached_mkey(dst,
dst_addr,
btl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, btl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
@ -1033,10 +1032,7 @@ int mca_spml_yoda_get(void* src_addr, size_t size, void* dst_addr, int src)
(bml_btl->btl->btl_flags & (MCA_BTL_FLAGS_PUT)) );
/* Get rkey of remote PE (src proc) which must be on memheap*/
r_mkey = mca_memheap.memheap_get_cached_mkey(src,
src_addr,
btl_id,
&rva);
r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, btl_id, &rva);
if (!r_mkey) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);

Просмотреть файл

@ -14,6 +14,7 @@
#include "oshmem/runtime/runtime.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#if OSHMEM_PROFILING
#include "oshmem/include/pshmem.h"
@ -28,8 +29,7 @@ int shmem_addr_accessible(void *addr, int pe)
RUNTIME_CHECK_INIT();
mkey = MCA_MEMHEAP_CALL(get_cached_mkey(pe, addr,
oshmem_get_transport_id(pe), &rva));
mkey = mca_memheap_base_get_cached_mkey(pe, addr, oshmem_get_transport_id(pe), &rva);
return mkey ? 1 : 0;
}