UCX/PML/SPML: fixed a few Coverity issues
- fixed incorrect pointer manipulation/free
- cleaned dead code
- minor optimization on process delete routine
- fixed error handling
- free pointers
- added debug output for worker flush failure

Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
This commit is contained in:
parent abb87f9137
commit 502d04bf12
--- a/ompi/mca/osc/ucx/osc_ucx_comm.c
+++ b/ompi/mca/osc/ucx/osc_ucx_comm.c
@@ -338,7 +338,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
     if ((module->win_info_array[target]).rkey_init == true) {
         ucp_rkey_destroy((module->win_info_array[target]).rkey);
-        (module->win_info_array[target]).rkey_init == false;
+        (module->win_info_array[target]).rkey_init = false;
     }

     status = ucp_get_nbi(ep, (void *)temp_buf, len, remote_state_addr, state_rkey);
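The removed line used the equality operator where an assignment was intended: the expression compiled, but its result was discarded, so rkey_init stayed true after the rkey had been destroyed, and a later pass over the same slot could destroy it again. A minimal sketch of this bug class (hypothetical struct, not the OMPI type):

    #include <stdbool.h>
    #include <stdio.h>

    struct win_info { bool rkey_init; };

    int main(void) {
        struct win_info info = { .rkey_init = true };

        info.rkey_init == false;           /* comparison: value discarded, flag unchanged */
        printf("%d\n", info.rkey_init);    /* prints 1 */

        info.rkey_init = false;            /* assignment: flag actually cleared */
        printf("%d\n", info.rkey_init);    /* prints 0 */
        return 0;
    }

Compilers flag the first statement under -Wunused-value; Coverity reports it as a statement with no effect, which is presumably how this one was caught.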
@@ -523,6 +523,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
             return ret;
         }
     } else {
+        void *temp_addr_holder = NULL;
         void *temp_addr = NULL;
         uint32_t temp_count;
         ompi_datatype_t *temp_dt;
@@ -540,7 +541,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
             }
         }
         ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
-        temp_addr = malloc(temp_extent * temp_count);
+        temp_addr = temp_addr_holder = malloc(temp_extent * temp_count);
         if (temp_addr == NULL) {
             return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
         }
@@ -616,7 +617,7 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
             return OMPI_ERROR;
         }

-        free(temp_addr);
+        free(temp_addr_holder);
     }

     ret = end_atomicity(module, ep, target);
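These three hunks are one fix: temp_addr is apparently advanced while the accumulated data is processed, so by the time the buffer is released it no longer points at the address malloc returned, and freeing it is undefined behavior (the "incorrect pointer manipulation/free" from the commit message). The original pointer is therefore parked in temp_addr_holder, and that is what gets freed. A self-contained sketch of the pattern, with illustrative names:

    #include <stdlib.h>

    static void consume(const char *chunk, size_t n) { (void)chunk; (void)n; }

    int process(size_t count, size_t elem_size) {
        char *cursor, *holder;

        cursor = holder = malloc(count * elem_size);
        if (cursor == NULL) {
            return -1;
        }
        for (size_t i = 0; i < count; i++) {
            consume(cursor, elem_size);
            cursor += elem_size;   /* cursor drifts away from the allocation */
        }
        free(holder);              /* free(cursor) here would be undefined */
        return 0;
    }

The same holder pattern is applied to ompi_osc_ucx_get_accumulate in the hunks below.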
@@ -774,6 +775,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
             return ret;
         }
     } else {
+        void *temp_addr_holder = NULL;
         void *temp_addr = NULL;
         uint32_t temp_count;
         ompi_datatype_t *temp_dt;
@@ -791,7 +793,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
             }
         }
         ompi_datatype_get_true_extent(temp_dt, &temp_lb, &temp_extent);
-        temp_addr = malloc(temp_extent * temp_count);
+        temp_addr = temp_addr_holder = malloc(temp_extent * temp_count);
         if (temp_addr == NULL) {
             return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
         }
@@ -866,7 +868,7 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
             return OMPI_ERROR;
         }

-        free(temp_addr);
+        free(temp_addr_holder);
         }
     }

@@ -904,9 +906,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
     rkey = (module->win_info_array[target]).rkey;

     OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
-    if (NULL == ucx_req) {
-        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-    }
+    assert(NULL != ucx_req);

     ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, target_disp,
                            target_count, target_dt, win);
@@ -967,9 +967,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
     rkey = (module->win_info_array[target]).rkey;

     OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
-    if (NULL == ucx_req) {
-        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-    }
+    assert(NULL != ucx_req);

     ret = ompi_osc_ucx_get(origin_addr, origin_count, origin_dt, target, target_disp,
                            target_count, target_dt, win);
@@ -1016,9 +1014,7 @@ int ompi_osc_ucx_raccumulate(const void *origin_addr, int origin_count,
     }

     OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
-    if (NULL == ucx_req) {
-        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-    }
+    assert(NULL != ucx_req);

     ret = ompi_osc_ucx_accumulate(origin_addr, origin_count, origin_dt, target, target_disp,
                                   target_count, target_dt, op, win);
@@ -1050,9 +1046,7 @@ int ompi_osc_ucx_rget_accumulate(const void *origin_addr, int origin_count,
     }

     OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
-    if (NULL == ucx_req) {
-        return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-    }
+    assert(NULL != ucx_req);

     ret = ompi_osc_ucx_get_accumulate(origin_addr, origin_count, origin_datatype,
                                       result_addr, result_count, result_datatype,
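In all four request variants (rput, rget, raccumulate, rget_accumulate) the NULL check after OMPI_OSC_UCX_REQUEST_ALLOC was unreachable, presumably because the macro draws from a free list that grows or aborts rather than yielding NULL; Coverity reports such branches as dead code. The assert keeps the invariant visible in debug builds and compiles away under NDEBUG. A sketch of the idiom (the allocator is hypothetical):

    #include <assert.h>
    #include <stdlib.h>

    /* Allocator that never returns NULL: it aborts on exhaustion. */
    static void *freelist_get(void) {
        void *item = malloc(64);
        if (item == NULL) {
            abort();   /* callers never observe NULL */
        }
        return item;
    }

    void caller(void) {
        void *req = freelist_get();
        /* an "if (req == NULL) return;" here would be dead code;
         * the assert documents the invariant at zero release cost */
        assert(req != NULL);
        free(req);
    }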
--- a/ompi/mca/osc/ucx/osc_ucx_component.c
+++ b/ompi/mca/osc/ucx/osc_ucx_component.c
@@ -149,7 +149,7 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)

     /* initialize UCP context */

-    memset(&context_params, 0, sizeof(ucp_context_h));
+    memset(&context_params, 0, sizeof(context_params));
     context_params.field_mask = UCP_PARAM_FIELD_FEATURES |
                                 UCP_PARAM_FIELD_MT_WORKERS_SHARED |
                                 UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
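ucp_context_h is an opaque handle, i.e. a pointer type, so sizeof(ucp_context_h) is the size of a pointer (typically 8 bytes), not the size of the params structure being cleared; most of the structure was left uninitialized. Taking sizeof of the variable itself is the robust idiom and survives type changes. A compilable illustration with stand-in types:

    #include <stdio.h>
    #include <string.h>

    typedef struct params { long field_mask; long features; long num_eps; } params_t;
    typedef struct context *context_h;   /* opaque handle, like ucp_context_h */

    int main(void) {
        params_t p;

        memset(&p, 0, sizeof(context_h));   /* zeroes only pointer-size bytes */
        memset(&p, 0, sizeof(p));           /* zeroes the whole struct */
        printf("%zu vs %zu\n", sizeof(context_h), sizeof(p));   /* e.g. 8 vs 24 */
        return 0;
    }

The worker_params memset in the next hunk had the same shape of bug with ucp_worker_h.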
@@ -329,7 +329,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
     ucp_worker_params_t worker_params;
     ucp_worker_attr_t worker_attr;

-    memset(&worker_params, 0, sizeof(ucp_worker_h));
+    memset(&worker_params, 0, sizeof(worker_params));
     worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
     worker_params.thread_mode = (mca_osc_ucx_component.enable_mpi_threads == true)
                                 ? UCS_THREAD_MODE_MULTI : UCS_THREAD_MODE_SINGLE;
@@ -340,7 +340,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
                             "%s:%d: ucp_worker_create failed: %d\n",
                             __FILE__, __LINE__, status);
         ret = OMPI_ERROR;
-        goto error;
+        goto error_nomem;
     }

     ret = opal_progress_register(progress_callback);
@@ -360,7 +360,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
                             "%s:%d: ucp_worker_query failed: %d\n",
                             __FILE__, __LINE__, status);
         ret = OMPI_ERROR;
-        goto error;
+        goto error_nomem;
     }

     if (mca_osc_ucx_component.enable_mpi_threads == true &&
@@ -369,7 +369,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
                             "%s:%d: ucx does not support multithreading\n",
                             __FILE__, __LINE__);
         ret = OMPI_ERROR;
-        goto error;
+        goto error_nomem;
     }

     worker_created = true;
@@ -379,7 +379,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
     module = (ompi_osc_ucx_module_t *)calloc(1, sizeof(ompi_osc_ucx_module_t));
     if (module == NULL) {
         ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-        goto error;
+        goto error_nomem;
     }

     /* fill in the function pointer part */
@@ -676,8 +676,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
         }
     }
     if (progress_registered) opal_progress_unregister(progress_callback);
-    if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);
     if (module) free(module);
+
+error_nomem:
+    if (worker_created) ucp_worker_destroy(mca_osc_ucx_component.ucp_worker);

     return ret;
 }
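The new error_nomem label splits the cleanup in two: early failures (worker creation, worker query, thread-support check, module allocation) jump past the full teardown, which would otherwise touch a progress callback and module state that were never set up. A runnable sketch of the layered-goto idiom with stand-in resources:

    #include <stdlib.h>

    static void *worker_create(void) { return malloc(1); }
    static int progress_register(void) { return 0; }
    static void progress_unregister(void) { }
    static int init_module(void *m) { (void)m; return 0; }

    int component_setup(void) {
        void *worker = NULL, *module = NULL;
        int ret = 0, progress_registered = 0;

        worker = worker_create();
        if (worker == NULL) { ret = -1; goto error_nomem; } /* nothing else exists yet */

        if (progress_register() != 0) { ret = -1; goto error_nomem; }
        progress_registered = 1;

        module = calloc(1, 128);
        if (module == NULL) { ret = -1; goto error; }       /* progress needs undoing */

        if (init_module(module) != 0) { ret = -1; goto error; } /* late failure */
        return 0;

    error:          /* full teardown for late failures */
        if (progress_registered) progress_unregister();
        free(module);
    error_nomem:    /* minimal teardown for early failures, reached by fallthrough too */
        free(worker);
        return ret;
    }

As in the diff, the full-teardown path falls through into error_nomem, so the worker is destroyed exactly once on every failure path.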
@@ -777,6 +779,11 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {
                                                      (uint64_t)base, 1, &insert);
     assert(contain >= 0 && contain < module->state.dynamic_win_count);

+    /* if we can't find region - just exit */
+    if (contain < 0) {
+        return OMPI_SUCCESS;
+    }
+
     module->local_dynamic_win_info[contain].refcnt--;
     if (module->local_dynamic_win_info[contain].refcnt == 0) {
         ucp_mem_unmap(mca_osc_ucx_component.ucp_context,
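Note the combination here: the assert still traps on an unattached region in debug builds, while the new guard makes release builds (where NDEBUG compiles the assert away) exit gracefully instead of indexing local_dynamic_win_info with -1. A compact illustration of that defense-in-depth pattern:

    #include <assert.h>

    int detach(int contain, int win_count) {
        assert(contain >= 0 && contain < win_count); /* debug builds trap here */
        if (contain < 0) {
            return 0;   /* release builds: region not attached, nothing to do */
        }
        /* ... safe to use 'contain' as an index from here on ... */
        return 0;
    }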
@@ -796,16 +803,7 @@ int ompi_osc_ucx_win_detach(struct ompi_win_t *win, const void *base) {

 int ompi_osc_ucx_free(struct ompi_win_t *win) {
     ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
-    int i, ret = OMPI_SUCCESS;
-
-    if ((module->epoch_type.access != NONE_EPOCH && module->epoch_type.access != FENCE_EPOCH)
-        || module->epoch_type.exposure != NONE_EPOCH) {
-        ret = OMPI_ERR_RMA_SYNC;
-    }
-
-    if (module->start_group != NULL || module->post_group != NULL) {
-        ret = OMPI_ERR_RMA_SYNC;
-    }
+    int i, ret;

     assert(module->global_ops_num == 0);
     assert(module->lock_count == 0);
@@ -824,7 +822,7 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
     for (i = 0; i < ompi_comm_size(module->comm); i++) {
         if ((module->win_info_array[i]).rkey_init == true) {
             ucp_rkey_destroy((module->win_info_array[i]).rkey);
-            (module->win_info_array[i]).rkey_init == false;
+            (module->win_info_array[i]).rkey_init = false;
         }
         ucp_rkey_destroy((module->state_info_array[i]).rkey);
     }
--- a/ompi/mca/osc/ucx/osc_ucx_passive_target.c
+++ b/ompi/mca/osc/ucx/osc_ucx_passive_target.c
@@ -35,7 +35,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
                         __FILE__, __LINE__, status);
         return OMPI_ERROR;
     }
-    assert(result_value >= 0);
+    assert((int64_t)result_value >= 0);
     if (result_value >= TARGET_LOCK_EXCLUSIVE) {
         status = ucp_atomic_post(ep, UCP_ATOMIC_POST_OP_ADD, (-1), sizeof(uint64_t),
                                  remote_addr, rkey);
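result_value is an unsigned 64-bit value returned by the fetch-and-add, so result_value >= 0 is always true: the assert could never fire, which is exactly the kind of no-effect comparison Coverity flags. Casting to int64_t turns it into a real sanity check, since a lock-count underflow shows up as a huge unsigned value, i.e. a negative signed one:

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        uint64_t result_value = (uint64_t)0 - 3;      /* simulated counter underflow */

        printf("%d\n", result_value >= 0);            /* always 1: vacuous check */
        printf("%d\n", (int64_t)result_value >= 0);   /* 0: underflow detected */
        return 0;
    }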
@@ -245,12 +245,8 @@ int ompi_osc_ucx_lock_all(int assert, struct ompi_win_t *win) {
     } else {
         module->lock_all_is_nocheck = true;
     }
-
-    if (ret != OMPI_SUCCESS) {
-        module->epoch_type.access = NONE_EPOCH;
-    }
-
-    return ret;
+    assert(OMPI_SUCCESS == ret);
+    return OMPI_SUCCESS;
 }

 int ompi_osc_ucx_unlock_all(struct ompi_win_t *win) {
--- a/ompi/mca/pml/ucx/pml_ucx.c
+++ b/ompi/mca/pml/ucx/pml_ucx.c
@@ -367,12 +367,12 @@ static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
     return NULL;
 }

-static void mca_pml_ucx_waitall(void **reqs, size_t *count_p)
+static void mca_pml_ucx_waitall(void **reqs, int *count_p)
 {
     ucs_status_t status;
-    size_t i;
+    int i;

-    PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", (int)*count_p);
+    PML_UCX_VERBOSE(2, "waiting for %d disconnect requests", *count_p);
     for (i = 0; i < *count_p; ++i) {
         do {
             opal_progress();
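The counter changes type from size_t to int throughout: num_disconnect is an int option, so every comparison against it mixed signed and unsigned operands and needed casts like (int)num_reqs, which both Coverity and -Wsign-compare object to. Keeping everything int removes the casts, including the one in the verbose format argument. The pitfall being avoided:

    #include <stdio.h>

    int main(void) {
        size_t num_reqs = 0;
        int num_disconnect = -1;   /* imagine a misconfigured option */

        /* the int converts to size_t: -1 becomes SIZE_MAX,
         * so "0 >= -1" evaluates as false here */
        if (num_reqs >= (size_t)num_disconnect)
            puts("drain");
        else
            puts("skip");          /* this prints */

        if ((int)num_reqs >= num_disconnect)
            puts("drain");         /* with matching types, this prints */
        return 0;
    }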
@@ -398,7 +398,8 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
 {
     volatile int fenced = 0;
     ompi_proc_t *proc;
-    size_t num_reqs, max_reqs;
+    int num_reqs;
+    size_t max_reqs;
     void *dreq, **dreqs;
     ucp_ep_h ep;
     size_t i;
@@ -422,6 +423,8 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
             continue;
         }

+        proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
+
         PML_UCX_VERBOSE(2, "disconnecting from rank %d", proc->super.proc_name.vpid);
         dreq = ucp_disconnect_nb(ep);
         if (dreq != NULL) {
@@ -429,18 +432,18 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
                 PML_UCX_ERROR("ucp_disconnect_nb(%d) failed: %s",
                               proc->super.proc_name.vpid,
                               ucs_status_string(UCS_PTR_STATUS(dreq)));
+                continue;
             } else {
                 dreqs[num_reqs++] = dreq;
+                if (num_reqs >= ompi_pml_ucx.num_disconnect) {
+                    mca_pml_ucx_waitall(dreqs, &num_reqs);
+                }
             }
         }
-
-        proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL;
-
-        if ((int)num_reqs >= ompi_pml_ucx.num_disconnect) {
-            mca_pml_ucx_waitall(dreqs, &num_reqs);
-        }
     }

+    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
+     * so suppress coverity warning */
+    /* coverity[uninit_use_in_call] */
     mca_pml_ucx_waitall(dreqs, &num_reqs);
     free(dreqs);
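The restructured loop batches the non-blocking disconnects: each request returned by ucp_disconnect_nb is stored, and as soon as num_disconnect of them are outstanding they are drained. Judging by the call with &num_reqs and the new comment, the waitall helper progresses every stored request to completion and resets the count to zero, so the trailing call is harmless when nothing is pending. A runnable model of the pattern (the disconnect and progress functions are stand-ins, not UCX API):

    #include <stdlib.h>

    static void *disconnect_nb(int peer) { (void)peer; return malloc(1); }

    /* drain all outstanding requests, then reset the count */
    static void waitall(void **reqs, int *count_p) {
        for (int i = 0; i < *count_p; ++i) {
            free(reqs[i]);   /* real code would progress until complete */
        }
        *count_p = 0;
    }

    void del_procs(int nprocs, int num_disconnect) {
        void **dreqs = malloc(sizeof(*dreqs) * (size_t)num_disconnect);
        int num_reqs = 0;

        if (dreqs == NULL) {
            return;
        }
        for (int peer = 0; peer < nprocs; ++peer) {
            void *dreq = disconnect_nb(peer);
            if (dreq == NULL) {
                continue;                      /* completed immediately */
            }
            dreqs[num_reqs++] = dreq;
            if (num_reqs >= num_disconnect) {  /* cap outstanding requests */
                waitall(dreqs, &num_reqs);
            }
        }
        waitall(dreqs, &num_reqs);             /* num_reqs == 0 is a no-op */
        free(dreqs);
    }

Moving the endpoint-table reset (proc_endpoints[...] = NULL) above the disconnect call also guarantees the stale pointer is cleared even when ucp_disconnect_nb fails and the new continue skips the rest of the iteration.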
@@ -541,6 +544,7 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src

     PML_UCX_TRACE_RECV("%s", buf, count, datatype, src, tag, comm, "recv");

+    /* coverity[bad_alloc_arithmetic] */
     PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
     req = PML_UCX_REQ_ALLOCA();
     status = ucp_tag_recv_nbr(ompi_pml_ucx.ucp_worker, buf, count,
@@ -765,6 +769,7 @@ mca_pml_ucx_send_nbr(ucp_ep_h ep, const void *buf, size_t count,
     void *req;
     ucs_status_t status;

+    /* coverity[bad_alloc_arithmetic] */
     req = PML_UCX_REQ_ALLOCA();
     status = ucp_tag_send_nbr(ep, buf, count, ucx_datatype, tag, req);
     if (OPAL_LIKELY(status == UCS_OK)) {
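These /* coverity[...] */ comments are Coverity's inline suppression syntax: an annotation naming the checker event, placed immediately before the flagged code, silences that single report. The flagged construct here is presumably the pointer arithmetic inside PML_UCX_REQ_ALLOCA, which by its name builds the request pointer at the end of an alloca'ed block for UCX's ..._nbr calls; the arithmetic is deliberate, so the report is annotated away rather than "fixed". The generic shape of such a suppression:

    /* coverity[bad_alloc_arithmetic] */
    req = PML_UCX_REQ_ALLOCA();   /* arithmetic on freshly alloca'ed memory */

Inline suppressions like this are preferable to global exclusions because they stay next to the code they justify and disappear with it.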
--- a/oshmem/mca/spml/ucx/spml_ucx.c
+++ b/oshmem/mca/spml/ucx/spml_ucx.c
@@ -91,10 +91,10 @@ int mca_spml_ucx_enable(bool enable)
 }


-static void mca_spml_ucx_waitall(void **reqs, size_t *count_p)
+static void mca_spml_ucx_waitall(void **reqs, int *count_p)
 {
     ucs_status_t status;
-    size_t i;
+    int i;

     SPML_VERBOSE(10, "waiting for %d disconnect requests", *count_p);
     for (i = 0; i < *count_p; ++i) {
@@ -116,7 +116,8 @@ static void mca_spml_ucx_waitall(void **reqs, size_t *count_p)
 int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
 {
     int my_rank = oshmem_my_proc_id();
-    size_t num_reqs, max_reqs;
+    int num_reqs;
+    size_t max_reqs;
     void *dreq, **dreqs;
     ucp_ep_h ep;
     size_t i, n;
@@ -146,24 +147,26 @@ int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs)
             continue;
         }

+        mca_spml_ucx.ucp_peers[n].ucp_conn = NULL;
+
         SPML_VERBOSE(10, "disconnecting from peer %d", n);
         dreq = ucp_disconnect_nb(ep);
         if (dreq != NULL) {
             if (UCS_PTR_IS_ERR(dreq)) {
                 SPML_ERROR("ucp_disconnect_nb(%d) failed: %s", n,
                            ucs_status_string(UCS_PTR_STATUS(dreq)));
+                continue;
             } else {
                 dreqs[num_reqs++] = dreq;
+                if (num_reqs >= mca_spml_ucx.num_disconnect) {
+                    mca_spml_ucx_waitall(dreqs, &num_reqs);
+                }
             }
         }
-
-        mca_spml_ucx.ucp_peers[n].ucp_conn = NULL;
-
-        if ((int)num_reqs >= mca_spml_ucx.num_disconnect) {
-            mca_spml_ucx_waitall(dreqs, &num_reqs);
-        }
     }

+    /* num_reqs == 0 is processed by mca_pml_ucx_waitall routine,
+     * so suppress coverity warning */
+    /* coverity[uninit_use_in_call] */
     mca_spml_ucx_waitall(dreqs, &num_reqs);
     free(dreqs);
@@ -255,8 +258,9 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
     ucs_status_t err;
     ucp_address_t *wk_local_addr;
     size_t wk_addr_len;
-    int *wk_roffs, *wk_rsizes;
-    char *wk_raddrs;
+    int *wk_roffs = NULL;
+    int *wk_rsizes = NULL;
+    char *wk_raddrs = NULL;
     ucp_ep_params_t ep_params;

@@ -315,12 +319,9 @@ error2:
     }
     if (mca_spml_ucx.ucp_peers)
        free(mca_spml_ucx.ucp_peers);
-    if (wk_raddrs)
-        free(wk_raddrs);
-    if (wk_rsizes)
-        free(wk_rsizes);
-    if (wk_roffs)
-        free(wk_roffs);
+    free(wk_raddrs);
+    free(wk_rsizes);
+    free(wk_roffs);
 error:
     rc = OSHMEM_ERR_OUT_OF_RESOURCE;
     SPML_ERROR("add procs FAILED rc=%d", rc);
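Initializing the three pointers to NULL (previous hunk) is what lets this cleanup drop the guards: C guarantees free(NULL) is a no-op (C99 §7.20.3.2), so the error path may free them unconditionally no matter how far add_procs got before failing. Sketch:

    #include <stdlib.h>

    int gather(void) {
        int *wk_roffs = NULL, *wk_rsizes = NULL;
        char *wk_raddrs = NULL;

        wk_roffs = malloc(16);
        wk_rsizes = malloc(16);
        wk_raddrs = malloc(16);
        if (wk_roffs == NULL || wk_rsizes == NULL || wk_raddrs == NULL) {
            goto error;
        }
        /* ... use the buffers ... */
        free(wk_raddrs);
        free(wk_rsizes);
        free(wk_roffs);
        return 0;

    error:
        free(wk_raddrs);   /* free(NULL) is a defined no-op */
        free(wk_rsizes);
        free(wk_roffs);
        return -1;
    }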
@@ -531,6 +532,10 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)

     mem_seg = memheap_find_va(mkeys[0].va_base);
     ucx_mkey = (spml_ucx_mkey_t*)mkeys[0].spml_context;

+    if (OPAL_UNLIKELY(NULL == mem_seg)) {
+        return OSHMEM_ERROR;
+    }
+
     if (MAP_SEGMENT_ALLOC_UCX != mem_seg->type) {
         ucp_mem_unmap(mca_spml_ucx.ucp_context, ucx_mkey->mem_h);
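memheap_find_va can evidently return NULL when the address is not covered by any registered heap segment, and the old code went straight to mem_seg->type; this is the classic Coverity NULL-dereference report, fixed by an early return before the first dereference. A minimal model:

    #include <stddef.h>

    struct mem_seg { int type; };

    /* stand-in for memheap_find_va(): a lookup that may fail */
    static struct mem_seg *find_segment(const void *va) { (void)va; return NULL; }

    int deregister_segment(const void *va) {
        struct mem_seg *seg = find_segment(va);

        if (seg == NULL) {   /* guard before any dereference */
            return -1;
        }
        return seg->type;
    }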