From d93b67257b8eb33ddb2bacb98e2ca614e273299e Mon Sep 17 00:00:00 2001 From: Alina Sklarevich Date: Thu, 13 Apr 2017 18:11:55 +0300 Subject: [PATCH] PML UCX: handle a synchronous send. MCA_PML_BASE_SEND_SYNCHRONOUS Signed-off-by: Alina Sklarevich --- ompi/mca/pml/ucx/pml_ucx.c | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index 9b38008dac..26da666de0 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -661,6 +661,19 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count, return OMPI_SUCCESS; } +static ompi_request_t* mca_pml_ucx_tag_send_nb(ucp_ep_h ep, const void *buf, + size_t count, ucp_datatype_t datatype, + ucp_tag_t tag, mca_pml_base_send_mode_t mode) +{ + if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) { + return (ompi_request_t*)ucp_tag_send_sync_nb(ep, buf, count, datatype, + tag, mca_pml_ucx_send_completion); + } else { + return (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, datatype, + tag, mca_pml_ucx_send_completion); + } +} + int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, int dst, int tag, mca_pml_base_send_mode_t mode, struct ompi_communicator_t* comm, @@ -674,8 +687,6 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, mode == MCA_PML_BASE_SEND_BUFFERED ? "b" : "", (void*)request) - /* TODO special care to sync/buffered send */ - ep = mca_pml_ucx_get_ep(comm, dst); if (OPAL_UNLIKELY(NULL == ep)) { PML_UCX_ERROR("Failed to get ep for rank %d", dst); @@ -689,10 +700,9 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, PML_UCX_MAKE_SEND_TAG(tag, comm)); } - req = (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, - mca_pml_ucx_get_datatype(datatype), - PML_UCX_MAKE_SEND_TAG(tag, comm), - mca_pml_ucx_send_completion); + req = mca_pml_ucx_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype), + PML_UCX_MAKE_SEND_TAG(tag, comm), mode); + if (req == NULL) { PML_UCX_VERBOSE(8, "returning completed request"); *request = &ompi_pml_ucx.completed_send_req; @@ -723,16 +733,15 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i return OMPI_ERROR; } - /* Special care to sync/buffered send */ + /* Special care to buffered send */ if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) { return mca_pml_ucx_bsend(ep, buf, count, datatype, PML_UCX_MAKE_SEND_TAG(tag, comm)); } - req = (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, - mca_pml_ucx_get_datatype(datatype), - PML_UCX_MAKE_SEND_TAG(tag, comm), - mca_pml_ucx_send_completion); + req = mca_pml_ucx_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype), + PML_UCX_MAKE_SEND_TAG(tag, comm), mode); + if (OPAL_LIKELY(req == NULL)) { return OMPI_SUCCESS; } else if (!UCS_PTR_IS_ERR(req)) { @@ -915,6 +924,13 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests) } /* pretend that we got immediate completion */ tmp_req = NULL; + } else if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == preq->send.mode)) { + PML_UCX_VERBOSE(8, "start send sync request %p", (void*)preq); + tmp_req = (ompi_request_t*)ucp_tag_send_sync_nb(preq->send.ep, + preq->buffer, + preq->count, preq->datatype, + preq->tag, + mca_pml_ucx_psend_completion); } else { PML_UCX_VERBOSE(8, "start send request %p", (void*)preq); tmp_req = (ompi_request_t*)ucp_tag_send_nb(preq->send.ep, preq->buffer,