1
1
* Remove the connect() timeout code, as it had some nasty race conditions
    when connections were established as the trigger was firing.  A better
    solution has been found for the cluster where this was needed, so just
    removing it was easiest.
  * When a fatal error (too many connection failures) occurs, set an error
    on messages in the queue even if there isn't an active message.  The
    first message to any peer will be queued without being active (and
    so will all subsequent messages until the connection is established),
    and the orteds will hang until that first message completes.  So if
    an orted can never contact its peer, it will never exit and just sit
    waiting for that message to complete.
  * Cover an interesting RST condition in the connect code.  A connection
    can complete the three-way handshake, the connector can even send
    some data, but the server side will drop the connection because it
    can't move it from the half-connected to fully-connected state because
    of space shortage in the listen backlog queue.  This causes a RST to
    be received first time that recv() is called, which will be when waiting
    for the remote side of the OOB ack.  In this case, transition the
    connection back into a CLOSED state and try to connect again.
  * Add levels of debugging, rather than all or nothing, each building on
    the previous level.  0 (default) is hard errors.  1 is connection 
    error debugging info.  2 is all connection info.  3 is more state
    info.  4 includes all message info.
  * Add some hopefully useful comments

This commit was SVN r14261.
Этот коммит содержится в:
Brian Barrett 2007-04-07 22:33:30 +00:00
родитель df4c468bb4
Коммит 8a55c84d0b
7 изменённых файлов: 183 добавлений и 128 удалений

3
NEWS
Просмотреть файл

@ -45,6 +45,9 @@ Trunk (not on release branches yet)
- Added checkpoint/restart process fault tolerance support. Initially
support a LAM/MPI-like protocol.
--> Expected: 1.3
- Fixed a number of connection establishment errors in the TCP out-
of-band messaging system.
--> Expected: 1.2.x
1.2.1

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -48,6 +50,9 @@
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#undef SOMAXCONN
#define SOMAXCONN 1
/*
* Data structure for accepting connections.
*/
@ -215,6 +220,8 @@ int mca_oob_tcp_component_open(void)
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_copy_in_connections, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy, opal_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock, opal_mutex_t);
/* register oob module parameters */
@ -232,17 +239,6 @@ int mca_oob_tcp_component_open(void)
mca_oob_tcp_param_register_int("sndbuf", 128*1024);
mca_oob_tcp_component.tcp_rcvbuf =
mca_oob_tcp_param_register_int("rcvbuf", 128*1024);
/* AWF - may need to increase this for large-scale jobs -
see AWF comment in oob_tcp_peer.c */
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"connect_timeout",
"connect timeout in seconds, before trying next interface (0 means block until connect() times out)",
false,
false,
0,
&mca_oob_tcp_component.tcp_timeout);
mca_base_param_reg_int(&mca_oob_tcp_component.super.oob_base,
"connect_sleep",
@ -323,19 +319,27 @@ int mca_oob_tcp_component_close(void)
#endif
/* cleanup resources */
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peers);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_names);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_events);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return_copy);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_connections_return);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_copy_out_connections);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_pending_connections_fl);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_listen_thread);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_cond);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_completed);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_events);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_names);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peers);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_subscriptions);
OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
return ORTE_SUCCESS;
}
@ -349,6 +353,7 @@ int mca_oob_tcp_component_close(void)
static void mca_oob_tcp_accept(void)
{
#if 0
while(true) {
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
@ -369,11 +374,11 @@ static void mca_oob_tcp_accept(void)
mca_oob_tcp_set_socket_options(sd);
/* log the accept */
if(mca_oob_tcp_component.tcp_debug) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_accept: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
inet_ntoa(addr.sin_addr),
addr.sin_port);
ORTE_NAME_ARGS(orte_process_info.my_name),
inet_ntoa(addr.sin_addr),
addr.sin_port);
}
/* wait for receipt of peers process identifier to complete this connection */
@ -381,6 +386,7 @@ static void mca_oob_tcp_accept(void)
opal_event_set(&event->event, sd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
}
#endif
}
/*
@ -487,32 +493,14 @@ static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
while (count < mca_oob_tcp_component.tcp_copy_spin_count &&
opal_list_get_size(&mca_oob_tcp_component.tcp_copy_in_connections) <
(size_t) mca_oob_tcp_component.tcp_copy_max_size) {
/* we have to use "real" lock and unlock if */
/* we don't have thread support compiled in */
/* because OPAL_FREE_LIST_WAIT will not */
/* use locks otherwise and we have a race */
/* with the accept thread and the main thread on */
/* waiting/returning items to the free list */
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
OPAL_FREE_LIST_WAIT(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item, rc);
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
item = (mca_oob_tcp_pending_connection_t*) fl_item;
item->fd = accept(mca_oob_tcp_component.tcp_listen_sd,
(struct sockaddr*)&(item->addr), &addrlen);
if(item->fd < 0) {
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
fl_item);
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
if (mca_oob_tcp_component.tcp_shutdown) return NULL;
if(opal_socket_errno != EAGAIN || opal_socket_errno != EWOULDBLOCK) {
@ -526,7 +514,7 @@ static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
continue;
}
if(mca_oob_tcp_component.tcp_debug) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_thread: (%d, %d) %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
item->fd, opal_socket_errno,
@ -543,6 +531,9 @@ static void* mca_oob_tcp_listen_thread(opal_object_t *obj)
opal_list_join(&mca_oob_tcp_component.tcp_pending_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_pending_connections),
&mca_oob_tcp_component.tcp_copy_in_connections);
while (NULL != (fl_item = (opal_free_list_item_t*) opal_list_remove_first(&mca_oob_tcp_component.tcp_connections_return_copy))) {
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl, fl_item);
}
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
}
}
@ -573,12 +564,16 @@ static int mca_oob_tcp_listen_progress(void)
/* copy the pending connections from the list the accept
thread is inserting into into a temporary list for us to
process from. This is an O(1) operation, so we minimize
the lock time */
process from. Then copy the returned free list items into
that thread's return list, so it can free them soonish.
This is an O(1) operation, so we minimize the lock time. */
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
opal_list_join(&mca_oob_tcp_component.tcp_copy_out_connections,
opal_list_get_end(&mca_oob_tcp_component.tcp_copy_out_connections),
&mca_oob_tcp_component.tcp_pending_connections);
opal_list_join(&mca_oob_tcp_component.tcp_connections_return_copy,
opal_list_get_end(&mca_oob_tcp_component.tcp_connections_return_copy),
&mca_oob_tcp_component.tcp_connections_return);
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
/* process all the connections */
@ -590,7 +585,7 @@ static int mca_oob_tcp_listen_progress(void)
mca_oob_tcp_set_socket_options(item->fd);
/* log the accept */
if(mca_oob_tcp_component.tcp_debug) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_listen_progress: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
inet_ntoa(item->addr.sin_addr),
@ -602,20 +597,9 @@ static int mca_oob_tcp_listen_progress(void)
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event_set(&event->event, item->fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event_add(&event->event, 0);
/* we have to use "real" lock and unlock if */
/* we don't have thread support compiled in */
/* because OPAL_FREE_LIST_WAIT will not */
/* use locks otherwise and we have a race */
/* with the accept thread and the main thread on */
/* waiting/returning items to the free list */
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_lock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
OPAL_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_pending_connections_fl,
(opal_free_list_item_t *) item);
#if !OMPI_HAVE_THREAD_SUPPORT
opal_mutex_unlock(&mca_oob_tcp_component.tcp_pending_connections_lock);
#endif
/* put on the needs returning list */
opal_list_append(&mca_oob_tcp_component.tcp_connections_return,
(opal_list_item_t*) item);
count++;
}
@ -770,7 +754,7 @@ static void mca_oob_tcp_recv_connect(int sd, mca_oob_tcp_hdr_t* hdr)
}
/* is the peer instance willing to accept this connection */
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
if(mca_oob_tcp_component.tcp_debug > 0) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_recv_handler: "
"rejected connection from [%lu,%lu,%lu] connection state %d",
ORTE_NAME_ARGS(orte_process_info.my_name),
@ -808,7 +792,7 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* recv the process identifier */
while((rc = recv(sd, (char *)&hdr, sizeof(hdr), 0)) != sizeof(hdr)) {
if(rc >= 0) {
if(mca_oob_tcp_component.tcp_debug > 1) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_recv_handler: peer closed connection",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
@ -898,7 +882,7 @@ void mca_oob_tcp_registry_callback(
mca_oob_tcp_addr_t* addr, *existing;
mca_oob_tcp_peer_t* peer;
if(mca_oob_tcp_component.tcp_debug > 1) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
@ -940,7 +924,7 @@ void mca_oob_tcp_registry_callback(
continue;
}
if(mca_oob_tcp_component.tcp_debug > 1) {
if(mca_oob_tcp_component.tcp_debug > OOB_TCP_DEBUG_INFO) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_registry_callback: received peer [%lu,%lu,%lu]\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(addr->addr_name)));
@ -1123,7 +1107,7 @@ int mca_oob_tcp_init(void)
-1, /* maximum number */
16); /* increment to grow by */
opal_progress_register(mca_oob_tcp_listen_progress);
if (mca_oob_tcp_component.tcp_debug) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "[%lu,%lu,%lu] accepting connections via listen thread",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
@ -1136,7 +1120,7 @@ int mca_oob_tcp_init(void)
opal_output(0, "mca_oob_tcp_init: unable to create listen socket");
return ORTE_ERROR;
}
if (mca_oob_tcp_component.tcp_debug) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "[%lu,%lu,%lu] accepting connections via event library",
ORTE_NAME_ARGS(orte_process_info.my_name));
}
@ -1161,7 +1145,7 @@ int mca_oob_tcp_init(void)
opal_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
if(mca_oob_tcp_component.tcp_debug > 2) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_ALL) {
opal_output(0, "[%lu,%lu,%lu] mca_oob_tcp_init: calling orte_gpr.subscribe\n",
ORTE_NAME_ARGS(orte_process_info.my_name));
}

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -45,6 +47,12 @@ extern "C" {
#define ORTE_OOB_TCP_KEY "oob-tcp"
#define OOB_TCP_DEBUG_CONNECT_FAIL 1 /* debug connection establishment failures */
#define OOB_TCP_DEBUG_CONNECT 2 /* other connection information */
#define OOB_TCP_DEBUG_INFO 3 /* information about startup, connection establish, etc. */
#define OOB_TCP_DEBUG_ALL 4 /* everything else */
/*
* standard component functions
*/
@ -250,7 +258,6 @@ struct mca_oob_tcp_component_t {
int tcp_peer_retries; /**< max number of retries before declaring peer gone */
int tcp_sndbuf; /**< socket send buffer size */
int tcp_rcvbuf; /**< socket recv buffer size */
int tcp_timeout; /**< socket connect timeout in seconds */
opal_free_list_t tcp_msgs; /**< free list of messages */
opal_event_t tcp_send_event; /**< event structure for sends */
opal_event_t tcp_recv_event; /**< event structure for recvs */
@ -266,12 +273,16 @@ struct mca_oob_tcp_component_t {
bool tcp_shutdown;
mca_oob_tcp_listen_type_t tcp_listen_type;
opal_thread_t tcp_listen_thread;
opal_free_list_t tcp_pending_connections_fl;
opal_list_t tcp_pending_connections;
opal_list_t tcp_copy_out_connections;
opal_list_t tcp_copy_in_connections;
opal_list_t tcp_connections_return;
opal_list_t tcp_connections_return_copy;
opal_mutex_t tcp_pending_connections_lock;
opal_timer_t tcp_last_copy_time;
opal_timer_t tcp_copy_delta;
int tcp_copy_max_size;

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -352,7 +354,7 @@ static bool mca_oob_tcp_msg_recv(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
return false;
}
} else if (rc == 0) {
if(mca_oob_tcp_component.tcp_debug > 3) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT_FAIL) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_msg_recv: peer closed connection",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)));

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -277,7 +279,7 @@ static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t* peer)
return ORTE_ERR_UNREACH;
}
if(mca_oob_tcp_component.tcp_debug > 0) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
"connecting port %d to: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
@ -292,33 +294,19 @@ static int mca_oob_tcp_peer_try_connect(mca_oob_tcp_peer_t* peer)
(struct sockaddr*)&inaddr, sizeof(struct sockaddr_in)) < 0) {
/* non-blocking so wait for completion */
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
if (mca_oob_tcp_component.tcp_timeout > 0) {
struct timeval tv;
tv.tv_sec = mca_oob_tcp_component.tcp_timeout;
tv.tv_usec = 0;
/* The first event is responsible for our timeout,
while the second event may occur sooner, due to
a successful connect() */
opal_evtimer_add(&peer->peer_timer_event, &tv);
}
opal_event_add(&peer->peer_send_event, 0);
return ORTE_SUCCESS;
}
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
"connect to %s:%d failed: %s (%d)",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
inet_ntoa(inaddr.sin_addr),
ntohs(inaddr.sin_port),
strerror(opal_socket_errno),
opal_socket_errno);
}
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_try_connect: "
"connect to %s:%d failed: %s (%d)",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
inet_ntoa(inaddr.sin_addr),
ntohs(inaddr.sin_port),
strerror(opal_socket_errno),
opal_socket_errno);
continue;
}
@ -368,6 +356,7 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
peer->peer_state = MCA_OOB_TCP_CONNECTING;
peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (peer->peer_sd < 0) {
/* if we didn't successfully connect, wait 1 second and then try again */
struct timeval tv = { 1,0 };
opal_output(0,
"[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: socket() failed: %s (%d)\n",
@ -403,12 +392,6 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
opal_socket_errno);
}
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: trying all %d addresses\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)), peer->peer_addr->addr_count );
}
/*
* We should parse all the IP addresses exported by the peer and try to connect to each of them.
*/
@ -429,7 +412,6 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
/* unregister from receiving event notifications */
opal_event_del(&peer->peer_send_event);
opal_event_del(&peer->peer_timer_event);
/* check connect completion status */
if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
@ -448,10 +430,11 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
} else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) {
struct timeval tv = { 1,0 };
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: "
"connection failed (errno=%d) - retrying (pid=%d)\n",
"connection failed: %s (%d) - retrying\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
so_error, getpid());
strerror(so_error),
so_error);
mca_oob_tcp_peer_shutdown(peer);
opal_evtimer_add(&peer->peer_timer_event, &tv);
return;
@ -463,6 +446,13 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
return;
}
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: "
"sending ack, %d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)), so_error);
}
if(mca_oob_tcp_peer_send_connect_ack(peer) == ORTE_SUCCESS) {
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
opal_event_add(&peer->peer_recv_event, 0);
@ -498,7 +488,7 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
if(mca_oob_tcp_component.tcp_debug > 0) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_close(%p) sd %d state %d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
@ -526,13 +516,32 @@ void mca_oob_tcp_peer_shutdown(mca_oob_tcp_peer_t* peer)
{
/* giving up and cleanup any pending messages */
if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
mca_oob_tcp_msg_t *msg = peer->peer_send_msg;
while(msg != NULL) {
mca_oob_tcp_msg_t *msg;
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_shutdown: retries exceeded",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)));
/* There are cases during the initial connection setup where
the peer_send_msg is NULL but there are things in the queue
-- handle that case */
if (NULL != (msg = peer->peer_send_msg)) {
msg->msg_complete = true;
msg->msg_rc = ORTE_ERR_UNREACH;
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
msg = (mca_oob_tcp_msg_t*)opal_list_remove_first(&peer->peer_send_queue);
}
peer->peer_send_msg = NULL;
while (NULL !=
(msg = (mca_oob_tcp_msg_t*)opal_list_remove_first(&peer->peer_send_queue))) {
msg->msg_complete = true;
msg->msg_rc = ORTE_ERR_UNREACH;
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
}
/* We were unsuccessful in establishing a connection, and are
not likely to suddenly become successful, so abort the
whole thing */
peer->peer_state = MCA_OOB_TCP_FAILED;
}
if (peer->peer_sd >= 0) {
@ -581,16 +590,37 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_hdr_t hdr;
if((mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr))) != sizeof(hdr)) {
mca_oob_tcp_peer_close(peer);
return ORTE_ERR_UNREACH;
/* If the peer state is still CONNECT_ACK, that indicates that
the error was a reset from the remote host because the
connection was not able to be fully established. In that
case, clean up the connection and give it another go. */
if (peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) {
if (mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
struct timeval tv = { 1,0 };
opal_output(0,
"[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_connect_ack "
"connect failed during receive. Restarting (%s).",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
strerror(opal_socket_errno));
}
opal_event_del(&peer->peer_recv_event);
mca_oob_tcp_peer_shutdown(peer);
opal_evtimer_add(&peer->peer_timer_event, &tv);
return ORTE_SUCCESS;
} else {
mca_oob_tcp_peer_close(peer);
return ORTE_ERR_UNREACH;
}
}
MCA_OOB_TCP_HDR_NTOH(&hdr);
if(hdr.msg_type != MCA_OOB_TCP_CONNECT) {
opal_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n", hdr.msg_type);
opal_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n",
hdr.msg_type);
mca_oob_tcp_peer_close(peer);
return ORTE_ERR_UNREACH;
}
/* compare the peers name to the expected value */
if(memcmp(&peer->peer_name, &hdr.msg_src, sizeof(orte_process_name_t)) != 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_connect_ack: "
@ -602,8 +632,9 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
return ORTE_ERR_UNREACH;
}
/* if we have an invalid name or do not have one assigned at all - use the name returned by the peer.
* This needs to be a LITERAL comparison - we do NOT want wildcard values to return EQUAL
/* if we have an invalid name or do not have one assigned at all -
* use the name returned by the peer. This needs to be a LITERAL
* comparison - we do NOT want wildcard values to return EQUAL
*/
if(orte_process_info.my_name == NULL) {
orte_ns.create_process_name(&orte_process_info.my_name,
@ -614,7 +645,7 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
/* connected */
mca_oob_tcp_peer_connected(peer);
if(mca_oob_tcp_component.tcp_debug > 0) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_CONNECT) {
mca_oob_tcp_peer_dump(peer, "connected");
}
return ORTE_SUCCESS;
@ -634,7 +665,7 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
/* remote closed connection */
if(retval == 0) {
if(mca_oob_tcp_component.tcp_debug > 0) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_INFO) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: "
"peer closed connection: peer state %d",
ORTE_NAME_ARGS(orte_process_info.my_name),
@ -647,14 +678,36 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
/* socket is non-blocking so handle errors */
if(retval < 0) {
if(opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: recv() failed: %s (%d)\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
strerror(errno),
errno);
mca_oob_tcp_peer_close(peer);
return -1;
if(opal_socket_errno != EINTR &&
opal_socket_errno != EAGAIN &&
opal_socket_errno != EWOULDBLOCK) {
if (peer->peer_state == MCA_OOB_TCP_CONNECT_ACK) {
/* If we overflow the listen backlog, it's
possible that even though we finished the three
way handshake, the remote host was unable to
transition the connection from half connected
(received the initial SYN) to fully connected
(in the listen backlog). We likely won't see
the failure until we try to receive, due to
timing and the like. The first thing we'll get
in that case is a RST packet, which receive
will turn into a connection reset by peer
errno. In that case, leave the socket in
CONNECT_ACK and propagate the error up to
recv_connect_ack, who will try to establish the
connection again */
return -1;
} else {
opal_output(0,
"[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: "
"recv() failed: %s (%d)\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
strerror(errno),
errno);
mca_oob_tcp_peer_close(peer);
return -1;
}
}
continue;
}
@ -967,8 +1020,6 @@ static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
OPAL_THREAD_LOCK(&peer->peer_lock);
if(peer->peer_state == MCA_OOB_TCP_CLOSED)
mca_oob_tcp_peer_start_connect(peer);
else if(peer->peer_state == MCA_OOB_TCP_CONNECTING)
mca_oob_tcp_peer_complete_connect(peer);
OPAL_THREAD_UNLOCK(&peer->peer_lock);
}

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -43,7 +45,7 @@ int mca_oob_tcp_recv(
mca_oob_tcp_msg_t *msg;
int i, rc = 0, size = 0;
if(mca_oob_tcp_component.tcp_debug > 3) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_ALL) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_recv: tag %d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(peer),

Просмотреть файл

@ -9,6 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -98,7 +100,7 @@ int mca_oob_tcp_send(
if(NULL == peer)
return ORTE_ERR_UNREACH;
if(mca_oob_tcp_component.tcp_debug > 3) {
if(mca_oob_tcp_component.tcp_debug >= OOB_TCP_DEBUG_ALL) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_send: tag %d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),