diff --git a/opal/mca/btl/uct/btl_uct.h b/opal/mca/btl/uct/btl_uct.h index 989fe64132..3875679443 100644 --- a/opal/mca/btl/uct/btl_uct.h +++ b/opal/mca/btl/uct/btl_uct.h @@ -68,7 +68,7 @@ struct mca_btl_uct_module_t { opal_hash_table_t id_to_endpoint; /** mutex to protect the module */ - opal_mutex_t lock; + opal_recursive_mutex_t lock; /** async context */ ucs_async_context_t *ucs_async; @@ -108,6 +108,9 @@ struct mca_btl_uct_module_t { /** frags that were waiting on connections that are now ready to send */ opal_list_t pending_frags; + + /** pending connection requests */ + opal_fifo_t pending_connection_reqs; }; typedef struct mca_btl_uct_module_t mca_btl_uct_module_t; @@ -278,6 +281,7 @@ ucs_status_t mca_btl_uct_am_handler (void *arg, void *data, size_t length, unsig struct mca_btl_base_endpoint_t *mca_btl_uct_get_ep (struct mca_btl_base_module_t *module, opal_proc_t *proc); int mca_btl_uct_query_tls (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, uct_tl_resource_desc_t *tl_descs, unsigned tl_count); +int mca_btl_uct_process_connection_request (mca_btl_uct_module_t *module, mca_btl_uct_conn_req_t *req); /** * @brief Checks if a tl is suitable for using for RDMA diff --git a/opal/mca/btl/uct/btl_uct_am.c b/opal/mca/btl/uct/btl_uct_am.c index 5d3f0ef042..90ea28eed5 100644 --- a/opal/mca/btl/uct/btl_uct_am.c +++ b/opal/mca/btl/uct/btl_uct_am.c @@ -25,7 +25,7 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt mca_btl_uct_module_t *uct_btl = (mca_btl_uct_module_t *) btl; mca_btl_uct_base_frag_t *frag = NULL; - if ((size + 8) <= (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) { + if (size <= (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) { frag = mca_btl_uct_frag_alloc_short (uct_btl, endpoint); } else if (size <= uct_btl->super.btl_eager_limit) { frag = mca_btl_uct_frag_alloc_eager (uct_btl, endpoint); @@ -40,6 +40,10 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_bt frag->base.des_flags = flags; frag->base.order = order; frag->uct_iov.length = size; + if (NULL != frag->base.super.registration) { + /* zero-copy fragments will need callbacks */ + frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; + } } return (mca_btl_base_descriptor_t *) frag; @@ -95,14 +99,18 @@ struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src (mca_btl_base_module_t return NULL; } + frag->uct_iov.length = total_size; frag->base.order = order; frag->base.des_flags = flags; if (total_size > (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) { + frag->segments[0].seg_len = reserve; frag->segments[1].seg_len = *size; frag->segments[1].seg_addr.pval = data_ptr; frag->base.des_segment_count = 2; } else { + frag->segments[0].seg_len = total_size; memcpy ((void *)((intptr_t) frag->segments[1].seg_addr.pval + reserve), data_ptr, *size); + frag->base.des_segment_count = 1; } } @@ -130,7 +138,7 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg) data = (void *)((intptr_t) data + 8); /* this function should only ever get called with fragments with two segments */ - for (size_t i = 0 ; i < 2 ; ++i) { + for (size_t i = 0 ; i < frag->base.des_segment_count ; ++i) { const size_t seg_len = frag->segments[i].seg_len; memcpy (data, frag->segments[i].seg_addr.pval, seg_len); data = (void *)((intptr_t) data + seg_len); @@ -140,57 +148,84 @@ static size_t mca_btl_uct_send_frag_pack (void *data, void *arg) return length; } -int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint, mca_btl_uct_base_frag_t *frag, - int32_t flags, mca_btl_uct_device_context_t *context, uct_ep_h ep_handle) +static void mca_btl_uct_append_pending_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag, + mca_btl_uct_device_context_t *context, bool ready) { + frag->ready = ready; + frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; + opal_atomic_wmb (); + + opal_list_append (&uct_btl->pending_frags, (opal_list_item_t *) frag); +} + +int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag, bool append) +{ + mca_btl_uct_device_context_t *context = frag->context; + const ssize_t msg_size = frag->uct_iov.length + 8; + ssize_t size; ucs_status_t ucs_status; + uct_ep_h ep_handle = NULL; - mca_btl_uct_context_lock (context); + /* if we get here then we must have an endpoint handle for this context/endpoint pair */ + (void) mca_btl_uct_endpoint_test_am (uct_btl, frag->endpoint, frag->context, &ep_handle); + assert (NULL != ep_handle); - do { + /* if another thread set this we really don't care too much as this flag is only meant + * to protect against deep recursion */ + if (!context->in_am_callback) { + mca_btl_uct_context_lock (context); + /* attempt to post the fragment */ if (NULL != frag->base.super.registration) { frag->comp.dev_context = context; - ucs_status = uct_ep_am_zcopy (ep_handle, MCA_BTL_UCT_FRAG, &frag->header, sizeof (frag->header), &frag->uct_iov, 1, 0, &frag->comp.uct_comp); + + if (OPAL_LIKELY(UCS_INPROGRESS == ucs_status)) { + uct_worker_progress (context->uct_worker); + mca_btl_uct_context_unlock (context); + return OPAL_SUCCESS; + } } else { /* short message */ - /* restore original flags */ - frag->base.des_flags = flags; - - if (1 == frag->base.des_segment_count) { + if (1 == frag->base.des_segment_count && (frag->uct_iov.length + 8) < MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, 0).cap.am.max_short) { ucs_status = uct_ep_am_short (ep_handle, MCA_BTL_UCT_FRAG, frag->header.value, frag->uct_iov.buffer, frag->uct_iov.length); - } else { - ucs_status = uct_ep_am_bcopy (ep_handle, MCA_BTL_UCT_FRAG, mca_btl_uct_send_frag_pack, frag, 0); + + if (OPAL_LIKELY(UCS_OK == ucs_status)) { + uct_worker_progress (context->uct_worker); + mca_btl_uct_context_unlock (context); + /* send is complete */ + mca_btl_uct_frag_complete (frag, OPAL_SUCCESS); + return 1; + } + } + + size = uct_ep_am_bcopy (ep_handle, MCA_BTL_UCT_FRAG, mca_btl_uct_send_frag_pack, frag, 0); + if (OPAL_LIKELY(size == msg_size)) { + uct_worker_progress (context->uct_worker); + mca_btl_uct_context_unlock (context); + /* send is complete */ + mca_btl_uct_frag_complete (frag, OPAL_SUCCESS); + return 1; } } - if (UCS_ERR_NO_RESOURCE != ucs_status) { - /* go ahead and progress the worker while we have the lock */ - (void) uct_worker_progress (context->uct_worker); - break; - } + /* wait for something to happen */ + uct_worker_progress (context->uct_worker); + mca_btl_uct_context_unlock (context); - /* wait for something to complete before trying again */ - while (!uct_worker_progress (context->uct_worker)); - } while (1); - - mca_btl_uct_context_unlock (context); - - if (UCS_OK == ucs_status) { - /* restore original flags */ - frag->base.des_flags = flags; - /* send is complete */ - mca_btl_uct_frag_complete (frag, OPAL_SUCCESS); - return 1; + mca_btl_uct_device_handle_completions (context); } - if (OPAL_UNLIKELY(UCS_INPROGRESS != ucs_status)) { + if (!append) { return OPAL_ERR_OUT_OF_RESOURCE; } - return 0; + OPAL_THREAD_LOCK(&uct_btl->lock); + mca_btl_uct_append_pending_frag (uct_btl, frag, context, true); + OPAL_THREAD_UNLOCK(&uct_btl->lock); + + return OPAL_SUCCESS; } int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, mca_btl_base_descriptor_t *descriptor, @@ -199,7 +234,6 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo mca_btl_uct_module_t *uct_btl = (mca_btl_uct_module_t *) btl; mca_btl_uct_device_context_t *context = mca_btl_uct_module_get_am_context (uct_btl); mca_btl_uct_base_frag_t *frag = (mca_btl_uct_base_frag_t *) descriptor; - int flags = frag->base.des_flags; uct_ep_h ep_handle; int rc; @@ -208,28 +242,21 @@ int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo frag->header.data.tag = tag; - - /* add the callback flag before posting to avoid potential races with other threads */ - frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK; + frag->context = context; rc = mca_btl_uct_endpoint_check_am (uct_btl, endpoint, context, &ep_handle); if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { - OPAL_THREAD_LOCK(&endpoint->ep_lock); + OPAL_THREAD_LOCK(&uct_btl->lock); /* check one more time in case another thread is completing the connection now */ if (OPAL_SUCCESS != mca_btl_uct_endpoint_test_am (uct_btl, endpoint, context, &ep_handle)) { - frag->context_id = context->context_id; - frag->ready = false; - OPAL_THREAD_LOCK(&uct_btl->lock); - opal_list_append (&uct_btl->pending_frags, (opal_list_item_t *) frag); - OPAL_THREAD_UNLOCK(&endpoint->ep_lock); + mca_btl_uct_append_pending_frag (uct_btl, frag, context, false); OPAL_THREAD_UNLOCK(&uct_btl->lock); - return OPAL_SUCCESS; } - OPAL_THREAD_UNLOCK(&endpoint->ep_lock); + OPAL_THREAD_UNLOCK(&uct_btl->lock); } - return mca_btl_uct_send_frag (uct_btl, endpoint, frag, flags, context, ep_handle); + return mca_btl_uct_send_frag (uct_btl, frag, true); } struct mca_btl_uct_sendi_pack_args_t { @@ -255,9 +282,7 @@ static size_t mca_btl_uct_sendi_pack (void *data, void *arg) static inline size_t mca_btl_uct_max_sendi (mca_btl_uct_module_t *uct_btl, int context_id) { - const mca_btl_uct_tl_t *tl = uct_btl->am_tl; - return (MCA_BTL_UCT_TL_ATTR(tl, context_id).cap.am.max_short > MCA_BTL_UCT_TL_ATTR(tl, context_id).cap.am.max_bcopy) ? - MCA_BTL_UCT_TL_ATTR(tl, context_id).cap.am.max_short : MCA_BTL_UCT_TL_ATTR(tl, context_id).cap.am.max_bcopy; + return MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, context_id).cap.am.max_bcopy; } int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, opal_convertor_t *convertor, @@ -270,7 +295,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp /* message with header */ const size_t msg_size = total_size + 8; mca_btl_uct_am_header_t am_header; - ucs_status_t ucs_status = UCS_OK; + ucs_status_t ucs_status = UCS_ERR_NO_RESOURCE; uct_ep_h ep_handle; int rc; diff --git a/opal/mca/btl/uct/btl_uct_am.h b/opal/mca/btl/uct/btl_uct_am.h index 97cf46bb66..9035540e71 100644 --- a/opal/mca/btl/uct/btl_uct_am.h +++ b/opal/mca/btl/uct/btl_uct_am.h @@ -27,8 +27,7 @@ int mca_btl_uct_sendi (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endp int mca_btl_uct_send (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, mca_btl_base_descriptor_t *descriptor, mca_btl_base_tag_t tag); -int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint, mca_btl_uct_base_frag_t *frag, - int32_t flags, mca_btl_uct_device_context_t *context, uct_ep_h ep_handle); +int mca_btl_uct_send_frag (mca_btl_uct_module_t *uct_btl, mca_btl_uct_base_frag_t *frag, bool append); mca_btl_base_descriptor_t *mca_btl_uct_alloc (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoint, uint8_t order, size_t size, uint32_t flags); diff --git a/opal/mca/btl/uct/btl_uct_component.c b/opal/mca/btl/uct/btl_uct_component.c index 70f1726787..efa19656c6 100644 --- a/opal/mca/btl/uct/btl_uct_component.c +++ b/opal/mca/btl/uct/btl_uct_component.c @@ -272,7 +272,8 @@ static mca_btl_uct_module_t *mca_btl_uct_alloc_module (const char *md_name, mca_ OBJ_CONSTRUCT(&module->eager_frags, opal_free_list_t); OBJ_CONSTRUCT(&module->max_frags, opal_free_list_t); OBJ_CONSTRUCT(&module->pending_frags, opal_list_t); - OBJ_CONSTRUCT(&module->lock, opal_mutex_t); + OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t); + OBJ_CONSTRUCT(&module->pending_connection_reqs, opal_fifo_t); module->md = md; module->md_name = strdup (md_name); @@ -298,10 +299,13 @@ ucs_status_t mca_btl_uct_am_handler (void *arg, void *data, size_t length, unsig .seg_len = length - sizeof (*header)}; mca_btl_uct_base_frag_t frag = {.base = {.des_segments = &seg, .des_segment_count = 1}}; + /* prevent recursion */ + tl_context->in_am_callback = true; + reg = mca_btl_base_active_message_trigger + header->data.tag; - mca_btl_uct_context_unlock (tl_context); reg->cbfunc (&uct_btl->super, header->data.tag, &frag.base, reg->cbdata); - mca_btl_uct_context_lock (tl_context); + + tl_context->in_am_callback = false; return UCS_OK; } @@ -491,8 +495,7 @@ static int mca_btl_uct_component_progress_pending (mca_btl_uct_module_t *uct_btl opal_list_remove_item (&uct_btl->pending_frags, (opal_list_item_t *) frag); - if (OPAL_SUCCESS > mca_btl_uct_send (&uct_btl->super, frag->endpoint, &frag->base, - frag->header.data.tag)) { + if (OPAL_SUCCESS > mca_btl_uct_send_frag (uct_btl, frag, false)) { opal_list_prepend (&uct_btl->pending_frags, (opal_list_item_t *) frag); } } @@ -523,9 +526,16 @@ static int mca_btl_uct_component_progress (void) } if (module->conn_tl) { + mca_btl_uct_pending_connection_request_t *request; + if (module->conn_tl != module->am_tl && module->conn_tl != module->rdma_tl) { ret += mca_btl_uct_tl_progress (module->conn_tl, 0); } + + while (NULL != (request = (mca_btl_uct_pending_connection_request_t *) opal_fifo_pop_atomic (&module->pending_connection_reqs))) { + mca_btl_uct_process_connection_request (module, (mca_btl_uct_conn_req_t *) request->request_data); + OBJ_RELEASE(request); + } } if (0 != opal_list_get_size (&module->pending_frags)) { diff --git a/opal/mca/btl/uct/btl_uct_device_context.h b/opal/mca/btl/uct/btl_uct_device_context.h index 4d1120b6e2..f88e431471 100644 --- a/opal/mca/btl/uct/btl_uct_device_context.h +++ b/opal/mca/btl/uct/btl_uct_device_context.h @@ -89,15 +89,12 @@ mca_btl_uct_module_get_tl_context_specific (mca_btl_uct_module_t *module, mca_bt mca_btl_uct_device_context_t *context = tl->uct_dev_contexts[context_id]; if (OPAL_UNLIKELY(NULL == context)) { - mca_btl_uct_device_context_t *new_context; - - new_context = mca_btl_uct_context_create (module, tl, context_id, true); - if (!opal_atomic_compare_exchange_strong_ptr ((opal_atomic_intptr_t *) &tl->uct_dev_contexts[context_id], - (intptr_t *) &context, (intptr_t) new_context)) { - mca_btl_uct_context_destroy (new_context); - } else { - context = new_context; + OPAL_THREAD_LOCK(&module->lock); + context = tl->uct_dev_contexts[context_id]; + if (OPAL_UNLIKELY(NULL == context)) { + context = tl->uct_dev_contexts[context_id] = mca_btl_uct_context_create (module, tl, context_id, true); } + OPAL_THREAD_UNLOCK(&module->lock); } return context; diff --git a/opal/mca/btl/uct/btl_uct_endpoint.c b/opal/mca/btl/uct/btl_uct_endpoint.c index e0d39dee55..804820bbba 100644 --- a/opal/mca/btl/uct/btl_uct_endpoint.c +++ b/opal/mca/btl/uct/btl_uct_endpoint.c @@ -56,7 +56,7 @@ mca_btl_base_endpoint_t *mca_btl_uct_endpoint_create (opal_proc_t *proc) static unsigned char *mca_btl_uct_process_modex_tl (unsigned char *modex_data) { - BTL_VERBOSE(("processing modex for tl %s. size: %u", modex_data, *((uint32_t *) modex_data))); + BTL_VERBOSE(("processing modex for tl %s. size: %u", modex_data + 4, *((uint32_t *) modex_data))); /* skip size and name */ return modex_data + 4 + strlen ((char *) modex_data + 4) + 1; @@ -139,13 +139,13 @@ OBJ_CLASS_INSTANCE(mca_btl_uct_connection_ep_t, opal_object_t, mca_btl_uct_conne static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mca_btl_base_endpoint_t *endpoint, mca_btl_uct_device_context_t *conn_tl_context, - int64_t type, void *request, size_t request_length) + mca_btl_uct_conn_req_t *request, size_t request_length) { mca_btl_uct_connection_ep_t *conn_ep = endpoint->conn_ep; ucs_status_t ucs_status; - BTL_VERBOSE(("sending connection request to peer. type: %" PRId64 ", length: %" PRIsize_t, - type, request_length)); + BTL_VERBOSE(("sending connection request to peer. context id: %d, type: %d, length: %" PRIsize_t, + request->context_id, request->type, request_length)); OBJ_RETAIN(endpoint->conn_ep); @@ -154,7 +154,8 @@ static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mc do { MCA_BTL_UCT_CONTEXT_SERIALIZE(conn_tl_context, { - ucs_status = uct_ep_am_short (conn_ep->uct_ep, MCA_BTL_UCT_CONNECT_RDMA, type, request, request_length); + ucs_status = uct_ep_am_short (conn_ep->uct_ep, MCA_BTL_UCT_CONNECT_RDMA, request->type, request, + request_length); }); if (OPAL_LIKELY(UCS_OK == ucs_status)) { break; @@ -169,12 +170,10 @@ static int mca_btl_uct_endpoint_send_conn_req (mca_btl_uct_module_t *uct_btl, mc } while (1); /* for now we just wait for the connection request to complete before continuing */ - MCA_BTL_UCT_CONTEXT_SERIALIZE(conn_tl_context, { - do { - uct_worker_progress (conn_tl_context->uct_worker); - ucs_status = uct_ep_flush (conn_ep->uct_ep, 0, NULL); - } while (UCS_INPROGRESS == ucs_status); - }); + do { + ucs_status = uct_ep_flush (conn_ep->uct_ep, 0, NULL); + mca_btl_uct_context_progress (conn_tl_context); + } while (UCS_INPROGRESS == ucs_status); opal_mutex_lock (&endpoint->ep_lock); @@ -232,6 +231,7 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl, request->proc_name = OPAL_PROC_MY_NAME; request->context_id = tl_context->context_id; request->tl_index = tl->tl_index; + request->type = !!(ep_addr); if (NULL == tl_endpoint->uct_ep) { BTL_VERBOSE(("allocating endpoint for peer %s and sending connection data", @@ -244,48 +244,37 @@ static int mca_btl_uct_endpoint_connect_endpoint (mca_btl_uct_module_t *uct_btl, OBJ_RELEASE(endpoint->conn_ep); return OPAL_ERROR; } + } - /* fill in connection request */ - ucs_status = uct_ep_get_address (tl_endpoint->uct_ep, (uct_ep_addr_t *) request->ep_addr); + if (ep_addr) { + BTL_VERBOSE(("using remote endpoint address to connect endpoint for tl %s, index %d. ep_addr = %p", + tl->uct_tl_name, tl_context->context_id, ep_addr)); + + /* NTH: there is no need to lock the device context in this case */ + ucs_status = uct_ep_connect_to_ep (tl_endpoint->uct_ep, (uct_device_addr_t *) tl_data, ep_addr); if (UCS_OK != ucs_status) { - /* this is a fatal a fatal error */ - OBJ_RELEASE(endpoint->conn_ep); - uct_ep_destroy (tl_endpoint->uct_ep); - tl_endpoint->uct_ep = NULL; - return OPAL_ERROR; - } - - rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, 0, request, - request_length); - if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { - OBJ_RELEASE(endpoint->conn_ep); - uct_ep_destroy (tl_endpoint->uct_ep); - tl_endpoint->uct_ep = NULL; return OPAL_ERROR; } } - if (ep_addr) { - BTL_VERBOSE(("using remote endpoint address to connect endpoint. ep_addr = %p", ep_addr)); + /* fill in connection request */ + ucs_status = uct_ep_get_address (tl_endpoint->uct_ep, (uct_ep_addr_t *) request->ep_addr); + if (UCS_OK != ucs_status) { + /* this is a fatal a fatal error */ + OBJ_RELEASE(endpoint->conn_ep); + uct_ep_destroy (tl_endpoint->uct_ep); + tl_endpoint->uct_ep = NULL; + return OPAL_ERROR; + } - device_addr = (uct_device_addr_t *) tl_data; - - /* NTH: there is no need to lock the device context in this case */ - ucs_status = uct_ep_connect_to_ep (tl_endpoint->uct_ep, device_addr, ep_addr); - if (UCS_OK != ucs_status) { - return OPAL_ERROR; - } - - /* let the remote side know that the connection has been established and - * wait for the message to be sent */ - rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, 1, request, - sizeof (mca_btl_uct_conn_req_t)); - if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { - OBJ_RELEASE(endpoint->conn_ep); - uct_ep_destroy (tl_endpoint->uct_ep); - tl_endpoint->uct_ep = NULL; - return OPAL_ERROR; - } + /* let the remote side know that the connection has been established and + * wait for the message to be sent */ + rc = mca_btl_uct_endpoint_send_conn_req (uct_btl, endpoint, conn_tl_context, request, request_length); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + OBJ_RELEASE(endpoint->conn_ep); + uct_ep_destroy (tl_endpoint->uct_ep); + tl_endpoint->uct_ep = NULL; + return OPAL_ERROR; } return (tl_endpoint->flags & MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY) ? OPAL_SUCCESS : OPAL_ERR_OUT_OF_RESOURCE; diff --git a/opal/mca/btl/uct/btl_uct_endpoint.h b/opal/mca/btl/uct/btl_uct_endpoint.h index 9a264bddbb..6add6f2719 100644 --- a/opal/mca/btl/uct/btl_uct_endpoint.h +++ b/opal/mca/btl/uct/btl_uct_endpoint.h @@ -72,7 +72,8 @@ static inline int mca_btl_uct_endpoint_check (mca_btl_uct_module_t *module, mca_ rc = mca_btl_uct_endpoint_connect (module, endpoint, ep_index, NULL, tl_index); *ep_handle = endpoint->uct_eps[ep_index][tl_index].uct_ep; - BTL_VERBOSE(("mca_btl_uct_endpoint_connect returned %d", rc)); + BTL_VERBOSE(("mca_btl_uct_endpoint_connect returned %d. context id = %d, flags = 0x%x", rc, ep_index, + MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY & endpoint->uct_eps[ep_index][tl_index].flags)); return rc; } diff --git a/opal/mca/btl/uct/btl_uct_module.c b/opal/mca/btl/uct/btl_uct_module.c index ebd7ab6d68..f080286754 100644 --- a/opal/mca/btl/uct/btl_uct_module.c +++ b/opal/mca/btl/uct/btl_uct_module.c @@ -74,7 +74,6 @@ static int mca_btl_uct_add_procs (mca_btl_base_module_t *btl, if (false == uct_module->initialized) { mca_btl_uct_tl_t *am_tl = uct_module->am_tl; - mca_btl_uct_tl_t *rdma_tl = uct_module->rdma_tl; /* NTH: might want to vary this size based off the universe size (if * one exists). the table is only used for connection lookup and @@ -277,6 +276,7 @@ int mca_btl_uct_finalize (mca_btl_base_module_t* btl) OBJ_DESTRUCT(&uct_module->max_frags); OBJ_DESTRUCT(&uct_module->pending_frags); OBJ_DESTRUCT(&uct_module->lock); + OBJ_DESTRUCT(&uct_module->pending_connection_reqs); if (uct_module->rcache) { mca_rcache_base_module_destroy (uct_module->rcache); diff --git a/opal/mca/btl/uct/btl_uct_rdma.c b/opal/mca/btl/uct/btl_uct_rdma.c index 708dea148d..2d2d1c3f04 100644 --- a/opal/mca/btl/uct/btl_uct_rdma.c +++ b/opal/mca/btl/uct/btl_uct_rdma.c @@ -107,19 +107,24 @@ int mca_btl_uct_get (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi ucs_status = uct_ep_get_zcopy (ep_handle, &iov, 1, remote_address, rkey.rkey, &comp->uct_comp); } - /* go ahead and progress the worker while we have the lock */ - (void) uct_worker_progress (context->uct_worker); + /* go ahead and progress the worker while we have the lock (if we are not in an AM callback) */ + if (!context->in_am_callback) { + (void) uct_worker_progress (context->uct_worker); + } mca_btl_uct_context_unlock (context); - mca_btl_uct_device_handle_completions (context); + if (!context->in_am_callback) { + mca_btl_uct_device_handle_completions (context); + } if (UCS_OK == ucs_status && cbfunc) { /* if UCS_OK is returned the callback will never fire so we have to make the callback * ourselves */ cbfunc (btl, endpoint, local_address, local_handle, cbcontext, cbdata, OPAL_SUCCESS); - mca_btl_uct_uct_completion_release (comp); - } else if (UCS_INPROGRESS == ucs_status) { + } + + if (UCS_INPROGRESS == ucs_status) { ucs_status = UCS_OK; } else { mca_btl_uct_uct_completion_release (comp); @@ -203,8 +208,11 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi } /* go ahead and progress the worker while we have the lock */ - if (UCS_ERR_NO_RESOURCE != ucs_status) { - (void) uct_worker_progress (context->uct_worker); + if (UCS_ERR_NO_RESOURCE != ucs_status || context->in_am_callback) { + if (!context->in_am_callback) { + (void) uct_worker_progress (context->uct_worker); + } + break; } @@ -221,9 +229,12 @@ int mca_btl_uct_put (mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpoi * ourselves. this callback is possibly being made before the data is visible to the * remote process. */ cbfunc (btl, endpoint, local_address, local_handle, cbcontext, cbdata, OPAL_SUCCESS); - mca_btl_uct_uct_completion_release (comp); - } else if (UCS_INPROGRESS == ucs_status) { + } + + if (UCS_INPROGRESS == ucs_status) { ucs_status = UCS_OK; + } else { + mca_btl_uct_uct_completion_release (comp); } uct_rkey_release (&rkey); diff --git a/opal/mca/btl/uct/btl_uct_tl.c b/opal/mca/btl/uct/btl_uct_tl.c index bca62c4813..721015fb2a 100644 --- a/opal/mca/btl/uct/btl_uct_tl.c +++ b/opal/mca/btl/uct/btl_uct_tl.c @@ -164,60 +164,70 @@ OBJ_CLASS_INSTANCE(mca_btl_uct_tl_t, opal_list_item_t, mca_btl_uct_tl_constructo static ucs_status_t mca_btl_uct_conn_req_cb (void *arg, void *data, size_t length, unsigned flags) { mca_btl_uct_module_t *module = (mca_btl_uct_module_t *) arg; - mca_btl_uct_conn_req_t *req = (mca_btl_uct_conn_req_t *) ((uintptr_t) data + 8); + mca_btl_uct_pending_connection_request_t *request = calloc (1, length + sizeof (request->super)); + + /* it is not safe to process the connection request from the callback so just save it for + * later processing */ + OBJ_CONSTRUCT(request, mca_btl_uct_pending_connection_request_t); + memcpy (&request->request_data, (void *) ((intptr_t) data + 8), length); + opal_fifo_push_atomic (&module->pending_connection_reqs, &request->super); + + return UCS_OK; +} + +OBJ_CLASS_INSTANCE(mca_btl_uct_pending_connection_request_t, opal_list_item_t, NULL, NULL); + +int mca_btl_uct_process_connection_request (mca_btl_uct_module_t *module, mca_btl_uct_conn_req_t *req) +{ struct opal_proc_t *remote_proc = opal_proc_for_name (req->proc_name); mca_btl_base_endpoint_t *endpoint = mca_btl_uct_get_ep (&module->super, remote_proc); mca_btl_uct_tl_endpoint_t *tl_endpoint = endpoint->uct_eps[req->context_id] + req->tl_index; - int64_t type = *((int64_t *) data); int32_t ep_flags; int rc; - BTL_VERBOSE(("got connection request for endpoint %p. length = %lu", (void *) endpoint, length)); + BTL_VERBOSE(("got connection request for endpoint %p. type = %d. context id = %d", + (void *) endpoint, req->type, req->context_id)); if (NULL == endpoint) { BTL_ERROR(("could not create endpoint for connection request")); return UCS_ERR_UNREACHABLE; } - assert (type < 2); + assert (req->type < 2); - if (0 == type) { + ep_flags = opal_atomic_fetch_or_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC); + + if (!(ep_flags & MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) { /* create any necessary resources */ rc = mca_btl_uct_endpoint_connect (module, endpoint, req->context_id, req->ep_addr, req->tl_index); if (OPAL_SUCCESS != rc && OPAL_ERR_OUT_OF_RESOURCE != rc) { - BTL_ERROR(("could not setup rdma endpoint")); - return UCS_ERR_UNREACHABLE; + BTL_ERROR(("could not setup rdma endpoint. rc = %d", rc)); + return rc; } - - ep_flags = opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC); - } else { - ep_flags = opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY); } /* the connection is ready once we have received the connection data and also a connection ready * message. this might be overkill but there is little documentation at the UCT level on when * an endpoint can be used. */ - if ((ep_flags & (MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY | MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) == - (MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REM_READY | MCA_BTL_UCT_ENDPOINT_FLAG_CONN_REC)) { + if (req->type == 1) { + /* remote side is ready */ mca_btl_uct_base_frag_t *frag; /* to avoid a race with send adding pending frags grab the lock here */ - OPAL_THREAD_LOCK(&endpoint->ep_lock); - (void) opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY); - OPAL_THREAD_UNLOCK(&endpoint->ep_lock); + OPAL_THREAD_SCOPED_LOCK(&endpoint->ep_lock,{ + BTL_VERBOSE(("connection ready. sending %d frags", opal_list_get_size (&module->pending_frags))); + (void) opal_atomic_or_fetch_32 (&tl_endpoint->flags, MCA_BTL_UCT_ENDPOINT_FLAG_CONN_READY); + opal_atomic_wmb (); - opal_atomic_wmb (); - - OPAL_THREAD_SCOPED_LOCK(&module->lock, { OPAL_LIST_FOREACH(frag, &module->pending_frags, mca_btl_uct_base_frag_t) { - if (frag->context_id == req->context_id && endpoint == frag->endpoint) { + if (frag->context->context_id == req->context_id && endpoint == frag->endpoint) { frag->ready = true; } } }); } - return UCS_OK; + return OPAL_SUCCESS; } static int mca_btl_uct_setup_connection_tl (mca_btl_uct_module_t *module) @@ -284,7 +294,7 @@ mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t * * use our own locks just go ahead and use UCS_THREAD_MODE_SINGLE. if they ever fix their * api then change this back to UCS_THREAD_MODE_MULTI and remove the locks around the * various UCT calls. */ - ucs_status = uct_worker_create (module->ucs_async, UCS_THREAD_MODE_SERIALIZED, &context->uct_worker); + ucs_status = uct_worker_create (module->ucs_async, UCS_THREAD_MODE_SINGLE, &context->uct_worker); if (OPAL_UNLIKELY(UCS_OK != ucs_status)) { BTL_VERBOSE(("could not create a UCT worker")); mca_btl_uct_context_destroy (context); @@ -307,17 +317,17 @@ mca_btl_uct_device_context_t *mca_btl_uct_context_create (mca_btl_uct_module_t * return NULL; } - if (enable_progress) { - BTL_VERBOSE(("enabling progress for tl %p context id %d", (void *) tl, context_id)); - mca_btl_uct_context_enable_progress (context); - } - if (context_id > 0 && tl == module->am_tl) { BTL_VERBOSE(("installing AM handler for tl %p context id %d", (void *) tl, context_id)); uct_iface_set_am_handler (context->uct_iface, MCA_BTL_UCT_FRAG, mca_btl_uct_am_handler, context, UCT_CB_FLAG_SYNC); } + if (enable_progress) { + BTL_VERBOSE(("enabling progress for tl %p context id %d", (void *) tl, context_id)); + mca_btl_uct_context_enable_progress (context); + } + return context; } @@ -349,7 +359,6 @@ static int tl_compare (opal_list_item_t **a, opal_list_item_t **b) static mca_btl_uct_tl_t *mca_btl_uct_create_tl (mca_btl_uct_module_t *module, mca_btl_uct_md_t *md, uct_tl_resource_desc_t *tl_desc, int priority) { mca_btl_uct_tl_t *tl = OBJ_NEW(mca_btl_uct_tl_t); - ucs_status_t ucs_status; if (OPAL_UNLIKELY(NULL == tl)) { return NULL; @@ -434,6 +443,9 @@ static void mca_btl_uct_set_tl_am (mca_btl_uct_module_t *module, mca_btl_uct_tl_ if (tl->max_device_contexts <= 1) { tl->max_device_contexts = mca_btl_uct_component.num_contexts_per_module; } + + module->super.btl_max_send_size = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.am.max_zcopy - sizeof (mca_btl_uct_am_header_t); + module->super.btl_eager_limit = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.am.max_bcopy - sizeof (mca_btl_uct_am_header_t); } static int mca_btl_uct_set_tl_conn (mca_btl_uct_module_t *module, mca_btl_uct_tl_t *tl) diff --git a/opal/mca/btl/uct/btl_uct_types.h b/opal/mca/btl/uct/btl_uct_types.h index 1df08eb199..4b0df21e2a 100644 --- a/opal/mca/btl/uct/btl_uct_types.h +++ b/opal/mca/btl/uct/btl_uct_types.h @@ -77,6 +77,9 @@ struct mca_btl_uct_conn_req_t { /** name of the requesting process */ opal_process_name_t proc_name; + /** request type: 0 == endpoint data, 1 == endpoint data + remote ready */ + int type; + /** context id that should be connected */ int context_id; @@ -153,6 +156,9 @@ struct mca_btl_uct_device_context_t { /** progress is enabled on this context */ bool progress_enabled; + + /** context is in AM callback */ + volatile bool in_am_callback; }; typedef struct mca_btl_uct_device_context_t mca_btl_uct_device_context_t; @@ -238,8 +244,8 @@ struct mca_btl_uct_base_frag_t { /** module this fragment is associated with */ struct mca_btl_uct_module_t *btl; - /** context this fragment is waiting on */ - int context_id; + /* tl context */ + mca_btl_uct_device_context_t *context; /** is this frag ready to send (only used when pending) */ bool ready; @@ -326,4 +332,12 @@ OBJ_CLASS_DECLARATION(mca_btl_uct_tl_t); #define MCA_BTL_UCT_TL_ATTR(tl, context_id) (tl)->uct_dev_contexts[(context_id)]->uct_iface_attr +struct mca_btl_uct_pending_connection_request_t { + opal_list_item_t super; + uint8_t request_data[]; +}; + +typedef struct mca_btl_uct_pending_connection_request_t mca_btl_uct_pending_connection_request_t; +OBJ_CLASS_DECLARATION(mca_btl_uct_pending_connection_request_t); + #endif /* !defined(BTL_UCT_TYPES_H) */