MTL MXM: STREAM supporting for isend and irecv.
This commit was SVN r28122.
Этот коммит содержится в:
родитель
c29dcd02f1
Коммит
f897c8a1e0
@ -28,12 +28,66 @@ static void ompi_mtl_mxm_recv_completion_cb(void *context)
|
|||||||
ompi_req->req_status.MPI_SOURCE = mxm_recv_req->completion.sender_imm;
|
ompi_req->req_status.MPI_SOURCE = mxm_recv_req->completion.sender_imm;
|
||||||
ompi_req->req_status._ucount = mxm_recv_req->completion.actual_len;
|
ompi_req->req_status._ucount = mxm_recv_req->completion.actual_len;
|
||||||
|
|
||||||
/* Copy data and free the buffer */
|
|
||||||
ompi_mtl_datatype_unpack(req->convertor, req->buf,
|
|
||||||
mxm_recv_req->completion.actual_len);
|
|
||||||
req->super.completion_callback(&req->super);
|
req->super.completion_callback(&req->super);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static size_t ompi_mtl_mxm_stream_unpack(void *buffer, size_t length,
|
||||||
|
size_t offset, void *context)
|
||||||
|
{
|
||||||
|
struct iovec iov;
|
||||||
|
uint32_t iov_count = 1;
|
||||||
|
|
||||||
|
mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *) context;
|
||||||
|
opal_convertor_t *convertor = mtl_mxm_request->convertor;
|
||||||
|
|
||||||
|
iov.iov_len = length;
|
||||||
|
iov.iov_base = buffer;
|
||||||
|
|
||||||
|
opal_convertor_set_position(convertor, &offset);
|
||||||
|
opal_convertor_unpack(convertor, &iov, &iov_count, &length);
|
||||||
|
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline __opal_attribute_always_inline__ int
|
||||||
|
ompi_mtl_mxm_choose_recv_datatype(mca_mtl_mxm_request_t *mtl_mxm_request)
|
||||||
|
{
|
||||||
|
void **buffer = &mtl_mxm_request->buf;
|
||||||
|
size_t *buffer_len = &mtl_mxm_request->length;
|
||||||
|
|
||||||
|
mxm_recv_req_t *mxm_recv_req = &mtl_mxm_request->mxm.recv;
|
||||||
|
opal_convertor_t *convertor = mtl_mxm_request->convertor;
|
||||||
|
|
||||||
|
opal_convertor_get_packed_size(convertor, buffer_len);
|
||||||
|
|
||||||
|
if (0 == *buffer_len) {
|
||||||
|
*buffer = NULL;
|
||||||
|
*buffer_len = 0;
|
||||||
|
|
||||||
|
mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER;
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opal_convertor_need_buffers(convertor)) {
|
||||||
|
mxm_recv_req->base.data_type = MXM_REQ_DATA_STREAM;
|
||||||
|
mxm_recv_req->base.data.stream.length = *buffer_len;
|
||||||
|
mxm_recv_req->base.data.stream.cb = ompi_mtl_mxm_stream_unpack;
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER;
|
||||||
|
|
||||||
|
*buffer = convertor->pBaseBuf +
|
||||||
|
convertor->use_desc->desc[convertor->use_desc->used].end_loop.first_elem_disp;
|
||||||
|
|
||||||
|
mxm_recv_req->base.data.buffer.ptr = *buffer;
|
||||||
|
mxm_recv_req->base.data.buffer.length = *buffer_len;
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static inline __opal_attribute_always_inline__ int
|
static inline __opal_attribute_always_inline__ int
|
||||||
ompi_mtl_mxm_recv_init(mca_mtl_mxm_request_t *mtl_mxm_request,
|
ompi_mtl_mxm_recv_init(mca_mtl_mxm_request_t *mtl_mxm_request,
|
||||||
opal_convertor_t *convertor,
|
opal_convertor_t *convertor,
|
||||||
@ -42,10 +96,7 @@ static inline __opal_attribute_always_inline__ int
|
|||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
mtl_mxm_request->convertor = convertor;
|
mtl_mxm_request->convertor = convertor;
|
||||||
ret = ompi_mtl_datatype_recv_buf(convertor,
|
ret = ompi_mtl_mxm_choose_recv_datatype(mtl_mxm_request);
|
||||||
&mtl_mxm_request->buf,
|
|
||||||
&mtl_mxm_request->length,
|
|
||||||
&mtl_mxm_request->free_after);
|
|
||||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -59,14 +110,13 @@ static inline __opal_attribute_always_inline__ int
|
|||||||
#if MXM_API < MXM_VERSION(2,0)
|
#if MXM_API < MXM_VERSION(2,0)
|
||||||
mxm_recv_req->base.flags = 0;
|
mxm_recv_req->base.flags = 0;
|
||||||
#endif
|
#endif
|
||||||
mxm_recv_req->base.data_type = MXM_REQ_DATA_BUFFER;
|
|
||||||
mxm_recv_req->base.data.buffer.ptr = mtl_mxm_request->buf;
|
|
||||||
mxm_recv_req->base.data.buffer.length = mtl_mxm_request->length;
|
|
||||||
#if MXM_API < MXM_VERSION(1,5)
|
#if MXM_API < MXM_VERSION(1,5)
|
||||||
mxm_recv_req->base.data.buffer.mkey = MXM_MKEY_NONE;
|
mxm_recv_req->base.data.buffer.mkey = MXM_MKEY_NONE;
|
||||||
#else
|
#else
|
||||||
mxm_recv_req->base.data.buffer.memh = MXM_INVALID_MEM_HANDLE;
|
mxm_recv_req->base.data.buffer.memh = MXM_INVALID_MEM_HANDLE;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
mxm_recv_req->base.context = mtl_mxm_request;
|
mxm_recv_req->base.context = mtl_mxm_request;
|
||||||
mxm_recv_req->base.completed_cb = ompi_mtl_mxm_recv_completion_cb;
|
mxm_recv_req->base.completed_cb = ompi_mtl_mxm_recv_completion_cb;
|
||||||
|
|
||||||
|
@ -15,13 +15,13 @@
|
|||||||
#include "mtl_mxm_request.h"
|
#include "mtl_mxm_request.h"
|
||||||
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
|
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
|
||||||
|
|
||||||
static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offset, void *context)
|
static inline __opal_attribute_always_inline__
|
||||||
|
size_t ompi_mtl_mxm_stream_pack(opal_convertor_t *convertor, void *buffer,
|
||||||
|
size_t length, size_t offset)
|
||||||
{
|
{
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
uint32_t iov_count = 1;
|
uint32_t iov_count = 1;
|
||||||
|
|
||||||
opal_convertor_t *convertor = (opal_convertor_t *) context;
|
|
||||||
|
|
||||||
iov.iov_len = length;
|
iov.iov_len = length;
|
||||||
iov.iov_base = buffer;
|
iov.iov_base = buffer;
|
||||||
|
|
||||||
@ -31,9 +31,25 @@ static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offse
|
|||||||
return length;
|
return length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static size_t ompi_mtl_mxm_stream_isend(void *buffer, size_t length, size_t offset, void *context)
|
||||||
|
{
|
||||||
|
mca_mtl_mxm_request_t *mtl_mxm_request = (mca_mtl_mxm_request_t *) context;
|
||||||
|
opal_convertor_t *convertor = mtl_mxm_request->convertor;
|
||||||
|
|
||||||
|
return ompi_mtl_mxm_stream_pack(convertor, buffer, length, offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
static size_t ompi_mtl_mxm_stream_send(void *buffer, size_t length, size_t offset, void *context)
|
||||||
|
{
|
||||||
|
opal_convertor_t *convertor = (opal_convertor_t *) context;
|
||||||
|
|
||||||
|
return ompi_mtl_mxm_stream_pack(convertor, buffer, length, offset);
|
||||||
|
}
|
||||||
|
|
||||||
static inline __opal_attribute_always_inline__ int
|
static inline __opal_attribute_always_inline__ int
|
||||||
ompi_mtl_mxm_choose_send_datatype(mxm_send_req_t *mxm_send_req,
|
ompi_mtl_mxm_choose_send_datatype(mxm_send_req_t *mxm_send_req,
|
||||||
opal_convertor_t *convertor)
|
opal_convertor_t *convertor,
|
||||||
|
mxm_stream_cb_t stream_cb)
|
||||||
{
|
{
|
||||||
struct iovec iov;
|
struct iovec iov;
|
||||||
uint32_t iov_count = 1;
|
uint32_t iov_count = 1;
|
||||||
@ -49,10 +65,9 @@ static inline __opal_attribute_always_inline__ int
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (opal_convertor_need_buffers(convertor)) {
|
if (opal_convertor_need_buffers(convertor)) {
|
||||||
mxm_send_req->base.context = convertor;
|
|
||||||
mxm_send_req->base.data_type = MXM_REQ_DATA_STREAM;
|
mxm_send_req->base.data_type = MXM_REQ_DATA_STREAM;
|
||||||
mxm_send_req->base.data.stream.length = *buffer_len;
|
mxm_send_req->base.data.stream.length = *buffer_len;
|
||||||
mxm_send_req->base.data.stream.cb = ompi_mtl_mxm_stream_send;
|
mxm_send_req->base.data.stream.cb = stream_cb;
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -72,10 +87,6 @@ static void ompi_mtl_mxm_send_completion_cb(void *context)
|
|||||||
{
|
{
|
||||||
mca_mtl_mxm_request_t *mtl_mxm_request = context;
|
mca_mtl_mxm_request_t *mtl_mxm_request = context;
|
||||||
|
|
||||||
if (mtl_mxm_request->free_after) {
|
|
||||||
free(mtl_mxm_request->buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
ompi_mtl_mxm_to_mpi_status(mtl_mxm_request->mxm.base.error,
|
ompi_mtl_mxm_to_mpi_status(mtl_mxm_request->mxm.base.error,
|
||||||
&mtl_mxm_request->super.ompi_req->req_status);
|
&mtl_mxm_request->super.ompi_req->req_status);
|
||||||
mtl_mxm_request->super.completion_callback(&mtl_mxm_request->super);
|
mtl_mxm_request->super.completion_callback(&mtl_mxm_request->super);
|
||||||
@ -100,10 +111,11 @@ int ompi_mtl_mxm_send(struct mca_mtl_base_module_t* mtl,
|
|||||||
mxm_send_req.base.state = MXM_REQ_NEW;
|
mxm_send_req.base.state = MXM_REQ_NEW;
|
||||||
mxm_send_req.base.mq = ompi_mtl_mxm_mq_lookup(comm);
|
mxm_send_req.base.mq = ompi_mtl_mxm_mq_lookup(comm);
|
||||||
mxm_send_req.base.conn = ompi_mtl_mxm_conn_lookup(comm, dest);
|
mxm_send_req.base.conn = ompi_mtl_mxm_conn_lookup(comm, dest);
|
||||||
mxm_send_req.base.context = NULL;
|
mxm_send_req.base.context = convertor;
|
||||||
mxm_send_req.base.completed_cb = NULL;
|
mxm_send_req.base.completed_cb = NULL;
|
||||||
|
|
||||||
ret = ompi_mtl_mxm_choose_send_datatype(&mxm_send_req, convertor);
|
ret = ompi_mtl_mxm_choose_send_datatype(&mxm_send_req, convertor,
|
||||||
|
ompi_mtl_mxm_stream_send);
|
||||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -161,13 +173,6 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl,
|
|||||||
assert(mtl == &ompi_mtl_mxm.super);
|
assert(mtl == &ompi_mtl_mxm.super);
|
||||||
|
|
||||||
mtl_mxm_request->convertor = convertor;
|
mtl_mxm_request->convertor = convertor;
|
||||||
ret = ompi_mtl_datatype_pack(mtl_mxm_request->convertor,
|
|
||||||
&mtl_mxm_request->buf,
|
|
||||||
&mtl_mxm_request->length,
|
|
||||||
&mtl_mxm_request->free_after);
|
|
||||||
if (OMPI_SUCCESS != ret) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
mxm_send_req = &mtl_mxm_request->mxm.send;
|
mxm_send_req = &mtl_mxm_request->mxm.send;
|
||||||
#if MXM_API >= MXM_VERSION(2,0)
|
#if MXM_API >= MXM_VERSION(2,0)
|
||||||
@ -178,9 +183,16 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl,
|
|||||||
mxm_send_req->base.state = MXM_REQ_NEW;
|
mxm_send_req->base.state = MXM_REQ_NEW;
|
||||||
mxm_send_req->base.mq = ompi_mtl_mxm_mq_lookup(comm);
|
mxm_send_req->base.mq = ompi_mtl_mxm_mq_lookup(comm);
|
||||||
mxm_send_req->base.conn = ompi_mtl_mxm_conn_lookup(comm, dest);
|
mxm_send_req->base.conn = ompi_mtl_mxm_conn_lookup(comm, dest);
|
||||||
mxm_send_req->base.data_type = MXM_REQ_DATA_BUFFER;
|
|
||||||
mxm_send_req->base.data.buffer.ptr = mtl_mxm_request->buf;
|
ret = ompi_mtl_mxm_choose_send_datatype(mxm_send_req, convertor,
|
||||||
mxm_send_req->base.data.buffer.length = mtl_mxm_request->length;
|
ompi_mtl_mxm_stream_isend);
|
||||||
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
mtl_mxm_request->buf = mxm_send_req->base.data.buffer.ptr;
|
||||||
|
mtl_mxm_request->length = mxm_send_req->base.data.buffer.length;
|
||||||
|
|
||||||
#if MXM_API < MXM_VERSION(1,5)
|
#if MXM_API < MXM_VERSION(1,5)
|
||||||
mxm_send_req->base.data.buffer.mkey = MXM_MKEY_NONE;
|
mxm_send_req->base.data.buffer.mkey = MXM_MKEY_NONE;
|
||||||
#else
|
#else
|
||||||
@ -194,7 +206,6 @@ int ompi_mtl_mxm_isend(struct mca_mtl_base_module_t* mtl,
|
|||||||
mxm_send_req->opcode = MXM_REQ_OP_SEND;
|
mxm_send_req->opcode = MXM_REQ_OP_SEND;
|
||||||
if (mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
if (mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||||
mxm_send_req->base.flags |= MXM_REQ_FLAG_SEND_SYNC;
|
mxm_send_req->base.flags |= MXM_REQ_FLAG_SEND_SYNC;
|
||||||
} else {
|
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
mxm_send_req->flags = 0;
|
mxm_send_req->flags = 0;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user