diff --git a/ompi/mca/pml/dr/pml_dr.h b/ompi/mca/pml/dr/pml_dr.h index 327943c847..f67bbdd61d 100644 --- a/ompi/mca/pml/dr/pml_dr.h +++ b/ompi/mca/pml/dr/pml_dr.h @@ -251,5 +251,40 @@ extern int mca_pml_dr_start( MCA_BML_BASE_BTL_DES_ALLOC(bml_btl, des, \ sizeof(mca_pml_dr_hdr_t) + (sizeof(mca_btl_base_segment_t) << 4), size) + +/* ADLER_NMAX is the largest n such that 255n(n+1)/2 + (n+1)(BASE-1) <= 2^32-1 */ +#define ADLER_NMAX 5551 +#define MOD_ADLER 65521 + +#define DO1(buf,i) {_a += buf[i]; _b += _a;} +#define DO2(buf,i) DO1(buf,i); DO1(buf,i+1); +#define DO4(buf,i) DO2(buf,i); DO2(buf,i+2); +#define DO8(buf,i) DO4(buf,i); DO4(buf,i+4); +#define DO16(buf) DO8(buf,0); DO8(buf,8); + +#define COMPUTE_SPECIFIC_CHECKSUM( DATA, LENGTH, ADLER32) \ +do { \ + uint8_t *_data = (DATA); /* Pointer to the data to be summed */ \ + size_t _len = (LENGTH); /* Length in bytes */ \ + uint32_t _a = (ADLER32) & 0xffff, \ + _b = ((ADLER32) >> 16) & 0xffff; \ +\ + while( _len > 0 ) { \ + unsigned _tlen = _len > ADLER_NMAX ? ADLER_NMAX : _len; \ + _len -= _tlen; \ + while( _tlen >= 16 ) { \ + DO16(_data); \ + _data += 16; \ + _tlen -= 16; \ + } \ + if( 0 != _tlen ) do { \ + _a += *_data++; _b += _a; \ + } while( --_tlen > 0 ); \ + _a = _a % MOD_ADLER; \ + _b = _b % MOD_ADLER; \ + } \ + (ADLER32) = _b << 16 | _a; \ +} while(0) + #endif diff --git a/ompi/mca/pml/dr/pml_dr_comm.c b/ompi/mca/pml/dr/pml_dr_comm.c index 1a8e465413..d3df3ce6c0 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.c +++ b/ompi/mca/pml/dr/pml_dr_comm.c @@ -36,6 +36,7 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t); OBJ_CONSTRUCT(&proc->acked_vfrags, opal_list_t); + proc->acked_vfrags_ptr = NULL; } diff --git a/ompi/mca/pml/dr/pml_dr_comm.h b/ompi/mca/pml/dr/pml_dr_comm.h index 44ddf8b73d..5efd2cede5 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.h +++ b/ompi/mca/pml/dr/pml_dr_comm.h @@ -115,24 +115,19 @@ static inline void mca_pml_dr_comm_proc_set_acked(mca_pml_dr_comm_proc_t* proc, int8_t direction = 0; /* 1 is next, -1 is previous */ mca_pml_dr_acked_item_t *new_item, *next_item, *prev_item; while(true) { - if(NULL == item && direction == 0) { - + if(NULL == item && + (direction == 0 || direction == 1)) { + new_item = OBJ_NEW(mca_pml_dr_acked_item_t); new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id; - opal_list_append(&proc->acked_vfrags, (opal_list_item_t*) item); - proc->acked_vfrags_ptr = (opal_list_item_t*) item; + opal_list_append(&proc->acked_vfrags, (opal_list_item_t*) new_item); + proc->acked_vfrags_ptr = (opal_list_item_t*) new_item; return; - - } else if (NULL == item && direction == 1) { - - opal_list_append(&proc->acked_vfrags, (opal_list_item_t*) item); - proc->acked_vfrags_ptr = (opal_list_item_t*) item; - return; - } else if (NULL == item && direction == -1) { - - opal_list_prepend(&proc->acked_vfrags, (opal_list_item_t*) item); - proc->acked_vfrags_ptr = (opal_list_item_t*) item; + new_item = OBJ_NEW(mca_pml_dr_acked_item_t); + new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id; + opal_list_prepend(&proc->acked_vfrags, (opal_list_item_t*) new_item); + proc->acked_vfrags_ptr = (opal_list_item_t*) new_item; return; } else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) { @@ -177,7 +172,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(mca_pml_dr_comm_proc_t* proc, opal_list_insert_pos(&proc->acked_vfrags, (opal_list_item_t*) item, (opal_list_item_t*) new_item); - proc->acked_vfrags_ptr = (opal_list_item_t*) item; + proc->acked_vfrags_ptr = (opal_list_item_t*) new_item; return; } else { direction = 1; @@ -195,7 +190,7 @@ static inline void mca_pml_dr_comm_proc_set_acked(mca_pml_dr_comm_proc_t* proc, (opal_list_item_t*) next_item, (opal_list_item_t*) new_item); } - proc->acked_vfrags_ptr = (opal_list_item_t*) item; + proc->acked_vfrags_ptr = (opal_list_item_t*) new_item; return; } else { direction = -1; diff --git a/ompi/mca/pml/dr/pml_dr_hdr.h b/ompi/mca/pml/dr/pml_dr_hdr.h index 5438076ca7..3b3412436d 100644 --- a/ompi/mca/pml/dr/pml_dr_hdr.h +++ b/ompi/mca/pml/dr/pml_dr_hdr.h @@ -36,11 +36,11 @@ #define MCA_PML_DR_HDR_TYPE_ACK 3 #define MCA_PML_DR_HDR_TYPE_FRAG 4 -#define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */ -#define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ +#define MCA_PML_DR_HDR_FLAGS_NBO 1 /* is the hdr in network byte order */ +#define MCA_PML_DR_HDR_FLAGS_VFRAG 2 #define MCA_PML_DR_HDR_FLAGS_MATCH 4 /* is the ack in response to a match */ - - +#define MCA_PML_DR_HDR_FLAGS_RNDV 8 /* is the ack in response to a rndv */ +#define MCA_PML_DR_HDR_FLAGS_BUFFERED 16 /** * Common hdr attributes - must be first element in each hdr type */ diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 8e1547933e..77a4e8d88d 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -51,6 +51,12 @@ OBJ_CLASS_INSTANCE( NULL ); +static void mca_pml_dr_recv_frag_unmatched_ack( + mca_pml_dr_match_hdr_t* hdr, + ompi_proc_t* ompi_proc, + uint8_t flags, + uint8_t mask); + /* * Release resources. */ @@ -437,7 +443,7 @@ bool mca_pml_dr_recv_frag_match( opal_list_t additional_matches; ompi_proc_t* ompi_proc; int rc; - + uint32_t csum = 0; /* communicator pointer */ comm_ptr=ompi_comm_lookup(hdr->hdr_ctx); comm=(mca_pml_dr_comm_t *)comm_ptr->c_pml_comm; @@ -517,7 +523,8 @@ rematch: OPAL_THREAD_UNLOCK(&comm->matching_lock); return rc; } - MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); + +MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl); opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag ); } @@ -536,6 +543,7 @@ rematch: * This message comes after the next expected, so it * is ahead of sequence. Save it for later. */ + mca_pml_dr_recv_frag_t* frag; MCA_PML_DR_RECV_FRAG_ALLOC(frag, rc); if(OMPI_SUCCESS != rc) { @@ -551,7 +559,27 @@ rematch: /* release matching lock before processing fragment */ if(match != NULL) { mca_pml_dr_recv_request_progress(match,btl,segments,num_segments); - } + } else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) { + COMPUTE_SPECIFIC_CHECKSUM(segments->seg_addr.pval + + sizeof(mca_pml_dr_match_hdr_t), + segments->seg_len - sizeof(mca_pml_dr_match_hdr_t), + csum); + mca_pml_dr_recv_frag_unmatched_ack(hdr, + ompi_proc, + MCA_PML_DR_HDR_FLAGS_MATCH, + csum == hdr->hdr_csum ? 1 : 0); + + } else { + COMPUTE_SPECIFIC_CHECKSUM(segments->seg_addr.pval + + sizeof(mca_pml_dr_rendezvous_hdr_t), + segments->seg_len - sizeof(mca_pml_dr_rendezvous_hdr_t), + csum); + mca_pml_dr_recv_frag_unmatched_ack(hdr, + ompi_proc, + MCA_PML_DR_HDR_FLAGS_BUFFERED, + csum == hdr->hdr_csum ? : 0); + } + if(additional_match) { opal_list_item_t* item; while(NULL != (item = opal_list_remove_first(&additional_matches))) { @@ -564,6 +592,64 @@ rematch: } + +static void mca_pml_dr_recv_frag_unmatched_ack( + mca_pml_dr_match_hdr_t* hdr, + ompi_proc_t* ompi_proc, + uint8_t flags, + uint8_t mask) + { + mca_bml_base_endpoint_t* bml_endpoint = NULL; + mca_btl_base_descriptor_t* des; + mca_bml_base_btl_t* bml_btl; + mca_pml_dr_recv_frag_t* frag; + mca_pml_dr_ack_hdr_t* ack; + int rc; + + bml_endpoint = (mca_bml_base_endpoint_t*) ompi_proc->proc_pml; + bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager); + + /* allocate descriptor */ + MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t)); + if(NULL == des) { + goto retry; + } + + /* fill out header */ + ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; + ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; + ack->hdr_common.hdr_flags = flags; + ack->hdr_vid = hdr->hdr_vid; + ack->hdr_vmask = mask; + ack->hdr_src_req = hdr->hdr_src_req; + assert(ack->hdr_src_req.pval); + ack->hdr_dst_req.pval = NULL; + ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); + + /* initialize descriptor */ + des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; + des->des_cbfunc = mca_pml_dr_ctl_completion; + + rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); + if(rc != OMPI_SUCCESS) { + mca_bml_base_free(bml_btl, des); + goto retry; + } + +/* mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); */ + + return; + + /* queue request to retry later */ +retry: + MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc); + frag->hdr.hdr_match = *hdr; + frag->num_segments = 0; + opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag); +} + + + /** * Scan the list of frags that came in ahead of time to see if any * can be processed at this time. If they can, try and match the diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index 0c65459c7a..499d56c710 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -139,7 +139,8 @@ static void mca_pml_dr_ctl_completion( static void mca_pml_dr_recv_request_matched( mca_pml_dr_recv_request_t* recvreq, - mca_pml_dr_rendezvous_hdr_t* hdr, + mca_pml_dr_match_hdr_t* hdr, + uint8_t flags, uint8_t mask) { ompi_proc_t* proc = recvreq->req_proc; @@ -156,7 +157,7 @@ static void mca_pml_dr_recv_request_matched( /* if this hasn't been initialized yet - this is a synchronous send */ if(NULL == proc) { ompi_proc_t *ompi_proc = ompi_comm_peer_lookup( - recvreq->req_recv.req_base.req_comm, hdr->hdr_match.hdr_src); + recvreq->req_recv.req_base.req_comm, hdr->hdr_src); proc = recvreq->req_proc = ompi_proc; } bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml; @@ -171,10 +172,10 @@ static void mca_pml_dr_recv_request_matched( /* fill out header */ ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH; - ack->hdr_vid = hdr->hdr_match.hdr_vid; + ack->hdr_common.hdr_flags = flags; + ack->hdr_vid = hdr->hdr_vid; ack->hdr_vmask = mask; - ack->hdr_src_req = hdr->hdr_match.hdr_src_req; + ack->hdr_src_req = hdr->hdr_src_req; assert(ack->hdr_src_req.pval); ack->hdr_dst_req.pval = recvreq; ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)); @@ -189,20 +190,21 @@ static void mca_pml_dr_recv_request_matched( goto retry; } - mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); + mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); return; /* queue request to retry later */ retry: MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc); - frag->hdr.hdr_rndv = *hdr; + frag->hdr.hdr_match = *hdr; frag->num_segments = 0; frag->request = recvreq; opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag); } + /* * Generate an ack w/ the current vfrag status. */ @@ -235,7 +237,7 @@ static void mca_pml_dr_recv_request_vfrag_ack( ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; - ack->hdr_common.hdr_flags = 0; + ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_VFRAG; ack->hdr_vid = vfrag->vf_id; ack->hdr_vmask = vfrag->vf_ack; ack->hdr_src_req = recvreq->req_vfrag0.vf_send; @@ -250,7 +252,7 @@ static void mca_pml_dr_recv_request_vfrag_ack( if(rc != OMPI_SUCCESS) { mca_bml_base_free(bml_btl, des); } - mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); + mca_pml_dr_comm_proc_set_acked(comm_proc, ack->hdr_vid); } @@ -293,6 +295,7 @@ void mca_pml_dr_recv_request_progress( bytes_received, bytes_delivered, csum); + break; case MCA_PML_DR_HDR_TYPE_RNDV: @@ -315,9 +318,10 @@ void mca_pml_dr_recv_request_progress( if(csum != hdr->hdr_match.hdr_csum) { assert(0); } - - mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, - (csum == hdr->hdr_match.hdr_csum)); + + mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_match, + MCA_PML_DR_HDR_FLAGS_RNDV, + csum == hdr->hdr_match.hdr_csum ? 1 : 0); break; case MCA_PML_DR_HDR_TYPE_FRAG: diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index 173b7a32d7..4322071a4f 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -131,9 +131,7 @@ void mca_pml_dr_match_completion_cache( /* signal request completion */ OPAL_THREAD_LOCK(&ompi_request_lock); - if(sendreq->req_num_acks == sendreq->req_num_vfrags) { - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - } + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -157,15 +155,14 @@ void mca_pml_dr_match_completion_free( orte_errmgr.abort(); } + /* we don't want to free the descriptor until we get a postive ACK */ /* free the descriptor */ - mca_bml_base_free( bml_btl, descriptor ); + /* mca_bml_base_free( bml_btl, descriptor ); */ /* signal request completion */ - OPAL_THREAD_LOCK(&ompi_request_lock); - if(sendreq->req_num_acks == sendreq->req_num_vfrags) { - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); - } - OPAL_THREAD_UNLOCK(&ompi_request_lock); + /* OPAL_THREAD_LOCK(&ompi_request_lock); */ +/* MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); */ +/* OPAL_THREAD_UNLOCK(&ompi_request_lock); */ } /* @@ -187,15 +184,12 @@ static void mca_pml_dr_rndv_completion( orte_errmgr.abort(); } - /* count bytes of user data actually delivered */ - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_dr_rendezvous_hdr_t)); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - + /* return the descriptor */ - mca_bml_base_free(bml_btl, descriptor); + /* mca_bml_base_free(bml_btl, descriptor); */ /* advance the request */ + MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); /* check for pending requests */ @@ -244,11 +238,8 @@ static void mca_pml_dr_frag_completion( OPAL_THREAD_LOCK(&ompi_request_lock); /* count bytes of user data actually delivered */ - MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_dr_frag_hdr_t)); if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 && - sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed && - sendreq->req_num_acks == sendreq->req_num_vfrags) { - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + (sendreq->req_send.req_bytes_packed - sendreq->req_send_offset) == 0) { schedule = false; } else { schedule = true; @@ -291,8 +282,9 @@ int mca_pml_dr_send_request_start_buffered( if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; } + sendreq->descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; - + /* pack the data into the BTL supplied buffer */ iov.iov_base = (void*)((unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_dr_rendezvous_hdr_t)); @@ -315,6 +307,7 @@ int mca_pml_dr_send_request_start_buffered( sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.bml_btl = bml_btl; + sendreq->req_vfrag0.vf_mask = 1; descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -394,12 +387,14 @@ int mca_pml_dr_send_request_start_copy( size_t max_data; int32_t free_after; int rc; - + + /* allocate descriptor */ mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_dr_match_hdr_t) + size); if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; - } + } + sendreq->descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; /* pack the data into the supplied buffer */ @@ -438,6 +433,8 @@ int mca_pml_dr_send_request_start_copy( sendreq->req_send_offset = max_data; sendreq->req_vfrag0.vf_size = max_data; sendreq->req_vfrag0.bml_btl = bml_btl; + sendreq->req_vfrag0.vf_mask = 1; + /* short message */ descriptor->des_cbfunc = mca_pml_dr_match_completion_free; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -452,7 +449,8 @@ int mca_pml_dr_send_request_start_copy( rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); if(OMPI_SUCCESS != rc) { mca_bml_base_free(bml_btl, descriptor ); - } + } + return rc; } @@ -482,6 +480,7 @@ int mca_pml_dr_send_request_start_prepare( if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; } + sendreq->descriptor = descriptor; /* hang on to this for later */ segment = descriptor->des_src; /* build match header */ @@ -532,6 +531,7 @@ int mca_pml_dr_send_request_start_rndv( mca_pml_dr_hdr_t* hdr; int rc; + /* prepare descriptor */ if(size == 0) { mca_bml_base_alloc( @@ -552,8 +552,9 @@ int mca_pml_dr_send_request_start_rndv( if(NULL == des) { return OMPI_ERR_OUT_OF_RESOURCE; } + sendreq->descriptor = des; /* hang on to this for later */ segment = des->des_src; - + /* build hdr */ hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr->hdr_common.hdr_flags = flags; @@ -575,6 +576,7 @@ int mca_pml_dr_send_request_start_rndv( sendreq->req_send_offset = size; sendreq->req_vfrag0.vf_size = size; sendreq->req_vfrag0.bml_btl = bml_btl; + sendreq->req_vfrag0.vf_mask = 1; /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); @@ -632,7 +634,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq) vfrag->bml_btl = bml_btl; vfrag->sendreq = sendreq; offset = 0; - sendreq->req_num_vfrags++; + } else { /* always schedule the vfrag accross the same btl */ bml_btl = vfrag->bml_btl; } @@ -813,41 +815,91 @@ void mca_pml_dr_send_request_acked( mca_pml_dr_ack_hdr_t* ack) { mca_pml_dr_vfrag_t* vfrag; + mca_btl_base_descriptor_t* descriptor; assert(sendreq); + descriptor = sendreq->descriptor; + MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag); /* MCA_PML_DR_VFRAG_ACK_STOP(vfrag); */ - if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) { - if(ack->hdr_vmask) { + + if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_BUFFERED) { + if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { + OPAL_THREAD_LOCK(&ompi_request_lock); + mca_bml_base_free( (mca_bml_base_btl_t*) descriptor->des_context, descriptor ); + sendreq->descriptor = NULL; + MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, + vfrag, + sizeof(mca_pml_dr_rendezvous_hdr_t)); + + OPAL_THREAD_UNLOCK(&ompi_request_lock); + } else { + assert(0); + } + } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_RNDV) { + if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { + OPAL_THREAD_LOCK(&ompi_request_lock); + if(sendreq->descriptor) { + mca_bml_base_free( (mca_bml_base_btl_t*) descriptor->des_context, descriptor ); + sendreq->descriptor = NULL; + MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, + vfrag, + sizeof(mca_pml_dr_rendezvous_hdr_t)); + + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req; - MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + /* we know that we have more data, otherwise it would have been + an MCA_PML_DR_HDR_FLAGS_MATCH */ + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } else { + MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); + } } else { vfrag->vf_idx = 0; vfrag->vf_ack = 0; + assert(0); /* retransmit missing fragments */ /* OPAL_THREAD_LOCK(&sendreq->req_mutex); */ /* opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); */ /* OPAL_THREAD_UNLOCK(&sendreq->req_mutex); */ /* mca_pml_dr_send_request_schedule(sendreq); */ } - } else { + } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) { + if((ack->hdr_vmask & vfrag->vf_mask) == vfrag->vf_mask) { + mca_bml_base_free( (mca_bml_base_btl_t*) descriptor->des_context, descriptor ); + sendreq->descriptor = NULL; + + OPAL_THREAD_LOCK(&ompi_request_lock); + MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, + vfrag, + sizeof(mca_pml_dr_match_hdr_t)); + OPAL_THREAD_UNLOCK(&ompi_request_lock); + /* everything was in the match, mark complete */ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + } + } else if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_VFRAG){ /* add in acknowledged fragments */ vfrag->vf_ack |= ack->hdr_vmask; - + /* have all fragments w/in this vfrag been acked? */ if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) { - + OPAL_THREAD_LOCK(&ompi_request_lock); + MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, + vfrag, + sizeof(mca_pml_dr_frag_hdr_t)); + OPAL_THREAD_UNLOCK(&ompi_request_lock); /* return vfrag */ if (vfrag != &sendreq->req_vfrag0) { MCA_PML_DR_VFRAG_RETURN(vfrag); } - - /* are we done with this request */ + + /* are we done with this request ? */ OPAL_THREAD_LOCK(&ompi_request_lock); - sendreq->req_num_acks++; - if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed && - sendreq->req_num_acks == sendreq->req_num_vfrags) { - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed){ + MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } OPAL_THREAD_UNLOCK(&ompi_request_lock); diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h index 72bc19b14c..c2376be7fa 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.h +++ b/ompi/mca/pml/dr/pml_dr_sendreq.h @@ -39,7 +39,6 @@ extern "C" { #endif - struct mca_pml_dr_send_request_t { mca_pml_base_send_request_t req_send; /* ompi_proc_t* req_proc; */ @@ -60,9 +59,7 @@ struct mca_pml_dr_send_request_t { opal_list_t req_pending; opal_list_t req_retrans; opal_mutex_t req_mutex; - size_t req_num_acks; - size_t req_num_vfrags; - + mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */ }; typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; @@ -146,8 +143,6 @@ do { sendreq->req_endpoint = endpoint; \ sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \ sendreq->req_vfrag = &sendreq->req_vfrag0; \ - sendreq->req_num_acks = 0; \ - sendreq->req_num_vfrags = 0; \ \ /* select a btl */ \ bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ @@ -161,48 +156,7 @@ do { rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \ break; \ default: \ - if(size == 0) { \ - mca_btl_base_descriptor_t* descriptor; \ - mca_btl_base_segment_t* segment; \ - mca_pml_dr_hdr_t* hdr; \ - /* allocate a descriptor */ \ - MCA_PML_DR_DES_ALLOC(bml_btl, descriptor, sizeof(mca_pml_dr_match_hdr_t)); \ - if(NULL == descriptor) { \ - return OMPI_ERR_OUT_OF_RESOURCE; \ - } \ - segment = descriptor->des_src; \ - /* setup vfrag */ \ - sendreq->req_vfrag0.vf_size = 0; \ - \ - /* build hdr */ \ - hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \ - hdr->hdr_common.hdr_flags = 0; \ - hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; \ - hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \ - hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \ - hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ - hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ - hdr->hdr_match.hdr_src_req.pval = sendreq; \ - hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; \ - hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); \ - \ - /* short message */ \ - descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \ - descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \ - descriptor->des_cbdata = sendreq; \ - \ - /* request is complete at mpi level */ \ - OPAL_THREAD_LOCK(&ompi_request_lock); \ - MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ - OPAL_THREAD_UNLOCK(&ompi_request_lock); \ - \ - /* send */ \ - rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); \ - if(OMPI_SUCCESS != rc) { \ - mca_bml_base_free(bml_btl, descriptor ); \ - } \ - \ - } else if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \ + if (bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \ rc = mca_pml_dr_send_request_start_prepare(sendreq, bml_btl, size); \ } else { \ rc = mca_pml_dr_send_request_start_copy(sendreq, bml_btl, size); \ @@ -282,9 +236,7 @@ do { /* has an acknowledgment been received */ \ if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ OPAL_THREAD_LOCK(&ompi_request_lock); \ - if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \ - MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); \ - } else { \ + if(sendreq->req_send.req_bytes_packed - sendreq->req_send_offset) { \ schedule = true; \ } \ OPAL_THREAD_UNLOCK(&ompi_request_lock); \ @@ -405,16 +357,9 @@ do { \ * Update bytes delivered on request based on supplied descriptor */ -#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen) \ +#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, vfrag, hdrlen) \ do { \ - size_t i; \ - mca_btl_base_segment_t* segments = descriptor->des_src; \ - \ - for(i=0; ides_src_cnt; i++) { \ - sendreq->req_bytes_delivered += segments[i].seg_len; \ - } \ - sendreq->req_bytes_delivered -= hdrlen; \ - \ + sendreq->req_bytes_delivered += vfrag->vf_size; \ } while(0) /*