Merge pull request #6492 from janjust/oshmem-multiple-contexts-master
Oshmem multiple contexts
Этот коммит содержится в:
Коммит
9ab6ecba65
@ -153,6 +153,11 @@ void opal_common_ucx_mca_proc_added(void)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
|
||||||
|
{
|
||||||
|
return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb, (void *)fenced);
|
||||||
|
}
|
||||||
|
|
||||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
|
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
|
||||||
{
|
{
|
||||||
volatile int fenced = 0;
|
volatile int fenced = 0;
|
||||||
@ -182,9 +187,8 @@ static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
|
size_t my_rank, size_t max_disconnect, ucp_worker_h worker) {
|
||||||
{
|
|
||||||
size_t num_reqs;
|
size_t num_reqs;
|
||||||
size_t max_reqs;
|
size_t max_reqs;
|
||||||
void *dreq, **dreqs;
|
void *dreq, **dreqs;
|
||||||
@ -232,10 +236,14 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
|
|||||||
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
|
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
|
||||||
free(dreqs);
|
free(dreqs);
|
||||||
|
|
||||||
if (OPAL_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(worker))) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
return OPAL_SUCCESS;
|
return OPAL_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||||
|
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
|
||||||
|
{
|
||||||
|
opal_common_ucx_del_procs_nofence(procs, count, my_rank, max_disconnect, worker);
|
||||||
|
|
||||||
|
return opal_common_ucx_mca_pmix_fence(worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -105,8 +105,11 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
|
|||||||
OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
|
OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
|
||||||
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
|
||||||
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
|
||||||
|
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced);
|
||||||
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||||
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||||
|
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
|
||||||
|
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
|
||||||
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
|
||||||
|
|
||||||
static inline
|
static inline
|
||||||
|
@ -149,7 +149,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr,
|
|||||||
void* src_addr,
|
void* src_addr,
|
||||||
int dst);
|
int dst);
|
||||||
|
|
||||||
static void mca_spml_ikrit_cache_mkeys(shmem_ctx_t ctx, sshmem_mkey_t *, uint32_t seg, int remote_pe, int tr_id);
|
static void mca_spml_ikrit_cache_mkeys(sshmem_mkey_t *, uint32_t seg, int remote_pe, int tr_id);
|
||||||
|
|
||||||
static mxm_mem_key_t *mca_spml_ikrit_get_mkey_slow(int pe, void *va, int ptl_id, void **rva);
|
static mxm_mem_key_t *mca_spml_ikrit_get_mkey_slow(int pe, void *va, int ptl_id, void **rva);
|
||||||
|
|
||||||
@ -185,7 +185,7 @@ mca_spml_ikrit_t mca_spml_ikrit = {
|
|||||||
mca_spml_ikrit_get_mkey_slow
|
mca_spml_ikrit_get_mkey_slow
|
||||||
};
|
};
|
||||||
|
|
||||||
static void mca_spml_ikrit_cache_mkeys(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t seg, int dst_pe, int tr_id)
|
static void mca_spml_ikrit_cache_mkeys(sshmem_mkey_t *mkey, uint32_t seg, int dst_pe, int tr_id)
|
||||||
{
|
{
|
||||||
mxm_peer_t *peer;
|
mxm_peer_t *peer;
|
||||||
|
|
||||||
@ -504,7 +504,7 @@ sshmem_mkey_t *mca_spml_ikrit_register(void* addr,
|
|||||||
my_rank, i, addr, (unsigned long long)size,
|
my_rank, i, addr, (unsigned long long)size,
|
||||||
mca_spml_base_mkey2str(&mkeys[i]));
|
mca_spml_base_mkey2str(&mkeys[i]));
|
||||||
|
|
||||||
mca_spml_ikrit_cache_mkeys(NULL, &mkeys[i], memheap_find_segnum(addr), my_rank, i);
|
mca_spml_ikrit_cache_mkeys(&mkeys[i], memheap_find_segnum(addr), my_rank, i);
|
||||||
}
|
}
|
||||||
*count = MXM_PTL_LAST;
|
*count = MXM_PTL_LAST;
|
||||||
|
|
||||||
@ -548,7 +548,7 @@ int mca_spml_ikrit_deregister(sshmem_mkey_t *mkeys)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys)
|
int mca_spml_ikrit_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys)
|
||||||
{
|
{
|
||||||
int ptl;
|
int ptl;
|
||||||
|
|
||||||
@ -567,7 +567,7 @@ int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_m
|
|||||||
mkeys[ptl].len = 0;
|
mkeys[ptl].len = 0;
|
||||||
mkeys[ptl].va_base = mca_memheap_seg2base_va(seg);
|
mkeys[ptl].va_base = mca_memheap_seg2base_va(seg);
|
||||||
mkeys[ptl].u.key = MAP_SEGMENT_SHM_INVALID;
|
mkeys[ptl].u.key = MAP_SEGMENT_SHM_INVALID;
|
||||||
mca_spml_ikrit_cache_mkeys(NULL, &mkeys[ptl], seg, pe, ptl);
|
mca_spml_ikrit_cache_mkeys(&mkeys[ptl], seg, pe, ptl);
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +182,7 @@ extern sshmem_mkey_t *mca_spml_ikrit_register(void* addr,
|
|||||||
uint64_t shmid,
|
uint64_t shmid,
|
||||||
int *count);
|
int *count);
|
||||||
extern int mca_spml_ikrit_deregister(sshmem_mkey_t *mkeys);
|
extern int mca_spml_ikrit_deregister(sshmem_mkey_t *mkeys);
|
||||||
extern int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe,
|
extern int mca_spml_ikrit_oob_get_mkeys(int pe,
|
||||||
uint32_t segno,
|
uint32_t segno,
|
||||||
sshmem_mkey_t *mkeys);
|
sshmem_mkey_t *mkeys);
|
||||||
|
|
||||||
|
@ -41,9 +41,6 @@
|
|||||||
#define SPML_UCX_PUT_DEBUG 0
|
#define SPML_UCX_PUT_DEBUG 0
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static
|
|
||||||
spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva);
|
|
||||||
|
|
||||||
mca_spml_ucx_t mca_spml_ucx = {
|
mca_spml_ucx_t mca_spml_ucx = {
|
||||||
.super = {
|
.super = {
|
||||||
/* Init mca_spml_base_module_t */
|
/* Init mca_spml_base_module_t */
|
||||||
@ -77,11 +74,9 @@ mca_spml_ucx_t mca_spml_ucx = {
|
|||||||
.num_disconnect = 1,
|
.num_disconnect = 1,
|
||||||
.heap_reg_nb = 0,
|
.heap_reg_nb = 0,
|
||||||
.enabled = 0,
|
.enabled = 0,
|
||||||
.get_mkey_slow = mca_spml_ucx_get_mkey_slow
|
.get_mkey_slow = NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
|
|
||||||
|
|
||||||
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
|
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
|
||||||
.ucp_worker = NULL,
|
.ucp_worker = NULL,
|
||||||
.ucp_peers = NULL,
|
.ucp_peers = NULL,
|
||||||
@ -217,7 +212,7 @@ static char spml_ucx_transport_ids[1] = { 0 };
|
|||||||
|
|
||||||
int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
|
int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||||
{
|
{
|
||||||
size_t i, n;
|
size_t i, j, n;
|
||||||
int rc = OSHMEM_ERROR;
|
int rc = OSHMEM_ERROR;
|
||||||
int my_rank = oshmem_my_proc_id();
|
int my_rank = oshmem_my_proc_id();
|
||||||
ucs_status_t err;
|
ucs_status_t err;
|
||||||
@ -246,7 +241,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
opal_progress_register(spml_ucx_progress);
|
opal_progress_register(spml_ucx_default_progress);
|
||||||
|
|
||||||
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
|
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
|
||||||
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
|
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
|
||||||
@ -270,6 +265,10 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
|
|||||||
OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
|
OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
|
||||||
OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
|
OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
|
||||||
|
|
||||||
|
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
|
||||||
|
mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
mca_spml_ucx.remote_addrs_tbl[i] = (char *)malloc(wk_rsizes[i]);
|
mca_spml_ucx.remote_addrs_tbl[i] = (char *)malloc(wk_rsizes[i]);
|
||||||
memcpy(mca_spml_ucx.remote_addrs_tbl[i], (char *)(wk_raddrs + wk_roffs[i]),
|
memcpy(mca_spml_ucx.remote_addrs_tbl[i], (char *)(wk_raddrs + wk_roffs[i]),
|
||||||
wk_rsizes[i]);
|
wk_rsizes[i]);
|
||||||
@ -306,22 +305,6 @@ error:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static
|
|
||||||
spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva)
|
|
||||||
{
|
|
||||||
sshmem_mkey_t *r_mkey;
|
|
||||||
|
|
||||||
r_mkey = mca_memheap_base_get_cached_mkey(ctx, pe, va, 0, rva);
|
|
||||||
if (OPAL_UNLIKELY(!r_mkey)) {
|
|
||||||
SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable",
|
|
||||||
pe, va);
|
|
||||||
oshmem_shmem_abort(-1);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
|
|
||||||
}
|
|
||||||
|
|
||||||
void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey)
|
void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey)
|
||||||
{
|
{
|
||||||
spml_ucx_mkey_t *ucx_mkey;
|
spml_ucx_mkey_t *ucx_mkey;
|
||||||
@ -366,7 +349,9 @@ void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t se
|
|||||||
goto error_fatal;
|
goto error_fatal;
|
||||||
}
|
}
|
||||||
|
|
||||||
mkey->spml_context = ucx_mkey;
|
if (ucx_ctx == &mca_spml_ucx_ctx_default) {
|
||||||
|
mkey->spml_context = ucx_mkey;
|
||||||
|
}
|
||||||
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe);
|
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe);
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -492,12 +477,14 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
|
|||||||
{
|
{
|
||||||
spml_ucx_mkey_t *ucx_mkey;
|
spml_ucx_mkey_t *ucx_mkey;
|
||||||
map_segment_t *mem_seg;
|
map_segment_t *mem_seg;
|
||||||
|
int segno;
|
||||||
|
int my_pe = oshmem_my_proc_id();
|
||||||
|
|
||||||
MCA_SPML_CALL(quiet(oshmem_ctx_default));
|
MCA_SPML_CALL(quiet(oshmem_ctx_default));
|
||||||
if (!mkeys)
|
if (!mkeys)
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
|
|
||||||
if (!mkeys[0].spml_context)
|
if (!mkeys[0].spml_context)
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
|
|
||||||
mem_seg = memheap_find_va(mkeys[0].va_base);
|
mem_seg = memheap_find_va(mkeys[0].va_base);
|
||||||
@ -511,25 +498,68 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
|
|||||||
ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
|
ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
|
||||||
}
|
}
|
||||||
ucp_rkey_destroy(ucx_mkey->rkey);
|
ucp_rkey_destroy(ucx_mkey->rkey);
|
||||||
|
ucx_mkey->rkey = NULL;
|
||||||
|
|
||||||
if (0 < mkeys[0].len) {
|
if (0 < mkeys[0].len) {
|
||||||
ucp_rkey_buffer_release(mkeys[0].u.data);
|
ucp_rkey_buffer_release(mkeys[0].u.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
free(mkeys);
|
||||||
|
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
if (array->ctxs_count < array->ctxs_num) {
|
||||||
|
array->ctxs[array->ctxs_count] = ctx;
|
||||||
|
} else {
|
||||||
|
array->ctxs = realloc(array->ctxs, (array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC) * sizeof(mca_spml_ucx_ctx_t *));
|
||||||
|
opal_atomic_wmb ();
|
||||||
|
for (i = array->ctxs_num; i < array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC; i++) {
|
||||||
|
array->ctxs[i] = NULL;
|
||||||
|
}
|
||||||
|
array->ctxs[array->ctxs_num] = ctx;
|
||||||
|
array->ctxs_num += MCA_SPML_UCX_CTXS_ARRAY_INC;
|
||||||
|
}
|
||||||
|
|
||||||
|
opal_atomic_wmb ();
|
||||||
|
array->ctxs_count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < array->ctxs_count; i++) {
|
||||||
|
if (array->ctxs[i] == ctx) {
|
||||||
|
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
|
||||||
|
array->ctxs[array->ctxs_count-1] = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
array->ctxs_count--;
|
||||||
|
opal_atomic_wmb ();
|
||||||
|
}
|
||||||
|
|
||||||
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
||||||
{
|
{
|
||||||
mca_spml_ucx_ctx_list_item_t *ctx_item;
|
mca_spml_ucx_ctx_t *ucx_ctx;
|
||||||
ucp_worker_params_t params;
|
ucp_worker_params_t params;
|
||||||
ucp_ep_params_t ep_params;
|
ucp_ep_params_t ep_params;
|
||||||
size_t i, nprocs = oshmem_num_procs();
|
size_t i, j, nprocs = oshmem_num_procs();
|
||||||
ucs_status_t err;
|
ucs_status_t err;
|
||||||
|
int my_pe = oshmem_my_proc_id();
|
||||||
|
size_t len;
|
||||||
|
spml_ucx_mkey_t *ucx_mkey;
|
||||||
|
sshmem_mkey_t *mkey;
|
||||||
int rc = OSHMEM_ERROR;
|
int rc = OSHMEM_ERROR;
|
||||||
|
|
||||||
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
|
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
|
||||||
ctx_item->ctx.options = options;
|
ucx_ctx->options = options;
|
||||||
|
|
||||||
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
|
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
|
||||||
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
|
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
|
||||||
@ -539,52 +569,66 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = ucp_worker_create(mca_spml_ucx.ucp_context, ¶ms,
|
err = ucp_worker_create(mca_spml_ucx.ucp_context, ¶ms,
|
||||||
&ctx_item->ctx.ucp_worker);
|
&ucx_ctx->ucp_worker);
|
||||||
if (UCS_OK != err) {
|
if (UCS_OK != err) {
|
||||||
OBJ_RELEASE(ctx_item);
|
free(ucx_ctx);
|
||||||
return OSHMEM_ERROR;
|
return OSHMEM_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
|
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
|
||||||
if (NULL == ctx_item->ctx.ucp_peers) {
|
if (NULL == ucx_ctx->ucp_peers) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mca_spml_ucx.active_array.ctxs_count == 0) {
|
||||||
|
opal_progress_register(spml_ucx_ctx_progress);
|
||||||
|
}
|
||||||
|
|
||||||
for (i = 0; i < nprocs; i++) {
|
for (i = 0; i < nprocs; i++) {
|
||||||
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
|
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
|
||||||
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
|
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
|
||||||
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
|
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
|
||||||
&ctx_item->ctx.ucp_peers[i].ucp_conn);
|
&ucx_ctx->ucp_peers[i].ucp_conn);
|
||||||
if (UCS_OK != err) {
|
if (UCS_OK != err) {
|
||||||
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
|
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
|
||||||
ucs_status_string(err));
|
ucs_status_string(err));
|
||||||
goto error2;
|
goto error2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
|
||||||
|
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
|
||||||
|
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
|
||||||
|
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
|
||||||
|
mkey->u.data,
|
||||||
|
&ucx_mkey->rkey);
|
||||||
|
if (UCS_OK != err) {
|
||||||
|
SPML_UCX_ERROR("failed to unpack rkey");
|
||||||
|
goto error2;
|
||||||
|
}
|
||||||
|
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
|
||||||
|
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
|
||||||
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
|
|
||||||
|
|
||||||
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
|
||||||
|
|
||||||
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
|
(*ctx) = (shmem_ctx_t)ucx_ctx;
|
||||||
|
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
|
|
||||||
error2:
|
error2:
|
||||||
for (i = 0; i < nprocs; i++) {
|
for (i = 0; i < nprocs; i++) {
|
||||||
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
|
if (ucx_ctx->ucp_peers[i].ucp_conn) {
|
||||||
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
|
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx_item->ctx.ucp_peers)
|
if (ucx_ctx->ucp_peers)
|
||||||
free(ctx_item->ctx.ucp_peers);
|
free(ucx_ctx->ucp_peers);
|
||||||
|
|
||||||
error:
|
error:
|
||||||
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
|
ucp_worker_destroy(ucx_ctx->ucp_worker);
|
||||||
OBJ_RELEASE(ctx_item);
|
free(ucx_ctx);
|
||||||
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
|
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
|
||||||
SPML_ERROR("ctx create FAILED rc=%d", rc);
|
SPML_ERROR("ctx create FAILED rc=%d", rc);
|
||||||
return rc;
|
return rc;
|
||||||
@ -592,29 +636,16 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
|||||||
|
|
||||||
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
|
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
|
||||||
{
|
{
|
||||||
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
|
|
||||||
size_t i, nprocs = oshmem_num_procs();
|
|
||||||
|
|
||||||
MCA_SPML_CALL(quiet(ctx));
|
MCA_SPML_CALL(quiet(ctx));
|
||||||
|
|
||||||
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
|
||||||
|
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
|
||||||
/* delete context object from list */
|
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
|
||||||
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
|
|
||||||
mca_spml_ucx_ctx_list_item_t) {
|
|
||||||
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
|
|
||||||
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
|
|
||||||
for (i = 0; i < nprocs; i++) {
|
|
||||||
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
|
|
||||||
}
|
|
||||||
free(ctx_item->ctx.ucp_peers);
|
|
||||||
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
|
|
||||||
OBJ_RELEASE(ctx_item);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
|
||||||
|
|
||||||
|
if (!mca_spml_ucx.active_array.ctxs_count) {
|
||||||
|
opal_progress_unregister(spml_ucx_ctx_progress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)
|
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)
|
||||||
|
@ -74,14 +74,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
|
|||||||
|
|
||||||
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
|
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
|
||||||
|
|
||||||
struct mca_spml_ucx_ctx_list_item {
|
|
||||||
opal_list_item_t super;
|
|
||||||
mca_spml_ucx_ctx_t ctx;
|
|
||||||
};
|
|
||||||
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
|
|
||||||
|
|
||||||
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
|
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
|
||||||
|
|
||||||
|
typedef struct mca_spml_ucx_ctx_array {
|
||||||
|
int ctxs_count;
|
||||||
|
int ctxs_num;
|
||||||
|
mca_spml_ucx_ctx_t **ctxs;
|
||||||
|
} mca_spml_ucx_ctx_array_t;
|
||||||
|
|
||||||
struct mca_spml_ucx {
|
struct mca_spml_ucx {
|
||||||
mca_spml_base_module_t super;
|
mca_spml_base_module_t super;
|
||||||
ucp_context_h ucp_context;
|
ucp_context_h ucp_context;
|
||||||
@ -90,7 +90,8 @@ struct mca_spml_ucx {
|
|||||||
bool enabled;
|
bool enabled;
|
||||||
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
|
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
|
||||||
char **remote_addrs_tbl;
|
char **remote_addrs_tbl;
|
||||||
opal_list_t ctx_list;
|
mca_spml_ucx_ctx_array_t active_array;
|
||||||
|
mca_spml_ucx_ctx_array_t idle_array;
|
||||||
int priority; /* component priority */
|
int priority; /* component priority */
|
||||||
shmem_internal_mutex_t internal_mutex;
|
shmem_internal_mutex_t internal_mutex;
|
||||||
};
|
};
|
||||||
@ -150,7 +151,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
|
|||||||
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
|
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
|
||||||
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
|
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
|
||||||
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
|
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
|
||||||
extern int spml_ucx_progress(void);
|
extern int spml_ucx_default_progress(void);
|
||||||
|
extern int spml_ucx_ctx_progress(void);
|
||||||
|
|
||||||
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
|
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
|
||||||
{
|
{
|
||||||
@ -168,37 +170,7 @@ mca_spml_ucx_get_mkey(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_uc
|
|||||||
|
|
||||||
mkey = ucx_ctx->ucp_peers[pe].mkeys;
|
mkey = ucx_ctx->ucp_peers[pe].mkeys;
|
||||||
mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va);
|
mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va);
|
||||||
if (OPAL_UNLIKELY(NULL == mkey)) {
|
assert(mkey != NULL);
|
||||||
if (ucx_ctx != &mca_spml_ucx_ctx_default && pe == oshmem_my_proc_id()) {
|
|
||||||
mkey = mca_spml_ucx_ctx_default.ucp_peers[pe].mkeys;
|
|
||||||
mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va);
|
|
||||||
if (OPAL_UNLIKELY(NULL == mkey)) {
|
|
||||||
assert(module->get_mkey_slow);
|
|
||||||
return module->get_mkey_slow(ctx, pe, va, rva);
|
|
||||||
} else {
|
|
||||||
uint32_t segno = memheap_find_segnum(va);
|
|
||||||
sshmem_mkey_t *new_mkey = (sshmem_mkey_t *)calloc(1, sizeof(*new_mkey));
|
|
||||||
spml_ucx_mkey_t *new_ucx_mkey = &(ucx_ctx->ucp_peers[pe].mkeys[segno].key);
|
|
||||||
size_t len;
|
|
||||||
|
|
||||||
new_mkey->spml_context = new_ucx_mkey;
|
|
||||||
|
|
||||||
ucp_rkey_pack(mca_spml_ucx.ucp_context, mkey->key.mem_h,
|
|
||||||
&(new_mkey->u.data), &len);
|
|
||||||
|
|
||||||
ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn,
|
|
||||||
new_mkey->u.data, &new_ucx_mkey->rkey);
|
|
||||||
new_mkey->len = len;
|
|
||||||
new_mkey->va_base = va;
|
|
||||||
|
|
||||||
*rva = map_segment_va2rva(&mkey->super, va);
|
|
||||||
return new_ucx_mkey;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert(module->get_mkey_slow);
|
|
||||||
return module->get_mkey_slow(ctx, pe, va, rva);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*rva = map_segment_va2rva(&mkey->super, va);
|
*rva = map_segment_va2rva(&mkey->super, va);
|
||||||
return &mkey->key;
|
return &mkey->key;
|
||||||
}
|
}
|
||||||
@ -221,6 +193,9 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
|
||||||
|
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64
|
||||||
|
|
||||||
END_C_DECLS
|
END_C_DECLS
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -109,7 +109,16 @@ static int mca_spml_ucx_component_register(void)
|
|||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int spml_ucx_progress(void)
|
int spml_ucx_ctx_progress(void)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
|
||||||
|
ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int spml_ucx_default_progress(void)
|
||||||
{
|
{
|
||||||
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
|
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
|
||||||
return 1;
|
return 1;
|
||||||
@ -168,7 +177,13 @@ static int spml_ucx_init(void)
|
|||||||
oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
|
oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t);
|
mca_spml_ucx.active_array.ctxs_count = mca_spml_ucx.idle_array.ctxs_count = 0;
|
||||||
|
mca_spml_ucx.active_array.ctxs_num = mca_spml_ucx.idle_array.ctxs_num = MCA_SPML_UCX_CTXS_ARRAY_SIZE;
|
||||||
|
mca_spml_ucx.active_array.ctxs = calloc(mca_spml_ucx.active_array.ctxs_num,
|
||||||
|
sizeof(mca_spml_ucx_ctx_t *));
|
||||||
|
mca_spml_ucx.idle_array.ctxs = calloc(mca_spml_ucx.idle_array.ctxs_num,
|
||||||
|
sizeof(mca_spml_ucx_ctx_t *));
|
||||||
|
|
||||||
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
|
||||||
|
|
||||||
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
|
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
|
||||||
@ -217,19 +232,91 @@ mca_spml_ucx_component_init(int* priority,
|
|||||||
return &mca_spml_ucx.super;
|
return &mca_spml_ucx.super;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
|
||||||
|
{
|
||||||
|
int i, j, nprocs = oshmem_num_procs();
|
||||||
|
opal_common_ucx_del_proc_t *del_procs;
|
||||||
|
|
||||||
|
del_procs = malloc(sizeof(*del_procs) * nprocs);
|
||||||
|
|
||||||
|
for (i = 0; i < nprocs; ++i) {
|
||||||
|
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
|
||||||
|
if (ctx->ucp_peers[i].mkeys[j].key.rkey != NULL) {
|
||||||
|
ucp_rkey_destroy(ctx->ucp_peers[i].mkeys[j].key.rkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
del_procs[i].ep = ctx->ucp_peers[i].ucp_conn;
|
||||||
|
del_procs[i].vpid = i;
|
||||||
|
ctx->ucp_peers[i].ucp_conn = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
|
||||||
|
mca_spml_ucx.num_disconnect,
|
||||||
|
ctx->ucp_worker);
|
||||||
|
free(del_procs);
|
||||||
|
free(ctx->ucp_peers);
|
||||||
|
}
|
||||||
|
|
||||||
static int mca_spml_ucx_component_fini(void)
|
static int mca_spml_ucx_component_fini(void)
|
||||||
{
|
{
|
||||||
opal_progress_unregister(spml_ucx_progress);
|
int fenced = 0, i;
|
||||||
|
int ret = OSHMEM_SUCCESS;
|
||||||
if (mca_spml_ucx_ctx_default.ucp_worker) {
|
|
||||||
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
|
opal_progress_unregister(spml_ucx_default_progress);
|
||||||
|
if (mca_spml_ucx.active_array.ctxs_count) {
|
||||||
|
opal_progress_unregister(spml_ucx_ctx_progress);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!mca_spml_ucx.enabled)
|
if(!mca_spml_ucx.enabled)
|
||||||
return OSHMEM_SUCCESS; /* never selected.. return success.. */
|
return OSHMEM_SUCCESS; /* never selected.. return success.. */
|
||||||
|
|
||||||
|
/* delete context objects from list */
|
||||||
|
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
|
||||||
|
_ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
|
||||||
|
_ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
|
||||||
|
if (OPAL_SUCCESS != ret) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!fenced) {
|
||||||
|
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
|
||||||
|
ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
|
||||||
|
ucp_worker_progress(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* delete all workers */
|
||||||
|
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
|
||||||
|
ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
|
||||||
|
free(mca_spml_ucx.active_array.ctxs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
|
||||||
|
ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
|
||||||
|
free(mca_spml_ucx.idle_array.ctxs[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mca_spml_ucx_ctx_default.ucp_worker) {
|
||||||
|
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
|
||||||
|
}
|
||||||
|
|
||||||
mca_spml_ucx.enabled = false; /* not anymore */
|
mca_spml_ucx.enabled = false; /* not anymore */
|
||||||
|
|
||||||
OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list));
|
free(mca_spml_ucx.active_array.ctxs);
|
||||||
|
free(mca_spml_ucx.idle_array.ctxs);
|
||||||
|
|
||||||
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
|
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
|
||||||
|
|
||||||
if (mca_spml_ucx.ucp_context) {
|
if (mca_spml_ucx.ucp_context) {
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user