From 824afac48349c7c92122385e14a6e0d0a29d18c0 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 11:17:35 +0200 Subject: [PATCH] UCX common: add non-blocking compare-and-swap Signed-off-by: Joseph Schuchart --- opal/mca/common/ucx/common_ucx.h | 41 +++++++++++++-------- opal/mca/common/ucx/common_ucx_wpool.h | 50 ++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 5adbe190c4..2195c543f5 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -206,22 +206,33 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare, uint64_t remote_addr, ucp_rkey_h rkey, ucp_worker_h worker) { - uint64_t tmp = value; - int ret; - - ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp, - op_size, remote_addr, rkey, worker); - if (OPAL_LIKELY(OPAL_SUCCESS == ret)) { - /* in case if op_size is constant (like sizeof(type)) then this condition - * is evaluated in compile time */ - if (op_size == sizeof(uint64_t)) { - *(uint64_t*)result = tmp; - } else { - assert(op_size == sizeof(uint32_t)); - *(uint32_t*)result = tmp; - } + if (op_size == sizeof(uint64_t)) { + *(uint64_t*)result = value; + } else { + assert(op_size == sizeof(uint32_t)); + *(uint32_t*)result = value; } - return ret; + + return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, + op_size, remote_addr, rkey, worker); +} + +static inline +ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare, + uint64_t value, void *result, size_t op_size, + uint64_t remote_addr, ucp_rkey_h rkey, + ucp_send_callback_t req_handler, + ucp_worker_h worker) +{ + if (op_size == sizeof(uint64_t)) { + *(uint64_t*)result = value; + } else { + assert(op_size == sizeof(uint32_t)); + *(uint32_t*)result = value; + } + + return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, + op_size, remote_addr, rkey, req_handler, worker); } END_C_DECLS diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index 79f2163c43..95308fd56c 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -418,6 +418,56 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare, return rc; } + +static inline int +opal_common_ucx_wpmem_cmpswp_nb(opal_common_ucx_wpmem_t *mem, uint64_t compare, + uint64_t value, int target, void *buffer, size_t len, + uint64_t rem_addr, + opal_common_ucx_user_req_handler_t user_req_cb, + void *user_req_ptr) +{ + ucp_ep_h ep; + ucp_rkey_h rkey; + opal_common_ucx_winfo_t *winfo = NULL; + opal_common_ucx_request_t *req; + int rc = OPAL_SUCCESS; + + rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc); + return rc; + } + + /* Perform the operation */ + opal_mutex_lock(&winfo->mutex); + req = opal_common_ucx_atomic_cswap_nb(ep, compare, value, + buffer, len, + rem_addr, rkey, opal_common_ucx_req_completion, + winfo->worker); + + if (UCS_PTR_IS_PTR(req)) { + req->ext_req = user_req_ptr; + req->ext_cb = user_req_cb; + req->winfo = winfo; + } else { + if (user_req_cb != NULL) { + (*user_req_cb)(user_req_ptr); + } + } + + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + + opal_mutex_unlock(&winfo->mutex); + + return rc; +} + + static inline int opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode, uint64_t value, int target, size_t len, uint64_t rem_addr)