From 581478dc919a78be096db7d8fafde6e5938a8af8 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 15 Apr 2020 11:59:33 +0200 Subject: [PATCH] UCX osc: make progress on default worker if none are active Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_active_target.c | 2 +- ompi/mca/osc/ucx/osc_ucx_comm.c | 2 +- ompi/mca/osc/ucx/osc_ucx_passive_target.c | 5 ++- opal/mca/common/ucx/common_ucx_wpool.c | 38 ++++++++++++++++++----- opal/mca/common/ucx/common_ucx_wpool.h | 9 ++++-- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_active_target.c b/ompi/mca/osc/ucx/osc_ucx_active_target.c index 17a01d6061..1c1aff65c1 100644 --- a/ompi/mca/osc/ucx/osc_ucx_active_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_active_target.c @@ -279,7 +279,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0); } - ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); + opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool); usleep(100); } while (1); } diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 43e1fe59c1..72c1fdf92f 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -279,7 +279,7 @@ static inline int start_atomicity( break; } - ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); + opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool); } *lock_acquired = true; diff --git a/ompi/mca/osc/ucx/osc_ucx_passive_target.c b/ompi/mca/osc/ucx/osc_ucx_passive_target.c index 93188cd705..92fd15d187 100644 --- a/ompi/mca/osc/ucx/osc_ucx_passive_target.c +++ b/ompi/mca/osc/ucx/osc_ucx_passive_target.c @@ -42,7 +42,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) { } else { break; } - ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); + opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool); 
} return ret; @@ -70,8 +70,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) { if (result_value == TARGET_LOCK_UNLOCKED) { return OMPI_SUCCESS; } - - ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); + opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool); } } diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c index dd4c13ec3c..960a95f9bd 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.c +++ b/opal/mca/common/ucx/common_ucx_wpool.c @@ -59,7 +59,7 @@ _winfo_create(opal_common_ucx_wpool_t *wpool) goto exit; } - winfo = calloc(1, sizeof(*winfo)); + winfo = OBJ_NEW(opal_common_ucx_winfo_t); if (NULL == winfo) { MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info"); goto release_worker; @@ -194,9 +194,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, rc = OPAL_ERROR; goto err_worker_create; } - wpool->dflt_worker = winfo->worker; + wpool->dflt_winfo = winfo; + OBJ_RETAIN(wpool->dflt_winfo); - status = ucp_worker_get_address(wpool->dflt_worker, + status = ucp_worker_get_address(wpool->dflt_winfo->worker, &wpool->recv_waddr, &wpool->recv_waddr_len); if (status != UCS_OK) { MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status); @@ -214,8 +215,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool, err_wpool_add: free(wpool->recv_waddr); err_get_addr: - if (NULL != wpool->dflt_worker) { - ucp_worker_destroy(wpool->dflt_worker); + if (NULL != wpool) { + OBJ_RELEASE(winfo); + OBJ_RELEASE(wpool->dflt_winfo); + wpool->dflt_winfo = NULL; } err_worker_create: ucp_cleanup(wpool->ucp_ctx); @@ -233,7 +236,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool) /* Release the address here. 
recv worker will be released * below along with other idle workers */ - ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr); + ucp_worker_release_address(wpool->dflt_winfo->worker, wpool->recv_waddr); /* Go over the list, free idle list items */ if (!opal_list_is_empty(&wpool->idle_workers)) { @@ -258,6 +261,9 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool) } OBJ_DESTRUCT(&wpool->active_workers); + OBJ_RELEASE(wpool->dflt_winfo); + wpool->dflt_winfo = NULL; + OBJ_DESTRUCT(&wpool->mutex); ucp_cleanup(wpool->ucp_ctx); return; @@ -272,17 +278,33 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool) /* Go over all active workers and progress them * TODO: may want to have some partitioning to progress only part of * workers */ - opal_mutex_lock(&wpool->mutex); + if (0 != opal_mutex_trylock(&wpool->mutex)) { + return completed; + } + + bool progress_dflt_worker = true; OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers, opal_common_ucx_winfo_t) { - opal_mutex_lock(&winfo->mutex); + if (0 != opal_mutex_trylock(&winfo->mutex)) { + continue; + } do { + if (winfo == wpool->dflt_winfo) { + progress_dflt_worker = false; + } progressed = ucp_worker_progress(winfo->worker); completed += progressed; } while (progressed); opal_mutex_unlock(&winfo->mutex); } opal_mutex_unlock(&wpool->mutex); + + if (progress_dflt_worker) { + /* make sure to progress at least some */ + opal_mutex_lock(&wpool->dflt_winfo->mutex); + completed += ucp_worker_progress(wpool->dflt_winfo->worker); + opal_mutex_unlock(&wpool->dflt_winfo->mutex); + } return completed; } diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index 2de331bbc5..ce5ef4d3b4 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -30,6 +30,9 @@ BEGIN_C_DECLS +/* forward declaration */ +typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t; + /* Worker pool is a global object that that
is allocated per component or can be * shared between multiple compatible components. * The lifetime of this object is normally equal to the lifetime of a component[s]. @@ -42,7 +45,7 @@ typedef struct { /* UCX data */ ucp_context_h ucp_ctx; - ucp_worker_h dflt_worker; + opal_common_ucx_winfo_t *dflt_winfo; ucp_address_t *recv_waddr; size_t recv_waddr_len; @@ -116,7 +119,7 @@ typedef struct { * in the Worker Pool lists (either active or idle). * One wpmem is intended per shared memory segment (i.e. MPI Window). */ -typedef struct opal_common_ucx_winfo { +struct opal_common_ucx_winfo { opal_list_item_t super; opal_recursive_mutex_t mutex; ucp_worker_h worker; @@ -125,7 +128,7 @@ typedef struct opal_common_ucx_winfo { short *inflight_ops; short global_inflight_ops; ucs_status_ptr_t inflight_req; -} opal_common_ucx_winfo_t; +}; OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t); typedef void (*opal_common_ucx_user_req_handler_t)(void *request);