New improved version of sender-based. Under dev but a new framework for expressing various methods have been added.
This commit was SVN r16159.
Этот коммит содержится в:
родитель
6bf121e17b
Коммит
bbac6e650a
@ -28,7 +28,9 @@ int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size)
|
||||
sb.sb_pagesize = getpagesize();
|
||||
sb.sb_cursor = sb.sb_addr = (uintptr_t) NULL;
|
||||
sb.sb_available = 0;
|
||||
#ifdef SB_USE_SELFCOMM_METHOD
|
||||
sb.sb_comm = MPI_COMM_NULL;
|
||||
#endif
|
||||
|
||||
sprintf(path, "%s"OPAL_PATH_SEP"%s", orte_process_info.proc_session_dir,
|
||||
mmapfile);
|
||||
@ -46,13 +48,8 @@ void vprotocol_pessimist_sender_based_finalize(void)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if(sb.sb_comm != MPI_COMM_NULL)
|
||||
if(((uintptr_t) NULL) != sb.sb_addr)
|
||||
{
|
||||
/* TODO: check this has already been freed by MPI_Finalize
|
||||
* ret = ompi_comm_free(&sb.sb_comm);
|
||||
* if(MPI_SUCCESS != ret)
|
||||
* opal_output(0, "pml_v: protocol_pessimist: sender_based_finalize: ompi_comm_free failed (%d)", ret);
|
||||
*/
|
||||
ret = munmap((void *) sb.sb_addr, sb.sb_length);
|
||||
if(-1 == ret)
|
||||
V_OUTPUT_ERR("pml_v: protocol_pessimsit: sender_based_finalize: munmap (%p): %s",
|
||||
@ -70,11 +67,13 @@ void vprotocol_pessimist_sender_based_finalize(void)
|
||||
*/
|
||||
void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
{
|
||||
if(sb.sb_comm == MPI_COMM_NULL)
|
||||
ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1);
|
||||
else
|
||||
if(((uintptr_t) NULL) != sb.sb_addr)
|
||||
munmap((void *) sb.sb_addr, sb.sb_length);
|
||||
|
||||
#if SB_USE_SELFCOMM_METHOD
|
||||
else
|
||||
ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1);
|
||||
#endif
|
||||
|
||||
/* Take care of alignement of sb_offset */
|
||||
sb.sb_offset += sb.sb_cursor - sb.sb_addr;
|
||||
sb.sb_cursor = sb.sb_offset % sb.sb_pagesize;
|
||||
@ -91,7 +90,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
if(-1 == lseek(sb.sb_fd, sb.sb_offset + sb.sb_length, SEEK_SET))
|
||||
{
|
||||
V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: lseek: %s",
|
||||
strerror(errno));
|
||||
з strerror(errno));
|
||||
close(sb.sb_fd);
|
||||
ompi_mpi_abort(MPI_COMM_NULL, MPI_ERR_NO_SPACE, false);
|
||||
}
|
||||
@ -105,8 +104,9 @@ void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
sb.sb_addr = (uintptr_t) mmap((void *) sb.sb_addr, sb.sb_length,
|
||||
PROT_WRITE | PROT_READ, MAP_SHARED, sb.sb_fd,
|
||||
sb.sb_offset);
|
||||
#endif
|
||||
#else
|
||||
sb.sb_addr = (uintptr_t) malloc(sb.sb_length);
|
||||
#endif
|
||||
if(((uintptr_t) -1) == sb.sb_addr)
|
||||
{
|
||||
V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: mmap: %s",
|
||||
@ -118,4 +118,21 @@ void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
V_OUTPUT_VERBOSE(30, "pessimist:\tsb\tgrow\toffset %llu\tlength %llu\tbase %p\tcursor %p", (unsigned long long) sb.sb_offset, (unsigned long long) sb.sb_length, (void *) sb.sb_addr, (void *) sb.sb_cursor);
|
||||
}
|
||||
|
||||
#ifdef SB_USE_CONVERTOR_METHOD
|
||||
uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t* pConvertor,
|
||||
struct iovec* iov,
|
||||
uint32_t* out_size,
|
||||
size_t* max_data) {
|
||||
int ret;
|
||||
vprotocol_pessimist_send_request_t * preq;
|
||||
|
||||
preq = VPESSIMIST_SEND_REQ(pConvertor);
|
||||
pConvertor->flags = preq->conv_flags;
|
||||
pConvertor->f_advance = preq->conv_advance;
|
||||
ret = pConvertor->f_advance(pConvertor, iov, out_size, max_data);
|
||||
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
|
||||
#undef sb
|
||||
|
@ -15,11 +15,21 @@
|
||||
#include "ompi/mca/pml/base/pml_base_sendreq.h"
|
||||
|
||||
|
||||
/* There is several different ways of packing the data to the sender-based
|
||||
* buffer. Just pick one.
|
||||
*/
|
||||
#define SB_USE_PACK_METHOD
|
||||
#undef SB_USE_SELFCOMM_METHOD
|
||||
#undef SB_USE_CONVERTOR_METHOD
|
||||
|
||||
|
||||
typedef struct vprotocol_pessimist_sender_based_t
|
||||
{
|
||||
int sb_pagesize; /* size of memory pages on this architecture */
|
||||
#ifdef SB_USE_SELF_METHOD
|
||||
ompi_communicator_t *sb_comm;
|
||||
|
||||
#endif
|
||||
|
||||
int sb_fd; /* file descriptor of mapped file */
|
||||
off_t sb_offset; /* offset in mmaped file */
|
||||
|
||||
@ -49,6 +59,7 @@ void vprotocol_pessimist_sender_based_finalize(void);
|
||||
*/
|
||||
void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
|
||||
|
||||
/** Copy data associated to a pml_base_send_request_t to the sender based
|
||||
* message payload buffer
|
||||
*/
|
||||
@ -60,10 +71,10 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
{ \
|
||||
vprotocol_pessimist_sender_based_alloc(req->req_bytes_packed); \
|
||||
} \
|
||||
__SENDER_BASED_PACK(req); \
|
||||
__SENDER_BASED_COPY(req); \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_PACK(req) do { \
|
||||
#define __SENDER_BASED_COPY(req) do { \
|
||||
vprotocol_pessimist_sender_based_header_t *sbhdr = \
|
||||
(vprotocol_pessimist_sender_based_header_t *) \
|
||||
mca_vprotocol_pessimist.sender_based.sb_cursor; \
|
||||
@ -75,7 +86,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
mca_vprotocol_pessimist.sender_based.sb_cursor += \
|
||||
sizeof(vprotocol_pessimist_sender_based_header_t); \
|
||||
\
|
||||
__SENDER_BASED_CNVTOR_PACK(req); \
|
||||
__SENDER_BASED_METHOD_COPY(req); \
|
||||
mca_vprotocol_pessimist.sender_based.sb_cursor += sbhdr->size; \
|
||||
mca_vprotocol_pessimist.sender_based.sb_available -= (sbhdr->size + \
|
||||
sizeof(vprotocol_pessimist_sender_based_header_t)); \
|
||||
@ -83,14 +94,12 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
} while(0)
|
||||
|
||||
/** Ensure sender based is finished before allowing user to touch send buffer
|
||||
*/
|
||||
#define VPROTOCOL_PESSIMIST_SENDER_BASED_FLUSH(REQ) __SENDER_BASED_CNVTOR_FLUSH(REQ)
|
||||
*/
|
||||
#define VPROTOCOL_PESSIMIST_SENDER_BASED_FLUSH(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
|
||||
/* There is 2 different ways of packing the data to the sender-based buffer
|
||||
* just pick one
|
||||
*/
|
||||
#define __SENDER_BASED_CNVTOR_PACK(req) do { \
|
||||
#if defined(SB_USE_PACK_METHOD)
|
||||
/* Convertor pack (blocking) method */
|
||||
#define __SENDER_BASED_METHOD_COPY(req) do { \
|
||||
if(0 != req->req_bytes_packed) { \
|
||||
ompi_convertor_t conv; \
|
||||
size_t max_data; \
|
||||
@ -107,11 +116,11 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_CNVTOR_FLUSH(REQ)
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
|
||||
|
||||
#define __SENDER_BASED_SNDRCV_PACK(req) do { \
|
||||
#elif defined(SB_USE_SELFCOMM_METHOD)
|
||||
/* iRecv/Send on SELF pack method */
|
||||
#define __SENDER_BASED_METHOD_COPY(req) do { \
|
||||
mca_pml_v.host_pml.pml_irecv( \
|
||||
mca_vprotocol_pessimist.sender_based.sb_cursor, \
|
||||
req->req_bytes_packed, MPI_PACKED, 0, 0, \
|
||||
@ -124,7 +133,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
&VPESSIMIST_SEND_REQ(req)->sb_reqs[1]); \
|
||||
} while(0);
|
||||
|
||||
#define __SENDER_BASED_SNDRCV_FLUSH(REQ) do { \
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ) do { \
|
||||
if(NULL != VPESSIMIST_REQ(REQ)->sb_reqs[0]) \
|
||||
{ \
|
||||
ompi_request_wait_all(2, VPESSIMIST_REQ(REQ)->sb_reqs, \
|
||||
@ -133,4 +142,25 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
#elif defined(SB_USE_CONVERTOR_METHOD)
|
||||
/* Convertor replacement (non blocking) method */
|
||||
convertor_advance_fct_t vprotocol_pessimist_sender_based_convertor_advance;
|
||||
|
||||
#define __SENDER_BASED_METHOD_COPY(REQ) do { \
|
||||
ompi_convertor_t *pConv; \
|
||||
vprotocol_pessimist_send_request_t *pReq; \
|
||||
\
|
||||
pConv = & (REQ)->req_base.req_convertor; \
|
||||
pReq = VPESSIMIST_SEND_REQ(REQ); \
|
||||
pReq->conv_flags = pConv->flags; \
|
||||
pReq->conv_advance = pConv->f_advance; \
|
||||
\
|
||||
pConv->flags |= CONVERTOR_NO_OP; \
|
||||
pConv->f_advance = vprotocol_pessimist_sender_based_convertor_advance; \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
#endif /* SB_USE_*_METHOD */
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user