Merge pull request #7632 from devreal/osc-ucx-progress
UCX osc: make progress on idle worker if none are active
Этот коммит содержится в:
Коммит
f9ef4b4ac0
@ -279,7 +279,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
|
||||
ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
|
||||
}
|
||||
|
||||
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
|
||||
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
|
||||
usleep(100);
|
||||
} while (1);
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ static inline int start_atomicity(
|
||||
break;
|
||||
}
|
||||
|
||||
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
|
||||
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
|
||||
}
|
||||
|
||||
*lock_acquired = true;
|
||||
|
@ -42,7 +42,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
|
||||
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -70,8 +70,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
|
||||
if (result_value == TARGET_LOCK_UNLOCKED) {
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
|
||||
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
|
||||
goto exit;
|
||||
}
|
||||
|
||||
winfo = calloc(1, sizeof(*winfo));
|
||||
winfo = OBJ_NEW(opal_common_ucx_winfo_t);
|
||||
if (NULL == winfo) {
|
||||
MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
|
||||
goto release_worker;
|
||||
@ -194,9 +194,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
|
||||
rc = OPAL_ERROR;
|
||||
goto err_worker_create;
|
||||
}
|
||||
wpool->dflt_worker = winfo->worker;
|
||||
wpool->dflt_winfo = winfo;
|
||||
OBJ_RETAIN(wpool->dflt_winfo);
|
||||
|
||||
status = ucp_worker_get_address(wpool->dflt_worker,
|
||||
status = ucp_worker_get_address(wpool->dflt_winfo->worker,
|
||||
&wpool->recv_waddr, &wpool->recv_waddr_len);
|
||||
if (status != UCS_OK) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
|
||||
@ -214,8 +215,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
|
||||
err_wpool_add:
|
||||
free(wpool->recv_waddr);
|
||||
err_get_addr:
|
||||
if (NULL != wpool->dflt_worker) {
|
||||
ucp_worker_destroy(wpool->dflt_worker);
|
||||
if (NULL != wpool) {
|
||||
OBJ_RELEASE(winfo);
|
||||
OBJ_RELEASE(wpool->dflt_winfo);
|
||||
wpool->dflt_winfo = NULL;
|
||||
}
|
||||
err_worker_create:
|
||||
ucp_cleanup(wpool->ucp_ctx);
|
||||
@ -233,7 +236,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
|
||||
|
||||
/* Release the address here. recv worker will be released
|
||||
* below along with other idle workers */
|
||||
ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
|
||||
ucp_worker_release_address(wpool->dflt_winfo->worker, wpool->recv_waddr);
|
||||
|
||||
/* Go over the list, free idle list items */
|
||||
if (!opal_list_is_empty(&wpool->idle_workers)) {
|
||||
@ -258,6 +261,9 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
|
||||
}
|
||||
OBJ_DESTRUCT(&wpool->active_workers);
|
||||
|
||||
OBJ_RELEASE(wpool->dflt_winfo);
|
||||
wpool->dflt_winfo = NULL;
|
||||
|
||||
OBJ_DESTRUCT(&wpool->mutex);
|
||||
ucp_cleanup(wpool->ucp_ctx);
|
||||
return;
|
||||
@ -272,17 +278,33 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
|
||||
/* Go over all active workers and progress them
|
||||
* TODO: may want to have some partitioning to progress only part of
|
||||
* workers */
|
||||
opal_mutex_lock(&wpool->mutex);
|
||||
if (0 != opal_mutex_trylock(&wpool->mutex)) {
|
||||
return completed;
|
||||
}
|
||||
|
||||
bool progress_dflt_worker = true;
|
||||
OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
|
||||
opal_common_ucx_winfo_t) {
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
if (0 != opal_mutex_trylock(&winfo->mutex)) {
|
||||
continue;
|
||||
}
|
||||
do {
|
||||
if (winfo == wpool->dflt_winfo) {
|
||||
progress_dflt_worker = false;
|
||||
}
|
||||
progressed = ucp_worker_progress(winfo->worker);
|
||||
completed += progressed;
|
||||
} while (progressed);
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
}
|
||||
opal_mutex_unlock(&wpool->mutex);
|
||||
|
||||
if (progress_dflt_worker) {
|
||||
/* make sure to progress at least some */
|
||||
opal_mutex_lock(&wpool->dflt_winfo->mutex);
|
||||
completed += ucp_worker_progress(wpool->dflt_winfo->worker);
|
||||
opal_mutex_unlock(&wpool->dflt_winfo->mutex);
|
||||
}
|
||||
return completed;
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,9 @@
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/* forward declaration */
|
||||
typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t;
|
||||
|
||||
/* Worker pool is a global object that is allocated per component or can be
|
||||
* shared between multiple compatible components.
|
||||
* The lifetime of this object is normally equal to the lifetime of a component[s].
|
||||
@ -42,7 +45,7 @@ typedef struct {
|
||||
|
||||
/* UCX data */
|
||||
ucp_context_h ucp_ctx;
|
||||
ucp_worker_h dflt_worker;
|
||||
opal_common_ucx_winfo_t *dflt_winfo;
|
||||
ucp_address_t *recv_waddr;
|
||||
size_t recv_waddr_len;
|
||||
|
||||
@ -116,7 +119,7 @@ typedef struct {
|
||||
* in the Worker Pool lists (either active or idle).
|
||||
* One wpmem is intended per shared memory segment (i.e. MPI Window).
|
||||
*/
|
||||
typedef struct opal_common_ucx_winfo {
|
||||
struct opal_common_ucx_winfo {
|
||||
opal_list_item_t super;
|
||||
opal_recursive_mutex_t mutex;
|
||||
ucp_worker_h worker;
|
||||
@ -125,7 +128,7 @@ typedef struct opal_common_ucx_winfo {
|
||||
short *inflight_ops;
|
||||
short global_inflight_ops;
|
||||
ucs_status_ptr_t inflight_req;
|
||||
} opal_common_ucx_winfo_t;
|
||||
};
|
||||
OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);
|
||||
|
||||
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user