From b577174a4a7ae2a4c93bb642ddbb40e2ffdc505d Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Thu, 15 Jul 2004 18:08:20 +0000 Subject: [PATCH] changes to PTL interface - moved pml_base_send_request_t allocation to pml - provide capability to cache requests on per ptl basis - and provide hooks for ptl to init/cleanup ptl specific data cached on the request - renamed request/fragment super fields - added ptl_send i/f which is called by pml for the first fragment, the ptl_put i/f is called for subsequent fragments of the same request This commit was SVN r1745. --- src/class/ompi_free_list.c | 4 +- src/class/ompi_free_list.h | 23 ++++- src/mca/oob/tcp/oob_tcp.c | 2 + src/mca/pml/base/pml_base_bsend.c | 4 +- src/mca/pml/base/pml_base_recvreq.c | 2 +- src/mca/pml/base/pml_base_recvreq.h | 60 ++++++------ src/mca/pml/base/pml_base_request.c | 2 +- src/mca/pml/base/pml_base_request.h | 10 +- src/mca/pml/base/pml_base_select.c | 1 - src/mca/pml/base/pml_base_sendreq.c | 2 +- src/mca/pml/base/pml_base_sendreq.h | 116 +++++++++++----------- src/mca/pml/teg/src/Makefile.am | 2 + src/mca/pml/teg/src/pml_ptl_array.h | 1 + src/mca/pml/teg/src/pml_teg.c | 37 ++++++- src/mca/pml/teg/src/pml_teg.h | 14 +-- src/mca/pml/teg/src/pml_teg_free.c | 1 + src/mca/pml/teg/src/pml_teg_iprobe.c | 20 ++-- src/mca/pml/teg/src/pml_teg_irecv.c | 6 +- src/mca/pml/teg/src/pml_teg_isend.c | 6 +- src/mca/pml/teg/src/pml_teg_module.c | 4 +- src/mca/pml/teg/src/pml_teg_proc.h | 4 +- src/mca/pml/teg/src/pml_teg_ptl.c | 23 +++++ src/mca/pml/teg/src/pml_teg_ptl.h | 18 ++++ src/mca/pml/teg/src/pml_teg_recvreq.c | 47 ++++----- src/mca/pml/teg/src/pml_teg_recvreq.h | 14 +-- src/mca/pml/teg/src/pml_teg_sendreq.c | 24 ++--- src/mca/pml/teg/src/pml_teg_sendreq.h | 103 ++++++++++++++------ src/mca/pml/teg/src/pml_teg_start.c | 2 +- src/mca/pml/teg/src/pml_teg_test.c | 1 + src/mca/pml/teg/src/pml_teg_wait.c | 1 + src/mca/ptl/base/ptl_base_match.c | 20 ++-- src/mca/ptl/base/ptl_base_recvfrag.h | 18 ++-- src/mca/ptl/base/ptl_base_sendfrag.h | 2 +- src/mca/ptl/ptl.h | 129 +++++++++++++++---------- src/mca/ptl/self/src/ptl_self.c | 88 +++++++++-------- src/mca/ptl/self/src/ptl_self.h | 14 +-- src/mca/ptl/tcp/src/ptl_tcp.c | 38 +++----- src/mca/ptl/tcp/src/ptl_tcp.h | 31 +++--- src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c | 77 ++++++++------- src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h | 73 +++++++------- src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c | 30 +++--- src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h | 43 +++++---- 42 files changed, 651 insertions(+), 466 deletions(-) create mode 100644 src/mca/pml/teg/src/pml_teg_ptl.c create mode 100644 src/mca/pml/teg/src/pml_teg_ptl.h diff --git a/src/class/ompi_free_list.c b/src/class/ompi_free_list.c index c3a3f72ffd..a64c8a6a34 100644 --- a/src/class/ompi_free_list.c +++ b/src/class/ompi_free_list.c @@ -48,7 +48,9 @@ int ompi_free_list_init( flist->fl_num_allocated = 0; flist->fl_num_per_alloc = num_elements_per_alloc; flist->fl_mpool = mpool; - return ompi_free_list_grow(flist, num_elements_to_alloc); + if(num_elements_to_alloc) + return ompi_free_list_grow(flist, num_elements_to_alloc); + return OMPI_SUCCESS; } diff --git a/src/class/ompi_free_list.h b/src/class/ompi_free_list.h index 70a0687014..884df38218 100644 --- a/src/class/ompi_free_list.h +++ b/src/class/ompi_free_list.h @@ -61,8 +61,29 @@ int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements); } +#define OMPI_FREE_LIST_WAIT(fl, item, rc) \ +{ \ + if(ompi_using_threads()) { \ + ompi_mutex_lock(&((fl)->fl_lock)); \ + item = 
ompi_list_remove_first(&((fl)->super)); \
+        if(NULL == item) { \
+            ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
+            item = ompi_list_remove_first(&((fl)->super)); \
+        } \
+        ompi_mutex_unlock(&((fl)->fl_lock)); \
+    } else { \
+        item = ompi_list_remove_first(&((fl)->super)); \
+        if(NULL == item) { \
+            ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
+            item = ompi_list_remove_first(&((fl)->super)); \
+        } \
+    } \
+    rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
+}
+
+
 #define OMPI_FREE_LIST_RETURN(fl, item) \
-    THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_append(&((fl)->super), (item)));
+    THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_prepend(&((fl)->super), (item)));
 
 #endif
diff --git a/src/mca/oob/tcp/oob_tcp.c b/src/mca/oob/tcp/oob_tcp.c
index eccd84ea0a..ee01cf7576 100644
--- a/src/mca/oob/tcp/oob_tcp.c
+++ b/src/mca/oob/tcp/oob_tcp.c
@@ -109,8 +109,10 @@ static int ompi_process_name_compare(ompi_process_name_t* n1, ompi_process_name_
 struct mca_oob_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
 {
+#if 0
     /* initialize data structures */
     ompi_rb_tree_init(&mca_oob_tcp_module.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare);
+#endif
     /* return &mca_oob_tcp; */
     return NULL;
 }
diff --git a/src/mca/pml/base/pml_base_bsend.c b/src/mca/pml/base/pml_base_bsend.c
index 641b992e71..6a5cfccf6b 100644
--- a/src/mca/pml/base/pml_base_bsend.c
+++ b/src/mca/pml/base/pml_base_bsend.c
@@ -219,8 +219,8 @@ int mca_pml_base_bsend_request_init(ompi_request_t* request, bool persistent)
     mca_pml_bsend_count++;
 
     /* set flag indicating mpi layer is done */
-    sendreq->super.req_persistent = persistent;
-    sendreq->super.req_mpi_done = true;
+    sendreq->req_base.req_persistent = persistent;
+    sendreq->req_base.req_mpi_done = true;
     OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
     return OMPI_SUCCESS;
 }
diff --git a/src/mca/pml/base/pml_base_recvreq.c b/src/mca/pml/base/pml_base_recvreq.c
index 0a06fd74d8..2464224c54 100644
--- a/src/mca/pml/base/pml_base_recvreq.c
+++ b/src/mca/pml/base/pml_base_recvreq.c
@@ -22,7 +22,7 @@ ompi_class_t mca_pml_base_recv_request_t_class = {
 static void mca_pml_base_recv_request_construct(mca_pml_base_recv_request_t* request)
 {
     /* no need to reinit for every recv -- never changes */
-    request->super.req_type = MCA_PML_REQUEST_RECV;
+    request->req_base.req_type = MCA_PML_REQUEST_RECV;
 }
 
diff --git a/src/mca/pml/base/pml_base_recvreq.h b/src/mca/pml/base/pml_base_recvreq.h
index 05f54338a5..3379f6d386 100644
--- a/src/mca/pml/base/pml_base_recvreq.h
+++ b/src/mca/pml/base/pml_base_recvreq.h
@@ -15,10 +15,10 @@ extern ompi_class_t mca_pml_base_recv_request_t_class;
  * Base type for receive requests.
  */
 struct mca_pml_base_recv_request_t {
-   mca_pml_base_request_t super;    /**< base request */
-   size_t req_bytes_packed;         /**< size of message being received */
-   size_t req_bytes_received;       /**< number of bytes received from network */
-   size_t req_bytes_delivered;      /**< number of bytes delivered to user */
+   mca_pml_base_request_t req_base; /**< base request */
+   size_t req_bytes_packed;         /**< size of message being received */
+   size_t req_bytes_received;       /**< number of bytes received from network */
+   size_t req_bytes_delivered;      /**< number of bytes delivered to user */
 };
 typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
 
@@ -35,32 +35,32 @@ typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
 * @param comm (IN)       Communicator.
 * @param persistent (IN) Is this a persistent request.
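+ *
+ * Illustrative usage (hypothetical caller; "req" is assumed to be an
+ * already-allocated mca_pml_base_recv_request_t):
+ *
+ *   MCA_PML_BASE_RECV_REQUEST_INIT(req, buf, count, datatype,
+ *       OMPI_ANY_SOURCE, tag, comm, false);
+ *
+ * The macro only initializes bookkeeping fields - matching is started
+ * separately (e.g. by mca_pml_teg_recv_request_start).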
 */
-#define MCA_PML_BASE_RECV_REQUEST_INIT( \
-    request, \
-    addr, \
-    count, \
-    datatype, \
-    src, \
-    tag, \
-    comm, \
-    persistent) \
-{ \
-    OMPI_REQUEST_INIT(&(request)->super.super); \
-    (request)->req_bytes_packed = 0; \
-    (request)->req_bytes_received = 0; \
-    (request)->req_bytes_delivered = 0; \
-    (request)->super.req_sequence = 0; \
-    (request)->super.req_addr = addr; \
-    (request)->super.req_count = count; \
-    (request)->super.req_datatype = datatype; \
-    (request)->super.req_peer = src; \
-    (request)->super.req_tag = tag; \
-    (request)->super.req_comm = comm; \
-    (request)->super.req_proc = NULL; \
-    (request)->super.req_persistent = persistent; \
-    (request)->super.req_mpi_done = false; \
-    (request)->super.req_pml_done = false; \
-    (request)->super.req_free_called = false; \
+#define MCA_PML_BASE_RECV_REQUEST_INIT(  \
+    request,                             \
+    addr,                                \
+    count,                               \
+    datatype,                            \
+    src,                                 \
+    tag,                                 \
+    comm,                                \
+    persistent)                          \
+{                                        \
+    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi);  \
+    (request)->req_bytes_packed = 0;                   \
+    (request)->req_bytes_received = 0;                 \
+    (request)->req_bytes_delivered = 0;                \
+    (request)->req_base.req_sequence = 0;              \
+    (request)->req_base.req_addr = addr;               \
+    (request)->req_base.req_count = count;             \
+    (request)->req_base.req_datatype = datatype;       \
+    (request)->req_base.req_peer = src;                \
+    (request)->req_base.req_tag = tag;                 \
+    (request)->req_base.req_comm = comm;               \
+    (request)->req_base.req_proc = NULL;               \
+    (request)->req_base.req_persistent = persistent;   \
+    (request)->req_base.req_mpi_done = false;          \
+    (request)->req_base.req_pml_done = false;          \
+    (request)->req_base.req_free_called = false;       \
 }
 
 #endif
diff --git a/src/mca/pml/base/pml_base_request.c b/src/mca/pml/base/pml_base_request.c
index ca39416343..3fe0c4e12f 100644
--- a/src/mca/pml/base/pml_base_request.c
+++ b/src/mca/pml/base/pml_base_request.c
@@ -7,7 +7,7 @@
 
 static void mca_pml_base_request_construct(mca_pml_base_request_t* req)
 {
-    req->super.req_type = OMPI_REQUEST_PML;
+    req->req_ompi.req_type = OMPI_REQUEST_PML;
 }
 
 static void mca_pml_base_request_destruct(mca_pml_base_request_t* req)
diff --git a/src/mca/pml/base/pml_base_request.h b/src/mca/pml/base/pml_base_request.h
index 21c54073d9..45dd2b3b86 100644
--- a/src/mca/pml/base/pml_base_request.h
+++ b/src/mca/pml/base/pml_base_request.h
@@ -31,17 +31,17 @@ typedef enum {
  * Base type for PML P2P requests
  */
 struct mca_pml_base_request_t {
-   ompi_request_t super;                 /**< base request */
+   ompi_request_t req_ompi;              /**< base request */
    void *req_addr;                       /**< pointer to application buffer */
    size_t req_count;                     /**< count of user datatype elements */
    int32_t req_peer;                     /**< peer process - rank w/in this communicator */
    int32_t req_tag;                      /**< user defined tag */
-   ompi_communicator_t *req_comm;        /**< communicator pointer */
-   ompi_proc_t* req_proc;                /**< peer process */
+   ompi_communicator_t *req_comm;        /**< communicator pointer */
+   ompi_proc_t* req_proc;                /**< peer process */
    mca_ptl_sequence_t req_sequence;      /**< sequence number for MPI pt-2-pt ordering */
-   ompi_datatype_t *req_datatype;        /**< pointer to data type */
+   ompi_datatype_t *req_datatype;        /**< pointer to data type */
    mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
-   ompi_status_public_t req_status;      /**< completion status */
+   ompi_status_public_t req_status;      /**< completion status */
    bool req_persistent;                  /**< flag indicating if this is a persistent request */
    volatile bool req_mpi_done;           /**< flag indicating if MPI is done with this request */
    volatile bool req_pml_done;           /**<
flag indicating if the pt-2-pt layer is done with this request */
diff --git a/src/mca/pml/base/pml_base_select.c b/src/mca/pml/base/pml_base_select.c
index 7459e0cf93..2d8e8bfaff 100644
--- a/src/mca/pml/base/pml_base_select.c
+++ b/src/mca/pml/base/pml_base_select.c
@@ -14,7 +14,6 @@
 
 typedef struct opened_module_t {
   ompi_list_item_t super;
-
   mca_pml_base_module_t *om_module;
 } opened_module_t;
 
diff --git a/src/mca/pml/base/pml_base_sendreq.c b/src/mca/pml/base/pml_base_sendreq.c
index 1d45e80f72..c01d2ac7fc 100644
--- a/src/mca/pml/base/pml_base_sendreq.c
+++ b/src/mca/pml/base/pml_base_sendreq.c
@@ -19,7 +19,7 @@ ompi_class_t mca_pml_base_send_request_t_class = {
 static void mca_pml_base_send_request_construct(mca_pml_base_send_request_t* request)
 {
     /* no need to reinit for every send -- never changes */
-    request->super.req_type = MCA_PML_REQUEST_SEND;
+    request->req_base.req_type = MCA_PML_REQUEST_SEND;
     OBJ_CONSTRUCT(&request->req_convertor, ompi_convertor_t);
 }
 
diff --git a/src/mca/pml/base/pml_base_sendreq.h b/src/mca/pml/base/pml_base_sendreq.h
index cc49fdd0be..026d10c5cd 100644
--- a/src/mca/pml/base/pml_base_sendreq.h
+++ b/src/mca/pml/base/pml_base_sendreq.h
@@ -18,17 +18,17 @@ extern ompi_class_t mca_pml_base_send_request_t_class;
  * Base type for send requests
  */
 struct mca_pml_base_send_request_t {
-   mca_pml_base_request_t super;            /** base request type - common data structure for use by wait/test */
-   size_t req_offset;                       /**< number of bytes that have already been assigned to a fragment */
-   size_t req_bytes_packed;                 /**< packed size of a message given the datatype and count */
-   size_t req_bytes_sent;                   /**< number of bytes that have been sent */
-   mca_pml_base_send_mode_t req_send_mode;  /**< type of send */
-   struct mca_ptl_t* req_owner;             /**< PTL that allocated this descriptor */
-   struct mca_ptl_base_peer_t* req_peer;    /**< PTL peer instance that will be used for first fragment */
-   ompi_ptr_t req_peer_match;               /**< matched receive at peer */
-   ompi_ptr_t req_peer_addr;
-   size_t req_peer_size;
-   ompi_convertor_t req_convertor;          /**< convertor that describes this datatype */
+   mca_pml_base_request_t req_base;         /**< base request type - common data structure for use by wait/test */
+   size_t req_offset;                       /**< number of bytes that have already been assigned to a fragment */
+   size_t req_bytes_packed;                 /**< packed size of a message given the datatype and count */
+   size_t req_bytes_sent;                   /**< number of bytes that have been sent */
+   mca_pml_base_send_mode_t req_send_mode;  /**< type of send */
+   struct mca_ptl_t* req_ptl;               /**< PTL that is selected for first fragment */
+   struct mca_ptl_base_peer_t* req_peer;    /**< PTL peer instance that will be used for first fragment */
+   ompi_ptr_t req_peer_match;               /**< matched receive at peer */
+   ompi_ptr_t req_peer_addr;                /**< peer's remote buffer address */
+   size_t req_peer_size;                    /**< size of peer's remote buffer */
+   ompi_convertor_t req_convertor;          /**< convertor that describes this datatype */
 };
 typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
 
@@ -47,52 +47,54 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
 * @param mode (IN)       Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY)
 * @param persistent (IN) Is request persistent.
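+ *
+ * Illustrative usage (hypothetical caller; "req" is assumed to have been
+ * allocated, e.g. via MCA_PML_TEG_SEND_REQUEST_ALLOC):
+ *
+ *   MCA_PML_BASE_SEND_REQUEST_INIT(req, buf, count, datatype,
+ *       dst, tag, comm, MCA_PML_BASE_SEND_STANDARD, false);
+ *
+ * For count > 0 the macro also primes the convertor and computes
+ * req_bytes_packed from the datatype.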
*/ -#define MCA_PML_BASE_SEND_REQUEST_INIT( \ - request, \ - addr, \ - count, \ - datatype, \ - peer, \ - tag, \ - comm, \ - mode,\ - persistent) \ -{ \ - OMPI_REQUEST_INIT(&(request)->super.super); \ - request->req_offset = 0; \ - request->req_bytes_sent = 0; \ - request->req_send_mode = mode; \ - request->req_peer_match.lval = 0; \ - request->req_peer_addr.lval = 0; \ - request->req_peer_size = 0; \ - request->super.req_addr = addr; \ - request->super.req_count = count; \ - request->super.req_datatype = datatype; \ - request->super.req_peer = peer; \ - request->super.req_tag = tag; \ - request->super.req_comm = comm; \ - request->super.req_proc = ompi_comm_peer_lookup(comm,peer); \ - request->super.req_persistent = persistent; \ - request->super.req_mpi_done = false; \ - request->super.req_pml_done = false; \ - request->super.req_free_called = false; \ -\ - /* initialize datatype convertor for this request */ \ - if(count > 0) { \ - int packed_size; \ - ompi_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); \ - ompi_convertor_init_for_send( \ - &request->req_convertor, \ - 0, \ - request->super.req_datatype, \ - request->super.req_count, \ - request->super.req_addr, \ - 0); \ - ompi_convertor_get_packed_size(&request->req_convertor, &packed_size); \ - request->req_bytes_packed = packed_size; \ - } else { \ - request->req_bytes_packed = 0; \ - } \ +#define MCA_PML_BASE_SEND_REQUEST_INIT( \ + request, \ + addr, \ + count, \ + datatype, \ + peer, \ + tag, \ + comm, \ + mode, \ + persistent) \ +{ \ + OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \ + request->req_offset = 0; \ + request->req_bytes_sent = 0; \ + request->req_send_mode = mode; \ + request->req_peer_match.lval = 0; \ + request->req_peer_addr.lval = 0; \ + request->req_peer_size = 0; \ + request->req_base.req_addr = addr; \ + request->req_base.req_count = count; \ + request->req_base.req_datatype = datatype; \ + request->req_base.req_peer = peer; \ + request->req_base.req_tag = tag; \ + request->req_base.req_comm = comm; \ + request->req_base.req_proc = ompi_comm_peer_lookup(comm,peer); \ + request->req_base.req_persistent = persistent; \ + request->req_base.req_mpi_done = false; \ + request->req_base.req_pml_done = false; \ + request->req_base.req_free_called = false; \ + \ + /* initialize datatype convertor for this request */ \ + if(count > 0) { \ + int packed_size; \ + ompi_convertor_copy( \ + request->req_base.req_proc->proc_convertor, \ + &request->req_convertor); \ + ompi_convertor_init_for_send( \ + &request->req_convertor, \ + 0, \ + request->req_base.req_datatype, \ + request->req_base.req_count, \ + request->req_base.req_addr, \ + 0); \ + ompi_convertor_get_packed_size(&request->req_convertor, &packed_size); \ + request->req_bytes_packed = packed_size; \ + } else { \ + request->req_bytes_packed = 0; \ + } \ } diff --git a/src/mca/pml/teg/src/Makefile.am b/src/mca/pml/teg/src/Makefile.am index c6412ce47a..5264c4bd93 100644 --- a/src/mca/pml/teg/src/Makefile.am +++ b/src/mca/pml/teg/src/Makefile.am @@ -15,6 +15,8 @@ libmca_pml_teg_la_SOURCES = \ pml_teg_irecv.c \ pml_teg_isend.c \ pml_teg_module.c \ + pml_teg_ptl.c \ + pml_teg_ptl.h \ pml_teg_proc.c \ pml_teg_proc.h \ pml_teg_progress.c \ diff --git a/src/mca/pml/teg/src/pml_ptl_array.h b/src/mca/pml/teg/src/pml_ptl_array.h index 5c6a237114..6902433667 100644 --- a/src/mca/pml/teg/src/pml_ptl_array.h +++ b/src/mca/pml/teg/src/pml_ptl_array.h @@ -20,6 +20,7 @@ extern ompi_class_t mca_pml_teg_ptl_array_t_class; struct mca_ptl_proc_t { 
   int ptl_weight;                        /**< PTL weight for scheduling */
   struct mca_ptl_base_peer_t* ptl_peer;  /**< PTL addressing info */
+  struct mca_pml_base_ptl_t* ptl_base;   /**< PML specific PTL info */
   mca_ptl_t *ptl;                        /**< PTL implementation */
 };
 typedef struct mca_ptl_proc_t mca_ptl_proc_t;
diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c
index ec3bf9a4cc..acb0ac84aa 100644
--- a/src/mca/pml/teg/src/pml_teg.c
+++ b/src/mca/pml/teg/src/pml_teg.c
@@ -13,6 +13,7 @@
 #include "mca/ptl/base/ptl_base_sendfrag.h"
 #include "pml_teg.h"
 #include "pml_teg_proc.h"
+#include "pml_teg_ptl.h"
 #include "pml_teg_recvreq.h"
 #include "pml_teg_sendreq.h"
 
@@ -84,6 +85,7 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls)
     /* build an array of ptls and ptl modules */
     mca_ptl_base_selected_module_t* selected_ptl;
     size_t num_ptls = ompi_list_get_size(ptls);
+    size_t cache_bytes = 0;
     mca_pml_teg.teg_num_ptls = 0;
     mca_pml_teg.teg_num_ptl_modules = 0;
     mca_pml_teg.teg_ptls = malloc(sizeof(mca_ptl_t*) * num_ptls);
@@ -108,8 +110,24 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls)
         ptl->ptl_match = mca_ptl_base_recv_frag_match;
         ptl->ptl_send_progress = mca_pml_teg_send_request_progress;
         ptl->ptl_recv_progress = mca_pml_teg_recv_request_progress;
+        ptl->ptl_stack = ptl;
+        ptl->ptl_base = NULL;
+
+        /* find maximum required size for cache */
+        if(ptl->ptl_cache_bytes > cache_bytes)
+            cache_bytes = ptl->ptl_cache_bytes;
     }
 
+    /* setup free list of send requests - sized to include the largest per-ptl cache block */
+    ompi_free_list_init(
+        &mca_pml_teg.teg_send_requests,
+        sizeof(mca_pml_base_send_request_t) + cache_bytes,
+        OBJ_CLASS(mca_pml_base_send_request_t),
+        mca_pml_teg.teg_free_list_num,
+        mca_pml_teg.teg_free_list_max,
+        mca_pml_teg.teg_free_list_inc,
+        NULL);
+
     /* sort ptl list by exclusivity */
     qsort(mca_pml_teg.teg_ptls, mca_pml_teg.teg_num_ptls, sizeof(struct mca_ptl_t*), ptl_exclusivity_compare);
     return OMPI_SUCCESS;
@@ -199,11 +217,6 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
             proc_ptl->ptl = ptl;
             proc_ptl->ptl_peer = ptl_peers[p];
             proc_ptl->ptl_weight = 0;
-
-            /* if this ptl supports exclusive access then don't allow
-             * subsequent ptls to register
-             */
-            proc_pml->proc_ptl_flags |= ptl->ptl_flags;
         }
     }
 
@@ -245,12 +258,25 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
             struct mca_ptl_proc_t* proc_ptl = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, n_index);
             struct mca_ptl_t *ptl = proc_ptl->ptl;
             double weight;
+
+            /* compute weighting factor for this ptl */
             if(ptl->ptl_bandwidth)
                 weight = proc_ptl->ptl->ptl_bandwidth / total_bandwidth;
             else
                 weight = 1.0 / n_size;
             proc_ptl->ptl_weight = (int)(weight * 100);
 
+            /*
+             * save/create ptl extension for use by pml
+             */
+            proc_ptl->ptl_base = ptl->ptl_base;
+            if(NULL == proc_ptl->ptl_base && ptl->ptl_cache_bytes > 0) {
+                mca_pml_base_ptl_t* ptl_base = OBJ_NEW(mca_pml_base_ptl_t);
+                ptl_base->ptl = ptl;
+                ptl_base->ptl_cache_size = ptl->ptl_cache_size;
+                proc_ptl->ptl_base = ptl->ptl_base = ptl_base;
+            }
+
             /* check to see if this ptl is already in the array of ptls used for first
              * fragments - if not add it.
*/ @@ -270,6 +296,7 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs) *proc_new = *proc_ptl; } } + } } return OMPI_SUCCESS; diff --git a/src/mca/pml/teg/src/pml_teg.h b/src/mca/pml/teg/src/pml_teg.h index d493c29673..6b280d6190 100644 --- a/src/mca/pml/teg/src/pml_teg.h +++ b/src/mca/pml/teg/src/pml_teg.h @@ -42,7 +42,8 @@ struct mca_pml_teg_t { int teg_free_list_inc; /* number of elements to grow free list */ int teg_poll_iterations; /* number of iterations to poll for completion */ - /* free list of recv requests */ + /* free list of requests */ + ompi_free_list_t teg_send_requests; ompi_free_list_t teg_recv_requests; #if MCA_PML_TEG_STATISTICS @@ -56,9 +57,9 @@ struct mca_pml_teg_t { #endif /* request completion */ - ompi_mutex_t teg_request_lock; - ompi_condition_t teg_request_cond; - volatile int teg_request_waiting; + ompi_mutex_t teg_request_lock; + ompi_condition_t teg_request_cond; + volatile int teg_request_waiting; mca_pml_base_request_t teg_request_null; }; typedef struct mca_pml_teg_t mca_pml_teg_t; @@ -258,7 +259,7 @@ extern int mca_pml_teg_free( if(pml_request->req_free_called) { \ MCA_PML_TEG_FREE(request); \ } else { \ - pml_request->super.req_state = OMPI_REQUEST_INACTIVE; \ + pml_request->req_ompi.req_state = OMPI_REQUEST_INACTIVE; \ } \ } else { \ MCA_PML_TEG_FREE(request); \ @@ -277,11 +278,10 @@ extern int mca_pml_teg_free( case MCA_PML_REQUEST_SEND: \ { \ mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)pml_request; \ - mca_ptl_t* ptl = sendreq->req_owner; \ if(sendreq->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \ mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ } \ - ptl->ptl_request_return(ptl, sendreq); \ + MCA_PML_TEG_SEND_REQUEST_RETURN(sendreq); \ break; \ } \ case MCA_PML_REQUEST_RECV: \ diff --git a/src/mca/pml/teg/src/pml_teg_free.c b/src/mca/pml/teg/src/pml_teg_free.c index af2e8c18a5..1254bfb8e3 100644 --- a/src/mca/pml/teg/src/pml_teg_free.c +++ b/src/mca/pml/teg/src/pml_teg_free.c @@ -1,4 +1,5 @@ #include "pml_teg.h" +#include "pml_teg_sendreq.h" #include "mca/pml/base/pml_base_request.h" diff --git a/src/mca/pml/teg/src/pml_teg_iprobe.c b/src/mca/pml/teg/src/pml_teg_iprobe.c index 579ff818c7..6199c4eff4 100644 --- a/src/mca/pml/teg/src/pml_teg_iprobe.c +++ b/src/mca/pml/teg/src/pml_teg_iprobe.c @@ -11,8 +11,8 @@ int mca_pml_teg_iprobe( int rc; mca_pml_base_recv_request_t recvreq; - recvreq.super.super.req_type = OMPI_REQUEST_PML; - recvreq.super.req_type = MCA_PML_REQUEST_IPROBE; + recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML; + recvreq.req_base.req_type = MCA_PML_REQUEST_IPROBE; MCA_PML_BASE_RECV_REQUEST_INIT( &recvreq, NULL, @@ -27,8 +27,8 @@ int mca_pml_teg_iprobe( OBJ_DESTRUCT(&recvreq); return rc; } - if((*matched = recvreq.super.req_mpi_done) == true && (NULL != status)) { - *status = recvreq.super.req_status; + if((*matched = recvreq.req_base.req_mpi_done) == true && (NULL != status)) { + *status = recvreq.req_base.req_status; } return OMPI_SUCCESS; } @@ -42,8 +42,8 @@ int mca_pml_teg_probe( { int rc; mca_pml_base_recv_request_t recvreq; - recvreq.super.super.req_type = OMPI_REQUEST_PML; - recvreq.super.req_type = MCA_PML_REQUEST_PROBE; + recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML; + recvreq.req_base.req_type = MCA_PML_REQUEST_PROBE; MCA_PML_BASE_RECV_REQUEST_INIT( &recvreq, NULL, @@ -59,25 +59,25 @@ int mca_pml_teg_probe( return rc; } - if(recvreq.super.req_mpi_done == false) { + if(recvreq.req_base.req_mpi_done == false) { /* give up and sleep until completion */ 
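+        /* a non-zero teg_request_waiting tells the completion callbacks
+         * (e.g. mca_pml_teg_recv_request_progress) to broadcast on
+         * teg_request_cond, waking this thread once req_mpi_done is set */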
if(ompi_using_threads()) { ompi_mutex_lock(&mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting++; - while(recvreq.super.req_mpi_done == false) + while(recvreq.req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; ompi_mutex_unlock(&mca_pml_teg.teg_request_lock); } else { mca_pml_teg.teg_request_waiting++; - while(recvreq.super.req_mpi_done == false) + while(recvreq.req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; } } if(NULL != status) { - *status = recvreq.super.req_status; + *status = recvreq.req_base.req_status; } return OMPI_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_irecv.c b/src/mca/pml/teg/src/pml_teg_irecv.c index 1fe6f8f267..179b520c4e 100644 --- a/src/mca/pml/teg/src/pml_teg_irecv.c +++ b/src/mca/pml/teg/src/pml_teg_irecv.c @@ -101,18 +101,18 @@ int mca_pml_teg_recv( return rc; } - if(recvreq->super.req_mpi_done == false) { + if(recvreq->req_base.req_mpi_done == false) { /* give up and sleep until completion */ if(ompi_using_threads()) { ompi_mutex_lock(&mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting++; - while(recvreq->super.req_mpi_done == false) + while(recvreq->req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; ompi_mutex_unlock(&mca_pml_teg.teg_request_lock); } else { mca_pml_teg.teg_request_waiting++; - while(recvreq->super.req_mpi_done == false) + while(recvreq->req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; } diff --git a/src/mca/pml/teg/src/pml_teg_isend.c b/src/mca/pml/teg/src/pml_teg_isend.c index 896b05562b..dd365cd594 100644 --- a/src/mca/pml/teg/src/pml_teg_isend.c +++ b/src/mca/pml/teg/src/pml_teg_isend.c @@ -113,18 +113,18 @@ int mca_pml_teg_send( return rc; } - if(sendreq->super.req_mpi_done == false) { + if(sendreq->req_base.req_mpi_done == false) { /* give up and sleep until completion */ if(ompi_using_threads()) { ompi_mutex_lock(&mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting++; - while(sendreq->super.req_mpi_done == false) + while(sendreq->req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; ompi_mutex_unlock(&mca_pml_teg.teg_request_lock); } else { mca_pml_teg.teg_request_waiting++; - while(sendreq->super.req_mpi_done == false) + while(sendreq->req_base.req_mpi_done == false) ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting--; } diff --git a/src/mca/pml/teg/src/pml_teg_module.c b/src/mca/pml/teg/src/pml_teg_module.c index eb868e222d..10baf1cfce 100644 --- a/src/mca/pml/teg/src/pml_teg_module.c +++ b/src/mca/pml/teg/src/pml_teg_module.c @@ -62,6 +62,7 @@ int mca_pml_teg_module_open(void) { mca_pml_base_request_t* teg_null = &mca_pml_teg.teg_request_null; OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, ompi_mutex_t); + OBJ_CONSTRUCT(&mca_pml_teg.teg_send_requests, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_teg.teg_recv_requests, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t); @@ -124,6 +125,7 @@ int mca_pml_teg_module_close(void) free(mca_pml_teg.teg_ptl_modules); if(NULL != mca_pml_teg.teg_ptls) free(mca_pml_teg.teg_ptls); + 
OBJ_DESTRUCT(&mca_pml_teg.teg_send_requests); OBJ_DESTRUCT(&mca_pml_teg.teg_recv_requests); OBJ_DESTRUCT(&mca_pml_teg.teg_procs); OBJ_DESTRUCT(&mca_pml_teg.teg_lock); @@ -153,7 +155,7 @@ mca_pml_t* mca_pml_teg_module_init(int* priority, mca_pml_teg.teg_free_list_max, mca_pml_teg.teg_free_list_inc, NULL); - + /* request completion */ OBJ_CONSTRUCT(&mca_pml_teg.teg_request_lock, ompi_mutex_t); OBJ_CONSTRUCT(&mca_pml_teg.teg_request_cond, ompi_condition_t); diff --git a/src/mca/pml/teg/src/pml_teg_proc.h b/src/mca/pml/teg/src/pml_teg_proc.h index d8042e9352..1d873bbb40 100644 --- a/src/mca/pml/teg/src/pml_teg_proc.h +++ b/src/mca/pml/teg/src/pml_teg_proc.h @@ -19,8 +19,8 @@ */ struct mca_pml_proc_t { ompi_list_item_t super; - ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */ - ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */ + ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */ + ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */ mca_ptl_array_t proc_ptl_first; /**< array of ptls to use for first fragments */ mca_ptl_array_t proc_ptl_next; /**< array of ptls to use for remaining fragments */ uint32_t proc_ptl_flags; /**< aggregate ptl flags */ diff --git a/src/mca/pml/teg/src/pml_teg_ptl.c b/src/mca/pml/teg/src/pml_teg_ptl.c new file mode 100644 index 0000000000..1b002e7bb3 --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_ptl.c @@ -0,0 +1,23 @@ +#include "pml_teg_ptl.h" + + +static void mca_pml_base_ptl_construct(mca_pml_base_ptl_t* ptl) +{ + OBJ_CONSTRUCT(&ptl->ptl_cache, ompi_list_t); + OBJ_CONSTRUCT(&ptl->ptl_cache_lock, ompi_mutex_t); + ptl->ptl = NULL; +} + +static void mca_pml_base_ptl_destruct(mca_pml_base_ptl_t* ptl) +{ + OBJ_DESTRUCT(&ptl->ptl_cache); + OBJ_DESTRUCT(&ptl->ptl_cache_lock); +} + +OBJ_CLASS_INSTANCE( + mca_pml_base_ptl_t, + ompi_list_t, + mca_pml_base_ptl_construct, + mca_pml_base_ptl_destruct +); + diff --git a/src/mca/pml/teg/src/pml_teg_ptl.h b/src/mca/pml/teg/src/pml_teg_ptl.h new file mode 100644 index 0000000000..c6beff31ad --- /dev/null +++ b/src/mca/pml/teg/src/pml_teg_ptl.h @@ -0,0 +1,18 @@ +#ifndef _MCA_PML_BASE_PTL_ +#define _MCA_PML_BASE_PTL_ + +#include "mca/ptl/ptl.h" + +struct mca_pml_base_ptl_t { + ompi_list_t ptl_cache; /**< cache of send requests */ + size_t ptl_cache_size; /**< maximum size of cache */ + ompi_mutex_t ptl_cache_lock; /**< lock for queue access */ + struct mca_ptl_t* ptl; /**< back pointer to ptl */ +}; +typedef struct mca_pml_base_ptl_t mca_pml_base_ptl_t; + +OBJ_CLASS_DECLARATION(mca_pml_base_ptl_t); + + +#endif + diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.c b/src/mca/pml/teg/src/pml_teg_recvreq.c index c10348c7b1..3b5945e22e 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/pml/teg/src/pml_teg_recvreq.c @@ -14,19 +14,20 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc( void mca_pml_teg_recv_request_progress( struct mca_ptl_t* ptl, mca_pml_base_recv_request_t* req, - mca_ptl_base_recv_frag_t* frag) + size_t bytes_received, + size_t bytes_delivered) { OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock); - req->req_bytes_delivered += frag->super.frag_size; - req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length; + req->req_bytes_received += bytes_received; + req->req_bytes_delivered += bytes_delivered; if (req->req_bytes_received >= req->req_bytes_packed) { /* initialize request status */ - req->super.req_status.MPI_SOURCE = req->super.req_peer; - req->super.req_status.MPI_TAG = req->super.req_tag; - 
req->super.req_status.MPI_ERROR = OMPI_SUCCESS; - req->super.req_status._count = req->req_bytes_delivered; - req->super.req_pml_done = true; - req->super.req_mpi_done = true; + req->req_base.req_status.MPI_SOURCE = req->req_base.req_peer; + req->req_base.req_status.MPI_TAG = req->req_base.req_tag; + req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_base.req_status._count = req->req_bytes_delivered; + req->req_base.req_pml_done = true; + req->req_base.req_mpi_done = true; if(mca_pml_teg.teg_request_waiting) { #if MCA_PML_TEG_STATISTICS mca_pml_teg.teg_condition_broadcasts++; @@ -46,20 +47,20 @@ void mca_pml_teg_recv_request_progress( void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* request) { - ompi_communicator_t *comm = request->super.req_comm; + ompi_communicator_t *comm = request->req_base.req_comm; mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm; - int req_peer = request->super.req_peer; + int req_peer = request->req_base.req_peer; mca_ptl_base_recv_frag_t* frag; /* check for a specific match */ OMPI_THREAD_LOCK(&pml_comm->c_matching_lock); /* assign sequence number */ - request->super.req_sequence = pml_comm->c_recv_seq++; + request->req_base.req_sequence = pml_comm->c_recv_seq++; if (ompi_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 && (frag = mca_pml_teg_recv_request_match_specific_proc(request, req_peer)) != NULL) { - mca_ptl_t* ptl = frag->super.frag_owner; + mca_ptl_t* ptl = frag->frag_base.frag_owner; OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); ptl->ptl_matched(ptl, frag); return; /* match found */ @@ -68,7 +69,7 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques /* We didn't find any matches. Record this irecv so we can match * it when the message comes in. */ - if(request->super.req_type != MCA_PML_REQUEST_IPROBE) + if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE) ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request); OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); } @@ -81,7 +82,7 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) { - ompi_communicator_t *comm = request->super.req_comm; + ompi_communicator_t *comm = request->req_base.req_comm; mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm; int proc_count = comm->c_remote_group->grp_proc_count; int proc; @@ -95,7 +96,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) OMPI_THREAD_LOCK(&pml_comm->c_matching_lock); /* assign sequence number */ - request->super.req_sequence = pml_comm->c_recv_seq++; + request->req_base.req_sequence = pml_comm->c_recv_seq++; for (proc = 0; proc < proc_count; proc++) { mca_ptl_base_recv_frag_t* frag; @@ -106,7 +107,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) /* loop over messages from the current proc */ if ((frag = mca_pml_teg_recv_request_match_specific_proc(request, proc)) != NULL) { - mca_ptl_t* ptl = frag->super.frag_owner; + mca_ptl_t* ptl = frag->frag_base.frag_owner; OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); ptl->ptl_matched(ptl, frag); return; /* match found */ @@ -117,7 +118,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) * it when the message comes in. 
*/ - if(request->super.req_type != MCA_PML_REQUEST_IPROBE) + if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE) ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request); OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); } @@ -131,15 +132,15 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request) static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc( mca_pml_base_recv_request_t* request, int proc) { - mca_pml_ptl_comm_t *pml_comm = request->super.req_comm->c_pml_comm; + mca_pml_ptl_comm_t *pml_comm = request->req_base.req_comm->c_pml_comm; ompi_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc; mca_ptl_base_recv_frag_t* frag; - int tag = request->super.req_tag; + int tag = request->req_base.req_tag; for (frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_first(unexpected_frags); frag != (mca_ptl_base_recv_frag_t*)ompi_list_get_end(unexpected_frags); frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_next(frag)) { - mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match; + mca_ptl_base_match_header_t* header = &frag->frag_base.frag_header.hdr_match; /* check first frag - we assume that process matching has been done already */ if (((tag == OMPI_ANY_TAG) || (tag == header->hdr_tag))) { @@ -149,8 +150,8 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc( } ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag); request->req_bytes_packed = header->hdr_msg_length; - request->super.req_tag = header->hdr_tag; - request->super.req_peer = header->hdr_src; + request->req_base.req_tag = header->hdr_tag; + request->req_base.req_peer = header->hdr_src; frag->frag_request = request; return frag; } diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.h b/src/mca/pml/teg/src/pml_teg_recvreq.h index 40023f6de0..d1329eeb07 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.h +++ b/src/mca/pml/teg/src/pml_teg_recvreq.h @@ -58,8 +58,8 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques */ static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* request) { - request->super.super.req_state = OMPI_REQUEST_ACTIVE; - if(request->super.req_peer == OMPI_ANY_SOURCE) { + request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; + if(request->req_base.req_peer == OMPI_ANY_SOURCE) { mca_pml_teg_recv_request_match_wild(request); } else { mca_pml_teg_recv_request_match_specific(request); @@ -71,14 +71,16 @@ static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* re * Update status of a recv request based on the completion status of * the receive fragment. * - * @param ptl (IN) The PTL pointer. - * @param request (IN) Receive request. - * @param frag (IN) Receive fragment. + * @param ptl (IN) The PTL pointer. + * @param request (IN) Receive request. + * @param bytes_received (IN) Bytes received from peer. + * @param bytes_delivered (IN) Bytes delivered to application. 
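+ *
+ * Illustrative call from a PTL once a fragment completes (names are those
+ * of the base receive fragment - actual fragment layouts vary by PTL):
+ *
+ *   ptl->ptl_recv_progress(ptl, request,
+ *       frag->frag_base.frag_header.hdr_frag.hdr_frag_length,
+ *       frag->frag_base.frag_size);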
*/ void mca_pml_teg_recv_request_progress( struct mca_ptl_t* ptl, mca_pml_base_recv_request_t* request, - mca_ptl_base_recv_frag_t* frag + size_t bytes_received, + size_t bytes_delivered ); #endif diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.c b/src/mca/pml/teg/src/pml_teg_sendreq.c index 16d91109c0..d6a19cf513 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/pml/teg/src/pml_teg_sendreq.c @@ -20,7 +20,7 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req) { - ompi_proc_t *proc = ompi_comm_peer_lookup(req->super.req_comm, req->super.req_peer); + ompi_proc_t *proc = ompi_comm_peer_lookup(req->req_base.req_comm, req->req_base.req_peer); mca_pml_proc_t* proc_pml = proc->proc_pml; /* allocate remaining bytes to PTLs */ @@ -60,7 +60,7 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req) /* unable to complete send - signal request failed */ if(bytes_remaining > 0) { OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock); - req->super.req_mpi_done = true; + req->req_base.req_mpi_done = true; /* FIX - set status correctly */ if(mca_pml_teg.teg_request_waiting) ompi_condition_broadcast(&mca_pml_teg.teg_request_cond); @@ -82,24 +82,24 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req) void mca_pml_teg_send_request_progress( struct mca_ptl_t* ptl, mca_pml_base_send_request_t* req, - mca_ptl_base_send_frag_t* frag) + size_t bytes_sent) { bool first_frag; OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock); first_frag = (req->req_bytes_sent == 0 && req->req_bytes_packed > 0); - req->req_bytes_sent += frag->super.frag_size; + req->req_bytes_sent += bytes_sent; if (req->req_bytes_sent >= req->req_bytes_packed) { - req->super.req_pml_done = true; - if (req->super.req_mpi_done == false) { - req->super.req_status.MPI_SOURCE = req->super.req_comm->c_my_rank; - req->super.req_status.MPI_TAG = req->super.req_tag; - req->super.req_status.MPI_ERROR = OMPI_SUCCESS; - req->super.req_status._count = req->req_bytes_sent; - req->super.req_mpi_done = true; + req->req_base.req_pml_done = true; + if (req->req_base.req_mpi_done == false) { + req->req_base.req_status.MPI_SOURCE = req->req_base.req_comm->c_my_rank; + req->req_base.req_status.MPI_TAG = req->req_base.req_tag; + req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_base.req_status._count = req->req_bytes_sent; + req->req_base.req_mpi_done = true; if(mca_pml_teg.teg_request_waiting) { ompi_condition_broadcast(&mca_pml_teg.teg_request_cond); } - } else if (req->super.req_free_called) { + } else if (req->req_base.req_free_called) { MCA_PML_TEG_FREE((ompi_request_t**)&req); } OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.h b/src/mca/pml/teg/src/pml_teg_sendreq.h index 76acbd75a1..10635bf9ba 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.h +++ b/src/mca/pml/teg/src/pml_teg_sendreq.h @@ -7,48 +7,97 @@ #ifndef OMPI_PML_TEG_SEND_REQUEST_H #define OMPI_PML_TEG_SEND_REQUEST_H -#include "pml_teg_proc.h" #include "mca/ptl/ptl.h" #include "mca/pml/base/pml_base_sendreq.h" #include "mca/ptl/base/ptl_base_sendfrag.h" #include "mca/ptl/base/ptl_base_comm.h" +#include "pml_teg_proc.h" +#include "pml_teg_ptl.h" - -#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \ - comm, \ - dst, \ - sendreq, \ - rc) \ -{ \ - mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \ - mca_ptl_proc_t* ptl_proc; \ - mca_ptl_t* ptl; \ -\ - THREAD_SCOPED_LOCK(&proc->proc_lock, \ - (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \ - ptl = 
ptl_proc->ptl; \
-    rc = ptl->ptl_request_alloc(ptl,&sendreq); \
-    if(NULL != sendreq) \
-        sendreq->req_peer = ptl_proc->ptl_peer; \
+#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \
+    comm, \
+    dst, \
+    sendreq, \
+    rc) \
+{ \
+    mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \
+    mca_ptl_proc_t* ptl_proc; \
+    mca_pml_base_ptl_t* ptl_base; \
+ \
+    THREAD_SCOPED_LOCK(&proc->proc_lock, \
+        (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
+    ptl_base = ptl_proc->ptl_base; \
+    /* \
+     * check to see if there is a cache of send requests associated with \
+     * this ptl - if so try the allocation from there. \
+     */ \
+    if(NULL != ptl_base) { \
+        OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \
+        sendreq = (mca_pml_base_send_request_t*) \
+            ompi_list_remove_first(&ptl_base->ptl_cache); \
+        OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \
+        if(NULL != sendreq) { \
+            /* update the peer - a cached request may have last been \
+             * used for a different destination */ \
+            sendreq->req_peer = ptl_proc->ptl_peer; \
+            rc = OMPI_SUCCESS; \
+        } else { \
+            mca_ptl_t* ptl = ptl_base->ptl; \
+            ompi_list_item_t* item; \
+            OMPI_FREE_LIST_WAIT(&mca_pml_teg.teg_send_requests, item, rc); \
+            sendreq = (mca_pml_base_send_request_t*)item; \
+            sendreq->req_ptl = ptl; \
+            sendreq->req_peer = ptl_proc->ptl_peer; \
+            ptl->ptl_request_init(ptl, sendreq); \
+        } \
+ \
+    /* otherwise - take the allocation from the global list */ \
+    } else { \
+        ompi_list_item_t* item; \
+        OMPI_FREE_LIST_WAIT(&mca_pml_teg.teg_send_requests, item, rc); \
+        sendreq = (mca_pml_base_send_request_t*)item; \
+        sendreq->req_ptl = ptl_proc->ptl; \
+        sendreq->req_peer = ptl_proc->ptl_peer; \
+    } \
 }
 
-
-#define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \
-    request->req_owner->ptl_request_return(request->req_owner, request);
+
+#define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \
+{ \
+    mca_ptl_t* ptl = (request)->req_ptl; \
+    mca_pml_base_ptl_t* ptl_base = ptl->ptl_base; \
+    /* \
+     * If there is a cache associated with the ptl - first attempt \
+     * to return the send descriptor to the cache. \
+     */ \
+    if(NULL != ptl_base) { \
+        OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \
+        if(ompi_list_get_size(&ptl_base->ptl_cache) >= ptl_base->ptl_cache_size) { \
+            /* if cache limit is exceeded - return to global pool */ \
+            ptl->ptl_request_fini(ptl, (request)); \
+            OMPI_FREE_LIST_RETURN( \
+                &mca_pml_teg.teg_send_requests, (ompi_list_item_t*)(request)); \
+        } else { \
+            ompi_list_prepend(&ptl_base->ptl_cache, (ompi_list_item_t*)(request)); \
+        } \
+        OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \
+    } else { \
+        OMPI_FREE_LIST_RETURN( \
+            &mca_pml_teg.teg_send_requests, (ompi_list_item_t*)(request)); \
+    } \
+}
 
 static inline int mca_pml_teg_send_request_start(
     mca_pml_base_send_request_t* req)
 {
-    mca_ptl_t* ptl = req->req_owner;
+    mca_ptl_t* ptl = req->req_ptl;
     size_t first_fragment_size = ptl->ptl_first_frag_size;
     size_t offset = req->req_offset;
     int flags, rc;
 
     /* initialize request state and message sequence number */
-    req->super.super.req_state = OMPI_REQUEST_ACTIVE;
-    req->super.req_sequence = mca_pml_ptl_comm_send_sequence(
-        req->super.req_comm->c_pml_comm,
-        req->super.req_peer);
+    req->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
+    req->req_base.req_sequence = mca_pml_ptl_comm_send_sequence(
+        req->req_base.req_comm->c_pml_comm,
+        req->req_base.req_peer);
 
     /* start the first fragment */
     if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) {
@@ -59,7 +108,7 @@ static inline int mca_pml_teg_send_request_start(
         /* require match for first fragment of a multi-fragment */
         flags = MCA_PTL_FLAGS_ACK_MATCHED;
     }
-    rc = ptl->ptl_put(ptl, req->req_peer, req, offset, first_fragment_size, flags);
+    rc = ptl->ptl_send(ptl, req->req_peer, req, offset, first_fragment_size, flags);
     if(rc != OMPI_SUCCESS)
         return rc;
     return OMPI_SUCCESS;
@@ -72,7 +121,7 @@
 void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req);
 
 void mca_pml_teg_send_request_progress(
     struct mca_ptl_t* ptl,
     mca_pml_base_send_request_t* send_request,
-    mca_ptl_base_send_frag_t* send_frag
+    size_t bytes_sent
 );
 
 #endif
diff --git a/src/mca/pml/teg/src/pml_teg_start.c b/src/mca/pml/teg/src/pml_teg_start.c
index 4e0579269e..0f24710adc 100644
--- a/src/mca/pml/teg/src/pml_teg_start.c
+++ b/src/mca/pml/teg/src/pml_teg_start.c
@@ -18,7 +18,7 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
      * when the request completes - and create a new request.
*/ - switch(pml_request->super.req_state) { + switch(pml_request->req_ompi.req_state) { case OMPI_REQUEST_INVALID: return OMPI_ERR_REQUEST; case OMPI_REQUEST_INACTIVE: diff --git a/src/mca/pml/teg/src/pml_teg_test.c b/src/mca/pml/teg/src/pml_teg_test.c index a51f5685b9..7aaa8d4484 100644 --- a/src/mca/pml/teg/src/pml_teg_test.c +++ b/src/mca/pml/teg/src/pml_teg_test.c @@ -1,4 +1,5 @@ #include "pml_teg.h" +#include "pml_teg_sendreq.h" int mca_pml_teg_test( diff --git a/src/mca/pml/teg/src/pml_teg_wait.c b/src/mca/pml/teg/src/pml_teg_wait.c index 1b7ea8c110..47cb3f740a 100644 --- a/src/mca/pml/teg/src/pml_teg_wait.c +++ b/src/mca/pml/teg/src/pml_teg_wait.c @@ -1,4 +1,5 @@ #include "pml_teg.h" +#include "pml_teg_sendreq.h" #include "mca/ptl/base/ptl_base_comm.h" #include "mca/pml/base/pml_base_request.h" diff --git a/src/mca/ptl/base/ptl_base_match.c b/src/mca/ptl/base/ptl_base_match.c index 03953e63ed..56daf00254 100644 --- a/src/mca/ptl/base/ptl_base_match.c +++ b/src/mca/ptl/base/ptl_base_match.c @@ -264,7 +264,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match( wild_recv = (mca_pml_base_recv_request_t *) ((ompi_list_item_t *)wild_recv)->ompi_list_next) { - recv_tag = wild_recv->super.req_tag; + recv_tag = wild_recv->req_base.req_tag; if ( /* exact tag match */ (frag_tag == recv_tag) || @@ -332,7 +332,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat /* * Check for a match */ - recv_tag = specific_recv->super.req_tag; + recv_tag = specific_recv->req_base.req_tag; if ( (frag_tag == recv_tag) || ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { @@ -391,8 +391,8 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive wild_recv = (mca_pml_base_recv_request_t *) ompi_list_get_first(&(pml_comm->c_wild_receives)); - specific_recv_seq = specific_recv->super.req_sequence; - wild_recv_seq = wild_recv->super.req_sequence; + specific_recv_seq = specific_recv->req_base.req_sequence; + wild_recv_seq = wild_recv->req_base.req_sequence; while (true) { if (wild_recv_seq < specific_recv_seq) { @@ -402,7 +402,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive /* * try and match */ - wild_recv_tag = wild_recv->super.req_tag; + wild_recv_tag = wild_recv->req_base.req_tag; if ( (frag_tag == wild_recv_tag) || ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { @@ -441,13 +441,13 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive * Get the sequence number for this recv, and go * back to the top of the loop. */ - wild_recv_seq = wild_recv->super.req_sequence; + wild_recv_seq = wild_recv->req_base.req_sequence; } else { /* * specific recv is earlier than the wild one. */ - specific_recv_tag=specific_recv->super.req_tag; + specific_recv_tag=specific_recv->req_base.req_tag; if ( (frag_tag == specific_recv_tag) || ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) { @@ -486,7 +486,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive * Get the sequence number for this recv, and go * back to the top of the loop. */ - specific_recv_seq = specific_recv->super.req_sequence; + specific_recv_seq = specific_recv->req_base.req_sequence; } } } @@ -545,7 +545,7 @@ static void mca_ptl_base_check_cantmatch_for_match(ompi_list_t *additional_match /* * If the message has the next expected seq from that proc... 
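+             * For example, if fragments arrive out of order as 2, 3, 1,
+             * frags 2 and 3 are queued here; once frag 1 is matched, this
+             * loop matches 2 and then 3, advancing the expected sequence.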
*/ - frag_seq=frag_desc->super.frag_header.hdr_match.hdr_msg_seq; + frag_seq=frag_desc->frag_base.frag_header.hdr_match.hdr_msg_seq; if (frag_seq == next_msg_seq_expected) { /* We're now expecting the next sequence number. */ @@ -564,7 +564,7 @@ static void mca_ptl_base_check_cantmatch_for_match(ompi_list_t *additional_match * check to see if this frag matches a posted message */ matched_receive = mca_ptl_base_check_receives_for_match( - &frag_desc->super.frag_header.hdr_match, pml_comm); + &frag_desc->frag_base.frag_header.hdr_match, pml_comm); /* if match found, process data */ if (matched_receive) { diff --git a/src/mca/ptl/base/ptl_base_recvfrag.h b/src/mca/ptl/base/ptl_base_recvfrag.h index 58f0cdf493..f0b5e0e30e 100644 --- a/src/mca/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/ptl/base/ptl_base_recvfrag.h @@ -18,7 +18,7 @@ extern ompi_class_t mca_ptl_base_recv_frag_t_class; * Base type for receive fragment descriptors. */ struct mca_ptl_base_recv_frag_t { - mca_ptl_base_frag_t super; /**< base fragment descriptor */ + mca_ptl_base_frag_t frag_base; /**< base fragment descriptor */ mca_pml_base_recv_request_t *frag_request; /**< matched posted receive */ bool frag_is_buffered; /**< does fragment need to be unpacked into users buffer */ }; @@ -45,23 +45,27 @@ static inline bool mca_ptl_base_recv_frag_match( frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags); while(NULL != frag) { - mca_ptl_t* ptl = frag->super.frag_owner; + mca_ptl_t* ptl = frag->frag_base.frag_owner; mca_pml_base_recv_request_t *request = frag->frag_request; - mca_ptl_base_match_header_t *header = &frag->super.frag_header.hdr_match; + mca_ptl_base_match_header_t *header = &frag->frag_base.frag_header.hdr_match; /* * Initialize request status. */ request->req_bytes_packed = header->hdr_msg_length; - request->super.req_peer = header->hdr_src; - request->super.req_tag = header->hdr_tag; + request->req_base.req_peer = header->hdr_src; + request->req_base.req_tag = header->hdr_tag; /* * If probe - signal request is complete - but don't notify PTL */ - if(request->super.req_type == MCA_PML_REQUEST_PROBE) { + if(request->req_base.req_type == MCA_PML_REQUEST_PROBE) { - ptl->ptl_recv_progress(ptl, request, frag); + ptl->ptl_recv_progress( + ptl, + request, + frag->frag_base.frag_header.hdr_frag.hdr_frag_length, + frag->frag_base.frag_size); matched = mca_ptl_base_recv_frag_match( ptl, frag, header ); } else { diff --git a/src/mca/ptl/base/ptl_base_sendfrag.h b/src/mca/ptl/base/ptl_base_sendfrag.h index 9ea3e7e663..98618430a5 100644 --- a/src/mca/ptl/base/ptl_base_sendfrag.h +++ b/src/mca/ptl/base/ptl_base_sendfrag.h @@ -16,7 +16,7 @@ extern ompi_class_t mca_ptl_base_send_frag_t_class; * Base type for send fragment descriptors */ struct mca_ptl_base_send_frag_t { - mca_ptl_base_frag_t super; /**< base fragment descriptor */ + mca_ptl_base_frag_t frag_base; /**< base fragment descriptor */ struct mca_pml_base_send_request_t *frag_request; /**< pointer to send request */ }; typedef struct mca_ptl_base_send_frag_t mca_ptl_base_send_frag_t; diff --git a/src/mca/ptl/ptl.h b/src/mca/ptl/ptl.h index 657f8f808a..5dc1863585 100644 --- a/src/mca/ptl/ptl.h +++ b/src/mca/ptl/ptl.h @@ -299,66 +299,102 @@ typedef int (*mca_ptl_base_del_procs_fn_t)( ); /** - * PML->PTL Allocate a send request from the PTL modules free list. + * PML->PTL Initialize a send request for use by the PTL. * * @param ptl (IN) PTL instance * @param request (OUT) Pointer to allocated request. 
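+ *
+ * Sketch of a possible PTL-side usage (illustrative; "my_frag_t" is a
+ * hypothetical PTL fragment type). With ptl_cache_bytes set to
+ * sizeof(my_frag_t), the PTL-private region immediately follows the
+ * base request:
+ *
+ *   my_frag_t* frag = (my_frag_t*)(request + 1);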
- * @return              Status indicating if allocation was successful.
  *
- * To reduce latency (number of required allocations), a derived
- * type of mca_pml_base_send_request_t is obtained from the PTL that
- * is selected to send the first fragment. The derived type should contain
- * space for the base request structure, the PTL first fragment,
- * and any other required buffer space.
+ * To reduce latency (number of required allocations), the PML allocates additional
+ * space along w/ each request - that may be used by the PTL for additional control
+ * information (e.g. first fragment descriptor).
+ *
+ * The init function is called the first time the request is used by the PTL. On
+ * completion of the request - the PML will cache the request for later use by the
+ * same PTL. When the request is re-used from the cache, the init function is NOT
+ * called for subsequent sends.
 */
-typedef int (*mca_ptl_base_request_alloc_fn_t)(
+typedef void (*mca_ptl_base_request_init_fn_t)(
     struct mca_ptl_t* ptl,
-    struct mca_pml_base_send_request_t** request
+    struct mca_pml_base_send_request_t* request
 );
 
+
 /**
- * PML->PTL Return a send request to the PTL modules free list.
+ * PML->PTL Cleanup any resources that may have been associated with the
+ * request by the PTL.
  *
  * @param ptl (IN)       PTL instance
- * @param request (IN)   Pointer to allocated request.
+ * @param request (IN)   Pointer to allocated request.
  *
- * Called when the request has been completed at both the MPI
- * and PML layers.
+ * The fini function is called when the PML removes a request from the PTL's
+ * cache (due to resource constraints or reaching the cache limit), prior to
+ * re-using the request with another PTL. This gives the PTL the chance to
+ * cleanup/release any resources cached on the send descriptor by the PTL.
  */
-typedef void (*mca_ptl_base_request_return_fn_t)(
+
+typedef void (*mca_ptl_base_request_fini_fn_t)(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_send_request_t* request
 );
 
 /**
- * PML->PTL Initiate a send/put to the peer.
+ * PML->PTL Initiate a send to the peer.
  *
  * @param ptl (IN)               PTL instance
  * @param ptl_base_peer (IN)     PTL peer addressing
- * @param request (IN)           Send request
+ * @param request (IN)           Send request
  * @param offset                 Current offset into packed/contiguous buffer.
- * @param size (IN/OUT)          Number of bytes PML is requesting PTL to deliver,
+ * @param size (IN/OUT)          Number of bytes PML is requesting PTL to deliver,
  *                               PTL returns number of bytes successfully fragmented
  * @param flags (IN)             Flags that should be passed to the peer via the message header.
  * @return                       OMPI_SUCCESS if the PTL was able to queue one or more fragments
  *
  * The PML implements a rendezvous protocol, with up to the PTL defined threshold
- * bytes of the message sent in eager send mode. On receipt of an acknowledgment
- * from the peer, the PML will schedule the remaining fragments. If the PTL supports
- * RDMA functionality, these subsequent transfers may use RDMA put semantics.
+ * bytes of the message sent in eager send mode.
  *
  * If the PTL is unable to fragment the requested size, possibly due to resource
- * constraints or datatype alighnment/offset, it should return the number of bytes
+ * constraints or datatype alignment/offset, it should return the number of bytes
 * actually fragmented in the size parameter.
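+ *
+ * Illustrative first-fragment call as issued by the PML (cf.
+ * mca_pml_teg_send_request_start; "req" is the send request):
+ *
+ *   rc = ptl->ptl_send(ptl, req->req_peer, req,
+ *       req->req_offset, ptl->ptl_first_frag_size, flags);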
 */
-typedef int (*mca_ptl_base_put_fn_t)(
-    struct mca_ptl_t* ptl,
-    struct mca_ptl_base_peer_t* ptl_base_peer,
+typedef int (*mca_ptl_base_send_fn_t)(
+    struct mca_ptl_t* ptl,
+    struct mca_ptl_base_peer_t* ptl_base_peer,
     struct mca_pml_base_send_request_t* request,
     size_t offset,
     size_t size,
     int flags
 );
+
+/**
+ * PML->PTL Initiate a put to the peer.
+ *
+ * @param ptl (IN)               PTL instance
+ * @param ptl_base_peer (IN)     PTL peer addressing
+ * @param request (IN)           Send request
+ * @param offset                 Current offset into packed/contiguous buffer.
+ * @param size (IN/OUT)          Number of bytes PML is requesting PTL to deliver,
+ *                               PTL returns number of bytes successfully fragmented
+ * @param flags (IN)             Flags that should be passed to the peer via the message header.
+ * @return                       OMPI_SUCCESS if the PTL was able to queue one or more fragments
+ *
+ * The PML implements a rendezvous protocol, with up to the PTL defined threshold
+ * bytes of the message sent in eager send mode (via mca_ptl_base_send_fn_t). On receipt
+ * of an acknowledgment from the peer, the PML will schedule the remaining fragments. If
+ * the PTL supports RDMA functionality, these subsequent transfers may use RDMA put semantics.
+ *
+ * If the PTL is unable to fragment the requested size, possibly due to resource
+ * constraints or datatype alignment/offset, it should return the number of bytes
+ * actually fragmented in the size parameter.
+ */
+typedef int (*mca_ptl_base_put_fn_t)(
+    struct mca_ptl_t* ptl,
+    struct mca_ptl_base_peer_t* ptl_base_peer,
+    struct mca_pml_base_send_request_t* request,
+    size_t offset,
+    size_t size,
+    int flags
+);
+
 
 /**
  * PML->PTL Initiate a get from a peer.
@@ -429,14 +465,16 @@
  * PTL->PML Notification from the PTL to the PML that a fragment
  * has completed (e.g. been successfully delivered into user's buffer)
  *
- * @param ptr(IN)             PTL instance
- * @param recv_request (IN)   Receive Request
- * @param recv_frag (IN)      Receive Fragment
+ * @param ptl (IN)             PTL instance
+ * @param recv_request (IN)    Receive Request
+ * @param bytes_received (IN)  Number of bytes received from peer.
+ * @param bytes_delivered (IN) Number of bytes delivered to application.
  */
 typedef void (*mca_ptl_base_recv_progress_fn_t)(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_recv_request_t* recv_request,
-    struct mca_ptl_base_recv_frag_t* recv_frag
+    size_t bytes_received,
+    size_t bytes_delivered
 );
 
 /**
  * PTL->PML Notification from the PTL to the PML that a fragment
  * has completed (e.g. been successfully sent to peer)
  *
- * @param ptr(IN)             PTL instance
+ * @param ptl (IN)            PTL instance
  * @param send_request (IN)   Send Request
- * @param send_frag (IN)      Send Fragment
+ * @param bytes_sent (IN)     Number of bytes sent to peer.
  */
 typedef void (*mca_ptl_base_send_progress_fn_t)(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_send_request_t* send_request,
-    struct mca_ptl_base_send_frag_t* send_frag
+    size_t bytes_sent
 );
 
-/**
- * PTL function list. They are mainly used for the stack-based approach on the
- * profilling PTL.
 
 /**
  * PML->PTL Initiate a get from a peer.
@@ -429,14 +465,16 @@ typedef void (*mca_ptl_base_matched_fn_t)(
  * PTL->PML Notification from the PTL to the PML that a fragment
  * has completed (e.g. been successfully delivered into the user's buffer)
  *
- * @param ptr(IN) PTL instance
- * @param recv_request (IN) Receive Request
- * @param recv_frag (IN) Receive Fragment
+ * @param ptl (IN) PTL instance
+ * @param recv_request (IN) Receive Request
+ * @param bytes_received (IN) Number of bytes received from peer.
+ * @param bytes_delivered (IN) Number of bytes delivered to application.
  */
 typedef void (*mca_ptl_base_recv_progress_fn_t)(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_recv_request_t* recv_request,
-    struct mca_ptl_base_recv_frag_t* recv_frag
+    size_t bytes_received,
+    size_t bytes_delivered
 );
 
 /**
@@ -445,35 +483,14 @@ typedef void (*mca_ptl_base_recv_progress_fn_t)(
  *
  * @param ptl (IN) PTL instance
  * @param send_request (IN) Send Request
- * @param send_frag (IN) Send Fragment
+ * @param bytes_sent (IN) Number of bytes sent to peer.
  */
 typedef void (*mca_ptl_base_send_progress_fn_t)(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_send_request_t* send_request,
-    struct mca_ptl_base_send_frag_t* send_frag
+    size_t bytes_sent
 );
 
-/**
- * PTL function list. They are mainly used for the stack-based approach on the
- * profilling PTL.
- */
-struct mca_ptl_functions_t {
-    /* PML->PTL function table */
-    mca_ptl_base_add_procs_fn_t ptl_add_procs;
-    mca_ptl_base_del_procs_fn_t ptl_del_procs;
-    mca_ptl_base_finalize_fn_t ptl_finalize;
-    mca_ptl_base_put_fn_t ptl_put;
-    mca_ptl_base_get_fn_t ptl_get;
-    mca_ptl_base_matched_fn_t ptl_matched;
-    mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
-    mca_ptl_base_request_return_fn_t ptl_request_return;
-    /* PTL->PML function table - filled in by PML at init */
-    mca_ptl_base_match_fn_t ptl_match;
-    mca_ptl_base_send_progress_fn_t ptl_send_progress;
-    mca_ptl_base_recv_progress_fn_t ptl_recv_progress;
-};
-typedef struct mca_ptl_functions_t mca_ptl_functions_t;
-
 /**
  * PTL instance interface functions and attributes.
  */
@@ -481,6 +498,8 @@ struct mca_ptl_t {
 
     /* PTL common attributes */
     mca_ptl_base_module_t* ptl_module; /**< pointer back to the PTL module structure */
+    size_t ptl_cache_size;  /**< maximum size of request cache for this PTL */
+    size_t ptl_cache_bytes; /**< number of bytes required by PTL for request cache */
     size_t ptl_first_frag_size; /**< maximum size of first fragment -- eager send */
     size_t ptl_min_frag_size;   /**< threshold below which the PTL will not fragment */
     size_t ptl_max_frag_size;   /**< maximum fragment size supported by the PTL */
@@ -493,11 +512,12 @@ struct mca_ptl_t {
     mca_ptl_base_add_procs_fn_t ptl_add_procs;
     mca_ptl_base_del_procs_fn_t ptl_del_procs;
     mca_ptl_base_finalize_fn_t ptl_finalize;
+    mca_ptl_base_send_fn_t ptl_send;
     mca_ptl_base_put_fn_t ptl_put;
     mca_ptl_base_get_fn_t ptl_get;
     mca_ptl_base_matched_fn_t ptl_matched;
-    mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
-    mca_ptl_base_request_return_fn_t ptl_request_return;
+    mca_ptl_base_request_init_fn_t ptl_request_init;
+    mca_ptl_base_request_fini_fn_t ptl_request_fini;
 
     /* PTL->PML function table - filled in by PML at init */
     mca_ptl_base_match_fn_t ptl_match;
@@ -506,6 +526,9 @@ struct mca_ptl_t {
 
     /* Allow the cannibalization of the PTL */
     struct mca_ptl_t* ptl_stack;
+
+    /* for use by PML only */
+    struct mca_pml_base_ptl_t* ptl_base;
 };
 typedef struct mca_ptl_t mca_ptl_t;
 
diff --git a/src/mca/ptl/self/src/ptl_self.c b/src/mca/ptl/self/src/ptl_self.c
index b551e4f6da..12306d3451 100644
--- a/src/mca/ptl/self/src/ptl_self.c
+++ b/src/mca/ptl/self/src/ptl_self.c
@@ -25,6 +25,8 @@
 mca_ptl_t mca_ptl_self = {
     &mca_ptl_self_module.super,
+    8, /* ptl_cache_size */
+    sizeof(mca_ptl_base_recv_frag_t), /* ptl_cache_bytes */
     0, /* ptl_frag_first_size */
     0, /* ptl_frag_min_size */
     0, /* ptl_frag_max_size */
@@ -36,10 +38,11 @@ mca_ptl_t mca_ptl_self = {
     mca_ptl_self_del_proc,
     mca_ptl_self_finalize,
     mca_ptl_self_send, /* send */
+    mca_ptl_self_send, /* put */
     NULL, /* get */
     mca_ptl_self_matched, /* matched */
-    mca_ptl_self_request_alloc,
-    mca_ptl_self_request_return,
+    mca_ptl_self_request_init,
+    mca_ptl_self_request_fini,
     NULL, /* match */
     NULL,
     NULL
@@ -73,23 +76,14 @@ int mca_ptl_self_finalize(struct mca_ptl_t* ptl)
     return OMPI_SUCCESS;
 }
 
-int mca_ptl_self_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request)
+void mca_ptl_self_request_init(struct mca_ptl_t* ptl, mca_pml_base_send_request_t* request)
 {
-    int rc;
-    mca_pml_base_send_request_t* sendreq;
-    ompi_list_item_t* send_item;
-
-    OMPI_FREE_LIST_GET( &mca_ptl_self_module.self_send_requests, send_item, rc );
-
-    sendreq = (mca_pml_base_send_request_t*)send_item;
-    sendreq->req_owner = ptl;
-    *request = sendreq;
-    return rc;
+    /* the first fragment lives in the cache space directly after the request */
+    OBJ_CONSTRUCT(request+1, mca_ptl_base_recv_frag_t);
 }
 
-void 
mca_ptl_self_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
+void mca_ptl_self_request_fini(struct mca_ptl_t* ptl, mca_pml_base_send_request_t* request)
 {
-    OMPI_FREE_LIST_RETURN( &mca_ptl_self_module.self_send_requests, (ompi_list_item_t*)request);
+    OBJ_DESTRUCT(request+1);
 }
 
 /*
@@ -108,25 +102,25 @@ int mca_ptl_self_send(
     int flags )
 {
     mca_ptl_self_send_request_t* req = (mca_ptl_self_send_request_t*)request;
-    mca_ptl_base_header_t* hdr = &(req->req_frag.super.frag_header);
+    mca_ptl_base_header_t* hdr = &(req->req_frag.frag_base.frag_header);
     hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
     hdr->hdr_common.hdr_flags = flags;
     hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
     hdr->hdr_frag.hdr_frag_offset = offset;
     hdr->hdr_frag.hdr_frag_seq = 0;
-    hdr->hdr_match.hdr_contextid = request->super.req_comm->c_contextid;
-    hdr->hdr_match.hdr_src = request->super.req_comm->c_my_rank;
-    hdr->hdr_match.hdr_dst = request->super.req_peer;
-    hdr->hdr_match.hdr_tag = request->super.req_tag;
+    hdr->hdr_match.hdr_contextid = request->req_base.req_comm->c_contextid;
+    hdr->hdr_match.hdr_src = request->req_base.req_comm->c_my_rank;
+    hdr->hdr_match.hdr_dst = request->req_base.req_peer;
+    hdr->hdr_match.hdr_tag = request->req_base.req_tag;
     hdr->hdr_match.hdr_msg_length = request->req_bytes_packed;
-    hdr->hdr_match.hdr_msg_seq = request->super.req_sequence;
+    hdr->hdr_match.hdr_msg_seq = request->req_base.req_sequence;
     hdr->hdr_frag.hdr_frag_length = request->req_bytes_packed;
     hdr->hdr_frag.hdr_frag_offset = 0;
     hdr->hdr_frag.hdr_src_ptr.pval = (void*)req;
-    req->req_frag.super.frag_peer = ptl_base_peer;
-    req->req_frag.super.frag_size = request->req_bytes_packed;
-    req->req_frag.super.frag_owner = &mca_ptl_self;
+    req->req_frag.frag_base.frag_peer = ptl_base_peer;
+    req->req_frag.frag_base.frag_size = request->req_bytes_packed;
+    req->req_frag.frag_base.frag_owner = &mca_ptl_self;
     req->req_frag.frag_request = NULL;
     req->req_frag.frag_is_buffered = 0;
     ptl->ptl_match( ptl, &(req->req_frag), &(hdr->hdr_match) );
@@ -141,9 +135,9 @@ int mca_ptl_self_send(
 
 void mca_ptl_self_matched( mca_ptl_t* ptl, mca_ptl_base_recv_frag_t* frag)
 {
-    mca_ptl_self_send_request_t* sendreq = (mca_ptl_self_send_request_t*)(frag->super.frag_header.hdr_frag.hdr_src_ptr.pval);
+    mca_ptl_self_send_request_t* sendreq = (mca_ptl_self_send_request_t*)
+        frag->frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
     mca_pml_base_recv_request_t* recvreq = frag->frag_request;
-    mca_ptl_base_send_frag_t sendfrag;
 
     /* Do we have the same datatype or not? If yes we can use an optimized version
      * of the copy function; if not we have to use a temporary buffer to pack/unpack.
      *
     * If this is a buffered send, the data has already been packed into
      * a contiguous buffer and the convertor on the send request initialized to point
      * into this buffer.
      */
-    if( sendreq->super.super.req_datatype == recvreq->super.req_datatype &&
-        sendreq->super.req_send_mode != MCA_PML_BASE_SEND_BUFFERED) {
-        ompi_ddt_copy_content_same_ddt( recvreq->super.req_datatype, recvreq->super.req_count,
-                                        recvreq->super.req_addr, sendreq->super.super.req_addr );
+    if( sendreq->req_send.req_base.req_datatype == recvreq->req_base.req_datatype &&
+        sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_BUFFERED) {
+        ompi_ddt_copy_content_same_ddt(
+            recvreq->req_base.req_datatype,
+            recvreq->req_base.req_count,
+            recvreq->req_base.req_addr,
+            sendreq->req_send.req_base.req_addr );
     } else {
         ompi_convertor_t *pSendConvertor, *pRecvConvertor;
         struct iovec iov[1];
         int completed, iov_count, length;
         char* buf;
-        
+
         /* We use a temporary buffer as it looks to be faster on most architectures */
         length = 64 * 1024;
         buf = malloc( length * sizeof(char) );
 
-        ompi_convertor_init_for_recv( &(frag->super.frag_convertor), 0, recvreq->super.req_datatype,
-                                      recvreq->super.req_count, recvreq->super.req_addr, 0 );
-        pSendConvertor = &(sendreq->super.req_convertor);
-        pRecvConvertor = &(frag->super.frag_convertor);
+        ompi_convertor_init_for_recv(
+            &frag->frag_base.frag_convertor,
+            0,
+            recvreq->req_base.req_datatype,
+            recvreq->req_base.req_count,
+            recvreq->req_base.req_addr, 0 );
+        pSendConvertor = &(sendreq->req_send.req_convertor);
+        pRecvConvertor = &(frag->frag_base.frag_convertor);
         completed = 0;
         while( !completed ) {
             iov[0].iov_base = buf;
@@ -180,15 +181,12 @@ void mca_ptl_self_matched( mca_ptl_t* ptl,
             completed |= ompi_convertor_unpack( pRecvConvertor, iov, iov_count );
             /*assert( freeAfter == 0 );*/
         }
-        free( buf );
+        free( buf );
     }
-    /* Now lets progress the request */
-    sendfrag.frag_request = &(sendreq->super);
-    /*sendfrag.super.frag_header = ;*/
-    sendfrag.super.frag_owner = &mca_ptl_self;
-    sendfrag.super.frag_peer = NULL;
-    sendfrag.super.frag_addr = NULL;
-    sendfrag.super.frag_size = sendreq->super.req_bytes_packed;
-    ptl->ptl_send_progress( ptl, &(sendreq->super), &(sendfrag) );
-    ptl->ptl_recv_progress( ptl, recvreq, frag );
+    ptl->ptl_send_progress( ptl, &sendreq->req_send, sendreq->req_send.req_bytes_packed );
+    ptl->ptl_recv_progress( ptl,
+        recvreq,
+        frag->frag_base.frag_header.hdr_frag.hdr_frag_length,
+        frag->frag_base.frag_size );
 }
+
diff --git a/src/mca/ptl/self/src/ptl_self.h b/src/mca/ptl/self/src/ptl_self.h
index 98673194a3..6acd42ce63 100644
--- a/src/mca/ptl/self/src/ptl_self.h
+++ b/src/mca/ptl/self/src/ptl_self.h
@@ -40,7 +40,7 @@ typedef struct mca_ptl_self_module_1_0_0_t mca_ptl_self_module_t;
  * base send request, and the base receive fragment which will be used to do the match.
  */
 struct mca_ptl_self_send_request_t {
-    mca_pml_base_send_request_t super;
+    mca_pml_base_send_request_t req_send;
     mca_ptl_base_recv_frag_t req_frag; /* first fragment */
 };
 typedef struct mca_ptl_self_send_request_t mca_ptl_self_send_request_t;
@@ -74,12 +74,12 @@ extern mca_ptl_t** mca_ptl_self_module_init(
     bool *have_hidden_threads
 );
 
-int mca_ptl_self_add_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **ompi_proc, struct mca_ptl_base_peer_t** peer_ret, ompi_bitmap_t* reachable);
-int mca_ptl_self_del_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **proc, struct mca_ptl_base_peer_t** ptl_peer);
-int mca_ptl_self_finalize(struct mca_ptl_t* ptl);
-int mca_ptl_self_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request);
-void mca_ptl_self_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
-int mca_ptl_self_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer, struct mca_pml_base_send_request_t* request,
+int mca_ptl_self_add_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **ompi_proc, struct mca_ptl_base_peer_t** peer_ret, ompi_bitmap_t* reachable);
+int mca_ptl_self_del_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **proc, struct mca_ptl_base_peer_t** ptl_peer);
+int mca_ptl_self_finalize(struct mca_ptl_t* ptl);
+void mca_ptl_self_request_init(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
+void mca_ptl_self_request_fini(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
+int mca_ptl_self_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer, struct mca_pml_base_send_request_t* request,
     size_t offset, size_t size, int flags );
 void mca_ptl_self_matched( mca_ptl_t* ptl, mca_ptl_base_recv_frag_t* frag );
 
diff --git a/src/mca/ptl/tcp/src/ptl_tcp.c b/src/mca/ptl/tcp/src/ptl_tcp.c
index ad5f77dce0..729dd6d4b9 100644
--- a/src/mca/ptl/tcp/src/ptl_tcp.c
+++ b/src/mca/ptl/tcp/src/ptl_tcp.c
@@ -24,6 +24,8 @@
 mca_ptl_tcp_t mca_ptl_tcp = {
     {
     &mca_ptl_tcp_module.super,
+    1, /* ptl_cache_size */
+    sizeof(mca_ptl_tcp_send_frag_t), /* ptl_cache_bytes */
     0, /* ptl_exclusivity */
     0, /* ptl_latency */
     0, /* ptl_bandwidth */
@@ -35,10 +37,11 @@ mca_ptl_tcp_t mca_ptl_tcp = {
     mca_ptl_tcp_del_procs,
     mca_ptl_tcp_finalize,
     mca_ptl_tcp_send,
+    mca_ptl_tcp_send,
     NULL,
     mca_ptl_tcp_matched,
-    mca_ptl_tcp_request_alloc,
-    mca_ptl_tcp_request_return
+    mca_ptl_tcp_request_init,
+    mca_ptl_tcp_request_fini
     }
 };
 
@@ -108,31 +111,22 @@ int mca_ptl_tcp_finalize(struct mca_ptl_t* ptl)
     return OMPI_SUCCESS;
 }
 
-int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request)
+void mca_ptl_tcp_request_init(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
 {
-    int rc;
-    mca_pml_base_send_request_t* sendreq;
-    ompi_list_item_t* item;
-    OMPI_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_requests, item, rc);
-    if(NULL != (sendreq = (mca_pml_base_send_request_t*)item))
-        sendreq->req_owner = ptl;
-    *request = sendreq;
-    return rc;
+    OBJ_CONSTRUCT(request+1, mca_ptl_tcp_send_frag_t);
 }
 
-void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
+void mca_ptl_tcp_request_fini(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
 {
-    /* OBJ_DESTRUCT(&request->req_convertor); */
-    OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_requests, (ompi_list_item_t*)request);
+    OBJ_DESTRUCT(request+1);
 }
 
 void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv_frag_t* 
frag)
 {
-    if(frag->super.frag_is_buffered)
-        free(frag->super.super.frag_addr);
-    /* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
+    if(frag->frag_recv.frag_is_buffered)
+        free(frag->frag_recv.frag_base.frag_addr);
     OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
 }
 
@@ -149,8 +143,8 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send
             return;
         }
         OMPI_THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
-        mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->super.super.frag_peer, pending);
-        mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag);
+        mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->frag_recv.frag_base.frag_peer, pending);
+        mca_ptl_tcp_peer_send(pending->frag_recv.frag_base.frag_peer, frag);
         mca_ptl_tcp_recv_frag_return(ptl, pending);
     } else {
         OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (ompi_list_item_t*)frag);
@@ -203,7 +197,7 @@ void mca_ptl_tcp_matched(
     mca_ptl_base_recv_frag_t* frag)
 {
     /* send ack back to peer? */
-    mca_ptl_base_header_t* header = &frag->super.frag_header;
+    mca_ptl_base_header_t* header = &frag->frag_base.frag_header;
     if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
         int rc;
         mca_ptl_tcp_send_frag_t* ack;
@@ -218,8 +212,8 @@ void mca_ptl_tcp_matched(
             ompi_list_append(&mca_ptl_tcp_module.tcp_pending_acks, (ompi_list_item_t*)frag);
             OMPI_THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
         } else {
-            mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->super.super.frag_peer, recv_frag);
-            mca_ptl_tcp_peer_send(ack->super.super.frag_peer, ack);
+            mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->frag_recv.frag_base.frag_peer, recv_frag);
+            mca_ptl_tcp_peer_send(ack->frag_send.frag_base.frag_peer, ack);
         }
     }
 
diff --git a/src/mca/ptl/tcp/src/ptl_tcp.h b/src/mca/ptl/tcp/src/ptl_tcp.h
index 36bbd39c2e..f59f22577c 100644
--- a/src/mca/ptl/tcp/src/ptl_tcp.h
+++ b/src/mca/ptl/tcp/src/ptl_tcp.h
@@ -166,26 +166,25 @@ extern int mca_ptl_tcp_del_procs(
 );
 
 /**
- * PML->PTL Allocate a send request from the PTL modules free list.
- *
- * @param ptl (IN) PTL instance
- * @param request (OUT) Pointer to allocated request.
- * @return Status indicating if allocation was successful.
- *
- */
-extern int mca_ptl_tcp_request_alloc(
-    struct mca_ptl_t* ptl,
-    struct mca_pml_base_send_request_t**
-);
-
-/**
- * PML->PTL Return a send request to the PTL modules free list.
+ * PML->PTL Initialize a send request for the TCP request cache.
  *
  * @param ptl (IN) PTL instance
  * @param request (IN) Pointer to allocated request.
  *
  */
-extern void mca_ptl_tcp_request_return(
+extern void mca_ptl_tcp_request_init(
+    struct mca_ptl_t* ptl,
+    struct mca_pml_base_send_request_t*
+);
+
+/**
+ * PML->PTL Clean up a send request that is being removed from the cache.
+ *
+ * @param ptl (IN) PTL instance
+ * @param request (IN) Pointer to allocated request.
+ *
+ */
+extern void mca_ptl_tcp_request_fini(
     struct mca_ptl_t* ptl,
     struct mca_pml_base_send_request_t*
 );
@@ -207,7 +206,7 @@ extern void mca_ptl_tcp_matched(
  *
  * @param ptl (IN) PTL instance
  * @param ptl_base_peer (IN) PTL peer addressing
- * @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
+ * @param send_request (IN/OUT) Send request (initialized by PML via mca_ptl_base_request_init_fn_t)
  * @param size (IN) Number of bytes PML is requesting PTL to deliver
 * @param flags (IN) Flags that should be passed to the peer via the message header.
 * @return OMPI_SUCCESS if the PTL was able to queue one or more fragments
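The recvfrag changes that follow are where the new progress byte counts originate: the TCP PTL now reports both how much arrived off the wire and how much landed in the user's buffer. For orientation, a minimal sketch of a PML-side hook with the new recv-progress signature; pml_example_recv_progress is a hypothetical name, and the PML's real callback also handles locking and MPI-level completion.

/* Sketch of a callback matching mca_ptl_base_recv_progress_fn_t. A receive
 * is done once the received byte count covers the packed message size. */
static void pml_example_recv_progress(
    struct mca_ptl_t* ptl,
    struct mca_pml_base_recv_request_t* request,
    size_t bytes_received,
    size_t bytes_delivered)
{
    request->req_bytes_received += bytes_received;
    request->req_bytes_delivered += bytes_delivered;
    if (request->req_bytes_received >= request->req_bytes_packed) {
        /* all fragments have arrived; mark the MPI request complete */
    }
}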
diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c
index 2127dbc231..af0d2a9e85 100644
--- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c
+++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c
@@ -54,12 +54,12 @@ static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag)
 
 void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer)
 {
-    frag->super.super.frag_owner = &peer->peer_ptl->super;
-    frag->super.super.frag_addr = NULL;
-    frag->super.super.frag_size = 0;
-    frag->super.super.frag_peer = peer;
-    frag->super.frag_request = 0;
-    frag->super.frag_is_buffered = false;
+    frag->frag_recv.frag_base.frag_owner = &peer->peer_ptl->super;
+    frag->frag_recv.frag_base.frag_addr = NULL;
+    frag->frag_recv.frag_base.frag_size = 0;
+    frag->frag_recv.frag_base.frag_peer = peer;
+    frag->frag_recv.frag_request = 0;
+    frag->frag_recv.frag_is_buffered = false;
     frag->frag_hdr_cnt = 0;
     frag->frag_msg_cnt = 0;
     frag->frag_ack_pending = false;
@@ -78,7 +78,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
     if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false)
         return false;
 
-    switch(frag->super.super.frag_header.hdr_common.hdr_type) {
+    switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) {
     case MCA_PTL_HDR_TYPE_MATCH:
         return mca_ptl_tcp_recv_frag_match(frag, sd);
     case MCA_PTL_HDR_TYPE_FRAG:
@@ -88,7 +88,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
         return mca_ptl_tcp_recv_frag_ack(frag, sd);
     default:
         ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
-            *(unsigned long*)&frag->super.super.frag_header);
+            *(unsigned long*)&frag->frag_recv.frag_base.frag_header);
         return false;
     }
 }
@@ -100,11 +100,11 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
 static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size)
 {
     /* non-blocking read - continue if interrupted, otherwise wait until data available */
-    unsigned char* ptr = (unsigned char*)&frag->super.super.frag_header;
+    unsigned char* ptr = (unsigned char*)&frag->frag_recv.frag_base.frag_header;
     while(frag->frag_hdr_cnt < size) {
         int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
         if(cnt == 0) {
-            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
+            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
             OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
             return false;
         }
@@ -117,7 +117,7 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
             return false;
         default:
             ompi_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno);
-            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
+            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
             OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
             return false;
         }
@@ -139,11 +139,11 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
 {
     mca_ptl_tcp_send_frag_t* sendfrag;
     mca_pml_base_send_request_t* sendreq;
-    sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval;
-    sendreq = sendfrag->super.frag_request;
-    sendreq->req_peer_match = frag->super.super.frag_header.hdr_ack.hdr_dst_match;
+    sendfrag = 
(mca_ptl_tcp_send_frag_t*)frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_src_ptr.pval; + sendreq = sendfrag->frag_send.frag_request; + sendreq->req_peer_match = frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_dst_match; mca_ptl_tcp_send_frag_progress(sendfrag); - mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag); + mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, frag); return true; } @@ -155,35 +155,40 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* first pass through - attempt a match */ - if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) { + if(NULL == frag->frag_recv.frag_request && 0 == frag->frag_msg_cnt) { /* attempt to match a posted recv */ - if(mca_ptl_base_recv_frag_match( frag->super.super.frag_owner, &frag->super, &frag->super.super.frag_header.hdr_match)) { + if (mca_ptl_base_recv_frag_match( + frag->frag_recv.frag_base.frag_owner, + &frag->frag_recv, + &frag->frag_recv.frag_base.frag_header.hdr_match)) { mca_ptl_tcp_recv_frag_matched(frag); } else { /* match was not made - so allocate buffer for eager send */ - if(frag->super.super.frag_header.hdr_frag.hdr_frag_length > 0) { - frag->super.super.frag_addr = malloc(frag->super.super.frag_header.hdr_frag.hdr_frag_length); - frag->super.super.frag_size = frag->super.super.frag_header.hdr_frag.hdr_frag_length; - frag->super.frag_is_buffered = true; + if(frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length > 0) { + frag->frag_recv.frag_base.frag_addr = + malloc(frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length); + frag->frag_recv.frag_base.frag_size = + frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length; + frag->frag_recv.frag_is_buffered = true; } } } /* receive fragment data */ - if(frag->frag_msg_cnt < frag->super.super.frag_size) { + if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) { if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) { return false; } } /* discard any data that exceeds the posted receive */ - if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) + if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) { return false; } /* if fragment has already been matched - go ahead and process */ - if (NULL != frag->super.frag_request) + if (NULL != frag->frag_recv.frag_request) mca_ptl_tcp_recv_frag_progress(frag); return true; } @@ -197,17 +202,17 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* get request from header */ if(frag->frag_msg_cnt == 0) { - frag->super.frag_request = frag->super.super.frag_header.hdr_frag.hdr_dst_ptr.pval; + frag->frag_recv.frag_request = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_dst_ptr.pval; mca_ptl_tcp_recv_frag_matched(frag); } /* continue to receive user data */ - if(frag->frag_msg_cnt < frag->super.super.frag_size) { + if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) { if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) return false; } - if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) + if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) return false; @@ -224,11 +229,11 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool 
mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) { - while(frag->frag_msg_cnt < frag->super.super.frag_size) { - int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt, - frag->super.super.frag_size-frag->frag_msg_cnt, 0); + while(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) { + int cnt = recv(sd, (unsigned char*)frag->frag_recv.frag_base.frag_addr+frag->frag_msg_cnt, + frag->frag_recv.frag_base.frag_size-frag->frag_msg_cnt, 0); if(cnt == 0) { - mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer); OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag); return false; } @@ -240,7 +245,7 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; default: ompi_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno); - mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer); OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag); return false; } @@ -261,13 +266,13 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd) { - while(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) { - size_t count = frag->super.super.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt; + while(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) { + size_t count = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt; void *rbuf = malloc(count); int cnt = recv(sd, rbuf, count, 0); free(rbuf); if(cnt == 0) { - mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer); OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag); return false; } @@ -280,7 +285,7 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; default: ompi_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno); - mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer); OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag); return false; } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h index 6502690939..44de0aa2d5 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h @@ -25,11 +25,11 @@ extern ompi_class_t mca_ptl_tcp_recv_frag_t_class; * TCP received fragment derived type. 
*/ struct mca_ptl_tcp_recv_frag_t { - mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */ - size_t frag_hdr_cnt; /**< number of header bytes received */ - size_t frag_msg_cnt; /**< number of message bytes received */ - bool frag_ack_pending; /**< is an ack pending for this fragment */ - volatile int frag_progressed; /**< flag used to atomically progress fragment */ + mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */ + size_t frag_hdr_cnt; /**< number of header bytes received */ + size_t frag_msg_cnt; /**< number of message bytes received */ + bool frag_ack_pending; /**< is an ack pending for this fragment */ + volatile int frag_progressed; /**< flag used to atomically progress fragment */ }; typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; @@ -50,8 +50,8 @@ bool mca_ptl_tcp_recv_frag_send_ack(mca_ptl_tcp_recv_frag_t* frag); static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag) { - mca_pml_base_recv_request_t* request = frag->super.frag_request; - mca_ptl_base_frag_header_t* header = &frag->super.super.frag_header.hdr_frag; + mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request; + mca_ptl_base_frag_header_t* header = &frag->frag_recv.frag_base.frag_header.hdr_frag; /* if there is data associated with the fragment -- setup to receive */ if(header->hdr_frag_length > 0) { @@ -59,14 +59,14 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag) /* initialize receive convertor */ struct iovec iov; ompi_proc_t *proc = - ompi_comm_peer_lookup(request->super.req_comm, request->super.req_peer); - ompi_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor); + ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer); + ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor); ompi_convertor_init_for_recv( - &frag->super.super.frag_convertor, /* convertor */ + &frag->frag_recv.frag_base.frag_convertor, /* convertor */ 0, /* flags */ - request->super.req_datatype, /* datatype */ - request->super.req_count, /* count elements */ - request->super.req_addr, /* users buffer */ + request->req_base.req_datatype, /* datatype */ + request->req_base.req_count, /* count elements */ + request->req_base.req_addr, /* users buffer */ header->hdr_frag_offset); /* offset in bytes into packed buffer */ /* @@ -75,17 +75,17 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag) */ iov.iov_base = NULL; iov.iov_len = header->hdr_frag_length; - ompi_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1); + ompi_convertor_unpack(&frag->frag_recv.frag_base.frag_convertor, &iov, 1); /* non-contiguous - allocate buffer for receive */ if(NULL == iov.iov_base) { - frag->super.super.frag_addr = malloc(header->hdr_frag_length); - frag->super.super.frag_size = header->hdr_frag_length; - frag->super.frag_is_buffered = true; + frag->frag_recv.frag_base.frag_addr = malloc(header->hdr_frag_length); + frag->frag_recv.frag_base.frag_size = header->hdr_frag_length; + frag->frag_recv.frag_is_buffered = true; /* we now have correct offset into users buffer */ } else { - frag->super.super.frag_addr = iov.iov_base; - frag->super.super.frag_size = iov.iov_len; + frag->frag_recv.frag_base.frag_addr = iov.iov_base; + frag->frag_recv.frag_base.frag_size = iov.iov_len; } } } @@ -93,37 +93,42 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag) static inline void 
mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag) { - if((frag)->frag_msg_cnt >= (frag)->super.super.frag_header.hdr_frag.hdr_frag_length) { + if((frag)->frag_msg_cnt >= frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) { /* make sure this only happens once for threaded case */ if(fetchNset(&frag->frag_progressed, 1) == 0) { - mca_pml_base_recv_request_t* request = (frag)->super.frag_request; - if((frag)->super.frag_is_buffered) { - mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match; + mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request; + if(frag->frag_recv.frag_is_buffered) { + mca_ptl_base_match_header_t* header = &(frag)->frag_recv.frag_base.frag_header.hdr_match; /* * Initialize convertor and use it to unpack data */ struct iovec iov; ompi_proc_t *proc = - ompi_comm_peer_lookup(request->super.req_comm, request->super.req_peer); - ompi_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor); + ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer); + ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor); ompi_convertor_init_for_recv( - &(frag)->super.super.frag_convertor, /* convertor */ + &frag->frag_recv.frag_base.frag_convertor, /* convertor */ 0, /* flags */ - request->super.req_datatype, /* datatype */ - request->super.req_count, /* count elements */ - request->super.req_addr, /* users buffer */ + request->req_base.req_datatype, /* datatype */ + request->req_base.req_count, /* count elements */ + request->req_base.req_addr, /* users buffer */ header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */ - iov.iov_base = (frag)->super.super.frag_addr; - iov.iov_len = (frag)->super.super.frag_size; - ompi_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1); + iov.iov_base = frag->frag_recv.frag_base.frag_addr; + iov.iov_len = frag->frag_recv.frag_base.frag_size; + ompi_convertor_unpack(&frag->frag_recv.frag_base.frag_convertor, &iov, 1); } /* progress the request */ - (frag)->super.super.frag_owner->ptl_recv_progress((frag)->super.super.frag_owner, request, &(frag)->super); + frag->frag_recv.frag_base.frag_owner->ptl_recv_progress( + frag->frag_recv.frag_base.frag_owner, + request, + frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length, + frag->frag_recv.frag_base.frag_size); + if((frag)->frag_ack_pending == false) { - mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag)); + mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, (frag)); } } } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c index 556b6367b7..e0b208590c 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -12,10 +12,10 @@ #include "ptl_tcp_proc.h" #include "ptl_tcp_sendfrag.h" -#define frag_header super.super.frag_header -#define frag_owner super.super.frag_owner -#define frag_peer super.super.frag_peer -#define frag_convertor super.super.frag_convertor +#define frag_header frag_send.frag_base.frag_header +#define frag_owner frag_send.frag_base.frag_owner +#define frag_peer frag_send.frag_base.frag_peer +#define frag_convertor frag_send.frag_base.frag_convertor static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag); @@ -70,12 +70,12 @@ int mca_ptl_tcp_send_frag_init( hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; 
hdr->hdr_frag.hdr_dst_ptr.lval = 0; - hdr->hdr_match.hdr_contextid = sendreq->super.req_comm->c_contextid; - hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank; - hdr->hdr_match.hdr_dst = sendreq->super.req_peer; - hdr->hdr_match.hdr_tag = sendreq->super.req_tag; + hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; + hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; + hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; - hdr->hdr_match.hdr_msg_seq = sendreq->super.req_sequence; + hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; } else { hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_common.hdr_flags = flags; @@ -105,9 +105,9 @@ int mca_ptl_tcp_send_frag_init( ompi_convertor_init_for_send( convertor, 0, - sendreq->super.req_datatype, - sendreq->super.req_count, - sendreq->super.req_addr, + sendreq->req_base.req_datatype, + sendreq->req_base.req_count, + sendreq->req_base.req_addr, offset); } @@ -129,9 +129,9 @@ int mca_ptl_tcp_send_frag_init( /* fragment state */ sendfrag->frag_owner = &ptl_peer->peer_ptl->super; - sendfrag->super.frag_request = sendreq; - sendfrag->super.super.frag_addr = sendfrag->frag_vec[1].iov_base; - sendfrag->super.super.frag_size = size_out; + sendfrag->frag_send.frag_request = sendreq; + sendfrag->frag_send.frag_base.frag_addr = sendfrag->frag_vec[1].iov_base; + sendfrag->frag_send.frag_base.frag_size = size_out; sendfrag->frag_peer = ptl_peer; sendfrag->frag_vec_ptr = sendfrag->frag_vec; diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h index 8d3d94509c..be97702ad2 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h @@ -25,11 +25,11 @@ struct mca_ptl_base_peer_t; * TCP send fragment derived type. 
*/ struct mca_ptl_tcp_send_frag_t { - mca_ptl_base_send_frag_t super; /**< base send fragment descriptor */ - struct iovec *frag_vec_ptr; /**< pointer into iovec array */ - size_t frag_vec_cnt; /**< number of iovec structs left to process */ - struct iovec frag_vec[2]; /**< array of iovecs for send */ - volatile int frag_progressed; /**< for threaded case - has request status been updated */ + mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */ + struct iovec *frag_vec_ptr; /**< pointer into iovec array */ + size_t frag_vec_cnt; /**< number of iovec structs left to process */ + struct iovec frag_vec[2]; /**< array of iovecs for send */ + volatile int frag_progressed; /**< for threaded case - has request status been updated */ }; typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t; @@ -70,30 +70,33 @@ int mca_ptl_tcp_send_frag_init( static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag) { - mca_pml_base_send_request_t* request = frag->super.frag_request; + mca_pml_base_send_request_t* request = frag->frag_send.frag_request; /* if this is an ack - simply return to pool */ if(request == NULL) { - mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); + mca_ptl_tcp_send_frag_return(frag->frag_send.frag_base.frag_owner, frag); /* otherwise, if the message has been sent, and an ack has already been * received, go ahead and update the request status */ } else if (frag->frag_vec_cnt == 0 && - ((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 || + ((frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 || mca_pml_base_send_request_matched(request))) { /* make sure this only happens once in threaded case */ if(fetchNset(&frag->frag_progressed,1) == 0) { /* update request status */ - frag->super.super.frag_owner->ptl_send_progress(frag->super.super.frag_owner, request, &frag->super); + frag->frag_send.frag_base.frag_owner->ptl_send_progress( + frag->frag_send.frag_base.frag_owner, + request, + frag->frag_send.frag_base.frag_size); /* the first fragment is allocated with the request, * all others need to be returned to free list */ - if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0) - mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); + if(frag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0) + mca_ptl_tcp_send_frag_return(frag->frag_send.frag_base.frag_owner, frag); } } } @@ -105,22 +108,22 @@ static inline void mca_ptl_tcp_send_frag_init_ack( struct mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_recv_frag_t* frag) { - mca_ptl_base_header_t* hdr = &ack->super.super.frag_header; - mca_pml_base_recv_request_t* request = frag->super.frag_request; + mca_ptl_base_header_t* hdr = &ack->frag_send.frag_base.frag_header; + mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); - hdr->hdr_ack.hdr_src_ptr = frag->super.super.frag_header.hdr_frag.hdr_src_ptr; + hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr; hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_ack.hdr_dst_match.pval = request; hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_ack.hdr_dst_addr.pval = request->super.req_addr; + hdr->hdr_ack.hdr_dst_addr.pval = request->req_base.req_addr; 
hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; - ack->super.frag_request = 0; - ack->super.super.frag_peer = ptl_peer; - ack->super.super.frag_owner = ptl; - ack->super.super.frag_addr = NULL; - ack->super.super.frag_size = 0; + ack->frag_send.frag_request = 0; + ack->frag_send.frag_base.frag_peer = ptl_peer; + ack->frag_send.frag_base.frag_owner = ptl; + ack->frag_send.frag_base.frag_addr = NULL; + ack->frag_send.frag_base.frag_size = 0; ack->frag_vec_ptr = ack->frag_vec; ack->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr; ack->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
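One consequence of the fini contract spelled out earlier: when the PML re-targets a cached request at a different PTL, it must tear down the old PTL's cached state before the new PTL's first use. A sketch of that bookkeeping; pml_example_retarget_request and the old_ptl/new_ptl tracking are hypothetical, only the ptl_request_init/ptl_request_fini members come from the interface above.

/* Sketch only: recycling a cached send request across PTLs. */
static void pml_example_retarget_request(
    struct mca_ptl_t* old_ptl,
    struct mca_ptl_t* new_ptl,
    struct mca_pml_base_send_request_t* req)
{
    if (old_ptl == new_ptl)
        return; /* same PTL: cached state stays valid, init is skipped */

    /* release PTL-private state cached after the request by the old PTL */
    if (old_ptl != NULL)
        old_ptl->ptl_request_fini(old_ptl, req);

    /* first use by the new PTL: let it construct its own cached state */
    new_ptl->ptl_request_init(new_ptl, req);
}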