PML/UCX: use non-blocking fence instead of async progress
Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
Этот коммит содержится в:
родитель
10f2d831ec
Коммит
2745da7dcc
@ -78,11 +78,6 @@ mca_pml_ucx_module_t ompi_pml_ucx = {
|
||||
#define PML_UCX_REQ_ALLOCA() \
|
||||
((char *)alloca(ompi_pml_ucx.request_size) + ompi_pml_ucx.request_size);
|
||||
|
||||
typedef struct mca_pml_progress_data {
|
||||
ucp_worker_h worker;
|
||||
int complete;
|
||||
} mca_pml_progress_data_t;
|
||||
|
||||
|
||||
static int mca_pml_ucx_send_worker_address(void)
|
||||
{
|
||||
@ -392,27 +387,19 @@ static void mca_pml_ucx_waitall(void **reqs, size_t *count_p)
|
||||
*count_p = 0;
|
||||
}
|
||||
|
||||
static void *mca_pml_async_ucx_progress(void *arg)
|
||||
static void mca_pml_fence_complete_cb(int status, void *fenced)
|
||||
{
|
||||
mca_pml_progress_data_t *data = arg;
|
||||
|
||||
while (!data->complete) {
|
||||
ucp_worker_progress(data->worker);
|
||||
}
|
||||
return NULL;
|
||||
*(int*)fenced = 1;
|
||||
}
|
||||
|
||||
int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
|
||||
{
|
||||
int fenced = 0;
|
||||
ompi_proc_t *proc;
|
||||
size_t num_reqs, max_reqs;
|
||||
void *dreq, **dreqs;
|
||||
ucp_ep_h ep;
|
||||
size_t i;
|
||||
ucs_status_t ret;
|
||||
pthread_t progress_thr;
|
||||
mca_pml_progress_data_t progress_data;
|
||||
int thread_err;
|
||||
|
||||
max_reqs = ompi_pml_ucx.num_disconnect;
|
||||
if (max_reqs > nprocs) {
|
||||
@ -459,25 +446,10 @@ int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs)
|
||||
* finalize gracefully */
|
||||
ucp_worker_flush(ompi_pml_ucx.ucp_worker);
|
||||
|
||||
/* Some peers may be in active communication phase and
|
||||
* require progress from current process. Unfortunately
|
||||
* PMIX could not provide progress call during Fence, and
|
||||
* we will create own progress thread for UCX */
|
||||
progress_data.worker = ompi_pml_ucx.ucp_worker;
|
||||
progress_data.complete = 0;
|
||||
opal_pmix.fence_nb(NULL, 0, mca_pml_fence_complete_cb, &fenced);
|
||||
|
||||
thread_err = pthread_create(&progress_thr, NULL, mca_pml_async_ucx_progress,
|
||||
&progress_data);
|
||||
if (thread_err) {
|
||||
PML_UCX_ERROR("Failed to create async progress thread: %d", thread_err);
|
||||
}
|
||||
|
||||
opal_pmix.fence(NULL, 0);
|
||||
|
||||
/* All peers are completed comunication. Let's stop progress thread */
|
||||
if (!thread_err) {
|
||||
progress_data.complete = 1;
|
||||
pthread_join(progress_thr, NULL);
|
||||
while (!fenced) {
|
||||
ucp_worker_progress(ompi_pml_ucx.ucp_worker);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user