From f897c8a1e06cb3797bec2a82e3cb43e62613c3c3 Mon Sep 17 00:00:00 2001 From: Vasily Filipov Date: Wed, 27 Feb 2013 13:21:30 +0000 Subject: [PATCH] MTL MXM: STREAM supporting for isend and irecv. This commit was SVN r28122. --- ompi/mca/mtl/mxm/mtl_mxm_recv.c | 72 ++++++++++++++++++++++++++++----- ompi/mca/mtl/mxm/mtl_mxm_send.c | 59 ++++++++++++++++----------- 2 files changed, 96 insertions(+), 35 deletions(-) diff --git a/ompi/mca/mtl/mxm/mtl_mxm_recv.c b/ompi/mca/mtl/mxm/mtl_mxm_recv.c index d1244887f4..1b348896a7 100644 --- a/ompi/mca/mtl/mxm/mtl_mxm_recv.c +++ b/ompi/mca/mtl/mxm/mtl_mxm_recv.c @@ -28,12 +28,66 @@ static void ompi_mtl_mxm_recv_completion_cb(void *context) ompi_req->req_status.MPI_SOURCE = mxm_recv_req->completion.sender_imm; ompi_req->req_status._ucount = mxm_recv_req->completion.actual_len; - /* Copy data and free the buffer */ - ompi_mtl_datatype_unpack(req->convertor, req->buf, - mxm_recv_req->completion.actual_len); req->super.completion_callback(&req->super); } +static size_t ompi_mtl_mxm_stream_unpack(void *buffer, size_t length, + size_t offset, void *context) +{ + struct iovec iov; + uint32_t iov_count = 1; + + mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *) context; + opal_convertor_t *convertor = mtl_mxm_request->convertor; + + iov.iov_len = length; + iov.iov_base = buffer; + + opal_convertor_set_position(convertor, &offset); + opal_convertor_unpack(convertor, &iov, &iov_count, &length); + + return length; +} + +static inline __opal_attribute_always_inline__ int + ompi_mtl_mxm_choose_recv_datatype(mca_mtl_mxm_request_t *mtl_mxm_request) +{ + void **buffer = &mtl_mxm_request->buf; + size_t *buffer_len = &mtl_mxm_request->length; + + mxm_recv_req_t *mxm_recv_req = &mtl_mxm_request->mxm.recv; + opal_convertor_t *convertor = mtl_mxm_request->convertor; + + opal_convertor_get_packed_size(convertor, buffer_len); + + if (0 == *buffer_len) { + *buffer = NULL; + *buffer_len = 0; + + mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER; + + return OMPI_SUCCESS; + } + + if (opal_convertor_need_buffers(convertor)) { + mxm_recv_req->base.data_type = MXM_REQ_DATA_STREAM; + mxm_recv_req->base.data.stream.length = *buffer_len; + mxm_recv_req->base.data.stream.cb = ompi_mtl_mxm_stream_unpack; + + return OMPI_SUCCESS; + } + + mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER; + + *buffer = convertor->pBaseBuf + + convertor->use_desc->desc[convertor->use_desc->used].end_loop.first_elem_disp; + + mxm_recv_req->base.data.buffer.ptr = *buffer; + mxm_recv_req->base.data.buffer.length = *buffer_len; + + return OMPI_SUCCESS; +} + static inline __opal_attribute_always_inline__ int ompi_mtl_mxm_recv_init(mca_mtl_mxm_request_t *mtl_mxm_request, opal_convertor_t *convertor, @@ -42,10 +96,7 @@ static inline __opal_attribute_always_inline__ int int ret; mtl_mxm_request->convertor = convertor; - ret = ompi_mtl_datatype_recv_buf(convertor, - &mtl_mxm_request->buf, - &mtl_mxm_request->length, - &mtl_mxm_request->free_after); + ret = ompi_mtl_mxm_choose_recv_datatype(mtl_mxm_request); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } @@ -59,14 +110,13 @@ static inline __opal_attribute_always_inline__ int #if MXM_API < MXM_VERSION(2,0) mxm_recv_req->base.flags = 0; #endif - mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER; - mxm_recv_req->base.data.buffer.ptr = mtl_mxm_request->buf; - mxm_recv_req->base.data.buffer.length = mtl_mxm_request->length; + #if MXM_API < MXM_VERSION(1,5) mxm_recv_req->base.data.buffer.mkey = MXM_MKEY_NONE; #else mxm_recv_req->base.data.buffer.memh = MXM_INVALID_MEM_HANDLE; #endif + mxm_recv_req->base.context = mtl_mxm_request; mxm_recv_req->base.completed_cb = ompi_mtl_mxm_recv_completion_cb; @@ -81,7 +131,7 @@ int ompi_mtl_mxm_irecv(struct mca_mtl_base_module_t* mtl, int ret; mxm_error_t err; mxm_recv_req_t *mxm_recv_req; - mca_mtl_mxm_request_t * mtl_mxm_request; + mca_mtl_mxm_request_t *mtl_mxm_request; mtl_mxm_request = (mca_mtl_mxm_request_t*) mtl_request; mxm_recv_req = &mtl_mxm_request->mxm.recv; diff --git a/ompi/mca/mtl/mxm/mtl_mxm_send.c b/ompi/mca/mtl/mxm/mtl_mxm_send.c index 291ce7e93d..84a7a44261 100644 --- a/ompi/mca/mtl/mxm/mtl_mxm_send.c +++ b/ompi/mca/mtl/mxm/mtl_mxm_send.c @@ -15,13 +15,13 @@ #include "mtl_mxm_request.h" #include "ompi/mca/mtl/base/mtl_base_datatype.h" -static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offset, void *context) +static inline __opal_attribute_always_inline__ + size_t ompi_mtl_mxm_stream_pack(opal_convertor_t *convertor, void *buffer, + size_t length, size_t offset) { struct iovec iov; uint32_t iov_count = 1; - opal_convertor_t *convertor = (opal_convertor_t *) context; - iov.iov_len = length; iov.iov_base = buffer; @@ -31,9 +31,25 @@ static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offse return length; } +static size_t ompi_mtl_mxm_stream_isend(void *buffer, size_t length, size_t offset, void *context) +{ + mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *) context; + opal_convertor_t *convertor = mtl_mxm_request->convertor; + + return ompi_mtl_mxm_stream_pack(convertor, buffer, length, offset); +} + +static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offset, void *context) +{ + opal_convertor_t *convertor = (opal_convertor_t *) context; + + return ompi_mtl_mxm_stream_pack(convertor, buffer, length, offset); +} + static inline __opal_attribute_always_inline__ int ompi_mtl_mxm_choose_send_datatype(mxm_send_req_t *mxm_send_req, - opal_convertor_t *convertor) + opal_convertor_t *convertor, + mxm_stream_cb_t stream_cb) { struct iovec iov; uint32_t iov_count = 1; @@ -49,10 +65,9 @@ static inline __opal_attribute_always_inline__ int } if (opal_convertor_need_buffers(convertor)) { - mxm_send_req->base.context = convertor; mxm_send_req->base.data_type = MXM_REQ_DATA_STREAM; mxm_send_req->base.data.stream.length = *buffer_len; - mxm_send_req->base.data.stream.cb = ompi_mtl_mxm_stream_send; + mxm_send_req->base.data.stream.cb = stream_cb; return OMPI_SUCCESS; } @@ -72,10 +87,6 @@ static void ompi_mtl_mxm_send_completion_cb(void *context) { mca_mtl_mxm_request_t *mtl_mxm_request = context; - if (mtl_mxm_request->free_after) { - free(mtl_mxm_request->buf); - } - ompi_mtl_mxm_to_mpi_status(mtl_mxm_request->mxm.base.error, &mtl_mxm_request->super.ompi_req->req_status); mtl_mxm_request->super.completion_callback(&mtl_mxm_request->super); @@ -100,10 +111,11 @@ int ompi_mtl_mxm_send(struct mca_mtl_base_module_t* mtl, mxm_send_req.base.state = MXM_REQ_NEW; mxm_send_req.base.mq = ompi_mtl_mxm_mq_lookup(comm); mxm_send_req.base.conn = ompi_mtl_mxm_conn_lookup(comm, dest); - mxm_send_req.base.context = NULL; + mxm_send_req.base.context = convertor; mxm_send_req.base.completed_cb = NULL; - ret = ompi_mtl_mxm_choose_send_datatype(&mxm_send_req, convertor); + ret = ompi_mtl_mxm_choose_send_datatype(&mxm_send_req, convertor, + ompi_mtl_mxm_stream_send); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; } @@ -153,7 +165,7 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl, mca_pml_base_send_mode_t mode, bool blocking, mca_mtl_request_t * mtl_request) { - mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *)mtl_request; + mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *) mtl_request; mxm_send_req_t *mxm_send_req; mxm_error_t err; int ret; @@ -161,13 +173,6 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl, assert(mtl == &ompi_mtl_mxm.super); mtl_mxm_request->convertor = convertor; - ret = ompi_mtl_datatype_pack(mtl_mxm_request->convertor, - &mtl_mxm_request->buf, - &mtl_mxm_request->length, - &mtl_mxm_request->free_after); - if (OMPI_SUCCESS != ret) { - return ret; - } mxm_send_req = &mtl_mxm_request->mxm.send; #if MXM_API >= MXM_VERSION(2,0) @@ -178,9 +183,16 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl, mxm_send_req->base.state = MXM_REQ_NEW; mxm_send_req->base.mq = ompi_mtl_mxm_mq_lookup(comm); mxm_send_req->base.conn = ompi_mtl_mxm_conn_lookup(comm, dest); - mxm_send_req->base.data_type = MXM_REQ_DATA_BUFFER; - mxm_send_req->base.data.buffer.ptr = mtl_mxm_request->buf; - mxm_send_req->base.data.buffer.length = mtl_mxm_request->length; + + ret = ompi_mtl_mxm_choose_send_datatype(mxm_send_req, convertor, + ompi_mtl_mxm_stream_isend); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } + + mtl_mxm_request->buf = mxm_send_req->base.data.buffer.ptr; + mtl_mxm_request->length = mxm_send_req->base.data.buffer.length; + #if MXM_API < MXM_VERSION(1,5) mxm_send_req->base.data.buffer.mkey = MXM_MKEY_NONE; #else @@ -194,7 +206,6 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl, mxm_send_req->opcode = MXM_REQ_OP_SEND; if (mode == MCA_PML_BASE_SEND_SYNCHRONOUS) { mxm_send_req->base.flags |= MXM_REQ_FLAG_SEND_SYNC; - } else { } #else mxm_send_req->flags = 0;