diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index f13edf6f58..5d466509a4 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -69,6 +69,12 @@ struct mca_pml_dr_t { ompi_free_list_t recv_frags; ompi_free_list_t vfrags; ompi_free_list_t buffers; + + int timer_wdog_sec; + int timer_wdog_usec; + int timer_ack_sec; + int timer_ack_usec; + }; typedef struct mca_pml_dr_t mca_pml_dr_t; diff --git a/ompi/mca/pml/dr/pml_dr_component.c b/ompi/mca/pml/dr/pml_dr_component.c index 50ba354736..2035e2d0b4 100644 --- a/ompi/mca/pml/dr/pml_dr_component.c +++ b/ompi/mca/pml/dr/pml_dr_component.c @@ -91,6 +91,15 @@ int mca_pml_dr_component_open(void) mca_pml_dr_param_register_int("eager_limit", 128 * 1024); mca_pml_dr.send_pipeline_depth = mca_pml_dr_param_register_int("send_pipeline_depth", 3); + mca_pml_dr.timer_wdog_sec = + mca_pml_dr_param_register_int("timer_wdog_sec", 1); + mca_pml_dr.timer_wdog_usec = + mca_pml_dr_param_register_int("timer_wdog_usec", 0); + mca_pml_dr.timer_ack_sec = + mca_pml_dr_param_register_int("timer_ack_sec", 1); + mca_pml_dr.timer_ack_usec = + mca_pml_dr_param_register_int("timer_ack_usec", 0); + OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t); /* requests */ diff --git a/ompi/mca/pml/dr/pml_dr_hdr.h b/ompi/mca/pml/dr/pml_dr_hdr.h index a3a124d540..f45e9ce9b2 100644 --- a/ompi/mca/pml/dr/pml_dr_hdr.h +++ b/ompi/mca/pml/dr/pml_dr_hdr.h @@ -32,8 +32,7 @@ #define MCA_PML_DR_HDR_TYPE_MATCH 1 #define MCA_PML_DR_HDR_TYPE_RNDV 2 #define MCA_PML_DR_HDR_TYPE_ACK 3 -#define MCA_PML_DR_HDR_TYPE_NACK 4 -#define MCA_PML_DR_HDR_TYPE_FRAG 5 +#define MCA_PML_DR_HDR_TYPE_FRAG 4 #define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */ #define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 22bf191f56..8e1547933e 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -24,6 +24,7 @@ #include 
"opal/class/opal_list.h" #include "opal/threads/mutex.h" +#include "opal/util/crc.h" #include "ompi/constants.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/pml/pml.h" @@ -77,39 +78,58 @@ void mca_pml_dr_recv_frag_callback( { mca_btl_base_segment_t* segments = des->des_dst; mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; + if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) { return; } switch(hdr->hdr_common.hdr_type) { case MCA_PML_DR_HDR_TYPE_MATCH: + { + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t))) { + assert(0); + return; + } + mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt); + break; + } case MCA_PML_DR_HDR_TYPE_RNDV: { + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t))) { + assert(0); + return; + } mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt); break; } case MCA_PML_DR_HDR_TYPE_ACK: { - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) + mca_pml_dr_send_request_t* sendreq; + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) { + assert(0); + return; + } + sendreq = (mca_pml_dr_send_request_t*) hdr->hdr_ack.hdr_src_req.pval; + mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack); break; } - case MCA_PML_DR_HDR_TYPE_NACK: - { - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) - hdr->hdr_ack.hdr_src_req.pval; - mca_pml_dr_send_request_nacked(sendreq, &hdr->hdr_ack); - break; - } case MCA_PML_DR_HDR_TYPE_FRAG: { - mca_pml_dr_recv_request_t* recvreq = (mca_pml_dr_recv_request_t*) + mca_pml_dr_recv_request_t* recvreq; + if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t))) { + assert(0); + return; + } + recvreq = (mca_pml_dr_recv_request_t*) hdr->hdr_frag.hdr_dst_req.pval; + mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt); break; } default: + return; 
/* drop it on the floor.. */ break; } } @@ -509,7 +529,7 @@ rematch: if (0 < opal_list_get_size(&proc->frags_cant_match)) { additional_match = mca_pml_dr_check_cantmatch_for_match(&additional_matches,comm,proc); } - + } else { /* @@ -697,6 +717,7 @@ rematch: void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag) { + mca_pml_dr_ack_hdr_t* ack; mca_btl_base_descriptor_t* des; mca_bml_base_endpoint_t* bml_endpoint; @@ -722,7 +743,8 @@ void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag) ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req; assert(ack->hdr_src_req.pval); ack->hdr_dst_req.pval = NULL; - + ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); + /* initialize descriptor */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbfunc = mca_pml_dr_ctl_completion; diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index f558814376..d689d1b961 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -17,6 +17,7 @@ */ #include "ompi_config.h" +#include "opal/util/crc.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/bml/bml.h" @@ -139,7 +140,7 @@ static void mca_pml_dr_ctl_completion( static void mca_pml_dr_recv_request_matched( mca_pml_dr_recv_request_t* recvreq, mca_pml_dr_rendezvous_hdr_t* hdr, - uint8_t type) + uint8_t mask) { ompi_proc_t* proc = recvreq->req_proc; mca_bml_base_endpoint_t* bml_endpoint = NULL; @@ -169,11 +170,12 @@ static void mca_pml_dr_recv_request_matched( ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH; ack->hdr_vid = hdr->hdr_match.hdr_vid; - ack->hdr_vmask = 0x1; + ack->hdr_vmask = mask; ack->hdr_src_req = hdr->hdr_match.hdr_src_req; assert(ack->hdr_src_req.pval); ack->hdr_dst_req.pval = recvreq; - + ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); + /* initialize descriptor */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbfunc = 
mca_pml_dr_ctl_completion; @@ -194,50 +196,6 @@ retry: opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag); } -/* - * Generate an ack to the peer after first fragment is matched. - */ - -static void mca_pml_dr_recv_request_nack( - mca_pml_dr_recv_request_t* recvreq, - mca_pml_dr_frag_hdr_t* hdr) -{ - ompi_proc_t* proc = recvreq->req_proc; - mca_bml_base_endpoint_t* bml_endpoint = NULL; - mca_btl_base_descriptor_t* des; - mca_bml_base_btl_t* bml_btl; - mca_pml_dr_ack_hdr_t* nack; - int rc; - - bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml; - bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); - - /* allocate descriptor */ - MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); - if(NULL == des) { - return; - } - - /* fill out header */ - nack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; - nack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_NACK; - nack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH; - nack->hdr_vid = hdr->hdr_vid; - nack->hdr_vmask = 1 << hdr->hdr_frag_idx; - nack->hdr_src_req = hdr->hdr_src_req; - assert(nack->hdr_src_req.pval); - nack->hdr_dst_req.pval = recvreq; - - /* initialize descriptor */ - des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; - des->des_cbfunc = mca_pml_dr_ctl_completion; - - rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); - if(rc != OMPI_SUCCESS) { - mca_bml_base_free(bml_btl, des); - } -} - /* * Generate an ack w/ the current vfrag status. 
@@ -272,6 +230,7 @@ static void mca_pml_dr_recv_request_vfrag_ack( ack->hdr_vmask = vfrag->vf_ack; ack->hdr_src_req = recvreq->req_vfrag0.vf_send; ack->hdr_dst_req.pval = recvreq; + ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); /* initialize descriptor */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -302,7 +261,9 @@ void mca_pml_dr_recv_request_progress( mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; size_t i; uint32_t csum = 0; - + uint64_t bit; + mca_pml_dr_vfrag_t* vfrag; + for(i=0; ihdr_rndv, - MCA_PML_DR_HDR_TYPE_ACK); /* mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, */ -/* (csum == hdr->hdr_match.hdr_csum) ? MCA_PML_DR_HDR_TYPE_ACK : MCA_PML_DR_HDR_TYPE_NACK); */ +/* MCA_PML_DR_HDR_TYPE_ACK); */ + if(csum != hdr->hdr_match.hdr_csum) { + assert(0); + } + + mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, + (csum == hdr->hdr_match.hdr_csum)); break; case MCA_PML_DR_HDR_TYPE_FRAG: - bytes_received -= sizeof(mca_pml_dr_frag_hdr_t); data_offset = hdr->hdr_frag.hdr_frag_offset; MCA_PML_DR_RECV_REQUEST_UNPACK( @@ -357,28 +321,26 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); - - /* if checksum fails - immediately nack this fragment */ - /* if(csum != hdr->hdr_frag.hdr_frag_csum) { */ - if(0) { - bytes_received = bytes_delivered = 0; - mca_pml_dr_recv_request_nack(recvreq, &hdr->hdr_frag); - } else { - mca_pml_dr_vfrag_t* vfrag; - uint64_t bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx); - - /* update vfrag status */ - MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag); - if((vfrag->vf_ack & bit) == 0) { - vfrag->vf_ack |= bit; - if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { - /* done w/ this vfrag - ack it */ - mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag); - } - } else { - /* duplicate fragment - send an ack w/ the frags completed */ + + bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx); + + 
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag); + + /* update the mask to show that this vfrag was received, + note that it might still fail the checksum though */ + vfrag->vf_mask_processed |= bit; + if(csum == hdr->hdr_frag.hdr_frag_csum) { + /* this part of the vfrag passed the checksum, + mark it so that we ack it after receiving the + entire vfrag */ + vfrag->vf_ack |= bit; + if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) { + /* we have received all the pieces of the vfrag, ack + everything that passed the checksum */ mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag); } + } else { + bytes_received = bytes_delivered = 0; } break; diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.h b/ompi/mca/pml/dr/pml_dr_recvreq.h index 81f3db293a..d021d267b1 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.h +++ b/ompi/mca/pml/dr/pml_dr_recvreq.h @@ -245,10 +245,11 @@ do { &max_data, \ &free_after); \ bytes_delivered = max_data; \ + csum = request->req_recv.req_convertor.checksum; \ } else { \ bytes_delivered = 0; \ + csum = 0; \ } \ - csum = request->req_recv.req_convertor.checksum; \ } while (0) @@ -310,6 +311,7 @@ do { \ (vfrag)->vf_id = (hdr)->hdr_vid; \ (vfrag)->vf_len = (hdr)->hdr_vlen; \ (vfrag)->vf_ack = 0; \ + (vfrag)->vf_mask_processed = 0; \ if((hdr)->hdr_vlen == 64) { \ (vfrag)->vf_mask = ~(uint64_t)0; \ } else { \ diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index 0677f1c10c..173b7a32d7 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -82,11 +82,13 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) req->req_vfrag0.vf_len = 1; req->req_vfrag0.vf_idx = 1; req->req_vfrag0.vf_mask = 1; + req->req_vfrag0.vf_mask_processed = 0; + req->req_vfrag0.sendreq = req; req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini; req->req_send.req_base.req_ompi.req_free = 
mca_pml_dr_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel; - + } static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req) @@ -212,10 +214,25 @@ static void mca_pml_dr_frag_completion( struct mca_btl_base_descriptor_t* descriptor, int status) { - mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata; - mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context; + mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) descriptor->des_cbdata; + mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) vfrag->sendreq; + mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; + mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval; bool schedule; - + + uint64_t bit = ((uint64_t)1 << hdr->hdr_frag_idx); + vfrag->vf_mask_processed |= bit; + + /* when we have local completion of the entire vfrag + we stop the local wdog timers and set our ack timer + as the peer should be sending us an ack for the vfrag + */ + if(vfrag->vf_mask_processed == vfrag->vf_mask) { + MCA_PML_DR_VFRAG_WDOG_STOP(vfrag); + /* MCA_PML_DR_VFRAG_ACK_START(vfrag); */ + } else { + MCA_PML_DR_VFRAG_WDOG_RESET(vfrag); + } /* check completion status */ if(OMPI_SUCCESS != status) { /* TSW - FIX */ @@ -267,7 +284,8 @@ int mca_pml_dr_send_request_start_buffered( size_t max_data; int32_t free_after; int rc; - + uint32_t csum; + /* allocate descriptor */ mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_dr_rendezvous_hdr_t) + size); if(NULL == descriptor) { @@ -290,12 +308,14 @@ int mca_pml_dr_send_request_start_buffered( mca_bml_base_free(bml_btl, descriptor); return rc; } - + + csum = size > 0 ? 
sendreq->req_send.req_convertor.checksum : 0; /* update lengths */ segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data; sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; - + sendreq->req_vfrag0.bml_btl = bml_btl; + descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_cbdata = sendreq; @@ -330,11 +350,11 @@ int mca_pml_dr_send_request_start_buffered( hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; - hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_match.hdr_csum = csum; hdr->hdr_match.hdr_src_req.pval = sendreq; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); - + /* re-init convertor for packed data */ ompi_convertor_prepare_for_send( &sendreq->req_send.req_convertor, @@ -408,15 +428,16 @@ int mca_pml_dr_send_request_start_copy( hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; - hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_match.hdr_csum = size > 0 ? 
sendreq->req_send.req_convertor.checksum : 0; hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); - + /* update lengths */ segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data; sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; - + sendreq->req_vfrag0.bml_btl = bml_btl; /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion_free; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -472,8 +493,9 @@ int mca_pml_dr_send_request_start_prepare( hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; - hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum; + hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; /* nope */ hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); /* short message */ @@ -484,7 +506,7 @@ int mca_pml_dr_send_request_start_prepare( /* update lengths */ sendreq->req_send_offset = size; sendreq->req_vfrag0.vf_size = size; - + sendreq->req_vfrag0.bml_btl = bml_btl; /* send */ rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { @@ -541,15 +563,19 @@ int mca_pml_dr_send_request_start_rndv( hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_src_req.pval = sendreq; + hdr->hdr_match.hdr_csum = size > 0 ? 
sendreq->req_send.req_convertor.checksum : 0; + hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; - + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t)); + /* first fragment of a long message */ des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbdata = sendreq; des->des_cbfunc = mca_pml_dr_rndv_completion; sendreq->req_send_offset = size; sendreq->req_vfrag0.vf_size = size; - + sendreq->req_vfrag0.bml_btl = bml_btl; + /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { @@ -559,6 +585,7 @@ int mca_pml_dr_send_request_start_rndv( } + /** * Schedule pipeline of send descriptors for the given request, * using send protocol. @@ -583,7 +610,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) mca_pml_dr_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + mca_bml_base_btl_t* bml_btl = NULL; mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag; size_t size = bytes_remaining; /* offset tells us how much of the vfrag has been scheduled */ @@ -593,6 +620,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) /* do we need to allocate a new vfrag (we scheduled all the vfrag already) */ if(vfrag->vf_size == offset) { + bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); if(NULL == vfrag) { OPAL_THREAD_LOCK(&mca_pml_dr.lock); @@ -601,8 +629,12 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) break; } MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag); + vfrag->bml_btl = bml_btl; + vfrag->sendreq = sendreq; offset = 0; sendreq->req_num_vfrags++; + } else { /* always schedule the vfrag across the same btl */ + bml_btl = vfrag->bml_btl; } /* makes sure that we don't exceed vfrag size */ @@ -630,8 +662,7 @@ int 
mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) break; } des->des_cbfunc = mca_pml_dr_frag_completion; - des->des_cbdata = sendreq; - + des->des_cbdata = vfrag; /* setup header */ hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; @@ -643,6 +674,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum; hdr->hdr_frag_offset = sendreq->req_send_offset; hdr->hdr_src_req.pval = sendreq; + hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv; hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t)); @@ -651,9 +683,10 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) sendreq->req_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); - /* start vfrag watchdog timer */ - MCA_PML_DR_VFRAG_WDOG_START(vfrag); - + /* start vfrag watchdog timer if this is the first part of the vfrag*/ + if(vfrag->vf_idx == 0) { + MCA_PML_DR_VFRAG_WDOG_START(vfrag); + } /* initiate send - note that this may complete before the call returns */ rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); @@ -683,14 +716,16 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) */ while(vfrag->vf_idx < vfrag->vf_len && sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) { - if(((1 << vfrag->vf_idx) & vfrag->vf_ack) == 0) { - mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + if((((uint64_t)1 << vfrag->vf_idx) & vfrag->vf_mask_processed) == 0) { + mca_bml_base_btl_t* bml_btl = vfrag->bml_btl; mca_pml_dr_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx); size_t size; int rc; + + if(vfrag->vf_idx == vfrag->vf_len - 1) { size = vfrag->vf_size - offset; } else { @@ -714,7 +749,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) break; } 
des->des_cbfunc = mca_pml_dr_frag_completion; - des->des_cbdata = sendreq; + des->des_cbdata = vfrag; /* setup header */ hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; @@ -735,9 +770,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) sendreq->req_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); - /* start vfrag watchdog timer */ - MCA_PML_DR_VFRAG_WDOG_START(vfrag); - + /* reset the vfrag watchdog timer due to retransmission */ + MCA_PML_DR_VFRAG_WDOG_RESET(vfrag); + /* initiate send - note that this may complete before the call returns */ rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); @@ -777,20 +812,28 @@ void mca_pml_dr_send_request_acked( mca_pml_dr_send_request_t* sendreq, mca_pml_dr_ack_hdr_t* ack) { + mca_pml_dr_vfrag_t* vfrag; assert(sendreq); + MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); + /* MCA_PML_DR_VFRAG_ACK_STOP(vfrag); */ if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) { - sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req; - MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); - } else { - mca_pml_dr_vfrag_t* vfrag; - MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); - if(NULL == vfrag) { - return; + if(ack->hdr_vmask) { + sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req; + MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); } - + else { + vfrag->vf_idx = 0; + vfrag->vf_ack = 0; + /* retransmit missing fragments */ + /* OPAL_THREAD_LOCK(&sendreq->req_mutex); */ +/* opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); */ +/* OPAL_THREAD_UNLOCK(&sendreq->req_mutex); */ +/* mca_pml_dr_send_request_schedule(sendreq); */ + } + } else { /* add in acknowledged fragments */ vfrag->vf_ack |= ack->hdr_vmask; - + /* have all fragments w/in this vfrag been acked? 
*/ if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { @@ -810,6 +853,9 @@ void mca_pml_dr_send_request_acked( } else { /* retransmit missing fragments */ + /* reset local completion flags to only those that have been + successfully acked */ + vfrag->vf_mask_processed = vfrag->vf_ack; OPAL_THREAD_LOCK(&sendreq->req_mutex); vfrag->vf_idx = 0; opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); @@ -818,26 +864,3 @@ void mca_pml_dr_send_request_acked( } } } - - -void mca_pml_dr_send_request_nacked( - mca_pml_dr_send_request_t* sendreq, - mca_pml_dr_ack_hdr_t* ack) -{ - mca_pml_dr_vfrag_t* vfrag; - MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); - if(NULL == vfrag) { - return; - } - - /* removed nacked bits from acknowledged fragments */ - vfrag->vf_idx = 0; - vfrag->vf_ack &= ~ack->hdr_vmask; - - /* retransmit missing fragments */ - OPAL_THREAD_LOCK(&sendreq->req_mutex); - opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); - OPAL_THREAD_UNLOCK(&sendreq->req_mutex); - mca_pml_dr_send_request_schedule(sendreq); -} - diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 00abea6203..72bc19b14c 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -21,6 +21,7 @@ #ifndef OMPI_PML_DR_SEND_REQUEST_H #define OMPI_PML_DR_SEND_REQUEST_H +#include "opal/util/crc.h" #include "ompi_config.h" #include "ompi/datatype/convertor.h" #include "ompi/mca/btl/btl.h" @@ -32,6 +33,7 @@ #include "pml_dr_comm.h" #include "pml_dr_hdr.h" #include "pml_dr_vfrag.h" +#include "opal/event/event.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -60,6 +62,8 @@ struct mca_pml_dr_send_request_t { opal_mutex_t req_mutex; size_t req_num_acks; size_t req_num_vfrags; + + }; typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; @@ -167,6 +171,8 @@ do { return OMPI_ERR_OUT_OF_RESOURCE; \ } \ segment = descriptor->des_src; \ + /* setup vfrag */ \ + 
sendreq->req_vfrag0.vf_size = 0; \ \ /* build hdr */ \ hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \ @@ -177,6 +183,8 @@ do { hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ hdr->hdr_match.hdr_src_req.pval = sendreq; \ + hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; \ + hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); \ \ /* short message */ \ descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \ @@ -331,6 +339,7 @@ do { vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \ } \ \ + vfrag->vf_mask_processed = 0; \ vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ vfrag->vf_ack = 0; \ vfrag->vf_offset = sendreq->req_send_offset; \ diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.c b/ompi/mca/pml/dr/pml_dr_vfrag.c index 8345e19858..2be525ebd0 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.c +++ b/ompi/mca/pml/dr/pml_dr_vfrag.c @@ -18,7 +18,9 @@ #include "ompi_config.h" #include "pml_dr_vfrag.h" - +#include "pml_dr_sendreq.h" +void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* vfrag); +void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* vfrag); static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) { @@ -32,7 +34,12 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag) vfrag->vf_max_send_size = 0; vfrag->vf_ack = 0; vfrag->vf_mask = 0; - memset(&vfrag->vf_event, 0, sizeof(vfrag->vf_event)); + vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec; + vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec; + vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_sec; + vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec; + opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_send_request_wdog_timeout, (void*) vfrag); + opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_send_request_ack_timeout, (void*) vfrag); } @@ -50,4 +57,34 @@ OBJ_CLASS_INSTANCE( ); +/** + * The wdog timer expired, better do something about it,
like resend the current part of the vfrag + */ +void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* data) { + mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; + mca_pml_dr_send_request_t* sendreq = vfrag->sendreq; + OPAL_THREAD_LOCK(&sendreq->req_mutex); + vfrag->vf_idx = 0; + opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag); + opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + mca_pml_dr_send_request_schedule(sendreq); +} + +/** + * The ack timer expired, better do something about it, like resend the entire vfrag? + */ +void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* data) { + mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data; + mca_pml_dr_send_request_t* sendreq = vfrag->sendreq; + /* reset it all, so it will all retransmit */ + vfrag->vf_ack = vfrag->vf_mask_processed = 0; + OPAL_THREAD_LOCK(&sendreq->req_mutex); + vfrag->vf_idx = 0; + opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag); + opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); + OPAL_THREAD_UNLOCK(&sendreq->req_mutex); + mca_pml_dr_send_request_schedule(sendreq); +} + diff --git a/ompi/mca/pml/dr/pml_dr_vfrag.h b/ompi/mca/pml/dr/pml_dr_vfrag.h index fd28c21951..57a7825558 100644 --- a/ompi/mca/pml/dr/pml_dr_vfrag.h +++ b/ompi/mca/pml/dr/pml_dr_vfrag.h @@ -42,7 +42,21 @@ struct mca_pml_dr_vfrag_t { size_t vf_max_send_size; uint64_t vf_ack; uint64_t vf_mask; - opal_event_t vf_event; + uint64_t vf_mask_processed; + struct mca_pml_dr_send_request_t* sendreq; + struct mca_bml_base_btl_t* bml_btl; + /* we need a timer for the vfrag for: + 1) a watchdog timer for local completion of the current + operation + 2) a timeout for ACK of the VFRAG + */ + struct timeval tv_wdog; + struct timeval tv_ack; + opal_event_t ev_ack; + opal_event_t ev_wdog; + uint8_t cnt_wdog; + uint8_t cnt_ack; + uint8_t cnt_nack; }; typedef struct mca_pml_dr_vfrag_t 
mca_pml_dr_vfrag_t; @@ -61,17 +75,46 @@ do { \ OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \ } while(0) +#if 0 #define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \ do { \ - \ + opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \ +} while(0) + +#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \ +do { \ + opal_event_del(&vfrag->ev_wdog); \ + opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \ } while(0) #define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \ do { \ + opal_event_del(&vfrag->ev_wdog); \ \ } while(0) +#define MCA_PML_DR_VFRAG_ACK_START(vfrag) \ +do { \ + opal_event_add(&vfrag->ev_ack, &vfrag->tv_ack); \ +} while(0) +#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) \ +do { \ + opal_event_del(&vfrag->ev_ack); \ + \ +} while(0) + +#endif + +#if 1 + +#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) +#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) +#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) +#define MCA_PML_DR_VFRAG_ACK_START(vfrag) +#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) + +#endif #if defined(c_plusplus) || defined(__cplusplus) } #endif