1
1

Add usnic connectivity-checking agent service.

Basically: since usnic is a connectionless transport, we do not get
OS-provided services "for free" that connection-oriented transports
get, namely: "hey, I wasn't able to make a connection to peer X", and
"hey, your connection to peer X has died."
    
This connectivity-checker runs in a separate progress thread in the
usnic BTL in local rank 0 on each server.  Upon first send in any
process, the connectivity-checker agent will send some UDP pings to the
peer to ensure that we can reach it.  If we can't, we'll abort the job
with a nice show_help message.
    
A lengthy comment in btl_usnic_connectivity.h explains the scheme and
how it works.

Reviewed by Dave Goodell.

cmr=v1.7.5:ticket=trac:4253

This commit was SVN r30860.

The following Trac tickets were found above:
  Ticket 4253 --> https://svn.open-mpi.org/trac/ompi/ticket/4253
Этот коммит содержится в:
Jeff Squyres 2014-02-26 22:21:25 +00:00
родитель f2043776f6
Коммит 7440f21b75
14 изменённых файлов: 1715 добавлений и 1 удалений

Просмотреть файл

@ -50,6 +50,9 @@ sources = \
btl_usnic_ack.c \ btl_usnic_ack.c \
btl_usnic_ack.h \ btl_usnic_ack.h \
btl_usnic_component.c \ btl_usnic_component.c \
btl_usnic_connectivity.h \
btl_usnic_cclient.c \
btl_usnic_cagent.c \
btl_usnic_endpoint.c \ btl_usnic_endpoint.c \
btl_usnic_endpoint.h \ btl_usnic_endpoint.h \
btl_usnic_frag.c \ btl_usnic_frag.c \

Просмотреть файл

@ -190,8 +190,14 @@ typedef struct ompi_btl_usnic_component_t {
/** retrans characteristics */ /** retrans characteristics */
int retrans_timeout; int retrans_timeout;
/** socket used for rtnetlink queries */
struct usnic_rtnl_sk *unlsk; struct usnic_rtnl_sk *unlsk;
/** convertor packing threshold */
/** connectivity verification: ACK timeout, number of retries
before issue an error/abort the job */
bool connectivity_enabled;
int connectivity_ack_timeout;
int connectivity_num_retries;
} ompi_btl_usnic_component_t; } ompi_btl_usnic_component_t;
OMPI_MODULE_DECLSPEC extern ompi_btl_usnic_component_t mca_btl_usnic_component; OMPI_MODULE_DECLSPEC extern ompi_btl_usnic_component_t mca_btl_usnic_component;

996
ompi/mca/btl/usnic/btl_usnic_cagent.c Обычный файл
Просмотреть файл

@ -0,0 +1,996 @@
/*
* Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include "opal_stdint.h"
#include "opal/threads/mutex.h"
#include "opal/mca/event/event.h"
#include "opal/util/show_help.h"
#include "opal/types.h"
#include "opal/util/output.h"
#include "ompi/mca/rte/rte.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/constants.h"
#include "btl_usnic.h"
#include "btl_usnic_connectivity.h"
/**************************************************************************
* Agent data and methods
**************************************************************************/
/*
* Data structure used to proxy messages between the RTE thread and
* the agent thread.  One instance is malloc'ed per incoming command by
* agent_queue_thread_cmd() and is freed by the command handler that
* eventually runs in the agent thread.
*/
typedef struct {
/* Private copy of the RML message payload (the command word has
already been unpacked before the copy is made) */
opal_buffer_t *buffer;
/* Name of the process that sent the command */
ompi_process_name_t sender;
/* Event used to invoke the command handler in the agent thread */
opal_event_t event;
} agent_buf_name_t;
/*
* Buffer size long enough to hold "xxx.xxx.xxx.xxx/xx" (18 characters)
* plus the terminating NUL
*/
#define IPV4ADDRLEN 20
/*
* Buffer size long enough to hold "xx:xx:xx:xx:xx:xx" (17 characters)
* plus the terminating NUL
*/
#define MACLEN 18
/*
* Holds all the information about a UDP port that the agent thread is
* listening on (for incoming PINGs and ACKs).  Instances live on the
* "listeners" list; the class destructor closes the socket and frees
* the strings and the receive buffer.
*/
typedef struct {
opal_list_item_t super;
/* Data from the LISTEN command message */
/* Address the socket is bound to; it is assigned directly to
sin_addr.s_addr, so presumably network byte order -- TODO confirm */
uint32_t ipv4_addr;
uint32_t cidrmask;
/* Printable form of ipv4_addr/cidrmask (used in diagnostics) */
char ipv4_addr_str[IPV4ADDRLEN];
/* MTU reported by the client; also the size of the receive buffer */
uint32_t mtu;
/* Heap-allocated strings unpacked from the LISTEN message; freed by
the destructor */
char *nodename;
char *if_name;
char *usnic_name;
/* Printable form of the local MAC address */
char mac_str[MACLEN];
/* File descriptor, UDP port, buffer to receive messages, and event */
/* -1 when no socket is open */
int fd;
/* Port chosen by the OS at bind() time, stored in host byte order */
uint32_t udp_port;
/* "mtu" bytes; reused for every incoming datagram and for the ACK
that is sent in reply to a PING */
uint8_t *buffer;
/* Persistent read event on fd */
opal_event_t event;
} agent_udp_port_listener_t;
OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
typedef enum {
AGENT_MSG_TYPE_PING = 17,
AGENT_MSG_TYPE_ACK
} agent_udp_message_type_t;
/*
* Ping and ACK messages.  This structure is the header of every UDP
* datagram: send and receive buffers are cast directly to this type,
* so the field order and sizes are part of the wire format.  A PING
* may be followed by padding up to "size" bytes; an ACK is exactly
* sizeof(agent_udp_message_t) bytes.
*/
typedef struct {
/* One of agent_udp_message_type_t */
uint8_t message_type;
/* The sender's IP address and port (i.e., where the ACK can be
sent). This is actually redundant with the sockaddr that we
get from recvfrom(), but that's ok -- it provides a sanity
check. */
uint32_t src_ipv4_addr;
uint32_t src_udp_port;
/* If this is a PING, the message should be this size.
If this is an ACK, we are ACKing a ping of this size. */
uint32_t size;
} agent_udp_message_t;
/*
* Tracks one connectivity check against a single peer (destination IP
* address / UDP port).  An instance sits on the "pings_pending" list
* while PINGs are outstanding and is moved to the "ping_results" list
* once every PING size has been ACKed.
*/
typedef struct {
opal_list_item_t super;
/* Data from the PING command message */
uint32_t src_ipv4_addr; /* in network byte order */
uint32_t src_udp_port;
/* Local listener whose socket is used to send the PINGs (not owned
by this object) */
agent_udp_port_listener_t *listener;
uint32_t dest_ipv4_addr; /* in network byte order */
uint32_t dest_cidrmask;
/* Stored in host byte order; converted with htons() when
dest_sockaddr is filled in */
uint32_t dest_udp_port;
/* Destination address handed to sendto() */
struct sockaddr_in dest_sockaddr;
/* Heap-allocated string unpacked from the PING command; used in
diagnostic messages */
char *dest_nodename;
uint8_t dest_mac[6];
/* The sizes and corresponding buffers of the PING messages that
we'll send, and whether each of those PING messages have been
ACKed yet.  Index 0 is the small (header-only) message; index 1
is the large (MTU-derived) message. */
#define NUM_PING_SIZES 2
size_t sizes[NUM_PING_SIZES];
uint8_t *buffers[NUM_PING_SIZES];
bool acked[NUM_PING_SIZES];
/* Number of times we've sent this ping */
int num_sends;
/* Timer used to re-send the PING, and whether the timer is active
or not */
opal_event_t timer;
bool timer_active;
} agent_ping_t;
OBJ_CLASS_DECLARATION(agent_ping_t);
/**************************************************************************
* Utility functions, constructors, destructors
**************************************************************************/
/*
 * Put a listener into its "empty" state: numeric fields zeroed, string
 * and buffer pointers NULL, and the fd / UDP port set to -1 to mark
 * "no socket".  Does not release anything; callers free resources
 * first.
 */
static void port_listener_zero(agent_udp_port_listener_t *obj)
{
    /* Addressing information from the LISTEN command */
    obj->ipv4_addr = 0;
    obj->cidrmask = 0;
    obj->mtu = 0;
    memset(obj->ipv4_addr_str, 0, sizeof(obj->ipv4_addr_str));
    memset(obj->mac_str, 0, sizeof(obj->mac_str));

    /* Descriptive strings */
    obj->nodename = NULL;
    obj->if_name = NULL;
    obj->usnic_name = NULL;

    /* Socket state */
    obj->fd = -1;
    obj->udp_port = -1;
    obj->buffer = NULL;
}
/*
* Class constructor: start every new listener in the "empty" state so
* that the destructor can safely run on a partially set-up object.
*/
static void port_listener_constructor(agent_udp_port_listener_t *obj)
{
port_listener_zero(obj);
}
/*
 * Class destructor: close the socket (if one is open), free the
 * strings and the receive buffer, then reset the object to the
 * "empty" state.
 */
static void port_listener_destructor(agent_udp_port_listener_t *obj)
{
    if (obj->fd != -1) {
        close(obj->fd);
    }

    /* free(NULL) is a no-op, so unset pointers need no guard */
    free(obj->nodename);
    free(obj->if_name);
    free(obj->usnic_name);
    free(obj->buffer);

    port_listener_zero(obj);
}
/* Register the listener class: a list item with the constructor and
destructor defined above */
OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
opal_list_item_t,
port_listener_constructor,
port_listener_destructor);
/*
 * Put a ping tracker into its "empty" state.  Every field is reset,
 * including the destination netmask, MAC, sockaddr and nodename, so
 * that the destructor never sees an uninitialized pointer.  Does not
 * release anything.
 */
static void agent_ping_result_zero(agent_ping_t *obj)
{
    obj->src_ipv4_addr = 0;
    obj->src_udp_port = 0;
    obj->listener = NULL;

    obj->dest_ipv4_addr = 0;
    obj->dest_cidrmask = 0;
    obj->dest_udp_port = 0;
    memset(&obj->dest_sockaddr, 0, sizeof(obj->dest_sockaddr));
    obj->dest_nodename = NULL;
    memset(obj->dest_mac, 0, sizeof(obj->dest_mac));

    obj->num_sends = 0;
    obj->timer_active = false;

    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        obj->sizes[i] = 0;
        obj->buffers[i] = NULL;
        obj->acked[i] = false;
    }
}

/*
 * Class constructor: start every new ping tracker in the "empty" state.
 */
static void agent_ping_result_constructor(agent_ping_t *obj)
{
    agent_ping_result_zero(obj);
}

/*
 * Class destructor: release the PING buffers and the destination
 * nodename string, cancel the resend timer if it is still armed, and
 * reset the object.
 */
static void agent_ping_result_destructor(agent_ping_t *obj)
{
    for (int i = 0; i < NUM_PING_SIZES; ++i) {
        /* free(NULL) is a no-op */
        free(obj->buffers[i]);
    }

    /* The nodename string is handed to this object by the PING command
       handler and has no other owner, so it must be released here */
    free(obj->dest_nodename);

    if (obj->timer_active) {
        opal_event_del(&obj->timer);
    }

    agent_ping_result_zero(obj);
}

OBJ_CLASS_INSTANCE(agent_ping_t,
                   opal_list_item_t,
                   agent_ping_result_constructor,
                   agent_ping_result_destructor);
/*
 * Send one datagram, retrying until the whole message has been handed
 * to the socket.  EAGAIN and EINTR are retried immediately; any other
 * sendto() error aborts.
 */
static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
                         struct sockaddr *addr)
{
    for (;;) {
        ssize_t nsent = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));

        /* This is UDP, so a send is all-or-nothing: there is no need
           to handle 0 < nsent < numbytes specially */
        if (nsent == numbytes) {
            return;
        }

        if (nsent < 0) {
            if (EAGAIN == errno || EINTR == errno) {
                continue;
            }
            ABORT("Unexpected sendto() error");
            /* Will not return */
        }

        /* Should be unreachable; pause briefly so that we do not spin
           the CPU if it ever happens */
        usleep(1);
    }
    /* Will not get here */
}
/**************************************************************************
* All of the following functions run in RTE thread
**************************************************************************/
/* Event base driven by the agent thread's event loop.  This variable
belongs to the agent thread, but is safe to access from the RTE
thread (because of its own internal locking). */
static opal_event_base_t *evbase;
/* Note that to avoid locking of the agent thread data structures, the
RTE thread routines avoid reading/writing those data structures.
Instead, when the RTE routines receive commands from the client
thread, they basically save the message and queue up an event to
run in the agent thread. */
/* Need to forward declare these functions; they are referenced as
function pointers in the RTE thread functions */
static void agent_thread_cmd_listen(int fd, short flags, void *context);
static void agent_thread_cmd_ping(int fd, short flags, void *context);
/*
 * Hand a received command over to the agent thread: copy the message
 * payload and the sender's name into a freshly allocated proxy, then
 * activate an event on the agent's event base that will invoke
 * "callback" with that proxy as its context.  The callback owns the
 * proxy and its buffer.
 */
static void agent_queue_thread_cmd(opal_buffer_t *buffer,
                                   ompi_process_name_t *sender,
                                   opal_event_cbfunc_t callback)
{
    agent_buf_name_t *proxy = malloc(sizeof(agent_buf_name_t));
    if (NULL == proxy) {
        OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
        ABORT("Out of memory");
        /* Will not return */
    }

    proxy->buffer = OBJ_NEW(opal_buffer_t);
    if (NULL == proxy->buffer) {
        OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
        ABORT("Out of memory");
        /* Will not return */
    }

    /* Copy the buffer so that we still have it after the RML receive
       callback returns (have pending question in to Ralph as to why
       this is necessary, vs. just OBJ_RETAIN'ing the buffer).
       copy_payload copies *from the current buffer position*, so the
       command word that was already unpacked is not part of the copy. */
    opal_dss.copy_payload(proxy->buffer, buffer);
    proxy->sender = *sender;

    /* Queue up an immediately-active event in the agent thread */
    opal_event_set(evbase, &proxy->event, -1, OPAL_EV_WRITE, callback, proxy);
    opal_event_active(&proxy->event, OPAL_EV_WRITE, 1);
}
/*
 * RML receive callback: unpack the command word from the incoming
 * message and queue the matching handler to run in the agent thread.
 * An unknown command aborts.
 */
static void agent_rml_receive(int status, ompi_process_name_t* sender,
                              opal_buffer_t *buffer,
                              orte_rml_tag_t tag, void *cbdata)
{
    int32_t command;

    UNPACK_INT32(buffer, command);
    assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
           CONNECTIVITY_AGENT_CMD_PING == command);

    if (CONNECTIVITY_AGENT_CMD_LISTEN == command) {
        agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_listen);
    } else if (CONNECTIVITY_AGENT_CMD_PING == command) {
        agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_ping);
    } else {
        ABORT("Unexpected connectivity agent command");
    }
}
/**************************************************************************
* All of the following functions run in agent thread
**************************************************************************/
/* How long to wait for ACKs before re-sending a PING; filled in by
ompi_btl_usnic_connectivity_agent_init() */
static struct timeval ack_timeout;
/* All UDP listeners (agent_udp_port_listener_t) */
static opal_list_t listeners;
/* JMS The pings_pending and ping_results should probably both be hash
tables for more efficient lookups */
/* Pings (agent_ping_t) that still have un-ACKed messages */
static opal_list_t pings_pending;
/* Pings (agent_ping_t) for which every message size has been ACKed */
static opal_list_t ping_results;
/* Set by the finalize routine to make the agent thread's main loop
exit */
static volatile bool agent_thread_time_to_exit = false;
/*
* A dummy function invoked in an event just for the purposes of
* waking up the agent main thread (in case it was blocked in the
* event loop with no other events to wake it up).  All arguments are
* ignored.
*/
static void agent_thread_noop(int fd, short flags, void *context)
{
/* Intentionally a no op */
}
/*
 * Handle an incoming PING message (send an ACK).
 *
 * listener: the listener whose buffer holds the received datagram
 * numbytes: number of bytes that recvfrom() returned
 * from:     address of the sender; the ACK is sent back to it
 *
 * Malformed PINGs are silently dropped (no ACK is sent), which makes
 * the sender's connectivity check time out.
 */
static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
                                     ssize_t numbytes, struct sockaddr *from)
{
    agent_udp_message_t *msg = (agent_udp_message_t*) listener->buffer;

    /* A datagram shorter than the header cannot be trusted: the header
       fields past "numbytes" would be stale bytes left in the receive
       buffer by an earlier message.  Also, if the size we received
       isn't equal to what the sender says it sent, do the simple
       thing: just don't send an ACK. */
    if (numbytes < (ssize_t) sizeof(*msg) ||
        (ssize_t) msg->size != numbytes) {
        opal_output_verbose(20, USNIC_OUT,
                            "usNIC connectivity got bad message; discarded");
        return;
    }

    char src_ipv4_addr_str[IPV4ADDRLEN];
    ompi_btl_usnic_snprintf_ipv4_addr(src_ipv4_addr_str,
                                      sizeof(src_ipv4_addr_str),
                                      msg->src_ipv4_addr, 0);
    /* ssize_t is not guaranteed to be "long"; cast to match %ld */
    opal_output_verbose(20, USNIC_OUT,
                        "usNIC connectivity got PING (size=%ld) from %s; sending ACK",
                        (long) numbytes, src_ipv4_addr_str);

    /* Send back an ACK.  No need to allocate a new buffer; just re-use
       the same buffer we just got.  Note that msg->size is already set
       (it tells the sender which PING size is being ACKed). */
    msg->message_type = AGENT_MSG_TYPE_ACK;
    msg->src_ipv4_addr = listener->ipv4_addr;
    msg->src_udp_port = listener->udp_port;
    agent_sendto(listener->fd, (char*) listener->buffer, sizeof(*msg), from);
}
/*
 * Handle an incoming ACK message: find the pending ping whose
 * destination matches the ACK's source address and port, and mark the
 * PING size named in the ACK as acknowledged.  ACKs of the wrong
 * length, or that match no pending ping, are discarded.
 */
static void agent_thread_handle_ack(agent_udp_port_listener_t *listener,
                                    ssize_t numbytes, struct sockaddr *from)
{
    agent_udp_message_t *ack = (agent_udp_message_t*) listener->buffer;

    /* An ACK is always exactly one header long */
    if (numbytes != sizeof(*ack)) {
        return;
    }

    agent_ping_t *pending;
    OPAL_LIST_FOREACH(pending, &pings_pending, agent_ping_t) {
        if (pending->dest_ipv4_addr != ack->src_ipv4_addr ||
            pending->dest_udp_port != ack->src_udp_port) {
            continue;
        }

        /* Right peer -- record which PING size was ACKed */
        for (int i = 0; i < NUM_PING_SIZES; ++i) {
            if (pending->sizes[i] == ack->size) {
                pending->acked[i] = true;
                return;
            }
        }
    }

    /* No matching ping for this ACK: just discard it */
}
/*
 * Event callback: receive one message from a listening UDP socket and
 * dispatch it to the PING or ACK handler.
 *
 * context is the agent_udp_port_listener_t whose socket became
 * readable; fd and flags are not used.
 */
static void agent_thread_receive_ping(int fd, short flags, void *context)
{
    agent_udp_port_listener_t *listener =
        (agent_udp_port_listener_t *) context;
    assert(NULL != listener);

    /* Receive the message */
    ssize_t numbytes;
    struct sockaddr src_addr;
    socklen_t addrlen;
    while (1) {
        /* recvfrom() overwrites addrlen, so reset it on every attempt */
        addrlen = sizeof(src_addr);
        numbytes = recvfrom(listener->fd, listener->buffer, listener->mtu, 0,
                            &src_addr, &addrlen);
        if (numbytes > 0) {
            break;
        } else if (0 == numbytes) {
            /* A zero-length datagram has been consumed and carries
               nothing to parse.  Return to the event loop rather than
               calling recvfrom() again, which could stall the agent
               thread until the next datagram arrives. */
            return;
        } else {
            if (errno == EAGAIN || errno == EINTR) {
                continue;
            }
            ABORT("Unexpected error from recvfrom");
            /* Will not return */
        }
    }

    /* The message type is the first byte of the header, so it is valid
       for any non-empty datagram */
    agent_udp_message_t *msg;
    msg = (agent_udp_message_t *) listener->buffer;
    switch (msg->message_type) {
    case AGENT_MSG_TYPE_PING:
        agent_thread_handle_ping(listener, numbytes, &src_addr);
        break;
    case AGENT_MSG_TYPE_ACK:
        agent_thread_handle_ack(listener, numbytes, &src_addr);
        break;
    default:
        ABORT("Unexpected connectivity ping message type");
        break;
    }
}
static bool agent_thread_already_listening(uint32_t ipv4_addr,
uint32_t *udp_port)
{
agent_udp_port_listener_t *listener;
OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) {
if (listener->ipv4_addr == ipv4_addr) {
*udp_port = listener->udp_port;
return true;
}
}
return false;
}
/*
 * Send the RML reply to a LISTEN command: the command word followed by
 * the IP address and UDP port that we're listening on.
 *
 * Returns OMPI_SUCCESS once the non-blocking send has been posted,
 * OMPI_ERR_OUT_OF_RESOURCE if the reply buffer cannot be allocated, or
 * the error returned by the send.
 */
static int agent_thread_cmd_listen_reply(ompi_process_name_t *dest,
                                         uint64_t addr, int32_t udp_port)
{
    opal_buffer_t *reply = OBJ_NEW(opal_buffer_t);
    if (NULL == reply) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    PACK_INT32(reply, CONNECTIVITY_AGENT_CMD_LISTEN);
    PACK_UINT32(reply, addr);
    PACK_UINT32(reply, udp_port);

    int rc = ompi_rte_send_buffer_nb(dest, reply,
                                     OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY,
                                     ompi_rte_send_cbfunc, NULL);
    if (OMPI_SUCCESS == rc) {
        return OMPI_SUCCESS;
    }

    /* The send was not posted, so the buffer is still ours to release */
    OMPI_ERROR_LOG(rc);
    OBJ_RELEASE(reply);
    return rc;
}
/*
* The RTE thread will queue up an event to call this function when it
* receives a LISTEN command RML message.
*
* context is the agent_buf_name_t created by agent_queue_thread_cmd();
* this function releases its buffer and frees it.  fd and flags are
* not used.
*
* If a listener already exists for the requested IPv4 address, its UDP
* port is simply sent back to the requester.  Otherwise a new listener
* is created: a UDP socket is bound to the address on an OS-chosen
* port, configured, registered with the agent's event base, added to
* the "listeners" list, and its port is sent back to the requester.
* Any failure along the way aborts.
*/
static void agent_thread_cmd_listen(int fd, short flags, void *context)
{
agent_buf_name_t *abn = (agent_buf_name_t*) context;
opal_buffer_t *buffer = abn->buffer;
ompi_process_name_t *sender = &abn->sender;
int ret;
uint32_t ipv4_addr, cidrmask;
UNPACK_UINT32(buffer, ipv4_addr);
UNPACK_UINT32(buffer, cidrmask);
/* If we're already listening on this address, send the UDP port
back to the client. */
uint32_t udp_port;
if (agent_thread_already_listening(ipv4_addr, &udp_port)) {
agent_thread_cmd_listen_reply(sender, ipv4_addr, udp_port);
OBJ_RELEASE(buffer);
free(abn);
return;
}
/* We're not listening on this address already, so create a
listener entry */
agent_udp_port_listener_t *listener = NULL;
listener = OBJ_NEW(agent_udp_port_listener_t);
if (NULL == listener) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Unpack the rest of the message.  The unpacked strings are owned by
the listener and are freed by its destructor. */
UNPACK_UINT32(buffer, listener->mtu);
UNPACK_STRING(buffer, listener->nodename);
UNPACK_STRING(buffer, listener->if_name);
UNPACK_STRING(buffer, listener->usnic_name);
uint8_t mac[6];
UNPACK_BYTES(buffer, mac, sizeof(mac));
listener->ipv4_addr = ipv4_addr;
listener->cidrmask = cidrmask;
/* We're now done with the RTE buffer */
OBJ_RELEASE(buffer);
buffer = NULL;
/* Fill in the ipv4_addr_str and mac_str. Since we don't have the
IPv4 address in sockaddr_in form, it's not worth using
inet_ntop() */
ompi_btl_usnic_snprintf_ipv4_addr(listener->ipv4_addr_str,
sizeof(listener->ipv4_addr_str),
ipv4_addr, cidrmask);
ompi_btl_usnic_sprintf_mac(listener->mac_str, mac);
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent listening on %s, (%s/%s)",
listener->ipv4_addr_str,
listener->usnic_name, listener->if_name);
/* Receive buffer: one MTU's worth of bytes, reused for every
incoming datagram */
listener->buffer = malloc(listener->mtu);
if (NULL == listener->buffer) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Create the listening socket */
listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
if (listener->fd < 0) {
OMPI_ERROR_LOG(listener->fd);
ABORT("Could not open listening socket");
/* Will not return */
}
/* Bind it to the designated interface.  Port 0 lets the OS pick a
free port. */
struct sockaddr_in inaddr;
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = ipv4_addr;
inaddr.sin_port = htons(0);
ret = bind(listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
if (ret < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not bind listening socket");
/* Will not return */
}
/* Find out the port we got */
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
ret = getsockname(listener->fd, (struct sockaddr*) &inaddr, &addrlen);
if (ret < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not get UDP port number from listening socket");
/* Will not return */
}
/* Keep the port in host byte order */
listener->udp_port = ntohs(inaddr.sin_port);
/* Set the "don't fragment" bit on outgoing frames because we
want MTU-sized messages to get through successfully to the
peer, or fail if they have to fragment because of an MTU
mismatch somewhere enroute */
int val = IP_PMTUDISC_DO;
ret = setsockopt(listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
&val, sizeof(val));
if (0 != ret) {
OMPI_ERROR_LOG(ret);
ABORT("Unable to set \"do not fragment\" on UDP socket");
/* Will not return */
}
/* Set the send and receive buffer sizes to our MTU size */
int temp;
temp = (int) listener->mtu;
if ((ret = setsockopt(listener->fd, SOL_SOCKET, SO_RCVBUF,
&temp, sizeof(temp))) < 0 ||
(ret = setsockopt(listener->fd, SOL_SOCKET, SO_SNDBUF,
&temp, sizeof(temp))) < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not set socket buffer sizes");
/* Will not return */
}
/* Create a listening event: agent_thread_receive_ping() runs every
time the socket becomes readable */
opal_event_set(evbase, &listener->event, listener->fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_receive_ping, listener);
opal_event_add(&listener->event, 0);
/* Save this listener on the list of listeners */
opal_list_append(&listeners, &listener->super);
/* Return the port number to the sender */
/* NOTE(review): the result of the reply is stored in ret but never
examined -- confirm that a failed reply can be ignored here */
ret = agent_thread_cmd_listen_reply(sender, ipv4_addr, listener->udp_port);
/* All done! */
free(abn);
return;
}
/*
* Send a ping
*/
static void agent_thread_send_ping(int fd, short flags, void *context)
{
agent_ping_t *ap = (agent_ping_t*) context;
ap->timer_active = false;
char dest_ipv4_addr_str[IPV4ADDRLEN];
ompi_btl_usnic_snprintf_ipv4_addr(dest_ipv4_addr_str,
sizeof(dest_ipv4_addr_str),
ap->dest_ipv4_addr, ap->dest_cidrmask);
/* If we got all the ACKs for this ping, then move this ping from
the "pending" list to the "results" list. We can also free the
buffers associated with this ping result, just to save some
space in the long run. */
if (ap->acked[0] && ap->acked[1]) {
opal_list_remove_item(&pings_pending, &ap->super);
opal_list_append(&ping_results, &ap->super);
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity GOOD between %s <--> %s",
ap->listener->ipv4_addr_str,
dest_ipv4_addr_str);
for (int i = 0; i < 2; ++i) {
if (NULL != ap->buffers[i]) {
free(ap->buffers[i]);
ap->buffers[i] = NULL;
}
}
return;
}
/* If we've resent too many times, then just abort */
if (ap->num_sends > mca_btl_usnic_component.connectivity_num_retries) {
char *topic;
if (ap->acked[0] && !ap->acked[1]) {
topic = "connectivity error: small ok, large bad";
} else if (!ap->acked[0] && ap->acked[1]) {
topic = "connectivity error: small bad, large ok";
} else {
topic = "connectivity error: small bad, large bad";
}
char mac_str[MACLEN], ipv4_addr_str[IPV4ADDRLEN];
ompi_btl_usnic_snprintf_ipv4_addr(ipv4_addr_str, sizeof(ipv4_addr_str),
ap->dest_ipv4_addr,
ap->dest_cidrmask);
ompi_btl_usnic_sprintf_mac(mac_str, ap->dest_mac);
opal_show_help("help-mpi-btl-usnic.txt", topic, true,
ompi_process_info.nodename,
ap->listener->ipv4_addr_str,
ap->listener->usnic_name,
ap->listener->if_name,
ap->listener->mac_str,
ap->dest_nodename,
ipv4_addr_str,
mac_str,
ap->sizes[0],
ap->sizes[1]);
ompi_btl_usnic_exit();
/* Will not return */
}
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity pinging %s (%s) from %s (%s/%s)",
dest_ipv4_addr_str, ap->dest_nodename,
ap->listener->ipv4_addr_str,
ap->listener->if_name, ap->listener->usnic_name);
/* Send the ping messages to the peer */
for (int i = 0; i < NUM_PING_SIZES; ++i) {
agent_sendto(ap->listener->fd, (char*) ap->buffers[i], ap->sizes[i],
(struct sockaddr*) &ap->dest_sockaddr);
}
/* Set a timer to check if these pings are ACKed */
opal_event_set(evbase, &ap->timer,
-1, 0, agent_thread_send_ping, ap);
opal_event_add(&ap->timer, &ack_timeout);
ap->timer_active = true;
/* Count how many times we've done this */
++ap->num_sends;
}
/*
* The RTE thread will queue up an event to call this function when it
* receives a PING command RML message.
*/
static void agent_thread_cmd_ping(int fd, short flags, void *context)
{
agent_buf_name_t *abn = (agent_buf_name_t*) context;
opal_buffer_t *buffer = abn->buffer;
uint32_t src_ipv4_addr, src_udp_port;
uint32_t dest_ipv4_addr, dest_cidrmask, dest_udp_port, mtu;
uint8_t dest_mac[6];
char *dest_nodename;
UNPACK_UINT32(buffer, src_ipv4_addr);
UNPACK_UINT32(buffer, src_udp_port);
UNPACK_UINT32(buffer, dest_ipv4_addr);
UNPACK_UINT32(buffer, dest_cidrmask);
UNPACK_UINT32(buffer, dest_udp_port);
UNPACK_BYTES(buffer, dest_mac, 6);
UNPACK_UINT32(buffer, mtu);
UNPACK_STRING(buffer, dest_nodename);
/* We're now done with the original RML message buffer */
OBJ_RELEASE(buffer);
free(abn);
/* Have we already pinged this IP address / port? */
agent_ping_t *ap;
OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
if (ap->dest_ipv4_addr == dest_ipv4_addr &&
ap->dest_udp_port == dest_udp_port) {
/* We already have results from pinging this IP address /
port, so there's no need for further action */
return;
}
}
/* Are we in the middle of pinging this IP address / port? */
OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
if (ap->dest_ipv4_addr == dest_ipv4_addr &&
ap->dest_udp_port == dest_udp_port) {
/* We're already in the middle of pinging this IP address
/ port, so there's no need for further action */
return;
}
}
/* This is a new ping request. Find the listener with this source
ipv4 address */
bool found = false;
agent_udp_port_listener_t *listener;
OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) {
if (listener->ipv4_addr == src_ipv4_addr) {
found = true;
break;
}
}
if (!found) {
ABORT("Could not ping listener for ping request");
/* Will not return */
}
/* This is a new ping request; track it */
ap = OBJ_NEW(agent_ping_t);
if (NULL == ap) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
ap->src_ipv4_addr = src_ipv4_addr;
ap->src_udp_port = src_udp_port;
ap->listener = listener;
ap->dest_ipv4_addr = dest_ipv4_addr;
ap->dest_cidrmask = dest_cidrmask;
ap->dest_udp_port = dest_udp_port;
ap->dest_sockaddr.sin_family = AF_INET;
ap->dest_sockaddr.sin_addr.s_addr = dest_ipv4_addr;
ap->dest_sockaddr.sin_port = htons(dest_udp_port);
memcpy(ap->dest_mac, dest_mac, 6);
ap->dest_nodename = dest_nodename;
/* The first message we send will be "short" (a simple control
message); the second will be "long" (i.e., caller-specified
length) */
ap->sizes[0] = sizeof(agent_udp_message_t);
/* Note that the MTU is the max Ethernet frame payload. So from
that MTU, we have to subtract off the max IP header (e.g., if
all IP options are enabled, which is 60 bytes), and then also
subtract off the UDP header (which is 8 bytes). So we need to
subtract off 68 bytes from the MTU, and that's the largest ping
payload we can send. */
ap->sizes[1] = mtu - 68;
/* Allocate a buffer for each size. Make sure the smallest size
is at least sizeof(agent_udp_message_t). */
agent_udp_message_t *msg;
for (size_t i = 0; i < NUM_PING_SIZES; ++i) {
ap->buffers[i] = calloc(1, ap->sizes[i]);
if (NULL == ap->buffers[i]) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Fill in the message with return addressing information */
msg = (agent_udp_message_t*) ap->buffers[i];
msg->message_type = AGENT_MSG_TYPE_PING;
msg->src_ipv4_addr = ap->src_ipv4_addr;
msg->src_udp_port = ap->src_udp_port;
msg->size = ap->sizes[i];
}
/* Save this ping request on the "pending" list */
opal_list_append(&pings_pending, &ap->super);
/* Send the ping */
agent_thread_send_ping(0, 0, ap);
}
/*
* Agent progress thread main entry point: run the agent's event base
* one pass at a time until agent_thread_time_to_exit is set.  The obj
* argument is not used.  Always returns NULL.
*/
static void *agent_thread_main(opal_object_t *obj)
{
while (!agent_thread_time_to_exit) {
opal_event_loop(evbase, OPAL_EVLOOP_ONCE);
}
return NULL;
}
/**************************************************************************
* All of the following functions run in the main application thread
**************************************************************************/
/* True between a successful agent init and the next agent finalize */
static bool agent_initialized = false;
/* The thread that runs agent_thread_main() */
static opal_thread_t agent_thread;
/*
* Setup the agent and start its event loop running in a dedicated
* thread
*/
int ompi_btl_usnic_connectivity_agent_init(void)
{
/* Only do this initialization if I am the agent (the agent is
local rank 0) */
if (ompi_process_info.my_local_rank != 0) {
return OMPI_SUCCESS;
}
if (agent_initialized) {
return OMPI_SUCCESS;
}
/* Create the event base */
evbase = opal_event_base_create();
/* Make a struct timeval for use with timer events */
ack_timeout.tv_sec =
mca_btl_usnic_component.connectivity_ack_timeout / 1000;
ack_timeout.tv_usec = mca_btl_usnic_component.connectivity_ack_timeout;
ack_timeout.tv_usec -= ack_timeout.tv_sec * 1000;
/* Create lists */
OBJ_CONSTRUCT(&listeners, opal_list_t);
OBJ_CONSTRUCT(&pings_pending, opal_list_t);
OBJ_CONSTRUCT(&ping_results, opal_list_t);
/********************************************************************
* Once all of the above is setup, launch the RML receives and
* start the event loop.
********************************************************************/
/* Setup the RML receive */
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_USNIC_CONNECTIVITY,
OMPI_RML_PERSISTENT,
agent_rml_receive, NULL);
/* Spawn the agent thread event loop */
OBJ_CONSTRUCT(&agent_thread, opal_thread_t);
agent_thread.t_run = agent_thread_main;
agent_thread.t_arg = NULL;
int ret;
ret = opal_thread_start(&agent_thread);
if (OPAL_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
ABORT("Failed to start usNIC agent thread");
/* Will not return */
}
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent initialized");
agent_initialized = true;
return OMPI_SUCCESS;
}
/*
 * Shut down the agent: cancel the RML receive, stop and join the agent
 * thread, then tear down every listener and every pending / completed
 * ping.  Safe to call in a process that never hosted the agent.
 * Always returns OMPI_SUCCESS.
 */
int ompi_btl_usnic_connectivity_agent_finalize(void)
{
    agent_initialized = false;

    /* Only do this if I have the agent running */
    if (NULL == evbase) {
        return OMPI_SUCCESS;
    }

    /* Cancel the RML receive */
    ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_USNIC_CONNECTIVITY);

    /* Shut down the event loop.  Send it a no-op event so that it
       wakes up and exits the loop. */
    opal_event_t ev;
    agent_thread_time_to_exit = true;
    opal_event_set(evbase, &ev, -1, OPAL_EV_WRITE, agent_thread_noop, NULL);
    opal_event_active(&ev, OPAL_EV_WRITE, 1);
    opal_thread_join(&agent_thread, NULL);

    /* Shut down all active listeners */
    agent_udp_port_listener_t *listener, *lnext;
    OPAL_LIST_FOREACH_SAFE(listener, lnext, &listeners,
                           agent_udp_port_listener_t) {
        opal_event_del(&listener->event);
        close(listener->fd);
        /* Mark the descriptor as closed: the listener's destructor
           closes any fd that is not -1, and closing the same number
           twice could hit an unrelated descriptor that was opened in
           between */
        listener->fd = -1;
        opal_list_remove_item(&listeners, &listener->super);
        OBJ_RELEASE(listener);
    }

    /* Destroy the pending pings and ping results */
    agent_ping_t *request, *pnext;
    OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
        opal_list_remove_item(&pings_pending, &request->super);
        OBJ_RELEASE(request);
    }
    OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
        opal_list_remove_item(&ping_results, &request->super);
        OBJ_RELEASE(request);
    }

    opal_output_verbose(20, USNIC_OUT,
                        "usNIC connectivity agent finalized");
    return OMPI_SUCCESS;
}

213
ompi/mca/btl/usnic/btl_usnic_cclient.c Обычный файл
Просмотреть файл

@ -0,0 +1,213 @@
/*
* Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include "opal_stdint.h"
#include "opal/threads/mutex.h"
#include "opal/mca/event/event.h"
#include "opal/mca/db/db.h"
#include "opal/util/output.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/rte/rte.h"
#include "ompi/constants.h"
#include "btl_usnic.h"
#include "btl_usnic_module.h"
#include "btl_usnic_connectivity.h"
/**************************************************************************
* Client-side data and methods
**************************************************************************/
/* True between a successful client init and the next client finalize */
static bool initialized = false;
/* Name of the process that hosts the connectivity agent; filled in by
ompi_btl_usnic_connectivity_client_init() */
static ompi_process_name_t agent_name;
/*
* Landing area for the agent's reply to a LISTEN command.  It is filled
* in by client_rml_receive() and polled by the thread that is waiting
* for the reply.
*/
typedef struct {
/* IP address and UDP port that the agent is listening on */
uint32_t addr;
uint32_t udp_port;
/* Set to true once addr and udp_port have been written */
bool receive_done;
} client_rml_receive_data_t;
/*
 * RML callback for replies from the agent.  Only LISTEN replies are
 * expected: the address and UDP port are unpacked into the
 * client_rml_receive_data_t passed as cbdata, and its receive_done
 * flag is then raised for the waiting thread.
 */
static void client_rml_receive(int status, ompi_process_name_t* sender,
                               opal_buffer_t *buffer,
                               orte_rml_tag_t tag, void *cbdata)
{
    volatile client_rml_receive_data_t *reply =
        (client_rml_receive_data_t*) cbdata;
    int32_t command;

    /* What command is this a reply for? */
    UNPACK_INT32(buffer, command);
    assert(command == CONNECTIVITY_AGENT_CMD_LISTEN);

    UNPACK_UINT32(buffer, reply->addr);
    UNPACK_UINT32(buffer, reply->udp_port);

    /* Raise the flag last, after both values have been stored */
    reply->receive_done = true;
}
/*
 * Initialize the connectivity client: look up the name of the process
 * that hosts the agent (the local leader) and remember it in
 * agent_name.
 *
 * Returns OMPI_SUCCESS (also when connectivity checking is disabled),
 * or the error from the database lookup.
 */
int ompi_btl_usnic_connectivity_client_init(void)
{
    /* Nothing to do when connectivity checking is turned off */
    if (!mca_btl_usnic_component.connectivity_enabled) {
        return OMPI_SUCCESS;
    }
    assert(!initialized);

    /* Get the name of the agent; the fetch writes through the pointer
       into agent_name */
    ompi_process_name_t *leader = &agent_name;
    int rc = ompi_rte_db_fetch(ompi_proc_local_proc, OPAL_DB_LOCALLDR,
                               (void**) &leader, OPAL_ID_T);
    if (OMPI_SUCCESS != rc) {
        OMPI_ERROR_LOG(rc);
        BTL_ERROR(("usNIC connectivity client unable to db_fetch local leader"));
        return rc;
    }

    initialized = true;
    opal_output_verbose(20, USNIC_OUT,
                        "usNIC connectivity client initialized");
    return OMPI_SUCCESS;
}
/*
 * Send a LISTEN command to the agent for this module's local address,
 * wait for the reply, and store the agent's UDP port in
 * module->local_addr.connectivity_udp_port.
 *
 * Returns OMPI_SUCCESS (also when connectivity checking is disabled),
 * OMPI_ERR_OUT_OF_RESOURCE if the command buffer cannot be allocated,
 * or the error from the send.
 */
int ompi_btl_usnic_connectivity_listen(ompi_btl_usnic_module_t *module)
{
    /* Nothing to do when connectivity checking is turned off */
    if (!mca_btl_usnic_component.connectivity_enabled) {
        return OMPI_SUCCESS;
    }

    opal_buffer_t *cmd = OBJ_NEW(opal_buffer_t);
    if (NULL == cmd) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* Build the LISTEN command.  Include enough information for the
       agent to be able to print a show_help() message, if
       necessary. */
    PACK_INT32(cmd, CONNECTIVITY_AGENT_CMD_LISTEN);
    PACK_UINT32(cmd, module->local_addr.ipv4_addr);
    PACK_UINT32(cmd, module->local_addr.cidrmask);
    PACK_UINT32(cmd, module->local_addr.mtu);
    PACK_STRING(cmd, ompi_process_info.nodename);
    PACK_STRING(cmd, module->if_name);
    PACK_STRING(cmd, ibv_get_device_name(module->device));
    PACK_BYTES(cmd, module->local_addr.mac, 6);

    /* Post the receive for the agent's reply (the UDP port) before the
       command goes out */
    volatile client_rml_receive_data_t reply;
    reply.receive_done = false;
    ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
                            OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY,
                            0,
                            client_rml_receive, (void*) &reply);

    /* Send the command to the agent */
    int rc = ompi_rte_send_buffer_nb(&agent_name, cmd,
                                     OMPI_RML_TAG_USNIC_CONNECTIVITY,
                                     ompi_rte_send_cbfunc, NULL);
    if (OMPI_SUCCESS != rc) {
        /* NOTE(review): the receive posted above still points at the
           stack variable "reply" when we return here -- confirm
           whether it needs to be cancelled */
        OMPI_ERROR_LOG(rc);
        OBJ_RELEASE(cmd);
        return rc;
    }

    /* Wait for the reply, sleeping to let the RTE progress thread run */
    while (!reply.receive_done) {
        usleep(1);
    }

    /* Record the UDP port number that was received */
    module->local_addr.connectivity_udp_port = reply.udp_port;
    return OMPI_SUCCESS;
}
int ompi_btl_usnic_connectivity_ping(uint32_t src_ipv4_addr, int src_port,
uint32_t dest_ipv4_addr,
uint32_t dest_cidrmask, int dest_port,
uint8_t dest_mac[6], char *dest_nodename,
size_t mtu)
{
/* If connectivity checking is not enabled, do nothing */
if (!mca_btl_usnic_component.connectivity_enabled) {
return OMPI_SUCCESS;
}
opal_buffer_t *msg;
msg = OBJ_NEW(opal_buffer_t);
if (NULL == msg) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_PING);
PACK_UINT32(msg, src_ipv4_addr);
PACK_UINT32(msg, src_port);
PACK_UINT32(msg, dest_ipv4_addr);
PACK_UINT32(msg, dest_cidrmask);
PACK_UINT32(msg, dest_port);
PACK_BYTES(msg, dest_mac, 6);
PACK_UINT32(msg, mtu);
PACK_STRING(msg, dest_nodename);
/* Send it to the agent */
int ret;
ret = ompi_rte_send_buffer_nb(OMPI_PROC_MY_NAME, msg,
OMPI_RML_TAG_USNIC_CONNECTIVITY,
ompi_rte_send_cbfunc, NULL);
if (OMPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
OBJ_RELEASE(msg);
return ret;
}
return OMPI_SUCCESS;
}
/*
 * Shut down the connectivity client.
 *
 * Always returns OMPI_SUCCESS; calling it when the client was never
 * initialized is harmless.
 */
int ompi_btl_usnic_connectivity_client_finalize(void)
{
    /* Only an initialized client has any state to reset */
    if (initialized) {
        initialized = false;
    }

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -64,6 +64,7 @@
#include "ompi/mca/common/verbs/common_verbs.h" #include "ompi/mca/common/verbs/common_verbs.h"
#include "btl_usnic.h" #include "btl_usnic.h"
#include "btl_usnic_connectivity.h"
#include "btl_usnic_frag.h" #include "btl_usnic_frag.h"
#include "btl_usnic_endpoint.h" #include "btl_usnic_endpoint.h"
#include "btl_usnic_module.h" #include "btl_usnic_module.h"
@ -207,6 +208,12 @@ static int usnic_component_close(void)
usnic_clock_timer_event_set = false; usnic_clock_timer_event_set = false;
} }
/* Finalize the connectivity client and agent */
if (mca_btl_usnic_component.connectivity_enabled) {
ompi_btl_usnic_connectivity_client_finalize();
ompi_btl_usnic_connectivity_agent_finalize();
}
free(mca_btl_usnic_component.usnic_all_modules); free(mca_btl_usnic_component.usnic_all_modules);
free(mca_btl_usnic_component.usnic_active_modules); free(mca_btl_usnic_component.usnic_active_modules);
@ -517,6 +524,14 @@ static mca_btl_base_module_t** usnic_component_init(int* num_btl_modules,
free(mca_btl_usnic_component.vendor_part_ids_string); free(mca_btl_usnic_component.vendor_part_ids_string);
mca_btl_usnic_component.vendor_part_ids_string = NULL; mca_btl_usnic_component.vendor_part_ids_string = NULL;
/* Setup the connectivity checking agent and client. */
if (mca_btl_usnic_component.connectivity_enabled) {
if (OMPI_SUCCESS != ompi_btl_usnic_connectivity_agent_init() ||
OMPI_SUCCESS != ompi_btl_usnic_connectivity_client_init()) {
return NULL;
}
}
/************************************************************************ /************************************************************************
* Below this line, we assume that usnic is loaded on all procs, * Below this line, we assume that usnic is loaded on all procs,
* and therefore we will guarantee to the the modex send, even if * and therefore we will guarantee to the the modex send, even if

296
ompi/mca/btl/usnic/btl_usnic_connectivity.h Обычный файл
Просмотреть файл

@ -0,0 +1,296 @@
/*
* Copyright (c) 2014 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_BTL_USNIC_CONNECTIVITY_H
#define OMPI_BTL_USNIC_CONNECTIVITY_H
#include "ompi_config.h"
#include <netinet/in.h>
#include "opal/util/show_help.h"
#include "ompi/runtime/mpiruntime.h"
#include "btl_usnic_util.h"
/**
* Agent-based service to verify UDP connectivity between two peers.
*
* Generally, it is a client-server pattern with three entities
* involved:
*
* 1. Agent thread: running in MPI process local rank 0
* 2. Client: running in the main application thread in every MPI process
* 3. RTE thread: running in every MPI process
*
* If enabled (via MCA param), the usnic module_init() will setup the
* client (and server on local rank 0). For each usnic module, each
* client will RML send a request to the server asking it to listen on
* its usnic interface. The agent will discard duplicates and setup a
* single UDP socket listener on the eth interface corresponding to
* each requested usnic interface. The agent returns the listening
* UDP port number to the client, and each client puts this UDP port
* number in their modex information.
*
* At the first send to a given MPI process peer, the client will send
* another request to the server asking it to verify connectivity to
* the peer (supplying the peer's UDP listener port number from the
* peer's modex info). Again, the agent will discard duplicates -- it
* will only verify connectivity to each peer's *server* once. The
* agent will send a short UDP message and a long UDP message
* (basically, the MTU-68 bytes -- see comment in btl_usnic_cagent.c
* for the reasons why) to the listening peer UDP port.
*
* When the peer agent gets PING messages, it sends short ACK control
* messages back to the sending agent. When the sending agent gets
* all ACKs back from the peer, it rules that connectivity is GOOD and
* no further action is taken. If the sending agent doesn't get one
* or both ACKs back in a timely fashion, it re-sends the PING(s) that
* wasn't(weren't) ACKed. Eventually if the sending agent re-sends
* too many times and does not get an ACK back, it gives up, displays
* an error, and aborts the MPI job.
*
* Note that the client/server interaction is intentionally quite
* primitive:
*
* 1. Client requests agent to listen on interface X. Server responds
* with UDP port number of listener.
*
* 2. Client requests ping check to peer Y. Client does not wait for
* the answer; the agent either verifies the connectivity successfully
* or aborts the job.
*
* All client/agent communication is via the RML.
*
* As mentioned above, the agent is smart about discarding duplicate
* ping requests from clients. Since a single agent serves all MPI
* processes on a given server, this cuts down on a lot of PING
* traffic.
*/
/*
* Forward declaration
*/
struct ompi_btl_usnic_module_t;
/** @internal
 * This macro just makes the macros below a little easier to read.
 *
 * It forwards the message to ompi_btl_usnic_util_abort() together with
 * the source file and line of the expansion site, and passes the
 * constant 1 as the final argument.
 */
#define ABORT(msg) ompi_btl_usnic_util_abort((msg), __FILE__, __LINE__, 1)
/**
 * Helper macro to pack binary bytes into RML message buffers.
 *
 * The macro takes the address of "value" itself (i.e., &(value)), so
 * "value" must be an lvalue that designates the bytes to pack -- for
 * example an array object, or the first element of a buffer.  Passing
 * a pointer variable packs the bytes of the pointer, not the bytes it
 * points to.
 *
 * The expansion declares a local variable named "ret"; argument
 * expressions should not refer to a caller variable of that name.
 *
 * Note that this macro will call ompi_btl_usnic_util_abort()
 * if an error occurs.
 */
#define PACK_BYTES(buffer, value, num_bytes) \
do { \
int ret; \
ret = opal_dss.pack((buffer), &(value), (num_bytes), OPAL_BYTE); \
if (OPAL_SUCCESS != ret) { \
OMPI_ERROR_LOG(ret); \
ABORT("Could not pack"); \
} \
} while(0)
/**
 * @internal
 *
 * Back-end macro for PACK_STRING, PACK_INT32, and PACK_UINT32.
 *
 * Converts "value" to "type" in a temporary and packs that single
 * element as "opal_type".  Because a temporary is used, "value" may be
 * any expression (it does not need to be an lvalue).
 *
 * The expansion declares locals named "ret" and "temp", and "ret" is
 * declared before "value" is evaluated; argument expressions should
 * not refer to caller variables with those names.
 *
 * Note that this macro will call ompi_btl_usnic_util_abort()
 * if an error occurs.
 */
#define PACK_(buffer, type, opal_type, value) \
do { \
int ret; \
type temp = (type) (value); \
if (OPAL_SUCCESS != \
(ret = opal_dss.pack((buffer), &temp, 1, opal_type))) { \
OMPI_ERROR_LOG(ret); \
ABORT("Could not pack"); \
} \
} while(0)
/**
 * Helper macro to pack a string (char *) into RML message buffers.
 * Aborts via the PACK_ back-end macro if packing fails.
 */
#define PACK_STRING(buffer, value) PACK_(buffer, char *, OPAL_STRING, value)
/**
 * Helper macro to pack an int32_t into RML message buffers.  The value
 * is converted to int32_t before packing.  Aborts via the PACK_
 * back-end macro if packing fails.
 */
#define PACK_INT32(buffer, value) PACK_(buffer, int32_t, OPAL_INT32, value)
/**
 * Helper macro to pack a uint32_t into RML message buffers.  The value
 * is converted to uint32_t before packing.  Aborts via the PACK_
 * back-end macro if packing fails.
 */
#define PACK_UINT32(buffer, value) PACK_(buffer, uint32_t, OPAL_UINT32, value)
/**
 * Helper macro to unpack binary bytes from RML message buffers.
 *
 * "value" is passed to the unpack call as-is as the destination, so it
 * must already be (or decay to) a pointer to storage for the bytes.
 * The requested count is passed by address in a local "n".
 * NOTE(review): presumably opal_dss.unpack updates "n" with the number
 * of bytes actually unpacked; that count is not checked here -- confirm.
 *
 * The expansion declares locals named "ret_value" and "n".
 *
 * Note that this macro will call ompi_btl_usnic_util_abort() if
 * an error occurs.
 */
#define UNPACK_BYTES(buffer, value, num_bytes) \
do { \
int ret_value, n = (num_bytes); \
ret_value = opal_dss.unpack((buffer), value, &n, OPAL_BYTE); \
if (OPAL_SUCCESS != ret_value) { \
OMPI_ERROR_LOG(ret_value); \
ABORT("Could not unpack"); \
} \
} while(0)
/**
 * @internal
 *
 * Back-end macro for UNPACK_STRING, UNPACK_INT32, and UNPACK_UINT32
 *
 * Unpacks one element of "opal_unpack_type" into a temporary of type
 * "type" and, only if the unpack succeeded, assigns it to "value"
 * (which must therefore be an assignable lvalue).
 *
 * The expansion declares locals named "ret_value", "n" and "temp".
 *
 * Note that this macro will call ompi_btl_usnic_util_abort() if an error
 * occurs.
 */
#define UNPACK(buffer, type, opal_unpack_type, value) \
do { \
int ret_value, n = 1; \
type temp; \
ret_value = opal_dss.unpack((buffer), &temp, &n, opal_unpack_type); \
if (OPAL_SUCCESS != ret_value) { \
OMPI_ERROR_LOG(ret_value); \
ABORT("Could not unpack"); \
} else { \
value = (type) temp; \
} \
} while(0)
/**
 * Helper macro to unpack a string from RML message buffers.
 *
 * "value" must be an assignable char * lvalue; it receives the
 * unpacked string.  Aborts via the UNPACK back-end macro if unpacking
 * fails.
 *
 * The expansion is a single statement without a trailing semicolon
 * (the caller supplies it), so the macro can be used safely as the
 * body of an un-braced if/else.
 */
#define UNPACK_STRING(buffer, value)                            \
    UNPACK(buffer, char *, OPAL_STRING, value)
/**
 * Helper macro to unpack an int32_t from RML message buffers.
 *
 * "value" must be an assignable lvalue; the unpacked int32_t is
 * assigned to it.  Aborts via the UNPACK back-end macro if unpacking
 * fails.
 *
 * The expansion is a single statement without a trailing semicolon
 * (the caller supplies it), so the macro can be used safely as the
 * body of an un-braced if/else.
 */
#define UNPACK_INT32(buffer, value)                             \
    UNPACK(buffer, int32_t, OPAL_INT32, value)
/**
 * Helper macro to unpack a uint32_t from RML message buffers.
 *
 * "value" must be an assignable lvalue; the unpacked uint32_t is
 * assigned to it.  Aborts via the UNPACK back-end macro if unpacking
 * fails.  The expansion has no trailing semicolon; the caller
 * supplies it.
 */
#define UNPACK_UINT32(buffer, value) \
UNPACK(buffer, uint32_t, OPAL_UINT32, value)
/**
 * RML message types. This value is packed as the first field in each
 * RML message to identify its type. Use a non-zero value as the
 * first enum just as defensive programming (i.e., it's a slightly
 * lower chance that an uninitialized message type would randomly
 * match these values).
 *
 * The enumerators are consecutive: LISTEN is 17, PING is 18, and MAX
 * is 19.
 */
enum {
/* Packed first by ompi_btl_usnic_connectivity_listen() */
CONNECTIVITY_AGENT_CMD_LISTEN = 17,
/* Packed first by ompi_btl_usnic_connectivity_ping() */
CONNECTIVITY_AGENT_CMD_PING,
/* One past the last command; presumably a sentinel for range
   checks -- confirm against the agent code */
CONNECTIVITY_AGENT_CMD_MAX
};
/**
* Startup the connectivity client.
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* It is safe to call this function even if the connectivity check is
* disabled; it will be a no-op in this case.
*/
int ompi_btl_usnic_connectivity_client_init(void);
/**
* Tell the agent to establish a listening port on the given IP
* address.
*
* @param[in] module The module that is requesting the listen.
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* The module contains the local interface addressing information,
* which tells the agent on which interface to listen.
*
* This routine will request the new listen from the agent, and wait
* for the agent to reply with the UDP port that is being used/was
* created. The UDP listening port will then be stuffed in
* module->local_addr.connectivity_udp_port (i.e., data that will be
* sent in the modex).
*
* It is safe to call this function even if the connectivity check is
* disabled; it will be a no-op in this case.
*/
int ompi_btl_usnic_connectivity_listen(struct ompi_btl_usnic_module_t *module);
/**
* Tell the agent to ping a specific IP address and UDP port number
* with a specific message size.
*
* @param[in] src_ipv4_addr The source module IPv4 address
* @param[in] src_port The source module listening UDP port
* @param[in] dest_ipv4_addr The destination IPv4 address
* @param[in] dest_cidrmask The destination CIDR mask
* @param[in] dest_port The destination UDP port
* @param[in] dest_mac The destination MAC address
* @param[in] dest_nodename The destination server name
* @param[in] mtu The max ping message size to send
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* Note that several of the above parameters are only passed so that
* they can be used in a complete/helpful error message, if necessary.
*
* This function does not wait for a reply from the agent; it assumes
* the agent will successfully ping the remote peer or will abort the
* MPI job if the pinging fails.
*
* It is safe to call this function even if the connectivity check is
* disabled; it will be a no-op in this case.
*/
int ompi_btl_usnic_connectivity_ping(uint32_t src_ipv4_addr, int src_port,
uint32_t dest_ipv4_addr,
uint32_t dest_cidrmask, int dest_port,
uint8_t *dest_mac, char *dest_nodename,
size_t mtu);
/**
* Shut down the connectivity service client.
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* It is safe to call this function even if the connectivity check is
* disabled; it will be a no-op in this case.
*/
int ompi_btl_usnic_connectivity_client_finalize(void);
/**
* Startup the connectivity agent.
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* This function will be a no-op if this process is not the local rank
* 0.
*/
int ompi_btl_usnic_connectivity_agent_init(void);
/**
* Shut down the connectivity agent
*
* @returns OMPI_SUCCESS or an OMPI error code.
*
* This function will be a no-op if this process is not the local rank
* 0.
*/
int ompi_btl_usnic_connectivity_agent_finalize(void);
#endif /* OMPI_BTL_USNIC_CONNECTIVITY_H */

Просмотреть файл

@ -53,6 +53,7 @@ static void endpoint_construct(mca_btl_base_endpoint_t* endpoint)
endpoint->endpoint_proc = NULL; endpoint->endpoint_proc = NULL;
endpoint->endpoint_proc_index = -1; endpoint->endpoint_proc_index = -1;
endpoint->endpoint_exiting = false; endpoint->endpoint_exiting = false;
endpoint->endpoint_connectivity_checked = false;
for (i=0; i<USNIC_NUM_CHANNELS; ++i) { for (i=0; i<USNIC_NUM_CHANNELS; ++i) {
endpoint->endpoint_remote_addr.qp_num[i] = 0; endpoint->endpoint_remote_addr.qp_num[i] = 0;

Просмотреть файл

@ -70,6 +70,7 @@ typedef struct ompi_btl_usnic_addr_t {
union ibv_gid gid; union ibv_gid gid;
uint32_t ipv4_addr; uint32_t ipv4_addr;
uint32_t cidrmask; uint32_t cidrmask;
uint32_t connectivity_udp_port;
uint8_t mac[6]; uint8_t mac[6];
int mtu; int mtu;
uint32_t link_speed_mbps; uint32_t link_speed_mbps;
@ -170,6 +171,8 @@ typedef struct mca_btl_base_endpoint_t {
bool endpoint_rcvd_segs[WINDOW_SIZE]; bool endpoint_rcvd_segs[WINDOW_SIZE];
uint32_t endpoint_rfstart; uint32_t endpoint_rfstart;
bool endpoint_connectivity_checked;
} mca_btl_base_endpoint_t; } mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t ompi_btl_usnic_endpoint_t; typedef mca_btl_base_endpoint_t ompi_btl_usnic_endpoint_t;

Просмотреть файл

@ -126,6 +126,28 @@ static int reg_int(const char* param_name,
} }
/*
 * Utility routine for boolean parameter registration.
 *
 * Seeds *storage with default_value, then registers a read-only
 * boolean MCA variable for this component that is backed by storage.
 * The result of the registration call is not examined; this routine
 * always returns OMPI_SUCCESS.
 */
static int reg_bool(const char* param_name,
                    const char* help_string,
                    bool default_value, bool *storage, int level)
{
    /* The storage must hold the default before it is registered */
    *storage = default_value;

    (void) mca_base_component_var_register(
        &mca_btl_usnic_component.super.btl_version,
        param_name, help_string,
        MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, level,
        MCA_BASE_VAR_SCOPE_READONLY, storage);

    return OMPI_SUCCESS;
}
int ompi_btl_usnic_component_register(void) int ompi_btl_usnic_component_register(void)
{ {
int tmp, ret = 0; int tmp, ret = 0;
@ -249,6 +271,28 @@ int ompi_btl_usnic_component_register(void)
ompi_btl_usnic_module_template.super.btl_bandwidth = 0; ompi_btl_usnic_module_template.super.btl_bandwidth = 0;
ompi_btl_usnic_module_template.super.btl_latency = 4; ompi_btl_usnic_module_template.super.btl_latency = 4;
/* Connectivity verification */
mca_btl_usnic_component.connectivity_enabled = true;
CHECK(reg_bool("connectivity_check",
"Whether to enable the usNIC connectivity check upon first send (default = 1, enabled; 0 = disabled)",
mca_btl_usnic_component.connectivity_enabled,
&mca_btl_usnic_component.connectivity_enabled,
OPAL_INFO_LVL_3));
mca_btl_usnic_component.connectivity_ack_timeout = 1000;
CHECK(reg_int("connectivity_ack_timeout",
"Timeout, in milliseconds, while waiting for an ACK while verification connectivity between usNIC devices. If 0, the connectivity check is disabled (must be >=0).",
mca_btl_usnic_component.connectivity_ack_timeout,
&mca_btl_usnic_component.connectivity_ack_timeout,
REGINT_GE_ZERO, OPAL_INFO_LVL_3));
mca_btl_usnic_component.connectivity_num_retries = 10;
CHECK(reg_int("connectivity_error_num_retries",
"Number of times to retry usNIC connectivity verification before aborting the MPI job (must be >0).",
mca_btl_usnic_component.connectivity_num_retries,
&mca_btl_usnic_component.connectivity_num_retries,
REGINT_GE_ONE, OPAL_INFO_LVL_3));
/* Register some synonyms to the ompi common verbs component */ /* Register some synonyms to the ompi common verbs component */
ompi_common_verbs_mca_register(&mca_btl_usnic_component.super.btl_version); ompi_common_verbs_mca_register(&mca_btl_usnic_component.super.btl_version);

Просмотреть файл

@ -41,6 +41,7 @@
#include "ompi/memchecker.h" #include "ompi/memchecker.h"
#include "btl_usnic.h" #include "btl_usnic.h"
#include "btl_usnic_connectivity.h"
#include "btl_usnic_frag.h" #include "btl_usnic_frag.h"
#include "btl_usnic_proc.h" #include "btl_usnic_proc.h"
#include "btl_usnic_endpoint.h" #include "btl_usnic_endpoint.h"
@ -799,6 +800,20 @@ usnic_prepare_src(
size_t osize = *size; size_t osize = *size;
#endif #endif
/* Do we need to check the connectivity? */
if (mca_btl_usnic_component.connectivity_enabled &&
OPAL_UNLIKELY(!endpoint->endpoint_connectivity_checked)) {
ompi_btl_usnic_connectivity_ping(module->local_addr.ipv4_addr,
module->local_addr.connectivity_udp_port,
endpoint->endpoint_remote_addr.ipv4_addr,
endpoint->endpoint_remote_addr.cidrmask,
endpoint->endpoint_remote_addr.connectivity_udp_port,
endpoint->endpoint_remote_addr.mac,
endpoint->endpoint_proc->proc_ompi->proc_hostname,
endpoint->endpoint_remote_addr.mtu);
endpoint->endpoint_connectivity_checked = true;
}
/* /*
* if total payload len fits in one MTU use small send, else large * if total payload len fits in one MTU use small send, else large
*/ */
@ -1929,6 +1944,19 @@ int ompi_btl_usnic_module_init(ompi_btl_usnic_module_t *module)
"btl:usnic: not sorting devices by NUMA distance (topology support not included)"); "btl:usnic: not sorting devices by NUMA distance (topology support not included)");
#endif #endif
/* Setup a connectivity listener */
if (mca_btl_usnic_component.connectivity_enabled) {
rc = ompi_btl_usnic_connectivity_listen(module);
if (OMPI_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
ABORT("Failed to notify connectivity agent to listen");
}
} else {
/* If we're not doing a connectivity check, just set the port
to 0 */
module->local_addr.connectivity_udp_port = 0;
}
/* Setup the pointer array for the procs that will be used by this /* Setup the pointer array for the procs that will be used by this
module */ module */
OBJ_CONSTRUCT(&module->all_procs, opal_pointer_array_t); OBJ_CONSTRUCT(&module->all_procs, opal_pointer_array_t);

Просмотреть файл

@ -66,6 +66,33 @@ ompi_btl_usnic_dump_hex(uint8_t *addr, int len)
} }
/*
 * Format an IPv4 address as dotted-quad text into out (at most maxlen
 * bytes, via snprintf).  The four bytes of addr are printed in memory
 * order, so addr is expected to hold the address in network byte
 * order.  When cidrmask is non-zero, "/<cidrmask>" is appended.
 *
 * This is a small convenience over inet_ntop(), since addresses are
 * not usually carried around in struct sockaddr form here.
 */
void ompi_btl_usnic_snprintf_ipv4_addr(char *out, size_t maxlen,
                                       uint32_t addr, uint32_t cidrmask)
{
    /* View the address as its four individual octets */
    uint8_t *octet = (uint8_t*) &addr;

    if (0 == cidrmask) {
        /* No mask requested: plain dotted quad */
        snprintf(out, maxlen, "%u.%u.%u.%u",
                 octet[0], octet[1], octet[2], octet[3]);
        return;
    }

    snprintf(out, maxlen, "%u.%u.%u.%u/%u",
             octet[0], octet[1], octet[2], octet[3], cidrmask);
}
void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6]) void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6])
{ {
snprintf(out, 32, "%02x:%02x:%02x:%02x:%02x:%02x", snprintf(out, 32, "%02x:%02x:%02x:%02x:%02x:%02x",

Просмотреть файл

@ -81,6 +81,13 @@ usnic_convertor_pack_simple(
*/ */
void ompi_btl_usnic_exit(void); void ompi_btl_usnic_exit(void);
/*
* If cidrmask==0, it is not included in the output string. addr is
* expected to be in network byte order.
*/
void ompi_btl_usnic_snprintf_ipv4_addr(char *out, size_t maxlen,
uint32_t addr, uint32_t cidrmask);
void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6]); void ompi_btl_usnic_sprintf_mac(char *out, const uint8_t mac[6]);
void ompi_btl_usnic_sprintf_gid_mac(char *out, union ibv_gid *gid); void ompi_btl_usnic_sprintf_gid_mac(char *out, union ibv_gid *gid);

Просмотреть файл

@ -198,3 +198,75 @@ The usnic BTL failed to initialize the rtnetlink query subsystem.
Server: %s Server: %s
Error message: %s Error message: %s
#
[connectivity error: small ok, large bad]
The Open MPI usNIC BTL was unable to establish full connectivity
between at least one pair of servers in the MPI job. Specifically,
small UDP messages seem to flow between the servers, but large UDP
messages do not.
Your MPI job is going to abort now.
Source:
Hostname / IP: %s (%s)
Host interfaces: %s / %s
MAC address: %s
Destination:
Hostname / IP: %s (%s)
MAC address: %s
Small message size: %u
Large message size: %u
Note that this behavior usually indicates that the MTU of some network
link is too small between these two servers. You should verify that
UDP traffic with payloads up to the "large message size" listed above
can flow between these two servers.
#
[connectivity error: small bad, large ok]
The Open MPI usNIC BTL was unable to establish full connectivity
between at least one pair of servers in the MPI job. Specifically,
large UDP messages seem to flow between the servers, but small UDP
messages do not.
Your MPI job is going to abort now.
Source:
Hostname / IP: %s (%s)
Host interfaces: %s / %s
MAC address: %s
Destination:
Hostname / IP: %s (%s)
MAC address: %s
Small message size: %u
Large message size: %u
This is a very strange network error, and should not occur in most
situations. You may be experiencing high amounts of congestion, or
this may indicate some kind of network misconfiguration. You should
verify that UDP traffic with payloads up to the "large message size"
listed above can flow between these two servers.
#
[connectivity error: small bad, large bad]
The Open MPI usNIC BTL was unable to establish any connectivity
between at least one pair of servers in the MPI job. Specifically,
no UDP messages seemed to flow between these two servers.
Your MPI job is going to abort now.
Source:
Hostname / IP: %s (%s)
Host interfaces: %s / %s
MAC address: %s
Destination:
Hostname / IP: %s (%s)
MAC address: %s
Small message size: %u
Large message size: %u
Note that this behavior usually indicates some kind of network
misconfiguration. You should verify that UDP traffic with payloads up
to the "large message size" listed above can flow between these two
servers.

Просмотреть файл

@ -223,6 +223,9 @@ BEGIN_C_DECLS
#define OMPI_RML_PCONNECT_TAG OMPI_RML_TAG_BASE+13 #define OMPI_RML_PCONNECT_TAG OMPI_RML_TAG_BASE+13
#define OMPI_RML_TAG_USNIC_CONNECTIVITY OMPI_RML_TAG_BASE+14
#define OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY OMPI_RML_TAG_BASE+15
#define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200 #define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200
/* /*