1
1
openmpi/orte/mca/oob/tcp/oob_tcp_component.c
Ralph Castain 93cf3c7203 Update OPAL and ORTE for thread safety
(I swear, if I look this over one more time, I'll puke)

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
2017-06-06 12:30:57 -07:00

1474 строки
58 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2017 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 NVIDIA Corporation. All rights reserved.
* Copyright (c) 2015-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* On Windows, many of the socket functions return EWOULDBLOCK
* instead of codes such as EAGAIN, EINPROGRESS, etc. It has been
* verified that this does not conflict with other error codes that
* are returned by these functions in UNIX/Linux environments.
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "opal/runtime/opal_progress_threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h"
#include "orte/util/attr.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/util/threads.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/oob/tcp/oob_tcp.h"
#include "orte/mca/oob/tcp/oob_tcp_common.h"
#include "orte/mca/oob/tcp/oob_tcp_component.h"
#include "orte/mca/oob/tcp/oob_tcp_peer.h"
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
#include "orte/mca/oob/tcp/oob_tcp_listener.h"
/*
* Local utility functions
*/
/* MCA lifecycle hooks: referenced by the .oob_base member of the
 * mca_oob_tcp_component initializer further down in this file */
static int tcp_component_register(void);
static int tcp_component_open(void);
static int tcp_component_close(void);
/* OOB component entry points: referenced by the corresponding function
 * pointer members of the mca_oob_tcp_component initializer */
static int component_available(void);
static int component_startup(void);
static void component_shutdown(void);
static int component_send(orte_rml_send_t *msg);
static char* component_get_addr(void);
static int component_set_addr(orte_process_name_t *peer,
char **uris);
static bool component_is_reachable(char *rtmod, orte_process_name_t *peer);
static orte_rml_pathway_t* component_query_transports(void);
/* the fault-tolerance hook is only declared for checkpoint/restart builds */
#if OPAL_ENABLE_FT_CR == 1
static int component_ft_event(int state);
#endif
/*
 * The single, statically initialized instance of the OOB TCP component.
 *
 * The nested (unnamed) initializer fills in the base OOB component: the MCA
 * identification/lifecycle hooks under .oob_base, the component metadata
 * under .oob_data, and the function pointers through which the component's
 * entry points defined in this file are invoked.  Members that are not
 * listed here are left zero-initialized by the compiler.
 */
mca_oob_tcp_component_t mca_oob_tcp_component = {
{
.oob_base = {
MCA_OOB_BASE_VERSION_2_0_0,
.mca_component_name = "tcp",
MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION),
/* MCA lifecycle hooks implemented in this file */
.mca_open_component = tcp_component_open,
.mca_close_component = tcp_component_close,
.mca_register_component_params = tcp_component_register,
},
.oob_data = {
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
.priority = 30, // default priority of this transport
/* OOB entry points implemented in this file */
.available = component_available,
.startup = component_startup,
.shutdown = component_shutdown,
.send_nb = component_send,
.get_addr = component_get_addr,
.set_addr = component_set_addr,
.is_reachable = component_is_reachable,
.query_transports = component_query_transports,
/* the fault-tolerance hook only exists in checkpoint/restart builds */
#if OPAL_ENABLE_FT_CR == 1
.ft_event = component_ft_event,
#endif
},
};
/*
* Initialize global variables used w/in this module.
*/
static int tcp_component_open(void)
{
mca_oob_tcp_component.next_base = 0;
OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
opal_hash_table_init(&mca_oob_tcp_component.peers, 32);
OBJ_CONSTRUCT(&mca_oob_tcp_component.ev_bases, opal_pointer_array_t);
opal_pointer_array_init(&mca_oob_tcp_component.ev_bases,
orte_oob_base.num_threads, 256, 8);
OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
if (ORTE_PROC_IS_HNP) {
OBJ_CONSTRUCT(&mca_oob_tcp_component.listen_thread, opal_thread_t);
mca_oob_tcp_component.listen_thread_active = false;
mca_oob_tcp_component.listen_thread_tv.tv_sec = 3600;
mca_oob_tcp_component.listen_thread_tv.tv_usec = 0;
}
mca_oob_tcp_component.addr_count = 0;
mca_oob_tcp_component.ipv4conns = NULL;
mca_oob_tcp_component.ipv4ports = NULL;
mca_oob_tcp_component.ipv6conns = NULL;
mca_oob_tcp_component.ipv6ports = NULL;
/* if_include and if_exclude need to be mutually exclusive */
if (OPAL_SUCCESS !=
mca_base_var_check_exclusive("orte",
mca_oob_tcp_component.super.oob_base.mca_type_name,
mca_oob_tcp_component.super.oob_base.mca_component_name,
"if_include",
mca_oob_tcp_component.super.oob_base.mca_type_name,
mca_oob_tcp_component.super.oob_base.mca_component_name,
"if_exclude")) {
/* Return ERR_NOT_AVAILABLE so that a warning message about
"open" failing is not printed */
return ORTE_ERR_NOT_AVAILABLE;
}
return ORTE_SUCCESS;
}
/*
* Cleanup of global variables used by this module.
*/
static int tcp_component_close(void)
{
/* cleanup listen event list */
OBJ_DESTRUCT(&mca_oob_tcp_component.listeners);
OBJ_DESTRUCT(&mca_oob_tcp_component.peers);
if (NULL != mca_oob_tcp_component.ipv4conns) {
opal_argv_free(mca_oob_tcp_component.ipv4conns);
}
if (NULL != mca_oob_tcp_component.ipv4ports) {
opal_argv_free(mca_oob_tcp_component.ipv4ports);
}
#if OPAL_ENABLE_IPV6
if (NULL != mca_oob_tcp_component.ipv6conns) {
opal_argv_free(mca_oob_tcp_component.ipv6conns);
}
if (NULL != mca_oob_tcp_component.ipv6ports) {
opal_argv_free(mca_oob_tcp_component.ipv6ports);
}
#endif
OBJ_DESTRUCT(&mca_oob_tcp_component.ev_bases);
return ORTE_SUCCESS;
}
static char *static_port_string;
#if OPAL_ENABLE_IPV6
static char *static_port_string6;
#endif // OPAL_ENABLE_IPV6
static char *dyn_port_string;
#if OPAL_ENABLE_IPV6
static char *dyn_port_string6;
#endif
/*
 * Turn a user-provided port specification into an argv-style port list.
 *
 * spec  - raw MCA string value; NULL means "not provided"
 * ports - in/out: the list the parsed ports are stored in
 *
 * A NULL spec resets *ports to NULL.  Otherwise the spec is handed to
 * orte_util_parse_range_options(); if the first resulting entry is "-1"
 * the list is released and *ports is reset to NULL.  A spec that yields
 * no entries at all is tolerated instead of dereferencing a NULL list.
 */
static void parse_port_spec(char *spec, char ***ports)
{
    if (NULL == spec) {
        *ports = NULL;
        return;
    }

    orte_util_parse_range_options(spec, ports);

    /* the parser may not have produced anything (e.g. an empty string) */
    if (NULL == *ports || NULL == (*ports)[0]) {
        return;
    }

    if (0 == strcmp((*ports)[0], "-1")) {
        opal_argv_free(*ports);
        *ports = NULL;
    }
}

/*
 * Component register hook: set the default of every MCA variable of this
 * component, register it, and post-process the port specifications.
 *
 * Returns:
 *   ORTE_SUCCESS            - all variables registered
 *   ORTE_ERR_NOT_AVAILABLE  - both if_include and if_exclude were given
 *   ORTE_ERROR              - static and dynamic ports were both given
 */
static int tcp_component_register(void)
{
    mca_base_component_t *component = &mca_oob_tcp_component.super.oob_base;
    int var_id;

    /* register oob module parameters */
    mca_oob_tcp_component.peer_limit = -1;
    (void)mca_base_component_var_register(component, "peer_limit",
                                          "Maximum number of peer connections to simultaneously maintain (-1 = infinite)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_5,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.peer_limit);

    mca_oob_tcp_component.max_retries = 2;
    (void)mca_base_component_var_register(component, "peer_retries",
                                          "Number of times to try shutting down a connection before giving up",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_5,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.max_retries);

    mca_oob_tcp_component.tcp_sndbuf = 0;
    (void)mca_base_component_var_register(component, "sndbuf",
                                          "TCP socket send buffering size (in bytes, 0 => leave system default)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.tcp_sndbuf);

    mca_oob_tcp_component.tcp_rcvbuf = 0;
    (void)mca_base_component_var_register(component, "rcvbuf",
                                          "TCP socket receive buffering size (in bytes, 0 => leave system default)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.tcp_rcvbuf);

    mca_oob_tcp_component.if_include = NULL;
    var_id = mca_base_component_var_register(component, "if_include",
                                             "Comma-delimited list of devices and/or CIDR notation of TCP networks to use for Open MPI bootstrap communication (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with oob_tcp_if_exclude.",
                                             MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                             OPAL_INFO_LVL_2,
                                             MCA_BASE_VAR_SCOPE_LOCAL,
                                             &mca_oob_tcp_component.if_include);
    /* keep the old "include" name working as a deprecated synonym */
    (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "include",
                                        MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);

    mca_oob_tcp_component.if_exclude = NULL;
    var_id = mca_base_component_var_register(component, "if_exclude",
                                             "Comma-delimited list of devices and/or CIDR notation of TCP networks to NOT use for Open MPI bootstrap communication -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). If set to a non-default value, it is mutually exclusive with oob_tcp_if_include.",
                                             MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                             OPAL_INFO_LVL_2,
                                             MCA_BASE_VAR_SCOPE_LOCAL,
                                             &mca_oob_tcp_component.if_exclude);
    /* keep the old "exclude" name working as a deprecated synonym */
    (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "exclude",
                                        MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);

    /* if_include and if_exclude need to be mutually exclusive */
    if (NULL != mca_oob_tcp_component.if_include &&
        NULL != mca_oob_tcp_component.if_exclude) {
        /* Return ERR_NOT_AVAILABLE so that a warning message about
           "open" failing is not printed */
        orte_show_help("help-oob-tcp.txt", "include-exclude", true,
                       mca_oob_tcp_component.if_include,
                       mca_oob_tcp_component.if_exclude);
        return ORTE_ERR_NOT_AVAILABLE;
    }

    static_port_string = NULL;
    (void)mca_base_component_var_register(component, "static_ipv4_ports",
                                          "Static ports for daemons and procs (IPv4)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_2,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &static_port_string);
    /* if ports were provided, parse the provided range */
    parse_port_spec(static_port_string, &mca_oob_tcp_component.tcp_static_ports);

#if OPAL_ENABLE_IPV6
    static_port_string6 = NULL;
    (void)mca_base_component_var_register(component, "static_ipv6_ports",
                                          "Static ports for daemons and procs (IPv6)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_2,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &static_port_string6);
    /* if ports were provided, parse the provided range */
    parse_port_spec(static_port_string6, &mca_oob_tcp_component.tcp6_static_ports);
#endif // OPAL_ENABLE_IPV6

    /* remember that static ports are in force for either family */
    if (NULL != mca_oob_tcp_component.tcp_static_ports ||
        NULL != mca_oob_tcp_component.tcp6_static_ports) {
        orte_static_ports = true;
    }

    dyn_port_string = NULL;
    (void)mca_base_component_var_register(component, "dynamic_ipv4_ports",
                                          "Range of ports to be dynamically used by daemons and procs (IPv4)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &dyn_port_string);
    /* can't have both static and dynamic ports! */
    if (NULL != dyn_port_string && orte_static_ports) {
        char *err = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
        opal_show_help("help-oob-tcp.txt", "static-and-dynamic", true,
                       err, dyn_port_string);
        free(err);
        return ORTE_ERROR;
    }
    /* if ports were provided, parse the provided range */
    parse_port_spec(dyn_port_string, &mca_oob_tcp_component.tcp_dyn_ports);

#if OPAL_ENABLE_IPV6
    dyn_port_string6 = NULL;
    (void)mca_base_component_var_register(component, "dynamic_ipv6_ports",
                                          "Range of ports to be dynamically used by daemons and procs (IPv6)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &dyn_port_string6);
    /* can't have both static and dynamic ports! */
    if (NULL != dyn_port_string6 && orte_static_ports) {
        char *err4=NULL, *err6=NULL;
        if (NULL != mca_oob_tcp_component.tcp_static_ports) {
            err4 = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
        }
        if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
            err6 = opal_argv_join(mca_oob_tcp_component.tcp6_static_ports, ',');
        }
        opal_show_help("help-oob-tcp.txt", "static-and-dynamic-ipv6", true,
                       (NULL == err4) ? "N/A" : err4,
                       (NULL == err6) ? "N/A" : err6,
                       dyn_port_string6);
        free(err4);
        free(err6);
        return ORTE_ERROR;
    }
    /* if ports were provided, parse the provided range */
    parse_port_spec(dyn_port_string6, &mca_oob_tcp_component.tcp6_dyn_ports);
#endif // OPAL_ENABLE_IPV6

    mca_oob_tcp_component.disable_ipv4_family = false;
    (void)mca_base_component_var_register(component, "disable_ipv4_family",
                                          "Disable the IPv4 interfaces",
                                          MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.disable_ipv4_family);

#if OPAL_ENABLE_IPV6
    mca_oob_tcp_component.disable_ipv6_family = false;
    (void)mca_base_component_var_register(component, "disable_ipv6_family",
                                          "Disable the IPv6 interfaces",
                                          MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.disable_ipv6_family);
#endif // OPAL_ENABLE_IPV6

    // Wait for this amount of time before sending the first keepalive probe
    mca_oob_tcp_component.keepalive_time = 300;
    (void)mca_base_component_var_register(component, "keepalive_time",
                                          "Idle time in seconds before starting to send keepalives (keepalive_time <= 0 disables keepalive functionality)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_5,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.keepalive_time);

    // Resend the keepalive probe at this interval (in seconds)
    mca_oob_tcp_component.keepalive_intvl = 20;
    (void)mca_base_component_var_register(component, "keepalive_intvl",
                                          "Time between successive keepalive pings when peer has not responded, in seconds (ignored if keepalive_time <= 0)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_5,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.keepalive_intvl);

    // Number of keepalive probes that may be missed before an error is declared
    mca_oob_tcp_component.keepalive_probes = 9;
    (void)mca_base_component_var_register(component, "keepalive_probes",
                                          "Number of keepalives that can be missed before declaring error (ignored if keepalive_time <= 0)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_5,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.keepalive_probes);

    mca_oob_tcp_component.retry_delay = 0;
    (void)mca_base_component_var_register(component, "retry_delay",
                                          "Time (in sec) to wait before trying to connect to peer again",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.retry_delay);

    mca_oob_tcp_component.max_recon_attempts = 10;
    (void)mca_base_component_var_register(component, "max_recon_attempts",
                                          "Max number of times to attempt connection before giving up (-1 -> never give up)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_4,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.max_recon_attempts);

    return ORTE_SUCCESS;
}
/* Forward declaration: component_available() passes the if_include /
 * if_exclude value plus a label ("include" or "exclude") and hands the
 * returned string array to opal_ifmatches(), releasing it with
 * opal_argv_free().  The definition is not shown in this part of the
 * file. */
static char **split_and_resolve(char **orig_str, char *name);
/*
 * Decide whether the TCP component can be used and collect the local
 * addresses it will use.
 *
 * Walks every interface known to opal_if*, filters it (address family,
 * kernel index, "vir*" name, include/exclude list or loopback rule) and
 * appends the printable address of each survivor to
 * mca_oob_tcp_component.ipv4conns or .ipv6conns.
 *
 * Returns:
 *   ORTE_SUCCESS            - at least one address was collected
 *   ORTE_ERR_BAD_PARAM      - an include/exclude specification could not
 *                             be parsed
 *   ORTE_ERR_NOT_AVAILABLE  - no usable address was found
 */
static int component_available(void)
{
int i, rc;
char **interfaces = NULL;
bool including = false, excluding = false;
char name[32];
struct sockaddr_storage my_ss;
int kindex;
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"oob:tcp: component_available called");
/* if interface include was given, construct a list
 * of those interfaces which match the specifications - remember,
 * the includes could be given as named interfaces, IP addrs, or
 * subnet+mask
 */
if (NULL != mca_oob_tcp_component.if_include) {
interfaces = split_and_resolve(&mca_oob_tcp_component.if_include,
"include");
including = true;
excluding = false;
} else if (NULL != mca_oob_tcp_component.if_exclude) {
interfaces = split_and_resolve(&mca_oob_tcp_component.if_exclude,
"exclude");
including = false;
excluding = true;
}
/* look at all available interfaces */
for (i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)) {
/* fetch the address of this interface; skip it if that fails */
if (OPAL_SUCCESS != opal_ifindextoaddr(i, (struct sockaddr*) &my_ss,
sizeof (my_ss))) {
opal_output (0, "oob_tcp: problems getting address for index %i (kernel index %i)\n",
i, opal_ifindextokindex(i));
continue;
}
/* ignore non-ip4/6 interfaces */
if (AF_INET != my_ss.ss_family
#if OPAL_ENABLE_IPV6
&& AF_INET6 != my_ss.ss_family
#endif
) {
continue;
}
/* interfaces without a positive kernel index are skipped */
kindex = opal_ifindextokindex(i);
if (kindex <= 0) {
continue;
}
opal_output_verbose(10, orte_oob_base_framework.framework_output,
"WORKING INTERFACE %d KERNEL INDEX %d FAMILY: %s", i, kindex,
(AF_INET == my_ss.ss_family) ? "V4" : "V6");
/* get the name for diagnostic purposes */
opal_ifindextoname(i, name, sizeof(name));
/* ignore any virtual interfaces (name starts with "vir") */
if (0 == strncmp(name, "vir", 3)) {
continue;
}
/* handle include/exclude directives */
if (NULL != interfaces) {
/* check for match */
rc = opal_ifmatches(kindex, interfaces);
/* if one of the network specifications isn't parseable, then
 * error out as we can't do what was requested
 */
if (OPAL_ERR_NETWORK_NOT_PARSEABLE == rc) {
orte_show_help("help-oob-tcp.txt", "not-parseable", true);
opal_argv_free(interfaces);
return ORTE_ERR_BAD_PARAM;
}
/* if we are including, then ignore this if not present */
if (including) {
if (OPAL_SUCCESS != rc) {
opal_output_verbose(20, orte_oob_base_framework.framework_output,
"%s oob:tcp:init rejecting interface %s (not in include list)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
continue;
}
} else {
/* we are excluding, so ignore if present */
if (OPAL_SUCCESS == rc) {
opal_output_verbose(20, orte_oob_base_framework.framework_output,
"%s oob:tcp:init rejecting interface %s (in exclude list)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
continue;
}
}
} else {
/* if no specific interfaces were provided, we ignore the loopback
 * interface unless nothing else is available
 */
if (1 < opal_ifcount() && opal_ifisloopback(i)) {
opal_output_verbose(20, orte_oob_base_framework.framework_output,
"%s oob:tcp:init rejecting loopback interface %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
continue;
}
}
/* Refs ticket #3019
 * it would probably be worthwhile to print out a warning if OMPI detects multiple
 * IP interfaces that are "up" on the same subnet (because that's a Bad Idea). Note
 * that we should only check for this after applying the relevant include/exclude
 * list MCA params. If we detect redundant ports, we can also automatically ignore
 * them so that applications won't hang.
 */
/* add this address to our connections */
if (AF_INET == my_ss.ss_family) {
opal_output_verbose(10, orte_oob_base_framework.framework_output,
"%s oob:tcp:init adding %s to our list of %s connections",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname((struct sockaddr*) &my_ss),
(AF_INET == my_ss.ss_family) ? "V4" : "V6");
opal_argv_append_nosize(&mca_oob_tcp_component.ipv4conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
} else if (AF_INET6 == my_ss.ss_family) {
/* without IPv6 support this branch is empty, so nothing is recorded */
#if OPAL_ENABLE_IPV6
opal_output_verbose(10, orte_oob_base_framework.framework_output,
"%s oob:tcp:init adding %s to our list of %s connections",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname((struct sockaddr*) &my_ss),
(AF_INET == my_ss.ss_family) ? "V4" : "V6");
opal_argv_append_nosize(&mca_oob_tcp_component.ipv6conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
#endif // OPAL_ENABLE_IPV6
} else {
opal_output_verbose(10, orte_oob_base_framework.framework_output,
"%s oob:tcp:init ignoring %s from out list of connections",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname((struct sockaddr*) &my_ss));
}
}
/* cleanup */
if (NULL != interfaces) {
opal_argv_free(interfaces);
}
/* if nothing was collected for any family, tell the user why (when an
 * include/exclude list was responsible) and report unavailability */
if (0 == opal_argv_count(mca_oob_tcp_component.ipv4conns)
#if OPAL_ENABLE_IPV6
&& 0 == opal_argv_count(mca_oob_tcp_component.ipv6conns)
#endif
) {
if (including) {
orte_show_help("help-oob-tcp.txt", "no-included-found", true, mca_oob_tcp_component.if_include);
} else if (excluding) {
orte_show_help("help-oob-tcp.txt", "excluded-all", true, mca_oob_tcp_component.if_exclude);
}
return ORTE_ERR_NOT_AVAILABLE;
}
return ORTE_SUCCESS;
}
/*
 * Describe the transport offered by this component.
 *
 * Returns NULL when neither an IPv4 nor an IPv6 connection list exists.
 * Otherwise returns a newly created orte_rml_pathway_t whose component
 * name is "oob" and whose attributes advertise Ethernet/TCP together with
 * a qualifier string listing the available address families.
 */
static orte_rml_pathway_t* component_query_transports(void)
{
    const bool have_v4 = (NULL != mca_oob_tcp_component.ipv4conns);
    const bool have_v6 = (NULL != mca_oob_tcp_component.ipv6conns);
    orte_rml_pathway_t *path;
    char *qualifiers;

    /* nothing to offer without at least one address family */
    if (!have_v4 && !have_v6) {
        return NULL;
    }

    /* we support Ethernet and TCP */
    path = OBJ_NEW(orte_rml_pathway_t);
    path->component = strdup("oob");
    orte_set_attribute(&path->attributes, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, "Ethernet", OPAL_STRING);
    orte_set_attribute(&path->attributes, ORTE_RML_PROTOCOL_TYPE, ORTE_ATTR_LOCAL, "TCP", OPAL_STRING);

    /* qualifiers: we route communications and list each family we have */
    if (!have_v6) {
        qualifiers = "routed=true:ipv4";
    } else if (!have_v4) {
        qualifiers = "routed=true:ipv6";
    } else {
        qualifiers = "routed=true:ipv4:ipv6";
    }
    orte_set_attribute(&path->attributes, ORTE_RML_QUALIFIER_ATTRIB, ORTE_ATTR_LOCAL, qualifiers, OPAL_STRING);

    return path;
}
/* Start all modules */
static int component_startup(void)
{
int rc = ORTE_SUCCESS;
int i;
char *tmp;
opal_event_base_t *evb;
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s TCP STARTUP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* initialize state */
if (0 == orte_oob_base.num_threads) {
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, orte_oob_base.ev_base);
} else {
for (i=0; i < orte_oob_base.num_threads; i++) {
asprintf(&tmp, "OOB-TCP-%d", i);
evb = opal_progress_thread_init(tmp);
opal_pointer_array_add(&mca_oob_tcp_component.ev_bases, evb);
opal_argv_append_nosize(&mca_oob_tcp_component.ev_threads, tmp);
free(tmp);
}
}
/* if we are a daemon/HNP, or we are a standalone app,
* then it is possible that someone else may initiate a
* connection to us. In these cases, we need to start the
* listening thread/event. Otherwise, we will be the one
* initiating communication, and there is no need for
* a listener */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON ||
orte_standalone_operation) {
if (ORTE_SUCCESS != (rc = orte_oob_tcp_start_listening())) {
ORTE_ERROR_LOG(rc);
}
}
return rc;
}
/*
 * Release every listener held by the component.
 *
 * Has the signature of an event callback (sd and args are unused) so that
 * it can either be called directly or be run from an event base.  cbdata
 * may be NULL; otherwise it points to a bool that is set to false once all
 * listeners have been released.
 */
static void cleanup(int sd, short args, void *cbdata)
{
    bool *active = (bool*)cbdata;
    opal_list_item_t *lsnr;

    ORTE_ACQUIRE_OBJECT(active);

    /* drain the listener list, dropping our reference on each entry */
    for (lsnr = opal_list_remove_first(&mca_oob_tcp_component.listeners);
         NULL != lsnr;
         lsnr = opal_list_remove_first(&mca_oob_tcp_component.listeners)) {
        OBJ_RELEASE(lsnr);
    }

    /* signal completion to whoever is waiting on the flag */
    if (NULL != active) {
        *active = false;
    }
}
/*
 * Shut the component down: release all peers, stop the progress threads,
 * stop the HNP listener thread (if it is running) and release the
 * listeners.
 */
static void component_shutdown(void)
{
    mca_oob_tcp_peer_t *peer;
    uint64_t ui64;
    int i = 0;
    int nthreads;
    bool active;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP SHUTDOWN",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* cleanup all peers */
    OPAL_HASH_TABLE_FOREACH(ui64, uint64, peer, &mca_oob_tcp_component.peers) {
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s RELEASING PEER OBJ %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            (NULL == peer) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
        if (NULL != peer) {
            OBJ_RELEASE(peer);
        }
    }

    if (0 < orte_oob_base.num_threads) {
        /* only finalize threads whose names were actually recorded - the
         * list may be missing or shorter than num_threads */
        nthreads = opal_argv_count(mca_oob_tcp_component.ev_threads);
        for (i=0; i < orte_oob_base.num_threads; i++) {
            if (i < nthreads) {
                opal_progress_thread_finalize(mca_oob_tcp_component.ev_threads[i]);
            }
            opal_pointer_array_set_item(&mca_oob_tcp_component.ev_bases, i, NULL);
        }
        opal_argv_free(mca_oob_tcp_component.ev_threads);
        /* do not leave a dangling pointer behind: a later startup appends
         * to this list */
        mca_oob_tcp_component.ev_threads = NULL;
    }

    if (ORTE_PROC_IS_HNP && mca_oob_tcp_component.listen_thread_active) {
        mca_oob_tcp_component.listen_thread_active = false;
        /* tell the thread to exit - the value written is irrelevant */
        if ((ssize_t)sizeof(int) != write(mca_oob_tcp_component.stop_thread[1], &i, sizeof(int))) {
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: unable to signal the listener thread to stop",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        }
        opal_thread_join(&mca_oob_tcp_component.listen_thread, NULL);
    } else {
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "no hnp or not active");
    }

    /* because the listeners are in a separate
     * async thread for apps, we can't just release them here.
     * Instead, we push it into that event thread and release
     * them there */
    if (ORTE_PROC_IS_APP) {
        opal_event_t ev;
        active = true;
        opal_event_set(orte_event_base, &ev, -1,
                       OPAL_EV_WRITE, cleanup, &active);
        opal_event_set_priority(&ev, ORTE_ERROR_PRI);
        ORTE_POST_OBJECT(active);
        opal_event_active(&ev, OPAL_EV_WRITE, 1);
        /* the event lives on our stack, so wait until it has run */
        ORTE_WAIT_FOR_COMPLETION(active);
    } else {
        /* we can call the destruct directly */
        cleanup(0, 0, NULL);
    }

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP SHUTDOWN done",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/*
 * Non-blocking send entry point of the component.
 *
 * Logs the destination, tag and sequence number of the message and hands
 * it to the send_nb function of mca_oob_tcp_module.  This function itself
 * always returns ORTE_SUCCESS; it does not inspect any outcome of the
 * module's send.
 */
static int component_send(orte_rml_send_t *msg)
{
    orte_process_name_t *target = &msg->dst;

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                        "%s oob:tcp:send_nb to peer %s:%d seq = %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(target), msg->tag, msg->seq_num );

    /* pass the message on to the module for delivery */
    mca_oob_tcp_module.send_nb(msg);

    return ORTE_SUCCESS;
}
/*
 * Build the contact URI of this process.
 *
 * The result has the form "tcp://<addrs>:<ports>" for IPv4,
 * "tcp6://[<addrs>]:<ports>" for IPv6, or both joined by ';'.  Addresses
 * and ports are comma-delimited lists.  A family is only included when it
 * is not disabled and has a connection list.
 *
 * Returns a newly allocated string that the caller must free, or NULL if
 * no family is available or memory could not be allocated.
 */
static char* component_get_addr(void)
{
    char *cptr=NULL, *tmp, *tp;

    if (!mca_oob_tcp_component.disable_ipv4_family &&
        NULL != mca_oob_tcp_component.ipv4conns) {
        tmp = opal_argv_join(mca_oob_tcp_component.ipv4conns, ',');
        tp = opal_argv_join(mca_oob_tcp_component.ipv4ports, ',');
        /* never hand NULL to %s, and never keep the pointer asprintf
         * leaves behind when it fails (its value is undefined) */
        if (NULL == tmp || NULL == tp ||
            0 > asprintf(&cptr, "tcp://%s:%s", tmp, tp)) {
            cptr = NULL;
        }
        free(tmp);
        free(tp);
    }

#if OPAL_ENABLE_IPV6
    if (!mca_oob_tcp_component.disable_ipv6_family &&
        NULL != mca_oob_tcp_component.ipv6conns) {
        /* Fixes #2498
         * RFC 3986, section 3.2.2
         * The notation in that case is to encode the IPv6 IP number in square brackets:
         * "http://[2001:db8:1f70::999:de8:7648:6e8]:100/"
         * A host identified by an Internet Protocol literal address, version 6 [RFC3513]
         * or later, is distinguished by enclosing the IP literal within square brackets.
         * This is the only place where square bracket characters are allowed in the URI
         * syntax. In anticipation of future, as-yet-undefined IP literal address formats,
         * an implementation may use an optional version flag to indicate such a format
         * explicitly rather than rely on heuristic determination.
         */
        tmp = opal_argv_join(mca_oob_tcp_component.ipv6conns, ',');
        tp = opal_argv_join(mca_oob_tcp_component.ipv6ports, ',');
        if (NULL != tmp && NULL != tp) {
            if (NULL == cptr) {
                /* no ipv4 stuff */
                if (0 > asprintf(&cptr, "tcp6://[%s]:%s", tmp, tp)) {
                    cptr = NULL;
                }
            } else {
                char *tmp2 = NULL;
                /* only replace the IPv4 URI once the combined one exists;
                 * on failure the IPv4-only URI is kept */
                if (0 <= asprintf(&tmp2, "%s;tcp6://[%s]:%s", cptr, tmp, tp)) {
                    free(cptr);
                    cptr = tmp2;
                }
            }
        }
        free(tmp);
        free(tp);
    }
#endif // OPAL_ENABLE_IPV6

    /* return our uri */
    return cptr;
}
/*
 * Fill in a socket address from a numeric host string and a port string.
 *
 * The host is always in numeric ("dot" / colon) notation, so no DNS lookup
 * is needed to convert it.
 *
 * af_family - AF_INET, or AF_INET6 when IPv6 support is compiled in
 * host      - numeric address string
 * port      - decimal port number string (converted with atoi)
 * inaddr    - storage that receives the family, address and port
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM if the host cannot be
 * converted, or ORTE_ERR_NOT_SUPPORTED for any other address family.
 */
static int parse_uri(const uint16_t af_family,
                     const char* host,
                     const char *port,
                     struct sockaddr_storage* inaddr)
{
    struct sockaddr_in *in;

    if (AF_INET == af_family) {
        memset(inaddr, 0, sizeof(struct sockaddr_in));
        in = (struct sockaddr_in*) inaddr;
        in->sin_family = AF_INET;
        in->sin_addr.s_addr = inet_addr(host);
        if (in->sin_addr.s_addr == INADDR_NONE) {
            return ORTE_ERR_BAD_PARAM;
        }
        in->sin_port = htons(atoi(port));
    }
#if OPAL_ENABLE_IPV6
    else if (AF_INET6 == af_family) {
        struct sockaddr_in6 *in6;
        memset(inaddr, 0, sizeof(struct sockaddr_in6));
        in6 = (struct sockaddr_in6*) inaddr;
        /* inet_pton returns 1 on success, 0 for an unparsable string and
         * -1 on error - anything but 1 means we have no address */
        if (1 != inet_pton(AF_INET6, host, (void*)&in6->sin6_addr)) {
            opal_output (0, "oob_tcp_parse_uri: Could not convert %s\n", host);
            return ORTE_ERR_BAD_PARAM;
        }
        /* the family and port must be recorded as well, otherwise the
         * address cannot be used to connect to the peer */
        in6->sin6_family = AF_INET6;
        in6->sin6_port = htons(atoi(port));
    }
#endif
    else {
        return ORTE_ERR_NOT_SUPPORTED;
    }
    return ORTE_SUCCESS;
}
/*
 * Record the TCP contact addresses of a peer.
 *
 * peer - name of the process the URIs belong to
 * uris - NULL-terminated array of contact URIs; entries that do not start
 *        with "tcp:" or "tcp6:" are ignored
 *
 * Each matching URI is split into its comma-delimited address list and its
 * port part; every address is converted with parse_uri() and appended to
 * the address list of the peer object, which is created and entered into
 * the component's peer table on first use.  "localhost" is replaced by the
 * first local address of the matching family.
 *
 * Returns ORTE_SUCCESS if at least one address was recorded, otherwise
 * ORTE_ERR_TAKE_NEXT_OPTION (also returned when the peer cannot be stored
 * or an address cannot be parsed).
 */
static int component_set_addr(orte_process_name_t *peer,
                              char **uris)
{
    char **addrs, *hptr;
    char *tcpuri=NULL, *host, *ports;
    int i, j, rc;
    uint16_t af_family = AF_UNSPEC;
    uint64_t ui64;
    bool found;
    mca_oob_tcp_peer_t *pr;
    mca_oob_tcp_addr_t *maddr;

    /* the process name doubles as the 64-bit key of the peer table */
    memcpy(&ui64, (char*)peer, sizeof(uint64_t));

    /* cycle across component parts and see if one belongs to us */
    found = false;
    for (i=0; NULL != uris[i]; i++) {
        /* work on a private copy, as the string gets cut into pieces */
        tcpuri = strdup(uris[i]);
        if (NULL == tcpuri) {
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: out of memory",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            continue;
        }
        if (0 == strncmp(uris[i], "tcp:", 4)) {
            af_family = AF_INET;
            host = tcpuri + strlen("tcp://");
        } else if (0 == strncmp(uris[i], "tcp6:", 5)) {
#if OPAL_ENABLE_IPV6
            af_family = AF_INET6;
            host = tcpuri + strlen("tcp6://");
#else // OPAL_ENABLE_IPV6
            /* we don't support this connection type */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: address %s not supported",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
            free(tcpuri);
            continue;
#endif // OPAL_ENABLE_IPV6
        } else {
            /* not one of ours */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: ignoring address %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
            free(tcpuri);
            continue;
        }

        /* this one is ours - record the peer */
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s oob:tcp: working peer %s address %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(peer), uris[i]);

        /* separate the ports from the network addrs */
        ports = strrchr(tcpuri, ':');
        if (NULL == ports) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            free(tcpuri);
            continue;
        }
        *ports = '\0';
        ports++;

        /* split the addrs */
        /* if this is a tcp6 connection, the first one will have a '['
         * at the beginning of it, and the last will have a ']' at the
         * end - we need to remove those extra characters
         */
        hptr = host;
#if OPAL_ENABLE_IPV6
        if (AF_INET6 == af_family) {
            size_t hlen = strlen(host);
            if ('[' == host[0]) {
                hptr = &host[1];
            }
            /* guard against an empty host: hlen-1 would wrap around */
            if (0 < hlen && ']' == host[hlen-1]) {
                host[hlen-1] = '\0';
            }
        }
#endif // OPAL_ENABLE_IPV6
        addrs = opal_argv_split(hptr, ',');
        if (NULL == addrs) {
            /* an empty address list yields no array at all */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: no addresses found in %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
            free(tcpuri);
            continue;
        }

        /* cycle across the provided addrs */
        for (j=0; NULL != addrs[j]; j++) {
            /* if they gave us "localhost", then just take the first conn on our list */
            if (0 == strcasecmp(addrs[j], "localhost")) {
#if OPAL_ENABLE_IPV6
                if (AF_INET6 == af_family) {
                    if (NULL == mca_oob_tcp_component.ipv6conns ||
                        NULL == mca_oob_tcp_component.ipv6conns[0]) {
                        continue;
                    }
                    host = mca_oob_tcp_component.ipv6conns[0];
                } else {
#endif // OPAL_ENABLE_IPV6
                    if (NULL == mca_oob_tcp_component.ipv4conns ||
                        NULL == mca_oob_tcp_component.ipv4conns[0]) {
                        continue;
                    }
                    host = mca_oob_tcp_component.ipv4conns[0];
#if OPAL_ENABLE_IPV6
                }
#endif
            } else {
                host = addrs[j];
            }

            /* find the peer object, creating and registering it on first use */
            if (NULL == (pr = mca_oob_tcp_peer_lookup(peer))) {
                pr = OBJ_NEW(mca_oob_tcp_peer_t);
                pr->name.jobid = peer->jobid;
                pr->name.vpid = peer->vpid;
                opal_output_verbose(20, orte_oob_base_framework.framework_output,
                                    "%s SET_PEER ADDING PEER %s",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                    ORTE_NAME_PRINT(peer));
                if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, pr)) {
                    OBJ_RELEASE(pr);
                    /* release our working copies before bailing out */
                    opal_argv_free(addrs);
                    free(tcpuri);
                    return ORTE_ERR_TAKE_NEXT_OPTION;
                }
            }

            maddr = OBJ_NEW(mca_oob_tcp_addr_t);
            if (ORTE_SUCCESS != (rc = parse_uri(af_family, host, ports, (struct sockaddr_storage*) &(maddr->addr)))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(maddr);
                /* drop the peer from the table before releasing it */
                opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, NULL);
                OBJ_RELEASE(pr);
                /* release our working copies before bailing out */
                opal_argv_free(addrs);
                free(tcpuri);
                return ORTE_ERR_TAKE_NEXT_OPTION;
            }
            opal_output_verbose(20, orte_oob_base_framework.framework_output,
                                "%s set_peer: peer %s is listening on net %s port %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(peer),
                                (NULL == host) ? "NULL" : host,
                                (NULL == ports) ? "NULL" : ports);
            opal_list_append(&pr->addrs, &maddr->super);
            found = true;
        }
        opal_argv_free(addrs);
        free(tcpuri);
    }

    if (found) {
        /* indicate that this peer is addressable by this component */
        return ORTE_SUCCESS;
    }

    /* otherwise indicate that it is not addressable by us */
    return ORTE_ERR_TAKE_NEXT_OPTION;
}
/*
 * Tell the OOB framework whether this component can reach a peer.
 *
 * The decision rests entirely on the routed framework: the peer is
 * considered reachable when get_route() hands back a next hop whose
 * jobid and vpid are both valid.
 *
 * rtmod - routing module identifier passed through to get_route()
 * peer  - process we would like to talk to
 *
 * Returns true when a valid next hop exists, false otherwise.
 */
static bool component_is_reachable(char *rtmod, orte_process_name_t *peer)
{
    orte_process_name_t next_hop = orte_routed.get_route(rtmod, peer);
    bool have_route = (ORTE_JOBID_INVALID != next_hop.jobid) &&
                      (ORTE_VPID_INVALID != next_hop.vpid);

    if (!have_route) {
        /* NOTE(review): the message prints our own name, not the peer's */
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s is NOT reachable by TCP",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
    }

    /* a valid hop is optimistically treated as usable - the module
     * reports back on the first send attempt if that turns out to be
     * wrong, and the record is corrected at that point */
    return have_route;
}
#if OPAL_ENABLE_FT_CR == 1
/*
 * Fault-tolerance (checkpoint/restart) notification entry point.
 *
 * Logs the event and hands the state on to the TCP module's ft_event
 * handler when the module provides one. The handler's own result is
 * not inspected; ORTE_SUCCESS is always returned.
 */
static int component_ft_event(int state)
{
    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP FT EVENT", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* nothing more to do when the module has no handler */
    if (NULL == mca_oob_tcp_module.api.ft_event) {
        return ORTE_SUCCESS;
    }

    mca_oob_tcp_module.api.ft_event(state);
    return ORTE_SUCCESS;
}
#endif // OPAL_ENABLE_FT_CR
/*
 * Event callback: record in the OOB base that the TCP component can
 * reach the peer named in the operation object.
 *
 * fd, args - unused event arguments
 * cbdata   - mca_oob_tcp_peer_op_t describing the peer; released here
 */
void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
{
    mca_oob_tcp_peer_op_t *op = (mca_oob_tcp_peer_op_t*)cbdata;
    orte_oob_base_peer_t *base_peer = NULL;
    uint64_t key;
    int status;

    ORTE_ACQUIRE_OBJECT(op);

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:set_module called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer));

    /* we share an event base with the OOB base, so its peer table can
     * be touched directly; the process name doubles as the 64-bit key */
    memcpy(&key, (char*)&op->peer, sizeof(uint64_t));

    /* fetch the base's record for this peer, creating one if needed */
    status = opal_hash_table_get_value_uint64(&orte_oob_base.peers,
                                              key, (void**)&base_peer);
    if (OPAL_SUCCESS != status || NULL == base_peer) {
        base_peer = OBJ_NEW(orte_oob_base_peer_t);
    }

    /* flag the peer as addressable through us and make us its component */
    opal_bitmap_set_bit(&base_peer->addressable, mca_oob_tcp_component.super.idx);
    base_peer->component = &mca_oob_tcp_component.super;

    status = opal_hash_table_set_value_uint64(&orte_oob_base.peers, key, base_peer);
    if (OPAL_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
    }

    OBJ_RELEASE(op);
}
/*
 * Event callback: the TCP connection to a peer has been lost.
 *
 * Removes the peer from the OOB base's table and, unless we are
 * finalizing, activates a proc state that depends on what the routed
 * framework reports for the lost route.
 *
 * fd, args - unused event arguments
 * cbdata   - mca_oob_tcp_peer_op_t naming the peer; released here
 */
void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
{
    mca_oob_tcp_peer_op_t *op = (mca_oob_tcp_peer_op_t*)cbdata;
    orte_oob_base_peer_t *base_peer = NULL;
    uint64_t key;
    int status;

    ORTE_ACQUIRE_OBJECT(op);

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:lost connection called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer));

    /* the process name doubles as the 64-bit hash key */
    memcpy(&key, (char*)&op->peer, sizeof(uint64_t));

    /* withdraw our support for this peer and drop the base's record */
    status = opal_hash_table_get_value_uint64(&orte_oob_base.peers,
                                              key, (void**)&base_peer);
    if (OPAL_SUCCESS == status && NULL != base_peer) {
        opal_bitmap_clear_bit(&base_peer->addressable, mca_oob_tcp_component.super.idx);
        OBJ_RELEASE(base_peer);
    }

    status = opal_hash_table_set_value_uint64(&orte_oob_base.peers, key, NULL);
    if (OPAL_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
    }

    if (!orte_finalizing) {
        /* route_lost() succeeding means a plain comm failure; any
         * other result is reported as a lost lifeline */
        if (ORTE_SUCCESS == orte_routed.route_lost(op->rtmod, &op->peer)) {
            ORTE_ACTIVATE_PROC_STATE(&op->peer, ORTE_PROC_STATE_COMM_FAILED);
        } else {
            ORTE_ACTIVATE_PROC_STATE(&op->peer, ORTE_PROC_STATE_LIFELINE_LOST);
        }
    }

    OBJ_RELEASE(op);
}
/*
 * Event callback: the TCP module found no route to the hop of a message.
 *
 * The OOB base's record for the hop is withdrawn (its table entry is
 * set to NULL), the message's retry counter is bumped and the message
 * is handed back to the OOB so it can try other components or declare
 * a problem.
 *
 * Unlike the previous implementation, no throw-away peer object is
 * allocated when the hop is unknown, and an existing record is
 * released before its table slot is cleared. Both used to leak; the
 * handling now matches mca_oob_tcp_component_lost_connection().
 *
 * fd, args - unused event arguments
 * cbdata   - mca_oob_tcp_msg_error_t carrying the hop and the RML
 *            message; released here
 */
void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
{
    mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
    uint64_t ui64;
    int rc;
    orte_oob_base_peer_t *bpr = NULL;

    ORTE_ACQUIRE_OBJECT(mop);

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:no route called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&mop->hop));

    /* mark that we cannot reach this hop - the process name doubles
     * as the 64-bit hash key */
    memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
    if (OPAL_SUCCESS == opal_hash_table_get_value_uint64(&orte_oob_base.peers,
                                                         ui64, (void**)&bpr) && NULL != bpr) {
        opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
        /* the table slot is cleared just below, so give up the
         * reference it held instead of leaking the record */
        OBJ_RELEASE(bpr);
    }
    if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
                                                               ui64, NULL))) {
        ORTE_ERROR_LOG(rc);
    }

    /* report the error back to the OOB and let it try other components
     * or declare a problem
     */
    mop->rmsg->retries++;
    /* activate the OOB send state */
    ORTE_OOB_SEND(mop->rmsg);

    OBJ_RELEASE(mop);
}
/*
 * Event callback: the TCP module does not know the hop needed to relay
 * a message.
 *
 * Steps, in order:
 *   1. do nothing (beyond releasing mop) if we are finalizing or an
 *      abnormal termination has been ordered;
 *   2. clear this component's "addressable" bit on the OOB base records
 *      of both the hop and the final destination - if either record is
 *      missing, print an error, activate
 *      ORTE_PROC_STATE_UNABLE_TO_SEND_MSG and give up;
 *   3. rebuild the message as a fresh orte_rml_send_t and hand it to the
 *      OOB so another component can try to deliver it.
 *
 * fd, args - unused event arguments
 * cbdata   - mca_oob_tcp_msg_error_t carrying the hop, the TCP send
 *            object (snd) and the RML message (rmsg); released here
 */
void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
{
mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
uint64_t ui64;
orte_rml_send_t *snd;
orte_oob_base_peer_t *bpr;
ORTE_ACQUIRE_OBJECT(mop);
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:unknown hop called for peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop));
if (orte_finalizing || orte_abnormal_term_ordered) {
/* just ignore the problem */
OBJ_RELEASE(mop);
return;
}
/* mark that this component cannot reach this hop - the process name
 * doubles as the 64-bit key into the OOB base's peer table */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&bpr) ||
NULL == bpr) {
/* the overall OOB has no knowledge of this hop. Only
* way this could happen is if the peer contacted us
* via this component, and it wasn't entered into the
* OOB framework hash table. We have no way of knowing
* what to do next, so just output an error message and
* abort */
opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of the reqd hop %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->snd->hdr.dst),
ORTE_NAME_PRINT(&mop->hop));
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_UNABLE_TO_SEND_MSG);
OBJ_RELEASE(mop);
return;
}
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
/* mark that this component cannot reach this destination either */
memcpy(&ui64, (char*)&(mop->snd->hdr.dst), sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&bpr) ||
NULL == bpr) {
opal_output(0, "%s ERROR: message to %s requires routing and the OOB has no knowledge of this process",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->snd->hdr.dst));
/* note: the state is activated against the hop, not the destination */
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_UNABLE_TO_SEND_MSG);
OBJ_RELEASE(mop);
return;
}
opal_bitmap_clear_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
/* post the message to the OOB so it can see
* if another component can transfer it
*/
/* presumably converts the header in place from network to host byte
 * order before its fields are copied below - TODO confirm against the
 * macro definition */
MCA_OOB_TCP_HDR_NTOH(&mop->snd->hdr);
snd = OBJ_NEW(orte_rml_send_t);
/* NOTE(review): mop->rmsg is dereferenced without a NULL check -
 * confirm it is always set when this callback is activated */
snd->retries = mop->rmsg->retries + 1;
snd->dst = mop->snd->hdr.dst;
snd->origin = mop->snd->hdr.origin;
snd->tag = mop->snd->hdr.tag;
snd->seq_num = mop->snd->hdr.seq_num;
/* the payload pointer is shared with, not copied from, the TCP send object */
snd->data = mop->snd->data;
snd->count = mop->snd->hdr.nbytes;
snd->cbfunc.iov = NULL;
snd->cbdata = NULL;
snd->routed = strdup(mop->snd->hdr.routed);
/* activate the OOB send state */
ORTE_OOB_SEND(snd);
/* protect the data: detach the payload from the TCP send object before
 * mop is released - presumably so that tearing it down does not free
 * the buffer now referenced by the new message */
mop->snd->data = NULL;
OBJ_RELEASE(mop);
}
/*
 * Event callback: every attempt to connect to a peer has failed.
 *
 * While the job is running this activates
 * ORTE_PROC_STATE_FAILED_TO_CONNECT for the peer. During any form of
 * termination the failure is silently dropped.
 *
 * fd, args - unused event arguments
 * cbdata   - mca_oob_tcp_peer_op_t naming the peer; released here
 */
void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
{
    mca_oob_tcp_peer_op_t *op = (mca_oob_tcp_peer_op_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(op);

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:failed_to_connect called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer));

    /* no point in reporting anything once we are on the way down */
    bool shutting_down = orte_orteds_term_ordered ||
                         orte_finalizing ||
                         orte_abnormal_term_ordered;

    if (!shutting_down) {
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:failed_to_connect unable to reach peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&op->peer));
        /* let the state machine decide what happens next */
        ORTE_ACTIVATE_PROC_STATE(&op->peer, ORTE_PROC_STATE_FAILED_TO_CONNECT);
    }

    OBJ_RELEASE(op);
}
/*
 * Split a comma-delimited interface list and resolve any subnet
 * specifications (a.b.c.d/e, currently IPv4 only) to the name of a
 * local interface on that subnet.
 *
 * Entries whose first character is alphabetic are taken to be interface
 * names and are kept unchanged. Every other entry must be in
 * address/prefix notation; entries that lack a '/', fail inet_pton(),
 * or match no local interface are reported through orte_show_help()
 * and removed from the list.
 *
 * orig_str - in/out: address of the list string. The string it points
 *            to is freed and replaced by the comma-joined resolved list.
 * name     - label of the parameter being processed; used only in
 *            diagnostic output.
 *
 * Returns the NULL-terminated resolved argv array, or NULL when
 * orig_str or *orig_str is NULL or the list cannot be split (in which
 * case *orig_str is left untouched).
 */
static char **split_and_resolve(char **orig_str, char *name)
{
    int i, ret, save, if_index;
    char **argv, *str, *tmp;
    char if_name[IF_NAMESIZE];
    struct sockaddr_storage argv_inaddr, if_inaddr;
    uint32_t argv_prefix;

    /* Sanity check */
    if (NULL == orig_str || NULL == *orig_str) {
        return NULL;
    }

    argv = opal_argv_split(*orig_str, ',');
    if (NULL == argv) {
        return NULL;
    }

    /* "save" is the write cursor: surviving entries are compacted
       towards the front of argv as invalid ones are dropped */
    for (save = i = 0; NULL != argv[i]; ++i) {
        /* the <ctype.h> classifiers are undefined for negative values,
           so the (possibly signed) char must go through unsigned char */
        if (isalpha((unsigned char) argv[i][0])) {
            argv[save++] = argv[i];
            continue;
        }

        /* Found a subnet notation.  Convert it to an IP
           address/netmask.  Get the prefix first. */
        argv_prefix = 0;
        /* keep an unmodified copy for diagnostics - argv[i] is cut at
           the '/' below */
        tmp = strdup(argv[i]);
        str = strchr(argv[i], '/');
        if (NULL == str) {
            orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
                           true, name, orte_process_info.nodename,
                           tmp, "Invalid specification (missing \"/\")");
            free(argv[i]);
            free(tmp);
            continue;
        }
        *str = '\0';
        argv_prefix = atoi(str + 1);

        /* Now convert the IPv4 address */
        ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
        ret = inet_pton(AF_INET, argv[i],
                        &((struct sockaddr_in*) &argv_inaddr)->sin_addr);
        /* the original entry is no longer needed in either outcome */
        free(argv[i]);

        if (1 != ret) {
            orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
                           true, name, orte_process_info.nodename, tmp,
                           "Invalid specification (inet_pton() failed)");
            free(tmp);
            continue;
        }
        opal_output_verbose(20, orte_oob_base_framework.framework_output,
                            "%s oob:tcp: Searching for %s address+prefix: %s / %u",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            name,
                            opal_net_get_hostname((struct sockaddr*) &argv_inaddr),
                            argv_prefix);

        /* Go through all interfaces and see if we can find a match */
        for (if_index = opal_ifbegin(); if_index >= 0;
             if_index = opal_ifnext(if_index)) {
            opal_ifindextoaddr(if_index,
                               (struct sockaddr*) &if_inaddr,
                               sizeof(if_inaddr));
            if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr,
                                     (struct sockaddr*) &if_inaddr,
                                     argv_prefix)) {
                break;
            }
        }

        /* If we didn't find a match, keep trying */
        if (if_index < 0) {
            orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
                           true, name, orte_process_info.nodename, tmp,
                           "Did not find interface matching this subnet");
            free(tmp);
            continue;
        }

        /* We found a match; get the name and replace it in the
           argv */
        opal_ifindextoname(if_index, if_name, sizeof(if_name));
        opal_output_verbose(20, orte_oob_base_framework.framework_output,
                            "%s oob:tcp: Found match: %s (%s)",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            opal_net_get_hostname((struct sockaddr*) &if_inaddr),
                            if_name);
        argv[save++] = strdup(if_name);
        free(tmp);
    }

    /* The list may have been compressed if there were invalid
       entries, so ensure we end it with a NULL entry */
    argv[save] = NULL;
    free(*orig_str);
    *orig_str = opal_argv_join(argv, ',');
    return argv;
}
/* OOB TCP Class instances */
/*
 * Constructor for mca_oob_tcp_peer_t: puts a new peer into the
 * "unconnected, nothing pending" state.
 */
static void peer_cons(mca_oob_tcp_peer_t *peer)
{
    /* connection state: no socket, no event base, not yet connected */
    peer->sd = -1;
    peer->ev_base = NULL;
    peer->auth_method = NULL;
    peer->state = MCA_OOB_TCP_UNCONNECTED;
    peer->num_retries = 0;

    /* known addresses, none of them selected yet */
    OBJ_CONSTRUCT(&peer->addrs, opal_list_t);
    peer->active_addr = NULL;

    /* message traffic: empty send queue, nothing in flight */
    OBJ_CONSTRUCT(&peer->send_queue, opal_list_t);
    peer->send_msg = NULL;
    peer->recv_msg = NULL;

    /* none of the events are registered */
    peer->send_ev_active = false;
    peer->recv_ev_active = false;
    peer->timer_ev_active = false;
}
/*
 * Destructor for mca_oob_tcp_peer_t: frees the auth method string,
 * removes any events still flagged active, closes the socket if one
 * is open and tears down the address and send-queue lists.
 */
static void peer_des(mca_oob_tcp_peer_t *peer)
{
    /* free() accepts NULL, so no guard is needed */
    free(peer->auth_method);

    /* only events flagged as active are removed */
    if (peer->send_ev_active) {
        opal_event_del(&peer->send_event);
    }
    if (peer->recv_ev_active) {
        opal_event_del(&peer->recv_event);
    }
    if (peer->timer_ev_active) {
        opal_event_del(&peer->timer_event);
    }

    /* a negative descriptor means there is no socket to close */
    if (peer->sd >= 0) {
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s CLOSING SOCKET %d",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            peer->sd);
        CLOSE_THE_SOCKET(peer->sd);
    }

    OPAL_LIST_DESTRUCT(&peer->addrs);
    OPAL_LIST_DESTRUCT(&peer->send_queue);
}
/* Class instance for mca_oob_tcp_peer_t: parent class opal_list_item_t,
 * constructor peer_cons, destructor peer_des. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_t,
opal_list_item_t,
peer_cons, peer_des);
/*
 * Constructor for mca_oob_tcp_addr_t: an all-zero address that has
 * never been tried and is not connected.
 */
static void padd_cons(mca_oob_tcp_addr_t *ptr)
{
    ptr->state = MCA_OOB_TCP_UNCONNECTED;
    ptr->retries = 0;
    memset(&ptr->addr, 0, sizeof ptr->addr);
}
/* Class instance for mca_oob_tcp_addr_t: parent class opal_list_item_t,
 * constructor padd_cons, no destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_addr_t,
opal_list_item_t,
padd_cons, NULL);
/*
 * Constructor for mca_oob_tcp_peer_op_t: start with no strings
 * attached so the destructor can free them unconditionally.
 */
static void pop_cons(mca_oob_tcp_peer_op_t *pop)
{
    pop->port = NULL;
    pop->net = NULL;
    pop->rtmod = NULL;
}
/*
 * Destructor for mca_oob_tcp_peer_op_t: release the three strings the
 * operation may own. free() ignores NULL, so unset fields are harmless.
 */
static void pop_des(mca_oob_tcp_peer_op_t *pop)
{
    free(pop->rtmod);
    free(pop->net);
    free(pop->port);
}
/* Class instance for mca_oob_tcp_peer_op_t: parent class opal_object_t,
 * constructor pop_cons, destructor pop_des. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_op_t,
opal_object_t,
pop_cons, pop_des);
/* Class instance for mca_oob_tcp_msg_op_t: parent class opal_object_t,
 * no constructor or destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_op_t,
opal_object_t,
NULL, NULL);
/* Class instance for mca_oob_tcp_conn_op_t: parent class opal_object_t,
 * no constructor or destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t,
opal_object_t,
NULL, NULL);
/*
 * Constructor for mca_oob_tcp_nicaddr_t: zeroed address with an
 * unspecified protocol family.
 */
static void nicaddr_cons(mca_oob_tcp_nicaddr_t *ptr)
{
    memset(&ptr->addr, 0, sizeof ptr->addr);
    ptr->af_family = PF_UNSPEC;
}
/* Class instance for mca_oob_tcp_nicaddr_t: parent class opal_list_item_t,
 * constructor nicaddr_cons, no destructor. */
OBJ_CLASS_INSTANCE(mca_oob_tcp_nicaddr_t,
opal_list_item_t,
nicaddr_cons, NULL);