1
1

Replace usage of the deprecated NB API of UCX with NBX

Signed-off-by: Leonid Genkin <lgenkin@nvidia.com>
This commit is contained in:
Leonid Genkin 2020-11-05 13:41:21 +02:00
parent 62a76c9e7d
commit 7f9a305a64
4 changed files with 180 additions and 28 deletions

View File

@ -109,7 +109,8 @@ AC_DEFUN([OMPI_CHECK_UCX],[
[have ucp_tag_send_nbr()])], [],
[#include <ucp/api/ucp.h>])
AC_CHECK_DECLS([ucp_ep_flush_nb, ucp_worker_flush_nb,
ucp_request_check_status, ucp_put_nb, ucp_get_nb],
ucp_request_check_status, ucp_put_nb, ucp_get_nb,
ucp_put_nbx, ucp_get_nbx, ucp_atomic_op_nbx],
[], [],
[#include <ucp/api/ucp.h>])
AC_CHECK_DECLS([ucm_test_events,

View File

@ -31,6 +31,14 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
ucp_request_param_t param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FIELD_REPLY_BUFFER,
.datatype = ucp_dt_make_contig(size),
.reply_buffer = prev
};
#endif
if ((8 != size) && (4 != size)) {
ATOMIC_ERROR("[#%d] Type size must be 4 or 8 bytes.", my_pe);
@ -41,15 +49,25 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
*prev = value;
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
status_ptr = ucp_atomic_op_nbx(ucx_ctx->ucp_peers[pe].ucp_conn,
UCP_ATOMIC_OP_CSWAP, &cond, 1, rva,
ucx_mkey->rkey, &param);
#else
status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
#endif
if (OPAL_LIKELY(!UCS_PTR_IS_ERR(status_ptr))) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}
return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
"ucp_atomic_op_nbx");
#else
"ucp_atomic_fetch_nb");
#endif
}

View File

@ -18,6 +18,17 @@
#include "oshmem/proc/proc.h"
#include "atomic_ucx.h"
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
/*
* A static params array, for datatypes of size 4 and 8. "size >> 3" is used to
* access the corresponding offset.
*/
static ucp_request_param_t mca_spml_ucp_request_params[] = {
{.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE, .datatype = ucp_dt_make_contig(4)},
{.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE, .datatype = ucp_dt_make_contig(8)}
};
#endif
/*
* Initial query function that is invoked during initialization, allowing
* this module to indicate what level of thread support it provides.
@ -38,20 +49,37 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx,
uint64_t value,
size_t size,
int pe,
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
ucp_atomic_op_t op)
#else
ucp_atomic_post_op_t op)
#endif
{
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
ucs_status_ptr_t status_ptr;
#endif
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
status_ptr = ucp_atomic_op_nbx(ucx_ctx->ucp_peers[pe].ucp_conn,
op, &value, 1, rva, ucx_mkey->rkey,
&mca_spml_ucp_request_params[size >> 3]);
if (OPAL_LIKELY(!UCS_PTR_IS_ERR(status_ptr))) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}
status = UCS_PTR_STATUS(status_ptr);
#else
status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, size, rva,
ucx_mkey->rkey);
#endif
if (OPAL_LIKELY(UCS_OK == status)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
}
@ -66,22 +94,41 @@ int mca_atomic_ucx_fop(shmem_ctx_t ctx,
uint64_t value,
size_t size,
int pe,
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
ucp_atomic_op_t op)
#else
ucp_atomic_fetch_op_t op)
#endif
{
ucs_status_ptr_t status_ptr;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
ucp_request_param_t param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FIELD_REPLY_BUFFER,
.datatype = ucp_dt_make_contig(size),
.reply_buffer = prev
};
#endif
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(ctx, pe, target, (void *)&rva, mca_spml_self);
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
status_ptr = ucp_atomic_op_nbx(ucx_ctx->ucp_peers[pe].ucp_conn, op, &value, 1,
rva, ucx_mkey->rkey, &param);
return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
"ucp_atomic_op_nbx");
#else
status_ptr = ucp_atomic_fetch_nb(ucx_ctx->ucp_peers[pe].ucp_conn,
op, value, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker[0],
"ucp_atomic_fetch_nb");
#endif
}
static int mca_atomic_ucx_add(shmem_ctx_t ctx,
@ -90,7 +137,11 @@ static int mca_atomic_ucx_add(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_OP_ADD);
#else
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_ADD);
#endif
}
static int mca_atomic_ucx_and(shmem_ctx_t ctx,
@ -99,7 +150,9 @@ static int mca_atomic_ucx_and(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_POST_OP_AND
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_OP_AND);
#elif HAVE_DECL_UCP_ATOMIC_POST_OP_AND
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_AND);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -112,7 +165,9 @@ static int mca_atomic_ucx_or(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_POST_OP_OR
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_OP_OR);
#elif HAVE_DECL_UCP_ATOMIC_POST_OP_OR
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_OR);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -125,7 +180,9 @@ static int mca_atomic_ucx_xor(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_POST_OP_XOR
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_OP_XOR);
#elif HAVE_DECL_UCP_ATOMIC_POST_OP_XOR
return mca_atomic_ucx_op(ctx, target, value, size, pe, UCP_ATOMIC_POST_OP_XOR);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -139,7 +196,11 @@ static int mca_atomic_ucx_fadd(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_OP_ADD);
#else
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FADD);
#endif
}
static int mca_atomic_ucx_fand(shmem_ctx_t ctx,
@ -149,7 +210,9 @@ static int mca_atomic_ucx_fand(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FAND
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_OP_AND);
#elif HAVE_DECL_UCP_ATOMIC_FETCH_OP_FAND
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FAND);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -163,7 +226,9 @@ static int mca_atomic_ucx_for(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FOR
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_OP_OR);
#elif HAVE_DECL_UCP_ATOMIC_FETCH_OP_FOR
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FOR);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -177,7 +242,9 @@ static int mca_atomic_ucx_fxor(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_FETCH_OP_FXOR
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_OP_XOR);
#elif HAVE_DECL_UCP_ATOMIC_FETCH_OP_FXOR
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_FXOR);
#else
return OSHMEM_ERR_NOT_IMPLEMENTED;
@ -191,7 +258,11 @@ static int mca_atomic_ucx_swap(shmem_ctx_t ctx,
size_t size,
int pe)
{
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_OP_SWAP);
#else
return mca_atomic_ucx_fop(ctx, target, prev, value, size, pe, UCP_ATOMIC_FETCH_OP_SWAP);
#endif
}

View File

@ -87,6 +87,10 @@ mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
.options = 0
};
#if HAVE_DECL_UCP_ATOMIC_OP_NBX
static ucp_request_param_t mca_spml_ucx_request_param = {0};
#endif
int mca_spml_ucx_enable(bool enable)
{
SPML_UCX_VERBOSE(50, "*** ucx ENABLED ****");
@ -810,16 +814,19 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)
{
void *rva;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_GET_NB
#if (HAVE_DECL_UCP_GET_NBX || HAVE_DECL_UCP_GET_NB)
ucs_status_ptr_t request;
#else
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_GET_NB
#if HAVE_DECL_UCP_GET_NBX
request = ucp_get_nbx(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, &mca_spml_ucx_request_param);
return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_get_nbx");
#elif HAVE_DECL_UCP_GET_NB
request = ucp_get_nb(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_get_nb");
@ -834,13 +841,25 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_
{
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_GET_NBX
ucs_status_ptr_t status_ptr;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_GET_NBX
status_ptr = ucp_get_nbx(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, &mca_spml_ucx_request_param);
if (UCS_PTR_IS_PTR(status_ptr)) {
ucp_request_free(status_ptr);
status = UCS_INPROGRESS;
} else {
status = UCS_PTR_STATUS(status_ptr);
}
#else
status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
return ucx_status_to_oshmem_nb(status);
}
@ -849,12 +868,26 @@ int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx, void *src_addr, size_t size,
unsigned int i;
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_GET_NBX
ucs_status_ptr_t status_ptr;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_GET_NBX
status_ptr = ucp_get_nbx(ucx_ctx->ucp_peers[src].ucp_conn,
dst_addr, size, (uint64_t)rva,
ucx_mkey->rkey, &mca_spml_ucx_request_param);
if (UCS_PTR_IS_PTR(status_ptr)) {
ucp_request_free(status_ptr);
status = UCS_INPROGRESS;
} else {
status = UCS_PTR_STATUS(status_ptr);
}
#else
status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
if (++ucx_ctx->nb_progress_cnt > mca_spml_ucx.nb_get_progress_thresh) {
for (i = 0; i < mca_spml_ucx.nb_ucp_worker_progress; i++) {
@ -871,17 +904,20 @@ int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx, void *src_addr, size_t size,
int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst)
{
void *rva;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
int res;
#if HAVE_DECL_UCP_PUT_NB
#if (HAVE_DECL_UCP_PUT_NBX || HAVE_DECL_UCP_PUT_NB)
ucs_status_ptr_t request;
#else
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_PUT_NB
#if HAVE_DECL_UCP_PUT_NBX
request = ucp_put_nbx(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, &mca_spml_ucx_request_param);
res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_put_nbx");
#elif HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker[0], "ucp_put_nb");
@ -901,14 +937,27 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
{
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
ucs_status_t status;
#if HAVE_DECL_UCP_PUT_NBX
ucs_status_ptr_t status_ptr;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_PUT_NBX
status_ptr = ucp_put_nbx(ucx_ctx->ucp_peers[dst].ucp_conn,
src_addr, size, (uint64_t)rva,
ucx_mkey->rkey, &mca_spml_ucx_request_param);
if (UCS_PTR_IS_PTR(status_ptr)) {
ucp_request_free(status_ptr);
status = UCS_INPROGRESS;
} else {
status = UCS_PTR_STATUS(status_ptr);
}
#else
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
if (OPAL_LIKELY(status >= 0)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}
@ -921,13 +970,26 @@ int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx, void* dst_addr, size_t size,
unsigned int i;
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
spml_ucx_mkey_t *ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
#if HAVE_DECL_UCP_PUT_NBX
ucs_status_ptr_t status_ptr;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_PUT_NBX
status_ptr = ucp_put_nbx(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey,
&mca_spml_ucx_request_param);
if (UCS_PTR_IS_PTR(status_ptr)) {
ucp_request_free(status_ptr);
status = UCS_INPROGRESS;
} else {
status = UCS_PTR_STATUS(status_ptr);
}
#else
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
if (OPAL_LIKELY(status >= 0)) {
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
}