1
1

Add the necessary infrastructure to allow the dumping of all TCP

informations related to an endpoint (status and all pending fragments).
Do some minor space cleanup.
Этот коммит содержится в:
George Bosilca 2014-12-13 01:59:55 -05:00
родитель 5b8616d890
Коммит 2edbe16c47
3 изменённых файлов: 238 добавлений и 158 удалений

Просмотреть файл

@ -2,10 +2,10 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2014 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
@ -13,9 +13,9 @@
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
*
* Additional copyrights may follow
*
*
* $HEADER$
*
*/
@ -57,7 +57,7 @@
#include "opal/mca/btl/base/btl_base_error.h"
#include "btl_tcp.h"
#include "btl_tcp_endpoint.h"
#include "btl_tcp_endpoint.h"
#include "btl_tcp_proc.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_addr.h"
@ -102,9 +102,9 @@ static void mca_btl_tcp_endpoint_destruct(mca_btl_tcp_endpoint_t* endpoint)
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp_endpoint_t,
opal_list_item_t,
mca_btl_tcp_endpoint_construct,
mca_btl_tcp_endpoint_t,
opal_list_item_t,
mca_btl_tcp_endpoint_construct,
mca_btl_tcp_endpoint_destruct);
@ -125,11 +125,21 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user);
*/
#if WANT_PEER_DUMP
static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, const char* msg)
/**
* The lack of protection in the mca_btl_tcp_endpoint_dump function is voluntary
* so that it can be called regardless of the state of the mutexes. As a result,
* when multiple threads work on the same endpoint not only the information
* displayed might be inacurate, but when we manipulate the pending fragments we
* might access freed memory. Thus, the caller should lock the endpoint prior
* to the call.
*/
static void
mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint,
bool full_info,
const char* msg)
{
char src[64];
char dst[64];
int sndbuf,rcvbuf,nodelay,flags;
char src[64], dst[64], outmsg[1024];
int sndbuf, rcvbuf, nodelay, flags, used = 0;
#if OPAL_ENABLE_IPV6
struct sockaddr_storage inaddr;
#else
@ -137,6 +147,7 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
#endif
opal_socklen_t obtlen;
opal_socklen_t addrlen = sizeof(inaddr);
mca_btl_tcp_frag_t* item;
getsockname(btl_endpoint->endpoint_sd, (struct sockaddr*)&inaddr, &addrlen);
#if OPAL_ENABLE_IPV6
@ -163,43 +174,93 @@ static void mca_btl_tcp_endpoint_dump(mca_btl_base_endpoint_t* btl_endpoint, con
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
#endif
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
used += snprintf(&outmsg[used], 1024 - used, "%s: %s - %s [%d",
msg, src, dst, btl_endpoint->endpoint_sd);
switch(btl_endpoint->endpoint_state) {
case MCA_BTL_TCP_CONNECTING:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "connecting");
break;
case MCA_BTL_TCP_CONNECT_ACK:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "ack");
break;
case MCA_BTL_TCP_CLOSED:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "close");
break;
case MCA_BTL_TCP_FAILED:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "failed");
break;
case MCA_BTL_TCP_CONNECTED:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "connected");
break;
default:
used += snprintf(&outmsg[used], 1024 - used, ":%s]", "unknown");
break;
}
if( full_info ) {
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#if defined(SO_SNDBUF)
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(sndbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &obtlen) < 0) {
BTL_ERROR(("SO_SNDBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
sndbuf = -1;
sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(rcvbuf);
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &obtlen) < 0) {
BTL_ERROR(("SO_RCVBUF option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
rcvbuf = -1;
rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
obtlen = sizeof(nodelay);
if(getsockopt(btl_endpoint->endpoint_sd, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, &obtlen) < 0) {
BTL_ERROR(("TCP_NODELAY option: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#else
nodelay = 0;
nodelay = 0;
#endif
used += snprintf(&outmsg[used], 1024 - used, " nodelay %d sndbuf %d rcvbuf %d flags %08x",
nodelay, sndbuf, rcvbuf, flags);
#if MCA_BTL_TCP_ENDPOINT_CACHE
used += snprintf(&outmsg[used], 1024 - used, "\n\t[cache %p used %lu/%lu]",
btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
btl_endpoint->endpoint_cache_length);
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
used += snprintf(&outmsg[used], 1024 - used, "{%s - retries %d}",
(btl_endpoint->endpoint_nbo ? "NBO" : ""), (int)btl_endpoint->endpoint_retries);
}
used += snprintf(&outmsg[used], 1024 - used, "\n");
BTL_VERBOSE(("%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags));
if( NULL != btl_endpoint->endpoint_recv_frag )
used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_recv_frag, "active recv",
&outmsg[used], 1024 - used);
if( NULL != btl_endpoint->endpoint_send_frag )
used += mca_btl_tcp_frag_dump(btl_endpoint->endpoint_send_frag, "active send (inaccurate iov)",
&outmsg[used], 1024 - used);
OPAL_LIST_FOREACH(item, &btl_endpoint->endpoint_frags, mca_btl_tcp_frag_t) {
used += mca_btl_tcp_frag_dump(item, "pending send", &outmsg[used], 1024 - used);
}
BTL_VERBOSE(("%s", outmsg));
}
#endif
#endif /* WANT_PEER_DUMP */
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
#define MCA_BTL_TCP_ENDPOINT_DUMP(ENDPOINT, INFO, MSG) mca_btl_tcp_endpoint_dump((ENDPOINT), (INFO), (MSG))
#else
#define MCA_BTL_TCP_ENDPOINT_DUMP(ENDPOINT, INFO, MSG)
#endif /* OPAL_ENABLE_DEBUG && WANT_PEER_DUMP */
/*
* Initialize events to be used by the endpoint instance for TCP select/poll callbacks.
@ -213,20 +274,20 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
opal_event_set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_recv_handler,
btl_endpoint );
/**
* The send event should be non persistent until the endpoint is
* completely connected. This means, when the event is created it
* will be fired only once, and when the endpoint is marked as
* will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags.
*/
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp_endpoint_send_handler,
btl_endpoint);
}
@ -267,13 +328,16 @@ int mca_btl_tcp_endpoint_send(mca_btl_base_endpoint_t* btl_endpoint, mca_btl_tcp
if( btl_ownership ) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "complete send fragment [endpoint_send]");
return 1;
} else {
btl_endpoint->endpoint_send_frag = frag;
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [endpoint_send]");
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
}
} else {
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "send fragment enqueued [endpoint_send]");
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;
opal_list_append(&btl_endpoint->endpoint_frags, (opal_list_item_t*)frag);
}
@ -311,7 +375,7 @@ static int mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpo
/*
* Send the globally unique identifier for this process to a endpoint on
* Send the globally unique identifier for this process to a endpoint on
* a newly connected socket.
*/
@ -322,7 +386,7 @@ static int mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_en
opal_process_name_t guid = btl_proc->proc_opal->proc_name;
OPAL_PROCESS_NAME_HTON(guid);
if(mca_btl_tcp_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
if(mca_btl_tcp_endpoint_send_blocking(btl_endpoint, &guid, sizeof(guid)) !=
sizeof(guid)) {
return OPAL_ERR_UNREACH;
}
@ -335,7 +399,7 @@ static int mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_en
* (1) if a connection has not been attempted, accept the connection
* (2) if a connection has not been established, and the endpoints process identifier
* is less than the local process, accept the connection
* otherwise, reject the connection and continue with the current connection
* otherwise, reject the connection and continue with the current connection
*/
bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
@ -368,11 +432,12 @@ bool mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
return false;
}
mca_btl_tcp_endpoint_event_init(btl_endpoint);
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(recv) [endpoint_accept]");
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
mca_btl_tcp_endpoint_connected(btl_endpoint);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp_endpoint_dump(btl_endpoint, "accepted");
#endif
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "accepted");
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return true;
@ -393,7 +458,9 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
if(btl_endpoint->endpoint_sd < 0)
return;
btl_endpoint->endpoint_retries++;
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(recv) [close]");
opal_event_del(&btl_endpoint->endpoint_recv_event);
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(send) [close]");
opal_event_del(&btl_endpoint->endpoint_send_event);
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;
@ -410,7 +477,7 @@ void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
*/
if( MCA_BTL_TCP_FAILED == btl_endpoint->endpoint_state ) {
mca_btl_tcp_frag_t* frag = btl_endpoint->endpoint_send_frag;
if( NULL == frag )
if( NULL == frag )
frag = (mca_btl_tcp_frag_t*)opal_list_remove_first(&btl_endpoint->endpoint_frags);
while(NULL != frag) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, OPAL_ERR_UNREACH);
@ -436,8 +503,8 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
btl_endpoint->endpoint_retries = 0;
/* Create the send event in a persistent manner. */
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
opal_event_set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_send_handler,
btl_endpoint );
@ -446,6 +513,7 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
if(NULL == btl_endpoint->endpoint_send_frag)
btl_endpoint->endpoint_send_frag = (mca_btl_tcp_frag_t*)
opal_list_remove_first(&btl_endpoint->endpoint_frags);
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [endpoint_connected]");
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
}
}
@ -518,7 +586,7 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
* to be able to exchange the opal_process_name_t over the network.
*/
if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
BTL_ERROR(("received unexpected process identifier %s",
BTL_ERROR(("received unexpected process identifier %s",
OPAL_NAME_PRINT(guid)));
mca_btl_tcp_endpoint_close(btl_endpoint);
return OPAL_ERR_UNREACH;
@ -534,21 +602,21 @@ void mca_btl_tcp_set_socket_options(int sd)
int optval;
optval = !mca_btl_tcp_component.tcp_not_use_nodelay;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)) < 0) {
BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
BTL_ERROR(("setsockopt(TCP_NODELAY) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_SNDBUF)
if(mca_btl_tcp_component.tcp_sndbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_btl_tcp_component.tcp_sndbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
BTL_ERROR(("setsockopt(SO_SNDBUF) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
#if defined(SO_RCVBUF)
if(mca_btl_tcp_component.tcp_rcvbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_btl_tcp_component.tcp_rcvbuf, sizeof(int)) < 0) {
BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
BTL_ERROR(("setsockopt(SO_RCVBUF) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
#endif
@ -570,7 +638,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
/* By default consider a IPv4 connection */
uint16_t af_family = AF_INET;
opal_socklen_t addrlen = sizeof(struct sockaddr_in);
#if OPAL_ENABLE_IPV6
if (AF_INET6 == btl_endpoint->endpoint_addr->addr_family) {
af_family = AF_INET6;
@ -592,19 +660,19 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
/* setup the socket as non-blocking */
if((flags = fcntl(btl_endpoint->endpoint_sd, F_GETFL, 0)) < 0) {
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
BTL_ERROR(("fcntl(F_GETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
} else {
flags |= O_NONBLOCK;
if(fcntl(btl_endpoint->endpoint_sd, F_SETFL, flags) < 0)
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
BTL_ERROR(("fcntl(F_SETFL) failed: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno));
}
/* start the connect - will likely fail with EINPROGRESS */
mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
opal_output_verbose(20, opal_btl_base_framework.framework_output,
opal_output_verbose(20, opal_btl_base_framework.framework_output,
"btl: tcp: attempting to connect() to %s address %s on port %d",
OPAL_NAME_PRINT(btl_endpoint->endpoint_proc->proc_opal->proc_name),
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
@ -614,6 +682,7 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
/* non-blocking so wait for completion */
if(opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECTING;
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(send) [start_connect]");
opal_event_add(&btl_endpoint->endpoint_send_event, 0);
return OPAL_SUCCESS;
}
@ -634,18 +703,18 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
/* send our globally unique process identifier to the endpoint */
if((rc = mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint)) == OPAL_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_add(recv) [start_connect]");
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
} else {
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return OPAL_SUCCESS;
}
return rc;
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
}
/*
* Check the status of the connection. If the connection failed, will retry
* later. Otherwise, send this processes identifier to the endpoint on the
* later. Otherwise, send this processes identifier to the endpoint on the
* newly connected socket.
*/
static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_endpoint)
@ -658,7 +727,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
/* check connect completion status */
if(getsockopt(btl_endpoint->endpoint_sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) {
BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
BTL_ERROR(("getsockopt() to %s failed: %s (%d)",
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
strerror(opal_socket_errno), opal_socket_errno));
mca_btl_tcp_endpoint_close(btl_endpoint);
@ -668,7 +737,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
return;
}
if(so_error != 0) {
BTL_ERROR(("connect() to %s failed: %s (%d)",
BTL_ERROR(("connect() to %s failed: %s (%d)",
opal_net_get_hostname((struct sockaddr*) &endpoint_addr),
strerror(so_error), so_error));
mca_btl_tcp_endpoint_close(btl_endpoint);
@ -683,6 +752,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
if(mca_btl_tcp_endpoint_send_connect_ack(btl_endpoint) == OPAL_SUCCESS) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_CONNECT_ACK;
opal_event_add(&btl_endpoint->endpoint_recv_event, 0);
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_add(recv) [complete_connect]");
} else {
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
@ -691,7 +761,7 @@ static void mca_btl_tcp_endpoint_complete_connect(mca_btl_base_endpoint_t* btl_e
/*
* A file descriptor is available/ready for recv. Check the state
* A file descriptor is available/ready for recv. Check the state
* of the socket and take the appropriate action.
*/
@ -734,9 +804,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
OPAL_THREAD_LOCK(&btl_endpoint->endpoint_send_lock);
mca_btl_tcp_endpoint_connected(btl_endpoint);
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
#if OPAL_ENABLE_DEBUG && WANT_PEER_DUMP
mca_btl_tcp_endpoint_dump(btl_endpoint, "connected");
#endif
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "connected");
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return;
@ -747,13 +815,13 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
frag = btl_endpoint->endpoint_recv_frag;
if(NULL == frag) {
if(mca_btl_tcp_module.super.btl_max_send_size >
mca_btl_tcp_module.super.btl_eager_limit) {
if(mca_btl_tcp_module.super.btl_max_send_size >
mca_btl_tcp_module.super.btl_eager_limit) {
MCA_BTL_TCP_FRAG_ALLOC_MAX(frag);
} else {
} else {
MCA_BTL_TCP_FRAG_ALLOC_EAGER(frag);
}
if(NULL == frag) {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
return;
@ -858,16 +926,15 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
/* if nothing else to do unregister for send event notifications */
if(NULL == btl_endpoint->endpoint_send_frag) {
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, false, "event_del(send) [endpoint_send_handler]");
opal_event_del(&btl_endpoint->endpoint_send_event);
}
break;
default:
BTL_ERROR(("invalid connection state (%d)", btl_endpoint->endpoint_state));
MCA_BTL_TCP_ENDPOINT_DUMP(btl_endpoint, true, "event_del(send) [endpoint_send_handler:error]");
opal_event_del(&btl_endpoint->endpoint_send_event);
break;
}
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
}

Просмотреть файл

@ -3,10 +3,10 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2014 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
@ -14,15 +14,15 @@
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
*
* Additional copyrights may follow
*
*
* $HEADER$
*
* In windows, many of the socket functions return an EWOULDBLOCK
* instead of \ things like EAGAIN, EINPROGRESS, etc. It has been
* verified that this will \ not conflict with other error codes that
* are returned by these functions \ under UNIX/Linux environments
* are returned by these functions \ under UNIX/Linux environments
*/
#include "opal_config.h"
@ -42,52 +42,65 @@
#include "opal/opal_socket_errno.h"
#include "opal/mca/btl/base/btl_base_error.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_frag.h"
#include "btl_tcp_endpoint.h"
static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = mca_btl_tcp_module.super.btl_eager_limit;
static void mca_btl_tcp_frag_eager_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = mca_btl_tcp_module.super.btl_eager_limit;
frag->my_list = &mca_btl_tcp_component.tcp_frag_eager;
}
static void mca_btl_tcp_frag_max_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = mca_btl_tcp_module.super.btl_max_send_size;
static void mca_btl_tcp_frag_max_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = mca_btl_tcp_module.super.btl_max_send_size;
frag->my_list = &mca_btl_tcp_component.tcp_frag_max;
}
static void mca_btl_tcp_frag_user_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = 0;
static void mca_btl_tcp_frag_user_constructor(mca_btl_tcp_frag_t* frag)
{
frag->size = 0;
frag->my_list = &mca_btl_tcp_component.tcp_frag_user;
}
OBJ_CLASS_INSTANCE(
mca_btl_tcp_frag_t,
mca_btl_base_descriptor_t,
NULL,
NULL);
mca_btl_tcp_frag_t,
mca_btl_base_descriptor_t,
NULL,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp_frag_eager_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_eager_constructor,
NULL);
mca_btl_tcp_frag_eager_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_eager_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp_frag_max_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_max_constructor,
NULL);
mca_btl_tcp_frag_max_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_max_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_tcp_frag_user_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_user_constructor,
NULL);
mca_btl_tcp_frag_user_t,
mca_btl_base_descriptor_t,
mca_btl_tcp_frag_user_constructor,
NULL);
size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t* frag, char* msg, char* buf, size_t length)
{
int i, used = 0;
used += snprintf(&buf[used], length - used, "%s frag %p iov_cnt %d iov_idx %d size %lu\n",
msg, (void*)frag, (int)frag->iov_cnt, (int)frag->iov_idx, frag->size);
for( i = 0; i < (int)frag->iov_cnt; i++ ) {
used += snprintf(&buf[used], length - used, "[%s%p:%lu] ",
(i < (int)frag->iov_idx ? "*" : ""),
frag->iov[i].iov_base, frag->iov[i].iov_len);
}
return used;
}
bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
{
@ -111,7 +124,7 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
mca_btl_tcp_endpoint_close(frag->endpoint);
return false;
default:
BTL_ERROR(("mca_btl_tcp_frag_send: writev failed: %s (%d)",
BTL_ERROR(("mca_btl_tcp_frag_send: writev failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
frag->endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
@ -178,7 +191,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
* iovec for the caching in the fragment structure (the +1).
*/
frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache_pos;
frag->iov_ptr[num_vecs].iov_len =
frag->iov_ptr[num_vecs].iov_len =
mca_btl_tcp_component.tcp_endpoint_cache - btl_endpoint->endpoint_cache_length;
num_vecs++;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
@ -187,32 +200,32 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
cnt = -1;
while( cnt < 0 ) {
cnt = readv(sd, frag->iov_ptr, num_vecs);
if( 0 < cnt ) goto advance_iov_position;
if( cnt == 0 ) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
switch(opal_socket_errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
case EFAULT:
if( 0 < cnt ) goto advance_iov_position;
if( cnt == 0 ) {
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
switch(opal_socket_errno) {
case EINTR:
continue;
case EWOULDBLOCK:
return false;
case EFAULT:
BTL_ERROR(("mca_btl_tcp_frag_recv: readv error (%p, %lu)\n\t%s(%lu)\n",
frag->iov_ptr[0].iov_base, (unsigned long) frag->iov_ptr[0].iov_len,
strerror(opal_socket_errno), (unsigned long) frag->iov_cnt));
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
default:
BTL_ERROR(("mca_btl_tcp_frag_recv: readv failed: %s (%d)",
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
default:
BTL_ERROR(("mca_btl_tcp_frag_recv: readv failed: %s (%d)",
strerror(opal_socket_errno),
opal_socket_errno));
btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
}
advance_iov_position:
@ -225,11 +238,11 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
frag->iov_ptr->iov_len -= cnt;
cnt = 0;
break;
}
cnt -= frag->iov_ptr->iov_len;
frag->iov_idx++;
frag->iov_ptr++;
frag->iov_cnt--;
}
cnt -= frag->iov_ptr->iov_len;
frag->iov_idx++;
frag->iov_ptr++;
frag->iov_cnt--;
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
btl_endpoint->endpoint_cache_length = cnt;
@ -247,7 +260,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
frag->iov[1].iov_len = frag->hdr.size;
frag->iov_cnt++;
#ifndef __sparc
/* The following cannot be done for sparc code
/* The following cannot be done for sparc code
* because it causes alignment errors when accessing
* structures later on in the btl and pml code.
*/
@ -279,4 +292,3 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
}
return false;
}

Просмотреть файл

@ -3,19 +3,19 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2013 The University of Tennessee and The University
* Copyright (c) 2004-2014 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
*
* Additional copyrights may follow
*
*
* $HEADER$
*/
@ -35,7 +35,7 @@
#include <net/uio.h>
#endif
#include "btl_tcp.h"
#include "btl_tcp.h"
#include "btl_tcp_hdr.h"
BEGIN_C_DECLS
@ -46,33 +46,33 @@ BEGIN_C_DECLS
* TCP fragment derived type.
*/
struct mca_btl_tcp_frag_t {
mca_btl_base_descriptor_t base;
mca_btl_base_segment_t segments[2];
struct mca_btl_base_endpoint_t *endpoint;
mca_btl_base_descriptor_t base;
mca_btl_base_segment_t segments[2];
struct mca_btl_base_endpoint_t *endpoint;
struct mca_btl_tcp_module_t* btl;
mca_btl_tcp_hdr_t hdr;
struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
struct iovec *iov_ptr;
size_t iov_cnt;
size_t iov_idx;
size_t size;
size_t size;
int rc;
ompi_free_list_t* my_list;
};
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_t);
};
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_t);
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_eager_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_eager_t);
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_eager_t;
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_max_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_max_t);
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_eager_t);
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_user_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_user_t);
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_max_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_max_t);
typedef struct mca_btl_tcp_frag_t mca_btl_tcp_frag_user_t;
OBJ_CLASS_DECLARATION(mca_btl_tcp_frag_user_t);
/*
@ -126,6 +126,7 @@ do { \
bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t*, int sd);
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t*, int sd);
size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t*, char*, char*, size_t);
END_C_DECLS
#endif