/* * Copyright (c) 2004-2005 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. * All rights reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /** * @file */ #ifndef MCA_PTL_TCP_RECV_FRAG_H #define MCA_PTL_TCP_RECV_FRAG_H #include "ompi_config.h" #include #ifdef HAVE_SYS_TYPES_H #include #endif #ifdef HAVE_SYS_SOCKET_H #include #endif #ifdef HAVE_NETINET_IN_H #include #endif #include "include/sys/atomic.h" #include "mca/ptl/base/ptl_base_recvfrag.h" #include "ptl_tcp_peer.h" #include "ptl_tcp.h" #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif extern opal_class_t mca_ptl_tcp_recv_frag_t_class; /** * TCP received fragment derived type. */ struct mca_ptl_tcp_recv_frag_t { mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */ size_t frag_hdr_cnt; /**< number of header bytes received */ size_t frag_msg_cnt; /**< number of message bytes received */ size_t frag_size; /**< size of the fragment on the wire */ bool frag_ack_pending; /**< is an ack pending for this fragment */ volatile int frag_progressed; /**< flag used to atomically progress fragment */ }; typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; #define MCA_PTL_TCP_RECV_FRAG_ALLOC(frag, rc) \ { \ opal_list_item_t* item; \ OMPI_FREE_LIST_GET(&mca_ptl_tcp_component.tcp_recv_frags, item, rc); \ frag = (mca_ptl_tcp_recv_frag_t*)item; \ } bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd); /* * Initialize a TCP receive fragment for a specific peer. 
 */
static inline void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer)
{
    /* Bind the fragment to the peer's owning PTL and reset all per-receive
     * state so a descriptor recycled from the free list starts clean. */
    frag->frag_recv.frag_base.frag_owner = &(peer->peer_ptl->super);
    frag->frag_recv.frag_base.frag_addr = NULL;
    frag->frag_recv.frag_base.frag_size = 0;
    frag->frag_recv.frag_base.frag_peer = peer;
    frag->frag_recv.frag_request = 0;
    frag->frag_recv.frag_is_buffered = false;
    frag->frag_hdr_cnt = 0;
    frag->frag_msg_cnt = 0;
    frag->frag_ack_pending = false;
    frag->frag_progressed = 0;
}

/* Send an ack for this fragment back to the sender (defined in the .c file). */
bool mca_ptl_tcp_recv_frag_send_ack(mca_ptl_tcp_recv_frag_t* frag);

/* Allocation callback handed to ompi_convertor_personalize() below;
 * NOTE(review): semantics (who frees the returned buffer) are defined
 * elsewhere -- confirm in the convertor documentation/implementation. */
extern void* mca_ptl_tcp_memalloc( size_t* length, void* data );

/**
 * Called once the fragment has been matched to a posted receive request:
 * set up the destination address (user buffer, or a scratch buffer when
 * the datatype is non-contiguous) and clamp the delivered size to what
 * the request actually posted (frag_size keeps the on-the-wire size).
 */
static inline void mca_ptl_tcp_recv_frag_matched( mca_ptl_tcp_recv_frag_t* frag, size_t frag_offset, size_t frag_length)
{
    mca_ptl_base_recv_request_t* request = frag->frag_recv.frag_request;
    /* if there is data associated with the fragment -- setup to receive */
    if(frag_length > 0) {
        /* initialize receive convertor */
        ompi_convertor_clone( &(request->req_recv.req_convertor),
                              &(frag->frag_recv.frag_base.frag_convertor), 1 );
        ompi_convertor_personalize( &frag->frag_recv.frag_base.frag_convertor,
                                    0, &frag_offset, mca_ptl_tcp_memalloc, NULL );
        /* non-contiguous - allocate buffer for receive */
        if( 1 == ompi_convertor_need_buffers( &frag->frag_recv.frag_base.frag_convertor ) ) {
            /* NOTE(review): malloc return is not checked here -- a failed
             * allocation would surface later as a write through NULL. */
            frag->frag_recv.frag_base.frag_addr = malloc(frag_length);
            frag->frag_recv.frag_is_buffered = true;
        /* determine offset into users buffer */
        } else {
            long true_lb, true_extent;
            ompi_ddt_get_true_extent( request->req_recv.req_base.req_datatype,
                                      &true_lb, &true_extent );
            /* contiguous: receive straight into the user buffer at the
             * fragment's offset, adjusted by the datatype's true lower bound */
            frag->frag_recv.frag_base.frag_addr =
                ((unsigned char*)request->req_recv.req_base.req_addr) + frag_offset + true_lb;
        }
        frag->frag_size = frag_length;
        /* clamp frag_base.frag_size so we never deliver past the bytes the
         * receive request posted (sender may have sent more than posted) */
        if(frag_offset + frag_length > request->req_recv.req_bytes_packed) {
            if(frag_offset > request->req_recv.req_bytes_packed)
                frag->frag_recv.frag_base.frag_size = 0;
            else
                frag->frag_recv.frag_base.frag_size =
                    request->req_recv.req_bytes_packed - frag_offset;
        } else {
            frag->frag_recv.frag_base.frag_size = frag_length;
        }
    } else {
        /* zero-length fragment: nothing to receive or deliver */
        frag->frag_size = 0;
        frag->frag_recv.frag_base.frag_size = 0;
    }
}

/**
 * Progress (complete) a fragment once both prerequisites have occurred.
 * The atomic counter arbitrates between the two events (match made,
 * all data arrived) that may race to call this routine; exactly one
 * caller observes the triggering count and performs completion:
 * unpack buffered data through the convertor, notify the owning PTL,
 * and return the descriptor unless an ack is still pending.
 */
static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{
    uint32_t iov_count;
    size_t max_data;
    int32_t freeAfter;
    /* NOTE(review): assumes opal_atomic_add_32 returns the post-increment
     * value -- the ==1 / ==2 tests below depend on that; confirm against
     * include/sys/atomic.h. */
    int32_t frag_progressed = opal_atomic_add_32(&frag->frag_progressed,1);
    /* For a match/rendezvous packet - we need to progress the fragment after
     * the match has been made and all data has arrived. So this routine is
     * called twice for this case. Otherwise, progress a fragment as soon as
     * all of the data has been received.
     */
    if((frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG
        && frag_progressed == 1 ) ||
       (frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type != MCA_PTL_HDR_TYPE_FRAG
        && frag_progressed == 2 )) {
        mca_ptl_base_recv_request_t* request = frag->frag_recv.frag_request;

        /* data landed in a scratch buffer (non-contiguous datatype):
         * unpack it into the user's buffer via the convertor */
        if(frag->frag_recv.frag_is_buffered) {
            mca_ptl_base_frag_header_t* header = &(frag)->frag_recv.frag_base.frag_header.hdr_frag;
            /* only FRAG headers carry an explicit offset; first fragments start at 0 */
            size_t offset = (header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG) ?
                header->hdr_frag_offset : 0;

            /*
             * Initialize convertor and use it to unpack data
             */
            struct iovec iov;
            ompi_convertor_clone( &(request->req_recv.req_convertor),
                                  &(frag->frag_recv.frag_base.frag_convertor), 1 );
            ompi_convertor_personalize( &(frag->frag_recv.frag_base.frag_convertor),
                                        0, &offset, mca_ptl_tcp_memalloc, NULL );
            iov.iov_base = (ompi_iov_base_ptr_t)frag->frag_recv.frag_base.frag_addr;
            iov.iov_len = frag->frag_recv.frag_base.frag_size;
            iov_count = 1;
            max_data = iov.iov_len;
            ompi_convertor_unpack( &frag->frag_recv.frag_base.frag_convertor,
                                   &iov, &iov_count, &max_data, &freeAfter );
        }

        /* progress the request */
        frag->frag_recv.frag_base.frag_owner->ptl_recv_progress(
            frag->frag_recv.frag_base.frag_owner,
            request,
            frag->frag_size,
            frag->frag_recv.frag_base.frag_size);

        /* descriptor can only be recycled once any pending ack has gone out */
        if((frag)->frag_ack_pending == false) {
            mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, (frag));
        }
    }
}

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif