
Merge pull request #5882 from hjelmn/btl_uct_updates_for_the_v4_branch

btl/uct: bug fixes and general improvements
This commit is contained in:
Howard Pritchard 2018-10-22 13:12:34 -06:00 committed by GitHub
parents f9d2f3b912 e6f84e79de
Commit e8f28a5506
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 402 additions and 281 deletions

View file

@ -68,7 +68,7 @@ struct mca_btl_uct_module_t {
opal_hash_table_t id_to_endpoint;
/** mutex to protect the module */
opal_mutex_t lock;
opal_recursive_mutex_t lock;
/** async context */
ucs_async_context_t *ucs_async;
@ -106,11 +106,11 @@ struct mca_btl_uct_module_t {
/** large registered frags for packing non-contiguous data */
opal_free_list_t max_frags;
/** RDMA completions */
opal_free_list_t rdma_completions;
/** frags that were waiting on connections that are now ready to send */
opal_list_t pending_frags;
/** pending connection requests */
opal_fifo_t pending_connection_reqs;
};
typedef struct mca_btl_uct_module_t mca_btl_uct_module_t;
@ -281,6 +281,7 @@ ucs_status_t mca_btl_uct_am_handler (void *arg, void *data, size_t length, unsig
struct mca_btl_base_endpoint_t *mca_btl_uct_get_ep (struct mca_btl_base_module_t *module, opal_proc_t *proc);
int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, uct_tl_resource_desc_t *tl_descs, unsigned tl_count);
int mca_btl_uct_process_connection_request (mca_btl_uct_module_t *module, mca_btl_uct_conn_req_t *req);
/**
* @brief Checks if a tl is suitable for using for RDMA
@ -289,7 +290,7 @@ int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, u
*/
static inline bool mca_btl_uct_tl_supports_rdma (mca_btl_uct_tl_t *tl)
{
return (tl->uct_iface_attr.cap.flags & (UCT_IFACE_FLAG_PUT_ZCOPY | UCT_IFACE_FLAG_GET_ZCOPY)) ==
return (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & (UCT_IFACE_FLAG_PUT_ZCOPY | UCT_IFACE_FLAG_GET_ZCOPY)) ==
(UCT_IFACE_FLAG_PUT_ZCOPY | UCT_IFACE_FLAG_GET_ZCOPY);
}
@ -298,7 +299,7 @@ static inline bool mca_btl_uct_tl_supports_rdma (mca_btl_uct_tl_t *tl)
*/
static inline bool mca_btl_uct_tl_support_am (mca_btl_uct_tl_t *tl)
{
return (tl->uct_iface_attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_AM_ZCOPY));
return (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_AM_ZCOPY));
}
/**
@ -308,7 +309,7 @@ static inline bool mca_btl_uct_tl_support_am (mca_btl_uct_tl_t *tl)
*/
static inline bool mca_btl_uct_tl_supports_conn (mca_btl_uct_tl_t *tl)
{
return (tl->uct_iface_attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_CONNECT_TO_IFACE)) ==
return (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_CONNECT_TO_IFACE)) ==
(UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_CONNECT_TO_IFACE);
}
@ -319,7 +320,7 @@ static inline bool mca_btl_uct_tl_supports_conn (mca_btl_uct_tl_t *tl)
*/
static inline bool mca_btl_uct_tl_requires_connection_tl (mca_btl_uct_tl_t *tl)
{
return !(tl->uct_iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE);
return !(MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE);
}
END_C_DECLS
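
The recurring change in this header (and throughout the PR) is that interface attributes are no longer cached on the TL itself; they are read from device context 0 through the new MCA_BTL_UCT_TL_ATTR macro. A condensed sketch of the pattern follows; the macro matches the one added to btl_uct_types.h later in this diff, while the helper name is illustrative only and simply mirrors mca_btl_uct_tl_supports_rdma() above.

/* Sketch: attribute lookup now goes through device context 0. */
#define MCA_BTL_UCT_TL_ATTR(tl, context_id) (tl)->uct_dev_contexts[(context_id)]->uct_iface_attr

static inline bool tl_supports_rdma_sketch (mca_btl_uct_tl_t *tl)
{
    uint64_t flags = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags;
    return (flags & (UCT_IFACE_FLAG_PUT_ZCOPY | UCT_IFACE_FLAG_GET_ZCOPY)) ==
        (UCT_IFACE_FLAG_PUT_ZCOPY | UCT_IFACE_FLAG_GET_ZCOPY);
}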

View file

@ -25,7 +25,7 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
mca_btl_uct_module_t *uct_btl = (mca_btl_uct_module_t *) btl;
mca_btl_uct_base_frag_t *frag = NULL;
if ((size + 8) <= (size_t) uct_btl->am_tl->uct_iface_attr.cap.am.max_short) {
if (size <= (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) {
frag = mca_btl_uct_frag_alloc_short (uct_btl, endpoint);
} else if (size <= uct_btl->super.btl_eager_limit) {
frag = mca_btl_uct_frag_alloc_eager (uct_btl, endpoint);
@ -40,6 +40,10 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt
frag->base.des_flags = flags;
frag->base.order = order;
frag->uct_iov.length = size;
if (NULL != frag->base.super.registration) {
/* zero-copy fragments will need callbacks */
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
}
}
return (mca_btl_base_descriptor_t *) frag;
@ -95,14 +99,18 @@ struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src (mca_btl_base_module_t
return NULL;
}
frag->uct_iov.length = total_size;
frag->base.order = order;
frag->base.des_flags = flags;
if (total_size > (size_t) uct_btl->am_tl->uct_iface_attr.cap.am.max_short) {
if (total_size > (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) {
frag->segments[0].seg_len = reserve;
frag->segments[1].seg_len = *size;
frag->segments[1].seg_addr.pval = data_ptr;
frag->base.des_segment_count = 2;
} else {
frag->segments[0].seg_len = total_size;
memcpy ((void *)((intptr_t) frag->segments[1].seg_addr.pval + reserve), data_ptr, *size);
frag->base.des_segment_count = 1;
}
}
@ -130,7 +138,7 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
data = (void *)((intptr_t) data + 8);
/* this function should only ever get called with fragments with two segments */
for (size_t i = 0 ; i < 2 ; ++i) {
for (size_t i = 0 ; i < frag->base.des_segment_count ; ++i) {
const size_t seg_len = frag->segments[i].seg_len;
memcpy (data, frag->segments[i].seg_addr.pval, seg_len);
data = (void *)((intptr_t) data + seg_len);
@ -140,57 +148,84 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg)
return length;
}
int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint, mca_btl_uct_base_frag_t *frag,
int32_t flags, mca_btl_uct_device_context_t *context, uct_ep_h ep_handle)
static void mca_btl_uct_append_pending_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag,
mca_btl_uct_device_context_t *context, bool ready)
{
frag->ready = ready;
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
opal_atomic_wmb ();
opal_list_append (&uct_btl->pending_frags, (opal_list_item_t *) frag);
}
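
mca_btl_uct_append_pending_frag() centralizes the queueing of fragments that cannot go out immediately. The ready flag distinguishes fragments that only hit a transient resource shortage (ready, retried on the next progress pass) from fragments still waiting for a connection (not ready until the connection-ready message arrives). Below is a simplified caller-side sketch that loosely follows mca_btl_uct_send() later in this file; the function name is illustrative and bookkeeping and error paths are omitted.

/* Simplified caller-side sketch; variable and type names match the diff. */
static int send_or_queue_sketch (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint,
                                 mca_btl_uct_base_frag_t *frag, mca_btl_uct_device_context_t *context)
{
    uct_ep_h ep_handle;

    if (OPAL_SUCCESS != mca_btl_uct_endpoint_test_am (uct_btl, endpoint, context, &ep_handle)) {
        /* connection not established yet: queue as not-ready; the fragment is
         * marked ready once the connection-ready message is processed */
        OPAL_THREAD_LOCK(&uct_btl->lock);
        mca_btl_uct_append_pending_frag (uct_btl, frag, context, false);
        OPAL_THREAD_UNLOCK(&uct_btl->lock);
        return OPAL_SUCCESS;
    }

    /* connection is up; send_frag queues the fragment itself (append = true)
     * if the transport reports UCS_ERR_NO_RESOURCE */
    return mca_btl_uct_send_frag (uct_btl, frag, true);
}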
int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag, bool append)
{
mca_btl_uct_device_context_t *context = frag->context;
const ssize_t msg_size = frag->uct_iov.length + 8;
ssize_t size;
ucs_status_t ucs_status;
uct_ep_h ep_handle = NULL;
mca_btl_uct_context_lock (context);
/* if we get here then we must have an endpoint handle for this context/endpoint pair */
(void) mca_btl_uct_endpoint_test_am (uct_btl, frag->endpoint, frag->context, &ep_handle);
assert (NULL != ep_handle);
do {
/* if another thread set this we really don't care too much as this flag is only meant
* to protect against deep recursion */
if (!context->in_am_callback) {
mca_btl_uct_context_lock (context);
/* attempt to post the fragment */
if (NULL != frag->base.super.registration) {
frag->comp.dev_context = context;
ucs_status = uct_ep_am_zcopy (ep_handle, MCA_BTL_UCT_FRAG, &frag->header, sizeof (frag->header),
&frag->uct_iov, 1, 0, &frag->comp.uct_comp);
if (OPAL_LIKELY(UCS_INPROGRESS == ucs_status)) {
uct_worker_progress (context->uct_worker);
mca_btl_uct_context_unlock (context);
return OPAL_SUCCESS;
}
} else {
/* short message */
/* restore original flags */
frag->base.des_flags = flags;
if (1 == frag->base.des_segment_count) {
if (1 == frag->base.des_segment_count && (frag->uct_iov.length + 8) < MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) {
ucs_status = uct_ep_am_short (ep_handle, MCA_BTL_UCT_FRAG, frag->header.value, frag->uct_iov.buffer,
frag->uct_iov.length);
} else {
ucs_status = uct_ep_am_bcopy (ep_handle, MCA_BTL_UCT_FRAG, mca_btl_uct_send_frag_pack, frag, 0);
if (OPAL_LIKELY(UCS_OK == ucs_status)) {
uct_worker_progress (context->uct_worker);
mca_btl_uct_context_unlock (context);
/* send is complete */
mca_btl_uct_frag_complete (frag, OPAL_SUCCESS);
return 1;
}
}
size = uct_ep_am_bcopy (ep_handle, MCA_BTL_UCT_FRAG, mca_btl_uct_send_frag_pack, frag, 0);
if (OPAL_LIKELY(size == msg_size)) {
uct_worker_progress (context->uct_worker);
mca_btl_uct_context_unlock (context);
/* send is complete */
mca_btl_uct_frag_complete (frag, OPAL_SUCCESS);
return 1;
}
}
if (UCS_ERR_NO_RESOURCE != ucs_status) {
/* go ahead and progress the worker while we have the lock */
(void) uct_worker_progress (context->uct_worker);
break;
}
/* wait for something to happen */
uct_worker_progress (context->uct_worker);
mca_btl_uct_context_unlock (context);
/* wait for something to complete before trying again */
while (!uct_worker_progress (context->uct_worker));
} while (1);
mca_btl_uct_context_unlock (context);
if (UCS_OK == ucs_status) {
/* restore original flags */
frag->base.des_flags = flags;
/* send is complete */
mca_btl_uct_frag_complete (frag, OPAL_SUCCESS);
return 1;
mca_btl_uct_device_handle_completions (context);
}
if (OPAL_UNLIKELY(UCS_INPROGRESS != ucs_status)) {
if (!append) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
return 0;
OPAL_THREAD_LOCK(&uct_btl->lock);
mca_btl_uct_append_pending_frag (uct_btl, frag, context, true);
OPAL_THREAD_UNLOCK(&uct_btl->lock);
return OPAL_SUCCESS;
}
int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, mca_btl_base_descriptor_t *descriptor,
@ -199,7 +234,6 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
mca_btl_uct_module_t *uct_btl = (mca_btl_uct_module_t *) btl;
mca_btl_uct_device_context_t *context = mca_btl_uct_module_get_am_context (uct_btl);
mca_btl_uct_base_frag_t *frag = (mca_btl_uct_base_frag_t *) descriptor;
int flags = frag->base.des_flags;
uct_ep_h ep_handle;
int rc;
@ -208,28 +242,21 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
frag->header.data.tag = tag;
/* add the callback flag before posting to avoid potential races with other threads */
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
frag->context = context;
rc = mca_btl_uct_endpoint_check_am (uct_btl, endpoint, context, &ep_handle);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
OPAL_THREAD_LOCK(&endpoint->ep_lock);
OPAL_THREAD_LOCK(&uct_btl->lock);
/* check one more time in case another thread is completing the connection now */
if (OPAL_SUCCESS != mca_btl_uct_endpoint_test_am (uct_btl, endpoint, context, &ep_handle)) {
frag->context_id = context->context_id;
frag->ready = false;
OPAL_THREAD_LOCK(&uct_btl->lock);
opal_list_append (&uct_btl->pending_frags, (opal_list_item_t *) frag);
OPAL_THREAD_UNLOCK(&endpoint->ep_lock);
mca_btl_uct_append_pending_frag (uct_btl, frag, context, false);
OPAL_THREAD_UNLOCK(&uct_btl->lock);
return OPAL_SUCCESS;
}
OPAL_THREAD_UNLOCK(&endpoint->ep_lock);
OPAL_THREAD_UNLOCK(&uct_btl->lock);
}
return mca_btl_uct_send_frag (uct_btl, endpoint, frag, flags, context, ep_handle);
return mca_btl_uct_send_frag (uct_btl, frag, true);
}
struct mca_btl_uct_sendi_pack_args_t {
@ -253,10 +280,9 @@ static size_t mca_btl_uct_sendi_pack (void *data, void *arg)
return args->header_size + args->payload_size + 8;
}
static inline size_t mca_btl_uct_max_sendi (mca_btl_uct_module_t *uct_btl)
static inline size_t mca_btl_uct_max_sendi (mca_btl_uct_module_t *uct_btl, int context_id)
{
return (uct_btl->am_tl->uct_iface_attr.cap.am.max_short > uct_btl->am_tl->uct_iface_attr.cap.am.max_bcopy) ?
uct_btl->am_tl->uct_iface_attr.cap.am.max_short : uct_btl->am_tl->uct_iface_attr.cap.am.max_bcopy;
return MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, context_id).cap.am.max_bcopy;
}
int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, opal_convertor_t *convertor,
@ -269,12 +295,12 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp
/* message with header */
const size_t msg_size = total_size + 8;
mca_btl_uct_am_header_t am_header;
ucs_status_t ucs_status = UCS_OK;
ucs_status_t ucs_status = UCS_ERR_NO_RESOURCE;
uct_ep_h ep_handle;
int rc;
rc = mca_btl_uct_endpoint_check_am (uct_btl, endpoint, context, &ep_handle);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc || msg_size > mca_btl_uct_max_sendi (uct_btl))) {
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc || msg_size > mca_btl_uct_max_sendi (uct_btl, context->context_id))) {
if (descriptor) {
*descriptor = mca_btl_uct_alloc (btl, endpoint, order, total_size, flags);
}
@ -287,7 +313,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp
mca_btl_uct_context_lock (context);
if (0 == payload_size) {
ucs_status = uct_ep_am_short (ep_handle, MCA_BTL_UCT_FRAG, am_header.value, header, header_size);
} else if (msg_size < (size_t) uct_btl->am_tl->uct_iface_attr.cap.am.max_short) {
} else if (msg_size < (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, context->context_id).cap.am.max_short) {
int8_t *data = alloca (total_size);
_mca_btl_uct_send_pack (data, header, header_size, convertor, payload_size);
ucs_status = uct_ep_am_short (ep_handle, MCA_BTL_UCT_FRAG, am_header.value, data, total_size);
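
Throughout this file the active-message protocol is now chosen against the per-context interface attributes: am_short when the 8-byte fragment header plus payload fits under cap.am.max_short, am_bcopy for larger packed sends, and am_zcopy for registered fragments that complete through frag->comp. A small sketch of that decision; the helper name is illustrative.

/* Illustrative helper: the 8 bytes are the mca_btl_uct_am_header_t sent
 * ahead of every fragment. */
static inline bool am_fits_short_sketch (mca_btl_uct_module_t *uct_btl, int context_id, size_t payload)
{
    return (payload + 8) < MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, context_id).cap.am.max_short;
}

/* short -> uct_ep_am_short()  header + contiguous payload
 * bcopy -> uct_ep_am_bcopy()  packed by mca_btl_uct_send_frag_pack()
 * zcopy -> uct_ep_am_zcopy()  registered fragments, completed via frag->comp */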

View file

@ -27,8 +27,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp
int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, mca_btl_base_descriptor_t *descriptor,
mca_btl_base_tag_t tag);
int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint, mca_btl_uct_base_frag_t *frag,
int32_t flags, mca_btl_uct_device_context_t *context, uct_ep_h ep_handle);
int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag, bool append);
mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint,
uint8_t order, size_t size, uint32_t flags);

View file

@ -104,8 +104,10 @@ int mca_btl_uct_afop (struct mca_btl_base_module_t *btl, struct mca_btl_base_end
rc = OPAL_SUCCESS;
} else if (UCS_OK == ucs_status) {
rc = 1;
mca_btl_uct_uct_completion_release (comp);
} else {
rc = OPAL_ERR_OUT_OF_RESOURCE;
mca_btl_uct_uct_completion_release (comp);
}
uct_rkey_release (&rkey);
@ -176,8 +178,10 @@ int mca_btl_uct_acswap (struct mca_btl_base_module_t *btl, struct mca_btl_base_e
rc = OPAL_SUCCESS;
} else if (UCS_OK == ucs_status) {
rc = 1;
mca_btl_uct_uct_completion_release (comp);
} else {
rc = OPAL_ERR_OUT_OF_RESOURCE;
mca_btl_uct_uct_completion_release (comp);
}
uct_rkey_release (&rkey);

View file

@ -28,6 +28,9 @@
#include "opal/mca/btl/base/base.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/argv.h"
#include "opal/memoryhooks/memory.h"
#include "opal/mca/memory/base/base.h"
#include <ucm/api/ucm.h>
#include <string.h>
@ -47,13 +50,13 @@ static int mca_btl_uct_component_register(void)
MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL,
&mca_btl_uct_component.memory_domains);
mca_btl_uct_component.allowed_transports = "any";
mca_btl_uct_component.allowed_transports = "dc_mlx5,rc_mlx5,ud,any";
(void) mca_base_component_var_register(&mca_btl_uct_component.super.btl_version,
"transports", "Comma-delimited list of transports of the form to use."
" The list of transports available can be queried using ucx_info. Special"
"values: any (any available) (default: any)", MCA_BASE_VAR_TYPE_STRING,
NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_LOCAL,
&mca_btl_uct_component.allowed_transports);
"transports", "Comma-delimited list of transports to use sorted by increasing "
"priority. The list of transports available can be queried using ucx_info. Special"
"values: any (any available) (default: dc_mlx5,rc_mlx5,ud,any)",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_3,
MCA_BASE_VAR_SCOPE_LOCAL, &mca_btl_uct_component.allowed_transports);
mca_btl_uct_component.num_contexts_per_module = 0;
(void) mca_base_component_var_register(&mca_btl_uct_component.super.btl_version,
@ -93,6 +96,11 @@ static int mca_btl_uct_component_register(void)
&module->super);
}
static void mca_btl_uct_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc)
{
ucm_vm_munmap(buf, length);
}
static int mca_btl_uct_component_open(void)
{
if (0 == mca_btl_uct_component.num_contexts_per_module) {
@ -112,6 +120,15 @@ static int mca_btl_uct_component_open(void)
}
}
if (mca_btl_uct_component.num_contexts_per_module > MCA_BTL_UCT_MAX_WORKERS) {
mca_btl_uct_component.num_contexts_per_module = MCA_BTL_UCT_MAX_WORKERS;
}
if (mca_btl_uct_component.disable_ucx_memory_hooks) {
ucm_set_external_event(UCM_EVENT_VM_UNMAPPED);
opal_mem_hooks_register_release(mca_btl_uct_mem_release_cb, NULL);
}
return OPAL_SUCCESS;
}
@ -121,6 +138,10 @@ static int mca_btl_uct_component_open(void)
*/
static int mca_btl_uct_component_close(void)
{
if (mca_btl_uct_component.disable_ucx_memory_hooks) {
opal_mem_hooks_unregister_release (mca_btl_uct_mem_release_cb);
}
return OPAL_SUCCESS;
}
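
The memory-hook setup moves from module initialization into component open/close. When UCX memory hooks are disabled, the BTL tells UCM to expect external unmap events and forwards OPAL's release hook to ucm_vm_munmap(). A sketch of that bridge, assuming the same guard on mca_btl_uct_component.disable_ucx_memory_hooks as in the diff; the _sketch names are illustrative.

#include "opal/memoryhooks/memory.h"
#include "opal/mca/memory/base/base.h"
#include <ucm/api/ucm.h>

/* forward OPAL's release hook to UCM so it can invalidate registrations */
static void mem_release_cb_sketch (void *buf, size_t length, void *cbdata, bool from_alloc)
{
    ucm_vm_munmap (buf, length);
}

static void open_hooks_sketch (void)
{
    /* UCM will not install its own hooks; it relies on these external events */
    ucm_set_external_event (UCM_EVENT_VM_UNMAPPED);
    opal_mem_hooks_register_release (mem_release_cb_sketch, NULL);
}

static void close_hooks_sketch (void)
{
    opal_mem_hooks_unregister_release (mem_release_cb_sketch);
}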
@ -128,12 +149,12 @@ static size_t mca_btl_uct_tl_modex_size (mca_btl_uct_tl_t *tl)
{
const size_t size = strlen (tl->uct_tl_name) + 1;
if (tl->uct_iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
if (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
/* pad out to a multiple of 4 bytes */
return (4 + 3 + size + tl->uct_iface_attr.device_addr_len + tl->uct_iface_attr.iface_addr_len) & ~3;
return (4 + 3 + size + MCA_BTL_UCT_TL_ATTR(tl, 0).device_addr_len + MCA_BTL_UCT_TL_ATTR(tl, 0).iface_addr_len) & ~3;
}
return (4 + 3 + size + tl->uct_iface_attr.device_addr_len) & ~3;
return (4 + 3 + size + MCA_BTL_UCT_TL_ATTR(tl, 0).device_addr_len) & ~3;
}
static size_t mca_btl_uct_module_modex_size (mca_btl_uct_module_t *module)
@ -172,13 +193,13 @@ static size_t mca_btl_uct_tl_modex_pack (mca_btl_uct_tl_t *tl, uint8_t *modex_da
* the same endpoint since we are only doing RDMA. if any of these assumptions are
* wrong then we can't delay creating the other contexts and must include their
* information in the modex. */
if (tl->uct_iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
if (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
uct_iface_get_address (dev_context->uct_iface, (uct_iface_addr_t *) modex_data);
modex_data += tl->uct_iface_attr.iface_addr_len;
modex_data += MCA_BTL_UCT_TL_ATTR(tl, 0).iface_addr_len;
}
uct_iface_get_device_address (dev_context->uct_iface, (uct_device_addr_t *) modex_data);
modex_data += tl->uct_iface_attr.device_addr_len;
modex_data += MCA_BTL_UCT_TL_ATTR(tl, 0).device_addr_len;
return modex_size;
}
@ -247,9 +268,9 @@ static mca_btl_uct_module_t *mca_btl_uct_alloc_module (const char *md_name, mca_
OBJ_CONSTRUCT(&module->short_frags, opal_free_list_t);
OBJ_CONSTRUCT(&module->eager_frags, opal_free_list_t);
OBJ_CONSTRUCT(&module->max_frags, opal_free_list_t);
OBJ_CONSTRUCT(&module->rdma_completions, opal_free_list_t);
OBJ_CONSTRUCT(&module->pending_frags, opal_list_t);
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
OBJ_CONSTRUCT(&module->pending_connection_reqs, opal_fifo_t);
module->md = md;
module->md_name = strdup (md_name);
@ -275,10 +296,13 @@ ucs_status_t mca_btl_uct_am_handler (void *arg, void *data, size_t length, unsig
.seg_len = length - sizeof (*header)};
mca_btl_uct_base_frag_t frag = {.base = {.des_segments = &seg, .des_segment_count = 1}};
/* prevent recursion */
tl_context->in_am_callback = true;
reg = mca_btl_base_active_message_trigger + header->data.tag;
mca_btl_uct_context_unlock (tl_context);
reg->cbfunc (&uct_btl->super, header->data.tag, &frag.base, reg->cbdata);
mca_btl_uct_context_lock (tl_context);
tl_context->in_am_callback = false;
return UCS_OK;
}
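
The new in_am_callback flag guards against re-entering UCT from inside an active-message callback: the handler sets it around the upper-layer callback, and the send/put/get paths skip uct_worker_progress() while it is set. A condensed sketch of the two sides; names with a _sketch suffix are illustrative.

/* Producer side: the AM handler marks the context for the duration of the
 * upper-layer callback. */
static ucs_status_t am_handler_sketch (mca_btl_uct_device_context_t *tl_context)
{
    tl_context->in_am_callback = true;
    /* ... deliver the fragment to the registered active-message callback ... */
    tl_context->in_am_callback = false;
    return UCS_OK;
}

/* Consumer side: only drive the worker when not already inside a callback. */
static void maybe_progress_sketch (mca_btl_uct_device_context_t *context)
{
    if (!context->in_am_callback) {
        (void) uct_worker_progress (context->uct_worker);
    }
}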
@ -468,8 +492,7 @@ static int mca_btl_uct_component_progress_pending (mca_btl_uct_module_t *uct_btl
opal_list_remove_item (&uct_btl->pending_frags, (opal_list_item_t *) frag);
if (OPAL_SUCCESS > mca_btl_uct_send (&uct_btl->super, frag->endpoint, &frag->base,
frag->header.data.tag)) {
if (OPAL_SUCCESS > mca_btl_uct_send_frag (uct_btl, frag, false)) {
opal_list_prepend (&uct_btl->pending_frags, (opal_list_item_t *) frag);
}
}
@ -500,9 +523,16 @@ static int mca_btl_uct_component_progress (void)
}
if (module->conn_tl) {
mca_btl_uct_pending_connection_request_t *request;
if (module->conn_tl != module->am_tl && module->conn_tl != module->rdma_tl) {
ret += mca_btl_uct_tl_progress (module->conn_tl, 0);
}
while (NULL != (request = (mca_btl_uct_pending_connection_request_t *) opal_fifo_pop_atomic (&module->pending_connection_reqs))) {
mca_btl_uct_process_connection_request (module, (mca_btl_uct_conn_req_t *) request->request_data);
OBJ_RELEASE(request);
}
}
if (0 != opal_list_get_size (&module->pending_frags)) {

View file

@ -23,7 +23,7 @@
* @param[in] tl btl uct tl pointer
* @param[in] context_id identifier for this context (0..MCA_BTL_UCT_MAX_WORKERS-1)
*/
mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl, int context_id);
mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl, int context_id, bool enable_progress);
/**
* @brief Destroy a device context and release all resources
@ -89,14 +89,12 @@ mca_btl_uct_module_get_tl_context_specific (mca_btl_uct_module_t *module, mca_bt
mca_btl_uct_device_context_t *context = tl->uct_dev_contexts[context_id];
if (OPAL_UNLIKELY(NULL == context)) {
mca_btl_uct_device_context_t *new_context;
new_context = mca_btl_uct_context_create (module, tl, context_id);
if (!opal_atomic_compare_exchange_strong_ptr (&tl->uct_dev_contexts[context_id], &context, new_context)) {
mca_btl_uct_context_destroy (new_context);
} else {
context = new_context;
OPAL_THREAD_LOCK(&module->lock);
context = tl->uct_dev_contexts[context_id];
if (OPAL_UNLIKELY(NULL == context)) {
context = tl->uct_dev_contexts[context_id] = mca_btl_uct_context_create (module, tl, context_id, true);
}
OPAL_THREAD_UNLOCK(&module->lock);
}
return context;
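
Lazy context creation now uses the module lock instead of an atomic compare-and-swap, so a thread that loses the race never has to open and immediately destroy a duplicate uct_iface. Reduced to its core, the pattern above is a double-checked lock, sketched here for reference.

/* check, lock, re-check, create */
context = tl->uct_dev_contexts[context_id];
if (NULL == context) {
    OPAL_THREAD_LOCK(&module->lock);
    if (NULL == (context = tl->uct_dev_contexts[context_id])) {
        context = tl->uct_dev_contexts[context_id] =
            mca_btl_uct_context_create (module, tl, context_id, true);
    }
    OPAL_THREAD_UNLOCK(&module->lock);
}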

View file

@ -56,7 +56,7 @@ mca_btl_base_endpoint_t *mca_btl_uct_endpoint_create (opal_proc_t *proc)
static unsigned char *mca_btl_uct_process_modex_tl (unsigned char *modex_data)
{
BTL_VERBOSE(("processing modex for tl %s. size: %u", modex_data, *((uint32_t *) modex_data)));
BTL_VERBOSE(("processing modex for tl %s. size: %u", modex_data + 4, *((uint32_t *) modex_data)));
/* skip size and name */
return modex_data + 4 + strlen ((char *) modex_data + 4) + 1;
@ -109,15 +109,14 @@ static int mca_btl_uct_endpoint_connect_iface (mca_btl_uct_module_t *uct_btl, mc
/* easy case. just connect to the interface */
iface_addr = (uct_iface_addr_t *) tl_data;
device_addr = (uct_device_addr_t *) ((uintptr_t) iface_addr + tl->uct_iface_attr.iface_addr_len);
device_addr = (uct_device_addr_t *) ((uintptr_t) iface_addr + MCA_BTL_UCT_TL_ATTR(tl, tl_context->context_id).iface_addr_len);
BTL_VERBOSE(("connecting endpoint to interface"));
mca_btl_uct_context_lock (tl_context);
ucs_status = uct_ep_create_connected (tl_context->uct_iface, device_addr, iface_addr, &tl_endpoint->uct_ep);
mca_btl_uct_context_unlock (tl_context);
tl_endpoint->flags = MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY;
mca_btl_uct_context_unlock (tl_context);
return (UCS_OK == ucs_status) ? OPAL_SUCCESS : OPAL_ERROR;
}
@ -140,13 +139,13 @@ OBJ_CLASS_INSTANCE(mca_btl_uct_connection_ep_t, opal_object_t, mca_btl_uct_conne
static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint,
mca_btl_uct_device_context_t *conn_tl_context,
int64_t type, void *request, size_t request_length)
mca_btl_uct_conn_req_t *request, size_t request_length)
{
mca_btl_uct_connection_ep_t *conn_ep = endpoint->conn_ep;
ucs_status_t ucs_status;
BTL_VERBOSE(("sending connection request to peer. type: %" PRId64 ", length: %" PRIsize_t,
type, request_length));
BTL_VERBOSE(("sending connection request to peer. context id: %d, type: %d, length: %" PRIsize_t,
request->context_id, request->type, request_length));
OBJ_RETAIN(endpoint->conn_ep);
@ -155,7 +154,8 @@ static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mc
do {
MCA_BTL_UCT_CONTEXT_SERIALIZE(conn_tl_context, {
ucs_status = uct_ep_am_short (conn_ep->uct_ep, MCA_BTL_UCT_CONNECT_RDMA, type, request, request_length);
ucs_status = uct_ep_am_short (conn_ep->uct_ep, MCA_BTL_UCT_CONNECT_RDMA, request->type, request,
request_length);
});
if (OPAL_LIKELY(UCS_OK == ucs_status)) {
break;
@ -170,12 +170,10 @@ static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mc
} while (1);
/* for now we just wait for the connection request to complete before continuing */
MCA_BTL_UCT_CONTEXT_SERIALIZE(conn_tl_context, {
do {
uct_worker_progress (conn_tl_context->uct_worker);
ucs_status = uct_ep_flush (conn_ep->uct_ep, 0, NULL);
} while (UCS_INPROGRESS == ucs_status);
});
do {
ucs_status = uct_ep_flush (conn_ep->uct_ep, 0, NULL);
mca_btl_uct_context_progress (conn_tl_context);
} while (UCS_INPROGRESS == ucs_status);
opal_mutex_lock (&endpoint->ep_lock);
@ -189,7 +187,7 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl,
mca_btl_uct_tl_endpoint_t *tl_endpoint, uint8_t *tl_data,
uint8_t *conn_tl_data, void *ep_addr)
{
size_t request_length = sizeof (mca_btl_uct_conn_req_t) + tl->uct_iface_attr.ep_addr_len;
size_t request_length = sizeof (mca_btl_uct_conn_req_t) + MCA_BTL_UCT_TL_ATTR(tl, tl_context->context_id).ep_addr_len;
mca_btl_uct_connection_ep_t *conn_ep = endpoint->conn_ep;
mca_btl_uct_tl_t *conn_tl = uct_btl->conn_tl;
mca_btl_uct_device_context_t *conn_tl_context = conn_tl->uct_dev_contexts[0];
@ -208,7 +206,7 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl,
opal_process_name_print (endpoint->ep_proc->proc_name)));
iface_addr = (uct_iface_addr_t *) conn_tl_data;
device_addr = (uct_device_addr_t *) ((uintptr_t) conn_tl_data + conn_tl->uct_iface_attr.iface_addr_len);
device_addr = (uct_device_addr_t *) ((uintptr_t) conn_tl_data + MCA_BTL_UCT_TL_ATTR(conn_tl, 0).iface_addr_len);
endpoint->conn_ep = conn_ep = OBJ_NEW(mca_btl_uct_connection_ep_t);
if (OPAL_UNLIKELY(NULL == conn_ep)) {
@ -233,6 +231,7 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl,
request->proc_name = OPAL_PROC_MY_NAME;
request->context_id = tl_context->context_id;
request->tl_index = tl->tl_index;
request->type = !!(ep_addr);
if (NULL == tl_endpoint->uct_ep) {
BTL_VERBOSE(("allocating endpoint for peer %s and sending connection data",
@ -245,48 +244,37 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl,
OBJ_RELEASE(endpoint->conn_ep);
return OPAL_ERROR;
}
}
/* fill in connection request */
ucs_status = uct_ep_get_address (tl_endpoint->uct_ep, (uct_ep_addr_t *) request->ep_addr);
if (ep_addr) {
BTL_VERBOSE(("using remote endpoint address to connect endpoint for tl %s, index %d. ep_addr = %p",
tl->uct_tl_name, tl_context->context_id, ep_addr));
/* NTH: there is no need to lock the device context in this case */
ucs_status = uct_ep_connect_to_ep (tl_endpoint->uct_ep, (uct_device_addr_t *) tl_data, ep_addr);
if (UCS_OK != ucs_status) {
/* this is a fatal a fatal error */
OBJ_RELEASE(endpoint->conn_ep);
uct_ep_destroy (tl_endpoint->uct_ep);
tl_endpoint->uct_ep = NULL;
return OPAL_ERROR;
}
rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, 0, request,
request_length);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
OBJ_RELEASE(endpoint->conn_ep);
uct_ep_destroy (tl_endpoint->uct_ep);
tl_endpoint->uct_ep = NULL;
return OPAL_ERROR;
}
}
if (ep_addr) {
BTL_VERBOSE(("using remote endpoint address to connect endpoint. ep_addr = %p", ep_addr));
/* fill in connection request */
ucs_status = uct_ep_get_address (tl_endpoint->uct_ep, (uct_ep_addr_t *) request->ep_addr);
if (UCS_OK != ucs_status) {
/* this is a fatal a fatal error */
OBJ_RELEASE(endpoint->conn_ep);
uct_ep_destroy (tl_endpoint->uct_ep);
tl_endpoint->uct_ep = NULL;
return OPAL_ERROR;
}
device_addr = (uct_device_addr_t *) tl_data;
/* NTH: there is no need to lock the device context in this case */
ucs_status = uct_ep_connect_to_ep (tl_endpoint->uct_ep, device_addr, ep_addr);
if (UCS_OK != ucs_status) {
return OPAL_ERROR;
}
/* let the remote side know that the connection has been established and
* wait for the message to be sent */
rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, 1, request,
sizeof (mca_btl_uct_conn_req_t));
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
OBJ_RELEASE(endpoint->conn_ep);
uct_ep_destroy (tl_endpoint->uct_ep);
tl_endpoint->uct_ep = NULL;
return OPAL_ERROR;
}
/* let the remote side know that the connection has been established and
* wait for the message to be sent */
rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, request, request_length);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
OBJ_RELEASE(endpoint->conn_ep);
uct_ep_destroy (tl_endpoint->uct_ep);
tl_endpoint->uct_ep = NULL;
return OPAL_ERROR;
}
return (tl_endpoint->flags & MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY) ? OPAL_SUCCESS : OPAL_ERR_OUT_OF_RESOURCE;

View file

@ -72,7 +72,8 @@ static inline int mca_btl_uct_endpoint_check (mca_btl_uct_module_t *module, mca_
rc = mca_btl_uct_endpoint_connect (module, endpoint, ep_index, NULL, tl_index);
*ep_handle = endpoint->uct_eps[ep_index][tl_index].uct_ep;
BTL_VERBOSE(("mca_btl_uct_endpoint_connect returned %d", rc));
BTL_VERBOSE(("mca_btl_uct_endpoint_connect returned %d. context id = %d, flags = 0x%x", rc, ep_index,
MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY & endpoint->uct_eps[ep_index][tl_index].flags));
return rc;
}

View file

@ -31,15 +31,6 @@
#include "btl_uct_endpoint.h"
#include "btl_uct_am.h"
#include "opal/memoryhooks/memory.h"
#include "opal/mca/memory/base/base.h"
#include <ucm/api/ucm.h>
static void mca_btl_uct_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc)
{
ucm_vm_munmap(buf, length);
}
struct mca_btl_base_endpoint_t *mca_btl_uct_get_ep (struct mca_btl_base_module_t *module, opal_proc_t *proc)
{
mca_btl_uct_module_t *uct_module = (mca_btl_uct_module_t *) module;
@ -83,7 +74,6 @@ static int mca_btl_uct_add_procs (mca_btl_base_module_t *btl,
if (false == uct_module->initialized) {
mca_btl_uct_tl_t *am_tl = uct_module->am_tl;
mca_btl_uct_tl_t *rdma_tl = uct_module->rdma_tl;
/* NTH: might want to vary this size based off the universe size (if
* one exists). the table is only used for connection lookup and
@ -97,7 +87,7 @@ static int mca_btl_uct_add_procs (mca_btl_base_module_t *btl,
if (am_tl) {
rc = opal_free_list_init (&uct_module->short_frags, sizeof (mca_btl_uct_base_frag_t),
opal_cache_line_size, OBJ_CLASS(mca_btl_uct_base_frag_t),
am_tl->uct_iface_attr.cap.am.max_short, opal_cache_line_size,
MCA_BTL_UCT_TL_ATTR(am_tl, 0).cap.am.max_short, opal_cache_line_size,
0, 1024, 64, NULL, 0, NULL, NULL, NULL);
rc = opal_free_list_init (&uct_module->eager_frags, sizeof (mca_btl_uct_base_frag_t),
@ -111,18 +101,6 @@ static int mca_btl_uct_add_procs (mca_btl_base_module_t *btl,
NULL, 0, uct_module->rcache, NULL, NULL);
}
if (rdma_tl) {
rc = opal_free_list_init (&uct_module->rdma_completions, sizeof (mca_btl_uct_uct_completion_t),
opal_cache_line_size, OBJ_CLASS(mca_btl_uct_uct_completion_t),
0, opal_cache_line_size, 0, 4096, 128, NULL, 0, NULL, NULL,
NULL);
}
if (mca_btl_uct_component.disable_ucx_memory_hooks) {
ucm_set_external_event(UCM_EVENT_VM_UNMAPPED);
opal_mem_hooks_register_release(mca_btl_uct_mem_release_cb, NULL);
}
uct_module->initialized = true;
}
@ -288,10 +266,6 @@ int mca_btl_uct_finalize (mca_btl_base_module_t* btl)
mca_btl_uct_endpoint_t *endpoint;
uint64_t key;
if (mca_btl_uct_component.disable_ucx_memory_hooks) {
opal_mem_hooks_unregister_release (mca_btl_uct_mem_release_cb);
}
/* clean up any leftover endpoints */
OPAL_HASH_TABLE_FOREACH(key, uint64, endpoint, &uct_module->id_to_endpoint) {
OBJ_RELEASE(endpoint);
@ -300,9 +274,9 @@ int mca_btl_uct_finalize (mca_btl_base_module_t* btl)
OBJ_DESTRUCT(&uct_module->short_frags);
OBJ_DESTRUCT(&uct_module->eager_frags);
OBJ_DESTRUCT(&uct_module->max_frags);
OBJ_DESTRUCT(&uct_module->rdma_completions);
OBJ_DESTRUCT(&uct_module->pending_frags);
OBJ_DESTRUCT(&uct_module->lock);
OBJ_DESTRUCT(&uct_module->pending_connection_reqs);
if (uct_module->rcache) {
mca_rcache_base_module_destroy (uct_module->rcache);

View file

@ -30,13 +30,14 @@ static void mca_btl_uct_uct_completion_construct (mca_btl_uct_uct_completion_t *
OBJ_CLASS_INSTANCE(mca_btl_uct_uct_completion_t, opal_free_list_item_t, mca_btl_uct_uct_completion_construct, NULL);
mca_btl_uct_uct_completion_t *
mca_btl_uct_uct_completion_alloc (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint,
void *local_address, mca_btl_base_registration_handle_t *local_handle,
mca_btl_uct_device_context_t *dev_context, mca_btl_base_rdma_completion_fn_t cbfunc,
void *cbcontext, void *cbdata)
{
mca_btl_uct_uct_completion_t *comp = (mca_btl_uct_uct_completion_t *) opal_free_list_get (&uct_btl->rdma_completions);
mca_btl_uct_uct_completion_t *comp = (mca_btl_uct_uct_completion_t *) opal_free_list_get (&dev_context->rdma_completions);
if (OPAL_LIKELY(NULL != comp)) {
comp->uct_comp.count = 1;
comp->btl = &uct_btl->super;
@ -55,8 +56,7 @@ mca_btl_uct_uct_completion_alloc (mca_btl_uct_module_t *uct_btl, mca_btl_base_en
void mca_btl_uct_uct_completion_release (mca_btl_uct_uct_completion_t *comp)
{
if (comp) {
mca_btl_uct_module_t *uct_btl = (mca_btl_uct_module_t *) comp->btl;
opal_free_list_return (&uct_btl->rdma_completions, &comp->super);
opal_free_list_return (&comp->dev_context->rdma_completions, &comp->super);
}
}
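
RDMA completions now come from a per-device-context free list instead of a module-wide one, and each completion records the context it came from so release can return it to the right pool. A sketch of the allocate/release pairing, assuming the allocation routine stores the dev_context it is given.

/* allocation: pull a completion from the context's pool and remember its origin */
mca_btl_uct_uct_completion_t *comp =
    (mca_btl_uct_uct_completion_t *) opal_free_list_get (&dev_context->rdma_completions);
if (NULL != comp) {
    comp->dev_context = dev_context;
    /* ... post the RDMA operation with &comp->uct_comp ... */
}

/* release: on completion, on immediate UCS_OK, or on error */
opal_free_list_return (&comp->dev_context->rdma_completions, &comp->super);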
@ -98,30 +98,36 @@ int mca_btl_uct_get (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi
mca_btl_uct_context_lock (context);
if (size <= uct_btl->rdma_tl->uct_iface_attr.cap.get.max_bcopy) {
if (size <= MCA_BTL_UCT_TL_ATTR(uct_btl->rdma_tl, context->context_id).cap.get.max_bcopy) {
ucs_status = uct_ep_get_bcopy (ep_handle, mca_btl_uct_get_unpack, local_address, size, remote_address,
rkey.rkey, &comp->uct_comp);
} else {
uct_iov_t iov = {.buffer = local_address, .length = size, .stride = 0, .count = 1,
.memh = MCA_BTL_UCT_REG_REMOTE_TO_LOCAL(local_handle)->uct_memh};
ucs_status = uct_ep_get_zcopy (ep_handle, &iov, 1, remote_address, rkey.rkey, &comp->uct_comp);
}
/* go ahead and progress the worker while we have the lock */
(void) uct_worker_progress (context->uct_worker);
/* go ahead and progress the worker while we have the lock (if we are not in an AM callback) */
if (!context->in_am_callback) {
(void) uct_worker_progress (context->uct_worker);
}
mca_btl_uct_context_unlock (context);
mca_btl_uct_device_handle_completions (context);
if (!context->in_am_callback) {
mca_btl_uct_device_handle_completions (context);
}
if (UCS_OK == ucs_status && cbfunc) {
/* if UCS_OK is returned the callback will never fire so we have to make the callback
* ourselves */
cbfunc (btl, endpoint, local_address, local_handle, cbcontext, cbdata, OPAL_SUCCESS);
mca_btl_uct_uct_completion_release (comp);
} else if (UCS_INPROGRESS == ucs_status) {
}
if (UCS_INPROGRESS == ucs_status) {
ucs_status = UCS_OK;
} else {
mca_btl_uct_uct_completion_release (comp);
}
BTL_VERBOSE(("get issued. status = %d", ucs_status));
@ -157,6 +163,8 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi
ucs_status_t ucs_status;
uct_rkey_bundle_t rkey;
uct_ep_h ep_handle;
bool use_short = false;
bool use_bcopy = false;
int rc;
BTL_VERBOSE(("performing put operation. local address: %p, length: %lu", local_address, (unsigned long) size));
@ -177,12 +185,19 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi
mca_btl_uct_context_lock (context);
/* determine what UCT prototol should be used */
if (size <= uct_btl->super.btl_put_local_registration_threshold) {
use_short = size <= MCA_BTL_UCT_TL_ATTR(uct_btl->rdma_tl, context->context_id).cap.put.max_short;
use_bcopy = !use_short;
}
do {
if (size <= uct_btl->rdma_tl->uct_iface_attr.cap.put.max_short) {
if (use_short) {
ucs_status = uct_ep_put_short (ep_handle, local_address, size, remote_address, rkey.rkey);
} else if (size <= uct_btl->super.btl_put_local_registration_threshold) {
} else if (use_bcopy) {
ssize_t tmp = uct_ep_put_bcopy (ep_handle, mca_btl_uct_put_pack,
&(mca_btl_uct_put_pack_args_t) {.local_address = local_address, .size = size},
&(mca_btl_uct_put_pack_args_t) {.local_address = local_address,
.size = size},
remote_address, rkey.rkey);
ucs_status = (tmp == (ssize_t) size) ? UCS_OK : UCS_ERR_NO_RESOURCE;
} else {
@ -193,8 +208,11 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi
}
/* go ahead and progress the worker while we have the lock */
if (UCS_ERR_NO_RESOURCE != ucs_status) {
(void) uct_worker_progress (context->uct_worker);
if (UCS_ERR_NO_RESOURCE != ucs_status || context->in_am_callback) {
if (!context->in_am_callback) {
(void) uct_worker_progress (context->uct_worker);
}
break;
}
@ -211,9 +229,12 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi
* ourselves. this callback is possibly being made before the data is visible to the
* remote process. */
cbfunc (btl, endpoint, local_address, local_handle, cbcontext, cbdata, OPAL_SUCCESS);
mca_btl_uct_uct_completion_release (comp);
} else if (UCS_INPROGRESS == ucs_status) {
}
if (UCS_INPROGRESS == ucs_status) {
ucs_status = UCS_OK;
} else {
mca_btl_uct_uct_completion_release (comp);
}
uct_rkey_release (&rkey);
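
The put path now picks its protocol once, before the retry loop: short when the size fits cap.put.max_short and is under the local registration threshold, bcopy otherwise below the threshold, and zcopy (with a registration handle) above it. Since this PR also sets btl_put_local_registration_threshold to cap.put.max_bcopy, bcopy covers everything that does not need registration. A sketch of the decision, with the actual UCT calls reduced to comments.

if (size <= uct_btl->super.btl_put_local_registration_threshold) {
    if (size <= MCA_BTL_UCT_TL_ATTR(uct_btl->rdma_tl, context->context_id).cap.put.max_short) {
        /* uct_ep_put_short(): payload is copied inline */
    } else {
        /* uct_ep_put_bcopy(): packed by mca_btl_uct_put_pack() */
    }
} else {
    /* uct_ep_put_zcopy(): needs the local registration handle */
}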

View file

@ -61,11 +61,11 @@ static uint64_t mca_btl_uct_cap_to_btl_atomic_flag[][2] = {
static void mca_btl_uct_module_set_atomic_flags (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl)
{
uint64_t cap_flags = tl->uct_iface_attr.cap.flags;
uint64_t cap_flags = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags;
/* NTH: only use the fetching atomics for now */
uint64_t atomic_flags32 = tl->uct_iface_attr.cap.atomic32.fop_flags;
uint64_t atomic_flags64 = tl->uct_iface_attr.cap.atomic64.fop_flags;
uint64_t atomic_flags32 = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.atomic32.fop_flags;
uint64_t atomic_flags64 = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.atomic64.fop_flags;
/* NTH: don't really have a way to seperate 32-bit and 64-bit right now */
uint64_t all_flags = atomic_flags32 & atomic_flags64;
@ -110,7 +110,7 @@ static uint64_t mca_btl_uct_cap_to_btl_atomic_flag[][2] = {
*/
static void mca_btl_uct_module_set_atomic_flags (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl)
{
uint64_t cap_flags = tl->uct_iface_attr.cap.flags;
uint64_t cap_flags = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags;
module->super.btl_atomic_flags = 0;
@ -164,60 +164,70 @@ OBJ_CLASS_INSTANCE(mca_btl_uct_tl_t, opal_list_item_t, mca_btl_uct_tl_constructo
static ucs_status_t mca_btl_uct_conn_req_cb (void *arg, void *data, size_t length, unsigned flags)
{
mca_btl_uct_module_t *module = (mca_btl_uct_module_t *) arg;
mca_btl_uct_conn_req_t *req = (mca_btl_uct_conn_req_t *) ((uintptr_t) data + 8);
mca_btl_uct_pending_connection_request_t *request = calloc (1, length + sizeof (request->super));
/* it is not safe to process the connection request from the callback so just save it for
* later processing */
OBJ_CONSTRUCT(request, mca_btl_uct_pending_connection_request_t);
memcpy (&request->request_data, (void *) ((intptr_t) data + 8), length);
opal_fifo_push_atomic (&module->pending_connection_reqs, &request->super);
return UCS_OK;
}
OBJ_CLASS_INSTANCE(mca_btl_uct_pending_connection_request_t, opal_list_item_t, NULL, NULL);
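
Connection requests are no longer handled inside the UCT callback: mca_btl_uct_conn_req_cb() above only copies the payload into a pending-request object and pushes it onto the module FIFO, and the component progress function drains it later. The consumer side, condensed from mca_btl_uct_component_progress() earlier in this diff.

mca_btl_uct_pending_connection_request_t *request;

while (NULL != (request = (mca_btl_uct_pending_connection_request_t *)
                    opal_fifo_pop_atomic (&module->pending_connection_reqs))) {
    mca_btl_uct_process_connection_request (module,
        (mca_btl_uct_conn_req_t *) request->request_data);
    OBJ_RELEASE(request);
}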
int mca_btl_uct_process_connection_request (mca_btl_uct_module_t *module, mca_btl_uct_conn_req_t *req)
{
struct opal_proc_t *remote_proc = opal_proc_for_name (req->proc_name);
mca_btl_base_endpoint_t *endpoint = mca_btl_uct_get_ep (&module->super, remote_proc);
mca_btl_uct_tl_endpoint_t *tl_endpoint = endpoint->uct_eps[req->context_id] + req->tl_index;
int64_t type = *((int64_t *) data);
int32_t ep_flags;
int rc;
BTL_VERBOSE(("got connection request for endpoint %p. length = %lu", (void *) endpoint, length));
BTL_VERBOSE(("got connection request for endpoint %p. type = %d. context id = %d",
(void *) endpoint, req->type, req->context_id));
if (NULL == endpoint) {
BTL_ERROR(("could not create endpoint for connection request"));
return UCS_ERR_UNREACHABLE;
}
assert (type < 2);
assert (req->type < 2);
if (0 == type) {
ep_flags = opal_atomic_fetch_or_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC);
if (!(ep_flags & MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) {
/* create any necessary resources */
rc = mca_btl_uct_endpoint_connect (module, endpoint, req->context_id, req->ep_addr, req->tl_index);
if (OPAL_SUCCESS != rc && OPAL_ERR_OUT_OF_RESOURCE != rc) {
BTL_ERROR(("could not setup rdma endpoint"));
return UCS_ERR_UNREACHABLE;
BTL_ERROR(("could not setup rdma endpoint. rc = %d", rc));
return rc;
}
ep_flags = opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC);
} else {
ep_flags = opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY);
}
/* the connection is ready once we have received the connection data and also a connection ready
* message. this might be overkill but there is little documentation at the UCT level on when
* an endpoint can be used. */
if ((ep_flags & (MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY | MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) ==
(MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY | MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) {
if (req->type == 1) {
/* remote side is ready */
mca_btl_uct_base_frag_t *frag;
/* to avoid a race with send adding pending frags grab the lock here */
OPAL_THREAD_LOCK(&endpoint->ep_lock);
(void) opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY);
OPAL_THREAD_UNLOCK(&endpoint->ep_lock);
OPAL_THREAD_SCOPED_LOCK(&endpoint->ep_lock,{
BTL_VERBOSE(("connection ready. sending %d frags", opal_list_get_size (&module->pending_frags)));
(void) opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY);
opal_atomic_wmb ();
opal_atomic_wmb ();
OPAL_THREAD_SCOPED_LOCK(&module->lock, {
OPAL_LIST_FOREACH(frag, &module->pending_frags, mca_btl_uct_base_frag_t) {
if (frag->context_id == req->context_id && endpoint == frag->endpoint) {
if (frag->context->context_id == req->context_id && endpoint == frag->endpoint) {
frag->ready = true;
}
}
});
}
return UCS_OK;
return OPAL_SUCCESS;
}
static int mca_btl_uct_setup_connection_tl (mca_btl_uct_module_t *module)
@ -237,7 +247,20 @@ static int mca_btl_uct_setup_connection_tl (mca_btl_uct_module_t *module)
return UCS_OK == ucs_status ? OPAL_SUCCESS : OPAL_ERROR;
}
mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl, int context_id)
static void mca_btl_uct_context_enable_progress (mca_btl_uct_device_context_t *context)
{
if (!context->progress_enabled) {
#if HAVE_DECL_UCT_PROGRESS_THREAD_SAFE
uct_iface_progress_enable (context->uct_iface, UCT_PROGRESS_THREAD_SAFE | UCT_PROGRESS_SEND |
UCT_PROGRESS_RECV);
#else
uct_iface_progress_enable (context->uct_iface, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
#endif
context->progress_enabled = true;
}
}
mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl, int context_id, bool enable_progress)
{
uct_iface_params_t iface_params = {.rndv_cb = NULL, .eager_cb = NULL, .stats_root = NULL,
.rx_headroom = 0, .open_mode = UCT_IFACE_OPEN_MODE_DEVICE,
@ -245,6 +268,7 @@ mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *
.dev_name = tl->uct_dev_name}}};
mca_btl_uct_device_context_t *context;
ucs_status_t ucs_status;
int rc;
context = calloc (1, sizeof (*context));
if (OPAL_UNLIKELY(NULL == context)) {
@ -255,44 +279,54 @@ mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t *
context->uct_btl = module;
OBJ_CONSTRUCT(&context->completion_fifo, opal_fifo_t);
OBJ_CONSTRUCT(&context->mutex, opal_recursive_mutex_t);
OBJ_CONSTRUCT(&context->rdma_completions, opal_free_list_t);
do {
/* apparently (in contradiction to the spec) UCT is *not* thread safe. because we have to
* use our own locks just go ahead and use UCS_THREAD_MODE_SINGLE. if they ever fix their
* api then change this back to UCS_THREAD_MODE_MULTI and remove the locks around the
* various UCT calls. */
ucs_status = uct_worker_create (module->ucs_async, UCS_THREAD_MODE_SINGLE, &context->uct_worker);
if (UCS_OK != ucs_status) {
BTL_VERBOSE(("could not create a UCT worker"));
mca_btl_uct_context_destroy (context);
context = NULL;
break;
}
rc = opal_free_list_init (&context->rdma_completions, sizeof (mca_btl_uct_uct_completion_t),
opal_cache_line_size, OBJ_CLASS(mca_btl_uct_uct_completion_t),
0, opal_cache_line_size, 0, 4096, 128, NULL, 0, NULL, NULL,
NULL);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
mca_btl_uct_context_destroy (context);
return NULL;
}
ucs_status = uct_iface_open (tl->uct_md->uct_md, context->uct_worker, &iface_params,
tl->uct_tl_config, &context->uct_iface);
if (UCS_OK != ucs_status) {
BTL_VERBOSE(("could not open UCT interface. error code: %d", ucs_status));
mca_btl_uct_context_destroy (context);
context = NULL;
break;
}
/* apparently (in contradiction to the spec) UCT is *not* thread safe. because we have to
* use our own locks just go ahead and use UCS_THREAD_MODE_SINGLE. if they ever fix their
* api then change this back to UCS_THREAD_MODE_MULTI and remove the locks around the
* various UCT calls. */
ucs_status = uct_worker_create (module->ucs_async, UCS_THREAD_MODE_SINGLE, &context->uct_worker);
if (OPAL_UNLIKELY(UCS_OK != ucs_status)) {
BTL_VERBOSE(("could not create a UCT worker"));
mca_btl_uct_context_destroy (context);
return NULL;
}
ucs_status = uct_iface_open (tl->uct_md->uct_md, context->uct_worker, &iface_params,
tl->uct_tl_config, &context->uct_iface);
if (OPAL_UNLIKELY(UCS_OK != ucs_status)) {
BTL_VERBOSE(("could not open UCT interface. error code: %d", ucs_status));
mca_btl_uct_context_destroy (context);
return NULL;
}
/* only need to query one of the interfaces to get the attributes */
ucs_status = uct_iface_query (context->uct_iface, &context->uct_iface_attr);
if (UCS_OK != ucs_status) {
BTL_VERBOSE(("Error querying UCT interface"));
mca_btl_uct_context_destroy (context);
return NULL;
}
if (context_id > 0 && tl == module->am_tl) {
BTL_VERBOSE(("installing AM handler for tl %p context id %d", (void *) tl, context_id));
uct_iface_set_am_handler (context->uct_iface, MCA_BTL_UCT_FRAG, mca_btl_uct_am_handler,
context, UCT_CB_FLAG_SYNC);
}
if (enable_progress) {
BTL_VERBOSE(("enabling progress for tl %p context id %d", (void *) tl, context_id));
#if HAVE_DECL_UCT_PROGRESS_THREAD_SAFE
uct_iface_progress_enable (context->uct_iface, UCT_PROGRESS_THREAD_SAFE | UCT_PROGRESS_SEND |
UCT_PROGRESS_RECV);
#else
uct_iface_progress_enable (context->uct_iface, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
#endif
if (context_id > 0 && tl == module->am_tl) {
BTL_VERBOSE(("installing AM handler for tl %p context id %d", (void *) tl, context_id));
uct_iface_set_am_handler (context->uct_iface, MCA_BTL_UCT_FRAG, mca_btl_uct_am_handler,
context, UCT_CB_FLAG_SYNC);
}
} while (0);
mca_btl_uct_context_enable_progress (context);
}
return context;
}
@ -310,6 +344,7 @@ void mca_btl_uct_context_destroy (mca_btl_uct_device_context_t *context)
}
OBJ_DESTRUCT(&context->completion_fifo);
OBJ_DESTRUCT(&context->rdma_completions);
free (context);
}
@ -324,7 +359,6 @@ static int tl_compare (opal_list_item_t **a, opal_list_item_t **b)
static mca_btl_uct_tl_t *mca_btl_uct_create_tl (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, uct_tl_resource_desc_t *tl_desc, int priority)
{
mca_btl_uct_tl_t *tl = OBJ_NEW(mca_btl_uct_tl_t);
ucs_status_t ucs_status;
if (OPAL_UNLIKELY(NULL == tl)) {
return NULL;
@ -347,22 +381,15 @@ static mca_btl_uct_tl_t *mca_btl_uct_create_tl (mca_btl_uct_module_t *module, mc
(void) uct_md_iface_config_read (md->uct_md, tl_desc->tl_name, NULL, NULL, &tl->uct_tl_config);
/* always create a 0 context (needed to query) */
tl->uct_dev_contexts[0] = mca_btl_uct_context_create (module, tl, 0);
tl->uct_dev_contexts[0] = mca_btl_uct_context_create (module, tl, 0, false);
if (NULL == tl->uct_dev_contexts[0]) {
BTL_VERBOSE(("could not create a uct device context"));
OBJ_RELEASE(tl);
return NULL;
}
/* only need to query one of the interfaces to get the attributes */
ucs_status = uct_iface_query (tl->uct_dev_contexts[0]->uct_iface, &tl->uct_iface_attr);
if (UCS_OK != ucs_status) {
BTL_VERBOSE(("Error querying UCT interface"));
OBJ_RELEASE(tl);
return NULL;
}
BTL_VERBOSE(("Interface CAPS for tl %s::%s: 0x%lx", module->md_name, tl_desc->tl_name, (unsigned long) tl->uct_iface_attr.cap.flags));
BTL_VERBOSE(("Interface CAPS for tl %s::%s: 0x%lx", module->md_name, tl_desc->tl_name,
(unsigned long) MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags));
return tl;
}
@ -373,24 +400,20 @@ static void mca_btl_uct_set_tl_rdma (mca_btl_uct_module_t *module, mca_btl_uct_t
mca_btl_uct_module_set_atomic_flags (module, tl);
module->super.btl_get_limit = tl->uct_iface_attr.cap.get.max_zcopy;
if (tl->uct_iface_attr.cap.get.max_bcopy) {
module->super.btl_get_limit = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.get.max_zcopy;
if (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.get.max_bcopy) {
module->super.btl_get_alignment = 0;
module->super.btl_get_local_registration_threshold = tl->uct_iface_attr.cap.get.max_bcopy;
module->super.btl_get_local_registration_threshold = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.get.max_bcopy;
} else {
/* this is overkill in terms of alignment but we have no way to enforce a minimum get size */
module->super.btl_get_alignment = opal_next_poweroftwo_inclusive (tl->uct_iface_attr.cap.get.min_zcopy);
module->super.btl_get_alignment = opal_next_poweroftwo_inclusive (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.get.min_zcopy);
}
module->super.btl_put_limit = tl->uct_iface_attr.cap.put.max_zcopy;
module->super.btl_put_limit = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.put.max_zcopy;
module->super.btl_put_alignment = 0;
/* no registration needed when using short put */
if (tl->uct_iface_attr.cap.put.max_bcopy > tl->uct_iface_attr.cap.put.max_short) {
module->super.btl_put_local_registration_threshold = tl->uct_iface_attr.cap.put.max_bcopy;
} else {
module->super.btl_put_local_registration_threshold = tl->uct_iface_attr.cap.put.max_short;
}
/* no registration needed when using short/bcopy put */
module->super.btl_put_local_registration_threshold = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.put.max_bcopy;
module->rdma_tl = tl;
OBJ_RETAIN(tl);
@ -420,6 +443,9 @@ static void mca_btl_uct_set_tl_am (mca_btl_uct_module_t *module, mca_btl_uct_tl_
if (tl->max_device_contexts <= 1) {
tl->max_device_contexts = mca_btl_uct_component.num_contexts_per_module;
}
module->super.btl_max_send_size = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.am.max_zcopy - sizeof (mca_btl_uct_am_header_t);
module->super.btl_eager_limit = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.am.max_bcopy - sizeof (mca_btl_uct_am_header_t);
}
static int mca_btl_uct_set_tl_conn (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl)
@ -466,18 +492,23 @@ static int mca_btl_uct_evaluate_tl (mca_btl_uct_module_t *module, mca_btl_uct_tl
}
if (tl == module->rdma_tl || tl == module->am_tl) {
BTL_VERBOSE(("tl has flags 0x%" PRIx64, tl->uct_iface_attr.cap.flags));
module->super.btl_flags |= mca_btl_uct_module_flags (tl->uct_iface_attr.cap.flags);
BTL_VERBOSE(("tl has flags 0x%" PRIx64, MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags));
module->super.btl_flags |= mca_btl_uct_module_flags (MCA_BTL_UCT_TL_ATTR(tl, 0).cap.flags);
/* the bandwidth and latency numbers relate to both rdma and active messages. need to
* come up with a better estimate. */
/* UCT bandwidth is in bytes/sec, BTL is in MB/sec */
module->super.btl_bandwidth = (uint32_t) (tl->uct_iface_attr.bandwidth / 1048576.0);
module->super.btl_bandwidth = (uint32_t) (MCA_BTL_UCT_TL_ATTR(tl, 0).bandwidth / 1048576.0);
/* TODO -- figure out how to translate UCT latency to us */
module->super.btl_latency = 1;
}
if (tl == module->rdma_tl || tl == module->am_tl || tl == module->conn_tl) {
/* make sure progress is enabled on the default context now that we know this TL will be used */
mca_btl_uct_context_enable_progress (tl->uct_dev_contexts[0]);
}
return OPAL_SUCCESS;
}
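
Progress is now enabled lazily: context 0 is created with progress disabled in mca_btl_uct_create_tl(), and mca_btl_uct_context_enable_progress() is called only once the tl is actually selected for RDMA, active messages, or connections. A condensed sketch of the two call sites.

/* in mca_btl_uct_create_tl(): context 0 starts with progress disabled */
tl->uct_dev_contexts[0] = mca_btl_uct_context_create (module, tl, 0, false);

/* later, in mca_btl_uct_evaluate_tl(): turn progress on only for tls that
 * were actually selected */
if (tl == module->rdma_tl || tl == module->am_tl || tl == module->conn_tl) {
    mca_btl_uct_context_enable_progress (tl->uct_dev_contexts[0]);
}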
@ -487,6 +518,7 @@ int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, u
mca_btl_uct_tl_t *tl;
opal_list_t tl_list;
char **tl_filter;
int any_priority = 0;
OBJ_CONSTRUCT(&tl_list, opal_list_t);
@ -499,23 +531,46 @@ int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, u
free (tl_filter[0]);
tl_filter[0] = tmp;
include = false;
} else if (0 == strcmp (tl_filter[0], "any")) {
any = true;
}
/* check for the any keyword */
for (unsigned j = 0 ; tl_filter[j] ; ++j) {
if (0 == strcmp (tl_filter[j], "any")) {
any_priority = j;
any = true;
break;
}
}
if (any && !include) {
opal_argv_free (tl_filter);
return OPAL_ERR_NOT_AVAILABLE;
}
for (unsigned i = 0 ; i < tl_count ; ++i) {
bool try_tl = any;
int priority = 0;
int priority = any_priority;
for (unsigned j = 0 ; tl_filter[j] && !try_tl ; ++j) {
try_tl = (0 == strcmp (tl_filter[j], tl_descs[i].tl_name)) == include;
priority = j;
for (unsigned j = 0 ; tl_filter[j] ; ++j) {
if (0 == strcmp (tl_filter[j], tl_descs[i].tl_name)) {
try_tl = include;
priority = j;
break;
}
}
BTL_VERBOSE(("tl filter: tl_name = %s, use = %d, priority = %d", tl_descs[i].tl_name, try_tl, priority));
if (!try_tl) {
continue;
}
if (0 == strcmp (tl_descs[i].tl_name, "ud")) {
/* ud looks like any normal transport but we do not want to use it for anything other
* than connection management so ensure it gets evaluated last */
priority = INT_MAX;
}
tl = mca_btl_uct_create_tl (module, md, tl_descs + i, priority);
if (tl) {
@ -523,6 +578,8 @@ int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, u
}
}
opal_argv_free (tl_filter);
if (0 == opal_list_get_size (&tl_list)) {
BTL_VERBOSE(("no suitable tls match filter: %s", mca_btl_uct_component.allowed_transports));
OBJ_DESTRUCT(&tl_list);

View file

@ -77,6 +77,9 @@ struct mca_btl_uct_conn_req_t {
/** name of the requesting process */
opal_process_name_t proc_name;
/** request type: 0 == endpoint data, 1 == endpoint data + remote ready */
int type;
/** context id that should be connected */
int context_id;
@ -141,9 +144,21 @@ struct mca_btl_uct_device_context_t {
/** UCT interface handle */
uct_iface_h uct_iface;
/** interface attributes */
uct_iface_attr_t uct_iface_attr;
/** RDMA completions */
opal_free_list_t rdma_completions;
/** complete fragments and rdma operations. this fifo is used to avoid making
* callbacks while holding the device lock. */
opal_fifo_t completion_fifo;
/** progress is enabled on this context */
bool progress_enabled;
/** context is in AM callback */
volatile bool in_am_callback;
};
typedef struct mca_btl_uct_device_context_t mca_btl_uct_device_context_t;
@ -229,8 +244,8 @@ struct mca_btl_uct_base_frag_t {
/** module this fragment is associated with */
struct mca_btl_uct_module_t *btl;
/** context this fragment is waiting on */
int context_id;
/* tl context */
mca_btl_uct_device_context_t *context;
/** is this frag ready to send (only used when pending) */
bool ready;
@ -301,9 +316,6 @@ struct mca_btl_uct_tl_t {
/** device name for this tl (used for creating device contexts) */
char *uct_dev_name;
/** interface attributes */
uct_iface_attr_t uct_iface_attr;
/** maxiumum number of device contexts that can be created */
int max_device_contexts;
@ -318,4 +330,14 @@ struct mca_btl_uct_tl_t {
typedef struct mca_btl_uct_tl_t mca_btl_uct_tl_t;
OBJ_CLASS_DECLARATION(mca_btl_uct_tl_t);
#define MCA_BTL_UCT_TL_ATTR(tl, context_id) (tl)->uct_dev_contexts[(context_id)]->uct_iface_attr
struct mca_btl_uct_pending_connection_request_t {
opal_list_item_t super;
uint8_t request_data[];
};
typedef struct mca_btl_uct_pending_connection_request_t mca_btl_uct_pending_connection_request_t;
OBJ_CLASS_DECLARATION(mca_btl_uct_pending_connection_request_t);
#endif /* !defined(BTL_UCT_TYPES_H) */