2007-11-14 02:39:16 +03:00
/*
* Copyright ( c ) 2004 - 2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation . All rights reserved .
* Copyright ( c ) 2004 - 2006 The University of Tennessee and The University
* of Tennessee Research Foundation . All rights
* reserved .
* Copyright ( c ) 2004 - 2005 High Performance Computing Center Stuttgart ,
* University of Stuttgart . All rights reserved .
* Copyright ( c ) 2004 - 2005 The Regents of the University of California .
* All rights reserved .
2008-10-28 21:29:57 +03:00
* Copyright ( c ) 2008 Cisco Systems , Inc . All rights reserved .
2008-10-29 18:15:37 +03:00
* Copyright ( c ) 2008 Sun Microsystems , Inc . All rights reserved .
2007-11-14 02:39:16 +03:00
* $ COPYRIGHT $
*
* Additional copyrights may follow
*
* $ HEADER $
*
* In windows , many of the socket functions return an EWOULDBLOCK
* instead of \ things like EAGAIN , EINPROGRESS , etc . It has been
* verified that this will \ not conflict with other error codes that
* are returned by these functions \ under UNIX / Linux environments
*/
# include "ompi_config.h"
# include "opal/opal_socket_errno.h"
# ifdef HAVE_UNISTD_H
# include <unistd.h>
# endif
# include <string.h>
# include <fcntl.h>
# ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
# endif
# ifdef HAVE_SYS_SOCKET_H
# include <sys/socket.h>
# endif
# ifdef HAVE_NETINET_IN_H
# include <netinet/in.h>
# endif
# ifdef HAVE_ARPA_INET_H
# include <arpa/inet.h>
# endif
# include "ompi/constants.h"
# include "opal/event/event.h"
# include "opal/util/if.h"
# include "opal/util/argv.h"
# include "ompi/mca/btl/btl.h"
# include "opal/mca/base/mca_base_param.h"
# include "ompi/runtime/ompi_module_exchange.h"
2008-10-28 21:29:57 +03:00
# include "ompi/runtime/mpiruntime.h"
2007-11-14 02:39:16 +03:00
# include "ompi/mca/mpool/base/base.h"
# include "ompi/mca/btl/base/btl_base_error.h"
# include "btl_sctp.h"
# include "btl_sctp_addr.h"
# include "btl_sctp_proc.h"
# include "btl_sctp_frag.h"
# include "btl_sctp_endpoint.h"
# include "ompi/mca/btl/base/base.h"
# include "ompi/datatype/convertor.h"
# include <netinet/sctp.h>
# include "btl_sctp_recv_handler.h"
# include "btl_sctp_component.h"
mca_btl_sctp_component_t mca_btl_sctp_component = {
{
/* First, the mca_base_component_t struct containing meta information
about the component itself */
{
2008-07-29 02:40:57 +04:00
MCA_BTL_BASE_VERSION_2_0_0 ,
2007-11-14 02:39:16 +03:00
" sctp " , /* MCA component name */
OMPI_MAJOR_VERSION , /* MCA component major version */
OMPI_MINOR_VERSION , /* MCA component minor version */
OMPI_RELEASE_VERSION , /* MCA component release version */
mca_btl_sctp_component_open , /* component open */
mca_btl_sctp_component_close /* component close */
} ,
{
/* Whether the component is checkpointable or not */
MCA_BASE_METADATA_PARAM_CHECKPOINT
} ,
mca_btl_sctp_component_init ,
NULL ,
}
} ;
2007-11-17 05:56:27 +03:00
# if MCA_BTL_SCTP_DONT_USE_HASH
struct mca_btl_sctp_proc_table_node * recvr_proc_table ;
struct mca_btl_sctp_proc_table_node * sender_proc_table ;
# endif
2007-11-14 02:39:16 +03:00
/*
* utility routines for parameter registration
*/
static inline char * mca_btl_sctp_param_register_string (
const char * param_name ,
const char * default_value )
{
char * param_value ;
char * help_string = NULL ;
mca_base_param_reg_string ( & mca_btl_sctp_component . super . btl_version ,
param_name , help_string , false , false ,
default_value , & param_value ) ;
return param_value ;
}
static inline int mca_btl_sctp_param_register_int (
const char * param_name ,
int default_value )
{
int param_value ;
char * help_string = NULL ;
mca_base_param_reg_int ( & mca_btl_sctp_component . super . btl_version ,
param_name , help_string , false , false ,
default_value , & param_value ) ;
return param_value ;
}
/*
* Data structure for accepting connections .
*/
struct mca_btl_sctp_event_t {
opal_list_item_t item ;
opal_event_t event ;
} ;
typedef struct mca_btl_sctp_event_t mca_btl_sctp_event_t ;
static void mca_btl_sctp_event_construct ( mca_btl_sctp_event_t * event )
{
OPAL_THREAD_LOCK ( & mca_btl_sctp_component . sctp_lock ) ;
opal_list_append ( & mca_btl_sctp_component . sctp_events , & event - > item ) ;
OPAL_THREAD_UNLOCK ( & mca_btl_sctp_component . sctp_lock ) ;
}
static void mca_btl_sctp_event_destruct ( mca_btl_sctp_event_t * event )
{
OPAL_THREAD_LOCK ( & mca_btl_sctp_component . sctp_lock ) ;
opal_list_remove_item ( & mca_btl_sctp_component . sctp_events , & event - > item ) ;
OPAL_THREAD_UNLOCK ( & mca_btl_sctp_component . sctp_lock ) ;
}
OBJ_CLASS_INSTANCE (
mca_btl_sctp_event_t ,
opal_list_item_t ,
mca_btl_sctp_event_construct ,
mca_btl_sctp_event_destruct ) ;
/*
* functions for receiving event callbacks
*/
static void mca_btl_sctp_component_recv_handler ( int , short , void * ) ; /* for 1-1 */
/* mca_btl_sctp_recv_handler(int, short, void*) for 1-many is in btl_sctp_recv_handler.h */
/*
* Called by MCA framework to open the component , registers
* component parameters .
*/
int mca_btl_sctp_component_open ( void )
{
# ifdef __WINDOWS__
WSADATA win_sock_data ;
if ( WSAStartup ( MAKEWORD ( 2 , 2 ) , & win_sock_data ) ! = 0 ) {
BTL_ERROR ( ( " failed to initialise windows sockets:%d " , WSAGetLastError ( ) ) ) ;
return OMPI_ERROR ;
}
# endif
/* initialize state */
mca_btl_sctp_component . sctp_listen_sd = - 1 ;
/* TODO different sd for ipv6 */
mca_btl_sctp_component . sctp_num_btls = 0 ;
/* addr_count */
mca_btl_sctp_component . sctp_btls = NULL ;
/* initialize objects */
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_lock , opal_mutex_t ) ;
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_procs , opal_hash_table_t ) ;
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_events , opal_list_t ) ;
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_frag_eager , ompi_free_list_t ) ;
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_frag_max , ompi_free_list_t ) ;
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_frag_user , ompi_free_list_t ) ;
opal_hash_table_init ( & mca_btl_sctp_component . sctp_procs , 256 ) ;
# if MCA_BTL_SCTP_DONT_USE_HASH
/* TODO make this only allocate how much it needs to. Currently
* allocates 256 ( to match sctp_procs ) . recvr_proc_table and
* sender_proc_table are malloc ' d in mca_btl_sctp_component_init .
*/
recvr_proc_table = NULL ;
sender_proc_table = NULL ;
# else
OBJ_CONSTRUCT ( & mca_btl_sctp_component . sctp_assocID_hash , opal_hash_table_t ) ;
opal_hash_table_init ( & mca_btl_sctp_component . sctp_assocID_hash , 256 ) ;
# endif
/* register SCTP component parameters */
/* num links */
mca_btl_sctp_component . sctp_if_include =
mca_btl_sctp_param_register_string ( " if_include " , " " ) ;
mca_btl_sctp_component . sctp_if_exclude =
mca_btl_sctp_param_register_string ( " if_exclude " , " lo " ) ;
mca_btl_sctp_component . sctp_free_list_num =
mca_btl_sctp_param_register_int ( " free_list_num " , 8 ) ;
mca_btl_sctp_component . sctp_free_list_max =
mca_btl_sctp_param_register_int ( " free_list_max " , - 1 ) ;
mca_btl_sctp_component . sctp_free_list_inc =
mca_btl_sctp_param_register_int ( " free_list_inc " , 32 ) ;
mca_btl_sctp_component . sctp_sndbuf =
mca_btl_sctp_param_register_int ( " sndbuf " , 128 * 1024 ) ;
mca_btl_sctp_component . sctp_rcvbuf =
mca_btl_sctp_param_register_int ( " rcvbuf " , 128 * 1024 ) ;
mca_btl_sctp_component . sctp_endpoint_cache =
mca_btl_sctp_param_register_int ( " endpoint_cache " , 30 * 1024 ) ;
mca_btl_sctp_component . sctp_use_nodelay =
! mca_btl_sctp_param_register_int ( " use_nagle " , 0 ) ;
/* port_min */
/* port_range */
2007-12-15 06:28:10 +03:00
/* use a single one-to-many socket by default except in Solaris (see
* the configure . m4 file )
*/
2008-03-17 12:18:31 +03:00
mca_base_param_reg_int ( & mca_btl_sctp_component . super . btl_version ,
" if_11 " , " If 0, have one SCTP BTL module and let SCTP do multilink scheduling. If non-zero, have an SCTP BTL module per link and let the PML do the scheduling. " ,
false , false ,
OMPI_MCA_BTL_SCTP_USE_ONE_TO_ONE_SOCKET ,
& mca_btl_sctp_component . sctp_if_11 ) ;
2007-11-14 02:39:16 +03:00
/* have lower exclusivity than tcp */
2007-12-12 18:55:37 +03:00
mca_btl_sctp_module . super . btl_exclusivity = MCA_BTL_EXCLUSIVITY_LOW ;
2007-11-14 02:39:16 +03:00
mca_btl_sctp_module . super . btl_eager_limit = 64 * 1024 ;
2007-12-16 11:35:17 +03:00
mca_btl_sctp_module . super . btl_rndv_eager_limit = 64 * 1024 ;
2007-11-14 02:39:16 +03:00
mca_btl_sctp_module . super . btl_max_send_size = 128 * 1024 ;
mca_btl_sctp_module . super . btl_rdma_pipeline_send_length = 128 * 1024 ;
mca_btl_sctp_module . super . btl_rdma_pipeline_frag_size = INT_MAX ;
mca_btl_sctp_module . super . btl_min_rdma_pipeline_size = 0 ;
mca_btl_sctp_module . super . btl_flags = MCA_BTL_FLAGS_PUT |
MCA_BTL_FLAGS_SEND_INPLACE |
MCA_BTL_FLAGS_NEED_CSUM |
MCA_BTL_FLAGS_NEED_ACK |
MCA_BTL_FLAGS_HETEROGENEOUS_RDMA ;
mca_btl_sctp_module . super . btl_bandwidth = 100 ;
mca_btl_sctp_module . super . btl_latency = 100 ;
mca_btl_base_param_register ( & mca_btl_sctp_component . super . btl_version ,
& mca_btl_sctp_module . super ) ;
/* disable_family */
/* setup receive buffer */
if ( 0 = = mca_btl_sctp_recv_handler_initbuf ( ) ) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}
return OMPI_SUCCESS ;
}
/*
* module cleanup - sanity checking of queue lengths
*/
int mca_btl_sctp_component_close ( void )
{
opal_list_item_t * item ;
2008-11-04 19:46:45 +03:00
opal_list_item_t * next ;
2007-11-14 02:39:16 +03:00
if ( NULL ! = mca_btl_sctp_component . sctp_if_include ) {
free ( mca_btl_sctp_component . sctp_if_include ) ;
}
if ( NULL ! = mca_btl_sctp_component . sctp_if_exclude ) {
free ( mca_btl_sctp_component . sctp_if_exclude ) ;
}
if ( NULL ! = mca_btl_sctp_component . sctp_btls ) {
free ( mca_btl_sctp_component . sctp_btls ) ;
}
mca_btl_sctp_recv_handler_freebuf ( ) ;
if ( mca_btl_sctp_component . sctp_listen_sd > = 0 ) {
opal_event_del ( & mca_btl_sctp_component . sctp_recv_event ) ;
CLOSE_THE_SOCKET ( mca_btl_sctp_component . sctp_listen_sd ) ;
mca_btl_sctp_component . sctp_listen_sd = - 1 ;
}
/* cleanup any pending events */
OPAL_THREAD_LOCK ( & mca_btl_sctp_component . sctp_lock ) ;
2008-11-04 19:46:45 +03:00
for ( item = opal_list_get_first ( & mca_btl_sctp_component . sctp_events ) ;
item ! = opal_list_get_end ( & mca_btl_sctp_component . sctp_events ) ;
item = next ) {
mca_btl_sctp_event_t * event = ( mca_btl_sctp_event_t * ) item ;
next = opal_list_get_next ( item ) ;
opal_event_del ( & event - > event ) ;
OBJ_RELEASE ( event ) ;
2007-11-14 02:39:16 +03:00
}
OPAL_THREAD_UNLOCK ( & mca_btl_sctp_component . sctp_lock ) ;
/* release resources */
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_procs ) ;
# if MCA_BTL_SCTP_DONT_USE_HASH
if ( NULL ! = recvr_proc_table ) {
free ( recvr_proc_table ) ;
}
if ( NULL ! = sender_proc_table ) {
free ( sender_proc_table ) ;
}
# else
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_assocID_hash ) ;
# endif
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_events ) ;
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_frag_eager ) ;
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_frag_max ) ;
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_frag_user ) ;
OBJ_DESTRUCT ( & mca_btl_sctp_component . sctp_lock ) ;
# ifdef __WINDOWS__
WSACleanup ( ) ;
# endif
return OMPI_SUCCESS ;
}
/*
* Create a btl instance and add to modules list .
*/
static int mca_btl_sctp_create ( int if_index , const char * if_name )
{
if ( mca_btl_sctp_component . sctp_if_11 ) {
char param [ 256 ] ;
struct mca_btl_sctp_module_t * btl = ( struct mca_btl_sctp_module_t * ) malloc ( sizeof ( mca_btl_sctp_module_t ) ) ;
if ( NULL = = btl ) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}
memcpy ( btl , & mca_btl_sctp_module , sizeof ( mca_btl_sctp_module ) ) ;
OBJ_CONSTRUCT ( & btl - > sctp_endpoints , opal_list_t ) ;
mca_btl_sctp_component . sctp_btls [ mca_btl_sctp_component . sctp_num_btls + + ] = btl ;
/* initialize the btl */
btl - > sctp_ifindex = if_index ;
# if MCA_BTL_SCTP_STATISTICS
btl - > sctp_bytes_recv = 0 ;
btl - > sctp_bytes_sent = 0 ;
btl - > sctp_send_handler = 0 ;
# endif
opal_ifindextoaddr ( if_index , ( struct sockaddr * ) & btl - > sctp_ifaddr , sizeof ( btl - > sctp_ifaddr ) ) ;
/* prepare for bind call later before connect */
btl - > sctp_ifaddr . sin_family = AF_INET ;
# ifdef FREEBSD
btl - > sctp_ifaddr . sin_len = sizeof ( struct sockaddr ) ;
# endif
btl - > sctp_ifaddr . sin_port = 0 ;
opal_ifindextomask ( if_index , ( uint32_t * ) & btl - > sctp_ifmask , sizeof ( btl - > sctp_ifmask ) ) ;
/* allow user to specify interface bandwidth */
sprintf ( param , " bandwidth_%s " , if_name ) ;
btl - > super . btl_bandwidth = mca_btl_sctp_param_register_int ( param , 0 ) ;
/* allow user to override/specify latency ranking */
sprintf ( param , " latency_%s " , if_name ) ;
btl - > super . btl_latency = mca_btl_sctp_param_register_int ( param , 0 ) ;
#if 0 && OMPI_ENABLE_DEBUG
BTL_OUTPUT ( ( " interface: %s bandwidth %d latency %d " ,
if_name , btl - > super . btl_bandwidth , btl - > super . btl_latency ) ) ;
# endif
return OMPI_SUCCESS ;
}
else {
/* 1 to many */
struct mca_btl_sctp_module_t * btl ;
char param [ 256 ] ;
struct sockaddr_in next_ifaddr ;
socklen_t len = sizeof ( struct sockaddr_in ) ;
opal_socklen_t addrlen ;
/* check if this is the first time this function is being called */
if ( 0 = = mca_btl_sctp_component . sctp_num_btls ) {
/* fill in btl struct with first interface's information (arbitary) */
btl = ( struct mca_btl_sctp_module_t * ) malloc ( sizeof ( mca_btl_sctp_module_t ) ) ;
if ( NULL = = btl ) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}
memcpy ( btl , & mca_btl_sctp_module , sizeof ( mca_btl_sctp_module ) ) ;
OBJ_CONSTRUCT ( & btl - > sctp_endpoints , opal_list_t ) ;
mca_btl_sctp_component . sctp_btls [ mca_btl_sctp_component . sctp_num_btls + + ] = btl ;
/* initialize the btl */
btl - > sctp_ifindex = if_index ;
# if MCA_BTL_SCTP_STATISTICS
btl - > sctp_bytes_recv = 0 ;
btl - > sctp_bytes_sent = 0 ;
btl - > sctp_send_handler = 0 ;
# endif
opal_ifindextoaddr ( if_index , ( struct sockaddr * ) & btl - > sctp_ifaddr , sizeof ( btl - > sctp_ifaddr ) ) ;
opal_ifindextomask ( if_index , ( uint32_t * ) & btl - > sctp_ifmask , sizeof ( btl - > sctp_ifmask ) ) ;
/* allow user to specify interface bandwidth */
sprintf ( param , " bandwidth_%s " , if_name ) ;
btl - > super . btl_bandwidth = mca_btl_sctp_param_register_int ( param , 0 ) ;
/* allow user to override/specify latency ranking */
sprintf ( param , " latency_%s " , if_name ) ;
btl - > super . btl_latency = mca_btl_sctp_param_register_int ( param , 0 ) ;
#if 0 && OMPI_ENABLE_DEBUG
BTL_OUTPUT ( ( " interface: %s bandwidth %d latency %d " ,
if_name , btl - > super . btl_bandwidth , btl - > super . btl_latency ) ) ;
# endif
/* call bind to this (initial) addr */
opal_ifindextoaddr ( if_index , ( struct sockaddr * ) & next_ifaddr , sizeof ( next_ifaddr ) ) ;
next_ifaddr . sin_family = AF_INET ;
# ifdef FREEBSD
next_ifaddr . sin_len = sizeof ( struct sockaddr ) ;
# endif
next_ifaddr . sin_port = 0 ;
if ( bind ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & next_ifaddr , len ) < 0 ) {
return OMPI_ERR_FATAL ;
}
/* resolve system assignend port */
addrlen = sizeof ( struct sockaddr_in ) ;
if ( getsockname ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & next_ifaddr , & addrlen ) < 0 ) {
BTL_ERROR ( ( " getsockname() failed with errno=%d " , opal_socket_errno ) ) ;
return OMPI_ERROR ;
}
/* need to get the port after the first bind call for subsequent
* sctp_bindx calls .
*/
mca_btl_sctp_component . sctp_listen_port = next_ifaddr . sin_port ;
}
else {
next_ifaddr . sin_port = htons ( ( unsigned short ) mca_btl_sctp_component . sctp_listen_port ) ;
/* add this addr to bindx */
opal_ifindextoaddr ( if_index , ( struct sockaddr * ) & next_ifaddr , sizeof ( next_ifaddr ) ) ;
next_ifaddr . sin_family = AF_INET ;
# ifdef FREEBSD
next_ifaddr . sin_len = sizeof ( struct sockaddr ) ;
# endif
if ( sctp_bindx ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & next_ifaddr ,
1 , SCTP_BINDX_ADD_ADDR ) < 0 ) {
return OMPI_ERR_FATAL ;
}
}
return OMPI_SUCCESS ;
}
}
/*
* Create SCTP BTL instance ( s ) using either :
* ( 1 ) all interfaces specified by the user
* ( 2 ) all available interfaces
* ( 3 ) all available interfaces except for those excluded by the user
*
* For 1 - 1 sockets , have a BTL per interface .
* For 1 - many sockets , use bind for the first interface and sctp_bindx for each interface thereafter .
*
*/
static int mca_btl_sctp_component_create_instance ( void )
{
int if_count = opal_ifcount ( ) ;
int if_index ;
char * * include ;
char * * exclude ;
char * * argv ;
if ( if_count < = 0 ) {
return OMPI_ERROR ;
}
2008-10-29 18:15:37 +03:00
/* Allocate memory for btl pointers. This may be more space then
we need as some of the interfaces may get filtered out by the
if_include and if_exclude parameters . But that is just a few
unused pointers . */
mca_btl_sctp_component . sctp_btls =
( mca_btl_sctp_module_t * * ) malloc ( if_count * sizeof ( mca_btl_sctp_module_t * ) ) ;
2007-11-14 02:39:16 +03:00
if ( NULL = = mca_btl_sctp_component . sctp_btls ) {
return OMPI_ERR_OUT_OF_RESOURCE ;
}
/* if the user specified an interface list - use these exclusively */
argv = include = opal_argv_split ( mca_btl_sctp_component . sctp_if_include , ' , ' ) ;
while ( argv & & * argv ) {
char * if_name = * argv ;
int if_index = opal_ifnametoindex ( if_name ) ;
if ( if_index < 0 ) {
BTL_ERROR ( ( " invalid interface \" %s \" " , if_name ) ) ;
} else {
mca_btl_sctp_create ( if_index , if_name ) ;
}
argv + + ;
}
opal_argv_free ( include ) ;
if ( mca_btl_sctp_component . sctp_num_btls ) {
return OMPI_SUCCESS ;
}
/* if the interface list was not specified by the user, create
* a BTL for each interface that was not excluded .
*/
exclude = opal_argv_split ( mca_btl_sctp_component . sctp_if_exclude , ' , ' ) ;
for ( if_index = opal_ifbegin ( ) ; if_index > = 0 ; if_index = opal_ifnext ( if_index ) ) {
char if_name [ 32 ] ;
opal_ifindextoname ( if_index , if_name , sizeof ( if_name ) ) ;
/* check to see if this interface exists in the exclude list */
if ( opal_ifcount ( ) > 1 ) {
argv = exclude ;
while ( argv & & * argv ) {
if ( strncmp ( * argv , if_name , strlen ( * argv ) ) = = 0 ) {
break ;
}
argv + + ;
}
/* if this interface was not found in the excluded list - create a BTL */
if ( argv = = 0 | | * argv = = 0 ) {
mca_btl_sctp_create ( if_index , if_name ) ;
}
} else {
mca_btl_sctp_create ( if_index , if_name ) ;
}
}
opal_argv_free ( exclude ) ;
return OMPI_SUCCESS ;
}
/*
* Create a listen socket and bind to all interfaces
*/
static int mca_btl_sctp_component_create_listen ( void )
{
if ( mca_btl_sctp_component . sctp_if_11 ) {
/* 1 to 1 */
int rc ;
struct sockaddr_in inaddr ;
opal_socklen_t addrlen ;
/* create a listen socket for incoming connections */
mca_btl_sctp_component . sctp_listen_sd = socket ( AF_INET , SOCK_STREAM , IPPROTO_SCTP ) ;
if ( mca_btl_sctp_component . sctp_listen_sd < 0 ) {
2007-12-13 06:15:29 +03:00
if ( opal_socket_errno ! = ESOCKTNOSUPPORT ) {
/* may have SCTP API but SCTP itself unable to auto-load into
* the kernel ( like RHEL4U4 ) , so avoid noisy error output
*/
BTL_ERROR ( ( " socket() failed with errno=%d " , opal_socket_errno ) ) ;
}
2007-11-14 02:39:16 +03:00
return OMPI_ERROR ;
}
if ( ( rc = mca_btl_sctp_set_socket_options ( mca_btl_sctp_component . sctp_listen_sd ) ) ! = OMPI_SUCCESS ) {
return rc ;
}
/* bind to all addresses and dynamically assigned port */
memset ( & inaddr , 0 , sizeof ( inaddr ) ) ;
inaddr . sin_family = AF_INET ;
inaddr . sin_addr . s_addr = INADDR_ANY ;
inaddr . sin_port = 0 ;
if ( bind ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & inaddr , sizeof ( inaddr ) ) < 0 ) {
BTL_ERROR ( ( " bind() failed with errno=%d " , opal_socket_errno ) ) ;
return OMPI_ERROR ;
}
/* resolve system assignend port */
addrlen = sizeof ( struct sockaddr_in ) ;
if ( getsockname ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & inaddr , & addrlen ) < 0 ) {
BTL_ERROR ( ( " getsockname() failed with errno=%d " , opal_socket_errno ) ) ;
return OMPI_ERROR ;
}
mca_btl_sctp_component . sctp_listen_port = inaddr . sin_port ;
/* setup listen backlog to maximum allowed by kernel */
if ( listen ( mca_btl_sctp_component . sctp_listen_sd , SOMAXCONN ) < 0 ) {
BTL_ERROR ( ( " listen() failed with errno=%d " , opal_socket_errno ) ) ;
return OMPI_ERROR ;
}
/* register listen port */
opal_event_set (
& mca_btl_sctp_component . sctp_recv_event ,
mca_btl_sctp_component . sctp_listen_sd ,
OPAL_EV_READ | OPAL_EV_PERSIST ,
mca_btl_sctp_component_recv_handler ,
0 ) ;
opal_event_add ( & mca_btl_sctp_component . sctp_recv_event , 0 ) ;
return OMPI_SUCCESS ;
}
else {
/* 1 to many */
int rc ;
/* create a one to many listen socket for incoming connections and ALL sent/received messages */
mca_btl_sctp_component . sctp_listen_sd = socket ( AF_INET , SOCK_SEQPACKET , IPPROTO_SCTP ) ;
if ( mca_btl_sctp_component . sctp_listen_sd < 0 ) {
2007-12-13 06:15:29 +03:00
if ( opal_socket_errno ! = ESOCKTNOSUPPORT ) {
/* may have SCTP API but SCTP itself unable to auto-load into
* the kernel ( like RHEL4U4 ) , so avoid noisy error output
*/
BTL_ERROR ( ( " socket() failed with errno=%d " , opal_socket_errno ) ) ;
}
2007-11-14 02:39:16 +03:00
return OMPI_ERROR ;
}
if ( ( rc = mca_btl_sctp_set_socket_options ( mca_btl_sctp_component . sctp_listen_sd ) ) ! = OMPI_SUCCESS ) {
return rc ;
}
/* port set to zero to indicate "unset" in mca_btl_sctp_create */
mca_btl_sctp_component . sctp_listen_port = 0 ;
return OMPI_SUCCESS ;
}
}
static int mca_btl_sctp_component_register_listen ( void )
{
/* setup listen backlog to maximum allowed by kernel */
if ( listen ( mca_btl_sctp_component . sctp_listen_sd , SOMAXCONN ) < 0 ) {
BTL_ERROR ( ( " listen() failed with errno=%d " , opal_socket_errno ) ) ;
return OMPI_ERROR ;
}
/* register listen port */
opal_event_set (
& mca_btl_sctp_component . sctp_recv_event ,
mca_btl_sctp_component . sctp_listen_sd ,
OPAL_EV_READ | OPAL_EV_PERSIST ,
mca_btl_sctp_recv_handler ,
0 ) ;
opal_event_add ( & mca_btl_sctp_component . sctp_recv_event , 0 ) ;
return OMPI_SUCCESS ;
}
/*
* Register SCTP module addressing information . The MCA framework
* will make this available to all peers .
*/
static int mca_btl_sctp_component_exchange ( void )
{
int rc = 0 ;
size_t i = 0 ;
size_t size = mca_btl_sctp_component . sctp_num_btls * sizeof ( mca_btl_sctp_addr_t ) ;
if ( mca_btl_sctp_component . sctp_num_btls ! = 0 ) {
mca_btl_sctp_addr_t * addrs = ( mca_btl_sctp_addr_t * ) malloc ( size ) ;
for ( i = 0 ; i < mca_btl_sctp_component . sctp_num_btls ; i + + ) {
struct mca_btl_sctp_module_t * btl = mca_btl_sctp_component . sctp_btls [ i ] ;
addrs [ i ] . addr_inet = btl - > sctp_ifaddr . sin_addr ;
addrs [ i ] . addr_port = mca_btl_sctp_component . sctp_listen_port ;
addrs [ i ] . addr_inuse = 0 ;
}
rc = ompi_modex_send ( & mca_btl_sctp_component . super . btl_version , addrs , size ) ;
free ( addrs ) ;
}
return rc ;
}
/*
* SCTP module initialization :
* ( 1 ) read interface list from kernel and compare against module parameters
* then create a BTL instance for selected interfaces
* ( 2 ) setup SCTP listen socket for incoming connection attempts
* ( 3 ) register BTL parameters with the MCA
*/
mca_btl_base_module_t * * mca_btl_sctp_component_init ( int * num_btl_modules ,
bool enable_progress_threads ,
bool enable_mpi_threads )
{
2008-10-28 21:29:57 +03:00
/* Currently refuse to run if MPI_THREAD_MULTIPLE is enabled */
if ( ompi_mpi_thread_multiple & & ! mca_btl_base_thread_multiple_override ) {
return NULL ;
}
2007-11-14 02:39:16 +03:00
if ( mca_btl_sctp_component . sctp_if_11 ) {
/* 1 to 1 */
mca_btl_base_module_t * * btls ;
* num_btl_modules = 0 ;
/* initialize free lists */
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_eager ,
sizeof ( mca_btl_sctp_frag_eager_t ) +
mca_btl_sctp_module . super . btl_eager_limit ,
OBJ_CLASS ( mca_btl_sctp_frag_eager_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_max ,
sizeof ( mca_btl_sctp_frag_max_t ) +
mca_btl_sctp_module . super . btl_max_send_size ,
OBJ_CLASS ( mca_btl_sctp_frag_max_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_user ,
sizeof ( mca_btl_sctp_frag_user_t ) ,
OBJ_CLASS ( mca_btl_sctp_frag_user_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
/* create a BTL SCTP module for selected interfaces */
if ( mca_btl_sctp_component_create_instance ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
/* create a SCTP listen socket for incoming connection attempts */
if ( mca_btl_sctp_component_create_listen ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
/* publish SCTP parameters with the MCA framework */
if ( mca_btl_sctp_component_exchange ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
btls = ( mca_btl_base_module_t * * ) malloc ( mca_btl_sctp_component . sctp_num_btls *
sizeof ( mca_btl_base_module_t * ) ) ;
if ( NULL = = btls ) {
return NULL ;
}
memcpy ( btls , mca_btl_sctp_component . sctp_btls , mca_btl_sctp_component . sctp_num_btls * sizeof ( mca_btl_sctp_module_t * ) ) ;
* num_btl_modules = mca_btl_sctp_component . sctp_num_btls ;
return btls ;
}
else {
/* 1 to many */
int i ;
mca_btl_base_module_t * * btls ;
* num_btl_modules = 0 ;
/* initialize free lists */
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_eager ,
sizeof ( mca_btl_sctp_frag_eager_t ) +
mca_btl_sctp_module . super . btl_eager_limit ,
OBJ_CLASS ( mca_btl_sctp_frag_eager_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_max ,
sizeof ( mca_btl_sctp_frag_max_t ) +
mca_btl_sctp_module . super . btl_max_send_size ,
OBJ_CLASS ( mca_btl_sctp_frag_max_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
ompi_free_list_init ( & mca_btl_sctp_component . sctp_frag_user ,
sizeof ( mca_btl_sctp_frag_user_t ) ,
OBJ_CLASS ( mca_btl_sctp_frag_user_t ) ,
mca_btl_sctp_component . sctp_free_list_num ,
mca_btl_sctp_component . sctp_free_list_max ,
mca_btl_sctp_component . sctp_free_list_inc ,
NULL ) ;
/* create a SCTP listen socket for incoming connection attempts */
if ( mca_btl_sctp_component_create_listen ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
/* create a BTL SCTP module for selected interfaces */
if ( mca_btl_sctp_component_create_instance ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
/* register the now completed single BTL */
if ( mca_btl_sctp_component_register_listen ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
/* publish SCTP parameters with the MCA framework */
if ( mca_btl_sctp_component_exchange ( ) ! = OMPI_SUCCESS ) {
return 0 ;
}
# if MCA_BTL_SCTP_DONT_USE_HASH
/* Initialize the proc_tables to all negative ones. */
recvr_proc_table = ( mca_btl_sctp_proc_table_node * ) malloc ( sizeof ( mca_btl_sctp_proc_table_node ) * MCA_BTL_SCTP_PROC_TABLE_SIZE ) ;
sender_proc_table = ( mca_btl_sctp_proc_table_node * ) malloc ( sizeof ( mca_btl_sctp_proc_table_node ) * MCA_BTL_SCTP_PROC_TABLE_SIZE ) ;
if ( NULL = = recvr_proc_table | | NULL = = sender_proc_table ) {
return 0 ;
}
for ( i = 0 ; i < MCA_BTL_SCTP_PROC_TABLE_SIZE ; i + + ) {
recvr_proc_table [ i ] . valid = 0 ;
recvr_proc_table [ i ] . sctp_assoc_id = 0 ;
2007-12-22 00:50:00 +03:00
recvr_proc_table [ i ] . vpid = 0 ;
2007-11-14 02:39:16 +03:00
recvr_proc_table [ i ] . proc = NULL ;
sender_proc_table [ i ] . valid = 0 ;
sender_proc_table [ i ] . sctp_assoc_id = 0 ;
2007-12-22 00:50:00 +03:00
sender_proc_table [ i ] . vpid = 0 ;
2007-11-14 02:39:16 +03:00
sender_proc_table [ i ] . proc = NULL ;
}
# endif
btls = ( mca_btl_base_module_t * * ) malloc ( mca_btl_sctp_component . sctp_num_btls *
sizeof ( mca_btl_base_module_t * ) ) ;
if ( NULL = = btls ) {
return NULL ;
}
memcpy ( btls , mca_btl_sctp_component . sctp_btls , mca_btl_sctp_component . sctp_num_btls * sizeof ( mca_btl_sctp_module_t * ) ) ;
* num_btl_modules = mca_btl_sctp_component . sctp_num_btls ;
return btls ;
}
}
/*
* SCTP module control
*/
int mca_btl_sctp_component_control ( int param , void * value , size_t size )
{
return OMPI_SUCCESS ;
}
/*
* Called by mca_btl_sctp_component_recv ( ) when the SCTP listen
* socket has pending connection requests . Accept incoming
* requests and queue for completion of the connection handshake .
*/
void mca_btl_sctp_component_accept ( void )
{
if ( mca_btl_sctp_component . sctp_if_11 ) {
/* 1 to 1 */
while ( true ) {
opal_socklen_t addrlen = sizeof ( struct sockaddr_in ) ;
struct sockaddr_in addr ;
mca_btl_sctp_event_t * event ;
int rc , sd = accept ( mca_btl_sctp_component . sctp_listen_sd , ( struct sockaddr * ) & addr , & addrlen ) ;
if ( sd < 0 ) {
if ( opal_socket_errno = = EINTR ) {
continue ;
}
if ( opal_socket_errno = = ECONNRESET | | opal_socket_errno = = EBADF ) {
/* closed remotely while on listen queue */
close ( sd ) ;
}
else if ( opal_socket_errno ! = EAGAIN & & opal_socket_errno ! = EWOULDBLOCK ) {
BTL_ERROR ( ( " accept() failed with errno %d. " , opal_socket_errno ) ) ;
}
return ;
}
if ( ( rc = mca_btl_sctp_set_socket_options ( sd ) ) ! = OMPI_SUCCESS ) {
BTL_ERROR ( ( " failed to set socket options " ) ) ;
return ;
}
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW ( mca_btl_sctp_event_t ) ;
opal_event_set ( & event - > event , sd , OPAL_EV_READ , mca_btl_sctp_component_recv_handler , event ) ;
opal_event_add ( & event - > event , 0 ) ;
}
}
else {
/* 1 to many */
/* Called by mca_btl_sctp_recv_handler to get a valid *user pointer */
mca_btl_sctp_event_t * event ;
int sd = mca_btl_sctp_component . sctp_listen_sd ;
if ( sd < 0 ) {
BTL_ERROR ( ( " mca_btl_sctp_component_accept(): Invalid socket descriptor. \n " ) ) ;
}
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW ( mca_btl_sctp_event_t ) ;
opal_event_set ( & event - > event , sd , OPAL_EV_READ , mca_btl_sctp_recv_handler , event ) ;
opal_event_add ( & event - > event , 0 ) ;
}
}
/* Used only with one-to-one socket.
*
* Event callback when there is data available on the registered
* socket to recv .
*/
static void mca_btl_sctp_component_recv_handler ( int sd , short flags , void * user )
{
orte_process_name_t guid ;
struct sockaddr_in addr ;
int retval ;
mca_btl_sctp_proc_t * btl_proc ;
opal_socklen_t addr_len = sizeof ( addr ) ;
mca_btl_sctp_event_t * event = ( mca_btl_sctp_event_t * ) user ;
int msg_flags = 0 ;
struct sctp_sndrcvinfo sri ;
/* accept new connections on the listen socket */
if ( mca_btl_sctp_component . sctp_listen_sd = = sd ) {
mca_btl_sctp_component_accept ( ) ;
return ;
}
OBJ_RELEASE ( event ) ;
retval = sctp_recvmsg ( sd , ( char * ) & guid , sizeof ( guid ) , 0 , 0 , & sri , & msg_flags ) ;
if ( retval ! = sizeof ( guid ) ) {
CLOSE_THE_SOCKET ( sd ) ;
return ;
}
2007-11-20 09:02:08 +03:00
SCTP_BTL_ERROR ( ( " mca_btl_sctp_component_recv_handler() sd=%d, got %d byte guid. \n " , sd , retval ) ) ;
2007-11-14 02:39:16 +03:00
ORTE_PROCESS_NAME_NTOH ( guid ) ;
/* lookup the corresponding process */
btl_proc = mca_btl_sctp_proc_lookup ( & guid ) ;
if ( NULL = = btl_proc ) {
BTL_ERROR ( ( " errno=%d " , errno ) ) ;
CLOSE_THE_SOCKET ( sd ) ;
return ;
}
/* lookup peer address */
if ( getpeername ( sd , ( struct sockaddr * ) & addr , & addr_len ) ! = 0 ) {
if ( opal_socket_errno ! = ECONNRESET & & opal_socket_errno ! = EBADF & & opal_socket_errno ! = ENOTCONN ) {
BTL_ERROR ( ( " getpeername() failed with errno=%d " , opal_socket_errno ) ) ;
}
CLOSE_THE_SOCKET ( sd ) ;
return ;
}
/* are there any existing peer instances will to accept this connection */
if ( mca_btl_sctp_proc_accept ( btl_proc , & addr , sd ) = = false ) {
CLOSE_THE_SOCKET ( sd ) ;
return ;
}
}