From f38c9f3e5fe75bdf4969bcb6237c94de37d0117c Mon Sep 17 00:00:00 2001 From: Artem Polyakov Date: Tue, 27 Nov 2018 16:43:15 -0800 Subject: [PATCH] opal/common/ucx: Simplify Worker Pool memory handler Signed-off-by: Artem Polyakov --- opal/mca/common/ucx/common_ucx_wpool.c | 144 ++++++++++----------- opal/mca/common/ucx/common_ucx_wpool.h | 10 +- opal/mca/common/ucx/common_ucx_wpool_int.h | 7 +- 3 files changed, 71 insertions(+), 90 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx_wpool.c b/opal/mca/common/ucx/common_ucx_wpool.c index 91553af703..c1d4ecc265 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.c +++ b/opal/mca/common/ucx/common_ucx_wpool.c @@ -509,11 +509,14 @@ _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx, opal_mutex_unlock(&ctx->mutex); /* Make sure that all the loads/stores are complete */ - opal_atomic_mb(); + opal_atomic_rmb(); /* Decrement the reference counter */ my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1); + /* a counterpart to the rmb above */ + opal_atomic_wmb(); + if (0 == my_refcntr) { /* All references to this data structure were removed * We can safely release communication context structure */ @@ -546,9 +549,8 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx, DBG_OUT("mem_create: mem_id = %d\n", (int)mem->mem_id); - OBJ_CONSTRUCT(&mem->mutex, opal_mutex_t); - OBJ_CONSTRUCT(&mem->registrations, opal_list_t); mem->released = 0; + mem->refcntr = 1; /* application holding this memory handler */ mem->ctx = ctx; mem->mem_addrs = NULL; mem->mem_displs = NULL; @@ -596,8 +598,6 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx, error_rkey_pack: ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh); error_mem_map: - OBJ_DESTRUCT(&mem->mutex); - OBJ_DESTRUCT(&mem->registrations); free(mem); (*mem_ptr) = NULL; return ret; @@ -606,22 +606,18 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx, OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem) { - _mem_record_list_item_t *item = NULL; - int can_free = 0; + int my_refcntr = -1; - opal_mutex_lock(&mem->mutex); - /* Mark that this function has been called */ + /* Mark that this memory handler has been called */ mem->released = 1; - /* Go over all TLS subscribed to this context and mark - * that this handler is no longer in use */ - OPAL_LIST_FOREACH(item, &mem->registrations, _mem_record_list_item_t) { - item->ptr->released = 1; - } - if(0 == opal_list_get_size(&mem->registrations)){ - can_free = 1; - } - opal_mutex_unlock(&mem->mutex); - if (can_free) { + + /* Decrement the reference counter */ + my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1); + + /* Make sure that all the loads/stores are complete */ + opal_atomic_wmb(); + + if (0 == my_refcntr) { _common_ucx_wpmem_free(mem); } return OPAL_SUCCESS; @@ -687,59 +683,48 @@ static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem) free(mem->mem_addrs); free(mem->mem_displs); ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh); - OBJ_DESTRUCT(&mem->mutex); - OBJ_DESTRUCT(&mem->registrations); DBG_OUT("_common_ucx_mem_free: mem = %p\n", (void *)mem); free(mem); } static int -_common_ucx_wpmem_append(opal_common_ucx_wpmem_t *mem, - _tlocal_mem_t *mem_rec) +_common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem) { - _mem_record_list_item_t *item = OBJ_NEW(_mem_record_list_item_t); - if (NULL == item) { - return OPAL_ERR_OUT_OF_RESOURCE; - } - item->ptr = mem_rec; - opal_mutex_lock(&mem->mutex); - opal_list_append(&mem->registrations, &item->super); - opal_mutex_unlock(&mem->mutex); - DBG_OUT("_common_ucx_mem_append: mem = %p, mem_rec = %p\n", (void *)mem, (void *)mem_rec); + /* Increment the reference counter */ + OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1); + + DBG_OUT("_common_ucx_wpmem_signup: mem = %p\n", (void *)mem); return OPAL_SUCCESS; } static void -_common_ucx_mem_remove(opal_common_ucx_wpmem_t *mem, _tlocal_mem_t *mem_rec) +_common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem) { - int can_free = 0; - _mem_record_list_item_t *item = NULL, *next; + int my_refcntr = -1; - opal_mutex_lock(&mem->mutex); - OPAL_LIST_FOREACH_SAFE(item, next, &mem->registrations, - _mem_record_list_item_t) { - if (mem_rec == item->ptr) { - opal_list_remove_item(&mem->registrations, &item->super); - OBJ_RELEASE(item); - break; - } - } - if (0 == opal_list_get_size(&mem->registrations)) { - can_free = 1; - } - opal_mutex_unlock(&mem->mutex); + /* Make sure that all the loads are complete at this + * point so if somebody else will see refcntr ==0 + * and release the structure we would have all we need + */ + opal_atomic_rmb(); - if (can_free) { - /* All references to this data structure are removed - * we can safely release communication context structure */ + /* Decrement the reference counter */ + my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1); + + /* a counterpart to the rmb above */ + opal_atomic_wmb(); + + if (0 == my_refcntr) { _common_ucx_wpmem_free(mem); } - DBG_OUT("_common_ucx_mem_remove(end): mem = %p mem_rec = %p\n", - (void *)mem, (void *)mem_rec); + + DBG_OUT("_common_ucx_mem_signoff: mem = %p\n", (void *)mem); return; } -/* TLS management functions */ +/* ----------------------------------------------------------------------------- + * Worker Pool TLS management functions management functionality + *----------------------------------------------------------------------------*/ // TODO: don't want to inline this function static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool) @@ -1024,14 +1009,14 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec) { size_t i; DBG_OUT("_tlocal_mem_record_cleanup: record=%p, is_freed = %d\n", - (void *)mem_rec, mem_rec->released); - if (mem_rec->released) { + (void *)mem_rec, mem_rec->gmem->released); + if (mem_rec->gmem->released) { return; } /* Remove myself from the memory context structure * This may result in context release as we are using * delayed cleanup */ - _common_ucx_mem_remove(mem_rec->gmem, mem_rec); + _common_ucx_mem_signout(mem_rec->gmem); DBG_OUT("_tlocal_mem_record_cleanup(_common_ucx_mem_remove): gmem = %p mem_rec = %p\n", (void *)mem_rec->gmem, (void *)mem_rec); @@ -1059,26 +1044,27 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec) static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *mem) { - size_t i; + size_t i, free_idx = -1; _tlocal_ctx_t *ctx_rec = NULL; int rc = OPAL_SUCCESS; /* Try to find available spot in the table */ for (i=0; imem_tbl_size; i++) { - if (0 == tls->mem_tbl[i]->mem_id) { - /* Found a clear record */ - } - if (tls->mem_tbl[i]->released) { + if (tls->mem_tbl[i]->gmem->released) { /* Found a dirty record. Need to clean it first */ _tlocal_mem_record_cleanup(tls->mem_tbl[i]); DBG_OUT("_tlocal_add_mem(after _tlocal_mem_record_cleanup): tls = %p mem_tbl_entry = %p\n", (void *)tls, (void *)tls->mem_tbl[i]); break; } + if ((0 == tls->mem_tbl[i]->mem_id) && (0 > free_idx)) { + /* Found a clear record */ + free_idx = i; + } } - if( i >= tls->mem_tbl_size ){ - i = tls->mem_tbl_size; + if (0 > free_idx){ + free_idx = tls->mem_tbl_size; rc = _tlocal_tls_memtbl_extend(tls, 4); if (rc != OPAL_SUCCESS) { //TODO: error out @@ -1087,10 +1073,10 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, DBG_OUT("_tlocal_add_mem(after _tlocal_tls_memtbl_extend): tls = %p\n", (void *)tls); } - tls->mem_tbl[i]->mem_id = mem->mem_id; - tls->mem_tbl[i]->gmem = mem; - tls->mem_tbl[i]->released = 0; - tls->mem_tbl[i]->mem = calloc(1, sizeof(*tls->mem_tbl[i]->mem)); + tls->mem_tbl[free_idx]->mem_id = mem->mem_id; + tls->mem_tbl[free_idx]->gmem = mem; + tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem)); + ctx_rec = _tlocal_ctx_search(tls, mem->ctx->ctx_id); if (NULL == ctx_rec) { // TODO: act accordingly - cleanup @@ -1099,15 +1085,15 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, DBG_OUT("_tlocal_add_mem(after _tlocal_ctx_search): tls = %p, ctx_id = %d\n", (void *)tls, (int)mem->ctx->ctx_id); - tls->mem_tbl[i]->mem->worker = ctx_rec->winfo; - tls->mem_tbl[i]->mem->rkeys = calloc(mem->ctx->comm_size, - sizeof(*tls->mem_tbl[i]->mem->rkeys)); + tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo; + tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size, + sizeof(*tls->mem_tbl[free_idx]->mem->rkeys)); - tls->mem_tbl[i]->mem_tls_ptr = - calloc(1, sizeof(*tls->mem_tbl[i]->mem_tls_ptr)); - tls->mem_tbl[i]->mem_tls_ptr->winfo = ctx_rec->winfo; - tls->mem_tbl[i]->mem_tls_ptr->rkeys = tls->mem_tbl[i]->mem->rkeys; - pthread_setspecific(mem->mem_tls_key, tls->mem_tbl[i]->mem_tls_ptr); + tls->mem_tbl[free_idx]->mem_tls_ptr = + calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr)); + tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo; + tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys; + pthread_setspecific(mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr); /* Make sure that we completed all the data structures before * placing the item to the list @@ -1116,15 +1102,15 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, */ opal_atomic_wmb(); - rc = _common_ucx_wpmem_append(mem, tls->mem_tbl[i]); + rc = _common_ucx_wpmem_signup(mem); if (rc) { // TODO: error handling return NULL; } DBG_OUT("_tlocal_add_mem(after _common_ucx_mem_append): mem = %p, mem_tbl_entry = %p\n", - (void *)mem, (void *)tls->mem_tbl[i]); + (void *)mem, (void *)tls->mem_tbl[free_idx]); - return tls->mem_tbl[i]; + return tls->mem_tbl[free_idx]; } static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target) diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index d526f05d07..952ec83db7 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -62,20 +62,18 @@ typedef struct { typedef struct { int mem_id; - opal_mutex_t mutex; /* reference context to which memory region belongs */ opal_common_ucx_ctx_t *ctx; + /* object lifetime control */ + volatile int released; + opal_atomic_int32_t refcntr; + /* UCX memory handler */ ucp_mem_h memh; char *mem_addrs; int *mem_displs; - /* list of TLS components that become - * assosiated with this mem region */ - opal_list_t registrations; - volatile int released; - /* TLS item that allows each thread to * store endpoints and rkey arrays * for faster access */ diff --git a/opal/mca/common/ucx/common_ucx_wpool_int.h b/opal/mca/common/ucx/common_ucx_wpool_int.h index ef7bf9a943..65b2d9c154 100644 --- a/opal/mca/common/ucx/common_ucx_wpool_int.h +++ b/opal/mca/common/ucx/common_ucx_wpool_int.h @@ -18,7 +18,6 @@ typedef struct { typedef struct { int mem_id; - volatile int released; opal_common_ucx_wpmem_t *gmem; _mem_info_t *mem; opal_common_ucx_tlocal_fast_ptrs_t *mem_tls_ptr; @@ -106,10 +105,8 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool, void **base, size_t size, ucp_mem_h *memh_ptr, opal_common_ucx_mem_type_t mem_type); static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem); -static int _common_ucx_wpmem_append(opal_common_ucx_wpmem_t *mem, - _tlocal_mem_t *mem_rec); -static void _common_ucx_mem_remove(opal_common_ucx_wpmem_t *mem, - _tlocal_mem_t *mem_rec); +static int _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem); +static void _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem); #endif // COMMON_UCX_WPOOL_INT_H