There is room in convertor to copy the packed data. It works just need to add the correct memcopy. It does not manage the short messages but I alreqdy think of a workaround for this (and it might even be better regarding latency).
This commit was SVN r16169.
Этот коммит содержится в:
родитель
54c87daaed
Коммит
bc318b35e2
@ -201,22 +201,18 @@ static int mca_pml_v_enable(bool enable)
|
||||
int ret;
|
||||
|
||||
/* Enable the real PML (no threading issues there as threads are started
|
||||
* later
|
||||
* later)
|
||||
*/
|
||||
ret = mca_pml_v.host_pml.pml_enable(enable);
|
||||
if(OMPI_SUCCESS != ret) return ret;
|
||||
|
||||
if(enable) {
|
||||
if(mca_vprotocol_base_selected()) {
|
||||
if(mca_vprotocol.enable)
|
||||
return mca_vprotocol.enable(enable);
|
||||
else
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
/* Check if a protocol have been selected during init */
|
||||
if(! mca_vprotocol_base_selected())
|
||||
mca_vprotocol_base_select(pml_v_enable_progress_treads,
|
||||
pml_v_enable_mpi_threads);
|
||||
|
||||
mca_vprotocol_base_select(pml_v_enable_progress_treads,
|
||||
pml_v_enable_mpi_threads);
|
||||
|
||||
/* Check if we succeeded selecting a protocol */
|
||||
if(mca_vprotocol_base_selected()) {
|
||||
V_OUTPUT_VERBOSE(1, "I don't want to die: I will parasite %s host component %s with %s %s",
|
||||
mca_pml_base_selected_component.pmlm_version.mca_type_name,
|
||||
|
@ -15,7 +15,7 @@ mca_vprotocol_pessimist_module_t mca_vprotocol_pessimist =
|
||||
{
|
||||
/* mca_pml_base_module_add_procs_fn_t */ NULL,
|
||||
/* mca_pml_base_module_del_procs_fn_t */ NULL,
|
||||
/* mca_pml_base_module_enable_fn_f */ NULL,
|
||||
/* mca_pml_base_module_enable_fn_f */ mca_vprotocol_pessimist_enable,
|
||||
/* mca_pml_base_module_progress_fn_t */ NULL, /*mca_vprotocol_pessimist_progress,*/
|
||||
|
||||
/* mca_pml_base_module_add_comm_fn_t */ NULL,
|
||||
|
@ -45,7 +45,7 @@ typedef struct mca_vprotocol_pessimist_module_t {
|
||||
|
||||
extern mca_vprotocol_pessimist_module_t mca_vprotocol_pessimist;
|
||||
|
||||
|
||||
int mca_vprotocol_pessimist_enable(bool enable);
|
||||
int mca_vprotocol_pessimist_dump(struct ompi_communicator_t* comm, int verbose);
|
||||
|
||||
int mca_vprotocol_pessimist_add_procs(struct ompi_proc_t **procs, size_t nprocs);
|
||||
|
@ -112,10 +112,6 @@ static mca_vprotocol_base_module_t *mca_vprotocol_pessimist_component_init( int*
|
||||
mca_vprotocol_pessimist.event_buffer =
|
||||
(vprotocol_pessimist_mem_event_t *) malloc(_event_buffer_size);
|
||||
|
||||
if(vprotocol_pessimist_sender_based_init(_mmap_file_name,
|
||||
_sender_based_size) == -1)
|
||||
return NULL;
|
||||
|
||||
return &mca_vprotocol_pessimist.super;
|
||||
}
|
||||
|
||||
@ -123,11 +119,24 @@ static int mca_vprotocol_pessimist_component_finalize(void)
|
||||
{
|
||||
V_OUTPUT_VERBOSE(500, "vprotocol_pessimist_finalize");
|
||||
free(mca_vprotocol_pessimist.event_buffer);
|
||||
vprotocol_pessimist_sender_based_finalize();
|
||||
/** TODO: fix memleak... */
|
||||
OBJ_DESTRUCT(&mca_vprotocol_pessimist.replay_events);
|
||||
OBJ_DESTRUCT(&mca_vprotocol_pessimist.pending_events);
|
||||
OBJ_DESTRUCT(&mca_vprotocol_pessimist.events_pool);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_vprotocol_pessimist_enable(bool enable) {
|
||||
if(enable) {
|
||||
int ret;
|
||||
if((ret = vprotocol_pessimist_sender_based_init(_mmap_file_name,
|
||||
_sender_based_size)) < OPAL_SUCCESS)
|
||||
return ret;
|
||||
}
|
||||
else {
|
||||
vprotocol_pessimist_sender_based_finalize();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
static inline int mca_param_register_int( const char* param_name,
|
||||
|
@ -27,7 +27,7 @@ static void vprotocol_pessimist_request_construct(mca_pml_base_request_t *req)
|
||||
req->req_ompi.req_status.MPI_SOURCE = -1; /* no matching made flag */
|
||||
VPESSIMIST_REQ(req)->pml_req_free = req->req_ompi.req_free;
|
||||
VPESSIMIST_REQ(req)->event = NULL;
|
||||
VPESSIMIST_REQ(req)->sb_reqs[0] = NULL;
|
||||
/* VPESSIMIST_REQ(req)->sb_reqs[0] = NULL;*/
|
||||
assert(VPESSIMIST_REQ(req)->pml_req_free == req->req_ompi.req_free); /* detection of aligment issues on different arch */
|
||||
req->req_ompi.req_free = mca_vprotocol_pessimist_request_free;
|
||||
}
|
||||
|
@ -23,8 +23,10 @@ extern "C" {
|
||||
typedef struct mca_vprotocol_pessimist_request_t {
|
||||
ompi_request_free_fn_t pml_req_free;
|
||||
vprotocol_pessimist_clock_t reqid;
|
||||
ompi_request_t *sb_reqs[2];
|
||||
/* ompi_request_t *sb_reqs[2]; */
|
||||
mca_vprotocol_pessimist_event_t *event;
|
||||
convertor_advance_fct_t conv_advance;
|
||||
uint32_t conv_flags;
|
||||
} mca_vprotocol_pessimist_request_t;
|
||||
typedef mca_vprotocol_pessimist_request_t mca_vprotocol_pessimist_recv_request_t;
|
||||
typedef mca_vprotocol_pessimist_request_t mca_vprotocol_pessimist_send_request_t;
|
||||
|
@ -23,6 +23,13 @@
|
||||
int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size)
|
||||
{
|
||||
char path[PATH_MAX];
|
||||
#ifdef SB_USE_CONVERTOR_METHOD
|
||||
mca_pml_base_send_request_t pml_req;
|
||||
sb.sb_conv_to_pessimist_offset = VPROTOCOL_SEND_REQ(NULL) -
|
||||
((uintptr_t) & pml_req.req_base.req_convertor -
|
||||
(uintptr_t) & pml_req);
|
||||
V_OUTPUT_VERBOSE(1, "conv_to_pessimist_offset: %p", sb.sb_conv_to_pessimist_offset);
|
||||
#endif
|
||||
sb.sb_offset = 0;
|
||||
sb.sb_length = size;
|
||||
sb.sb_pagesize = getpagesize();
|
||||
@ -39,9 +46,9 @@ int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size)
|
||||
{
|
||||
V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_init: open (%s): %s",
|
||||
path, strerror(errno));
|
||||
return -1;
|
||||
return OPAL_ERR_FILE_OPEN_FAILURE;
|
||||
}
|
||||
return sb.sb_fd;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
void vprotocol_pessimist_sender_based_finalize(void)
|
||||
@ -69,7 +76,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len)
|
||||
{
|
||||
if(((uintptr_t) NULL) != sb.sb_addr)
|
||||
munmap((void *) sb.sb_addr, sb.sb_length);
|
||||
#if SB_USE_SELFCOMM_METHOD
|
||||
#ifdef SB_USE_SELFCOMM_METHOD
|
||||
else
|
||||
ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1);
|
||||
#endif
|
||||
@ -124,13 +131,17 @@ uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t* pC
|
||||
uint32_t* out_size,
|
||||
size_t* max_data) {
|
||||
int ret;
|
||||
vprotocol_pessimist_send_request_t * preq;
|
||||
mca_vprotocol_pessimist_send_request_t *preq;
|
||||
|
||||
preq = VPESSIMIST_SEND_REQ(pConvertor);
|
||||
preq = VPESSIMIST_CONV_REQ(pConvertor);
|
||||
V_OUTPUT_VERBOSE(1, "pessimist:\tsb\tadvce\t %p (+%p) %p\tfadvce %p\tout %u\tmax %lu", pConvertor, mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset, preq, preq->conv_advance, *out_size, *max_data);
|
||||
pConvertor->flags = preq->conv_flags;
|
||||
pConvertor->f_advance = preq->conv_advance;
|
||||
ret = pConvertor->f_advance(pConvertor, iov, out_size, max_data);
|
||||
|
||||
pConvertor->fAdvance = preq->conv_advance;
|
||||
ret = ompi_convertor_pack(pConvertor, iov, out_size, max_data);
|
||||
V_OUTPUT_VERBOSE(1, "pessimist:\tsb\tadvce\t%p\tout %u\tmax %lu", preq, *out_size, *max_data);
|
||||
|
||||
pConvertor->flags &= ~CONVERTOR_NO_OP;
|
||||
pConvertor->fAdvance = vprotocol_pessimist_sender_based_convertor_advance;
|
||||
return ret;
|
||||
}
|
||||
#endif
|
||||
|
@ -18,18 +18,21 @@
|
||||
/* There is several different ways of packing the data to the sender-based
|
||||
* buffer. Just pick one.
|
||||
*/
|
||||
#define SB_USE_PACK_METHOD
|
||||
#undef SB_USE_PACK_METHOD
|
||||
#undef SB_USE_SELFCOMM_METHOD
|
||||
#undef SB_USE_CONVERTOR_METHOD
|
||||
#define SB_USE_CONVERTOR_METHOD
|
||||
|
||||
|
||||
typedef struct vprotocol_pessimist_sender_based_t
|
||||
{
|
||||
int sb_pagesize; /* size of memory pages on this architecture */
|
||||
#ifdef SB_USE_CONVERTOR_METHOD
|
||||
uintptr_t sb_conv_to_pessimist_offset; /* end of request from req_conv */
|
||||
#endif
|
||||
#ifdef SB_USE_SELF_METHOD
|
||||
ompi_communicator_t *sb_comm;
|
||||
#endif
|
||||
|
||||
|
||||
int sb_fd; /* file descriptor of mapped file */
|
||||
off_t sb_offset; /* offset in mmaped file */
|
||||
|
||||
@ -144,23 +147,30 @@ void vprotocol_pessimist_sender_based_alloc(size_t len);
|
||||
|
||||
#elif defined(SB_USE_CONVERTOR_METHOD)
|
||||
/* Convertor replacement (non blocking) method */
|
||||
convertor_advance_fct_t vprotocol_pessimist_sender_based_convertor_advance;
|
||||
uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t*,
|
||||
struct iovec*,
|
||||
uint32_t*,
|
||||
size_t*);
|
||||
|
||||
#define __SENDER_BASED_METHOD_COPY(REQ) do { \
|
||||
ompi_convertor_t *pConv; \
|
||||
vprotocol_pessimist_send_request_t *pReq; \
|
||||
mca_vprotocol_pessimist_send_request_t *pReq; \
|
||||
\
|
||||
pConv = & (REQ)->req_base.req_convertor; \
|
||||
pReq = VPESSIMIST_SEND_REQ(REQ); \
|
||||
pReq->conv_flags = pConv->flags; \
|
||||
pReq->conv_advance = pConv->f_advance; \
|
||||
\
|
||||
pConv->flags |= CONVERTOR_NO_OP; \
|
||||
pConv->f_advance = vprotocol_pessimist_sender_based_convertor_advance; \
|
||||
pReq->conv_advance = pConv->fAdvance; \
|
||||
V_OUTPUT_VERBOSE(1, "req %p preq %p conv %p advance %p", (REQ), pReq, pConv, pConv->fAdvance); \
|
||||
pConv->flags &= ~CONVERTOR_NO_OP; \
|
||||
pConv->fAdvance = vprotocol_pessimist_sender_based_convertor_advance; \
|
||||
} while(0)
|
||||
|
||||
#define __SENDER_BASED_METHOD_FLUSH(REQ)
|
||||
|
||||
#define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
|
||||
(mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset + \
|
||||
(uintptr_t) (CONV)))
|
||||
|
||||
#endif /* SB_USE_*_METHOD */
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user