1
1
This commit was SVN r9192.
Этот коммит содержится в:
Tim Woodall 2006-03-04 00:36:16 +00:00
родитель ca116e3cf3
Коммит 274ee03df6
8 изменённых файлов: 379 добавлений и 372 удалений

Просмотреть файл

@ -85,7 +85,7 @@ inline int32_t ompi_convertor_pack( ompi_convertor_t* pConv,
struct iovec* iov, uint32_t* out_size,
size_t* max_data, int32_t* freeAfter )
{
pConv->checksum = 1;
pConv->checksum = 0;
/* protect against over packing data */
if( pConv->flags & CONVERTOR_COMPLETED ) {
iov[0].iov_len = 0;
@ -106,7 +106,7 @@ inline int32_t ompi_convertor_unpack( ompi_convertor_t* pConv,
struct iovec* iov, uint32_t* out_size,
size_t* max_data, int32_t* freeAfter )
{
pConv->checksum = 1;
pConv->checksum = 0;
/* protect against over unpacking data */
if( pConv->flags & CONVERTOR_COMPLETED ) {
iov[0].iov_len = 0;

Просмотреть файл

@ -31,19 +31,22 @@
#include "opal/types.h"
#define MCA_PML_DR_HDR_TYPE_MATCH 1
#define MCA_PML_DR_HDR_TYPE_RNDV 2
#define MCA_PML_DR_HDR_TYPE_ACK 3
#define MCA_PML_DR_HDR_TYPE_FRAG 4
#define MCA_PML_DR_HDR_TYPE_MATCH 0x01
#define MCA_PML_DR_HDR_TYPE_RNDV 0x02
#define MCA_PML_DR_HDR_TYPE_FRAG 0x04
#define MCA_PML_DR_HDR_TYPE_ACK 0x80
#define MCA_PML_DR_HDR_TYPE_MATCH_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_MATCH)
#define MCA_PML_DR_HDR_TYPE_RNDV_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_RNDV)
#define MCA_PML_DR_HDR_TYPE_FRAG_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_FRAG)
#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */
#define MCA_PML_DR_HDR_FLAGS_MATCHED 2
#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */
#define MCA_PML_DR_HDR_FLAGS_VFRAG 2
#define MCA_PML_DR_HDR_FLAGS_MATCH 4 /* is the ack in response to a match */
#define MCA_PML_DR_HDR_FLAGS_RNDV 8 /* is the ack in response to a rndv */
#define MCA_PML_DR_HDR_FLAGS_BUFFERED 16
/**
* Common hdr attributes - must be first element in each hdr type
*/
struct mca_pml_dr_common_hdr_t {
uint8_t hdr_type; /**< type of envelope */
uint8_t hdr_flags; /**< flags indicating how fragment should be processed */
@ -66,7 +69,7 @@ struct mca_pml_dr_match_hdr_t {
int32_t hdr_src; /**< source rank */
int32_t hdr_tag; /**< user tag */
uint32_t hdr_csum; /**< checksum over data */
ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */
ompi_ptr_t hdr_src_ptr; /**< pointer to source vfrag - returned in ack */
};
typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t;
@ -121,8 +124,8 @@ struct mca_pml_dr_frag_hdr_t {
uint16_t hdr_frag_idx; /**< bit index of this frag w/in vfrag */
uint32_t hdr_frag_csum; /**< checksum over data */
uint64_t hdr_frag_offset; /**< absolute offset of this fragment */
ompi_ptr_t hdr_src_req; /**< pointer to source req */
ompi_ptr_t hdr_dst_req; /**< pointer to receive req */
ompi_ptr_t hdr_src_ptr; /**< pointer to source vfrag */
ompi_ptr_t hdr_dst_ptr; /**< pointer to receive req */
};
typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t;
@ -147,8 +150,8 @@ struct mca_pml_dr_ack_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< virtual fragment id */
uint64_t hdr_vmask; /**< acknowledged frags */
ompi_ptr_t hdr_src_req; /**< source request */
ompi_ptr_t hdr_dst_req; /**< matched receive request */
ompi_ptr_t hdr_src_ptr; /**< source vfrag */
ompi_ptr_t hdr_dst_ptr; /**< matched receive request */
};
typedef struct mca_pml_dr_ack_hdr_t mca_pml_dr_ack_hdr_t;
@ -177,4 +180,23 @@ union mca_pml_dr_hdr_t {
typedef union mca_pml_dr_hdr_t mca_pml_dr_hdr_t;
/**
 * Return the on-wire size (in bytes) of the header for the given
 * header type.
 *
 * @param type  one of the MCA_PML_DR_HDR_TYPE_* values; the *_ACK
 *              combinations all share the ack header layout.
 * @return      sizeof the matching header struct, or 0 for an
 *              unrecognized type (after tripping the debug assert).
 */
static inline size_t mca_pml_dr_hdr_size(uint8_t type)
{
    switch(type) {
    case MCA_PML_DR_HDR_TYPE_MATCH:
        return sizeof(mca_pml_dr_match_hdr_t);
    case MCA_PML_DR_HDR_TYPE_RNDV:
        return sizeof(mca_pml_dr_rendezvous_hdr_t);
    case MCA_PML_DR_HDR_TYPE_FRAG:
        return sizeof(mca_pml_dr_frag_hdr_t);
    case MCA_PML_DR_HDR_TYPE_ACK:
    case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
    case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
    case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
        return sizeof(mca_pml_dr_ack_hdr_t);
    default:
        assert(0);  /* unknown header type - protocol error */
        /* assert(0) is compiled out under NDEBUG; without this return,
         * control would fall off the end of a non-void function (UB). */
        return 0;
    }
}
#endif

Просмотреть файл

@ -52,10 +52,10 @@ OBJ_CLASS_INSTANCE(
);
static void mca_pml_dr_recv_frag_unmatched_ack(
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t flags,
uint8_t mask);
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t flags,
uint8_t mask);
/*
* Release resources.
@ -99,6 +99,15 @@ void mca_pml_dr_recv_frag_callback(
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
{
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) {
assert(0);
return;
}
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV:
{
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t))) {
@ -108,17 +117,13 @@ void mca_pml_dr_recv_frag_callback(
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_ACK:
case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
{
mca_pml_dr_send_request_t* sendreq;
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) {
assert(0);
return;
}
sendreq = (mca_pml_dr_send_request_t*)
hdr->hdr_ack.hdr_src_req.pval;
mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack);
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -128,12 +133,19 @@ void mca_pml_dr_recv_frag_callback(
assert(0);
return;
}
recvreq = (mca_pml_dr_recv_request_t*)
hdr->hdr_frag.hdr_dst_req.pval;
recvreq = hdr->hdr_frag.hdr_dst_ptr.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
{
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) {
assert(0);
return;
}
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
break;
}
default:
return; /* drop it on the floor.. */
break;
@ -523,8 +535,7 @@ rematch:
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return rc;
}
MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag );
}
@ -559,30 +570,17 @@ MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
/* release matching lock before processing fragment */
if(match != NULL) {
mca_pml_dr_recv_request_progress(match,btl,segments,num_segments);
} else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) {
COMPUTE_SPECIFIC_CHECKSUM((void*) (segments->seg_addr.lval +
sizeof(mca_pml_dr_match_hdr_t)),
segments->seg_len - sizeof(mca_pml_dr_match_hdr_t),
csum);
} else {
size_t hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type);
COMPUTE_SPECIFIC_CHECKSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size),
segments->seg_len - hdr_size, csum);
assert(csum == hdr->hdr_csum);
mca_pml_dr_recv_frag_unmatched_ack(hdr,
ompi_proc,
MCA_PML_DR_HDR_FLAGS_MATCH,
csum == hdr->hdr_csum ? 1 : 0);
} else {
COMPUTE_SPECIFIC_CHECKSUM((void*) (segments->seg_addr.lval +
sizeof(mca_pml_dr_rendezvous_hdr_t)),
segments->seg_len - sizeof(mca_pml_dr_rendezvous_hdr_t),
csum);
assert(csum == hdr->hdr_csum);
mca_pml_dr_recv_frag_unmatched_ack(hdr,
ompi_proc,
MCA_PML_DR_HDR_FLAGS_BUFFERED,
hdr->hdr_common.hdr_type,
csum == hdr->hdr_csum ? 1 : 0);
}
if(additional_match) {
opal_list_item_t* item;
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
@ -597,10 +595,10 @@ MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
static void mca_pml_dr_recv_frag_unmatched_ack(
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t flags,
uint8_t mask)
mca_pml_dr_match_hdr_t* hdr,
ompi_proc_t* ompi_proc,
uint8_t type,
uint8_t mask)
{
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
@ -620,13 +618,13 @@ static void mca_pml_dr_recv_frag_unmatched_ack(
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = flags;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vid = hdr->hdr_vid;
ack->hdr_vmask = mask;
ack->hdr_src_req = hdr->hdr_src_req;
assert(ack->hdr_src_req.pval);
ack->hdr_dst_req.pval = NULL;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = NULL;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
@ -829,9 +827,9 @@ void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag)
ack->hdr_common.hdr_flags = 0;
ack->hdr_vmask = 1;
ack->hdr_vid = frag->hdr.hdr_match.hdr_vid;
ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req;
assert(ack->hdr_src_req.pval);
ack->hdr_dst_req.pval = NULL;
ack->hdr_src_ptr = frag->hdr.hdr_match.hdr_src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = NULL;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */

Просмотреть файл

@ -100,14 +100,12 @@ static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request
request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel;
OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t);
OBJ_CONSTRUCT(&request->req_mutex, opal_mutex_t);
}
static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request)
{
OBJ_DESTRUCT(&request->req_vfrag0);
OBJ_DESTRUCT(&request->req_vfrags);
OBJ_DESTRUCT(&request->req_mutex);
}
@ -132,15 +130,13 @@ static void mca_pml_dr_ctl_completion(
MCA_BML_BASE_BTL_DES_RETURN(bml_btl, des);
}
/*
* Generate an ack to the peer after first fragment is matched.
*/
static void mca_pml_dr_recv_request_matched(
static void mca_pml_dr_recv_request_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_match_hdr_t* hdr,
uint8_t flags,
uint8_t mask)
{
ompi_proc_t* proc = recvreq->req_proc;
@ -151,9 +147,6 @@ static void mca_pml_dr_recv_request_matched(
mca_pml_dr_ack_hdr_t* ack;
int rc;
/* mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs + */
/* recvreq->req_recv.req_base.req_peer; */
/* if this hasn't been initialized yet - this is a synchronous send */
if(NULL == proc) {
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(
@ -171,15 +164,15 @@ static void mca_pml_dr_recv_request_matched(
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = flags;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_common.hdr_type;
ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCHED;
ack->hdr_vid = hdr->hdr_vid;
ack->hdr_vmask = mask;
ack->hdr_src_req = hdr->hdr_src_req;
assert(ack->hdr_src_req.pval);
ack->hdr_dst_req.pval = recvreq;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
@ -191,7 +184,6 @@ static void mca_pml_dr_recv_request_matched(
}
/* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */
return;
/* queue request to retry later */
@ -211,7 +203,8 @@ retry:
static void mca_pml_dr_recv_request_vfrag_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_vfrag_t* vfrag)
mca_pml_dr_vfrag_t* vfrag,
mca_pml_dr_frag_hdr_t* hdr)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
@ -220,10 +213,6 @@ static void mca_pml_dr_recv_request_vfrag_ack(
mca_pml_dr_ack_hdr_t* ack;
int rc;
/* mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs + */
/* recvreq->req_recv.req_base.req_peer; */
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
@ -235,13 +224,12 @@ static void mca_pml_dr_recv_request_vfrag_ack(
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_VFRAG;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG_ACK;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vid = vfrag->vf_id;
ack->hdr_vmask = vfrag->vf_ack;
ack->hdr_src_req = recvreq->req_vfrag0.vf_send;
ack->hdr_dst_req.pval = recvreq;
ack->hdr_src_ptr = hdr->hdr_src_ptr;
ack->hdr_dst_ptr.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
@ -285,6 +273,7 @@ void mca_pml_dr_recv_request_progress(
bytes_received -= sizeof(mca_pml_dr_match_hdr_t);
recvreq->req_recv.req_bytes_packed = bytes_received;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
@ -295,14 +284,18 @@ void mca_pml_dr_recv_request_progress(
bytes_received,
bytes_delivered,
csum);
if(csum != hdr->hdr_match.hdr_csum) {
assert(0);
}
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match,
csum == hdr->hdr_match.hdr_csum ? 1 : 0);
break;
case MCA_PML_DR_HDR_TYPE_RNDV:
bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_req;
recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq,
@ -313,15 +306,11 @@ void mca_pml_dr_recv_request_progress(
bytes_received,
bytes_delivered,
csum);
/* mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, */
/* MCA_PML_DR_HDR_TYPE_ACK); */
if(csum != hdr->hdr_match.hdr_csum) {
assert(0);
}
mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_match,
MCA_PML_DR_HDR_FLAGS_RNDV,
csum == hdr->hdr_match.hdr_csum ? 1 : 0);
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match,
csum == hdr->hdr_match.hdr_csum ? 1 : 0);
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -340,9 +329,10 @@ void mca_pml_dr_recv_request_progress(
bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
/* update the mask to show that this vfrag was received,
note that it might still fail the checksum though */
* note that it might still fail the checksum though
*/
vfrag->vf_mask_processed |= bit;
if(csum == hdr->hdr_frag.hdr_frag_csum) {
/* this part of the vfrag passed the checksum,
@ -352,7 +342,7 @@ void mca_pml_dr_recv_request_progress(
if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag);
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag, &hdr->hdr_frag);
}
} else {
bytes_received = bytes_delivered = 0;

Просмотреть файл

@ -75,15 +75,14 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
{
OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&req->req_pending, opal_list_t);
OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);
OBJ_CONSTRUCT(&req->req_mutex, opal_mutex_t);
req->req_vfrag0.vf_len = 1;
req->req_vfrag0.vf_idx = 1;
req->req_vfrag0.vf_ack = 0;
req->req_vfrag0.vf_mask = 1;
req->req_vfrag0.vf_mask_processed = 0;
req->req_vfrag0.sendreq = req;
req->req_vfrag0.vf_send.pval = req;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
@ -94,9 +93,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req)
{
OBJ_DESTRUCT(&req->req_vfrag0);
OBJ_DESTRUCT(&req->req_pending);
OBJ_DESTRUCT(&req->req_retrans);
OBJ_DESTRUCT(&req->req_mutex);
}
@ -106,18 +103,19 @@ OBJ_CLASS_INSTANCE(
mca_pml_dr_send_request_construct,
mca_pml_dr_send_request_destruct);
/**
* Completion of a short message - nothing left to schedule.
*/
void mca_pml_dr_match_completion_cache(
static void mca_pml_dr_match_completion(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata;
mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context;
mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata;
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
/* check completion status */
if(OMPI_SUCCESS != status) {
@ -126,47 +124,32 @@ void mca_pml_dr_match_completion_cache(
orte_errmgr.abort();
}
/* attempt to cache the descriptor */
MCA_BML_BASE_BTL_DES_RETURN( bml_btl, descriptor );
/* signal request completion */
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/**
* Completion of a short message - nothing left to schedule.
*/
/* local completion */
vfrag->vf_mask_processed |= 0x1;
void mca_pml_dr_match_completion_free(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
/* been acked? */
if(vfrag->vf_ack == vfrag->vf_mask) {
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
/* update statistics */
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
/* we don't want to free the descriptor until we get a postive ACK */
/* free the descriptor */
/* mca_bml_base_free( bml_btl, descriptor ); */
/* signal request completion */
/* OPAL_THREAD_LOCK(&ompi_request_lock); */
/* MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); */
/* OPAL_THREAD_UNLOCK(&ompi_request_lock); */
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
* Completion of the first fragment of a long message that
* requires an acknowledgement
*/
static void mca_pml_dr_rndv_completion(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
@ -174,6 +157,9 @@ static void mca_pml_dr_rndv_completion(
int status)
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata;
mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0;
bool schedule = false;
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
@ -181,13 +167,29 @@ static void mca_pml_dr_rndv_completion(
orte_errmgr.abort();
}
/* return the descriptor */
/* mca_bml_base_free(bml_btl, descriptor); */
OPAL_THREAD_LOCK(&ompi_request_lock);
/* advance the request */
/* local completion */
vfrag->vf_mask_processed |= 0x1;
MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq);
/* been acked? */
if(vfrag->vf_ack == vfrag->vf_mask) {
if(sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
schedule = true;
}
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
/* check for pending requests */
MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING();
@ -205,25 +207,13 @@ static void mca_pml_dr_frag_completion(
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) vfrag->sendreq;
mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval;
bool schedule;
bool schedule = false;
uint64_t bit;
uint64_t bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_processed |= bit;
/* when we have local completion of the entire vfrag
we stop the local wdog timers and set our ack timer
as the peer should be sending us an ack for the vfrag
*/
if(vfrag->vf_mask_processed == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* MCA_PML_DR_VFRAG_ACK_START(vfrag); */
} else {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
}
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
@ -231,24 +221,52 @@ static void mca_pml_dr_frag_completion(
orte_errmgr.abort();
}
/* check for request completion */
OPAL_THREAD_LOCK(&ompi_request_lock);
bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_processed |= bit;
/* when we have local completion of the entire vfrag
we stop the local wdog timers and set our ack timer
as the peer should be sending us an ack for the vfrag
*/
if(vfrag->vf_mask_processed == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* MCA_PML_DR_VFRAG_ACK_START(vfrag); */
} else {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
}
/* count bytes of user data actually delivered */
if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 &&
(sendreq->req_send.req_bytes_packed - sendreq->req_send_offset) == 0) {
schedule = false;
} else {
/* update pipeline depth */
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
/* has the vfrag already been acked */
if(vfrag->vf_ack == vfrag->vf_mask) {
/* check to see if we need to schedule the remainder of the message */
sendreq->req_bytes_delivered += vfrag->vf_size;
/* return this vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag);
/* are we done with this request ? */
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
} else if (sendreq->req_send_offset < sendreq->req_send.req_bytes_packed ||
opal_list_get_size(&sendreq->req_retrans)) {
schedule = true;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
/* return the descriptor */
mca_bml_base_free(bml_btl, descriptor);
/* schedule remainder of message? */
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
/* check for pending requests */
MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING();
}
@ -304,7 +322,6 @@ int mca_pml_dr_send_request_start_buffered(
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_mask = 1;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -341,7 +358,7 @@ int mca_pml_dr_send_request_start_buffered(
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = csum;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
@ -421,7 +438,7 @@ int mca_pml_dr_send_request_start_copy(
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
@ -430,10 +447,9 @@ int mca_pml_dr_send_request_start_copy(
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_mask = 1;
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion_free;
descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq;
@ -490,12 +506,12 @@ int mca_pml_dr_send_request_start_prepare(
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; /* nope */
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion_free;
descriptor->des_cbfunc = mca_pml_dr_match_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_cbdata = sendreq;
@ -560,7 +576,7 @@ int mca_pml_dr_send_request_start_rndv(
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
@ -573,7 +589,6 @@ int mca_pml_dr_send_request_start_rndv(
sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
sendreq->req_vfrag0.vf_mask = 1;
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
@ -612,6 +627,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
mca_bml_base_btl_t* bml_btl = NULL;
mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag;
size_t size = bytes_remaining;
/* offset tells us how much of the vfrag has been scheduled */
size_t offset = sendreq->req_send_offset - vfrag->vf_offset;
int rc;
@ -629,7 +645,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
}
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag);
vfrag->bml_btl = bml_btl;
vfrag->sendreq = sendreq;
offset = 0;
} else { /* always schedule the vfrag accross the same btl */
@ -662,6 +677,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = vfrag;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
@ -672,9 +688,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
/* update state */
@ -760,8 +776,8 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv;
hdr->hdr_src_ptr.pval = vfrag;
hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
/* update state */
@ -792,10 +808,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* move from retrans to pending list */
if(vfrag->vf_idx == vfrag->vf_len) {
OPAL_THREAD_LOCK(&sendreq->req_mutex);
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
}
} while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0);
@ -805,103 +820,155 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/**
* Acknowledgment of vfrag
* Acknowledgment of match vfrag.
*/
void mca_pml_dr_send_request_acked(
mca_pml_dr_send_request_t* sendreq,
void mca_pml_dr_send_request_match_ack(
mca_btl_base_module_t* btl,
mca_pml_dr_ack_hdr_t* ack)
{
mca_pml_dr_vfrag_t* vfrag;
assert(sendreq);
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
/* MCA_PML_DR_VFRAG_ACK_STOP(vfrag); */
if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_BUFFERED) {
if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) {
if(sendreq->descriptor) {
mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
}
mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
} else {
assert(0);
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag->vf_ack |= ack->hdr_vmask;
/* need to retransmit? */
if(vfrag->vf_ack != vfrag->vf_mask) {
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);
/* if already have local completion free descriptor and complete message */
} else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* return descriptor */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
}
} else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_RNDV) {
if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) {
sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req;
if(sendreq->descriptor) {
mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
}
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,
vfrag,
sizeof(mca_pml_dr_rendezvous_hdr_t));
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* do NOT complete message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* wait for local completion */
} else {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
}
/**
* Acknowledgment of rndv vfrag.
*/
void mca_pml_dr_send_request_rndv_ack(
mca_btl_base_module_t* btl,
mca_pml_dr_ack_hdr_t* ack)
{
mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_THREAD_LOCK(&ompi_request_lock);
/* need to retransmit? */
if(ack->hdr_vmask != vfrag->vf_mask) {
mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML);
/* acked and local completion */
} else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* return descriptor for the first fragment */
if(NULL != sendreq->descriptor) {
mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor);
sendreq->descriptor = NULL;
}
/* do NOT schedule remainder of message until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
bool schedule = false;
sendreq->req_bytes_delivered = vfrag->vf_size;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq);
vfrag->vf_recv = ack->hdr_dst_ptr;
schedule = true;
}
}
else {
vfrag->vf_idx = 0;
vfrag->vf_ack = 0;
assert(0);
/* retransmit missing fragments */
/* OPAL_THREAD_LOCK(&sendreq->req_mutex); */
/* opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); */
/* OPAL_THREAD_UNLOCK(&sendreq->req_mutex); */
/* mca_pml_dr_send_request_schedule(sendreq); */
}
} else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) {
if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) {
mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor );
sendreq->descriptor = NULL;
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,
vfrag,
sizeof(mca_pml_dr_match_hdr_t));
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* everything was in the match, mark complete */
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
} else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_VFRAG){
/* add in acknowledged fragments */
vfrag->vf_ack |= ack->hdr_vmask;
/* have all fragments w/in this vfrag been acked? */
if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) {
OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,
vfrag,
sizeof(mca_pml_dr_frag_hdr_t));
/* vfrag has been matched at peer */
vfrag->vf_ack = ack->hdr_vmask;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* return vfrag */
if (vfrag != &sendreq->req_vfrag0) {
MCA_PML_DR_VFRAG_RETURN(vfrag);
if(schedule) {
mca_pml_dr_send_request_schedule(sendreq);
}
/* are we done with this request ? */
OPAL_THREAD_LOCK(&ompi_request_lock);
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else {
/* retransmit missing fragments */
/* reset local completion flags to only those that have been
successfully acked */
vfrag->vf_mask_processed = vfrag->vf_ack;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
vfrag->vf_idx = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}
/* wait for local completion */
} else {
/* may need this to schedule rest of the message */
vfrag->vf_recv = ack->hdr_dst_ptr;
/* dont set ack until matched at peer */
if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) {
vfrag->vf_ack = ack->hdr_vmask;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
}
/**
 * Handle an acknowledgment for a (scheduled) vfrag: fold the newly
 * acknowledged fragments into the vfrag's ack mask, queue the vfrag for
 * retransmission if any fragment is still missing, and - once the vfrag
 * is both fully acked and locally complete - account for the delivered
 * bytes and either complete the request or continue scheduling it.
 */
void mca_pml_dr_send_request_frag_ack(
    mca_btl_base_module_t* btl,
    mca_pml_dr_ack_hdr_t* ack)
{
    mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval;
    mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
    bool need_schedule = false;

    OPAL_THREAD_LOCK(&ompi_request_lock);

    /* merge this ack's bitmask into the set of acknowledged fragments */
    vfrag->vf_ack |= ack->hdr_vmask;

    if((vfrag->vf_ack & vfrag->vf_mask) != vfrag->vf_mask) {
        /* not everything was acked: keep only the successfully acked
           fragments marked as processed and queue the vfrag so the
           missing pieces get resent */
        vfrag->vf_mask_processed = vfrag->vf_ack;
        vfrag->vf_idx = 0;
        opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
        need_schedule = true;
    } else if (vfrag->vf_mask_processed == vfrag->vf_mask) {
        /* fully acknowledged AND locally complete - account for the
           delivered bytes and recycle the vfrag */
        sendreq->req_bytes_delivered += vfrag->vf_size;
        MCA_PML_DR_VFRAG_RETURN(vfrag);
        if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
            /* entire message delivered - the request is done */
            MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
        } else if (sendreq->req_send_offset < sendreq->req_send.req_bytes_packed) {
            /* part of the message is still unscheduled */
            need_schedule = true;
        }
    }
    /* else: fully acked but local completion is still outstanding -
       nothing to do until the completion callback fires */

    OPAL_THREAD_UNLOCK(&ompi_request_lock);

    /* invoke the scheduler only after dropping the request lock */
    if(need_schedule) {
        mca_pml_dr_send_request_schedule(sendreq);
    }
}

Просмотреть файл

@ -56,9 +56,7 @@ struct mca_pml_dr_send_request_t {
mca_pml_dr_vfrag_t* req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_pending;
opal_list_t req_retrans;
opal_mutex_t req_mutex;
mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */
};
@ -142,6 +140,8 @@ do {
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \
sendreq->req_endpoint = endpoint; \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_ack = 0; \
sendreq->req_vfrag0.vf_mask_processed = 0; \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
\
/* select a btl */ \
@ -155,8 +155,11 @@ do {
case MCA_PML_BASE_SEND_BUFFERED: \
rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \
break; \
case MCA_PML_BASE_SEND_COMPLETE: \
rc = mca_pml_dr_send_request_start_prepare(sendreq, bml_btl, size); \
break; \
default: \
if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \
if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \
rc = mca_pml_dr_send_request_start_prepare(sendreq, bml_btl, size); \
} else { \
rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \
@ -223,31 +226,6 @@ do {
} while (0)
/*
* Advance a pending send request. Note that the initial descriptor must complete
* and the acknowledment received before the request can complete or be scheduled.
* However, these events may occur in either order.
*/
#define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \
do { \
bool schedule = false; \
\
/* has an acknowledgment been received */ \
if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \
OPAL_THREAD_LOCK(&ompi_request_lock); \
if(sendreq->req_bytes_delivered != sendreq->req_send.req_bytes_packed) { \
schedule = true; \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} \
\
/* additional data to schedule */ \
if(schedule == true) { \
mca_pml_dr_send_request_schedule(sendreq); \
} \
} while (0)
/*
* Release resources associated with a request
*/
@ -291,46 +269,17 @@ do {
vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \
} \
\
vfrag->vf_mask_processed = 0; \
vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
vfrag->vf_ack = 0; \
vfrag->vf_offset = sendreq->req_send_offset; \
vfrag->vf_ack = 0; \
vfrag->vf_idx = 0; \
vfrag->vf_mask_processed = 0; \
vfrag->vf_max_send_size = max_send_size; \
opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); \
vfrag->vf_send.pval = sendreq; \
sendreq->req_vfrag = vfrag; \
} while(0)
/*
*
*/
#define MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq,hdr,vfrag) \
do { \
if ((hdr)->hdr_vid == (sendreq)->req_vfrag0.vf_id) { \
vfrag = &(sendreq)->req_vfrag0; \
} else if((sendreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \
vfrag = (sendreq)->req_vfrag; \
opal_list_remove_item(&(sendreq)->req_pending,(opal_list_item_t*)vfrag); \
} else { \
opal_list_item_t* item; \
vfrag = NULL; \
OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \
for(item = opal_list_get_first(&(sendreq)->req_pending); \
item != opal_list_get_end(&(sendreq)->req_pending); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
opal_list_remove_item(&(sendreq)->req_pending,item); \
vfrag = vf; \
break; \
} \
} \
OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \
} \
} while(0)
/*
*
*/
@ -339,7 +288,6 @@ do {
do { \
opal_list_item_t* item; \
vfrag = NULL; \
OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \
for(item = opal_list_get_first(&(sendreq)->req_retrans); \
item != opal_list_get_end(&(sendreq)->req_retrans); \
item = opal_list_get_next(item)) { \
@ -349,7 +297,6 @@ do { \
break; \
} \
} \
OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \
} while(0)
@ -418,35 +365,19 @@ int mca_pml_dr_send_request_reschedule(
mca_pml_dr_vfrag_t* vfrag);
/**
* Acknowledgment of vfrag
* Acknowledgment of vfrags
*/
void mca_pml_dr_send_request_acked(
mca_pml_dr_send_request_t* sendreq,
void mca_pml_dr_send_request_match_ack(
mca_btl_base_module_t* btl,
mca_pml_dr_ack_hdr_t*);
void mca_pml_dr_send_request_nacked(
mca_pml_dr_send_request_t* sendreq,
void mca_pml_dr_send_request_rndv_ack(
mca_btl_base_module_t* btl,
mca_pml_dr_ack_hdr_t*);
/**
* Completion callback on match header
* Cache descriptor.
*/
void mca_pml_dr_match_completion_cache(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* descriptor,
int status);
/**
* Completion callback on match header
* Free descriptor.
*/
void mca_pml_dr_match_completion_free(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* descriptor,
int status);
void mca_pml_dr_send_request_frag_ack(
mca_btl_base_module_t* btl,
mca_pml_dr_ack_hdr_t*);
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -19,8 +19,8 @@
#include "ompi_config.h"
#include "pml_dr_vfrag.h"
#include "pml_dr_sendreq.h"
void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* vfrag);
void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* vfrag);
void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag);
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag);
static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
{
@ -38,8 +38,8 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec;
vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec;
vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec;
opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_send_request_wdog_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_send_request_ack_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag);
}
@ -60,30 +60,29 @@ OBJ_CLASS_INSTANCE(
/**
* The wdog timer expired, better do something about it, like resend the current part of the vfrag
*/
void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* data) {
void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
{
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->sendreq;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag->vf_idx = 0;
opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
}
/**
* The ack timer expired, better do something about it, like resend the entire vfrag?
*/
void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->sendreq;
void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = data;
mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval;
/* reset it all, so it will all retransmit */
vfrag->vf_ack = vfrag->vf_mask_processed = 0;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
OPAL_THREAD_LOCK(&ompi_request_lock);
vfrag->vf_idx = 0;
opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_schedule(sendreq);
}

Просмотреть файл

@ -43,8 +43,8 @@ struct mca_pml_dr_vfrag_t {
uint64_t vf_ack;
uint64_t vf_mask;
uint64_t vf_mask_processed;
struct mca_pml_dr_send_request_t* sendreq;
struct mca_bml_base_btl_t* bml_btl;
/* we need a timer for the vfrag for:
1) a watchdog timer for local completion of the current
operation