diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c index f08c500a61..690a51254f 100644 --- a/ompi/mca/pml/ob1/pml_ob1.c +++ b/ompi/mca/pml/ob1/pml_ob1.c @@ -339,13 +339,13 @@ void mca_pml_ob1_process_pending_packets(mca_bml_base_btl_t* bml_btl) send_dst, pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_dst_req.pval, - pckt->hdr.hdr_ack.hdr_rdma_offset); + pckt->hdr.hdr_ack.hdr_send_offset); MCA_PML_OB1_PCKT_PENDING_RETURN(pckt); if(OMPI_ERR_OUT_OF_RESOURCE == rc) { MCA_PML_OB1_ADD_ACK_TO_PENDING(pckt->proc, pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_dst_req.pval, - pckt->hdr.hdr_ack.hdr_rdma_offset); + pckt->hdr.hdr_ack.hdr_send_offset); return; } break; diff --git a/ompi/mca/pml/ob1/pml_ob1.h b/ompi/mca/pml/ob1/pml_ob1.h index 49c3f3fa0b..bb371f55b9 100644 --- a/ompi/mca/pml/ob1/pml_ob1.h +++ b/ompi/mca/pml/ob1/pml_ob1.h @@ -67,6 +67,7 @@ struct mca_pml_ob1_t { ompi_free_list_t recv_frags; ompi_free_list_t pending_pckts; ompi_free_list_t buffers; + ompi_free_list_t send_ranges; /* list of pending operations */ opal_list_t pckt_pending; diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 1e3ccdd317..198564d3a6 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -197,7 +197,16 @@ int mca_pml_ob1_component_open(void) OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t); - + OBJ_CONSTRUCT(&mca_pml_ob1.send_ranges, ompi_free_list_t); + ompi_free_list_init( + &mca_pml_ob1.send_ranges, + sizeof(mca_pml_ob1_send_range_t), + OBJ_CLASS(mca_pml_ob1_send_range_t), + mca_pml_ob1.free_list_num, + mca_pml_ob1.free_list_max, + mca_pml_ob1.free_list_inc, + NULL); + /* pending operations */ OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t); diff --git a/ompi/mca/pml/ob1/pml_ob1_hdr.h b/ompi/mca/pml/ob1/pml_ob1_hdr.h index 12c9f25c48..87eac7d55e 100644 --- a/ompi/mca/pml/ob1/pml_ob1_hdr.h +++ b/ompi/mca/pml/ob1/pml_ob1_hdr.h @@ -172,7 +172,7 @@ struct mca_pml_ob1_ack_hdr_t { #endif ompi_ptr_t hdr_src_req; /**< source request */ ompi_ptr_t hdr_dst_req; /**< matched receive request */ - uint64_t hdr_rdma_offset; /**< starting point rdma protocol */ + uint64_t hdr_send_offset; /**< starting point of copy in/out */ }; typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; @@ -183,13 +183,13 @@ typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; #define MCA_PML_OB1_ACK_HDR_NTOH(h) \ do { \ MCA_PML_OB1_COMMON_HDR_NTOH(h.hdr_common); \ - (h).hdr_rdma_offset = ntoh64((h).hdr_rdma_offset); \ + (h).hdr_send_offset = ntoh64((h).hdr_send_offset); \ } while (0) #define MCA_PML_OB1_ACK_HDR_HTON(h) \ do { \ MCA_PML_OB1_COMMON_HDR_HTON((h).hdr_common); \ - (h).hdr_rdma_offset = hton64((h).hdr_rdma_offset); \ + (h).hdr_send_offset = hton64((h).hdr_send_offset); \ } while (0) /** diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index 750501978b..4fab2666e3 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -128,7 +128,16 @@ void mca_pml_ob1_recv_frag_callback( #endif sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval; sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; - sendreq->req_rdma_offset = (size_t)hdr->hdr_ack.hdr_rdma_offset; + + /* if the request should be delivered entirely by copy in/out + * then throttle sends */ + if(sendreq->req_bytes_delivered == hdr->hdr_ack.hdr_send_offset) + sendreq->req_throttle_sends = true; + + mca_pml_ob1_send_requst_copy_in_out(sendreq, + hdr->hdr_ack.hdr_send_offset, + sendreq->req_send.req_bytes_packed - + hdr->hdr_ack.hdr_send_offset); if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 && sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed) { diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c index ba450f8e9f..94114af592 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c @@ -166,7 +166,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); - } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { + } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); } @@ -179,7 +179,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, int mca_pml_ob1_recv_request_ack_send_btl( ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, - uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) + uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset) { mca_btl_base_descriptor_t* des; mca_pml_ob1_ack_hdr_t* ack; @@ -197,7 +197,7 @@ int mca_pml_ob1_recv_request_ack_send_btl( ack->hdr_common.hdr_flags = 0; ack->hdr_src_req.lval = hdr_src_req; ack->hdr_dst_req.pval = hdr_dst_req; - ack->hdr_rdma_offset = hdr_rdma_offset; + ack->hdr_send_offset = hdr_send_offset; #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #ifdef WORDS_BIGENDIAN @@ -235,11 +235,9 @@ static int mca_pml_ob1_recv_request_ack( bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml; - /* by default copy */ - recvreq->req_rdma_offset = hdr->hdr_msg_length; + /* by default copy everything */ + recvreq->req_send_offset = bytes_received; if(hdr->hdr_msg_length > bytes_received) { - - /* * lookup request buffer to determine if memory is already * registered. @@ -259,29 +257,30 @@ static int mca_pml_ob1_recv_request_ack( if (hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PIN && recvreq->req_rdma_cnt != 0) { - recvreq->req_rdma_offset = bytes_received; + recvreq->req_send_offset = hdr->hdr_msg_length; /* are rdma devices available for long rdma protocol */ } else if (bml_endpoint->btl_send_limit < hdr->hdr_msg_length && bml_endpoint->btl_rdma_offset < hdr->hdr_msg_length && mca_bml_base_btl_array_get_size(&bml_endpoint->btl_rdma)) { /* use convertor to figure out the rdma offset for this request */ - recvreq->req_rdma_offset = bml_endpoint->btl_rdma_offset; - if(recvreq->req_rdma_offset < bytes_received) { - recvreq->req_rdma_offset = bytes_received; + recvreq->req_send_offset = hdr->hdr_msg_length - + bml_endpoint->btl_rdma_offset; + if(recvreq->req_send_offset < bytes_received) { + recvreq->req_send_offset = bytes_received; } ompi_convertor_set_position( &recvreq->req_recv.req_convertor, - &recvreq->req_rdma_offset ); + &recvreq->req_send_offset ); } } - /* start rdma at current fragment offset - no need to ack */ - if(recvreq->req_rdma_offset == bytes_received) + /* nothing to send by copy in/out - no need to ack */ + if(recvreq->req_send_offset == hdr->hdr_msg_length) return OMPI_SUCCESS; } /* let know to shedule function there is no need to put ACK flag */ recvreq->req_ack_sent = true; return mca_pml_ob1_recv_request_ack_send(proc, hdr->hdr_src_req.lval, - recvreq, recvreq->req_rdma_offset); + recvreq, recvreq->req_send_offset); } @@ -473,6 +472,7 @@ void mca_pml_ob1_recv_request_progress( bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t); recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; recvreq->req_send = hdr->hdr_rndv.hdr_src_req; + recvreq->req_rdma_offset = bytes_received; MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received); if(recvreq->req_recv.req_base.req_pml_complete) { @@ -537,7 +537,7 @@ void mca_pml_ob1_recv_request_progress( if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); - } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { + } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); } @@ -599,7 +599,7 @@ int mca_pml_ob1_recv_request_schedule_exclusive( mca_pml_ob1_recv_request_t* rec num_tries = recvreq->req_rdma_cnt; do { - size_t bytes_remaining = recvreq->req_recv.req_bytes_packed - + size_t bytes_remaining = recvreq->req_send_offset - recvreq->req_rdma_offset; size_t prev_bytes_remaining = 0; int num_fail = 0; diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.h b/ompi/mca/pml/ob1/pml_ob1_recvreq.h index c9e44cadb1..0928e025bb 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.h @@ -47,6 +47,7 @@ struct mca_pml_ob1_recv_request_t { size_t req_bytes_received; size_t req_bytes_delivered; size_t req_rdma_offset; + size_t req_send_offset; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; uint32_t req_rdma_cnt; uint32_t req_rdma_idx; @@ -350,7 +351,7 @@ static inline void mca_pml_ob1_recv_request_schedule( _pckt->hdr.hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK; \ _pckt->hdr.hdr_ack.hdr_src_req.lval = (S); \ _pckt->hdr.hdr_ack.hdr_dst_req.pval = (D); \ - _pckt->hdr.hdr_ack.hdr_rdma_offset = (O); \ + _pckt->hdr.hdr_ack.hdr_send_offset = (O); \ _pckt->proc = (P); \ _pckt->bml_btl = NULL; \ OPAL_THREAD_LOCK(&mca_pml_ob1.lock); \ @@ -364,7 +365,7 @@ int mca_pml_ob1_recv_request_ack_send_btl(ompi_proc_t* proc, uint64_t hdr_rdma_offset); static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc, - uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) + uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset) { size_t i; mca_bml_base_btl_t* bml_btl; @@ -374,12 +375,12 @@ static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc, for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) { bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); if(mca_pml_ob1_recv_request_ack_send_btl(proc, bml_btl, hdr_src_req, - hdr_dst_req, hdr_rdma_offset) == OMPI_SUCCESS) + hdr_dst_req, hdr_send_offset) == OMPI_SUCCESS) return OMPI_SUCCESS; } MCA_PML_OB1_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req, - hdr_rdma_offset); + hdr_send_offset); return OMPI_ERR_OUT_OF_RESOURCE; } diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c index 1bd64878d4..8c7a3d6d76 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c @@ -33,6 +33,8 @@ #include "ompi/mca/bml/base/base.h" #include "ompi/datatype/dt_arch.h" +OBJ_CLASS_INSTANCE(mca_pml_ob1_send_range_t, ompi_free_list_item_t, + NULL, NULL); void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl) { @@ -131,6 +133,8 @@ static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req) req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel; req->req_rdma_cnt = 0; + req->req_throttle_sends = false; + OBJ_CONSTRUCT(&req->req_send_ranges, opal_list_t); } OBJ_CLASS_INSTANCE( @@ -435,7 +439,6 @@ int mca_pml_ob1_send_request_start_buffered( /* update lengths */ segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data; - sendreq->req_send_offset = max_data; descriptor->des_cbfunc = mca_pml_ob1_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -448,8 +451,8 @@ int mca_pml_ob1_send_request_start_buffered( return rc; } - iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + sendreq->req_send_offset); - iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset; + iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + max_data); + iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - max_data; if((rc = ompi_convertor_pack( &sendreq->req_send.req_convertor, &iov, @@ -549,8 +552,6 @@ int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq, /* update lengths */ segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data; - sendreq->req_send_offset = max_data; - sendreq->req_rdma_offset = max_data; /* short message */ descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -625,10 +626,6 @@ int mca_pml_ob1_send_request_start_prepare( mca_pml_ob1_send_request_t* sendreq, /* short message */ descriptor->des_cbfunc = mca_pml_ob1_match_completion_free; - /* update lengths */ - sendreq->req_send_offset = size; - sendreq->req_rdma_offset = size; - descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_cbdata = sendreq; @@ -786,7 +783,6 @@ int mca_pml_ob1_send_request_start_rdma( /* update lengths with number of bytes actually packed */ segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t); - sendreq->req_rdma_offset = 0; /* first fragment of a long message */ des->des_cbfunc = mca_pml_ob1_rndv_completion; @@ -873,8 +869,6 @@ int mca_pml_ob1_send_request_start_rndv( des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbdata = sendreq; des->des_cbfunc = mca_pml_ob1_rndv_completion; - sendreq->req_send_offset = size; - sendreq->req_rdma_offset = size; /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); @@ -884,6 +878,26 @@ int mca_pml_ob1_send_request_start_rndv( return rc; } +void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq, + uint64_t send_offset, uint64_t send_length) +{ + mca_pml_ob1_send_range_t *sr; + ompi_free_list_item_t *i; + int rc = OMPI_SUCCESS; + + if(0 == send_length) + return; + + OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_ranges, i, rc); + + sr = (mca_pml_ob1_send_range_t*)i; + + sr->range_send_offset = send_offset; + sr->range_send_length = send_length; + OPAL_THREAD_LOCK(&sendreq->req_send_range_lock); + opal_list_append(&sendreq->req_send_ranges, (opal_list_item_t*)sr); + OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock); +} /** * Schedule pipeline of send descriptors for the given request. @@ -901,33 +915,47 @@ int mca_pml_ob1_send_request_schedule_exclusive( mca_bml_base_btl_array_get_size(&bml_endpoint->btl_send); do { - /* allocate remaining bytes to BTLs */ - size_t bytes_remaining = sendreq->req_rdma_offset - - sendreq->req_send_offset; size_t prev_bytes_remaining = 0, num_fail = 0; + mca_pml_ob1_send_range_t *range = NULL; - if(bytes_remaining == 0) { - OPAL_THREAD_ADD32(&sendreq->req_lock, -sendreq->req_lock); - return OMPI_SUCCESS; - } - while((int32_t)bytes_remaining > 0 && - (sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth - || - sendreq->req_rdma_offset < sendreq->req_send.req_bytes_packed)) - { + while(true) { mca_pml_ob1_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; int rc; - size_t size; + size_t size; + opal_list_item_t *item; mca_bml_base_btl_t* bml_btl = - mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + + if(NULL == range || 0 == range->range_send_length) { + OPAL_THREAD_LOCK(&sendreq->req_send_range_lock); + if(range) { + opal_list_remove_first(&sendreq->req_send_ranges); + OMPI_FREE_LIST_RETURN(&mca_pml_ob1.send_ranges, + &range->base); + } + + item = opal_list_get_first(&sendreq->req_send_ranges); + OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock); + + if(opal_list_get_end(&sendreq->req_send_ranges) == item) + break; + + range = (mca_pml_ob1_send_range_t*)item; + prev_bytes_remaining = 0; + } - if(prev_bytes_remaining == bytes_remaining) + if(true == sendreq->req_throttle_sends && + sendreq->req_pipeline_depth >= + mca_pml_ob1.send_pipeline_depth) + break; + + if(prev_bytes_remaining == range->range_send_length) num_fail++; else num_fail = 0; - prev_bytes_remaining = bytes_remaining; + prev_bytes_remaining = range->range_send_length; if (num_fail == num_btl_avail) { assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE); @@ -940,15 +968,15 @@ int mca_pml_ob1_send_request_schedule_exclusive( } if(num_btl_avail == 1 || - bytes_remaining < bml_btl->btl_min_send_size) { - size = bytes_remaining; + range->range_send_length < bml_btl->btl_min_send_size) { + size = range->range_send_length; } else { /* otherwise attempt to give the BTL a percentage of the message * based on a weighting factor. for simplicity calculate this as * a percentage of the overall message length (regardless of * amount previously assigned) */ - size = (size_t)(bml_btl->btl_weight * bytes_remaining); + size = (size_t)(bml_btl->btl_weight * range->range_send_length); } /* makes sure that we don't exceed BTL max send size */ @@ -959,7 +987,7 @@ int mca_pml_ob1_send_request_schedule_exclusive( /* pack into a descriptor */ ompi_convertor_set_position(&sendreq->req_send.req_convertor, - &sendreq->req_send_offset); + &range->range_send_offset); mca_bml_base_prepare_src(bml_btl, NULL, &sendreq->req_send.req_convertor, @@ -975,7 +1003,7 @@ int mca_pml_ob1_send_request_schedule_exclusive( hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG; - hdr->hdr_frag_offset = sendreq->req_send_offset; + hdr->hdr_frag_offset = range->range_send_offset; hdr->hdr_src_req.pval = sendreq; hdr->hdr_dst_req = sendreq->req_recv; @@ -996,25 +1024,18 @@ int mca_pml_ob1_send_request_schedule_exclusive( #endif #endif - /* - * The if-clause should be optimized away, in case the macro - * extends to ; - */ #if OMPI_WANT_PERUSE - if( 0 != sendreq->req_send_offset ) { - PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE, - &(sendreq->req_send.req_base), - size, PERUSE_SEND); - } + PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE, + &(sendreq->req_send.req_base), size, PERUSE_SEND); #endif /* OMPI_WANT_PERUSE */ /* initiate send - note that this may complete before the call returns */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); if(rc == OMPI_SUCCESS) { - bytes_remaining -= size; + range->range_send_length -= size; /* update state */ - sendreq->req_send_offset += size; + range->range_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1); } else { mca_bml_base_free(bml_btl,des); diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.h b/ompi/mca/pml/ob1/pml_ob1_sendreq.h index 7bd4e2ac01..75b7afcea2 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.h @@ -51,18 +51,26 @@ struct mca_pml_ob1_send_request_t { int32_t req_state; int32_t req_lock; #endif + bool req_throttle_sends; size_t req_pipeline_depth; size_t req_bytes_delivered; - size_t req_send_offset; - size_t req_rdma_offset; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; uint32_t req_rdma_cnt; mca_pml_ob1_send_pending_t req_pending; + opal_mutex_t req_send_range_lock; + opal_list_t req_send_ranges; }; typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t; OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); +struct mca_pml_ob1_send_range_t { + ompi_free_list_item_t base; + uint64_t range_send_offset; + uint64_t range_send_length; +}; +typedef struct mca_pml_ob1_send_range_t mca_pml_ob1_send_range_t; +OBJ_CLASS_DECLARATION(mca_pml_ob1_send_range_t); #define MCA_PML_OB1_SEND_REQUEST_ALLOC( \ comm, \ @@ -328,7 +336,6 @@ static inline int mca_pml_ob1_send_request_start( sendreq->req_lock = 0; sendreq->req_pipeline_depth = 0; sendreq->req_bytes_delivered = 0; - sendreq->req_send_offset = 0; sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE; sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32( &comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1); @@ -394,6 +401,8 @@ int mca_pml_ob1_send_request_put_frag(mca_pml_ob1_rdma_frag_t* frag); * should be considered for sending packets */ void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl); +void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq, + uint64_t send_offset, uint64_t send_length); #if defined(c_plusplus) || defined(__cplusplus) } #endif