ompi/osc/ucx: Remove workerpool's global thread storage tables.
Co-authored-by: Artem Y. Polyakov <artemp@mellanox.com>
Signed-off-by: Tomislav Janjusic <tomislavj@mellanox.com>
Этот коммит содержится в:
родитель
41df122083
Коммит
27ba4b612f
@ -639,7 +639,6 @@ int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
|
||||
int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int insert, contain;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
assert(module->state.dynamic_win_count > 0);
|
||||
|
||||
@ -655,7 +654,7 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
|
||||
module->local_dynamic_win_info[contain].refcnt--;
|
||||
if (module->local_dynamic_win_info[contain].refcnt == 0) {
|
||||
ret = opal_common_ucx_wpmem_free(module->local_dynamic_win_info[contain].mem);
|
||||
opal_common_ucx_wpmem_free(module->local_dynamic_win_info[contain].mem);
|
||||
memmove((void *)&(module->local_dynamic_win_info[contain]),
|
||||
(void *)&(module->local_dynamic_win_info[contain+1]),
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
|
||||
@ -666,7 +665,7 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
module->state.dynamic_win_count--;
|
||||
}
|
||||
|
||||
return ret;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
@ -691,19 +690,13 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
free(module->addrs);
|
||||
free(module->state_addrs);
|
||||
|
||||
ret = opal_common_ucx_wpmem_free(module->state_mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = opal_common_ucx_wpmem_free(module->mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
opal_common_ucx_wpmem_free(module->state_mem);
|
||||
opal_common_ucx_wpmem_free(module->mem);
|
||||
|
||||
opal_common_ucx_wpctx_release(module->ctx);
|
||||
|
||||
if (module->disp_units) free(module->disp_units);
|
||||
if (module->disp_units)
|
||||
free(module->disp_units);
|
||||
ompi_comm_free(&module->comm);
|
||||
|
||||
free(module);
|
||||
|
@ -117,7 +117,6 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
|
||||
/**
|
||||
* Load an integer value of \c size bytes from \c ptr and cast it to uint64_t.
|
||||
*/
|
||||
|
@ -21,20 +21,27 @@
|
||||
******************************************************************************/
|
||||
|
||||
OBJ_CLASS_INSTANCE(_winfo_list_item_t, opal_list_item_t, NULL, NULL);
|
||||
OBJ_CLASS_INSTANCE(_ctx_record_list_item_t, opal_list_item_t, NULL, NULL);
|
||||
OBJ_CLASS_INSTANCE(_mem_record_list_item_t, opal_list_item_t, NULL, NULL);
|
||||
OBJ_CLASS_INSTANCE(_tlocal_table_t, opal_list_item_t, NULL, NULL);
|
||||
OBJ_CLASS_INSTANCE(_ctx_record_t, opal_list_item_t, NULL, NULL);
|
||||
OBJ_CLASS_INSTANCE(_mem_record_t, opal_list_item_t, NULL, NULL);
|
||||
|
||||
// TODO: Remove once debug is completed
|
||||
#ifdef OPAL_COMMON_UCX_WPOOL_DBG
|
||||
__thread FILE *tls_pf = NULL;
|
||||
__thread int initialized = 0;
|
||||
#endif
|
||||
|
||||
static _ctx_record_t *
|
||||
_tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx);
|
||||
static inline _ctx_record_t *
|
||||
_tlocal_get_ctx_rec(opal_tsd_key_t tls_key);
|
||||
static void _tlocal_ctx_rec_cleanup(_ctx_record_t *ctx_rec);
|
||||
static void _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec);
|
||||
static void _ctx_rec_destructor(void *arg);
|
||||
static void _mem_rec_destructor(void *arg);
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Worker information (winfo) management functionality
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
static opal_common_ucx_winfo_t *
|
||||
_winfo_create(opal_common_ucx_wpool_t *wpool)
|
||||
{
|
||||
@ -146,13 +153,12 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
|
||||
return rc;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&wpool->mutex, opal_recursive_mutex_t);
|
||||
OBJ_CONSTRUCT(&wpool->tls_list, opal_list_t);
|
||||
OBJ_CONSTRUCT(&wpool->mutex, opal_mutex_t);
|
||||
|
||||
status = ucp_config_read("MPI", NULL, &config);
|
||||
if (UCS_OK != status) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
|
||||
return OPAL_ERROR;
|
||||
|
||||
}
|
||||
|
||||
/* initialize UCP context */
|
||||
@ -207,8 +213,6 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
|
||||
goto err_wpool_add;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&wpool->tls_key, opal_tsd_tracked_key_t);
|
||||
opal_tsd_tracked_key_set_destructor(&wpool->tls_key, _tlocal_cleanup);
|
||||
return rc;
|
||||
|
||||
err_wpool_add:
|
||||
@ -226,25 +230,11 @@ err_get_addr:
|
||||
OPAL_DECLSPEC
|
||||
void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
|
||||
{
|
||||
_tlocal_table_t *tls_item = NULL, *tls_next;
|
||||
|
||||
wpool->refcnt--;
|
||||
if (wpool->refcnt > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* After this has been called, no thread cleanup callback
|
||||
* will be called */
|
||||
OBJ_DESTRUCT(&wpool->tls_key);
|
||||
|
||||
/* Go over remaining TLS structures and release it */
|
||||
OPAL_LIST_FOREACH_SAFE(tls_item, tls_next, &wpool->tls_list,
|
||||
_tlocal_table_t) {
|
||||
opal_list_remove_item(&wpool->tls_list, &tls_item->super);
|
||||
_common_ucx_tls_cleanup(tls_item);
|
||||
}
|
||||
OBJ_DESTRUCT(&wpool->tls_list);
|
||||
|
||||
/* Release the address here. recv worker will be released
|
||||
* below along with other idle workers */
|
||||
ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
|
||||
@ -275,6 +265,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
|
||||
}
|
||||
OBJ_DESTRUCT(&wpool->active_workers);
|
||||
|
||||
|
||||
OBJ_DESTRUCT(&wpool->mutex);
|
||||
ucp_cleanup(wpool->ucp_ctx);
|
||||
return;
|
||||
@ -314,6 +305,20 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
|
||||
return completed;
|
||||
}
|
||||
|
||||
// static int
|
||||
// _wpool_list_return_item(opal_common_ucx_wpool_t *wpool, opal_list_t *idle_list, opal_list_t *active_list,
|
||||
// opal_common_ucx_winfo_t *winfo)
|
||||
// {
|
||||
// opal_list_item_t *tmp = NULL;
|
||||
// opal_mutex_lock(&wpool->mutex);
|
||||
// tmp = opal_list_remove_item(active_list, &(winfo->self->super));
|
||||
// assert(NULL != tmp);
|
||||
// opal_list_append(idle_list, &(winfo->self->super));
|
||||
// opal_mutex_unlock(&wpool->mutex);
|
||||
//
|
||||
// return OPAL_SUCCESS;
|
||||
// }
|
||||
|
||||
static int
|
||||
_wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
opal_common_ucx_winfo_t *winfo)
|
||||
@ -326,6 +331,7 @@ _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
item->ptr = winfo;
|
||||
winfo->self = item;
|
||||
|
||||
opal_mutex_lock(&wpool->mutex);
|
||||
opal_list_append(list, &item->super);
|
||||
@ -352,6 +358,15 @@ _wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
return winfo;
|
||||
|
||||
/*XXX*/
|
||||
// opal_mutex_lock(&wpool->mutex);
|
||||
// _winfo_list_item_t *item = (_winfo_list_item_t *)opal_list_remove_first(list);
|
||||
// opal_mutex_unlock(&wpool->mutex);
|
||||
// winfo = item->ptr;
|
||||
// OBJ_DESTRUCT(item);
|
||||
//
|
||||
// return winfo;
|
||||
}
|
||||
|
||||
static opal_common_ucx_winfo_t *
|
||||
@ -393,9 +408,8 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
|
||||
int ret = OPAL_SUCCESS;
|
||||
|
||||
OBJ_CONSTRUCT(&ctx->mutex, opal_recursive_mutex_t);
|
||||
OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
|
||||
ctx->released = 0;
|
||||
ctx->refcntr = 1; /* application holding the context */
|
||||
OBJ_CONSTRUCT(&ctx->ctx_records, opal_list_t);
|
||||
|
||||
ctx->wpool = wpool;
|
||||
ctx->comm_size = comm_size;
|
||||
|
||||
@ -408,11 +422,13 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
|
||||
goto error;
|
||||
}
|
||||
|
||||
opal_tsd_key_create(&ctx->tls_key, _ctx_rec_destructor);
|
||||
|
||||
(*ctx_ptr) = ctx;
|
||||
return ret;
|
||||
error:
|
||||
OBJ_DESTRUCT(&ctx->mutex);
|
||||
OBJ_DESTRUCT(&ctx->tls_workers);
|
||||
OBJ_DESTRUCT(&ctx->ctx_records);
|
||||
free(ctx);
|
||||
(*ctx_ptr) = NULL;
|
||||
return ret;
|
||||
@ -421,26 +437,23 @@ error:
|
||||
OPAL_DECLSPEC void
|
||||
opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
int my_refcntr = -1;
|
||||
_ctx_record_t *ctx_rec = NULL;
|
||||
|
||||
/* Application is expected to guarantee that no operation
|
||||
* is performed on the context that is being released */
|
||||
|
||||
/* Mark that this context was released by application
|
||||
* Threads will use this flag to perform deferred cleanup */
|
||||
ctx->released = 1;
|
||||
/* destroy key so that other threads don't invoke destructors */
|
||||
opal_tsd_key_delete(ctx->tls_key);
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
|
||||
/* loop through list of records */
|
||||
OPAL_LIST_FOREACH(ctx_rec, &ctx->ctx_records, _ctx_record_t) {
|
||||
_tlocal_ctx_rec_cleanup(ctx_rec);
|
||||
}
|
||||
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_mb();
|
||||
|
||||
/* If there are no more references to this handler
|
||||
* we can release it */
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpctx_free(ctx);
|
||||
}
|
||||
_common_ucx_wpctx_free(ctx);
|
||||
}
|
||||
|
||||
/* Final cleanup of the context structure
|
||||
@ -451,71 +464,10 @@ _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
|
||||
free(ctx->recv_worker_addrs);
|
||||
free(ctx->recv_worker_displs);
|
||||
OBJ_DESTRUCT(&ctx->mutex);
|
||||
OBJ_DESTRUCT(&ctx->tls_workers);
|
||||
OBJ_DESTRUCT(&ctx->ctx_records);
|
||||
free(ctx);
|
||||
}
|
||||
|
||||
/* Subscribe a new TLS to this context */
|
||||
static int
|
||||
_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo)
|
||||
{
|
||||
_ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
|
||||
if (NULL == item) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
/* Increment the reference counter */
|
||||
OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);
|
||||
|
||||
/* Add new worker to the context */
|
||||
item->ptr = winfo;
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
opal_list_append(&ctx->tls_workers, &item->super);
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* Unsubscribe a particular TLS to this context */
|
||||
static void
|
||||
_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo)
|
||||
{
|
||||
_ctx_record_list_item_t *item = NULL, *next;
|
||||
int my_refcntr = -1;
|
||||
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
|
||||
OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers,
|
||||
_ctx_record_list_item_t) {
|
||||
if (winfo == item->ptr) {
|
||||
opal_list_remove_item(&ctx->tls_workers, &item->super);
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
winfo->released = 1;
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
OBJ_RELEASE(item);
|
||||
break;
|
||||
}
|
||||
}
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_rmb();
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
|
||||
|
||||
/* a counterpart to the rmb above */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
/* All references to this data structure were removed
|
||||
* We can safely release communication context structure */
|
||||
_common_ucx_wpctx_free(ctx);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Worker Pool Memory management functionality
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -536,11 +488,11 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
ucs_status_t status;
|
||||
int ret = OPAL_SUCCESS;
|
||||
|
||||
mem->released = 0;
|
||||
mem->refcntr = 1; /* application holding this memory handler */
|
||||
mem->ctx = ctx;
|
||||
mem->mem_addrs = NULL;
|
||||
mem->mem_displs = NULL;
|
||||
|
||||
OBJ_CONSTRUCT(&mem->mem_records, opal_list_t);
|
||||
|
||||
ret = _comm_ucx_wpmem_map(ctx->wpool, mem_base, mem_size, &mem->memh,
|
||||
mem_type);
|
||||
@ -563,9 +515,7 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
goto error_rkey_pack;
|
||||
}
|
||||
|
||||
/* Don't need the destructor here, will use
|
||||
* wpool-level destructor */
|
||||
OBJ_CONSTRUCT(&mem->mem_tls_key, opal_tsd_tracked_key_t);
|
||||
opal_tsd_key_create(&mem->tls_key, _mem_rec_destructor);
|
||||
|
||||
(*mem_ptr) = mem;
|
||||
(*my_mem_addr) = rkey_addr;
|
||||
@ -581,27 +531,6 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
OPAL_DECLSPEC int
|
||||
opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
int my_refcntr = -1;
|
||||
|
||||
/* Mark that this memory handler has been released */
|
||||
mem->released = 1;
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
|
||||
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpmem_free(mem);
|
||||
}
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
opal_common_ucx_mem_type_t mem_type)
|
||||
@ -651,287 +580,114 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
|
||||
void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
OBJ_DESTRUCT(&mem->mem_tls_key);
|
||||
_mem_record_t *mem_rec = NULL;
|
||||
|
||||
opal_tsd_key_delete(mem->tls_key);
|
||||
|
||||
/* Loop through list of records */
|
||||
OPAL_LIST_FOREACH(mem_rec, &mem->mem_records, _mem_record_t) {
|
||||
_tlocal_mem_rec_cleanup(mem_rec);
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&mem->mem_records);
|
||||
|
||||
opal_atomic_mb();
|
||||
|
||||
free(mem->mem_addrs);
|
||||
free(mem->mem_displs);
|
||||
|
||||
ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
|
||||
free(mem);
|
||||
}
|
||||
|
||||
static int
|
||||
_common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
/* Increment the reference counter */
|
||||
OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
static void
|
||||
_common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
|
||||
{
|
||||
int my_refcntr = -1;
|
||||
|
||||
/* Make sure that all the loads are complete at this
|
||||
* point, so that if somebody else sees refcntr == 0
|
||||
* and releases the structure, we already have all we need
|
||||
*/
|
||||
opal_atomic_rmb();
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
|
||||
|
||||
/* a counterpart to the rmb above */
|
||||
opal_atomic_wmb();
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpmem_free(mem);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Worker Pool TLS management functionality
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
|
||||
{
|
||||
_tlocal_table_t *tls = OBJ_NEW(_tlocal_table_t);
|
||||
|
||||
if (tls == NULL) {
|
||||
// return OPAL_ERR_OUT_OF_RESOURCE
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tls->ctx_tbl = NULL;
|
||||
tls->ctx_tbl_size = 0;
|
||||
tls->mem_tbl = NULL;
|
||||
tls->mem_tbl_size = 0;
|
||||
|
||||
/* Add this TLS to the global wpool structure for future
|
||||
* cleanup purposes */
|
||||
tls->wpool = wpool;
|
||||
opal_mutex_lock(&wpool->mutex);
|
||||
opal_list_append(&wpool->tls_list, &tls->super);
|
||||
opal_mutex_unlock(&wpool->mutex);
|
||||
|
||||
if(_tlocal_tls_ctxtbl_extend(tls, 4)){
|
||||
MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool context table");
|
||||
return NULL;
|
||||
}
|
||||
if(_tlocal_tls_memtbl_extend(tls, 4)) {
|
||||
MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool memory table");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
opal_tsd_tracked_key_set(&wpool->tls_key, tls);
|
||||
|
||||
return tls;
|
||||
}
|
||||
|
||||
static inline _tlocal_table_t *
|
||||
_tlocal_get_tls(opal_common_ucx_wpool_t *wpool){
|
||||
_tlocal_table_t *tls;
|
||||
int rc = opal_tsd_tracked_key_get(&wpool->tls_key, (void**)&tls);
|
||||
static inline _ctx_record_t *
|
||||
_tlocal_get_ctx_rec(opal_tsd_key_t tls_key){
|
||||
_ctx_record_t *ctx_rec = NULL;
|
||||
int rc = opal_tsd_getspecific(tls_key, (void**)&ctx_rec);
|
||||
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (OPAL_UNLIKELY(NULL == tls)) {
|
||||
tls = _common_ucx_tls_init(wpool);
|
||||
}
|
||||
return tls;
|
||||
return ctx_rec;
|
||||
}
|
||||
|
||||
static void _tlocal_cleanup(void *arg)
|
||||
{
|
||||
_tlocal_table_t *item = NULL, *next;
|
||||
_tlocal_table_t *tls = (_tlocal_table_t *)arg;
|
||||
opal_common_ucx_wpool_t *wpool = NULL;
|
||||
|
||||
if (NULL == tls) {
|
||||
return;
|
||||
}
|
||||
wpool = tls->wpool;
|
||||
|
||||
/* 1. Remove us from tls_list */
|
||||
tls->wpool = wpool;
|
||||
opal_mutex_lock(&wpool->mutex);
|
||||
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->tls_list, _tlocal_table_t) {
|
||||
if (item == tls) {
|
||||
opal_list_remove_item(&wpool->tls_list, &item->super);
|
||||
break;
|
||||
}
|
||||
}
|
||||
opal_mutex_unlock(&wpool->mutex);
|
||||
_common_ucx_tls_cleanup(tls);
|
||||
}
|
||||
|
||||
// TODO: don't want to inline this function
|
||||
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)
|
||||
{
|
||||
size_t i, size;
|
||||
|
||||
// Cleanup memory table
|
||||
size = tls->mem_tbl_size;
|
||||
for (i = 0; i < size; i++) {
|
||||
if (NULL != tls->mem_tbl[i]->gmem){
|
||||
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
|
||||
}
|
||||
|
||||
free(tls->mem_tbl[i]);
|
||||
}
|
||||
|
||||
// Cleanup ctx table
|
||||
size = tls->ctx_tbl_size;
|
||||
for (i = 0; i < size; i++) {
|
||||
if (NULL != tls->ctx_tbl[i]->gctx){
|
||||
assert(tls->ctx_tbl[i]->refcnt == 0);
|
||||
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
|
||||
}
|
||||
free(tls->ctx_tbl[i]);
|
||||
}
|
||||
|
||||
opal_tsd_tracked_key_set(&tls->wpool->tls_key, NULL);
|
||||
|
||||
OBJ_RELEASE(tls);
|
||||
static void _ctx_rec_destructor(void *arg) {
|
||||
_tlocal_ctx_rec_cleanup ((_ctx_record_t *) arg);
|
||||
return;
|
||||
}
|
||||
|
||||
static int
|
||||
_tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append)
|
||||
/* Thread local storage destructor, also called from wpool_ctx release */
|
||||
static void
|
||||
_tlocal_ctx_rec_cleanup(_ctx_record_t *ctx_rec)
|
||||
{
|
||||
size_t i;
|
||||
size_t newsize = (tbl->ctx_tbl_size + append);
|
||||
tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl));
|
||||
for (i = tbl->ctx_tbl_size; i < newsize; i++) {
|
||||
tbl->ctx_tbl[i] = calloc(1, sizeof(*tbl->ctx_tbl[i]));
|
||||
if (NULL == tbl->ctx_tbl[i]) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
opal_common_ucx_winfo_t *winfo = NULL;
|
||||
opal_common_ucx_wpool_t *wpool = NULL;
|
||||
|
||||
if (NULL == ctx_rec) {
|
||||
return;
|
||||
}
|
||||
tbl->ctx_tbl_size = newsize;
|
||||
return OPAL_SUCCESS;
|
||||
|
||||
winfo = ctx_rec->winfo;
|
||||
wpool = ctx_rec->gctx->wpool;
|
||||
|
||||
opal_mutex_lock(&wpool->mutex);
|
||||
|
||||
/* Remove worker from active and return to idle list.
|
||||
* Remove the context record from the ctx list */
|
||||
opal_list_remove_item(&wpool->active_workers, &winfo->self->super);
|
||||
opal_list_prepend(&wpool->idle_workers, &winfo->self->super);
|
||||
opal_list_remove_item(&ctx_rec->gctx->ctx_records, &ctx_rec->super);
|
||||
|
||||
opal_mutex_unlock(&wpool->mutex);
|
||||
|
||||
OBJ_RELEASE(ctx_rec);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static int
|
||||
_tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append)
|
||||
static _ctx_record_t *
|
||||
_tlocal_add_ctx_rec(opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
size_t i;
|
||||
size_t newsize = (tbl->mem_tbl_size + append);
|
||||
|
||||
tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl));
|
||||
for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) {
|
||||
tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i]));
|
||||
if (NULL == tbl->mem_tbl[i]) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
tbl->mem_tbl_size = newsize;
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static inline _tlocal_ctx_t *
|
||||
_tlocal_ctx_search(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
size_t i;
|
||||
for(i=0; i<tls->ctx_tbl_size; i++) {
|
||||
if (tls->ctx_tbl[i]->gctx == ctx){
|
||||
return tls->ctx_tbl[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int
|
||||
_tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
|
||||
{
|
||||
if (NULL == ctx_rec->gctx) {
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
if (ctx_rec->refcnt > 0) {
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* Remove myself from the communication context structure
|
||||
* This may result in context release as we are using
|
||||
* delayed cleanup */
|
||||
_common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
|
||||
|
||||
/* Erase the record so it can be reused */
|
||||
memset(ctx_rec, 0, sizeof(*ctx_rec));
|
||||
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
// TODO: Don't want to inline this (slow path)
|
||||
static _tlocal_ctx_t *
|
||||
_tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
size_t i, free_idx = -1;
|
||||
int rc, found = 0;
|
||||
|
||||
/* Try to find available record in the TLS table
|
||||
* In parallel perform deferred cleanups */
|
||||
for (i=0; i<tls->ctx_tbl_size; i++) {
|
||||
if (NULL != tls->ctx_tbl[i]->gctx && tls->ctx_tbl[i]->refcnt == 0) {
|
||||
if (tls->ctx_tbl[i]->gctx->released ) {
|
||||
/* Found dirty record, need to clean first */
|
||||
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
|
||||
}
|
||||
}
|
||||
if ((NULL == tls->ctx_tbl[i]->gctx) && !found) {
|
||||
/* Found clean record */
|
||||
free_idx = i;
|
||||
found = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* if needed - extend the table */
|
||||
if (!found) {
|
||||
free_idx = tls->ctx_tbl_size;
|
||||
rc = _tlocal_tls_ctxtbl_extend(tls, 4);
|
||||
if (rc) {
|
||||
//TODO: error out
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
tls->ctx_tbl[free_idx]->gctx = ctx;
|
||||
tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
|
||||
if (NULL == tls->ctx_tbl[free_idx]->winfo) {
|
||||
MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
|
||||
int rc;
|
||||
|
||||
_ctx_record_t *ctx_rec = OBJ_NEW(_ctx_record_t);
|
||||
if (!ctx_rec) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ctx_rec->gctx = ctx;
|
||||
ctx_rec->winfo = _wpool_get_idle(ctx->wpool, ctx->comm_size);
|
||||
if (NULL == ctx_rec->winfo) {
|
||||
MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
|
||||
OBJ_RELEASE(ctx_rec);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Make sure that we completed all the data structures before
|
||||
* placing the item to the list
|
||||
* NOTE: essentially we don't need this as list append is an
|
||||
* operation protected by mutex
|
||||
*/
|
||||
opal_atomic_wmb();
|
||||
|
||||
/* Add this worker to the active worker list */
|
||||
_wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo);
|
||||
_wpool_add_active(ctx->wpool, ctx_rec->winfo);
|
||||
|
||||
/* add this worker into the context list */
|
||||
rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo);
|
||||
if (rc) {
|
||||
//TODO: error out
|
||||
/* Add ctx_rec to list */
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
opal_list_append(&ctx->ctx_records, &ctx_rec->super);
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
/* Add tls reference to record */
|
||||
rc = opal_tsd_setspecific(ctx->tls_key, ctx_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
OBJ_RELEASE(ctx_rec);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* All good - return the record */
|
||||
return tls->ctx_tbl[free_idx];
|
||||
return ctx_rec;
|
||||
}
|
||||
|
||||
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
|
||||
static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
|
||||
{
|
||||
ucp_ep_params_t ep_params;
|
||||
opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
|
||||
@ -955,130 +711,74 @@ static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* TLS memory management */
|
||||
|
||||
static inline _tlocal_mem_t *
|
||||
_tlocal_search_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *gmem)
|
||||
{
|
||||
size_t i;
|
||||
for(i=0; i<tls->mem_tbl_size; i++) {
|
||||
if( tls->mem_tbl[i]->gmem == gmem){
|
||||
return tls->mem_tbl[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
static void
|
||||
_mem_rec_destructor(void * arg) {
|
||||
_tlocal_mem_rec_cleanup ((_mem_record_t *) arg);
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
_tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
|
||||
_tlocal_mem_rec_cleanup(_mem_record_t *mem_rec)
|
||||
{
|
||||
size_t i;
|
||||
if (NULL == mem_rec) {
|
||||
return;
|
||||
}
|
||||
|
||||
for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
|
||||
if (mem_rec->mem->rkeys[i]) {
|
||||
ucp_rkey_destroy(mem_rec->mem->rkeys[i]);
|
||||
if (mem_rec->rkeys[i]) {
|
||||
ucp_rkey_destroy(mem_rec->rkeys[i]);
|
||||
}
|
||||
}
|
||||
free(mem_rec->mem->rkeys);
|
||||
free(mem_rec->rkeys);
|
||||
|
||||
/* Remove myself from the memory context structure
|
||||
* This may result in context release as we are using
|
||||
* delayed cleanup */
|
||||
_common_ucx_mem_signout(mem_rec->gmem);
|
||||
/* Remove item from the list */
|
||||
opal_mutex_lock(&mem_rec->gmem->ctx->mutex);
|
||||
opal_list_remove_item(&mem_rec->gmem->mem_records, &mem_rec->super);
|
||||
opal_mutex_unlock(&mem_rec->gmem->ctx->mutex);
|
||||
|
||||
/* Release fast-path pointers */
|
||||
if (NULL != mem_rec->mem_tls_ptr) {
|
||||
free(mem_rec->mem_tls_ptr);
|
||||
}
|
||||
OBJ_RELEASE(mem_rec);
|
||||
|
||||
assert(mem_rec->ctx_rec != NULL);
|
||||
OPAL_ATOMIC_ADD_FETCH32(&mem_rec->ctx_rec->refcnt, -1);
|
||||
assert(mem_rec->ctx_rec->refcnt >= 0);
|
||||
|
||||
free(mem_rec->mem);
|
||||
|
||||
memset(mem_rec, 0, sizeof(*mem_rec));
|
||||
return;
|
||||
}
|
||||
|
||||
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *mem)
|
||||
static _mem_record_t *_tlocal_add_mem_rec(opal_common_ucx_wpmem_t *mem, _ctx_record_t *ctx_rec)
|
||||
{
|
||||
size_t i, free_idx = -1;
|
||||
_tlocal_ctx_t *ctx_rec = NULL;
|
||||
int rc = OPAL_SUCCESS, found = 0;
|
||||
|
||||
/* Try to find available spot in the table */
|
||||
for (i=0; i<tls->mem_tbl_size; i++) {
|
||||
if (NULL != tls->mem_tbl[i]->gmem) {
|
||||
if (tls->mem_tbl[i]->gmem->released) {
|
||||
/* Found a dirty record. Need to clean it first */
|
||||
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
|
||||
}
|
||||
}
|
||||
if ((NULL == tls->mem_tbl[i]->gmem) && !found) {
|
||||
/* Found a clear record */
|
||||
free_idx = i;
|
||||
found = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found){
|
||||
free_idx = tls->mem_tbl_size;
|
||||
rc = _tlocal_tls_memtbl_extend(tls, 4);
|
||||
if (rc != OPAL_SUCCESS) {
|
||||
//TODO: error out
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
tls->mem_tbl[free_idx]->gmem = mem;
|
||||
tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem));
|
||||
|
||||
ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
|
||||
if (NULL == ctx_rec) {
|
||||
// TODO: act accordingly - cleanup
|
||||
int rc = OPAL_SUCCESS;
|
||||
_mem_record_t *mem_rec = OBJ_NEW(_mem_record_t);
|
||||
if (NULL == mem_rec) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tls->mem_tbl[free_idx]->ctx_rec = ctx_rec;
|
||||
OPAL_ATOMIC_ADD_FETCH32(&ctx_rec->refcnt, 1);
|
||||
opal_mutex_lock(&ctx_rec->gctx->mutex);
|
||||
opal_list_append(&mem->mem_records, &mem_rec->super);
|
||||
opal_mutex_unlock(&ctx_rec->gctx->mutex);
|
||||
|
||||
tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo;
|
||||
tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size,
|
||||
sizeof(*tls->mem_tbl[free_idx]->mem->rkeys));
|
||||
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr =
|
||||
calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr));
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo;
|
||||
tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys;
|
||||
opal_tsd_tracked_key_set(&mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr);
|
||||
|
||||
/* Make sure that we completed all the data structures before
|
||||
* placing the item to the list
|
||||
* NOTE: essentially we don't need this as list append is an
|
||||
* operation protected by mutex
|
||||
*/
|
||||
mem_rec->gmem = mem;
|
||||
mem_rec->ctx_rec = ctx_rec;
|
||||
mem_rec->winfo = ctx_rec->winfo;
|
||||
mem_rec->rkeys = calloc(mem->ctx->comm_size, sizeof(*mem_rec->rkeys));
|
||||
|
||||
opal_atomic_wmb();
|
||||
|
||||
rc = _common_ucx_wpmem_signup(mem);
|
||||
if (rc) {
|
||||
// TODO: error handling
|
||||
rc = opal_tsd_setspecific(mem->tls_key, mem_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return tls->mem_tbl[free_idx];
|
||||
return mem_rec;
|
||||
}
|
||||
|
||||
static int
|
||||
_tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
|
||||
_tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int target)
|
||||
{
|
||||
_mem_info_t *minfo = mem_rec->mem;
|
||||
opal_common_ucx_wpmem_t *gmem = mem_rec->gmem;
|
||||
int displ = gmem->mem_displs[target];
|
||||
ucs_status_t status;
|
||||
|
||||
status = ucp_ep_rkey_unpack(ep, &gmem->mem_addrs[displ],
|
||||
&minfo->rkeys[target]);
|
||||
&mem_rec->rkeys[target]);
|
||||
if (status != UCS_OK) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status);
|
||||
return OPAL_ERROR;
|
||||
@ -1091,21 +791,15 @@ _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target)
|
||||
OPAL_DECLSPEC int
|
||||
opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
|
||||
{
|
||||
_tlocal_table_t *tls = NULL;
|
||||
_tlocal_ctx_t *ctx_rec = NULL;
|
||||
_ctx_record_t *ctx_rec = NULL;
|
||||
_mem_record_t *mem_rec = NULL;
|
||||
opal_common_ucx_winfo_t *winfo = NULL;
|
||||
_tlocal_mem_t *mem_rec = NULL;
|
||||
_mem_info_t *mem_info = NULL;
|
||||
ucp_ep_h ep;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
tls = _tlocal_get_tls(mem->ctx->wpool);
|
||||
|
||||
/* Obtain the worker structure */
|
||||
ctx_rec = _tlocal_ctx_search(tls, mem->ctx);
|
||||
|
||||
if (OPAL_UNLIKELY(NULL == ctx_rec)) {
|
||||
ctx_rec = _tlocal_add_ctx(tls, mem->ctx);
|
||||
ctx_rec = _tlocal_get_ctx_rec(mem->ctx->tls_key);
|
||||
if (OPAL_UNLIKELY(!ctx_rec)) {
|
||||
ctx_rec = _tlocal_add_ctx_rec(mem->ctx);
|
||||
if (NULL == ctx_rec) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
@ -1122,17 +816,10 @@ opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target)
|
||||
ep = winfo->endpoints[target];
|
||||
|
||||
/* Obtain the memory region info */
|
||||
mem_rec = _tlocal_search_mem(tls, mem);
|
||||
if (OPAL_UNLIKELY(mem_rec == NULL)) {
|
||||
mem_rec = _tlocal_add_mem(tls, mem);
|
||||
if (NULL == mem_rec) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
mem_info = mem_rec->mem;
|
||||
mem_rec = _tlocal_add_mem_rec(mem, ctx_rec);
|
||||
|
||||
/* Obtain the rkey */
|
||||
if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) {
|
||||
if (OPAL_UNLIKELY(NULL == mem_rec->rkeys[target])) {
|
||||
/* Create the rkey */
|
||||
rc = _tlocal_mem_create_rkey(mem_rec, ep, target);
|
||||
if (rc) {
|
||||
@ -1186,37 +873,37 @@ opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
OPAL_DECLSPEC int
|
||||
opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
|
||||
opal_common_ucx_flush_scope_t scope,
|
||||
int target)
|
||||
{
|
||||
_ctx_record_list_item_t *item;
|
||||
_ctx_record_t *ctx_rec;
|
||||
opal_common_ucx_ctx_t *ctx = mem->ctx;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
|
||||
OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
|
||||
OPAL_LIST_FOREACH(ctx_rec, &ctx->ctx_records, _ctx_record_t) {
|
||||
opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
|
||||
if ((scope == OPAL_COMMON_UCX_SCOPE_EP) &&
|
||||
(NULL == item->ptr->endpoints[target])) {
|
||||
(NULL == winfo->endpoints[target])) {
|
||||
continue;
|
||||
}
|
||||
opal_mutex_lock(&item->ptr->mutex);
|
||||
rc = opal_common_ucx_winfo_flush(item->ptr, target, OPAL_COMMON_UCX_FLUSH_B,
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
rc = opal_common_ucx_winfo_flush(winfo, target, OPAL_COMMON_UCX_FLUSH_B,
|
||||
scope, NULL);
|
||||
switch (scope) {
|
||||
case OPAL_COMMON_UCX_SCOPE_WORKER:
|
||||
item->ptr->global_inflight_ops = 0;
|
||||
memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short));
|
||||
winfo->global_inflight_ops = 0;
|
||||
memset(winfo->inflight_ops, 0, winfo->comm_size * sizeof(short));
|
||||
break;
|
||||
case OPAL_COMMON_UCX_SCOPE_EP:
|
||||
item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target];
|
||||
item->ptr->inflight_ops[target] = 0;
|
||||
winfo->global_inflight_ops -= winfo->inflight_ops[target];
|
||||
winfo->inflight_ops[target] = 0;
|
||||
break;
|
||||
}
|
||||
opal_mutex_unlock(&item->ptr->mutex);
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
|
||||
if (rc != OPAL_SUCCESS) {
|
||||
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d",
|
||||
|
@ -46,15 +46,9 @@ typedef struct {
|
||||
ucp_address_t *recv_waddr;
|
||||
size_t recv_waddr_len;
|
||||
|
||||
/* Thread-local key to allow each thread to have
|
||||
* local information associated with this wpool */
|
||||
opal_tsd_tracked_key_t tls_key;
|
||||
|
||||
/* Bookkeeping information */
|
||||
opal_list_t idle_workers;
|
||||
opal_list_t active_workers;
|
||||
|
||||
opal_list_t tls_list;
|
||||
} opal_common_ucx_wpool_t;
|
||||
|
||||
/* Worker Pool Context (wpctx) is an object that is comprised of a set of UCP
|
||||
@ -68,15 +62,18 @@ typedef struct {
|
||||
*/
|
||||
typedef struct {
|
||||
opal_recursive_mutex_t mutex;
|
||||
opal_atomic_int32_t refcntr;
|
||||
|
||||
/* the reference to a Worker pool this context belongs to*/
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
/* A list of references to TLS context records
|
||||
* we need to keep track of them to have an ability to
|
||||
* let thread know that this context is no longer valid */
|
||||
opal_list_t tls_workers;
|
||||
volatile int released;
|
||||
|
||||
/* A list of context records
|
||||
* We need to keep track of allocated context records so
|
||||
* that we can free them at the end if thread fails to release context record */
|
||||
opal_list_t ctx_records;
|
||||
|
||||
/* Thread-local key to allow each thread to have
|
||||
* local information associated with this wpctx */
|
||||
opal_tsd_key_t tls_key;
|
||||
|
||||
/* UCX addressing information */
|
||||
char *recv_worker_addrs;
|
||||
@ -95,21 +92,24 @@ typedef struct {
|
||||
/* reference context to which memory region belongs */
|
||||
opal_common_ucx_ctx_t *ctx;
|
||||
|
||||
/* object lifetime control */
|
||||
volatile int released;
|
||||
opal_atomic_int32_t refcntr;
|
||||
|
||||
/* UCX memory handler */
|
||||
ucp_mem_h memh;
|
||||
char *mem_addrs;
|
||||
int *mem_displs;
|
||||
|
||||
/* A list of mem records
|
||||
* We need to keep track of allocated memory records so that we can free them at the end
|
||||
* if a thread fails to release the memory record */
|
||||
opal_list_t mem_records;
|
||||
|
||||
/* TLS item that allows each thread to
|
||||
* store endpoints and rkey arrays
|
||||
* for faster access */
|
||||
opal_tsd_tracked_key_t mem_tls_key;
|
||||
opal_tsd_key_t tls_key;
|
||||
} opal_common_ucx_wpmem_t;
|
||||
|
||||
typedef struct __winfo_list_item_t _winfo_list_item_t;
|
||||
|
||||
/* The structure that wraps UCP worker and holds the state that is required
|
||||
* for its use.
|
||||
* The structure is allocated along with UCP worker on demand and is being held
|
||||
@ -118,6 +118,7 @@ typedef struct {
|
||||
*/
|
||||
typedef struct opal_common_ucx_winfo {
|
||||
opal_recursive_mutex_t mutex;
|
||||
_winfo_list_item_t *self;
|
||||
volatile int released;
|
||||
ucp_worker_h worker;
|
||||
ucp_ep_h *endpoints;
|
||||
@ -127,11 +128,6 @@ typedef struct opal_common_ucx_winfo {
|
||||
ucs_status_ptr_t inflight_req;
|
||||
} opal_common_ucx_winfo_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
ucp_rkey_h *rkeys;
|
||||
} opal_common_ucx_tlocal_fast_ptrs_t;
|
||||
|
||||
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
|
||||
|
||||
/* A fast-path structure that gathers all pointers that are required to
|
||||
@ -165,11 +161,32 @@ typedef enum {
|
||||
OPAL_COMMON_UCX_MEM_MAP
|
||||
} opal_common_ucx_mem_type_t;
|
||||
|
||||
struct __winfo_list_item_t{
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
};
|
||||
OBJ_CLASS_DECLARATION(_winfo_list_item_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
} _ctx_record_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
ucp_rkey_h *rkeys;
|
||||
_ctx_record_t *ctx_rec;
|
||||
} _mem_record_t;
|
||||
OBJ_CLASS_DECLARATION(_mem_record_t);
|
||||
|
||||
typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len,
|
||||
char **recv_info, int **disps,
|
||||
void *metadata);
|
||||
|
||||
|
||||
/* Manage Worker Pool (wpool) */
|
||||
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
|
||||
@ -196,34 +213,35 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
|
||||
ucp_ep_h *_ep, ucp_rkey_h *_rkey,
|
||||
opal_common_ucx_winfo_t **_winfo)
|
||||
{
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *fp = NULL;
|
||||
_mem_record_t *mem_rec = NULL;
|
||||
int expr;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
/* First check the fast-path */
|
||||
rc = opal_tsd_tracked_key_get(&mem->mem_tls_key, (void**)&fp);
|
||||
rc = opal_tsd_getspecific(mem->tls_key, (void**)&mem_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
expr = fp && (NULL != fp->winfo) && (fp->winfo->endpoints[target]) &&
|
||||
(NULL != fp->rkeys[target]);
|
||||
expr = mem_rec && (NULL != mem_rec->winfo) && (mem_rec->winfo->endpoints[target]) &&
|
||||
(NULL != mem_rec->rkeys[target]);
|
||||
if (OPAL_UNLIKELY(!expr)) {
|
||||
rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
rc = opal_tsd_tracked_key_get(&mem->mem_tls_key, (void**)&fp);
|
||||
rc = opal_tsd_getspecific(mem->tls_key, (void**)&mem_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
MCA_COMMON_UCX_ASSERT(fp && (NULL != fp->winfo) &&
|
||||
(fp->winfo->endpoints[target])
|
||||
&& (NULL != fp->rkeys[target]));
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->winfo);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->winfo->endpoints[target]);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->rkeys[target]);
|
||||
|
||||
*_rkey = fp->rkeys[target];
|
||||
*_winfo = fp->winfo;
|
||||
*_ep = fp->winfo->endpoints[target];
|
||||
*_rkey = mem_rec->rkeys[target];
|
||||
*_winfo = mem_rec->winfo;
|
||||
*_ep = mem_rec->winfo->endpoints[target];
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
@ -236,7 +254,7 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
char **my_mem_addr,
|
||||
int *my_mem_addr_size,
|
||||
opal_common_ucx_wpmem_t **mem_ptr);
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
|
||||
opal_common_ucx_flush_scope_t scope,
|
||||
|
@ -5,77 +5,11 @@
|
||||
#include "common_ucx.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
opal_atomic_int32_t refcnt;
|
||||
} _tlocal_ctx_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *worker;
|
||||
ucp_rkey_h *rkeys;
|
||||
} _mem_info_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
_mem_info_t *mem;
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *mem_tls_ptr;
|
||||
_tlocal_ctx_t *ctx_rec;
|
||||
} _tlocal_mem_t;
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _winfo_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_winfo_list_item_t);
|
||||
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _ctx_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_list_item_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
_tlocal_mem_t *ptr;
|
||||
} _mem_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_mem_record_list_item_t);
|
||||
|
||||
/* thread-local table */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
_tlocal_ctx_t **ctx_tbl;
|
||||
size_t ctx_tbl_size;
|
||||
_tlocal_mem_t **mem_tbl;
|
||||
size_t mem_tbl_size;
|
||||
} _tlocal_table_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(_tlocal_table_t);
|
||||
|
||||
static int _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static int _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool);
|
||||
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls);
|
||||
static inline _tlocal_ctx_t *_tlocal_ctx_search(_tlocal_table_t *tls,
|
||||
opal_common_ucx_ctx_t *ctx);
|
||||
static int _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec);
|
||||
static _tlocal_ctx_t *_tlocal_add_ctx(_tlocal_table_t *tls,
|
||||
opal_common_ucx_ctx_t *ctx);
|
||||
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx, int target);
|
||||
static inline _tlocal_mem_t *_tlocal_search_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *gmem);
|
||||
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *mem);
|
||||
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target);
|
||||
// TODO: Return the error from it
|
||||
static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec);
|
||||
static void _tlocal_cleanup(void *arg);
|
||||
static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target);
|
||||
static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int target);
|
||||
|
||||
/* Sorted declarations */
|
||||
|
||||
|
||||
/* Internal Worker Information (winfo) management */
|
||||
static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool);
|
||||
static void _winfo_release(opal_common_ucx_winfo_t *winfo);
|
||||
@ -95,18 +29,10 @@ static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
|
||||
|
||||
/* Internal Worker Pool Context management */
|
||||
static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
|
||||
static int _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static void _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool Memory management */
|
||||
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
opal_common_ucx_mem_type_t mem_type);
|
||||
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
static int _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem);
|
||||
static void _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
|
||||
#endif // COMMON_UCX_WPOOL_INT_H
|
||||
|
@ -474,6 +474,11 @@ opal_net_init()
|
||||
int
|
||||
opal_net_finalize()
|
||||
{
|
||||
#if OPAL_ENABLE_IPV6
|
||||
if (NULL != hostname_tsd_key) {
|
||||
OBJ_RELEASE(hostname_tsd_key);
|
||||
}
|
||||
#endif
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user