#include "opal_config.h"
#include "common_ucx.h"
#include "common_ucx_wpool.h"
#include "common_ucx_wpool_int.h"
#include "opal/mca/base/mca_base_var.h"
#include "opal/mca/base/mca_base_framework.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/memoryhooks/memory.h"

/* NOTE(review): the header name after the last "#include" was lost in the
 * source (a bare "#include" does not compile). <ucm/api/ucm.h> is what this
 * file pulls in upstream -- confirm against project history. */
#include <ucm/api/ucm.h>

/*******************************************************************************
 *******************************************************************************
 *
 * Worker Pool (wpool) framework
 * Used to manage multi-threaded implementation of UCX for ompi/OSC & OSHMEM
 *
 *******************************************************************************
 ******************************************************************************/

OBJ_CLASS_INSTANCE(_winfo_list_item_t, opal_list_item_t, NULL, NULL);
OBJ_CLASS_INSTANCE(_ctx_record_list_item_t, opal_list_item_t, NULL, NULL);
OBJ_CLASS_INSTANCE(_mem_record_list_item_t, opal_list_item_t, NULL, NULL);
OBJ_CLASS_INSTANCE(_tlocal_table_t, opal_list_item_t, NULL, NULL);

// TODO: Remove once debug is completed
#ifdef OPAL_COMMON_UCX_WPOOL_DBG
__thread FILE *tls_pf = NULL;
__thread int initialized = 0;
#endif

/* -----------------------------------------------------------------------------
 * Worker information (winfo) management functionality
 *----------------------------------------------------------------------------*/

/* Create a worker-info wrapper around a freshly created UCP worker.
 * The worker is created single-threaded: each winfo is serialized through
 * winfo->mutex by its users. Returns NULL on failure (worker destroyed). */
static opal_common_ucx_winfo_t *
_winfo_create(opal_common_ucx_wpool_t *wpool)
{
    ucp_worker_params_t worker_params;
    ucp_worker_h worker;
    ucs_status_t status;
    opal_common_ucx_winfo_t *winfo = NULL;

    memset(&worker_params, 0, sizeof(worker_params));
    worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
    status = ucp_worker_create(wpool->ucp_ctx, &worker_params, &worker);
    if (UCS_OK != status) {
        MCA_COMMON_UCX_ERROR("ucp_worker_create failed: %d", status);
        goto exit;
    }

    winfo = calloc(1, sizeof(*winfo));
    if (NULL == winfo) {
        MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
        goto release_worker;
    }

    OBJ_CONSTRUCT(&winfo->mutex, opal_recursive_mutex_t);
    winfo->worker = worker;
    winfo->endpoints = NULL;
    winfo->comm_size = 0;
    winfo->released = 0;
    winfo->inflight_ops = NULL;
    winfo->global_inflight_ops = 0;
    winfo->inflight_req = UCS_OK;

    return winfo;

release_worker:
    ucp_worker_destroy(worker);
exit:
    return winfo;
}

/* Return a winfo to a reusable (idle) state: wait for the outstanding flush
 * request (if any), destroy all endpoints and drop per-peer bookkeeping.
 * The underlying UCP worker itself is kept alive for reuse. */
static void
_winfo_reset(opal_common_ucx_winfo_t *winfo)
{
    if (winfo->inflight_req != UCS_OK) {
        opal_common_ucx_wait_request_mt(winfo->inflight_req,
                                        "opal_common_ucx_flush");
        winfo->inflight_req = UCS_OK;
    }

    assert(winfo->global_inflight_ops == 0);

    if (winfo->comm_size != 0) {
        size_t i;
        for (i = 0; i < winfo->comm_size; i++) {
            if (NULL != winfo->endpoints[i]) {
                ucp_ep_destroy(winfo->endpoints[i]);
            }
            assert(winfo->inflight_ops[i] == 0);
        }
        free(winfo->endpoints);
        free(winfo->inflight_ops);
    }
    winfo->endpoints = NULL;
    winfo->comm_size = 0;
    winfo->released = 0;
}

/* Final destruction of a winfo: mutex, UCP worker and the wrapper itself.
 * Caller is expected to have reset the winfo first (no live endpoints). */
static void
_winfo_release(opal_common_ucx_winfo_t *winfo)
{
    OBJ_DESTRUCT(&winfo->mutex);
    ucp_worker_destroy(winfo->worker);
    free(winfo);
}

/* -----------------------------------------------------------------------------
 * Worker Pool management functionality
 *----------------------------------------------------------------------------*/

/* Allocate an (uninitialized) worker pool structure.
 * Returns NULL on allocation failure. */
OPAL_DECLSPEC opal_common_ucx_wpool_t *
opal_common_ucx_wpool_allocate(void)
{
    opal_common_ucx_wpool_t *ptr = calloc(1, sizeof(opal_common_ucx_wpool_t));
    /* FIX: guard against allocation failure before touching the structure */
    if (NULL == ptr) {
        return NULL;
    }
    ptr->refcnt = 0;
    return ptr;
}

/* Free a worker pool structure. Must be fully finalized (refcnt == 0). */
OPAL_DECLSPEC void
opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)
{
    assert(wpool->refcnt == 0);
    free(wpool);
}

/* Reference-counted initialization of the worker pool: only the first call
 * creates the UCP context, the default (recv) worker and the worker lists.
 * Returns OPAL_SUCCESS or OPAL_ERROR. */
OPAL_DECLSPEC int
opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
                           int proc_world_size, bool enable_mt)
{
    ucp_config_t *config = NULL;
    ucp_params_t context_params;
    opal_common_ucx_winfo_t *winfo;
    ucs_status_t status;
    int rc = OPAL_SUCCESS;

    wpool->refcnt++;
    if (1 < wpool->refcnt) {
        /* Already initialized by a previous caller */
        return rc;
    }

    OBJ_CONSTRUCT(&wpool->mutex, opal_recursive_mutex_t);
    OBJ_CONSTRUCT(&wpool->tls_list, opal_list_t);

    status = ucp_config_read("MPI", NULL, &config);
    if (UCS_OK != status) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
        /* NOTE(review): refcnt stays incremented and mutex/tls_list stay
         * constructed on this path -- preserved from the original */
        return OPAL_ERROR;
    }

    /* initialize UCP context */
    memset(&context_params, 0, sizeof(context_params));
    context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
                                UCP_PARAM_FIELD_MT_WORKERS_SHARED |
                                UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
                                UCP_PARAM_FIELD_REQUEST_INIT |
                                UCP_PARAM_FIELD_REQUEST_SIZE;
    context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 |
                              UCP_FEATURE_AMO64;
    context_params.mt_workers_shared = (enable_mt ? 1 : 0);
    context_params.estimated_num_eps = proc_world_size;
    context_params.request_init = opal_common_ucx_req_init;
    context_params.request_size = sizeof(opal_common_ucx_request_t);

    status = ucp_init(&context_params, config, &wpool->ucp_ctx);
    ucp_config_release(config);
    if (UCS_OK != status) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
        rc = OPAL_ERROR;
        goto err_ucp_init;
    }

    /* create recv worker and add to idle pool */
    OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
    OBJ_CONSTRUCT(&wpool->active_workers, opal_list_t);

    winfo = _winfo_create(wpool);
    if (NULL == winfo) {
        MCA_COMMON_UCX_ERROR("Failed to create receive worker");
        rc = OPAL_ERROR;
        goto err_worker_create;
    }
    wpool->dflt_worker = winfo->worker;

    status = ucp_worker_get_address(wpool->dflt_worker,
                                    &wpool->recv_waddr,
                                    &wpool->recv_waddr_len);
    if (status != UCS_OK) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
        rc = OPAL_ERROR;
        goto err_get_addr;
    }

    rc = _wpool_list_put(wpool, &wpool->idle_workers, winfo);
    if (rc) {
        goto err_wpool_add;
    }

    opal_tsd_key_create(&wpool->tls_key, _tlocal_cleanup);

    return rc;

err_wpool_add:
    /* FIX: an address obtained with ucp_worker_get_address() must be returned
     * with ucp_worker_release_address(), not free() */
    ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
err_get_addr:
    /* FIX: release the whole winfo (mutex + worker + memory) instead of
     * destroying the bare worker and leaking the wrapper */
    _winfo_release(winfo);
    wpool->dflt_worker = NULL;
err_worker_create:
    ucp_cleanup(wpool->ucp_ctx);
err_ucp_init:
    return rc;
}

/* Reference-counted teardown: the last caller releases all thread-local
 * tables, all idle and active workers, the recv-worker address and the
 * UCP context. */
OPAL_DECLSPEC
void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
{
    _tlocal_table_t *tls_item = NULL, *tls_next;

    wpool->refcnt--;
    if (wpool->refcnt > 0) {
        return;
    }

    /* After this have been called no thread cleanup callback
     * will be called */
    opal_tsd_key_delete(wpool->tls_key);

    /* Go over remaining TLS structures and release it */
    OPAL_LIST_FOREACH_SAFE(tls_item, tls_next, &wpool->tls_list,
                           _tlocal_table_t) {
        opal_list_remove_item(&wpool->tls_list, &tls_item->super);
        _common_ucx_tls_cleanup(tls_item);
    }
    OBJ_DESTRUCT(&wpool->tls_list);

    /* Release the address here. recv worker will be released
     * below along with other idle workers */
    ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);

    /* Go over the list, free idle list items */
    if (!opal_list_is_empty(&wpool->idle_workers)) {
        _winfo_list_item_t *item, *next;
        OPAL_LIST_FOREACH_SAFE(item, next, &wpool->idle_workers,
                               _winfo_list_item_t) {
            opal_list_remove_item(&wpool->idle_workers, &item->super);
            _winfo_release(item->ptr);
            OBJ_RELEASE(item);
        }
    }
    OBJ_DESTRUCT(&wpool->idle_workers);

    /* Release active workers. They are no longer active actually
     * because opal_common_ucx_wpool_finalize is being called. */
    if (!opal_list_is_empty(&wpool->active_workers)) {
        _winfo_list_item_t *item, *next;
        OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
                               _winfo_list_item_t) {
            opal_list_remove_item(&wpool->active_workers, &item->super);
            _winfo_reset(item->ptr);
            _winfo_release(item->ptr);
            OBJ_RELEASE(item);
        }
    }
    OBJ_DESTRUCT(&wpool->active_workers);

    OBJ_DESTRUCT(&wpool->mutex);
    ucp_cleanup(wpool->ucp_ctx);
    return;
}

/* Progress all active workers once; garbage-collect workers whose owning
 * thread released them (move them back to the idle list). Skips the whole
 * pass if the pool mutex is contended (trylock). */
OPAL_DECLSPEC void
opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
{
    _winfo_list_item_t *item = NULL, *next = NULL;

    /* Go over all active workers and progress them
     * TODO: may want to have some partitioning to progress only part of
     * workers */
    if (!opal_mutex_trylock(&wpool->mutex)) {
        OPAL_LIST_FOREACH_SAFE(item, next, &wpool->active_workers,
                               _winfo_list_item_t) {
            opal_common_ucx_winfo_t *winfo = item->ptr;
            opal_mutex_lock(&winfo->mutex);
            if (OPAL_UNLIKELY(winfo->released)) {
                /* Do garbage collection of worker info's if needed */
                opal_list_remove_item(&wpool->active_workers, &item->super);
                _winfo_reset(winfo);
                opal_list_append(&wpool->idle_workers, &item->super);
            } else {
                /* Progress worker until there are existing events */
                while (ucp_worker_progress(winfo->worker)) {
                    ;
                }
            }
            opal_mutex_unlock(&winfo->mutex);
        }
        opal_mutex_unlock(&wpool->mutex);
    }
}

/* Append a winfo to one of the pool's lists (idle/active), under the
 * pool mutex. */
static int
_wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
                opal_common_ucx_winfo_t *winfo)
{
    _winfo_list_item_t *item;

    item = OBJ_NEW(_winfo_list_item_t);
    if (NULL == item) {
        MCA_COMMON_UCX_ERROR("Cannot allocate memory for winfo list item");
        return OPAL_ERR_OUT_OF_RESOURCE;
    }
    item->ptr = winfo;

    opal_mutex_lock(&wpool->mutex);
    opal_list_append(list, &item->super);
    opal_mutex_unlock(&wpool->mutex);

    return OPAL_SUCCESS;
}

/* Pop the first winfo from a pool list (or NULL if the list is empty). */
static opal_common_ucx_winfo_t *
_wpool_list_get(opal_common_ucx_wpool_t *wpool, opal_list_t *list)
{
    opal_common_ucx_winfo_t *winfo = NULL;
    _winfo_list_item_t *item = NULL;

    opal_mutex_lock(&wpool->mutex);
    if (!opal_list_is_empty(list)) {
        item = (_winfo_list_item_t *) opal_list_get_first(list);
        opal_list_remove_item(list, &item->super);
    }
    opal_mutex_unlock(&wpool->mutex);

    if (item != NULL) {
        winfo = item->ptr;
        OBJ_RELEASE(item);
    }
    return winfo;
}

/* Get an idle winfo (creating one if the idle list is empty) and size its
 * per-peer endpoint/op-count arrays for comm_size peers. NULL on failure. */
static opal_common_ucx_winfo_t *
_wpool_get_idle(opal_common_ucx_wpool_t *wpool, size_t comm_size)
{
    opal_common_ucx_winfo_t *winfo;

    winfo = _wpool_list_get(wpool, &wpool->idle_workers);
    if (!winfo) {
        winfo = _winfo_create(wpool);
        if (!winfo) {
            MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure");
            return NULL;
        }
    }

    winfo->endpoints = calloc(comm_size, sizeof(ucp_ep_h));
    winfo->inflight_ops = calloc(comm_size, sizeof(short));
    /* FIX: check the per-peer allocations before advertising comm_size */
    if (NULL == winfo->endpoints || NULL == winfo->inflight_ops) {
        MCA_COMMON_UCX_ERROR("Failed to allocate worker info structure");
        free(winfo->endpoints);
        free(winfo->inflight_ops);
        winfo->endpoints = NULL;
        winfo->inflight_ops = NULL;
        _winfo_release(winfo);
        return NULL;
    }
    winfo->comm_size = comm_size;
    return winfo;
}

/* Place a winfo on the active list so wpool_progress() will drive it. */
static int
_wpool_add_active(opal_common_ucx_wpool_t *wpool,
                  opal_common_ucx_winfo_t *winfo)
{
    return _wpool_list_put(wpool, &wpool->active_workers, winfo);
}

/* -----------------------------------------------------------------------------
 * Worker Pool Communication context management functionality
 *----------------------------------------------------------------------------*/

/* Create a communication context: exchange the recv-worker address with all
 * peers via exchange_func and initialize the context's worker list.
 * The returned context starts with refcntr == 1 (application reference). */
OPAL_DECLSPEC int
opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
                             opal_common_ucx_exchange_func_t exchange_func,
                             void *exchange_metadata,
                             opal_common_ucx_ctx_t **ctx_ptr)
{
    opal_common_ucx_ctx_t *ctx = calloc(1, sizeof(*ctx));
    int ret = OPAL_SUCCESS;

    /* FIX: guard against allocation failure before touching the structure */
    if (NULL == ctx) {
        (*ctx_ptr) = NULL;
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    OBJ_CONSTRUCT(&ctx->mutex, opal_recursive_mutex_t);
    OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
    ctx->released = 0;
    ctx->refcntr = 1; /* application holding the context */
    ctx->wpool = wpool;
    ctx->comm_size = comm_size;

    ctx->recv_worker_addrs = NULL;
    ctx->recv_worker_displs = NULL;
    ret = exchange_func(wpool->recv_waddr, wpool->recv_waddr_len,
                        &ctx->recv_worker_addrs,
                        &ctx->recv_worker_displs, exchange_metadata);
    if (ret != OPAL_SUCCESS) {
        goto error;
    }

    (*ctx_ptr) = ctx;
    return ret;

error:
    OBJ_DESTRUCT(&ctx->mutex);
    OBJ_DESTRUCT(&ctx->tls_workers);
    free(ctx);
    (*ctx_ptr) = NULL;
    return ret;
}

/* Drop the application's reference to the context. The context structure
 * itself is freed once the last thread-local reference is gone. */
OPAL_DECLSPEC void
opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
{
    int my_refcntr = -1;

    /* Application is expected to guarantee that no operation
     * is performed on the context that is being released */

    /* Mark that this context was released by application
     * Threads will use this flag to perform deferred cleanup */
    ctx->released = 1;

    /* Decrement the reference counter */
    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
    /* Make sure that all the loads/stores are complete */
    opal_atomic_mb();

    /* If there is no more references to this handler
     * we can release it */
    if (0 == my_refcntr) {
        _common_ucx_wpctx_free(ctx);
    }
}

/* Final cleanup of the context structure
 * once all references cleared */
static void
_common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
{
    free(ctx->recv_worker_addrs);
    free(ctx->recv_worker_displs);
    OBJ_DESTRUCT(&ctx->mutex);
    OBJ_DESTRUCT(&ctx->tls_workers);
    free(ctx);
}

/* Subscribe a new TLS to this context: record the winfo on the context's
 * worker list and take a context reference. */
static int
_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
                         opal_common_ucx_winfo_t *winfo)
{
    _ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
    if (NULL == item) {
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    /* Increment the reference counter */
    OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);

    /* Add new worker to the context */
    item->ptr = winfo;
    opal_mutex_lock(&ctx->mutex);
    opal_list_append(&ctx->tls_workers, &item->super);
    opal_mutex_unlock(&ctx->mutex);
    return OPAL_SUCCESS;
}

/* Unsubscribe a particular TLS from this context: mark the winfo released
 * (so wpool_progress() recycles it) and drop the context reference, freeing
 * the context if this was the last one. */
static void
_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
                         opal_common_ucx_winfo_t *winfo)
{
    _ctx_record_list_item_t *item = NULL, *next;
    int my_refcntr = -1;

    opal_mutex_lock(&ctx->mutex);
    OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers,
                           _ctx_record_list_item_t) {
        if (winfo == item->ptr) {
            opal_list_remove_item(&ctx->tls_workers, &item->super);
            opal_mutex_lock(&winfo->mutex);
            winfo->released = 1;
            opal_mutex_unlock(&winfo->mutex);
            OBJ_RELEASE(item);
            break;
        }
    }
    opal_mutex_unlock(&ctx->mutex);

    /* Make sure that all the loads/stores are complete */
    opal_atomic_rmb();

    /* Decrement the reference counter */
    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
    /* a counterpart to the rmb above */
    opal_atomic_wmb();

    if (0 == my_refcntr) {
        /* All references to this data structure were removed
         * We can safely release communication context structure */
        _common_ucx_wpctx_free(ctx);
    }
    return;
}

/* -----------------------------------------------------------------------------
 * Worker Pool Memory management functionality
 *----------------------------------------------------------------------------*/

/* Register (or allocate+register) a memory region with UCX, pack its rkey
 * and exchange it with all peers. On success returns the mem handle, the
 * local packed rkey buffer and its size through the out parameters. */
OPAL_DECLSPEC int
opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx, void **mem_base,
                             size_t mem_size,
                             opal_common_ucx_mem_type_t mem_type,
                             opal_common_ucx_exchange_func_t exchange_func,
                             void *exchange_metadata, char **my_mem_addr,
                             int *my_mem_addr_size,
                             opal_common_ucx_wpmem_t **mem_ptr)
{
    opal_common_ucx_wpmem_t *mem = calloc(1, sizeof(*mem));
    void *rkey_addr = NULL;
    size_t rkey_addr_len;
    ucs_status_t status;
    int ret = OPAL_SUCCESS;

    /* FIX: guard against allocation failure before touching the structure */
    if (NULL == mem) {
        (*mem_ptr) = NULL;
        return OPAL_ERR_OUT_OF_RESOURCE;
    }

    mem->released = 0;
    mem->refcntr = 1; /* application holding this memory handler */
    mem->ctx = ctx;
    mem->mem_addrs = NULL;
    mem->mem_displs = NULL;

    ret = _comm_ucx_wpmem_map(ctx->wpool, mem_base, mem_size, &mem->memh,
                              mem_type);
    if (ret != OPAL_SUCCESS) {
        MCA_COMMON_UCX_VERBOSE(1, "_comm_ucx_mem_map failed: %d", ret);
        goto error_mem_map;
    }

    status = ucp_rkey_pack(ctx->wpool->ucp_ctx, mem->memh,
                           &rkey_addr, &rkey_addr_len);
    if (status != UCS_OK) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
        ret = OPAL_ERROR;
        goto error_rkey_pack;
    }

    ret = exchange_func(rkey_addr, rkey_addr_len,
                        &mem->mem_addrs, &mem->mem_displs, exchange_metadata);
    /* NOTE(review): on this failure path the packed rkey buffer is not
     * returned with ucp_rkey_buffer_release() -- preserved from the
     * original, worth a follow-up */
    if (ret != OPAL_SUCCESS) {
        goto error_rkey_pack;
    }

    /* Dont need the destructor here, will use
     * wpool-level destructor */
    opal_tsd_key_create(&mem->mem_tls_key, NULL);

    (*mem_ptr) = mem;
    (*my_mem_addr) = rkey_addr;
    (*my_mem_addr_size) = rkey_addr_len;

    return ret;

error_rkey_pack:
    ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh);
error_mem_map:
    free(mem);
    (*mem_ptr) = NULL;
    return ret;
}

/* Drop the application's reference to the memory handle; the structure is
 * freed once the last thread-local reference signs out. */
OPAL_DECLSPEC int
opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
{
    int my_refcntr = -1;

    /* Mark that this memory handler has been called */
    mem->released = 1;

    /* Decrement the reference counter */
    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
    /* Make sure that all the loads/stores are complete */
    opal_atomic_wmb();

    if (0 == my_refcntr) {
        _common_ucx_wpmem_free(mem);
    }
    return OPAL_SUCCESS;
}

/* Map (or allocate and map, for OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) a region
 * with ucp_mem_map and return the memh; for the allocate case the actual
 * base address is written back through *base. */
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool, void **base,
                               size_t size, ucp_mem_h *memh_ptr,
                               opal_common_ucx_mem_type_t mem_type)
{
    ucp_mem_map_params_t mem_params;
    ucp_mem_attr_t mem_attrs;
    ucs_status_t status;
    int ret = OPAL_SUCCESS;

    memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
    mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                            UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                            UCP_MEM_MAP_PARAM_FIELD_FLAGS;
    mem_params.length = size;
    if (mem_type == OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
        mem_params.address = NULL;
        mem_params.flags = UCP_MEM_MAP_ALLOCATE;
    } else {
        mem_params.address = (*base);
    }

    status = ucp_mem_map(wpool->ucp_ctx, &mem_params, memh_ptr);
    if (status != UCS_OK) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_map failed: %d", status);
        ret = OPAL_ERROR;
        return ret;
    }

    mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS |
                           UCP_MEM_ATTR_FIELD_LENGTH;
    status = ucp_mem_query((*memh_ptr), &mem_attrs);
    if (status != UCS_OK) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_query failed: %d", status);
        ret = OPAL_ERROR;
        goto error;
    }

    assert(mem_attrs.length >= size);
    if (mem_type != OPAL_COMMON_UCX_MEM_ALLOCATE_MAP) {
        assert(mem_attrs.address == (*base));
    } else {
        (*base) = mem_attrs.address;
    }

    return ret;
error:
    ucp_mem_unmap(wpool->ucp_ctx, (*memh_ptr));
    return ret;
}

/* Final cleanup of the memory handle once all references are gone. */
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem)
{
    opal_tsd_key_delete(mem->mem_tls_key);
    free(mem->mem_addrs);
    free(mem->mem_displs);
    ucp_mem_unmap(mem->ctx->wpool->ucp_ctx, mem->memh);
    free(mem);
}

/* Take a thread-local reference on the memory handle. */
static int
_common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem)
{
    /* Increment the reference counter */
    OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, 1);
    return OPAL_SUCCESS;
}

/* Drop a thread-local reference on the memory handle, freeing it when the
 * last reference is gone. */
static void
_common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem)
{
    int my_refcntr = -1;

    /* Make sure that all the loads are complete at this
     * point so if somebody else will see refcntr ==0
     * and release the structure we would have all we need */
    opal_atomic_rmb();

    /* Decrement the reference counter */
    my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&mem->refcntr, -1);
    /* a counterpart to the rmb above */
    opal_atomic_wmb();

    if (0 == my_refcntr) {
        _common_ucx_wpmem_free(mem);
    }
    return;
}

/* -----------------------------------------------------------------------------
 * Worker Pool TLS management functions management functionality
 *----------------------------------------------------------------------------*/

/* Create this thread's TLS table, register it on the wpool's global list
 * (for cleanup at finalize) and install it in thread-specific storage.
 * Returns NULL on failure; a partially initialized table remains on the
 * wpool list and is reclaimed at wpool finalize. */
static _tlocal_table_t *
_common_ucx_tls_init(opal_common_ucx_wpool_t *wpool)
{
    _tlocal_table_t *tls = OBJ_NEW(_tlocal_table_t);

    if (tls == NULL) {
        // return OPAL_ERR_OUT_OF_RESOURCE
        return NULL;
    }

    tls->ctx_tbl = NULL;
    tls->ctx_tbl_size = 0;
    tls->mem_tbl = NULL;
    tls->mem_tbl_size = 0;

    /* Add this TLS to the global wpool structure for future
     * cleanup purposes */
    tls->wpool = wpool;
    opal_mutex_lock(&wpool->mutex);
    opal_list_append(&wpool->tls_list, &tls->super);
    opal_mutex_unlock(&wpool->mutex);

    if (_tlocal_tls_ctxtbl_extend(tls, 4)) {
        MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool context table");
        return NULL;
    }
    if (_tlocal_tls_memtbl_extend(tls, 4)) {
        MCA_COMMON_UCX_ERROR("Failed to allocate Worker Pool memory table");
        return NULL;
    }

    opal_tsd_setspecific(wpool->tls_key, tls);
    return tls;
}

/* Return the calling thread's TLS table, creating it on first use.
 * Returns NULL if TSD lookup or table creation fails. */
static inline _tlocal_table_t *
_tlocal_get_tls(opal_common_ucx_wpool_t *wpool)
{
    _tlocal_table_t *tls;
    int rc = opal_tsd_getspecific(wpool->tls_key, (void **) &tls);

    if (OPAL_SUCCESS != rc) {
        return NULL;
    }

    if (OPAL_UNLIKELY(NULL == tls)) {
        tls = _common_ucx_tls_init(wpool);
    }
    return tls;
}
static void _tlocal_cleanup(void *arg) { _tlocal_table_t *item = NULL, *next; _tlocal_table_t *tls = (_tlocal_table_t *)arg; opal_common_ucx_wpool_t *wpool = NULL; if (NULL == tls) { return; } wpool = tls->wpool; /* 1. Remove us from tls_list */ tls->wpool = wpool; opal_mutex_lock(&wpool->mutex); OPAL_LIST_FOREACH_SAFE(item, next, &wpool->tls_list, _tlocal_table_t) { if (item == tls) { opal_list_remove_item(&wpool->tls_list, &item->super); break; } } opal_mutex_unlock(&wpool->mutex); _common_ucx_tls_cleanup(tls); } // TODO: don't want to inline this function static void _common_ucx_tls_cleanup(_tlocal_table_t *tls) { size_t i, size; // Cleanup memory table size = tls->mem_tbl_size; for (i = 0; i < size; i++) { if (NULL != tls->mem_tbl[i]->gmem){ _tlocal_mem_record_cleanup(tls->mem_tbl[i]); } free(tls->mem_tbl[i]); } // Cleanup ctx table size = tls->ctx_tbl_size; for (i = 0; i < size; i++) { if (NULL != tls->ctx_tbl[i]->gctx){ assert(tls->ctx_tbl[i]->refcnt == 0); _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]); } free(tls->ctx_tbl[i]); } opal_tsd_setspecific(tls->wpool->tls_key, NULL); OBJ_RELEASE(tls); return; } static int _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append) { size_t i; size_t newsize = (tbl->ctx_tbl_size + append); tbl->ctx_tbl = realloc(tbl->ctx_tbl, newsize * sizeof(*tbl->ctx_tbl)); for (i = tbl->ctx_tbl_size; i < newsize; i++) { tbl->ctx_tbl[i] = calloc(1, sizeof(*tbl->ctx_tbl[i])); if (NULL == tbl->ctx_tbl[i]) { return OPAL_ERR_OUT_OF_RESOURCE; } } tbl->ctx_tbl_size = newsize; return OPAL_SUCCESS; } static int _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append) { size_t i; size_t newsize = (tbl->mem_tbl_size + append); tbl->mem_tbl = realloc(tbl->mem_tbl, newsize * sizeof(*tbl->mem_tbl)); for (i = tbl->mem_tbl_size; i < tbl->mem_tbl_size + append; i++) { tbl->mem_tbl[i] = calloc(1, sizeof(*tbl->mem_tbl[i])); if (NULL == tbl->mem_tbl[i]) { return OPAL_ERR_OUT_OF_RESOURCE; } } tbl->mem_tbl_size = newsize; return OPAL_SUCCESS; } 
static inline _tlocal_ctx_t * _tlocal_ctx_search(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx) { size_t i; for(i=0; ictx_tbl_size; i++) { if (tls->ctx_tbl[i]->gctx == ctx){ return tls->ctx_tbl[i]; } } return NULL; } static int _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec) { if (NULL == ctx_rec->gctx) { return OPAL_SUCCESS; } if (ctx_rec->refcnt > 0) { return OPAL_SUCCESS; } /* Remove myself from the communication context structure * This may result in context release as we are using * delayed cleanup */ _common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo); /* Erase the record so it can be reused */ memset(ctx_rec, 0, sizeof(*ctx_rec)); return OPAL_SUCCESS; } // TODO: Don't want to inline this (slow path) static _tlocal_ctx_t * _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx) { size_t i, free_idx = -1; int rc, found = 0; /* Try to find available record in the TLS table * In parallel perform deferred cleanups */ for (i=0; ictx_tbl_size; i++) { if (NULL != tls->ctx_tbl[i]->gctx && tls->ctx_tbl[i]->refcnt == 0) { if (tls->ctx_tbl[i]->gctx->released ) { /* Found dirty record, need to clean first */ _tlocal_ctx_record_cleanup(tls->ctx_tbl[i]); } } if ((NULL == tls->ctx_tbl[i]->gctx) && !found) { /* Found clean record */ free_idx = i; found = 1; } } /* if needed - extend the table */ if (!found) { free_idx = tls->ctx_tbl_size; rc = _tlocal_tls_ctxtbl_extend(tls, 4); if (rc) { //TODO: error out return NULL; } } tls->ctx_tbl[free_idx]->gctx = ctx; tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size); if (NULL == tls->ctx_tbl[free_idx]->winfo) { MCA_COMMON_UCX_ERROR("Failed to allocate new worker"); return NULL; } /* Make sure that we completed all the data structures before * placing the item to the list * NOTE: essentially we don't need this as list append is an * operation protected by mutex */ opal_atomic_wmb(); /* Add this worker to the active worker list */ _wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo); 
/* add this worker into the context list */ rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo); if (rc) { //TODO: error out return NULL; } /* All good - return the record */ return tls->ctx_tbl[free_idx]; } static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target) { ucp_ep_params_t ep_params; opal_common_ucx_winfo_t *winfo = ctx_rec->winfo; opal_common_ucx_ctx_t *gctx = ctx_rec->gctx; ucs_status_t status; int displ; memset(&ep_params, 0, sizeof(ucp_ep_params_t)); ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS; opal_mutex_lock(&winfo->mutex); displ = gctx->recv_worker_displs[target]; ep_params.address = (ucp_address_t *)&(gctx->recv_worker_addrs[displ]); status = ucp_ep_create(winfo->worker, &ep_params, &winfo->endpoints[target]); if (status != UCS_OK) { opal_mutex_unlock(&winfo->mutex); MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_create failed: %d", status); return OPAL_ERROR; } opal_mutex_unlock(&winfo->mutex); return OPAL_SUCCESS; } /* TLS memory management */ static inline _tlocal_mem_t * _tlocal_search_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *gmem) { size_t i; for(i=0; imem_tbl_size; i++) { if( tls->mem_tbl[i]->gmem == gmem){ return tls->mem_tbl[i]; } } return NULL; } static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec) { size_t i; for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) { if (mem_rec->mem->rkeys[i]) { ucp_rkey_destroy(mem_rec->mem->rkeys[i]); } } free(mem_rec->mem->rkeys); /* Remove myself from the memory context structure * This may result in context release as we are using * delayed cleanup */ _common_ucx_mem_signout(mem_rec->gmem); /* Release fast-path pointers */ if (NULL != mem_rec->mem_tls_ptr) { free(mem_rec->mem_tls_ptr); } assert(mem_rec->ctx_rec != NULL); OPAL_ATOMIC_ADD_FETCH32(&mem_rec->ctx_rec->refcnt, -1); assert(mem_rec->ctx_rec->refcnt >= 0); free(mem_rec->mem); memset(mem_rec, 0, sizeof(*mem_rec)); } static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls, opal_common_ucx_wpmem_t *mem) 
{ size_t i, free_idx = -1; _tlocal_ctx_t *ctx_rec = NULL; int rc = OPAL_SUCCESS, found = 0; /* Try to find available spot in the table */ for (i=0; imem_tbl_size; i++) { if (NULL != tls->mem_tbl[i]->gmem) { if (tls->mem_tbl[i]->gmem->released) { /* Found a dirty record. Need to clean it first */ _tlocal_mem_record_cleanup(tls->mem_tbl[i]); } } if ((NULL == tls->mem_tbl[i]->gmem) && !found) { /* Found a clear record */ free_idx = i; found = 1; } } if (!found){ free_idx = tls->mem_tbl_size; rc = _tlocal_tls_memtbl_extend(tls, 4); if (rc != OPAL_SUCCESS) { //TODO: error out return NULL; } } tls->mem_tbl[free_idx]->gmem = mem; tls->mem_tbl[free_idx]->mem = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem)); ctx_rec = _tlocal_ctx_search(tls, mem->ctx); if (NULL == ctx_rec) { // TODO: act accordingly - cleanup return NULL; } tls->mem_tbl[free_idx]->ctx_rec = ctx_rec; OPAL_ATOMIC_ADD_FETCH32(&ctx_rec->refcnt, 1); tls->mem_tbl[free_idx]->mem->worker = ctx_rec->winfo; tls->mem_tbl[free_idx]->mem->rkeys = calloc(mem->ctx->comm_size, sizeof(*tls->mem_tbl[free_idx]->mem->rkeys)); tls->mem_tbl[free_idx]->mem_tls_ptr = calloc(1, sizeof(*tls->mem_tbl[free_idx]->mem_tls_ptr)); tls->mem_tbl[free_idx]->mem_tls_ptr->winfo = ctx_rec->winfo; tls->mem_tbl[free_idx]->mem_tls_ptr->rkeys = tls->mem_tbl[free_idx]->mem->rkeys; opal_tsd_setspecific(mem->mem_tls_key, tls->mem_tbl[free_idx]->mem_tls_ptr); /* Make sure that we completed all the data structures before * placing the item to the list * NOTE: essentially we don't need this as list append is an * operation protected by mutex */ opal_atomic_wmb(); rc = _common_ucx_wpmem_signup(mem); if (rc) { // TODO: error handling return NULL; } return tls->mem_tbl[free_idx]; } static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target) { _mem_info_t *minfo = mem_rec->mem; opal_common_ucx_wpmem_t *gmem = mem_rec->gmem; int displ = gmem->mem_displs[target]; ucs_status_t status; status = ucp_ep_rkey_unpack(ep, 
&gmem->mem_addrs[displ], &minfo->rkeys[target]); if (status != UCS_OK) { MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", status); return OPAL_ERROR; } return OPAL_SUCCESS; } /* Get the TLS in case of slow path (not everything has been yet initialized */ OPAL_DECLSPEC int opal_common_ucx_tlocal_fetch_spath(opal_common_ucx_wpmem_t *mem, int target) { _tlocal_table_t *tls = NULL; _tlocal_ctx_t *ctx_rec = NULL; opal_common_ucx_winfo_t *winfo = NULL; _tlocal_mem_t *mem_rec = NULL; _mem_info_t *mem_info = NULL; ucp_ep_h ep; int rc = OPAL_SUCCESS; tls = _tlocal_get_tls(mem->ctx->wpool); /* Obtain the worker structure */ ctx_rec = _tlocal_ctx_search(tls, mem->ctx); if (OPAL_UNLIKELY(NULL == ctx_rec)) { ctx_rec = _tlocal_add_ctx(tls, mem->ctx); if (NULL == ctx_rec) { return OPAL_ERR_OUT_OF_RESOURCE; } } winfo = ctx_rec->winfo; /* Obtain the endpoint */ if (OPAL_UNLIKELY(NULL == winfo->endpoints[target])) { rc = _tlocal_ctx_connect(ctx_rec, target); if (rc != OPAL_SUCCESS) { return rc; } } ep = winfo->endpoints[target]; /* Obtain the memory region info */ mem_rec = _tlocal_search_mem(tls, mem); if (OPAL_UNLIKELY(mem_rec == NULL)) { mem_rec = _tlocal_add_mem(tls, mem); if (NULL == mem_rec) { return OPAL_ERR_OUT_OF_RESOURCE; } } mem_info = mem_rec->mem; /* Obtain the rkey */ if (OPAL_UNLIKELY(NULL == mem_info->rkeys[target])) { /* Create the rkey */ rc = _tlocal_mem_create_rkey(mem_rec, ep, target); if (rc) { return rc; } } return OPAL_SUCCESS; } OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target, opal_common_ucx_flush_type_t type, opal_common_ucx_flush_scope_t scope, ucs_status_ptr_t *req_ptr) { ucs_status_ptr_t req; ucs_status_t status = UCS_OK; int rc = OPAL_SUCCESS; #if HAVE_DECL_UCP_EP_FLUSH_NB if (scope == OPAL_COMMON_UCX_SCOPE_EP) { req = ucp_ep_flush_nb(winfo->endpoints[target], 0, opal_common_ucx_empty_complete_cb); } else { req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_empty_complete_cb); } if 
(UCS_PTR_IS_PTR(req)) { ((opal_common_ucx_request_t *)req)->winfo = winfo; } if(OPAL_COMMON_UCX_FLUSH_B) { rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb"); } else { *req_ptr = req; } return rc; #endif switch (type) { case OPAL_COMMON_UCX_FLUSH_NB_PREFERRED: case OPAL_COMMON_UCX_FLUSH_B: if (scope == OPAL_COMMON_UCX_SCOPE_EP) { status = ucp_ep_flush(winfo->endpoints[target]); } else { status = ucp_worker_flush(winfo->worker); } rc = (status == UCS_OK) ? OPAL_SUCCESS : OPAL_ERROR; case OPAL_COMMON_UCX_FLUSH_NB: default: rc = OPAL_ERROR; } return rc; } OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem, opal_common_ucx_flush_scope_t scope, int target) { _ctx_record_list_item_t *item; opal_common_ucx_ctx_t *ctx = mem->ctx; int rc = OPAL_SUCCESS; opal_mutex_lock(&ctx->mutex); OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) { if ((scope == OPAL_COMMON_UCX_SCOPE_EP) && (NULL == item->ptr->endpoints[target])) { continue; } opal_mutex_lock(&item->ptr->mutex); rc = opal_common_ucx_winfo_flush(item->ptr, target, OPAL_COMMON_UCX_FLUSH_B, scope, NULL); switch (scope) { case OPAL_COMMON_UCX_SCOPE_WORKER: item->ptr->global_inflight_ops = 0; memset(item->ptr->inflight_ops, 0, item->ptr->comm_size * sizeof(short)); break; case OPAL_COMMON_UCX_SCOPE_EP: item->ptr->global_inflight_ops -= item->ptr->inflight_ops[target]; item->ptr->inflight_ops[target] = 0; break; } opal_mutex_unlock(&item->ptr->mutex); if (rc != OPAL_SUCCESS) { MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", rc); rc = OPAL_ERROR; } } opal_mutex_unlock(&ctx->mutex); return rc; } OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) { /* TODO */ return OPAL_SUCCESS; } OPAL_DECLSPEC void opal_common_ucx_req_init(void *request) { opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request; req->ext_req = NULL; req->ext_cb = NULL; req->winfo = NULL; } OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, 
ucs_status_t status) { opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request; if (req->ext_cb != NULL) { (*req->ext_cb)(req->ext_req); } ucp_request_release(req); }