From 7440f21b75e61ed372666b07cc35b982fcfa2339 Mon Sep 17 00:00:00 2001 From: Jeff Squyres Date: Wed, 26 Feb 2014 22:21:25 +0000 Subject: [PATCH] Add usnic connectivity-checking agent service. Basically: since usnic is a connectionless transport, we do not get OS-provided services "for free" that connection-oriented transports get, namely: "hey, I wasn't able to make a connection to peer X", and "hey, your connection to peer X has died." This connectivity-checker runs in a separate progress thread in the usnic BTL in local rank 0 on each server. Upon first send in any process, the connectivty-checker agent will send some UDP pings to the peer to ensure that we can reach it. If we can't, we'll abort the job with a nice show_help message. There's a lengthy comment in btl_usnic_connectivity.h explains the scheme and how it works. Reviewed by Dave Goodell. cmr=v1.7.5:ticket=trac:4253 This commit was SVN r30860. The following Trac tickets were found above: Ticket 4253 --> https://svn.open-mpi.org/trac/ompi/ticket/4253 --- ompi/mca/btl/usnic/Makefile.am | 3 + ompi/mca/btl/usnic/btl_usnic.h | 8 +- ompi/mca/btl/usnic/btl_usnic_cagent.c | 996 ++++++++++++++++++++ ompi/mca/btl/usnic/btl_usnic_cclient.c | 213 +++++ ompi/mca/btl/usnic/btl_usnic_component.c | 15 + ompi/mca/btl/usnic/btl_usnic_connectivity.h | 296 ++++++ ompi/mca/btl/usnic/btl_usnic_endpoint.c | 1 + ompi/mca/btl/usnic/btl_usnic_endpoint.h | 3 + ompi/mca/btl/usnic/btl_usnic_mca.c | 44 + ompi/mca/btl/usnic/btl_usnic_module.c | 28 + ompi/mca/btl/usnic/btl_usnic_util.c | 27 + ompi/mca/btl/usnic/btl_usnic_util.h | 7 + ompi/mca/btl/usnic/help-mpi-btl-usnic.txt | 72 ++ ompi/mca/rte/rte.h | 3 + 14 files changed, 1715 insertions(+), 1 deletion(-) create mode 100644 ompi/mca/btl/usnic/btl_usnic_cagent.c create mode 100644 ompi/mca/btl/usnic/btl_usnic_cclient.c create mode 100644 ompi/mca/btl/usnic/btl_usnic_connectivity.h diff --git a/ompi/mca/btl/usnic/Makefile.am b/ompi/mca/btl/usnic/Makefile.am index e84fd2139a..551d3c37a0 100644 --- a/ompi/mca/btl/usnic/Makefile.am +++ b/ompi/mca/btl/usnic/Makefile.am @@ -50,6 +50,9 @@ sources = \ btl_usnic_ack.c \ btl_usnic_ack.h \ btl_usnic_component.c \ + btl_usnic_connectivity.h \ + btl_usnic_cclient.c \ + btl_usnic_cagent.c \ btl_usnic_endpoint.c \ btl_usnic_endpoint.h \ btl_usnic_frag.c \ diff --git a/ompi/mca/btl/usnic/btl_usnic.h b/ompi/mca/btl/usnic/btl_usnic.h index 9012f34de2..6ab8ca54d2 100644 --- a/ompi/mca/btl/usnic/btl_usnic.h +++ b/ompi/mca/btl/usnic/btl_usnic.h @@ -190,8 +190,14 @@ typedef struct ompi_btl_usnic_component_t { /** retrans characteristics */ int retrans_timeout; - /** socket used for rtnetlink queries */ struct usnic_rtnl_sk *unlsk; + + /** convertor packing threshold */ + /** connectivity verification: ACK timeout, number of retries + before issue an error/abort the job */ + bool connectivity_enabled; + int connectivity_ack_timeout; + int connectivity_num_retries; } ompi_btl_usnic_component_t; OMPI_MODULE_DECLSPEC extern ompi_btl_usnic_component_t mca_btl_usnic_component; diff --git a/ompi/mca/btl/usnic/btl_usnic_cagent.c b/ompi/mca/btl/usnic/btl_usnic_cagent.c new file mode 100644 index 0000000000..cfccff9c8f --- /dev/null +++ b/ompi/mca/btl/usnic/btl_usnic_cagent.c @@ -0,0 +1,996 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include +#include +#include +#include + +#include "opal_stdint.h" +#include "opal/threads/mutex.h" +#include "opal/mca/event/event.h" +#include "opal/util/show_help.h" +#include "opal/types.h" +#include "opal/util/output.h" + +#include "ompi/mca/rte/rte.h" +#include "ompi/runtime/mpiruntime.h" +#include "ompi/constants.h" + +#include "btl_usnic.h" +#include "btl_usnic_connectivity.h" + +/************************************************************************** + * Agent data and methods + **************************************************************************/ + +/* + * Data structure used to proxy messages between the RTE thread and + * the agent thread. + */ +typedef struct { + opal_buffer_t *buffer; + ompi_process_name_t sender; + opal_event_t event; +} agent_buf_name_t; + +/* + * Long enough to hold "xxx.xxx.xxx.xxx/xx" + */ +#define IPV4ADDRLEN 20 + +/* + * Long enough to hold "xx:xx:xx:xx:xx:xx" + */ +#define MACLEN 18 + +/* + * Holds all the information about a UDP port that the agent thread is + * listening on (for incoming PINGs and ACKs). + */ +typedef struct { + opal_list_item_t super; + + /* Data from the LISTEN command message */ + uint32_t ipv4_addr; + uint32_t cidrmask; + char ipv4_addr_str[IPV4ADDRLEN]; + uint32_t mtu; + char *nodename; + char *if_name; + char *usnic_name; + char mac_str[MACLEN]; + + /* File descriptor, UDP port, buffer to receive messages, and event */ + int fd; + uint32_t udp_port; + uint8_t *buffer; + opal_event_t event; +} agent_udp_port_listener_t; + +OBJ_CLASS_DECLARATION(agent_udp_port_listener_t); + +typedef enum { + AGENT_MSG_TYPE_PING = 17, + AGENT_MSG_TYPE_ACK +} agent_udp_message_type_t; + +/* + * Ping and ACK messages + */ +typedef struct { + uint8_t message_type; + + /* The sender's IP address and port (i.e., where the ACK can be + sent). This is actually redundant with the sockaddr that we + get from recvfrom(), but that's ok -- it provides a sanity + check. */ + uint32_t src_ipv4_addr; + uint32_t src_udp_port; + + /* If this is a PING, the message should be this size. + If this is an ACK, we are ACKing a ping of this size. 
*/ + uint32_t size; +} agent_udp_message_t; + +typedef struct { + opal_list_item_t super; + + /* Data from the PING command message */ + uint32_t src_ipv4_addr; /* in network byte order */ + uint32_t src_udp_port; + agent_udp_port_listener_t *listener; + uint32_t dest_ipv4_addr; /* in network byte order */ + uint32_t dest_cidrmask; + uint32_t dest_udp_port; + struct sockaddr_in dest_sockaddr; + char *dest_nodename; + uint8_t dest_mac[6]; + + /* The sizes and corresponding buffers of the PING messages that + we'll send, and whether each of those PING messages have been + ACKed yet */ +#define NUM_PING_SIZES 2 + size_t sizes[NUM_PING_SIZES]; + uint8_t *buffers[NUM_PING_SIZES]; + bool acked[NUM_PING_SIZES]; + + /* Number of times we've sent this ping */ + int num_sends; + + /* Timer used to re-send the PING, and whether the timer is active + or not */ + opal_event_t timer; + bool timer_active; +} agent_ping_t; + +OBJ_CLASS_DECLARATION(agent_ping_t); + + +/************************************************************************** + * Utility functions, constructors, destructors + **************************************************************************/ + +static void port_listener_zero(agent_udp_port_listener_t *obj) +{ + obj->ipv4_addr = + obj->cidrmask = + obj->mtu = 0; + obj->nodename = + obj->if_name = + obj->usnic_name = NULL; + memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str)); + memset(obj->mac_str, 0, sizeof(obj->mac_str)); + + obj->fd = -1; + obj->udp_port = -1; + obj->buffer = NULL; +} + +static void port_listener_constructor(agent_udp_port_listener_t *obj) +{ + port_listener_zero(obj); +} + +static void port_listener_destructor(agent_udp_port_listener_t *obj) +{ + if (-1 != obj->fd) { + close(obj->fd); + } + if (NULL != obj->nodename) { + free(obj->nodename); + } + if (NULL != obj->if_name) { + free(obj->if_name); + } + if (NULL != obj->usnic_name) { + free(obj->usnic_name); + } + if (NULL != obj->buffer) { + free(obj->buffer); + } + + port_listener_zero(obj); +} + +OBJ_CLASS_INSTANCE(agent_udp_port_listener_t, + opal_list_item_t, + port_listener_constructor, + port_listener_destructor); + +static void agent_ping_result_zero(agent_ping_t *obj) +{ + obj->src_ipv4_addr = 0; + obj->src_udp_port = 0; + obj->listener = NULL; + obj->dest_ipv4_addr = 0; + obj->dest_udp_port = 0; + obj->num_sends = 0; + obj->timer_active = false; + + for (int i = 0; i < NUM_PING_SIZES; ++i) { + obj->sizes[i] = 0; + obj->buffers[i] = NULL; + obj->acked[i] = false; + } +} + +static void agent_ping_result_constructor(agent_ping_t *obj) +{ + agent_ping_result_zero(obj); +} + +static void agent_ping_result_destructor(agent_ping_t *obj) +{ + for (int i = 0; i < NUM_PING_SIZES; ++i) { + if (NULL != obj->buffers[i]) { + free(obj->buffers[i]); + } + } + if (obj->timer_active) { + opal_event_del(&obj->timer); + } + + agent_ping_result_zero(obj); +} + +OBJ_CLASS_INSTANCE(agent_ping_t, + opal_list_item_t, + agent_ping_result_constructor, + agent_ping_result_destructor); + +/* + * Wrapper around sendto() loop + */ +static void agent_sendto(int fd, char *buffer, ssize_t numbytes, + struct sockaddr *addr) +{ + ssize_t rc; + while (1) { + rc = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr)); + /* Note that since this is UDP, so we don't need to check + for 0 < rc < ap->sizes[i] */ + if (rc == numbytes) { + return; + } else if (rc < 0) { + if (errno == EAGAIN || errno == EINTR) { + continue; + } + + ABORT("Unexpected sendto() error"); + /* Will not return */ + } + + /* We should never get here, but just in 
case we do, sleep a + little, just so we don't hammer the CPU */ + usleep(1); + } + + /* Will not get here */ +} + +/************************************************************************** + * All of the following functions run in RTE thread + **************************************************************************/ + +/* This variable belongs to the agent thread, but is safe to access + from the RTE thread (because of its own internal locking) */ +static opal_event_base_t *evbase; + +/* Note that to avoid locking of the agent thread data structures, the + ORTE thread routines avoid reading/writing those data structures. + Instead, when the ORTE routines receive commands from the client + thread, they basically save the message and queue up an event to + run in the agent thread. */ + +/* Need to forward declare these functions; they are referenced as + function pointers in the RTE thread functions */ +static void agent_thread_cmd_listen(int fd, short flags, void *context); +static void agent_thread_cmd_ping(int fd, short flags, void *context); + +/* + * Save the message info and queue it up in an event to run in the + * agent thread. + */ +static void agent_queue_thread_cmd(opal_buffer_t *buffer, + ompi_process_name_t *sender, + opal_event_cbfunc_t callback) +{ + agent_buf_name_t *abn = malloc(sizeof(agent_buf_name_t)); + if (NULL == abn) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + + /* Copy the buffer so that we can have it after the RML receive + callback returns (have pending question in to Ralph as to why + this is necessary, vs. just OBJ_RETAIN'ing the buffer). Note + that copy_payload copies *from the current buffer position*, so + we don't need to re-unpack the command from the new buffer. 
*/ + abn->buffer = OBJ_NEW(opal_buffer_t); + if (NULL == abn->buffer) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + opal_dss.copy_payload(abn->buffer, buffer); + + abn->sender = *sender; + + /* Queue up an immediately-active event in the agent thread */ + opal_event_set(evbase, &abn->event, -1, OPAL_EV_WRITE, callback, abn); + opal_event_active(&abn->event, OPAL_EV_WRITE, 1); +} + +/* + * Called when we get an incoming RML message + */ +static void agent_rml_receive(int status, ompi_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int32_t command; + + /* Unpack and invoke the command */ + UNPACK_INT32(buffer, command); + assert(CONNECTIVITY_AGENT_CMD_LISTEN == command || + CONNECTIVITY_AGENT_CMD_PING == command); + + switch(command) { + case CONNECTIVITY_AGENT_CMD_LISTEN: + agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_listen); + break; + case CONNECTIVITY_AGENT_CMD_PING: + agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_ping); + break; + default: + ABORT("Unexpected connectivity agent command"); + break; + } +} + +/************************************************************************** + * All of the following functions run in agent thread + **************************************************************************/ + +static struct timeval ack_timeout; +static opal_list_t listeners; +/* JMS The pings_pending and ping_results should probably both be hash + tables for more efficient lookups */ +static opal_list_t pings_pending; +static opal_list_t ping_results; +static volatile bool agent_thread_time_to_exit = false; + +/* + * A dummy function invoked in an event just for the purposes of + * waking up the agent main thread (in case it was blocked in the + * event loop with no other events to wake it up). + */ +static void agent_thread_noop(int fd, short flags, void *context) +{ + /* Intentionally a no op */ +} + +/* + * Handle an incoming PING message (send an ACK) + */ +static void agent_thread_handle_ping(agent_udp_port_listener_t *listener, + ssize_t numbytes, struct sockaddr *from) +{ + /* If the size we received isn't equal to what the sender says it + sent, do the simple thing: just don't send an ACK */ + agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer; + if (msg->size != numbytes) { + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity got bad message; discarded"); + return; + } + + char src_ipv4_addr_str[IPV4ADDRLEN]; + ompi_btl_usnic_snprintf_ipv4_addr(src_ipv4_addr_str, + sizeof(src_ipv4_addr_str), + msg->src_ipv4_addr, 0); + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity got PING (size=%ld) from %s; sending ACK", + numbytes, src_ipv4_addr_str); + + /* Send back an ACK. No need to allocate a new buffer; just + re-use the same buffer we just got. Note that msg->size is + already set.. 
*/ + msg->message_type = AGENT_MSG_TYPE_ACK; + msg->src_ipv4_addr = listener->ipv4_addr; + msg->src_udp_port = listener->udp_port; + + agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from); +} + +/* + * Handle an incoming ACK message + */ +static void agent_thread_handle_ack(agent_udp_port_listener_t *listener, + ssize_t numbytes, struct sockaddr *from) +{ + /* If we got a wonky ACK message that is the wrong length, just + return */ + agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer; + if (numbytes != sizeof(*msg)) { + return; + } + + /* Find the pending ping request that this ACK is for */ + agent_ping_t *ap; + OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) { + if (ap->dest_ipv4_addr == msg->src_ipv4_addr && + ap->dest_udp_port == msg->src_udp_port) { + /* Found it -- indicate that it has been acked */ + for (int i = 0; i < NUM_PING_SIZES; ++i) { + if (ap->sizes[i] == msg->size) { + ap->acked[i] = true; + return; + } + } + } + } + + /* If we didn't find the matching ping for this ACK, then just + discard it */ +} + +/* + * Receive a message from the listening UDP socket + */ +static void agent_thread_receive_ping(int fd, short flags, void *context) +{ + agent_udp_port_listener_t *listener = + (agent_udp_port_listener_t *) context; + assert(NULL != listener); + + /* Receive the message */ + ssize_t numbytes; + struct sockaddr src_addr; + socklen_t addrlen = sizeof(src_addr); + + while (1) { + numbytes = recvfrom(listener->fd, listener->buffer, listener->mtu, 0, + &src_addr, &addrlen); + if (numbytes > 0) { + break; + } else if (numbytes < 0) { + if (errno == EAGAIN || errno == EINTR) { + continue; + } + + ABORT("Unexpected error from recvfrom"); + /* Will not return */ + } + } + + agent_udp_message_t *msg; + msg = (agent_udp_message_t *) listener->buffer; + switch (msg->message_type) { + case AGENT_MSG_TYPE_PING: + agent_thread_handle_ping(listener, numbytes, &src_addr); + break; + case AGENT_MSG_TYPE_ACK: + agent_thread_handle_ack(listener, numbytes, &src_addr); + break; + default: + ABORT("Unexpected connectivity ping message type"); + break; + } +} + +static bool agent_thread_already_listening(uint32_t ipv4_addr, + uint32_t *udp_port) +{ + agent_udp_port_listener_t *listener; + OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) { + if (listener->ipv4_addr == ipv4_addr) { + *udp_port = listener->udp_port; + return true; + } + } + + return false; +} + +/* + * Send an RML reply back from the LISTEN command: send back the IP + * address and UDP port that we're listening on. + */ +static int agent_thread_cmd_listen_reply(ompi_process_name_t *dest, + uint64_t addr, int32_t udp_port) +{ + opal_buffer_t *msg; + msg = OBJ_NEW(opal_buffer_t); + if (NULL == msg) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_LISTEN); + PACK_UINT32(msg, addr); + PACK_UINT32(msg, udp_port); + + int ret; + ret = ompi_rte_send_buffer_nb(dest, msg, + OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY, + ompi_rte_send_cbfunc, NULL); + if (OMPI_SUCCESS != ret) { + OMPI_ERROR_LOG(ret); + OBJ_RELEASE(msg); + return ret; + } + + return OMPI_SUCCESS; +} + +/* + * The RTE thread will queue up an event to call this function when it + * receives a LISTEN command RML message. 
+ */ +static void agent_thread_cmd_listen(int fd, short flags, void *context) +{ + agent_buf_name_t *abn = (agent_buf_name_t*) context; + opal_buffer_t *buffer = abn->buffer; + ompi_process_name_t *sender = &abn->sender; + + int ret; + uint32_t ipv4_addr, cidrmask; + UNPACK_UINT32(buffer, ipv4_addr); + UNPACK_UINT32(buffer, cidrmask); + + /* If we're already listening on this address, send the UDP port + back to the client. */ + uint32_t udp_port; + if (agent_thread_already_listening(ipv4_addr, &udp_port)) { + agent_thread_cmd_listen_reply(sender, ipv4_addr, udp_port); + + OBJ_RELEASE(buffer); + free(abn); + + return; + } + + /* We're not listening on this address already, so create a + listener entry */ + agent_udp_port_listener_t *listener = NULL; + listener = OBJ_NEW(agent_udp_port_listener_t); + if (NULL == listener) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + + /* Unpack the rest of the message */ + UNPACK_UINT32(buffer, listener->mtu); + UNPACK_STRING(buffer, listener->nodename); + UNPACK_STRING(buffer, listener->if_name); + UNPACK_STRING(buffer, listener->usnic_name); + uint8_t mac[6]; + UNPACK_BYTES(buffer, mac, sizeof(mac)); + + listener->ipv4_addr = ipv4_addr; + listener->cidrmask = cidrmask; + + /* We're now done with the RTE buffer */ + OBJ_RELEASE(buffer); + buffer = NULL; + + /* Fill in the ipv4_addr_str and mac_str. Since we don't have the + IPv4 address in sockaddr_in form, it's not worth using + inet_ntop() */ + ompi_btl_usnic_snprintf_ipv4_addr(listener->ipv4_addr_str, + sizeof(listener->ipv4_addr_str), + ipv4_addr, cidrmask); + ompi_btl_usnic_sprintf_mac(listener->mac_str, mac); + + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity agent listening on %s, (%s/%s)", + listener->ipv4_addr_str, + listener->usnic_name, listener->if_name); + + listener->buffer = malloc(listener->mtu); + if (NULL == listener->buffer) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + + /* Create the listening socket */ + listener->fd = socket(AF_INET, SOCK_DGRAM, 0); + if (listener->fd < 0) { + OMPI_ERROR_LOG(listener->fd); + ABORT("Could not open listening socket"); + /* Will not return */ + } + + /* Bind it to the designated interface */ + struct sockaddr_in inaddr; + memset(&inaddr, 0, sizeof(inaddr)); + inaddr.sin_family = AF_INET; + inaddr.sin_addr.s_addr = ipv4_addr; + inaddr.sin_port = htons(0); + + ret = bind(listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr)); + if (ret < 0) { + OMPI_ERROR_LOG(ret); + ABORT("Could not bind listening socket"); + /* Will not return */ + } + + /* Find out the port we got */ + opal_socklen_t addrlen = sizeof(struct sockaddr_in); + ret = getsockname(listener->fd, (struct sockaddr*) &inaddr, &addrlen); + if (ret < 0) { + OMPI_ERROR_LOG(ret); + ABORT("Could not get UDP port number from listening socket"); + /* Will not return */ + } + listener->udp_port = ntohs(inaddr.sin_port); + + /* Set the "don't fragment" bit on outgoing frames because we + want MTU-sized messages to get through successfully to the + peer, or fail if they have to fragment because of an MTU + mismatch somewhere enroute */ + int val = IP_PMTUDISC_DO; + ret = setsockopt(listener->fd, IPPROTO_IP, IP_MTU_DISCOVER, + &val, sizeof(val)); + if (0 != ret) { + OMPI_ERROR_LOG(ret); + ABORT("Unable to set \"do not fragment\" on UDP socket"); + /* Will not return */ + } + + /* Set the send and receive buffer sizes to our MTU size */ + int temp; + temp = (int) listener->mtu; + if 
((ret = setsockopt(listener->fd, SOL_SOCKET, SO_RCVBUF, + &temp, sizeof(temp))) < 0 || + (ret = setsockopt(listener->fd, SOL_SOCKET, SO_SNDBUF, + &temp, sizeof(temp))) < 0) { + OMPI_ERROR_LOG(ret); + ABORT("Could not set socket buffer sizes"); + /* Will not return */ + } + + /* Create a listening event */ + opal_event_set(evbase, &listener->event, listener->fd, + OPAL_EV_READ | OPAL_EV_PERSIST, + agent_thread_receive_ping, listener); + opal_event_add(&listener->event, 0); + + /* Save this listener on the list of listeners */ + opal_list_append(&listeners, &listener->super); + + /* Return the port number to the sender */ + ret = agent_thread_cmd_listen_reply(sender, ipv4_addr, listener->udp_port); + + /* All done! */ + free(abn); + return; +} + +/* + * Send a ping + */ +static void agent_thread_send_ping(int fd, short flags, void *context) +{ + agent_ping_t *ap = (agent_ping_t*) context; + ap->timer_active = false; + + char dest_ipv4_addr_str[IPV4ADDRLEN]; + ompi_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str, + sizeof(dest_ipv4_addr_str), + ap->dest_ipv4_addr, ap->dest_cidrmask); + + /* If we got all the ACKs for this ping, then move this ping from + the "pending" list to the "results" list. We can also free the + buffers associated with this ping result, just to save some + space in the long run. */ + if (ap->acked[0] && ap->acked[1]) { + opal_list_remove_item(&pings_pending, &ap->super); + opal_list_append(&ping_results, &ap->super); + + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity GOOD between %s <--> %s", + ap->listener->ipv4_addr_str, + dest_ipv4_addr_str); + + for (int i = 0; i < 2; ++i) { + if (NULL != ap->buffers[i]) { + free(ap->buffers[i]); + ap->buffers[i] = NULL; + } + } + + return; + } + + /* If we've resent too many times, then just abort */ + if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) { + char *topic; + if (ap->acked[0] && !ap->acked[1]) { + topic = "connectivity error: small ok, large bad"; + } else if (!ap->acked[0] && ap->acked[1]) { + topic = "connectivity error: small bad, large ok"; + } else { + topic = "connectivity error: small bad, large bad"; + } + + char mac_str[MACLEN], ipv4_addr_str[IPV4ADDRLEN]; + ompi_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str), + ap->dest_ipv4_addr, + ap->dest_cidrmask); + ompi_btl_usnic_sprintf_mac(mac_str, ap->dest_mac); + opal_show_help("help-mpi-btl-usnic.txt", topic, true, + ompi_process_info.nodename, + ap->listener->ipv4_addr_str, + ap->listener->usnic_name, + ap->listener->if_name, + ap->listener->mac_str, + ap->dest_nodename, + ipv4_addr_str, + mac_str, + ap->sizes[0], + ap->sizes[1]); + ompi_btl_usnic_exit(); + /* Will not return */ + } + + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity pinging %s (%s) from %s (%s/%s)", + dest_ipv4_addr_str, ap->dest_nodename, + ap->listener->ipv4_addr_str, + ap->listener->if_name, ap->listener->usnic_name); + + /* Send the ping messages to the peer */ + for (int i = 0; i < NUM_PING_SIZES; ++i) { + agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i], + (struct sockaddr*) &ap->dest_sockaddr); + } + + /* Set a timer to check if these pings are ACKed */ + opal_event_set(evbase, &ap->timer, + -1, 0, agent_thread_send_ping, ap); + opal_event_add(&ap->timer, &ack_timeout); + ap->timer_active = true; + + /* Count how many times we've done this */ + ++ap->num_sends; +} + +/* + * The RTE thread will queue up an event to call this function when it + * receives a PING command RML message. 
+ */ +static void agent_thread_cmd_ping(int fd, short flags, void *context) +{ + agent_buf_name_t *abn = (agent_buf_name_t*) context; + opal_buffer_t *buffer = abn->buffer; + + uint32_t src_ipv4_addr, src_udp_port; + uint32_t dest_ipv4_addr, dest_cidrmask, dest_udp_port, mtu; + uint8_t dest_mac[6]; + char *dest_nodename; + UNPACK_UINT32(buffer, src_ipv4_addr); + UNPACK_UINT32(buffer, src_udp_port); + UNPACK_UINT32(buffer, dest_ipv4_addr); + UNPACK_UINT32(buffer, dest_cidrmask); + UNPACK_UINT32(buffer, dest_udp_port); + UNPACK_BYTES(buffer, dest_mac, 6); + UNPACK_UINT32(buffer, mtu); + UNPACK_STRING(buffer, dest_nodename); + + /* We're now done with the original RML message buffer */ + OBJ_RELEASE(buffer); + free(abn); + + /* Have we already pinged this IP address / port? */ + agent_ping_t *ap; + OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) { + if (ap->dest_ipv4_addr == dest_ipv4_addr && + ap->dest_udp_port == dest_udp_port) { + /* We already have results from pinging this IP address / + port, so there's no need for further action */ + return; + } + } + + /* Are we in the middle of pinging this IP address / port? */ + OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) { + if (ap->dest_ipv4_addr == dest_ipv4_addr && + ap->dest_udp_port == dest_udp_port) { + /* We're already in the middle of pinging this IP address + / port, so there's no need for further action */ + return; + } + } + + /* This is a new ping request. Find the listener with this source + ipv4 address */ + bool found = false; + agent_udp_port_listener_t *listener; + OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) { + if (listener->ipv4_addr == src_ipv4_addr) { + found = true; + break; + } + } + if (!found) { + ABORT("Could not ping listener for ping request"); + /* Will not return */ + } + + /* This is a new ping request; track it */ + ap = OBJ_NEW(agent_ping_t); + if (NULL == ap) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + ap->src_ipv4_addr = src_ipv4_addr; + ap->src_udp_port = src_udp_port; + ap->listener = listener; + ap->dest_ipv4_addr = dest_ipv4_addr; + ap->dest_cidrmask = dest_cidrmask; + ap->dest_udp_port = dest_udp_port; + ap->dest_sockaddr.sin_family = AF_INET; + ap->dest_sockaddr.sin_addr.s_addr = dest_ipv4_addr; + ap->dest_sockaddr.sin_port = htons(dest_udp_port); + memcpy(ap->dest_mac, dest_mac, 6); + ap->dest_nodename = dest_nodename; + + /* The first message we send will be "short" (a simple control + message); the second will be "long" (i.e., caller-specified + length) */ + ap->sizes[0] = sizeof(agent_udp_message_t); + + /* Note that the MTU is the max Ethernet frame payload. So from + that MTU, we have to subtract off the max IP header (e.g., if + all IP options are enabled, which is 60 bytes), and then also + subtract off the UDP header (which is 8 bytes). So we need to + subtract off 68 bytes from the MTU, and that's the largest ping + payload we can send. */ + ap->sizes[1] = mtu - 68; + + /* Allocate a buffer for each size. Make sure the smallest size + is at least sizeof(agent_udp_message_t). 
*/ + agent_udp_message_t *msg; + for (size_t i = 0; i < NUM_PING_SIZES; ++i) { + ap->buffers[i] = calloc(1, ap->sizes[i]); + if (NULL == ap->buffers[i]) { + OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE); + ABORT("Out of memory"); + /* Will not return */ + } + + /* Fill in the message with return addressing information */ + msg = (agent_udp_message_t*) ap->buffers[i]; + msg->message_type = AGENT_MSG_TYPE_PING; + msg->src_ipv4_addr = ap->src_ipv4_addr; + msg->src_udp_port = ap->src_udp_port; + msg->size = ap->sizes[i]; + } + + /* Save this ping request on the "pending" list */ + opal_list_append(&pings_pending, &ap->super); + + /* Send the ping */ + agent_thread_send_ping(0, 0, ap); +} + +/* + * Agent progress thread main entry point + */ +static void *agent_thread_main(opal_object_t *obj) +{ + while (!agent_thread_time_to_exit) { + opal_event_loop(evbase, OPAL_EVLOOP_ONCE); + } + + return NULL; +} + +/************************************************************************** + * All of the following functions run in the main application thread + **************************************************************************/ + +static bool agent_initialized = false; +static opal_thread_t agent_thread; + +/* + * Setup the agent and start its event loop running in a dedicated + * thread + */ +int ompi_btl_usnic_connectivity_agent_init(void) +{ + /* Only do this initialization if I am the agent (the agent is + local rank 0) */ + if (ompi_process_info.my_local_rank != 0) { + return OMPI_SUCCESS; + } + if (agent_initialized) { + return OMPI_SUCCESS; + } + + /* Create the event base */ + evbase = opal_event_base_create(); + + /* Make a struct timeval for use with timer events */ + ack_timeout.tv_sec = + mca_btl_usnic_component.connectivity_ack_timeout / 1000; + ack_timeout.tv_usec = mca_btl_usnic_component.connectivity_ack_timeout; + ack_timeout.tv_usec -= ack_timeout.tv_sec * 1000; + + /* Create lists */ + OBJ_CONSTRUCT(&listeners, opal_list_t); + OBJ_CONSTRUCT(&pings_pending, opal_list_t); + OBJ_CONSTRUCT(&ping_results, opal_list_t); + + /******************************************************************** + * Once all of the above is setup, launch the RML receives and + * start the event loop. + ********************************************************************/ + + /* Setup the RML receive */ + ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD, + OMPI_RML_TAG_USNIC_CONNECTIVITY, + OMPI_RML_PERSISTENT, + agent_rml_receive, NULL); + + /* Spawn the agent thread event loop */ + OBJ_CONSTRUCT(&agent_thread, opal_thread_t); + agent_thread.t_run = agent_thread_main; + agent_thread.t_arg = NULL; + int ret; + ret = opal_thread_start(&agent_thread); + if (OPAL_SUCCESS != ret) { + OMPI_ERROR_LOG(ret); + ABORT("Failed to start usNIC agent thread"); + /* Will not return */ + } + + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity agent initialized"); + agent_initialized = true; + return OMPI_SUCCESS; +} + +/* + * Shut down the agent + */ +int ompi_btl_usnic_connectivity_agent_finalize(void) +{ + agent_initialized = false; + + /* Only do this if I have the agent running */ + if (NULL == evbase) { + return OMPI_SUCCESS; + } + + /* Cancel the RML receive */ + ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_USNIC_CONNECTIVITY); + + /* Shut down the event loop. Send it a no-op event so that it + wakes up and exits the loop. 
*/ + opal_event_t ev; + agent_thread_time_to_exit = true; + opal_event_set(evbase, &ev, -1, OPAL_EV_WRITE, agent_thread_noop, NULL); + opal_event_active(&ev, OPAL_EV_WRITE, 1); + opal_thread_join(&agent_thread, NULL); + + /* Shut down all active listeners */ + agent_udp_port_listener_t *listener, *lnext; + OPAL_LIST_FOREACH_SAFE(listener, lnext, &listeners, + agent_udp_port_listener_t) { + opal_event_del(&listener->event); + close(listener->fd); + opal_list_remove_item(&listeners, &listener->super); + OBJ_RELEASE(listener); + } + + /* Destroy the pending pings and ping results */ + agent_ping_t *request, *pnext; + OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) { + opal_list_remove_item(&pings_pending, &request->super); + OBJ_RELEASE(request); + } + + OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) { + opal_list_remove_item(&ping_results, &request->super); + OBJ_RELEASE(request); + } + + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity client finalized"); + return OMPI_SUCCESS; +} diff --git a/ompi/mca/btl/usnic/btl_usnic_cclient.c b/ompi/mca/btl/usnic/btl_usnic_cclient.c new file mode 100644 index 0000000000..0b52f83e40 --- /dev/null +++ b/ompi/mca/btl/usnic/btl_usnic_cclient.c @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include +#include +#include +#include +#include + +#include "opal_stdint.h" +#include "opal/threads/mutex.h" +#include "opal/mca/event/event.h" +#include "opal/mca/db/db.h" +#include "opal/util/output.h" + +#include "ompi/proc/proc.h" +#include "ompi/mca/rte/rte.h" +#include "ompi/constants.h" + +#include "btl_usnic.h" +#include "btl_usnic_module.h" +#include "btl_usnic_connectivity.h" + +/************************************************************************** + * Client-side data and methods + **************************************************************************/ + +static bool initialized = false; +static ompi_process_name_t agent_name; + +typedef struct { + uint32_t addr; + uint32_t udp_port; + bool receive_done; +} client_rml_receive_data_t; + + +/* + * Receive replies from the agent + */ +static void client_rml_receive(int status, ompi_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int32_t command; + volatile client_rml_receive_data_t *cddr = + (client_rml_receive_data_t*) cbdata; + + /* What command is this a reply for? */ + UNPACK_INT32(buffer, command); + assert(command == CONNECTIVITY_AGENT_CMD_LISTEN); + + UNPACK_UINT32(buffer, cddr->addr); + UNPACK_UINT32(buffer, cddr->udp_port); + + /* Tell the main thread that the reply is done */ + cddr->receive_done = true; +} + + +/* + * Startup the agent and share our MCA param values with the it. 
+ */ +int ompi_btl_usnic_connectivity_client_init(void) +{ + /* If connectivity checking is not enabled, do nothing */ + if (!mca_btl_usnic_component.connectivity_enabled) { + return OMPI_SUCCESS; + } + + assert(!initialized); + + /* Get the name of the agent */ + int ret; + ompi_process_name_t *ptr; + ptr = &agent_name; + ret = ompi_rte_db_fetch(ompi_proc_local_proc, OPAL_DB_LOCALLDR, (void**) &ptr, OPAL_ID_T); + if (OMPI_SUCCESS != ret) { + OMPI_ERROR_LOG(ret); + BTL_ERROR(("usNIC connectivity client unable to db_fetch local leader")); + return ret; + } + + initialized = true; + opal_output_verbose(20, USNIC_OUT, + "usNIC connectivity client initialized"); + return OMPI_SUCCESS; +} + + +/* + * Send a listen command to the agent + */ +int ompi_btl_usnic_connectivity_listen(ompi_btl_usnic_module_t *module) +{ + /* If connectivity checking is not enabled, do nothing */ + if (!mca_btl_usnic_component.connectivity_enabled) { + return OMPI_SUCCESS; + } + + opal_buffer_t *msg; + msg = OBJ_NEW(opal_buffer_t); + if (NULL == msg) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* Send the LISTEN command. Include enough information for the + agent to be able to print a show_help() message, if + necessary. */ + PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_LISTEN); + PACK_UINT32(msg, module->local_addr.ipv4_addr); + PACK_UINT32(msg, module->local_addr.cidrmask); + PACK_UINT32(msg, module->local_addr.mtu); + PACK_STRING(msg, ompi_process_info.nodename); + PACK_STRING(msg, module->if_name); + PACK_STRING(msg, ibv_get_device_name(module->device)); + PACK_BYTES(msg, module->local_addr.mac, 6); + + /* Post a receive for the agent to reply with the UDP port to me */ + volatile client_rml_receive_data_t data; + data.receive_done = false; + ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD, + OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY, + 0, + client_rml_receive, (void*) &data); + + /* Send it to the agent */ + int ret; + ret = ompi_rte_send_buffer_nb(&agent_name, msg, + OMPI_RML_TAG_USNIC_CONNECTIVITY, + ompi_rte_send_cbfunc, NULL); + if (OMPI_SUCCESS != ret) { + OMPI_ERROR_LOG(ret); + OBJ_RELEASE(msg); + return ret; + } + + /* Wait for the reply */ + while (!data.receive_done) { + /* Sleep to let the RTE progress thread run */ + usleep(1); + } + + /* Get the UDP port number that was received */ + module->local_addr.connectivity_udp_port = data.udp_port; + + return OMPI_SUCCESS; +} + + +int ompi_btl_usnic_connectivity_ping(uint32_t src_ipv4_addr, int src_port, + uint32_t dest_ipv4_addr, + uint32_t dest_cidrmask, int dest_port, + uint8_t dest_mac[6], char *dest_nodename, + size_t mtu) +{ + /* If connectivity checking is not enabled, do nothing */ + if (!mca_btl_usnic_component.connectivity_enabled) { + return OMPI_SUCCESS; + } + + opal_buffer_t *msg; + msg = OBJ_NEW(opal_buffer_t); + if (NULL == msg) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_PING); + PACK_UINT32(msg, src_ipv4_addr); + PACK_UINT32(msg, src_port); + PACK_UINT32(msg, dest_ipv4_addr); + PACK_UINT32(msg, dest_cidrmask); + PACK_UINT32(msg, dest_port); + PACK_BYTES(msg, dest_mac, 6); + PACK_UINT32(msg, mtu); + PACK_STRING(msg, dest_nodename); + + /* Send it to the agent */ + int ret; + ret = ompi_rte_send_buffer_nb(OMPI_PROC_MY_NAME, msg, + OMPI_RML_TAG_USNIC_CONNECTIVITY, + ompi_rte_send_cbfunc, NULL); + if (OMPI_SUCCESS != ret) { + OMPI_ERROR_LOG(ret); + OBJ_RELEASE(msg); + return ret; + } + + return OMPI_SUCCESS; +} + + +/* + * Shut down the connectivity client + */ +int ompi_btl_usnic_connectivity_client_finalize(void) +{ + 
/* Make it safe to finalize, even if we weren't initialized */ + if (!initialized) { + return OMPI_SUCCESS; + } + + initialized = false; + return OMPI_SUCCESS; +} diff --git a/ompi/mca/btl/usnic/btl_usnic_component.c b/ompi/mca/btl/usnic/btl_usnic_component.c index 42de9f4c33..caa2d886e4 100644 --- a/ompi/mca/btl/usnic/btl_usnic_component.c +++ b/ompi/mca/btl/usnic/btl_usnic_component.c @@ -64,6 +64,7 @@ #include "ompi/mca/common/verbs/common_verbs.h" #include "btl_usnic.h" +#include "btl_usnic_connectivity.h" #include "btl_usnic_frag.h" #include "btl_usnic_endpoint.h" #include "btl_usnic_module.h" @@ -207,6 +208,12 @@ static int usnic_component_close(void) usnic_clock_timer_event_set = false; } + /* Finalize the connectivity client and agent */ + if (mca_btl_usnic_component.connectivity_enabled) { + ompi_btl_usnic_connectivity_client_finalize(); + ompi_btl_usnic_connectivity_agent_finalize(); + } + free(mca_btl_usnic_component.usnic_all_modules); free(mca_btl_usnic_component.usnic_active_modules); @@ -517,6 +524,14 @@ static mca_btl_base_module_t** usnic_component_init(int* num_btl_modules, free(mca_btl_usnic_component.vendor_part_ids_string); mca_btl_usnic_component.vendor_part_ids_string = NULL; + /* Setup the connectivity checking agent and client. */ + if (mca_btl_usnic_component.connectivity_enabled) { + if (OMPI_SUCCESS != ompi_btl_usnic_connectivity_agent_init() || + OMPI_SUCCESS != ompi_btl_usnic_connectivity_client_init()) { + return NULL; + } + } + /************************************************************************ * Below this line, we assume that usnic is loaded on all procs, * and therefore we will guarantee to the the modex send, even if diff --git a/ompi/mca/btl/usnic/btl_usnic_connectivity.h b/ompi/mca/btl/usnic/btl_usnic_connectivity.h new file mode 100644 index 0000000000..a9e3336a9f --- /dev/null +++ b/ompi/mca/btl/usnic/btl_usnic_connectivity.h @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_BTL_USNIC_CONNECTIVITY_H +#define OMPI_BTL_USNIC_CONNECTIVITY_H + +#include "ompi_config.h" + +#include + +#include "opal/util/show_help.h" + +#include "ompi/runtime/mpiruntime.h" + +#include "btl_usnic_util.h" + + +/** + * Agent-based service to verify UDP connectivity between two peers. + * + * Generally, it is a client-server pattern with three entities + * involved: + * + * 1. Agent thread: running in MPI process local rank 0 + * 2. Client: running in the main application thread in every MPI process + * 3. RTE thread: running in every MPI process + * + * If enabled (via MCA param), the usnic module_init() will setup the + * client (and server on local rank 0). For each usnic module, Each + * client will RML send a request to the server asking it to listen on + * its usnic interface. The agent will discard duplicates and setup a + * single UDP socket listener on the eth interface corresponding to + * each requested usnic interface. The agent returns the listening + * UDP port number to the client, and each client puts this UDP port + * number in their modex information. + * + * At the first send to a given MPI process peer, the client will send + * another request to the server asking it to verify connectivity to + * the peer (supplying the peer's UDP listener port number from the + * peer's modex info). Again, the agent will discard duplicates -- it + * will only verify connectivity to each peer's *server* once. 
The + * agent will send a short UDP message and a long UDP message + * (basically, the MTU-68 bytes -- see comment in btl_usnic_cagent.c + * for the reasons why) to the listening peer UDP port. + * + * When the peer agent gets PING messages, it sends short ACK control + * messages back to the sending agent. When the sending agent gets + * all ACKs back from the peer, it rules that connectivity is GOOD and + * no further action is taken. If the sending agent doesn't get one + * or both ACKs back in a timely fashion, it re-sends the PING(s) that + * wasn't(weren't) ACKed. Eventually if the sending agent re-sends + * too many times and does not get an ACK back, it gives up, displays + * and error, and aborts the MPI job. + * + * Note that the client/server interaction is intentionally quite + * primitive: + * + * 1. Client requests agent to listen on interface X. Server responds + * with UDP port number of listener. + * + * 2. Client requests ping check to peer Y. Client does not wait for + * the answer; the agent either verifies the connectivity successfully + * or aborts the job. + * + * All client/agent communication is via the RML. + * + * As mentioned above, the agent is smart about discarding duplicate + * ping requests from clients. Since a single agent serves all MPI + * processes on a given server, this cuts down on a lot of PING + * traffic. + */ + +/* + * Forward declaration + */ +struct ompi_btl_usnic_module_t; + +/** @internal + * This macro just makes the macros below a little easier to read. + */ +#define ABORT(msg) ompi_btl_usnic_util_abort((msg), __FILE__, __LINE__, 1) + +/** + * Helper macro to pack binary bytes into RML message buffers. + * + * Note that this macro will call ompi_btl_usnic_util_abort() + * if an error occurs. + */ +#define PACK_BYTES(buffer, value, num_bytes) \ + do { \ + int ret; \ + ret = opal_dss.pack((buffer), &(value), (num_bytes), OPAL_BYTE); \ + if (OPAL_SUCCESS != ret) { \ + OMPI_ERROR_LOG(ret); \ + ABORT("Could not pack"); \ + } \ + } while(0) + +/** + * @internal + * + * Back-end macro for PACK_STRING, PACK_INT32, and PACK_UINT32. + * + * Note that this macro will call ompi_btl_usnic_util_abort() + * if an error occurs. + */ +#define PACK_(buffer, type, opal_type, value) \ + do { \ + int ret; \ + type temp = (type) (value); \ + if (OPAL_SUCCESS != \ + (ret = opal_dss.pack((buffer), &temp, 1, opal_type))) { \ + OMPI_ERROR_LOG(ret); \ + ABORT("Could not pack"); \ + } \ + } while(0) + +/** + * Helper macro the pack a string into RML message buffers. + */ +#define PACK_STRING(buffer, value) PACK_(buffer, char *, OPAL_STRING, value) +/** + * Helper macro the pack an int32_ into RML message buffers. + */ +#define PACK_INT32(buffer, value) PACK_(buffer, int32_t, OPAL_INT32, value) +/** + * Helper macro the pack a uint32_t into RML message buffers. + */ +#define PACK_UINT32(buffer, value) PACK_(buffer, uint32_t, OPAL_UINT32, value) + +/** + * Helper macro to unpack binary bytes from RML message buffers. + * + * Note that all of these macros will call ompi_btl_usnic_util_abort() if + * an error occurs. 
+ */ +#define UNPACK_BYTES(buffer, value, num_bytes) \ + do { \ + int ret_value, n = (num_bytes); \ + ret_value = opal_dss.unpack((buffer), value, &n, OPAL_BYTE); \ + if (OPAL_SUCCESS != ret_value) { \ + OMPI_ERROR_LOG(ret_value); \ + ABORT("Could not unpack"); \ + } \ + } while(0) + +/** + * @internal + * + * Back-end macro for UNPACK_STRING, UNPACK_INT32, and UNPACK_UINT32 + * + * Note that this macro will call ompi_btl_usnic_util_abort() if an error + * occurs. + */ +#define UNPACK(buffer, type, opal_unpack_type, value) \ + do { \ + int ret_value, n = 1; \ + type temp; \ + ret_value = opal_dss.unpack((buffer), &temp, &n, opal_unpack_type); \ + if (OPAL_SUCCESS != ret_value) { \ + OMPI_ERROR_LOG(ret_value); \ + ABORT("Could not unpack"); \ + } else { \ + value = (type) temp; \ + } \ + } while(0) + +/** + * Helper macro to unpack a string from RML message buffers. + */ +#define UNPACK_STRING(buffer, value) \ + UNPACK(buffer, char *, OPAL_STRING, value); +/** + * Helper macro to unpack an int32_t from RML message buffers. + */ +#define UNPACK_INT32(buffer, value) \ + UNPACK(buffer, int32_t, OPAL_INT32, value); +/** + * Helper macro to unpack a uint32_t from RML message buffers. + */ +#define UNPACK_UINT32(buffer, value) \ + UNPACK(buffer, uint32_t, OPAL_UINT32, value) + +/** + * RML message types. This value is packed as the first field in each + * RML message to identify its type. Use a non-zero value as the + * first enum just as defensive programming (i.e., it's a slightly + * lower chance that an uninitialized message type would randomly + * match these values). + */ +enum { + CONNECTIVITY_AGENT_CMD_LISTEN = 17, + CONNECTIVITY_AGENT_CMD_PING, + CONNECTIVITY_AGENT_CMD_MAX +}; + +/** + * Startup the connectivity client. + * + * @returns OMPI_SUCCESS or an OMPI error code. + * + * It is safe to call this function even if the connectivity check is + * disabled; it will be a no-op in this case. + */ +int ompi_btl_usnic_connectivity_client_init(void); + +/** + * Tell the agent to establsh a listening port on the given IP + * address. + * + * @params[in] module The module that is requesting the listen. + * + * @returns OMPI_SUCCESS or an OMPI error code. + * + * The module contains the local interface addressing information, + * which tells the agent one which interface to listen. + * + * This routine will request the new listen from the agent, and wait + * for the agent to reply with the UDP port that is being used/was + * created. The UDP listening port will then be stuffed in + * module->local_addr.connectivity_udp_port (i.e., data that will be + * sent in the modex). + * + * It is safe to call this function even if the connectivity check is + * disabled; it will be a no-op in this case. + */ +int ompi_btl_usnic_connectivity_listen(struct ompi_btl_usnic_module_t *module); + +/** + * Tell the agent to ping a specific IP address and UDP port number + * with a specific message size. + * + * @param[in] src_ipv4_addr The source module IPv4 address + * @param[in] src_port The source module listening UDP port + * @param[in] dest_ipv4_addr The destination IPv4 address + * @param[in] dest_cidrmask The destination CIDR mask + * @param[in] dest_port The destination UDP port + * @param[in] dest_mac The destination MAC address + * @param[in] dest_nodename The destination server name + * @param[in] mtu The max ping message size to send + * + * @returns OMPI_SUCCESS or an OMPI error code. 
+ * + * Note that several of the above parameters are only passed so that + * they can be used in a complete/helpful error message, if necessary. + * + * This function does not wait for a reply from the agent; it assumes + * the agent will successfully ping the remote peer or will abort the + * MPI job if the pinging fails. + * + * It is safe to call this function even if the connectivity check is + * disabled; it will be a no-op in this case. + */ +int ompi_btl_usnic_connectivity_ping(uint32_t src_ipv4_addr, int src_port, + uint32_t dest_ipv4_addr, + uint32_t dest_cidrmask, int dest_port, + uint8_t *dest_mac, char *dest_nodename, + size_t mtu); + +/** + * Shut down the connectivity service client. + * + * @returns OMPI_SUCCESS or an OMPI error code. + * + * It is safe to call this function even if the connectivity check is + * disabled; it will be a no-op in this case. + */ +int ompi_btl_usnic_connectivity_client_finalize(void); + +/** + * Startup the connectivity agent. + * + * @returns OMPI_SUCCESS or an OMPI error code. + * + * This function will be a no-op if this process is not the local rank + * 0. + */ +int ompi_btl_usnic_connectivity_agent_init(void); + +/** + * Shut down the connectivity agent + * + * @returns OMPI_SUCCESS or an OMPI error code. + * + * This function will be a no-op if this process is not the local rank + * 0. + */ +int ompi_btl_usnic_connectivity_agent_finalize(void); + +#endif /* OMPI_BTL_USNIC_CONNECITIVITY_H */ diff --git a/ompi/mca/btl/usnic/btl_usnic_endpoint.c b/ompi/mca/btl/usnic/btl_usnic_endpoint.c index 66b2a5adf0..245ce99b0d 100644 --- a/ompi/mca/btl/usnic/btl_usnic_endpoint.c +++ b/ompi/mca/btl/usnic/btl_usnic_endpoint.c @@ -53,6 +53,7 @@ static void endpoint_construct(mca_btl_base_endpoint_t* endpoint) endpoint->endpoint_proc = NULL; endpoint->endpoint_proc_index = -1; endpoint->endpoint_exiting = false; + endpoint->endpoint_connectivity_checked = false; for (i=0; iendpoint_remote_addr.qp_num[i] = 0; diff --git a/ompi/mca/btl/usnic/btl_usnic_endpoint.h b/ompi/mca/btl/usnic/btl_usnic_endpoint.h index e05d75eeb0..e6aced6267 100644 --- a/ompi/mca/btl/usnic/btl_usnic_endpoint.h +++ b/ompi/mca/btl/usnic/btl_usnic_endpoint.h @@ -70,6 +70,7 @@ typedef struct ompi_btl_usnic_addr_t { union ibv_gid gid; uint32_t ipv4_addr; uint32_t cidrmask; + uint32_t connectivity_udp_port; uint8_t mac[6]; int mtu; uint32_t link_speed_mbps; @@ -170,6 +171,8 @@ typedef struct mca_btl_base_endpoint_t { bool endpoint_rcvd_segs[WINDOW_SIZE]; uint32_t endpoint_rfstart; + + bool endpoint_connectivity_checked; } mca_btl_base_endpoint_t; typedef mca_btl_base_endpoint_t ompi_btl_usnic_endpoint_t; diff --git a/ompi/mca/btl/usnic/btl_usnic_mca.c b/ompi/mca/btl/usnic/btl_usnic_mca.c index 962a60f335..35c035bd10 100644 --- a/ompi/mca/btl/usnic/btl_usnic_mca.c +++ b/ompi/mca/btl/usnic/btl_usnic_mca.c @@ -126,6 +126,28 @@ static int reg_int(const char* param_name, } +/* + * utility routine for integer parameter registration + */ +static int reg_bool(const char* param_name, + const char* help_string, + bool default_value, bool *storage, int level) +{ + *storage = default_value; + mca_base_component_var_register(&mca_btl_usnic_component.super.btl_version, + param_name, help_string, + MCA_BASE_VAR_TYPE_BOOL, + NULL, + 0, + 0, + level, + MCA_BASE_VAR_SCOPE_READONLY, + storage); + + return OMPI_SUCCESS; +} + + int ompi_btl_usnic_component_register(void) { int tmp, ret = 0; @@ -249,6 +271,28 @@ int ompi_btl_usnic_component_register(void) ompi_btl_usnic_module_template.super.btl_bandwidth = 0; 
ompi_btl_usnic_module_template.super.btl_latency = 4; + /* Connectivity verification */ + mca_btl_usnic_component.connectivity_enabled = true; + CHECK(reg_bool("connectivity_check", + "Whether to enable the usNIC connectivity check upon first send (default = 1, enabled; 0 = disabled)", + mca_btl_usnic_component.connectivity_enabled, + &mca_btl_usnic_component.connectivity_enabled, + OPAL_INFO_LVL_3)); + + mca_btl_usnic_component.connectivity_ack_timeout = 1000; + CHECK(reg_int("connectivity_ack_timeout", + "Timeout, in milliseconds, while waiting for an ACK while verification connectivity between usNIC devices. If 0, the connectivity check is disabled (must be >=0).", + mca_btl_usnic_component.connectivity_ack_timeout, + &mca_btl_usnic_component.connectivity_ack_timeout, + REGINT_GE_ZERO, OPAL_INFO_LVL_3)); + + mca_btl_usnic_component.connectivity_num_retries = 10; + CHECK(reg_int("connectivity_error_num_retries", + "Number of times to retry usNIC connectivity verification before aborting the MPI job (must be >0).", + mca_btl_usnic_component.connectivity_num_retries, + &mca_btl_usnic_component.connectivity_num_retries, + REGINT_GE_ONE, OPAL_INFO_LVL_3)); + /* Register some synonyms to the ompi common verbs component */ ompi_common_verbs_mca_register(&mca_btl_usnic_component.super.btl_version); diff --git a/ompi/mca/btl/usnic/btl_usnic_module.c b/ompi/mca/btl/usnic/btl_usnic_module.c index a6d5ed50ba..79d445ad7b 100644 --- a/ompi/mca/btl/usnic/btl_usnic_module.c +++ b/ompi/mca/btl/usnic/btl_usnic_module.c @@ -41,6 +41,7 @@ #include "ompi/memchecker.h" #include "btl_usnic.h" +#include "btl_usnic_connectivity.h" #include "btl_usnic_frag.h" #include "btl_usnic_proc.h" #include "btl_usnic_endpoint.h" @@ -799,6 +800,20 @@ usnic_prepare_src( size_t osize = *size; #endif + /* Do we need to check the connectivity? 
*/ + if (mca_btl_usnic_component.connectivity_enabled && + OPAL_UNLIKELY(!endpoint->endpoint_connectivity_checked)) { + ompi_btl_usnic_connectivity_ping(module->local_addr.ipv4_addr, + module->local_addr.connectivity_udp_port, + endpoint->endpoint_remote_addr.ipv4_addr, + endpoint->endpoint_remote_addr.cidrmask, + endpoint->endpoint_remote_addr.connectivity_udp_port, + endpoint->endpoint_remote_addr.mac, + endpoint->endpoint_proc->proc_ompi->proc_hostname, + endpoint->endpoint_remote_addr.mtu); + endpoint->endpoint_connectivity_checked = true; + } + /* * if total payload len fits in one MTU use small send, else large */ @@ -1929,6 +1944,19 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module) "btl:usnic: not sorting devices by NUMA distance (topology support not included)"); #endif + /* Setup a connectivity listener */ + if (mca_btl_usnic_component.connectivity_enabled) { + rc = ompi_btl_usnic_connectivity_listen(module); + if (OMPI_SUCCESS != rc) { + OMPI_ERROR_LOG(rc); + ABORT("Failed to notify connectivity agent to listen"); + } + } else { + /* If we're not doing a connectivity check, just set the port + to 0 */ + module->local_addr.connectivity_udp_port = 0; + } + /* Setup the pointer array for the procs that will be used by this module */ OBJ_CONSTRUCT(&module->all_procs, opal_pointer_array_t); diff --git a/ompi/mca/btl/usnic/btl_usnic_util.c b/ompi/mca/btl/usnic/btl_usnic_util.c index f5a013b034..278c1303b9 100644 --- a/ompi/mca/btl/usnic/btl_usnic_util.c +++ b/ompi/mca/btl/usnic/btl_usnic_util.c @@ -66,6 +66,33 @@ ompi_btl_usnic_dump_hex(uint8_t *addr, int len) } +/* + * Trivial wrapper around snprintf'ing an IPv4 address, with or + * without a CIDR mask (we don't usually carry around addresses in + * struct sockaddr form, so this wrapper is marginally easier than + * using inet_ntop()). + */ +void ompi_btl_usnic_snprintf_ipv4_addr(char *out, size_t maxlen, + uint32_t addr, uint32_t cidrmask) +{ + uint8_t *p = (uint8_t*) &addr; + if (cidrmask > 0) { + snprintf(out, maxlen, "%u.%u.%u.%u/%u", + p[0], + p[1], + p[2], + p[3], + cidrmask); + } else { + snprintf(out, maxlen, "%u.%u.%u.%u", + p[0], + p[1], + p[2], + p[3]); + } +} + + void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6]) { snprintf(out, 32, "%02x:%02x:%02x:%02x:%02x:%02x", diff --git a/ompi/mca/btl/usnic/btl_usnic_util.h b/ompi/mca/btl/usnic/btl_usnic_util.h index 9c261f6246..e41c8267e5 100644 --- a/ompi/mca/btl/usnic/btl_usnic_util.h +++ b/ompi/mca/btl/usnic/btl_usnic_util.h @@ -81,6 +81,13 @@ usnic_convertor_pack_simple( */ void ompi_btl_usnic_exit(void); +/* + * If cidrmask==0, it is not included in the output string. addr is + * expected to be in network byte order. + */ +void ompi_btl_usnic_snprintf_ipv4_addr(char *out, size_t maxlen, + uint32_t addr, uint32_t cidrmask); + void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6]); void ompi_btl_usnic_sprintf_gid_mac(char *out, union ibv_gid *gid); diff --git a/ompi/mca/btl/usnic/help-mpi-btl-usnic.txt b/ompi/mca/btl/usnic/help-mpi-btl-usnic.txt index 116450bec9..884f4758c1 100644 --- a/ompi/mca/btl/usnic/help-mpi-btl-usnic.txt +++ b/ompi/mca/btl/usnic/help-mpi-btl-usnic.txt @@ -198,3 +198,75 @@ The usnic BTL failed to initialize the rtnetlink query subsystem. Server: %s Error message: %s +# +[connectivity error: small ok, large bad] +The Open MPI usNIC BTL was unable to establish full connectivity +between at least one pair of servers in the MPI job. 
Specifically, +small UDP messages seem to flow between the servers, but large UDP +messages do not. + +Your MPI job is going to abort now. + + Source: + Hostname / IP: %s (%s) + Host interfaces: %s / %s + MAC address: %s + Destination: + Hostname / IP: %s (%s) + MAC address: %s + + Small message size: %u + Large message size: %u + +Note that this behavior usually indicates that the MTU of some network +link is too small between these two servers. You should verify that +UDP traffic with payloads up to the "large message size" listed above +can flow between these two servers. +# +[connectivity error: small bad, large ok] +The Open MPI usNIC BTL was unable to establish full connectivity +between at least one pair of servers in the MPI job. Specifically, +large UDP messages seem to flow between the servers, but small UDP +messages do not. + +Your MPI job is going to abort now. + + Source: + Hostname / IP: %s (%s) + Host interfaces: %s / %s + MAC address: %s + Destination: + Hostname / IP: %s (%s) + MAC address: %s + + Small message size: %u + Large message size: %u + +This is a very strange network error, and should not occur in most +situations. You may be experiencing high amounts of congestion, or +this may indicate some kind of network misconfiguration. You should +verify that UDP traffic with payloads up to the "large message size" +listed above can flow between these two servers. +# +[connectivity error: small bad, large bad] +The Open MPI usNIC BTL was unable to establish any connectivity +between at least one pair of servers in the MPI job. Specifically, +no UDP messages seemed to flow between these two servers. + +Your MPI job is going to abort now. + + Source: + Hostname / IP: %s (%s) + Host interfaces: %s / %s + MAC address: %s + Destination: + Hostname / IP: %s (%s) + MAC address: %s + + Small message size: %u + Large message size: %u + +Note that this behavior usually indicates some kind of network +misconfiguration. You should verify that UDP traffic with payloads up +to the "large message size" listed above can flow between these two +servers. diff --git a/ompi/mca/rte/rte.h b/ompi/mca/rte/rte.h index 8672f1c08f..dfe7585122 100644 --- a/ompi/mca/rte/rte.h +++ b/ompi/mca/rte/rte.h @@ -223,6 +223,9 @@ BEGIN_C_DECLS #define OMPI_RML_PCONNECT_TAG OMPI_RML_TAG_BASE+13 +#define OMPI_RML_TAG_USNIC_CONNECTIVITY OMPI_RML_TAG_BASE+14 +#define OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY OMPI_RML_TAG_BASE+15 + #define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200 /*