opal/common/ucx: Simplify Worker Pool memory handler
Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
родитель
6b7acdf21f
Коммит
f38c9f3e5f
@ -509,11 +509,14 @@ _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_mb();
|
||||
opal_atomic_rmb();
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
|
||||
|
||||
/* a counterpart to the rmb above */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
/* All references to this data structure were removed
|
||||
* We can safely release communication context structure */
|
||||
@ -546,9 +549,8 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
|
||||
DBG_OUT("mem_create: mem_id = %d\n", (int)mem->mem_id);
|
||||
|
||||
OBJ_CONSTRUCT(&mem->mutex, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mem->registrations, opal_list_t);
|
||||
mem->released = 0;
|
||||
mem->refcntr = 1; /* application holding this memory handler */
|
||||
mem->ctx = ctx;
|
||||
mem->mem_addrs = NULL;
|
||||
mem->mem_displs = NULL;
|
||||
@ -596,8 +598,6 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
error_rkey_pack:
|
||||
ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh);
|
||||
error_mem_map:
|
||||
OBJ_DESTRUCT(&mem->mutex);
|
||||
OBJ_DESTRUCT(&mem->registrations);
|
||||
free(mem);
|
||||
(*mem_ptr) = NULL;
|
||||
return ret;
|
||||
@ -606,22 +606,18 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
OPAL_DECLSPEC int
|
||||
opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
_mem_record_list_item_t *item = NULL;
|
||||
int can_free = 0;
|
||||
int my_refcntr = -1;
|
||||
|
||||
opal_mutex_lock(&mem->mutex);
|
||||
/* Mark that this function has been called */
|
||||
/* Mark that this memory handler has been called */
|
||||
mem->released = 1;
|
||||
/* Go over all TLS subscribed to this context and mark
|
||||
* that this handler is no longer in use */
|
||||
OPAL_LIST_FOREACH(item, &mem->registrations, _mem_record_list_item_t) {
|
||||
item->ptr->released = 1;
|
||||
}
|
||||
if(0 == opal_list_get_size(&mem->registrations)){
|
||||
can_free = 1;
|
||||
}
|
||||
opal_mutex_unlock(&mem->mutex);
|
||||
if (can_free) {
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
|
||||
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpmem_free(mem);
|
||||
}
|
||||
return OPAL_SUCCESS;
|
||||
@ -687,59 +683,48 @@ static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
|
||||
free(mem->mem_addrs);
|
||||
free(mem->mem_displs);
|
||||
ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
|
||||
OBJ_DESTRUCT(&mem->mutex);
|
||||
OBJ_DESTRUCT(&mem->registrations);
|
||||
DBG_OUT("_common_ucx_mem_free: mem = %p\n", (void *)mem);
|
||||
free(mem);
|
||||
}
|
||||
|
||||
static int
|
||||
_common_ucx_wpmem_append(opal_common_ucx_wpmem_t *mem,
|
||||
_tlocal_mem_t *mem_rec)
|
||||
_common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
_mem_record_list_item_t *item = OBJ_NEW(_mem_record_list_item_t);
|
||||
if (NULL == item) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
item->ptr = mem_rec;
|
||||
opal_mutex_lock(&mem->mutex);
|
||||
opal_list_append(&mem->registrations, &item->super);
|
||||
opal_mutex_unlock(&mem->mutex);
|
||||
DBG_OUT("_common_ucx_mem_append: mem = %p, mem_rec = %p\n", (void *)mem, (void *)mem_rec);
|
||||
/* Increment the reference counter */
|
||||
OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
|
||||
|
||||
DBG_OUT("_common_ucx_wpmem_signup: mem = %p\n", (void *)mem);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
static void
|
||||
_common_ucx_mem_remove(opal_common_ucx_wpmem_t *mem, _tlocal_mem_t *mem_rec)
|
||||
_common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
int can_free = 0;
|
||||
_mem_record_list_item_t *item = NULL, *next;
|
||||
int my_refcntr = -1;
|
||||
|
||||
opal_mutex_lock(&mem->mutex);
|
||||
OPAL_LIST_FOREACH_SAFE(item, next, &mem->registrations,
|
||||
_mem_record_list_item_t) {
|
||||
if (mem_rec == item->ptr) {
|
||||
opal_list_remove_item(&mem->registrations, &item->super);
|
||||
OBJ_RELEASE(item);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (0 == opal_list_get_size(&mem->registrations)) {
|
||||
can_free = 1;
|
||||
}
|
||||
opal_mutex_unlock(&mem->mutex);
|
||||
/* Make sure that all the loads are complete at this
|
||||
* point so if somebody else will see refcntr ==0
|
||||
* and release the structure we would have all we need
|
||||
*/
|
||||
opal_atomic_rmb();
|
||||
|
||||
if (can_free) {
|
||||
/* All references to this data structure are removed
|
||||
* we can safely release communication context structure */
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
|
||||
|
||||
/* a counterpart to the rmb above */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpmem_free(mem);
|
||||
}
|
||||
DBG_OUT("_common_ucx_mem_remove(end): mem = %p mem_rec = %p\n",
|
||||
(void *)mem, (void *)mem_rec);
|
||||
|
||||
DBG_OUT("_common_ucx_mem_signoff: mem = %p\n", (void *)mem);
|
||||
return;
|
||||
}
|
||||
|
||||
/* TLS management functions */
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Worker Pool TLS management functions management functionality
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
// TODO: don't want to inline this function
|
||||
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
|
||||
@ -1024,14 +1009,14 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
|
||||
{
|
||||
size_t i;
|
||||
DBG_OUT("_tlocal_mem_record_cleanup: record=%p, is_freed = %d\n",
|
||||
(void *)mem_rec, mem_rec->released);
|
||||
if (mem_rec->released) {
|
||||
(void *)mem_rec, mem_rec->gmem->released);
|
||||
if (mem_rec->gmem->released) {
|
||||
return;
|
||||
}
|
||||
/* Remove myself from the memory context structure
|
||||
* This may result in context release as we are using
|
||||
* delayed cleanup */
|
||||
_common_ucx_mem_remove(mem_rec->gmem, mem_rec);
|
||||
_common_ucx_mem_signout(mem_rec->gmem);
|
||||
DBG_OUT("_tlocal_mem_record_cleanup(_common_ucx_mem_remove): gmem = %p mem_rec = %p\n",
|
||||
(void *)mem_rec->gmem, (void *)mem_rec);
|
||||
|
||||
@ -1059,26 +1044,27 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
|
||||
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
size_t i;
|
||||
size_t i, free_idx = -1;
|
||||
_tlocal_ctx_t *ctx_rec = NULL;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
/* Try to find available spot in the table */
|
||||
for (i=0; i<tls->mem_tbl_size; i++) {
|
||||
if (0 == tls->mem_tbl[i]->mem_id) {
|
||||
/* Found a clear record */
|
||||
}
|
||||
if (tls->mem_tbl[i]->released) {
|
||||
if (tls->mem_tbl[i]->gmem->released) {
|
||||
/* Found a dirty record. Need to clean it first */
|
||||
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
|
||||
DBG_OUT("_tlocal_add_mem(after _tlocal_mem_record_cleanup): tls = %p mem_tbl_entry = %p\n",
|
||||
(void *)tls, (void *)tls->mem_tbl[i]);
|
||||
break;
|
||||
}
|
||||
if ((0 == tls->mem_tbl[i]->mem_id) && (0 > free_idx)) {
|
||||
/* Found a clear record */
|
||||
free_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
if( i >= tls->mem_tbl_size ){
|
||||
i = tls->mem_tbl_size;
|
||||
if (0 > free_idx){
|
||||
free_idx = tls->mem_tbl_size;
|
||||
rc = _tlocal_tls_memtbl_extend(tls, 4);
|
||||
if (rc != OPAL_SUCCESS) {
|
||||
//TODO: error out
|
||||
@ -1087,10 +1073,10 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
DBG_OUT("_tlocal_add_mem(after _tlocal_tls_memtbl_extend): tls = %p\n",
|
||||
(void *)tls);
|
||||
}
|
||||
tls->mem_tbl[i]->mem_id = mem->mem_id;
|
||||
tls->mem_tbl[i]->gmem = mem;
|
||||
tls->mem_tbl[i]->released = 0;
|
||||
tls->mem_tbl[i]->mem = calloc(1, sizeof(*tls->mem_tbl[i]->mem));
|
||||
tls->mem_tbl[free_idx]->mem_id = mem->mem_id;
|
||||
tls->mem_tbl[free_idx]->gmem = mem;
|
||||
tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem));
|
||||
|
||||
ctx_rec = _tlocal_ctx_search(tls, mem->ctx->ctx_id);
|
||||
if (NULL == ctx_rec) {
|
||||
// TODO: act accordingly - cleanup
|
||||
@ -1099,15 +1085,15 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
DBG_OUT("_tlocal_add_mem(after _tlocal_ctx_search): tls = %p, ctx_id = %d\n",
|
||||
(void *)tls, (int)mem->ctx->ctx_id);
|
||||
|
||||
tls->mem_tbl[i]->mem->worker = ctx_rec->winfo;
|
||||
tls->mem_tbl[i]->mem->rkeys = calloc(mem->ctx->comm_size,
|
||||
sizeof(*tls->mem_tbl[i]->mem->rkeys));
|
||||
tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo;
|
||||
tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size,
|
||||
sizeof(*tls->mem_tbl[free_idx]->mem->rkeys));
|
||||
|
||||
tls->mem_tbl[i]->mem_tls_ptr =
|
||||
calloc(1, sizeof(*tls->mem_tbl[i]->mem_tls_ptr));
|
||||
tls->mem_tbl[i]->mem_tls_ptr->winfo = ctx_rec->winfo;
|
||||
tls->mem_tbl[i]->mem_tls_ptr->rkeys = tls->mem_tbl[i]->mem->rkeys;
|
||||
pthread_setspecific(mem->mem_tls_key, tls->mem_tbl[i]->mem_tls_ptr);
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr =
|
||||
calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr));
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo;
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys;
|
||||
pthread_setspecific(mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr);
|
||||
|
||||
/* Make sure that we completed all the data structures before
|
||||
* placing the item to the list
|
||||
@ -1116,15 +1102,15 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
*/
|
||||
opal_atomic_wmb();
|
||||
|
||||
rc = _common_ucx_wpmem_append(mem, tls->mem_tbl[i]);
|
||||
rc = _common_ucx_wpmem_signup(mem);
|
||||
if (rc) {
|
||||
// TODO: error handling
|
||||
return NULL;
|
||||
}
|
||||
DBG_OUT("_tlocal_add_mem(after _common_ucx_mem_append): mem = %p, mem_tbl_entry = %p\n",
|
||||
(void *)mem, (void *)tls->mem_tbl[i]);
|
||||
(void *)mem, (void *)tls->mem_tbl[free_idx]);
|
||||
|
||||
return tls->mem_tbl[i];
|
||||
return tls->mem_tbl[free_idx];
|
||||
}
|
||||
|
||||
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
|
||||
|
@ -62,20 +62,18 @@ typedef struct {
|
||||
|
||||
typedef struct {
|
||||
int mem_id;
|
||||
opal_mutex_t mutex;
|
||||
/* reference context to which memory region belongs */
|
||||
opal_common_ucx_ctx_t *ctx;
|
||||
|
||||
/* object lifetime control */
|
||||
volatile int released;
|
||||
opal_atomic_int32_t refcntr;
|
||||
|
||||
/* UCX memory handler */
|
||||
ucp_mem_h memh;
|
||||
char *mem_addrs;
|
||||
int *mem_displs;
|
||||
|
||||
/* list of TLS components that become
|
||||
* assosiated with this mem region */
|
||||
opal_list_t registrations;
|
||||
volatile int released;
|
||||
|
||||
/* TLS item that allows each thread to
|
||||
* store endpoints and rkey arrays
|
||||
* for faster access */
|
||||
|
@ -18,7 +18,6 @@ typedef struct {
|
||||
|
||||
typedef struct {
|
||||
int mem_id;
|
||||
volatile int released;
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
_mem_info_t *mem;
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *mem_tls_ptr;
|
||||
@ -106,10 +105,8 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
opal_common_ucx_mem_type_t mem_type);
|
||||
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
static int _common_ucx_wpmem_append(opal_common_ucx_wpmem_t *mem,
|
||||
_tlocal_mem_t *mem_rec);
|
||||
static void _common_ucx_mem_remove(opal_common_ucx_wpmem_t *mem,
|
||||
_tlocal_mem_t *mem_rec);
|
||||
static int _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem);
|
||||
static void _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
|
||||
#endif // COMMON_UCX_WPOOL_INT_H
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user