From 33517428a1c2f36180f6eee3a2b86365c1026802 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Tue, 4 Dec 2018 13:45:32 -0800 Subject: [PATCH] opal/common/ucx: add periodical flush and counter to opal directory. Signed-off-by: Xin Zhao --- opal/mca/common/ucx/common_ucx_int.h | 3 + opal/mca/common/ucx/common_ucx_wpool.c | 105 +++++++++++++++++++------ opal/mca/common/ucx/common_ucx_wpool.h | 103 +++++++++++++++++++++++- 3 files changed, 182 insertions(+), 29 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx_int.h b/opal/mca/common/ucx/common_ucx_int.h index 304d5421d2..5e189ca793 100644 --- a/opal/mca/common/ucx/common_ucx_int.h +++ b/opal/mca/common/ucx/common_ucx_int.h @@ -25,6 +25,9 @@ BEGIN_C_DECLS # define MCA_COMMON_UCX_ASSERT(_x) #endif +#define MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD 100000 +#define MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD 1000000 + #define _MCA_COMMON_UCX_QUOTE(_x) \ # _x #define MCA_COMMON_UCX_QUOTE(_x) \ diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c index 57457b7ec5..0db8c7922e 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.c +++ b/opal/mca/common/ucx/common_ucx_wpool.c @@ -62,6 +62,9 @@ _winfo_create(opal_common_ucx_wpool_t *wpool) winfo->endpoints = NULL; winfo->comm_size = 0; winfo->released = 0; + winfo->inflight_ops = NULL; + winfo->global_inflight_ops = 0; + winfo->inflight_req = UCS_OK; WPOOL_DBG_OUT(_dbg_winfo, "winfo = %p, worker = %p\n", (void*)winfo, (void *)winfo->worker); @@ -76,14 +79,24 @@ exit: static void _winfo_reset(opal_common_ucx_winfo_t *winfo) { + if (winfo->inflight_req != UCS_OK) { + opal_common_ucx_wait_request(winfo->inflight_req, winfo->worker, + "opal_common_ucx_flush"); + winfo->inflight_req = UCS_OK; + } + + assert(winfo->global_inflight_ops == 0); + if(winfo->comm_size != 0) { size_t i; for (i = 0; i < winfo->comm_size; i++) { if (NULL != winfo->endpoints[i]){ ucp_ep_destroy(winfo->endpoints[i]); } + assert(winfo->inflight_ops[i] == 0); } free(winfo->endpoints); + free(winfo->inflight_ops); } winfo->endpoints = NULL; winfo->comm_size = 0; @@ -372,6 +385,7 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size) (void *)wpool, (void *)winfo); winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h)); + winfo->inflight_ops = calloc(comm_size, sizeof(short)); winfo->comm_size = comm_size; return winfo; } @@ -1213,6 +1227,46 @@ opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target) return OPAL_SUCCESS; } +OPAL_DECLSPEC int +opal_common_ucx_flush(ucp_ep_h ep, ucp_worker_h worker, + opal_common_ucx_flush_type_t type, + opal_common_ucx_flush_scope_t scope, + ucs_status_ptr_t *req_ptr) +{ + ucs_status_ptr_t req; + ucs_status_t status = UCS_OK; + int rc = OPAL_SUCCESS; + +#if HAVE_DECL_UCP_EP_FLUSH_NB + if (scope == OPAL_COMMON_UCX_SCOPE_EP) { + req = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb); + } else { + req = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb); + } + if(OPAL_COMMON_UCX_FLUSH_B) { + rc = opal_common_ucx_wait_request(req, worker, "ucp_ep_flush_nb"); + } else { + *req_ptr = req; + } + return rc; +#endif + switch (type) { + case OPAL_COMMON_UCX_FLUSH_NB_PREFERRED: + case OPAL_COMMON_UCX_FLUSH_B: + if (scope == OPAL_COMMON_UCX_SCOPE_EP) { + status = ucp_ep_flush(ep); + } else { + status = ucp_worker_flush(worker); + } + rc = (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR; + case OPAL_COMMON_UCX_FLUSH_NB: + default: + rc = OPAL_ERROR; + } + return rc; +} + + OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem, opal_common_ucx_flush_scope_t scope, @@ -1228,37 +1282,36 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem, opal_mutex_lock(&ctx->mutex); OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) { + if ((scope == OPAL_COMMON_UCX_SCOPE_EP) && + (NULL == item->ptr->endpoints[target])) { + continue; + } + opal_mutex_lock(&item->ptr->mutex); + rc = opal_common_ucx_flush(item->ptr->endpoints[target], + item->ptr->worker, OPAL_COMMON_UCX_FLUSH_B, + scope, NULL); switch (scope) { case OPAL_COMMON_UCX_SCOPE_WORKER: - opal_mutex_lock(&item->ptr->mutex); - rc = opal_common_ucx_worker_flush(item->ptr->worker); - if (rc != OPAL_SUCCESS) { - MCA_COMMON_UCX_ERROR("opal_common_ucx_worker_flush failed: %d", - rc); - rc = OPAL_ERROR; - } - WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, "worker = %p\n", - (void *)item->ptr->worker); - opal_mutex_unlock(&item->ptr->mutex); + item->ptr->global_inflight_ops = 0; + memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short)); break; case OPAL_COMMON_UCX_SCOPE_EP: - if (NULL != item->ptr->endpoints[target] ) { - opal_mutex_lock(&item->ptr->mutex); - rc = opal_common_ucx_ep_flush(item->ptr->endpoints[target], - item->ptr->worker); - if (rc != OPAL_SUCCESS) { - MCA_COMMON_UCX_ERROR("opal_common_ucx_ep_flush failed: %d", - rc); - rc = OPAL_ERROR; - } - WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, - "target = %d, ep = %p worker = %p\n", - (int)target, - (void *)item->ptr->endpoints[target], - (void *)item->ptr->worker); - opal_mutex_unlock(&item->ptr->mutex); - } + item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target]; + item->ptr->inflight_ops[target] = 0; + break; } + opal_mutex_unlock(&item->ptr->mutex); + + if (rc != OPAL_SUCCESS) { + MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", + rc); + rc = OPAL_ERROR; + } + WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, + "target = %d, ep = %p worker = %p\n", + (int)target, + (void *)item->ptr->endpoints[target], + (void *)item->ptr->worker); } opal_mutex_unlock(&ctx->mutex); diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index 8da87ee05e..48f72d4800 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -7,6 +7,7 @@ #include "common_ucx_int.h" #include "common_ucx_request.h" #include +#include #include #include @@ -84,6 +85,9 @@ typedef struct { ucp_worker_h worker; ucp_ep_h *endpoints; size_t comm_size; + short *inflight_ops; + short global_inflight_ops; + ucs_status_ptr_t inflight_req; } opal_common_ucx_winfo_t; typedef struct { @@ -101,6 +105,12 @@ typedef enum { OPAL_COMMON_UCX_SCOPE_WORKER } opal_common_ucx_flush_scope_t; +typedef enum { + OPAL_COMMON_UCX_FLUSH_NB, + OPAL_COMMON_UCX_FLUSH_B, + OPAL_COMMON_UCX_FLUSH_NB_PREFERRED +} opal_common_ucx_flush_type_t; + typedef enum { OPAL_COMMON_UCX_MEM_ALLOCATE_MAP, OPAL_COMMON_UCX_MEM_MAP @@ -236,6 +246,58 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem, int target); OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem); +OPAL_DECLSPEC int opal_common_ucx_flush(ucp_ep_h ep, ucp_worker_h worker, + opal_common_ucx_flush_type_t type, + opal_common_ucx_flush_scope_t scope, + ucs_status_ptr_t *req_ptr); + +static inline int _periodical_flush_nb(opal_common_ucx_wpmem_t *mem, + opal_common_ucx_winfo_t *winfo, + int target) { + int rc = OPAL_SUCCESS; + + winfo->inflight_ops[target]++; + winfo->global_inflight_ops++; + + if (OPAL_UNLIKELY(winfo->inflight_ops[target] >= MCA_COMMON_UCX_PER_TARGET_OPS_THRESHOLD) || + OPAL_UNLIKELY(winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD)) { + opal_common_ucx_flush_scope_t scope; + + if (winfo->inflight_req != UCS_OK) { + rc = opal_common_ucx_wait_request(winfo->inflight_req, winfo->worker, + "opal_common_ucx_flush_nb"); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_wait_request failed: %d", rc); + return rc; + } + winfo->inflight_req = UCS_OK; + } + + if (winfo->global_inflight_ops >= MCA_COMMON_UCX_GLOBAL_OPS_THRESHOLD) { + scope = OPAL_COMMON_UCX_SCOPE_WORKER; + winfo->global_inflight_ops = 0; + memset(winfo->inflight_ops, 0, winfo->comm_size * sizeof(short)); + } else { + scope = OPAL_COMMON_UCX_SCOPE_EP; + winfo->global_inflight_ops -= winfo->inflight_ops[target]; + winfo->inflight_ops[target] = 0; + } + + rc = opal_common_ucx_flush(winfo->endpoints[target], winfo->worker, + OPAL_COMMON_UCX_FLUSH_NB_PREFERRED, scope, + &winfo->inflight_req); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_flush failed: %d", rc); + return rc; + } + } else if (OPAL_UNLIKELY(winfo->inflight_req != UCS_OK)) { + int ret; + do { + ret = ucp_worker_progress(winfo->worker); + } while (ret); + } + return rc; +} static inline int opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t op, @@ -269,7 +331,6 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t called_func = "ucp_get_nbi"; break; } - opal_mutex_unlock(&winfo->mutex); if (OPAL_UNLIKELY(status != UCS_OK && status != UCS_INPROGRESS)) { MCA_COMMON_UCX_ERROR("%s failed: %d", called_func, status); @@ -278,6 +339,15 @@ opal_common_ucx_wpmem_putget(opal_common_ucx_wpmem_t *mem, opal_common_ucx_op_t WPOOL_DBG_OUT(_dbg_mem,"ep = %p, rkey = %p\n", (void *)ep, (void *)rkey); } + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + + opal_mutex_unlock(&winfo->mutex); + return rc; } @@ -314,6 +384,13 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare, WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n", (void *)ep, (void *)rkey); } + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + opal_mutex_unlock(&winfo->mutex); return rc; @@ -349,6 +426,13 @@ opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t op WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n", (void *)ep, (void *)rkey); } + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + opal_mutex_unlock(&winfo->mutex); return rc; } @@ -386,6 +470,13 @@ opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem, WPOOL_DBG_OUT(_dbg_mem, "ep = %p, rkey = %p\n", (void *)ep, (void *)rkey); } + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + opal_mutex_unlock(&winfo->mutex); return rc; @@ -416,8 +507,6 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem, req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len, rem_addr, rkey, opal_common_ucx_req_completion, winfo->worker); - opal_mutex_unlock(&winfo->mutex); - if (UCS_PTR_IS_PTR(req)) { req->ext_req = user_req_ptr; req->ext_cb = user_req_cb; @@ -427,6 +516,14 @@ opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem, } } + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + + opal_mutex_unlock(&winfo->mutex); + return rc; }