
Prevent a receive request from being freed while another thread holds a reference to it or there is an outstanding completion for the request.

This commit was SVN r16153.
This commit is contained in:
Gleb Natapov 2007-09-18 16:18:47 +00:00
parent 164a577908
commit 097b17d30e
2 changed files with 226 additions and 215 deletions
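
The fix builds on a simple counting-lock idiom: req_lock is an atomic counter where the 0 -> 1 transition hands the caller exclusive ownership of scheduling, any further increment merely records that more work (or a pending free) arrived, and the thread whose decrement brings the counter back to 0 is the last one out and may safely return the request. A minimal standalone sketch of that pattern, with illustrative names and C11 atomics standing in for OPAL_THREAD_ADD32:

/* Sketch of the counter-as-lock idiom behind lock_recv_request()/
 * unlock_recv_request() below; the names and the use of C11 atomics
 * are ours, the real code uses OPAL_THREAD_ADD32 on recvreq->req_lock. */
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_int req_lock; /* 0 = idle, >0 = a thread is scheduling */
} recv_request_t;

/* True only for the 0 -> 1 transition: the caller becomes the owner.
 * Concurrent callers just bump the counter to signal more work. */
static inline bool lock_request(recv_request_t *req)
{
    return atomic_fetch_add(&req->req_lock, 1) == 0;
}

/* True when the counter drops back to 0: nobody queued further work,
 * so the owner may release (or free) the request. */
static inline bool unlock_request(recv_request_t *req)
{
    return atomic_fetch_sub(&req->req_lock, 1) == 1;
}

static void schedule(recv_request_t *req)
{
    if (!lock_request(req))
        return; /* the owner will see our increment and loop again */
    do {
        /* ... schedule one round of RDMA fragments ... */
    } while (!unlock_request(req));
}

Under this scheme MCA_PML_OB1_RECV_REQUEST_RETURN can only run once the counter has drained, so a free triggered from another thread or from an outstanding completion can no longer race with a scheduler that still holds the request.
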

@@ -66,9 +66,7 @@ static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
&(recvreq->req_recv.req_base), PERUSE_RECV );
if( true == recvreq->req_recv.req_base.req_pml_complete ) {
if(OPAL_THREAD_ADD32(&recvreq->req_lock, 1) == 1) {
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
}
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@@ -98,7 +96,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
&(request->req_recv.req_base), PERUSE_RECV );
/**
* As now the PML is done with this request we have to force the lmp_complete
* As now the PML is done with this request we have to force the pml_complete
* to true. Otherwise, the request will never be freed.
*/
request->req_recv.req_base.req_pml_complete = true;
@@ -168,10 +166,9 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl,
mca_bml_base_free(bml_btl, des);
/* check completion status */
if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
>= recvreq->req_recv.req_bytes_packed ) {
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
} else if (recvreq->req_rdma_offset < recvreq->req_send_offset) {
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
if(recv_request_pml_complete_check(recvreq) == false &&
recvreq->req_rdma_offset < recvreq->req_send_offset) {
/* schedule additional rdma operations */
mca_pml_ob1_recv_request_schedule(recvreq, bml_btl);
}
@@ -322,10 +319,8 @@ static void mca_pml_ob1_rget_completion( mca_btl_base_module_t* btl,
des->order, 0);
/* is receive request complete */
if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length)
== recvreq->req_recv.req_bytes_packed ) {
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
}
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length);
recv_request_pml_complete_check(recvreq);
MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
@@ -481,6 +476,8 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
data_offset,
bytes_received,
bytes_delivered);
recvreq->req_match_received = true;
opal_atomic_wmb();
break;
case MCA_PML_OB1_HDR_TYPE_RNDV:
@@ -491,23 +488,6 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
recvreq->req_rdma_offset = bytes_received;
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
if(recvreq->req_recv.req_base.req_pml_complete) {
/* We must check that completion hasn't already occured */
/* for the self BTL we may choose the RDMA PUT protocol */
/* on the send side, in this case we send no eager data */
/* if, on the receiver side the data is not contiguous we */
/* may choose to use the copy in/out protocol */
/* if this occurs, the entire request can be completed in a */
/* single call to mca_pml_ob1_recv_request_ack */
/* as soon as the last fragment of the copy in/out protocol */
/* gets local completion. This doesn't occur in the general */
/* case of the copy in/out protocol because when both sender */
/* and receiver agree on the copy in/out protoocol we eagerly */
/* send data, we don't update the request with this eagerly sent */
/* data until the end of this function, so completion could not have */
/* yet occurred. */
return;
}
/**
* The PUT protocol does not attach any data to the original request.
* Therefore, we might want to avoid unpacking if there is nothing to
@@ -522,6 +502,8 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
bytes_received,
bytes_delivered );
}
recvreq->req_match_received = true;
opal_atomic_wmb();
break;
case MCA_PML_OB1_HDR_TYPE_RGET:
@@ -529,6 +511,7 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
mca_pml_ob1_recv_request_rget(recvreq, btl, &hdr->hdr_rget);
recvreq->req_match_received = true;
return;
case MCA_PML_OB1_HDR_TYPE_FRAG:
@@ -546,11 +529,11 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
default:
break;
}
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
/* check completion status */
if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received)
>= recvreq->req_recv.req_bytes_packed ) {
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
} else if (recvreq->req_rdma_offset < recvreq->req_send_offset) {
if(recv_request_pml_complete_check(recvreq) == false &&
recvreq->req_rdma_offset < recvreq->req_send_offset) {
/* schedule additional rdma operations */
mca_pml_ob1_recv_request_schedule(recvreq, NULL);
}
@@ -589,7 +572,7 @@ void mca_pml_ob1_recv_request_matched_probe( mca_pml_ob1_recv_request_t* recvreq
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_match.hdr_src;
recvreq->req_bytes_received = bytes_packed;
recvreq->req_bytes_delivered = bytes_packed;
MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq );
recv_request_pml_complete(recvreq);
}
@@ -598,15 +581,15 @@ void mca_pml_ob1_recv_request_matched_probe( mca_pml_ob1_recv_request_t* recvreq
*
*/
int mca_pml_ob1_recv_request_schedule_exclusive(
int mca_pml_ob1_recv_request_schedule_once(
mca_pml_ob1_recv_request_t* recvreq,
mca_bml_base_btl_t *start_bml_btl)
{
mca_bml_base_btl_t* bml_btl;
int num_tries = recvreq->req_rdma_cnt;
size_t i;
int num_tries = recvreq->req_rdma_cnt, num_fail = 0;
size_t i, prev_bytes_remaining = 0;
size_t bytes_remaining = recvreq->req_send_offset -
recvreq->req_rdma_offset;
recvreq->req_rdma_offset;
/* if starting bml_btl is provided schedule next fragment on it first */
if(start_bml_btl != NULL) {
@@ -620,151 +603,138 @@ int mca_pml_ob1_recv_request_schedule_exclusive(
}
}
do {
size_t prev_bytes_remaining = 0;
int num_fail = 0;
while(bytes_remaining > 0 &&
recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
size_t hdr_size;
size_t size;
mca_pml_ob1_rdma_hdr_t* hdr;
mca_btl_base_descriptor_t* dst;
mca_btl_base_descriptor_t* ctl;
mca_mpool_base_registration_t * reg = NULL;
int rc, rdma_idx;
while( bytes_remaining > 0 &&
recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth ) {
size_t hdr_size;
size_t size;
mca_pml_ob1_rdma_hdr_t* hdr;
mca_btl_base_descriptor_t* dst;
mca_btl_base_descriptor_t* ctl;
mca_mpool_base_registration_t * reg = NULL;
int rc, rdma_idx;
if(prev_bytes_remaining == bytes_remaining) {
if( ++num_fail == num_tries ) {
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
if(false == recvreq->req_pending) {
opal_list_append(&mca_pml_ob1.recv_pending,
(opal_list_item_t*)recvreq);
recvreq->req_pending = true;
}
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
} else {
num_fail = 0;
prev_bytes_remaining = bytes_remaining;
}
if(prev_bytes_remaining == bytes_remaining) {
if(++num_fail == num_tries) {
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
if(false == recvreq->req_pending) {
opal_list_append(&mca_pml_ob1.recv_pending,
(opal_list_item_t*)recvreq);
recvreq->req_pending = true;
}
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
} else {
num_fail = 0;
prev_bytes_remaining = bytes_remaining;
}
do {
rdma_idx = recvreq->req_rdma_idx;
bml_btl = recvreq->req_rdma[rdma_idx].bml_btl;
reg = recvreq->req_rdma[rdma_idx].btl_reg;
size = recvreq->req_rdma[rdma_idx].length;
if(++recvreq->req_rdma_idx >= recvreq->req_rdma_cnt)
recvreq->req_rdma_idx = 0;
} while(!size);
do {
rdma_idx = recvreq->req_rdma_idx;
bml_btl = recvreq->req_rdma[rdma_idx].bml_btl;
reg = recvreq->req_rdma[rdma_idx].btl_reg;
size = recvreq->req_rdma[rdma_idx].length;
if(++recvreq->req_rdma_idx >= recvreq->req_rdma_cnt)
recvreq->req_rdma_idx = 0;
} while(!size);
/* makes sure that we don't exceed BTL max rdma size
* if memory is not pinned already */
if(NULL == reg &&
bml_btl->btl_rdma_pipeline_frag_size != 0 &&
size > bml_btl->btl_rdma_pipeline_frag_size) {
size = bml_btl->btl_rdma_pipeline_frag_size;
}
/* makes sure that we don't exceed BTL max rdma size
* if memory is not pinned already */
if(NULL == reg &&
bml_btl->btl_rdma_pipeline_frag_size != 0 &&
size > bml_btl->btl_rdma_pipeline_frag_size) {
size = bml_btl->btl_rdma_pipeline_frag_size;
}
/* take lock to protect converter against concurrent access
* from unpack */
OPAL_THREAD_LOCK(&recvreq->lock);
ompi_convertor_set_position(
&recvreq->req_recv.req_base.req_convertor,
&recvreq->req_rdma_offset);
/* take lock to protect converter against concurrent access
* from unpack */
OPAL_THREAD_LOCK(&recvreq->lock);
ompi_convertor_set_position(
&recvreq->req_recv.req_base.req_convertor,
&recvreq->req_rdma_offset);
/* prepare a descriptor for RDMA */
mca_bml_base_prepare_dst(bml_btl, reg,
&recvreq->req_recv.req_base.req_convertor,
MCA_BTL_NO_ORDER, 0, &size, &dst);
OPAL_THREAD_UNLOCK(&recvreq->lock);
/* prepare a descriptor for RDMA */
mca_bml_base_prepare_dst(bml_btl, reg,
&recvreq->req_recv.req_base.req_convertor,
MCA_BTL_NO_ORDER, 0, &size, &dst);
OPAL_THREAD_UNLOCK(&recvreq->lock);
if( OPAL_UNLIKELY(dst == NULL) ) {
continue;
}
if(OPAL_UNLIKELY(dst == NULL)) {
continue;
}
dst->des_cbfunc = mca_pml_ob1_put_completion;
dst->des_cbdata = recvreq;
dst->des_cbfunc = mca_pml_ob1_put_completion;
dst->des_cbdata = recvreq;
/* prepare a descriptor for rdma control message */
hdr_size = sizeof(mca_pml_ob1_rdma_hdr_t);
if(dst->des_dst_cnt > 1) {
hdr_size += (sizeof(mca_btl_base_segment_t) *
(dst->des_dst_cnt-1));
}
/* prepare a descriptor for rdma control message */
hdr_size = sizeof(mca_pml_ob1_rdma_hdr_t);
if(dst->des_dst_cnt > 1) {
hdr_size += (sizeof(mca_btl_base_segment_t) *
(dst->des_dst_cnt-1));
}
MCA_PML_OB1_DES_ALLOC(bml_btl, ctl, MCA_BTL_NO_ORDER, hdr_size);
if( OPAL_UNLIKELY(NULL == ctl) ) {
mca_bml_base_free(bml_btl,dst);
continue;
}
ctl->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
ctl->des_cbfunc = mca_pml_ob1_recv_ctl_completion;
/* fill in rdma header */
hdr = (mca_pml_ob1_rdma_hdr_t*)ctl->des_src->seg_addr.pval;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_PUT;
hdr->hdr_common.hdr_flags =
(!recvreq->req_ack_sent) ? MCA_PML_OB1_HDR_TYPE_ACK : 0;
hdr->hdr_req = recvreq->req_send;
hdr->hdr_des.pval = dst;
hdr->hdr_rdma_offset = recvreq->req_rdma_offset;
hdr->hdr_seg_cnt = dst->des_dst_cnt;
MCA_PML_OB1_DES_ALLOC(bml_btl, ctl, MCA_BTL_NO_ORDER, hdr_size);
if( OPAL_UNLIKELY(NULL == ctl) ) {
mca_bml_base_free(bml_btl,dst);
continue;
}
ctl->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
ctl->des_cbfunc = mca_pml_ob1_recv_ctl_completion;
/* fill in rdma header */
hdr = (mca_pml_ob1_rdma_hdr_t*)ctl->des_src->seg_addr.pval;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_PUT;
hdr->hdr_common.hdr_flags =
(!recvreq->req_ack_sent) ? MCA_PML_OB1_HDR_TYPE_ACK : 0;
hdr->hdr_req = recvreq->req_send;
hdr->hdr_des.pval = dst;
hdr->hdr_rdma_offset = recvreq->req_rdma_offset;
hdr->hdr_seg_cnt = dst->des_dst_cnt;
for( i = 0; i < dst->des_dst_cnt; i++ ) {
hdr->hdr_segs[i].seg_addr.lval = ompi_ptr_ptol(dst->des_dst[i].seg_addr.pval);
hdr->hdr_segs[i].seg_len = dst->des_dst[i].seg_len;
hdr->hdr_segs[i].seg_key.key64 = dst->des_dst[i].seg_key.key64;
}
for( i = 0; i < dst->des_dst_cnt; i++ ) {
hdr->hdr_segs[i].seg_addr.lval = ompi_ptr_ptol(dst->des_dst[i].seg_addr.pval);
hdr->hdr_segs[i].seg_len = dst->des_dst[i].seg_len;
hdr->hdr_segs[i].seg_key.key64 = dst->des_dst[i].seg_key.key64;
}
if(!recvreq->req_ack_sent)
recvreq->req_ack_sent = true;
if(!recvreq->req_ack_sent)
recvreq->req_ack_sent = true;
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
#ifdef WORDS_BIGENDIAN
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
#else
/* if we are little endian and the remote side is big endian,
we're responsible for making sure the data is in network byte
order */
if (recvreq->req_recv.req_base.req_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
MCA_PML_OB1_RDMA_HDR_HTON(*hdr);
}
/* if we are little endian and the remote side is big endian,
we're responsible for making sure the data is in network byte
order */
if(recvreq->req_recv.req_base.req_proc->proc_arch &
OMPI_ARCH_ISBIGENDIAN) {
hdr->hdr_common.hdr_flags |= MCA_PML_OB1_HDR_FLAGS_NBO;
MCA_PML_OB1_RDMA_HDR_HTON(*hdr);
}
#endif
#endif
PERUSE_TRACE_COMM_OMPI_EVENT( PERUSE_COMM_REQ_XFER_CONTINUE,
&(recvreq->req_recv.req_base), size,
PERUSE_RECV);
PERUSE_TRACE_COMM_OMPI_EVENT( PERUSE_COMM_REQ_XFER_CONTINUE,
&(recvreq->req_recv.req_base), size,
PERUSE_RECV);
/* send rdma request to peer */
rc = mca_bml_base_send(bml_btl, ctl, MCA_BTL_TAG_PML);
if( OPAL_LIKELY(OMPI_SUCCESS == rc) ) {
/* update request state */
recvreq->req_rdma_offset += size;
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth,1);
recvreq->req_rdma[rdma_idx].length -= size;
bytes_remaining -= size;
} else {
mca_bml_base_free(bml_btl,ctl);
mca_bml_base_free(bml_btl,dst);
continue;
}
/* send rdma request to peer */
rc = mca_bml_base_send(bml_btl, ctl, MCA_BTL_TAG_PML);
if(OPAL_LIKELY(OMPI_SUCCESS == rc)) {
/* update request state */
recvreq->req_rdma_offset += size;
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth, 1);
recvreq->req_rdma[rdma_idx].length -= size;
bytes_remaining -= size;
} else {
mca_bml_base_free(bml_btl,ctl);
mca_bml_base_free(bml_btl,dst);
continue;
}
/* run progress as the prepare (pinning) can take some time */
mca_bml.bml_progress();
}
bytes_remaining = recvreq->req_send_offset - recvreq->req_rdma_offset;
OPAL_THREAD_LOCK(&ompi_request_lock);
if(recvreq->req_recv.req_base.req_pml_complete &&
recvreq->req_recv.req_base.req_free_called) {
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq );
OPAL_THREAD_UNLOCK(&ompi_request_lock);
return MPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} while(OPAL_THREAD_ADD32(&recvreq->req_lock,-1) > 0);
/* run progress as the prepare (pinning) can take some time */
mca_bml.bml_progress();
}
return OMPI_SUCCESS;
}

@@ -38,11 +38,7 @@ extern "C" {
struct mca_pml_ob1_recv_request_t {
mca_pml_base_recv_request_t req_recv;
ompi_ptr_t req_send;
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_lock;
#else
int32_t req_lock;
#endif
int32_t req_lock;
size_t req_pipeline_depth;
size_t req_bytes_received;
size_t req_bytes_delivered;
@@ -52,6 +48,7 @@ struct mca_pml_ob1_recv_request_t {
uint32_t req_rdma_idx;
bool req_pending;
bool req_ack_sent; /**< whether ack was sent to the sender */
bool req_match_received; /**< Prevent the request from being completed prematurely */
opal_mutex_t lock;
mca_pml_ob1_com_btl_t req_rdma[1];
};
@@ -59,6 +56,15 @@ typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t);
static inline bool lock_recv_request(mca_pml_ob1_recv_request_t *recvreq)
{
return OPAL_THREAD_ADD32(&recvreq->req_lock, 1) == 1;
}
static inline bool unlock_recv_request(mca_pml_ob1_recv_request_t *recvreq)
{
return OPAL_THREAD_ADD32(&recvreq->req_lock, -1) == 0;
}
/**
* Allocate a recv request from the modules free list.
@@ -118,52 +124,6 @@ do { \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \
} while (0)
/**
* Return a recv request to the modules free list.
*
* @param recvreq (IN) Receive request.
*/
#define MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE(recvreq) \
do { \
size_t r; \
\
assert( false == recvreq->req_recv.req_base.req_pml_complete ); \
\
if((recvreq)->req_recv.req_bytes_packed > 0) { \
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END, \
&(recvreq->req_recv.req_base), PERUSE_RECV ); \
} \
\
for( r = 0; r < recvreq->req_rdma_cnt; r++ ) { \
mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[r].btl_reg; \
if( NULL != btl_reg && btl_reg->mpool != NULL) { \
btl_reg->mpool->mpool_deregister( btl_reg->mpool, btl_reg ); \
} \
} \
recvreq->req_rdma_cnt = 0; \
\
OPAL_THREAD_LOCK(&ompi_request_lock); \
\
if( true == recvreq->req_recv.req_base.req_free_called ) { \
if(OPAL_THREAD_ADD32(&recvreq->req_lock, 1) == 1) { \
MCA_PML_OB1_RECV_REQUEST_RETURN( recvreq ); \
} \
} else { \
/* initialize request status */ \
recvreq->req_recv.req_base.req_pml_complete = true; \
recvreq->req_recv.req_base.req_ompi.req_status._count = \
(int)recvreq->req_bytes_received; \
if (recvreq->req_bytes_received > recvreq->req_bytes_delivered) { \
recvreq->req_recv.req_base.req_ompi.req_status._count = \
(int)recvreq->req_bytes_delivered; \
recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR = \
MPI_ERR_TRUNCATE; \
} \
MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE( recvreq ); \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while(0)
/*
* Free the PML receive request
*/
@@ -174,6 +134,66 @@ do {
(ompi_free_list_item_t*)(recvreq)); \
}
/**
* Complete receive request. Request structure cannot be accessed after calling
* this function any more.
*
* @param recvreq (IN) Receive request.
*/
void static inline
recv_request_pml_complete(mca_pml_ob1_recv_request_t *recvreq)
{
size_t i;
assert(false == recvreq->req_recv.req_base.req_pml_complete);
if(recvreq->req_recv.req_bytes_packed > 0) {
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_XFER_END,
&recvreq->req_recv.req_base, PERUSE_RECV );
}
for(i = 0; i < recvreq->req_rdma_cnt; i++) {
mca_mpool_base_registration_t* btl_reg = recvreq->req_rdma[i].btl_reg;
if( NULL != btl_reg && btl_reg->mpool != NULL) {
btl_reg->mpool->mpool_deregister( btl_reg->mpool, btl_reg );
}
}
recvreq->req_rdma_cnt = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
if(true == recvreq->req_recv.req_base.req_free_called) {
MCA_PML_OB1_RECV_REQUEST_RETURN(recvreq);
} else {
/* initialize request status */
recvreq->req_recv.req_base.req_pml_complete = true;
recvreq->req_recv.req_base.req_ompi.req_status._count =
(int)recvreq->req_bytes_received;
if (recvreq->req_bytes_received > recvreq->req_bytes_delivered) {
recvreq->req_recv.req_base.req_ompi.req_status._count =
(int)recvreq->req_bytes_delivered;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR =
MPI_ERR_TRUNCATE;
}
MCA_PML_OB1_RECV_REQUEST_MPI_COMPLETE(recvreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
bool static inline
recv_request_pml_complete_check(mca_pml_ob1_recv_request_t *recvreq)
{
opal_atomic_rmb();
if(recvreq->req_match_received &&
recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed &&
lock_recv_request(recvreq)) {
recv_request_pml_complete(recvreq);
return true;
}
return false;
}
/**
* Attempt to match the request against the unexpected fragment list
* for all source ranks w/in the communicator.
@@ -211,6 +231,7 @@ do {
(request)->req_rdma_idx = 0; \
(request)->req_pending = false; \
(request)->req_ack_sent = false; \
(request)->req_match_received = false; \
\
MCA_PML_BASE_RECV_START( &(request)->req_recv.req_base ); \
\
@@ -334,15 +355,35 @@ void mca_pml_ob1_recv_request_matched_probe(
*
*/
int mca_pml_ob1_recv_request_schedule_exclusive(
int mca_pml_ob1_recv_request_schedule_once(
mca_pml_ob1_recv_request_t* req, mca_bml_base_btl_t* start_bml_btl);
static inline int mca_pml_ob1_recv_request_schedule_exclusive(
mca_pml_ob1_recv_request_t* req,
mca_bml_base_btl_t* start_bml_btl)
{
int rc;
do {
rc = mca_pml_ob1_recv_request_schedule_once(req, start_bml_btl);
if(rc == OMPI_ERR_OUT_OF_RESOURCE)
break;
} while(!unlock_recv_request(req));
if(OMPI_SUCCESS == rc)
recv_request_pml_complete_check(req);
return rc;
}
static inline void mca_pml_ob1_recv_request_schedule(
mca_pml_ob1_recv_request_t* req,
mca_bml_base_btl_t* start_bml_btl)
{
if(OPAL_THREAD_ADD32(&req->req_lock,1) == 1)
mca_pml_ob1_recv_request_schedule_exclusive(req, start_bml_btl);
if(!lock_recv_request(req))
return;
(void)mca_pml_ob1_recv_request_schedule_exclusive(req, start_bml_btl);
}
#define MCA_PML_OB1_ADD_ACK_TO_PENDING(P, S, D, O) \