diff --git a/src/mca/pml/teg/src/Makefile.am b/src/mca/pml/teg/src/Makefile.am index 0d4f1365be..69e9e92d74 100644 --- a/src/mca/pml/teg/src/Makefile.am +++ b/src/mca/pml/teg/src/Makefile.am @@ -19,6 +19,7 @@ libmca_pml_teg_la_SOURCES = \ pml_teg_proc.c \ pml_teg_proc.h \ pml_teg_progress.c \ + pml_teg_recvfrag.c \ pml_teg_recvreq.c \ pml_teg_recvreq.h \ pml_teg_sendreq.c \ diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c index 12147f7978..a6271929be 100644 --- a/src/mca/pml/teg/src/pml_teg.c +++ b/src/mca/pml/teg/src/pml_teg.c @@ -16,6 +16,7 @@ #include "pml_teg_ptl.h" #include "pml_teg_recvreq.h" #include "pml_teg_sendreq.h" +#include "pml_teg_recvfrag.h" mca_pml_teg_t mca_pml_teg = { @@ -112,7 +113,7 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls) } /* setup ptl */ - ptl->ptl_match = mca_ptl_base_recv_frag_match; + ptl->ptl_match = mca_pml_teg_recv_frag_match; ptl->ptl_send_progress = mca_pml_teg_send_request_progress; ptl->ptl_recv_progress = mca_pml_teg_recv_request_progress; ptl->ptl_stack = ptl; diff --git a/src/mca/pml/teg/src/pml_teg_proc.h b/src/mca/pml/teg/src/pml_teg_proc.h index 1d873bbb40..bfe9ffd950 100644 --- a/src/mca/pml/teg/src/pml_teg_proc.h +++ b/src/mca/pml/teg/src/pml_teg_proc.h @@ -59,5 +59,31 @@ static inline mca_pml_proc_t* mca_pml_teg_proc_lookup_remote(ompi_communicator_t return proc->proc_pml; } +/** + * Return the mca_ptl_peer_t instance corresponding to the process/ptl combination. + * + * @param comm Communicator + * @param rank Peer rank + * @return mca_pml_proc_t instance + */ + +static inline struct mca_ptl_base_peer_t* mca_pml_teg_proc_lookup_remote_peer( + ompi_communicator_t* comm, + int rank, + struct mca_ptl_base_module_t* ptl) +{ + ompi_proc_t* proc = comm->c_local_group->grp_proc_pointers[rank]; + mca_pml_proc_t* proc_pml = proc->proc_pml; + size_t i, size = mca_ptl_array_get_size(&proc_pml->proc_ptl_first); + mca_ptl_proc_t* proc_ptl = proc_pml->proc_ptl_first.ptl_procs; + for(i = 0; i < size; i++) { + if(proc_ptl->ptl == ptl) { + return proc_ptl->ptl_peer; + } + proc_ptl++; + } + return NULL; +} + #endif diff --git a/src/mca/pml/teg/src/pml_teg_recvfrag.c b/src/mca/pml/teg/src/pml_teg_recvfrag.c new file mode 100644 index 0000000000..dbed2cba6f --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_recvfrag.c @@ -0,0 +1,75 @@ +/* + * + */ +/** + * @file + */ +#include "pml_teg_recvfrag.h" +#include "pml_teg_proc.h" + + +extern ompi_class_t mca_ptl_base_recv_frag_t_class; + + +/** + * Called by the PTL to match attempt a match for new fragments. + * + * @param ptl (IN) The PTL pointer + * @param frag (IN) Receive fragment descriptor. + * @param header (IN) Header corresponding to the receive fragment. + * @return OMPI_SUCCESS or error status on failure. + */ +bool mca_pml_teg_recv_frag_match( + mca_ptl_base_module_t* ptl, + mca_ptl_base_recv_frag_t* frag, + mca_ptl_base_match_header_t* header) +{ + bool matched; + ompi_list_t matched_frags; + OBJ_CONSTRUCT(&matched_frags, ompi_list_t); + if((matched = mca_ptl_base_match(header, frag, &matched_frags)) == false) { + frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); + } + + while(NULL != frag) { + mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + mca_pml_base_recv_request_t *request = frag->frag_request; + mca_ptl_base_match_header_t *header = &frag->frag_base.frag_header.hdr_match; + + /* + * Initialize request status. + */ + request->req_bytes_packed = header->hdr_msg_length; + request->req_base.req_peer = header->hdr_src; + request->req_base.req_tag = header->hdr_tag; + + /* + * If probe - signal request is complete - but don't notify PTL + */ + if(request->req_base.req_type == MCA_PML_REQUEST_PROBE) { + + ptl->ptl_recv_progress( + ptl, + request, + frag->frag_base.frag_header.hdr_frag.hdr_frag_length, + frag->frag_base.frag_size); + matched = mca_pml_teg_recv_frag_match( ptl, frag, header ); + + } else { + + /* if required - setup pointer to ptls peer */ + if (NULL == frag->frag_base.frag_peer) { + frag->frag_base.frag_peer = mca_pml_teg_proc_lookup_remote_peer(request->req_base.req_comm,header->hdr_src,ptl); + } + + /* notify ptl of match */ + ptl->ptl_matched(ptl, frag); + + /* process any additional fragments that arrived out of order */ + frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); + }; + }; + return matched; +} + + diff --git a/src/mca/pml/teg/src/pml_teg_recvfrag.h b/src/mca/pml/teg/src/pml_teg_recvfrag.h new file mode 100644 index 0000000000..210662edfb --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_recvfrag.h @@ -0,0 +1,30 @@ +/* + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PML_TEG_RECVFRAG_H +#define MCA_PML_TEG_RECVFRAG_H + +#include "mca/ptl/ptl.h" +#include "mca/pml/base/pml_base_recvreq.h" +#include "mca/ptl/base/ptl_base_recvfrag.h" + +/** + * Called by the PTL to match attempt a match for new fragments. + * + * @param ptl (IN) The PTL pointer + * @param frag (IN) Receive fragment descriptor. + * @param header (IN) Header corresponding to the receive fragment. + * @return OMPI_SUCCESS or error status on failure. + */ +bool mca_pml_teg_recv_frag_match( + mca_ptl_base_module_t* ptl, + mca_ptl_base_recv_frag_t* frag, + mca_ptl_base_match_header_t* header +); + +#endif + diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.c b/src/mca/pml/teg/src/pml_teg_recvreq.c index d514716923..67ca2b1011 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/pml/teg/src/pml_teg_recvreq.c @@ -61,6 +61,9 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques if (ompi_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 && (frag = mca_pml_teg_recv_request_match_specific_proc(request, req_peer)) != NULL) { mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + /* setup pointer to ptls peer */ + if(NULL == frag->frag_base.frag_peer) + frag->frag_base.frag_peer = mca_pml_teg_proc_lookup_remote_peer(comm,req_peer,ptl); OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); ptl->ptl_matched(ptl, frag); return; /* match found */ @@ -109,6 +112,9 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) /* loop over messages from the current proc */ if ((frag = mca_pml_teg_recv_request_match_specific_proc(request, proc)) != NULL) { mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + /* if required - setup pointer to ptls peer */ + if(NULL == frag->frag_base.frag_peer) + frag->frag_base.frag_peer = mca_pml_teg_proc_lookup_remote_peer(comm,proc,ptl); OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); ptl->ptl_matched(ptl, frag); return; /* match found */ diff --git a/src/mca/ptl/base/ptl_base_recvfrag.h b/src/mca/ptl/base/ptl_base_recvfrag.h index 64c2e16899..a21c3302c7 100644 --- a/src/mca/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/ptl/base/ptl_base_recvfrag.h @@ -25,62 +25,5 @@ struct mca_ptl_base_recv_frag_t { typedef struct mca_ptl_base_recv_frag_t mca_ptl_base_recv_frag_t; -/** - * Called by the PTL to match attempt a match for new fragments. - * - * @param ptl (IN) The PTL pointer - * @param frag (IN) Receive fragment descriptor. - * @param header (IN) Header corresponding to the receive fragment. - * @return OMPI_SUCCESS or error status on failure. - */ -static inline bool mca_ptl_base_recv_frag_match( - mca_ptl_base_module_t* ptl, - mca_ptl_base_recv_frag_t* frag, - mca_ptl_base_match_header_t* header) -{ - bool matched; - ompi_list_t matched_frags; - OBJ_CONSTRUCT(&matched_frags, ompi_list_t); - if((matched = mca_ptl_base_match(header, frag, &matched_frags)) == false) { - frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); - } - - while(NULL != frag) { - mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; - mca_pml_base_recv_request_t *request = frag->frag_request; - mca_ptl_base_match_header_t *header = &frag->frag_base.frag_header.hdr_match; - - /* - * Initialize request status. - */ - request->req_bytes_packed = header->hdr_msg_length; - request->req_base.req_peer = header->hdr_src; - request->req_base.req_tag = header->hdr_tag; - - /* - * If probe - signal request is complete - but don't notify PTL - */ - if(request->req_base.req_type == MCA_PML_REQUEST_PROBE) { - - ptl->ptl_recv_progress( - ptl, - request, - frag->frag_base.frag_header.hdr_frag.hdr_frag_length, - frag->frag_base.frag_size); - matched = mca_ptl_base_recv_frag_match( ptl, frag, header ); - - } else { - - /* notify ptl of match */ - ptl->ptl_matched(ptl, frag); - - /* process any additional fragments that arrived out of order */ - frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); - }; - }; - return matched; -} - - #endif diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c index b38b2d4a2f..d76528bd11 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -156,9 +156,10 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* first pass through - attempt a match */ if(NULL == frag->frag_recv.frag_request && 0 == frag->frag_msg_cnt) { + mca_ptl_base_module_t* ptl = frag->frag_recv.frag_base.frag_owner; /* attempt to match a posted recv */ - if (mca_ptl_base_recv_frag_match( - frag->frag_recv.frag_base.frag_owner, + if (ptl->ptl_match( + ptl, &frag->frag_recv, &frag->frag_recv.frag_base.frag_header.hdr_match)) { mca_ptl_tcp_recv_frag_matched(frag);