1
1
This commit was SVN r16723.
Этот коммит содержится в:
Brad Penoff 2007-11-13 23:39:16 +00:00
родитель bbef304f04
Коммит 5abd2d8064
23 изменённых файлов: 5379 добавлений и 0 удалений

0
ompi/mca/btl/sctp/.ompi_ignore Обычный файл
Просмотреть файл

4
ompi/mca/btl/sctp/.ompi_unignore Обычный файл
Просмотреть файл

@ -0,0 +1,4 @@
penoff
kmroz
wagner
humaira

76
ompi/mca/btl/sctp/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,76 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Automake rules for the SCTP BTL component.
#
# Compile flags come from the btl_sctp_* variables (presumably substituted
# by this component's configure logic -- verify).
CFLAGS = $(btl_sctp_CFLAGS)
AM_CPPFLAGS = $(btl_sctp_CPPFLAGS)
# Every source and header that makes up the component.
sources = \
btl_sctp.c \
btl_sctp.h \
btl_sctp_addr.h \
btl_sctp_component.c \
btl_sctp_endpoint.c \
btl_sctp_endpoint.h \
btl_sctp_frag.c \
btl_sctp_frag.h \
btl_sctp_hdr.h \
btl_sctp_proc.c \
btl_sctp_proc.h \
sctp_writev.c \
sctp_writev.h \
btl_sctp_recv_handler.c \
btl_sctp_recv_handler.h \
btl_sctp_utils.c \
btl_sctp_utils.h
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
# Exactly one of "component" / "lib" is non-empty, so the sources are
# attached to exactly one of the two libtool libraries below.
if OMPI_BUILD_btl_sctp_DSO
lib =
lib_sources =
component = mca_btl_sctp.la
component_sources = $(sources)
else
lib = libmca_btl_sctp.la
lib_sources = $(sources)
component =
component_sources =
endif
# DSO build: the component is installed into $(pkglibdir) and linked against
# the SCTP libraries plus the three Open MPI layer libraries.
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component)
mca_btl_sctp_la_SOURCES = $(component_sources)
mca_btl_sctp_la_LDFLAGS = -module -avoid-version $(btl_sctp_LDFLAGS)
mca_btl_sctp_la_LIBADD = \
$(btl_sctp_LIBS) \
$(top_ompi_builddir)/ompi/libmpi.la \
$(top_ompi_builddir)/orte/libopen-rte.la \
$(top_ompi_builddir)/opal/libopen-pal.la
#mca_btl_sctp_la_CPPFLAGS = $(btl_sctp_CPPFLAGS)
# Static build: a non-installed convenience library.
# NOTE(review): unlike the DSO variant, no LIBADD with $(btl_sctp_LIBS) is
# given here -- confirm the SCTP libraries are linked elsewhere.
noinst_LTLIBRARIES = $(lib)
libmca_btl_sctp_la_SOURCES = $(lib_sources)
libmca_btl_sctp_la_LDFLAGS = -module -avoid-version $(btl_sctp_LDFLAGS)
#libmca_btl_sctp_la_CPPFLAGS = $(btl_sctp_CPPFLAGS)

508
ompi/mca/btl/sctp/btl_sctp.c Обычный файл
Просмотреть файл

@ -0,0 +1,508 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "opal/util/output.h"
#include "opal/util/if.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h"
#include "btl_sctp.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_proc.h"
#include "btl_sctp_endpoint.h"
#include "ompi/datatype/convertor.h"
#include "ompi/datatype/datatype.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/proc/proc.h"
/*
 * Template instance of the SCTP BTL module.
 *
 * The initializer is positional, so the order of the entries must match
 * the field order of mca_btl_base_module_t. All numeric limits start at
 * zero here; mca_btl_sctp_component_open() assigns the real values
 * (eager limit, send sizes, flags, bandwidth, latency, ...).
 *
 * Note that the "get" slot is NULL although mca_btl_sctp_get() is defined
 * in this file, i.e. get is not advertised through the module table.
 */
mca_btl_sctp_module_t mca_btl_sctp_module = {
{
&mca_btl_sctp_component.super,
0, /* max size of first fragment */
0, /* min send fragment size */
0, /* max send fragment size */
0, /* btl_rdma_pipeline_send_length */
0, /* btl_rdma_pipeline_frag_size */
0, /* btl_min_rdma_pipeline_size */
0, /* exclusivity */
0, /* latency */
0, /* bandwidth */
0, /* flags */
mca_btl_sctp_add_procs,
mca_btl_sctp_del_procs,
mca_btl_sctp_register,
mca_btl_sctp_finalize,
mca_btl_sctp_alloc,
mca_btl_sctp_free,
mca_btl_sctp_prepare_src,
mca_btl_sctp_prepare_dst,
mca_btl_sctp_send,
mca_btl_sctp_put,
NULL, /* get */
mca_btl_base_dump,
NULL, /* mpool */
NULL /* register error */
}
};
/**
 * Create an SCTP endpoint for each remote process and record which
 * processes can be reached through this BTL.
 *
 * @param btl        (IN)     BTL module the endpoints are attached to
 * @param nprocs     (IN)     Number of entries in ompi_procs / peers
 * @param ompi_procs (IN)     Processes to add
 * @param peers      (OUT)    peers[i] receives the endpoint created for
 *                            ompi_procs[i]; untouched for the local process
 *                            and for processes whose insert failed
 * @param reachable  (IN/OUT) Bit i is set when ompi_procs[i] got an endpoint
 * @return OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE when the local proc,
 *         a BTL proc, or an endpoint object cannot be obtained
 */
int mca_btl_sctp_add_procs(
    struct mca_btl_base_module_t* btl,
    size_t nprocs,
    struct ompi_proc_t **ompi_procs,
    struct mca_btl_base_endpoint_t** peers,
    ompi_bitmap_t* reachable)
{
    mca_btl_sctp_module_t* module = (mca_btl_sctp_module_t*)btl;
    ompi_proc_t* local_proc = ompi_proc_local();
    int idx;

    if (NULL == local_proc) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for (idx = 0; idx < (int) nprocs; ++idx) {
        struct ompi_proc_t* remote = ompi_procs[idx];
        mca_btl_sctp_proc_t* peer_proc;
        mca_btl_base_endpoint_t* ep;
        int status;

        /* Never build a loopback SCTP connection to ourselves. */
        if (remote == local_proc) {
            continue;
        }

        peer_proc = mca_btl_sctp_proc_create(remote);
        if (NULL == peer_proc) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        /* The BTL proc is shared by every SCTP BTL instance that targets
         * this destination, so the endpoint is attached under its lock. */
        OPAL_THREAD_LOCK(&peer_proc->proc_lock);

        ep = OBJ_NEW(mca_btl_sctp_endpoint_t);
        if (NULL == ep) {
            OPAL_THREAD_UNLOCK(&peer_proc->proc_lock);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
        ep->endpoint_btl = module;

        status = mca_btl_sctp_proc_insert(peer_proc, ep);
        if (OMPI_SUCCESS == status) {
            ompi_bitmap_set_bit(reachable, idx);
            OPAL_THREAD_UNLOCK(&peer_proc->proc_lock);

            peers[idx] = ep;
            opal_list_append(&module->sctp_endpoints, (opal_list_item_t*)ep);

            /* One event-library user per connected peer, so the event
             * library stays in use for as long as a peer is attached. */
            opal_progress_event_users_increment();
        } else {
            /* The proc rejected this endpoint: drop it and leave the
             * process marked unreachable through this BTL. */
            OBJ_RELEASE(ep);
            OPAL_THREAD_UNLOCK(&peer_proc->proc_lock);
        }
    }

    return OMPI_SUCCESS;
}
/**
 * Detach the given endpoints from this BTL.
 *
 * Every endpoint that does not belong to the local BTL proc is removed
 * from the module's endpoint list and released. The event-library user
 * count is decremented once per array entry.
 *
 * NOTE(review): the decrement is unconditional, whereas
 * mca_btl_sctp_add_procs() only increments for endpoints it actually
 * created -- confirm the counts stay balanced for all callers.
 *
 * @param btl       (IN) BTL module
 * @param nprocs    (IN) Number of entries in procs / endpoints
 * @param procs     (IN) Processes being removed (not read here)
 * @param endpoints (IN) Endpoints that were handed out for those processes
 * @return OMPI_SUCCESS
 */
int mca_btl_sctp_del_procs(struct mca_btl_base_module_t* btl,
                           size_t nprocs,
                           struct ompi_proc_t **procs,
                           struct mca_btl_base_endpoint_t ** endpoints)
{
    mca_btl_sctp_module_t* module = (mca_btl_sctp_module_t*)btl;
    size_t idx = 0;

    while (idx < nprocs) {
        mca_btl_sctp_endpoint_t* ep = endpoints[idx];

        if (mca_btl_sctp_proc_local() != ep->endpoint_proc) {
            opal_list_remove_item(&module->sctp_endpoints, (opal_list_item_t*)ep);
            OBJ_RELEASE(ep);
        }
        opal_progress_event_users_decrement();
        ++idx;
    }

    return OMPI_SUCCESS;
}
/**
 * Install the receive callback for one tag.
 *
 * The callback and its user data are stored in the module's sctp_reg
 * table at index @p tag, replacing whatever was registered before.
 *
 * @param btl    (IN) BTL module
 * @param tag    (IN) Tag whose slot is written
 * @param cbfunc (IN) Callback to store for that tag
 * @param cbdata (IN) Opaque pointer stored alongside the callback
 * @return OMPI_SUCCESS
 */
int mca_btl_sctp_register(
    struct mca_btl_base_module_t* btl,
    mca_btl_base_tag_t tag,
    mca_btl_base_module_recv_cb_fn_t cbfunc,
    void* cbdata)
{
    mca_btl_base_recv_reg_t* slot = &((mca_btl_sctp_module_t*) btl)->sctp_reg[tag];

    slot->cbfunc = cbfunc;
    slot->cbdata = cbdata;
    return OMPI_SUCCESS;
}
/**
 * Allocate a descriptor with a single source segment of the requested size.
 *
 * Requests up to btl_eager_limit are served from the eager fragment pool,
 * requests up to btl_max_send_size from the max-send pool; anything larger
 * is refused.
 *
 * @param btl   (IN) BTL module
 * @param order (IN) Requested ordering; not used, the descriptor is always
 *                   marked MCA_BTL_NO_ORDER
 * @param size  (IN) Requested segment size in bytes
 * @return The new descriptor, or NULL when the size exceeds
 *         btl_max_send_size or no fragment could be obtained from the pool
 */
mca_btl_base_descriptor_t* mca_btl_sctp_alloc(
    struct mca_btl_base_module_t* btl,
    uint8_t order,
    size_t size)
{
    mca_btl_sctp_frag_t* frag = NULL;
    int rc;

    if (size <= btl->btl_eager_limit) {
        MCA_BTL_SCTP_FRAG_ALLOC_EAGER(frag, rc);
    } else if (size <= btl->btl_max_send_size) {
        MCA_BTL_SCTP_FRAG_ALLOC_MAX(frag, rc);
    } else {
        return NULL;
    }

    /* The pool may be exhausted; never touch the fragment before checking
     * it (the prepare_src/prepare_dst paths perform the same check). */
    if (NULL == frag) {
        return NULL;
    }

    /* The payload buffer starts directly behind the fragment structure. */
    frag->segments[0].seg_len = size;
    frag->segments[0].seg_addr.pval = frag+1;

    frag->base.des_src = frag->segments;
    frag->base.des_src_cnt = 1;
    frag->base.des_dst = NULL;
    frag->base.des_dst_cnt = 0;
    frag->base.des_flags = 0;
    frag->base.order = MCA_BTL_NO_ORDER;
    frag->btl = (mca_btl_sctp_module_t*)btl;
    return (mca_btl_base_descriptor_t*)frag;
}
/**
 * Hand a descriptor back to the fragment pool it came from.
 *
 * @param btl (IN) BTL module (not used)
 * @param des (IN) Descriptor to return; must be an SCTP fragment
 * @return OMPI_SUCCESS
 */
int mca_btl_sctp_free(
    struct mca_btl_base_module_t* btl,
    mca_btl_base_descriptor_t* des)
{
    /* Keep the cast in a named variable: the return macro receives a plain
     * identifier rather than a cast expression. */
    mca_btl_sctp_frag_t* sctp_frag = (mca_btl_sctp_frag_t*)des;

    MCA_BTL_SCTP_FRAG_RETURN(sctp_frag);
    return OMPI_SUCCESS;
}
/**
 * Build a send descriptor for the data described by a convertor.
 *
 * Three layouts are produced:
 *  - no payload: one segment of @p reserve bytes inside the fragment;
 *  - convertor needs buffers: the data is packed into the fragment right
 *    behind the reserved bytes, giving one segment of reserve + packed bytes;
 *  - otherwise: segment 0 holds the reserved bytes and segment 1 points at
 *    the address the convertor returned for the data.
 *
 * @param btl          (IN)     BTL module
 * @param endpoint     (IN)     BTL peer addressing (not used here)
 * @param registration (IN)     Memory registration (not used here)
 * @param convertor    (IN)     Data type convertor describing the payload
 * @param order        (IN)     Requested ordering (not used here)
 * @param reserve      (IN)     Bytes the upper layer wants in front of the data
 * @param size         (IN/OUT) Bytes requested (IN), bytes prepared (OUT)
 * @return The descriptor, or NULL if no fragment was available or the
 *         convertor failed to pack
 */
mca_btl_base_descriptor_t* mca_btl_sctp_prepare_src(
    struct mca_btl_base_module_t* btl,
    struct mca_btl_base_endpoint_t* endpoint,
    struct mca_mpool_base_registration_t* registration,
    struct ompi_convertor_t* convertor,
    uint8_t order,
    size_t reserve,
    size_t* size)
{
    mca_btl_sctp_frag_t* frag;
    struct iovec pack_iov;
    uint32_t pack_iov_cnt = 1;
    size_t nbytes = *size;
    int rc;

    /* Small requests come from the eager pool, everything else from the
     * max-send pool (and is truncated to what fits when packing). */
    if (nbytes + reserve > btl->btl_eager_limit) {
        MCA_BTL_SCTP_FRAG_ALLOC_MAX(frag, rc);
    } else {
        MCA_BTL_SCTP_FRAG_ALLOC_EAGER(frag, rc);
    }
    if (NULL == frag) {
        return NULL;
    }

    if (0 == nbytes) {
        /* Header-only descriptor. */
        frag->segments[0].seg_addr.pval = (frag + 1);
        frag->segments[0].seg_len = reserve;
        frag->base.des_src_cnt = 1;
    } else {
        const int copy_into_frag = ompi_convertor_need_buffers(convertor);

        if (copy_into_frag) {
            /* Pack into the fragment, behind the reserved bytes. */
            if (nbytes + reserve > frag->size) {
                nbytes = frag->size - reserve;
            }
            pack_iov.iov_len = nbytes;
            pack_iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)(frag+1)) + reserve);
        } else {
            /* Let the convertor report where the user data lives. */
            pack_iov.iov_len = nbytes;
            pack_iov.iov_base = NULL;
        }

        rc = ompi_convertor_pack(convertor, &pack_iov, &pack_iov_cnt, &nbytes );
        if( rc < 0 ) {
            mca_btl_sctp_free(btl, &frag->base);
            return NULL;
        }

        frag->segments[0].seg_addr.pval = (frag + 1);
        if (copy_into_frag) {
            frag->segments[0].seg_len = nbytes + reserve;
            frag->base.des_src_cnt = 1;
        } else {
            frag->segments[0].seg_len = reserve;
            frag->segments[1].seg_addr.pval = pack_iov.iov_base;
            frag->segments[1].seg_len = nbytes;
            frag->base.des_src_cnt = 2;
        }
    }

    frag->base.des_src = frag->segments;
    frag->base.des_dst = NULL;
    frag->base.des_dst_cnt = 0;
    frag->base.des_flags = 0;
    *size = nbytes;
    return &frag->base;
}
/**
 * Build a destination descriptor that points straight at the user buffer.
 *
 * A fragment is taken from the user pool and its single segment is aimed
 * at the convertor's base buffer, offset by the datatype's lower bound and
 * by the number of bytes the convertor has already converted.
 *
 * @param btl          (IN) BTL module (not used here)
 * @param endpoint     (IN) BTL peer addressing (not used here)
 * @param registration (IN) Memory registration (not used here)
 * @param convertor    (IN) Data type convertor describing the target buffer
 * @param order        (IN) Requested ordering (not used here)
 * @param reserve      (IN) Reserved bytes (not used here)
 * @param size         (IN) Number of bytes the segment should cover;
 *                          the value is read but not modified
 * @return The descriptor, or NULL if no fragment was available
 */
mca_btl_base_descriptor_t* mca_btl_sctp_prepare_dst(
    struct mca_btl_base_module_t* btl,
    struct mca_btl_base_endpoint_t* endpoint,
    struct mca_mpool_base_registration_t* registration,
    struct ompi_convertor_t* convertor,
    uint8_t order,
    size_t reserve,
    size_t* size)
{
    mca_btl_sctp_frag_t* frag;
    ptrdiff_t lower_bound;
    int rc;

    MCA_BTL_SCTP_FRAG_ALLOC_USER(frag, rc);
    if (NULL == frag) {
        return NULL;
    }

    ompi_ddt_type_lb(convertor->pDesc, &lower_bound);

    frag->segments[0].seg_len = *size;
    frag->segments[0].seg_addr.pval =
        convertor->pBaseBuf + lower_bound + convertor->bConverted;

    frag->base.des_dst = frag->segments;
    frag->base.des_dst_cnt = 1;
    frag->base.des_src = NULL;
    frag->base.des_src_cnt = 0;
    frag->base.des_flags = 0;
    return &frag->base;
}
/**
 * Initiate an asynchronous send.
 *
 * The fragment's iovec is laid out as [header, source segment 0, ...] and
 * the header's size field is the sum of all source segment lengths. The
 * header is converted with MCA_BTL_SCTP_HDR_HTON when the endpoint has
 * endpoint_nbo set.
 *
 * @param btl        (IN) BTL module
 * @param endpoint   (IN) BTL addressing information
 * @param descriptor (IN) Description of the data to be transferred
 * @param tag        (IN) The tag value used to notify the peer.
 * @return Result of mca_btl_sctp_endpoint_send()
 */
int mca_btl_sctp_send(
    struct mca_btl_base_module_t* btl,
    struct mca_btl_base_endpoint_t* endpoint,
    struct mca_btl_base_descriptor_t* descriptor,
    mca_btl_base_tag_t tag)
{
    mca_btl_sctp_frag_t* frag = (mca_btl_sctp_frag_t*)descriptor;
    size_t seg = 0;

    frag->btl = (mca_btl_sctp_module_t*) btl;
    frag->endpoint = endpoint;
    frag->rc = 0;

    /* Slot 0 of the iovec always carries the fragment header. */
    frag->iov_idx = 0;
    frag->iov_cnt = 1;
    frag->iov_ptr = frag->iov;
    frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
    frag->iov[0].iov_len = sizeof(frag->hdr);

    /* One further slot per source segment. */
    frag->hdr.size = 0;
    while (seg < frag->base.des_src_cnt) {
        frag->hdr.size += frag->segments[seg].seg_len;
        frag->iov[seg+1].iov_len = frag->segments[seg].seg_len;
        frag->iov[seg+1].iov_base = (IOVBASE_TYPE*)frag->segments[seg].seg_addr.pval;
        frag->iov_cnt++;
        seg++;
    }

    frag->hdr.base.tag = tag;
    frag->hdr.type = MCA_BTL_SCTP_HDR_TYPE_SEND;
    frag->hdr.count = 0;
    if (endpoint->endpoint_nbo) {
        MCA_BTL_SCTP_HDR_HTON(frag->hdr);
    }

    return mca_btl_sctp_endpoint_send(endpoint,frag);
}
/**
 * Initiate an asynchronous put.
 *
 * The fragment's iovec is laid out as
 * [header, array of destination segment descriptors, source segment 0, ...].
 * The header's size field is the sum of the source segment lengths and its
 * count field is the number of destination segments.
 *
 * @param btl        (IN) BTL module
 * @param endpoint   (IN) BTL addressing information
 * @param descriptor (IN) Description of the data to be transferred
 * @return Result of mca_btl_sctp_endpoint_send()
 */
int mca_btl_sctp_put(
    mca_btl_base_module_t* btl,
    mca_btl_base_endpoint_t* endpoint,
    mca_btl_base_descriptor_t* descriptor)
{
    mca_btl_sctp_frag_t* frag = (mca_btl_sctp_frag_t*)descriptor;
    size_t seg = 0;

    frag->btl = (mca_btl_sctp_module_t*) btl;
    frag->endpoint = endpoint;
    frag->rc = 0;
    frag->iov_idx = 0;
    frag->hdr.size = 0;

    /* Slots 0 and 1: header, then the remote (destination) segment list. */
    frag->iov_cnt = 2;
    frag->iov_ptr = frag->iov;
    frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
    frag->iov[0].iov_len = sizeof(frag->hdr);
    frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_dst;
    frag->iov[1].iov_len = frag->base.des_dst_cnt * sizeof(mca_btl_base_segment_t);

    /* Slots 2..: the local source data itself. */
    while (seg < frag->base.des_src_cnt) {
        frag->hdr.size += frag->segments[seg].seg_len;
        frag->iov[seg+2].iov_len = frag->segments[seg].seg_len;
        frag->iov[seg+2].iov_base = (IOVBASE_TYPE*)frag->segments[seg].seg_addr.pval;
        frag->iov_cnt++;
        seg++;
    }

    frag->hdr.base.tag = MCA_BTL_TAG_BTL;
    frag->hdr.type = MCA_BTL_SCTP_HDR_TYPE_PUT;
    frag->hdr.count = frag->base.des_dst_cnt;
    if (endpoint->endpoint_nbo) {
        MCA_BTL_SCTP_HDR_HTON(frag->hdr);
    }

    return mca_btl_sctp_endpoint_send(endpoint,frag);
}
/**
 * Initiate an asynchronous get.
 *
 * Sends a two-element iovec: the fragment header followed by the array of
 * source segment descriptors. The header's count field is the number of
 * source segments and its size field is zero.
 *
 * Note: the module table in this file registers NULL for the get entry,
 * so this function is not reachable through the BTL interface as it stands.
 *
 * @param btl        (IN) BTL module
 * @param endpoint   (IN) BTL addressing information
 * @param descriptor (IN) Description of the data to be transferred
 * @return Result of mca_btl_sctp_endpoint_send()
 */
int mca_btl_sctp_get(
    mca_btl_base_module_t* btl,
    mca_btl_base_endpoint_t* endpoint,
    mca_btl_base_descriptor_t* descriptor)
{
    mca_btl_sctp_module_t* sctp_btl = (mca_btl_sctp_module_t*) btl;
    mca_btl_sctp_frag_t* frag = (mca_btl_sctp_frag_t*)descriptor;

    frag->btl = sctp_btl;
    frag->endpoint = endpoint;
    frag->rc = 0;
    frag->iov_idx = 0;
    frag->hdr.size = 0;
    frag->iov_cnt = 2;
    frag->iov_ptr = frag->iov;
    frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
    frag->iov[0].iov_len = sizeof(frag->hdr);
    frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_src;
    frag->iov[1].iov_len = frag->base.des_src_cnt * sizeof(mca_btl_base_segment_t);

    frag->hdr.base.tag = MCA_BTL_TAG_BTL;
    frag->hdr.type = MCA_BTL_SCTP_HDR_TYPE_GET;
    frag->hdr.count = frag->base.des_src_cnt;

    /* Braced like in send/put: a statement macro under an unbraced `if`
     * would only guard its first statement if it ever expands to several. */
    if (endpoint->endpoint_nbo) {
        MCA_BTL_SCTP_HDR_HTON(frag->hdr);
    }

    return mca_btl_sctp_endpoint_send(endpoint,frag);
}
/*
 * Cleanup/release module resources.
 *
 * Drains the module's endpoint list, releasing each endpoint and dropping
 * one event-library user per endpoint, then frees the module itself.
 * Always returns OMPI_SUCCESS.
 */
int mca_btl_sctp_finalize(struct mca_btl_base_module_t* btl)
{
    mca_btl_sctp_module_t* module = (mca_btl_sctp_module_t*) btl;
    opal_list_item_t* entry;

    while (NULL != (entry = opal_list_remove_first(&module->sctp_endpoints))) {
        mca_btl_sctp_endpoint_t *ep = (mca_btl_sctp_endpoint_t*)entry;

        OBJ_RELEASE(ep);
        opal_progress_event_users_decrement();
    }

    free(module);
    return OMPI_SUCCESS;
}

348
ompi/mca/btl/sctp/btl_sctp.h Обычный файл
Просмотреть файл

@ -0,0 +1,348 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_SCTP_H
#define MCA_BTL_SCTP_H
/* Standard system includes */
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
/* Open MPI includes */
#include "opal/event/event.h"
#include "opal/util/output.h"
#include "ompi/class/ompi_bitmap.h"
#include "ompi/class/ompi_free_list.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/mca/btl/btl.h"
#include "opal/class/opal_hash_table.h"
/* Compile-time switches for the SCTP BTL.
*
* MCA_BTL_SCTP_DONT_USE_HASH: for the assocID -> proc mapping, use
* opal_hash_table (0) or the homemade array (1). With 1, the
* sctp_assocID_hash member of the component struct is compiled out.
*/
#define MCA_BTL_SCTP_DONT_USE_HASH 1
/* SCTP_BTL_ERROR(args): debug output. Change the "#if 1" below to "#if 0"
* to forward the messages to BTL_ERROR; as written they expand to nothing. */
#if 1
/* if you do not want to see these debug messages */
#define SCTP_BTL_ERROR(args)
#else
#define SCTP_BTL_ERROR(args) BTL_ERROR(args)
#endif
/* MCA_BTL_SCTP_STATISTICS: when non-zero, the module struct gains its
* byte/handler counters. */
#define MCA_BTL_SCTP_STATISTICS 0
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* SCTP BTL component.
*
* Holds the state shared by all SCTP BTL modules: tunable parameters
* (registered in mca_btl_sctp_component_open), the listen socket, the
* proc table and the fragment free lists.
*/
struct mca_btl_sctp_component_t {
mca_btl_base_component_1_0_1_t super; /**< base BTL component */
uint32_t sctp_num_btls; /**< number of SCTP BTL modules (presumably the length of sctp_btls -- verify) */
struct mca_btl_sctp_module_t **sctp_btls; /**< array of available BTL modules */
struct mca_btl_sctp_proc_t* sctp_local; /**< local proc struct */
int sctp_free_list_num; /**< initial size of free lists */
int sctp_free_list_max; /**< maximum size of free lists */
int sctp_free_list_inc; /**< number of elements to alloc when growing free lists */
int sctp_endpoint_cache; /**< amount of cache on each endpoint */
opal_hash_table_t sctp_procs; /**< hash table of sctp proc structures */
#if MCA_BTL_SCTP_DONT_USE_HASH
#else
opal_hash_table_t sctp_assocID_hash; /**< hash table of procs keyed on assocIDs */
#endif
opal_list_t sctp_events; /**< list of pending sctp events */
opal_mutex_t sctp_lock; /**< lock for accessing module state */
opal_event_t sctp_recv_event; /**< recv event for listen socket */
int sctp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short sctp_listen_port; /**< listen port */
char* sctp_if_include; /**< comma separated list of interfaces to include */
char* sctp_if_exclude; /**< comma separated list of interfaces to exclude */
int sctp_sndbuf; /**< socket sndbuf size */
int sctp_rcvbuf; /**< socket rcvbuf size */
int sctp_use_nodelay; /**< SCTP_NODELAY value */
int sctp_if_11; /**< Are we going 1 to 1? 1 to many is default. */
/* free lists of fragment descriptors (eager, max-send and user pools) */
ompi_free_list_t sctp_frag_eager;
ompi_free_list_t sctp_frag_max;
ompi_free_list_t sctp_frag_user;
};
typedef struct mca_btl_sctp_component_t mca_btl_sctp_component_t;
OMPI_MODULE_DECLSPEC extern mca_btl_sctp_component_t mca_btl_sctp_component;
/**
* BTL Module Interface
*
* One instance describes one SCTP BTL; the base interface must stay the
* first member because the code casts between mca_btl_base_module_t* and
* mca_btl_sctp_module_t*.
*/
struct mca_btl_sctp_module_t {
mca_btl_base_module_t super; /**< base BTL interface */
mca_btl_base_recv_reg_t sctp_reg[256]; /**< receive callbacks, indexed by tag (written by mca_btl_sctp_register) */
int sctp_ifindex; /**< BTL interface index */
struct sockaddr_in sctp_ifaddr; /**< BTL interface address */
struct sockaddr_in sctp_ifmask; /**< BTL interface netmask */
opal_list_t sctp_endpoints; /**< endpoints created by mca_btl_sctp_add_procs */
#if MCA_BTL_SCTP_STATISTICS
/* optional counters, only present when statistics are compiled in */
size_t sctp_bytes_sent;
size_t sctp_bytes_recv;
size_t sctp_send_handler;
#endif
};
typedef struct mca_btl_sctp_module_t mca_btl_sctp_module_t;
extern mca_btl_sctp_module_t mca_btl_sctp_module;
/* Portable socket close: closesocket() on Windows, close() elsewhere. */
#if defined(__WINDOWS__)
#define CLOSE_THE_SOCKET(socket) closesocket(socket)
#else
#define CLOSE_THE_SOCKET(socket) close(socket)
#endif /* defined(__WINDOWS__) */
/**
* Register SCTP component parameters with the MCA framework
*/
extern int mca_btl_sctp_component_open(void);
/**
* Any final cleanup before being unloaded.
*/
extern int mca_btl_sctp_component_close(void);
/**
* SCTP component initialization.
*
* @param num_btl_modules (OUT) Number of BTLs returned in BTL array.
* @param allow_multi_user_threads (IN) Flag indicating whether BTL supports user threads (TRUE)
* @param have_hidden_threads (IN) Flag indicating whether BTL uses threads (TRUE)
*/
extern mca_btl_base_module_t** mca_btl_sctp_component_init(
int *num_btl_modules,
bool allow_multi_user_threads,
bool have_hidden_threads
);
/**
* SCTP component control.
*/
int mca_btl_sctp_component_control(
int param,
void* value,
size_t size
);
/**
* SCTP component progress.
*/
extern int mca_btl_sctp_component_progress(void);
/**
* Cleanup any resources held by the BTL.
*
* @param btl BTL instance.
* @return OMPI_SUCCESS or error status on failure.
*/
extern int mca_btl_sctp_finalize(
struct mca_btl_base_module_t* btl
);
/**
* PML->BTL notification of change in the process list.
*
* @param btl (IN) BTL instance
* @param nprocs (IN) Number of processes
* @param procs (IN) Set of processes
* @param peers (OUT) Set of (optional) peer addressing info.
* @param reachable (IN/OUT) Bitmap of the processes that are reachable via this BTL.
* @return OMPI_SUCCESS or error status on failure.
*
*/
extern int mca_btl_sctp_add_procs(
struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t** peers,
ompi_bitmap_t* reachable
);
/**
* PML->BTL notification of change in the process list.
*
* @param btl (IN) BTL instance
* @param nproc (IN) Number of processes.
* @param procs (IN) Set of processes.
* @param peers (IN) Set of peer data structures.
* @return Status indicating if cleanup was successful
*
*/
extern int mca_btl_sctp_del_procs(
struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t** peers
);
/**
* Initiate an asynchronous send.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transfered
* @param tag (IN) The tag value used to notify the peer.
*/
extern int mca_btl_sctp_send(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* descriptor,
mca_btl_base_tag_t tag
);
/**
* Initiate an asynchronous put.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
extern int mca_btl_sctp_put(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* decriptor
);
/**
* Initiate an asynchronous get.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
extern int mca_btl_sctp_get(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* decriptor
);
/**
* Register a callback function that is called on receipt
* of a fragment.
*
* @param btl (IN) BTL module
* @return Status indicating if registration was successful
*
*/
extern int mca_btl_sctp_register(
struct mca_btl_base_module_t* btl,
mca_btl_base_tag_t tag,
mca_btl_base_module_recv_cb_fn_t cbfunc,
void* cbdata);
/**
* Allocate a descriptor with a segment of the requested size.
* Note that the BTL layer may choose to return a smaller size
* if it cannot support the request.
*
* @param btl (IN) BTL module
* @param size (IN) Request segment size.
*/
extern mca_btl_base_descriptor_t* mca_btl_sctp_alloc(
struct mca_btl_base_module_t* btl,
uint8_t order,
size_t size);
/**
* Return a segment allocated by this BTL.
*
* @param btl (IN) BTL module
* @param descriptor (IN) Allocated descriptor.
*/
extern int mca_btl_sctp_free(
struct mca_btl_base_module_t* btl,
mca_btl_base_descriptor_t* des);
/**
* Prepare a descriptor for send/rdma using the supplied
* convertor. If the convertor references data that is contiguous,
* the descriptor may simply point to the user buffer. Otherwise,
* this routine is responsible for allocating buffer space and
* packing if required.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL peer addressing
* @param convertor (IN) Data type convertor
* @param reserve (IN) Additional bytes requested by upper layer to precede user data
* @param size (IN/OUT) Number of bytes to prepare (IN), number of bytes actually prepared (OUT)
*/
mca_btl_base_descriptor_t* mca_btl_sctp_prepare_src(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
struct mca_mpool_base_registration_t*,
struct ompi_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size
);
extern mca_btl_base_descriptor_t* mca_btl_sctp_prepare_dst(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
struct mca_mpool_base_registration_t*,
struct ompi_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

46
ompi/mca/btl/sctp/btl_sctp_addr.h Обычный файл
Просмотреть файл

@ -0,0 +1,46 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_SCTP_ADDR_H
#define MCA_BTL_SCTP_ADDR_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
/**
* Structure used to publish SCTP connection information to peers.
*/
struct mca_btl_sctp_addr_t {
struct in_addr addr_inet; /**< IPv4 address in network byte order */
in_port_t addr_port; /**< listen port (byte order not stated here -- presumably network order like addr_inet; verify) */
unsigned short addr_inuse; /**< local meaning only */
};
typedef struct mca_btl_sctp_addr_t mca_btl_sctp_addr_t;
#endif

952
ompi/mca/btl/sctp/btl_sctp_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,952 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* On Windows, many of the socket functions return EWOULDBLOCK
* instead of codes such as EAGAIN or EINPROGRESS. It has been
* verified that this does not conflict with the other error codes
* returned by these functions under UNIX/Linux environments.
*/
#include "ompi_config.h"
#include "opal/opal_socket_errno.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <string.h>
#include <fcntl.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include "ompi/constants.h"
#include "opal/event/event.h"
#include "opal/util/if.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/ns/ns_types.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_sctp.h"
#include "btl_sctp_addr.h"
#include "btl_sctp_proc.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_endpoint.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/datatype/convertor.h"
#include <netinet/sctp.h>
#include "btl_sctp_recv_handler.h"
#include "btl_sctp_component.h"
/*
 * The SCTP BTL component instance. Only the base component part is
 * initialized here (positionally); the remaining members are set up in
 * mca_btl_sctp_component_open().
 */
mca_btl_sctp_component_t mca_btl_sctp_component = {
{
/* First, the mca_base_component_t struct containing meta information
about the component itself */
{
/* Indicate that we are a BTL v1.0.1 component (which also implies a
specific MCA version) */
MCA_BTL_BASE_VERSION_1_0_1,
"sctp", /* MCA component name */
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
mca_btl_sctp_component_open, /* component open */
mca_btl_sctp_component_close /* component close */
},
/* Next the MCA component meta data */
{
/* Whether the component is checkpointable or not */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
mca_btl_sctp_component_init,
/* NOTE(review): presumably the component progress slot, left unset --
confirm against mca_btl_base_component_1_0_1_t */
NULL,
}
};
/*
 * utility routines for parameter registration
 */

/**
 * Register a string MCA parameter of the SCTP BTL and return its value.
 *
 * @param param_name    Name of the parameter (without component prefix)
 * @param default_value Value used when the parameter is not set
 * @return The current value as stored by mca_base_param_reg_string(), or
 *         NULL if the registration did not store a value
 */
static inline char* mca_btl_sctp_param_register_string(
    const char* param_name,
    const char* default_value)
{
    /* Start from NULL so that a failed registration yields a well-defined
     * result instead of an uninitialized pointer. */
    char *param_value = NULL;

    /* No help string is provided; both boolean flags are false. */
    mca_base_param_reg_string(&mca_btl_sctp_component.super.btl_version,
                              param_name, NULL, false, false,
                              default_value, &param_value);
    return param_value;
}
/**
 * Register an integer MCA parameter of the SCTP BTL and return its value.
 *
 * @param param_name    Name of the parameter (without component prefix)
 * @param default_value Value used when the parameter is not set
 * @return The current value as stored by mca_base_param_reg_int(), or
 *         default_value if the registration did not store a value
 */
static inline int mca_btl_sctp_param_register_int(
    const char* param_name,
    int default_value)
{
    /* Seed with the default so that a failed registration never returns an
     * uninitialized int. */
    int param_value = default_value;

    /* No help string is provided; both boolean flags are false. */
    mca_base_param_reg_int(&mca_btl_sctp_component.super.btl_version,
                           param_name, NULL, false, false,
                           default_value, &param_value);
    return param_value;
}
/*
 * Data structure for accepting connections.
 *
 * "item" must stay the first member: the class is declared as a subclass
 * of opal_list_item_t, and it is what links the object into
 * mca_btl_sctp_component.sctp_events.
 */
struct mca_btl_sctp_event_t {
opal_list_item_t item; /* list linkage (base class) */
opal_event_t event; /* the event-library event itself */
};
typedef struct mca_btl_sctp_event_t mca_btl_sctp_event_t;
/* Constructor: every new event object is appended to the component-wide
 * list of pending events, under the component lock. */
static void mca_btl_sctp_event_construct(mca_btl_sctp_event_t* event)
{
    opal_list_item_t* link = &event->item;

    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    opal_list_append(&mca_btl_sctp_component.sctp_events, link);
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);
}
/* Destructor: unlink the event object from the component-wide list of
 * pending events, under the component lock. */
static void mca_btl_sctp_event_destruct(mca_btl_sctp_event_t* event)
{
    opal_list_item_t* link = &event->item;

    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    opal_list_remove_item(&mca_btl_sctp_component.sctp_events, link);
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);
}
/* Class descriptor for mca_btl_sctp_event_t: a subclass of
 * opal_list_item_t whose constructor/destructor are
 * mca_btl_sctp_event_construct / mca_btl_sctp_event_destruct. */
OBJ_CLASS_INSTANCE(
mca_btl_sctp_event_t,
opal_list_item_t,
mca_btl_sctp_event_construct,
mca_btl_sctp_event_destruct);
/*
* functions for receiving event callbacks
*/
static void mca_btl_sctp_component_recv_handler(int, short, void*); /* for 1-1 */
/* mca_btl_sctp_recv_handler(int, short, void*) for 1-many is in btl_sctp_recv_handler.h */
/*
* Called by MCA framework to open the component, registers
* component parameters.
*/
int mca_btl_sctp_component_open(void)
{
#ifdef __WINDOWS__
WSADATA win_sock_data;
if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) {
BTL_ERROR(("failed to initialise windows sockets:%d", WSAGetLastError()));
return OMPI_ERROR;
}
#endif
/* initialize state */
mca_btl_sctp_component.sctp_listen_sd = -1;
/* TODO different sd for ipv6 */
mca_btl_sctp_component.sctp_num_btls=0;
/* addr_count */
mca_btl_sctp_component.sctp_btls=NULL;
/* initialize objects */
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_procs, opal_hash_table_t);
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_events, opal_list_t);
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_frag_eager, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_frag_max, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_frag_user, ompi_free_list_t);
opal_hash_table_init(&mca_btl_sctp_component.sctp_procs, 256);
#if MCA_BTL_SCTP_DONT_USE_HASH
/* TODO make this only allocate how much it needs to. Currently
* allocates 256 (to match sctp_procs). recvr_proc_table and
* sender_proc_table are malloc'd in mca_btl_sctp_component_init.
*/
recvr_proc_table = NULL;
sender_proc_table = NULL;
#else
OBJ_CONSTRUCT(&mca_btl_sctp_component.sctp_assocID_hash, opal_hash_table_t);
opal_hash_table_init(&mca_btl_sctp_component.sctp_assocID_hash, 256);
#endif
/* register SCTP component parameters */
/* num links */
mca_btl_sctp_component.sctp_if_include =
mca_btl_sctp_param_register_string("if_include", "");
mca_btl_sctp_component.sctp_if_exclude =
mca_btl_sctp_param_register_string("if_exclude", "lo");
mca_btl_sctp_component.sctp_free_list_num =
mca_btl_sctp_param_register_int ("free_list_num", 8);
mca_btl_sctp_component.sctp_free_list_max =
mca_btl_sctp_param_register_int ("free_list_max", -1);
mca_btl_sctp_component.sctp_free_list_inc =
mca_btl_sctp_param_register_int ("free_list_inc", 32);
mca_btl_sctp_component.sctp_sndbuf =
mca_btl_sctp_param_register_int ("sndbuf", 128*1024);
mca_btl_sctp_component.sctp_rcvbuf =
mca_btl_sctp_param_register_int ("rcvbuf", 128*1024);
mca_btl_sctp_component.sctp_endpoint_cache =
mca_btl_sctp_param_register_int ("endpoint_cache", 30*1024);
mca_btl_sctp_component.sctp_use_nodelay =
!mca_btl_sctp_param_register_int ("use_nagle", 0);
/* port_min */
/* port_range */
/* use a single one-to-many socket by default */
mca_btl_sctp_component.sctp_if_11 =
mca_btl_sctp_param_register_int ("if_11", 0);
/* have lower exclusivity than tcp */
mca_btl_sctp_module.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_LOW - 1;
mca_btl_sctp_module.super.btl_eager_limit = 64*1024;
mca_btl_sctp_module.super.btl_min_send_size = 64*1024;
mca_btl_sctp_module.super.btl_max_send_size = 128*1024;
mca_btl_sctp_module.super.btl_rdma_pipeline_send_length = 128*1024;
mca_btl_sctp_module.super.btl_rdma_pipeline_frag_size = INT_MAX;
mca_btl_sctp_module.super.btl_min_rdma_pipeline_size = 0;
mca_btl_sctp_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
MCA_BTL_FLAGS_SEND_INPLACE |
MCA_BTL_FLAGS_NEED_CSUM |
MCA_BTL_FLAGS_NEED_ACK |
MCA_BTL_FLAGS_HETEROGENEOUS_RDMA;
mca_btl_sctp_module.super.btl_bandwidth = 100;
mca_btl_sctp_module.super.btl_latency = 100;
mca_btl_base_param_register(&mca_btl_sctp_component.super.btl_version,
&mca_btl_sctp_module.super);
/* disable_family */
/* setup receive buffer */
if(0 == mca_btl_sctp_recv_handler_initbuf()) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
return OMPI_SUCCESS;
}
/*
 * Component close hook: releases what open/init acquired.
 *
 * Frees the interface include/exclude strings and the module pointer array,
 * releases the receive buffer, closes the listen socket, releases every
 * pending event and destructs the component-level objects.
 *
 * Always returns OMPI_SUCCESS.
 */
int mca_btl_sctp_component_close(void)
{
    opal_list_item_t* item;
    opal_list_item_t* next;

    /* free(NULL) is a no-op; reset the pointers so nothing dangles */
    free(mca_btl_sctp_component.sctp_if_include);
    mca_btl_sctp_component.sctp_if_include = NULL;
    free(mca_btl_sctp_component.sctp_if_exclude);
    mca_btl_sctp_component.sctp_if_exclude = NULL;
    /* only the pointer array is freed here, not the modules it points to */
    free(mca_btl_sctp_component.sctp_btls);
    mca_btl_sctp_component.sctp_btls = NULL;

    mca_btl_sctp_recv_handler_freebuf();

    if (mca_btl_sctp_component.sctp_listen_sd >= 0) {
        opal_event_del(&mca_btl_sctp_component.sctp_recv_event);
        CLOSE_THE_SOCKET(mca_btl_sctp_component.sctp_listen_sd);
        mca_btl_sctp_component.sctp_listen_sd = -1;
    }

    /* cleanup any pending events.
     *
     * The event destructor unlinks the event from sctp_events itself, so the
     * list is only walked here: unlinking the item first and then releasing
     * it would remove the same item from the list twice.  The successor is
     * fetched before the release because the release may free the item.
     *
     * NOTE(review): the destructor also takes sctp_lock, which is held here;
     * this relies on OPAL_THREAD_LOCK being a no-op or re-entrant at close
     * time -- confirm.
     */
    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    for(item = opal_list_get_first(&mca_btl_sctp_component.sctp_events);
        item != opal_list_get_end(&mca_btl_sctp_component.sctp_events);
        item = next) {
        mca_btl_sctp_event_t* event = (mca_btl_sctp_event_t*)item;
        next = opal_list_get_next(item);
        opal_event_del(&event->event);
        OBJ_RELEASE(event);
    }
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);

    /* release resources */
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_procs);
#if MCA_BTL_SCTP_DONT_USE_HASH
    free(recvr_proc_table);
    recvr_proc_table = NULL;
    free(sender_proc_table);
    sender_proc_table = NULL;
#else
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_assocID_hash);
#endif
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_events);
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_frag_eager);
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_frag_max);
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_frag_user);
    OBJ_DESTRUCT(&mca_btl_sctp_component.sctp_lock);

#ifdef __WINDOWS__
    WSACleanup();
#endif
    return OMPI_SUCCESS;
}
/*
* Create a btl instance and add to modules list.
*/
static int mca_btl_sctp_create(int if_index, const char* if_name)
{
if(mca_btl_sctp_component.sctp_if_11) {
char param[256];
struct mca_btl_sctp_module_t* btl = (struct mca_btl_sctp_module_t *)malloc(sizeof(mca_btl_sctp_module_t));
if(NULL == btl) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(btl, &mca_btl_sctp_module, sizeof(mca_btl_sctp_module));
OBJ_CONSTRUCT(&btl->sctp_endpoints, opal_list_t);
mca_btl_sctp_component.sctp_btls[mca_btl_sctp_component.sctp_num_btls++] = btl;
/* initialize the btl */
btl->sctp_ifindex = if_index;
#if MCA_BTL_SCTP_STATISTICS
btl->sctp_bytes_recv = 0;
btl->sctp_bytes_sent = 0;
btl->sctp_send_handler = 0;
#endif
opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->sctp_ifaddr, sizeof(btl->sctp_ifaddr));
/* prepare for bind call later before connect */
btl->sctp_ifaddr.sin_family = AF_INET;
#ifdef FREEBSD
btl->sctp_ifaddr.sin_len = sizeof(struct sockaddr);
#endif
btl->sctp_ifaddr.sin_port = 0;
opal_ifindextomask(if_index, (uint32_t *)&btl->sctp_ifmask, sizeof(btl->sctp_ifmask));
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s", if_name);
btl->super.btl_bandwidth = mca_btl_sctp_param_register_int(param, 0);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s", if_name);
btl->super.btl_latency = mca_btl_sctp_param_register_int(param, 0);
#if 0 && OMPI_ENABLE_DEBUG
BTL_OUTPUT(("interface: %s bandwidth %d latency %d",
if_name, btl->super.btl_bandwidth, btl->super.btl_latency));
#endif
return OMPI_SUCCESS;
}
else {
/* 1 to many */
struct mca_btl_sctp_module_t* btl;
char param[256];
struct sockaddr_in next_ifaddr;
socklen_t len = sizeof(struct sockaddr_in);
opal_socklen_t addrlen;
/* check if this is the first time this function is being called */
if(0 == mca_btl_sctp_component.sctp_num_btls) {
/* fill in btl struct with first interface's information (arbitary) */
btl = (struct mca_btl_sctp_module_t *)malloc(sizeof(mca_btl_sctp_module_t));
if(NULL == btl) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(btl, &mca_btl_sctp_module, sizeof(mca_btl_sctp_module));
OBJ_CONSTRUCT(&btl->sctp_endpoints, opal_list_t);
mca_btl_sctp_component.sctp_btls[mca_btl_sctp_component.sctp_num_btls++] = btl;
/* initialize the btl */
btl->sctp_ifindex = if_index;
#if MCA_BTL_SCTP_STATISTICS
btl->sctp_bytes_recv = 0;
btl->sctp_bytes_sent = 0;
btl->sctp_send_handler = 0;
#endif
opal_ifindextoaddr(if_index, (struct sockaddr*)&btl->sctp_ifaddr, sizeof(btl->sctp_ifaddr));
opal_ifindextomask(if_index, (uint32_t *)&btl->sctp_ifmask, sizeof(btl->sctp_ifmask));
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s", if_name);
btl->super.btl_bandwidth = mca_btl_sctp_param_register_int(param, 0);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s", if_name);
btl->super.btl_latency = mca_btl_sctp_param_register_int(param, 0);
#if 0 && OMPI_ENABLE_DEBUG
BTL_OUTPUT(("interface: %s bandwidth %d latency %d",
if_name, btl->super.btl_bandwidth, btl->super.btl_latency));
#endif
/* call bind to this (initial) addr */
opal_ifindextoaddr(if_index, (struct sockaddr*)&next_ifaddr, sizeof(next_ifaddr));
next_ifaddr.sin_family = AF_INET;
#ifdef FREEBSD
next_ifaddr.sin_len = sizeof(struct sockaddr);
#endif
next_ifaddr.sin_port = 0;
if(bind(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr *) &next_ifaddr, len) < 0) {
return OMPI_ERR_FATAL;
}
/* resolve system assignend port */
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr*)&next_ifaddr, &addrlen) < 0) {
BTL_ERROR(("getsockname() failed with errno=%d", opal_socket_errno));
return OMPI_ERROR;
}
/* need to get the port after the first bind call for subsequent
* sctp_bindx calls.
*/
mca_btl_sctp_component.sctp_listen_port = next_ifaddr.sin_port;
}
else {
next_ifaddr.sin_port = htons((unsigned short) mca_btl_sctp_component.sctp_listen_port);
/* add this addr to bindx */
opal_ifindextoaddr(if_index, (struct sockaddr*)&next_ifaddr, sizeof(next_ifaddr));
next_ifaddr.sin_family = AF_INET;
#ifdef FREEBSD
next_ifaddr.sin_len = sizeof(struct sockaddr);
#endif
if(sctp_bindx(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr *) &next_ifaddr,
1, SCTP_BINDX_ADD_ADDR) < 0) {
return OMPI_ERR_FATAL;
}
}
return OMPI_SUCCESS;
}
}
/*
 * Create SCTP BTL instance(s) using either:
 * (1) all interfaces specified by the user
 * (2) all available interfaces
 * (3) all available interfaces except for those excluded by the user
 *
 * For 1-1 sockets, have a BTL per interface.
 * For 1-many sockets, use bind for the first interface and sctp_bindx for
 * each interface thereafter.
 *
 * Returns OMPI_ERROR when no interface is available,
 * OMPI_ERR_OUT_OF_RESOURCE when the module array cannot be allocated and
 * OMPI_SUCCESS otherwise.  Failures of individual mca_btl_sctp_create()
 * calls are not propagated.
 */
static int mca_btl_sctp_component_create_instance(void)
{
    int if_count = opal_ifcount();
    int if_index;
    int include_count = 0;
    int max_btls;
    char **include;
    char **exclude;
    char **argv;

    if(if_count <= 0) {
        return OMPI_ERROR;
    }

    /* if the user specified an interface list - use these exclusively */
    include = opal_argv_split(mca_btl_sctp_component.sctp_if_include,',');
    for(argv = include; argv && *argv; argv++) {
        include_count++;
    }

    /* allocate memory for btl: mca_btl_sctp_create() appends one pointer
     * per created module, so reserve a slot for every interface that can be
     * handed to it (the include list may name more entries than there are
     * interfaces) rather than a single slot.
     */
    max_btls = (include_count > if_count) ? include_count : if_count;
    mca_btl_sctp_component.sctp_btls =
        (mca_btl_sctp_module_t **)malloc(max_btls * sizeof(mca_btl_sctp_module_t*));
    if(NULL == mca_btl_sctp_component.sctp_btls) {
        opal_argv_free(include);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for(argv = include; argv && *argv; argv++) {
        char* if_name = *argv;
        int named_index = opal_ifnametoindex(if_name);
        if(named_index < 0) {
            BTL_ERROR(("invalid interface \"%s\"", if_name));
        } else {
            mca_btl_sctp_create(named_index, if_name);
        }
    }
    opal_argv_free(include);
    if(mca_btl_sctp_component.sctp_num_btls) {
        return OMPI_SUCCESS;
    }

    /* if the interface list was not specified by the user, create
     * a BTL for each interface that was not excluded.
     */
    exclude = opal_argv_split(mca_btl_sctp_component.sctp_if_exclude,',');
    for(if_index = opal_ifbegin(); if_index >= 0; if_index = opal_ifnext(if_index)) {
        char if_name[32];
        opal_ifindextoname(if_index, if_name, sizeof(if_name));

        /* the exclude list is only consulted when there is more than one
         * interface */
        if(opal_ifcount() > 1) {
            /* an exclude entry matches when it is a prefix of the name */
            argv = exclude;
            while(argv && *argv) {
                if(strncmp(*argv,if_name,strlen(*argv)) == 0) {
                    break;
                }
                argv++;
            }
            /* if this interface was not found in the excluded list - create a BTL */
            if(argv == 0 || *argv == 0) {
                mca_btl_sctp_create(if_index, if_name);
            }
        } else {
            mca_btl_sctp_create(if_index, if_name);
        }
    }
    opal_argv_free(exclude);
    return OMPI_SUCCESS;
}
/*
 * Create the component's listen socket.
 *
 * 1-1 sockets: create a SOCK_STREAM SCTP socket, bind it to all addresses
 * with a system-assigned port, record that port, start listening and
 * register the read event.
 * 1-many sockets: create a SOCK_SEQPACKET SCTP socket and mark the listen
 * port as unset (0); binding, listening and event registration are done by
 * other functions in this file.
 *
 * On any failure after the socket was created, the socket is closed and
 * sctp_listen_sd is reset to -1 so that no half-initialized descriptor is
 * left behind.
 *
 * Returns OMPI_SUCCESS, the error from mca_btl_sctp_set_socket_options(), or
 * OMPI_ERROR.
 */
static int mca_btl_sctp_component_create_listen(void)
{
    int rc;

    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        struct sockaddr_in inaddr;
        opal_socklen_t addrlen;

        /* create a listen socket for incoming connections */
        mca_btl_sctp_component.sctp_listen_sd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
        if(mca_btl_sctp_component.sctp_listen_sd < 0) {
            BTL_ERROR(("socket() failed with errno=%d", opal_socket_errno));
            return OMPI_ERROR;
        }
        if((rc = mca_btl_sctp_set_socket_options(mca_btl_sctp_component.sctp_listen_sd)) != OMPI_SUCCESS) {
            goto fail;
        }

        /* every remaining failure reports OMPI_ERROR */
        rc = OMPI_ERROR;

        /* bind to all addresses and dynamically assigned port */
        memset(&inaddr, 0, sizeof(inaddr));
        inaddr.sin_family = AF_INET;
        inaddr.sin_addr.s_addr = INADDR_ANY;
        inaddr.sin_port = 0;
        if(bind(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
            BTL_ERROR(("bind() failed with errno=%d", opal_socket_errno));
            goto fail;
        }

        /* resolve system assigned port */
        addrlen = sizeof(struct sockaddr_in);
        if(getsockname(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
            BTL_ERROR(("getsockname() failed with errno=%d", opal_socket_errno));
            goto fail;
        }
        mca_btl_sctp_component.sctp_listen_port = inaddr.sin_port;

        /* setup listen backlog to maximum allowed by kernel */
        if(listen(mca_btl_sctp_component.sctp_listen_sd, SOMAXCONN) < 0) {
            BTL_ERROR(("listen() failed with errno=%d", opal_socket_errno));
            goto fail;
        }

        /* register listen port */
        opal_event_set(
            &mca_btl_sctp_component.sctp_recv_event,
            mca_btl_sctp_component.sctp_listen_sd,
            OPAL_EV_READ|OPAL_EV_PERSIST,
            mca_btl_sctp_component_recv_handler,
            0);
        opal_event_add(&mca_btl_sctp_component.sctp_recv_event,0);
        return OMPI_SUCCESS;
    }

    /* 1 to many */
    /* create a one to many listen socket for incoming connections and ALL sent/received messages */
    mca_btl_sctp_component.sctp_listen_sd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP);
    if(mca_btl_sctp_component.sctp_listen_sd < 0) {
        BTL_ERROR(("socket() failed with errno=%d", opal_socket_errno));
        return OMPI_ERROR;
    }
    if((rc = mca_btl_sctp_set_socket_options(mca_btl_sctp_component.sctp_listen_sd)) != OMPI_SUCCESS) {
        goto fail;
    }

    /* port set to zero to indicate "unset" in mca_btl_sctp_create */
    mca_btl_sctp_component.sctp_listen_port = 0;
    return OMPI_SUCCESS;

fail:
    /* the error was already logged where it happened */
    CLOSE_THE_SOCKET(mca_btl_sctp_component.sctp_listen_sd);
    mca_btl_sctp_component.sctp_listen_sd = -1;
    return rc;
}
/*
 * Start listening on the component's listen socket and register
 * mca_btl_sctp_recv_handler as its persistent read callback.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if listen() fails.
 */
static int mca_btl_sctp_component_register_listen(void)
{
    const int listen_sd = mca_btl_sctp_component.sctp_listen_sd;

    /* backlog is the maximum allowed by the kernel */
    if(listen(listen_sd, SOMAXCONN) < 0) {
        BTL_ERROR(("listen() failed with errno=%d", opal_socket_errno));
        return OMPI_ERROR;
    }

    /* hook the socket into the event loop */
    opal_event_set(&mca_btl_sctp_component.sctp_recv_event,
                   listen_sd,
                   OPAL_EV_READ|OPAL_EV_PERSIST,
                   mca_btl_sctp_recv_handler,
                   0);
    opal_event_add(&mca_btl_sctp_component.sctp_recv_event,0);

    return OMPI_SUCCESS;
}
/*
* Register SCTP module addressing information. The MCA framework
* will make this available to all peers.
*/
static int mca_btl_sctp_component_exchange(void)
{
int rc=0;
size_t i=0;
size_t size = mca_btl_sctp_component.sctp_num_btls * sizeof(mca_btl_sctp_addr_t);
if(mca_btl_sctp_component.sctp_num_btls != 0) {
mca_btl_sctp_addr_t *addrs = (mca_btl_sctp_addr_t *)malloc(size);
for(i=0; i<mca_btl_sctp_component.sctp_num_btls; i++) {
struct mca_btl_sctp_module_t* btl = mca_btl_sctp_component.sctp_btls[i];
addrs[i].addr_inet = btl->sctp_ifaddr.sin_addr;
addrs[i].addr_port = mca_btl_sctp_component.sctp_listen_port;
addrs[i].addr_inuse = 0;
}
rc = ompi_modex_send(&mca_btl_sctp_component.super.btl_version, addrs, size);
free(addrs);
}
return rc;
}
/*
* SCTP module initialization:
* (1) read interface list from kernel and compare against module parameters
* then create a BTL instance for selected interfaces
* (2) setup SCTP listen socket for incoming connection attempts
* (3) register BTL parameters with the MCA
*/
mca_btl_base_module_t** mca_btl_sctp_component_init(int *num_btl_modules,
bool enable_progress_threads,
bool enable_mpi_threads)
{
if(mca_btl_sctp_component.sctp_if_11) {
/* 1 to 1 */
mca_btl_base_module_t **btls;
*num_btl_modules = 0;
/* initialize free lists */
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_eager,
sizeof (mca_btl_sctp_frag_eager_t) +
mca_btl_sctp_module.super.btl_eager_limit,
OBJ_CLASS (mca_btl_sctp_frag_eager_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_max,
sizeof (mca_btl_sctp_frag_max_t) +
mca_btl_sctp_module.super.btl_max_send_size,
OBJ_CLASS (mca_btl_sctp_frag_max_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_user,
sizeof (mca_btl_sctp_frag_user_t),
OBJ_CLASS (mca_btl_sctp_frag_user_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
/* create a BTL SCTP module for selected interfaces */
if(mca_btl_sctp_component_create_instance() != OMPI_SUCCESS) {
return 0;
}
/* create a SCTP listen socket for incoming connection attempts */
if(mca_btl_sctp_component_create_listen() != OMPI_SUCCESS) {
return 0;
}
/* publish SCTP parameters with the MCA framework */
if(mca_btl_sctp_component_exchange() != OMPI_SUCCESS) {
return 0;
}
btls = (mca_btl_base_module_t **)malloc(mca_btl_sctp_component.sctp_num_btls *
sizeof(mca_btl_base_module_t*));
if(NULL == btls) {
return NULL;
}
memcpy(btls, mca_btl_sctp_component.sctp_btls, mca_btl_sctp_component.sctp_num_btls*sizeof(mca_btl_sctp_module_t*));
*num_btl_modules = mca_btl_sctp_component.sctp_num_btls;
return btls;
}
else {
/* 1 to many */
int i;
mca_btl_base_module_t **btls;
*num_btl_modules = 0;
/* initialize free lists */
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_eager,
sizeof (mca_btl_sctp_frag_eager_t) +
mca_btl_sctp_module.super.btl_eager_limit,
OBJ_CLASS (mca_btl_sctp_frag_eager_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_max,
sizeof (mca_btl_sctp_frag_max_t) +
mca_btl_sctp_module.super.btl_max_send_size,
OBJ_CLASS (mca_btl_sctp_frag_max_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
ompi_free_list_init( &mca_btl_sctp_component.sctp_frag_user,
sizeof (mca_btl_sctp_frag_user_t),
OBJ_CLASS (mca_btl_sctp_frag_user_t),
mca_btl_sctp_component.sctp_free_list_num,
mca_btl_sctp_component.sctp_free_list_max,
mca_btl_sctp_component.sctp_free_list_inc,
NULL );
/* create a SCTP listen socket for incoming connection attempts */
if(mca_btl_sctp_component_create_listen() != OMPI_SUCCESS) {
return 0;
}
/* create a BTL SCTP module for selected interfaces */
if(mca_btl_sctp_component_create_instance() != OMPI_SUCCESS) {
return 0;
}
/* register the now completed single BTL */
if(mca_btl_sctp_component_register_listen() != OMPI_SUCCESS) {
return 0;
}
/* publish SCTP parameters with the MCA framework */
if(mca_btl_sctp_component_exchange() != OMPI_SUCCESS) {
return 0;
}
#if MCA_BTL_SCTP_DONT_USE_HASH
/* Initialize the proc_tables to all negative ones. */
recvr_proc_table = (mca_btl_sctp_proc_table_node *) malloc(sizeof(mca_btl_sctp_proc_table_node) * MCA_BTL_SCTP_PROC_TABLE_SIZE);
sender_proc_table = (mca_btl_sctp_proc_table_node *) malloc(sizeof(mca_btl_sctp_proc_table_node) * MCA_BTL_SCTP_PROC_TABLE_SIZE);
if(NULL == recvr_proc_table || NULL == sender_proc_table) {
return 0;
}
for(i = 0; i < MCA_BTL_SCTP_PROC_TABLE_SIZE; i++) {
recvr_proc_table[i].valid = 0;
recvr_proc_table[i].sctp_assoc_id = 0;
recvr_proc_table[i].proc = NULL;
sender_proc_table[i].valid = 0;
sender_proc_table[i].sctp_assoc_id = 0;
sender_proc_table[i].proc = NULL;
}
#endif
btls = (mca_btl_base_module_t **)malloc(mca_btl_sctp_component.sctp_num_btls *
sizeof(mca_btl_base_module_t*));
if(NULL == btls) {
return NULL;
}
memcpy(btls, mca_btl_sctp_component.sctp_btls, mca_btl_sctp_component.sctp_num_btls*sizeof(mca_btl_sctp_module_t*));
*num_btl_modules = mca_btl_sctp_component.sctp_num_btls;
return btls;
}
}
/*
 * SCTP module control: none of the arguments are examined and every request
 * is reported as successful.
 */
int mca_btl_sctp_component_control(int param, void* value, size_t size)
{
    (void)param;
    (void)value;
    (void)size;

    return OMPI_SUCCESS;
}
/*
* Called by mca_btl_sctp_component_recv() when the SCTP listen
* socket has pending connection requests. Accept incoming
* requests and queue for completion of the connection handshake.
*/
void mca_btl_sctp_component_accept(void)
{
if(mca_btl_sctp_component.sctp_if_11) {
/* 1 to 1 */
while(true) {
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
mca_btl_sctp_event_t *event;
int rc, sd = accept(mca_btl_sctp_component.sctp_listen_sd, (struct sockaddr*)&addr, &addrlen);
if(sd < 0) {
if(opal_socket_errno == EINTR) {
continue;
}
if(opal_socket_errno == ECONNRESET || opal_socket_errno == EBADF) {
/* closed remotely while on listen queue */
close(sd);
}
else if(opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("accept() failed with errno %d.", opal_socket_errno));
}
return;
}
if((rc = mca_btl_sctp_set_socket_options(sd)) != OMPI_SUCCESS) {
BTL_ERROR(("failed to set socket options"));
return;
}
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_sctp_event_t);
opal_event_set(&event->event, sd, OPAL_EV_READ, mca_btl_sctp_component_recv_handler, event);
opal_event_add(&event->event, 0);
}
}
else {
/* 1 to many */
/* Called by mca_btl_sctp_recv_handler to get a valid *user pointer */
mca_btl_sctp_event_t *event;
int sd = mca_btl_sctp_component.sctp_listen_sd;
if(sd < 0) {
BTL_ERROR(("mca_btl_sctp_component_accept(): Invalid socket descriptor.\n"));
}
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_sctp_event_t);
opal_event_set(&event->event, sd, OPAL_EV_READ, mca_btl_sctp_recv_handler, event);
opal_event_add(&event->event, 0);
}
}
/* Used only with one-to-one socket.
*
* Event callback when there is data available on the registered
* socket to recv.
*/
static void mca_btl_sctp_component_recv_handler(int sd, short flags, void* user)
{
orte_process_name_t guid;
struct sockaddr_in addr;
int retval;
mca_btl_sctp_proc_t* btl_proc;
opal_socklen_t addr_len = sizeof(addr);
mca_btl_sctp_event_t *event = (mca_btl_sctp_event_t *)user;
int msg_flags=0;
struct sctp_sndrcvinfo sri;
/* accept new connections on the listen socket */
if(mca_btl_sctp_component.sctp_listen_sd == sd) {
mca_btl_sctp_component_accept();
return;
}
OBJ_RELEASE(event);
retval = sctp_recvmsg(sd, (char *)&guid, sizeof(guid), 0, 0, &sri, &msg_flags);
if(retval != sizeof(guid)) {
CLOSE_THE_SOCKET(sd);
return;
}
ORTE_PROCESS_NAME_NTOH(guid);
/* lookup the corresponding process */
btl_proc = mca_btl_sctp_proc_lookup(&guid);
if(NULL == btl_proc) {
BTL_ERROR(("errno=%d",errno));
CLOSE_THE_SOCKET(sd);
return;
}
/* lookup peer address */
if(getpeername(sd, (struct sockaddr*)&addr, &addr_len) != 0) {
if(opal_socket_errno != ECONNRESET && opal_socket_errno != EBADF && opal_socket_errno != ENOTCONN) {
BTL_ERROR(("getpeername() failed with errno=%d", opal_socket_errno));
}
CLOSE_THE_SOCKET(sd);
return;
}
/* are there any existing peer instances will to accept this connection */
if(mca_btl_sctp_proc_accept(btl_proc, &addr, sd) == false) {
CLOSE_THE_SOCKET(sd);
return;
}
}

25
ompi/mca/btl/sctp/btl_sctp_component.h Обычный файл
Просмотреть файл

@ -0,0 +1,25 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#ifndef BTL_SCTP_COMPONENT_H
#define BTL_SCTP_COMPONENT_H
/* Accept routine of the SCTP component; takes no arguments and returns nothing. */
void mca_btl_sctp_component_accept(void);
#endif

1296
ompi/mca/btl/sctp/btl_sctp_endpoint.c Обычный файл

Разница между файлами не показана из-за своего большого размера Загрузить разницу

100
ompi/mca/btl/sctp/btl_sctp_endpoint.h Обычный файл
Просмотреть файл

@ -0,0 +1,100 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_ENDPOINT_H
#define MCA_BTL_SCTP_ENDPOINT_H
#include "opal/class/opal_list.h"
#include "opal/event/event.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h"
#include "btl_sctp_frag.h"
#include "btl_sctp.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/* When non-zero, the endpoint_cache* members of struct mca_btl_base_endpoint_t are compiled in. */
#define MCA_BTL_SCTP_ENDPOINT_CACHE 1
/**
* State of SCTP endpoint connection.
* The enumerators are numbered consecutively starting at 0.
*/
typedef enum {
MCA_BTL_SCTP_CONNECTING = 0,
MCA_BTL_SCTP_CONNECT_ACK,
MCA_BTL_SCTP_CLOSED,
MCA_BTL_SCTP_FAILED,
MCA_BTL_SCTP_CONNECTED,
MCA_BTL_SCTP_SHUTDOWN
} mca_btl_sctp_state_t;
/**
* An abstraction that represents a connection to an endpoint process.
* An instance of mca_btl_base_endpoint_t is associated w/ each process
* and BTL pair at startup. However, connections to the endpoint
* are established dynamically on an as-needed basis.
*
* The first member is an opal_list_item_t, so an endpoint can be placed on an
* opal_list_t.
*/
struct mca_btl_base_endpoint_t {
opal_list_item_t super;
struct mca_btl_sctp_module_t* endpoint_btl; /**< BTL instance that created this connection */
struct mca_btl_sctp_proc_t* endpoint_proc; /**< proc structure corresponding to endpoint */
struct mca_btl_sctp_addr_t* endpoint_addr; /**< address of endpoint */
int endpoint_sd; /**< socket connection to endpoint */
#if MCA_BTL_SCTP_ENDPOINT_CACHE
char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall) */
char* endpoint_cache_pos; /**< current position in the cache */
size_t endpoint_cache_length; /**< length of the data in the cache */
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
struct mca_btl_sctp_frag_t* endpoint_send_frag; /**< current send frag being processed */
struct mca_btl_sctp_frag_t* endpoint_recv_frag; /**< current recv frag being processed */
mca_btl_sctp_state_t endpoint_state; /**< current state of the connection */
size_t endpoint_retries; /**< number of connection retries attempted */
opal_list_t endpoint_frags; /**< list of pending frags to send */
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state (presumably the send side, going by the name -- confirm) */
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state (presumably the receive side, going by the name -- confirm) */
opal_event_t endpoint_send_event; /**< event for async processing of send frags */
opal_event_t endpoint_recv_event; /**< event for async processing of recv frags */
bool endpoint_nbo; /**< convert headers to network byte order? */
int endpoint_has_initialized; /**< NOTE(review): undocumented; the name suggests an "already initialized" flag -- confirm against btl_sctp_endpoint.c */
int endpoint_in_list; /**< NOTE(review): undocumented; the name suggests a "currently on a list" flag -- confirm against btl_sctp_endpoint.c */
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
/* the SCTP endpoint type is an alias of the base endpoint type */
typedef mca_btl_base_endpoint_t mca_btl_sctp_endpoint_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_endpoint_t);
/**
* List item that carries a pointer to an endpoint, so an endpoint can be
* referenced from an opal_list_t without using the endpoint's own list link.
*/
struct our_sctp_endpoint {
opal_list_item_t super;
mca_btl_sctp_endpoint_t *endpoint; /**< endpoint this list item refers to */
};
typedef struct our_sctp_endpoint our_sctp_endpoint;
OBJ_CLASS_DECLARATION(our_sctp_endpoint);
/* Endpoint API.  Definitions are not in this header; going by the file names
 * in this commit they are presumably in btl_sctp_endpoint.c -- confirm. */
/* Configure socket sd; callers compare the result against OMPI_SUCCESS. */
int mca_btl_sctp_set_socket_options(int sd);
/* Close the connection to an endpoint. */
void mca_btl_sctp_endpoint_close(mca_btl_base_endpoint_t*);
/* Send a fragment to an endpoint; returns an int status. */
int mca_btl_sctp_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_sctp_frag_t*);
/* Offer an incoming connection (peer address, socket) to an endpoint; the bool result presumably means "accepted" -- confirm. */
bool mca_btl_sctp_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr_in*, int);
/* Shut an endpoint down. */
void mca_btl_sctp_endpoint_shutdown(mca_btl_base_endpoint_t*);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

647
ompi/mca/btl/sctp/btl_sctp_frag.c Обычный файл
Просмотреть файл

@ -0,0 +1,647 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will not conflict with other error codes that
* are returned by these functions under UNIX/Linux environments
*/
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include "opal/opal_socket_errno.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_endpoint.h"
#include "orte/util/proc_info.h"
/* Needed to use sctp_writev() which is a slightly modified sctp_sendmsg()
* call.
*/
#include "sctp_writev.h"
#include "btl_sctp.h"
#include "btl_sctp_addr.h"
#include "btl_sctp_utils.h"
#include <sys/socket.h>
#include <netinet/sctp.h>
/*
 * Shared part of the fragment constructors: the descriptor starts with no
 * source and no destination segments.
 */
static void mca_btl_sctp_frag_common_constructor(mca_btl_sctp_frag_t* frag)
{
    frag->base.des_src_cnt = frag->base.des_dst_cnt = 0;
    frag->base.des_src = frag->base.des_dst = NULL;
}
/*
 * Constructor for eager fragments: sized by the module's eager limit and
 * owned by the component's eager free list.
 */
static void mca_btl_sctp_frag_eager_constructor(mca_btl_sctp_frag_t* frag)
{
    mca_btl_sctp_frag_common_constructor(frag);
    frag->my_list = &mca_btl_sctp_component.sctp_frag_eager;
    frag->size = mca_btl_sctp_module.super.btl_eager_limit;
}
/*
 * Constructor for max fragments: sized by the module's maximum send size and
 * owned by the component's max free list.
 */
static void mca_btl_sctp_frag_max_constructor(mca_btl_sctp_frag_t* frag)
{
    mca_btl_sctp_frag_common_constructor(frag);
    frag->my_list = &mca_btl_sctp_component.sctp_frag_max;
    frag->size = mca_btl_sctp_module.super.btl_max_send_size;
}
/*
 * Constructor for user fragments: size 0, owned by the component's user
 * free list.
 */
static void mca_btl_sctp_frag_user_constructor(mca_btl_sctp_frag_t* frag)
{
    mca_btl_sctp_frag_common_constructor(frag);
    frag->my_list = &mca_btl_sctp_component.sctp_frag_user;
    frag->size = 0;
}
/*
* Class instances for the fragment types.  All four name
* mca_btl_base_descriptor_t as their parent class and have no destructor.
* The plain fragment class has no constructor either; the eager, max and
* user classes use the matching constructor defined in this file.
*/
OBJ_CLASS_INSTANCE(
mca_btl_sctp_frag_t,
mca_btl_base_descriptor_t,
NULL,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_sctp_frag_eager_t,
mca_btl_base_descriptor_t,
mca_btl_sctp_frag_eager_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_sctp_frag_max_t,
mca_btl_base_descriptor_t,
mca_btl_sctp_frag_max_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_sctp_frag_user_t,
mca_btl_base_descriptor_t,
mca_btl_sctp_frag_user_constructor,
NULL);
/**
 * int mca_btl_sctp_frag_get_msg_size(mca_btl_sctp_frag_t *frag)
 * -------------------------------------------
 * Adds up the iov_len of the frag->iov_cnt entries starting at frag->iov_ptr
 * and returns the total, i.e. the number of bytes of the message (headers
 * included) that the iovec array currently describes.
 */
int mca_btl_sctp_frag_get_msg_size(mca_btl_sctp_frag_t *frag) {
    const int n_vecs = (int) frag->iov_cnt;
    int total = 0;
    int idx = 0;

    while(idx < n_vecs) {
        total += frag->iov_ptr[idx].iov_len;
        idx++;
    }
    return total;
}
/**
* mca_btl_sctp_frag_large_send(mca_btl_sctp_frag_t* frag, int sd, int iov_fragment, int *amt_sent)
* ---------------------------------------------------------------
* Sends the fragment's remaining iovec entries with one sctp_sendmsg() call
* per piece: an entry of at most MCA_BTL_SCTP_MAX_FRAG_SIZE bytes is sent
* whole, a longer entry is sent in pieces of MCA_BTL_SCTP_MAX_FRAG_SIZE bytes.
*
* frag          fragment to send; its iov_ptr / iov_idx / iov_cnt and the
*               base/length of a partially sent entry are updated as data
*               goes out
* sd            socket to send on
* iov_fragment  not referenced in the body
* amt_sent      out: reset to 0 on entry, then set to the running total of
*               bytes sent by this call
*
* Returns true once the bytes sent by this call equal the message size
* computed on entry.  Returns false if a send was interrupted or would block
* (the caller is expected to retry later), or if a send failed, in which case
* the endpoint is closed first.
*/
bool mca_btl_sctp_frag_large_send(mca_btl_sctp_frag_t* frag, int sd, int iov_fragment, int *amt_sent) {
int done = 0;
int count_down = 0;
int cnt = -1;
int data_sent = 0;
int to_send;
struct sockaddr_in btl_sockaddr;
*amt_sent = 0;
/* Determine full size of message that needs to be sent. */
count_down = mca_btl_sctp_frag_get_msg_size(frag);
/* Setup addressing information. */
btl_sockaddr = mca_btl_sctp_utils_sockaddr_from_frag(frag);
while(!done) {
if(frag->iov_ptr->iov_len == 0) {
/* Skip over an empty iovec entry (the original author notes that Open MPI
* may place an empty entry in the array).
* NOTE(review): iov_idx and iov_cnt are not adjusted here, unlike when a
* non-empty entry completes below -- confirm this is intended.
*/
frag->iov_ptr++;
}
/* send the whole entry if it fits, otherwise one maximum-size piece */
if(frag->iov_ptr->iov_len <= MCA_BTL_SCTP_MAX_FRAG_SIZE) {
to_send = frag->iov_ptr->iov_len;
} else { /* iov_ptr->iov_len > MCA_BTL_SCTP_MAX_FRAG_SIZE */
to_send = MCA_BTL_SCTP_MAX_FRAG_SIZE;
}
/* 1-1 sockets are connected, so no destination is passed; 1-many sockets
* get the destination address derived from the fragment */
if(mca_btl_sctp_component.sctp_if_11) {
cnt = sctp_sendmsg(sd, frag->iov_ptr->iov_base, to_send, 0, 0, 0, 0, 0, 0, 0 );
} else {
cnt = sctp_sendmsg(sd, frag->iov_ptr->iov_base, to_send,
(struct sockaddr *)&btl_sockaddr, sizeof(btl_sockaddr), 0, 0,
0, 0, 0 );
}
if(cnt >= 0) {
SCTP_BTL_ERROR(("mca_btl_sctp_frag_large_send() sent %d bytes.\n",cnt));
} else {
/* cnt < 0 */
switch(opal_socket_errno) {
case EINTR:
case EWOULDBLOCK:
/* nothing went out this time: treated as 0 bytes, which makes the
* function return false further down */
if(data_sent) {SCTP_BTL_ERROR(("leaving large_send (data_sent = %d)\n",data_sent));}
cnt=0;
break;
case EFAULT:
BTL_ERROR(("writev error (%p, %d)\n\t%s(%d)\n",
frag->iov_ptr[0].iov_base, frag->iov_ptr[0].iov_len,
strerror(opal_socket_errno), frag->iov_cnt));
/* no break: continues into the default error handling below */
default:
{
BTL_ERROR(("writev failed with errno=%d", opal_socket_errno));
mca_btl_sctp_endpoint_close(frag->endpoint);
return false;
}
}
}
if(cnt > 0) {
/* update frag book-keeping with each iteration */
/* SCTP sends all or nothing */
assert(to_send == cnt);
data_sent += cnt;
*amt_sent = data_sent;
if(frag->iov_ptr->iov_len <= MCA_BTL_SCTP_MAX_FRAG_SIZE)
{
/* completed sending this vector element */
assert(cnt == (int) frag->iov_ptr->iov_len);
frag->iov_ptr++;
frag->iov_idx++;
frag->iov_cnt--;
}
else /* iov_ptr->iov_len > MCA_BTL_SCTP_MAX_FRAG_SIZE */
{
/* sent only a portion of this vector element: advance its base pointer
* and shrink its length by what was sent */
assert(cnt < (int) frag->iov_ptr->iov_len);
assert(cnt == MCA_BTL_SCTP_MAX_FRAG_SIZE);
frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt;
}
}
if(cnt == 0) { /* just in case nothing can be sent, we'll go back to the
* progress function */
return false;
}
if(data_sent == count_down) {
/* If data_sent == count_down then we're done. */
done = 1;
}
}
return (done == 1);
}
/**
 * mca_btl_sctp_frag_send(mca_btl_sctp_frag_t* frag, int sd)
 * ---------------------------------------------------------------
 * Send a message frag.
 *
 * Makes one attempt to push the remaining iovec elements of the fragment
 * (frag->iov_ptr[0 .. frag->iov_cnt-1]) out on socket sd, then updates the
 * fragment's iovec book-keeping (iov_ptr, iov_idx, iov_cnt) to reflect what
 * was sent.  If any remaining element is larger than
 * MCA_BTL_SCTP_MAX_FRAG_SIZE the work is handed to
 * mca_btl_sctp_frag_large_send(); otherwise a single sctp_writev() is used.
 *
 * @param frag  fragment to send
 * @param sd    socket descriptor to send on
 * @return true once no iovec elements remain (frag->iov_cnt == 0); false if
 *         data is still pending, the send was interrupted or would block,
 *         or the send failed (the endpoint is closed in that last case).
 */
bool mca_btl_sctp_frag_send(mca_btl_sctp_frag_t* frag, int sd)
{
    int zero = 0, cnt = -1;
    size_t i, num_vecs;
    size_t count;
    int large_vector = 0;   /* index of the first oversized iovec element */
    int large_msg = 0;

    /* Check each iov_len field in the remaining iovec and see if any of
     * them is above MCA_BTL_SCTP_MAX_FRAG_SIZE.  'zero' accumulates the
     * lengths seen so far so that an all-empty iovec can be detected.
     */
    for(count = 0; count < frag->iov_cnt; count++) {
        zero += frag->iov_ptr[count].iov_len;
        /* True if we have a message that is too long to send in one shot
         * via SCTP.
         */
        if(frag->iov_ptr[count].iov_len > MCA_BTL_SCTP_MAX_FRAG_SIZE) {
            large_msg = 1;
            large_vector = (int) count;
            break;
        }
    }

    /* if only an empty iov element remains, let it fall
     * through in order to decrement the count */
    if(0 == zero) {
        cnt = 0; /* don't try to send */
    }

    if(large_msg) {
        /* The helper updates the iovec book-keeping itself; completion is
         * decided below from frag->iov_cnt, so its return value is not
         * needed here. */
        (void) mca_btl_sctp_frag_large_send(frag, sd, large_vector, &cnt);
    } else {
        /* Setup addressing information. */
        socklen_t len;
        struct sockaddr_in btl_sockaddr;

        btl_sockaddr = mca_btl_sctp_utils_sockaddr_from_frag(frag);
        len = sizeof(struct sockaddr_in);

        /* Skipped entirely when cnt was forced to 0 above. */
        while(cnt < 0) {
            if(mca_btl_sctp_component.sctp_if_11) {
                cnt = sctp_writev(sd, frag->iov_ptr, frag->iov_cnt, 0, 0, 0, 0, 0, 0, 0);
            } else {
                cnt = sctp_writev(sd, frag->iov_ptr, frag->iov_cnt, (struct sockaddr *)&btl_sockaddr, len, 0,
                                  0, 0, 0, 0);
            }
            if(cnt >= 0) {
                SCTP_BTL_ERROR(("mca_btl_sctp_frag_send() sd=%d, sent %d bytes.\n",sd, cnt));
            } else {
                /* cnt < 0 */
                switch(opal_socket_errno) {
                case EINTR:
                case EWOULDBLOCK:
                    return false;
                case EFAULT:
                    /* iov_len and iov_cnt are size_t: cast so the
                     * arguments match the conversion specifiers. */
                    BTL_ERROR(("writev error (%p, %lu)\n\t%s(%lu)\n",
                               frag->iov_ptr[0].iov_base,
                               (unsigned long) frag->iov_ptr[0].iov_len,
                               strerror(opal_socket_errno),
                               (unsigned long) frag->iov_cnt));
                    /* fall through */
                default:
                    {
                        BTL_ERROR(("writev failed with errno=%d", opal_socket_errno));
                        mca_btl_sctp_endpoint_close(frag->endpoint);
                        return false;
                    }
                }
            }
        }

        /* if the write didn't complete - update the iovec state */
        num_vecs = frag->iov_cnt;
        for(i=0; i<num_vecs; i++) {
            if(cnt >= (int)frag->iov_ptr->iov_len) {
                /* this element went out completely */
                cnt -= frag->iov_ptr->iov_len;
                frag->iov_ptr++;
                frag->iov_idx++;
                frag->iov_cnt--;
            } else {
                /* partially sent element: skip over the bytes already sent */
                frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
                    (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
                frag->iov_ptr->iov_len -= cnt;
                break;
            }
        }
    }
    return (frag->iov_cnt == 0);
}
/**
 * bool mca_btl_sctp_frag_recv(mca_btl_sctp_frag_t *frag, int sd, char *buf, int len)
 * ----------------------------------------------------------------------------------
 * Recv message frag.
 *
 * Fills the fragment's current iovec elements with incoming data and, each
 * time the current set of elements is full, inspects the header to decide
 * whether further iovec elements (payload, segment descriptors, PUT targets)
 * must be set up.
 *
 * Two modes, selected by mca_btl_sctp_component.sctp_if_11:
 *  - one-to-one: data is read straight from socket sd with readv(); buf and
 *    len are not used.
 *  - otherwise: the data has already been received by the caller and is
 *    passed in as buf/len; it is copied into the current iovec element and
 *    sd is not used.
 *
 * @param frag  fragment being assembled
 * @param sd    socket descriptor (one-to-one mode only)
 * @param buf   already-received data (non one-to-one mode only)
 * @param len   number of bytes in buf
 * @return true when the fragment is complete; false when more data is
 *         needed, nothing could be read, or an error occurred.
 */
bool mca_btl_sctp_frag_recv(mca_btl_sctp_frag_t* frag, int sd, char *buf, int len)
{
    if(mca_btl_sctp_component.sctp_if_11) {
        /* 1 to 1 */
        int cnt;
        size_t i, num_vecs;
        mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;

    repeat11:
        num_vecs = frag->iov_cnt;
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        if( 0 != btl_endpoint->endpoint_cache_length ) {
            size_t length = btl_endpoint->endpoint_cache_length;
            /* It's strange at the first look but cnt have to be set to the full amount of data available.
             * After going to advance_iov_position11 we will use cnt to detect if there is still some
             * data pending.
             */
            cnt = btl_endpoint->endpoint_cache_length;
            for( i = 0; i < frag->iov_cnt; i++ ) {
                /* Clamp to the capacity of the element being filled (element
                 * i, not element 0), which is what the advance loop below
                 * assumes each element received. */
                if( length > frag->iov_ptr[i].iov_len )
                    length = frag->iov_ptr[i].iov_len;
                memcpy( frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length );
                btl_endpoint->endpoint_cache_pos += length;
                btl_endpoint->endpoint_cache_length -= length;
                length = btl_endpoint->endpoint_cache_length;
                if( 0 == length ) {
                    /* cache drained: rewind to its start */
                    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
                    break;
                }
            }
            goto advance_iov_position11;
        }
        /* What's happens if all iovecs are used by the fragment ? It still work, as we reserve one
         * iovec for the caching in the fragment structure (the +1).
         */
        frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache;
        frag->iov_ptr[num_vecs].iov_len = mca_btl_sctp_component.sctp_endpoint_cache;
        num_vecs++;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */

        /* non-blocking read, but continue if interrupted */
        cnt = -1;
        while( cnt < 0 ) {
            cnt = readv(sd, frag->iov_ptr, num_vecs);
            if(cnt >= 0) {SCTP_BTL_ERROR(("readv (sd=%d) %d bytes\n", sd, cnt));}
            if(cnt < 0) {
                switch(opal_socket_errno) {
                case EINTR:
                    continue;   /* retry the readv() */
                case ECONNRESET:
                case EBADF:
                    close(sd);
                    /* fall through */
                case EWOULDBLOCK:
                    return false;
                case EFAULT:
                    opal_output( 0, "mca_btl_sctp_frag_recv: readv error (%p, %lu)\n\t%s(%lu)\n",
                                 frag->iov_ptr[0].iov_base,
                                 (unsigned long) frag->iov_ptr[0].iov_len,
                                 strerror(opal_socket_errno),
                                 (unsigned long) frag->iov_cnt );
                    /* fall through */
                default:
                    opal_output(0, "mca_btl_sctp_frag_recv: readv failed with errno=%d",
                                opal_socket_errno);
                    mca_btl_sctp_endpoint_close(btl_endpoint);
                    return false;
                }
            }
            if( cnt == 0 ) {
                return false;
            }
            goto advance_iov_position11;
        }

    advance_iov_position11:
        /* if the read didn't complete - update the iovec state */
        num_vecs = frag->iov_cnt;
        for( i = 0; i < num_vecs; i++ ) {
            if( cnt >= (int)frag->iov_ptr->iov_len ) {
                cnt -= frag->iov_ptr->iov_len;
                frag->iov_idx++;
                frag->iov_ptr++;
                frag->iov_cnt--;
            } else {
                frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
                    (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
                frag->iov_ptr->iov_len -= cnt;
                cnt = 0;
                break;
            }
        }
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        /* whatever is left over after filling the iovec stays in the cache */
        btl_endpoint->endpoint_cache_length = cnt;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */

        /* read header */
        if(frag->iov_cnt == 0) {
            if (btl_endpoint->endpoint_nbo) {
                MCA_BTL_SCTP_HDR_NTOH(frag->hdr);
            }
            switch(frag->hdr.type) {
            case MCA_BTL_SCTP_HDR_TYPE_SEND:
                if(frag->iov_idx == 1 && frag->hdr.size) {
                    /* header done: the payload lands right after the frag */
                    frag->iov[1].iov_base = (IOVBASE_TYPE*)(frag+1);
                    frag->iov[1].iov_len = frag->hdr.size;
                    frag->segments[0].seg_addr.pval = frag+1;
                    frag->segments[0].seg_len = frag->hdr.size;
                    frag->iov_cnt++;
                    goto repeat11;
                }
                break;
            case MCA_BTL_SCTP_HDR_TYPE_PUT:
                if(frag->iov_idx == 1) {
                    /* header done: next read the segment descriptors */
                    frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->segments;
                    frag->iov[1].iov_len = frag->hdr.count * sizeof(mca_btl_base_segment_t);
                    frag->iov_cnt++;
                    goto repeat11;
                } else if (frag->iov_idx == 2) {
                    /* descriptors done: read into the described segments */
                    for(i=0; i<frag->hdr.count; i++) {
                        frag->iov[i+2].iov_base = (IOVBASE_TYPE*)ompi_ptr_ltop(frag->segments[i].seg_addr.lval);
                        frag->iov[i+2].iov_len = frag->segments[i].seg_len;
                        frag->iov_cnt++;
                    }
                    goto repeat11;
                }
                break;
            case MCA_BTL_SCTP_HDR_TYPE_GET:
            default:
                break;
            }
            return true;
        }
        return false;
    }
    else {
        int cnt;
        size_t i, num_vecs;
        mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
        /* Ugly way of getting my own macro in to jump back into the recv_handler
         * and progress engine so that I can do my next read from the socket.
         */
        int done = 0;

    repeat:
        /* In other words, we've packed the frag with the data from buf and need
         * to return to the recv_handler and the subsequent progress engine to
         * get another piece of data... hence the notion of 'done.' */
        if(done) {
            goto ret_false;
        }
        num_vecs = frag->iov_cnt;
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        if( 0 != btl_endpoint->endpoint_cache_length ) {
            size_t length = btl_endpoint->endpoint_cache_length;
            /* It's strange at the first look but cnt have to be set to the full amount of data available.
             * After going to advance_iov_position we will use cnt to detect if there is still some
             * data pending.
             */
            cnt = btl_endpoint->endpoint_cache_length;
            for( i = 0; i < frag->iov_cnt; i++ ) {
                /* Clamp to the capacity of element i (not element 0). */
                if( length > frag->iov_ptr[i].iov_len )
                    length = frag->iov_ptr[i].iov_len;
                memcpy( frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length );
                btl_endpoint->endpoint_cache_pos += length;
                btl_endpoint->endpoint_cache_length -= length;
                length = btl_endpoint->endpoint_cache_length;
                if( 0 == length ) {
                    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
                    break;
                }
            }
            goto advance_iov_position;
        }
        frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache;
        frag->iov_ptr[num_vecs].iov_len = mca_btl_sctp_component.sctp_endpoint_cache;
        num_vecs++;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */

        cnt = -1;
        while( cnt < 0 ) {
            /* Replaces the traditional readv() of the endpoint_recv_handler. */
            /* NOTE(review): len is not checked against iov_ptr->iov_len here;
             * presumably the caller never passes more than the current
             * element can hold -- confirm against the recv handler. */
            memcpy(frag->iov_ptr->iov_base, buf, len);
            cnt = len;
            if(cnt < 0) {
                /* TODO move full error handling code to recv_handler */
                /* never happens.  len would have to passed in -1... */
                /* ...plus I don't think the errno from the sctp_recvmsg
                 * will percolate this far (reset at other syscalls)!
                 */
                switch(opal_socket_errno) {
                case EINTR:
                case EWOULDBLOCK:
                    return false;
                case EFAULT:
                    opal_output( 0, "mca_btl_sctp_frag_recv: recv error (%p, %lu)\n\t%s(%lu)\n",
                                 frag->iov_ptr[0].iov_base,
                                 (unsigned long) frag->iov_ptr[0].iov_len,
                                 strerror(opal_socket_errno),
                                 (unsigned long) frag->iov_cnt );
                    /* fall through */
                default:
                    opal_output(0, "mca_btl_sctp_frag_recv: recv failed with errno=%d",
                                opal_socket_errno);
                    mca_btl_sctp_endpoint_close(btl_endpoint);
                    return false;
                }
            }
            if( cnt == 0 ) {
                return false;
            }
            goto advance_iov_position;
        }

    advance_iov_position:
        /* if the read didn't complete - update the iovec state */
        num_vecs = frag->iov_cnt;
        for( i = 0; i < num_vecs; i++ ) {
            if( cnt >= (int)frag->iov_ptr->iov_len ) {
                cnt -= frag->iov_ptr->iov_len;
                frag->iov_idx++;
                frag->iov_ptr++;
                frag->iov_cnt--;
            } else {
                frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
                    (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
                frag->iov_ptr->iov_len -= cnt;
                cnt = 0;
                break;
            }
        }

    ret_false:
        /* NOT SURE IF I NEED THIS BLOCK... */
        /* Further... the reason I do an 'if(done)' check here is that the code
         * beneath gets executed along with the 'goto advance_iov_position' which
         * is hit elsewhere.
         */
        if(done) {
            frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
                (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
            frag->iov_ptr->iov_len -= cnt;
            cnt = 0;
        }
        /* ...UP TO HERE. */
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        btl_endpoint->endpoint_cache_length = cnt;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */

        /* read header */
        if(frag->iov_cnt == 0) {
            if (btl_endpoint->endpoint_nbo) {
                MCA_BTL_SCTP_HDR_NTOH(frag->hdr);
            }
            switch(frag->hdr.type) {
            case MCA_BTL_SCTP_HDR_TYPE_SEND:
                if(frag->iov_idx == 1 && frag->hdr.size) {
                    frag->iov[1].iov_base = (IOVBASE_TYPE*)(frag+1);
                    frag->iov[1].iov_len = frag->hdr.size;
                    frag->segments[0].seg_addr.pval = frag+1;
                    frag->segments[0].seg_len = frag->hdr.size;
                    frag->iov_cnt++;
                    done = 1;
                    goto repeat;
                }
                break;
            case MCA_BTL_SCTP_HDR_TYPE_PUT:
                if(frag->iov_idx == 1) {
                    frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->segments;
                    frag->iov[1].iov_len = frag->hdr.count * sizeof(mca_btl_base_segment_t);
                    frag->iov_cnt++;
                    done = 1;
                    goto repeat;
                } else if (frag->iov_idx == 2) {
                    for(i=0; i<frag->hdr.count; i++) {
                        frag->iov[i+2].iov_base = (IOVBASE_TYPE*)ompi_ptr_ltop(frag->segments[i].seg_addr.lval);
                        frag->iov[i+2].iov_len = frag->segments[i].seg_len;
                        frag->iov_cnt++;
                    }
                    done = 1;
                    goto repeat;
                }
                break;
            case MCA_BTL_SCTP_HDR_TYPE_GET:
            default:
                break;
            }
            return true;
        }
        return false;
    }
}

138
ompi/mca/btl/sctp/btl_sctp_frag.h Обычный файл
Просмотреть файл

@ -0,0 +1,138 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_FRAG_H
#define MCA_BTL_SCTP_FRAG_H
#define MCA_BTL_SCTP_FRAG_ALIGN (8)
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include "btl_sctp.h"
#include "btl_sctp_hdr.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/* Number of iovec slots a fragment provides for its own use; the iov[]
 * array below is declared with one slot more than this. */
#define MCA_BTL_SCTP_FRAG_IOVEC_NUMBER 4
/**
 * SCTP fragment derived type.
 *
 * Extends the base BTL descriptor with the header and the iovec
 * book-keeping that the send and receive routines advance as data moves.
 */
struct mca_btl_sctp_frag_t {
mca_btl_base_descriptor_t base; /**< base descriptor (must stay first: frags are cast to free-list items) */
mca_btl_base_segment_t segments[2]; /**< segment descriptors referenced from the base descriptor */
struct mca_btl_base_endpoint_t *endpoint; /**< endpoint this fragment is exchanged with */
struct mca_btl_sctp_module_t* btl; /**< owning BTL module */
mca_btl_sctp_hdr_t hdr; /**< wire header; iov[0] is pointed at it by MCA_BTL_SCTP_FRAG_INIT_DST */
struct iovec iov[MCA_BTL_SCTP_FRAG_IOVEC_NUMBER + 1]; /**< iovec storage, including the one extra slot */
struct iovec *iov_ptr; /**< first iovec element that still has data outstanding */
size_t iov_cnt; /**< number of iovec elements still outstanding, starting at iov_ptr */
size_t iov_idx; /**< number of iovec elements already completed */
size_t size; /**< NOTE(review): not used in the visible code -- presumably the fragment's buffer size; confirm */
int rc; /**< status code; reset to 0 by MCA_BTL_SCTP_FRAG_INIT_DST */
ompi_free_list_t* my_list; /**< free list the fragment is handed back to by MCA_BTL_SCTP_FRAG_RETURN */
};
/* All fragment flavours share the same layout; each typedef gets its own
 * class declaration. */
typedef struct mca_btl_sctp_frag_t mca_btl_sctp_frag_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_frag_t);
typedef struct mca_btl_sctp_frag_t mca_btl_sctp_frag_eager_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_frag_eager_t);
typedef struct mca_btl_sctp_frag_t mca_btl_sctp_frag_max_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_frag_max_t);
typedef struct mca_btl_sctp_frag_t mca_btl_sctp_frag_user_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_frag_user_t);
/*
 * Macros to allocate/return descriptors from module specific
 * free list(s).
 *
 * MCA_BTL_SCTP_FRAG_ALLOC_{EAGER,MAX,USER}(frag, rc) obtain an item from the
 * component's sctp_frag_eager / sctp_frag_max / sctp_frag_user free list via
 * OMPI_FREE_LIST_WAIT, store it in 'frag' and the status in 'rc'.
 * MCA_BTL_SCTP_FRAG_RETURN(frag) hands the fragment back to the list recorded
 * in its my_list field.
 *
 * The macros expand to a brace-enclosed block and declare a local named
 * 'item'; the 'frag' argument is parenthesized so that any lvalue
 * expression can be passed.
 */
#define MCA_BTL_SCTP_FRAG_ALLOC_EAGER(frag, rc)                              \
{                                                                            \
                                                                             \
    ompi_free_list_item_t *item;                                             \
    OMPI_FREE_LIST_WAIT(&mca_btl_sctp_component.sctp_frag_eager, item, rc);  \
    (frag) = (mca_btl_sctp_frag_t*) item;                                    \
}
#define MCA_BTL_SCTP_FRAG_ALLOC_MAX(frag, rc)                                \
{                                                                            \
                                                                             \
    ompi_free_list_item_t *item;                                             \
    OMPI_FREE_LIST_WAIT(&mca_btl_sctp_component.sctp_frag_max, item, rc);    \
    (frag) = (mca_btl_sctp_frag_t*) item;                                    \
}
#define MCA_BTL_SCTP_FRAG_ALLOC_USER(frag, rc)                               \
{                                                                            \
    ompi_free_list_item_t *item;                                             \
    OMPI_FREE_LIST_WAIT(&mca_btl_sctp_component.sctp_frag_user, item, rc);   \
    (frag) = (mca_btl_sctp_frag_t*) item;                                    \
}
#define MCA_BTL_SCTP_FRAG_RETURN(frag)                                       \
{                                                                            \
    OMPI_FREE_LIST_RETURN((frag)->my_list,                                   \
                          (ompi_free_list_item_t*)(frag));                   \
}
/*
 * Prepare a fragment for receiving from endpoint 'ep'.
 *
 * Resets the status, binds the fragment to the endpoint and its BTL module,
 * and points the iovec at the header only (iov[0] = hdr, one element
 * outstanding, none completed).  The descriptor is set up as a pure
 * destination: no source segments, one destination segment (segments[]).
 *
 * The source count is cleared explicitly: fragments come from a free list,
 * so a value left over from an earlier use must not survive next to a NULL
 * des_src.
 */
#define MCA_BTL_SCTP_FRAG_INIT_DST(frag,ep)                                \
do {                                                                       \
    (frag)->rc = 0;                                                        \
    (frag)->btl = (ep)->endpoint_btl;                                      \
    (frag)->endpoint = (ep);                                               \
    (frag)->iov[0].iov_len = sizeof((frag)->hdr);                          \
    (frag)->iov[0].iov_base = (IOVBASE_TYPE*)&(frag)->hdr;                 \
    (frag)->iov_cnt = 1;                                                   \
    (frag)->iov_idx = 0;                                                   \
    (frag)->iov_ptr = (frag)->iov;                                         \
    (frag)->base.des_src = NULL;                                           \
    (frag)->base.des_src_cnt = 0;                                          \
    (frag)->base.des_dst = (frag)->segments;                               \
    (frag)->base.des_dst_cnt = 1;                                          \
} while(0)
/* Largest iovec element (in bytes) that mca_btl_sctp_frag_send() sends in one
 * shot; longer elements are routed through mca_btl_sctp_frag_large_send(). */
#define MCA_BTL_SCTP_MAX_FRAG_SIZE 65536
/* Send path for fragments containing an element larger than
 * MCA_BTL_SCTP_MAX_FRAG_SIZE.  iov_fragment is the index of the first such
 * element; the number of bytes sent is reported through amt_sent. */
bool mca_btl_sctp_frag_large_send(mca_btl_sctp_frag_t*, int sd, int iov_fragment, int *amt_sent);
/* Accessors for a fragment's message size and tag (defined in
 * btl_sctp_frag.c). */
int mca_btl_sctp_frag_get_msg_size(mca_btl_sctp_frag_t *frag);
int mca_btl_sctp_frag_get_msg_tag(mca_btl_sctp_frag_t *frag);
/* Send a fragment on socket sd; true once no iovec elements remain. */
bool mca_btl_sctp_frag_send(mca_btl_sctp_frag_t*, int sd);
/* Receive into a fragment, either from socket sd or from the already
 * received bytes buf/len; true once the fragment is complete. */
bool mca_btl_sctp_frag_recv(mca_btl_sctp_frag_t*, int sd, char *buf, int len);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

68
ompi/mca/btl/sctp/btl_sctp_hdr.h Обычный файл
Просмотреть файл

@ -0,0 +1,68 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_HDR_H
#define MCA_BTL_SCTP_HDR_H
#include "ompi_config.h"
#include "ompi/mca/btl/base/base.h"
#include "btl_sctp.h"
#include "opal/types.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
 * SCTP header.
 *
 * The 'type' field holds one of the MCA_BTL_SCTP_HDR_TYPE_* values below;
 * the receive path uses it to decide what follows the header.
 */
#define MCA_BTL_SCTP_HDR_TYPE_SEND 1
#define MCA_BTL_SCTP_HDR_TYPE_PUT 2
#define MCA_BTL_SCTP_HDR_TYPE_GET 3
struct mca_btl_sctp_hdr_t {
mca_btl_base_header_t base; /* common BTL header */
uint8_t type; /* MCA_BTL_SCTP_HDR_TYPE_* */
uint16_t count; /* for PUT: number of segment descriptors that follow the header */
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
/* uint64_t may be required to be 8 byte aligned. */
uint8_t padding[4];
#endif
uint64_t size; /* for SEND: payload length in bytes */
};
typedef struct mca_btl_sctp_hdr_t mca_btl_sctp_hdr_t;
/*
 * Convert the multi-byte header fields (count, size) between host and
 * network byte order, in place.  'hdr' is the header object itself (an
 * lvalue, not a pointer); it is parenthesized so that expressions such as
 * *ptr can be passed.  The argument is evaluated more than once.
 */
#define MCA_BTL_SCTP_HDR_HTON(hdr)             \
do {                                           \
    (hdr).count = htons((hdr).count);          \
    (hdr).size = hton64((hdr).size);           \
} while (0)
#define MCA_BTL_SCTP_HDR_NTOH(hdr)             \
do {                                           \
    (hdr).count = ntohs((hdr).count);          \
    (hdr).size = ntoh64((hdr).size);           \
} while (0)
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

438
ompi/mca/btl/sctp/btl_sctp_proc.c Обычный файл
Просмотреть файл

@ -0,0 +1,438 @@
/*
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include "orte/class/orte_proc_table.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/datatype/dt_arch.h"
#include "btl_sctp.h"
#include "btl_sctp_proc.h"
/* File-local helpers.  The constructor and destructor are declared static
 * here; their definitions below omit the keyword and inherit the internal
 * linkage from these declarations. */
static void mca_btl_sctp_proc_construct(mca_btl_sctp_proc_t* proc);
static void mca_btl_sctp_proc_destruct(mca_btl_sctp_proc_t* proc);
static bool is_private_ipv4(struct in_addr *in);
/* Class definition: mca_btl_sctp_proc_t derives from opal_list_item_t and
 * uses the constructor/destructor declared above. */
OBJ_CLASS_INSTANCE(
mca_btl_sctp_proc_t,
opal_list_item_t,
mca_btl_sctp_proc_construct,
mca_btl_sctp_proc_destruct);
/*
 * Constructor for an SCTP proc instance: sets up the per-proc lock and
 * starts with no owning ompi proc, no published addresses and no endpoints.
 */
void mca_btl_sctp_proc_construct(mca_btl_sctp_proc_t* proc)
{
    OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t);

    proc->proc_ompi = NULL;

    /* address list published by the peer */
    proc->proc_addr_count = 0;
    proc->proc_addrs = NULL;

    /* endpoints created to reach the peer */
    proc->proc_endpoint_count = 0;
    proc->proc_endpoints = NULL;
}
/*
 * Cleanup SCTP proc instance: unregister it from the component-wide proc
 * table, free the endpoint array and destroy the per-proc lock.
 *
 * The lock is created unconditionally by the constructor, so it is
 * destroyed unconditionally here (not only when an endpoint array exists).
 */
void mca_btl_sctp_proc_destruct(mca_btl_sctp_proc_t* proc)
{
    /* remove from list of all proc instances */
    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    orte_hash_table_remove_proc(&mca_btl_sctp_component.sctp_procs, &proc->proc_name);
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);

    /* release resources */
    if(NULL != proc->proc_endpoints) {
        free(proc->proc_endpoints);
        proc->proc_endpoints = NULL;
    }
    /* NOTE(review): proc_addrs (filled in by ompi_modex_recv) is not released
     * here -- confirm who owns that buffer. */
    OBJ_DESTRUCT(&proc->proc_lock);
}
/*
 * Check to see if an IPv4 struct in_addr is public or private.  We
 * can only do IPv4 here because some of the SCTP BTL endpoint structs
 * only hold the struct in_addr, not the upper-level sin_family that
 * would indicate if the address is IPv6.
 *
 * The private ranges recognized are those of RFC 1918:
 *   10.0.0.0/8, 172.16.0.0/12 (172.16.x.x - 172.31.x.x), 192.168.0.0/16.
 *
 * @param in  address in network byte order
 * @return true if the address lies in one of the private ranges
 */
static bool is_private_ipv4(struct in_addr *in)
{
    /* There are definitely ways to do this more efficiently, but
       since this is not performance-critical code, it seems better to
       use clear code (vs. clever code) */
    uint32_t addr = ntohl((uint32_t) in->s_addr);
    unsigned int a = (addr & 0xff000000) >> 24;   /* first octet */
    unsigned int b = (addr & 0x00ff0000) >> 16;   /* second octet */

    return ((10 == a) ||
            (192 == a && 168 == b) ||
            (172 == a && 16 <= b && b <= 31)) ? true : false;
}
/*
 * Create a SCTP process structure. There is a one-to-one correspondence
 * between a ompi_proc_t and a mca_btl_sctp_proc_t instance. We cache
 * additional data (specifically the list of mca_btl_sctp_endpoint_t instances,
 * and published addresses) associated w/ a given destination on this
 * datastructure.
 *
 * If an instance for ompi_proc is already registered in the component's
 * proc table it is returned as is.  Otherwise a new instance is created,
 * registered, and populated with the addresses the peer published through
 * the modex.
 *
 * @param ompi_proc  process to create (or look up) the SCTP proc for
 * @return the proc instance, or NULL on allocation failure, modex failure,
 *         or a modex payload whose size is not a multiple of
 *         sizeof(mca_btl_sctp_addr_t).  On every failure path the
 *         component lock is released and a partially built instance is
 *         released again.
 */
mca_btl_sctp_proc_t* mca_btl_sctp_proc_create(ompi_proc_t* ompi_proc)
{
    int rc;
    size_t size;
    mca_btl_sctp_proc_t* btl_proc;

    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    btl_proc = (mca_btl_sctp_proc_t*)orte_hash_table_get_proc(
        &mca_btl_sctp_component.sctp_procs, &ompi_proc->proc_name);
    if(NULL != btl_proc) {
        OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);
        return btl_proc;
    }

    btl_proc = OBJ_NEW(mca_btl_sctp_proc_t);
    if(NULL == btl_proc) {
        /* do not leave the component lock held on the failure path */
        OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);
        return NULL;
    }
    btl_proc->proc_ompi = ompi_proc;
    btl_proc->proc_name = ompi_proc->proc_name;

    /* add to hash table of all proc instance */
    orte_hash_table_set_proc(
        &mca_btl_sctp_component.sctp_procs,
        &btl_proc->proc_name,
        btl_proc);
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);

    /* lookup sctp parameters exported by this proc */
    rc = ompi_modex_recv( &mca_btl_sctp_component.super.btl_version,
                          ompi_proc,
                          (void**)&btl_proc->proc_addrs,
                          &size );
    if(rc != OMPI_SUCCESS) {
        BTL_ERROR(("mca_base_modex_recv: failed with return value=%d", rc));
        OBJ_RELEASE(btl_proc);
        return NULL;
    }
    if(0 != (size % sizeof(mca_btl_sctp_addr_t))) {
        BTL_ERROR(("mca_base_modex_recv: invalid size %lu\n", (unsigned long) size));
        /* release the instance (its destructor also removes it from the
         * proc table) instead of leaking it */
        OBJ_RELEASE(btl_proc);
        return NULL;
    }
    btl_proc->proc_addr_count = size / sizeof(mca_btl_sctp_addr_t);

    /* allocate space for endpoint array - one for each exported address */
    btl_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
        malloc(btl_proc->proc_addr_count * sizeof(mca_btl_base_endpoint_t*));
    if(NULL == btl_proc->proc_endpoints) {
        OBJ_RELEASE(btl_proc);
        return NULL;
    }

    /* remember the instance that describes the local process */
    if(NULL == mca_btl_sctp_component.sctp_local && ompi_proc == ompi_proc_local()) {
        mca_btl_sctp_component.sctp_local = btl_proc;
    }
    return btl_proc;
}
/*
 * Note that this routine must be called with the lock on the process
 * already held.  Insert a btl instance into the proc array and assign
 * it an address.
 *
 * Returns OMPI_SUCCESS once the endpoint has been given one of the peer's
 * published addresses (whose in-use counter is incremented), or
 * OMPI_ERR_UNREACH when no suitable address exists.
 */
int mca_btl_sctp_proc_insert(
    mca_btl_sctp_proc_t* btl_proc,
    mca_btl_base_endpoint_t* btl_endpoint)
{
    struct mca_btl_sctp_module_t *module = btl_endpoint->endpoint_btl;
    unsigned long local_net;
    size_t idx;

#ifndef WORDS_BIGENDIAN
    /* We are little endian.  If the peer is big endian, everything sent to
       it must be in Network Byte Order and everything received from it
       will be in NBO.  Big endian machines always use NBO, so nothing
       needs to be flagged in that case. */
    if (btl_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
        btl_endpoint->endpoint_nbo = true;
    }
#endif

    /* append the endpoint to the proc's endpoint array */
    btl_endpoint->endpoint_proc = btl_proc;
    btl_proc->proc_endpoints[btl_proc->proc_endpoint_count] = btl_endpoint;
    btl_proc->proc_endpoint_count++;

    local_net = module->sctp_ifaddr.sin_addr.s_addr & module->sctp_ifmask.sin_addr.s_addr;

    /*
     * Scan the peer's unused addresses for one on our directly attached
     * network (same network part under our interface mask) and take the
     * first match.  A non-matching unused address is only recorded when the
     * endpoint already carried an address on entry.
     */
    for (idx = 0; idx < btl_proc->proc_addr_count; idx++) {
        mca_btl_sctp_addr_t* candidate = &btl_proc->proc_addrs[idx];
        unsigned long peer_net = candidate->addr_inet.s_addr & module->sctp_ifmask.sin_addr.s_addr;

        if (0 != candidate->addr_inuse) {
            continue;
        }
        if (peer_net == local_net) {
            btl_endpoint->endpoint_addr = candidate;
            break;
        }
        if (0 != btl_endpoint->endpoint_addr) {
            btl_endpoint->endpoint_addr = candidate;
        }
    }

    /* Make sure there is a common interface */
    if (NULL != btl_endpoint->endpoint_addr) {
        btl_endpoint->endpoint_addr->addr_inuse++;
        return OMPI_SUCCESS;
    }

    /* There was no common interface.  For the moment we do enough to cover
       two common cases:

       1. MPI processes on two computers that are not on the same subnet
          but still have routable addresses to each other.  The subnet
          match above fails, yet the OS/networking/routers make it work,
          so this function must *not* return OMPI_ERR_UNREACH.

       2. A typical cluster where the head node has two SCTP NICs (one
          public, one private address) and the compute nodes only have
          private addresses.  The head node's process has two SCTP BTL
          modules; the private one matches the subnet, the public one
          matches nothing and ends up here -- it should return
          OMPI_ERR_UNREACH so that it effectively has no peers.

       Hence:

       - if my address is private (as decided by is_private_ipv4()),
         return UNREACH;
       - if my address is public, use the peer's first public address
         (and hope for the best), or return UNREACH if it has none.

       Scenarios this does not cover, and which will likely need support
       in the future:

       - Flat neighborhood networks, where all addresses are private and
         the subnet masks do not necessarily match although the hosts are
         routable to each other.

       - Really large private SCTP clusters (e.g. 1024 nodes) whose
         subnet masks are set so that only nodes on the same switch share
         a subnet.

       Those really need "something better", such as a user-supplied
       config file stating which subnets are reachable through which
       interface. */

    /* If my address is private, return UNREACH */
    if (is_private_ipv4(&(module->sctp_ifaddr.sin_addr))) {
        return OMPI_ERR_UNREACH;
    }

    /* Find the first public peer address */
    for (idx = 0; idx < btl_proc->proc_addr_count; ++idx) {
        mca_btl_sctp_addr_t* candidate = &btl_proc->proc_addrs[idx];

        if (is_private_ipv4(&(candidate->addr_inet))) {
            continue;
        }
        btl_endpoint->endpoint_addr = candidate;
        btl_endpoint->endpoint_addr->addr_inuse++;
        return OMPI_SUCCESS;
    }

    /* Didn't find any peer addresses that were public, so return
       UNREACH */
    return OMPI_ERR_UNREACH;
}
/*
 * Remove an endpoint from the proc array and indicate the address is
 * no longer in use.
 *
 * When the removed endpoint was the last one, the proc itself is released
 * and the address counter is left untouched.  Always returns OMPI_SUCCESS,
 * including when the endpoint is not found.
 */
int mca_btl_sctp_proc_remove(mca_btl_sctp_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint)
{
    size_t slot = 0;

    OPAL_THREAD_LOCK(&btl_proc->proc_lock);

    /* locate the endpoint in the array */
    while(slot < btl_proc->proc_endpoint_count &&
          btl_proc->proc_endpoints[slot] != btl_endpoint) {
        slot++;
    }

    if(slot < btl_proc->proc_endpoint_count) {
        /* close the gap by shifting the tail of the array down by one */
        size_t tail = btl_proc->proc_endpoint_count - slot - 1;
        memmove(&btl_proc->proc_endpoints[slot], &btl_proc->proc_endpoints[slot + 1],
                tail * sizeof(mca_btl_base_endpoint_t*));
        btl_proc->proc_endpoint_count--;

        if(0 == btl_proc->proc_endpoint_count) {
            /* last endpoint gone: drop our reference to the proc */
            OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
            OBJ_RELEASE(btl_proc);
            return OMPI_SUCCESS;
        }

        /* The endpoint_addr may still be NULL if this endpoint is
           being removed early in the wireup sequence (e.g., if it
           is unreachable by all other procs) */
        if(NULL != btl_endpoint->endpoint_addr) {
            btl_endpoint->endpoint_addr->addr_inuse--;
        }
    }

    OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
    return OMPI_SUCCESS;
}
/*
 * Find the SCTP proc instance registered under the given globally unique
 * process name.  The component's proc table is consulted under the
 * component lock; the result of that table lookup is returned unchanged.
 */
mca_btl_sctp_proc_t* mca_btl_sctp_proc_lookup(const orte_process_name_t *name)
{
    mca_btl_sctp_proc_t* found;

    OPAL_THREAD_LOCK(&mca_btl_sctp_component.sctp_lock);
    found = (mca_btl_sctp_proc_t*)orte_hash_table_get_proc(
        &mca_btl_sctp_component.sctp_procs, name);
    OPAL_THREAD_UNLOCK(&mca_btl_sctp_component.sctp_lock);

    return found;
}
/*
 * Offer an incoming connection (peer address addr, socket sd) to each of
 * the proc's endpoints in turn, stopping at the first endpoint that
 * accepts it.  Returns true if some endpoint accepted, false otherwise.
 * The proc lock is held while the endpoints are walked.
 */
bool mca_btl_sctp_proc_accept(mca_btl_sctp_proc_t* btl_proc, struct sockaddr_in* addr, int sd)
{
    bool accepted = false;
    size_t idx = 0;

    OPAL_THREAD_LOCK(&btl_proc->proc_lock);
    while(!accepted && idx < btl_proc->proc_endpoint_count) {
        if(mca_btl_sctp_endpoint_accept(btl_proc->proc_endpoints[idx], addr, sd)) {
            accepted = true;
        }
        idx++;
    }
    OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);

    return accepted;
}
/**
 * int mca_btl_sctp_proc_check
 * ----------------------------
 * Reports whether a proc has been recorded for association id 'id'.
 *
 * With MCA_BTL_SCTP_DONT_USE_HASH the given table is scanned linearly;
 * entries are added incrementally, so the scan stops at the first slot
 * that is not valid.  Otherwise the component's association-id hash table
 * is consulted and 'table' is not used.
 *
 * Returns VALID_ENTRY or INVALID_ENTRY.
 */
int mca_btl_sctp_proc_check(uint32_t id, struct mca_btl_sctp_proc_table_node *table) {
#if MCA_BTL_SCTP_DONT_USE_HASH
    int slot = 0;

    /* only the leading run of valid slots can hold a match */
    while(slot < MCA_BTL_SCTP_PROC_TABLE_SIZE && table[slot].valid) {
        if(table[slot].sctp_assoc_id == id) {
            return VALID_ENTRY;
        }
        slot++;
    }
    return INVALID_ENTRY;
#else
    mca_btl_sctp_proc_t *entry;

    if(OPAL_SUCCESS == opal_hash_table_get_value_uint32(&mca_btl_sctp_component.sctp_assocID_hash, id, &entry)) {
        return VALID_ENTRY;
    }
    return INVALID_ENTRY;
#endif
}
/**
* void mca_btl_sctp_proc_add
* ---------------------------
* Add a proc entry to the table indexed by association id.
*
* TODO change this to a hash table that can expand to eliminate
* MCA_BTL_SCTP_PROC_TABLE_SIZE limitation
*/
void mca_btl_sctp_proc_add(uint32_t id, struct mca_btl_sctp_proc_t *proc, struct mca_btl_sctp_proc_table_node *table) {
#if MCA_BTL_SCTP_DONT_USE_HASH
int i;
for(i = 0; i < MCA_BTL_SCTP_PROC_TABLE_SIZE; i++) {
if(table[i].sctp_assoc_id == 0 && table[i].valid == 0) {
table[i].sctp_assoc_id = id;
table[i].proc = proc;
table[i].valid = 1;
return;
}
}
#else
int rc = opal_hash_table_set_value_uint32(&mca_btl_sctp_component.sctp_assocID_hash, id, proc);
/* TODO handle return code */
#endif
}
/**
 * mca_btl_sctp_proc_t* mca_btl_sctp_proc_get
 * ------------------------------------------
 * Looks up the proc recorded for association id 'id'.
 *
 * With MCA_BTL_SCTP_DONT_USE_HASH the given table is searched for the first
 * slot whose association id equals 'id' and that slot's proc pointer is
 * returned; otherwise the component's association-id hash table is used and
 * 'table' is ignored.  Returns NULL when nothing is found.
 */
mca_btl_sctp_proc_t *mca_btl_sctp_proc_get(uint32_t id, struct mca_btl_sctp_proc_table_node *table) {
#if MCA_BTL_SCTP_DONT_USE_HASH
    mca_btl_sctp_proc_t *found = NULL;
    int slot;

    for(slot = 0; slot < MCA_BTL_SCTP_PROC_TABLE_SIZE; slot++) {
        if(id == table[slot].sctp_assoc_id) {
            found = table[slot].proc;
            break;
        }
    }
    return found;
#else
    mca_btl_sctp_proc_t *found;
    int status = opal_hash_table_get_value_uint32(&mca_btl_sctp_component.sctp_assocID_hash, id, &found);

    return (OPAL_SUCCESS == status) ? found : NULL;
#endif
}

112
ompi/mca/btl/sctp/btl_sctp_proc.h Обычный файл
Просмотреть файл

@ -0,0 +1,112 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_PROC_H
#define MCA_BTL_SCTP_PROC_H
#include "opal/class/opal_object.h"
#include "orte/mca/ns/ns.h"
#include "ompi/proc/proc.h"
#include "btl_sctp.h"
#include "btl_sctp_addr.h"
#include "btl_sctp_endpoint.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
 * Represents the state of a remote process and the set of addresses
 * that it exports. Also caches an instance of mca_btl_base_endpoint_t for
 * each BTL instance that attempts to open a connection to the process.
 */
struct mca_btl_sctp_proc_t {
opal_list_item_t super;
/**< allow proc to be placed on a list */
ompi_proc_t *proc_ompi;
/**< pointer to corresponding ompi_proc_t */
orte_process_name_t proc_name;
/**< globally unique identifier for the process */
struct mca_btl_sctp_addr_t* proc_addrs;
/**< array of addresses exported by peer */
size_t proc_addr_count;
/**< number of addresses published by endpoint */
struct mca_btl_base_endpoint_t **proc_endpoints;
/**< array of endpoints that have been created to access this proc */
size_t proc_endpoint_count;
/**< number of endpoints */
opal_mutex_t proc_lock;
/**< lock to protect against concurrent access to proc state */
};
typedef struct mca_btl_sctp_proc_t mca_btl_sctp_proc_t;
OBJ_CLASS_DECLARATION(mca_btl_sctp_proc_t);
/* Return the proc instance for ompi_proc, creating and registering it if
 * none exists yet; NULL on failure. */
mca_btl_sctp_proc_t* mca_btl_sctp_proc_create(ompi_proc_t* ompi_proc);
/* Look up an already registered proc instance by process name. */
mca_btl_sctp_proc_t* mca_btl_sctp_proc_lookup(const orte_process_name_t* name);
/* Add an endpoint to the proc and assign it one of the peer's addresses;
 * must be called with the proc lock held. */
int mca_btl_sctp_proc_insert(mca_btl_sctp_proc_t*, mca_btl_base_endpoint_t*);
/* Remove an endpoint from the proc; the proc is released when its last
 * endpoint goes away. */
int mca_btl_sctp_proc_remove(mca_btl_sctp_proc_t*, mca_btl_base_endpoint_t*);
/* Offer an incoming connection to the proc's endpoints; true if one of
 * them accepted it. */
bool mca_btl_sctp_proc_accept(mca_btl_sctp_proc_t*, struct sockaddr_in*, int);
/**
 * Inlined accessor for the SCTP proc instance describing the local
 * process.  The instance is cached in the component; on first use (cache
 * still NULL) it is created from ompi_proc_local().
 */
static inline mca_btl_sctp_proc_t* mca_btl_sctp_proc_local(void)
{
    if(NULL != mca_btl_sctp_component.sctp_local) {
        /* already cached */
        return mca_btl_sctp_component.sctp_local;
    }
    mca_btl_sctp_component.sctp_local = mca_btl_sctp_proc_create(ompi_proc_local());
    return mca_btl_sctp_component.sctp_local;
}
/* Result codes of mca_btl_sctp_proc_check(). */
enum {
INVALID_ENTRY = 0,
VALID_ENTRY = 1
};
/* Table of procs indexed by SCTP association id used by the receiver to
 * identify senders. It is initialized to 0 in mca_btl_sctp_component_init().
 * NOTE: 256 matches the size of the proc hash_table
 */
#define MCA_BTL_SCTP_PROC_TABLE_SIZE 256
/* One slot of the association-id table. */
struct mca_btl_sctp_proc_table_node {
int valid; /* non-zero once the slot has been filled */
uint32_t sctp_assoc_id; /* SCTP association id the slot is keyed on */
struct mca_btl_sctp_proc_t *proc; /* proc recorded for that association */
};
typedef struct mca_btl_sctp_proc_table_node mca_btl_sctp_proc_table_node;
/* NOTE(review): these two pointers are defined (not declared extern) in a
 * header, so every translation unit that includes it emits its own tentative
 * definition; this only links where the toolchain merges common symbols.
 * Consider 'extern' here plus a single definition in one .c file. */
struct mca_btl_sctp_proc_table_node *recvr_proc_table;
struct mca_btl_sctp_proc_table_node *sender_proc_table;
/* Association-id table operations.  'table' is only consulted when the
 * implementation is built with MCA_BTL_SCTP_DONT_USE_HASH; otherwise the
 * component's association-id hash table is used. */
int mca_btl_sctp_proc_check(uint32_t id, struct mca_btl_sctp_proc_table_node *table);
void mca_btl_sctp_proc_add(uint32_t id, struct mca_btl_sctp_proc_t *proc, struct mca_btl_sctp_proc_table_node *table);
mca_btl_sctp_proc_t *mca_btl_sctp_proc_get(uint32_t id, struct mca_btl_sctp_proc_table_node *table);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

248
ompi/mca/btl/sctp/btl_sctp_recv_handler.c Обычный файл
Просмотреть файл

@ -0,0 +1,248 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "opal/opal_socket_errno.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <string.h>
#include <fcntl.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include "ompi/constants.h"
#include "opal/event/event.h"
#include "opal/util/if.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/ns/ns_types.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/btl/btl.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_sctp.h"
#include "btl_sctp_addr.h"
#include "btl_sctp_proc.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_endpoint.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/datatype/convertor.h"
#include <netinet/sctp.h>
#include "btl_sctp_recv_handler.h"
#include "btl_sctp_component.h"
/**
 * void mca_btl_sctp_print_sri(struct sctp_sndrcvinfo *sri)
 * --------------------------------------------------------
 * Prints out sndrcvinfo data.  Used only for diagnostic purposes.
 *
 * Every field of *sri is emitted on its own line through SCTP_BTL_ERROR.
 * Each value is cast to the exact type its conversion specifier expects:
 * the 16-bit fields are printed as int, while the 32-bit fields (TSNs,
 * ppid, context, ...) are printed as unsigned so that values above INT_MAX
 * do not show up as negative numbers.
 *
 * @param sri  send/receive info to dump; must not be NULL.
 */
void mca_btl_sctp_print_sri(struct sctp_sndrcvinfo *sri) {
    /* The original cast sinfo_stream to a pointer (int*) and printed it with
     * %d, which is a format/argument mismatch; print the value itself. */
    SCTP_BTL_ERROR(("sri->sinfo_stream: %d\n", (int)(sri->sinfo_stream)));
    SCTP_BTL_ERROR(("sri->sinfo_ssn: %d\n", (int)(sri->sinfo_ssn)));
    SCTP_BTL_ERROR(("sri->sinfo_flags: %d\n", (int)(sri->sinfo_flags)));
    SCTP_BTL_ERROR(("sri->sinfo_ppid: %u\n", (unsigned int)(sri->sinfo_ppid)));
    SCTP_BTL_ERROR(("sri->sinfo_context: %u\n", (unsigned int)(sri->sinfo_context)));
    SCTP_BTL_ERROR(("sri->sinfo_timetolive: %u\n", (unsigned int)(sri->sinfo_timetolive)));
    SCTP_BTL_ERROR(("sri->sinfo_tsn: %u\n", (unsigned int)(sri->sinfo_tsn)));
    SCTP_BTL_ERROR(("sri->sinfo_cumtsn: %u\n", (unsigned int)(sri->sinfo_cumtsn)));
    SCTP_BTL_ERROR(("sri->sinfo_assoc_id: %u\n", (unsigned int)(sri->sinfo_assoc_id)));
}
/* Receive buffer shared by every invocation of the receive handler. */
static char *sctp_recv_buf = NULL;

/**
 * Allocate the shared receive buffer.
 *
 * The buffer holds mca_btl_sctp_component.sctp_rcvbuf + 1 bytes.
 *
 * @return 1 when the allocation succeeded, 0 when malloc() returned NULL.
 */
int mca_btl_sctp_recv_handler_initbuf() {
    sctp_recv_buf = (char *) malloc(mca_btl_sctp_component.sctp_rcvbuf + 1);
    if(NULL == sctp_recv_buf) {
        return 0;
    }
    return 1;
}
/**
 * Release the shared receive buffer, if one is currently allocated, and
 * clear the pointer so that a second call is harmless.
 */
void mca_btl_sctp_recv_handler_freebuf() {
    if(NULL == sctp_recv_buf) {
        return;                 /* nothing was allocated */
    }
    free(sctp_recv_buf);
    sctp_recv_buf = NULL;
}
/**
 * void mca_btl_sctp_recv_handler(int sd, short flags, void *user)
 * ---------------------------------------------------------------
 * General callback function for when we have an event on our one to many SCTP
 * socket.
 *
 * The handler loops, reading one message per pass with sctp_recvmsg() into
 * the shared receive buffer, until the call fails (EAGAIN/EINTR or a hard
 * error) or a fragment is left incomplete.  For each message:
 *
 *  - If the association id reported in the sndrcvinfo is already present in
 *    recvr_proc_table, the bytes are handed to mca_btl_sctp_frag_recv() on the
 *    (single) endpoint of that proc.  A completed SEND fragment is delivered
 *    to the callback registered for its tag.
 *  - Otherwise the message must be exactly one orte_process_name_t (the
 *    sender's GUID).  The matching proc is looked up, recorded in
 *    recvr_proc_table under the association id, and offered the connection
 *    through mca_btl_sctp_proc_accept().
 *
 * @param sd     descriptor of the one-to-many SCTP socket that became readable
 * @param flags  event flags supplied by the event library (unused)
 * @param user   opaque callback argument (unused)
 */
void mca_btl_sctp_recv_handler(int sd, short flags, void *user) {
    /* allocated this elsewhere only once per BTL to avoid repeatedly calling malloc */
    char *buf = sctp_recv_buf;
    orte_process_name_t guid;
    struct sockaddr_in their_addr;
    int retval;
    mca_btl_sctp_proc_t *btl_proc;
    /* mca_btl_sctp_event_t *event = (mca_btl_sctp_event_t *)user; */
    int msg_flags = 0;
    socklen_t len = sizeof(struct sockaddr_in);
    struct sctp_sndrcvinfo sri;

recv_handler_1_to_many_loop:
    /* Both arguments are value-result parameters of sctp_recvmsg(): `len`
     * must hold the size of their_addr on entry, and some implementations
     * (e.g. lksctp-tools) pass *msg_flags to recvmsg() as its flags before
     * overwriting it.  Reset them on every pass so that neither a stale
     * result from the previous message nor an indeterminate value is used. */
    len = sizeof(struct sockaddr_in);
    msg_flags = 0;

    /* Receive data from the socket. */
    retval = sctp_recvmsg(sd, buf, mca_btl_sctp_component.sctp_rcvbuf,
                          (struct sockaddr *)&their_addr, &len, &sri, &msg_flags);

    /* TODO move full error handling code from frag_recv to here.  Can we? See next TODO... */
    if(-1 == retval && (errno == EAGAIN || errno == EINTR)) {
        /* nothing (more) to read right now */
        return;
    } else if(-1 == retval) {
        perror("retval is -1");
        /* TODO can we determine the endpoint from the sri if
         * this is an error? */
        /* mca_btl_sctp_endpoint_close(btl_endpoint); */
        return;
    }

    SCTP_BTL_ERROR(("mca_btl_sctp_recv_handler(): got %d bytes.\n", retval));

    /* Print sndrcvinfo data. */
    mca_btl_sctp_print_sri(&sri);

    /* Check if sender is known to us. */
    if((mca_btl_sctp_proc_check(((uint32_t)(sri.sinfo_assoc_id)), recvr_proc_table)) == VALID_ENTRY) {
        mca_btl_base_endpoint_t *btl_endpoint;
        mca_btl_sctp_frag_t* frag;

        btl_proc = mca_btl_sctp_proc_get(sri.sinfo_assoc_id, recvr_proc_table);
        btl_endpoint = btl_proc->proc_endpoints[0];
        assert(btl_proc->proc_endpoint_count == 1); /* true for 1-many */

        /* can't thread lock until after recv call since can't specify
         * the receiver in sctp_recvmsg on a one-to-many socket... is
         * this a problem?
         */
        OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);

        /* Continue a partially received fragment, or start a new one. */
        frag = btl_endpoint->endpoint_recv_frag;
        if(NULL == frag) {
            int rc;
            if(mca_btl_sctp_module.super.btl_max_send_size >
               mca_btl_sctp_module.super.btl_eager_limit) {
                MCA_BTL_SCTP_FRAG_ALLOC_MAX(frag, rc);
            } else {
                MCA_BTL_SCTP_FRAG_ALLOC_EAGER(frag, rc);
            }

            if(NULL == frag) {
                /* out of fragments: give up on this event */
                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
                return;
            }
            MCA_BTL_SCTP_FRAG_INIT_DST(frag, btl_endpoint);
        }

#if MCA_BTL_SCTP_ENDPOINT_CACHE
        assert( 0 == btl_endpoint->endpoint_cache_length );
    data_still_pending_on_endpoint:
#endif  /* MCA_BTL_SCTP_ENDPOINT_CACHE */
        /* check for completion of non-blocking recv on the current fragment */
        if(mca_btl_sctp_frag_recv(frag, sd, buf, retval) == false) {
            /* fragment still incomplete: park it on the endpoint */
            btl_endpoint->endpoint_recv_frag = frag;
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            return; /* EAGAIN occurred underneath so stop looping */
        } else {
            btl_endpoint->endpoint_recv_frag = NULL;
            switch(frag->hdr.type) {
            case MCA_BTL_SCTP_HDR_TYPE_SEND:
                {
                    /* deliver to the callback registered for this tag */
                    mca_btl_base_recv_reg_t* reg = frag->btl->sctp_reg + frag->hdr.base.tag;
                    reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
                    break;
                }
            default:
                break;
            }
#if MCA_BTL_SCTP_ENDPOINT_CACHE
            if( 0 != btl_endpoint->endpoint_cache_length ) {
                /* If the cache still contains some data we can reuse the same
                 * fragment until we flush it completely.
                 */
                MCA_BTL_SCTP_FRAG_INIT_DST(frag, btl_endpoint);
                goto data_still_pending_on_endpoint;
            }
#endif  /* MCA_BTL_SCTP_ENDPOINT_CACHE */
            MCA_BTL_SCTP_FRAG_RETURN(frag);
        }
        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
#if MCA_BTL_SCTP_ENDPOINT_CACHE
        assert( 0 == btl_endpoint->endpoint_cache_length );
#endif  /* MCA_BTL_SCTP_ENDPOINT_CACHE */
    }
    /* If not known then data recvd should be GUID. */
    else {
        /* TODO OPAL_THREAD_LOCK when endpoint unknown? */
        if(retval != sizeof(guid)) {
            BTL_ERROR(("Unexpected size of GUID.\n"));
            return;
        }

        /* Setup guid. */
        memcpy(&guid, buf, retval);
        ORTE_PROCESS_NAME_NTOH(guid);

        /* lookup the corresponding process */
        btl_proc = mca_btl_sctp_proc_lookup(&guid);
        if(NULL == btl_proc) {
            BTL_ERROR(("errno=%d",errno));
            CLOSE_THE_SOCKET(sd);
            return;
        }

        /* remember which proc owns this association for later messages */
        mca_btl_sctp_proc_add((uint32_t)sri.sinfo_assoc_id, btl_proc, recvr_proc_table);

        /* are there any existing peer instances willing to accept this connection */
        if(mca_btl_sctp_proc_accept(btl_proc, &their_addr, sd) == false) {
            BTL_ERROR(("no one accepted!\n"));
            CLOSE_THE_SOCKET(sd);
        }
    }

    goto recv_handler_1_to_many_loop;
}

27
ompi/mca/btl/sctp/btl_sctp_recv_handler.h Обычный файл
Просмотреть файл

@ -0,0 +1,27 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef BTL_SCTP_RECV_HANDLER_H
#define BTL_SCTP_RECV_HANDLER_H
/* NOTE(review): this header names struct sctp_sndrcvinfo but includes
 * nothing itself; includers must pull in <netinet/sctp.h> first. */

/* Dump every field of *sri through SCTP_BTL_ERROR; diagnostic use only. */
void mca_btl_sctp_print_sri(struct sctp_sndrcvinfo *sri);
/* Event callback for the one-to-many SCTP socket `sd`: reads messages with
 * sctp_recvmsg() and dispatches them to the proc that owns the association.
 * `flags` and `user` are accepted for the callback signature but not used. */
void mca_btl_sctp_recv_handler(int sd, short flags, void *user);
/* Allocate the receive buffer used by mca_btl_sctp_recv_handler().
 * Returns 1 on success and 0 when the allocation failed. */
int mca_btl_sctp_recv_handler_initbuf(void);
/* Free the receive buffer; does nothing when no buffer is allocated. */
void mca_btl_sctp_recv_handler_freebuf(void);
#endif

55
ompi/mca/btl/sctp/btl_sctp_utils.c Обычный файл
Просмотреть файл

@ -0,0 +1,55 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/* *****************************************************************************
* General purpose routines.
* ***************************************************************************
*/
#include "btl_sctp_utils.h"
/**
 * struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_frag
 * --------------------------------------------------------
 * Build an IPv4 socket address for the peer of a fragment.
 *
 * The port and address are copied unchanged (no byte-order conversion is
 * applied here) from the endpoint_addr of frag->endpoint into a zeroed
 * sockaddr_in, which is returned by value.
 */
struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_frag(struct mca_btl_sctp_frag_t *frag) {
    struct sockaddr_in peer;

    /* start from an all-zero structure so unused members are cleared */
    bzero(&peer, sizeof(peer));

    peer.sin_addr.s_addr = frag->endpoint->endpoint_addr->addr_inet.s_addr;
    peer.sin_port        = frag->endpoint->endpoint_addr->addr_port;
    peer.sin_family      = AF_INET;

    return peer;
}
/**
 * struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_endpoint
 * ------------------------------------------------------------
 * Build an IPv4 socket address for the peer of an endpoint.
 *
 * The port and address are copied unchanged (no byte-order conversion is
 * applied here) from ep->endpoint_addr into a zeroed sockaddr_in, which is
 * returned by value.
 */
struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_endpoint(struct mca_btl_base_endpoint_t *ep) {
    struct sockaddr_in peer;

    /* start from an all-zero structure so unused members are cleared */
    bzero(&peer, sizeof(peer));

    peer.sin_addr.s_addr = ep->endpoint_addr->addr_inet.s_addr;
    peer.sin_port        = ep->endpoint_addr->addr_port;
    peer.sin_family      = AF_INET;

    return peer;
}

30
ompi/mca/btl/sctp/btl_sctp_utils.h Обычный файл
Просмотреть файл

@ -0,0 +1,30 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_UTILS_H
#define MCA_BTL_SCTP_UTILS_H
#include "btl_sctp.h"
#include "btl_sctp_frag.h"
#include "btl_sctp_endpoint.h"
#include "btl_sctp_addr.h"
/* Return, by value, an AF_INET sockaddr_in filled from the address and port
 * stored in the endpoint_addr of frag->endpoint. */
struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_frag(struct mca_btl_sctp_frag_t *frag);
/* Return, by value, an AF_INET sockaddr_in filled from the address and port
 * stored in ep->endpoint_addr. */
struct sockaddr_in mca_btl_sctp_utils_sockaddr_from_endpoint(struct mca_btl_base_endpoint_t *ep);
#endif

104
ompi/mca/btl/sctp/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,104 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# OMPI_CHECK_SCTP(prefix, [action-if-found], [action-if-not-found])
# --------------------------------------------------------
# Check whether SCTP support can be found.  Sets prefix_{CPPFLAGS,
# LDFLAGS, LIBS} as needed and runs action-if-found if there is
# support, otherwise executes action-if-not-found.
#
# Behaviour in detail:
#  * registers the --with-sctp(=DIR) and --with-sctp-libdir=DIR options;
#  * seeds btl_sctp_CFLAGS from CFLAGS and, on *bsd* hosts, appends
#    -DFREEBSD exactly once;
#  * only attempts the check on *linux* and *bsd* hosts, and only when
#    --with-sctp was not given as "no";
#  * probes for netinet/sctp.h and for sctp_recvmsg in libsctp through
#    OMPI_CHECK_PACKAGE;
#  * aborts configure when --with-sctp was requested (any value other than
#    "no") but the check did not succeed.
AC_DEFUN([OMPI_CHECK_SCTP],[
AC_ARG_WITH([sctp],
[AC_HELP_STRING([--with-sctp(=DIR)],
[Build SCTP support, searching for libraries in DIR])])
AC_ARG_WITH([sctp-libdir],
[AC_HELP_STRING([--with-sctp-libdir=DIR],
[Search for SCTP libraries in DIR])])
btl_sctp_CFLAGS="`echo $CFLAGS`"
# Only try to build this on Linux or some BSD variant.
ompi_sctp_try_to_build="no"
case "$host" in
*linux*)
ompi_sctp_try_to_build="yes"
;;
*bsd*)
# Strip any existing -DFREEBSD before appending it so that the flag, which
# selects the extra sin_len field, is present exactly once.
btl_sctp_CFLAGS="`echo $btl_sctp_CFLAGS | sed 's/-DFREEBSD//g'`"
btl_sctp_CFLAGS="$btl_sctp_CFLAGS -DFREEBSD"
ompi_sctp_try_to_build="yes"
AC_MSG_WARN([Adding -DFREEBSD to set extra sin_len field in sockaddr.])
;;
# TODO add Mac OS X support for SCTP NKE. Adjustments should look like *bsd*...
*)
AC_MSG_WARN([Only build sctp BTL on Linux and BSD variants])
;;
esac
AS_IF([test "$with_sctp" != "no" -a "$ompi_sctp_try_to_build" = "yes"],
[AS_IF([test ! -z "$with_sctp" -a "$with_sctp" != "yes"],
[ompi_check_sctp_dir="$with_sctp"])
AS_IF([test ! -z "$with_sctp_libdir" -a "$with_sctp_libdir" != "yes"],
[ompi_check_sctp_libdir="$with_sctp_libdir"])
# TODO how should this be structured if some OSes have the SCTP API calls in
# libc and some in libsctp?  For now, assume libsctp and have the user make a
# softlink to libc if libsctp does not exist...
OMPI_CHECK_PACKAGE([$1],
[netinet/sctp.h],
[sctp],
[sctp_recvmsg],
[],
[$ompi_check_sctp_dir],
[$ompi_check_sctp_libdir],
[ompi_check_sctp_happy="yes"],
[ompi_check_sctp_happy="no"])
],
[ompi_check_sctp_happy="no"])
AS_IF([test "$ompi_check_sctp_happy" = "yes"],
[$2],
[AS_IF([test ! -z "$with_sctp" -a "$with_sctp" != "no"],
[AC_MSG_ERROR([SCTP support requested but not found. Aborting])])
$3])
])
# MCA_btl_sctp_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
# Component configuration entry point for the sctp BTL.  Runs
# OMPI_CHECK_SCTP with the btl_sctp prefix; on success it copies the
# resulting LDFLAGS, LIBS, CPPFLAGS and CFLAGS into the matching
# btl_sctp_WRAPPER_EXTRA_* variables and runs action-if-found, otherwise it
# runs action-if-not-found.  The four btl_sctp_* flag variables are
# AC_SUBSTed in either case so that Makefile.am can reference them.
AC_DEFUN([MCA_btl_sctp_CONFIG],[
OMPI_CHECK_SCTP([btl_sctp],
[btl_sctp_happy="yes"],
[btl_sctp_happy="no"])
AS_IF([test "$btl_sctp_happy" = "yes"],
[btl_sctp_WRAPPER_EXTRA_LDFLAGS="$btl_sctp_LDFLAGS"
btl_sctp_WRAPPER_EXTRA_LIBS="$btl_sctp_LIBS"
btl_sctp_WRAPPER_EXTRA_CPPFLAGS="$btl_sctp_CPPFLAGS"
btl_sctp_WRAPPER_EXTRA_CFLAGS="$btl_sctp_CFLAGS"
$1],
[$2])
# substitute in the things needed to build sctp
AC_SUBST([btl_sctp_CFLAGS])
AC_SUBST([btl_sctp_CPPFLAGS])
AC_SUBST([btl_sctp_LDFLAGS])
AC_SUBST([btl_sctp_LIBS])
])dnl

24
ompi/mca/btl/sctp/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,24 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2007 Los Alamos National Security, LLC. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module.
# NOTE(review): presumably the list of files that configure must generate
# for this component - confirm against the top-level build system.
PARAM_CONFIG_FILES="Makefile"

98
ompi/mca/btl/sctp/sctp_writev.c Обычный файл
Просмотреть файл

@ -0,0 +1,98 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/* SCTP kernel reference Implementation: User API extensions.
*
* sendmsg.c
*
* Distributed under the terms of the LGPL v2.1 as described in
* http://www.gnu.org/copyleft/lesser.txt
*
* This file is part of the user library that offers support for the
* SCTP kernel reference Implementation. The main purpose of this
* code is to provide the SCTP Socket API mappings for user
* application to interface with the SCTP in kernel.
*
* This implementation is based on the Socket API Extensions for SCTP
* defined in <draft-ietf-tsvwg-sctpsocket-10.txt>
*
* Copyright (c) 2003 Intel Corp.
*
* Written or modified by:
* Ardelle Fan <ardelle.fan@intel.com>
*/
/* Small modifications made by Karol Mroz (kmroz). */
/* #include <string.h>
* #include <sys/socket.h>
* #include <netinet/sctp.h>
*/
#include "sctp_writev.h"
/* This library function assists the user with the advanced features
 * of SCTP.  This is a new SCTP API described in the section 8.7 of the
 * Sockets API Extensions for SCTP. This is implemented using the
 * sendmsg() interface.
 *
 * kmroz: Modification to this was trivial.  const char* was replaced with const
 * struct iovec* so that the caller's iovec is handed to sendmsg() directly
 * instead of being rebuilt from a flat buffer.
 *
 * Parameters:
 *   s           socket descriptor to send on
 *   vector      iovec array; only vector[0] is transmitted (see below)
 *   len         number of entries in vector; currently NOT used
 *   to, tolen   destination address and its length
 *   ppid, flags, stream_no, timetolive, context
 *               copied into the SCTP_SNDRCV ancillary data of the message
 *
 * Returns whatever sendmsg() returns: the number of bytes sent, or -1 with
 * errno set.
 *
 * NOTE(review): msg_iovlen is fixed at 1, so a call sends at most the first
 * iovec no matter what `len` says.  That matches the original behaviour and
 * is kept on purpose: sending several iovecs in one call would change the
 * SCTP message boundaries the receiver sees.  Confirm with the callers before
 * switching msg_iovlen to `len`.
 */
int
sctp_writev(int s, /* const */ struct iovec *vector, size_t len, struct sockaddr *to,
            socklen_t tolen, uint32_t ppid, uint32_t flags,
            uint16_t stream_no, uint32_t timetolive, uint32_t context)
{
    struct msghdr outmsg;
    char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
    struct cmsghdr *cmsg;
    struct sctp_sndrcvinfo *sinfo;

    (void)len;  /* intentionally unused, see the note above */

    /* Zero everything handed to the kernel so that platform-specific or
     * padding members of msghdr / the control buffer are never
     * indeterminate. */
    memset(&outmsg, 0, sizeof(outmsg));
    memset(outcmsg, 0, sizeof(outcmsg));

    outmsg.msg_name = to;
    outmsg.msg_namelen = tolen;
    outmsg.msg_iov = vector;
    outmsg.msg_iovlen = 1;

    /* msg_controllen must cover the buffer before CMSG_FIRSTHDR() is used. */
    outmsg.msg_control = outcmsg;
    outmsg.msg_controllen = sizeof(outcmsg);
    outmsg.msg_flags = 0;

    cmsg = CMSG_FIRSTHDR(&outmsg);
    cmsg->cmsg_level = IPPROTO_SCTP;
    cmsg->cmsg_type = SCTP_SNDRCV;
    cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));

    /* Shrink to the exact length of the single control message. */
    outmsg.msg_controllen = cmsg->cmsg_len;

    sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
    memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
    sinfo->sinfo_ppid = ppid;
    sinfo->sinfo_flags = flags;
    sinfo->sinfo_stream = stream_no;
    sinfo->sinfo_timetolive = timetolive;
    sinfo->sinfo_context = context;

    return sendmsg(s, &outmsg, 0);
}

35
ompi/mca/btl/sctp/sctp_writev.h Обычный файл
Просмотреть файл

@ -0,0 +1,35 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_SCTP_WRITEV_H
#define MCA_BTL_SCTP_WRITEV_H
#include <string.h>
#include <sys/socket.h>
#include <netinet/sctp.h>
/*
 * sctp_writev.h
 * -------------
 * Declares sctp_writev(), an sctp_sendmsg()-style helper that takes its
 * payload as a struct iovec instead of a flat buffer.
 *
 * The message is sent on socket `s` to `to`/`tolen` with sendmsg(); ppid,
 * flags, stream_no, timetolive and context are passed to the kernel as
 * SCTP_SNDRCV ancillary data.  The return value is that of sendmsg().
 *
 * NOTE(review): the implementation sets msg_iovlen to 1, so only the first
 * entry of `vector` is sent and `len` is ignored - confirm with callers.
 */
int sctp_writev(int s, /* const */ struct iovec *vector, size_t len, struct sockaddr *to,
socklen_t tolen, uint32_t ppid, uint32_t flags,
uint16_t stream_no, uint32_t timetolive, uint32_t context);
#endif