diff --git a/src/lfc/lam_object.h b/src/lfc/lam_object.h index 96356d9473..3f0edf8266 100644 --- a/src/lfc/lam_object.h +++ b/src/lfc/lam_object.h @@ -374,8 +374,7 @@ static inline void lam_obj_run_destructors(lam_object_t *object) * @param cls Pointer to the class descriptor of this object * @return Pointer to the object */ -static inline lam_object_t *lam_obj_new(size_t size, - lam_class_t *cls) +static inline lam_object_t *lam_obj_new(size_t size, lam_class_t *cls) { lam_object_t *object; diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c index f212abb358..35003b93d9 100644 --- a/src/mca/pml/teg/src/pml_teg.c +++ b/src/mca/pml/teg/src/pml_teg.c @@ -160,7 +160,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs) proc->proc_pml = proc_pml; } - /* allow each ptl to register with the proc */ + /* attempt to add the proc to each ptl */ for(p_index = 0; p_index < mca_pml_teg.teg_num_ptls; p_index++) { mca_ptl_t* ptl = mca_pml_teg.teg_ptls[p_index]; @@ -181,6 +181,8 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs) /* if this ptl supports exclusive access then don't allow * subsequent ptls to register */ + + proc_pml->proc_ptl_flags |= ptl->ptl_flags; if(ptl->ptl_exclusivity) break; } diff --git a/src/mca/pml/teg/src/pml_teg_irecv.c b/src/mca/pml/teg/src/pml_teg_irecv.c index e3c98c8de6..eda798b2a2 100644 --- a/src/mca/pml/teg/src/pml_teg_irecv.c +++ b/src/mca/pml/teg/src/pml_teg_irecv.c @@ -77,7 +77,7 @@ int mca_pml_teg_recv( struct lam_communicator_t* comm, lam_status_public_t* status) { - int rc, index; + int rc; mca_ptl_base_recv_request_t *recvreq; MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); #if MCA_PML_TEG_STATISTICS diff --git a/src/mca/pml/teg/src/pml_teg_proc.c b/src/mca/pml/teg/src/pml_teg_proc.c index 72445ad31f..7eb5d85088 100644 --- a/src/mca/pml/teg/src/pml_teg_proc.c +++ b/src/mca/pml/teg/src/pml_teg_proc.c @@ -11,6 +11,7 @@ static void mca_pml_teg_proc_construct(mca_pml_proc_t* proc) { proc->proc_lam = NULL; + proc->proc_ptl_flags = 0; OBJ_CONSTRUCT(&proc->proc_lock, lam_mutex_t); OBJ_CONSTRUCT(&proc->proc_ptl_first, mca_pml_teg_ptl_array_t); OBJ_CONSTRUCT(&proc->proc_ptl_next, mca_pml_teg_ptl_array_t); diff --git a/src/mca/pml/teg/src/pml_teg_proc.h b/src/mca/pml/teg/src/pml_teg_proc.h index f2636c3d02..64b42a19f7 100644 --- a/src/mca/pml/teg/src/pml_teg_proc.h +++ b/src/mca/pml/teg/src/pml_teg_proc.h @@ -23,6 +23,7 @@ struct mca_pml_proc_t { lam_mutex_t proc_lock; /**< lock to protect against concurrent access */ mca_ptl_array_t proc_ptl_first; /**< array of ptls to use for first fragments */ mca_ptl_array_t proc_ptl_next; /**< array of ptls to use for remaining fragments */ + uint32_t proc_ptl_flags; /**< aggregate ptl flags */ }; typedef struct mca_pml_proc_t mca_pml_proc_t; diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.c b/src/mca/pml/teg/src/pml_teg_recvreq.c index 77bb379dc3..6f5a8d7624 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/pml/teg/src/pml_teg_recvreq.c @@ -9,7 +9,7 @@ void mca_pml_teg_recv_request_progress( THREAD_LOCK(&mca_pml_teg.teg_request_lock); req->req_bytes_delivered += frag->super.frag_size; req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length; - if (req->req_bytes_received >= req->req_bytes_msg) { + if (req->req_bytes_received >= req->req_bytes_packed) { /* initialize request status */ req->super.req_status.MPI_SOURCE = req->super.req_peer; req->super.req_status.MPI_TAG = req->super.req_tag; diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.c b/src/mca/pml/teg/src/pml_teg_sendreq.c index 22a478db84..29cf29db62 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/pml/teg/src/pml_teg_sendreq.c @@ -25,7 +25,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req) mca_pml_proc_t* proc_pml = proc->proc_pml; /* allocate remaining bytes to PTLs */ - size_t bytes_remaining = req->req_bytes_msg - req->req_offset; + size_t bytes_remaining = req->req_bytes_packed - req->req_offset; size_t num_ptl_avail = proc_pml->proc_ptl_next.ptl_size; size_t num_ptl = 0; while(bytes_remaining > 0 && num_ptl++ < num_ptl_avail) { @@ -50,9 +50,11 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req) bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100; } - rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0); - if(rc == LAM_SUCCESS) - bytes_remaining = req->req_bytes_msg - req->req_offset; + rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, &bytes_to_frag, 0); + if(rc == LAM_SUCCESS) { + req->req_offset += bytes_to_frag; + bytes_remaining = req->req_bytes_packed - req->req_offset; + } } /* unable to complete send - signal request failed */ @@ -71,9 +73,11 @@ void mca_pml_teg_send_request_progress( mca_ptl_base_send_request_t* req, mca_ptl_base_send_frag_t* frag) { + bool first_frag; THREAD_LOCK(&mca_pml_teg.teg_request_lock); + first_frag = (req->req_bytes_sent == 0); req->req_bytes_sent += frag->super.frag_size; - if (req->req_bytes_sent >= req->req_bytes_msg) { + if (req->req_bytes_sent >= req->req_bytes_packed) { req->super.req_pml_done = true; if (req->super.req_mpi_done == false) { req->super.req_status.MPI_SOURCE = req->super.req_comm->c_my_rank; @@ -91,8 +95,8 @@ void mca_pml_teg_send_request_progress( } THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); - /* if first fragment - schedule remaining fragments */ - if(req->req_frags == 1) { + /* if first fragment - shedule remaining fragments */ + if(first_frag == 1) { mca_pml_teg_send_request_schedule(req); } } diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.h b/src/mca/pml/teg/src/pml_teg_sendreq.h index 7bd3326de4..8a39474284 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.h +++ b/src/mca/pml/teg/src/pml_teg_sendreq.h @@ -45,17 +45,19 @@ static inline int mca_pml_teg_send_request_start( int flags, rc; /* start the first fragment */ - if(first_fragment_size <= 0 || req->req_bytes_msg <= first_fragment_size) { - first_fragment_size = req->req_bytes_msg; - flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? MCA_PTL_FLAGS_ACK_MATCHED : 0; + if(first_fragment_size <= 0 || req->req_bytes_packed <= first_fragment_size) { + first_fragment_size = req->req_bytes_packed; + flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? + MCA_PTL_FLAGS_ACK_MATCHED : 0; } else { - /* require match for first fragment of a multi-fragment message or if synchronous send */ + /* require match for first fragment of a multi-fragment */ flags = MCA_PTL_FLAGS_ACK_MATCHED; } - rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size, flags); + rc = ptl->ptl_put(ptl, req->req_peer, req, 0, &first_fragment_size, flags); if(rc != LAM_SUCCESS) return rc; + req->req_offset += first_fragment_size; return LAM_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_wait.c b/src/mca/pml/teg/src/pml_teg_wait.c index ddd15baeb8..4836949774 100644 --- a/src/mca/pml/teg/src/pml_teg_wait.c +++ b/src/mca/pml/teg/src/pml_teg_wait.c @@ -10,7 +10,10 @@ int mca_pml_teg_wait( int *index, lam_status_public_t* status) { - int c, i; +#if LAM_HAVE_THREADS + int c; +#endif + int i; int completed = -1; mca_pml_base_request_t* pml_request; diff --git a/src/mca/ptl/base/ptl_base_header.h b/src/mca/ptl/base/ptl_base_header.h index a02b721593..92751e534c 100644 --- a/src/mca/ptl/base/ptl_base_header.h +++ b/src/mca/ptl/base/ptl_base_header.h @@ -14,6 +14,7 @@ #define MCA_PTL_HDR_TYPE_FRAG 1 #define MCA_PTL_HDR_TYPE_ACK 2 #define MCA_PTL_HDR_TYPE_NACK 3 +#define MCA_PTL_HDR_TYPE_GET 4 #define MCA_PTL_FLAGS_ACK_MATCHED 1 #define MCA_PTL_FLAGS_ACK_AGGREGATE 2 @@ -37,7 +38,7 @@ struct mca_ptl_base_frag_header_t { mca_ptl_base_common_header_t hdr_common; /**< common attributes */ uint32_t hdr_frag_length; /**< fragment length */ uint32_t hdr_frag_offset; /**< offset into message */ - mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */ + mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */ lam_ptr_t hdr_src_ptr; /**< pointer to source fragment */ lam_ptr_t hdr_dst_ptr; /**< pointer to matched receive */ }; @@ -55,7 +56,8 @@ struct mca_ptl_base_match_header_t { int32_t hdr_dst; /**< destination rank */ int32_t hdr_tag; /**< user tag */ uint32_t hdr_msg_length; /**< message length */ - mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */ + mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */ + lam_ptr_t hdr_src_ptr; }; typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t; @@ -66,7 +68,9 @@ typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t; struct mca_ptl_base_ack_header_t { mca_ptl_base_common_header_t hdr_common; /**< common attributes */ lam_ptr_t hdr_src_ptr; /**< source fragment */ - lam_ptr_t hdr_dst_ptr; /**< matched receive request */ + lam_ptr_t hdr_dst_match; /**< matched receive request */ + lam_ptr_t hdr_dst_addr; /**< posted receive buffer */ + uint32_t hdr_dst_size; /**< size of posted buffer */ /* sequence range? */ }; typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t; diff --git a/src/mca/ptl/base/ptl_base_recvfrag.h b/src/mca/ptl/base/ptl_base_recvfrag.h index dd88aca004..a8034dd15b 100644 --- a/src/mca/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/ptl/base/ptl_base_recvfrag.h @@ -50,12 +50,12 @@ static inline bool mca_ptl_base_recv_frag_match( /* * Initialize request status. */ - request->req_bytes_msg = header->hdr_msg_length; + request->req_bytes_packed = header->hdr_msg_length; request->super.req_peer = header->hdr_src; request->super.req_tag = header->hdr_tag; /* notify ptl of match */ - ptl->ptl_recv(ptl, frag); + ptl->ptl_matched(ptl, frag); /* process any additional fragments that arrived out of order */ frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags); diff --git a/src/mca/ptl/base/ptl_base_recvreq.c b/src/mca/ptl/base/ptl_base_recvreq.c index 8dc831ecbf..2d0b7d9e17 100644 --- a/src/mca/ptl/base/ptl_base_recvreq.c +++ b/src/mca/ptl/base/ptl_base_recvreq.c @@ -56,7 +56,7 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque (frag = mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) != NULL) { mca_ptl_t* ptl = frag->super.frag_owner; THREAD_UNLOCK(&pml_comm->c_matching_lock); - ptl->ptl_recv(ptl, frag); + ptl->ptl_matched(ptl, frag); return; /* match found */ } @@ -102,7 +102,7 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) if ((frag = mca_ptl_base_recv_request_match_specific_proc(request, proc)) != NULL) { mca_ptl_t* ptl = frag->super.frag_owner; THREAD_UNLOCK(&pml_comm->c_matching_lock); - ptl->ptl_recv(ptl, frag); + ptl->ptl_matched(ptl, frag); return; /* match found */ } } @@ -141,7 +141,7 @@ static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc( continue; } lam_list_remove_item(unexpected_frags, (lam_list_item_t*)frag); - request->req_bytes_msg = header->hdr_msg_length; + request->req_bytes_packed = header->hdr_msg_length; request->super.req_tag = header->hdr_tag; request->super.req_peer = header->hdr_src; frag->frag_request = request; diff --git a/src/mca/ptl/base/ptl_base_recvreq.h b/src/mca/ptl/base/ptl_base_recvreq.h index 1cb02c8116..c88898dbcd 100644 --- a/src/mca/ptl/base/ptl_base_recvreq.h +++ b/src/mca/ptl/base/ptl_base_recvreq.h @@ -18,7 +18,7 @@ struct mca_ptl_base_recv_frag_t; */ struct mca_ptl_base_recv_request_t { mca_pml_base_request_t super; /**< base request */ - size_t req_bytes_msg; /**< size of message being received */ + size_t req_bytes_packed; /**< size of message being received */ size_t req_bytes_received; /**< number of bytes received from network */ size_t req_bytes_delivered; /**< number of bytes delivered to user */ }; @@ -47,7 +47,7 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t; comm, \ persistent) \ { \ - request->req_bytes_msg = 0; \ + request->req_bytes_packed = 0; \ request->req_bytes_received = 0; \ request->req_bytes_delivered = 0; \ request->super.req_sequence = 0; \ diff --git a/src/mca/ptl/base/ptl_base_sendreq.h b/src/mca/ptl/base/ptl_base_sendreq.h index 73bc35ea84..9fbe69292e 100644 --- a/src/mca/ptl/base/ptl_base_sendreq.h +++ b/src/mca/ptl/base/ptl_base_sendreq.h @@ -24,13 +24,14 @@ struct mca_ptl_base_send_frag_t; struct mca_ptl_base_send_request_t { mca_pml_base_request_t super; /** base request type - common data structure for use by wait/test */ size_t req_offset; /**< number of bytes that have already been assigned to a fragment */ - size_t req_frags; /**< number of fragments that have been allocated */ - size_t req_bytes_msg; /**< packed size of a message given the datatype and count */ + size_t req_bytes_packed; /**< packed size of a message given the datatype and count */ size_t req_bytes_sent; /**< number of bytes that have been sent */ mca_pml_base_send_mode_t req_send_mode; /**< type of send */ struct mca_ptl_t* req_owner; /**< PTL that allocated this descriptor */ struct mca_ptl_base_peer_t* req_peer; /**< PTL peer instance that will be used for first fragment */ - lam_ptr_t req_peer_request; /**< matched receive at peer */ + lam_ptr_t req_peer_match; /**< matched receive at peer */ + lam_ptr_t req_peer_addr; + size_t req_peer_size; lam_convertor_t req_convertor; /**< convertor that describes this datatype */ }; typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; @@ -62,10 +63,11 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; persistent) \ { \ request->req_offset = 0; \ - request->req_frags = 0; \ request->req_bytes_sent = 0; \ request->req_send_mode = mode; \ - request->req_peer_request.lval = 0; \ + request->req_peer_match.lval = 0; \ + request->req_peer_addr.lval = 0; \ + request->req_peer_size = 0; \ request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); \ request->super.req_addr = addr; \ request->super.req_count = count; \ @@ -91,9 +93,9 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; request->super.req_addr, \ 0); \ lam_convertor_get_packed_size(&request->req_convertor, &packed_size); \ - request->req_bytes_msg = packed_size; \ + request->req_bytes_packed = packed_size; \ } else { \ - request->req_bytes_msg = 0; \ + request->req_bytes_packed = 0; \ } \ } @@ -107,7 +109,7 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; static inline bool mca_ptl_base_send_request_matched( mca_ptl_base_send_request_t* request) { - return (NULL != request->req_peer_request.pval); + return (NULL != request->req_peer_match.pval); } #endif diff --git a/src/mca/ptl/ptl.h b/src/mca/ptl/ptl.h index 85209af2d4..ab99ed6b9a 100644 --- a/src/mca/ptl/ptl.h +++ b/src/mca/ptl/ptl.h @@ -106,9 +106,9 @@ * When a header of this type is received, to minimize latency the PTL should call the * ptl_match() method as soon as entire header is available, potentially prior to receiving * any data associated with the first fragment. If a match is made, the PML will call - * the ptl_recv() method of the fragments PTL. + * the ptl_matched() method of the fragments PTL. * - * The ptl_recv() method should generate, if required, an ack to the source process. An + * The ptl_matched() method should generate, if required, an ack to the source process. An * ack is required if the MCA_PTL_FLAGS_ACK_MATCHED bit is set by the source in the initial * message header. The ack should contain a pointer to the matched request, along * with the pointer to the orignal send fragment contained in the initial message header. @@ -152,6 +152,12 @@ typedef enum { MCA_PTL_ENABLE } mca_ptl_control_t; +/** + * PTL flags + */ +#define MCA_PTL_PUT 1 +#define MCA_PTL_GET 2 + /* * PTL module interface functions and datatype. */ @@ -319,49 +325,59 @@ typedef void (*mca_ptl_base_request_return_fn_t)( ); /** - * PML->PTL Initiate a send of the specified size. + * PML->PTL Initiate a send/put to the peer. * * @param ptl (IN) PTL instance * @param ptl_base_peer (IN) PTL peer addressing - * @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t) - * @param size (IN) Number of bytes PML is requesting PTL to deliver + * @param request (IN) Send request + * @param offset Current offset into packed/contiguous buffer. + * @param size (IN/OUT) Number of bytes PML is requesting PTL to deliver, + * PTL returns number of bytes sucessfully fragmented * @param flags (IN) Flags that should be passed to the peer via the message header. * @param request (OUT) LAM_SUCCESS if the PTL was able to queue one or more fragments * - * When multiple PTLs are available, a single request (that is large enough) - * will be split across the available PTLs. The PML scheduler will determine - * the percentage given to a PTL based on the bandwidth provided by the transport - * and its current resource usage. The size parameter to the send function indicates - * the number of bytes the PML is requesting the PTL to send. The PTL may choose - * to send 0->size bytes based on available resources. + * The PML implements a rendevouz protocol, with up to the PTL defined threshold + * bytes of the message sent in eager send mode. On receipt of an acknowledgment + * from the peer, the PML will schedule the remaining fragments. If the PTL supports + * RDMA functionality, these subsequent transfers may use RDMA put semantics. * - * The current offset into the users buffer is passed into the send function - * via the req_offset member of the send request parameter. The send function - * must update req_offset with the actual number of bytes the PTL is able to - * fragment for delivery. + * If the PTL is unable to fragment the requested size, possibly due to resource + * constraints or datatype alighnment/offset, it should return the number of bytes + * actually fragmented in the size parameter. */ -typedef int (*mca_ptl_base_send_fn_t)( +typedef int (*mca_ptl_base_put_fn_t)( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer, - struct mca_ptl_base_send_request_t* send_request, - size_t size, + struct mca_ptl_base_send_request_t* request, + size_t offset, + size_t *size, int flags ); /** - * PML->PTL Notification that a receive fragment has been matched. + * PML->PTL Initiate a get from a peer. * - * @param ptl (IN) PTL instance - * @param recv_frag (IN) Receive fragment + * @param ptl (IN) PTL instance + * @param ptl_base_peer (IN) PTL peer addressing + * @param request (IN) Recv request + * @param offset Current offset into packed/contiguous buffer. + * @param size (IN/OUT) Number of bytes PML is requesting PTL to pull from peer, + * PTL returns number of bytes sucessfully fragmented. + * @param flags (IN) + * @param request (OUT) LAM_SUCCESS if the PTL was able to queue one or more fragments * - * A fragment may be matched either when a new receive is posted, - * or on receipt of a fragment from the network. In either case, - * the PML will downcall into the PTL to provide a notification - * that the match was made. + * Initiate an RDMA get request to pull data from the peer. This is initiated + * at the receiver side when a request is matched if the PTL indicates that it + * supports RDMA get semantics. */ -typedef void (*mca_ptl_base_recv_fn_t)( + +typedef int (*mca_ptl_base_get_fn_t)( struct mca_ptl_t* ptl, - struct mca_ptl_base_recv_frag_t* recv_frag + struct mca_ptl_base_peer_t* ptl_base_peer, + struct mca_ptl_base_recv_request_t* request, + size_t offset, + size_t *size, + int flags ); /** @@ -387,6 +403,21 @@ typedef bool (*mca_ptl_base_match_fn_t)( struct mca_ptl_base_match_header_t* header ); + +/** + * PML->PTL Notification from the PML to the PTL that a receive has + * been posted and matched against the indicated fragment. + * + * @param ptl (IN) PTL instance + * @param recv_frag Matched fragment + * + */ + +typedef void (*mca_ptl_base_matched_fn_t)( + struct mca_ptl_t* ptl, + struct mca_ptl_base_recv_frag_t* request +); + /** * PTL->PML Notification from the PTL to the PML that a fragment * has completed (e.g. been successfully delivered into users buffer) @@ -424,13 +455,15 @@ struct mca_ptl_t { uint32_t ptl_exclusivity; /**< indicates this PTL should be used exclusively */ uint32_t ptl_latency; /**< relative ranking of latency used to prioritize ptls */ uint32_t ptl_bandwidth; /**< bandwidth (Mbytes/sec) supported by each endpoint */ + uint32_t ptl_flags; /**< flags (put/get...) */ /* PML->PTL function table */ mca_ptl_base_add_proc_fn_t ptl_add_proc; mca_ptl_base_del_proc_fn_t ptl_del_proc; mca_ptl_base_finalize_fn_t ptl_finalize; - mca_ptl_base_send_fn_t ptl_send; - mca_ptl_base_recv_fn_t ptl_recv; + mca_ptl_base_put_fn_t ptl_put; + mca_ptl_base_get_fn_t ptl_get; + mca_ptl_base_matched_fn_t ptl_matched; mca_ptl_base_request_alloc_fn_t ptl_request_alloc; mca_ptl_base_request_return_fn_t ptl_request_return; diff --git a/src/mca/ptl/tcp/src/ptl_tcp.c b/src/mca/ptl/tcp/src/ptl_tcp.c index 0be9d26efc..63859ee635 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp.c +++ b/src/mca/ptl/tcp/src/ptl_tcp.c @@ -30,11 +30,13 @@ mca_ptl_tcp_t mca_ptl_tcp = { 0, /* ptl_frag_first_size */ 0, /* ptl_frag_min_size */ 0, /* ptl_frag_max_size */ + MCA_PTL_PUT, /* ptl flags */ mca_ptl_tcp_add_proc, mca_ptl_tcp_del_proc, mca_ptl_tcp_finalize, mca_ptl_tcp_send, - mca_ptl_tcp_recv, + NULL, + mca_ptl_tcp_matched, mca_ptl_tcp_request_alloc, mca_ptl_tcp_request_return } @@ -154,20 +156,23 @@ int mca_ptl_tcp_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_peer, struct mca_ptl_base_send_request_t* sendreq, - size_t size, + size_t offset, + size_t *size, int flags) { mca_ptl_tcp_send_frag_t* sendfrag; - if (sendreq->req_frags == 0) { + int rc; + if (offset == 0) { sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag; } else { - int rc; lam_list_item_t* item; LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc); if(NULL == (sendfrag = (mca_ptl_tcp_send_frag_t*)item)) return rc; } - mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, size, flags); + rc = mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, offset, size, flags); + if(rc != LAM_SUCCESS) + return rc; return mca_ptl_tcp_peer_send(ptl_peer, sendfrag); } @@ -177,7 +182,7 @@ int mca_ptl_tcp_send( * ack back to the peer and process the fragment. */ -void mca_ptl_tcp_recv( +void mca_ptl_tcp_matched( mca_ptl_t* ptl, mca_ptl_base_recv_frag_t* frag) { @@ -206,3 +211,4 @@ void mca_ptl_tcp_recv( mca_ptl_tcp_recv_frag_progress((mca_ptl_tcp_recv_frag_t*)frag); } + diff --git a/src/mca/ptl/tcp/src/ptl_tcp.h b/src/mca/ptl/tcp/src/ptl_tcp.h index 1670a880ab..2c2cf5c75a 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp.h +++ b/src/mca/ptl/tcp/src/ptl_tcp.h @@ -191,7 +191,7 @@ extern void mca_ptl_tcp_request_return( * @param recv_frag (IN) Receive fragment * */ -extern void mca_ptl_tcp_recv( +extern void mca_ptl_tcp_matched( struct mca_ptl_t* ptl, struct mca_ptl_base_recv_frag_t* frag ); @@ -210,7 +210,8 @@ extern int mca_ptl_tcp_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_peer, struct mca_ptl_base_send_request_t*, - size_t size, + size_t offset, + size_t *size, int flags ); diff --git a/src/mca/ptl/tcp/src/ptl_tcp_module.c b/src/mca/ptl/tcp/src/ptl_tcp_module.c index 937cc3fea5..c745ea0cff 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_module.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_module.c @@ -182,7 +182,6 @@ static int mca_ptl_tcp_create(int if_index, const char* if_name) { mca_ptl_tcp_t* ptl = malloc(sizeof(mca_ptl_tcp_t)); char param[256]; - char *value; if(NULL == ptl) return LAM_ERR_OUT_OF_RESOURCE; memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp)); diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c index a791880349..28dc320703 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -118,7 +118,7 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd) mca_ptl_base_send_request_t* sendreq; sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval; sendreq = sendfrag->super.frag_request; - sendreq->req_peer_request = frag->super.super.frag_header.hdr_ack.hdr_dst_ptr; + sendreq->req_peer_match = frag->super.super.frag_header.hdr_ack.hdr_dst_match; mca_ptl_tcp_send_frag_progress(sendfrag); mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag); return true; diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c index 3d088e7dcf..fd878a10a7 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -49,16 +49,20 @@ int mca_ptl_tcp_send_frag_init( mca_ptl_tcp_send_frag_t* sendfrag, mca_ptl_base_peer_t* ptl_peer, mca_ptl_base_send_request_t* sendreq, - size_t size, + size_t offset, + size_t* size, int flags) { /* message header */ + size_t size_in = *size; + size_t size_out; + mca_ptl_base_header_t* hdr = &sendfrag->frag_header; - if(sendreq->req_frags == 0) { + if(offset == 0) { hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); - hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset; + hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; @@ -67,21 +71,21 @@ int mca_ptl_tcp_send_frag_init( hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank; hdr->hdr_match.hdr_dst = sendreq->super.req_peer; hdr->hdr_match.hdr_tag = sendreq->super.req_tag; - hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_msg; + hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; hdr->hdr_match.hdr_msg_seq = sendreq->super.req_sequence; } else { hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t); - hdr->hdr_frag.hdr_frag_offset = sendreq->req_offset; + hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; - hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_request; + hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; } /* initialize convertor */ - if(size > 0) { + if(size_in > 0) { lam_convertor_t *convertor; int rc; @@ -89,7 +93,7 @@ int mca_ptl_tcp_send_frag_init( * can use the convertor initialized on the request, remaining fragments * must copy/reinit the convertor as the transfer could be in parallel. */ - if( sendreq->req_frags < 2 ) { + if( offset <= mca_ptl_tcp.super.ptl_first_frag_size ) { convertor = &sendreq->req_convertor; } else { @@ -101,7 +105,7 @@ int mca_ptl_tcp_send_frag_init( sendreq->super.req_datatype, sendreq->super.req_count, sendreq->super.req_addr, - sendreq->req_offset); + offset); } /* if data is contigous convertor will return an offset @@ -109,29 +113,30 @@ int mca_ptl_tcp_send_frag_init( * that holds the packed data */ sendfrag->frag_vec[1].iov_base = NULL; - sendfrag->frag_vec[1].iov_len = size; + sendfrag->frag_vec[1].iov_len = size_in; if((rc = lam_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) < 0) return LAM_ERROR; /* adjust size and request offset to reflect actual number of bytes packed by convertor */ - size = sendfrag->frag_vec[1].iov_len; - sendreq->req_offset += size; + size_out = sendfrag->frag_vec[1].iov_len; + } else { + size_out = size_in; } - hdr->hdr_frag.hdr_frag_length = size; - sendreq->req_frags++; + hdr->hdr_frag.hdr_frag_length = size_out; /* fragment state */ sendfrag->frag_owner = &ptl_peer->peer_ptl->super; sendfrag->super.frag_request = sendreq; sendfrag->super.super.frag_addr = sendfrag->frag_vec[1].iov_base; - sendfrag->super.super.frag_size = size; + sendfrag->super.super.frag_size = size_out; sendfrag->frag_peer = ptl_peer; sendfrag->frag_vec_ptr = sendfrag->frag_vec; - sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2; + sendfrag->frag_vec_cnt = (size_out == 0) ? 1 : 2; sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr; sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t); sendfrag->frag_progressed = 0; + *size = size_out; return LAM_SUCCESS; } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h index 9f04826f92..8b9dbf6098 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h @@ -44,7 +44,8 @@ int mca_ptl_tcp_send_frag_init( mca_ptl_tcp_send_frag_t*, struct mca_ptl_base_peer_t*, struct mca_ptl_base_send_request_t*, - size_t size, + size_t offset, + size_t* size, int flags); @@ -96,12 +97,16 @@ static inline void mca_ptl_tcp_send_frag_init_ack( mca_ptl_tcp_recv_frag_t* frag) { mca_ptl_base_header_t* hdr = &ack->super.super.frag_header; + mca_ptl_base_recv_request_t* request = frag->super.frag_request; hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); hdr->hdr_ack.hdr_src_ptr = frag->super.super.frag_header.hdr_frag.hdr_src_ptr; - hdr->hdr_ack.hdr_dst_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ - hdr->hdr_ack.hdr_dst_ptr.pval = frag->super.frag_request; + hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_ack.hdr_dst_match.pval = request; + hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_ack.hdr_dst_addr.pval = request->super.req_addr; + hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed; ack->super.frag_request = 0; ack->super.super.frag_peer = ptl_peer; ack->super.super.frag_owner = ptl;