1
1
openmpi/orte/mca/oob/tcp/oob_tcp_component.c
Ralph Castain 2a116ecdfc Fix a race condition created when two processes attempt to send to each other at the same time. This causes both processes to start connection procedures, resulting in a conflict that can cause messages to be lost. Add detection of this condition, and have both processes cancel their connect operations. The process with the higher rank will reconnect, while the lower-rank process will simply wait for the connection to be created.

Refs trac:3696

This commit was SVN r29139.

The following Trac tickets were found above:
  Ticket 3696 --> https://svn.open-mpi.org/trac/ompi/ticket/3696
2013-09-06 05:15:25 +00:00

1502 lines
60 KiB
C

/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* On Windows, many of the socket functions return EWOULDBLOCK
* instead of codes such as EAGAIN or EINPROGRESS. It has been
* verified that this does not conflict with other error codes
* returned by these functions in UNIX/Linux environments.
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/oob/tcp/oob_tcp.h"
#include "orte/mca/oob/tcp/oob_tcp_component.h"
#include "orte/mca/oob/tcp/oob_tcp_peer.h"
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
#include "orte/mca/oob/tcp/oob_tcp_listener.h"
#include "orte/mca/oob/tcp/oob_tcp_ping.h"
/*
* Local utility functions
*/
static int tcp_component_register(void);
static int tcp_component_open(void);
static int tcp_component_close(void);
/* OOB component entry points - installed in mca_oob_tcp_component below and
 * called through that table (e.g. component_send returns
 * ORTE_ERR_TAKE_NEXT_OPTION so the OOB base can try another component) */
static bool component_available(void);
static int component_startup(void);
static void component_shutdown(void);
static int component_send(orte_rml_send_t *msg);
static char* component_get_addr(void);
static int component_set_addr(orte_process_name_t *peer,
char **uris);
static bool component_is_reachable(orte_process_name_t *peer);
/*
* Struct of function pointers and all that to let us be initialized
*
* NOTE(review): this is a positional initializer - the order of the entries
* must match the field order of the OOB base component type that forms the
* 'super' member (declared outside this file); confirm before reordering.
*/
mca_oob_tcp_component_t mca_oob_tcp_component = {
{
/* MCA base component description (referenced elsewhere in this file as
 * mca_oob_tcp_component.super.oob_base) */
{
MCA_OOB_BASE_VERSION_2_0_0,
"tcp", /* MCA module name */
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
tcp_component_open, /* component open */
tcp_component_close, /* component close */
NULL, /* component query */
tcp_component_register, /* component register */
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
0, // reserve space for an assigned index
100, // default priority of this transport
/* OOB component API, in the order expected by the base component type */
component_available,
component_startup,
component_shutdown,
component_send,
component_get_addr,
component_set_addr,
component_is_reachable
},
};
/*
 * Initialize the component-wide state: the listener list, the listen-thread
 * bookkeeping (HNP only), the module array, the advertised address/port
 * lists and the peer hash table.
 *
 * Returns ORTE_ERR_NOT_AVAILABLE if both oob_tcp_if_include and
 * oob_tcp_if_exclude were set, ORTE_SUCCESS otherwise.
 */
static int tcp_component_open(void)
{
    /* initialize state */
    OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
    if (ORTE_PROC_IS_HNP) {
        OBJ_CONSTRUCT(&mca_oob_tcp_component.listen_thread, opal_thread_t);
        mca_oob_tcp_component.listen_thread_active = false;
        mca_oob_tcp_component.listen_thread_tv.tv_sec = 1;
        mca_oob_tcp_component.listen_thread_tv.tv_usec = 0;
    }
    mca_oob_tcp_component.addr_count = 0;

    /* modules are created later by component_available(), which counts
     * them in num_modules - start from zero so a re-open does not inherit
     * a stale count */
    mca_oob_tcp_component.num_modules = 0;
    OBJ_CONSTRUCT(&mca_oob_tcp_component.modules, opal_pointer_array_t);
    opal_pointer_array_init(&mca_oob_tcp_component.modules, 4, INT_MAX, 2);

    /* no addresses/ports are known yet for either address family */
    mca_oob_tcp_component.ipv4conns = NULL;
    mca_oob_tcp_component.ipv4ports = NULL;
#if OPAL_ENABLE_IPV6
    mca_oob_tcp_component.ipv6conns = NULL;
    mca_oob_tcp_component.ipv6ports = NULL;
#endif

    OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
    opal_hash_table_init(&mca_oob_tcp_component.peers, 32);

    /* if_include and if_exclude need to be mutually exclusive */
    if (OPAL_SUCCESS !=
        mca_base_var_check_exclusive("orte",
                                     mca_oob_tcp_component.super.oob_base.mca_type_name,
                                     mca_oob_tcp_component.super.oob_base.mca_component_name,
                                     "if_include",
                                     mca_oob_tcp_component.super.oob_base.mca_type_name,
                                     mca_oob_tcp_component.super.oob_base.mca_component_name,
                                     "if_exclude")) {
        /* Return ERR_NOT_AVAILABLE so that a warning message about
           "open" failing is not printed */
        return ORTE_ERR_NOT_AVAILABLE;
    }
    return ORTE_SUCCESS;
}
/*
 * Release the component-wide state set up by tcp_component_open() and
 * populated by component_available(): listener list, module array, the
 * advertised IPv4/IPv6 address and port lists, and the peer hash table.
 */
static int tcp_component_close(void)
{
    int i;
    mca_oob_tcp_module_t *mod;

    /* don't cleanup the listen thread as it wasn't constructed
     * for anything other than the HNP, and we don't want to incur
     * the timeout penalty when the HNP exits that would be required
     * to stop the thread
     */

    /* cleanup listen event list */
    OBJ_DESTRUCT(&mca_oob_tcp_component.listeners);

    /* cleanup modules - each one was heap-allocated by mca_oob_tcp_create() */
    for (i = 0; i < mca_oob_tcp_component.modules.size; i++) {
        if (NULL != (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) {
            free(mod);
        }
    }
    OBJ_DESTRUCT(&mca_oob_tcp_component.modules);
    mca_oob_tcp_component.num_modules = 0;

    /* release the connection address and port lists for both families;
     * the IPv6 lists were previously leaked */
    if (NULL != mca_oob_tcp_component.ipv4conns) {
        opal_argv_free(mca_oob_tcp_component.ipv4conns);
        mca_oob_tcp_component.ipv4conns = NULL;
    }
    if (NULL != mca_oob_tcp_component.ipv4ports) {
        opal_argv_free(mca_oob_tcp_component.ipv4ports);
        mca_oob_tcp_component.ipv4ports = NULL;
    }
#if OPAL_ENABLE_IPV6
    if (NULL != mca_oob_tcp_component.ipv6conns) {
        opal_argv_free(mca_oob_tcp_component.ipv6conns);
        mca_oob_tcp_component.ipv6conns = NULL;
    }
    if (NULL != mca_oob_tcp_component.ipv6ports) {
        opal_argv_free(mca_oob_tcp_component.ipv6ports);
        mca_oob_tcp_component.ipv6ports = NULL;
    }
#endif

    OBJ_DESTRUCT(&mca_oob_tcp_component.peers);
    return ORTE_SUCCESS;
}
/* Storage handed (by address) to the MCA variable system for the
 * static/dynamic port-range parameters registered below. */
static char *static_port_string;
static char *dyn_port_string;

/*
 * Parse a port specification, as accepted by
 * orte_util_parse_range_options(), into an argv-style array stored in
 * *ports.  *ports is left NULL when spec is NULL, when parsing produces no
 * entries, or when the first parsed entry is the "-1" sentinel that means
 * "no ports specified".
 */
static void parse_port_spec(char *spec, char ***ports)
{
    *ports = NULL;
    if (NULL == spec) {
        return;
    }
    orte_util_parse_range_options(spec, ports);
    /* an empty parse result (e.g. an empty spec) must not be dereferenced */
    if (NULL != *ports &&
        (NULL == (*ports)[0] || 0 == strcmp((*ports)[0], "-1"))) {
        opal_argv_free(*ports);
        *ports = NULL;
    }
}

/*
 * Register this component's MCA parameters and derive the port settings
 * from them: the static/dynamic port strings are parsed into the
 * component's port arrays and the global orte_static_ports flag is set.
 *
 * Returns ORTE_ERR_NOT_AVAILABLE if both if_include and if_exclude were
 * given, ORTE_ERROR if static and dynamic ports were both requested, and
 * ORTE_SUCCESS otherwise.
 */
static int tcp_component_register(void)
{
    mca_base_component_t *component = &mca_oob_tcp_component.super.oob_base;
    int var_id;

    /* register oob module parameters */
    mca_oob_tcp_component.peer_limit = -1;
    (void)mca_base_component_var_register(component, "peer_limit",
                                          "Maximum number of peer connections to simultaneously maintain (-1 = infinite)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.peer_limit);

    mca_oob_tcp_component.max_retries = 2;
    (void)mca_base_component_var_register(component, "peer_retries",
                                          "Number of times to try shutting down a connection before giving up",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.max_retries);

    mca_oob_tcp_component.tcp_sndbuf = 128 * 1024;
    (void)mca_base_component_var_register(component, "sndbuf",
                                          "TCP socket send buffering size (in bytes)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.tcp_sndbuf);

    mca_oob_tcp_component.tcp_rcvbuf = 128 * 1024;
    (void)mca_base_component_var_register(component, "rcvbuf",
                                          "TCP socket receive buffering size (in bytes)",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_tcp_component.tcp_rcvbuf);

    mca_oob_tcp_component.if_include = NULL;
    var_id = mca_base_component_var_register(component, "if_include",
                                             "Comma-delimited list of devices and/or CIDR notation of TCP networks to use for Open MPI bootstrap communication (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with oob_tcp_if_exclude.",
                                             MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                             OPAL_INFO_LVL_9,
                                             MCA_BASE_VAR_SCOPE_LOCAL,
                                             &mca_oob_tcp_component.if_include);
    (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "include",
                                        MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);

    mca_oob_tcp_component.if_exclude = NULL;
    var_id = mca_base_component_var_register(component, "if_exclude",
                                             "Comma-delimited list of devices and/or CIDR notation of TCP networks to NOT use for Open MPI bootstrap communication -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). If set to a non-default value, it is mutually exclusive with oob_tcp_if_include.",
                                             MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                             OPAL_INFO_LVL_9,
                                             MCA_BASE_VAR_SCOPE_LOCAL,
                                             &mca_oob_tcp_component.if_exclude);
    (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "exclude",
                                        MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL);

    /* if_include and if_exclude need to be mutually exclusive */
    if (NULL != mca_oob_tcp_component.if_include &&
        NULL != mca_oob_tcp_component.if_exclude) {
        /* Return ERR_NOT_AVAILABLE so that a warning message about
           "open" failing is not printed */
        orte_show_help("help-oob-tcp.txt", "include-exclude", true,
                       mca_oob_tcp_component.if_include,
                       mca_oob_tcp_component.if_exclude);
        return ORTE_ERR_NOT_AVAILABLE;
    }

#if ORTE_ENABLE_STATIC_PORTS
    static_port_string = NULL;
    (void)mca_base_component_var_register(component, "static_ipv4_ports",
                                          "Static ports for daemons and procs (IPv4)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &static_port_string);
    /* if ports were provided, parse the provided range */
    parse_port_spec(static_port_string, &mca_oob_tcp_component.tcp_static_ports);

#if OPAL_ENABLE_IPV6
    static_port_string = NULL;
    (void)mca_base_component_var_register(component, "static_ipv6_ports",
                                          "Static ports for daemons and procs (IPv6)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &static_port_string);
    /* if ports were provided, parse the provided range */
    parse_port_spec(static_port_string, &mca_oob_tcp_component.tcp6_static_ports);
#endif

    /* static ports are in use if either address family supplied some.
     * This must be evaluated even when IPv6 support is compiled out,
     * otherwise IPv4-only static ports would never be recognized. */
    orte_static_ports = (NULL != mca_oob_tcp_component.tcp_static_ports);
#if OPAL_ENABLE_IPV6
    if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
        orte_static_ports = true;
    }
#endif
#endif

    dyn_port_string = NULL;
    (void)mca_base_component_var_register(component, "dynamic_ipv4_ports",
                                          "Range of ports to be dynamically used by daemons and procs (IPv4)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &dyn_port_string);
    /* can't have both static and dynamic ports! */
    if (NULL != dyn_port_string && orte_static_ports) {
        char *err = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
        opal_show_help("help-oob-tcp.txt", "static-and-dynamic", true,
                       err, dyn_port_string);
        free(err);
        return ORTE_ERROR;
    }
    /* if ports were provided, parse the provided range */
    parse_port_spec(dyn_port_string, &mca_oob_tcp_component.tcp_dyn_ports);

#if OPAL_ENABLE_IPV6
    dyn_port_string = NULL;
    (void)mca_base_component_var_register(component, "dynamic_ipv6_ports",
                                          "Range of ports to be dynamically used by daemons and procs (IPv6)",
                                          MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &dyn_port_string);
    /* can't have both static and dynamic ports! */
    if (NULL != dyn_port_string && orte_static_ports) {
        char *err4 = NULL, *err6 = NULL;
        if (NULL != mca_oob_tcp_component.tcp_static_ports) {
            err4 = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ',');
        }
        if (NULL != mca_oob_tcp_component.tcp6_static_ports) {
            err6 = opal_argv_join(mca_oob_tcp_component.tcp6_static_ports, ',');
        }
        opal_show_help("help-oob-tcp.txt", "static-and-dynamic-ipv6", true,
                       (NULL == err4) ? "N/A" : err4,
                       (NULL == err6) ? "N/A" : err6,
                       dyn_port_string);
        free(err4);
        free(err6);
        return ORTE_ERROR;
    }
    /* if ports were provided, parse the provided range */
    parse_port_spec(dyn_port_string, &mca_oob_tcp_component.tcp6_dyn_ports);
#endif

    mca_oob_tcp_component.disable_ipv4_family = false;
    (void)mca_base_component_var_register(component, "disable_ipv4_family",
                                          "Disable the IPv4 interfaces",
                                          MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.disable_ipv4_family);

#if OPAL_ENABLE_IPV6
    mca_oob_tcp_component.disable_ipv6_family = false;
    (void)mca_base_component_var_register(component, "disable_ipv6_family",
                                          "Disable the IPv6 interfaces",
                                          MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_READONLY,
                                          &mca_oob_tcp_component.disable_ipv6_family);
#endif
    return ORTE_SUCCESS;
}
static char **split_and_resolve(char **orig_str, char *name);
static int mca_oob_tcp_create(int kindex,
                              const char *if_name);

/*
 * Decide whether the TCP component can be used on this node.
 *
 * Walks every interface known to opal_if and filters it. When
 * oob_tcp_if_include or oob_tcp_if_exclude is set, that list decides.
 * When neither is set, loopback interfaces are skipped if more than one
 * interface exists. Each accepted kernel interface index gets one module,
 * and each accepted address is appended to ipv4conns/ipv6conns. If an
 * interface already has a module, its additional addresses are recorded
 * in that module's address list.
 *
 * Returns true if at least one module exists afterwards. Returns false
 * otherwise, including when an include/exclude specification cannot be
 * parsed or module creation fails.
 */
static bool component_available(void)
{
    int i, rc;
    char **interfaces = NULL;
    bool including = false, excluding = false;
    char name[32];
    struct sockaddr_storage my_ss;
    int kindex;
    size_t addrlen;
    mca_oob_tcp_module_t *mod;
    mca_oob_tcp_nicaddr_t *nicaddr;

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                        "oob:tcp: component_available called");

    /* if interface include was given, construct a list
     * of those interfaces which match the specifications - remember,
     * the includes could be given as named interfaces, IP addrs, or
     * subnet+mask
     */
    if (NULL != mca_oob_tcp_component.if_include) {
        interfaces = split_and_resolve(&mca_oob_tcp_component.if_include,
                                       "include");
        including = true;
        excluding = false;
    } else if (NULL != mca_oob_tcp_component.if_exclude) {
        interfaces = split_and_resolve(&mca_oob_tcp_component.if_exclude,
                                       "exclude");
        including = false;
        excluding = true;
    }

    /* look at all available interfaces */
    for (i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)) {
        if (OPAL_SUCCESS != opal_ifindextoaddr(i, (struct sockaddr*) &my_ss,
                                               sizeof (my_ss))) {
            opal_output (0, "oob_tcp: problems getting address for index %i (kernel index %i)\n",
                         i, opal_ifindextokindex(i));
            continue;
        }

        /* ignore non-ip4/6 interfaces */
        if (AF_INET != my_ss.ss_family
#if OPAL_ENABLE_IPV6
            && AF_INET6 != my_ss.ss_family
#endif
            ) {
            continue;
        }

        kindex = opal_ifindextokindex(i);
        if (kindex <= 0) {
            continue;
        }
        opal_output_verbose(10, orte_oob_base_framework.framework_output,
                            "WORKING INTERFACE %d KERNEL INDEX %d FAMILY: %s", i, kindex,
                            (AF_INET == my_ss.ss_family) ? "V4" : "V6");

        /* get the name for diagnostic purposes */
        opal_ifindextoname(i, name, sizeof(name));

        /* handle include/exclude directives */
        if (NULL != interfaces) {
            /* check for match */
            rc = opal_ifmatches(i, interfaces);
            /* if one of the network specifications isn't parseable, then
             * error out as we can't do what was requested
             */
            if (OPAL_ERR_NETWORK_NOT_PARSEABLE == rc) {
                orte_show_help("help-oob-tcp.txt", "not-parseable", true);
                opal_argv_free(interfaces);
                return false;
            }
            /* if we are including, then ignore this if not present */
            if (including) {
                if (OPAL_SUCCESS != rc) {
                    opal_output_verbose(20, orte_oob_base_framework.framework_output,
                                        "%s oob:tcp:init rejecting interface %s (not in include list)",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
                    continue;
                }
            } else {
                /* we are excluding, so ignore if present */
                if (OPAL_SUCCESS == rc) {
                    opal_output_verbose(20, orte_oob_base_framework.framework_output,
                                        "%s oob:tcp:init rejecting interface %s (in exclude list)",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
                    continue;
                }
            }
        } else {
            /* if no specific interfaces were provided, we ignore the loopback
             * interface unless nothing else is available
             */
            if (1 < opal_ifcount() && opal_ifisloopback(i)) {
                opal_output_verbose(20, orte_oob_base_framework.framework_output,
                                    "%s oob:tcp:init rejecting loopback interface %s",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name);
                continue;
            }
        }

        /* we know we want this address. mca_oob_tcp_create() stores each
         * module in the modules array at its kernel index, so a direct
         * lookup tells us whether we have seen this NIC before */
        mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, kindex);
        if (NULL != mod) {
            /* we don't want another module to be created. But we still need to preserve
             * the address as a given NIC can have multiple addresses.
             */
            nicaddr = OBJ_NEW(mca_oob_tcp_nicaddr_t);
            nicaddr->af_family = my_ss.ss_family;
            /* copy as much of the address as the destination can hold rather
             * than a fixed sizeof(struct sockaddr), which is too small for
             * an IPv6 address */
            addrlen = sizeof(nicaddr->addr);
            if (addrlen > sizeof(my_ss)) {
                addrlen = sizeof(my_ss);
            }
            memcpy(&nicaddr->addr, &my_ss, addrlen);
            opal_list_append(&mod->addresses, &nicaddr->super);
            opal_output_verbose(10, orte_oob_base_framework.framework_output,
                                "%s oob:tcp:init adding %s address to interface %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                (AF_INET == my_ss.ss_family) ? "V4" : "V6", name);
        } else {
            /* Refs ticket #3019
             * it would probably be worthwhile to print out a warning if OMPI detects multiple
             * IP interfaces that are "up" on the same subnet (because that's a Bad Idea). Note
             * that we should only check for this after applying the relevant include/exclude
             * list MCA params. If we detect redundant ports, we can also automatically ignore
             * them so that applications won't hang.
             */
            /* we want to support this interface, so create a module for it */
            if (ORTE_SUCCESS != (rc = mca_oob_tcp_create(kindex, name))) {
                ORTE_ERROR_LOG(rc);
                /* don't leak the include/exclude list on the error path */
                if (NULL != interfaces) {
                    opal_argv_free(interfaces);
                }
                return false;
            }
        }

        /* add this address to our connections */
        if (AF_INET == my_ss.ss_family) {
            opal_argv_append_nosize(&mca_oob_tcp_component.ipv4conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
        } else if (AF_INET6 == my_ss.ss_family) {
#if OPAL_ENABLE_IPV6
            opal_argv_append_nosize(&mca_oob_tcp_component.ipv6conns, opal_net_get_hostname((struct sockaddr*) &my_ss));
#endif
        }
    }

    /* cleanup */
    if (NULL != interfaces) {
        opal_argv_free(interfaces);
    }

    if (0 == mca_oob_tcp_component.num_modules) {
        if (including) {
            orte_show_help("help-oob-tcp.txt", "no-included-found", true, mca_oob_tcp_component.if_include);
        } else if (excluding) {
            orte_show_help("help-oob-tcp.txt", "excluded-all", true, mca_oob_tcp_component.if_exclude);
        } else {
            orte_show_help("help-oob-tcp.txt", "no-interfaces-avail", true);
        }
        return false;
    }

    return true;
}
/*
 * Start the component: let every module created by component_available()
 * run its init hook (when it has one), then start listening for inbound
 * connections.  Returns the status of orte_oob_tcp_start_listening().
 */
static int component_startup(void)
{
    mca_oob_tcp_module_t *module;
    int idx, rc;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP STARTUP",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* give each module a chance to initialize itself */
    for (idx = 0; idx < mca_oob_tcp_component.modules.size; idx++) {
        module = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, idx);
        if (NULL == module || NULL == module->api.init) {
            continue;
        }
        module->api.init((struct mca_oob_tcp_module_t*)module);
    }

    /* start the listening thread/event */
    rc = orte_oob_tcp_start_listening();
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }
    return rc;
}
/*
 * Shut the component down: release every listener object still on the
 * component's listener list, then run each module's finalize hook (when it
 * has one).
 */
static void component_shutdown(void)
{
    mca_oob_tcp_module_t *module;
    opal_list_item_t *listener;
    int idx;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s TCP SHUTDOWN",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* drain the listener list, releasing each entry */
    for (listener = opal_list_remove_first(&mca_oob_tcp_component.listeners);
         NULL != listener;
         listener = opal_list_remove_first(&mca_oob_tcp_component.listeners)) {
        OBJ_RELEASE(listener);
    }

    /* shutdown the modules */
    for (idx = 0; idx < mca_oob_tcp_component.modules.size; idx++) {
        module = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, idx);
        if (NULL == module || NULL == module->api.finalize) {
            continue;
        }
        module->api.finalize((struct mca_oob_tcp_module_t*)module);
    }
}
/*
 * Hand msg to the TCP module responsible for msg->peer.
 *
 * Returns ORTE_ERR_TAKE_NEXT_OPTION in three cases: the peer is unknown to
 * this component, every route to it has been marked unreachable, or no
 * module can reach it. The OOB base can then try another component.
 * Otherwise the message is passed to a module's send_nb and ORTE_SUCCESS
 * is returned.
 *
 * The module may run on its own event base, so send_nb only queues the
 * request. The final status is reported later through the message's
 * callback, or through the component error path if the module cannot
 * deliver it.
 */
static int component_send(orte_rml_send_t *msg)
{
    mca_oob_tcp_component_peer_t *pr;
    mca_oob_tcp_module_t *candidate;
    uint64_t key;
    int idx;

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                        "%s oob:tcp:send_nb to peer %s:%d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&msg->peer), msg->tag);

    /* the process name doubles as the 64-bit peer-table key; bail out if we
     * don't know the peer or know it only as unreachable */
    memcpy(&key, (char*)&msg->peer, sizeof(uint64_t));
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         key, (void**)&pr)
        || NULL == pr
        || opal_bitmap_is_clear(&pr->reachable)) {
        return ORTE_ERR_TAKE_NEXT_OPTION;
    }

    /* without an assigned module, adopt the first module (in array order)
     * whose interface can reach the peer */
    if (NULL == pr->mod) {
        for (idx = 0; idx < mca_oob_tcp_component.modules.size && NULL == pr->mod; idx++) {
            candidate = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, idx);
            if (NULL != candidate &&
                opal_bitmap_is_set_bit(&pr->reachable, candidate->if_kidx)) {
                pr->mod = candidate;
            }
        }
        if (NULL == pr->mod) {
            /* all our modules are down - let the base keep searching */
            return ORTE_ERR_TAKE_NEXT_OPTION;
        }
    }

    pr->mod->api.send_nb((struct mca_oob_tcp_module_t*)pr->mod, msg);
    return ORTE_SUCCESS;
}
/*
 * Build this process's TCP contact URI.
 *
 * IPv4 addresses are rendered as "tcp://addr[,addr...]:port[,port...]". When
 * IPv6 support is compiled in and enabled, the IPv6 addresses are rendered as
 * "tcp6://[addr[,addr...]]:ports" and appended after a ';' when an IPv4 part
 * exists.
 *
 * Returns a heap-allocated string (produced by asprintf), or NULL if no
 * family contributed an address or the string could not be built.
 */
static char* component_get_addr(void)
{
    char *cptr = NULL, *tmp, *tp;

    if (!mca_oob_tcp_component.disable_ipv4_family &&
        NULL != mca_oob_tcp_component.ipv4conns) {
        tmp = opal_argv_join(mca_oob_tcp_component.ipv4conns, ',');
        tp = opal_argv_join(mca_oob_tcp_component.ipv4ports, ',');
        /* asprintf leaves its output pointer undefined on failure, so
         * reset it rather than hand garbage to the caller */
        if (0 > asprintf(&cptr, "tcp://%s:%s", tmp, tp)) {
            cptr = NULL;
        }
        free(tmp);
        free(tp);
    }

#if OPAL_ENABLE_IPV6
    if (!mca_oob_tcp_component.disable_ipv6_family &&
        NULL != mca_oob_tcp_component.ipv6conns) {
        char *tmp2;

        /* Fixes #2498
         * RFC 3986, section 3.2.2
         * The notation in that case is to encode the IPv6 IP number in square brackets:
         * "http://[2001:db8:1f70::999:de8:7648:6e8]:100/"
         * A host identified by an Internet Protocol literal address, version 6 [RFC3513]
         * or later, is distinguished by enclosing the IP literal within square brackets.
         * This is the only place where square bracket characters are allowed in the URI
         * syntax. In anticipation of future, as-yet-undefined IP literal address formats,
         * an implementation may use an optional version flag to indicate such a format
         * explicitly rather than rely on heuristic determination.
         */
        tmp = opal_argv_join(mca_oob_tcp_component.ipv6conns, ',');
        tp = opal_argv_join(mca_oob_tcp_component.ipv6ports, ',');
        if (NULL == cptr) {
            /* no ipv4 stuff */
            if (0 > asprintf(&cptr, "tcp6://[%s]:%s", tmp, tp)) {
                cptr = NULL;
            }
        } else if (0 <= asprintf(&tmp2, "%s;tcp6://[%s]:%s", cptr, tmp, tp)) {
            free(cptr);
            cptr = tmp2;
        }
        /* if the combined string could not be built, the IPv4-only URI
         * in cptr is kept */
        free(tmp);
        free(tp);
    }
#endif

    /* return our uri */
    return cptr;
}
/*
 * Record how a peer can be reached over TCP.
 *
 * Scans the NULL-terminated uris array for entries of the form
 * "tcp://addr[,addr...]:ports" and, when IPv6 support is compiled in,
 * "tcp6://[addr[,addr...]]:ports". For every address whose kernel
 * interface index has a local module, two things happen. First, the peer
 * is marked reachable via that index in the component peer table, with an
 * entry created if needed. Second, the address and ports are passed to
 * that module's set_peer.
 *
 * Returns:
 *   ORTE_SUCCESS               - at least one address was accepted
 *   ORTE_ERR_TAKE_NEXT_OPTION  - no address usable by this component
 *   ORTE_ERR_BAD_PARAM         - a TCP URI had no host or no port part
 *   other error codes          - allocation / peer-table failures
 */
static int component_set_addr(orte_process_name_t *peer,
                              char **uris)
{
    char **addrs, *hptr;
    char *tcpuri = NULL, *host, *ports;
    int i, j, k, rc;
    size_t prefix_len;
    mca_oob_tcp_module_t *mod;
    mca_oob_tcp_component_peer_t *pr;
    uint16_t af_family = AF_UNSPEC;
    uint64_t ui64;
    bool found = false;

    /* the process name doubles as the 64-bit peer-table key */
    memcpy(&ui64, (char*)peer, sizeof(uint64_t));

    /* cycle across component parts and see if one belongs to us */
    for (i = 0; NULL != uris[i]; i++) {
        if (0 == strncmp(uris[i], "tcp:", 4)) {
            af_family = AF_INET;
            prefix_len = strlen("tcp://");
        } else if (0 == strncmp(uris[i], "tcp6:", 5)) {
#if OPAL_ENABLE_IPV6
            af_family = AF_INET6;
            prefix_len = strlen("tcp6://");
#else
            /* we don't support this connection type */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: address %s not supported",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
            continue;
#endif
        } else {
            /* not one of ours */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s oob:tcp: ignoring address %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]);
            continue;
        }

        /* this one is ours - record the peer */
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s oob:tcp: working peer %s address %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(peer), uris[i]);

        /* a URI no longer than its "scheme://" prefix has no host part;
         * rejecting it here keeps the host pointer inside the string */
        if (strlen(uris[i]) <= prefix_len) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "FORMAT ERROR IN ADDR: %s", "ZERO LENGTH");
            return ORTE_ERR_BAD_PARAM;
        }
        if (NULL == (tcpuri = strdup(uris[i]))) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        host = tcpuri + prefix_len;

        /* separate the ports from the network addrs - the port list follows
         * the last ':' of the host part (searching only the host part keeps
         * the scheme's ':' from being mistaken for the separator) */
        if (NULL == (ports = strrchr(host, ':'))) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "FORMAT ERROR IN ADDR: %s", "NO PORT");
            free(tcpuri);
            return ORTE_ERR_BAD_PARAM;
        }
        *ports = '\0';
        ports++;

        if (0 == strlen(host)) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "FORMAT ERROR IN ADDR: %s", "ZERO LENGTH");
            free(tcpuri);
            return ORTE_ERR_BAD_PARAM;
        }

        /* if this is a tcp6 connection, the first one will have a '['
         * at the beginning of it, and the last will have a ']' at the
         * end - we need to remove those extra characters
         */
        hptr = host;
        if (AF_INET6 == af_family) {
            if ('[' == host[0]) {
                hptr = &host[1];
            }
            if (']' == host[strlen(host)-1]) {
                host[strlen(host)-1] = '\0';
            }
        }

        /* split the addrs - guard against a NULL result when there is
         * nothing left to split (e.g. an empty "[]" list) */
        addrs = opal_argv_split(hptr, ',');

        /* cycle across the provided addrs */
        for (j = 0; NULL != addrs && NULL != addrs[j]; j++) {
            /* lookup the kernel index of this address */
            if (0 >= (k = opal_ifaddrtokindex(addrs[j]))) {
                opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s UNFOUND KERNEL INDEX %d FOR ADDRESS %s",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, addrs[j]);
                /* we don't have an interface on this subnet - ignore it */
                continue;
            }
            /* modules are stored at their kernel index */
            if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
                opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s NO MODULE AT KINDEX %d FOR ADDRESS %s",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, addrs[j]);
                continue;
            }

            /* record that this peer may be reachable via this module, but don't assign
             * the peer to this module until later when we actually connect
             */
            if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                                 ui64, (void**)&pr) || NULL == pr) {
                pr = OBJ_NEW(mca_oob_tcp_component_peer_t);
                if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
                                                                           ui64, (void*)pr))) {
                    ORTE_ERROR_LOG(rc);
                    /* pr never made it into the table, and the parse
                     * buffers are ours - release all of them */
                    OBJ_RELEASE(pr);
                    opal_argv_free(addrs);
                    free(tcpuri);
                    return rc;
                }
            }
            opal_bitmap_set_bit(&pr->reachable, k);

            /* pass this proc, and its ports, to the
             * module for handling - this module will be responsible
             * for communicating with the proc via this network.
             * Note that the modules are *not* necessarily running
             * on our event base - thus, the modules will push this
             * call into their own event base for processing.
             */
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s PASSING ADDR %s TO INTERFACE %s AT KERNEL INDEX %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), addrs[j],
                                mod->if_name, k);
            mod->api.set_peer((struct mca_oob_tcp_module_t*)mod,
                              peer, af_family, addrs[j], ports);
            found = true;
        }

        if (NULL != addrs) {
            opal_argv_free(addrs);
        }
        free(tcpuri);
        tcpuri = NULL;
    }

    if (found) {
        /* indicate that this peer is addressable by this component */
        return ORTE_SUCCESS;
    }

    /* otherwise indicate that it is not addressable by us */
    return ORTE_ERR_TAKE_NEXT_OPTION;
}
/*
 * Report whether peer can be reached over TCP.
 *
 * Asks the routed framework for the next hop toward peer. The peer counts
 * as reachable when this component knows that hop and at least one
 * interface bit is set in its reachability bitmap. On success, if peer is
 * not yet in the component peer table, it is added with a copy of the
 * hop's reachability bitmap so component_send() can find it.
 */
static bool component_is_reachable(orte_process_name_t *peer)
{
    orte_process_name_t hop;
    uint64_t ui64;
    mca_oob_tcp_component_peer_t *pr, *pnew;
    int rc;

    /* if we have a route to this peer, then we can reach it */
    hop = orte_routed.get_route(peer);
    if (ORTE_JOBID_INVALID == hop.jobid ||
        ORTE_VPID_INVALID == hop.vpid) {
        /* report the peer being tested, not ourselves */
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s is NOT reachable by TCP",
                            ORTE_NAME_PRINT(peer));
        return false;
    }

    /* we have a route, but which (if any) module can reach the hop? */
    memcpy(&ui64, (char*)&hop, sizeof(uint64_t));
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         ui64, (void**)&pr)) {
        /* nope - we can't get there */
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s is NOT reachable by TCP",
                            ORTE_NAME_PRINT(peer));
        return false;
    }

    /* if we know the hop but have found all routes unreachable, then
     * we can't send it
     */
    if (NULL == pr || opal_bitmap_is_clear(&pr->reachable)) {
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s is NOT reachable by TCP",
                            ORTE_NAME_PRINT(peer));
        return false;
    }

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s is reachable by TCP",
                        ORTE_NAME_PRINT(peer));

    /* mark it so we can find this peer when we try to send */
    memcpy(&ui64, (char*)peer, sizeof(uint64_t));
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         ui64, (void**)&pnew) || NULL == pnew) {
        pnew = OBJ_NEW(mca_oob_tcp_component_peer_t);
        opal_bitmap_copy(&pnew->reachable, &pr->reachable);
        if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
                                                                   ui64, (void*)pnew))) {
            ORTE_ERROR_LOG(rc);
            /* the new entry never made it into the table */
            OBJ_RELEASE(pnew);
            return false;
        }
    }
    return true;
}
/*
* Create a module instance and add to modules array.
*/
static int mca_oob_tcp_create(int kindex, const char *if_name)
{
mca_oob_tcp_module_t *mod;
OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output,
"%s creating OOB-TCP module for interface %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), if_name));
mod = (mca_oob_tcp_module_t*)malloc(sizeof(mca_oob_tcp_module_t));
if (NULL == mod) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
mod->if_name = strdup(if_name);
/* copy the APIs across */
memcpy(mod, &mca_oob_tcp_module.api, sizeof(mca_oob_tcp_module_api_t));
/* point to the interface it will service */
mod->if_kidx = kindex;
/* setup the list of addresses */
OBJ_CONSTRUCT(&mod->addresses, opal_list_t);
/* setup the default event base */
mod->ev_base = orte_event_base;
/* add it to our array */
opal_pointer_array_set_item(&mca_oob_tcp_component.modules, kindex, mod);
mca_oob_tcp_component.num_modules++;
return ORTE_SUCCESS;
}
/*
 * Event callback: record that the module in the peer operation (cbdata) now
 * handles pop->peer.
 *
 * The peer's entry in the component peer table gets pr->mod set to that
 * module. If the entry is missing it is created, since it presumably came
 * from an inbound connection, and marked reachable via the module's
 * kernel index. The OOB base's peer table is then updated so the base
 * routes this peer to the TCP component. fd and args are unused. The
 * peer operation object is always released.
 */
void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata)
{
    mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
    uint64_t ui64;
    int rc;
    mca_oob_tcp_component_peer_t *pr;
    orte_oob_base_peer_t *bpr;
    bool bpr_is_new = false;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:set_module called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&pop->peer));

    /* retrieve the peer's name */
    memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t));

    /* mark that this peer is being handled by the specified module */
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         ui64, (void**)&pr) || NULL == pr) {
        /* must have come from an inbound connection */
        pr = OBJ_NEW(mca_oob_tcp_component_peer_t);
        opal_bitmap_set_bit(&pr->reachable, pop->mod->if_kidx);
        if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
                                                                   ui64, pr))) {
            /* without a component entry our send path cannot find this
             * peer, so don't advertise it to the OOB base either */
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(pr);
            goto cleanup;
        }
    }
    pr->mod = pop->mod;

    /* make sure the OOB knows that we are handling this peer - we
     * are in the same event base as the OOB base, so we can
     * directly access its storage
     */
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
                                                         ui64, (void**)&bpr) || NULL == bpr) {
        bpr = OBJ_NEW(orte_oob_base_peer_t);
        bpr_is_new = true;
    }
    opal_bitmap_set_bit(&bpr->addressable, mca_oob_tcp_component.super.idx);
    bpr->component = &mca_oob_tcp_component.super;
    if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
                                                               ui64, bpr))) {
        ORTE_ERROR_LOG(rc);
        /* only release an entry we created; an existing one still belongs
         * to the base's table */
        if (bpr_is_new) {
            OBJ_RELEASE(bpr);
        }
    }

 cleanup:
    OBJ_RELEASE(pop);
}
void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata)
{
mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata;
uint64_t ui64;
int rc, k;
mca_oob_tcp_component_peer_t *pr;
mca_oob_tcp_module_t *mod;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:lost connection called for peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));
/* retrieve the peer's name */
memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t));
/* mark that this peer is no longer reachable from this module */
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
ui64, (void**)&pr) || NULL == pr) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto cleanup;
}
opal_bitmap_clear_bit(&pr->reachable, pop->mod->if_kidx);
/* if we are terminating, or recovery isn't enabled, then don't attempt to reconnect */
if (!orte_enable_recovery || orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
goto cleanup;
}
/* if at least one module can still reach this peer, then we *might* be okay */
if (!opal_bitmap_is_clear(&pr->reachable)) {
/* any pending messages were re-queued when the module closed the connection */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:lost connection still can reach peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer));
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
continue;
}
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
/* we cannot look into the module itself to see if messages
* are pending that would cause a connection to the next address
* to occur as the module could be operating in a separate event
* base. Instead, we trigger an event to ask it to start
* the connection procedure by issuing a "ping" request
*/
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:lost pinging peer %s on interface %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pop->peer), mod->if_name);
mod->api.ping((struct mca_oob_tcp_module_t*)mod, &pop->peer);
/* cleanup */
OBJ_RELEASE(pop);
return;
}
}
}
/* if we get here, then we no longer have any way to reach this peer.
* Mark that we no longer support this peer
*/
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers,
ui64, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* do the same to the OOB's table - for now, we don't worry about shifting to
* another component. Eventually, we will want to push this decision to
* the OOB so it can try other components and eventually error out
*/
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers,
ui64, NULL))) {
ORTE_ERROR_LOG(rc);
}
cleanup:
/* activate the proc state */
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED);
}
OBJ_RELEASE(pop);
}
void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata)
{
mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
mca_oob_tcp_module_t *mod;
uint64_t ui64;
int k;
mca_oob_tcp_component_peer_t *pr;
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:no route called for peer %s on interface %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop),
mop->mod->if_name);
/* retrieve the hop's name */
memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));
/* get the peer object */
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
ui64, (void**)&pr) || NULL == pr) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto cleanup;
}
/* ensure we mark that this peer isn't reachable by this module */
opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx);
/* do we have any other modules (i.e., NICs) we can try? */
for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
continue;
}
if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
/* let this module try */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s tcp:unknown hop attempting send to peer %s via interface %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mop->hop), mod->if_name);
mod->api.send_nb((struct mca_oob_tcp_module_t*)mod, mop->rmsg);
OBJ_RELEASE(mop);
return;
}
}
/* if we get here, then we have no other modules - so we report
* the error back to the OOB and let it try other components
* or declare a problem
*/
if (!orte_finalizing && !orte_abnormal_term_ordered) {
/* if this was a lifeline, then alert */
if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
} else {
ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
}
}
cleanup:
OBJ_RELEASE(mop);
}
/*
 * Event callback invoked when a module does not know the hop for a
 * message it was asked to send.
 *
 * cbdata is a mca_oob_tcp_msg_error_t carrying the failing module, the
 * hop and the pending send; it is released before returning.
 *
 * The hop is marked unreachable via the failing module, and the message is
 * resent through the first other module that can still reach it. If none
 * can, and we are not finalizing or aborting, the failure is reported via
 * ORTE_ACTIVATE_PROC_STATE (LIFELINE_LOST when orte_routed.route_lost()
 * fails, COMM_FAILED otherwise). If the hop is unknown to the component,
 * only an error is logged.
 */
void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata)
{
    mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata;
    mca_oob_tcp_module_t *mod;
    uint64_t ui64;
    int k;
    mca_oob_tcp_component_peer_t *pr;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:unknown hop called for peer %s on interface %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&mop->hop),
                        mop->mod->if_name);

    /* retrieve the hop's name */
    memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t));

    /* get the peer object */
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         ui64, (void**)&pr) || NULL == pr) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto cleanup;
    }

    /* ensure we mark that this peer isn't reachable by this module */
    opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx);

    /* do we have any other modules (i.e., NICs) we can try? */
    for (k=0; k < mca_oob_tcp_component.modules.size; k++) {
        if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) {
            continue;
        }
        if (opal_bitmap_is_set_bit(&pr->reachable, k)) {
            /* let this module try */
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s tcp:unknown hop attempting send to peer %s via interface %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&mop->hop), mod->if_name);
            /* resend() receives only the error object, and every module
             * shares the same api table, so the chosen module must be
             * recorded in mop - otherwise the resend is directed back at
             * the module that just failed */
            mop->mod = mod;
            mod->api.resend((struct mca_oob_tcp_msg_error_t*)mop);
            OBJ_RELEASE(mop);
            return;
        }
    }

    /* if we get here, then we have no other modules - so we report
     * the error back to the OOB and let it try other components
     * or declare a problem
     */
    if (!orte_finalizing && !orte_abnormal_term_ordered) {
        /* if this was a lifeline, then alert */
        if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) {
            ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST);
        } else {
            ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED);
        }
    }

cleanup:
    OBJ_RELEASE(mop);
}
/*
 * Event callback run when a module was unable to connect to a peer.
 *
 * cbdata is a mca_oob_tcp_peer_op_t naming the peer and the module that
 * failed; it is always released before returning.
 *
 * The peer is marked unreachable through the failing module. During
 * termination nothing further happens. Otherwise the first other module
 * whose reachability bit is still set is asked to "ping" the peer. If there
 * is none, or the peer is unknown to the component, the failure is reported
 * via ORTE_ACTIVATE_PROC_STATE: LIFELINE_LOST when orte_routed.route_lost()
 * does not return ORTE_SUCCESS, COMM_FAILED otherwise.
 */
void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata)
{
    mca_oob_tcp_peer_op_t *op = (mca_oob_tcp_peer_op_t*)cbdata;
    mca_oob_tcp_component_peer_t *cpeer;
    mca_oob_tcp_module_t *alt;
    uint64_t key;
    int slot;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:failed_to_connect called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer));

    /* the component's peer table is keyed by the raw 64-bit image of the name */
    memcpy(&key, (char*)&(op->peer), sizeof(uint64_t));
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers,
                                                         key, (void**)&cpeer) ||
        NULL == cpeer) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto report;
    }

    /* this interface can no longer be used to reach the peer */
    opal_bitmap_clear_bit(&cpeer->reachable, op->mod->if_kidx);

    /* no reconnection attempts while shutting down */
    if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
        OBJ_RELEASE(op);
        return;
    }

    if (!opal_bitmap_is_clear(&cpeer->reachable)) {
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:attempting different module for connection to peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&op->peer));
        for (slot = 0; slot < mca_oob_tcp_component.modules.size; slot++) {
            alt = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, slot);
            if (NULL == alt || !opal_bitmap_is_set_bit(&cpeer->reachable, slot)) {
                continue;
            }
            /* the module may run in its own event base, so rather than
             * inspecting its pending messages we ask it to (re)start the
             * connection procedure by issuing a "ping" */
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s tcp:lost pinging peer %s on interface %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&op->peer), alt->if_name);
            alt->api.ping((struct mca_oob_tcp_module_t*)alt, &op->peer);
            OBJ_RELEASE(op);
            return;
        }
    }

    /* nobody else can reach it */
    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:failed_to_connect unable to reach peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer));

report:
    /* a lifeline loss is escalated; anything else is a comm failure */
    if (ORTE_SUCCESS != orte_routed.route_lost(&op->peer)) {
        ORTE_ACTIVATE_PROC_STATE(&op->peer, ORTE_PROC_STATE_LIFELINE_LOST);
    } else {
        ORTE_ACTIVATE_PROC_STATE(&op->peer, ORTE_PROC_STATE_COMM_FAILED);
    }
    OBJ_RELEASE(op);
}
/*
* Go through a list of argv; if there are any subnet specifications
* (a.b.c.d/e), resolve them to an interface name (Currently only
* supporting IPv4). If unresolvable, warn and remove.
*/
static char **split_and_resolve(char **orig_str, char *name)
{
int i, ret, save, if_index;
char **argv, *str, *tmp;
char if_name[IF_NAMESIZE];
struct sockaddr_storage argv_inaddr, if_inaddr;
uint32_t argv_prefix;
/* Sanity check */
if (NULL == orig_str || NULL == *orig_str) {
return NULL;
}
argv = opal_argv_split(*orig_str, ',');
if (NULL == argv) {
return NULL;
}
for (save = i = 0; NULL != argv[i]; ++i) {
if (isalpha(argv[i][0])) {
argv[save++] = argv[i];
continue;
}
/* Found a subnet notation. Convert it to an IP
address/netmask. Get the prefix first. */
argv_prefix = 0;
tmp = strdup(argv[i]);
str = strchr(argv[i], '/');
if (NULL == str) {
orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
true, name, orte_process_info.nodename,
tmp, "Invalid specification (missing \"/\")");
free(argv[i]);
free(tmp);
continue;
}
*str = '\0';
argv_prefix = atoi(str + 1);
/* Now convert the IPv4 address */
((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET;
ret = inet_pton(AF_INET, argv[i],
&((struct sockaddr_in*) &argv_inaddr)->sin_addr);
free(argv[i]);
if (1 != ret) {
orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
true, name, orte_process_info.nodename, tmp,
"Invalid specification (inet_pton() failed)");
free(tmp);
continue;
}
opal_output_verbose(20, orte_oob_base_framework.framework_output,
"%s oob:tcp: Searching for %s address+prefix: %s / %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
name,
opal_net_get_hostname((struct sockaddr*) &argv_inaddr),
argv_prefix);
/* Go through all interfaces and see if we can find a match */
for (if_index = 0; if_index < opal_ifcount(); if_index++) {
opal_ifindextoaddr(if_index,
(struct sockaddr*) &if_inaddr,
sizeof(if_inaddr));
if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr,
(struct sockaddr*) &if_inaddr,
argv_prefix)) {
break;
}
}
/* If we didn't find a match, keep trying */
if (if_index == opal_ifcount()) {
orte_show_help("help-oob-tcp.txt", "invalid if_inexclude",
true, name, orte_process_info.nodename, tmp,
"Did not find interface matching this subnet");
free(tmp);
continue;
}
/* We found a match; get the name and replace it in the
argv */
opal_ifindextoname(if_index, if_name, sizeof(if_name));
opal_output_verbose(20, orte_oob_base_framework.framework_output,
"%s oob:tcp: Found match: %s (%s)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
opal_net_get_hostname((struct sockaddr*) &if_inaddr),
if_name);
argv[save++] = strdup(if_name);
free(tmp);
}
/* The list may have been compressed if there were invalid
entries, so ensure we end it with a NULL entry */
argv[save] = NULL;
free(*orig_str);
*orig_str = opal_argv_join(argv, ',');
return argv;
}
/* OOB TCP Class instances */

/*
 * Constructor for mca_oob_tcp_peer_t: no socket, unconnected state, empty
 * address and send lists, no messages in progress, and no events flagged
 * as active.
 */
static void peer_cons(mca_oob_tcp_peer_t *peer)
{
    /* connection: no socket yet */
    peer->sd = -1;
    peer->state = MCA_OOB_TCP_UNCONNECTED;

    /* candidate addresses, none selected yet */
    OBJ_CONSTRUCT(&peer->addrs, opal_list_t);
    peer->active_addr = NULL;

    /* outbound queue and in-progress messages */
    OBJ_CONSTRUCT(&peer->send_queue, opal_list_t);
    peer->send_msg = NULL;
    peer->recv_msg = NULL;

    /* event bookkeeping flags */
    peer->send_ev_active = false;
    peer->recv_ev_active = false;
    peer->timer_ev_active = false;
}

/*
 * Destructor for mca_oob_tcp_peer_t: closes the socket when one is held
 * (a negative sd, as set by the constructor, means none) and destructs
 * both lists with OPAL_LIST_DESTRUCT.
 */
static void peer_des(mca_oob_tcp_peer_t *peer)
{
    if (peer->sd >= 0) {
        CLOSE_THE_SOCKET(peer->sd);
    }
    OPAL_LIST_DESTRUCT(&peer->addrs);
    OPAL_LIST_DESTRUCT(&peer->send_queue);
}

OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_t, opal_list_item_t, peer_cons, peer_des);
/*
 * Constructor for mca_oob_tcp_addr_t: all-zero address, no retries yet,
 * unconnected state.
 */
static void padd_cons(mca_oob_tcp_addr_t *pa)
{
    memset(&pa->addr, 0, sizeof(pa->addr));
    pa->retries = 0;
    pa->state = MCA_OOB_TCP_UNCONNECTED;
}

OBJ_CLASS_INSTANCE(mca_oob_tcp_addr_t, opal_list_item_t, padd_cons, NULL);
/* Constructor for mca_oob_tcp_peer_op_t: no net/port strings attached. */
static void pop_cons(mca_oob_tcp_peer_op_t *op)
{
    op->net = NULL;
    op->port = NULL;
}

/*
 * Destructor for mca_oob_tcp_peer_op_t: frees the net and port strings.
 * Either may be NULL; free(NULL) is a no-op, so no guard is needed.
 */
static void pop_des(mca_oob_tcp_peer_op_t *op)
{
    free(op->net);
    free(op->port);
}

OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_op_t, opal_object_t, pop_cons, pop_des);

/* message and connection operation objects need no custom construction */
OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_op_t, opal_object_t, NULL, NULL);
OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t, opal_object_t, NULL, NULL);
/*
 * Constructor for mca_oob_tcp_component_peer_t: no owning module yet, and
 * a "reachable" bitmap (indexed by module slot) initialized with 8 bits.
 */
static void cmp_peer_cons(mca_oob_tcp_component_peer_t *cp)
{
    cp->mod = NULL;
    OBJ_CONSTRUCT(&cp->reachable, opal_bitmap_t);
    opal_bitmap_init(&cp->reachable, 8);
}

/* Destructor for mca_oob_tcp_component_peer_t: tears down the bitmap. */
static void cmp_peer_des(mca_oob_tcp_component_peer_t *cp)
{
    OBJ_DESTRUCT(&cp->reachable);
}

OBJ_CLASS_INSTANCE(mca_oob_tcp_component_peer_t, opal_object_t,
                   cmp_peer_cons, cmp_peer_des);

OBJ_CLASS_INSTANCE(mca_oob_tcp_ping_t, opal_object_t, NULL, NULL);
/*
 * Constructor for mca_oob_tcp_nicaddr_t: address family unspecified and
 * an all-zero address.
 */
static void nicaddr_cons(mca_oob_tcp_nicaddr_t *nic)
{
    nic->af_family = PF_UNSPEC;
    memset(&nic->addr, 0, sizeof(nic->addr));
}

OBJ_CLASS_INSTANCE(mca_oob_tcp_nicaddr_t, opal_list_item_t, nicaddr_cons, NULL);