
usnic: update cclient/cagent to use unix domain sockets (not RML)

In preparation for moving the BTLs down to OPAL, discontinue the use
of the RML for connectivity client/agent communication.  Instead, use
local unix domain sockets in the job session directory (all
communication is between processes on the same server, so unix domain
sockets are fine).
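
For illustration, the client-side rendezvous now amounts to roughly the
following (a condensed sketch, not the literal code in this patch;
includes, error handling, and the wait-for-the-agent-to-create-the-socket
loop are omitted):

    /* Build the rendezvous path inside the job session directory */
    char *file = NULL;
    asprintf(&file, "%s/%s",
             ompi_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);

    /* Connect to the agent's named socket */
    int fd = socket(PF_UNIX, SOCK_STREAM, 0);
    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, file, sizeof(addr.sun_path) - 1);
    connect(fd, (struct sockaddr*) &addr, sizeof(addr));

    /* Exchange a magic token so both sides know they have the right peer */
    opal_fd_write(fd, strlen(CONNECTIVITY_MAGIC_TOKEN),
                  CONNECTIVITY_MAGIC_TOKEN);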

This commit was SVN r31710.
This commit is contained in:
Jeff Squyres 2014-05-09 20:35:36 +00:00
parent 3de7bb61cb
commit e37c7af0fb
3 changed files with 571 additions and 489 deletions

View file

@ -12,7 +12,9 @@
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <alloca.h>
#include "opal_stdint.h"
#include "opal/threads/mutex.h"
@ -20,6 +22,7 @@
#include "opal/util/show_help.h"
#include "opal/types.h"
#include "opal/util/output.h"
#include "opal/util/fd.h"
#include "ompi/mca/rte/rte.h"
#include "ompi/runtime/mpiruntime.h"
@ -32,16 +35,6 @@
* Agent data and methods
**************************************************************************/
/*
* Data structure used to proxy messages between the RTE thread and
* the agent thread.
*/
typedef struct {
opal_buffer_t *buffer;
ompi_process_name_t sender;
opal_event_t event;
} agent_buf_name_t;
/*
* Long enough to hold "xxx.xxx.xxx.xxx/xx"
*/
@ -52,6 +45,23 @@ typedef struct {
*/
#define MACLEN 18
/*
* Local variables
*/
static int ipc_accept_fd = -1;
static char *ipc_filename = NULL;
static opal_event_t ipc_event;
static struct timeval ack_timeout;
static opal_list_t udp_port_listeners;
static opal_list_t ipc_listeners;
/* JMS The pings_pending and ping_results should probably both be hash
tables for more efficient lookups */
static opal_list_t pings_pending;
static opal_list_t ping_results;
static volatile bool agent_thread_time_to_exit = false;
static opal_event_base_t *evbase = NULL;
/*
* Holds all the information about a UDP port that the agent thread is
* listening on (for incoming PINGs and ACKs).
@ -74,10 +84,25 @@ typedef struct {
uint32_t udp_port;
uint8_t *buffer;
opal_event_t event;
bool active;
} agent_udp_port_listener_t;
OBJ_CLASS_DECLARATION(agent_udp_port_listener_t);
/*
* Holds information for a local IPC socket fd (i.e., a connection
* from a local process to this agent).
*/
typedef struct {
opal_list_item_t super;
int client_fd;
opal_event_t event;
bool active;
} agent_ipc_listener_t;
OBJ_CLASS_DECLARATION(agent_ipc_listener_t);
typedef enum {
AGENT_MSG_TYPE_PING = 17,
AGENT_MSG_TYPE_ACK
@ -139,7 +164,7 @@ OBJ_CLASS_DECLARATION(agent_ping_t);
* Utility functions, constructors, destructors
**************************************************************************/
static void port_listener_zero(agent_udp_port_listener_t *obj)
static void udp_port_listener_zero(agent_udp_port_listener_t *obj)
{
obj->ipv4_addr =
obj->cidrmask =
@ -153,14 +178,16 @@ static void port_listener_zero(agent_udp_port_listener_t *obj)
obj->fd = -1;
obj->udp_port = -1;
obj->buffer = NULL;
obj->active = false;
}
static void port_listener_constructor(agent_udp_port_listener_t *obj)
static void udp_port_listener_constructor(agent_udp_port_listener_t *obj)
{
port_listener_zero(obj);
udp_port_listener_zero(obj);
}
static void port_listener_destructor(agent_udp_port_listener_t *obj)
static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
{
if (-1 != obj->fd) {
close(obj->fd);
@ -178,13 +205,52 @@ static void port_listener_destructor(agent_udp_port_listener_t *obj)
free(obj->buffer);
}
port_listener_zero(obj);
/* If the "active" flag is set, then the event is active and the
item is on the ipc_listeners list */
if (obj->active) {
opal_event_del(&obj->event);
opal_list_remove_item(&udp_port_listeners, &obj->super);
}
udp_port_listener_zero(obj);
}
OBJ_CLASS_INSTANCE(agent_udp_port_listener_t,
opal_list_item_t,
port_listener_constructor,
port_listener_destructor);
udp_port_listener_constructor,
udp_port_listener_destructor);
static void ipc_listener_zero(agent_ipc_listener_t *obj)
{
obj->client_fd = -1;
obj->active = false;
}
static void ipc_listener_constructor(agent_ipc_listener_t *obj)
{
ipc_listener_zero(obj);
}
static void ipc_listener_destructor(agent_ipc_listener_t *obj)
{
if (-1 != obj->client_fd) {
close(obj->client_fd);
}
/* If the "active" flag is set, then the event is active and the
item is on the ipc_listeners list */
if (obj->active) {
opal_event_del(&obj->event);
opal_list_remove_item(&ipc_listeners, &obj->super);
}
ipc_listener_zero(obj);
}
OBJ_CLASS_INSTANCE(agent_ipc_listener_t,
opal_list_item_t,
ipc_listener_constructor,
ipc_listener_destructor);
static void agent_ping_result_zero(agent_ping_t *obj)
{
@ -237,7 +303,7 @@ static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
while (1) {
rc = sendto(fd, buffer, numbytes, 0, addr, sizeof(*addr));
/* Note that since this is UDP, so we don't need to check
for 0 < rc < ap->sizes[i] */
for 0 < rc < numbytes */
if (rc == numbytes) {
return;
} else if (rc < 0) {
@ -257,99 +323,10 @@ static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
/* Will not get here */
}
/**************************************************************************
* All of the following functions run in RTE thread
**************************************************************************/
/* This variable belongs to the agent thread, but is safe to access
from the RTE thread (because of its own internal locking) */
static opal_event_base_t *evbase;
/* Note that to avoid locking of the agent thread data structures, the
ORTE thread routines avoid reading/writing those data structures.
Instead, when the ORTE routines receive commands from the client
thread, they basically save the message and queue up an event to
run in the agent thread. */
/* Need to forward declare these functions; they are referenced as
function pointers in the RTE thread functions */
static void agent_thread_cmd_listen(int fd, short flags, void *context);
static void agent_thread_cmd_ping(int fd, short flags, void *context);
/*
* Save the message info and queue it up in an event to run in the
* agent thread.
*/
static void agent_queue_thread_cmd(opal_buffer_t *buffer,
ompi_process_name_t *sender,
opal_event_cbfunc_t callback)
{
agent_buf_name_t *abn = malloc(sizeof(agent_buf_name_t));
if (NULL == abn) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Copy the buffer so that we can have it after the RML receive
callback returns (have pending question in to Ralph as to why
this is necessary, vs. just OBJ_RETAIN'ing the buffer). Note
that copy_payload copies *from the current buffer position*, so
we don't need to re-unpack the command from the new buffer. */
abn->buffer = OBJ_NEW(opal_buffer_t);
if (NULL == abn->buffer) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
opal_dss.copy_payload(abn->buffer, buffer);
abn->sender = *sender;
/* Queue up an immediately-active event in the agent thread */
opal_event_set(evbase, &abn->event, -1, OPAL_EV_WRITE, callback, abn);
opal_event_active(&abn->event, OPAL_EV_WRITE, 1);
}
/*
* Called when we get an incoming RML message
*/
static void agent_rml_receive(int status, ompi_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int32_t command;
/* Unpack and invoke the command */
UNPACK_INT32(buffer, command);
assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
CONNECTIVITY_AGENT_CMD_PING == command);
switch(command) {
case CONNECTIVITY_AGENT_CMD_LISTEN:
agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_listen);
break;
case CONNECTIVITY_AGENT_CMD_PING:
agent_queue_thread_cmd(buffer, sender, agent_thread_cmd_ping);
break;
default:
ABORT("Unexpected connectivity agent command");
break;
}
}
/**************************************************************************
* All of the following functions run in agent thread
**************************************************************************/
static struct timeval ack_timeout;
static opal_list_t listeners;
/* JMS The pings_pending and ping_results should probably both be hash
tables for more efficient lookups */
static opal_list_t pings_pending;
static opal_list_t ping_results;
static volatile bool agent_thread_time_to_exit = false;
/*
* A dummy function invoked in an event just for the purposes of
* waking up the agent main thread (in case it was blocked in the
@ -390,7 +367,7 @@ static void agent_thread_handle_ping(agent_udp_port_listener_t *listener,
/* Send back an ACK. No need to allocate a new buffer; just
re-use the same buffer we just got. Note that msg->size is
already set.. */
already set. */
msg->message_type = AGENT_MSG_TYPE_ACK;
msg->src_ipv4_addr = listener->ipv4_addr;
msg->src_udp_port = listener->udp_port;
@ -494,7 +471,7 @@ static bool agent_thread_already_listening(uint32_t ipv4_addr,
uint32_t *udp_port)
{
agent_udp_port_listener_t *listener;
OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) {
OPAL_LIST_FOREACH(listener, &udp_port_listeners, agent_udp_port_listener_t) {
if (listener->ipv4_addr == ipv4_addr) {
*udp_port = listener->udp_port;
return true;
@ -505,111 +482,90 @@ static bool agent_thread_already_listening(uint32_t ipv4_addr,
}
/*
* Send an RML reply back from the LISTEN command: send back the IP
* address and UDP port that we're listening on.
* Send reply back from the LISTEN command: send back the IP address
* and UDP port that we're listening on.
*/
static int agent_thread_cmd_listen_reply(ompi_process_name_t *dest,
uint64_t addr, int32_t udp_port)
static int agent_thread_cmd_listen_reply(int fd,
uint32_t addr, int32_t udp_port)
{
opal_buffer_t *msg;
msg = OBJ_NEW(opal_buffer_t);
if (NULL == msg) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_LISTEN);
PACK_UINT32(msg, addr);
PACK_UINT32(msg, udp_port);
int ret;
ret = ompi_rte_send_buffer_nb(dest, msg,
OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY,
ompi_rte_send_cbfunc, NULL);
if (OMPI_SUCCESS != ret) {
ompi_btl_usnic_connectivity_cmd_listen_reply_t cmd = {
.cmd = CONNECTIVITY_AGENT_CMD_LISTEN,
.ipv4_addr = addr,
.udp_port = udp_port
};
ret = opal_fd_write(fd, sizeof(cmd), &cmd);
if (OPAL_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
OBJ_RELEASE(msg);
return ret;
ABORT("usnic connectivity agent IPC write failed");
/* Will not return */
}
return OMPI_SUCCESS;
}
/*
* The RTE thread will queue up an event to call this function when it
* receives a LISTEN command RML message.
* Receive and process the rest of a LISTEN command from a local IPC
* client.
*/
static void agent_thread_cmd_listen(int fd, short flags, void *context)
static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
{
agent_buf_name_t *abn = (agent_buf_name_t*) context;
opal_buffer_t *buffer = abn->buffer;
ompi_process_name_t *sender = &abn->sender;
/* Read the rest of the LISTEN command from the IPC socket */
int ret;
uint32_t ipv4_addr, cidrmask;
UNPACK_UINT32(buffer, ipv4_addr);
UNPACK_UINT32(buffer, cidrmask);
ompi_btl_usnic_connectivity_cmd_listen_t cmd;
ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
if (OPAL_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
ABORT("usnic connectivity agent IPC LISTEN read failed");
/* Will not return */
}
/* If we're already listening on this address, send the UDP port
back to the client. */
uint32_t udp_port;
if (agent_thread_already_listening(ipv4_addr, &udp_port)) {
agent_thread_cmd_listen_reply(sender, ipv4_addr, udp_port);
OBJ_RELEASE(buffer);
free(abn);
if (agent_thread_already_listening(cmd.ipv4_addr, &udp_port)) {
agent_thread_cmd_listen_reply(ipc_listener->client_fd,
cmd.ipv4_addr, udp_port);
return;
}
/* We're not listening on this address already, so create a
listener entry */
agent_udp_port_listener_t *listener = NULL;
listener = OBJ_NEW(agent_udp_port_listener_t);
if (NULL == listener) {
/* We're not listening on this interface already, so create a
UDP port listener entry */
agent_udp_port_listener_t *udp_listener = NULL;
udp_listener = OBJ_NEW(agent_udp_port_listener_t);
if (NULL == udp_listener) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Unpack the rest of the message */
UNPACK_UINT32(buffer, listener->mtu);
UNPACK_STRING(buffer, listener->nodename);
UNPACK_STRING(buffer, listener->if_name);
UNPACK_STRING(buffer, listener->usnic_name);
uint8_t mac[6];
UNPACK_BYTES(buffer, mac, sizeof(mac));
listener->ipv4_addr = ipv4_addr;
listener->cidrmask = cidrmask;
/* We're now done with the RTE buffer */
OBJ_RELEASE(buffer);
buffer = NULL;
udp_listener->mtu = cmd.mtu;
udp_listener->ipv4_addr = cmd.ipv4_addr;
udp_listener->cidrmask = cmd.cidrmask;
udp_listener->if_name = strdup(cmd.if_name);
udp_listener->usnic_name = strdup(cmd.usnic_name);
/* Fill in the ipv4_addr_str and mac_str. Since we don't have the
IPv4 address in sockaddr_in form, it's not worth using
inet_ntop() */
ompi_btl_usnic_snprintf_ipv4_addr(listener->ipv4_addr_str,
sizeof(listener->ipv4_addr_str),
ipv4_addr, cidrmask);
ompi_btl_usnic_sprintf_mac(listener->mac_str, mac);
ompi_btl_usnic_snprintf_ipv4_addr(udp_listener->ipv4_addr_str,
sizeof(udp_listener->ipv4_addr_str),
cmd.ipv4_addr, cmd.cidrmask);
ompi_btl_usnic_sprintf_mac(udp_listener->mac_str, cmd.mac);
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent listening on %s, (%s/%s)",
listener->ipv4_addr_str,
listener->usnic_name, listener->if_name);
listener->buffer = malloc(listener->mtu);
if (NULL == listener->buffer) {
udp_listener->buffer = malloc(udp_listener->mtu);
if (NULL == udp_listener->buffer) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
/* Create the listening socket */
listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
if (listener->fd < 0) {
OMPI_ERROR_LOG(listener->fd);
udp_listener->fd = socket(AF_INET, SOCK_DGRAM, 0);
if (udp_listener->fd < 0) {
OMPI_ERROR_LOG(udp_listener->fd);
ABORT("Could not open listening socket");
/* Will not return */
}
@ -618,10 +574,10 @@ static void agent_thread_cmd_listen(int fd, short flags, void *context)
struct sockaddr_in inaddr;
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = ipv4_addr;
inaddr.sin_addr.s_addr = cmd.ipv4_addr;
inaddr.sin_port = htons(0);
ret = bind(listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
ret = bind(udp_listener->fd, (struct sockaddr*) &inaddr, sizeof(inaddr));
if (ret < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not bind listening socket");
@ -630,20 +586,26 @@ static void agent_thread_cmd_listen(int fd, short flags, void *context)
/* Find out the port we got */
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
ret = getsockname(listener->fd, (struct sockaddr*) &inaddr, &addrlen);
ret = getsockname(udp_listener->fd, (struct sockaddr*) &inaddr, &addrlen);
if (ret < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not get UDP port number from listening socket");
/* Will not return */
}
listener->udp_port = ntohs(inaddr.sin_port);
udp_listener->udp_port = ntohs(inaddr.sin_port);
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent listening on %s:%d, (%s/%s)",
udp_listener->ipv4_addr_str,
udp_listener->udp_port,
udp_listener->usnic_name, udp_listener->if_name);
/* Set the "don't fragment" bit on outgoing frames because we
want MTU-sized messages to get through successfully to the
peer, or fail if they have to fragment because of an MTU
mismatch somewhere enroute */
int val = IP_PMTUDISC_DO;
ret = setsockopt(listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
ret = setsockopt(udp_listener->fd, IPPROTO_IP, IP_MTU_DISCOVER,
&val, sizeof(val));
if (0 != ret) {
OMPI_ERROR_LOG(ret);
@ -653,10 +615,10 @@ static void agent_thread_cmd_listen(int fd, short flags, void *context)
/* Set the send and receive buffer sizes to our MTU size */
int temp;
temp = (int) listener->mtu;
if ((ret = setsockopt(listener->fd, SOL_SOCKET, SO_RCVBUF,
temp = (int) udp_listener->mtu;
if ((ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_RCVBUF,
&temp, sizeof(temp))) < 0 ||
(ret = setsockopt(listener->fd, SOL_SOCKET, SO_SNDBUF,
(ret = setsockopt(udp_listener->fd, SOL_SOCKET, SO_SNDBUF,
&temp, sizeof(temp))) < 0) {
OMPI_ERROR_LOG(ret);
ABORT("Could not set socket buffer sizes");
@ -664,19 +626,23 @@ static void agent_thread_cmd_listen(int fd, short flags, void *context)
}
/* Create a listening event */
opal_event_set(evbase, &listener->event, listener->fd,
opal_event_set(evbase, &udp_listener->event, udp_listener->fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_receive_ping, listener);
opal_event_add(&listener->event, 0);
agent_thread_receive_ping, udp_listener);
opal_event_add(&udp_listener->event, 0);
/* Save this listener on the list of listeners */
opal_list_append(&listeners, &listener->super);
/* Save this listener on the list of udp_port_listeners */
opal_list_append(&udp_port_listeners, &udp_listener->super);
udp_listener->active = true;
/* Return the port number to the sender */
ret = agent_thread_cmd_listen_reply(sender, ipv4_addr, listener->udp_port);
ret = agent_thread_cmd_listen_reply(ipc_listener->client_fd,
cmd.ipv4_addr, udp_listener->udp_port);
/* All done! */
free(abn);
opal_output_verbose(20, USNIC_OUT,
"====== usNIC connectivity agent LISTEN all setup");
return;
}
@ -753,11 +719,15 @@ static void agent_thread_send_ping(int fd, short flags, void *context)
/* Will not return */
}
time_t t = time(NULL);
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity pinging %s (%s) from %s (%s/%s)",
dest_ipv4_addr_str, ap->dest_nodename,
"usNIC connectivity pinging %s:%d (%s) from %s (%s/%s) at %s",
dest_ipv4_addr_str,
ntohs(ap->dest_sockaddr.sin_port),
ap->dest_nodename,
ap->listener->ipv4_addr_str,
ap->listener->if_name, ap->listener->usnic_name);
ap->listener->if_name, ap->listener->usnic_name,
ctime(&t));
/* Send the ping messages to the peer */
for (int i = 0; i < NUM_PING_SIZES; ++i) {
@ -776,46 +746,38 @@ static void agent_thread_send_ping(int fd, short flags, void *context)
}
/*
* The RTE thread will queue up an event to call this function when it
* receives a PING command RML message.
* Receive and process the rest of a PING command from a local IPC
* client.
*/
static void agent_thread_cmd_ping(int fd, short flags, void *context)
static void agent_thread_cmd_ping(agent_ipc_listener_t *ipc_listener)
{
agent_buf_name_t *abn = (agent_buf_name_t*) context;
opal_buffer_t *buffer = abn->buffer;
uint32_t src_ipv4_addr, src_udp_port;
uint32_t dest_ipv4_addr, dest_cidrmask, dest_udp_port, mtu;
uint8_t dest_mac[6];
char *dest_nodename;
UNPACK_UINT32(buffer, src_ipv4_addr);
UNPACK_UINT32(buffer, src_udp_port);
UNPACK_UINT32(buffer, dest_ipv4_addr);
UNPACK_UINT32(buffer, dest_cidrmask);
UNPACK_UINT32(buffer, dest_udp_port);
UNPACK_BYTES(buffer, dest_mac, 6);
UNPACK_UINT32(buffer, mtu);
UNPACK_STRING(buffer, dest_nodename);
/* We're now done with the original RML message buffer */
OBJ_RELEASE(buffer);
free(abn);
/* Read the rest of the PING command from the IPC socket */
int ret;
ompi_btl_usnic_connectivity_cmd_ping_t cmd;
ret = opal_fd_read(ipc_listener->client_fd, sizeof(cmd), &cmd);
if (OPAL_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
ABORT("usnic connectivity agent IPC PING read failed");
/* Will not return */
}
/* Have we already pinged this IP address / port? */
agent_ping_t *ap;
OPAL_LIST_FOREACH(ap, &ping_results, agent_ping_t) {
if (ap->dest_ipv4_addr == dest_ipv4_addr &&
ap->dest_udp_port == dest_udp_port) {
if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
ap->dest_udp_port == cmd.dest_udp_port) {
/* We already have results from pinging this IP address /
port, so there's no need for further action */
opal_output_verbose(20, USNIC_OUT,
"====== usNIC connectivity agent already pinged this peer");
return;
}
}
/* Are we in the middle of pinging this IP address / port? */
OPAL_LIST_FOREACH(ap, &pings_pending, agent_ping_t) {
if (ap->dest_ipv4_addr == dest_ipv4_addr &&
ap->dest_udp_port == dest_udp_port) {
if (ap->dest_ipv4_addr == cmd.dest_ipv4_addr &&
ap->dest_udp_port == cmd.dest_udp_port) {
/* We're already in the middle of pinging this IP address
/ port, so there's no need for further action */
return;
@ -825,9 +787,10 @@ static void agent_thread_cmd_ping(int fd, short flags, void *context)
/* This is a new ping request. Find the listener with this source
ipv4 address */
bool found = false;
agent_udp_port_listener_t *listener;
OPAL_LIST_FOREACH(listener, &listeners, agent_udp_port_listener_t) {
if (listener->ipv4_addr == src_ipv4_addr) {
agent_udp_port_listener_t *udp_listener;
OPAL_LIST_FOREACH(udp_listener, &udp_port_listeners,
agent_udp_port_listener_t) {
if (udp_listener->ipv4_addr == cmd.src_ipv4_addr) {
found = true;
break;
}
@ -844,17 +807,17 @@ static void agent_thread_cmd_ping(int fd, short flags, void *context)
ABORT("Out of memory");
/* Will not return */
}
ap->src_ipv4_addr = src_ipv4_addr;
ap->src_udp_port = src_udp_port;
ap->listener = listener;
ap->dest_ipv4_addr = dest_ipv4_addr;
ap->dest_cidrmask = dest_cidrmask;
ap->dest_udp_port = dest_udp_port;
ap->src_ipv4_addr = cmd.src_ipv4_addr;
ap->src_udp_port = cmd.src_udp_port;
ap->listener = udp_listener;
ap->dest_ipv4_addr = cmd.dest_ipv4_addr;
ap->dest_cidrmask = cmd.dest_cidrmask;
ap->dest_udp_port = cmd.dest_udp_port;
ap->dest_sockaddr.sin_family = AF_INET;
ap->dest_sockaddr.sin_addr.s_addr = dest_ipv4_addr;
ap->dest_sockaddr.sin_port = htons(dest_udp_port);
memcpy(ap->dest_mac, dest_mac, 6);
ap->dest_nodename = dest_nodename;
ap->dest_sockaddr.sin_addr.s_addr = cmd.dest_ipv4_addr;
ap->dest_sockaddr.sin_port = htons(cmd.dest_udp_port);
memcpy(ap->dest_mac, cmd.dest_mac, 6);
ap->dest_nodename = strdup(cmd.dest_nodename);
/* The first message we send will be "short" (a simple control
message); the second will be "long" (i.e., caller-specified
@ -867,7 +830,7 @@ static void agent_thread_cmd_ping(int fd, short flags, void *context)
subtract off the UDP header (which is 8 bytes). So we need to
subtract off 68 bytes from the MTU, and that's the largest ping
payload we can send. */
ap->sizes[1] = mtu - 68;
ap->sizes[1] = cmd.mtu - 68;
/* Allocate a buffer for each size. Make sure the smallest size
is at least sizeof(agent_udp_message_t). */
@ -895,6 +858,107 @@ static void agent_thread_cmd_ping(int fd, short flags, void *context)
agent_thread_send_ping(0, 0, ap);
}
/*
* Called when we get an incoming IPC message
*/
static void agent_thread_ipc_receive(int fd, short flags, void *context)
{
int32_t command;
agent_ipc_listener_t *ipc_listener = (agent_ipc_listener_t*) context;
/* Read the command */
command = -1;
int ret = opal_fd_read(fd, sizeof(command), &command);
if (OPAL_ERR_TIMEOUT == ret) {
/* We get OPAL_ERR_TIMEOUT if the remote side hung up */
OBJ_RELEASE(ipc_listener);
return;
} else if (OPAL_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
ABORT("usnic connectivity agent IPC command read failed");
/* Will not return */
}
assert(CONNECTIVITY_AGENT_CMD_LISTEN == command ||
CONNECTIVITY_AGENT_CMD_PING == command);
switch (command) {
case CONNECTIVITY_AGENT_CMD_LISTEN:
agent_thread_cmd_listen(ipc_listener);
break;
case CONNECTIVITY_AGENT_CMD_PING:
agent_thread_cmd_ping(ipc_listener);
break;
default:
ABORT("Unexpected connectivity agent command");
break;
}
}
/*
* We got a new connection on the IPC named socket. Add it to the
* event base.
*/
static void agent_thread_accept(int fd, short flags, void *context)
{
struct sockaddr addr;
socklen_t len;
agent_ipc_listener_t *listener = NULL;
int client_fd = accept(fd, &addr, &len);
if (-1 == client_fd) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("accept() failed");
/* Will not return */
}
/* If we got a good client, verify that it sent the magic token */
int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
char *msg = alloca(tlen + 1);
if (NULL == msg) {
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
ABORT("Out of memory");
/* Will not return */
}
if (OPAL_SUCCESS != opal_fd_read(client_fd, tlen, msg)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity agent IPC read failed");
/* Will not return */
}
if (0 != memcmp(msg, CONNECTIVITY_MAGIC_TOKEN, tlen)) {
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity got bad IPC client (wrong magic token); disconnected");
close(client_fd);
return;
}
/* Make a listener object for this peer */
listener = OBJ_NEW(agent_ipc_listener_t);
listener->client_fd = client_fd;
/* Write back the magic token to ACK that we got the peer's
magic token and all is kosher */
if (OPAL_SUCCESS != opal_fd_write(client_fd, tlen,
CONNECTIVITY_MAGIC_TOKEN)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity agent IPC write failed");
/* Will not return */
}
/* Add this IPC listener to the event base */
opal_event_set(evbase, &listener->event, client_fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_ipc_receive, listener);
opal_event_add(&listener->event, 0);
/* Save this listener on the list of ipc_listeners */
opal_list_append(&ipc_listeners, &listener->super);
listener->active = true;
return;
}
/*
* Agent progress thread main entry point
*/
@ -941,20 +1005,58 @@ int ompi_btl_usnic_connectivity_agent_init(void)
1000 * (mca_btl_usnic_component.connectivity_ack_timeout % 1000);
/* Create lists */
OBJ_CONSTRUCT(&listeners, opal_list_t);
OBJ_CONSTRUCT(&udp_port_listeners, opal_list_t);
OBJ_CONSTRUCT(&ipc_listeners, opal_list_t);
OBJ_CONSTRUCT(&pings_pending, opal_list_t);
OBJ_CONSTRUCT(&ping_results, opal_list_t);
/********************************************************************
* Once all of the above is setup, launch the RML receives and
* start the event loop.
* Once all of the above is setup, create the unix domain socket
* and start the event loop.
********************************************************************/
/* Setup the RML receive */
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_USNIC_CONNECTIVITY,
OMPI_RML_PERSISTENT,
agent_rml_receive, NULL);
/* Create the unix domain socket in the job session directory */
ipc_accept_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (ipc_accept_fd < 0) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("socket() failed");
/* Will not return */
}
asprintf(&ipc_filename, "%s/%s",
ompi_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
if (NULL == ipc_filename) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("Out of memory");
/* Will not return */
}
unlink(ipc_filename);
struct sockaddr_un address;
assert(strlen(ipc_filename) < sizeof(address.sun_path));
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
strncpy(address.sun_path, ipc_filename, sizeof(address.sun_path));
if (bind(ipc_accept_fd, (struct sockaddr *) &address,
sizeof(struct sockaddr_un)) != 0) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("bind() failed");
/* Will not return */
}
if (listen(ipc_accept_fd, 5) != 0) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("listen() failed");
/* Will not return */
}
/* Add the socket to the event base */
opal_event_set(evbase, &ipc_event, ipc_accept_fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_accept, NULL);
opal_event_add(&ipc_event, 0);
/* Spawn the agent thread event loop */
OBJ_CONSTRUCT(&agent_thread, opal_thread_t);
@ -986,9 +1088,6 @@ int ompi_btl_usnic_connectivity_agent_finalize(void)
return OMPI_SUCCESS;
}
/* Cancel the RML receive */
ompi_rte_recv_cancel(OMPI_NAME_WILDCARD, OMPI_RML_TAG_USNIC_CONNECTIVITY);
/* Shut down the event loop. Send it a no-op event so that it
wakes up and exits the loop. */
opal_event_t ev;
@ -997,14 +1096,11 @@ int ompi_btl_usnic_connectivity_agent_finalize(void)
opal_event_active(&ev, OPAL_EV_WRITE, 1);
opal_thread_join(&agent_thread, NULL);
/* Shut down all active listeners */
agent_udp_port_listener_t *listener, *lnext;
OPAL_LIST_FOREACH_SAFE(listener, lnext, &listeners,
/* Shut down all active udp_port_listeners */
agent_udp_port_listener_t *udp_listener, *ulnext;
OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
agent_udp_port_listener_t) {
opal_event_del(&listener->event);
close(listener->fd);
opal_list_remove_item(&listeners, &listener->super);
OBJ_RELEASE(listener);
OBJ_RELEASE(udp_listener);
}
/* Destroy the pending pings and ping results */
@ -1019,6 +1115,24 @@ int ompi_btl_usnic_connectivity_agent_finalize(void)
OBJ_RELEASE(request);
}
/* Shut down all active ipc_listeners */
agent_ipc_listener_t *ipc_listener, *inext;
OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
agent_ipc_listener_t) {
OBJ_RELEASE(ipc_listener);
}
/* Close the local IPC socket and remove the file */
if (ipc_accept_fd != -1) {
close(ipc_accept_fd);
ipc_accept_fd = -1;
}
if (NULL != ipc_filename) {
unlink(ipc_filename);
free(ipc_filename);
ipc_filename = NULL;
}
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity client finalized");
return OMPI_SUCCESS;

View file

@ -10,16 +10,20 @@
#include "ompi_config.h"
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>
#include <alloca.h>
#include "opal_stdint.h"
#include "opal/threads/mutex.h"
#include "opal/mca/event/event.h"
#include "opal/mca/dstore/dstore.h"
#include "opal/util/output.h"
#include "opal/util/fd.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/rte/rte.h"
@ -34,37 +38,7 @@
**************************************************************************/
static bool initialized = false;
static ompi_process_name_t agent_name;
typedef struct {
uint32_t addr;
uint32_t udp_port;
bool receive_done;
} client_rml_receive_data_t;
/*
* Receive replies from the agent
*/
static void client_rml_receive(int status, ompi_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int32_t command;
volatile client_rml_receive_data_t *cddr =
(client_rml_receive_data_t*) cbdata;
/* What command is this a reply for? */
UNPACK_INT32(buffer, command);
assert(command == CONNECTIVITY_AGENT_CMD_LISTEN);
UNPACK_UINT32(buffer, cddr->addr);
UNPACK_UINT32(buffer, cddr->udp_port);
/* Tell the main thread that the reply is done */
opal_atomic_mb();
cddr->receive_done = true;
}
static int agent_fd = -1;
/*
@ -76,38 +50,88 @@ int ompi_btl_usnic_connectivity_client_init(void)
if (!mca_btl_usnic_component.connectivity_enabled) {
return OMPI_SUCCESS;
}
assert(!initialized);
/* Get the name of the agent */
int ret;
opal_list_t vals;
opal_value_t *kv;
OBJ_CONSTRUCT(&vals, opal_list_t);
ret = opal_dstore.fetch(opal_dstore_internal,
(opal_identifier_t*) &ompi_proc_local_proc->proc_name,
OPAL_DSTORE_LOCALLDR, &vals);
if (OMPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
BTL_ERROR(("usNIC connectivity client unable to db_fetch local leader (1)"));
OPAL_LIST_DESTRUCT(&vals);
return ret;
/* Open local IPC socket to the agent */
agent_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (agent_fd < 0) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("socket() failed");
/* Will not return */
}
kv = (opal_value_t*) opal_list_get_first(&vals);
if (NULL == kv) {
ret = OMPI_ERR_NOT_FOUND;
OMPI_ERROR_LOG(ret);
BTL_ERROR(("usNIC connectivity client unable to db_fetch local leader (2)"));
OPAL_LIST_DESTRUCT(&vals);
return ret;
char *ipc_filename = NULL;
asprintf(&ipc_filename, "%s/%s",
ompi_process_info.job_session_dir, CONNECTIVITY_SOCK_NAME);
if (NULL == ipc_filename) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("Out of memory");
/* Will not return */
}
#if !defined(NDEBUG)
struct sockaddr_un sun;
assert(strlen(ipc_filename) <= sizeof(sun.sun_path));
#endif
/* Wait for the agent to create its socket */
struct stat sbuf;
while (1) {
int ret = stat(ipc_filename, &sbuf);
if (0 == ret) {
break;
} else if (ENOENT != errno) {
/* If the error wasn't "file not found", then something
else Bad happened */
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("stat() failed");
/* Will not return */
}
/* If the named socket wasn't there yet, then give the agent a
little time to establish it */
usleep(1);
}
/* Note that it is guaranteed that sizeof(ompi_process_name_t) ==
sizeof(uint64_t) */
memcpy(&agent_name, &kv->data.uint64, sizeof(agent_name));
OPAL_LIST_DESTRUCT(&vals);
/* Connect */
struct sockaddr_un address;
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
strncpy(address.sun_path, ipc_filename, sizeof(address.sun_path));
if (0 != connect(agent_fd, (struct sockaddr*) &address, sizeof(address))) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("connect() failed");
/* Will not return */
}
/* Send the magic token */
int tlen = strlen(CONNECTIVITY_MAGIC_TOKEN);
if (OPAL_SUCCESS != opal_fd_write(agent_fd, tlen,
CONNECTIVITY_MAGIC_TOKEN)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC connect write failed");
/* Will not return */
}
/* Receive a magic token back */
char *ack = alloca(tlen + 1);
if (NULL == ack) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("Out of memory");
/* Will not return */
}
if (OPAL_SUCCESS != opal_fd_read(agent_fd, tlen, ack)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC connect read failed");
/* Will not return */
}
if (memcmp(ack, CONNECTIVITY_MAGIC_TOKEN, tlen) != 0) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client got wrong token back from agent");
/* Will not return */
}
/* All done */
initialized = true;
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity client initialized");
@ -125,52 +149,46 @@ int ompi_btl_usnic_connectivity_listen(ompi_btl_usnic_module_t *module)
return OMPI_SUCCESS;
}
opal_buffer_t *msg;
msg = OBJ_NEW(opal_buffer_t);
if (NULL == msg) {
return OMPI_ERR_OUT_OF_RESOURCE;
/* Send the LISTEN command */
int id = CONNECTIVITY_AGENT_CMD_LISTEN;
if (OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(id), &id)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC write failed");
/* Will not return */
}
/* Send the LISTEN command. Include enough information for the
agent to be able to print a show_help() message, if
necessary. */
PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_LISTEN);
PACK_UINT32(msg, module->local_addr.ipv4_addr);
PACK_UINT32(msg, module->local_addr.cidrmask);
PACK_UINT32(msg, module->local_addr.mtu);
PACK_STRING(msg, ompi_process_info.nodename);
PACK_STRING(msg, module->if_name);
PACK_STRING(msg, ibv_get_device_name(module->device));
PACK_BYTES(msg, module->local_addr.mac, 6);
/* Send the LISTEN command parameters */
ompi_btl_usnic_connectivity_cmd_listen_t cmd = {
.ipv4_addr = module->local_addr.ipv4_addr,
.cidrmask = module->local_addr.cidrmask,
.mtu = module->local_addr.mtu
};
/* Ensure to NULL-terminate the passed strings */
strncpy(cmd.nodename, ompi_process_info.nodename,
CONNECTIVITY_NODENAME_LEN - 1);
strncpy(cmd.if_name, module->if_name, CONNECTIVITY_IFNAME_LEN - 1);
strncpy(cmd.usnic_name, ibv_get_device_name(module->device),
CONNECTIVITY_IFNAME_LEN - 1);
memcpy(cmd.mac, module->local_addr.mac, 6);
/* Post a receive for the agent to reply with the UDP port to me */
volatile client_rml_receive_data_t data;
data.receive_done = false;
ompi_rte_recv_buffer_nb(OMPI_NAME_WILDCARD,
OMPI_RML_TAG_USNIC_CONNECTIVITY_REPLY,
0,
client_rml_receive, (void*) &data);
/* Send it to the agent */
int ret;
ret = ompi_rte_send_buffer_nb(&agent_name, msg,
OMPI_RML_TAG_USNIC_CONNECTIVITY,
ompi_rte_send_cbfunc, NULL);
if (OMPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
OBJ_RELEASE(msg);
return ret;
if (OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(cmd), &cmd)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC write failed");
/* Will not return */
}
/* Wait for the reply */
while (!data.receive_done) {
/* Sleep to let the RTE progress thread run */
usleep(1);
/* Wait for the reply with the UDP port */
ompi_btl_usnic_connectivity_cmd_listen_reply_t reply;
memset(&reply, 0, sizeof(reply));
if (OPAL_SUCCESS != opal_fd_read(agent_fd, sizeof(reply), &reply)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC read failed");
/* Will not return */
}
/* Get the UDP port number that was received */
opal_atomic_mb();
module->local_addr.connectivity_udp_port = data.udp_port;
assert(CONNECTIVITY_AGENT_CMD_LISTEN == reply.cmd);
module->local_addr.connectivity_udp_port = reply.udp_port;
return OMPI_SUCCESS;
}
@ -187,31 +205,31 @@ int ompi_btl_usnic_connectivity_ping(uint32_t src_ipv4_addr, int src_port,
return OMPI_SUCCESS;
}
opal_buffer_t *msg;
msg = OBJ_NEW(opal_buffer_t);
if (NULL == msg) {
return OMPI_ERR_OUT_OF_RESOURCE;
/* Send the PING command */
int id = CONNECTIVITY_AGENT_CMD_PING;
if (OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(id), &id)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC write failed");
/* Will not return */
}
PACK_INT32(msg, CONNECTIVITY_AGENT_CMD_PING);
PACK_UINT32(msg, src_ipv4_addr);
PACK_UINT32(msg, src_port);
PACK_UINT32(msg, dest_ipv4_addr);
PACK_UINT32(msg, dest_cidrmask);
PACK_UINT32(msg, dest_port);
PACK_BYTES(msg, dest_mac, 6);
PACK_UINT32(msg, mtu);
PACK_STRING(msg, dest_nodename);
/* Send the PING command parameters */
ompi_btl_usnic_connectivity_cmd_ping_t cmd = {
.src_ipv4_addr = src_ipv4_addr,
.src_udp_port = src_port,
.dest_ipv4_addr = dest_ipv4_addr,
.dest_cidrmask = dest_cidrmask,
.dest_udp_port = dest_port,
.mtu = mtu
};
/* Ensure to NULL-terminate the passed string */
strncpy(cmd.dest_nodename, dest_nodename, CONNECTIVITY_NODENAME_LEN - 1);
memcpy(cmd.dest_mac, dest_mac, 6);
/* Send it to the agent */
int ret;
ret = ompi_rte_send_buffer_nb(OMPI_PROC_MY_NAME, msg,
OMPI_RML_TAG_USNIC_CONNECTIVITY,
ompi_rte_send_cbfunc, NULL);
if (OMPI_SUCCESS != ret) {
OMPI_ERROR_LOG(ret);
OBJ_RELEASE(msg);
return ret;
if (OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(cmd), &cmd)) {
OMPI_ERROR_LOG(OMPI_ERR_IN_ERRNO);
ABORT("usnic connectivity client IPC write failed");
/* Will not return */
}
return OMPI_SUCCESS;
@ -228,6 +246,9 @@ int ompi_btl_usnic_connectivity_client_finalize(void)
return OMPI_SUCCESS;
}
close(agent_fd);
agent_fd = -1;
initialized = false;
return OMPI_SUCCESS;
}

View file

@ -33,12 +33,12 @@
*
* If enabled (via MCA param), the usnic module_init() will setup the
* client (and server on local rank 0). For each usnic module, each
* client will RML send a request to the server asking it to listen on
* its usnic interface. The agent will discard duplicates and setup a
* single UDP socket listener on the eth interface corresponding to
* each requested usnic interface. The agent returns the listening
* UDP port number to the client, and each client puts this UDP port
* number in their modex information.
* client will send a request to the server (via local Unix domain
* socket) asking it to listen on its usnic interface. The agent will
* discard duplicates and setup a single UDP socket listener on the
* eth interface corresponding to each requested usnic interface. The
* agent returns the listening UDP port number to the client, and each
* client puts this UDP port number in their modex information.
*
* At the first send to a given MPI process peer, the client will send
* another request to the server asking it to verify connectivity to
@ -68,7 +68,8 @@
* the answer; the agent either verifies the connectivity successfully
* or aborts the job.
*
* All client/agent communication is via the RML.
* All client/agent communication is via blocking calls to a local
* Unix domain socket.
*
* As mentioned above, the agent is smart about discarding duplicate
* ping requests from clients. Since a single agent serves all MPI
@ -87,117 +88,11 @@ struct ompi_btl_usnic_module_t;
#define ABORT(msg) ompi_btl_usnic_util_abort((msg), __FILE__, __LINE__, 1)
/**
* Helper macro to pack binary bytes into RML message buffers.
*
* Note that this macro will call ompi_btl_usnic_util_abort()
* if an error occurs.
*/
#define PACK_BYTES(buffer, value, num_bytes) \
do { \
int ret; \
ret = opal_dss.pack((buffer), (value), (num_bytes), OPAL_BYTE); \
if (OPAL_SUCCESS != ret) { \
OMPI_ERROR_LOG(ret); \
ABORT("Could not pack"); \
} \
} while(0)
/**
* @internal
*
* Back-end macro for PACK_STRING, PACK_INT32, and PACK_UINT32.
*
* Note that this macro will call ompi_btl_usnic_util_abort()
* if an error occurs.
*/
#define PACK_(buffer, type, opal_type, value) \
do { \
int ret; \
type temp = (type) (value); \
if (OPAL_SUCCESS != \
(ret = opal_dss.pack((buffer), &temp, 1, opal_type))) { \
OMPI_ERROR_LOG(ret); \
ABORT("Could not pack"); \
} \
} while(0)
/**
* Helper macro the pack a string into RML message buffers.
*/
#define PACK_STRING(buffer, value) PACK_(buffer, char *, OPAL_STRING, value)
/**
* Helper macro the pack an int32_ into RML message buffers.
*/
#define PACK_INT32(buffer, value) PACK_(buffer, int32_t, OPAL_INT32, value)
/**
* Helper macro the pack a uint32_t into RML message buffers.
*/
#define PACK_UINT32(buffer, value) PACK_(buffer, uint32_t, OPAL_UINT32, value)
/**
* Helper macro to unpack binary bytes from RML message buffers.
*
* Note that all of these macros will call ompi_btl_usnic_util_abort() if
* an error occurs.
*/
#define UNPACK_BYTES(buffer, value, num_bytes) \
do { \
int ret_value, n = (num_bytes); \
ret_value = opal_dss.unpack((buffer), value, &n, OPAL_BYTE); \
if (OPAL_SUCCESS != ret_value) { \
OMPI_ERROR_LOG(ret_value); \
ABORT("Could not unpack"); \
} \
} while(0)
/**
* @internal
*
* Back-end macro for UNPACK_STRING, UNPACK_INT32, and UNPACK_UINT32
*
* Note that this macro will call ompi_btl_usnic_util_abort() if an error
* occurs.
*
* Also note that we use a temp variable of the correct type because
* some of the values passed in to this macro are volatile, and the
* call to opal_dss.unpack() will discard that volatile qualifier.
*/
#define UNPACK(buffer, type, opal_unpack_type, value) \
do { \
int ret_value, n = 1; \
type temp; \
value = (type) 0; \
ret_value = opal_dss.unpack((buffer), &temp, &n, opal_unpack_type); \
if (OPAL_SUCCESS != ret_value) { \
OMPI_ERROR_LOG(ret_value); \
ABORT("Could not unpack"); \
} else { \
value = (type) temp; \
} \
} while(0)
/**
* Helper macro to unpack a string from RML message buffers.
*/
#define UNPACK_STRING(buffer, value) \
UNPACK(buffer, char *, OPAL_STRING, value);
/**
* Helper macro to unpack an int32_t from RML message buffers.
*/
#define UNPACK_INT32(buffer, value) \
UNPACK(buffer, int32_t, OPAL_INT32, value);
/**
* Helper macro to unpack a uint32_t from RML message buffers.
*/
#define UNPACK_UINT32(buffer, value) \
UNPACK(buffer, uint32_t, OPAL_UINT32, value)
/**
* RML message types. This value is packed as the first field in each
* RML message to identify its type. Use a non-zero value as the
* first enum just as defensive programming (i.e., it's a slightly
* lower chance that an uninitialized message type would randomly
* match these values).
* Local IPC socket message types. This value is either sent or
* packed as the first field in each message to identify its type.
* Use a non-zero value as the first enum just as defensive
* programming (i.e., it's a slightly lower chance that an
* uninitialized message type would randomly match these values).
*/
enum {
CONNECTIVITY_AGENT_CMD_LISTEN = 17,
@ -205,6 +100,58 @@ enum {
CONNECTIVITY_AGENT_CMD_MAX
};
#define CONNECTIVITY_NODENAME_LEN 128
#define CONNECTIVITY_IFNAME_LEN 32
/*
* Unix domain socket name
*/
#define CONNECTIVITY_SOCK_NAME "btl-usnic-cagent-socket"
/*
* Magic token to ensure that client/server recognize each other
*/
#define CONNECTIVITY_MAGIC_TOKEN "-*-I am usNIC; hear me roar-*-"
/*
* Fields for the LISTEN command. This struct is sent down the IPC
* socket from the cclient to the cagent.
*/
typedef struct {
uint32_t ipv4_addr;
uint32_t cidrmask;
uint32_t mtu;
char nodename[CONNECTIVITY_NODENAME_LEN];
char if_name[CONNECTIVITY_IFNAME_LEN];
char usnic_name[CONNECTIVITY_IFNAME_LEN];
uint8_t mac[6];
} ompi_btl_usnic_connectivity_cmd_listen_t;
/*
* Command+fields for the reply to the LISTEN command. This struct is
* sent down the IPC socket from the cagent to the cclient.
*/
typedef struct {
int32_t cmd;
uint32_t ipv4_addr;
uint32_t udp_port;
} ompi_btl_usnic_connectivity_cmd_listen_reply_t;
/*
* Fields for the PING command. This struct is sent down the IPC
* socket from the cclient to the cagent.
*/
typedef struct {
uint32_t src_ipv4_addr;
uint32_t src_udp_port;
uint32_t dest_ipv4_addr;
uint32_t dest_cidrmask;
uint32_t dest_udp_port;
uint32_t mtu;
char dest_nodename[CONNECTIVITY_NODENAME_LEN];
uint8_t dest_mac[6];
} ompi_btl_usnic_connectivity_cmd_ping_t;
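/*
 * Illustrative sketch (not part of this commit): one LISTEN exchange as
 * driven by a client over the IPC socket, using the structs above.  The
 * helper name and its parameters are hypothetical; the pattern (write the
 * int32 command id, write the fixed-size command struct, then block
 * reading the fixed-size reply struct) is the one the cclient code follows.
 */
static inline int example_listen_exchange(int agent_fd,
                          ompi_btl_usnic_connectivity_cmd_listen_t *cmd,
                          uint32_t *udp_port_out)
{
    int id = CONNECTIVITY_AGENT_CMD_LISTEN;
    ompi_btl_usnic_connectivity_cmd_listen_reply_t reply;

    /* Command id first, then the fixed-size payload, then block on the reply */
    if (OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(id), &id) ||
        OPAL_SUCCESS != opal_fd_write(agent_fd, sizeof(*cmd), cmd) ||
        OPAL_SUCCESS != opal_fd_read(agent_fd, sizeof(reply), &reply)) {
        return OMPI_ERROR;
    }
    if (CONNECTIVITY_AGENT_CMD_LISTEN != reply.cmd) {
        return OMPI_ERROR;
    }

    *udp_port_out = reply.udp_port;
    return OMPI_SUCCESS;
}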
/**
* Startup the connectivity client.
*