1
1

SPML/UCX: added synchronized flush on quiet

- added synchronized flush operation on quiet call.
- flush is implemented using get operation

Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
(cherry picked from commit 0b108411f8)
This commit is contained in:
Sergey Oblomov 2019-05-20 15:17:30 +03:00
parent 4a7f6a4e2d
commit 69923e78c7
5 changed files with 115 additions and 4 deletions

View file

@@ -45,6 +45,11 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
if (OPAL_LIKELY(!UCS_PTR_IS_ERR(status_ptr))) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}
return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker,
"ucp_atomic_fetch_nb");
}

View file

@@ -51,6 +51,11 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx,
status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, size, rva,
ucx_mkey->rkey);
if (OPAL_LIKELY(UCS_OK == status)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}
return ucx_status_to_oshmem(status);
}

View file

@@ -80,7 +80,8 @@ mca_spml_ucx_t mca_spml_ucx = {
.num_disconnect = 1,
.heap_reg_nb = 0,
.enabled = 0,
.get_mkey_slow = NULL
.get_mkey_slow = NULL,
.synchronized_quiet = false
};
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
@@ -216,6 +217,40 @@ static void dump_address(int pe, char *addr, size_t len)
static char spml_ucx_transport_ids[1] = { 0 };
/**
 * Allocate the per-context bookkeeping used by synchronized quiet: a
 * bitmap marking which PEs have puts outstanding, plus a dense index
 * array so quiet can iterate only over those PEs.
 *
 * @param ctx     SPML context whose tracking state is initialized
 * @param nprocs  number of PEs that may be targeted from this context
 * @return OSHMEM_SUCCESS on success, OSHMEM_ERR_OUT_OF_RESOURCE if the
 *         index array cannot be allocated, or the error code returned
 *         by opal_bitmap_init()
 */
int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs)
{
    int res;

    /* Put the tracking fields into a known state unconditionally, so
     * mca_spml_ucx_clear_put_op_mask() (which tests put_proc_indexes)
     * is safe even when synchronized_quiet is disabled or the context
     * memory was not zero-initialized. */
    ctx->put_proc_indexes = NULL;
    ctx->put_proc_count   = 0;

    if (mca_spml_ucx.synchronized_quiet) {
        ctx->put_proc_indexes = malloc(nprocs * sizeof(*ctx->put_proc_indexes));
        if (NULL == ctx->put_proc_indexes) {
            return OSHMEM_ERR_OUT_OF_RESOURCE;
        }

        OBJ_CONSTRUCT(&ctx->put_op_bitmap, opal_bitmap_t);
        res = opal_bitmap_init(&ctx->put_op_bitmap, nprocs);
        if (OPAL_SUCCESS != res) {
            /* Roll back the partial allocation on bitmap failure. */
            free(ctx->put_proc_indexes);
            ctx->put_proc_indexes = NULL;
            return res;
        }
    }

    return OSHMEM_SUCCESS;
}
/**
 * Release the synchronized-quiet bookkeeping allocated by
 * mca_spml_ucx_init_put_op_mask().
 *
 * Safe to call multiple times and on contexts where the mask was never
 * allocated: the put_proc_indexes guard skips contexts without state.
 *
 * @param ctx  SPML context whose tracking state is torn down
 * @return OSHMEM_SUCCESS (always)
 */
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx)
{
    if (mca_spml_ucx.synchronized_quiet && ctx->put_proc_indexes) {
        OBJ_DESTRUCT(&ctx->put_op_bitmap);
        free(ctx->put_proc_indexes);
        /* NULL the pointer so a repeated call (error path in add_procs
         * or ctx_create followed by regular context cleanup) does not
         * double-free. */
        ctx->put_proc_indexes = NULL;
    }

    return OSHMEM_SUCCESS;
}
int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
{
size_t i, j, n;
@@ -235,6 +270,11 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
goto error;
}
rc = mca_spml_ucx_init_put_op_mask(&mca_spml_ucx_ctx_default, nprocs);
if (OSHMEM_SUCCESS != rc) {
goto error;
}
err = ucp_worker_get_address(mca_spml_ucx_ctx_default.ucp_worker, &wk_local_addr, &wk_addr_len);
if (err != UCS_OK) {
goto error;
@@ -297,6 +337,8 @@ error2:
free(mca_spml_ucx.remote_addrs_tbl[i]);
}
}
mca_spml_ucx_clear_put_op_mask(&mca_spml_ucx_ctx_default);
if (mca_spml_ucx_ctx_default.ucp_peers)
free(mca_spml_ucx_ctx_default.ucp_peers);
if (mca_spml_ucx.remote_addrs_tbl)
@@ -583,6 +625,11 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
goto error;
}
rc = mca_spml_ucx_init_put_op_mask(ucx_ctx, nprocs);
if (OSHMEM_SUCCESS != rc) {
goto error2;
}
for (i = 0; i < nprocs; i++) {
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
@@ -621,6 +668,8 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
}
}
mca_spml_ucx_clear_put_op_mask(ucx_ctx);
if (ucx_ctx->ucp_peers)
free(ucx_ctx->ucp_peers);
@@ -715,6 +764,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
void *rva;
spml_ucx_mkey_t *ucx_mkey;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
int res;
#if HAVE_DECL_UCP_PUT_NB
ucs_status_ptr_t request;
#else
@@ -725,12 +775,18 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
#if HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
#else
status = ucp_put(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
return ucx_status_to_oshmem(status);
res = ucx_status_to_oshmem(status);
#endif
if (OPAL_LIKELY(OSHMEM_SUCCESS == res)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}
return res;
}
int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
@@ -744,6 +800,10 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
if (OPAL_LIKELY(status >= 0)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}
return ucx_status_to_oshmem_nb(status);
}
@@ -767,9 +827,28 @@ int mca_spml_ucx_fence(shmem_ctx_t ctx)
int mca_spml_ucx_quiet(shmem_ctx_t ctx)
{
int flush_get_data;
int ret;
unsigned i;
int idx;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
if (mca_spml_ucx.synchronized_quiet) {
for (i = 0; i < ucx_ctx->put_proc_count; i++) {
idx = ucx_ctx->put_proc_indexes[i];
ret = mca_spml_ucx_get_nb(ctx,
ucx_ctx->ucp_peers[idx].mkeys->super.super.va_base,
sizeof(flush_get_data), &flush_get_data, idx, NULL);
if (OMPI_SUCCESS != ret) {
oshmem_shmem_abort(-1);
return ret;
}
opal_bitmap_clear_bit(&ucx_ctx->put_op_bitmap, idx);
}
ucx_ctx->put_proc_count = 0;
}
opal_atomic_wmb();
ret = opal_common_ucx_worker_flush(ucx_ctx->ucp_worker);

View file

@@ -33,6 +33,7 @@
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_bitmap.h"
#include "orte/runtime/orte_globals.h"
#include "opal/mca/common/ucx/common_ucx.h"
@@ -70,6 +71,9 @@ struct mca_spml_ucx_ctx {
ucp_worker_h ucp_worker;
ucp_peer_t *ucp_peers;
long options;
opal_bitmap_t put_op_bitmap;
int *put_proc_indexes;
unsigned put_proc_count;
};
typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
@@ -104,7 +108,7 @@ struct mca_spml_ucx {
mca_spml_ucx_ctx_t *aux_ctx;
pthread_spinlock_t async_lock;
int aux_refcnt;
bool synchronized_quiet;
};
typedef struct mca_spml_ucx mca_spml_ucx_t;
@@ -171,6 +175,9 @@ extern int spml_ucx_ctx_progress(void);
extern int spml_ucx_progress_aux_ctx(void);
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);
int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs);
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx);
static inline void mca_spml_ucx_aux_lock(void)
{
if (mca_spml_ucx.async_progress) {
@@ -224,6 +231,16 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
#endif
}
/* Record that a remote operation was posted to PE `dst` on `ctx`.
 * Only active in synchronized-quiet mode: the first post to a given PE
 * sets its bit in put_op_bitmap and appends the PE to the dense
 * put_proc_indexes array, which quiet later walks to flush. */
static inline void mca_spml_ucx_remote_op_posted(mca_spml_ucx_ctx_t *ctx, int dst)
{
    if (OPAL_LIKELY(!mca_spml_ucx.synchronized_quiet)) {
        return;
    }

    /* Already tracked for this PE — nothing to record. */
    if (opal_bitmap_is_set_bit(&ctx->put_op_bitmap, dst)) {
        return;
    }

    ctx->put_proc_indexes[ctx->put_proc_count++] = dst;
    opal_bitmap_set_bit(&ctx->put_op_bitmap, dst);
}
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64

View file

@@ -128,6 +128,10 @@ static int mca_spml_ucx_component_register(void)
"Asynchronous progress tick granularity (in usec)",
&mca_spml_ucx.async_tick);
mca_spml_ucx_param_register_bool("synchronized_quiet", 0,
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
&mca_spml_ucx.synchronized_quiet);
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
return OSHMEM_SUCCESS;
@@ -329,6 +333,7 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
mca_spml_ucx.num_disconnect,
ctx->ucp_worker);
free(del_procs);
mca_spml_ucx_clear_put_op_mask(ctx);
free(ctx->ucp_peers);
}