1
1
openmpi/orte/mca/oob/tcp/oob_tcp_peer.c
Josh Hursey c11ba09655 Remove the progress engine stuff from abort. This was causing
some orted's to stall on locks in the MPI Dynamics cases. Since it
is not essential that we call these functions, they can go away.

Unlock the peer lock when aborting. This causes a potential deadlock
in do_waitall [see comment in code]. This was causing orteds to
deadlock at times when the seed had terminated. With proper interleaving
and timing the orted was deadlocking. This seems to have fixed this in 
my stress testing with MPI 2 Dynamics.

This commit was SVN r7539.
2005-09-29 05:04:43 +00:00

946 строки
35 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "include/ompi_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#include "class/orte_proc_table.h"
#include "opal/util/output.h"
#include "util/univ_info.h"
#include "mca/gpr/gpr.h"
#include "mca/ns/ns.h"
#include "mca/errmgr/errmgr.h"
#include "oob_tcp.h"
#include "oob_tcp_peer.h"
/*
 * Marker macro placed next to the errno checks that also test for
 * EWOULDBLOCK. Its replacement list consists solely of the comment below,
 * so each use of the form "IMPORTANT_WINDOWS_COMMENT();" expands to an
 * empty statement; the macro exists only to point readers at this note.
 */
#define IMPORTANT_WINDOWS_COMMENT() \
/* In windows, many of the socket functions return an EWOULDBLOCK instead of \
things like EAGAIN, EINPROGRESS, etc. It has been verified that this will \
not conflict with other error codes that are returned by these functions \
under UNIX/Linux environments */
/*
 * Forward declarations of the file-local helpers defined further down.
 */
/* connection establishment and event registration */
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
/* object constructor / destructor */
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
/* connection handshake and the blocking I/O it relies on */
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
/* event callbacks; "user" carries the mca_oob_tcp_peer_t pointer */
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
/* debugging aid */
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
/*
 * Class instance for mca_oob_tcp_peer_t. The arguments name the type
 * itself, opal_list_item_t (presumably the parent class - peers are cast
 * to opal_list_item_t when stored on lists in this file), and the
 * construct/destruct functions defined below.
 */
OBJ_CLASS_INSTANCE(
mca_oob_tcp_peer_t,
opal_list_item_t,
mca_oob_tcp_peer_construct,
mca_oob_tcp_peer_destruct);
/*
 * Constructor for mca_oob_tcp_peer_t.
 *
 * Neither this function nor OBJ_NEW should ever be called directly;
 * use mca_oob_tcp_add_peer instead.
 *
 * Zeroes the three event structures, constructs the send queue and the
 * peer lock, and binds the timer event to mca_oob_tcp_peer_timer_handler
 * with the peer itself as the callback argument.
 *
 * @param peer  the mca_oob_tcp_peer_t being initialized
 * @retval none
 */
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
{
    /* start every event from a clean slate */
    memset(&peer->peer_timer_event, 0, sizeof peer->peer_timer_event);
    memset(&peer->peer_recv_event, 0, sizeof peer->peer_recv_event);
    memset(&peer->peer_send_event, 0, sizeof peer->peer_send_event);

    /* embedded objects */
    OBJ_CONSTRUCT(&peer->peer_send_queue, opal_list_t);
    OBJ_CONSTRUCT(&peer->peer_lock, opal_mutex_t);

    /* the retry timer calls back with this peer as its argument */
    opal_evtimer_set(&peer->peer_timer_event,
                     mca_oob_tcp_peer_timer_handler,
                     peer);
}
/*
 * Destructor for mca_oob_tcp_peer_t.
 *
 * Neither this function nor OBJ_RELEASE should ever be called directly;
 * use mca_oob_tcp_del_peer instead.
 *
 * @param peer  the mca_oob_tcp_peer_t being destroyed
 * @retval none
 */
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t * peer)
{
    /* tear the connection down first, while queue and lock still exist */
    mca_oob_tcp_peer_shutdown(peer);

    OBJ_DESTRUCT(&peer->peer_send_queue);
    OBJ_DESTRUCT(&peer->peer_lock);
}
/*
 * Prepare the peer's receive and send events for select/poll callbacks on
 * the current socket (peer->peer_sd). Both events are persistent; neither
 * is added to the event loop here.
 *
 * @param peer  peer whose events are (re)initialized
 * @retval OMPI_SUCCESS always
 */
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
{
    /* readable -> mca_oob_tcp_peer_recv_handler */
    memset(&peer->peer_recv_event, 0, sizeof peer->peer_recv_event);
    opal_event_set(&peer->peer_recv_event, peer->peer_sd,
                   OPAL_EV_READ|OPAL_EV_PERSIST,
                   mca_oob_tcp_peer_recv_handler, peer);

    /* writable -> mca_oob_tcp_peer_send_handler */
    memset(&peer->peer_send_event, 0, sizeof peer->peer_send_event);
    opal_event_set(&peer->peer_send_event, peer->peer_sd,
                   OPAL_EV_WRITE|OPAL_EV_PERSIST,
                   mca_oob_tcp_peer_send_handler, peer);

    return OMPI_SUCCESS;
}
/*
 * Send a message to the peer, taking whatever action the current
 * connection state calls for.
 *
 *  - closed:      queue the message, move to the resolve state and start
 *                 address resolution (the peer lock is dropped first)
 *  - resolving / connecting / awaiting ack: queue the message
 *  - failed:      report OMPI_ERR_UNREACH
 *  - connected:   send now if nothing is in flight, otherwise queue
 *
 * @param peer  destination peer
 * @param msg   message to send
 * @retval OMPI_SUCCESS, OMPI_ERR_UNREACH, or the result of
 *         mca_oob_tcp_resolve() when resolution was started here
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
    int status = OMPI_SUCCESS;

    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch (peer->peer_state) {

    case MCA_OOB_TCP_CLOSED:
        /* queue the message and attempt to resolve the peer address */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        peer->peer_state = MCA_OOB_TCP_RESOLVE;
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return mca_oob_tcp_resolve(peer);

    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CONNECT_ACK:
    case MCA_OOB_TCP_RESOLVE:
        /* connection setup already under way - just queue */
        opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        break;

    case MCA_OOB_TCP_FAILED:
        status = OMPI_ERR_UNREACH;
        break;

    case MCA_OOB_TCP_CONNECTED:
        if (NULL != peer->peer_send_msg) {
            /* another send is in flight - wait our turn */
            opal_list_append(&peer->peer_send_queue, (opal_list_item_t*)msg);
        } else if (mca_oob_tcp_msg_send_handler(msg, peer)) {
            /* went out in one shot */
            mca_oob_tcp_msg_complete(msg, &peer->peer_name);
        } else {
            /* partial send - finish it from the send event callback */
            peer->peer_send_msg = msg;
            opal_event_add(&peer->peer_send_event, 0);
        }
        break;

    default:
        break;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return status;
}
/*
* Lookup a peer by name, create one if it doesn't exist.
* @param name Peers globally unique identifier.
* @retval Pointer to the newly created struture or NULL on error.
*/
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const orte_process_name_t* name)
{
int rc;
mca_oob_tcp_peer_t * peer, *old;
opal_list_item_t* item;
if (NULL == name) { /* can't look this one up */
return NULL;
}
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
peer = (mca_oob_tcp_peer_t*)orte_hash_table_get_proc(
&mca_oob_tcp_component.tcp_peers, name);
if(NULL != peer && memcmp(&peer->peer_name,name,sizeof(peer->peer_name)) == 0) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return peer;
}
/* search the peer list - if we find it here this is a bug in the tree */
for(item = opal_list_get_first(&mca_oob_tcp_component.tcp_peer_list);
item != opal_list_get_end(&mca_oob_tcp_component.tcp_peer_list);
item = opal_list_get_next(item)) {
peer = (mca_oob_tcp_peer_t*)item;
if (memcmp(&peer->peer_name, name, sizeof(peer->peer_name)) == 0) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return peer;
}
}
/* allocate from free list */
MCA_OOB_TCP_PEER_ALLOC(peer, rc);
if(NULL == peer) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return NULL;
}
/* initialize peer state */
peer->peer_name = *name;
peer->peer_addr = NULL;
peer->peer_sd = -1;
peer->peer_state = MCA_OOB_TCP_CLOSED;
peer->peer_recv_msg = NULL;
peer->peer_send_msg = NULL;
peer->peer_retries = 0;
/* add to lookup table */
if(OMPI_SUCCESS != orte_hash_table_set_proc(&mca_oob_tcp_component.tcp_peers,
&peer->peer_name, peer)) {
MCA_OOB_TCP_PEER_RETURN(peer);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return NULL;
}
/* if the peer list is over the maximum size, remove one unsed peer */
opal_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (opal_list_item_t *) peer);
if(mca_oob_tcp_component.tcp_peer_limit > 0 &&
(int)opal_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
mca_oob_tcp_component.tcp_peer_limit) {
old = (mca_oob_tcp_peer_t *)
opal_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
while(1) {
if(0 == opal_list_get_size(&(old->peer_send_queue)) &&
NULL == peer->peer_recv_msg) {
opal_list_remove_item(&mca_oob_tcp_component.tcp_peer_list,
(opal_list_item_t *) old);
MCA_OOB_TCP_PEER_RETURN(old);
break;
} else {
old = (mca_oob_tcp_peer_t *) opal_list_get_prev(old);
if(opal_list_get_begin(&mca_oob_tcp_component.tcp_peer_list) == (opal_list_item_t*)old) {
/* we tried, but we couldn't find one that was valid to get rid
* of. Oh well. */
break;
}
}
}
}
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return peer;
}
/*
 * Start a connection to the peer. This will likely not complete,
 * as the socket is set to non-blocking, so register for event
 * notification of connect completion. On connection we send
 * our globally unique process identifier to the peer and wait for
 * the peer's response.
 *
 * The callers visible in this file invoke this with peer->peer_lock held.
 *
 * @param peer  peer to connect to; peer->peer_addr is dereferenced without
 *              a NULL check (NOTE(review): presumably always set by
 *              mca_oob_tcp_peer_resolved before this runs - confirm for
 *              the timer-driven retry path).
 * @retval OMPI_SUCCESS      a connect is in progress, or the connect
 *                           completed immediately and the connect ack
 *                           was sent
 * @retval OMPI_ERR_UNREACH  socket() failed (a retry is scheduled) or no
 *                           exported address could be reached
 */
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
int rc, flags;
struct sockaddr_in inaddr;
/* create socket */
peer->peer_state = MCA_OOB_TCP_CONNECTING;
peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (peer->peer_sd < 0) {
/* no socket: reset the peer and retry from the timer in one second */
struct timeval tv = { 1,0 };
opal_output(0,
"[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: socket() failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ompi_socket_errno);
mca_oob_tcp_peer_shutdown(peer);
opal_evtimer_add(&peer->peer_timer_event, &tv);
return OMPI_ERR_UNREACH;
}
/* setup event callbacks */
mca_oob_tcp_peer_event_init(peer);
/* setup the socket as non-blocking; fcntl failures are logged but do not
 * abort the connection attempt */
if((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ompi_socket_errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(peer->peer_sd, F_SETFL, flags) < 0)
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_connect: fcntl(F_SETFL) failed with errno=%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ompi_socket_errno);
}
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: trying all %d addresses\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)), peer->peer_addr->addr_count );
}
/*
 * Walk the IP addresses exported by the peer and try to connect to each
 * of them. The loop ends when mca_oob_tcp_addr_get_next fails or when
 * peer->peer_addr->addr_next is back at 0 (presumably meaning the
 * round-robin cursor has wrapped - confirm against mca_oob_tcp_addr_get_next).
 * NOTE(review): the same socket descriptor is reused for every address,
 * including after a failed connect() - confirm this is intended.
 */
do {
/* pick an address in round-robin fashion from the list exported by the peer */
if((rc = mca_oob_tcp_addr_get_next(peer->peer_addr, &inaddr)) != OMPI_SUCCESS) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: mca_oob_tcp_addr_get_next failed with error=%d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
rc);
break;
}
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: connecting port %d to: %s:%d\n",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
ntohs(mca_oob_tcp_component.tcp_listen_port),
inet_ntoa(inaddr.sin_addr),
ntohs(inaddr.sin_port));
}
/* start the connect - will likely fail with EINPROGRESS */
if(connect(peer->peer_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
/* non-blocking so wait for completion */
IMPORTANT_WINDOWS_COMMENT();
if(ompi_socket_errno == EINPROGRESS || ompi_socket_errno == EWOULDBLOCK) {
/* completion is reported through the send (writable) event */
opal_event_add(&peer->peer_send_event, 0);
/* Waiting for completion in the middle of the list ?! Let's just hope we try with the
* correct IP address...
*/
return OMPI_SUCCESS;
}
if(mca_oob_tcp_component.tcp_debug > 0) {
opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: connect to %s:%d failed with errno=%d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
inet_ntoa(inaddr.sin_addr),
ntohs(inaddr.sin_port),
ompi_socket_errno);
}
/* hard failure on this address: "continue" jumps to the loop condition
 * at the bottom, which decides whether another address is tried */
continue;
}
/* connect() succeeded immediately:
 * send our globally unique process identifier to the peer */
if((rc = mca_oob_tcp_peer_send_connect_ack(peer)) == OMPI_SUCCESS) {
peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
opal_event_add(&peer->peer_recv_event, 0);
return OMPI_SUCCESS; /* successfully connect to the peer */
} else {
opal_output(0,
"[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_start_connect: "
"mca_oob_tcp_peer_send_connect_ack to %s:%d failed with errno=%d",
ORTE_NAME_ARGS(orte_process_info.my_name),
ORTE_NAME_ARGS(&(peer->peer_name)),
inet_ntoa(inaddr.sin_addr),
ntohs(inaddr.sin_port),
rc);
}
} while( peer->peer_addr->addr_next != 0 );
/* every address failed: close the peer and report it unreachable */
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
/*
 * Finish a non-blocking connect once the socket reports writable.
 *
 * Reads SO_ERROR from the socket and then:
 *  - still in progress:   re-register for the send event and wait
 *  - refused / timed out: shut the peer down and retry in one second
 *  - any other error:     close the peer
 *  - no error:            send our connect ack, move to the CONNECT_ACK
 *                         state and start listening for the peer's reply
 *
 * @param peer  peer whose connect attempt is being completed
 */
static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
{
    int sock_err = 0;
    ompi_socklen_t sock_err_len = sizeof(sock_err);

    /* unregister from receiving event notifications */
    opal_event_del(&peer->peer_send_event);

    /* check connect completion status */
    if (getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR,
                   (char *)&sock_err, &sock_err_len) < 0) {
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            ompi_socket_errno);
        mca_oob_tcp_peer_close(peer);
        return;
    }

    /* not done yet - keep waiting on the send event */
    if (EINPROGRESS == sock_err) {
        opal_event_add(&peer->peer_send_event, 0);
        return;
    }

    /* transient failure - schedule another attempt */
    if (ECONNREFUSED == sock_err || ETIMEDOUT == sock_err) {
        struct timeval retry_delay = { 1,0 };
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: "
            "connection failed (errno=%d) - retrying (pid=%d)\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            sock_err, getpid());
        mca_oob_tcp_peer_shutdown(peer);
        opal_evtimer_add(&peer->peer_timer_event, &retry_delay);
        return;
    }

    /* anything else is treated as a hard failure */
    if (0 != sock_err) {
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            sock_err);
        mca_oob_tcp_peer_close(peer);
        return;
    }

    /* connected at the TCP level - identify ourselves to the peer */
    if (OMPI_SUCCESS != mca_oob_tcp_peer_send_connect_ack(peer)) {
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_complete_connect: unable to send connect ack.",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)));
        mca_oob_tcp_peer_close(peer);
        return;
    }
    peer->peer_state = MCA_OOB_TCP_CONNECT_ACK;
    opal_event_add(&peer->peer_recv_event, 0);
}
/*
 * Mark the peer as connected: cancel the retry timer, reset the retry
 * counter and, if messages are waiting, promote the first queued message
 * (unless one is already current) and register for send events so the
 * pending sends get started.
 *
 * @param peer  peer whose connection has just been established
 */
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
{
    opal_event_del(&peer->peer_timer_event);
    peer->peer_state = MCA_OOB_TCP_CONNECTED;
    peer->peer_retries = 0;

    /* nothing queued - nothing to kick off */
    if (0 == opal_list_get_size(&peer->peer_send_queue)) {
        return;
    }

    if (NULL == peer->peer_send_msg) {
        peer->peer_send_msg = (mca_oob_tcp_msg_t*)
            opal_list_remove_first(&peer->peer_send_queue);
    }
    opal_event_add(&peer->peer_send_event, 0);
}
/*
 * Close the connection to the peer: remove the event registrations tied
 * to the socket and mark the peer closed (via mca_oob_tcp_peer_shutdown).
 *
 * Losing the connection to the seed is fatal: unless the universe has
 * already reached the finalize state, the error manager's abort is called
 * before the shutdown.
 *
 * @param peer  peer whose connection is being closed
 */
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
    if (mca_oob_tcp_component.tcp_debug > 0) {
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_close(%p) sd %d state %d\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            peer,
            peer->peer_sd,
            peer->peer_state);
    }

    /* if we lose the connection to the seed, and we are not already
     * inside orte_finalize - abort */
    if (0 == memcmp(&peer->peer_name, &mca_oob_name_seed, sizeof(mca_oob_name_seed)) &&
        orte_universe_info.state < ORTE_UNIVERSE_STATE_FINALIZE) {
        /* Release the peer lock before aborting so we don't get stuck
         * in the orte_wait_kill when receiving messages in the tcp OOB. */
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        orte_errmgr.abort();
    }

    mca_oob_tcp_peer_shutdown(peer);
}
/*
 * Tear down the peer's socket and timers and mark it closed.
 *
 * Every call counts as one retry. Once the count has gone past
 * tcp_peer_retries, the message in flight and everything in the send
 * queue is completed with OMPI_ERR_UNREACH. If nothing is in flight at
 * that point, the queue is left untouched.
 *
 * @param peer  peer to shut down
 */
void mca_oob_tcp_peer_shutdown(mca_oob_tcp_peer_t* peer)
{
    /* giving up and cleanup any pending messages */
    if (peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
        mca_oob_tcp_msg_t *pending;
        for (pending = peer->peer_send_msg;
             NULL != pending;
             pending = (mca_oob_tcp_msg_t*)opal_list_remove_first(&peer->peer_send_queue)) {
            pending->msg_rc = OMPI_ERR_UNREACH;
            mca_oob_tcp_msg_complete(pending, &peer->peer_name);
        }
        peer->peer_send_msg = NULL;
    }

    /* drop the socket and the events bound to it */
    if (peer->peer_sd >= 0) {
        opal_event_del(&peer->peer_recv_event);
        opal_event_del(&peer->peer_send_event);
        close(peer->peer_sd);
        peer->peer_sd = -1;
    }

    opal_event_del(&peer->peer_timer_event);
    peer->peer_state = MCA_OOB_TCP_CLOSED;
}
/*
 * Send the connect header - our own process name and the peer's - on a
 * newly connected socket.
 *
 * @param peer  peer to identify ourselves to
 * @retval OMPI_SUCCESS      the whole header was sent
 * @retval OMPI_ERR_UNREACH  the blocking send did not deliver the header
 */
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;

    memset(&hdr, 0, sizeof(hdr));

    /* send process identifier of self and peer - note that we may
     * have assigned the peer a unique process name - if it came up
     * without one.
     */
    if (NULL != orte_process_info.my_name) {
        hdr.msg_src = *(orte_process_info.my_name);
    } else {
        /* my name isn't defined yet */
        hdr.msg_src = *MCA_OOB_NAME_ANY;
    }
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_CONNECT;
    MCA_OOB_TCP_HDR_HTON(&hdr);

    return (sizeof(hdr) == mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)))
        ? OMPI_SUCCESS
        : OMPI_ERR_UNREACH;
}
/*
 * Receive the peer's globally unique process identification from a newly
 * connected socket and verify the expected response. If it matches, move
 * the peer to the connected state.
 *
 * If this process has no name yet (NULL or the wildcard name), the name
 * the peer addressed the header to is adopted.
 *
 * @param peer  peer whose connect header is expected on peer->peer_sd
 * @retval OMPI_SUCCESS      the header was valid; the peer is now connected
 * @retval OMPI_ERR_UNREACH  receive failed, wrong header type, or the
 *                           sender is not the expected peer (the peer is
 *                           closed in each case)
 */
static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;

    /* NOTE(review): mca_oob_tcp_peer_recv_blocking already closes the peer
     * on failure, so this path closes it a second time - confirm whether
     * the extra retry count is intended. */
    if ((mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr))) != sizeof(hdr)) {
        mca_oob_tcp_peer_close(peer);
        return OMPI_ERR_UNREACH;
    }
    MCA_OOB_TCP_HDR_NTOH(&hdr);

    if (hdr.msg_type != MCA_OOB_TCP_CONNECT) {
        opal_output(0, "mca_oob_tcp_peer_recv_connect_ack: invalid header type: %d\n", hdr.msg_type);
        mca_oob_tcp_peer_close(peer);
        return OMPI_ERR_UNREACH;
    }

    /* compare the peer's name to the expected value */
    if (memcmp(&peer->peer_name, &hdr.msg_src, sizeof(orte_process_name_t)) != 0) {
        /* all three names go through ORTE_NAME_ARGS, so all three use the
         * %lu conversions used everywhere else in this file */
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_connect_ack: "
            "received unexpected process identifier [%lu,%lu,%lu]\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            ORTE_NAME_ARGS(&(hdr.msg_src)));
        mca_oob_tcp_peer_close(peer);
        return OMPI_ERR_UNREACH;
    }

    /* if we have a wildcard name - use the name returned by the peer */
    if (orte_process_info.my_name == NULL) {
        orte_ns.create_process_name(&orte_process_info.my_name,
            hdr.msg_dst.cellid, hdr.msg_dst.jobid, hdr.msg_dst.vpid);
    } else if (orte_ns.compare(ORTE_NS_CMP_ALL, orte_process_info.my_name, &mca_oob_name_any) == 0) {
        *orte_process_info.my_name = hdr.msg_dst;
    }

    /* connected */
    mca_oob_tcp_peer_connected(peer);
    if (mca_oob_tcp_component.tcp_debug > 0) {
        mca_oob_tcp_peer_dump(peer, "connected");
    }
    return OMPI_SUCCESS;
}
/*
 * A blocking recv on a non-blocking socket. Used to receive the small
 * amount of connection information that identifies the peer's endpoint.
 *
 * Loops until exactly "size" bytes have arrived, retrying on EINTR,
 * EAGAIN and EWOULDBLOCK.
 *
 * @param peer  peer whose socket (peer->peer_sd) is read
 * @param data  destination buffer, at least "size" bytes
 * @param size  number of bytes to receive
 * @retval size on success; -1 if the remote side closed the connection or
 *         recv() failed with any other error (the peer is closed in both
 *         cases)
 */
static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size)
{
    unsigned char* ptr = (unsigned char*)data;
    size_t cnt = 0;

    while (cnt < size) {
        int retval = recv(peer->peer_sd, (char *)ptr+cnt, size-cnt, 0);

        /* remote closed connection */
        if (retval == 0) {
            if (mca_oob_tcp_component.tcp_debug > 0) {
                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: "
                    "peer closed connection: peer state %d",
                    ORTE_NAME_ARGS(orte_process_info.my_name),
                    ORTE_NAME_ARGS(&(peer->peer_name)),
                    peer->peer_state);
            }
            mca_oob_tcp_peer_close(peer);
            return -1;
        }

        /* socket is non-blocking so handle errors */
        if (retval < 0) {
            IMPORTANT_WINDOWS_COMMENT();
            if (ompi_socket_errno != EINTR && ompi_socket_errno != EAGAIN && ompi_socket_errno != EWOULDBLOCK) {
                /* report the same error source that was just tested,
                 * matching mca_oob_tcp_peer_send_blocking */
                opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_blocking: recv() failed with errno=%d\n",
                    ORTE_NAME_ARGS(orte_process_info.my_name),
                    ORTE_NAME_ARGS(&(peer->peer_name)),
                    ompi_socket_errno);
                mca_oob_tcp_peer_close(peer);
                return -1;
            }
            /* transient condition - try again */
            continue;
        }
        cnt += retval;
    }
    return cnt;
}
/*
 * A blocking send on a non-blocking socket. Used to send the small amount
 * of connection information that identifies the peer's endpoint.
 *
 * Loops until all "size" bytes are written, retrying on EINTR, EAGAIN and
 * EWOULDBLOCK.
 *
 * @param peer  peer whose socket (peer->peer_sd) is written
 * @param data  bytes to send
 * @param size  number of bytes to send
 * @retval size on success; -1 if send() failed with any other error (the
 *         peer is closed in that case)
 */
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size)
{
    unsigned char* bytes = (unsigned char*)data;
    size_t sent = 0;

    while (sent < size) {
        int nwritten = send(peer->peer_sd, (char *)bytes+sent, size-sent, 0);

        if (nwritten >= 0) {
            sent += nwritten;
            continue;
        }

        IMPORTANT_WINDOWS_COMMENT();
        if (ompi_socket_errno == EINTR || ompi_socket_errno == EAGAIN || ompi_socket_errno == EWOULDBLOCK) {
            /* transient condition - try again */
            continue;
        }

        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_blocking: send() failed with errno=%d\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            ompi_socket_errno);
        mca_oob_tcp_peer_close(peer);
        return -1;
    }
    return sent;
}
/*
 * Send an IDENT header carrying this process's name to a connected peer.
 * Nothing is sent (and success is reported) when the peer is not in the
 * connected state.
 *
 * @param peer  peer to send the identification to
 * @retval OMPI_SUCCESS      header sent, or peer not connected
 * @retval OMPI_ERR_UNREACH  the blocking send did not deliver the header
 */
int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_hdr_t hdr;

    if (peer->peer_state != MCA_OOB_TCP_CONNECTED) {
        return OMPI_SUCCESS;
    }

    /* zero the header first, as mca_oob_tcp_peer_send_connect_ack does, so
     * that no uninitialized stack bytes (padding or fields not set below)
     * are written to the socket */
    memset(&hdr, 0, sizeof(hdr));
    hdr.msg_src = *orte_process_info.my_name;
    hdr.msg_dst = peer->peer_name;
    hdr.msg_type = MCA_OOB_TCP_IDENT;
    hdr.msg_size = 0;
    hdr.msg_tag = 0;
    MCA_OOB_TCP_HDR_HTON(&hdr);

    if (mca_oob_tcp_peer_send_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr)) {
        return OMPI_ERR_UNREACH;
    }
    return OMPI_SUCCESS;
}
/* static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr) */
/* { */
/* OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock); */
/* ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name); */
/* peer->peer_name = hdr->msg_src; */
/* ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer); */
/* OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock); */
/* } */
/*
 * Receive event callback: dispatch to the appropriate action routine
 * based on the state of the connection with the peer.
 *
 *  - CONNECT_ACK: read and validate the peer's connect header
 *  - CONNECTED:   continue (or start) receiving a message; when it is
 *                 complete, hand it to mca_oob_tcp_msg_recv_complete with
 *                 the peer lock released
 *  - otherwise:   log the unexpected state and close the peer
 *
 * @param sd     socket descriptor reported by the event (unused)
 * @param flags  event flags (unused)
 * @param user   the mca_oob_tcp_peer_t registered with the event
 */
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;

    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch (peer->peer_state) {
    case MCA_OOB_TCP_CONNECT_ACK:
        {
            mca_oob_tcp_peer_recv_connect_ack(peer);
            break;
        }
    case MCA_OOB_TCP_CONNECTED:
        {
            /* allocate a new message and setup for recv */
            if (NULL == peer->peer_recv_msg) {
                int rc;    /* written by MCA_OOB_TCP_MSG_ALLOC */
                mca_oob_tcp_msg_t* msg;
                MCA_OOB_TCP_MSG_ALLOC(msg, rc);
                if (NULL == msg) {
                    opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
                        ORTE_NAME_ARGS(orte_process_info.my_name),
                        ORTE_NAME_ARGS(&(peer->peer_name)));
                    /* leave through the common exit so the peer lock is
                     * released; returning here would leave it held */
                    break;
                }
                msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
                msg->msg_rc = 0;
                msg->msg_flags = 0;
                msg->msg_peer = peer->peer_name;
                msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,2);
                msg->msg_rwbuf = NULL;
                msg->msg_rwcnt = msg->msg_rwnum = 1;
                msg->msg_rwptr = msg->msg_rwiov;
                /* the first thing to read is the message header */
                msg->msg_rwiov[0].iov_base = (ompi_iov_base_ptr_t)&msg->msg_hdr;
                msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
                peer->peer_recv_msg = msg;
            }
            if (peer->peer_recv_msg &&
                mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
                /* message complete: detach it from the peer and deliver it
                 * without holding the peer lock */
                mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;
                peer->peer_recv_msg = NULL;
                OPAL_THREAD_UNLOCK(&peer->peer_lock);
                mca_oob_tcp_msg_recv_complete(msg, peer);
                return;
            }
            break;
        }
    default:
        {
            opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
                ORTE_NAME_ARGS(orte_process_info.my_name),
                ORTE_NAME_ARGS(&(peer->peer_name)),
                peer->peer_state);
            mca_oob_tcp_peer_close(peer);
            break;
        }
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}
/*
 * Send event callback: the socket is ready for writing. Check the state
 * of the connection and take the appropriate action.
 *
 *  - CONNECTING: the non-blocking connect has finished; complete it
 *  - CONNECTED:  push out the current message and then as many queued
 *                messages as the socket accepts; stop listening for send
 *                events once nothing is left
 *  - otherwise:  log the unexpected state and stop listening
 *
 * @param sd     socket descriptor reported by the event (unused)
 * @param flags  event flags (unused)
 * @param user   the mca_oob_tcp_peer_t registered with the event
 */
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t *)user;

    OPAL_THREAD_LOCK(&peer->peer_lock);
    switch (peer->peer_state) {
    case MCA_OOB_TCP_CONNECTING:
        mca_oob_tcp_peer_complete_connect(peer);
        break;

    case MCA_OOB_TCP_CONNECTED:
        {
            for (;;) {
                mca_oob_tcp_msg_t* current = peer->peer_send_msg;

                /* stop when idle or when the socket would block */
                if (NULL == current ||
                    !mca_oob_tcp_msg_send_handler(current, peer)) {
                    break;
                }
                mca_oob_tcp_msg_complete(current, &peer->peer_name);

                /* current completed - progress any pending sends */
                peer->peer_send_msg = (mca_oob_tcp_msg_t*)
                    opal_list_remove_first(&peer->peer_send_queue);
            }

            /* if nothing else to do unregister for send event notifications */
            if (NULL == peer->peer_send_msg) {
                opal_event_del(&peer->peer_send_event);
            }
            break;
        }

    default:
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_send_handler: invalid connection state (%d)",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)),
            peer->peer_state);
        opal_event_del(&peer->peer_send_event);
        break;
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}
/*
 * Debugging aid: print the local and remote addresses of the peer's
 * socket together with its file status flags and the TCP_NODELAY,
 * SO_SNDBUF and SO_RCVBUF option values.
 *
 * Values that cannot be queried are reported as -1 (buffer sizes), 0
 * (nodelay) or the zeroed address rather than as indeterminate data.
 *
 * @param peer  peer whose socket (peer->peer_sd) is inspected
 * @param msg   label included in the output line
 */
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
{
    char src[64];
    char dst[64];
    char buff[255];
    int sndbuf = -1, rcvbuf = -1, nodelay = 0, flags;
    struct sockaddr_in inaddr;
    ompi_socklen_t optlen;
    ompi_socklen_t addrlen;

    /* local address; start from a zeroed address so a failed query still
     * formats something well-defined */
    memset(&inaddr, 0, sizeof(inaddr));
    addrlen = sizeof(inaddr);
    getsockname(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
    snprintf(src, sizeof(src), "%s", inet_ntoa(inaddr.sin_addr));

    /* remote address; addrlen is value-result, so reset it before reuse */
    memset(&inaddr, 0, sizeof(inaddr));
    addrlen = sizeof(inaddr);
    getpeername(peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
    snprintf(dst, sizeof(dst), "%s", inet_ntoa(inaddr.sin_addr));

    if ((flags = fcntl(peer->peer_sd, F_GETFL, 0)) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: fcntl(F_GETFL) failed with errno=%d\n", ompi_socket_errno);
    }

#if defined(SO_SNDBUF)
    optlen = sizeof(sndbuf);
    if (getsockopt(peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: SO_SNDBUF option: errno %d\n", ompi_socket_errno);
    }
#endif
#if defined(SO_RCVBUF)
    optlen = sizeof(rcvbuf);
    if (getsockopt(peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: SO_RCVBUF option: errno %d\n", ompi_socket_errno);
    }
#endif
#if defined(TCP_NODELAY)
    optlen = sizeof(nodelay);
    if (getsockopt(peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &optlen) < 0) {
        opal_output(0, "mca_oob_tcp_peer_dump: TCP_NODELAY option: errno %d\n", ompi_socket_errno);
    }
#endif

    /* bounded formatting: an over-long label is truncated instead of
     * overrunning buff */
    snprintf(buff, sizeof(buff), "[%lu,%lu,%lu]-[%lu,%lu,%lu] %s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
        ORTE_NAME_ARGS(orte_process_info.my_name),
        ORTE_NAME_ARGS(&(peer->peer_name)),
        msg, src, dst, nodelay, sndbuf, rcvbuf, flags);

    /* buff contains caller-supplied text, so it must never be used as the
     * format string itself */
    opal_output(0, "%s", buff);
}
/*
 * Accept an incoming connection from the peer - if not already connected.
 *
 * The socket is taken over when the peer is closed or resolving, or when
 * it is in any state other than connected and the peer's name compares
 * lower than ours. Any existing non-closed connection attempt is closed
 * first.
 *
 * @param peer  peer the incoming connection belongs to
 * @param sd    the accepted socket descriptor
 * @retval true   the socket was adopted and the peer is now connected
 * @retval false  the connection was not accepted, or the connect ack
 *                could not be sent
 */
bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
{
    int cmpval;
    int take_over;

    OPAL_THREAD_LOCK(&peer->peer_lock);
    cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, &peer->peer_name, orte_process_info.my_name);

    take_over = (MCA_OOB_TCP_CLOSED == peer->peer_state) ||
                (MCA_OOB_TCP_RESOLVE == peer->peer_state) ||
                (MCA_OOB_TCP_CONNECTED != peer->peer_state && cmpval < 0);
    if (!take_over) {
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return false;
    }

    /* abandon whatever connection attempt was in progress */
    if (MCA_OOB_TCP_CLOSED != peer->peer_state) {
        mca_oob_tcp_peer_close(peer);
    }

    /* adopt the accepted socket and identify ourselves */
    peer->peer_sd = sd;
    mca_oob_tcp_peer_event_init(peer);
    if (OMPI_SUCCESS != mca_oob_tcp_peer_send_connect_ack(peer)) {
        opal_output(0, "[%lu,%lu,%lu]-[%lu,%lu,%lu] mca_oob_tcp_peer_accept: "
            "mca_oob_tcp_peer_send_connect_ack failed\n",
            ORTE_NAME_ARGS(orte_process_info.my_name),
            ORTE_NAME_ARGS(&(peer->peer_name)));
        mca_oob_tcp_peer_close(peer);
        OPAL_THREAD_UNLOCK(&peer->peer_lock);
        return false;
    }

    mca_oob_tcp_peer_connected(peer);
    opal_event_add(&peer->peer_recv_event, 0);
    if (mca_oob_tcp_component.tcp_debug > 0) {
        mca_oob_tcp_peer_dump(peer, "accepted");
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
    return true;
}
/*
 * Callback for a completed name lookup: record the peer's address and,
 * if the peer is waiting on resolution (or is closed with messages
 * queued), start connecting to it.
 *
 * @param peer  peer whose process name has been resolved
 * @param addr  the address information found for the peer
 */
void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* addr)
{
    OPAL_THREAD_LOCK(&peer->peer_lock);

    peer->peer_addr = addr;

    if (MCA_OOB_TCP_RESOLVE == peer->peer_state) {
        mca_oob_tcp_peer_start_connect(peer);
    } else if (MCA_OOB_TCP_CLOSED == peer->peer_state &&
               0 != opal_list_get_size(&peer->peer_send_queue)) {
        mca_oob_tcp_peer_start_connect(peer);
    }

    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}
/*
 * Timer event callback - retry the connection attempt if the peer is
 * still closed when the timer fires.
 *
 * @param sd     unused
 * @param flags  unused
 * @param user   the mca_oob_tcp_peer_t registered with the timer
 */
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;

    opal_output(0, "mca_oob_tcp_peer_timer_handler\n");

    OPAL_THREAD_LOCK(&peer->peer_lock);
    if (MCA_OOB_TCP_CLOSED == peer->peer_state) {
        /* start the connection to the peer */
        mca_oob_tcp_peer_start_connect(peer);
    }
    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}
/*
 * Remove any references the peer holds to the indicated message: the
 * in-flight send slot, the in-progress receive slot, and the send queue.
 *
 * @param peer  peer to scrub
 * @param msg   message that must no longer be referenced
 */
void mca_oob_tcp_peer_dequeue_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
{
    opal_list_item_t* target = (opal_list_item_t*)msg;
    opal_list_item_t* cursor;

    OPAL_THREAD_LOCK(&peer->peer_lock);

    if (msg == peer->peer_send_msg) {
        peer->peer_send_msg = NULL;
    }
    if (msg == peer->peer_recv_msg) {
        peer->peer_recv_msg = NULL;
    }

    /* unlink it from the send queue if it is still waiting there */
    cursor = opal_list_get_first(&peer->peer_send_queue);
    while (cursor != opal_list_get_end(&peer->peer_send_queue)) {
        if (cursor == target) {
            opal_list_remove_item(&peer->peer_send_queue, cursor);
            break;
        }
        cursor = opal_list_get_next(cursor);
    }

    OPAL_THREAD_UNLOCK(&peer->peer_lock);
}