1
1

opal/common/ucx: Complete initialization of the Worker Pool

Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
Xin Zhao 2018-11-26 11:52:30 -08:00 коммит произвёл Artem Polyakov
родитель e28fadb048
Коммит bfbf818fe1
2 изменённых файлов: 169 добавлений и 4 удалений

Просмотреть файл

@ -129,7 +129,89 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
ucp_request_init_callback_t req_init_ptr,
size_t req_size, bool enable_mt)
{
    /*
     * Initialize the worker pool on first use.
     *
     * The pool is reference counted: only the call that takes refcnt from
     * 0 to 1 performs the initialization below; any later call just bumps
     * the counter and returns OPAL_SUCCESS.
     *
     * Initialization reads the UCX configuration, creates the UCP context
     * (RMA + 32/64-bit atomics), creates the receive worker, queries its
     * address, parks the worker in the idle list and creates the
     * thread-local storage key whose destructor is _tlocal_cleanup().
     *
     * Returns OPAL_SUCCESS on success. On failure returns OPAL_ERROR (or
     * the code reported by _wpool_list_put()), undoes everything that was
     * set up here and restores refcnt, so that a later call retries the
     * initialization instead of reporting success on a half-built pool.
     */
    ucp_config_t *config = NULL;
    ucp_params_t context_params;
    opal_common_ucx_winfo_t *winfo;
    ucs_status_t status;
    int rc = OPAL_SUCCESS;

    wpool->refcnt++;
    if (1 < wpool->refcnt) {
        /* Already initialized by an earlier caller. */
        return rc;
    }

    wpool->cur_ctxid = wpool->cur_memid = 0;
    OBJ_CONSTRUCT(&wpool->mutex, opal_mutex_t);
    OBJ_CONSTRUCT(&wpool->tls_list, opal_list_t);

    status = ucp_config_read("MPI", NULL, &config);
    if (UCS_OK != status) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_config_read failed: %d", status);
        rc = OPAL_ERROR;
        goto err_ucp_init;
    }

    /* initialize UCP context */
    memset(&context_params, 0, sizeof(context_params));
    context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
                                UCP_PARAM_FIELD_MT_WORKERS_SHARED |
                                UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
                                UCP_PARAM_FIELD_REQUEST_INIT |
                                UCP_PARAM_FIELD_REQUEST_SIZE;
    context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 |
                              UCP_FEATURE_AMO64;
    context_params.mt_workers_shared = (enable_mt ? 1 : 0);
    context_params.estimated_num_eps = proc_world_size;
    context_params.request_init = req_init_ptr;
    context_params.request_size = req_size;

    status = ucp_init(&context_params, config, &wpool->ucp_ctx);
    /* The configuration is only needed by ucp_init(); release it whether
     * or not the context was created. */
    ucp_config_release(config);
    if (UCS_OK != status) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_init failed: %d", status);
        rc = OPAL_ERROR;
        goto err_ucp_init;
    }

    /* create recv worker and add to idle pool */
    OBJ_CONSTRUCT(&wpool->idle_workers, opal_list_t);
    OBJ_CONSTRUCT(&wpool->active_workers, opal_list_t);

    winfo = _winfo_create(wpool);
    if (NULL == winfo) {
        MCA_COMMON_UCX_ERROR("Failed to create receive worker");
        rc = OPAL_ERROR;
        goto err_worker_create;
    }
    wpool->recv_worker = winfo->worker;

    status = ucp_worker_get_address(wpool->recv_worker,
                                    &wpool->recv_waddr, &wpool->recv_waddr_len);
    if (status != UCS_OK) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
        rc = OPAL_ERROR;
        goto err_get_addr;
    }

    rc = _wpool_list_put(wpool, &wpool->idle_workers, winfo);
    if (rc) {
        goto err_wpool_add;
    }

    /* pthread_key_create() reports failure through its return value; a
     * pool without a valid TLS key must not be reported as initialized. */
    if (0 != pthread_key_create(&wpool->tls_key, _tlocal_cleanup)) {
        MCA_COMMON_UCX_ERROR("Failed to create the thread-local storage key");
        rc = OPAL_ERROR;
        goto err_wpool_add;
    }

    DBG_OUT("opal_common_ucx_wpool_init: wpool = %p\n", (void *)wpool);
    return rc;

err_wpool_add:
    /* The address was handed out by ucp_worker_get_address(), so it has to
     * go back through the matching UCX release call rather than free(). */
    ucp_worker_release_address(wpool->recv_worker, wpool->recv_waddr);
    wpool->recv_waddr = NULL;
    wpool->recv_waddr_len = 0;
err_get_addr:
    /* NOTE(review): only the UCP worker is destroyed here; the winfo
     * wrapper returned by _winfo_create() (and any list bookkeeping done
     * by _wpool_list_put()) is not released on this path - confirm against
     * those helpers and release it with their counterpart if one exists. */
    if (NULL != wpool->recv_worker) {
        ucp_worker_destroy(wpool->recv_worker);
        wpool->recv_worker = NULL;
    }
err_worker_create:
    OBJ_DESTRUCT(&wpool->active_workers);
    OBJ_DESTRUCT(&wpool->idle_workers);
    ucp_cleanup(wpool->ucp_ctx);
err_ucp_init:
    OBJ_DESTRUCT(&wpool->tls_list);
    OBJ_DESTRUCT(&wpool->mutex);
    /* Give back the reference taken on entry so the next call attempts a
     * fresh initialization. */
    wpool->refcnt--;
    return rc;
}
@ -466,6 +548,9 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
opal_common_ucx_wpmem_t **mem_ptr)
{
/* Zero-initialized descriptor for the memory region being created. */
opal_common_ucx_wpmem_t *mem = calloc(1, sizeof(*mem));
/* Buffer and length of the packed remote key; filled in by ucp_rkey_pack()
 * further down in this function. */
void *rkey_addr = NULL;
size_t rkey_addr_len;
ucs_status_t status;
int ret = OPAL_SUCCESS;
/* NOTE(review): the calloc() result is dereferenced on the next line
 * without a NULL check, so an allocation failure would crash here. */
/* Take the next memory id from the pool-wide counter (atomic add, then
 * fetch of the new value). */
mem->mem_id = OPAL_ATOMIC_ADD_FETCH32(&ctx->wpool->cur_memid, 1);
@ -481,6 +566,34 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
/* Map the region into the pool's UCP context (allocating it first when
 * mem_type requests that); the resulting handle is stored in mem->memh. */
ret = _comm_ucx_wpmem_map(ctx->wpool, mem_base, mem_size, &mem->memh,
mem_type);
if (ret != OPAL_SUCCESS) {
MCA_COMMON_UCX_VERBOSE(1, "_comm_ucx_mem_map failed: %d", ret);
goto error_mem_map;
}
DBG_OUT("opal_common_ucx_mem_create: base = %p, memh = %p\n",
(void *)(*mem_base), (void *)(mem->memh));
/* Pack the remote key of the mapped region into a buffer (rkey_addr)
 * of rkey_addr_len bytes. */
status = ucp_rkey_pack(ctx->wpool->ucp_ctx, mem->memh,
&rkey_addr, &rkey_addr_len);
if (status != UCS_OK) {
MCA_COMMON_UCX_VERBOSE(1, "ucp_rkey_pack failed: %d", status);
ret = OPAL_ERROR;
goto error_rkey_pack;
}
DBG_OUT("opal_common_ucx_mem_create: rkey_addr = %p, rkey_addr_len = %d\n",
(void *)rkey_addr, (int)rkey_addr_len);
/* Hand the packed rkey to the caller-supplied exchange callback together
 * with the addresses of mem->mem_addrs and mem->mem_displs (presumably
 * output arrays it fills in - confirm against the callback contract). */
ret = exchange_func(rkey_addr, rkey_addr_len,
&mem->mem_addrs, &mem->mem_displs, exchange_metadata);
DBG_OUT("opal_common_ucx_mem_create: rkey_addr = %p, rkey_addr_len = %d "
"mem_addrs = %p mem_displs = %p\n",
(void*)rkey_addr, (int)rkey_addr_len, (void *)mem->mem_addrs,
(void*)mem->mem_displs);
/* The packed buffer is released before the exchange result is examined,
 * so it is freed on both the success and the failure path. */
ucp_rkey_buffer_release(rkey_addr);
if (ret != OPAL_SUCCESS) {
goto error_rkey_pack;
}
/* Don't need the destructor here, will use
 * the wpool-level destructor */
@ -490,6 +603,15 @@ int opal_common_ucx_wpmem_create(opal_common_ucx_ctx_t *ctx,
DBG_OUT("opal_common_ucx_mem_create(end): mem = %p\n", (void *)mem);
return ret;
/* Error unwinding: control enters at the label matching the failed step
 * and falls through the remaining cleanup. */
error_rkey_pack:
/* The region was mapped before this point, so unmap it. */
ucp_mem_unmap(ctx->wpool->ucp_ctx, mem->memh);
error_mem_map:
/* Tear down the descriptor's mutex and registrations objects, free the
 * descriptor and report "no region" to the caller through mem_ptr. */
OBJ_DESTRUCT(&mem->mutex);
OBJ_DESTRUCT(&mem->registrations);
free(mem);
(*mem_ptr) = NULL;
return ret;
}
OPAL_DECLSPEC int
@ -521,7 +643,52 @@ static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
void **base, size_t size, ucp_mem_h *memh_ptr,
opal_common_ucx_mem_type_t mem_type)
{
    /*
     * Map a memory region into the pool's UCP context.
     *
     * With OPAL_COMMON_UCX_MEM_ALLOCATE_MAP, UCX is asked to allocate the
     * region itself and the address it reports is written back to *base.
     * With any other mem_type the caller's buffer at *base is mapped as is.
     *
     * On success the handle is stored in *memh_ptr and OPAL_SUCCESS is
     * returned. If ucp_mem_map() fails, OPAL_ERROR is returned; if the
     * follow-up ucp_mem_query() fails, the fresh mapping is unmapped again
     * before OPAL_ERROR is returned.
     */
    const int allocate = (OPAL_COMMON_UCX_MEM_ALLOCATE_MAP == mem_type);
    ucp_mem_map_params_t map_params;
    ucp_mem_attr_t mem_attrs;
    ucs_status_t ucs_rc;

    /* Describe the mapping request; unset fields stay zero. */
    memset(&map_params, 0, sizeof(map_params));
    map_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                            UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                            UCP_MEM_MAP_PARAM_FIELD_FLAGS;
    map_params.length = size;
    map_params.address = allocate ? NULL : (*base);
    if (allocate) {
        map_params.flags = UCP_MEM_MAP_ALLOCATE;
    }

    ucs_rc = ucp_mem_map(wpool->ucp_ctx, &map_params, memh_ptr);
    if (UCS_OK != ucs_rc) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_map failed: %d", ucs_rc);
        return OPAL_ERROR;
    }
    DBG_OUT("_comm_ucx_mem_map(after ucp_mem_map): memh = %p\n", (void *)(*memh_ptr));

    /* Ask UCX where the mapping actually lives and how long it is. */
    mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH;
    ucs_rc = ucp_mem_query((*memh_ptr), &mem_attrs);
    if (UCS_OK != ucs_rc) {
        MCA_COMMON_UCX_VERBOSE(1, "ucp_mem_query failed: %d", ucs_rc);
        /* Do not leave a mapping behind that the caller cannot use. */
        ucp_mem_unmap(wpool->ucp_ctx, (*memh_ptr));
        return OPAL_ERROR;
    }
    DBG_OUT("_comm_ucx_mem_map(after ucp_mem_query): memh = %p\n", (void *)(*memh_ptr));

    assert(mem_attrs.length >= size);
    if (allocate) {
        /* UCX picked the address; report it to the caller. */
        (*base) = mem_attrs.address;
    } else {
        assert(mem_attrs.address == (*base));
    }

    DBG_OUT("_comm_ucx_mem_map(end): wpool = %p, addr = %p size = %d memh = %p\n",
            (void *)wpool, (void *)(*base), (int)size, (void *)(*memh_ptr));
    return OPAL_SUCCESS;
}
@ -632,7 +799,6 @@ _tlocal_get_tls(opal_common_ucx_wpool_t *wpool){
return tls;
}
/*
static void _tlocal_cleanup(void *arg)
{
_tlocal_table_t *item = NULL, *next;
@ -646,7 +812,7 @@ static void _tlocal_cleanup(void *arg)
}
wpool = tls->wpool;
*//* 1. Remove us from tls_list *//*
/* 1. Remove us from tls_list */
tls->wpool = wpool;
opal_mutex_lock(&wpool->mutex);
OPAL_LIST_FOREACH_SAFE(item, next, &wpool->tls_list, _tlocal_table_t) {
@ -658,7 +824,6 @@ static void _tlocal_cleanup(void *arg)
opal_mutex_unlock(&wpool->mutex);
_common_ucx_tls_cleanup(tls);
}
*/
// TODO: don't want to inline this function
static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)

Просмотреть файл

@ -74,7 +74,7 @@ static int _tlocal_mem_create_rkey(_tlocal_mem_t *mem_rec, ucp_ep_h ep, int targ
static void _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec);
//static void _tlocal_cleanup(void *arg);
static void _tlocal_cleanup(void *arg);
/* Sorted declarations */