1
1
Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
Artem Polyakov 2018-12-06 20:13:12 -08:00
родитель f38c9f3e5f
Коммит fd98ee14eb
3 изменённых файлов: 151 добавлений и 113 удалений

Просмотреть файл

@ -63,7 +63,8 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
winfo->comm_size = 0;
winfo->released = 0;
DBG_OUT("_winfo_create: worker = %p\n", (void *)worker);
DBG_OUT(_dbg_winfo, "winfo = %p, worker = %p\n",
(void*)winfo, (void *)winfo->worker);
return winfo;
release_worker:
@ -87,12 +88,14 @@ _winfo_reset(opal_common_ucx_winfo_t *winfo)
winfo->endpoints = NULL;
winfo->comm_size = 0;
winfo->released = 0;
DBG_OUT(_dbg_winfo, "winfo = %p, worker = %p\n",
(void*)winfo, (void*)winfo->worker);
}
static void
_winfo_release(opal_common_ucx_winfo_t *winfo)
{
DBG_OUT("_release_ctx_worker: winfo = %p, worker = %p\n",
DBG_OUT(_dbg_winfo, "winfo = %p, worker = %p\n",
(void*)winfo, (void *)winfo->worker);
OBJ_DESTRUCT(&winfo->mutex);
ucp_worker_destroy(winfo->worker);
@ -109,7 +112,7 @@ opal_common_ucx_wpool_allocate(void)
opal_common_ucx_wpool_t *ptr = calloc(1, sizeof(opal_common_ucx_wpool_t));
ptr->refcnt = 0;
DBG_OUT("opal_common_ucx_wpool_allocate: wpool = %p\n", (void *)ptr);
DBG_OUT(_dbg_wpool, "wpool = %p\n", (void *)ptr);
return ptr;
}
@ -117,9 +120,7 @@ OPAL_DECLSPEC void
opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
{
assert(wpool->refcnt == 0);
DBG_OUT("opal_common_ucx_wpool_free: wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p\n", (void *)wpool);
free(wpool);
}
@ -136,6 +137,8 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int rc = OPAL_SUCCESS;
wpool->refcnt++;
DBG_OUT(_dbg_wpool, "wpool = %p, refctnrt = %d\n",
(void *)wpool, wpool->refcnt);
if (1 < wpool->refcnt) {
return rc;
@ -200,7 +203,7 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
pthread_key_create(&wpool->tls_key, _tlocal_cleanup);
DBG_OUT("opal_common_ucx_wpool_init: wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p. Done\n", (void *)wpool);
return rc;
err_wpool_add:
@ -220,12 +223,11 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
{
_tlocal_table_t *tls_item = NULL, *tls_next;
DBG_OUT("opal_common_ucx_wpool_finalize(start): wpool = %p\n",
(void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p. Start\n", (void *)wpool);
wpool->refcnt--;
if (wpool->refcnt > 0) {
DBG_OUT("opal_common_ucx_wpool_finalize: wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p. Still in use\n", (void *)wpool);
return;
}
@ -238,8 +240,8 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
_tlocal_table_t) {
opal_list_remove_item(&wpool->tls_list, &tls_item->super);
_common_ucx_tls_cleanup(tls_item);
DBG_OUT("opal_common_ucx_wpool_finalize: cleanup wpool = %p\n",
(void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p. Cleanup TLS = %p\n",
(void *)wpool, (void*)tls_item);
}
OBJ_DESTRUCT(&wpool->tls_list);
@ -253,7 +255,8 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers,
_winfo_list_item_t) {
opal_list_remove_item(&wpool->idle_workers, &item->super);
// _winfo_reset(item->ptr); - should be already reset
DBG_OUT(_dbg_wpool, "wpool = %p. Cleanup idle winfo = %p\n",
(void *)wpool, (void*)item->ptr);
_winfo_release(item->ptr);
OBJ_RELEASE(item);
}
@ -268,14 +271,16 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
opal_list_remove_item(&wpool->active_workers, &item->super);
_winfo_reset(item->ptr);
_winfo_release(item->ptr);
DBG_OUT(_dbg_wpool, "wpool = %p. Cleanup active winfo = %p\n",
(void *)wpool, (void*)item->ptr);
OBJ_RELEASE(item);
}
}
OBJ_DESTRUCT(&wpool->active_workers);
//OBJ_DESTRUCT(&wpool->mutex);
OBJ_DESTRUCT(&wpool->mutex);
ucp_cleanup(wpool->ucp_ctx);
DBG_OUT("opal_common_ucx_wpool_finalize: wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p. Done\n", (void *)wpool);
return;
}
@ -287,7 +292,6 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
/* Go over all active workers and progress them
* TODO: may want to have some partitioning to progress only part of
* workers */
opal_mutex_lock(&wpool->mutex);
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
_winfo_list_item_t) {
@ -314,6 +318,9 @@ _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
{
_winfo_list_item_t *item;
DBG_OUT(_dbg_wpool, "wpool = %p, winfo = %p\n",
(void *)wpool, (void *)winfo);
item = OBJ_NEW(_winfo_list_item_t);
if (NULL == item) {
MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item");
@ -325,8 +332,6 @@ _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
opal_list_append(list, &item->super);
opal_mutex_unlock(&wpool->mutex);
DBG_OUT("_wpool_list_put: wpool = %p winfo = %p\n",
(void *)wpool, (void *)winfo);
return OPAL_SUCCESS;
}
@ -347,8 +352,8 @@ _wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
winfo = item->ptr;
OBJ_RELEASE(item);
}
DBG_OUT("_wpool_remove_from_idle: wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_wpool, "wpool = %p, winfo = %p\n",
(void *)wpool, (void *)winfo);
return winfo;
}
@ -365,8 +370,9 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
}
}
DBG_OUT("_wpool_get_idle: wpool = %p winfo = %p\n",
DBG_OUT(_dbg_wpool, "wpool = %p, winfo = %p\n",
(void *)wpool, (void *)winfo);
winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
winfo->comm_size = comm_size;
return winfo;
@ -375,6 +381,8 @@ _wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
static int
_wpool_add_active(opal_common_ucx_wpool_t *wpool, opal_common_ucx_winfo_t *winfo)
{
DBG_OUT(_dbg_wpool, "wpool = %p, winfo = %p\n",
(void *)wpool, (void *)winfo);
return _wpool_list_put(wpool, &wpool->active_workers, winfo);
}
@ -392,7 +400,7 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
int ret = OPAL_SUCCESS;
ctx->ctx_id = OPAL_ATOMIC_ADD_FETCH32(&wpool->cur_ctxid, 1);
DBG_OUT("ctx_create: ctx_id = %d\n", (int)ctx->ctx_id);
DBG_OUT(_dbg_ctx, "ctx_create: ctx_id = %d\n", (int)ctx->ctx_id);
OBJ_CONSTRUCT(&ctx->mutex, opal_mutex_t);
OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
@ -411,11 +419,11 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
}
(*ctx_ptr) = ctx;
DBG_OUT("opal_common_ucx_ctx_create: wpool = %p, (*ctx_ptr) = %p\n",
DBG_OUT(_dbg_ctx,"wpool = %p, ctx = %p\n",
(void *)wpool, (void *)(*ctx_ptr));
return ret;
error:
//OBJ_DESTRUCT(&ctx->mutex);
OBJ_DESTRUCT(&ctx->mutex);
OBJ_DESTRUCT(&ctx->tls_workers);
free(ctx);
(*ctx_ptr) = NULL;
@ -427,7 +435,7 @@ opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
{
int my_refcntr = -1;
DBG_OUT("opal_common_ucx_ctx_release: ctx = %p\n", (void *)ctx);
DBG_OUT(_dbg_ctx, "ctx = %p\n", (void *)ctx);
/* Application is expected to guarantee that no operation
* is performed on the context that is being released */
@ -458,7 +466,7 @@ _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
free(ctx->recv_worker_displs);
OBJ_DESTRUCT(&ctx->mutex);
OBJ_DESTRUCT(&ctx->tls_workers);
DBG_OUT("_common_ucx_ctx_free: ctx = %p\n", (void *)ctx);
DBG_OUT(_dbg_ctx, "ctx = %p\n", (void *)ctx);
free(ctx);
}
@ -480,7 +488,7 @@ _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
opal_list_append(&ctx->tls_workers, &item->super);
opal_mutex_unlock(&ctx->mutex);
DBG_OUT("_common_ucx_wpctx_append: ctx = %p, winfo = %p\n",
DBG_OUT(_dbg_ctx, "ctx = %p, winfo = %p\n",
(void *)ctx, (void *)winfo);
return OPAL_SUCCESS;
}
@ -522,8 +530,7 @@ _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
* We can safely release communication context structure */
_common_ucx_wpctx_free(ctx);
}
DBG_OUT("_common_ucx_wpctx_remove: ctx = %p, ctx_rec = %p\n",
(void *)ctx, (void *)winfo);
DBG_OUT(_dbg_ctx, "ctx = %p\n", (void *)ctx);
return;
}
@ -547,7 +554,7 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
mem->mem_id = OPAL_ATOMIC_ADD_FETCH32(&ctx->wpool->cur_memid, 1);
DBG_OUT("mem_create: mem_id = %d\n", (int)mem->mem_id);
DBG_OUT(_dbg_mem, "ctx = %p, mem_id = %d\n", (void *)ctx, (int)mem->mem_id);
mem->released = 0;
mem->refcntr = 1; /* application holding this memory handler */
@ -561,7 +568,7 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
MCA_COMMON_UCX_VERBOSE(1, "_comm_ucx_mem_map failed: %d", ret);
goto error_mem_map;
}
DBG_OUT("opal_common_ucx_mem_create: base = %p, memh = %p\n",
DBG_OUT(_dbg_mem, "\tbase = %p, memh = %p\n",
(void *)(*mem_base), (void *)(mem->memh));
status = ucp_rkey_pack(ctx->wpool->ucp_ctx, mem->memh,
@ -571,15 +578,12 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
ret = OPAL_ERROR;
goto error_rkey_pack;
}
DBG_OUT("opal_common_ucx_mem_create: rkey_addr = %p, rkey_addr_len = %d\n",
DBG_OUT(_dbg_mem, "\trkey_addr = %p, rkey_addr_len = %d\n",
(void *)rkey_addr, (int)rkey_addr_len);
ret = exchange_func(rkey_addr, rkey_addr_len,
&mem->mem_addrs, &mem->mem_displs, exchange_metadata);
DBG_OUT("opal_common_ucx_mem_create: rkey_addr = %p, rkey_addr_len = %d "
"mem_addrs = %p mem_displs = %p\n",
(void*)rkey_addr, (int)rkey_addr_len, (void *)mem->mem_addrs,
(void*)mem->mem_displs);
DBG_OUT(_dbg_mem, "\tcomplete exchange");
ucp_rkey_buffer_release(rkey_addr);
if (ret != OPAL_SUCCESS) {
@ -592,7 +596,7 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
(*mem_ptr) = mem;
DBG_OUT("opal_common_ucx_mem_create(end): mem = %p\n", (void *)mem);
DBG_OUT(_dbg_mem, "mem = %p. Done\n", (void *)mem);
return ret;
error_rkey_pack:
@ -608,6 +612,8 @@ opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
{
int my_refcntr = -1;
DBG_OUT(_dbg_mem, "mem = %p\n", (void *)mem);
/* Mark that this memory handler has been called */
mem->released = 1;
@ -633,6 +639,8 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
ucs_status_t status;
int ret = OPAL_SUCCESS;
DBG_OUT(_dbg_mem, "wpool = %p\n", (void *)wpool);
memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
UCP_MEM_MAP_PARAM_FIELD_LENGTH |
@ -651,7 +659,7 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
ret = OPAL_ERROR;
return ret;
}
DBG_OUT("_comm_ucx_mem_map(after ucp_mem_map): memh = %p\n", (void *)(*memh_ptr));
DBG_OUT(_dbg_mem, "\tmemh = %p\n", (void *)(*memh_ptr));
mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
status = ucp_mem_query((*memh_ptr), &mem_attrs);
@ -660,7 +668,7 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
ret = OPAL_ERROR;
goto error;
}
DBG_OUT("_comm_ucx_mem_map(after ucp_mem_query): memh = %p\n", (void *)(*memh_ptr));
DBG_OUT(_dbg_mem, "\tmemh = %p\n", (void *)(*memh_ptr));
assert(mem_attrs.length >= size);
if (mem_type != OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
@ -669,7 +677,7 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
(*base) = mem_attrs.address;
}
DBG_OUT("_comm_ucx_mem_map(end): wpool = %p, addr = %p size = %d memh = %p\n",
DBG_OUT(_dbg_mem, "\twpool = %p, addr = %p size = %d memh = %p\n",
(void *)wpool, (void *)(*base), (int)size, (void *)(*memh_ptr));
return ret;
error:
@ -683,7 +691,7 @@ static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
free(mem->mem_addrs);
free(mem->mem_displs);
ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
DBG_OUT("_common_ucx_mem_free: mem = %p\n", (void *)mem);
DBG_OUT(_dbg_mem, "mem = %p\n", (void *)mem);
free(mem);
}
@ -693,7 +701,7 @@ _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
/* Increment the reference counter */
OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
DBG_OUT("_common_ucx_wpmem_signup: mem = %p\n", (void *)mem);
DBG_OUT(_dbg_mem, "mem = %p\n", (void *)mem);
return OPAL_SUCCESS;
}
@ -718,7 +726,7 @@ _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
_common_ucx_wpmem_free(mem);
}
DBG_OUT("_common_ucx_mem_signoff: mem = %p\n", (void *)mem);
DBG_OUT(_dbg_mem, "mem = %p\n", (void *)mem);
return;
}
@ -726,7 +734,6 @@ _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
* Worker Pool TLS management functions management functionality
*----------------------------------------------------------------------------*/
// TODO: don't want to inline this function
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
{
_tlocal_table_t *tls = OBJ_NEW(_tlocal_table_t);
@ -749,16 +756,19 @@ static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
opal_mutex_unlock(&wpool->mutex);
if(_tlocal_tls_ctxtbl_extend(tls, 4)){
DBG_OUT("_tlocal_tls_ctxtbl_extend failed\n");
// TODO: handle error
MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool context table");
return NULL;
}
if(_tlocal_tls_memtbl_extend(tls, 4)) {
DBG_OUT("_tlocal_tls_memtbl_extend failed\n");
// TODO: handle error
MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool memory table");
return NULL;
}
pthread_setspecific(wpool->tls_key, tls);
DBG_OUT("_common_ucx_tls_init(end): wpool = %p\n", (void *)wpool);
DBG_OUT(_dbg_tls, "tls = %p, wpool = %p\n",
(void *)tls, (void*)wpool);
return tls;
}
@ -768,8 +778,8 @@ _tlocal_get_tls(opal_common_ucx_wpool_t *wpool){
if( OPAL_UNLIKELY(NULL == tls) ) {
tls = _common_ucx_tls_init(wpool);
}
DBG_OUT("_tlocal_get_tls(end): wpool = %p tls = %p\n",
(void *)wpool, (void *)tls);
DBG_OUT(_dbg_tls, "tls = %p, wpool = %p\n",
(void *)tls, (void*)wpool);
return tls;
}
@ -779,13 +789,14 @@ static void _tlocal_cleanup(void *arg)
_tlocal_table_t *tls = (_tlocal_table_t *)arg;
opal_common_ucx_wpool_t *wpool = NULL;
DBG_OUT("_cleanup_tlocal: start\n");
if (NULL == tls) {
return;
}
wpool = tls->wpool;
DBG_OUT(_dbg_tls, "tls = %p, wpool = %p\n",
(void *)tls, (void*)wpool);
/* 1. Remove us from tls_list */
tls->wpool = wpool;
opal_mutex_lock(&wpool->mutex);
@ -825,10 +836,10 @@ static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)
}
pthread_setspecific(tls->wpool->tls_key, NULL);
DBG_OUT("_common_ucx_tls_cleanup(end): tls = %p\n", (void *)tls);
DBG_OUT(_dbg_tls, "tls = %p, wpool = %p\n",
(void *)tls, (void*)tls->wpool);
OBJ_RELEASE(tls);
return;
}
@ -846,7 +857,7 @@ _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append)
}
tbl->ctx_tbl_size = newsize;
DBG_OUT("_tlocal_tls_ctxtbl_extend(end): tbl = %p\n", (void *)tbl);
DBG_OUT(_dbg_tls, "new size = %d\n", (int)newsize);
return OPAL_SUCCESS;
}
@ -864,7 +875,7 @@ _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append)
}
}
tbl->mem_tbl_size = newsize;
DBG_OUT("_tlocal_tls_memtbl_extend(end): tbl = %p\n", (void *)tbl);
DBG_OUT(_dbg_tls, "new size = %d\n", (int)newsize);
return OPAL_SUCCESS;
}
@ -878,7 +889,8 @@ _tlocal_ctx_search(_tlocal_table_t *tls, int ctx_id)
return tls->ctx_tbl[i];
}
}
DBG_OUT("_tlocal_ctx_search: tls = %p ctx_id = %d\n", (void *)tls, ctx_id);
DBG_OUT(_dbg_tls, "tls = %p, ctx_id = %d\n",
(void *)tls, (int)ctx_id);
return NULL;
}
@ -893,9 +905,13 @@ _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
* delayed cleanup */
_common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
DBG_OUT(_dbg_tls, "wpool = %p, winfo = %p, worker = %p\n",
(void*)ctx_rec->gctx->wpool, (void*)ctx_rec->winfo,
(void*)ctx_rec->winfo->worker);
/* Erase the record so it can be reused */
memset(ctx_rec, 0, sizeof(*ctx_rec));
DBG_OUT("_tlocal_cleanup_ctx_record(end): ctx_rec = %p\n", (void *)ctx_rec);
return OPAL_SUCCESS;
}
@ -954,7 +970,7 @@ _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
return NULL;
}
DBG_OUT("_tlocal_add_ctx: tls = %p, ctx_rec = %p, winfo = %p\n",
DBG_OUT(_dbg_tls || _dbg_ctx, "tls = %p, ctx_rec = %p, winfo = %p\n",
(void *)tls, (void *)&tls->ctx_tbl[free_idx],
(void *)tls->ctx_tbl[free_idx]->winfo);
@ -982,7 +998,7 @@ static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status);
return OPAL_ERROR;
}
DBG_OUT("_tlocal_ctx_connect(after ucp_ep_create): worker = %p ep = %p\n",
DBG_OUT(_dbg_tls || _dbg_ctx, "worker = %p ep = %p\n",
(void *)winfo->worker, (void *)winfo->endpoints[target]);
opal_mutex_unlock(&winfo->mutex);
return OPAL_SUCCESS;
@ -994,7 +1010,7 @@ static inline _tlocal_mem_t *
_tlocal_search_mem(_tlocal_table_t *tls, int mem_id)
{
size_t i;
DBG_OUT("_tlocal_search_mem(begin): tls = %p mem_id = %d\n",
DBG_OUT(_dbg_tls || _dbg_mem, "tls = %p mem_id = %d\n",
(void *)tls, (int)mem_id);
for(i=0; i<tls->mem_tbl_size; i++) {
if( tls->mem_tbl[i]->mem_id == mem_id){
@ -1008,7 +1024,7 @@ static void
_tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
{
size_t i;
DBG_OUT("_tlocal_mem_record_cleanup: record=%p, is_freed = %d\n",
DBG_OUT(_dbg_tls || _dbg_mem, "record=%p, is_freed = %d\n",
(void *)mem_rec, mem_rec->gmem->released);
if (mem_rec->gmem->released) {
return;
@ -1017,13 +1033,13 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
* This may result in context release as we are using
* delayed cleanup */
_common_ucx_mem_signout(mem_rec->gmem);
DBG_OUT("_tlocal_mem_record_cleanup(_common_ucx_mem_remove): gmem = %p mem_rec = %p\n",
DBG_OUT(_dbg_tls || _dbg_mem, "gmem = %p mem_rec = %p\n",
(void *)mem_rec->gmem, (void *)mem_rec);
for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
if (mem_rec->mem->rkeys[i]) {
ucp_rkey_destroy(mem_rec->mem->rkeys[i]);
DBG_OUT("_tlocal_mem_record_cleanup(after ucp_rkey_destroy): rkey_entry = %p\n",
DBG_OUT(_dbg_tls || _dbg_mem, "rkey_entry = %p\n",
(void *)mem_rec->mem->rkeys[i]);
}
}
@ -1039,8 +1055,6 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
memset(mem_rec, 0, sizeof(*mem_rec));
}
// TODO: Don't want to inline this (slow path)
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
opal_common_ucx_wpmem_t *mem)
{
@ -1053,8 +1067,6 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
if (tls->mem_tbl[i]->gmem->released) {
/* Found a dirty record. Need to clean it first */
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
DBG_OUT("_tlocal_add_mem(after _tlocal_mem_record_cleanup): tls = %p mem_tbl_entry = %p\n",
(void *)tls, (void *)tls->mem_tbl[i]);
break;
}
if ((0 == tls->mem_tbl[i]->mem_id) && (0 > free_idx)) {
@ -1070,8 +1082,7 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
//TODO: error out
return NULL;
}
DBG_OUT("_tlocal_add_mem(after _tlocal_tls_memtbl_extend): tls = %p\n",
(void *)tls);
DBG_OUT(_dbg_tls || _dbg_mem, "tls = %p\n", (void *)tls);
}
tls->mem_tbl[free_idx]->mem_id = mem->mem_id;
tls->mem_tbl[free_idx]->gmem = mem;
@ -1107,13 +1118,15 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
// TODO: error handling
return NULL;
}
DBG_OUT("_tlocal_add_mem(after _common_ucx_mem_append): mem = %p, mem_tbl_entry = %p\n",
DBG_OUT(_dbg_tls || _dbg_mem,
"mem = %p, mem_tbl_entry = %p\n",
(void *)mem, (void *)tls->mem_tbl[free_idx]);
return tls->mem_tbl[free_idx];
}
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
static int
_tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
{
_mem_info_t *minfo = mem_rec->mem;
opal_common_ucx_wpmem_t *gmem = mem_rec->gmem;
@ -1126,13 +1139,14 @@ static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int targ
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
return OPAL_ERROR;
}
DBG_OUT("_tlocal_mem_create_rkey(after ucp_ep_rkey_unpack): mem_rec = %p ep = %p target = %d\n",
DBG_OUT(_dbg_tls || _dbg_mem, "mem_rec = %p ep = %p target = %d\n",
(void *)mem_rec, (void *)ep, target);
return OPAL_SUCCESS;
}
/* TODO: no inline */
OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
/* Get the TLS in case of slow path (not everything has been yet initialized */
OPAL_DECLSPEC int
opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
{
_tlocal_table_t *tls = NULL;
_tlocal_ctx_t *ctx_rec = NULL;
@ -1142,26 +1156,25 @@ OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *me
ucp_ep_h ep;
int rc = OPAL_SUCCESS;
DBG_OUT("_tlocal_fetch: starttls \n");
tls = _tlocal_get_tls(mem->ctx->wpool);
DBG_OUT("_tlocal_fetch: tls = %p\n",(void*)tls);
DBG_OUT(_dbg_tls || _dbg_mem, "tls = %p\n",(void*)tls);
/* Obtain the worker structure */
ctx_rec = _tlocal_ctx_search(tls, mem->ctx->ctx_id);
DBG_OUT("_tlocal_fetch(after _tlocal_ctx_search): ctx_id = %d, ctx_rec=%p\n",
DBG_OUT(_dbg_tls || _dbg_mem, "ctx_id = %d, ctx_rec=%p\n",
(int)mem->ctx->ctx_id, (void *)ctx_rec);
if (OPAL_UNLIKELY(NULL == ctx_rec)) {
ctx_rec = _tlocal_add_ctx(tls, mem->ctx);
if (NULL == ctx_rec) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
DBG_OUT("_tlocal_fetch(after _tlocal_add_ctx): tls = %p ctx = %p\n", (void *)tls, (void *)mem->ctx);
DBG_OUT("_tlocal_fetch(after _tlocal_add_ctx): tls = %p ctx = %p\n",
(void *)tls, (void *)mem->ctx);
}
winfo = ctx_rec->winfo;
DBG_OUT("_tlocal_fetch: winfo = %p ctx=%p\n", (void *)winfo, (void *)mem->ctx);
DBG_OUT(_dbg_tls || _dbg_mem, "winfo = %p ctx=%p\n",
(void *)winfo, (void *)mem->ctx);
/* Obtain the endpoint */
if (OPAL_UNLIKELY(NULL == winfo->endpoints[target])) {
@ -1169,23 +1182,26 @@ OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *me
if (rc != OPAL_SUCCESS) {
return rc;
}
DBG_OUT("_tlocal_fetch(after _tlocal_ctx_connect): ctx_rec = %p target = %d\n", (void *)ctx_rec, target);
DBG_OUT(_dbg_tls || _dbg_mem, "ctx_rec = %p target = %d\n",
(void *)ctx_rec, target);
}
ep = winfo->endpoints[target];
DBG_OUT("_tlocal_fetch: ep = %p\n", (void *)ep);
DBG_OUT(_dbg_tls || _dbg_mem, "ep = %p\n", (void *)ep);
/* Obtain the memory region info */
mem_rec = _tlocal_search_mem(tls, mem->mem_id);
DBG_OUT("_tlocal_fetch: tls = %p mem_rec = %p mem_id = %d\n", (void *)tls, (void *)mem_rec, (int)mem->mem_id);
DBG_OUT(_dbg_tls || _dbg_mem, "tls = %p mem_rec = %p mem_id = %d\n",
(void *)tls, (void *)mem_rec, (int)mem->mem_id);
if (OPAL_UNLIKELY(mem_rec == NULL)) {
mem_rec = _tlocal_add_mem(tls, mem);
DBG_OUT("_tlocal_fetch(after _tlocal_add_mem): tls = %p mem = %p\n", (void *)tls, (void *)mem);
DBG_OUT(_dbg_tls || _dbg_mem, "tls = %p mem = %p\n",
(void *)tls, (void *)mem);
if (NULL == mem_rec) {
return OPAL_ERR_OUT_OF_RESOURCE;
}
}
mem_info = mem_rec->mem;
DBG_OUT("_tlocal_fetch: mem_info = %p\n", (void *)mem_info);
DBG_OUT(_dbg_tls || _dbg_mem, "mem_info = %p\n", (void *)mem_info);
/* Obtain the rkey */
if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) {
@ -1194,7 +1210,7 @@ OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *me
if (rc) {
return rc;
}
DBG_OUT("_tlocal_fetch: creating rkey ...\n");
DBG_OUT(_dbg_tls || _dbg_mem, "creating rkey ...\n");
}
return OPAL_SUCCESS;
@ -1209,7 +1225,7 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
opal_common_ucx_ctx_t *ctx = mem->ctx;
int rc = OPAL_SUCCESS;
DBG_OUT("opal_common_ucx_mem_flush: mem = %p, target = %d\n", (void *)mem, target);
DBG_OUT(_dbg_tls || _dbg_mem, "mem = %p, target = %d\n", (void *)mem, target);
opal_mutex_lock(&ctx->mutex);
@ -1219,10 +1235,11 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
opal_mutex_lock(&item->ptr->mutex);
rc = opal_common_ucx_worker_flush(item->ptr->worker);
if (rc != OPAL_SUCCESS) {
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_worker_flush failed: %d", rc);
MCA_COMMON_UCX_ERROR("opal_common_ucx_worker_flush failed: %d",
rc);
rc = OPAL_ERROR;
}
DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): worker = %p\n",
DBG_OUT(_dbg_tls || _dbg_mem, "worker = %p\n",
(void *)item->ptr->worker);
opal_mutex_unlock(&item->ptr->mutex);
break;
@ -1235,7 +1252,8 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_ep_flush failed: %d", rc);
rc = OPAL_ERROR;
}
DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): ep = %p worker = %p\n",
DBG_OUT(_dbg_tls || _dbg_mem, "target = %d, ep = %p worker = %p\n",
(int)target,
(void *)item->ptr->endpoints[target],
(void *)item->ptr->worker);
opal_mutex_unlock(&item->ptr->mutex);

Просмотреть файл

@ -122,6 +122,12 @@ extern __thread int initialized;
#include <time.h>
#include <sys/time.h>
static int _dbg_winfo = 0;
static int _dbg_wpool = 0;
static int _dbg_ctx = 0;
static int _dbg_mem = 0;
static int _dbg_tls = 0;
static inline void init_tls_dbg(void)
{
if( !initialized ) {
@ -133,22 +139,34 @@ static inline void init_tls_dbg(void)
sprintf(fname, "%s.%d.log", hname, tid);
tls_pf = fopen(fname, "w");
initialized = 1;
// Create issusion that they are used to avoid compiler warnings
(void)_dbg_ctx;
(void)_dbg_mem;
(void)_dbg_tls;
(void)_dbg_winfo;
(void)_dbg_wpool;
}
}
#define DBG_OUT(...) \
{ \
struct timeval start_; \
time_t nowtime_; \
struct tm *nowtm_; \
char tmbuf_[64]; \
gettimeofday(&start_, NULL); \
nowtime_ = start_.tv_sec; \
nowtm_ = localtime(&nowtime_); \
strftime(tmbuf_, sizeof(tmbuf_), "%H:%M:%S", nowtm_); \
init_tls_dbg(); \
fprintf(tls_pf, "[%s.%06ld] ", tmbuf_, start_.tv_usec);\
fprintf(tls_pf, __VA_ARGS__); \
#define DBG_OUT(level, ...) \
{ \
struct timeval start_; \
time_t nowtime_; \
struct tm *nowtm_; \
char tmbuf_[64]; \
gettimeofday(&start_, NULL); \
nowtime_ = start_.tv_sec; \
nowtm_ = localtime(&nowtime_); \
strftime(tmbuf_, sizeof(tmbuf_), \
"%H:%M:%S", nowtm_); \
init_tls_dbg(); \
if (level) { \
fprintf(tls_pf, "[%s.%06ld] %s:", \
tmbuf_, start_.tv_usec, \
__func__); \
} \
fprintf(tls_pf, __VA_ARGS__); \
}
#else

Просмотреть файл

@ -60,17 +60,19 @@ static int _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append);
static int _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append);
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool);
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls);
static inline _tlocal_ctx_t *_tlocal_ctx_search(_tlocal_table_t *tls, int ctx_id);
static inline _tlocal_ctx_t *_tlocal_ctx_search(_tlocal_table_t *tls,
int ctx_id);
static int _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec);
static _tlocal_ctx_t *_tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx);
static _tlocal_ctx_t *_tlocal_add_ctx(_tlocal_table_t *tls,
opal_common_ucx_ctx_t *ctx);
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx, int target);
static inline _tlocal_mem_t *_tlocal_search_mem(_tlocal_table_t *tls, int mem_id);
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *mem);
static inline _tlocal_mem_t *_tlocal_search_mem(_tlocal_table_t *tls,
int mem_id);
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
opal_common_ucx_wpmem_t *mem);
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target);
// TOD: Return the error from it
static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec);
static void _tlocal_cleanup(void *arg);
/* Sorted declarations */