UCX osc: centralize decision on whether to use AMOs
Signed-off-by: Joseph Schuchart <schuchart@hlrs.de>
Этот коммит содержится в:
родитель
427d4bd226
Коммит
d8696aa8c4
@ -264,7 +264,7 @@ static inline int start_atomicity(
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
if (result_value == TARGET_LOCK_UNLOCKED) {
|
||||
return OMPI_SUCCESS;
|
||||
break;
|
||||
}
|
||||
|
||||
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
|
||||
@ -274,6 +274,8 @@ static inline int start_atomicity(
|
||||
} else {
|
||||
*lock_acquired = false;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int end_atomicity(
|
||||
@ -362,16 +364,30 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
|
||||
}
|
||||
|
||||
static inline
|
||||
bool use_ucx_op(struct ompi_op_t *op, struct ompi_datatype_t *origin_dt)
|
||||
bool use_atomic_op(
|
||||
ompi_osc_ucx_module_t *module,
|
||||
struct ompi_op_t *op,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
struct ompi_datatype_t *target_dt,
|
||||
int origin_count,
|
||||
int target_count)
|
||||
{
|
||||
|
||||
if (op == &ompi_mpi_op_replace.op ||
|
||||
op == &ompi_mpi_op_sum.op ||
|
||||
op == &ompi_mpi_op_no_op.op) {
|
||||
size_t dt_bytes;
|
||||
ompi_datatype_type_size(origin_dt, &dt_bytes);
|
||||
if (ompi_datatype_is_predefined(origin_dt) &&
|
||||
sizeof(uint64_t) >= dt_bytes) {
|
||||
if (module->acc_single_intrinsic &&
|
||||
ompi_datatype_is_predefined(origin_dt) &&
|
||||
origin_count == 1 &&
|
||||
(op == &ompi_mpi_op_replace.op ||
|
||||
op == &ompi_mpi_op_sum.op ||
|
||||
op == &ompi_mpi_op_no_op.op)) {
|
||||
size_t origin_dt_bytes;
|
||||
size_t target_dt_bytes;
|
||||
ompi_datatype_type_size(origin_dt, &origin_dt_bytes);
|
||||
ompi_datatype_type_size(target_dt, &target_dt_bytes);
|
||||
/* UCX only supports 32 and 64-bit operands atm */
|
||||
if (sizeof(uint64_t) >= origin_dt_bytes &&
|
||||
sizeof(uint32_t) <= origin_dt_bytes &&
|
||||
origin_dt_bytes == target_dt_bytes &&
|
||||
origin_count == target_count) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -384,25 +400,15 @@ static int do_atomic_op_intrinsic(
|
||||
struct ompi_op_t *op,
|
||||
int target,
|
||||
const void *origin_addr,
|
||||
int origin_count,
|
||||
struct ompi_datatype_t *origin_dt,
|
||||
int count,
|
||||
struct ompi_datatype_t *dt,
|
||||
ptrdiff_t target_disp,
|
||||
int target_count,
|
||||
struct ompi_datatype_t *target_dt,
|
||||
void *result_addr,
|
||||
ompi_osc_ucx_request_t *ucx_req)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
size_t origin_dt_bytes;
|
||||
size_t target_dt_bytes;
|
||||
ompi_datatype_type_size(origin_dt, &origin_dt_bytes);
|
||||
ompi_datatype_type_size(target_dt, &target_dt_bytes);
|
||||
|
||||
if (sizeof(uint64_t) > origin_dt_bytes ||
|
||||
origin_dt_bytes != target_dt_bytes ||
|
||||
target_count != origin_count) {
|
||||
return OMPI_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
ompi_datatype_type_size(dt, &origin_dt_bytes);
|
||||
|
||||
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
|
||||
@ -430,9 +436,9 @@ static int do_atomic_op_intrinsic(
|
||||
if( result_addr ) {
|
||||
output_addr = result_addr;
|
||||
}
|
||||
for (int i = 0; i < origin_count; ++i) {
|
||||
for (int i = 0; i < count; ++i) {
|
||||
uint64_t value = 0;
|
||||
if ((origin_count - 1) == i && NULL != ucx_req) {
|
||||
if ((count - 1) == i && NULL != ucx_req) {
|
||||
// the last item is used to feed the request, if needed
|
||||
user_req_cb = &req_completion;
|
||||
user_req_ptr = ucx_req;
|
||||
@ -596,11 +602,11 @@ int accumulate_req(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) {
|
||||
/* rely on UCX network atomics if the user told us that it safe */
|
||||
if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) {
|
||||
return do_atomic_op_intrinsic(module, op, target,
|
||||
origin_addr, origin_count, origin_dt,
|
||||
target_disp, target_count, target_dt,
|
||||
NULL, ucx_req);
|
||||
target_disp, NULL, ucx_req);
|
||||
}
|
||||
|
||||
ret = start_atomicity(module, target, &lock_acquired);
|
||||
@ -726,14 +732,21 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
|
||||
int ret = OMPI_SUCCESS;
|
||||
bool lock_acquired = false;
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
if (sizeof(uint64_t) < dt_bytes) {
|
||||
return OMPI_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = start_atomicity(module, target, &lock_acquired);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
if (!module->acc_single_intrinsic) {
|
||||
ret = start_atomicity(module, target, &lock_acquired);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
@ -743,11 +756,6 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
|
||||
}
|
||||
}
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
if (sizeof(uint64_t) < dt_bytes) {
|
||||
return OMPI_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
uint64_t compare_val;
|
||||
memcpy(&compare_val, compare_addr, dt_bytes);
|
||||
memcpy(result_addr, origin_addr, dt_bytes);
|
||||
@ -841,11 +849,11 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) {
|
||||
/* rely on UCX network atomics if the user told us that it safe */
|
||||
if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) {
|
||||
return do_atomic_op_intrinsic(module, op, target,
|
||||
origin_addr, origin_count, origin_dt,
|
||||
target_disp, target_count, target_dt,
|
||||
result_addr, ucx_req);
|
||||
target_disp, result_addr, ucx_req);
|
||||
}
|
||||
|
||||
ret = start_atomicity(module, target, &lock_acquired);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user