1
1
openmpi/ompi/mca/vprotocol/pessimist/vprotocol_pessimist_sender_based.h
2015-06-23 20:59:57 -07:00

215 строки
8.2 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of the University of Tennessee.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef __VPROTOCOL_PESSIMIST_SENDERBASED_H__
#define __VPROTOCOL_PESSIMIST_SENDERBASED_H__
#include "ompi_config.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/pml/v/pml_v_output.h"
#include "vprotocol_pessimist_sender_based_types.h"
#include "vprotocol_pessimist_request.h"
#include "vprotocol_pessimist.h"
BEGIN_C_DECLS
/** Prepare for using the sender based storage
*/
int vprotocol_pessimist_sender_based_init(const char *mmapfile, size_t size);
/** Cleanup mmap etc
*/
void vprotocol_pessimist_sender_based_finalize(void);
/** Manage mmap floating window, allocating enough memory for the message to be
* asynchronously copied to disk.
*/
void vprotocol_pessimist_sender_based_alloc(size_t len);
/*******************************************************************************
* Convertor pack (blocking) method (good latency, bad bandwidth)
*/
#if defined(SB_USE_PACK_METHOD)
static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *pmlreq)
{
if(0 != pmlreq->req_bytes_packed)
{
opal_convertor_t conv;
size_t max_data;
size_t zero = 0;
unsigned int iov_count = 1;
struct iovec iov;
max_data = iov.iov_len = pmlreq->req_bytes_packed;
iov.iov_base = (IOVBASE_TYPE *) VPESSIMIST_SEND_FTREQ(pmlreq)->sb.cursor;
opal_convertor_clone_with_position( &pmlreq->req_base.req_convertor,
&conv, 0, &zero );
opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
}
}
#define __SENDER_BASED_METHOD_FLUSH(REQ)
/*******************************************************************************
* Convertor replacement (non blocking) method (under testing)
*/
#elif defined(SB_USE_CONVERTOR_METHOD)
int32_t vprotocol_pessimist_sender_based_convertor_advance(opal_convertor_t*,
struct iovec*,
uint32_t*,
size_t*);
#define __SENDER_BASED_METHOD_COPY(REQ) do { \
opal_convertor_t *pConv; \
mca_vprotocol_pessimist_send_request_t *ftreq; \
\
pConv = & (REQ)->req_base.req_convertor; \
ftreq = VPESSIMIST_SEND_FTREQ(REQ); \
ftreq->sb.conv_flags = pConv->flags; \
ftreq->sb.conv_advance = pConv->fAdvance; \
\
pConv->flags &= ~CONVERTOR_NO_OP; \
pConv->fAdvance = vprotocol_pessimist_sender_based_convertor_advance; \
} while(0)
#define __SENDER_BASED_METHOD_FLUSH(REQ)
#define VPESSIMIST_CONV_REQ(CONV) ((mca_vprotocol_pessimist_send_request_t *) \
(mca_vprotocol_pessimist.sender_based.sb_conv_to_pessimist_offset + \
(uintptr_t) ((CONV)->clone_of)))
/*******************************************************************************
* progress method
*/
#elif defined(SB_USE_PROGRESS_METHOD)
static inline void __SENDER_BASED_METHOD_COPY(mca_pml_base_send_request_t *req)
{
if(req->req_bytes_packed)
{
mca_vprotocol_pessimist_send_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
ftreq->sb.bytes_progressed = 0;
opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
&ftreq->list_item);
}
}
static inline int vprotocol_pessimist_sb_progress_req(mca_pml_base_send_request_t *req)
{
mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
size_t max_data = 0;
if(ftreq->sb.bytes_progressed < req->req_bytes_packed)
{
opal_convertor_t conv;
unsigned int iov_count = 1;
struct iovec iov;
uintptr_t position = ftreq->sb.bytes_progressed;
max_data = req->req_bytes_packed - ftreq->sb.bytes_progressed;
iov.iov_len = max_data;
iov.iov_base = (IOVBASE_TYPE *) (ftreq->sb.cursor + position);
V_OUTPUT_VERBOSE(80, "pessimist:\tsb\tprgress\t%"PRIpclock"\tsize %lu from position %lu", ftreq->reqid, max_data, position);
opal_convertor_clone_with_position(&req->req_base.req_convertor,
&conv, 0, &position );
opal_convertor_pack(&conv, &iov, &iov_count, &max_data);
ftreq->sb.bytes_progressed += max_data;
}
return max_data;
}
static inline int vprotocol_pessimist_sb_progress_all_reqs(void)
{
int ret = 0;
/* progress any waiting Sender Based copy */
if(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq))
{
mca_vprotocol_pessimist_request_t *ftreq = (mca_vprotocol_pessimist_request_t *)
opal_list_remove_first(&mca_vprotocol_pessimist.sender_based.sb_sendreq);
if(vprotocol_pessimist_sb_progress_req(VPROTOCOL_SEND_REQ(ftreq)))
ret = 1;
opal_list_append(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
&ftreq->list_item);
}
return ret;
}
static inline void __SENDER_BASED_METHOD_FLUSH(ompi_request_t *req)
{
mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
if((pmlreq->req_base.req_type == MCA_PML_REQUEST_SEND) &&
pmlreq->req_bytes_packed)
{
mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
assert(!opal_list_is_empty(&mca_vprotocol_pessimist.sender_based.sb_sendreq));
opal_list_remove_item(&mca_vprotocol_pessimist.sender_based.sb_sendreq,
(opal_list_item_t *) ftreq);
vprotocol_pessimist_sb_progress_req(pmlreq);
assert(pmlreq->req_bytes_packed == ftreq->sb.bytes_progressed);
}
}
#endif /* SB_USE_*_METHOD */
/** Copy data associated to a pml_base_send_request_t to the sender based
* message payload buffer
*/
static inline void vprotocol_pessimist_sender_based_copy_start(ompi_request_t *req)
{
vprotocol_pessimist_sender_based_header_t *sbhdr;
mca_vprotocol_pessimist_request_t *ftreq = VPESSIMIST_SEND_FTREQ(req);
mca_pml_base_send_request_t *pmlreq = (mca_pml_base_send_request_t *) req;
/* Allocate enough sender-based space to hold the message */
if(mca_vprotocol_pessimist.sender_based.sb_available <
pmlreq->req_bytes_packed +
sizeof(vprotocol_pessimist_sender_based_header_t))
{
vprotocol_pessimist_sender_based_alloc(pmlreq->req_bytes_packed);
}
/* Copy message header to the sender-based space */
/* /!\ This is NOT thread safe */
ftreq->sb.cursor = mca_vprotocol_pessimist.sender_based.sb_cursor;
#if 1
mca_vprotocol_pessimist.sender_based.sb_cursor +=
sizeof(vprotocol_pessimist_sender_based_header_t) +
pmlreq->req_bytes_packed;
mca_vprotocol_pessimist.sender_based.sb_available -=
sizeof(vprotocol_pessimist_sender_based_header_t) +
pmlreq->req_bytes_packed;
#endif
sbhdr = (vprotocol_pessimist_sender_based_header_t *) ftreq->sb.cursor;
sbhdr->size = pmlreq->req_bytes_packed;
sbhdr->dst = pmlreq->req_base.req_peer;
sbhdr->tag = pmlreq->req_base.req_tag;
sbhdr->contextid = pmlreq->req_base.req_comm->c_contextid;
sbhdr->sequence = pmlreq->req_base.req_sequence;
ftreq->sb.cursor += sizeof(vprotocol_pessimist_sender_based_header_t);
V_OUTPUT_VERBOSE(70, "pessimist:\tsb\tsend\t%"PRIpclock"\tsize %lu (+%lu header)", VPESSIMIST_FTREQ(req)->reqid, (long unsigned)pmlreq->req_bytes_packed, (long unsigned)sizeof(vprotocol_pessimist_sender_based_header_t));
/* Use one of the previous data copy method */
__SENDER_BASED_METHOD_COPY(pmlreq);
}
/** Ensure sender based is finished before allowing user to touch send buffer
*/
#define vprotocol_pessimist_sender_based_flush(REQ) __SENDER_BASED_METHOD_FLUSH(REQ)
END_C_DECLS
#endif