
MCA/COMMON/UCX: changed return type for wait_request

- for now wait_request returns OMPI status
- updated callers

Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
This commit is contained in:
Sergey Oblomov 2018-07-04 23:29:38 +03:00
parent 586c16be70
commit 8080283b3d
8 changed files with 122 additions and 212 deletions
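To make the new calling convention concrete before the per-file diffs, here is a minimal caller sketch. It is not part of the commit; example_fence_flush and its includes are hypothetical and only restate what the diffs below show: common-UCX wrappers such as opal_common_ucx_worker_flush() now return an OMPI/OPAL status, so callers compare against OMPI_SUCCESS and propagate the value instead of translating UCS_OK into OMPI_ERROR and printing verbose output themselves.

/* Hypothetical caller, for illustration only (not part of this commit). */
#include "opal/mca/common/ucx/common_ucx.h"   /* assumed header path for the wrappers */
#include "ompi/constants.h"                   /* OMPI_SUCCESS */

static int example_fence_flush(ucp_worker_h worker)
{
    int ret;

    /* the wrapper now returns an OMPI/OPAL status code instead of ucs_status_t */
    ret = opal_common_ucx_worker_flush(worker);
    if (ret != OMPI_SUCCESS) {
        return ret;   /* propagate as-is; no local UCS_OK -> OMPI_ERROR mapping */
    }
    return OMPI_SUCCESS;
}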

View file

@@ -60,7 +60,7 @@ static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *modu
int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
ucs_status_t status;
int ret;
if (module->epoch_type.access != NONE_EPOCH &&
module->epoch_type.access != FENCE_EPOCH) {
@@ -74,12 +74,9 @@ int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) {
}
if (!(assert & MPI_MODE_NOPRECEDE)) {
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
}
@@ -176,12 +173,9 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
module->epoch_type.access = NONE_EPOCH;
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
module->global_ops_num = 0;
memset(module->per_target_ops_nums, 0,
@@ -201,12 +195,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
__FILE__, __LINE__, status);
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
}
opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
}
OBJ_RELEASE(module->start_group);
@@ -232,7 +221,6 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
ompi_group_t *win_group = NULL;
int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL;
int myrank = ompi_comm_rank(module->comm);
ucs_status_t status;
size = ompi_group_size(module->post_group);
ranks_in_grp = malloc(sizeof(int) * size);
@@ -260,14 +248,9 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
uint64_t curr_idx = 0, result = 0;
/* do fop first to get an post index */
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
&result, sizeof(result),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_fadd64 failed: %d\n",
__FILE__, __LINE__, status);
}
opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
&result, sizeof(result),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1);
@@ -275,14 +258,9 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
/* do cas to send post message */
do {
status = opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result,
sizeof(result), remote_addr, rkey,
mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_cswap64 failed: %d\n",
__FILE__, __LINE__, status);
}
opal_common_ucx_atomic_cswap(ep, 0, (uint64_t)myrank + 1, &result,
sizeof(result), remote_addr, rkey,
mca_osc_ucx_component.ucp_worker);
if (result == 0)
break;

View file

@@ -61,17 +61,14 @@ static inline int check_sync_state(ompi_osc_ucx_module_t *module, int target,
static inline int incr_and_check_ops_num(ompi_osc_ucx_module_t *module, int target,
ucp_ep_h ep) {
ucs_status_t status;
int status;
module->global_ops_num++;
module->per_target_ops_nums[target]++;
if (module->global_ops_num >= OSC_UCX_OPS_THRESHOLD) {
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
if (status != OMPI_SUCCESS) {
return status;
}
module->global_ops_num -= module->per_target_ops_nums[target];
module->per_target_ops_nums[target] = 0;
@@ -309,16 +306,13 @@ static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int
uint64_t result_value = 0;
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
ucs_status_t status;
int ret;
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_swap64 failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (OMPI_SUCCESS != ret) {
return ret;
}
assert(result_value == TARGET_LOCK_EXCLUSIVE);
@@ -336,6 +330,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
uint64_t win_count;
int contain, insert = -1;
ucs_status_t status;
int ret;
if ((module->win_info_array[target]).rkey_init == true) {
ucp_rkey_destroy((module->win_info_array[target]).rkey);
@@ -350,12 +345,9 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
return OMPI_ERROR;
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
memcpy(&win_count, temp_buf, sizeof(uint64_t));
@@ -529,7 +521,6 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
uint32_t temp_count;
ompi_datatype_t *temp_dt;
ptrdiff_t temp_lb, temp_extent;
ucs_status_t status;
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
if (ompi_datatype_is_predefined(target_dt)) {
@@ -553,12 +544,9 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
return ret;
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
@@ -610,12 +598,9 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
return ret;
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
free(temp_addr_holder);
@@ -781,7 +766,6 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
uint32_t temp_count;
ompi_datatype_t *temp_dt;
ptrdiff_t temp_lb, temp_extent;
ucs_status_t status;
bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
if (ompi_datatype_is_predefined(target_dt)) {
@@ -805,12 +789,9 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
return ret;
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) {
@@ -861,12 +842,9 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
return ret;
}
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
free(temp_addr_holder);

View file

@@ -24,16 +24,14 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
ucs_status_t status;
int ret;
while (true) {
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_fadd64 failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_FADD, 1,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (OMPI_SUCCESS != ret) {
return ret;
}
assert((int64_t)result_value >= 0);
if (result_value >= TARGET_LOCK_EXCLUSIVE) {
@@ -99,16 +97,13 @@ static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET;
ucs_status_t status;
int ret;
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_atomic_swap64 failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
&result_value, sizeof(result_value),
remote_addr, rkey, mca_osc_ucx_component.ucp_worker);
if (OMPI_SUCCESS != ret) {
return ret;
}
assert(result_value >= TARGET_LOCK_EXCLUSIVE);
@@ -169,7 +164,6 @@ int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *
int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
ompi_osc_ucx_lock_t *lock = NULL;
ucs_status_t status;
int ret = OMPI_SUCCESS;
ucp_ep_h ep;
@@ -186,12 +180,9 @@ int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) {
(uint32_t)target);
ep = OSC_UCX_GET_EP(module->comm, target);
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
module->global_ops_num -= module->per_target_ops_nums[target];
@@ -252,7 +243,6 @@ int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module;
int comm_size = ompi_comm_size(module->comm);
ucs_status_t status;
int ret = OMPI_SUCCESS;
if (module->epoch_type.access != PASSIVE_ALL_EPOCH) {
@@ -261,12 +251,9 @@ int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
assert(module->lock_count == 0);
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
module->global_ops_num = 0;
@@ -309,7 +296,7 @@ int ompi_osc_ucx_sync(struct ompi_win_t *win) {
int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
ucp_ep_h ep;
ucs_status_t status;
int ret;
if (module->epoch_type.access != PASSIVE_EPOCH &&
module->epoch_type.access != PASSIVE_ALL_EPOCH) {
@@ -317,12 +304,9 @@ int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
}
ep = OSC_UCX_GET_EP(module->comm, target);
status = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_ep_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_ep_flush(ep, mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
module->global_ops_num -= module->per_target_ops_nums[target];
@@ -333,19 +317,16 @@ int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) {
int ompi_osc_ucx_flush_all(struct ompi_win_t *win) {
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
ucs_status_t status;
int ret;
if (module->epoch_type.access != PASSIVE_EPOCH &&
module->epoch_type.access != PASSIVE_ALL_EPOCH) {
return OMPI_ERR_RMA_SYNC;
}
status = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (status != UCS_OK) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ucp_worker_flush failed: %d\n",
__FILE__, __LINE__, status);
return OMPI_ERROR;
ret = opal_common_ucx_worker_flush(mca_osc_ucx_component.ucp_worker);
if (ret != OMPI_SUCCESS) {
return ret;
}
module->global_ops_num = 0;

View file

@@ -370,16 +370,11 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
static void mca_pml_ucx_waitall(void **reqs, int *count_p)
{
ucs_status_t status;
int i;
PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
for (i = 0; i < *count_p; ++i) {
status = opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker);
if (status != UCS_OK) {
PML_UCX_ERROR("disconnect request failed: %s",
ucs_status_string(status));
}
opal_common_ucx_wait_request(reqs[i], ompi_pml_ucx.ucp_worker);
reqs[i] = NULL;
}

View file

@@ -31,7 +31,7 @@ OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t
OPAL_DECLSPEC void opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
static inline
ucs_status_t opal_common_ucx_wait_request_internal(ucs_status_ptr_t request, ucp_worker_h worker)
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker)
{
ucs_status_t status;
int i;
@@ -39,6 +39,14 @@ ucs_status_t opal_common_ucx_wait_request_internal(ucs_status_ptr_t request, ucp
ucp_tag_recv_info_t info;
#endif
/* check for request completed or failed */
if (OPAL_LIKELY(UCS_OK == request)) {
return OPAL_SUCCESS;
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
/* TODO: add diagnostic here */
return OPAL_ERROR;
}
while (1) {
/* call UCX progress */
for (i = 0; i < opal_common_ucx_progress_iterations; i++) {
@@ -61,63 +69,42 @@
}
static inline
int opal_common_ucx_wait_request_opal_status(ucs_status_ptr_t request, ucp_worker_h worker)
{
/* check for request completed or failed */
if (OPAL_LIKELY(UCS_OK == request)) {
return OPAL_SUCCESS;
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
return OPAL_ERROR;
}
return opal_common_ucx_wait_request_internal(request, worker) == UCS_OK ?
OPAL_SUCCESS : OPAL_ERROR;
}
static inline
ucs_status_t opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker)
{
/* check for request completed or failed */
if (OPAL_LIKELY(UCS_OK == request)) {
return UCS_OK;
} else if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(request))) {
return UCS_PTR_STATUS(request);
}
return opal_common_ucx_wait_request_internal(request, worker);
}
static inline
ucs_status_t opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
int opal_common_ucx_ep_flush(ucp_ep_h ep, ucp_worker_h worker)
{
#if HAVE_DECL_UCP_EP_FLUSH_NB
ucs_status_ptr_t status;
ucs_status_ptr_t request;
status = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(status, worker);
request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, worker);
#else
return ucp_ep_flush(ep);
ucs_status_t status;
status = ucp_ep_flush(ep);
return (status == UCS_OK) ? OMPI_SUCCESS : OMPI_ERROR;
#endif
}
static inline
ucs_status_t opal_common_ucx_worker_flush(ucp_worker_h worker)
int opal_common_ucx_worker_flush(ucp_worker_h worker)
{
#if HAVE_DECL_UCP_WORKER_FLUSH_NB
ucs_status_ptr_t status;
ucs_status_ptr_t request;
status = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(status, worker);
request = ucp_worker_flush_nb(worker, 0, opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request(request, worker);
#else
return ucp_worker_flush(worker);
ucs_status_t status;
status = ucp_worker_flush(worker);
return (status == UCS_OK) ? OMPI_SUCCESS : OMPI_ERROR;
#endif
}
static inline
ucs_status_t opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker)
int opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker)
{
ucs_status_ptr_t request;
@@ -127,17 +114,17 @@ ucs_status_t opal_common_ucx_atomic_fetch(ucp_ep_h ep, ucp_atomic_fetch_op_t opc
}
static inline
ucs_status_t opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker)
int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_worker_h worker)
{
uint64_t tmp = value;
ucs_status_t status;
int ret;
status = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
op_size, remote_addr, rkey, worker);
if (OPAL_LIKELY(UCS_OK == status)) {
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
op_size, remote_addr, rkey, worker);
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
/* in case if op_size is constant (like sizeof(type)) then this condition
* is evaluated in compile time */
if (op_size == sizeof(uint64_t)) {
@@ -147,7 +134,7 @@ ucs_status_t opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
*(uint32_t*)result = tmp;
}
}
return status;
return ret;
}
END_C_DECLS
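A small usage sketch of the consolidated header API above (example_flush_and_wait is hypothetical and assumes HAVE_DECL_UCP_EP_FLUSH_NB; it mirrors the opal_common_ucx_ep_flush wrapper rather than adding anything new): the non-blocking flush request is handed straight to opal_common_ucx_wait_request(), which now yields OPAL_SUCCESS or OPAL_ERROR, with NULL (UCS_OK) requests and UCS_PTR_IS_ERR() pointers short-circuited before the progress loop.

/* Hypothetical helper for illustration; mirrors the wrappers defined above. */
static int example_flush_and_wait(ucp_ep_h ep, ucp_worker_h worker)
{
    ucs_status_ptr_t request;

    /* issue a non-blocking endpoint flush */
    request = ucp_ep_flush_nb(ep, 0, opal_common_ucx_empty_complete_cb);

    /* immediate completion (NULL), error pointer, or progress-until-done:
     * every path comes back as an OPAL status code */
    return opal_common_ucx_wait_request(request, worker);
}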

View file

@@ -38,10 +38,10 @@ int mca_atomic_ucx_cswap(void *target,
assert(NULL != prev);
*prev = value;
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, mca_spml_self);
status_ptr = ucp_atomic_fetch_nb(mca_spml_self->ucp_peers[pe].ucp_conn,
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request_opal_status(status_ptr, mca_spml_self->ucp_worker);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker);
}

View file

@@ -40,14 +40,12 @@ int mca_atomic_ucx_op(void *target,
ucp_atomic_post_op_t op)
{
ucs_status_t status;
ucs_status_ptr_t status_ptr;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
uint64_t val;
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, mca_spml_self);
status = ucp_atomic_post(mca_spml_self->ucp_peers[pe].ucp_conn,
op, value, size, rva,
ucx_mkey->rkey);
@@ -68,12 +66,12 @@ int mca_atomic_ucx_fop(void *target,
assert((8 == size) || (4 == size));
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, &mca_spml_ucx);
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva, mca_spml_self);
status_ptr = ucp_atomic_fetch_nb(mca_spml_self->ucp_peers[pe].ucp_conn,
op, value, prev, size,
rva, ucx_mkey->rkey,
opal_common_ucx_empty_complete_cb);
return opal_common_ucx_wait_request_opal_status(status_ptr, mca_spml_self->ucp_worker);
return opal_common_ucx_wait_request(status_ptr, mca_spml_self->ucp_worker);
}
static int mca_atomic_ucx_add(void *target,

View file

@@ -98,16 +98,11 @@ int mca_spml_ucx_enable(bool enable)
static void mca_spml_ucx_waitall(void **reqs, int *count_p)
{
ucs_status_t status;
int i;
SPML_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
for (i = 0; i < *count_p; ++i) {
status = opal_common_ucx_wait_request(reqs[i], mca_spml_ucx.ucp_worker);
if (status != UCS_OK) {
SPML_ERROR("disconnect request failed: %s",
ucs_status_string(status));
}
opal_common_ucx_wait_request(reqs[i], mca_spml_ucx.ucp_worker);
reqs[i] = NULL;
}
@@ -555,24 +550,23 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
{
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
#if HAVE_DECL_UCP_GET_NB
ucs_status_ptr_t request;
#else
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_GET_NB
request = ucp_get_nb(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
/* TODO: replace wait_request by opal_common_ucx_wait_request_opal_status */
status = opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
#else
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
return ucx_status_to_oshmem(status);
#endif
}
int mca_spml_ucx_get_nb(void *src_addr, size_t size, void *dst_addr, int src, void **handle)
@@ -591,23 +585,23 @@ int mca_spml_ucx_get_nb(void *src_addr, size_t size, void *dst_addr, int src, vo
int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
{
void *rva;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
#if HAVE_DECL_UCP_PUT_NB
ucs_status_ptr_t request;
#else
ucs_status_t status;
#endif
ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva, &mca_spml_ucx);
#if HAVE_DECL_UCP_PUT_NB
request = ucp_put_nb(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
/* TODO: replace wait_request by opal_common_ucx_wait_request_opal_status */
status = opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
return opal_common_ucx_wait_request(request, mca_spml_ucx.ucp_worker);
#else
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
#endif
return ucx_status_to_oshmem(status);
#endif
}
int mca_spml_ucx_put_nb(void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
@@ -638,13 +632,12 @@ int mca_spml_ucx_fence(void)
int mca_spml_ucx_quiet(void)
{
ucs_status_t err;
int ret;
err = opal_common_ucx_worker_flush(mca_spml_ucx.ucp_worker);
if (UCS_OK != err) {
SPML_ERROR("quiet failed: %s", ucs_status_string(err));
ret = opal_common_ucx_worker_flush(mca_spml_ucx.ucp_worker);
if (OMPI_SUCCESS != ret) {
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
return ret;
}
return OSHMEM_SUCCESS;
}