From e68af0dde80d059b5fe73071c60caaead19a0abc Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Tue, 18 May 2004 21:06:11 +0000 Subject: [PATCH] - inlining multiple functions - changed to macros functions the compiler wouldn't inline - removed unused event handling code from critical path - fixed threading issue in tcp code This commit was SVN r1136. --- src/event/epoll.c | 11 +- src/event/event.c | 224 +++++++++---------------- src/event/event.h | 103 ++++++++++-- src/event/kqueue.c | 13 +- src/event/poll.c | 50 ++++-- src/event/select.c | 39 ++++- src/lfc/lam_list.c | 115 ------------- src/lfc/lam_list.h | 100 ++++++++++- src/lfc/lam_pointer_array.c | 42 ----- src/lfc/lam_pointer_array.h | 11 +- src/mca/pml/teg/src/pml_teg.c | 2 +- src/mca/pml/teg/src/pml_teg.h | 27 +++ src/mca/pml/teg/src/pml_teg_free.c | 24 +-- src/mca/pml/teg/src/pml_teg_irecv.c | 45 +++-- src/mca/pml/teg/src/pml_teg_isend.c | 41 ++++- src/mca/pml/teg/src/pml_teg_recvreq.c | 4 +- src/mca/pml/teg/src/pml_teg_recvreq.h | 16 +- src/mca/pml/teg/src/pml_teg_sendreq.c | 16 +- src/mca/pml/teg/src/pml_teg_sendreq.h | 38 ++--- src/mca/pml/teg/src/pml_teg_test.c | 6 +- src/mca/pml/teg/src/pml_teg_wait.c | 14 +- src/mca/ptl/base/ptl_base_comm.h | 4 +- src/mca/ptl/base/ptl_base_recvreq.h | 50 +++--- src/mca/ptl/base/ptl_base_sendreq.h | 90 +++++----- src/mca/ptl/tcp/src/ptl_tcp.c | 24 ++- src/mca/ptl/tcp/src/ptl_tcp_module.c | 30 +++- src/mca/ptl/tcp/src/ptl_tcp_peer.c | 35 ++-- src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c | 69 ++++---- src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h | 90 +++++----- src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c | 1 + src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h | 58 ++++--- src/mem/free_list.h | 42 ++--- src/threads/condition_spinlock.h | 15 +- 33 files changed, 778 insertions(+), 671 deletions(-) diff --git a/src/event/epoll.c b/src/event/epoll.c index b70038fbdc..bc013839b8 100644 --- a/src/event/epoll.c +++ b/src/event/epoll.c @@ -57,7 +57,6 @@ extern struct lam_event_list lam_eventqueue; extern volatile sig_atomic_t lam_evsignal_caught; -extern lam_mutex_t lam_event_mutex; extern lam_mutex_t lam_event_lock; /* due to limitations in the epoll interface, we need to keep track of @@ -175,9 +174,13 @@ epoll_dispatch(void *arg, struct timeval *tv) return (-1); timeout = tv->tv_sec * 1000 + tv->tv_usec / 1000; - lam_mutex_unlock(&lam_event_lock); - res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); - lam_mutex_lock(&lam_event_lock); + if(lam_using_threads()) { + lam_mutex_unlock(&lam_event_lock); + res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); + lam_mutex_lock(&lam_event_lock); + } else { + res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); + } if (lam_evsignal_recalc(&epollop->evsigmask) == -1) return (-1); diff --git a/src/event/event.c b/src/event/event.c index e977d7ab7e..d0fcb0f12f 100644 --- a/src/event/event.c +++ b/src/event/event.c @@ -117,13 +117,12 @@ int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */ int lam_event_gotsig; /* Set in signal handler */ /* Prototypes */ +static void lam_event_process_active(void); +static void lam_timeout_correct(struct timeval *off); +static void lam_timeout_insert(struct lam_event *); static void lam_event_queue_insert(struct lam_event *, int); static void lam_event_queue_remove(struct lam_event *, int); -static void lam_event_process_active(void); -static int lam_timeout_next(struct timeval *tv); -static void lam_timeout_correct(struct timeval *off); static void lam_timeout_process(void); -static void lam_timeout_insert(struct lam_event *); int lam_event_haveevents(void); static RB_HEAD(lam_event_tree, lam_event) lam_timetree; @@ -153,6 +152,29 @@ static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare) static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare) +static int lam_timeout_next(struct timeval *tv) +{ + struct timeval dflt = LAM_TIMEOUT_DEFAULT; + struct timeval now; + struct lam_event *ev; + + if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) { + *tv = dflt; + return(0); + } + + if (gettimeofday(&now, NULL) == -1) + return (-1); + + if (timercmp(&ev->ev_timeout, &now, <=)) { + timerclear(tv); + return (0); + } + + timersub(&ev->ev_timeout, &now, tv); + return (0); +} + /* run loop for dispatch thread */ static void* lam_event_run(lam_object_t* arg) { @@ -258,9 +280,13 @@ lam_event_process_active(void) while (ncalls) { ncalls--; ev->ev_ncalls = ncalls; - lam_mutex_unlock(&lam_event_lock); - (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); - lam_mutex_lock(&lam_event_lock); + if(lam_using_threads()) { + lam_mutex_unlock(&lam_event_lock); + (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); + lam_mutex_lock(&lam_event_lock); + } else { + (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); + } } } } @@ -277,12 +303,12 @@ lam_event_loop(int flags) struct timeval tv; int res, done; - lam_mutex_lock(&lam_event_lock); + THREAD_LOCK(&lam_event_lock); /* Calculate the initial events that we are waiting for */ - if (lam_evsel->recalc(lam_evbase, 0) == -1) { + if (lam_evsel->recalc && lam_evsel->recalc(lam_evbase, 0) == -1) { lam_output(0, "lam_event_loop: lam_evsel->recalc() failed."); - lam_mutex_unlock(&lam_event_lock); + THREAD_UNLOCK(&lam_event_lock); return (-1); } @@ -295,28 +321,16 @@ lam_event_loop(int flags) if (res == -1) { lam_output(0, "lam_event_loop: lam_event_sigcb() failed."); errno = EINTR; - lam_mutex_unlock(&lam_event_lock); + THREAD_UNLOCK(&lam_event_lock); return (-1); } } } - /* Check if time is running backwards */ - gettimeofday(&tv, NULL); - if (timercmp(&tv, &lam_event_tv, <)) { - struct timeval off; - LOG_DBG((LOG_MISC, 10, - "%s: time is running backwards, corrected", - __func__)); - - timersub(&lam_event_tv, &tv, &off); - lam_timeout_correct(&off); - } - lam_event_tv = tv; - - if (!(flags & LAM_EVLOOP_NONBLOCK)) - lam_timeout_next(&tv); - else + if (!(flags & LAM_EVLOOP_NONBLOCK)) { + static struct timeval dflt = LAM_TIMEOUT_DEFAULT; + tv = dflt; + } else timerclear(&tv); #if LAM_HAVE_THREADS @@ -328,11 +342,25 @@ lam_event_loop(int flags) #endif if (res == -1) { lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed."); - lam_mutex_unlock(&lam_event_lock); + THREAD_UNLOCK(&lam_event_lock); return (-1); } - lam_timeout_process(); + if(NULL != RB_MIN(lam_event_tree, &lam_timetree)) { + /* Check if time is running backwards */ + gettimeofday(&tv, NULL); + if (timercmp(&tv, &lam_event_tv, <)) { + struct timeval off; + LOG_DBG((LOG_MISC, 10, + "%s: time is running backwards, corrected", + __func__)); + + timersub(&lam_event_tv, &tv, &off); + lam_timeout_correct(&off); + } + lam_event_tv = tv; + lam_timeout_process(); + } if (TAILQ_FIRST(&lam_activequeue)) { lam_event_process_active(); @@ -341,58 +369,16 @@ lam_event_loop(int flags) } else if (flags & (LAM_EVLOOP_NONBLOCK|LAM_EVLOOP_ONCE)) done = 1; - if (lam_evsel->recalc(lam_evbase, 0) == -1) { + if (lam_evsel->recalc && lam_evsel->recalc(lam_evbase, 0) == -1) { lam_output(0, "lam_event_loop: lam_evesel->recalc() failed."); - lam_mutex_unlock(&lam_event_lock); + THREAD_UNLOCK(&lam_event_lock); return (-1); } } - lam_mutex_unlock(&lam_event_lock); + THREAD_UNLOCK(&lam_event_lock); return (0); } -void -lam_event_set(struct lam_event *ev, int fd, short events, - void (*callback)(int, short, void *), void *arg) -{ - ev->ev_callback = callback; - ev->ev_arg = arg; -#ifdef WIN32 - ev->ev_fd = (HANDLE)fd; - ev->overlap.hEvent = ev; -#else - ev->ev_fd = fd; -#endif - ev->ev_events = events; - ev->ev_flags = LAM_EVLIST_INIT; - ev->ev_ncalls = 0; - ev->ev_pncalls = NULL; -} - -/* - * Checks if a specific event is pending or scheduled. - */ - -int -lam_event_pending(struct lam_event *ev, short event, struct timeval *tv) -{ - int flags = 0; - - if (ev->ev_flags & LAM_EVLIST_INSERTED) - flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)); - if (ev->ev_flags & LAM_EVLIST_ACTIVE) - flags |= ev->ev_res; - if (ev->ev_flags & LAM_EVLIST_TIMEOUT) - flags |= LAM_EV_TIMEOUT; - - event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE); - - /* See if there is a timeout that we should report */ - if (tv != NULL && (flags & event & LAM_EV_TIMEOUT)) - *tv = ev->ev_timeout; - - return (flags & event); -} int lam_event_add_i(struct lam_event *ev, struct timeval *tv) @@ -461,15 +447,6 @@ lam_event_add_i(struct lam_event *ev, struct timeval *tv) return rc; } -int -lam_event_add(struct lam_event *ev, struct timeval *tv) -{ - int rc; - lam_mutex_lock(&lam_event_lock); - rc = lam_event_add_i(ev, tv); - lam_mutex_unlock(&lam_event_lock); - return rc; -} int lam_event_del_i(struct lam_event *ev) { @@ -509,72 +486,6 @@ int lam_event_del_i(struct lam_event *ev) return (0); } -int lam_event_del(struct lam_event *ev) -{ - int rc; - lam_mutex_lock(&lam_event_lock); - rc = lam_event_del_i(ev); - - lam_mutex_unlock(&lam_event_lock); - return rc; -} - -void -lam_event_active_i(struct lam_event *ev, int res, short ncalls) -{ - /* We get different kinds of events, add them together */ - if (ev->ev_flags & LAM_EVLIST_ACTIVE) { - ev->ev_res |= res; - return; - } - - ev->ev_res = res; - ev->ev_ncalls = ncalls; - ev->ev_pncalls = NULL; - lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE); -} - - -void -lam_event_active(struct lam_event* ev, int res, short ncalls) -{ - lam_mutex_lock(&lam_event_lock); - lam_event_active_i(ev, res, ncalls); - lam_mutex_unlock(&lam_event_lock); -} - - - -static int -lam_timeout_next(struct timeval *tv) -{ - struct timeval dflt = LAM_TIMEOUT_DEFAULT; - - struct timeval now; - struct lam_event *ev; - - if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) { - *tv = dflt; - return (0); - } - - if (gettimeofday(&now, NULL) == -1) - return (-1); - - if (timercmp(&ev->ev_timeout, &now, <=)) { - timerclear(tv); - return (0); - } - - timersub(&ev->ev_timeout, &now, tv); - - assert(tv->tv_sec >= 0); - assert(tv->tv_usec >= 0); - - LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec)); - return (0); -} - static void lam_timeout_correct(struct timeval *off) @@ -690,3 +601,18 @@ lam_event_queue_insert(struct lam_event *ev, int queue) } } +void lam_event_active_i(struct lam_event * ev, int res, short ncalls) +{ + /* We get different kinds of events, add them together */ + if (ev->ev_flags & LAM_EVLIST_ACTIVE) { + ev->ev_res |= res; + return; + } + + ev->ev_res = res; + ev->ev_ncalls = ncalls; + ev->ev_pncalls = NULL; + lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE); +} + + diff --git a/src/event/event.h b/src/event/event.h index d703c3d502..ff91772a22 100644 --- a/src/event/event.h +++ b/src/event/event.h @@ -39,6 +39,7 @@ extern "C" { unqualified name for a header file. :-) */ #include "lam_config.h" #endif +#include "threads/mutex.h" #ifdef HAVE_SYS_TIME_H #include @@ -63,6 +64,10 @@ extern "C" { #define LAM_EV_SIGNAL 0x08 #define LAM_EV_PERSIST 0x10 /* Persistant event */ +#ifndef LAM_EVENT_USE_SIGNALS +#define LAM_EVENT_USE_SIGNALS 0 +#endif + /* Fix so that ppl dont have to run with */ #ifndef TAILQ_ENTRY #define _EVENT_DEFINED_TQENTRY @@ -160,11 +165,94 @@ int lam_event_loop(int); #define lam_signal_pending(ev, tv) lam_event_pending(ev, LAM_EV_SIGNAL, tv) #define lam_signal_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) -void lam_event_set(struct lam_event *, int, short, void (*)(int, short, void *), void *); -int lam_event_add(struct lam_event *, struct timeval *); -int lam_event_del(struct lam_event *); -int lam_event_pending(struct lam_event *, short, struct timeval *); -void lam_event_active(struct lam_event *, int, short); +/* for internal use only */ +int lam_event_add_i(struct lam_event *, struct timeval *); +int lam_event_del_i(struct lam_event *); +void lam_event_active_i(struct lam_event*, int, short); + +/* public functions */ +static inline void +lam_event_set(struct lam_event *ev, int fd, short events, + void (*callback)(int, short, void *), void *arg) +{ + ev->ev_callback = callback; + ev->ev_arg = arg; +#ifdef WIN32 + ev->ev_fd = (HANDLE)fd; + ev->overlap.hEvent = ev; +#else + ev->ev_fd = fd; +#endif + ev->ev_events = events; + ev->ev_flags = LAM_EVLIST_INIT; + ev->ev_ncalls = 0; + ev->ev_pncalls = NULL; +} + +static inline int +lam_event_add(struct lam_event *ev, struct timeval *tv) +{ + extern lam_mutex_t lam_event_lock; + int rc; + if(lam_using_threads()) { + lam_mutex_lock(&lam_event_lock); + rc = lam_event_add_i(ev, tv); + lam_mutex_unlock(&lam_event_lock); + } else { + rc = lam_event_add_i(ev, tv); + } + return rc; +} + +static inline int +lam_event_del(struct lam_event *ev) +{ + extern lam_mutex_t lam_event_lock; + int rc; + if(lam_using_threads()) { + lam_mutex_lock(&lam_event_lock); + rc = lam_event_del_i(ev); + lam_mutex_unlock(&lam_event_lock); + } else { + rc = lam_event_del_i(ev); + } + return rc; +} + +static inline void +lam_event_active(struct lam_event* ev, int res, short ncalls) +{ + extern lam_mutex_t lam_event_lock; + if(lam_using_threads()) { + lam_mutex_lock(&lam_event_lock); + lam_event_active_i(ev, res, ncalls); + lam_mutex_unlock(&lam_event_lock); + } else { + lam_event_active_i(ev, res, ncalls); + } +} + +static inline int +lam_event_pending(struct lam_event *ev, short event, struct timeval *tv) +{ + int flags = 0; + + if (ev->ev_flags & LAM_EVLIST_INSERTED) + flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)); + if (ev->ev_flags & LAM_EVLIST_ACTIVE) + flags |= ev->ev_res; + if (ev->ev_flags & LAM_EVLIST_TIMEOUT) + flags |= LAM_EV_TIMEOUT; + + event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE); + + /* See if there is a timeout that we should report */ + if (tv != NULL && (flags & event & LAM_EV_TIMEOUT)) + *tv = ev->ev_timeout; + + return (flags & event); +} + #ifdef WIN32 #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE) @@ -172,11 +260,6 @@ void lam_event_active(struct lam_event *, int, short); #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) #endif -/* for internal use only */ -int lam_event_add_i(struct lam_event *, struct timeval *); -int lam_event_del_i(struct lam_event *); -void lam_event_active_i(struct lam_event *, int, short); - #ifdef __cplusplus } #endif diff --git a/src/event/kqueue.c b/src/event/kqueue.c index 681a8b56cc..b4584a486b 100644 --- a/src/event/kqueue.c +++ b/src/event/kqueue.c @@ -200,10 +200,15 @@ kq_dispatch(void *arg, struct timeval *tv) TIMEVAL_TO_TIMESPEC(tv, &ts); /* release lock while waiting in kernel */ - lam_mutex_unlock(&lam_event_lock); - res = kevent(kqop->kq, changes, kqop->nchanges, - events, kqop->nevents, &ts); - lam_mutex_lock(&lam_event_lock); + if(lam_using_threads()) { + lam_mutex_unlock(&lam_event_lock); + res = kevent(kqop->kq, changes, kqop->nchanges, + events, kqop->nevents, &ts); + lam_mutex_lock(&lam_event_lock); + } else { + res = kevent(kqop->kq, changes, kqop->nchanges, + events, kqop->nevents, &ts); + } kqop->nchanges = 0; if (res == -1) { diff --git a/src/event/poll.c b/src/event/poll.c index 78a9489094..d99fcb7502 100644 --- a/src/event/poll.c +++ b/src/event/poll.c @@ -52,7 +52,9 @@ #endif #include "event.h" +#if LAM_EVENT_USE_SIGNALS #include "evsignal.h" +#endif #include "threads/mutex.h" @@ -64,7 +66,9 @@ struct pollop { int event_count; /* Highest number alloc */ struct pollfd *event_set; struct lam_event **event_back; +#if LAM_EVENT_USE_SIGNALS sigset_t evsigmask; +#endif } pollop; static void *poll_init (void); @@ -78,7 +82,7 @@ const struct lam_eventop lam_pollops = { poll_init, poll_add, poll_del, - poll_recalc, + NULL, poll_dispatch }; @@ -90,9 +94,9 @@ poll_init(void) return (NULL); memset(&pollop, 0, sizeof(pollop)); - +#if LAM_EVENT_USE_SIGNALS lam_evsignal_init(&pollop.evsigmask); - +#endif return (&pollop); } @@ -104,9 +108,12 @@ poll_init(void) static int poll_recalc(void *arg, int max) { +#if LAM_EVENT_USE_SIGNALS struct pollop *pop = arg; - return (lam_evsignal_recalc(&pop->evsigmask)); +#else + return (0); +#endif } static int @@ -163,16 +170,24 @@ poll_dispatch(void *arg, struct timeval *tv) } } +#if LAM_EVENT_USE_SIGNALS if (lam_evsignal_deliver(&pop->evsigmask) == -1) return (-1); +#endif sec = tv->tv_sec * 1000 + tv->tv_usec / 1000; - lam_mutex_unlock(&lam_event_lock); - res = poll(pop->event_set, nfds, sec); - lam_mutex_lock(&lam_event_lock); + if(lam_using_threads()) { + lam_mutex_unlock(&lam_event_lock); + res = poll(pop->event_set, nfds, sec); + lam_mutex_lock(&lam_event_lock); + } else { + res = poll(pop->event_set, nfds, sec); + } +#if LAM_EVENT_USE_SIGNALS if (lam_evsignal_recalc(&pop->evsigmask) == -1) return (-1); +#endif if (res == -1) { if (errno != EINTR) { @@ -180,10 +195,16 @@ poll_dispatch(void *arg, struct timeval *tv) return (-1); } +#if LAM_EVENT_USE_SIGNALS lam_evsignal_process(); +#endif return (0); - } else if (lam_evsignal_caught) + } + +#if LAM_EVENT_USE_SIGNALS + else if (lam_evsignal_caught) lam_evsignal_process(); +#endif LOG_DBG((LOG_MISC, 80, "%s: poll reports %d", __func__, res)); @@ -223,11 +244,11 @@ poll_dispatch(void *arg, struct timeval *tv) static int poll_add(void *arg, struct lam_event *ev) { +#if LAM_EVENT_USE_SIGNALS struct pollop *pop = arg; - if (ev->ev_events & LAM_EV_SIGNAL) return (lam_evsignal_add(&pop->evsigmask, ev)); - +#endif return (0); } @@ -238,10 +259,15 @@ poll_add(void *arg, struct lam_event *ev) static int poll_del(void *arg, struct lam_event *ev) { +#if LAM_EVENT_USE_SIGNALS struct pollop *pop = arg; - +#endif if (!(ev->ev_events & LAM_EV_SIGNAL)) return (0); - +#if LAM_EVENT_USE_SIGNALS return (lam_evsignal_del(&pop->evsigmask, ev)); +#else + return (0); +#endif } + diff --git a/src/event/select.c b/src/event/select.c index d440bf7147..ed617bdcbb 100644 --- a/src/event/select.c +++ b/src/event/select.c @@ -142,8 +142,11 @@ select_recalc(void *arg, int max) sop->event_writeset = writeset; sop->event_fdsz = fdsz; } - +#if LAM_EVENT_USE_SIGNALS return (lam_evsignal_recalc(&sop->evsigmask)); +#else + return (0); +#endif } static int @@ -163,17 +166,26 @@ select_dispatch(void *arg, struct timeval *tv) FD_SET(ev->ev_fd, sop->event_readset); } +#if LAM_EVENT_USE_SIGNALS if (lam_evsignal_deliver(&sop->evsigmask) == -1) return (-1); +#endif /* release lock while waiting in kernel */ - lam_mutex_unlock(&lam_event_lock); - res = select(sop->event_fds + 1, sop->event_readset, - sop->event_writeset, NULL, tv); - lam_mutex_lock(&lam_event_lock); + if(lam_using_threads()) { + lam_mutex_unlock(&lam_event_lock); + res = select(sop->event_fds + 1, sop->event_readset, + sop->event_writeset, NULL, tv); + lam_mutex_lock(&lam_event_lock); + } else { + res = select(sop->event_fds + 1, sop->event_readset, + sop->event_writeset, NULL, tv); + } +#if LAM_EVENT_USE_SIGNALS if (lam_evsignal_recalc(&sop->evsigmask) == -1) return (-1); +#endif if (res == -1) { if (errno != EINTR) { @@ -181,13 +193,15 @@ select_dispatch(void *arg, struct timeval *tv) return (-1); } +#if LAM_EVENT_USE_SIGNALS lam_evsignal_process(); +#endif return (0); - } else if (lam_evsignal_caught) + } +#if LAM_EVENT_USE_SIGNALS + else if (lam_evsignal_caught) lam_evsignal_process(); - - LOG_DBG((LOG_MISC, 80, "%s: select reports %d", __func__, res)); - +#endif maxfd = 0; for (ev = TAILQ_FIRST(&lam_eventqueue); ev != NULL; ev = next) { next = TAILQ_NEXT(ev, ev_next); @@ -218,8 +232,10 @@ select_add(void *arg, struct lam_event *ev) { struct selectop *sop = arg; +#if LAM_EVENT_USE_SIGNALS if (ev->ev_events & LAM_EV_SIGNAL) return (lam_evsignal_add(&sop->evsigmask, ev)); +#endif /* * Keep track of the highest fd, so that we can calculate the size @@ -243,5 +259,10 @@ select_del(void *arg, struct lam_event *ev) if (!(ev->ev_events & LAM_EV_SIGNAL)) return (0); +#if LAM_EVENT_USE_SIGNALS return (lam_evsignal_del(&sop->evsigmask, ev)); +#else + return (0); +#endif } + diff --git a/src/lfc/lam_list.c b/src/lfc/lam_list.c index ddfb6594d0..5c0d3f8cf5 100644 --- a/src/lfc/lam_list.c +++ b/src/lfc/lam_list.c @@ -69,33 +69,6 @@ static void lam_list_destruct(lam_list_t *list) lam_list_construct(list); } - -/** - * Adds item to the end of the list. - * - * @param list List accepting new item (IN/OUT) - * - * @param item Item being put on the new list (IN) - * - */ -void lam_list_append(lam_list_t *list, lam_list_item_t *item) -{ - /* set new element's previous pointer */ - item->lam_list_prev=list->lam_list_tail.lam_list_prev; - - /* reset previous pointer on current last element */ - list->lam_list_tail.lam_list_prev->lam_list_next=item; - - /* reset new element's next pointer */ - item->lam_list_next=&(list->lam_list_tail); - - /* reset the list's tail element previous pointer */ - list->lam_list_tail.lam_list_prev = item; - - /* increment list element counter */ - list->lam_list_length++; -} - int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx) { /* Adds item to list at index and retains item. */ @@ -127,91 +100,3 @@ int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx) return 1; } -/* - * Adds item to the front of the list and retains item. - * - * @param list List accepting new item (IN/OUT) - * - * @param item Item being put on the new list (IN) - * - */ -void lam_list_prepend(lam_list_t *list, lam_list_item_t *item) -{ - /* reset item's next pointer */ - item->lam_list_next = list->lam_list_head.lam_list_next; - - /* reset item's previous pointer */ - item->lam_list_prev = &(list->lam_list_head); - - /* reset previous first element's previous poiner */ - list->lam_list_head.lam_list_next->lam_list_prev = item; - - /* reset head's next pointer */ - list->lam_list_head.lam_list_next = item; - - /* increment list element counter */ - list->lam_list_length++; -} - -lam_list_item_t *lam_list_remove_first(lam_list_t *list) -{ - /* Removes and returns first item on list. - Caller now owns the item and should release the item - when caller is done with it. - */ - volatile lam_list_item_t *item; - if ( 0 == list->lam_list_length ) - return (lam_list_item_t *)NULL; - - /* reset list length counter */ - list->lam_list_length--; - - /* get pointer to first element on the list */ - item = list->lam_list_head.lam_list_next; - - /* reset previous pointer of next item on the list */ - item->lam_list_next->lam_list_prev=item->lam_list_prev; - - /* reset the head next pointer */ - list->lam_list_head.lam_list_next=item->lam_list_next; - -#if LAM_ENABLE_DEBUG - /* debug code */ - item->lam_list_prev=(lam_list_item_t *)NULL; - item->lam_list_next=(lam_list_item_t *)NULL; -#endif - - return (lam_list_item_t *) item; -} - -lam_list_item_t *lam_list_remove_last(lam_list_t *list) -{ - /* Removes, releases and returns last item on list. - Caller now owns the item and should release the item - when caller is done with it. - */ - volatile lam_list_item_t *item; - - if ( 0 == list->lam_list_length ) - return (lam_list_item_t *)NULL; - - /* reset list length counter */ - list->lam_list_length--; - - /* get item */ - item = list->lam_list_tail.lam_list_prev; - - /* reset previous pointer on next to last pointer */ - item->lam_list_prev->lam_list_next=item->lam_list_next; - - /* reset tail's previous pointer */ - list->lam_list_tail.lam_list_prev=item->lam_list_prev; - -#if LAM_ENABLE_DEBUG - /* debug code */ - item->lam_list_next = item->lam_list_prev = (lam_list_item_t *)NULL; -#endif - - return (lam_list_item_t *) item; -} - diff --git a/src/lfc/lam_list.h b/src/lfc/lam_list.h index a3979eeeff..54d93274b1 100644 --- a/src/lfc/lam_list.h +++ b/src/lfc/lam_list.h @@ -162,7 +162,25 @@ extern "C" { /* * Adds item to the end of the list but does not retain item. */ - void lam_list_append(lam_list_t *list, lam_list_item_t *item); + + static inline void lam_list_append(lam_list_t *list, lam_list_item_t *item) + { + /* set new element's previous pointer */ + item->lam_list_prev=list->lam_list_tail.lam_list_prev; + + /* reset previous pointer on current last element */ + list->lam_list_tail.lam_list_prev->lam_list_next=item; + + /* reset new element's next pointer */ + item->lam_list_next=&(list->lam_list_tail); + + /* reset the list's tail element previous pointer */ + list->lam_list_tail.lam_list_prev = item; + + /* increment list element counter */ + list->lam_list_length++; + } + /* Adds item to list at index and retains item. Returns 1 if successful, 0 otherwise. @@ -170,25 +188,97 @@ extern "C" { Example: if idx = 2 and list = item1->item2->item3->item4, then after insert, list = item1->item2->item->item3->item4 */ - int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx); + int lam_list_insert(lam_list_t *list, lam_list_item_t *item, long long idx); /* * Adds item to the front of the list and retains item. */ - void lam_list_prepend(lam_list_t *list, lam_list_item_t *item); + static inline void lam_list_prepend(lam_list_t *list, lam_list_item_t *item) + { + /* reset item's next pointer */ + item->lam_list_next = list->lam_list_head.lam_list_next; + + /* reset item's previous pointer */ + item->lam_list_prev = &(list->lam_list_head); + + /* reset previous first element's previous poiner */ + list->lam_list_head.lam_list_next->lam_list_prev = item; + + /* reset head's next pointer */ + list->lam_list_head.lam_list_next = item; + + /* increment list element counter */ + list->lam_list_length++; + } + /* * Removes and returns first item on list. */ - lam_list_item_t *lam_list_remove_first(lam_list_t *list); + static inline lam_list_item_t *lam_list_remove_first(lam_list_t *list) + { + /* Removes and returns first item on list. + Caller now owns the item and should release the item + when caller is done with it. + */ + volatile lam_list_item_t *item; + if ( 0 == list->lam_list_length ) + return (lam_list_item_t *)NULL; + + /* reset list length counter */ + list->lam_list_length--; + /* get pointer to first element on the list */ + item = list->lam_list_head.lam_list_next; + + /* reset previous pointer of next item on the list */ + item->lam_list_next->lam_list_prev=item->lam_list_prev; + + /* reset the head next pointer */ + list->lam_list_head.lam_list_next=item->lam_list_next; + +#if LAM_ENABLE_DEBUG + /* debug code */ + item->lam_list_prev=(lam_list_item_t *)NULL; + item->lam_list_next=(lam_list_item_t *)NULL; +#endif + return (lam_list_item_t *) item; + } /* * Removes and returns last item on list. */ - lam_list_item_t *lam_list_remove_last(lam_list_t *list); + static inline lam_list_item_t *lam_list_remove_last(lam_list_t *list) + { + /* Removes, releases and returns last item on list. + Caller now owns the item and should release the item + when caller is done with it. + */ + volatile lam_list_item_t *item; + if ( 0 == list->lam_list_length ) + return (lam_list_item_t *)NULL; + + /* reset list length counter */ + list->lam_list_length--; + + /* get item */ + item = list->lam_list_tail.lam_list_prev; + + /* reset previous pointer on next to last pointer */ + item->lam_list_prev->lam_list_next=item->lam_list_next; + + /* reset tail's previous pointer */ + list->lam_list_tail.lam_list_prev=item->lam_list_prev; + +#if LAM_ENABLE_DEBUG + /* debug code */ + item->lam_list_next = item->lam_list_prev = (lam_list_item_t *)NULL; +#endif + return (lam_list_item_t *) item; + } + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/src/lfc/lam_pointer_array.c b/src/lfc/lam_pointer_array.c index e3a747d162..76c9fb269b 100644 --- a/src/lfc/lam_pointer_array.c +++ b/src/lfc/lam_pointer_array.c @@ -230,45 +230,3 @@ int lam_pointer_array_set_item(lam_pointer_array_t *table, size_t index, return LAM_SUCCESS; } -/** - * lookup pointer by index in pointer table - * - * @param table Pointer to lam_pointer_array_t object (IN) - * @param ptr Pointer to be added to table (IN) - * - * @return Pointer - */ -void *lam_pointer_array_get_item(lam_pointer_array_t *table, size_t index) -{ - void *p; - -#if 0 - lam_output(0,"lam_pointer_array_get_item: IN: " - " table %p (size %ld, lowest free %ld, number free %ld)" - " addr[%d] = %p\n", - table, table->size, table->lowest_free, table->number_free, - index, table->addr[index]); -#endif - - THREAD_LOCK(&(table->lock)); - - assert(table != NULL); - assert(table->addr != NULL); - assert(index >= 0); - assert(index < table->size); - - p = table->addr[index]; - - THREAD_UNLOCK(&(table->lock)); - -#if 0 - lam_output(0,"lam_pointer_array_get_item: OUT:" - " table %p (size %ld, lowest free %ld, number free %ld)" - " addr[%d] = %p\n", - table, table->size, table->lowest_free, table->number_free, - index, table->addr[index]); -#endif - return p; -} - - diff --git a/src/lfc/lam_pointer_array.h b/src/lfc/lam_pointer_array.h index 095fb35132..dc855401cb 100644 --- a/src/lfc/lam_pointer_array.h +++ b/src/lfc/lam_pointer_array.h @@ -64,7 +64,16 @@ int lam_pointer_array_set_item(lam_pointer_array_t *array, * * @return Error code. NULL indicates an error. */ -void *lam_pointer_array_get_item(lam_pointer_array_t *array, size_t index); + +static inline void *lam_pointer_array_get_item(lam_pointer_array_t *table, size_t index) +{ + void *p; + THREAD_LOCK(&(table->lock)); + p = table->addr[index]; + THREAD_UNLOCK(&(table->lock)); + return p; +} + /** * Get the size of the pointer array diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c index 509a227de2..f212abb358 100644 --- a/src/mca/pml/teg/src/pml_teg.c +++ b/src/mca/pml/teg/src/pml_teg.c @@ -209,7 +209,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs) struct mca_ptl_t *ptl = ptl_proc->ptl; double weight; if(ptl->ptl_bandwidth) - weight = total_bandwidth / ptl_proc->ptl->ptl_bandwidth; + weight = ptl_proc->ptl->ptl_bandwidth / total_bandwidth; else weight = 1.0 / n_size; ptl_proc->ptl_weight = (int)(weight * 100); diff --git a/src/mca/pml/teg/src/pml_teg.h b/src/mca/pml/teg/src/pml_teg.h index 261a94416a..d4a3cfda0f 100644 --- a/src/mca/pml/teg/src/pml_teg.h +++ b/src/mca/pml/teg/src/pml_teg.h @@ -15,6 +15,7 @@ #include "request/request.h" #include "mca/pml/pml.h" #include "mca/pml/base/pml_base_request.h" +#include "mca/ptl/base/ptl_base_sendreq.h" #include "mca/ptl/ptl.h" #define MCA_PML_TEG_STATISTICS 0 @@ -223,5 +224,31 @@ extern int mca_pml_teg_free( lam_request_t** request ); +#define MCA_PML_TEG_FREE(request) \ +{ \ + mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \ + pml_request->req_free_called = true; \ + if(pml_request->req_pml_done == true) \ + { \ + switch(pml_request->req_type) { \ + case MCA_PML_REQUEST_SEND: \ + { \ + mca_ptl_base_send_request_t* sendreq = (mca_ptl_base_send_request_t*)pml_request; \ + mca_ptl_t* ptl = sendreq->req_owner; \ + ptl->ptl_request_return(ptl, sendreq); \ + break; \ + } \ + case MCA_PML_REQUEST_RECV: \ + { \ + LAM_FREE_LIST_RETURN(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)pml_request); \ + break; \ + } \ + default: \ + break; \ + } \ + } \ + *(request) = NULL; \ +} + #endif diff --git a/src/mca/pml/teg/src/pml_teg_free.c b/src/mca/pml/teg/src/pml_teg_free.c index 743fb9e7c8..9b4df2691b 100644 --- a/src/mca/pml/teg/src/pml_teg_free.c +++ b/src/mca/pml/teg/src/pml_teg_free.c @@ -4,31 +4,9 @@ #include "mca/ptl/base/ptl_base_sendreq.h" - int mca_pml_teg_free(lam_request_t** request) { - mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)request; - pml_request->req_free_called = true; - if(pml_request->req_pml_done == true) - { - switch(pml_request->req_type) { - case MCA_PML_REQUEST_SEND: - { - mca_ptl_base_send_request_t* sendreq = (mca_ptl_base_send_request_t*)pml_request; - mca_ptl_t* ptl = sendreq->req_owner; - ptl->ptl_request_return(ptl, sendreq); - break; - } - case MCA_PML_REQUEST_RECV: - { - lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)pml_request); - break; - } - default: - break; - } - } - *request = NULL; + MCA_PML_TEG_FREE(request); return LAM_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_irecv.c b/src/mca/pml/teg/src/pml_teg_irecv.c index 26133a8fd1..e3c98c8de6 100644 --- a/src/mca/pml/teg/src/pml_teg_irecv.c +++ b/src/mca/pml/teg/src/pml_teg_irecv.c @@ -11,11 +11,12 @@ int mca_pml_teg_irecv_init( struct lam_request_t **request) { int rc; - mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); + mca_ptl_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); if(NULL == recvreq) return rc; - mca_ptl_base_recv_request_init( + MCA_PTL_BASE_RECV_REQUEST_INIT( recvreq, addr, count, @@ -40,14 +41,15 @@ int mca_pml_teg_irecv( { int rc; - mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); + mca_ptl_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); #if MCA_PML_TEG_STATISTICS mca_pml_teg.teg_irecvs++; #endif if(NULL == recvreq) return rc; - mca_ptl_base_recv_request_init( + MCA_PTL_BASE_RECV_REQUEST_INIT( recvreq, addr, count, @@ -58,7 +60,7 @@ int mca_pml_teg_irecv( false); if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) { - mca_pml_teg_recv_request_return(recvreq); + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); return rc; } *request = (lam_request_t*)recvreq; @@ -76,14 +78,15 @@ int mca_pml_teg_recv( lam_status_public_t* status) { int rc, index; - mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); -#if MCA_PML_TEG_STATISTICS + mca_ptl_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); + #if MCA_PML_TEG_STATISTICS mca_pml_teg.teg_recvs++; -#endif + #endif if(NULL == recvreq) return rc; - - mca_ptl_base_recv_request_init( + + MCA_PTL_BASE_RECV_REQUEST_INIT( recvreq, addr, count, @@ -94,9 +97,27 @@ int mca_pml_teg_recv( false); if((rc = mca_pml_teg_recv_request_start(recvreq)) != LAM_SUCCESS) { - mca_pml_teg_recv_request_return(recvreq); + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); return rc; } - return mca_pml_teg_wait(1, (lam_request_t**)&recvreq, &index, status); + + if(recvreq->super.req_mpi_done == false) { + /* give up and sleep until completion */ + if(lam_using_threads()) { + lam_mutex_lock(&mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting++; + while(recvreq->super.req_mpi_done == false) + lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + } else { + mca_pml_teg.teg_request_waiting++; + while(recvreq->super.req_mpi_done == false) + lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + } + } + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); + return LAM_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_isend.c b/src/mca/pml/teg/src/pml_teg_isend.c index 50a6ce7b06..80eb6a7f64 100644 --- a/src/mca/pml/teg/src/pml_teg_isend.c +++ b/src/mca/pml/teg/src/pml_teg_isend.c @@ -19,11 +19,12 @@ int mca_pml_teg_isend_init( { int rc; - mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); + mca_ptl_base_send_request_t* sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc); if(rc != LAM_SUCCESS) return rc; - mca_ptl_base_send_request_init( + MCA_PTL_BASE_SEND_REQUEST_INIT( sendreq, buf, count, @@ -51,13 +52,14 @@ int mca_pml_teg_isend( lam_request_t **request) { int rc; - mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); + mca_ptl_base_send_request_t* sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc); #if MCA_PML_TEG_STATISTICS mca_pml_teg.teg_isends++; #endif if(rc != LAM_SUCCESS) return rc; - mca_ptl_base_send_request_init( + MCA_PTL_BASE_SEND_REQUEST_INIT( sendreq, buf, count, @@ -86,14 +88,15 @@ int mca_pml_teg_send( lam_communicator_t* comm) { int rc, index; - mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); + mca_ptl_base_send_request_t* sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc); #if MCA_PML_TEG_STATISTICS mca_pml_teg.teg_sends++; #endif if(rc != LAM_SUCCESS) return rc; - mca_ptl_base_send_request_init( + MCA_PTL_BASE_SEND_REQUEST_INIT( sendreq, buf, count, @@ -105,8 +108,30 @@ int mca_pml_teg_send( false ); - if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) + if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) { + MCA_PML_TEG_FREE((lam_request_t**)&sendreq); return rc; - return mca_pml_teg_wait(1, (lam_request_t**)&sendreq, &index, NULL); + } + + if(sendreq->super.req_mpi_done == false) { + /* give up and sleep until completion */ + if(lam_using_threads()) { + lam_mutex_lock(&mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting++; + while(sendreq->super.req_mpi_done == false) + lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + } else { + mca_pml_teg.teg_request_waiting++; + while(sendreq->super.req_mpi_done == false) + lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + mca_pml_teg.teg_request_waiting--; + } + } + + /* return request to pool */ + MCA_PML_TEG_FREE((lam_request_t**)&sendreq); + return LAM_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.c b/src/mca/pml/teg/src/pml_teg_recvreq.c index 30a9528a48..77bb379dc3 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/pml/teg/src/pml_teg_recvreq.c @@ -6,7 +6,7 @@ void mca_pml_teg_recv_request_progress( mca_ptl_base_recv_request_t* req, mca_ptl_base_recv_frag_t* frag) { - lam_mutex_lock(&mca_pml_teg.teg_request_lock); + THREAD_LOCK(&mca_pml_teg.teg_request_lock); req->req_bytes_delivered += frag->super.frag_size; req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length; if (req->req_bytes_received >= req->req_bytes_msg) { @@ -24,6 +24,6 @@ void mca_pml_teg_recv_request_progress( lam_condition_broadcast(&mca_pml_teg.teg_request_cond); } } - lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); } diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.h b/src/mca/pml/teg/src/pml_teg_recvreq.h index 0914ba584b..4b70fab99c 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.h +++ b/src/mca/pml/teg/src/pml_teg_recvreq.h @@ -19,20 +19,20 @@ * @param rc (OUT) LAM_SUCCESS or error status on failure. * @return Receive request. */ -static inline mca_ptl_base_recv_request_t* mca_pml_teg_recv_request_alloc(int *rc) -{ - return (mca_ptl_base_recv_request_t*)lam_free_list_get(&mca_pml_teg.teg_recv_requests, rc); -} +#define MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc) \ + { \ + lam_list_item_t* item; \ + LAM_FREE_LIST_GET(&mca_pml_teg.teg_recv_requests, item, rc); \ + recvreq = (mca_ptl_base_recv_request_t*)item; \ + } /** * Return a recv request to the modules free list. * * @param request (IN) Receive request. */ -static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t* request) -{ - lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request); -} +#define MCA_PML_TEG_RECV_REQUEST_RETURN(request) \ + LAM_FREE_LIST_RETURN(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request); /** * Start an initialized request. diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.c b/src/mca/pml/teg/src/pml_teg_sendreq.c index 159546f760..22a478db84 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/pml/teg/src/pml_teg_sendreq.c @@ -47,9 +47,7 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req) * previously assigned) */ else { - bytes_to_frag = (ptl_proc->ptl_weight * req->req_bytes_msg) / 100; - if(bytes_to_frag > bytes_remaining) - bytes_to_frag = bytes_remaining; + bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100; } rc = ptl->ptl_send(ptl, ptl_proc->ptl_peer, req, bytes_to_frag, 0); @@ -59,12 +57,12 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req) /* unable to complete send - signal request failed */ if(bytes_remaining > 0) { - lam_mutex_lock(&mca_pml_teg.teg_request_lock); + THREAD_LOCK(&mca_pml_teg.teg_request_lock); req->super.req_mpi_done = true; /* FIX - set status correctly */ if(mca_pml_teg.teg_request_waiting) lam_condition_broadcast(&mca_pml_teg.teg_request_cond); - lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); } } @@ -73,7 +71,7 @@ void mca_pml_teg_send_request_progress( mca_ptl_base_send_request_t* req, mca_ptl_base_send_frag_t* frag) { - lam_mutex_lock(&mca_pml_teg.teg_request_lock); + THREAD_LOCK(&mca_pml_teg.teg_request_lock); req->req_bytes_sent += frag->super.frag_size; if (req->req_bytes_sent >= req->req_bytes_msg) { req->super.req_pml_done = true; @@ -87,11 +85,11 @@ void mca_pml_teg_send_request_progress( lam_condition_broadcast(&mca_pml_teg.teg_request_cond); } } else if (req->super.req_free_called) - mca_pml_teg_free((lam_request_t**)&req); - lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + MCA_PML_TEG_FREE((lam_request_t**)&req); + THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); return; } - lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); /* if first fragment - schedule remaining fragments */ if(req->req_frags == 1) { diff --git a/src/mca/pml/teg/src/pml_teg_sendreq.h b/src/mca/pml/teg/src/pml_teg_sendreq.h index 6753278189..7bd3326de4 100644 --- a/src/mca/pml/teg/src/pml_teg_sendreq.h +++ b/src/mca/pml/teg/src/pml_teg_sendreq.h @@ -16,30 +16,26 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req); -static inline mca_ptl_base_send_request_t* mca_pml_teg_send_request_alloc( - lam_communicator_t* comm, - int dst, - int *rc) -{ - mca_ptl_base_send_request_t* sendreq; - mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); - mca_ptl_proc_t* ptl_proc; - mca_ptl_t* ptl; - - THREAD_SCOPED_LOCK(&proc->proc_lock, - (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); - ptl = ptl_proc->ptl; - *rc = ptl->ptl_request_alloc(ptl,&sendreq); - if(NULL != sendreq) - sendreq->req_peer = ptl_proc->ptl_peer; - return sendreq; +#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \ + comm, \ + dst, \ + sendreq, \ + rc) \ +{ \ + mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \ + mca_ptl_proc_t* ptl_proc; \ + mca_ptl_t* ptl; \ +\ + THREAD_SCOPED_LOCK(&proc->proc_lock, \ + (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \ + ptl = ptl_proc->ptl; \ + rc = ptl->ptl_request_alloc(ptl,&sendreq); \ + if(NULL != sendreq) \ + sendreq->req_peer = ptl_proc->ptl_peer; \ } -static inline void mca_pml_teg_send_request_return( - mca_ptl_base_send_request_t* request) -{ +#define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \ request->req_owner->ptl_request_return(request->req_owner, request); -} static inline int mca_pml_teg_send_request_start( mca_ptl_base_send_request_t* req) diff --git a/src/mca/pml/teg/src/pml_teg_test.c b/src/mca/pml/teg/src/pml_teg_test.c index 555b88179e..9331a0946f 100644 --- a/src/mca/pml/teg/src/pml_teg_test.c +++ b/src/mca/pml/teg/src/pml_teg_test.c @@ -19,7 +19,7 @@ int mca_pml_teg_test( if (NULL != status) *status = pml_request->req_status; if(false == pml_request->req_persistent) - mca_pml_teg_free(requests+i); + MCA_PML_TEG_FREE(requests+i); } } @@ -60,7 +60,7 @@ int mca_pml_teg_test_all( } else { statuses[i] = pml_request->req_status; if(false == pml_request->req_persistent) - mca_pml_teg_free(requests+i); + MCA_PML_TEG_FREE(requests+i); } } } else { @@ -68,7 +68,7 @@ int mca_pml_teg_test_all( for(i=0; ireq_persistent) - mca_pml_teg_free(requests+i); + MCA_PML_TEG_FREE(requests+i); } } return LAM_SUCCESS; diff --git a/src/mca/pml/teg/src/pml_teg_wait.c b/src/mca/pml/teg/src/pml_teg_wait.c index 082cd64042..ddd15baeb8 100644 --- a/src/mca/pml/teg/src/pml_teg_wait.c +++ b/src/mca/pml/teg/src/pml_teg_wait.c @@ -35,7 +35,7 @@ int mca_pml_teg_wait( if(completed < 0) { /* give up and sleep until completion */ - lam_mutex_lock(&mca_pml_teg.teg_request_lock); + THREAD_LOCK(&mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting++; do { for(i=0; ireq_persistent) { - mca_pml_teg_free(request); + MCA_PML_TEG_FREE(request); } if (NULL != status) { *status = pml_request->req_status; @@ -87,7 +87,7 @@ int mca_pml_teg_wait_all( * acquire lock and test for completion - if all requests are not completed * pend on condition variable until a request completes */ - lam_mutex_lock(&mca_pml_teg.teg_request_lock); + THREAD_LOCK(&mca_pml_teg.teg_request_lock); mca_pml_teg.teg_request_waiting++; do { completed = 0; @@ -102,7 +102,7 @@ int mca_pml_teg_wait_all( lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); } while (completed != count); mca_pml_teg.teg_request_waiting--; - lam_mutex_unlock(&mca_pml_teg.teg_request_lock); + THREAD_UNLOCK(&mca_pml_teg.teg_request_lock); } if(NULL != statuses) { @@ -114,7 +114,7 @@ int mca_pml_teg_wait_all( } else { statuses[i] = pml_request->req_status; if (false == pml_request->req_persistent) { - mca_pml_teg_free(&requests[i]); + MCA_PML_TEG_FREE(&requests[i]); } } } @@ -123,7 +123,7 @@ int mca_pml_teg_wait_all( for(i=0; ireq_persistent) { - mca_pml_teg_free(&requests[i]); + MCA_PML_TEG_FREE(&requests[i]); } } } diff --git a/src/mca/ptl/base/ptl_base_comm.h b/src/mca/ptl/base/ptl_base_comm.h index 61d6faa973..73076d31e8 100644 --- a/src/mca/ptl/base/ptl_base_comm.h +++ b/src/mca/ptl/base/ptl_base_comm.h @@ -53,9 +53,7 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size); static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst) { mca_ptl_sequence_t sequence; - lam_mutex_lock(&comm->c_matching_lock); - sequence = comm->c_msg_seq[dst]++; - lam_mutex_unlock(&comm->c_matching_lock); + THREAD_SCOPED_LOCK(&comm->c_matching_lock, sequence = comm->c_msg_seq[dst]++); return sequence; } diff --git a/src/mca/ptl/base/ptl_base_recvreq.h b/src/mca/ptl/base/ptl_base_recvreq.h index 494ee437c7..1cb02c8116 100644 --- a/src/mca/ptl/base/ptl_base_recvreq.h +++ b/src/mca/ptl/base/ptl_base_recvreq.h @@ -37,31 +37,31 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t; * @param comm (IN) Communicator. * @param persistent (IN) Is this a ersistent request. */ -static inline void mca_ptl_base_recv_request_init( - mca_ptl_base_recv_request_t *request, - void *addr, - size_t count, - lam_datatype_t* datatype, - int src, - int tag, - lam_communicator_t* comm, - bool persistent) -{ - request->req_bytes_msg = 0; - request->req_bytes_received = 0; - request->req_bytes_delivered = 0; - request->super.req_sequence = 0; - request->super.req_addr = addr; - request->super.req_count = count; - request->super.req_datatype = datatype; - request->super.req_peer = src; - request->super.req_tag = tag; - request->super.req_comm = comm; - request->super.req_proc = NULL; - request->super.req_persistent = persistent; - request->super.req_mpi_done = false; - request->super.req_pml_done = false; - request->super.req_free_called = false; +#define MCA_PTL_BASE_RECV_REQUEST_INIT( \ + request, \ + addr, \ + count, \ + datatype, \ + src, \ + tag, \ + comm, \ + persistent) \ +{ \ + request->req_bytes_msg = 0; \ + request->req_bytes_received = 0; \ + request->req_bytes_delivered = 0; \ + request->super.req_sequence = 0; \ + request->super.req_addr = addr; \ + request->super.req_count = count; \ + request->super.req_datatype = datatype; \ + request->super.req_peer = src; \ + request->super.req_tag = tag; \ + request->super.req_comm = comm; \ + request->super.req_proc = NULL; \ + request->super.req_persistent = persistent; \ + request->super.req_mpi_done = false; \ + request->super.req_pml_done = false; \ + request->super.req_free_called = false; \ } /** diff --git a/src/mca/ptl/base/ptl_base_sendreq.h b/src/mca/ptl/base/ptl_base_sendreq.h index 67657cabed..73bc35ea84 100644 --- a/src/mca/ptl/base/ptl_base_sendreq.h +++ b/src/mca/ptl/base/ptl_base_sendreq.h @@ -50,51 +50,51 @@ typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; * @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY) * @param persistent (IN) Is request persistent. */ -static inline void mca_ptl_base_send_request_init( - mca_ptl_base_send_request_t *request, - void *addr, - size_t count, - lam_datatype_t* datatype, - int peer, - int tag, - lam_communicator_t* comm, - mca_pml_base_send_mode_t mode, - bool persistent) -{ - request->req_offset = 0; - request->req_frags = 0; - request->req_bytes_sent = 0; - request->req_send_mode = mode; - request->req_peer_request.lval = 0; - request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); - request->super.req_addr = addr; - request->super.req_count = count; - request->super.req_datatype = datatype; - request->super.req_peer = peer; - request->super.req_tag = tag; - request->super.req_comm = comm; - request->super.req_proc = lam_comm_peer_lookup(comm,peer); - request->super.req_persistent = persistent; - request->super.req_mpi_done = false; - request->super.req_pml_done = false; - request->super.req_free_called = false; - - /* initialize datatype convertor for this request */ - if(count > 0) { - int packed_size; - lam_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); - lam_convertor_init_for_send( - &request->req_convertor, - 0, - request->super.req_datatype, - request->super.req_count, - request->super.req_addr, - 0); - lam_convertor_get_packed_size(&request->req_convertor, &packed_size); - request->req_bytes_msg = packed_size; - } else { - request->req_bytes_msg = 0; - } +#define MCA_PTL_BASE_SEND_REQUEST_INIT( \ + request, \ + addr, \ + count, \ + datatype, \ + peer, \ + tag, \ + comm, \ + mode,\ + persistent) \ +{ \ + request->req_offset = 0; \ + request->req_frags = 0; \ + request->req_bytes_sent = 0; \ + request->req_send_mode = mode; \ + request->req_peer_request.lval = 0; \ + request->super.req_sequence = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); \ + request->super.req_addr = addr; \ + request->super.req_count = count; \ + request->super.req_datatype = datatype; \ + request->super.req_peer = peer; \ + request->super.req_tag = tag; \ + request->super.req_comm = comm; \ + request->super.req_proc = lam_comm_peer_lookup(comm,peer); \ + request->super.req_persistent = persistent; \ + request->super.req_mpi_done = false; \ + request->super.req_pml_done = false; \ + request->super.req_free_called = false; \ +\ + /* initialize datatype convertor for this request */ \ + if(count > 0) { \ + int packed_size; \ + lam_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); \ + lam_convertor_init_for_send( \ + &request->req_convertor, \ + 0, \ + request->super.req_datatype, \ + request->super.req_count, \ + request->super.req_addr, \ + 0); \ + lam_convertor_get_packed_size(&request->req_convertor, &packed_size); \ + request->req_bytes_msg = packed_size; \ + } else { \ + request->req_bytes_msg = 0; \ + } \ } diff --git a/src/mca/ptl/tcp/src/ptl_tcp.c b/src/mca/ptl/tcp/src/ptl_tcp.c index b47023a288..0be9d26efc 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp.c +++ b/src/mca/ptl/tcp/src/ptl_tcp.c @@ -98,8 +98,9 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re { int rc; mca_ptl_base_send_request_t* sendreq; - sendreq = (mca_ptl_base_send_request_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_requests, &rc); - if(NULL != sendreq) + lam_list_item_t* item; + LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_requests, item, rc); + if(NULL != (sendreq = (mca_ptl_base_send_request_t*)item)) sendreq->req_owner = ptl; *request = sendreq; return rc; @@ -109,7 +110,7 @@ int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_re void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request) { /* OBJ_DESTRUCT(&request->req_convertor); */ - lam_free_list_return(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_requests, (lam_list_item_t*)request); } @@ -118,7 +119,7 @@ void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv if(frag->super.frag_is_buffered) free(frag->super.super.frag_addr); /* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */ - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); } @@ -130,7 +131,7 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send pending = (mca_ptl_tcp_recv_frag_t*)lam_list_remove_first(&mca_ptl_tcp_module.tcp_pending_acks); if(NULL == pending) { THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock); - lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); return; } THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock); @@ -138,7 +139,7 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag); mca_ptl_tcp_recv_frag_return(ptl, pending); } else { - lam_free_list_return(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (lam_list_item_t*)frag); } } @@ -161,8 +162,9 @@ int mca_ptl_tcp_send( sendfrag = &((mca_ptl_tcp_send_request_t*)sendreq)->req_frag; } else { int rc; - sendfrag = (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, &rc); - if(NULL == sendfrag) + lam_list_item_t* item; + LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc); + if(NULL == (sendfrag = (mca_ptl_tcp_send_frag_t*)item)) return rc; } mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, size, flags); @@ -183,8 +185,12 @@ void mca_ptl_tcp_recv( mca_ptl_base_header_t* header = &frag->super.frag_header; if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { int rc; - mca_ptl_tcp_send_frag_t* ack = mca_ptl_tcp_send_frag_alloc(&rc); + mca_ptl_tcp_send_frag_t* ack; mca_ptl_tcp_recv_frag_t* recv_frag = (mca_ptl_tcp_recv_frag_t*)frag; + lam_list_item_t* item; + MCA_PTL_TCP_SEND_FRAG_ALLOC(item, rc); + ack = (mca_ptl_tcp_send_frag_t*)item; + if(NULL == ack) { THREAD_LOCK(&mca_ptl_tcp_module.tcp_lock); recv_frag->frag_ack_pending = true; diff --git a/src/mca/ptl/tcp/src/ptl_tcp_module.c b/src/mca/ptl/tcp/src/ptl_tcp_module.c index 494494905b..937cc3fea5 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_module.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_module.c @@ -178,14 +178,16 @@ int mca_ptl_tcp_module_close(void) * Create a ptl instance and add to modules list. */ -static int mca_ptl_tcp_create(int if_index) +static int mca_ptl_tcp_create(int if_index, const char* if_name) { mca_ptl_tcp_t* ptl = malloc(sizeof(mca_ptl_tcp_t)); + char param[256]; + char *value; if(NULL == ptl) return LAM_ERR_OUT_OF_RESOURCE; memcpy(ptl, &mca_ptl_tcp, sizeof(mca_ptl_tcp)); mca_ptl_tcp_module.tcp_ptls[mca_ptl_tcp_module.tcp_num_ptls++] = ptl; - + /* initialize the ptl */ ptl->ptl_ifindex = if_index; #if MCA_PTL_TCP_STATISTICS @@ -195,6 +197,19 @@ static int mca_ptl_tcp_create(int if_index) #endif lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->ptl_ifaddr, sizeof(ptl->ptl_ifaddr)); lam_ifindextomask(if_index, (struct sockaddr*)&ptl->ptl_ifmask, sizeof(ptl->ptl_ifmask)); + + /* allow user to specify interface bandwidth */ + sprintf(param, "bandwidth_%s", if_name); + ptl->super.ptl_bandwidth = mca_ptl_tcp_param_register_int(param, 0); + + /* allow user to override/specify latency ranking */ + sprintf(param, "latency_%s", if_name); + ptl->super.ptl_latency = mca_ptl_tcp_param_register_int(param, 0); + +#if LAM_ENABLE_DEBUG + lam_output(0,"interface: %s bandwidth %d latency %d\n", + if_name, ptl->super.ptl_bandwidth, ptl->super.ptl_latency); +#endif return LAM_SUCCESS; } @@ -232,8 +247,7 @@ static int mca_ptl_tcp_module_create_instances(void) if(if_index < 0) { lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name); } else { - lam_output(0,"interface: %s\n", if_name); - mca_ptl_tcp_create(if_index); + mca_ptl_tcp_create(if_index, if_name); } argv++; } @@ -258,8 +272,7 @@ static int mca_ptl_tcp_module_create_instances(void) } /* if this interface was not found in the excluded list - create a PTL */ if(argv == 0 || *argv == 0) { - lam_output(0,"interface: %s\n", if_name); - mca_ptl_tcp_create(if_index); + mca_ptl_tcp_create(if_index, if_name); } } lam_argv_free(exclude); @@ -282,6 +295,7 @@ static int mca_ptl_tcp_module_create_listen(void) lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno); return LAM_ERROR; } + mca_ptl_tcp_set_socket_options(mca_ptl_tcp_module.tcp_listen_sd); /* bind to all addresses and dynamically assigned port */ memset(&inaddr, 0, sizeof(inaddr)); @@ -307,7 +321,7 @@ static int mca_ptl_tcp_module_create_listen(void) lam_output(0, "mca_ptl_tcp_module_init: listen() failed with errno=%d", errno); return LAM_ERROR; } - + /* set socket up to be non-blocking, otherwise accept could block */ if((flags = fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_GETFL, 0)) < 0) { lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_GETFL) failed with errno=%d", errno); @@ -368,7 +382,7 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls, *num_ptls = 0; *allow_multi_user_threads = true; - *have_hidden_threads = true; + *have_hidden_threads = LAM_HAVE_THREADS; if((rc = lam_event_init()) != LAM_SUCCESS) { lam_output(0, "mca_ptl_tcp_module_init: unable to initialize event dispatch thread: %d\n", rc); diff --git a/src/mca/ptl/tcp/src/ptl_tcp_peer.c b/src/mca/ptl/tcp/src/ptl_tcp_peer.c index 10a2eac065..2b98ec0899 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_peer.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_peer.c @@ -165,7 +165,9 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t lam_list_append(&ptl_peer->peer_frags, (lam_list_item_t*)frag); } else { if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) { + THREAD_UNLOCK(&ptl_peer->peer_send_lock); mca_ptl_tcp_send_frag_progress(frag); + return rc; } else { ptl_peer->peer_send_frag = frag; lam_event_add(&ptl_peer->peer_send_event, 0); @@ -178,7 +180,6 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t } - /* * A blocking send on a non-blocking socket. Used to send the small amount of connection * information that identifies the peers endpoint. @@ -384,28 +385,28 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer) void mca_ptl_tcp_set_socket_options(int sd) { int optval; +#if defined(TCP_NODELAY) + optval = 1; + if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) < 0) { + lam_output(0, + "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELAY) failed with errno=%d\n", + errno); + } +#endif #if defined(SO_SNDBUF) if(mca_ptl_tcp_module.tcp_sndbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&mca_ptl_tcp_module.tcp_sndbuf, sizeof(int)) < 0) { - lam_output(0, "mca_ptl_tcp_set_socket_options: SO_SNDBUF option: errno %d\n", errno); + lam_output(0, + "mca_ptl_tcp_set_socket_options: SO_SNDBUF option: errno %d\n", + errno); } #endif #if defined(SO_RCVBUF) if(mca_ptl_tcp_module.tcp_rcvbuf > 0 && setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&mca_ptl_tcp_module.tcp_rcvbuf, sizeof(int)) < 0) { - lam_output(0, "mca_ptl_tcp_set_socket_options: SO_RCVBUF option: errno %d\n", errno); - } -#endif -#if defined(TCP_NODELAY) - optval = 1; - if(setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) < 0) { - lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELAY) failed with errno=%d\n", errno); - } -#endif -#if defined(TCP_NODELACK) - optval = 1; - if(setsockopt(sd, IPPROTO_TCP, TCP_NODELACK, &optval, sizeof(optval)) < 0) { - lam_output(0, "mca_ptl_tcp_set_socket_options: setsockopt(TCP_NODELACK) failed with errno=%d\n", errno); + lam_output(0, + "mca_ptl_tcp_set_socket_options: SO_RCVBUF option: errno %d\n", + errno); } #endif } @@ -532,7 +533,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user) mca_ptl_tcp_recv_frag_t* recv_frag = ptl_peer->peer_recv_frag; if(NULL == recv_frag) { int rc; - recv_frag = mca_ptl_tcp_recv_frag_alloc(&rc); + MCA_PTL_TCP_RECV_FRAG_ALLOC(recv_frag, rc); if(NULL == recv_frag) { PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock); return; @@ -581,7 +582,9 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user) } /* if required - update request status and release fragment */ + THREAD_UNLOCK(&ptl_peer->peer_send_lock); mca_ptl_tcp_send_frag_progress(frag); + THREAD_LOCK(&ptl_peer->peer_send_lock); /* progress any pending sends */ ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*) diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c index 13d693d2ce..a791880349 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -11,10 +11,6 @@ #include "ptl_tcp_sendfrag.h" -#define frag_header super.super.frag_header -#define frag_peer super.super.frag_peer -#define frag_owner super.super.frag_owner - static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag); @@ -46,12 +42,12 @@ static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag) void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer) { - frag->frag_owner = &peer->peer_ptl->super; - frag->super.frag_request = 0; + frag->super.super.frag_owner = &peer->peer_ptl->super; frag->super.super.frag_addr = NULL; frag->super.super.frag_size = 0; + frag->super.super.frag_peer = peer; + frag->super.frag_request = 0; frag->super.frag_is_buffered = false; - frag->frag_peer = peer; frag->frag_hdr_cnt = 0; frag->frag_msg_cnt = 0; frag->frag_ack_pending = false; @@ -66,7 +62,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false) return false; - switch(frag->frag_header.hdr_common.hdr_type) { + switch(frag->super.super.frag_header.hdr_common.hdr_type) { case MCA_PTL_HDR_TYPE_MATCH: return mca_ptl_tcp_recv_frag_match(frag, sd); case MCA_PTL_HDR_TYPE_FRAG: @@ -76,7 +72,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) return mca_ptl_tcp_recv_frag_ack(frag, sd); default: lam_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X", - *(unsigned long*)&frag->frag_header); + *(unsigned long*)&frag->super.super.frag_header); return false; } } @@ -85,12 +81,12 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size) { /* non-blocking read - continue if interrupted, otherwise wait until data available */ - unsigned char* ptr = (unsigned char*)&frag->frag_header; + unsigned char* ptr = (unsigned char*)&frag->super.super.frag_header; while(frag->frag_hdr_cnt < size) { int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0); if(cnt == 0) { - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } if(cnt < 0) { @@ -102,8 +98,8 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, return false; default: lam_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno); - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } } @@ -120,11 +116,11 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd) { mca_ptl_tcp_send_frag_t* sendfrag; mca_ptl_base_send_request_t* sendreq; - sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_header.hdr_ack.hdr_src_ptr.pval; + sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval; sendreq = sendfrag->super.frag_request; - sendreq->req_peer_request = frag->frag_header.hdr_ack.hdr_dst_ptr; - sendfrag->frag_owner->ptl_send_progress(sendreq, &sendfrag->super); - mca_ptl_tcp_recv_frag_return(frag->frag_owner, frag); + sendreq->req_peer_request = frag->super.super.frag_header.hdr_ack.hdr_dst_ptr; + mca_ptl_tcp_send_frag_progress(sendfrag); + mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag); return true; } @@ -134,13 +130,13 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) /* first pass through - attempt a match */ if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) { /* attempt to match a posted recv */ - if(mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match)) { + if(mca_ptl_base_recv_frag_match(&frag->super, &frag->super.super.frag_header.hdr_match)) { mca_ptl_tcp_recv_frag_matched(frag); } else { /* match was not made - so allocate buffer for eager send */ - if(frag->frag_header.hdr_frag.hdr_frag_length > 0) { - frag->super.super.frag_addr = malloc(frag->frag_header.hdr_frag.hdr_frag_length); - frag->super.super.frag_size = frag->frag_header.hdr_frag.hdr_frag_length; + if(frag->super.super.frag_header.hdr_frag.hdr_frag_length > 0) { + frag->super.super.frag_addr = malloc(frag->super.super.frag_header.hdr_frag.hdr_frag_length); + frag->super.super.frag_size = frag->super.super.frag_header.hdr_frag.hdr_frag_length; frag->super.frag_is_buffered = true; } } @@ -154,7 +150,7 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) } /* discard any data that exceeds the posted receive */ - if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) + if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) { return false; } @@ -170,7 +166,7 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* get request from header */ if(frag->frag_msg_cnt == 0) { - frag->super.frag_request = frag->frag_header.hdr_frag.hdr_dst_ptr.pval; + frag->super.frag_request = frag->super.super.frag_header.hdr_frag.hdr_dst_ptr.pval; mca_ptl_tcp_recv_frag_matched(frag); } @@ -180,7 +176,7 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; } - if(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) + if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) return false; @@ -201,8 +197,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt, frag->super.super.frag_size-frag->frag_msg_cnt, 0); if(cnt == 0) { - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } if(cnt < 0) { @@ -213,8 +209,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; default: lam_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno); - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } } @@ -234,13 +230,14 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd) { - while(frag->frag_msg_cnt < frag->frag_header.hdr_frag.hdr_frag_length) { - void *rbuf = malloc(frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt); - int cnt = recv(sd, rbuf, frag->frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt, 0); + while(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) { + size_t count = frag->super.super.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt; + void *rbuf = malloc(count); + int cnt = recv(sd, rbuf, count, 0); free(rbuf); if(cnt == 0) { - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } if(cnt < 0) { @@ -252,8 +249,8 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; default: lam_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno); - mca_ptl_tcp_peer_close(frag->frag_peer); - lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); + mca_ptl_tcp_peer_close(frag->super.super.frag_peer); + LAM_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); return false; } } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h index 42142bedff..4b071b4fa1 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.h @@ -34,10 +34,13 @@ struct mca_ptl_tcp_recv_frag_t { typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; -static inline mca_ptl_tcp_recv_frag_t* mca_ptl_tcp_recv_frag_alloc(int* rc) -{ - return (mca_ptl_tcp_recv_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_recv_frags, rc); -} +#define MCA_PTL_TCP_RECV_FRAG_ALLOC(frag, rc) \ + { \ + lam_list_item_t* item; \ + LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_recv_frags, item, rc); \ + frag = (mca_ptl_tcp_recv_frag_t*)item; \ + } + bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t*, int sd); void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, struct mca_ptl_base_peer_t* peer); @@ -88,43 +91,50 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag) } -static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag) -{ - if(frag->frag_msg_cnt >= frag->super.super.frag_header.hdr_frag.hdr_frag_length) { - if(fetchNset(&frag->frag_progressed, 1) == 0) { - mca_ptl_base_recv_request_t* request = frag->super.frag_request; - - if(frag->super.frag_is_buffered) { - mca_ptl_base_match_header_t* header = &frag->super.super.frag_header.hdr_match; - - /* - * Initialize convertor and use it to unpack data - */ - struct iovec iov; - lam_proc_t *proc = - lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer); - lam_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor); - lam_convertor_init_for_recv( - &frag->super.super.frag_convertor, /* convertor */ - 0, /* flags */ - request->super.req_datatype, /* datatype */ - request->super.req_count, /* count elements */ - request->super.req_addr, /* users buffer */ - header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */ - - - iov.iov_base = frag->super.super.frag_addr; - iov.iov_len = frag->super.super.frag_size; - lam_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1); - } - - /* progress the request */ - frag->super.super.frag_owner->ptl_recv_progress(request, &frag->super); - if(frag->frag_ack_pending == false) { - mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag); - } +static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag) +{ + if((frag)->frag_msg_cnt >= (frag)->super.super.frag_header.hdr_frag.hdr_frag_length) { + mca_ptl_base_recv_request_t* request = (frag)->super.frag_request; + + /* make sure this only happens once for threaded case */ + if(lam_using_threads()) { + if(fetchNset(&(frag)->frag_progressed, 1) == 1) + return; + } else { + if((frag)->frag_progressed == 1) + return; + (frag)->frag_progressed = 1; } - } + + if((frag)->super.frag_is_buffered) { + mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match; + + /* + * Initialize convertor and use it to unpack data + */ + struct iovec iov; + lam_proc_t *proc = + lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer); + lam_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor); + lam_convertor_init_for_recv( + &(frag)->super.super.frag_convertor, /* convertor */ + 0, /* flags */ + request->super.req_datatype, /* datatype */ + request->super.req_count, /* count elements */ + request->super.req_addr, /* users buffer */ + header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */ + + iov.iov_base = (frag)->super.super.frag_addr; + iov.iov_len = (frag)->super.super.frag_size; + lam_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1); + } + + /* progress the request */ + (frag)->super.super.frag_owner->ptl_recv_progress(request, &(frag)->super); + if((frag)->frag_ack_pending == false) { + mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag)); + } + } } #endif diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c index 99bc1cc0ee..3d088e7dcf 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -131,6 +131,7 @@ int mca_ptl_tcp_send_frag_init( sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2; sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr; sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t); + sendfrag->frag_progressed = 0; return LAM_SUCCESS; } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h index 9b7a618def..9f04826f92 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.h @@ -28,14 +28,13 @@ struct mca_ptl_tcp_send_frag_t { struct iovec *frag_vec_ptr; /**< pointer into iovec array */ size_t frag_vec_cnt; /**< number of iovec structs left to process */ struct iovec frag_vec[2]; /**< array of iovecs for send */ + volatile int frag_progressed; /**< for threaded case - has request status been updated */ }; typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t; -static inline mca_ptl_tcp_send_frag_t* mca_ptl_tcp_send_frag_alloc(int* rc) -{ - return (mca_ptl_tcp_send_frag_t*)lam_free_list_get(&mca_ptl_tcp_module.tcp_send_frags, rc); -} +#define MCA_PTL_TCP_SEND_FRAG_ALLOC(item, rc) \ + LAM_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_frags, item, rc); bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd); @@ -49,26 +48,45 @@ int mca_ptl_tcp_send_frag_init( int flags); -static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag) -{ - mca_ptl_base_send_request_t* request = frag->super.frag_request; +/* + * For fragments that require an acknowledgment, this routine will be called + * twice, once when the send completes, and again when the acknowledgment is + * returned. Only the last caller should update the request status, so we + * add a lock w/ the frag_progressed flag. + */ - /* if this is an ack - simply return to pool */ - if(request == NULL) { - mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); +static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag) +{ + mca_ptl_base_send_request_t* request = frag->super.frag_request; - /* otherwise, if an ack is not required or has already been received, update request status */ - } else if ((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 || - mca_ptl_base_send_request_matched(request) == true) { - frag->super.super.frag_owner->ptl_send_progress(request, &frag->super); + /* if this is an ack - simply return to pool */ + if(request == NULL) { + mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); + + /* otherwise, if the message has been sent, and an ack has already been + * received, go ahead and update the request status + */ + } else if (frag->frag_vec_cnt == 0 && + ((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 || + mca_ptl_base_send_request_matched(request))) { + + /* make sure this only happens once in threaded case */ + if (lam_using_threads() && fetchNset(&frag->frag_progressed, 1) == 1) { + return; + } else { + frag->frag_progressed = 1; + } + + /* update request status */ + frag->super.super.frag_owner->ptl_send_progress(request, &frag->super); /* the first fragment is allocated with the request, - * all others need to be returned to free list - */ - if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0) - mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); - } -} + * all others need to be returned to free list + */ + if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0) + mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag); + } +} static inline void mca_ptl_tcp_send_frag_init_ack( diff --git a/src/mem/free_list.h b/src/mem/free_list.h index d5aaa5ada5..2b5352651e 100644 --- a/src/mem/free_list.h +++ b/src/mem/free_list.h @@ -40,27 +40,29 @@ int lam_free_list_init( int lam_free_list_grow(lam_free_list_t* flist, size_t num_elements); -static inline lam_list_item_t *lam_free_list_get(lam_free_list_t * fl, int *rc) -{ - lam_list_item_t* item; - THREAD_LOCK(&fl->fl_lock); - item = lam_list_remove_first(&fl->super); - if(NULL == item) { - lam_free_list_grow(fl, fl->fl_num_per_alloc); - item = lam_list_remove_first(&fl->super); - } - THREAD_UNLOCK(&fl->fl_lock); - *rc = (NULL != item) ? LAM_SUCCESS : LAM_ERR_TEMP_OUT_OF_RESOURCE; - return item; -} +#define LAM_FREE_LIST_GET(fl, item, rc) \ +{ \ + if(lam_using_threads()) { \ + lam_mutex_lock(&((fl)->fl_lock)); \ + item = lam_list_remove_first(&((fl)->super)); \ + if(NULL == item) { \ + lam_free_list_grow((fl), (fl)->fl_num_per_alloc); \ + item = lam_list_remove_first(&((fl)->super)); \ + } \ + lam_mutex_unlock(&((fl)->fl_lock)); \ + } else { \ + item = lam_list_remove_first(&((fl)->super)); \ + if(NULL == item) { \ + lam_free_list_grow((fl), (fl)->fl_num_per_alloc); \ + item = lam_list_remove_first(&((fl)->super)); \ + } \ + } \ + rc = (NULL == item) ? LAM_ERR_TEMP_OUT_OF_RESOURCE : LAM_SUCCESS; \ +} -static inline int lam_free_list_return(lam_free_list_t *fl, lam_list_item_t *item) -{ - THREAD_LOCK(&fl->fl_lock); - lam_list_append(&fl->super, item); - THREAD_UNLOCK(&fl->fl_lock); - return LAM_SUCCESS; -} + +#define LAM_FREE_LIST_RETURN(fl, item) \ + THREAD_SCOPED_LOCK(&((fl)->fl_lock), lam_list_append(&((fl)->super), (item))); #endif diff --git a/src/threads/condition_spinlock.h b/src/threads/condition_spinlock.h index 417b3df245..c97c947b08 100644 --- a/src/threads/condition_spinlock.h +++ b/src/threads/condition_spinlock.h @@ -5,6 +5,7 @@ #define LAM_CONDITION_SPINLOCK_H #include "threads/condition.h" +#include "threads/mutex.h" #include "threads/mutex_spinlock.h" #include "runtime/lam_progress.h" @@ -21,10 +22,16 @@ OBJ_CLASS_DECLARATION(lam_condition_t); static inline int lam_condition_wait(lam_condition_t* c, lam_mutex_t* m) { c->c_waiting++; - while(c->c_signaled == 0) { - lam_mutex_unlock(m); - lam_progress(); - lam_mutex_lock(m); + if(lam_using_threads()) { + while(c->c_signaled == 0) { + lam_mutex_unlock(m); + lam_progress(); + lam_mutex_lock(m); + } + } else { + while(c->c_signaled == 0) { + lam_progress(); + } } c->c_signaled--; c->c_waiting--;