diff --git a/ompi/datatype/convertor.c b/ompi/datatype/convertor.c index 748b7941a9..9dbb71c424 100644 --- a/ompi/datatype/convertor.c +++ b/ompi/datatype/convertor.c @@ -85,7 +85,7 @@ inline int32_t ompi_convertor_pack( ompi_convertor_t* pConv, struct iovec* iov, uint32_t* out_size, size_t* max_data, int32_t* freeAfter ) { - pConv->checksum = 1; + pConv->checksum = 0; /* protect against over packing data */ if( pConv->flags & CONVERTOR_COMPLETED ) { iov[0].iov_len = 0; @@ -106,7 +106,7 @@ inline int32_t ompi_convertor_unpack( ompi_convertor_t* pConv, struct iovec* iov, uint32_t* out_size, size_t* max_data, int32_t* freeAfter ) { - pConv->checksum = 1; + pConv->checksum = 0; /* protect against over unpacking data */ if( pConv->flags & CONVERTOR_COMPLETED ) { iov[0].iov_len = 0; diff --git a/ompi/mca/pml/dr/pml_dr_hdr.h b/ompi/mca/pml/dr/pml_dr_hdr.h index 3b3412436d..8f112f757f 100644 --- a/ompi/mca/pml/dr/pml_dr_hdr.h +++ b/ompi/mca/pml/dr/pml_dr_hdr.h @@ -31,19 +31,22 @@ #include "opal/types.h" -#define MCA_PML_DR_HDR_TYPE_MATCH 1 -#define MCA_PML_DR_HDR_TYPE_RNDV 2 -#define MCA_PML_DR_HDR_TYPE_ACK 3 -#define MCA_PML_DR_HDR_TYPE_FRAG 4 +#define MCA_PML_DR_HDR_TYPE_MATCH 0x01 +#define MCA_PML_DR_HDR_TYPE_RNDV 0x02 +#define MCA_PML_DR_HDR_TYPE_FRAG 0x04 +#define MCA_PML_DR_HDR_TYPE_ACK 0x80 + +#define MCA_PML_DR_HDR_TYPE_MATCH_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_MATCH) +#define MCA_PML_DR_HDR_TYPE_RNDV_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_RNDV) +#define MCA_PML_DR_HDR_TYPE_FRAG_ACK (MCA_PML_DR_HDR_TYPE_ACK | MCA_PML_DR_HDR_TYPE_FRAG) + +#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */ +#define MCA_PML_DR_HDR_FLAGS_MATCHED 2 -#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */ -#define MCA_PML_DR_HDR_FLAGS_VFRAG 2 -#define MCA_PML_DR_HDR_FLAGS_MATCH 4 /* is the ack in response to a match */ -#define MCA_PML_DR_HDR_FLAGS_RNDV 8 /* is the ack in response to a rndv */ -#define MCA_PML_DR_HDR_FLAGS_BUFFERED 16 /** * Common hdr attributes - must be first element in each hdr type */ + struct mca_pml_dr_common_hdr_t { uint8_t hdr_type; /**< type of envelope */ uint8_t hdr_flags; /**< flags indicating how fragment should be processed */ @@ -66,7 +69,7 @@ struct mca_pml_dr_match_hdr_t { int32_t hdr_src; /**< source rank */ int32_t hdr_tag; /**< user tag */ uint32_t hdr_csum; /**< checksum over data */ - ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */ + ompi_ptr_t hdr_src_ptr; /**< pointer to source vfrag - returned in ack */ }; typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t; @@ -121,8 +124,8 @@ struct mca_pml_dr_frag_hdr_t { uint16_t hdr_frag_idx; /**< bit index of this frag w/in vfrag */ uint32_t hdr_frag_csum; /**< checksum over data */ uint64_t hdr_frag_offset; /**< absolute offset of this fragment */ - ompi_ptr_t hdr_src_req; /**< pointer to source req */ - ompi_ptr_t hdr_dst_req; /**< pointer to receive req */ + ompi_ptr_t hdr_src_ptr; /**< pointer to source vfrag */ + ompi_ptr_t hdr_dst_ptr; /**< pointer to receive req */ }; typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t; @@ -147,8 +150,8 @@ struct mca_pml_dr_ack_hdr_t { mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ uint32_t hdr_vid; /**< virtual fragment id */ uint64_t hdr_vmask; /**< acknowledged frags */ - ompi_ptr_t hdr_src_req; /**< source request */ - ompi_ptr_t hdr_dst_req; /**< matched receive request */ + ompi_ptr_t hdr_src_ptr; /**< source vfrag */ + ompi_ptr_t hdr_dst_ptr; /**< matched receive request */ }; typedef struct mca_pml_dr_ack_hdr_t mca_pml_dr_ack_hdr_t; @@ -177,4 +180,23 @@ union mca_pml_dr_hdr_t { typedef union mca_pml_dr_hdr_t mca_pml_dr_hdr_t; +static inline size_t mca_pml_dr_hdr_size(uint8_t type) +{ + switch(type) { + case MCA_PML_DR_HDR_TYPE_MATCH: + return sizeof(mca_pml_dr_match_hdr_t); + case MCA_PML_DR_HDR_TYPE_RNDV: + return sizeof(mca_pml_dr_rendezvous_hdr_t); + case MCA_PML_DR_HDR_TYPE_FRAG: + return sizeof(mca_pml_dr_frag_hdr_t); + case MCA_PML_DR_HDR_TYPE_ACK: + case MCA_PML_DR_HDR_TYPE_MATCH_ACK: + case MCA_PML_DR_HDR_TYPE_RNDV_ACK: + case MCA_PML_DR_HDR_TYPE_FRAG_ACK: + return sizeof(mca_pml_dr_ack_hdr_t); + default: + assert(0); + } +} + #endif diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index a93a3671a2..ab0e694f87 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -52,10 +52,10 @@ OBJ_CLASS_INSTANCE( ); static void mca_pml_dr_recv_frag_unmatched_ack( - mca_pml_dr_match_hdr_t* hdr, - ompi_proc_t* ompi_proc, - uint8_t flags, - uint8_t mask); + mca_pml_dr_match_hdr_t* hdr, + ompi_proc_t* ompi_proc, + uint8_t flags, + uint8_t mask); /* * Release resources. @@ -99,6 +99,15 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt); break; } + case MCA_PML_DR_HDR_TYPE_MATCH_ACK: + { + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) { + assert(0); + return; + } + mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack); + break; + } case MCA_PML_DR_HDR_TYPE_RNDV: { if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t))) { @@ -108,17 +117,13 @@ void mca_pml_dr_recv_frag_callback( mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt); break; } - case MCA_PML_DR_HDR_TYPE_ACK: + case MCA_PML_DR_HDR_TYPE_RNDV_ACK: { - mca_pml_dr_send_request_t* sendreq; if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) { assert(0); return; } - sendreq = (mca_pml_dr_send_request_t*) - hdr->hdr_ack.hdr_src_req.pval; - - mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack); + mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack); break; } case MCA_PML_DR_HDR_TYPE_FRAG: @@ -128,12 +133,19 @@ void mca_pml_dr_recv_frag_callback( assert(0); return; } - recvreq = (mca_pml_dr_recv_request_t*) - hdr->hdr_frag.hdr_dst_req.pval; - + recvreq = hdr->hdr_frag.hdr_dst_ptr.pval; mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt); break; } + case MCA_PML_DR_HDR_TYPE_FRAG_ACK: + { + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) { + assert(0); + return; + } + mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack); + break; + } default: return; /* drop it on the floor.. */ break; @@ -523,8 +535,7 @@ rematch: OPAL_THREAD_UNLOCK(&comm->matching_lock); return rc; } - -MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); + MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag ); } @@ -559,30 +570,17 @@ MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); /* release matching lock before processing fragment */ if(match != NULL) { mca_pml_dr_recv_request_progress(match,btl,segments,num_segments); - } else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) { - COMPUTE_SPECIFIC_CHECKSUM((void*) (segments->seg_addr.lval + - sizeof(mca_pml_dr_match_hdr_t)), - segments->seg_len - sizeof(mca_pml_dr_match_hdr_t), - csum); + } else { + size_t hdr_size = mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type); + COMPUTE_SPECIFIC_CHECKSUM((void*)((unsigned char*)segments->seg_addr.pval + hdr_size), + segments->seg_len - hdr_size, csum); assert(csum == hdr->hdr_csum); - mca_pml_dr_recv_frag_unmatched_ack(hdr, ompi_proc, - MCA_PML_DR_HDR_FLAGS_MATCH, - csum == hdr->hdr_csum ? 1 : 0); - - } else { - COMPUTE_SPECIFIC_CHECKSUM((void*) (segments->seg_addr.lval + - sizeof(mca_pml_dr_rendezvous_hdr_t)), - segments->seg_len - sizeof(mca_pml_dr_rendezvous_hdr_t), - csum); - assert(csum == hdr->hdr_csum); - mca_pml_dr_recv_frag_unmatched_ack(hdr, - ompi_proc, - MCA_PML_DR_HDR_FLAGS_BUFFERED, + hdr->hdr_common.hdr_type, csum == hdr->hdr_csum ? 1 : 0); } - + if(additional_match) { opal_list_item_t* item; while(NULL != (item = opal_list_remove_first(&additional_matches))) { @@ -597,10 +595,10 @@ MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); static void mca_pml_dr_recv_frag_unmatched_ack( - mca_pml_dr_match_hdr_t* hdr, - ompi_proc_t* ompi_proc, - uint8_t flags, - uint8_t mask) + mca_pml_dr_match_hdr_t* hdr, + ompi_proc_t* ompi_proc, + uint8_t type, + uint8_t mask) { mca_bml_base_endpoint_t* bml_endpoint = NULL; mca_btl_base_descriptor_t* des; @@ -620,13 +618,13 @@ static void mca_pml_dr_recv_frag_unmatched_ack( /* fill out header */ ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; - ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = flags; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | type; + ack->hdr_common.hdr_flags = 0; ack->hdr_vid = hdr->hdr_vid; ack->hdr_vmask = mask; - ack->hdr_src_req = hdr->hdr_src_req; - assert(ack->hdr_src_req.pval); - ack->hdr_dst_req.pval = NULL; + ack->hdr_src_ptr = hdr->hdr_src_ptr; + assert(ack->hdr_src_ptr.pval); + ack->hdr_dst_ptr.pval = NULL; ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); /* initialize descriptor */ @@ -829,9 +827,9 @@ void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag) ack->hdr_common.hdr_flags = 0; ack->hdr_vmask = 1; ack->hdr_vid = frag->hdr.hdr_match.hdr_vid; - ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req; - assert(ack->hdr_src_req.pval); - ack->hdr_dst_req.pval = NULL; + ack->hdr_src_ptr = frag->hdr.hdr_match.hdr_src_ptr; + assert(ack->hdr_src_ptr.pval); + ack->hdr_dst_ptr.pval = NULL; ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); /* initialize descriptor */ diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index 413f35285f..cb8d5af237 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -100,14 +100,12 @@ static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel; OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t); OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t); - OBJ_CONSTRUCT(&request->req_mutex, opal_mutex_t); } static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request) { OBJ_DESTRUCT(&request->req_vfrag0); OBJ_DESTRUCT(&request->req_vfrags); - OBJ_DESTRUCT(&request->req_mutex); } @@ -132,15 +130,13 @@ static void mca_pml_dr_ctl_completion( MCA_BML_BASE_BTL_DES_RETURN(bml_btl, des); } - /* * Generate an ack to the peer after first fragment is matched. */ -static void mca_pml_dr_recv_request_matched( +static void mca_pml_dr_recv_request_ack( mca_pml_dr_recv_request_t* recvreq, mca_pml_dr_match_hdr_t* hdr, - uint8_t flags, uint8_t mask) { ompi_proc_t* proc = recvreq->req_proc; @@ -151,9 +147,6 @@ static void mca_pml_dr_recv_request_matched( mca_pml_dr_ack_hdr_t* ack; int rc; - /* mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs + */ -/* recvreq->req_recv.req_base.req_peer; */ - /* if this hasn't been initialized yet - this is a synchronous send */ if(NULL == proc) { ompi_proc_t *ompi_proc = ompi_comm_peer_lookup( @@ -171,15 +164,15 @@ static void mca_pml_dr_recv_request_matched( /* fill out header */ ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; - ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = flags; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_common.hdr_type; + ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCHED; ack->hdr_vid = hdr->hdr_vid; ack->hdr_vmask = mask; - ack->hdr_src_req = hdr->hdr_src_req; - assert(ack->hdr_src_req.pval); - ack->hdr_dst_req.pval = recvreq; + ack->hdr_src_ptr = hdr->hdr_src_ptr; + assert(ack->hdr_src_ptr.pval); + ack->hdr_dst_ptr.pval = recvreq; ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); - + /* initialize descriptor */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbfunc = mca_pml_dr_ctl_completion; @@ -191,7 +184,6 @@ static void mca_pml_dr_recv_request_matched( } /* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */ - return; /* queue request to retry later */ @@ -211,7 +203,8 @@ retry: static void mca_pml_dr_recv_request_vfrag_ack( mca_pml_dr_recv_request_t* recvreq, - mca_pml_dr_vfrag_t* vfrag) + mca_pml_dr_vfrag_t* vfrag, + mca_pml_dr_frag_hdr_t* hdr) { ompi_proc_t* proc = recvreq->req_proc; mca_bml_base_endpoint_t* bml_endpoint = NULL; @@ -220,10 +213,6 @@ static void mca_pml_dr_recv_request_vfrag_ack( mca_pml_dr_ack_hdr_t* ack; int rc; - - /* mca_pml_dr_comm_proc_t* comm_proc = recvreq->req_recv.req_base.req_comm->c_pml_comm->procs + */ -/* recvreq->req_recv.req_base.req_peer; */ - bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml; bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); @@ -235,13 +224,12 @@ static void mca_pml_dr_recv_request_vfrag_ack( /* fill out header */ ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; - - ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_VFRAG; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG_ACK; + ack->hdr_common.hdr_flags = 0; ack->hdr_vid = vfrag->vf_id; ack->hdr_vmask = vfrag->vf_ack; - ack->hdr_src_req = recvreq->req_vfrag0.vf_send; - ack->hdr_dst_req.pval = recvreq; + ack->hdr_src_ptr = hdr->hdr_src_ptr; + ack->hdr_dst_ptr.pval = recvreq; ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); /* initialize descriptor */ @@ -285,6 +273,7 @@ void mca_pml_dr_recv_request_progress( bytes_received -= sizeof(mca_pml_dr_match_hdr_t); recvreq->req_recv.req_bytes_packed = bytes_received; + recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr; MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); MCA_PML_DR_RECV_REQUEST_UNPACK( recvreq, @@ -295,14 +284,18 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); - + if(csum != hdr->hdr_match.hdr_csum) { + assert(0); + } + mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match, + csum == hdr->hdr_match.hdr_csum ? 1 : 0); break; case MCA_PML_DR_HDR_TYPE_RNDV: bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t); recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; - recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_req; + recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_ptr; MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); MCA_PML_DR_RECV_REQUEST_UNPACK( recvreq, @@ -313,15 +306,11 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); - /* mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, */ -/* MCA_PML_DR_HDR_TYPE_ACK); */ if(csum != hdr->hdr_match.hdr_csum) { assert(0); } - - mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_match, - MCA_PML_DR_HDR_FLAGS_RNDV, - csum == hdr->hdr_match.hdr_csum ? 1 : 0); + mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_match, + csum == hdr->hdr_match.hdr_csum ? 1 : 0); break; case MCA_PML_DR_HDR_TYPE_FRAG: @@ -340,9 +329,10 @@ void mca_pml_dr_recv_request_progress( bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx); MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag); - + /* update the mask to show that this vfrag was received, - note that it might still fail the checksum though */ + * note that it might still fail the checksum though + */ vfrag->vf_mask_processed |= bit; if(csum == hdr->hdr_frag.hdr_frag_csum) { /* this part of the vfrag passed the checksum, @@ -352,7 +342,7 @@ void mca_pml_dr_recv_request_progress( if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { /* we have received all the pieces of the vfrag, ack everything that passed the checksum */ - mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag); + mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag, &hdr->hdr_frag); } } else { bytes_received = bytes_delivered = 0; diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index ea2187ab9a..c1d38997b9 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -75,15 +75,14 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) { OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t); - OBJ_CONSTRUCT(&req->req_pending, opal_list_t); OBJ_CONSTRUCT(&req->req_retrans, opal_list_t); - OBJ_CONSTRUCT(&req->req_mutex, opal_mutex_t); req->req_vfrag0.vf_len = 1; req->req_vfrag0.vf_idx = 1; + req->req_vfrag0.vf_ack = 0; req->req_vfrag0.vf_mask = 1; req->req_vfrag0.vf_mask_processed = 0; - req->req_vfrag0.sendreq = req; + req->req_vfrag0.vf_send.pval = req; req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini; req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free; @@ -94,9 +93,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req) { OBJ_DESTRUCT(&req->req_vfrag0); - OBJ_DESTRUCT(&req->req_pending); OBJ_DESTRUCT(&req->req_retrans); - OBJ_DESTRUCT(&req->req_mutex); } @@ -106,18 +103,19 @@ OBJ_CLASS_INSTANCE( mca_pml_dr_send_request_construct, mca_pml_dr_send_request_destruct); + /** * Completion of a short message - nothing left to schedule. */ -void mca_pml_dr_match_completion_cache( +static void mca_pml_dr_match_completion( struct mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* ep, struct mca_btl_base_descriptor_t* descriptor, int status) { - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata; - mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context; + mca_pml_dr_send_request_t* sendreq = descriptor->des_cbdata; + mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; /* check completion status */ if(OMPI_SUCCESS != status) { @@ -126,47 +124,32 @@ void mca_pml_dr_match_completion_cache( orte_errmgr.abort(); } - /* attempt to cache the descriptor */ - MCA_BML_BASE_BTL_DES_RETURN( bml_btl, descriptor ); - - /* signal request completion */ OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); -} -/** - * Completion of a short message - nothing left to schedule. - */ + /* local completion */ + vfrag->vf_mask_processed |= 0x1; -void mca_pml_dr_match_completion_free( - struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t* ep, - struct mca_btl_base_descriptor_t* descriptor, - int status) -{ - - /* check completion status */ - if(OMPI_SUCCESS != status) { - /* TSW - FIX */ - opal_output(0, "%s:%d FATAL", __FILE__, __LINE__); - orte_errmgr.abort(); + /* been acked? */ + if(vfrag->vf_ack == vfrag->vf_mask) { + + /* return descriptor */ + if(NULL != sendreq->descriptor) { + mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); + sendreq->descriptor = NULL; + } + + /* update statistics */ + sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } - - /* we don't want to free the descriptor until we get a postive ACK */ - /* free the descriptor */ - /* mca_bml_base_free( bml_btl, descriptor ); */ - - /* signal request completion */ - /* OPAL_THREAD_LOCK(&ompi_request_lock); */ -/* MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); */ -/* OPAL_THREAD_UNLOCK(&ompi_request_lock); */ + OPAL_THREAD_UNLOCK(&ompi_request_lock); } /* * Completion of the first fragment of a long message that * requires an acknowledgement */ + static void mca_pml_dr_rndv_completion( mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* ep, @@ -174,6 +157,9 @@ static void mca_pml_dr_rndv_completion( int status) { mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata; + mca_pml_dr_vfrag_t* vfrag = &sendreq->req_vfrag0; + bool schedule = false; + /* check completion status */ if(OMPI_SUCCESS != status) { /* TSW - FIX */ @@ -181,13 +167,29 @@ static void mca_pml_dr_rndv_completion( orte_errmgr.abort(); } - - /* return the descriptor */ - /* mca_bml_base_free(bml_btl, descriptor); */ + OPAL_THREAD_LOCK(&ompi_request_lock); - /* advance the request */ + /* local completion */ + vfrag->vf_mask_processed |= 0x1; - MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + /* been acked? */ + if(vfrag->vf_ack == vfrag->vf_mask) { + + if(sendreq->descriptor) { + mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); + sendreq->descriptor = NULL; + } + sendreq->req_bytes_delivered = vfrag->vf_size; + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } else { + schedule = true; + } + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + if(schedule) { + mca_pml_dr_send_request_schedule(sendreq); + } /* check for pending requests */ MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING(); @@ -205,25 +207,13 @@ static void mca_pml_dr_frag_completion( struct mca_btl_base_descriptor_t* descriptor, int status) { - mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) descriptor->des_cbdata; - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) vfrag->sendreq; + mca_pml_dr_vfrag_t* vfrag = descriptor->des_cbdata; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval; - bool schedule; + bool schedule = false; + uint64_t bit; - uint64_t bit = ((uint64_t)1 << hdr->hdr_frag_idx); - vfrag->vf_mask_processed |= bit; - - /* when we have local completion of the entire vfrag - we stop the local wdog timers and set our ack timer - as the peer should be sending us an ack for the vfrag - */ - if(vfrag->vf_mask_processed == vfrag->vf_mask) { - MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); - /* MCA_PML_DR_VFRAG_ACK_START(vfrag); */ - } else { - MCA_PML_DR_VFRAG_WDOG_RESET(vfrag); - } /* check completion status */ if(OMPI_SUCCESS != status) { /* TSW - FIX */ @@ -231,24 +221,52 @@ static void mca_pml_dr_frag_completion( orte_errmgr.abort(); } - /* check for request completion */ OPAL_THREAD_LOCK(&ompi_request_lock); + bit = ((uint64_t)1 << hdr->hdr_frag_idx); + vfrag->vf_mask_processed |= bit; + + /* when we have local completion of the entire vfrag + we stop the local wdog timers and set our ack timer + as the peer should be sending us an ack for the vfrag + */ + if(vfrag->vf_mask_processed == vfrag->vf_mask) { + MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); + /* MCA_PML_DR_VFRAG_ACK_START(vfrag); */ + } else { + MCA_PML_DR_VFRAG_WDOG_RESET(vfrag); + } - /* count bytes of user data actually delivered */ - if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 && - (sendreq->req_send.req_bytes_packed - sendreq->req_send_offset) == 0) { - schedule = false; - } else { + /* update pipeline depth */ + OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); + + /* has the vfrag already been acked */ + if(vfrag->vf_ack == vfrag->vf_mask) { + + /* check to see if we need to schedule the remainder of the message */ + sendreq->req_bytes_delivered += vfrag->vf_size; + + /* return this vfrag */ + MCA_PML_DR_VFRAG_RETURN(vfrag); + + /* are we done with this request ? */ + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } + + } else if (sendreq->req_send_offset < sendreq->req_send.req_bytes_packed || + opal_list_get_size(&sendreq->req_retrans)) { schedule = true; } OPAL_THREAD_UNLOCK(&ompi_request_lock); - if(schedule) { - mca_pml_dr_send_request_schedule(sendreq); - } /* return the descriptor */ mca_bml_base_free(bml_btl, descriptor); + /* schedule remainder of message? */ + if(schedule) { + mca_pml_dr_send_request_schedule(sendreq); + } + /* check for pending requests */ MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING(); } @@ -304,7 +322,6 @@ int mca_pml_dr_send_request_start_buffered( sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.bml_btl = bml_btl; - sendreq->req_vfrag0.vf_mask = 1; descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -341,7 +358,7 @@ int mca_pml_dr_send_request_start_buffered( hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = csum; - hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); @@ -421,7 +438,7 @@ int mca_pml_dr_send_request_start_copy( hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; - hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); @@ -430,10 +447,9 @@ int mca_pml_dr_send_request_start_copy( sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.bml_btl = bml_btl; - sendreq->req_vfrag0.vf_mask = 1; /* short message */ - descriptor->des_cbfunc = mca_pml_dr_match_completion_free; + descriptor->des_cbfunc = mca_pml_dr_match_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_cbdata = sendreq; @@ -490,12 +506,12 @@ int mca_pml_dr_send_request_start_prepare( hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; /* nope */ - hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); /* short message */ - descriptor->des_cbfunc = mca_pml_dr_match_completion_free; + descriptor->des_cbfunc = mca_pml_dr_match_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_cbdata = sendreq; @@ -560,7 +576,7 @@ int mca_pml_dr_send_request_start_rndv( hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; - hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; @@ -573,7 +589,6 @@ int mca_pml_dr_send_request_start_rndv( sendreq->req_send_offset = size; sendreq->req_vfrag0.vf_size = size; sendreq->req_vfrag0.bml_btl = bml_btl; - sendreq->req_vfrag0.vf_mask = 1; /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); @@ -612,6 +627,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) mca_bml_base_btl_t* bml_btl = NULL; mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag; size_t size = bytes_remaining; + /* offset tells us how much of the vfrag has been scheduled */ size_t offset = sendreq->req_send_offset - vfrag->vf_offset; int rc; @@ -629,7 +645,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) } MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag); vfrag->bml_btl = bml_btl; - vfrag->sendreq = sendreq; offset = 0; } else { /* always schedule the vfrag accross the same btl */ @@ -662,6 +677,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) } des->des_cbfunc = mca_pml_dr_frag_completion; des->des_cbdata = vfrag; + /* setup header */ hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; @@ -672,9 +688,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_frag_idx = vfrag->vf_idx; hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; hdr->hdr_frag_offset = sendreq->req_send_offset; - hdr->hdr_src_req.pval = sendreq; + hdr->hdr_src_ptr.pval = vfrag; - hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv; + hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); /* update state */ @@ -760,8 +776,8 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_frag_idx = vfrag->vf_idx; hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; hdr->hdr_frag_offset = sendreq->req_send_offset; - hdr->hdr_src_req.pval = sendreq; - hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv; + hdr->hdr_src_ptr.pval = vfrag; + hdr->hdr_dst_ptr = sendreq->req_vfrag0.vf_recv; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); /* update state */ @@ -792,10 +808,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* move from retrans to pending list */ if(vfrag->vf_idx == vfrag->vf_len) { - OPAL_THREAD_LOCK(&sendreq->req_mutex); + OPAL_THREAD_LOCK(&ompi_request_lock); opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + OPAL_THREAD_UNLOCK(&ompi_request_lock); } } } while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0); @@ -805,103 +820,155 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /** - * Acknowledgment of vfrag + * Acknowledgment of match vfrag. */ -void mca_pml_dr_send_request_acked( - mca_pml_dr_send_request_t* sendreq, + +void mca_pml_dr_send_request_match_ack( + mca_btl_base_module_t* btl, mca_pml_dr_ack_hdr_t* ack) { - mca_pml_dr_vfrag_t* vfrag; - assert(sendreq); - - MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); - /* MCA_PML_DR_VFRAG_ACK_STOP(vfrag); */ - - if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_BUFFERED) { - if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { - if(sendreq->descriptor) { - mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor ); - sendreq->descriptor = NULL; - } + mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; - } else { - assert(0); + OPAL_THREAD_LOCK(&ompi_request_lock); + vfrag->vf_ack |= ack->hdr_vmask; + + /* need to retransmit? */ + if(vfrag->vf_ack != vfrag->vf_mask) { + + mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context; + OPAL_THREAD_UNLOCK(&ompi_request_lock); + mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML); + + /* if already have local completion free descriptor and complete message */ + } else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { + + /* return descriptor */ + if(NULL != sendreq->descriptor) { + mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor ); + sendreq->descriptor = NULL; } - } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_RNDV) { - if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { - sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req; - if(sendreq->descriptor) { - mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor ); - sendreq->descriptor = NULL; - } - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, - vfrag, - sizeof(mca_pml_dr_rendezvous_hdr_t)); - - OPAL_THREAD_UNLOCK(&ompi_request_lock); + + /* do NOT complete message until matched at peer */ + if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { + /* update statistics */ + sendreq->req_bytes_delivered = vfrag->vf_size; + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + + /* wait for local completion */ + } else { + OPAL_THREAD_UNLOCK(&ompi_request_lock); + } +} + +/** + * Acknowledgment of rndv vfrag. + */ + +void mca_pml_dr_send_request_rndv_ack( + mca_btl_base_module_t* btl, + mca_pml_dr_ack_hdr_t* ack) +{ + mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + + OPAL_THREAD_LOCK(&ompi_request_lock); + + /* need to retransmit? */ + if(ack->hdr_vmask != vfrag->vf_mask) { + + mca_bml_base_btl_t* bml_btl = sendreq->descriptor->des_context; + OPAL_THREAD_UNLOCK(&ompi_request_lock); + mca_bml_base_send(bml_btl, sendreq->descriptor, MCA_BTL_TAG_PML); + + /* acked and local completion */ + } else if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { + + /* return descriptor for the first fragment */ + if(NULL != sendreq->descriptor) { + mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); + sendreq->descriptor = NULL; + } + + /* do NOT schedule remainder of message until matched at peer */ + if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { + bool schedule = false; + sendreq->req_bytes_delivered = vfrag->vf_size; if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } else { - MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + vfrag->vf_recv = ack->hdr_dst_ptr; + schedule = true; } - } - else { - vfrag->vf_idx = 0; - vfrag->vf_ack = 0; - assert(0); - /* retransmit missing fragments */ - /* OPAL_THREAD_LOCK(&sendreq->req_mutex); */ -/* opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); */ -/* OPAL_THREAD_UNLOCK(&sendreq->req_mutex); */ -/* mca_pml_dr_send_request_schedule(sendreq); */ - } - } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) { - if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { - mca_bml_base_free( (mca_bml_base_btl_t*) sendreq->descriptor->des_context, sendreq->descriptor ); - sendreq->descriptor = NULL; - - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, - vfrag, - sizeof(mca_pml_dr_match_hdr_t)); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - /* everything was in the match, mark complete */ - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - } - } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_VFRAG){ - /* add in acknowledged fragments */ - vfrag->vf_ack |= ack->hdr_vmask; - - /* have all fragments w/in this vfrag been acked? */ - if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, - vfrag, - sizeof(mca_pml_dr_frag_hdr_t)); + /* vfrag has been matched at peer */ + vfrag->vf_ack = ack->hdr_vmask; OPAL_THREAD_UNLOCK(&ompi_request_lock); - /* return vfrag */ - if (vfrag != &sendreq->req_vfrag0) { - MCA_PML_DR_VFRAG_RETURN(vfrag); + if(schedule) { + mca_pml_dr_send_request_schedule(sendreq); } - - /* are we done with this request ? */ - OPAL_THREAD_LOCK(&ompi_request_lock); - if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - } - OPAL_THREAD_UNLOCK(&ompi_request_lock); - - } else { - /* retransmit missing fragments */ - /* reset local completion flags to only those that have been - successfully acked */ - vfrag->vf_mask_processed = vfrag->vf_ack; - OPAL_THREAD_LOCK(&sendreq->req_mutex); - vfrag->vf_idx = 0; - opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&sendreq->req_mutex); - mca_pml_dr_send_request_schedule(sendreq); } + + /* wait for local completion */ + } else { + /* may need this to schedule rest of the message */ + vfrag->vf_recv = ack->hdr_dst_ptr; + + /* dont set ack until matched at peer */ + if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { + vfrag->vf_ack = ack->hdr_vmask; + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); } } + +/** + * Acknowledgment of vfrag. + */ + +void mca_pml_dr_send_request_frag_ack( + mca_btl_base_module_t* btl, + mca_pml_dr_ack_hdr_t* ack) +{ + mca_pml_dr_vfrag_t* vfrag = ack->hdr_src_ptr.pval; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + bool schedule = false; + + OPAL_THREAD_LOCK(&ompi_request_lock); + + /* add in acknowledged fragments */ + vfrag->vf_ack |= ack->hdr_vmask; + + /* need to retransmit? */ + if((vfrag->vf_ack & vfrag->vf_mask) != vfrag->vf_mask) { + + /* reset local completion flags to only those that have been successfully acked */ + vfrag->vf_mask_processed = vfrag->vf_ack; + vfrag->vf_idx = 0; + opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + schedule = true; + + /* acked and local completion */ + } else if (vfrag->vf_mask_processed == vfrag->vf_mask) { + + /* update statistics */ + sendreq->req_bytes_delivered += vfrag->vf_size; + + /* return vfrag */ + MCA_PML_DR_VFRAG_RETURN(vfrag); + + /* are we done with this request ? */ + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + /* is there something left to schedule */ + } else if (sendreq->req_send_offset < sendreq->req_send.req_bytes_packed) { + schedule = true; + } + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + if(schedule) { + mca_pml_dr_send_request_schedule(sendreq); + } +} + diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 6a1fbb7845..393d31f660 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -56,9 +56,7 @@ struct mca_pml_dr_send_request_t { mca_pml_dr_vfrag_t* req_vfrag; mca_pml_dr_vfrag_t req_vfrag0; - opal_list_t req_pending; opal_list_t req_retrans; - opal_mutex_t req_mutex; mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */ }; @@ -142,6 +140,8 @@ do { sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \ sendreq->req_endpoint = endpoint; \ sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ + sendreq->req_vfrag0.vf_ack = 0; \ + sendreq->req_vfrag0.vf_mask_processed = 0; \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ \ /* select a btl */ \ @@ -155,8 +155,11 @@ do { case MCA_PML_BASE_SEND_BUFFERED: \ rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \ break; \ + case MCA_PML_BASE_SEND_COMPLETE: \ + rc = mca_pml_dr_send_request_start_prepare(sendreq, bml_btl, size); \ + break; \ default: \ - if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \ + if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \ rc = mca_pml_dr_send_request_start_prepare(sendreq, bml_btl, size); \ } else { \ rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \ @@ -223,31 +226,6 @@ do { } while (0) -/* - * Advance a pending send request. Note that the initial descriptor must complete - * and the acknowledment received before the request can complete or be scheduled. - * However, these events may occur in either order. - */ - -#define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \ -do { \ - bool schedule = false; \ - \ - /* has an acknowledgment been received */ \ - if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ - OPAL_THREAD_LOCK(&ompi_request_lock); \ - if(sendreq->req_bytes_delivered != sendreq->req_send.req_bytes_packed) { \ - schedule = true; \ - } \ - OPAL_THREAD_UNLOCK(&ompi_request_lock); \ - } \ - \ - /* additional data to schedule */ \ - if(schedule == true) { \ - mca_pml_dr_send_request_schedule(sendreq); \ - } \ -} while (0) - /* * Release resources associated with a request */ @@ -291,46 +269,17 @@ do { vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \ } \ \ - vfrag->vf_mask_processed = 0; \ vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ - vfrag->vf_ack = 0; \ vfrag->vf_offset = sendreq->req_send_offset; \ + vfrag->vf_ack = 0; \ vfrag->vf_idx = 0; \ + vfrag->vf_mask_processed = 0; \ vfrag->vf_max_send_size = max_send_size; \ - opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); \ + vfrag->vf_send.pval = sendreq; \ sendreq->req_vfrag = vfrag; \ } while(0) -/* - * - */ - -#define MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq,hdr,vfrag) \ -do { \ - if ((hdr)->hdr_vid == (sendreq)->req_vfrag0.vf_id) { \ - vfrag = &(sendreq)->req_vfrag0; \ - } else if((sendreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \ - vfrag = (sendreq)->req_vfrag; \ - opal_list_remove_item(&(sendreq)->req_pending,(opal_list_item_t*)vfrag); \ - } else { \ - opal_list_item_t* item; \ - vfrag = NULL; \ - OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \ - for(item = opal_list_get_first(&(sendreq)->req_pending); \ - item != opal_list_get_end(&(sendreq)->req_pending); \ - item = opal_list_get_next(item)) { \ - mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \ - if(vf->vf_id == (hdr)->hdr_vid) { \ - opal_list_remove_item(&(sendreq)->req_pending,item); \ - vfrag = vf; \ - break; \ - } \ - } \ - OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \ - } \ -} while(0) - /* * */ @@ -339,7 +288,6 @@ do { do { \ opal_list_item_t* item; \ vfrag = NULL; \ - OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \ for(item = opal_list_get_first(&(sendreq)->req_retrans); \ item != opal_list_get_end(&(sendreq)->req_retrans); \ item = opal_list_get_next(item)) { \ @@ -349,7 +297,6 @@ do { \ break; \ } \ } \ - OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \ } while(0) @@ -418,35 +365,19 @@ int mca_pml_dr_send_request_reschedule( mca_pml_dr_vfrag_t* vfrag); /** - * Acknowledgment of vfrag + * Acknowledgment of vfrags */ -void mca_pml_dr_send_request_acked( - mca_pml_dr_send_request_t* sendreq, +void mca_pml_dr_send_request_match_ack( + mca_btl_base_module_t* btl, mca_pml_dr_ack_hdr_t*); -void mca_pml_dr_send_request_nacked( - mca_pml_dr_send_request_t* sendreq, +void mca_pml_dr_send_request_rndv_ack( + mca_btl_base_module_t* btl, mca_pml_dr_ack_hdr_t*); -/** - * Completion callback on match header - * Cache descriptor. - */ -void mca_pml_dr_match_completion_cache( - struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t* ep, - struct mca_btl_base_descriptor_t* descriptor, - int status); - -/** - * Completion callback on match header - * Free descriptor. - */ -void mca_pml_dr_match_completion_free( - struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t* ep, - struct mca_btl_base_descriptor_t* descriptor, - int status); +void mca_pml_dr_send_request_frag_ack( + mca_btl_base_module_t* btl, + mca_pml_dr_ack_hdr_t*); #if defined(c_plusplus) || defined(__cplusplus) diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.c b/ompi/mca/pml/dr/pml_dr_vfrag.c index 2be525ebd0..05109bf242 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.c +++ b/ompi/mca/pml/dr/pml_dr_vfrag.c @@ -19,8 +19,8 @@ #include "ompi_config.h" #include "pml_dr_vfrag.h" #include "pml_dr_sendreq.h" -void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* vfrag); -void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* vfrag); +void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* vfrag); +void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* vfrag); static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) { @@ -38,8 +38,8 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec; vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_usec; vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec; - opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_send_request_wdog_timeout, (void*) vfrag); - opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_send_request_ack_timeout, (void*) vfrag); + opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag); + opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag); } @@ -60,30 +60,29 @@ OBJ_CLASS_INSTANCE( /** * The wdog timer expired, better do something about it, like resend the current part of the vfrag */ -void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* data) { +void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data) +{ mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; - mca_pml_dr_send_request_t* sendreq = vfrag->sendreq; - OPAL_THREAD_LOCK(&sendreq->req_mutex); + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; + OPAL_THREAD_LOCK(&ompi_request_lock); vfrag->vf_idx = 0; - opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag); opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_schedule(sendreq); } /** * The ack timer expired, better do something about it, like resend the entire vfrag? */ -void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* data) { - mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; - mca_pml_dr_send_request_t* sendreq = vfrag->sendreq; +void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) { + mca_pml_dr_vfrag_t* vfrag = data; + mca_pml_dr_send_request_t* sendreq = vfrag->vf_send.pval; /* reset it all, so it will all retransmit */ vfrag->vf_ack = vfrag->vf_mask_processed = 0; - OPAL_THREAD_LOCK(&sendreq->req_mutex); + OPAL_THREAD_LOCK(&ompi_request_lock); vfrag->vf_idx = 0; - opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag); opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_schedule(sendreq); } diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.h b/ompi/mca/pml/dr/pml_dr_vfrag.h index 57a7825558..a07cdbcb6c 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.h +++ b/ompi/mca/pml/dr/pml_dr_vfrag.h @@ -43,8 +43,8 @@ struct mca_pml_dr_vfrag_t { uint64_t vf_ack; uint64_t vf_mask; uint64_t vf_mask_processed; - struct mca_pml_dr_send_request_t* sendreq; struct mca_bml_base_btl_t* bml_btl; + /* we need a timer for the vfrag for: 1) a watchdog timer for local completion of the current operation