From 2c8da2c0a95182554d8d603077fffa8dbf53f1eb Mon Sep 17 00:00:00 2001 From: Tommy Janjusic Date: Wed, 29 Jul 2020 09:47:15 -0500 Subject: [PATCH] Further code reduction and simplifications. Co-authored-by: Artem Polyakov Signed-off-by: Tomislav Janjusic --- opal/mca/common/ucx/common_ucx_wpool.c | 194 ++++++++------------- opal/mca/common/ucx/common_ucx_wpool.h | 19 +- opal/mca/common/ucx/common_ucx_wpool_int.h | 20 +-- opal/util/net.c | 5 - 4 files changed, 85 insertions(+), 153 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c index c9793a7079..25e28e8e15 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.c +++ b/opal/mca/common/ucx/common_ucx_wpool.c @@ -20,7 +20,7 @@ ******************************************************************************* ******************************************************************************/ -OBJ_CLASS_INSTANCE(_winfo_list_item_t, opal_list_item_t, NULL, NULL); +OBJ_CLASS_INSTANCE(opal_common_ucx_winfo_t, opal_list_item_t, NULL, _winfo_destructor); OBJ_CLASS_INSTANCE(_ctx_record_t, opal_list_item_t, NULL, NULL); OBJ_CLASS_INSTANCE(_mem_record_t, opal_list_item_t, NULL, NULL); @@ -69,7 +69,6 @@ _winfo_create(opal_common_ucx_wpool_t *wpool) winfo->worker = worker; winfo->endpoints = NULL; winfo->comm_size = 0; - winfo->released = 0; winfo->inflight_ops = NULL; winfo->global_inflight_ops = 0; winfo->inflight_req = UCS_OK; @@ -83,7 +82,7 @@ exit: } static void -_winfo_reset(opal_common_ucx_winfo_t *winfo) +_winfo_destructor(opal_common_ucx_winfo_t *winfo) { if (winfo->inflight_req != UCS_OK) { opal_common_ucx_wait_request_mt(winfo->inflight_req, @@ -106,15 +105,9 @@ _winfo_reset(opal_common_ucx_winfo_t *winfo) } winfo->endpoints = NULL; winfo->comm_size = 0; - winfo->released = 0; -} -static void -_winfo_release(opal_common_ucx_winfo_t *winfo) -{ OBJ_DESTRUCT(&winfo->mutex); ucp_worker_destroy(winfo->worker); - free(winfo); } /* ----------------------------------------------------------------------------- @@ -137,6 +130,9 @@ opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool) free(wpool); } +static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list, + opal_common_ucx_winfo_t *winfo); + OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, int proc_world_size, bool enable_mt) @@ -158,7 +154,7 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, status = ucp_config_read("MPI", NULL, &config); if (UCS_OK != status) { MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status); - + return OPAL_ERROR; } /* initialize UCP context */ @@ -241,12 +237,11 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool) /* Go over the list, free idle list items */ if (!opal_list_is_empty(&wpool->idle_workers)) { - _winfo_list_item_t *item, *next; - OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers, - _winfo_list_item_t) { - opal_list_remove_item(&wpool->idle_workers, &item->super); - _winfo_release(item->ptr); - OBJ_RELEASE(item); + opal_common_ucx_winfo_t *winfo, *next; + OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->idle_workers, + opal_common_ucx_winfo_t) { + opal_list_remove_item(&wpool->idle_workers, &winfo->super); + OBJ_RELEASE(winfo); } } OBJ_DESTRUCT(&wpool->idle_workers); @@ -254,18 +249,15 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool) /* Release active workers. They are no longer active actually * because opal_common_ucx_wpool_finalize is being called. */ if (!opal_list_is_empty(&wpool->active_workers)) { - _winfo_list_item_t *item, *next; - OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers, - _winfo_list_item_t) { - opal_list_remove_item(&wpool->active_workers, &item->super); - _winfo_reset(item->ptr); - _winfo_release(item->ptr); - OBJ_RELEASE(item); + opal_common_ucx_winfo_t *winfo, *next; + OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers, + opal_common_ucx_winfo_t) { + opal_list_remove_item(&wpool->active_workers, &winfo->super); + OBJ_RELEASE(winfo); } } OBJ_DESTRUCT(&wpool->active_workers); - OBJ_DESTRUCT(&wpool->mutex); ucp_cleanup(wpool->ucp_ctx); return; @@ -274,34 +266,23 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool) OPAL_DECLSPEC int opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool) { - _winfo_list_item_t *item = NULL, *next = NULL; + opal_common_ucx_winfo_t *winfo = NULL, *next = NULL; int completed = 0, progressed = 0; /* Go over all active workers and progress them * TODO: may want to have some partitioning to progress only part of * workers */ - if (!opal_mutex_trylock (&wpool->mutex)) { - OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers, - _winfo_list_item_t) { - opal_common_ucx_winfo_t *winfo = item->ptr; - opal_mutex_lock(&winfo->mutex); - if( OPAL_UNLIKELY(winfo->released) ) { - /* Do garbage collection of worker info's if needed */ - opal_list_remove_item(&wpool->active_workers, &item->super); - _winfo_reset(winfo); - opal_list_append(&wpool->idle_workers, &item->super); - completed++; - } else { - /* Progress worker until there are existing events */ - do { - progressed = ucp_worker_progress(winfo->worker); - completed += progressed; - } while (progressed); - } - opal_mutex_unlock(&winfo->mutex); - } - opal_mutex_unlock(&wpool->mutex); + opal_mutex_lock(&wpool->mutex); + OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers, + opal_common_ucx_winfo_t) { + opal_mutex_lock(&winfo->mutex); + do { + progressed = ucp_worker_progress(winfo->worker); + completed += progressed; + } while (progressed); + opal_mutex_unlock(&winfo->mutex); } + opal_mutex_unlock(&wpool->mutex); return completed; } @@ -309,20 +290,7 @@ static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list, opal_common_ucx_winfo_t *winfo) { - _winfo_list_item_t *item; - - item = OBJ_NEW(_winfo_list_item_t); - if (NULL == item) { - MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item"); - return OPAL_ERR_OUT_OF_RESOURCE; - } - item->ptr = winfo; - winfo->self = item; - - opal_mutex_lock(&wpool->mutex); - opal_list_append(list, &item->super); - opal_mutex_unlock(&wpool->mutex); - + opal_list_append(list, &winfo->super); return OPAL_SUCCESS; } @@ -330,31 +298,26 @@ static opal_common_ucx_winfo_t* _wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list) { opal_common_ucx_winfo_t *winfo = NULL; - _winfo_list_item_t *item = NULL; - opal_mutex_lock(&wpool->mutex); if (!opal_list_is_empty(list)) { - item = (_winfo_list_item_t *)opal_list_get_first(list); - opal_list_remove_item(list, &item->super); + winfo = (opal_common_ucx_winfo_t *)opal_list_get_first(list); + opal_list_remove_item(list, &winfo->super); } - opal_mutex_unlock(&wpool->mutex); - if (item != NULL) { - winfo = item->ptr; - OBJ_RELEASE(item); - } return winfo; } static opal_common_ucx_winfo_t * -_wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size) +_wpool_get_winfo(opal_common_ucx_wpool_t *wpool, size_t comm_size) { opal_common_ucx_winfo_t *winfo; + opal_mutex_lock(&wpool->mutex); winfo = _wpool_list_get(wpool, &wpool->idle_workers); if (!winfo) { winfo = _winfo_create(wpool); if (!winfo) { MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure"); + opal_mutex_unlock(&wpool->mutex); return NULL; } } @@ -362,13 +325,24 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size) winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h)); winfo->inflight_ops = calloc(comm_size, sizeof(short)); winfo->comm_size = comm_size; + + /* Put the worker on the active list */ + _wpool_list_put(wpool, &wpool->active_workers, winfo); + + opal_mutex_unlock(&wpool->mutex); + return winfo; } -static int -_wpool_add_active(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo) +static void +_wpool_put_winfo(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo) { - return _wpool_list_put(wpool, &wpool->active_workers, winfo); + opal_mutex_lock(&wpool->mutex); + opal_list_remove_item(&wpool->active_workers, &winfo->super); + opal_list_prepend(&wpool->idle_workers, &winfo->super); + opal_mutex_unlock(&wpool->mutex); + + return; } /* ----------------------------------------------------------------------------- @@ -428,21 +402,12 @@ opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx) _tlocal_ctx_rec_cleanup(ctx_rec); } - /* Make sure that all the loads/stores are complete */ - opal_atomic_mb(); - - _common_ucx_wpctx_free(ctx); -} - -/* Final cleanup of the context structure - * once all references cleared */ -static void -_common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx) -{ free(ctx->recv_worker_addrs); free(ctx->recv_worker_displs); + OBJ_DESTRUCT(&ctx->mutex); OBJ_DESTRUCT(&ctx->ctx_records); + free(ctx); } @@ -573,8 +538,6 @@ void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem) OBJ_DESTRUCT(&mem->mem_records); - opal_atomic_mb(); - free(mem->mem_addrs); free(mem->mem_displs); @@ -603,28 +566,21 @@ static void _ctx_rec_destructor(void *arg) { static void _tlocal_ctx_rec_cleanup(_ctx_record_t *ctx_rec) { - opal_common_ucx_winfo_t *winfo = NULL; - opal_common_ucx_wpool_t *wpool = NULL; - if (NULL == ctx_rec) { return; } - winfo = ctx_rec->winfo; - wpool = ctx_rec->gctx->wpool; + opal_common_ucx_winfo_t *winfo = ctx_rec->winfo; + opal_common_ucx_wpool_t *wpool = ctx_rec->gctx->wpool; - opal_mutex_lock(&wpool->mutex); + /* Remove worker from active and return to idle list. */ + _wpool_put_winfo(wpool, winfo); - /* Remove worker from active and return to idle list. - * Remove the context record from the ctx list */ - opal_list_remove_item(&wpool->active_workers, &winfo->self->super); - opal_list_prepend(&wpool->idle_workers, &winfo->self->super); + /* Remove the context record from the ctx list. */ opal_mutex_lock(&ctx_rec->gctx->mutex); opal_list_remove_item(&ctx_rec->gctx->ctx_records, &ctx_rec->super); opal_mutex_unlock(&ctx_rec->gctx->mutex); - opal_mutex_unlock(&wpool->mutex); - OBJ_RELEASE(ctx_rec); return; @@ -637,22 +593,17 @@ _tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx) _ctx_record_t *ctx_rec = OBJ_NEW(_ctx_record_t); if (!ctx_rec) { - return NULL; + MCA_COMMON_UCX_ERROR("Failed to allocate new ctx_rec"); + goto error1; } ctx_rec->gctx = ctx; - ctx_rec->winfo = _wpool_get_idle(ctx->wpool, ctx->comm_size); + ctx_rec->winfo = _wpool_get_winfo(ctx->wpool, ctx->comm_size); if (NULL == ctx_rec->winfo) { MCA_COMMON_UCX_ERROR("Failed to allocate new worker"); - OBJ_RELEASE(ctx_rec); - return NULL; + goto error2; } - opal_atomic_wmb(); - - /* Add this worker to the active worker list */ - _wpool_add_active(ctx->wpool, ctx_rec->winfo); - /* Add ctx_rec to list */ opal_mutex_lock(&ctx->mutex); opal_list_append(&ctx->ctx_records, &ctx_rec->super); @@ -661,12 +612,22 @@ _tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx) /* Add tls reference to record */ rc = opal_tsd_tracked_key_set(&ctx->tls_key, ctx_rec); if (OPAL_SUCCESS != rc) { - OBJ_RELEASE(ctx_rec); - return NULL; + MCA_COMMON_UCX_ERROR("Failed to set ctx_rec tls key"); + goto error3; } /* All good - return the record */ return ctx_rec; + +error3: + opal_mutex_lock(&ctx->mutex); + opal_list_remove_item(&ctx->ctx_records, &ctx_rec->super); + opal_mutex_unlock(&ctx->mutex); + _wpool_put_winfo(ctx->wpool, ctx_rec->winfo); +error2: + OBJ_RELEASE(ctx_rec); +error1: + return NULL; } static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target) @@ -708,11 +669,13 @@ _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec) return; } + opal_mutex_lock(&mem_rec->winfo->mutex); for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) { if (mem_rec->rkeys[i]) { ucp_rkey_destroy(mem_rec->rkeys[i]); } } + opal_mutex_unlock(&mem_rec->winfo->mutex); free(mem_rec->rkeys); /* Remove item from the list */ @@ -733,23 +696,20 @@ static _mem_record_t *_tlocal_add_mem_rec(opal_common_ucx_wpmem_t *mem, _ctx_rec return NULL; } - opal_mutex_lock(&mem->mutex); - opal_list_append(&mem->mem_records, &mem_rec->super); - opal_mutex_unlock(&mem->mutex); - - mem_rec->gmem = mem; mem_rec->ctx_rec = ctx_rec; mem_rec->winfo = ctx_rec->winfo; mem_rec->rkeys = calloc(mem->ctx->comm_size, sizeof(*mem_rec->rkeys)); - opal_atomic_wmb(); - rc = opal_tsd_tracked_key_set(&mem->tls_key, mem_rec); if (OPAL_SUCCESS != rc) { return NULL; } + opal_mutex_lock(&mem->mutex); + opal_list_append(&mem->mem_records, &mem_rec->super); + opal_mutex_unlock(&mem->mutex); + return mem_rec; } @@ -875,9 +835,9 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem, (NULL == winfo->endpoints[target])) { continue; } + opal_mutex_lock(&winfo->mutex); rc = opal_common_ucx_winfo_flush(winfo, target, OPAL_COMMON_UCX_FLUSH_B, scope, NULL); - opal_mutex_lock(&winfo->mutex); switch (scope) { case OPAL_COMMON_UCX_SCOPE_WORKER: winfo->global_inflight_ops = 0; diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index babc45121f..2de331bbc5 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -110,8 +110,6 @@ typedef struct { opal_tsd_tracked_key_t tls_key; } opal_common_ucx_wpmem_t; -typedef struct __winfo_list_item_t _winfo_list_item_t; - /* The structure that wraps UCP worker and holds the state that is required * for its use. * The structure is allocated along with UCP worker on demand and is being held @@ -119,9 +117,8 @@ typedef struct __winfo_list_item_t _winfo_list_item_t; * One wpmem is intended per shared memory segment (i.e. MPI Window). */ typedef struct opal_common_ucx_winfo { + opal_list_item_t super; opal_recursive_mutex_t mutex; - _winfo_list_item_t *self; - volatile int released; ucp_worker_h worker; ucp_ep_h *endpoints; size_t comm_size; @@ -129,6 +126,7 @@ typedef struct opal_common_ucx_winfo { short global_inflight_ops; ucs_status_ptr_t inflight_req; } opal_common_ucx_winfo_t; +OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t); typedef void (*opal_common_ucx_user_req_handler_t)(void *request); @@ -163,12 +161,6 @@ typedef enum { OPAL_COMMON_UCX_MEM_MAP } opal_common_ucx_mem_type_t; -struct __winfo_list_item_t{ - opal_list_item_t super; - opal_common_ucx_winfo_t *ptr; -}; -OBJ_CLASS_DECLARATION(_winfo_list_item_t); - typedef struct { opal_list_item_t super; opal_common_ucx_ctx_t *gctx; @@ -216,7 +208,7 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target, opal_common_ucx_winfo_t **_winfo) { _mem_record_t *mem_rec = NULL; - int expr; + int is_ready; int rc = OPAL_SUCCESS; /* First check the fast-path */ @@ -224,9 +216,10 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target, if (OPAL_SUCCESS != rc) { return rc; } - expr = mem_rec && (NULL != mem_rec->winfo) && (mem_rec->winfo->endpoints[target]) && + is_ready = mem_rec && (mem_rec->winfo->endpoints[target]) && (NULL != mem_rec->rkeys[target]); - if (OPAL_UNLIKELY(!expr)) { + MCA_COMMON_UCX_ASSERT((NULL == mem_rec) || (NULL != mem_rec->winfo)); + if (OPAL_UNLIKELY(!is_ready)) { rc = opal_common_ucx_tlocal_fetch_spath(mem, target); if (OPAL_SUCCESS != rc) { return rc; diff --git a/opal/mca/common/ucx/common_ucx_wpool_int.h b/opal/mca/common/ucx/common_ucx_wpool_int.h index 2d50e6caa6..2dcbf2c970 100644 --- a/opal/mca/common/ucx/common_ucx_wpool_int.h +++ b/opal/mca/common/ucx/common_ucx_wpool_int.h @@ -12,25 +12,9 @@ static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int targ /* Internal Worker Information (winfo) management */ static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool); -static void _winfo_release(opal_common_ucx_winfo_t *winfo); -static void _winfo_reset(opal_common_ucx_winfo_t *winfo); +static void _winfo_destructor(opal_common_ucx_winfo_t *winfo); -/* Internal Worker Pool (wpool) management */ -static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list, - opal_common_ucx_winfo_t *winfo); -static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list, - opal_common_ucx_winfo_t *winfo); -static opal_common_ucx_winfo_t *_wpool_list_get(opal_common_ucx_wpool_t *wpool, - opal_list_t *list); -static opal_common_ucx_winfo_t *_wpool_get_idle(opal_common_ucx_wpool_t *wpool, - size_t comm_size); -static int _wpool_add_active(opal_common_ucx_wpool_t *wpool, - opal_common_ucx_winfo_t *winfo); - -/* Internal Worker Pool Context management */ -static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx); - -/* Internal Worker Pool Memeory management */ +/* Internal Worker Pool Memory management */ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool, void **base, size_t size, ucp_mem_h *memh_ptr, opal_common_ucx_mem_type_t mem_type); diff --git a/opal/util/net.c b/opal/util/net.c index b51e39bdf9..7c16d03d5d 100644 --- a/opal/util/net.c +++ b/opal/util/net.c @@ -474,11 +474,6 @@ opal_net_init() int opal_net_finalize() { -#if OPAL_ENABLE_IPV6 - if (NULL != hostname_tsd_key) { - OBJ_RELEASE(hostname_tsd_key); - } -#endif return OPAL_SUCCESS; }