continue to fill in the fragment matching logic.
This commit was SVN r311.
Этот коммит содержится в:
родитель
b8b37b18c8
Коммит
5f93493527
@ -6,15 +6,14 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "lam/constants.h"
|
||||
#include "lam/lfc/list.h"
|
||||
#include "lam/threads/mutex.h"
|
||||
#include "mpi/communicator/communicator.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/pml/base/pml_base_header.h"
|
||||
#include "mca/mpi/pml/base/pml_base_recvfrag.h"
|
||||
#include "mca/mpi/pml/base/pml_base_recvreq.h"
|
||||
#include "lam/threads/mutex.h"
|
||||
#include "lam/constants.h"
|
||||
#include "mpi/communicator/communicator.h"
|
||||
#include "mca/mpi/pml/base/pml_base_header.h"
|
||||
#include "mca/mpi/pml/base/pml_base_comm.h"
|
||||
#include "lam/lfc/list.h"
|
||||
#include "mca/mpi/ptl/base/ptl_base_match.h"
|
||||
|
||||
/**
|
||||
@ -51,61 +50,62 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_base_recv_frag_t *frag_desc, int *match_made,
|
||||
lam_list_t *additional_matches)
|
||||
{
|
||||
/* local variables */
|
||||
mca_pml_base_sequence_t frag_msg_seq_num,next_msg_seq_num_expected;
|
||||
lam_communicator_t *comm_ptr;
|
||||
mca_pml_base_recv_request_t *matched_receive;
|
||||
mca_pml_comm_t *pml_comm;
|
||||
int frag_src;
|
||||
/* local variables */
|
||||
mca_pml_base_sequence_t frag_msg_seq_num,next_msg_seq_num_expected;
|
||||
lam_communicator_t *comm_ptr;
|
||||
mca_pml_base_recv_request_t *matched_receive;
|
||||
mca_pml_comm_t *pml_comm;
|
||||
int frag_src;
|
||||
|
||||
/* initialization */
|
||||
*match_made=0;
|
||||
|
||||
/* communicator pointer */
|
||||
comm_ptr=lam_comm_lookup(frag_header->hdr_base.hdr_contextid);
|
||||
/* communicator pointer */
|
||||
comm_ptr=lam_comm_lookup(frag_header->hdr_base.hdr_contextid);
|
||||
pml_comm=(mca_pml_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
/* source sequence number */
|
||||
frag_msg_seq_num = frag_header->hdr_msg_seq_num;
|
||||
/* source sequence number */
|
||||
frag_msg_seq_num = frag_header->hdr_msg_seq_num;
|
||||
|
||||
/* get fragment communicator source rank */
|
||||
frag_src = frag_header->hdr_frag_seq_num;
|
||||
/* get fragment communicator source rank */
|
||||
frag_src = frag_header->hdr_frag_seq_num;
|
||||
|
||||
/* get next expected message sequence number - if threaded
|
||||
* run, lock to make sure that if another thread is processing
|
||||
* a frag from the same message a match is made only once.
|
||||
* Also, this prevents other posted receives (for a pair of
|
||||
* end points) from being processed, and potentially "loosing"
|
||||
* the fragment.
|
||||
*/
|
||||
/* get next expected message sequence number - if threaded
|
||||
* run, lock to make sure that if another thread is processing
|
||||
* a frag from the same message a match is made only once.
|
||||
* Also, this prevents other posted receives (for a pair of
|
||||
* end points) from being processed, and potentially "loosing"
|
||||
* the fragment.
|
||||
*/
|
||||
THREAD_LOCK((pml_comm->c_matching_lock)+frag_src);
|
||||
|
||||
/* get sequence number of next message that can be processed */
|
||||
next_msg_seq_num_expected = *((pml_comm->c_next_msg_seq_num)+frag_src);
|
||||
/* get sequence number of next message that can be processed */
|
||||
next_msg_seq_num_expected = *((pml_comm->c_next_msg_seq_num)+frag_src);
|
||||
|
||||
if (frag_msg_seq_num == next_msg_seq_num_expected) {
|
||||
if (frag_msg_seq_num == next_msg_seq_num_expected) {
|
||||
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
*/
|
||||
/*
|
||||
* This is the sequence number we were expecting,
|
||||
* so we can try matching it to already posted
|
||||
* receives.
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
(pml_comm->c_next_msg_seq_num[frag_src])++;
|
||||
/* We're now expecting the next sequence number. */
|
||||
(pml_comm->c_next_msg_seq_num[frag_src])++;
|
||||
|
||||
/* see if receive has already been posted */
|
||||
matched_receive = lam_check_recieves_for_match(frag_header);
|
||||
/* see if receive has already been posted */
|
||||
matched_receive = mca_ptl_base_check_recieves_for_match(frag_header,
|
||||
pml_comm);
|
||||
|
||||
/* if match found, process data */
|
||||
if (matched_receive) {
|
||||
/*
|
||||
* if threaded, ok to release lock, since the posted
|
||||
* receive is not on any queue, so it won't be
|
||||
* matched again, and the fragment can be processed
|
||||
* w/o any conflict from other threads - locks will
|
||||
* be used where concurent access needs to be managed.
|
||||
*/
|
||||
/* if match found, process data */
|
||||
if (matched_receive) {
|
||||
/*
|
||||
* if threaded, ok to release lock, since the posted
|
||||
* receive is not on any queue, so it won't be
|
||||
* matched again, and the fragment can be processed
|
||||
* w/o any conflict from other threads - locks will
|
||||
* be used where concurent access needs to be managed.
|
||||
*/
|
||||
|
||||
/* set flag indicating the input fragment was matched */
|
||||
*match_made=1;
|
||||
@ -113,13 +113,13 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
* descriptor */
|
||||
frag_desc->matched_recv=matched_receive;
|
||||
|
||||
/*
|
||||
* update deliverd sequence number information,
|
||||
* if need be.
|
||||
*/
|
||||
/*
|
||||
* update deliverd sequence number information,
|
||||
* if need be.
|
||||
*/
|
||||
|
||||
} else {
|
||||
/* if no match found, place on unexpected queue - need to
|
||||
} else {
|
||||
/* if no match found, place on unexpected queue - need to
|
||||
* lock to prevent probe from interfering with updating
|
||||
* the list */
|
||||
THREAD_LOCK((pml_comm->unexpected_frags_lock)+frag_src);
|
||||
@ -127,30 +127,30 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
(lam_list_item_t *)frag_desc);
|
||||
THREAD_UNLOCK((pml_comm->unexpected_frags_lock)+frag_src);
|
||||
|
||||
/* now that the fragment is on the list, ok to
|
||||
* release match - other matches may be attempted */
|
||||
/* now that the fragment is on the list, ok to
|
||||
* release match - other matches may be attempted */
|
||||
THREAD_UNLOCK((pml_comm->c_matching_lock)+frag_src);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if (lam_list_get_size((pml_comm->frags_cant_match)+frag_src)) {
|
||||
/*
|
||||
* Now that new message has arrived, check to see if
|
||||
* any fragments on the c_frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if (lam_list_get_size((pml_comm->frags_cant_match)+frag_src)) {
|
||||
/* initialize list to empty */
|
||||
lam_list_set_size(additional_matches,0);
|
||||
/* need to handle this -- lam_check_cantmatch_for_match();
|
||||
/* need to handle this -- lam_check_cantmatch_for_match();
|
||||
* */
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* mark message as done, if it has completed - need to mark
|
||||
* it this late to avoid a race condition with another thread
|
||||
* waiting to complete a recv, completing, and try to free the
|
||||
* communicator before the current thread is done referencing
|
||||
* this communicator - is this true ?
|
||||
*/
|
||||
/*
|
||||
* mark message as done, if it has completed - need to mark
|
||||
* it this late to avoid a race condition with another thread
|
||||
* waiting to complete a recv, completing, and try to free the
|
||||
* communicator before the current thread is done referencing
|
||||
* this communicator - is this true ?
|
||||
*/
|
||||
} else {
|
||||
/*
|
||||
* This message comes after the next expected, so it
|
||||
@ -166,3 +166,58 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upper level routine for matching a recieved message header to posted
|
||||
* receives.
|
||||
*
|
||||
* @param frag_header Matching data from recived fragment (IN)
|
||||
*
|
||||
* @param pml_comm Pointer to the communicator structure used for
|
||||
* matching purposes.
|
||||
*
|
||||
* @return Matched receive
|
||||
*
|
||||
* try and match frag to posted receives The calling routine
|
||||
* garantees that no other thread will access the posted receive
|
||||
* queues while this routine is being executed.
|
||||
* This routine assumes that the appropriate matching locks are
|
||||
* set by the upper level routine.
|
||||
*/
|
||||
mca_pml_base_recv_request_t *mca_ptl_base_check_recieves_for_match
|
||||
(mca_pml_base_reliable_hdr_t *frag_header, mca_pml_comm_t *pml_comm)
|
||||
{
|
||||
/* local parameters */
|
||||
mca_pml_base_recv_request_t *return_match;
|
||||
int frag_src;
|
||||
|
||||
/* initialization */
|
||||
return_match=(mca_pml_base_recv_request_t *)NULL;
|
||||
/*
|
||||
* figure out what sort of matching logic to use, if need to
|
||||
* look only at "specific" recieves, or "wild" receives,
|
||||
* or if we need to traverse both sets at the same time.
|
||||
*/
|
||||
frag_src = frag_header->hdr_frag_seq_num;
|
||||
|
||||
if (lam_list_get_size((pml_comm->specific_receives)+frag_src) == 0 ){
|
||||
/*
|
||||
* There are only wild irecvs, so specialize the algorithm.
|
||||
*/
|
||||
return_match = check_wild_receives_for_match(frag_header, pml_comm);
|
||||
} else if (lam_list_get_size(&(pml_comm->wild_receives)) == 0 ) {
|
||||
/*
|
||||
* There are only specific irecvs, so specialize the algorithm.
|
||||
*/
|
||||
return_match = check_specific_receives_for_match(frag_header,
|
||||
pml_comm);
|
||||
} else {
|
||||
/*
|
||||
* There are some of each.
|
||||
*/
|
||||
return_match = check_specific_and_wild_receives_for_match(frag_header,
|
||||
pml_comm);
|
||||
}
|
||||
|
||||
return return_match;
|
||||
}
|
||||
|
@ -9,8 +9,21 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_base_recv_frag_t *frag_desc, int *match_made,
|
||||
lam_list_t *additional_matches);
|
||||
|
||||
mca_pml_base_recv_request_t *lam_check_recieves_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header);
|
||||
mca_pml_base_recv_request_t *mca_ptl_base_check_recieves_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_comm_t *pml_comm);
|
||||
|
||||
mca_pml_base_recv_request_t *check_wild_receives_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_comm_t *pml_comm);
|
||||
|
||||
mca_pml_base_recv_request_t *check_specific_receives_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_comm_t *pml_comm);
|
||||
|
||||
mca_pml_base_recv_request_t *check_specific_and_wild_receives_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_comm_t *pml_comm);
|
||||
|
||||
#endif /* MCA_PTL_BASE_MATCH_H */
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user