Merge pull request #7681 from janjust/master-tls-refactor_v3
ompi/osc/ucx: remove global TLS tables
Этот коммит содержится в:
Коммит
dfb0ae748f
@ -639,7 +639,6 @@ int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
|
||||
int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int insert, contain;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
assert(module->state.dynamic_win_count > 0);
|
||||
|
||||
@ -655,7 +654,7 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
|
||||
module->local_dynamic_win_info[contain].refcnt--;
|
||||
if (module->local_dynamic_win_info[contain].refcnt == 0) {
|
||||
ret = opal_common_ucx_wpmem_free(module->local_dynamic_win_info[contain].mem);
|
||||
opal_common_ucx_wpmem_free(module->local_dynamic_win_info[contain].mem);
|
||||
memmove((void *)&(module->local_dynamic_win_info[contain]),
|
||||
(void *)&(module->local_dynamic_win_info[contain+1]),
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
|
||||
@ -666,7 +665,7 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
module->state.dynamic_win_count--;
|
||||
}
|
||||
|
||||
return ret;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
@ -691,19 +690,14 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
free(module->addrs);
|
||||
free(module->state_addrs);
|
||||
|
||||
ret = opal_common_ucx_wpmem_free(module->state_mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = opal_common_ucx_wpmem_free(module->mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
opal_common_ucx_wpmem_free(module->state_mem);
|
||||
opal_common_ucx_wpmem_free(module->mem);
|
||||
|
||||
opal_common_ucx_wpctx_release(module->ctx);
|
||||
|
||||
if (module->disp_units) free(module->disp_units);
|
||||
if (module->disp_units) {
|
||||
free(module->disp_units);
|
||||
}
|
||||
ompi_comm_free(&module->comm);
|
||||
|
||||
free(module);
|
||||
|
@ -117,7 +117,6 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
|
||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||
|
||||
|
||||
/**
|
||||
* Load an integer value of \c size bytes from \c ptr and cast it to uint64_t.
|
||||
*/
|
||||
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -38,7 +38,7 @@ BEGIN_C_DECLS
|
||||
typedef struct {
|
||||
/* Ref counting & locking*/
|
||||
int refcnt;
|
||||
opal_recursive_mutex_t mutex;
|
||||
opal_mutex_t mutex;
|
||||
|
||||
/* UCX data */
|
||||
ucp_context_h ucp_ctx;
|
||||
@ -46,15 +46,9 @@ typedef struct {
|
||||
ucp_address_t *recv_waddr;
|
||||
size_t recv_waddr_len;
|
||||
|
||||
/* Thread-local key to allow each thread to have
|
||||
* local information assisiated with this wpool */
|
||||
opal_tsd_tracked_key_t tls_key;
|
||||
|
||||
/* Bookkeeping information */
|
||||
opal_list_t idle_workers;
|
||||
opal_list_t active_workers;
|
||||
|
||||
opal_list_t tls_list;
|
||||
} opal_common_ucx_wpool_t;
|
||||
|
||||
/* Worker Pool Context (wpctx) is an object that is comprised of a set of UCP
|
||||
@ -67,16 +61,19 @@ typedef struct {
|
||||
* Context is bound to a particular Worker Pool object.
|
||||
*/
|
||||
typedef struct {
|
||||
opal_recursive_mutex_t mutex;
|
||||
opal_atomic_int32_t refcntr;
|
||||
opal_mutex_t mutex;
|
||||
|
||||
/* the reference to a Worker pool this context belongs to*/
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
/* A list of references to TLS context records
|
||||
* we need to keep track of them to have an ability to
|
||||
* let thread know that this context is no longer valid */
|
||||
opal_list_t tls_workers;
|
||||
volatile int released;
|
||||
|
||||
/* A list of context records
|
||||
* We need to keep a track of allocated context records so
|
||||
* that we can free them at the end if thread fails to release context record */
|
||||
opal_list_t ctx_records;
|
||||
|
||||
/* Thread-local key to allow each thread to have
|
||||
* local information associated with this wpctx */
|
||||
opal_tsd_tracked_key_t tls_key;
|
||||
|
||||
/* UCX addressing information */
|
||||
char *recv_worker_addrs;
|
||||
@ -92,22 +89,25 @@ typedef struct {
|
||||
* be possible to have one context for multiple windows.
|
||||
*/
|
||||
typedef struct {
|
||||
opal_mutex_t mutex;
|
||||
|
||||
/* reference context to which memory region belongs */
|
||||
opal_common_ucx_ctx_t *ctx;
|
||||
|
||||
/* object lifetime control */
|
||||
volatile int released;
|
||||
opal_atomic_int32_t refcntr;
|
||||
|
||||
/* UCX memory handler */
|
||||
ucp_mem_h memh;
|
||||
char *mem_addrs;
|
||||
int *mem_displs;
|
||||
|
||||
/* A list of mem records
|
||||
* We need to kepp trakc o fallocated memory records so that we can free them at the end
|
||||
* if a thread fails to release the memory record */
|
||||
opal_list_t mem_records;
|
||||
|
||||
/* TLS item that allows each thread to
|
||||
* store endpoints and rkey arrays
|
||||
* for faster access */
|
||||
opal_tsd_tracked_key_t mem_tls_key;
|
||||
opal_tsd_tracked_key_t tls_key;
|
||||
} opal_common_ucx_wpmem_t;
|
||||
|
||||
/* The structure that wraps UCP worker and holds the state that is required
|
||||
@ -117,8 +117,8 @@ typedef struct {
|
||||
* One wpmem is intended per shared memory segment (i.e. MPI Window).
|
||||
*/
|
||||
typedef struct opal_common_ucx_winfo {
|
||||
opal_list_item_t super;
|
||||
opal_recursive_mutex_t mutex;
|
||||
volatile int released;
|
||||
ucp_worker_h worker;
|
||||
ucp_ep_h *endpoints;
|
||||
size_t comm_size;
|
||||
@ -126,11 +126,7 @@ typedef struct opal_common_ucx_winfo {
|
||||
short global_inflight_ops;
|
||||
ucs_status_ptr_t inflight_req;
|
||||
} opal_common_ucx_winfo_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
ucp_rkey_h *rkeys;
|
||||
} opal_common_ucx_tlocal_fast_ptrs_t;
|
||||
OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);
|
||||
|
||||
typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
|
||||
|
||||
@ -165,11 +161,26 @@ typedef enum {
|
||||
OPAL_COMMON_UCX_MEM_MAP
|
||||
} opal_common_ucx_mem_type_t;
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
} _ctx_record_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
ucp_rkey_h *rkeys;
|
||||
_ctx_record_t *ctx_rec;
|
||||
} _mem_record_t;
|
||||
OBJ_CLASS_DECLARATION(_mem_record_t);
|
||||
|
||||
typedef int (*opal_common_ucx_exchange_func_t)(void *my_info, size_t my_info_len,
|
||||
char **recv_info, int **disps,
|
||||
void *metadata);
|
||||
|
||||
|
||||
/* Manage Worker Pool (wpool) */
|
||||
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
|
||||
@ -196,34 +207,36 @@ opal_common_ucx_tlocal_fetch(opal_common_ucx_wpmem_t *mem, int target,
|
||||
ucp_ep_h *_ep, ucp_rkey_h *_rkey,
|
||||
opal_common_ucx_winfo_t **_winfo)
|
||||
{
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *fp = NULL;
|
||||
int expr;
|
||||
_mem_record_t *mem_rec = NULL;
|
||||
int is_ready;
|
||||
int rc = OPAL_SUCCESS;
|
||||
|
||||
/* First check the fast-path */
|
||||
rc = opal_tsd_tracked_key_get(&mem->mem_tls_key, (void**)&fp);
|
||||
rc = opal_tsd_tracked_key_get(&mem->tls_key, (void**)&mem_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
expr = fp && (NULL != fp->winfo) && (fp->winfo->endpoints[target]) &&
|
||||
(NULL != fp->rkeys[target]);
|
||||
if (OPAL_UNLIKELY(!expr)) {
|
||||
is_ready = mem_rec && (mem_rec->winfo->endpoints[target]) &&
|
||||
(NULL != mem_rec->rkeys[target]);
|
||||
MCA_COMMON_UCX_ASSERT((NULL == mem_rec) || (NULL != mem_rec->winfo));
|
||||
if (OPAL_UNLIKELY(!is_ready)) {
|
||||
rc = opal_common_ucx_tlocal_fetch_spath(mem, target);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
rc = opal_tsd_tracked_key_get(&mem->mem_tls_key, (void**)&fp);
|
||||
rc = opal_tsd_tracked_key_get(&mem->tls_key, (void**)&mem_rec);
|
||||
if (OPAL_SUCCESS != rc) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
MCA_COMMON_UCX_ASSERT(fp && (NULL != fp->winfo) &&
|
||||
(fp->winfo->endpoints[target])
|
||||
&& (NULL != fp->rkeys[target]));
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->winfo);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->winfo->endpoints[target]);
|
||||
MCA_COMMON_UCX_ASSERT(NULL != mem_rec->rkeys[target]);
|
||||
|
||||
*_rkey = fp->rkeys[target];
|
||||
*_winfo = fp->winfo;
|
||||
*_ep = fp->winfo->endpoints[target];
|
||||
*_rkey = mem_rec->rkeys[target];
|
||||
*_winfo = mem_rec->winfo;
|
||||
*_ep = mem_rec->winfo->endpoints[target];
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
@ -236,7 +249,7 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
|
||||
char **my_mem_addr,
|
||||
int *my_mem_addr_size,
|
||||
opal_common_ucx_wpmem_t **mem_ptr);
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
|
||||
opal_common_ucx_flush_scope_t scope,
|
||||
|
@ -5,108 +5,18 @@
|
||||
#include "common_ucx.h"
|
||||
#include "common_ucx_wpool.h"
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
opal_atomic_int32_t refcnt;
|
||||
} _tlocal_ctx_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_winfo_t *worker;
|
||||
ucp_rkey_h *rkeys;
|
||||
} _mem_info_t;
|
||||
|
||||
typedef struct {
|
||||
opal_common_ucx_wpmem_t *gmem;
|
||||
_mem_info_t *mem;
|
||||
opal_common_ucx_tlocal_fast_ptrs_t *mem_tls_ptr;
|
||||
_tlocal_ctx_t *ctx_rec;
|
||||
} _tlocal_mem_t;
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _winfo_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_winfo_list_item_t);
|
||||
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _ctx_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_list_item_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
_tlocal_mem_t *ptr;
|
||||
} _mem_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_mem_record_list_item_t);
|
||||
|
||||
/* thread-local table */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
_tlocal_ctx_t **ctx_tbl;
|
||||
size_t ctx_tbl_size;
|
||||
_tlocal_mem_t **mem_tbl;
|
||||
size_t mem_tbl_size;
|
||||
} _tlocal_table_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(_tlocal_table_t);
|
||||
|
||||
static int _tlocal_tls_ctxtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static int _tlocal_tls_memtbl_extend(_tlocal_table_t *tbl, size_t append);
|
||||
static _tlocal_table_t* _common_ucx_tls_init(opal_common_ucx_wpool_t *wpool);
|
||||
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls);
|
||||
static inline _tlocal_ctx_t *_tlocal_ctx_search(_tlocal_table_t *tls,
|
||||
opal_common_ucx_ctx_t *ctx);
|
||||
static int _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec);
|
||||
static _tlocal_ctx_t *_tlocal_add_ctx(_tlocal_table_t *tls,
|
||||
opal_common_ucx_ctx_t *ctx);
|
||||
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx, int target);
|
||||
static inline _tlocal_mem_t *_tlocal_search_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *gmem);
|
||||
static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
|
||||
opal_common_ucx_wpmem_t *mem);
|
||||
static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int target);
|
||||
// TOD: Return the error from it
|
||||
static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec);
|
||||
static void _tlocal_cleanup(void *arg);
|
||||
static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target);
|
||||
static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int target);
|
||||
|
||||
/* Sorted declarations */
|
||||
|
||||
|
||||
/* Internal Worker Information (winfo) management */
|
||||
static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool);
|
||||
static void _winfo_release(opal_common_ucx_winfo_t *winfo);
|
||||
static void _winfo_reset(opal_common_ucx_winfo_t *winfo);
|
||||
static void _winfo_destructor(opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool (wpool) management */
|
||||
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static int _wpool_list_put(opal_common_ucx_wpool_t *wpool, opal_list_t *list,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static opal_common_ucx_winfo_t *_wpool_list_get(opal_common_ucx_wpool_t *wpool,
|
||||
opal_list_t *list);
|
||||
static opal_common_ucx_winfo_t *_wpool_get_idle(opal_common_ucx_wpool_t *wpool,
|
||||
size_t comm_size);
|
||||
static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool Context management */
|
||||
static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
|
||||
static int _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static void _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool Memeory management */
|
||||
/* Internal Worker Pool Memory management */
|
||||
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
opal_common_ucx_mem_type_t mem_type);
|
||||
static void _common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
|
||||
static int _common_ucx_wpmem_signup(opal_common_ucx_wpmem_t *mem);
|
||||
static void _common_ucx_mem_signout(opal_common_ucx_wpmem_t *mem);
|
||||
|
||||
|
||||
#endif // COMMON_UCX_WPOOL_INT_H
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user