1
1

UCX osc: do not acquire accumulate lock if exclusive lock was taken

Signed-off-by: Joseph Schuchart <schuchart@hlrs.de>
Этот коммит содержится в:
Joseph Schuchart 2019-11-04 17:22:00 +01:00
родитель 471d76777a
Коммит 427d4bd226

Просмотреть файл

@ -235,48 +235,73 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
return ret;
}
static inline int start_atomicity(ompi_osc_ucx_module_t *module, int target) {
static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target)
{
ompi_osc_ucx_lock_t *lock = NULL;
opal_hash_table_get_value_uint32(&module->outstanding_locks,
(uint32_t) target, (void **) &lock);
/* if there is an exclusive lock there is no need to acqurie the accumulate lock */
return !(NULL != lock && LOCK_EXCLUSIVE == lock->type);
}
static inline int start_atomicity(
ompi_osc_ucx_module_t *module,
int target,
bool *lock_acquired) {
uint64_t result_value = -1;
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS;
for (;;) {
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
target, &result_value, sizeof(result_value),
remote_addr);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
return OMPI_ERROR;
}
if (result_value == TARGET_LOCK_UNLOCKED) {
return OMPI_SUCCESS;
if (need_acc_lock(module, target)) {
for (;;) {
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
target, &result_value, sizeof(result_value),
remote_addr);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
return OMPI_ERROR;
}
if (result_value == TARGET_LOCK_UNLOCKED) {
return OMPI_SUCCESS;
}
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
}
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
*lock_acquired = true;
} else {
*lock_acquired = false;
}
}
static inline int end_atomicity(
ompi_osc_ucx_module_t *module,
int target,
bool lock_acquired,
void *free_ptr) {
uint64_t result_value = 0;
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS;
/* fence any still active operations */
ret = opal_common_ucx_wpmem_fence(module->mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
if (lock_acquired) {
uint64_t result_value = 0;
/* fence any still active operations */
ret = opal_common_ucx_wpmem_fence(module->mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
target, &result_value, sizeof(result_value),
remote_addr);
assert(result_value == TARGET_LOCK_EXCLUSIVE);
} else if (NULL != free_ptr){
/* flush before freeing the buffer */
ret = opal_common_ucx_wpmem_flush(module->state_mem, OPAL_COMMON_UCX_SCOPE_EP, target);
}
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
target, &result_value, sizeof(result_value),
remote_addr);
/* TODO: encapsulate in a request and make the release non-blocking */
if (NULL != free_ptr) {
free(free_ptr);
@ -286,8 +311,6 @@ static inline int end_atomicity(
return OMPI_ERROR;
}
assert(result_value == TARGET_LOCK_EXCLUSIVE);
return ret;
}
@ -562,6 +585,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret = OMPI_SUCCESS;
void *free_ptr = NULL;
bool lock_acquired = false;
ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) {
@ -579,8 +603,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
NULL, ucx_req);
}
ret = start_atomicity(module, target);
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) {
return ret;
}
@ -681,7 +704,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
ompi_request_complete(&ucx_req->super, true);
}
return end_atomicity(module, target, free_ptr);
return end_atomicity(module, target, lock_acquired, free_ptr);
}
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
@ -701,17 +724,16 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
size_t dt_bytes;
int ret = OMPI_SUCCESS;
bool lock_acquired = false;
ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) {
return ret;
}
if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target);
if (ret != OMPI_SUCCESS) {
return ret;
}
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) {
return ret;
}
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
@ -738,7 +760,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
return ret;
}
return end_atomicity(module, target, NULL);
return end_atomicity(module, target, lock_acquired, NULL);
}
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
@ -759,9 +781,10 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
uint64_t value = origin_addr ? *(uint64_t *)origin_addr : 0;
ucp_atomic_fetch_op_t opcode;
size_t dt_bytes;
bool lock_acquired = false;
if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target);
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) {
return ret;
}
@ -792,7 +815,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
return ret;
}
return end_atomicity(module, target, NULL);
return end_atomicity(module, target, lock_acquired, NULL);
} else {
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
target, target_disp, 1, dt, op, win);
@ -811,6 +834,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret = OMPI_SUCCESS;
void *free_addr = NULL;
bool lock_acquired = false;
ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) {
@ -824,7 +848,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
result_addr, ucx_req);
}
ret = start_atomicity(module, target);
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) {
return ret;
}
@ -933,7 +957,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
}
return end_atomicity(module, target, free_addr);
return end_atomicity(module, target, lock_acquired, free_addr);
}
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,