diff --git a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c
index 8014ac6f23..a5b392568b 100644
--- a/ompi/mca/osc/ucx/osc_ucx_component.c
+++ b/ompi/mca/osc/ucx/osc_ucx_component.c
@@ -17,8 +17,6 @@
 #include "osc_ucx.h"
 #include "osc_ucx_request.h"
 
-#define UCX_VERSION(_major, _minor, _build) (((_major) * 100) + (_minor))
-
 #define memcpy_off(_dst, _src, _len, _off) \
     memcpy(((char*)(_dst)) + (_off), _src, _len); \
     (_off) += (_len);
diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h
index 7ea91ab729..202131ac89 100644
--- a/opal/mca/common/ucx/common_ucx.h
+++ b/opal/mca/common/ucx/common_ucx.h
@@ -34,6 +34,8 @@ BEGIN_C_DECLS
 #  define MCA_COMMON_UCX_ASSERT(_x)
 #endif
 
+#define UCX_VERSION(_major, _minor, _build) (((_major) * 100) + (_minor))
+
 #define _MCA_COMMON_UCX_QUOTE(_x) \
     # _x
 #define MCA_COMMON_UCX_QUOTE(_x) \
diff --git a/oshmem/include/shmemx.h b/oshmem/include/shmemx.h
index b3b55cb7dd..d99ca11533 100644
--- a/oshmem/include/shmemx.h
+++ b/oshmem/include/shmemx.h
@@ -168,6 +168,21 @@ OSHMEM_DECLSPEC void shmemx_int16_prod_to_all(int16_t *target, const int16_t *so
 OSHMEM_DECLSPEC void shmemx_int32_prod_to_all(int32_t *target, const int32_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int32_t *pWrk, long *pSync);
 OSHMEM_DECLSPEC void shmemx_int64_prod_to_all(int64_t *target, const int64_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int64_t *pWrk, long *pSync);
 
+/* shmemx_alltoall_global_nb is a nonblocking collective routine, where each PE
+ * exchanges "size" bytes of data with all other PEs in the OpenSHMEM job.
+ *
+ * @param dest A symmetric data object that is large enough to receive
+ * "size" bytes of data from each PE in the OpenSHMEM job.
+ * @param source A symmetric data object that contains "size" bytes of data
+ * for each PE in the OpenSHMEM job.
+ * @param size The number of bytes to be sent to each PE in the job.
+ * @param counter A symmetric data object to be atomically incremented after
+ * the target buffer is updated.
+ *
+ * @return OSHMEM_SUCCESS or failure status.
+ */
+OSHMEM_DECLSPEC void shmemx_alltoall_global_nb(void *dest, const void *source, size_t size, long *counter);
+
 /*
  * Backward compatibility section
  */
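Usage sketch (illustrative, not part of the patch): the example below shows how an application might call the new routine and then wait for completion through the counter, as the header comment above suggests. The buffer size, the fill pattern, and the initial barrier are assumptions of this example; shmem_malloc, shmem_barrier_all, and shmem_wait_until are standard OpenSHMEM calls. Based on mca_spml_ucx_put_all_nb later in this patch, the counter on each PE is incremented once per contributing PE, so it reaches the number of PEs when the local target buffer is complete.

#include <shmem.h>
#include <shmemx.h>
#include <string.h>

int main(void)
{
    shmem_init();

    int    npes = shmem_n_pes();
    int    me   = shmem_my_pe();
    size_t size = 64;                            /* bytes contributed per PE */

    /* Symmetric buffers: each PE receives "size" bytes from every PE. */
    char *src     = shmem_malloc(size * npes);
    char *dst     = shmem_malloc(size * npes);
    long *counter = shmem_malloc(sizeof(long));

    memset(src, me & 0xff, size * npes);
    *counter = 0;
    shmem_barrier_all();    /* no PE may bump the counter before it is zeroed */

    /* Nonblocking global all-to-all; the call returns immediately. */
    shmemx_alltoall_global_nb(dst, src, size, counter);

    /* Completion: every PE's contribution increments the local counter once. */
    shmem_wait_until(counter, SHMEM_CMP_EQ, npes);

    shmem_free(counter);
    shmem_free(dst);
    shmem_free(src);
    shmem_finalize();
    return 0;
}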
diff --git a/oshmem/mca/spml/base/base.h b/oshmem/mca/spml/base/base.h
index e3ec1b6855..75a4eaec18 100644
--- a/oshmem/mca/spml/base/base.h
+++ b/oshmem/mca/spml/base/base.h
@@ -93,6 +93,10 @@ OSHMEM_DECLSPEC int mca_spml_base_get_nb(void *dst_addr,
                                          void **handle);
 
 OSHMEM_DECLSPEC void mca_spml_base_memuse_hook(void *addr, size_t length);
+
+OSHMEM_DECLSPEC int mca_spml_base_put_all_nb(void *target, const void *source,
+                                             size_t size, long *counter);
+
 /*
  * MCA framework
  */
diff --git a/oshmem/mca/spml/base/spml_base.c b/oshmem/mca/spml/base/spml_base.c
index 8ef3894e98..52ca7b4d61 100644
--- a/oshmem/mca/spml/base/spml_base.c
+++ b/oshmem/mca/spml/base/spml_base.c
@@ -280,3 +280,9 @@ int mca_spml_base_get_nb(void *dst_addr, size_t size,
 void mca_spml_base_memuse_hook(void *addr, size_t length)
 {
 }
+
+int mca_spml_base_put_all_nb(void *target, const void *source,
+                             size_t size, long *counter)
+{
+    return OSHMEM_ERR_NOT_IMPLEMENTED;
+}
diff --git a/oshmem/mca/spml/ikrit/spml_ikrit.c b/oshmem/mca/spml/ikrit/spml_ikrit.c
index 523baf7763..1cff194b4a 100644
--- a/oshmem/mca/spml/ikrit/spml_ikrit.c
+++ b/oshmem/mca/spml/ikrit/spml_ikrit.c
@@ -181,6 +181,7 @@ mca_spml_ikrit_t mca_spml_ikrit = {
         mca_spml_base_rmkey_free,
         mca_spml_base_rmkey_ptr,
         mca_spml_base_memuse_hook,
+        mca_spml_base_put_all_nb,
         (void*)&mca_spml_ikrit
     },
 
diff --git a/oshmem/mca/spml/spml.h b/oshmem/mca/spml/spml.h
index fa992db91c..ca62b5f0bd 100644
--- a/oshmem/mca/spml/spml.h
+++ b/oshmem/mca/spml/spml.h
@@ -314,6 +314,35 @@ typedef int (*mca_spml_base_module_send_fn_t)(void *buf,
                                               int dst,
                                               mca_spml_base_put_mode_t mode);
 
+/**
+ * The routine transfers the data asynchronously from the source PE to all
+ * PEs in the OpenSHMEM job. The routine returns immediately. The source and
+ * target buffers are reusable only after the completion of the routine.
+ * After the data is transferred to the target buffers, the counter object
+ * is updated atomically. The counter object can be read either with atomic
+ * operations such as shmem_atomic_fetch or with point-to-point synchronization
+ * routines such as shmem_wait_until and shmem_test.
+ *
+ * shmem_quiet may be used to complete the operation, but it is not required
+ * for progress or completion. In a multithreaded OpenSHMEM program, the user
+ * (the OpenSHMEM program) is responsible for the correct ordering of
+ * shmemx_alltoall_global_nb calls.
+ *
+ * @param dest A symmetric data object that is large enough to receive
+ * "size" bytes of data from each PE in the OpenSHMEM job.
+ * @param source A symmetric data object that contains "size" bytes of data
+ * for each PE in the OpenSHMEM job.
+ * @param size The number of bytes to be sent to each PE in the job.
+ * @param counter A symmetric data object to be atomically incremented after
+ * the target buffer is updated.
+ *
+ * @return OSHMEM_SUCCESS or failure status.
+ */
+typedef int (*mca_spml_base_module_put_all_nb_fn_t)(void *dest,
+                                                    const void *source,
+                                                    size_t size,
+                                                    long *counter);
+
 /**
  * Assures ordering of delivery of put() requests
  *
@@ -381,6 +410,7 @@ struct mca_spml_base_module_1_0_0_t {
     mca_spml_base_module_mkey_ptr_fn_t spml_rmkey_ptr;
 
     mca_spml_base_module_memuse_hook_fn_t spml_memuse_hook;
+    mca_spml_base_module_put_all_nb_fn_t spml_put_all_nb;
     void *self;
 };
 
diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c
index 0522ba0966..bd43baf381 100644
--- a/oshmem/mca/spml/ucx/spml_ucx.c
+++ b/oshmem/mca/spml/ucx/spml_ucx.c
@@ -34,6 +34,7 @@
 #include "oshmem/proc/proc.h"
 #include "oshmem/mca/spml/base/base.h"
 #include "oshmem/mca/spml/base/spml_base_putreq.h"
+#include "oshmem/mca/atomic/atomic.h"
 #include "oshmem/runtime/runtime.h"
 
 #include "orte/util/show_help.h"
@@ -70,6 +71,7 @@ mca_spml_ucx_t mca_spml_ucx = {
         .spml_rmkey_free = mca_spml_ucx_rmkey_free,
         .spml_rmkey_ptr = mca_spml_ucx_rmkey_ptr,
         .spml_memuse_hook = mca_spml_ucx_memuse_hook,
+        .spml_put_all_nb = mca_spml_ucx_put_all_nb,
         .self = (void*)&mca_spml_ucx
     },
 
@@ -442,8 +444,8 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
         ucx_mkey->mem_h = (ucp_mem_h)mem_seg->context;
     }
 
-    status = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h,
-                           &mkeys[0].u.data, &len);
+    status = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h,
+                           &mkeys[0].u.data, &len);
     if (UCS_OK != status) {
         goto error_unmap;
     }
@@ -480,8 +482,6 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
 {
     spml_ucx_mkey_t *ucx_mkey;
     map_segment_t *mem_seg;
-    int segno;
-    int my_pe = oshmem_my_proc_id();
 
     MCA_SPML_CALL(quiet(oshmem_ctx_default));
     if (!mkeys)
@@ -496,7 +496,7 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
     if (OPAL_UNLIKELY(NULL == mem_seg)) {
         return OSHMEM_ERROR;
     }
-    
+
     if (MAP_SEGMENT_ALLOC_UCX != mem_seg->type) {
         ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
     }
@@ -548,17 +548,15 @@ static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx
     opal_atomic_wmb ();
 }
 
-int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
+static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx_ctx_p)
 {
-    mca_spml_ucx_ctx_t *ucx_ctx;
     ucp_worker_params_t params;
     ucp_ep_params_t ep_params;
     size_t i, j, nprocs = oshmem_num_procs();
     ucs_status_t err;
-    int my_pe = oshmem_my_proc_id();
-    size_t len;
     spml_ucx_mkey_t *ucx_mkey;
     sshmem_mkey_t *mkey;
+    mca_spml_ucx_ctx_t *ucx_ctx;
    int rc = OSHMEM_ERROR;
 
     ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
@@ -583,10 +581,6 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
         goto error;
     }
 
-    if (mca_spml_ucx.active_array.ctxs_count == 0) {
-        opal_progress_register(spml_ucx_ctx_progress);
-    }
-
     for (i = 0; i < nprocs; i++) {
         ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
         ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
@@ -612,11 +606,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
         }
     }
 
-    SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
-    _ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
-    SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
+    *ucx_ctx_p = ucx_ctx;
 
-    (*ctx) = (shmem_ctx_t)ucx_ctx;
     return OSHMEM_SUCCESS;
 
 error2:
@@ -637,6 +628,33 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
     return rc;
 }
 
+int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
+{
+    mca_spml_ucx_ctx_t *ucx_ctx;
+    int rc;
+
+    /* Take the lock controlling context creation. The AUX context may set
+     * specific UCX parameters affecting worker creation that are not needed
+     * for regular contexts. */
+    pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
+    rc = mca_spml_ucx_ctx_create_common(options, &ucx_ctx);
+    pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
+    if (rc != OSHMEM_SUCCESS) {
+        return rc;
+    }
+
+    if (mca_spml_ucx.active_array.ctxs_count == 0) {
+        opal_progress_register(spml_ucx_ctx_progress);
+    }
+
+    SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
+    _ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
+    SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
+
+    (*ctx) = (shmem_ctx_t)ucx_ctx;
+    return OSHMEM_SUCCESS;
+}
+
 void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
 {
     MCA_SPML_CALL(quiet(ctx));
@@ -751,6 +769,15 @@ int mca_spml_ucx_quiet(shmem_ctx_t ctx)
         oshmem_shmem_abort(-1);
         return ret;
     }
+
+    /* If put_all_nb operations are being executed asynchronously, we need to
+     * wait for their completion as well. */
+    if (ctx == oshmem_ctx_default) {
+        while (mca_spml_ucx.aux_refcnt) {
+            opal_progress();
+        }
+    }
+
     return OSHMEM_SUCCESS;
 }
 
@@ -788,3 +815,99 @@ int mca_spml_ucx_send(void* buf,
 
     return rc;
 }
+
+/* This can be called with request==NULL in case of immediate completion */
+static void mca_spml_ucx_put_all_complete_cb(void *request, ucs_status_t status)
+{
+    if (mca_spml_ucx.async_progress && (--mca_spml_ucx.aux_refcnt == 0)) {
+        opal_event_evtimer_del(mca_spml_ucx.tick_event);
+        opal_progress_unregister(spml_ucx_progress_aux_ctx);
+    }
+
+    if (request != NULL) {
+        ucp_request_free(request);
+    }
+}
+
+/* Must be called with the AUX lock held */
+static int mca_spml_ucx_create_aux_ctx(void)
+{
+    unsigned major = 0;
+    unsigned minor = 0;
+    unsigned rel_number = 0;
+    int rc;
+    bool rand_dci_supp;
+
+    ucp_get_version(&major, &minor, &rel_number);
+    rand_dci_supp = UCX_VERSION(major, minor, rel_number) >= UCX_VERSION(1, 6, 0);
+
+    if (rand_dci_supp) {
+        pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
+        opal_setenv("UCX_DC_MLX5_TX_POLICY", "rand", 0, &environ);
+    }
+
+    rc = mca_spml_ucx_ctx_create_common(SHMEM_CTX_PRIVATE, &mca_spml_ucx.aux_ctx);
+
+    if (rand_dci_supp) {
+        opal_unsetenv("UCX_DC_MLX5_TX_POLICY", &environ);
+        pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
+    }
+
+    return rc;
+}
+
+int mca_spml_ucx_put_all_nb(void *dest, const void *source, size_t size, long *counter)
+{
+    int my_pe = oshmem_my_proc_id();
+    long val = 1;
+    int peer, dst_pe, rc;
+    shmem_ctx_t ctx;
+    struct timeval tv;
+    void *request;
+
+    mca_spml_ucx_aux_lock();
+    if (mca_spml_ucx.async_progress) {
+        if (mca_spml_ucx.aux_ctx == NULL) {
+            rc = mca_spml_ucx_create_aux_ctx();
+            if (rc != OMPI_SUCCESS) {
+                mca_spml_ucx_aux_unlock();
+                oshmem_shmem_abort(-1);
+            }
+        }
+
+        if (mca_spml_ucx.aux_refcnt++ == 0) {
+            tv.tv_sec = 0;
+            tv.tv_usec = mca_spml_ucx.async_tick;
+            opal_event_evtimer_add(mca_spml_ucx.tick_event, &tv);
+            opal_progress_register(spml_ucx_progress_aux_ctx);
+        }
+        ctx = (shmem_ctx_t)mca_spml_ucx.aux_ctx;
+    } else {
+        ctx = oshmem_ctx_default;
+    }
+
+    for (peer = 0; peer < oshmem_num_procs(); peer++) {
+        dst_pe = (peer + my_pe) % oshmem_group_all->proc_count;
+        rc = mca_spml_ucx_put_nb(ctx,
+                                 (void*)((uintptr_t)dest + my_pe * size),
+                                 size,
+                                 (void*)((uintptr_t)source + dst_pe * size),
+                                 dst_pe, NULL);
+        RUNTIME_CHECK_RC(rc);
+
+        mca_spml_ucx_fence(ctx);
+
+        rc = MCA_ATOMIC_CALL(add(ctx, (void*)counter, val, sizeof(val), dst_pe));
+        RUNTIME_CHECK_RC(rc);
+    }
+
+    request = ucp_worker_flush_nb(((mca_spml_ucx_ctx_t*)ctx)->ucp_worker, 0,
+                                  mca_spml_ucx_put_all_complete_cb);
+    if (!UCS_PTR_IS_PTR(request)) {
+        mca_spml_ucx_put_all_complete_cb(NULL, UCS_PTR_STATUS(request));
+    }
+
+    mca_spml_ucx_aux_unlock();
+
+    return OSHMEM_SUCCESS;
+}
diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h
index 6c2424ba76..7c9c7689bd 100644
--- a/oshmem/mca/spml/ucx/spml_ucx.h
+++ b/oshmem/mca/spml/ucx/spml_ucx.h
@@ -95,10 +95,19 @@ struct mca_spml_ucx {
     mca_spml_ucx_ctx_array_t idle_array;
     int priority; /* component priority */
     shmem_internal_mutex_t internal_mutex;
+    pthread_mutex_t ctx_create_mutex;
+    /* Fields controlling the aux context used by the put_all_nb SPML routine */
+    bool async_progress;
+    int async_tick;
+    opal_event_base_t *async_event_base;
+    opal_event_t *tick_event;
+    mca_spml_ucx_ctx_t *aux_ctx;
+    pthread_spinlock_t async_lock;
+    int aux_refcnt;
+
 };
 typedef struct mca_spml_ucx mca_spml_ucx_t;
-
 
 extern mca_spml_ucx_t mca_spml_ucx;
 
 extern int mca_spml_ucx_enable(bool enable);
@@ -118,23 +127,28 @@ extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
                                void **handle);
 
 extern int mca_spml_ucx_put(shmem_ctx_t ctx,
-                        void* dst_addr,
-                        size_t size,
-                        void* src_addr,
-                        int dst);
+                            void* dst_addr,
+                            size_t size,
+                            void* src_addr,
+                            int dst);
 
 extern int mca_spml_ucx_put_nb(shmem_ctx_t ctx,
-                        void* dst_addr,
-                        size_t size,
-                        void* src_addr,
-                        int dst,
-                        void **handle);
+                               void* dst_addr,
+                               size_t size,
+                               void* src_addr,
+                               int dst,
+                               void **handle);
 
 extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
 extern int mca_spml_ucx_send(void* buf,
-                        size_t size,
-                        int dst,
-                        mca_spml_base_put_mode_t mode);
+                             size_t size,
+                             int dst,
+                             mca_spml_base_put_mode_t mode);
+
+extern int mca_spml_ucx_put_all_nb(void *target,
+                                   const void *source,
+                                   size_t size,
+                                   long *counter);
 
 extern sshmem_mkey_t *mca_spml_ucx_register(void* addr,
                                             size_t size,
@@ -154,6 +168,22 @@ extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
 extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
 extern int spml_ucx_default_progress(void);
 extern int spml_ucx_ctx_progress(void);
+extern int spml_ucx_progress_aux_ctx(void);
+void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);
+
+static inline void mca_spml_ucx_aux_lock(void)
+{
+    if (mca_spml_ucx.async_progress) {
+        pthread_spin_lock(&mca_spml_ucx.async_lock);
+    }
+}
+
+static inline void mca_spml_ucx_aux_unlock(void)
+{
+    if (mca_spml_ucx.async_progress) {
+        pthread_spin_unlock(&mca_spml_ucx.async_lock);
+    }
+}
 
 static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
 {
diff --git a/oshmem/mca/spml/ucx/spml_ucx_component.c b/oshmem/mca/spml/ucx/spml_ucx_component.c
index 720dbf88f6..9a4a73ef12 100644
--- a/oshmem/mca/spml/ucx/spml_ucx_component.c
+++ b/oshmem/mca/spml/ucx/spml_ucx_component.c
@@ -24,6 +24,7 @@
 #include "orte/util/show_help.h"
 
 #include "opal/util/opal_environ.h"
+#include "opal/runtime/opal_progress_threads.h"
 
 static int mca_spml_ucx_component_register(void);
 static int mca_spml_ucx_component_open(void);
@@ -90,11 +91,26 @@ static inline void mca_spml_ucx_param_register_string(const char* param_name,
                                            storage);
 }
 
+static inline void mca_spml_ucx_param_register_bool(const char* param_name,
+                                                    bool default_value,
+                                                    const char *help_msg,
+                                                    bool *storage)
+{
+    *storage = default_value;
+    (void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
+                                           param_name,
+                                           help_msg,
+                                           MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           storage);
+}
+
 static int mca_spml_ucx_component_register(void)
 {
     mca_spml_ucx_param_register_int("priority", 21,
-                                      "[integer] ucx priority",
-                                      &mca_spml_ucx.priority);
+                                    "[integer] ucx priority",
+                                    &mca_spml_ucx.priority);
 
     mca_spml_ucx_param_register_int("num_disconnect", 1,
                                     "How may disconnects go in parallel",
@@ -104,6 +120,14 @@ static int mca_spml_ucx_component_register(void)
                                     "Use non-blocking memory registration for shared heap",
                                     &mca_spml_ucx.heap_reg_nb);
 
+    mca_spml_ucx_param_register_bool("async_progress", 0,
+                                     "Enable asynchronous progress thread",
+                                     &mca_spml_ucx.async_progress);
+
+    mca_spml_ucx_param_register_int("async_tick_usec", 3000,
+                                    "Asynchronous progress tick granularity (in usec)",
+                                    &mca_spml_ucx.async_tick);
+
     opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
 
     return OSHMEM_SUCCESS;
@@ -124,6 +148,39 @@ int spml_ucx_default_progress(void)
     return 1;
 }
 
+int spml_ucx_progress_aux_ctx(void)
+{
+    unsigned count;
+
+    if (OPAL_UNLIKELY(!mca_spml_ucx.aux_ctx)) {
+        return 0;
+    }
+
+    if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
+        return 0;
+    }
+
+    count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
+    pthread_spin_unlock(&mca_spml_ucx.async_lock);
+
+    return count;
+}
+
+void mca_spml_ucx_async_cb(int fd, short event, void *cbdata)
+{
+    int count = 0;
+
+    if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
+        return;
+    }
+
+    do {
+        count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
+    } while (count);
+
+    pthread_spin_unlock(&mca_spml_ucx.async_lock);
+}
+
 static int mca_spml_ucx_component_open(void)
 {
     return OSHMEM_SUCCESS;
@@ -185,6 +242,7 @@ static int spml_ucx_init(void)
                                            sizeof(mca_spml_ucx_ctx_t *));
 
     SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
+    pthread_mutex_init(&mca_spml_ucx.ctx_create_mutex, NULL);
 
     wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
@@ -207,6 +265,22 @@ static int spml_ucx_init(void)
         oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
     }
 
+    if (mca_spml_ucx.async_progress) {
+        pthread_spin_init(&mca_spml_ucx.async_lock, 0);
+        mca_spml_ucx.async_event_base = opal_progress_thread_init(NULL);
+        if (NULL == mca_spml_ucx.async_event_base) {
+            SPML_UCX_ERROR("failed to init async progress thread");
+            return OSHMEM_ERROR;
+        }
+
+        mca_spml_ucx.tick_event = opal_event_alloc();
+        opal_event_set(mca_spml_ucx.async_event_base, mca_spml_ucx.tick_event,
+                       -1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
+    }
+
+    mca_spml_ucx.aux_ctx = NULL;
+    mca_spml_ucx.aux_refcnt = 0;
+
     oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
 
     return OSHMEM_SUCCESS;
@@ -252,8 +326,8 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
     }
 
     opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
-                             mca_spml_ucx.num_disconnect,
-                             ctx->ucp_worker);
+                                      mca_spml_ucx.num_disconnect,
+                                      ctx->ucp_worker);
     free(del_procs);
     free(ctx->ucp_peers);
 }
@@ -271,6 +345,16 @@ static int mca_spml_ucx_component_fini(void)
     if(!mca_spml_ucx.enabled)
         return OSHMEM_SUCCESS; /* never selected.. return success.. */
 
+    if (mca_spml_ucx.async_progress) {
+        opal_progress_thread_finalize(NULL);
+        opal_event_evtimer_del(mca_spml_ucx.tick_event);
+        if (mca_spml_ucx.aux_ctx != NULL) {
+            _ctx_cleanup(mca_spml_ucx.aux_ctx);
+        }
+        opal_progress_unregister(spml_ucx_progress_aux_ctx);
+        pthread_spin_destroy(&mca_spml_ucx.async_lock);
+    }
+
     /* delete context objects from list */
     for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
         _ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
@@ -280,6 +364,7 @@ static int mca_spml_ucx_component_fini(void)
         _ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
     }
+
     ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
     if (OPAL_SUCCESS != ret) {
         return ret;
     }
@@ -295,6 +380,10 @@ static int mca_spml_ucx_component_fini(void)
         }
 
         ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
+
+        if (mca_spml_ucx.aux_ctx != NULL) {
+            ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
+        }
     }
 
     /* delete all workers */
@@ -312,12 +401,18 @@ static int mca_spml_ucx_component_fini(void)
         ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
     }
 
+    if (mca_spml_ucx.aux_ctx != NULL) {
+        ucp_worker_destroy(mca_spml_ucx.aux_ctx->ucp_worker);
+    }
+
     mca_spml_ucx.enabled = false; /* not anymore */
 
     free(mca_spml_ucx.active_array.ctxs);
     free(mca_spml_ucx.idle_array.ctxs);
+    free(mca_spml_ucx.aux_ctx);
 
     SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
+    pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
 
     if (mca_spml_ucx.ucp_context) {
         ucp_cleanup(mca_spml_ucx.ucp_context);
diff --git a/oshmem/shmem/c/shmem_put_nb.c b/oshmem/shmem/c/shmem_put_nb.c
index 0ec88120e5..bf63130e23 100644
--- a/oshmem/shmem/c/shmem_put_nb.c
+++ b/oshmem/shmem/c/shmem_put_nb.c
@@ -226,3 +226,12 @@ SHMEM_TYPE_PUTMEM_NB(_put32, 4, shmem)
 SHMEM_TYPE_PUTMEM_NB(_put64, 8, shmem)
 SHMEM_TYPE_PUTMEM_NB(_put128, 16, shmem)
 SHMEM_TYPE_PUTMEM_NB(_putmem, 1, shmem)
+
+void shmemx_alltoall_global_nb(void *dest,
+                               const void *source,
+                               size_t size,
+                               long *counter)
+{
+    int rc = MCA_SPML_CALL(put_all_nb(dest, source, size, counter));
+    RUNTIME_CHECK_RC(rc);
+}
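
Enabling the feature (illustrative, not part of the patch): the MCA variables registered above are exposed as spml_ucx_async_progress and spml_ucx_async_tick_usec. Assuming the usual Open MPI/OSHMEM launcher and MCA option syntax, asynchronous progress for the new routine could be requested roughly like this:

    oshrun --mca spml ucx \
           --mca spml_ucx_async_progress true \
           --mca spml_ucx_async_tick_usec 1000 \
           -np 8 ./oshmem_app

When spml_ucx_async_progress is left at its default (false), mca_spml_ucx_put_all_nb issues the transfers on oshmem_ctx_default instead of the auxiliary context, as shown in spml_ucx.c above.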