diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c
index 418fc0f43c..388b3f2042 100644
--- a/ompi/mca/pml/ucx/pml_ucx.c
+++ b/ompi/mca/pml/ucx/pml_ucx.c
@@ -78,11 +78,6 @@ mca_pml_ucx_module_t ompi_pml_ucx = {
 #define PML_UCX_REQ_ALLOCA() \
     ((char *)alloca(ompi_pml_ucx.request_size) + ompi_pml_ucx.request_size);
 
-typedef struct mca_pml_progress_data {
-    ucp_worker_h worker;
-    int complete;
-} mca_pml_progress_data_t;
-
 
 static int mca_pml_ucx_send_worker_address(void)
 {
@@ -392,27 +387,19 @@ static void mca_pml_ucx_waitall(void **reqs, size_t *count_p)
     *count_p = 0;
 }
 
-static void *mca_pml_async_ucx_progress(void *arg)
+static void mca_pml_fence_complete_cb(int status, void *fenced)
 {
-    mca_pml_progress_data_t *data = arg;
-
-    while (!data->complete) {
-        ucp_worker_progress(data->worker);
-    }
-    return NULL;
+    *(int*)fenced = 1;
 }
 
 int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
 {
+    int fenced = 0;
     ompi_proc_t *proc;
     size_t num_reqs, max_reqs;
     void *dreq, **dreqs;
     ucp_ep_h ep;
     size_t i;
-    ucs_status_t ret;
-    pthread_t progress_thr;
-    mca_pml_progress_data_t progress_data;
-    int thread_err;
 
     max_reqs = ompi_pml_ucx.num_disconnect;
     if (max_reqs > nprocs) {
@@ -459,25 +446,10 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
      * finalize gracefully */
     ucp_worker_flush(ompi_pml_ucx.ucp_worker);
 
-    /* Some peers may be in active communication phase and
-     * require progress from current process. Unfortunately
-     * PMIX could not provide progress call during Fence, and
-     * we will create own progress thread for UCX */
-    progress_data.worker = ompi_pml_ucx.ucp_worker;
-    progress_data.complete = 0;
+    opal_pmix.fence_nb(NULL, 0, mca_pml_fence_complete_cb, &fenced);
 
-    thread_err = pthread_create(&progress_thr, NULL, mca_pml_async_ucx_progress,
-                                &progress_data);
-    if (thread_err) {
-        PML_UCX_ERROR("Failed to create async progress thread: %d", thread_err);
-    }
-
-    opal_pmix.fence(NULL, 0);
-
-    /* All peers are completed comunication. Let's stop progress thread */
-    if (!thread_err) {
-        progress_data.complete = 1;
-        pthread_join(progress_thr, NULL);
+    while (!fenced) {
+        ucp_worker_progress(ompi_pml_ucx.ucp_worker);
     }
 
     return OMPI_SUCCESS;
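
The patch above replaces a dedicated pthread that spun ucp_worker_progress() for the duration of a blocking PMIx fence with a non-blocking fence: the completion callback flips a flag, and the calling thread itself drives UCX progress until the fence finishes. This also means only one thread ever calls ucp_worker_progress(), instead of a helper thread polling concurrently with the rest of the PML. Below is a minimal self-contained sketch of that wait pattern. It is hypothetical illustration only: fence_thread(), fake_fence_nb(), and fake_worker_progress() are stand-ins (not OMPI or UCX APIs) that simulate opal_pmix.fence_nb() and ucp_worker_progress() with a helper thread, so the sketch builds with just pthreads.

/* Minimal sketch (hypothetical, not OMPI code) of the pattern used in
 * mca_pml_ucx_del_procs(): start a non-blocking fence whose callback
 * sets a completion flag, then drive local progress until it fires. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

/* Callback shape matching the patch: void (*)(int status, void *cbdata). */
typedef void (*fence_cbfunc_t)(int status, void *cbdata);

struct fence_args {
    fence_cbfunc_t cb;
    void          *cbdata;
};

/* Stand-in for the remote side of the fence: "completes" it from another
 * thread after a delay, then invokes the registered callback. */
static void *fence_thread(void *arg)
{
    struct fence_args *fa = arg;
    usleep(100 * 1000);      /* pretend remote peers take 100 ms to arrive */
    fa->cb(0, fa->cbdata);   /* status 0 == success */
    return NULL;
}

/* Stand-in for opal_pmix.fence_nb(): registers the callback and returns
 * immediately instead of blocking like opal_pmix.fence(). */
static void fake_fence_nb(fence_cbfunc_t cb, void *cbdata,
                          pthread_t *thr, struct fence_args *fa)
{
    fa->cb     = cb;
    fa->cbdata = cbdata;
    pthread_create(thr, NULL, fence_thread, fa);
}

/* Stand-in for ucp_worker_progress(): in the real code this polls the
 * UCX worker so peers still in their disconnect phase keep advancing. */
static void fake_worker_progress(void)
{
}

/* Same shape as mca_pml_fence_complete_cb() in the patch. */
static void fence_complete_cb(int status, void *fenced)
{
    (void)status;
    *(atomic_int *)fenced = 1;
}

int main(void)
{
    atomic_int        fenced = 0;  /* atomic: the mock completes on a 2nd thread */
    pthread_t         thr;
    struct fence_args fa;

    fake_fence_nb(fence_complete_cb, &fenced, &thr, &fa);

    /* The same wait loop the patch adds: keep progressing the local
     * communication engine until the fence callback flips the flag. */
    while (!atomic_load(&fenced)) {
        fake_worker_progress();
    }

    pthread_join(thr, NULL);
    printf("fence complete\n");
    return 0;
}

Note the sketch uses atomic_int only because its mock fires the callback from a second thread; in the patch, a plain int suffices on the assumption that the PMIx callback is delivered to the same thread that is spinning on the flag.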