add support for fragment matching in the teg pml.
This commit was SVN r290.
Этот коммит содержится в:
родитель
f68f52fdda
Коммит
889bf52ba4
@ -73,6 +73,9 @@ void lam_list_destroy(lam_list_t *list);
|
||||
#define lam_list_get_size(list) \
|
||||
((lam_list_t*)list)->lam_list_length
|
||||
|
||||
/* set list size */
|
||||
#define lam_list_set_size(list,size) \
|
||||
((lam_list_t*)list)->lam_list_length=(int)size;
|
||||
|
||||
/*
|
||||
* Returns first item on list, but does not remove it from the list.
|
||||
|
@ -13,6 +13,9 @@
|
||||
#include "lam/threads/mutex.h"
|
||||
#include "lam/constants.h"
|
||||
#include "mpi/communicator/communicator.h"
|
||||
#include "mca/mpi/ptl/base/match.h"
|
||||
#include "mca/mpi/pml/teg/comm.h"
|
||||
#include "lam/lfc/list.h"
|
||||
|
||||
/**
|
||||
* RCS/CTS receive side matching
|
||||
@ -52,20 +55,21 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_base_sequence_t frag_msg_seq_num,next_msg_seq_num_expected;
|
||||
lam_communicator_t *comm_ptr;
|
||||
mca_pml_base_recv_request_t *matched_receive;
|
||||
mca_pml_comm_t *pml_comm;
|
||||
int frag_src;
|
||||
|
||||
/* initialization */
|
||||
*match_made=0;
|
||||
|
||||
#if (0)
|
||||
/* communicator pointer */
|
||||
comm_ptr=;
|
||||
comm_ptr=get_comm_ptr(frag_header->hdr_base.hdr_contextid);
|
||||
pml_comm=(mca_pml_comm_t *)comm_ptr->c_pml_comm;
|
||||
|
||||
/* source sequence number */
|
||||
frag_msg_seq_num = frag_header->p2p_rel_hdr_msg_seq_num;
|
||||
frag_msg_seq_num = frag_header->hdr_msg_seq_num;
|
||||
|
||||
/* get fragment communicator source rank */
|
||||
frag_src = frag_header->p2p_hdr_src_rank;
|
||||
frag_src = frag_header->hdr_frag_seq_num;
|
||||
|
||||
/* get next expected message sequence number - if threaded
|
||||
* run, lock to make sure that if another thread is processing
|
||||
@ -74,10 +78,10 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
* end points) from being processed, and potentially "loosing"
|
||||
* the fragment.
|
||||
*/
|
||||
THREAD_LOCK(c_matching_lock+frag_src);
|
||||
THREAD_LOCK((pml_comm->c_matching_lock)+frag_src);
|
||||
|
||||
//! get sequence number of next message that can be processed
|
||||
next_msg_seq_num_expected = c_next_msg_seq_num[frag_src];
|
||||
next_msg_seq_num_expected = *((pml_comm->c_next_msg_seq_num)+frag_src);
|
||||
|
||||
if (frag_msg_seq_num == next_msg_seq_num_expected) {
|
||||
|
||||
@ -88,7 +92,7 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
*/
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
c_next_msg_seq_num[frag_src]++;
|
||||
(pml_comm->c_next_msg_seq_num[frag_src])++;
|
||||
|
||||
/* see if receive has already been posted */
|
||||
matched_receive = lam_check_recieves_for_match(frag_header);
|
||||
@ -102,38 +106,30 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
* w/o any conflict from other threads - locks will
|
||||
* be used where concurent access needs to be managed.
|
||||
*/
|
||||
THREAD_UNLOCK(c_matching_lock+frag_src);
|
||||
|
||||
/*
|
||||
* Process received data. This routine:
|
||||
* - delivers data to the user buffers, if need be
|
||||
* including any required data checks.
|
||||
* - updates counters, such as number of bytes
|
||||
* delivered
|
||||
* - sets library completion flags, for completed
|
||||
* sends
|
||||
* - generates acks, if need be
|
||||
* - place the fragment descriptor on the appropriate
|
||||
* queue
|
||||
*/
|
||||
return_code=CDI_deliver_data();
|
||||
/* set flag indicating the input fragment was matched */
|
||||
*match_made=1;
|
||||
/* associate the receive descriptor with the fragment
|
||||
* descriptor */
|
||||
frag_desc->matched_recv=matched_receive;
|
||||
|
||||
/*
|
||||
* update deliverd sequence number information,
|
||||
* if need be.
|
||||
*/
|
||||
|
||||
/*
|
||||
* send ack, if need be
|
||||
*/
|
||||
|
||||
} else {
|
||||
/* if no match found, place on unexpected queue */
|
||||
c_unexpected_frags[];
|
||||
/* if no match found, place on unexpected queue - need to
|
||||
* lock to prevent probe from interfering with updating
|
||||
* the list */
|
||||
THREAD_LOCK((pml_comm->unexpected_frags_lock)+frag_src);
|
||||
lam_list_append( ((pml_comm->unexpected_frags)+frag_src),
|
||||
(lam_list_item_t *)frag_desc);
|
||||
THREAD_UNLOCK((pml_comm->unexpected_frags_lock)+frag_src);
|
||||
|
||||
/* now that the fragment is on the list, ok to
|
||||
* release match - other matches may be attempted */
|
||||
THREAD_UNLOCK(c_matching_lock+frag_src);
|
||||
THREAD_UNLOCK((pml_comm->c_matching_lock)+frag_src);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -141,8 +137,11 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
* any fragments on the c_frags_cant_match list
|
||||
* may now be used to form new matchs
|
||||
*/
|
||||
if (c_frags_cant_match[frag_src]->size() > 0) {
|
||||
lam_check_cantmatch_for_match();
|
||||
if (lam_list_get_size((pml_comm->frags_cant_match)+frag_src)) {
|
||||
/* initialize list to empty */
|
||||
lam_list_set_size(additional_matches,0);
|
||||
/* need to handle this -- lam_check_cantmatch_for_match();
|
||||
* */
|
||||
}
|
||||
|
||||
/*
|
||||
@ -157,14 +156,13 @@ int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
* This message comes after the next expected, so it
|
||||
* is ahead of sequence. Save it for later.
|
||||
*/
|
||||
c_frags_cant_match[frag_src]->AppendNoLock(frag_header);
|
||||
lam_list_append( ((pml_comm->frags_cant_match)+frag_src),
|
||||
(lam_list_item_t *)frag_desc);
|
||||
|
||||
/* now that the fragment is on the list, ok to
|
||||
* release match - other matches may be attempted */
|
||||
|
||||
THREAD_UNLOCK(c_matching_lock+frag_src);
|
||||
THREAD_UNLOCK((pml_comm->c_matching_lock)+frag_src);
|
||||
}
|
||||
|
||||
#endif /* ifdef (0) */
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
16
src/mca/mpi/ptl/base/ptl_base_match.h
Обычный файл
16
src/mca/mpi/ptl/base/ptl_base_match.h
Обычный файл
@ -0,0 +1,16 @@
|
||||
/*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef MCA_PTL_BASE_MATCH_H
|
||||
#define MCA_PTL_BASE_MATCH_H
|
||||
|
||||
int mca_ptl_base_match(mca_pml_base_reliable_hdr_t *frag_header,
|
||||
mca_pml_base_recv_frag_t *frag_desc, int *match_made,
|
||||
lam_list_t *additional_matches);
|
||||
|
||||
mca_pml_base_recv_request_t *lam_check_recieves_for_match(
|
||||
mca_pml_base_reliable_hdr_t *frag_header);
|
||||
|
||||
#endif /* MCA_PTL_BASE_MATCH_H */
|
||||
|
@ -31,8 +31,7 @@ struct lam_communicator_t {
|
||||
MPI_Errhandler c_error_handler;
|
||||
|
||||
/* Hooks for PML to hang things */
|
||||
|
||||
struct mca_pml_comm_t* c_pml_comm;
|
||||
void* c_pml_comm;
|
||||
|
||||
/* Hooks for collectives to hang things */
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user