This patch adds actual non-blocking sender-based message logging. This improves bandwidth. Still need to work on malloc/mmap storage to reach optimal bandwidth.
This commit was SVN r16172.
Этот коммит содержится в:
родитель
bc318b35e2
Коммит
d3b376a340
@ -135,7 +135,7 @@ int mca_vprotocol_pessimist_enable(bool enable) {
|
||||
else {
|
||||
vprotocol_pessimist_sender_based_finalize();
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
@ -25,8 +25,10 @@ typedef struct mca_vprotocol_pessimist_request_t {
|
||||
vprotocol_pessimist_clock_t reqid;
|
||||
/* ompi_request_t *sb_reqs[2]; */
|
||||
mca_vprotocol_pessimist_event_t *event;
|
||||
convertor_advance_fct_t conv_advance;
|
||||
uint32_t conv_flags;
|
||||
|
||||
uintptr_t sb_cursor;
|
||||
convertor_advance_fct_t sb_conv_advance;
|
||||
uint32_t sb_conv_flags;
|
||||
} mca_vprotocol_pessimist_request_t;
|
||||
typedef mca_vprotocol_pessimist_request_t mca_vprotocol_pessimist_recv_request_t;
|
||||
typedef mca_vprotocol_pessimist_request_t mca_vprotocol_pessimist_send_request_t;
|
||||
|
@ -16,6 +16,7 @@
|
||||
#if defined(HAVE_UNISTD_H)
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
#include "ompi/datatype/datatype_memcpy.h"
|
||||
#include <fcntl.h>
|
||||
|
||||
#define sb mca_vprotocol_pessimist.sender_based
|
||||
@ -28,7 +29,7 @@ int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size)
|
||||
sb.sb_conv_to_pessimist_offset = VPROTOCOL_SEND_REQ(NULL) -
|
||||
((uintptr_t) & pml_req.req_base.req_convertor -
|
||||
(uintptr_t) & pml_req);
|
||||
V_OUTPUT_VERBOSE(1, "conv_to_pessimist_offset: %p", sb.sb_conv_to_pessimist_offset);
|
||||
V_OUTPUT_VERBOSE(500, "pessimist: conv_to_pessimist_offset: %p", sb.sb_conv_to_pessimist_offset);
|
||||
#endif
|
||||
sb.sb_offset = 0;
|
||||
sb.sb_length = size;
|
||||
@ -97,7 +98,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
if(-1 == lseek(sb.sb_fd, sb.sb_offset + sb.sb_length, SEEK_SET))
|
||||
{
|
||||
V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: lseek: %s",
|
||||
з strerror(errno));
|
||||
strerror(errno));
|
||||
close(sb.sb_fd);
|
||||
ompi_mpi_abort(MPI_COMM_NULL, MPI_ERR_NO_SPACE, false);
|
||||
}
|
||||
@ -131,14 +132,23 @@ uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t* pC
|
||||
uint32_t* out_size,
|
||||
size_t* max_data) {
|
||||
int ret;
|
||||
int i;
|
||||
size_t pending_length;
|
||||
mca_vprotocol_pessimist_send_request_t *preq;
|
||||
|
||||
preq = VPESSIMIST_CONV_REQ(pConvertor);
|
||||
V_OUTPUT_VERBOSE(1, "pessimist:\tsb\tadvce\t %p (+%p) %p\tfadvce %p\tout %u\tmax %lu", pConvertor, mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset, preq, preq->conv_advance, *out_size, *max_data);
|
||||
pConvertor->flags = preq->conv_flags;
|
||||
pConvertor->fAdvance = preq->conv_advance;
|
||||
pConvertor->flags = preq->sb_conv_flags;
|
||||
pConvertor->fAdvance = preq->sb_conv_advance;
|
||||
ret = ompi_convertor_pack(pConvertor, iov, out_size, max_data);
|
||||
V_OUTPUT_VERBOSE(1, "pessimist:\tsb\tadvce\t%p\tout %u\tmax %lu", preq, *out_size, *max_data);
|
||||
V_OUTPUT_VERBOSE(39, "pessimist:\tsb\tpack\t%lu", *max_data);
|
||||
|
||||
for(i = 0, pending_length = *max_data; pending_length > 0; i++) {
|
||||
assert(i < *out_size);
|
||||
MEMCPY(preq->sb_cursor, iov[i].iov_base, iov[i].iov_len);
|
||||
pending_length -= iov[i].iov_len;
|
||||
preq->sb_cursor += iov[i].iov_len;
|
||||
}
|
||||
assert(pending_length == 0);
|
||||
|
||||
pConvertor->flags &= ~CONVERTOR_NO_OP;
|
||||
pConvertor->fAdvance = vprotocol_pessimist_sender_based_convertor_advance;
|
||||
|
@ -100,8 +100,12 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
*/
|
||||
#define VPROTOCOL_PESSIMIST_SENDER_BASED_FLUSH(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
* Convertor pack (blocking) method (good latency, bad bandwidth)
|
||||
*/
|
||||
#if defined(SB_USE_PACK_METHOD)
|
||||
/* Convertor pack (blocking) method */
|
||||
#define __SENDER_BASED_METHOD_COPY(req) do { \
|
||||
if(0 != req->req_bytes_packed) { \
|
||||
ompi_convertor_t conv; \
|
||||
@ -121,8 +125,41 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
* Convertor replacement (non blocking) method (under testing)
|
||||
*/
|
||||
#elif defined(SB_USE_CONVERTOR_METHOD)
|
||||
uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t*,
|
||||
struct iovec*,
|
||||
uint32_t*,
|
||||
size_t*);
|
||||
|
||||
#define __SENDER_BASED_METHOD_COPY(REQ) do { \
|
||||
ompi_convertor_t *pConv; \
|
||||
mca_vprotocol_pessimist_send_request_t *preq; \
|
||||
\
|
||||
pConv = & (REQ)->req_base.req_convertor; \
|
||||
preq = VPESSIMIST_SEND_REQ(REQ); \
|
||||
preq->sb_cursor = mca_vprotocol_pessimist.sender_based.sb_cursor; \
|
||||
preq->sb_conv_flags = pConv->flags; \
|
||||
preq->sb_conv_advance = pConv->fAdvance; \
|
||||
\
|
||||
pConv->flags &= ~CONVERTOR_NO_OP; \
|
||||
pConv->fAdvance = vprotocol_pessimist_sender_based_convertor_advance; \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
#define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
|
||||
(mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset + \
|
||||
(uintptr_t) (CONV)))
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
* iRecv/Send on SELF pack method (bad latency, no improvement on bandwidth)
|
||||
*/
|
||||
#elif defined(SB_USE_SELFCOMM_METHOD)
|
||||
/* iRecv/Send on SELF pack method */
|
||||
#define __SENDER_BASED_METHOD_COPY(req) do { \
|
||||
mca_pml_v.host_pml.pml_irecv( \
|
||||
mca_vprotocol_pessimist.sender_based.sb_cursor, \
|
||||
@ -145,32 +182,6 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
#elif defined(SB_USE_CONVERTOR_METHOD)
|
||||
/* Convertor replacement (non blocking) method */
|
||||
uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t*,
|
||||
struct iovec*,
|
||||
uint32_t*,
|
||||
size_t*);
|
||||
|
||||
#define __SENDER_BASED_METHOD_COPY(REQ) do { \
|
||||
ompi_convertor_t *pConv; \
|
||||
mca_vprotocol_pessimist_send_request_t *pReq; \
|
||||
\
|
||||
pConv = & (REQ)->req_base.req_convertor; \
|
||||
pReq = VPESSIMIST_SEND_REQ(REQ); \
|
||||
pReq->conv_flags = pConv->flags; \
|
||||
pReq->conv_advance = pConv->fAdvance; \
|
||||
V_OUTPUT_VERBOSE(1, "req %p preq %p conv %p advance %p", (REQ), pReq, pConv, pConv->fAdvance); \
|
||||
pConv->flags &= ~CONVERTOR_NO_OP; \
|
||||
pConv->fAdvance = vprotocol_pessimist_sender_based_convertor_advance; \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
#define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
|
||||
(mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset + \
|
||||
(uintptr_t) (CONV)))
|
||||
|
||||
#endif /* SB_USE_*_METHOD */
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user