Merge pull request #5094 from xinzhao3/topic/osc-win-fix-master
OMPI/OSC/UCX: fix issue in impl of MPI_Win_create_dynamic/MPI_Win_attach/MPI_Win_detach
Этот коммит содержится в:
Коммит
32ddc6af7e
@ -16,10 +16,13 @@
|
||||
#include "ompi/communicator/communicator.h"
|
||||
|
||||
#define OMPI_OSC_UCX_POST_PEER_MAX 32
|
||||
#define OMPI_OSC_UCX_ATTACH_MAX 32
|
||||
#define OMPI_OSC_UCX_RKEY_BUF_MAX 1024
|
||||
|
||||
typedef struct ompi_osc_ucx_win_info {
|
||||
ucp_rkey_h rkey;
|
||||
uint64_t addr;
|
||||
bool rkey_init;
|
||||
} ompi_osc_ucx_win_info_t;
|
||||
|
||||
typedef struct ompi_osc_ucx_component {
|
||||
@ -60,6 +63,18 @@ typedef struct ompi_osc_ucx_epoch_type {
|
||||
#define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3)
|
||||
#define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4)
|
||||
#define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5)
|
||||
#define OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET (sizeof(uint64_t) * (5 + OMPI_OSC_UCX_POST_PEER_MAX))
|
||||
|
||||
typedef struct ompi_osc_dynamic_win_info {
|
||||
uint64_t base;
|
||||
size_t size;
|
||||
char rkey_buffer[OMPI_OSC_UCX_RKEY_BUF_MAX];
|
||||
} ompi_osc_dynamic_win_info_t;
|
||||
|
||||
typedef struct ompi_osc_local_dynamic_win_info {
|
||||
ucp_mem_h memh;
|
||||
int refcnt;
|
||||
} ompi_osc_local_dynamic_win_info_t;
|
||||
|
||||
typedef struct ompi_osc_ucx_state {
|
||||
volatile uint64_t lock;
|
||||
@ -68,12 +83,16 @@ typedef struct ompi_osc_ucx_state {
|
||||
volatile uint64_t complete_count; /* # msgs received from complete processes */
|
||||
volatile uint64_t post_index;
|
||||
volatile uint64_t post_state[OMPI_OSC_UCX_POST_PEER_MAX];
|
||||
volatile uint64_t dynamic_win_count;
|
||||
volatile ompi_osc_dynamic_win_info_t dynamic_wins[OMPI_OSC_UCX_ATTACH_MAX];
|
||||
} ompi_osc_ucx_state_t;
|
||||
|
||||
typedef struct ompi_osc_ucx_module {
|
||||
ompi_osc_base_module_t super;
|
||||
struct ompi_communicator_t *comm;
|
||||
ucp_mem_h memh; /* remote accessible memory */
|
||||
int flavor;
|
||||
size_t size;
|
||||
ucp_mem_h state_memh;
|
||||
ompi_osc_ucx_win_info_t *win_info_array;
|
||||
ompi_osc_ucx_win_info_t *state_info_array;
|
||||
@ -83,6 +102,7 @@ typedef struct ompi_osc_ucx_module {
|
||||
int *disp_units;
|
||||
|
||||
ompi_osc_ucx_state_t state; /* remote accessible flags */
|
||||
ompi_osc_local_dynamic_win_info_t local_dynamic_win_info[OMPI_OSC_UCX_ATTACH_MAX];
|
||||
ompi_osc_ucx_epoch_type_t epoch_type;
|
||||
ompi_group_t *start_group;
|
||||
ompi_group_t *post_group;
|
||||
@ -185,6 +205,10 @@ int ompi_osc_ucx_flush_all(struct ompi_win_t *win);
|
||||
int ompi_osc_ucx_flush_local(int target, struct ompi_win_t *win);
|
||||
int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
|
||||
|
||||
int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
|
||||
int min_index, int max_index,
|
||||
uint64_t base, size_t len, int *insert);
|
||||
|
||||
void req_completion(void *request, ucs_status_t status);
|
||||
void internal_req_init(void *request);
|
||||
|
||||
|
@ -325,13 +325,68 @@ static inline int end_atomicity(ompi_osc_ucx_module_t *module, ucp_ep_h ep, int
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
|
||||
ucp_ep_h ep, int target) {
|
||||
ucp_rkey_h state_rkey = (module->state_info_array)[target].rkey;
|
||||
uint64_t remote_state_addr = (module->state_info_array)[target].addr + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
|
||||
size_t len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX;
|
||||
char *temp_buf = malloc(len);
|
||||
ompi_osc_dynamic_win_info_t *temp_dynamic_wins;
|
||||
int win_count, contain, insert = -1;
|
||||
ucs_status_t status;
|
||||
|
||||
if ((module->win_info_array[target]).rkey_init == true) {
|
||||
ucp_rkey_destroy((module->win_info_array[target]).rkey);
|
||||
(module->win_info_array[target]).rkey_init == false;
|
||||
}
|
||||
|
||||
status = ucp_get_nbi(ep, (void *)temp_buf, len, remote_state_addr, state_rkey);
|
||||
if (status != UCS_OK && status != UCS_INPROGRESS) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_get_nbi failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
status = ucp_ep_flush(ep);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_flush failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
memcpy(&win_count, temp_buf, sizeof(uint64_t));
|
||||
assert(win_count > 0 && win_count <= OMPI_OSC_UCX_ATTACH_MAX);
|
||||
|
||||
temp_dynamic_wins = (ompi_osc_dynamic_win_info_t *)(temp_buf + sizeof(uint64_t));
|
||||
contain = ompi_osc_find_attached_region_position(temp_dynamic_wins, 0, win_count,
|
||||
remote_addr, 1, &insert);
|
||||
assert(contain >= 0 && contain < win_count);
|
||||
|
||||
status = ucp_ep_rkey_unpack(ep, temp_dynamic_wins[contain].rkey_buffer,
|
||||
&((module->win_info_array[target]).rkey));
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
(module->win_info_array[target]).rkey_init = true;
|
||||
|
||||
free(temp_buf);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
|
||||
int target, ptrdiff_t target_disp, int target_count,
|
||||
struct ompi_datatype_t *target_dt, struct ompi_win_t *win) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
bool is_origin_contig = false, is_target_contig = false;
|
||||
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
|
||||
ucs_status_t status;
|
||||
@ -342,6 +397,15 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
|
||||
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
|
||||
|
||||
@ -378,7 +442,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
|
||||
bool is_origin_contig = false, is_target_contig = false;
|
||||
ucs_status_t status;
|
||||
@ -389,6 +453,15 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
|
||||
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);
|
||||
|
||||
@ -557,10 +630,11 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
size_t dt_bytes;
|
||||
ompi_osc_ucx_internal_request_t *req = NULL;
|
||||
int ret = OMPI_SUCCESS;
|
||||
ucs_status_t status;
|
||||
|
||||
ret = check_sync_state(module, target, false);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
@ -572,6 +646,15 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
memcpy(result_addr, origin_addr, dt_bytes);
|
||||
req = ucp_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, *(uint64_t *)compare_addr,
|
||||
@ -604,17 +687,27 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
|
||||
op == &ompi_mpi_op_sum.op) {
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->win_info_array[target]).addr + target_disp * OSC_UCX_GET_DISP(module, target);
|
||||
ucp_rkey_h rkey = (module->win_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
uint64_t value = *(uint64_t *)origin_addr;
|
||||
ucp_atomic_fetch_op_t opcode;
|
||||
size_t dt_bytes;
|
||||
ompi_osc_ucx_internal_request_t *req = NULL;
|
||||
ucs_status_t status;
|
||||
|
||||
ret = start_atomicity(module, ep, target);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
ompi_datatype_type_size(dt, &dt_bytes);
|
||||
|
||||
if (op == &ompi_mpi_op_replace.op) {
|
||||
@ -789,7 +882,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET;
|
||||
ucp_rkey_h rkey = (module->state_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
ompi_osc_ucx_request_t *ucx_req = NULL;
|
||||
ompi_osc_ucx_internal_request_t *internal_req = NULL;
|
||||
ucs_status_t status;
|
||||
@ -800,6 +893,15 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
|
||||
if (NULL == ucx_req) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
@ -843,7 +945,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
ucp_ep_h ep = OSC_UCX_GET_EP(module->comm, target);
|
||||
uint64_t remote_addr = (module->state_info_array[target]).addr + OSC_UCX_STATE_REQ_FLAG_OFFSET;
|
||||
ucp_rkey_h rkey = (module->state_info_array[target]).rkey;
|
||||
ucp_rkey_h rkey;
|
||||
ompi_osc_ucx_request_t *ucx_req = NULL;
|
||||
ompi_osc_ucx_internal_request_t *internal_req = NULL;
|
||||
ucs_status_t status;
|
||||
@ -854,6 +956,15 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {
|
||||
status = get_dynamic_win_info(remote_addr, module, ep, target);
|
||||
if (status != UCS_OK) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
rkey = (module->win_info_array[target]).rkey;
|
||||
|
||||
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
|
||||
if (NULL == ucx_req) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
|
@ -252,7 +252,10 @@ static inline int mem_map(void **base, size_t size, ucp_mem_h *memh_ptr,
|
||||
ucs_status_t status;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
assert(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE);
|
||||
if (!(flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)
|
||||
|| size == 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
memset(&mem_params, 0, sizeof(ucp_mem_map_params_t));
|
||||
mem_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
|
||||
@ -321,6 +324,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
size_t my_info_len;
|
||||
int disps[comm_size];
|
||||
int rkey_sizes[comm_size];
|
||||
uint64_t zero = 0;
|
||||
|
||||
/* the osc/sm component is the exclusive provider for support for
|
||||
* shared memory windows */
|
||||
@ -385,10 +389,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
goto error;
|
||||
}
|
||||
|
||||
*model = MPI_WIN_UNIFIED;
|
||||
asprintf(&name, "ucx window %d", ompi_comm_get_cid(module->comm));
|
||||
ompi_win_set_name(win, name);
|
||||
free(name);
|
||||
|
||||
module->flavor = flavor;
|
||||
module->size = size;
|
||||
|
||||
/* share everyone's displacement units. Only do an allgather if
|
||||
strictly necessary, since it requires O(p) state. */
|
||||
values[0] = disp_unit;
|
||||
@ -506,14 +514,18 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
goto error;
|
||||
}
|
||||
|
||||
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh,
|
||||
&rkey_buffer, &rkey_buffer_size);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_rkey_pack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
ret = OMPI_ERROR;
|
||||
goto error;
|
||||
if (size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) {
|
||||
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->memh,
|
||||
&rkey_buffer, &rkey_buffer_size);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_rkey_pack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
ret = OMPI_ERROR;
|
||||
goto error;
|
||||
}
|
||||
} else {
|
||||
rkey_buffer_size = 0;
|
||||
}
|
||||
|
||||
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context, module->state_memh,
|
||||
@ -533,7 +545,11 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
goto error;
|
||||
}
|
||||
|
||||
memcpy(my_info, base, sizeof(uint64_t));
|
||||
if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE) {
|
||||
memcpy(my_info, base, sizeof(uint64_t));
|
||||
} else {
|
||||
memcpy(my_info, &zero, sizeof(uint64_t));
|
||||
}
|
||||
memcpy((void *)((char *)my_info + sizeof(uint64_t)), &state_base, sizeof(uint64_t));
|
||||
memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t)), rkey_buffer, rkey_buffer_size);
|
||||
memcpy((void *)((char *)my_info + 2 * sizeof(uint64_t) + rkey_buffer_size),
|
||||
@ -559,14 +575,18 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
memcpy(&(module->state_info_array[i]).addr, &recv_buf[disps[i] + sizeof(uint64_t)],
|
||||
sizeof(uint64_t));
|
||||
|
||||
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]),
|
||||
&((module->win_info_array[i]).rkey));
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
ret = OMPI_ERROR;
|
||||
goto error;
|
||||
(module->win_info_array[i]).rkey_init = false;
|
||||
if (size > 0 && (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE)) {
|
||||
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t)]),
|
||||
&((module->win_info_array[i]).rkey));
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_ep_rkey_unpack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
ret = OMPI_ERROR;
|
||||
goto error;
|
||||
}
|
||||
(module->win_info_array[i]).rkey_init = true;
|
||||
}
|
||||
|
||||
status = ucp_ep_rkey_unpack(ep, &(recv_buf[disps[i] + 2 * sizeof(uint64_t) + rkey_sizes[i]]),
|
||||
@ -578,12 +598,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
ret = OMPI_ERROR;
|
||||
goto error;
|
||||
}
|
||||
(module->state_info_array[i]).rkey_init = true;
|
||||
}
|
||||
|
||||
free(my_info);
|
||||
free(recv_buf);
|
||||
|
||||
ucp_rkey_buffer_release(rkey_buffer);
|
||||
if (rkey_buffer_size != 0) {
|
||||
ucp_rkey_buffer_release(rkey_buffer);
|
||||
}
|
||||
ucp_rkey_buffer_release(state_rkey_buffer);
|
||||
|
||||
module->state.lock = TARGET_LOCK_UNLOCKED;
|
||||
@ -592,6 +615,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
module->state.complete_count = 0;
|
||||
module->state.req_flag = 0;
|
||||
module->state.acc_lock = TARGET_LOCK_UNLOCKED;
|
||||
module->state.dynamic_win_count = 0;
|
||||
for (i = 0; i < OMPI_OSC_UCX_ATTACH_MAX; i++) {
|
||||
module->local_dynamic_win_info[i].refcnt = 0;
|
||||
}
|
||||
module->epoch_type.access = NONE_EPOCH;
|
||||
module->epoch_type.exposure = NONE_EPOCH;
|
||||
module->lock_count = 0;
|
||||
@ -652,11 +679,116 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
|
||||
int min_index, int max_index,
|
||||
uint64_t base, size_t len, int *insert) {
|
||||
int mid_index = (max_index + min_index) >> 1;
|
||||
|
||||
if (min_index > max_index) {
|
||||
(*insert) = min_index;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dynamic_wins[mid_index].base > base) {
|
||||
return ompi_osc_find_attached_region_position(dynamic_wins, min_index, mid_index-1,
|
||||
base, len, insert);
|
||||
} else if (base + len < dynamic_wins[mid_index].base + dynamic_wins[mid_index].size) {
|
||||
return mid_index;
|
||||
} else {
|
||||
return ompi_osc_find_attached_region_position(dynamic_wins, mid_index+1, max_index,
|
||||
base, len, insert);
|
||||
}
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len) {
|
||||
return OMPI_SUCCESS;
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int insert_index = -1, contain_index;
|
||||
void *rkey_buffer;
|
||||
size_t rkey_buffer_size;
|
||||
int ret = OMPI_SUCCESS;
|
||||
ucs_status_t status;
|
||||
|
||||
if (module->state.dynamic_win_count >= OMPI_OSC_UCX_ATTACH_MAX) {
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
if (module->state.dynamic_win_count > 0) {
|
||||
contain_index = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins,
|
||||
0, (int)module->state.dynamic_win_count,
|
||||
(uint64_t)base, len, &insert_index);
|
||||
if (contain_index >= 0) {
|
||||
module->local_dynamic_win_info[contain_index].refcnt++;
|
||||
return ret;
|
||||
}
|
||||
|
||||
assert(insert_index >= 0 && insert_index < module->state.dynamic_win_count);
|
||||
|
||||
memmove((void *)&module->local_dynamic_win_info[insert_index+1],
|
||||
(void *)&module->local_dynamic_win_info[insert_index],
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
|
||||
memmove((void *)&module->state.dynamic_wins[insert_index+1],
|
||||
(void *)&module->state.dynamic_wins[insert_index],
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (insert_index + 1)) * sizeof(ompi_osc_dynamic_win_info_t));
|
||||
} else {
|
||||
insert_index = 0;
|
||||
}
|
||||
|
||||
ret = mem_map(&base, len, &(module->local_dynamic_win_info[insert_index].memh),
|
||||
module, MPI_WIN_FLAVOR_CREATE);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
module->state.dynamic_wins[insert_index].base = (uint64_t)base;
|
||||
module->state.dynamic_wins[insert_index].size = len;
|
||||
|
||||
status = ucp_rkey_pack(mca_osc_ucx_component.ucp_context,
|
||||
module->local_dynamic_win_info[insert_index].memh,
|
||||
&rkey_buffer, (size_t *)&rkey_buffer_size);
|
||||
if (status != UCS_OK) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ucp_rkey_pack failed: %d\n",
|
||||
__FILE__, __LINE__, status);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
assert(rkey_buffer_size <= OMPI_OSC_UCX_RKEY_BUF_MAX);
|
||||
memcpy((char *)(module->state.dynamic_wins[insert_index].rkey_buffer),
|
||||
(char *)rkey_buffer, rkey_buffer_size);
|
||||
|
||||
module->local_dynamic_win_info[insert_index].refcnt++;
|
||||
module->state.dynamic_win_count++;
|
||||
|
||||
ucp_rkey_buffer_release(rkey_buffer);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
|
||||
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
|
||||
int insert, contain;
|
||||
|
||||
assert(module->state.dynamic_win_count > 0);
|
||||
|
||||
contain = ompi_osc_find_attached_region_position((ompi_osc_dynamic_win_info_t *)module->state.dynamic_wins,
|
||||
0, (int)module->state.dynamic_win_count,
|
||||
(uint64_t)base, 1, &insert);
|
||||
assert(contain >= 0 && contain < module->state.dynamic_win_count);
|
||||
|
||||
module->local_dynamic_win_info[contain].refcnt--;
|
||||
if (module->local_dynamic_win_info[contain].refcnt == 0) {
|
||||
ucp_mem_unmap(mca_osc_ucx_component.ucp_context,
|
||||
module->local_dynamic_win_info[contain].memh);
|
||||
memmove((void *)&(module->local_dynamic_win_info[contain]),
|
||||
(void *)&(module->local_dynamic_win_info[contain+1]),
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_local_dynamic_win_info_t));
|
||||
memmove((void *)&module->state.dynamic_wins[contain],
|
||||
(void *)&module->state.dynamic_wins[contain+1],
|
||||
(OMPI_OSC_UCX_ATTACH_MAX - (contain + 1)) * sizeof(ompi_osc_dynamic_win_info_t));
|
||||
|
||||
module->state.dynamic_win_count--;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -688,7 +820,10 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
module->comm->c_coll->coll_barrier_module);
|
||||
|
||||
for (i = 0; i < ompi_comm_size(module->comm); i++) {
|
||||
ucp_rkey_destroy((module->win_info_array[i]).rkey);
|
||||
if ((module->win_info_array[i]).rkey_init == true) {
|
||||
ucp_rkey_destroy((module->win_info_array[i]).rkey);
|
||||
(module->win_info_array[i]).rkey_init == false;
|
||||
}
|
||||
ucp_rkey_destroy((module->state_info_array[i]).rkey);
|
||||
}
|
||||
free(module->win_info_array);
|
||||
@ -696,7 +831,10 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
|
||||
|
||||
free(module->per_target_ops_nums);
|
||||
|
||||
ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh);
|
||||
if ((module->flavor == MPI_WIN_FLAVOR_ALLOCATE || module->flavor == MPI_WIN_FLAVOR_CREATE)
|
||||
&& module->size > 0) {
|
||||
ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->memh);
|
||||
}
|
||||
ucp_mem_unmap(mca_osc_ucx_component.ucp_context, module->state_memh);
|
||||
|
||||
if (module->disp_units) free(module->disp_units);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user