diff --git a/src/mca/mpi/ptl/base/ptl_base_fragment.h b/src/mca/mpi/ptl/base/ptl_base_fragment.h index 7df3841294..8516d5ed91 100644 --- a/src/mca/mpi/ptl/base/ptl_base_fragment.h +++ b/src/mca/mpi/ptl/base/ptl_base_fragment.h @@ -17,6 +17,9 @@ struct mca_ptl_base_frag_t { lam_list_item_t super; mca_ptl_base_header_t frag_header; struct mca_ptl_t* frag_owner; /**< PTL that allocated this fragment */ + struct mca_ptl_peer_t* frag_peer; /**< PTL specific addressing info */ + void *frag_addr; /* pointer into request buffer at fragment offset */ + size_t frag_size; /* number of bytes available in request buffer */ }; typedef struct mca_ptl_base_frag_t mca_ptl_base_frag_t; diff --git a/src/mca/mpi/ptl/base/ptl_base_match.c b/src/mca/mpi/ptl/base/ptl_base_match.c index 0660a5360e..48ddb7edd9 100644 --- a/src/mca/mpi/ptl/base/ptl_base_match.c +++ b/src/mca/mpi/ptl/base/ptl_base_match.c @@ -22,7 +22,7 @@ * Specialized matching routines for internal use only. */ -static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match( +static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match( mca_ptl_base_header_t *frag_header, mca_pml_comm_t *ptl_comm); @@ -121,7 +121,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header, (pml_comm->c_next_msg_seq[frag_src])++; /* see if receive has already been posted */ - matched_receive = mca_ptl_base_check_recieves_for_match(frag_header, + matched_receive = mca_ptl_base_check_receives_for_match(frag_header, pml_comm); /* if match found, process data */ @@ -131,7 +131,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header, *match_made=true; /* associate the receive descriptor with the fragment * descriptor */ - frag_desc->frag_match=matched_receive; + frag_desc->frag_request=matched_receive; /* * update deliverd sequence number information, @@ -203,7 +203,7 @@ int mca_ptl_base_match(mca_ptl_base_header_t *frag_header, * This routine assumes that the appropriate matching locks are * set by the upper level routine. */ -static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match +static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match (mca_ptl_base_header_t *frag_header, mca_pml_comm_t *pml_comm) { /* local parameters */ @@ -215,7 +215,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_recieves_for_match /* * figure out what sort of matching logic to use, if need to - * look only at "specific" recieves, or "wild" receives, + * look only at "specific" receives, or "wild" receives, * or if we need to traverse both sets at the same time. */ frag_src = frag_header->hdr_frag_seq; @@ -329,7 +329,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match( * This routine assumes that the appropriate matching locks are * set by the upper level routine. */ -static mca_ptl_base_recv_request_t *mca_ptl_base_check_c_specific_receives_for_match( +static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_match( mca_ptl_base_header_t *frag_header, mca_pml_comm_t *pml_comm) { @@ -454,7 +454,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive if (wild_recv == (mca_ptl_base_recv_request_t *) lam_list_get_end(&(pml_comm->c_wild_receives)) ) { - return_match = mca_ptl_base_check_c_specific_receives_for_match(frag_header, + return_match = mca_ptl_base_check_specific_receives_for_match(frag_header, pml_comm); return return_match; @@ -514,8 +514,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive } } -#if (0) - /* need to handle this -- lam_check_cantmatch_for_match(); + /* need to handle this -- mca_ptl_base_check_cantmatch_for_match(); */ /** * Scan the list of frags that came in ahead of time to see if any @@ -537,9 +536,9 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche { /* local parameters */ int match_found; - mca_pml_base_sequence_t next_msg_seq_expected, frag_seqber; - mca_pml_base_recv_frag_t *frag_desc; - mca_pml_base_recv_request_t *matched_receive; + mca_ptl_base_sequence_t next_msg_seq_expected, frag_seqber; + mca_ptl_base_recv_frag_t *frag_desc; + mca_ptl_base_recv_request_t *matched_receive; /* * Initialize list size - assume that most of the time this search @@ -565,12 +564,12 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche /* search the list for a fragment from the send with sequence * number next_msg_seq_expected */ - for(frag_desc = (mca_pml_base_recv_frag_t *) + for(frag_desc = (mca_ptl_base_recv_frag_t *) lam_list_get_first((pml_comm->c_frags_cant_match)+frag_src); - frag_desc != (mca_pml_base_recv_frag_t *) + frag_desc != (mca_ptl_base_recv_frag_t *) lam_list_get_end((pml_comm->c_frags_cant_match)+frag_src); - frag_desc = (mca_pml_base_recv_frag_t *) - ((lam_list_item_t *)c_frags_cant_match)->lam_list_next) + frag_desc = (mca_ptl_base_recv_frag_t *) + lam_list_get_next(frag_desc)) { /* * If the message has the next expected seq from that proc... @@ -603,15 +602,15 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche /* * check to see if this frag matches a posted message */ - matched_receive = mca_ptl_base_check_recieves_for_match( - frag_desc, pml_comm); + matched_receive = mca_ptl_base_check_receives_for_match( + &frag_desc->super.frag_header, pml_comm); /* if match found, process data */ if (matched_receive) { /* associate the receive descriptor with the fragment * descriptor */ - frag_desc->frag_match=matched_receive; + frag_desc->frag_request=matched_receive; /* add this fragment descriptor to the list of * descriptors to be processed later @@ -643,4 +642,4 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche return; } -#endif /* if (0) */ + diff --git a/src/mca/mpi/ptl/base/ptl_base_recvfrag.c b/src/mca/mpi/ptl/base/ptl_base_recvfrag.c index 43d6986f96..6bea3f620b 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvfrag.c +++ b/src/mca/mpi/ptl/base/ptl_base_recvfrag.c @@ -3,7 +3,10 @@ */ /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ +#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/base/ptl_base_recvfrag.h" +#include "mca/mpi/ptl/base/ptl_base_match.h" + lam_class_info_t mca_ptl_base_recv_frag_cls = { "mca_ptl_base_recv_frag_t", @@ -23,3 +26,51 @@ void mca_ptl_base_recv_frag_destroy(mca_ptl_base_recv_frag_t* frag) SUPER_DESTROY(frag, &mca_ptl_base_frag_cls); } +int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t* frag, mca_ptl_base_header_t* header) +{ + lam_list_t matched_frags; + bool matched; + int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags); + if(rc != LAM_SUCCESS) + return rc; + + if(matched) { + do { + mca_ptl_base_recv_request_t* request = frag->frag_request; + mca_ptl_t* ptl = frag->super.frag_owner; + + /* determine the offset and size of posted buffer */ + if (request->super.req_length < frag->super.frag_header.hdr_msg_offset) { + + /* user buffer is to small - discard entire fragment */ + frag->super.frag_addr = 0; + frag->super.frag_size = 0; + + } else if (request->super.req_length < frag->super.frag_header.hdr_msg_offset + + frag->super.frag_header.hdr_frag_length) { + + /* user buffer is to small - discard part of fragment */ + frag->super.frag_addr = ((unsigned char*)request->super.req_addr + + frag->super.frag_header.hdr_msg_offset); + frag->super.frag_size = request->super.req_length - frag->super.frag_header.hdr_msg_offset; + + } else { + + /* user buffer is large enough for this fragment */ + frag->super.frag_addr = ((unsigned char*)request->super.req_addr + + frag->super.frag_header.hdr_msg_offset); + frag->super.frag_size = frag->super.frag_header.hdr_frag_length; + + } + + /* send cts acknowledgment back to peer */ + ptl->ptl_cts(ptl, frag); + + /* process any fragments that arrived out of order */ + frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags); + } while(NULL != frag); + } + return LAM_SUCCESS; +} + + diff --git a/src/mca/mpi/ptl/base/ptl_base_recvfrag.h b/src/mca/mpi/ptl/base/ptl_base_recvfrag.h index afdaa502ce..adad35e209 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/mpi/ptl/base/ptl_base_recvfrag.h @@ -13,15 +13,17 @@ extern lam_class_info_t mca_ptl_base_recv_frag_cls; -typedef struct { +struct mca_ptl_base_recv_frag_t { mca_ptl_base_frag_t super; - /* matched receve request corresponding to this fragment */ - mca_ptl_base_recv_request_t *frag_match; -} mca_ptl_base_recv_frag_t; + mca_ptl_base_recv_request_t *frag_request; /* matched posted receive */ + struct mca_ptl_peer_t* frag_peer; /* peer received from */ +}; +typedef struct mca_ptl_base_recv_frag_t mca_ptl_base_recv_frag_t; void mca_ptl_base_recv_frag_init(mca_ptl_base_recv_frag_t*); void mca_ptl_base_recv_frag_destroy(mca_ptl_base_recv_frag_t*); +int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t*, mca_ptl_base_header_t*); #endif diff --git a/src/mca/mpi/ptl/base/ptl_base_recvreq.c b/src/mca/mpi/ptl/base/ptl_base_recvreq.c index d3c0c603ad..44c1e7ef55 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvreq.c +++ b/src/mca/mpi/ptl/base/ptl_base_recvreq.c @@ -5,6 +5,7 @@ #include "mca/mpi/ptl/base/ptl_base_comm.h" #include "mca/mpi/ptl/base/ptl_base_recvreq.h" +#include "mca/mpi/ptl/base/ptl_base_recvfrag.h" lam_class_info_t mca_ptl_base_recv_request_cls = { @@ -48,4 +49,8 @@ int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* req) return LAM_SUCCESS; } +void mca_ptl_base_recv_request_progress(mca_ptl_base_recv_request_t* req, mca_ptl_base_recv_frag_t* frag) +{ + +} diff --git a/src/mca/mpi/ptl/base/ptl_base_recvreq.h b/src/mca/mpi/ptl/base/ptl_base_recvreq.h index ff99f8869b..76c2a68586 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvreq.h +++ b/src/mca/mpi/ptl/base/ptl_base_recvreq.h @@ -9,6 +9,7 @@ #include "mca/mpi/pml/base/pml_base_request.h" extern lam_class_info_t mca_ptl_base_recv_request_cls;; +struct mca_ptl_base_recv_frag_t; typedef struct { @@ -19,8 +20,9 @@ typedef struct { void mca_ptl_base_recv_request_init(mca_ptl_base_recv_request_t*); void mca_ptl_base_recv_request_destroy(mca_ptl_base_recv_request_t*); -int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*); -int mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*); +int mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*); +int mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*); +void mca_ptl_base_recv_request_progress(mca_ptl_base_recv_request_t*, struct mca_ptl_base_recv_frag_t*); static inline void mca_ptl_base_recv_request_reinit( diff --git a/src/mca/mpi/ptl/base/ptl_base_sendfrag.h b/src/mca/mpi/ptl/base/ptl_base_sendfrag.h index 44d9c54138..c2b9c1ba66 100644 --- a/src/mca/mpi/ptl/base/ptl_base_sendfrag.h +++ b/src/mca/mpi/ptl/base/ptl_base_sendfrag.h @@ -13,8 +13,6 @@ extern lam_class_info_t mca_ptl_base_send_frag_cls; struct mca_ptl_base_send_frag_t { mca_ptl_base_frag_t super; struct mca_ptl_base_send_request_t *frag_request; - unsigned char* frag_data; - size_t frag_size; }; typedef struct mca_ptl_base_send_frag_t mca_ptl_base_send_frag_t; diff --git a/src/mca/mpi/ptl/ptl.h b/src/mca/mpi/ptl/ptl.h index 1574dee1a0..a8cbb7f185 100644 --- a/src/mca/mpi/ptl/ptl.h +++ b/src/mca/mpi/ptl/ptl.h @@ -23,6 +23,7 @@ struct mca_ptl_t; struct mca_ptl_peer_t; struct mca_ptl_base_fragment_t; struct mca_ptl_base_send_request_t; +struct mca_ptl_base_recv_frag_t; typedef uint64_t mca_ptl_base_sequence_t; typedef uint64_t mca_ptl_base_tstamp_t; @@ -126,6 +127,11 @@ typedef void (*mca_ptl_base_request_return_fn_t)( struct mca_ptl_base_send_request_t* request ); +typedef void (*mca_ptl_base_frag_return_fn_t)( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* frag +); + typedef int (*mca_ptl_base_send_fn_t)( struct mca_ptl_t* ptl, struct mca_ptl_peer_t* ptl_peer, @@ -134,6 +140,11 @@ typedef int (*mca_ptl_base_send_fn_t)( bool* complete ); +typedef int (*mca_ptl_base_cts_fn_t)( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* recv_frag +); + /** * PTL instance interface functions and common state. */ @@ -154,8 +165,10 @@ struct mca_ptl_t { mca_ptl_base_del_proc_fn_t ptl_del_proc; mca_ptl_base_finalize_fn_t ptl_finalize; mca_ptl_base_send_fn_t ptl_send; + mca_ptl_base_cts_fn_t ptl_cts; mca_ptl_base_request_alloc_fn_t ptl_request_alloc; mca_ptl_base_request_return_fn_t ptl_request_return; + mca_ptl_base_frag_return_fn_t ptl_frag_return; }; typedef struct mca_ptl_t mca_ptl_t; diff --git a/src/mca/mpi/ptl/tcp/src/Makefile.am b/src/mca/mpi/ptl/tcp/src/Makefile.am index 5bbccc69b9..aaa0c8dba7 100644 --- a/src/mca/mpi/ptl/tcp/src/Makefile.am +++ b/src/mca/mpi/ptl/tcp/src/Makefile.am @@ -21,6 +21,7 @@ libmca_ptl_tcp_la_SOURCES = \ ptl_tcp_module.c \ ptl_tcp_recvfrag.c \ ptl_tcp_recvfrag.h \ - ptl_tcp_send.c \ ptl_tcp_sendfrag.c \ - ptl_tcp_sendfrag.h + ptl_tcp_sendfrag.h \ + ptl_tcp_sendreq.c \ + ptl_tcp_sendreq.h diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp.c index 1c1618b6db..070d6b403d 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp.c @@ -8,11 +8,13 @@ #include "mca/mpi/pml/pml.h" #include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/base/ptl_base_sendreq.h" +#include "mca/mpi/ptl/base/ptl_base_sendfrag.h" #include "mca/lam/base/mca_base_module_exchange.h" #include "ptl_tcp.h" #include "ptl_tcp_addr.h" #include "ptl_tcp_peer.h" #include "ptl_tcp_proc.h" +#include "ptl_tcp_sendreq.h" mca_ptl_tcp_t mca_ptl_tcp = { @@ -28,8 +30,10 @@ mca_ptl_tcp_t mca_ptl_tcp = { mca_ptl_tcp_del_proc, mca_ptl_tcp_finalize, mca_ptl_tcp_send, + mca_ptl_tcp_cts, mca_ptl_tcp_request_alloc, - mca_ptl_tcp_request_return + mca_ptl_tcp_request_return, + mca_ptl_tcp_frag_return } }; @@ -114,8 +118,44 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re return rc; } + void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request) { lam_free_list_return(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request); } + +void mca_ptl_tcp_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_base_recv_frag_t* frag) +{ + lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); +} + + +int mca_ptl_tcp_send( + struct mca_ptl_t* ptl, + struct mca_ptl_peer_t* ptl_peer, + struct mca_ptl_base_send_request_t* sendreq, + size_t size, + bool* complete) +{ + mca_ptl_tcp_send_frag_t* sendfrag; + if (sendreq->req_frags == 0) { + sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag; + } else { + int rc; + sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc); + if(sendfrag == 0) + return rc; + } + mca_ptl_tcp_send_frag_reinit(sendfrag, ptl_peer, sendreq, size); + return mca_ptl_tcp_peer_send(ptl_peer, sendfrag); +} + + +int mca_ptl_tcp_cts( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* frag) +{ + return LAM_ERROR; +} + diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp.h index f249e3bae1..44181b027b 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp.h @@ -102,6 +102,11 @@ extern void mca_ptl_tcp_request_return( struct mca_ptl_base_send_request_t* ); +extern void mca_ptl_tcp_frag_return( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* +); + extern int mca_ptl_tcp_send( struct mca_ptl_t* ptl, struct mca_ptl_peer_t* ptl_peer, @@ -110,6 +115,11 @@ extern int mca_ptl_tcp_send( bool* complete ); +extern int mca_ptl_tcp_cts( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* frag +); + #endif diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c index 261912f6b0..663972dd41 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c @@ -19,6 +19,7 @@ #include "ptl_tcp_proc.h" #include "ptl_tcp_recvfrag.h" #include "ptl_tcp_sendfrag.h" +#include "ptl_tcp_sendreq.h" mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = { @@ -30,7 +31,7 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = { /* Indicate that we are a pml v1.0.0 module (which also implies a specific MCA version) */ - MCA_PML_BASE_VERSION_1_0_0, + MCA_PTL_BASE_VERSION_1_0_0, "tcp", /* MCA module name */ 1, /* MCA module major version */ @@ -280,8 +281,8 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls, /* initialize free lists */ STATIC_INIT(mca_ptl_tcp_module.tcp_send_requests, &lam_free_list_cls); lam_free_list_init_with(&mca_ptl_tcp_module.tcp_send_requests, - sizeof(mca_ptl_base_send_request_t) + sizeof(mca_ptl_tcp_send_frag_t), - &mca_ptl_base_send_request_cls, + sizeof(mca_ptl_tcp_send_request_t), + &mca_ptl_tcp_send_request_cls, mca_ptl_tcp_module.tcp_free_list_num, mca_ptl_tcp_module.tcp_free_list_max, mca_ptl_tcp_module.tcp_free_list_inc, diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c index bae560864c..36b70074cd 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -21,27 +21,29 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd); static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd); - void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag) { SUPER_INIT(frag, &mca_ptl_base_recv_frag_cls); } + void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t* frag) { SUPER_DESTROY(frag, &mca_ptl_base_recv_frag_cls); } + void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_peer_t* peer) { frag->frag_owner = &peer->peer_ptl->super; - frag->frag_match = 0; + frag->super.frag_request = 0; frag->frag_peer = peer; frag->frag_addr = 0; frag->frag_size = 0; frag->frag_hdr_cnt = 0; frag->frag_msg_cnt = 0; } + bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) { @@ -57,10 +59,14 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) return false; - /* done - do something */ + if(NULL != frag->super.frag_request) { + /* indicate completion status */ + mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super); + } return true; } + static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* non-blocking read - continue if interrupted, otherwise wait until data available */ @@ -94,12 +100,15 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; /* attempt to match a posted recv */ - /* FIX */ + mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header); /* match was not made - so allocate buffer for eager send */ - if(NULL == frag->frag_match) { + if(NULL == frag->super.frag_request) { frag->frag_addr = (unsigned char*)LAM_MALLOC(frag->frag_header.hdr_frag_length); frag->frag_size = frag->frag_header.hdr_frag_length; + } else { + frag->frag_addr = (unsigned char*)frag->super.super.frag_addr; + frag->frag_size = frag->super.super.frag_size; } return true; } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h index 9668d35fb1..f55380d8e7 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h @@ -18,12 +18,11 @@ extern lam_class_info_t mca_ptl_tcp_recv_frag_cls; struct mca_ptl_tcp_recv_frag_t { mca_ptl_base_recv_frag_t super; - struct mca_ptl_peer_t* frag_peer; unsigned char* frag_addr; size_t frag_size; size_t frag_hdr_cnt; size_t frag_msg_cnt; -#define frag_match super.frag_match +#define frag_peer super.super.frag_peer #define frag_owner super.super.frag_owner #define frag_header super.super.frag_header }; @@ -33,7 +32,8 @@ typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t*); void mca_ptl_tcp_recv_frag_destroy(mca_ptl_tcp_recv_frag_t*); bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd); -void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t*, struct mca_ptl_peer_t*); +void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_peer_t* peer); + #endif diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c index 370720df71..b5d4947e3c 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -63,8 +63,8 @@ void mca_ptl_tcp_send_frag_reinit( /* fragment state */ sendfrag->frag_owner = &ptl_peer->peer_ptl->super; sendfrag->super.frag_request = sendreq; - sendfrag->super.frag_data = sendreq->req_data + hdr->hdr_msg_offset; - sendfrag->super.frag_size = size; + sendfrag->super.super.frag_addr = sendreq->super.req_addr + hdr->hdr_msg_offset; + sendfrag->super.super.frag_size = size; sendfrag->frag_peer = ptl_peer; sendfrag->frag_vec_ptr = sendfrag->frag_vec; @@ -72,8 +72,8 @@ void mca_ptl_tcp_send_frag_reinit( sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t); sendfrag->frag_vec_cnt = 1; if(size > 0) { - sendfrag->frag_vec[1].iov_base = (lam_iov_base_ptr_t)sendfrag->super.frag_data; - sendfrag->frag_vec[1].iov_len = sendfrag->super.frag_size; + sendfrag->frag_vec[1].iov_base = (lam_iov_base_ptr_t)sendfrag->super.super.frag_addr; + sendfrag->frag_vec[1].iov_len = sendfrag->super.super.frag_size; sendfrag->frag_vec_cnt++; } } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h index f10d875010..64c63f24d0 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h @@ -18,10 +18,10 @@ extern lam_class_info_t mca_ptl_tcp_send_frag_cls; struct mca_ptl_tcp_send_frag_t { mca_ptl_base_send_frag_t super; - struct mca_ptl_peer_t* frag_peer; struct iovec *frag_vec_ptr; size_t frag_vec_cnt; struct iovec frag_vec[2]; +#define frag_peer super.super.frag_peer #define frag_header super.super.frag_header #define frag_owner super.super.frag_owner }; diff --git a/src/mpi/communicator/communicator.h b/src/mpi/communicator/communicator.h index 6dae5d1234..4da751fc16 100644 --- a/src/mpi/communicator/communicator.h +++ b/src/mpi/communicator/communicator.h @@ -47,15 +47,15 @@ typedef struct lam_communicator_t lam_communicator_t; static inline lam_communicator_t *lam_comm_lookup(uint32_t cid) { /* array of pointers to communicators, indexed by context ID */ - extern lam_communicator_t **lam_communicator_array; + extern lam_communicator_t **lam_mpi_comm_array; #ifdef LAM_ENABLE_DEBUG - extern uint32_t lam_communicator_array_len; - if(cid >= lam_communicator_array_len) { + extern uint32_t lam_mpi_comm_array_size; + if(cid >= lam_mpi_comm_array_size) { lam_output(0, "lam_comm_lookup: invalid communicator index (%d)", cid); return (lam_communicator_t *) NULL; } #endif - return lam_communicator_array[cid]; + return lam_mpi_comm_array[cid]; } static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, size_t peer_id) diff --git a/src/mpi/communicator/mpi_comm_globals.c b/src/mpi/communicator/mpi_comm_globals.c index 0d49b557f6..2208a7ced5 100644 --- a/src/mpi/communicator/mpi_comm_globals.c +++ b/src/mpi/communicator/mpi_comm_globals.c @@ -14,5 +14,8 @@ * Global variables */ +lam_communicator_t *lam_mpi_comm_array; +size_t lam_mpi_comm_array_size; + lam_communicator_t lam_mpi_comm_world; lam_communicator_t lam_mpi_comm_self;