1
1

changed to PML/PTL I/F to support both put/get semantics

and receiver side scheduling

This commit was SVN r1140.
Этот коммит содержится в:
Tim Woodall 2004-05-20 13:45:34 +00:00
родитель 63fb9ea1ee
Коммит 9fa9cd9b0a
21 изменённых файлов: 161 добавлений и 94 удалений

Просмотреть файл

@ -374,8 +374,7 @@ static inline void lam_obj_run_destructors(lam_object_t *object)
* @param cls Pointer to the class descriptor of this object
* @return Pointer to the object
*/
static inline lam_object_t *lam_obj_new(size_t size,
lam_class_t *cls)
static inline lam_object_t *lam_obj_new(size_t size, lam_class_t *cls)
{
lam_object_t *object;

Просмотреть файл

@ -160,7 +160,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
proc->proc_pml = proc_pml;
}
/* allow each ptl to register with the proc */
/* attempt to add the proc to each ptl */
for(p_index = 0; p_index < mca_pml_teg.teg_num_ptls; p_index++) {
mca_ptl_t* ptl = mca_pml_teg.teg_ptls[p_index];
@ -181,6 +181,8 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
/* if this ptl supports exclusive access then don't allow
* subsequent ptls to register
*/
proc_pml->proc_ptl_flags |= ptl->ptl_flags;
if(ptl->ptl_exclusivity)
break;
}

Просмотреть файл

@ -77,7 +77,7 @@ int mca_pml_teg_recv(
struct lam_communicator_t* comm,
lam_status_public_t* status)
{
int rc, index;
int rc;
mca_ptl_base_recv_request_t *recvreq;
MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
#if MCA_PML_TEG_STATISTICS

Просмотреть файл

@ -11,6 +11,7 @@
static void mca_pml_teg_proc_construct(mca_pml_proc_t* proc)
{
proc->proc_lam = NULL;
proc->proc_ptl_flags = 0;
OBJ_CONSTRUCT(&proc->proc_lock, lam_mutex_t);
OBJ_CONSTRUCT(&proc->proc_ptl_first, mca_pml_teg_ptl_array_t);
OBJ_CONSTRUCT(&proc->proc_ptl_next, mca_pml_teg_ptl_array_t);

Просмотреть файл

@ -23,6 +23,7 @@ struct mca_pml_proc_t {
lam_mutex_t proc_lock; /**< lock to protect against concurrent access */
mca_ptl_array_t proc_ptl_first; /**< array of ptls to use for first fragments */
mca_ptl_array_t proc_ptl_next; /**< array of ptls to use for remaining fragments */
uint32_t proc_ptl_flags; /**< aggregate ptl flags */
};
typedef struct mca_pml_proc_t mca_pml_proc_t;

Просмотреть файл

@ -9,7 +9,7 @@ void mca_pml_teg_recv_request_progress(
THREAD_LOCK(&mca_pml_teg.teg_request_lock);
req->req_bytes_delivered += frag->super.frag_size;
req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length;
if (req->req_bytes_received >= req->req_bytes_msg) {
if (req->req_bytes_received >= req->req_bytes_packed) {
/* initialize request status */
req->super.req_status.MPI_SOURCE = req->super.req_peer;
req->super.req_status.MPI_TAG = req->super.req_tag;

Просмотреть файл

@ -25,7 +25,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
mca_pml_proc_t* proc_pml = proc->proc_pml;
/* allocate remaining bytes to PTLs */
size_t bytes_remaining = req->req_bytes_msg - req->req_offset;
size_t bytes_remaining = req->req_bytes_packed - req->req_offset;
size_t num_ptl_avail = proc_pml->proc_ptl_next.ptl_size;
size_t num_ptl = 0;
while(bytes_remaining > 0 && num_ptl++ < num_ptl_avail) {
@ -50,9 +50,11 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100;
}
rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0);
if(rc == LAM_SUCCESS)
bytes_remaining = req->req_bytes_msg - req->req_offset;
rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, &bytes_to_frag, 0);
if(rc == LAM_SUCCESS) {
req->req_offset += bytes_to_frag;
bytes_remaining = req->req_bytes_packed - req->req_offset;
}
}
/* unable to complete send - signal request failed */
@ -71,9 +73,11 @@ void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag)
{
bool first_frag;
THREAD_LOCK(&mca_pml_teg.teg_request_lock);
first_frag = (req->req_bytes_sent == 0);
req->req_bytes_sent += frag->super.frag_size;
if (req->req_bytes_sent >= req->req_bytes_msg) {
if (req->req_bytes_sent >= req->req_bytes_packed) {
req->super.req_pml_done = true;
if (req->super.req_mpi_done == false) {
req->super.req_status.MPI_SOURCE = req->super.req_comm->c_my_rank;
@ -91,8 +95,8 @@ void mca_pml_teg_send_request_progress(
}
THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
/* if first fragment - schedule remaining fragments */
if(req->req_frags == 1) {
/* if first fragment - shedule remaining fragments */
if(first_frag == 1) {
mca_pml_teg_send_request_schedule(req);
}
}

Просмотреть файл

@ -45,17 +45,19 @@ static inline int mca_pml_teg_send_request_start(
int flags, rc;
/* start the first fragment */
if(first_fragment_size <= 0 || req->req_bytes_msg <= first_fragment_size) {
first_fragment_size = req->req_bytes_msg;
flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? MCA_PTL_FLAGS_ACK_MATCHED : 0;
if(first_fragment_size <= 0 || req->req_bytes_packed <= first_fragment_size) {
first_fragment_size = req->req_bytes_packed;
flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ?
MCA_PTL_FLAGS_ACK_MATCHED : 0;
} else {
/* require match for first fragment of a multi-fragment message or if synchronous send */
/* require match for first fragment of a multi-fragment */
flags = MCA_PTL_FLAGS_ACK_MATCHED;
}
rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size, flags);
rc = ptl->ptl_put(ptl, req->req_peer, req, 0, &first_fragment_size, flags);
if(rc != LAM_SUCCESS)
return rc;
req->req_offset += first_fragment_size;
return LAM_SUCCESS;
}

Просмотреть файл

@ -10,7 +10,10 @@ int mca_pml_teg_wait(
int *index,
lam_status_public_t* status)
{
int c, i;
#if LAM_HAVE_THREADS
int c;
#endif
int i;
int completed = -1;
mca_pml_base_request_t* pml_request;

Просмотреть файл

@ -14,6 +14,7 @@
#define MCA_PTL_HDR_TYPE_FRAG 1
#define MCA_PTL_HDR_TYPE_ACK 2
#define MCA_PTL_HDR_TYPE_NACK 3
#define MCA_PTL_HDR_TYPE_GET 4
#define MCA_PTL_FLAGS_ACK_MATCHED 1
#define MCA_PTL_FLAGS_ACK_AGGREGATE 2
@ -37,7 +38,7 @@ struct mca_ptl_base_frag_header_t {
mca_ptl_base_common_header_t hdr_common; /**< common attributes */
uint32_t hdr_frag_length; /**< fragment length */
uint32_t hdr_frag_offset; /**< offset into message */
mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */
mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */
lam_ptr_t hdr_src_ptr; /**< pointer to source fragment */
lam_ptr_t hdr_dst_ptr; /**< pointer to matched receive */
};
@ -55,7 +56,8 @@ struct mca_ptl_base_match_header_t {
int32_t hdr_dst; /**< destination rank */
int32_t hdr_tag; /**< user tag */
uint32_t hdr_msg_length; /**< message length */
mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */
mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */
lam_ptr_t hdr_src_ptr;
};
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
@ -66,7 +68,9 @@ typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
struct mca_ptl_base_ack_header_t {
mca_ptl_base_common_header_t hdr_common; /**< common attributes */
lam_ptr_t hdr_src_ptr; /**< source fragment */
lam_ptr_t hdr_dst_ptr; /**< matched receive request */
lam_ptr_t hdr_dst_match; /**< matched receive request */
lam_ptr_t hdr_dst_addr; /**< posted receive buffer */
uint32_t hdr_dst_size; /**< size of posted buffer */
/* sequence range? */
};
typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t;

Просмотреть файл

@ -50,12 +50,12 @@ static inline bool mca_ptl_base_recv_frag_match(
/*
* Initialize request status.
*/
request->req_bytes_msg = header->hdr_msg_length;
request->req_bytes_packed = header->hdr_msg_length;
request->super.req_peer = header->hdr_src;
request->super.req_tag = header->hdr_tag;
/* notify ptl of match */
ptl->ptl_recv(ptl, frag);
ptl->ptl_matched(ptl, frag);
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);

Просмотреть файл

@ -56,7 +56,7 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque
(frag = mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) != NULL) {
mca_ptl_t* ptl = frag->super.frag_owner;
THREAD_UNLOCK(&pml_comm->c_matching_lock);
ptl->ptl_recv(ptl, frag);
ptl->ptl_matched(ptl, frag);
return; /* match found */
}
@ -102,7 +102,7 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
if ((frag = mca_ptl_base_recv_request_match_specific_proc(request, proc)) != NULL) {
mca_ptl_t* ptl = frag->super.frag_owner;
THREAD_UNLOCK(&pml_comm->c_matching_lock);
ptl->ptl_recv(ptl, frag);
ptl->ptl_matched(ptl, frag);
return; /* match found */
}
}
@ -141,7 +141,7 @@ static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc(
continue;
}
lam_list_remove_item(unexpected_frags, (lam_list_item_t*)frag);
request->req_bytes_msg = header->hdr_msg_length;
request->req_bytes_packed = header->hdr_msg_length;
request->super.req_tag = header->hdr_tag;
request->super.req_peer = header->hdr_src;
frag->frag_request = request;

Просмотреть файл

@ -18,7 +18,7 @@ struct mca_ptl_base_recv_frag_t;
*/
struct mca_ptl_base_recv_request_t {
mca_pml_base_request_t super; /**< base request */
size_t req_bytes_msg; /**< size of message being received */
size_t req_bytes_packed; /**< size of message being received */
size_t req_bytes_received; /**< number of bytes received from network */
size_t req_bytes_delivered; /**< number of bytes delivered to user */
};
@ -47,7 +47,7 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
comm, \
persistent) \
{ \
request->req_bytes_msg = 0; \
request->req_bytes_packed = 0; \
request->req_bytes_received = 0; \
request->req_bytes_delivered = 0; \
request->super.req_sequence = 0; \

Просмотреть файл

@ -24,13 +24,14 @@ struct mca_ptl_base_send_frag_t;
struct mca_ptl_base_send_request_t {
mca_pml_base_request_t super; /** base request type - common data structure for use by wait/test */
size_t req_offset; /**< number of bytes that have already been assigned to a fragment */
size_t req_frags; /**< number of fragments that have been allocated */
size_t req_bytes_msg; /**< packed size of a message given the datatype and count */
size_t req_bytes_packed; /**< packed size of a message given the datatype and count */
size_t req_bytes_sent; /**< number of bytes that have been sent */
mca_pml_base_send_mode_t req_send_mode; /**< type of send */
struct mca_ptl_t* req_owner; /**< PTL that allocated this descriptor */
struct mca_ptl_base_peer_t* req_peer; /**< PTL peer instance that will be used for first fragment */
lam_ptr_t req_peer_request; /**< matched receive at peer */
lam_ptr_t req_peer_match; /**< matched receive at peer */
lam_ptr_t req_peer_addr;
size_t req_peer_size;
lam_convertor_t req_convertor; /**< convertor that describes this datatype */
};
typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
@ -62,10 +63,11 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
persistent) \
{ \
request->req_offset = 0; \
request->req_frags = 0; \
request->req_bytes_sent = 0; \
request->req_send_mode = mode; \
request->req_peer_request.lval = 0; \
request->req_peer_match.lval = 0; \
request->req_peer_addr.lval = 0; \
request->req_peer_size = 0; \
request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); \
request->super.req_addr = addr; \
request->super.req_count = count; \
@ -91,9 +93,9 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
request->super.req_addr, \
0); \
lam_convertor_get_packed_size(&request->req_convertor, &packed_size); \
request->req_bytes_msg = packed_size; \
request->req_bytes_packed = packed_size; \
} else { \
request->req_bytes_msg = 0; \
request->req_bytes_packed = 0; \
} \
}
@ -107,7 +109,7 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
static inline bool mca_ptl_base_send_request_matched(
mca_ptl_base_send_request_t* request)
{
return (NULL != request->req_peer_request.pval);
return (NULL != request->req_peer_match.pval);
}
#endif

Просмотреть файл

@ -106,9 +106,9 @@
* When a header of this type is received, to minimize latency the PTL should call the
* ptl_match() method as soon as entire header is available, potentially prior to receiving
* any data associated with the first fragment. If a match is made, the PML will call
* the ptl_recv() method of the fragments PTL.
* the ptl_matched() method of the fragments PTL.
*
* The ptl_recv() method should generate, if required, an ack to the source process. An
* The ptl_matched() method should generate, if required, an ack to the source process. An
* ack is required if the MCA_PTL_FLAGS_ACK_MATCHED bit is set by the source in the initial
* message header. The ack should contain a pointer to the matched request, along
* with the pointer to the orignal send fragment contained in the initial message header.
@ -152,6 +152,12 @@ typedef enum {
MCA_PTL_ENABLE
} mca_ptl_control_t;
/**
* PTL flags
*/
#define MCA_PTL_PUT 1
#define MCA_PTL_GET 2
/*
* PTL module interface functions and datatype.
*/
@ -319,49 +325,59 @@ typedef void (*mca_ptl_base_request_return_fn_t)(
);
/**
* PML->PTL Initiate a send of the specified size.
* PML->PTL Initiate a send/put to the peer.
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
* @param size (IN) Number of bytes PML is requesting PTL to deliver
* @param request (IN) Send request
* @param offset Current offset into packed/contiguous buffer.
* @param size (IN/OUT) Number of bytes PML is requesting PTL to deliver,
* PTL returns number of bytes sucessfully fragmented
* @param flags (IN) Flags that should be passed to the peer via the message header.
* @param request (OUT) LAM_SUCCESS if the PTL was able to queue one or more fragments
*
* When multiple PTLs are available, a single request (that is large enough)
* will be split across the available PTLs. The PML scheduler will determine
* the percentage given to a PTL based on the bandwidth provided by the transport
* and its current resource usage. The size parameter to the send function indicates
* the number of bytes the PML is requesting the PTL to send. The PTL may choose
* to send 0->size bytes based on available resources.
* The PML implements a rendevouz protocol, with up to the PTL defined threshold
* bytes of the message sent in eager send mode. On receipt of an acknowledgment
* from the peer, the PML will schedule the remaining fragments. If the PTL supports
* RDMA functionality, these subsequent transfers may use RDMA put semantics.
*
* The current offset into the users buffer is passed into the send function
* via the req_offset member of the send request parameter. The send function
* must update req_offset with the actual number of bytes the PTL is able to
* fragment for delivery.
* If the PTL is unable to fragment the requested size, possibly due to resource
* constraints or datatype alighnment/offset, it should return the number of bytes
* actually fragmented in the size parameter.
*/
typedef int (*mca_ptl_base_send_fn_t)(
typedef int (*mca_ptl_base_put_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_send_request_t* send_request,
size_t size,
struct mca_ptl_base_send_request_t* request,
size_t offset,
size_t *size,
int flags
);
/**
* PML->PTL Notification that a receive fragment has been matched.
* PML->PTL Initiate a get from a peer.
*
* @param ptl (IN) PTL instance
* @param recv_frag (IN) Receive fragment
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param request (IN) Recv request
* @param offset Current offset into packed/contiguous buffer.
* @param size (IN/OUT) Number of bytes PML is requesting PTL to pull from peer,
* PTL returns number of bytes sucessfully fragmented.
* @param flags (IN)
* @param request (OUT) LAM_SUCCESS if the PTL was able to queue one or more fragments
*
* A fragment may be matched either when a new receive is posted,
* or on receipt of a fragment from the network. In either case,
* the PML will downcall into the PTL to provide a notification
* that the match was made.
* Initiate an RDMA get request to pull data from the peer. This is initiated
* at the receiver side when a request is matched if the PTL indicates that it
* supports RDMA get semantics.
*/
typedef void (*mca_ptl_base_recv_fn_t)(
typedef int (*mca_ptl_base_get_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* recv_frag
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_recv_request_t* request,
size_t offset,
size_t *size,
int flags
);
/**
@ -387,6 +403,21 @@ typedef bool (*mca_ptl_base_match_fn_t)(
struct mca_ptl_base_match_header_t* header
);
/**
* PML->PTL Notification from the PML to the PTL that a receive has
* been posted and matched against the indicated fragment.
*
* @param ptl (IN) PTL instance
* @param recv_frag Matched fragment
*
*/
typedef void (*mca_ptl_base_matched_fn_t)(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* request
);
/**
* PTL->PML Notification from the PTL to the PML that a fragment
* has completed (e.g. been successfully delivered into users buffer)
@ -424,13 +455,15 @@ struct mca_ptl_t {
uint32_t ptl_exclusivity; /**< indicates this PTL should be used exclusively */
uint32_t ptl_latency; /**< relative ranking of latency used to prioritize ptls */
uint32_t ptl_bandwidth; /**< bandwidth (Mbytes/sec) supported by each endpoint */
uint32_t ptl_flags; /**< flags (put/get...) */
/* PML->PTL function table */
mca_ptl_base_add_proc_fn_t ptl_add_proc;
mca_ptl_base_del_proc_fn_t ptl_del_proc;
mca_ptl_base_finalize_fn_t ptl_finalize;
mca_ptl_base_send_fn_t ptl_send;
mca_ptl_base_recv_fn_t ptl_recv;
mca_ptl_base_put_fn_t ptl_put;
mca_ptl_base_get_fn_t ptl_get;
mca_ptl_base_matched_fn_t ptl_matched;
mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
mca_ptl_base_request_return_fn_t ptl_request_return;

Просмотреть файл

@ -30,11 +30,13 @@ mca_ptl_tcp_t mca_ptl_tcp = {
0, /* ptl_frag_first_size */
0, /* ptl_frag_min_size */
0, /* ptl_frag_max_size */
MCA_PTL_PUT, /* ptl flags */
mca_ptl_tcp_add_proc,
mca_ptl_tcp_del_proc,
mca_ptl_tcp_finalize,
mca_ptl_tcp_send,
mca_ptl_tcp_recv,
NULL,
mca_ptl_tcp_matched,
mca_ptl_tcp_request_alloc,
mca_ptl_tcp_request_return
}
@ -154,20 +156,23 @@ int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t size,
size_t offset,
size_t *size,
int flags)
{
mca_ptl_tcp_send_frag_t* sendfrag;
if (sendreq->req_frags == 0) {
int rc;
if (offset == 0) {
sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag;
} else {
int rc;
lam_list_item_t* item;
LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc);
if(NULL == (sendfrag = (mca_ptl_tcp_send_frag_t*)item))
return rc;
}
mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, size, flags);
rc = mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, offset, size, flags);
if(rc != LAM_SUCCESS)
return rc;
return mca_ptl_tcp_peer_send(ptl_peer, sendfrag);
}
@ -177,7 +182,7 @@ int mca_ptl_tcp_send(
* ack back to the peer and process the fragment.
*/
void mca_ptl_tcp_recv(
void mca_ptl_tcp_matched(
mca_ptl_t* ptl,
mca_ptl_base_recv_frag_t* frag)
{
@ -206,3 +211,4 @@ void mca_ptl_tcp_recv(
mca_ptl_tcp_recv_frag_progress((mca_ptl_tcp_recv_frag_t*)frag);
}

Просмотреть файл

@ -191,7 +191,7 @@ extern void mca_ptl_tcp_request_return(
* @param recv_frag (IN) Receive fragment
*
*/
extern void mca_ptl_tcp_recv(
extern void mca_ptl_tcp_matched(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* frag
);
@ -210,7 +210,8 @@ extern int mca_ptl_tcp_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t*,
size_t size,
size_t offset,
size_t *size,
int flags
);

Просмотреть файл

@ -182,7 +182,6 @@ static int mca_ptl_tcp_create(int if_index, const char* if_name)
{
mca_ptl_tcp_t* ptl = malloc(sizeof(mca_ptl_tcp_t));
char param[256];
char *value;
if(NULL == ptl)
return LAM_ERR_OUT_OF_RESOURCE;
memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp));

Просмотреть файл

@ -118,7 +118,7 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
mca_ptl_base_send_request_t* sendreq;
sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval;
sendreq = sendfrag->super.frag_request;
sendreq->req_peer_request = frag->super.super.frag_header.hdr_ack.hdr_dst_ptr;
sendreq->req_peer_match = frag->super.super.frag_header.hdr_ack.hdr_dst_match;
mca_ptl_tcp_send_frag_progress(sendfrag);
mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
return true;

Просмотреть файл

@ -49,16 +49,20 @@ int mca_ptl_tcp_send_frag_init(
mca_ptl_tcp_send_frag_t* sendfrag,
mca_ptl_base_peer_t* ptl_peer,
mca_ptl_base_send_request_t* sendreq,
size_t size,
size_t offset,
size_t* size,
int flags)
{
/* message header */
size_t size_in = *size;
size_t size_out;
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
if(sendreq->req_frags == 0) {
if(offset == 0) {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
@ -67,21 +71,21 @@ int mca_ptl_tcp_send_frag_init(
hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->super.req_peer;
hdr->hdr_match.hdr_tag = sendreq->super.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_msg;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->super.req_sequence;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_request;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
}
/* initialize convertor */
if(size > 0) {
if(size_in > 0) {
lam_convertor_t *convertor;
int rc;
@ -89,7 +93,7 @@ int mca_ptl_tcp_send_frag_init(
* can use the convertor initialized on the request, remaining fragments
* must copy/reinit the convertor as the transfer could be in parallel.
*/
if( sendreq->req_frags < 2 ) {
if( offset <= mca_ptl_tcp.super.ptl_first_frag_size ) {
convertor = &sendreq->req_convertor;
} else {
@ -101,7 +105,7 @@ int mca_ptl_tcp_send_frag_init(
sendreq->super.req_datatype,
sendreq->super.req_count,
sendreq->super.req_addr,
sendreq->req_offset);
offset);
}
/* if data is contigous convertor will return an offset
@ -109,29 +113,30 @@ int mca_ptl_tcp_send_frag_init(
* that holds the packed data
*/
sendfrag->frag_vec[1].iov_base = NULL;
sendfrag->frag_vec[1].iov_len = size;
sendfrag->frag_vec[1].iov_len = size_in;
if((rc = lam_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) < 0)
return LAM_ERROR;
/* adjust size and request offset to reflect actual number of bytes packed by convertor */
size = sendfrag->frag_vec[1].iov_len;
sendreq->req_offset += size;
size_out = sendfrag->frag_vec[1].iov_len;
} else {
size_out = size_in;
}
hdr->hdr_frag.hdr_frag_length = size;
sendreq->req_frags++;
hdr->hdr_frag.hdr_frag_length = size_out;
/* fragment state */
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->super.frag_request = sendreq;
sendfrag->super.super.frag_addr = sendfrag->frag_vec[1].iov_base;
sendfrag->super.super.frag_size = size;
sendfrag->super.super.frag_size = size_out;
sendfrag->frag_peer = ptl_peer;
sendfrag->frag_vec_ptr = sendfrag->frag_vec;
sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2;
sendfrag->frag_vec_cnt = (size_out == 0) ? 1 : 2;
sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
sendfrag->frag_progressed = 0;
*size = size_out;
return LAM_SUCCESS;
}

Просмотреть файл

@ -44,7 +44,8 @@ int mca_ptl_tcp_send_frag_init(
mca_ptl_tcp_send_frag_t*,
struct mca_ptl_base_peer_t*,
struct mca_ptl_base_send_request_t*,
size_t size,
size_t offset,
size_t* size,
int flags);
@ -96,12 +97,16 @@ static inline void mca_ptl_tcp_send_frag_init_ack(
mca_ptl_tcp_recv_frag_t* frag)
{
mca_ptl_base_header_t* hdr = &ack->super.super.frag_header;
mca_ptl_base_recv_request_t* request = frag->super.frag_request;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
hdr->hdr_ack.hdr_src_ptr = frag->super.super.frag_header.hdr_frag.hdr_src_ptr;
hdr->hdr_ack.hdr_dst_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_ack.hdr_dst_ptr.pval = frag->super.frag_request;
hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_ack.hdr_dst_match.pval = request;
hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_ack.hdr_dst_addr.pval = request->super.req_addr;
hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
ack->super.frag_request = 0;
ack->super.super.frag_peer = ptl_peer;
ack->super.super.frag_owner = ptl;