1
1

Further code reduction and simplifications.

Co-authored-by: Artem Polyakov <artpol84@gmail.com>

Signed-off-by: Tomislav Janjusic <tomislavj@mellanox.com>
Этот коммит содержится в:
Tommy Janjusic 2020-07-29 09:47:15 -05:00 коммит произвёл Tomislav Janjusic
родитель cbfc9a3263
Коммит 2c8da2c0a9
4 изменённых файлов: 85 добавлений и 153 удалений

Просмотреть файл

@ -20,7 +20,7 @@
*******************************************************************************
******************************************************************************/
OBJ_CLASS_INSTANCE(_winfo_list_item_t, opal_list_item_t, NULL, NULL);
OBJ_CLASS_INSTANCE(opal_common_ucx_winfo_t, opal_list_item_t, NULL, _winfo_destructor);
OBJ_CLASS_INSTANCE(_ctx_record_t, opal_list_item_t, NULL, NULL);
OBJ_CLASS_INSTANCE(_mem_record_t, opal_list_item_t, NULL, NULL);
@ -69,7 +69,6 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
winfo->worker = worker;
winfo->endpoints = NULL;
winfo->comm_size = 0;
winfo->released = 0;
winfo->inflight_ops = NULL;
winfo->global_inflight_ops = 0;
winfo->inflight_req = UCS_OK;
@ -83,7 +82,7 @@ exit:
}
static void
_winfo_reset(opal_common_ucx_winfo_t *winfo)
_winfo_destructor(opal_common_ucx_winfo_t *winfo)
{
if (winfo->inflight_req != UCS_OK) {
opal_common_ucx_wait_request_mt(winfo->inflight_req,
@ -106,15 +105,9 @@ _winfo_reset(opal_common_ucx_winfo_t *winfo)
}
winfo->endpoints = NULL;
winfo->comm_size = 0;
winfo->released = 0;
}
static void
_winfo_release(opal_common_ucx_winfo_t *winfo)
{
OBJ_DESTRUCT(&winfo->mutex);
ucp_worker_destroy(winfo->worker);
free(winfo);
}
/* -----------------------------------------------------------------------------
@ -137,6 +130,9 @@ opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
free(wpool);
}
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
opal_common_ucx_winfo_t *winfo);
OPAL_DECLSPEC int
opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int proc_world_size, bool enable_mt)
@ -158,7 +154,7 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
status = ucp_config_read("MPI", NULL, &config);
if (UCS_OK != status) {
MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
return OPAL_ERROR;
}
/* initialize UCP context */
@ -241,12 +237,11 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
/* Go over the list, free idle list items */
if (!opal_list_is_empty(&wpool->idle_workers)) {
_winfo_list_item_t *item, *next;
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers,
_winfo_list_item_t) {
opal_list_remove_item(&wpool->idle_workers, &item->super);
_winfo_release(item->ptr);
OBJ_RELEASE(item);
opal_common_ucx_winfo_t *winfo, *next;
OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->idle_workers,
opal_common_ucx_winfo_t) {
opal_list_remove_item(&wpool->idle_workers, &winfo->super);
OBJ_RELEASE(winfo);
}
}
OBJ_DESTRUCT(&wpool->idle_workers);
@ -254,18 +249,15 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
/* Release active workers. They are no longer active actually
* because opal_common_ucx_wpool_finalize is being called. */
if (!opal_list_is_empty(&wpool->active_workers)) {
_winfo_list_item_t *item, *next;
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
_winfo_list_item_t) {
opal_list_remove_item(&wpool->active_workers, &item->super);
_winfo_reset(item->ptr);
_winfo_release(item->ptr);
OBJ_RELEASE(item);
opal_common_ucx_winfo_t *winfo, *next;
OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
opal_common_ucx_winfo_t) {
opal_list_remove_item(&wpool->active_workers, &winfo->super);
OBJ_RELEASE(winfo);
}
}
OBJ_DESTRUCT(&wpool->active_workers);
OBJ_DESTRUCT(&wpool->mutex);
ucp_cleanup(wpool->ucp_ctx);
return;
@ -274,34 +266,23 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
OPAL_DECLSPEC int
opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
{
_winfo_list_item_t *item = NULL, *next = NULL;
opal_common_ucx_winfo_t *winfo = NULL, *next = NULL;
int completed = 0, progressed = 0;
/* Go over all active workers and progress them
* TODO: may want to have some partitioning to progress only part of
* workers */
if (!opal_mutex_trylock (&wpool->mutex)) {
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
_winfo_list_item_t) {
opal_common_ucx_winfo_t *winfo = item->ptr;
opal_mutex_lock(&winfo->mutex);
if( OPAL_UNLIKELY(winfo->released) ) {
/* Do garbage collection of worker info's if needed */
opal_list_remove_item(&wpool->active_workers, &item->super);
_winfo_reset(winfo);
opal_list_append(&wpool->idle_workers, &item->super);
completed++;
} else {
/* Progress worker until there are existing events */
do {
progressed = ucp_worker_progress(winfo->worker);
completed += progressed;
} while (progressed);
}
opal_mutex_unlock(&winfo->mutex);
}
opal_mutex_unlock(&wpool->mutex);
opal_mutex_lock(&wpool->mutex);
OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
opal_common_ucx_winfo_t) {
opal_mutex_lock(&winfo->mutex);
do {
progressed = ucp_worker_progress(winfo->worker);
completed += progressed;
} while (progressed);
opal_mutex_unlock(&winfo->mutex);
}
opal_mutex_unlock(&wpool->mutex);
return completed;
}
@ -309,20 +290,7 @@ static int
_wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
opal_common_ucx_winfo_t *winfo)
{
_winfo_list_item_t *item;
item = OBJ_NEW(_winfo_list_item_t);
if (NULL == item) {
MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item");
return OPAL_ERR_OUT_OF_RESOURCE;
}
item->ptr = winfo;
winfo->self = item;
opal_mutex_lock(&wpool->mutex);
opal_list_append(list, &item->super);
opal_mutex_unlock(&wpool->mutex);
opal_list_append(list, &winfo->super);
return OPAL_SUCCESS;
}
@ -330,31 +298,26 @@ static opal_common_ucx_winfo_t*
_wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
{
opal_common_ucx_winfo_t *winfo = NULL;
_winfo_list_item_t *item = NULL;
opal_mutex_lock(&wpool->mutex);
if (!opal_list_is_empty(list)) {
item = (_winfo_list_item_t *)opal_list_get_first(list);
opal_list_remove_item(list, &item->super);
winfo = (opal_common_ucx_winfo_t *)opal_list_get_first(list);
opal_list_remove_item(list, &winfo->super);
}
opal_mutex_unlock(&wpool->mutex);
if (item != NULL) {
winfo = item->ptr;
OBJ_RELEASE(item);
}
return winfo;
}
static opal_common_ucx_winfo_t *
_wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
_wpool_get_winfo(opal_common_ucx_wpool_t *wpool, size_t comm_size)
{
opal_common_ucx_winfo_t *winfo;
opal_mutex_lock(&wpool->mutex);
winfo = _wpool_list_get(wpool, &wpool->idle_workers);
if (!winfo) {
winfo = _winfo_create(wpool);
if (!winfo) {
MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure");
opal_mutex_unlock(&wpool->mutex);
return NULL;
}
}
@ -362,13 +325,24 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
winfo->inflight_ops = calloc(comm_size, sizeof(short));
winfo->comm_size = comm_size;
/* Put the worker on the active list */
_wpool_list_put(wpool, &wpool->active_workers, winfo);
opal_mutex_unlock(&wpool->mutex);
return winfo;
}
static int
_wpool_add_active(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
static void
_wpool_put_winfo(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
{
return _wpool_list_put(wpool, &wpool->active_workers, winfo);
opal_mutex_lock(&wpool->mutex);
opal_list_remove_item(&wpool->active_workers, &winfo->super);
opal_list_prepend(&wpool->idle_workers, &winfo->super);
opal_mutex_unlock(&wpool->mutex);
return;
}
/* -----------------------------------------------------------------------------
@ -428,21 +402,12 @@ opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
_tlocal_ctx_rec_cleanup(ctx_rec);
}
/* Make sure that all the loads/stores are complete */
opal_atomic_mb();
_common_ucx_wpctx_free(ctx);
}
/* Final cleanup of the context structure
* once all references cleared */
static void
_common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
{
free(ctx->recv_worker_addrs);
free(ctx->recv_worker_displs);
OBJ_DESTRUCT(&ctx->mutex);
OBJ_DESTRUCT(&ctx->ctx_records);
free(ctx);
}
@ -573,8 +538,6 @@ void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
OBJ_DESTRUCT(&mem->mem_records);
opal_atomic_mb();
free(mem->mem_addrs);
free(mem->mem_displs);
@ -603,28 +566,21 @@ static void _ctx_rec_destructor(void *arg) {
static void
_tlocal_ctx_rec_cleanup(_ctx_record_t *ctx_rec)
{
opal_common_ucx_winfo_t *winfo = NULL;
opal_common_ucx_wpool_t *wpool = NULL;
if (NULL == ctx_rec) {
return;
}
winfo = ctx_rec->winfo;
wpool = ctx_rec->gctx->wpool;
opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
opal_common_ucx_wpool_t *wpool = ctx_rec->gctx->wpool;
opal_mutex_lock(&wpool->mutex);
/* Remove worker from active and return to idle list. */
_wpool_put_winfo(wpool, winfo);
/* Remove worker from active and return to idle list.
* Remove the context record from the ctx list */
opal_list_remove_item(&wpool->active_workers, &winfo->self->super);
opal_list_prepend(&wpool->idle_workers, &winfo->self->super);
/* Remove the context record from the ctx list. */
opal_mutex_lock(&ctx_rec->gctx->mutex);
opal_list_remove_item(&ctx_rec->gctx->ctx_records, &ctx_rec->super);
opal_mutex_unlock(&ctx_rec->gctx->mutex);
opal_mutex_unlock(&wpool->mutex);
OBJ_RELEASE(ctx_rec);
return;
@ -637,22 +593,17 @@ _tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx)
_ctx_record_t *ctx_rec = OBJ_NEW(_ctx_record_t);
if (!ctx_rec) {
return NULL;
MCA_COMMON_UCX_ERROR("Failed to allocate new ctx_rec");
goto error1;
}
ctx_rec->gctx = ctx;
ctx_rec->winfo = _wpool_get_idle(ctx->wpool, ctx->comm_size);
ctx_rec->winfo = _wpool_get_winfo(ctx->wpool, ctx->comm_size);
if (NULL == ctx_rec->winfo) {
MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
OBJ_RELEASE(ctx_rec);
return NULL;
goto error2;
}
opal_atomic_wmb();
/* Add this worker to the active worker list */
_wpool_add_active(ctx->wpool, ctx_rec->winfo);
/* Add ctx_rec to list */
opal_mutex_lock(&ctx->mutex);
opal_list_append(&ctx->ctx_records, &ctx_rec->super);
@ -661,12 +612,22 @@ _tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx)
/* Add tls reference to record */
rc = opal_tsd_tracked_key_set(&ctx->tls_key, ctx_rec);
if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(ctx_rec);
return NULL;
MCA_COMMON_UCX_ERROR("Failed to set ctx_rec tls key");
goto error3;
}
/* All good - return the record */
return ctx_rec;
error3:
opal_mutex_lock(&ctx->mutex);
opal_list_remove_item(&ctx->ctx_records, &ctx_rec->super);
opal_mutex_unlock(&ctx->mutex);
_wpool_put_winfo(ctx->wpool, ctx_rec->winfo);
error2:
OBJ_RELEASE(ctx_rec);
error1:
return NULL;
}
static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
@ -708,11 +669,13 @@ _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec)
return;
}
opal_mutex_lock(&mem_rec->winfo->mutex);
for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
if (mem_rec->rkeys[i]) {
ucp_rkey_destroy(mem_rec->rkeys[i]);
}
}
opal_mutex_unlock(&mem_rec->winfo->mutex);
free(mem_rec->rkeys);
/* Remove item from the list */
@ -733,23 +696,20 @@ static _mem_record_t *_tlocal_add_mem_rec(opal_common_ucx_wpmem_t *mem, _ctx_rec
return NULL;
}
opal_mutex_lock(&mem->mutex);
opal_list_append(&mem->mem_records, &mem_rec->super);
opal_mutex_unlock(&mem->mutex);
mem_rec->gmem = mem;
mem_rec->ctx_rec = ctx_rec;
mem_rec->winfo = ctx_rec->winfo;
mem_rec->rkeys = calloc(mem->ctx->comm_size, sizeof(*mem_rec->rkeys));
opal_atomic_wmb();
rc = opal_tsd_tracked_key_set(&mem->tls_key, mem_rec);
if (OPAL_SUCCESS != rc) {
return NULL;
}
opal_mutex_lock(&mem->mutex);
opal_list_append(&mem->mem_records, &mem_rec->super);
opal_mutex_unlock(&mem->mutex);
return mem_rec;
}
@ -875,9 +835,9 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
(NULL == winfo->endpoints[target])) {
continue;
}
opal_mutex_lock(&winfo->mutex);
rc = opal_common_ucx_winfo_flush(winfo, target, OPAL_COMMON_UCX_FLUSH_B,
scope, NULL);
opal_mutex_lock(&winfo->mutex);
switch (scope) {
case OPAL_COMMON_UCX_SCOPE_WORKER:
winfo->global_inflight_ops = 0;

Просмотреть файл

@ -110,8 +110,6 @@ typedef struct {
opal_tsd_tracked_key_t tls_key;
} opal_common_ucx_wpmem_t;
typedef struct __winfo_list_item_t _winfo_list_item_t;
/* The structure that wraps UCP worker and holds the state that is required
* for its use.
* The structure is allocated along with UCP worker on demand and is being held
@ -119,9 +117,8 @@ typedef struct __winfo_list_item_t _winfo_list_item_t;
* One wpmem is intended per shared memory segment (i.e. MPI Window).
*/
typedef struct opal_common_ucx_winfo {
opal_list_item_t super;
opal_recursive_mutex_t mutex;
_winfo_list_item_t *self;
volatile int released;
ucp_worker_h worker;
ucp_ep_h *endpoints;
size_t comm_size;
@ -129,6 +126,7 @@ typedef struct opal_common_ucx_winfo {
short global_inflight_ops;
ucs_status_ptr_t inflight_req;
} opal_common_ucx_winfo_t;
OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
@ -163,12 +161,6 @@ typedef enum {
OPAL_COMMON_UCX_MEM_MAP
} opal_common_ucx_mem_type_t;
struct __winfo_list_item_t{
opal_list_item_t super;
opal_common_ucx_winfo_t *ptr;
};
OBJ_CLASS_DECLARATION(_winfo_list_item_t);
typedef struct {
opal_list_item_t super;
opal_common_ucx_ctx_t *gctx;
@ -216,7 +208,7 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
opal_common_ucx_winfo_t **_winfo)
{
_mem_record_t *mem_rec = NULL;
int expr;
int is_ready;
int rc = OPAL_SUCCESS;
/* First check the fast-path */
@ -224,9 +216,10 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
if (OPAL_SUCCESS != rc) {
return rc;
}
expr = mem_rec && (NULL != mem_rec->winfo) && (mem_rec->winfo->endpoints[target]) &&
is_ready = mem_rec && (mem_rec->winfo->endpoints[target]) &&
(NULL != mem_rec->rkeys[target]);
if (OPAL_UNLIKELY(!expr)) {
MCA_COMMON_UCX_ASSERT((NULL == mem_rec) || (NULL != mem_rec->winfo));
if (OPAL_UNLIKELY(!is_ready)) {
rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
if (OPAL_SUCCESS != rc) {
return rc;

Просмотреть файл

@ -12,25 +12,9 @@ static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int targ
/* Internal Worker Information (winfo) management */
static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool);
static void _winfo_release(opal_common_ucx_winfo_t *winfo);
static void _winfo_reset(opal_common_ucx_winfo_t *winfo);
static void _winfo_destructor(opal_common_ucx_winfo_t *winfo);
/* Internal Worker Pool (wpool) management */
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
opal_common_ucx_winfo_t *winfo);
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
opal_common_ucx_winfo_t *winfo);
static opal_common_ucx_winfo_t *_wpool_list_get(opal_common_ucx_wpool_t *wpool,
opal_list_t *list);
static opal_common_ucx_winfo_t *_wpool_get_idle(opal_common_ucx_wpool_t *wpool,
size_t comm_size);
static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
opal_common_ucx_winfo_t *winfo);
/* Internal Worker Pool Context management */
static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
/* Internal Worker Pool Memeory management */
/* Internal Worker Pool Memory management */
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
void **base, size_t size, ucp_mem_h *memh_ptr,
opal_common_ucx_mem_type_t mem_type);

Просмотреть файл

@ -474,11 +474,6 @@ opal_net_init()
int
opal_net_finalize()
{
#if OPAL_ENABLE_IPV6
if (NULL != hostname_tsd_key) {
OBJ_RELEASE(hostname_tsd_key);
}
#endif
return OPAL_SUCCESS;
}