1
1

Add unix socket component to OOB - no longer require active network for local operations. Demonstrate inter-transport crossover.

VERY tentatively schedule this for 1.7.5 - only to be applied if we see no troubles AND the branch is ready in advance.

cmr=v1.7.5:reviewer=rhc:subject=Add unix socket component to OOB

This commit was SVN r30742.
Этот коммит содержится в:
Ralph Castain 2014-02-16 20:54:12 +00:00
родитель ecfca4c5f9
Коммит d42f4be8a4
17 изменённых файлов: 3731 добавлений и 0 удалений

58
orte/mca/oob/usock/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,58 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2009 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2012-2013 Los Alamos National Security, LLC.
# All rights reserved
# Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Headers and sources compiled into the usock OOB component.
# NOTE(review): oob_usock_common.c (and the oob_usock_common.h it includes)
# is not listed here -- confirm that the omission is intentional.
sources = \
oob_usock_component.h \
oob_usock.h \
oob_usock_listener.h \
oob_usock_component.c \
oob_usock_connection.h \
oob_usock_sendrecv.h \
oob_usock_hdr.h \
oob_usock_peer.h \
oob_usock_ping.h \
oob_usock.c \
oob_usock_listener.c \
oob_usock_connection.c \
oob_usock_sendrecv.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
# Exactly one of component_install / component_noinst is non-empty,
# depending on whether this component is built as a DSO.
if MCA_BUILD_orte_oob_usock_DSO
component_noinst =
component_install = mca_oob_usock.la
else
component_noinst = libmca_oob_usock.la
component_install =
endif
# DSO build: mca_oob_usock.la is installed into $(ompilibdir).
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_oob_usock_la_SOURCES = $(sources)
mca_oob_usock_la_LDFLAGS = -module -avoid-version
# Static build: libmca_oob_usock.la is built here but not installed.
noinst_LTLIBRARIES = $(component_noinst)
libmca_oob_usock_la_SOURCES = $(sources)
libmca_oob_usock_la_LDFLAGS = -module -avoid-version

42
orte/mca/oob/usock/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,42 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2011-2013 Los Alamos National Security, LLC.
# All rights reserved.
# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2013 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_orte_oob_usock_CONFIG([action-if-found], [action-if-not-found])
# ------------------------------------------------------------------
# Registers the component Makefile and then runs action-if-found when
# the compiler knows "struct sockaddr_un", action-if-not-found otherwise.
AC_DEFUN([MCA_orte_oob_usock_CONFIG],[
AC_CONFIG_FILES([orte/mca/oob/usock/Makefile])
# check for sockaddr_un (a good sign we have Unix domain sockets)
AC_CHECK_TYPES([struct sockaddr_un],
[oob_usock_happy="yes"],
[oob_usock_happy="no"],
[AC_INCLUDES_DEFAULT
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif])
AS_IF([test "$oob_usock_happy" = "yes"], [$1], [$2])
])dnl

70
orte/mca/oob/usock/help-oob-tcp.txt Обычный файл
Просмотреть файл

@ -0,0 +1,70 @@
# -*- text -*-
#
# Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2006 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
[static-and-dynamic]
Both static and dynamic port ranges were specified for the
out-of-band (OOB) communication subsystem:
Static ports: %s
Dynamic ports: %s
Only one can be specified. Please choose either static or
dynamic ports and try again.
#
[include-exclude]
Both TCP interface include and exclude lists were specified:
Include: %s
Exclude: %s
Only one of these can be given.
#
[not-parseable]
The specified network is not parseable. Since we cannot determine
your desired intent, we cannot establish a TCP socket for out-of-band
communications and will therefore abort. Please correct the network
specification and retry.
#
[no-included-found]
None of the networks specified to be included for out-of-band communications
could be found:
Value given: %s
Please revise the specification and try again.
#
[excluded-all]
The specified list of networks to be excluded for out-of-band communications
resulted in no networks being available:
Value given: %s
Please revise the specification and try again.
#
[no-interfaces-avail]
No network interfaces were found for out-of-band communications. We require
at least one available network for TCP-based messaging.
#
[invalid if_inexclude]
WARNING: An invalid value was given for oob_tcp_if_%s. This
value will be ignored.
Local host: %s
Value: %s
Message: %s
#

622
orte/mca/oob/usock/oob_usock.c Обычный файл
Просмотреть файл

@ -0,0 +1,622 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/oob/usock/oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "orte/mca/oob/usock/oob_usock_peer.h"
#include "orte/mca/oob/usock/oob_usock_connection.h"
#include "orte/mca/oob/usock/oob_usock_ping.h"
static void usock_init(void);
static void usock_fini(void);
static void accept_connection(const int accepted_fd,
const struct sockaddr *addr);
static void ping(const orte_process_name_t *proc);
static void send_nb(orte_rml_send_t *msg);
static void ft_event(int state);
mca_oob_usock_module_t mca_oob_usock_module = {
{
usock_init,
usock_fini,
accept_connection,
ping,
send_nb,
ft_event
}
};
/*
* Local utility functions
*/
/* Defined near the end of this file; referenced by accept_connection(). */
static void recv_handler(int sd, short flags, void* user);

/*
 * Entry point of the module's private progress thread.
 *
 * Runs one pass of the module event base per iteration for as long as
 * mca_oob_usock_module.ev_active is set, then returns
 * OPAL_THREAD_CANCELLED.  The obj argument is not used.
 */
static void* progress_thread_engine(opal_object_t *obj)
{
    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s USOCK PROGRESS THREAD RUNNING",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    for (;;) {
        /* usock_fini() clears this flag to ask us to stop */
        if (!mca_oob_usock_module.ev_active) {
            break;
        }
        opal_event_loop(mca_oob_usock_module.ev_base, OPAL_EVLOOP_ONCE);
    }

    return OPAL_THREAD_CANCELLED;
}
/*
 * Initialize the module state: the peer hash table and the event base
 * that all usock events are posted to.
 *
 * When orte_oob_base.use_module_threads is set, a private event base and
 * a progress thread are created.  If either of them cannot be set up the
 * module falls back to the shared orte_event_base, exactly as if module
 * threads had not been requested, so that ev_active never claims a
 * thread that is not actually running.
 */
static void usock_init(void)
{
    /* setup the module's state variables */
    OBJ_CONSTRUCT(&mca_oob_usock_module.peers, opal_hash_table_t);
    opal_hash_table_init(&mca_oob_usock_module.peers, 32);
    mca_oob_usock_module.ev_active = false;

    if (!orte_oob_base.use_module_threads) {
        mca_oob_usock_module.ev_base = orte_event_base;
        return;
    }

    /* we are to use an independent progress thread at
     * the module level, so start it now
     */
    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s STARTING USOCK PROGRESS THREAD",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
    mca_oob_usock_module.ev_base = opal_event_base_create();
    if (NULL == mca_oob_usock_module.ev_base) {
        /* no private base available - share the ORTE one instead */
        opal_output(0, "%s USOCK progress thread failed to start",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        mca_oob_usock_module.ev_base = orte_event_base;
        return;
    }

    /* construct the thread object */
    OBJ_CONSTRUCT(&mca_oob_usock_module.progress_thread, opal_thread_t);
    /* fork off a thread to progress it */
    mca_oob_usock_module.progress_thread.t_run = progress_thread_engine;
    mca_oob_usock_module.progress_thread.t_arg = NULL;
    /* the flag must be set before the thread starts, as the thread's
     * loop condition reads it immediately
     */
    mca_oob_usock_module.ev_active = true;
    if (OPAL_SUCCESS != opal_thread_start(&mca_oob_usock_module.progress_thread)) {
        opal_output(0, "%s USOCK progress thread failed to start",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        /* undo the partial setup so usock_fini() does not try to join
         * a thread that never ran, and so posted events still get
         * progressed by the shared event base
         */
        mca_oob_usock_module.ev_active = false;
        OBJ_DESTRUCT(&mca_oob_usock_module.progress_thread);
        opal_event_base_free(mca_oob_usock_module.ev_base);
        mca_oob_usock_module.ev_base = orte_event_base;
    }
}
/*
 * Module cleanup.
 *
 * The private progress thread (if one was started) is stopped and joined
 * BEFORE the peer table is destructed, so that no event callback running
 * on that thread can look up a peer in a table that is being torn down.
 *
 * NOTE(review): only the hash table itself is destructed here; confirm
 * that the peer objects stored in it are released elsewhere.
 */
static void usock_fini(void)
{
    if (mca_oob_usock_module.ev_active) {
        /* if we used an independent progress thread at
         * the module level, stop it now
         */
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s STOPPING USOCK PROGRESS THREAD",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        /* stop the progress thread */
        mca_oob_usock_module.ev_active = false;
        /* break the event loop */
        opal_event_base_loopexit(mca_oob_usock_module.ev_base);
        /* wait for thread to exit */
        opal_thread_join(&mca_oob_usock_module.progress_thread, NULL);
        OBJ_DESTRUCT(&mca_oob_usock_module.progress_thread);
        /* release the event base */
        opal_event_base_free(mca_oob_usock_module.ev_base);
    }

    /* cleanup all peers */
    OBJ_DESTRUCT(&mca_oob_usock_module.peers);
}
/* Called by mca_oob_usock_accept() and connection_handler() on
* a socket that has been accepted. This call finishes processing the
* socket by registering for the OOB-level connection handshake. Used
* in both the threaded and event listen modes.
*
* accepted_fd: descriptor of the accepted socket
* addr:        address handed over with the accepted socket; it is only
*              forwarded to ORTE_ACTIVATE_USOCK_ACCEPT_STATE here
*
* NOTE(review): ORTE_ACTIVATE_USOCK_ACCEPT_STATE is defined outside this
* file; presumably it posts an event that runs recv_handler - confirm.
*/
static void accept_connection(const int accepted_fd,
const struct sockaddr *addr)
{
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s accept_connection",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* use a one-time event to wait for receipt of peer's
* process ident message to complete this connection
*/
ORTE_ACTIVATE_USOCK_ACCEPT_STATE(accepted_fd, addr, recv_handler);
}
/* API functions */

/*
 * Event callback that services a ping request posted by ping().
 *
 * cbdata is the mca_oob_usock_ping_t describing the target peer; it is
 * released before returning on every path.  The fd and args parameters
 * are not used.
 *
 * Nothing is done when the peer is unknown, already connected, or when a
 * connection attempt is already in flight (CONNECTING or CONNECT_ACK).
 * Otherwise the peer is moved to CONNECTING and a connection attempt is
 * scheduled.
 */
static void process_ping(int fd, short args, void *cbdata)
{
    mca_oob_usock_ping_t *op = (mca_oob_usock_ping_t*)cbdata;
    mca_oob_usock_peer_t *peer;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s:[%s:%d] processing ping to peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        __FILE__, __LINE__,
                        ORTE_NAME_PRINT(&op->peer));

    /* do we know this peer? */
    if (NULL == (peer = mca_oob_usock_peer_lookup(&op->peer))) {
        /* unknown peer - the hand-back to the framework below is
         * currently compiled out, so we only log and give up
         */
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s:[%s:%d] hop %s unknown",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            __FILE__, __LINE__,
                            ORTE_NAME_PRINT(&op->peer));
#if 0
        ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, NULL, &op->peer, mca_oob_usock_component_hop_unknown);
#endif
        goto cleanup;
    }

    /* if we are already connected, there is nothing to do */
    if (MCA_OOB_USOCK_CONNECTED == peer->state) {
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s:[%s:%d] already connected to peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            __FILE__, __LINE__,
                            ORTE_NAME_PRINT(&op->peer));
        goto cleanup;
    }

    /* if we are already connecting, there is nothing to do.  The state
     * can only hold one value at a time, so the two in-progress states
     * must be OR'ed - an AND here could never be true and would let a
     * second connection attempt start on top of the first.
     */
    if (MCA_OOB_USOCK_CONNECTING == peer->state ||
        MCA_OOB_USOCK_CONNECT_ACK == peer->state) {
        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                            "%s:[%s:%d] already connecting to peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            __FILE__, __LINE__,
                            ORTE_NAME_PRINT(&op->peer));
        goto cleanup;
    }

    /* attempt the connection */
    peer->state = MCA_OOB_USOCK_CONNECTING;
    ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect);

 cleanup:
    OBJ_RELEASE(op);
}
/*
* Module API: request a ping of the given process.
*
* Logs the request and hands it to ORTE_ACTIVATE_USOCK_PING with
* process_ping as the handler; no connection work is done in this
* function itself.
*
* proc: name of the process to ping
*/
static void ping(const orte_process_name_t *proc)
{
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s:[%s:%d] pinging peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
__FILE__, __LINE__,
ORTE_NAME_PRINT(proc));
/* push this into our event base for processing */
ORTE_ACTIVATE_USOCK_PING(proc, process_ping);
}
/*
 * Event callback that services a send request posted by send_nb().
 *
 * cbdata is the mca_oob_usock_msg_op_t wrapping the message; it is
 * released before returning on every path.  The fd and args parameters
 * are not used.
 *
 * The peer to send through depends on who we are:
 *   - application process: always our daemon
 *   - daemon or HNP:       the message destination itself
 *   - anything else:       the message is not handled here
 * When no peer object is known for the chosen target, a "cannot send"
 * error is raised via ORTE_ACTIVATE_USOCK_MSG_ERROR.
 */
static void process_send(int fd, short args, void *cbdata)
{
    mca_oob_usock_msg_op_t *op = (mca_oob_usock_msg_op_t*)cbdata;
    mca_oob_usock_peer_t *peer = NULL;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s:[%s:%d] processing send to peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        __FILE__, __LINE__,
                        ORTE_NAME_PRINT(&op->msg->dst));

    if (ORTE_PROC_IS_APP) {
        /* an app's only route is through its daemon */
        peer = mca_oob_usock_peer_lookup(ORTE_PROC_MY_DAEMON);
        if (NULL == peer) {
            /* we have no peer entry for our own daemon - report that
             * the message cannot be sent
             */
            ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, op->msg,
                                          ORTE_PROC_MY_DAEMON,
                                          mca_oob_usock_component_cannot_send);
        }
    } else if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
        /* a daemon should only be asked to send to procs local to it */
        peer = mca_oob_usock_peer_lookup(&op->msg->dst);
        if (NULL == peer) {
            /* unknown proc - hand the message back to the OOB base
             * so it can try another transport
             */
            ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, op->msg,
                                          &op->msg->dst,
                                          mca_oob_usock_component_cannot_send);
        }
    } else {
        /* neither app, daemon nor HNP: not ours to handle */
        opal_output(0, "CAN'T BE HANDLED");
    }

    if (NULL != peer) {
        if (MCA_OOB_USOCK_CONNECTED == peer->state) {
            /* connection is up - put the message on the send queue */
            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                "%s usock:send_nb: already connected to %s - queueing for send",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&peer->name));
            MCA_OOB_USOCK_QUEUE_SEND(op->msg, peer);
        } else {
            /* hold the message until the connection has been formed */
            MCA_OOB_USOCK_QUEUE_PENDING(op->msg, peer);
            if (!(MCA_OOB_USOCK_CONNECTING == peer->state ||
                  MCA_OOB_USOCK_CONNECT_ACK == peer->state)) {
                /* nobody is connecting yet, so kick off the attempt
                 * through an event rather than blocking here
                 */
                opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                    "%s usock:send_nb: initiating connection to %s",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                    ORTE_NAME_PRINT(&peer->name));
                peer->state = MCA_OOB_USOCK_CONNECTING;
                ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect);
            }
        }
    }

    OBJ_RELEASE(op);
}
/*
* Module API: non-blocking send.
*
* Logs the request and hands the message to
* ORTE_ACTIVATE_USOCK_POST_SEND with process_send as the handler; the
* peer lookup and queueing are done in process_send, not here.
*
* msg: the message to send; its dst field names the destination
*/
static void send_nb(orte_rml_send_t *msg)
{
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s usock:send_nb to peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->dst));
/* push this into our event base for processing */
ORTE_ACTIVATE_USOCK_POST_SEND(msg, process_send);
}
/*
 * Answer a probe.
 *
 * The received header is reused as the reply: its type is set to
 * MCA_OOB_USOCK_PROBE and its dst to our own name, and the whole header
 * is written back on sd.  Interrupted or would-block sends are retried
 * immediately.  The socket is closed before returning, whether or not
 * the reply went out.
 */
static void recv_probe(int sd, mca_oob_usock_hdr_t* hdr)
{
    char *cursor = (char *)hdr;
    size_t remaining = sizeof(mca_oob_usock_hdr_t);

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:usock:recv:probe called",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    hdr->type = MCA_OOB_USOCK_PROBE;
    hdr->dst = *ORTE_PROC_MY_NAME;

    while (0 < remaining) {
        int sent = send(sd, cursor, remaining, 0);

        if (0 <= sent) {
            cursor += sent;
            remaining -= (size_t)sent;
            continue;
        }
        if (EINTR == opal_socket_errno ||
            EAGAIN == opal_socket_errno ||
            EWOULDBLOCK == opal_socket_errno) {
            /* transient condition - try again */
            continue;
        }
        opal_output(0, "%s-%s mca_oob_usock_peer_recv_probe: send() failed: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(hdr->dst)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        break;
    }

    CLOSE_THE_SOCKET(sd);
}
/*
 * Complete the OOB-level handshake to establish a connection with
 * another peer. Called when the remote peer replies with his process
 * identifier. Used in both the threaded and event listen modes.
 *
 * sd:  the accepted socket the header was read from
 * hdr: the received header; hdr->dst is used as the connecting
 *      process' name throughout this function
 *
 * Ownership of sd: on every path where the connection is not handed to
 * the peer object, sd is closed here so the descriptor is not leaked.
 */
static void recv_connect(int sd, mca_oob_usock_hdr_t* hdr)
{
    mca_oob_usock_peer_t* peer;
    int flags;
    int cmpval;
    uint64_t ui64;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:usock:recv:connect called",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* check for invalid name - if this is true, then we have an error
     */
    cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_NAME_INVALID);
    if (cmpval == OPAL_EQUAL) {
        ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
        /* nobody else knows about this socket - don't leak it */
        CLOSE_THE_SOCKET(sd);
        return;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s mca_oob_usock_recv_connect: processing connection from %s for socket %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&hdr->dst), sd);

    /* lookup the corresponding process */
    peer = mca_oob_usock_peer_lookup(&hdr->dst);
    if (NULL == peer) {
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s mca_oob_usock_recv_connect: connection from new peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&hdr->dst));
        peer = OBJ_NEW(mca_oob_usock_peer_t);
        peer->name = hdr->dst;
        peer->state = MCA_OOB_USOCK_ACCEPTING;
        /* build the table key the same way mca_oob_usock_peer_lookup()
         * does: copy the first 64 bits of the name rather than reading
         * it through a uint64_t pointer
         */
        memcpy(&ui64, &peer->name, sizeof(uint64_t));
        if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, ui64, peer)) {
            OBJ_RELEASE(peer);
            /* the connection cannot be tracked - don't leak the socket */
            CLOSE_THE_SOCKET(sd);
            return;
        }
    } else if (MCA_OOB_USOCK_CONNECTED == peer->state ||
               MCA_OOB_USOCK_CONNECTING == peer->state ||
               MCA_OOB_USOCK_CONNECT_ACK == peer->state) {
        /* check for a race condition - if I was in the process of
         * creating a connection to the peer, or have already established
         * such a connection, then we need to reject this connection. We will
         * let the higher ranked process retry - if I'm the lower ranked
         * process, I'll simply defer until I receive the request
         */
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s SIMUL CONNECTION WITH %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&hdr->dst));
        if (peer->recv_ev_active) {
            opal_event_del(&peer->recv_event);
            peer->recv_ev_active = false;
        }
        if (peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        if (0 < peer->sd) {
            CLOSE_THE_SOCKET(peer->sd);
            peer->sd = -1;
        }
        CLOSE_THE_SOCKET(sd);
        cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr->dst, ORTE_PROC_MY_NAME);
        if (OPAL_VALUE1_GREATER == cmpval) {
            /* force the other end to retry the connection */
            peer->state = MCA_OOB_USOCK_UNCONNECTED;
        } else {
            /* retry the connection */
            peer->state = MCA_OOB_USOCK_CONNECTING;
            ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect);
        }
        return;
    }

    /* set socket up to be non-blocking; a failure is logged but is not
     * treated as fatal for the connection
     */
    if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
        opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
    } else {
        flags |= O_NONBLOCK;
        if (fcntl(sd, F_SETFL, flags) < 0) {
            opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_SETFL) failed: %s (%d)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
        }
    }

    /* is the peer instance willing to accept this connection */
    peer->sd = sd;
    if (mca_oob_usock_peer_accept(peer) == false) {
        if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
            opal_output(0, "%s-%s mca_oob_usock_recv_connect: "
                        "rejected connection state %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)),
                        peer->state);
        }
        CLOSE_THE_SOCKET(sd);
        /* the descriptor is gone - make sure the peer does not keep a
         * stale copy of it, as is done in the simultaneous-connect path
         */
        peer->sd = -1;
        /* drop the peer from the table, then give up our reference */
        memcpy(&ui64, &peer->name, sizeof(uint64_t));
        opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, ui64, NULL);
        OBJ_RELEASE(peer);
    }
}
/*
 * Event callback when there is data available on the registered
 * socket to recv. This is called for the listen sockets to accept an
 * incoming connection, on new sockets trying to complete the software
 * connection process, and for probes. Data on an established
 * connection is handled elsewhere.
 *
 * sd:     socket to read the header from
 * flags:  event flags (unused)
 * cbdata: the mca_oob_usock_conn_op_t that scheduled this callback; it
 *         is released before returning on every path
 *
 * One complete mca_oob_usock_hdr_t is read, then the header is handed to
 * recv_probe() or recv_connect() according to its type.  The socket is
 * closed here if the peer closes it, on a hard recv() error, or when the
 * message type is not recognized.
 */
static void recv_handler(int sd, short flags, void *cbdata)
{
    mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata;
    mca_oob_usock_hdr_t hdr;
    int rc;
    size_t cnt;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:usock:recv:handler called",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* ensure all is zero'd */
    memset(&hdr, 0, sizeof(hdr));

    /* recv the process identifier.  A stream socket may deliver the
     * header in several pieces, so each recv() must continue where the
     * previous one stopped and ask only for the bytes still missing -
     * otherwise a short read would be overwritten by the next one.
     */
    cnt = 0;
    while (cnt < sizeof(hdr)) {
        rc = recv(sd, (char *)&hdr + cnt, sizeof(hdr) - cnt, 0);
        if (0 == rc) {
            if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
                opal_output(0, "%s mca_oob_usock_recv_handler: peer closed connection",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            }
            CLOSE_THE_SOCKET(sd);
            goto cleanup;
        } else if (rc < 0) {
            if (opal_socket_errno != EINTR &&
                opal_socket_errno != EAGAIN &&
                opal_socket_errno != EWOULDBLOCK) {
                opal_output(0, "%s mca_oob_usock_recv_handler: recv() failed: %s (%d)\n",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno);
                CLOSE_THE_SOCKET(sd);
                goto cleanup;
            }
            /* transient condition - try again */
            continue;
        }
        cnt += (size_t)rc;
    }

    /* dispatch based on message type */
    switch (hdr.type) {
    case MCA_OOB_USOCK_PROBE:
        recv_probe(sd, &hdr);
        break;
    case MCA_OOB_USOCK_IDENT:
        recv_connect(sd, &hdr);
        break;
    default:
        opal_output(0, "%s mca_oob_usock_recv_handler: invalid message type: %d\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr.type);
        CLOSE_THE_SOCKET(sd);
        break;
    }

 cleanup:
    OBJ_RELEASE(op);
}
/*
 * Fault-tolerance (checkpoint/restart) notification hook.
 *
 * state: the checkpoint/restart state being signalled.
 *
 * Without FT support this is a dummy.  With FT support the function
 * dispatches on the state, but every action is currently compiled out,
 * so it has no effect in either build.
 */
#if OPAL_ENABLE_FT_CR == 0
static void ft_event(int state)
{
    (void)state;
    return;
}
#else
static void ft_event(int state)
{
    if (OPAL_CRS_CHECKPOINT == state) {
#if 0
        /*
         * Disable event processing while we are working
         */
        opal_event_disable();
#endif
    }
    else if (OPAL_CRS_CONTINUE == state) {
#if 0
        /*
         * Resume event processing
         */
        opal_event_enable();
#endif
    }
    else if (OPAL_CRS_RESTART == state) {
        /* Disabled restart handling, kept for reference only.  It
         * refers to a "mod" variable and to peer lists that do not
         * exist in this module, so it cannot be enabled as-is.
         */
#if 0
        opal_list_item_t *item;

        /*
         * Clean out cached connection information
         * Select pieces of finalize/init
         */
        for (item = opal_list_remove_first(&mod->peer_list);
             item != NULL;
             item = opal_list_remove_first(&mod->peer_list)) {
            mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)item;
            /* JJH: Use the below command for debugging restarts with invalid sockets
             * mca_oob_usock_peer_dump(peer, "RESTART CLEAN")
             */
            MCA_OOB_USOCK_PEER_RETURN(peer);
        }
        OBJ_DESTRUCT(&mod->peer_free);
        OBJ_DESTRUCT(&mod->peer_names);
        OBJ_DESTRUCT(&mod->peers);
        OBJ_DESTRUCT(&mod->peer_list);
        OBJ_CONSTRUCT(&mod->peer_list, opal_list_t);
        OBJ_CONSTRUCT(&mod->peers, opal_hash_table_t);
        OBJ_CONSTRUCT(&mod->peer_names, opal_hash_table_t);
        OBJ_CONSTRUCT(&mod->peer_free, opal_free_list_t);
        /*
         * Resume event processing
         */
        opal_event_enable();
#endif
    }
    else if (OPAL_CRS_TERM == state) {
        ;
    }
    else {
        ;
    }
    return;
}
#endif

97
orte/mca/oob/usock/oob_usock.h Обычный файл
Просмотреть файл

@ -0,0 +1,97 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_H_
#define _MCA_OOB_USOCK_H_
#include "orte_config.h"
#include "orte/types.h"
#include "opal/mca/base/base.h"
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/event/event.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/oob/base/base.h"
BEGIN_C_DECLS
/* define some debug levels */
/* These are passed as the verbosity level of opal_output_verbose()
* calls in the module sources. */
#define OOB_USOCK_DEBUG_FAIL 2
#define OOB_USOCK_DEBUG_CONNECT 7
/* forward declare a couple of structures */
/* NOTE(review): mca_oob_usock_module_t below is a typedef of an
* anonymous struct, so this struct tag is a distinct, incomplete type -
* confirm whether the forward declaration is still needed. */
struct mca_oob_usock_module_t;
struct mca_oob_usock_msg_error_t;
/* Module definition */
/* Signatures of the module entry points collected in
* mca_oob_usock_module_api_t. */
typedef void (*mca_oob_usock_module_init_fn_t)(void);
typedef void (*mca_oob_usock_module_fini_fn_t)(void);
typedef void (*mca_oob_usock_module_accept_connection_fn_t)(const int accepted_fd,
const struct sockaddr *addr);
typedef void (*mca_oob_usock_module_ping_fn_t)(const orte_process_name_t *proc);
typedef void (*mca_oob_usock_module_send_nb_fn_t)(orte_rml_send_t *msg);
typedef void (*mca_oob_usock_module_ft_event_fn_t)(int state);
/* Table of module entry points. */
typedef struct {
mca_oob_usock_module_init_fn_t init;
mca_oob_usock_module_fini_fn_t finalize;
mca_oob_usock_module_accept_connection_fn_t accept_connection;
mca_oob_usock_module_ping_fn_t ping;
mca_oob_usock_module_send_nb_fn_t send_nb;
mca_oob_usock_module_ft_event_fn_t ft_event;
} mca_oob_usock_module_api_t;
/* Module instance: the API table plus the module's runtime state. */
typedef struct {
mca_oob_usock_module_api_t api;
opal_event_base_t *ev_base; /* event base for the module progress thread */
/* true while the module's own progress thread is expected to run */
bool ev_active;
opal_thread_t progress_thread;
opal_hash_table_t peers; // peer connection info
} mca_oob_usock_module_t;
/* The one module instance, defined in oob_usock.c. */
ORTE_MODULE_DECLSPEC extern mca_oob_usock_module_t mca_oob_usock_module;
/**
* the state of the connection
*
* Only the uses visible in oob_usock.c are described below; the meaning
* of CLOSED, RESOLVE and FAILED is not established there - TODO confirm
* against the connection code.
*/
typedef enum {
/* set when an inbound simultaneous connection is dropped so that the
* other end retries */
MCA_OOB_USOCK_UNCONNECTED,
MCA_OOB_USOCK_CLOSED,
MCA_OOB_USOCK_RESOLVE,
/* set just before a connection attempt is scheduled */
MCA_OOB_USOCK_CONNECTING,
/* treated like CONNECTING: a connection is already in progress */
MCA_OOB_USOCK_CONNECT_ACK,
/* messages are queued directly for sending in this state */
MCA_OOB_USOCK_CONNECTED,
MCA_OOB_USOCK_FAILED,
/* initial state of a peer created for an inbound connection */
MCA_OOB_USOCK_ACCEPTING
} mca_oob_usock_state_t;
/* module-level shared functions */
/* Event-callback style handlers (fd, event flags, callback data). */
ORTE_MODULE_DECLSPEC void mca_oob_usock_send_handler(int fd, short args, void *cbdata);
ORTE_MODULE_DECLSPEC void mca_oob_usock_recv_handler(int fd, short args, void *cbdata);
END_C_DECLS
#endif /* _MCA_OOB_USOCK_H_ */

84
orte/mca/oob/usock/oob_usock_common.c Обычный файл
Просмотреть файл

@ -0,0 +1,84 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/oob/usock/oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "oob_usock_peer.h"
#include "oob_usock_common.h"
/**
* Set socket buffering
*/
mca_oob_usock_peer_t* mca_oob_usock_peer_lookup(const orte_process_name_t *name)
{
mca_oob_usock_peer_t *peer;
uint64_t ui64;
memcpy(&ui64, (char*)name, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, ui64, (void**)&peer)) {
return NULL;
}
return peer;
}

554
orte/mca/oob/usock/oob_usock_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,554 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will not conflict with other error codes that
* are returned by these functions under UNIX/Linux environments
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/os_path.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/session_dir.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/oob/usock/oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "orte/mca/oob/usock/oob_usock_peer.h"
#include "orte/mca/oob/usock/oob_usock_connection.h"
#include "orte/mca/oob/usock/oob_usock_listener.h"
#include "orte/mca/oob/usock/oob_usock_ping.h"
/*
 * Forward declarations of the file-local functions that are installed
 * in the mca_oob_usock_component structure.
 */
static int usock_component_register(void);
static int usock_component_open(void);
static int usock_component_close(void);
static bool component_available(void);
static int component_startup(void);
static void component_shutdown(void);
static int component_send(orte_rml_send_t *msg);
static char* component_get_addr(void);
static int component_set_addr(orte_process_name_t *peer,
char **uris);
static bool component_is_reachable(orte_process_name_t *peer);
/*
 * The usock OOB component instance: MCA version information and the
 * open/close/register hooks, the component metadata, and then the
 * entries through which this transport is driven (availability check,
 * startup/shutdown, send, address handling and reachability).
 * The initializer is positional, so the order of the entries matters.
 */
mca_oob_usock_component_t mca_oob_usock_component = {
{
{
MCA_OOB_BASE_VERSION_2_0_0,
"usock", /* MCA module name */
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
usock_component_open, /* component open */
usock_component_close, /* component close */
NULL, /* component query */
usock_component_register, /* component register */
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
0, // reserve space for an assigned index
100, // default priority of this transport
component_available,
component_startup,
component_shutdown,
component_send,
component_get_addr,
component_set_addr,
component_is_reachable
},
};
/*
 * Component open hook. Nothing needs to be set up here, so this
 * simply reports success.
 */
static int usock_component_open(void)
{
return ORTE_SUCCESS;
}
/*
 * Component close hook. Nothing is released here, so this simply
 * reports success.
 */
static int usock_component_close(void)
{
return ORTE_SUCCESS;
}
/*
 * Register the MCA parameters of the usock component.
 *
 * Registers the integer parameter "peer_retries", which is stored in
 * mca_oob_usock_component.max_retries and defaults to 2. The value bounds
 * the number of connect() retries made for a peer, so the help text
 * describes it as such.
 *
 * @return ORTE_SUCCESS always; the result of the registration call is
 *         deliberately ignored.
 */
static int usock_component_register(void)
{
    mca_base_component_t *component = &mca_oob_usock_component.super.oob_base;

    /* register oob module parameters */
    mca_oob_usock_component.max_retries = 2;
    (void)mca_base_component_var_register(component, "peer_retries",
                                          "Number of times to retry connecting to a peer before giving up",
                                          MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
                                          OPAL_INFO_LVL_9,
                                          MCA_BASE_VAR_SCOPE_LOCAL,
                                          &mca_oob_usock_component.max_retries);
    return ORTE_SUCCESS;
}
/*
 * Report whether the usock transport may be used by this process.
 *
 * @return false when session directories are disabled, when the process
 *         is a tool, or when it is an application without a daemon URI
 *         (i.e. it was direct-launched); true otherwise.
 */
static bool component_available(void)
{
    bool usable;

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                        "oob:usock: component_available called");

    if (!orte_create_session_dirs) {
        /* the rendezvous point lives in the session directory */
        usable = false;
    } else if (ORTE_PROC_IS_TOOL) {
        /* tools may not use this component */
        usable = false;
    } else if (ORTE_PROC_IS_APP &&
               (NULL == orte_process_info.my_daemon_uri)) {
        /* direct-launched applications have no daemon to talk to */
        usable = false;
    } else {
        usable = true;
    }

    return usable;
}
/*
 * Start the usock component.
 *
 * Makes sure a session directory exists, records the path of the daemon
 * rendezvous socket in the component address, starts the listener when
 * running as a daemon or HNP, and finally initializes the module.
 *
 * @return the error from orte_session_dir() if that fails (nothing else
 *         is done in that case); otherwise the status of starting the
 *         listener, or ORTE_SUCCESS when no listener is started.
 */
static int component_startup(void)
{
    struct sockaddr_un *rendezvous = &mca_oob_usock_component.address;
    int status = ORTE_SUCCESS;

    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "%s USOCK STARTUP",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* create the session directory if nobody has done so yet */
    if (NULL == orte_process_info.top_session_dir) {
        status = orte_session_dir(true,
                                  orte_process_info.tmpdir_base,
                                  orte_process_info.nodename, NULL,
                                  ORTE_PROC_MY_NAME);
        if (ORTE_SUCCESS != status) {
            ORTE_ERROR_LOG(status);
            return status;
        }
    }

    /* build the path to the daemon rendezvous point */
    memset(rendezvous, 0, sizeof(struct sockaddr_un));
    rendezvous->sun_family = AF_UNIX;
    snprintf(rendezvous->sun_path,
             sizeof(rendezvous->sun_path)-1,
             "%s/%s/%s/0/%s", orte_process_info.tmpdir_base,
             orte_process_info.top_session_dir,
             ORTE_JOB_FAMILY_PRINT(ORTE_PROC_MY_NAME->jobid), "usock");
    opal_output_verbose(2, orte_oob_base_framework.framework_output,
                        "SUNPATH: %s", rendezvous->sun_path);

    /* daemons and the HNP own the rendezvous point, so they listen on it;
     * a failure is logged and reported, but the module is still started
     */
    if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
        status = orte_oob_usock_start_listening();
        if (ORTE_SUCCESS != status) {
            ORTE_ERROR_LOG(status);
        }
    }

    /* bring up the module */
    mca_oob_usock_module.api.init();

    return status;
}
static void component_shutdown(void)
{
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s USOCK SHUTDOWN",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
if (mca_oob_usock_component.listener_ev_active) {
opal_event_del(&mca_oob_usock_component.listener_event);
mca_oob_usock_component.listener_ev_active = false;
}
/* delete the rendezvous file */
unlink(mca_oob_usock_component.address.sun_path);
}
/* shutdown the module */
if (NULL != mca_oob_usock_module.api.finalize) {
mca_oob_usock_module.api.finalize();
}
}
/*
 * Hand a message to the usock module for non-blocking delivery.
 *
 * @param msg  message to send; msg->dst names the destination
 * @return ORTE_ERR_TAKE_NEXT_OPTION when this process is a daemon or HNP
 *         and the destination is unknown or not one of its local procs;
 *         ORTE_SUCCESS once the message has been passed to the module.
 */
static int component_send(orte_rml_send_t *msg)
{
    orte_proc_t *target;

    opal_output_verbose(5, orte_oob_base_framework.framework_output,
                        "%s oob:usock:send_nb to peer %s:%d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&msg->dst), msg->tag);

    /* daemons can only reach their own local procs this way */
    if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) {
        target = orte_get_proc_object(&msg->dst);
        if (NULL == target || !target->local_proc) {
            return ORTE_ERR_TAKE_NEXT_OPTION;
        }
    }

    /* applications may address anyone: the daemon routes the message
     * on to its final destination
     */
    mca_oob_usock_module.api.send_nb(msg);
    return ORTE_SUCCESS;
}
/*
 * Return a newly allocated copy of the rendezvous socket path.
 *
 * @return result of strdup() on the component's sun_path (NULL if the
 *         duplication fails); the returned string is heap-allocated.
 */
static char* component_get_addr(void)
{
    return strdup(mca_oob_usock_component.address.sun_path);
}
static int component_set_addr(orte_process_name_t *peer,
char **uris)
{
orte_proc_t *proc;
mca_oob_usock_peer_t *pr;
uint64_t *ui64;
/* if I am an application, then everything is addressable
* by me via my daemon
*/
if (ORTE_PROC_IS_APP) {
ui64 = (uint64_t*)peer;
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers,
(*ui64), (void**)&pr) || NULL == pr) {
pr = OBJ_NEW(mca_oob_usock_peer_t);
pr->name = *peer;
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr);
return ORTE_SUCCESS;
}
}
/* if I am a daemon or HNP, I can only reach my
* own local procs via this component
*/
if (ORTE_PROC_MY_NAME->jobid == peer->jobid) {
/* another daemon */
return ORTE_ERR_TAKE_NEXT_OPTION;
}
if (NULL == (proc = orte_get_proc_object(peer)) ||
!proc->local_proc) {
return ORTE_ERR_TAKE_NEXT_OPTION;
}
/* indicate that this peer is addressable by this component */
ui64 = (uint64_t*)peer;
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers,
(*ui64), (void**)&pr) || NULL == pr) {
pr = OBJ_NEW(mca_oob_usock_peer_t);
pr->name = *peer;
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr);
}
return ORTE_SUCCESS;
}
/*
 * Event callback: record in the OOB base peer table that the usock
 * component handles the peer carried by the operation object.
 *
 * @param fd      unused
 * @param args    unused
 * @param cbdata  mca_oob_usock_peer_op_t describing the peer; released
 *                before returning
 */
void mca_oob_usock_component_set_module(int fd, short args, void *cbdata)
{
    mca_oob_usock_peer_op_t *op = (mca_oob_usock_peer_op_t*)cbdata;
    orte_oob_base_peer_t *entry;
    uint64_t key;
    int status;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock:set_module called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer->name));

    /* the peer's name doubles as the 64-bit hash key */
    memcpy(&key, (char*)&(op->peer->name), sizeof(uint64_t));

    /* we run in the same event base as the OOB base, so its peer table
     * can be accessed directly; create the entry if there is none yet
     */
    if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
                                                         key, (void**)&entry) ||
        NULL == entry) {
        entry = OBJ_NEW(orte_oob_base_peer_t);
    }

    /* flag the peer as addressable by, and assigned to, this component */
    opal_bitmap_set_bit(&entry->addressable, mca_oob_usock_component.super.idx);
    entry->component = &mca_oob_usock_component.super;

    status = opal_hash_table_set_value_uint64(&orte_oob_base.peers, key, entry);
    if (OPAL_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
    }

    OBJ_RELEASE(op);
}
/*
 * Event callback: the connection to a peer was lost.
 *
 * Clears the peer's entry in the OOB base peer table and activates a
 * proc state for the peer: LIFELINE_LOST when orte_routed.route_lost()
 * does not return ORTE_SUCCESS, COMM_FAILED otherwise.
 *
 * @param fd      unused
 * @param args    unused
 * @param cbdata  mca_oob_usock_peer_op_t describing the peer; released
 *                before returning
 */
void mca_oob_usock_component_lost_connection(int fd, short args, void *cbdata)
{
    mca_oob_usock_peer_op_t *op = (mca_oob_usock_peer_op_t*)cbdata;
    uint64_t key;
    int status;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock:lost connection called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer->name));

    /* the peer's name doubles as the 64-bit hash key */
    memcpy(&key, (char*)&(op->peer->name), sizeof(uint64_t));

    /* mark the peer as no longer reachable in the OOB's table - for now
     * no attempt is made to shift it to another component
     */
    status = opal_hash_table_set_value_uint64(&orte_oob_base.peers, key, NULL);
    if (OPAL_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
    }

    /* activate the matching proc state */
    if (ORTE_SUCCESS == orte_routed.route_lost(&op->peer->name)) {
        ORTE_ACTIVATE_PROC_STATE(&op->peer->name, ORTE_PROC_STATE_COMM_FAILED);
    } else {
        ORTE_ACTIVATE_PROC_STATE(&op->peer->name, ORTE_PROC_STATE_LIFELINE_LOST);
    }

    OBJ_RELEASE(op);
}
/*
 * Event callback: a message could not be sent to its next hop.
 *
 * Clears the hop's entry in the OOB base peer table and hands the
 * message back to the OOB base so that it can be sent again.
 *
 * @param fd      unused
 * @param args    unused
 * @param cbdata  mca_oob_usock_msg_error_t carrying the hop and the
 *                message; released before returning
 */
void mca_oob_usock_component_cannot_send(int fd, short args, void *cbdata)
{
    mca_oob_usock_msg_error_t *err = (mca_oob_usock_msg_error_t*)cbdata;
    uint64_t key;
    int status;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock:unable to send to peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&err->hop));

    /* the hop's name doubles as the 64-bit hash key */
    memcpy(&key, (char*)&(err->hop), sizeof(uint64_t));

    /* mark the hop as no longer reachable in the OOB's table - for now
     * no attempt is made to shift it to another component
     */
    status = opal_hash_table_set_value_uint64(&orte_oob_base.peers, key, NULL);
    if (OPAL_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
    }

    /* let the OOB base try the send again */
    ORTE_OOB_SEND(err->rmsg);

    OBJ_RELEASE(err);
}
/*
 * Event callback: a connection attempt to a peer failed.
 *
 * Unless the process is already terminating, activates a proc state for
 * the peer: LIFELINE_LOST when orte_routed.route_lost() does not return
 * ORTE_SUCCESS, COMM_FAILED otherwise.
 *
 * @param fd      unused
 * @param args    unused
 * @param cbdata  mca_oob_usock_peer_op_t describing the peer; released
 *                before returning
 */
void mca_oob_usock_component_failed_to_connect(int fd, short args, void *cbdata)
{
    mca_oob_usock_peer_op_t *op = (mca_oob_usock_peer_op_t*)cbdata;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock:failed_to_connect called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&op->peer->name));

    /* only react if we are not on our way out anyway */
    if (!(orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered)) {
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s usock:failed_to_connect unable to reach peer %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&op->peer->name));

        /* raise the alarm if this was a lifeline */
        if (ORTE_SUCCESS == orte_routed.route_lost(&op->peer->name)) {
            ORTE_ACTIVATE_PROC_STATE(&op->peer->name, ORTE_PROC_STATE_COMM_FAILED);
        } else {
            ORTE_ACTIVATE_PROC_STATE(&op->peer->name, ORTE_PROC_STATE_LIFELINE_LOST);
        }
    }

    OBJ_RELEASE(op);
}
/*
 * Report whether a peer can be reached through this component.
 *
 * @param peer  name of the peer in question
 * @return true for every peer when this process is an application;
 *         for a daemon or HNP, true only when the peer belongs to a
 *         different job and is a known local proc.
 */
static bool component_is_reachable(orte_process_name_t *peer)
{
    orte_proc_t *target;

    /* applications reach everything via their daemon */
    if (ORTE_PROC_IS_APP) {
        return true;
    }

    /* a peer in our own job is another daemon */
    if (ORTE_PROC_MY_NAME->jobid == peer->jobid) {
        return false;
    }

    /* daemons and the HNP reach only their own local procs */
    target = orte_get_proc_object(peer);
    return (NULL != target && target->local_proc);
}
/*
 * Translate a peer connection state into a printable label.
 *
 * @param state  connection state
 * @return pointer to a string literal naming the state ("UNKNOWN" for
 *         unrecognized values); the string must not be modified or freed
 */
char* mca_oob_usock_state_print(mca_oob_usock_state_t state)
{
    char *label;

    switch (state) {
    case MCA_OOB_USOCK_UNCONNECTED:
        label = "UNCONNECTED";
        break;
    case MCA_OOB_USOCK_CLOSED:
        label = "CLOSED";
        break;
    case MCA_OOB_USOCK_RESOLVE:
        label = "RESOLVE";
        break;
    case MCA_OOB_USOCK_CONNECTING:
        label = "CONNECTING";
        break;
    case MCA_OOB_USOCK_CONNECT_ACK:
        label = "ACK";
        break;
    case MCA_OOB_USOCK_CONNECTED:
        label = "CONNECTED";
        break;
    case MCA_OOB_USOCK_FAILED:
        label = "FAILED";
        break;
    default:
        label = "UNKNOWN";
        break;
    }

    return label;
}
/*
 * Look a peer up in the module's peer table.
 *
 * @param name  process name; its first 64 bits serve as the hash key
 * @return the value stored for that key, or NULL when the lookup fails
 */
mca_oob_usock_peer_t* mca_oob_usock_peer_lookup(const orte_process_name_t *name)
{
    mca_oob_usock_peer_t *found = NULL;
    uint64_t key;
    int status;

    memcpy(&key, (char*)name, sizeof(uint64_t));
    status = opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, key, (void**)&found);

    return (OPAL_SUCCESS == status) ? found : NULL;
}
/* OOB USOCK Class instances */
/*
 * Constructor for mca_oob_usock_peer_t: no socket (sd == -1), state
 * UNCONNECTED, zero connection retries, an empty send queue, no message
 * in flight in either direction and no active events.
 *
 * The retry counter is set here explicitly because the connection code
 * increments and compares it without ever assigning it first.
 */
static void peer_cons(mca_oob_usock_peer_t *peer)
{
    peer->sd = -1;
    peer->state = MCA_OOB_USOCK_UNCONNECTED;
    peer->retries = 0;
    OBJ_CONSTRUCT(&peer->send_queue, opal_list_t);
    peer->send_msg = NULL;
    peer->recv_msg = NULL;
    peer->send_ev_active = false;
    peer->recv_ev_active = false;
    peer->timer_ev_active = false;
}
/*
 * Destructor for mca_oob_usock_peer_t: closes the socket when the peer
 * holds a descriptor and tears down the send queue.
 */
static void peer_des(mca_oob_usock_peer_t *peer)
{
    /* a negative descriptor means there is no socket to close */
    if (peer->sd >= 0) {
        CLOSE_THE_SOCKET(peer->sd);
    }

    OPAL_LIST_DESTRUCT(&peer->send_queue);
}
/* peers are list items and use the constructor/destructor defined above */
OBJ_CLASS_INSTANCE(mca_oob_usock_peer_t,
opal_list_item_t,
peer_cons, peer_des);
/* the operation/ping objects below are plain opal objects that need
 * neither a constructor nor a destructor
 */
OBJ_CLASS_INSTANCE(mca_oob_usock_peer_op_t,
opal_object_t,
NULL, NULL);
OBJ_CLASS_INSTANCE(mca_oob_usock_msg_op_t,
opal_object_t,
NULL, NULL);
OBJ_CLASS_INSTANCE(mca_oob_usock_conn_op_t,
opal_object_t,
NULL, NULL);
OBJ_CLASS_INSTANCE(mca_oob_usock_ping_t,
opal_object_t,
NULL, NULL);

69
orte/mca/oob/usock/oob_usock_component.h Обычный файл
Просмотреть файл

@ -0,0 +1,69 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_COMPONENT_H_
#define _MCA_OOB_USOCK_COMPONENT_H_
#include "orte_config.h"
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#include "opal/class/opal_bitmap.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_pointer_array.h"
#include "orte/mca/oob/oob.h"
#include "oob_usock_listener.h"
#include "oob_usock_peer.h"
#include "oob_usock.h"
/**
 * OOB USOCK Component
 *
 * Component-level state of the unix domain socket OOB transport: the
 * base component, the retry limit, the rendezvous address and the
 * listener bookkeeping.
 */
typedef struct {
mca_oob_base_component_t super; /**< base OOB component */
int max_retries; /**< max number of retries before declaring peer gone (set via the "peer_retries" MCA parameter) */
struct sockaddr_un address; /**< address of our rendezvous point */
/* connection support */
opal_event_t listener_event; /**< my listener event */
bool listener_ev_active; /**< true while listener_event is registered; checked before the event is deleted at shutdown */
int listener_socket; /**< presumably the descriptor the listener accepts on - TODO confirm against the listener code */
} mca_oob_usock_component_t;
/** The single instance of the usock component */
ORTE_MODULE_DECLSPEC extern mca_oob_usock_component_t mca_oob_usock_component;
/** Return a printable label for a peer connection state */
ORTE_MODULE_DECLSPEC char* mca_oob_usock_state_print(mca_oob_usock_state_t state);
/** Event callback: record in the OOB base that this component handles the peer */
ORTE_MODULE_DECLSPEC void mca_oob_usock_component_set_module(int fd, short args, void *cbdata);
/** Event callback: the connection to a peer was lost */
ORTE_MODULE_DECLSPEC void mca_oob_usock_component_lost_connection(int fd, short args, void *cbdata);
/** Event callback: a connection attempt to a peer failed */
ORTE_MODULE_DECLSPEC void mca_oob_usock_component_failed_to_connect(int fd, short args, void *cbdata);
/** Look a peer up by name in the module's peer table; NULL when the lookup fails */
ORTE_MODULE_DECLSPEC mca_oob_usock_peer_t* mca_oob_usock_peer_lookup(const orte_process_name_t *name);
/** Event callback: a message could not be sent; it is handed back to the OOB base */
ORTE_MODULE_DECLSPEC void mca_oob_usock_component_cannot_send(int fd, short args, void *cbdata);
#endif /* _MCA_OOB_USOCK_COMPONENT_H_ */

755
orte/mca/oob/usock/oob_usock_connection.c Обычный файл
Просмотреть файл

@ -0,0 +1,755 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include "opal/types.h"
#include "opal_stdint.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/mca/base/mca_base_var.h"
#include "opal/util/output.h"
#include "opal/util/net.h"
#include "opal/util/error.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/event/event.h"
#include "orte/util/name_fns.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/runtime/orte_wait.h"
#include "oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "orte/mca/oob/usock/oob_usock_peer.h"
#include "orte/mca/oob/usock/oob_usock_connection.h"
/*
 * Forward declarations of the file-local connection helpers.
 */
static void usock_peer_event_init(mca_oob_usock_peer_t* peer);
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer);
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
void* data, size_t size);
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
void* data, size_t size);
static void usock_peer_connected(mca_oob_usock_peer_t* peer);
/*
 * Create the unix domain socket used to reach a peer (unless the peer
 * already has one), bind the peer's send/recv events to it and switch
 * it to non-blocking mode.
 *
 * @param peer  peer whose sd field receives the new descriptor
 * @return ORTE_SUCCESS when the peer has a socket on return,
 *         ORTE_ERR_UNREACH when socket() fails
 *
 * A failure to set O_NONBLOCK is reported but is not treated as an error.
 */
static int usock_peer_create_socket(mca_oob_usock_peer_t* peer)
{
    int flags;

    /* "no socket" is represented by a negative descriptor; 0 is a valid
     * descriptor and must not be mistaken for "unset"
     */
    if (peer->sd >= 0) {
        return ORTE_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output,
                         "%s oob:usock:peer creating socket to %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&(peer->name))));

    peer->sd = socket(PF_UNIX, SOCK_STREAM, 0);
    if (peer->sd < 0) {
        opal_output(0, "%s-%s usock_peer_create_socket: socket() failed: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        return ORTE_ERR_UNREACH;
    }

    /* setup event callbacks */
    usock_peer_event_init(peer);

    /* setup the socket as non-blocking */
    if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
        opal_output(0, "%s-%s usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    } else {
        flags |= O_NONBLOCK;
        if (fcntl(peer->sd, F_SETFL, flags) < 0) {
            opal_output(0, "%s-%s usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)),
                        strerror(opal_socket_errno),
                        opal_socket_errno);
        }
    }

    return ORTE_SUCCESS;
}
/*
 * Try connecting to a peer
 *
 * Event callback. cbdata is an mca_oob_usock_conn_op_t naming the peer;
 * it is released on every path out of this function.
 *
 * The peer's socket is created if necessary and connect() is issued
 * against the component's rendezvous address. Three outcomes exist:
 *  - the connect is still in progress: the send event is armed and the
 *    function returns;
 *  - the connect succeeded: the recv event is armed and our identifier
 *    is sent to the peer, moving the peer to the CONNECT_ACK state;
 *  - anything else: the peer is marked FAILED and the component is told
 *    that the connection could not be made.
 */
void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata)
{
mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata;
mca_oob_usock_peer_t *peer = op->peer;
int rc;
opal_socklen_t addrlen = 0;
mca_oob_usock_send_t *snd;
bool connected = false;
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s orte_usock_peer_try_connect: "
"attempting to connect to proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
rc = usock_peer_create_socket(peer);
if (ORTE_SUCCESS != rc) {
/* FIXME: we cannot create a USOCK socket - report
* back to the component that this peer is
* unreachable so it can remove the peer
* from its list and report back to the base
* NOTE: this could be a reconnect attempt,
* so we also need to mark any queued messages
* and return them as "unreachable"
*/
opal_output(0, "%s CANNOT CREATE SOCKET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_FORCED_TERMINATE(1);
OBJ_RELEASE(op);
return;
}
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s orte_usock_peer_try_connect: "
"attempting to connect to proc %s on socket %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)), peer->sd);
addrlen = sizeof(struct sockaddr_un);
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s orte_usock_peer_try_connect: "
"attempting to connect to proc %s - %d retries",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
peer->retries);
/* every pass through this label counts as one attempt */
retry_connect:
peer->retries++;
if (connect(peer->sd, (struct sockaddr *) &mca_oob_usock_component.address, addrlen) < 0) {
/* non-blocking so wait for completion */
if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s waiting for connect completion to %s - activating send event",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
/* just ensure the send_event is active */
if (!peer->send_ev_active) {
opal_event_add(&peer->send_event, 0);
peer->send_ev_active = true;
}
OBJ_RELEASE(op);
return;
}
/* Some kernels (Linux 2.6) will automatically software
abort a connection that was ECONNREFUSED on the last
attempt, without even trying to establish the
connection. Handle that case in a semi-rational
way by trying twice before giving up */
if (ECONNABORTED == opal_socket_errno) {
if (peer->retries < mca_oob_usock_component.max_retries) {
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s connection aborted by OS to %s - retrying",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
goto retry_connect;
} else {
/* We were unsuccessful in establishing this connection, and are
* not likely to suddenly become successful,
*/
peer->state = MCA_OOB_USOCK_FAILED;
connected = false;
}
}
/* any other errno leaves "connected" false and is handled below */
} else {
/* connection succeeded */
peer->retries = 0;
connected = true;
}
if (!connected) {
/* we cannot reach this peer */
peer->state = MCA_OOB_USOCK_FAILED;
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s orte_usock_peer_try_connect: "
"Connection across unix domain socket to local proc %s failed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
/* let the USOCK component know that this module failed to make
* the connection so it can try other modules, and/or fail back
* to the OOB level so another component can try. This will activate
* an event in the component event base, and so it will fire async
* from us if we are in our own progress thread
*/
ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect);
/* FIXME: post any messages in the send queue back to the OOB
* level for reassignment
*/
/* NOTE(review): the two statements below are placeholders - the
* in-flight message is left alone and queued messages are removed
* from the list without being handed anywhere
*/
if (NULL != peer->send_msg) {
}
while (NULL != (snd = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue))) {
}
goto cleanup;
}
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s orte_usock_peer_try_connect: "
"Connection across to proc %s succeeded",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
/* setup our recv to catch the return ack call */
if (!peer->recv_ev_active) {
opal_event_add(&peer->recv_event, 0);
peer->recv_ev_active = true;
}
/* send our globally unique process identifier to the peer */
if (ORTE_SUCCESS == (rc = usock_peer_send_connect_ack(peer))) {
peer->state = MCA_OOB_USOCK_CONNECT_ACK;
} else {
opal_output(0,
"%s orte_usock_peer_try_connect: "
"usock_peer_send_connect_ack to proc %s failed: %s (%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
opal_strerror(rc), rc);
ORTE_FORCED_TERMINATE(1);
}
cleanup:
OBJ_RELEASE(op);
}
/*
 * Send the connection handshake to a peer: a header of type
 * MCA_OOB_USOCK_IDENT that carries our own process name.
 *
 * The header is zeroed before it is filled in so that no uninitialized
 * stack bytes (padding or fields not assigned here) are written to the
 * socket; the receiving side zeroes its header in the same way.
 *
 * @param peer  peer to send the handshake to
 * @return ORTE_SUCCESS, or ORTE_ERR_UNREACH if the send failed
 */
static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer)
{
    mca_oob_usock_hdr_t hdr;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));

    /* send a handshake that includes our process identifier
     * to ensure we are talking to another OMPI process
     */
    memset(&hdr, 0, sizeof(hdr));
    hdr.dst = *ORTE_PROC_MY_NAME;
    hdr.type = MCA_OOB_USOCK_IDENT;
    hdr.tag = 0;
    hdr.nbytes = 0;

    if (ORTE_SUCCESS != usock_peer_send_blocking(peer, &hdr, sizeof(hdr))) {
        ORTE_ERROR_LOG(ORTE_ERR_UNREACH);
        return ORTE_ERR_UNREACH;
    }
    return ORTE_SUCCESS;
}
/*
 * Initialize events to be used by the peer instance for USOCK select/poll callbacks.
 *
 * Binds the peer's persistent recv and send events to its current
 * socket. Nothing is done when the peer has no socket. Neither event is
 * left registered on return.
 *
 * An event that is still registered is removed BEFORE it is re-assigned:
 * re-initializing an event while it is added to an event base is not
 * safe.
 */
static void usock_peer_event_init(mca_oob_usock_peer_t* peer)
{
    if (peer->sd < 0) {
        return;
    }

    /* receive side */
    if (peer->recv_ev_active) {
        opal_event_del(&peer->recv_event);
        peer->recv_ev_active = false;
    }
    opal_event_set(mca_oob_usock_module.ev_base,
                   &peer->recv_event,
                   peer->sd,
                   OPAL_EV_READ|OPAL_EV_PERSIST,
                   mca_oob_usock_recv_handler,
                   peer);
    opal_event_set_priority(&peer->recv_event, ORTE_MSG_PRI);

    /* send side */
    if (peer->send_ev_active) {
        opal_event_del(&peer->send_event);
        peer->send_ev_active = false;
    }
    opal_event_set(mca_oob_usock_module.ev_base,
                   &peer->send_event,
                   peer->sd,
                   OPAL_EV_WRITE|OPAL_EV_PERSIST,
                   mca_oob_usock_send_handler,
                   peer);
    opal_event_set_priority(&peer->send_event, ORTE_MSG_PRI);
}
/*
 * Finish a non-blocking connect.
 *
 * Reads SO_ERROR from the peer's socket. If the connect is still in
 * progress the function simply returns; if it failed the peer is
 * closed. Otherwise our identifier is sent to the peer, the peer moves
 * to the CONNECT_ACK state and its recv event is armed to catch the
 * reply.
 */
void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t *peer)
{
    int sock_err = 0;
    opal_socklen_t optlen = sizeof(sock_err);

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:usock:complete_connect called for peer %s on socket %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name), peer->sd);

    /* fetch the outcome of the connect */
    if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&sock_err, &optlen) < 0) {
        opal_output(0, "%s usock_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return;
    }

    switch (sock_err) {
    case 0:
        /* connected - carry on with the handshake below */
        break;

    case EINPROGRESS:
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s:usock:send:handler still in progress",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        return;

    case ECONNREFUSED:
    case ETIMEDOUT:
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s-%s usock_peer_complete_connect: connection failed: %s (%d)",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)),
                            strerror(sock_err),
                            sock_err);
        mca_oob_usock_peer_close(peer);
        return;

    default:
        /* any other failure: nothing to propagate, we return regardless */
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s-%s usock_peer_complete_connect: "
                            "connection failed with error %d",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)), sock_err);
        mca_oob_usock_peer_close(peer);
        return;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock_peer_complete_connect: "
                        "sending ack to %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));

    if (ORTE_SUCCESS != usock_peer_send_connect_ack(peer)) {
        opal_output(0, "%s usock_peer_complete_connect: unable to send connect ack to %s",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)));
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return;
    }

    /* handshake sent - now wait for the peer's answer */
    peer->state = MCA_OOB_USOCK_CONNECT_ACK;
    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock_peer_complete_connect: "
                        "setting read event on connection to %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));
    if (!peer->recv_ev_active) {
        opal_event_add(&peer->recv_event, 0);
        peer->recv_ev_active = true;
    }
}
/*
 * Send a buffer in full over the peer's (non-blocking) socket, retrying
 * until every byte has been written. Used for the small connection
 * handshake.
 *
 * @param peer  peer to send to
 * @param data  start of the buffer
 * @param size  number of bytes to send
 * @return ORTE_SUCCESS once all bytes are sent; ORTE_ERR_UNREACH if
 *         send() fails with anything other than EINTR, EAGAIN or
 *         EWOULDBLOCK, in which case the peer has been closed
 */
static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer,
                                    void* data, size_t size)
{
    unsigned char *base = (unsigned char*)data;
    size_t done;
    int nsent;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s sending connect-ack to %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));

    for (done = 0; done < size; ) {
        nsent = send(peer->sd, (char*)base+done, size-done, 0);
        if (nsent >= 0) {
            done += nsent;
            continue;
        }

        /* transient conditions: simply try again */
        if (opal_socket_errno == EINTR ||
            opal_socket_errno == EAGAIN ||
            opal_socket_errno == EWOULDBLOCK) {
            continue;
        }

        /* a real failure: give up on this peer */
        opal_output(0, "%s usock_peer_send_blocking: send() to %s failed: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return ORTE_ERR_UNREACH;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s connect-ack sent to %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));
    return ORTE_SUCCESS;
}
/*
 * Receive the handshake header from a newly connected socket and check
 * it. On success the component is asked to record that it handles this
 * peer and the peer is moved to the connected state.
 *
 * @param peer  peer whose socket is read
 * @return ORTE_SUCCESS when the handshake is accepted; ORTE_ERR_UNREACH
 *         when the header cannot be received, when the peer was not
 *         waiting for an ack, or when the header type is not
 *         MCA_OOB_USOCK_IDENT (the peer is closed in the latter two cases)
 */
int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer)
{
    mca_oob_usock_hdr_t ack;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s RECV CONNECT ACK FROM %s ON SOCKET %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name), peer->sd);

    /* start from a clean header */
    memset(&ack, 0, sizeof(ack));

    if (!usock_peer_recv_blocking(peer, &ack, sizeof(ack))) {
        /* the header could not be read in full */
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s unable to complete recv of connect-ack from %s ON SOCKET %d",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&peer->name), peer->sd);
        return ORTE_ERR_UNREACH;
    }

    /* an ack is only expected while we are waiting for one */
    if (MCA_OOB_USOCK_CONNECT_ACK != peer->state) {
        opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&peer->name), peer->sd);
        mca_oob_usock_peer_close(peer);
        return ORTE_ERR_UNREACH;
    }

    /* the handshake must be an identification header */
    if (MCA_OOB_USOCK_IDENT != ack.type) {
        opal_output(0, "usock_peer_recv_connect_ack: invalid header type: %d\n", ack.type);
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return ORTE_ERR_UNREACH;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s connect-ack recvd from %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name));

    /* have the component record, in its own and the OOB-level tables,
     * that this peer is known and handled here
     */
    ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_set_module);

    /* the connection is now established */
    usock_peer_connected(peer);
    if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
        mca_oob_usock_peer_dump(peer, "connected");
    }

    return ORTE_SUCCESS;
}
/*
 * Mark a peer as connected: stop its timer event if one is running,
 * update the route to it, and arm the send event when a message is
 * waiting to go out.
 */
static void usock_peer_connected(mca_oob_usock_peer_t* peer)
{
    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s-%s usock_peer_connected on socket %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)), peer->sd);

    if (peer->timer_ev_active) {
        opal_event_del(&peer->timer_event);
        peer->timer_ev_active = false;
    }
    peer->state = MCA_OOB_USOCK_CONNECTED;

    /* the peer is reached directly */
    orte_routed.update_route(&peer->name, &peer->name);

    /* pick up the first queued message if none is in flight */
    if (NULL == peer->send_msg) {
        peer->send_msg = (mca_oob_usock_send_t*)
            opal_list_remove_first(&peer->send_queue);
    }

    /* nothing to send, or the send event is already armed */
    if (NULL == peer->send_msg || peer->send_ev_active) {
        return;
    }
    opal_event_add(&peer->send_event, 0);
    peer->send_ev_active = true;
}
/*
 * Remove any event registrations associated with the socket
 * and update the peer state to reflect the connection has
 * been closed.
 *
 * The recv/send events are removed before the descriptor is closed, and
 * peer->sd is reset to -1 afterwards. Without the reset the stale
 * descriptor number would be closed a second time by the peer
 * destructor (by then it may belong to an unrelated file) and would make
 * the peer look as if it still owned a socket.
 *
 * The component is then notified of the lost connection.
 */
void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer)
{
    mca_oob_usock_send_t *snd;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock_peer_close for %s sd %d state %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)),
                        peer->sd, mca_oob_usock_state_print(peer->state));

    peer->state = MCA_OOB_USOCK_CLOSED;

    /* stop watching the descriptor before it goes away */
    if (peer->recv_ev_active) {
        opal_event_del(&peer->recv_event);
        peer->recv_ev_active = false;
    }
    if (peer->send_ev_active) {
        opal_event_del(&peer->send_event);
        peer->send_ev_active = false;
    }

    /* release the socket, and forget the descriptor so that it cannot be
     * closed or reused by mistake later on
     */
    if (0 <= peer->sd) {
        close(peer->sd);
        peer->sd = -1;
    }

    /* inform the component-level that we have lost a connection so
     * it can decide what to do about it.
     */
    ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_lost_connection);

    if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) {
        /* nothing more to do */
        return;
    }

    /* FIXME: push any queued messages back onto the OOB for retry - note that
     * this must be done after the prior call to ensure that the component
     * processes the "lost connection" notice before the OOB begins to
     * handle these recycled messages. This prevents us from unintentionally
     * attempting to send the message again across the now-failed interface
     *
     * NOTE(review): until that is implemented the in-flight message is
     * left untouched and queued messages are merely removed from the list.
     */
    if (NULL != peer->send_msg) {
    }
    while (NULL != (snd = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue))) {
    }
}
/*
 * Receive exactly "size" bytes from the peer's (non-blocking) socket,
 * spinning on transient conditions until everything has arrived. Used
 * for the small amount of connection information exchanged during the
 * handshake.
 *
 * Returns true once all bytes have been stored in "data". Returns false
 * if the peer closed the connection or a hard error occurred. On a hard
 * error the peer is closed, EXCEPT when the peer is still in the
 * CONNECT_ACK state: in that case the socket is left alone so the
 * caller can decide whether to retry the connection.
 */
static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer,
                                     void* data, size_t size)
{
    unsigned char *buf = (unsigned char*)data;
    size_t nrecvd = 0;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s waiting for connect ack from %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));

    while (nrecvd < size) {
        int rc = recv(peer->sd, (char *)buf + nrecvd, size - nrecvd, 0);

        /* common case: some data arrived */
        if (0 < rc) {
            nrecvd += rc;
            continue;
        }

        /* orderly shutdown by the remote side */
        if (0 == rc) {
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s-%s usock_peer_recv_blocking: "
                                "peer closed connection: peer state %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)),
                                peer->state);
            mca_oob_usock_peer_close(peer);
            return false;
        }

        /* rc < 0: the socket is non-blocking, so "try again" style
         * conditions simply mean we go around the loop once more */
        if (EINTR == opal_socket_errno ||
            EAGAIN == opal_socket_errno ||
            EWOULDBLOCK == opal_socket_errno) {
            continue;
        }

        if (MCA_OOB_USOCK_CONNECT_ACK == peer->state) {
            /* A failure while waiting for the ack may just mean the
             * remote side could not move our connection out of its
             * listen backlog; the first symptom is typically a
             * "connection reset by peer" on this receive. Leave the
             * peer in CONNECT_ACK and propagate the error to the
             * caller, which will try to establish the connection again.
             */
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s connect ack received error %s from %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                strerror(opal_socket_errno),
                                ORTE_NAME_PRINT(&(peer->name)));
            return false;
        }

        /* unrecoverable error in any other state */
        opal_output(0,
                    "%s usock_peer_recv_blocking: "
                    "recv() failed for %s: %s (%d)\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return false;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s connect ack received from %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)));
    return true;
}
/*
 * Routine for debugging to print the connection state and socket options.
 *
 * peer - peer whose socket (peer->sd) is inspected
 * msg  - caller-supplied label included in the output line
 *
 * Emits one line containing our name, the peer's name, the label, a
 * "nodelay" value and the descriptor's file status flags. Unix domain
 * sockets have no NODELAY option, so that value is always reported as 0;
 * it is retained only so the format of the output line is unchanged.
 * If the flags cannot be read, the failure is reported and the (negative)
 * fcntl() result is printed in their place.
 */
void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg)
{
    char buff[255];
    const int nodelay = 0;
    int flags;

    if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) {
        opal_output(0, "usock_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n",
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }

    snprintf(buff, sizeof(buff), "%s-%s %s: nodelay %d flags %08x\n",
             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
             ORTE_NAME_PRINT(&(peer->name)),
             msg, nodelay, (unsigned int)flags);
    opal_output(0, "%s", buff);
}
/*
 * Accept an incoming connection from a peer, unless we already hold an
 * established connection to it.
 *
 * Returns true when the connection was accepted and the peer is ready
 * for traffic; false when the request was ignored (already connected)
 * or the connect-ack could not be sent (the peer is then closed).
 */
bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer)
{
    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s usock:peer_accept called for peer %s in state %s on socket %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name),
                        mca_oob_usock_state_print(peer->state), peer->sd);

    /* an already-connected peer keeps its existing connection */
    if (MCA_OOB_USOCK_CONNECTED == peer->state) {
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s usock:peer_accept ignored for peer %s in state %s on socket %d",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&peer->name),
                            mca_oob_usock_state_print(peer->state), peer->sd);
        return false;
    }

    usock_peer_event_init(peer);

    /* answer the remote side's handshake */
    if (ORTE_SUCCESS != usock_peer_send_connect_ack(peer)) {
        opal_output(0, "%s-%s usock_peer_accept: "
                    "usock_peer_send_connect_ack failed\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)));
        peer->state = MCA_OOB_USOCK_FAILED;
        mca_oob_usock_peer_close(peer);
        return false;
    }

    /* set the peer into the component and OOB-level peer tables to indicate
     * that we know this peer and we will be handling him
     */
    ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_set_module);

    usock_peer_connected(peer);

    /* start listening for traffic from the peer */
    if (!peer->recv_ev_active) {
        opal_event_add(&peer->recv_event, 0);
        peer->recv_ev_active = true;
    }

    /* a message on deck needs the send event armed */
    if (NULL != peer->send_msg && !peer->send_ev_active) {
        opal_event_add(&peer->send_event, 0);
        peer->send_ev_active = true;
    }

    if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) {
        mca_oob_usock_peer_dump(peer, "accepted");
    }
    return true;
}

101
orte/mca/oob/usock/oob_usock_connection.h Обычный файл
Просмотреть файл

@ -0,0 +1,101 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_CONNECTION_H_
#define _MCA_OOB_USOCK_CONNECTION_H_
#include "orte_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#include "oob_usock.h"
#include "oob_usock_peer.h"
/* State machine for connection operations.
 *
 * Caddy object that carries a peer through an event callback. The
 * ORTE_*_USOCK_*_STATE macros in this header allocate one with OBJ_NEW
 * and hand it to the callback as its cbdata argument.
 */
typedef struct {
opal_object_t super;
/* peer the operation applies to */
mca_oob_usock_peer_t *peer;
/* event used to schedule the callback */
opal_event_t ev;
} mca_oob_usock_conn_op_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_conn_op_t);
/* Shut down both directions of a socket and then close its descriptor.
 * The argument is evaluated exactly once, so expressions with side
 * effects (e.g. fds[i++]) are safe to pass.
 */
#define CLOSE_THE_SOCKET(socket)                              \
    do {                                                      \
        int close_the_socket_fd_ = (socket);                  \
        shutdown(close_the_socket_fd_, SHUT_RDWR);            \
        close(close_the_socket_fd_);                          \
    } while(0)
/* Schedule a connection operation for peer "p".
 *
 * Allocates a mca_oob_usock_conn_op_t, points it at the peer, and
 * immediately activates an event on mca_oob_usock_module.ev_base (at
 * ORTE_MSG_PRI) so that "cbfunc" is invoked with the op object as its
 * cbdata argument.
 *
 * NOTE(review): the op object is not released here - presumably cbfunc
 * is responsible for that; confirm against the callbacks.
 * NOTE(review): the expansion already ends in "while(0);", i.e. it
 * supplies its own terminating semicolon - beware when using it as the
 * unbraced body of an if/else.
 */
#define ORTE_ACTIVATE_USOCK_CONN_STATE(p, cbfunc) \
do { \
mca_oob_usock_conn_op_t *cop; \
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
"%s:[%s:%d] connect to %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__, \
ORTE_NAME_PRINT((&(p)->name))); \
cop = OBJ_NEW(mca_oob_usock_conn_op_t); \
cop->peer = (p); \
opal_event_set(mca_oob_usock_module.ev_base, &cop->ev, -1, \
OPAL_EV_WRITE, (cbfunc), cop); \
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
opal_event_active(&cop->ev, OPAL_EV_WRITE, 1); \
} while(0);
/* Register a read event on descriptor "s" so that "cbfunc" is invoked,
 * with a freshly allocated mca_oob_usock_conn_op_t as cbdata, once the
 * descriptor becomes readable. The event is added on
 * mca_oob_usock_module.ev_base at ORTE_MSG_PRI.
 *
 * NOTE(review): parameter "a" is not referenced by the expansion, and
 * cop->peer is not assigned here (it keeps whatever value the object's
 * constructor gives it) - confirm this is intended.
 * NOTE(review): the expansion ends in "while(0);" and so carries its
 * own terminating semicolon.
 */
#define ORTE_ACTIVATE_USOCK_ACCEPT_STATE(s, a, cbfunc) \
do { \
mca_oob_usock_conn_op_t *cop; \
cop = OBJ_NEW(mca_oob_usock_conn_op_t); \
opal_event_set(mca_oob_usock_module.ev_base, &cop->ev, s, \
OPAL_EV_READ, (cbfunc), cop); \
opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \
opal_event_add(&cop->ev, 0); \
} while(0);
/* Schedule a delayed connection retry for peer "p".
 *
 * Allocates a mca_oob_usock_conn_op_t pointing at the peer and arms a
 * timer event on mca_oob_usock_module.ev_base; "cbfunc" is invoked with
 * the op object as cbdata after the interval "tv" has elapsed.
 *
 * NOTE(review): unlike the other activation macros, no event priority
 * is set here.
 * NOTE(review): the expansion ends in "while(0);" and so carries its
 * own terminating semicolon.
 */
#define ORTE_RETRY_USOCK_CONN_STATE(p, cbfunc, tv) \
do { \
mca_oob_usock_conn_op_t *cop; \
opal_output_verbose(5, orte_oob_base_framework.framework_output, \
"%s:[%s:%d] retry connect to %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__, \
ORTE_NAME_PRINT((&(p)->name))); \
cop = OBJ_NEW(mca_oob_usock_conn_op_t); \
cop->peer = (p); \
opal_event_evtimer_set(mca_oob_usock_module.ev_base, \
&cop->ev, \
(cbfunc), cop); \
opal_event_evtimer_add(&cop->ev, (tv)); \
} while(0);
/* Event-callback entry point (fd, flags, cbdata signature) for attempting
 * a connection - presumably cbdata is a mca_oob_usock_conn_op_t; confirm
 * against the implementation. */
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata);
/* Debug helper: print the peer's socket flags, labelled with "msg". */
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg);
/* Accept an incoming connection unless the peer is already connected;
 * returns true if the connection was accepted. */
ORTE_MODULE_DECLSPEC bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer);
/* Called from the send handler while the peer is in the CONNECTING or
 * CLOSED state to continue the connection procedure. */
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t* peer);
/* Receive the connect-ack from the peer; returns ORTE_SUCCESS once the
 * peer has been marked connected, ORTE_ERR_UNREACH on failure. */
ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer);
/* Close the peer's socket, mark the peer CLOSED and notify the
 * component of the lost connection. */
ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer);
#endif /* _MCA_OOB_USOCK_CONNECTION_H_ */

53
orte/mca/oob/usock/oob_usock_hdr.h Обычный файл
Просмотреть файл

@ -0,0 +1,53 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_HDR_H_
#define _MCA_OOB_USOCK_HDR_H_
#include "orte_config.h"
/* define several internal-only message
* types this component uses for its own
* handshake operations, plus one indicating
* the message came from an external (to
* this component) source
*/
typedef enum {
MCA_OOB_USOCK_IDENT,
MCA_OOB_USOCK_PROBE,
MCA_OOB_USOCK_PING,
MCA_OOB_USOCK_USER
} mca_oob_usock_msg_type_t;
/* Header that precedes every usock message on the wire.
 * NOTE(review): fields appear to be sent in host representation, which
 * is reasonable for a node-local transport - confirm in the send/recv code.
 */
typedef struct {
/* the intended final recipient */
orte_process_name_t dst;
/* type of message */
mca_oob_usock_msg_type_t type;
/* the rml tag where this message is headed */
orte_rml_tag_t tag;
/* number of bytes in message */
uint32_t nbytes;
} mca_oob_usock_hdr_t;
#endif /* _MCA_OOB_USOCK_HDR_H_ */

181
orte/mca/oob/usock/oob_usock_listener.c Обычный файл
Просмотреть файл

@ -0,0 +1,181 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will not conflict with other error codes that
* are returned by these functions under UNIX/Linux environments
*/
#include "orte_config.h"
#include "orte/types.h"
#include "opal/types.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include <fcntl.h>
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#include <ctype.h>
#include "opal/util/show_help.h"
#include "opal/util/error.h"
#include "opal/util/output.h"
#include "opal/opal_socket_errno.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "opal/util/argv.h"
#include "opal/class/opal_hash_table.h"
#include "opal/class/opal_list.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/util/name_fns.h"
#include "orte/util/parse_options.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/oob/usock/oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "orte/mca/oob/usock/oob_usock_peer.h"
#include "orte/mca/oob/usock/oob_usock_connection.h"
#include "orte/mca/oob/usock/oob_usock_listener.h"
static void connection_event_handler(int incoming_sd, short flags, void* cbdata);
/*
 * Start listening on our rendezvous file.
 *
 * Creates a Unix domain stream socket, binds it to the address stored
 * in mca_oob_usock_component.address, puts it into listening mode,
 * makes it non-blocking, and registers a persistent read event so that
 * incoming connection requests are passed to connection_event_handler.
 *
 * Returns ORTE_SUCCESS on success, ORTE_ERR_IN_ERRNO if the socket
 * could not be created, and ORTE_ERROR for any later failure. The
 * socket is closed on every failure path so no descriptor is leaked.
 */
int orte_oob_usock_start_listening(void)
{
    int flags;
    opal_socklen_t addrlen;
    int sd = -1;

    /* create a listen socket for incoming connection attempts */
    sd = socket(PF_UNIX, SOCK_STREAM, 0);
    if (sd < 0) {
        /* lack of support for the address family is not worth reporting */
        if (EAFNOSUPPORT != opal_socket_errno) {
            opal_output(0,"mca_oob_usock_start_listening: socket() failed: %s (%d)",
                        strerror(opal_socket_errno), opal_socket_errno);
        }
        return ORTE_ERR_IN_ERRNO;
    }

    addrlen = sizeof(struct sockaddr_un);
    if (bind(sd, (struct sockaddr*)&mca_oob_usock_component.address, addrlen) < 0) {
        opal_output(0, "%s bind() failed on error %s (%d)",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    strerror(opal_socket_errno),
                    opal_socket_errno );
        goto fail;
    }

    /* setup listen backlog to maximum allowed by kernel */
    if (listen(sd, SOMAXCONN) < 0) {
        opal_output(0, "mca_oob_usock_component_init: listen(): %s (%d)",
                    strerror(opal_socket_errno), opal_socket_errno);
        goto fail;
    }

    /* set socket up to be non-blocking, otherwise accept could block */
    if ((flags = fcntl(sd, F_GETFL, 0)) < 0) {
        opal_output(0, "mca_oob_usock_component_init: fcntl(F_GETFL) failed: %s (%d)",
                    strerror(opal_socket_errno), opal_socket_errno);
        goto fail;
    }
    flags |= O_NONBLOCK;
    if (fcntl(sd, F_SETFL, flags) < 0) {
        opal_output(0, "mca_oob_usock_component_init: fcntl(F_SETFL) failed: %s (%d)",
                    strerror(opal_socket_errno), opal_socket_errno);
        goto fail;
    }

    /* record this socket */
    mca_oob_usock_component.listener_socket = sd;

    /* setup to listen via the event lib */
    mca_oob_usock_component.listener_ev_active = true;
    opal_event_set(orte_event_base, &mca_oob_usock_component.listener_event,
                   mca_oob_usock_component.listener_socket,
                   OPAL_EV_READ|OPAL_EV_PERSIST,
                   connection_event_handler,
                   0);
    opal_event_set_priority(&mca_oob_usock_component.listener_event, ORTE_MSG_PRI);
    opal_event_add(&mca_oob_usock_component.listener_event, 0);

    return ORTE_SUCCESS;

fail:
    /* the error has already been reported; release the descriptor */
    CLOSE_THE_SOCKET(sd);
    return ORTE_ERROR;
}
/*
 * Handler for accepting connections from the event library.
 *
 * incoming_sd - the listening socket that became readable
 * flags       - event flags (unused)
 * cbdata      - event callback data (unused)
 *
 * Accepts one pending connection and hands the new descriptor to the
 * module's accept_connection entry point. Interrupted or would-block
 * accepts are silently ignored; any other accept failure is reported
 * and the process is aborted.
 */
static void connection_event_handler(int incoming_sd, short flags, void* cbdata)
{
    struct sockaddr addr;
    opal_socklen_t addrlen = sizeof(struct sockaddr);
    int sd;
    int err;

    sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen);
    /* capture the error code right away - the logging call below may
     * overwrite errno before we get a chance to examine it */
    err = (sd < 0) ? opal_socket_errno : 0;

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s connection_event_handler: working connection "
                        "(%d, %d)\n",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        sd, err);

    if (sd < 0) {
        if (EINTR == err) {
            return;
        }
        if (err != EAGAIN && err != EWOULDBLOCK) {
            if (EMFILE == err) {
                /*
                 * Close incoming_sd so that orte_show_help will have a file
                 * descriptor with which to open the help file. We will be
                 * exiting anyway, so we don't need to keep it open.
                 */
                CLOSE_THE_SOCKET(incoming_sd);
                ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
                orte_show_help("help-orterun.txt", "orterun:sys-limit-sockets", true);
            } else {
                opal_output(0, "mca_oob_usock_accept: accept() failed: %s (%d).",
                            strerror(err), err);
            }
            orte_errmgr.abort(ORTE_ERROR_DEFAULT_EXIT_CODE, NULL);
        }
        return;
    }

    /* process the connection */
    mca_oob_usock_module.api.accept_connection(sd, &addr);
}

52
orte/mca/oob/usock/oob_usock_listener.h Обычный файл
Просмотреть файл

@ -0,0 +1,52 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_LISTENER_H_
#define _MCA_OOB_USOCK_LISTENER_H_
#include "orte_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
/*
 * Data structure for accepting connections: one listening socket
 * together with the event that watches it.
 */
struct mca_oob_usock_listener_t {
opal_object_t super;
/* true while "event" is registered with the event library */
bool ev_active;
/* event watching the listening socket */
opal_event_t event;
/* listening socket descriptor */
int sd;
};
typedef struct mca_oob_usock_listener_t mca_oob_usock_listener_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_listener_t);
/* Create, bind and start listening on the component's rendezvous socket;
 * returns ORTE_SUCCESS or an ORTE error code. */
ORTE_MODULE_DECLSPEC int orte_oob_usock_start_listening(void);
#endif /* _MCA_OOB_USOCK_LISTENER_H_ */

84
orte/mca/oob/usock/oob_usock_peer.h Обычный файл
Просмотреть файл

@ -0,0 +1,84 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_PEER_H_
#define _MCA_OOB_USOCK_PEER_H_
#include "orte_config.h"
#include "oob_usock.h"
#include "oob_usock_sendrecv.h"
/* object for tracking peers: holds the socket to one remote process,
 * the connection state, the events registered for that socket, and the
 * messages currently being sent to / received from it
 */
typedef struct {
opal_list_item_t super;
/* although not required, there is enough debug
* value that retaining the name makes sense
*/
orte_process_name_t name;
int sd; /**< socket descriptor of the connection to this peer */
int retries; // number of times we have tried to connect to this address
mca_oob_usock_state_t state; /**< current connection state */
opal_event_t op_event; // used for connecting and operations other than read/write
opal_event_t send_event; /**< registration with event thread for send events */
bool send_ev_active; /**< true while send_event is registered */
opal_event_t recv_event; /**< registration with event thread for recv events */
bool recv_ev_active; /**< true while recv_event is registered */
opal_event_t timer_event; /**< timer for retrying connection failures */
bool timer_ev_active; /**< true while timer_event is registered */
opal_list_t send_queue; /**< list of messages to send */
mca_oob_usock_send_t *send_msg; /**< current send in progress */
mca_oob_usock_recv_t *recv_msg; /**< current recv in progress */
} mca_oob_usock_peer_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_peer_t);
/* Caddy object that carries a peer through an event callback; allocated
 * by the ORTE_ACTIVATE_USOCK_PEER_OP / ORTE_ACTIVATE_USOCK_CMP_OP macros
 * and passed to the callback as its cbdata argument.
 */
typedef struct {
opal_object_t super;
/* event used to schedule the callback */
opal_event_t ev;
/* peer the operation applies to */
mca_oob_usock_peer_t *peer;
} mca_oob_usock_peer_op_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_peer_op_t);
/* Schedule a peer-level operation in the usock module's event base.
 *
 * Allocates a mca_oob_usock_peer_op_t pointing at peer "p" and
 * immediately activates an event on mca_oob_usock_module.ev_base (at
 * ORTE_MSG_PRI) so that "cbfunc" is invoked with the op object as its
 * cbdata argument. The event base is the same one used by the other
 * module-level activation macros of this component.
 *
 * NOTE(review): the op object is not released here - presumably cbfunc
 * is responsible for that.
 * NOTE(review): the expansion ends in "while(0);" and so carries its
 * own terminating semicolon; kept for compatibility with existing callers.
 */
#define ORTE_ACTIVATE_USOCK_PEER_OP(p, cbfunc)                          \
    do {                                                                \
        mca_oob_usock_peer_op_t *op;                                    \
        op = OBJ_NEW(mca_oob_usock_peer_op_t);                          \
        op->peer = (p);                                                 \
        opal_event_set(mca_oob_usock_module.ev_base, &op->ev, -1,       \
                       OPAL_EV_WRITE, (cbfunc), op);                    \
        opal_event_set_priority(&op->ev, ORTE_MSG_PRI);                 \
        opal_event_active(&op->ev, OPAL_EV_WRITE, 1);                   \
    } while(0);
/* Schedule a component-level operation for peer "p".
 *
 * Allocates a mca_oob_usock_peer_op_t pointing at the peer and
 * immediately activates an event on orte_event_base (at ORTE_MSG_PRI)
 * so that "cbfunc" is invoked with the op object as its cbdata argument.
 *
 * NOTE(review): the op object is not released here - presumably cbfunc
 * is responsible for that.
 * NOTE(review): the expansion ends in "while(0);" and so carries its
 * own terminating semicolon.
 */
#define ORTE_ACTIVATE_USOCK_CMP_OP(p, cbfunc) \
do { \
mca_oob_usock_peer_op_t *pop; \
pop = OBJ_NEW(mca_oob_usock_peer_op_t); \
pop->peer = (p); \
opal_event_set(orte_event_base, &pop->ev, -1, \
OPAL_EV_WRITE, (cbfunc), pop); \
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
} while(0);
#endif /* _MCA_OOB_USOCK_PEER_H_ */

52
orte/mca/oob/usock/oob_usock_ping.h Обычный файл
Просмотреть файл

@ -0,0 +1,52 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_PING_H_
#define _MCA_OOB_USOCK_PING_H_
#include "orte_config.h"
#include "opal/mca/event/event.h"
#include "oob_usock.h"
#include "oob_usock_sendrecv.h"
/* Caddy object for a ping request; allocated by ORTE_ACTIVATE_USOCK_PING
 * and passed to the callback as its cbdata argument.
 */
typedef struct {
opal_object_t super;
/* event used to schedule the callback */
opal_event_t ev;
/* name of the process to be pinged (stored by value) */
orte_process_name_t peer;
} mca_oob_usock_ping_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_ping_t);
/* Schedule a ping of the process named by "p" (a pointer to a process name).
 *
 * Allocates a mca_oob_usock_ping_t, copies the jobid and vpid of "p"
 * into it, and immediately activates an event on
 * mca_oob_usock_module.ev_base (at ORTE_MSG_PRI) so that "cbfunc" is
 * invoked with the ping object as its cbdata argument.
 *
 * NOTE(review): the ping object is not released here - presumably
 * cbfunc is responsible for that.
 * NOTE(review): the expansion ends in "while(0);" and so carries its
 * own terminating semicolon.
 */
#define ORTE_ACTIVATE_USOCK_PING(p, cbfunc) \
do { \
mca_oob_usock_ping_t *pop; \
pop = OBJ_NEW(mca_oob_usock_ping_t); \
pop->peer.jobid = (p)->jobid; \
pop->peer.vpid = (p)->vpid; \
opal_event_set(mca_oob_usock_module.ev_base, &pop->ev, -1, \
OPAL_EV_WRITE, (cbfunc), pop); \
opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \
opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \
} while(0);
#endif /* _MCA_OOB_USOCK_PING_H_ */

608
orte/mca/oob/usock/oob_usock_sendrecv.c Обычный файл
Просмотреть файл

@ -0,0 +1,608 @@
/*
* Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will not conflict with other error codes that
* are returned by these functions under UNIX/Linux environments
*/
#include "orte_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include "opal_stdint.h"
#include "opal/types.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/util/output.h"
#include "opal/util/net.h"
#include "opal/util/error.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/event/event.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h"
#include "oob_usock.h"
#include "orte/mca/oob/usock/oob_usock_component.h"
#include "orte/mca/oob/usock/oob_usock_peer.h"
#include "orte/mca/oob/usock/oob_usock_connection.h"
/*
 * Push the current block (sdptr/sdbytes) of the peer's on-deck message
 * out of the socket.
 *
 * Returns ORTE_SUCCESS once the whole block has been written,
 * ORTE_ERR_RESOURCE_BUSY / ORTE_ERR_WOULD_BLOCK when the socket cannot
 * take more data right now (the message position is preserved so the
 * caller can resume later), and ORTE_ERR_COMM_FAILURE on any other
 * write error.
 */
static int send_bytes(mca_oob_usock_peer_t* peer)
{
    mca_oob_usock_send_t *snd = peer->send_msg;
    int nwritten;

    while (0 < snd->sdbytes) {
        nwritten = write(peer->sd, snd->sdptr, snd->sdbytes);

        if (0 <= nwritten) {
            /* advance past whatever the kernel accepted */
            snd->sdbytes -= nwritten;
            snd->sdptr += nwritten;
            continue;
        }

        /* interrupted - simply try again */
        if (EINTR == opal_socket_errno) {
            continue;
        }

        /* socket is busy - keep the message on deck, but let the
         * event lib cycle so other messages can progress meanwhile
         */
        if (EAGAIN == opal_socket_errno) {
            return ORTE_ERR_RESOURCE_BUSY;
        }
        if (EWOULDBLOCK == opal_socket_errno) {
            return ORTE_ERR_WOULD_BLOCK;
        }

        /* we hit an error and cannot progress this message */
        opal_output(0, "%s->%s mca_oob_usock_msg_send_bytes: write failed: %s (%d) [sd = %d]",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno,
                    peer->sd);
        return ORTE_ERR_COMM_FAILURE;
    }

    /* we sent the full data block */
    return ORTE_SUCCESS;
}
/*
* A file descriptor is available/ready for send. Check the state
* of the socket and take the appropriate action.
*/
void mca_oob_usock_send_handler(int sd, short flags, void *cbdata)
{
mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)cbdata;
mca_oob_usock_send_t* msg = peer->send_msg;
int rc;
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s usock:send_handler called to send to peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer->name));
switch (peer->state) {
case MCA_OOB_USOCK_CONNECTING:
case MCA_OOB_USOCK_CLOSED:
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s usock:send_handler %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
mca_oob_usock_state_print(peer->state));
mca_oob_usock_peer_complete_connect(peer);
/* de-activate the send event until the connection
* handshake completes
*/
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
break;
case MCA_OOB_USOCK_CONNECTED:
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s usock:send_handler SENDING TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
if (NULL != msg) {
/* if the header hasn't been completely sent, send it */
if (!msg->hdr_sent) {
if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
/* header is completely sent */
msg->hdr_sent = true;
/* setup to send the data */
if (NULL == msg->msg) {
/* this was a zero-byte msg - nothing more to do */
OBJ_RELEASE(msg);
peer->send_msg = NULL;
goto next;
} else if (NULL != msg->msg->buffer) {
/* send the buffer data as a single block */
msg->sdptr = msg->msg->buffer->base_ptr;
msg->sdbytes = msg->msg->buffer->bytes_used;
} else if (NULL != msg->msg->iov) {
/* start with the first iovec */
msg->sdptr = msg->msg->iov[0].iov_base;
msg->sdbytes = msg->msg->iov[0].iov_len;
msg->iovnum = 0;
} else {
msg->sdptr = msg->msg->data;
msg->sdbytes = msg->msg->count;
}
/* fall thru and let the send progress */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
ORTE_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
return;
} else {
// report the error
opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: unable to send header",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
msg->msg->status = rc;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
goto next;
}
}
/* progress the data transmission */
if (msg->hdr_sent) {
if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
/* this block is complete */
if (NULL != msg->msg->buffer) {
/* we are done - notify the RML */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
msg->hdr.nbytes, peer->sd);
msg->msg->status = ORTE_SUCCESS;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else if (NULL != msg->msg->data) {
/* this was a relay message - nothing more to do */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
msg->hdr.nbytes, peer->sd);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else {
/* rotate to the next iovec */
msg->iovnum++;
if (msg->iovnum < msg->msg->count) {
msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
/* exit this event to give the event lib
* a chance to progress any other pending
* actions
*/
return;
} else {
/* this message is complete - notify the RML */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
msg->hdr.nbytes, peer->sd);
msg->msg->status = ORTE_SUCCESS;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
}
}
/* fall thru to queue the next message */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
ORTE_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
return;
} else {
// report the error
opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: unable to send message ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)), peer->sd);
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
msg->msg->status = rc;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
ORTE_FORCED_TERMINATE(1);
return;
}
}
next:
/* if current message completed - progress any pending sends by
* moving the next in the queue into the "on-deck" position. Note
* that this doesn't mean we send the message right now - we will
* wait for another send_event to fire before doing so. This gives
* us a chance to service any pending recvs.
*/
peer->send_msg = (mca_oob_usock_send_t*)
opal_list_remove_first(&peer->send_queue);
}
/* if nothing else to do unregister for send event notifications */
if (NULL == peer->send_msg && peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
break;
default:
opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: invalid connection state (%d) on socket %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
peer->state, peer->sd);
if (peer->send_ev_active) {
opal_event_del(&peer->send_event);
peer->send_ev_active = false;
}
break;
}
}
/*
 * Read from the peer's socket into the active recv message until the
 * region currently described by recv_msg->rdptr / recv_msg->rdbytes
 * has been filled, the socket has nothing more to offer right now, or
 * an error occurs. Progress is recorded in rdptr/rdbytes, so a later
 * call resumes where this one stopped.
 *
 * peer - peer whose recv_msg says where, and how much, to read;
 *        peer->recv_msg must be non-NULL on entry
 *
 * Returns:
 *   ORTE_SUCCESS           - the whole region was read (also returned
 *                            immediately when rdbytes is already 0)
 *   ORTE_ERR_RESOURCE_BUSY - read() reported EAGAIN
 *   ORTE_ERR_WOULD_BLOCK   - read() reported EWOULDBLOCK, OR the remote
 *                            peer closed the connection; in the latter
 *                            case all peer events are stopped, recv_msg
 *                            is released (peer->recv_msg == NULL) and
 *                            the peer is closed before returning
 *   ORTE_ERR_COMM_FAILURE  - any other read() error; the connection is
 *                            NOT closed here, that is left to the caller
 */
static int read_bytes(mca_oob_usock_peer_t* peer)
{
    /* read() returns ssize_t - do not squeeze it into an int */
    ssize_t rc;

    /* read until all bytes recvd or error */
    while (0 < peer->recv_msg->rdbytes) {
        rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes);
        if (rc < 0) {
            if (opal_socket_errno == EINTR) {
                /* interrupted by a signal - simply retry */
                continue;
            } else if (opal_socket_errno == EAGAIN) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_RESOURCE_BUSY;
            } else if (opal_socket_errno == EWOULDBLOCK) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_WOULD_BLOCK;
            }
            /* we hit an error and cannot progress this message - report
             * the error and let the caller know to abort this message.
             * The peer is not closed here; the caller decides what to
             * do with the connection.
             */
            opal_output_verbose(OOB_USOCK_DEBUG_FAIL, orte_oob_base_framework.framework_output,
                                "%s-%s mca_oob_usock_msg_recv: read failed: %s (%d)",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)),
                                strerror(opal_socket_errno),
                                opal_socket_errno);
            return ORTE_ERR_COMM_FAILURE;
        } else if (rc == 0) {
            /* the remote peer closed the connection - report that condition
             * and let the caller know
             */
            opal_output_verbose(OOB_USOCK_DEBUG_FAIL, orte_oob_base_framework.framework_output,
                                "%s-%s mca_oob_usock_msg_recv: peer closed connection",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)));
            /* stop all events */
            if (peer->recv_ev_active) {
                opal_event_del(&peer->recv_event);
                peer->recv_ev_active = false;
            }
            if (peer->timer_ev_active) {
                opal_event_del(&peer->timer_event);
                peer->timer_ev_active = false;
            }
            if (peer->send_ev_active) {
                opal_event_del(&peer->send_event);
                peer->send_ev_active = false;
            }
            if (NULL != peer->recv_msg) {
                /* the recv class has no destructor, so a partially
                 * received payload would be leaked by the release.
                 * The data pointer is only assigned once the header
                 * has been received, so only touch it in that case.
                 */
                if (peer->recv_msg->hdr_recvd) {
                    free(peer->recv_msg->data);
                    peer->recv_msg->data = NULL;
                }
                OBJ_RELEASE(peer->recv_msg);
                peer->recv_msg = NULL;
            }
            mca_oob_usock_peer_close(peer);
            /* reported as "would block" so the caller just leaves the
             * event handler instead of treating this as a fatal error
             */
            return ORTE_ERR_WOULD_BLOCK;
        }
        /* we were able to read something, so adjust counters and location */
        peer->recv_msg->rdbytes -= rc;
        peer->recv_msg->rdptr += rc;
    }

    /* we read the full data block */
    return ORTE_SUCCESS;
}
/*
 * Receive-event callback for a peer socket. Dispatches to the
 * appropriate action routine based on the state of the connection
 * with the peer.
 *
 * sd     - socket descriptor reported by the event library (not used
 *          here; the handler works on peer->sd via read_bytes)
 * flags  - event flags (not used)
 * cbdata - the mca_oob_usock_peer_t the event was registered for
 *
 * MCA_OOB_USOCK_CONNECT_ACK: complete the connection handshake; on
 *   success start the recv event, stop the timer event, promote any
 *   queued send to the on-deck position and move to CONNECTED. On
 *   failure the job is terminated.
 * MCA_OOB_USOCK_CONNECTED: read a message in two phases - first the
 *   fixed-size header, then hdr.nbytes of payload. Either phase may
 *   return early when the socket has no more data; the partially read
 *   message stays in peer->recv_msg and is resumed on the next event.
 *   A complete message is either posted to the RML (we are the
 *   destination) or handed back to the OOB for relay.
 * Any other state is reported and otherwise ignored.
 */
void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata)
{
    mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)cbdata;
    int rc;
    orte_rml_send_t *snd;

    /* nothing to do once an abnormal termination has been ordered */
    if (orte_abnormal_term_ordered) {
        return;
    }

    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:usock:recv:handler called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name));

    switch (peer->state) {
    case MCA_OOB_USOCK_CONNECT_ACK:
        if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer))) {
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:usock:recv:handler starting send/recv events",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            /* we connected! Start the send/recv events */
            if (!peer->recv_ev_active) {
                opal_event_add(&peer->recv_event, 0);
                peer->recv_ev_active = true;
            }
            if (peer->timer_ev_active) {
                opal_event_del(&peer->timer_event);
                peer->timer_ev_active = false;
            }
            /* if there is a message waiting to be sent, queue it */
            if (NULL == peer->send_msg) {
                peer->send_msg = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue);
            }
            if (NULL != peer->send_msg && !peer->send_ev_active) {
                opal_event_add(&peer->send_event, 0);
                peer->send_ev_active = true;
            }
            /* update our state */
            peer->state = MCA_OOB_USOCK_CONNECTED;
        } else {
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&peer->name));
            opal_event_del(&peer->recv_event);
            peer->recv_ev_active = false;
            ORTE_FORCED_TERMINATE(1);
            return;
        }
        break;

    case MCA_OOB_USOCK_CONNECTED:
        opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s:usock:recv:handler CONNECTED",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        /* allocate a new message and setup for recv */
        if (NULL == peer->recv_msg) {
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:usock:recv:handler allocate new recv msg",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            peer->recv_msg = OBJ_NEW(mca_oob_usock_recv_t);
            if (NULL == peer->recv_msg) {
                opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: unable to allocate recv message\n",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)));
                return;
            }
            /* start by reading the header */
            peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
            peer->recv_msg->rdbytes = sizeof(mca_oob_usock_hdr_t);
        }
        /* if the header hasn't been completely read, read it */
        if (!peer->recv_msg->hdr_recvd) {
            opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:usock:recv:handler read hdr",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
                /* completed reading the header */
                peer->recv_msg->hdr_recvd = true;
                /* if this is a zero-byte message, then we are done */
                if (0 == peer->recv_msg->hdr.nbytes) {
                    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                        ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag);
                    peer->recv_msg->data = NULL;  // make sure
                    peer->recv_msg->rdptr = NULL;
                    peer->recv_msg->rdbytes = 0;
                } else {
                    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s:usock:recv:handler allocate data region of size %lu",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)peer->recv_msg->hdr.nbytes);
                    /* allocate the data region */
                    peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
                    if (NULL == peer->recv_msg->data) {
                        /* without a buffer the payload cannot be read and
                         * the byte stream can no longer be parsed - treat
                         * it like any other unrecoverable recv failure
                         */
                        opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: unable to allocate %lu bytes for message data",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                    ORTE_NAME_PRINT(&(peer->name)),
                                    (unsigned long)peer->recv_msg->hdr.nbytes);
                        /* turn off the recv event */
                        opal_event_del(&peer->recv_event);
                        peer->recv_ev_active = false;
                        ORTE_FORCED_TERMINATE(1);
                        return;
                    }
                    /* point to it */
                    peer->recv_msg->rdptr = peer->recv_msg->data;
                    peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
                }
                /* fall thru and attempt to read the data */
            } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                       ORTE_ERR_WOULD_BLOCK == rc) {
                /* exit this event and let the event lib progress. Note
                 * that read_bytes also returns WOULD_BLOCK after the
                 * remote side closed the connection, in which case it
                 * has already released peer->recv_msg - so do not touch
                 * the message here
                 */
                return;
            } else {
                /* close the connection */
                opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s:usock:recv:handler error reading bytes - closing connection",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
                mca_oob_usock_peer_close(peer);
                return;
            }
        }

        if (peer->recv_msg->hdr_recvd) {
            /* continue to read the data block - we start from
             * wherever we left off, which could be at the
             * beginning or somewhere in the message
             */
            if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
                /* we recvd all of the message */
                opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s RECVD COMPLETE MESSAGE FROM %s OF %d BYTES FOR DEST %s TAG %d",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                    ORTE_NAME_PRINT(&peer->name),
                                    (int)peer->recv_msg->hdr.nbytes,
                                    ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
                                    peer->recv_msg->hdr.tag);
                /* am I the intended recipient? */
                if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid &&
                    peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) {
                    /* yes - post it to the RML for delivery */
                    opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s DELIVERING TO RML",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
                    ORTE_RML_POST_MESSAGE(&peer->name, peer->recv_msg->hdr.tag,
                                          peer->recv_msg->data,
                                          peer->recv_msg->hdr.nbytes);
                    /* the recv class has no destructor, so the payload
                     * buffer is not freed by this release - presumably the
                     * RML now owns it (TODO confirm).
                     * NOTE(review): the next message is only started when
                     * peer->recv_msg is NULL; this relies on OBJ_RELEASE
                     * resetting the pointer once the object is freed -
                     * confirm.
                     */
                    OBJ_RELEASE(peer->recv_msg);
                } else {
                    /* no - we don't route things, so we promote this
                     * back to the OOB and let another transport move
                     * it along
                     */
                    snd = OBJ_NEW(orte_rml_send_t);
                    snd->dst = peer->recv_msg->hdr.dst;
                    snd->origin = peer->name;
                    snd->tag = peer->recv_msg->hdr.tag;
                    snd->data = peer->recv_msg->data;
                    snd->count = peer->recv_msg->hdr.nbytes;
                    snd->cbfunc.iov = NULL;
                    snd->cbdata = NULL;
                    /* activate the OOB send state */
                    ORTE_OOB_SEND(snd);
                    /* protect the data - the relay message points at it now */
                    peer->recv_msg->data = NULL;
                    /* cleanup */
                    OBJ_RELEASE(peer->recv_msg);
                    return;
                }
            } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                       ORTE_ERR_WOULD_BLOCK == rc) {
                /* exit this event and let the event lib progress */
                return;
            } else {
                // report the error
                opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: unable to recv message",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)));
                /* turn off the recv event */
                opal_event_del(&peer->recv_event);
                peer->recv_ev_active = false;
                ORTE_FORCED_TERMINATE(1);
                return;
            }
        }
        break;

    default:
        /* report only - the peer is not closed from here */
        opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: invalid socket state(%d)",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    peer->state);
        break;
    }
}
/*
 * Constructor for mca_oob_usock_send_t: a fresh send object has no
 * RML message or relay data attached and no transmission progress.
 */
static void snd_cons(mca_oob_usock_send_t *ptr)
{
    /* nothing attached yet */
    ptr->data = NULL;
    ptr->msg = NULL;

    /* nothing has been sent yet */
    ptr->sdbytes = 0;
    ptr->sdptr = NULL;
    ptr->iovnum = 0;
    ptr->hdr_sent = false;
}
/*
 * Destructor for mca_oob_usock_send_t.
 *
 * An RML msg attached to the send is left alone: the RML owns that
 * memory. Relayed data, on the other hand, belongs to this object
 * and is free'd here.
 */
static void snd_des(mca_oob_usock_send_t *ptr)
{
    /* free(NULL) is a no-op, so no guard is required */
    free(ptr->data);
}
/* class instance for mca_oob_usock_send_t: parent class is
 * opal_list_item_t, snd_cons/snd_des are supplied as its
 * constructor/destructor
 */
OBJ_CLASS_INSTANCE(mca_oob_usock_send_t,
opal_list_item_t,
snd_cons, snd_des);
/*
 * Constructor for mca_oob_usock_recv_t: no header received yet, no
 * payload buffer, and no read region set up.
 *
 * The data pointer is cleared as well so that it never holds an
 * indeterminate value, whatever point the message is inspected or
 * torn down at.
 */
static void rcv_cons(mca_oob_usock_recv_t *ptr)
{
    ptr->hdr_recvd = false;
    ptr->data = NULL;
    ptr->rdptr = NULL;
    ptr->rdbytes = 0;
}
/* class instance for mca_oob_usock_recv_t: parent class is
 * opal_list_item_t, rcv_cons is supplied as the constructor and no
 * destructor is supplied (NULL) - so nothing the object points to is
 * freed when it is released
 */
OBJ_CLASS_INSTANCE(mca_oob_usock_recv_t,
opal_list_item_t,
rcv_cons, NULL);
/*
 * Constructor for mca_oob_usock_msg_error_t: start with neither a
 * failed usock send nor a failed RML message attached.
 */
static void err_cons(mca_oob_usock_msg_error_t *ptr)
{
    ptr->snd = NULL;
    ptr->rmsg = NULL;
}
/* class instance for mca_oob_usock_msg_error_t: parent class is
 * opal_object_t, err_cons is supplied as the constructor and no
 * destructor is supplied (NULL)
 */
OBJ_CLASS_INSTANCE(mca_oob_usock_msg_error_t,
opal_object_t,
err_cons, NULL);

249
orte/mca/oob/usock/oob_usock_sendrecv.h Обычный файл
Просмотреть файл

@ -0,0 +1,249 @@
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef _MCA_OOB_USOCK_SENDRECV_H_
#define _MCA_OOB_USOCK_SENDRECV_H_
#include "orte_config.h"
#include "opal/class/opal_list.h"
#include "orte/mca/rml/base/base.h"
#include "oob_usock.h"
#include "oob_usock_hdr.h"
/* usock structure for sending a message: one outbound message plus
 * the bookkeeping needed to transmit it piecewise
 */
typedef struct {
/* list linkage - used to append the message to a peer's send_queue */
opal_list_item_t super;
/* header of the message; the send starts with it */
mca_oob_usock_hdr_t hdr;
/* the RML message being sent - owned by the RML, not by this object */
orte_rml_send_t *msg;
/* data of a relayed message - owned by this object and free'd by
 * its destructor */
char *data;
/* starts out false; presumably set once the header has gone out -
 * TODO confirm against the send handler */
bool hdr_sent;
/* NOTE(review): looks like the index of the iovec entry currently
 * being sent - confirm against the send handler */
int iovnum;
/* start of the region to be sent and its size in bytes; initially
 * the header. Presumably advanced as bytes go out - TODO confirm */
char *sdptr;
size_t sdbytes;
} mca_oob_usock_send_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_send_t);
/* usock structure for recving a message: one inbound message plus the
 * bookkeeping needed to read it piecewise from the socket
 */
typedef struct {
/* list item base class */
opal_list_item_t super;
/* header of the message - this is what gets read first */
mca_oob_usock_hdr_t hdr;
/* true once the header has been completely read */
bool hdr_recvd;
/* payload buffer of hdr.nbytes bytes (NULL for a zero-byte message);
 * only meaningful once hdr_recvd is true */
char *data;
/* where the next read() stores its bytes */
char *rdptr;
/* number of bytes still to be read into rdptr */
size_t rdbytes;
} mca_oob_usock_recv_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t);
/* Queue a message for transmission to the given peer.
 *
 * The message takes the peer's "on-deck" slot (send_msg) when that
 * slot is free; otherwise it is appended to the peer's send_queue
 * behind the messages already waiting.
 *
 * When the flag is true the macro also makes sure the message will
 * actually be progressed: on a connected peer the send event is
 * activated if it is not already active, on any other peer the
 * connection procedure is started. When the flag is false the message
 * is merely left "pending" for later transmission - e.g., after the
 * connection procedure is completed.
 *
 * p => pointer to mca_oob_usock_peer_t
 * s => pointer to mca_oob_usock_send_t
 * f => true if send event is to be activated
 *
 * Note: p and s are evaluated more than once, so pass expressions
 * without side effects. The expansion ends in "while(0);" - i.e. it
 * carries its own terminating semicolon.
 */
#define MCA_OOB_USOCK_QUEUE_MSG(p, s, f)                                   \
    do {                                                                   \
        opal_output_verbose(5, orte_oob_base_framework.framework_output,   \
                            "%s:[%s:%d] queue msg to %s",                  \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                            __FILE__, __LINE__,                            \
                            ORTE_NAME_PRINT(&((s)->hdr.dst)));             \
        if (NULL != (p)->send_msg) {                                       \
            /* a message is already on-deck - line up behind it */         \
            opal_list_append(&(p)->send_queue, &(s)->super);               \
        } else {                                                           \
            /* the on-deck slot is free - take it */                       \
            (p)->send_msg = (s);                                           \
        }                                                                  \
        if ((f)) {                                                         \
            if (MCA_OOB_USOCK_CONNECTED == (p)->state) {                   \
                /* connected - ensure the send event is active */          \
                if (!(p)->send_ev_active) {                                \
                    opal_event_add(&(p)->send_event, 0);                   \
                    (p)->send_ev_active = true;                            \
                }                                                          \
            } else {                                                       \
                /* not connected - start connecting */                     \
                (p)->state = MCA_OOB_USOCK_CONNECTING;                     \
                ORTE_ACTIVATE_USOCK_CONN_STATE((p),                        \
                                               mca_oob_usock_peer_try_connect); \
            }                                                              \
        }                                                                  \
    }while(0);
/* Queue an RML message to be sent by one of our modules and make sure
 * it gets progressed (the send event is activated, or the connection
 * procedure started, via MCA_OOB_USOCK_QUEUE_MSG).
 *
 * A new mca_oob_usock_send_t is created that points at the RML
 * message; its header is filled from the message's destination and
 * tag, and the payload size is taken from, in order of preference:
 * the message's buffer (bytes_used), its iovec array (sum of iov_len
 * over count entries), or count itself for a plain data message.
 *
 * m - the RML message to be sent (pointer to orte_rml_send_t)
 * p - the peer to queue the message on (pointer to
 *     mca_oob_usock_peer_t)
 *
 * The macro's own variables carry a "usock_qs_" prefix and a trailing
 * underscore so they cannot capture identifiers (e.g. "msg" or "i")
 * that appear in the caller's argument expressions. m and p are
 * evaluated more than once - pass expressions without side effects.
 */
#define MCA_OOB_USOCK_QUEUE_SEND(m, p)                                     \
    do {                                                                   \
        mca_oob_usock_send_t *usock_qs_snd_;                               \
        int usock_qs_idx_;                                                 \
        opal_output_verbose(5, orte_oob_base_framework.framework_output,   \
                            "%s:[%s:%d] queue send to %s",                 \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                            __FILE__, __LINE__,                            \
                            ORTE_NAME_PRINT(&((m)->dst)));                 \
        usock_qs_snd_ = OBJ_NEW(mca_oob_usock_send_t);                     \
        /* setup the header */                                             \
        usock_qs_snd_->hdr.dst = (m)->dst;                                 \
        usock_qs_snd_->hdr.type = MCA_OOB_USOCK_USER;                      \
        usock_qs_snd_->hdr.tag = (m)->tag;                                 \
        /* point to the actual message */                                  \
        usock_qs_snd_->msg = (m);                                          \
        /* set the total number of bytes to be sent */                     \
        if (NULL != (m)->buffer) {                                         \
            usock_qs_snd_->hdr.nbytes = (m)->buffer->bytes_used;           \
        } else if (NULL != (m)->iov) {                                     \
            usock_qs_snd_->hdr.nbytes = 0;                                 \
            for (usock_qs_idx_ = 0; usock_qs_idx_ < (m)->count;            \
                 usock_qs_idx_++) {                                        \
                usock_qs_snd_->hdr.nbytes +=                               \
                    (m)->iov[usock_qs_idx_].iov_len;                       \
            }                                                              \
        } else {                                                           \
            usock_qs_snd_->hdr.nbytes = (m)->count;                        \
        }                                                                  \
        /* start the send with the header */                               \
        usock_qs_snd_->sdptr = (char*)&usock_qs_snd_->hdr;                 \
        usock_qs_snd_->sdbytes = sizeof(mca_oob_usock_hdr_t);              \
        /* add to the msg queue for this peer */                           \
        MCA_OOB_USOCK_QUEUE_MSG((p), usock_qs_snd_, true);                 \
    }while(0);
/* Queue an RML message to be sent by one of our modules upon
 * completing the connection process: the message is placed on the
 * peer (on-deck or in its send_queue) but neither the send event nor
 * the connection procedure is triggered from here.
 *
 * A new mca_oob_usock_send_t is created that points at the RML
 * message; its header is filled from the message's destination and
 * tag, and the payload size is taken from, in order of preference:
 * the message's buffer (bytes_used), its iovec array (sum of iov_len
 * over count entries), or count itself for a plain data message.
 *
 * m - the RML message to be sent (pointer to orte_rml_send_t)
 * p - the peer to queue the message on (pointer to
 *     mca_oob_usock_peer_t)
 *
 * The macro's own variables carry a "usock_qp_" prefix and a trailing
 * underscore so they cannot capture identifiers (e.g. "msg" or "i")
 * that appear in the caller's argument expressions. m and p are
 * evaluated more than once - pass expressions without side effects.
 */
#define MCA_OOB_USOCK_QUEUE_PENDING(m, p)                                  \
    do {                                                                   \
        mca_oob_usock_send_t *usock_qp_snd_;                               \
        int usock_qp_idx_;                                                 \
        opal_output_verbose(5, orte_oob_base_framework.framework_output,   \
                            "%s:[%s:%d] queue pending to %s",              \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                            __FILE__, __LINE__,                            \
                            ORTE_NAME_PRINT(&((m)->dst)));                 \
        usock_qp_snd_ = OBJ_NEW(mca_oob_usock_send_t);                     \
        /* setup the header */                                             \
        usock_qp_snd_->hdr.dst = (m)->dst;                                 \
        usock_qp_snd_->hdr.type = MCA_OOB_USOCK_USER;                      \
        usock_qp_snd_->hdr.tag = (m)->tag;                                 \
        /* point to the actual message */                                  \
        usock_qp_snd_->msg = (m);                                          \
        /* set the total number of bytes to be sent */                     \
        if (NULL != (m)->buffer) {                                         \
            usock_qp_snd_->hdr.nbytes = (m)->buffer->bytes_used;           \
        } else if (NULL != (m)->iov) {                                     \
            usock_qp_snd_->hdr.nbytes = 0;                                 \
            for (usock_qp_idx_ = 0; usock_qp_idx_ < (m)->count;            \
                 usock_qp_idx_++) {                                        \
                usock_qp_snd_->hdr.nbytes +=                               \
                    (m)->iov[usock_qp_idx_].iov_len;                       \
            }                                                              \
        } else {                                                           \
            usock_qp_snd_->hdr.nbytes = (m)->count;                        \
        }                                                                  \
        /* start the send with the header */                               \
        usock_qp_snd_->sdptr = (char*)&usock_qp_snd_->hdr;                 \
        usock_qp_snd_->sdbytes = sizeof(mca_oob_usock_hdr_t);              \
        /* add to the msg queue for this peer */                           \
        MCA_OOB_USOCK_QUEUE_MSG((p), usock_qp_snd_, false);                \
    }while(0);
/* State machine for processing message: carries one RML message into
 * an event callback (see ORTE_ACTIVATE_USOCK_POST_SEND)
 */
typedef struct {
/* object base class */
opal_object_t super;
/* event that is activated to run the callback */
opal_event_t ev;
/* the RML message to be processed */
orte_rml_send_t *msg;
} mca_oob_usock_msg_op_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_msg_op_t);
/* Post an RML message for sending: wrap it in a new
 * mca_oob_usock_msg_op_t and activate an event on the usock module's
 * event base (mca_oob_usock_module.ev_base) at ORTE_MSG_PRI so that
 * the callback runs from the event loop.
 *
 * ms     - the RML message to be sent (pointer to orte_rml_send_t)
 * cbfunc - event callback; the mca_oob_usock_msg_op_t carrying ms is
 *          handed to it as the callback data
 *
 * The macro's own variable carries a "usock_post_" prefix and a
 * trailing underscore so it cannot capture an identifier (e.g. "mop")
 * that appears in the caller's argument expressions. ms is evaluated
 * more than once - pass an expression without side effects.
 */
#define ORTE_ACTIVATE_USOCK_POST_SEND(ms, cbfunc)                          \
    do {                                                                   \
        mca_oob_usock_msg_op_t *usock_post_op_;                            \
        opal_output_verbose(5, orte_oob_base_framework.framework_output,   \
                            "%s:[%s:%d] post send to %s",                  \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                            __FILE__, __LINE__,                            \
                            ORTE_NAME_PRINT(&((ms)->dst)));                \
        usock_post_op_ = OBJ_NEW(mca_oob_usock_msg_op_t);                  \
        usock_post_op_->msg = (ms);                                        \
        opal_event_set(mca_oob_usock_module.ev_base,                       \
                       &usock_post_op_->ev, -1,                            \
                       OPAL_EV_WRITE, (cbfunc), usock_post_op_);           \
        opal_event_set_priority(&usock_post_op_->ev, ORTE_MSG_PRI);        \
        opal_event_active(&usock_post_op_->ev, OPAL_EV_WRITE, 1);          \
    } while(0);
/* Carrier for a delivery error that is reported back to the component
 * through an event callback (see ORTE_ACTIVATE_USOCK_MSG_ERROR)
 */
typedef struct {
/* object base class */
opal_object_t super;
/* event that is activated to run the error callback */
opal_event_t ev;
/* the RML message that failed (NULL if none was given) */
orte_rml_send_t *rmsg;
/* the usock send that failed (NULL if none was given) */
mca_oob_usock_send_t *snd;
/* process name of the next recipient */
orte_process_name_t hop;
} mca_oob_usock_msg_error_t;
OBJ_CLASS_DECLARATION(mca_oob_usock_msg_error_t);
/* macro for reporting delivery errors back to the
 * component for error handling: a new mca_oob_usock_msg_error_t is
 * filled in and an event is activated on orte_event_base at
 * ORTE_MSG_PRI so that the callback runs from the event loop with
 * that object as its callback data.
 *
 * s -> mca_oob_usock_send_t that failed (can be NULL)
 * r -> orte_rml_send_t that failed (can be NULL)
 * h -> process name for the next recipient (pointer, must not be NULL)
 * cbfunc -> function to handle the callback
 *
 * Only one of s and r is recorded: s when it is non-NULL, otherwise r
 * when that is non-NULL; the other field keeps the NULL set by the
 * constructor.
 *
 * The macro's own variable carries a "usock_err_" prefix and a
 * trailing underscore so it cannot capture an identifier (e.g. "mop")
 * that appears in the caller's argument expressions. s, r and h are
 * evaluated more than once - pass expressions without side effects.
 */
#define ORTE_ACTIVATE_USOCK_MSG_ERROR(s, r, h, cbfunc)                     \
    do {                                                                   \
        mca_oob_usock_msg_error_t *usock_err_op_;                          \
        opal_output_verbose(5, orte_oob_base_framework.framework_output,   \
                            "%s:[%s:%d] post msg error to %s",             \
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                            __FILE__, __LINE__,                            \
                            ORTE_NAME_PRINT((h)));                         \
        usock_err_op_ = OBJ_NEW(mca_oob_usock_msg_error_t);                \
        if (NULL != (s)) {                                                 \
            usock_err_op_->snd = (s);                                      \
        } else if (NULL != (r)) {                                          \
            usock_err_op_->rmsg = (r);                                     \
        }                                                                  \
        usock_err_op_->hop.jobid = (h)->jobid;                             \
        usock_err_op_->hop.vpid = (h)->vpid;                               \
        opal_event_set(orte_event_base, &usock_err_op_->ev, -1,            \
                       OPAL_EV_WRITE, (cbfunc), usock_err_op_);            \
        opal_event_set_priority(&usock_err_op_->ev, ORTE_MSG_PRI);         \
        opal_event_active(&usock_err_op_->ev, OPAL_EV_WRITE, 1);           \
    } while(0);
#endif /* _MCA_OOB_USOCK_SENDRECV_H_ */