Receive side changes, basically uses multiple active message callbacks rather
than using a single receive callback followed by a switch on the header. Also fast pathed the matching for small fragments. This commit was SVN r18549.
Этот коммит содержится в:
родитель
8b01499d6e
Коммит
4da4c44210
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -248,8 +249,44 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_BTL_TAG_PML,
|
||||
mca_pml_ob1_recv_frag_callback,
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_MATCH,
|
||||
mca_pml_ob1_recv_frag_callback_match,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_RNDV,
|
||||
mca_pml_ob1_recv_frag_callback_rndv,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_RGET,
|
||||
mca_pml_ob1_recv_frag_callback_rget,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_ACK,
|
||||
mca_pml_ob1_recv_frag_callback_ack,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_FRAG,
|
||||
mca_pml_ob1_recv_frag_callback_frag,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_PUT,
|
||||
mca_pml_ob1_recv_frag_callback_put,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
|
||||
rc = mca_bml.bml_register( MCA_PML_OB1_HDR_TYPE_FIN,
|
||||
mca_pml_ob1_recv_frag_callback_fin,
|
||||
NULL );
|
||||
if(OMPI_SUCCESS != rc)
|
||||
goto cleanup_and_return;
|
||||
@ -347,12 +384,10 @@ int mca_pml_ob1_send_fin( ompi_proc_t* proc,
|
||||
ob1_hdr_hton(hdr, MCA_PML_OB1_HDR_TYPE_FIN, proc);
|
||||
|
||||
/* queue request */
|
||||
rc = mca_bml_base_send(
|
||||
bml_btl,
|
||||
fin,
|
||||
MCA_BTL_TAG_PML
|
||||
);
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
rc = mca_bml_base_send( bml_btl,
|
||||
fin,
|
||||
MCA_PML_OB1_HDR_TYPE_FIN );
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
mca_bml_base_free(bml_btl, fin);
|
||||
MCA_PML_OB1_ADD_FIN_TO_PENDING(proc, hdr_des, bml_btl, order, status);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -35,6 +36,8 @@
|
||||
#include "pml_ob1_sendreq.h"
|
||||
#include "pml_ob1_hdr.h"
|
||||
#include "ompi/peruse/peruse-internal.h"
|
||||
#include "ompi/memchecker.h"
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE( mca_pml_ob1_buffer_t,
|
||||
ompi_free_list_item_t,
|
||||
@ -63,98 +66,297 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_frag_t,
|
||||
static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments );
|
||||
size_t num_segments,
|
||||
int type);
|
||||
|
||||
static mca_pml_ob1_recv_request_t *match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, ompi_communicator_t *comm_ptr,
|
||||
mca_pml_ob1_comm_proc_t *proc,
|
||||
mca_pml_ob1_recv_frag_t* frag);
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segment = des->des_dst;
|
||||
mca_pml_ob1_match_hdr_t* hdr = (mca_pml_ob1_match_hdr_t*)segment->seg_addr.pval;
|
||||
ompi_communicator_t *comm_ptr;
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
mca_pml_ob1_comm_t *comm;
|
||||
mca_pml_ob1_comm_proc_t *proc;
|
||||
mca_pml_ob1_recv_frag_t* frag = NULL;
|
||||
size_t num_segments = des->des_dst_cnt;
|
||||
size_t bytes_received = 0;
|
||||
|
||||
|
||||
/**
|
||||
* Callback from BTL on receive.
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback( mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata )
|
||||
{
|
||||
if( OPAL_UNLIKELY(segment->seg_len < sizeof(mca_pml_ob1_match_hdr_t))) {
|
||||
return;
|
||||
}
|
||||
ob1_hdr_ntoh(((mca_pml_ob1_hdr_t*) hdr), MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
|
||||
/* communicator pointer */
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
|
||||
/* This is a special case. A message for a not yet existing communicator can
|
||||
* happens, but right now we segfault. Instead, and until we find a better
|
||||
* solution, just drop the message. However, in the near future we should
|
||||
* store this fragment in a global list, and deliver it to the right
|
||||
* communicator once it get created.
|
||||
*/
|
||||
opal_output( 0, "Dropped message for the non-existing communicator %d\n",
|
||||
(int)hdr->hdr_ctx );
|
||||
return;
|
||||
}
|
||||
comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
/* source sequence number */
|
||||
proc = &comm->procs[hdr->hdr_src];
|
||||
|
||||
/**
|
||||
* We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
|
||||
* fragment arrival. Independing if it is received on the correct order or not.
|
||||
* This will allow the tools to figure out if the messages are not received in the
|
||||
* correct order (if multiple network interfaces).
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* get next expected message sequence number - if threaded
|
||||
* run, lock to make sure that if another thread is processing
|
||||
* a frag from the same message a match is made only once.
|
||||
* Also, this prevents other posted receives (for a pair of
|
||||
* end points) from being processed, and potentially "loosing"
|
||||
* the fragment.
|
||||
*/
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
|
||||
(opal_list_get_size(&proc->frags_cant_match) > 0 ) ||
|
||||
(num_segments > 1))) {
|
||||
goto slow_path;
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
|
||||
/**
|
||||
* We generate the SEARCH_POSTED_QUEUE only when the message is received
|
||||
* in the correct sequence. Otherwise, we delay the event generation until
|
||||
* we reach the correct sequence number.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
match = match_one(btl, hdr, segment, num_segments, comm_ptr, proc, frag);
|
||||
|
||||
/**
|
||||
* The match is over. We generate the SEARCH_POSTED_Q_END here, before going
|
||||
* into the mca_pml_ob1_check_cantmatch_for_match so we can make a difference
|
||||
* for the searching time for all messages.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* release matching lock before processing fragment */
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
if(OPAL_LIKELY(match)) {
|
||||
bytes_received = segment->seg_len - sizeof(mca_pml_ob1_match_hdr_t);
|
||||
match->req_recv.req_bytes_packed = bytes_received;
|
||||
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(match, hdr);
|
||||
if(bytes_received > 0) {
|
||||
struct iovec iov[2];
|
||||
uint32_t iov_count = num_segments;
|
||||
|
||||
/*
|
||||
* Make user buffer accessable(defined) before unpacking.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
match->req_recv.req_base.req_addr,
|
||||
match->req_recv.req_base.req_count,
|
||||
match->req_recv.req_base.req_datatype);
|
||||
);
|
||||
|
||||
iov[0].iov_len = bytes_received;
|
||||
iov[0].iov_base = (IOVBASE_TYPE*)((unsigned char*)segment->seg_addr.pval +
|
||||
sizeof(mca_pml_ob1_match_hdr_t));
|
||||
if(num_segments > 1) {
|
||||
bytes_received += segment[1].seg_len;
|
||||
iov[1].iov_len = segment[1].seg_len;
|
||||
iov[1].iov_base = (IOVBASE_TYPE*)((unsigned char*)segment[1].seg_addr.pval);
|
||||
}
|
||||
ompi_convertor_unpack( &match->req_recv.req_base.req_convertor,
|
||||
iov,
|
||||
&iov_count,
|
||||
&bytes_received );
|
||||
match->req_bytes_received = bytes_received;
|
||||
/*
|
||||
* Unpacking finished, make the user buffer unaccessable again.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
match->req_recv.req_base.req_addr,
|
||||
match->req_recv.req_base.req_count,
|
||||
match->req_recv.req_base.req_datatype);
|
||||
);
|
||||
}
|
||||
|
||||
/* no need to check if complete we know we are.. */
|
||||
/* don't need a rmb as that is for checking */
|
||||
recv_request_pml_complete(match);
|
||||
}
|
||||
return;
|
||||
|
||||
slow_path:
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
mca_pml_ob1_recv_frag_match(btl, hdr, segment,
|
||||
num_segments, MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_rndv(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
|
||||
|
||||
|
||||
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* hdr_type and hdr_flags are uint8_t, so no endian problems */
|
||||
switch(hdr->hdr_common.hdr_type) {
|
||||
case MCA_PML_OB1_HDR_TYPE_MATCH:
|
||||
{
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
mca_pml_ob1_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
{
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
mca_pml_ob1_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_RGET:
|
||||
{
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_RGET);
|
||||
mca_pml_ob1_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_ACK:
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq;
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_ACK);
|
||||
sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval;
|
||||
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
|
||||
|
||||
/* if the request should be delivered entirely by copy in/out
|
||||
* then throttle sends */
|
||||
if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_NORDMA)
|
||||
sendreq->req_throttle_sends = true;
|
||||
|
||||
mca_pml_ob1_send_request_copy_in_out(sendreq,
|
||||
hdr->hdr_ack.hdr_send_offset,
|
||||
sendreq->req_send.req_bytes_packed -
|
||||
hdr->hdr_ack.hdr_send_offset);
|
||||
|
||||
OPAL_THREAD_ADD32(&sendreq->req_state, -1);
|
||||
|
||||
if(send_request_pml_complete_check(sendreq) == false)
|
||||
mca_pml_ob1_send_request_schedule(sendreq);
|
||||
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_FRAG:
|
||||
{
|
||||
mca_pml_ob1_recv_request_t* recvreq;
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_FRAG);
|
||||
recvreq = (mca_pml_ob1_recv_request_t*)hdr->hdr_frag.hdr_dst_req.pval;
|
||||
mca_pml_ob1_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_PUT:
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq;
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_PUT);
|
||||
sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_rdma.hdr_req.pval;
|
||||
mca_pml_ob1_send_request_put(sendreq,btl,&hdr->hdr_rdma);
|
||||
break;
|
||||
}
|
||||
case MCA_PML_OB1_HDR_TYPE_FIN:
|
||||
{
|
||||
mca_btl_base_descriptor_t* rdma;
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_FIN);
|
||||
rdma = (mca_btl_base_descriptor_t*)hdr->hdr_fin.hdr_des.pval;
|
||||
rdma->des_cbfunc(btl, NULL, rdma,
|
||||
hdr->hdr_fin.hdr_fail ? OMPI_ERROR : OMPI_SUCCESS);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
mca_pml_ob1_recv_frag_match(btl, &hdr->hdr_match, segments,
|
||||
des->des_dst_cnt, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
return;
|
||||
}
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_rget(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_RGET);
|
||||
mca_pml_ob1_recv_frag_match(btl, &hdr->hdr_match, segments,
|
||||
des->des_dst_cnt, MCA_PML_OB1_HDR_TYPE_RGET);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_ack(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
mca_pml_ob1_send_request_t* sendreq;
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_ACK);
|
||||
sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval;
|
||||
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
|
||||
|
||||
/* if the request should be delivered entirely by copy in/out
|
||||
* then throttle sends */
|
||||
if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_NORDMA)
|
||||
sendreq->req_throttle_sends = true;
|
||||
|
||||
mca_pml_ob1_send_request_copy_in_out(sendreq,
|
||||
hdr->hdr_ack.hdr_send_offset,
|
||||
sendreq->req_send.req_bytes_packed -
|
||||
hdr->hdr_ack.hdr_send_offset);
|
||||
|
||||
OPAL_THREAD_ADD32(&sendreq->req_state, -1);
|
||||
|
||||
if(send_request_pml_complete_check(sendreq) == false)
|
||||
mca_pml_ob1_send_request_schedule(sendreq);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_frag(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
mca_pml_ob1_recv_request_t* recvreq;
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_FRAG);
|
||||
recvreq = (mca_pml_ob1_recv_request_t*)hdr->hdr_frag.hdr_dst_req.pval;
|
||||
mca_pml_ob1_recv_request_progress_frag(recvreq,btl,segments,des->des_dst_cnt);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_put(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
mca_pml_ob1_send_request_t* sendreq;
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_PUT);
|
||||
sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_rdma.hdr_req.pval;
|
||||
mca_pml_ob1_send_request_put(sendreq,btl,&hdr->hdr_rdma);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_fin(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
void* cbdata ) {
|
||||
mca_btl_base_segment_t* segments = des->des_dst;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
mca_btl_base_descriptor_t* rdma;
|
||||
|
||||
if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
ob1_hdr_ntoh(hdr, MCA_PML_OB1_HDR_TYPE_FIN);
|
||||
rdma = (mca_btl_base_descriptor_t*)hdr->hdr_fin.hdr_des.pval;
|
||||
rdma->des_cbfunc(btl, NULL, rdma,
|
||||
hdr->hdr_fin.hdr_fail ? OMPI_ERROR : OMPI_SUCCESS);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
#define PML_MAX_SEQ ~((mca_pml_sequence_t)0);
|
||||
|
||||
static inline mca_pml_ob1_recv_request_t* get_posted_recv(opal_list_t *queue)
|
||||
@ -341,7 +543,8 @@ static mca_pml_ob1_recv_frag_t *check_cantmatch_for_match(
|
||||
static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
size_t num_segments,
|
||||
int type)
|
||||
{
|
||||
/* local variables */
|
||||
uint16_t next_msg_seq_expected, frag_msg_seq;
|
||||
@ -354,7 +557,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
/* communicator pointer */
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
|
||||
/* This is a special case. A message for a not yet exiting communicator can
|
||||
/* This is a special case. A message for a not yet existing communicator can
|
||||
* happens, but right now we segfault. Instead, and until we find a better
|
||||
* solution, just drop the message. However, in the near future we should
|
||||
* store this fragment in a global list, and deliver it to the right
|
||||
@ -425,11 +628,22 @@ out_of_order_match:
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
if(OPAL_LIKELY(match)) {
|
||||
mca_pml_ob1_recv_request_progress(match, btl, segments, num_segments);
|
||||
switch(type) {
|
||||
case MCA_PML_OB1_HDR_TYPE_MATCH:
|
||||
mca_pml_ob1_recv_request_progress_match(match, btl, segments, num_segments);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
mca_pml_ob1_recv_request_progress_rndv(match, btl, segments, num_segments);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_RGET:
|
||||
mca_pml_ob1_recv_request_progress_rget(match, btl, segments, num_segments);
|
||||
break;
|
||||
}
|
||||
|
||||
if(OPAL_UNLIKELY(frag))
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_c_frags_cant_match list
|
||||
@ -442,6 +656,7 @@ out_of_order_match:
|
||||
segments = frag->segments;
|
||||
num_segments = frag->num_segments;
|
||||
btl = frag->btl;
|
||||
type = hdr->hdr_common.hdr_type;
|
||||
goto out_of_order_match;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -109,14 +110,65 @@ do { \
|
||||
|
||||
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag.
|
||||
* Callback from BTL on receipt of a recv_frag (match).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_match( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (rndv).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_rndv( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (rget).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_rget( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (ack).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_ack( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (frag).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_frag( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (put).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_put( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
/**
|
||||
* Callback from BTL on receipt of a recv_frag (fin).
|
||||
*/
|
||||
|
||||
extern void mca_pml_ob1_recv_frag_callback_fin( mca_btl_base_module_t *btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* descriptor,
|
||||
void* cbdata );
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -233,7 +234,7 @@ int mca_pml_ob1_recv_request_ack_send_btl(
|
||||
/* initialize descriptor */
|
||||
des->des_cbfunc = mca_pml_ob1_recv_ctl_completion;
|
||||
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_ACK);
|
||||
if( OPAL_UNLIKELY(rc != OMPI_SUCCESS) ) {
|
||||
mca_bml_base_free(bml_btl, des);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
@ -399,19 +400,90 @@ int mca_pml_ob1_recv_request_get_frag( mca_pml_ob1_rdma_frag_t* frag )
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static void mca_pml_ob1_recv_request_rget(
|
||||
mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_pml_ob1_rget_hdr_t* hdr)
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* Update the recv request status to reflect the number of bytes
|
||||
* received and actually delivered to the application.
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_frag( mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
{
|
||||
size_t bytes_received = 0;
|
||||
size_t bytes_delivered = 0;
|
||||
size_t data_offset = 0;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
|
||||
MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
|
||||
0, bytes_received );
|
||||
bytes_received -= sizeof(mca_pml_ob1_frag_hdr_t);
|
||||
data_offset = hdr->hdr_frag.hdr_frag_offset;
|
||||
/*
|
||||
* Make user buffer accessable(defined) before unpacking.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_frag_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered );
|
||||
/*
|
||||
* Unpacking finished, make the user buffer unaccessable again.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
|
||||
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
|
||||
/* check completion status */
|
||||
if(recv_request_pml_complete_check(recvreq) == false &&
|
||||
recvreq->req_rdma_offset < recvreq->req_send_offset) {
|
||||
/* schedule additional rdma operations */
|
||||
mca_pml_ob1_recv_request_schedule(recvreq, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the recv request status to reflect the number of bytes
|
||||
* received and actually delivered to the application.
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_rget( mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
{
|
||||
size_t bytes_received = 0;
|
||||
mca_pml_ob1_rget_hdr_t* hdr = (mca_pml_ob1_rget_hdr_t*)segments->seg_addr.pval;
|
||||
mca_bml_base_endpoint_t* bml_endpoint = NULL;
|
||||
mca_pml_ob1_rdma_frag_t* frag;
|
||||
size_t i, size = 0;
|
||||
int rc;
|
||||
|
||||
MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
|
||||
0, bytes_received );
|
||||
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
|
||||
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_rndv.hdr_match);
|
||||
|
||||
|
||||
/* if receive buffer is not contiguous we can't just RDMA read into it, so
|
||||
* fall back to copy in/out protocol. It is a pity because buffer on the
|
||||
* sender side is already registered. We need to ne smarter here, perhaps
|
||||
* sender side is already registered. We need to be smarter here, perhaps
|
||||
* do couple of RDMA reads */
|
||||
if(ompi_convertor_need_buffers(&recvreq->req_recv.req_base.req_convertor) == true) {
|
||||
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, 0);
|
||||
@ -454,18 +526,25 @@ static void mca_pml_ob1_recv_request_rget(
|
||||
frag->reg = NULL;
|
||||
|
||||
mca_pml_ob1_recv_request_get_frag(frag);
|
||||
|
||||
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
|
||||
/* check completion status */
|
||||
if(recv_request_pml_complete_check(recvreq) == false &&
|
||||
recvreq->req_rdma_offset < recvreq->req_send_offset) {
|
||||
/* schedule additional rdma operations */
|
||||
mca_pml_ob1_recv_request_schedule(recvreq, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Update the recv request status to reflect the number of bytes
|
||||
* received and actually delivered to the application.
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
void mca_pml_ob1_recv_request_progress_rndv( mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
{
|
||||
size_t bytes_received = 0;
|
||||
size_t bytes_delivered = 0;
|
||||
@ -474,126 +553,103 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
|
||||
|
||||
MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
|
||||
0, bytes_received );
|
||||
switch(hdr->hdr_common.hdr_type) {
|
||||
case MCA_PML_OB1_HDR_TYPE_MATCH:
|
||||
|
||||
bytes_received -= sizeof(mca_pml_ob1_match_hdr_t);
|
||||
recvreq->req_recv.req_bytes_packed = bytes_received;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
|
||||
/*
|
||||
* Make user buffer accessible (defined) before unpacking.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_match_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
/*
|
||||
* Unpacking finished, make the user buffer unaccessable again.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
break;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
|
||||
bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
|
||||
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
|
||||
recvreq->req_send = hdr->hdr_rndv.hdr_src_req;
|
||||
recvreq->req_rdma_offset = bytes_received;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
|
||||
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
|
||||
/**
|
||||
* The PUT protocol do not attach any data to the original request.
|
||||
* Therefore, we might want to avoid unpacking if there is nothing to
|
||||
* unpack.
|
||||
*/
|
||||
if( 0 < bytes_received ) {
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_rendezvous_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered );
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
}
|
||||
break;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_RGET:
|
||||
|
||||
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
|
||||
mca_pml_ob1_recv_request_rget(recvreq, btl, &hdr->hdr_rget);
|
||||
return;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_FRAG:
|
||||
bytes_received -= sizeof(mca_pml_ob1_frag_hdr_t);
|
||||
data_offset = hdr->hdr_frag.hdr_frag_offset;
|
||||
/*
|
||||
* Make user buffer accessable(defined) before unpacking.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_frag_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered );
|
||||
/*
|
||||
* Unpacking finished, make the user buffer unaccessable again.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
|
||||
bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
|
||||
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
|
||||
recvreq->req_send = hdr->hdr_rndv.hdr_src_req;
|
||||
recvreq->req_rdma_offset = bytes_received;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
|
||||
mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
|
||||
/**
|
||||
* The PUT protocol do not attach any data to the original request.
|
||||
* Therefore, we might want to avoid unpacking if there is nothing to
|
||||
* unpack.
|
||||
*/
|
||||
if( 0 < bytes_received ) {
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_rendezvous_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered );
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
}
|
||||
|
||||
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
|
||||
/* check completion status */
|
||||
if(recv_request_pml_complete_check(recvreq) == false &&
|
||||
recvreq->req_rdma_offset < recvreq->req_send_offset) {
|
||||
recvreq->req_rdma_offset < recvreq->req_send_offset) {
|
||||
/* schedule additional rdma operations */
|
||||
mca_pml_ob1_recv_request_schedule(recvreq, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Update the recv request status to reflect the number of bytes
|
||||
* received and actually delivered to the application.
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_match( mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments )
|
||||
{
|
||||
size_t bytes_received = 0;
|
||||
size_t bytes_delivered = 0;
|
||||
size_t data_offset = 0;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
bool complete;
|
||||
|
||||
MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( segments, num_segments,
|
||||
0, bytes_received );
|
||||
bytes_received -= sizeof(mca_pml_ob1_match_hdr_t);
|
||||
recvreq->req_recv.req_bytes_packed = bytes_received;
|
||||
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq, &hdr->hdr_match);
|
||||
/*
|
||||
* Make user buffer accessable(defined) before unpacking.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_defined,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
MCA_PML_OB1_RECV_REQUEST_UNPACK( recvreq,
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_match_hdr_t),
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
/*
|
||||
* Unpacking finished, make the user buffer unaccessable again.
|
||||
*/
|
||||
MEMCHECKER(
|
||||
memchecker_call(&opal_memchecker_base_mem_noaccess,
|
||||
recvreq->req_recv.req_base.req_addr,
|
||||
recvreq->req_recv.req_base.req_count,
|
||||
recvreq->req_recv.req_base.req_datatype);
|
||||
);
|
||||
|
||||
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received);
|
||||
complete = recv_request_pml_complete_check(recvreq);
|
||||
assert(complete);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Handle completion of a probe request
|
||||
*/
|
||||
@ -762,7 +818,7 @@ int mca_pml_ob1_recv_request_schedule_once(
|
||||
PERUSE_RECV);
|
||||
|
||||
/* send rdma request to peer */
|
||||
rc = mca_bml_base_send(bml_btl, ctl, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, ctl, MCA_PML_OB1_HDR_TYPE_PUT);
|
||||
if(OPAL_LIKELY(OMPI_SUCCESS == rc)) {
|
||||
/* update request state */
|
||||
recvreq->req_rdma_offset += size;
|
||||
@ -872,6 +928,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
|
||||
mca_pml_ob1_comm_proc_t* proc;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
opal_list_t *queue;
|
||||
mca_pml_ob1_hdr_t* hdr;
|
||||
|
||||
/* init/re-init the request */
|
||||
req->req_lock = 0;
|
||||
@ -940,14 +997,33 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
|
||||
opal_list_remove_item(&proc->unexpected_frags,
|
||||
(opal_list_item_t*)frag);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
mca_pml_ob1_recv_request_progress(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
|
||||
hdr = (mca_pml_ob1_hdr_t*)frag->segments->seg_addr.pval;
|
||||
switch(hdr->hdr_common.hdr_type) {
|
||||
case MCA_PML_OB1_HDR_TYPE_MATCH:
|
||||
mca_pml_ob1_recv_request_progress_match(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
mca_pml_ob1_recv_request_progress_rndv(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_RGET:
|
||||
mca_pml_ob1_recv_request_progress_rget(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_FRAG:
|
||||
mca_pml_ob1_recv_request_progress_frag(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
break;
|
||||
}
|
||||
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
|
||||
} else {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
|
||||
frag->segments, frag->num_segments);
|
||||
frag->segments, frag->num_segments);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -118,7 +119,7 @@ do { \
|
||||
do { \
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_COMPLETE, \
|
||||
&(recvreq->req_recv.req_base), PERUSE_RECV ); \
|
||||
ompi_request_complete( &(recvreq->req_recv.req_base.req_ompi) ); \
|
||||
ompi_request_complete( &(recvreq->req_recv.req_base.req_ompi) ); \
|
||||
} while (0)
|
||||
|
||||
/*
|
||||
@ -274,11 +275,88 @@ do {
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress(
|
||||
#define MCA_PML_OB1_RECV_REQUEST_UNPACK_ALL( \
|
||||
request, \
|
||||
segments, \
|
||||
num_segments, \
|
||||
seg_offset, \
|
||||
data_offset, \
|
||||
bytes_received, \
|
||||
bytes_delivered) \
|
||||
do { \
|
||||
bytes_delivered = 0; \
|
||||
if(request->req_recv.req_bytes_packed > 0) { \
|
||||
struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS]; \
|
||||
uint32_t iov_count = 0; \
|
||||
size_t max_data = bytes_received; \
|
||||
size_t n, offset = seg_offset; \
|
||||
mca_btl_base_segment_t* segment = segments; \
|
||||
\
|
||||
OPAL_THREAD_LOCK(&request->lock); \
|
||||
for( n = 0; n < num_segments; n++, segment++ ) { \
|
||||
if(offset >= segment->seg_len) { \
|
||||
offset -= segment->seg_len; \
|
||||
} else { \
|
||||
iov[iov_count].iov_len = segment->seg_len - offset; \
|
||||
iov[iov_count].iov_base = (IOVBASE_TYPE*)((unsigned char*)segment->seg_addr.pval + offset); \
|
||||
iov_count++; \
|
||||
} \
|
||||
} \
|
||||
PERUSE_TRACE_COMM_OMPI_EVENT (PERUSE_COMM_REQ_XFER_CONTINUE, \
|
||||
&(recvreq->req_recv.req_base), \
|
||||
bytes_received, \
|
||||
PERUSE_RECV); \
|
||||
ompi_convertor_set_position( &(request->req_recv.req_base.req_convertor), \
|
||||
&data_offset ); \
|
||||
ompi_convertor_unpack( &(request)->req_recv.req_base.req_convertor, \
|
||||
iov, \
|
||||
&iov_count, \
|
||||
&max_data ); \
|
||||
bytes_delivered = max_data; \
|
||||
OPAL_THREAD_UNLOCK(&request->lock); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_match(
|
||||
mca_pml_ob1_recv_request_t* req,
|
||||
struct mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_frag(
|
||||
mca_pml_ob1_recv_request_t* req,
|
||||
struct mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_rndv(
|
||||
mca_pml_ob1_recv_request_t* req,
|
||||
struct mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_progress_rget(
|
||||
mca_pml_ob1_recv_request_t* req,
|
||||
struct mca_btl_base_module_t* btl,
|
||||
mca_btl_base_segment_t* segments,
|
||||
@ -368,7 +446,7 @@ static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc,
|
||||
}
|
||||
|
||||
MCA_PML_OB1_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req,
|
||||
hdr_send_offset);
|
||||
hdr_send_offset);
|
||||
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
@ -9,6 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -394,7 +395,7 @@ int mca_pml_ob1_send_request_start_buffered(
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
/* send */
|
||||
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, descriptor, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
mca_bml_base_free(bml_btl, descriptor );
|
||||
}
|
||||
@ -481,7 +482,7 @@ int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq,
|
||||
|
||||
|
||||
/* send */
|
||||
rc = mca_bml_base_send_status(bml_btl, descriptor, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send_status(bml_btl, descriptor, MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
switch(rc) {
|
||||
case OMPI_SUCCESS:
|
||||
/* packet is on wire; signal request completion */
|
||||
@ -546,7 +547,7 @@ int mca_pml_ob1_send_request_start_prepare( mca_pml_ob1_send_request_t* sendreq,
|
||||
descriptor->des_cbdata = sendreq;
|
||||
|
||||
/* send */
|
||||
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, descriptor, MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
mca_bml_base_free(bml_btl, descriptor );
|
||||
}
|
||||
@ -705,7 +706,7 @@ int mca_pml_ob1_send_request_start_rdma(
|
||||
des->des_cbdata = sendreq;
|
||||
|
||||
/* send */
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
mca_bml_base_free(bml_btl, des);
|
||||
}
|
||||
@ -785,7 +786,7 @@ int mca_pml_ob1_send_request_start_rndv( mca_pml_ob1_send_request_t* sendreq,
|
||||
sendreq->req_state = 2;
|
||||
|
||||
/* send */
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_RNDV);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
mca_bml_base_free(bml_btl, des );
|
||||
}
|
||||
@ -996,7 +997,7 @@ cannot_pack:
|
||||
#endif /* OMPI_WANT_PERUSE */
|
||||
|
||||
/* initiate send - note that this may complete before the call returns */
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
|
||||
rc = mca_bml_base_send(bml_btl, des, MCA_PML_OB1_HDR_TYPE_FRAG);
|
||||
|
||||
if( OPAL_LIKELY(rc == OMPI_SUCCESS) ) {
|
||||
/* update state */
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user