1
1
openmpi/ompi/mca/ptl/tcp/ptl_tcp_recvfrag.h
Jeff Squyres 4ab17f019b Rename src -> ompi
This commit was SVN r6269.
2005-07-02 13:43:57 +00:00

186 строки
6.9 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_TCP_RECV_FRAG_H
#define MCA_PTL_TCP_RECV_FRAG_H
#include "ompi_config.h"
#include <string.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include "include/sys/atomic.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_tcp_peer.h"
#include "ptl_tcp.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
extern ompi_class_t mca_ptl_tcp_recv_frag_t_class;
/**
* TCP received fragment derived type.
*/
struct mca_ptl_tcp_recv_frag_t {
mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */
size_t frag_hdr_cnt; /**< number of header bytes received */
size_t frag_msg_cnt; /**< number of message bytes received */
size_t frag_size; /**< size of the fragment on the wire */
bool frag_ack_pending; /**< is an ack pending for this fragment */
volatile int frag_progressed; /**< flag used to atomically progress fragment */
};
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
#define MCA_PTL_TCP_RECV_FRAG_ALLOC(frag, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_ptl_tcp_component.tcp_recv_frags, item, rc); \
frag = (mca_ptl_tcp_recv_frag_t*)item; \
}
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd);
/*
* Initialize a TCP receive fragment for a specific peer.
*/
static inline void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer)
{
frag->frag_recv.frag_base.frag_owner = &(peer->peer_ptl->super);
frag->frag_recv.frag_base.frag_addr = NULL;
frag->frag_recv.frag_base.frag_size = 0;
frag->frag_recv.frag_base.frag_peer = peer;
frag->frag_recv.frag_request = 0;
frag->frag_recv.frag_is_buffered = false;
frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0;
frag->frag_ack_pending = false;
frag->frag_progressed = 0;
}
bool mca_ptl_tcp_recv_frag_send_ack(mca_ptl_tcp_recv_frag_t* frag);
extern void* mca_ptl_tcp_memalloc( size_t* length, void* data );
static inline void mca_ptl_tcp_recv_frag_matched(
mca_ptl_tcp_recv_frag_t* frag,
size_t frag_offset,
size_t frag_length)
{
mca_ptl_base_recv_request_t* request = frag->frag_recv.frag_request;
/* if there is data associated with the fragment -- setup to receive */
if(frag_length > 0) {
/* initialize receive convertor */
ompi_convertor_clone( &(request->req_recv.req_convertor),
&(frag->frag_recv.frag_base.frag_convertor), 1 );
ompi_convertor_personalize( &frag->frag_recv.frag_base.frag_convertor, 0,
&frag_offset, mca_ptl_tcp_memalloc, NULL );
/* non-contiguous - allocate buffer for receive */
if( 1 == ompi_convertor_need_buffers( &frag->frag_recv.frag_base.frag_convertor ) ) {
frag->frag_recv.frag_base.frag_addr = malloc(frag_length);
frag->frag_recv.frag_is_buffered = true;
/* determine offset into users buffer */
} else {
long true_lb, true_extent;
ompi_ddt_get_true_extent( request->req_recv.req_base.req_datatype, &true_lb, &true_extent );
frag->frag_recv.frag_base.frag_addr = ((unsigned char*)request->req_recv.req_base.req_addr) +
frag_offset + true_lb;
}
frag->frag_size = frag_length;
if(frag_offset + frag_length > request->req_recv.req_bytes_packed) {
if(frag_offset > request->req_recv.req_bytes_packed)
frag->frag_recv.frag_base.frag_size = 0;
else
frag->frag_recv.frag_base.frag_size = request->req_recv.req_bytes_packed - frag_offset;
} else {
frag->frag_recv.frag_base.frag_size = frag_length;
}
} else {
frag->frag_size = 0;
frag->frag_recv.frag_base.frag_size = 0;
}
}
static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{
uint32_t iov_count;
size_t max_data;
int32_t freeAfter;
int32_t frag_progressed = ompi_atomic_add_32(&frag->frag_progressed,1);
/* For a match/rendezvous packet - we need to progress the fragment after
* the match has been made and all data has arrived. So this routine is
* called twice for this case. Otherwise, progress a fragment as soon as
* all of the data has been received.
*/
if((frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG &&
frag_progressed == 1 ) ||
(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type != MCA_PTL_HDR_TYPE_FRAG &&
frag_progressed == 2 )) {
mca_ptl_base_recv_request_t* request = frag->frag_recv.frag_request;
if(frag->frag_recv.frag_is_buffered) {
mca_ptl_base_frag_header_t* header = &(frag)->frag_recv.frag_base.frag_header.hdr_frag;
size_t offset = (header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_FRAG) ?
header->hdr_frag_offset : 0;
/*
* Initialize convertor and use it to unpack data
*/
struct iovec iov;
ompi_convertor_clone( &(request->req_recv.req_convertor),
&(frag->frag_recv.frag_base.frag_convertor), 1 );
ompi_convertor_personalize( &(frag->frag_recv.frag_base.frag_convertor),
0, &offset, mca_ptl_tcp_memalloc, NULL );
iov.iov_base = (ompi_iov_base_ptr_t)frag->frag_recv.frag_base.frag_addr;
iov.iov_len = frag->frag_recv.frag_base.frag_size;
iov_count = 1;
max_data = iov.iov_len;
ompi_convertor_unpack( &frag->frag_recv.frag_base.frag_convertor,
&iov, &iov_count, &max_data, &freeAfter );
}
/* progress the request */
frag->frag_recv.frag_base.frag_owner->ptl_recv_progress(
frag->frag_recv.frag_base.frag_owner,
request,
frag->frag_size,
frag->frag_recv.frag_base.frag_size);
if((frag)->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, (frag));
}
}
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif