2011-02-02 20:11:33 +03:00
/*
* Copyright ( c ) 2004 - 2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation . All rights reserved .
* Copyright ( c ) 2004 - 2009 The University of Tennessee and The University
* of Tennessee Research Foundation . All rights
* reserved .
* Copyright ( c ) 2004 - 2005 High Performance Computing Center Stuttgart ,
* University of Stuttgart . All rights reserved .
* Copyright ( c ) 2004 - 2005 The Regents of the University of California .
* All rights reserved .
2011-02-02 20:25:56 +03:00
* Copyright ( c ) 2007 - 2010 Cisco Systems , Inc . All rights reserved .
2011-02-02 20:11:33 +03:00
* Copyright ( c ) 2008 Sun Microsystems , Inc . All rights reserved .
* Copyright ( c ) 2009 Oak Ridge National Laboratory
* $ COPYRIGHT $
*
* Additional copyrights may follow
*
* $ HEADER $
*
*/
# include "ompi_config.h"
# include "opal/opal_socket_errno.h"
# ifdef HAVE_UNISTD_H
# include <unistd.h>
# endif
# include <string.h>
# include <fcntl.h>
# ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
# endif
# ifdef HAVE_SYS_SOCKET_H
# include <sys/socket.h>
# endif
# ifdef HAVE_NETINET_IN_H
# include <netinet/in.h>
# endif
# ifdef HAVE_ARPA_INET_H
# include <arpa/inet.h>
# endif
# if OPAL_WANT_IPV6
# ifdef HAVE_NETDB_H
# include <netdb.h>
# endif
# endif
# include <ctype.h>
# include <limits.h>
# include "ompi/constants.h"
# include "opal/mca/event/event.h"
# include "opal/util/if.h"
# include "opal/util/output.h"
# include "opal/util/argv.h"
# include "opal/util/net.h"
# include "opal/util/opal_sos.h"
# include "orte/types.h"
# include "orte/util/show_help.h"
# include "ompi/mca/btl/btl.h"
# include "opal/mca/base/mca_base_param.h"
# include "ompi/runtime/ompi_module_exchange.h"
# include "ompi/mca/mpool/base/base.h"
# include "ompi/mca/btl/base/btl_base_error.h"
# include "btl_tcp2.h"
# include "btl_tcp2_addr.h"
# include "btl_tcp2_proc.h"
# include "btl_tcp2_frag.h"
# include "btl_tcp2_endpoint.h"
# include "ompi/mca/btl/base/base.h"
/*
 * Global descriptor of the tcp2 BTL component. The initializer is purely
 * positional: the nested braces fill, in order, the base component
 * information, the component metadata, and the BTL-level entry points.
 * The remaining members of mca_btl_tcp2_component_t are not listed here;
 * mca_btl_tcp2_component_open() assigns the runtime state later.
 */
mca_btl_tcp2_component_t mca_btl_tcp2_component = {
{
/* First, the mca_base_component_t struct containing meta information
about the component itself */
{
MCA_BTL_BASE_VERSION_2_0_0 ,
" tcp2 " , /* MCA component name */
OMPI_MAJOR_VERSION , /* MCA component major version */
OMPI_MINOR_VERSION , /* MCA component minor version */
OMPI_RELEASE_VERSION , /* MCA component release version */
mca_btl_tcp2_component_open , /* component open */
mca_btl_tcp2_component_close /* component close */
} ,
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
} ,
/* component initialization entry point (creates the BTL modules) */
mca_btl_tcp2_component_init ,
/* NOTE(review): last slot is left NULL; presumably an optional
   callback of the BTL component struct -- confirm against btl.h */
NULL ,
}
} ;
/*
 * utility routines for parameter registration
 */

/**
 * Register a string parameter of this component and return its value.
 *
 * @param param_name    name of the parameter (without component prefix)
 * @param help_string   help text for the parameter, may be NULL
 * @param default_value value used when the parameter is not set
 *
 * @return the string stored by mca_base_param_reg_string(), or NULL if
 *         the registration call returned without storing anything.
 *         Callers in this file release the result with free().
 */
static inline char *mca_btl_tcp2_param_register_string(
    const char *param_name,
    const char *help_string,
    const char *default_value)
{
    /* Start from NULL: the return code of the registration call is not
       inspected, so the result must never be an indeterminate pointer
       (component close passes it to free()). */
    char *value = NULL;

    mca_base_param_reg_string(&mca_btl_tcp2_component.super.btl_version,
                              param_name, help_string, false, false,
                              default_value, &value);
    return value;
}
/**
 * Register an integer parameter of this component and return its value.
 *
 * @param param_name    name of the parameter (without component prefix)
 * @param help_string   help text for the parameter, may be NULL
 * @param default_value value used when the parameter is not set
 *
 * @return the value stored by mca_base_param_reg_int(); default_value if
 *         the registration call returned without storing anything.
 */
static inline int mca_btl_tcp2_param_register_int(
    const char *param_name,
    const char *help_string,
    int default_value)
{
    /* Seed with the default: the return code of the registration call is
       not inspected, so the result must never be an indeterminate int. */
    int value = default_value;

    mca_base_param_reg_int(&mca_btl_tcp2_component.super.btl_version,
                           param_name, help_string, false, false,
                           default_value, &value);
    return value;
}
/*
 * Data structure for accepting connections.
 *
 * One instance is created per accepted socket by the accept handler and
 * released by the receive handler once the socket becomes readable.
 * While alive, every instance is linked on the component's tcp_events
 * list (see the constructor/destructor of this class).
 */
struct mca_btl_tcp2_event_t {
/* list linkage; must stay first because the object is cast to and from
   opal_list_item_t when walking tcp_events */
opal_list_item_t item ;
/* event registered on the accepted socket */
opal_event_t event ;
} ;
typedef struct mca_btl_tcp2_event_t mca_btl_tcp2_event_t ;
/*
 * Class constructor: link the freshly created event object onto the
 * component-wide tcp_events list, under the component lock.
 */
static void mca_btl_tcp2_event_construct(mca_btl_tcp2_event_t *event)
{
    opal_list_item_t *entry = &event->item;

    OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
    opal_list_append(&mca_btl_tcp2_component.tcp_events, entry);
    OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
}
/*
 * Class destructor: unlink the event object from the component-wide
 * tcp_events list, under the component lock.
 */
static void mca_btl_tcp2_event_destruct(mca_btl_tcp2_event_t *event)
{
    opal_list_item_t *entry = &event->item;

    OPAL_THREAD_LOCK(&mca_btl_tcp2_component.tcp_lock);
    opal_list_remove_item(&mca_btl_tcp2_component.tcp_events, entry);
    OPAL_THREAD_UNLOCK(&mca_btl_tcp2_component.tcp_lock);
}
/*
 * Class definition for mca_btl_tcp2_event_t: parent class is
 * opal_list_item_t, with mca_btl_tcp2_event_construct and
 * mca_btl_tcp2_event_destruct as constructor and destructor.
 */
OBJ_CLASS_INSTANCE (
mca_btl_tcp2_event_t ,
opal_list_item_t ,
mca_btl_tcp2_event_construct ,
mca_btl_tcp2_event_destruct ) ;
/*
 * functions for receiving event callbacks
 *
 * Both are defined at the end of this file and share the event callback
 * signature (descriptor, event flags, user argument):
 *  - recv_handler:   runs when an accepted socket becomes readable
 *  - accept_handler: runs when a listening socket has pending connections
 */
static void mca_btl_tcp2_component_recv_handler ( int , short , void * ) ;
static void mca_btl_tcp2_component_accept_handler ( int , short , void * ) ;
/*
 * Called by MCA framework to open the component, registers
 * component parameters.
 *
 * Resets the component state (no listen sockets, no modules), constructs
 * the component-level objects, registers every component parameter and
 * fills in the defaults of the module template mca_btl_tcp2_module.
 *
 * @return OMPI_SUCCESS (always).
 */
int mca_btl_tcp2_component_open(void)
{
    char *message;
    int default_range;

    /* initialize state */
    mca_btl_tcp2_component.tcp_listen_sd = -1;
#if OPAL_WANT_IPV6
    mca_btl_tcp2_component.tcp6_listen_sd = -1;
#endif
    mca_btl_tcp2_component.tcp_num_btls = 0;
    mca_btl_tcp2_component.tcp_addr_count = 0;
    mca_btl_tcp2_component.tcp_btls = NULL;

    /* initialize objects */
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_procs, opal_hash_table_t);
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_events, opal_list_t);
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_eager, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_max, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_btl_tcp2_component.tcp_frag_user, ompi_free_list_t);
    opal_hash_table_init(&mca_btl_tcp2_component.tcp_procs, 256);

    /* register TCP component parameters */
    mca_btl_tcp2_component.tcp_num_links =
        mca_btl_tcp2_param_register_int(" links ", NULL, 1);
    mca_btl_tcp2_component.tcp_if_include =
        mca_btl_tcp2_param_register_string(" if_include ", " Comma-delimited list of devices or CIDR notation of networks to use for MPI communication (e.g., \" eth0,eth1 \" or \" 192.168.0.0/16,10.1.4.0/24 \" ). Mutually exclusive with btl_tcp2_if_exclude. ", " ");
    mca_btl_tcp2_component.tcp_if_exclude =
        mca_btl_tcp2_param_register_string(" if_exclude ", " Comma-delimited list of devices or CIDR notation of networks to NOT use for MPI communication -- all devices not matching these specifications will be used (e.g., \" eth0,eth1 \" or \" 192.168.0.0/16,10.1.4.0/24 \" ). Mutually exclusive with btl_tcp2_if_include. ", " lo,sppp ");

    mca_btl_tcp2_component.tcp_free_list_num =
        mca_btl_tcp2_param_register_int(" free_list_num ", NULL, 8);
    mca_btl_tcp2_component.tcp_free_list_max =
        mca_btl_tcp2_param_register_int(" free_list_max ", NULL, -1);
    mca_btl_tcp2_component.tcp_free_list_inc =
        mca_btl_tcp2_param_register_int(" free_list_inc ", NULL, 32);
    mca_btl_tcp2_component.tcp_sndbuf =
        mca_btl_tcp2_param_register_int(" sndbuf ", NULL, 128 * 1024);
    mca_btl_tcp2_component.tcp_rcvbuf =
        mca_btl_tcp2_param_register_int(" rcvbuf ", NULL, 128 * 1024);
    mca_btl_tcp2_component.tcp_endpoint_cache =
        mca_btl_tcp2_param_register_int(" endpoint_cache ",
            " The size of the internal cache for each TCP connection. This cache is "
            " used to reduce the number of syscalls, by replacing them with memcpy. "
            " Every read will read the expected data plus the amount of the "
            " endpoint_cache ", 30 * 1024);
    /* the parameter is phrased as "use Nagle", the field as "use nodelay",
       hence the negation */
    mca_btl_tcp2_component.tcp_use_nodelay =
        !mca_btl_tcp2_param_register_int(" use_nagle ", " Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency) ", 0);

    /* IPv4 port window: minimum port, then number of ports */
    mca_btl_tcp2_component.tcp_port_min =
        mca_btl_tcp2_param_register_int(" port_min_v4 ",
            " The minimum port where the TCP BTL will try to bind (default 1024) ", 1024);
    if (mca_btl_tcp2_component.tcp_port_min > USHRT_MAX) {
        orte_show_help(" help-mpi-btl-tcp2.txt ", " invalid minimum port ",
                       true, " v4 ", orte_process_info.nodename,
                       mca_btl_tcp2_component.tcp_port_min);
        mca_btl_tcp2_component.tcp_port_min = 1024;
    }
    default_range = (0x1 << 16) - mca_btl_tcp2_component.tcp_port_min - 1;
    /* asprintf() leaves the pointer undefined on failure; fall back to
       registering the parameter without a help text in that case */
    if (asprintf(&message,
                 " The number of ports where the TCP BTL will try to bind (default %d). "
                 " This parameter together with the port min, define a range of ports "
                 " where Open MPI will open sockets. ",
                 default_range) < 0) {
        message = NULL;
    }
    mca_btl_tcp2_component.tcp_port_range =
        mca_btl_tcp2_param_register_int(" port_range_v4 ", message,
                                        default_range);
    free(message);

#if OPAL_WANT_IPV6
    /* IPv6 port window: same scheme as for IPv4 */
    mca_btl_tcp2_component.tcp6_port_min =
        mca_btl_tcp2_param_register_int(" port_min_v6 ",
            " The minimum port where the TCP BTL will try to bind (default 1024) ", 1024);
    if (mca_btl_tcp2_component.tcp6_port_min > USHRT_MAX) {
        orte_show_help(" help-mpi-btl-tcp2.txt ", " invalid minimum port ",
                       true, " v6 ", orte_process_info.nodename,
                       mca_btl_tcp2_component.tcp6_port_min);
        mca_btl_tcp2_component.tcp6_port_min = 1024;
    }
    default_range = (0x1 << 16) - mca_btl_tcp2_component.tcp6_port_min - 1;
    if (asprintf(&message,
                 " The number of ports where the TCP BTL will try to bind (default %d). "
                 " This parameter together with the port min, define a range of ports "
                 " where Open MPI will open sockets. ",
                 default_range) < 0) {
        message = NULL;
    }
    mca_btl_tcp2_component.tcp6_port_range =
        mca_btl_tcp2_param_register_int(" port_range_v6 ", message,
                                        default_range);
    free(message);
#endif

    /* defaults of the module template; mca_btl_base_param_register()
       is then handed the template together with the component */
    mca_btl_tcp2_module.super.btl_exclusivity = MCA_BTL_EXCLUSIVITY_LOW + 100;
    mca_btl_tcp2_module.super.btl_eager_limit = 64 * 1024;
    mca_btl_tcp2_module.super.btl_rndv_eager_limit = 64 * 1024;
    mca_btl_tcp2_module.super.btl_max_send_size = 128 * 1024;
    mca_btl_tcp2_module.super.btl_rdma_pipeline_send_length = 128 * 1024;
    mca_btl_tcp2_module.super.btl_rdma_pipeline_frag_size = INT_MAX;
    mca_btl_tcp2_module.super.btl_min_rdma_pipeline_size = 0;
    mca_btl_tcp2_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
                                          MCA_BTL_FLAGS_SEND_INPLACE |
                                          MCA_BTL_FLAGS_NEED_CSUM |
                                          MCA_BTL_FLAGS_NEED_ACK |
                                          MCA_BTL_FLAGS_HETEROGENEOUS_RDMA;
    mca_btl_tcp2_module.super.btl_bandwidth = 100;
    mca_btl_tcp2_module.super.btl_latency = 100;
    mca_btl_base_param_register(&mca_btl_tcp2_component.super.btl_version,
                                &mca_btl_tcp2_module.super);

    /* compared against 4 and 6 when addresses are published */
    mca_btl_tcp2_component.tcp_disable_family =
        mca_btl_tcp2_param_register_int(" disable_family ", NULL, 0);

    return OMPI_SUCCESS;
}
/*
* module cleanup - sanity checking of queue lengths
*/
int mca_btl_tcp2_component_close ( void )
{
opal_list_item_t * item ;
opal_list_item_t * next ;
if ( NULL ! = mca_btl_tcp2_component . tcp_if_include ) {
free ( mca_btl_tcp2_component . tcp_if_include ) ;
mca_btl_tcp2_component . tcp_if_include = NULL ;
}
if ( NULL ! = mca_btl_tcp2_component . tcp_if_exclude ) {
free ( mca_btl_tcp2_component . tcp_if_exclude ) ;
mca_btl_tcp2_component . tcp_if_exclude = NULL ;
}
if ( NULL ! = mca_btl_tcp2_component . tcp_btls )
free ( mca_btl_tcp2_component . tcp_btls ) ;
if ( mca_btl_tcp2_component . tcp_listen_sd > = 0 ) {
opal_event_del ( & mca_btl_tcp2_component . tcp_recv_event ) ;
CLOSE_THE_SOCKET ( mca_btl_tcp2_component . tcp_listen_sd ) ;
mca_btl_tcp2_component . tcp_listen_sd = - 1 ;
}
# if OPAL_WANT_IPV6
if ( mca_btl_tcp2_component . tcp6_listen_sd > = 0 ) {
opal_event_del ( & mca_btl_tcp2_component . tcp6_recv_event ) ;
CLOSE_THE_SOCKET ( mca_btl_tcp2_component . tcp6_listen_sd ) ;
mca_btl_tcp2_component . tcp6_listen_sd = - 1 ;
}
# endif
/* cleanup any pending events */
OPAL_THREAD_LOCK ( & mca_btl_tcp2_component . tcp_lock ) ;
for ( item = opal_list_get_first ( & mca_btl_tcp2_component . tcp_events ) ;
item ! = opal_list_get_end ( & mca_btl_tcp2_component . tcp_events ) ;
item = next ) {
mca_btl_tcp2_event_t * event = ( mca_btl_tcp2_event_t * ) item ;
next = opal_list_get_next ( item ) ;
opal_event_del ( & event - > event ) ;
OBJ_RELEASE ( event ) ;
}
OPAL_THREAD_UNLOCK ( & mca_btl_tcp2_component . tcp_lock ) ;
/* release resources */
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_procs ) ;
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_events ) ;
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_frag_eager ) ;
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_frag_max ) ;
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_frag_user ) ;
OBJ_DESTRUCT ( & mca_btl_tcp2_component . tcp_lock ) ;
return OMPI_SUCCESS ;
}
/*
* Create a btl instance and add to modules list .
*/
static int mca_btl_tcp2_create ( int if_kindex , const char * if_name )
{
struct mca_btl_tcp2_module_t * btl ;
char param [ 256 ] ;
int i ;
for ( i = 0 ; i < ( int ) mca_btl_tcp2_component . tcp_num_links ; i + + ) {
btl = ( struct mca_btl_tcp2_module_t * ) malloc ( sizeof ( mca_btl_tcp2_module_t ) ) ;
if ( NULL = = btl )
return OMPI_ERR_OUT_OF_RESOURCE ;
memcpy ( btl , & mca_btl_tcp2_module , sizeof ( mca_btl_tcp2_module ) ) ;
OBJ_CONSTRUCT ( & btl - > tcp_endpoints , opal_list_t ) ;
mca_btl_tcp2_component . tcp_btls [ mca_btl_tcp2_component . tcp_num_btls + + ] = btl ;
/* initialize the btl */
btl - > tcp_ifkindex = ( uint16_t ) if_kindex ;
# if MCA_BTL_TCP_STATISTICS
btl - > tcp_bytes_recv = 0 ;
btl - > tcp_bytes_sent = 0 ;
btl - > tcp_send_handler = 0 ;
# endif
/* allow user to specify interface bandwidth */
sprintf ( param , " bandwidth_%s " , if_name ) ;
btl - > super . btl_bandwidth = mca_btl_tcp2_param_register_int ( param , NULL , btl - > super . btl_bandwidth ) ;
/* allow user to override/specify latency ranking */
sprintf ( param , " latency_%s " , if_name ) ;
btl - > super . btl_latency = mca_btl_tcp2_param_register_int ( param , NULL , btl - > super . btl_latency ) ;
if ( i > 0 ) {
btl - > super . btl_bandwidth > > = 1 ;
btl - > super . btl_latency < < = 1 ;
}
/* allow user to specify interface bandwidth */
sprintf ( param , " bandwidth_%s:%d " , if_name , i ) ;
btl - > super . btl_bandwidth = mca_btl_tcp2_param_register_int ( param , NULL , btl - > super . btl_bandwidth ) ;
/* allow user to override/specify latency ranking */
sprintf ( param , " latency_%s:%d " , if_name , i ) ;
btl - > super . btl_latency = mca_btl_tcp2_param_register_int ( param , NULL , btl - > super . btl_latency ) ;
#if 0 && OPAL_ENABLE_DEBUG
BTL_OUTPUT ( ( " interface %s instance %i: bandwidth %d latency %d \n " , if_name , i ,
btl - > super . btl_bandwidth , btl - > super . btl_latency ) ) ;
# endif
}
return OMPI_SUCCESS ;
}
/*
 * Go through a list of argv; if there are any subnet specifications
 * (a.b.c.d/e), resolve them to an interface name (currently only
 * supporting IPv4). If unresolvable, warn and remove.
 *
 * orig_str: in/out; address of a heap-allocated, comma-delimited list.
 *           On success the old string is freed and replaced by the
 *           re-joined, resolved list.
 * name:     label used in help and verbose messages (the callers in this
 *           file pass the include or exclude label).
 *
 * Returns the resolved argv array (callers in this file release it with
 * opal_argv_free()), or NULL if orig_str or *orig_str is NULL or the
 * split produced nothing.
 *
 * NOTE(review): isalpha() receives a plain char (cast to unsigned char
 * would be safer), the prefix is parsed with atoi() without validation,
 * and the strdup() results are not checked for NULL.
 */
static char * * split_and_resolve ( char * * orig_str , char * name )
{
int i , ret , save , if_index ;
char * * argv , * str , * tmp ;
char if_name [ IF_NAMESIZE ] ;
struct sockaddr_storage argv_inaddr , if_inaddr ;
uint32_t argv_prefix ;
/* Sanity check */
if ( NULL = = orig_str | | NULL = = * orig_str ) {
return NULL ;
}
argv = opal_argv_split ( * orig_str , ' , ' ) ;
if ( NULL = = argv ) {
return NULL ;
}
/* "save" is the write position of the compacted list, "i" the read
   position; entries that are dropped simply do not advance "save" */
for ( save = i = 0 ; NULL ! = argv [ i ] ; + + i ) {
/* entries starting with a letter are taken as interface names and
   kept unchanged */
if ( isalpha ( argv [ i ] [ 0 ] ) ) {
argv [ save + + ] = argv [ i ] ;
continue ;
}
/* Found a subnet notation. Convert it to an IP
   address/netmask. Get the prefix first. */
argv_prefix = 0 ;
/* keep an unmodified copy for the messages below; argv[i] is cut at
   the slash and freed before most of them are printed */
tmp = strdup ( argv [ i ] ) ;
str = strchr ( argv [ i ] , ' / ' ) ;
if ( NULL = = str ) {
orte_show_help ( " help-mpi-btl-tcp2.txt " , " invalid if_inexclude " ,
true , name , orte_process_info . nodename ,
tmp , " Invalid specification (missing \" / \" ) " ) ;
free ( argv [ i ] ) ;
free ( tmp ) ;
continue ;
}
/* terminate the address part at the slash; the prefix length follows */
* str = ' \0 ' ;
argv_prefix = atoi ( str + 1 ) ;
/* Now convert the IPv4 address */
( ( struct sockaddr * ) & argv_inaddr ) - > sa_family = AF_INET ;
ret = inet_pton ( AF_INET , argv [ i ] ,
& ( ( struct sockaddr_in * ) & argv_inaddr ) - > sin_addr ) ;
/* the original entry is no longer needed: it is either dropped or
   replaced by a freshly duplicated interface name */
free ( argv [ i ] ) ;
if ( 1 ! = ret ) {
orte_show_help ( " help-mpi-btl-tcp2.txt " , " invalid if_inexclude " ,
true , name , orte_process_info . nodename , tmp ,
" Invalid specification (inet_pton() failed) " ) ;
free ( tmp ) ;
continue ;
}
opal_output_verbose ( 20 , mca_btl_base_output ,
" btl: tcp: Searching for %s address+prefix: %s / %u " ,
name ,
opal_net_get_hostname ( ( struct sockaddr * ) & argv_inaddr ) ,
argv_prefix ) ;
/* Go through all interfaces and see if we can find a match */
for ( if_index = opal_ifbegin ( ) ; if_index > = 0 ;
if_index = opal_ifnext ( if_index ) ) {
opal_ifindextoaddr ( if_index ,
( struct sockaddr * ) & if_inaddr ,
sizeof ( if_inaddr ) ) ;
if ( opal_net_samenetwork ( ( struct sockaddr * ) & argv_inaddr ,
( struct sockaddr * ) & if_inaddr ,
argv_prefix ) ) {
break ;
}
}
/* If we didn't find a match, warn, drop this entry and move on to
   the next one */
if ( if_index < 0 ) {
orte_show_help ( " help-mpi-btl-tcp2.txt " , " invalid if_inexclude " ,
true , name , orte_process_info . nodename , tmp ,
" Did not find interface matching this subnet " ) ;
free ( tmp ) ;
continue ;
}
/* We found a match; get the name and replace it in the
   argv */
opal_ifindextoname ( if_index , if_name , sizeof ( if_name ) ) ;
opal_output_verbose ( 20 , mca_btl_base_output ,
" btl: tcp: Found match: %s (%s) " ,
opal_net_get_hostname ( ( struct sockaddr * ) & if_inaddr ) ,
if_name ) ;
argv [ save + + ] = strdup ( if_name ) ;
free ( tmp ) ;
}
/* The list may have been compressed if there were invalid
   entries, so ensure we end it with a NULL entry */
argv [ save ] = NULL ;
/* hand the resolved list back to the caller in string form as well */
free ( * orig_str ) ;
* orig_str = opal_argv_join ( argv , ' , ' ) ;
return argv ;
}
/*
* Create a TCP BTL instance for either :
* ( 1 ) all interfaces specified by the user
* ( 2 ) all available interfaces
* ( 3 ) all available interfaces except for those excluded by the user
*/
static int mca_btl_tcp2_component_create_instances ( void )
{
const int if_count = opal_ifcount ( ) ;
int if_index ;
int kif_count = 0 ;
int * kindexes = NULL ; /* this array is way too large, but never too small */
char * * include ;
char * * exclude ;
char * * argv ;
int ret = OMPI_SUCCESS ;
if ( if_count < = 0 ) {
return OMPI_ERROR ;
}
kindexes = ( int * ) malloc ( sizeof ( int ) * if_count ) ;
if ( NULL = = kindexes ) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}
/* calculate the number of kernel indexes (number of physical NICs) */
{
int j ;
/* initialize array to 0. Assumption: 0 isn't a valid kernel index */
memset ( kindexes , 0 , sizeof ( int ) * if_count ) ;
/* assign the corresponding kernel indexes for all opal_list indexes
* ( loop over all addresses )
*/
for ( if_index = opal_ifbegin ( ) ; if_index > = 0 ; if_index = opal_ifnext ( if_index ) ) {
int index = opal_ifindextokindex ( if_index ) ;
if ( index > 0 ) {
bool already_seen = false ;
for ( j = 0 ; ( false = = already_seen ) & & ( j < kif_count ) ; j + + ) {
if ( kindexes [ j ] = = index ) {
already_seen = true ;
}
}
if ( false = = already_seen ) {
kindexes [ kif_count ] = index ;
kif_count + + ;
}
}
}
}
/* allocate memory for btls */
mca_btl_tcp2_component . tcp_btls = ( mca_btl_tcp2_module_t * * ) malloc ( mca_btl_tcp2_component . tcp_num_links *
kif_count * sizeof ( mca_btl_tcp2_module_t * ) ) ;
if ( NULL = = mca_btl_tcp2_component . tcp_btls ) {
ret = OMPI_ERR_OUT_OF_RESOURCE ;
goto cleanup ;
}
mca_btl_tcp2_component . tcp_addr_count = if_count ;
/* if the user specified an interface list - use these exclusively */
argv = include = split_and_resolve ( & mca_btl_tcp2_component . tcp_if_include ,
" include " ) ;
while ( argv & & * argv ) {
char * if_name = * argv ;
int if_index = opal_ifnametokindex ( if_name ) ;
if ( if_index < 0 ) {
BTL_ERROR ( ( " invalid interface \" %s \" " , if_name ) ) ;
ret = OMPI_ERR_NOT_FOUND ;
goto cleanup ;
}
mca_btl_tcp2_create ( if_index , if_name ) ;
argv + + ;
}
opal_argv_free ( include ) ;
/* If we made any modules, then the "include" list was non-empty,
and therefore we ' re done . */
if ( mca_btl_tcp2_component . tcp_num_btls > 0 ) {
ret = OMPI_SUCCESS ;
goto cleanup ;
}
/* if the interface list was not specified by the user, create
* a BTL for each interface that was not excluded .
*/
exclude = split_and_resolve ( & mca_btl_tcp2_component . tcp_if_exclude ,
" exclude " ) ;
{
int i ;
for ( i = 0 ; i < kif_count ; i + + ) {
/* IF_NAMESIZE is defined in opal/util/if.h */
char if_name [ IF_NAMESIZE ] ;
if_index = kindexes [ i ] ;
opal_ifkindextoname ( if_index , if_name , sizeof ( if_name ) ) ;
/* check to see if this interface exists in the exclude list */
argv = exclude ;
while ( argv & & * argv ) {
if ( strncmp ( * argv , if_name , strlen ( * argv ) ) = = 0 )
break ;
argv + + ;
}
/* if this interface was not found in the excluded list, create a BTL */
if ( argv = = 0 | | * argv = = 0 ) {
mca_btl_tcp2_create ( if_index , if_name ) ;
}
}
}
opal_argv_free ( exclude ) ;
cleanup :
if ( NULL ! = kindexes ) {
free ( kindexes ) ;
}
return ret ;
}
/*
* Create a listen socket and bind to all interfaces
*/
static int mca_btl_tcp2_component_create_listen ( uint16_t af_family )
{
int flags ;
int sd ;
struct sockaddr_storage inaddr ;
opal_socklen_t addrlen ;
/* create a listen socket for incoming connections */
sd = socket ( af_family , SOCK_STREAM , 0 ) ;
if ( sd < 0 ) {
if ( EAFNOSUPPORT ! = opal_socket_errno ) {
BTL_ERROR ( ( " socket() failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
}
return OMPI_ERR_IN_ERRNO ;
}
mca_btl_tcp2_set_socket_options ( sd ) ;
# if OPAL_WANT_IPV6
{
struct addrinfo hints , * res = NULL ;
int error ;
memset ( & hints , 0 , sizeof ( hints ) ) ;
hints . ai_family = af_family ;
hints . ai_socktype = SOCK_STREAM ;
hints . ai_flags = AI_PASSIVE ;
if ( ( error = getaddrinfo ( NULL , " 0 " , & hints , & res ) ) ) {
opal_output ( 0 ,
" mca_btl_tcp2_create_listen: unable to resolve. %s \n " ,
gai_strerror ( error ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
memcpy ( & inaddr , res - > ai_addr , res - > ai_addrlen ) ;
addrlen = res - > ai_addrlen ;
freeaddrinfo ( res ) ;
# ifdef IPV6_V6ONLY
/* in case of AF_INET6, disable v4-mapped addresses */
if ( AF_INET6 = = af_family ) {
int flg = 1 ;
if ( setsockopt ( sd , IPPROTO_IPV6 , IPV6_V6ONLY ,
( char * ) & flg , sizeof ( flg ) ) < 0 ) {
opal_output ( 0 ,
" mca_btl_tcp2_create_listen: unable to disable v4-mapped addresses \n " ) ;
}
}
# endif /* IPV6_V6ONLY */
}
# else
( ( struct sockaddr_in * ) & inaddr ) - > sin_family = AF_INET ;
( ( struct sockaddr_in * ) & inaddr ) - > sin_addr . s_addr = INADDR_ANY ;
addrlen = sizeof ( struct sockaddr_in ) ;
# endif
{ /* Don't reuse ports */
int flg = 0 ;
if ( setsockopt ( sd , SOL_SOCKET , SO_REUSEADDR , ( const char * ) & flg , sizeof ( flg ) ) < 0 ) {
BTL_ERROR ( ( " mca_btl_tcp2_create_listen: unable to unset the "
" SO_REUSEADDR option (%s:%d) \n " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
}
{
int index , range , port ;
range = mca_btl_tcp2_component . tcp_port_range ;
port = mca_btl_tcp2_component . tcp_port_min ;
# if OPAL_WANT_IPV6
if ( AF_INET6 = = af_family ) {
range = mca_btl_tcp2_component . tcp6_port_range ;
port = mca_btl_tcp2_component . tcp6_port_min ;
}
# endif /* OPAL_WANT_IPV6 */
for ( index = 0 ; index < range ; index + + ) {
# if OPAL_WANT_IPV6
( ( struct sockaddr_in6 * ) & inaddr ) - > sin6_port = htons ( port + index ) ;
# else
( ( struct sockaddr_in * ) & inaddr ) - > sin_port = htons ( port + index ) ;
# endif /* OPAL_WANT_IPV6 */
if ( bind ( sd , ( struct sockaddr * ) & inaddr , addrlen ) < 0 ) {
if ( ( EADDRINUSE = = opal_socket_errno ) | | ( EADDRNOTAVAIL = = opal_socket_errno ) ) {
continue ;
}
BTL_ERROR ( ( " bind() failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
goto socket_binded ;
}
if ( AF_INET = = af_family ) {
BTL_ERROR ( ( " bind() failed: no port available in the range [%d..%d] " ,
mca_btl_tcp2_component . tcp_port_min ,
mca_btl_tcp2_component . tcp_port_min + range ) ) ;
}
# if OPAL_WANT_IPV6
if ( AF_INET6 = = af_family ) {
BTL_ERROR ( ( " bind6() failed: no port available in the range [%d..%d] " ,
mca_btl_tcp2_component . tcp6_port_min ,
mca_btl_tcp2_component . tcp6_port_min + range ) ) ;
}
# endif /* OPAL_WANT_IPV6 */
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
socket_binded :
/* resolve system assignend port */
if ( getsockname ( sd , ( struct sockaddr * ) & inaddr , & addrlen ) < 0 ) {
BTL_ERROR ( ( " getsockname() failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
if ( AF_INET = = af_family ) {
mca_btl_tcp2_component . tcp_listen_port = ( ( struct sockaddr_in * ) & inaddr ) - > sin_port ;
mca_btl_tcp2_component . tcp_listen_sd = sd ;
}
# if OPAL_WANT_IPV6
if ( AF_INET6 = = af_family ) {
mca_btl_tcp2_component . tcp6_listen_port = ( ( struct sockaddr_in6 * ) & inaddr ) - > sin6_port ;
mca_btl_tcp2_component . tcp6_listen_sd = sd ;
}
# endif
/* setup listen backlog to maximum allowed by kernel */
if ( listen ( sd , SOMAXCONN ) < 0 ) {
BTL_ERROR ( ( " listen() failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
/* set socket up to be non-blocking, otherwise accept could block */
if ( ( flags = fcntl ( sd , F_GETFL , 0 ) ) < 0 ) {
BTL_ERROR ( ( " fcntl(F_GETFL) failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
} else {
flags | = O_NONBLOCK ;
if ( fcntl ( sd , F_SETFL , flags ) < 0 ) {
BTL_ERROR ( ( " fcntl(F_SETFL) failed: %s (%d) " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return OMPI_ERROR ;
}
}
/* register listen port */
if ( AF_INET = = af_family ) {
opal_event_set ( opal_event_base , & mca_btl_tcp2_component . tcp_recv_event ,
mca_btl_tcp2_component . tcp_listen_sd ,
OPAL_EV_READ | OPAL_EV_PERSIST ,
mca_btl_tcp2_component_accept_handler ,
0 ) ;
opal_event_add ( & mca_btl_tcp2_component . tcp_recv_event , 0 ) ;
}
# if OPAL_WANT_IPV6
if ( AF_INET6 = = af_family ) {
opal_event_set ( opal_event_base , & mca_btl_tcp2_component . tcp6_recv_event ,
mca_btl_tcp2_component . tcp6_listen_sd ,
OPAL_EV_READ | OPAL_EV_PERSIST ,
mca_btl_tcp2_component_accept_handler ,
0 ) ;
opal_event_add ( & mca_btl_tcp2_component . tcp6_recv_event , 0 ) ;
}
# endif
return OMPI_SUCCESS ;
}
/*
* Register TCP module addressing information . The MCA framework
* will make this available to all peers .
*/
static int mca_btl_tcp2_component_exchange ( void )
{
int rc = 0 , index ;
size_t i = 0 ;
size_t size = mca_btl_tcp2_component . tcp_addr_count *
mca_btl_tcp2_component . tcp_num_links * sizeof ( mca_btl_tcp2_addr_t ) ;
/* adi@2007-04-12:
*
* We ' ll need to explain things a bit here :
* 1. We normally have as many BTLs as physical NICs .
* 2. With num_links , we now have num_btl = num_links * # NICs
* 3. we might have more than one address per NIC
*/
size_t xfer_size = 0 ; /* real size to transfer (may differ from 'size') */
size_t current_addr = 0 ;
if ( mca_btl_tcp2_component . tcp_num_btls ! = 0 ) {
mca_btl_tcp2_addr_t * addrs = ( mca_btl_tcp2_addr_t * ) malloc ( size ) ;
memset ( addrs , 0 , size ) ;
/* here we start populating our addresses */
for ( i = 0 ; i < mca_btl_tcp2_component . tcp_num_btls ; i + + ) {
for ( index = opal_ifbegin ( ) ; index > = 0 ;
index = opal_ifnext ( index ) ) {
struct sockaddr_storage my_ss ;
/* look if the address belongs to this (enabled) NIC.
* If not , go to next address
*/
if ( opal_ifindextokindex ( index ) ! =
mca_btl_tcp2_component . tcp_btls [ i ] - > tcp_ifkindex ) {
continue ;
}
if ( OPAL_SUCCESS ! =
opal_ifindextoaddr ( index , ( struct sockaddr * ) & my_ss ,
sizeof ( my_ss ) ) ) {
opal_output ( 0 ,
" btl_tcp2_component: problems getting address for index %i (kernel index %i) \n " ,
index , opal_ifindextokindex ( index ) ) ;
continue ;
}
if ( ( AF_INET = = my_ss . ss_family ) & &
( 4 ! = mca_btl_tcp2_component . tcp_disable_family ) ) {
memcpy ( & addrs [ current_addr ] . addr_inet ,
& ( ( struct sockaddr_in * ) & my_ss ) - > sin_addr ,
sizeof ( addrs [ 0 ] . addr_inet ) ) ;
addrs [ current_addr ] . addr_port =
mca_btl_tcp2_component . tcp_listen_port ;
addrs [ current_addr ] . addr_family = MCA_BTL_TCP_AF_INET ;
xfer_size + = sizeof ( mca_btl_tcp2_addr_t ) ;
addrs [ current_addr ] . addr_inuse = 0 ;
addrs [ current_addr ] . addr_ifkindex =
opal_ifindextokindex ( index ) ;
current_addr + + ;
}
# if OPAL_WANT_IPV6
if ( ( AF_INET6 = = my_ss . ss_family ) & &
( 6 ! = mca_btl_tcp2_component . tcp_disable_family ) ) {
memcpy ( & addrs [ current_addr ] . addr_inet ,
& ( ( struct sockaddr_in6 * ) & my_ss ) - > sin6_addr ,
sizeof ( addrs [ 0 ] . addr_inet ) ) ;
addrs [ current_addr ] . addr_port =
mca_btl_tcp2_component . tcp6_listen_port ;
addrs [ current_addr ] . addr_family = MCA_BTL_TCP_AF_INET6 ;
xfer_size + = sizeof ( mca_btl_tcp2_addr_t ) ;
addrs [ current_addr ] . addr_inuse = 0 ;
addrs [ current_addr ] . addr_ifkindex =
opal_ifindextokindex ( index ) ;
current_addr + + ;
}
# endif
} /* end of for opal_ifbegin() */
} /* end of for tcp_num_btls */
rc = ompi_modex_send ( & mca_btl_tcp2_component . super . btl_version ,
addrs , xfer_size ) ;
free ( addrs ) ;
} /* end if */
return rc ;
}
/*
 * TCP module initialization:
 * (1) read interface list from kernel and compare against module parameters
 *     then create a BTL instance for selected interfaces
 * (2) setup TCP listen socket for incoming connection attempts
 * (3) register BTL parameters with the MCA
 *
 * On success, returns a newly allocated copy of the module pointer array
 * and stores the number of modules in *num_btl_modules. On any failure
 * NULL is returned and *num_btl_modules stays 0.
 */
mca_btl_base_module_t **mca_btl_tcp2_component_init(int *num_btl_modules,
                                                    bool enable_progress_threads,
                                                    bool enable_mpi_threads)
{
    int ret = OMPI_SUCCESS;
    mca_btl_base_module_t **modules;

    *num_btl_modules = 0;

    /* initialize free lists: eager fragments ... */
    ompi_free_list_init_new(&mca_btl_tcp2_component.tcp_frag_eager,
                            sizeof(mca_btl_tcp2_frag_eager_t) +
                            mca_btl_tcp2_module.super.btl_eager_limit,
                            opal_cache_line_size,
                            OBJ_CLASS(mca_btl_tcp2_frag_eager_t),
                            0, opal_cache_line_size,
                            mca_btl_tcp2_component.tcp_free_list_num,
                            mca_btl_tcp2_component.tcp_free_list_max,
                            mca_btl_tcp2_component.tcp_free_list_inc,
                            NULL);

    /* ... maximum-size send fragments ... */
    ompi_free_list_init_new(&mca_btl_tcp2_component.tcp_frag_max,
                            sizeof(mca_btl_tcp2_frag_max_t) +
                            mca_btl_tcp2_module.super.btl_max_send_size,
                            opal_cache_line_size,
                            OBJ_CLASS(mca_btl_tcp2_frag_max_t),
                            0, opal_cache_line_size,
                            mca_btl_tcp2_component.tcp_free_list_num,
                            mca_btl_tcp2_component.tcp_free_list_max,
                            mca_btl_tcp2_component.tcp_free_list_inc,
                            NULL);

    /* ... and user fragments, which carry no payload area */
    ompi_free_list_init_new(&mca_btl_tcp2_component.tcp_frag_user,
                            sizeof(mca_btl_tcp2_frag_user_t),
                            opal_cache_line_size,
                            OBJ_CLASS(mca_btl_tcp2_frag_user_t),
                            0, opal_cache_line_size,
                            mca_btl_tcp2_component.tcp_free_list_num,
                            mca_btl_tcp2_component.tcp_free_list_max,
                            mca_btl_tcp2_component.tcp_free_list_inc,
                            NULL);

    /* create a BTL TCP module for selected interfaces */
    ret = mca_btl_tcp2_component_create_instances();
    if (OMPI_SUCCESS != ret) {
        return NULL;
    }

    /* create a TCP listen socket for incoming connection attempts */
    ret = mca_btl_tcp2_component_create_listen(AF_INET);
    if (OMPI_SUCCESS != ret) {
        return NULL;
    }
#if OPAL_WANT_IPV6
    /* an IPv6 failure is fatal unless it merely says that the address
       family is not supported on this host */
    ret = mca_btl_tcp2_component_create_listen(AF_INET6);
    if (OMPI_SUCCESS != ret) {
        if (OMPI_ERR_IN_ERRNO != OPAL_SOS_GET_ERROR_CODE(ret) ||
            EAFNOSUPPORT != opal_socket_errno) {
            opal_output(0, " mca_btl_tcp2_component: IPv6 listening socket failed \n ");
            return NULL;
        }
    }
#endif

    /* publish TCP parameters with the MCA framework */
    ret = mca_btl_tcp2_component_exchange();
    if (OMPI_SUCCESS != ret) {
        return NULL;
    }

    /* hand a private copy of the module pointer array to the caller */
    modules = (mca_btl_base_module_t **) malloc(mca_btl_tcp2_component.tcp_num_btls *
                                                sizeof(mca_btl_base_module_t *));
    if (NULL == modules) {
        return NULL;
    }
    memcpy(modules, mca_btl_tcp2_component.tcp_btls,
           mca_btl_tcp2_component.tcp_num_btls * sizeof(mca_btl_tcp2_module_t *));
    *num_btl_modules = mca_btl_tcp2_component.tcp_num_btls;
    return modules;
}
/*
 * TCP module control
 *
 * Placeholder: no control operation is implemented, every request is
 * accepted and ignored.
 */
int mca_btl_tcp2_component_control(int param, void *value, size_t size)
{
    /* none of the arguments is interpreted */
    (void) param;
    (void) value;
    (void) size;

    return OMPI_SUCCESS;
}
/**
* Called by the event engine when the listening socket has
* a connection event . Accept the incoming connection request
* and queue them for completion of the connection handshake .
*/
static void mca_btl_tcp2_component_accept_handler ( int incoming_sd ,
short ignored ,
void * unused )
{
while ( true ) {
# if OPAL_WANT_IPV6
struct sockaddr_in6 addr ;
# else
struct sockaddr_in addr ;
# endif
opal_socklen_t addrlen = sizeof ( addr ) ;
mca_btl_tcp2_event_t * event ;
int sd = accept ( incoming_sd , ( struct sockaddr * ) & addr , & addrlen ) ;
if ( sd < 0 ) {
if ( opal_socket_errno = = EINTR )
continue ;
if ( opal_socket_errno ! = EAGAIN & & opal_socket_errno ! = EWOULDBLOCK )
BTL_ERROR ( ( " accept() failed: %s (%d). " ,
strerror ( opal_socket_errno ) , opal_socket_errno ) ) ;
return ;
}
mca_btl_tcp2_set_socket_options ( sd ) ;
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW ( mca_btl_tcp2_event_t ) ;
opal_event_set ( opal_event_base , & event - > event , sd , OPAL_EV_READ , mca_btl_tcp2_component_recv_handler , event ) ;
opal_event_add ( & event - > event , 0 ) ;
}
}
/**
 * Event callback when there is data available on the registered
 * socket to recv. This callback is triggered only once per lifetime
 * for any socket, in the beginning when we setup the handshake
 * protocol.
 *
 * Reads the peer's process identifier, switches the socket to
 * non-blocking mode, looks up the matching process and offers the
 * connection to it. The socket is closed whenever the identifier cannot
 * be read in full, the process is unknown, the peer address cannot be
 * determined, or the connection is not accepted.
 *
 * @param sd    accepted socket
 * @param flags event flags (unused)
 * @param user  the mca_btl_tcp2_event_t created by the accept handler;
 *              it is released here
 */
static void mca_btl_tcp2_component_recv_handler(int sd, short flags, void *user)
{
    orte_process_name_t guid;
    struct sockaddr_storage addr;
    int retval;
    int sock_flags; /* fcntl() status flags are an int; the short event
                       flags parameter must not be reused for them */
    mca_btl_tcp2_proc_t *btl_proc;
    opal_socklen_t addr_len = sizeof(addr);
    mca_btl_tcp2_event_t *event = (mca_btl_tcp2_event_t *) user;

    (void) flags;

    /* the bookkeeping object has served its purpose */
    OBJ_RELEASE(event);

    /* recv the process identifier */
    retval = recv(sd, (char *) &guid, sizeof(guid), 0);
    if (retval != sizeof(guid)) {
        CLOSE_THE_SOCKET(sd);
        return;
    }
    ORTE_PROCESS_NAME_NTOH(guid);

    /* now set socket up to be non-blocking; failures are reported but
       do not abort the handshake */
    sock_flags = fcntl(sd, F_GETFL, 0);
    if (sock_flags < 0) {
        BTL_ERROR((" fcntl(F_GETFL) failed: %s (%d) ",
                   strerror(opal_socket_errno), opal_socket_errno));
    } else {
        sock_flags |= O_NONBLOCK;
        if (fcntl(sd, F_SETFL, sock_flags) < 0) {
            BTL_ERROR((" fcntl(F_SETFL) failed: %s (%d) ",
                       strerror(opal_socket_errno), opal_socket_errno));
        }
    }

    /* lookup the corresponding process */
    btl_proc = mca_btl_tcp2_proc_lookup(&guid);
    if (NULL == btl_proc) {
        CLOSE_THE_SOCKET(sd);
        return;
    }

    /* lookup peer address */
    if (getpeername(sd, (struct sockaddr *) &addr, &addr_len) != 0) {
        BTL_ERROR((" getpeername() failed: %s (%d) ",
                   strerror(opal_socket_errno), opal_socket_errno));
        CLOSE_THE_SOCKET(sd);
        return;
    }

    /* are there any existing peer instances willing to accept this connection */
    if (mca_btl_tcp2_proc_accept(btl_proc, (struct sockaddr *) &addr, sd) == false) {
        CLOSE_THE_SOCKET(sd);
        return;
    }
}