54b2cf747e
The commit has been tested for C/R and Cray operations, and on Odin (SLURM, rsh) and RoadRunner (TM). I tried to update all environments, but obviously could not test them. I know that Windows needs some work, and have highlighted what is known to be needed in the odls process component.

This represents a lot of work by Brian, Tim P, Josh, and myself, with much advice from Jeff and others. For posterity, I have appended a copy of the email describing the work that was done:

As we have repeatedly noted, the modex operation in MPI_Init is the single greatest consumer of time during startup. To date, we have executed that operation as an ORTE stage gate that held the process until a startup message containing all required modex (and OOB contact info - see #3 below) info could be sent to it. Each process would send its data to the HNP's registry, which assembled and sent the message when all processes had reported in. In addition, ORTE had taken responsibility for monitoring process status as it progressed through a series of "stage gates". The process reported its status at each gate, and ORTE would then send a "release" message once all procs had reported in. The incoming changes revamp these procedures in three ways:

1. Eliminating the ORTE stage gate system and cleanly delineating responsibility between the OMPI and ORTE layers for MPI init/finalize. The modex stage gate (STG1) has been replaced by a collective operation in the modex itself that performs an allgather on the required modex info. The allgather is implemented using the orte_grpcomm framework since the BTLs are not active at that point. At the moment, the grpcomm framework only has a "basic" component analogous to OMPI's "basic" coll framework - I would recommend that the MPI team create additional, more advanced components to improve performance of this step.

The other stage gates have been replaced by orte_grpcomm barrier functions. We tried to use MPI barriers instead (since the BTLs are active at that point), but - as we discussed on the telecon - these are not currently true barriers, so the job would hang when we fell through while messages were still in process. Note that the grpcomm barrier doesn't actually resolve that problem, but Brian has pointed out that we are unlikely to ever see it violated. Again, you might want to spend a little time on an advanced barrier algorithm, as the one in "basic" is very simplistic.

Summarizing this change: ORTE no longer tracks process state nor has direct responsibility for synchronizing jobs. This is now done via collective operations within the MPI layer, albeit using ORTE collective communication services. I -strongly- urge the MPI team to implement advanced collective algorithms to improve the performance of this critical procedure.

2. Reducing the volume of data exchanged during modex. Data in the modex consisted of the process name, the name of the node where that process is located (expressed as a string), plus a string representation of all contact info. The nodename was required in order for the modex to determine whether the process was local or not - in addition, some people like to have it to print pretty error messages when a connection fails. The size of this data has been reduced in three ways:

(a) Reducing the size of the process name itself. The process name consisted of two 32-bit fields for the jobid and vpid. This is far larger than any current system, or any system likely to exist in the near future, can support. Accordingly, the default size of these fields has been reduced to 16 bits, which means you can have 32k procs in each of 32k jobs. Since the daemons must have a vpid, and we require one daemon/node, this also restricts the default configuration to 32k nodes. To support any future "mega-clusters", a configuration option --enable-jumbo-apps has been added. This option increases the jobid and vpid field sizes to 32 bits. Someday, if necessary, someone can add yet another option to increase them to 64 bits, I suppose.

(b) Replacing the string nodename with an integer nodeid. Since we have one daemon/node, the nodeid corresponds to the local daemon's vpid. This replaces an often lengthy string with only 2 (or at most 4) bytes, a substantial reduction.

(c) When the MCA param requesting that nodenames be sent (to support pretty error messages) is given, a second MCA param is now used to request the FQDN - otherwise, the domain name is stripped (by default) from the message to save space. If someone wants to combine those into a single param somehow (perhaps with an argument?), they are welcome to do so - I didn't want to alter what people are already using.

While these may seem like small savings, they actually amount to a significant impact when aggregated across the entire modex operation. Since every proc must receive the modex data regardless of the collective used to send it, just reducing the size of the process name removes nearly 400MBytes of communication from a 32k proc job (admittedly, much of this comm may occur in parallel). So it does add up pretty quickly.

3. Routing RML messages to reduce connections. The default messaging system remains point-to-point - i.e., each proc opens a socket to every proc it communicates with and sends its messages directly. A new option uses the orteds as routers - i.e., each proc only opens a single socket to its local orted. All messages are sent from the proc to the orted, which forwards the message to the orted on the node where the intended recipient proc is located - that orted then forwards the message to its local proc (the recipient). This greatly reduces the connection storm we have encountered during startup. It also has the benefit of removing the sharing of every proc's OOB contact info with every other proc.

The orted routing tables are populated during launch since every orted gets a map of where every proc is being placed. Each proc, therefore, only needs to know the contact info for its local daemon, which is passed in via the environment when the proc is fork/exec'd by the daemon. This alone removes ~50 bytes/process of communication that was in the current STG1 startup message - so for our 32k proc job, this saves us roughly 32k*50 = 1.6MBytes sent to 32k procs = 51GBytes of messaging (a short sketch of this arithmetic follows this message).

Note that you can use the new routing method by specifying -mca routed tree - if you so desire. This mode will become the default at some point in the future.

There are a few minor additional changes in the commit that I'll just note in passing:

* propagation of command line mca params to the orteds - fixes ticket #1073. See note there for details.
* requiring "finalize" prior to "exit" for MPI procs - fixes ticket #1144. See note there for details.
* cleanup of some stale header files

This commit was SVN r16364.
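Since every process still receives every other process's modex entry regardless of which collective delivers it, the per-entry byte counts above multiply quickly at scale. The following is a minimal, self-contained sketch of that arithmetic. The small_name/jumbo_name structs and the constants are illustrative stand-ins only, not the actual ORTE definitions, and this program is separate from the committed file that follows.

/* Illustrative sketch: why shrinking per-process startup data matters at 32k procs.
 * The struct layouts below are hypothetical stand-ins, not ORTE's real types. */
#include <stdint.h>
#include <stdio.h>

/* default build: 16-bit jobid/vpid fields (the commit's 32k jobs x 32k procs) */
struct small_name { uint16_t jobid; uint16_t vpid; };
/* --enable-jumbo-apps build: 32-bit fields */
struct jumbo_name { uint32_t jobid; uint32_t vpid; };

int main(void)
{
    const double nprocs = 32000.0;   /* the "32k proc job" used in the commit text */

    printf("per-name size: default %zu bytes, jumbo %zu bytes\n",
           sizeof(struct small_name), sizeof(struct jumbo_name));

    /* Routing through the orteds drops ~50 bytes of OOB contact info per proc
     * from the old STG1 startup message; previously every proc received the
     * whole assembled message. */
    printf("contact info dropped from startup: %.1f MB per copy, %.1f GB across %.0f procs\n",
           50.0 * nprocs / 1e6, 50.0 * nprocs * nprocs / 1e9, nprocs);
    return 0;
}
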
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * In Windows, many of the socket functions return an EWOULDBLOCK
 * instead of things like EAGAIN, EINPROGRESS, etc. It has been
 * verified that this will not conflict with other error codes that
 * are returned by these functions under UNIX/Linux environments.
 */

#include "orte_config.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif

#include "opal/mca/backtrace/backtrace.h"
#include "orte/class/orte_proc_table.h"
#include "opal/util/output.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
#include "orte/util/univ_info.h"

#include "orte/mca/gpr/gpr.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/routed/routed.h"

#include "oob_tcp.h"
#include "oob_tcp_peer.h"
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer, int sd);
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer, int sd);
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer, int sd);
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size);
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size);
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);


OBJ_CLASS_INSTANCE(
    mca_oob_tcp_peer_t,
    opal_free_list_item_t,
    mca_oob_tcp_peer_construct,
    mca_oob_tcp_peer_destruct);

/*
 * This is the constructor function for the mca_oob_tcp_peer
 * struct. Note that this function and OBJ_NEW should NEVER
 * be called directly. Instead, use mca_oob_tcp_add_peer
 *
 * @param peer a pointer to the mca_oob_tcp_peer_t struct to be initialized
 * @retval none
 */
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
{
    OBJ_CONSTRUCT(&(peer->peer_send_queue), opal_list_t);
    OBJ_CONSTRUCT(&(peer->peer_lock), opal_mutex_t);
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    peer->peer_sd = -1;
    peer->peer_current_af = AF_UNSPEC;
    memset(&peer->peer_timer_event, 0, sizeof(peer->peer_timer_event));
    opal_evtimer_set(&peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
}

/*
 * This is the destructor function for the mca_oob_tcp_peer
 * struct. Note that this function and OBJ_RELEASE should NEVER
 * be called directly. Instead, use mca_oob_tcp_del_peer
 *
 * @param peer a pointer to the mca_oob_tcp_peer_t struct to be destroyed
 * @retval none
 */
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
{
    mca_oob_tcp_peer_shutdown(peer);
    OBJ_DESTRUCT(&(peer->peer_send_queue));
    OBJ_DESTRUCT(&(peer->peer_lock));
}

/*
 * Initialize events to be used by the peer instance for TCP select/poll callbacks.
 */
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
{
    memset(&peer->peer_recv_event, 0, sizeof(peer->peer_recv_event));
    memset(&peer->peer_send_event, 0, sizeof(peer->peer_send_event));

    if (peer->peer_sd >= 0) {
        opal_event_set(
            &peer->peer_recv_event,
            peer->peer_sd,
            OPAL_EV_READ|OPAL_EV_PERSIST,
            mca_oob_tcp_peer_recv_handler,
            peer);
        opal_event_set(
            &peer->peer_send_event,
            peer->peer_sd,
            OPAL_EV_WRITE|OPAL_EV_PERSIST,
            mca_oob_tcp_peer_send_handler,
            peer);
    }

    return ORTE_SUCCESS;
}

/*
 * Initiate the appropriate action based on the state of the connection
 * to the peer.
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
    int rc = ORTE_SUCCESS;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch(peer->peer_state) {
    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CONNECT_ACK:
    case MCA_OOB_TCP_CLOSED:
    case MCA_OOB_TCP_RESOLVE:
        /*
         * queue the message and attempt to resolve the peer address
         */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
            peer->peer_state = MCA_OOB_TCP_RESOLVE;
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            return mca_oob_tcp_resolve(peer);
        }
        break;
    case MCA_OOB_TCP_FAILED:
        rc = ORTE_ERR_UNREACH;
        break;
    case MCA_OOB_TCP_CONNECTED:
        /*
         * start the message and queue if not completed
         */
        if (NULL != peer->peer_send_msg) {
            opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        } else {
            /* if the send does not complete */
            if(!mca_oob_tcp_msg_send_handler(msg, peer)) {
                peer->peer_send_msg = msg;
                opal_event_add(&peer->peer_send_event, 0);
            } else {
                mca_oob_tcp_msg_complete(msg, &peer->peer_name);
            }
        }
        break;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return rc;
}

/*
 * Lookup a peer by name, create one if it doesn't exist.
 * @param name Peer's globally unique identifier.
 * @retval Pointer to the newly created structure, or NULL on error.
 */
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const orte_process_name_t* name)
{
    int rc;
    mca_oob_tcp_peer_t * peer, *old;
    if (NULL == name) { /* can't look this one up */
        return NULL;
    }

    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
    peer = (mca_oob_tcp_peer_t*)orte_hash_table_get_proc(
        &mca_oob_tcp_component.tcp_peers, name);
    if (NULL != peer && 0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, name)) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return peer;
    }

    /* allocate from free list */
    MCA_OOB_TCP_PEER_ALLOC(peer, rc);
    if(NULL == peer) {
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* initialize peer state */
    peer->peer_name = *name;
    peer->peer_addr = NULL;
    peer->peer_sd = -1;
    peer->peer_current_af = AF_UNSPEC;
    peer->peer_state = MCA_OOB_TCP_CLOSED;
    peer->peer_recv_msg = NULL;
    peer->peer_send_msg = NULL;
    peer->peer_retries = 0;

    /* add to lookup table */
    if(ORTE_SUCCESS != orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peers,
                                                &peer->peer_name, peer)) {
        MCA_OOB_TCP_PEER_RETURN(peer);
        OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
        return NULL;
    }

    /* if the peer list is over the maximum size, remove one unused peer */
    opal_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (opal_list_item_t *) peer);
    if(mca_oob_tcp_component.tcp_peer_limit > 0 &&
       (int)opal_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
       mca_oob_tcp_component.tcp_peer_limit) {
        old = (mca_oob_tcp_peer_t *)
            opal_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
        while(1) {
            if(0 == opal_list_get_size(&(old->peer_send_queue)) &&
               NULL == peer->peer_recv_msg) {
                opal_list_remove_item(&mca_oob_tcp_component.tcp_peer_list,
                                      (opal_list_item_t *) old);
                MCA_OOB_TCP_PEER_RETURN(old);
                break;
            } else {
                old = (mca_oob_tcp_peer_t *) opal_list_get_prev(old);
                if(opal_list_get_begin(&mca_oob_tcp_component.tcp_peer_list) == (opal_list_item_t*)old) {
                    /* we tried, but we couldn't find one that was valid to get rid
                     * of. Oh well. */
                    break;
                }
            }
        }
    }
    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
    return peer;
}

static int
mca_oob_tcp_peer_create_socket(mca_oob_tcp_peer_t* peer,
                               uint16_t af_family)
{
    int flags;

    if (peer->peer_current_af == af_family && peer->peer_sd > 0) {
        return ORTE_SUCCESS;
    } else if (peer->peer_sd > 0) {
        int state = peer->peer_state;
        mca_oob_tcp_peer_shutdown(peer);
        peer->peer_state = (mca_oob_tcp_state_t) state;
    }

    peer->peer_sd = socket(af_family, SOCK_STREAM, 0);
    peer->peer_current_af = af_family;

    if (peer->peer_sd < 0) {
        opal_output(0,
                    "%s-%s mca_oob_tcp_peer_create_socket: socket() failed: %s (%d)\n",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        mca_oob_tcp_peer_shutdown(peer);
        return ORTE_ERR_UNREACH;
    }

    /* setup socket options */
    mca_oob_tcp_set_socket_options(peer->peer_sd);

    /* setup event callbacks */
    mca_oob_tcp_peer_event_init(peer);

    /* setup the socket as non-blocking */
    if (peer->peer_sd >= 0) {
        if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
            opal_output(0, "%s-%s mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)),
                        strerror(opal_socket_errno),
                        opal_socket_errno);
        } else {
            flags |= O_NONBLOCK;
            if(fcntl(peer->peer_sd, F_SETFL, flags) < 0)
                opal_output(0, "%s-%s mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n",
                            ORTE_NAME_PRINT(orte_process_info.my_name),
                            ORTE_NAME_PRINT(&(peer->peer_name)),
                            strerror(opal_socket_errno),
                            opal_socket_errno);
        }
    }

    return ORTE_SUCCESS;
}

/*
 * Try connecting to a peer using all the addresses that peer exported.
 */
static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t* peer)
{
    struct sockaddr_storage inaddr;
    int rc;
    opal_socklen_t addrlen = 0;

    do {
        /* pick an address in round-robin fashion from the list exported by the peer */
        if(ORTE_SUCCESS != (rc = mca_oob_tcp_addr_get_next(peer->peer_addr, (struct sockaddr*) &inaddr))) {
            opal_output(0, "%s-%s mca_oob_tcp_peer_try_connect: "
                        "mca_oob_tcp_addr_get_next failed with error=%d",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)),
                        rc);
            mca_oob_tcp_peer_close(peer);
            return ORTE_ERR_UNREACH;
        }

        if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
            opal_output(0, "%s-%s mca_oob_tcp_peer_try_connect: "
                        "connecting port %d to: %s:%d\n",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)),
                        /* Bug, FIXME: output tcp6_listen_port for AF_INET6 */
                        ntohs(mca_oob_tcp_component.tcp_listen_port),
                        opal_net_get_hostname((struct sockaddr*) &inaddr),
                        opal_net_get_port((struct sockaddr*) &inaddr));
        }

        rc = mca_oob_tcp_peer_create_socket(peer, inaddr.ss_family);
        if (ORTE_SUCCESS != rc) {
            struct timeval tv = { 1,0 };
            opal_evtimer_add(&peer->peer_timer_event, &tv);
            return rc;
        }

        if (AF_INET == inaddr.ss_family) {
            addrlen = sizeof(struct sockaddr_in);
        } else if (AF_INET6 == inaddr.ss_family) {
            addrlen = sizeof(struct sockaddr_in6);
        }

        if (connect(peer->peer_sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
            /* non-blocking so wait for completion */
            if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
                opal_event_add(&peer->peer_send_event, 0);
                return ORTE_SUCCESS;
            }

            opal_output(0, "%s-%s mca_oob_tcp_peer_try_connect: "
                        "connect to %s:%d failed: %s (%d)",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)),
                        opal_net_get_hostname((struct sockaddr*) &inaddr),
                        opal_net_get_port((struct sockaddr*) &inaddr),
                        strerror(opal_socket_errno),
                        opal_socket_errno);
            continue;
        }

        /* send our globally unique process identifier to the peer */
        if((rc = mca_oob_tcp_peer_send_connect_ack(peer, peer->peer_sd)) == ORTE_SUCCESS) {
            peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
            opal_event_add(&peer->peer_recv_event, 0);
            return ORTE_SUCCESS;
        } else {
            opal_output(0,
                        "%s-%s mca_oob_tcp_peer_try_connect: "
                        "mca_oob_tcp_peer_send_connect_ack to %s:%d failed: %s (%d)",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)),
                        opal_net_get_hostname((struct sockaddr*) &inaddr),
                        opal_net_get_port((struct sockaddr*) &inaddr),
                        opal_strerror(rc),
                        rc);
        }
    } while(peer->peer_addr->addr_next != 0);

    /* None of the interfaces worked. */
    opal_output(0, "%s-%s mca_oob_tcp_peer_try_connect: "
                "connect to %s:%d failed, connecting over all interfaces failed!",
                ORTE_NAME_PRINT(orte_process_info.my_name),
                ORTE_NAME_PRINT(&(peer->peer_name)),
                opal_net_get_hostname((struct sockaddr*) &inaddr),
                opal_net_get_port((struct sockaddr*) &inaddr));
    mca_oob_tcp_peer_close(peer);
    return ORTE_ERR_UNREACH;
}

/*
 * Start a connection to the peer. This will likely not complete,
 * as the socket is set to non-blocking, so register for event
 * notification of connect completion. On connection we send
 * our globally unique process identifier to the peer and wait for
 * the peer's response.
 */

static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
    /* create socket */
    peer->peer_state = MCA_OOB_TCP_CONNECTING;

    /*
     * We should parse all the IP addresses exported by the peer and
     * try to connect to each of them.
     */
    return mca_oob_tcp_peer_try_connect(peer);
}

/*
 * Check the status of the connection. If the connection failed, retry
 * later. Otherwise, send this process's identifier to the peer on the
 * newly connected socket.
 */
static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer, int sd)
{
    int so_error = 0;
    opal_socklen_t so_length = sizeof(so_error);

    /* unregister from receiving event notifications */
    opal_event_del(&peer->peer_send_event);

    /* check connect completion status */
    if(getsockopt(sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
        opal_output(0, "%s-%s mca_oob_tcp_peer_complete_connect: getsockopt() failed: %s (%d)\n",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    strerror(opal_socket_errno),
                    opal_socket_errno);
        mca_oob_tcp_peer_close(peer);
        return;
    }

    if(so_error == EINPROGRESS) {
        opal_event_add(&peer->peer_send_event, 0);
        return;
    } else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) {
        struct timeval tv = { 1,0 };
        opal_output(0, "%s-%s mca_oob_tcp_peer_complete_connect: "
                    "connection failed: %s (%d) - retrying\n",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    strerror(so_error),
                    so_error);
        if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
            opal_output(0, "%s-%s mca_oob_tcp_peer_complete_connect: "
                        "sending ack, %d",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)), so_error);
        }

        mca_oob_tcp_peer_shutdown(peer);
        opal_evtimer_add(&peer->peer_timer_event, &tv);
        return;
    } else if(so_error != 0) {
        /* No need to worry about the return code here - we return regardless
           at this point, and if an error did occur a message has already been
           printed for the user */
        mca_oob_tcp_peer_try_connect(peer);
        return;
    }

    if (mca_oob_tcp_peer_send_connect_ack(peer, sd) == ORTE_SUCCESS) {
        peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
        opal_event_add(&peer->peer_recv_event, 0);
    } else {
        opal_output(0, "%s-%s mca_oob_tcp_peer_complete_connect: unable to send connect ack.",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)));
        mca_oob_tcp_peer_close(peer);
    }
}

/*
 * Setup peer state to reflect that connection has been established,
 * and start any pending sends.
 */
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer, int sd)
{
    opal_event_del(&peer->peer_timer_event);
    peer->peer_state = MCA_OOB_TCP_CONNECTED;
    peer->peer_retries = 0;

    /* Since we have a direct connection established to this peer, use
       the connection as a direct route between peers */
    orte_routed.update_route(&peer->peer_name, &peer->peer_name);

    if(opal_list_get_size(&peer->peer_send_queue) > 0) {
        if(NULL == peer->peer_send_msg) {
            peer->peer_send_msg = (mca_oob_tcp_msg_t*)
                opal_list_remove_first(&peer->peer_send_queue);
        }
        opal_event_add(&peer->peer_send_event, 0);
    }
}

/*
 * Remove any event registrations associated with the socket
 * and update the peer state to reflect the connection has
 * been closed.
 */
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
    if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
        opal_output(0, "%s-%s mca_oob_tcp_peer_close(%p) sd %d state %d\n",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    (void*)peer,
                    peer->peer_sd,
                    peer->peer_state);
    }

    /* if we lose the connection to the seed - abort */
    if (0 == orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, ORTE_PROC_MY_HNP)) {
        /* If we are not already inside orte_finalize, then call abort */
        if (ORTE_UNIVERSE_STATE_FINALIZE > orte_universe_info.state) {
            /* Should free the peer lock before we abort so we don't
             * get stuck in the orte_wait_kill when receiving messages in the
             * tcp OOB. */
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            orte_errmgr.error_detected(1, "OOB: Connection to HNP lost", NULL);
        }
    }

    mca_oob_tcp_peer_shutdown(peer);
}

void mca_oob_tcp_peer_shutdown(mca_oob_tcp_peer_t* peer)
{
    /* give up and clean up any pending messages */
    if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
        mca_oob_tcp_msg_t *msg;

        opal_output(0, "%s-%s mca_oob_tcp_peer_shutdown: retries exceeded",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)));

        /* There are cases during the initial connection setup where
           the peer_send_msg is NULL but there are things in the queue
           -- handle that case */
        if (NULL != (msg = peer->peer_send_msg)) {
            msg->msg_complete = true;
            msg->msg_rc = ORTE_ERR_UNREACH;
            mca_oob_tcp_msg_complete(msg, &peer->peer_name);
        }
        peer->peer_send_msg = NULL;
        while (NULL !=
               (msg = (mca_oob_tcp_msg_t*)opal_list_remove_first(&peer->peer_send_queue))) {
            msg->msg_complete = true;
            msg->msg_rc = ORTE_ERR_UNREACH;
            mca_oob_tcp_msg_complete(msg, &peer->peer_name);
        }

        /* We were unsuccessful in establishing a connection, and are
           not likely to suddenly become successful, so abort the
           whole thing */
        peer->peer_state = MCA_OOB_TCP_FAILED;
    }

    if (peer->peer_sd >= 0) {
        opal_event_del(&peer->peer_recv_event);
        opal_event_del(&peer->peer_send_event);
        CLOSE_THE_SOCKET(peer->peer_sd);
        peer->peer_sd = -1;
        peer->peer_current_af = AF_UNSPEC;
    }

    opal_event_del(&peer->peer_timer_event);
    peer->peer_state = MCA_OOB_TCP_CLOSED;
}

/*
 * Send the globally unique identifier for this process to a peer on
 * a newly connected socket.
 */
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer, int sd)
{
    /* send process identifier of self and peer - note that we may
     * have assigned the peer a unique process name - if it came up
     * without one.
     */
    mca_oob_tcp_hdr_t hdr;
    memset(&hdr, 0, sizeof(hdr));
    if (NULL == orte_process_info.my_name) {  /* my name isn't defined yet */
        hdr.msg_src = *ORTE_NAME_INVALID;
    } else {
        hdr.msg_src = *(orte_process_info.my_name);
    }
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_CONNECT;
    MCA_OOB_TCP_HDR_HTON(&hdr);
    if(mca_oob_tcp_peer_send_blocking(peer, sd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
        return ORTE_ERR_UNREACH;
    }
    return ORTE_SUCCESS;
}

/*
 * Receive the peer's globally unique process identification from a newly
 * connected socket and verify the expected response. If so, move the
 * socket to a connected state.
 */
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer, int sd)
{
    mca_oob_tcp_hdr_t hdr;
    if((mca_oob_tcp_peer_recv_blocking(peer, sd, &hdr, sizeof(hdr))) != sizeof(hdr)) {
        /* If the peer state is still CONNECT_ACK, that indicates that
           the error was a reset from the remote host because the
           connection was not able to be fully established. In that
           case, clean up the connection and give it another go. */
        if (peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) {
            struct timeval tv = { 1,0 };
            if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
                opal_output(0,
                            "%s-%s mca_oob_tcp_peer_recv_connect_ack "
                            "connect failed during receive. Restarting (%s).",
                            ORTE_NAME_PRINT(orte_process_info.my_name),
                            ORTE_NAME_PRINT(&(peer->peer_name)),
                            strerror(opal_socket_errno));
            }
            opal_event_del(&peer->peer_recv_event);
            mca_oob_tcp_peer_shutdown(peer);
            opal_evtimer_add(&peer->peer_timer_event, &tv);
            return ORTE_SUCCESS;
        } else {
            mca_oob_tcp_peer_close(peer);
            return ORTE_ERR_UNREACH;
        }
    }
    MCA_OOB_TCP_HDR_NTOH(&hdr);
    if(hdr.msg_type != MCA_OOB_TCP_CONNECT) {
        opal_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n",
                    hdr.msg_type);
        mca_oob_tcp_peer_close(peer);
        return ORTE_ERR_UNREACH;
    }

    /* compare the peer's name to the expected value */
    if (0 != orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, &hdr.msg_src)) {
        opal_output(0, "%s-%s mca_oob_tcp_peer_recv_connect_ack: "
                    "received unexpected process identifier %s\n",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    ORTE_NAME_PRINT(&(hdr.msg_src)));
        mca_oob_tcp_peer_close(peer);
        return ORTE_ERR_UNREACH;
    }

    /* if we have an invalid name or do not have one assigned at all -
     * use the name returned by the peer. This needs to be a LITERAL
     * comparison - we do NOT want wildcard values to return EQUAL
     */
    if(orte_process_info.my_name == NULL) {
        orte_ns.create_process_name(&orte_process_info.my_name,
                                    hdr.msg_dst.jobid, hdr.msg_dst.vpid);
    } else if (orte_ns.compare_fields(ORTE_NS_CMP_ALL, orte_process_info.my_name, ORTE_NAME_INVALID) == ORTE_EQUAL) {
        *orte_process_info.my_name = hdr.msg_dst;
    }

    /* connected */
    mca_oob_tcp_peer_connected(peer, sd);
    if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
        mca_oob_tcp_peer_dump(peer, "connected");
    }
    return ORTE_SUCCESS;
}

/*
 * A blocking recv on a non-blocking socket. Used to receive the small
 * amount of connection information that identifies the peer's endpoint.
 */
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size)
{
    unsigned char* ptr = (unsigned char*)data;
    size_t cnt = 0;
    while(cnt < size) {
        int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0);

        /* remote closed connection */
        if(retval == 0) {
            if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
                opal_output(0, "%s-%s mca_oob_tcp_peer_recv_blocking: "
                            "peer closed connection: peer state %d",
                            ORTE_NAME_PRINT(orte_process_info.my_name),
                            ORTE_NAME_PRINT(&(peer->peer_name)),
                            peer->peer_state);
            }
            mca_oob_tcp_peer_close(peer);
            return -1;
        }

        /* socket is non-blocking so handle errors */
        if(retval < 0) {
            if(opal_socket_errno != EINTR &&
               opal_socket_errno != EAGAIN &&
               opal_socket_errno != EWOULDBLOCK) {
                if (peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) {
                    /* If we overflow the listen backlog, it's
                       possible that even though we finished the three
                       way handshake, the remote host was unable to
                       transition the connection from half connected
                       (received the initial SYN) to fully connected
                       (in the listen backlog). We likely won't see
                       the failure until we try to receive, due to
                       timing and the like. The first thing we'll get
                       in that case is a RST packet, which receive
                       will turn into a connection reset by peer
                       errno. In that case, leave the socket in
                       CONNECT_ACK and propagate the error up to
                       recv_connect_ack, who will try to establish the
                       connection again */
                    return -1;
                } else {
                    opal_output(0,
                                "%s-%s mca_oob_tcp_peer_recv_blocking: "
                                "recv() failed: %s (%d)\n",
                                ORTE_NAME_PRINT(orte_process_info.my_name),
                                ORTE_NAME_PRINT(&(peer->peer_name)),
                                strerror(errno),
                                errno);
                    mca_oob_tcp_peer_close(peer);
                    return -1;
                }
            }
            continue;
        }
        cnt += retval;
    }
    return cnt;
}

/*
 * A blocking send on a non-blocking socket. Used to send the small
 * amount of connection information that identifies the peer's endpoint.
 */
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, int sd, void* data, size_t size)
{
    unsigned char* ptr = (unsigned char*)data;
    size_t cnt = 0;
    while(cnt < size) {
        int retval = send(sd, (char *)ptr+cnt, size-cnt, 0);
        if(retval < 0) {
            if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
                opal_output(0, "%s-%s mca_oob_tcp_peer_send_blocking: send() failed: %s (%d)\n",
                            ORTE_NAME_PRINT(orte_process_info.my_name),
                            ORTE_NAME_PRINT(&(peer->peer_name)),
                            strerror(opal_socket_errno),
                            opal_socket_errno);
                mca_oob_tcp_peer_close(peer);
                return -1;
            }
            continue;
        }
        cnt += retval;
    }
    return cnt;
}

int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;
    if(peer->peer_state != MCA_OOB_TCP_CONNECTED)
        return ORTE_SUCCESS;
    hdr.msg_src = *orte_process_info.my_name;
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_IDENT;
    hdr.msg_size = 0;
    hdr.msg_tag = 0;
    MCA_OOB_TCP_HDR_HTON(&hdr);
    if(mca_oob_tcp_peer_send_blocking(peer, peer->peer_sd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
        return ORTE_ERR_UNREACH;
    }
    return ORTE_SUCCESS;
}


/* static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr) */
/* { */
/*     OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); */
/*     ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name); */
/*     peer->peer_name = hdr->msg_src; */
/*     ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer); */
/*     OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); */
/* } */

/*
 * Dispatch to the appropriate action routine based on the state
 * of the connection with the peer.
 */

static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch(peer->peer_state) {
    case MCA_OOB_TCP_CONNECT_ACK:
    {
        mca_oob_tcp_peer_recv_connect_ack(peer, sd);
        break;
    }
    case MCA_OOB_TCP_CONNECTED:
    {
        /* allocate a new message and setup for recv */
        if(NULL == peer->peer_recv_msg) {
            int rc;
            mca_oob_tcp_msg_t* msg;
            MCA_OOB_TCP_MSG_ALLOC(msg, rc);
            if(NULL == msg) {
                opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
                            ORTE_NAME_PRINT(orte_process_info.my_name),
                            ORTE_NAME_PRINT(&(peer->peer_name)));
                return;
            }
            msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
            msg->msg_rc = 0;
            msg->msg_flags = 0;
            msg->msg_peer = peer->peer_name;
            msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, 2);
            msg->msg_rwbuf = NULL;
            msg->msg_rwcnt = msg->msg_rwnum = 1;
            msg->msg_rwptr = msg->msg_rwiov;
            msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr;
            msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
            peer->peer_recv_msg = msg;
        }

        if (peer->peer_recv_msg &&
            mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
            mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;
            peer->peer_recv_msg = NULL;
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            mca_oob_tcp_msg_recv_complete(msg, peer);
            return;
        }
        break;
    }
    default:
    {
        opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    peer->peer_state);
        mca_oob_tcp_peer_close(peer);
        break;
    }
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * A file descriptor is available/ready for send. Check the state
 * of the socket and take the appropriate action.
 */
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch(peer->peer_state) {
    case MCA_OOB_TCP_CONNECTING:
        mca_oob_tcp_peer_complete_connect(peer, sd);
        break;
    case MCA_OOB_TCP_CONNECTED:
    {
        while(peer->peer_send_msg != NULL) {

            /* complete the current send */
            mca_oob_tcp_msg_t* msg = peer->peer_send_msg;
            if(ntohl(msg->msg_hdr.msg_type) == MCA_OOB_TCP_PING ||
               mca_oob_tcp_msg_send_handler(msg, peer)) {
                mca_oob_tcp_msg_complete(msg, &peer->peer_name);
            } else {
                break;
            }

            /* if current completed - progress any pending sends */
            peer->peer_send_msg = (mca_oob_tcp_msg_t*)
                opal_list_remove_first(&peer->peer_send_queue);
        }

        /* if nothing else to do unregister for send event notifications */
        if(NULL == peer->peer_send_msg) {
            opal_event_del(&peer->peer_send_event);
        }
        break;
    }
    default:
        opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d)",
                    ORTE_NAME_PRINT(orte_process_info.my_name),
                    ORTE_NAME_PRINT(&(peer->peer_name)),
                    peer->peer_state);
        opal_event_del(&peer->peer_send_event);
        break;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * Routine for debugging to print the connection state and socket options
 */
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
{
    char src[64];
    char dst[64];
    char buff[255];
    int sndbuf, rcvbuf, nodelay, flags;
    struct sockaddr_storage inaddr;
    opal_socklen_t addrlen = sizeof(struct sockaddr_storage);
    opal_socklen_t optlen;

    getsockname(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
    snprintf(src, sizeof(src), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));
    getpeername(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
    snprintf(dst, sizeof(dst), "%s", opal_net_get_hostname((struct sockaddr*) &inaddr));

    if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n",
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }

#if defined(SO_SNDBUF)
    optlen = sizeof(sndbuf);
    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: SO_SNDBUF option: %s (%d)\n",
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#else
    sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
    optlen = sizeof(rcvbuf);
    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: SO_RCVBUF option: %s (%d)\n",
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#else
    rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
    optlen = sizeof(nodelay);
    if(getsockopt(peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: TCP_NODELAY option: %s (%d)\n",
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#else
    nodelay = 0;
#endif

    snprintf(buff, sizeof(buff), "%s-%s %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
             ORTE_NAME_PRINT(orte_process_info.my_name),
             ORTE_NAME_PRINT(&(peer->peer_name)),
             msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
    opal_output(0, "%s", buff);
}

/*
 * Accept incoming connection - if not already connected. We compare the name of the
 * peer to our own name using the ns.compare_fields function as we want this to be
 * a LITERAL comparison - i.e., there is no occasion when the peer's name should
 * be a wildcard value.
 *
 * To avoid competing reciprocal connection attempts, we only accept connections from
 * processes whose names are "greater" than our own.
 */

bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
{
    int cmpval;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    cmpval = orte_ns.compare_fields(ORTE_NS_CMP_ALL, &peer->peer_name, orte_process_info.my_name);
    if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
        (peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
        (peer->peer_state != MCA_OOB_TCP_CONNECTED &&
         cmpval == ORTE_VALUE1_GREATER)) {

        if(peer->peer_state != MCA_OOB_TCP_CLOSED) {
            mca_oob_tcp_peer_close(peer);
        }
        peer->peer_sd = sd;
        mca_oob_tcp_peer_event_init(peer);

        if(mca_oob_tcp_peer_send_connect_ack(peer, sd) != ORTE_SUCCESS) {
            opal_output(0, "%s-%s mca_oob_tcp_peer_accept: "
                        "mca_oob_tcp_peer_send_connect_ack failed\n",
                        ORTE_NAME_PRINT(orte_process_info.my_name),
                        ORTE_NAME_PRINT(&(peer->peer_name)));
            mca_oob_tcp_peer_close(peer);
            OPAL_THREAD_UNLOCK(&peer->peer_lock);
            return false;
        }

        mca_oob_tcp_peer_connected(peer, sd);
        if (sd == peer->peer_sd) {
            opal_event_add(&peer->peer_recv_event, 0);
        }
        if(mca_oob_tcp_component.tcp_debug > 0) {
            mca_oob_tcp_peer_dump(peer, "accepted");
        }
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return true;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return false;
}

/*
 * Resolve process name to an actual internet address.
 */

void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr)
{
    OPAL_THREAD_LOCK(&peer->peer_lock);
    peer->peer_addr = addr;
    if((peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
       (peer->peer_state == MCA_OOB_TCP_CLOSED && opal_list_get_size(&peer->peer_send_queue))) {
        mca_oob_tcp_peer_start_connect(peer);
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * Callback on timeout - retry connection attempt.
 */

static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{
    /* start the connection to the peer */
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;

    OPAL_THREAD_LOCK(&peer->peer_lock);
    if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
        mca_oob_tcp_peer_start_connect(peer);
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/*
 * Remove any references to the indicated message.
 */

void mca_oob_tcp_peer_dequeue_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
    opal_list_item_t* item;
    OPAL_THREAD_LOCK(&peer->peer_lock);
    if (peer->peer_send_msg == msg)
        peer->peer_send_msg = NULL;
    if (peer->peer_recv_msg == msg)
        peer->peer_recv_msg = NULL;

    for( item = opal_list_get_first(&peer->peer_send_queue);
         item != opal_list_get_end(&peer->peer_send_queue);
         item = opal_list_get_next(item)) {
        if(item == (opal_list_item_t*)msg) {
            opal_list_remove_item(&peer->peer_send_queue, item);
            break;
        }
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

/**
 * Set socket buffering
 */

void mca_oob_tcp_set_socket_options(int sd)
{
    int optval;
#if defined(TCP_NODELAY)
    optval = 1;
    if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
        opal_backtrace_print(stderr);
        opal_output(0, "[%s:%d] setsockopt(TCP_NODELAY) failed: %s (%d)",
                    __FILE__, __LINE__,
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#endif
#if defined(SO_SNDBUF)
    if(mca_oob_tcp_component.tcp_sndbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_oob_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
        opal_output(0, "[%s:%d] setsockopt(SO_SNDBUF) failed: %s (%d)",
                    __FILE__, __LINE__,
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#endif
#if defined(SO_RCVBUF)
    if(mca_oob_tcp_component.tcp_rcvbuf > 0 &&
       setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_oob_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
        opal_output(0, "[%s:%d] setsockopt(SO_RCVBUF) failed: %s (%d)",
                    __FILE__, __LINE__,
                    strerror(opal_socket_errno),
                    opal_socket_errno);
    }
#endif
}