From d66b01d380a05518c3121db5d6aac4ef2789b5b9 Mon Sep 17 00:00:00 2001 From: yosefe Date: Mon, 9 Nov 2015 14:55:22 +0200 Subject: [PATCH 1/2] pml_ucx: implement cancel, and add small optimizations. --- ompi/mca/pml/ucx/pml_ucx.c | 5 +++- ompi/mca/pml/ucx/pml_ucx_request.c | 44 ++++++++++++++++++++++++++---- ompi/mca/pml/ucx/pml_ucx_request.h | 1 + 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index 6b746e03c4..0eab042e48 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -386,6 +386,7 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src return OMPI_ERROR; } + ucp_worker_progress(ompi_pml_ucx.ucp_worker); while (!req->req_complete) { opal_progress(); } @@ -492,10 +493,11 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i mca_pml_ucx_get_datatype(datatype), PML_UCX_MAKE_SEND_TAG(tag, comm), mca_pml_ucx_send_completion); - if (req == NULL) { + if (OPAL_LIKELY(req == NULL)) { return OMPI_SUCCESS; } else if (!UCS_PTR_IS_ERR(req)) { PML_UCX_VERBOSE(8, "got request %p", (void*)req); + ucp_worker_progress(ompi_pml_ucx.ucp_worker); ompi_request_wait(&req, MPI_STATUS_IGNORE); return OMPI_SUCCESS; } else { @@ -698,6 +700,7 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests) PML_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p", (void*)tmp_req, (void*)preq); tmp_req->req_complete_cb_data = preq; + preq->tmp_req = tmp_req; } OPAL_THREAD_UNLOCK(&ompi_request_lock); } else { diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c index 42066a1f66..67711e23ce 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.c +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -25,6 +25,12 @@ static int mca_pml_ucx_request_free(ompi_request_t **rptr) return OMPI_SUCCESS; } +static int mca_pml_ucx_request_cancel(ompi_request_t *req, int flag) +{ + ucp_request_cancel(ompi_pml_ucx.ucp_worker, req); + return OMPI_SUCCESS; +} + void mca_pml_ucx_send_completion(void *request, ucs_status_t status) { ompi_request_t *req = request; @@ -55,12 +61,19 @@ void mca_pml_ucx_recv_completion(void *request, ucs_status_t status, OPAL_THREAD_UNLOCK(&ompi_request_lock); } +static void mca_pml_ucx_persistent_requset_detach(mca_pml_ucx_persistent_request_t *preq, + ompi_request_t *tmp_req) +{ + tmp_req->req_complete_cb_data = NULL; + preq->tmp_req = NULL; +} + void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq, ompi_request_t *tmp_req) { preq->ompi.req_status = tmp_req->req_status; ompi_request_complete(&preq->ompi, true); - tmp_req->req_complete_cb_data = NULL; + mca_pml_ucx_persistent_requset_detach(preq, tmp_req); mca_pml_ucx_request_reset(tmp_req); ucp_request_release(tmp_req); } @@ -73,6 +86,7 @@ static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req) ompi_request_complete(tmp_req, false); preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data; if (preq != NULL) { + PML_UCX_ASSERT(preq->tmp_req != NULL); mca_pml_ucx_persistent_requset_complete(preq, tmp_req); } OPAL_THREAD_UNLOCK(&ompi_request_lock); @@ -120,7 +134,8 @@ void mca_pml_ucx_request_init(void *request) ompi_request_t* ompi_req = request; OBJ_CONSTRUCT(ompi_req, ompi_request_t); mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE, - mca_pml_ucx_request_free, NULL); + mca_pml_ucx_request_free, + mca_pml_ucx_request_cancel); } void mca_pml_ucx_request_cleanup(void *request) @@ -133,18 +148,35 @@ void mca_pml_ucx_request_cleanup(void *request) static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr) { - mca_pml_ucx_persistent_request_t* req = (mca_pml_ucx_persistent_request_t*)*rptr; + mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)*rptr; + ompi_request_t *tmp_req = preq->tmp_req; + preq->ompi.req_state = OMPI_REQUEST_INVALID; + if (tmp_req != NULL) { + mca_pml_ucx_persistent_requset_detach(preq, tmp_req); + ucp_request_release(tmp_req); + } + PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &preq->ompi.super); *rptr = MPI_REQUEST_NULL; - req->ompi.req_state = OMPI_REQUEST_INVALID; - PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &req->ompi.super); + return OMPI_SUCCESS; +} + +static int mca_pml_ucx_persistent_request_cancel(ompi_request_t *req, int flag) +{ + mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)req; + + if (preq->tmp_req != NULL) { + ucp_request_cancel(ompi_pml_ucx.ucp_worker, preq->tmp_req); + } return OMPI_SUCCESS; } static void mca_pml_ucx_persisternt_request_construct(mca_pml_ucx_persistent_request_t* req) { mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE, - mca_pml_ucx_persistent_request_free, NULL); + mca_pml_ucx_persistent_request_free, + mca_pml_ucx_persistent_request_cancel); + req->tmp_req = NULL; } static void mca_pml_ucx_persisternt_request_destruct(mca_pml_ucx_persistent_request_t* req) diff --git a/ompi/mca/pml/ucx/pml_ucx_request.h b/ompi/mca/pml/ucx/pml_ucx_request.h index dfd91f31e4..6d3fd0a067 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.h +++ b/ompi/mca/pml/ucx/pml_ucx_request.h @@ -89,6 +89,7 @@ enum { struct pml_ucx_persistent_request { ompi_request_t ompi; + ompi_request_t *tmp_req; unsigned flags; void *buffer; size_t count; From 7becc54d67d71068b809ffdffffef09df7648917 Mon Sep 17 00:00:00 2001 From: yosefe Date: Thu, 12 Nov 2015 09:57:19 +0200 Subject: [PATCH 2/2] pml_ucx: fix typo. --- ompi/mca/pml/ucx/pml_ucx_request.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c index 67711e23ce..5fb0d3fc1b 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.c +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -61,19 +61,19 @@ void mca_pml_ucx_recv_completion(void *request, ucs_status_t status, OPAL_THREAD_UNLOCK(&ompi_request_lock); } -static void mca_pml_ucx_persistent_requset_detach(mca_pml_ucx_persistent_request_t *preq, +static void mca_pml_ucx_persistent_request_detach(mca_pml_ucx_persistent_request_t *preq, ompi_request_t *tmp_req) { tmp_req->req_complete_cb_data = NULL; preq->tmp_req = NULL; } -void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq, +void mca_pml_ucx_persistent_request_complete(mca_pml_ucx_persistent_request_t *preq, ompi_request_t *tmp_req) { preq->ompi.req_status = tmp_req->req_status; ompi_request_complete(&preq->ompi, true); - mca_pml_ucx_persistent_requset_detach(preq, tmp_req); + mca_pml_ucx_persistent_request_detach(preq, tmp_req); mca_pml_ucx_request_reset(tmp_req); ucp_request_release(tmp_req); } @@ -87,7 +87,7 @@ static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req) preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data; if (preq != NULL) { PML_UCX_ASSERT(preq->tmp_req != NULL); - mca_pml_ucx_persistent_requset_complete(preq, tmp_req); + mca_pml_ucx_persistent_request_complete(preq, tmp_req); } OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -153,7 +153,7 @@ static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr) preq->ompi.req_state = OMPI_REQUEST_INVALID; if (tmp_req != NULL) { - mca_pml_ucx_persistent_requset_detach(preq, tmp_req); + mca_pml_ucx_persistent_request_detach(preq, tmp_req); ucp_request_release(tmp_req); } PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &preq->ompi.super);