
MTL/OFI: Add OFI Scalable Endpoint support

The OFI MTL supports the OFI Scalable Endpoints feature as a means to improve
multi-threaded application throughput and message rate. Currently, the feature
is designed to utilize multiple TX/RX contexts exposed by the OFI provider in
conjunction with a multi-communicator MPI application model. For more
information, refer to the README under mtl/ofi.

Reviewed-by: Matias Cabral <matias.a.cabral@intel.com>
Reviewed-by: Neil Spruit <neil.r.spruit@intel.com>
Signed-off-by: Aravind Gopalakrishnan <Aravind.Gopalakrishnan@intel.com>
(cherry picked from commit 109d0569ffdc29f40518d02ad7a4d5bca3adc3d1)
Signed-off-by: Brian Barrett <bbarrett@amazon.com>
This commit is contained in:
Aravind Gopalakrishnan 2018-10-24 14:38:23 -07:00 committed by Brian Barrett
parent 8ebd0d8f24
commit 22d0857ee5
5 changed files: 796 additions and 197 deletions

View file

@ -1,5 +1,5 @@
OFI MTL:
--------
The OFI MTL supports Libfabric (a.k.a. Open Fabrics Interfaces OFI,
https://ofiwg.github.io/libfabric/) tagged APIs (fi_tagged(3)). At
initialization time, the MTL queries libfabric for providers supporting tag matching
@ -9,6 +9,7 @@ The user may modify the OFI provider selection with mca parameters
mtl_ofi_provider_include or mtl_ofi_provider_exclude.
PROGRESS:
---------
The MTL registers a progress function to opal_progress. There is currently
no support for asynchronous progress. The progress function reads multiple events
from the OFI provider Completion Queue (CQ) per iteration (defaults to 100, can be
@ -16,12 +17,14 @@ modified with the mca mtl_ofi_progress_event_cnt) and iterates until the
completion queue is drained.
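For example, to read at most 50 events per progress call (an illustrative
launch line; it assumes the cm PML / OFI MTL pair is selected explicitly):
    mpirun -np 2 -mca pml cm -mca mtl ofi -mca mtl_ofi_progress_event_cnt 50 ./app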
COMPLETIONS:
------------
Each operation uses a request type ompi_mtl_ofi_request_t which includes a reference
to an operation specific completion callback, an MPI request, and a context. The
context (fi_context) is used to map completion events with MPI_requests when reading the
CQ.
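As a minimal sketch of this pattern (illustrative type and field names only;
the actual type is ompi_mtl_ofi_request_t in the MTL sources), the fi_context
member is handed to libfabric as the operation context, so a completion read
from the CQ can be mapped back to its request:
    struct example_req {
        struct fi_context ctx;            /* passed as the libfabric op context */
        int (*event_callback)(struct fi_cq_tagged_entry *wc,
                              struct example_req *req);
        /* ... MPI request, status, completion count, etc. ... */
    };
    /* Fragment: wc.op_context points back into the enclosing request
     * (container_of-style recovery; needs <stddef.h> for offsetof). */
    struct fi_cq_tagged_entry wc;
    if (fi_cq_read(cq, &wc, 1) > 0) {
        struct example_req *req = (struct example_req *)
            ((char *) wc.op_context - offsetof(struct example_req, ctx));
        req->event_callback(&wc, req);
    }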
OFI TAG:
--------
MPI needs to send 96 bits of information per message (32 bits communicator id,
32 bits source rank, 32 bits MPI tag) but OFI only offers 64-bit tags. In
addition, the OFI MTL uses 2 bits of the OFI tag for the synchronous send protocol.
@ -67,3 +70,76 @@ This is signaled in mem_tag_format (see fi_endpoint(3)) by setting higher order
to zero. In such cases, the OFI MTL will reduce the number of communicator ids supported
by reducing the bits available for the communicator ID field in the OFI tag.
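As an illustration only (the field widths here are hypothetical; the actual
layouts depend on the tag mode and are defined in the MTL sources), a 64-bit
OFI tag might be carved up as follows:
    /* Hypothetical split: 2 protocol bits | 22-bit cid | 20-bit source | 20-bit tag */
    uint64_t ofi_tag = ((uint64_t)(proto & 0x3)      << 62) |
                       ((uint64_t)(cid   & 0x3FFFFF) << 40) |
                       ((uint64_t)(src   & 0xFFFFF)  << 20) |
                        (uint64_t)(tag   & 0xFFFFF);
Each field is truncated relative to its 32-bit MPI counterpart, which is why
the supported ranges for communicator ids, source ranks, and tags shrink.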
SCALABLE ENDPOINTS:
-------------------
The OFI MTL supports the OFI Scalable Endpoints (SEP) feature as a means to
improve multi-threaded application throughput and message rate. Currently the
feature is designed to utilize multiple TX/RX contexts exposed by the OFI
provider in conjunction with a multi-communicator MPI application model.
Therefore, new OFI contexts are created lazily, as and when communicators are
duplicated, instead of all at once at init time; this approach also ensures
that only as many contexts as needed are created.
1. Multi-communicator model:
With this approach, the application first duplicates the communicators it
wants to use with MPI operations (ideally creating as many communicators as
the number of threads it wants to use to call into MPI). The duplicated
communicators are then used by the corresponding threads to perform MPI
operations. A possible usage scenario is an MPI + OpenMP
application, as follows (example limited to 2 ranks):
    MPI_Comm dup_comm[n];

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    for (i = 0; i < n; i++) {
        MPI_Comm_dup(MPI_COMM_WORLD, &dup_comm[i]);
    }
    if (rank == 0) {
#pragma omp parallel for private(status, host_sbuf, host_rbuf) num_threads(n)
        for (i = 0; i < n ; i++) {
            MPI_Send(host_sbuf, MYBUFSIZE, MPI_CHAR,
                     1, MSG_TAG, dup_comm[i]);
            MPI_Recv(host_rbuf, MYBUFSIZE, MPI_CHAR,
                     1, MSG_TAG, dup_comm[i], &status);
        }
    } else if (rank == 1) {
#pragma omp parallel for private(status, host_sbuf, host_rbuf) num_threads(n)
        for (i = 0; i < n ; i++) {
            MPI_Recv(host_rbuf, MYBUFSIZE, MPI_CHAR,
                     0, MSG_TAG, dup_comm[i], &status);
            MPI_Send(host_sbuf, MYBUFSIZE, MPI_CHAR,
                     0, MSG_TAG, dup_comm[i]);
        }
    }
2. MCA variable:
To utilize the feature, the following MCA variable needs to be set:
mtl_ofi_thread_grouping:
This OFI MTL level variable must be set to 1 to switch the feature on.
Default: 0
It is not recommended to set this MCA variable for:
- Multi-threaded MPI applications that do not follow the multi-communicator
  approach.
- Applications that have multiple threads using a single communicator, as
  this may degrade performance.
Command-line syntax to set the MCA variable:
"-mca mtl_ofi_thread_grouping 1"
3. Notes on performance:
- The OFI MTL will create as many TX/RX contexts as allowed by the underlying
  provider (each provider may have a different threshold). Once the threshold
  is exceeded, contexts are reused in a round-robin fashion, which leads to
  resource sharing among threads, as sketched below. Locks are therefore
  required to guard against race conditions. For performance, it is
  recommended to have
       Number of communicators = Number of contexts
  For example, when using the PSM2 provider, the number of contexts is
  dictated by the Intel Omni-Path HFI1 driver module.
- For applications using a single thread with multiple communicators and the
  MCA variable "mtl_ofi_thread_grouping" set to 1, the MTL will use multiple
  contexts, but the benefits may be negligible as only one thread is driving
  progress.
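The sketch below mirrors the mapping the MTL performs (see
MTL_OFI_MAP_COMM_TO_CONTEXT in the sources); names are simplified and the
surrounding state is assumed:
    /* Simplified mapping sketch: dedicated context below the threshold,
     * round-robin reuse of the total_ctxts_used contexts beyond it. */
    int map_comm_to_context(uint32_t comm_id)
    {
        if (!threshold_comm_context_id ||
            comm_id < (uint32_t) threshold_comm_context_id) {
            return comm_to_context[comm_id];   /* dedicated context */
        }
        return comm_id % total_ctxts_used;     /* shared; needs locking */
    }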

View file

@ -24,5 +24,21 @@ fi_info -v -p %s
Local host: %s
Location: %s:%d
[SEP unavailable]
The Scalable Endpoint feature is required for the Thread Grouping feature to
work, but it is not supported by the %s provider. Try disabling the Thread
Grouping feature.
Local host: %s
Location: %s:%d
[SEP ctxt limit]
Reached the limit (%d) on the number of OFI contexts that can be opened with
the provider. Creating new communicators beyond this limit is possible, but
they will reuse existing contexts in a round-robin fashion.
Using new communicators beyond the limit will impact performance.
Local host: %s
Location: %s:%d
[message too big]
Message size %llu is bigger than the maximum supported by the selected transport. Max = %llu

View file

@ -42,7 +42,6 @@
#include "mtl_ofi_endpoint.h"
#include "mtl_ofi_compat.h"
BEGIN_C_DECLS
extern mca_mtl_ofi_module_t ompi_mtl_ofi;
@ -54,14 +53,47 @@ extern int ompi_mtl_ofi_del_procs(struct mca_mtl_base_module_t *mtl,
int ompi_mtl_ofi_progress_no_inline(void);
#if OPAL_HAVE_THREAD_LOCAL
extern opal_thread_local int per_thread_ctx;
extern opal_thread_local struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
#endif
/* Set OFI context for operations which generate completion events */
__opal_attribute_always_inline__ static inline void
set_thread_context(int ctxt)
{
#if OPAL_HAVE_THREAD_LOCAL
per_thread_ctx = ctxt;
return;
#endif
}
/* Retrieve OFI context to use for CQ poll */
__opal_attribute_always_inline__ static inline void
get_thread_context(int *ctxt)
{
#if OPAL_HAVE_THREAD_LOCAL
*ctxt = per_thread_ctx;
#endif
return;
}
#define MTL_OFI_CONTEXT_LOCK(ctxt_id) \
OPAL_LIKELY(!opal_mutex_atomic_trylock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock))
#define MTL_OFI_CONTEXT_UNLOCK(ctxt_id) \
opal_mutex_atomic_unlock(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock)
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_context_progress(int ctxt_id)
{
int count = 0, i, events_read;
ompi_mtl_ofi_request_t *ofi_req = NULL;
struct fi_cq_err_entry error = { 0 };
ssize_t ret;
#if !OPAL_HAVE_THREAD_LOCAL
struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
#endif
/**
* Read the work completions from the CQ.
@ -69,7 +101,8 @@ ompi_mtl_ofi_progress(void)
* Call the request's callback.
*/
while (true) {
ret = fi_cq_read(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, (void *)&wc,
ompi_mtl_ofi.ofi_progress_event_count);
if (ret > 0) {
count+= ret;
events_read = ret;
@ -92,7 +125,7 @@ ompi_mtl_ofi_progress(void)
* An error occurred and is being reported via the CQ.
* Read the error and forward it to the upper layer.
*/
ret = fi_cq_readerr(ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
&error,
0);
if (0 > ret) {
@ -126,6 +159,51 @@ ompi_mtl_ofi_progress(void)
}
}
}
return count;
}
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_progress(void)
{
int count = 0, ctxt_id = 0, i;
static volatile uint32_t num_calls = 0;
get_thread_context(&ctxt_id);
if (ompi_mtl_ofi.mpi_thread_multiple) {
if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
count += ompi_mtl_ofi_context_progress(ctxt_id);
MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
}
} else {
count += ompi_mtl_ofi_context_progress(ctxt_id);
}
#if OPAL_HAVE_THREAD_LOCAL
/*
* Try to progress other CQs in a round-robin fashion.
* This is done only on every 16th call, and only if no events were
* read from the CQ local to the calling thread.
*/
if (OPAL_UNLIKELY((count == 0) && ompi_mtl_ofi.mpi_thread_multiple &&
(((num_calls++) & 0xF) == 0 ))) {
for (i = 0; i < ompi_mtl_ofi.total_ctxts_used - 1; i++) {
ctxt_id = (ctxt_id + 1) % ompi_mtl_ofi.total_ctxts_used;
if (MTL_OFI_CONTEXT_LOCK(ctxt_id)) {
count += ompi_mtl_ofi_context_progress(ctxt_id);
MTL_OFI_CONTEXT_UNLOCK(ctxt_id);
}
/* Upon any work done, exit to let other threads take lock */
if (OPAL_LIKELY(count > 0)) {
break;
}
}
}
#endif
return count;
}
@ -148,6 +226,13 @@ ompi_mtl_ofi_progress(void)
} while (OPAL_LIKELY(-FI_EAGAIN == RETURN)); \
} while (0);
#define MTL_OFI_LOG_FI_ERR(err, string) \
do { \
opal_output_verbose(1, ompi_mtl_base_framework.framework_output, \
"%s:%d:%s: %s\n", \
__FILE__, __LINE__, string, fi_strerror(-err)); \
} while(0);
/* MTL interface functions */
int ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl);
@ -239,6 +324,20 @@ ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
return OMPI_SUCCESS;
}
#define MTL_OFI_MAP_COMM_TO_CONTEXT(comm_id, ctxt_id) \
do { \
if (ompi_mtl_ofi.thread_grouping && \
(!ompi_mtl_ofi.threshold_comm_context_id || \
((uint32_t) ompi_mtl_ofi.threshold_comm_context_id > comm_id))) { \
ctxt_id = ompi_mtl_ofi.comm_to_context[comm_id]; \
} else if (ompi_mtl_ofi.thread_grouping) { \
/* Round-robin assignment of contexts if threshold is reached */ \
ctxt_id = comm_id % ompi_mtl_ofi.total_ctxts_used; \
} else { \
ctxt_id = 0; \
} \
} while (0);
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
struct ompi_communicator_t *comm,
@ -249,8 +348,12 @@ ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
int tag)
{
ssize_t ret = OMPI_SUCCESS;
int ctxt_id = 0;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
assert(ack_req);
ack_req->parent = ofi_req;
@ -259,7 +362,7 @@ ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req,
ofi_req->completion_count += 1;
MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
NULL,
0,
NULL,
@ -290,7 +393,7 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
{
ssize_t ret = OMPI_SUCCESS;
ompi_mtl_ofi_request_t ofi_req;
int ompi_ret, ctxt_id = 0;
void *start;
bool free_after;
size_t length;
@ -299,6 +402,10 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
mca_mtl_ofi_endpoint_t *endpoint = NULL;
ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
fi_addr_t src_addr = 0;
fi_addr_t sep_peer_fiaddr = 0;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
/**
* Create a send request, start it and wait until it completes.
@ -309,6 +416,9 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
ompi_proc = ompi_comm_peer_lookup(comm, dest);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
/* For Scalable Endpoints, gather target receive context */
sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) {
return ompi_ret;
@ -328,7 +438,7 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
if (ompi_mtl_ofi.fi_cq_data) {
match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
src_addr = sep_peer_fiaddr;
} else {
match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
comm->c_my_rank, tag);
@ -345,27 +455,25 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
if (ompi_mtl_ofi.max_inject_size >= length) {
if (ompi_mtl_ofi.fi_cq_data) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tinjectdata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
comm->c_my_rank,
sep_peer_fiaddr,
match_bits), ret);
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
sep_peer_fiaddr,
match_bits), ret);
}
if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret,
ompi_mtl_ofi.fi_cq_data ? "fi_tinjectdata failed"
: "fi_tinject failed");
if (ack_req) {
fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, &ack_req->ctx);
free(ack_req);
}
@ -375,30 +483,27 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
} else {
ofi_req.completion_count += 1;
if (ompi_mtl_ofi.fi_cq_data) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
NULL,
comm->c_my_rank,
sep_peer_fiaddr,
match_bits,
(void *) &ofi_req.ctx), ret);
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
NULL,
sep_peer_fiaddr,
match_bits,
(void *) &ofi_req.ctx), ret);
}
if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret,
ompi_mtl_ofi.fi_cq_data ? "fi_tsenddata failed"
: "fi_tsend failed");
ofi_req.status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
goto free_request_buffer;
}
@ -432,7 +537,7 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
{
ssize_t ret = OMPI_SUCCESS;
ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request;
int ompi_ret, ctxt_id = 0;
void *start;
size_t length;
bool free_after;
@ -440,7 +545,10 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
fi_addr_t src_addr = 0;
fi_addr_t sep_peer_fiaddr = 0;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
ofi_req->event_callback = ompi_mtl_ofi_isend_callback;
ofi_req->error_callback = ompi_mtl_ofi_send_error_callback;
@ -448,6 +556,9 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
ompi_proc = ompi_comm_peer_lookup(comm, dest);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
/* For Scalable Endpoints, gather target receive context */
sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
ompi_ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) return ompi_ret;
@ -465,7 +576,6 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
if (ompi_mtl_ofi.fi_cq_data) {
match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
} else {
match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
comm->c_my_rank, tag);
@ -473,7 +583,7 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
}
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
ofi_req->status.MPI_ERROR = ompi_mtl_ofi_ssend_recv(ack_req, comm, &sep_peer_fiaddr,
ofi_req, endpoint,
&match_bits, tag);
if (OPAL_UNLIKELY(ofi_req->status.MPI_ERROR != OMPI_SUCCESS))
@ -481,35 +591,27 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
}
if (ompi_mtl_ofi.fi_cq_data) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
NULL,
comm->c_my_rank,
sep_peer_fiaddr,
match_bits,
(void *) &ofi_req->ctx), ret);
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
start,
length,
NULL,
sep_peer_fiaddr,
match_bits,
(void *) &ofi_req->ctx), ret);
}
if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret,
ompi_mtl_ofi.fi_cq_data ? "fi_tsenddata failed"
: "fi_tsend failed");
ofi_req->status.MPI_ERROR = ompi_mtl_ofi_get_error(ret);
}
@ -529,7 +631,7 @@ __opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
ompi_mtl_ofi_request_t *ofi_req)
{
int ompi_ret, ctxt_id = 0;
ssize_t ret;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
@ -537,6 +639,8 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
ompi_status_public_t *status = NULL;
struct fi_msg_tagged tagged_msg;
MTL_OFI_MAP_COMM_TO_CONTEXT(ofi_req->comm->c_contextid, ctxt_id);
assert(ofi_req->super.ompi_req);
status = &ofi_req->super.ompi_req->req_status;
@ -599,7 +703,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
ofi_req->remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
}
tagged_msg.msg_iov = NULL;
@ -615,12 +719,10 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
tagged_msg.context = NULL;
tagged_msg.data = 0;
MTL_OFI_RETRY_UNTIL_DONE(fi_tsendmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep,
&tagged_msg, 0), ret);
if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret, "fi_tsendmsg failed");
status->MPI_ERROR = OMPI_ERROR;
}
}
@ -666,7 +768,7 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
struct opal_convertor_t *convertor,
mca_mtl_request_t *mtl_request)
{
int ompi_ret = OMPI_SUCCESS, ctxt_id = 0;
ssize_t ret;
uint64_t match_bits, mask_bits;
fi_addr_t remote_addr = ompi_mtl_ofi.any_addr;
@ -677,12 +779,14 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
size_t length;
bool free_after;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
if (ompi_mtl_ofi.fi_cq_data) {
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup(comm, src);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_addr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
}
mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
@ -713,7 +817,7 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
ofi_req->remote_addr = remote_addr;
ofi_req->match_bits = match_bits;
MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
start,
length,
NULL,
@ -725,9 +829,7 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
if (NULL != ofi_req->buffer) {
free(ofi_req->buffer);
}
MTL_OFI_LOG_FI_ERR(ret, "fi_trecv failed");
return ompi_mtl_ofi_get_error(ret);
}
@ -798,9 +900,13 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
bool free_after;
struct iovec iov;
struct fi_msg_tagged msg;
int ompi_ret, ctxt_id = 0;
ssize_t ret;
uint64_t msgflags = FI_CLAIM | FI_COMPLETION;
struct ompi_communicator_t *comm = (*message)->comm;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
&start,
@ -833,11 +939,9 @@ ompi_mtl_ofi_imrecv(struct mca_mtl_base_module_t *mtl,
msg.context = (void *)&ofi_req->ctx;
msg.data = 0;
MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
return ompi_mtl_ofi_get_error(ret);
}
@ -891,13 +995,17 @@ ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
ssize_t ret;
struct fi_msg_tagged msg;
uint64_t msgflags = FI_PEEK | FI_COMPLETION;
int ctxt_id = 0;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
if (ompi_mtl_ofi.fi_cq_data) {
/* If the source is known, use its peer_fiaddr. */
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
}
mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
@ -932,7 +1040,7 @@ ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
ofi_req.completion_count = 1;
ofi_req.match_state = 0;
MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
if (-FI_ENOMSG == ret) {
/**
* The search request completed but no matching message was found.
@ -940,9 +1048,7 @@ ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
*flag = 0;
return OMPI_SUCCESS;
} else if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
return ompi_mtl_ofi_get_error(ret);
}
@ -977,6 +1083,10 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
ssize_t ret;
struct fi_msg_tagged msg;
uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION;
int ctxt_id = 0;
MTL_OFI_MAP_COMM_TO_CONTEXT(comm->c_contextid, ctxt_id);
set_thread_context(ctxt_id);
ofi_req = malloc(sizeof *ofi_req);
if (NULL == ofi_req) {
@ -991,7 +1101,7 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits);
}
mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
@ -1027,7 +1137,7 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
ofi_req->match_state = 0;
ofi_req->mask_bits = mask_bits;
MTL_OFI_RETRY_UNTIL_DONE(fi_trecvmsg(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, &msg, msgflags), ret);
if (-FI_ENOMSG == ret) {
/**
* The search request completed but no matching message was found.
@ -1036,9 +1146,7 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
free(ofi_req);
return OMPI_SUCCESS;
} else if (OPAL_UNLIKELY(0 > ret)) {
MTL_OFI_LOG_FI_ERR(ret, "fi_trecvmsg failed");
free(ofi_req);
return ompi_mtl_ofi_get_error(ret);
}
@ -1076,9 +1184,11 @@ ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
mca_mtl_request_t *mtl_request,
int flag)
{
int ret, ctxt_id = 0;
ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
MTL_OFI_MAP_COMM_TO_CONTEXT(ofi_req->comm->c_contextid, ctxt_id);
switch (ofi_req->type) {
case OMPI_MTL_OFI_SEND:
/**
@ -1095,7 +1205,8 @@ ompi_mtl_ofi_cancel(struct mca_mtl_base_module_t *mtl,
ompi_mtl_ofi_progress();
if (!ofi_req->req_started) {
ret = fi_cancel((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep,
&ofi_req->ctx);
if (0 == ret) {
/**
* Wait for the request to be cancelled.
@ -1122,18 +1233,224 @@ ofi_cancel_not_possible:
return OMPI_SUCCESS;
}
static int ompi_mtl_ofi_init_contexts(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
mca_mtl_ofi_ep_type ep_type)
{
int ret;
int ctxt_id = ompi_mtl_ofi.total_ctxts_used;
struct fi_cq_attr cq_attr = {0};
cq_attr.format = FI_CQ_FORMAT_TAGGED;
cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
if (OFI_REGULAR_EP == ep_type) {
/*
* For regular endpoints, just create the Lock object and register
* progress function.
*/
goto init_regular_ep;
}
/*
* We only create up to the maximum number of contexts allowed by the
* provider. If the user enables the thread grouping feature and creates
* more communicators than we have contexts, we set the threshold
* context_id so we know that operations involving these "extra"
* communicators must reuse existing contexts in a round-robin fashion.
*/
if (ompi_mtl_ofi.max_ctx_cnt <= ctxt_id) {
if (!ompi_mtl_ofi.threshold_comm_context_id) {
ompi_mtl_ofi.threshold_comm_context_id = comm->c_contextid;
opal_show_help("help-mtl-ofi.txt", "SEP ctxt limit", true, ctxt_id,
ompi_process_info.nodename, __FILE__, __LINE__);
}
return OMPI_SUCCESS;
}
/* Init context info for Scalable EPs */
ret = fi_tx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, NULL);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_tx_context failed");
goto init_error;
}
ret = fi_rx_context(ompi_mtl_ofi.sep, ctxt_id, NULL, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, NULL);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_rx_context failed");
goto init_error;
}
ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq, NULL);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_cq_open failed");
goto init_error;
}
/* Bind CQ to TX/RX context object */
ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
FI_TRANSMIT | FI_SELECTIVE_COMPLETION);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_TRANSMIT) failed");
goto init_error;
}
ret = fi_ep_bind(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep, (fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq,
FI_RECV | FI_SELECTIVE_COMPLETION);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP (FI_RECV) failed");
goto init_error;
}
/* Enable Endpoint for communication. This commits the bind operations */
ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_enable (send context) failed");
goto init_error;
}
ret = fi_enable(ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_enable (recv context) failed");
goto init_error;
}
init_regular_ep:
/* Initialize per-context lock */
OBJ_CONSTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock, opal_mutex_t);
if (MPI_COMM_WORLD == comm) {
ret = opal_progress_register(ompi_mtl_ofi_progress_no_inline);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: opal_progress_register failed: %d\n",
__FILE__, __LINE__, ret);
goto init_error;
}
}
ompi_mtl_ofi.comm_to_context[comm->c_contextid] = ompi_mtl_ofi.total_ctxts_used;
ompi_mtl_ofi.total_ctxts_used++;
return OMPI_SUCCESS;
init_error:
if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep) {
(void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep);
}
if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep) {
(void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep);
}
if (ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq) {
(void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq);
}
return ret;
}
static int ompi_mtl_ofi_finalize_contexts(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm,
mca_mtl_ofi_ep_type ep_type)
{
int ret = OMPI_SUCCESS, ctxt_id = 0;
if (OFI_REGULAR_EP == ep_type) {
/* For regular EPs, simply destruct Lock object and exit */
goto finalize_regular_ep;
}
if (ompi_mtl_ofi.thread_grouping &&
ompi_mtl_ofi.threshold_comm_context_id &&
((uint32_t) ompi_mtl_ofi.threshold_comm_context_id <= comm->c_contextid)) {
return OMPI_SUCCESS;
}
ctxt_id = ompi_mtl_ofi.thread_grouping ?
ompi_mtl_ofi.comm_to_context[comm->c_contextid] : 0;
/*
* For regular EPs, the TX/RX contexts are aliased to the SEP object, which
* is closed in ompi_mtl_ofi_finalize(). So, skip handling those here.
*/
if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].tx_ep))) {
goto finalize_err;
}
if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].rx_ep))) {
goto finalize_err;
}
if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[ctxt_id].cq))) {
goto finalize_err;
}
finalize_regular_ep:
/* Destroy context lock */
OBJ_DESTRUCT(&ompi_mtl_ofi.ofi_ctxt[ctxt_id].context_lock);
return OMPI_SUCCESS;
finalize_err:
opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
"fi_close",
ompi_process_info.nodename, __FILE__, __LINE__,
fi_strerror(-ret), ret);
return OMPI_ERROR;
}
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm)
{
int ret;
mca_mtl_ofi_ep_type ep_type = (false == ompi_mtl_ofi.sep_supported) ?
OFI_REGULAR_EP : OFI_SCALABLE_EP;
/*
* If thread grouping is enabled, add a new OFI context for each
* communicator other than MPI_COMM_SELF.
*/
if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
    /* If thread grouping is disabled, add a new OFI context only
     * for MPI_COMM_WORLD.
     */
    (!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
ret = ompi_mtl_ofi_init_contexts(mtl, comm, ep_type);
if (OMPI_SUCCESS != ret) {
goto error;
}
}
return OMPI_SUCCESS;
error:
return OMPI_ERROR;
}
__opal_attribute_always_inline__ static inline int
ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl,
struct ompi_communicator_t *comm)
{
int ret = OMPI_SUCCESS;
mca_mtl_ofi_ep_type ep_type = (false == ompi_mtl_ofi.sep_supported) ?
OFI_REGULAR_EP : OFI_SCALABLE_EP;
/*
* Clean up OFI contexts information.
*/
if ((ompi_mtl_ofi.thread_grouping && (MPI_COMM_SELF != comm)) ||
(!ompi_mtl_ofi.thread_grouping && (MPI_COMM_WORLD == comm))) {
ret = ompi_mtl_ofi_finalize_contexts(mtl, comm, ep_type);
}
return ret;
}
END_C_DECLS

View file

@ -33,6 +33,11 @@ static int data_progress;
static int av_type;
static int ofi_tag_mode;
#if OPAL_HAVE_THREAD_LOCAL
opal_thread_local int per_thread_ctx;
opal_thread_local struct fi_cq_tagged_entry wc[MTL_OFI_MAX_PROG_EVENT_COUNT];
#endif
/*
* Enumerators
*/
@ -142,8 +147,8 @@ ompi_mtl_ofi_component_register(void)
MCA_BASE_VAR_SCOPE_READONLY,
&prov_exclude);
ompi_mtl_ofi.ofi_progress_event_count = MTL_OFI_MAX_PROG_EVENT_COUNT;
opal_asprintf(&desc, "Max number of events to read each call to OFI progress (default: %d events will be read per OFI progress call)", ompi_mtl_ofi.ofi_progress_event_count);
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
"progress_event_cnt",
desc,
@ -229,6 +234,15 @@ ompi_mtl_ofi_component_register(void)
&av_type);
OBJ_RELEASE(new_enum);
ompi_mtl_ofi.thread_grouping = 0;
mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
"thread_grouping",
"Enable/Disable Thread Grouping feature",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_3,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_mtl_ofi.thread_grouping);
return OMPI_SUCCESS;
}
@ -242,8 +256,7 @@ ompi_mtl_ofi_component_open(void)
ompi_mtl_ofi.domain = NULL;
ompi_mtl_ofi.av = NULL;
ompi_mtl_ofi.sep = NULL;
/**
* Sanity check: provider_include and provider_exclude must be mutually
@ -431,6 +444,158 @@ ompi_mtl_ofi_define_tag_mode(int ofi_tag_mode, int *bits_for_cid) {
}
}
#define MTL_OFI_ALLOC_COMM_TO_CONTEXT(num_ofi_ctxts) \
do { \
ompi_mtl_ofi.comm_to_context = calloc(num_ofi_ctxts, sizeof(int)); \
if (OPAL_UNLIKELY(!ompi_mtl_ofi.comm_to_context)) { \
opal_output_verbose(1, ompi_mtl_base_framework.framework_output, \
"%s:%d: alloc of comm_to_context array failed: %s\n",\
__FILE__, __LINE__, strerror(errno)); \
return ret; \
} \
} while (0);
#define MTL_OFI_ALLOC_OFI_CTXTS() \
do { \
ompi_mtl_ofi.ofi_ctxt = (mca_mtl_ofi_context_t *) malloc(ompi_mtl_ofi.max_ctx_cnt * \
sizeof(mca_mtl_ofi_context_t)); \
if (OPAL_UNLIKELY(!ompi_mtl_ofi.ofi_ctxt)) { \
opal_output_verbose(1, ompi_mtl_base_framework.framework_output, \
"%s:%d: alloc of ofi_ctxt array failed: %s\n", \
__FILE__, __LINE__, strerror(errno)); \
return ret; \
} \
} while(0);
static int ompi_mtl_ofi_init_sep(struct fi_info *prov)
{
int ret = OMPI_SUCCESS, num_ofi_ctxts;
struct fi_av_attr av_attr = {0};
ompi_mtl_ofi.max_ctx_cnt = (prov->domain_attr->max_ep_tx_ctx <
prov->domain_attr->max_ep_rx_ctx) ?
prov->domain_attr->max_ep_tx_ctx :
prov->domain_attr->max_ep_rx_ctx;
/* Provision enough contexts to service all ranks in a node */
ompi_mtl_ofi.max_ctx_cnt /= (1 + ompi_process_info.num_local_peers);
prov->ep_attr->tx_ctx_cnt = prov->ep_attr->rx_ctx_cnt =
ompi_mtl_ofi.max_ctx_cnt;
ret = fi_scalable_ep(ompi_mtl_ofi.domain, prov, &ompi_mtl_ofi.sep, NULL);
if (0 != ret) {
opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
"fi_scalable_ep",
ompi_process_info.nodename, __FILE__, __LINE__,
fi_strerror(-ret), -ret);
return ret;
}
ompi_mtl_ofi.rx_ctx_bits = 0;
while (ompi_mtl_ofi.max_ctx_cnt >> ++ompi_mtl_ofi.rx_ctx_bits);
av_attr.type = (MTL_OFI_AV_TABLE == av_type) ? FI_AV_TABLE: FI_AV_MAP;
av_attr.rx_ctx_bits = ompi_mtl_ofi.rx_ctx_bits;
av_attr.count = ompi_mtl_ofi.max_ctx_cnt;
ret = fi_av_open(ompi_mtl_ofi.domain, &av_attr, &ompi_mtl_ofi.av, NULL);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_av_open failed");
return ret;
}
ret = fi_scalable_ep_bind(ompi_mtl_ofi.sep, (fid_t)ompi_mtl_ofi.av, 0);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_bind AV-EP failed");
return ret;
}
/*
* If SEP is supported and the Thread Grouping feature is enabled, use
* max_ctx_cnt + 2. The extra 2 entries accommodate Open MPI contextid
* numbering: COMM_WORLD is 0, COMM_SELF is 1. Other user-created
* communicator contextid values are assigned sequentially starting with 3.
*/
num_ofi_ctxts = ompi_mtl_ofi.thread_grouping ?
ompi_mtl_ofi.max_ctx_cnt + 2 : 1;
MTL_OFI_ALLOC_COMM_TO_CONTEXT(num_ofi_ctxts);
ompi_mtl_ofi.total_ctxts_used = 0;
ompi_mtl_ofi.threshold_comm_context_id = 0;
/* Allocate memory for OFI contexts */
MTL_OFI_ALLOC_OFI_CTXTS();
return ret;
}
static int ompi_mtl_ofi_init_regular_ep(struct fi_info * prov)
{
int ret = OMPI_SUCCESS, num_ofi_ctxts;
struct fi_av_attr av_attr = {0};
struct fi_cq_attr cq_attr = {0};
cq_attr.format = FI_CQ_FORMAT_TAGGED;
cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
ompi_mtl_ofi.max_ctx_cnt = 1;
ret = fi_endpoint(ompi_mtl_ofi.domain, /* In: Domain object */
prov, /* In: Provider */
&ompi_mtl_ofi.sep, /* Out: Endpoint object */
NULL); /* Optional context */
if (0 != ret) {
opal_show_help("help-mtl-ofi.txt", "OFI call fail", true,
"fi_endpoint",
ompi_process_info.nodename, __FILE__, __LINE__,
fi_strerror(-ret), -ret);
return ret;
}
/**
* Create the objects that will be bound to the endpoint.
* The objects include:
* - address vector and completion queues
*/
av_attr.type = (MTL_OFI_AV_TABLE == av_type) ? FI_AV_TABLE: FI_AV_MAP;
ret = fi_av_open(ompi_mtl_ofi.domain, &av_attr, &ompi_mtl_ofi.av, NULL);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_av_open failed");
return ret;
}
ret = fi_ep_bind(ompi_mtl_ofi.sep,
(fid_t)ompi_mtl_ofi.av,
0);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_bind AV-EP failed");
return ret;
}
num_ofi_ctxts = 1;
MTL_OFI_ALLOC_COMM_TO_CONTEXT(num_ofi_ctxts);
/* Allocate memory for OFI contexts */
MTL_OFI_ALLOC_OFI_CTXTS();
ompi_mtl_ofi.ofi_ctxt[0].tx_ep = ompi_mtl_ofi.sep;
ompi_mtl_ofi.ofi_ctxt[0].rx_ep = ompi_mtl_ofi.sep;
ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.ofi_ctxt[0].cq, NULL);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_cq_open failed");
return ret;
}
/* Bind CQ to endpoint object */
ret = fi_ep_bind(ompi_mtl_ofi.sep, (fid_t)ompi_mtl_ofi.ofi_ctxt[0].cq,
FI_TRANSMIT | FI_RECV | FI_SELECTIVE_COMPLETION);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_bind CQ-EP failed");
return ret;
}
return ret;
}
static mca_mtl_base_module_t*
ompi_mtl_ofi_component_init(bool enable_progress_threads,
bool enable_mpi_threads)
@ -440,8 +605,6 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
struct fi_info *providers = NULL;
struct fi_info *prov = NULL;
struct fi_info *prov_cq_data = NULL;
char ep_name[FI_NAME_MAX] = {0};
size_t namelen;
int ofi_tag_leading_zeros;
@ -473,8 +636,10 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
hints->tx_attr->op_flags = FI_COMPLETION;
if (enable_mpi_threads) {
ompi_mtl_ofi.mpi_thread_multiple = true;
hints->domain_attr->threading = FI_THREAD_SAFE;
} else {
ompi_mtl_ofi.mpi_thread_multiple = false;
hints->domain_attr->threading = FI_THREAD_DOMAIN;
}
@ -607,6 +772,27 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
ompi_mtl_ofi.base.mtl_max_contextid = (int)((1ULL << ofi_tag_bits_for_cid) - 1);
ompi_mtl_ofi.num_peers = 0;
/* Check if Scalable Endpoints can be enabled for the provider */
ompi_mtl_ofi.sep_supported = false;
if ((prov->domain_attr->max_ep_tx_ctx > 1) ||
(prov->domain_attr->max_ep_rx_ctx > 1)) {
ompi_mtl_ofi.sep_supported = true;
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: Scalable EP supported in %s provider. Enabling in MTL.\n",
__FILE__, __LINE__, prov->fabric_attr->prov_name);
}
/*
* Scalable Endpoints is required for Thread Grouping feature
*/
if (!ompi_mtl_ofi.sep_supported && ompi_mtl_ofi.thread_grouping) {
opal_show_help("help-mtl-ofi.txt", "SEP unavailable", true,
prov->fabric_attr->prov_name,
ompi_process_info.nodename, __FILE__, __LINE__,
fi_strerror(-ret), -ret);
goto error;
}
/**
* Open fabric
* The getinfo struct returns a fabric attribute struct that can be used to
@ -641,25 +827,6 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
goto error;
}
/**
* Save the maximum sizes.
*/
@ -667,76 +834,37 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
ompi_mtl_ofi.max_msg_size = prov->ep_attr->max_msg_size;
/**
* The user is not allowed to exceed MTL_OFI_MAX_PROG_EVENT_COUNT.
* The reason is because progress entries array is now a TLS variable
* as opposed to being allocated on the heap for thread-safety purposes.
*/
if (ompi_mtl_ofi.ofi_progress_event_count > MTL_OFI_MAX_PROG_EVENT_COUNT) {
ompi_mtl_ofi.ofi_progress_event_count = MTL_OFI_MAX_PROG_EVENT_COUNT;
}
/**
* Create a transport level communication endpoint. To use the endpoint,
* it must be bound to the resources consumed by it such as address
* vectors, completion counters or event queues etc, and enabled.
* See man fi_endpoint for more details.
*/
if (true == ompi_mtl_ofi.sep_supported) {
ret = ompi_mtl_ofi_init_sep(prov);
} else {
ret = ompi_mtl_ofi_init_regular_ep(prov);
}
if (OMPI_SUCCESS != ret) {
goto error;
}
ompi_mtl_ofi.total_ctxts_used = 0;
ompi_mtl_ofi.threshold_comm_context_id = 0;
/* Enable Endpoint for communication */
ret = fi_enable(ompi_mtl_ofi.sep);
if (0 != ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_enable failed");
goto error;
}
@ -754,11 +882,11 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
* Get our address and publish it with modex.
*/
namelen = sizeof(ep_name);
ret = fi_getname((fid_t)ompi_mtl_ofi.sep,
&ep_name[0],
&namelen);
if (ret) {
MTL_OFI_LOG_FI_ERR(ret, "fi_getname failed");
goto error;
}
@ -780,17 +908,6 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
*/
ompi_mtl_ofi.any_addr = FI_ADDR_UNSPEC;
return &ompi_mtl_ofi.base;
error:
@ -803,14 +920,16 @@ error:
if (hints) {
(void) fi_freeinfo(hints);
}
if (ompi_mtl_ofi.sep) {
(void) fi_close((fid_t)ompi_mtl_ofi.sep);
}
if (ompi_mtl_ofi.av) {
(void) fi_close((fid_t)ompi_mtl_ofi.av);
}
if ((false == ompi_mtl_ofi.sep_supported) &&
ompi_mtl_ofi.ofi_ctxt[0].cq) {
/* Check if CQ[0] was created for non-SEP case and close if needed */
(void) fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[0].cq);
}
if (ompi_mtl_ofi.domain) {
(void) fi_close((fid_t)ompi_mtl_ofi.domain);
@ -818,6 +937,12 @@ error:
if (ompi_mtl_ofi.fabric) {
(void) fi_close((fid_t)ompi_mtl_ofi.fabric);
}
if (ompi_mtl_ofi.comm_to_context) {
free(ompi_mtl_ofi.comm_to_context);
}
if (ompi_mtl_ofi.ofi_ctxt) {
free(ompi_mtl_ofi.ofi_ctxt);
}
return NULL;
}
@ -830,11 +955,7 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
opal_progress_unregister(ompi_mtl_ofi_progress_no_inline);
/* Close all the OFI objects */
if ((ret = fi_close((fid_t)ompi_mtl_ofi.sep))) {
goto finalize_err;
}
@ -842,6 +963,18 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
goto finalize_err;
}
if (false == ompi_mtl_ofi.sep_supported) {
/*
* CQ[0] is bound to SEP object when SEP is not supported by a
* provider. OFI spec requires that we close the Endpoint that is bound
* to the CQ before closing the CQ itself. So, for the non-SEP case, we
* handle the closing of CQ[0] here.
*/
if ((ret = fi_close((fid_t)ompi_mtl_ofi.ofi_ctxt[0].cq))) {
goto finalize_err;
}
}
if ((ret = fi_close((fid_t)ompi_mtl_ofi.domain))) {
goto finalize_err;
}
@ -850,6 +983,10 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
goto finalize_err;
}
/* Free memory allocated for TX/RX contexts */
free(ompi_mtl_ofi.comm_to_context);
free(ompi_mtl_ofi.ofi_ctxt);
return OMPI_SUCCESS;
finalize_err:

View file

@ -19,6 +19,19 @@ BEGIN_C_DECLS
/**
* MTL Module Interface
*/
typedef struct mca_mtl_ofi_context_t {
/* Transmit and receive contexts */
struct fid_ep *tx_ep;
struct fid_ep *rx_ep;
/* Completion queue */
struct fid_cq *cq;
/* Thread locking */
opal_mutex_t context_lock;
} mca_mtl_ofi_context_t;
typedef struct mca_mtl_ofi_module_t {
mca_mtl_base_module_t base;
@ -31,11 +44,38 @@ typedef struct mca_mtl_ofi_module_t {
/** Address vector handle */
struct fid_av *av;
/* Scalable Endpoint */
struct fid_ep *sep;
/* Multi-threaded Application flag */
bool mpi_thread_multiple;
/* OFI contexts */
mca_mtl_ofi_context_t *ofi_ctxt;
/* Max context count for scalable endpoints */
int max_ctx_cnt;
/* Total number of TX/RX contexts used by MTL */
int total_ctxts_used;
/*
* Context id of the first communicator created after running out of
* OFI contexts; such communicators reuse contexts round-robin
*/
int threshold_comm_context_id;
/* Mapping of communicator ID to OFI context */
int *comm_to_context;
/* MCA parameter for Thread grouping feature */
int thread_grouping;
/* Boolean value to indicate if the provider supports Scalable EP or not */
bool sep_supported;
/* Number of bits used for rx contexts */
int rx_ctx_bits;
/** Endpoint name length */
size_t epnamelen;
@ -80,6 +120,19 @@ typedef struct mca_mtl_ofi_component_t {
mca_mtl_base_component_2_0_0_t super;
} mca_mtl_ofi_component_t;
typedef enum {
OFI_REGULAR_EP = 0,
OFI_SCALABLE_EP,
} mca_mtl_ofi_ep_type;
/*
* Define the upper limit for the number of events read from a CQ.
* Set to 100, which was deemed optimal from empirical data.
* To read fewer events per progress call, use the
* mtl_ofi_progress_event_cnt MCA variable.
*/
#define MTL_OFI_MAX_PROG_EVENT_COUNT 100
/* OFI TAG:
* Define 3 different OFI tag distributions:
* 1) Support FI_REMOTE_CQ_DATA: No need for source rank in the tag