/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2010 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2007 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008 UT-Battelle, LLC. All rights reserved. * Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /** * @file */ #ifndef OMPI_PML_BFO_RECV_REQUEST_H #define OMPI_PML_BFO_RECV_REQUEST_H #include "pml_bfo.h" #include "pml_bfo_rdma.h" #include "pml_bfo_rdmafrag.h" #include "ompi/proc/proc.h" #include "ompi/mca/pml/bfo/pml_bfo_comm.h" #include "ompi/mca/mpool/base/base.h" #include "ompi/mca/pml/base/pml_base_recvreq.h" #if PML_BFO #define RECVREQ_RECVERRSENT 0x01 #define RECVREQ_RNDVRESTART_RECVED 0x02 #define RECVREQ_RNDVRESTART_ACKED 0x04 #endif /* PML_BFO */ BEGIN_C_DECLS struct mca_pml_bfo_recv_request_t { mca_pml_base_recv_request_t req_recv; ompi_ptr_t remote_req_send; #if PML_BFO int32_t req_msgseq; /* PML sequence number */ int32_t req_events; /* number of outstanding events on request */ int32_t req_restartseq; /* sequence number of restarted request */ int32_t req_errstate; /* state of request if in error */ #endif /* PML_BFO */ int32_t req_lock; size_t req_pipeline_depth; size_t req_bytes_received; /**< amount of data transferred into the user buffer */ size_t req_bytes_expected; /**< local size of the data as suggested by the user */ size_t req_rdma_offset; size_t req_send_offset; uint32_t req_rdma_cnt; uint32_t req_rdma_idx; bool req_pending; bool req_ack_sent; /**< whether ack was sent to the sender */ bool req_match_received; /**< Prevent request to be completed prematurely */ opal_mutex_t lock; mca_pml_bfo_com_btl_t req_rdma[1]; }; typedef struct mca_pml_bfo_recv_request_t mca_pml_bfo_recv_request_t; OBJ_CLASS_DECLARATION(mca_pml_bfo_recv_request_t); static inline bool lock_recv_request(mca_pml_bfo_recv_request_t *recvreq) { return OPAL_THREAD_ADD32(&recvreq->req_lock, 1) == 1; } static inline bool unlock_recv_request(mca_pml_bfo_recv_request_t *recvreq) { return OPAL_THREAD_ADD32(&recvreq->req_lock, -1) == 0; } /** * Allocate a recv request from the modules free list. * * @param rc (OUT) OMPI_SUCCESS or error status on failure. * @return Receive request. */ #define MCA_PML_BFO_RECV_REQUEST_ALLOC(recvreq, rc) \ do { \ ompi_free_list_item_t* item; \ rc = OMPI_SUCCESS; \ OMPI_FREE_LIST_GET(&mca_pml_base_recv_requests, item, rc); \ recvreq = (mca_pml_bfo_recv_request_t*)item; \ } while(0) /** * Initialize a receive request with call parameters. * * @param request (IN) Receive request. * @param addr (IN) User buffer. * @param count (IN) Number of elements of indicated datatype. * @param datatype (IN) User defined datatype. * @param src (IN) Source rank w/in the communicator. * @param tag (IN) User defined tag. * @param comm (IN) Communicator. * @param persistent (IN) Is this a ersistent request. */ #define MCA_PML_BFO_RECV_REQUEST_INIT( request, \ addr, \ count, \ datatype, \ src, \ tag, \ comm, \ persistent) \ do { \ MCA_PML_BASE_RECV_REQUEST_INIT( &(request)->req_recv, \ addr, \ count, \ datatype, \ src, \ tag, \ comm, \ persistent); \ } while(0) /** * Mark the request as completed at MPI level for internal purposes. * * @param recvreq (IN) Receive request. */ #define MCA_PML_BFO_RECV_REQUEST_MPI_COMPLETE( recvreq ) \ do { \ PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_COMPLETE, \ &(recvreq->req_recv.req_base), PERUSE_RECV ); \ ompi_request_complete( &(recvreq->req_recv.req_base.req_ompi), true ); \ } while (0) /* * Free the PML receive request */ #define MCA_PML_BFO_RECV_REQUEST_RETURN(recvreq) \ { \ MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \ OMPI_FREE_LIST_RETURN( &mca_pml_base_recv_requests, \ (ompi_free_list_item_t*)(recvreq)); \ } /** * Complete receive request. Request structure cannot be accessed after calling * this function any more. * * @param recvreq (IN) Receive request. */ static inline void recv_request_pml_complete(mca_pml_bfo_recv_request_t *recvreq) { size_t i; assert(false == recvreq->req_recv.req_base.req_pml_complete); if(recvreq->req_recv.req_bytes_packed > 0) { PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END, &recvreq->req_recv.req_base, PERUSE_RECV ); } for(i = 0; i < recvreq->req_rdma_cnt; i++) { mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[i].btl_reg; if( NULL != btl_reg && btl_reg->mpool != NULL) { btl_reg->mpool->mpool_deregister( btl_reg->mpool, btl_reg ); } } recvreq->req_rdma_cnt = 0; #if PML_BFO recvreq->req_msgseq -= 100; #endif /* PML_BFO */ OPAL_THREAD_LOCK(&ompi_request_lock); if(true == recvreq->req_recv.req_base.req_free_called) { MCA_PML_BFO_RECV_REQUEST_RETURN(recvreq); } else { /* initialize request status */ recvreq->req_recv.req_base.req_pml_complete = true; recvreq->req_recv.req_base.req_ompi.req_status._ucount = recvreq->req_bytes_received; if (recvreq->req_recv.req_bytes_packed > recvreq->req_bytes_expected) { recvreq->req_recv.req_base.req_ompi.req_status._ucount = recvreq->req_recv.req_bytes_packed; recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR = MPI_ERR_TRUNCATE; } MCA_PML_BFO_RECV_REQUEST_MPI_COMPLETE(recvreq); } OPAL_THREAD_UNLOCK(&ompi_request_lock); } static inline bool recv_request_pml_complete_check(mca_pml_bfo_recv_request_t *recvreq) { #if OPAL_ENABLE_MULTI_THREADS opal_atomic_rmb(); #endif if(recvreq->req_match_received && recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed && #if PML_BFO (0 == recvreq->req_events) && lock_recv_request(recvreq)) { #else /* PML_BFO */ lock_recv_request(recvreq)) { #endif /* PML_BFO */ recv_request_pml_complete(recvreq); return true; } return false; } extern void mca_pml_bfo_recv_req_start(mca_pml_bfo_recv_request_t *req); #define MCA_PML_BFO_RECV_REQUEST_START(r) mca_pml_bfo_recv_req_start(r) static inline void prepare_recv_req_converter(mca_pml_bfo_recv_request_t *req) { if( req->req_recv.req_base.req_datatype->super.size | req->req_recv.req_base.req_count ) { opal_convertor_copy_and_prepare_for_recv( req->req_recv.req_base.req_proc->proc_convertor, &(req->req_recv.req_base.req_datatype->super), req->req_recv.req_base.req_count, req->req_recv.req_base.req_addr, 0, &req->req_recv.req_base.req_convertor); opal_convertor_get_unpacked_size(&req->req_recv.req_base.req_convertor, &req->req_bytes_expected); } } #define MCA_PML_BFO_RECV_REQUEST_MATCHED(request, hdr) \ recv_req_matched(request, hdr) static inline void recv_req_matched(mca_pml_bfo_recv_request_t *req, mca_pml_bfo_match_hdr_t *hdr) { req->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src; req->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag; req->req_match_received = true; #if PML_BFO req->req_msgseq = hdr->hdr_seq; #endif /* PML_BFO */ #if OPAL_ENABLE_MULTI_THREADS opal_atomic_wmb(); #endif if(req->req_recv.req_bytes_packed > 0) { #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT if(MPI_ANY_SOURCE == req->req_recv.req_base.req_peer) { /* non wildcard prepared during post recv */ prepare_recv_req_converter(req); } #endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */ PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_XFER_BEGIN, &req->req_recv.req_base, PERUSE_RECV); } } /** * */ #define MCA_PML_BFO_RECV_REQUEST_UNPACK( request, \ segments, \ num_segments, \ seg_offset, \ data_offset, \ bytes_received, \ bytes_delivered) \ do { \ bytes_delivered = 0; \ if(request->req_recv.req_bytes_packed > 0) { \ struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS]; \ uint32_t iov_count = 0; \ size_t max_data = bytes_received; \ size_t n, offset = seg_offset; \ mca_btl_base_segment_t* segment = segments; \ \ OPAL_THREAD_LOCK(&request->lock); \ for( n = 0; n < num_segments; n++, segment++ ) { \ if(offset >= segment->seg_len) { \ offset -= segment->seg_len; \ } else { \ iov[iov_count].iov_len = segment->seg_len - offset; \ iov[iov_count].iov_base = (IOVBASE_TYPE*) \ ((unsigned char*)segment->seg_addr.pval + offset); \ iov_count++; \ offset = 0; \ } \ } \ PERUSE_TRACE_COMM_OMPI_EVENT (PERUSE_COMM_REQ_XFER_CONTINUE, \ &(recvreq->req_recv.req_base), max_data, \ PERUSE_RECV); \ opal_convertor_set_position( &(request->req_recv.req_base.req_convertor), \ &data_offset ); \ opal_convertor_unpack( &(request)->req_recv.req_base.req_convertor, \ iov, \ &iov_count, \ &max_data ); \ bytes_delivered = max_data; \ OPAL_THREAD_UNLOCK(&request->lock); \ } \ } while (0) /** * */ void mca_pml_bfo_recv_request_progress_match( mca_pml_bfo_recv_request_t* req, struct mca_btl_base_module_t* btl, mca_btl_base_segment_t* segments, size_t num_segments); /** * */ void mca_pml_bfo_recv_request_progress_frag( mca_pml_bfo_recv_request_t* req, struct mca_btl_base_module_t* btl, mca_btl_base_segment_t* segments, size_t num_segments); /** * */ void mca_pml_bfo_recv_request_progress_rndv( mca_pml_bfo_recv_request_t* req, struct mca_btl_base_module_t* btl, mca_btl_base_segment_t* segments, size_t num_segments); /** * */ void mca_pml_bfo_recv_request_progress_rget( mca_pml_bfo_recv_request_t* req, struct mca_btl_base_module_t* btl, mca_btl_base_segment_t* segments, size_t num_segments); /** * */ void mca_pml_bfo_recv_request_matched_probe( mca_pml_bfo_recv_request_t* req, struct mca_btl_base_module_t* btl, mca_btl_base_segment_t* segments, size_t num_segments); /** * */ int mca_pml_bfo_recv_request_schedule_once( mca_pml_bfo_recv_request_t* req, mca_bml_base_btl_t* start_bml_btl); static inline int mca_pml_bfo_recv_request_schedule_exclusive( mca_pml_bfo_recv_request_t* req, mca_bml_base_btl_t* start_bml_btl) { int rc; do { rc = mca_pml_bfo_recv_request_schedule_once(req, start_bml_btl); if(rc == OMPI_ERR_OUT_OF_RESOURCE) break; } while(!unlock_recv_request(req)); if(OMPI_SUCCESS == rc) recv_request_pml_complete_check(req); return rc; } static inline void mca_pml_bfo_recv_request_schedule( mca_pml_bfo_recv_request_t* req, mca_bml_base_btl_t* start_bml_btl) { if(!lock_recv_request(req)) return; (void)mca_pml_bfo_recv_request_schedule_exclusive(req, start_bml_btl); } #define MCA_PML_BFO_ADD_ACK_TO_PENDING(P, S, D, O) \ do { \ mca_pml_bfo_pckt_pending_t *_pckt; \ int _rc; \ \ MCA_PML_BFO_PCKT_PENDING_ALLOC(_pckt,_rc); \ _pckt->hdr.hdr_common.hdr_type = MCA_PML_BFO_HDR_TYPE_ACK; \ _pckt->hdr.hdr_ack.hdr_src_req.lval = (S); \ _pckt->hdr.hdr_ack.hdr_dst_req.pval = (D); \ _pckt->hdr.hdr_ack.hdr_send_offset = (O); \ _pckt->proc = (P); \ _pckt->bml_btl = NULL; \ OPAL_THREAD_LOCK(&mca_pml_bfo.lock); \ opal_list_append(&mca_pml_bfo.pckt_pending, \ (opal_list_item_t*)_pckt); \ OPAL_THREAD_UNLOCK(&mca_pml_bfo.lock); \ } while(0) int mca_pml_bfo_recv_request_ack_send_btl(ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset, bool nordma); static inline int mca_pml_bfo_recv_request_ack_send(ompi_proc_t* proc, uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset, bool nordma) { size_t i; mca_bml_base_btl_t* bml_btl; mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)proc->proc_bml; for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) { bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); if(mca_pml_bfo_recv_request_ack_send_btl(proc, bml_btl, hdr_src_req, hdr_dst_req, hdr_send_offset, nordma) == OMPI_SUCCESS) return OMPI_SUCCESS; } MCA_PML_BFO_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req, hdr_send_offset); return OMPI_ERR_OUT_OF_RESOURCE; } int mca_pml_bfo_recv_request_get_frag(mca_pml_bfo_rdma_frag_t* frag); /* This function tries to continue recvreq that stuck due to resource * unavailability. Recvreq is added to recv_pending list if scheduling of put * operation cannot be accomplished for some reason. */ void mca_pml_bfo_recv_request_process_pending(void); END_C_DECLS #endif