1
1

UCX common: add non-blocking compare-and-swap

Signed-off-by: Joseph Schuchart <schuchart@hlrs.de>
Этот коммит содержится в:
Joseph Schuchart 2020-04-03 11:17:35 +02:00
родитель 5f786bcce4
Коммит 824afac483
2 изменённых файлов: 76 добавлений и 15 удалений

Просмотреть файл

@ -206,22 +206,33 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker)
{
uint64_t tmp = value;
int ret;
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
op_size, remote_addr, rkey, worker);
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
/* in case if op_size is constant (like sizeof(type)) then this condition
* is evaluated in compile time */
if (op_size == sizeof(uint64_t)) {
*(uint64_t*)result = tmp;
*(uint64_t*)result = value;
} else {
assert(op_size == sizeof(uint32_t));
*(uint32_t*)result = tmp;
*(uint32_t*)result = value;
}
return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
op_size, remote_addr, rkey, worker);
}
static inline
ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_send_callback_t req_handler,
ucp_worker_h worker)
{
if (op_size == sizeof(uint64_t)) {
*(uint64_t*)result = value;
} else {
assert(op_size == sizeof(uint32_t));
*(uint32_t*)result = value;
}
return ret;
return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
op_size, remote_addr, rkey, req_handler, worker);
}
END_C_DECLS

Просмотреть файл

@ -418,6 +418,56 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
return rc;
}
static inline int
opal_common_ucx_wpmem_cmpswp_nb(opal_common_ucx_wpmem_t *mem, uint64_t compare,
uint64_t value, int target, void *buffer, size_t len,
uint64_t rem_addr,
opal_common_ucx_user_req_handler_t user_req_cb,
void *user_req_ptr)
{
ucp_ep_h ep;
ucp_rkey_h rkey;
opal_common_ucx_winfo_t *winfo = NULL;
opal_common_ucx_request_t *req;
int rc = OPAL_SUCCESS;
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc);
return rc;
}
/* Perform the operation */
opal_mutex_lock(&winfo->mutex);
req = opal_common_ucx_atomic_cswap_nb(ep, compare, value,
buffer, len,
rem_addr, rkey, opal_common_ucx_req_completion,
winfo->worker);
if (UCS_PTR_IS_PTR(req)) {
req->ext_req = user_req_ptr;
req->ext_cb = user_req_cb;
req->winfo = winfo;
} else {
if (user_req_cb != NULL) {
(*user_req_cb)(user_req_ptr);
}
}
rc = _periodical_flush_nb(mem, winfo, target);
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
return rc;
}
opal_mutex_unlock(&winfo->mutex);
return rc;
}
static inline int
opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
uint64_t value, int target, size_t len, uint64_t rem_addr)