UCX osc: fence active operations before releasing accumulate lock and free memory if required
Signed-off-by: Joseph Schuchart <schuchart@hlrs.de>
Этот коммит содержится в:
родитель
4d7a3856fa
Коммит
471d76777a
@ -257,15 +257,30 @@ static inline int start_atomicity(ompi_osc_ucx_module_t *module, int target) {
|
||||
}
|
||||
}
|
||||
|
||||
static inline int end_atomicity(ompi_osc_ucx_module_t *module, int target) {
|
||||
static inline int end_atomicity(
|
||||
ompi_osc_ucx_module_t *module,
|
||||
int target,
|
||||
void *free_ptr) {
|
||||
uint64_t result_value = 0;
|
||||
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
/* fence any still active operations */
|
||||
ret = opal_common_ucx_wpmem_fence(module->mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
|
||||
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
|
||||
target, &result_value, sizeof(result_value),
|
||||
remote_addr);
|
||||
|
||||
/* TODO: encapsulate in a request and make the release non-blocking */
|
||||
if (NULL != free_ptr) {
|
||||
free(free_ptr);
|
||||
}
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret);
|
||||
return OMPI_ERROR;
|
||||
@ -546,6 +561,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int ret = OMPI_SUCCESS;
|
||||
void *free_ptr = NULL;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
@ -576,7 +592,6 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
void *temp_addr_holder = NULL;
|
||||
void *temp_addr = NULL;
|
||||
uint32_t temp_count;
|
||||
ompi_datatype_t *temp_dt;
|
||||
@ -593,7 +608,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
}
|
||||
}
|
||||
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
|
||||
temp_addr = temp_addr_holder = malloc(temp_extent * temp_count);
|
||||
temp_addr = free_ptr = malloc(temp_extent * temp_count);
|
||||
if (temp_addr == NULL) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
@ -659,12 +674,6 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
free(temp_addr_holder);
|
||||
}
|
||||
|
||||
if (NULL != ucx_req) {
|
||||
@ -672,7 +681,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
ompi_request_complete(&ucx_req->super, true);
|
||||
}
|
||||
|
||||
return end_atomicity(module, target);
|
||||
return end_atomicity(module, target, free_ptr);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
|
||||
@ -729,14 +738,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
|
||||
return ret;
|
||||
}
|
||||
|
||||
// fence before releasing the accumulate lock
|
||||
ret = opal_common_ucx_wpmem_fence(module->mem);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
|
||||
// don't return error, try to release the accumulate lock
|
||||
}
|
||||
|
||||
return end_atomicity(module, target);
|
||||
return end_atomicity(module, target, NULL);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
|
||||
@ -790,7 +792,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
|
||||
return ret;
|
||||
}
|
||||
|
||||
return end_atomicity(module, target);
|
||||
return end_atomicity(module, target, NULL);
|
||||
} else {
|
||||
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
|
||||
target, target_disp, 1, dt, op, win);
|
||||
@ -808,6 +810,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
ompi_osc_ucx_request_t *ucx_req) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int ret = OMPI_SUCCESS;
|
||||
void *free_addr = NULL;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
@ -841,7 +844,6 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
} else {
|
||||
void *temp_addr_holder = NULL;
|
||||
void *temp_addr = NULL;
|
||||
uint32_t temp_count;
|
||||
ompi_datatype_t *temp_dt;
|
||||
@ -858,7 +860,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
}
|
||||
}
|
||||
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
|
||||
temp_addr = temp_addr_holder = malloc(temp_extent * temp_count);
|
||||
temp_addr = free_addr = malloc(temp_extent * temp_count);
|
||||
if (temp_addr == NULL) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
@ -922,13 +924,6 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
free(temp_addr_holder);
|
||||
}
|
||||
}
|
||||
|
||||
@ -938,7 +933,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
}
|
||||
|
||||
|
||||
return end_atomicity(module, target);
|
||||
return end_atomicity(module, target, free_addr);
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user