diff --git a/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.c b/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.c index b8de5d08ab..68b9e9970d 100644 --- a/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.c +++ b/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.c @@ -28,7 +28,9 @@ int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size) sb.sb_pagesize = getpagesize(); sb.sb_cursor = sb.sb_addr = (uintptr_t) NULL; sb.sb_available = 0; +#ifdef SB_USE_SELFCOMM_METHOD sb.sb_comm = MPI_COMM_NULL; +#endif sprintf(path, "%s"OPAL_PATH_SEP"%s", orte_process_info.proc_session_dir, mmapfile); @@ -46,13 +48,8 @@ void vprotocol_pessimist_sender_based_finalize(void) { int ret; - if(sb.sb_comm != MPI_COMM_NULL) + if(((uintptr_t) NULL) != sb.sb_addr) { -/* TODO: check this has already been freed by MPI_Finalize - * ret = ompi_comm_free(&sb.sb_comm); - * if(MPI_SUCCESS != ret) - * opal_output(0, "pml_v: protocol_pessimist: sender_based_finalize: ompi_comm_free failed (%d)", ret); - */ ret = munmap((void *) sb.sb_addr, sb.sb_length); if(-1 == ret) V_OUTPUT_ERR("pml_v: protocol_pessimsit: sender_based_finalize: munmap (%p): %s", @@ -70,11 +67,13 @@ void vprotocol_pessimist_sender_based_finalize(void) */ void vprotocol_pessimist_sender_based_alloc(size_t len) { - if(sb.sb_comm == MPI_COMM_NULL) - ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1); - else + if(((uintptr_t) NULL) != sb.sb_addr) munmap((void *) sb.sb_addr, sb.sb_length); - +#if SB_USE_SELFCOMM_METHOD + else + ompi_comm_dup(MPI_COMM_SELF, &sb.sb_comm, 1); +#endif + /* Take care of alignement of sb_offset */ sb.sb_offset += sb.sb_cursor - sb.sb_addr; sb.sb_cursor = sb.sb_offset % sb.sb_pagesize; @@ -91,7 +90,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len) if(-1 == lseek(sb.sb_fd, sb.sb_offset + sb.sb_length, SEEK_SET)) { V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: lseek: %s", - strerror(errno)); + § strerror(errno)); close(sb.sb_fd); ompi_mpi_abort(MPI_COMM_NULL, MPI_ERR_NO_SPACE, false); } @@ -105,8 +104,9 @@ void vprotocol_pessimist_sender_based_alloc(size_t len) sb.sb_addr = (uintptr_t) mmap((void *) sb.sb_addr, sb.sb_length, PROT_WRITE | PROT_READ, MAP_SHARED, sb.sb_fd, sb.sb_offset); -#endif +#else sb.sb_addr = (uintptr_t) malloc(sb.sb_length); +#endif if(((uintptr_t) -1) == sb.sb_addr) { V_OUTPUT_ERR("pml_v: vprotocol_pessimist: sender_based_alloc: mmap: %s", @@ -118,4 +118,21 @@ void vprotocol_pessimist_sender_based_alloc(size_t len) V_OUTPUT_VERBOSE(30, "pessimist:\tsb\tgrow\toffset %llu\tlength %llu\tbase %p\tcursor %p", (unsigned long long) sb.sb_offset, (unsigned long long) sb.sb_length, (void *) sb.sb_addr, (void *) sb.sb_cursor); } +#ifdef SB_USE_CONVERTOR_METHOD +uint32_t vprotocol_pessimist_sender_based_convertor_advance(ompi_convertor_t* pConvertor, + struct iovec* iov, + uint32_t* out_size, + size_t* max_data) { + int ret; + vprotocol_pessimist_send_request_t * preq; + + preq = VPESSIMIST_SEND_REQ(pConvertor); + pConvertor->flags = preq->conv_flags; + pConvertor->f_advance = preq->conv_advance; + ret = pConvertor->f_advance(pConvertor, iov, out_size, max_data); + + return ret; +} +#endif + #undef sb diff --git a/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.h b/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.h index 8761ab11ff..43401a1f01 100644 --- a/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.h +++ b/ompi/mca/pml/v/vprotocol/pessimist/vprotocol_pessimist_sender_based.h @@ -15,11 +15,21 @@ #include "ompi/mca/pml/base/pml_base_sendreq.h" +/* There is several different ways of packing the data to the sender-based + * buffer. Just pick one. + */ +#define SB_USE_PACK_METHOD +#undef SB_USE_SELFCOMM_METHOD +#undef SB_USE_CONVERTOR_METHOD + + typedef struct vprotocol_pessimist_sender_based_t { int sb_pagesize; /* size of memory pages on this architecture */ +#ifdef SB_USE_SELF_METHOD ompi_communicator_t *sb_comm; - +#endif + int sb_fd; /* file descriptor of mapped file */ off_t sb_offset; /* offset in mmaped file */ @@ -49,6 +59,7 @@ void vprotocol_pessimist_sender_based_finalize(void); */ void vprotocol_pessimist_sender_based_alloc(size_t len); + /** Copy data associated to a pml_base_send_request_t to the sender based * message payload buffer */ @@ -60,10 +71,10 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); { \ vprotocol_pessimist_sender_based_alloc(req->req_bytes_packed); \ } \ - __SENDER_BASED_PACK(req); \ + __SENDER_BASED_COPY(req); \ } while(0) -#define __SENDER_BASED_PACK(req) do { \ +#define __SENDER_BASED_COPY(req) do { \ vprotocol_pessimist_sender_based_header_t *sbhdr = \ (vprotocol_pessimist_sender_based_header_t *) \ mca_vprotocol_pessimist.sender_based.sb_cursor; \ @@ -75,7 +86,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); mca_vprotocol_pessimist.sender_based.sb_cursor += \ sizeof(vprotocol_pessimist_sender_based_header_t); \ \ - __SENDER_BASED_CNVTOR_PACK(req); \ + __SENDER_BASED_METHOD_COPY(req); \ mca_vprotocol_pessimist.sender_based.sb_cursor += sbhdr->size; \ mca_vprotocol_pessimist.sender_based.sb_available -= (sbhdr->size + \ sizeof(vprotocol_pessimist_sender_based_header_t)); \ @@ -83,14 +94,12 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); } while(0) /** Ensure sender based is finished before allowing user to touch send buffer -*/ -#define VPROTOCOL_PESSIMIST_SENDER_BASED_FLUSH(REQ) __SENDER_BASED_CNVTOR_FLUSH(REQ) + */ +#define VPROTOCOL_PESSIMIST_SENDER_BASED_FLUSH(REQ) __SENDER_BASED_METHOD_FLUSH(REQ) - -/* There is 2 different ways of packing the data to the sender-based buffer - * just pick one - */ -#define __SENDER_BASED_CNVTOR_PACK(req) do { \ +#if defined(SB_USE_PACK_METHOD) + /* Convertor pack (blocking) method */ +#define __SENDER_BASED_METHOD_COPY(req) do { \ if(0 != req->req_bytes_packed) { \ ompi_convertor_t conv; \ size_t max_data; \ @@ -107,11 +116,11 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); } \ } while(0) -#define __SENDER_BASED_CNVTOR_FLUSH(REQ) +#define __SENDER_BASED_METHOD_FLUSH(REQ) - - -#define __SENDER_BASED_SNDRCV_PACK(req) do { \ +#elif defined(SB_USE_SELFCOMM_METHOD) +/* iRecv/Send on SELF pack method */ +#define __SENDER_BASED_METHOD_COPY(req) do { \ mca_pml_v.host_pml.pml_irecv( \ mca_vprotocol_pessimist.sender_based.sb_cursor, \ req->req_bytes_packed, MPI_PACKED, 0, 0, \ @@ -124,7 +133,7 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); &VPESSIMIST_SEND_REQ(req)->sb_reqs[1]); \ } while(0); -#define __SENDER_BASED_SNDRCV_FLUSH(REQ) do { \ +#define __SENDER_BASED_METHOD_FLUSH(REQ) do { \ if(NULL != VPESSIMIST_REQ(REQ)->sb_reqs[0]) \ { \ ompi_request_wait_all(2, VPESSIMIST_REQ(REQ)->sb_reqs, \ @@ -133,4 +142,25 @@ void vprotocol_pessimist_sender_based_alloc(size_t len); } \ } while(0) +#elif defined(SB_USE_CONVERTOR_METHOD) +/* Convertor replacement (non blocking) method */ +convertor_advance_fct_t vprotocol_pessimist_sender_based_convertor_advance; + +#define __SENDER_BASED_METHOD_COPY(REQ) do { \ + ompi_convertor_t *pConv; \ + vprotocol_pessimist_send_request_t *pReq; \ + \ + pConv = & (REQ)->req_base.req_convertor; \ + pReq = VPESSIMIST_SEND_REQ(REQ); \ + pReq->conv_flags = pConv->flags; \ + pReq->conv_advance = pConv->f_advance; \ + \ + pConv->flags |= CONVERTOR_NO_OP; \ + pConv->f_advance = vprotocol_pessimist_sender_based_convertor_advance; \ +} while(0) + +#define __SENDER_BASED_METHOD_FLUSH(REQ) + +#endif /* SB_USE_*_METHOD */ + #endif