2a116ecdfc
onflict that can cause messages to be lost. Add detection of this condition, and have both processes cancel their connect operations. The process with the higher rank will reconnect, while the lower rank process will simply wait for the connection to be created.

Refs trac:3696

This commit was SVN r29139.

The following Trac tickets were found above:
  Ticket 3696 --> https://svn.open-mpi.org/trac/ompi/ticket/3696
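A minimal sketch of the tie-break rule described above, using hypothetical names (the real cancellation and handshake logic lives in the peer connection code, not in this file): when both sides detect the simultaneous-connect race, each cancels its outbound attempt and only the higher-ranked process re-initiates.

#include <stdbool.h>
#include <stdint.h>

typedef uint32_t example_vpid_t;   /* stand-in for the ORTE process rank (vpid) */

/* Hypothetical helper: after both peers cancel their racing connect
 * attempts, decide which side re-initiates the connection. */
static bool example_should_reconnect(example_vpid_t my_rank, example_vpid_t peer_rank)
{
    /* the higher rank reconnects; the lower rank simply waits to be contacted */
    return my_rank > peer_rank;
}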
/*
 * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
 * Copyright (c) 2013      Intel, Inc.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 * On Windows, many of the socket functions return EWOULDBLOCK
 * instead of things like EAGAIN, EINPROGRESS, etc.  It has been
 * verified that this does not conflict with the other error codes
 * returned by these functions under UNIX/Linux environments.
 */
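/*
 * For example, the helpers below treat both values as a transient
 * "socket is busy" condition rather than a failure (a minimal sketch of
 * the pattern used by send_bytes() and read_bytes() in this file):
 *
 *     rc = write(peer->sd, msg->sdptr, msg->sdbytes);
 *     if (rc < 0) {
 *         if (opal_socket_errno == EAGAIN) {
 *             return ORTE_ERR_RESOURCE_BUSY;    keep the msg active, retry later
 *         } else if (opal_socket_errno == EWOULDBLOCK) {
 *             return ORTE_ERR_WOULD_BLOCK;      keep the msg active, retry later
 *         }
 *     }
 */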

#include "orte_config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/opal_socket_errno.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif

#include "opal_stdint.h"
#include "opal/types.h"
#include "opal/mca/backtrace/backtrace.h"
#include "opal/util/output.h"
#include "opal/util/net.h"
#include "opal/util/error.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/event/event.h"

#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/state/state.h"
#include "orte/runtime/orte_wait.h"

#include "oob_tcp.h"
#include "orte/mca/oob/tcp/oob_tcp_component.h"
#include "orte/mca/oob/tcp/oob_tcp_peer.h"
#include "orte/mca/oob/tcp/oob_tcp_common.h"
#include "orte/mca/oob/tcp/oob_tcp_connection.h"

static int send_bytes(mca_oob_tcp_peer_t* peer)
{
    mca_oob_tcp_send_t* msg = peer->send_msg;
    int rc;

    while (0 < msg->sdbytes) {
        rc = write(peer->sd, msg->sdptr, msg->sdbytes);
        if (rc < 0) {
            if (opal_socket_errno == EINTR) {
                continue;
            } else if (opal_socket_errno == EAGAIN) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_RESOURCE_BUSY;
            } else if (opal_socket_errno == EWOULDBLOCK) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_WOULD_BLOCK;
            }
            /* we hit an error and cannot progress this message */
            opal_output(0, "%s->%s mca_oob_tcp_msg_send_bytes: write failed: %s (%d) [sd = %d]",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&(peer->name)),
                        strerror(opal_socket_errno),
                        opal_socket_errno,
                        peer->sd);
            return ORTE_ERR_COMM_FAILURE;
        }
        /* update location */
        msg->sdbytes -= rc;
        msg->sdptr += rc;
    }
    /* we sent the full data block */
    return ORTE_SUCCESS;
}

/*
 * A file descriptor is available/ready for send. Check the state
 * of the socket and take the appropriate action.
 */
void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
    mca_oob_tcp_send_t* msg = peer->send_msg;
    mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)peer->mod;
    int rc;

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s tcp:send_handler called to send to peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name));

    switch (peer->state) {
    case MCA_OOB_TCP_CONNECTING:
    case MCA_OOB_TCP_CLOSED:
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:send_handler %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            mca_oob_tcp_state_print(peer->state));
        mca_oob_tcp_peer_complete_connect(mod, peer);
        /* de-activate the send event until the connection
         * handshake completes
         */
        if (peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    case MCA_OOB_TCP_CONNECTED:
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s tcp:send_handler SENDING TO %s",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            (NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
        if (NULL != msg) {
            /* if the header hasn't been completely sent, send it */
            if (!msg->hdr_sent) {
                if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
                    /* header is completely sent */
                    msg->hdr_sent = true;
                    /* setup to send the data */
                    if (NULL != msg->data) {
                        /* relay msg - send that data */
                        msg->sdptr = msg->data;
                        msg->sdbytes = (int)ntohl(msg->hdr.nbytes);
                    } else if (NULL == msg->msg) {
                        /* this was a zero-byte relay - nothing more to do */
                        OBJ_RELEASE(msg);
                        peer->send_msg = NULL;
                        goto next;
                    } else if (NULL != msg->msg->buffer) {
                        /* send the buffer data as a single block */
                        msg->sdptr = msg->msg->buffer->base_ptr;
                        msg->sdbytes = msg->msg->buffer->bytes_used;
                    } else {
                        /* start with the first iovec */
                        msg->sdptr = msg->msg->iov[0].iov_base;
                        msg->sdbytes = msg->msg->iov[0].iov_len;
                        msg->iovnum = 0;
                    }
                    /* fall thru and let the send progress */
                } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                           ORTE_ERR_WOULD_BLOCK == rc) {
                    /* exit this event and let the event lib progress */
                    return;
                } else {
                    // report the error
                    opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send header",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)));
                    opal_event_del(&peer->send_event);
                    msg->msg->status = rc;
                    ORTE_RML_SEND_COMPLETE(msg->msg);
                    OBJ_RELEASE(msg);
                    peer->send_msg = NULL;
                    goto next;
                }
            }
            /* progress the data transmission */
            if (msg->hdr_sent) {
                if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
                    /* this block is complete */
                    if (NULL != msg->data || NULL == msg->msg) {
                        /* the relay is complete - release the data */
                        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                            "%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                            ORTE_NAME_PRINT(&(peer->name)),
                                            (int)ntohl(msg->hdr.nbytes), peer->sd);
                        OBJ_RELEASE(msg);
                        peer->send_msg = NULL;
                    } else if (NULL != msg->msg->buffer) {
                        /* we are done - notify the RML */
                        opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                            "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                            ORTE_NAME_PRINT(&(peer->name)),
                                            (int)ntohl(msg->hdr.nbytes), peer->sd);
                        msg->msg->status = ORTE_SUCCESS;
                        ORTE_RML_SEND_COMPLETE(msg->msg);
                        OBJ_RELEASE(msg);
                        peer->send_msg = NULL;
                    } else {
                        /* rotate to the next iovec */
                        msg->iovnum++;
                        if (msg->iovnum < msg->msg->count) {
                            msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
                            msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
                            /* exit this event to give the event lib
                             * a chance to progress any other pending
                             * actions
                             */
                            return;
                        } else {
                            /* this message is complete - notify the RML */
                            opal_output_verbose(2, orte_oob_base_framework.framework_output,
                                                "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
                                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                                ORTE_NAME_PRINT(&(peer->name)),
                                                (int)ntohl(msg->hdr.nbytes), peer->sd);
                            msg->msg->status = ORTE_SUCCESS;
                            ORTE_RML_SEND_COMPLETE(msg->msg);
                            OBJ_RELEASE(msg);
                            peer->send_msg = NULL;
                        }
                    }
                    /* fall thru to queue the next message */
                } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                           ORTE_ERR_WOULD_BLOCK == rc) {
                    /* exit this event and let the event lib progress */
                    return;
                } else {
                    // report the error
                    opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&(peer->name)), peer->sd);
                    opal_event_del(&peer->send_event);
                    msg->msg->status = rc;
                    ORTE_RML_SEND_COMPLETE(msg->msg);
                    OBJ_RELEASE(msg);
                    peer->send_msg = NULL;
                    ORTE_FORCED_TERMINATE(1);
                    return;
                }
            }

        next:
            /* if current message completed - progress any pending sends by
             * moving the next in the queue into the "on-deck" position. Note
             * that this doesn't mean we send the message right now - we will
             * wait for another send_event to fire before doing so. This gives
             * us a chance to service any pending recvs.
             */
            peer->send_msg = (mca_oob_tcp_send_t*)
                opal_list_remove_first(&peer->send_queue);
        }

        /* if nothing else to do, unregister for send event notifications */
        if (NULL == peer->send_msg && peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    default:
        opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: invalid connection state (%d) on socket %d",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    peer->state, peer->sd);
        if (peer->send_ev_active) {
            opal_event_del(&peer->send_event);
            peer->send_ev_active = false;
        }
        break;
    }
}

static int read_bytes(mca_oob_tcp_peer_t* peer)
{
    int rc;

    /* read until all bytes recvd or error */
    while (0 < peer->recv_msg->rdbytes) {
        rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes);
        if (rc < 0) {
            if (opal_socket_errno == EINTR) {
                continue;
            } else if (opal_socket_errno == EAGAIN) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_RESOURCE_BUSY;
            } else if (opal_socket_errno == EWOULDBLOCK) {
                /* tell the caller to keep this message on active,
                 * but let the event lib cycle so other messages
                 * can progress while this socket is busy
                 */
                return ORTE_ERR_WOULD_BLOCK;
            }
            /* we hit an error and cannot progress this message - report
             * the error back to the RML and let the caller know
             * to abort this message
             */
            if (opal_output_get_verbosity(orte_oob_base_framework.framework_output) >= OOB_TCP_DEBUG_FAIL) {
                opal_output(0, "%s-%s mca_oob_tcp_msg_recv: readv failed: %s (%d)",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)),
                            strerror(opal_socket_errno),
                            opal_socket_errno);
            }
            // mca_oob_tcp_peer_close(peer);
            // if (NULL != mca_oob_tcp.oob_exception_callback) {
            //     mca_oob_tcp.oob_exception_callback(&peer->name, ORTE_RML_PEER_DISCONNECTED);
            // }
            return ORTE_ERR_COMM_FAILURE;
        } else if (rc == 0) {
            /* the remote peer closed the connection - report that condition
             * and let the caller know
             */
            if (opal_output_get_verbosity(orte_oob_base_framework.framework_output) >= OOB_TCP_DEBUG_FAIL) {
                opal_output(0, "%s-%s mca_oob_tcp_msg_recv: peer closed connection",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)));
            }
            /* stop all events */
            if (peer->recv_ev_active) {
                opal_event_del(&peer->recv_event);
                peer->recv_ev_active = false;
            }
            if (peer->timer_ev_active) {
                opal_event_del(&peer->timer_event);
                peer->timer_ev_active = false;
            }
            if (peer->send_ev_active) {
                opal_event_del(&peer->send_event);
                peer->send_ev_active = false;
            }
            if (NULL != peer->recv_msg) {
                OBJ_RELEASE(peer->recv_msg);
                peer->recv_msg = NULL;
            }
            mca_oob_tcp_peer_close((mca_oob_tcp_module_t*)peer->mod, peer);
            // if (NULL != mca_oob_tcp.oob_exception_callback) {
            //     mca_oob_tcp.oob_exception_callback(&peer->peer_name, ORTE_RML_PEER_DISCONNECTED);
            // }
            return ORTE_ERR_WOULD_BLOCK;
        }
        /* we were able to read something, so adjust counters and location */
        peer->recv_msg->rdbytes -= rc;
        peer->recv_msg->rdptr += rc;
    }

    /* we read the full data block */
    return ORTE_SUCCESS;
}

/*
 * Dispatch to the appropriate action routine based on the state
 * of the connection with the peer.
 */

void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
{
    mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
    mca_oob_tcp_module_t *mod = (mca_oob_tcp_module_t*)peer->mod;
    int rc;
    orte_process_name_t hop;
    mca_oob_tcp_peer_t *relay;
    uint64_t ui64;

    if (orte_abnormal_term_ordered) {
        return;
    }

    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                        "%s:tcp:recv:handler called for peer %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&peer->name));

    switch (peer->state) {
    case MCA_OOB_TCP_CONNECT_ACK:
        if (ORTE_SUCCESS == (rc = mca_oob_tcp_peer_recv_connect_ack(mod, peer))) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:tcp:recv:handler starting send/recv events",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            /* we connected! Start the send/recv events */
            if (!peer->recv_ev_active) {
                opal_event_add(&peer->recv_event, 0);
                peer->recv_ev_active = true;
            }
            if (peer->timer_ev_active) {
                opal_event_del(&peer->timer_event);
                peer->timer_ev_active = false;
            }
            /* if there is a message waiting to be sent, queue it */
            if (NULL == peer->send_msg) {
                peer->send_msg = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue);
            }
            if (NULL != peer->send_msg && !peer->send_ev_active) {
                opal_event_add(&peer->send_event, 0);
                peer->send_ev_active = true;
            }
            /* update our state */
            peer->state = MCA_OOB_TCP_CONNECTED;
        } else {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s UNABLE TO COMPLETE CONNECT ACK WITH %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                ORTE_NAME_PRINT(&peer->name));
            opal_event_del(&peer->recv_event);
            ORTE_FORCED_TERMINATE(1);
            return;
        }
        break;
    case MCA_OOB_TCP_CONNECTED:
        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                            "%s:tcp:recv:handler CONNECTED",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
        /* allocate a new message and setup for recv */
        if (NULL == peer->recv_msg) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:tcp:recv:handler allocate new recv msg",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            peer->recv_msg = OBJ_NEW(mca_oob_tcp_recv_t);
            if (NULL == peer->recv_msg) {
                opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)));
                return;
            }
            /* start by reading the header */
            peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr;
            peer->recv_msg->rdbytes = sizeof(mca_oob_tcp_hdr_t);
        }
        /* if the header hasn't been completely read, read it */
        if (!peer->recv_msg->hdr_recvd) {
            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                "%s:tcp:recv:handler read hdr",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
            if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
                /* completed reading the header */
                peer->recv_msg->hdr_recvd = true;
                /* convert the header */
                MCA_OOB_TCP_HDR_NTOH(&peer->recv_msg->hdr);
                /* if this is a zero-byte message, then we are done */
                if (0 == peer->recv_msg->hdr.nbytes) {
                    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                        ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag);
                    peer->recv_msg->data = NULL;  // make sure
                } else {
                    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s:tcp:recv:handler allocate data region",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
                    /* allocate the data region */
                    peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes);
                    /* point to it */
                    peer->recv_msg->rdptr = peer->recv_msg->data;
                    peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes;
                }
                /* fall thru and attempt to read the data */
            } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                       ORTE_ERR_WOULD_BLOCK == rc) {
                /* exit this event and let the event lib progress */
                return;
            } else {
                /* close the connection */
                opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s:tcp:recv:handler error reading bytes - closing connection",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
                mca_oob_tcp_peer_close(mod, peer);
                return;
            }
        }

        if (peer->recv_msg->hdr_recvd) {
            /* continue to read the data block - we start from
             * wherever we left off, which could be at the
             * beginning or somewhere in the message
             */
            if (ORTE_SUCCESS == (rc = read_bytes(peer))) {
                /* we recvd all of the message */
                opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                    "%s RECVD COMPLETE MESSAGE FROM %s (ORIGIN %s) OF %d BYTES FOR DEST %s TAG %d",
                                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                    ORTE_NAME_PRINT(&peer->name),
                                    ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin),
                                    (int)peer->recv_msg->hdr.nbytes,
                                    ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
                                    peer->recv_msg->hdr.tag);
                /* am I the intended recipient (header was already converted back to host order)? */
                if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid &&
                    peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) {
                    /* yes - post it to the RML for delivery */
                    opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                        "%s DELIVERING TO RML",
                                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
                    ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin, peer->recv_msg->hdr.tag,
                                          peer->recv_msg->data,
                                          peer->recv_msg->hdr.nbytes);
                    OBJ_RELEASE(peer->recv_msg);
                } else {
                    /* no - find the next hop in the route */
                    hop = orte_routed.get_route(&peer->recv_msg->hdr.dst);
                    if (hop.jobid == ORTE_JOBID_INVALID ||
                        hop.vpid == ORTE_VPID_INVALID) {
                        /* no hop known - post the error to the component
                         * and let the OOB see if there is another way
                         * to get there from here
                         */
                        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                            "%s NO ROUTE TO %s FROM HERE",
                                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                            ORTE_NAME_PRINT(&peer->name));
                        /* let the component know about the problem */
                        ORTE_ACTIVATE_TCP_MSG_ERROR(mod, NULL, peer->recv_msg, &hop, mca_oob_tcp_component_no_route);
                        /* cleanup */
                        OBJ_RELEASE(peer->recv_msg);
                        return;
                    } else {
                        /* does this module know how to reach the next hop? */
                        memcpy(&ui64, (char*)&hop, sizeof(uint64_t));
                        if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mod->peers, ui64, (void**)&relay)) {
                            opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                                "%s ADDRESS OF NEXT HOP %s TO %s IS UNKNOWN VIA MODULE %s",
                                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                                ORTE_NAME_PRINT(&hop),
                                                ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst),
                                                peer->mod->if_name);
                            /* let the component know about the problem */
                            ORTE_ACTIVATE_TCP_MSG_ERROR(mod, NULL, peer->recv_msg, &hop, mca_oob_tcp_component_hop_unknown);
                            /* cleanup */
                            OBJ_RELEASE(peer->recv_msg);
                            return;
                        }
                        opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
                                            "%s ROUTING TO %s FROM HERE",
                                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                            ORTE_NAME_PRINT(&relay->name));
                        /* if this came from a different job family, then ensure
                         * we know how to return
                         */
                        if (ORTE_JOB_FAMILY(peer->recv_msg->hdr.origin.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
                            orte_routed.update_route(&(peer->recv_msg->hdr.origin), &peer->name);
                        }
                        /* post the message for retransmission */
                        MCA_OOB_TCP_QUEUE_RELAY(peer->recv_msg, relay);
                        OBJ_RELEASE(peer->recv_msg);
                    }
                }
                peer->recv_msg = NULL;
                return;
            } else if (ORTE_ERR_RESOURCE_BUSY == rc ||
                       ORTE_ERR_WOULD_BLOCK == rc) {
                /* exit this event and let the event lib progress */
                return;
            } else {
                // report the error
                opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: unable to recv message",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            ORTE_NAME_PRINT(&(peer->name)));
                /* turn off the recv event */
                opal_event_del(&peer->recv_event);
                ORTE_FORCED_TERMINATE(1);
                return;
            }
        }
        break;
    default:
        opal_output(0, "%s-%s mca_oob_tcp_peer_recv_handler: invalid socket state(%d)",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(&(peer->name)),
                    peer->state);
        // mca_oob_tcp_peer_close(peer);
        break;
    }
}

static void snd_cons(mca_oob_tcp_send_t *ptr)
{
    ptr->msg = NULL;
    ptr->data = NULL;
    ptr->hdr_sent = false;
    ptr->iovnum = 0;
    ptr->sdptr = NULL;
    ptr->sdbytes = 0;
}
/* we don't destruct any RML msg that is
 * attached to our send as the RML owns
 * that memory. However, if we relay a
 * msg, the data in the relay belongs to
 * us and must be free'd
 */
static void snd_des(mca_oob_tcp_send_t *ptr)
{
    if (NULL != ptr->data) {
        free(ptr->data);
    }
}
OBJ_CLASS_INSTANCE(mca_oob_tcp_send_t,
                   opal_list_item_t,
                   snd_cons, snd_des);

static void rcv_cons(mca_oob_tcp_recv_t *ptr)
{
    ptr->hdr_recvd = false;
    ptr->rdptr = NULL;
    ptr->rdbytes = 0;
}
OBJ_CLASS_INSTANCE(mca_oob_tcp_recv_t,
                   opal_list_item_t,
                   rcv_cons, NULL);

static void err_cons(mca_oob_tcp_msg_error_t *ptr)
{
    ptr->rmsg = NULL;
    ptr->snd = NULL;
}
OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_error_t,
                   opal_object_t,
                   err_cons, NULL);