diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
index 75af2b8873..6e67c238bb 100644
--- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
+++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
@@ -134,13 +134,11 @@ void mca_pml_ob1_recv_frag_callback( mca_btl_base_module_t* btl,
                                 hdr->hdr_ack.hdr_send_offset,
                                 sendreq->req_send.req_bytes_packed -
                                 hdr->hdr_ack.hdr_send_offset);
-            if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 &&
-               sendreq->req_bytes_delivered >=
-               sendreq->req_send.req_bytes_packed) {
-                MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-            } else {
+
+            OPAL_THREAD_ADD32(&sendreq->req_state, -1);
+
+            if(send_request_pml_complete_check(sendreq) == false)
                 mca_pml_ob1_send_request_schedule(sendreq);
-            }
             break;
         }
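
The hunk above is the core of this change: instead of counting protocol events up toward 2 and repeating the "is everything delivered?" test at every call site, the sender now primes req_state with the number of events it still expects (the pml_ob1_sendreq.c hunks below set req_state = 2 for the receiver's ACK plus the local rendezvous completion), each handler counts one event down, and a single helper, send_request_pml_complete_check(), decides whether the request may complete. The following is a minimal, self-contained model of that countdown; it substitutes C11 atomics for OPAL_THREAD_ADD32 and uses invented toy_* names, so read it as a sketch of the protocol rather than the ob1 API.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Toy request: models only req_state, req_bytes_delivered and
     * req_send.req_bytes_packed from the patch. */
    typedef struct {
        atomic_int    pending_events;   /* like req_state: events still expected */
        atomic_size_t bytes_delivered;  /* like req_bytes_delivered */
        size_t        bytes_packed;     /* like req_send.req_bytes_packed */
    } toy_request_t;

    /* Like send_request_pml_complete_check(): the request may complete only
     * when no more events are expected AND the whole message is delivered. */
    static bool toy_complete_check(toy_request_t *req)
    {
        return atomic_load(&req->pending_events) == 0 &&
               atomic_load(&req->bytes_delivered) >= req->bytes_packed;
    }

    int main(void)
    {
        toy_request_t req = { .bytes_packed = 100 };

        /* start_rndv/start_rdma/start_buffered: two events are outstanding,
         * the ACK and the local completion ("sendreq->req_state = 2"). */
        atomic_store(&req.pending_events, 2);

        /* ACK arrives (recv_frag_callback): one event down, not done yet. */
        atomic_fetch_sub(&req.pending_events, 1);
        printf("after ack: complete=%d\n", toy_complete_check(&req));

        /* local rendezvous completion: last event, and all bytes are in. */
        atomic_fetch_add(&req.bytes_delivered, 100);
        atomic_fetch_sub(&req.pending_events, 1);
        printf("after local completion: complete=%d\n", toy_complete_check(&req));
        return 0;
    }

The same decrement appears once per expected event: in mca_pml_ob1_recv_frag_callback() above for the ACK, in mca_pml_ob1_rndv_completion() below for the local completion, and in mca_pml_ob1_send_request_put() when the PUT control message carries the ACK flag.
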
diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c
index aaccf77885..50f92e678b 100644
--- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c
+++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c
@@ -113,9 +113,7 @@ static int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
                              &(sendreq->req_send.req_base), PERUSE_SEND );
 
     if( true == sendreq->req_send.req_base.req_pml_complete ) {
-        /* don't free request if other thread running schedule */
-        if(OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1)
-            MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
+        MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
     }
 
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
@@ -182,7 +180,7 @@ mca_pml_ob1_match_completion_cache( struct mca_btl_base_module_t* btl,
     MCA_BML_BASE_BTL_DES_RETURN( bml_btl, descriptor );
 
     /* signal request completion */
-    MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
+    send_request_pml_complete(sendreq);
 
     /* check for pending requests */
     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
@@ -217,7 +215,7 @@ mca_pml_ob1_match_completion_free( struct mca_btl_base_module_t* btl,
     mca_bml_base_free( bml_btl, descriptor );
 
     /* signal request completion */
-    MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
+    send_request_pml_complete(sendreq);
 
     /* check for pending requests */
     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
@@ -258,32 +256,16 @@ mca_pml_ob1_rndv_completion( mca_btl_base_module_t* btl,
                                       sizeof(mca_pml_ob1_rendezvous_hdr_t),
                                       req_bytes_delivered );
-    OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered,
-                           req_bytes_delivered);
+    OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, req_bytes_delivered);
+
     /* return the descriptor */
     mca_bml_base_free(bml_btl, descriptor);
 
     /* advance the request */
-    if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 &&
-       sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed) {
-        if(!sendreq->req_send.req_base.req_pml_complete){
-            /* We must check that completion hasn't already occured */
-            /* for the self BTL we may choose the RDMA PUT protocol */
-            /* on the send side, in this case we send no eager data */
-            /* if, on the receiver side the data is not contiguous we */
-            /* may choose to use the copy in/out protocol */
-            /* if this occurs, the entire request can be completed in a */
-            /* single call to mca_pml_ob1_recv_request_ack */
-            /* as soon as the last fragment of the copy in/out protocol */
-            /* gets local completion. This doesn't occur in the general */
-            /* case of the copy in/out protocol because when both sender */
-            /* and receiver agree on the copy in/out protoocol we eagerly */
-            /* send data, we don't update the request with this eagerly sent */
-            /* data until here in this function, so completion could not have */
-            /* yet occurred. */
-            MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-        }
-    }
+    OPAL_THREAD_ADD32(&sendreq->req_state, -1);
+
+    send_request_pml_complete_check(sendreq);
+
     /* check for pending requests */
     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
 }
@@ -306,10 +288,9 @@ mca_pml_ob1_rget_completion( mca_btl_base_module_t* btl,
     /* count bytes of user data actually delivered and check for request completion */
     MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( des->des_src, des->des_src_cnt,
                                         0, req_bytes_delivered );
-    if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered )
-        == sendreq->req_send.req_bytes_packed ) {
-        MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-    }
+    OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, req_bytes_delivered);
+
+    send_request_pml_complete_check(sendreq);
 
     /* release resources */
     btl->btl_free(btl,des);
@@ -368,12 +349,10 @@ mca_pml_ob1_frag_completion( mca_btl_base_module_t* btl,
     /* return the descriptor */
     mca_bml_base_free(bml_btl, descriptor);
 
-    if(OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered,
-                              req_bytes_delivered) == sendreq->req_send.req_bytes_packed) {
-        MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-    } else {
+    OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, req_bytes_delivered);
+
+    if(send_request_pml_complete_check(sendreq) == false)
         mca_pml_ob1_send_request_schedule(sendreq);
-    }
 
     /* check for pending requests */
     MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
@@ -473,6 +452,10 @@ int mca_pml_ob1_send_request_start_buffered(
                               MPI_BYTE,
                               sendreq->req_send.req_bytes_packed,
                               sendreq->req_send.req_addr );
+
+    /* wait for ack and completion */
+    sendreq->req_state = 2;
+
     /* request is complete at mpi level */
     OPAL_THREAD_LOCK(&ompi_request_lock);
     MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq);
@@ -790,6 +773,9 @@ int mca_pml_ob1_send_request_start_rdma(
 
         /* first fragment of a long message */
         des->des_cbfunc = mca_pml_ob1_rndv_completion;
+
+        /* wait for ack and completion */
+        sendreq->req_state = 2;
     }
     des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@@ -870,6 +856,9 @@ int mca_pml_ob1_send_request_start_rndv( mca_pml_ob1_send_request_t* sendreq,
     des->des_cbdata = sendreq;
     des->des_cbfunc = mca_pml_ob1_rndv_completion;
 
+    /* wait for ack and completion */
+    sendreq->req_state = 2;
+
     /* send */
     rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
     if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
@@ -926,7 +915,6 @@ void mca_pml_ob1_send_request_copy_in_out( mca_pml_ob1_send_request_t *sendreq,
 int mca_pml_ob1_send_request_schedule_exclusive(
         mca_pml_ob1_send_request_t* sendreq)
 {
-
     do {
         size_t prev_bytes_remaining = 0;
         mca_pml_ob1_send_range_t *range = NULL;
@@ -1060,16 +1048,9 @@ int mca_pml_ob1_send_request_schedule_exclusive(
             }
             mca_bml.bml_progress();
         }
-        OPAL_THREAD_LOCK(&ompi_request_lock);
-        if(sendreq->req_send.req_base.req_free_called &&
-           sendreq->req_send.req_base.req_pml_complete) {
-            /* if request already completed and freed put it on a free list */
-            MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
-            OPAL_THREAD_UNLOCK(&ompi_request_lock);
-            return MPI_SUCCESS;
-        }
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
-    } while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0);
+    } while (OPAL_THREAD_ADD32(&sendreq->req_lock, -1) > 0);
+
+    send_request_pml_complete_check(sendreq);
 
     return OMPI_SUCCESS;
 }
@@ -1103,14 +1084,9 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl,
                 des->order, 0);
 
     /* check for request completion */
-    if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length)
-        >= sendreq->req_send.req_bytes_packed) {
-
-        /* if we've got completion on rndv packet */
-        if (sendreq->req_state == 2) {
-            MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq);
-        }
-    }
+    OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length);
+
+    send_request_pml_complete_check(sendreq);
 
     MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
 
     /* return rdma descriptor - do this after queuing the fin message - as
@@ -1209,7 +1185,7 @@ void mca_pml_ob1_send_request_put( mca_pml_ob1_send_request_t* sendreq,
     size_t i, size = 0;
 
     if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_TYPE_ACK) {
-        OPAL_THREAD_ADD32(&sendreq->req_state, 1);
+        OPAL_THREAD_ADD32(&sendreq->req_state, -1);
     }
 
     MCA_PML_OB1_RDMA_FRAG_ALLOC(frag, rc);
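
With completion decided in one place, mca_pml_ob1_send_request_schedule_exclusive() above no longer re-checks under ompi_request_lock whether the request was completed and freed behind its back; it just drains req_lock and runs send_request_pml_complete_check() itself. req_lock works as a delegation counter: the caller that raises it from 0 to 1 (the guard lives in mca_pml_ob1_send_request_schedule(), outside these hunks) becomes the scheduler, while concurrent callers merely bump the counter, forcing the active scheduler around its loop one more time. Below is a runnable sketch of that pattern, with C11 atomics standing in for OPAL_THREAD_ADD32 (which returns the updated value) and invented toy_* names.

    #include <stdatomic.h>
    #include <stdio.h>

    typedef struct {
        atomic_int lock;    /* like req_lock: > 0 while a scheduler is active */
        int        ranges;  /* fragment ranges left to schedule (toy stand-in) */
    } toy_sched_t;

    /* OPAL_THREAD_ADD32 returns the *new* value; emulate that here. */
    static int add32(atomic_int *v, int d) { return atomic_fetch_add(v, d) + d; }

    /* Models the schedule()/schedule_exclusive() pair: only the caller that
     * raises lock from 0 to 1 enters the loop; anyone who bumps it while the
     * loop runs delegates one extra scheduling pass to the active owner. */
    static void toy_schedule(toy_sched_t *s)
    {
        if (add32(&s->lock, 1) == 1) {
            do {
                while (s->ranges > 0) {       /* one full scheduling pass */
                    s->ranges--;
                    printf("scheduled a range, %d left\n", s->ranges);
                }
            } while (add32(&s->lock, -1) > 0);
            /* lock is back to 0: no scheduler is running, so - as in the
             * patched schedule_exclusive() - this is the safe place to run
             * the completion check and possibly return the request. */
        }
    }

    int main(void)
    {
        toy_sched_t s = { .ranges = 3 };
        toy_schedule(&s);   /* a call racing with this one would only bump
                             * the counter and trigger one more pass here */
        return 0;
    }
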
diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.h b/ompi/mca/pml/ob1/pml_ob1_sendreq.h
index e02461fd22..72b16b229e 100644
--- a/ompi/mca/pml/ob1/pml_ob1_sendreq.h
+++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.h
@@ -42,13 +42,8 @@ struct mca_pml_ob1_send_request_t {
     mca_pml_base_send_request_t req_send;
     mca_bml_base_endpoint_t* req_endpoint;
     ompi_ptr_t req_recv;
-#if OMPI_HAVE_THREAD_SUPPORT
-    volatile int32_t req_state;
-    volatile int32_t req_lock;
-#else
     int32_t req_state;
     int32_t req_lock;
-#endif
     bool req_throttle_sends;
     size_t req_pipeline_depth;
     size_t req_bytes_delivered;
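
Dropping the OMPI_HAVE_THREAD_SUPPORT branch above leaves one declaration of req_state and req_lock for both builds. That is sound because the fields are no longer touched directly from racy contexts: every update goes through OPAL_THREAD_ADD32 and the reads in send_request_pml_complete_check() sit behind opal_atomic_rmb(), so volatile buys nothing. A hypothetical wrapper in the same spirit is sketched below; THREADS_ENABLED and thread_add32() are invented names, and the GCC/Clang __atomic builtin merely stands in for OPAL's atomics, so this is not the OPAL implementation.

    #include <stdint.h>

    #ifndef THREADS_ENABLED
    #define THREADS_ENABLED 1
    #endif

    /* One plain int32_t field serves threaded and non-threaded builds,
     * because the accessor - not the declaration - picks the semantics. */
    static inline int32_t thread_add32(int32_t *addr, int32_t delta)
    {
    #if THREADS_ENABLED
        /* atomic add, returning the new value */
        return __atomic_add_fetch(addr, delta, __ATOMIC_SEQ_CST);
    #else
        return *addr += delta;  /* single-threaded build: plain arithmetic */
    #endif
    }

    struct toy_send_request {
        int32_t req_state;  /* no volatile: only ever accessed through */
        int32_t req_lock;   /* thread_add32() or behind a memory fence */
    };

    int main(void)
    {
        struct toy_send_request req = { .req_state = 2, .req_lock = 0 };
        return thread_add32(&req.req_state, -1) == 1 ? 0 : 1;  /* exits 0 */
    }
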
@@ -159,45 +154,77 @@ do {
 } while(0)
 
 /*
- * The PML has completed a send request. Note that this request
- * may have been orphaned by the user or have already completed
- * at the MPI level.
- * This macro will never be called directly from the upper level, as it should
- * only be an internal call to the PML.
+ * Release resources associated with a request
  */
 
-#define MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq)                           \
-    do {                                                                         \
-        assert( false == sendreq->req_send.req_base.req_pml_complete );          \
-                                                                                 \
-        if( sendreq->req_send.req_bytes_packed > 0 ) {                           \
-            PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,                   \
-                                     &(sendreq->req_send.req_base),              \
-                                     PERUSE_SEND );                              \
-        }                                                                        \
-                                                                                 \
-        /* return mpool resources */                                             \
-        mca_pml_ob1_free_rdma_resources(sendreq);                                \
-                                                                                 \
-        if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&     \
-            sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
-            mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);           \
-        }                                                                        \
-                                                                                 \
-        OPAL_THREAD_LOCK(&ompi_request_lock);                                    \
-        if( false == sendreq->req_send.req_base.req_ompi.req_complete ) {        \
-            /* Should only be called for long messages (maybe synchronous) */    \
-            MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq);                      \
-        }                                                                        \
-        sendreq->req_send.req_base.req_pml_complete = true;                      \
-                                                                                 \
-        if( sendreq->req_send.req_base.req_free_called ) {                       \
-            /* don't free request if other thread running schedule */            \
-            if(OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1)                    \
-                MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );                      \
-        }                                                                        \
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);                                  \
-    } while (0)
+#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq)                        \
+    do {                                                                \
+        /* Let the base handle the reference counts */                  \
+        MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send));         \
+        OMPI_FREE_LIST_RETURN( &mca_pml_base_send_requests,             \
+                               (ompi_free_list_item_t*)sendreq);        \
+    } while(0)
+
+
+/*
+ * The PML has completed a send request. Note that this request
+ * may have been orphaned by the user or have already completed
+ * at the MPI level.
+ * This function will never be called directly from the upper level, as it
+ * should only be an internal call to the PML.
+ *
+ */
+void static inline
+send_request_pml_complete(mca_pml_ob1_send_request_t *sendreq)
+{
+    assert(false == sendreq->req_send.req_base.req_pml_complete);
+
+    if(sendreq->req_send.req_bytes_packed > 0) {
+        PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
+                                 &(sendreq->req_send.req_base), PERUSE_SEND);
+    }
+
+    /* return mpool resources */
+    mca_pml_ob1_free_rdma_resources(sendreq);
+
+    if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
+        sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
+        mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
+    }
+
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    if(false == sendreq->req_send.req_base.req_ompi.req_complete) {
+        /* Should only be called for long messages (maybe synchronous) */
+        MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq);
+    }
+    sendreq->req_send.req_base.req_pml_complete = true;
+
+    if(sendreq->req_send.req_base.req_free_called) {
+        MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq);
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+}
+
+/* returns true if request was completed on PML level */
+bool static inline
+send_request_pml_complete_check(mca_pml_ob1_send_request_t *sendreq)
+{
+    opal_atomic_rmb();
+    /* if no more events are expected for the request and the whole message is
+     * already sent and send fragment scheduling isn't running in another
+     * thread then complete the request on PML level. From now on, if user
+     * called free on this request, the request structure can be reused for
+     * another request or if the request is persistent it can be restarted */
+    if(sendreq->req_state == 0 &&
+       sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed
+       && OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1) {
+        send_request_pml_complete(sendreq);
+        return true;
+    }
+
+    return false;
+}
+
 /**
  * Schedule additional fragments
@@ -219,18 +246,6 @@ static inline void mca_pml_ob1_send_request_schedule(
     mca_pml_ob1_send_request_schedule_exclusive(sendreq);
 }
 
-/*
- * Release resources associated with a request
- */
-
-#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq)                   \
-    do {                                                           \
-        /* Let the base handle the reference counts */             \
-        MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send));    \
-        OMPI_FREE_LIST_RETURN( &mca_pml_base_send_requests,        \
-                               (ompi_free_list_item_t*)sendreq);   \
-    } while(0)
-
 /**
  * Start the specified request
  */
@@ -337,7 +352,7 @@ mca_pml_ob1_send_request_start( mca_pml_ob1_send_request_t* sendreq )
 
     sendreq->req_endpoint = endpoint;
     sendreq->req_state = 0;
-    sendreq->req_lock=0 ;
+    sendreq->req_lock = 0;
     sendreq->req_pipeline_depth = 0;
    sendreq->req_bytes_delivered = 0;
     sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE;
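
Taken together, the req_lock test inside send_request_pml_complete_check() replaces the old "don't free request if other thread running schedule" logic that was previously scattered across send_request_free(), schedule_exclusive() and the PML_COMPLETE macro. The OPAL_THREAD_ADD32(&sendreq->req_lock, 1) == 1 condition doubles as an ownership claim: if a scheduler is active the increment fails the test, but it also forces that scheduler around its while (OPAL_THREAD_ADD32(&sendreq->req_lock, -1) > 0) loop one more time, after which the scheduler re-runs the check itself, so completion is handed off rather than lost. A compact runnable model of the check follows, with invented toy_* names and C11 atomics standing in for OPAL_THREAD_ADD32 and opal_atomic_rmb().

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct {
        atomic_int    state;            /* req_state: protocol events pending */
        atomic_int    lock;             /* req_lock: > 0 while scheduler runs */
        atomic_size_t bytes_delivered;  /* req_bytes_delivered                */
        size_t        bytes_packed;     /* req_send.req_bytes_packed          */
    } toy_req_t;

    static void toy_pml_complete(toy_req_t *r)
    {
        (void)r;
        puts("request completed at PML level (runs exactly once)");
    }

    /* Models send_request_pml_complete_check(): complete only when (a) no
     * events are pending, (b) every byte went out, and (c) the lock bump
     * claims sole ownership.  Note the short-circuit: the lock is touched
     * only once (a) and (b) already hold, and a failed claim leaves the
     * extra count for the running scheduler's drain loop to absorb. */
    static bool toy_complete_check(toy_req_t *r)
    {
        atomic_thread_fence(memory_order_acquire);      /* opal_atomic_rmb() */
        if (atomic_load(&r->state) == 0 &&
            atomic_load(&r->bytes_delivered) >= r->bytes_packed &&
            atomic_fetch_add(&r->lock, 1) + 1 == 1) {
            toy_pml_complete(r);
            return true;
        }
        return false;
    }

    int main(void)
    {
        toy_req_t r = { .bytes_packed = 10 };
        atomic_store(&r.state, 1);              /* one event still pending */
        atomic_store(&r.bytes_delivered, 10);
        printf("ack still pending  -> %d\n", toy_complete_check(&r));  /* 0 */
        atomic_fetch_sub(&r.state, 1);          /* the last event drains   */
        printf("all conditions met -> %d\n", toy_complete_check(&r));  /* 1 */
        return 0;
    }
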