From 73a183408fccc43c0eda7b15b02540895425a4c6 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 4 Sep 2019 14:03:39 +0200 Subject: [PATCH 01/18] UCX osc: add support for acc_single_intrinsic info key / mca param Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx.h | 2 + ompi/mca/osc/ucx/osc_ucx_comm.c | 195 +++++++++++++++++++++++++-- ompi/mca/osc/ucx/osc_ucx_component.c | 13 +- 3 files changed, 201 insertions(+), 9 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx.h b/ompi/mca/osc/ucx/osc_ucx.h index d2eed4662d..3265dc2fbc 100644 --- a/ompi/mca/osc/ucx/osc_ucx.h +++ b/ompi/mca/osc/ucx/osc_ucx.h @@ -34,6 +34,7 @@ typedef struct ompi_osc_ucx_component { int num_incomplete_req_ops; int num_modules; bool no_locks; /* Default value of the no_locks info key for new windows */ + bool acc_single_intrinsic; unsigned int priority; } ompi_osc_ucx_component_t; @@ -115,6 +116,7 @@ typedef struct ompi_osc_ucx_module { int *start_grp_ranks; bool lock_all_is_nocheck; bool no_locks; + bool acc_single_intrinsic; opal_common_ucx_ctx_t *ctx; opal_common_ucx_wpmem_t *mem; opal_common_ucx_wpmem_t *state_mem; diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index e70516033f..062673c5ae 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -323,6 +323,149 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module return ret; } +static int atomic_op_replace_sum( + ompi_osc_ucx_module_t *module, + struct ompi_op_t *op, + int target, + const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + void *result_addr) +{ + int ret = OMPI_SUCCESS; + size_t origin_dt_bytes; + size_t target_dt_bytes; + ompi_datatype_type_size(origin_dt, &origin_dt_bytes); + ompi_datatype_type_size(target_dt, &target_dt_bytes); + + if (origin_dt_bytes > sizeof(uint64_t) || + origin_dt_bytes != target_dt_bytes || + target_count != origin_count) { + return OMPI_ERR_NOT_SUPPORTED; + } + + uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); + + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + ret = get_dynamic_win_info(remote_addr, module, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + } + + ucp_atomic_fetch_op_t opcode; + if (op == &ompi_mpi_op_replace.op) { + opcode = UCP_ATOMIC_FETCH_OP_SWAP; + } else { + opcode = UCP_ATOMIC_FETCH_OP_FADD; + } + + for (int i = 0; i < origin_count; ++i) { + uint64_t value = 0; + memcpy(&value, origin_addr, origin_dt_bytes); + ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, + result_addr ? result_addr : &(module->req_result), + origin_dt_bytes, remote_addr, NULL, NULL); + + // advance origin and remote address + origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes); + remote_addr += origin_dt_bytes; + if (result_addr) { + result_addr = (void*)((intptr_t)result_addr + origin_dt_bytes); + } + } + + return ret; +} + +static int atomic_op_cswap( + ompi_osc_ucx_module_t *module, + struct ompi_op_t *op, + int target, + const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + void *result_addr) +{ + int ret = OMPI_SUCCESS; + size_t origin_dt_bytes; + size_t target_dt_bytes; + ompi_datatype_type_size(origin_dt, &origin_dt_bytes); + ompi_datatype_type_size(target_dt, &target_dt_bytes); + + if (origin_dt_bytes > sizeof(uint64_t) || + origin_dt_bytes != target_dt_bytes || + target_count != origin_count) { + return OMPI_ERR_NOT_SUPPORTED; + } + + uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); + + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + ret = get_dynamic_win_info(remote_addr, module, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + } + + for (int i = 0; i < origin_count; ++i) { + + uint64_t tmp_val; + do { + uint64_t target_val = 0; + + // get the value from the origin + ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, + target, &target_val, origin_dt_bytes, + remote_addr); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + + tmp_val = target_val; + // compute the result value + ompi_op_reduce(op, (void *)origin_addr, &tmp_val, 1, origin_dt); + + // compare-and-swap the resulting value + ret = opal_common_ucx_wpmem_cmpswp(module->mem, target_val, tmp_val, + target, &tmp_val, origin_dt_bytes, + remote_addr); + if (ret != OMPI_SUCCESS) { + return ret; + } + + // check whether the conditional swap was successful + if (tmp_val == target_val) { + break; + } + + } while (1); + + // store the result if necessary + if (NULL != result_addr) { + memcpy(result_addr, &tmp_val, origin_dt_bytes); + result_addr = (void*)((intptr_t)result_addr + origin_dt_bytes); + } + // advance origin and remote address + origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes); + remote_addr += origin_dt_bytes; + } + + return ret; +} + + int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, int target, ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *target_dt, struct ompi_win_t *win) { @@ -449,6 +592,22 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, return ret; } + if (module->acc_single_intrinsic) { + if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { + ret = atomic_op_replace_sum(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + &(module->req_result)); + } else { + ret = atomic_op_cswap(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + &(module->req_result)); + } + return ret; + } + + ret = start_atomicity(module, target); if (ret != OMPI_SUCCESS) { return ret; @@ -569,9 +728,11 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a return ret; } - ret = start_atomicity(module, target); - if (ret != OMPI_SUCCESS) { - return ret; + if (!module->acc_single_intrinsic) { + ret = start_atomicity(module, target); + if (ret != OMPI_SUCCESS) { + return ret; + } } if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { @@ -585,7 +746,8 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a ret = opal_common_ucx_wpmem_cmpswp(module->mem,*(uint64_t *)compare_addr, *(uint64_t *)origin_addr, target, result_addr, dt_bytes, remote_addr); - if (ret != OMPI_SUCCESS) { + + if (module->acc_single_intrinsic) { return ret; } @@ -611,9 +773,11 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, ucp_atomic_fetch_op_t opcode; size_t dt_bytes; - ret = start_atomicity(module, target); - if (ret != OMPI_SUCCESS) { - return ret; + if (!module->acc_single_intrinsic) { + ret = start_atomicity(module, target); + if (ret != OMPI_SUCCESS) { + return ret; + } } if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { @@ -636,7 +800,8 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, ret = opal_common_ucx_wpmem_fetch(module->mem, opcode, value, target, (void *)result_addr, dt_bytes, remote_addr); - if (ret != OMPI_SUCCESS) { + + if (module->acc_single_intrinsic) { return ret; } @@ -662,6 +827,20 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, return ret; } + if (module->acc_single_intrinsic) { + if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { + ret = atomic_op_replace_sum(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, result_addr); + } else { + ret = atomic_op_cswap(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, result_addr); + } + return ret; + } + + ret = start_atomicity(module, target); if (ret != OMPI_SUCCESS) { return ret; diff --git a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c index a4b3cfd1d4..bcef7b884f 100644 --- a/ompi/mca/osc/ucx/osc_ucx_component.c +++ b/ompi/mca/osc/ucx/osc_ucx_component.c @@ -72,7 +72,8 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = { .wpool = NULL, .env_initialized = false, .num_incomplete_req_ops = 0, - .num_modules = 0 + .num_modules = 0, + .acc_single_intrinsic = false }; ompi_osc_ucx_module_t ompi_osc_ucx_module_template = { @@ -167,6 +168,15 @@ static int component_register(void) { MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.no_locks); free(description_str); + mca_osc_ucx_component.acc_single_intrinsic = false; + opal_asprintf(&description_str, "Enable optimizations for MPI_Fetch_and_op, MPI_Accumulate, etc for codes " + "that will not use anything more than a single predefined datatype (default: %s)", + mca_osc_ucx_component.acc_single_intrinsic ? "true" : "false"); + (void) mca_base_component_var_register(&mca_osc_ucx_component.super.osc_version, "acc_single_intrinsic", + description_str, MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_5, + MCA_BASE_VAR_SCOPE_GROUP, &mca_osc_ucx_component.acc_single_intrinsic); + free(description_str); + opal_common_ucx_mca_var_register(&mca_osc_ucx_component.super.osc_version); return OMPI_SUCCESS; @@ -389,6 +399,7 @@ select_unlock: module->flavor = flavor; module->size = size; module->no_locks = check_config_value_bool ("no_locks", info); + module->acc_single_intrinsic = check_config_value_bool ("acc_single_intrinsic", info); /* share everyone's displacement units. Only do an allgather if strictly necessary, since it requires O(p) state. */ From d448efd49c601476947c2b157dcdf223626b8b92 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 13 Sep 2019 18:10:42 +0200 Subject: [PATCH 02/18] UCX osc: properly clean up requests in case of errors Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 062673c5ae..c24894944c 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -998,6 +998,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, sizeof(uint64_t), remote_addr, req_completion, ucx_req); if (ret != OMPI_SUCCESS) { + OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } @@ -1049,6 +1050,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count, sizeof(uint64_t), remote_addr, req_completion, ucx_req); if (ret != OMPI_SUCCESS) { + OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } @@ -1077,6 +1079,7 @@ int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count, ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp, target_count, target_dt, op, win); if (ret != OMPI_SUCCESS) { + OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } @@ -1111,6 +1114,7 @@ int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count, target, target_disp, target_count, target_datatype, op, win); if (ret != OMPI_SUCCESS) { + OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } From 8606a02b87a36a47ffe18ee07a32d27474acc3f9 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 13 Sep 2019 18:11:17 +0200 Subject: [PATCH 03/18] UCX osc: fix macro parameter name usage in OMPI_OSC_UCX_REQUEST_RETURN Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_request.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_request.h b/ompi/mca/osc/ucx/osc_ucx_request.h index 7dae51986f..b471e671ba 100644 --- a/ompi/mca/osc/ucx/osc_ucx_request.h +++ b/ompi/mca/osc/ucx/osc_ucx_request.h @@ -43,7 +43,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t); #define OMPI_OSC_UCX_REQUEST_RETURN(req) \ do { \ - OMPI_REQUEST_FINI(&request->super); \ + OMPI_REQUEST_FINI(&req->super); \ opal_free_list_return (&mca_osc_ucx_component.requests, \ (opal_free_list_item_t*) req); \ } while (0) From 1a3c6bbf3595889405c0712eb99a2ea8d4e8c96f Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 13 Sep 2019 18:16:35 +0200 Subject: [PATCH 04/18] UCX osc: re-use value returned by cswap to save additional get Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index c24894944c..d3acaa1c66 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -417,21 +417,22 @@ static int atomic_op_cswap( for (int i = 0; i < origin_count; ++i) { uint64_t tmp_val; + uint64_t target_val = 0; + + // get the value from the origin + ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, + target, &target_val, origin_dt_bytes, + remote_addr); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + do { - uint64_t target_val = 0; - - // get the value from the origin - ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, - target, &target_val, origin_dt_bytes, - remote_addr); - if (ret != OMPI_SUCCESS) { - return ret; - } - - ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); - if (ret != OMPI_SUCCESS) { - return ret; - } tmp_val = target_val; // compute the result value From 557ae80858f697f71a7f292844ed7f525cfad77c Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 13 Sep 2019 18:19:19 +0200 Subject: [PATCH 05/18] UCX osc: allow for overlap with (some) request-based atomic operations Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 173 +++++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 49 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index d3acaa1c66..5745a051e9 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -323,7 +323,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module return ret; } -static int atomic_op_replace_sum( +static int do_atomic_op_replace_sum( ompi_osc_ucx_module_t *module, struct ompi_op_t *op, int target, @@ -333,7 +333,8 @@ static int atomic_op_replace_sum( ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *target_dt, - void *result_addr) + void *result_addr, + ompi_osc_ucx_request_t *ucx_req) { int ret = OMPI_SUCCESS; size_t origin_dt_bytes; @@ -363,12 +364,27 @@ static int atomic_op_replace_sum( opcode = UCP_ATOMIC_FETCH_OP_FADD; } + opal_common_ucx_user_req_handler_t user_req_cb = NULL; + void* user_req_ptr = NULL; for (int i = 0; i < origin_count; ++i) { uint64_t value = 0; + if ((origin_count - 1) == i && NULL != ucx_req) { + // the last item is used to feed the request, if needed + user_req_cb = &req_completion; + user_req_ptr = ucx_req; + // issue a fence if this is the last but not the only element + if (0 < i) { + ret = opal_common_ucx_wpmem_fence(module->mem); + if (ret != OMPI_SUCCESS) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); + return OMPI_ERROR; + } + } + } memcpy(&value, origin_addr, origin_dt_bytes); ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, result_addr ? result_addr : &(module->req_result), - origin_dt_bytes, remote_addr, NULL, NULL); + origin_dt_bytes, remote_addr, user_req_cb, user_req_ptr); // advance origin and remote address origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes); @@ -381,7 +397,7 @@ static int atomic_op_replace_sum( return ret; } -static int atomic_op_cswap( +static int do_atomic_op_cswap( ompi_osc_ucx_module_t *module, struct ompi_op_t *op, int target, @@ -391,7 +407,8 @@ static int atomic_op_cswap( ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *target_dt, - void *result_addr) + void *result_addr, + ompi_osc_ucx_request_t *ucx_req) { int ret = OMPI_SUCCESS; size_t origin_dt_bytes; @@ -432,6 +449,7 @@ static int atomic_op_cswap( return ret; } + /* JS: move this loop into the request to overlap multiple cas operations? */ do { tmp_val = target_val; @@ -451,6 +469,8 @@ static int atomic_op_cswap( break; } + target_val = tmp_val; + } while (1); // store the result if necessary @@ -463,6 +483,41 @@ static int atomic_op_cswap( remote_addr += origin_dt_bytes; } + if (NULL != ucx_req) { + // nothing to wait for so mark the request as completed + ompi_request_complete(&ucx_req->super, true); + } + + return ret; +} + +static inline +int do_atomic_op( + ompi_osc_ucx_module_t *module, + struct ompi_op_t *op, + int target, + const void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + ptrdiff_t target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + void *result_addr, + ompi_osc_ucx_request_t *ucx_req) +{ + int ret; + + if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { + ret = do_atomic_op_replace_sum(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + result_addr, ucx_req); + } else { + ret = do_atomic_op_cswap(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + result_addr, ucx_req); + } return ret; } @@ -576,11 +631,14 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count, } } -int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, - struct ompi_datatype_t *origin_dt, - int target, ptrdiff_t target_disp, int target_count, - struct ompi_datatype_t *target_dt, - struct ompi_op_t *op, struct ompi_win_t *win) { +static +int accumulate_req(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, struct ompi_win_t *win, + ompi_osc_ucx_request_t *ucx_req) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; @@ -594,18 +652,10 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, } if (module->acc_single_intrinsic) { - if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { - ret = atomic_op_replace_sum(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - &(module->req_result)); - } else { - ret = atomic_op_cswap(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - &(module->req_result)); - } - return ret; + return do_atomic_op(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + NULL, ucx_req); } @@ -712,9 +762,23 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, free(temp_addr_holder); } + if (NULL != ucx_req) { + // nothing to wait for, mark request as completed + ompi_request_complete(&ucx_req->super, true); + } + return end_atomicity(module, target); } +int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, struct ompi_win_t *win) { + return accumulate_req(origin_addr, origin_count, origin_dt, target, + target_disp, target_count, target_dt, op, win, NULL); +} + int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr, void *result_addr, struct ompi_datatype_t *dt, int target, ptrdiff_t target_disp, @@ -813,13 +877,15 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, } } -int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, - struct ompi_datatype_t *origin_dt, - void *result_addr, int result_count, - struct ompi_datatype_t *result_dt, - int target, ptrdiff_t target_disp, - int target_count, struct ompi_datatype_t *target_dt, - struct ompi_op_t *op, struct ompi_win_t *win) { +static +int get_accumulate_req(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + void *result_addr, int result_count, + struct ompi_datatype_t *result_dt, + int target, ptrdiff_t target_disp, + int target_count, struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, struct ompi_win_t *win, + ompi_osc_ucx_request_t *ucx_req) { ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; @@ -829,19 +895,12 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, } if (module->acc_single_intrinsic) { - if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { - ret = atomic_op_replace_sum(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, result_addr); - } else { - ret = atomic_op_cswap(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, result_addr); - } - return ret; + return do_atomic_op(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + result_addr, ucx_req); } - ret = start_atomicity(module, target); if (ret != OMPI_SUCCESS) { return ret; @@ -953,9 +1012,28 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, } } + if (NULL != ucx_req) { + // nothing to wait for, mark request as completed + ompi_request_complete(&ucx_req->super, true); + } + + return end_atomicity(module, target); } +int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + void *result_addr, int result_count, + struct ompi_datatype_t *result_dt, + int target, ptrdiff_t target_disp, + int target_count, struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, struct ompi_win_t *win) { + + return get_accumulate_req(origin_addr, origin_count, origin_dt, result_addr, + result_count, result_dt, target, target_disp, + target_count, target_dt, op, win, NULL); +} + int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, int target, ptrdiff_t target_disp, int target_count, @@ -1077,14 +1155,13 @@ int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count, OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); assert(NULL != ucx_req); - ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp, - target_count, target_dt, op, win); + ret = accumulate_req(origin_addr, origin_count, origin_dt, target, target_disp, + target_count, target_dt, op, win, ucx_req); if (ret != OMPI_SUCCESS) { OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } - ompi_request_complete(&ucx_req->super, true); *request = &ucx_req->super; return ret; @@ -1110,17 +1187,15 @@ int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count, OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); assert(NULL != ucx_req); - ret = ompi_osc_ucx_get_accumulate(origin_addr, origin_count, origin_datatype, - result_addr, result_count, result_datatype, - target, target_disp, target_count, - target_datatype, op, win); + ret = get_accumulate_req(origin_addr, origin_count, origin_datatype, + result_addr, result_count, result_datatype, + target, target_disp, target_count, + target_datatype, op, win, ucx_req); if (ret != OMPI_SUCCESS) { OMPI_OSC_UCX_REQUEST_RETURN(ucx_req); return ret; } - ompi_request_complete(&ucx_req->super, true); - *request = &ucx_req->super; return ret; From 7cfc0e71da4061f082038b326524151aeb1f3d6f Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 16 Sep 2019 17:21:15 +0200 Subject: [PATCH 06/18] UCX osc: allow to asynchronously compare-and-swap Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 5745a051e9..c97ab43d76 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -808,14 +808,29 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a } ompi_datatype_type_size(dt, &dt_bytes); - ret = opal_common_ucx_wpmem_cmpswp(module->mem,*(uint64_t *)compare_addr, - *(uint64_t *)origin_addr, target, - result_addr, dt_bytes, remote_addr); + if (sizeof(uint64_t) < dt_bytes) { + return OMPI_ERR_NOT_SUPPORTED; + } + + uint64_t compare_val; + memcpy(&compare_val, compare_addr, dt_bytes); + memcpy(result_addr, origin_addr, dt_bytes); + ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_CSWAP, + compare_val, target, + result_addr, dt_bytes, remote_addr, + NULL, NULL); if (module->acc_single_intrinsic) { return ret; } + // fence before releasing the accumulate lock + ret = opal_common_ucx_wpmem_fence(module->mem); + if (ret != OMPI_SUCCESS) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); + // don't return error, try to release the accumulate lock + } + return end_atomicity(module, target); } From d888b4fd76faba5c6927c530e82f9768931ea439 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 20 Sep 2019 14:39:59 +0200 Subject: [PATCH 07/18] UCX osc: correctly handle MPI_NO_OP Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index c97ab43d76..ce7f7fa9c7 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -358,10 +358,14 @@ static int do_atomic_op_replace_sum( } ucp_atomic_fetch_op_t opcode; + bool is_no_op = false; if (op == &ompi_mpi_op_replace.op) { opcode = UCP_ATOMIC_FETCH_OP_SWAP; } else { opcode = UCP_ATOMIC_FETCH_OP_FADD; + if (op == &ompi_mpi_op_no_op.op) { + is_no_op = true; + } } opal_common_ucx_user_req_handler_t user_req_cb = NULL; @@ -381,7 +385,11 @@ static int do_atomic_op_replace_sum( } } } - memcpy(&value, origin_addr, origin_dt_bytes); + if (is_no_op) { + value = 0; + } else { + memcpy(&value, origin_addr, origin_dt_bytes); + } ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, result_addr ? result_addr : &(module->req_result), origin_dt_bytes, remote_addr, user_req_cb, user_req_ptr); @@ -507,7 +515,9 @@ int do_atomic_op( { int ret; - if (op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { + if (op == &ompi_mpi_op_replace.op || + op == &ompi_mpi_op_sum.op || + op == &ompi_mpi_op_no_op.op) { ret = do_atomic_op_replace_sum(module, op, target, origin_addr, origin_count, origin_dt, target_disp, target_count, target_dt, From 899f58cef51fd10b724c75af60a08506c341444c Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 4 Nov 2019 15:03:28 +0100 Subject: [PATCH 08/18] UCX osc: simplify output address computation Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index ce7f7fa9c7..3a7161ef1f 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -369,7 +369,11 @@ static int do_atomic_op_replace_sum( } opal_common_ucx_user_req_handler_t user_req_cb = NULL; - void* user_req_ptr = NULL; + void *user_req_ptr = NULL; + void *output_addr = &(module->req_result); + if( result_addr ) { + output_addr = result_addr; + } for (int i = 0; i < origin_count; ++i) { uint64_t value = 0; if ((origin_count - 1) == i && NULL != ucx_req) { @@ -391,14 +395,14 @@ static int do_atomic_op_replace_sum( memcpy(&value, origin_addr, origin_dt_bytes); } ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, - result_addr ? result_addr : &(module->req_result), - origin_dt_bytes, remote_addr, user_req_cb, user_req_ptr); + output_addr, origin_dt_bytes, remote_addr, + user_req_cb, user_req_ptr); // advance origin and remote address origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes); remote_addr += origin_dt_bytes; if (result_addr) { - result_addr = (void*)((intptr_t)result_addr + origin_dt_bytes); + output_addr = (void*)((intptr_t)output_addr + origin_dt_bytes); } } From 4d7a3856face0dafc4d6dd19fc272bb4c2314a24 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 4 Nov 2019 15:04:22 +0100 Subject: [PATCH 09/18] UCX osc: Use accumulate for operations/datatypes that are not covered by UCX Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 169 ++++++-------------------------- 1 file changed, 30 insertions(+), 139 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 3a7161ef1f..366a920374 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -323,7 +323,25 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module return ret; } -static int do_atomic_op_replace_sum( +static inline +bool use_ucx_op(struct ompi_op_t *op, struct ompi_datatype_t *origin_dt) +{ + + if (op == &ompi_mpi_op_replace.op || + op == &ompi_mpi_op_sum.op || + op == &ompi_mpi_op_no_op.op) { + size_t dt_bytes; + ompi_datatype_type_size(origin_dt, &dt_bytes); + if (ompi_datatype_is_predefined(origin_dt) && + sizeof(uint64_t) >= dt_bytes) { + return true; + } + } + + return false; +} + +static int do_atomic_op_intrinsic( ompi_osc_ucx_module_t *module, struct ompi_op_t *op, int target, @@ -342,7 +360,7 @@ static int do_atomic_op_replace_sum( ompi_datatype_type_size(origin_dt, &origin_dt_bytes); ompi_datatype_type_size(target_dt, &target_dt_bytes); - if (origin_dt_bytes > sizeof(uint64_t) || + if (sizeof(uint64_t) > origin_dt_bytes || origin_dt_bytes != target_dt_bytes || target_count != origin_count) { return OMPI_ERR_NOT_SUPPORTED; @@ -409,133 +427,6 @@ static int do_atomic_op_replace_sum( return ret; } -static int do_atomic_op_cswap( - ompi_osc_ucx_module_t *module, - struct ompi_op_t *op, - int target, - const void *origin_addr, - int origin_count, - struct ompi_datatype_t *origin_dt, - ptrdiff_t target_disp, - int target_count, - struct ompi_datatype_t *target_dt, - void *result_addr, - ompi_osc_ucx_request_t *ucx_req) -{ - int ret = OMPI_SUCCESS; - size_t origin_dt_bytes; - size_t target_dt_bytes; - ompi_datatype_type_size(origin_dt, &origin_dt_bytes); - ompi_datatype_type_size(target_dt, &target_dt_bytes); - - if (origin_dt_bytes > sizeof(uint64_t) || - origin_dt_bytes != target_dt_bytes || - target_count != origin_count) { - return OMPI_ERR_NOT_SUPPORTED; - } - - uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); - - if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { - ret = get_dynamic_win_info(remote_addr, module, target); - if (ret != OMPI_SUCCESS) { - return ret; - } - } - - for (int i = 0; i < origin_count; ++i) { - - uint64_t tmp_val; - uint64_t target_val = 0; - - // get the value from the origin - ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, - target, &target_val, origin_dt_bytes, - remote_addr); - if (ret != OMPI_SUCCESS) { - return ret; - } - - ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); - if (ret != OMPI_SUCCESS) { - return ret; - } - - /* JS: move this loop into the request to overlap multiple cas operations? */ - do { - - tmp_val = target_val; - // compute the result value - ompi_op_reduce(op, (void *)origin_addr, &tmp_val, 1, origin_dt); - - // compare-and-swap the resulting value - ret = opal_common_ucx_wpmem_cmpswp(module->mem, target_val, tmp_val, - target, &tmp_val, origin_dt_bytes, - remote_addr); - if (ret != OMPI_SUCCESS) { - return ret; - } - - // check whether the conditional swap was successful - if (tmp_val == target_val) { - break; - } - - target_val = tmp_val; - - } while (1); - - // store the result if necessary - if (NULL != result_addr) { - memcpy(result_addr, &tmp_val, origin_dt_bytes); - result_addr = (void*)((intptr_t)result_addr + origin_dt_bytes); - } - // advance origin and remote address - origin_addr = (void*)((intptr_t)origin_addr + origin_dt_bytes); - remote_addr += origin_dt_bytes; - } - - if (NULL != ucx_req) { - // nothing to wait for so mark the request as completed - ompi_request_complete(&ucx_req->super, true); - } - - return ret; -} - -static inline -int do_atomic_op( - ompi_osc_ucx_module_t *module, - struct ompi_op_t *op, - int target, - const void *origin_addr, - int origin_count, - struct ompi_datatype_t *origin_dt, - ptrdiff_t target_disp, - int target_count, - struct ompi_datatype_t *target_dt, - void *result_addr, - ompi_osc_ucx_request_t *ucx_req) -{ - int ret; - - if (op == &ompi_mpi_op_replace.op || - op == &ompi_mpi_op_sum.op || - op == &ompi_mpi_op_no_op.op) { - ret = do_atomic_op_replace_sum(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - result_addr, ucx_req); - } else { - ret = do_atomic_op_cswap(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - result_addr, ucx_req); - } - return ret; -} - - int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, int target, ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *target_dt, struct ompi_win_t *win) { @@ -665,11 +556,11 @@ int accumulate_req(const void *origin_addr, int origin_count, return ret; } - if (module->acc_single_intrinsic) { - return do_atomic_op(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - NULL, ucx_req); + if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) { + return do_atomic_op_intrinsic(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + NULL, ucx_req); } @@ -923,11 +814,11 @@ int get_accumulate_req(const void *origin_addr, int origin_count, return ret; } - if (module->acc_single_intrinsic) { - return do_atomic_op(module, op, target, - origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - result_addr, ucx_req); + if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) { + return do_atomic_op_intrinsic(module, op, target, + origin_addr, origin_count, origin_dt, + target_disp, target_count, target_dt, + result_addr, ucx_req); } ret = start_atomicity(module, target); From 471d76777ad04cec37020a2cfd8e02988077657d Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 4 Nov 2019 16:58:31 +0100 Subject: [PATCH 10/18] UCX osc: fence active operations before releasing accumulate lock and free memory if required Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 53 +++++++++++++++------------------ 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 366a920374..60e5dddaaf 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -257,15 +257,30 @@ static inline int start_atomicity(ompi_osc_ucx_module_t *module, int target) { } } -static inline int end_atomicity(ompi_osc_ucx_module_t *module, int target) { +static inline int end_atomicity( + ompi_osc_ucx_module_t *module, + int target, + void *free_ptr) { uint64_t result_value = 0; uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET; int ret = OMPI_SUCCESS; + /* fence any still active operations */ + ret = opal_common_ucx_wpmem_fence(module->mem); + if (ret != OMPI_SUCCESS) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); + return OMPI_ERROR; + } + ret = opal_common_ucx_wpmem_fetch(module->state_mem, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, target, &result_value, sizeof(result_value), remote_addr); + + /* TODO: encapsulate in a request and make the release non-blocking */ + if (NULL != free_ptr) { + free(free_ptr); + } if (ret != OMPI_SUCCESS) { OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret); return OMPI_ERROR; @@ -546,6 +561,7 @@ int accumulate_req(const void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; + void *free_ptr = NULL; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { @@ -576,7 +592,6 @@ int accumulate_req(const void *origin_addr, int origin_count, return ret; } } else { - void *temp_addr_holder = NULL; void *temp_addr = NULL; uint32_t temp_count; ompi_datatype_t *temp_dt; @@ -593,7 +608,7 @@ int accumulate_req(const void *origin_addr, int origin_count, } } ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); - temp_addr = temp_addr_holder = malloc(temp_extent * temp_count); + temp_addr = free_ptr = malloc(temp_extent * temp_count); if (temp_addr == NULL) { return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } @@ -659,12 +674,6 @@ int accumulate_req(const void *origin_addr, int origin_count, return ret; } - ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); - if (ret != OMPI_SUCCESS) { - return ret; - } - - free(temp_addr_holder); } if (NULL != ucx_req) { @@ -672,7 +681,7 @@ int accumulate_req(const void *origin_addr, int origin_count, ompi_request_complete(&ucx_req->super, true); } - return end_atomicity(module, target); + return end_atomicity(module, target, free_ptr); } int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, @@ -729,14 +738,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a return ret; } - // fence before releasing the accumulate lock - ret = opal_common_ucx_wpmem_fence(module->mem); - if (ret != OMPI_SUCCESS) { - OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); - // don't return error, try to release the accumulate lock - } - - return end_atomicity(module, target); + return end_atomicity(module, target, NULL); } int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, @@ -790,7 +792,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, return ret; } - return end_atomicity(module, target); + return end_atomicity(module, target, NULL); } else { return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt, target, target_disp, 1, dt, op, win); @@ -808,6 +810,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, ompi_osc_ucx_request_t *ucx_req) { ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; + void *free_addr = NULL; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { @@ -841,7 +844,6 @@ int get_accumulate_req(const void *origin_addr, int origin_count, return ret; } } else { - void *temp_addr_holder = NULL; void *temp_addr = NULL; uint32_t temp_count; ompi_datatype_t *temp_dt; @@ -858,7 +860,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, } } ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); - temp_addr = temp_addr_holder = malloc(temp_extent * temp_count); + temp_addr = free_addr = malloc(temp_extent * temp_count); if (temp_addr == NULL) { return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } @@ -922,13 +924,6 @@ int get_accumulate_req(const void *origin_addr, int origin_count, if (ret != OMPI_SUCCESS) { return ret; } - - ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); - if (ret != OMPI_SUCCESS) { - return ret; - } - - free(temp_addr_holder); } } @@ -938,7 +933,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, } - return end_atomicity(module, target); + return end_atomicity(module, target, free_addr); } int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, From 427d4bd2265d1961f08c6965f6281435d7e822fa Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 4 Nov 2019 17:22:00 +0100 Subject: [PATCH 11/18] UCX osc: do not acquire accumulate lock if exclusive lock was taken Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 104 ++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 40 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 60e5dddaaf..d6f42f043d 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -235,48 +235,73 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module, return ret; } -static inline int start_atomicity(ompi_osc_ucx_module_t *module, int target) { +static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target) +{ + ompi_osc_ucx_lock_t *lock = NULL; + opal_hash_table_get_value_uint32(&module->outstanding_locks, + (uint32_t) target, (void **) &lock); + + /* if there is an exclusive lock there is no need to acqurie the accumulate lock */ + return !(NULL != lock && LOCK_EXCLUSIVE == lock->type); +} + +static inline int start_atomicity( + ompi_osc_ucx_module_t *module, + int target, + bool *lock_acquired) { uint64_t result_value = -1; uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET; int ret = OMPI_SUCCESS; - for (;;) { - ret = opal_common_ucx_wpmem_cmpswp(module->state_mem, - TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE, - target, &result_value, sizeof(result_value), - remote_addr); - if (ret != OMPI_SUCCESS) { - OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret); - return OMPI_ERROR; - } - if (result_value == TARGET_LOCK_UNLOCKED) { - return OMPI_SUCCESS; + if (need_acc_lock(module, target)) { + for (;;) { + ret = opal_common_ucx_wpmem_cmpswp(module->state_mem, + TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE, + target, &result_value, sizeof(result_value), + remote_addr); + if (ret != OMPI_SUCCESS) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret); + return OMPI_ERROR; + } + if (result_value == TARGET_LOCK_UNLOCKED) { + return OMPI_SUCCESS; + } + + ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); } - ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); + *lock_acquired = true; + } else { + *lock_acquired = false; } } static inline int end_atomicity( ompi_osc_ucx_module_t *module, int target, + bool lock_acquired, void *free_ptr) { - uint64_t result_value = 0; uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET; int ret = OMPI_SUCCESS; - /* fence any still active operations */ - ret = opal_common_ucx_wpmem_fence(module->mem); - if (ret != OMPI_SUCCESS) { - OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); - return OMPI_ERROR; + if (lock_acquired) { + uint64_t result_value = 0; + /* fence any still active operations */ + ret = opal_common_ucx_wpmem_fence(module->mem); + if (ret != OMPI_SUCCESS) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret); + return OMPI_ERROR; + } + + ret = opal_common_ucx_wpmem_fetch(module->state_mem, + UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, + target, &result_value, sizeof(result_value), + remote_addr); + assert(result_value == TARGET_LOCK_EXCLUSIVE); + } else if (NULL != free_ptr){ + /* flush before freeing the buffer */ + ret = opal_common_ucx_wpmem_flush(module->state_mem, OPAL_COMMON_UCX_SCOPE_EP, target); } - - ret = opal_common_ucx_wpmem_fetch(module->state_mem, - UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED, - target, &result_value, sizeof(result_value), - remote_addr); - /* TODO: encapsulate in a request and make the release non-blocking */ if (NULL != free_ptr) { free(free_ptr); @@ -286,8 +311,6 @@ static inline int end_atomicity( return OMPI_ERROR; } - assert(result_value == TARGET_LOCK_EXCLUSIVE); - return ret; } @@ -562,6 +585,7 @@ int accumulate_req(const void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; void *free_ptr = NULL; + bool lock_acquired = false; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { @@ -579,8 +603,7 @@ int accumulate_req(const void *origin_addr, int origin_count, NULL, ucx_req); } - - ret = start_atomicity(module, target); + ret = start_atomicity(module, target, &lock_acquired); if (ret != OMPI_SUCCESS) { return ret; } @@ -681,7 +704,7 @@ int accumulate_req(const void *origin_addr, int origin_count, ompi_request_complete(&ucx_req->super, true); } - return end_atomicity(module, target, free_ptr); + return end_atomicity(module, target, lock_acquired, free_ptr); } int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, @@ -701,17 +724,16 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); size_t dt_bytes; int ret = OMPI_SUCCESS; + bool lock_acquired = false; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { return ret; } - if (!module->acc_single_intrinsic) { - ret = start_atomicity(module, target); - if (ret != OMPI_SUCCESS) { - return ret; - } + ret = start_atomicity(module, target, &lock_acquired); + if (ret != OMPI_SUCCESS) { + return ret; } if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { @@ -738,7 +760,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a return ret; } - return end_atomicity(module, target, NULL); + return end_atomicity(module, target, lock_acquired, NULL); } int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, @@ -759,9 +781,10 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, uint64_t value = origin_addr ? *(uint64_t *)origin_addr : 0; ucp_atomic_fetch_op_t opcode; size_t dt_bytes; + bool lock_acquired = false; if (!module->acc_single_intrinsic) { - ret = start_atomicity(module, target); + ret = start_atomicity(module, target, &lock_acquired); if (ret != OMPI_SUCCESS) { return ret; } @@ -792,7 +815,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, return ret; } - return end_atomicity(module, target, NULL); + return end_atomicity(module, target, lock_acquired, NULL); } else { return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt, target, target_disp, 1, dt, op, win); @@ -811,6 +834,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; void *free_addr = NULL; + bool lock_acquired = false; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { @@ -824,7 +848,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, result_addr, ucx_req); } - ret = start_atomicity(module, target); + ret = start_atomicity(module, target, &lock_acquired); if (ret != OMPI_SUCCESS) { return ret; } @@ -933,7 +957,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, } - return end_atomicity(module, target, free_addr); + return end_atomicity(module, target, lock_acquired, free_addr); } int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, From d8696aa8c4b77e2edafc1ad864c8d54f5c7bd3f1 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 30 Mar 2020 21:56:39 +0200 Subject: [PATCH 12/18] UCX osc: centralize decision on whether to use AMOs Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 84 ++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index d6f42f043d..8841b8f8aa 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -264,7 +264,7 @@ static inline int start_atomicity( return OMPI_ERROR; } if (result_value == TARGET_LOCK_UNLOCKED) { - return OMPI_SUCCESS; + break; } ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker); @@ -274,6 +274,8 @@ static inline int start_atomicity( } else { *lock_acquired = false; } + + return OMPI_SUCCESS; } static inline int end_atomicity( @@ -362,16 +364,30 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module } static inline -bool use_ucx_op(struct ompi_op_t *op, struct ompi_datatype_t *origin_dt) +bool use_atomic_op( + ompi_osc_ucx_module_t *module, + struct ompi_op_t *op, + struct ompi_datatype_t *origin_dt, + struct ompi_datatype_t *target_dt, + int origin_count, + int target_count) { - if (op == &ompi_mpi_op_replace.op || - op == &ompi_mpi_op_sum.op || - op == &ompi_mpi_op_no_op.op) { - size_t dt_bytes; - ompi_datatype_type_size(origin_dt, &dt_bytes); - if (ompi_datatype_is_predefined(origin_dt) && - sizeof(uint64_t) >= dt_bytes) { + if (module->acc_single_intrinsic && + ompi_datatype_is_predefined(origin_dt) && + origin_count == 1 && + (op == &ompi_mpi_op_replace.op || + op == &ompi_mpi_op_sum.op || + op == &ompi_mpi_op_no_op.op)) { + size_t origin_dt_bytes; + size_t target_dt_bytes; + ompi_datatype_type_size(origin_dt, &origin_dt_bytes); + ompi_datatype_type_size(target_dt, &target_dt_bytes); + /* UCX only supports 32 and 64-bit operands atm */ + if (sizeof(uint64_t) >= origin_dt_bytes && + sizeof(uint32_t) <= origin_dt_bytes && + origin_dt_bytes == target_dt_bytes && + origin_count == target_count) { return true; } } @@ -384,25 +400,15 @@ static int do_atomic_op_intrinsic( struct ompi_op_t *op, int target, const void *origin_addr, - int origin_count, - struct ompi_datatype_t *origin_dt, + int count, + struct ompi_datatype_t *dt, ptrdiff_t target_disp, - int target_count, - struct ompi_datatype_t *target_dt, void *result_addr, ompi_osc_ucx_request_t *ucx_req) { int ret = OMPI_SUCCESS; size_t origin_dt_bytes; - size_t target_dt_bytes; - ompi_datatype_type_size(origin_dt, &origin_dt_bytes); - ompi_datatype_type_size(target_dt, &target_dt_bytes); - - if (sizeof(uint64_t) > origin_dt_bytes || - origin_dt_bytes != target_dt_bytes || - target_count != origin_count) { - return OMPI_ERR_NOT_SUPPORTED; - } + ompi_datatype_type_size(dt, &origin_dt_bytes); uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); @@ -430,9 +436,9 @@ static int do_atomic_op_intrinsic( if( result_addr ) { output_addr = result_addr; } - for (int i = 0; i < origin_count; ++i) { + for (int i = 0; i < count; ++i) { uint64_t value = 0; - if ((origin_count - 1) == i && NULL != ucx_req) { + if ((count - 1) == i && NULL != ucx_req) { // the last item is used to feed the request, if needed user_req_cb = &req_completion; user_req_ptr = ucx_req; @@ -596,11 +602,11 @@ int accumulate_req(const void *origin_addr, int origin_count, return ret; } - if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) { + /* rely on UCX network atomics if the user told us that it safe */ + if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) { return do_atomic_op_intrinsic(module, op, target, origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - NULL, ucx_req); + target_disp, NULL, ucx_req); } ret = start_atomicity(module, target, &lock_acquired); @@ -726,14 +732,21 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a int ret = OMPI_SUCCESS; bool lock_acquired = false; + ompi_datatype_type_size(dt, &dt_bytes); + if (sizeof(uint64_t) < dt_bytes) { + return OMPI_ERR_NOT_SUPPORTED; + } + ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { return ret; } - ret = start_atomicity(module, target, &lock_acquired); - if (ret != OMPI_SUCCESS) { - return ret; + if (!module->acc_single_intrinsic) { + ret = start_atomicity(module, target, &lock_acquired); + if (ret != OMPI_SUCCESS) { + return ret; + } } if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { @@ -743,11 +756,6 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a } } - ompi_datatype_type_size(dt, &dt_bytes); - if (sizeof(uint64_t) < dt_bytes) { - return OMPI_ERR_NOT_SUPPORTED; - } - uint64_t compare_val; memcpy(&compare_val, compare_addr, dt_bytes); memcpy(result_addr, origin_addr, dt_bytes); @@ -841,11 +849,11 @@ int get_accumulate_req(const void *origin_addr, int origin_count, return ret; } - if (module->acc_single_intrinsic && use_ucx_op(op, origin_dt)) { + /* rely on UCX network atomics if the user told us that it safe */ + if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) { return do_atomic_op_intrinsic(module, op, target, origin_addr, origin_count, origin_dt, - target_disp, target_count, target_dt, - result_addr, ucx_req); + target_disp, result_addr, ucx_req); } ret = start_atomicity(module, target, &lock_acquired); From 5f786bcce4b061d4ed8ba83200c30ffbf8fc9a42 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 30 Mar 2020 21:57:44 +0200 Subject: [PATCH 13/18] UCX osc: make MPI_Fetch_and_op non-blocking if possible Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 8841b8f8aa..3c700f1adc 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -816,8 +816,9 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, } } - ret = opal_common_ucx_wpmem_fetch(module->mem, opcode, value, target, - (void *)result_addr, dt_bytes, remote_addr); + ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, + (void *)result_addr, dt_bytes, + remote_addr, NULL, NULL); if (module->acc_single_intrinsic) { return ret; From 824afac48349c7c92122385e14a6e0d0a29d18c0 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 11:17:35 +0200 Subject: [PATCH 14/18] UCX common: add non-blocking compare-and-swap Signed-off-by: Joseph Schuchart --- opal/mca/common/ucx/common_ucx.h | 41 +++++++++++++-------- opal/mca/common/ucx/common_ucx_wpool.h | 50 ++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 5adbe190c4..2195c543f5 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -206,22 +206,33 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare, uint64_t remote_addr, ucp_rkey_h rkey, ucp_worker_h worker) { - uint64_t tmp = value; - int ret; - - ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp, - op_size, remote_addr, rkey, worker); - if (OPAL_LIKELY(OPAL_SUCCESS == ret)) { - /* in case if op_size is constant (like sizeof(type)) then this condition - * is evaluated in compile time */ - if (op_size == sizeof(uint64_t)) { - *(uint64_t*)result = tmp; - } else { - assert(op_size == sizeof(uint32_t)); - *(uint32_t*)result = tmp; - } + if (op_size == sizeof(uint64_t)) { + *(uint64_t*)result = value; + } else { + assert(op_size == sizeof(uint32_t)); + *(uint32_t*)result = value; } - return ret; + + return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, + op_size, remote_addr, rkey, worker); +} + +static inline +ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare, + uint64_t value, void *result, size_t op_size, + uint64_t remote_addr, ucp_rkey_h rkey, + ucp_send_callback_t req_handler, + ucp_worker_h worker) +{ + if (op_size == sizeof(uint64_t)) { + *(uint64_t*)result = value; + } else { + assert(op_size == sizeof(uint32_t)); + *(uint32_t*)result = value; + } + + return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, + op_size, remote_addr, rkey, req_handler, worker); } END_C_DECLS diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index 79f2163c43..95308fd56c 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -418,6 +418,56 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare, return rc; } + +static inline int +opal_common_ucx_wpmem_cmpswp_nb(opal_common_ucx_wpmem_t *mem, uint64_t compare, + uint64_t value, int target, void *buffer, size_t len, + uint64_t rem_addr, + opal_common_ucx_user_req_handler_t user_req_cb, + void *user_req_ptr) +{ + ucp_ep_h ep; + ucp_rkey_h rkey; + opal_common_ucx_winfo_t *winfo = NULL; + opal_common_ucx_request_t *req; + int rc = OPAL_SUCCESS; + + rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc); + return rc; + } + + /* Perform the operation */ + opal_mutex_lock(&winfo->mutex); + req = opal_common_ucx_atomic_cswap_nb(ep, compare, value, + buffer, len, + rem_addr, rkey, opal_common_ucx_req_completion, + winfo->worker); + + if (UCS_PTR_IS_PTR(req)) { + req->ext_req = user_req_ptr; + req->ext_cb = user_req_cb; + req->winfo = winfo; + } else { + if (user_req_cb != NULL) { + (*user_req_cb)(user_req_ptr); + } + } + + + rc = _periodical_flush_nb(mem, winfo, target); + if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){ + MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc); + return rc; + } + + opal_mutex_unlock(&winfo->mutex); + + return rc; +} + + static inline int opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode, uint64_t value, int target, size_t len, uint64_t rem_addr) From 7d5a6e3e8b930595b80dc8dcb78ee73d3a69a6aa Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 11:19:08 +0200 Subject: [PATCH 15/18] UCX osc: safely load/store 64bit integer from variable size pointer Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 18 +++++------ opal/mca/common/ucx/common_ucx.h | 52 +++++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 3c700f1adc..6bafc4ea0b 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -454,7 +454,7 @@ static int do_atomic_op_intrinsic( if (is_no_op) { value = 0; } else { - memcpy(&value, origin_addr, origin_dt_bytes); + value = opal_common_ucx_load_uint64(origin_addr, origin_dt_bytes); } ret = opal_common_ucx_wpmem_fetch_nb(module->mem, opcode, value, target, output_addr, origin_dt_bytes, remote_addr, @@ -756,13 +756,11 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a } } - uint64_t compare_val; - memcpy(&compare_val, compare_addr, dt_bytes); - memcpy(result_addr, origin_addr, dt_bytes); - ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_CSWAP, - compare_val, target, - result_addr, dt_bytes, remote_addr, - NULL, NULL); + uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes); + uint64_t value = opal_common_ucx_load_uint64(origin_addr, dt_bytes); + ret = opal_common_ucx_wpmem_cmpswp_nb(module->mem, compare_val, value, target, + result_addr, dt_bytes, remote_addr, + NULL, NULL); if (module->acc_single_intrinsic) { return ret; @@ -785,8 +783,8 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, if (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op || op == &ompi_mpi_op_sum.op) { + uint64_t value; uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); - uint64_t value = origin_addr ? *(uint64_t *)origin_addr : 0; ucp_atomic_fetch_op_t opcode; size_t dt_bytes; bool lock_acquired = false; @@ -805,7 +803,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, } } - ompi_datatype_type_size(dt, &dt_bytes); + value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0; if (op == &ompi_mpi_op_replace.op) { opcode = UCP_ATOMIC_FETCH_OP_SWAP; diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 2195c543f5..f877742a2d 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -115,6 +115,42 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs_nofence(opal_common_ucx_del_proc_t * size_t my_rank, size_t max_disconnect, ucp_worker_h worker); OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component); + +/** + * Load an integer value of \c size bytes from \c ptr and cast it to uint64_t. + */ +static inline +uint64_t opal_common_ucx_load_uint64(void *ptr, size_t size) +{ + if (sizeof(uint8_t) == size) { + return *(uint8_t*)ptr; + } else if (sizeof(uint16_t) == size) { + return *(uint16_t*)ptr; + } else if (sizeof(uint32_t) == size) { + return *(uint32_t*)ptr; + } else { + return *(uint64_t*)ptr; + } +} + +/** + * Cast and store a uint64_t value to a value of \c size bytes pointed to by \c ptr. + */ +static inline +void opal_common_ucx_store_uint64(uint64_t value, void *ptr, size_t size) +{ + if (sizeof(uint8_t) == size) { + *(uint8_t*)ptr = value; + } else if (sizeof(uint16_t) == size) { + *(uint16_t*)ptr = value; + } else if (sizeof(uint32_t) == size) { + *(uint32_t*)ptr = value; + } else { + *(uint64_t*)ptr = value; + } +} + + static inline ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request) { @@ -206,13 +242,7 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare, uint64_t remote_addr, ucp_rkey_h rkey, ucp_worker_h worker) { - if (op_size == sizeof(uint64_t)) { - *(uint64_t*)result = value; - } else { - assert(op_size == sizeof(uint32_t)); - *(uint32_t*)result = value; - } - + opal_common_ucx_store_uint64(value, result, op_size); return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, op_size, remote_addr, rkey, worker); } @@ -224,13 +254,7 @@ ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare, ucp_send_callback_t req_handler, ucp_worker_h worker) { - if (op_size == sizeof(uint64_t)) { - *(uint64_t*)result = value; - } else { - assert(op_size == sizeof(uint32_t)); - *(uint32_t*)result = value; - } - + opal_common_ucx_store_uint64(value, result, op_size); return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result, op_size, remote_addr, rkey, req_handler, worker); } From 434c9055ee9bc2d951148def8e9fd2b2dba3b6fc Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 12:03:17 +0200 Subject: [PATCH 16/18] UCX osc: fall back to get-compare-put for unsupported datatypes Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 81 +++++++++++++++++++++++++------- opal/mca/common/ucx/common_ucx.h | 2 +- 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 6bafc4ea0b..46ce011599 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -722,6 +722,36 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count, target_disp, target_count, target_dt, op, win, NULL); } +static int +do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr, + void *result_addr, struct ompi_datatype_t *dt, + int target, uint64_t remote_addr, + ompi_osc_ucx_module_t *module) +{ + int ret; + bool lock_acquired = false; + size_t dt_bytes; + if (!module->acc_single_intrinsic) { + ret = start_atomicity(module, target, &lock_acquired); + if (ret != OMPI_SUCCESS) { + return ret; + } + } + + ompi_datatype_type_size(dt, &dt_bytes); + uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes); + uint64_t value = opal_common_ucx_load_uint64(origin_addr, dt_bytes); + ret = opal_common_ucx_wpmem_cmpswp_nb(module->mem, compare_val, value, target, + result_addr, dt_bytes, remote_addr, + NULL, NULL); + + if (module->acc_single_intrinsic) { + return ret; + } + + return end_atomicity(module, target, lock_acquired, NULL); +} + int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr, void *result_addr, struct ompi_datatype_t *dt, int target, ptrdiff_t target_disp, @@ -732,23 +762,11 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a int ret = OMPI_SUCCESS; bool lock_acquired = false; - ompi_datatype_type_size(dt, &dt_bytes); - if (sizeof(uint64_t) < dt_bytes) { - return OMPI_ERR_NOT_SUPPORTED; - } - ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { return ret; } - if (!module->acc_single_intrinsic) { - ret = start_atomicity(module, target, &lock_acquired); - if (ret != OMPI_SUCCESS) { - return ret; - } - } - if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { ret = get_dynamic_win_info(remote_addr, module, target); if (ret != OMPI_SUCCESS) { @@ -756,16 +774,43 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a } } - uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes); - uint64_t value = opal_common_ucx_load_uint64(origin_addr, dt_bytes); - ret = opal_common_ucx_wpmem_cmpswp_nb(module->mem, compare_val, value, target, - result_addr, dt_bytes, remote_addr, - NULL, NULL); + ompi_datatype_type_size(dt, &dt_bytes); + if (4 == dt_bytes || 8 == dt_bytes) { + // fast path using UCX atomic operations + return do_atomic_compare_and_swap(origin_addr, compare_addr, + result_addr, dt, target, + remote_addr, module); + } - if (module->acc_single_intrinsic) { + /* fall back to get-compare-put */ + + ret = start_atomicity(module, target, &lock_acquired); + if (ret != OMPI_SUCCESS) { return ret; } + ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_GET, target, + &result_addr, dt_bytes, remote_addr); + if (OPAL_SUCCESS != ret) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_putget failed: %d", ret); + return OMPI_ERROR; + } + + ret = opal_common_ucx_wpmem_flush(module->mem, OPAL_COMMON_UCX_SCOPE_EP, target); + if (ret != OPAL_SUCCESS) { + return ret; + } + + if (0 == memcmp(result_addr, compare_addr, dt_bytes)) { + // write the new value + ret = opal_common_ucx_wpmem_putget(module->mem, OPAL_COMMON_UCX_PUT, target, + (void*)origin_addr, dt_bytes, remote_addr); + if (OPAL_SUCCESS != ret) { + OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_putget failed: %d", ret); + return OMPI_ERROR; + } + } + return end_atomicity(module, target, lock_acquired, NULL); } diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index f877742a2d..2baf6ea946 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -120,7 +120,7 @@ OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t * * Load an integer value of \c size bytes from \c ptr and cast it to uint64_t. */ static inline -uint64_t opal_common_ucx_load_uint64(void *ptr, size_t size) +uint64_t opal_common_ucx_load_uint64(const void *ptr, size_t size) { if (sizeof(uint8_t) == size) { return *(uint8_t*)ptr; From e215eff43d20818c28ccc720fbc3b38000940973 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 11:20:37 +0200 Subject: [PATCH 17/18] UCX osc: atomic fetch-and-op only on 32 and 64bit values Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index 46ce011599..c4e7bcaaa7 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -24,6 +24,11 @@ return OMPI_ERROR; \ } +/* macro to check whether UCX supports atomic operation on the size the operands */ +#define ATOMIC_SIZE_SUPPORTED(_remote_addr, _size) \ + ((sizeof(uint32_t) == (_size) && !((_remote_addr) & 0x3)) || \ + (sizeof(uint64_t) == (_size) && !((_remote_addr) & 0x7))) + typedef struct ucx_iovec { void *addr; size_t len; @@ -367,6 +372,7 @@ static inline bool use_atomic_op( ompi_osc_ucx_module_t *module, struct ompi_op_t *op, + uint64_t remote_addr, struct ompi_datatype_t *origin_dt, struct ompi_datatype_t *target_dt, int origin_count, @@ -384,9 +390,8 @@ bool use_atomic_op( ompi_datatype_type_size(origin_dt, &origin_dt_bytes); ompi_datatype_type_size(target_dt, &target_dt_bytes); /* UCX only supports 32 and 64-bit operands atm */ - if (sizeof(uint64_t) >= origin_dt_bytes && - sizeof(uint32_t) <= origin_dt_bytes && - origin_dt_bytes == target_dt_bytes && + if (ATOMIC_SIZE_SUPPORTED(remote_addr, origin_dt_bytes) && + origin_dt_bytes == target_dt_bytes && origin_count == target_count) { return true; } @@ -603,7 +608,7 @@ int accumulate_req(const void *origin_addr, int origin_count, } /* rely on UCX network atomics if the user told us that it safe */ - if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) { + if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) { return do_atomic_op_intrinsic(module, op, target, origin_addr, origin_count, origin_dt, target_disp, NULL, ucx_req); @@ -775,7 +780,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a } ompi_datatype_type_size(dt, &dt_bytes); - if (4 == dt_bytes || 8 == dt_bytes) { + if (ATOMIC_SIZE_SUPPORTED(remote_addr, dt_bytes)) { // fast path using UCX atomic operations return do_atomic_compare_and_swap(origin_addr, compare_addr, result_addr, dt, target, @@ -818,6 +823,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, struct ompi_datatype_t *dt, int target, ptrdiff_t target_disp, struct ompi_op_t *op, struct ompi_win_t *win) { + size_t dt_bytes; ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; int ret = OMPI_SUCCESS; @@ -826,12 +832,15 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, return ret; } - if (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op || - op == &ompi_mpi_op_sum.op) { + uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); + ompi_datatype_type_size(dt, &dt_bytes); + + /* UCX atomics are only supported on 32 and 64 bit values */ + if (ATOMIC_SIZE_SUPPORTED(remote_addr, dt_bytes) && + (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op || + op == &ompi_mpi_op_sum.op)) { uint64_t value; - uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target); ucp_atomic_fetch_op_t opcode; - size_t dt_bytes; bool lock_acquired = false; if (!module->acc_single_intrinsic) { @@ -894,7 +903,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count, } /* rely on UCX network atomics if the user told us that it safe */ - if (use_atomic_op(module, op, origin_dt, target_dt, origin_count, target_count)) { + if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) { return do_atomic_op_intrinsic(module, op, target, origin_addr, origin_count, origin_dt, target_disp, result_addr, ucx_req); From e3b417c776f53d423fb1d9d1f5cb89b7a96cf2e7 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 3 Apr 2020 12:22:42 +0200 Subject: [PATCH 18/18] Add missing copyright header Signed-off-by: Joseph Schuchart --- ompi/mca/osc/ucx/osc_ucx_comm.c | 4 +++- opal/mca/common/ucx/common_ucx.h | 2 ++ opal/mca/common/ucx/common_ucx_wpool.h | 11 +++++++++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index c4e7bcaaa7..86189ca0f6 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -1,5 +1,7 @@ /* - * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * Copyright (C) 2001-2017 Mellanox Technologies Ltd. ALL RIGHTS RESERVED. + * Copyright (c) 2019-2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow diff --git a/opal/mca/common/ucx/common_ucx.h b/opal/mca/common/ucx/common_ucx.h index 2baf6ea946..a6d167d35d 100644 --- a/opal/mca/common/ucx/common_ucx.h +++ b/opal/mca/common/ucx/common_ucx.h @@ -3,6 +3,8 @@ * All rights reserved. * Copyright (c) 2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2019-2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow diff --git a/opal/mca/common/ucx/common_ucx_wpool.h b/opal/mca/common/ucx/common_ucx_wpool.h index 95308fd56c..230bedc9a8 100644 --- a/opal/mca/common/ucx/common_ucx_wpool.h +++ b/opal/mca/common/ucx/common_ucx_wpool.h @@ -1,3 +1,14 @@ +/* + * Copyright (C) 2001-2017 Mellanox Technologies Ltd. ALL RIGHTS RESERVED. + * Copyright (c) 2019-2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + #ifndef COMMON_UCX_WPOOL_H #define COMMON_UCX_WPOOL_H