1
1

Adding an Open Fabrics Interfaces (OFI) MTL.

This MTL implementation uses the OFIWG libfabric's tag messaging capabilities.
Этот коммит содержится в:
Yohann Burette 2014-12-16 15:42:00 -08:00
родитель 68d78fd718
Коммит 58a7a1e4ac
17 изменённых файлов: 2120 добавлений и 0 удалений

53
ompi/mca/mtl/ofi/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,53 @@
#
# Copyright (c) 2013-2014 Intel, Inc. All rights reserved
#
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Extra file shipped in the tarball but not installed.
EXTRA_DIST = post_configure.sh
# Pick up the libfabric headers located by the common libfabric component.
AM_CPPFLAGS = $(opal_common_libfabric_CPPFLAGS)
# Install the show_help text alongside other OMPI data files.
dist_ompidata_DATA = help-mtl-ofi.txt
# Every source file of the component; shared by both the DSO and the
# static convenience-library build below.
mtl_ofi_sources = \
mtl_ofi.h \
mtl_ofi.c \
mtl_ofi_cancel.c \
mtl_ofi_component.c \
mtl_ofi_endpoint.h \
mtl_ofi_endpoint.c \
mtl_ofi_message.h \
mtl_ofi_message.c \
mtl_ofi_probe.c \
mtl_ofi_recv.c \
mtl_ofi_request.h \
mtl_ofi_send.c \
mtl_ofi_types.h
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_ompi_mtl_ofi_DSO
component_noinst =
component_install = mca_mtl_ofi.la
else
component_noinst = libmca_mtl_ofi.la
component_install =
endif
# DSO build: a loadable module installed into the MCA component dir.
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_mtl_ofi_la_SOURCES = $(mtl_ofi_sources)
mca_mtl_ofi_la_LIBADD = $(opal_common_libfabric_LIBADD)
mca_mtl_ofi_la_LDFLAGS = -module -avoid-version $(mtl_ofi_LDFLAGS)
# Static build: a noinst convenience library slurped into libmpi.
noinst_LTLIBRARIES = $(component_noinst)
libmca_mtl_ofi_la_SOURCES = $(mtl_ofi_sources)
libmca_mtl_ofi_la_LIBADD = $(opal_common_libfabric_LIBADD)
libmca_mtl_ofi_la_LDFLAGS = -module -avoid-version $(mtl_ofi_LDFLAGS)

32
ompi/mca/mtl/ofi/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,32 @@
# -*- shell-script -*-
#
# Copyright (c) 2013-2014 Intel, Inc. All rights reserved
#
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_ompi_mtl_ofi_POST_CONFIG(will_build)
# ----------------------------------------
# Only require the tag if we're actually going to be built
AC_DEFUN([MCA_ompi_mtl_ofi_POST_CONFIG], [
    AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([MTL])])
])dnl
# MCA_mtl_ofi_CONFIG([action-if-can-compile],
#                    [action-if-cant-compile])
# ------------------------------------------------
# This component builds if and only if the common libfabric component
# reported success ($opal_common_libfabric_happy); it contributes no
# libfabric checks of its own.
AC_DEFUN([MCA_ompi_mtl_ofi_CONFIG],[
    AC_CONFIG_FILES([ompi/mca/mtl/ofi/Makefile])
    AS_IF([test $opal_common_libfabric_happy -eq 1],
          [$1],
          [$2])
    # substitute in the things needed to build ofi
    AC_SUBST([opal_common_libfabric_CPPFLAGS])
    AC_SUBST([opal_common_libfabric_LIBADD])
])dnl

19
ompi/mca/mtl/ofi/help-mtl-ofi.txt Обычный файл
Просмотреть файл

@ -0,0 +1,19 @@
# -*- text -*-
#
# Copyright (c) 2013-2014 Intel, Inc. All rights reserved
#
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
[ofi init]
Initialization of OFI library failed.
Error: %s
#
[debug level]
Unable to set OFI debug level.
Error: %s

220
ompi/mca/mtl/ofi/mtl_ofi.c Обычный файл
Просмотреть файл

@ -0,0 +1,220 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/mtl/mtl.h"
#include "opal/class/opal_list.h"
#include "opal/mca/pmix/pmix.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_endpoint.h"
/**
 * The single OFI MTL module instance.  The inner braces initialize the
 * generic mca_mtl_base_module_t: this MTL's limits followed by its
 * function table.  The trailing members are OFI-specific state that is
 * filled in by the component open/init functions.
 */
mca_mtl_ofi_module_t ompi_mtl_ofi = {
{
8191, /* max cid - 2^13 - 1 */
(1UL << 30), /* max tag value - must allow negatives */
0, /* request reserve space */
0, /* flags */
ompi_mtl_ofi_add_procs,
ompi_mtl_ofi_del_procs,
ompi_mtl_ofi_finalize,
ompi_mtl_ofi_send,
ompi_mtl_ofi_isend,
ompi_mtl_ofi_irecv,
ompi_mtl_ofi_iprobe,
ompi_mtl_ofi_imrecv,
ompi_mtl_ofi_improbe,
ompi_mtl_ofi_cancel,
ompi_mtl_ofi_add_comm,
ompi_mtl_ofi_del_comm
},
/* OFI-specific members; NOTE(review): field order/meaning comes from
 * mca_mtl_ofi_module_t in mtl_ofi_types.h (not visible here) --
 * presumably epnamelen/any_addr and OFI object handles; confirm. */
0,
0,
NULL,
NULL
};
int
ompi_mtl_ofi_add_procs(struct mca_mtl_base_module_t *mtl,
                       size_t nprocs,
                       struct ompi_proc_t** procs)
{
    int ret = OMPI_SUCCESS;
    size_t i;
    size_t size;
    size_t namelen;
    char *ep_name = NULL;
    char *ep_names = NULL;
    fi_addr_t *fi_addrs = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;

    namelen = ompi_mtl_ofi.epnamelen;

    /**
     * Create array of EP names: one namelen-sized slot per peer.
     * BUG FIX: this used to allocate nprocs * sizeof(namelen), i.e.
     * nprocs * sizeof(size_t), which under-allocates (and the memcpy
     * below overflows the heap) whenever an EP name is longer than a
     * size_t.
     */
    ep_names = malloc(nprocs * namelen);
    if (NULL == ep_names) {
        ret = OMPI_ERROR;
        goto bail;
    }

    /**
     * Create array of fi_addrs, filled in by fi_av_insert() below.
     */
    fi_addrs = malloc(nprocs * sizeof(fi_addr_t));
    if (NULL == fi_addrs) {
        ret = OMPI_ERROR;
        goto bail;
    }

    /**
     * Retrieve the processes' EP names from modex and pack them into
     * the contiguous ep_names array expected by fi_av_insert().
     * NOTE(review): ep_name returned by OPAL_MODEX_RECV appears to be
     * heap-allocated per iteration and never freed -- confirm ownership
     * semantics of the modex buffer.
     */
    for (i = 0 ; i < nprocs ; ++i) {
        OPAL_MODEX_RECV(ret,
                        &mca_mtl_ofi_component.super.mtl_version,
                        &procs[i]->super,
                        (void**)&ep_name,
                        &size);
        if (OMPI_SUCCESS != ret) {
            opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                                "%s:%d: opal_modex_recv failed: %d\n",
                                __FILE__, __LINE__, ret);
            goto bail;
        }
        memcpy(&ep_names[i*namelen], ep_name, namelen);
    }

    /**
     * Map the EP names to fi_addrs.
     */
    ret = fi_av_insert(ompi_mtl_ofi.av, ep_names, nprocs, fi_addrs, 0, NULL);
    if (ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_av_insert failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(errno));
        goto bail;
    }

    /**
     * Store the fi_addrs within the endpoint objects.
     * (Loop index kept as size_t; the old (int) cast was both useless
     * and wrong for very large nprocs.)
     */
    for (i = 0; i < nprocs; i++) {
        endpoint = OBJ_NEW(mca_mtl_ofi_endpoint_t);
        endpoint->mtl_ofi_module = &ompi_mtl_ofi;
        endpoint->peer_fiaddr = fi_addrs[i];

        /* FIXME: What happens if this endpoint already exists? */
        procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = endpoint;
    }

bail:
    /* free(NULL) is a no-op, so no guards are needed. */
    free(fi_addrs);
    free(ep_names);
    return ret;
}
int
ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
                       size_t nprocs,
                       struct ompi_proc_t** procs)
{
    size_t n;

    /* For each proc that has an MTL endpoint, drop its address-vector
     * entry and release the endpoint object. */
    for (n = 0 ; n < nprocs ; ++n) {
        mca_mtl_ofi_endpoint_t *ep;
        int rc;

        if (NULL == procs[n]) {
            continue;
        }
        ep = procs[n]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];
        if (NULL == ep) {
            continue;
        }

        rc = fi_av_remove(ompi_mtl_ofi.av, &ep->peer_fiaddr, 1, 0);
        if (rc) {
            opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                                "%s:%d: fi_av_remove failed: %s\n", __FILE__, __LINE__, fi_strerror(errno));
            return rc;
        }

        procs[n]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = NULL;
        OBJ_RELEASE(ep);
    }
    return OMPI_SUCCESS;
}
int
ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
{
opal_progress_unregister(ompi_mtl_ofi_progress);
/**
* Close all the OFI objects
*/
if (fi_close((fid_t)ompi_mtl_ofi.ep)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
if (fi_close((fid_t)ompi_mtl_ofi.mr)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
if (fi_close((fid_t)ompi_mtl_ofi.cq)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
if (fi_close((fid_t)ompi_mtl_ofi.av)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
if (fi_close((fid_t)ompi_mtl_ofi.domain)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
if (fi_close((fid_t)ompi_mtl_ofi.fabric)) {
opal_output(ompi_mtl_base_framework.framework_output,
"fi_close failed: %s", strerror(errno));
abort();
}
return OMPI_SUCCESS;
}
/**
 * Communicator-creation hook.  This MTL keeps no per-communicator
 * state, so there is nothing to do.
 */
int
ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm)
{
return OMPI_SUCCESS;
}
/**
 * Communicator-destruction hook.  No per-communicator state to tear
 * down.
 */
int
ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm)
{
return OMPI_SUCCESS;
}

98
ompi/mca/mtl/ofi/mtl_ofi.h Обычный файл
Просмотреть файл

@ -0,0 +1,98 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MTL_OFI_H_HAS_BEEN_INCLUDED
#define MTL_OFI_H_HAS_BEEN_INCLUDED
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/mtl/mtl.h"
#include "ompi/mca/mtl/base/base.h"
#include "opal/datatype/opal_convertor.h"
#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_tagged.h>
BEGIN_C_DECLS
/* MTL interface functions */
/** Tear down all OFI objects and unregister progress (mtl_ofi.c). */
extern int ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl);
/** Resolve peers' modex-published EP names into fi_addrs and attach an
 * endpoint object to each proc. */
extern int ompi_mtl_ofi_add_procs(struct mca_mtl_base_module_t *mtl,
size_t nprocs,
struct ompi_proc_t **procs);
/** Remove peers' AV entries and release their endpoint objects. */
extern int ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
size_t nprocs,
struct ompi_proc_t **procs);
/* Point-to-point send/receive entry points (mtl_ofi_send.c /
 * mtl_ofi_recv.c). */
extern int ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int dest,
int tag,
struct opal_convertor_t *convertor,
mca_pml_base_send_mode_t mode);
extern int ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int dest,
int tag,
struct opal_convertor_t *convertor,
mca_pml_base_send_mode_t mode,
bool blocking,
mca_mtl_request_t *mtl_request);
extern int ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int src,
int tag,
struct opal_convertor_t *convertor,
mca_mtl_request_t *mtl_request);
/* Probe/matched-probe entry points (mtl_ofi_probe.c). */
extern int ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int src,
int tag,
int *flag,
struct ompi_status_public_t *status);
extern int ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
struct opal_convertor_t *convertor,
struct ompi_message_t **message,
struct mca_mtl_request_t *mtl_request);
extern int ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int src,
int tag,
int *matched,
struct ompi_message_t **message,
struct ompi_status_public_t *status);
/** Cancel an outstanding request (receives only; see
 * mtl_ofi_cancel.c). */
extern int ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
mca_mtl_request_t *mtl_request,
int flag);
/* Per-communicator hooks; no-ops for this MTL. */
extern int ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm);
extern int ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm);
/** Drain the CQ and fire request callbacks; registered with
 * opal_progress. */
extern int ompi_mtl_ofi_progress(void);
/** Map a libfabric error number to an OMPI error code. */
extern int ompi_mtl_ofi_get_error(int fi_error);
END_C_DECLS
#endif /* MTL_OFI_H_HAS_BEEN_INCLUDED */

58
ompi/mca/mtl/ofi/mtl_ofi_cancel.c Обычный файл
Просмотреть файл

@ -0,0 +1,58 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_request.h"
int
ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
                    mca_mtl_request_t *mtl_request,
                    int flag)
{
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;

    /**
     * Sends cannot be cancelled yet; report success without doing
     * anything, per MPI cancel semantics.
     */
    if (OMPI_MTL_OFI_SEND == ofi_req->type) {
        return OMPI_SUCCESS;
    }

    /* Anything that is neither a send nor a receive is unexpected. */
    if (OMPI_MTL_OFI_RECV != ofi_req->type) {
        return OMPI_ERROR;
    }

    /**
     * Cancel a receive request only if it hasn't been matched yet.
     * The event queue needs to be drained to make sure there isn't
     * any pending receive completion event.
     */
    ompi_mtl_ofi_progress();
    if (!ofi_req->req_started) {
        if (0 == fi_cancel((fid_t)ompi_mtl_ofi.ep, &ofi_req->ctx)) {
            /* The request was successfully cancelled; complete it. */
            ofi_req->super.ompi_req->req_status._cancelled = true;
            ofi_req->super.completion_callback(&ofi_req->super);
        }
    }
    return OMPI_SUCCESS;
}

477
ompi/mca/mtl/ofi/mtl_ofi_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,477 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/mca/event/event.h"
#include "opal/util/output.h"
#include "opal/mca/pmix/pmix.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_request.h"
#include "mtl_ofi_message.h"
static int ompi_mtl_ofi_component_open(void);
static int ompi_mtl_ofi_component_close(void);
static int ompi_mtl_ofi_component_register(void);
static mca_mtl_base_module_t*
ompi_mtl_ofi_component_init(bool enable_progress_threads,
bool enable_mpi_threads);
/**
 * Component descriptor registered with the MTL framework: MCA metadata,
 * checkpoint capability, and the init entry point that produces the
 * module.
 */
mca_mtl_ofi_component_t mca_mtl_ofi_component = {
{
/* First, the mca_base_component_t struct containing meta
* information about the component itself */
{
MCA_MTL_BASE_VERSION_2_0_0,
"ofi", /* MCA component name */
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
ompi_mtl_ofi_component_open, /* component open */
ompi_mtl_ofi_component_close, /* component close */
NULL, /* component query (unused) */
ompi_mtl_ofi_component_register
},
{
/* The component is not checkpoint ready */
MCA_BASE_METADATA_PARAM_NONE
},
ompi_mtl_ofi_component_init, /* component init */
}
};
/**
 * MCA parameter registration hook; this component exposes no
 * parameters yet.
 */
static int
ompi_mtl_ofi_component_register(void)
{
return OMPI_SUCCESS;
}
/**
 * Component open: set up state that does not require talking to the
 * fabric yet.
 */
static int
ompi_mtl_ofi_component_open(void)
{
/* Extra bytes the PML must reserve in each request for this MTL's
 * private request data (total OFI request minus the base part). */
ompi_mtl_ofi.base.mtl_request_size =
sizeof(ompi_mtl_ofi_request_t) - sizeof(struct mca_mtl_request_t);
/* Free list of message objects used by matched probe/recv.
 * NOTE(review): elements are sized exactly
 * sizeof(ompi_mtl_ofi_message_t), yet the message constructor points
 * its buffer just past the struct -- confirm the element size accounts
 * for payload space. */
OBJ_CONSTRUCT(&ompi_mtl_ofi.free_messages, opal_free_list_t);
opal_free_list_init(&ompi_mtl_ofi.free_messages,
sizeof(ompi_mtl_ofi_message_t),
OBJ_CLASS(ompi_mtl_ofi_message_t),
1, -1, 1);
/* OFI handles are created later, in component_init; mark unset so
 * error paths can tell what needs closing. */
ompi_mtl_ofi.domain = NULL;
ompi_mtl_ofi.av = NULL;
ompi_mtl_ofi.cq = NULL;
ompi_mtl_ofi.mr = NULL;
ompi_mtl_ofi.ep = NULL;
return OMPI_SUCCESS;
}
/**
 * Component close: release the message free list created in open.
 * (OFI objects are closed in ompi_mtl_ofi_finalize, not here.)
 */
static int
ompi_mtl_ofi_component_close(void)
{
OBJ_DESTRUCT(&ompi_mtl_ofi.free_messages);
return OMPI_SUCCESS;
}
/**
 * Component init: discover a libfabric provider, create fabric/domain/
 * endpoint/CQ/AV/MR, publish our EP name via modex, and register the
 * progress callback.  Returns the module on success, NULL on failure
 * (after closing whatever was created).
 *
 * Fixes over the original:
 *  - null_addr is NULL-initialized: the error label frees it, and every
 *    goto before its malloc previously freed an uninitialized pointer.
 *  - fi_mr_reg()'s return value is now checked.
 *  - providers is reset to NULL after fi_freeinfo(), so a later failure
 *    no longer double-frees the provider list.
 *  - null_addr is reset to NULL after the success-path free, so a later
 *    failure no longer double-frees it.
 */
static mca_mtl_base_module_t*
ompi_mtl_ofi_component_init(bool enable_progress_threads,
                            bool enable_mpi_threads)
{
    int ret, fi_version;
    struct fi_info hints = {0};
    struct fi_info *providers = NULL, *prov = NULL;
    struct fi_domain_attr domain_attr = {0};
    struct fi_tx_attr tx_attr = {0};
    struct fi_cq_attr cq_attr = {0};
    struct fi_av_attr av_attr = {0};
    fi_addr_t ep_name = 0;
    char *null_addr = NULL;
    size_t namelen;

    /**
     * Hints to filter providers
     * See man fi_getinfo for a list of all filters
     * mode:  Select capabilities MTL is prepared to support.
     *        In this case, MTL will pass in context into communication calls
     * ep_type:  reliable datagram operation
     * caps:     Capabilities required from the provider.  The bits specified
     *           with buffered receive, cancel, and remote complete
     *           implements MPI semantics.  Tagged is used to support tag
     *           matching.
     *           We expect to register all memory up front for use with this
     *           endpoint, so the MTL requires dynamic memory regions
     */
    hints.mode = FI_CONTEXT;
    hints.ep_type = FI_EP_RDM;        /* Reliable datagram         */
    hints.caps = FI_TAGGED;           /* Tag matching interface    */
    hints.caps |= FI_BUFFERED_RECV;   /* Buffered receives         */
    hints.caps |= FI_REMOTE_COMPLETE; /* Remote completion         */
    hints.caps |= FI_CANCEL;          /* Support cancel            */
    hints.caps |= FI_DYNAMIC_MR;      /* Global dynamic mem region */

    /**
     * Refine filter for additional capabilities
     * threading:  Disable locking
     * control_progress:  enable async progress
     * op_flags:  Specifies default operation to set on all communication.
     *            In this case, we want remote completion to be set by default.
     */
    domain_attr.threading = FI_THREAD_PROGRESS;
    domain_attr.control_progress = FI_PROGRESS_AUTO;
    tx_attr.op_flags = FI_REMOTE_COMPLETE;
    hints.domain_attr = &domain_attr;
    hints.tx_attr = &tx_attr;

    /**
     * FI_VERSION provides binary backward and forward compatibility support
     * Specify the version of OFI this is coded to; the provider will select
     * struct layouts that are compatible with this version.
     */
    fi_version = FI_VERSION(1, 0);

    /**
     * fi_getinfo: returns information about fabric services for reaching a
     * remote node or service.  This does not necessarily allocate resources.
     * Pass NULL for name/service because we want a list of providers supported.
     */
    ret = fi_getinfo(fi_version, NULL, NULL, 0ULL, &hints, &providers);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_getinfo failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Here we elect to use the first provider from the list.
     * Further filtering could be done at this point (e.g. name).
     */
    prov = providers;

    /**
     * Open fabric: instantiate the virtual or physical network described by
     * the returned fabric attributes.  See man fi_fabric for details.
     */
    ret = fi_fabric(prov->fabric_attr, &ompi_mtl_ofi.fabric, NULL);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_fabric failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Create the access domain: the physical or virtual network or
     * hardware port/collection of ports, used to create endpoints.
     */
    ret = fi_domain(ompi_mtl_ofi.fabric, prov, &ompi_mtl_ofi.domain, NULL);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_domain failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Create a transport-level communication endpoint.  To be usable it
     * must be bound to completion queues / address vector and enabled.
     */
    ret = fi_endpoint(ompi_mtl_ofi.domain, prov, &ompi_mtl_ofi.ep, NULL);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_endpoint failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Create the objects that will be bound to the endpoint:
     * completion queue, address vector, and a dynamic memory region.
     */
    cq_attr.format = FI_CQ_FORMAT_TAGGED;
    ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.cq, NULL);
    if (ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_cq_open failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * The remote fi_addr will be stored in the ofi_endpoint struct,
     * so we use the AV in "map" mode.
     */
    av_attr.type = FI_AV_MAP;
    ret = fi_av_open(ompi_mtl_ofi.domain, &av_attr, &ompi_mtl_ofi.av, NULL);
    if (ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_av_open failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * All OFI communication routines require at least one MR.
     * This MTL only needs a single MR spanning all addressable memory.
     * BUG FIX: the return value was previously ignored.
     */
    ret = fi_mr_reg(ompi_mtl_ofi.domain,
                    0,                  /* In: Lower memory address */
                    UINTPTR_MAX,        /* In: Upper memory address */
                    FI_SEND | FI_RECV,  /* In: Expose MR for read/write */
                    0ULL,               /* In: base MR offset */
                    0ULL,               /* In: requested key */
                    0ULL,               /* In: No flags */
                    &ompi_mtl_ofi.mr,   /* Out: memregion object */
                    NULL);              /* Context: memregion events */
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_mr_reg failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Bind the CQ and AV to the endpoint object.
     */
    ret = fi_bind((fid_t)ompi_mtl_ofi.ep,
                  (fid_t)ompi_mtl_ofi.cq,
                  FI_SEND | FI_RECV);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_bind CQ-EP failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }
    ret = fi_bind((fid_t)ompi_mtl_ofi.ep,
                  (fid_t)ompi_mtl_ofi.av,
                  0);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_bind AV-EP failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Enable the endpoint for communication; this commits the binds.
     */
    ret = fi_enable(ompi_mtl_ofi.ep);
    if (0 != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_enable failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }

    /**
     * Free provider info since it's not needed anymore.
     * BUG FIX: reset to NULL so the error path below cannot free it a
     * second time.
     */
    fi_freeinfo(providers);
    providers = NULL;

    /**
     * Get our address and publish it with modex.
     * NOTE(review): this assumes the provider's EP name fits inside a
     * fi_addr_t (8 bytes) -- confirm for the providers in use.
     */
    namelen = sizeof(ep_name);
    ret = fi_getname((fid_t)ompi_mtl_ofi.ep, &ep_name, &namelen);
    if (ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_getname failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }
    OPAL_MODEX_SEND(ret, PMIX_SYNC_REQD, PMIX_GLOBAL,
                    &mca_mtl_ofi_component.super.mtl_version,
                    &ep_name, namelen);
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: opal_modex_send failed: %d\n",
                            __FILE__, __LINE__, ret);
        goto error;
    }
    ompi_mtl_ofi.epnamelen = namelen;

    /**
     * Insert the ANY_SRC address (an all-zero EP name).
     */
    null_addr = malloc(namelen);
    if (!null_addr) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: malloc failed\n", __FILE__, __LINE__);
        goto error;
    }
    memset(null_addr, 0, namelen);
    ret = fi_av_insert(ompi_mtl_ofi.av,
                       null_addr,
                       1,
                       &ompi_mtl_ofi.any_addr,
                       0ULL,
                       NULL);
    if (ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_av_insert failed: %s\n",
                            __FILE__, __LINE__, fi_strerror(-ret));
        goto error;
    }
    /* BUG FIX: NULL after free so a later goto error cannot double-free. */
    free(null_addr);
    null_addr = NULL;

    /**
     * Activate progress callback.
     */
    ret = opal_progress_register(ompi_mtl_ofi_progress);
    if (OMPI_SUCCESS != ret) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: opal_progress_register failed: %d\n",
                            __FILE__, __LINE__, ret);
        goto error;
    }

    return &ompi_mtl_ofi.base;

error:
    /* Unwind whatever was created; free(NULL)/closed-NULL checks keep
     * this safe regardless of how far we got. */
    if (null_addr) {
        free(null_addr);
    }
    if (providers) {
        (void) fi_freeinfo(providers);
    }
    if (ompi_mtl_ofi.av) {
        (void) fi_close((fid_t)ompi_mtl_ofi.av);
    }
    if (ompi_mtl_ofi.cq) {
        (void) fi_close((fid_t)ompi_mtl_ofi.cq);
    }
    if (ompi_mtl_ofi.mr) {
        (void) fi_close((fid_t)ompi_mtl_ofi.mr);
    }
    if (ompi_mtl_ofi.ep) {
        (void) fi_close((fid_t)ompi_mtl_ofi.ep);
    }
    if (ompi_mtl_ofi.domain) {
        (void) fi_close((fid_t)ompi_mtl_ofi.domain);
    }
    if (ompi_mtl_ofi.fabric) {
        (void) fi_close((fid_t)ompi_mtl_ofi.fabric);
    }
    return NULL;
}
/**
 * Map a libfabric error number onto an OMPI error code.  Only success
 * maps cleanly today; every other value collapses to OMPI_ERROR.
 */
int
ompi_mtl_ofi_get_error(int error_num)
{
    return (0 == error_num) ? OMPI_SUCCESS : OMPI_ERROR;
}
/**
 * Progress engine registered with opal_progress.  Drains the CQ: for
 * each completion, dispatches the owning request's event callback; for
 * CQ-reported errors, dispatches the error callback.  Returns the
 * number of completions processed.
 */
int
ompi_mtl_ofi_progress(void)
{
int ret, count = 0;
struct fi_cq_tagged_entry wc;
struct fi_cq_err_entry error;
ompi_mtl_ofi_request_t *ofi_req = NULL;
/**
* Read the work completions from the CQ.
* From the completion's op_context, we get the associated OFI request.
* Call the request's callback.
*/
while (true) {
memset(&wc, 0, sizeof(wc));
/* fi_cq_read returns the number of completions read (here at most
 * 1), or a negative error code. */
ret = fi_cq_read(ompi_mtl_ofi.cq, (void *)&wc, 1);
if (ret > 0) {
count++;
if (NULL != wc.op_context) {
/* Recover the request that owns this completion's context. */
ofi_req = TO_OFI_REQ(wc.op_context);
assert(ofi_req);
ret = ofi_req->event_callback(&wc, ofi_req);
if (OMPI_SUCCESS != ret) {
/* A failed callback leaves the MTL in an undefined state;
 * treat it as fatal. */
opal_output(ompi_mtl_base_framework.framework_output,
"Error returned by request event callback: %d",
ret);
abort();
}
}
} else if (ret == -FI_EAVAIL) {
/**
* An error occured and is being reported via the CQ.
* Read the error and forward it to the upper layer.
*/
memset(&error, 0, sizeof(error));
/* third argument is flags; 0 = default behavior */
ret = fi_cq_readerr(ompi_mtl_ofi.cq,
&error,
0);
if (ret) {
opal_output(ompi_mtl_base_framework.framework_output,
"Error returned from fi_cq_readerr: %d", ret);
}
assert(error.op_context);
ofi_req = TO_OFI_REQ(error.op_context);
assert(ofi_req);
ret = ofi_req->error_callback(&error, ofi_req);
if (OMPI_SUCCESS != ret) {
opal_output(ompi_mtl_base_framework.framework_output,
"Error returned by request error callback: %d",
ret);
abort();
}
} else {
/**
* The CQ is empty. Return.
* NOTE(review): any other negative return from fi_cq_read
* (e.g. a genuine read failure) is also silently treated as
* "empty" here -- confirm that is intended.
*/
break;
}
}
return count;
}

40
ompi/mca/mtl/ofi/mtl_ofi_endpoint.c Обычный файл
Просмотреть файл

@ -0,0 +1,40 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/types.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_endpoint.h"
/**
* Initialize state of the endpoint instance.
*/
/**
 * Initialize state of the endpoint instance.
 * (peer_fiaddr is filled in later by ompi_mtl_ofi_add_procs.)
 */
static void mca_mtl_ofi_endpoint_construct(mca_mtl_ofi_endpoint_t *endpoint)
{
endpoint->mtl_ofi_module = NULL;
}
/**
 * Destroy an endpoint.  Nothing to release: the fi_addr it holds is
 * removed from the AV by ompi_mtl_ofi_del_procs before the object is
 * released.
 */
static void mca_mtl_ofi_endpoint_destruct(mca_mtl_ofi_endpoint_t *endpoint)
{
}
/* OPAL class plumbing tying ctor/dtor to the endpoint type. */
OBJ_CLASS_INSTANCE(
mca_mtl_ofi_endpoint_t,
opal_list_item_t,
mca_mtl_ofi_endpoint_construct,
mca_mtl_ofi_endpoint_destruct
);

44
ompi/mca/mtl/ofi/mtl_ofi_endpoint.h Обычный файл
Просмотреть файл

@ -0,0 +1,44 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MTL_OFI_ENDPOINT_H
#define OMPI_MTL_OFI_ENDPOINT_H

#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "ompi/mca/mtl/mtl.h"
#include "mtl_ofi.h"

BEGIN_C_DECLS

/**
 * An abstraction that represents a connection to a endpoint process.
 * An instance of mca_mtl_ofi_endpoint_t is associated with each process
 * and MTL pair at startup. However, connections to the endpoint
 * are established dynamically on an as-needed basis:
 */
struct mca_mtl_ofi_endpoint_t {
    opal_list_item_t super;

    /** MTL instance that created this connection */
    struct mca_mtl_ofi_module_t *mtl_ofi_module;

    /** The peer's fi_addr */
    fi_addr_t peer_fiaddr;
};
typedef struct mca_mtl_ofi_endpoint_t mca_mtl_ofi_endpoint_t;

/*
 * BUG FIX: a second declaration spelled
 * OBJ_CLASS_DECLARATION(mca_mtl_ofi_endpoint) (missing the _t) was
 * removed -- it referenced a class instance that is never defined; the
 * only class defined in mtl_ofi_endpoint.c is the _t one declared here.
 */
OBJ_CLASS_DECLARATION(mca_mtl_ofi_endpoint_t);

END_C_DECLS

#endif

29
ompi/mca/mtl/ofi/mtl_ofi_message.c Обычный файл
Просмотреть файл

@ -0,0 +1,29 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/constants.h"
#include "mtl_ofi.h"
#include "mtl_ofi_message.h"
/**
 * Constructor for free-list message objects: set the payload pointer.
 */
static void
ompi_mtl_ofi_message_construct(ompi_mtl_ofi_message_t *message)
{
/* Point the payload buffer just past the struct itself.
 * NOTE(review): this assumes each free-list element carries extra
 * room after sizeof(ompi_mtl_ofi_message_t); the free list in
 * ompi_mtl_ofi_component_open() is initialized with exactly
 * sizeof(ompi_mtl_ofi_message_t) -- confirm the element size accounts
 * for the buffered payload. */
message->buffer = message + 1;
}
/* OPAL class plumbing: construct on allocation; no destructor needed. */
OBJ_CLASS_INSTANCE(
ompi_mtl_ofi_message_t,
opal_free_list_item_t,
ompi_mtl_ofi_message_construct,
NULL
);

51
ompi/mca/mtl/ofi/mtl_ofi_message.h Обычный файл
Просмотреть файл

@ -0,0 +1,51 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MTL_OFI_MESSAGE_H
#define MTL_OFI_MESSAGE_H
#include "mtl_ofi_types.h"
/**
 * Matched-probe message object: the completion entry captured at match
 * time plus a payload buffer (pointed just past the struct by the
 * constructor in mtl_ofi_message.c).
 */
struct ompi_mtl_ofi_message_t {
opal_free_list_item_t super;
/* completion entry describing the matched message */
struct fi_cq_tagged_entry wc;
/* payload storage */
void *buffer;
};
typedef struct ompi_mtl_ofi_message_t ompi_mtl_ofi_message_t;
OBJ_CLASS_DECLARATION(ompi_mtl_ofi_message_t);
/**
 * Grab a message object from the global free list and stash the given
 * completion entry in it.  Returns NULL if no element is available.
 */
static inline ompi_mtl_ofi_message_t*
ompi_mtl_ofi_message_alloc(const struct fi_cq_tagged_entry *wc)
{
int rc;
opal_free_list_item_t *tmp;
ompi_mtl_ofi_message_t *message;
OPAL_FREE_LIST_GET(&ompi_mtl_ofi.free_messages,
tmp,
rc);
/* rc is unused; a NULL item signals allocation failure */
if (NULL == tmp) return NULL;
message = (ompi_mtl_ofi_message_t*) tmp;
message->wc = *wc;
return message;
}
/**
 * Return a message object to the global free list.
 */
static inline void
ompi_mtl_ofi_message_free(ompi_mtl_ofi_message_t *message)
{
OPAL_FREE_LIST_RETURN(&ompi_mtl_ofi.free_messages,
&message->super);
}
#endif

269
ompi/mca/mtl/ofi/mtl_ofi_probe.c Обычный файл
Просмотреть файл

@ -0,0 +1,269 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/communicator/communicator.h"
#include "ompi/message/message.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_request.h"
#include "mtl_ofi_endpoint.h"
#include "mtl_ofi_message.h"
/**
* Called when a probe request completes. Read fi_cq_tagged_entry's
* data field to determine whether or not a matching message was found.
*/
static int
ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
ompi_mtl_ofi_request_t *ofi_req)
{
if (wc->data > 0) {
ofi_req->match_state = 1;
ofi_req->status.MPI_SOURCE = MTL_OFI_GET_SOURCE(wc->tag);
ofi_req->status.MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
ofi_req->status.MPI_ERROR = MPI_SUCCESS;
ofi_req->status._ucount = wc->len;
} else {
ofi_req->match_state = 0;
}
ofi_req->completion_count--;
return OMPI_SUCCESS;
}
/**
* Called when a probe request encounters an error.
*/
static int
ompi_mtl_ofi_probe_error_callback(struct fi_cq_err_entry *error,
ompi_mtl_ofi_request_t *ofi_req)
{
ofi_req->super.ompi_req->req_status.MPI_ERROR = MPI_ERR_INTERN;
ofi_req->completion_count--;
return OMPI_SUCCESS;
}
/**
 * MPI_Iprobe implementation: search the provider's unexpected/posted
 * queues for a matching message via fi_tsearch().  Sets *flag (and
 * *status on a match).  MPI "iprobe" is non-blocking, but if the search
 * request gets queued we must spin on progress until it completes.
 */
int
ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
                    struct ompi_communicator_t *comm,
                    int src,
                    int tag,
                    int *flag,
                    struct ompi_status_public_t *status)
{
    struct ompi_mtl_ofi_request_t ofi_req;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    fi_addr_t remote_proc = 0;
    size_t length = 0;
    uint64_t match_bits, mask_bits;
    int ret;

    /**
     * If the source is known, use its peer_fiaddr.
     */
    if (MPI_ANY_SOURCE != src) {
        ompi_proc = ompi_comm_peer_lookup( comm, src );
        endpoint = ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];
        remote_proc = endpoint->peer_fiaddr;
    }

    MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);

    /* Stack-allocated request: only used to receive the completion
     * callback while we spin below. */
    ofi_req.type = OMPI_MTL_OFI_PROBE;
    ofi_req.event_callback = ompi_mtl_ofi_probe_callback;
    ofi_req.error_callback = ompi_mtl_ofi_probe_error_callback;
    ofi_req.completion_count = 1;
    ofi_req.match_state = 0;

    ret = fi_tsearch(ompi_mtl_ofi.ep,
                     &match_bits,
                     mask_bits,
                     0,
                     &remote_proc,
                     &length,
                     (void *)&ofi_req.ctx);

    /**
     * fi_tsearch() is non-blocking.  The request can either:
     * - be queued successfully (0),
     * - complete immediately with a match (1), or
     * - complete immediately with no match (-FI_ENOMSG).
     */
    if (0 == ret) {
        /**
         * The search request was queued successfully. Wait until complete.
         */
        while (0 < ofi_req.completion_count) {
            opal_progress();
        }
        *flag = ofi_req.match_state;
        if (1 == *flag) {
            *status = ofi_req.status;
        }
    } else if (1 == ret) {
        /**
         * The search request completed and a matching message was found.
         */
        ofi_req.match_state = 1;
        ofi_req.status.MPI_SOURCE = MTL_OFI_GET_SOURCE(match_bits);
        ofi_req.status.MPI_TAG = MTL_OFI_GET_TAG(match_bits);
        ofi_req.status.MPI_ERROR = MPI_SUCCESS;
        ofi_req.status._ucount = length;
        *flag = 1;
        *status = ofi_req.status;
    } else if (-FI_ENOMSG == ret) {
        /**
         * The search request completed but no matching message was found.
         * (The redundant "ret < 0 &&" guards were dropped.)
         */
        *flag = 0;
    } else if (ret < 0) {
        /* BUG FIX: fi_strerror() expects a positive error code; this
         * branch previously passed the negative ret, unlike every other
         * call site in the file. */
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_tsearch failed: %d (%s)",
                            __FILE__, __LINE__, ret, fi_strerror(-ret));
        return ompi_mtl_ofi_get_error(-ret);
    } else {
        /* Positive return other than 1: not documented for fi_tsearch. */
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: unexpected return code from fi_tsearch: %d",
                            __FILE__, __LINE__, ret);
        return ompi_mtl_ofi_get_error(-ret);
    }

    return OMPI_SUCCESS;
}
int
ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
int src,
int tag,
int *matched,
struct ompi_message_t **message,
struct ompi_status_public_t *status)
{
struct ompi_mtl_ofi_request_t ofi_req;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
fi_addr_t remote_proc = 0;
size_t length = 0;
uint64_t match_bits, mask_bits;
int ret;
/**
* If the source is known, use its peer_fiaddr.
*/
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];
remote_proc = endpoint->peer_fiaddr;
}
MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);
ofi_req.type = OMPI_MTL_OFI_PROBE;
ofi_req.event_callback = ompi_mtl_ofi_probe_callback;
ofi_req.error_callback = ompi_mtl_ofi_probe_error_callback;
ofi_req.completion_count = 1;
ofi_req.match_state = 0;
ret = fi_tsearch(ompi_mtl_ofi.ep,
&match_bits,
mask_bits,
FI_CLAIM,
&remote_proc,
&length,
(void *)&ofi_req.ctx);
/**
* Probe is a blocking operation. fi_tsearch() is non-blocking.
* We inspect the return code and decide what to do.
* The request can either:
* - be queued successfully,
* - return no matching message, or
* - return a matching message.
*/
if (ret == 0) {
/**
* The search request was queued successfully. Wait until complete.
*/
while (0 < ofi_req.completion_count) {
opal_progress();
}
*matched = ofi_req.match_state;
if (1 == *matched) {
*status = ofi_req.status;
(*message) = ompi_message_alloc();
if (NULL == (*message)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
(*message)->comm = comm;
(*message)->req_ptr = ofi_req.message;
(*message)->peer = status->MPI_SOURCE;
(*message)->count = status->_ucount;
if (NULL == (*message)->req_ptr) {
ompi_message_return(*message);
*message = NULL;
return OMPI_ERR_OUT_OF_RESOURCE;
}
} else {
(*message) = MPI_MESSAGE_NULL;
}
} else if (1 == ret) {
/**
* The search request completed and a matching message was found.
*/
*matched = 1;
*status = ofi_req.status;
ofi_req.match_state = 1;
(*message) = ompi_message_alloc();
if (NULL == (*message)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
(*message)->comm = comm;
(*message)->req_ptr = ofi_req.message;
(*message)->peer = status->MPI_SOURCE;
(*message)->count = status->_ucount;
if (NULL == (*message)->req_ptr) {
ompi_message_return(*message);
*message = NULL;
return OMPI_ERR_OUT_OF_RESOURCE;
}
} else if (ret < 0 && ret == -FI_ENOMSG) {
/**
* The search request completed but no matching message was found.
*/
*matched = 0;
} else if (ret < 0 && ret != -FI_ENOMSG) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: fi_tsearch failed: %d (%s)",
__FILE__, __LINE__, ret, fi_strerror(ret));
return ompi_mtl_ofi_get_error(-ret);
} else {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: unexpected return code from fi_tsearch: %d",
__FILE__, __LINE__, ret);
return ompi_mtl_ofi_get_error(-ret);
}
return OMPI_SUCCESS;
}

261
ompi/mca/mtl/ofi/mtl_ofi_recv.c Обычный файл
Просмотреть файл

@ -0,0 +1,261 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/class/opal_list.h"
#include "ompi/communicator/communicator.h"
#include "ompi/datatype/ompi_datatype.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mtl/base/base.h"
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
#include "ompi/message/message.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_endpoint.h"
#include "mtl_ofi_request.h"
#include "mtl_ofi_message.h"
/**
 * Completion handler for the zero-byte ACK sent back to a synchronous
 * sender.  Once that ACK send completes, the synchronous recv as a
 * whole is finished, so the request is handed to the upper layer.
 */
static int
ompi_mtl_ofi_sync_recv_callback(struct fi_cq_tagged_entry *wc,
                                ompi_mtl_ofi_request_t *ofi_req)
{
    (void) wc;  /* the completion entry carries nothing we need here */

    ofi_req->super.completion_callback(&ofi_req->super);
    return OMPI_SUCCESS;
}
/**
 * Called when a completion for a posted recv is received.
 *
 * Fills in the MPI status from the completion entry, unpacks into the
 * user buffer when a temporary pack buffer was used, and either
 * completes the request or -- for a synchronous send -- fires the ACK
 * back to the sender and defers completion to
 * ompi_mtl_ofi_sync_recv_callback().
 */
static int
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
                           ompi_mtl_ofi_request_t *ofi_req)
{
    int ret;
    ssize_t ret_length;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    int src;
    ompi_status_public_t *status = NULL;

    assert(ofi_req->super.ompi_req);
    status = &ofi_req->super.ompi_req->req_status;

    /**
     * Any event associated with a request starts it.
     * This prevents a started request from being cancelled.
     */
    ofi_req->req_started = true;

    /* Source rank and tag are recovered from the matched tag bits;
     * wc->len is the number of bytes actually delivered. */
    status->MPI_SOURCE = MTL_OFI_GET_SOURCE(wc->tag);
    status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
    status->_ucount = wc->len;

    /* More data arrived than the posted buffer can hold. */
    if (OPAL_UNLIKELY(wc->len > ofi_req->length)) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "truncate expected: %ld %ld",
                            wc->len, ofi_req->length);
        status->MPI_ERROR = MPI_ERR_TRUNCATE;
    }

    /**
     * Unpack data into the receive buffer if necessary (a non-NULL
     * ->buffer means a temporary pack buffer was used).
     */
    if (OPAL_UNLIKELY(ofi_req->buffer)) {
        ret = ompi_mtl_datatype_unpack(ofi_req->convertor,
                                       ofi_req->buffer,
                                       wc->len);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                                "%s:%d: ompi_mtl_datatype_unpack failed: %d",
                                __FILE__, __LINE__, ret);
            status->MPI_ERROR = ret;
        }
    }

    /**
     * We do not want any SYNC_SEND_ACK here!
     * See mtl_ofi_send.c for details.
     */
    assert(!MTL_OFI_IS_SYNC_SEND_ACK(wc->tag));

    /**
     * If this recv is part of an MPI_Ssend operation, then we send an
     * acknowledgment back to the sender. The fi_context can be
     * re-used safely because the previous operation has completed.
     * This recv request will complete once we get a completion for
     * this send. See ompi_mtl_ofi_sync_recv_callback().
     * Otherwise, this request is now complete.
     */
    if (OPAL_UNLIKELY(MTL_OFI_IS_SYNC_SEND(wc->tag))) {
        ofi_req->event_callback = ompi_mtl_ofi_sync_recv_callback;
        /**
         * If the recv request was posted for any source,
         * we need to extract the source's actual address.
         * NOTE(review): this treats a zero remote_addr as "unset" --
         * assumes ompi_mtl_ofi.any_addr is 0/FI_ADDR_UNSPEC; confirm.
         */
        if (!ofi_req->remote_addr) {
            src = MTL_OFI_GET_SOURCE(wc->tag);
            ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src );
            endpoint = ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];
            ofi_req->remote_addr = endpoint->peer_fiaddr;
        }
        /* Zero-byte ACK carrying the original match bits plus the
         * SYNC_SEND_ACK flag; the sender pre-posted a matching trecv. */
        ret_length = fi_tsend(ompi_mtl_ofi.ep,
                              NULL,
                              0,
                              ompi_mtl_ofi.mr,
                              ofi_req->remote_addr,
                              wc->tag | MTL_OFI_SYNC_SEND_ACK,
                              (void *) &ofi_req->ctx);
        if (OPAL_UNLIKELY(ret_length < 0)) {
            /* NOTE(review): on this failure the request is marked in
             * error but never completed (no callback will fire for
             * it), so the recv may hang -- confirm intended handling. */
            opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                                "%s:%d: fi_tsend failed: %zd",
                                __FILE__, __LINE__, ret_length);
            status->MPI_ERROR = OMPI_ERROR;
        }
    } else {
        ofi_req->super.completion_callback(&ofi_req->super);
    }
    return OMPI_SUCCESS;
}
/**
 * Called when an error occurred on a recv request.  Recover what we
 * can of the MPI status from the original match bits, map the
 * libfabric error onto an MPI error class, and complete the request.
 */
static int
ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
                                 ompi_mtl_ofi_request_t *ofi_req)
{
    ompi_status_public_t *status;

    assert(ofi_req->super.ompi_req);
    status = &ofi_req->super.ompi_req->req_status;
    status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
    status->MPI_SOURCE = MTL_OFI_GET_SOURCE(ofi_req->match_bits);

    /* Only a receive-side overflow has a precise MPI equivalent;
     * everything else is reported as an internal error. */
    status->MPI_ERROR = (FI_EMSGSIZE == error->err)
                      ? MPI_ERR_TRUNCATE
                      : MPI_ERR_INTERN;

    ofi_req->super.completion_callback(&ofi_req->super);
    return OMPI_SUCCESS;
}
/**
 * Post a non-blocking tagged receive.
 *
 * For a known source the receive is directed at that peer's fi_addr;
 * for MPI_ANY_SOURCE the module's wildcard address is used.  A
 * temporary pack buffer is allocated when the datatype is
 * non-contiguous and is released by the completion path (or here on
 * posting failure).
 *
 * @return OMPI_SUCCESS, or an OMPI error code on failure.
 */
int
ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
                   struct ompi_communicator_t *comm,
                   int src,
                   int tag,
                   struct opal_convertor_t *convertor,
                   mca_mtl_request_t *mtl_request)
{
    int ret = OMPI_SUCCESS;
    ssize_t ret_length;
    uint64_t match_bits, mask_bits;
    fi_addr_t remote_addr;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
    void *start;
    size_t length;
    bool free_after;

    if (MPI_ANY_SOURCE != src) {
        ompi_proc = ompi_comm_peer_lookup(comm, src);
        endpoint = ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];
        remote_addr = endpoint->peer_fiaddr;
    } else {
        remote_addr = ompi_mtl_ofi.any_addr;
    }

    MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);

    ret = ompi_mtl_datatype_recv_buf(convertor, &start, &length, &free_after);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    ofi_req->type = OMPI_MTL_OFI_RECV;
    ofi_req->event_callback = ompi_mtl_ofi_recv_callback;
    ofi_req->error_callback = ompi_mtl_ofi_recv_error_callback;
    ofi_req->comm = comm;
    ofi_req->buffer = (free_after) ? start : NULL;
    ofi_req->length = length;
    ofi_req->convertor = convertor;
    ofi_req->req_started = false;
    ofi_req->status.MPI_ERROR = OMPI_SUCCESS;
    ofi_req->remote_addr = remote_addr;
    ofi_req->match_bits = match_bits;

    ret_length = fi_trecv(ompi_mtl_ofi.ep,
                          start,
                          length,
                          ompi_mtl_ofi.mr,
                          remote_addr,
                          match_bits,
                          mask_bits,
                          (void *)&ofi_req->ctx);

    if (OPAL_UNLIKELY(ret_length < 0)) {
        if (NULL != ofi_req->buffer) {
            free(ofi_req->buffer);
            ofi_req->buffer = NULL;
        }
        /* BUG FIX: report the libfabric error, not strerror(errno) --
         * libfabric returns negative error codes, it does not set errno. */
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_trecv failed: %s(%zd)",
                            __FILE__, __LINE__,
                            fi_strerror((int) -ret_length), ret_length);
        /* BUG FIX: was ompi_mtl_ofi_get_error(ret), but ret still held
         * the successful datatype_recv_buf result here; convert the
         * actual fi_trecv failure code instead. */
        return ompi_mtl_ofi_get_error((int) -ret_length);
    }

    return OMPI_SUCCESS;
}
/**
 * Receive a message previously matched (and claimed) by a matched
 * probe.  The data already arrived, so no new receive is posted: the
 * request is initialized and fed straight into the normal recv
 * completion path using the completion entry stored in the message.
 */
int
ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
                    struct opal_convertor_t *convertor,
                    struct ompi_message_t **message,
                    struct mca_mtl_request_t *mtl_request)
{
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
    ompi_mtl_ofi_message_t *ofi_message;
    void *start;
    size_t length;
    bool free_after;
    int ret;

    ofi_message = (ompi_mtl_ofi_message_t*) (*message)->req_ptr;

    ret = ompi_mtl_datatype_recv_buf(convertor, &start, &length, &free_after);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    ofi_req->type = OMPI_MTL_OFI_RECV;
    ofi_req->event_callback = ompi_mtl_ofi_recv_callback;
    ofi_req->error_callback = ompi_mtl_ofi_recv_error_callback;
    ofi_req->buffer = free_after ? start : NULL;
    ofi_req->length = length;
    ofi_req->convertor = convertor;
    ofi_req->status.MPI_ERROR = OMPI_SUCCESS;

    (*message) = MPI_MESSAGE_NULL;

    /* Deliver the already-received data via the stored completion. */
    return ompi_mtl_ofi_recv_callback(&ofi_message->wc, ofi_req);
}

87
ompi/mca/mtl/ofi/mtl_ofi_request.h Обычный файл
Просмотреть файл

@ -0,0 +1,87 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MTL_OFI_REQUEST_H
#define OMPI_MTL_OFI_REQUEST_H

#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mtl/mtl.h"

/** Recover the enclosing OFI request from the fi_context pointer that
 *  libfabric hands back in a completion entry. */
#define TO_OFI_REQ(_ptr_ctx) \
    container_of((_ptr_ctx), struct ompi_mtl_ofi_request_t, ctx)

struct ompi_mtl_ofi_message_t;

/** Kinds of operations an OFI request can represent. */
typedef enum {
    OMPI_MTL_OFI_SEND,
    OMPI_MTL_OFI_RECV,
    OMPI_MTL_OFI_ACK,
    OMPI_MTL_OFI_PROBE
} ompi_mtl_ofi_request_type_t;

struct ompi_mtl_ofi_request_t;

/**
 * MTL/OFI request: extends the base MTL request with the state needed
 * to drive a libfabric tagged-message operation to completion.
 */
struct ompi_mtl_ofi_request_t {
    struct mca_mtl_request_t super;

    /** OFI Request type */
    ompi_mtl_ofi_request_type_t type;

    /** OFI context passed to libfabric calls; completions return it and
     *  TO_OFI_REQ() maps it back to this request */
    struct fi_context ctx;

    /** Completion count used by blocking and/or synchronous operations */
    volatile int completion_count;

    /** Event callback, invoked on successful completion */
    int (*event_callback)(struct fi_cq_tagged_entry *wc,
                          struct ompi_mtl_ofi_request_t*);

    /** Error callback, invoked on failed completion */
    int (*error_callback)(struct fi_cq_err_entry *error,
                          struct ompi_mtl_ofi_request_t*);

    /** Request status */
    struct ompi_status_public_t status;

    /** Match state used by Probe (0 = no match, 1 = matched) */
    int match_state;

    /** Associated message (matched probe) */
    struct ompi_mtl_ofi_message_t *message;

    /** Reference to the communicator used to */
    /* lookup source of an ANY_SOURCE Recv */
    struct ompi_communicator_t *comm;

    /** Pack buffer (NULL when the user buffer is used directly) */
    void *buffer;

    /** Pack buffer size */
    size_t length;

    /** Pack buffer convertor */
    struct opal_convertor_t *convertor;

    /** Flag to prevent MPI_Cancel from cancelling a started Recv request */
    volatile bool req_started;

    /** Request's tag used in case of an error. */
    uint64_t match_bits;

    /** Remote OFI address used when a Recv needs to be ACKed */
    fi_addr_t remote_addr;

    /** Parent request which needs to be ACKed (e.g. Synchronous Send) */
    struct ompi_mtl_ofi_request_t *parent;
};

typedef struct ompi_mtl_ofi_request_t ompi_mtl_ofi_request_t;

#endif /* OMPI_MTL_OFI_REQUEST_H */

240
ompi/mca/mtl/ofi/mtl_ofi_send.c Обычный файл
Просмотреть файл

@ -0,0 +1,240 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/communicator/communicator.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mtl/base/base.h"
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
#include "mtl_ofi.h"
#include "mtl_ofi_types.h"
#include "mtl_ofi_request.h"
#include "mtl_ofi_endpoint.h"
/** Completion handler for blocking sends: record that one more of the
 *  expected completions (send, and ACK for synchronous mode) arrived;
 *  ompi_mtl_ofi_send() spins on completion_count. */
static int
ompi_mtl_ofi_send_callback(struct fi_cq_tagged_entry *wc,
                           ompi_mtl_ofi_request_t *ofi_req)
{
    (void) wc;  /* nothing in the entry is needed here */

    assert(ofi_req->completion_count > 0);
    ofi_req->completion_count--;
    return OMPI_SUCCESS;
}
/** Error handler for send requests: map the libfabric error onto an
 *  MPI error class, then run the normal event path so completion
 *  accounting still happens. */
static int
ompi_mtl_ofi_send_error_callback(struct fi_cq_err_entry *error,
                                 ompi_mtl_ofi_request_t *ofi_req)
{
    /* FI_EMSGSIZE is the only error with a precise MPI equivalent. */
    ofi_req->status.MPI_ERROR = (FI_EMSGSIZE == error->err)
                              ? MPI_ERR_TRUNCATE
                              : MPI_ERR_INTERN;

    return ofi_req->event_callback(NULL, ofi_req);
}
/** Completion handler for the synchronous-send ACK receive: release
 *  the heap-allocated ack request, then advance its parent send. */
static int
ompi_mtl_ofi_send_ack_callback(struct fi_cq_tagged_entry *wc,
                               ompi_mtl_ofi_request_t *ofi_req)
{
    ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;

    (void) wc;

    /* Save the parent first: the ack request dies here. */
    free(ofi_req);
    parent_req->event_callback(NULL, parent_req);
    return OMPI_SUCCESS;
}
/** Error handler for the synchronous-send ACK receive: release the
 *  ack request and propagate an internal error through the parent. */
static int
ompi_mtl_ofi_send_ack_error_callback(struct fi_cq_err_entry *error,
                                     ompi_mtl_ofi_request_t *ofi_req)
{
    ompi_mtl_ofi_request_t *parent_req = ofi_req->parent;

    /* Save the parent first: the ack request dies here. */
    free(ofi_req);

    parent_req->status.MPI_ERROR = MPI_ERR_INTERN;
    return parent_req->error_callback(error, parent_req);
}
/** Completion handler for non-blocking sends: when the last expected
 *  completion arrives, release the pack buffer (if any), propagate
 *  the status, and notify the upper layer. */
static int
ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
                            ompi_mtl_ofi_request_t *ofi_req)
{
    (void) wc;

    assert(ofi_req->completion_count > 0);
    if (0 != --ofi_req->completion_count) {
        /* Still waiting (e.g. on the synchronous-send ACK). */
        return OMPI_SUCCESS;
    }

    /* Request completed */
    if (OPAL_UNLIKELY(NULL != ofi_req->buffer)) {
        free(ofi_req->buffer);
        ofi_req->buffer = NULL;
    }
    ofi_req->super.ompi_req->req_status.MPI_ERROR =
        ofi_req->status.MPI_ERROR;
    ofi_req->super.completion_callback(&ofi_req->super);

    return OMPI_SUCCESS;
}
/**
 * Common send path shared by ompi_mtl_ofi_send() and
 * ompi_mtl_ofi_isend().
 *
 * Packs the user data (if needed), and for a synchronous send
 * pre-posts a zero-byte receive for the receiver's ACK before posting
 * the tagged send itself.  ofi_req->completion_count is set to the
 * number of completions the caller must wait for (2 in synchronous
 * mode, 1 otherwise).
 *
 * @return OMPI_SUCCESS, or an OMPI error code; on failure the caller
 *         owns (and must free) ofi_req->buffer if it is non-NULL.
 */
static inline int
ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
                        struct ompi_communicator_t *comm,
                        int dest,
                        int tag,
                        struct opal_convertor_t *convertor,
                        mca_pml_base_send_mode_t mode,
                        ompi_mtl_ofi_request_t *ofi_req)
{
    int ret;
    void *start;
    size_t length;
    ssize_t ret_length;
    bool free_after;
    uint64_t match_bits;
    ompi_proc_t *ompi_proc = NULL;
    mca_mtl_ofi_endpoint_t *endpoint = NULL;
    ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */

    ompi_proc = ompi_comm_peer_lookup(comm, dest);
    endpoint = ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL];

    ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
    if (OMPI_SUCCESS != ret) {
        return ret;
    }

    ofi_req->buffer = (free_after) ? start : NULL;
    ofi_req->length = length;
    ofi_req->status.MPI_ERROR = OMPI_SUCCESS;

    if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
        /* Pre-post the receive for the receiver's zero-byte ACK; the
         * request then needs two completions: the send and the ACK. */
        ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
        if (OPAL_UNLIKELY(NULL == ack_req)) {
            /* BUG FIX: was assert(ack_req); fail gracefully instead. */
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        ack_req->parent = ofi_req;
        ack_req->event_callback = ompi_mtl_ofi_send_ack_callback;
        ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;

        ofi_req->completion_count = 2;
        MTL_OFI_SET_SEND_BITS(match_bits, comm->c_contextid,
                              comm->c_my_rank, tag, MTL_OFI_SYNC_SEND);
        ret_length = fi_trecv(ompi_mtl_ofi.ep,
                              NULL,
                              0,
                              ompi_mtl_ofi.mr,
                              endpoint->peer_fiaddr,
                              match_bits | MTL_OFI_SYNC_SEND_ACK,
                              0, /* Exact match, no ignore bits */
                              (void *) &ack_req->ctx);
        if (OPAL_UNLIKELY(ret_length < 0)) {
            opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                                "%s:%d: fi_trecv failed: %s(%zd)",
                                __FILE__, __LINE__,
                                fi_strerror((int) -ret_length), ret_length);
            /* BUG FIX: free the ack request (previously leaked) and
             * convert the actual fi_trecv error -- ret still held the
             * successful datatype_pack result here. */
            free(ack_req);
            return ompi_mtl_ofi_get_error((int) -ret_length);
        }
    } else {
        ofi_req->completion_count = 1;
        MTL_OFI_SET_SEND_BITS(match_bits, comm->c_contextid,
                              comm->c_my_rank, tag, 0);
    }

    ret_length = fi_tsend(ompi_mtl_ofi.ep,
                          start,
                          length,
                          ompi_mtl_ofi.mr,
                          endpoint->peer_fiaddr,
                          match_bits,
                          (void *) &ofi_req->ctx);
    if (OPAL_UNLIKELY(0 > ret_length)) {
        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
                            "%s:%d: fi_tsend failed: %zd",
                            __FILE__, __LINE__, ret_length);
        /* BUG FIX: convert the actual fi_tsend error (ret was stale).
         * NOTE(review): in synchronous mode the pre-posted ACK recv is
         * left outstanding here; cancelling it would need fi_cancel --
         * confirm desired behavior. */
        return ompi_mtl_ofi_get_error((int) -ret_length);
    }

    return OMPI_SUCCESS;
}
/**
 * Blocking send: build a request on the stack, start it, and progress
 * the library until all expected completions (the send, plus the ACK
 * in synchronous mode) have arrived.
 *
 * @return the MPI error status of the completed request, or an OMPI
 *         error code if the request could not be started.
 */
int
ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
                  struct ompi_communicator_t *comm,
                  int dest,
                  int tag,
                  struct opal_convertor_t *convertor,
                  mca_pml_base_send_mode_t mode)
{
    int ret = OMPI_SUCCESS;
    ompi_mtl_ofi_request_t ofi_req;

    /**
     * Create a send request, start it and wait until it completes.
     */
    ofi_req.event_callback = ompi_mtl_ofi_send_callback;
    ofi_req.error_callback = ompi_mtl_ofi_send_error_callback;
    /* BUG FIX: send_start() only assigns ->buffer after packing
     * succeeds; initialize it so the error path below cannot free a
     * garbage pointer when packing itself fails. */
    ofi_req.buffer = NULL;

    ret = ompi_mtl_ofi_send_start(mtl, comm, dest, tag,
                                  convertor, mode, &ofi_req);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        if (NULL != ofi_req.buffer) {
            free(ofi_req.buffer);
        }
        return ret;
    }

    /**
     * Wait until the request is completed.
     * ompi_mtl_ofi_send_callback() updates this variable.
     */
    while (0 < ofi_req.completion_count) {
        ompi_mtl_ofi_progress();
    }

    if (OPAL_UNLIKELY(NULL != ofi_req.buffer)) {
        free(ofi_req.buffer);
    }

    return ofi_req.status.MPI_ERROR;
}
/**
 * Non-blocking send: start the request; completion is reported later
 * through ompi_mtl_ofi_isend_callback() via the progress engine.
 *
 * @param blocking  unused here; the MTL interface passes it through.
 * @return OMPI_SUCCESS, or an OMPI error code if the start failed.
 */
int
ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
                   struct ompi_communicator_t *comm,
                   int dest,
                   int tag,
                   struct opal_convertor_t *convertor,
                   mca_pml_base_send_mode_t mode,
                   bool blocking,
                   mca_mtl_request_t *mtl_request)
{
    int ret = OMPI_SUCCESS;
    ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;

    ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
    ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
    /* BUG FIX: send_start() only assigns ->buffer after packing
     * succeeds; clear any stale pointer carried in the request so the
     * error path below cannot free garbage when packing fails. */
    ofi_req->buffer = NULL;

    ret = ompi_mtl_ofi_send_start(mtl, comm, dest, tag,
                                  convertor, mode, ofi_req);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret && NULL != ofi_req->buffer)) {
        free(ofi_req->buffer);
        ofi_req->buffer = NULL;
    }
    return ret;
}

141
ompi/mca/mtl/ofi/mtl_ofi_types.h Обычный файл
Просмотреть файл

@ -0,0 +1,141 @@
/*
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MTL_OFI_TYPES_H_HAS_BEEN_INCLUDED
#define MTL_OFI_TYPES_H_HAS_BEEN_INCLUDED
#include "ompi_config.h"
#include "ompi/mca/mtl/mtl.h"
#include "ompi/mca/mtl/base/base.h"
#include "ompi/communicator/communicator.h"
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include "mtl_ofi_endpoint.h"
BEGIN_C_DECLS
/**
 * MTL Module Interface
 *
 * Per-process state for the OFI MTL: the libfabric objects the module
 * opens, plus resources shared by the send/recv/probe paths.
 */
typedef struct mca_mtl_ofi_module_t {
    mca_mtl_base_module_t base;

    /** Fabric Domain handle */
    struct fid_fabric *fabric;

    /** Access Domain handle */
    struct fid_domain *domain;

    /** Address vector handle */
    struct fid_av *av;

    /** Completion queue handle */
    struct fid_cq *cq;

    /** Memory region handle */
    struct fid_mr *mr;

    /** Endpoint to communicate on */
    struct fid_ep *ep;

    /** Endpoint name length */
    size_t epnamelen;

    /** "Any source" address, used when posting MPI_ANY_SOURCE receives */
    fi_addr_t any_addr;

    /** List of free messages for matched probe */
    opal_free_list_t free_messages;
} mca_mtl_ofi_module_t;

/** The single module instance referenced throughout this MTL. */
extern mca_mtl_ofi_module_t ompi_mtl_ofi;
/** Component structure: nothing beyond the base MTL component. */
typedef struct mca_mtl_ofi_component_t {
    /** Base MTL component */
    mca_mtl_base_component_2_0_0_t super;
} mca_mtl_ofi_component_t;

OMPI_DECLSPEC mca_mtl_ofi_component_t mca_mtl_ofi_component;
/* match/ignore bit manipulation
 *
 * 64-bit tag layout (reconstructed from the masks below):
 *
 *   bit 63   | bits 62-60 | bits 59-48 | bits 47-32 | bits 31-0
 *   ACK flag | protocol   | context id | source     | message tag
 *
 * The only protocol currently defined is SYNC_SEND; SYNC_SEND_ACK is
 * SYNC_SEND with the ACK flag (bit 63) also set.
 */
#define MTL_OFI_PROTOCOL_HEADER_MASK (0xF000000000000000ULL)
#define MTL_OFI_PROTOCOL_MASK (0x7000000000000000ULL)
#define MTL_OFI_CONTEXT_MASK (0x0FFF000000000000ULL)
#define MTL_OFI_SOURCE_MASK (0x0000FFFF00000000ULL)
#define MTL_OFI_TAG_MASK (0x00000000FFFFFFFFULL)

#define MTL_OFI_SYNC_SEND (0x1000000000000000ULL)
#define MTL_OFI_SYNC_SEND_ACK (0x9000000000000000ULL)

/* send posting: compose the 64-bit tag for an outgoing message.
 * NOTE: macro arguments are expanded more than once -- pass only
 * side-effect-free expressions. */
#define MTL_OFI_SET_SEND_BITS(match_bits, contextid, source, tag, type) \
{ \
    match_bits = contextid; \
    match_bits = (match_bits << 16); \
    match_bits |= source; \
    match_bits = (match_bits << 32); \
    match_bits |= (MTL_OFI_TAG_MASK & tag) | type; \
}

/* receive posting: compose match and ignore bits for a posted recv. */
/* Special tags are used for collective operations.
 * MPI_ANY_TAG should not match these special tags.
 * See ompi/mca/coll/base/coll_tags.h
 */
#define MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, contextid, source, tag) \
{ \
    match_bits = 0; \
    mask_bits = MTL_OFI_PROTOCOL_MASK; \
    \
    match_bits = contextid; \
    match_bits = (match_bits << 16); \
    \
    if (MPI_ANY_SOURCE == source) { \
        match_bits = (match_bits << 32); \
        mask_bits |= MTL_OFI_SOURCE_MASK; \
    } else { \
        match_bits |= source; \
        match_bits = (match_bits << 32); \
    } \
    \
    if (MPI_ANY_TAG == tag) { \
        /* only the low 31 tag bits are wildcarded: reserved negative tags never match ANY_TAG */ \
        mask_bits |= 0x000000007FFFFFFFULL; \
    } else { \
        match_bits |= (MTL_OFI_TAG_MASK & tag); \
    } \
}

/* Protocol tests and field extractors. */
#define MTL_OFI_IS_SYNC_SEND(match_bits) \
    (MTL_OFI_SYNC_SEND == (MTL_OFI_PROTOCOL_HEADER_MASK & match_bits))
#define MTL_OFI_IS_SYNC_SEND_ACK(match_bits) \
    (MTL_OFI_SYNC_SEND_ACK == (MTL_OFI_PROTOCOL_HEADER_MASK & match_bits))
#define MTL_OFI_GET_TAG(match_bits) \
    ((int)(match_bits & MTL_OFI_TAG_MASK))
#define MTL_OFI_GET_SOURCE(match_bits) \
    ((int)((match_bits & MTL_OFI_SOURCE_MASK) >> 32))

END_C_DECLS

#endif /* MTL_OFI_TYPES_H_HAS_BEEN_INCLUDED */

1
ompi/mca/mtl/ofi/post_configure.sh Обычный файл
Просмотреть файл

@ -0,0 +1 @@
DIRECT_CALL_HEADER="ompi/mca/mtl/ofi/mtl_ofi.h"