diff --git a/ompi/mca/io/base/io_base_request.c b/ompi/mca/io/base/io_base_request.c index 613951c368..e6c414b433 100644 --- a/ompi/mca/io/base/io_base_request.c +++ b/ompi/mca/io/base/io_base_request.c @@ -164,9 +164,7 @@ int mca_io_base_request_alloc(ompi_file_t *file, (*req)->req_file = file; (*req)->req_ver = file->f_io_version; (*req)->free_called = false; - (*req)->super.req_fini = - file->f_io_selected_module.v1_0_0.io_module_request_fini; - (*req)->super.req_free = + (*req)->super.req_free = file->f_io_selected_module.v1_0_0.io_module_request_free; (*req)->super.req_cancel = file->f_io_selected_module.v1_0_0.io_module_request_cancel; @@ -195,7 +193,7 @@ int mca_io_base_request_alloc(ompi_file_t *file, /* Initialize the request */ - OMPI_REQUEST_INIT(&((*req)->super)); + OMPI_REQUEST_INIT(&((*req)->super), false); /* All done */ diff --git a/ompi/mca/io/io.h b/ompi/mca/io/io.h index 7360412040..ac1c4c7233 100644 --- a/ompi/mca/io/io.h +++ b/ompi/mca/io/io.h @@ -350,10 +350,6 @@ struct mca_io_base_module_1_0_0_t { mca_io_base_module_request_once_finalize_fn_t io_module_request_once_finalize; - /** Finalize a request (per usage) */ - - ompi_request_free_fn_t io_module_request_fini; - /** Free a request (per usage) */ ompi_request_free_fn_t io_module_request_free; diff --git a/ompi/mca/io/romio/src/io_romio.h b/ompi/mca/io/romio/src/io_romio.h index f2c5aa0f3f..6093e991bd 100644 --- a/ompi/mca/io/romio/src/io_romio.h +++ b/ompi/mca/io/romio/src/io_romio.h @@ -64,7 +64,6 @@ typedef struct mca_io_romio_data_t mca_io_romio_data_t; */ int mca_io_romio_progress(void); -int mca_io_romio_request_fini(ompi_request_t **req); int mca_io_romio_request_free(ompi_request_t **req); int mca_io_romio_request_cancel(ompi_request_t *req, int flag); diff --git a/ompi/mca/io/romio/src/io_romio_component.c b/ompi/mca/io/romio/src/io_romio_component.c index 66e1753c76..621538eb92 100644 --- a/ompi/mca/io/romio/src/io_romio_component.c +++ b/ompi/mca/io/romio/src/io_romio_component.c @@ -289,7 +289,7 @@ static int progress() * here */ if (ioreq->free_called) { - ret = ioreq->super.req_fini((ompi_request_t**) &ioreq); + ret = ompi_request_free((ompi_request_t**) &ioreq); if (OMPI_SUCCESS != ret) { OPAL_THREAD_UNLOCK(&mca_io_romio_mutex); return ret; diff --git a/ompi/mca/io/romio/src/io_romio_module.c b/ompi/mca/io/romio/src/io_romio_module.c index 9700dafd96..a933b46a2d 100644 --- a/ompi/mca/io/romio/src/io_romio_module.c +++ b/ompi/mca/io/romio/src/io_romio_module.c @@ -43,7 +43,6 @@ mca_io_base_module_1_0_0_t mca_io_romio_module = { /* Finalize, free, cancel */ - mca_io_romio_request_fini, mca_io_romio_request_free, mca_io_romio_request_cancel, diff --git a/ompi/mca/io/romio/src/io_romio_request.c b/ompi/mca/io/romio/src/io_romio_request.c index f441c57e20..4c8db35ea1 100644 --- a/ompi/mca/io/romio/src/io_romio_request.c +++ b/ompi/mca/io/romio/src/io_romio_request.c @@ -24,43 +24,26 @@ #include "io_romio.h" -int mca_io_romio_request_fini(ompi_request_t **req) +int mca_io_romio_request_free(ompi_request_t **req) { mca_io_base_request_t *ioreq = *((mca_io_base_request_t**) req); - int ret = OMPI_SUCCESS; OPAL_THREAD_LOCK(&mca_io_romio_mutex); /* clean up the fortran stuff, mark us as invalid */ OMPI_REQUEST_FINI(*req); - /* and shove us back in the free list */ - mca_io_base_request_free(ioreq->req_file, ioreq); - - OPAL_THREAD_UNLOCK(&mca_io_romio_mutex); - - *req = MPI_REQUEST_NULL; - return ret; -} - - -int mca_io_romio_request_free(ompi_request_t **req) -{ - mca_io_base_request_t *ioreq = 
*((mca_io_base_request_t**) req); - int ret = OMPI_SUCCESS; - - OPAL_THREAD_LOCK(&mca_io_romio_mutex); ioreq->free_called = true; /* if the thing is done already, finalize it and get out... */ if (ioreq->super.req_complete) { - ret = ioreq->super.req_fini(req); + mca_io_base_request_free(ioreq->req_file, ioreq); } OPAL_THREAD_UNLOCK(&mca_io_romio_mutex); *req = MPI_REQUEST_NULL; - return ret; + return OMPI_SUCCESS; } diff --git a/ompi/mca/mpool/base/mpool_base_close.c b/ompi/mca/mpool/base/mpool_base_close.c index 0a22f4bf44..aaadfdddd1 100644 --- a/ompi/mca/mpool/base/mpool_base_close.c +++ b/ompi/mca/mpool/base/mpool_base_close.c @@ -63,8 +63,8 @@ int mca_mpool_base_close(void) if(mca_mpool_base_use_mem_hooks && 0 != (OPAL_MEMORY_FREE_SUPPORT & opal_mem_hooks_support_level())) { opal_mem_hooks_unregister_release(mca_mpool_base_mem_cb); - OBJ_DESTRUCT(&mca_mpool_base_mem_cb_array); } + OBJ_DESTRUCT(&mca_mpool_base_mem_cb_array); /* All done */ return OMPI_SUCCESS; diff --git a/ompi/mca/mpool/base/mpool_base_lookup.c b/ompi/mca/mpool/base/mpool_base_lookup.c index b121d08778..a4d4d5631f 100644 --- a/ompi/mca/mpool/base/mpool_base_lookup.c +++ b/ompi/mca/mpool/base/mpool_base_lookup.c @@ -93,7 +93,6 @@ mca_mpool_base_module_t* mca_mpool_base_module_create( if(mca_mpool_base_use_mem_hooks && 0 != (OPAL_MEMORY_FREE_SUPPORT & opal_mem_hooks_support_level())) { opal_mem_hooks_register_release(mca_mpool_base_mem_cb, NULL); - OBJ_CONSTRUCT(&mca_mpool_base_mem_cb_array, ompi_pointer_array_t); } #if defined(HAVE_MALLOPT) @@ -103,6 +102,7 @@ mca_mpool_base_module_t* mca_mpool_base_module_create( } #endif /* defined(HAVE_MALLOPT) */ } + OBJ_CONSTRUCT(&mca_mpool_base_mem_cb_array, ompi_pointer_array_t); return module; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sendreq.c b/ompi/mca/osc/pt2pt/osc_pt2pt_sendreq.c index 8a621adc6a..fec0742df4 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sendreq.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sendreq.c @@ -66,7 +66,6 @@ ompi_osc_pt2pt_sendreq_alloc_init(ompi_osc_pt2pt_req_type_t req_type, static void ompi_osc_pt2pt_sendreq_construct(ompi_osc_pt2pt_sendreq_t *req) { req->super.req_type = OMPI_REQUEST_WIN; - req->super.req_fini = NULL; req->super.req_free = NULL; req->super.req_cancel = NULL; OBJ_CONSTRUCT(&(req->req_origin_convertor), ompi_convertor_t); diff --git a/ompi/mca/pml/base/pml_base_recvreq.h b/ompi/mca/pml/base/pml_base_recvreq.h index 2b57ea0aae..a6b3d9d3d3 100644 --- a/ompi/mca/pml/base/pml_base_recvreq.h +++ b/ompi/mca/pml/base/pml_base_recvreq.h @@ -66,7 +66,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t); OBJ_RETAIN(comm); \ OBJ_RETAIN(datatype); \ \ - OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \ + OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, persistent); \ (request)->req_bytes_packed = 0; \ (request)->req_base.req_sequence = 0; \ (request)->req_base.req_addr = addr; \ @@ -76,7 +76,6 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_base_recv_request_t); (request)->req_base.req_tag = tag; \ (request)->req_base.req_comm = comm; \ (request)->req_base.req_proc = NULL; \ - (request)->req_base.req_persistent = persistent; \ (request)->req_base.req_pml_complete = (persistent ? 
true : false); \ (request)->req_base.req_free_called = false; \ } diff --git a/ompi/mca/pml/base/pml_base_request.h b/ompi/mca/pml/base/pml_base_request.h index dd75b65952..4753c1b46d 100644 --- a/ompi/mca/pml/base/pml_base_request.h +++ b/ompi/mca/pml/base/pml_base_request.h @@ -56,7 +56,6 @@ struct mca_pml_base_request_t { uint64_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */ struct ompi_datatype_t *req_datatype; /**< pointer to data type */ mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */ - bool req_persistent; /**< flag indicating if the this is a persistent request */ volatile bool req_pml_complete; /**< flag indicating if the pt-2-pt layer is done with this request */ volatile bool req_free_called; /**< flag indicating if the user has freed this request */ }; diff --git a/ompi/mca/pml/base/pml_base_sendreq.h b/ompi/mca/pml/base/pml_base_sendreq.h index 6ef07d8b0f..d4b072573d 100644 --- a/ompi/mca/pml/base/pml_base_sendreq.h +++ b/ompi/mca/pml/base/pml_base_sendreq.h @@ -80,7 +80,7 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t; OBJ_RETAIN(comm); \ OBJ_RETAIN(datatype); \ \ - OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \ + OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, persistent); \ (request)->req_addr = addr; \ (request)->req_count = count; \ (request)->req_datatype = datatype; \ @@ -92,7 +92,6 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t; (request)->req_base.req_tag = (int32_t)tag; \ (request)->req_base.req_comm = comm; \ /* (request)->req_base.req_proc is set on request allocation */ \ - (request)->req_base.req_persistent = persistent; \ (request)->req_base.req_pml_complete = (persistent ? true : false); \ (request)->req_base.req_free_called = false; \ (request)->req_base.req_ompi.req_status._cancelled = 0; \ diff --git a/ompi/mca/pml/dr/pml_dr_irecv.c b/ompi/mca/pml/dr/pml_dr_irecv.c index 3add584e29..1112130c4c 100644 --- a/ompi/mca/pml/dr/pml_dr_irecv.c +++ b/ompi/mca/pml/dr/pml_dr_irecv.c @@ -88,6 +88,12 @@ int mca_pml_dr_recv(void *addr, MCA_PML_DR_RECV_REQUEST_START(recvreq); if (recvreq->req_recv.req_base.req_ompi.req_complete == false) { +#if OMPI_ENABLE_PROGRESS_THREADS + if(opal_progress_spin(&recvreq->req_recv.req_base.req_ompi.req_complete)) { + goto finished; + } +#endif + /* give up and sleep until completion */ if (opal_using_threads()) { opal_mutex_lock(&ompi_request_lock); @@ -104,11 +110,15 @@ int mca_pml_dr_recv(void *addr, } } +#if OMPI_ENABLE_PROGRESS_THREADS +finished: +#endif + if (NULL != status) { /* return status */ *status = recvreq->req_recv.req_base.req_ompi.req_status; } rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR; - MCA_PML_DR_RECV_REQUEST_RETURN(recvreq); + ompi_request_free( (ompi_request_t**)&recvreq ); return rc; } diff --git a/ompi/mca/pml/dr/pml_dr_isend.c b/ompi/mca/pml/dr/pml_dr_isend.c index a3ad6ecd77..a1fad654dd 100644 --- a/ompi/mca/pml/dr/pml_dr_isend.c +++ b/ompi/mca/pml/dr/pml_dr_isend.c @@ -108,6 +108,13 @@ int mca_pml_dr_send(void *buf, } if (sendreq->req_send.req_base.req_ompi.req_complete == false) { +#if OMPI_ENABLE_PROGRESS_THREADS + if(opal_progress_spin(&sendreq->req_send.req_base.req_ompi.req_complete)) { + ompi_request_free( (ompi_request_t**)&sendreq ); + return OMPI_SUCCESS; + } +#endif + /* give up and sleep until completion */ if (opal_using_threads()) { opal_mutex_lock(&ompi_request_lock); @@ -125,7 +132,8 @@ int mca_pml_dr_send(void *buf, } /* return request to pool */ - 
MCA_PML_DR_FREE((ompi_request_t **) & sendreq); - return OMPI_SUCCESS; + rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR; + ompi_request_free((ompi_request_t **) & sendreq); + return rc; } diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index 344f56d45e..65eca7fa26 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -29,32 +29,27 @@ #include "pml_dr_sendreq.h" #include "ompi/mca/bml/base/base.h" #include "orte/mca/errmgr/errmgr.h" - + static mca_pml_dr_recv_frag_t* mca_pml_dr_recv_request_match_specific_proc( mca_pml_dr_recv_request_t* request, mca_pml_dr_comm_proc_t* proc); -static int mca_pml_dr_recv_request_fini(struct ompi_request_t** request) +static inline int mca_pml_dr_recv_request_free(struct ompi_request_t** request) { - mca_pml_dr_recv_request_t* recvreq = *(mca_pml_dr_recv_request_t**)request; - if(recvreq->req_recv.req_base.req_persistent) { - if(recvreq->req_recv.req_base.req_free_called) { - MCA_PML_DR_FREE(request); - } else { - recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; - } - } else { - MCA_PML_DR_FREE(request); + mca_pml_dr_recv_request_t* recvreq = *(mca_pml_dr_recv_request_t**)request; + assert( false == recvreq->req_recv.req_base.req_free_called ); + + OPAL_THREAD_LOCK(&ompi_request_lock); + recvreq->req_recv.req_base.req_free_called = true; + if( true == recvreq->req_recv.req_base.req_pml_complete ) { + MCA_PML_DR_RECV_REQUEST_RETURN( recvreq ); } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + + *request = MPI_REQUEST_NULL; return OMPI_SUCCESS; } -static int mca_pml_dr_recv_request_free(struct ompi_request_t** request) -{ - MCA_PML_DR_FREE(request); - return OMPI_SUCCESS; -} - static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, int complete) { mca_pml_dr_recv_request_t* request = (mca_pml_dr_recv_request_t*)ompi_request; @@ -63,7 +58,7 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i if( true == ompi_request->req_complete ) { /* way to late to cancel this one */ return OMPI_SUCCESS; } - + /* The rest should be protected behind the match logic lock */ OPAL_THREAD_LOCK(&comm->matching_lock); if( OMPI_ANY_TAG == ompi_request->req_status.MPI_TAG ) { /* the match has not been already done */ @@ -75,19 +70,14 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i } } OPAL_THREAD_UNLOCK(&comm->matching_lock); - + OPAL_THREAD_LOCK(&ompi_request_lock); ompi_request->req_status._cancelled = true; - ompi_request->req_complete = true; /* mark it as completed so all the test/wait functions - * on this particular request will finish */ - /* Now we have a problem if we are in a multi-threaded environment. We should - * broadcast the condition on the request in order to allow the other threads - * to complete their test/wait functions. + /* This macro will set the req_complete to true so the MPI Test/Wait* functions + * on this request will be able to complete. As the status is marked as + * cancelled the cancel state will be detected. 
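
What MCA_PML_BASE_REQUEST_MPI_COMPLETE centralizes is the signaling the removed lines above spelled out by hand: raise req_complete, count the completion, and wake any thread blocked in a wait. A minimal standalone model of that pattern, using plain pthreads instead of the OPAL wrappers; every name in it is illustrative, not an actual OMPI symbol:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    static pthread_mutex_t request_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  request_cond = PTHREAD_COND_INITIALIZER;
    static int    request_waiting   = 0;   /* threads blocked in a wait call */
    static size_t request_completed = 0;   /* global completion counter      */

    struct request { volatile bool complete; };

    /* caller holds request_lock, mirroring the PML call sites */
    static void request_mpi_complete(struct request *req)
    {
        req->complete = true;
        request_completed++;
        if (request_waiting > 0)
            pthread_cond_broadcast(&request_cond);
    }

Broadcasting only when a waiter is registered keeps the common, uncontended completion path cheap.
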
*/ - ompi_request_completed++; - if(ompi_request_waiting) { - opal_condition_broadcast(&ompi_request_cond); - } + MCA_PML_BASE_REQUEST_MPI_COMPLETE(ompi_request); OPAL_THREAD_UNLOCK(&ompi_request_lock); return OMPI_SUCCESS; } @@ -95,7 +85,6 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request) { request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV; - request->req_recv.req_base.req_ompi.req_fini = mca_pml_dr_recv_request_fini; request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel; OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t); @@ -358,16 +347,7 @@ void mca_pml_dr_recv_request_progress( recvreq->req_bytes_received += bytes_received; recvreq->req_bytes_delivered += bytes_delivered; if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) { - - /* initialize request status */ - recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered; - recvreq->req_recv.req_base.req_pml_complete = true; - recvreq->req_recv.req_base.req_ompi.req_complete = true; - - ompi_request_completed++; - if(ompi_request_waiting) { - opal_condition_broadcast(&ompi_request_cond); - } + MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq); } OPAL_THREAD_UNLOCK(&ompi_request_lock); } @@ -401,19 +381,8 @@ void mca_pml_dr_recv_request_matched_probe( break; } - /* check completion status */ - OPAL_THREAD_LOCK(&ompi_request_lock); - recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag; - recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src; - recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed; - recvreq->req_recv.req_base.req_pml_complete = true; - recvreq->req_recv.req_base.req_ompi.req_complete = true; - - ompi_request_completed++; - if(ompi_request_waiting) { - opal_condition_broadcast(&ompi_request_cond); - } - OPAL_THREAD_UNLOCK(&ompi_request_lock); + /* mark probe request completed */ + MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq); } /* diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.h b/ompi/mca/pml/dr/pml_dr_recvreq.h index e7a38dbe05..0ecfaf5084 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.h +++ b/ompi/mca/pml/dr/pml_dr_recvreq.h @@ -105,6 +105,37 @@ do { \ persistent); \ } while(0) +/** + * Mark a recv request complete. + * + * @param request (IN) Receive request. + */ + +#define MCA_PML_DR_RECV_REQUEST_PML_COMPLETE(recvreq) \ +do { \ + assert( false == recvreq->req_recv.req_base.req_pml_complete ); \ + \ + OPAL_THREAD_LOCK(&ompi_request_lock); \ + /* initialize request status */ \ + recvreq->req_recv.req_base.req_pml_complete = true; \ + recvreq->req_recv.req_base.req_ompi.req_status._count = \ + (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? \ + recvreq->req_bytes_received : recvreq->req_bytes_delivered); \ + MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \ + \ + if( true == recvreq->req_recv.req_base.req_free_called ) { \ + MCA_PML_DR_RECV_REQUEST_RETURN( recvreq ); \ + } else { \ + if(recvreq->req_recv.req_base.req_ompi.req_persistent) { \ + if( !recvreq->req_recv.req_base.req_free_called ) { \ + recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \ + } \ + } \ + } \ + OPAL_THREAD_UNLOCK(&ompi_request_lock); \ +} while(0) + + /** * Return a recv request to the modules free list. 
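
The free functions and the PML_COMPLETE macros introduced by this patch all meet in the same two-flag rendezvous: req_pml_complete says the PML is finished with the request, req_free_called says the user has released it, and whichever side observes the other flag already set returns the request to the free list. A sketch of that handshake under a single lock; pthreads stand in for OPAL, and freelist_return() is a hypothetical stand-in for the *_REQUEST_RETURN macros:

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t req_lock = PTHREAD_MUTEX_INITIALIZER;

    struct pml_request {
        volatile bool pml_complete;   /* all PML-level traffic finished */
        volatile bool free_called;    /* user called MPI_Request_free   */
    };

    void freelist_return(struct pml_request *req);   /* hypothetical */

    /* reached from MPI_Request_free, or from test/wait on completion */
    static void request_user_free(struct pml_request *req)
    {
        pthread_mutex_lock(&req_lock);
        req->free_called = true;
        if (req->pml_complete)        /* PML finished first: recycle now */
            freelist_return(req);
        pthread_mutex_unlock(&req_lock);
    }

    /* reached from the PML once the last fragment is acknowledged */
    static void request_pml_complete(struct pml_request *req)
    {
        pthread_mutex_lock(&req_lock);
        req->pml_complete = true;
        if (req->free_called)         /* user released it first: recycle now */
            freelist_return(req);
        pthread_mutex_unlock(&req_lock);
    }

Because both paths run under the same lock, exactly one of them sees both flags set, so the request is recycled exactly once.
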
 *
@@ -133,7 +164,7 @@ do {
  * @param request (IN) Request to match.
  */
 void mca_pml_dr_recv_request_match_wild(mca_pml_dr_recv_request_t* request);
-
+
 /**
  * Attempt to match the request against the unexpected fragment list
  * for a specific source rank.
diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c
index 4756376c12..0fd6401de4 100644
--- a/ompi/mca/pml/dr/pml_dr_sendreq.c
+++ b/ompi/mca/pml/dr/pml_dr_sendreq.c
@@ -34,34 +34,26 @@
 #include "ompi/mca/bml/base/base.h"
 
-static int mca_pml_dr_send_request_fini(struct ompi_request_t** request)
+/*
+ * The free call marks the final stage in a request life-cycle. Starting from this
+ * point the request is completed at both PML and user level, and can be used
+ * for other p2p communications. Therefore, in the case of the DR PML it should
+ * be added to the free request list.
+ */
+static inline int mca_pml_dr_send_request_free(struct ompi_request_t** request)
 {
-    mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)(request);
-    if(sendreq->req_send.req_base.req_persistent) {
-        if(sendreq->req_send.req_base.req_free_called) {
-            MCA_PML_DR_FREE(request);
-        } else {
-            sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-            /* rewind convertor */
-            if(sendreq->req_send.req_bytes_packed) {
-                size_t offset = 0;
-                ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
-            }
-            /* if buffered send - release any resources */
-            if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
-                sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
-                mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
-            }
-        }
-    } else {
-        MCA_PML_DR_FREE(request);
-    }
-    return OMPI_SUCCESS;
-}
+    mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)request;
 
-static int mca_pml_dr_send_request_free(struct ompi_request_t** request)
-{
-    MCA_PML_DR_FREE(request);
+    assert( false == sendreq->req_send.req_base.req_free_called );
+
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    sendreq->req_send.req_base.req_free_called = true;
+    if( true == sendreq->req_send.req_base.req_pml_complete ) {
+        MCA_PML_DR_SEND_REQUEST_RETURN( sendreq );
+    }
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+
+    *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
 
@@ -73,7 +65,6 @@ static int mca_pml_dr_send_request_cancel(struct ompi_request_t* request, int co
 
 static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
 {
-
     OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t);
     OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);
 
@@ -84,10 +75,8 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
     req->req_vfrag0.vf_mask_processed = 0;
     req->req_vfrag0.vf_send.pval = req;
     req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
-    req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini;
     req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
     req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel;
-
 }
 
 static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req)
diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.h b/ompi/mca/pml/dr/pml_dr_sendreq.h
index 82dcfead10..9c63bb678d 100644
--- a/ompi/mca/pml/dr/pml_dr_sendreq.h
+++ b/ompi/mca/pml/dr/pml_dr_sendreq.h
@@ -58,7 +58,6 @@ struct mca_pml_dr_send_request_t {
     mca_pml_dr_vfrag_t req_vfrag0;
     opal_list_t req_retrans;
     mca_btl_base_descriptor_t* descriptor; /* descriptor for first frag, retransmission */
-
 };
 typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@@ -181,62 +180,66 @@ do {
  * Mark a send request as completed at the MPI level.
  */
 
-#define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \
-do { \
-    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \
-        (sendreq)->req_send.req_base.req_comm->c_my_rank; \
-    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \
-        (sendreq)->req_send.req_base.req_tag; \
-    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
-    (sendreq)->req_send.req_base.req_ompi.req_status._count = \
-        (sendreq)->req_send.req_bytes_packed; \
-    (sendreq)->req_send.req_base.req_ompi.req_complete = true; \
-    ompi_request_completed++; \
-    if(ompi_request_waiting) { \
-        opal_condition_broadcast(&ompi_request_cond); \
-    } \
+#define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \
+do { \
+    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \
+        (sendreq)->req_send.req_base.req_comm->c_my_rank; \
+    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \
+        (sendreq)->req_send.req_base.req_tag; \
+    (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
+    (sendreq)->req_send.req_base.req_ompi.req_status._count = \
+        (sendreq)->req_send.req_bytes_packed; \
+    if( (sendreq)->req_send.req_base.req_ompi.req_persistent ) { \
+        (sendreq)->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
+    } \
+    MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \
 } while(0)
 
 /*
- * The PML has completed a send request. Note that this request
- * may have been orphaned by the user or have already completed
- * at the MPI level.
+ * The request fini is responsible for releasing all resources at the PML
+ * level. It will never be called directly from the upper level, as it should
+ * only be an internal call to the PML. However, in the case where the user
+ * has already lost the MPI reference to the request (MPI_Request_free was
+ * called), fini should completely free the MPI request.
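
Note how the reworked MPI-completion macro above parks a persistent request as OMPI_REQUEST_INACTIVE rather than destroying it, so MPI_Start can reuse it. The same logic with simplified, illustrative types (not the real OMPI structs):

    #include <stdbool.h>
    #include <stddef.h>

    enum req_state { REQ_INVALID, REQ_INACTIVE, REQ_ACTIVE };

    struct send_status { int source, tag, error; size_t count; };

    struct send_request {
        int    my_rank, tag;
        size_t bytes_packed;
        bool   persistent;
        enum req_state state;
        struct send_status status;
    };

    /* mirrors the status fill-in done by the macro above */
    static void send_mpi_complete(struct send_request *req)
    {
        req->status.source = req->my_rank;  /* a send reports its own rank */
        req->status.tag    = req->tag;
        req->status.error  = 0;
        req->status.count  = req->bytes_packed;
        if (req->persistent)
            req->state = REQ_INACTIVE;      /* parked for the next MPI_Start */
        /* then raise req_complete and wake waiters (first sketch above) */
    }
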
*/ -#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \ -do { \ - /* request completed at pml level */ \ - assert((sendreq)->req_send.req_base.req_pml_complete == false); \ - (sendreq)->req_send.req_base.req_pml_complete = true; \ - \ - /* user has already released the request so simply free it */ \ - if((sendreq)->req_send.req_base.req_free_called) { \ - /* if buffered send - release any resources */ \ - if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ - (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ - mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ - } \ - MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \ - /* is request complete at mpi level */ \ - } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \ - MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ - /* buffered send - release any resources */ \ - } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ - (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ - mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ - } \ -} while (0) +#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \ +do { \ + assert( false == sendreq->req_send.req_base.req_pml_complete ); \ + if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ + sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \ + mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ + } \ + \ + OPAL_THREAD_LOCK(&ompi_request_lock); \ + if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \ + /* Should only be called for long messages (maybe synchronous) */ \ + MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ + } \ + sendreq->req_send.req_base.req_pml_complete = true; \ + \ + if( sendreq->req_send.req_base.req_free_called ) { \ + MCA_PML_DR_SEND_REQUEST_RETURN( sendreq ); \ + } else { \ + if(sendreq->req_send.req_base.req_ompi.req_persistent) { \ + /* rewind convertor */ \ + size_t offset = 0; \ + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); \ + } \ + } \ + OPAL_THREAD_UNLOCK(&ompi_request_lock); \ +} while (0) /* * Release resources associated with a request */ -#define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \ -do { \ - /* Let the base handle the reference counts */ \ - MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \ - OMPI_FREE_LIST_RETURN( \ - &mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \ +#define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \ + do { \ + /* Let the base handle the reference counts */ \ + MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \ + OMPI_FREE_LIST_RETURN( &mca_pml_dr.send_requests, \ + (opal_list_item_t*)sendreq ); \ } while(0) /* @@ -302,7 +305,6 @@ do { \ } \ } \ } while(0) - /* * Update bytes delivered on request based on supplied descriptor @@ -312,7 +314,6 @@ do { \ do { \ sendreq->req_bytes_delivered += vfrag->vf_size; \ } while(0) - /* * Attempt to process any pending requests */ diff --git a/ompi/mca/pml/ob1/pml_ob1_irecv.c b/ompi/mca/pml/ob1/pml_ob1_irecv.c index c5a9e7780c..04c9061188 100644 --- a/ompi/mca/pml/ob1/pml_ob1_irecv.c +++ b/ompi/mca/pml/ob1/pml_ob1_irecv.c @@ -117,7 +117,6 @@ finished: *status = recvreq->req_recv.req_base.req_ompi.req_status; } rc = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR; - MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq); + ompi_request_free( (ompi_request_t**)&recvreq ); return rc; } - diff --git a/ompi/mca/pml/ob1/pml_ob1_isend.c b/ompi/mca/pml/ob1/pml_ob1_isend.c index 
46413d3846..0d406b312e 100644 --- a/ompi/mca/pml/ob1/pml_ob1_isend.c +++ b/ompi/mca/pml/ob1/pml_ob1_isend.c @@ -103,16 +103,14 @@ int mca_pml_ob1_send(void *buf, MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc); if (rc != OMPI_SUCCESS) { - MCA_PML_OB1_SEND_REQUEST_FREE( sendreq ); + MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq ); return rc; } if (sendreq->req_send.req_base.req_ompi.req_complete == false) { #if OMPI_ENABLE_PROGRESS_THREADS if(opal_progress_spin(&sendreq->req_send.req_base.req_ompi.req_complete)) { - opal_mutex_lock(&ompi_request_lock); - MCA_PML_OB1_SEND_REQUEST_FREE( sendreq ); - opal_mutex_unlock(&ompi_request_lock); + ompi_request_free( (ompi_request_t**)&sendreq ); return OMPI_SUCCESS; } #endif @@ -124,16 +122,16 @@ int mca_pml_ob1_send(void *buf, while (sendreq->req_send.req_base.req_ompi.req_complete == false) opal_condition_wait(&ompi_request_cond, &ompi_request_lock); ompi_request_waiting--; - MCA_PML_OB1_SEND_REQUEST_FREE( sendreq ); opal_mutex_unlock(&ompi_request_lock); } else { ompi_request_waiting++; while (sendreq->req_send.req_base.req_ompi.req_complete == false) opal_condition_wait(&ompi_request_cond, &ompi_request_lock); ompi_request_waiting--; - MCA_PML_OB1_SEND_REQUEST_FREE( sendreq ); } } - return OMPI_SUCCESS; + rc = sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR; + ompi_request_free( (ompi_request_t**)&sendreq ); + return rc; } diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index 66edad4503..e61d5b8bba 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -68,7 +68,7 @@ void mca_pml_ob1_recv_frag_callback( if(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) { return; } - + /* hdr_type and hdr_flags are uint8_t, so no endian problems */ switch(hdr->hdr_common.hdr_type) { case MCA_PML_OB1_HDR_TYPE_MATCH: diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.h b/ompi/mca/pml/ob1/pml_ob1_recvfrag.h index 321cb546a4..2a1ebb4686 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.h +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.h @@ -58,7 +58,7 @@ do { \ } while(0) -#define MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr,segs,cnt,btl) \ +#define MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segs, cnt, btl ) \ do { \ size_t i; \ mca_btl_base_segment_t* macro_segments = frag->segments; \ diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c index ef6014bbc9..bdbd4c6997 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c @@ -29,32 +29,25 @@ #include "pml_ob1_rdmafrag.h" #include "ompi/mca/bml/base/base.h" #include "orte/mca/errmgr/errmgr.h" -#include "ompi/datatype/dt_arch.h" +#include "ompi/datatype/dt_arch.h" static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc( mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc); -static int mca_pml_ob1_recv_request_fini(struct ompi_request_t** request) +static inline int mca_pml_ob1_recv_request_free(struct ompi_request_t** request) { mca_pml_ob1_recv_request_t* recvreq = *(mca_pml_ob1_recv_request_t**)request; - if(recvreq->req_recv.req_base.req_persistent) { - if(recvreq->req_recv.req_base.req_free_called) { - MCA_PML_OB1_RECV_REQUEST_FREE(recvreq); - *request = MPI_REQUEST_NULL; - } else { - recvreq->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE; - } - } else { - MCA_PML_OB1_RECV_REQUEST_FREE(recvreq); - *request = MPI_REQUEST_NULL; - } - return OMPI_SUCCESS; -} -static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request) -{ - 
MCA_PML_OB1_RECV_REQUEST_FREE( *(mca_pml_ob1_recv_request_t**)request ); + assert( false == recvreq->req_recv.req_base.req_free_called ); + + OPAL_THREAD_LOCK(&ompi_request_lock); + recvreq->req_recv.req_base.req_free_called = true; + if( true == recvreq->req_recv.req_base.req_pml_complete ) { + MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq ); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + *request = MPI_REQUEST_NULL; return OMPI_SUCCESS; } @@ -94,9 +87,9 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request, static void mca_pml_ob1_recv_request_construct(mca_pml_ob1_recv_request_t* request) { request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV; - request->req_recv.req_base.req_ompi.req_fini = mca_pml_ob1_recv_request_fini; request->req_recv.req_base.req_ompi.req_free = mca_pml_ob1_recv_request_free; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_ob1_recv_request_cancel; + request->req_rdma_cnt = 0; } static void mca_pml_ob1_recv_request_destruct(mca_pml_ob1_recv_request_t* request) @@ -147,14 +140,7 @@ static void mca_pml_ob1_put_completion( /* check completion status */ if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { - /* initialize request status */ - recvreq->req_recv.req_base.req_pml_complete = true; - recvreq->req_recv.req_base.req_ompi.req_status._count = - (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? - recvreq->req_bytes_received : recvreq->req_bytes_delivered); - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); - OPAL_THREAD_UNLOCK(&ompi_request_lock); + MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); @@ -303,14 +289,13 @@ retry: /** * Return resources used by the RDMA */ - + static void mca_pml_ob1_fin_completion( mca_btl_base_module_t* btl, struct mca_btl_base_endpoint_t* ep, struct mca_btl_base_descriptor_t* des, int status) { - mca_pml_ob1_rdma_frag_t* frag = (mca_pml_ob1_rdma_frag_t*)des->des_cbdata; mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) des->des_context; MCA_PML_OB1_RDMA_FRAG_RETURN(frag); @@ -335,18 +320,6 @@ static void mca_pml_ob1_rget_completion( mca_btl_base_descriptor_t *fin; int rc; - /* is receive request complete */ - if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length) - == recvreq->req_recv.req_bytes_packed ) { - recvreq->req_recv.req_base.req_ompi.req_status._count = - (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? 
- recvreq->req_bytes_received : recvreq->req_bytes_delivered); - recvreq->req_recv.req_base.req_pml_complete = true; - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - } - /* return descriptor */ mca_bml_base_free(bml_btl, des); @@ -359,7 +332,7 @@ static void mca_pml_ob1_rget_completion( fin->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; fin->des_cbfunc = mca_pml_ob1_fin_completion; fin->des_cbdata = frag; - + /* fill in header */ hdr = (mca_pml_ob1_fin_hdr_t*)fin->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; @@ -378,13 +351,18 @@ static void mca_pml_ob1_rget_completion( MCA_PML_OB1_FIN_HDR_HTON(*hdr); } #endif -#endif +#endif + /* is receive request complete */ + if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length) + == recvreq->req_recv.req_bytes_packed ) { + MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); + } + /* queue request */ - rc = mca_bml_base_send( - bml_btl, - fin, - MCA_BTL_TAG_PML - ); + rc = mca_bml_base_send( bml_btl, + fin, + MCA_BTL_TAG_PML + ); if(OMPI_SUCCESS != rc) { opal_output(0, "[%s:%d] unable to queue fin", __FILE__,__LINE__); orte_errmgr.abort(); @@ -549,14 +527,7 @@ void mca_pml_ob1_recv_request_progress( /* check completion status */ if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { - /* initialize request status */ - recvreq->req_recv.req_base.req_ompi.req_status._count = - (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? - recvreq->req_bytes_received : recvreq->req_bytes_delivered); - recvreq->req_recv.req_base.req_pml_complete = true; - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); - OPAL_THREAD_UNLOCK(&ompi_request_lock); + MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); @@ -595,11 +566,9 @@ void mca_pml_ob1_recv_request_matched_probe( /* set completion status */ recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag; recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src; - recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed; - OPAL_THREAD_LOCK(&ompi_request_lock); - recvreq->req_recv.req_base.req_pml_complete = true; - MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); - OPAL_THREAD_UNLOCK(&ompi_request_lock); + recvreq->req_bytes_received = bytes_packed; + recvreq->req_bytes_delivered = bytes_packed; + MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); } diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.h b/ompi/mca/pml/ob1/pml_ob1_recvreq.h index 78fabc57da..c7b8ab28cf 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.h @@ -81,26 +81,23 @@ do { \ * @param comm (IN) Communicator. * @param persistent (IN) Is this a ersistent request. 
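
Receive-side completion in the paths above is detected by byte counting: each fragment or RDMA callback atomically adds what it delivered and compares against the expected total, so exactly one callback crosses the threshold and triggers PML completion. The same idea, modeled with C11 atomics in place of OPAL_THREAD_ADD_SIZE_T; recv_pml_complete() is a stand-in for the PML_COMPLETE macro:

    #include <stdatomic.h>
    #include <stddef.h>

    struct recv_request {
        atomic_size_t bytes_received;   /* accumulated by every callback  */
        size_t        bytes_packed;     /* total expected for the message */
    };

    void recv_pml_complete(struct recv_request *req);   /* stand-in */

    static void fragment_delivered(struct recv_request *req, size_t n)
    {
        /* fetch_add returns the previous value, so add n back to get the
         * new total; only one caller can cross the threshold */
        if (atomic_fetch_add(&req->bytes_received, n) + n >= req->bytes_packed)
            recv_pml_complete(req);
    }
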
*/ -#define MCA_PML_OB1_RECV_REQUEST_INIT( \ - request, \ - addr, \ - count, \ - datatype, \ - src, \ - tag, \ - comm, \ - persistent) \ -do { \ - (request)->req_rdma_cnt = 0; \ - MCA_PML_BASE_RECV_REQUEST_INIT( \ - &(request)->req_recv, \ - addr, \ - count, \ - datatype, \ - src, \ - tag, \ - comm, \ - persistent); \ +#define MCA_PML_OB1_RECV_REQUEST_INIT( request, \ + addr, \ + count, \ + datatype, \ + src, \ + tag, \ + comm, \ + persistent) \ +do { \ + MCA_PML_BASE_RECV_REQUEST_INIT( &(request)->req_recv, \ + addr, \ + count, \ + datatype, \ + src, \ + tag, \ + comm, \ + persistent); \ } while(0) /** @@ -108,29 +105,43 @@ do { \ * * @param request (IN) Receive request. */ -#define MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq) \ -do { \ - size_t r; \ - for(r=0; rreq_rdma_cnt; r++) { \ - mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[r].btl_reg; \ - if(NULL != btl_reg) { \ - btl_reg->mpool->mpool_release(btl_reg->mpool, btl_reg); \ - } \ - } \ - MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \ - OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (opal_list_item_t*)(recvreq)); \ +#define MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE(recvreq) \ +do { \ + size_t r; \ + \ + assert( false == recvreq->req_recv.req_base.req_pml_complete ); \ + \ + for( r = 0; r < recvreq->req_rdma_cnt; r++ ) { \ + mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[r].btl_reg; \ + if( NULL != btl_reg ) { \ + btl_reg->mpool->mpool_release( btl_reg->mpool, btl_reg ); \ + } \ + } \ + recvreq->req_rdma_cnt = 0; \ + \ + OPAL_THREAD_LOCK(&ompi_request_lock); \ + \ + if( true == recvreq->req_recv.req_base.req_free_called ) { \ + MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq ); \ + } else { \ + /* initialize request status */ \ + recvreq->req_recv.req_base.req_pml_complete = true; \ + recvreq->req_recv.req_base.req_ompi.req_status._count = \ + (recvreq->req_bytes_received < recvreq->req_bytes_delivered ? \ + recvreq->req_bytes_received : recvreq->req_bytes_delivered); \ + MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \ + } \ + OPAL_THREAD_UNLOCK(&ompi_request_lock); \ } while(0) /* * Free the PML receive request */ -#define MCA_PML_OB1_RECV_REQUEST_FREE(recvreq) \ +#define MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq) \ { \ - mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)(recvreq); \ - pml_request->req_free_called = true; \ - if( pml_request->req_pml_complete == true) { \ - MCA_PML_OB1_RECV_REQUEST_RETURN((recvreq)); \ - } \ + MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \ + OMPI_FREE_LIST_RETURN( &mca_pml_ob1.recv_requests, \ + (opal_list_item_t*)(recvreq)); \ } /** @@ -140,7 +151,7 @@ do { * @param request (IN) Request to match. */ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request); - + /** * Attempt to match the request against the unexpected fragment list * for a specific source rank. @@ -166,7 +177,6 @@ do { (request)->req_bytes_delivered = 0; \ (request)->req_lock = 0; \ (request)->req_pipeline_depth = 0; \ - (request)->req_rdma_cnt = 0; \ (request)->req_rdma_idx = 0; \ (request)->req_recv.req_base.req_pml_complete = false; \ (request)->req_recv.req_base.req_ompi.req_complete = false; \ diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c index d4e9c401a2..c83a8cf96f 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c @@ -2,7 +2,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. 
 All rights reserved.
- * Copyright (c) 2004-2005 The University of Tennessee
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
  * of Tennessee Research Foundation. All rights
  * reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -35,41 +35,25 @@
 #include "ompi/mca/bml/base/base.h"
 #include "ompi/datatype/dt_arch.h"
-
-static int mca_pml_ob1_send_request_fini(struct ompi_request_t** request)
+/*
+ * The free call marks the final stage in a request life-cycle. Starting from this
+ * point the request is completed at both PML and user level, and can be used
+ * for other p2p communications. Therefore, in the case of the OB1 PML it should
+ * be added to the free request list.
+ */
+static inline int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
 {
-    mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)(request);
+    mca_pml_ob1_send_request_t* sendreq = *(mca_pml_ob1_send_request_t**)request;
+
+    assert( false == sendreq->req_send.req_base.req_free_called );
+
     OPAL_THREAD_LOCK(&ompi_request_lock);
-    if(sendreq->req_send.req_base.req_persistent) {
-        if(sendreq->req_send.req_base.req_free_called) {
-            MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
-            *request = MPI_REQUEST_NULL;
-        } else {
-            sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_INACTIVE;
-            /* rewind convertor */
-            if(sendreq->req_send.req_bytes_packed) {
-                size_t offset = 0;
-                ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
-            }
-            /* if buffered send - release any resources */
-            if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED &&
-                sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) {
-                mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
-            }
-        }
-    } else {
-        MCA_PML_OB1_SEND_REQUEST_FREE(sendreq);
-        *request = MPI_REQUEST_NULL;
+    sendreq->req_send.req_base.req_free_called = true;
+    if( true == sendreq->req_send.req_base.req_pml_complete ) {
+        MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq );
     }
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
-    return OMPI_SUCCESS;
-}
 
-static int mca_pml_ob1_send_request_free(struct ompi_request_t** request)
-{
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    MCA_PML_OB1_SEND_REQUEST_FREE( *(mca_pml_ob1_send_request_t**)request );
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
     *request = MPI_REQUEST_NULL;
     return OMPI_SUCCESS;
 }
@@ -83,9 +67,9 @@ static int mca_pml_ob1_send_request_cancel(struct ompi_request_t* request, int c
 
 static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req)
 {
     req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
-    req->req_send.req_base.req_ompi.req_fini = mca_pml_ob1_send_request_fini;
     req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free;
     req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel;
+    req->req_rdma_cnt = 0;
 }
 
 static void mca_pml_ob1_send_request_destruct(mca_pml_ob1_send_request_t* req)
@@ -100,7 +84,8 @@ OBJ_CLASS_INSTANCE(
     mca_pml_ob1_send_request_destruct);
 
 /**
- * Completion of a short message - nothing left to schedule.
+ * Completion of a short message - nothing left to schedule. Note that this
+ * function is only called for 0 sized messages.
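
The completion callbacks in this file all share one shape: recover the request from the descriptor, return the descriptor to the BTL, and signal PML completion, which now takes ompi_request_lock internally so the callback does not. A reduced sketch with stand-in types; none of these are the real OMPI structs:

    #include <stddef.h>

    struct btl; struct btl_endpoint;
    struct btl_descriptor { void *des_cbdata; /* points back at the request */ };
    struct send_request;

    void descriptor_return(struct btl_descriptor *des);   /* hypothetical */
    void send_pml_complete(struct send_request *req);     /* hypothetical */

    static void match_completion(struct btl *btl, struct btl_endpoint *ep,
                                 struct btl_descriptor *des, int status)
    {
        struct send_request *sendreq = des->des_cbdata;
        (void)btl; (void)ep; (void)status;   /* error path elided */

        descriptor_return(des);       /* descriptor is dead after this   */
        send_pml_complete(sendreq);   /* may recycle the request itself  */
    }
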
*/ void mca_pml_ob1_match_completion_cache( @@ -123,9 +108,7 @@ void mca_pml_ob1_match_completion_cache( MCA_BML_BASE_BTL_DES_RETURN( bml_btl, descriptor ); /* signal request completion */ - OPAL_THREAD_LOCK(&ompi_request_lock); MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); } /** @@ -152,9 +135,7 @@ void mca_pml_ob1_match_completion_free( mca_bml_base_free( bml_btl, descriptor ); /* signal request completion */ - OPAL_THREAD_LOCK(&ompi_request_lock); MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); } /* @@ -215,9 +196,7 @@ static void mca_pml_ob1_rget_completion( 0, req_bytes_delivered ); if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered ) == sendreq->req_send.req_bytes_packed ) { - OPAL_THREAD_LOCK(&ompi_request_lock); MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); } /* release resources */ @@ -273,9 +252,7 @@ static void mca_pml_ob1_frag_completion( req_bytes_delivered == sendreq->req_send.req_bytes_packed) { /*if( OPAL_THREAD_ADD_SIZE_T( &sendreq->req_bytes_delivered, req_bytes_delivered ) == sendreq->req_send.req_bytes_packed) {*/ - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); + MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); } else { mca_pml_ob1_send_request_schedule(sendreq); } @@ -389,7 +366,10 @@ int mca_pml_ob1_send_request_start_buffered( sendreq->req_send.req_datatype, sendreq->req_send.req_count, sendreq->req_send.req_addr); - +#if 0 + ompi_convertor_set_position( &sendreq->req_send.req_convertor, + &sendreq->req_send_offset ); +#endif /* request is complete at mpi level */ OPAL_THREAD_LOCK(&ompi_request_lock); MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq); @@ -406,7 +386,8 @@ int mca_pml_ob1_send_request_start_buffered( /** * BTL requires "specially" allocated memory. Request a segment that - * is used for initial hdr and any eager data. + * is used for initial hdr and any eager data. This is used only from + * the _START macro. */ int mca_pml_ob1_send_request_start_copy( @@ -495,7 +476,7 @@ int mca_pml_ob1_send_request_start_copy( /** * BTL can send directly from user buffer so allow the BTL - * to prepare the segment list. + * to prepare the segment list. Start sending a small message. */ int mca_pml_ob1_send_request_start_prepare( @@ -510,7 +491,7 @@ int mca_pml_ob1_send_request_start_prepare( /* prepare descriptor */ mca_bml_base_prepare_src( - bml_btl, + bml_btl, NULL, &sendreq->req_send.req_convertor, sizeof(mca_pml_ob1_match_hdr_t), @@ -518,7 +499,7 @@ int mca_pml_ob1_send_request_start_prepare( &descriptor); if(NULL == descriptor) { return OMPI_ERR_OUT_OF_RESOURCE; - } + } segment = descriptor->des_src; /* build match header */ @@ -546,7 +527,7 @@ int mca_pml_ob1_send_request_start_prepare( /* short message */ descriptor->des_cbfunc = mca_pml_ob1_match_completion_free; - + /* update lengths */ sendreq->req_send_offset = size; sendreq->req_rdma_offset = size; @@ -816,13 +797,12 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq) if(num_btl_avail == 1 || bytes_remaining < bml_btl->btl_min_send_size) { size = bytes_remaining; - - /* otherwise attempt to give the BTL a percentage of the message - * based on a weighting factor. 
for simplicity calculate this as - * a percentage of the overall message length (regardless of amount - * previously assigned) - */ } else { + /* otherwise attempt to give the BTL a percentage of the message + * based on a weighting factor. for simplicity calculate this as + * a percentage of the overall message length (regardless of amount + * previously assigned) + */ size = (size_t)(bml_btl->btl_weight * bytes_remaining); } @@ -830,7 +810,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq) if (bml_btl->btl_max_send_size != 0 && size > bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t)) { size = bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t); - +#if defined(GEORGE_HAVE_TO_MAKE_SURE_THAT_WE_DONT_NEED_IT) /* very expensive - however for buffered sends we need to send on a * boundary that the receiver will be able to unpack completely * using the native datatype @@ -854,13 +834,13 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq) OBJ_DESTRUCT( &convertor ); size = position - sendreq->req_send_offset; } +#endif } - /* pack into a descriptor */ ompi_convertor_set_position(&sendreq->req_send.req_convertor, &sendreq->req_send_offset); - + mca_bml_base_prepare_src( bml_btl, NULL, @@ -971,14 +951,6 @@ static void mca_pml_ob1_put_completion( orte_errmgr.abort(); } - /* check for request completion */ - if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length) - >= sendreq->req_send.req_bytes_packed) { - OPAL_THREAD_LOCK(&ompi_request_lock); - MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); - OPAL_THREAD_UNLOCK(&ompi_request_lock); - } - /* allocate descriptor for fin control message - note that * the rdma descriptor cannot be reused as it points directly * at the user buffer @@ -1033,6 +1005,13 @@ static void mca_pml_ob1_put_completion( ORTE_ERROR_LOG(rc); orte_errmgr.abort(); } + goto cleanup; + } + + /* check for request completion */ + if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length) + >= sendreq->req_send.req_bytes_packed) { + MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); } cleanup: diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.h b/ompi/mca/pml/ob1/pml_ob1_sendreq.h index 0f813a1548..35841d4ed2 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.h @@ -113,13 +113,13 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); #define MCA_PML_OB1_SEND_REQUEST_START_MATCH_FIXUP(sendreq, hdr) \ hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO; #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT -#define MCA_PML_OB1_SEND_REQUEST_START_MATCH_FIXUP(sendreq, hdr) \ - do { \ - if (sendreq->req_send.req_base.req_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) { \ - hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO; \ - MCA_PML_OB1_MATCH_HDR_HTON(hdr->hdr_match); \ - } \ - } while (0) +#define MCA_PML_OB1_SEND_REQUEST_START_MATCH_FIXUP(sendreq, hdr) \ +do { \ + if (sendreq->req_send.req_base.req_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) { \ + hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO; \ + MCA_PML_OB1_MATCH_HDR_HTON(hdr->hdr_match); \ + } \ +} while (0) #else #define MCA_PML_OB1_SEND_REQUEST_START_MATCH_FIXUP(sendreq, hdr) #endif @@ -128,8 +128,8 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); #define MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc) \ do { \ mca_pml_ob1_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ - mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*) \ - 
sendreq->req_send.req_base.req_proc->proc_pml; \ + mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*) \ + sendreq->req_send.req_base.req_proc->proc_pml; \ mca_bml_base_btl_t* bml_btl; \ size_t size = sendreq->req_send.req_bytes_packed; \ size_t eager_limit; \ @@ -142,7 +142,6 @@ do { sendreq->req_lock = 0; \ sendreq->req_pipeline_depth = 0; \ sendreq->req_bytes_delivered = 0; \ - sendreq->req_rdma_cnt = 0; \ sendreq->req_state = 0; \ sendreq->req_send_offset = 0; \ sendreq->req_send.req_base.req_pml_complete = false; \ @@ -242,55 +241,63 @@ do { * Mark a send request as completed at the MPI level. */ -#define MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq) \ -do { \ - (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \ - (sendreq)->req_send.req_base.req_comm->c_my_rank; \ - (sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \ - (sendreq)->req_send.req_base.req_tag; \ - (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \ - (sendreq)->req_send.req_base.req_ompi.req_status._count = \ - (sendreq)->req_send.req_bytes_packed; \ - MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \ +#define MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq) \ +do { \ + (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \ + (sendreq)->req_send.req_base.req_comm->c_my_rank; \ + (sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \ + (sendreq)->req_send.req_base.req_tag; \ + (sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \ + (sendreq)->req_send.req_base.req_ompi.req_status._count = \ + (sendreq)->req_send.req_bytes_packed; \ + MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \ } while(0) /* * The PML has completed a send request. Note that this request * may have been orphaned by the user or have already completed * at the MPI level. + * This macro will never be called directly from the upper level, as it should + * only be an internal call to the PML. 
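
Restated as a plain function, the macro below performs the following ordered steps; all helper names here are hypothetical stand-ins for the corresponding macros, stubbed out so the sketch compiles:

    #include <stdbool.h>

    struct ob1_send { bool mpi_complete, pml_complete, free_called, persistent; };

    static void release_rdma_regs(struct ob1_send *r)    { (void)r; }
    static void release_bsend_buffer(struct ob1_send *r) { (void)r; }
    static void do_mpi_complete(struct ob1_send *r)      { r->mpi_complete = true; }
    static void rewind_convertor(struct ob1_send *r)     { (void)r; }
    static void freelist_return(struct ob1_send *r)      { (void)r; }
    static void lock(void)   { }
    static void unlock(void) { }

    static void send_request_pml_complete(struct ob1_send *req)
    {
        release_rdma_regs(req);        /* 1. hand mpool registrations back     */
        release_bsend_buffer(req);     /* 2. buffered sends drop their copy    */

        lock();
        if (!req->mpi_complete)        /* 3. long/sync sends complete MPI here */
            do_mpi_complete(req);
        req->pml_complete = true;      /* 4. PML is done with this request     */

        if (req->free_called)          /* 5a. user already released: recycle   */
            freelist_return(req);
        else if (req->persistent)      /* 5b. keep it ready for MPI_Start      */
            rewind_convertor(req);
        unlock();
    }
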
*/ -#define MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq) \ -do { \ - size_t r; \ - /* request completed at pml level */ \ - assert( false == (sendreq)->req_send.req_base.req_pml_complete ); \ - (sendreq)->req_send.req_base.req_pml_complete = true; \ - \ - /* return mpool resources */ \ - for(r=0; rreq_rdma_cnt; r++) { \ - mca_mpool_base_registration_t* reg = sendreq->req_rdma[r].btl_reg; \ - if( NULL != reg ) { \ - reg->mpool->mpool_release(reg->mpool, reg); \ - } \ - } \ - sendreq->req_rdma_cnt = 0; \ - \ - /* user has already released the request so simply free it */ \ - if((sendreq)->req_send.req_base.req_free_called) { \ - /* if buffered send - release any resources */ \ - if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ - (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ - mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ - } \ - MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq); \ - /* is request complete at mpi level */ \ - } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \ - MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq); \ - /* buffered send - release any resources */ \ - } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ - (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ - mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ - } \ + +#define MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq) \ + do { \ + size_t r; \ + \ + assert( false == sendreq->req_send.req_base.req_pml_complete ); \ + \ + /* return mpool resources */ \ + for( r = 0; r < sendreq->req_rdma_cnt; r++ ) { \ + mca_mpool_base_registration_t* reg = sendreq->req_rdma[r].btl_reg; \ + if( NULL != reg ) { \ + reg->mpool->mpool_release(reg->mpool, reg); \ + } \ + } \ + sendreq->req_rdma_cnt = 0; \ + \ + if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ + sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \ + mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ + } \ + \ + OPAL_THREAD_LOCK(&ompi_request_lock); \ + if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \ + /* Should only be called for long messages (maybe synchronous) */ \ + MCA_PML_OB1_SEND_REQUEST_MPI_COMPLETE(sendreq); \ + } \ + sendreq->req_send.req_base.req_pml_complete = true; \ + \ + if( sendreq->req_send.req_base.req_free_called ) { \ + MCA_PML_OB1_SEND_REQUEST_RETURN( sendreq ); \ + } else { \ + if(sendreq->req_send.req_base.req_ompi.req_persistent) { \ + /* rewind convertor */ \ + size_t offset = 0; \ + ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); \ + } \ + } \ + OPAL_THREAD_UNLOCK(&ompi_request_lock); \ } while (0) @@ -300,19 +307,17 @@ do { * However, these events may occur in either order. 
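
The "either order" rendezvous is an atomic counter: the local send completion and the remote acknowledgment each add one, and only the arrival that brings the count to two advances the request. Modeled with C11 atomics in place of OPAL_THREAD_ADD32; complete_or_schedule() is a hypothetical stand-in:

    #include <stdatomic.h>

    struct rndv_send { atomic_int req_state; /* events seen: 0, 1 or 2 */ };

    void complete_or_schedule(struct rndv_send *req);   /* hypothetical */

    /* called once when the local send completes and once when the ACK
     * arrives; only the second caller (old value 1) advances the
     * request, no matter which event happens first */
    static void send_request_advance(struct rndv_send *req)
    {
        if (atomic_fetch_add(&req->req_state, 1) == 1)
            complete_or_schedule(req);
    }
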
*/ -#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq) \ -do { \ - /* has an acknowledgment been received */ \ - if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ - if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \ - OPAL_THREAD_LOCK(&ompi_request_lock); \ - MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); \ - OPAL_THREAD_UNLOCK(&ompi_request_lock); \ - } else { \ - /* additional data to schedule */ \ - mca_pml_ob1_send_request_schedule(sendreq); \ - } \ - } \ +#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq) \ +do { \ + /* has an acknowledgment been received */ \ + if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ + if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \ + MCA_PML_OB1_SEND_REQUEST_PML_COMPLETE(sendreq); \ + } else { \ + /* additional data to schedule */ \ + mca_pml_ob1_send_request_schedule(sendreq); \ + } \ + } \ } while (0) /* @@ -327,22 +332,6 @@ do { &mca_pml_ob1.send_requests, (opal_list_item_t*)sendreq); \ } -/* - * Free a send request - */ -#define MCA_PML_OB1_SEND_REQUEST_FREE(sendreq) \ -{ \ - mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)(sendreq); \ - pml_request->req_free_called = true; \ - if( pml_request->req_pml_complete == true) { \ - if((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ - (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ - mca_pml_base_bsend_request_fini((ompi_request_t*)(sendreq)); \ - } \ - MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq); \ - } \ -} - /* * Attempt to process any pending requests */ diff --git a/ompi/request/grequest.c b/ompi/request/grequest.c index 9d969b5e02..252099610d 100644 --- a/ompi/request/grequest.c +++ b/ompi/request/grequest.c @@ -44,14 +44,12 @@ static int ompi_grequest_cancel(ompi_request_t* req, int flag) static void ompi_grequest_construct(ompi_grequest_t* greq) { - OMPI_REQUEST_INIT(&greq->greq_base); - greq->greq_base.req_fini = ompi_grequest_free; + OMPI_REQUEST_INIT(&greq->greq_base, false); greq->greq_base.req_free = ompi_grequest_free; greq->greq_base.req_cancel = ompi_grequest_cancel; greq->greq_base.req_type = OMPI_REQUEST_GEN; } - - + static void ompi_grequest_destruct(ompi_grequest_t* greq) { OMPI_REQUEST_FINI(&greq->greq_base); diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index ea810c6671..ddddcb6516 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -5,14 +5,14 @@ * Copyright (c) 2004-2005 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. 
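
With req_persistent promoted onto ompi_request_t, OMPI_REQUEST_INIT now records the flag once at initialization, as the grequest change above shows, and every later test/wait decision reads it from the request itself. A plausible minimal model of the two-argument form; which other fields the real macro touches is an assumption here:

    #include <stdbool.h>

    enum req_state { REQ_INVALID, REQ_INACTIVE, REQ_ACTIVE };

    struct request {
        volatile bool  complete;
        bool           persistent;   /* lives on the base request now */
        enum req_state state;
    };

    /* assumed shape of OMPI_REQUEST_INIT(req, persistent) */
    static void request_init(struct request *req, bool persistent)
    {
        req->complete   = false;
        req->state      = REQ_INACTIVE;   /* activated when started */
        req->persistent = persistent;
    }
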
* $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ @@ -37,18 +37,21 @@ int ompi_request_test_any( rptr = requests; for (i = 0; i < count; i++, rptr++) { request = *rptr; - if (request == MPI_REQUEST_NULL || - request->req_state == OMPI_REQUEST_INACTIVE) { + if( request->req_state == OMPI_REQUEST_INACTIVE ) { num_requests_null_inactive++; continue; } - if (request->req_complete) { + if( request->req_complete ) { *index = i; *completed = true; if (MPI_STATUS_IGNORE != status) { *status = request->req_status; } - return request->req_fini(rptr); + if( request->req_persistent ) { + request->req_state = OMPI_REQUEST_INACTIVE; + return OMPI_SUCCESS; + } + return ompi_request_free(rptr); } } @@ -84,8 +87,7 @@ int ompi_request_test_all( rptr = requests; for (i = 0; i < count; i++, rptr++) { request = *rptr; - if (request == MPI_REQUEST_NULL || - request->req_state == OMPI_REQUEST_INACTIVE || + if( request->req_state == OMPI_REQUEST_INACTIVE || request->req_complete) { num_completed++; } @@ -99,34 +101,41 @@ int ompi_request_test_all( return OMPI_SUCCESS; } + rptr = requests; *completed = true; if (MPI_STATUSES_IGNORE != statuses) { /* fill out completion status and free request if required */ - rptr = requests; - for (i = 0; i < count; i++) { + for( i = 0; i < count; i++, rptr++ ) { int rc; request = *rptr; - if(request == MPI_REQUEST_NULL || - request->req_state == OMPI_REQUEST_INACTIVE) { + if( request->req_state == OMPI_REQUEST_INACTIVE ) { statuses[i] = ompi_status_empty; - } else { - statuses[i] = request->req_status; + continue; } - rc = request->req_fini(rptr); + statuses[i] = request->req_status; + if( request->req_persistent ) { + request->req_state = OMPI_REQUEST_INACTIVE; + continue; + } + rc = ompi_request_free(rptr); if(rc != OMPI_SUCCESS) - return rc; - rptr++; + return rc; } } else { /* free request if required */ - rptr = requests; - for (i = 0; i < count; i++) { + for( i = 0; i < count; i++, rptr++ ) { int rc; request = *rptr; - rc = request->req_fini(rptr); + if( request->req_state == OMPI_REQUEST_INACTIVE) { + continue; + } + if( request->req_persistent ) { + request->req_state = OMPI_REQUEST_INACTIVE; + continue; + } + rc = ompi_request_free(rptr); if(rc != OMPI_SUCCESS) return rc; - rptr++; } } return OMPI_SUCCESS; diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 917dcd0142..9815bb04f8 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -2,7 +2,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2005 The University of Tennessee and The University + * Copyright (c) 2004-2006 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. 
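
Every test and wait variant in these two files now ends a completed request the same way: an inactive request contributes an empty status, a persistent one is parked INACTIVE for the next MPI_Start, and anything else goes through ompi_request_free. That shared epilogue, sketched with request_free() as a stand-in:

    #include <stdbool.h>

    enum req_state { REQ_INVALID, REQ_INACTIVE, REQ_ACTIVE };
    struct request { bool persistent; enum req_state state; };

    int request_free(struct request **rptr);   /* stand-in, nulls *rptr */

    static int finish_completed(struct request **rptr)
    {
        struct request *req = *rptr;
        if (req->state == REQ_INACTIVE)
            return 0;                    /* reports an empty status      */
        if (req->persistent) {
            req->state = REQ_INACTIVE;   /* reusable via MPI_Start       */
            return 0;
        }
        return request_free(rptr);       /* releases and nulls *rptr     */
    }
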
diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c
index 917dcd0142..9815bb04f8 100644
--- a/ompi/request/req_wait.c
+++ b/ompi/request/req_wait.c
@@ -2,7 +2,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation.  All rights reserved.
- * Copyright (c) 2004-2005 The University of Tennessee and The University
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
  *                         of Tennessee Research Foundation.  All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -26,8 +26,9 @@ int ompi_request_wait(
     ompi_status_public_t * status)
 {
     ompi_request_t *req = *req_ptr;
+
     if(req->req_complete == false) {
-        
+
 #if OMPI_ENABLE_PROGRESS_THREADS
         /* poll for completion */
         if(opal_progress_spin(&req->req_complete))
@@ -43,18 +44,25 @@ int ompi_request_wait(
         ompi_request_waiting--;
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
-    
+
 #if OMPI_ENABLE_PROGRESS_THREADS
 finished:
 #endif
-    
+
     /* return status */
-    if (MPI_STATUS_IGNORE != status) {
+    if( MPI_STATUS_IGNORE != status ) {
         *status = req->req_status;
     }
-    
+    if( req->req_state == OMPI_REQUEST_INACTIVE ) {
+        return OMPI_SUCCESS;
+    }
+    if( req->req_persistent ) {
+        req->req_state = OMPI_REQUEST_INACTIVE;
+        return OMPI_SUCCESS;
+    }
+
     /* return request to pool */
-    return req->req_fini(req_ptr);
+    return ompi_request_free(req_ptr);
 }
 
@@ -144,13 +152,17 @@ finished:
             *status = ompi_status_empty;
         }
     } else {
-
+        assert( true == request->req_complete );
         /* return status */
         if (MPI_STATUS_IGNORE != status) {
             *status = request->req_status;
         }
-        /* return request to pool */
-        rc = request->req_fini(rptr);
+        if( request->req_persistent ) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+        } else {
+            /* return request to pool */
+            rc = ompi_request_free(rptr);
+        }
         *index = completed;
     }
     return rc;
@@ -165,6 +177,7 @@ int ompi_request_wait_all(
     size_t completed = 0, i;
     ompi_request_t **rptr;
     ompi_request_t *request;
+    int mpi_error = OMPI_SUCCESS;
 
     rptr = requests;
     for (i = 0; i < count; i++) {
@@ -205,7 +218,7 @@
             size_t start = ompi_request_completed;
             size_t pending = count - completed;
             /*
-             * wait until at least pending requests complete 
+             * wait until at least pending requests complete
              */
             while (pending > ompi_request_completed - start) {
                 opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
@@ -225,38 +238,48 @@
         OPAL_THREAD_UNLOCK(&ompi_request_lock);
     }
 
+    rptr = requests;
     if (MPI_STATUSES_IGNORE != statuses) {
         /* fill out status and free request if required */
-        rptr = requests;
-        for (i = 0; i < count; i++) {
-            int rc;
+        for( i = 0; i < count; i++, rptr++ ) {
             request = *rptr;
-            if(request == MPI_REQUEST_NULL ||
-               request->req_state == OMPI_REQUEST_INACTIVE) {
+            assert( true == request->req_complete );
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
                 statuses[i] = ompi_status_empty;
             } else {
                 statuses[i] = request->req_status;
-                rc = request->req_fini(rptr);
-                if (rc != OMPI_SUCCESS)
-                    return rc;
             }
-            rptr++;
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+            } else {
+                (void)ompi_request_free(rptr);
+            }
+            if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
+                mpi_error = OMPI_ERR_IN_ERRNO;
+            }
         }
     } else {
         /* free request if required */
-        rptr = requests;
-        for (i = 0; i < count; i++, rptr++) {
+        for( i = 0; i < count; i++, rptr++ ) {
             int rc;
             request = *rptr;
-            if(request == MPI_REQUEST_NULL ||
-               request->req_state == OMPI_REQUEST_INACTIVE) {
-                continue;
+
+            assert( true == request->req_complete );
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
+                rc = ompi_status_empty.MPI_ERROR;
+            } else {
+                rc = request->req_status.MPI_ERROR;
             }
-            rc = request->req_fini(rptr);
-            if (rc != OMPI_SUCCESS)
-                return rc;
-        }
+            if( request->req_persistent ) {
+                request->req_state = OMPI_REQUEST_INACTIVE;
+            } else {
+                (void)ompi_request_free(rptr);
+            }
+            if( rc != OMPI_SUCCESS) {
+                mpi_error = rc;
+            }
+        }
     }
-    return OMPI_SUCCESS;
+    return mpi_error;
 }
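[Reviewer note, not part of the patch: the wait_all rewrite above changes the error contract. Instead of returning on the first failed req_fini and leaving later requests untouched, the loop now completes and releases every request and folds per-request errors into one return value. A sketch of that rule, with a hypothetical helper name (OMPI_ERR_IN_ERRNO is the aggregate code the hunk itself picks):

    static int wait_all_error_rule(size_t count,
                                   const ompi_status_public_t *statuses)
    {
        int mpi_error = OMPI_SUCCESS;
        size_t i;

        for (i = 0; i < count; i++) {
            /* any per-request error makes the whole call report failure,
             * but only after every request has been completed and freed */
            if (OMPI_SUCCESS != statuses[i].MPI_ERROR) {
                mpi_error = OMPI_ERR_IN_ERRNO;
            }
        }
        return mpi_error;
    }
]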
diff --git a/ompi/request/request.c b/ompi/request/request.c
index 0f5dd6fd83..702bc59368 100644
--- a/ompi/request/request.c
+++ b/ompi/request/request.c
@@ -34,8 +34,7 @@ ompi_status_public_t ompi_status_empty;
 
 static void ompi_request_construct(ompi_request_t* req)
 {
-    OMPI_REQUEST_INIT(req);
-    req->req_fini = NULL;
+    OMPI_REQUEST_INIT(req, false);
     req->req_free = NULL;
     req->req_cancel = NULL;
     req->req_f_to_c_index = MPI_UNDEFINED;
@@ -43,7 +42,8 @@ static void ompi_request_construct(ompi_request_t* req)
 
 static void ompi_request_destruct(ompi_request_t* req)
 {
-    OMPI_REQUEST_FINI(req);
+    assert( MPI_UNDEFINED == req->req_f_to_c_index );
+    assert( OMPI_REQUEST_INVALID == req->req_state );
 }
 
 static int ompi_request_null_free(ompi_request_t** request)
@@ -87,7 +87,6 @@ int ompi_request_init(void)
     ompi_request_null.req_state = OMPI_REQUEST_INACTIVE;
     ompi_request_null.req_complete = true;
     ompi_request_null.req_type = OMPI_REQUEST_NULL;
-    ompi_request_null.req_fini = ompi_request_null_free;
     ompi_request_null.req_free = ompi_request_null_free;
     ompi_request_null.req_cancel = ompi_request_null_cancel;
     ompi_request_null.req_f_to_c_index =
@@ -104,7 +103,7 @@ int ompi_request_init(void)
      * The main difference to ompi_request_null is
      * req_state being OMPI_REQUEST_ACTIVE, so that MPI_Waitall
      * does not set the status to ompi_status_empty and the different
-     * req_fini and req_free function, which resets the
+     * req_free function, which resets the
      * request to MPI_REQUEST_NULL.
      * The req_cancel function need not be changed.
      */
@@ -117,7 +116,6 @@ int ompi_request_init(void)
     ompi_request_empty.req_state = OMPI_REQUEST_ACTIVE;
     ompi_request_empty.req_complete = true;
     ompi_request_empty.req_type = OMPI_REQUEST_NULL;
-    ompi_request_empty.req_fini = ompi_request_empty_free;
     ompi_request_empty.req_free = ompi_request_empty_free;
     ompi_request_empty.req_cancel = ompi_request_null_cancel;
     ompi_request_empty.req_f_to_c_index =
@@ -139,10 +137,13 @@ int ompi_request_init(void)
 
 int ompi_request_finalize(void)
 {
-    OBJ_DESTRUCT(&ompi_request_null);
-    OBJ_DESTRUCT(&ompi_request_cond);
-    OBJ_DESTRUCT(&ompi_request_lock);
-    OBJ_DESTRUCT(&ompi_request_f_to_c_table);
+    OMPI_REQUEST_FINI( &ompi_request_null );
+    OBJ_DESTRUCT( &ompi_request_null );
+    OMPI_REQUEST_FINI( &ompi_request_empty );
+    OBJ_DESTRUCT( &ompi_request_empty );
+    OBJ_DESTRUCT( &ompi_request_cond );
+    OBJ_DESTRUCT( &ompi_request_lock );
+    OBJ_DESTRUCT( &ompi_request_f_to_c_table );
     return OMPI_SUCCESS;
 }
diff --git a/ompi/request/request.h b/ompi/request/request.h
index 8558f882d3..36f2bd064d 100644
--- a/ompi/request/request.h
+++ b/ompi/request/request.h
@@ -29,7 +29,7 @@
 #include "ompi/class/ompi_pointer_array.h"
 #include "ompi/errhandler/errhandler.h"
 #include "opal/threads/condition.h"
-    
+
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
 #endif
@@ -88,8 +88,8 @@ struct ompi_request_t {
     ompi_status_public_t req_status;          /**< Completion status */
     volatile bool req_complete;               /**< Flag indicating wether request has completed */
     volatile ompi_request_state_t req_state;  /**< enum indicate state of the request */
+    bool req_persistent;                      /**< flag indicating if this is a persistent request */
     int req_f_to_c_index;                     /**< Index in Fortran <-> C translation array */
-    ompi_request_free_fn_t req_fini;          /**< Called by test/wait */
     ompi_request_free_fn_t req_free;          /**< Called by free */
     ompi_request_cancel_fn_t req_cancel;      /**< Optional function to cancel the request */
 };
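[Reviewer note, not part of the patch: with req_fini gone from ompi_request_t, the struct hunk above plus the two macros in the request.h hunks that follow define the whole MPI-level lifecycle. A sketch of the intended pairing, illustrative only (real requests are OBJ_NEW'd by their PML/IO component, not stack-allocated):

    ompi_request_t req;

    OMPI_REQUEST_INIT(&req, true);   /* persistent request: state INACTIVE,
                                      * not complete, req_persistent set */
    /* MPI_Start()/MPI_Wait() cycles move the request between ACTIVE and
     * INACTIVE; completion never frees a persistent request */
    OMPI_REQUEST_FINI(&req);         /* MPI-layer teardown: state -> INVALID,
                                      * Fortran f2c index released */
]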
@@ -106,11 +106,11 @@ typedef struct ompi_request_t ompi_request_t;
  * performance path (since requests may be re-used, it is possible
  * that we will have to initialize a request multiple times).
  */
-#define OMPI_REQUEST_INIT(request)                         \
-  do {                                                     \
+#define OMPI_REQUEST_INIT(request, persistent)             \
+    do {                                                   \
         (request)->req_state = OMPI_REQUEST_INACTIVE;      \
-        (request)->req_complete = false;                   \
-        (request)->req_f_to_c_index = MPI_UNDEFINED;       \
+        (request)->req_complete = false;                   \
+        (request)->req_persistent = (persistent);          \
     } while (0);
 
 /**
@@ -122,17 +122,21 @@ typedef struct ompi_request_t ompi_request_t;
  * When finalizing a request, if MPI_Request_f2c() was previously
  * invoked on that request, then this request was added to the f2c
  * table, and we need to remove it
+ *
+ * This function should be called only from the MPI layer. It should
+ * never be called from the PML. It takes care of the upper-level clean-up.
+ * When the user calls MPI_Request_free we must release all MPI-level
+ * resources, so this function has to be called there as well.
  */
-
-#define OMPI_REQUEST_FINI(request)                                      \
-    do {                                                                \
-        (request)->req_state = OMPI_REQUEST_INVALID;                    \
-        if (MPI_UNDEFINED != (request)->req_f_to_c_index) {             \
-            ompi_pointer_array_set_item(&ompi_request_f_to_c_table,     \
-                                        (request)->req_f_to_c_index, NULL); \
-            (request)->req_f_to_c_index = MPI_UNDEFINED;                \
-        }                                                               \
-    } while (0);
+#define OMPI_REQUEST_FINI(request)                                      \
+do {                                                                    \
+    (request)->req_state = OMPI_REQUEST_INVALID;                        \
+    if (MPI_UNDEFINED != (request)->req_f_to_c_index) {                 \
+        ompi_pointer_array_set_item(&ompi_request_f_to_c_table,         \
+                                    (request)->req_f_to_c_index, NULL); \
+        (request)->req_f_to_c_index = MPI_UNDEFINED;                    \
+    }                                                                   \
+} while (0);
 
 /**
  * Globals used for tracking requests and request completion.
@@ -205,34 +209,35 @@ static inline int ompi_request_free(ompi_request_t** request)
  */
-static inline int ompi_request_test(
-    ompi_request_t ** rptr,
-    int *completed,
-    ompi_status_public_t * status)
+static inline int ompi_request_test( ompi_request_t ** rptr,
+                                     int *completed,
+                                     ompi_status_public_t * status )
 {
     ompi_request_t *request = *rptr;
 
     opal_atomic_mb();
-    if (request == MPI_REQUEST_NULL ||
-        request->req_state == OMPI_REQUEST_INACTIVE) {
+    if( request->req_state == OMPI_REQUEST_INACTIVE ) {
         *completed = true;
         if (MPI_STATUS_IGNORE != status) {
             *status = ompi_status_empty;
         }
         return OMPI_SUCCESS;
     }
-    else if (request->req_complete) {
+    if (request->req_complete) {
         *completed = true;
         if (MPI_STATUS_IGNORE != status) {
             *status = request->req_status;
         }
-        return request->req_fini(rptr);
-    } else {
-        *completed = false;
-#if OMPI_ENABLE_PROGRESS_THREADS == 0
-        opal_progress();
-#endif
-        return OMPI_SUCCESS;
+        if( request->req_persistent ) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+            return OMPI_SUCCESS;
+        }
+        return ompi_request_free(rptr);
     }
+    *completed = false;
+#if OMPI_ENABLE_PROGRESS_THREADS == 0
+    opal_progress();
+#endif
+    return OMPI_SUCCESS;
 }
 
 /**
@@ -249,13 +254,13 @@ static inline int ompi_request_test(
  * request handle at index set to NULL.
  */
-int ompi_request_test_any(
+OMPI_DECLSPEC int ompi_request_test_any(
     size_t count,
     ompi_request_t ** requests,
     int *index,
     int *completed,
     ompi_status_public_t * status);
-    
+
 /**
  * Non-blocking test for request completion.
 *