1
1
http://www.open-mpi.org/community/lists/devel/2011/01/8894.php
    http://blogs.cisco.com/performance/building-3rd-party-open-mpi-components/

Contribute a sample of how to build MCA components outside of the Open
MPI source tree.

This commit was SVN r24346.
Этот коммит содержится в:
Jeff Squyres 2011-02-02 17:11:33 +00:00
родитель 0d08f636b0
Коммит 2755a9f261
19 изменённых файлов: 4866 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,46 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2009 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2009-2011 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
EXTRA_DIST = autogen.sh
dist_pkgdata_DATA = help-mpi-btl-tcp2.txt
sources = \
btl_tcp2.c \
btl_tcp2.h \
btl_tcp2_addr.h \
btl_tcp2_component.c \
btl_tcp2_endpoint.c \
btl_tcp2_endpoint.h \
btl_tcp2_frag.c \
btl_tcp2_frag.h \
btl_tcp2_hdr.h \
btl_tcp2_proc.c \
btl_tcp2_proc.h \
btl_tcp2_ft.c \
btl_tcp2_ft.h
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = mca_btl_tcp2.la
mca_btl_tcp2_la_SOURCES = $(sources)
mca_btl_tcp2_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -0,0 +1,161 @@
2 Feb 2011
Description
===========
This sample "tcp2" BTL component is a simple example of how to build
an Open MPI MCA component from outside of the Open MPI source tree.
This is a valuable technique for 3rd parties who want to provide their
own components for Open MPI, but do not want to be in the mainstream
distribution (i.e., their code is not part of the main Open MPI code
base).
NOTE: We do recommend that 3rd party developers investigate using a
DVCS such as Mercurial or Git to keep up with Open MPI
development. Using a DVCS allows you to host your component in
your own copy of the Open MPI source tree, and yet still keep up
with development changes, stable releases, etc.
Previous colloquial knowledge held that building a component from
outside of the Open MPI source tree required configuring Open MPI
--with-devel-headers, and then building and installing it. This
configure switch installs all of OMPI's internal .h files under
$prefix/include/openmpi, and therefore allows 3rd party code to be
compiled outside of the Open MPI tree.
This method definitely works, but is annoying:
* You have to ask users to use this special configure switch.
* Not all users install from source; many get binary packages (e.g.,
RPMs).
This example package shows two ways to build an Open MPI MCA component
from outside the Open MPI source tree:
1. Using the above --with-devel-headers technique
2. Compiling against the Open MPI source tree itself (vs. the
installation tree)
The user still has to have a source tree, but at least they don't have
to be required to use --with-devel-headers (which most users don't) --
they can likely build off the source tree that they already used.
Example project contents
========================
The "tcp2" component is a direct copy of the TCP BTL -- it has just
been renamed so that it can be built separately and installed
alongside the real TCP BTL component.
Most of the mojo for both methods is handled in the example
components' configure.ac, but the same techniques are applicable
outside of the GNU Auto toolchain.
This sample "tcp2" component has an autogen.sh script that requires
the normal Autoconf, Automake, and Libtool. It also adds the
following two configure switches:
--with-openmpi-install=DIR
If provided, DIR is an Open MPI installation tree that was
installed --with-devel-headers.
This switch uses the installed mpicc --showme:<foo> functionality
to extract the relevant CPPFLAGS, LDFLAGS, and LIBS.
--with-openmpi-source=DIR
If provided, DIR is the source of a configured and built Open MPI
source tree (corresponding to the version expected by the example
component). The source tree is not required to have been
configured --with-devel-headers.
This switch uses the source tree's config.status script to extract
the relevant CPPFLAGS and CFLAGS.
Either one of these two switches must be provided, or appropriate
CPPFLAGS, CFLAGS, LDFLAGS, and/or LIBS must be provided such that
valid Open MPI header and library files can be found and compiled /
linked against, respectively.
Example use
===========
First, download, build, and install Open MPI:
-----
$ cd $HOME
$ wget \
http://www.open-mpi.org/software/ompi/v1.5/downloads/openmpi-1.5.1.tar.bz2
[lots of output]
$ tar jxf openmpi-1.5.1.tar.bz2
$ cd openmpi-1.5.1
$ ./configure --prefix=/opt/openmpi ...
[lots of output]
$ make -j 4 install
[lots of output]
$ /opt/openmpi/bin/ompi_info | grep btl
MCA btl: self (MCA v2.0, API v2.0, Component v1.5.2)
MCA btl: sm (MCA v2.0, API v2.0, Component v1.5.2)
MCA btl: tcp (MCA v2.0, API v2.0, Component v1.5.2)
$
-----
Notice the installed BTLs from ompi_info.
Now cd into the v1.5 directory in this example project and build it,
pointing it to the source directory of the Open MPI that you just
built. Note that we use the same --prefix as when installing Open MPI
(so that the built component will be installed into the Right place):
-----
$ cd /path/to/this/sample/component/v1.5
$ ./autogen.sh
$ ./configure --prefix=/opt/openmpi --with-openmpi-source=$HOME/openmpi-1.5.1
[lots of output]
$ make -j 4 install
[lots of output]
$ /opt/openmpi/bin/ompi_info | grep btl
MCA btl: self (MCA v2.0, API v2.0, Component v1.5.2)
MCA btl: sm (MCA v2.0, API v2.0, Component v1.5.2)
MCA btl: tcp (MCA v2.0, API v2.0, Component v1.5.2)
MCA btl: tcp2 (MCA v2.0, API v2.0, Component v1.5.2)
$
-----
Notice that the "tcp2" BTL is now installed.
Random notes
============
The component in this project is just an example; I whipped it up in
the span of several hours. Your component may be a bit more complex
than this or have slightly different requirements. So you may need to
tweak the configury or build system in each of the components to fit
what you need.
Changes required to the component to make it build in a standalone
mode:
1. Write your own configure script. This component is just a sample.
You basically need to build against an OMPI install that was
installed --with-devel-headers or a built OMPI source tree. See
./configure --help for details.
2. I also provided a bogus btl_tcp2_config.h (generated by configure).
This file is not included anywhere, but it does provide protection
against re-defined PACKAGE_* macros when running configure, which
is quite annoying.
3. Modify Makefile.am to only build DSOs. I.e., you can optionally
take the static option out since the component can *only* build in
DSO mode when building standalone. That being said, it doesn't
hurt to leave the static builds in -- this would (hypothetically)
allow the component to be built both in-tree and out-of-tree.
Ping the Open MPI devel list if you have questions about this
project.
Enjoy.
- Jeff Squyres

2
contrib/build-mca-comps-outside-of-tree/autogen.sh Исполняемый файл
Просмотреть файл

@ -0,0 +1,2 @@
:
autoreconf -ivf

Просмотреть файл

@ -0,0 +1,72 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_TCP_ADDR_H
#define MCA_BTL_TCP_ADDR_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
/**
* Structure used to publish TCP connection information to peers.
*/
struct mca_btl_tcp2_addr_t {
/* the following information is exchanged between different
machines (read: byte order), so use network byte order
for everything and don't add padding
*/
#if OPAL_WANT_IPV6
struct in6_addr addr_inet; /**< IPv4/IPv6 listen address > */
#else
/* Bug, FIXME: needs testing */
struct my_in6_addr {
union {
uint32_t u6_addr32[4];
struct _my_in6_addr {
struct in_addr _addr_inet;
uint32_t _pad[3];
} _addr__inet;
} _union_inet;
} addr_inet;
#endif
in_port_t addr_port; /**< listen port */
uint16_t addr_ifkindex; /**< remote interface index assigned with
this address */
unsigned short addr_inuse; /**< local meaning only */
uint8_t addr_family; /**< AF_INET or AF_INET6 */
};
typedef struct mca_btl_tcp2_addr_t mca_btl_tcp2_addr_t;
#define MCA_BTL_TCP_AF_INET 0
#if OPAL_WANT_IPV6
# define MCA_BTL_TCP_AF_INET6 1
#endif
#endif

Просмотреть файл

@ -0,0 +1,487 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "opal/class/opal_bitmap.h"
#include "ompi/mca/btl/btl.h"
#include "btl_tcp2.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_proc.h"
#include "btl_tcp2_endpoint.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/proc/proc.h"
mca_btl_tcp2_module_t mca_btl_tcp2_module = {
{
&mca_btl_tcp2_component.super,
0, /* max size of first fragment */
0, /* min send fragment size */
0, /* max send fragment size */
0, /* btl_rdma_pipeline_send_length */
0, /* btl_rdma_pipeline_frag_size */
0, /* btl_min_rdma_pipeline_size */
0, /* exclusivity */
0, /* latency */
0, /* bandwidth */
0, /* flags */
mca_btl_tcp2_add_procs,
mca_btl_tcp2_del_procs,
NULL,
mca_btl_tcp2_finalize,
mca_btl_tcp2_alloc,
mca_btl_tcp2_free,
mca_btl_tcp2_prepare_src,
mca_btl_tcp2_prepare_dst,
mca_btl_tcp2_send,
NULL, /* send immediate */
mca_btl_tcp2_put,
NULL, /* get */
mca_btl_base_dump,
NULL, /* mpool */
NULL, /* register error */
mca_btl_tcp2_ft_event
}
};
/**
*
*/
int mca_btl_tcp2_add_procs( struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **ompi_procs,
struct mca_btl_base_endpoint_t** peers,
opal_bitmap_t* reachable )
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*)btl;
ompi_proc_t* my_proc; /* pointer to caller's proc structure */
int i, rc;
/* get pointer to my proc structure */
my_proc = ompi_proc_local();
if( NULL == my_proc ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
for(i = 0; i < (int) nprocs; i++) {
struct ompi_proc_t* ompi_proc = ompi_procs[i];
mca_btl_tcp2_proc_t* tcp_proc;
mca_btl_base_endpoint_t* tcp_endpoint;
/* Do not create loopback TCP connections */
if( my_proc == ompi_proc ) {
continue;
}
if(NULL == (tcp_proc = mca_btl_tcp2_proc_create(ompi_proc))) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/*
* Check to make sure that the peer has at least as many interface
* addresses exported as we are trying to use. If not, then
* don't bind this BTL instance to the proc.
*/
OPAL_THREAD_LOCK(&tcp_proc->proc_lock);
/* The btl_proc datastructure is shared by all TCP BTL
* instances that are trying to reach this destination.
* Cache the peer instance on the btl_proc.
*/
tcp_endpoint = OBJ_NEW(mca_btl_tcp2_endpoint_t);
if(NULL == tcp_endpoint) {
OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
tcp_endpoint->endpoint_btl = tcp_btl;
rc = mca_btl_tcp2_proc_insert(tcp_proc, tcp_endpoint);
if(rc != OMPI_SUCCESS) {
OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
OBJ_RELEASE(tcp_endpoint);
continue;
}
opal_bitmap_set_bit(reachable, i);
OPAL_THREAD_UNLOCK(&tcp_proc->proc_lock);
peers[i] = tcp_endpoint;
opal_list_append(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
/* we increase the count of MPI users of the event library
once per peer, so that we are used until we aren't
connected to a peer */
opal_progress_event_users_increment();
}
return OMPI_SUCCESS;
}
int mca_btl_tcp2_del_procs(struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t ** endpoints)
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*)btl;
size_t i;
for(i=0; i<nprocs; i++) {
mca_btl_tcp2_endpoint_t* tcp_endpoint = endpoints[i];
if(tcp_endpoint->endpoint_proc != mca_btl_tcp2_proc_local()) {
opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
OBJ_RELEASE(tcp_endpoint);
}
opal_progress_event_users_decrement();
}
return OMPI_SUCCESS;
}
/**
* Allocate a segment.
*
* @param btl (IN) BTL module
* @param size (IN) Request segment size.
*/
mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
uint8_t order,
size_t size,
uint32_t flags)
{
mca_btl_tcp2_frag_t* frag = NULL;
int rc;
if(size <= btl->btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag, rc);
} else if (size <= btl->btl_max_send_size) {
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag, rc);
}
if( OPAL_UNLIKELY(NULL == frag) ) {
return NULL;
}
frag->segments[0].seg_len = size;
frag->segments[0].seg_addr.pval = frag+1;
frag->base.des_src = frag->segments;
frag->base.des_src_cnt = 1;
frag->base.des_dst = NULL;
frag->base.des_dst_cnt = 0;
frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER;
frag->btl = (mca_btl_tcp2_module_t*)btl;
return (mca_btl_base_descriptor_t*)frag;
}
/**
* Return a segment
*/
int mca_btl_tcp2_free(
struct mca_btl_base_module_t* btl,
mca_btl_base_descriptor_t* des)
{
mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)des;
MCA_BTL_TCP_FRAG_RETURN(frag);
return OMPI_SUCCESS;
}
/**
* Pack data and return a descriptor that can be
* used for send/put.
*
* @param btl (IN) BTL module
* @param peer (IN) BTL peer addressing
*/
mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_src(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_mpool_base_registration_t* registration,
struct opal_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size,
uint32_t flags)
{
mca_btl_tcp2_frag_t* frag;
struct iovec iov;
uint32_t iov_count = 1;
size_t max_data = *size;
int rc;
if( OPAL_UNLIKELY(max_data > UINT32_MAX) ) { /* limit the size to what we support */
max_data = (size_t)UINT32_MAX;
}
/*
* if we aren't pinning the data and the requested size is less
* than the eager limit pack into a fragment from the eager pool
*/
if (max_data+reserve <= btl->btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag, rc);
} else {
/*
* otherwise pack as much data as we can into a fragment
* that is the max send size.
*/
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag, rc);
}
if( OPAL_UNLIKELY(NULL == frag) ) {
return NULL;
}
frag->segments[0].seg_addr.pval = (frag + 1);
frag->segments[0].seg_len = reserve;
frag->base.des_src_cnt = 1;
if(opal_convertor_need_buffers(convertor)) {
if (max_data + reserve > frag->size) {
max_data = frag->size - reserve;
}
iov.iov_len = max_data;
iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)(frag->segments[0].seg_addr.pval)) + reserve);
rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
if( OPAL_UNLIKELY(rc < 0) ) {
mca_btl_tcp2_free(btl, &frag->base);
return NULL;
}
frag->segments[0].seg_len += max_data;
} else {
iov.iov_len = max_data;
iov.iov_base = NULL;
rc = opal_convertor_pack(convertor, &iov, &iov_count, &max_data );
if( OPAL_UNLIKELY(rc < 0) ) {
mca_btl_tcp2_free(btl, &frag->base);
return NULL;
}
frag->segments[1].seg_addr.pval = iov.iov_base;
frag->segments[1].seg_len = max_data;
frag->base.des_src_cnt = 2;
}
frag->base.des_src = frag->segments;
frag->base.des_dst = NULL;
frag->base.des_dst_cnt = 0;
frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER;
*size = max_data;
return &frag->base;
}
/**
* Prepare a descriptor for send/rdma using the supplied
* convertor. If the convertor references data that is contigous,
* the descriptor may simply point to the user buffer. Otherwise,
* this routine is responsible for allocating buffer space and
* packing if required.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL peer addressing
* @param convertor (IN) Data type convertor
* @param reserve (IN) Additional bytes requested by upper layer to precede user data
* @param size (IN/OUT) Number of bytes to prepare (IN), number of bytes actually prepared (OUT)
*/
mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_dst(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_mpool_base_registration_t* registration,
struct opal_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size,
uint32_t flags)
{
mca_btl_tcp2_frag_t* frag;
int rc;
if( OPAL_UNLIKELY((*size) > UINT32_MAX) ) { /* limit the size to what we support */
*size = (size_t)UINT32_MAX;
}
MCA_BTL_TCP_FRAG_ALLOC_USER(frag, rc);
if( OPAL_UNLIKELY(NULL == frag) ) {
return NULL;
}
frag->segments->seg_len = *size;
opal_convertor_get_current_pointer( convertor, (void**)&(frag->segments->seg_addr.pval) );
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->base.des_dst = frag->segments;
frag->base.des_dst_cnt = 1;
frag->base.des_flags = flags;
frag->base.order = MCA_BTL_NO_ORDER;
return &frag->base;
}
/**
* Initiate an asynchronous send.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transfered
* @param tag (IN) The tag value used to notify the peer.
*/
int mca_btl_tcp2_send( struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
mca_btl_base_tag_t tag )
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
int i;
frag->btl = tcp_btl;
frag->endpoint = endpoint;
frag->rc = 0;
frag->iov_idx = 0;
frag->iov_cnt = 1;
frag->iov_ptr = frag->iov;
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
frag->iov[0].iov_len = sizeof(frag->hdr);
frag->hdr.size = 0;
for( i = 0; i < (int)frag->base.des_src_cnt; i++) {
frag->hdr.size += frag->segments[i].seg_len;
frag->iov[i+1].iov_len = frag->segments[i].seg_len;
frag->iov[i+1].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
frag->iov_cnt++;
}
frag->hdr.base.tag = tag;
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_SEND;
frag->hdr.count = 0;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return mca_btl_tcp2_endpoint_send(endpoint,frag);
}
/**
* Initiate an asynchronous put.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
int mca_btl_tcp2_put( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor )
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
int i;
frag->btl = tcp_btl;
frag->endpoint = endpoint;
frag->rc = 0;
frag->iov_idx = 0;
frag->hdr.size = 0;
frag->iov_cnt = 2;
frag->iov_ptr = frag->iov;
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
frag->iov[0].iov_len = sizeof(frag->hdr);
frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_dst;
frag->iov[1].iov_len = frag->base.des_dst_cnt * sizeof(mca_btl_base_segment_t);
for( i = 0; i < (int)frag->base.des_src_cnt; i++ ) {
frag->hdr.size += frag->segments[i].seg_len;
frag->iov[i+2].iov_len = frag->segments[i].seg_len;
frag->iov[i+2].iov_base = (IOVBASE_TYPE*)frag->segments[i].seg_addr.pval;
frag->iov_cnt++;
}
frag->hdr.base.tag = MCA_BTL_TAG_BTL;
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
frag->hdr.count = frag->base.des_dst_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((i = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : i);
}
/**
* Initiate an asynchronous get.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*
*/
int mca_btl_tcp2_get(
mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint,
mca_btl_base_descriptor_t* descriptor)
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
mca_btl_tcp2_frag_t* frag = (mca_btl_tcp2_frag_t*)descriptor;
int rc;
frag->btl = tcp_btl;
frag->endpoint = endpoint;
frag->rc = 0;
frag->iov_idx = 0;
frag->hdr.size = 0;
frag->iov_cnt = 2;
frag->iov_ptr = frag->iov;
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr;
frag->iov[0].iov_len = sizeof(frag->hdr);
frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->base.des_src;
frag->iov[1].iov_len = frag->base.des_src_cnt * sizeof(mca_btl_base_segment_t);
frag->hdr.base.tag = MCA_BTL_TAG_BTL;
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_GET;
frag->hdr.count = frag->base.des_src_cnt;
if (endpoint->endpoint_nbo) MCA_BTL_TCP_HDR_HTON(frag->hdr);
return ((rc = mca_btl_tcp2_endpoint_send(endpoint,frag)) >= 0 ? OMPI_SUCCESS : rc);
}
/*
* Cleanup/release module resources.
*/
int mca_btl_tcp2_finalize(struct mca_btl_base_module_t* btl)
{
mca_btl_tcp2_module_t* tcp_btl = (mca_btl_tcp2_module_t*) btl;
opal_list_item_t* item;
for( item = opal_list_remove_first(&tcp_btl->tcp_endpoints);
item != NULL;
item = opal_list_remove_first(&tcp_btl->tcp_endpoints)) {
mca_btl_tcp2_endpoint_t *endpoint = (mca_btl_tcp2_endpoint_t*)item;
OBJ_RELEASE(endpoint);
opal_progress_event_users_decrement();
}
free(tcp_btl);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,337 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2009 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_TCP_H
#define MCA_BTL_TCP_H
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
/* Open MPI includes */
#include "opal/mca/event/event.h"
#include "ompi/class/ompi_free_list.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/mca/btl/btl.h"
#include "opal/class/opal_hash_table.h"
#define MCA_BTL_TCP_STATISTICS 0
BEGIN_C_DECLS
/**
* TCP BTL component.
*/
struct mca_btl_tcp2_component_t {
mca_btl_base_component_2_0_0_t super; /**< base BTL component */
uint32_t tcp_addr_count; /**< total number of addresses */
uint32_t tcp_num_btls; /**< number of hcas available to the TCP component */
uint32_t tcp_num_links; /**< number of logical links per physical device */
struct mca_btl_tcp2_module_t **tcp_btls; /**< array of available BTL modules */
struct mca_btl_tcp2_proc_t* tcp_local; /**< local proc struct */
int tcp_free_list_num; /**< initial size of free lists */
int tcp_free_list_max; /**< maximum size of free lists */
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
int tcp_endpoint_cache; /**< amount of cache on each endpoint */
opal_hash_table_t tcp_procs; /**< hash table of tcp proc structures */
opal_list_t tcp_events; /**< list of pending tcp events */
opal_mutex_t tcp_lock; /**< lock for accessing module state */
opal_event_t tcp_recv_event; /**< recv event for IPv4 listen socket */
int tcp_listen_sd; /**< IPv4 listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< IPv4 listen port */
int32_t tcp_port_min; /**< IPv4 minimum port */
int32_t tcp_port_range; /**< IPv4 port range */
#if OPAL_WANT_IPV6
opal_event_t tcp6_recv_event; /**< recv event for IPv6 listen socket */
int tcp6_listen_sd; /**< IPv6 listen socket for incoming connection requests */
unsigned short tcp6_listen_port; /**< IPv6 listen port */
int32_t tcp6_port_min; /**< IPv4 minimum port */
int32_t tcp6_port_range; /**< IPv4 port range */
#endif
/* Port range restriction */
char* tcp_if_include; /**< comma seperated list of interface to include */
char* tcp_if_exclude; /**< comma seperated list of interface to exclude */
int tcp_sndbuf; /**< socket sndbuf size */
int tcp_rcvbuf; /**< socket rcvbuf size */
int tcp_disable_family; /**< disabled AF_family */
/* free list of fragment descriptors */
ompi_free_list_t tcp_frag_eager;
ompi_free_list_t tcp_frag_max;
ompi_free_list_t tcp_frag_user;
/* Do we want to use TCP_NODELAY? */
int tcp_use_nodelay;
};
typedef struct mca_btl_tcp2_component_t mca_btl_tcp2_component_t;
OMPI_MODULE_DECLSPEC extern mca_btl_tcp2_component_t mca_btl_tcp2_component;
/**
* BTL Module Interface
*/
struct mca_btl_tcp2_module_t {
mca_btl_base_module_t super; /**< base BTL interface */
uint16_t tcp_ifkindex; /** <BTL kernel interface index */
#if 0
int tcp_ifindex; /**< PTL interface index */
#endif
struct sockaddr_storage tcp_ifaddr; /**< PTL interface address */
uint32_t tcp_ifmask; /**< PTL interface netmask */
opal_list_t tcp_endpoints;
#if MCA_BTL_TCP_STATISTICS
size_t tcp_bytes_sent;
size_t tcp_bytes_recv;
size_t tcp_send_handler;
#endif
};
typedef struct mca_btl_tcp2_module_t mca_btl_tcp2_module_t;
extern mca_btl_tcp2_module_t mca_btl_tcp2_module;
#if defined(__WINDOWS__)
#define CLOSE_THE_SOCKET(socket) closesocket(socket)
#else
#define CLOSE_THE_SOCKET(socket) close(socket)
#endif /* defined(__WINDOWS__) */
/**
* Register TCP component parameters with the MCA framework
*/
extern int mca_btl_tcp2_component_open(void);
/**
* Any final cleanup before being unloaded.
*/
extern int mca_btl_tcp2_component_close(void);
/**
* TCP component initialization.
*
* @param num_btl_modules (OUT) Number of BTLs returned in BTL array.
* @param allow_multi_user_threads (OUT) Flag indicating wether BTL supports user threads (TRUE)
* @param have_hidden_threads (OUT) Flag indicating wether BTL uses threads (TRUE)
*/
extern mca_btl_base_module_t** mca_btl_tcp2_component_init(
int *num_btl_modules,
bool allow_multi_user_threads,
bool have_hidden_threads
);
/**
* TCP component control.
*/
int mca_btl_tcp2_component_control(
int param,
void* value,
size_t size
);
/**
* TCP component progress.
*/
extern int mca_btl_tcp2_component_progress(void);
/**
* Cleanup any resources held by the BTL.
*
* @param btl BTL instance.
* @return OMPI_SUCCESS or error status on failure.
*/
extern int mca_btl_tcp2_finalize(
struct mca_btl_base_module_t* btl
);
/**
* PML->BTL notification of change in the process list.
*
* @param btl (IN)
* @param nprocs (IN) Number of processes
* @param procs (IN) Set of processes
* @param peers (OUT) Set of (optional) peer addressing info.
* @param peers (IN/OUT) Set of processes that are reachable via this BTL.
* @return OMPI_SUCCESS or error status on failure.
*
*/
extern int mca_btl_tcp2_add_procs(
struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t** peers,
opal_bitmap_t* reachable
);
/**
* PML->BTL notification of change in the process list.
*
* @param btl (IN) BTL instance
* @param nproc (IN) Number of processes.
* @param procs (IN) Set of processes.
* @param peers (IN) Set of peer data structures.
* @return Status indicating if cleanup was successful
*
*/
extern int mca_btl_tcp2_del_procs(
struct mca_btl_base_module_t* btl,
size_t nprocs,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t** peers
);
/**
* Initiate an asynchronous send.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transfered
* @param tag (IN) The tag value used to notify the peer.
*/
extern int mca_btl_tcp2_send(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* descriptor,
mca_btl_base_tag_t tag
);
/**
* Initiate an asynchronous put.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
extern int mca_btl_tcp2_put(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* decriptor
);
/**
* Initiate an asynchronous get.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL addressing information
* @param descriptor (IN) Description of the data to be transferred
*/
extern int mca_btl_tcp2_get(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* btl_peer,
struct mca_btl_base_descriptor_t* decriptor
);
/**
* Allocate a descriptor with a segment of the requested size.
* Note that the BTL layer may choose to return a smaller size
* if it cannot support the request.
*
* @param btl (IN) BTL module
* @param size (IN) Request segment size.
*/
extern mca_btl_base_descriptor_t* mca_btl_tcp2_alloc(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
uint8_t order,
size_t size,
uint32_t flags);
/**
* Return a segment allocated by this BTL.
*
* @param btl (IN) BTL module
* @param descriptor (IN) Allocated descriptor.
*/
extern int mca_btl_tcp2_free(
struct mca_btl_base_module_t* btl,
mca_btl_base_descriptor_t* des);
/**
* Prepare a descriptor for send/rdma using the supplied
* convertor. If the convertor references data that is contigous,
* the descriptor may simply point to the user buffer. Otherwise,
* this routine is responsible for allocating buffer space and
* packing if required.
*
* @param btl (IN) BTL module
* @param endpoint (IN) BTL peer addressing
* @param convertor (IN) Data type convertor
* @param reserve (IN) Additional bytes requested by upper layer to precede user data
* @param size (IN/OUT) Number of bytes to prepare (IN), number of bytes actually prepared (OUT)
*/
mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_src(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
struct mca_mpool_base_registration_t*,
struct opal_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size,
uint32_t flags
);
extern mca_btl_base_descriptor_t* mca_btl_tcp2_prepare_dst(
struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
struct mca_mpool_base_registration_t*,
struct opal_convertor_t* convertor,
uint8_t order,
size_t reserve,
size_t* size,
uint32_t flags);
/**
* Fault Tolerance Event Notification Function
* @param state Checkpoint Stae
* @return OMPI_SUCCESS or failure status
*/
int mca_btl_tcp2_ft_event(int state);
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,72 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_TCP_ADDR_H
#define MCA_BTL_TCP_ADDR_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
/**
* Structure used to publish TCP connection information to peers.
*/
struct mca_btl_tcp2_addr_t {
/* the following information is exchanged between different
machines (read: byte order), so use network byte order
for everything and don't add padding
*/
#if OPAL_WANT_IPV6
struct in6_addr addr_inet; /**< IPv4/IPv6 listen address > */
#else
/* Bug, FIXME: needs testing */
struct my_in6_addr {
union {
uint32_t u6_addr32[4];
struct _my_in6_addr {
struct in_addr _addr_inet;
uint32_t _pad[3];
} _addr__inet;
} _union_inet;
} addr_inet;
#endif
in_port_t addr_port; /**< listen port */
uint16_t addr_ifkindex; /**< remote interface index assigned with
this address */
unsigned short addr_inuse; /**< local meaning only */
uint8_t addr_family; /**< AF_INET or AF_INET6 */
};
typedef struct mca_btl_tcp2_addr_t mca_btl_tcp2_addr_t;
#define MCA_BTL_TCP_AF_INET 0
#if OPAL_WANT_IPV6
# define MCA_BTL_TCP_AF_INET6 1
#endif
#endif

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,806 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2008 Sun Microsystems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "ompi_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#ifdef HAVE_TIME_H
#include <time.h>
#endif /* HAVE_TIME_H */
#include "opal/mca/event/event.h"
#include "ompi/types.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "opal/util/net.h"
#include "btl_tcp2.h"
#include "btl_tcp2_endpoint.h"
#include "btl_tcp2_proc.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_addr.h"
/*
* Initialize state of the endpoint instance.
*
*/
static void mca_btl_tcp2_endpoint_construct(mca_btl_tcp2_endpoint_t* endpoint)
{
endpoint->endpoint_btl = NULL;
endpoint->endpoint_proc = NULL;
endpoint->endpoint_addr = NULL;
endpoint->endpoint_sd = -1;
endpoint->endpoint_send_frag = 0;
endpoint->endpoint_recv_frag = 0;
endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
endpoint->endpoint_retries = 0;
endpoint->endpoint_nbo = false;
#if MCA_BTL_TCP_ENDPOINT_CACHE
endpoint->endpoint_cache = NULL;
endpoint->endpoint_cache_pos = NULL;
endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
}
/*
* Destroy a endpoint
*
*/
static void mca_btl_tcp2_endpoint_destruct(mca_btl_tcp2_endpoint_t* endpoint)
{
mca_btl_tcp2_proc_remove(endpoint->endpoint_proc, endpoint);
mca_btl_tcp2_endpoint_close(endpoint);
OBJ_DESTRUCT(&endpoint->endpoint_frags);
OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp2_endpoint_t,
opal_list_item_t,
mca_btl_tcp2_endpoint_construct,
mca_btl_tcp2_endpoint_destruct);
static void mca_btl_tcp2_endpoint_construct(mca_btl_base_endpoint_t* btl_endpoint);
static void mca_btl_tcp2_endpoint_destruct(mca_btl_base_endpoint_t* btl_endpoint);
static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t*);
static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t*);
static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user);
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user);
/*
* Diagnostics: change this to "1" to enable the function
* mca_btl_tcp2_endpoint_dump(), below
*/
#define WANT_PEER_DUMP 0
/*
* diagnostics
*/
#if WANT_PEER_DUMP
static void mca_btl_tcp2_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
{
char src[64];
char dst[64];
int sndbuf,rcvbuf,nodelay,flags;
#if OPAL_WANT_IPV6
struct sockaddr_storage inaddr;
#else
struct sockaddr_in inaddr;
#endif
opal_socklen_t obtlen;
opal_socklen_t addrlen = sizeof(inaddr);
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
{
char *address;
address = (char *) opal_net_get_hostname((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(src, "%s", address);
}
}
#else
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
#endif
getpeername(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_WANT_IPV6
{
char *address;
address = (char *) opal_net_get_hostname ((struct sockaddr*) &inaddr);
if (NULL != address) {
sprintf(dst, "%s", address);
}
}
#else
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
#endif
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#if defined(SO_SNDBUF)
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
nodelay = 0;
#endif
BTL_VERBOSE(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags));
}
#endif
/*
* Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
*/
static inline void mca_btl_tcp2_endpoint_event_init(mca_btl_base_endpoint_t* btl_endpoint)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
btl_endpoint->endpoint_cache = (char*)malloc(mca_btl_tcp2_component.tcp_endpoint_cache);
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_recv_handler,
btl_endpoint );
/**
* The send event should be non persistent until the endpoint is
* completely connected. This means, when the event is created it
* will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags.
*/
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp2_endpoint_send_handler,
btl_endpoint);
}
/*
* Attempt to send a fragment using a given endpoint. If the endpoint is not connected,
* queue the fragment and start the connection as required.
*/
int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp2_frag_t* frag)
{
int rc = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
case MCA_BTL_TCP_CONNECT_ACK:
case MCA_BTL_TCP_CLOSED:
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
if(btl_endpoint->endpoint_state == MCA_BTL_TCP_CLOSED)
rc = mca_btl_tcp2_endpoint_start_connect(btl_endpoint);
break;
case MCA_BTL_TCP_FAILED:
rc = OMPI_ERR_UNREACH;
break;
case MCA_BTL_TCP_CONNECTED:
if (btl_endpoint->endpoint_send_frag == NULL) {
if(frag->base.des_flags & MCA_BTL_DES_FLAGS_PRIORITY &&
mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd)) {
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
if( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK ) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
}
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
return 1;
} else {
btl_endpoint->endpoint_send_frag = frag;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
}
} else {
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
}
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
return rc;
}
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the endpoints endpoint.
*/
static int mca_btl_tcp2_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(btl_endpoint->endpoint_sd, (const char *)ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("send() failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return -1;
}
continue;
}
cnt += retval;
}
return cnt;
}
/*
* Send the globally unique identifier for this process to a endpoint on
* a newly connected socket.
*/
static int mca_btl_tcp2_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
/* send process identifier to remote endpoint */
mca_btl_tcp2_proc_t* btl_proc = mca_btl_tcp2_proc_local();
orte_process_name_t guid = btl_proc->proc_ompi->proc_name;
ORTE_PROCESS_NAME_HTON(guid);
if(mca_btl_tcp2_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
sizeof(guid)) {
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
}
/*
* Check the state of this endpoint. If the incoming connection request matches
* our endpoints address, check the state of our connection:
* (1) if a connection has not been attempted, accept the connection
* (2) if a connection has not been established, and the endpoints process identifier
* is less than the local process, accept the connection
* otherwise, reject the connection and continue with the current connection
*/
bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
struct sockaddr* addr, int sd)
{
mca_btl_tcp2_proc_t* this_proc = mca_btl_tcp2_proc_local();
mca_btl_tcp2_proc_t *endpoint_proc = btl_endpoint->endpoint_proc;
int cmpval;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
if(NULL == btl_endpoint->endpoint_addr) {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&endpoint_proc->proc_ompi->proc_name,
&this_proc->proc_ompi->proc_name);
if((btl_endpoint->endpoint_sd < 0) ||
(btl_endpoint->endpoint_state != MCA_BTL_TCP_CONNECTED &&
cmpval < 0)) {
mca_btl_tcp2_endpoint_close(btl_endpoint);
btl_endpoint->endpoint_sd = sd;
if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) != OMPI_SUCCESS) {
mca_btl_tcp2_endpoint_close(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
mca_btl_tcp2_endpoint_event_init(btl_endpoint);
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
mca_btl_tcp2_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp2_endpoint_dump(btl_endpoint, "accepted");
#endif
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return true;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return false;
}
/*
* Remove any event registrations associated with the socket
* and update the endpoint state to reflect the connection has
* been closed.
*/
void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
if(btl_endpoint->endpoint_sd < 0)
return;
btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
btl_endpoint->endpoint_retries++;
opal_event_del(&btl_endpoint->endpoint_recv_event);
opal_event_del(&btl_endpoint->endpoint_send_event);
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_TCP_ENDPOINT_CACHE
free( btl_endpoint->endpoint_cache );
btl_endpoint->endpoint_cache = NULL;
btl_endpoint->endpoint_cache_pos = NULL;
btl_endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
}
/*
* Setup endpoint state to reflect that connection has been established,
* and start any pending sends. This function should be called with the
* send lock locked.
*/
static void mca_btl_tcp2_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint)
{
/* setup socket options */
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTED;
btl_endpoint->endpoint_retries = 0;
/* Create the send event in a persistent manner. */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp2_endpoint_send_handler,
btl_endpoint );
if(opal_list_get_size(&btl_endpoint->endpoint_frags) > 0) {
if(NULL == btl_endpoint->endpoint_send_frag)
btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
opal_list_remove_first(&btl_endpoint->endpoint_frags);
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
}
}
/*
* A blocking recv on a non-blocking socket. Used to receive the small amount of connection
* information that identifies the endpoints endpoint.
*/
static int mca_btl_tcp2_endpoint_recv_blocking(mca_btl_base_endpoint_t* btl_endpoint, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = recv(btl_endpoint->endpoint_sd, (char *)ptr+cnt, size-cnt, 0);
/* remote closed connection */
if(retval == 0) {
mca_btl_tcp2_endpoint_close(btl_endpoint);
return -1;
}
/* socket is non-blocking so handle errors */
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
BTL_ERROR(("recv(%d) failed: %s (%d)",
btl_endpoint->endpoint_sd, strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return -1;
}
continue;
}
cnt += retval;
}
return cnt;
}
/*
* Receive the endpoints globally unique process identification from a newly
* connected socket and verify the expected response. If so, move the
* socket to a connected state.
*/
static int mca_btl_tcp2_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
{
orte_process_name_t guid;
mca_btl_tcp2_proc_t* btl_proc = btl_endpoint->endpoint_proc;
if((mca_btl_tcp2_endpoint_recv_blocking(btl_endpoint, &guid, sizeof(orte_process_name_t))) != sizeof(orte_process_name_t)) {
return OMPI_ERR_UNREACH;
}
ORTE_PROCESS_NAME_NTOH(guid);
/* compare this to the expected values */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&btl_proc->proc_ompi->proc_name,
&guid)) {
BTL_ERROR(("received unexpected process identifier %s",
ORTE_NAME_PRINT(&guid)));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
}
void mca_btl_tcp2_set_socket_options(int sd)
{
int optval;
#if defined(TCP_NODELAY)
optval = mca_btl_tcp2_component.tcp_use_nodelay;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_SNDBUF)
if(mca_btl_tcp2_component.tcp_sndbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp2_component.tcp_sndbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_RCVBUF)
if(mca_btl_tcp2_component.tcp_rcvbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp2_component.tcp_rcvbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
}
/*
* Start a connection to the endpoint. This will likely not complete,
* as the socket is set to non-blocking, so register for event
* notification of connect completion. On connection we send
* our globally unique process identifier to the endpoint and wait for
* the endpoints response.
*/
static int mca_btl_tcp2_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
int rc,flags;
struct sockaddr_storage endpoint_addr;
/* By default consider a IPv4 connection */
uint16_t af_family = AF_INET;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
#if OPAL_WANT_IPV6
if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
af_family = AF_INET6;
addrlen = sizeof (struct sockaddr_in6);
}
#endif
btl_endpoint->endpoint_sd = socket(af_family, SOCK_STREAM, 0);
if (btl_endpoint->endpoint_sd < 0) {
btl_endpoint->endpoint_retries++;
return OMPI_ERR_UNREACH;
}
/* setup socket buffer sizes */
mca_btl_tcp2_set_socket_options(btl_endpoint->endpoint_sd);
/* setup event callbacks */
mca_btl_tcp2_endpoint_event_init(btl_endpoint);
/* setup the socket as non-blocking */
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
/* start the connect - will likely fail with EINPROGRESS */
mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
opal_output_verbose(20, mca_btl_base_output,
"btl: tcp: attempting to connect() to address %s on port %d",
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
btl_endpoint->endpoint_addr->addr_port);
if(connect(btl_endpoint->endpoint_sd, (struct sockaddr*)&endpoint_addr, addrlen) < 0) {
/* non-blocking so wait for completion */
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
return OMPI_SUCCESS;
}
{
char *address;
address = opal_net_get_hostname((struct sockaddr*) &endpoint_addr);
BTL_PEER_ERROR( btl_endpoint->endpoint_proc->proc_ompi,
( "Unable to connect to the peer %s on port %d: %s\n",
address,
btl_endpoint->endpoint_addr->addr_port, strerror(opal_socket_errno) ) );
}
mca_btl_tcp2_endpoint_close(btl_endpoint);
btl_endpoint->endpoint_retries++;
return OMPI_ERR_UNREACH;
}
/* send our globally unique process identifier to the endpoint */
if((rc = mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint)) == OMPI_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
} else {
mca_btl_tcp2_endpoint_close(btl_endpoint);
}
return rc;
}
/*
* Check the status of the connection. If the connection failed, will retry
* later. Otherwise, send this processes identifier to the endpoint on the
* newly connected socket.
*/
static void mca_btl_tcp2_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
int so_error = 0;
opal_socklen_t so_length = sizeof(so_error);
struct sockaddr_storage endpoint_addr;
mca_btl_tcp2_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
/* unregister from receiving event notifications */
opal_event_del(&btl_endpoint->endpoint_send_event);
/* check connect completion status */
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return;
}
if(so_error == EINPROGRESS || so_error == EWOULDBLOCK) {
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
return;
}
if(so_error != 0) {
BTL_ERROR(("connect() to %s failed: %s (%d)",
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
strerror(so_error), so_error));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return;
}
if(mca_btl_tcp2_endpoint_send_connect_ack(btl_endpoint) == OMPI_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
} else {
mca_btl_tcp2_endpoint_close(btl_endpoint);
}
}
/*
* A file descriptor is available/ready for recv. Check the state
* of the socket and take the appropriate action.
*/
static void mca_btl_tcp2_endpoint_recv_handler(int sd, short flags, void* user)
{
mca_btl_base_endpoint_t* btl_endpoint = (mca_btl_base_endpoint_t *)user;
/* Make sure we don't have a race between a thread that remove the
* recv event, and one event already scheduled.
*/
if( sd != btl_endpoint->endpoint_sd )
return;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_recv_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECT_ACK:
{
int rc = OMPI_ERROR;
rc = mca_btl_tcp2_endpoint_recv_connect_ack(btl_endpoint);
if( OMPI_SUCCESS == rc ) {
/* we are now connected. Start sending the data */
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
mca_btl_tcp2_endpoint_connected(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp2_endpoint_dump(btl_endpoint, "connected");
#endif
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return;
}
case MCA_BTL_TCP_CONNECTED:
{
mca_btl_tcp2_frag_t* frag;
frag = btl_endpoint->endpoint_recv_frag;
if(NULL == frag) {
int rc;
if(mca_btl_tcp2_module.super.btl_max_send_size >
mca_btl_tcp2_module.super.btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag, rc);
} else {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag, rc);
}
if(NULL == frag) {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return;
}
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
data_still_pending_on_endpoint:
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* check for completion of non-blocking recv on the current fragment */
if(mca_btl_tcp2_frag_recv(frag, btl_endpoint->endpoint_sd) == false) {
btl_endpoint->endpoint_recv_frag = frag;
} else {
btl_endpoint->endpoint_recv_frag = NULL;
if( MCA_BTL_TCP_HDR_TYPE_SEND == frag->hdr.type ) {
mca_btl_active_message_callback_t* reg;
reg = mca_btl_base_active_message_trigger + frag->hdr.base.tag;
reg->cbfunc(&frag->btl->super, frag->hdr.base.tag, &frag->base, reg->cbdata);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
/* If the cache still contain some data we can reuse the same fragment
* until we flush it completly.
*/
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
goto data_still_pending_on_endpoint;
}
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
MCA_BTL_TCP_FRAG_RETURN(frag);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
assert( 0 == btl_endpoint->endpoint_cache_length );
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
}
case MCA_BTL_TCP_CLOSED:
/* This is a thread-safety issue. As multiple threads are allowed
* to generate events (in the lib event) we endup with several
* threads executing the receive callback, when we reach the end
* of the MPI_Finalize. The first one will close the connections,
* and all others will complain.
*/
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
break;
default:
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
mca_btl_tcp2_endpoint_close(btl_endpoint);
break;
}
}
/*
* A file descriptor is available/ready for send. Check the state
* of the socket and take the appropriate action.
*/
static void mca_btl_tcp2_endpoint_send_handler(int sd, short flags, void* user)
{
mca_btl_tcp2_endpoint_t* btl_endpoint = (mca_btl_tcp2_endpoint_t *)user;
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
mca_btl_tcp2_endpoint_complete_connect(btl_endpoint);
break;
case MCA_BTL_TCP_CONNECTED:
/* complete the current send */
while (NULL != btl_endpoint->endpoint_send_frag) {
mca_btl_tcp2_frag_t* frag = btl_endpoint->endpoint_send_frag;
int btl_ownership = (frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP);
if(mca_btl_tcp2_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
break;
}
/* progress any pending sends */
btl_endpoint->endpoint_send_frag = (mca_btl_tcp2_frag_t*)
opal_list_remove_first(&btl_endpoint->endpoint_frags);
/* if required - update request status and release fragment */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
assert( frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK );
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
}
/* if nothing else to do unregister for send event notifications */
if(NULL == btl_endpoint->endpoint_send_frag) {
opal_event_del(&btl_endpoint->endpoint_send_event);
}
break;
default:
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
opal_event_del(&btl_endpoint->endpoint_send_event);
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
}

Просмотреть файл

@ -0,0 +1,83 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_TCP_ENDPOINT_H
#define MCA_BTL_TCP_ENDPOINT_H
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2.h"
BEGIN_C_DECLS
#define MCA_BTL_TCP_ENDPOINT_CACHE 1
/**
* State of TCP endpoint connection.
*/
typedef enum {
MCA_BTL_TCP_CONNECTING = 0,
MCA_BTL_TCP_CONNECT_ACK,
MCA_BTL_TCP_CLOSED,
MCA_BTL_TCP_FAILED,
MCA_BTL_TCP_CONNECTED
} mca_btl_tcp2_state_t;
/**
* An abstraction that represents a connection to a endpoint process.
* An instance of mca_btl_base_endpoint_t is associated w/ each process
* and BTL pair at startup. However, connections to the endpoint
* are established dynamically on an as-needed basis:
*/
struct mca_btl_base_endpoint_t {
opal_list_item_t super;
struct mca_btl_tcp2_module_t* endpoint_btl; /**< BTL instance that created this connection */
struct mca_btl_tcp2_proc_t* endpoint_proc; /**< proc structure corresponding to endpoint */
struct mca_btl_tcp2_addr_t* endpoint_addr; /**< address of endpoint */
int endpoint_sd; /**< socket connection to endpoint */
#if MCA_BTL_TCP_ENDPOINT_CACHE
char* endpoint_cache; /**< cache for the recv (reduce the number of recv syscall) */
char* endpoint_cache_pos; /**< current position in the cache */
size_t endpoint_cache_length; /**< length of the data in the cache */
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
struct mca_btl_tcp2_frag_t* endpoint_send_frag; /**< current send frag being processed */
struct mca_btl_tcp2_frag_t* endpoint_recv_frag; /**< current recv frag being processed */
mca_btl_tcp2_state_t endpoint_state; /**< current state of the connection */
size_t endpoint_retries; /**< number of connection retries attempted */
opal_list_t endpoint_frags; /**< list of pending frags to send */
opal_mutex_t endpoint_send_lock; /**< lock for concurrent access to endpoint state */
opal_mutex_t endpoint_recv_lock; /**< lock for concurrent access to endpoint state */
opal_event_t endpoint_send_event; /**< event for async processing of send frags */
opal_event_t endpoint_recv_event; /**< event for async processing of recv frags */
bool endpoint_nbo; /**< convert headers to network byte order? */
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t mca_btl_tcp2_endpoint_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_endpoint_t);
void mca_btl_tcp2_set_socket_options(int sd);
void mca_btl_tcp2_endpoint_close(mca_btl_base_endpoint_t*);
int mca_btl_tcp2_endpoint_send(mca_btl_base_endpoint_t*, struct mca_btl_tcp2_frag_t*);
bool mca_btl_tcp2_endpoint_accept(mca_btl_base_endpoint_t*, struct sockaddr*, int);
void mca_btl_tcp2_endpoint_shutdown(mca_btl_base_endpoint_t*);
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,285 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of \ things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will \ not conflict with other error codes that
* are returned by these functions \ under UNIX/Linux environments
*/
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include "opal/opal_socket_errno.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_endpoint.h"
static void mca_btl_tcp2_frag_common_constructor(mca_btl_tcp2_frag_t* frag)
{
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->base.des_dst = NULL;
frag->base.des_dst_cnt = 0;
}
static void mca_btl_tcp2_frag_eager_constructor(mca_btl_tcp2_frag_t* frag)
{
frag->size = mca_btl_tcp2_module.super.btl_eager_limit;
frag->my_list = &mca_btl_tcp2_component.tcp_frag_eager;
mca_btl_tcp2_frag_common_constructor(frag);
}
static void mca_btl_tcp2_frag_max_constructor(mca_btl_tcp2_frag_t* frag)
{
frag->size = mca_btl_tcp2_module.super.btl_max_send_size;
frag->my_list = &mca_btl_tcp2_component.tcp_frag_max;
mca_btl_tcp2_frag_common_constructor(frag);
}
static void mca_btl_tcp2_frag_user_constructor(mca_btl_tcp2_frag_t* frag)
{
frag->size = 0;
frag->my_list = &mca_btl_tcp2_component.tcp_frag_user;
mca_btl_tcp2_frag_common_constructor(frag);
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp2_frag_t,
mca_btl_base_descriptor_t,
NULL,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp2_frag_eager_t,
mca_btl_base_descriptor_t,
mca_btl_tcp2_frag_eager_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp2_frag_max_t,
mca_btl_base_descriptor_t,
mca_btl_tcp2_frag_max_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp2_frag_user_t,
mca_btl_base_descriptor_t,
mca_btl_tcp2_frag_user_constructor,
NULL);
bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t* frag, int sd)
{
int cnt=-1;
size_t i, num_vecs;
/* non-blocking write, but continue if interrupted */
while(cnt < 0) {
cnt = writev(sd, frag->iov_ptr, frag->iov_cnt);
if(cnt < 0) {
switch(opal_socket_errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
case EFAULT:
BTL_ERROR(("mca_btl_tcp2_frag_send: writev error (%p, %lu)\n\t%s(%lu)\n",
frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len,
strerror(opal_socket_errno), (unsigned long) frag->iov_cnt));
mca_btl_tcp2_endpoint_close(frag->endpoint);
return false;
default:
BTL_ERROR(("mca_btl_tcp2_frag_send: writev failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
mca_btl_tcp2_endpoint_close(frag->endpoint);
return false;
}
}
}
/* if the write didn't complete - update the iovec state */
num_vecs = frag->iov_cnt;
for(i=0; i<num_vecs; i++) {
if(cnt >= (int)frag->iov_ptr->iov_len) {
cnt -= frag->iov_ptr->iov_len;
frag->iov_ptr++;
frag->iov_idx++;
frag->iov_cnt--;
} else {
frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt;
break;
}
}
return (frag->iov_cnt == 0);
}
bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t* frag, int sd)
{
int cnt, dont_copy_data = 0;
size_t i, num_vecs;
mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
repeat:
num_vecs = frag->iov_cnt;
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
size_t length;
/* It's strange at the first look but cnt have to be set to the full amount of data
* available. After going to advance_iov_position we will use cnt to detect if there
* is still some data pending.
*/
cnt = length = btl_endpoint->endpoint_cache_length;
for( i = 0; i < frag->iov_cnt; i++ ) {
if( length > frag->iov_ptr[i].iov_len )
length = frag->iov_ptr[i].iov_len;
if( (0 == dont_copy_data) || (length < frag->iov_ptr[i].iov_len) ) {
memcpy( frag->iov_ptr[i].iov_base, btl_endpoint->endpoint_cache_pos, length );
} else {
frag->segments[0].seg_addr.pval = btl_endpoint->endpoint_cache_pos;
frag->iov_ptr[i].iov_base = btl_endpoint->endpoint_cache_pos;
}
btl_endpoint->endpoint_cache_pos += length;
btl_endpoint->endpoint_cache_length -= length;
length = btl_endpoint->endpoint_cache_length;
if( 0 == length ) {
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
break;
}
}
goto advance_iov_position;
}
/* What's happens if all iovecs are used by the fragment ? It still work, as we reserve one
* iovec for the caching in the fragment structure (the +1).
*/
frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache_pos;
frag->iov_ptr[num_vecs].iov_len =
mca_btl_tcp2_component.tcp_endpoint_cache - btl_endpoint->endpoint_cache_length;
num_vecs++;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* non-blocking read, but continue if interrupted */
cnt = -1;
while( cnt < 0 ) {
cnt = readv(sd, frag->iov_ptr, num_vecs);
if( 0 < cnt ) goto advance_iov_position;
if( cnt == 0 ) {
mca_btl_tcp2_endpoint_close(btl_endpoint);
return false;
}
switch(opal_socket_errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
case EFAULT:
BTL_ERROR(("mca_btl_tcp2_frag_recv: readv error (%p, %lu)\n\t%s(%lu)\n",
frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len,
strerror(opal_socket_errno), (unsigned long) frag->iov_cnt));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return false;
default:
BTL_ERROR(("mca_btl_tcp2_frag_recv: readv failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
mca_btl_tcp2_endpoint_close(btl_endpoint);
return false;
}
};
advance_iov_position:
/* if the read didn't complete - update the iovec state */
num_vecs = frag->iov_cnt;
for( i = 0; i < num_vecs; i++ ) {
if( cnt < (int)frag->iov_ptr->iov_len ) {
frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt;
cnt = 0;
break;
}
cnt -= frag->iov_ptr->iov_len;
frag->iov_idx++;
frag->iov_ptr++;
frag->iov_cnt--;
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
btl_endpoint->endpoint_cache_length = cnt;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* read header */
if(frag->iov_cnt == 0) {
if (btl_endpoint->endpoint_nbo && frag->iov_idx == 1) MCA_BTL_TCP_HDR_NTOH(frag->hdr);
switch(frag->hdr.type) {
case MCA_BTL_TCP_HDR_TYPE_SEND:
if(frag->iov_idx == 1 && frag->hdr.size) {
frag->segments[0].seg_addr.pval = frag+1;
frag->segments[0].seg_len = frag->hdr.size;
frag->iov[1].iov_base = (IOVBASE_TYPE*)(frag->segments[0].seg_addr.pval);
frag->iov[1].iov_len = frag->hdr.size;
frag->iov_cnt++;
#ifndef __sparc
/* The following cannot be done for sparc code
* because it causes alignment errors when accessing
* structures later on in the btl and pml code.
*/
dont_copy_data = 1;
#endif
goto repeat;
}
break;
case MCA_BTL_TCP_HDR_TYPE_PUT:
if(frag->iov_idx == 1) {
frag->iov[1].iov_base = (IOVBASE_TYPE*)frag->segments;
frag->iov[1].iov_len = frag->hdr.count * sizeof(mca_btl_base_segment_t);
frag->iov_cnt++;
goto repeat;
} else if (frag->iov_idx == 2) {
for( i = 0; i < frag->hdr.count; i++ ) {
frag->iov[i+2].iov_base = (IOVBASE_TYPE*)ompi_ptr_ltop(frag->segments[i].seg_addr.lval);
frag->iov[i+2].iov_len = frag->segments[i].seg_len;
}
frag->iov_cnt += frag->hdr.count;
goto repeat;
}
break;
case MCA_BTL_TCP_HDR_TYPE_GET:
default:
break;
}
return true;
}
return false;
}

Просмотреть файл

@ -0,0 +1,129 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_TCP_FRAG_H
#define MCA_BTL_TCP_FRAG_H
#define MCA_BTL_TCP_FRAG_ALIGN (8)
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#include "btl_tcp2.h"
#include "btl_tcp2_hdr.h"
BEGIN_C_DECLS
#define MCA_BTL_TCP_FRAG_IOVEC_NUMBER 4
/**
* TCP fragment derived type.
*/
struct mca_btl_tcp2_frag_t {
mca_btl_base_descriptor_t base;
mca_btl_base_segment_t segments[2];
struct mca_btl_base_endpoint_t *endpoint;
struct mca_btl_tcp2_module_t* btl;
mca_btl_tcp2_hdr_t hdr;
struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
struct iovec *iov_ptr;
size_t iov_cnt;
size_t iov_idx;
size_t size;
int rc;
ompi_free_list_t* my_list;
};
typedef struct mca_btl_tcp2_frag_t mca_btl_tcp2_frag_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_t);
typedef struct mca_btl_tcp2_frag_t mca_btl_tcp2_frag_eager_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_eager_t);
typedef struct mca_btl_tcp2_frag_t mca_btl_tcp2_frag_max_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_max_t);
typedef struct mca_btl_tcp2_frag_t mca_btl_tcp2_frag_user_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_frag_user_t);
/*
* Macros to allocate/return descriptors from module specific
* free list(s).
*/
#define MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag, rc) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_eager, item, rc); \
frag = (mca_btl_tcp2_frag_t*) item; \
}
#define MCA_BTL_TCP_FRAG_ALLOC_MAX(frag, rc) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_max, item, rc); \
frag = (mca_btl_tcp2_frag_t*) item; \
}
#define MCA_BTL_TCP_FRAG_ALLOC_USER(frag, rc) \
{ \
ompi_free_list_item_t *item; \
OMPI_FREE_LIST_GET(&mca_btl_tcp2_component.tcp_frag_user, item, rc); \
frag = (mca_btl_tcp2_frag_t*) item; \
}
#define MCA_BTL_TCP_FRAG_RETURN(frag) \
{ \
OMPI_FREE_LIST_RETURN(frag->my_list, (ompi_free_list_item_t*)(frag)); \
}
#define MCA_BTL_TCP_FRAG_INIT_DST(frag,ep) \
do { \
frag->rc = 0; \
frag->btl = ep->endpoint_btl; \
frag->endpoint = ep; \
frag->iov[0].iov_len = sizeof(frag->hdr); \
frag->iov[0].iov_base = (IOVBASE_TYPE*)&frag->hdr; \
frag->iov_cnt = 1; \
frag->iov_idx = 0; \
frag->iov_ptr = frag->iov; \
frag->base.des_src = NULL; \
frag->base.des_dst_cnt = 0; \
frag->base.des_dst = frag->segments; \
frag->base.des_dst_cnt = 1; \
} while(0)
bool mca_btl_tcp2_frag_send(mca_btl_tcp2_frag_t*, int sd);
bool mca_btl_tcp2_frag_recv(mca_btl_tcp2_frag_t*, int sd);
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,54 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Los Alamos National Security, LLC. All rights
* reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "btl_tcp2.h"
#include "btl_tcp2_frag.h"
#include "btl_tcp2_proc.h"
#include "btl_tcp2_endpoint.h"
#include "btl_tcp2_ft.h"
int mca_btl_tcp2_ft_event(int state)
{
if(OPAL_CRS_CHECKPOINT == state) {
;
}
else if(OPAL_CRS_CONTINUE == state) {
;
}
else if(OPAL_CRS_RESTART == state) {
;
}
else if(OPAL_CRS_TERM == state ) {
;
}
else {
;
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,34 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_BTL_TCP_FT_H
#define MCA_BTL_TCP_FT_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
BEGIN_C_DECLS
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,58 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_TCP_HDR_H
#define MCA_BTL_TCP_HDR_H
#include "ompi_config.h"
#include "ompi/mca/btl/base/base.h"
#include "btl_tcp2.h"
BEGIN_C_DECLS
/**
* TCP header.
*/
#define MCA_BTL_TCP_HDR_TYPE_SEND 1
#define MCA_BTL_TCP_HDR_TYPE_PUT 2
#define MCA_BTL_TCP_HDR_TYPE_GET 3
struct mca_btl_tcp2_hdr_t {
mca_btl_base_header_t base;
uint8_t type;
uint16_t count;
uint32_t size;
};
typedef struct mca_btl_tcp2_hdr_t mca_btl_tcp2_hdr_t;
#define MCA_BTL_TCP_HDR_HTON(hdr) \
do { \
hdr.count = htons(hdr.count); \
hdr.size = htonl(hdr.size); \
} while (0)
#define MCA_BTL_TCP_HDR_NTOH(hdr) \
do { \
hdr.count = ntohs(hdr.count); \
hdr.size = ntohl(hdr.size); \
} while (0)
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,799 @@
/*
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2010 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008-2010 Oracle and/or its affiliates. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "opal/util/arch.h"
#include "opal/util/argv.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "btl_tcp2.h"
#include "btl_tcp2_proc.h"
static void mca_btl_tcp2_proc_construct(mca_btl_tcp2_proc_t* proc);
static void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* proc);
static mca_btl_tcp2_interface_t** local_interfaces = NULL;
static int local_kindex_to_index[MAX_KERNEL_INTERFACE_INDEX];
static size_t num_local_interfaces, max_local_interfaces;
static mca_btl_tcp2_interface_t** peer_interfaces = NULL;
static size_t num_peer_interfaces, max_peer_interfaces;
static int peer_kindex_to_index[MAX_KERNEL_INTERFACE_INDEX];
static unsigned int *best_assignment;
static int max_assignment_weight;
static int max_assignment_cardinality;
static enum mca_btl_tcp2_connection_quality **weights;
static struct mca_btl_tcp2_addr_t ***best_addr;
OBJ_CLASS_INSTANCE( mca_btl_tcp2_proc_t,
opal_list_item_t,
mca_btl_tcp2_proc_construct,
mca_btl_tcp2_proc_destruct );
void mca_btl_tcp2_proc_construct(mca_btl_tcp2_proc_t* tcp_proc)
{
tcp_proc->proc_ompi = 0;
tcp_proc->proc_addrs = NULL;
tcp_proc->proc_addr_count = 0;
tcp_proc->proc_endpoints = NULL;
tcp_proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&tcp_proc->proc_lock, opal_mutex_t);
}
/*
* Cleanup ib proc instance
*/
void mca_btl_tcp2_proc_destruct(mca_btl_tcp2_proc_t* tcp_proc)
{
/* remove from list of all proc instances */
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
opal_hash_table_remove_value_uint64(&mca_btl_tcp2_component.tcp_procs,
orte_util_hash_name(&tcp_proc->proc_ompi->proc_name));
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
/* release resources */
if(NULL != tcp_proc->proc_endpoints) {
free(tcp_proc->proc_endpoints);
}
OBJ_DESTRUCT(&tcp_proc->proc_lock);
}
/*
* Create a TCP process structure. There is a one-to-one correspondence
* between a ompi_proc_t and a mca_btl_tcp2_proc_t instance. We cache
* additional data (specifically the list of mca_btl_tcp2_endpoint_t instances,
* and published addresses) associated w/ a given destination on this
* datastructure.
*/
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc)
{
int rc;
size_t size;
mca_btl_tcp2_proc_t* btl_proc;
uint64_t hash = orte_util_hash_name(&ompi_proc->proc_name);
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
rc = opal_hash_table_get_value_uint64(&mca_btl_tcp2_component.tcp_procs,
hash, (void**)&btl_proc);
if(OMPI_SUCCESS == rc) {
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
return btl_proc;
}
btl_proc = OBJ_NEW(mca_btl_tcp2_proc_t);
if(NULL == btl_proc)
return NULL;
btl_proc->proc_ompi = ompi_proc;
/* add to hash table of all proc instance */
opal_hash_table_set_value_uint64(&mca_btl_tcp2_component.tcp_procs,
hash, btl_proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
/* lookup tcp parameters exported by this proc */
rc = ompi_modex_recv( &mca_btl_tcp2_component.super.btl_version,
ompi_proc,
(void**)&btl_proc->proc_addrs,
&size );
if(rc != OMPI_SUCCESS) {
BTL_ERROR(("mca_base_modex_recv: failed with return value=%d", rc));
OBJ_RELEASE(btl_proc);
return NULL;
}
if(0 != (size % sizeof(mca_btl_tcp2_addr_t))) {
BTL_ERROR(("mca_base_modex_recv: invalid size %lu: btl-size: %lu\n",
(unsigned long) size, (unsigned long)sizeof(mca_btl_tcp2_addr_t)));
return NULL;
}
btl_proc->proc_addr_count = size / sizeof(mca_btl_tcp2_addr_t);
/* allocate space for endpoint array - one for each exported address */
btl_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc((1 + btl_proc->proc_addr_count) *
sizeof(mca_btl_base_endpoint_t*));
if(NULL == btl_proc->proc_endpoints) {
OBJ_RELEASE(btl_proc);
return NULL;
}
if(NULL == mca_btl_tcp2_component.tcp_local && ompi_proc == ompi_proc_local()) {
mca_btl_tcp2_component.tcp_local = btl_proc;
}
{
/* convert the OMPI addr_family field to OS constants,
* so we can check for AF_INET (or AF_INET6) and don't have
* to deal with byte ordering anymore.
*/
unsigned int i;
for (i = 0; i < btl_proc->proc_addr_count; i++) {
if (MCA_BTL_TCP_AF_INET == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET;
}
#if OPAL_WANT_IPV6
if (MCA_BTL_TCP_AF_INET6 == btl_proc->proc_addrs[i].addr_family) {
btl_proc->proc_addrs[i].addr_family = AF_INET6;
}
#endif
}
}
return btl_proc;
}
static void evaluate_assignment(int *a) {
size_t i;
unsigned int max_interfaces = num_local_interfaces;
int assignment_weight = 0;
int assignment_cardinality = 0;
if(max_interfaces < num_peer_interfaces) {
max_interfaces = num_peer_interfaces;
}
for(i = 0; i < max_interfaces; ++i) {
if(0 < weights[i][a[i]-1]) {
++assignment_cardinality;
assignment_weight += weights[i][a[i]-1];
}
}
/*
* check wether current solution beats all previous solutions
*/
if(assignment_cardinality > max_assignment_cardinality
|| (assignment_cardinality == max_assignment_cardinality
&& assignment_weight > max_assignment_weight)) {
for(i = 0; i < max_interfaces; ++i) {
best_assignment[i] = a[i]-1;
}
max_assignment_weight = assignment_weight;
max_assignment_cardinality = assignment_cardinality;
}
}
static void visit(int k, int level, int siz, int *a)
{
level = level+1; a[k] = level;
if (level == siz) {
evaluate_assignment(a);
} else {
int i;
for ( i = 0; i < siz; i++)
if (a[i] == 0)
visit(i, level, siz, a);
}
level = level-1; a[k] = 0;
}
static void mca_btl_tcp2_initialise_interface(mca_btl_tcp2_interface_t* interface,
int ifk_index, int index)
{
interface->kernel_index = ifk_index;
interface->peer_interface = -1;
interface->ipv4_address = NULL;
interface->ipv6_address = NULL;
interface->index = index;
interface->inuse = 0;
}
static mca_btl_tcp2_interface_t** mca_btl_tcp2_retrieve_local_interfaces(void)
{
struct sockaddr_storage local_addr;
char local_if_name[IF_NAMESIZE];
char **include, **exclude, **argv;
int idx;
if( NULL != local_interfaces )
return local_interfaces;
max_local_interfaces = MAX_KERNEL_INTERFACES;
num_local_interfaces = 0;
local_interfaces = (mca_btl_tcp2_interface_t**)calloc( max_local_interfaces, sizeof(mca_btl_tcp2_interface_t*) );
if( NULL == local_interfaces )
return NULL;
memset(local_kindex_to_index, -1, sizeof(int)*MAX_KERNEL_INTERFACE_INDEX);
/* Collect up the list of included and excluded interfaces, if any */
include = opal_argv_split(mca_btl_tcp2_component.tcp_if_include,',');
exclude = opal_argv_split(mca_btl_tcp2_component.tcp_if_exclude,',');
/*
* identify all kernel interfaces and the associated addresses of
* the local node
*/
for( idx = opal_ifbegin(); idx >= 0; idx = opal_ifnext (idx) ) {
int kindex, index;
bool skip = false;
opal_ifindextoaddr (idx, (struct sockaddr*) &local_addr, sizeof (local_addr));
opal_ifindextoname (idx, local_if_name, sizeof (local_if_name));
/* If we were given a list of included interfaces, then check
* to see if the current one is a member of this set. If so,
* drop down and complete processing. If not, skip it and
* continue on to the next one. Note that providing an include
* list will override providing an exclude list as the two are
* mutually exclusive. This matches how it works in
* mca_btl_tcp2_component_create_instances() which is the function
* that exports the interfaces. */
if(NULL != include) {
argv = include;
skip = true;
while(argv && *argv) {
/* When comparing included interfaces, we look for exact matches.
That is why we are using strcmp() here. */
if (0 == strcmp(*argv, local_if_name)) {
skip = false;
break;
}
argv++;
}
} else if (NULL != exclude) {
/* If we were given a list of excluded interfaces, then check to see if the
* current one is a member of this set. If not, drop down and complete
* processing. If so, skip it and continue on to the next one. */
argv = exclude;
while(argv && *argv) {
/* When looking for interfaces to exclude, we only look at
* the number of characters equal to what the user provided.
* For example, excluding "lo" excludes "lo", "lo0" and
* anything that starts with "lo" */
if(0 == strncmp(*argv, local_if_name, strlen(*argv))) {
skip = true;
break;
}
argv++;
}
}
if (true == skip) {
/* This interface is not part of the requested set, so skip it */
continue;
}
kindex = opal_ifindextokindex(idx);
index = local_kindex_to_index[kindex];
/* create entry for this kernel index previously not seen */
if(-1 == index) {
index = num_local_interfaces++;
local_kindex_to_index[kindex] = index;
if( num_local_interfaces == max_local_interfaces ) {
max_local_interfaces <<= 1;
local_interfaces = (mca_btl_tcp2_interface_t**)realloc( local_interfaces,
max_local_interfaces * sizeof(mca_btl_tcp2_interface_t*) );
if( NULL == local_interfaces )
return NULL;
}
local_interfaces[index] = (mca_btl_tcp2_interface_t *) malloc(sizeof(mca_btl_tcp2_interface_t));
assert(NULL != local_interfaces[index]);
mca_btl_tcp2_initialise_interface(local_interfaces[index], kindex, index);
}
switch(local_addr.ss_family) {
case AF_INET:
/* if AF is disabled, skip it completely */
if (4 == mca_btl_tcp2_component.tcp_disable_family) {
continue;
}
local_interfaces[local_kindex_to_index[kindex]]->ipv4_address =
(struct sockaddr_storage*) malloc(sizeof(local_addr));
memcpy(local_interfaces[local_kindex_to_index[kindex]]->ipv4_address,
&local_addr, sizeof(local_addr));
opal_ifindextomask(idx,
&local_interfaces[local_kindex_to_index[kindex]]->ipv4_netmask,
sizeof(int));
break;
case AF_INET6:
/* if AF is disabled, skip it completely */
if (6 == mca_btl_tcp2_component.tcp_disable_family) {
continue;
}
local_interfaces[local_kindex_to_index[kindex]]->ipv6_address
= (struct sockaddr_storage*) malloc(sizeof(local_addr));
memcpy(local_interfaces[local_kindex_to_index[kindex]]->ipv6_address,
&local_addr, sizeof(local_addr));
opal_ifindextomask(idx,
&local_interfaces[local_kindex_to_index[kindex]]->ipv6_netmask,
sizeof(int));
break;
default:
opal_output(0, "unknown address family for tcp: %d\n",
local_addr.ss_family);
}
}
opal_argv_free(include);
opal_argv_free(exclude);
return local_interfaces;
}
/*
* Note that this routine must be called with the lock on the process
* already held. Insert a btl instance into the proc array and assign
* it an address.
*/
int mca_btl_tcp2_proc_insert( mca_btl_tcp2_proc_t* btl_proc,
mca_btl_base_endpoint_t* btl_endpoint )
{
struct sockaddr_storage endpoint_addr_ss;
unsigned int perm_size;
int rc, *a = NULL;
size_t i, j;
#ifndef WORDS_BIGENDIAN
/* if we are little endian and our peer is not so lucky, then we
need to put all information sent to him in big endian (aka
Network Byte Order) and expect all information received to
be in NBO. Since big endian machines always send and receive
in NBO, we don't care so much about that case. */
if (btl_proc->proc_ompi->proc_arch & OPAL_ARCH_ISBIGENDIAN) {
btl_endpoint->endpoint_nbo = true;
}
#endif
/* insert into endpoint array */
btl_endpoint->endpoint_proc = btl_proc;
btl_proc->proc_endpoints[btl_proc->proc_endpoint_count++] = btl_endpoint;
/* sanity checks */
if( NULL == local_interfaces ) {
if( NULL == mca_btl_tcp2_retrieve_local_interfaces() )
return OMPI_ERR_OUT_OF_RESOURCE;
}
if( 0 == num_local_interfaces ) {
return OMPI_ERR_UNREACH;
}
if( NULL == peer_interfaces ) {
max_peer_interfaces = max_local_interfaces;
peer_interfaces = (mca_btl_tcp2_interface_t**)malloc( max_peer_interfaces * sizeof(mca_btl_tcp2_interface_t*) );
}
num_peer_interfaces = 0;
memset(peer_kindex_to_index, -1, sizeof(int)*MAX_KERNEL_INTERFACE_INDEX);
memset(peer_interfaces, 0, max_peer_interfaces * sizeof(mca_btl_tcp2_interface_t*));
/*
* identify all kernel interfaces and the associated addresses of
* the peer
*/
for( i = 0; i < btl_proc->proc_addr_count; i++ ) {
int index;
mca_btl_tcp2_addr_t* endpoint_addr = btl_proc->proc_addrs + i;
mca_btl_tcp2_proc_tosocks (endpoint_addr, &endpoint_addr_ss);
index = peer_kindex_to_index[endpoint_addr->addr_ifkindex];
if(-1 == index) {
index = num_peer_interfaces++;
peer_kindex_to_index[endpoint_addr->addr_ifkindex] = index;
if( num_peer_interfaces == max_peer_interfaces ) {
max_peer_interfaces <<= 1;
peer_interfaces = (mca_btl_tcp2_interface_t**)realloc( peer_interfaces,
max_peer_interfaces * sizeof(mca_btl_tcp2_interface_t*) );
if( NULL == peer_interfaces )
return OMPI_ERR_OUT_OF_RESOURCE;
}
peer_interfaces[index] = (mca_btl_tcp2_interface_t *) malloc(sizeof(mca_btl_tcp2_interface_t));
mca_btl_tcp2_initialise_interface(peer_interfaces[index],
endpoint_addr->addr_ifkindex, index);
}
/*
* in case one of the peer addresses is already in use,
* mark the complete peer interface as 'not available'
*/
if(endpoint_addr->addr_inuse) {
peer_interfaces[index]->inuse = 1;
}
switch(endpoint_addr_ss.ss_family) {
case AF_INET:
peer_interfaces[index]->ipv4_address = (struct sockaddr_storage*) malloc(sizeof(endpoint_addr_ss));
peer_interfaces[index]->ipv4_endpoint_addr = endpoint_addr;
memcpy(peer_interfaces[index]->ipv4_address,
&endpoint_addr_ss, sizeof(endpoint_addr_ss));
break;
case AF_INET6:
peer_interfaces[index]->ipv6_address = (struct sockaddr_storage*) malloc(sizeof(endpoint_addr_ss));
peer_interfaces[index]->ipv6_endpoint_addr = endpoint_addr;
memcpy(peer_interfaces[index]->ipv6_address,
&endpoint_addr_ss, sizeof(endpoint_addr_ss));
break;
default:
opal_output(0, "unknown address family for tcp: %d\n",
endpoint_addr_ss.ss_family);
/*
* return OMPI_UNREACH or some error, as this is not
* good
*/
}
}
/*
* assign weights to each possible pair of interfaces
*/
perm_size = num_local_interfaces;
if(num_peer_interfaces > perm_size) {
perm_size = num_peer_interfaces;
}
weights = (enum mca_btl_tcp2_connection_quality**) malloc(perm_size
* sizeof(enum mca_btl_tcp2_connection_quality*));
best_addr = (mca_btl_tcp2_addr_t ***) malloc(perm_size
* sizeof(mca_btl_tcp2_addr_t **));
for(i = 0; i < perm_size; ++i) {
weights[i] = (enum mca_btl_tcp2_connection_quality*) malloc(perm_size *
sizeof(enum mca_btl_tcp2_connection_quality));
memset(weights[i], 0, perm_size * sizeof(enum mca_btl_tcp2_connection_quality));
best_addr[i] = (mca_btl_tcp2_addr_t **) malloc(perm_size *
sizeof(mca_btl_tcp2_addr_t *));
memset(best_addr[i], 0, perm_size * sizeof(mca_btl_tcp2_addr_t *));
}
for(i=0; i<num_local_interfaces; ++i) {
for(j=0; j<num_peer_interfaces; ++j) {
/* initially, assume no connection is possible */
weights[i][j] = CQ_NO_CONNECTION;
/* check state of ipv4 address pair */
if(NULL != local_interfaces[i]->ipv4_address &&
NULL != peer_interfaces[j]->ipv4_address) {
/* check for loopback */
if ((opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv4_address)
&& !opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv4_address))
|| (opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv4_address)
&& !opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv4_address))
|| (opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv4_address)
&& !opal_ifislocal(btl_proc->proc_ompi->proc_hostname))) {
/* No connection is possible on these interfaces */
/* check for RFC1918 */
} else if(opal_net_addr_isipv4public((struct sockaddr*) local_interfaces[i]->ipv4_address)
&& opal_net_addr_isipv4public((struct sockaddr*)
peer_interfaces[j]->ipv4_address)) {
if(opal_net_samenetwork((struct sockaddr*) local_interfaces[i]->ipv4_address,
(struct sockaddr*) peer_interfaces[j]->ipv4_address,
local_interfaces[i]->ipv4_netmask)) {
weights[i][j] = CQ_PUBLIC_SAME_NETWORK;
} else {
weights[i][j] = CQ_PUBLIC_DIFFERENT_NETWORK;
}
best_addr[i][j] = peer_interfaces[j]->ipv4_endpoint_addr;
continue;
} else {
if(opal_net_samenetwork((struct sockaddr*) local_interfaces[i]->ipv4_address,
(struct sockaddr*) peer_interfaces[j]->ipv4_address,
local_interfaces[i]->ipv4_netmask)) {
weights[i][j] = CQ_PRIVATE_SAME_NETWORK;
} else {
weights[i][j] = CQ_PRIVATE_DIFFERENT_NETWORK;
}
best_addr[i][j] = peer_interfaces[j]->ipv4_endpoint_addr;
}
}
/* check state of ipv6 address pair - ipv6 is always public,
* since link-local addresses are skipped in opal_ifinit()
*/
if(NULL != local_interfaces[i]->ipv6_address &&
NULL != peer_interfaces[j]->ipv6_address) {
/* check for loopback */
if ((opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv6_address)
&& !opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv6_address))
|| (opal_net_islocalhost((struct sockaddr *)peer_interfaces[j]->ipv6_address)
&& !opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv6_address))
|| (opal_net_islocalhost((struct sockaddr *)local_interfaces[i]->ipv6_address)
&& !opal_ifislocal(btl_proc->proc_ompi->proc_hostname))) {
/* No connection is possible on these interfaces */
} else if(opal_net_samenetwork((struct sockaddr*) local_interfaces[i]->ipv6_address,
(struct sockaddr*) peer_interfaces[j]->ipv6_address,
local_interfaces[i]->ipv6_netmask)) {
weights[i][j] = CQ_PUBLIC_SAME_NETWORK;
} else {
weights[i][j] = CQ_PUBLIC_DIFFERENT_NETWORK;
}
best_addr[i][j] = peer_interfaces[j]->ipv6_endpoint_addr;
}
} /* for each peer interface */
} /* for each local interface */
/*
* determine the size of the set to permute (max number of
* interfaces
*/
best_assignment = (unsigned int *) malloc (perm_size * sizeof(int));
a = (int *) malloc(perm_size * sizeof(int));
if (NULL == a) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Can only find the best set of connections when the number of
* interfaces is not too big. When it gets larger, we fall back
* to a simpler and faster (and not as optimal) algorithm.
* See ticket https://svn.open-mpi.org/trac/ompi/ticket/2031
* for more details about this issue. */
if (perm_size <= MAX_PERMUTATION_INTERFACES) {
memset(a, 0, perm_size * sizeof(int));
max_assignment_cardinality = -1;
max_assignment_weight = -1;
visit(0, -1, perm_size, a);
rc = OMPI_ERR_UNREACH;
for(i = 0; i < perm_size; ++i) {
if(best_assignment[i] > num_peer_interfaces
|| weights[i][best_assignment[i]] == CQ_NO_CONNECTION
|| peer_interfaces[best_assignment[i]]->inuse
|| NULL == peer_interfaces[best_assignment[i]]) {
continue;
}
peer_interfaces[best_assignment[i]]->inuse++;
btl_endpoint->endpoint_addr = best_addr[i][best_assignment[i]];
btl_endpoint->endpoint_addr->addr_inuse++;
rc = OMPI_SUCCESS;
break;
}
} else {
enum mca_btl_tcp2_connection_quality max;
int i_max = 0, j_max = 0;
/* Find the best connection that is not in use. Save away
* the indices of the best location. */
max = CQ_NO_CONNECTION;
for(i=0; i<num_local_interfaces; ++i) {
for(j=0; j<num_peer_interfaces; ++j) {
if (!peer_interfaces[j]->inuse) {
if (weights[i][j] > max) {
max = weights[i][j];
i_max = i;
j_max = j;
}
}
}
}
/* Now see if there is a some type of connection available. */
rc = OMPI_ERR_UNREACH;
if (CQ_NO_CONNECTION != max) {
peer_interfaces[j_max]->inuse++;
btl_endpoint->endpoint_addr = best_addr[i_max][j_max];
btl_endpoint->endpoint_addr->addr_inuse++;
rc = OMPI_SUCCESS;
}
}
for(i = 0; i < perm_size; ++i) {
free(weights[i]);
free(best_addr[i]);
}
for(i = 0; i < num_peer_interfaces; ++i) {
if(NULL != peer_interfaces[i]->ipv4_address) {
free(peer_interfaces[i]->ipv4_address);
}
if(NULL != peer_interfaces[i]->ipv6_address) {
free(peer_interfaces[i]->ipv6_address);
}
free(peer_interfaces[i]);
}
free(peer_interfaces);
peer_interfaces = NULL;
max_peer_interfaces = 0;
for(i = 0; i < num_local_interfaces; ++i) {
if(NULL != local_interfaces[i]->ipv4_address) {
free(local_interfaces[i]->ipv4_address);
}
if(NULL != local_interfaces[i]->ipv6_address) {
free(local_interfaces[i]->ipv6_address);
}
free(local_interfaces[i]);
}
free(local_interfaces);
local_interfaces = NULL;
max_local_interfaces = 0;
free(weights);
free(best_addr);
free(best_assignment);
free(a);
return rc;
}
/*
* Remove an endpoint from the proc array and indicate the address is
* no longer in use.
*/
int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t* btl_proc, mca_btl_base_endpoint_t* btl_endpoint)
{
size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
for(i=0; i<btl_proc->proc_endpoint_count; i++) {
if(btl_proc->proc_endpoints[i] == btl_endpoint) {
memmove(btl_proc->proc_endpoints+i, btl_proc->proc_endpoints+i+1,
(btl_proc->proc_endpoint_count-i-1)*sizeof(mca_btl_base_endpoint_t*));
if(--btl_proc->proc_endpoint_count == 0) {
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
OBJ_RELEASE(btl_proc);
return OMPI_SUCCESS;
}
/* The endpoint_addr may still be NULL if this enpoint is
being removed early in the wireup sequence (e.g., if it
is unreachable by all other procs) */
if (NULL != btl_endpoint->endpoint_addr) {
btl_endpoint->endpoint_addr->addr_inuse--;
}
break;
}
}
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
return OMPI_SUCCESS;
}
/*
* Look for an existing TCP process instance based on the globally unique
* process identifier.
*/
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t *name)
{
mca_btl_tcp2_proc_t* proc = NULL;
OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
opal_hash_table_get_value_uint64(&mca_btl_tcp2_component.tcp_procs,
orte_util_hash_name(name), (void**)&proc);
OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
return proc;
}
/*
* loop through all available BTLs for one matching the source address
* of the request.
*/
bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t* btl_proc, struct sockaddr* addr, int sd)
{
size_t i;
OPAL_THREAD_LOCK(&btl_proc->proc_lock);
for( i = 0; i < btl_proc->proc_endpoint_count; i++ ) {
mca_btl_base_endpoint_t* btl_endpoint = btl_proc->proc_endpoints[i];
/* Check all conditions before going to try to accept the connection. */
if( btl_endpoint->endpoint_addr->addr_family != addr->sa_family ) {
continue;
}
switch (addr->sa_family) {
case AF_INET:
if( memcmp( &btl_endpoint->endpoint_addr->addr_inet,
&(((struct sockaddr_in*)addr)->sin_addr),
sizeof(struct in_addr) ) ) {
continue;
}
break;
#if OPAL_WANT_IPV6
case AF_INET6:
if( memcmp( &btl_endpoint->endpoint_addr->addr_inet,
&(((struct sockaddr_in6*)addr)->sin6_addr),
sizeof(struct in6_addr) ) ) {
continue;
}
break;
#endif
default:
;
}
if(mca_btl_tcp2_endpoint_accept(btl_endpoint, addr, sd)) {
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
return true;
}
}
OPAL_THREAD_UNLOCK(&btl_proc->proc_lock);
return false;
}
/*
* convert internal data structure (mca_btl_tcp2_addr_t) to sockaddr_storage
*
*/
bool mca_btl_tcp2_proc_tosocks(mca_btl_tcp2_addr_t* proc_addr,
struct sockaddr_storage* output)
{
memset(output, 0, sizeof (*output));
switch (proc_addr->addr_family) {
case AF_INET:
output->ss_family = AF_INET;
memcpy(&((struct sockaddr_in*)output)->sin_addr,
&proc_addr->addr_inet, sizeof(struct in_addr));
((struct sockaddr_in*)output)->sin_port = proc_addr->addr_port;
break;
#if OPAL_WANT_IPV6
case AF_INET6:
{
struct sockaddr_in6* inaddr = (struct sockaddr_in6*)output;
output->ss_family = AF_INET6;
memcpy(&inaddr->sin6_addr, &proc_addr->addr_inet,
sizeof (proc_addr->addr_inet));
inaddr->sin6_port = proc_addr->addr_port;
inaddr->sin6_scope_id = 0;
inaddr->sin6_flowinfo = 0;
}
break;
#endif
default:
opal_output( 0, "mca_btl_tcp2_proc: unknown af_family received: %d\n",
proc_addr->addr_family );
return false;
}
return true;
}

Просмотреть файл

@ -0,0 +1,127 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MCA_BTL_TCP_PROC_H
#define MCA_BTL_TCP_PROC_H
#include "opal/class/opal_object.h"
#include "ompi/proc/proc.h"
#include "orte/types.h"
#include "btl_tcp2.h"
#include "btl_tcp2_addr.h"
#include "btl_tcp2_endpoint.h"
BEGIN_C_DECLS
/**
* Represents the state of a remote process and the set of addresses
* that it exports. Also cache an instance of mca_btl_base_endpoint_t for
* each
* BTL instance that attempts to open a connection to the process.
*/
struct mca_btl_tcp2_proc_t {
opal_list_item_t super;
/**< allow proc to be placed on a list */
ompi_proc_t *proc_ompi;
/**< pointer to corresponding ompi_proc_t */
struct mca_btl_tcp2_addr_t* proc_addrs;
/**< array of addresses exported by peer */
size_t proc_addr_count;
/**< number of addresses published by endpoint */
struct mca_btl_base_endpoint_t **proc_endpoints;
/**< array of endpoints that have been created to access this proc */
size_t proc_endpoint_count;
/**< number of endpoints */
opal_mutex_t proc_lock;
/**< lock to protect against concurrent access to proc state */
};
typedef struct mca_btl_tcp2_proc_t mca_btl_tcp2_proc_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp2_proc_t);
/* the highest possible interface kernel index we can handle */
#define MAX_KERNEL_INTERFACE_INDEX 65536
/* the maximum number of kernel interfaces we can handle */
#define MAX_KERNEL_INTERFACES 8
/* The maximum number of interfaces that we can have and use the
* recursion code for determining the best set of connections. When
* the number is greater than this, we switch to a simpler algorithm
* to speed things up. */
#define MAX_PERMUTATION_INTERFACES 8
/*
* FIXME: this should probably be part of an ompi list, so we need the
* appropriate definitions
*/
struct mca_btl_tcp2_interface_t {
struct sockaddr_storage* ipv4_address;
struct sockaddr_storage* ipv6_address;
mca_btl_tcp2_addr_t* ipv4_endpoint_addr;
mca_btl_tcp2_addr_t* ipv6_endpoint_addr;
uint32_t ipv4_netmask;
uint32_t ipv6_netmask;
int kernel_index;
int peer_interface;
int index;
int inuse;
};
typedef struct mca_btl_tcp2_interface_t mca_btl_tcp2_interface_t;
/*
* describes the quality of a possible connection between a local and
* a remote network interface
*/
enum mca_btl_tcp2_connection_quality {
CQ_NO_CONNECTION,
CQ_PRIVATE_DIFFERENT_NETWORK,
CQ_PRIVATE_SAME_NETWORK,
CQ_PUBLIC_DIFFERENT_NETWORK,
CQ_PUBLIC_SAME_NETWORK
};
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_create(ompi_proc_t* ompi_proc);
mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_lookup(const orte_process_name_t* name);
int mca_btl_tcp2_proc_insert(mca_btl_tcp2_proc_t*, mca_btl_base_endpoint_t*);
int mca_btl_tcp2_proc_remove(mca_btl_tcp2_proc_t*, mca_btl_base_endpoint_t*);
bool mca_btl_tcp2_proc_accept(mca_btl_tcp2_proc_t*, struct sockaddr*, int);
bool mca_btl_tcp2_proc_tosocks(mca_btl_tcp2_addr_t*, struct sockaddr_storage*);
/**
* Inlined function to return local TCP proc instance.
*/
static inline mca_btl_tcp2_proc_t* mca_btl_tcp2_proc_local(void)
{
if(NULL == mca_btl_tcp2_component.tcp_local)
mca_btl_tcp2_component.tcp_local = mca_btl_tcp2_proc_create(ompi_proc_local());
return mca_btl_tcp2_component.tcp_local;
}
END_C_DECLS
#endif

Просмотреть файл

@ -0,0 +1,194 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Normal Autotools setup stuff
AC_INIT([openmpi_btl_tcp2], [1.0.0],
[http://example.com/help], [openmpi_btl_tcp2])
AC_CONFIG_AUX_DIR(config)
AC_CONFIG_MACRO_DIR(config)
AM_INIT_AUTOMAKE([foreign dist-bzip2 no-define 1.11])
AM_SILENT_RULES([yes])
AC_LANG([C])
AH_TOP([/* -*- c -*-
* Automatically-generated header file from configure.
*/
#ifndef BTL_TCP2_CONFIG_H
#define BTL_TCP2_CONFIG_H
])
AH_BOTTOM([
#endif /* BTL_TCP2_CONFIG_H */
])
AM_CONFIG_HEADER([btl_tcp2_config.h])
CFLAGS_save="$CFLAGS"
AC_PROG_CC
CFLAGS=$CFLAGS_SAVE
# Allow the OMPI header and lib dirs to be specified in shell
# variables (e.g., CPPFLAGS and LDFLAGS) or via command line options.
# Use an existing Open MPI installation tree
AC_ARG_WITH([openmpi-install],
[AS_HELP_STRING([--with-openmpi-install=DIR],
[Specify the --prefix directory used with a "--with-devel-headers" Open MPI installation])],
[
# Check for the header directories
AC_MSG_CHECKING([for Open MPI install dir flags])
flags="`$withval/bin/mpicc --showme:incdirs`"
result=
for f in $flags; do
result="-I$f $result"
done
CPPFLAGS="$CPPFLAGS $result"
AC_MSG_RESULT([not found; double check --with-openmpi-install value])
# Check for the library directories
AC_MSG_CHECKING([for Open MPI install dir LD flags])
flags="`$withval/bin/mpicc --showme:libdirs`"
result=
for f in $flags; do
result="-L$f $result"
done
LDFLAGS="$LDFLAGS $result"
AC_MSG_RESULT([$result])
# Check for the library files
AC_MSG_CHECKING([for Open MPI install dir libs])
flags="`$withval/bin/mpicc --showme:libs`"
result=
for f in $flags; do
result="-l$f $result"
done
LIBS="$LIBS $result"
AC_MSG_RESULT([not found; double check --with-openmpi-install value])
])
# Use an existing Open MPI source tree (assuming that is configured
# and built already)
AC_ARG_WITH([openmpi-source],
[AS_HELP_STRING([--with-openmpi-source=DIR],
[Specify the top directory for the Open MPI source tree])],
[
# This works with v1.4 and v1.5, but not with trunk
# because trunk has the new build opal/event stuff. This
# means that there is a variable file to be included that
# is selected by the winning event component. I don't
# know yet how to -I this file properly. In a
# --with-devel-headers, this is easy -- I just slurp all
# flags from the wrapper compiler. But in a source tree
# case, how do I know which component won and what -I's
# to add? In the current trunk, libevent207 provides a
# nice include file that is relative to the root of the
# source tree. That's good. But then that file ends up
# including <event2/event-config.h>, which is most
# definitely *not* relative to the top of the OMPI source
# tree -- it's relative to the libevent207 tree. This is
# handled properly by libevent207's configure.m4 when
# building in-tree, but when we're building
# out-of-the-tree (like here), how the heck are we
# supposed to know what -I's to add? This is the problem
# that needs to be solved. Probably not *too* hard to
# fix, but I'm outta time today...
AC_MSG_CHECKING([for Open MPI source tree])
AS_IF([test -d "$withval" -a -d "$withval/opal" -a -d "$withval/orte" -a -d "$withval/ompi" -a -f "$withval/VERSION" -a -f "$withval/Makefile.man-page-rules" -a -f "$withval/opal/include/opal_config.h" -a -x "$withval/config.status"],
[AC_MSG_RESULT([$withval])],
[AC_MSG_RESULT([not found; double check --with-openmpi-source value])
AC_MSG_ERROR([Cannot continue])])
# Run the config.status in the source tree to extract the
# CPPFLAGS, CFLAGS, LDFLAGS, and LIBS.
AC_MSG_CHECKING([for Open MPI source tree flags])
file=source-flags.sh
rm -f $file.in
cat > $file.in <<EOF
:
source_CPPFLAGS="@CPPFLAGS@"
source_CFLAGS="@CFLAGS@"
EOF
$withval/config.status --file=${file}:$file.in > /dev/null
AS_IF([test "$?" != "0"],
[AC_MSG_WARN([config.status from the Open MPI source tree did not run cleanly])
AC_MSG_WARN([May experience problems later in the build...])])
# The flags will contain -I$(top_srcdir). Filter that out
sed -e 's/-I$(top_srcdir)//' source-flags.sh > source-flags-filtered
cp -f source-flags-filtered source-flags.sh
rm -f source-flags-filtered
chmod +x $file
. ./$file
rm -f $file $file.in
AC_MSG_RESULT([found])
echo " --> CPPFLAGS:" $source_CPPFLAGS
echo " --> CFLAGS:" $source_CFLAGS
CPPFLAGS="$CPPFLAGS -I$withval -I$withval/opal/include -I$withval/orte/include -I$withval/ompi/include"
# Open MPI v1.7 libraries
LDFLAGS="$LDFLAGS -L$withval/ompi/.libs"
# For OMPI v1.7 and later
LIBS="$LIBS -lmpi"
CPPFLAGS="$CPPFLAGS $source_CPPFLAGS"
CFLAGS="$CFLAGS $source_CFLAGS"
])
# Check for the OMPI header files and libraries
AC_CHECK_HEADER([ompi_config.h], [],
[AC_MSG_WARN([Cannot find ompi_config.h])
AC_MSG_ERROR([Cannot continue])])
AC_CHECK_HEADER([ompi/runtime/mpiruntime.h], [],
[AC_MSG_WARN([Cannot find ompi/runtime/mpiruntime.h])
AC_MSG_ERROR([Cannot continue])])
AC_CHECK_FUNC([mca_btl_base_open], [],
[AC_MSG_WARN([Could not find mca_btl_base_open])
AC_MSG_ERROR([Cannot continue])])
AC_CHECK_FUNC([orte_show_help], [],
[AC_MSG_WARN([Could not find orte_show_help])
AC_MSG_ERROR([Cannot continue])])
# Check for types we need for this component
AC_CHECK_HEADERS([netinet/in.h])
AC_CHECK_TYPES([struct sockaddr_in], [],
[AC_MSG_WARN([No struct sockaddr_in])
AC_MSG_ERROR([Cannot continue])],
[AC_INCLUDES_DEFAULT
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif])
LT_INIT([disable-static])
# Party on
AC_CONFIG_FILES([Makefile])
AC_OUTPUT

Просмотреть файл

@ -0,0 +1,27 @@
# -*- text -*-
#
# Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# This is the US/English help file for Open MPI's TCP support
# (the openib BTL).
#
[invalid if_inexclude]
WARNING: An invalid value was given for btl_tcp2_if_%s. This
value will be ignored.
Local host: %s
Value: %s
Message: %s
%
[invalid minimum port]
WARNING: An invalid value was given for the btl_tcp2_port_min_%s. Legal
values are in the range [1 .. 2^16-1]. This value will be ignored
(reset to the default value of 1024).
Local host: %s
Value: %d