Merge pull request #4419 from thananon/pr_newob1
pml/ob1: match callback will now queue wrong sequence frag and return.
Этот коммит содержится в:
Коммит
3f43e2c8df
@ -3,7 +3,7 @@
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2016 The University of Tennessee and The University
|
||||
* Copyright (c) 2004-2017 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
|
||||
@ -66,7 +66,7 @@ OBJ_CLASS_INSTANCE( mca_pml_ob1_recv_frag_t,
|
||||
*/
|
||||
|
||||
/**
|
||||
* Append a unexpected descriptor to a queue. This function will allocate and
|
||||
* Append an unexpected descriptor to a queue. This function will allocate and
|
||||
* initialize the fragment (if necessary) and then will add it to the specified
|
||||
* queue. The allocated fragment is not returned to the caller.
|
||||
*/
|
||||
@ -82,21 +82,92 @@ append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
}
|
||||
|
||||
/**
|
||||
* Append an unexpected descriptor to an ordered queue. This function will allocate and
|
||||
* initialize the fragment (if necessary) and then will add it to the specified
|
||||
* queue respecting the sequence number. The allocated fragment is not returned to the caller.
|
||||
*/
|
||||
static void
|
||||
append_frag_to_ordered_list(opal_list_t* queue, mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
|
||||
size_t num_segments, mca_pml_ob1_recv_frag_t* frag)
|
||||
{
|
||||
mca_pml_ob1_recv_frag_t* tmpfrag;
|
||||
mca_pml_ob1_match_hdr_t* tmphdr;
|
||||
|
||||
if(NULL == frag) {
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag);
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
|
||||
}
|
||||
|
||||
if( opal_list_is_empty(queue) ) { /* no pending fragments yet */
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
return;
|
||||
}
|
||||
/* Shortcut for sequence number earlier than the first fragment in the list */
|
||||
tmpfrag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(queue);
|
||||
tmphdr = &tmpfrag->hdr.hdr_match;
|
||||
assert(hdr->hdr_seq != tmphdr->hdr_seq);
|
||||
if( hdr->hdr_seq < tmphdr->hdr_seq ) {
|
||||
opal_list_prepend(queue, (opal_list_item_t*)frag);
|
||||
return;
|
||||
}
|
||||
/* Shortcut for sequence number later than the last fragment in the list */
|
||||
tmpfrag = (mca_pml_ob1_recv_frag_t*)opal_list_get_last(queue);
|
||||
tmphdr = &tmpfrag->hdr.hdr_match;
|
||||
if( hdr->hdr_seq > tmphdr->hdr_seq ) {
|
||||
opal_list_append(queue, (opal_list_item_t*)frag);
|
||||
return;
|
||||
}
|
||||
/* For all other cases (sequence number missing in the list) */
|
||||
OPAL_LIST_FOREACH(tmpfrag, queue, mca_pml_ob1_recv_frag_t) {
|
||||
tmphdr = &tmpfrag->hdr.hdr_match;
|
||||
if( hdr->hdr_seq < tmphdr->hdr_seq ) {
|
||||
opal_list_insert_pos(queue, (opal_list_item_t*)tmpfrag,
|
||||
(opal_list_item_t*) frag);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Match incoming recv_frags against posted receives.
|
||||
* Supports out of order delivery.
|
||||
*
|
||||
* @param frag_header (IN) Header of received recv_frag.
|
||||
* @param frag_desc (IN) Received recv_frag descriptor.
|
||||
* @param match_made (OUT) Flag indicating wether a match was made.
|
||||
* @param additional_matches (OUT) List of additional matches
|
||||
* @param hdr (IN) Header of received recv_frag.
|
||||
* @param segments (IN) Received recv_frag descriptor.
|
||||
* @param num_segments (IN) Flag indicating wether a match was made.
|
||||
* @param type (IN) Type of the message header.
|
||||
* @return OMPI_SUCCESS or error status on failure.
|
||||
*/
|
||||
static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_match_hdr_t *hdr,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments,
|
||||
int type);
|
||||
int type );
|
||||
|
||||
/**
|
||||
* Match incoming frags against posted receives. If frag is not NULL then we assume
|
||||
* it is already local and that it can be released upon completion.
|
||||
* Supports out of order delivery.
|
||||
*
|
||||
* @param comm_ptr (IN) Communicator where the message has been received
|
||||
* @param proc (IN) Proc for which we have received the message.
|
||||
* @param hdr (IN) Header of received recv_frag.
|
||||
* @param segments (IN) Received recv_frag descriptor.
|
||||
* @param num_segments (IN) Flag indicating wether a match was made.
|
||||
* @param type (IN) Type of the message header.
|
||||
* @return OMPI_SUCCESS or error status on failure.
|
||||
*/
|
||||
static int
|
||||
mca_pml_ob1_recv_frag_match_proc( mca_btl_base_module_t *btl,
|
||||
ompi_communicator_t* comm_ptr,
|
||||
mca_pml_ob1_comm_proc_t *proc,
|
||||
mca_pml_ob1_match_hdr_t *hdr,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments,
|
||||
int type,
|
||||
mca_pml_ob1_recv_frag_t* frag );
|
||||
|
||||
static mca_pml_ob1_recv_request_t*
|
||||
match_one(mca_btl_base_module_t *btl,
|
||||
@ -105,6 +176,19 @@ match_one(mca_btl_base_module_t *btl,
|
||||
mca_pml_ob1_comm_proc_t *proc,
|
||||
mca_pml_ob1_recv_frag_t* frag);
|
||||
|
||||
static inline mca_pml_ob1_recv_frag_t* check_cantmatch_for_match(mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
mca_pml_ob1_recv_frag_t *frag = NULL;
|
||||
|
||||
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(&proc->frags_cant_match);
|
||||
if( (opal_list_get_end(&proc->frags_cant_match) != (opal_list_item_t*)frag) &&
|
||||
(frag->hdr.hdr_match.hdr_seq == proc->expected_sequence) ) {
|
||||
opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t*)frag);
|
||||
return frag;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
mca_btl_base_tag_t tag,
|
||||
mca_btl_base_descriptor_t* des,
|
||||
@ -164,15 +248,16 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
OB1_MATCHING_LOCK(&comm->matching_lock);
|
||||
|
||||
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm_ptr)) {
|
||||
/* get sequence number of next message that can be processed */
|
||||
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
|
||||
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
|
||||
goto slow_path;
|
||||
}
|
||||
|
||||
/* This is the sequence number we were expecting, so we can try
|
||||
* matching it to already posted receives.
|
||||
/* get sequence number of next message that can be processed.
|
||||
* If this frag is out of sequence, queue it up in the list
|
||||
* now as we still have the lock.
|
||||
*/
|
||||
if(OPAL_UNLIKELY(((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence))) {
|
||||
append_frag_to_ordered_list(&proc->frags_cant_match, btl,
|
||||
hdr, segments, num_segments, NULL);
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
return;
|
||||
}
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
@ -183,14 +268,13 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
* generation until we reach the correct sequence number.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, NULL);
|
||||
|
||||
/* The match is over. We generate the SEARCH_POSTED_Q_END here,
|
||||
* before going into the mca_pml_ob1_check_cantmatch_for_match so
|
||||
* we can make a difference for the searching time for all
|
||||
* messages.
|
||||
* before going into check_cantmatch_for_match so we can make
|
||||
* a difference for the searching time for all messages.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
@ -246,12 +330,31 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
|
||||
/* don't need a rmb as that is for checking */
|
||||
recv_request_pml_complete(match);
|
||||
}
|
||||
return;
|
||||
|
||||
slow_path:
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
mca_pml_ob1_recv_frag_match(btl, hdr, segments,
|
||||
num_segments, MCA_PML_OB1_HDR_TYPE_MATCH);
|
||||
/* We matched the frag, Now see if we already have the next sequence in
|
||||
* our OOS list. If yes, try to match it.
|
||||
*
|
||||
* NOTE:
|
||||
* To optimize the number of lock used, mca_pml_ob1_recv_frag_match_proc()
|
||||
* MUST be called with communicator lock and will RELEASE the lock. This is
|
||||
* not ideal but it is better for the performance.
|
||||
*/
|
||||
if(0 != opal_list_get_size(&proc->frags_cant_match)) {
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
|
||||
OB1_MATCHING_LOCK(&comm->matching_lock);
|
||||
if((frag = check_cantmatch_for_match(proc))) {
|
||||
/* mca_pml_ob1_recv_frag_match_proc() will release the lock. */
|
||||
mca_pml_ob1_recv_frag_match_proc(frag->btl, comm_ptr, proc,
|
||||
&frag->hdr.hdr_match,
|
||||
frag->segments, frag->num_segments,
|
||||
hdr->hdr_common.hdr_type, frag);
|
||||
} else {
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -590,31 +693,6 @@ match_one(mca_btl_base_module_t *btl,
|
||||
} while(true);
|
||||
}
|
||||
|
||||
static mca_pml_ob1_recv_frag_t* check_cantmatch_for_match(mca_pml_ob1_comm_proc_t *proc)
|
||||
{
|
||||
mca_pml_ob1_recv_frag_t *frag;
|
||||
|
||||
/* search the list for a fragment from the send with sequence
|
||||
* number next_msg_seq_expected
|
||||
*/
|
||||
for(frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_first(&proc->frags_cant_match);
|
||||
frag != (mca_pml_ob1_recv_frag_t*)opal_list_get_end(&proc->frags_cant_match);
|
||||
frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_next(frag))
|
||||
{
|
||||
mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
|
||||
/*
|
||||
* If the message has the next expected seq from that proc...
|
||||
*/
|
||||
if(hdr->hdr_seq != proc->expected_sequence)
|
||||
continue;
|
||||
|
||||
opal_list_remove_item(&proc->frags_cant_match, (opal_list_item_t*)frag);
|
||||
return frag;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* RCS/CTS receive side matching
|
||||
*
|
||||
@ -652,12 +730,11 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
int type)
|
||||
{
|
||||
/* local variables */
|
||||
uint16_t next_msg_seq_expected, frag_msg_seq;
|
||||
uint16_t frag_msg_seq;
|
||||
uint16_t next_msg_seq_expected;
|
||||
ompi_communicator_t *comm_ptr;
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
mca_pml_ob1_comm_t *comm;
|
||||
mca_pml_ob1_comm_proc_t *proc;
|
||||
mca_pml_ob1_recv_frag_t* frag = NULL;
|
||||
|
||||
/* communicator pointer */
|
||||
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
|
||||
@ -676,14 +753,13 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
/* source sequence number */
|
||||
frag_msg_seq = hdr->hdr_seq;
|
||||
proc = mca_pml_ob1_peer_lookup (comm_ptr, hdr->hdr_src);
|
||||
|
||||
/**
|
||||
* We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
|
||||
* fragment arrival. Independing if it is received on the correct order or not.
|
||||
* This will allow the tools to figure out if the messages are not received in the
|
||||
* correct order (if multiple network interfaces).
|
||||
/* We generate the MSG_ARRIVED event as soon as the PML is aware
|
||||
* of a matching fragment arrival. Independing if it is received
|
||||
* on the correct order or not. This will allow the tools to
|
||||
* figure out if the messages are not received in the correct
|
||||
* order (if multiple network interfaces).
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
@ -697,38 +773,67 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
|
||||
*/
|
||||
OB1_MATCHING_LOCK(&comm->matching_lock);
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
frag_msg_seq = hdr->hdr_seq;
|
||||
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
|
||||
if(OPAL_UNLIKELY(frag_msg_seq != next_msg_seq_expected))
|
||||
goto wrong_seq;
|
||||
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
/* If the sequence number is wrong, queue it up for later. */
|
||||
if(OPAL_UNLIKELY(frag_msg_seq != next_msg_seq_expected)) {
|
||||
append_frag_to_ordered_list(&proc->frags_cant_match, btl, hdr, segments,
|
||||
num_segments, NULL);
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* mca_pml_ob1_recv_frag_match_proc() will release the lock. */
|
||||
return mca_pml_ob1_recv_frag_match_proc(btl, comm_ptr, proc, hdr,
|
||||
segments, num_segments,
|
||||
type, NULL);
|
||||
}
|
||||
|
||||
|
||||
/* mca_pml_ob1_recv_frag_match_proc() will match the given frag and
|
||||
* then try to match the next frag in sequence by looking into arrived
|
||||
* out of order frags in frags_cant_match list until it can't find one.
|
||||
*
|
||||
* ATTENTION: THIS FUNCTION MUST BE CALLED WITH COMMUNICATOR LOCK HELD.
|
||||
* THE LOCK WILL BE RELEASED UPON RETURN. USE WITH CARE. */
|
||||
static int
|
||||
mca_pml_ob1_recv_frag_match_proc( mca_btl_base_module_t *btl,
|
||||
ompi_communicator_t* comm_ptr,
|
||||
mca_pml_ob1_comm_proc_t *proc,
|
||||
mca_pml_ob1_match_hdr_t *hdr,
|
||||
mca_btl_base_segment_t* segments,
|
||||
size_t num_segments,
|
||||
int type,
|
||||
mca_pml_ob1_recv_frag_t* frag )
|
||||
{
|
||||
/* local variables */
|
||||
mca_pml_ob1_comm_t* comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
|
||||
/* If we are here, this is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted receives.
|
||||
*/
|
||||
|
||||
out_of_order_match:
|
||||
match_this_frag:
|
||||
/* We're now expecting the next sequence number. */
|
||||
proc->expected_sequence++;
|
||||
|
||||
/**
|
||||
* We generate the SEARCH_POSTED_QUEUE only when the message is received
|
||||
* in the correct sequence. Otherwise, we delay the event generation until
|
||||
* we reach the correct sequence number.
|
||||
/* We generate the SEARCH_POSTED_QUEUE only when the message is
|
||||
* received in the correct sequence. Otherwise, we delay the event
|
||||
* generation until we reach the correct sequence number.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, frag);
|
||||
|
||||
/**
|
||||
* The match is over. We generate the SEARCH_POSTED_Q_END here, before going
|
||||
* into the mca_pml_ob1_check_cantmatch_for_match so we can make a difference
|
||||
* for the searching time for all messages.
|
||||
/* The match is over. We generate the SEARCH_POSTED_Q_END here,
|
||||
* before going into check_cantmatch_for_match we can make a
|
||||
* difference for the searching time for all messages.
|
||||
*/
|
||||
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
|
||||
|
||||
/* release matching lock before processing fragment */
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
@ -752,7 +857,7 @@ out_of_order_match:
|
||||
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_c_frags_cant_match list
|
||||
* any fragments on the frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
|
||||
@ -763,20 +868,11 @@ out_of_order_match:
|
||||
num_segments = frag->num_segments;
|
||||
btl = frag->btl;
|
||||
type = hdr->hdr_common.hdr_type;
|
||||
goto out_of_order_match;
|
||||
goto match_this_frag;
|
||||
}
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
wrong_seq:
|
||||
/*
|
||||
* This message comes after the next expected, so it
|
||||
* is ahead of sequence. Save it for later.
|
||||
*/
|
||||
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
|
||||
num_segments, NULL);
|
||||
OB1_MATCHING_UNLOCK(&comm->matching_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user