/* * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2011 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2009-2013 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ * * In windows, many of the socket functions return an EWOULDBLOCK * instead of things like EAGAIN, EINPROGRESS, etc. It has been * verified that this will not conflict with other error codes that * are returned by these functions under UNIX/Linux environments */ #include "orte_config.h" #include "orte/types.h" #include "opal/types.h" #ifdef HAVE_UNISTD_H #include #endif #ifdef HAVE_SYS_TYPES_H #include #endif #include #ifdef HAVE_NETINET_IN_H #include #endif #ifdef HAVE_ARPA_INET_H #include #endif #ifdef HAVE_NETDB_H #include #endif #include #include "opal/util/show_help.h" #include "opal/util/error.h" #include "opal/util/output.h" #include "opal/opal_socket_errno.h" #include "opal/util/if.h" #include "opal/util/net.h" #include "opal/util/argv.h" #include "opal/class/opal_hash_table.h" #include "opal/class/opal_list.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" #include "orte/mca/routed/routed.h" #include "orte/mca/state/state.h" #include "orte/util/name_fns.h" #include "orte/util/parse_options.h" #include "orte/util/show_help.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/oob/tcp/oob_tcp.h" #include 
"orte/mca/oob/tcp/oob_tcp_component.h" #include "orte/mca/oob/tcp/oob_tcp_peer.h" #include "orte/mca/oob/tcp/oob_tcp_connection.h" #include "orte/mca/oob/tcp/oob_tcp_listener.h" #include "orte/mca/oob/tcp/oob_tcp_ping.h" /* * Local utility functions */ static int tcp_component_register(void); static int tcp_component_open(void); static int tcp_component_close(void); static bool component_available(void); static int component_startup(void); static void component_shutdown(void); static int component_send(orte_rml_send_t *msg); static char* component_get_addr(void); static int component_set_addr(orte_process_name_t *peer, char **uris); static bool component_is_reachable(orte_process_name_t *peer); /* * Struct of function pointers and all that to let us be initialized */ mca_oob_tcp_component_t mca_oob_tcp_component = { { { MCA_OOB_BASE_VERSION_2_0_0, "tcp", /* MCA module name */ ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, ORTE_RELEASE_VERSION, tcp_component_open, /* component open */ tcp_component_close, /* component close */ NULL, /* component query */ tcp_component_register, /* component register */ }, { /* The component is checkpoint ready */ MCA_BASE_METADATA_PARAM_CHECKPOINT }, 0, // reserve space for an assigned index 100, // default priority of this transport component_available, component_startup, component_shutdown, component_send, component_get_addr, component_set_addr, component_is_reachable }, }; /* * Initialize global variables used w/in this module. 
*/
/*
 * Constructs the listener list, module array and peer hash table.
 * Resets the per-family connection/port arrays and the module counter.
 * Constructs the listen thread only when running as the HNP.
 * Returns ORTE_ERR_NOT_AVAILABLE if both if_include and if_exclude were set.
 */
static int tcp_component_open(void)
{
    /* initialize state */
    OBJ_CONSTRUCT(&mca_oob_tcp_component.listeners, opal_list_t);
    if (ORTE_PROC_IS_HNP) {
        OBJ_CONSTRUCT(&mca_oob_tcp_component.listen_thread, opal_thread_t);
        mca_oob_tcp_component.listen_thread_active = false;
        mca_oob_tcp_component.listen_thread_tv.tv_sec = 1;
        mca_oob_tcp_component.listen_thread_tv.tv_usec = 0;
    }
    mca_oob_tcp_component.addr_count = 0;
    mca_oob_tcp_component.num_modules = 0;
    OBJ_CONSTRUCT(&mca_oob_tcp_component.modules, opal_pointer_array_t);
    opal_pointer_array_init(&mca_oob_tcp_component.modules, 4, INT_MAX, 2);

    /* per-family address/port lists are filled in later; start empty */
    mca_oob_tcp_component.ipv4conns = NULL;
    mca_oob_tcp_component.ipv4ports = NULL;
#if OPAL_ENABLE_IPV6
    mca_oob_tcp_component.ipv6conns = NULL;
    mca_oob_tcp_component.ipv6ports = NULL;
#endif

    OBJ_CONSTRUCT(&mca_oob_tcp_component.peers, opal_hash_table_t);
    opal_hash_table_init(&mca_oob_tcp_component.peers, 32);

    /* if_include and if_exclude need to be mutually exclusive */
    if (OPAL_SUCCESS !=
        mca_base_var_check_exclusive("orte",
                                     mca_oob_tcp_component.super.oob_base.mca_type_name,
                                     mca_oob_tcp_component.super.oob_base.mca_component_name,
                                     "if_include",
                                     mca_oob_tcp_component.super.oob_base.mca_type_name,
                                     mca_oob_tcp_component.super.oob_base.mca_component_name,
                                     "if_exclude")) {
        /* Return ERR_NOT_AVAILABLE so that a warning message about
           "open" failing is not printed */
        return ORTE_ERR_NOT_AVAILABLE;
    }

    return ORTE_SUCCESS;
}

/*
 * Cleanup of global variables used by this module.
*/ static int tcp_component_close(void) { int i; mca_oob_tcp_module_t *mod; /* don't cleanup the listen thread as it wasn't constructed * for anything other than the HNP, and we don't want to incur * the timeout penalty when the HNP exits that would be required * to stop the thread */ /* cleanup listen event list */ OBJ_DESTRUCT(&mca_oob_tcp_component.listeners); /* cleanup modules */ for (i=0; i < mca_oob_tcp_component.modules.size; i++) { if (NULL != (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) { free(mod->if_name); free(mod); } } OBJ_DESTRUCT(&mca_oob_tcp_component.modules); if (NULL != mca_oob_tcp_component.ipv4conns) { opal_argv_free(mca_oob_tcp_component.ipv4conns); } if (NULL != mca_oob_tcp_component.ipv4ports) { opal_argv_free(mca_oob_tcp_component.ipv4ports); } OBJ_DESTRUCT(&mca_oob_tcp_component.peers); return ORTE_SUCCESS; } #if ORTE_ENABLE_STATIC_PORTS static char *static_port_string; #endif static char *dyn_port_string; static int tcp_component_register(void) { mca_base_component_t *component = &mca_oob_tcp_component.super.oob_base; int var_id; /* register oob module parameters */ mca_oob_tcp_component.peer_limit = -1; (void)mca_base_component_var_register(component, "peer_limit", "Maximum number of peer connections to simultaneously maintain (-1 = infinite)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, &mca_oob_tcp_component.peer_limit); mca_oob_tcp_component.max_retries = 2; (void)mca_base_component_var_register(component, "peer_retries", "Number of times to try shutting down a connection before giving up", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, &mca_oob_tcp_component.max_retries); mca_oob_tcp_component.tcp_sndbuf = 128 * 1024; (void)mca_base_component_var_register(component, "sndbuf", "TCP socket send buffering size (in bytes)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, 
&mca_oob_tcp_component.tcp_sndbuf); mca_oob_tcp_component.tcp_rcvbuf = 128 * 1024; (void)mca_base_component_var_register(component, "rcvbuf", "TCP socket receive buffering size (in bytes)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, &mca_oob_tcp_component.tcp_rcvbuf); mca_oob_tcp_component.if_include = NULL; var_id = mca_base_component_var_register(component, "if_include", "Comma-delimited list of devices and/or CIDR notation of TCP networks to use for Open MPI bootstrap communication (e.g., \"eth0,192.168.0.0/16\"). Mutually exclusive with oob_tcp_if_exclude.", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, &mca_oob_tcp_component.if_include); (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "include", MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL); mca_oob_tcp_component.if_exclude = NULL; var_id = mca_base_component_var_register(component, "if_exclude", "Comma-delimited list of devices and/or CIDR notation of TCP networks to NOT use for Open MPI bootstrap communication -- all devices not matching these specifications will be used (e.g., \"eth0,192.168.0.0/16\"). 
If set to a non-default value, it is mutually exclusive with oob_tcp_if_include.", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_LOCAL, &mca_oob_tcp_component.if_exclude); (void)mca_base_var_register_synonym(var_id, "orte", "oob", "tcp", "exclude", MCA_BASE_VAR_SYN_FLAG_DEPRECATED | MCA_BASE_VAR_SYN_FLAG_INTERNAL); /* if_include and if_exclude need to be mutually exclusive */ if (NULL != mca_oob_tcp_component.if_include && NULL != mca_oob_tcp_component.if_exclude) { /* Return ERR_NOT_AVAILABLE so that a warning message about "open" failing is not printed */ orte_show_help("help-oob-tcp.txt", "include-exclude", true, mca_oob_tcp_component.if_include, mca_oob_tcp_component.if_exclude); return ORTE_ERR_NOT_AVAILABLE; } #if ORTE_ENABLE_STATIC_PORTS static_port_string = NULL; (void)mca_base_component_var_register(component, "static_ipv4_ports", "Static ports for daemons and procs (IPv4)", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &static_port_string); /* if ports were provided, parse the provided range */ if (NULL != static_port_string) { orte_util_parse_range_options(static_port_string, &mca_oob_tcp_component.tcp_static_ports); if (0 == strcmp(mca_oob_tcp_component.tcp_static_ports[0], "-1")) { opal_argv_free(mca_oob_tcp_component.tcp_static_ports); mca_oob_tcp_component.tcp_static_ports = NULL; } } else { mca_oob_tcp_component.tcp_static_ports = NULL; } #if OPAL_ENABLE_IPV6 static_port_string = NULL; (void)mca_base_component_var_register(component, "static_ipv6_ports", "Static ports for daemons and procs (IPv6)", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &static_port_string); /* if ports were provided, parse the provided range */ if (NULL != static_port_string) { orte_util_parse_range_options(static_port_string, &mca_oob_tcp_component.tcp6_static_ports); if (0 == strcmp(mca_oob_tcp_component.tcp6_static_ports[0], "-1")) { 
opal_argv_free(mca_oob_tcp_component.tcp6_static_ports); mca_oob_tcp_component.tcp6_static_ports = NULL; } } else { mca_oob_tcp_component.tcp6_static_ports = NULL; } if (NULL == mca_oob_tcp_component.tcp_static_ports && NULL == mca_oob_tcp_component.tcp6_static_ports) { orte_static_ports = false; } else { orte_static_ports = true; } #endif #endif dyn_port_string = NULL; (void)mca_base_component_var_register(component, "dynamic_ipv4_ports", "Range of ports to be dynamically used by daemons and procs (IPv4)", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &dyn_port_string); /* if ports were provided, parse the provided range */ if (NULL != dyn_port_string) { /* can't have both static and dynamic ports! */ if (orte_static_ports) { char *err = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ','); opal_show_help("help-oob-tcp.txt", "static-and-dynamic", true, err, dyn_port_string); free(err); return ORTE_ERROR; } orte_util_parse_range_options(dyn_port_string, &mca_oob_tcp_component.tcp_dyn_ports); if (0 == strcmp(mca_oob_tcp_component.tcp_dyn_ports[0], "-1")) { opal_argv_free(mca_oob_tcp_component.tcp_dyn_ports); mca_oob_tcp_component.tcp_dyn_ports = NULL; } } else { mca_oob_tcp_component.tcp_dyn_ports = NULL; } #if OPAL_ENABLE_IPV6 dyn_port_string = NULL; (void)mca_base_component_var_register(component, "dynamic_ipv6_ports", "Range of ports to be dynamically used by daemons and procs (IPv6)", MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &dyn_port_string); /* if ports were provided, parse the provided range */ if (NULL != dyn_port_string) { /* can't have both static and dynamic ports! 
*/ if (orte_static_ports) { char *err4=NULL, *err6=NULL; if (NULL != mca_oob_tcp_component.tcp_static_ports) { err4 = opal_argv_join(mca_oob_tcp_component.tcp_static_ports, ','); } if (NULL != mca_oob_tcp_component.tcp6_static_ports) { err6 = opal_argv_join(mca_oob_tcp_component.tcp6_static_ports, ','); } opal_show_help("help-oob-tcp.txt", "static-and-dynamic-ipv6", true, (NULL == err4) ? "N/A" : err4, (NULL == err6) ? "N/A" : err6, dyn_port_string); if (NULL != err4) { free(err4); } if (NULL != err6) { free(err6); } return ORTE_ERROR; } orte_util_parse_range_options(dyn_port_string, &mca_oob_tcp_component.tcp6_dyn_ports); if (0 == strcmp(mca_oob_tcp_component.tcp6_dyn_ports[0], "-1")) { opal_argv_free(mca_oob_tcp_component.tcp6_dyn_ports); mca_oob_tcp_component.tcp6_dyn_ports = NULL; } } else { mca_oob_tcp_component.tcp6_dyn_ports = NULL; } #endif mca_oob_tcp_component.disable_ipv4_family = false; (void)mca_base_component_var_register(component, "disable_ipv4_family", "Disable the IPv4 interfaces", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_oob_tcp_component.disable_ipv4_family); #if OPAL_ENABLE_IPV6 mca_oob_tcp_component.disable_ipv6_family = false; (void)mca_base_component_var_register(component, "disable_ipv6_family", "Disable the IPv6 interfaces", MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_oob_tcp_component.disable_ipv6_family); #endif return ORTE_SUCCESS; } static char **split_and_resolve(char **orig_str, char *name); static int mca_oob_tcp_create(int if_idx, const char *if_name); static bool component_available(void) { int i, j, rc; char **interfaces = NULL; bool including = false, excluding = false; char name[32]; struct sockaddr_storage my_ss; int kindex; bool add_this_nic; mca_oob_tcp_module_t *mod; mca_oob_tcp_nicaddr_t *nicaddr; opal_output_verbose(5, orte_oob_base_framework.framework_output, "oob:tcp: component_available called"); /* if interface include was given, 
construct a list * of those interfaces which match the specifications - remember, * the includes could be given as named interfaces, IP addrs, or * subnet+mask */ if (NULL != mca_oob_tcp_component.if_include) { interfaces = split_and_resolve(&mca_oob_tcp_component.if_include, "include"); including = true; excluding = false; } else if (NULL != mca_oob_tcp_component.if_exclude) { interfaces = split_and_resolve(&mca_oob_tcp_component.if_exclude, "exclude"); including = false; excluding = true; } /* look at all available interfaces */ for (i = opal_ifbegin(); i >= 0; i = opal_ifnext(i)) { if (OPAL_SUCCESS != opal_ifindextoaddr(i, (struct sockaddr*) &my_ss, sizeof (my_ss))) { opal_output (0, "oob_tcp: problems getting address for index %i (kernel index %i)\n", i, opal_ifindextokindex(i)); continue; } /* ignore non-ip4/6 interfaces */ if (AF_INET != my_ss.ss_family #if OPAL_ENABLE_IPV6 && AF_INET6 != my_ss.ss_family #endif ) { continue; } kindex = opal_ifindextokindex(i); if (kindex <= 0) { continue; } opal_output_verbose(10, orte_oob_base_framework.framework_output, "WORKING INTERFACE %d KERNEL INDEX %d FAMILY: %s", i, kindex, (AF_INET == my_ss.ss_family) ? 
"V4" : "V6"); /* get the name for diagnostic purposes */ opal_ifindextoname(i, name, sizeof(name)); /* handle include/exclude directives */ if (NULL != interfaces) { /* check for match */ rc = opal_ifmatches(kindex, interfaces); /* if one of the network specifications isn't parseable, then * error out as we can't do what was requested */ if (OPAL_ERR_NETWORK_NOT_PARSEABLE == rc) { orte_show_help("help-oob-tcp.txt", "not-parseable", true); opal_argv_free(interfaces); return false; } /* if we are including, then ignore this if not present */ if (including) { if (OPAL_SUCCESS != rc) { opal_output_verbose(20, orte_oob_base_framework.framework_output, "%s oob:tcp:init rejecting interface %s (not in include list)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name); continue; } } else { /* we are excluding, so ignore if present */ if (OPAL_SUCCESS == rc) { opal_output_verbose(20, orte_oob_base_framework.framework_output, "%s oob:tcp:init rejecting interface %s (in exclude list)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name); continue; } } } else { /* if no specific interfaces were provided, we ignore the loopback * interface unless nothing else is available */ if (1 < opal_ifcount() && opal_ifisloopback(i)) { opal_output_verbose(20, orte_oob_base_framework.framework_output, "%s oob:tcp:init rejecting loopback interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name); continue; } } /* we know we want this address - check if we have seen this NIC before */ add_this_nic = true; for (j = 0; j < mca_oob_tcp_component.modules.size; j++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, j))) { continue; } /* Have we seen this NIC already? */ if (kindex == mod->if_kidx) { add_this_nic = false; /* we don't want another module to be created. But we still need to preserve * the address as a given NIC can have multiple addresses. 
*/ nicaddr = OBJ_NEW(mca_oob_tcp_nicaddr_t); nicaddr->af_family = my_ss.ss_family; memcpy(&nicaddr->addr, &my_ss, sizeof(struct sockaddr)); opal_list_append(&mod->addresses, &nicaddr->super); opal_output_verbose(10, orte_oob_base_framework.framework_output, "%s oob:tcp:init adding %s address to interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (AF_INET == my_ss.ss_family) ? "V4" : "V6", name); break; } } /* Refs ticket #3019 * it would probably be worthwhile to print out a warning if OMPI detects multiple * IP interfaces that are "up" on the same subnet (because that's a Bad Idea). Note * that we should only check for this after applying the relevant include/exclude * list MCA params. If we detect redundant ports, we can also automatically ignore * them so that applications won't hang. */ if (add_this_nic) { /* we want to support this interface, so create a module for it */ if (ORTE_SUCCESS != (rc = mca_oob_tcp_create(kindex, name))) { ORTE_ERROR_LOG(rc); return false; } } /* add this address to our connections */ if (AF_INET == my_ss.ss_family) { opal_argv_append_nosize(&mca_oob_tcp_component.ipv4conns, opal_net_get_hostname((struct sockaddr*) &my_ss)); } else if (AF_INET6 == my_ss.ss_family) { #if OPAL_ENABLE_IPV6 opal_argv_append_nosize(&mca_oob_tcp_component.ipv6conns, opal_net_get_hostname((struct sockaddr*) &my_ss)); #endif } } /* cleanup */ if (NULL != interfaces) { opal_argv_free(interfaces); } if (0 == mca_oob_tcp_component.num_modules) { if (including) { orte_show_help("help-oob-tcp.txt", "no-included-found", true, mca_oob_tcp_component.if_include); } else if (excluding) { orte_show_help("help-oob-tcp.txt", "excluded-all", true, mca_oob_tcp_component.if_exclude); } else { orte_show_help("help-oob-tcp.txt", "no-interfaces-avail", true); } return false; } return true; } /* Start all modules */ static int component_startup(void) { mca_oob_tcp_module_t *mod; int i, rc; opal_output_verbose(2, orte_oob_base_framework.framework_output, "%s TCP STARTUP", 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* start the modules */ for (i=0; i < mca_oob_tcp_component.modules.size; i++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) { continue; } if (NULL != mod->api.init) { mod->api.init((struct mca_oob_tcp_module_t*)mod); } } /* start the listening thread/event */ if (ORTE_SUCCESS != (rc = orte_oob_tcp_start_listening())) { ORTE_ERROR_LOG(rc); } return rc; } static void component_shutdown(void) { mca_oob_tcp_module_t *mod; int i; opal_list_item_t *item; opal_output_verbose(2, orte_oob_base_framework.framework_output, "%s TCP SHUTDOWN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); while (NULL != (item = opal_list_remove_first(&mca_oob_tcp_component.listeners))) { OBJ_RELEASE(item); } /* shutdown the modules */ for (i=0; i < mca_oob_tcp_component.modules.size; i++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) { continue; } if (NULL != mod->api.finalize) { mod->api.finalize((struct mca_oob_tcp_module_t*)mod); } } } static int component_send(orte_rml_send_t *msg) { int i; mca_oob_tcp_component_peer_t *pr; uint64_t ui64; mca_oob_tcp_module_t *mod; opal_output_verbose(5, orte_oob_base_framework.framework_output, "%s oob:tcp:send_nb to peer %s:%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&msg->peer), msg->tag); /* do we know some way of potentially reaching this peer? 
*/ memcpy(&ui64, (char*)&msg->peer, sizeof(uint64_t)); if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr)) { /* nope - let someone else try */ return ORTE_ERR_TAKE_NEXT_OPTION; } /* if we knew the peer but have found all routes unreachable, then * we can't send it */ if (NULL == pr || opal_bitmap_is_clear(&pr->reachable)) { return ORTE_ERR_TAKE_NEXT_OPTION; } /* if a module is assigned, then use it */ if (NULL != pr->mod) { /* the module is potentially running on its own event * base, so all it can do is push our send request * onto an event - it cannot tell us if it will * succeed. The module will attempt to send the data and * will call the cbfunc with the status * upon completion - if it can't do it for * some reason, it will call the component error * function so we can try with another module */ pr->mod->api.send_nb((struct mca_oob_tcp_module_t*)pr->mod, msg); return ORTE_SUCCESS; } /* if a module isn't assigned, give it to the highest priority reachable * module as a place to start. 
The module will attempt to send the data and * will call the cbfunc with the status upon completion - if it can't do it for * some reason, it will call the component error function so we can try with another module */ for (i=0; i < mca_oob_tcp_component.modules.size; i++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, i))) { continue; } /* check to see if we have a contact address for this peer or * some route to it */ if (!opal_bitmap_is_set_bit(&pr->reachable, mod->if_kidx)) { continue; } /* mark that this module has been assigned */ pr->mod = mod; /* pass the message along to be sent */ mod->api.send_nb((struct mca_oob_tcp_module_t*)mod, msg); /* upon successful completion, we will mark the module as the "best" * one for future messages */ return ORTE_SUCCESS; } /* if for some reason all our modules are down, * then let the base stub keep searching */ return ORTE_ERR_TAKE_NEXT_OPTION; } static char* component_get_addr(void) { char *cptr=NULL, *tmp, *tp; if (!mca_oob_tcp_component.disable_ipv4_family && NULL != mca_oob_tcp_component.ipv4conns) { tmp = opal_argv_join(mca_oob_tcp_component.ipv4conns, ','); tp = opal_argv_join(mca_oob_tcp_component.ipv4ports, ','); asprintf(&cptr, "tcp://%s:%s", tmp, tp); free(tmp); free(tp); } #if OPAL_ENABLE_IPV6 if (!mca_oob_tcp_component.disable_ipv6_family && NULL != mca_oob_tcp_component.ipv6conns) { char *tmp2; /* Fixes #2498 * RFC 3986, section 3.2.2 * The notation in that case is to encode the IPv6 IP number in square brackets: * "http://[2001:db8:1f70::999:de8:7648:6e8]:100/" * A host identified by an Internet Protocol literal address, version 6 [RFC3513] * or later, is distinguished by enclosing the IP literal within square brackets. * This is the only place where square bracket characters are allowed in the URI * syntax. 
In anticipation of future, as-yet-undefined IP literal address formats, * an implementation may use an optional version flag to indicate such a format * explicitly rather than rely on heuristic determination. */ tmp = opal_argv_join(mca_oob_tcp_component.ipv6conns, ','); tp = opal_argv_join(mca_oob_tcp_component.ipv6ports, ','); if (NULL == cptr) { /* no ipv4 stuff */ asprintf(&cptr, "tcp6://[%s]:%s", tmp, tp); } else { asprintf(&tmp2, "%s;tcp6://[%s]:%s", cptr, tmp, tp); free(cptr); cptr = tmp2; } free(tmp); free(tp); } #endif /* return our uri */ return cptr; } static int component_set_addr(orte_process_name_t *peer, char **uris) { char **addrs, *hptr; char *tcpuri=NULL, *host, *ports; int i, j, k, rc; mca_oob_tcp_module_t *mod; mca_oob_tcp_component_peer_t *pr; uint16_t af_family = AF_UNSPEC; uint64_t ui64; bool found; memcpy(&ui64, (char*)peer, sizeof(uint64_t)); /* cycle across component parts and see if one belongs to us */ found = false; for (i=0; NULL != uris[i]; i++) { if (0 == strncmp(uris[i], "tcp:", 4)) { af_family = AF_INET; tcpuri = strdup(uris[i]); host = tcpuri + strlen("tcp://"); } else if (0 == strncmp(uris[i], "tcp6:", 5)) { #if OPAL_ENABLE_IPV6 af_family = AF_INET6; tcpuri = strdup(uris[i]); host = tcpuri + strlen("tcp6://"); #else /* we don't support this connection type */ opal_output_verbose(2, orte_oob_base_framework.framework_output, "%s oob:tcp: address %s not supported", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]); continue; #endif } else { /* not one of ours */ opal_output_verbose(2, orte_oob_base_framework.framework_output, "%s oob:tcp: ignoring address %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uris[i]); continue; } /* this one is ours - record the peer */ opal_output_verbose(2, orte_oob_base_framework.framework_output, "%s oob:tcp: working peer %s address %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), uris[i]); /* separate the ports from the network addrs */ ports = strrchr(tcpuri, ':'); *ports = '\0'; ports++; /* 
split the addrs */ if (NULL == host || 0 == strlen(host)) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "FORMAT ERROR IN ADDR: %s", (NULL == host) ? "NULL" : "ZERO LENGTH"); return ORTE_ERR_BAD_PARAM; } /* if this is a tcp6 connection, the first one will have a '[' * at the beginning of it, and the last will have a ']' at the * end - we need to remove those extra characters */ hptr = host; if (AF_INET6 == af_family) { if ('[' == host[0]) { hptr = &host[1]; } if (']' == host[strlen(host)-1]) { host[strlen(host)-1] = '\0'; } } addrs = opal_argv_split(hptr, ','); /* cycle across the provided addrs */ for (j=0; NULL != addrs[j]; j++) { /* lookup the kernel index of this address */ if (0 >= (k = opal_ifaddrtokindex(addrs[j]))) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s UNFOUND KERNEL INDEX %d FOR ADDRESS %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, addrs[j]); /* we don't have an interface on this subnet - ignore it */ continue; } if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s NO MODULE AT KINDEX %d FOR ADDRESS %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), k, addrs[j]); continue; } /* record that this peer may be reachable via this module, but don't assign * the peer to this module until later when we actually connect */ if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { pr = OBJ_NEW(mca_oob_tcp_component_peer_t); if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, (void*)pr))) { ORTE_ERROR_LOG(rc); return rc; } } opal_bitmap_set_bit(&pr->reachable, k); /* pass this proc, and its ports, to the * module for handling - this module will be responsible * for communicating with the proc via this network. 
* Note that the modules are *not* necessarily running * on our event base - thus, the modules will push this * call into their own event base for processing. */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s PASSING ADDR %s TO INTERFACE %s AT KERNEL INDEX %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), addrs[j], mod->if_name, k); mod->api.set_peer((struct mca_oob_tcp_module_t*)mod, peer, af_family, addrs[j], ports); found = true; } if (NULL != addrs) { opal_argv_free(addrs); } if (NULL != tcpuri) { free(tcpuri); } } if (found) { /* indicate that this peer is addressable by this component */ return ORTE_SUCCESS; } /* otherwise indicate that it is not addressable by us */ return ORTE_ERR_TAKE_NEXT_OPTION; } static bool component_is_reachable(orte_process_name_t *peer) { orte_process_name_t hop; uint64_t ui64; mca_oob_tcp_component_peer_t *pr, *pnew; int rc; /* if we have a route to this peer, then we can reach it */ hop = orte_routed.get_route(peer); if (ORTE_JOBID_INVALID == hop.jobid || ORTE_VPID_INVALID == hop.vpid) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s is NOT reachable by TCP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return false; } /* we have a route, but which (if any) module can reach the hop? 
*/ memcpy(&ui64, (char*)&hop, sizeof(uint64_t)); if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr)) { /* nope - we can't get there */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s is NOT reachable by TCP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return false; } /* if we know the hop but have found all routes unreachable, then * we can't send it */ if (NULL == pr || opal_bitmap_is_clear(&pr->reachable)) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s is NOT reachable by TCP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); return false; } opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s is reachable by TCP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* mark it so we can find this peer when we try to send */ memcpy(&ui64, (char*)peer, sizeof(uint64_t)); if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pnew) || NULL == pnew) { pnew = OBJ_NEW(mca_oob_tcp_component_peer_t); opal_bitmap_copy(&pnew->reachable, &pr->reachable); if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, (void*)pnew))) { ORTE_ERROR_LOG(rc); return false; } } return true; } /* * Create a module instance and add to modules array. 
*/ static int mca_oob_tcp_create(int kindex, const char *if_name) { mca_oob_tcp_module_t *mod; OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output, "%s creating OOB-TCP module for interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), if_name)); mod = (mca_oob_tcp_module_t*)malloc(sizeof(mca_oob_tcp_module_t)); if (NULL == mod) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } mod->if_name = strdup(if_name); /* copy the APIs across */ memcpy(mod, &mca_oob_tcp_module.api, sizeof(mca_oob_tcp_module_api_t)); /* point to the interface it will service */ mod->if_kidx = kindex; /* setup the list of addresses */ OBJ_CONSTRUCT(&mod->addresses, opal_list_t); /* setup the default event base */ mod->ev_base = orte_event_base; /* add it to our array */ opal_pointer_array_set_item(&mca_oob_tcp_component.modules, kindex, mod); mca_oob_tcp_component.num_modules++; return ORTE_SUCCESS; } void mca_oob_tcp_component_set_module(int fd, short args, void *cbdata) { mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata; uint64_t ui64; int rc; mca_oob_tcp_component_peer_t *pr; orte_oob_base_peer_t *bpr; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:set_module called for peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); /* retrieve the peer's name */ memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t)); /* mark that this peer is being handled by the specified module */ if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { /* must have come from an inbound connection */ pr = OBJ_NEW(mca_oob_tcp_component_peer_t); opal_bitmap_set_bit(&pr->reachable, pop->mod->if_kidx); opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, pr); } pr->mod = pop->mod; /* make sure the OOB knows that we are handling this peer - we * are in the same event base as the OOB base, so we can * directly access its storage */ 
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers, ui64, (void**)&bpr) || NULL == bpr) { bpr = OBJ_NEW(orte_oob_base_peer_t); } opal_bitmap_set_bit(&bpr->addressable, mca_oob_tcp_component.super.idx); bpr->component = &mca_oob_tcp_component.super; if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, ui64, bpr))) { ORTE_ERROR_LOG(rc); goto cleanup; } cleanup: OBJ_RELEASE(pop); } void mca_oob_tcp_component_lost_connection(int fd, short args, void *cbdata) { mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata; uint64_t ui64; int rc, k; mca_oob_tcp_component_peer_t *pr; mca_oob_tcp_module_t *mod; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:lost connection called for peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); /* retrieve the peer's name */ memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t)); /* mark that this peer is no longer reachable from this module */ if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } opal_bitmap_clear_bit(&pr->reachable, pop->mod->if_kidx); /* if we are terminating, or recovery isn't enabled, then don't attempt to reconnect */ if (!orte_enable_recovery || orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) { goto cleanup; } /* if at least one module can still reach this peer, then we *might* be okay */ if (!opal_bitmap_is_clear(&pr->reachable)) { /* any pending messages were re-queued when the module closed the connection */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:lost connection still can reach peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); for (k=0; k < mca_oob_tcp_component.modules.size; k++) { if (NULL == (mod = 
(mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) { continue; } if (opal_bitmap_is_set_bit(&pr->reachable, k)) { /* we cannot look into the module itself to see if messages * are pending that would cause a connection to the next address * to occur as the module could be operating in a separate event * base. Instead, we trigger an event to ask it to start * the connection procedure by issuing a "ping" request */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:lost pinging peer %s on interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer), mod->if_name); mod->api.ping((struct mca_oob_tcp_module_t*)mod, &pop->peer); /* cleanup */ OBJ_RELEASE(pop); return; } } } /* if we get here, then we no longer have any way to reach this peer. * Mark that we no longer support this peer */ if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&mca_oob_tcp_component.peers, ui64, NULL))) { ORTE_ERROR_LOG(rc); } /* do the same to the OOB's table - for now, we don't worry about shifting to * another component. 
Eventually, we will want to push this decision to * the OOB so it can try other components and eventually error out */ if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, ui64, NULL))) { ORTE_ERROR_LOG(rc); } cleanup: /* activate the proc state */ if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) { ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST); } else { ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED); } OBJ_RELEASE(pop); } void mca_oob_tcp_component_no_route(int fd, short args, void *cbdata) { mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata; mca_oob_tcp_module_t *mod; uint64_t ui64; int k; mca_oob_tcp_component_peer_t *pr; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:no route called for peer %s on interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&mop->hop), mop->mod->if_name); /* retrieve the hop's name */ memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t)); /* get the peer object */ if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { goto cleanup; } /* ensure we mark that this peer isn't reachable by this module */ opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx); /* do we have any other modules (i.e., NICs) we can try? 
*/ for (k=0; k < mca_oob_tcp_component.modules.size; k++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) { continue; } if (opal_bitmap_is_set_bit(&pr->reachable, k)) { /* let this module try */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:unknown hop attempting send to peer %s via interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&mop->hop), mod->if_name); mod->api.send_nb((struct mca_oob_tcp_module_t*)mod, mop->rmsg); OBJ_RELEASE(mop); return; } } /* if we get here, then we have no other modules - so we report * the error back to the OOB and let it try other components * or declare a problem */ if (!orte_finalizing && !orte_abnormal_term_ordered) { /* if this was a lifeline, then alert */ if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) { ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST); } else { ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED); } } cleanup: OBJ_RELEASE(mop); } void mca_oob_tcp_component_hop_unknown(int fd, short args, void *cbdata) { mca_oob_tcp_msg_error_t *mop = (mca_oob_tcp_msg_error_t*)cbdata; mca_oob_tcp_module_t *mod; uint64_t ui64; int k; mca_oob_tcp_component_peer_t *pr; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:unknown hop called for peer %s on interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&mop->hop), mop->mod->if_name); /* retrieve the hop's name */ memcpy(&ui64, (char*)&(mop->hop), sizeof(uint64_t)); /* get the peer object */ if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } /* ensure we mark that this peer isn't reachable by this module */ opal_bitmap_clear_bit(&pr->reachable, mop->mod->if_kidx); /* do we have any other modules (i.e., NICs) we can try? 
*/ for (k=0; k < mca_oob_tcp_component.modules.size; k++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) { continue; } if (opal_bitmap_is_set_bit(&pr->reachable, k)) { /* let this module try */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:unknown hop attempting send to peer %s via interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&mop->hop), mod->if_name); mod->api.resend((struct mca_oob_tcp_msg_error_t*)mop); OBJ_RELEASE(mop); return; } } /* if we get here, then we have no other modules - so we report * the error back to the OOB and let it try other components * or declare a problem */ if (!orte_finalizing && !orte_abnormal_term_ordered) { /* if this was a lifeline, then alert */ if (ORTE_SUCCESS != orte_routed.route_lost(&mop->hop)) { ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_LIFELINE_LOST); } else { ORTE_ACTIVATE_PROC_STATE(&mop->hop, ORTE_PROC_STATE_COMM_FAILED); } } cleanup: OBJ_RELEASE(mop); } void mca_oob_tcp_component_failed_to_connect(int fd, short args, void *cbdata) { mca_oob_tcp_peer_op_t *pop = (mca_oob_tcp_peer_op_t*)cbdata; mca_oob_tcp_module_t *mod; uint64_t ui64; int k; mca_oob_tcp_component_peer_t *pr; opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:failed_to_connect called for peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); /* get the peer object */ memcpy(&ui64, (char*)&(pop->peer), sizeof(uint64_t)); if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_component.peers, ui64, (void**)&pr) || NULL == pr) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); goto cleanup; } /* mark the peer as unreachable via this interface */ opal_bitmap_clear_bit(&pr->reachable, pop->mod->if_kidx); /* if we are terminating, then don't attempt to reconnect */ if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) { OBJ_RELEASE(pop); 
return; } /* if at least one module can still reach this peer, then we *might* be okay */ if (!opal_bitmap_is_clear(&pr->reachable)) { opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:attempting different module for connection to peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); for (k=0; k < mca_oob_tcp_component.modules.size; k++) { if (NULL == (mod = (mca_oob_tcp_module_t*)opal_pointer_array_get_item(&mca_oob_tcp_component.modules, k))) { continue; } if (opal_bitmap_is_set_bit(&pr->reachable, k)) { /* we cannot look into the module itself to see if messages * are pending that would cause a connection to the next address * to occur as the module could be operating in a separate event * base. Instead, we trigger an event to ask it to start * the connection procedure by issuing a "ping" request */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:lost pinging peer %s on interface %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer), mod->if_name); mod->api.ping((struct mca_oob_tcp_module_t*)mod, &pop->peer); /* cleanup */ OBJ_RELEASE(pop); return; } } } /* get here if nobody else can reach it - activate the proc state */ opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s tcp:failed_to_connect unable to reach peer %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&pop->peer)); cleanup: /* if this was a lifeline, then alert */ if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer)) { ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_LIFELINE_LOST); } else { ORTE_ACTIVATE_PROC_STATE(&pop->peer, ORTE_PROC_STATE_COMM_FAILED); } OBJ_RELEASE(pop); } /* * Go through a list of argv; if there are any subnet specifications * (a.b.c.d/e), resolve them to an interface name (Currently only * supporting IPv4). If unresolvable, warn and remove. 
*/ static char **split_and_resolve(char **orig_str, char *name) { int i, ret, save, if_index; char **argv, *str, *tmp; char if_name[IF_NAMESIZE]; struct sockaddr_storage argv_inaddr, if_inaddr; uint32_t argv_prefix; /* Sanity check */ if (NULL == orig_str || NULL == *orig_str) { return NULL; } argv = opal_argv_split(*orig_str, ','); if (NULL == argv) { return NULL; } for (save = i = 0; NULL != argv[i]; ++i) { if (isalpha(argv[i][0])) { argv[save++] = argv[i]; continue; } /* Found a subnet notation. Convert it to an IP address/netmask. Get the prefix first. */ argv_prefix = 0; tmp = strdup(argv[i]); str = strchr(argv[i], '/'); if (NULL == str) { orte_show_help("help-oob-tcp.txt", "invalid if_inexclude", true, name, orte_process_info.nodename, tmp, "Invalid specification (missing \"/\")"); free(argv[i]); free(tmp); continue; } *str = '\0'; argv_prefix = atoi(str + 1); /* Now convert the IPv4 address */ ((struct sockaddr*) &argv_inaddr)->sa_family = AF_INET; ret = inet_pton(AF_INET, argv[i], &((struct sockaddr_in*) &argv_inaddr)->sin_addr); free(argv[i]); if (1 != ret) { orte_show_help("help-oob-tcp.txt", "invalid if_inexclude", true, name, orte_process_info.nodename, tmp, "Invalid specification (inet_pton() failed)"); free(tmp); continue; } opal_output_verbose(20, orte_oob_base_framework.framework_output, "%s oob:tcp: Searching for %s address+prefix: %s / %u", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name, opal_net_get_hostname((struct sockaddr*) &argv_inaddr), argv_prefix); /* Go through all interfaces and see if we can find a match */ for (if_index = 0; if_index < opal_ifcount(); if_index++) { opal_ifindextoaddr(if_index, (struct sockaddr*) &if_inaddr, sizeof(if_inaddr)); if (opal_net_samenetwork((struct sockaddr*) &argv_inaddr, (struct sockaddr*) &if_inaddr, argv_prefix)) { break; } } /* If we didn't find a match, keep trying */ if (if_index == opal_ifcount()) { orte_show_help("help-oob-tcp.txt", "invalid if_inexclude", true, name, orte_process_info.nodename, tmp, "Did 
not find interface matching this subnet"); free(tmp); continue; } /* We found a match; get the name and replace it in the argv */ opal_ifindextoname(if_index, if_name, sizeof(if_name)); opal_output_verbose(20, orte_oob_base_framework.framework_output, "%s oob:tcp: Found match: %s (%s)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), opal_net_get_hostname((struct sockaddr*) &if_inaddr), if_name); argv[save++] = strdup(if_name); free(tmp); } /* The list may have been compressed if there were invalid entries, so ensure we end it with a NULL entry */ argv[save] = NULL; free(*orig_str); *orig_str = opal_argv_join(argv, ','); return argv; } /* OOB TCP Class instances */ static void peer_cons(mca_oob_tcp_peer_t *peer) { peer->sd = -1; OBJ_CONSTRUCT(&peer->addrs, opal_list_t); peer->active_addr = NULL; peer->state = MCA_OOB_TCP_UNCONNECTED; OBJ_CONSTRUCT(&peer->send_queue, opal_list_t); peer->send_msg = NULL; peer->recv_msg = NULL; peer->send_ev_active = false; peer->recv_ev_active = false; peer->timer_ev_active = false; } static void peer_des(mca_oob_tcp_peer_t *peer) { if (0 <= peer->sd) { CLOSE_THE_SOCKET(peer->sd); } OPAL_LIST_DESTRUCT(&peer->addrs); OPAL_LIST_DESTRUCT(&peer->send_queue); } OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_t, opal_list_item_t, peer_cons, peer_des); static void padd_cons(mca_oob_tcp_addr_t *ptr) { memset(&ptr->addr, 0, sizeof(ptr->addr)); ptr->retries = 0; ptr->state = MCA_OOB_TCP_UNCONNECTED; } OBJ_CLASS_INSTANCE(mca_oob_tcp_addr_t, opal_list_item_t, padd_cons, NULL); static void pop_cons(mca_oob_tcp_peer_op_t *pop) { pop->net = NULL; pop->port = NULL; } static void pop_des(mca_oob_tcp_peer_op_t *pop) { if (NULL != pop->net) { free(pop->net); } if (NULL != pop->port) { free(pop->port); } } OBJ_CLASS_INSTANCE(mca_oob_tcp_peer_op_t, opal_object_t, pop_cons, pop_des); OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_op_t, opal_object_t, NULL, NULL); OBJ_CLASS_INSTANCE(mca_oob_tcp_conn_op_t, opal_object_t, NULL, NULL); static void cmp_peer_cons(mca_oob_tcp_component_peer_t 
*ptr) { ptr->mod = NULL; OBJ_CONSTRUCT(&ptr->reachable, opal_bitmap_t); opal_bitmap_init(&ptr->reachable, 8); // default to 8 bits } static void cmp_peer_des(mca_oob_tcp_component_peer_t *ptr) { OBJ_DESTRUCT(&ptr->reachable); } OBJ_CLASS_INSTANCE(mca_oob_tcp_component_peer_t, opal_object_t, cmp_peer_cons, cmp_peer_des); OBJ_CLASS_INSTANCE(mca_oob_tcp_ping_t, opal_object_t, NULL, NULL); static void nicaddr_cons(mca_oob_tcp_nicaddr_t *ptr) { ptr->af_family = PF_UNSPEC; memset(&ptr->addr, 0, sizeof(ptr->addr)); } OBJ_CLASS_INSTANCE(mca_oob_tcp_nicaddr_t, opal_list_item_t, nicaddr_cons, NULL);