Merge pull request #6632 from brminich/topic/shmem_all2all_put_4.0.x
SPML/UCX: Add shmemx_alltoall_global_nb routine to shmemx.h 4.0.x
This commit is contained in:
Commit
73f9bcc374
@ -17,8 +17,6 @@
|
||||
#include "osc_ucx.h"
|
||||
#include "osc_ucx_request.h"
|
||||
|
||||
#define UCX_VERSION(_major, _minor, _build) (((_major) * 100) + (_minor))
|
||||
|
||||
#define memcpy_off(_dst, _src, _len, _off) \
|
||||
memcpy(((char*)(_dst)) + (_off), _src, _len); \
|
||||
(_off) += (_len);
|
||||
|
@ -34,6 +34,8 @@ BEGIN_C_DECLS
|
||||
# define MCA_COMMON_UCX_ASSERT(_x)
|
||||
#endif
|
||||
|
||||
#define UCX_VERSION(_major, _minor, _build) (((_major) * 100) + (_minor))
|
||||
|
||||
#define _MCA_COMMON_UCX_QUOTE(_x) \
|
||||
# _x
|
||||
#define MCA_COMMON_UCX_QUOTE(_x) \
|
||||
|
@ -168,6 +168,21 @@ OSHMEM_DECLSPEC void shmemx_int16_prod_to_all(int16_t *target, const int16_t *so
|
||||
OSHMEM_DECLSPEC void shmemx_int32_prod_to_all(int32_t *target, const int32_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int32_t *pWrk, long *pSync);
|
||||
OSHMEM_DECLSPEC void shmemx_int64_prod_to_all(int64_t *target, const int64_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int64_t *pWrk, long *pSync);
|
||||
|
||||
/* shmemx_alltoall_global_nb is a nonblocking collective routine, where each PE
|
||||
* exchanges “size” bytes of data with all other PEs in the OpenSHMEM job.
|
||||
|
||||
* @param dest A symmetric data object that is large enough to receive
|
||||
* “size” bytes of data from each PE in the OpenSHMEM job.
|
||||
* @param source A symmetric data object that contains “size” bytes of data
|
||||
* for each PE in the OpenSHMEM job.
|
||||
* @param size The number of bytes to be sent to each PE in the job.
|
||||
* @param counter A symmetric data object to be atomically incremented after
|
||||
* the target buffer is updated.
|
||||
*
|
||||
* @return OSHMEM_SUCCESS or failure status.
|
||||
*/
|
||||
OSHMEM_DECLSPEC void shmemx_alltoall_global_nb(void *dest, const void *source, size_t size, long *counter);
|
||||
|
||||
/*
|
||||
* Backward compatibility section
|
||||
*/
|
||||
|
@ -93,6 +93,10 @@ OSHMEM_DECLSPEC int mca_spml_base_get_nb(void *dst_addr,
|
||||
void **handle);
|
||||
|
||||
OSHMEM_DECLSPEC void mca_spml_base_memuse_hook(void *addr, size_t length);
|
||||
|
||||
OSHMEM_DECLSPEC int mca_spml_base_put_all_nb(void *target, const void *source,
|
||||
size_t size, long *counter);
|
||||
|
||||
/*
|
||||
* MCA framework
|
||||
*/
|
||||
|
@ -280,3 +280,9 @@ int mca_spml_base_get_nb(void *dst_addr, size_t size,
|
||||
/*
 * Default (no-op) implementation of the SPML memory-use hook.
 * SPML components that do not need to be notified about application
 * memory usage fall back to this stub.
 */
void mca_spml_base_memuse_hook(void *addr, size_t length)
{
    /* Intentionally empty; cast to void to silence -Wunused-parameter. */
    (void)addr;
    (void)length;
}
|
||||
|
||||
int mca_spml_base_put_all_nb(void *target, const void *source,
|
||||
size_t size, long *counter)
|
||||
{
|
||||
return OSHMEM_ERR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
@ -181,6 +181,7 @@ mca_spml_ikrit_t mca_spml_ikrit = {
|
||||
mca_spml_base_rmkey_free,
|
||||
mca_spml_base_rmkey_ptr,
|
||||
mca_spml_base_memuse_hook,
|
||||
mca_spml_base_put_all_nb,
|
||||
|
||||
(void*)&mca_spml_ikrit
|
||||
},
|
||||
|
@ -314,6 +314,35 @@ typedef int (*mca_spml_base_module_send_fn_t)(void *buf,
|
||||
int dst,
|
||||
mca_spml_base_put_mode_t mode);
|
||||
|
||||
/**
|
||||
* The routine transfers the data asynchronously from the source PE to all
|
||||
* PEs in the OpenSHMEM job. The routine returns immediately. The source and
|
||||
* target buffers are reusable only after the completion of the routine.
|
||||
* After the data is transferred to the target buffers, the counter object
|
||||
* is updated atomically. The counter object can be read either using atomic
|
||||
* operations such as shmem_atomic_fetch or can use point-to-point synchronization
|
||||
* routines such as shmem_wait_until and shmem_test.
|
||||
*
|
||||
* Shmem_quiet may be used for completing the operation, but not required for
|
||||
* progress or completion. In a multithreaded OpenSHMEM program, the user
|
||||
* (the OpenSHMEM program) should ensure the correct ordering of
|
||||
* shmemx_alltoall_global calls.
|
||||
*
|
||||
* @param dest A symmetric data object that is large enough to receive
|
||||
* “size” bytes of data from each PE in the OpenSHMEM job.
|
||||
* @param source A symmetric data object that contains “size” bytes of data
|
||||
* for each PE in the OpenSHMEM job.
|
||||
* @param size The number of bytes to be sent to each PE in the job.
|
||||
* @param counter A symmetric data object to be atomically incremented after
|
||||
* the target buffer is updated.
|
||||
*
|
||||
* @return OSHMEM_SUCCESS or failure status.
|
||||
*/
|
||||
typedef int (*mca_spml_base_module_put_all_nb_fn_t)(void *dest,
|
||||
const void *source,
|
||||
size_t size,
|
||||
long *counter);
|
||||
|
||||
/**
|
||||
* Assures ordering of delivery of put() requests
|
||||
*
|
||||
@ -381,6 +410,7 @@ struct mca_spml_base_module_1_0_0_t {
|
||||
mca_spml_base_module_mkey_ptr_fn_t spml_rmkey_ptr;
|
||||
|
||||
mca_spml_base_module_memuse_hook_fn_t spml_memuse_hook;
|
||||
mca_spml_base_module_put_all_nb_fn_t spml_put_all_nb;
|
||||
void *self;
|
||||
};
|
||||
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "oshmem/proc/proc.h"
|
||||
#include "oshmem/mca/spml/base/base.h"
|
||||
#include "oshmem/mca/spml/base/spml_base_putreq.h"
|
||||
#include "oshmem/mca/atomic/atomic.h"
|
||||
#include "oshmem/runtime/runtime.h"
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
@ -70,6 +71,7 @@ mca_spml_ucx_t mca_spml_ucx = {
|
||||
.spml_rmkey_free = mca_spml_ucx_rmkey_free,
|
||||
.spml_rmkey_ptr = mca_spml_ucx_rmkey_ptr,
|
||||
.spml_memuse_hook = mca_spml_ucx_memuse_hook,
|
||||
.spml_put_all_nb = mca_spml_ucx_put_all_nb,
|
||||
.self = (void*)&mca_spml_ucx
|
||||
},
|
||||
|
||||
@ -442,8 +444,8 @@ sshmem_mkey_t *mca_spml_ucx_register(void* addr,
|
||||
ucx_mkey->mem_h = (ucp_mem_h)mem_seg->context;
|
||||
}
|
||||
|
||||
status = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h,
|
||||
&mkeys[0].u.data, &len);
|
||||
status = ucp_rkey_pack(mca_spml_ucx.ucp_context, ucx_mkey->mem_h,
|
||||
&mkeys[0].u.data, &len);
|
||||
if (UCS_OK != status) {
|
||||
goto error_unmap;
|
||||
}
|
||||
@ -480,8 +482,6 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
|
||||
{
|
||||
spml_ucx_mkey_t *ucx_mkey;
|
||||
map_segment_t *mem_seg;
|
||||
int segno;
|
||||
int my_pe = oshmem_my_proc_id();
|
||||
|
||||
MCA_SPML_CALL(quiet(oshmem_ctx_default));
|
||||
if (!mkeys)
|
||||
@ -496,7 +496,7 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
|
||||
if (OPAL_UNLIKELY(NULL == mem_seg)) {
|
||||
return OSHMEM_ERROR;
|
||||
}
|
||||
|
||||
|
||||
if (MAP_SEGMENT_ALLOC_UCX != mem_seg->type) {
|
||||
ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
|
||||
}
|
||||
@ -548,17 +548,15 @@ static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx
|
||||
opal_atomic_wmb ();
|
||||
}
|
||||
|
||||
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
||||
static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx_ctx_p)
|
||||
{
|
||||
mca_spml_ucx_ctx_t *ucx_ctx;
|
||||
ucp_worker_params_t params;
|
||||
ucp_ep_params_t ep_params;
|
||||
size_t i, j, nprocs = oshmem_num_procs();
|
||||
ucs_status_t err;
|
||||
int my_pe = oshmem_my_proc_id();
|
||||
size_t len;
|
||||
spml_ucx_mkey_t *ucx_mkey;
|
||||
sshmem_mkey_t *mkey;
|
||||
mca_spml_ucx_ctx_t *ucx_ctx;
|
||||
int rc = OSHMEM_ERROR;
|
||||
|
||||
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
|
||||
@ -583,10 +581,6 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (mca_spml_ucx.active_array.ctxs_count == 0) {
|
||||
opal_progress_register(spml_ucx_ctx_progress);
|
||||
}
|
||||
|
||||
for (i = 0; i < nprocs; i++) {
|
||||
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
|
||||
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
|
||||
@ -612,11 +606,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
||||
}
|
||||
}
|
||||
|
||||
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
|
||||
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
|
||||
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
|
||||
*ucx_ctx_p = ucx_ctx;
|
||||
|
||||
(*ctx) = (shmem_ctx_t)ucx_ctx;
|
||||
return OSHMEM_SUCCESS;
|
||||
|
||||
error2:
|
||||
@ -637,6 +628,33 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
 * Create a regular SHMEM context. The heavy lifting (worker/endpoint
 * creation) is done by mca_spml_ucx_ctx_create_common(); this wrapper
 * serializes creation against the AUX context and publishes the new
 * context in the active array.
 */
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
{
    mca_spml_ucx_ctx_t *new_ctx = NULL;
    int ret;

    /* Creation is serialized with the AUX context: the AUX path may set
     * UCX-specific parameters that influence worker creation and must not
     * leak into regular contexts. */
    pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
    ret = mca_spml_ucx_ctx_create_common(options, &new_ctx);
    pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
    if (OSHMEM_SUCCESS != ret) {
        return ret;
    }

    /* First active context: hook context progress into opal_progress. */
    if (0 == mca_spml_ucx.active_array.ctxs_count) {
        opal_progress_register(spml_ucx_ctx_progress);
    }

    SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
    _ctx_add(&mca_spml_ucx.active_array, new_ctx);
    SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);

    *ctx = (shmem_ctx_t)new_ctx;
    return OSHMEM_SUCCESS;
}
|
||||
|
||||
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
|
||||
{
|
||||
MCA_SPML_CALL(quiet(ctx));
|
||||
@ -751,6 +769,15 @@ int mca_spml_ucx_quiet(shmem_ctx_t ctx)
|
||||
oshmem_shmem_abort(-1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* If put_all_nb op/s is/are being executed asynchronously, need to wait its
|
||||
* completion as well. */
|
||||
if (ctx == oshmem_ctx_default) {
|
||||
while (mca_spml_ucx.aux_refcnt) {
|
||||
opal_progress();
|
||||
}
|
||||
}
|
||||
|
||||
return OSHMEM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -788,3 +815,99 @@ int mca_spml_ucx_send(void* buf,
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* this can be called with request==NULL in case of immediate completion */
|
||||
static void mca_spml_ucx_put_all_complete_cb(void *request, ucs_status_t status)
|
||||
{
|
||||
if (mca_spml_ucx.async_progress && (--mca_spml_ucx.aux_refcnt == 0)) {
|
||||
opal_event_evtimer_del(mca_spml_ucx.tick_event);
|
||||
opal_progress_unregister(spml_ucx_progress_aux_ctx);
|
||||
}
|
||||
|
||||
if (request != NULL) {
|
||||
ucp_request_free(request);
|
||||
}
|
||||
}
|
||||
|
||||
/* Should be called with AUX lock taken */
|
||||
static int mca_spml_ucx_create_aux_ctx(void)
|
||||
{
|
||||
unsigned major = 0;
|
||||
unsigned minor = 0;
|
||||
unsigned rel_number = 0;
|
||||
int rc;
|
||||
bool rand_dci_supp;
|
||||
|
||||
ucp_get_version(&major, &minor, &rel_number);
|
||||
rand_dci_supp = UCX_VERSION(major, minor, rel_number) >= UCX_VERSION(1, 6, 0);
|
||||
|
||||
if (rand_dci_supp) {
|
||||
pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
|
||||
opal_setenv("UCX_DC_MLX5_TX_POLICY", "rand", 0, &environ);
|
||||
}
|
||||
|
||||
rc = mca_spml_ucx_ctx_create_common(SHMEM_CTX_PRIVATE, &mca_spml_ucx.aux_ctx);
|
||||
|
||||
if (rand_dci_supp) {
|
||||
opal_unsetenv("UCX_DC_MLX5_TX_POLICY", &environ);
|
||||
pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int mca_spml_ucx_put_all_nb(void *dest, const void *source, size_t size, long *counter)
|
||||
{
|
||||
int my_pe = oshmem_my_proc_id();
|
||||
long val = 1;
|
||||
int peer, dst_pe, rc;
|
||||
shmem_ctx_t ctx;
|
||||
struct timeval tv;
|
||||
void *request;
|
||||
|
||||
mca_spml_ucx_aux_lock();
|
||||
if (mca_spml_ucx.async_progress) {
|
||||
if (mca_spml_ucx.aux_ctx == NULL) {
|
||||
rc = mca_spml_ucx_create_aux_ctx();
|
||||
if (rc != OMPI_SUCCESS) {
|
||||
mca_spml_ucx_aux_unlock();
|
||||
oshmem_shmem_abort(-1);
|
||||
}
|
||||
}
|
||||
|
||||
if (mca_spml_ucx.aux_refcnt++ == 0) {
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = mca_spml_ucx.async_tick;
|
||||
opal_event_evtimer_add(mca_spml_ucx.tick_event, &tv);
|
||||
opal_progress_register(spml_ucx_progress_aux_ctx);
|
||||
}
|
||||
ctx = (shmem_ctx_t)mca_spml_ucx.aux_ctx;
|
||||
} else {
|
||||
ctx = oshmem_ctx_default;
|
||||
}
|
||||
|
||||
for (peer = 0; peer < oshmem_num_procs(); peer++) {
|
||||
dst_pe = (peer + my_pe) % oshmem_group_all->proc_count;
|
||||
rc = mca_spml_ucx_put_nb(ctx,
|
||||
(void*)((uintptr_t)dest + my_pe * size),
|
||||
size,
|
||||
(void*)((uintptr_t)source + dst_pe * size),
|
||||
dst_pe, NULL);
|
||||
RUNTIME_CHECK_RC(rc);
|
||||
|
||||
mca_spml_ucx_fence(ctx);
|
||||
|
||||
rc = MCA_ATOMIC_CALL(add(ctx, (void*)counter, val, sizeof(val), dst_pe));
|
||||
RUNTIME_CHECK_RC(rc);
|
||||
}
|
||||
|
||||
request = ucp_worker_flush_nb(((mca_spml_ucx_ctx_t*)ctx)->ucp_worker, 0,
|
||||
mca_spml_ucx_put_all_complete_cb);
|
||||
if (!UCS_PTR_IS_PTR(request)) {
|
||||
mca_spml_ucx_put_all_complete_cb(NULL, UCS_PTR_STATUS(request));
|
||||
}
|
||||
|
||||
mca_spml_ucx_aux_unlock();
|
||||
|
||||
return OSHMEM_SUCCESS;
|
||||
}
|
||||
|
@ -95,10 +95,19 @@ struct mca_spml_ucx {
|
||||
mca_spml_ucx_ctx_array_t idle_array;
|
||||
int priority; /* component priority */
|
||||
shmem_internal_mutex_t internal_mutex;
|
||||
pthread_mutex_t ctx_create_mutex;
|
||||
/* Fields controlling aux context for put_all_nb SPML routine */
|
||||
bool async_progress;
|
||||
int async_tick;
|
||||
opal_event_base_t *async_event_base;
|
||||
opal_event_t *tick_event;
|
||||
mca_spml_ucx_ctx_t *aux_ctx;
|
||||
pthread_spinlock_t async_lock;
|
||||
int aux_refcnt;
|
||||
|
||||
};
|
||||
typedef struct mca_spml_ucx mca_spml_ucx_t;
|
||||
|
||||
|
||||
extern mca_spml_ucx_t mca_spml_ucx;
|
||||
|
||||
extern int mca_spml_ucx_enable(bool enable);
|
||||
@ -118,23 +127,28 @@ extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
|
||||
void **handle);
|
||||
|
||||
extern int mca_spml_ucx_put(shmem_ctx_t ctx,
|
||||
void* dst_addr,
|
||||
size_t size,
|
||||
void* src_addr,
|
||||
int dst);
|
||||
void* dst_addr,
|
||||
size_t size,
|
||||
void* src_addr,
|
||||
int dst);
|
||||
|
||||
extern int mca_spml_ucx_put_nb(shmem_ctx_t ctx,
|
||||
void* dst_addr,
|
||||
size_t size,
|
||||
void* src_addr,
|
||||
int dst,
|
||||
void **handle);
|
||||
void* dst_addr,
|
||||
size_t size,
|
||||
void* src_addr,
|
||||
int dst,
|
||||
void **handle);
|
||||
|
||||
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
|
||||
extern int mca_spml_ucx_send(void* buf,
|
||||
size_t size,
|
||||
int dst,
|
||||
mca_spml_base_put_mode_t mode);
|
||||
size_t size,
|
||||
int dst,
|
||||
mca_spml_base_put_mode_t mode);
|
||||
|
||||
extern int mca_spml_ucx_put_all_nb(void *target,
|
||||
const void *source,
|
||||
size_t size,
|
||||
long *counter);
|
||||
|
||||
extern sshmem_mkey_t *mca_spml_ucx_register(void* addr,
|
||||
size_t size,
|
||||
@ -154,6 +168,22 @@ extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
|
||||
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
|
||||
extern int spml_ucx_default_progress(void);
|
||||
extern int spml_ucx_ctx_progress(void);
|
||||
extern int spml_ucx_progress_aux_ctx(void);
|
||||
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);
|
||||
|
||||
static inline void mca_spml_ucx_aux_lock(void)
|
||||
{
|
||||
if (mca_spml_ucx.async_progress) {
|
||||
pthread_spin_lock(&mca_spml_ucx.async_lock);
|
||||
}
|
||||
}
|
||||
|
||||
static inline void mca_spml_ucx_aux_unlock(void)
|
||||
{
|
||||
if (mca_spml_ucx.async_progress) {
|
||||
pthread_spin_unlock(&mca_spml_ucx.async_lock);
|
||||
}
|
||||
}
|
||||
|
||||
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
|
||||
{
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
#include "orte/util/show_help.h"
|
||||
#include "opal/util/opal_environ.h"
|
||||
#include "opal/runtime/opal_progress_threads.h"
|
||||
|
||||
static int mca_spml_ucx_component_register(void);
|
||||
static int mca_spml_ucx_component_open(void);
|
||||
@ -90,11 +91,26 @@ static inline void mca_spml_ucx_param_register_string(const char* param_name,
|
||||
storage);
|
||||
}
|
||||
|
||||
static inline void mca_spml_ucx_param_register_bool(const char* param_name,
|
||||
bool default_value,
|
||||
const char *help_msg,
|
||||
bool *storage)
|
||||
{
|
||||
*storage = default_value;
|
||||
(void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
|
||||
param_name,
|
||||
help_msg,
|
||||
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
|
||||
OPAL_INFO_LVL_9,
|
||||
MCA_BASE_VAR_SCOPE_READONLY,
|
||||
storage);
|
||||
}
|
||||
|
||||
static int mca_spml_ucx_component_register(void)
|
||||
{
|
||||
mca_spml_ucx_param_register_int("priority", 21,
|
||||
"[integer] ucx priority",
|
||||
&mca_spml_ucx.priority);
|
||||
"[integer] ucx priority",
|
||||
&mca_spml_ucx.priority);
|
||||
|
||||
mca_spml_ucx_param_register_int("num_disconnect", 1,
|
||||
"How may disconnects go in parallel",
|
||||
@ -104,6 +120,14 @@ static int mca_spml_ucx_component_register(void)
|
||||
"Use non-blocking memory registration for shared heap",
|
||||
&mca_spml_ucx.heap_reg_nb);
|
||||
|
||||
mca_spml_ucx_param_register_bool("async_progress", 0,
|
||||
"Enable asynchronous progress thread",
|
||||
&mca_spml_ucx.async_progress);
|
||||
|
||||
mca_spml_ucx_param_register_int("async_tick_usec", 3000,
|
||||
"Asynchronous progress tick granularity (in usec)",
|
||||
&mca_spml_ucx.async_tick);
|
||||
|
||||
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
|
||||
|
||||
return OSHMEM_SUCCESS;
|
||||
@ -124,6 +148,39 @@ int spml_ucx_default_progress(void)
|
||||
return 1;
|
||||
}
|
||||
|
||||
int spml_ucx_progress_aux_ctx(void)
|
||||
{
|
||||
unsigned count;
|
||||
|
||||
if (OPAL_UNLIKELY(!mca_spml_ucx.aux_ctx)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
|
||||
pthread_spin_unlock(&mca_spml_ucx.async_lock);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata)
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
|
||||
return;
|
||||
}
|
||||
|
||||
do {
|
||||
count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
|
||||
} while (count);
|
||||
|
||||
pthread_spin_unlock(&mca_spml_ucx.async_lock);
|
||||
}
|
||||
|
||||
static int mca_spml_ucx_component_open(void)
|
||||
{
|
||||
return OSHMEM_SUCCESS;
|
||||
@ -185,6 +242,7 @@ static int spml_ucx_init(void)
|
||||
sizeof(mca_spml_ucx_ctx_t *));
|
||||
|
||||
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
|
||||
pthread_mutex_init(&mca_spml_ucx.ctx_create_mutex, NULL);
|
||||
|
||||
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
|
||||
if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
|
||||
@ -207,6 +265,22 @@ static int spml_ucx_init(void)
|
||||
oshmem_mpi_thread_provided = SHMEM_THREAD_SINGLE;
|
||||
}
|
||||
|
||||
if (mca_spml_ucx.async_progress) {
|
||||
pthread_spin_init(&mca_spml_ucx.async_lock, 0);
|
||||
mca_spml_ucx.async_event_base = opal_progress_thread_init(NULL);
|
||||
if (NULL == mca_spml_ucx.async_event_base) {
|
||||
SPML_UCX_ERROR("failed to init async progress thread");
|
||||
return OSHMEM_ERROR;
|
||||
}
|
||||
|
||||
mca_spml_ucx.tick_event = opal_event_alloc();
|
||||
opal_event_set(mca_spml_ucx.async_event_base, mca_spml_ucx.tick_event,
|
||||
-1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
|
||||
}
|
||||
|
||||
mca_spml_ucx.aux_ctx = NULL;
|
||||
mca_spml_ucx.aux_refcnt = 0;
|
||||
|
||||
oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
|
||||
|
||||
return OSHMEM_SUCCESS;
|
||||
@ -252,8 +326,8 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
|
||||
}
|
||||
|
||||
opal_common_ucx_del_procs_nofence(del_procs, nprocs, oshmem_my_proc_id(),
|
||||
mca_spml_ucx.num_disconnect,
|
||||
ctx->ucp_worker);
|
||||
mca_spml_ucx.num_disconnect,
|
||||
ctx->ucp_worker);
|
||||
free(del_procs);
|
||||
free(ctx->ucp_peers);
|
||||
}
|
||||
@ -271,6 +345,16 @@ static int mca_spml_ucx_component_fini(void)
|
||||
if(!mca_spml_ucx.enabled)
|
||||
return OSHMEM_SUCCESS; /* never selected.. return success.. */
|
||||
|
||||
if (mca_spml_ucx.async_progress) {
|
||||
opal_progress_thread_finalize(NULL);
|
||||
opal_event_evtimer_del(mca_spml_ucx.tick_event);
|
||||
if (mca_spml_ucx.aux_ctx != NULL) {
|
||||
_ctx_cleanup(mca_spml_ucx.aux_ctx);
|
||||
}
|
||||
opal_progress_unregister(spml_ucx_progress_aux_ctx);
|
||||
pthread_spin_destroy(&mca_spml_ucx.async_lock);
|
||||
}
|
||||
|
||||
/* delete context objects from list */
|
||||
for (i = 0; i < mca_spml_ucx.active_array.ctxs_count; i++) {
|
||||
_ctx_cleanup(mca_spml_ucx.active_array.ctxs[i]);
|
||||
@ -280,6 +364,7 @@ static int mca_spml_ucx_component_fini(void)
|
||||
_ctx_cleanup(mca_spml_ucx.idle_array.ctxs[i]);
|
||||
}
|
||||
|
||||
|
||||
ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
|
||||
if (OPAL_SUCCESS != ret) {
|
||||
return ret;
|
||||
@ -295,6 +380,10 @@ static int mca_spml_ucx_component_fini(void)
|
||||
}
|
||||
|
||||
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
|
||||
|
||||
if (mca_spml_ucx.aux_ctx != NULL) {
|
||||
ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
|
||||
}
|
||||
}
|
||||
|
||||
/* delete all workers */
|
||||
@ -312,12 +401,18 @@ static int mca_spml_ucx_component_fini(void)
|
||||
ucp_worker_destroy(mca_spml_ucx_ctx_default.ucp_worker);
|
||||
}
|
||||
|
||||
if (mca_spml_ucx.aux_ctx != NULL) {
|
||||
ucp_worker_destroy(mca_spml_ucx.aux_ctx->ucp_worker);
|
||||
}
|
||||
|
||||
mca_spml_ucx.enabled = false; /* not anymore */
|
||||
|
||||
free(mca_spml_ucx.active_array.ctxs);
|
||||
free(mca_spml_ucx.idle_array.ctxs);
|
||||
free(mca_spml_ucx.aux_ctx);
|
||||
|
||||
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
|
||||
pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
|
||||
|
||||
if (mca_spml_ucx.ucp_context) {
|
||||
ucp_cleanup(mca_spml_ucx.ucp_context);
|
||||
|
@ -226,3 +226,12 @@ SHMEM_TYPE_PUTMEM_NB(_put32, 4, shmem)
|
||||
SHMEM_TYPE_PUTMEM_NB(_put64, 8, shmem)
|
||||
SHMEM_TYPE_PUTMEM_NB(_put128, 16, shmem)
|
||||
SHMEM_TYPE_PUTMEM_NB(_putmem, 1, shmem)
|
||||
|
||||
/* Non-blocking global all-to-all exchange: thin wrapper that delegates
 * to the selected SPML's put_all_nb method and aborts on failure via
 * RUNTIME_CHECK_RC. */
void shmemx_alltoall_global_nb(void *dest,
                               const void *source,
                               size_t size,
                               long *counter)
{
    int status;

    status = MCA_SPML_CALL(put_all_nb(dest, source, size, counter));
    RUNTIME_CHECK_RC(status);
}
|
||||
|
Loading…
x
Reference in a new issue
Block a user