1
1

Merge pull request #6980 from devreal/ucx-acc-singel-intrinsics

UCX osc: add support for acc_single_intrinsic
Этот коммит содержится в:
Artem Polyakov 2020-06-25 07:39:42 -07:00 коммит произвёл GitHub
родитель 7814f4195c e3b417c776
Коммит 907f4e196a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 473 добавлений и 101 удалений

Просмотреть файл

@ -34,6 +34,7 @@ typedef struct ompi_osc_ucx_component {
int num_incomplete_req_ops; int num_incomplete_req_ops;
int num_modules; int num_modules;
bool no_locks; /* Default value of the no_locks info key for new windows */ bool no_locks; /* Default value of the no_locks info key for new windows */
bool acc_single_intrinsic;
unsigned int priority; unsigned int priority;
} ompi_osc_ucx_component_t; } ompi_osc_ucx_component_t;
@ -115,6 +116,7 @@ typedef struct ompi_osc_ucx_module {
int *start_grp_ranks; int *start_grp_ranks;
bool lock_all_is_nocheck; bool lock_all_is_nocheck;
bool no_locks; bool no_locks;
bool acc_single_intrinsic;
opal_common_ucx_ctx_t *ctx; opal_common_ucx_ctx_t *ctx;
opal_common_ucx_wpmem_t *mem; opal_common_ucx_wpmem_t *mem;
opal_common_ucx_wpmem_t *state_mem; opal_common_ucx_wpmem_t *state_mem;

Просмотреть файл

@ -1,5 +1,7 @@
/* /*
* Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. * Copyright (C) 2001-2017 Mellanox Technologies Ltd. ALL RIGHTS RESERVED.
* Copyright (c) 2019-2020 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -24,6 +26,11 @@
return OMPI_ERROR; \ return OMPI_ERROR; \
} }
/*
 * Evaluates to non-zero when an operand of _size bytes located at _remote_addr
 * can be handled by a UCX atomic operation: the operand must be exactly 32 or
 * 64 bits wide and _remote_addr must be aligned to that width.
 * Each argument can be evaluated more than once, so pass side-effect-free
 * expressions only.
 */
#define ATOMIC_SIZE_SUPPORTED(_remote_addr, _size)                         \
    ((((_size) == sizeof(uint32_t)) && (0 == ((_remote_addr) & 0x3))) ||   \
     (((_size) == sizeof(uint64_t)) && (0 == ((_remote_addr) & 0x7))))
typedef struct ucx_iovec { typedef struct ucx_iovec {
void *addr; void *addr;
size_t len; size_t len;
@ -235,44 +242,84 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
return ret; return ret;
} }
static inline int start_atomicity(ompi_osc_ucx_module_t *module, int target) { static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target)
{
ompi_osc_ucx_lock_t *lock = NULL;
opal_hash_table_get_value_uint32(&module->outstanding_locks,
(uint32_t) target, (void **) &lock);
/* if there is an exclusive lock there is no need to acquire the accumulate lock */
return !(NULL != lock && LOCK_EXCLUSIVE == lock->type);
}
static inline int start_atomicity(
ompi_osc_ucx_module_t *module,
int target,
bool *lock_acquired) {
uint64_t result_value = -1; uint64_t result_value = -1;
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET; uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
for (;;) { if (need_acc_lock(module, target)) {
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem, for (;;) {
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE, ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
target, &result_value, sizeof(result_value), TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
remote_addr); target, &result_value, sizeof(result_value),
if (ret != OMPI_SUCCESS) { remote_addr);
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret); if (ret != OMPI_SUCCESS) {
return OMPI_ERROR; OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
} return OMPI_ERROR;
if (result_value == TARGET_LOCK_UNLOCKED) { }
return OMPI_SUCCESS; if (result_value == TARGET_LOCK_UNLOCKED) {
break;
}
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
} }
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); *lock_acquired = true;
} else {
*lock_acquired = false;
} }
return OMPI_SUCCESS;
} }
static inline int end_atomicity(ompi_osc_ucx_module_t *module, int target) { static inline int end_atomicity(
uint64_t result_value = 0; ompi_osc_ucx_module_t *module,
int target,
bool lock_acquired,
void *free_ptr) {
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET; uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
ret = opal_common_ucx_wpmem_fetch(module->state_mem, if (lock_acquired) {
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, uint64_t result_value = 0;
target, &result_value, sizeof(result_value), /* fence any still active operations */
remote_addr); ret = opal_common_ucx_wpmem_fence(module->mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
target, &result_value, sizeof(result_value),
remote_addr);
assert(result_value == TARGET_LOCK_EXCLUSIVE);
} else if (NULL != free_ptr){
/* flush before freeing the buffer */
ret = opal_common_ucx_wpmem_flush(module->state_mem, OPAL_COMMON_UCX_SCOPE_EP, target);
}
/* TODO: encapsulate in a request and make the release non-blocking */
if (NULL != free_ptr) {
free(free_ptr);
}
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret); OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret);
return OMPI_ERROR; return OMPI_ERROR;
} }
assert(result_value == TARGET_LOCK_EXCLUSIVE);
return ret; return ret;
} }
@ -323,6 +370,114 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
return ret; return ret;
} }
/**
 * Decide whether an accumulate-style operation can be issued as a single UCX
 * atomic operation.
 *
 * All of the following must hold for a \c true result:
 *  - the module has \c acc_single_intrinsic enabled,
 *  - \c origin_dt is a predefined datatype and \c origin_count is 1,
 *  - \c op is one of replace, sum or no_op,
 *  - the origin element size is accepted by ATOMIC_SIZE_SUPPORTED for
 *    \c remote_addr (32/64-bit operand, suitably aligned address),
 *  - origin and target agree in both element size and count.
 *
 * NOTE(review): the check looks only at the size of the datatype, not at
 * whether it is an integer type; whether sum on a 4/8-byte non-integer
 * predefined type can reach this path should be confirmed against the UCX
 * atomic semantics.
 *
 * @return true when the atomic fast path may be used, false otherwise.
 */
static inline
bool use_atomic_op(
    ompi_osc_ucx_module_t  *module,
    struct ompi_op_t       *op,
    uint64_t                remote_addr,
    struct ompi_datatype_t *origin_dt,
    struct ompi_datatype_t *target_dt,
    int                     origin_count,
    int                     target_count)
{
    size_t origin_size;
    size_t target_size;

    /* the fast path has to be enabled for this window */
    if (!module->acc_single_intrinsic) {
        return false;
    }

    /* exactly one element of a predefined type */
    if (!ompi_datatype_is_predefined(origin_dt)) {
        return false;
    }
    if (1 != origin_count) {
        return false;
    }

    /* only replace, sum and no_op are candidates */
    if (op != &ompi_mpi_op_replace.op &&
        op != &ompi_mpi_op_sum.op     &&
        op != &ompi_mpi_op_no_op.op) {
        return false;
    }

    ompi_datatype_type_size(origin_dt, &origin_size);
    ompi_datatype_type_size(target_dt, &target_size);

    /* UCX only supports 32 and 64-bit operands atm */
    if (!ATOMIC_SIZE_SUPPORTED(remote_addr, origin_size)) {
        return false;
    }

    return (origin_size == target_size) && (origin_count == target_count);
}
/**
 * Issue an accumulate-style operation as a sequence of non-blocking UCX atomic
 * fetch operations, one per element.
 *
 * @param module       window module; provides the base addresses and memory handle
 * @param op           replace is issued as an atomic swap; every other operation
 *                     is issued as a fetch-and-add (no_op adds zero)
 * @param target       rank of the target process
 * @param origin_addr  first origin element; not read when \c op is no_op
 * @param count        number of elements to process
 * @param dt           datatype of the elements; its size is the operand size
 * @param target_disp  displacement at the target, scaled by the target's
 *                     displacement unit
 * @param result_addr  where fetched values are stored, one element after
 *                     another; if NULL every fetch lands in module->req_result
 * @param ucx_req      optional request; when given, the completion callback is
 *                     attached to the operation for the last element
 *
 * @return OMPI_SUCCESS, or the error code of the first step that failed.
 */
static int do_atomic_op_intrinsic(
    ompi_osc_ucx_module_t *module,
    struct ompi_op_t *op,
    int target,
    const void *origin_addr,
    int count,
    struct ompi_datatype_t *dt,
    ptrdiff_t target_disp,
    void *result_addr,
    ompi_osc_ucx_request_t *ucx_req)
{
    int ret = OMPI_SUCCESS;
    size_t origin_dt_bytes;
    ompi_datatype_type_size(dt, &origin_dt_bytes);

    uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);

    if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
        ret = get_dynamic_win_info(remote_addr, module, target);
        if (ret != OMPI_SUCCESS) {
            return ret;
        }
    }

    ucp_atomic_fetch_op_t opcode;
    bool is_no_op = false;
    if (op == &ompi_mpi_op_replace.op) {
        opcode = UCP_ATOMIC_FETCH_OP_SWAP;
    } else {
        opcode = UCP_ATOMIC_FETCH_OP_FADD;
        if (op == &ompi_mpi_op_no_op.op) {
            is_no_op = true;
        }
    }

    opal_common_ucx_user_req_handler_t user_req_cb = NULL;
    void *user_req_ptr = NULL;
    void *output_addr = &(module->req_result);
    if (result_addr) {
        output_addr = result_addr;
    }

    for (int i = 0; i < count; ++i) {
        uint64_t value = 0;

        if ((count - 1) == i && NULL != ucx_req) {
            // the last item is used to feed the request, if needed
            user_req_cb = &req_completion;
            user_req_ptr = ucx_req;
            // issue a fence if this is the last but not the only element
            if (0 < i) {
                ret = opal_common_ucx_wpmem_fence(module->mem);
                if (ret != OMPI_SUCCESS) {
                    OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
                    return OMPI_ERROR;
                }
            }
        }

        // no_op fetches the target value by adding zero; origin_addr is not read
        if (!is_no_op) {
            value = opal_common_ucx_load_uint64(origin_addr, origin_dt_bytes);
        }

        ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target,
                                             output_addr, origin_dt_bytes, remote_addr,
                                             user_req_cb, user_req_ptr);
        if (ret != OMPI_SUCCESS) {
            // stop at the first failure instead of letting a later iteration
            // overwrite the error code
            OSC_UCX_VERBOSE(1, "opal_common_ucx_wpmem_fetch_nb failed: %d", ret);
            return ret;
        }

        // advance origin and remote address
        origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes);
        remote_addr += origin_dt_bytes;
        if (result_addr) {
            output_addr = (void*)((intptr_t)output_addr + origin_dt_bytes);
        }
    }

    return ret;
}
int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
int target, ptrdiff_t target_disp, int target_count, int target, ptrdiff_t target_disp, int target_count,
struct ompi_datatype_t *target_dt, struct ompi_win_t *win) { struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
@ -432,13 +587,18 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
} }
} }
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, static
struct ompi_datatype_t *origin_dt, int accumulate_req(const void *origin_addr, int origin_count,
int target, ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *origin_dt,
struct ompi_datatype_t *target_dt, int target, ptrdiff_t target_disp, int target_count,
struct ompi_op_t *op, struct ompi_win_t *win) { struct ompi_datatype_t *target_dt,
struct ompi_op_t *op, struct ompi_win_t *win,
ompi_osc_ucx_request_t *ucx_req) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
void *free_ptr = NULL;
bool lock_acquired = false;
ret = check_sync_state(module, target, false); ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
@ -449,7 +609,14 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
return ret; return ret;
} }
ret = start_atomicity(module, target); /* rely on UCX network atomics if the user told us that it is safe */
if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) {
return do_atomic_op_intrinsic(module, op, target,
origin_addr, origin_count, origin_dt,
target_disp, NULL, ucx_req);
}
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
@ -461,7 +628,6 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
return ret; return ret;
} }
} else { } else {
void *temp_addr_holder = NULL;
void *temp_addr = NULL; void *temp_addr = NULL;
uint32_t temp_count; uint32_t temp_count;
ompi_datatype_t *temp_dt; ompi_datatype_t *temp_dt;
@ -478,7 +644,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
} }
} }
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
temp_addr = temp_addr_holder = malloc(temp_extent * temp_count); temp_addr = free_ptr = malloc(temp_extent * temp_count);
if (temp_addr == NULL) { if (temp_addr == NULL) {
return OMPI_ERR_TEMP_OUT_OF_RESOURCE; return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
} }
@ -544,15 +710,53 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
return ret; return ret;
} }
ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); }
if (NULL != ucx_req) {
// nothing to wait for, mark request as completed
ompi_request_complete(&ucx_req->super, true);
}
return end_atomicity(module, target, lock_acquired, free_ptr);
}
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
int target, ptrdiff_t target_disp, int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_op_t *op, struct ompi_win_t *win) {
return accumulate_req(origin_addr, origin_count, origin_dt, target,
target_disp, target_count, target_dt, op, win, NULL);
}
static int
do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
void *result_addr, struct ompi_datatype_t *dt,
int target, uint64_t remote_addr,
ompi_osc_ucx_module_t *module)
{
int ret;
bool lock_acquired = false;
size_t dt_bytes;
if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
free(temp_addr_holder);
} }
return end_atomicity(module, target); ompi_datatype_type_size(dt, &dt_bytes);
uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes);
uint64_t value = opal_common_ucx_load_uint64(origin_addr, dt_bytes);
ret = opal_common_ucx_wpmem_cmpswp_nb(module->mem, compare_val, value, target,
result_addr, dt_bytes, remote_addr,
NULL, NULL);
if (module->acc_single_intrinsic) {
return ret;
}
return end_atomicity(module, target, lock_acquired, NULL);
} }
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr, int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
@ -563,17 +767,13 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
size_t dt_bytes; size_t dt_bytes;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
bool lock_acquired = false;
ret = check_sync_state(module, target, false); ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
ret = start_atomicity(module, target);
if (ret != OMPI_SUCCESS) {
return ret;
}
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
ret = get_dynamic_win_info(remote_addr, module, target); ret = get_dynamic_win_info(remote_addr, module, target);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
@ -582,20 +782,50 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
} }
ompi_datatype_type_size(dt, &dt_bytes); ompi_datatype_type_size(dt, &dt_bytes);
ret = opal_common_ucx_wpmem_cmpswp(module->mem,*(uint64_t *)compare_addr, if (ATOMIC_SIZE_SUPPORTED(remote_addr, dt_bytes)) {
*(uint64_t *)origin_addr, target, // fast path using UCX atomic operations
result_addr, dt_bytes, remote_addr); return do_atomic_compare_and_swap(origin_addr, compare_addr,
result_addr, dt, target,
remote_addr, module);
}
/* fall back to get-compare-put */
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
return end_atomicity(module, target); ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, target,
&result_addr, dt_bytes, remote_addr);
if (OPAL_SUCCESS != ret) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_putget failed: %d", ret);
return OMPI_ERROR;
}
ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
if (ret != OPAL_SUCCESS) {
return ret;
}
if (0 == memcmp(result_addr, compare_addr, dt_bytes)) {
// write the new value
ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_PUT, target,
(void*)origin_addr, dt_bytes, remote_addr);
if (OPAL_SUCCESS != ret) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_putget failed: %d", ret);
return OMPI_ERROR;
}
}
return end_atomicity(module, target, lock_acquired, NULL);
} }
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
struct ompi_datatype_t *dt, int target, struct ompi_datatype_t *dt, int target,
ptrdiff_t target_disp, struct ompi_op_t *op, ptrdiff_t target_disp, struct ompi_op_t *op,
struct ompi_win_t *win) { struct ompi_win_t *win) {
size_t dt_bytes;
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
@ -604,16 +834,22 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
return ret; return ret;
} }
if (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op || uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
op == &ompi_mpi_op_sum.op) { ompi_datatype_type_size(dt, &dt_bytes);
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
uint64_t value = origin_addr ? *(uint64_t *)origin_addr : 0;
ucp_atomic_fetch_op_t opcode;
size_t dt_bytes;
ret = start_atomicity(module, target); /* UCX atomics are only supported on 32 and 64 bit values */
if (ret != OMPI_SUCCESS) { if (ATOMIC_SIZE_SUPPORTED(remote_addr, dt_bytes) &&
return ret; (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op ||
op == &ompi_mpi_op_sum.op)) {
uint64_t value;
ucp_atomic_fetch_op_t opcode;
bool lock_acquired = false;
if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) {
return ret;
}
} }
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
@ -623,7 +859,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
} }
} }
ompi_datatype_type_size(dt, &dt_bytes); value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0;
if (op == &ompi_mpi_op_replace.op) { if (op == &ompi_mpi_op_replace.op) {
opcode = UCP_ATOMIC_FETCH_OP_SWAP; opcode = UCP_ATOMIC_FETCH_OP_SWAP;
@ -634,35 +870,48 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
} }
} }
ret = opal_common_ucx_wpmem_fetch(module->mem, opcode, value, target, ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target,
(void *)result_addr, dt_bytes, remote_addr); (void *)result_addr, dt_bytes,
if (ret != OMPI_SUCCESS) { remote_addr, NULL, NULL);
if (module->acc_single_intrinsic) {
return ret; return ret;
} }
return end_atomicity(module, target); return end_atomicity(module, target, lock_acquired, NULL);
} else { } else {
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt, return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
target, target_disp, 1, dt, op, win); target, target_disp, 1, dt, op, win);
} }
} }
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, static
struct ompi_datatype_t *origin_dt, int get_accumulate_req(const void *origin_addr, int origin_count,
void *result_addr, int result_count, struct ompi_datatype_t *origin_dt,
struct ompi_datatype_t *result_dt, void *result_addr, int result_count,
int target, ptrdiff_t target_disp, struct ompi_datatype_t *result_dt,
int target_count, struct ompi_datatype_t *target_dt, int target, ptrdiff_t target_disp,
struct ompi_op_t *op, struct ompi_win_t *win) { int target_count, struct ompi_datatype_t *target_dt,
struct ompi_op_t *op, struct ompi_win_t *win,
ompi_osc_ucx_request_t *ucx_req) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
void *free_addr = NULL;
bool lock_acquired = false;
ret = check_sync_state(module, target, false); ret = check_sync_state(module, target, false);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
ret = start_atomicity(module, target); /* rely on UCX network atomics if the user told us that it is safe */
if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) {
return do_atomic_op_intrinsic(module, op, target,
origin_addr, origin_count, origin_dt,
target_disp, result_addr, ucx_req);
}
ret = start_atomicity(module, target, &lock_acquired);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
@ -682,7 +931,6 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
return ret; return ret;
} }
} else { } else {
void *temp_addr_holder = NULL;
void *temp_addr = NULL; void *temp_addr = NULL;
uint32_t temp_count; uint32_t temp_count;
ompi_datatype_t *temp_dt; ompi_datatype_t *temp_dt;
@ -699,7 +947,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
} }
} }
ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
temp_addr = temp_addr_holder = malloc(temp_extent * temp_count); temp_addr = free_addr = malloc(temp_extent * temp_count);
if (temp_addr == NULL) { if (temp_addr == NULL) {
return OMPI_ERR_TEMP_OUT_OF_RESOURCE; return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
} }
@ -763,17 +1011,29 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
return ret; return ret;
} }
ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target);
if (ret != OMPI_SUCCESS) {
return ret;
}
free(temp_addr_holder);
} }
} }
return end_atomicity(module, target); if (NULL != ucx_req) {
// nothing to wait for, mark request as completed
ompi_request_complete(&ucx_req->super, true);
}
return end_atomicity(module, target, lock_acquired, free_addr);
}
/*
 * Get-accumulate without an associated request object: all arguments are
 * passed through to get_accumulate_req() and NULL is supplied for its trailing
 * request parameter. The return value of get_accumulate_req() is returned
 * unchanged.
 */
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
void *result_addr, int result_count,
struct ompi_datatype_t *result_dt,
int target, ptrdiff_t target_disp,
int target_count, struct ompi_datatype_t *target_dt,
struct ompi_op_t *op, struct ompi_win_t *win) {
return get_accumulate_req(origin_addr, origin_count, origin_dt, result_addr,
result_count, result_dt, target, target_disp,
target_count, target_dt, op, win, NULL);
} }
int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
@ -819,6 +1079,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
sizeof(uint64_t), remote_addr, sizeof(uint64_t), remote_addr,
req_completion, ucx_req); req_completion, ucx_req);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret; return ret;
} }
@ -870,6 +1131,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
sizeof(uint64_t), remote_addr, sizeof(uint64_t), remote_addr,
req_completion, ucx_req); req_completion, ucx_req);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret; return ret;
} }
@ -895,13 +1157,13 @@ int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
assert(NULL != ucx_req); assert(NULL != ucx_req);
ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp, ret = accumulate_req(origin_addr, origin_count, origin_dt, target, target_disp,
target_count, target_dt, op, win); target_count, target_dt, op, win, ucx_req);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret; return ret;
} }
ompi_request_complete(&ucx_req->super, true);
*request = &ucx_req->super; *request = &ucx_req->super;
return ret; return ret;
@ -927,16 +1189,15 @@ int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
assert(NULL != ucx_req); assert(NULL != ucx_req);
ret = ompi_osc_ucx_get_accumulate(origin_addr, origin_count, origin_datatype, ret = get_accumulate_req(origin_addr, origin_count, origin_datatype,
result_addr, result_count, result_datatype, result_addr, result_count, result_datatype,
target, target_disp, target_count, target, target_disp, target_count,
target_datatype, op, win); target_datatype, op, win, ucx_req);
if (ret != OMPI_SUCCESS) { if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret; return ret;
} }
ompi_request_complete(&ucx_req->super, true);
*request = &ucx_req->super; *request = &ucx_req->super;
return ret; return ret;

Просмотреть файл

@ -72,7 +72,8 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
.wpool = NULL, .wpool = NULL,
.env_initialized = false, .env_initialized = false,
.num_incomplete_req_ops = 0, .num_incomplete_req_ops = 0,
.num_modules = 0 .num_modules = 0,
.acc_single_intrinsic = false
}; };
ompi_osc_ucx_module_t ompi_osc_ucx_module_template = { ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
@ -167,6 +168,15 @@ static int component_register(void) {
MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.no_locks); MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.no_locks);
free(description_str); free(description_str);
mca_osc_ucx_component.acc_single_intrinsic = false;
opal_asprintf(&description_str, "Enable optimizations for MPI_Fetch_and_op, MPI_Accumulate, etc for codes "
"that will not use anything more than a single predefined datatype (default: %s)",
mca_osc_ucx_component.acc_single_intrinsic ? "true" : "false");
(void) mca_base_component_var_register(&mca_osc_ucx_component.super.osc_version, "acc_single_intrinsic",
description_str, MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.acc_single_intrinsic);
free(description_str);
opal_common_ucx_mca_var_register(&mca_osc_ucx_component.super.osc_version); opal_common_ucx_mca_var_register(&mca_osc_ucx_component.super.osc_version);
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -389,6 +399,7 @@ select_unlock:
module->flavor = flavor; module->flavor = flavor;
module->size = size; module->size = size;
module->no_locks = check_config_value_bool ("no_locks", info); module->no_locks = check_config_value_bool ("no_locks", info);
module->acc_single_intrinsic = check_config_value_bool ("acc_single_intrinsic", info);
/* share everyone's displacement units. Only do an allgather if /* share everyone's displacement units. Only do an allgather if
strictly necessary, since it requires O(p) state. */ strictly necessary, since it requires O(p) state. */

Просмотреть файл

@ -43,7 +43,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);
#define OMPI_OSC_UCX_REQUEST_RETURN(req) \ #define OMPI_OSC_UCX_REQUEST_RETURN(req) \
do { \ do { \
OMPI_REQUEST_FINI(&request->super); \ OMPI_REQUEST_FINI(&req->super); \
opal_free_list_return (&mca_osc_ucx_component.requests, \ opal_free_list_return (&mca_osc_ucx_component.requests, \
(opal_free_list_item_t*) req); \ (opal_free_list_item_t*) req); \
} while (0) } while (0)

Просмотреть файл

@ -3,6 +3,8 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science * Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved. * and Technology (RIST). All rights reserved.
* Copyright (c) 2019-2020 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -115,6 +117,42 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t *
size_t my_rank, size_t max_disconnect, ucp_worker_h worker); size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component); OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
/**
 * Read an unsigned integer that is \c size bytes wide from \c ptr and widen
 * it to uint64_t.
 *
 * Sizes of 1, 2 and 4 bytes are read as uint8_t, uint16_t and uint32_t
 * respectively; any other \c size is read as a full uint64_t, so \c ptr must
 * then reference at least 8 readable bytes.
 */
static inline
uint64_t opal_common_ucx_load_uint64(const void *ptr, size_t size)
{
    uint64_t widened;

    switch (size) {
    case sizeof(uint8_t):
        widened = *(const uint8_t *) ptr;
        break;
    case sizeof(uint16_t):
        widened = *(const uint16_t *) ptr;
        break;
    case sizeof(uint32_t):
        widened = *(const uint32_t *) ptr;
        break;
    default:
        widened = *(const uint64_t *) ptr;
        break;
    }

    return widened;
}
/**
 * Narrow \c value to an unsigned integer of \c size bytes and write it to
 * \c ptr.
 *
 * Sizes of 1, 2 and 4 bytes are written as uint8_t, uint16_t and uint32_t
 * (upper bits of \c value are discarded); any other \c size writes the full
 * uint64_t, so \c ptr must then reference at least 8 writable bytes.
 */
static inline
void opal_common_ucx_store_uint64(uint64_t value, void *ptr, size_t size)
{
    switch (size) {
    case sizeof(uint8_t):
        *(uint8_t *) ptr = (uint8_t) value;
        return;
    case sizeof(uint16_t):
        *(uint16_t *) ptr = (uint16_t) value;
        return;
    case sizeof(uint32_t):
        *(uint32_t *) ptr = (uint32_t) value;
        return;
    default:
        *(uint64_t *) ptr = value;
        return;
    }
}
static inline static inline
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request) ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
{ {
@ -206,22 +244,21 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
uint64_t remote_addr, ucp_rkey_h rkey, uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker) ucp_worker_h worker)
{ {
uint64_t tmp = value; opal_common_ucx_store_uint64(value, result, op_size);
int ret; return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
op_size, remote_addr, rkey, worker);
}
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp, static inline
op_size, remote_addr, rkey, worker); ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare,
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) { uint64_t value, void *result, size_t op_size,
/* in case if op_size is constant (like sizeof(type)) then this condition uint64_t remote_addr, ucp_rkey_h rkey,
* is evaluated in compile time */ ucp_send_callback_t req_handler,
if (op_size == sizeof(uint64_t)) { ucp_worker_h worker)
*(uint64_t*)result = tmp; {
} else { opal_common_ucx_store_uint64(value, result, op_size);
assert(op_size == sizeof(uint32_t)); return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
*(uint32_t*)result = tmp; op_size, remote_addr, rkey, req_handler, worker);
}
}
return ret;
} }
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -1,3 +1,14 @@
/*
* Copyright (C) 2001-2017 Mellanox Technologies Ltd. ALL RIGHTS RESERVED.
* Copyright (c) 2019-2020 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef COMMON_UCX_WPOOL_H #ifndef COMMON_UCX_WPOOL_H
#define COMMON_UCX_WPOOL_H #define COMMON_UCX_WPOOL_H
@ -418,6 +429,56 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
return rc; return rc;
} }
/**
 * Start a non-blocking atomic compare-and-swap on the memory of \c target.
 *
 * @param mem           worker-pool memory handle the operation is issued on
 * @param compare       value the remote location is compared against
 * @param value         value to store if the comparison succeeds
 * @param target        rank of the target process
 * @param buffer        local buffer handed to the cswap as its result buffer
 * @param len           operand size in bytes
 * @param rem_addr      remote address of the operand
 * @param user_req_cb   optional callback; stored on the UCX request when one is
 *                      returned, otherwise invoked right away
 * @param user_req_ptr  argument handed to \c user_req_cb
 *
 * @return OPAL_SUCCESS, or the error code of opal_common_ucx_tlocal_fetch /
 *         _periodical_flush_nb.
 *
 * The worker-info mutex is held while the operation is issued and is released
 * on every path before returning.
 */
static inline int
opal_common_ucx_wpmem_cmpswp_nb(opal_common_ucx_wpmem_t *mem, uint64_t compare,
                                uint64_t value, int target, void *buffer, size_t len,
                                uint64_t rem_addr,
                                opal_common_ucx_user_req_handler_t user_req_cb,
                                void *user_req_ptr)
{
    ucp_ep_h ep;
    ucp_rkey_h rkey;
    opal_common_ucx_winfo_t *winfo = NULL;
    opal_common_ucx_request_t *req;
    int rc = OPAL_SUCCESS;

    rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
    if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
        MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc);
        return rc;
    }

    /* Perform the operation */
    opal_mutex_lock(&winfo->mutex);
    req = opal_common_ucx_atomic_cswap_nb(ep, compare, value,
                                          buffer, len,
                                          rem_addr, rkey, opal_common_ucx_req_completion,
                                          winfo->worker);

    if (UCS_PTR_IS_PTR(req)) {
        /* operation is in flight: let the completion handler notify the user */
        req->ext_req = user_req_ptr;
        req->ext_cb = user_req_cb;
        req->winfo = winfo;
    } else {
        /* NOTE(review): every non-pointer status takes this path, which may
         * include error statuses -- confirm that is intended */
        if (user_req_cb != NULL) {
            (*user_req_cb)(user_req_ptr);
        }
    }

    rc = _periodical_flush_nb(mem, winfo, target);

    /* release the mutex before any return so a flush failure cannot leave it locked */
    opal_mutex_unlock(&winfo->mutex);

    if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
        MCA_COMMON_UCX_VERBOSE(1, "_periodical_flush_nb failed: %d", rc);
    }

    return rc;
}
static inline int static inline int
opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode, opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
uint64_t value, int target, size_t len, uint64_t rem_addr) uint64_t value, int target, size_t len, uint64_t rem_addr)