1
1
Этот коммит содержится в:
yosefe 2015-08-23 20:48:14 +03:00
родитель 502dc8aaa4
Коммит a313588337
10 изменённых файлов: 1628 добавлений и 0 удалений

45
ompi/mca/pml/ucx/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,45 @@
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
# Preprocessor flags for the UCX headers, as substituted by configure.
AM_CPPFLAGS = $(pml_ucx_CPPFLAGS)

# Sources shared by the DSO and the static flavour of the component.
local_sources = \
        pml_ucx.h \
        pml_ucx.c \
        pml_ucx_request.h \
        pml_ucx_request.c \
        pml_ucx_datatype.h \
        pml_ucx_datatype.c \
        pml_ucx_freelist.h \
        pml_ucx_component.c

# Exactly one of the two library names is non-empty, depending on whether
# the component is built as a DSO or linked statically.
if MCA_BUILD_ompi_pml_ucx_DSO
component_noinst =
component_install = mca_pml_ucx.la
else
component_noinst = libmca_pml_ucx.la
component_install =
endif

# DSO build. pml_ucx_LDFLAGS is substituted by configure.m4 next to
# pml_ucx_LIBS; it has to be passed to the linker as well, otherwise a UCX
# installed outside the default library search path cannot be found.
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_pml_ucx_la_SOURCES = $(local_sources)
mca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS)
mca_pml_ucx_la_LDFLAGS = -module -avoid-version $(pml_ucx_LDFLAGS)

# Static build.
noinst_LTLIBRARIES = $(component_noinst)
libmca_pml_ucx_la_SOURCES = $(local_sources)
libmca_pml_ucx_la_LIBADD = $(pml_ucx_LIBS)
libmca_pml_ucx_la_LDFLAGS = -module -avoid-version $(pml_ucx_LDFLAGS)

30
ompi/mca/pml/ucx/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,30 @@
#
# Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_ompi_pml_ucx_POST_CONFIG(flag)
# ----------------------------------
# When the first argument equals "1", invoke OMPI_REQUIRE_ENDPOINT_TAG for
# the PML endpoint tag; otherwise do nothing.
# NOTE(review): presumably the argument is 1 when this component will be
# built - confirm against the MCA configury.
AC_DEFUN([MCA_ompi_pml_ucx_POST_CONFIG], [
AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([PML])])
])
# MCA_ompi_pml_ucx_CONFIG(action-if-found, action-if-not-found)
# -------------------------------------------------------------
# Register the component Makefile, probe for UCX through OMPI_CHECK_UCX with
# the pml_ucx variable prefix, run the first argument when the probe
# succeeds and the second one otherwise, and finally substitute the
# pml_ucx_CPPFLAGS, pml_ucx_LDFLAGS and pml_ucx_LIBS variables.
AC_DEFUN([MCA_ompi_pml_ucx_CONFIG], [
AC_CONFIG_FILES([ompi/mca/pml/ucx/Makefile])
OMPI_CHECK_UCX([pml_ucx],
[pml_ucx_happy="yes"],
[pml_ucx_happy="no"])
AS_IF([test "$pml_ucx_happy" = "yes"],
[$1],
[$2])
# substitute in the things needed to build ucx
AC_SUBST([pml_ucx_CPPFLAGS])
AC_SUBST([pml_ucx_LDFLAGS])
AC_SUBST([pml_ucx_LIBS])
])

712
ompi/mca/pml/ucx/pml_ucx.c Обычный файл
Просмотреть файл

@ -0,0 +1,712 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx.h"
#include "opal/runtime/opal.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/message/message.h"
#include "pml_ucx_request.h"
#include <inttypes.h>
/*
 * Trace helpers. All of them log through PML_UCX_VERBOSE at level 8 and their
 * expansion already ends with ';'.
 *
 * PML_UCX_TRACE_SEND: the caller's format (_msg) and its variadic arguments
 * are printed first, followed by buffer, count, datatype name, destination,
 * tag, send mode name, communicator context id and communicator name.
 * At least one variadic argument has to be supplied.
 */
#define PML_UCX_TRACE_SEND(_msg, _buf, _count, _datatype, _dst, _tag, _mode, _comm, ...) \
PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' dst %d tag %d mode %s comm %d '%s'", \
__VA_ARGS__, \
(_buf), (_count), (_datatype)->name, (_dst), (_tag), \
mca_pml_ucx_send_mode_name(_mode), (_comm)->c_contextid, \
(_comm)->c_name);
/*
 * PML_UCX_TRACE_RECV: same layout as the send variant, with a source rank
 * instead of a destination and without a send mode.
 */
#define PML_UCX_TRACE_RECV(_msg, _buf, _count, _datatype, _src, _tag, _comm, ...) \
PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' src %d tag %d comm %d '%s'", \
__VA_ARGS__, \
(_buf), (_count), (_datatype)->name, (_src), (_tag), \
(_comm)->c_contextid, (_comm)->c_name);
/* PML_UCX_TRACE_PROBE: logs source, tag and communicator of a probe call. */
#define PML_UCX_TRACE_PROBE(_msg, _src, _tag, _comm) \
PML_UCX_VERBOSE(8, _msg " src %d tag %d comm %d '%s'", \
_src, (_tag), (_comm)->c_contextid, (_comm)->c_name);
/*
 * PML_UCX_TRACE_MRECV: logs a matched receive; _message is a pointer to the
 * message pointer, so the handle location, the handle and its req_ptr field
 * are printed.
 */
#define PML_UCX_TRACE_MRECV(_msg, _buf, _count, _datatype, _message) \
PML_UCX_VERBOSE(8, _msg " buf %p count %zu type '%s' msg *%p=%p (%p)", \
(_buf), (_count), (_datatype)->name, (void*)(_message), \
(void*)*(_message), (*(_message))->req_ptr);
/* NOTE(review): not referenced in the visible part of this file; the modex
 * calls below are keyed by the component version structure instead. */
#define MODEX_KEY "pml-ucx"
/*
 * The single PML UCX module instance. The nested initializer fills the
 * embedded base module ("super") positionally; the two trailing NULLs
 * initialize ucp_context and ucp_worker, and the remaining members are
 * zero-initialized implicitly.
 */
mca_pml_ucx_module_t ompi_pml_ucx = {
{
mca_pml_ucx_add_procs,
mca_pml_ucx_del_procs,
mca_pml_ucx_enable,
NULL, /* NOTE(review): presumably the progress slot; mca_pml_ucx_init registers mca_pml_ucx_progress through opal_progress_register() instead - confirm against mca_pml_base_module_t */
mca_pml_ucx_add_comm,
mca_pml_ucx_del_comm,
mca_pml_ucx_irecv_init,
mca_pml_ucx_irecv,
mca_pml_ucx_recv,
mca_pml_ucx_isend_init,
mca_pml_ucx_isend,
mca_pml_ucx_send,
mca_pml_ucx_iprobe,
mca_pml_ucx_probe,
mca_pml_ucx_start,
mca_pml_ucx_improbe,
mca_pml_ucx_mprobe,
mca_pml_ucx_imrecv,
mca_pml_ucx_mrecv,
mca_pml_ucx_dump,
NULL, /* FT */
1ul << (PML_UCX_TAG_BITS - 1), /* NOTE(review): presumably the tag limit - confirm */
1ul << (PML_UCX_CONTEXT_BITS), /* NOTE(review): presumably the context id limit - confirm */
},
NULL, /* ucp_context, set by mca_pml_ucx_open */
NULL /* ucp_worker, set by mca_pml_ucx_init */
};
/*
 * Publish the address of the local UCP worker through the modex, keyed by
 * the component version structure.
 *
 * Returns OMPI_SUCCESS on success, OMPI_ERROR when the address cannot be
 * obtained from UCX or cannot be published.
 */
static int mca_pml_ucx_send_worker_address(void)
{
    ucp_address_t *address;
    ucs_status_t status;
    size_t addrlen;
    int rc;

    status = ucp_worker_get_address(ompi_pml_ucx.ucp_worker, &address, &addrlen);
    if (UCS_OK != status) {
        PML_UCX_ERROR("Failed to get worker address");
        return OMPI_ERROR;
    }

    OPAL_MODEX_SEND(rc, OPAL_PMIX_GLOBAL,
                    &mca_pml_ucx_component.pmlm_version, (void*)address, addrlen);

    /* Release the address on both the success and the failure path; the
     * previous code returned early on a modex error and leaked it. */
    ucp_worker_release_address(ompi_pml_ucx.ucp_worker, address);

    if (OMPI_SUCCESS != rc) {
        PML_UCX_ERROR("Open MPI couldn't distribute EP connection details");
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}
/*
 * Fetch the UCP worker address a peer published through the modex.
 *
 * proc       peer whose address is requested
 * address_p  receives the address buffer (reset to NULL before the lookup)
 * addrlen_p  receives the length of the address
 *
 * Returns the modex return code; a negative value is logged as an error.
 */
static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc,
                                           ucp_address_t **address_p,
                                           size_t *addrlen_p)
{
    int rc;

    *address_p = NULL;
    OPAL_MODEX_RECV(rc, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name,
                    (void**)address_p, addrlen_p);
    if (rc >= 0) {
        return rc;
    }

    PML_UCX_ERROR("Failed to receive EP address");
    return rc;
}
/*
 * Read the UCX configuration with the "MPI" prefix and create the UCP
 * context with tag-matching support. The configuration object is released
 * as soon as ucp_init() has returned.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if either UCX call fails.
 */
int mca_pml_ucx_open(void)
{
    ucp_config_t *ucp_cfg;
    ucp_params_t ucp_params;
    ucs_status_t rc;

    PML_UCX_VERBOSE(1, "mca_pml_ucx_open");

    /* Read options */
    rc = ucp_config_read("MPI", NULL, &ucp_cfg);
    if (rc != UCS_OK) {
        return OMPI_ERROR;
    }

    /* Every UCX request carries room for an ompi_request_t */
    ucp_params.features        = UCP_FEATURE_TAG;
    ucp_params.request_size    = sizeof(ompi_request_t);
    ucp_params.request_init    = mca_pml_ucx_request_init;
    ucp_params.request_cleanup = mca_pml_ucx_request_cleanup;

    rc = ucp_init(&ucp_params, ucp_cfg, &ompi_pml_ucx.ucp_context);
    ucp_config_release(ucp_cfg);

    return (UCS_OK == rc) ? OMPI_SUCCESS : OMPI_ERROR;
}
/*
 * Destroy the UCP context if one exists and clear the handle, so a repeated
 * call is harmless. Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_close(void)
{
    PML_UCX_VERBOSE(1, "mca_pml_ucx_close");

    if (NULL == ompi_pml_ucx.ucp_context) {
        return OMPI_SUCCESS;
    }

    ucp_cleanup(ompi_pml_ucx.ucp_context);
    ompi_pml_ucx.ucp_context = NULL;
    return OMPI_SUCCESS;
}
/*
 * Create the UCP worker, publish its address, construct the free lists and
 * the pre-completed send request, and register the progress callback.
 *
 * Returns OMPI_SUCCESS, OMPI_ERROR if the worker cannot be created, or the
 * error returned by mca_pml_ucx_send_worker_address().
 */
int mca_pml_ucx_init(void)
{
    ucs_status_t status;
    int rc;

    PML_UCX_VERBOSE(1, "mca_pml_ucx_init");

    /* TODO check MPI thread mode */
    status = ucp_worker_create(ompi_pml_ucx.ucp_context, UCS_THREAD_MODE_SINGLE,
                               &ompi_pml_ucx.ucp_worker);
    if (UCS_OK != status) {
        return OMPI_ERROR;
    }

    rc = mca_pml_ucx_send_worker_address();
    if (rc < 0) {
        /* Do not leave a live worker behind on failure: nothing else has
         * been set up yet, and mca_pml_ucx_cleanup() skips a NULL worker. */
        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
        ompi_pml_ucx.ucp_worker = NULL;
        return rc;
    }

    /* Initialize the free lists */
    OBJ_CONSTRUCT(&ompi_pml_ucx.persistent_reqs, mca_pml_ucx_freelist_t);
    OBJ_CONSTRUCT(&ompi_pml_ucx.convs,           mca_pml_ucx_freelist_t);

    /* Create a completed request to be returned from isend */
    mca_pml_ucx_completed_request_init(&ompi_pml_ucx.completed_send_req);

    opal_progress_register(mca_pml_ucx_progress);

    PML_UCX_VERBOSE(2, "created ucp context %p, worker %p",
                    (void *)ompi_pml_ucx.ucp_context,
                    (void *)ompi_pml_ucx.ucp_worker);
    return OMPI_SUCCESS;
}
/*
 * Undo mca_pml_ucx_init(): unregister the progress callback, finalize the
 * pre-completed send request, destruct both free lists and destroy the
 * worker if there is one. Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_cleanup(void)
{
    PML_UCX_VERBOSE(1, "mca_pml_ucx_cleanup");

    opal_progress_unregister(mca_pml_ucx_progress);

    OMPI_REQUEST_FINI(&ompi_pml_ucx.completed_send_req);

    OBJ_DESTRUCT(&ompi_pml_ucx.convs);
    OBJ_DESTRUCT(&ompi_pml_ucx.persistent_reqs);

    if (NULL != ompi_pml_ucx.ucp_worker) {
        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
        ompi_pml_ucx.ucp_worker = NULL;
    }

    return OMPI_SUCCESS;
}
/*
 * Create a UCP endpoint for every process in procs that does not have one
 * yet and store it in the PML endpoint slot of the process.
 *
 * Returns OMPI_SUCCESS, the error from the PML selection check, the error
 * from the modex lookup, or OMPI_ERROR if an endpoint cannot be created.
 */
int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    ucp_address_t *address;
    ucs_status_t status;
    size_t addrlen;
    ucp_ep_h ep;
    size_t i;
    int ret;

    ret = mca_pml_base_pml_check_selected("ucx", procs, nprocs);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    for (i = 0; i < nprocs; ++i) {
        /* Look at the endpoint slot before fetching the address: the old
         * order fetched the address first and leaked the buffer whenever
         * the process turned out to be connected already. */
        if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
            PML_UCX_VERBOSE(3, "already connected to proc. %d", procs[i]->super.proc_name.vpid);
            continue;
        }

        ret = mca_pml_ucx_recv_worker_address(procs[i], &address, &addrlen);
        if (ret < 0) {
            return ret;
        }

        PML_UCX_VERBOSE(2, "connecting to proc. %d", procs[i]->super.proc_name.vpid);
        status = ucp_ep_create(ompi_pml_ucx.ucp_worker, address, &ep);
        /* The address buffer is only needed for the connect call itself */
        free(address);

        if (UCS_OK != status) {
            PML_UCX_ERROR("Failed to connect");
            return OMPI_ERROR;
        }

        procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
    }

    return OMPI_SUCCESS;
}
/*
 * Destroy the UCP endpoint of every listed process, if it has one, and
 * clear its PML endpoint slot. Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
{
    size_t idx;

    for (idx = 0; idx < nprocs; ++idx) {
        struct ompi_proc_t *peer = procs[idx];
        ucp_ep_h peer_ep;

        PML_UCX_VERBOSE(2, "disconnecting from rank %d", peer->super.proc_name.vpid);

        peer_ep = peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
        if (NULL != peer_ep) {
            ucp_ep_destroy(peer_ep);
        }
        peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
    }

    return OMPI_SUCCESS;
}
/*
 * Initialize the persistent-request and convertor free lists (128 initial
 * elements, no upper bound, growing 128 at a time).
 *
 * The enable argument is not used. Returns OMPI_SUCCESS, or the error code
 * of the free-list initialization that failed; these return codes used to
 * be dropped silently.
 */
int mca_pml_ucx_enable(bool enable)
{
    int rc;

    (void)enable;

    rc = PML_UCX_FREELIST_INIT(&ompi_pml_ucx.persistent_reqs,
                               mca_pml_ucx_persistent_request_t,
                               128, -1, 128);
    if (OMPI_SUCCESS != rc) {
        PML_UCX_ERROR("Failed to initialize the persistent requests free list");
        return rc;
    }

    rc = PML_UCX_FREELIST_INIT(&ompi_pml_ucx.convs,
                               mca_pml_ucx_convertor_t,
                               128, -1, 128);
    if (OMPI_SUCCESS != rc) {
        PML_UCX_ERROR("Failed to initialize the convertors free list");
        return rc;
    }

    return OMPI_SUCCESS;
}
/*
 * Progress callback: drive the UCP worker once. A static counter turns a
 * nested call (made while an outer call is still running) into a no-op.
 */
int mca_pml_ucx_progress(void)
{
    static int inprogress = 0;

    if (0 == inprogress) {
        inprogress++;
        ucp_worker_progress(ompi_pml_ucx.ucp_worker);
        inprogress--;
        return OMPI_SUCCESS;
    }

    /* Re-entered: nothing to do */
    return 0;
}
/* No per-communicator state is kept here; always returns OMPI_SUCCESS. */
int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm)
{
    (void)comm;
    return OMPI_SUCCESS;
}
/* Nothing to tear down for a communicator; always returns OMPI_SUCCESS. */
int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm)
{
    (void)comm;
    return OMPI_SUCCESS;
}
/*
 * Create an inactive persistent receive request. Nothing is posted to UCX
 * here; the buffer, count, datatype and tag/mask are only recorded so that
 * mca_pml_ucx_start() can post the receive later.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE when the free list
 * cannot provide a request.
 */
int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
                           int src, int tag, struct ompi_communicator_t* comm,
                           struct ompi_request_t **request)
{
    mca_pml_ucx_persistent_request_t *preq;

    preq = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs);
    if (NULL == preq) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    PML_UCX_TRACE_RECV("irecv_init request *%p=%p", buf, count, datatype, src,
                       tag, comm, (void*)request, (void*)preq);

    preq->ompi.req_state = OMPI_REQUEST_INACTIVE;
    preq->flags          = 0;   /* the send flag is not set: this is a receive */
    preq->buffer         = buf;
    preq->count          = count;
    preq->datatype       = mca_pml_ucx_get_datatype(datatype);
    PML_UCX_MAKE_RECV_TAG(preq->tag, preq->recv.tag_mask, tag, src, comm);

    *request = &preq->ompi;
    return OMPI_SUCCESS;
}
/*
 * Post a non-blocking tagged receive. The request returned by UCX is handed
 * back to the caller as the MPI request.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if UCX reports an error pointer.
 */
int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      int src, int tag, struct ompi_communicator_t* comm,
                      struct ompi_request_t **request)
{
    ucp_tag_t match_tag, match_mask;
    ompi_request_t *recv_req;

    PML_UCX_TRACE_RECV("irecv request *%p", buf, count, datatype, src, tag, comm,
                       (void*)request);

    PML_UCX_MAKE_RECV_TAG(match_tag, match_mask, tag, src, comm);
    recv_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                                mca_pml_ucx_get_datatype(datatype),
                                                match_tag, match_mask,
                                                mca_pml_ucx_recv_completion);
    if (!UCS_PTR_IS_ERR(recv_req)) {
        PML_UCX_VERBOSE(8, "got request %p", (void*)recv_req);
        *request = recv_req;
        return OMPI_SUCCESS;
    }

    PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(recv_req)));
    return OMPI_ERROR;
}
/*
 * Completion callback used by the blocking receive: under the request lock
 * it stores the receive status in the request and sets req_complete, which
 * is the flag mca_pml_ucx_recv() polls.
 */
static void
mca_pml_ucx_blocking_recv_completion(void *request, ucs_status_t status,
                                     ucp_tag_recv_info_t *info)
{
    ompi_request_t *ompi_req = request;

    PML_UCX_VERBOSE(8, "blocking receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)ompi_req, ucs_status_string(status), info->sender_tag,
                    info->length);

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_recv_status(&ompi_req->req_status, status, info);
    PML_UCX_ASSERT(!ompi_req->req_complete);
    ompi_req->req_complete = true;
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
 * Blocking tagged receive: post the receive, spin on opal_progress() until
 * the completion callback sets req_complete, copy the status out unless the
 * caller passed MPI_STATUS_IGNORE, then clear the flag and release the UCX
 * request.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if UCX reports an error pointer.
 */
int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
                     int tag, struct ompi_communicator_t* comm,
                     ompi_status_public_t* mpi_status)
{
    ucp_tag_t match_tag, match_mask;
    ompi_request_t *recv_req;

    PML_UCX_TRACE_RECV("%s", buf, count, datatype, src, tag, comm, "recv");

    PML_UCX_MAKE_RECV_TAG(match_tag, match_mask, tag, src, comm);
    recv_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                                mca_pml_ucx_get_datatype(datatype),
                                                match_tag, match_mask,
                                                mca_pml_ucx_blocking_recv_completion);
    if (UCS_PTR_IS_ERR(recv_req)) {
        PML_UCX_ERROR("ucx recv failed: %s", ucs_status_string(UCS_PTR_STATUS(recv_req)));
        return OMPI_ERROR;
    }

    /* Wait for the completion callback */
    for (;;) {
        if (recv_req->req_complete) {
            break;
        }
        opal_progress();
    }

    if (MPI_STATUS_IGNORE != mpi_status) {
        *mpi_status = recv_req->req_status;
    }

    /* Clear the flag before handing the request back to UCX */
    recv_req->req_complete = false;
    ucp_request_release(recv_req);
    return OMPI_SUCCESS;
}
/*
 * Map a send mode to a short printable name for the trace output; values
 * not listed below yield "unknown".
 */
static inline const char *mca_pml_ucx_send_mode_name(mca_pml_base_send_mode_t mode)
{
    const char *name;

    switch (mode) {
    case MCA_PML_BASE_SEND_SYNCHRONOUS: name = "sync";     break;
    case MCA_PML_BASE_SEND_COMPLETE:    name = "complete"; break;
    case MCA_PML_BASE_SEND_BUFFERED:    name = "buffered"; break;
    case MCA_PML_BASE_SEND_READY:       name = "ready";    break;
    case MCA_PML_BASE_SEND_STANDARD:    name = "standard"; break;
    case MCA_PML_BASE_SEND_SIZE:        name = "size";     break;
    default:                            name = "unknown";  break;
    }

    return name;
}
/*
 * Create an inactive persistent send request. Nothing is sent here; the
 * buffer, count, datatype, tag, mode and destination endpoint are recorded
 * so that mca_pml_ucx_start() can post the send later.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE when the free list
 * cannot provide a request.
 */
int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
                           int dst, int tag, mca_pml_base_send_mode_t mode,
                           struct ompi_communicator_t* comm,
                           struct ompi_request_t **request)
{
    mca_pml_ucx_persistent_request_t *preq;

    preq = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs);
    if (NULL == preq) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    PML_UCX_TRACE_SEND("isend_init request *%p=%p", buf, count, datatype, dst,
                       tag, mode, comm, (void*)request, (void*)preq);

    preq->ompi.req_state = OMPI_REQUEST_INACTIVE;
    preq->flags          = MCA_PML_UCX_REQUEST_FLAG_SEND;
    preq->buffer         = (void *)buf;
    preq->count          = count;
    preq->datatype       = mca_pml_ucx_get_datatype(datatype);
    preq->tag            = PML_UCX_MAKE_SEND_TAG(tag, comm);
    preq->send.mode      = mode;
    preq->send.ep        = mca_pml_ucx_get_ep(comm, dst);

    *request = &preq->ompi;
    return OMPI_SUCCESS;
}
/*
 * Non-blocking tagged send. UCX may return NULL (the send finished
 * immediately, so the shared pre-completed request is handed out), a
 * request pointer (returned to the caller as is), or an error pointer.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR on an error pointer.
 */
int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
                      int dst, int tag, mca_pml_base_send_mode_t mode,
                      struct ompi_communicator_t* comm,
                      struct ompi_request_t **request)
{
    ompi_request_t *send_req;

    PML_UCX_TRACE_SEND("isend request *%p", buf, count, datatype, dst, tag, mode,
                       comm, (void*)request);

    /* TODO special care to sync/buffered send */

    send_req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count,
                                                mca_pml_ucx_get_datatype(datatype),
                                                PML_UCX_MAKE_SEND_TAG(tag, comm),
                                                mca_pml_ucx_send_completion);
    if (NULL == send_req) {
        PML_UCX_VERBOSE(8, "returning completed request");
        *request = &ompi_pml_ucx.completed_send_req;
        return OMPI_SUCCESS;
    }

    if (UCS_PTR_IS_ERR(send_req)) {
        PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(send_req)));
        return OMPI_ERROR;
    }

    PML_UCX_VERBOSE(8, "got request %p", (void*)send_req);
    *request = send_req;
    return OMPI_SUCCESS;
}
/*
 * Blocking tagged send: post the send and, when UCX hands back a request,
 * wait for it with ompi_request_wait(). A NULL return from UCX means the
 * send already finished.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR on an error pointer.
 */
int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
                     int tag, mca_pml_base_send_mode_t mode,
                     struct ompi_communicator_t* comm)
{
    ompi_request_t *send_req;

    PML_UCX_TRACE_SEND("%s", buf, count, datatype, dst, tag, mode, comm, "send");

    /* TODO special care to sync/buffered send */

    send_req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count,
                                                mca_pml_ucx_get_datatype(datatype),
                                                PML_UCX_MAKE_SEND_TAG(tag, comm),
                                                mca_pml_ucx_send_completion);
    if (NULL == send_req) {
        return OMPI_SUCCESS;
    }

    if (UCS_PTR_IS_ERR(send_req)) {
        PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(send_req)));
        return OMPI_ERROR;
    }

    PML_UCX_VERBOSE(8, "got request %p", (void*)send_req);
    ompi_request_wait(&send_req, MPI_STATUS_IGNORE);
    return OMPI_SUCCESS;
}
/*
 * Non-blocking probe. Sets *matched to 1 and fills the status when a
 * matching message is found, otherwise sets *matched to 0. The probe is
 * issued with 0 as its fourth argument, unlike the improbe/mprobe variants
 * which pass 1. Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
                       int *matched, ompi_status_public_t* mpi_status)
{
    ucp_tag_t match_tag, match_mask;
    ucp_tag_recv_info_t msg_info;
    ucp_tag_message_h found;

    PML_UCX_TRACE_PROBE("iprobe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(match_tag, match_mask, tag, src, comm);
    found = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, match_tag, match_mask,
                             0, &msg_info);
    if (NULL == found) {
        *matched = 0;
        return OMPI_SUCCESS;
    }

    *matched = 1;
    mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &msg_info);
    return OMPI_SUCCESS;
}
/*
 * Blocking probe: keep probing, calling opal_progress() between attempts,
 * until a matching message shows up; then fill the status.
 * Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm,
                      ompi_status_public_t* mpi_status)
{
    ucp_tag_t match_tag, match_mask;
    ucp_tag_recv_info_t msg_info;
    ucp_tag_message_h found;

    PML_UCX_TRACE_PROBE("probe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(match_tag, match_mask, tag, src, comm);

    while (NULL == (found = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, match_tag,
                                             match_mask, 0, &msg_info))) {
        opal_progress();
    }

    mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &msg_info);
    return OMPI_SUCCESS;
}
/*
 * Non-blocking matched probe. When a matching message is found, a message
 * object is created for it, *matched is set to 1 and the status is filled.
 * Otherwise *matched is set to 0 and *message is left untouched.
 * Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
                        int *matched, struct ompi_message_t **message,
                        ompi_status_public_t* mpi_status)
{
    ucp_tag_t ucp_tag, ucp_tag_mask;
    ucp_tag_recv_info_t info;
    ucp_tag_message_h ucp_msg;

    PML_UCX_TRACE_PROBE("improbe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
    ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
                               1, &info);
    if (ucp_msg != NULL) {
        PML_UCX_MESSAGE_NEW(comm, ucp_msg, &info, message);
        PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, ucp_msg);
        *matched = 1;
        mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
    } else {
        /* A NULL handle means "no message". The former test compared
         * UCS_PTR_STATUS(NULL), which is UCS_OK, against
         * UCS_ERR_NO_MESSAGE; it could never be true, so *matched was left
         * unset on this path. Handle it the way mca_pml_ucx_iprobe() does. */
        *matched = 0;
    }
    return OMPI_SUCCESS;
}
/*
 * Blocking matched probe: keep probing, calling opal_progress() between
 * attempts, until a matching message is found; then create the message
 * object and fill the status. Always returns OMPI_SUCCESS.
 */
int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
                       struct ompi_message_t **message,
                       ompi_status_public_t* mpi_status)
{
    ucp_tag_t match_tag, match_mask;
    ucp_tag_recv_info_t msg_info;
    ucp_tag_message_h found;

    PML_UCX_TRACE_PROBE("mprobe", src, tag, comm);

    PML_UCX_MAKE_RECV_TAG(match_tag, match_mask, tag, src, comm);

    while (NULL == (found = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, match_tag,
                                             match_mask, 1, &msg_info))) {
        opal_progress();
    }

    PML_UCX_MESSAGE_NEW(comm, found, &msg_info, message);
    PML_UCX_VERBOSE(8, "got message %p (%p)", (void*)*message, found);
    mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &msg_info);
    return OMPI_SUCCESS;
}
/*
 * Non-blocking receive of a previously matched message. On success the
 * message object is released and the UCX request is returned to the caller.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if UCX reports an error pointer (the
 * message object is not released in that case).
 */
int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                       struct ompi_message_t **message,
                       struct ompi_request_t **request)
{
    ompi_request_t *recv_req;

    PML_UCX_TRACE_MRECV("imrecv", buf, count, datatype, message);

    recv_req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                                    mca_pml_ucx_get_datatype(datatype),
                                                    (*message)->req_ptr,
                                                    mca_pml_ucx_recv_completion);
    if (!UCS_PTR_IS_ERR(recv_req)) {
        PML_UCX_VERBOSE(8, "got request %p", (void*)recv_req);
        PML_UCX_MESSAGE_RELEASE(message);
        *request = recv_req;
        return OMPI_SUCCESS;
    }

    PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(recv_req)));
    return OMPI_ERROR;
}
/*
 * Blocking receive of a previously matched message: post the receive,
 * release the message object and wait for the request.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if UCX reports an error pointer.
 */
int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
                      struct ompi_message_t **message,
                      ompi_status_public_t* status)
{
    ompi_request_t *recv_req;

    PML_UCX_TRACE_MRECV("mrecv", buf, count, datatype, message);

    recv_req = (ompi_request_t*)ucp_tag_msg_recv_nb(ompi_pml_ucx.ucp_worker, buf, count,
                                                    mca_pml_ucx_get_datatype(datatype),
                                                    (*message)->req_ptr,
                                                    mca_pml_ucx_recv_completion);
    if (!UCS_PTR_IS_ERR(recv_req)) {
        PML_UCX_MESSAGE_RELEASE(message);
        ompi_request_wait(&recv_req, status);
        return OMPI_SUCCESS;
    }

    PML_UCX_ERROR("ucx msg recv failed: %s", ucs_status_string(UCS_PTR_STATUS(recv_req)));
    return OMPI_ERROR;
}
/*
 * Start the given persistent requests. For every entry that is a PML
 * request, the request is marked active and reset, and the recorded send or
 * receive is posted to UCX as a temporary request. The persistent request is
 * then completed either right away or later from the completion callback of
 * the temporary request.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR as soon as UCX reports an error
 * pointer; requests later in the array are not started in that case.
 */
int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
{
mca_pml_ucx_persistent_request_t *preq;
ompi_request_t *tmp_req;
size_t i;
for (i = 0; i < count; ++i) {
preq = (mca_pml_ucx_persistent_request_t *)requests[i];
if ((preq == NULL) || (OMPI_REQUEST_PML != preq->ompi.req_type)) {
/* Skip irrelevant requests */
continue;
}
PML_UCX_ASSERT(preq->ompi.req_state != OMPI_REQUEST_INVALID);
preq->ompi.req_state = OMPI_REQUEST_ACTIVE;
mca_pml_ucx_request_reset(&preq->ompi);
/* The flags recorded by isend_init/irecv_init select the direction */
if (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) {
/* TODO special care to sync/buffered send */
PML_UCX_VERBOSE(8, "start send request %p", (void*)preq);
tmp_req = (ompi_request_t*)ucp_tag_send_nb(preq->send.ep, preq->buffer,
preq->count, preq->datatype,
preq->tag,
mca_pml_ucx_psend_completion);
} else {
PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq);
tmp_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker,
preq->buffer, preq->count,
preq->datatype, preq->tag,
preq->recv.tag_mask,
mca_pml_ucx_precv_completion);
}
/* tmp_req is NULL, a valid request, or an error pointer */
if (tmp_req == NULL) {
/* Only send can complete immediately */
PML_UCX_ASSERT(preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND);
PML_UCX_VERBOSE(8, "send completed immediately, completing persistent request %p",
(void*)preq);
OPAL_THREAD_LOCK(&ompi_request_lock);
mca_pml_ucx_set_send_status(&preq->ompi.req_status, UCS_OK);
ompi_request_complete(&preq->ompi, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else if (!UCS_PTR_IS_ERR(tmp_req)) {
/* The completion flag is examined and the back-pointer is stored
* while holding the request lock */
OPAL_THREAD_LOCK(&ompi_request_lock);
if (tmp_req->req_complete) {
/* tmp_req is already completed */
PML_UCX_VERBOSE(8, "completing persistent request %p", (void*)preq);
mca_pml_ucx_persistent_requset_complete(preq, tmp_req);
} else {
/* tmp_req would be completed by callback and trigger completion
* of preq */
PML_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p",
(void*)tmp_req, (void*)preq);
tmp_req->req_complete_cb_data = preq;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else {
PML_UCX_ERROR("ucx %s failed: %s",
(preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) ? "send" : "recv",
ucs_status_string(UCS_PTR_STATUS(tmp_req)));
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
/* Nothing is dumped by this implementation; always returns OMPI_SUCCESS. */
int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose)
{
    (void)comm;
    (void)verbose;
    return OMPI_SUCCESS;
}

147
ompi/mca/pml/ucx/pml_ucx.h Обычный файл
Просмотреть файл

@ -0,0 +1,147 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_H_
#define PML_UCX_H_
#include "ompi_config.h"
#include "ompi/request/request.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include <ucp/api/ucp.h>
#include "pml_ucx_freelist.h"
/* Short names for the structures used by this component */
typedef struct mca_pml_ucx_module mca_pml_ucx_module_t;
typedef struct pml_ucx_persistent_request mca_pml_ucx_persistent_request_t;
typedef struct pml_ucx_convertor mca_pml_ucx_convertor_t;
/*
 * TODO version check
 */
/*
 * State of the PML UCX module. The base module comes first, so a pointer to
 * "super" can be handed out as the module pointer (see the component init
 * function).
 */
struct mca_pml_ucx_module {
mca_pml_base_module_t super;
/* UCX global objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
/* Requests */
mca_pml_ucx_freelist_t persistent_reqs;
/* Shared request returned by isend when the send completed immediately */
ompi_request_t completed_send_req;
/* Convertors pool */
mca_pml_ucx_freelist_t convs;
/* MCA parameters (priority, verbose) and the opal_output stream id */
int priority;
int verbose;
int output;
};
/* Component descriptor and the single module instance */
extern mca_pml_base_component_2_0_0_t mca_pml_ucx_component;
extern mca_pml_ucx_module_t ompi_pml_ucx;
/* Debugging */
/*
 * Debug builds allow verbosity up to level 9 and enable assertions; other
 * builds cap the verbosity at level 2 and compile PML_UCX_ASSERT away, so
 * its argument is not evaluated there.
 */
#define PML_UCX_ENABLE_DEBUG OPAL_ENABLE_DEBUG
#if PML_UCX_ENABLE_DEBUG
# define PML_UCX_MAX_VERBOSE 9
# define PML_UCX_ASSERT(_x) assert(_x)
#else
# define PML_UCX_MAX_VERBOSE 2
# define PML_UCX_ASSERT(_x)
#endif
/*
 * Stringification helpers. Two levels are needed so that an argument such
 * as __LINE__ is macro-expanded before the '#' operator is applied.
 */
#define _PML_UCX_QUOTE(_x) \
    # _x
#define PML_UCX_QUOTE(_x) \
    _PML_UCX_QUOTE(_x)

/*
 * Log an error on the component output stream, prefixed with
 * "Error: <file>:<line> ". The first variadic argument must be a string
 * literal because it is concatenated with the prefix. The " " separator
 * keeps the line number from running into the message text.
 */
#define PML_UCX_ERROR(...) \
    opal_output_verbose(0, ompi_pml_ucx.output, "Error: " __FILE__ ":" \
                        PML_UCX_QUOTE(__LINE__) " " __VA_ARGS__)

/*
 * Log a message prefixed with "<file>:<line> " when _level does not exceed
 * both the compile-time cap and the run-time verbosity. As above, the first
 * variadic argument must be a string literal.
 *
 * The expansion is deliberately kept as a bare if-block: several call sites
 * invoke this macro without a trailing semicolon and would stop compiling
 * with a do { } while (0) wrapper. Do not use it as the unbraced body of an
 * if that has an else branch.
 */
#define PML_UCX_VERBOSE(_level, ... ) \
    if (((_level) <= PML_UCX_MAX_VERBOSE) && ((_level) <= ompi_pml_ucx.verbose)) { \
        opal_output_verbose(_level, ompi_pml_ucx.output, __FILE__ ":" \
                            PML_UCX_QUOTE(__LINE__) " " __VA_ARGS__); \
    }
/* Component life cycle: open/close manage the UCP context, init/cleanup
 * manage the UCP worker and the request infrastructure */
int mca_pml_ucx_open(void);
int mca_pml_ucx_close(void);
int mca_pml_ucx_init(void);
int mca_pml_ucx_cleanup(void);
/* Process, module and communicator management */
int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs);
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs);
int mca_pml_ucx_enable(bool enable);
int mca_pml_ucx_progress(void);
int mca_pml_ucx_add_comm(struct ompi_communicator_t* comm);
int mca_pml_ucx_del_comm(struct ompi_communicator_t* comm);
/* Receive: persistent (irecv_init), non-blocking (irecv) and blocking (recv) */
int mca_pml_ucx_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype,
int src, int tag, struct ompi_communicator_t* comm,
struct ompi_request_t **request);
int mca_pml_ucx_irecv(void *buf, size_t count, ompi_datatype_t *datatype,
int src, int tag, struct ompi_communicator_t* comm,
struct ompi_request_t **request);
int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src,
int tag, struct ompi_communicator_t* comm,
ompi_status_public_t* status);
/* Send: persistent (isend_init), non-blocking (isend) and blocking (send) */
int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datatype,
int dst, int tag, mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
int dst, int tag, mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm,
struct ompi_request_t **request);
int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, int dst,
int tag, mca_pml_base_send_mode_t mode,
struct ompi_communicator_t* comm);
/* Probe: non-blocking (iprobe, improbe) and blocking (probe, mprobe); the
 * m-variants also return a message handle */
int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
int *matched, ompi_status_public_t* status);
int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm,
ompi_status_public_t* status);
int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
int *matched, struct ompi_message_t **message,
ompi_status_public_t* status);
int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
struct ompi_message_t **message,
ompi_status_public_t* status);
/* Receive of a message obtained from improbe/mprobe */
int mca_pml_ucx_imrecv(void *buf, size_t count, ompi_datatype_t *datatype,
struct ompi_message_t **message,
struct ompi_request_t **request);
int mca_pml_ucx_mrecv(void *buf, size_t count, ompi_datatype_t *datatype,
struct ompi_message_t **message,
ompi_status_public_t* status);
/* Start persistent requests; diagnostics */
int mca_pml_ucx_start(size_t count, ompi_request_t** requests);
int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose);
#endif /* PML_UCX_H_ */

107
ompi/mca/pml/ucx/pml_ucx_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,107 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx.h"
/* Forward declarations of the component callbacks referenced by the
 * component descriptor below */
static int mca_pml_ucx_component_register(void);
static int mca_pml_ucx_component_open(void);
static int mca_pml_ucx_component_close(void);
static mca_pml_base_module_t*
mca_pml_ucx_component_init(int* priority, bool enable_progress_threads,
bool enable_mpi_threads);
static int mca_pml_ucx_component_fini(void);
/*
 * Component descriptor of the "ucx" PML: version and name information, the
 * open/close/register callbacks, the metadata block and the init/finalize
 * callbacks, all filled in positionally.
 */
mca_pml_base_component_2_0_0_t mca_pml_ucx_component = {
/* First, the mca_base_component_t struct containing meta
* information about the component itself */
{
MCA_PML_BASE_VERSION_2_0_0,
"ucx", /* MCA component name */
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
mca_pml_ucx_component_open, /* component open */
mca_pml_ucx_component_close, /* component close */
NULL, /* NOTE(review): presumably the query slot, unused here - confirm against mca_base_component_t */
mca_pml_ucx_component_register, /* component register (MCA parameters) */
},
{
/* This component is not checkpoint ready */
MCA_BASE_METADATA_PARAM_NONE
},
mca_pml_ucx_component_init, /* component init */
mca_pml_ucx_component_fini /* component finalize */
};
static int mca_pml_ucx_component_register(void)
{
ompi_pml_ucx.verbose = 0;
(void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "verbose",
"Verbose level of the UCX component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_LOCAL,
&ompi_pml_ucx.verbose);
ompi_pml_ucx.priority = 50;
(void) mca_base_component_var_register(&mca_pml_ucx_component.pmlm_version, "priority",
"Priority of the UCX component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_3,
MCA_BASE_VAR_SCOPE_LOCAL,
&ompi_pml_ucx.priority);
return 0;
}
/*
 * Open an output stream for the component, apply the configured verbosity
 * to it and open the PML itself. Returns the result of mca_pml_ucx_open().
 */
static int mca_pml_ucx_component_open(void)
{
    int stream = opal_output_open(NULL);

    ompi_pml_ucx.output = stream;
    opal_output_set_verbosity(ompi_pml_ucx.output, ompi_pml_ucx.verbose);

    return mca_pml_ucx_open();
}
/*
 * Close the PML and, only if that succeeded, close the output stream.
 * Returns the result of mca_pml_ucx_close().
 */
static int mca_pml_ucx_component_close(void)
{
    int rc = mca_pml_ucx_close();

    if (0 == rc) {
        opal_output_close(ompi_pml_ucx.output);
    }
    return rc;
}
/*
 * Initialize the PML and report the configured priority. The two threading
 * arguments are not used.
 *
 * Returns the base module on success, or NULL if mca_pml_ucx_init() fails
 * (*priority is left untouched in that case).
 */
static mca_pml_base_module_t*
mca_pml_ucx_component_init(int* priority, bool enable_progress_threads,
                           bool enable_mpi_threads)
{
    if (0 != mca_pml_ucx_init()) {
        return NULL;
    }

    *priority = ompi_pml_ucx.priority;
    return &ompi_pml_ucx.super;
}
/* Component finalize: delegates to mca_pml_ucx_cleanup(). */
static int mca_pml_ucx_component_fini(void)
{
    int rc = mca_pml_ucx_cleanup();

    return rc;
}

157
ompi/mca/pml/ucx/pml_ucx_datatype.c Обычный файл
Просмотреть файл

@ -0,0 +1,157 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx_datatype.h"
#include "ompi/runtime/mpiruntime.h"
#include <inttypes.h>
/*
 * Generic datatype callback: begin packing. Takes a convertor from the
 * pool, retains the datatype passed as context and prepares the convertor
 * for sending count elements from buffer. The convertor is returned as the
 * pack state.
 */
static void* pml_ucx_generic_datatype_start_pack(void *context, const void *buffer,
                                                 size_t count)
{
    ompi_datatype_t *dtype = context;
    mca_pml_ucx_convertor_t *conv;

    conv = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs);

    /* Keep the datatype alive for as long as the convertor refers to it */
    OBJ_RETAIN(dtype);
    conv->datatype = dtype;

    opal_convertor_copy_and_prepare_for_send(ompi_proc_local_proc->super.proc_convertor,
                                             &dtype->super, count, buffer, 0,
                                             &conv->opal_conv);
    return conv;
}
/*
 * Generic datatype callback: begin unpacking. Mirrors the pack variant but
 * prepares the convertor for receiving count elements into buffer.
 */
static void* pml_ucx_generic_datatype_start_unpack(void *context, void *buffer,
                                                   size_t count)
{
    ompi_datatype_t *dtype = context;
    mca_pml_ucx_convertor_t *conv;

    conv = (mca_pml_ucx_convertor_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.convs);

    /* Keep the datatype alive for as long as the convertor refers to it */
    OBJ_RETAIN(dtype);
    conv->datatype = dtype;

    opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor,
                                             &dtype->super, count, buffer, 0,
                                             &conv->opal_conv);
    return conv;
}
/* Generic datatype callback: packed size reported by the state's convertor. */
static size_t pml_ucx_generic_datatype_packed_size(void *state)
{
    mca_pml_ucx_convertor_t *conv = state;
    size_t packed_bytes;

    opal_convertor_get_packed_size(&conv->opal_conv, &packed_bytes);
    return packed_bytes;
}
static size_t pml_ucx_generic_datatype_pack(void *state, size_t offset,
void *dest, size_t max_length)
{
mca_pml_ucx_convertor_t *convertor = state;
uint32_t iov_count;
struct iovec iov;
size_t length;
iov_count = 1;
iov.iov_base = dest;
iov.iov_len = max_length;
opal_convertor_set_position(&convertor->opal_conv, &offset);
length = max_length;
opal_convertor_pack(&convertor->opal_conv, &iov, &iov_count, &length);
return length;
}
static ucs_status_t pml_ucx_generic_datatype_unpack(void *state, size_t offset,
const void *src, size_t length)
{
mca_pml_ucx_convertor_t *convertor = state;
uint32_t iov_count;
struct iovec iov;
iov_count = 1;
iov.iov_base = (void*)src;
iov.iov_len = length;
opal_convertor_set_position(&convertor->opal_conv, &offset);
opal_convertor_unpack(&convertor->opal_conv, &iov, &iov_count, &length);
return UCS_OK;
}
/*
 * Generic datatype callback: end of a pack/unpack operation. Cleans up the
 * convertor, drops the datatype reference taken at start and returns the
 * convertor to the pool.
 */
static void pml_ucx_generic_datatype_finish(void *state)
{
    mca_pml_ucx_convertor_t *conv = state;

    opal_convertor_cleanup(&conv->opal_conv);
    OBJ_RELEASE(conv->datatype);
    PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.convs, &conv->super);
}
/*
 * Callback table passed to ucp_dt_create_generic() for datatypes that are
 * not handled as contiguous; every entry points at one of the convertor
 * based callbacks defined above.
 */
static ucp_generic_dt_ops_t pml_ucx_generic_datatype_ops = {
.start_pack = pml_ucx_generic_datatype_start_pack,
.start_unpack = pml_ucx_generic_datatype_start_unpack,
.packed_size = pml_ucx_generic_datatype_packed_size,
.pack = pml_ucx_generic_datatype_pack,
.unpack = pml_ucx_generic_datatype_unpack,
.finish = pml_ucx_generic_datatype_finish
};
/*
 * Create the UCX datatype for an MPI datatype and cache it in
 * datatype->pml_data. A datatype flagged contiguous and gap-free with a
 * zero lower bound becomes a UCX contiguous datatype of its size; anything
 * else becomes a generic datatype driven by the convertor callbacks.
 * A failure to create the generic datatype is logged and followed by
 * ompi_mpi_abort().
 */
ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype)
{
    ucp_datatype_t generic_dt;
    ucs_status_t rc;
    ptrdiff_t lower_bound;
    size_t elem_size;
    int is_plain;

    ompi_datatype_type_lb(datatype, &lower_bound);

    is_plain = (datatype->super.flags & OPAL_DATATYPE_FLAG_CONTIGUOUS) &&
               (datatype->super.flags & OPAL_DATATYPE_FLAG_NO_GAPS) &&
               (lower_bound == 0);
    if (is_plain) {
        ompi_datatype_type_size(datatype, &elem_size);
        PML_UCX_ASSERT(elem_size > 0);
        datatype->pml_data = ucp_dt_make_contig(elem_size);
        return datatype->pml_data;
    }

    rc = ucp_dt_create_generic(&pml_ucx_generic_datatype_ops,
                               datatype, &generic_dt);
    if (UCS_OK != rc) {
        PML_UCX_ERROR("Failed to create UCX datatype for %s", datatype->name);
        ompi_mpi_abort(&ompi_mpi_comm_world.comm, 1);
    }

    PML_UCX_VERBOSE(7, "created generic UCX datatype 0x%"PRIx64, generic_dt);

    /* TODO put this on a list to be destroyed later */
    datatype->pml_data = generic_dt;
    return generic_dt;
}
/* Class constructor: construct the embedded OPAL convertor. */
static void mca_pml_ucx_convertor_construct(mca_pml_ucx_convertor_t *convertor)
{
    OBJ_CONSTRUCT(&convertor->opal_conv, opal_convertor_t);
}

/* Class destructor: destruct the embedded OPAL convertor. */
static void mca_pml_ucx_convertor_destruct(mca_pml_ucx_convertor_t *convertor)
{
    OBJ_DESTRUCT(&convertor->opal_conv);
}

/* The convertor wrapper is a free-list item with the hooks above */
OBJ_CLASS_INSTANCE(mca_pml_ucx_convertor_t,
                   opal_free_list_item_t,
                   mca_pml_ucx_convertor_construct,
                   mca_pml_ucx_convertor_destruct);

39
ompi/mca/pml/ucx/pml_ucx_datatype.h Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_DATATYPE_H_
#define PML_UCX_DATATYPE_H_
#include "pml_ucx.h"
/*
 * Per-operation pack/unpack state for a generic (non-contiguous) datatype.
 * NOTE(review): the mca_pml_ucx_convertor_t typedef for this struct is
 * presumably provided by pml_ucx.h -- confirm.
 */
struct pml_ucx_convertor {
/* Free-list linkage; must stay first, the class derives from opal_free_list_item_t */
opal_free_list_item_t super;
/* MPI datatype this convertor operates on */
ompi_datatype_t *datatype;
/* Embedded OPAL convertor, constructed/destructed together with this object */
opal_convertor_t opal_conv;
};
/*
 * Slow path of mca_pml_ucx_get_datatype(): creates the UCX datatype for
 * 'datatype' and caches it in datatype->pml_data.
 */
ucp_datatype_t mca_pml_ucx_init_datatype(ompi_datatype_t *datatype);
/* Class descriptor for convertor objects (instantiated in pml_ucx_datatype.c) */
OBJ_CLASS_DECLARATION(mca_pml_ucx_convertor_t);
/**
 * Return the UCX datatype associated with an MPI datatype.
 *
 * datatype->pml_data caches the handle. A value of zero means no handle has
 * been created yet, in which case mca_pml_ucx_init_datatype() is called to
 * create one.
 *
 * @param datatype  MPI datatype to look up.
 * @return The cached or newly created UCX datatype handle.
 */
static inline ucp_datatype_t mca_pml_ucx_get_datatype(ompi_datatype_t *datatype)
{
    ucp_datatype_t cached = datatype->pml_data;

    return OPAL_LIKELY(cached != 0) ? cached
                                    : mca_pml_ucx_init_datatype(datatype);
}
#endif /* PML_UCX_DATATYPE_H_ */

30
ompi/mca/pml/ucx/pml_ucx_freelist.h Обычный файл
Просмотреть файл

@ -0,0 +1,30 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_FREELIST_H_
#define PML_UCX_FREELIST_H_
#include "ompi_config.h"
#include "opal/class/opal_free_list.h"
/*
 * Thin aliases over the OPAL free list, so the component refers to its own
 * names for the list type and its get/return/init operations.
 */
/* Free-list type used by this component. */
#define mca_pml_ucx_freelist_t opal_free_list_t
/* Take one item from _freelist; expands to the value of opal_free_list_get(). */
#define PML_UCX_FREELIST_GET(_freelist) \
opal_free_list_get (_freelist)
/* Give _item back to _freelist. */
#define PML_UCX_FREELIST_RETURN(_freelist, _item) \
opal_free_list_return(_freelist, _item)
/*
 * Initialize free list _fl to hold objects of class _type. The item size is
 * sizeof(_type) and the literal 8 is passed as the third argument.
 * NOTE(review): 8 is presumably the item alignment and the zero/NULL
 * arguments presumably disable payload buffers, memory pool, registration
 * cache and per-item init -- confirm against opal_free_list_init().
 */
#define PML_UCX_FREELIST_INIT(_fl, _type, _initial, _max, _batch) \
opal_free_list_init(_fl, sizeof(_type), 8, OBJ_CLASS(_type), \
0, 0, _initial, _max, _batch, NULL, 0, NULL, NULL, NULL)
#endif /* PML_UCX_FREELIST_H_ */

178
ompi/mca/pml/ucx/pml_ucx_request.c Обычный файл
Просмотреть файл

@ -0,0 +1,178 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "pml_ucx_request.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/message/message.h"
#include <inttypes.h>
/**
 * req_free hook installed by mca_pml_ucx_request_init() for non-persistent
 * requests.
 *
 * Replaces the caller's handle with MPI_REQUEST_NULL, clears the request's
 * completion/cancel state and hands the request back to UCX.
 *
 * @param rptr  In: handle of the request to free. Out: MPI_REQUEST_NULL.
 * @return OMPI_SUCCESS always.
 */
static int mca_pml_ucx_request_free(ompi_request_t **rptr)
{
    ompi_request_t *ucx_req = *rptr;

    PML_UCX_VERBOSE(9, "free request *%p=%p", (void*)rptr, (void*)ucx_req);

    *rptr = MPI_REQUEST_NULL;
    mca_pml_ucx_request_reset(ucx_req);
    ucp_request_release(ucx_req);

    return OMPI_SUCCESS;
}
/**
 * Completion callback for a non-persistent send.
 *
 * Under ompi_request_lock, translates the UCX status into the request's MPI
 * status and marks the request complete. The request must not already be
 * complete.
 *
 * @param request  Request memory, interpreted as an ompi_request_t.
 * @param status   UCX completion status of the send.
 */
void mca_pml_ucx_send_completion(void *request, ucs_status_t status)
{
    ompi_request_t *ompi_req = (ompi_request_t*)request;

    PML_UCX_VERBOSE(8, "send request %p completed with status %s", (void*)ompi_req,
                    ucs_status_string(status));

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_send_status(&ompi_req->req_status, status);
    PML_UCX_ASSERT(!ompi_req->req_complete);
    ompi_request_complete(ompi_req, true);
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/**
 * Completion callback for a non-persistent receive.
 *
 * Under ompi_request_lock, fills the request's MPI status from the UCX
 * status and receive info, then marks the request complete. The request must
 * not already be complete.
 *
 * @param request  Request memory, interpreted as an ompi_request_t.
 * @param status   UCX completion status of the receive.
 * @param info     Sender tag and length of the received message; it is
 *                 dereferenced unconditionally for the log line.
 */
void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
                                 ucp_tag_recv_info_t *info)
{
    ompi_request_t *ompi_req = (ompi_request_t*)request;

    PML_UCX_VERBOSE(8, "receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)ompi_req, ucs_status_string(status), info->sender_tag,
                    info->length);

    OPAL_THREAD_LOCK(&ompi_request_lock);
    mca_pml_ucx_set_recv_status(&ompi_req->req_status, status, info);
    PML_UCX_ASSERT(!ompi_req->req_complete);
    ompi_request_complete(ompi_req, true);
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/**
 * Propagate completion of the temporary UCX request to its persistent
 * request, then recycle the temporary request.
 *
 * The only caller in this file, mca_pml_ucx_preq_completion(), invokes this
 * while holding ompi_request_lock.
 *
 * @param preq     Persistent request that receives the status and is
 *                 marked complete.
 * @param tmp_req  Temporary request that carried the operation. It is
 *                 detached from preq, reset and released back to UCX.
 */
void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq,
                                             ompi_request_t *tmp_req)
{
    /* Publish the result on the user-visible persistent request. */
    preq->ompi.req_status = tmp_req->req_status;
    ompi_request_complete(&preq->ompi, true);

    /* Detach and recycle the temporary request. */
    tmp_req->req_complete_cb_data = NULL;
    mca_pml_ucx_request_reset(tmp_req);
    ucp_request_release(tmp_req);
}
/**
 * Common completion path for the temporary request behind a persistent
 * send or receive.
 *
 * Under ompi_request_lock, marks tmp_req complete. If a persistent request
 * is attached through req_complete_cb_data, the completion is forwarded to
 * it. Otherwise tmp_req is left complete and untouched.
 *
 * @param tmp_req  Temporary request whose status has already been set.
 */
static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req)
{
    mca_pml_ucx_persistent_request_t *owner;

    OPAL_THREAD_LOCK(&ompi_request_lock);

    ompi_request_complete(tmp_req, false);

    owner = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data;
    if (NULL != owner) {
        mca_pml_ucx_persistent_requset_complete(owner, tmp_req);
    }

    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/**
 * Completion callback for the temporary request of a persistent send.
 *
 * Records the send status on the temporary request, then runs the shared
 * persistent-completion path.
 *
 * @param request  Request memory, interpreted as an ompi_request_t.
 * @param status   UCX completion status of the send.
 */
void mca_pml_ucx_psend_completion(void *request, ucs_status_t status)
{
    ompi_request_t *ucx_req = (ompi_request_t*)request;

    PML_UCX_VERBOSE(8, "persistent send request %p completed with status %s",
                    (void*)ucx_req, ucs_status_string(status));

    mca_pml_ucx_set_send_status(&ucx_req->req_status, status);
    mca_pml_ucx_preq_completion(ucx_req);
}
/**
 * Completion callback for the temporary request of a persistent receive.
 *
 * Records the receive status on the temporary request, then runs the shared
 * persistent-completion path.
 *
 * @param request  Request memory, interpreted as an ompi_request_t.
 * @param status   UCX completion status of the receive.
 * @param info     Sender tag and length of the received message; it is
 *                 dereferenced unconditionally for the log line.
 */
void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
                                  ucp_tag_recv_info_t *info)
{
    ompi_request_t *ucx_req = (ompi_request_t*)request;

    PML_UCX_VERBOSE(8, "persistent receive request %p completed with status %s tag %"PRIx64" len %zu",
                    (void*)ucx_req, ucs_status_string(status), info->sender_tag,
                    info->length);

    mca_pml_ucx_set_recv_status(&ucx_req->req_status, status, info);
    mca_pml_ucx_preq_completion(ucx_req);
}
/**
 * Shared initialization for every request flavour created by this component.
 *
 * Runs OMPI_REQUEST_INIT, marks the request as a PML request in the given
 * state, installs the supplied free/cancel hooks and clears the completion
 * callback and its data.
 *
 * @param ompi_req        Request to initialize.
 * @param req_persistent  Passed through to OMPI_REQUEST_INIT.
 * @param state           Initial req_state.
 * @param req_free        Hook stored in req_free.
 * @param req_cancel      Hook stored in req_cancel (may be NULL).
 */
static void mca_pml_ucx_request_init_common(ompi_request_t* ompi_req,
                                            bool req_persistent,
                                            ompi_request_state_t state,
                                            ompi_request_free_fn_t req_free,
                                            ompi_request_cancel_fn_t req_cancel)
{
    OMPI_REQUEST_INIT(ompi_req, req_persistent);

    /* Identity and lifecycle state. */
    ompi_req->req_type  = OMPI_REQUEST_PML;
    ompi_req->req_state = state;

    /* Caller-supplied hooks. */
    ompi_req->req_free   = req_free;
    ompi_req->req_cancel = req_cancel;

    /* No completion callback is attached at init time. */
    ompi_req->req_complete_cb      = NULL;
    ompi_req->req_complete_cb_data = NULL;
}
void mca_pml_ucx_request_init(void *request)
{
ompi_request_t* ompi_req = request;
mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
mca_pml_ucx_request_free, NULL);
}
void mca_pml_ucx_request_cleanup(void *request)
{
ompi_request_t* ompi_req = request;
OMPI_REQUEST_FINI(ompi_req);
}
/**
 * req_free hook for persistent requests.
 *
 * Replaces the caller's handle with MPI_REQUEST_NULL, marks the request
 * invalid and returns it to the ompi_pml_ucx.persistent_reqs free list.
 *
 * @param rptr  In: handle of the persistent request. Out: MPI_REQUEST_NULL.
 * @return OMPI_SUCCESS always.
 */
static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr)
{
    mca_pml_ucx_persistent_request_t *preq =
        (mca_pml_ucx_persistent_request_t*)*rptr;

    *rptr = MPI_REQUEST_NULL;
    preq->ompi.req_state = OMPI_REQUEST_INVALID;
    PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &preq->ompi.super);

    return OMPI_SUCCESS;
}
/*
 * Class constructor for persistent requests: initializes the embedded
 * ompi_request_t as a persistent, INACTIVE PML request that is freed through
 * mca_pml_ucx_persistent_request_free and has no cancel hook.
 */
static void mca_pml_ucx_persistent_request_construct(mca_pml_ucx_persistent_request_t* req)
{
    mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE,
                                    mca_pml_ucx_persistent_request_free, NULL);
}

/*
 * Class destructor for persistent requests: finalizes the embedded
 * ompi_request_t.
 */
static void mca_pml_ucx_persistent_request_destruct(mca_pml_ucx_persistent_request_t* req)
{
    OMPI_REQUEST_FINI(&req->ompi);
}

/*
 * Class descriptor: mca_pml_ucx_persistent_request_t derives from
 * ompi_request_t and uses the constructor/destructor pair above.
 */
OBJ_CLASS_INSTANCE(mca_pml_ucx_persistent_request_t,
                   ompi_request_t,
                   mca_pml_ucx_persistent_request_construct,
                   mca_pml_ucx_persistent_request_destruct);
/**
 * req_free hook for the pre-completed request set up by
 * mca_pml_ucx_completed_request_init(). It only resets the caller's handle
 * and releases nothing.
 *
 * @param rptr  Out: set to MPI_REQUEST_NULL.
 * @return OMPI_SUCCESS always.
 */
static int mca_pml_completed_request_free(struct ompi_request_t** rptr)
{
    *rptr = MPI_REQUEST_NULL;

    return OMPI_SUCCESS;
}
/**
 * req_cancel hook for the pre-completed request. It does nothing and
 * reports success.
 *
 * @param ompi_req  Unused.
 * @param flag      Unused.
 * @return OMPI_SUCCESS always.
 */
static int mca_pml_completed_request_cancel(struct ompi_request_t* ompi_req, int flag)
{
    /* Both arguments are intentionally ignored. */
    (void)ompi_req;
    (void)flag;

    return OMPI_SUCCESS;
}
/**
 * Set up a request that is already complete.
 *
 * The request is initialized as a non-persistent, ACTIVE PML request with
 * no-op free and cancel hooks, and is then immediately marked complete.
 *
 * @param ompi_req  Request object to initialize.
 */
void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req)
{
    mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
                                    mca_pml_completed_request_free,
                                    mca_pml_completed_request_cancel);

    ompi_request_complete(ompi_req, false);
}

183
ompi/mca/pml/ucx/pml_ucx_request.h Обычный файл
Просмотреть файл

@ -0,0 +1,183 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef PML_UCX_REQUEST_H_
#define PML_UCX_REQUEST_H_
#include "pml_ucx.h"
#include "pml_ucx_datatype.h"
/*
 * Request flag bits, one bit per flag.
 * NOTE(review): presumably combined in pml_ucx_persistent_request.flags; the
 * code that sets and tests them is not in this header -- confirm.
 */
enum {
MCA_PML_UCX_REQUEST_FLAG_SEND = (1 << 0), /* Persistent send */
MCA_PML_UCX_REQUEST_FLAG_FREE_CALLED = (1 << 1), /* presumably: free was requested -- confirm */
MCA_PML_UCX_REQUEST_FLAG_COMPLETED = (1 << 2) /* presumably: operation completed -- confirm */
};
/*
 * UCX tag structure:
 *
 * 01234567 01234567 01234567 01234567 01234567 01234567 01234567 01234567
 *                           |                          |
 *      message tag (24)     |     source rank (24)     | context id (16)
 *                           |                          |
 *
 * The message tag occupies the most significant 24 bits of the 64-bit UCX
 * tag, the source rank the next 24 bits and the communicator context id the
 * least significant 16 bits.
 */
#define PML_UCX_TAG_BITS 24
#define PML_UCX_RANK_BITS 24
#define PML_UCX_CONTEXT_BITS 16
/*
 * Build the 64-bit UCX tag for a send: the MPI tag is shifted above the rank
 * and context fields, the sender's own rank in _comm above the context
 * field, and the context id of _comm fills the low bits.
 * _comm is evaluated twice. A negative _tag, once converted to uint64_t and
 * shifted, also sets bit 63 of the result.
 */
#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \
((((uint64_t) (_tag) ) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) | \
(((uint64_t)(_comm)->c_my_rank ) << PML_UCX_CONTEXT_BITS) | \
((uint64_t)(_comm)->c_contextid))
/*
 * Compute the UCX tag and tag mask for a receive and store them in the
 * lvalues _ucp_tag and _ucp_tag_mask.
 *
 * Mask:
 *   - bit 63 and the 16 context-id bits are always compared;
 *   - the 24 source-rank bits are compared unless _src is MPI_ANY_SOURCE;
 *   - the remaining 23 message-tag bits (bits 40..62) are compared unless
 *     _tag is MPI_ANY_TAG.
 * Tag:
 *   - always carries the context id of _comm and the low 24 bits of _src.
 *     For MPI_ANY_SOURCE those rank bits are masked out, so their value does
 *     not matter;
 *   - carries _tag in the top field unless _tag is MPI_ANY_TAG.
 *
 * Because bit 63 is always in the mask and is left clear for MPI_ANY_TAG, a
 * wildcard-tag receive only matches sender tags whose bit 63 is clear.
 * NOTE(review): presumably this keeps negative (internal) tags from matching
 * MPI_ANY_TAG -- confirm.
 *
 * The expansion is a bare { } block, not an expression, and evaluates its
 * arguments more than once.
 */
#define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \
{ \
if ((_src) == MPI_ANY_SOURCE) { \
_ucp_tag_mask = 0x800000000000fffful; \
} else { \
_ucp_tag_mask = 0x800000fffffffffful; \
} \
\
_ucp_tag = (((uint64_t)(_src) & UCS_MASK(PML_UCX_RANK_BITS)) << PML_UCX_CONTEXT_BITS) | \
(_comm)->c_contextid; \
\
if ((_tag) != MPI_ANY_TAG) { \
_ucp_tag_mask |= 0x7fffff0000000000ul; \
_ucp_tag |= ((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS); \
} \
}
/* Extract the 24-bit source rank field from a UCX tag. */
#define PML_UCX_TAG_GET_SOURCE(_tag) \
(((_tag) >> PML_UCX_CONTEXT_BITS) & UCS_MASK(PML_UCX_RANK_BITS))
/*
 * Extract the message tag field (the bits above rank and context) from a UCX
 * tag. No mask is applied, so the result depends on the type of _tag: with a
 * signed 64-bit argument the shift can carry the top bit down into the
 * result.
 */
#define PML_UCX_TAG_GET_MPI_TAG(_tag) \
((_tag) >> (PML_UCX_CONTEXT_BITS + PML_UCX_RANK_BITS))
/*
 * Allocate an ompi_message_t describing a UCX message and store it through
 * _message.
 *
 * The new message records the communicator, keeps _ucp_msg in req_ptr, takes
 * the peer from the source-rank field of (_info)->sender_tag and the count
 * from (_info)->length.
 *
 * WARNING: on allocation failure this macro executes
 * 'return OMPI_ERR_OUT_OF_RESOURCE;' in the calling function, so it can only
 * be used inside functions returning an int-compatible status. It also
 * declares a local named 'msg' inside its own block.
 */
#define PML_UCX_MESSAGE_NEW(_comm, _ucp_msg, _info, _message) \
{ \
struct ompi_message_t *msg = ompi_message_alloc(); \
if (msg == NULL) { \
/* TODO release UCP message */ \
return OMPI_ERR_OUT_OF_RESOURCE; \
} \
\
msg->comm = (_comm); \
msg->req_ptr = (_ucp_msg); \
msg->peer = PML_UCX_TAG_GET_SOURCE((_info)->sender_tag); \
msg->count = (_info)->length; \
*(_message) = msg; \
}
/*
 * Return the message pointed to by _message via ompi_message_return() and
 * set *_message to NULL. _message is evaluated twice.
 */
#define PML_UCX_MESSAGE_RELEASE(_message) \
{ \
ompi_message_return(*(_message)); \
*(_message) = NULL; \
}
/*
 * Persistent request object. The embedded ompi_request_t is the base (the
 * class is instantiated with ompi_request_t as parent), so a pointer to this
 * struct can be used as an ompi_request_t pointer.
 * NOTE(review): the remaining fields presumably store the arguments of the
 * persistent operation so it can be restarted; the code that fills them is
 * not in this header -- confirm.
 */
struct pml_ucx_persistent_request {
/* Base request; must stay first */
ompi_request_t ompi;
/* presumably a mask of MCA_PML_UCX_REQUEST_FLAG_* bits -- confirm */
unsigned flags;
/* user buffer of the operation */
void *buffer;
/* element count of the operation */
size_t count;
/* UCX datatype of the elements */
ucp_datatype_t datatype;
/* UCX tag of the operation */
ucp_tag_t tag;
/* send-only parameters */
struct {
mca_pml_base_send_mode_t mode;
ucp_ep_h ep;
} send;
/* receive-only parameters */
struct {
ucp_tag_t tag_mask;
} recv;
};
/* Completion callback of a non-persistent send; 'request' is an ompi_request_t. */
void mca_pml_ucx_send_completion(void *request, ucs_status_t status);
/* Completion callback of a non-persistent receive; 'request' is an ompi_request_t. */
void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info);
/* Completion callback of the temporary request behind a persistent send. */
void mca_pml_ucx_psend_completion(void *request, ucs_status_t status);
/* Completion callback of the temporary request behind a persistent receive. */
void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
ucp_tag_recv_info_t *info);
/*
 * Copy the status of tmp_req to preq, complete preq, then reset and release
 * tmp_req.
 */
void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq,
ompi_request_t *tmp_req);
/* Initialize ompi_req as an already-completed request with no-op free/cancel hooks. */
void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req);
/* Initialize request memory as an active, non-persistent PML request. */
void mca_pml_ucx_request_init(void *request);
/* Finalize a request set up by mca_pml_ucx_request_init(). */
void mca_pml_ucx_request_cleanup(void *request);
/**
 * Look up the UCX endpoint of a peer.
 *
 * @param comm  Communicator in which dst is a rank.
 * @param dst   Rank of the peer within comm.
 * @return The value stored in the peer proc's PML endpoint slot.
 */
static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
{
    struct ompi_proc_t *peer = ompi_comm_peer_lookup(comm, dst);

    return peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
}
/**
 * Clear the completion and cancellation state of a request. Callers in
 * pml_ucx_request.c do this just before releasing the request to UCX.
 *
 * @param req  Request to reset.
 */
static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
{
    req->req_complete          = false;
    req->req_status._cancelled = false;
}
/**
 * Translate the UCX completion status of a send into an MPI status.
 *
 *  - UCS_OK            -> MPI_ERROR = MPI_SUCCESS
 *  - UCS_ERR_CANCELED  -> _cancelled = true (MPI_ERROR is left untouched)
 *  - anything else     -> MPI_ERROR = MPI_ERR_INTERN
 *
 * Declared 'static inline' like the other helpers in this header, so that
 * translation units which include the header without calling this function
 * do not each carry an unused non-inline copy.
 *
 * @param mpi_status  MPI status to update.
 * @param status      UCX completion status.
 */
static inline void mca_pml_ucx_set_send_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t status)
{
    if (status == UCS_OK) {
        mpi_status->MPI_ERROR = MPI_SUCCESS;
    } else if (status == UCS_ERR_CANCELED) {
        mpi_status->_cancelled = true;
    } else {
        mpi_status->MPI_ERROR = MPI_ERR_INTERN;
    }
}
/**
 * Translate the UCX completion status of a receive into an MPI status.
 *
 *  - UCS_OK: MPI_ERROR = MPI_SUCCESS; MPI_SOURCE and MPI_TAG are decoded
 *    from info->sender_tag and _ucount is set to info->length.
 *  - UCS_ERR_MESSAGE_TRUNCATED: MPI_ERROR = MPI_ERR_TRUNCATE.
 *  - UCS_ERR_CANCELED: _cancelled = true.
 *  - anything else: MPI_ERROR = MPI_ERR_INTERN.
 *
 * 'info' is only dereferenced in the UCS_OK case.
 *
 * @param mpi_status  MPI status to update; must not be MPI_STATUS_IGNORE.
 * @param ucp_status  UCX completion status.
 * @param info        Sender tag and length of the received message.
 */
static inline void mca_pml_ucx_set_recv_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t ucp_status,
                                               const ucp_tag_recv_info_t *info)
{
    switch (ucp_status) {
    case UCS_OK: {
        /* The sender tag is held in a signed 64-bit variable before it is
         * decoded, so the tag-field shift operates on a signed value. */
        int64_t sender_tag = info->sender_tag;

        mpi_status->MPI_ERROR  = MPI_SUCCESS;
        mpi_status->MPI_SOURCE = PML_UCX_TAG_GET_SOURCE(sender_tag);
        mpi_status->MPI_TAG    = PML_UCX_TAG_GET_MPI_TAG(sender_tag);
        mpi_status->_ucount    = info->length;
        break;
    }
    case UCS_ERR_MESSAGE_TRUNCATED:
        mpi_status->MPI_ERROR = MPI_ERR_TRUNCATE;
        break;
    case UCS_ERR_CANCELED:
        mpi_status->_cancelled = true;
        break;
    default:
        mpi_status->MPI_ERROR = MPI_ERR_INTERN;
        break;
    }
}
/**
 * Same as mca_pml_ucx_set_recv_status(), but does nothing when the caller
 * passed MPI_STATUS_IGNORE.
 *
 * @param mpi_status  MPI status to update, or MPI_STATUS_IGNORE.
 * @param ucp_status  UCX completion status.
 * @param info        Sender tag and length of the received message.
 */
static inline void mca_pml_ucx_set_recv_status_safe(ompi_status_public_t* mpi_status,
                                                    ucs_status_t ucp_status,
                                                    const ucp_tag_recv_info_t *info)
{
    if (mpi_status == MPI_STATUS_IGNORE) {
        return;
    }

    mca_pml_ucx_set_recv_status(mpi_status, ucp_status, info);
}
/* Class descriptor for persistent requests (instantiated in pml_ucx_request.c) */
OBJ_CLASS_DECLARATION(mca_pml_ucx_persistent_request_t);
#endif /* PML_UCX_REQUEST_H_ */