oshmem/ucx: Improves performance for non-blocking put/get operations.
Improves performance when a large number of non-blocking operations are
posted, by periodically calling progress on the UCX workers.
Co-authored with:
Artem Y. Polyakov <artemp@mellanox.com>,
Manjunath Gorentla Venkata <manjunath@mellanox.com>
Signed-off-by: Tomislav Janjusic <tomislavj@mellanox.com>
(cherry picked from commit 1b58e3d073)
Этот коммит содержится в:
родитель
629d0efa15
Коммит
9e755d3803
@ -777,6 +777,30 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_
|
|||||||
return ucx_status_to_oshmem_nb(status);
|
return ucx_status_to_oshmem_nb(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Non-blocking get that additionally drives UCX worker progress once enough
 * non-blocking operations have been posted on the context.
 *
 * ctx       context the operation is posted on (used as mca_spml_ucx_ctx_t *)
 * src_addr  address passed to mca_spml_ucx_get_mkey() for PE `src`; the
 *           resulting remote virtual address is what ucp_get_nbi() reads from
 * size      number of bytes handed to ucp_get_nbi()
 * dst_addr  local buffer handed to ucp_get_nbi()
 * src       index into ucx_ctx->ucp_peers[] selecting the peer
 * handle    not used by this function
 *
 * Returns the ucp_get_nbi() status translated by ucx_status_to_oshmem_nb().
 */
int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src, void **handle)
{
    /* Same type as mca_spml_ucx.nb_ucp_worker_progress so the loop bound
     * below is always reachable, whatever value the parameter holds. */
    unsigned long i;
    void *rva;
    ucs_status_t status;
    spml_ucx_mkey_t *ucx_mkey;
    mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;

    ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
    status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
                         (uint64_t)rva, ucx_mkey->rkey);

    /* Count every posted operation; past the threshold, call progress up to
     * nb_ucp_worker_progress times. The counter is reset only when a progress
     * call returns 0, so while every call keeps returning non-zero the next
     * posted operation triggers progress again. */
    if (++ucx_ctx->nb_progress_cnt > mca_spml_ucx.nb_get_progress_thresh) {
        for (i = 0; i < mca_spml_ucx.nb_ucp_worker_progress; i++) {
            if (!ucp_worker_progress(ucx_ctx->ucp_worker)) {
                ucx_ctx->nb_progress_cnt = 0;
                break;
            }
        }
    }

    return ucx_status_to_oshmem_nb(status);
}
|
||||||
|
|
||||||
int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst)
|
int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst)
|
||||||
{
|
{
|
||||||
void *rva;
|
void *rva;
|
||||||
@ -825,7 +849,33 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
|
|||||||
return ucx_status_to_oshmem_nb(status);
|
return ucx_status_to_oshmem_nb(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Non-blocking put that additionally drives UCX worker progress once enough
 * non-blocking operations have been posted on the context.
 *
 * ctx       context the operation is posted on (used as mca_spml_ucx_ctx_t *)
 * dst_addr  address passed to mca_spml_ucx_get_mkey() for PE `dst`; the
 *           resulting remote virtual address is what ucp_put_nbi() writes to
 * size      number of bytes handed to ucp_put_nbi()
 * src_addr  local buffer handed to ucp_put_nbi()
 * dst       index into ucx_ctx->ucp_peers[] selecting the peer
 * handle    not used by this function
 *
 * Returns the ucp_put_nbi() status translated by ucx_status_to_oshmem_nb().
 */
int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
{
    /* Same type as mca_spml_ucx.nb_ucp_worker_progress so the loop bound
     * below is always reachable, whatever value the parameter holds. */
    unsigned long i;
    void *rva;
    ucs_status_t status;
    spml_ucx_mkey_t *ucx_mkey;
    mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;

    ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
    status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
                         (uint64_t)rva, ucx_mkey->rkey);

    /* Record the put against the destination only when posting did not fail
     * (non-negative status). */
    if (OPAL_LIKELY(status >= 0)) {
        mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
    }

    /* Count every posted operation; past the threshold, call progress up to
     * nb_ucp_worker_progress times. The counter is reset only when a progress
     * call returns 0, so while every call keeps returning non-zero the next
     * posted operation triggers progress again. */
    if (++ucx_ctx->nb_progress_cnt > mca_spml_ucx.nb_put_progress_thresh) {
        for (i = 0; i < mca_spml_ucx.nb_ucp_worker_progress; i++) {
            if (!ucp_worker_progress(ucx_ctx->ucp_worker)) {
                ucx_ctx->nb_progress_cnt = 0;
                break;
            }
        }
    }

    return ucx_status_to_oshmem_nb(status);
}
|
||||||
|
|
||||||
int mca_spml_ucx_fence(shmem_ctx_t ctx)
|
int mca_spml_ucx_fence(shmem_ctx_t ctx)
|
||||||
{
|
{
|
||||||
@ -883,6 +933,8 @@ int mca_spml_ucx_quiet(shmem_ctx_t ctx)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ucx_ctx->nb_progress_cnt = 0;
|
||||||
|
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,6 +72,7 @@ struct mca_spml_ucx_ctx {
|
|||||||
ucp_peer_t *ucp_peers;
|
ucp_peer_t *ucp_peers;
|
||||||
long options;
|
long options;
|
||||||
opal_bitmap_t put_op_bitmap;
|
opal_bitmap_t put_op_bitmap;
|
||||||
|
unsigned long nb_progress_cnt;
|
||||||
int *put_proc_indexes;
|
int *put_proc_indexes;
|
||||||
unsigned put_proc_count;
|
unsigned put_proc_count;
|
||||||
};
|
};
|
||||||
@ -109,6 +110,10 @@ struct mca_spml_ucx {
|
|||||||
pthread_spinlock_t async_lock;
|
pthread_spinlock_t async_lock;
|
||||||
int aux_refcnt;
|
int aux_refcnt;
|
||||||
bool synchronized_quiet;
|
bool synchronized_quiet;
|
||||||
|
unsigned long nb_progress_thresh_global;
|
||||||
|
unsigned long nb_put_progress_thresh;
|
||||||
|
unsigned long nb_get_progress_thresh;
|
||||||
|
unsigned long nb_ucp_worker_progress;
|
||||||
};
|
};
|
||||||
typedef struct mca_spml_ucx mca_spml_ucx_t;
|
typedef struct mca_spml_ucx mca_spml_ucx_t;
|
||||||
|
|
||||||
@ -123,6 +128,7 @@ extern int mca_spml_ucx_get(shmem_ctx_t ctx,
|
|||||||
size_t size,
|
size_t size,
|
||||||
void* src_addr,
|
void* src_addr,
|
||||||
int src);
|
int src);
|
||||||
|
|
||||||
extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
|
extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
|
||||||
void* dst_addr,
|
void* dst_addr,
|
||||||
size_t size,
|
size_t size,
|
||||||
@ -130,6 +136,13 @@ extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
|
|||||||
int src,
|
int src,
|
||||||
void **handle);
|
void **handle);
|
||||||
|
|
||||||
|
/*
 * Non-blocking get variant that also triggers UCX worker progress.
 *
 * NOTE(review): the definition of this function names its second and fourth
 * parameters src_addr and dst_addr respectively, i.e. the reverse of the
 * names used here. C binds arguments by position, so this does not affect
 * callers, but confirm which naming is intended.
 */
extern int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx,
                                         void* dst_addr,
                                         size_t size,
                                         void* src_addr,
                                         int src,
                                         void **handle);
|
||||||
|
|
||||||
extern int mca_spml_ucx_put(shmem_ctx_t ctx,
|
extern int mca_spml_ucx_put(shmem_ctx_t ctx,
|
||||||
void* dst_addr,
|
void* dst_addr,
|
||||||
size_t size,
|
size_t size,
|
||||||
@ -143,6 +156,13 @@ extern int mca_spml_ucx_put_nb(shmem_ctx_t ctx,
|
|||||||
int dst,
|
int dst,
|
||||||
void **handle);
|
void **handle);
|
||||||
|
|
||||||
|
/*
 * Non-blocking put variant that also triggers UCX worker progress.
 */
extern int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx,
                                         void* dst_addr,
                                         size_t size,
                                         void* src_addr,
                                         int dst,
                                         void **handle);
|
||||||
|
|
||||||
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
|
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
|
||||||
extern int mca_spml_ucx_send(void* buf,
|
extern int mca_spml_ucx_send(void* buf,
|
||||||
size_t size,
|
size_t size,
|
||||||
|
@ -60,6 +60,20 @@ mca_spml_base_component_2_0_0_t mca_spml_ucx_component = {
|
|||||||
.spmlm_finalize = mca_spml_ucx_component_fini
|
.spmlm_finalize = mca_spml_ucx_component_fini
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static inline void mca_spml_ucx_param_register_ulong(const char* param_name,
|
||||||
|
unsigned long default_value,
|
||||||
|
const char *help_msg,
|
||||||
|
unsigned long *storage)
|
||||||
|
{
|
||||||
|
*storage = default_value;
|
||||||
|
(void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
|
||||||
|
param_name,
|
||||||
|
help_msg,
|
||||||
|
MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, 0, 0,
|
||||||
|
OPAL_INFO_LVL_9,
|
||||||
|
MCA_BASE_VAR_SCOPE_READONLY,
|
||||||
|
storage);
|
||||||
|
}
|
||||||
|
|
||||||
static inline void mca_spml_ucx_param_register_int(const char* param_name,
|
static inline void mca_spml_ucx_param_register_int(const char* param_name,
|
||||||
int default_value,
|
int default_value,
|
||||||
@ -132,6 +146,22 @@ static int mca_spml_ucx_component_register(void)
|
|||||||
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
|
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
|
||||||
&mca_spml_ucx.synchronized_quiet);
|
&mca_spml_ucx.synchronized_quiet);
|
||||||
|
|
||||||
|
mca_spml_ucx_param_register_ulong("nb_progress_thresh_global", 0,
|
||||||
|
"Number of nb_put or nb_get operations before ucx progress is triggered. Disabled by default (0)",
|
||||||
|
&mca_spml_ucx.nb_progress_thresh_global);
|
||||||
|
|
||||||
|
mca_spml_ucx_param_register_ulong("nb_put_progress_thresh", mca_spml_ucx.nb_progress_thresh_global,
|
||||||
|
"Number of nb_put operations before ucx progress is triggered. Disabled by default (0), setting this value will override nb_progress_thresh_global",
|
||||||
|
&mca_spml_ucx.nb_put_progress_thresh);
|
||||||
|
|
||||||
|
mca_spml_ucx_param_register_ulong("nb_get_progress_thresh", mca_spml_ucx.nb_progress_thresh_global,
|
||||||
|
"Number of nb_get operations before ucx progress is triggered. Disabled by default (0), setting this value will override nb_progress_thresh_global ",
|
||||||
|
&mca_spml_ucx.nb_get_progress_thresh);
|
||||||
|
|
||||||
|
mca_spml_ucx_param_register_ulong("nb_ucp_worker_progress", 32,
|
||||||
|
"Maximum number of ucx worker progress calls if triggered during nb_put or nb_get",
|
||||||
|
&mca_spml_ucx.nb_ucp_worker_progress);
|
||||||
|
|
||||||
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
|
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
|
||||||
|
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
@ -294,6 +324,13 @@ static int spml_ucx_init(void)
|
|||||||
mca_spml_ucx.aux_ctx = NULL;
|
mca_spml_ucx.aux_ctx = NULL;
|
||||||
mca_spml_ucx.aux_refcnt = 0;
|
mca_spml_ucx.aux_refcnt = 0;
|
||||||
|
|
||||||
|
if (mca_spml_ucx.nb_put_progress_thresh) {
|
||||||
|
mca_spml_ucx.super.spml_put_nb = &mca_spml_ucx_put_nb_wprogress;
|
||||||
|
}
|
||||||
|
if (mca_spml_ucx.nb_get_progress_thresh) {
|
||||||
|
mca_spml_ucx.super.spml_get_nb = &mca_spml_ucx_get_nb_wprogress;
|
||||||
|
}
|
||||||
|
|
||||||
oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
|
oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
|
||||||
|
|
||||||
return OSHMEM_SUCCESS;
|
return OSHMEM_SUCCESS;
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user