From 53bdfd1dcb09130179b0b7b08953cbc8e953ea02 Mon Sep 17 00:00:00 2001 From: Xin Zhao Date: Tue, 24 Apr 2018 11:53:58 +0300 Subject: [PATCH] OMPI/OSC/UCX: fix issue in impl of MPI_Win_create_dynamic/MPI_Win_attach/MPI_Win_detach Signed-off-by: Xin Zhao --- ompi/mca/osc/ucx/osc_ucx.h | 24 ++++ ompi/mca/osc/ucx/osc_ucx_comm.c | 123 +++++++++++++++++- ompi/mca/osc/ucx/osc_ucx_component.c | 182 +++++++++++++++++++++++---- 3 files changed, 301 insertions(+), 28 deletions(-) diff --git a/ompi/mca/osc/ucx/osc_ucx.h b/ompi/mca/osc/ucx/osc_ucx.h index 7c8f6930dd..7e93a1b74e 100644 --- a/ompi/mca/osc/ucx/osc_ucx.h +++ b/ompi/mca/osc/ucx/osc_ucx.h @@ -16,10 +16,13 @@ #include "ompi/communicator/communicator.h" #define OMPI_OSC_UCX_POST_PEER_MAX 32 +#define OMPI_OSC_UCX_ATTACH_MAX 32 +#define OMPI_OSC_UCX_RKEY_BUF_MAX 1024 typedef struct ompi_osc_ucx_win_info { ucp_rkey_h rkey; uint64_t addr; + bool rkey_init; } ompi_osc_ucx_win_info_t; typedef struct ompi_osc_ucx_component { @@ -59,6 +62,18 @@ typedef struct ompi_osc_ucx_epoch_type { #define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3) #define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4) #define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5) +#define OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET (sizeof(uint64_t) * (5 + OMPI_OSC_UCX_POST_PEER_MAX)) + +typedef struct ompi_osc_dynamic_win_info { + uint64_t base; + size_t size; + char rkey_buffer[OMPI_OSC_UCX_RKEY_BUF_MAX]; +} ompi_osc_dynamic_win_info_t; + +typedef struct ompi_osc_local_dynamic_win_info { + ucp_mem_h memh; + int refcnt; +} ompi_osc_local_dynamic_win_info_t; typedef struct ompi_osc_ucx_state { volatile uint64_t lock; @@ -67,12 +82,16 @@ typedef struct ompi_osc_ucx_state { volatile uint64_t complete_count; /* # msgs received from complete processes */ volatile uint64_t post_index; volatile uint64_t post_state[OMPI_OSC_UCX_POST_PEER_MAX]; + volatile uint64_t dynamic_win_count; + volatile ompi_osc_dynamic_win_info_t dynamic_wins[OMPI_OSC_UCX_ATTACH_MAX]; } ompi_osc_ucx_state_t; typedef struct ompi_osc_ucx_module { ompi_osc_base_module_t super; struct ompi_communicator_t *comm; ucp_mem_h memh; /* remote accessible memory */ + int flavor; + size_t size; ucp_mem_h state_memh; ompi_osc_ucx_win_info_t *win_info_array; ompi_osc_ucx_win_info_t *state_info_array; @@ -82,6 +101,7 @@ typedef struct ompi_osc_ucx_module { int *disp_units; ompi_osc_ucx_state_t state; /* remote accessible flags */ + ompi_osc_local_dynamic_win_info_t local_dynamic_win_info[OMPI_OSC_UCX_ATTACH_MAX]; ompi_osc_ucx_epoch_type_t epoch_type; ompi_group_t *start_group; ompi_group_t *post_group; @@ -184,6 +204,10 @@ int ompi_osc_ucx_flush_all(struct ompi_win_t *win); int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win); int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win); +int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins, + int min_index, int max_index, + uint64_t base, size_t len, int *insert); + void req_completion(void *request, ucs_status_t status); void internal_req_init(void *request); diff --git a/ompi/mca/osc/ucx/osc_ucx_comm.c b/ompi/mca/osc/ucx/osc_ucx_comm.c index e4a3ede8a4..22f4ce1e94 100644 --- a/ompi/mca/osc/ucx/osc_ucx_comm.c +++ b/ompi/mca/osc/ucx/osc_ucx_comm.c @@ -325,13 +325,68 @@ static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int return OMPI_SUCCESS; } +static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module, + ucp_ep_h ep, int target) { + ucp_rkey_h state_rkey = (module->state_info_array)[target].rkey; + uint64_t remote_state_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET; + size_t len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX; + char *temp_buf = malloc(len); + ompi_osc_dynamic_win_info_t *temp_dynamic_wins; + int win_count, contain, insert = -1; + ucs_status_t status; + + if ((module->win_info_array[target]).rkey_init == true) { + ucp_rkey_destroy((module->win_info_array[target]).rkey); + (module->win_info_array[target]).rkey_init == false; + } + + status = ucp_get_nbi(ep, (void *)temp_buf, len, remote_state_addr, state_rkey); + if (status != UCS_OK && status != UCS_INPROGRESS) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_get_nbi failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + status = ucp_ep_flush(ep); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_flush failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + memcpy(&win_count, temp_buf, sizeof(uint64_t)); + assert(win_count > 0 && win_count <= OMPI_OSC_UCX_ATTACH_MAX); + + temp_dynamic_wins = (ompi_osc_dynamic_win_info_t *)(temp_buf + sizeof(uint64_t)); + contain = ompi_osc_find_attached_region_position(temp_dynamic_wins, 0, win_count, + remote_addr, 1, &insert); + assert(contain >= 0 && contain < win_count); + + status = ucp_ep_rkey_unpack(ep, temp_dynamic_wins[contain].rkey_buffer, + &((module->win_info_array[target]).rkey)); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_rkey_unpack failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + (module->win_info_array[target]).rkey_init = true; + + free(temp_buf); + + return status; +} + int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt, int target, ptrdiff_t target_disp, int target_count, struct ompi_datatype_t *target_dt, struct ompi_win_t *win) { ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); - ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + ucp_rkey_h rkey; bool is_origin_contig = false, is_target_contig = false; ptrdiff_t origin_lb, origin_extent, target_lb, target_extent; ucs_status_t status; @@ -342,6 +397,15 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent); ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent); @@ -378,7 +442,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); - ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + ucp_rkey_h rkey; ptrdiff_t origin_lb, origin_extent, target_lb, target_extent; bool is_origin_contig = false, is_target_contig = false; ucs_status_t status; @@ -389,6 +453,15 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count, return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent); ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent); @@ -557,10 +630,11 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module; ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); - ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + ucp_rkey_h rkey; size_t dt_bytes; ompi_osc_ucx_internal_request_t *req = NULL; int ret = OMPI_SUCCESS; + ucs_status_t status; ret = check_sync_state(module, target, false); if (ret != OMPI_SUCCESS) { @@ -572,6 +646,15 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + ompi_datatype_type_size(dt, &dt_bytes); memcpy(result_addr, origin_addr, dt_bytes); req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, *(uint64_t *)compare_addr, @@ -604,17 +687,27 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr, op == &ompi_mpi_op_sum.op) { ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target); - ucp_rkey_h rkey = (module->win_info_array[target]).rkey; + ucp_rkey_h rkey; uint64_t value = *(uint64_t *)origin_addr; ucp_atomic_fetch_op_t opcode; size_t dt_bytes; ompi_osc_ucx_internal_request_t *req = NULL; + ucs_status_t status; ret = start_atomicity(module, ep, target); if (ret != OMPI_SUCCESS) { return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + ompi_datatype_type_size(dt, &dt_bytes); if (op == &ompi_mpi_op_replace.op) { @@ -789,7 +882,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET; - ucp_rkey_h rkey = (module->state_info_array[target]).rkey; + ucp_rkey_h rkey; ompi_osc_ucx_request_t *ucx_req = NULL; ompi_osc_ucx_internal_request_t *internal_req = NULL; ucs_status_t status; @@ -800,6 +893,15 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count, return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); if (NULL == ucx_req) { return OMPI_ERR_TEMP_OUT_OF_RESOURCE; @@ -843,7 +945,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count, ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target); uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET; - ucp_rkey_h rkey = (module->state_info_array[target]).rkey; + ucp_rkey_h rkey; ompi_osc_ucx_request_t *ucx_req = NULL; ompi_osc_ucx_internal_request_t *internal_req = NULL; ucs_status_t status; @@ -854,6 +956,15 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count, return ret; } + if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { + status = get_dynamic_win_info(remote_addr, module, ep, target); + if (status != UCS_OK) { + return OMPI_ERROR; + } + } + + rkey = (module->win_info_array[target]).rkey; + OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req); if (NULL == ucx_req) { return OMPI_ERR_TEMP_OUT_OF_RESOURCE; diff --git a/ompi/mca/osc/ucx/osc_ucx_component.c b/ompi/mca/osc/ucx/osc_ucx_component.c index e339824f0e..5eb7888aeb 100644 --- a/ompi/mca/osc/ucx/osc_ucx_component.c +++ b/ompi/mca/osc/ucx/osc_ucx_component.c @@ -243,7 +243,10 @@ static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr, ucs_status_t status; int ret = OMPI_SUCCESS; - assert(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE); + if (!(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) + || size == 0) { + return ret; + } memset(&mem_params, 0, sizeof(ucp_mem_map_params_t)); mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | @@ -312,6 +315,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in size_t my_info_len; int disps[comm_size]; int rkey_sizes[comm_size]; + uint64_t zero = 0; /* the osc/sm component is the exclusive provider for support for * shared memory windows */ @@ -376,10 +380,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in goto error; } + *model = MPI_WIN_UNIFIED; asprintf(&name, "ucx window %d", ompi_comm_get_cid(module->comm)); ompi_win_set_name(win, name); free(name); + module->flavor = flavor; + module->size = size; + /* share everyone's displacement units. Only do an allgather if strictly necessary, since it requires O(p) state. */ values[0] = disp_unit; @@ -497,14 +505,18 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in goto error; } - status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh, - &rkey_buffer, &rkey_buffer_size); - if (status != UCS_OK) { - opal_output_verbose(1, ompi_osc_base_framework.framework_output, - "%s:%d: ucp_rkey_pack failed: %d\n", - __FILE__, __LINE__, status); - ret = OMPI_ERROR; - goto error; + if (size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) { + status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh, + &rkey_buffer, &rkey_buffer_size); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_rkey_pack failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + } else { + rkey_buffer_size = 0; } status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->state_memh, @@ -524,7 +536,11 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in goto error; } - memcpy(my_info, base, sizeof(uint64_t)); + if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) { + memcpy(my_info, base, sizeof(uint64_t)); + } else { + memcpy(my_info, &zero, sizeof(uint64_t)); + } memcpy((void *)((char *)my_info + sizeof(uint64_t)), &state_base, sizeof(uint64_t)); memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t)), rkey_buffer, rkey_buffer_size); memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t) + rkey_buffer_size), @@ -550,14 +566,18 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in memcpy(&(module->state_info_array[i]).addr, &recv_buf[disps[i] + sizeof(uint64_t)], sizeof(uint64_t)); - status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]), - &((module->win_info_array[i]).rkey)); - if (status != UCS_OK) { - opal_output_verbose(1, ompi_osc_base_framework.framework_output, - "%s:%d: ucp_ep_rkey_unpack failed: %d\n", - __FILE__, __LINE__, status); - ret = OMPI_ERROR; - goto error; + (module->win_info_array[i]).rkey_init = false; + if (size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) { + status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]), + &((module->win_info_array[i]).rkey)); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_ep_rkey_unpack failed: %d\n", + __FILE__, __LINE__, status); + ret = OMPI_ERROR; + goto error; + } + (module->win_info_array[i]).rkey_init = true; } status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]), @@ -569,12 +589,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in ret = OMPI_ERROR; goto error; } + (module->state_info_array[i]).rkey_init = true; } free(my_info); free(recv_buf); - ucp_rkey_buffer_release(rkey_buffer); + if (rkey_buffer_size != 0) { + ucp_rkey_buffer_release(rkey_buffer); + } ucp_rkey_buffer_release(state_rkey_buffer); module->state.lock = TARGET_LOCK_UNLOCKED; @@ -583,6 +606,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in module->state.complete_count = 0; module->state.req_flag = 0; module->state.acc_lock = TARGET_LOCK_UNLOCKED; + module->state.dynamic_win_count = 0; + for (i = 0; i < OMPI_OSC_UCX_ATTACH_MAX; i++) { + module->local_dynamic_win_info[i].refcnt = 0; + } module->epoch_type.access = NONE_EPOCH; module->epoch_type.exposure = NONE_EPOCH; module->lock_count = 0; @@ -643,11 +670,116 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in return ret; } +int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins, + int min_index, int max_index, + uint64_t base, size_t len, int *insert) { + int mid_index = (max_index + min_index) >> 1; + + if (min_index > max_index) { + (*insert) = min_index; + return -1; + } + + if (dynamic_wins[mid_index].base > base) { + return ompi_osc_find_attached_region_position(dynamic_wins, min_index, mid_index-1, + base, len, insert); + } else if (base + len < dynamic_wins[mid_index].base + dynamic_wins[mid_index].size) { + return mid_index; + } else { + return ompi_osc_find_attached_region_position(dynamic_wins, mid_index+1, max_index, + base, len, insert); + } +} + int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) { - return OMPI_SUCCESS; + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int insert_index = -1, contain_index; + void *rkey_buffer; + size_t rkey_buffer_size; + int ret = OMPI_SUCCESS; + ucs_status_t status; + + if (module->state.dynamic_win_count >= OMPI_OSC_UCX_ATTACH_MAX) { + return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + } + + if (module->state.dynamic_win_count > 0) { + contain_index = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins, + 0, (int)module->state.dynamic_win_count, + (uint64_t)base, len, &insert_index); + if (contain_index >= 0) { + module->local_dynamic_win_info[contain_index].refcnt++; + return ret; + } + + assert(insert_index >= 0 && insert_index < module->state.dynamic_win_count); + + memmove((void *)&module->local_dynamic_win_info[insert_index+1], + (void *)&module->local_dynamic_win_info[insert_index], + (OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t)); + memmove((void *)&module->state.dynamic_wins[insert_index+1], + (void *)&module->state.dynamic_wins[insert_index], + (OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_dynamic_win_info_t)); + } else { + insert_index = 0; + } + + ret = mem_map(&base, len, &(module->local_dynamic_win_info[insert_index].memh), + module, MPI_WIN_FLAVOR_CREATE); + if (ret != OMPI_SUCCESS) { + return ret; + } + + module->state.dynamic_wins[insert_index].base = (uint64_t)base; + module->state.dynamic_wins[insert_index].size = len; + + status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, + module->local_dynamic_win_info[insert_index].memh, + &rkey_buffer, (size_t *)&rkey_buffer_size); + if (status != UCS_OK) { + opal_output_verbose(1, ompi_osc_base_framework.framework_output, + "%s:%d: ucp_rkey_pack failed: %d\n", + __FILE__, __LINE__, status); + return OMPI_ERROR; + } + + assert(rkey_buffer_size <= OMPI_OSC_UCX_RKEY_BUF_MAX); + memcpy((char *)(module->state.dynamic_wins[insert_index].rkey_buffer), + (char *)rkey_buffer, rkey_buffer_size); + + module->local_dynamic_win_info[insert_index].refcnt++; + module->state.dynamic_win_count++; + + ucp_rkey_buffer_release(rkey_buffer); + + return ret; } int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) { + ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module; + int insert, contain; + + assert(module->state.dynamic_win_count > 0); + + contain = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins, + 0, (int)module->state.dynamic_win_count, + (uint64_t)base, 1, &insert); + assert(contain >= 0 && contain < module->state.dynamic_win_count); + + module->local_dynamic_win_info[contain].refcnt--; + if (module->local_dynamic_win_info[contain].refcnt == 0) { + ucp_mem_unmap(mca_osc_ucx_component.ucp_context, + module->local_dynamic_win_info[contain].memh); + memmove((void *)&(module->local_dynamic_win_info[contain]), + (void *)&(module->local_dynamic_win_info[contain+1]), + (OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t)); + memmove((void *)&module->state.dynamic_wins[contain], + (void *)&module->state.dynamic_wins[contain+1], + (OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_dynamic_win_info_t)); + + module->state.dynamic_win_count--; + } + return OMPI_SUCCESS; } @@ -679,7 +811,10 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) { module->comm->c_coll->coll_barrier_module); for (i = 0; i < ompi_comm_size(module->comm); i++) { - ucp_rkey_destroy((module->win_info_array[i]).rkey); + if ((module->win_info_array[i]).rkey_init == true) { + ucp_rkey_destroy((module->win_info_array[i]).rkey); + (module->win_info_array[i]).rkey_init == false; + } ucp_rkey_destroy((module->state_info_array[i]).rkey); } free(module->win_info_array); @@ -687,7 +822,10 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) { free(module->per_target_ops_nums); - ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh); + if ((module->flavor == MPI_WIN_FLAVOR_ALLOCATE || module->flavor == MPI_WIN_FLAVOR_CREATE) + && module->size > 0) { + ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh); + } ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->state_memh); if (module->disp_units) free(module->disp_units);