From 6b7acdf21fc0fbb2e2eeb0ae2cfef85b2a5346cb Mon Sep 17 00:00:00 2001
From: Artem Polyakov
Date: Tue, 27 Nov 2018 11:05:51 -0800
Subject: [PATCH] opal/common/ucx: Simplify Worker Pool context management

Signed-off-by: Artem Polyakov
---
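Reviewer note (kept below the "---" cut, so "git am" drops it from the commit
message): this patch replaces the per-context pthread rwlock and the per-record
"released" flags with a mutex-protected worker list plus an atomic reference
counter on the context. The last holder of a reference, whether the application
(wpctx_release) or a worker thread (wpctx_remove), frees the structure. Below is
a minimal standalone sketch of that scheme; the names (ctx_t, ctx_ref,
ctx_unref) are illustrative only, and C11 atomics stand in for the
OPAL_ATOMIC_ADD_FETCH32/opal_atomic_mb wrappers used in the patch.

    #include <stdatomic.h>
    #include <stdlib.h>

    typedef struct {
        atomic_int refcntr;   /* number of holders of this context */
    } ctx_t;

    static ctx_t *ctx_create(void)
    {
        ctx_t *ctx = calloc(1, sizeof(*ctx));
        atomic_init(&ctx->refcntr, 1); /* the application holds one reference */
        return ctx;
    }

    /* Taken in _common_ucx_wpctx_append() when a thread subscribes a worker */
    static void ctx_ref(ctx_t *ctx)
    {
        atomic_fetch_add(&ctx->refcntr, 1);
    }

    /* Dropped by opal_common_ucx_wpctx_release() (application reference) and
     * by _common_ucx_wpctx_remove() (worker references). atomic_fetch_sub()
     * returns the previous value, so seeing 1 here means this call took the
     * counter to zero and no other holder remains: safe to free. */
    static void ctx_unref(ctx_t *ctx)
    {
        if (1 == atomic_fetch_sub(&ctx->refcntr, 1)) {
            free(ctx);
        }
    }

    int main(void)
    {
        ctx_t *ctx = ctx_create(); /* refcntr == 1 (application)        */
        ctx_ref(ctx);              /* worker subscribes:   refcntr == 2 */
        ctx_unref(ctx);            /* application releases: refcntr == 1 */
        ctx_unref(ctx);            /* worker unsubscribes:  refcntr == 0, freed */
        return 0;
    }

However the application and the worker threads interleave, exactly one of the
ctx_unref() calls observes the counter dropping to zero, which is what lets the
patch delete the "can_free" bookkeeping that the old rwlock protected.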
 opal/mca/common/ucx/common_ucx_wpool.c     | 178 +++++++++------------
 opal/mca/common/ucx/common_ucx_wpool.h     |   5 +-
 opal/mca/common/ucx/common_ucx_wpool_int.h |   8 +-
 3 files changed, 83 insertions(+), 108 deletions(-)

diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c
index bfcd4e4858..91553af703 100644
--- a/opal/mca/common/ucx/common_ucx_wpool.c
+++ b/opal/mca/common/ucx/common_ucx_wpool.c
@@ -389,20 +389,15 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
                              opal_common_ucx_ctx_t **ctx_ptr)
 {
     opal_common_ucx_ctx_t *ctx = calloc(1, sizeof(*ctx));
-    pthread_rwlockattr_t attr;
     int ret = OPAL_SUCCESS;
 
     ctx->ctx_id = OPAL_ATOMIC_ADD_FETCH32(&wpool->cur_ctxid, 1);
     DBG_OUT("ctx_create: ctx_id = %d\n", (int)ctx->ctx_id);
 
-    //OBJ_CONSTRUCT(&ctx->mutex, opal_mutex_t);
-    // TODO: check return codes
-    pthread_rwlockattr_init(&attr);
-    pthread_rwlock_init(&ctx->rwlock, &attr);
-    pthread_rwlockattr_destroy(&attr);
-
+    OBJ_CONSTRUCT(&ctx->mutex, opal_mutex_t);
     OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
     ctx->released = 0;
+    ctx->refcntr = 1; /* application holding the context */
     ctx->wpool = wpool;
     ctx->comm_size = comm_size;
@@ -430,39 +425,26 @@ error:
 OPAL_DECLSPEC void
 opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
 {
-    _ctx_record_list_item_t *item = NULL;
-    int can_free = 0;
+    int my_refcntr = -1;
 
     DBG_OUT("opal_common_ucx_ctx_release: ctx = %p\n", (void *)ctx);
 
-    /* Mark all the contexts and workers as being released
-     * Threads involved in this context will reconize this
-     * through the "released" flag and will perform cleanup
-     * in a deferred manner.
-     * If Worker pool will be destroyed earlier - wpool filalize
-     * will take care of this.
-     */
+    /* The application is expected to guarantee that no operation
+     * is performed on the context that is being released */
 
-    //opal_mutex_lock(&ctx->mutex);
-    pthread_rwlock_rdlock(&ctx->rwlock);
-
-    /* Mark that this function has been called */
+    /* Mark that this context was released by the application.
+     * Threads will use this flag to perform deferred cleanup */
     ctx->released = 1;
 
-    /* Go over all TLS subscribed to this context and mark
-     * that this handler is no longer in use */
-    OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
-        item->ptr->released = 1;
-    }
-    if (0 == opal_list_get_size(&ctx->tls_workers)) {
-        can_free = 1;
-    }
+    /* Decrement the reference counter */
+    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
 
-    //opal_mutex_unlock(&ctx->mutex);
-    pthread_rwlock_unlock(&ctx->rwlock);
+    /* Make sure that all the loads/stores are complete */
+    opal_atomic_mb();
 
-
-    if (can_free) {
+    /* If there are no more references to this handler
+     * we can release it */
+    if (0 == my_refcntr) {
         _common_ucx_wpctx_free(ctx);
     }
 }
@@ -474,7 +456,7 @@ _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
 {
     free(ctx->recv_worker_addrs);
     free(ctx->recv_worker_displs);
-    //OBJ_DESTRUCT(&ctx->mutex);
+    OBJ_DESTRUCT(&ctx->mutex);
     OBJ_DESTRUCT(&ctx->tls_workers);
     DBG_OUT("_common_ucx_ctx_free: ctx = %p\n", (void *)ctx);
     free(ctx);
 }
@@ -482,56 +464,63 @@
 /* Subscribe a new TLS to this context */
 static int
-_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx, _tlocal_ctx_t *ctx_rec)
+_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
+                         opal_common_ucx_winfo_t *winfo)
 {
     _ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
     if (NULL == item) {
         return OPAL_ERR_OUT_OF_RESOURCE;
     }
-    item->ptr = ctx_rec;
+    /* Increment the reference counter */
+    OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);
 
-    //opal_mutex_lock(&ctx->mutex);
-    pthread_rwlock_wrlock(&ctx->rwlock);
+    /* Add the new worker to the context */
+    item->ptr = winfo;
+    opal_mutex_lock(&ctx->mutex);
     opal_list_append(&ctx->tls_workers, &item->super);
-    //opal_mutex_unlock(&ctx->mutex);
-    pthread_rwlock_unlock(&ctx->rwlock);
+    opal_mutex_unlock(&ctx->mutex);
 
-    DBG_OUT("_common_ucx_ctx_append: ctx = %p, ctx_rec = %p\n",
-            (void *)ctx, (void *)ctx_rec);
+    DBG_OUT("_common_ucx_wpctx_append: ctx = %p, winfo = %p\n",
+            (void *)ctx, (void *)winfo);
     return OPAL_SUCCESS;
 }
 
 /* Unsubscribe a particular TLS to this context */
 static void
-_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx, _tlocal_ctx_t *ctx_rec)
+_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
+                         opal_common_ucx_winfo_t *winfo)
 {
-    int can_free = 0;
     _ctx_record_list_item_t *item = NULL, *next;
+    int my_refcntr = -1;
 
-    // opal_mutex_lock(&ctx->mutex);
-    pthread_rwlock_wrlock(&ctx->rwlock);
+    opal_mutex_lock(&ctx->mutex);
     OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers, _ctx_record_list_item_t) {
-        if (ctx_rec == item->ptr) {
+        if (winfo == item->ptr) {
             opal_list_remove_item(&ctx->tls_workers, &item->super);
+            opal_mutex_lock(&winfo->mutex);
+            winfo->released = 1;
+            opal_mutex_unlock(&winfo->mutex);
             OBJ_RELEASE(item);
             break;
         }
     }
-    if (0 == opal_list_get_size(&ctx->tls_workers) && ctx->released) {
-        can_free = 1;
-    }
-    //opal_mutex_unlock(&ctx->mutex);
-    pthread_rwlock_unlock(&ctx->rwlock);
+    opal_mutex_unlock(&ctx->mutex);
 
-    if (can_free) {
-        /* All references to this data structure are removed
-         * we can safely release communication context structure */
+    /* Make sure that all the loads/stores are complete */
+    opal_atomic_mb();
+
+    /* Decrement the reference counter */
+    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
+
+    if (0 == my_refcntr) {
+        /* All references to this data structure were removed;
+         * we can safely release the communication context structure */
         _common_ucx_wpctx_free(ctx);
     }
-    DBG_OUT("_common_ucx_ctx_remove: ctx = %p, ctx_rec = %p\n",
-            (void *)ctx, (void *)ctx_rec);
+    DBG_OUT("_common_ucx_wpctx_remove: ctx = %p, winfo = %p\n",
+            (void *)ctx, (void *)winfo);
     return;
 }
@@ -911,20 +900,13 @@ _tlocal_ctx_search(_tlocal_table_t *tls, int ctx_id)
 static int
 _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
 {
-    opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
     if (0 == ctx_rec->ctx_id) {
         return OPAL_SUCCESS;
     }
     /* Remove myself from the communication context structure
      * This may result in context release as we are using
      * delayed cleanup */
-    _common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec);
-
-    /* Mark this winfo as free and it will be garbage-collected by
-     * progress or flush function */
-    opal_mutex_lock(&winfo->mutex);
-    winfo->released = 1;
-    opal_mutex_unlock(&winfo->mutex);
+    _common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
 
     /* Erase the record so it can be reused */
     memset(ctx_rec, 0, sizeof(*ctx_rec));
@@ -936,25 +918,25 @@
 static _tlocal_ctx_t *
 _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
 {
     size_t i;
+    ssize_t free_idx = -1; /* signed: -1 flags "no free slot found" */
     int rc;
 
-    /* Try to find available record in the TLS table */
+    /* Try to find an available record in the TLS table.
+     * In parallel, perform deferred cleanups */
     for (i = 0; i < tls->ctx_tbl_size; i++) {
-        if (0 == tls->ctx_tbl[i]->ctx_id) {
-            /* Found clean record */
-            break;
-        }
-        if (tls->ctx_tbl[i]->released ) {
+        if (tls->ctx_tbl[i]->gctx->released) {
            /* Found dirty record, need to clean first */
            _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
-            break;
+        }
+        if ((0 == tls->ctx_tbl[i]->ctx_id) && (0 > free_idx)) {
+            /* Found clean record */
+            free_idx = i;
         }
     }
     /* if needed - extend the table */
-    if( i >= tls->ctx_tbl_size ){
-        i = tls->ctx_tbl_size;
+    if (0 > free_idx) {
+        free_idx = tls->ctx_tbl_size;
         rc = _tlocal_tls_ctxtbl_extend(tls, 4);
         if (rc) {
             //TODO: error out
@@ -962,10 +944,10 @@ _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
         }
     }
 
-    tls->ctx_tbl[i]->ctx_id = ctx->ctx_id;
-    tls->ctx_tbl[i]->gctx = ctx;
-    tls->ctx_tbl[i]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
-    if (NULL == tls->ctx_tbl[i]->winfo) {
+    tls->ctx_tbl[free_idx]->ctx_id = ctx->ctx_id;
+    tls->ctx_tbl[free_idx]->gctx = ctx;
+    tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
+    if (NULL == tls->ctx_tbl[free_idx]->winfo) {
         MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
         return NULL;
     }
@@ -978,21 +960,21 @@
     opal_atomic_wmb();
 
     /* Add this worker to the active worker list */
-    _wpool_add_active(tls->wpool, tls->ctx_tbl[i]->winfo);
+    _wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo);
 
     /* add this worker into the context list */
-    rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[i]);
+    rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo);
     if (rc) {
         //TODO: error out
         return NULL;
     }
 
     DBG_OUT("_tlocal_add_ctx: tls = %p, ctx_rec = %p, winfo = %p\n",
-            (void *)tls, (void *)&tls->ctx_tbl[i],
-            (void *)tls->ctx_tbl[i]->winfo);
+            (void *)tls, (void *)&tls->ctx_tbl[free_idx],
+            (void *)tls->ctx_tbl[free_idx]->winfo);
 
     /* All good - return the record */
-    return tls->ctx_tbl[i];
+    return tls->ctx_tbl[free_idx];
 }
 
 static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
@@ -1037,7 +1019,6 @@ _tlocal_search_mem(_tlocal_table_t *tls, int mem_id)
     return NULL;
 }
 
-
 static void
 _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
 {
@@ -1244,41 +1225,38 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
     DBG_OUT("opal_common_ucx_mem_flush: mem = %p, target = %d\n",
             (void *)mem, target);
 
-    // TODO: make this as a read lock
-    //opal_mutex_lock(&ctx->mutex);
-    pthread_rwlock_rdlock(&ctx->rwlock);
+    opal_mutex_lock(&ctx->mutex);
     OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
         switch (scope) {
         case OPAL_COMMON_UCX_SCOPE_WORKER:
-            opal_mutex_lock(&item->ptr->winfo->mutex);
-            rc = opal_common_ucx_worker_flush(item->ptr->winfo->worker);
+            opal_mutex_lock(&item->ptr->mutex);
+            rc = opal_common_ucx_worker_flush(item->ptr->worker);
             if (rc != OPAL_SUCCESS) {
                 MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_worker_flush failed: %d", rc);
                 rc = OPAL_ERROR;
             }
             DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): worker = %p\n",
-                    (void *)item->ptr->winfo->worker);
-            opal_mutex_unlock(&item->ptr->winfo->mutex);
+                    (void *)item->ptr->worker);
+            opal_mutex_unlock(&item->ptr->mutex);
             break;
         case OPAL_COMMON_UCX_SCOPE_EP:
-            if (NULL != item->ptr->winfo->endpoints[target] ) {
-                opal_mutex_lock(&item->ptr->winfo->mutex);
-                rc = opal_common_ucx_ep_flush(item->ptr->winfo->endpoints[target],
-                                              item->ptr->winfo->worker);
+            if (NULL != item->ptr->endpoints[target]) {
+                opal_mutex_lock(&item->ptr->mutex);
+                rc = opal_common_ucx_ep_flush(item->ptr->endpoints[target],
+                                              item->ptr->worker);
                 if (rc != OPAL_SUCCESS) {
                     MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_ep_flush failed: %d", rc);
                     rc = OPAL_ERROR;
                 }
                 DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): ep = %p worker = %p\n",
-                        (void *)item->ptr->winfo->endpoints[target],
-                        (void *)item->ptr->winfo->worker);
-                opal_mutex_unlock(&item->ptr->winfo->mutex);
+                        (void *)item->ptr->endpoints[target],
+                        (void *)item->ptr->worker);
+                opal_mutex_unlock(&item->ptr->mutex);
             }
         }
     }
-    //opal_mutex_unlock(&ctx->mutex);
-    pthread_rwlock_unlock(&ctx->rwlock);
+    opal_mutex_unlock(&ctx->mutex);
 
     return rc;
 }
diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h
index 3f217f2da7..d526f05d07 100644
--- a/opal/mca/common/ucx/common_ucx_wpool.h
+++ b/opal/mca/common/ucx/common_ucx_wpool.h
@@ -22,7 +22,6 @@ typedef struct {
     /* Ref counting & locking*/
     int refcnt;
     opal_mutex_t mutex;
-    //pthread_rwlock_t rwlock;
 
     /* UCX data */
     ucp_context_h ucp_ctx;
@@ -44,8 +43,8 @@
 typedef struct {
     int ctx_id;
 
-    //opal_mutex_t mutex;
-    pthread_rwlock_t rwlock;
+    opal_mutex_t mutex;
+    opal_atomic_int32_t refcntr;
 
     /* the reference to a Worker pool this context belongs to*/
     opal_common_ucx_wpool_t *wpool;
diff --git a/opal/mca/common/ucx/common_ucx_wpool_int.h b/opal/mca/common/ucx/common_ucx_wpool_int.h
index efe893a534..ef7bf9a943 100644
--- a/opal/mca/common/ucx/common_ucx_wpool_int.h
+++ b/opal/mca/common/ucx/common_ucx_wpool_int.h
@@ -7,8 +7,6 @@
 typedef struct {
     int ctx_id;
-    // TODO: make sure that this is being set by external thread
-    volatile int released;
     opal_common_ucx_ctx_t *gctx;
     opal_common_ucx_winfo_t *winfo;
 } _tlocal_ctx_t;
@@ -35,7 +33,7 @@ OBJ_CLASS_DECLARATION(_winfo_list_item_t);
 typedef struct {
     opal_list_item_t super;
-    _tlocal_ctx_t *ptr;
+    opal_common_ucx_winfo_t *ptr;
 } _ctx_record_list_item_t;
 OBJ_CLASS_DECLARATION(_ctx_record_list_item_t);
@@ -99,9 +97,9 @@ static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
 /* Internal Worker Pool Context management */
 static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
 static int _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
-                                    _tlocal_ctx_t *ctx_rec);
+                                    opal_common_ucx_winfo_t *winfo);
 static void _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
-                                     _tlocal_ctx_t *ctx_rec);
+                                     opal_common_ucx_winfo_t *winfo);
 
 /* Internal Worker Pool Memeory management */
 static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
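
Reviewer note (commentary, not part of the patch): the slot-reuse scan in
_tlocal_add_ctx() now does two things in one pass: records whose global context
was already released get their deferred cleanup, and the first free slot is
remembered instead of breaking out of the loop. A simplified standalone
rendering of that scan follows; the rec_t/find_slot names are hypothetical
stand-ins for the _tlocal_ctx_t table in the patch.

    #include <stddef.h>
    #include <sys/types.h>  /* ssize_t */

    typedef struct {
        int ctx_id;         /* 0 marks a free record */
        int gctx_released;  /* stands in for rec->gctx->released */
    } rec_t;

    /* Returns the index of a reusable record, or -1 if the caller
     * must extend the table first. */
    static ssize_t find_slot(rec_t *tbl, size_t size)
    {
        ssize_t free_idx = -1;
        for (size_t i = 0; i < size; i++) {
            if (tbl[i].gctx_released) {
                /* dirty record: the real code calls
                 * _tlocal_ctx_record_cleanup() here, whose memset
                 * resets ctx_id back to 0 */
                tbl[i].ctx_id = 0;
                tbl[i].gctx_released = 0;
            }
            if (0 == tbl[i].ctx_id && 0 > free_idx) {
                free_idx = (ssize_t)i;  /* remember the first free slot */
            }
        }
        return free_idx;
    }

    int main(void)
    {
        rec_t tbl[4] = { { 7, 0 }, { 9, 1 }, { 0, 0 }, { 3, 0 } };
        /* record 1 is dirty, gets cleaned, and wins over record 2
         * because it is the first free slot encountered */
        return (int)find_slot(tbl, 4);  /* returns 1 */
    }

Note that free_idx must be a signed type: with an unsigned index the
"0 > free_idx" sentinel test can never be true, the table would never be
extended, and tbl[free_idx] would be indexed with (size_t)-1. That is why the
hunk above declares free_idx as ssize_t rather than folding it into the size_t
declaration of the loop counter.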