1
1

Merge pull request #6509 from janjust/oshmem-multiple-contexts-v4.0.x

v4.0.x: Oshmem multiple contexts
Этот коммит содержится в:
Howard Pritchard 2019-04-01 13:15:38 -06:00 коммит произвёл GitHub
родитель b11cb23b71 69a80fce9f
Коммит 976cc1e07f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 275 добавлений и 133 удалений

Просмотреть файл

@ -153,6 +153,11 @@ void opal_common_ucx_mca_proc_added(void)
#endif
}
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
{
return opal_pmix.fence_nb(NULL, 0, opal_common_ucx_mca_fence_complete_cb, (void *)fenced);
}
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
{
volatile int fenced = 0;
@ -181,9 +186,8 @@ static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker
}
}
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
{
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
size_t my_rank, size_t max_disconnect, ucp_worker_h worker) {
size_t num_reqs;
size_t max_reqs;
void *dreq, **dreqs;
@ -230,7 +234,13 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
free(dreqs);
opal_common_ucx_mca_pmix_fence(worker);
return OPAL_SUCCESS;
}
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
{
opal_common_ucx_del_procs_nofence(procs, count, my_rank, max_disconnect, worker);
return opal_common_ucx_mca_pmix_fence(worker);
}

Просмотреть файл

@ -100,9 +100,12 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced);
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *procs, size_t count,
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
static inline
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)

Просмотреть файл

@ -40,7 +40,7 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
assert(NULL != prev);
*prev = value;
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,

Просмотреть файл

@ -47,7 +47,7 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx,
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, size, rva,
ucx_mkey->rkey);
@ -70,7 +70,7 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx,
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, pe, target, (void *)&rva, mca_spml_self);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, prev, size,
rva, ucx_mkey->rkey,

Просмотреть файл

@ -69,7 +69,8 @@ void memheap_oob_destruct(void);
OSHMEM_DECLSPEC int mca_memheap_base_is_symmetric_addr(const void* va);
OSHMEM_DECLSPEC sshmem_mkey_t *mca_memheap_base_get_mkey(void* va,
int tr_id);
OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s,
OSHMEM_DECLSPEC sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx,
map_segment_t *s,
int pe,
void* va,
int btl_id,
@ -243,7 +244,8 @@ static inline map_segment_t *memheap_find_va(void* va)
return s;
}
static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe,
static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(shmem_ctx_t ctx,
int pe,
void* va,
int btl_id,
void** rva)
@ -273,7 +275,7 @@ static inline sshmem_mkey_t *mca_memheap_base_get_cached_mkey(int pe,
return mkey;
}
return mca_memheap_base_get_cached_mkey_slow(s, pe, va, btl_id, rva);
return mca_memheap_base_get_cached_mkey_slow(ctx, s, pe, va, btl_id, rva);
}
static inline int mca_memheap_base_num_transports(void)

Просмотреть файл

@ -55,6 +55,7 @@ struct oob_comm {
oob_comm_request_t req_pool[MEMHEAP_RECV_REQS_MAX];
opal_list_t req_list;
int is_inited;
shmem_ctx_t ctx;
};
mca_memheap_map_t* memheap_map = NULL;
@ -66,7 +67,7 @@ static int send_buffer(int pe, opal_buffer_t *msg);
static int oshmem_mkey_recv_cb(void);
/* pickup list of rkeys and remote va */
static int memheap_oob_get_mkeys(int pe,
static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe,
uint32_t va_seg_num,
sshmem_mkey_t *mkey);
@ -142,7 +143,7 @@ static void memheap_attach_segment(sshmem_mkey_t *mkey, int tr_id)
}
static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe)
static void unpack_remote_mkeys(shmem_ctx_t ctx, opal_buffer_t *msg, int remote_pe)
{
int32_t cnt;
int32_t n;
@ -182,7 +183,7 @@ static void unpack_remote_mkeys(opal_buffer_t *msg, int remote_pe)
} else {
memheap_oob.mkeys[tr_id].u.key = MAP_SEGMENT_SHM_INVALID;
}
MCA_SPML_CALL(rmkey_unpack(&memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id));
MCA_SPML_CALL(rmkey_unpack(ctx, &memheap_oob.mkeys[tr_id], memheap_oob.segno, remote_pe, tr_id));
}
MEMHEAP_VERBOSE(5,
@ -242,7 +243,7 @@ static void do_recv(int source_pe, opal_buffer_t* buffer)
case MEMHEAP_RKEY_RESP:
MEMHEAP_VERBOSE(5, "*** RKEY RESP");
OPAL_THREAD_LOCK(&memheap_oob.lck);
unpack_remote_mkeys(buffer, source_pe);
unpack_remote_mkeys(memheap_oob.ctx, buffer, source_pe);
memheap_oob.mkeys_rcvd = MEMHEAP_RKEY_RESP;
opal_condition_broadcast(&memheap_oob.cond);
OPAL_THREAD_UNLOCK(&memheap_oob.lck);
@ -455,14 +456,14 @@ static int send_buffer(int pe, opal_buffer_t *msg)
return rc;
}
static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys)
static int memheap_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t seg, sshmem_mkey_t *mkeys)
{
opal_buffer_t *msg;
uint8_t cmd;
int i;
int rc;
if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(pe, seg, mkeys))) {
if (OSHMEM_SUCCESS == MCA_SPML_CALL(oob_get_mkeys(ctx, pe, seg, mkeys))) {
for (i = 0; i < memheap_map->num_transports; i++) {
MEMHEAP_VERBOSE(5,
"MKEY CALCULATED BY LOCAL SPML: pe: %d tr_id: %d %s",
@ -478,6 +479,7 @@ static int memheap_oob_get_mkeys(int pe, uint32_t seg, sshmem_mkey_t *mkeys)
memheap_oob.mkeys = mkeys;
memheap_oob.segno = seg;
memheap_oob.mkeys_rcvd = 0;
memheap_oob.ctx = ctx;
msg = OBJ_NEW(opal_buffer_t);
if (!msg) {
@ -645,7 +647,7 @@ void mca_memheap_modex_recv_all(void)
}
memheap_oob.mkeys = s->mkeys_cache[i];
memheap_oob.segno = j;
unpack_remote_mkeys(msg, i);
unpack_remote_mkeys(oshmem_ctx_default, msg, i);
}
}
@ -674,7 +676,8 @@ exit_fatal:
}
}
sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s,
sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(shmem_ctx_t ctx,
map_segment_t *s,
int pe,
void* va,
int btl_id,
@ -692,7 +695,7 @@ sshmem_mkey_t * mca_memheap_base_get_cached_mkey_slow(map_segment_t *s,
if (!s->mkeys_cache[pe])
return NULL ;
rc = memheap_oob_get_mkeys(pe,
rc = memheap_oob_get_mkeys(ctx, pe,
s - memheap_map->mem_segs,
s->mkeys_cache[pe]);
if (OSHMEM_SUCCESS != rc)

Просмотреть файл

@ -72,11 +72,12 @@ OSHMEM_DECLSPEC int mca_spml_base_test(void* addr,
void* value,
int datatype,
int *out_value);
OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(int pe,
OSHMEM_DECLSPEC int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx,
int pe,
uint32_t seg,
sshmem_mkey_t *mkeys);
OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id);
OSHMEM_DECLSPEC void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t seg, int pe, int tr_id);
OSHMEM_DECLSPEC void mca_spml_base_rmkey_free(sshmem_mkey_t *mkey);
OSHMEM_DECLSPEC void *mca_spml_base_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe);

Просмотреть файл

@ -247,12 +247,12 @@ int mca_spml_base_wait_nb(void* handle)
return OSHMEM_SUCCESS;
}
int mca_spml_base_oob_get_mkeys(int pe, uint32_t segno, sshmem_mkey_t *mkeys)
int mca_spml_base_oob_get_mkeys(shmem_ctx_t ctx, int pe, uint32_t segno, sshmem_mkey_t *mkeys)
{
return OSHMEM_ERROR;
}
void mca_spml_base_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id)
void mca_spml_base_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id)
{
}

Просмотреть файл

@ -132,7 +132,7 @@ typedef int (*mca_spml_base_module_test_fn_t)(void* addr,
*
* @param mkey remote mkey
*/
typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id);
typedef void (*mca_spml_base_module_mkey_unpack_fn_t)(shmem_ctx_t ctx, sshmem_mkey_t *, uint32_t segno, int remote_pe, int tr_id);
/**
* If possible, get a pointer to the remote memory described by the mkey
@ -180,7 +180,7 @@ typedef int (*mca_spml_base_module_deregister_fn_t)(sshmem_mkey_t *mkeys);
*
* @return OSHMEM_SUCCSESS if keys are found
*/
typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(int pe,
typedef int (*mca_spml_base_module_oob_get_mkeys_fn_t)(shmem_ctx_t ctx, int pe,
uint32_t seg,
sshmem_mkey_t *mkeys);

Просмотреть файл

@ -44,9 +44,6 @@
#define SPML_UCX_PUT_DEBUG 0
#endif
static
spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva);
mca_spml_ucx_t mca_spml_ucx = {
.super = {
/* Init mca_spml_base_module_t */
@ -80,11 +77,9 @@ mca_spml_ucx_t mca_spml_ucx = {
.num_disconnect = 1,
.heap_reg_nb = 0,
.enabled = 0,
.get_mkey_slow = mca_spml_ucx_get_mkey_slow
.get_mkey_slow = NULL
};
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
.ucp_worker = NULL,
.ucp_peers = NULL,
@ -220,7 +215,7 @@ static char spml_ucx_transport_ids[1] = { 0 };
int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
{
size_t i, n;
size_t i, j, n;
int rc = OSHMEM_ERROR;
int my_rank = oshmem_my_proc_id();
ucs_status_t err;
@ -249,7 +244,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
goto error;
}
opal_progress_register(spml_ucx_progress);
opal_progress_register(spml_ucx_default_progress);
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
@ -273,6 +268,10 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
mca_spml_ucx_ctx_default.ucp_peers[i].mkeys[j].key.rkey = NULL;
}
mca_spml_ucx.remote_addrs_tbl[i] = (char *)malloc(wk_rsizes[i]);
memcpy(mca_spml_ucx.remote_addrs_tbl[i], (char *)(wk_raddrs + wk_roffs[i]),
wk_rsizes[i]);
@ -309,22 +308,6 @@ error:
}
static
spml_ucx_mkey_t * mca_spml_ucx_get_mkey_slow(int pe, void *va, void **rva)
{
sshmem_mkey_t *r_mkey;
r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_UCX_ERROR("pe=%d: %p is not address of symmetric variable",
pe, va);
oshmem_shmem_abort(-1);
return NULL;
}
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
}
void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey)
{
spml_ucx_mkey_t *ucx_mkey;
@ -353,31 +336,26 @@ void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *mkey, int pe)
#endif
}
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
{
ucp_peer_t *peer;
peer = &(ucx_ctx->ucp_peers[dst_pe]);
mkey_segment_init(&peer->mkeys[segno].super, mkey, segno);
}
void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id)
void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id)
{
spml_ucx_mkey_t *ucx_mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
ucs_status_t err;
ucx_mkey = &mca_spml_ucx_ctx_default.ucp_peers[pe].mkeys[segno].key;
ucx_mkey = &ucx_ctx->ucp_peers[pe].mkeys[segno].key;
err = ucp_ep_rkey_unpack(mca_spml_ucx_ctx_default.ucp_peers[pe].ucp_conn,
mkey->u.data,
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[pe].ucp_conn,
mkey->u.data,
&ucx_mkey->rkey);
if (UCS_OK != err) {
SPML_UCX_ERROR("failed to unpack rkey: %s", ucs_status_string(err));
goto error_fatal;
}
mkey->spml_context = ucx_mkey;
mca_spml_ucx_cache_mkey(&mca_spml_ucx_ctx_default, mkey, segno, pe);
if (ucx_ctx == &mca_spml_ucx_ctx_default) {
mkey->spml_context = ucx_mkey;
}
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, segno, pe);
return;
error_fatal:
@ -502,12 +480,14 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
{
spml_ucx_mkey_t *ucx_mkey;
map_segment_t *mem_seg;
int segno;
int my_pe = oshmem_my_proc_id();
MCA_SPML_CALL(quiet(oshmem_ctx_default));
if (!mkeys)
return OSHMEM_SUCCESS;
if (!mkeys[0].spml_context)
if (!mkeys[0].spml_context)
return OSHMEM_SUCCESS;
mem_seg = memheap_find_va(mkeys[0].va_base);
@ -521,25 +501,68 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
}
ucp_rkey_destroy(ucx_mkey->rkey);
ucx_mkey->rkey = NULL;
if (0 < mkeys[0].len) {
ucp_rkey_buffer_release(mkeys[0].u.data);
}
free(mkeys);
return OSHMEM_SUCCESS;
}
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
{
int i;
if (array->ctxs_count < array->ctxs_num) {
array->ctxs[array->ctxs_count] = ctx;
} else {
array->ctxs = realloc(array->ctxs, (array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC) * sizeof(mca_spml_ucx_ctx_t *));
opal_atomic_wmb ();
for (i = array->ctxs_num; i < array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC; i++) {
array->ctxs[i] = NULL;
}
array->ctxs[array->ctxs_num] = ctx;
array->ctxs_num += MCA_SPML_UCX_CTXS_ARRAY_INC;
}
opal_atomic_wmb ();
array->ctxs_count++;
}
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
{
int i;
for (i = 0; i < array->ctxs_count; i++) {
if (array->ctxs[i] == ctx) {
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
array->ctxs[array->ctxs_count-1] = NULL;
break;
}
}
array->ctxs_count--;
opal_atomic_wmb ();
}
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
{
mca_spml_ucx_ctx_list_item_t *ctx_item;
mca_spml_ucx_ctx_t *ucx_ctx;
ucp_worker_params_t params;
ucp_ep_params_t ep_params;
size_t i, nprocs = oshmem_num_procs();
size_t i, j, nprocs = oshmem_num_procs();
ucs_status_t err;
int my_pe = oshmem_my_proc_id();
size_t len;
spml_ucx_mkey_t *ucx_mkey;
sshmem_mkey_t *mkey;
int rc = OSHMEM_ERROR;
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
ctx_item->ctx.options = options;
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
ucx_ctx->options = options;
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
@ -549,52 +572,66 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
}
err = ucp_worker_create(mca_spml_ucx.ucp_context, &params,
&ctx_item->ctx.ucp_worker);
&ucx_ctx->ucp_worker);
if (UCS_OK != err) {
OBJ_RELEASE(ctx_item);
free(ucx_ctx);
return OSHMEM_ERROR;
}
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
if (NULL == ctx_item->ctx.ucp_peers) {
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
if (NULL == ucx_ctx->ucp_peers) {
goto error;
}
if (mca_spml_ucx.active_array.ctxs_count == 0) {
opal_progress_register(spml_ucx_ctx_progress);
}
for (i = 0; i < nprocs; i++) {
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
&ctx_item->ctx.ucp_peers[i].ucp_conn);
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
&ucx_ctx->ucp_peers[i].ucp_conn);
if (UCS_OK != err) {
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
ucs_status_string(err));
goto error2;
}
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
mkey->u.data,
&ucx_mkey->rkey);
if (UCS_OK != err) {
SPML_UCX_ERROR("failed to unpack rkey");
goto error2;
}
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
}
}
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
(*ctx) = (shmem_ctx_t)ucx_ctx;
return OSHMEM_SUCCESS;
error2:
for (i = 0; i < nprocs; i++) {
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
if (ucx_ctx->ucp_peers[i].ucp_conn) {
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
}
}
if (ctx_item->ctx.ucp_peers)
free(ctx_item->ctx.ucp_peers);
if (ucx_ctx->ucp_peers)
free(ucx_ctx->ucp_peers);
error:
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
OBJ_RELEASE(ctx_item);
ucp_worker_destroy(ucx_ctx->ucp_worker);
free(ucx_ctx);
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
SPML_ERROR("ctx create FAILED rc=%d", rc);
return rc;
@ -602,29 +639,16 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
{
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
size_t i, nprocs = oshmem_num_procs();
MCA_SPML_CALL(quiet(ctx));
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
/* delete context object from list */
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
mca_spml_ucx_ctx_list_item_t) {
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
for (i = 0; i < nprocs; i++) {
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
}
free(ctx_item->ctx.ucp_peers);
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
OBJ_RELEASE(ctx_item);
break;
}
}
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
if (!mca_spml_ucx.active_array.ctxs_count) {
opal_progress_unregister(spml_ucx_ctx_progress);
}
}
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)
@ -638,7 +662,7 @@ int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_add
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_GET_NB
request = ucp_get_nb(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
@ -657,7 +681,7 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_
spml_ucx_mkey_t *ucx_mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, src, src_addr, &rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
@ -675,7 +699,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
@ -694,7 +718,7 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
spml_ucx_mkey_t *ucx_mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
ucx_mkey = mca_spml_ucx_get_mkey(ucx_ctx, dst, dst_addr, &rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

Просмотреть файл

@ -75,13 +75,13 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
struct mca_spml_ucx_ctx_list_item {
opal_list_item_t super;
mca_spml_ucx_ctx_t ctx;
};
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(int pe, void *va, void **rva);
typedef struct mca_spml_ucx_ctx_array {
int ctxs_count;
int ctxs_num;
mca_spml_ucx_ctx_t **ctxs;
} mca_spml_ucx_ctx_array_t;
struct mca_spml_ucx {
mca_spml_base_module_t super;
@ -91,7 +91,8 @@ struct mca_spml_ucx {
bool enabled;
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
char **remote_addrs_tbl;
opal_list_t ctx_list;
mca_spml_ucx_ctx_array_t active_array;
mca_spml_ucx_ctx_array_t idle_array;
int priority; /* component priority */
shmem_internal_mutex_t internal_mutex;
};
@ -143,7 +144,7 @@ extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys);
extern void mca_spml_ucx_memuse_hook(void *addr, size_t length);
extern void mca_spml_ucx_rmkey_unpack(sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id);
extern void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id);
extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey);
extern void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *, int pe);
@ -151,20 +152,26 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
extern int spml_ucx_progress(void);
extern int spml_ucx_default_progress(void);
extern int spml_ucx_ctx_progress(void);
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
{
ucp_peer_t *peer;
peer = &(ucx_ctx->ucp_peers[dst_pe]);
mkey_segment_init(&peer->mkeys[segno].super, mkey, segno);
}
static inline spml_ucx_mkey_t *
mca_spml_ucx_get_mkey(mca_spml_ucx_ctx_t *ucx_ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module)
mca_spml_ucx_get_mkey(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module)
{
spml_ucx_cached_mkey_t *mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
mkey = ucx_ctx->ucp_peers[pe].mkeys;
mkey = (spml_ucx_cached_mkey_t *)map_segment_find_va(&mkey->super.super, sizeof(*mkey), va);
if (OPAL_UNLIKELY(NULL == mkey)) {
assert(module->get_mkey_slow);
return module->get_mkey_slow(pe, va, rva);
}
assert(mkey != NULL);
*rva = map_segment_va2rva(&mkey->super, va);
return &mkey->key;
}
@ -187,6 +194,9 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
#endif
}
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64
END_C_DECLS
#endif

Просмотреть файл

@ -109,7 +109,16 @@ static int mca_spml_ucx_component_register(void)
return OSHMEM_SUCCESS;
}
int spml_ucx_progress(void)
int spml_ucx_ctx_progress(void)
{
int i;
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
}
return 1;
}
int spml_ucx_default_progress(void)
{
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
return 1;
@ -168,7 +177,13 @@ static int spml_ucx_init(void)
oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
}
OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t);
mca_spml_ucx.active_array.ctxs_count = mca_spml_ucx.idle_array.ctxs_count = 0;
mca_spml_ucx.active_array.ctxs_num = mca_spml_ucx.idle_array.ctxs_num = MCA_SPML_UCX_CTXS_ARRAY_SIZE;
mca_spml_ucx.active_array.ctxs = calloc(mca_spml_ucx.active_array.ctxs_num,
sizeof(mca_spml_ucx_ctx_t *));
mca_spml_ucx.idle_array.ctxs = calloc(mca_spml_ucx.idle_array.ctxs_num,
sizeof(mca_spml_ucx_ctx_t *));
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
@ -217,19 +232,91 @@ mca_spml_ucx_component_init(int* priority,
return &mca_spml_ucx.super;
}
static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
{
int i, j, nprocs = oshmem_num_procs();
opal_common_ucx_del_proc_t *del_procs;
del_procs = malloc(sizeof(*del_procs) * nprocs);
for (i = 0; i < nprocs; ++i) {
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
if (ctx->ucp_peers[i].mkeys[j].key.rkey != NULL) {
ucp_rkey_destroy(ctx->ucp_peers[i].mkeys[j].key.rkey);
}
}
del_procs[i].ep = ctx->ucp_peers[i].ucp_conn;
del_procs[i].vpid = i;
ctx->ucp_peers[i].ucp_conn = NULL;
}
opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
mca_spml_ucx.num_disconnect,
ctx->ucp_worker);
free(del_procs);
free(ctx->ucp_peers);
}
static int mca_spml_ucx_component_fini(void)
{
opal_progress_unregister(spml_ucx_progress);
if (mca_spml_ucx_ctx_default.ucp_worker) {
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
int fenced = 0, i;
int ret = OSHMEM_SUCCESS;
opal_progress_unregister(spml_ucx_default_progress);
if (mca_spml_ucx.active_array.ctxs_count) {
opal_progress_unregister(spml_ucx_ctx_progress);
}
if(!mca_spml_ucx.enabled)
return OSHMEM_SUCCESS; /* never selected.. return success.. */
/* delete context objects from list */
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
_ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
}
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
_ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
}
ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
if (OPAL_SUCCESS != ret) {
return ret;
}
while (!fenced) {
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
ucp_worker_progress(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
}
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
ucp_worker_progress(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
}
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
}
/* delete all workers */
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
ucp_worker_destroy(mca_spml_ucx.active_array.ctxs[i]->ucp_worker);
free(mca_spml_ucx.active_array.ctxs[i]);
}
for (i = 0; i < mca_spml_ucx.idle_array.ctxs_count; i++) {
ucp_worker_destroy(mca_spml_ucx.idle_array.ctxs[i]->ucp_worker);
free(mca_spml_ucx.idle_array.ctxs[i]);
}
if (mca_spml_ucx_ctx_default.ucp_worker) {
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
}
mca_spml_ucx.enabled = false; /* not anymore */
OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list));
free(mca_spml_ucx.active_array.ctxs);
free(mca_spml_ucx.idle_array.ctxs);
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
if (mca_spml_ucx.ucp_context) {

Просмотреть файл

@ -31,7 +31,8 @@ int shmem_addr_accessible(const void *addr, int pe)
RUNTIME_CHECK_INIT();
for (i = 0; i < mca_memheap_base_num_transports(); i++) {
mkey = mca_memheap_base_get_cached_mkey(pe, (void *)addr, i, &rva);
/* TODO: iterate on all ctxs, try to get cached mkey */
mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)addr, i, &rva);
if (mkey) {
return 1;
}

Просмотреть файл

@ -52,7 +52,8 @@ void *shmem_ptr(const void *dst_addr, int pe)
}
for (i = 0; i < mca_memheap_base_num_transports(); i++) {
mkey = mca_memheap_base_get_cached_mkey(pe, (void *)dst_addr, i, &rva);
/* TODO: iterate on all ctxs, try to get cached mkeys */
mkey = mca_memheap_base_get_cached_mkey(oshmem_ctx_default, pe, (void *)dst_addr, i, &rva);
if (!mkey) {
continue;
}