SPML/UCX: CR comments p1
Signed-off-by: Mikhail Brinskii <mikhailb@mellanox.com>
This commit is contained in:
parent 2ef5bd8b36
commit c4c99457db
@@ -168,8 +168,20 @@ OSHMEM_DECLSPEC void shmemx_int16_prod_to_all(int16_t *target, const int16_t *so
 OSHMEM_DECLSPEC void shmemx_int32_prod_to_all(int32_t *target, const int32_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int32_t *pWrk, long *pSync);
 OSHMEM_DECLSPEC void shmemx_int64_prod_to_all(int64_t *target, const int64_t *source, int nreduce, int PE_start, int logPE_stride, int PE_size, int64_t *pWrk, long *pSync);
 
-/* Alltoall put with atomic counter increase */
-OSHMEM_DECLSPEC void shmemx_put_with_long_inc_all(void *target, const void *source, size_t size, long *counter);
+/* shmemx_alltoall_global_nb is a nonblocking collective routine, where each PE
+ * exchanges "size" bytes of data with all other PEs in the OpenSHMEM job.
+ *
+ * @param dest    A symmetric data object that is large enough to receive
+ *                "size" bytes of data.
+ * @param source  A symmetric data object that contains "size" bytes of data
+ *                for each PE in the OpenSHMEM job.
+ * @param size    The number of bytes to be sent to each PE in the job.
+ * @param counter A symmetric data object to be atomically incremented after
+ *                the target buffer is updated.
+ *
+ * @return OSHMEM_SUCCESS or failure status.
+ */
+OSHMEM_DECLSPEC void shmemx_alltoall_global_nb(void *dest, const void *source, size_t size, long *counter);
 
 /*
  * Backward compatibility section
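A minimal usage sketch for the new routine, assuming the counter is bumped once per arriving contribution as the comment above implies (the buffer names and the wait loop are illustrative, not part of this commit):

    #include <shmem.h>
    #include <shmemx.h>

    static long counter = 0;                /* symmetric counter */

    void exchange_with_all(void *dest, const void *src, size_t size)
    {
        int npes = shmem_n_pes();

        /* dest must be symmetric and at least npes * size bytes */
        shmemx_alltoall_global_nb(dest, src, size, &counter);

        /* wait until every PE's contribution has incremented the counter */
        shmem_long_wait_until(&counter, SHMEM_CMP_EQ, (long)npes);
        counter = 0;                        /* reset for the next round */
    }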
@@ -630,12 +630,12 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
     mca_spml_ucx_ctx_t *ucx_ctx;
     int rc;
 
-    /* Take a lock controlling aux context. AUX context may set specific
+    /* Take a lock controlling context creation. AUX context may set specific
      * UCX parameters affecting worker creation, which are not needed for
      * regular contexts. */
-    mca_spml_ucx_aux_lock();
+    pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
     rc = mca_spml_ucx_ctx_create_common(options, &ucx_ctx);
-    mca_spml_ucx_aux_unlock();
+    pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
     if (rc != OSHMEM_SUCCESS) {
         return rc;
     }
@@ -813,6 +813,7 @@ int mca_spml_ucx_send(void* buf,
     return rc;
 }
 
+/* this can be called with request==NULL in case of immediate completion */
 static void mca_spml_ucx_put_all_complete_cb(void *request, ucs_status_t status)
 {
     if (mca_spml_ucx.async_progress && (--mca_spml_ucx.aux_refcnt == 0)) {
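The new comment documents a calling convention rather than a code change. A hedged sketch of the caller side (the wrapper function and its arguments are illustrative, but ucp_put_nb really does complete some operations without creating a request object):

    #include <ucp/api/ucp.h>

    static void put_one(ucp_ep_h ep, const void *buf, size_t len,
                        uint64_t raddr, ucp_rkey_h rkey)
    {
        ucs_status_ptr_t req = ucp_put_nb(ep, buf, len, raddr, rkey,
                                          mca_spml_ucx_put_all_complete_cb);
        if (!UCS_PTR_IS_PTR(req)) {
            /* immediate completion (or error): no request object exists,
             * so the callback is invoked by hand with request == NULL */
            mca_spml_ucx_put_all_complete_cb(NULL, UCS_PTR_STATUS(req));
        }
    }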
@@ -838,14 +839,14 @@ static int mca_spml_ucx_create_aux_ctx(void)
     rand_dci_supp = UCX_VERSION(major, minor, rel_number) >= UCX_VERSION(1, 6, 0);
 
     if (rand_dci_supp) {
-        opal_setenv("UCX_DC_TX_POLICY", "rand", 1, &environ);
+        pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
         opal_setenv("UCX_DC_MLX5_TX_POLICY", "rand", 1, &environ);
     }
 
     rc = mca_spml_ucx_ctx_create_common(SHMEM_CTX_PRIVATE, &mca_spml_ucx.aux_ctx);
 
     if (rand_dci_supp) {
-        opal_unsetenv("UCX_DC_TX_POLICY", &environ);
+        pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);
         opal_unsetenv("UCX_DC_MLX5_TX_POLICY", &environ);
     }
 
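Together with the mca_spml_ucx_ctx_create hunk above, this implements a scoped environment override: UCX reads UCX_DC_MLX5_TX_POLICY when a context/worker is created, so the setting must not be visible to a concurrent regular-context creation. Condensed into one place (a sketch; the file interleaves this with the rand_dci_supp checks, and the exact unlock/unsetenv order differs):

    pthread_mutex_lock(&mca_spml_ucx.ctx_create_mutex);
    opal_setenv("UCX_DC_MLX5_TX_POLICY", "rand", 1, &environ);   /* visible only while locked */
    rc = mca_spml_ucx_ctx_create_common(SHMEM_CTX_PRIVATE, &mca_spml_ucx.aux_ctx);
    opal_unsetenv("UCX_DC_MLX5_TX_POLICY", &environ);            /* restore before unlocking */
    pthread_mutex_unlock(&mca_spml_ucx.ctx_create_mutex);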
@@ -871,14 +872,13 @@ int mca_spml_ucx_put_all_nb(void *dest, const void *source, size_t size, long *c
             }
         }
 
-        if (!mca_spml_ucx.aux_refcnt) {
+        if (mca_spml_ucx.aux_refcnt++ == 0) {
             tv.tv_sec = 0;
             tv.tv_usec = mca_spml_ucx.async_tick;
             opal_event_evtimer_add(mca_spml_ucx.tick_event, &tv);
             opal_progress_register(spml_ucx_progress_aux_ctx);
         }
         ctx = (shmem_ctx_t)mca_spml_ucx.aux_ctx;
-        ++mca_spml_ucx.aux_refcnt;
     } else {
         ctx = oshmem_ctx_default;
     }
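The rewritten test folds the increment into the zero check, pairing with the `--mca_spml_ucx.aux_refcnt == 0` condition in the completion callback above: the first outstanding put_all_nb arms the timer and progress hook, and the last completion disarms them. A condensed sketch of the pairing (the disarm body is inferred from the callback's condition and is not shown in this diff):

    if (mca_spml_ucx.aux_refcnt++ == 0) {            /* first user: arm */
        opal_event_evtimer_add(mca_spml_ucx.tick_event, &tv);
        opal_progress_register(spml_ucx_progress_aux_ctx);
    }

    /* ... later, in the completion callback ... */
    if (--mca_spml_ucx.aux_refcnt == 0) {            /* last completion: disarm */
        opal_event_evtimer_del(mca_spml_ucx.tick_event);      /* assumed */
        opal_progress_unregister(spml_ucx_progress_aux_ctx);  /* assumed */
    }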
@@ -94,6 +94,7 @@ struct mca_spml_ucx {
     mca_spml_ucx_ctx_array_t idle_array;
     int priority; /* component priority */
     shmem_internal_mutex_t internal_mutex;
+    pthread_mutex_t ctx_create_mutex;
     /* Fields controlling aux context for put_all_nb SPML routine */
     bool async_progress;
     int async_tick;
@@ -169,16 +170,18 @@ extern int spml_ucx_ctx_progress(void);
 extern int spml_ucx_progress_aux_ctx(void);
 void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);
 
-static inline int mca_spml_ucx_aux_lock(void)
+static inline void mca_spml_ucx_aux_lock(void)
 {
-    return mca_spml_ucx.async_progress ?
-           pthread_spin_lock(&mca_spml_ucx.async_lock) : 0;
+    if (mca_spml_ucx.async_progress) {
+        pthread_spin_lock(&mca_spml_ucx.async_lock);
+    }
 }
 
-static inline int mca_spml_ucx_aux_unlock(void)
+static inline void mca_spml_ucx_aux_unlock(void)
 {
-    return mca_spml_ucx.async_progress ?
-           pthread_spin_unlock(&mca_spml_ucx.async_lock) : 0;
+    if (mca_spml_ucx.async_progress) {
+        pthread_spin_unlock(&mca_spml_ucx.async_lock);
+    }
 }
 
 static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
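With the switch to void, the helpers degenerate to no-ops when async progress is disabled, and callers never inspected the return value anyway (pthread_spin_lock() returns 0 on success). The intended call pattern is simply (sketch):

    mca_spml_ucx_aux_lock();
    /* ... access mca_spml_ucx.aux_ctx / aux_refcnt ... */
    mca_spml_ucx_aux_unlock();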
@@ -150,18 +150,20 @@ int spml_ucx_default_progress(void)
 
 int spml_ucx_progress_aux_ctx(void)
 {
+    unsigned count;
+
     if (OPAL_UNLIKELY(!mca_spml_ucx.aux_ctx)) {
-        return 1;
+        return 0;
     }
 
     if (pthread_spin_trylock(&mca_spml_ucx.async_lock)) {
-        return 1;
+        return 0;
     }
 
-    ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
+    count = ucp_worker_progress(mca_spml_ucx.aux_ctx->ucp_worker);
     pthread_spin_unlock(&mca_spml_ucx.async_lock);
 
-    return 1;
+    return count;
 }
 
 void mca_spml_ucx_async_cb(int fd, short event, void *cbdata)
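Returning the real event count instead of a constant matters because opal_progress() accumulates the values returned by its registered callbacks to judge whether useful work happened. Roughly (an illustrative reduction with hypothetical names, not OPAL's actual code):

    static int progress_all(void)            /* illustrative only */
    {
        int events = 0, i;
        for (i = 0; i < ncallbacks; ++i) {
            events += callbacks[i]();         /* each returns #events processed */
        }
        return events;                        /* 0 => nothing progressed */
    }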
@@ -240,6 +242,7 @@ static int spml_ucx_init(void)
                                 sizeof(mca_spml_ucx_ctx_t *));
 
     SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
+    pthread_mutex_init(&mca_spml_ucx.ctx_create_mutex, NULL);
 
     wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
     if (oshmem_mpi_thread_requested == SHMEM_THREAD_MULTIPLE) {
@@ -265,7 +268,7 @@ static int spml_ucx_init(void)
     if (mca_spml_ucx.async_progress) {
         pthread_spin_init(&mca_spml_ucx.async_lock, 0);
         mca_spml_ucx.async_event_base = opal_progress_thread_init(NULL);
-        if (NULL == mca_spml_ucx.async_event_base) {
+        if (NULL == mca_spml_ucx.async_event_base) {
             SPML_UCX_ERROR("failed to init async progress thread");
             return OSHMEM_ERROR;
         }
@@ -274,6 +277,7 @@ static int spml_ucx_init(void)
         opal_event_set(mca_spml_ucx.async_event_base, mca_spml_ucx.tick_event,
                        -1, EV_PERSIST, mca_spml_ucx_async_cb, NULL);
     }
 
+    mca_spml_ucx.aux_ctx = NULL;
     mca_spml_ucx.aux_refcnt = 0;
 
@@ -342,8 +346,8 @@ static int mca_spml_ucx_component_fini(void)
         return OSHMEM_SUCCESS; /* never selected.. return success.. */
 
     if (mca_spml_ucx.async_progress) {
-        opal_event_evtimer_del(mca_spml_ucx.tick_event);
         opal_progress_thread_finalize(NULL);
+        opal_event_evtimer_del(mca_spml_ucx.tick_event);
         if (mca_spml_ucx.aux_ctx != NULL) {
             _ctx_cleanup(mca_spml_ucx.aux_ctx);
         }
@@ -408,6 +412,7 @@ static int mca_spml_ucx_component_fini(void)
     free(mca_spml_ucx.aux_ctx);
 
     SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
+    pthread_mutex_destroy(&mca_spml_ucx.ctx_create_mutex);
 
     if (mca_spml_ucx.ucp_context) {
         ucp_cleanup(mca_spml_ucx.ucp_context);