From b00209e1f518ad1a817d81c39675121460a9445a Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Sat, 9 Mar 2019 00:30:07 +0200 Subject: [PATCH 1/8] Revert "OMPI/OSHMEM: bug-fix: store mkeys for each oshmem ctx." This reverts commit f1b095c784de6d1908fa40dcf76e733110cbeaf2. Signed-off-by: Tomislav Janjusic --- oshmem/mca/atomic/ucx/atomic_ucx_cswap.c | 2 +- oshmem/mca/atomic/ucx/atomic_ucx_module.c | 4 +- oshmem/mca/memheap/base/base.h | 8 ++-- oshmem/mca/memheap/base/memheap_base_mkey.c | 21 +++++----- oshmem/mca/spml/base/base.h | 5 +-- oshmem/mca/spml/base/spml_base.c | 4 +- oshmem/mca/spml/ikrit/spml_ikrit.c | 10 ++--- oshmem/mca/spml/ikrit/spml_ikrit.h | 2 +- oshmem/mca/spml/spml.h | 4 +- oshmem/mca/spml/ucx/spml_ucx.c | 33 +++++++++------ oshmem/mca/spml/ucx/spml_ucx.h | 45 +++------------------ oshmem/shmem/c/shmem_addr_accessible.c | 3 +- oshmem/shmem/c/shmem_ptr.c | 3 +- 13 files changed, 54 insertions(+), 90 deletions(-) diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c index 51b0762947..25fe992688 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c @@ -40,7 +40,7 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx, assert(NULL != prev); *prev = value; - ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn, UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size, rva, ucx_mkey->rkey, diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_module.c b/oshmem/mca/atomic/ucx/atomic_ucx_module.c index 91d4551e45..4d269065cb 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_module.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_module.c @@ -47,7 +47,7 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx, assert((8 == size) || (4 == size)); - ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn, op, value, size, rva, ucx_mkey->rkey); @@ -70,7 +70,7 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx, assert((8 == size) || (4 == size)); - ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn, op, value, prev, size, rva, ucx_mkey->rkey, diff --git a/oshmem/mca/memheap/base/base.h b/oshmem/mca/memheap/base/base.h index 7178685f0a..6b4a79fb9e 100644 --- a/oshmem/mca/memheap/base/base.h +++ b/oshmem/mca/memheap/base/base.h @@ -69,8 +69,7 @@ void memheap_oob_destruct(void); OSHMEM_DECLSPEC int mca_memheap_base_is_symmetric_addr(const void* va); OSHMEM_DECLSPEC sshmem_mkey_t *mca_memheap_base_get_mkey(void* va, int tr_id); -OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx, - map_segment_t *s, +OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, int pe, void* va, int btl_id, @@ -244,8 +243,7 @@ static inline map_segment_t *memheap_find_va(void* va) return s; } -static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(shmem_ctx_t ctx, - int pe, +static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe, void* va, int btl_id, void** rva) @@ -275,7 +273,7 @@ static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(shmem_ctx_t ctx, return mkey; } - return mca_memheap_base_get_cached_mkey_slow(ctx, s, pe, va, btl_id, rva); + return mca_memheap_base_get_cached_mkey_slow(s, pe, va, btl_id, rva); } static inline int mca_memheap_base_num_transports(void) diff --git a/oshmem/mca/memheap/base/memheap_base_mkey.c b/oshmem/mca/memheap/base/memheap_base_mkey.c index fea00694ba..8d92293210 100644 --- a/oshmem/mca/memheap/base/memheap_base_mkey.c +++ b/oshmem/mca/memheap/base/memheap_base_mkey.c @@ -55,7 +55,6 @@ struct oob_comm { oob_comm_request_t req_pool[MEMHEAP_RECV_REQS_MAX]; opal_list_t req_list; int is_inited; - shmem_ctx_t ctx; }; mca_memheap_map_t* memheap_map = NULL; @@ -67,7 +66,7 @@ static int send_buffer(int pe, opal_buffer_t *msg); static int oshmem_mkey_recv_cb(void); /* pickup list of rkeys and remote va */ -static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, +static int memheap_oob_get_mkeys(int pe, uint32_t va_seg_num, sshmem_mkey_t *mkey); @@ -143,7 +142,7 @@ static void memheap_attach_segment(sshmem_mkey_t *mkey, int tr_id) } -static void unpack_remote_mkeys(shmem_ctx_t ctx, opal_buffer_t *msg, int remote_pe) +static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe) { int32_t cnt; int32_t n; @@ -183,7 +182,7 @@ static void unpack_remote_mkeys(shmem_ctx_t ctx, opal_buffer_t *msg, int remote_ } else { memheap_oob.mkeys[tr_id].u.key = MAP_SEGMENT_SHM_INVALID; } - MCA_SPML_CALL(rmkey_unpack(ctx, &memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id)); + MCA_SPML_CALL(rmkey_unpack(&memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id)); } MEMHEAP_VERBOSE(5, @@ -243,7 +242,7 @@ static void do_recv(int source_pe, opal_buffer_t* buffer) case MEMHEAP_RKEY_RESP: MEMHEAP_VERBOSE(5, "*** RKEY RESP"); OPAL_THREAD_LOCK(&memheap_oob.lck); - unpack_remote_mkeys(memheap_oob.ctx, buffer, source_pe); + unpack_remote_mkeys(buffer, source_pe); memheap_oob.mkeys_rcvd = MEMHEAP_RKEY_RESP; opal_condition_broadcast(&memheap_oob.cond); OPAL_THREAD_UNLOCK(&memheap_oob.lck); @@ -456,14 +455,14 @@ static int send_buffer(int pe, opal_buffer_t *msg) return rc; } -static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys) +static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) { opal_buffer_t *msg; uint8_t cmd; int i; int rc; - if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(ctx, pe, seg, mkeys))) { + if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(pe, seg, mkeys))) { for (i = 0; i < memheap_map->num_transports; i++) { MEMHEAP_VERBOSE(5, "MKEY CALCULATED BY LOCAL SPML: pe: %d tr_id: %d %s", @@ -479,7 +478,6 @@ static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_m memheap_oob.mkeys = mkeys; memheap_oob.segno = seg; memheap_oob.mkeys_rcvd = 0; - memheap_oob.ctx = ctx; msg = OBJ_NEW(opal_buffer_t); if (!msg) { @@ -647,7 +645,7 @@ void mca_memheap_modex_recv_all(void) } memheap_oob.mkeys = s->mkeys_cache[i]; memheap_oob.segno = j; - unpack_remote_mkeys(oshmem_ctx_default, msg, i); + unpack_remote_mkeys(msg, i); } } @@ -676,8 +674,7 @@ exit_fatal: } } -sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx, - map_segment_t *s, +sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, int pe, void* va, int btl_id, @@ -695,7 +692,7 @@ sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx, if (!s->mkeys_cache[pe]) return NULL ; - rc = memheap_oob_get_mkeys(ctx, pe, + rc = memheap_oob_get_mkeys(pe, s - memheap_map->mem_segs, s->mkeys_cache[pe]); if (OSHMEM_SUCCESS != rc) diff --git a/oshmem/mca/spml/base/base.h b/oshmem/mca/spml/base/base.h index e3ec1b6855..4aeff7d760 100644 --- a/oshmem/mca/spml/base/base.h +++ b/oshmem/mca/spml/base/base.h @@ -72,12 +72,11 @@ OSHMEM_DECLSPEC int mca_spml_base_test(void* addr, void* value, int datatype, int *out_value); -OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx, - int pe, +OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys); -OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id); +OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id); OSHMEM_DECLSPEC void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey); OSHMEM_DECLSPEC void *mca_spml_base_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe); diff --git a/oshmem/mca/spml/base/spml_base.c b/oshmem/mca/spml/base/spml_base.c index 48a7c1c712..4e3331ae8c 100644 --- a/oshmem/mca/spml/base/spml_base.c +++ b/oshmem/mca/spml/base/spml_base.c @@ -247,12 +247,12 @@ int mca_spml_base_wait_nb(void* handle) return OSHMEM_SUCCESS; } -int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t segno, sshmem_mkey_t *mkeys) +int mca_spml_base_oob_get_mkeys(int pe, uint32_t segno, sshmem_mkey_t *mkeys) { return OSHMEM_ERROR; } -void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) +void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) { } diff --git a/oshmem/mca/spml/ikrit/spml_ikrit.c b/oshmem/mca/spml/ikrit/spml_ikrit.c index 88dca5fd4c..37b1cb41a9 100644 --- a/oshmem/mca/spml/ikrit/spml_ikrit.c +++ b/oshmem/mca/spml/ikrit/spml_ikrit.c @@ -149,7 +149,7 @@ int mca_spml_ikrit_put_simple(void* dst_addr, void* src_addr, int dst); -static void mca_spml_ikrit_cache_mkeys(shmem_ctx_t ctx, sshmem_mkey_t *, uint32_t seg, int remote_pe, int tr_id); +static void mca_spml_ikrit_cache_mkeys(sshmem_mkey_t *, uint32_t seg, int remote_pe, int tr_id); static mxm_mem_key_t *mca_spml_ikrit_get_mkey_slow(int pe, void *va, int ptl_id, void **rva); @@ -185,7 +185,7 @@ mca_spml_ikrit_t mca_spml_ikrit = { mca_spml_ikrit_get_mkey_slow }; -static void mca_spml_ikrit_cache_mkeys(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t seg, int dst_pe, int tr_id) +static void mca_spml_ikrit_cache_mkeys(sshmem_mkey_t *mkey, uint32_t seg, int dst_pe, int tr_id) { mxm_peer_t *peer; @@ -504,7 +504,7 @@ sshmem_mkey_t *mca_spml_ikrit_register(void* addr, my_rank, i, addr, (unsigned long long)size, mca_spml_base_mkey2str(&mkeys[i])); - mca_spml_ikrit_cache_mkeys(NULL, &mkeys[i], memheap_find_segnum(addr), my_rank, i); + mca_spml_ikrit_cache_mkeys(&mkeys[i], memheap_find_segnum(addr), my_rank, i); } *count = MXM_PTL_LAST; @@ -548,7 +548,7 @@ int mca_spml_ikrit_deregister(sshmem_mkey_t *mkeys) } -int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys) +int mca_spml_ikrit_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) { int ptl; @@ -567,7 +567,7 @@ int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_m mkeys[ptl].len = 0; mkeys[ptl].va_base = mca_memheap_seg2base_va(seg); mkeys[ptl].u.key = MAP_SEGMENT_SHM_INVALID; - mca_spml_ikrit_cache_mkeys(NULL, &mkeys[ptl], seg, pe, ptl); + mca_spml_ikrit_cache_mkeys(&mkeys[ptl], seg, pe, ptl); return OSHMEM_SUCCESS; } diff --git a/oshmem/mca/spml/ikrit/spml_ikrit.h b/oshmem/mca/spml/ikrit/spml_ikrit.h index 7cfc91c74c..543d9a3070 100644 --- a/oshmem/mca/spml/ikrit/spml_ikrit.h +++ b/oshmem/mca/spml/ikrit/spml_ikrit.h @@ -182,7 +182,7 @@ extern sshmem_mkey_t *mca_spml_ikrit_register(void* addr, uint64_t shmid, int *count); extern int mca_spml_ikrit_deregister(sshmem_mkey_t *mkeys); -extern int mca_spml_ikrit_oob_get_mkeys(shmem_ctx_t ctx, int pe, +extern int mca_spml_ikrit_oob_get_mkeys(int pe, uint32_t segno, sshmem_mkey_t *mkeys); diff --git a/oshmem/mca/spml/spml.h b/oshmem/mca/spml/spml.h index fa992db91c..c78ed6cbdd 100644 --- a/oshmem/mca/spml/spml.h +++ b/oshmem/mca/spml/spml.h @@ -132,7 +132,7 @@ typedef int (*mca_spml_base_module_test_fn_t)(void* addr, * * @param mkey remote mkey */ -typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(shmem_ctx_t ctx, sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id); +typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id); /** * If possible, get a pointer to the remote memory described by the mkey @@ -180,7 +180,7 @@ typedef int (*mca_spml_base_module_deregister_fn_t)(sshmem_mkey_t *mkeys); * * @return OSHMEM_SUCCSESS if keys are found */ -typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(shmem_ctx_t ctx, int pe, +typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(int pe, uint32_t seg, sshmem_mkey_t *mkeys); diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 8b27077e6d..bc7bbeb6c6 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -42,7 +42,7 @@ #endif static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva); +spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva); mca_spml_ucx_t mca_spml_ucx = { .super = { @@ -308,11 +308,11 @@ error: static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva) +spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva) { sshmem_mkey_t *r_mkey; - r_mkey = mca_memheap_base_get_cached_mkey(ctx, pe, va, 0, rva); + r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva); if (OPAL_UNLIKELY(!r_mkey)) { SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable", pe, va); @@ -350,16 +350,23 @@ void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe) #endif } -void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) +static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe) +{ + ucp_peer_t *peer; + + peer = &(ucx_ctx->ucp_peers[dst_pe]); + mkey_segment_init(&peer->mkeys[segno].super, mkey, segno); +} + +void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) { spml_ucx_mkey_t *ucx_mkey; - mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; ucs_status_t err; - ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; + ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[pe].mkeys[segno].key; - err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn, - mkey->u.data, + err = ucp_ep_rkey_unpack(mca_spml_ucx_ctx_default.ucp_peers[pe].ucp_conn, + mkey->u.data, &ucx_mkey->rkey); if (UCS_OK != err) { SPML_UCX_ERROR("failed to unpack rkey: %s", ucs_status_string(err)); @@ -367,7 +374,7 @@ void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t se } mkey->spml_context = ucx_mkey; - mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe); + mca_spml_ucx_cache_mkey(&mca_spml_ucx_ctx_default, mkey, segno, pe); return; error_fatal: @@ -628,7 +635,7 @@ int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_add ucs_status_t status; #endif - ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx); #if HAVE_DECL_UCP_GET_NB request = ucp_get_nb(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size, (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb); @@ -647,7 +654,7 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_ spml_ucx_mkey_t *ucx_mkey; mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; - ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx); status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size, (uint64_t)rva, ucx_mkey->rkey); @@ -665,7 +672,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add ucs_status_t status; #endif - ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx); #if HAVE_DECL_UCP_PUT_NB request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size, (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb); @@ -684,7 +691,7 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_ spml_ucx_mkey_t *ucx_mkey; mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; - ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx); status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size, (uint64_t)rva, ucx_mkey->rkey); diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index 35889d0125..48fe66e1d0 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -80,7 +80,7 @@ struct mca_spml_ucx_ctx_list_item { }; typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t; -typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva); +typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(int pe, void *va, void **rva); struct mca_spml_ucx { mca_spml_base_module_t super; @@ -142,7 +142,7 @@ extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys); extern void mca_spml_ucx_memuse_hook(void *addr, size_t length); -extern void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id); +extern void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id); extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey); extern void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *, int pe); @@ -152,52 +152,17 @@ extern int mca_spml_ucx_fence(shmem_ctx_t ctx); extern int mca_spml_ucx_quiet(shmem_ctx_t ctx); extern int spml_ucx_progress(void); -static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe) -{ - ucp_peer_t *peer; - - peer = &(ucx_ctx->ucp_peers[dst_pe]); - mkey_segment_init(&peer->mkeys[segno].super, mkey, segno); -} static inline spml_ucx_mkey_t * -mca_spml_ucx_get_mkey(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module) +mca_spml_ucx_get_mkey(mca_spml_ucx_ctx_t *ucx_ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module) { spml_ucx_cached_mkey_t *mkey; - mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; mkey = ucx_ctx->ucp_peers[pe].mkeys; mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va); if (OPAL_UNLIKELY(NULL == mkey)) { - if (ucx_ctx != &mca_spml_ucx_ctx_default && pe == oshmem_my_proc_id()) { - mkey = mca_spml_ucx_ctx_default.ucp_peers[pe].mkeys; - mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va); - if (OPAL_UNLIKELY(NULL == mkey)) { - assert(module->get_mkey_slow); - return module->get_mkey_slow(ctx, pe, va, rva); - } else { - uint32_t segno = memheap_find_segnum(va); - sshmem_mkey_t *new_mkey = (sshmem_mkey_t *)calloc(1, sizeof(*new_mkey)); - spml_ucx_mkey_t *new_ucx_mkey = &(ucx_ctx->ucp_peers[pe].mkeys[segno].key); - size_t len; - - new_mkey->spml_context = new_ucx_mkey; - - ucp_rkey_pack(mca_spml_ucx.ucp_context, mkey->key.mem_h, - &(new_mkey->u.data), &len); - - ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn, - new_mkey->u.data, &new_ucx_mkey->rkey); - new_mkey->len = len; - new_mkey->va_base = va; - - *rva = map_segment_va2rva(&mkey->super, va); - return new_ucx_mkey; - } - } else { - assert(module->get_mkey_slow); - return module->get_mkey_slow(ctx, pe, va, rva); - } + assert(module->get_mkey_slow); + return module->get_mkey_slow(pe, va, rva); } *rva = map_segment_va2rva(&mkey->super, va); return &mkey->key; diff --git a/oshmem/shmem/c/shmem_addr_accessible.c b/oshmem/shmem/c/shmem_addr_accessible.c index 724318a894..8d44ff4181 100644 --- a/oshmem/shmem/c/shmem_addr_accessible.c +++ b/oshmem/shmem/c/shmem_addr_accessible.c @@ -31,8 +31,7 @@ int shmem_addr_accessible(const void *addr, int pe) RUNTIME_CHECK_INIT(); for (i = 0; i < mca_memheap_base_num_transports(); i++) { - /* TODO: iterate on all ctxs, try to get cached mkey */ - mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)addr, i, &rva); + mkey = mca_memheap_base_get_cached_mkey(pe, (void *)addr, i, &rva); if (mkey) { return 1; } diff --git a/oshmem/shmem/c/shmem_ptr.c b/oshmem/shmem/c/shmem_ptr.c index 7bfb6d014f..c7c4d49507 100644 --- a/oshmem/shmem/c/shmem_ptr.c +++ b/oshmem/shmem/c/shmem_ptr.c @@ -52,8 +52,7 @@ void *shmem_ptr(const void *dst_addr, int pe) } for (i = 0; i < mca_memheap_base_num_transports(); i++) { - /* TODO: iterate on all ctxs, try to get cached mkeys */ - mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)dst_addr, i, &rva); + mkey = mca_memheap_base_get_cached_mkey(pe, (void *)dst_addr, i, &rva); if (!mkey) { continue; } From 79ba7526677bd1641239bb77559a2999c8cd3a4a Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Tue, 5 Mar 2019 06:08:11 +0200 Subject: [PATCH 2/8] ompi/oshmem/spml/ucx: fix eps destroy in shmem_ctx_destroy(). Signed-off-by: Tomislav Janjusic --- oshmem/mca/spml/ucx/spml_ucx.c | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index bc7bbeb6c6..a760557e68 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -604,6 +604,8 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) MCA_SPML_CALL(quiet(ctx)); + oshmem_shmem_barrier(); + SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); /* delete context object from list */ @@ -611,10 +613,22 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) mca_spml_ucx_ctx_list_item_t) { if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) { opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); - for (i = 0; i < nprocs; i++) { - ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn); + + opal_common_ucx_del_proc_t *del_procs; + del_procs = malloc(sizeof(*del_procs) * nprocs); + + for (i = 0; i < nprocs; ++i) { + del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; + del_procs[i].vpid = i; + ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; } + + opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(), + mca_spml_ucx.num_disconnect, + ctx_item->ctx.ucp_worker); + free(del_procs); free(ctx_item->ctx.ucp_peers); + ucp_worker_destroy(ctx_item->ctx.ucp_worker); OBJ_RELEASE(ctx_item); break; From 289595e45dc3ebfe5ae1a9dc6f347b1b2d569c4a Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Thu, 3 Jan 2019 18:20:30 -0800 Subject: [PATCH 3/8] OMPI/OSHMEM: bug-fix: store mkeys for each oshmem ctx. Signed-off-by: Xin Zhao Signed-off-by: Tomislav Janjusic --- oshmem/mca/atomic/ucx/atomic_ucx_cswap.c | 2 +- oshmem/mca/atomic/ucx/atomic_ucx_module.c | 4 +- oshmem/mca/memheap/base/base.h | 8 +- oshmem/mca/memheap/base/memheap_base_mkey.c | 21 ++-- oshmem/mca/spml/base/base.h | 5 +- oshmem/mca/spml/base/spml_base.c | 4 +- oshmem/mca/spml/spml.h | 4 +- oshmem/mca/spml/ucx/spml_ucx.c | 124 +++++++++++++------- oshmem/mca/spml/ucx/spml_ucx.h | 16 ++- oshmem/mca/spml/ucx/spml_ucx_component.c | 7 ++ oshmem/mca/sshmem/sshmem_types.h | 1 - oshmem/shmem/c/shmem_addr_accessible.c | 3 +- oshmem/shmem/c/shmem_ptr.c | 3 +- 13 files changed, 133 insertions(+), 69 deletions(-) diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c index 25fe992688..51b0762947 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c @@ -40,7 +40,7 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx, assert(NULL != prev); *prev = value; - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn, UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size, rva, ucx_mkey->rkey, diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_module.c b/oshmem/mca/atomic/ucx/atomic_ucx_module.c index 4d269065cb..91d4551e45 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_module.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_module.c @@ -47,7 +47,7 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx, assert((8 == size) || (4 == size)); - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn, op, value, size, rva, ucx_mkey->rkey); @@ -70,7 +70,7 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx, assert((8 == size) || (4 == size)); - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self); status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn, op, value, prev, size, rva, ucx_mkey->rkey, diff --git a/oshmem/mca/memheap/base/base.h b/oshmem/mca/memheap/base/base.h index 6b4a79fb9e..7178685f0a 100644 --- a/oshmem/mca/memheap/base/base.h +++ b/oshmem/mca/memheap/base/base.h @@ -69,7 +69,8 @@ void memheap_oob_destruct(void); OSHMEM_DECLSPEC int mca_memheap_base_is_symmetric_addr(const void* va); OSHMEM_DECLSPEC sshmem_mkey_t *mca_memheap_base_get_mkey(void* va, int tr_id); -OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, +OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx, + map_segment_t *s, int pe, void* va, int btl_id, @@ -243,7 +244,8 @@ static inline map_segment_t *memheap_find_va(void* va) return s; } -static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe, +static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(shmem_ctx_t ctx, + int pe, void* va, int btl_id, void** rva) @@ -273,7 +275,7 @@ static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe, return mkey; } - return mca_memheap_base_get_cached_mkey_slow(s, pe, va, btl_id, rva); + return mca_memheap_base_get_cached_mkey_slow(ctx, s, pe, va, btl_id, rva); } static inline int mca_memheap_base_num_transports(void) diff --git a/oshmem/mca/memheap/base/memheap_base_mkey.c b/oshmem/mca/memheap/base/memheap_base_mkey.c index 8d92293210..fea00694ba 100644 --- a/oshmem/mca/memheap/base/memheap_base_mkey.c +++ b/oshmem/mca/memheap/base/memheap_base_mkey.c @@ -55,6 +55,7 @@ struct oob_comm { oob_comm_request_t req_pool[MEMHEAP_RECV_REQS_MAX]; opal_list_t req_list; int is_inited; + shmem_ctx_t ctx; }; mca_memheap_map_t* memheap_map = NULL; @@ -66,7 +67,7 @@ static int send_buffer(int pe, opal_buffer_t *msg); static int oshmem_mkey_recv_cb(void); /* pickup list of rkeys and remote va */ -static int memheap_oob_get_mkeys(int pe, +static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t va_seg_num, sshmem_mkey_t *mkey); @@ -142,7 +143,7 @@ static void memheap_attach_segment(sshmem_mkey_t *mkey, int tr_id) } -static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe) +static void unpack_remote_mkeys(shmem_ctx_t ctx, opal_buffer_t *msg, int remote_pe) { int32_t cnt; int32_t n; @@ -182,7 +183,7 @@ static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe) } else { memheap_oob.mkeys[tr_id].u.key = MAP_SEGMENT_SHM_INVALID; } - MCA_SPML_CALL(rmkey_unpack(&memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id)); + MCA_SPML_CALL(rmkey_unpack(ctx, &memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id)); } MEMHEAP_VERBOSE(5, @@ -242,7 +243,7 @@ static void do_recv(int source_pe, opal_buffer_t* buffer) case MEMHEAP_RKEY_RESP: MEMHEAP_VERBOSE(5, "*** RKEY RESP"); OPAL_THREAD_LOCK(&memheap_oob.lck); - unpack_remote_mkeys(buffer, source_pe); + unpack_remote_mkeys(memheap_oob.ctx, buffer, source_pe); memheap_oob.mkeys_rcvd = MEMHEAP_RKEY_RESP; opal_condition_broadcast(&memheap_oob.cond); OPAL_THREAD_UNLOCK(&memheap_oob.lck); @@ -455,14 +456,14 @@ static int send_buffer(int pe, opal_buffer_t *msg) return rc; } -static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) +static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys) { opal_buffer_t *msg; uint8_t cmd; int i; int rc; - if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(pe, seg, mkeys))) { + if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(ctx, pe, seg, mkeys))) { for (i = 0; i < memheap_map->num_transports; i++) { MEMHEAP_VERBOSE(5, "MKEY CALCULATED BY LOCAL SPML: pe: %d tr_id: %d %s", @@ -478,6 +479,7 @@ static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys) memheap_oob.mkeys = mkeys; memheap_oob.segno = seg; memheap_oob.mkeys_rcvd = 0; + memheap_oob.ctx = ctx; msg = OBJ_NEW(opal_buffer_t); if (!msg) { @@ -645,7 +647,7 @@ void mca_memheap_modex_recv_all(void) } memheap_oob.mkeys = s->mkeys_cache[i]; memheap_oob.segno = j; - unpack_remote_mkeys(msg, i); + unpack_remote_mkeys(oshmem_ctx_default, msg, i); } } @@ -674,7 +676,8 @@ exit_fatal: } } -sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, +sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx, + map_segment_t *s, int pe, void* va, int btl_id, @@ -692,7 +695,7 @@ sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s, if (!s->mkeys_cache[pe]) return NULL ; - rc = memheap_oob_get_mkeys(pe, + rc = memheap_oob_get_mkeys(ctx, pe, s - memheap_map->mem_segs, s->mkeys_cache[pe]); if (OSHMEM_SUCCESS != rc) diff --git a/oshmem/mca/spml/base/base.h b/oshmem/mca/spml/base/base.h index 4aeff7d760..e3ec1b6855 100644 --- a/oshmem/mca/spml/base/base.h +++ b/oshmem/mca/spml/base/base.h @@ -72,11 +72,12 @@ OSHMEM_DECLSPEC int mca_spml_base_test(void* addr, void* value, int datatype, int *out_value); -OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(int pe, +OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx, + int pe, uint32_t seg, sshmem_mkey_t *mkeys); -OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id); +OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id); OSHMEM_DECLSPEC void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey); OSHMEM_DECLSPEC void *mca_spml_base_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe); diff --git a/oshmem/mca/spml/base/spml_base.c b/oshmem/mca/spml/base/spml_base.c index 4e3331ae8c..48a7c1c712 100644 --- a/oshmem/mca/spml/base/spml_base.c +++ b/oshmem/mca/spml/base/spml_base.c @@ -247,12 +247,12 @@ int mca_spml_base_wait_nb(void* handle) return OSHMEM_SUCCESS; } -int mca_spml_base_oob_get_mkeys(int pe, uint32_t segno, sshmem_mkey_t *mkeys) +int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t segno, sshmem_mkey_t *mkeys) { return OSHMEM_ERROR; } -void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) +void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) { } diff --git a/oshmem/mca/spml/spml.h b/oshmem/mca/spml/spml.h index c78ed6cbdd..fa992db91c 100644 --- a/oshmem/mca/spml/spml.h +++ b/oshmem/mca/spml/spml.h @@ -132,7 +132,7 @@ typedef int (*mca_spml_base_module_test_fn_t)(void* addr, * * @param mkey remote mkey */ -typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id); +typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(shmem_ctx_t ctx, sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id); /** * If possible, get a pointer to the remote memory described by the mkey @@ -180,7 +180,7 @@ typedef int (*mca_spml_base_module_deregister_fn_t)(sshmem_mkey_t *mkeys); * * @return OSHMEM_SUCCSESS if keys are found */ -typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(int pe, +typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys); diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index a760557e68..56cc9199aa 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -42,7 +42,7 @@ #endif static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva); +spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva); mca_spml_ucx_t mca_spml_ucx = { .super = { @@ -103,7 +103,7 @@ int mca_spml_ucx_enable(bool enable) int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs) { opal_common_ucx_del_proc_t *del_procs; - size_t i; + size_t i, j; int ret; oshmem_shmem_barrier(); @@ -118,6 +118,12 @@ int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs) } for (i = 0; i < nprocs; ++i) { + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + if (mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey != NULL) { + ucp_rkey_destroy(mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey); + } + } + del_procs[i].ep = mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn; del_procs[i].vpid = i; @@ -217,7 +223,7 @@ static char spml_ucx_transport_ids[1] = { 0 }; int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) { - size_t i, n; + size_t i, j, n; int rc = OSHMEM_ERROR; int my_rank = oshmem_my_proc_id(); ucs_status_t err; @@ -270,6 +276,10 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) OSHMEM_PROC_DATA(procs[i])->num_transports = 1; OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids; + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey = NULL; + } + mca_spml_ucx.remote_addrs_tbl[i] = (char *)malloc(wk_rsizes[i]); memcpy(mca_spml_ucx.remote_addrs_tbl[i], (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]); @@ -308,37 +318,47 @@ error: static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva) +spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva) { sshmem_mkey_t *r_mkey; + spml_ucx_mkey_t *ucx_mkey; + uint32_t segno; + mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; + ucs_status_t err; - r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva); + r_mkey = mca_memheap_base_get_cached_mkey(ctx, pe, va, 0, rva); if (OPAL_UNLIKELY(!r_mkey)) { SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable", pe, va); oshmem_shmem_abort(-1); return NULL; } - return (spml_ucx_mkey_t *)(r_mkey->spml_context); + + segno = memheap_find_segnum(va); + ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; + + if (ucx_mkey->rkey == NULL) { + err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn, + r_mkey->u.data, + &ucx_mkey->rkey); + mca_spml_ucx_cache_mkey(ucx_ctx, r_mkey, segno, pe); /* make sure it is properly cached */ + } + + return ucx_mkey; } void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey) { - spml_ucx_mkey_t *ucx_mkey; - - if (!mkey->spml_context) { - return; - } - ucx_mkey = (spml_ucx_mkey_t *)(mkey->spml_context); - ucp_rkey_destroy(ucx_mkey->rkey); } -void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe) +void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *key, int pe) { #if (((UCP_API_MAJOR >= 1) && (UCP_API_MINOR >= 3)) || (UCP_API_MAJOR >= 2)) void *rva; ucs_status_t err; - spml_ucx_mkey_t *ucx_mkey = (spml_ucx_mkey_t *)(mkey->spml_context); + mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)&mca_spml_ucx_ctx_default; + uint32_t segno = memheap_find_segnum((void*)dst_addr); + spml_ucx_mkey_t *ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; err = ucp_rkey_ptr(ucx_mkey->rkey, (uint64_t)dst_addr, &rva); if (UCS_OK != err) { @@ -350,31 +370,23 @@ void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe) #endif } -static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe) -{ - ucp_peer_t *peer; - - peer = &(ucx_ctx->ucp_peers[dst_pe]); - mkey_segment_init(&peer->mkeys[segno].super, mkey, segno); -} - -void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) +void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id) { spml_ucx_mkey_t *ucx_mkey; + mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; ucs_status_t err; - ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[pe].mkeys[segno].key; + ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; - err = ucp_ep_rkey_unpack(mca_spml_ucx_ctx_default.ucp_peers[pe].ucp_conn, - mkey->u.data, + err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn, + mkey->u.data, &ucx_mkey->rkey); if (UCS_OK != err) { SPML_UCX_ERROR("failed to unpack rkey: %s", ucs_status_string(err)); goto error_fatal; } - mkey->spml_context = ucx_mkey; - mca_spml_ucx_cache_mkey(&mca_spml_ucx_ctx_default, mkey, segno, pe); + mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe); return; error_fatal: @@ -436,7 +448,6 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr, mem_seg = memheap_find_seg(segno); ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[my_pe].mkeys[segno].key; - mkeys[0].spml_context = ucx_mkey; /* if possible use mem handle already created by ucx allocator */ if (MAP_SEGMENT_ALLOC_UCX != mem_seg->type) { @@ -499,16 +510,16 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) { spml_ucx_mkey_t *ucx_mkey; map_segment_t *mem_seg; + int segno; + int my_pe = oshmem_my_proc_id(); MCA_SPML_CALL(quiet(oshmem_ctx_default)); - if (!mkeys) - return OSHMEM_SUCCESS; - - if (!mkeys[0].spml_context) + if (!mkeys || !mkeys[0].va_base) return OSHMEM_SUCCESS; mem_seg = memheap_find_va(mkeys[0].va_base); - ucx_mkey = (spml_ucx_mkey_t*)mkeys[0].spml_context; + segno = memheap_find_segnum(mkeys[0].va_base); + ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[my_pe].mkeys[segno].key; if (OPAL_UNLIKELY(NULL == mem_seg)) { return OSHMEM_ERROR; @@ -518,11 +529,14 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h); } ucp_rkey_destroy(ucx_mkey->rkey); + ucx_mkey->rkey = NULL; if (0 < mkeys[0].len) { ucp_rkey_buffer_release(mkeys[0].u.data); } + free(mkeys); + return OSHMEM_SUCCESS; } @@ -531,8 +545,12 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) mca_spml_ucx_ctx_list_item_t *ctx_item; ucp_worker_params_t params; ucp_ep_params_t ep_params; - size_t i, nprocs = oshmem_num_procs(); + size_t i, j, nprocs = oshmem_num_procs(); ucs_status_t err; + int my_pe = oshmem_my_proc_id(); + size_t len; + spml_ucx_mkey_t *ucx_mkey; + sshmem_mkey_t *mkey; int rc = OSHMEM_ERROR; ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t); @@ -567,6 +585,24 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) ucs_status_string(err)); goto error2; } + + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey = NULL; + } + } + + for (i = 0; i < MCA_MEMHEAP_SEG_COUNT; i++) { + mkey = &memheap_map->mem_segs[i].mkeys_cache[my_pe][0]; + ucx_mkey = &ctx_item->ctx.ucp_peers[my_pe].mkeys[i].key; + err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[my_pe].ucp_conn, + mkey->u.data, + &ucx_mkey->rkey); + if (UCS_OK != err) { + SPML_UCX_ERROR("failed to unpack rkey"); + goto error2; + } + + mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, i, my_pe); } SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); @@ -600,7 +636,7 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) { mca_spml_ucx_ctx_list_item_t *ctx_item, *next; - size_t i, nprocs = oshmem_num_procs(); + size_t i, j, nprocs = oshmem_num_procs(); MCA_SPML_CALL(quiet(ctx)); @@ -618,6 +654,12 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) del_procs = malloc(sizeof(*del_procs) * nprocs); for (i = 0; i < nprocs; ++i) { + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { + ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); + } + } + del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; del_procs[i].vpid = i; ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; @@ -649,7 +691,7 @@ int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_add ucs_status_t status; #endif - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx); #if HAVE_DECL_UCP_GET_NB request = ucp_get_nb(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size, (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb); @@ -668,7 +710,7 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_ spml_ucx_mkey_t *ucx_mkey; mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx); status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size, (uint64_t)rva, ucx_mkey->rkey); @@ -686,7 +728,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add ucs_status_t status; #endif - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx); #if HAVE_DECL_UCP_PUT_NB request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size, (uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb); @@ -705,7 +747,7 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_ spml_ucx_mkey_t *ucx_mkey; mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; - ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx); + ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx); status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size, (uint64_t)rva, ucx_mkey->rkey); diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index 48fe66e1d0..7fff74a3c5 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -80,7 +80,7 @@ struct mca_spml_ucx_ctx_list_item { }; typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t; -typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(int pe, void *va, void **rva); +typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva); struct mca_spml_ucx { mca_spml_base_module_t super; @@ -142,7 +142,7 @@ extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys); extern void mca_spml_ucx_memuse_hook(void *addr, size_t length); -extern void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id); +extern void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id); extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey); extern void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *, int pe); @@ -152,17 +152,25 @@ extern int mca_spml_ucx_fence(shmem_ctx_t ctx); extern int mca_spml_ucx_quiet(shmem_ctx_t ctx); extern int spml_ucx_progress(void); +static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe) +{ + ucp_peer_t *peer; + + peer = &(ucx_ctx->ucp_peers[dst_pe]); + mkey_segment_init(&peer->mkeys[segno].super, mkey, segno); +} static inline spml_ucx_mkey_t * -mca_spml_ucx_get_mkey(mca_spml_ucx_ctx_t *ucx_ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module) +mca_spml_ucx_get_mkey(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module) { spml_ucx_cached_mkey_t *mkey; + mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; mkey = ucx_ctx->ucp_peers[pe].mkeys; mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va); if (OPAL_UNLIKELY(NULL == mkey)) { assert(module->get_mkey_slow); - return module->get_mkey_slow(pe, va, rva); + return module->get_mkey_slow(ctx, pe, va, rva); } *rva = map_segment_va2rva(&mkey->super, va); return &mkey->key; diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c index c91edbfc12..835223ff23 100644 --- a/oshmem/mca/spml/ucx/spml_ucx_component.c +++ b/oshmem/mca/spml/ucx/spml_ucx_component.c @@ -111,7 +111,14 @@ static int mca_spml_ucx_component_register(void) int spml_ucx_progress(void) { + mca_spml_ucx_ctx_list_item_t *ctx_item, *next; ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker); + SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), + mca_spml_ucx_ctx_list_item_t) { + ucp_worker_progress(ctx_item->ctx.ucp_worker); + } + SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex); return 1; } diff --git a/oshmem/mca/sshmem/sshmem_types.h b/oshmem/mca/sshmem/sshmem_types.h index ccdf8995b5..88520d3d20 100644 --- a/oshmem/mca/sshmem/sshmem_types.h +++ b/oshmem/mca/sshmem/sshmem_types.h @@ -94,7 +94,6 @@ typedef struct sshmem_mkey { void *data; uint64_t key; } u; - void *spml_context; /* spml module can attach internal structures here */ } sshmem_mkey_t; typedef struct map_base_segment { diff --git a/oshmem/shmem/c/shmem_addr_accessible.c b/oshmem/shmem/c/shmem_addr_accessible.c index 8d44ff4181..724318a894 100644 --- a/oshmem/shmem/c/shmem_addr_accessible.c +++ b/oshmem/shmem/c/shmem_addr_accessible.c @@ -31,7 +31,8 @@ int shmem_addr_accessible(const void *addr, int pe) RUNTIME_CHECK_INIT(); for (i = 0; i < mca_memheap_base_num_transports(); i++) { - mkey = mca_memheap_base_get_cached_mkey(pe, (void *)addr, i, &rva); + /* TODO: iterate on all ctxs, try to get cached mkey */ + mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)addr, i, &rva); if (mkey) { return 1; } diff --git a/oshmem/shmem/c/shmem_ptr.c b/oshmem/shmem/c/shmem_ptr.c index c7c4d49507..7bfb6d014f 100644 --- a/oshmem/shmem/c/shmem_ptr.c +++ b/oshmem/shmem/c/shmem_ptr.c @@ -52,7 +52,8 @@ void *shmem_ptr(const void *dst_addr, int pe) } for (i = 0; i < mca_memheap_base_num_transports(); i++) { - mkey = mca_memheap_base_get_cached_mkey(pe, (void *)dst_addr, i, &rva); + /* TODO: iterate on all ctxs, try to get cached mkeys */ + mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)dst_addr, i, &rva); if (!mkey) { continue; } From 9a060009622e9220d6332d9b63f6a1a7328418a0 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Thu, 7 Mar 2019 04:15:08 +0200 Subject: [PATCH 4/8] ompi/oshmem/spml/ucx: let shmem_finalize to clean up any ctx left Signed-off-by: Tomislav Janjusic --- oshmem/mca/spml/ucx/spml_ucx_component.c | 40 ++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c index 835223ff23..3fe67d2e4f 100644 --- a/oshmem/mca/spml/ucx/spml_ucx_component.c +++ b/oshmem/mca/spml/ucx/spml_ucx_component.c @@ -226,13 +226,47 @@ mca_spml_ucx_component_init(int* priority, static int mca_spml_ucx_component_fini(void) { + mca_spml_ucx_ctx_list_item_t *ctx_item, *next; + size_t i, j, nprocs = oshmem_num_procs(); + opal_progress_unregister(spml_ucx_progress); - + + if(!mca_spml_ucx.enabled) + return OSHMEM_SUCCESS; /* never selected.. return success.. */ + + /* delete context objects from list */ + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), + mca_spml_ucx_ctx_list_item_t) { + opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); + + opal_common_ucx_del_proc_t *del_procs; + del_procs = malloc(sizeof(*del_procs) * nprocs); + + for (i = 0; i < nprocs; ++i) { + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { + ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); + } + } + + del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; + del_procs[i].vpid = i; + ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; + } + + opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(), + mca_spml_ucx.num_disconnect, + ctx_item->ctx.ucp_worker); + free(del_procs); + free(ctx_item->ctx.ucp_peers); + + ucp_worker_destroy(ctx_item->ctx.ucp_worker); + OBJ_RELEASE(ctx_item); + } + if (mca_spml_ucx_ctx_default.ucp_worker) { ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker); } - if(!mca_spml_ucx.enabled) - return OSHMEM_SUCCESS; /* never selected.. return success.. */ mca_spml_ucx.enabled = false; /* not anymore */ From 48033ac1f43159c053241b65e74a39777e5e31e4 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Thu, 7 Mar 2019 22:23:07 +0200 Subject: [PATCH 5/8] ompi/oshmem: add spml_context back to sshmem_type in memheap, to keep track of ucx_ctx_default's rkeys Signed-off-by: Tomislav Janjusic --- oshmem/mca/spml/ucx/spml_ucx.c | 33 ++++++++++++++++++-------------- oshmem/mca/sshmem/sshmem_types.h | 1 + 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 56cc9199aa..5ba1afe670 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -103,7 +103,7 @@ int mca_spml_ucx_enable(bool enable) int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs) { opal_common_ucx_del_proc_t *del_procs; - size_t i, j; + size_t i; int ret; oshmem_shmem_barrier(); @@ -118,12 +118,6 @@ int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs) } for (i = 0; i < nprocs; ++i) { - for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { - if (mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey != NULL) { - ucp_rkey_destroy(mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey); - } - } - del_procs[i].ep = mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn; del_procs[i].vpid = i; @@ -349,16 +343,21 @@ spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey) { + spml_ucx_mkey_t *ucx_mkey; + + if (!mkey->spml_context) { + return; + } + ucx_mkey = (spml_ucx_mkey_t *)(mkey->spml_context); + ucp_rkey_destroy(ucx_mkey->rkey); } -void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *key, int pe) +void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe) { #if (((UCP_API_MAJOR >= 1) && (UCP_API_MINOR >= 3)) || (UCP_API_MAJOR >= 2)) void *rva; ucs_status_t err; - mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)&mca_spml_ucx_ctx_default; - uint32_t segno = memheap_find_segnum((void*)dst_addr); - spml_ucx_mkey_t *ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; + spml_ucx_mkey_t *ucx_mkey = (spml_ucx_mkey_t *)(mkey->spml_context); err = ucp_rkey_ptr(ucx_mkey->rkey, (uint64_t)dst_addr, &rva); if (UCS_OK != err) { @@ -386,6 +385,9 @@ void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t se goto error_fatal; } + if (ucx_ctx == &mca_spml_ucx_ctx_default) { + mkey->spml_context = ucx_mkey; + } mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe); return; @@ -448,6 +450,7 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr, mem_seg = memheap_find_seg(segno); ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[my_pe].mkeys[segno].key; + mkeys[0].spml_context = ucx_mkey; /* if possible use mem handle already created by ucx allocator */ if (MAP_SEGMENT_ALLOC_UCX != mem_seg->type) { @@ -514,12 +517,14 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) int my_pe = oshmem_my_proc_id(); MCA_SPML_CALL(quiet(oshmem_ctx_default)); - if (!mkeys || !mkeys[0].va_base) + if (!mkeys) + return OSHMEM_SUCCESS; + + if (!mkeys[0].spml_context) return OSHMEM_SUCCESS; mem_seg = memheap_find_va(mkeys[0].va_base); - segno = memheap_find_segnum(mkeys[0].va_base); - ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[my_pe].mkeys[segno].key; + ucx_mkey = (spml_ucx_mkey_t*)mkeys[0].spml_context; if (OPAL_UNLIKELY(NULL == mem_seg)) { return OSHMEM_ERROR; diff --git a/oshmem/mca/sshmem/sshmem_types.h b/oshmem/mca/sshmem/sshmem_types.h index 88520d3d20..ccdf8995b5 100644 --- a/oshmem/mca/sshmem/sshmem_types.h +++ b/oshmem/mca/sshmem/sshmem_types.h @@ -94,6 +94,7 @@ typedef struct sshmem_mkey { void *data; uint64_t key; } u; + void *spml_context; /* spml module can attach internal structures here */ } sshmem_mkey_t; typedef struct map_base_segment { From e1c1ab020227fc18d145379ab29ea86a3cdb66b1 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Fri, 8 Mar 2019 06:19:39 +0200 Subject: [PATCH 6/8] ompi/oshmem/spml/ucx: defer clean up shmem_ctx to shmem_finalize Signed-off-by: Tomislav Janjusic --- opal/mca/common/ucx/common_ucx.c | 21 ++++-- opal/mca/common/ucx/common_ucx.h | 3 + oshmem/mca/spml/ucx/spml_ucx.c | 25 +------ oshmem/mca/spml/ucx/spml_ucx.h | 1 + oshmem/mca/spml/ucx/spml_ucx_component.c | 87 +++++++++++++++++------- 5 files changed, 83 insertions(+), 54 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c index 5927ca98c6..a7b276b404 100644 --- a/opal/mca/common/ucx/common_ucx.c +++ b/opal/mca/common/ucx/common_ucx.c @@ -151,6 +151,10 @@ void opal_common_ucx_mca_proc_added(void) } } #endif + +OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced) +{ + return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb, (void *)fenced); } OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker) @@ -182,9 +186,8 @@ static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker } } -OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count, - size_t my_rank, size_t max_disconnect, ucp_worker_h worker) -{ +OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count, + size_t my_rank, size_t max_disconnect, ucp_worker_h worker) { size_t num_reqs; size_t max_reqs; void *dreq, **dreqs; @@ -232,10 +235,14 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker); free(dreqs); - if (OPAL_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(worker))) { - return ret; - } - return OPAL_SUCCESS; } +OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count, + size_t my_rank, size_t max_disconnect, ucp_worker_h worker) +{ + opal_common_ucx_del_procs_nofence(procs, count, my_rank, max_disconnect, worker); + + return opal_common_ucx_mca_pmix_fence(worker); +} + diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 4cc24352c3..0fb3722d70 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -105,8 +105,11 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void); OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void); OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status); OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker); +OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced); OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count, size_t my_rank, size_t max_disconnect, ucp_worker_h worker); +OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count, + size_t my_rank, size_t max_disconnect, ucp_worker_h worker); OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component); static inline diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 5ba1afe670..349f9126b6 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -654,30 +654,7 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) mca_spml_ucx_ctx_list_item_t) { if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) { opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); - - opal_common_ucx_del_proc_t *del_procs; - del_procs = malloc(sizeof(*del_procs) * nprocs); - - for (i = 0; i < nprocs; ++i) { - for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { - if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { - ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); - } - } - - del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; - del_procs[i].vpid = i; - ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; - } - - opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(), - mca_spml_ucx.num_disconnect, - ctx_item->ctx.ucp_worker); - free(del_procs); - free(ctx_item->ctx.ucp_peers); - - ucp_worker_destroy(ctx_item->ctx.ucp_worker); - OBJ_RELEASE(ctx_item); + opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super); break; } } diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index 7fff74a3c5..17d997216c 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -91,6 +91,7 @@ struct mca_spml_ucx { mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow; char **remote_addrs_tbl; opal_list_t ctx_list; + opal_list_t idle_ctx_list; int priority; /* component priority */ shmem_internal_mutex_t internal_mutex; }; diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c index 3fe67d2e4f..c774841f57 100644 --- a/oshmem/mca/spml/ucx/spml_ucx_component.c +++ b/oshmem/mca/spml/ucx/spml_ucx_component.c @@ -176,6 +176,7 @@ static int spml_ucx_init(void) } OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t); + OBJ_CONSTRUCT(&(mca_spml_ucx.idle_ctx_list), opal_list_t); SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex); wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; @@ -224,10 +225,37 @@ mca_spml_ucx_component_init(int* priority, return &mca_spml_ucx.super; } +static void _ctx_cleanup(mca_spml_ucx_ctx_list_item_t *ctx_item) +{ + int i, j, nprocs = oshmem_num_procs(); + opal_common_ucx_del_proc_t *del_procs; + + del_procs = malloc(sizeof(*del_procs) * nprocs); + + for (i = 0; i < nprocs; ++i) { + for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { + if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { + ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); + } + } + + del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; + del_procs[i].vpid = i; + ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; + } + + opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(), + mca_spml_ucx.num_disconnect, + ctx_item->ctx.ucp_worker); + free(del_procs); + free(ctx_item->ctx.ucp_peers); +} + static int mca_spml_ucx_component_fini(void) { mca_spml_ucx_ctx_list_item_t *ctx_item, *next; - size_t i, j, nprocs = oshmem_num_procs(); + int fenced = 0; + int ret = OSHMEM_SUCCESS; opal_progress_unregister(spml_ucx_progress); @@ -235,31 +263,43 @@ static int mca_spml_ucx_component_fini(void) return OSHMEM_SUCCESS; /* never selected.. return success.. */ /* delete context objects from list */ + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), + mca_spml_ucx_ctx_list_item_t) { + _ctx_cleanup(ctx_item); + } + + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), + mca_spml_ucx_ctx_list_item_t) { + _ctx_cleanup(ctx_item); + } + + ret = opal_common_ucx_mca_pmix_fence_nb(&fenced); + if (OPAL_SUCCESS != ret) { + return ret; + } + + while (!fenced) { + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), + mca_spml_ucx_ctx_list_item_t) { + ucp_worker_progress(ctx_item->ctx.ucp_worker); + } + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), + mca_spml_ucx_ctx_list_item_t) { + ucp_worker_progress(ctx_item->ctx.ucp_worker); + } + ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker); + } + + /* delete all workers */ + OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), + mca_spml_ucx_ctx_list_item_t) { + opal_list_remove_item(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super); + ucp_worker_destroy(ctx_item->ctx.ucp_worker); + OBJ_RELEASE(ctx_item); + } OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), mca_spml_ucx_ctx_list_item_t) { opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); - - opal_common_ucx_del_proc_t *del_procs; - del_procs = malloc(sizeof(*del_procs) * nprocs); - - for (i = 0; i < nprocs; ++i) { - for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { - if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { - ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); - } - } - - del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; - del_procs[i].vpid = i; - ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; - } - - opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(), - mca_spml_ucx.num_disconnect, - ctx_item->ctx.ucp_worker); - free(del_procs); - free(ctx_item->ctx.ucp_peers); - ucp_worker_destroy(ctx_item->ctx.ucp_worker); OBJ_RELEASE(ctx_item); } @@ -271,6 +311,7 @@ static int mca_spml_ucx_component_fini(void) mca_spml_ucx.enabled = false; /* not anymore */ OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list)); + OBJ_DESTRUCT(&(mca_spml_ucx.idle_ctx_list)); SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex); if (mca_spml_ucx.ucp_context) { From e0414006b0c0a8e9918a4cf8ac4bb819b977ec91 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Fri, 8 Mar 2019 07:29:48 +0200 Subject: [PATCH 7/8] ompi/oshmem/spml/ucx:delete oob path of getting rkeys in spml ucx Signed-off-by: Tomislav Janjusic --- oshmem/mca/spml/ucx/spml_ucx.c | 61 ++++++---------------------------- oshmem/mca/spml/ucx/spml_ucx.h | 5 +-- 2 files changed, 12 insertions(+), 54 deletions(-) diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 349f9126b6..2eb145ef78 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -41,9 +41,6 @@ #define SPML_UCX_PUT_DEBUG 0 #endif -static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva); - mca_spml_ucx_t mca_spml_ucx = { .super = { /* Init mca_spml_base_module_t */ @@ -77,7 +74,7 @@ mca_spml_ucx_t mca_spml_ucx = { .num_disconnect = 1, .heap_reg_nb = 0, .enabled = 0, - .get_mkey_slow = mca_spml_ucx_get_mkey_slow + .get_mkey_slow = NULL }; OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL); @@ -310,37 +307,6 @@ error: } - -static -spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(shmem_ctx_t ctx, int pe, void *va, void **rva) -{ - sshmem_mkey_t *r_mkey; - spml_ucx_mkey_t *ucx_mkey; - uint32_t segno; - mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx; - ucs_status_t err; - - r_mkey = mca_memheap_base_get_cached_mkey(ctx, pe, va, 0, rva); - if (OPAL_UNLIKELY(!r_mkey)) { - SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable", - pe, va); - oshmem_shmem_abort(-1); - return NULL; - } - - segno = memheap_find_segnum(va); - ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key; - - if (ucx_mkey->rkey == NULL) { - err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn, - r_mkey->u.data, - &ucx_mkey->rkey); - mca_spml_ucx_cache_mkey(ucx_ctx, r_mkey, segno, pe); /* make sure it is properly cached */ - } - - return ucx_mkey; -} - void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey) { spml_ucx_mkey_t *ucx_mkey; @@ -592,24 +558,19 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) } for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { - ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey = NULL; + mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0]; + ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key; + err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn, + mkey->u.data, + &ucx_mkey->rkey); + if (UCS_OK != err) { + SPML_UCX_ERROR("failed to unpack rkey"); + goto error2; + } + mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i); } } - for (i = 0; i < MCA_MEMHEAP_SEG_COUNT; i++) { - mkey = &memheap_map->mem_segs[i].mkeys_cache[my_pe][0]; - ucx_mkey = &ctx_item->ctx.ucp_peers[my_pe].mkeys[i].key; - err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[my_pe].ucp_conn, - mkey->u.data, - &ucx_mkey->rkey); - if (UCS_OK != err) { - SPML_UCX_ERROR("failed to unpack rkey"); - goto error2; - } - - mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, i, my_pe); - } - SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super); diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index 17d997216c..fb6a1940da 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -169,10 +169,7 @@ mca_spml_ucx_get_mkey(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_uc mkey = ucx_ctx->ucp_peers[pe].mkeys; mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va); - if (OPAL_UNLIKELY(NULL == mkey)) { - assert(module->get_mkey_slow); - return module->get_mkey_slow(ctx, pe, va, rva); - } + assert(mkey != NULL); *rva = map_segment_va2rva(&mkey->super, va); return &mkey->key; } From 9c3d00b144641d2929f830279dcc9d163c38e9e1 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Wed, 13 Mar 2019 04:39:26 +0200 Subject: [PATCH 8/8] ompi/oshmem/spml/ucx: use lockfree array to optimize spml_ucx_progress/delete oshmem_barrier in shmem_ctx_destroy ompi/oshmem/spml/ucx: optimize spml ucx progress Signed-off-by: Tomislav Janjusic --- opal/mca/common/ucx/common_ucx.c | 1 + oshmem/mca/spml/ucx/spml_ucx.c | 109 ++++++++++++++--------- oshmem/mca/spml/ucx/spml_ucx.h | 22 +++-- oshmem/mca/spml/ucx/spml_ucx_component.c | 93 ++++++++++--------- 4 files changed, 130 insertions(+), 95 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx.c b/opal/mca/common/ucx/common_ucx.c index a7b276b404..34e4c2e872 100644 --- a/opal/mca/common/ucx/common_ucx.c +++ b/opal/mca/common/ucx/common_ucx.c @@ -151,6 +151,7 @@ void opal_common_ucx_mca_proc_added(void) } } #endif +} OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced) { diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 2eb145ef78..6aa87b3c6c 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -77,8 +77,6 @@ mca_spml_ucx_t mca_spml_ucx = { .get_mkey_slow = NULL }; -OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL); - mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = { .ucp_worker = NULL, .ucp_peers = NULL, @@ -243,7 +241,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) goto error; } - opal_progress_register(spml_ucx_progress); + opal_progress_register(spml_ucx_default_progress); mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *)); memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *)); @@ -511,9 +509,45 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) return OSHMEM_SUCCESS; } +static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx) +{ + int i; + + if (array->ctxs_count < array->ctxs_num) { + array->ctxs[array->ctxs_count] = ctx; + } else { + array->ctxs = realloc(array->ctxs, (array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC) * sizeof(mca_spml_ucx_ctx_t *)); + opal_atomic_wmb (); + for (i = array->ctxs_num; i < array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC; i++) { + array->ctxs[i] = NULL; + } + array->ctxs[array->ctxs_num] = ctx; + array->ctxs_num += MCA_SPML_UCX_CTXS_ARRAY_INC; + } + + opal_atomic_wmb (); + array->ctxs_count++; +} + +static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx) +{ + int i; + + for (i = 0; i < array->ctxs_count; i++) { + if (array->ctxs[i] == ctx) { + array->ctxs[i] = array->ctxs[array->ctxs_count-1]; + array->ctxs[array->ctxs_count-1] = NULL; + break; + } + } + + array->ctxs_count--; + opal_atomic_wmb (); +} + int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) { - mca_spml_ucx_ctx_list_item_t *ctx_item; + mca_spml_ucx_ctx_t *ucx_ctx; ucp_worker_params_t params; ucp_ep_params_t ep_params; size_t i, j, nprocs = oshmem_num_procs(); @@ -524,8 +558,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) sshmem_mkey_t *mkey; int rc = OSHMEM_ERROR; - ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t); - ctx_item->ctx.options = options; + ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t)); + ucx_ctx->options = options; params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) { @@ -535,22 +569,26 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) } err = ucp_worker_create(mca_spml_ucx.ucp_context, ¶ms, - &ctx_item->ctx.ucp_worker); + &ucx_ctx->ucp_worker); if (UCS_OK != err) { - OBJ_RELEASE(ctx_item); + free(ucx_ctx); return OSHMEM_ERROR; } - ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers))); - if (NULL == ctx_item->ctx.ucp_peers) { + ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers))); + if (NULL == ucx_ctx->ucp_peers) { goto error; } + if (mca_spml_ucx.active_array.ctxs_count == 0) { + opal_progress_register(spml_ucx_ctx_progress); + } + for (i = 0; i < nprocs; i++) { ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS; ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]); - err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params, - &ctx_item->ctx.ucp_peers[i].ucp_conn); + err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params, + &ucx_ctx->ucp_peers[i].ucp_conn); if (UCS_OK != err) { SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs, ucs_status_string(err)); @@ -559,41 +597,38 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0]; - ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key; - err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn, + ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key; + err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn, mkey->u.data, &ucx_mkey->rkey); if (UCS_OK != err) { SPML_UCX_ERROR("failed to unpack rkey"); goto error2; } - mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i); + mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i); } } SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); - - opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super); - + _ctx_add(&mca_spml_ucx.active_array, ucx_ctx); SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex); - (*ctx) = (shmem_ctx_t)(&ctx_item->ctx); - + (*ctx) = (shmem_ctx_t)ucx_ctx; return OSHMEM_SUCCESS; error2: for (i = 0; i < nprocs; i++) { - if (ctx_item->ctx.ucp_peers[i].ucp_conn) { - ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn); + if (ucx_ctx->ucp_peers[i].ucp_conn) { + ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn); } } - if (ctx_item->ctx.ucp_peers) - free(ctx_item->ctx.ucp_peers); + if (ucx_ctx->ucp_peers) + free(ucx_ctx->ucp_peers); error: - ucp_worker_destroy(ctx_item->ctx.ucp_worker); - OBJ_RELEASE(ctx_item); + ucp_worker_destroy(ucx_ctx->ucp_worker); + free(ucx_ctx); rc = OSHMEM_ERR_OUT_OF_RESOURCE; SPML_ERROR("ctx create FAILED rc=%d", rc); return rc; @@ -601,26 +636,16 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx) void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx) { - mca_spml_ucx_ctx_list_item_t *ctx_item, *next; - size_t i, j, nprocs = oshmem_num_procs(); - MCA_SPML_CALL(quiet(ctx)); - oshmem_shmem_barrier(); - SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); - - /* delete context object from list */ - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), - mca_spml_ucx_ctx_list_item_t) { - if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) { - opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); - opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super); - break; - } - } - + _ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx); + _ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx); SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex); + + if (!mca_spml_ucx.active_array.ctxs_count) { + opal_progress_unregister(spml_ucx_ctx_progress); + } } int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src) diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index fb6a1940da..4093cb3194 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -74,14 +74,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t; extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default; -struct mca_spml_ucx_ctx_list_item { - opal_list_item_t super; - mca_spml_ucx_ctx_t ctx; -}; -typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t; - typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva); +typedef struct mca_spml_ucx_ctx_array { + int ctxs_count; + int ctxs_num; + mca_spml_ucx_ctx_t **ctxs; +} mca_spml_ucx_ctx_array_t; + struct mca_spml_ucx { mca_spml_base_module_t super; ucp_context_h ucp_context; @@ -90,8 +90,8 @@ struct mca_spml_ucx { bool enabled; mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow; char **remote_addrs_tbl; - opal_list_t ctx_list; - opal_list_t idle_ctx_list; + mca_spml_ucx_ctx_array_t active_array; + mca_spml_ucx_ctx_array_t idle_array; int priority; /* component priority */ shmem_internal_mutex_t internal_mutex; }; @@ -151,7 +151,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs); extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs); extern int mca_spml_ucx_fence(shmem_ctx_t ctx); extern int mca_spml_ucx_quiet(shmem_ctx_t ctx); -extern int spml_ucx_progress(void); +extern int spml_ucx_default_progress(void); +extern int spml_ucx_ctx_progress(void); static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe) { @@ -192,6 +193,9 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status) #endif } +#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64 +#define MCA_SPML_UCX_CTXS_ARRAY_INC 64 + END_C_DECLS #endif diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c index c774841f57..6493a44b26 100644 --- a/oshmem/mca/spml/ucx/spml_ucx_component.c +++ b/oshmem/mca/spml/ucx/spml_ucx_component.c @@ -109,16 +109,18 @@ static int mca_spml_ucx_component_register(void) return OSHMEM_SUCCESS; } -int spml_ucx_progress(void) +int spml_ucx_ctx_progress(void) { - mca_spml_ucx_ctx_list_item_t *ctx_item, *next; - ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker); - SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex); - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), - mca_spml_ucx_ctx_list_item_t) { - ucp_worker_progress(ctx_item->ctx.ucp_worker); + int i; + for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) { + ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker); } - SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex); + return 1; +} + +int spml_ucx_default_progress(void) +{ + ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker); return 1; } @@ -175,8 +177,13 @@ static int spml_ucx_init(void) oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE; } - OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t); - OBJ_CONSTRUCT(&(mca_spml_ucx.idle_ctx_list), opal_list_t); + mca_spml_ucx.active_array.ctxs_count = mca_spml_ucx.idle_array.ctxs_count = 0; + mca_spml_ucx.active_array.ctxs_num = mca_spml_ucx.idle_array.ctxs_num = MCA_SPML_UCX_CTXS_ARRAY_SIZE; + mca_spml_ucx.active_array.ctxs = calloc(mca_spml_ucx.active_array.ctxs_num, + sizeof(mca_spml_ucx_ctx_t *)); + mca_spml_ucx.idle_array.ctxs = calloc(mca_spml_ucx.idle_array.ctxs_num, + sizeof(mca_spml_ucx_ctx_t *)); + SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex); wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; @@ -225,7 +232,7 @@ mca_spml_ucx_component_init(int* priority, return &mca_spml_ucx.super; } -static void _ctx_cleanup(mca_spml_ucx_ctx_list_item_t *ctx_item) +static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx) { int i, j, nprocs = oshmem_num_procs(); opal_common_ucx_del_proc_t *del_procs; @@ -234,43 +241,43 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_list_item_t *ctx_item) for (i = 0; i < nprocs; ++i) { for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) { - if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) { - ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey); + if (ctx->ucp_peers[i].mkeys[j].key.rkey != NULL) { + ucp_rkey_destroy(ctx->ucp_peers[i].mkeys[j].key.rkey); } } - del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn; + del_procs[i].ep = ctx->ucp_peers[i].ucp_conn; del_procs[i].vpid = i; - ctx_item->ctx.ucp_peers[i].ucp_conn = NULL; + ctx->ucp_peers[i].ucp_conn = NULL; } opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(), mca_spml_ucx.num_disconnect, - ctx_item->ctx.ucp_worker); + ctx->ucp_worker); free(del_procs); - free(ctx_item->ctx.ucp_peers); + free(ctx->ucp_peers); } static int mca_spml_ucx_component_fini(void) { - mca_spml_ucx_ctx_list_item_t *ctx_item, *next; - int fenced = 0; + int fenced = 0, i; int ret = OSHMEM_SUCCESS; - opal_progress_unregister(spml_ucx_progress); + opal_progress_unregister(spml_ucx_default_progress); + if (mca_spml_ucx.active_array.ctxs_count) { + opal_progress_unregister(spml_ucx_ctx_progress); + } if(!mca_spml_ucx.enabled) return OSHMEM_SUCCESS; /* never selected.. return success.. */ /* delete context objects from list */ - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), - mca_spml_ucx_ctx_list_item_t) { - _ctx_cleanup(ctx_item); + for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) { + _ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]); } - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), - mca_spml_ucx_ctx_list_item_t) { - _ctx_cleanup(ctx_item); + for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) { + _ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]); } ret = opal_common_ucx_mca_pmix_fence_nb(&fenced); @@ -279,29 +286,26 @@ static int mca_spml_ucx_component_fini(void) } while (!fenced) { - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), - mca_spml_ucx_ctx_list_item_t) { - ucp_worker_progress(ctx_item->ctx.ucp_worker); + for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) { + ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker); } - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), - mca_spml_ucx_ctx_list_item_t) { - ucp_worker_progress(ctx_item->ctx.ucp_worker); + + for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) { + ucp_worker_progress(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker); } + ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker); } /* delete all workers */ - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list), - mca_spml_ucx_ctx_list_item_t) { - opal_list_remove_item(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super); - ucp_worker_destroy(ctx_item->ctx.ucp_worker); - OBJ_RELEASE(ctx_item); + for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) { + ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker); + free(mca_spml_ucx.active_array.ctxs[i]); } - OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list), - mca_spml_ucx_ctx_list_item_t) { - opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super); - ucp_worker_destroy(ctx_item->ctx.ucp_worker); - OBJ_RELEASE(ctx_item); + + for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) { + ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker); + free(mca_spml_ucx.idle_array.ctxs[i]); } if (mca_spml_ucx_ctx_default.ucp_worker) { @@ -310,8 +314,9 @@ static int mca_spml_ucx_component_fini(void) mca_spml_ucx.enabled = false; /* not anymore */ - OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list)); - OBJ_DESTRUCT(&(mca_spml_ucx.idle_ctx_list)); + free(mca_spml_ucx.active_array.ctxs); + free(mca_spml_ucx.idle_array.ctxs); + SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex); if (mca_spml_ucx.ucp_context) {