diff --git a/ompi/mca/osc/ucx/Makefile.am b/ompi/mca/osc/ucx/Makefile.am
new file mode 100644
index 0000000000..8db7383e23
--- /dev/null
+++ b/ompi/mca/osc/ucx/Makefile.am
@@ -0,0 +1,42 @@
+#
+# Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+ucx_sources = \
+        osc_ucx.h \
+        osc_ucx_request.h \
+        osc_ucx_comm.c \
+        osc_ucx_component.c \
+        osc_ucx_request.c \
+        osc_ucx_active_target.c \
+        osc_ucx_passive_target.c
+
+AM_CPPFLAGS = $(osc_ucx_CPPFLAGS)
+
+# Make the output library in this directory, and name it either
+# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
+# (for static builds).
+
+if MCA_BUILD_ompi_osc_ucx_DSO
+component_noinst =
+component_install = mca_osc_ucx.la
+else
+component_noinst = libmca_osc_ucx.la
+component_install =
+endif
+
+mcacomponentdir = $(pkglibdir)
+mcacomponent_LTLIBRARIES = $(component_install)
+mca_osc_ucx_la_SOURCES = $(ucx_sources)
+mca_osc_ucx_la_LIBADD = $(osc_ucx_LIBS)
+mca_osc_ucx_la_LDFLAGS = -module -avoid-version $(osc_ucx_LDFLAGS)
+
+noinst_LTLIBRARIES = $(component_noinst)
+libmca_osc_ucx_la_SOURCES = $(ucx_sources)
+libmca_osc_ucx_la_LIBADD = $(osc_ucx_LIBS)
+libmca_osc_ucx_la_LDFLAGS = -module -avoid-version $(osc_ucx_LDFLAGS)
diff --git a/ompi/mca/osc/ucx/configure.m4 b/ompi/mca/osc/ucx/configure.m4
new file mode 100644
index 0000000000..72f5527d97
--- /dev/null
+++ b/ompi/mca/osc/ucx/configure.m4
@@ -0,0 +1,36 @@
+# -*- shell-script -*-
+#
+# Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# MCA_ompi_osc_ucx_POST_CONFIG(will_build)
+# ----------------------------------------
+# Only require the tag if we're actually going to be built
+AC_DEFUN([MCA_ompi_osc_ucx_POST_CONFIG], [
+    AS_IF([test "$1" = "1"], [OMPI_REQUIRE_ENDPOINT_TAG([UCX])])
+])dnl
+
+# MCA_osc_ucx_CONFIG(action-if-can-compile,
+#                    [action-if-cant-compile])
+# ------------------------------------------------
+AC_DEFUN([MCA_ompi_osc_ucx_CONFIG],[
+    AC_CONFIG_FILES([ompi/mca/osc/ucx/Makefile])
+
+    OMPI_CHECK_UCX([osc_ucx],
+                   [osc_ucx_happy="yes"],
+                   [osc_ucx_happy="no"])
+
+    AS_IF([test "$osc_ucx_happy" = "yes"],
+          [$1],
+          [$2])
+
+    # substitute in the things needed to build ucx
+    AC_SUBST([osc_ucx_CPPFLAGS])
+    AC_SUBST([osc_ucx_LDFLAGS])
+    AC_SUBST([osc_ucx_LIBS])
+])dnl
diff --git a/ompi/mca/osc/ucx/osc_ucx.h b/ompi/mca/osc/ucx/osc_ucx.h
new file mode 100644
index 0000000000..7c8f6930dd
--- /dev/null
+++ b/ompi/mca/osc/ucx/osc_ucx.h
@@ -0,0 +1,190 @@
+/*
+ * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OMPI_OSC_UCX_H
+#define OMPI_OSC_UCX_H
+
+#include <ucp/api/ucp.h> /* NOTE(review): bracketed path was lost in extraction; the UCX API header is required for ucp_* types below -- confirm against upstream */
+
+#include "ompi/group/group.h"
+#include "ompi/communicator/communicator.h"
+
+#define OMPI_OSC_UCX_POST_PEER_MAX 32
+
+/* Remote-access info for one peer: its packed rkey and base address. */
+typedef struct ompi_osc_ucx_win_info {
+    ucp_rkey_h rkey;
+    uint64_t addr;
+} ompi_osc_ucx_win_info_t;
+
+typedef struct ompi_osc_ucx_component {
+    ompi_osc_base_component_t super;
+    ucp_context_h ucp_context;
+    ucp_worker_h ucp_worker;
+    bool enable_mpi_threads;
+    opal_free_list_t requests; /* request free list for the r* communication variants */
+    int num_incomplete_req_ops;
+} ompi_osc_ucx_component_t;
+
+OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component;
+
+typedef enum ompi_osc_ucx_epoch {
+    NONE_EPOCH,
+    FENCE_EPOCH,
+    POST_WAIT_EPOCH,
+    START_COMPLETE_EPOCH,
+    PASSIVE_EPOCH,
+    PASSIVE_ALL_EPOCH
+} ompi_osc_ucx_epoch_t;
+
+/* Per-window pair of the currently open access and exposure epochs. */
+typedef struct ompi_osc_ucx_epoch_type {
+    ompi_osc_ucx_epoch_t access;
+    ompi_osc_ucx_epoch_t exposure;
+} ompi_osc_ucx_epoch_type_t;
+
+#define TARGET_LOCK_UNLOCKED ((uint64_t)(0x0000000000000000ULL))
+#define TARGET_LOCK_EXCLUSIVE ((uint64_t)(0x0000000100000000ULL))
+
+#define OSC_UCX_IOVEC_MAX 128
+#define OSC_UCX_OPS_THRESHOLD 1000000
+
+/* Byte offsets of the fields of ompi_osc_ucx_state_t, used to form remote
+ * addresses for RMA atomics; must match the struct layout below. */
+#define OSC_UCX_STATE_LOCK_OFFSET 0
+#define OSC_UCX_STATE_REQ_FLAG_OFFSET sizeof(uint64_t)
+#define OSC_UCX_STATE_ACC_LOCK_OFFSET (sizeof(uint64_t) * 2)
+#define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3)
+#define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4)
+#define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5)
+
+typedef struct ompi_osc_ucx_state {
+    volatile uint64_t lock;
+    volatile uint64_t req_flag;
+    volatile uint64_t acc_lock;
+    volatile uint64_t complete_count; /* # msgs received from complete processes */
+    volatile uint64_t post_index;
+    volatile uint64_t post_state[OMPI_OSC_UCX_POST_PEER_MAX];
+} ompi_osc_ucx_state_t;
+
+typedef struct ompi_osc_ucx_module {
+    ompi_osc_base_module_t super;
+    struct ompi_communicator_t *comm;
+    ucp_mem_h memh; /* remote accessible memory */
+    ucp_mem_h state_memh;
+    ompi_osc_ucx_win_info_t *win_info_array;
+    ompi_osc_ucx_win_info_t *state_info_array;
+    int disp_unit; /* if disp_unit >= 0, then everyone has the same
+                    * disp unit size; if disp_unit == -1, then we
+                    * need to look at disp_units */
+    int *disp_units;
+
+    ompi_osc_ucx_state_t state; /* remote accessible flags */
+    ompi_osc_ucx_epoch_type_t epoch_type;
+    ompi_group_t *start_group;
+    ompi_group_t *post_group;
+    opal_hash_table_t outstanding_locks;
+    opal_list_t pending_posts;
+    int lock_count;
+    int post_count;
+    int global_ops_num;
+    int *per_target_ops_nums;
+    uint64_t req_result;
+    int *start_grp_ranks;
+    bool lock_all_is_nocheck;
+} ompi_osc_ucx_module_t;
+
+typedef enum locktype {
+    LOCK_EXCLUSIVE,
+    LOCK_SHARED
+} lock_type_t;
+
+typedef struct ompi_osc_ucx_lock {
+    opal_object_t super;
+    int target_rank;
+    lock_type_t type;
+    bool is_nocheck;
+} ompi_osc_ucx_lock_t;
+
+#define OSC_UCX_GET_EP(comm_, rank_) (ompi_comm_peer_lookup(comm_, rank_)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX])
+#define OSC_UCX_GET_DISP(module_, rank_) ((module_->disp_unit < 0) ? \
+    module_->disp_units[rank_] : module_->disp_unit)
+
+int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len);
+int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base);
+int ompi_osc_ucx_free(struct ompi_win_t *win);
+
+int ompi_osc_ucx_put(const void *origin_addr, int origin_count,
+                     struct ompi_datatype_t *origin_dt,
+                     int target, ptrdiff_t target_disp, int target_count,
+                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win);
+int ompi_osc_ucx_get(void *origin_addr, int origin_count,
+                     struct ompi_datatype_t *origin_dt,
+                     int target, ptrdiff_t target_disp, int target_count,
+                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win);
+int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
+                            struct ompi_datatype_t *origin_dt,
+                            int target, ptrdiff_t target_disp, int target_count,
+                            struct ompi_datatype_t *target_dt,
+                            struct ompi_op_t *op, struct ompi_win_t *win);
+int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
+                                  void *result_addr, struct ompi_datatype_t *dt,
+                                  int target, ptrdiff_t target_disp,
+                                  struct ompi_win_t *win);
+int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
+                              struct ompi_datatype_t *dt, int target,
+                              ptrdiff_t target_disp, struct ompi_op_t *op,
+                              struct ompi_win_t *win);
+int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
+                                struct ompi_datatype_t *origin_datatype,
+                                void *result_addr, int result_count,
+                                struct ompi_datatype_t *result_datatype,
+                                int target_rank, ptrdiff_t target_disp,
+                                int target_count, struct ompi_datatype_t *target_datatype,
+                                struct ompi_op_t *op, struct ompi_win_t *win);
+int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
+                      struct ompi_datatype_t *origin_dt,
+                      int target, ptrdiff_t target_disp, int target_count,
+                      struct ompi_datatype_t *target_dt,
+                      struct ompi_win_t *win, struct ompi_request_t **request);
+int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
+                      struct ompi_datatype_t *origin_dt,
+                      int target, ptrdiff_t target_disp, int target_count,
+                      struct ompi_datatype_t *target_dt, struct ompi_win_t *win,
+                      struct ompi_request_t **request);
+int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
+                             struct ompi_datatype_t *origin_dt,
+                             int target, ptrdiff_t target_disp, int target_count,
+                             struct ompi_datatype_t *target_dt, struct ompi_op_t *op,
+                             struct ompi_win_t *win, struct ompi_request_t **request);
+int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
+                                 struct ompi_datatype_t *origin_datatype,
+                                 void *result_addr, int result_count,
+                                 struct ompi_datatype_t *result_datatype,
+                                 int target_rank, ptrdiff_t target_disp, int target_count,
+                                 struct ompi_datatype_t *target_datatype,
+                                 struct ompi_op_t *op, struct ompi_win_t *win,
+                                 struct ompi_request_t **request);
+
+int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win);
+int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win);
+int ompi_osc_ucx_complete(struct ompi_win_t *win);
+int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win);
+int ompi_osc_ucx_wait(struct ompi_win_t *win);
+int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag);
+
+int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win);
+int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win);
+int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win);
+int ompi_osc_ucx_unlock_all(struct ompi_win_t *win);
+int ompi_osc_ucx_sync(struct ompi_win_t *win);
+int ompi_osc_ucx_flush(int target, struct ompi_win_t *win);
+int ompi_osc_ucx_flush_all(struct ompi_win_t *win);
+int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win);
+int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
+
+void req_completion(void *request, ucs_status_t status);
+void internal_req_init(void *request);
+
+#endif /* OMPI_OSC_UCX_H */
diff --git a/ompi/mca/osc/ucx/osc_ucx_active_target.c
b/ompi/mca/osc/ucx/osc_ucx_active_target.c new file mode 100644 index 0000000000..50eebdb19f --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_active_target.c @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2010 IBM Corporation. All rights reserved. + * Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2017 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_ucx.h" + +typedef struct ompi_osc_ucx_pending_post { + opal_list_item_t super; + int rank; +} ompi_osc_ucx_pending_post_t; + +OBJ_CLASS_INSTANCE(ompi_osc_ucx_pending_post_t, opal_list_item_t, NULL, NULL); + +static inline void ompi_osc_ucx_handle_incoming_post(ompi_osc_ucx_module_t *module, volatile uint64_t *post_ptr, int ranks_in_win_grp[], int grp_size) { + int i, post_rank = (*post_ptr) - 1; + ompi_osc_ucx_pending_post_t *pending_post = NULL; + + (*post_ptr) = 0; + + for (i = 0; i < grp_size; i++) { + if (post_rank == ranks_in_win_grp[i]) { + module->post_count++; + return; + } + } + + /* post does not belong to this start epoch. 
save it for later */ + pending_post = OBJ_NEW(ompi_osc_ucx_pending_post_t); + pending_post->rank = post_rank; + opal_list_append(&module->pending_posts, &pending_post->super); +} + +int ompi_osc_ucx_fence(int assert, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucs_status_t status; + + if (module->epoch_type.access != NONE_EPOCH && + module->epoch_type.access != FENCE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + if (assert & MPI_MODE_NOSUCCEED) { + module->epoch_type.access = NONE_EPOCH; + } else { + module->epoch_type.access = FENCE_EPOCH; + } + + if (!(assert & MPI_MODE_NOPRECEDE)) { + status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + } + + module->global_ops_num = 0; + memset(module->per_target_ops_nums, 0, + sizeof(int) * ompi_comm_size(module->comm)); + + return module->comm->c_coll->coll_barrier(module->comm, + module->comm->c_coll->coll_barrier_module); +} + +int ompi_osc_ucx_start(struct ompi_group_t *group, int assert, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int i, size, *ranks_in_grp = NULL, *ranks_in_win_grp = NULL; + ompi_group_t *win_group = NULL; + int ret = OMPI_SUCCESS; + + if (module->epoch_type.access != NONE_EPOCH && + module->epoch_type.access != FENCE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + module->epoch_type.access = START_COMPLETE_EPOCH; + + OBJ_RETAIN(group); + module->start_group = group; + size = ompi_group_size(module->start_group); + + ranks_in_grp = malloc(sizeof(int) * size); + ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm)); + + for (i = 0; i < size; i++) { + ranks_in_grp[i] = i; + } + + ret = ompi_comm_group(module->comm, &win_group); + if (ret != OMPI_SUCCESS) { + return 
OMPI_ERROR; + } + + ret = ompi_group_translate_ranks(module->start_group, size, ranks_in_grp, + win_group, ranks_in_win_grp); + if (ret != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + if ((assert & MPI_MODE_NOCHECK) == 0) { + ompi_osc_ucx_pending_post_t *pending_post, *next; + + /* first look through the pending list */ + OPAL_LIST_FOREACH_SAFE(pending_post, next, &module->pending_posts, ompi_osc_ucx_pending_post_t) { + for (i = 0; i < size; i++) { + if (pending_post->rank == ranks_in_win_grp[i]) { + opal_list_remove_item(&module->pending_posts, &pending_post->super); + OBJ_RELEASE(pending_post); + module->post_count++; + break; + } + } + } + + /* waiting for the rest post requests to come */ + while (module->post_count != size) { + for (i = 0; i < OMPI_OSC_UCX_POST_PEER_MAX; i++) { + if (0 == module->state.post_state[i]) { + continue; + } + + ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[i]), ranks_in_win_grp, size); + } + ucp_worker_progress(mca_osc_ucx_component.ucp_worker); + } + + module->post_count = 0; + } + + free(ranks_in_grp); + ompi_group_free(&win_group); + + module->start_grp_ranks = ranks_in_win_grp; + + return ret; +} + +int ompi_osc_ucx_complete(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucs_status_t status; + int i, size; + int ret = OMPI_SUCCESS; + + if (module->epoch_type.access != START_COMPLETE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + module->epoch_type.access = NONE_EPOCH; + + status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + module->global_ops_num = 0; + memset(module->per_target_ops_nums, 0, + sizeof(int) * ompi_comm_size(module->comm)); + + size = ompi_group_size(module->start_group); + for (i = 0; i < size; i++) { + uint64_t remote_addr = 
(module->state_info_array)[module->start_grp_ranks[i]].addr + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; /* write to state.complete_count on remote side */ + ucp_rkey_h rkey = (module->state_info_array)[module->start_grp_ranks[i]].rkey; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, module->start_grp_ranks[i]); + + status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, 1, + sizeof(uint64_t), remote_addr, rkey); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_post failed: %d\n", + __FILE__, __LINE__, status); + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + } + } + + OBJ_RELEASE(module->start_group); + module->start_group = NULL; + free(module->start_grp_ranks); + + return ret; +} + +int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int ret = OMPI_SUCCESS; + + if (module->epoch_type.exposure != NONE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + OBJ_RETAIN(group); + module->post_group = group; + + if ((assert & MPI_MODE_NOCHECK) == 0) { + int i, j, size; + ompi_group_t *win_group = NULL; + int *ranks_in_grp = NULL, *ranks_in_win_grp = NULL; + int myrank = ompi_comm_rank(module->comm); + ucs_status_t status; + + size = ompi_group_size(module->post_group); + ranks_in_grp = malloc(sizeof(int) * size); + ranks_in_win_grp = malloc(sizeof(int) * ompi_comm_size(module->comm)); + + for (i = 0; i < size; i++) { + ranks_in_grp[i] = i; + } + + ret = ompi_comm_group(module->comm, &win_group); + if (ret != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + ret = ompi_group_translate_ranks(module->post_group, size, ranks_in_grp, + win_group, ranks_in_win_grp); + if (ret != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + for (i = 0; i < size; i++) { + uint64_t remote_addr = 
(module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_INDEX_OFFSET; /* write to state.post_index on remote side */ + ucp_rkey_h rkey = (module->state_info_array)[ranks_in_win_grp[i]].rkey; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, ranks_in_win_grp[i]); + uint64_t curr_idx = 0, result = 0; + + /* do fop first to get an post index */ + status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_fadd64 failed: %d\n", + __FILE__, __LINE__, status); + } + + curr_idx = result & (OMPI_OSC_UCX_POST_PEER_MAX - 1); + + remote_addr = (module->state_info_array)[ranks_in_win_grp[i]].addr + OSC_UCX_STATE_POST_STATE_OFFSET + sizeof(uint64_t) * curr_idx; + + /* do cas to send post message */ + do { + status = ucp_atomic_cswap64(ep, 0, (uint64_t)myrank + 1, + remote_addr, rkey, &result); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_cswap64 failed: %d\n", + __FILE__, __LINE__, status); + } + + if (result == 0) + break; + + /* prevent circular wait by checking for post messages received */ + for (j = 0; j < OMPI_OSC_UCX_POST_PEER_MAX; j++) { + /* no post at this index (yet) */ + if (0 == module->state.post_state[j]) { + continue; + } + + ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0); + } + + usleep(100); + } while (1); + } + + free(ranks_in_grp); + free(ranks_in_win_grp); + ompi_group_free(&win_group); + } + + module->epoch_type.exposure = POST_WAIT_EPOCH; + + return ret; +} + +int ompi_osc_ucx_wait(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int size; + + if (module->epoch_type.exposure != POST_WAIT_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + size = ompi_group_size(module->post_group); + + while (module->state.complete_count != (uint64_t)size) { + /* not sure if this is required 
*/ + ucp_worker_progress(mca_osc_ucx_component.ucp_worker); + } + + module->state.complete_count = 0; + + OBJ_RELEASE(module->post_group); + module->post_group = NULL; + + module->epoch_type.exposure = NONE_EPOCH; + + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_test(struct ompi_win_t *win, int *flag) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int size; + + if (module->epoch_type.exposure != POST_WAIT_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + size = ompi_group_size(module->post_group); + + opal_progress(); + + if (module->state.complete_count == (uint64_t)size) { + OBJ_RELEASE(module->post_group); + module->post_group = NULL; + + module->state.complete_count = 0; + + module->epoch_type.exposure = NONE_EPOCH; + *flag = 1; + } else { + *flag = 0; + } + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c new file mode 100644 index 0000000000..ddab2c2d5b --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -0,0 +1,938 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "ompi/mca/osc/osc.h"
+#include "ompi/mca/osc/base/base.h"
+#include "ompi/mca/osc/base/osc_base_obj_convert.h"
+
+#include "osc_ucx.h"
+#include "osc_ucx_request.h"
+
+typedef struct ucx_iovec { /* one contiguous segment of a non-contiguous datatype */
+    void *addr;
+    size_t len;
+} ucx_iovec_t;
+
+/* Verify that an RMA op toward `target` is legal in the currently open access
+ * epoch (is_req_ops selects the stricter rules for the r* request variants). */
+static inline int check_sync_state(ompi_osc_ucx_module_t *module, int target,
+                                   bool is_req_ops) {
+    if (is_req_ops == false) {
+        if (module->epoch_type.access == NONE_EPOCH) {
+            return OMPI_ERR_RMA_SYNC;
+        } else if (module->epoch_type.access == START_COMPLETE_EPOCH) {
+            int i, size = ompi_group_size(module->start_group);
+            for (i = 0; i < size; i++) {
+                if (module->start_grp_ranks[i] == target) {
+                    break;
+                }
+            }
+            if (i == size) { /* target not in the start group */
+                return OMPI_ERR_RMA_SYNC;
+            }
+        } else if (module->epoch_type.access == PASSIVE_EPOCH) {
+            ompi_osc_ucx_lock_t *item = NULL;
+            opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
+            if (item == NULL) { /* no outstanding lock held on target */
+                return OMPI_ERR_RMA_SYNC;
+            }
+        }
+    } else {
+        if (module->epoch_type.access != PASSIVE_EPOCH &&
+            module->epoch_type.access != PASSIVE_ALL_EPOCH) {
+            return OMPI_ERR_RMA_SYNC;
+        } else if (module->epoch_type.access == PASSIVE_EPOCH) {
+            ompi_osc_ucx_lock_t *item = NULL;
+            opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item);
+            if (item == NULL) {
+                return OMPI_ERR_RMA_SYNC;
+            }
+        }
+    }
+    return OMPI_SUCCESS;
+}
+
+/* Bump the per-window/per-target outstanding-op counters; flush the endpoint
+ * once the global count crosses OSC_UCX_OPS_THRESHOLD to bound resource use. */
+static inline int incr_and_check_ops_num(ompi_osc_ucx_module_t *module, int target,
+                                         ucp_ep_h ep) {
+    ucs_status_t status;
+
+    module->global_ops_num++;
+    module->per_target_ops_nums[target]++;
+    if (module->global_ops_num >= OSC_UCX_OPS_THRESHOLD) {
+        /* TODO: ucp_ep_flush needs to be replaced with its non-blocking counterpart
+         * when it is implemented in UCX */
+        status = ucp_ep_flush(ep);
+        if (status != UCS_OK) {
+            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                "%s:%d: ucp_ep_flush failed: %d\n",
+                                __FILE__, __LINE__, status);
+            return OMPI_ERROR;
+        }
+        module->global_ops_num -= module->per_target_ops_nums[target];
+        module->per_target_ops_nums[target] = 0;
+    }
+    return OMPI_SUCCESS;
+}
+
+/* Flatten (count, datatype) at base `addr` into a malloc'd array of
+ * contiguous segments via the OPAL convertor.  Caller frees *ucx_iov. */
+static inline int create_iov_list(const void *addr, int count, ompi_datatype_t *datatype,
+                                  ucx_iovec_t **ucx_iov, uint32_t *ucx_iov_count) {
+    int ret = OMPI_SUCCESS;
+    size_t size;
+    bool done = false;
+    opal_convertor_t convertor;
+    uint32_t iov_count, iov_idx;
+    struct iovec iov[OSC_UCX_IOVEC_MAX];
+    uint32_t ucx_iov_idx;
+
+    OBJ_CONSTRUCT(&convertor, opal_convertor_t);
+    ret = opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor,
+                                                   &datatype->super, count,
+                                                   addr, 0, &convertor);
+    if (ret != OMPI_SUCCESS) {
+        return ret; /* NOTE(review): convertor is constructed but not destructed on this path */
+    }
+
+    (*ucx_iov_count) = 0;
+    ucx_iov_idx = 0;
+
+    do {
+        iov_count = OSC_UCX_IOVEC_MAX;
+        iov_idx = 0;
+
+        done = opal_convertor_raw(&convertor, iov, &iov_count, &size);
+
+        (*ucx_iov_count) += iov_count;
+        (*ucx_iov) = (ucx_iovec_t *)realloc((*ucx_iov), (*ucx_iov_count) * sizeof(ucx_iovec_t)); /* NOTE(review): p = realloc(p, ...) leaks the old buffer on failure */
+        if (*ucx_iov == NULL) {
+            return OMPI_ERR_TEMP_OUT_OF_RESOURCE; /* NOTE(review): convertor not cleaned up here either */
+        }
+
+        while (iov_idx != iov_count) {
+            (*ucx_iov)[ucx_iov_idx].addr = iov[iov_idx].iov_base;
+            (*ucx_iov)[ucx_iov_idx].len = iov[iov_idx].iov_len;
+            ucx_iov_idx++;
+            iov_idx++;
+        }
+
+        assert((*ucx_iov_count) == ucx_iov_idx);
+
+    } while (!done);
+
+    opal_convertor_cleanup(&convertor);
+    OBJ_DESTRUCT(&convertor);
+
+    return ret;
+}
+
+/* Issue a put or get (is_get selects) between possibly non-contiguous origin
+ * and target datatypes by pairing up their flattened segment lists.  For a
+ * non-contiguous target the convertor is driven with a NULL base, so segment
+ * addresses come out as offsets relative to remote_addr. */
+static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
+                              const void *origin_addr, int origin_count,
+                              struct ompi_datatype_t *origin_dt,
+                              bool is_origin_contig, ptrdiff_t origin_lb,
+                              int target, ucp_ep_h ep, uint64_t remote_addr, ucp_rkey_h rkey,
+                              int target_count, struct ompi_datatype_t *target_dt,
+                              bool is_target_contig, ptrdiff_t target_lb, bool is_get) {
+    ucx_iovec_t *origin_ucx_iov = NULL, *target_ucx_iov = NULL;
+    uint32_t origin_ucx_iov_count = 0, target_ucx_iov_count = 0;
+    uint32_t origin_ucx_iov_idx = 0, target_ucx_iov_idx = 0;
+    ucs_status_t status;
+    int ret = OMPI_SUCCESS;
+
+    /* NOTE(review): every early `return OMPI_ERROR` below leaks
+     * origin_ucx_iov/target_ucx_iov; a goto-cleanup would fix this */
+    if (!is_origin_contig) {
+        ret = create_iov_list(origin_addr, origin_count, origin_dt,
+                              &origin_ucx_iov, &origin_ucx_iov_count);
+        if (ret != OMPI_SUCCESS) {
+            return ret;
+        }
+    }
+
+    if (!is_target_contig) {
+        ret = create_iov_list(NULL, target_count, target_dt,
+                              &target_ucx_iov, &target_ucx_iov_count);
+        if (ret != OMPI_SUCCESS) {
+            return ret;
+        }
+    }
+
+    if (!is_origin_contig && !is_target_contig) {
+        /* both sides fragmented: advance through both lists in lock step,
+         * transferring min(remaining origin seg, remaining target seg) */
+        size_t curr_len = 0;
+        while (origin_ucx_iov_idx < origin_ucx_iov_count) {
+            curr_len = MIN(origin_ucx_iov[origin_ucx_iov_idx].len,
+                           target_ucx_iov[target_ucx_iov_idx].len);
+
+            if (!is_get) {
+                status = ucp_put_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
+                                     remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_put_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            } else {
+                status = ucp_get_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr, curr_len,
+                                     remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_get_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            }
+
+            ret = incr_and_check_ops_num(module, target, ep);
+            if (ret != OMPI_SUCCESS) {
+                return ret;
+            }
+
+            origin_ucx_iov[origin_ucx_iov_idx].addr = (void *)((intptr_t)origin_ucx_iov[origin_ucx_iov_idx].addr + curr_len);
+            target_ucx_iov[target_ucx_iov_idx].addr = (void *)((intptr_t)target_ucx_iov[target_ucx_iov_idx].addr + curr_len);
+
+            origin_ucx_iov[origin_ucx_iov_idx].len -= curr_len;
+            if (origin_ucx_iov[origin_ucx_iov_idx].len == 0) {
+                origin_ucx_iov_idx++;
+            }
+            target_ucx_iov[target_ucx_iov_idx].len -= curr_len;
+            if (target_ucx_iov[target_ucx_iov_idx].len == 0) {
+                target_ucx_iov_idx++;
+            }
+        }
+
+        assert(origin_ucx_iov_idx == origin_ucx_iov_count &&
+               target_ucx_iov_idx == target_ucx_iov_count);
+
+    } else if (!is_origin_contig) {
+        /* fragmented origin, contiguous target: stream origin segments to
+         * consecutive remote offsets */
+        size_t prev_len = 0;
+        while (origin_ucx_iov_idx < origin_ucx_iov_count) {
+            if (!is_get) {
+                status = ucp_put_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr,
+                                     origin_ucx_iov[origin_ucx_iov_idx].len,
+                                     remote_addr + target_lb + prev_len, rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_put_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            } else {
+                status = ucp_get_nbi(ep, origin_ucx_iov[origin_ucx_iov_idx].addr,
+                                     origin_ucx_iov[origin_ucx_iov_idx].len,
+                                     remote_addr + target_lb + prev_len, rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_get_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            }
+
+            ret = incr_and_check_ops_num(module, target, ep);
+            if (ret != OMPI_SUCCESS) {
+                return ret;
+            }
+
+            prev_len += origin_ucx_iov[origin_ucx_iov_idx].len;
+            origin_ucx_iov_idx++;
+        }
+    } else {
+        /* contiguous origin, fragmented target: stream consecutive local
+         * chunks to each remote target segment */
+        size_t prev_len = 0;
+        while (target_ucx_iov_idx < target_ucx_iov_count) {
+            if (!is_get) {
+                status = ucp_put_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb + prev_len),
+                                     target_ucx_iov[target_ucx_iov_idx].len,
+                                     remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_put_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            } else {
+                status = ucp_get_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb + prev_len),
+                                     target_ucx_iov[target_ucx_iov_idx].len,
+                                     remote_addr + (uint64_t)(target_ucx_iov[target_ucx_iov_idx].addr), rkey);
+                if (status != UCS_OK && status != UCS_INPROGRESS) {
+                    opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                        "%s:%d: ucp_get_nbi failed: %d\n",
+                                        __FILE__, __LINE__, status);
+                    return OMPI_ERROR;
+                }
+            }
+
+            ret = incr_and_check_ops_num(module, target, ep);
+            if (ret != OMPI_SUCCESS) {
+                return ret;
+            }
+
+            prev_len += target_ucx_iov[target_ucx_iov_idx].len;
+            target_ucx_iov_idx++;
+        }
+    }
+
+    if (origin_ucx_iov != NULL) {
+        free(origin_ucx_iov);
+    }
+    if (target_ucx_iov != NULL) {
+        free(target_ucx_iov);
+    }
+
+    return ret;
+}
+
+/* Acquire the remote per-target accumulate lock by CAS-spinning
+ * acc_lock UNLOCKED -> EXCLUSIVE. */
+static inline int start_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int target) {
+    uint64_t result_value = -1;
+    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
+    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
+    ucs_status_t status;
+
+    while (result_value != TARGET_LOCK_UNLOCKED) {
+        /* NOTE(review): spins on blocking CAS without ucp_worker_progress() or
+         * backoff between attempts -- confirm this cannot livelock */
+        status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED,
+                                    TARGET_LOCK_EXCLUSIVE,
+                                    remote_addr, rkey, &result_value);
+        if (status != UCS_OK) {
+            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                "%s:%d: ucp_atomic_cswap64 failed: %d\n",
+                                __FILE__, __LINE__, status);
+            return OMPI_ERROR;
+        }
+    }
+
+    return OMPI_SUCCESS;
+}
+
+/* Release the remote accumulate lock by swapping UNLOCKED back in; the old
+ * value must have been EXCLUSIVE (we held it). */
+static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int target) {
+    uint64_t result_value = 0;
+    ucp_rkey_h rkey = (module->state_info_array)[target].rkey;
+    uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_ACC_LOCK_OFFSET;
+    ucs_status_t status;
+
+    status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED,
+                               remote_addr, rkey, &result_value);
+    if (status != UCS_OK) {
+        opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                            "%s:%d: ucp_atomic_swap64 failed: %d\n",
+                            __FILE__, __LINE__, status);
+        return OMPI_ERROR;
+    }
+
+    assert(result_value == TARGET_LOCK_EXCLUSIVE);
+
+    return OMPI_SUCCESS;
+}
+
+/* MPI_Put: contiguous-to-contiguous goes straight to ucp_put_nbi; anything
+ * else is delegated to ddt_put_get. */
+int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
+                     int target, ptrdiff_t target_disp, int target_count,
+                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
+    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
+    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
+    uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
+    ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
+    bool is_origin_contig = false, is_target_contig = false;
+    ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
+    ucs_status_t status;
+    int ret = OMPI_SUCCESS;
+
+    ret = check_sync_state(module, target, false);
+    if (ret != OMPI_SUCCESS) {
+        return ret;
+    }
+
+    ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
+    ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
+
+    is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
+    is_target_contig = ompi_datatype_is_contiguous_memory_layout(target_dt, target_count);
+
+    if (is_origin_contig && is_target_contig) {
+        /* fast path */
+        size_t origin_len;
+
+        ompi_datatype_type_size(origin_dt, &origin_len);
+        origin_len *= origin_count;
+
+        status = ucp_put_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
+                             remote_addr + target_lb, rkey);
+        if (status != UCS_OK && status != UCS_INPROGRESS) {
+            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                "%s:%d: ucp_put_nbi failed: %d\n",
+                                __FILE__, __LINE__, status);
+            return OMPI_ERROR;
+        }
+        return incr_and_check_ops_num(module, target, ep);
+    } else {
+        return ddt_put_get(module, origin_addr, origin_count, origin_dt, is_origin_contig,
+                           origin_lb, target, ep, remote_addr, rkey, target_count, target_dt,
+                           is_target_contig, target_lb, false);
+    }
+}
+
+/* MPI_Get: mirror of ompi_osc_ucx_put using ucp_get_nbi. */
+int ompi_osc_ucx_get(void *origin_addr, int origin_count,
+                     struct ompi_datatype_t *origin_dt,
+                     int target, ptrdiff_t target_disp, int target_count,
+                     struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
+    ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
+    ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
+    uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
+    ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
+    ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
+    bool is_origin_contig = false, is_target_contig = false;
+    ucs_status_t status;
+    int ret = OMPI_SUCCESS;
+
+    ret = check_sync_state(module, target, false);
+    if (ret != OMPI_SUCCESS) {
+        return ret;
+    }
+
+    ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
+    ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
+
+    is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count);
+    is_target_contig = ompi_datatype_is_contiguous_memory_layout(target_dt, target_count);
+
+    if (is_origin_contig && is_target_contig) {
+        /* fast path */
+        size_t origin_len;
+
+        ompi_datatype_type_size(origin_dt, &origin_len);
+        origin_len *= origin_count;
+
+        status = ucp_get_nbi(ep, (void *)((intptr_t)origin_addr + origin_lb), origin_len,
+                             remote_addr + target_lb, rkey);
+        if (status != UCS_OK && status != UCS_INPROGRESS) {
+            opal_output_verbose(1, ompi_osc_base_framework.framework_output,
+                                "%s:%d: ucp_get_nbi failed: %d\n",
+                                __FILE__, __LINE__, status);
+            return OMPI_ERROR;
+        }
+
+        return incr_and_check_ops_num(module, target, ep);
+    } else {
+        return ddt_put_get(module, origin_addr, origin_count, origin_dt, is_origin_contig,
+                           origin_lb, target, ep, remote_addr, rkey, target_count, target_dt,
+                           is_target_contig, target_lb, true);
+    }
+}
+
+int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
+                            struct ompi_datatype_t *origin_dt,
+                            int target, ptrdiff_t target_disp, int target_count,
+                            struct ompi_datatype_t *target_dt,
+                            struct ompi_op_t *op,
struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, false); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if (op == &ompi_mpi_op_no_op.op) { + return ret; + } + + ret = start_atomicity(module, ep, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, + target_disp, target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + } else { + void *temp_addr = NULL; + uint32_t temp_count; + ompi_datatype_t *temp_dt; + ptrdiff_t temp_lb, temp_extent; + ucs_status_t status; + bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count); + + if (ompi_datatype_is_predefined(target_dt)) { + temp_dt = target_dt; + temp_count = target_count; + } else { + ret = ompi_osc_base_get_primitive_type_info(target_dt, &temp_dt, &temp_count); + if (ret != OMPI_SUCCESS) { + return ret; + } + } + ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); + temp_addr = malloc(temp_extent * temp_count); + if (temp_addr == NULL) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = ompi_osc_ucx_get(temp_addr, (int)temp_count, temp_dt, + target, target_disp, target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) { + ompi_op_reduce(op, (void *)origin_addr, temp_addr, (int)temp_count, temp_dt); + } else { + ucx_iovec_t *origin_ucx_iov = NULL; + uint32_t origin_ucx_iov_count = 0; + uint32_t origin_ucx_iov_idx = 0; + + ret = create_iov_list(origin_addr, 
origin_count, origin_dt, + &origin_ucx_iov, &origin_ucx_iov_count); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if ((op != &ompi_mpi_op_maxloc.op && op != &ompi_mpi_op_minloc.op) || + ompi_datatype_is_contiguous_memory_layout(temp_dt, temp_count)) { + size_t temp_size; + ompi_datatype_type_size(temp_dt, &temp_size); + while (origin_ucx_iov_idx < origin_ucx_iov_count) { + int curr_count = origin_ucx_iov[origin_ucx_iov_idx].len / temp_size; + ompi_op_reduce(op, origin_ucx_iov[origin_ucx_iov_idx].addr, + temp_addr, curr_count, temp_dt); + temp_addr = (void *)((char *)temp_addr + curr_count * temp_size); + origin_ucx_iov_idx++; + } + } else { + int i; + void *curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr; + for (i = 0; i < (int)temp_count; i++) { + ompi_op_reduce(op, curr_origin_addr, + (void *)((char *)temp_addr + i * temp_extent), + 1, temp_dt); + curr_origin_addr = (void *)((char *)curr_origin_addr + temp_extent); + origin_ucx_iov_idx++; + if (curr_origin_addr >= (void *)((char *)origin_ucx_iov[origin_ucx_iov_idx].addr + origin_ucx_iov[origin_ucx_iov_idx].len)) { + origin_ucx_iov_idx++; + curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr; + } + } + } + + free(origin_ucx_iov); + } + + ret = ompi_osc_ucx_put(temp_addr, (int)temp_count, temp_dt, target, target_disp, + target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + free(temp_addr); + } + + ret = end_atomicity(module, ep, target); + + return ret; +} + +int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr, + void *result_addr, struct ompi_datatype_t *dt, + int target, ptrdiff_t target_disp, + struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; + ucp_ep_h ep = 
OSC_UCX_GET_EP(module->comm, target); + uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); + ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + size_t dt_bytes; + ompi_osc_ucx_internal_request_t *req = NULL; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, false); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ompi_datatype_type_size(dt, &dt_bytes); + memcpy(result_addr, origin_addr, dt_bytes); + req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, *(uint64_t *)compare_addr, + result_addr, dt_bytes, remote_addr, rkey, req_completion); + if (UCS_PTR_IS_PTR(req)) { + ucp_request_release(req); + } + + return incr_and_check_ops_num(module, target, ep); +} + +int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, + struct ompi_datatype_t *dt, int target, + ptrdiff_t target_disp, struct ompi_op_t *op, + struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, false); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if (op == &ompi_mpi_op_no_op.op || op == &ompi_mpi_op_replace.op || + op == &ompi_mpi_op_sum.op) { + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); + ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + uint64_t value = *(uint64_t *)origin_addr; + ucp_atomic_fetch_op_t opcode; + size_t dt_bytes; + ompi_osc_ucx_internal_request_t *req = NULL; + + ompi_datatype_type_size(dt, &dt_bytes); + + if (op == &ompi_mpi_op_replace.op) { + opcode = UCP_ATOMIC_FETCH_OP_SWAP; + } else { + opcode = UCP_ATOMIC_FETCH_OP_FADD; + if (op == &ompi_mpi_op_no_op.op) { + value = 0; + } + } + + req = ucp_atomic_fetch_nb(ep, opcode, value, result_addr, + dt_bytes, remote_addr, rkey, req_completion); + if (UCS_PTR_IS_PTR(req)) { + 
ucp_request_release(req); + } + + return incr_and_check_ops_num(module, target, ep); + } else { + return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt, + target, target_disp, 1, dt, op, win); + } +} + +int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + void *result_addr, int result_count, + struct ompi_datatype_t *result_dt, + int target, ptrdiff_t target_disp, + int target_count, struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, false); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ret = start_atomicity(module, ep, target); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ret = ompi_osc_ucx_get(result_addr, result_count, result_dt, target, + target_disp, target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if (op != &ompi_mpi_op_no_op.op) { + if (op == &ompi_mpi_op_replace.op) { + ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, + target, target_disp, target_count, + target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + } else { + void *temp_addr = NULL; + uint32_t temp_count; + ompi_datatype_t *temp_dt; + ptrdiff_t temp_lb, temp_extent; + ucs_status_t status; + bool is_origin_contig = ompi_datatype_is_contiguous_memory_layout(origin_dt, origin_count); + + if (ompi_datatype_is_predefined(target_dt)) { + temp_dt = target_dt; + temp_count = target_count; + } else { + ret = ompi_osc_base_get_primitive_type_info(target_dt, &temp_dt, &temp_count); + if (ret != OMPI_SUCCESS) { + return ret; + } + } + ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent); + temp_addr = malloc(temp_extent * temp_count); + if (temp_addr == NULL) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = 
ompi_osc_ucx_get(temp_addr, (int)temp_count, temp_dt, + target, target_disp, target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + if (ompi_datatype_is_predefined(origin_dt) || is_origin_contig) { + ompi_op_reduce(op, (void *)origin_addr, temp_addr, (int)temp_count, temp_dt); + } else { + ucx_iovec_t *origin_ucx_iov = NULL; + uint32_t origin_ucx_iov_count = 0; + uint32_t origin_ucx_iov_idx = 0; + + ret = create_iov_list(origin_addr, origin_count, origin_dt, + &origin_ucx_iov, &origin_ucx_iov_count); + if (ret != OMPI_SUCCESS) { + return ret; + } + + if ((op != &ompi_mpi_op_maxloc.op && op != &ompi_mpi_op_minloc.op) || + ompi_datatype_is_contiguous_memory_layout(temp_dt, temp_count)) { + size_t temp_size; + ompi_datatype_type_size(temp_dt, &temp_size); + while (origin_ucx_iov_idx < origin_ucx_iov_count) { + int curr_count = origin_ucx_iov[origin_ucx_iov_idx].len / temp_size; + ompi_op_reduce(op, origin_ucx_iov[origin_ucx_iov_idx].addr, + temp_addr, curr_count, temp_dt); + temp_addr = (void *)((char *)temp_addr + curr_count * temp_size); + origin_ucx_iov_idx++; + } + } else { + int i; + void *curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr; + for (i = 0; i < (int)temp_count; i++) { + ompi_op_reduce(op, curr_origin_addr, + (void *)((char *)temp_addr + i * temp_extent), + 1, temp_dt); + curr_origin_addr = (void *)((char *)curr_origin_addr + temp_extent); + origin_ucx_iov_idx++; + if (curr_origin_addr >= (void *)((char *)origin_ucx_iov[origin_ucx_iov_idx].addr + origin_ucx_iov[origin_ucx_iov_idx].len)) { + origin_ucx_iov_idx++; + curr_origin_addr = origin_ucx_iov[origin_ucx_iov_idx].addr; + } + } + } + free(origin_ucx_iov); + } + + ret = ompi_osc_ucx_put(temp_addr, (int)temp_count, temp_dt, target, target_disp, + 
target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + free(temp_addr); + } + } + + ret = end_atomicity(module, ep, target); + + return ret; +} + +int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win, struct ompi_request_t **request) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET; + ucp_rkey_h rkey = (module->state_info_array[target]).rkey; + ompi_osc_ucx_request_t *ucx_req = NULL; + ompi_osc_ucx_internal_request_t *internal_req = NULL; + ucs_status_t status; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, true); + if (ret != OMPI_SUCCESS) { + return ret; + } + + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); + if (NULL == ucx_req) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, target_disp, + target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_fence failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + internal_req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_FADD, 0, + &(module->req_result), sizeof(uint64_t), + remote_addr, rkey, req_completion); + + if (UCS_PTR_IS_PTR(internal_req)) { + internal_req->external_req = ucx_req; + 
mca_osc_ucx_component.num_incomplete_req_ops++; + } else { + ompi_request_complete(&ucx_req->super, true); + } + + *request = &ucx_req->super; + + return incr_and_check_ops_num(module, target, ep); +} + +int ompi_osc_ucx_rget(void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_dt, struct ompi_win_t *win, + struct ompi_request_t **request) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET; + ucp_rkey_h rkey = (module->state_info_array[target]).rkey; + ompi_osc_ucx_request_t *ucx_req = NULL; + ompi_osc_ucx_internal_request_t *internal_req = NULL; + ucs_status_t status; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, true); + if (ret != OMPI_SUCCESS) { + return ret; + } + + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); + if (NULL == ucx_req) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = ompi_osc_ucx_get(origin_addr, origin_count, origin_dt, target, target_disp, + target_count, target_dt, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_fence failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + internal_req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_FADD, 0, + &(module->req_result), sizeof(uint64_t), + remote_addr, rkey, req_completion); + + if (UCS_PTR_IS_PTR(internal_req)) { + internal_req->external_req = ucx_req; + mca_osc_ucx_component.num_incomplete_req_ops++; + } else { + ompi_request_complete(&ucx_req->super, true); + } + + *request = &ucx_req->super; + + return incr_and_check_ops_num(module, target, ep); +} + +int ompi_osc_ucx_raccumulate(const 
void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_dt, struct ompi_op_t *op, + struct ompi_win_t *win, struct ompi_request_t **request) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ompi_osc_ucx_request_t *ucx_req = NULL; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, true); + if (ret != OMPI_SUCCESS) { + return ret; + } + + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); + if (NULL == ucx_req) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp, + target_count, target_dt, op, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ompi_request_complete(&ucx_req->super, true); + *request = &ucx_req->super; + + return ret; +} + +int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_datatype, + void *result_addr, int result_count, + struct ompi_datatype_t *result_datatype, + int target, ptrdiff_t target_disp, int target_count, + struct ompi_datatype_t *target_datatype, + struct ompi_op_t *op, struct ompi_win_t *win, + struct ompi_request_t **request) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ompi_osc_ucx_request_t *ucx_req = NULL; + int ret = OMPI_SUCCESS; + + ret = check_sync_state(module, target, true); + if (ret != OMPI_SUCCESS) { + return ret; + } + + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); + if (NULL == ucx_req) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + ret = ompi_osc_ucx_get_accumulate(origin_addr, origin_count, origin_datatype, + result_addr, result_count, result_datatype, + target, target_disp, target_count, + target_datatype, op, win); + if (ret != OMPI_SUCCESS) { + return ret; + } + + ompi_request_complete(&ucx_req->super, true); + + *request = &ucx_req->super; + + return ret; +} diff --git 
a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c new file mode 100644 index 0000000000..e339824f0e --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_component.c @@ -0,0 +1,699 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_ucx.h" +#include "osc_ucx_request.h" + +static int component_open(void); +static int component_register(void); +static int component_init(bool enable_progress_threads, bool enable_mpi_threads); +static int component_finalize(void); +static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor); +static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor, int *model); + +ompi_osc_ucx_component_t mca_osc_ucx_component = { + { /* ompi_osc_base_component_t */ + .osc_version = { + OMPI_OSC_BASE_VERSION_3_0_0, + .mca_component_name = "ucx", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + .mca_open_component = component_open, + .mca_register_component_params = component_register, + }, + .osc_data = { + /* The component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + .osc_init = component_init, + .osc_query = component_query, + .osc_select = component_select, + .osc_finalize = component_finalize, + } +}; + +ompi_osc_ucx_module_t ompi_osc_ucx_module_template = { + { + .osc_win_attach = ompi_osc_ucx_win_attach, + .osc_win_detach = ompi_osc_ucx_win_detach, + .osc_free = ompi_osc_ucx_free, + + .osc_put = ompi_osc_ucx_put, + .osc_get = ompi_osc_ucx_get, + .osc_accumulate = 
ompi_osc_ucx_accumulate, + .osc_compare_and_swap = ompi_osc_ucx_compare_and_swap, + .osc_fetch_and_op = ompi_osc_ucx_fetch_and_op, + .osc_get_accumulate = ompi_osc_ucx_get_accumulate, + + .osc_rput = ompi_osc_ucx_rput, + .osc_rget = ompi_osc_ucx_rget, + .osc_raccumulate = ompi_osc_ucx_raccumulate, + .osc_rget_accumulate = ompi_osc_ucx_rget_accumulate, + + .osc_fence = ompi_osc_ucx_fence, + + .osc_start = ompi_osc_ucx_start, + .osc_complete = ompi_osc_ucx_complete, + .osc_post = ompi_osc_ucx_post, + .osc_wait = ompi_osc_ucx_wait, + .osc_test = ompi_osc_ucx_test, + + .osc_lock = ompi_osc_ucx_lock, + .osc_unlock = ompi_osc_ucx_unlock, + .osc_lock_all = ompi_osc_ucx_lock_all, + .osc_unlock_all = ompi_osc_ucx_unlock_all, + + .osc_sync = ompi_osc_ucx_sync, + .osc_flush = ompi_osc_ucx_flush, + .osc_flush_all = ompi_osc_ucx_flush_all, + .osc_flush_local = ompi_osc_ucx_flush_local, + .osc_flush_local_all = ompi_osc_ucx_flush_local_all, + } +}; + +static int component_open(void) { + return OMPI_SUCCESS; +} + +static int component_register(void) { + return OMPI_SUCCESS; +} + +static int progress_callback(void) { + if (mca_osc_ucx_component.ucp_worker != NULL && + mca_osc_ucx_component.num_incomplete_req_ops > 0) { + ucp_worker_progress(mca_osc_ucx_component.ucp_worker); + } + return 0; +} + +static int component_init(bool enable_progress_threads, bool enable_mpi_threads) { + ucp_config_t *config = NULL; + ucp_params_t context_params; + bool progress_registered = false, requests_created = false; + int ret = OMPI_SUCCESS; + ucs_status_t status; + + mca_osc_ucx_component.ucp_context = NULL; + mca_osc_ucx_component.ucp_worker = NULL; + mca_osc_ucx_component.enable_mpi_threads = enable_mpi_threads; + + status = ucp_config_read("MPI", NULL, &config); + if (UCS_OK != status) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_config_read failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + 
OBJ_CONSTRUCT(&mca_osc_ucx_component.requests, opal_free_list_t); + requests_created = true; + ret = opal_free_list_init (&mca_osc_ucx_component.requests, + sizeof(ompi_osc_ucx_request_t), + opal_cache_line_size, + OBJ_CLASS(ompi_osc_ucx_request_t), + 0, 0, 8, 0, 8, NULL, 0, NULL, NULL, NULL); + if (OMPI_SUCCESS != ret) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: opal_free_list_init failed: %d\n", + __FILE__, __LINE__, ret); + goto error; + } + + mca_osc_ucx_component.num_incomplete_req_ops = 0; + + ret = opal_progress_register(progress_callback); + progress_registered = true; + if (OMPI_SUCCESS != ret) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: opal_progress_register failed: %d\n", + __FILE__, __LINE__, ret); + goto error; + } + + /* initialize UCP context */ + + memset(&context_params, 0, sizeof(ucp_context_h)); + context_params.field_mask = UCP_PARAM_FIELD_FEATURES | + UCP_PARAM_FIELD_MT_WORKERS_SHARED | + UCP_PARAM_FIELD_ESTIMATED_NUM_EPS | + UCP_PARAM_FIELD_REQUEST_INIT | + UCP_PARAM_FIELD_REQUEST_SIZE; + context_params.features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64; + context_params.mt_workers_shared = 0; + context_params.estimated_num_eps = ompi_proc_world_size(); + context_params.request_init = internal_req_init; + context_params.request_size = sizeof(ompi_osc_ucx_internal_request_t); + + status = ucp_init(&context_params, config, &mca_osc_ucx_component.ucp_context); + ucp_config_release(config); + if (UCS_OK != status) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_init failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + return ret; + error: + if (progress_registered) opal_progress_unregister(progress_callback); + if (requests_created) OBJ_DESTRUCT(&mca_osc_ucx_component.requests); + if (mca_osc_ucx_component.ucp_context) ucp_cleanup(mca_osc_ucx_component.ucp_context); + return ret; +} + +static 
int component_finalize(void) { + int i; + for (i = 0; i < ompi_proc_world_size(); i++) { + ucp_ep_h ep = OSC_UCX_GET_EP(&(ompi_mpi_comm_world.comm), i); + if (ep != NULL) { + ucp_ep_destroy(ep); + } + } + + if (mca_osc_ucx_component.ucp_worker != NULL) { + ucp_worker_destroy(mca_osc_ucx_component.ucp_worker); + } + + assert(mca_osc_ucx_component.num_incomplete_req_ops == 0); + OBJ_DESTRUCT(&mca_osc_ucx_component.requests); + opal_progress_unregister(progress_callback); + ucp_cleanup(mca_osc_ucx_component.ucp_context); + return OMPI_SUCCESS; +} + +static int component_query(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, int flavor) { + if (MPI_WIN_FLAVOR_SHARED == flavor) return -1; + return 100; +} + +static inline int allgather_len_and_info(void *my_info, int my_info_len, char **recv_info, + int *disps, struct ompi_communicator_t *comm) { + int ret = OMPI_SUCCESS; + int comm_size = ompi_comm_size(comm); + int lens[comm_size]; + int total_len, i; + + ret = comm->c_coll->coll_allgather(&my_info_len, 1, MPI_INT, + lens, 1, MPI_INT, comm, + comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) { + return ret; + } + + total_len = 0; + for (i = 0; i < comm_size; i++) { + disps[i] = total_len; + total_len += lens[i]; + } + + (*recv_info) = (char *)malloc(total_len); + + ret = comm->c_coll->coll_allgatherv(my_info, my_info_len, MPI_BYTE, + (void *)(*recv_info), lens, disps, MPI_BYTE, + comm, comm->c_coll->coll_allgatherv_module); + if (OMPI_SUCCESS != ret) { + return ret; + } + + return ret; +} + +static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr, + ompi_osc_ucx_module_t *module, int flavor) { + ucp_mem_map_params_t mem_params; + ucp_mem_attr_t mem_attrs; + ucs_status_t status; + int ret = OMPI_SUCCESS; + + assert(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE); + + memset(&mem_params, 0, sizeof(ucp_mem_map_params_t)); + 
mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | + UCP_MEM_MAP_PARAM_FIELD_LENGTH | + UCP_MEM_MAP_PARAM_FIELD_FLAGS; + mem_params.length = size; + if (flavor == MPI_WIN_FLAVOR_ALLOCATE) { + mem_params.address = NULL; + mem_params.flags = UCP_MEM_MAP_ALLOCATE; + } else { + mem_params.address = (*base); + } + + /* memory map */ + + status = ucp_mem_map(mca_osc_ucx_component.ucp_context, &mem_params, memh_ptr); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_mem_map failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + mem_attrs.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS | UCP_MEM_ATTR_FIELD_LENGTH; + status = ucp_mem_query((*memh_ptr), &mem_attrs); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_mem_query failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + assert(mem_attrs.length >= size); + if (flavor == MPI_WIN_FLAVOR_CREATE) { + assert(mem_attrs.address == (*base)); + } else { + (*base) = mem_attrs.address; + } + + return ret; + error: + ucp_mem_unmap(mca_osc_ucx_component.ucp_context, (*memh_ptr)); + return ret; +} + +static int component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit, + struct ompi_communicator_t *comm, struct opal_info_t *info, + int flavor, int *model) { + ompi_osc_ucx_module_t *module = NULL; + char *name = NULL; + long values[2]; + int ret = OMPI_SUCCESS; + ucs_status_t status; + int i, comm_size = ompi_comm_size(comm); + int is_eps_ready; + bool eps_created = false, worker_created = false; + ucp_address_t *my_addr = NULL; + size_t my_addr_len; + char *recv_buf = NULL; + void *rkey_buffer = NULL, *state_rkey_buffer = NULL; + size_t rkey_buffer_size, state_rkey_buffer_size; + void *state_base = NULL; + void * my_info = NULL; + size_t my_info_len; + int disps[comm_size]; + int rkey_sizes[comm_size]; + + /* the osc/sm component is 
the exclusive provider for support for + * shared memory windows */ + if (flavor == MPI_WIN_FLAVOR_SHARED) { + return OMPI_ERR_NOT_SUPPORTED; + } + + /* if UCP worker has never been initialized before, init it first */ + if (mca_osc_ucx_component.ucp_worker == NULL) { + ucp_worker_params_t worker_params; + ucp_worker_attr_t worker_attr; + + memset(&worker_params, 0, sizeof(ucp_worker_h)); + worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; + worker_params.thread_mode = (mca_osc_ucx_component.enable_mpi_threads == true) + ? UCS_THREAD_MODE_MULTI : UCS_THREAD_MODE_SINGLE; + status = ucp_worker_create(mca_osc_ucx_component.ucp_context, &worker_params, + &(mca_osc_ucx_component.ucp_worker)); + if (UCS_OK != status) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_create failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + /* query UCP worker attributes */ + worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE; + status = ucp_worker_query(mca_osc_ucx_component.ucp_worker, &worker_attr); + if (UCS_OK != status) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_query failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + if (mca_osc_ucx_component.enable_mpi_threads == true && + worker_attr.thread_mode != UCS_THREAD_MODE_MULTI) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucx does not support multithreading\n", + __FILE__, __LINE__); + ret = OMPI_ERROR; + goto error; + } + + worker_created = true; + } + + /* create module structure */ + module = (ompi_osc_ucx_module_t *)calloc(1, sizeof(ompi_osc_ucx_module_t)); + if (module == NULL) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + /* fill in the function pointer part */ + memcpy(module, &ompi_osc_ucx_module_template, sizeof(ompi_osc_base_module_t)); + + ret = ompi_comm_dup(comm, &module->comm); + if (ret != 
OMPI_SUCCESS) { + goto error; + } + + asprintf(&name, "ucx window %d", ompi_comm_get_cid(module->comm)); + ompi_win_set_name(win, name); + free(name); + + /* share everyone's displacement units. Only do an allgather if + strictly necessary, since it requires O(p) state. */ + values[0] = disp_unit; + values[1] = -disp_unit; + + ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, values, 2, MPI_LONG, + MPI_MIN, module->comm, + module->comm->c_coll->coll_allreduce_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + + if (values[0] == -values[1]) { /* everyone has the same disp_unit, we do not need O(p) space */ + module->disp_unit = disp_unit; + } else { /* different disp_unit sizes, allocate O(p) space to store them */ + module->disp_unit = -1; + module->disp_units = calloc(comm_size, sizeof(int)); + if (module->disp_units == NULL) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + ret = module->comm->c_coll->coll_allgather(&disp_unit, 1, MPI_INT, + module->disp_units, 1, MPI_INT, + module->comm, + module->comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + } + + /* exchange endpoints if necessary */ + is_eps_ready = 1; + for (i = 0; i < comm_size; i++) { + if (OSC_UCX_GET_EP(module->comm, i) == NULL) { + is_eps_ready = 0; + break; + } + } + + ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, &is_eps_ready, 1, MPI_INT, + MPI_LAND, + module->comm, + module->comm->c_coll->coll_allreduce_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + + if (!is_eps_ready) { + status = ucp_worker_get_address(mca_osc_ucx_component.ucp_worker, + &my_addr, &my_addr_len); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_get_address failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + ret = allgather_len_and_info(my_addr, (int)my_addr_len, + &recv_buf, disps, module->comm); + if (ret != OMPI_SUCCESS) { + goto error; + } 
+ + for (i = 0; i < comm_size; i++) { + if (OSC_UCX_GET_EP(module->comm, i) == NULL) { + ucp_ep_params_t ep_params; + ucp_ep_h ep; + memset(&ep_params, 0, sizeof(ucp_ep_params_t)); + ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS; + ep_params.address = (ucp_address_t *)&(recv_buf[disps[i]]); + status = ucp_ep_create(mca_osc_ucx_component.ucp_worker, &ep_params, &ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_create failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + ompi_comm_peer_lookup(module->comm, i)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX] = ep; + } + } + + ucp_worker_release_address(mca_osc_ucx_component.ucp_worker, my_addr); + my_addr = NULL; + free(recv_buf); + recv_buf = NULL; + + eps_created = true; + } + + ret = mem_map(base, size, &(module->memh), module, flavor); + if (ret != OMPI_SUCCESS) { + goto error; + } + + state_base = (void *)&(module->state); + ret = mem_map(&state_base, sizeof(ompi_osc_ucx_state_t), &(module->state_memh), + module, MPI_WIN_FLAVOR_CREATE); + if (ret != OMPI_SUCCESS) { + goto error; + } + + module->win_info_array = calloc(comm_size, sizeof(ompi_osc_ucx_win_info_t)); + if (module->win_info_array == NULL) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + module->state_info_array = calloc(comm_size, sizeof(ompi_osc_ucx_win_info_t)); + if (module->state_info_array == NULL) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh, + &rkey_buffer, &rkey_buffer_size); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_rkey_pack failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->state_memh, + &state_rkey_buffer, &state_rkey_buffer_size); + if (status != UCS_OK) { + 
opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_rkey_pack failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + my_info_len = 2 * sizeof(uint64_t) + rkey_buffer_size + state_rkey_buffer_size; + my_info = malloc(my_info_len); + if (my_info == NULL) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto error; + } + + memcpy(my_info, base, sizeof(uint64_t)); + memcpy((void *)((char *)my_info + sizeof(uint64_t)), &state_base, sizeof(uint64_t)); + memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t)), rkey_buffer, rkey_buffer_size); + memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t) + rkey_buffer_size), + state_rkey_buffer, state_rkey_buffer_size); + + ret = allgather_len_and_info(my_info, (int)my_info_len, &recv_buf, disps, module->comm); + if (ret != OMPI_SUCCESS) { + goto error; + } + + ret = comm->c_coll->coll_allgather((void *)&rkey_buffer_size, 1, MPI_INT, + rkey_sizes, 1, MPI_INT, comm, + comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) { + goto error; + } + + for (i = 0; i < comm_size; i++) { + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, i); + assert(ep != NULL); + + memcpy(&(module->win_info_array[i]).addr, &recv_buf[disps[i]], sizeof(uint64_t)); + memcpy(&(module->state_info_array[i]).addr, &recv_buf[disps[i] + sizeof(uint64_t)], + sizeof(uint64_t)); + + status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]), + &((module->win_info_array[i]).rkey)); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_rkey_unpack failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + + status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]), + &((module->state_info_array[i]).rkey)); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_rkey_unpack failed: %d\n", + __FILE__, __LINE__, status); + ret 
= OMPI_ERROR; + goto error; + } + } + + free(my_info); + free(recv_buf); + + ucp_rkey_buffer_release(rkey_buffer); + ucp_rkey_buffer_release(state_rkey_buffer); + + module->state.lock = TARGET_LOCK_UNLOCKED; + module->state.post_index = 0; + memset((void *)module->state.post_state, 0, sizeof(uint64_t) * OMPI_OSC_UCX_POST_PEER_MAX); + module->state.complete_count = 0; + module->state.req_flag = 0; + module->state.acc_lock = TARGET_LOCK_UNLOCKED; + module->epoch_type.access = NONE_EPOCH; + module->epoch_type.exposure = NONE_EPOCH; + module->lock_count = 0; + module->post_count = 0; + module->start_group = NULL; + module->post_group = NULL; + OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t); + OBJ_CONSTRUCT(&module->pending_posts, opal_list_t); + module->global_ops_num = 0; + module->per_target_ops_nums = calloc(comm_size, sizeof(int)); + module->start_grp_ranks = NULL; + module->lock_all_is_nocheck = false; + + ret = opal_hash_table_init(&module->outstanding_locks, comm_size); + if (ret != OPAL_SUCCESS) { + goto error; + } + + win->w_osc_module = &module->super; + + /* sync with everyone */ + + ret = module->comm->c_coll->coll_barrier(module->comm, + module->comm->c_coll->coll_barrier_module); + if (ret != OMPI_SUCCESS) { + goto error; + } + + return ret; + + error: + if (my_addr) ucp_worker_release_address(mca_osc_ucx_component.ucp_worker, my_addr); + if (recv_buf) free(recv_buf); + if (my_info) free(my_info); + for (i = 0; i < comm_size; i++) { + if ((module->win_info_array[i]).rkey != NULL) { + ucp_rkey_destroy((module->win_info_array[i]).rkey); + } + if ((module->state_info_array[i]).rkey != NULL) { + ucp_rkey_destroy((module->state_info_array[i]).rkey); + } + } + if (rkey_buffer) ucp_rkey_buffer_release(rkey_buffer); + if (state_rkey_buffer) ucp_rkey_buffer_release(state_rkey_buffer); + if (module->win_info_array) free(module->win_info_array); + if (module->state_info_array) free(module->state_info_array); + if (module->disp_units) 
free(module->disp_units); + if (module->comm) ompi_comm_free(&module->comm); + if (module->per_target_ops_nums) free(module->per_target_ops_nums); + if (eps_created) { + for (i = 0; i < comm_size; i++) { + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, i); + ucp_ep_destroy(ep); + } + } + if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker); + if (module) free(module); + return ret; +} + +int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) { + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) { + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_free(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int i, ret = OMPI_SUCCESS; + + if ((module->epoch_type.access != NONE_EPOCH && module->epoch_type.access != FENCE_EPOCH) + || module->epoch_type.exposure != NONE_EPOCH) { + ret = OMPI_ERR_RMA_SYNC; + } + + if (module->start_group != NULL || module->post_group != NULL) { + ret = OMPI_ERR_RMA_SYNC; + } + + assert(module->global_ops_num == 0); + assert(module->lock_count == 0); + assert(opal_list_is_empty(&module->pending_posts) == true); + OBJ_DESTRUCT(&module->outstanding_locks); + OBJ_DESTRUCT(&module->pending_posts); + + while (module->state.lock != TARGET_LOCK_UNLOCKED) { + /* not sure if this is required */ + ucp_worker_progress(mca_osc_ucx_component.ucp_worker); + } + + ret = module->comm->c_coll->coll_barrier(module->comm, + module->comm->c_coll->coll_barrier_module); + + for (i = 0; i < ompi_comm_size(module->comm); i++) { + ucp_rkey_destroy((module->win_info_array[i]).rkey); + ucp_rkey_destroy((module->state_info_array[i]).rkey); + } + free(module->win_info_array); + free(module->state_info_array); + + free(module->per_target_ops_nums); + + ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh); + ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->state_memh); + + if (module->disp_units) free(module->disp_units); + 
ompi_comm_free(&module->comm); + + free(module); + + return ret; +} diff --git a/ompi/mca/osc/ucx/osc_ucx_passive_target.c b/ompi/mca/osc/ucx/osc_ucx_passive_target.c new file mode 100644 index 0000000000..9f2fe98b63 --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_passive_target.c @@ -0,0 +1,365 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_ucx.h" + +OBJ_CLASS_INSTANCE(ompi_osc_ucx_lock_t, opal_object_t, NULL, NULL); + +static inline int start_shared(ompi_osc_ucx_module_t *module, int target) { + uint64_t result_value = -1; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + ucp_rkey_h rkey = (module->state_info_array)[target].rkey; + uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; + ucs_status_t status; + + while (true) { + status = ucp_atomic_fadd64(ep, 1, remote_addr, rkey, &result_value); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_fadd64 failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + assert(result_value >= 0); + if (result_value >= TARGET_LOCK_EXCLUSIVE) { + status = ucp_atomic_add64(ep, (-1), remote_addr, rkey); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_add64 failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + } else { + break; + } + } + + return OMPI_SUCCESS; +} + +static inline int end_shared(ompi_osc_ucx_module_t *module, int target) { + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + ucp_rkey_h rkey = (module->state_info_array)[target].rkey; + uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; + 
ucs_status_t status; + + status = ucp_atomic_add64(ep, (-1), remote_addr, rkey); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_add64 failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) { + uint64_t result_value = -1; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + ucp_rkey_h rkey = (module->state_info_array)[target].rkey; + uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; + ucs_status_t status; + + while (result_value != TARGET_LOCK_UNLOCKED) { + status = ucp_atomic_cswap64(ep, TARGET_LOCK_UNLOCKED, + TARGET_LOCK_EXCLUSIVE, + remote_addr, rkey, &result_value); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_cswap64 failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + } + + return OMPI_SUCCESS; +} + +static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) { + uint64_t result_value = 0; + ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); + ucp_rkey_h rkey = (module->state_info_array)[target].rkey; + uint64_t remote_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_LOCK_OFFSET; + ucs_status_t status; + + status = ucp_atomic_swap64(ep, TARGET_LOCK_UNLOCKED, + remote_addr, rkey, &result_value); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_atomic_swap64 failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + assert(result_value >= TARGET_LOCK_EXCLUSIVE); + + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_lock(int lock_type, int target, int assert, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; + ompi_osc_ucx_lock_t *lock = NULL; + ompi_osc_ucx_epoch_t original_epoch = 
module->epoch_type.access; + int ret = OMPI_SUCCESS; + + if (module->lock_count == 0) { + if (module->epoch_type.access != NONE_EPOCH && + module->epoch_type.access != FENCE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + } else { + ompi_osc_ucx_lock_t *item = NULL; + assert(module->epoch_type.access == PASSIVE_EPOCH); + opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &item); + if (item != NULL) { + return OMPI_ERR_RMA_SYNC; + } + } + + module->epoch_type.access = PASSIVE_EPOCH; + module->lock_count++; + assert(module->lock_count <= ompi_comm_size(module->comm)); + + lock = OBJ_NEW(ompi_osc_ucx_lock_t); + lock->target_rank = target; + + if ((assert & MPI_MODE_NOCHECK) == 0) { + lock->is_nocheck = false; + if (lock_type == MPI_LOCK_EXCLUSIVE) { + ret = start_exclusive(module, target); + lock->type = LOCK_EXCLUSIVE; + } else { + ret = start_shared(module, target); + lock->type = LOCK_SHARED; + } + } else { + lock->is_nocheck = true; + } + + if (ret == OMPI_SUCCESS) { + opal_hash_table_set_value_uint32(&module->outstanding_locks, (uint32_t)target, (void *)lock); + } else { + OBJ_RELEASE(lock); + module->epoch_type.access = original_epoch; + } + + return ret; +} + +int ompi_osc_ucx_unlock(int target, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; + ompi_osc_ucx_lock_t *lock = NULL; + ucs_status_t status; + int ret = OMPI_SUCCESS; + ucp_ep_h ep; + + if (module->epoch_type.access != PASSIVE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + opal_hash_table_get_value_uint32(&module->outstanding_locks, (uint32_t) target, (void **) &lock); + if (lock == NULL) { + return OMPI_ERR_RMA_SYNC; + } + + opal_hash_table_remove_value_uint32(&module->outstanding_locks, + (uint32_t)target); + + ep = OSC_UCX_GET_EP(module->comm, target); + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + 
__FILE__, __LINE__, status); + return OMPI_ERROR; + } + + module->global_ops_num -= module->per_target_ops_nums[target]; + module->per_target_ops_nums[target] = 0; + + if (lock->is_nocheck == false) { + if (lock->type == LOCK_EXCLUSIVE) { + ret = end_exclusive(module, target); + } else { + ret = end_shared(module, target); + } + } + + OBJ_RELEASE(lock); + + module->lock_count--; + assert(module->lock_count >= 0); + if (module->lock_count == 0) { + module->epoch_type.access = NONE_EPOCH; + assert(module->global_ops_num == 0); + } + + return ret; +} + +int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int ret = OMPI_SUCCESS; + + if (module->epoch_type.access != NONE_EPOCH && + module->epoch_type.access != FENCE_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + module->epoch_type.access = PASSIVE_ALL_EPOCH; + + if (0 == (assert & MPI_MODE_NOCHECK)) { + int i, comm_size; + module->lock_all_is_nocheck = false; + comm_size = ompi_comm_size(module->comm); + for (i = 0; i < comm_size; i++) { + ret = start_shared(module, i); + if (ret != OMPI_SUCCESS) { + int j; + for (j = 0; j < i; j++) { + end_shared(module, j); + } + return ret; + } + } + } else { + module->lock_all_is_nocheck = true; + } + + if (ret != OMPI_SUCCESS) { + module->epoch_type.access = NONE_EPOCH; + } + + return ret; +} + +int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*)win->w_osc_module; + int comm_size = ompi_comm_size(module->comm); + ucs_status_t status; + int ret = OMPI_SUCCESS; + + if (module->epoch_type.access != PASSIVE_ALL_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + assert(module->lock_count == 0); + + status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + 
} + + module->global_ops_num = 0; + memset(module->per_target_ops_nums, 0, sizeof(int) * comm_size); + + if (!module->lock_all_is_nocheck) { + int i; + for (i = 0; i < comm_size; i++) { + ret |= end_shared(module, i); + } + } + + module->epoch_type.access = NONE_EPOCH; + + return ret; +} + +int ompi_osc_ucx_sync(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; + ucs_status_t status; + + if (module->epoch_type.access != PASSIVE_EPOCH && + module->epoch_type.access != PASSIVE_ALL_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + opal_atomic_mb(); + + status = ucp_worker_fence(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_fence failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_flush(int target, struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + ucp_ep_h ep; + ucs_status_t status; + + if (module->epoch_type.access != PASSIVE_EPOCH && + module->epoch_type.access != PASSIVE_ALL_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + ep = OSC_UCX_GET_EP(module->comm, target); + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + module->global_ops_num -= module->per_target_ops_nums[target]; + module->per_target_ops_nums[target] = 0; + + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_flush_all(struct ompi_win_t *win) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; + ucs_status_t status; + + if (module->epoch_type.access != PASSIVE_EPOCH && + module->epoch_type.access != PASSIVE_ALL_EPOCH) { + return OMPI_ERR_RMA_SYNC; + } + + status = ucp_worker_flush(mca_osc_ucx_component.ucp_worker); + if (status != UCS_OK) { + 
opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_worker_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + module->global_ops_num = 0; + memset(module->per_target_ops_nums, 0, + sizeof(int) * ompi_comm_size(module->comm)); + + return OMPI_SUCCESS; +} + +int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win) { + /* TODO: currently equals to ompi_osc_ucx_flush, should find a way + * to implement local completion */ + return ompi_osc_ucx_flush(target, win); +} + +int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win) { + /* TODO: currently equals to ompi_osc_ucx_flush_all, should find a way + * to implement local completion */ + return ompi_osc_ucx_flush_all(win); +} diff --git a/ompi/mca/osc/ucx/osc_ucx_request.c b/ompi/mca/osc/ucx/osc_ucx_request.c new file mode 100644 index 0000000000..efbd9c38cc --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_request.c @@ -0,0 +1,65 @@ +/* + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/request/request.h" +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/osc/base/base.h" +#include "ompi/mca/osc/base/osc_base_obj_convert.h" + +#include "osc_ucx.h" +#include "osc_ucx_request.h" + +static int request_cancel(struct ompi_request_t *request, int complete) +{ + return MPI_ERR_REQUEST; +} + +static int request_free(struct ompi_request_t **ompi_req) +{ + ompi_osc_ucx_request_t *request = (ompi_osc_ucx_request_t*) *ompi_req; + + if (true != (bool)(request->super.req_complete)) { + return MPI_ERR_REQUEST; + } + + OMPI_OSC_UCX_REQUEST_RETURN(request); + + *ompi_req = MPI_REQUEST_NULL; + + return OMPI_SUCCESS; +} + +static void request_construct(ompi_osc_ucx_request_t *request) +{ + request->super.req_type = OMPI_REQUEST_WIN; + request->super.req_status._cancelled = 0; + request->super.req_free = request_free; + request->super.req_cancel = request_cancel; +} + +void internal_req_init(void *request) { + ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request; + req->external_req = NULL; +} + +void req_completion(void *request, ucs_status_t status) { + ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request; + + if(req->external_req != NULL) { + ompi_request_complete(&(req->external_req->super), true); + ucp_request_release(req); + mca_osc_ucx_component.num_incomplete_req_ops--; + assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0); + } +} + +OBJ_CLASS_INSTANCE(ompi_osc_ucx_request_t, ompi_request_t, + request_construct, NULL); diff --git a/ompi/mca/osc/ucx/osc_ucx_request.h b/ompi/mca/osc/ucx/osc_ucx_request.h new file mode 100644 index 0000000000..b33bc54c2d --- /dev/null +++ b/ompi/mca/osc/ucx/osc_ucx_request.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2011-2013 Sandia National Laboratories. All rights reserved. 
+ * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_OSC_UCX_REQUEST_H +#define OMPI_OSC_UCX_REQUEST_H + +#include "ompi/request/request.h" + +typedef struct ompi_osc_ucx_request { + ompi_request_t super; +} ompi_osc_ucx_request_t; + +OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t); + +typedef struct ompi_osc_ucx_internal_request { + ompi_osc_ucx_request_t *external_req; +} ompi_osc_ucx_internal_request_t; + +#define OMPI_OSC_UCX_REQUEST_ALLOC(win, req) \ + do { \ + opal_free_list_item_t *item; \ + do { \ + item = opal_free_list_get(&mca_osc_ucx_component.requests); \ + if (item == NULL) { \ + if (mca_osc_ucx_component.ucp_worker != NULL && \ + mca_osc_ucx_component.num_incomplete_req_ops > 0) { \ + ucp_worker_progress(mca_osc_ucx_component.ucp_worker); \ + } \ + } \ + } while (item == NULL); \ + req = (ompi_osc_ucx_request_t*) item; \ + OMPI_REQUEST_INIT(&req->super, false); \ + req->super.req_mpi_object.win = win; \ + req->super.req_complete = false; \ + req->super.req_state = OMPI_REQUEST_ACTIVE; \ + req->super.req_status.MPI_ERROR = MPI_SUCCESS; \ + } while (0) + +#define OMPI_OSC_UCX_REQUEST_RETURN(req) \ + do { \ + OMPI_REQUEST_FINI(&request->super); \ + opal_free_list_return (&mca_osc_ucx_component.requests, \ + (opal_free_list_item_t*) req); \ + } while (0) + +#endif /* OMPI_OSC_UCX_REQUEST_H */