1
1

- inlining multiple functions - changed to macros functions the compiler wouldn't inline

- removed unused event handling code from critical path
- fixed threading issue in tcp code

This commit was SVN r1136.
Этот коммит содержится в:
Tim Woodall 2004-05-18 21:06:11 +00:00
родитель c39ba9ea6e
Коммит e68af0dde8
33 изменённых файлов: 778 добавлений и 671 удалений

Просмотреть файл

@ -57,7 +57,6 @@
extern struct lam_event_list lam_eventqueue; extern struct lam_event_list lam_eventqueue;
extern volatile sig_atomic_t lam_evsignal_caught; extern volatile sig_atomic_t lam_evsignal_caught;
extern lam_mutex_t lam_event_mutex;
extern lam_mutex_t lam_event_lock; extern lam_mutex_t lam_event_lock;
/* due to limitations in the epoll interface, we need to keep track of /* due to limitations in the epoll interface, we need to keep track of
@ -175,9 +174,13 @@ epoll_dispatch(void *arg, struct timeval *tv)
return (-1); return (-1);
timeout = tv->tv_sec * 1000 + tv->tv_usec / 1000; timeout = tv->tv_sec * 1000 + tv->tv_usec / 1000;
lam_mutex_unlock(&lam_event_lock); if(lam_using_threads()) {
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); lam_mutex_unlock(&lam_event_lock);
lam_mutex_lock(&lam_event_lock); res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
lam_mutex_lock(&lam_event_lock);
} else {
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
}
if (lam_evsignal_recalc(&epollop->evsigmask) == -1) if (lam_evsignal_recalc(&epollop->evsigmask) == -1)
return (-1); return (-1);

Просмотреть файл

@ -117,13 +117,12 @@ int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */
int lam_event_gotsig; /* Set in signal handler */ int lam_event_gotsig; /* Set in signal handler */
/* Prototypes */ /* Prototypes */
static void lam_event_process_active(void);
static void lam_timeout_correct(struct timeval *off);
static void lam_timeout_insert(struct lam_event *);
static void lam_event_queue_insert(struct lam_event *, int); static void lam_event_queue_insert(struct lam_event *, int);
static void lam_event_queue_remove(struct lam_event *, int); static void lam_event_queue_remove(struct lam_event *, int);
static void lam_event_process_active(void);
static int lam_timeout_next(struct timeval *tv);
static void lam_timeout_correct(struct timeval *off);
static void lam_timeout_process(void); static void lam_timeout_process(void);
static void lam_timeout_insert(struct lam_event *);
int lam_event_haveevents(void); int lam_event_haveevents(void);
static RB_HEAD(lam_event_tree, lam_event) lam_timetree; static RB_HEAD(lam_event_tree, lam_event) lam_timetree;
@ -153,6 +152,29 @@ static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare)
static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare) static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare)
static int lam_timeout_next(struct timeval *tv)
{
struct timeval dflt = LAM_TIMEOUT_DEFAULT;
struct timeval now;
struct lam_event *ev;
if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) {
*tv = dflt;
return(0);
}
if (gettimeofday(&now, NULL) == -1)
return (-1);
if (timercmp(&ev->ev_timeout, &now, <=)) {
timerclear(tv);
return (0);
}
timersub(&ev->ev_timeout, &now, tv);
return (0);
}
/* run loop for dispatch thread */ /* run loop for dispatch thread */
static void* lam_event_run(lam_object_t* arg) static void* lam_event_run(lam_object_t* arg)
{ {
@ -258,9 +280,13 @@ lam_event_process_active(void)
while (ncalls) { while (ncalls) {
ncalls--; ncalls--;
ev->ev_ncalls = ncalls; ev->ev_ncalls = ncalls;
lam_mutex_unlock(&lam_event_lock); if(lam_using_threads()) {
(*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); lam_mutex_unlock(&lam_event_lock);
lam_mutex_lock(&lam_event_lock); (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
lam_mutex_lock(&lam_event_lock);
} else {
(*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
}
} }
} }
} }
@ -277,12 +303,12 @@ lam_event_loop(int flags)
struct timeval tv; struct timeval tv;
int res, done; int res, done;
lam_mutex_lock(&lam_event_lock); THREAD_LOCK(&lam_event_lock);
/* Calculate the initial events that we are waiting for */ /* Calculate the initial events that we are waiting for */
if (lam_evsel->recalc(lam_evbase, 0) == -1) { if (lam_evsel->recalc && lam_evsel->recalc(lam_evbase, 0) == -1) {
lam_output(0, "lam_event_loop: lam_evsel->recalc() failed."); lam_output(0, "lam_event_loop: lam_evsel->recalc() failed.");
lam_mutex_unlock(&lam_event_lock); THREAD_UNLOCK(&lam_event_lock);
return (-1); return (-1);
} }
@ -295,28 +321,16 @@ lam_event_loop(int flags)
if (res == -1) { if (res == -1) {
lam_output(0, "lam_event_loop: lam_event_sigcb() failed."); lam_output(0, "lam_event_loop: lam_event_sigcb() failed.");
errno = EINTR; errno = EINTR;
lam_mutex_unlock(&lam_event_lock); THREAD_UNLOCK(&lam_event_lock);
return (-1); return (-1);
} }
} }
} }
/* Check if time is running backwards */ if (!(flags & LAM_EVLOOP_NONBLOCK)) {
gettimeofday(&tv, NULL); static struct timeval dflt = LAM_TIMEOUT_DEFAULT;
if (timercmp(&tv, &lam_event_tv, <)) { tv = dflt;
struct timeval off; } else
LOG_DBG((LOG_MISC, 10,
"%s: time is running backwards, corrected",
__func__));
timersub(&lam_event_tv, &tv, &off);
lam_timeout_correct(&off);
}
lam_event_tv = tv;
if (!(flags & LAM_EVLOOP_NONBLOCK))
lam_timeout_next(&tv);
else
timerclear(&tv); timerclear(&tv);
#if LAM_HAVE_THREADS #if LAM_HAVE_THREADS
@ -328,11 +342,25 @@ lam_event_loop(int flags)
#endif #endif
if (res == -1) { if (res == -1) {
lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed."); lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed.");
lam_mutex_unlock(&lam_event_lock); THREAD_UNLOCK(&lam_event_lock);
return (-1); return (-1);
} }
lam_timeout_process(); if(NULL != RB_MIN(lam_event_tree, &lam_timetree)) {
/* Check if time is running backwards */
gettimeofday(&tv, NULL);
if (timercmp(&tv, &lam_event_tv, <)) {
struct timeval off;
LOG_DBG((LOG_MISC, 10,
"%s: time is running backwards, corrected",
__func__));
timersub(&lam_event_tv, &tv, &off);
lam_timeout_correct(&off);
}
lam_event_tv = tv;
lam_timeout_process();
}
if (TAILQ_FIRST(&lam_activequeue)) { if (TAILQ_FIRST(&lam_activequeue)) {
lam_event_process_active(); lam_event_process_active();
@ -341,58 +369,16 @@ lam_event_loop(int flags)
} else if (flags & (LAM_EVLOOP_NONBLOCK|LAM_EVLOOP_ONCE)) } else if (flags & (LAM_EVLOOP_NONBLOCK|LAM_EVLOOP_ONCE))
done = 1; done = 1;
if (lam_evsel->recalc(lam_evbase, 0) == -1) { if (lam_evsel->recalc && lam_evsel->recalc(lam_evbase, 0) == -1) {
lam_output(0, "lam_event_loop: lam_evesel->recalc() failed."); lam_output(0, "lam_event_loop: lam_evesel->recalc() failed.");
lam_mutex_unlock(&lam_event_lock); THREAD_UNLOCK(&lam_event_lock);
return (-1); return (-1);
} }
} }
lam_mutex_unlock(&lam_event_lock); THREAD_UNLOCK(&lam_event_lock);
return (0); return (0);
} }
void
lam_event_set(struct lam_event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
{
ev->ev_callback = callback;
ev->ev_arg = arg;
#ifdef WIN32
ev->ev_fd = (HANDLE)fd;
ev->overlap.hEvent = ev;
#else
ev->ev_fd = fd;
#endif
ev->ev_events = events;
ev->ev_flags = LAM_EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
}
/*
* Checks if a specific event is pending or scheduled.
*/
int
lam_event_pending(struct lam_event *ev, short event, struct timeval *tv)
{
int flags = 0;
if (ev->ev_flags & LAM_EVLIST_INSERTED)
flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE));
if (ev->ev_flags & LAM_EVLIST_ACTIVE)
flags |= ev->ev_res;
if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
flags |= LAM_EV_TIMEOUT;
event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE);
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & LAM_EV_TIMEOUT))
*tv = ev->ev_timeout;
return (flags & event);
}
int int
lam_event_add_i(struct lam_event *ev, struct timeval *tv) lam_event_add_i(struct lam_event *ev, struct timeval *tv)
@ -461,15 +447,6 @@ lam_event_add_i(struct lam_event *ev, struct timeval *tv)
return rc; return rc;
} }
int
lam_event_add(struct lam_event *ev, struct timeval *tv)
{
int rc;
lam_mutex_lock(&lam_event_lock);
rc = lam_event_add_i(ev, tv);
lam_mutex_unlock(&lam_event_lock);
return rc;
}
int lam_event_del_i(struct lam_event *ev) int lam_event_del_i(struct lam_event *ev)
{ {
@ -509,72 +486,6 @@ int lam_event_del_i(struct lam_event *ev)
return (0); return (0);
} }
int lam_event_del(struct lam_event *ev)
{
int rc;
lam_mutex_lock(&lam_event_lock);
rc = lam_event_del_i(ev);
lam_mutex_unlock(&lam_event_lock);
return rc;
}
void
lam_event_active_i(struct lam_event *ev, int res, short ncalls)
{
/* We get different kinds of events, add them together */
if (ev->ev_flags & LAM_EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}
ev->ev_res = res;
ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL;
lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE);
}
void
lam_event_active(struct lam_event* ev, int res, short ncalls)
{
lam_mutex_lock(&lam_event_lock);
lam_event_active_i(ev, res, ncalls);
lam_mutex_unlock(&lam_event_lock);
}
static int
lam_timeout_next(struct timeval *tv)
{
struct timeval dflt = LAM_TIMEOUT_DEFAULT;
struct timeval now;
struct lam_event *ev;
if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) {
*tv = dflt;
return (0);
}
if (gettimeofday(&now, NULL) == -1)
return (-1);
if (timercmp(&ev->ev_timeout, &now, <=)) {
timerclear(tv);
return (0);
}
timersub(&ev->ev_timeout, &now, tv);
assert(tv->tv_sec >= 0);
assert(tv->tv_usec >= 0);
LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec));
return (0);
}
static void static void
lam_timeout_correct(struct timeval *off) lam_timeout_correct(struct timeval *off)
@ -690,3 +601,18 @@ lam_event_queue_insert(struct lam_event *ev, int queue)
} }
} }
void lam_event_active_i(struct lam_event * ev, int res, short ncalls)
{
/* We get different kinds of events, add them together */
if (ev->ev_flags & LAM_EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}
ev->ev_res = res;
ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL;
lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE);
}

Просмотреть файл

@ -39,6 +39,7 @@ extern "C" {
unqualified name for a header file. :-) */ unqualified name for a header file. :-) */
#include "lam_config.h" #include "lam_config.h"
#endif #endif
#include "threads/mutex.h"
#ifdef HAVE_SYS_TIME_H #ifdef HAVE_SYS_TIME_H
#include <sys/time.h> #include <sys/time.h>
@ -63,6 +64,10 @@ extern "C" {
#define LAM_EV_SIGNAL 0x08 #define LAM_EV_SIGNAL 0x08
#define LAM_EV_PERSIST 0x10 /* Persistant event */ #define LAM_EV_PERSIST 0x10 /* Persistant event */
#ifndef LAM_EVENT_USE_SIGNALS
#define LAM_EVENT_USE_SIGNALS 0
#endif
/* Fix so that ppl dont have to run with <sys/queue.h> */ /* Fix so that ppl dont have to run with <sys/queue.h> */
#ifndef TAILQ_ENTRY #ifndef TAILQ_ENTRY
#define _EVENT_DEFINED_TQENTRY #define _EVENT_DEFINED_TQENTRY
@ -160,11 +165,94 @@ int lam_event_loop(int);
#define lam_signal_pending(ev, tv) lam_event_pending(ev, LAM_EV_SIGNAL, tv) #define lam_signal_pending(ev, tv) lam_event_pending(ev, LAM_EV_SIGNAL, tv)
#define lam_signal_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) #define lam_signal_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT)
void lam_event_set(struct lam_event *, int, short, void (*)(int, short, void *), void *); /* for internal use only */
int lam_event_add(struct lam_event *, struct timeval *); int lam_event_add_i(struct lam_event *, struct timeval *);
int lam_event_del(struct lam_event *); int lam_event_del_i(struct lam_event *);
int lam_event_pending(struct lam_event *, short, struct timeval *); void lam_event_active_i(struct lam_event*, int, short);
void lam_event_active(struct lam_event *, int, short);
/* public functions */
static inline void
lam_event_set(struct lam_event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
{
ev->ev_callback = callback;
ev->ev_arg = arg;
#ifdef WIN32
ev->ev_fd = (HANDLE)fd;
ev->overlap.hEvent = ev;
#else
ev->ev_fd = fd;
#endif
ev->ev_events = events;
ev->ev_flags = LAM_EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
}
static inline int
lam_event_add(struct lam_event *ev, struct timeval *tv)
{
extern lam_mutex_t lam_event_lock;
int rc;
if(lam_using_threads()) {
lam_mutex_lock(&lam_event_lock);
rc = lam_event_add_i(ev, tv);
lam_mutex_unlock(&lam_event_lock);
} else {
rc = lam_event_add_i(ev, tv);
}
return rc;
}
static inline int
lam_event_del(struct lam_event *ev)
{
extern lam_mutex_t lam_event_lock;
int rc;
if(lam_using_threads()) {
lam_mutex_lock(&lam_event_lock);
rc = lam_event_del_i(ev);
lam_mutex_unlock(&lam_event_lock);
} else {
rc = lam_event_del_i(ev);
}
return rc;
}
static inline void
lam_event_active(struct lam_event* ev, int res, short ncalls)
{
extern lam_mutex_t lam_event_lock;
if(lam_using_threads()) {
lam_mutex_lock(&lam_event_lock);
lam_event_active_i(ev, res, ncalls);
lam_mutex_unlock(&lam_event_lock);
} else {
lam_event_active_i(ev, res, ncalls);
}
}
static inline int
lam_event_pending(struct lam_event *ev, short event, struct timeval *tv)
{
int flags = 0;
if (ev->ev_flags & LAM_EVLIST_INSERTED)
flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE));
if (ev->ev_flags & LAM_EVLIST_ACTIVE)
flags |= ev->ev_res;
if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
flags |= LAM_EV_TIMEOUT;
event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE);
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & LAM_EV_TIMEOUT))
*tv = ev->ev_timeout;
return (flags & event);
}
#ifdef WIN32 #ifdef WIN32
#define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE) #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE)
@ -172,11 +260,6 @@ void lam_event_active(struct lam_event *, int, short);
#define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT)
#endif #endif
/* for internal use only */
int lam_event_add_i(struct lam_event *, struct timeval *);
int lam_event_del_i(struct lam_event *);
void lam_event_active_i(struct lam_event *, int, short);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

Просмотреть файл

@ -200,10 +200,15 @@ kq_dispatch(void *arg, struct timeval *tv)
TIMEVAL_TO_TIMESPEC(tv, &ts); TIMEVAL_TO_TIMESPEC(tv, &ts);
/* release lock while waiting in kernel */ /* release lock while waiting in kernel */
lam_mutex_unlock(&lam_event_lock); if(lam_using_threads()) {
res = kevent(kqop->kq, changes, kqop->nchanges, lam_mutex_unlock(&lam_event_lock);
events, kqop->nevents, &ts); res = kevent(kqop->kq, changes, kqop->nchanges,
lam_mutex_lock(&lam_event_lock); events, kqop->nevents, &ts);
lam_mutex_lock(&lam_event_lock);
} else {
res = kevent(kqop->kq, changes, kqop->nchanges,
events, kqop->nevents, &ts);
}
kqop->nchanges = 0; kqop->nchanges = 0;
if (res == -1) { if (res == -1) {

Просмотреть файл

@ -52,7 +52,9 @@
#endif #endif
#include "event.h" #include "event.h"
#if LAM_EVENT_USE_SIGNALS
#include "evsignal.h" #include "evsignal.h"
#endif
#include "threads/mutex.h" #include "threads/mutex.h"
@ -64,7 +66,9 @@ struct pollop {
int event_count; /* Highest number alloc */ int event_count; /* Highest number alloc */
struct pollfd *event_set; struct pollfd *event_set;
struct lam_event **event_back; struct lam_event **event_back;
#if LAM_EVENT_USE_SIGNALS
sigset_t evsigmask; sigset_t evsigmask;
#endif
} pollop; } pollop;
static void *poll_init (void); static void *poll_init (void);
@ -78,7 +82,7 @@ const struct lam_eventop lam_pollops = {
poll_init, poll_init,
poll_add, poll_add,
poll_del, poll_del,
poll_recalc, NULL,
poll_dispatch poll_dispatch
}; };
@ -90,9 +94,9 @@ poll_init(void)
return (NULL); return (NULL);
memset(&pollop, 0, sizeof(pollop)); memset(&pollop, 0, sizeof(pollop));
#if LAM_EVENT_USE_SIGNALS
lam_evsignal_init(&pollop.evsigmask); lam_evsignal_init(&pollop.evsigmask);
#endif
return (&pollop); return (&pollop);
} }
@ -104,9 +108,12 @@ poll_init(void)
static int static int
poll_recalc(void *arg, int max) poll_recalc(void *arg, int max)
{ {
#if LAM_EVENT_USE_SIGNALS
struct pollop *pop = arg; struct pollop *pop = arg;
return (lam_evsignal_recalc(&pop->evsigmask)); return (lam_evsignal_recalc(&pop->evsigmask));
#else
return (0);
#endif
} }
static int static int
@ -163,16 +170,24 @@ poll_dispatch(void *arg, struct timeval *tv)
} }
} }
#if LAM_EVENT_USE_SIGNALS
if (lam_evsignal_deliver(&pop->evsigmask) == -1) if (lam_evsignal_deliver(&pop->evsigmask) == -1)
return (-1); return (-1);
#endif
sec = tv->tv_sec * 1000 + tv->tv_usec / 1000; sec = tv->tv_sec * 1000 + tv->tv_usec / 1000;
lam_mutex_unlock(&lam_event_lock); if(lam_using_threads()) {
res = poll(pop->event_set, nfds, sec); lam_mutex_unlock(&lam_event_lock);
lam_mutex_lock(&lam_event_lock); res = poll(pop->event_set, nfds, sec);
lam_mutex_lock(&lam_event_lock);
} else {
res = poll(pop->event_set, nfds, sec);
}
#if LAM_EVENT_USE_SIGNALS
if (lam_evsignal_recalc(&pop->evsigmask) == -1) if (lam_evsignal_recalc(&pop->evsigmask) == -1)
return (-1); return (-1);
#endif
if (res == -1) { if (res == -1) {
if (errno != EINTR) { if (errno != EINTR) {
@ -180,10 +195,16 @@ poll_dispatch(void *arg, struct timeval *tv)
return (-1); return (-1);
} }
#if LAM_EVENT_USE_SIGNALS
lam_evsignal_process(); lam_evsignal_process();
#endif
return (0); return (0);
} else if (lam_evsignal_caught) }
#if LAM_EVENT_USE_SIGNALS
else if (lam_evsignal_caught)
lam_evsignal_process(); lam_evsignal_process();
#endif
LOG_DBG((LOG_MISC, 80, "%s: poll reports %d", __func__, res)); LOG_DBG((LOG_MISC, 80, "%s: poll reports %d", __func__, res));
@ -223,11 +244,11 @@ poll_dispatch(void *arg, struct timeval *tv)
static int static int
poll_add(void *arg, struct lam_event *ev) poll_add(void *arg, struct lam_event *ev)
{ {
#if LAM_EVENT_USE_SIGNALS
struct pollop *pop = arg; struct pollop *pop = arg;
if (ev->ev_events & LAM_EV_SIGNAL) if (ev->ev_events & LAM_EV_SIGNAL)
return (lam_evsignal_add(&pop->evsigmask, ev)); return (lam_evsignal_add(&pop->evsigmask, ev));
#endif
return (0); return (0);
} }
@ -238,10 +259,15 @@ poll_add(void *arg, struct lam_event *ev)
static int static int
poll_del(void *arg, struct lam_event *ev) poll_del(void *arg, struct lam_event *ev)
{ {
#if LAM_EVENT_USE_SIGNALS
struct pollop *pop = arg; struct pollop *pop = arg;
#endif
if (!(ev->ev_events & LAM_EV_SIGNAL)) if (!(ev->ev_events & LAM_EV_SIGNAL))
return (0); return (0);
#if LAM_EVENT_USE_SIGNALS
return (lam_evsignal_del(&pop->evsigmask, ev)); return (lam_evsignal_del(&pop->evsigmask, ev));
#else
return (0);
#endif
} }

Просмотреть файл

@ -142,8 +142,11 @@ select_recalc(void *arg, int max)
sop->event_writeset = writeset; sop->event_writeset = writeset;
sop->event_fdsz = fdsz; sop->event_fdsz = fdsz;
} }
#if LAM_EVENT_USE_SIGNALS
return (lam_evsignal_recalc(&sop->evsigmask)); return (lam_evsignal_recalc(&sop->evsigmask));
#else
return (0);
#endif
} }
static int static int
@ -163,17 +166,26 @@ select_dispatch(void *arg, struct timeval *tv)
FD_SET(ev->ev_fd, sop->event_readset); FD_SET(ev->ev_fd, sop->event_readset);
} }
#if LAM_EVENT_USE_SIGNALS
if (lam_evsignal_deliver(&sop->evsigmask) == -1) if (lam_evsignal_deliver(&sop->evsigmask) == -1)
return (-1); return (-1);
#endif
/* release lock while waiting in kernel */ /* release lock while waiting in kernel */
lam_mutex_unlock(&lam_event_lock); if(lam_using_threads()) {
res = select(sop->event_fds + 1, sop->event_readset, lam_mutex_unlock(&lam_event_lock);
sop->event_writeset, NULL, tv); res = select(sop->event_fds + 1, sop->event_readset,
lam_mutex_lock(&lam_event_lock); sop->event_writeset, NULL, tv);
lam_mutex_lock(&lam_event_lock);
} else {
res = select(sop->event_fds + 1, sop->event_readset,
sop->event_writeset, NULL, tv);
}
#if LAM_EVENT_USE_SIGNALS
if (lam_evsignal_recalc(&sop->evsigmask) == -1) if (lam_evsignal_recalc(&sop->evsigmask) == -1)
return (-1); return (-1);
#endif
if (res == -1) { if (res == -1) {
if (errno != EINTR) { if (errno != EINTR) {
@ -181,13 +193,15 @@ select_dispatch(void *arg, struct timeval *tv)
return (-1); return (-1);
} }
#if LAM_EVENT_USE_SIGNALS
lam_evsignal_process(); lam_evsignal_process();
#endif
return (0); return (0);
} else if (lam_evsignal_caught) }
#if LAM_EVENT_USE_SIGNALS
else if (lam_evsignal_caught)
lam_evsignal_process(); lam_evsignal_process();
#endif
LOG_DBG((LOG_MISC, 80, "%s: select reports %d", __func__, res));
maxfd = 0; maxfd = 0;
for (ev = TAILQ_FIRST(&lam_eventqueue); ev != NULL; ev = next) { for (ev = TAILQ_FIRST(&lam_eventqueue); ev != NULL; ev = next) {
next = TAILQ_NEXT(ev, ev_next); next = TAILQ_NEXT(ev, ev_next);
@ -218,8 +232,10 @@ select_add(void *arg, struct lam_event *ev)
{ {
struct selectop *sop = arg; struct selectop *sop = arg;
#if LAM_EVENT_USE_SIGNALS
if (ev->ev_events & LAM_EV_SIGNAL) if (ev->ev_events & LAM_EV_SIGNAL)
return (lam_evsignal_add(&sop->evsigmask, ev)); return (lam_evsignal_add(&sop->evsigmask, ev));
#endif
/* /*
* Keep track of the highest fd, so that we can calculate the size * Keep track of the highest fd, so that we can calculate the size
@ -243,5 +259,10 @@ select_del(void *arg, struct lam_event *ev)
if (!(ev->ev_events & LAM_EV_SIGNAL)) if (!(ev->ev_events & LAM_EV_SIGNAL))
return (0); return (0);
#if LAM_EVENT_USE_SIGNALS
return (lam_evsignal_del(&sop->evsigmask, ev)); return (lam_evsignal_del(&sop->evsigmask, ev));
#else
return (0);
#endif
} }

Просмотреть файл

@ -69,33 +69,6 @@ static void lam_list_destruct(lam_list_t *list)
lam_list_construct(list); lam_list_construct(list);
} }
/**
* Adds item to the end of the list.
*
* @param list List accepting new item (IN/OUT)
*
* @param item Item being put on the new list (IN)
*
*/
void lam_list_append(lam_list_t *list, lam_list_item_t *item)
{
/* set new element's previous pointer */
item->lam_list_prev=list->lam_list_tail.lam_list_prev;
/* reset previous pointer on current last element */
list->lam_list_tail.lam_list_prev->lam_list_next=item;
/* reset new element's next pointer */
item->lam_list_next=&(list->lam_list_tail);
/* reset the list's tail element previous pointer */
list->lam_list_tail.lam_list_prev = item;
/* increment list element counter */
list->lam_list_length++;
}
int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx) int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx)
{ {
/* Adds item to list at index and retains item. */ /* Adds item to list at index and retains item. */
@ -127,91 +100,3 @@ int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx)
return 1; return 1;
} }
/*
* Adds item to the front of the list and retains item.
*
* @param list List accepting new item (IN/OUT)
*
* @param item Item being put on the new list (IN)
*
*/
void lam_list_prepend(lam_list_t *list, lam_list_item_t *item)
{
/* reset item's next pointer */
item->lam_list_next = list->lam_list_head.lam_list_next;
/* reset item's previous pointer */
item->lam_list_prev = &(list->lam_list_head);
/* reset previous first element's previous poiner */
list->lam_list_head.lam_list_next->lam_list_prev = item;
/* reset head's next pointer */
list->lam_list_head.lam_list_next = item;
/* increment list element counter */
list->lam_list_length++;
}
lam_list_item_t *lam_list_remove_first(lam_list_t *list)
{
/* Removes and returns first item on list.
Caller now owns the item and should release the item
when caller is done with it.
*/
volatile lam_list_item_t *item;
if ( 0 == list->lam_list_length )
return (lam_list_item_t *)NULL;
/* reset list length counter */
list->lam_list_length--;
/* get pointer to first element on the list */
item = list->lam_list_head.lam_list_next;
/* reset previous pointer of next item on the list */
item->lam_list_next->lam_list_prev=item->lam_list_prev;
/* reset the head next pointer */
list->lam_list_head.lam_list_next=item->lam_list_next;
#if LAM_ENABLE_DEBUG
/* debug code */
item->lam_list_prev=(lam_list_item_t *)NULL;
item->lam_list_next=(lam_list_item_t *)NULL;
#endif
return (lam_list_item_t *) item;
}
lam_list_item_t *lam_list_remove_last(lam_list_t *list)
{
/* Removes, releases and returns last item on list.
Caller now owns the item and should release the item
when caller is done with it.
*/
volatile lam_list_item_t *item;
if ( 0 == list->lam_list_length )
return (lam_list_item_t *)NULL;
/* reset list length counter */
list->lam_list_length--;
/* get item */
item = list->lam_list_tail.lam_list_prev;
/* reset previous pointer on next to last pointer */
item->lam_list_prev->lam_list_next=item->lam_list_next;
/* reset tail's previous pointer */
list->lam_list_tail.lam_list_prev=item->lam_list_prev;
#if LAM_ENABLE_DEBUG
/* debug code */
item->lam_list_next = item->lam_list_prev = (lam_list_item_t *)NULL;
#endif
return (lam_list_item_t *) item;
}

Просмотреть файл

@ -162,7 +162,25 @@ extern "C" {
/* /*
* Adds item to the end of the list but does not retain item. * Adds item to the end of the list but does not retain item.
*/ */
void lam_list_append(lam_list_t *list, lam_list_item_t *item);
static inline void lam_list_append(lam_list_t *list, lam_list_item_t *item)
{
/* set new element's previous pointer */
item->lam_list_prev=list->lam_list_tail.lam_list_prev;
/* reset previous pointer on current last element */
list->lam_list_tail.lam_list_prev->lam_list_next=item;
/* reset new element's next pointer */
item->lam_list_next=&(list->lam_list_tail);
/* reset the list's tail element previous pointer */
list->lam_list_tail.lam_list_prev = item;
/* increment list element counter */
list->lam_list_length++;
}
/* Adds item to list at index and retains item. /* Adds item to list at index and retains item.
Returns 1 if successful, 0 otherwise. Returns 1 if successful, 0 otherwise.
@ -170,25 +188,97 @@ extern "C" {
Example: if idx = 2 and list = item1->item2->item3->item4, then Example: if idx = 2 and list = item1->item2->item3->item4, then
after insert, list = item1->item2->item->item3->item4 after insert, list = item1->item2->item->item3->item4
*/ */
int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx); int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx);
/* /*
* Adds item to the front of the list and retains item. * Adds item to the front of the list and retains item.
*/ */
void lam_list_prepend(lam_list_t *list, lam_list_item_t *item); static inline void lam_list_prepend(lam_list_t *list, lam_list_item_t *item)
{
/* reset item's next pointer */
item->lam_list_next = list->lam_list_head.lam_list_next;
/* reset item's previous pointer */
item->lam_list_prev = &(list->lam_list_head);
/* reset previous first element's previous poiner */
list->lam_list_head.lam_list_next->lam_list_prev = item;
/* reset head's next pointer */
list->lam_list_head.lam_list_next = item;
/* increment list element counter */
list->lam_list_length++;
}
/* /*
* Removes and returns first item on list. * Removes and returns first item on list.
*/ */
lam_list_item_t *lam_list_remove_first(lam_list_t *list); static inline lam_list_item_t *lam_list_remove_first(lam_list_t *list)
{
/* Removes and returns first item on list.
Caller now owns the item and should release the item
when caller is done with it.
*/
volatile lam_list_item_t *item;
if ( 0 == list->lam_list_length )
return (lam_list_item_t *)NULL;
/* reset list length counter */
list->lam_list_length--;
/* get pointer to first element on the list */
item = list->lam_list_head.lam_list_next;
/* reset previous pointer of next item on the list */
item->lam_list_next->lam_list_prev=item->lam_list_prev;
/* reset the head next pointer */
list->lam_list_head.lam_list_next=item->lam_list_next;
#if LAM_ENABLE_DEBUG
/* debug code */
item->lam_list_prev=(lam_list_item_t *)NULL;
item->lam_list_next=(lam_list_item_t *)NULL;
#endif
return (lam_list_item_t *) item;
}
/* /*
* Removes and returns last item on list. * Removes and returns last item on list.
*/ */
lam_list_item_t *lam_list_remove_last(lam_list_t *list); static inline lam_list_item_t *lam_list_remove_last(lam_list_t *list)
{
/* Removes, releases and returns last item on list.
Caller now owns the item and should release the item
when caller is done with it.
*/
volatile lam_list_item_t *item;
if ( 0 == list->lam_list_length )
return (lam_list_item_t *)NULL;
/* reset list length counter */
list->lam_list_length--;
/* get item */
item = list->lam_list_tail.lam_list_prev;
/* reset previous pointer on next to last pointer */
item->lam_list_prev->lam_list_next=item->lam_list_next;
/* reset tail's previous pointer */
list->lam_list_tail.lam_list_prev=item->lam_list_prev;
#if LAM_ENABLE_DEBUG
/* debug code */
item->lam_list_next = item->lam_list_prev = (lam_list_item_t *)NULL;
#endif
return (lam_list_item_t *) item;
}
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }
#endif #endif

Просмотреть файл

@ -230,45 +230,3 @@ int lam_pointer_array_set_item(lam_pointer_array_t *table, size_t index,
return LAM_SUCCESS; return LAM_SUCCESS;
} }
/**
* lookup pointer by index in pointer table
*
* @param table Pointer to lam_pointer_array_t object (IN)
* @param ptr Pointer to be added to table (IN)
*
* @return Pointer
*/
void *lam_pointer_array_get_item(lam_pointer_array_t *table, size_t index)
{
void *p;
#if 0
lam_output(0,"lam_pointer_array_get_item: IN: "
" table %p (size %ld, lowest free %ld, number free %ld)"
" addr[%d] = %p\n",
table, table->size, table->lowest_free, table->number_free,
index, table->addr[index]);
#endif
THREAD_LOCK(&(table->lock));
assert(table != NULL);
assert(table->addr != NULL);
assert(index >= 0);
assert(index < table->size);
p = table->addr[index];
THREAD_UNLOCK(&(table->lock));
#if 0
lam_output(0,"lam_pointer_array_get_item: OUT:"
" table %p (size %ld, lowest free %ld, number free %ld)"
" addr[%d] = %p\n",
table, table->size, table->lowest_free, table->number_free,
index, table->addr[index]);
#endif
return p;
}

Просмотреть файл

@ -64,7 +64,16 @@ int lam_pointer_array_set_item(lam_pointer_array_t *array,
* *
* @return Error code. NULL indicates an error. * @return Error code. NULL indicates an error.
*/ */
void *lam_pointer_array_get_item(lam_pointer_array_t *array, size_t index);
static inline void *lam_pointer_array_get_item(lam_pointer_array_t *table, size_t index)
{
void *p;
THREAD_LOCK(&(table->lock));
p = table->addr[index];
THREAD_UNLOCK(&(table->lock));
return p;
}
/** /**
* Get the size of the pointer array * Get the size of the pointer array

Просмотреть файл

@ -209,7 +209,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
struct mca_ptl_t *ptl = ptl_proc->ptl; struct mca_ptl_t *ptl = ptl_proc->ptl;
double weight; double weight;
if(ptl->ptl_bandwidth) if(ptl->ptl_bandwidth)
weight = total_bandwidth / ptl_proc->ptl->ptl_bandwidth; weight = ptl_proc->ptl->ptl_bandwidth / total_bandwidth;
else else
weight = 1.0 / n_size; weight = 1.0 / n_size;
ptl_proc->ptl_weight = (int)(weight * 100); ptl_proc->ptl_weight = (int)(weight * 100);

Просмотреть файл

@ -15,6 +15,7 @@
#include "request/request.h" #include "request/request.h"
#include "mca/pml/pml.h" #include "mca/pml/pml.h"
#include "mca/pml/base/pml_base_request.h" #include "mca/pml/base/pml_base_request.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#define MCA_PML_TEG_STATISTICS 0 #define MCA_PML_TEG_STATISTICS 0
@ -223,5 +224,31 @@ extern int mca_pml_teg_free(
lam_request_t** request lam_request_t** request
); );
#define MCA_PML_TEG_FREE(request) \
{ \
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \
pml_request->req_free_called = true; \
if(pml_request->req_pml_done == true) \
{ \
switch(pml_request->req_type) { \
case MCA_PML_REQUEST_SEND: \
{ \
mca_ptl_base_send_request_t* sendreq = (mca_ptl_base_send_request_t*)pml_request; \
mca_ptl_t* ptl = sendreq->req_owner; \
ptl->ptl_request_return(ptl, sendreq); \
break; \
} \
case MCA_PML_REQUEST_RECV: \
{ \
LAM_FREE_LIST_RETURN(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)pml_request); \
break; \
} \
default: \
break; \
} \
} \
*(request) = NULL; \
}
#endif #endif

Просмотреть файл

@ -4,31 +4,9 @@
#include "mca/ptl/base/ptl_base_sendreq.h" #include "mca/ptl/base/ptl_base_sendreq.h"
int mca_pml_teg_free(lam_request_t** request) int mca_pml_teg_free(lam_request_t** request)
{ {
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)request; MCA_PML_TEG_FREE(request);
pml_request->req_free_called = true;
if(pml_request->req_pml_done == true)
{
switch(pml_request->req_type) {
case MCA_PML_REQUEST_SEND:
{
mca_ptl_base_send_request_t* sendreq = (mca_ptl_base_send_request_t*)pml_request;
mca_ptl_t* ptl = sendreq->req_owner;
ptl->ptl_request_return(ptl, sendreq);
break;
}
case MCA_PML_REQUEST_RECV:
{
lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)pml_request);
break;
}
default:
break;
}
}
*request = NULL;
return LAM_SUCCESS; return LAM_SUCCESS;
} }

Просмотреть файл

@ -11,11 +11,12 @@ int mca_pml_teg_irecv_init(
struct lam_request_t **request) struct lam_request_t **request)
{ {
int rc; int rc;
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq;
MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;
mca_ptl_base_recv_request_init( MCA_PTL_BASE_RECV_REQUEST_INIT(
recvreq, recvreq,
addr, addr,
count, count,
@ -40,14 +41,15 @@ int mca_pml_teg_irecv(
{ {
int rc; int rc;
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq;
MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
#if MCA_PML_TEG_STATISTICS #if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_irecvs++; mca_pml_teg.teg_irecvs++;
#endif #endif
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;
mca_ptl_base_recv_request_init( MCA_PTL_BASE_RECV_REQUEST_INIT(
recvreq, recvreq,
addr, addr,
count, count,
@ -58,7 +60,7 @@ int mca_pml_teg_irecv(
false); false);
if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) { if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) {
mca_pml_teg_recv_request_return(recvreq); MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
return rc; return rc;
} }
*request = (lam_request_t*)recvreq; *request = (lam_request_t*)recvreq;
@ -76,14 +78,15 @@ int mca_pml_teg_recv(
lam_status_public_t* status) lam_status_public_t* status)
{ {
int rc, index; int rc, index;
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq;
#if MCA_PML_TEG_STATISTICS MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_recvs++; mca_pml_teg.teg_recvs++;
#endif #endif
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;
mca_ptl_base_recv_request_init( MCA_PTL_BASE_RECV_REQUEST_INIT(
recvreq, recvreq,
addr, addr,
count, count,
@ -94,9 +97,27 @@ int mca_pml_teg_recv(
false); false);
if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) { if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) {
mca_pml_teg_recv_request_return(recvreq); MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
return rc; return rc;
} }
return mca_pml_teg_wait(1, (lam_request_t**)&recvreq, &index, status);
if(recvreq->super.req_mpi_done == false) {
/* give up and sleep until completion */
if(lam_using_threads()) {
lam_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while(recvreq->super.req_mpi_done == false)
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while(recvreq->super.req_mpi_done == false)
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
}
}
MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
return LAM_SUCCESS;
} }

Просмотреть файл

@ -19,11 +19,12 @@ int mca_pml_teg_isend_init(
{ {
int rc; int rc;
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq;
MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
mca_ptl_base_send_request_init( MCA_PTL_BASE_SEND_REQUEST_INIT(
sendreq, sendreq,
buf, buf,
count, count,
@ -51,13 +52,14 @@ int mca_pml_teg_isend(
lam_request_t **request) lam_request_t **request)
{ {
int rc; int rc;
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq;
MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
#if MCA_PML_TEG_STATISTICS #if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_isends++; mca_pml_teg.teg_isends++;
#endif #endif
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
mca_ptl_base_send_request_init( MCA_PTL_BASE_SEND_REQUEST_INIT(
sendreq, sendreq,
buf, buf,
count, count,
@ -86,14 +88,15 @@ int mca_pml_teg_send(
lam_communicator_t* comm) lam_communicator_t* comm)
{ {
int rc, index; int rc, index;
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq;
MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
#if MCA_PML_TEG_STATISTICS #if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_sends++; mca_pml_teg.teg_sends++;
#endif #endif
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
mca_ptl_base_send_request_init( MCA_PTL_BASE_SEND_REQUEST_INIT(
sendreq, sendreq,
buf, buf,
count, count,
@ -105,8 +108,30 @@ int mca_pml_teg_send(
false false
); );
if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) {
MCA_PML_TEG_FREE((lam_request_t**)&sendreq);
return rc; return rc;
return mca_pml_teg_wait(1, (lam_request_t**)&sendreq, &index, NULL); }
if(sendreq->super.req_mpi_done == false) {
/* give up and sleep until completion */
if(lam_using_threads()) {
lam_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while(sendreq->super.req_mpi_done == false)
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while(sendreq->super.req_mpi_done == false)
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
}
}
/* return request to pool */
MCA_PML_TEG_FREE((lam_request_t**)&sendreq);
return LAM_SUCCESS;
} }

Просмотреть файл

@ -6,7 +6,7 @@ void mca_pml_teg_recv_request_progress(
mca_ptl_base_recv_request_t* req, mca_ptl_base_recv_request_t* req,
mca_ptl_base_recv_frag_t* frag) mca_ptl_base_recv_frag_t* frag)
{ {
lam_mutex_lock(&mca_pml_teg.teg_request_lock); THREAD_LOCK(&mca_pml_teg.teg_request_lock);
req->req_bytes_delivered += frag->super.frag_size; req->req_bytes_delivered += frag->super.frag_size;
req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length; req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length;
if (req->req_bytes_received >= req->req_bytes_msg) { if (req->req_bytes_received >= req->req_bytes_msg) {
@ -24,6 +24,6 @@ void mca_pml_teg_recv_request_progress(
lam_condition_broadcast(&mca_pml_teg.teg_request_cond); lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
} }
} }
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
} }

Просмотреть файл

@ -19,20 +19,20 @@
* @param rc (OUT) LAM_SUCCESS or error status on failure. * @param rc (OUT) LAM_SUCCESS or error status on failure.
* @return Receive request. * @return Receive request.
*/ */
static inline mca_ptl_base_recv_request_t* mca_pml_teg_recv_request_alloc(int *rc) #define MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc) \
{ { \
return (mca_ptl_base_recv_request_t*)lam_free_list_get(&mca_pml_teg.teg_recv_requests, rc); lam_list_item_t* item; \
} LAM_FREE_LIST_GET(&mca_pml_teg.teg_recv_requests, item, rc); \
recvreq = (mca_ptl_base_recv_request_t*)item; \
}
/** /**
* Return a recv request to the modules free list. * Return a recv request to the modules free list.
* *
* @param request (IN) Receive request. * @param request (IN) Receive request.
*/ */
static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t* request) #define MCA_PML_TEG_RECV_REQUEST_RETURN(request) \
{ LAM_FREE_LIST_RETURN(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request);
lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request);
}
/** /**
* Start an initialized request. * Start an initialized request.

Просмотреть файл

@ -47,9 +47,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
* previously assigned) * previously assigned)
*/ */
else { else {
bytes_to_frag = (ptl_proc->ptl_weight * req->req_bytes_msg) / 100; bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100;
if(bytes_to_frag > bytes_remaining)
bytes_to_frag = bytes_remaining;
} }
rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0); rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0);
@ -59,12 +57,12 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
/* unable to complete send - signal request failed */ /* unable to complete send - signal request failed */
if(bytes_remaining > 0) { if(bytes_remaining > 0) {
lam_mutex_lock(&mca_pml_teg.teg_request_lock); THREAD_LOCK(&mca_pml_teg.teg_request_lock);
req->super.req_mpi_done = true; req->super.req_mpi_done = true;
/* FIX - set status correctly */ /* FIX - set status correctly */
if(mca_pml_teg.teg_request_waiting) if(mca_pml_teg.teg_request_waiting)
lam_condition_broadcast(&mca_pml_teg.teg_request_cond); lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
} }
} }
@ -73,7 +71,7 @@ void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* req, mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag) mca_ptl_base_send_frag_t* frag)
{ {
lam_mutex_lock(&mca_pml_teg.teg_request_lock); THREAD_LOCK(&mca_pml_teg.teg_request_lock);
req->req_bytes_sent += frag->super.frag_size; req->req_bytes_sent += frag->super.frag_size;
if (req->req_bytes_sent >= req->req_bytes_msg) { if (req->req_bytes_sent >= req->req_bytes_msg) {
req->super.req_pml_done = true; req->super.req_pml_done = true;
@ -87,11 +85,11 @@ void mca_pml_teg_send_request_progress(
lam_condition_broadcast(&mca_pml_teg.teg_request_cond); lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
} }
} else if (req->super.req_free_called) } else if (req->super.req_free_called)
mca_pml_teg_free((lam_request_t**)&req); MCA_PML_TEG_FREE((lam_request_t**)&req);
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
return; return;
} }
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
/* if first fragment - schedule remaining fragments */ /* if first fragment - schedule remaining fragments */
if(req->req_frags == 1) { if(req->req_frags == 1) {

Просмотреть файл

@ -16,30 +16,26 @@
void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req); void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req);
static inline mca_ptl_base_send_request_t* mca_pml_teg_send_request_alloc( #define MCA_PML_TEG_SEND_REQUEST_ALLOC( \
lam_communicator_t* comm, comm, \
int dst, dst, \
int *rc) sendreq, \
{ rc) \
mca_ptl_base_send_request_t* sendreq; { \
mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \
mca_ptl_proc_t* ptl_proc; mca_ptl_proc_t* ptl_proc; \
mca_ptl_t* ptl; mca_ptl_t* ptl; \
\
THREAD_SCOPED_LOCK(&proc->proc_lock, THREAD_SCOPED_LOCK(&proc->proc_lock, \
(ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
ptl = ptl_proc->ptl; ptl = ptl_proc->ptl; \
*rc = ptl->ptl_request_alloc(ptl,&sendreq); rc = ptl->ptl_request_alloc(ptl,&sendreq); \
if(NULL != sendreq) if(NULL != sendreq) \
sendreq->req_peer = ptl_proc->ptl_peer; sendreq->req_peer = ptl_proc->ptl_peer; \
return sendreq;
} }
static inline void mca_pml_teg_send_request_return( #define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \
mca_ptl_base_send_request_t* request)
{
request->req_owner->ptl_request_return(request->req_owner, request); request->req_owner->ptl_request_return(request->req_owner, request);
}
static inline int mca_pml_teg_send_request_start( static inline int mca_pml_teg_send_request_start(
mca_ptl_base_send_request_t* req) mca_ptl_base_send_request_t* req)

Просмотреть файл

@ -19,7 +19,7 @@ int mca_pml_teg_test(
if (NULL != status) if (NULL != status)
*status = pml_request->req_status; *status = pml_request->req_status;
if(false == pml_request->req_persistent) if(false == pml_request->req_persistent)
mca_pml_teg_free(requests+i); MCA_PML_TEG_FREE(requests+i);
} }
} }
@ -60,7 +60,7 @@ int mca_pml_teg_test_all(
} else { } else {
statuses[i] = pml_request->req_status; statuses[i] = pml_request->req_status;
if(false == pml_request->req_persistent) if(false == pml_request->req_persistent)
mca_pml_teg_free(requests+i); MCA_PML_TEG_FREE(requests+i);
} }
} }
} else { } else {
@ -68,7 +68,7 @@ int mca_pml_teg_test_all(
for(i=0; i<count; i++) { for(i=0; i<count; i++) {
mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i]; mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
if(NULL != pml_request && false == pml_request->req_persistent) if(NULL != pml_request && false == pml_request->req_persistent)
mca_pml_teg_free(requests+i); MCA_PML_TEG_FREE(requests+i);
} }
} }
return LAM_SUCCESS; return LAM_SUCCESS;

Просмотреть файл

@ -35,7 +35,7 @@ int mca_pml_teg_wait(
if(completed < 0) { if(completed < 0) {
/* give up and sleep until completion */ /* give up and sleep until completion */
lam_mutex_lock(&mca_pml_teg.teg_request_lock); THREAD_LOCK(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++; mca_pml_teg.teg_request_waiting++;
do { do {
for(i=0; i<count; i++) { for(i=0; i<count; i++) {
@ -53,12 +53,12 @@ int mca_pml_teg_wait(
} }
} while(completed < 0); } while(completed < 0);
mca_pml_teg.teg_request_waiting--; mca_pml_teg.teg_request_waiting--;
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
} }
/* return request to pool */ /* return request to pool */
if(false == pml_request->req_persistent) { if(false == pml_request->req_persistent) {
mca_pml_teg_free(request); MCA_PML_TEG_FREE(request);
} }
if (NULL != status) { if (NULL != status) {
*status = pml_request->req_status; *status = pml_request->req_status;
@ -87,7 +87,7 @@ int mca_pml_teg_wait_all(
* acquire lock and test for completion - if all requests are not completed * acquire lock and test for completion - if all requests are not completed
* pend on condition variable until a request completes * pend on condition variable until a request completes
*/ */
lam_mutex_lock(&mca_pml_teg.teg_request_lock); THREAD_LOCK(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++; mca_pml_teg.teg_request_waiting++;
do { do {
completed = 0; completed = 0;
@ -102,7 +102,7 @@ int mca_pml_teg_wait_all(
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
} while (completed != count); } while (completed != count);
mca_pml_teg.teg_request_waiting--; mca_pml_teg.teg_request_waiting--;
lam_mutex_unlock(&mca_pml_teg.teg_request_lock); THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
} }
if(NULL != statuses) { if(NULL != statuses) {
@ -114,7 +114,7 @@ int mca_pml_teg_wait_all(
} else { } else {
statuses[i] = pml_request->req_status; statuses[i] = pml_request->req_status;
if (false == pml_request->req_persistent) { if (false == pml_request->req_persistent) {
mca_pml_teg_free(&requests[i]); MCA_PML_TEG_FREE(&requests[i]);
} }
} }
} }
@ -123,7 +123,7 @@ int mca_pml_teg_wait_all(
for(i=0; i<count; i++) { for(i=0; i<count; i++) {
mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i]; mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
if (NULL != pml_request && false == pml_request->req_persistent) { if (NULL != pml_request && false == pml_request->req_persistent) {
mca_pml_teg_free(&requests[i]); MCA_PML_TEG_FREE(&requests[i]);
} }
} }
} }

Просмотреть файл

@ -53,9 +53,7 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size);
static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst) static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst)
{ {
mca_ptl_sequence_t sequence; mca_ptl_sequence_t sequence;
lam_mutex_lock(&comm->c_matching_lock); THREAD_SCOPED_LOCK(&comm->c_matching_lock, sequence = comm->c_msg_seq[dst]++);
sequence = comm->c_msg_seq[dst]++;
lam_mutex_unlock(&comm->c_matching_lock);
return sequence; return sequence;
} }

Просмотреть файл

@ -37,31 +37,31 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
* @param comm (IN) Communicator. * @param comm (IN) Communicator.
* @param persistent (IN) Is this a ersistent request. * @param persistent (IN) Is this a ersistent request.
*/ */
static inline void mca_ptl_base_recv_request_init( #define MCA_PTL_BASE_RECV_REQUEST_INIT( \
mca_ptl_base_recv_request_t *request, request, \
void *addr, addr, \
size_t count, count, \
lam_datatype_t* datatype, datatype, \
int src, src, \
int tag, tag, \
lam_communicator_t* comm, comm, \
bool persistent) persistent) \
{ { \
request->req_bytes_msg = 0; request->req_bytes_msg = 0; \
request->req_bytes_received = 0; request->req_bytes_received = 0; \
request->req_bytes_delivered = 0; request->req_bytes_delivered = 0; \
request->super.req_sequence = 0; request->super.req_sequence = 0; \
request->super.req_addr = addr; request->super.req_addr = addr; \
request->super.req_count = count; request->super.req_count = count; \
request->super.req_datatype = datatype; request->super.req_datatype = datatype; \
request->super.req_peer = src; request->super.req_peer = src; \
request->super.req_tag = tag; request->super.req_tag = tag; \
request->super.req_comm = comm; request->super.req_comm = comm; \
request->super.req_proc = NULL; request->super.req_proc = NULL; \
request->super.req_persistent = persistent; request->super.req_persistent = persistent; \
request->super.req_mpi_done = false; request->super.req_mpi_done = false; \
request->super.req_pml_done = false; request->super.req_pml_done = false; \
request->super.req_free_called = false; request->super.req_free_called = false; \
} }
/** /**

Просмотреть файл

@ -50,51 +50,51 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
* @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY) * @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY)
* @param persistent (IN) Is request persistent. * @param persistent (IN) Is request persistent.
*/ */
static inline void mca_ptl_base_send_request_init( #define MCA_PTL_BASE_SEND_REQUEST_INIT( \
mca_ptl_base_send_request_t *request, request, \
void *addr, addr, \
size_t count, count, \
lam_datatype_t* datatype, datatype, \
int peer, peer, \
int tag, tag, \
lam_communicator_t* comm, comm, \
mca_pml_base_send_mode_t mode, mode,\
bool persistent) persistent) \
{ { \
request->req_offset = 0; request->req_offset = 0; \
request->req_frags = 0; request->req_frags = 0; \
request->req_bytes_sent = 0; request->req_bytes_sent = 0; \
request->req_send_mode = mode; request->req_send_mode = mode; \
request->req_peer_request.lval = 0; request->req_peer_request.lval = 0; \
request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); \
request->super.req_addr = addr; request->super.req_addr = addr; \
request->super.req_count = count; request->super.req_count = count; \
request->super.req_datatype = datatype; request->super.req_datatype = datatype; \
request->super.req_peer = peer; request->super.req_peer = peer; \
request->super.req_tag = tag; request->super.req_tag = tag; \
request->super.req_comm = comm; request->super.req_comm = comm; \
request->super.req_proc = lam_comm_peer_lookup(comm,peer); request->super.req_proc = lam_comm_peer_lookup(comm,peer); \
request->super.req_persistent = persistent; request->super.req_persistent = persistent; \
request->super.req_mpi_done = false; request->super.req_mpi_done = false; \
request->super.req_pml_done = false; request->super.req_pml_done = false; \
request->super.req_free_called = false; request->super.req_free_called = false; \
\
/* initialize datatype convertor for this request */ /* initialize datatype convertor for this request */ \
if(count > 0) { if(count > 0) { \
int packed_size; int packed_size; \
lam_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); lam_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); \
lam_convertor_init_for_send( lam_convertor_init_for_send( \
&request->req_convertor, &request->req_convertor, \
0, 0, \
request->super.req_datatype, request->super.req_datatype, \
request->super.req_count, request->super.req_count, \
request->super.req_addr, request->super.req_addr, \
0); 0); \
lam_convertor_get_packed_size(&request->req_convertor, &packed_size); lam_convertor_get_packed_size(&request->req_convertor, &packed_size); \
request->req_bytes_msg = packed_size; request->req_bytes_msg = packed_size; \
} else { } else { \
request->req_bytes_msg = 0; request->req_bytes_msg = 0; \
} } \
} }

Просмотреть файл

@ -98,8 +98,9 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re
{ {
int rc; int rc;
mca_ptl_base_send_request_t* sendreq; mca_ptl_base_send_request_t* sendreq;
sendreq = (mca_ptl_base_send_request_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_requests, &rc); lam_list_item_t* item;
if(NULL != sendreq) LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_requests, item, rc);
if(NULL != (sendreq = (mca_ptl_base_send_request_t*)item))
sendreq->req_owner = ptl; sendreq->req_owner = ptl;
*request = sendreq; *request = sendreq;
return rc; return rc;
@ -109,7 +110,7 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re
void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request) void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request)
{ {
/* OBJ_DESTRUCT(&request->req_convertor); */ /* OBJ_DESTRUCT(&request->req_convertor); */
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request);
} }
@ -118,7 +119,7 @@ void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv
if(frag->super.frag_is_buffered) if(frag->super.frag_is_buffered)
free(frag->super.super.frag_addr); free(frag->super.super.frag_addr);
/* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */ /* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
} }
@ -130,7 +131,7 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send
pending = (mca_ptl_tcp_recv_frag_t*)lam_list_remove_first(&mca_ptl_tcp_module.tcp_pending_acks); pending = (mca_ptl_tcp_recv_frag_t*)lam_list_remove_first(&mca_ptl_tcp_module.tcp_pending_acks);
if(NULL == pending) { if(NULL == pending) {
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock); THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag);
return; return;
} }
THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock); THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
@ -138,7 +139,7 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send
mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag); mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag);
mca_ptl_tcp_recv_frag_return(ptl, pending); mca_ptl_tcp_recv_frag_return(ptl, pending);
} else { } else {
lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag);
} }
} }
@ -161,8 +162,9 @@ int mca_ptl_tcp_send(
sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag; sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag;
} else { } else {
int rc; int rc;
sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc); lam_list_item_t* item;
if(NULL == sendfrag) LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc);
if(NULL == (sendfrag = (mca_ptl_tcp_send_frag_t*)item))
return rc; return rc;
} }
mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, size, flags); mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, size, flags);
@ -183,8 +185,12 @@ void mca_ptl_tcp_recv(
mca_ptl_base_header_t* header = &frag->super.frag_header; mca_ptl_base_header_t* header = &frag->super.frag_header;
if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
int rc; int rc;
mca_ptl_tcp_send_frag_t* ack = mca_ptl_tcp_send_frag_alloc(&rc); mca_ptl_tcp_send_frag_t* ack;
mca_ptl_tcp_recv_frag_t* recv_frag = (mca_ptl_tcp_recv_frag_t*)frag; mca_ptl_tcp_recv_frag_t* recv_frag = (mca_ptl_tcp_recv_frag_t*)frag;
lam_list_item_t* item;
MCA_PTL_TCP_SEND_FRAG_ALLOC(item, rc);
ack = (mca_ptl_tcp_send_frag_t*)item;
if(NULL == ack) { if(NULL == ack) {
THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock); THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock);
recv_frag->frag_ack_pending = true; recv_frag->frag_ack_pending = true;

Просмотреть файл

@ -178,14 +178,16 @@ int mca_ptl_tcp_module_close(void)
* Create a ptl instance and add to modules list. * Create a ptl instance and add to modules list.
*/ */
static int mca_ptl_tcp_create(int if_index) static int mca_ptl_tcp_create(int if_index, const char* if_name)
{ {
mca_ptl_tcp_t* ptl = malloc(sizeof(mca_ptl_tcp_t)); mca_ptl_tcp_t* ptl = malloc(sizeof(mca_ptl_tcp_t));
char param[256];
char *value;
if(NULL == ptl) if(NULL == ptl)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp)); memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp));
mca_ptl_tcp_module.tcp_ptls[mca_ptl_tcp_module.tcp_num_ptls++] = ptl; mca_ptl_tcp_module.tcp_ptls[mca_ptl_tcp_module.tcp_num_ptls++] = ptl;
/* initialize the ptl */ /* initialize the ptl */
ptl->ptl_ifindex = if_index; ptl->ptl_ifindex = if_index;
#if MCA_PTL_TCP_STATISTICS #if MCA_PTL_TCP_STATISTICS
@ -195,6 +197,19 @@ static int mca_ptl_tcp_create(int if_index)
#endif #endif
lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->ptl_ifaddr, sizeof(ptl->ptl_ifaddr)); lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->ptl_ifaddr, sizeof(ptl->ptl_ifaddr));
lam_ifindextomask(if_index, (struct sockaddr*)&ptl->ptl_ifmask, sizeof(ptl->ptl_ifmask)); lam_ifindextomask(if_index, (struct sockaddr*)&ptl->ptl_ifmask, sizeof(ptl->ptl_ifmask));
/* allow user to specify interface bandwidth */
sprintf(param, "bandwidth_%s", if_name);
ptl->super.ptl_bandwidth = mca_ptl_tcp_param_register_int(param, 0);
/* allow user to override/specify latency ranking */
sprintf(param, "latency_%s", if_name);
ptl->super.ptl_latency = mca_ptl_tcp_param_register_int(param, 0);
#if LAM_ENABLE_DEBUG
lam_output(0,"interface: %s bandwidth %d latency %d\n",
if_name, ptl->super.ptl_bandwidth, ptl->super.ptl_latency);
#endif
return LAM_SUCCESS; return LAM_SUCCESS;
} }
@ -232,8 +247,7 @@ static int mca_ptl_tcp_module_create_instances(void)
if(if_index < 0) { if(if_index < 0) {
lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name); lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name);
} else { } else {
lam_output(0,"interface: %s\n", if_name); mca_ptl_tcp_create(if_index, if_name);
mca_ptl_tcp_create(if_index);
} }
argv++; argv++;
} }
@ -258,8 +272,7 @@ static int mca_ptl_tcp_module_create_instances(void)
} }
/* if this interface was not found in the excluded list - create a PTL */ /* if this interface was not found in the excluded list - create a PTL */
if(argv == 0 || *argv == 0) { if(argv == 0 || *argv == 0) {
lam_output(0,"interface: %s\n", if_name); mca_ptl_tcp_create(if_index, if_name);
mca_ptl_tcp_create(if_index);
} }
} }
lam_argv_free(exclude); lam_argv_free(exclude);
@ -282,6 +295,7 @@ static int mca_ptl_tcp_module_create_listen(void)
lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno); lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno);
return LAM_ERROR; return LAM_ERROR;
} }
mca_ptl_tcp_set_socket_options(mca_ptl_tcp_module.tcp_listen_sd);
/* bind to all addresses and dynamically assigned port */ /* bind to all addresses and dynamically assigned port */
memset(&inaddr, 0, sizeof(inaddr)); memset(&inaddr, 0, sizeof(inaddr));
@ -307,7 +321,7 @@ static int mca_ptl_tcp_module_create_listen(void)
lam_output(0, "mca_ptl_tcp_module_init: listen() failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_module_init: listen() failed with errno=%d", errno);
return LAM_ERROR; return LAM_ERROR;
} }
/* set socket up to be non-blocking, otherwise accept could block */ /* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_GETFL, 0)) < 0) { if((flags = fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_GETFL, 0)) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_GETFL) failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_GETFL) failed with errno=%d", errno);
@ -368,7 +382,7 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
*num_ptls = 0; *num_ptls = 0;
*allow_multi_user_threads = true; *allow_multi_user_threads = true;
*have_hidden_threads = true; *have_hidden_threads = LAM_HAVE_THREADS;
if((rc = lam_event_init()) != LAM_SUCCESS) { if((rc = lam_event_init()) != LAM_SUCCESS) {
lam_output(0, "mca_ptl_tcp_module_init: unable to initialize event dispatch thread: %d\n", rc); lam_output(0, "mca_ptl_tcp_module_init: unable to initialize event dispatch thread: %d\n", rc);

Просмотреть файл

@ -165,7 +165,9 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag); lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag);
} else { } else {
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) { if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) {
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
mca_ptl_tcp_send_frag_progress(frag); mca_ptl_tcp_send_frag_progress(frag);
return rc;
} else { } else {
ptl_peer->peer_send_frag = frag; ptl_peer->peer_send_frag = frag;
lam_event_add(&ptl_peer->peer_send_event, 0); lam_event_add(&ptl_peer->peer_send_event, 0);
@ -178,7 +180,6 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
} }
/* /*
* A blocking send on a non-blocking socket. Used to send the small amount of connection * A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peers endpoint. * information that identifies the peers endpoint.
@ -384,28 +385,28 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
void mca_ptl_tcp_set_socket_options(int sd) void mca_ptl_tcp_set_socket_options(int sd)
{ {
int optval; int optval;
#if defined(TCP_NODELAY)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) < 0) {
lam_output(0,
"mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELAY) failed with errno=%d\n",
errno);
}
#endif
#if defined(SO_SNDBUF) #if defined(SO_SNDBUF)
if(mca_ptl_tcp_module.tcp_sndbuf > 0 && if(mca_ptl_tcp_module.tcp_sndbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_ptl_tcp_module.tcp_sndbuf, sizeof(int)) < 0) { setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_ptl_tcp_module.tcp_sndbuf, sizeof(int)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: SO_SNDBUF option: errno %d\n", errno); lam_output(0,
"mca_ptl_tcp_set_socket_options: SO_SNDBUF option: errno %d\n",
errno);
} }
#endif #endif
#if defined(SO_RCVBUF) #if defined(SO_RCVBUF)
if(mca_ptl_tcp_module.tcp_rcvbuf > 0 && if(mca_ptl_tcp_module.tcp_rcvbuf > 0 &&
setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_ptl_tcp_module.tcp_rcvbuf, sizeof(int)) < 0) { setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_ptl_tcp_module.tcp_rcvbuf, sizeof(int)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: SO_RCVBUF option: errno %d\n", errno); lam_output(0,
} "mca_ptl_tcp_set_socket_options: SO_RCVBUF option: errno %d\n",
#endif errno);
#if defined(TCP_NODELAY)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELAY) failed with errno=%d\n", errno);
}
#endif
#if defined(TCP_NODELACK)
optval = 1;
if(setsockopt(sd, IPPROTO_TCP, TCP_NODELACK, &optval, sizeof(optval)) < 0) {
lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELACK) failed with errno=%d\n", errno);
} }
#endif #endif
} }
@ -532,7 +533,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
mca_ptl_tcp_recv_frag_t* recv_frag = ptl_peer->peer_recv_frag; mca_ptl_tcp_recv_frag_t* recv_frag = ptl_peer->peer_recv_frag;
if(NULL == recv_frag) { if(NULL == recv_frag) {
int rc; int rc;
recv_frag = mca_ptl_tcp_recv_frag_alloc(&rc); MCA_PTL_TCP_RECV_FRAG_ALLOC(recv_frag, rc);
if(NULL == recv_frag) { if(NULL == recv_frag) {
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock); PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return; return;
@ -581,7 +582,9 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
} }
/* if required - update request status and release fragment */ /* if required - update request status and release fragment */
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
mca_ptl_tcp_send_frag_progress(frag); mca_ptl_tcp_send_frag_progress(frag);
THREAD_LOCK(&ptl_peer->peer_send_lock);
/* progress any pending sends */ /* progress any pending sends */
ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*) ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)

Просмотреть файл

@ -11,10 +11,6 @@
#include "ptl_tcp_sendfrag.h" #include "ptl_tcp_sendfrag.h"
#define frag_header super.super.frag_header
#define frag_peer super.super.frag_peer
#define frag_owner super.super.frag_owner
static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag);
static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag);
@ -46,12 +42,12 @@ static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag)
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer) void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer)
{ {
frag->frag_owner = &peer->peer_ptl->super; frag->super.super.frag_owner = &peer->peer_ptl->super;
frag->super.frag_request = 0;
frag->super.super.frag_addr = NULL; frag->super.super.frag_addr = NULL;
frag->super.super.frag_size = 0; frag->super.super.frag_size = 0;
frag->super.super.frag_peer = peer;
frag->super.frag_request = 0;
frag->super.frag_is_buffered = false; frag->super.frag_is_buffered = false;
frag->frag_peer = peer;
frag->frag_hdr_cnt = 0; frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0; frag->frag_msg_cnt = 0;
frag->frag_ack_pending = false; frag->frag_ack_pending = false;
@ -66,7 +62,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false) if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false)
return false; return false;
switch(frag->frag_header.hdr_common.hdr_type) { switch(frag->super.super.frag_header.hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH: case MCA_PTL_HDR_TYPE_MATCH:
return mca_ptl_tcp_recv_frag_match(frag, sd); return mca_ptl_tcp_recv_frag_match(frag, sd);
case MCA_PTL_HDR_TYPE_FRAG: case MCA_PTL_HDR_TYPE_FRAG:
@ -76,7 +72,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
return mca_ptl_tcp_recv_frag_ack(frag, sd); return mca_ptl_tcp_recv_frag_ack(frag, sd);
default: default:
lam_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X", lam_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
*(unsigned long*)&frag->frag_header); *(unsigned long*)&frag->super.super.frag_header);
return false; return false;
} }
} }
@ -85,12 +81,12 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size) static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size)
{ {
/* non-blocking read - continue if interrupted, otherwise wait until data available */ /* non-blocking read - continue if interrupted, otherwise wait until data available */
unsigned char* ptr = (unsigned char*)&frag->frag_header; unsigned char* ptr = (unsigned char*)&frag->super.super.frag_header;
while(frag->frag_hdr_cnt < size) { while(frag->frag_hdr_cnt < size) {
int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0); int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
if(cnt == 0) { if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
if(cnt < 0) { if(cnt < 0) {
@ -102,8 +98,8 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
return false; return false;
default: default:
lam_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
} }
@ -120,11 +116,11 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
{ {
mca_ptl_tcp_send_frag_t* sendfrag; mca_ptl_tcp_send_frag_t* sendfrag;
mca_ptl_base_send_request_t* sendreq; mca_ptl_base_send_request_t* sendreq;
sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_header.hdr_ack.hdr_src_ptr.pval; sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval;
sendreq = sendfrag->super.frag_request; sendreq = sendfrag->super.frag_request;
sendreq->req_peer_request = frag->frag_header.hdr_ack.hdr_dst_ptr; sendreq->req_peer_request = frag->super.super.frag_header.hdr_ack.hdr_dst_ptr;
sendfrag->frag_owner->ptl_send_progress(sendreq, &sendfrag->super); mca_ptl_tcp_send_frag_progress(sendfrag);
mca_ptl_tcp_recv_frag_return(frag->frag_owner, frag); mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
return true; return true;
} }
@ -134,13 +130,13 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
/* first pass through - attempt a match */ /* first pass through - attempt a match */
if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) { if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) {
/* attempt to match a posted recv */ /* attempt to match a posted recv */
if(mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match)) { if(mca_ptl_base_recv_frag_match(&frag->super, &frag->super.super.frag_header.hdr_match)) {
mca_ptl_tcp_recv_frag_matched(frag); mca_ptl_tcp_recv_frag_matched(frag);
} else { } else {
/* match was not made - so allocate buffer for eager send */ /* match was not made - so allocate buffer for eager send */
if(frag->frag_header.hdr_frag.hdr_frag_length > 0) { if(frag->super.super.frag_header.hdr_frag.hdr_frag_length > 0) {
frag->super.super.frag_addr = malloc(frag->frag_header.hdr_frag.hdr_frag_length); frag->super.super.frag_addr = malloc(frag->super.super.frag_header.hdr_frag.hdr_frag_length);
frag->super.super.frag_size = frag->frag_header.hdr_frag.hdr_frag_length; frag->super.super.frag_size = frag->super.super.frag_header.hdr_frag.hdr_frag_length;
frag->super.frag_is_buffered = true; frag->super.frag_is_buffered = true;
} }
} }
@ -154,7 +150,7 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
} }
/* discard any data that exceeds the posted receive */ /* discard any data that exceeds the posted receive */
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) { if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) {
return false; return false;
} }
@ -170,7 +166,7 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
{ {
/* get request from header */ /* get request from header */
if(frag->frag_msg_cnt == 0) { if(frag->frag_msg_cnt == 0) {
frag->super.frag_request = frag->frag_header.hdr_frag.hdr_dst_ptr.pval; frag->super.frag_request = frag->super.super.frag_header.hdr_frag.hdr_dst_ptr.pval;
mca_ptl_tcp_recv_frag_matched(frag); mca_ptl_tcp_recv_frag_matched(frag);
} }
@ -180,7 +176,7 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false; return false;
} }
if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false; return false;
@ -201,8 +197,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt, int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt,
frag->super.super.frag_size-frag->frag_msg_cnt, 0); frag->super.super.frag_size-frag->frag_msg_cnt, 0);
if(cnt == 0) { if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
if(cnt < 0) { if(cnt < 0) {
@ -213,8 +209,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false; return false;
default: default:
lam_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
} }
@ -234,13 +230,14 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
{ {
while(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) { while(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) {
void *rbuf = malloc(frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt); size_t count = frag->super.super.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt;
int cnt = recv(sd, rbuf, frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt, 0); void *rbuf = malloc(count);
int cnt = recv(sd, rbuf, count, 0);
free(rbuf); free(rbuf);
if(cnt == 0) { if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
if(cnt < 0) { if(cnt < 0) {
@ -252,8 +249,8 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false; return false;
default: default:
lam_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno);
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
return false; return false;
} }
} }

Просмотреть файл

@ -34,10 +34,13 @@ struct mca_ptl_tcp_recv_frag_t {
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
static inline mca_ptl_tcp_recv_frag_t* mca_ptl_tcp_recv_frag_alloc(int* rc) #define MCA_PTL_TCP_RECV_FRAG_ALLOC(frag, rc) \
{ { \
return (mca_ptl_tcp_recv_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_recv_frags, rc); lam_list_item_t* item; \
} LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_recv_frags, item, rc); \
frag = (mca_ptl_tcp_recv_frag_t*)item; \
}
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd); bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd);
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer); void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer);
@ -88,43 +91,50 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
} }
static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag) static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{ {
if(frag->frag_msg_cnt >= frag->super.super.frag_header.hdr_frag.hdr_frag_length) { if((frag)->frag_msg_cnt >= (frag)->super.super.frag_header.hdr_frag.hdr_frag_length) {
if(fetchNset(&frag->frag_progressed, 1) == 0) { mca_ptl_base_recv_request_t* request = (frag)->super.frag_request;
mca_ptl_base_recv_request_t* request = frag->super.frag_request;
/* make sure this only happens once for threaded case */
if(frag->super.frag_is_buffered) { if(lam_using_threads()) {
mca_ptl_base_match_header_t* header = &frag->super.super.frag_header.hdr_match; if(fetchNset(&(frag)->frag_progressed, 1) == 1)
return;
/* } else {
* Initialize convertor and use it to unpack data if((frag)->frag_progressed == 1)
*/ return;
struct iovec iov; (frag)->frag_progressed = 1;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor);
lam_convertor_init_for_recv(
&frag->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
iov.iov_base = frag->super.super.frag_addr;
iov.iov_len = frag->super.super.frag_size;
lam_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1);
}
/* progress the request */
frag->super.super.frag_owner->ptl_recv_progress(request, &frag->super);
if(frag->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
}
} }
}
if((frag)->super.frag_is_buffered) {
mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match;
/*
* Initialize convertor and use it to unpack data
*/
struct iovec iov;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor);
lam_convertor_init_for_recv(
&(frag)->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
iov.iov_base = (frag)->super.super.frag_addr;
iov.iov_len = (frag)->super.super.frag_size;
lam_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1);
}
/* progress the request */
(frag)->super.super.frag_owner->ptl_recv_progress(request, &(frag)->super);
if((frag)->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag));
}
}
} }
#endif #endif

Просмотреть файл

@ -131,6 +131,7 @@ int mca_ptl_tcp_send_frag_init(
sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2; sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2;
sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr; sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t); sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
sendfrag->frag_progressed = 0;
return LAM_SUCCESS; return LAM_SUCCESS;
} }

Просмотреть файл

@ -28,14 +28,13 @@ struct mca_ptl_tcp_send_frag_t {
struct iovec *frag_vec_ptr; /**< pointer into iovec array */ struct iovec *frag_vec_ptr; /**< pointer into iovec array */
size_t frag_vec_cnt; /**< number of iovec structs left to process */ size_t frag_vec_cnt; /**< number of iovec structs left to process */
struct iovec frag_vec[2]; /**< array of iovecs for send */ struct iovec frag_vec[2]; /**< array of iovecs for send */
volatile int frag_progressed; /**< for threaded case - has request status been updated */
}; };
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t; typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;
static inline mca_ptl_tcp_send_frag_t* mca_ptl_tcp_send_frag_alloc(int* rc) #define MCA_PTL_TCP_SEND_FRAG_ALLOC(item, rc) \
{ LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc);
return (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, rc);
}
bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd); bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd);
@ -49,26 +48,45 @@ int mca_ptl_tcp_send_frag_init(
int flags); int flags);
static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag) /*
{ * For fragments that require an acknowledgment, this routine will be called
mca_ptl_base_send_request_t* request = frag->super.frag_request; * twice, once when the send completes, and again when the acknowledgment is
* returned. Only the last caller should update the request status, so we
* add a lock w/ the frag_progressed flag.
*/
/* if this is an ack - simply return to pool */ static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag)
if(request == NULL) { {
mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); mca_ptl_base_send_request_t* request = frag->super.frag_request;
/* otherwise, if an ack is not required or has already been received, update request status */ /* if this is an ack - simply return to pool */
} else if ((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 || if(request == NULL) {
mca_ptl_base_send_request_matched(request) == true) { mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
frag->super.super.frag_owner->ptl_send_progress(request, &frag->super);
/* otherwise, if the message has been sent, and an ack has already been
* received, go ahead and update the request status
*/
} else if (frag->frag_vec_cnt == 0 &&
((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 ||
mca_ptl_base_send_request_matched(request))) {
/* make sure this only happens once in threaded case */
if (lam_using_threads() && fetchNset(&frag->frag_progressed, 1) == 1) {
return;
} else {
frag->frag_progressed = 1;
}
/* update request status */
frag->super.super.frag_owner->ptl_send_progress(request, &frag->super);
/* the first fragment is allocated with the request, /* the first fragment is allocated with the request,
* all others need to be returned to free list * all others need to be returned to free list
*/ */
if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0) if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0)
mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
} }
} }
static inline void mca_ptl_tcp_send_frag_init_ack( static inline void mca_ptl_tcp_send_frag_init_ack(

Просмотреть файл

@ -40,27 +40,29 @@ int lam_free_list_init(
int lam_free_list_grow(lam_free_list_t* flist, size_t num_elements); int lam_free_list_grow(lam_free_list_t* flist, size_t num_elements);
static inline lam_list_item_t *lam_free_list_get(lam_free_list_t * fl, int *rc) #define LAM_FREE_LIST_GET(fl, item, rc) \
{ { \
lam_list_item_t* item; if(lam_using_threads()) { \
THREAD_LOCK(&fl->fl_lock); lam_mutex_lock(&((fl)->fl_lock)); \
item = lam_list_remove_first(&fl->super); item = lam_list_remove_first(&((fl)->super)); \
if(NULL == item) { if(NULL == item) { \
lam_free_list_grow(fl, fl->fl_num_per_alloc); lam_free_list_grow((fl), (fl)->fl_num_per_alloc); \
item = lam_list_remove_first(&fl->super); item = lam_list_remove_first(&((fl)->super)); \
} } \
THREAD_UNLOCK(&fl->fl_lock); lam_mutex_unlock(&((fl)->fl_lock)); \
*rc = (NULL != item) ? LAM_SUCCESS : LAM_ERR_TEMP_OUT_OF_RESOURCE; } else { \
return item; item = lam_list_remove_first(&((fl)->super)); \
} if(NULL == item) { \
lam_free_list_grow((fl), (fl)->fl_num_per_alloc); \
item = lam_list_remove_first(&((fl)->super)); \
} \
} \
rc = (NULL == item) ? LAM_ERR_TEMP_OUT_OF_RESOURCE : LAM_SUCCESS; \
}
static inline int lam_free_list_return(lam_free_list_t *fl, lam_list_item_t *item)
{ #define LAM_FREE_LIST_RETURN(fl, item) \
THREAD_LOCK(&fl->fl_lock); THREAD_SCOPED_LOCK(&((fl)->fl_lock), lam_list_append(&((fl)->super), (item)));
lam_list_append(&fl->super, item);
THREAD_UNLOCK(&fl->fl_lock);
return LAM_SUCCESS;
}
#endif #endif

Просмотреть файл

@ -5,6 +5,7 @@
#define LAM_CONDITION_SPINLOCK_H #define LAM_CONDITION_SPINLOCK_H
#include "threads/condition.h" #include "threads/condition.h"
#include "threads/mutex.h"
#include "threads/mutex_spinlock.h" #include "threads/mutex_spinlock.h"
#include "runtime/lam_progress.h" #include "runtime/lam_progress.h"
@ -21,10 +22,16 @@ OBJ_CLASS_DECLARATION(lam_condition_t);
static inline int lam_condition_wait(lam_condition_t* c, lam_mutex_t* m) static inline int lam_condition_wait(lam_condition_t* c, lam_mutex_t* m)
{ {
c->c_waiting++; c->c_waiting++;
while(c->c_signaled == 0) { if(lam_using_threads()) {
lam_mutex_unlock(m); while(c->c_signaled == 0) {
lam_progress(); lam_mutex_unlock(m);
lam_mutex_lock(m); lam_progress();
lam_mutex_lock(m);
}
} else {
while(c->c_signaled == 0) {
lam_progress();
}
} }
c->c_signaled--; c->c_signaled--;
c->c_waiting--; c->c_waiting--;