1
1

*** no code review - needed to get this in so others could run benchmarks ***

- optional pml/ptl statistics for diagnostics
- fixed libevent select case, added support for waking up progress thread 
  that is asleep in select/poll to pickup changes to fdset
- setup tcp socket options when socket is created
- added mca parameter for tcp snd/rcv buffer size

This commit was SVN r1084.
Этот коммит содержится в:
Tim Woodall 2004-04-23 01:38:41 +00:00
родитель d355978053
Коммит d1e599d3ea
19 изменённых файлов: 261 добавлений и 62 удалений

Просмотреть файл

@ -132,7 +132,12 @@ struct lam_event_list lam_signalqueue;
struct lam_event_list lam_eventqueue;
static struct timeval lam_event_tv;
lam_mutex_t lam_event_lock;
#if LAM_HAVE_THREADS
lam_thread_t lam_event_thread;
lam_event_t lam_event_pipe_event;
int lam_event_pipe[2];
int lam_event_pipe_signalled;
#endif
static int
compare(struct lam_event *a, struct lam_event *b)
@ -156,6 +161,17 @@ static void* lam_event_run(lam_object_t* arg)
}
#if LAM_HAVE_THREADS
static void lam_event_pipe_handler(int sd, short flags, void* user)
{
unsigned char byte;
if(read(sd, &byte, 1) != 1) {
lam_output(0, "lam_event_pipe: read failed with: errno=%d\n", errno);
lam_event_del(&lam_event_pipe_event);
}
}
#endif
int
lam_event_init(void)
@ -189,6 +205,21 @@ lam_event_init(void)
errx(1, "%s: no event mechanism available", __func__);
#if LAM_HAVE_THREADS
if(pipe(lam_event_pipe) != 0) {
lam_output(0, "lam_event_init: pipe() failed with errno=%d\n", errno);
return LAM_ERROR;
}
lam_event_pipe_signalled = 1;
lam_event_set(
&lam_event_pipe_event,
lam_event_pipe[0],
LAM_EV_READ|LAM_EV_PERSIST,
lam_event_pipe_handler,
0);
lam_event_add_i(&lam_event_pipe_event, 0);
lam_event_pipe_signalled = 0;
/* spin up a thread to dispatch events */
OBJ_CONSTRUCT(&lam_event_thread, lam_thread_t);
lam_event_thread.t_run = lam_event_run;
@ -251,6 +282,7 @@ lam_event_loop(int flags)
/* Calculate the initial events that we are waiting for */
if (lam_evsel->recalc(lam_evbase, 0) == -1) {
lam_output(0, "lam_event_loop: lam_evsel->recalc() failed.");
lam_mutex_unlock(&lam_event_lock);
return (-1);
}
@ -263,6 +295,7 @@ lam_event_loop(int flags)
if (res == -1) {
lam_output(0, "lam_event_loop: lam_event_sigcb() failed.");
errno = EINTR;
lam_mutex_unlock(&lam_event_lock);
return (-1);
}
}
@ -286,9 +319,16 @@ lam_event_loop(int flags)
else
timerclear(&tv);
#if LAM_HAVE_THREADS
lam_event_pipe_signalled = 0;
#endif
res = lam_evsel->dispatch(lam_evbase, &tv);
#if LAM_HAVE_THREADS
lam_event_pipe_signalled = 1;
#endif
if (res == -1) {
lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed.");
lam_mutex_unlock(&lam_event_lock);
return (-1);
}
@ -298,11 +338,12 @@ lam_event_loop(int flags)
lam_event_process_active();
if (flags & LAM_EVLOOP_ONCE)
done = 1;
} else if (flags & LAM_EVLOOP_NONBLOCK)
} else if (flags & (LAM_EVLOOP_NONBLOCK|LAM_EVLOOP_ONCE))
done = 1;
if (lam_evsel->recalc(lam_evbase, 0) == -1) {
lam_output(0, "lam_event_loop: lam_evesel->recalc() failed.");
lam_mutex_unlock(&lam_event_lock);
return (-1);
}
}
@ -407,6 +448,15 @@ lam_event_add_i(struct lam_event *ev, struct timeval *tv)
lam_event_queue_insert(ev, LAM_EVLIST_SIGNAL);
return (lam_evsel->add(lam_evbase, ev));
}
#if LAM_HAVE_THREADS
if(lam_event_pipe_signalled == 0) {
unsigned char byte = 0;
if(write(lam_event_pipe[1], &byte, 1) != 1)
lam_output(0, "lam_event_add: write() to lam_event_pipe[1] failed with errno=%d\n", errno);
lam_event_pipe_signalled++;
}
#endif
return (0);
}
@ -446,6 +496,15 @@ int lam_event_del_i(struct lam_event *ev)
lam_event_queue_remove(ev, LAM_EVLIST_SIGNAL);
return (lam_evsel->del(lam_evbase, ev));
}
#if LAM_HAVE_THREADS
if(lam_event_pipe_signalled == 0) {
unsigned char byte = 0;
if(write(lam_event_pipe[1], &byte, 1) != 1)
lam_output(0, "lam_event_add: write() to lam_event_pipe[1] failed with errno=%d\n", errno);
lam_event_pipe_signalled++;
}
#endif
return (0);
}
@ -454,6 +513,7 @@ int lam_event_del(struct lam_event *ev)
int rc;
lam_mutex_lock(&lam_event_lock);
rc = lam_event_del_i(ev);
lam_mutex_unlock(&lam_event_lock);
return rc;
}

Просмотреть файл

@ -132,7 +132,7 @@ struct lam_eventop {
int (*dispatch)(void *, struct timeval *);
};
#define LAM_TIMEOUT_DEFAULT {0, 100000}
#define LAM_TIMEOUT_DEFAULT {10, 0}
#define LAM_EVLOOP_ONCE 0x01
#define LAM_EVLOOP_NONBLOCK 0x02

Просмотреть файл

@ -27,6 +27,7 @@
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "lam_config.h"
#include "util/output.h"
#include <sys/types.h>
#ifdef HAVE_SYS_TIME_H
@ -202,7 +203,8 @@ select_dispatch(void *arg, struct timeval *tv)
if (!(ev->ev_events & LAM_EV_PERSIST))
lam_event_del_i(ev);
lam_event_active_i(ev, res, 1);
} else if (ev->ev_fd > maxfd)
}
if (ev->ev_fd > maxfd)
maxfd = ev->ev_fd;
}

Просмотреть файл

@ -18,7 +18,7 @@ extern lam_class_t mca_pml_teg_ptl_array_t_class;
* that can be used to reach the process.
*/
struct mca_ptl_proc_t {
double ptl_weight; /**< PTL weight for scheduling */
int ptl_weight; /**< PTL weight for scheduling */
struct mca_ptl_base_peer_t* ptl_peer; /**< PTL addressing info */
mca_ptl_t *ptl; /**< PTL implementation */
};

Просмотреть файл

@ -137,7 +137,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
size_t p;
for(p=0; p<nprocs; p++) {
lam_proc_t *proc = procs[p];
uint64_t total_bandwidth = 0;
double total_bandwidth = 0;
uint32_t latency = 0;
size_t n_index, p_index;
size_t n_size;
@ -207,10 +207,12 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
for(n_index = 0; n_index < n_size; n_index++) {
struct mca_ptl_proc_t* ptl_proc = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, n_index);
struct mca_ptl_t *ptl = ptl_proc->ptl;
double weight;
if(ptl->ptl_bandwidth)
ptl_proc->ptl_weight = total_bandwidth / ptl_proc->ptl->ptl_bandwidth;
weight = total_bandwidth / ptl_proc->ptl->ptl_bandwidth;
else
ptl_proc->ptl_weight = 1.0 / n_size;
weight = 1.0 / n_size;
ptl_proc->ptl_weight = (int)(weight * 100);
/* check to see if this ptl is already in the array of ptls used for first
* fragments - if not add it.

Просмотреть файл

@ -17,7 +17,7 @@
#include "mca/pml/base/pml_base_request.h"
#include "mca/ptl/ptl.h"
#define MCA_PML_TEG_STATISTICS 1
#define MCA_PML_TEG_STATISTICS 0
/**
* TEG PML Interface
@ -44,6 +44,10 @@ struct mca_pml_teg_t {
lam_free_list_t teg_recv_requests;
#if MCA_PML_TEG_STATISTICS
long teg_isends;
long teg_irecvs;
long teg_sends;
long teg_recvs;
long teg_waits;
long teg_condition_waits;
long teg_condition_broadcasts;

Просмотреть файл

@ -41,6 +41,9 @@ int mca_pml_teg_irecv(
int rc;
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_irecvs++;
#endif
if(NULL == recvreq)
return rc;
@ -74,6 +77,9 @@ int mca_pml_teg_recv(
{
int rc, index;
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_recvs++;
#endif
if(NULL == recvreq)
return rc;

Просмотреть файл

@ -51,8 +51,10 @@ int mca_pml_teg_isend(
lam_request_t **request)
{
int rc;
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_isends++;
#endif
if(rc != LAM_SUCCESS)
return rc;
mca_ptl_base_send_request_init(
@ -85,6 +87,9 @@ int mca_pml_teg_send(
{
int rc, index;
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_sends++;
#endif
if(rc != LAM_SUCCESS)
return rc;

Просмотреть файл

@ -73,6 +73,10 @@ int mca_pml_teg_module_open(void)
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_waits = 0;
mca_pml_teg.teg_sends = 0;
mca_pml_teg.teg_recvs = 0;
mca_pml_teg.teg_isends = 0;
mca_pml_teg.teg_irecvs = 0;
mca_pml_teg.teg_condition_waits = 0;
mca_pml_teg.teg_condition_broadcasts = 0;
#endif
@ -91,9 +95,17 @@ int mca_pml_teg_module_open(void)
int mca_pml_teg_module_close(void)
{
#if MCA_PML_TEG_STATISTICS
#if MCA_PML_TEG_STATISTICS && LAM_ENABLE_DEBUG
lam_output(0, "mca_pml_teg.teg_waits = %d\n",
mca_pml_teg.teg_waits);
lam_output(0, "mca_pml_teg.teg_sends = %d\n",
mca_pml_teg.teg_sends);
lam_output(0, "mca_pml_teg.teg_recvs = %d\n",
mca_pml_teg.teg_recvs);
lam_output(0, "mca_pml_teg.teg_isends = %d\n",
mca_pml_teg.teg_isends);
lam_output(0, "mca_pml_teg.teg_irecvs = %d\n",
mca_pml_teg.teg_irecvs);
lam_output(0, "mca_pml_teg.teg_condition_waits = %d\n",
mca_pml_teg.teg_condition_waits);
lam_output(0, "mca_pml_teg.teg_condition_broadcast = %d\n",

Просмотреть файл

@ -20,8 +20,8 @@ void mca_pml_teg_recv_request_progress(
if(mca_pml_teg.teg_request_waiting) {
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_condition_broadcasts++;
lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
#endif
lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
}
}
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);

Просмотреть файл

@ -47,7 +47,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
* previously assigned)
*/
else {
bytes_to_frag = ptl_proc->ptl_weight * req->req_bytes_msg;
bytes_to_frag = (ptl_proc->ptl_weight * req->req_bytes_msg) / 100;
if(bytes_to_frag > bytes_remaining)
bytes_to_frag = bytes_remaining;
}

Просмотреть файл

@ -124,7 +124,6 @@ void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv
void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send_frag_t* frag)
{
/* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
if(lam_list_get_size(&mca_ptl_tcp_module.tcp_pending_acks)) {
mca_ptl_tcp_recv_frag_t* pending;
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);

Просмотреть файл

@ -15,6 +15,7 @@
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#define MCA_PTL_TCP_STATISTICS 0
/**
* TCP PTL module.
@ -31,6 +32,8 @@ struct mca_ptl_tcp_module_1_0_0_t {
int tcp_free_list_num; /**< initial size of free lists */
int tcp_free_list_max; /**< maximum size of free lists */
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
int tcp_sndbuf; /**< socket sndbuf size */
int tcp_rcvbuf; /**< socket rcvbuf size */
lam_free_list_t tcp_send_requests; /**< free list of tcp send requests -- sendreq + sendfrag */
lam_free_list_t tcp_send_frags; /**< free list of tcp send fragments */
lam_free_list_t tcp_recv_frags; /**< free list of tcp recv fragments */
@ -101,6 +104,11 @@ struct mca_ptl_tcp_t {
int ptl_ifindex; /**< PTL interface index */
struct sockaddr_in ptl_ifaddr; /**< PTL interface address */
struct sockaddr_in ptl_ifmask; /**< PTL interface netmask */
#if MCA_PTL_TCP_STATISTICS
size_t ptl_bytes_sent;
size_t ptl_bytes_recv;
size_t ptl_send_handler;
#endif
};
typedef struct mca_ptl_tcp_t mca_ptl_tcp_t;

Просмотреть файл

@ -123,6 +123,10 @@ int mca_ptl_tcp_module_open(void)
mca_ptl_tcp_param_register_int("free_list_max", -1);
mca_ptl_tcp_module.tcp_free_list_inc =
mca_ptl_tcp_param_register_int("free_list_inc", 256);
mca_ptl_tcp_module.tcp_sndbuf =
mca_ptl_tcp_param_register_int("sndbuf", 128*1024);
mca_ptl_tcp_module.tcp_rcvbuf =
mca_ptl_tcp_param_register_int("rcvbuf", 128*1024);
mca_ptl_tcp.super.ptl_exclusivity =
mca_ptl_tcp_param_register_int("exclusivity", 0);
mca_ptl_tcp.super.ptl_first_frag_size =
@ -184,6 +188,11 @@ static int mca_ptl_tcp_create(int if_index)
/* initialize the ptl */
ptl->ptl_ifindex = if_index;
#if MCA_PTL_TCP_STATISTICS
ptl->ptl_bytes_recv = 0;
ptl->ptl_bytes_sent = 0;
ptl->ptl_send_handler = 0;
#endif
lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->ptl_ifaddr, sizeof(ptl->ptl_ifaddr));
lam_ifindextomask(if_index, (struct sockaddr*)&ptl->ptl_ifmask, sizeof(ptl->ptl_ifmask));
return LAM_SUCCESS;
@ -463,6 +472,7 @@ static void mca_ptl_tcp_module_accept(void)
lam_output(0, "mca_ptl_tcp_module_accept: accept() failed with errno %d.", errno);
return;
}
mca_ptl_tcp_set_socket_options(sd);
/* wait for receipt of peers process identifier to complete this connection */
event = malloc(sizeof(lam_event_t));
@ -521,6 +531,16 @@ static void mca_ptl_tcp_module_recv_handler(int sd, short flags, void* user)
return;
}
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
lam_output(0, "mca_ptl_tcp_module_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno);
}
}
/* lookup the corresponding process */
ptl_proc = mca_ptl_tcp_proc_lookup(guid, size);
if(NULL == ptl_proc) {

Просмотреть файл

@ -7,6 +7,9 @@
#include <sys/errno.h>
#include <sys/types.h>
#include <sys/fcntl.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "types.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "ptl_tcp.h"
@ -57,6 +60,59 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
}
/*
* diagnostics
*/
static void mca_ptl_tcp_peer_dump(mca_ptl_base_peer_t* ptl_peer, const char* msg)
{
char src[64];
char dst[64];
char buff[255];
int sndbuf,rcvbuf,nodelay,flags;
struct sockaddr_in inaddr;
lam_socklen_t optlen;
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
getsockname(ptl_peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
sprintf(src, "%s", inet_ntoa(inaddr.sin_addr));
getpeername(ptl_peer->peer_sd, (struct sockaddr*)&inaddr, &addrlen);
sprintf(dst, "%s", inet_ntoa(inaddr.sin_addr));
if((flags = fcntl(ptl_peer->peer_sd, F_GETFL, 0)) < 0) {
lam_output(0, "mca_ptl_tcp_peer_connect: fcntl(F_GETFL) failed with errno=%d\n", errno);
}
#if defined(SO_SNDBUF)
optlen = sizeof(sndbuf);
if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf, &optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_dump: SO_SNDBUF option: errno %d\n", errno);
}
#else
sndbuf = -1;
#endif
#if defined(SO_RCVBUF)
optlen = sizeof(rcvbuf);
if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_RCVBUF, (char *)&rcvbuf, &optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_dump: SO_RCVBUF option: errno %d\n", errno);
}
#else
rcvbuf = -1;
#endif
#if defined(TCP_NODELAY)
optlen = sizeof(nodelay);
if(getsockopt(ptl_peer->peer_sd, IPPROTO_TCP, TCP_NODELAY, &nodelay, &optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_dump: TCP_NODELAY option: errno %d\n", errno);
}
#else
nodelay = 0;
#endif
sprintf(buff, "%s: %s - %s nodelay %d sndbuf %d rcvbuf %d flags %08x\n",
msg, src, dst, nodelay, sndbuf, rcvbuf, flags);
lam_output(0, buff);
}
static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, int sd)
{
lam_event_set(
@ -105,9 +161,9 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
rc = LAM_ERR_UNREACH;
break;
case MCA_PTL_TCP_CONNECTED:
if (NULL != ptl_peer->peer_send_frag)
if (NULL != ptl_peer->peer_send_frag) {
lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
else {
} else {
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) {
mca_ptl_tcp_send_frag_progress(frag);
} else {
@ -197,6 +253,9 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
mca_ptl_tcp_peer_event_init(ptl_peer, sd);
lam_event_add(&ptl_peer->peer_recv_event, 0);
mca_ptl_tcp_peer_connected(ptl_peer);
#if LAM_ENABLE_DEBUG
mca_ptl_tcp_peer_dump(ptl_peer, "accepted");
#endif
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return true;
@ -233,30 +292,6 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer)
{
/* setup socket options */
int optval = 1;
lam_socklen_t optlen = sizeof(optval);
#if defined(TCP_NODELAY)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_connected: setsockopt(TCP_NODELAY) failed with errno=%d\n", errno);
}
#endif
#if defined(TCP_NODELACK)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELACK, &optval, optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_connected: setsockopt(TCP_NODELACK) failed with errno=%d\n", errno);
}
#endif
#if 0
#if defined(TCP_QUICKACK)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_QUICKACK, &optval, optlen) < 0) {
lam_output(0, "mca_ptl_tcp_peer_connected: setsockopt(TCP_QUICKACK) failed with errno=%d\n", errno);
}
#endif
#endif
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTED;
ptl_peer->peer_retries = 0;
if(lam_list_get_size(&ptl_peer->peer_frags) > 0) {
@ -339,10 +374,41 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
/* connected */
mca_ptl_tcp_peer_connected(ptl_peer);
#if LAM_ENABLE_DEBUG
mca_ptl_tcp_peer_dump(ptl_peer, "connected");
#endif
return LAM_SUCCESS;
}
void mca_ptl_tcp_set_socket_options(int sd)
{
int optval;
#if defined(SO_SNDBUF)
if(setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_ptl_tcp_module.tcp_sndbuf, sizeof(int)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: SO_SNDBUF option: errno %d\n", errno);
}
#endif
#if defined(SO_RCVBUF)
if(setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_ptl_tcp_module.tcp_rcvbuf, sizeof(int)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: SO_RCVBUF option: errno %d\n", errno);
}
#endif
#if defined(TCP_NODELAY)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELAY) failed with errno=%d\n", errno);
}
#endif
#if defined(TCP_NODELACK)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELACK, &optval, sizeof(optval)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELACK) failed with errno=%d\n", errno);
}
#endif
}
/*
* Start a connection to the peer. This will likely not complete,
@ -363,6 +429,9 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
return LAM_ERR_UNREACH;
}
/* setup socket buffer sizes */
mca_ptl_tcp_set_socket_options(ptl_peer->peer_sd);
/* setup event callbacks */
mca_ptl_tcp_peer_event_init(ptl_peer, ptl_peer->peer_sd);
@ -505,8 +574,9 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
/* complete the current send */
do {
mca_ptl_tcp_send_frag_t* frag = ptl_peer->peer_send_frag;
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false)
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) {
break;
}
/* if required - update request status and release fragment */
mca_ptl_tcp_send_frag_progress(frag);

Просмотреть файл

@ -55,6 +55,7 @@ typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;
extern lam_class_t mca_ptl_tcp_peer_t_class;
typedef struct mca_ptl_base_peer_t mca_ptl_tcp_peer_t;
void mca_ptl_tcp_set_socket_options(int sd);
void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t*);
int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t*, mca_ptl_tcp_send_frag_t*);
bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t*, struct sockaddr_in*, int);

Просмотреть файл

@ -86,9 +86,8 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
{
/* non-blocking read - continue if interrupted, otherwise wait until data available */
unsigned char* ptr = (unsigned char*)&frag->frag_header;
int cnt = -1;
while(cnt < 0) {
cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
while(frag->frag_hdr_cnt < size) {
int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
@ -99,6 +98,7 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
case EINTR:
continue;
case EWOULDBLOCK:
/* lam_output(0, "mca_ptl_tcp_recv_frag_header: EWOULDBLOCK\n"); */
return false;
default:
lam_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno);
@ -108,10 +108,11 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
}
}
frag->frag_hdr_cnt += cnt;
#if MCA_PTL_TCP_STATISTICS
((mca_ptl_tcp_t*)frag->frag_owner)->ptl_bytes_recv += cnt;
#endif
}
/* is the entire common header available? */
return (frag->frag_hdr_cnt == size);
return true;
}
@ -196,9 +197,8 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
int cnt = -1;
while(cnt < 0) {
cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt,
while(frag->frag_msg_cnt < frag->super.super.frag_size) {
int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt,
frag->super.super.frag_size-frag->frag_msg_cnt, 0);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
@ -218,9 +218,12 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false;
}
}
}
frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->super.super.frag_size);
#if MCA_PTL_TCP_STATISTICS
((mca_ptl_tcp_t*)frag->frag_owner)->ptl_bytes_recv += cnt;
#endif
}
return true;
}
@ -231,10 +234,9 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
int cnt = -1;
while(cnt < 0) {
while(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) {
void *rbuf = malloc(frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt);
cnt = recv(sd, rbuf, frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt, 0);
int cnt = recv(sd, rbuf, frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt, 0);
free(rbuf);
if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer);
@ -246,6 +248,7 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
case EINTR:
continue;
case EWOULDBLOCK:
/* lam_output(0, "mca_ptl_tcp_recv_frag_discard: EWOULDBLOCK\n"); */
return false;
default:
lam_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno);
@ -254,8 +257,11 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false;
}
}
}
frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->frag_header.hdr_frag.hdr_frag_length);
#if MCA_PTL_TCP_STATISTICS
((mca_ptl_tcp_t*)frag->frag_owner)->ptl_bytes_recv += cnt;
#endif
}
return true;
}

Просмотреть файл

@ -89,7 +89,7 @@ int mca_ptl_tcp_send_frag_init(
* can use the convertor initialized on the request, remaining fragments
* must copy/reinit the convertor as the transfer could be in parallel.
*/
if(sendreq->req_frags < 2) {
if( sendreq->req_frags < 2 ) {
convertor = &sendreq->req_convertor;
} else {
@ -153,6 +153,7 @@ bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)
case EINTR:
continue;
case EWOULDBLOCK:
/* lam_output(0, "mca_ptl_tcp_send_frag_handler: EWOULDBLOCK\n"); */
return false;
default:
{
@ -164,6 +165,11 @@ bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)
}
}
#if MCA_PTL_TCP_STATISTICS
((mca_ptl_tcp_t*)frag->frag_owner)->ptl_bytes_sent += cnt;
((mca_ptl_tcp_t*)frag->frag_owner)->ptl_send_handler++;
#endif
/* if the write didn't complete - update the iovec state */
num_vecs = frag->frag_vec_cnt;
for(i=0; i<num_vecs; i++) {
@ -178,8 +184,6 @@ bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)
break;
}
}
/* done with this frag? */
return (frag->frag_vec_cnt == 0);
}