diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index baa71d3584..6efa8db8db 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -124,7 +124,7 @@ void mca_pml_dr_recv_frag_callback( if(false == duplicate) { mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt); } else { - OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n")); + OPAL_OUTPUT((0, "%s:%d: dropping duplicate fragment\n", __FILE__, __LINE__)); } break; } @@ -676,7 +676,6 @@ void mca_pml_dr_recv_frag_send_ack( mca_bml_base_free(bml_btl, des); goto retry; } - return; /* queue request to retry later */ diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index 2367394dec..6a26b24142 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -205,7 +205,6 @@ static void mca_pml_dr_recv_request_ack( } mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_common.hdr_vid); - return; /* queue request to retry later */ diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index f5d240777a..99d96d67f6 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -69,7 +69,7 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) OBJ_CONSTRUCT(&req->req_retrans, opal_list_t); req->req_vfrag0.vf_len = 1; - req->req_vfrag0.vf_idx = 1; + req->req_vfrag0.vf_idx = 0; req->req_vfrag0.vf_ack = 0; req->req_vfrag0.vf_mask = 1; req->req_vfrag0.vf_mask_processed = 0; @@ -122,13 +122,14 @@ static void mca_pml_dr_match_completion( /* been acked? */ if(vfrag->vf_ack == vfrag->vf_mask) { MCA_PML_DR_VFRAG_ACK_STOP(vfrag); + /* return descriptor */ if(NULL != sendreq->descriptor) { mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); sendreq->descriptor = NULL; } - /* update statistics */ + /* update statistics and complete */ sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); OPAL_THREAD_UNLOCK(&ompi_request_lock); @@ -136,7 +137,6 @@ static void mca_pml_dr_match_completion( MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag); } - } /* @@ -166,29 +166,33 @@ static void mca_pml_dr_rndv_completion( /* local completion */ vfrag->vf_mask_processed |= 0x1; - /* been acked? */ + /* positive ack? */ if(vfrag->vf_ack == vfrag->vf_mask) { MCA_PML_DR_VFRAG_ACK_STOP(vfrag); if(sendreq->descriptor) { mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); sendreq->descriptor = NULL; } - sendreq->req_bytes_delivered = vfrag->vf_size; - if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - } else { - schedule = true; + + /* matched at peer? */ + if(sendreq->req_matched) { + sendreq->req_bytes_delivered = vfrag->vf_size; + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } else { + schedule = true; + } } OPAL_THREAD_UNLOCK(&ompi_request_lock); if(schedule) { mca_pml_dr_send_request_schedule(sendreq); } + + /* negative ack - need to retransmit? */ } else if(vfrag->vf_retrans) { MCA_PML_DR_VFRAG_ACK_STOP(vfrag); MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag); } - - /* check for pending requests */ MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING(); @@ -238,6 +242,7 @@ static void mca_pml_dr_frag_completion( /* check to see if we need to schedule the remainder of the message */ sendreq->req_bytes_delivered += vfrag->vf_size; + assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); /* return this vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); @@ -734,7 +739,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) while(opal_list_get_size(&sendreq->req_retrans) && sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans); - vfrag->vf_retry_cnt ++; /* * Retransmit fragments that have not been acked. @@ -745,20 +749,20 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; mca_pml_dr_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; - size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx); + size_t offset_in_vfrag = vfrag->vf_max_send_size * vfrag->vf_idx; + size_t offset_in_msg = vfrag->vf_offset + offset_in_vfrag; size_t size; int rc; - - + vfrag->vf_retry_cnt ++; if(vfrag->vf_idx == vfrag->vf_len - 1) { - size = vfrag->vf_size - offset; + size = vfrag->vf_size - offset_in_vfrag; } else { size = vfrag->vf_max_send_size; } /* pack into a descriptor */ - ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset_in_msg); mca_bml_base_prepare_src( bml_btl, NULL, @@ -794,8 +798,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); /* update state */ - vfrag->vf_idx++; - sendreq->req_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); /* reset the vfrag watchdog timer due to retransmission */ @@ -807,7 +809,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) if(rc == OMPI_SUCCESS) { bytes_remaining -= size; } else { - vfrag->vf_idx--; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1); mca_bml_base_free(bml_btl,des); OPAL_THREAD_LOCK(&mca_pml_dr.lock); @@ -846,10 +847,11 @@ void mca_pml_dr_send_request_match_ack( OPAL_THREAD_LOCK(&ompi_request_lock); assert(vfrag->vf_ack == 0); + vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { MCA_PML_DR_VFRAG_ACK_STOP(vfrag); /* need to retransmit? */ - if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) { + if(vfrag->vf_ack != vfrag->vf_mask) { MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag); } else { /* if already have local completion free descriptor and complete message */ @@ -862,21 +864,20 @@ void mca_pml_dr_send_request_match_ack( /* do NOT complete message until matched at peer */ if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { /* update statistics */ + sendreq->req_matched = true; sendreq->req_bytes_delivered = vfrag->vf_size; MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } OPAL_THREAD_UNLOCK(&ompi_request_lock); } - /* wait for local completion */ } else { /* need to retransmit? */ - if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) { + if(vfrag->vf_ack != vfrag->vf_mask) { vfrag->vf_retrans = vfrag->vf_mask; - } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { - vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; - + } else if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { + sendreq->req_matched = true; } OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -895,22 +896,28 @@ void mca_pml_dr_send_request_rndv_ack( OPAL_THREAD_LOCK(&ompi_request_lock); + /* set acked bits */ + vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; + + /* local completion? */ if ((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { bool schedule = false; MCA_PML_DR_VFRAG_ACK_STOP(vfrag); + + /* need to retransmit? */ - if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) { + if(vfrag->vf_ack != vfrag->vf_mask) { MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag); } else { - /* acked and local completion */ - /* return descriptor for the first fragment */ + /* return descriptor of first fragment */ if(NULL != sendreq->descriptor) { mca_bml_base_free(sendreq->descriptor->des_context, sendreq->descriptor); sendreq->descriptor = NULL; } - /* do NOT schedule remainder of message until matched at peer */ + /* matched at peer? */ if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { + sendreq->req_matched = true; sendreq->req_bytes_delivered = vfrag->vf_size; if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); @@ -918,9 +925,6 @@ void mca_pml_dr_send_request_rndv_ack( vfrag->vf_recv = ack->hdr_dst_ptr; schedule = true; } - - /* vfrag has been matched at peer */ - vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; } OPAL_THREAD_UNLOCK(&ompi_request_lock); @@ -928,18 +932,20 @@ void mca_pml_dr_send_request_rndv_ack( mca_pml_dr_send_request_schedule(sendreq); } } + /* wait for local completion */ } else { + /* need to retransmit? */ - if((ack->hdr_vmask & vfrag->vf_mask) != vfrag->vf_mask) { + if(vfrag->vf_ack != vfrag->vf_mask) { vfrag->vf_retrans = vfrag->vf_mask; } else { /* may need this to schedule rest of the message */ vfrag->vf_recv = ack->hdr_dst_ptr; - /* dont set ack until matched at peer */ + /* matched at peer? */ if (ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCHED) { - vfrag->vf_ack = ack->hdr_vmask & vfrag->vf_mask; + sendreq->req_matched = true; } } OPAL_THREAD_UNLOCK(&ompi_request_lock); @@ -968,7 +974,7 @@ void mca_pml_dr_send_request_frag_ack( /* reset local completion flags to only those that have been successfully acked */ vfrag->vf_mask_processed = vfrag->vf_ack; - vfrag->vf_idx = 1; + vfrag->vf_idx = 0; opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); schedule = true; @@ -977,6 +983,7 @@ void mca_pml_dr_send_request_frag_ack( /* update statistics */ sendreq->req_bytes_delivered += vfrag->vf_size; + assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); /* return vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 3e742ba7b9..e339a6aa08 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -53,6 +53,7 @@ struct mca_pml_dr_send_request_t { size_t req_pipeline_depth; size_t req_bytes_delivered; size_t req_send_offset; + bool req_matched; mca_pml_dr_vfrag_t* req_vfrag; mca_pml_dr_vfrag_t req_vfrag0; @@ -158,6 +159,7 @@ do { sendreq->req_bytes_delivered = 0; \ sendreq->req_state = 0; \ sendreq->req_send_offset = 0; \ + sendreq->req_matched = false; \ sendreq->req_send.req_base.req_pml_complete = false; \ sendreq->req_send.req_base.req_ompi.req_complete = false; \ sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ @@ -168,7 +170,7 @@ do { sendreq->req_vfrag0.vf_ack = 0; \ sendreq->req_vfrag0.vf_mask_processed = 0; \ sendreq->req_vfrag0.vf_retrans = 0; \ - sendreq->req_vfrag0.vf_retry_cnt = 0; \ + sendreq->req_vfrag0.vf_retry_cnt = 0; \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ \ /* select a btl */ \ @@ -359,7 +361,7 @@ do { \ \ opal_output(0, "%s:%d:%s, retransmitting\n", __FILE__, __LINE__, __func__); \ assert(sendreq->descriptor->des_src != NULL); \ - vfrag->vf_idx = 1; \ + vfrag->vf_idx = 0; \ vfrag->vf_mask_processed = 0; \ vfrag->vf_ack = 0; \ vfrag->vf_retrans = 0; \ diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.c b/ompi/mca/pml/dr/pml_dr_vfrag.c index 9186b6011f..03833331a0 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.c +++ b/ompi/mca/pml/dr/pml_dr_vfrag.c @@ -73,7 +73,7 @@ void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data) opal_output(0, "%s:%d:%s, wdog retry count exceeded! FATAL", __FILE__, __LINE__, __func__); orte_errmgr.abort(); } - vfrag->vf_idx = 1; + vfrag->vf_idx = 0; vfrag->vf_mask_processed = 0; vfrag->vf_ack = 0; vfrag->vf_retrans = 0; @@ -93,11 +93,12 @@ void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data) { opal_output(0, "%s:%d: maximum ack retry count exceeded: FATAL", __FILE__, __LINE__); orte_errmgr.abort(); } + if(0 == vfrag->vf_offset) { /* this is the first part of the message that we need to resend */ MCA_PML_DR_SEND_REQUEST_RETRY(sendreq, vfrag); } else { - vfrag->vf_idx = 1; + vfrag->vf_idx = 0; vfrag->vf_mask_processed = 0; vfrag->vf_ack = 0; vfrag->vf_retrans = 0;