
UCX osc: make progress on default worker if none are active

Signed-off-by: Joseph Schuchart <schuchart@hlrs.de>
This commit is contained in:
Joseph Schuchart 2020-04-15 11:59:33 +02:00
parent 3ea0658f4d
commit 581478dc91
5 changed files with 40 additions and 16 deletions
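
In short: the OSC UCX wait loops that used to spin on ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker) now call opal_common_ucx_wpool_progress() instead, and the worker pool keeps its default worker as a refcounted opal_common_ucx_winfo_t (dflt_winfo) rather than a bare ucp_worker_h. The pool-level progress routine drives every active worker and, when the default worker was not among them, progresses it explicitly, so progress is still made when no workers are active.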

View file

@@ -279,7 +279,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
             ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
         }
-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
         usleep(100);
     } while (1);
 }

View file

@@ -279,7 +279,7 @@ static inline int start_atomicity(
             break;
         }
-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
     }
 
     *lock_acquired = true;

View file

@ -42,7 +42,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
} else { } else {
break; break;
} }
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
} }
return ret; return ret;
@@ -70,8 +70,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
         if (result_value == TARGET_LOCK_UNLOCKED) {
             return OMPI_SUCCESS;
         }
-
-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
     }
 }

View file

@@ -59,7 +59,7 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
         goto exit;
     }
 
-    winfo = calloc(1, sizeof(*winfo));
+    winfo = OBJ_NEW(opal_common_ucx_winfo_t);
     if (NULL == winfo) {
         MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
         goto release_worker;
@@ -194,9 +194,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
         rc = OPAL_ERROR;
         goto err_worker_create;
     }
-    wpool->dflt_worker = winfo->worker;
+    wpool->dflt_winfo = winfo;
+    OBJ_RETAIN(wpool->dflt_winfo);
 
-    status = ucp_worker_get_address(wpool->dflt_worker,
+    status = ucp_worker_get_address(wpool->dflt_winfo->worker,
                                     &wpool->recv_waddr, &wpool->recv_waddr_len);
     if (status != UCS_OK) {
         MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
@ -214,8 +215,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
err_wpool_add: err_wpool_add:
free(wpool->recv_waddr); free(wpool->recv_waddr);
err_get_addr: err_get_addr:
if (NULL != wpool->dflt_worker) { if (NULL != wpool) {
ucp_worker_destroy(wpool->dflt_worker); OBJ_RELEASE(winfo);
OBJ_RELEASE(wpool->dflt_winfo);
wpool->dflt_winfo = NULL;
} }
err_worker_create: err_worker_create:
ucp_cleanup(wpool->ucp_ctx); ucp_cleanup(wpool->ucp_ctx);
@@ -233,7 +236,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
     /* Release the address here. recv worker will be released
      * below along with other idle workers */
-    ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
+    ucp_worker_release_address(wpool->dflt_winfo->worker, wpool->recv_waddr);
 
     /* Go over the list, free idle list items */
     if (!opal_list_is_empty(&wpool->idle_workers)) {
@@ -258,6 +261,9 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
     }
     OBJ_DESTRUCT(&wpool->active_workers);
 
+    OBJ_RELEASE(wpool->dflt_winfo);
+    wpool->dflt_winfo = NULL;
+
     OBJ_DESTRUCT(&wpool->mutex);
     ucp_cleanup(wpool->ucp_ctx);
     return;
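
A note on the lifetime handling in the hunks above, for readers unfamiliar with OPAL's object system: OBJ_NEW returns an object with a reference count of 1, OBJ_RETAIN increments the count, and OBJ_RELEASE decrements it and runs the destructor when it hits zero. After _winfo_create's OBJ_NEW and the OBJ_RETAIN in wpool_init, the default winfo is held twice (once by the worker list, once by dflt_winfo), and finalize drops both references. A minimal standalone sketch of that discipline, using plain-C stand-ins rather than OPAL's actual OBJ_* machinery:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for an OPAL object: "new" starts the refcount
 * at 1, "retain" increments it, "release" decrements it and frees the
 * object (runs the destructor) when it reaches zero. */
typedef struct {
    int refcount;
} winfo_t;

static winfo_t *winfo_new(void) {          /* ~ OBJ_NEW */
    winfo_t *w = calloc(1, sizeof(*w));
    if (NULL != w) w->refcount = 1;
    return w;
}

static void winfo_retain(winfo_t *w) {     /* ~ OBJ_RETAIN */
    w->refcount++;
}

static void winfo_release(winfo_t *w) {    /* ~ OBJ_RELEASE */
    if (0 == --w->refcount) {
        free(w);
    }
}

int main(void) {
    winfo_t *winfo = winfo_new();   /* refcount 1: the worker-list reference */
    winfo_retain(winfo);            /* refcount 2: the dflt_winfo reference  */
    winfo_release(winfo);           /* finalize: list reference dropped      */
    winfo_release(winfo);           /* finalize: dflt_winfo dropped -> freed */
    printf("both references released\n");
    return 0;
}
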
@@ -272,17 +278,33 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
     /* Go over all active workers and progress them
      * TODO: may want to have some partitioning to progress only part of
      * workers */
-    opal_mutex_lock(&wpool->mutex);
+    if (0 != opal_mutex_trylock(&wpool->mutex)) {
+        return completed;
+    }
+
+    bool progress_dflt_worker = true;
     OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
                            opal_common_ucx_winfo_t) {
-        opal_mutex_lock(&winfo->mutex);
+        if (0 != opal_mutex_trylock(&winfo->mutex)) {
+            continue;
+        }
         do {
+            if (winfo == wpool->dflt_winfo) {
+                progress_dflt_worker = false;
+            }
             progressed = ucp_worker_progress(winfo->worker);
             completed += progressed;
         } while (progressed);
         opal_mutex_unlock(&winfo->mutex);
     }
     opal_mutex_unlock(&wpool->mutex);
 
+    if (progress_dflt_worker) {
+        /* make sure to progress at least some */
+        opal_mutex_lock(&wpool->dflt_winfo->mutex);
+        completed += ucp_worker_progress(wpool->dflt_winfo->worker);
+        opal_mutex_unlock(&wpool->dflt_winfo->mutex);
+    }
+
     return completed;
 }
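
The rewritten progress function is the core of the commit: it replaces blocking locks with trylocks so concurrent callers skip contended workers instead of serializing, tracks whether the default worker happened to be progressed as part of the active list, and otherwise falls back to progressing it directly. Below is a self-contained sketch of the same pattern using POSIX mutexes in place of opal_mutex_t; pool_t, worker_t, and worker_progress() are illustrative stand-ins, not the real wpool API:

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-ins: a worker with its own lock, and a pool
 * holding an array of active workers plus a designated default. */
typedef struct {
    pthread_mutex_t mutex;
    int work_left;               /* stands in for pending UCX events */
} worker_t;

typedef struct {
    pthread_mutex_t mutex;
    worker_t *active[8];
    size_t nactive;
    worker_t *dflt;              /* ~ wpool->dflt_winfo */
} pool_t;

/* Stand-in for ucp_worker_progress(): returns events handled. */
static int worker_progress(worker_t *w) {
    if (w->work_left > 0) { w->work_left--; return 1; }
    return 0;
}

static int pool_progress(pool_t *p) {
    int completed = 0, progressed;
    bool progress_dflt = true;

    /* trylock: if another thread already progresses the pool,
     * do not block, just report no work done */
    if (0 != pthread_mutex_trylock(&p->mutex)) return completed;

    for (size_t i = 0; i < p->nactive; i++) {
        worker_t *w = p->active[i];
        /* skip workers another thread is currently progressing */
        if (0 != pthread_mutex_trylock(&w->mutex)) continue;
        do {
            if (w == p->dflt) progress_dflt = false;
            progressed = worker_progress(w);
            completed += progressed;
        } while (progressed);
        pthread_mutex_unlock(&w->mutex);
    }
    pthread_mutex_unlock(&p->mutex);

    /* the default worker was not on the active list (or was skipped):
     * progress it explicitly so at least some progress is made */
    if (progress_dflt) {
        pthread_mutex_lock(&p->dflt->mutex);
        completed += worker_progress(p->dflt);
        pthread_mutex_unlock(&p->dflt->mutex);
    }
    return completed;
}

int main(void) {
    worker_t dflt = { PTHREAD_MUTEX_INITIALIZER, 3 };
    pool_t pool = { PTHREAD_MUTEX_INITIALIZER, { NULL }, 0, &dflt };
    /* no active workers: the fallback still drains the default worker */
    while (pool_progress(&pool) > 0) { }
    printf("default worker drained: %d events left\n", dflt.work_left);
    return 0;
}

The fallback at the end is what the commit title refers to: even with an empty active list, as in this sketch's main(), the default worker still gets progressed.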

View file

@@ -30,6 +30,9 @@
 BEGIN_C_DECLS
 
+/* forward declaration */
+typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t;
+
 /* Worker pool is a global object that is allocated per component or can be
  * shared between multiple compatible components.
  * The lifetime of this object is normally equal to the lifetime of a component[s].
@ -42,7 +45,7 @@ typedef struct {
/* UCX data */ /* UCX data */
ucp_context_h ucp_ctx; ucp_context_h ucp_ctx;
ucp_worker_h dflt_worker; opal_common_ucx_winfo_t *dflt_winfo;
ucp_address_t *recv_waddr; ucp_address_t *recv_waddr;
size_t recv_waddr_len; size_t recv_waddr_len;
@@ -116,7 +119,7 @@ typedef struct {
  * in the Worker Pool lists (either active or idle).
  * One wpmem is intended per shared memory segment (i.e. MPI Window).
  */
-typedef struct opal_common_ucx_winfo {
+struct opal_common_ucx_winfo {
     opal_list_item_t super;
     opal_recursive_mutex_t mutex;
     ucp_worker_h worker;
@@ -125,7 +128,7 @@ typedef struct opal_common_ucx_winfo {
     short *inflight_ops;
     short global_inflight_ops;
     ucs_status_ptr_t inflight_req;
-} opal_common_ucx_winfo_t;
+};
 OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);
 
 typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
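
The header changes hinge on a standard C pattern: the typedef for opal_common_ucx_winfo_t moves ahead of the wpool struct so dflt_winfo can be declared as a pointer to a type whose struct body appears only later; C permits pointers to incomplete types. A tiny illustration with hypothetical node_t/list_t names:

#include <stdio.h>

/* Forward typedef: from here on node_t is usable as a pointer type
 * even though struct node is still incomplete. */
typedef struct node node_t;

typedef struct {
    node_t *head;              /* pointer to an incomplete type is legal C */
} list_t;

/* The full definition may come later in the header. */
struct node {
    node_t *next;
    int value;
};

int main(void) {
    node_t n = { NULL, 42 };
    list_t l = { &n };
    printf("%d\n", l.head->value);
    return 0;
}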