diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c
index 0a83e7a1fb..018c6c4511 100644
--- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c
+++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c
@@ -78,16 +78,11 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
 
     OPAL_THREAD_LOCK(&ompi_request_lock);
     ompi_request->req_status._cancelled = true;
-    ompi_request->req_complete = true;  /* mark it as completed so all the test/wait functions
-                                         * on this particular request will finish */
-    /* Now we have a problem if we are in a multi-threaded environment. We should
-     * broadcast the condition on the request in order to allow the other threads
-     * to complete their test/wait functions.
+    /* This macro will set req_complete to true so the MPI Test/Wait* functions
+     * on this request will be able to complete. As the status is marked as
+     * cancelled, the cancel state will be detected.
      */
-    ompi_request_completed++;
-    if(ompi_request_waiting) {
-        opal_condition_broadcast(&ompi_request_cond);
-    }
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE(ompi_request);
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
     return OMPI_SUCCESS;
 }
@@ -140,6 +135,7 @@ static void mca_pml_ob1_put_completion(
     mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)des->des_cbdata;
     mca_btl_base_segment_t* segments = des->des_dst;
     size_t i, bytes_received = 0;
+    bool schedule = false;
 
     for(i=0; i<des->des_dst_cnt; i++)
         bytes_received += segments[i].seg_len;
@@ -148,22 +144,21 @@ static void mca_pml_ob1_put_completion(
     mca_bml_base_free(bml_btl, des);
 
     /* check completion status */
+    OPAL_THREAD_LOCK(&ompi_request_lock);
     recvreq->req_bytes_received += bytes_received;
     recvreq->req_bytes_delivered += bytes_received;
     if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
-        OPAL_THREAD_LOCK(&ompi_request_lock);
 
         /* initialize request status */
         recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
         recvreq->req_recv.req_base.req_pml_complete = true;
-        recvreq->req_recv.req_base.req_ompi.req_complete = true;
-
-        ompi_request_completed++;
-        if(ompi_request_waiting) {
-            opal_condition_broadcast(&ompi_request_cond);
-        }
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
+        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
+        schedule = true;
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+
+    if( true == schedule ) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
     }
@@ -197,6 +192,9 @@ static void mca_pml_ob1_recv_request_ack(
 
     if(hdr->hdr_msg_length > bytes_received) {
 
+        /* by default copy */
+        recvreq->req_rdma_offset = hdr->hdr_msg_length;
+
         /*
          * lookup request buffer to determine if memory is already
         * registered. 
@@ -230,20 +228,8 @@ static void mca_pml_ob1_recv_request_ack(
                 ompi_convertor_set_position(
                                             &recvreq->req_recv.req_convertor,
                                             &recvreq->req_rdma_offset);
-
-                /* copy */
-                } else {
-                    recvreq->req_rdma_offset = hdr->hdr_msg_length;
                 }
-
-            /* copy */
-            } else {
-                recvreq->req_rdma_offset = hdr->hdr_msg_length;
             }
-
-        /* copy */
-        } else {
-            recvreq->req_rdma_offset = hdr->hdr_msg_length;
         }
 
         /* allocate descriptor */
@@ -323,12 +309,7 @@ static void mca_pml_ob1_rget_completion(
     if(recvreq->req_bytes_received == recvreq->req_recv.req_bytes_packed) {
         recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
         recvreq->req_recv.req_base.req_pml_complete = true;
-        recvreq->req_recv.req_base.req_ompi.req_complete = true;
-
-        ompi_request_completed++;
-        if(ompi_request_waiting) {
-            opal_condition_broadcast(&ompi_request_cond);
-        }
+        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
     }
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
 
@@ -455,6 +436,7 @@ void mca_pml_ob1_recv_request_progress(
     size_t data_offset = 0;
     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
     size_t i;
+    bool schedule = false;
 
     for(i=0; i<num_segments; i++)
         bytes_received += segments[i].seg_len;
@@ -…,… +…,… @@ void mca_pml_ob1_recv_request_progress(
 
     /* check completion status */
+    OPAL_THREAD_LOCK(&ompi_request_lock);
     recvreq->req_bytes_received += bytes_received;
     recvreq->req_bytes_delivered += bytes_delivered;
     if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
-        OPAL_THREAD_LOCK(&ompi_request_lock);
 
         /* initialize request status */
         recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
         recvreq->req_recv.req_base.req_pml_complete = true;
-        recvreq->req_recv.req_base.req_ompi.req_complete = true;
-
-        ompi_request_completed++;
-        if(ompi_request_waiting) {
-            opal_condition_broadcast(&ompi_request_cond);
-        }
-        OPAL_THREAD_UNLOCK(&ompi_request_lock);
+        MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
     } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
+        schedule = true;
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+
+    if( true == schedule ) {
         /* schedule additional rdma operations */
         mca_pml_ob1_recv_request_schedule(recvreq);
     }
@@ -570,17 +551,12 @@ void mca_pml_ob1_recv_request_matched_probe(
     }
 
     /* set completion status */
-    OPAL_THREAD_LOCK(&ompi_request_lock);
     recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
     recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src;
     recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed;
+    OPAL_THREAD_LOCK(&ompi_request_lock);
     recvreq->req_recv.req_base.req_pml_complete = true;
-    recvreq->req_recv.req_base.req_ompi.req_complete = true;
-
-    ompi_request_completed++;
-    if(ompi_request_waiting) {
-        opal_condition_broadcast(&ompi_request_cond);
-    }
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) );
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }
diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c
index da1c414997..f65d4171df 100644
--- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c
+++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c
@@ -20,9 +20,6 @@
 /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
 
 #include "ompi_config.h"
-#ifdef HAVE_SCHED_H
-#include <sched.h>
-#endif  /* HAVE_SCHED_H */
 #include "ompi/include/constants.h"
 #include "mca/pml/pml.h"
 #include "mca/btl/btl.h"
diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.h b/ompi/mca/pml/ob1/pml_ob1_sendreq.h
index 0c7e354528..3f53ce0528 100644
--- a/ompi/mca/pml/ob1/pml_ob1_sendreq.h
+++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.h
@@ -235,11 +235,7 @@ do {
     (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;          \
     (sendreq)->req_send.req_base.req_ompi.req_status._count =                           \
         (sendreq)->req_send.req_bytes_packed;                                            \
-    (sendreq)->req_send.req_base.req_ompi.req_complete = true;                           \
-    ompi_request_completed++;                                                             \
-    if(ompi_request_waiting) {                                                            \
-        opal_condition_broadcast(&ompi_request_cond);                                     \
-    }                                                                                     \
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) );        \
 } while(0)
 
 /*
@@ -251,6 +247,7 @@ do {
 do {                                                                                      \
     size_t r;                                                                             \
     /* request completed at pml level */                                                  \
+    assert( false == (sendreq)->req_send.req_base.req_pml_complete );                     \
     (sendreq)->req_send.req_base.req_pml_complete = true;                                 \
                                                                                           \
     /* return mpool resources */                                                          \
@@ -321,14 +318,13 @@ do {
 
 #define MCA_PML_OB1_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen)         \
 do {                                                                                      \
-    size_t i;                                                                             \
+    size_t i, req_bytes_delivered = 0;                                                    \
     mca_btl_base_segment_t* segments = descriptor->des_src;                               \
                                                                                           \
     for(i=0; i<descriptor->des_src_cnt; i++) {                                            \
-        sendreq->req_bytes_delivered += segments[i].seg_len;                              \
+        req_bytes_delivered += segments[i].seg_len;                                       \
     }                                                                                     \
-    sendreq->req_bytes_delivered -= hdrlen;                                               \
-                                                                                          \
+    sendreq->req_bytes_delivered += (req_bytes_delivered - hdrlen);                       \
 } while(0)
 
 /*
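
For reference, a minimal sketch of what MCA_PML_BASE_REQUEST_MPI_COMPLETE presumably expands to, inferred purely from the inline code it replaces in the hunks above. The authoritative definition lives in the PML base (presumably mca/pml/base/pml_base_request.h), not in this patch, and may differ in detail:

    /* Hypothetical expansion, reconstructed from the removed inline code above.
     * Callers are expected to hold ompi_request_lock, as every call site
     * touched by this patch does. */
    #define MCA_PML_BASE_REQUEST_MPI_COMPLETE(request)            \
        do {                                                      \
            (request)->req_complete = true;                       \
            ompi_request_completed++;                             \
            if(ompi_request_waiting) {                            \
                opal_condition_broadcast(&ompi_request_cond);     \
            }                                                     \
        } while(0)

Under that assumption, each hunk is a mechanical substitution of the macro for the open-coded completion sequence, plus moving the lock acquisition so that the byte counters and the schedule decision are updated under ompi_request_lock.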