Rewrite OB1 matching logic. Get rid of macros, make the code shorter.
This commit was SVN r16993.
Этот коммит содержится в:
родитель
2b48f42637
Коммит
35bf8c7c46
@ -155,244 +155,159 @@ void mca_pml_ob1_recv_frag_callback( mca_btl_base_module_t* btl,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try and match the incoming message fragment to a generic
|
||||
* list of receives
|
||||
*
|
||||
* @param hdr Matching data from received fragment (IN)
|
||||
*
|
||||
* @param generic_receives Pointer to the receive list used for
|
||||
* matching purposes. (IN)
|
||||
*
|
||||
* @return Matched receive
|
||||
*
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
#define MCA_PML_OB1_MATCH_GENERIC_RECEIVES(hdr,generic_receives,proc,return_match) \
|
||||
do { \
|
||||
/* local variables */ \
|
||||
mca_pml_ob1_recv_request_t *generic_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_first(generic_receives); \
|
||||
mca_pml_ob1_recv_request_t *last_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_end(generic_receives); \
|
||||
register int recv_tag, frag_tag = hdr->hdr_tag; \
|
||||
\
|
||||
/* Loop over the receives. If the received tag is less than zero */ \
|
||||
/* enter in a special mode, where we match only our internal tags */ \
|
||||
/* (such as those used by the collectives.*/ \
|
||||
if( 0 <= frag_tag ) { \
|
||||
for( ; generic_recv != last_recv; \
|
||||
generic_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
((opal_list_item_t *)generic_recv)->opal_list_next) { \
|
||||
/* Check for a match */ \
|
||||
recv_tag = generic_recv->req_recv.req_base.req_tag; \
|
||||
if ( (frag_tag == recv_tag) || (recv_tag == OMPI_ANY_TAG) ) { \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
} else { \
|
||||
for( ; generic_recv != last_recv; \
|
||||
generic_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
((opal_list_item_t *)generic_recv)->opal_list_next) { \
|
||||
/* Check for a match */ \
|
||||
recv_tag = generic_recv->req_recv.req_base.req_tag; \
|
||||
if( OPAL_UNLIKELY(frag_tag == recv_tag) ) { \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
if( generic_recv != (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_end(generic_receives) ) { \
|
||||
\
|
||||
/* Match made */ \
|
||||
return_match = generic_recv; \
|
||||
\
|
||||
/* remove descriptor from posted specific ireceive list */ \
|
||||
opal_list_remove_item(generic_receives, \
|
||||
(opal_list_item_t *)generic_recv); \
|
||||
PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q, \
|
||||
&(generic_recv->req_recv.req_base), \
|
||||
PERUSE_RECV); \
|
||||
\
|
||||
} \
|
||||
} while(0)
|
||||
#define PML_MAX_SEQ ~((mca_pml_sequence_t)0);
|
||||
|
||||
/**
|
||||
* Try and match the incoming message fragment to the list of
|
||||
* "wild" receives
|
||||
*
|
||||
* @param hdr Matching data from recived fragment (IN)
|
||||
*
|
||||
* @param pml_comm Pointer to the communicator structure used for
|
||||
* matching purposes. (IN)
|
||||
*
|
||||
* @return Matched receive
|
||||
*
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
static inline mca_pml_ob1_recv_request_t* get_posted_recv(opal_list_t *queue)
|
||||
{
|
||||
if(opal_list_get_size(queue) == 0)
|
||||
return NULL;
|
||||
|
||||
#define MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
|
||||
do { \
|
||||
/* local parameters */ \
|
||||
opal_list_t* wild_receives = &comm->wild_receives; \
|
||||
MCA_PML_OB1_MATCH_GENERIC_RECEIVES(hdr,wild_receives,proc,return_match); \
|
||||
} while(0)
|
||||
return (mca_pml_ob1_recv_request_t*)opal_list_get_first(queue);
|
||||
}
|
||||
|
||||
static inline mca_pml_ob1_recv_request_t* get_next_posted_recv(
|
||||
opal_list_t *queue,
|
||||
mca_pml_ob1_recv_request_t* req)
|
||||
{
|
||||
opal_list_item_t *i = opal_list_get_next((opal_list_item_t*)req);
|
||||
|
||||
if(opal_list_get_end(queue) == i)
|
||||
return NULL;
|
||||
|
||||
return (mca_pml_ob1_recv_request_t*)i;
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_request_t *match_incomming(
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_pml_ob1_comm_t *comm,
|
||||
mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
mca_pml_ob1_recv_request_t *specific_recv, *wild_recv;
|
||||
mca_pml_sequence_t wild_recv_seq, specific_recv_seq;
|
||||
int tag = hdr->hdr_tag;
|
||||
|
||||
specific_recv = get_posted_recv(&proc->specific_receives);
|
||||
wild_recv = get_posted_recv(&comm->wild_receives);
|
||||
|
||||
wild_recv_seq = wild_recv ?
|
||||
wild_recv->req_recv.req_base.req_sequence : PML_MAX_SEQ;
|
||||
specific_recv_seq = specific_recv ?
|
||||
specific_recv->req_recv.req_base.req_sequence : PML_MAX_SEQ;
|
||||
|
||||
/* they are equal only if both are PML_MAX_SEQ */
|
||||
while(wild_recv_seq != specific_recv_seq) {
|
||||
mca_pml_ob1_recv_request_t **match;
|
||||
opal_list_t *queue;
|
||||
int req_tag;
|
||||
mca_pml_sequence_t *seq;
|
||||
|
||||
if (OPAL_UNLIKELY(wild_recv_seq < specific_recv_seq)) {
|
||||
match = &wild_recv;
|
||||
queue = &comm->wild_receives;
|
||||
seq = &wild_recv_seq;
|
||||
} else {
|
||||
match = &specific_recv;
|
||||
queue = &proc->specific_receives;
|
||||
seq = &specific_recv_seq;
|
||||
}
|
||||
|
||||
req_tag = (*match)->req_recv.req_base.req_tag;
|
||||
if(req_tag == tag || (req_tag == OMPI_ANY_TAG && tag >= 0)) {
|
||||
opal_list_remove_item(queue, (opal_list_item_t*)(*match));
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
|
||||
&((*match)->req_recv.req_base), PERUSE_RECV);
|
||||
return *match;
|
||||
}
|
||||
|
||||
*match = get_next_posted_recv(queue, *match);
|
||||
*seq = (*match) ? (*match)->req_recv.req_base.req_sequence : PML_MAX_SEQ;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, mca_pml_ob1_recv_frag_t* frag)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if(NULL == frag) {
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
|
||||
}
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_request_t *match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, ompi_communicator_t *comm_ptr,
|
||||
mca_pml_ob1_comm_proc_t *proc,
|
||||
mca_pml_ob1_recv_frag_t* frag)
|
||||
{
|
||||
mca_pml_ob1_recv_request_t *match;
|
||||
mca_pml_ob1_comm_t *comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
do {
|
||||
match = match_incomming(hdr, comm, proc);
|
||||
|
||||
/* if match found, process data */
|
||||
if(OPAL_UNLIKELY(NULL == match)) {
|
||||
/* if no match found, place on unexpected queue */
|
||||
append_frag_to_list(&proc->unexpected_frags, btl, hdr, segments,
|
||||
num_segments, frag);
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
match->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
|
||||
if(MCA_PML_REQUEST_PROBE == match->req_recv.req_base.req_type) {
|
||||
/* complete the probe */
|
||||
mca_pml_ob1_recv_request_matched_probe(match, btl, segments,
|
||||
num_segments);
|
||||
/* attempt to match actual request */
|
||||
continue;
|
||||
}
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_MSG_MATCH_POSTED_REQ,
|
||||
&(match->req_recv.req_base), PERUSE_RECV);
|
||||
break;
|
||||
} while(true);
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_frag_t *check_cantmatch_for_match(
|
||||
mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
/* local parameters */
|
||||
mca_pml_ob1_recv_frag_t *frag;
|
||||
|
||||
/* search the list for a fragment from the send with sequence
|
||||
* number next_msg_seq_expected
|
||||
*/
|
||||
for(frag = (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_first(&proc->frags_cant_match);
|
||||
frag != (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_end(&proc->frags_cant_match);
|
||||
frag = (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_next(frag))
|
||||
{
|
||||
mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
|
||||
/*
|
||||
* If the message has the next expected seq from that proc...
|
||||
*/
|
||||
if(hdr->hdr_seq != proc->expected_sequence)
|
||||
continue;
|
||||
|
||||
|
||||
/**
|
||||
* Try and match the incoming message fragment to the list of
|
||||
* "specific" receives
|
||||
*
|
||||
* @param hdr Matching data from recived fragment (IN)
|
||||
*
|
||||
* @param comm Pointer to the communicator structure used for
|
||||
* matching purposes. (IN)
|
||||
*
|
||||
* @return Matched receive
|
||||
*
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
#define MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
|
||||
do { \
|
||||
/* local variables */ \
|
||||
opal_list_t* specific_receives = &proc->specific_receives; \
|
||||
MCA_PML_OB1_MATCH_GENERIC_RECEIVES(hdr,specific_receives,proc,return_match); \
|
||||
} while(0)
|
||||
opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t*)frag);
|
||||
return frag;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try and match the incoming message fragment to the list of
|
||||
* "wild" receives and "specific" receives. Used when both types
|
||||
* of receives have been posted, i.e. when we need to coordinate
|
||||
* between multiple lists to make sure ordered delivery occurs.
|
||||
*
|
||||
* @param hdr Matching data from recived fragment (IN)
|
||||
*
|
||||
* @param comm Pointer to the communicator structure used for
|
||||
* matching purposes. (IN)
|
||||
*
|
||||
* @return Matched receive
|
||||
*
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
|
||||
#define MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( hdr,comm,proc,return_match) \
|
||||
do { \
|
||||
/* local variables */ \
|
||||
mca_pml_ob1_recv_request_t *specific_recv, *wild_recv; \
|
||||
mca_pml_sequence_t wild_recv_seq, specific_recv_seq; \
|
||||
int frag_tag, wild_recv_tag, specific_recv_tag; \
|
||||
\
|
||||
/* initialization */ \
|
||||
frag_tag=hdr->hdr_tag; \
|
||||
\
|
||||
/* \
|
||||
* We know that when this is called, both specific and wild irecvs \
|
||||
* have been posted. \
|
||||
*/ \
|
||||
specific_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_first(&(proc)->specific_receives); \
|
||||
wild_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_first(&comm->wild_receives); \
|
||||
\
|
||||
specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
|
||||
wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
|
||||
\
|
||||
while (true) { \
|
||||
if (wild_recv_seq < specific_recv_seq) { \
|
||||
/* wild recv is earlier than the specific one. */ \
|
||||
/* try and match */ \
|
||||
wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \
|
||||
if ( (frag_tag == wild_recv_tag) || \
|
||||
( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
|
||||
/* Match made */ \
|
||||
return_match=wild_recv; \
|
||||
\
|
||||
/* remove this recv from the wild receive queue */ \
|
||||
opal_list_remove_item(&comm->wild_receives, \
|
||||
(opal_list_item_t *)wild_recv); \
|
||||
\
|
||||
PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q, \
|
||||
&(wild_recv->req_recv.req_base), \
|
||||
PERUSE_RECV); \
|
||||
\
|
||||
break; \
|
||||
} \
|
||||
\
|
||||
/* No match, go to the next */ \
|
||||
wild_recv=(mca_pml_ob1_recv_request_t *) \
|
||||
((opal_list_item_t *)wild_recv)->opal_list_next; \
|
||||
\
|
||||
/* \
|
||||
* If that was the last wild one, just look at the \
|
||||
* rest of the specific ones. \
|
||||
*/ \
|
||||
if (wild_recv == (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_end(&comm->wild_receives) ) \
|
||||
{ \
|
||||
MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
|
||||
break; \
|
||||
} \
|
||||
\
|
||||
/* \
|
||||
* Get the sequence number for this recv, and go \
|
||||
* back to the top of the loop. \
|
||||
*/ \
|
||||
wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
|
||||
\
|
||||
} else { \
|
||||
/* specific recv is earlier than the wild one. */ \
|
||||
specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \
|
||||
if ( (frag_tag == specific_recv_tag) || \
|
||||
( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \
|
||||
{ \
|
||||
/* Match made */ \
|
||||
return_match = specific_recv; \
|
||||
/* remove descriptor from specific receive list */ \
|
||||
opal_list_remove_item(&(proc)->specific_receives, \
|
||||
(opal_list_item_t *)specific_recv); \
|
||||
\
|
||||
PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q, \
|
||||
&(specific_recv->req_recv.req_base), \
|
||||
PERUSE_RECV); \
|
||||
\
|
||||
break; \
|
||||
} \
|
||||
\
|
||||
/* No match, go on to the next specific irecv. */ \
|
||||
specific_recv = (mca_pml_ob1_recv_request_t *) \
|
||||
((opal_list_item_t *)specific_recv)->opal_list_next; \
|
||||
\
|
||||
/* \
|
||||
* If that was the last specific irecv, process the \
|
||||
* rest of the wild ones. \
|
||||
*/ \
|
||||
if (specific_recv == (mca_pml_ob1_recv_request_t *) \
|
||||
opal_list_get_end(&(proc)->specific_receives)) \
|
||||
{ \
|
||||
MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
|
||||
break; \
|
||||
} \
|
||||
/* \
|
||||
* Get the sequence number for this recv, and go \
|
||||
* back to the top of the loop. \
|
||||
*/ \
|
||||
specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
|
||||
} \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
|
||||
/*
|
||||
* Specialized matching routines for internal use only.
|
||||
*/
|
||||
|
||||
static bool mca_pml_ob1_check_cantmatch_for_match( opal_list_t *additional_matches,
|
||||
mca_pml_ob1_comm_t* comm,
|
||||
mca_pml_ob1_comm_proc_t *proc );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* RCS/CTS receive side matching
|
||||
@ -435,13 +350,11 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
mca_pml_ob1_comm_t *comm;
|
||||
mca_pml_ob1_comm_proc_t *proc;
|
||||
bool additional_match=false;
|
||||
opal_list_t additional_matches;
|
||||
int rc;
|
||||
mca_pml_ob1_recv_frag_t* frag = NULL;
|
||||
|
||||
/* communicator pointer */
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
if( OPAL_UNLIKELY(NULL == comm_ptr) ) {
|
||||
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
|
||||
/* This is a special case. A message for a not yet exiting communicator can
|
||||
* happens, but right now we segfault. Instead, and until we find a better
|
||||
* solution, just drop the message. However, in the near future we should
|
||||
@ -456,7 +369,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
|
||||
/* source sequence number */
|
||||
frag_msg_seq = hdr->hdr_seq;
|
||||
proc = comm->procs + hdr->hdr_src;
|
||||
proc = &comm->procs[hdr->hdr_src];
|
||||
|
||||
/**
|
||||
* We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
|
||||
@ -464,7 +377,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
* This will allow the tools to figure out if the messages are not received in the
|
||||
* correct order (if multiple network interfaces).
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_MSG_ARRIVED, comm_ptr,
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* get next expected message sequence number - if threaded
|
||||
@ -478,288 +391,71 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
|
||||
if( OPAL_LIKELY(frag_msg_seq == next_msg_seq_expected) ) {
|
||||
if(OPAL_UNLIKELY(frag_msg_seq != next_msg_seq_expected))
|
||||
goto wrong_seq;
|
||||
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
*/
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
(proc->expected_sequence)++;
|
||||
out_of_order_match:
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
|
||||
/**
|
||||
* We generate the SEARCH_POSTED_QUEUE only when the message is received
|
||||
* in the correct sequence. Otherwise, we delay the event generation until
|
||||
* we reach the correct sequence number.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
/**
|
||||
* We generate the SEARCH_POSTED_QUEUE only when the message is received
|
||||
* in the correct sequence. Otherwise, we delay the event generation until
|
||||
* we reach the correct sequence number.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
rematch:
|
||||
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, frag);
|
||||
|
||||
/*
|
||||
* figure out what sort of matching logic to use, if need to
|
||||
* look only at "specific" receives, or "wild" receives,
|
||||
* or if we need to traverse both sets at the same time.
|
||||
*/
|
||||
if (opal_list_get_size(&proc->specific_receives) == 0 ){
|
||||
/*
|
||||
* There are only wild irecvs, so specialize the algorithm.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
|
||||
} else if (opal_list_get_size(&comm->wild_receives) == 0 ) {
|
||||
/*
|
||||
* There are only specific irecvs, so specialize the algorithm.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
} else {
|
||||
/*
|
||||
* There are some of each.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
}
|
||||
/**
|
||||
* The match is over. We generate the SEARCH_POSTED_Q_END here, before going
|
||||
* into the mca_pml_ob1_check_cantmatch_for_match so we can make a difference
|
||||
* for the searching time for all messages.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* if match found, process data */
|
||||
if( OPAL_LIKELY(match) ) {
|
||||
match->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
|
||||
/*
|
||||
* update delivered sequence number information, if needed.
|
||||
*/
|
||||
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
|
||||
|
||||
/* complete the probe */
|
||||
mca_pml_ob1_recv_request_matched_probe(match,btl,segments,num_segments);
|
||||
|
||||
/* attempt to match actual request */
|
||||
match = NULL;
|
||||
goto rematch;
|
||||
} else {
|
||||
if( (match->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE) ) {
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_MSG_MATCH_POSTED_REQ,
|
||||
&(match->req_recv.req_base), PERUSE_RECV);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
/* if no match found, place on unexpected queue */
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
/**
|
||||
* As we return from the match function, we should generate the expected event.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
return rc;
|
||||
}
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl);
|
||||
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag );
|
||||
}
|
||||
|
||||
/**
|
||||
* The match is over. We generate the SEARCH_POSTED_Q_END here, before going
|
||||
* into the mca_pml_ob1_check_cantmatch_for_match so we can make a difference
|
||||
* for the searching time for all messages.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_c_frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if( OPAL_UNLIKELY(0 < opal_list_get_size(&proc->frags_cant_match)) ) {
|
||||
additional_match = mca_pml_ob1_check_cantmatch_for_match(&additional_matches,comm,proc);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
/*
|
||||
* This message comes after the next expected, so it
|
||||
* is ahead of sequence. Save it for later.
|
||||
*/
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
if( OPAL_UNLIKELY(OMPI_SUCCESS != rc) ) {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
return rc;
|
||||
}
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl);
|
||||
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
|
||||
|
||||
}
|
||||
/* release matching lock before processing fragment */
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
if( OPAL_LIKELY(match != NULL) ) {
|
||||
mca_pml_ob1_recv_request_progress(match,btl,segments,num_segments);
|
||||
} else {
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
}
|
||||
if( OPAL_UNLIKELY(additional_match) ) {
|
||||
opal_list_item_t* item;
|
||||
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
|
||||
mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
|
||||
mca_pml_ob1_recv_request_progress( frag->request, frag->btl, frag->segments,
|
||||
frag->num_segments );
|
||||
if(OPAL_LIKELY(match)) {
|
||||
mca_pml_ob1_recv_request_progress(match, btl, segments, num_segments);
|
||||
if(OPAL_UNLIKELY(frag))
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_c_frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
if((frag = check_cantmatch_for_match(proc))) {
|
||||
hdr = &frag->hdr.hdr_match;
|
||||
segments = frag->segments;
|
||||
num_segments = frag->num_segments;
|
||||
btl = frag->btl;
|
||||
goto out_of_order_match;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
wrong_seq:
|
||||
/*
|
||||
* This message comes after the next expected, so it
|
||||
* is ahead of sequence. Save it for later.
|
||||
*/
|
||||
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
|
||||
num_segments, NULL);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Scan the list of frags that came in ahead of time to see if any
|
||||
* can be processed at this time. If they can, try and match the
|
||||
* frags.
|
||||
*
|
||||
* @param additional_matches List to hold new matches with fragments
|
||||
* from the c_frags_cant_match list. (IN/OUT)
|
||||
*
|
||||
* @param pml_comm Pointer to the communicator structure used for
|
||||
* matching purposes. (IN)
|
||||
*
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
|
||||
static bool mca_pml_ob1_check_cantmatch_for_match( opal_list_t *additional_matches,
|
||||
mca_pml_ob1_comm_t* comm,
|
||||
mca_pml_ob1_comm_proc_t *proc )
|
||||
{
|
||||
/* local parameters */
|
||||
int match_found;
|
||||
uint16_t next_msg_seq_expected, frag_seq;
|
||||
mca_pml_ob1_recv_frag_t *frag;
|
||||
bool match_made = false;
|
||||
|
||||
/*
|
||||
* Loop over all the out of sequence messages. No ordering is assumed
|
||||
* in the c_frags_cant_match list.
|
||||
*/
|
||||
|
||||
match_found = 1;
|
||||
while ((0 < opal_list_get_size(&proc->frags_cant_match)) && match_found) {
|
||||
|
||||
/* initialize match flag for this search */
|
||||
match_found = 0;
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
next_msg_seq_expected = proc->expected_sequence;
|
||||
|
||||
/* search the list for a fragment from the send with sequence
|
||||
* number next_msg_seq_expected
|
||||
*/
|
||||
for(frag = (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_first(&proc->frags_cant_match);
|
||||
frag != (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_end(&proc->frags_cant_match);
|
||||
frag = (mca_pml_ob1_recv_frag_t *)
|
||||
opal_list_get_next(frag))
|
||||
{
|
||||
/*
|
||||
* If the message has the next expected seq from that proc...
|
||||
*/
|
||||
frag_seq=frag->hdr.hdr_match.hdr_seq;
|
||||
if (frag_seq == next_msg_seq_expected) {
|
||||
mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
(proc->expected_sequence)++;
|
||||
|
||||
/* signal that match was made */
|
||||
match_found = 1;
|
||||
|
||||
/*
|
||||
* remove frag from list
|
||||
*/
|
||||
opal_list_remove_item(&proc->frags_cant_match,
|
||||
(opal_list_item_t *)frag);
|
||||
|
||||
rematch:
|
||||
/*
|
||||
* figure out what sort of matching logic to use, if need to
|
||||
* look only at "specific" receives, or "wild" receives,
|
||||
* or if we need to traverse both sets at the same time.
|
||||
*/
|
||||
proc = comm->procs + hdr->hdr_src;
|
||||
if (opal_list_get_size(&proc->specific_receives) == 0 ) {
|
||||
/*
|
||||
* There are only wild irecvs, so specialize the algorithm.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
} else if (opal_list_get_size(&comm->wild_receives) == 0 ) {
|
||||
/*
|
||||
* There are only specific irecvs, so specialize the algorithm.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
} else {
|
||||
/*
|
||||
* There are some of each.
|
||||
*/
|
||||
MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
|
||||
|
||||
}
|
||||
|
||||
/* if match found, process data */
|
||||
if( OPAL_LIKELY(match) ) {
|
||||
match->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
|
||||
/*
|
||||
* If this was a probe need to queue fragment on unexpected list
|
||||
*/
|
||||
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
|
||||
|
||||
/* complete the probe */
|
||||
mca_pml_ob1_recv_request_matched_probe(match,frag->btl,frag->segments,frag->num_segments);
|
||||
|
||||
/* retry the match */
|
||||
match = NULL;
|
||||
goto rematch;
|
||||
|
||||
} else {
|
||||
|
||||
/* associate the receive descriptor with the fragment
|
||||
* descriptor */
|
||||
frag->request=match;
|
||||
|
||||
/* add this fragment descriptor to the list of
|
||||
* descriptors to be processed later
|
||||
*/
|
||||
if(match_made == false) {
|
||||
match_made = true;
|
||||
OBJ_CONSTRUCT(additional_matches, opal_list_t);
|
||||
}
|
||||
opal_list_append(additional_matches, (opal_list_item_t *)frag);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
/* if no match found, place on unexpected queue */
|
||||
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag);
|
||||
}
|
||||
|
||||
/* c_frags_cant_match is not an ordered list, so exit loop
|
||||
* and re-start search for next sequence number */
|
||||
break;
|
||||
|
||||
} /* end if (frag_seq == next_msg_seq_expected) */
|
||||
|
||||
} /* end for (frag) loop */
|
||||
|
||||
} /* end while loop */
|
||||
|
||||
return match_made;
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,6 @@ typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t;
|
||||
struct mca_pml_ob1_recv_frag_t {
|
||||
ompi_free_list_item_t super;
|
||||
mca_pml_ob1_hdr_t hdr;
|
||||
struct mca_pml_ob1_recv_request_t* request;
|
||||
size_t num_segments;
|
||||
mca_btl_base_module_t* btl;
|
||||
mca_btl_base_segment_t segments[MCA_BTL_DES_MAX_SEGMENTS];
|
||||
|
@ -31,9 +31,6 @@
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "ompi/datatype/dt_arch.h"
|
||||
|
||||
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc);
|
||||
|
||||
void mca_pml_ob1_recv_request_process_pending(void)
|
||||
{
|
||||
mca_pml_ob1_recv_request_t* recvreq;
|
||||
@ -467,8 +464,6 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
recvreq->req_match_received = true;
|
||||
opal_atomic_wmb();
|
||||
break;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
@ -493,8 +488,6 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
|
||||
bytes_received,
|
||||
bytes_delivered );
|
||||
}
|
||||
recvreq->req_match_received = true;
|
||||
opal_atomic_wmb();
|
||||
break;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_RGET:
|
||||
@ -502,7 +495,6 @@ void mca_pml_ob1_recv_request_progress( mca_pml_ob1_recv_request_t* recvreq,
|
||||
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
|
||||
mca_pml_ob1_recv_request_rget(recvreq, btl, &hdr->hdr_rget);
|
||||
recvreq->req_match_received = true;
|
||||
return;
|
||||
|
||||
case MCA_PML_OB1_HDR_TYPE_FRAG:
|
||||
@ -720,71 +712,72 @@ int mca_pml_ob1_recv_request_schedule_once(
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* This routine is used to match a posted receive when the source process
|
||||
* is specified.
|
||||
*/
|
||||
#define IS_PROB_REQ(R) \
|
||||
((MCA_PML_REQUEST_IPROBE == (R)->req_recv.req_base.req_type) || \
|
||||
(MCA_PML_REQUEST_PROBE == (R)->req_recv.req_base.req_type))
|
||||
|
||||
void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request)
|
||||
inline void append_recv_req_to_queue(opal_list_t*, mca_pml_ob1_recv_request_t*);
|
||||
mca_pml_ob1_recv_frag_t *recv_req_match_specific_proc(
|
||||
const mca_pml_ob1_recv_request_t*, mca_pml_ob1_comm_proc_t*);
|
||||
|
||||
inline void append_recv_req_to_queue(opal_list_t *queue,
|
||||
mca_pml_ob1_recv_request_t *req)
|
||||
{
|
||||
mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm;
|
||||
mca_pml_ob1_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
|
||||
/* check for a specific match */
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
if(OPAL_UNLIKELY(req->req_recv.req_base.req_type == MCA_PML_REQUEST_IPROBE))
|
||||
return;
|
||||
|
||||
opal_list_append(queue, (opal_list_item_t*)req);
|
||||
|
||||
/**
|
||||
* The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
|
||||
* the cost of the request lock.
|
||||
* We don't want to generate this kind of event for MPI_Probe. Hopefully,
|
||||
* the compiler will optimize out the empty if loop in the case where PERUSE
|
||||
* support is not required by the user.
|
||||
*/
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_BEGIN,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
/* assign sequence number */
|
||||
request->req_recv.req_base.req_sequence = comm->recv_sequence++;
|
||||
|
||||
if (opal_list_get_size(&proc->unexpected_frags) > 0 &&
|
||||
(frag = mca_pml_ob1_recv_request_match_specific_proc(request, proc)) != NULL) {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||
mca_pml_ob1_recv_request_progress(request,frag->btl,frag->segments,frag->num_segments);
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
} else {
|
||||
mca_pml_ob1_recv_request_matched_probe(request,frag->btl,frag->segments,frag->num_segments);
|
||||
}
|
||||
return; /* match found */
|
||||
if(req->req_recv.req_base.req_type != MCA_PML_REQUEST_PROBE) {
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_INSERT_IN_POSTED_Q,
|
||||
&(req->req_recv.req_base), PERUSE_RECV);
|
||||
}
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
/* We didn't find any matches. Record this irecv so we can match
|
||||
* it when the message comes in.
|
||||
*/
|
||||
if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE) {
|
||||
opal_list_append(&proc->specific_receives, (opal_list_item_t*)request);
|
||||
if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_PROBE) {
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_INSERT_IN_POSTED_Q,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* this routine tries to match a posted receive. If a match is found,
|
||||
* it places the request in the appropriate matched receive list. This
|
||||
* function has to be called with the communicator matching lock held.
|
||||
*/
|
||||
mca_pml_ob1_recv_frag_t *recv_req_match_specific_proc(
|
||||
const mca_pml_ob1_recv_request_t *req,
|
||||
mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
opal_list_t* unexpected_frags = &proc->unexpected_frags;
|
||||
opal_list_item_t *i;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
int tag = req->req_recv.req_base.req_tag;
|
||||
|
||||
if(opal_list_get_size(unexpected_frags) == 0)
|
||||
return NULL;
|
||||
|
||||
for (i = opal_list_get_first(unexpected_frags);
|
||||
i != opal_list_get_end(unexpected_frags);
|
||||
i = opal_list_get_next(i)) {
|
||||
frag = (mca_pml_ob1_recv_frag_t*)i;
|
||||
|
||||
if(frag->hdr.hdr_match.hdr_tag == tag ||
|
||||
(OMPI_ANY_TAG == tag && frag->hdr.hdr_match.hdr_tag >= 0))
|
||||
return frag;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* this routine is used to try and match a wild posted receive - where
|
||||
* wild is determined by the value assigned to the source process
|
||||
*/
|
||||
|
||||
void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
|
||||
static mca_pml_ob1_recv_frag_t *recv_req_match_wild(
|
||||
mca_pml_ob1_recv_request_t* req, mca_pml_ob1_comm_proc_t **p)
|
||||
{
|
||||
mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm;
|
||||
mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
|
||||
mca_pml_ob1_comm_proc_t* proc = comm->procs;
|
||||
size_t proc_count = comm->num_procs;
|
||||
size_t i;
|
||||
@ -795,123 +788,101 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
|
||||
* process, then an inner loop over the messages from the
|
||||
* process.
|
||||
*/
|
||||
for (i = 0; i < proc_count; i++) {
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
|
||||
/* loop over messages from the current proc */
|
||||
if((frag = recv_req_match_specific_proc(req, &proc[i]))) {
|
||||
*p = &proc[i];
|
||||
return frag; /* match found */
|
||||
}
|
||||
}
|
||||
|
||||
*p = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
|
||||
{
|
||||
mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
|
||||
mca_pml_ob1_comm_proc_t* proc;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
opal_list_t *queue;
|
||||
|
||||
/* init/re-init the request */
|
||||
req->req_lock = 0;
|
||||
req->req_pipeline_depth = 0;
|
||||
req->req_bytes_received = 0;
|
||||
req->req_bytes_delivered = 0;
|
||||
/* What about req_rdma_cnt ? */
|
||||
req->req_rdma_idx = 0;
|
||||
req->req_pending = false;
|
||||
req->req_ack_sent = false;
|
||||
req->req_match_received = false;
|
||||
|
||||
MCA_PML_BASE_RECV_START(&req->req_recv.req_base);
|
||||
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
/**
|
||||
* The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
|
||||
* the cost of the request lock.
|
||||
*/
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_BEGIN,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_BEGIN,
|
||||
&(req->req_recv.req_base), PERUSE_RECV);
|
||||
|
||||
/* assign sequence number */
|
||||
request->req_recv.req_base.req_sequence = comm->recv_sequence++;
|
||||
req->req_recv.req_base.req_sequence = comm->recv_sequence++;
|
||||
|
||||
for (i = 0; i < proc_count; i++) {
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
|
||||
/* continue if no frags to match */
|
||||
if (opal_list_get_size(&proc->unexpected_frags) == 0) {
|
||||
proc++;
|
||||
continue;
|
||||
/* attempt to match posted recv */
|
||||
if(req->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) {
|
||||
frag = recv_req_match_wild(req, &proc);
|
||||
queue = &comm->wild_receives;
|
||||
if(proc)
|
||||
req->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
} else {
|
||||
proc = &comm->procs[req->req_recv.req_base.req_peer];
|
||||
req->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
frag = recv_req_match_specific_proc(req, proc);
|
||||
queue = &proc->specific_receives;
|
||||
/* wild cardrecv will be prepared on match */
|
||||
if((0 != req->req_recv.req_base.req_datatype->size) &&
|
||||
(0 != req->req_recv.req_base.req_count)) {
|
||||
prepare_recv_req_converter(req);
|
||||
}
|
||||
}
|
||||
|
||||
/* loop over messages from the current proc */
|
||||
if ((frag = mca_pml_ob1_recv_request_match_specific_proc(request, proc)) != NULL) {
|
||||
if(OPAL_UNLIKELY(NULL == frag)) {
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(req->req_recv.req_base), PERUSE_RECV);
|
||||
/* We didn't find any matches. Record this irecv so we can match
|
||||
it when the message comes in. */
|
||||
append_recv_req_to_queue(queue, req);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
} else {
|
||||
if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
|
||||
&(req->req_recv.req_base), PERUSE_RECV);
|
||||
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_REMOVE_FROM_UNEX_Q,
|
||||
req->req_recv.req_base.req_comm, hdr->hdr_src, hdr->hdr_tag,
|
||||
PERUSE_RECV);
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(req->req_recv.req_base), PERUSE_RECV);
|
||||
|
||||
opal_list_remove_item(&proc->unexpected_frags,
|
||||
(opal_list_item_t*)frag);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||
mca_pml_ob1_recv_request_progress(request,frag->btl,frag->segments,frag->num_segments);
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
} else {
|
||||
mca_pml_ob1_recv_request_matched_probe(request,frag->btl,frag->segments,frag->num_segments);
|
||||
}
|
||||
return; /* match found */
|
||||
}
|
||||
proc++;
|
||||
}
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_SEARCH_UNEX_Q_END,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
/* We didn't find any matches. Record this irecv so we can match to
|
||||
* it when the message comes in.
|
||||
*/
|
||||
|
||||
if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE) {
|
||||
opal_list_append(&comm->wild_receives, (opal_list_item_t*)request);
|
||||
/**
|
||||
* We don't want to generate this kind of event for MPI_Probe. Hopefully,
|
||||
* the compiler will optimize out the empty if loop in the case where PERUSE
|
||||
* support is not required by the user.
|
||||
*/
|
||||
if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_PROBE) {
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_INSERT_IN_POSTED_Q,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
mca_pml_ob1_recv_request_progress(req, frag->btl, frag->segments,
|
||||
frag->num_segments);
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
} else {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
|
||||
frag->segments, frag->num_segments);
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* this routine tries to match a posted receive. If a match is found,
|
||||
* it places the request in the appropriate matched receive list. This
|
||||
* function has to be called with the communicator matching lock held.
|
||||
*/
|
||||
|
||||
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
mca_pml_ob1_recv_request_t* request,
|
||||
mca_pml_ob1_comm_proc_t* proc)
|
||||
{
|
||||
opal_list_t* unexpected_frags = &proc->unexpected_frags;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_match_hdr_t* hdr;
|
||||
int tag = request->req_recv.req_base.req_tag;
|
||||
|
||||
if( OMPI_ANY_TAG == tag ) {
|
||||
for (frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_recv_frag_t*)opal_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_next(frag)) {
|
||||
hdr = &(frag->hdr.hdr_match);
|
||||
|
||||
/* check first frag - we assume that process matching has been done already */
|
||||
if( hdr->hdr_tag >= 0 ) {
|
||||
goto find_fragment;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_recv_frag_t*)opal_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_next(frag)) {
|
||||
hdr = &(frag->hdr.hdr_match);
|
||||
|
||||
/* check first frag - we assume that process matching has been done already */
|
||||
if ( tag == hdr->hdr_tag ) {
|
||||
/* we assume that the tag is correct from MPI point of view (ie. >= 0 ) */
|
||||
goto find_fragment;
|
||||
}
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
find_fragment:
|
||||
request->req_recv.req_base.req_proc = proc->ompi_proc;
|
||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||
|
||||
PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_MATCH_UNEX,
|
||||
&(request->req_recv.req_base), PERUSE_RECV );
|
||||
|
||||
PERUSE_TRACE_MSG_EVENT( PERUSE_COMM_MSG_REMOVE_FROM_UNEX_Q,
|
||||
request->req_recv.req_base.req_comm,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV );
|
||||
opal_list_remove_item(unexpected_frags, (opal_list_item_t*)frag);
|
||||
frag->request = request;
|
||||
}
|
||||
|
||||
return frag;
|
||||
}
|
||||
|
||||
|
@ -194,96 +194,42 @@ recv_request_pml_complete_check(mca_pml_ob1_recv_request_t *recvreq)
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to match the request against the unexpected fragment list
|
||||
* for all source ranks w/in the communicator.
|
||||
*
|
||||
* @param request (IN) Request to match.
|
||||
*/
|
||||
void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request);
|
||||
extern void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req);
|
||||
#define MCA_PML_OB1_RECV_REQUEST_START(r) mca_pml_ob1_recv_req_start(r)
|
||||
|
||||
/**
|
||||
* Attempt to match the request against the unexpected fragment list
|
||||
* for a specific source rank.
|
||||
*
|
||||
* @param request (IN) Request to match.
|
||||
*/
|
||||
void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request);
|
||||
static inline void prepare_recv_req_converter(mca_pml_ob1_recv_request_t *req)
|
||||
{
|
||||
ompi_convertor_copy_and_prepare_for_recv(
|
||||
req->req_recv.req_base.req_proc->proc_convertor,
|
||||
req->req_recv.req_base.req_datatype,
|
||||
req->req_recv.req_base.req_count,
|
||||
req->req_recv.req_base.req_addr,
|
||||
0,
|
||||
&req->req_recv.req_base.req_convertor);
|
||||
ompi_convertor_get_unpacked_size(&req->req_recv.req_base.req_convertor,
|
||||
&req->req_bytes_delivered);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize diagnostic code for tracing rdma protocol timing
|
||||
*/
|
||||
#define MCA_PML_OB1_RECV_REQUEST_MATCHED(request, hdr) \
|
||||
recv_req_matched(request, hdr)
|
||||
|
||||
/**
|
||||
* Start an initialized request.
|
||||
*
|
||||
* @param request Receive request.
|
||||
* @return OMPI_SUCESS or error status on failure.
|
||||
*/
|
||||
#define MCA_PML_OB1_RECV_REQUEST_START(request) \
|
||||
do { \
|
||||
/* init/re-init the request */ \
|
||||
(request)->req_lock = 0; \
|
||||
(request)->req_pipeline_depth = 0; \
|
||||
(request)->req_bytes_received = 0; \
|
||||
(request)->req_bytes_delivered = 0; \
|
||||
/* What about req_rdma_cnt ? */ \
|
||||
(request)->req_rdma_idx = 0; \
|
||||
(request)->req_pending = false; \
|
||||
(request)->req_ack_sent = false; \
|
||||
(request)->req_match_received = false; \
|
||||
\
|
||||
MCA_PML_BASE_RECV_START( &(request)->req_recv.req_base ); \
|
||||
\
|
||||
/* attempt to match posted recv */ \
|
||||
if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \
|
||||
mca_pml_ob1_recv_request_match_wild(request); \
|
||||
} else { \
|
||||
(request)->req_recv.req_base.req_proc = \
|
||||
(request)->req_recv.req_base.req_comm->c_pml_comm->procs \
|
||||
[(request)->req_recv.req_base.req_peer].ompi_proc; \
|
||||
if( (0 != (request)->req_recv.req_base.req_datatype->size) && \
|
||||
(0 != (request)->req_recv.req_base.req_count) ) { \
|
||||
ompi_convertor_copy_and_prepare_for_recv( \
|
||||
(request)->req_recv.req_base.req_proc->proc_convertor, \
|
||||
(request)->req_recv.req_base.req_datatype, \
|
||||
(request)->req_recv.req_base.req_count, \
|
||||
(request)->req_recv.req_base.req_addr, \
|
||||
0, \
|
||||
&(request)->req_recv.req_base.req_convertor ); \
|
||||
ompi_convertor_get_unpacked_size( &(request)->req_recv.req_base.req_convertor, \
|
||||
&(request)->req_bytes_delivered ); \
|
||||
} \
|
||||
mca_pml_ob1_recv_request_match_specific(request); \
|
||||
} \
|
||||
} while (0)
|
||||
static inline void recv_req_matched(mca_pml_ob1_recv_request_t *req,
|
||||
mca_pml_ob1_match_hdr_t *hdr)
|
||||
{
|
||||
req->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
|
||||
req->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
|
||||
req->req_match_received = true;
|
||||
opal_atomic_wmb();
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
#define MCA_PML_OB1_RECV_REQUEST_MATCHED( request, hdr ) \
|
||||
do { \
|
||||
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = (hdr)->hdr_src; \
|
||||
(request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \
|
||||
\
|
||||
if((request)->req_recv.req_bytes_packed > 0) { \
|
||||
if( MPI_ANY_SOURCE == (request)->req_recv.req_base.req_peer ) { \
|
||||
ompi_convertor_copy_and_prepare_for_recv( \
|
||||
(request)->req_recv.req_base.req_proc->proc_convertor, \
|
||||
(request)->req_recv.req_base.req_datatype, \
|
||||
(request)->req_recv.req_base.req_count, \
|
||||
(request)->req_recv.req_base.req_addr, \
|
||||
0, \
|
||||
&(request)->req_recv.req_base.req_convertor ); \
|
||||
ompi_convertor_get_unpacked_size( &(request)->req_recv.req_base.req_convertor, \
|
||||
&(request)->req_bytes_delivered ); \
|
||||
} \
|
||||
PERUSE_TRACE_COMM_EVENT (PERUSE_COMM_REQ_XFER_BEGIN, \
|
||||
&((request)->req_recv.req_base), PERUSE_RECV); \
|
||||
} \
|
||||
} while (0)
|
||||
if(req->req_recv.req_bytes_packed > 0) {
|
||||
if(MPI_ANY_SOURCE == req->req_recv.req_base.req_peer) {
|
||||
/* non wildcard prepared during post recv */
|
||||
prepare_recv_req_converter(req);
|
||||
}
|
||||
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_XFER_BEGIN,
|
||||
&req->req_recv.req_base, PERUSE_RECV);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user