diff --git a/src/include/mpi.h b/src/include/mpi.h index c75db317aa..d45ad3048d 100644 --- a/src/include/mpi.h +++ b/src/include/mpi.h @@ -40,6 +40,7 @@ struct lam_status_public_t { int MPI_SOURCE; int MPI_TAG; int MPI_ERROR; + int MPI_LENGTH; }; typedef struct lam_status_public_t lam_status_public_t; diff --git a/src/lam/event/Makefile.am b/src/lam/event/Makefile.am index 5c275b3e93..7e6cccdeb0 100644 --- a/src/lam/event/Makefile.am +++ b/src/lam/event/Makefile.am @@ -37,7 +37,7 @@ libevent_la_DEPENDENCIES = $(lt_libobj) include_HEADERS = event.h # LAM: Make it work in a VPATH environment -INCLUDES = -I$(top_srcdir)/compat +INCLUDES = -I$(top_srcdir)/compat -I$(top_srcdir)/../.. -I$(top_srcdir)/../../include man_MANS = event.3 diff --git a/src/lam/event/Makefile.in b/src/lam/event/Makefile.in index 9ac2f03e78..6074b5daf5 100644 --- a/src/lam/event/Makefile.in +++ b/src/lam/event/Makefile.in @@ -222,7 +222,7 @@ libevent_la_DEPENDENCIES = $(lt_libobj) include_HEADERS = event.h # LAM: Make it work in a VPATH environment -INCLUDES = -I$(top_srcdir)/compat +INCLUDES = -I$(top_srcdir)/compat -I$(top_srcdir)/../.. 
-I$(top_srcdir)/../../include man_MANS = event.3 DISTCLEANFILES = *~ all: config.h diff --git a/src/lam/event/event.c b/src/lam/event/event.c index 546ffcbf59..18bc7071ec 100644 --- a/src/lam/event/event.c +++ b/src/lam/event/event.c @@ -1,4 +1,4 @@ -/* $OpenBSD: event.c,v 1.2 2002/06/25 15:50:15 mickey Exp $ */ +/* $OpenBSD: event.c,v 1.2 2002/06/25 15:50:15 mickey Exp $ */ /* * Copyright 2000-2002 Niels Provos @@ -58,10 +58,14 @@ #include "log.h" #else #define LOG_DBG(x) -#define log_error(x) perror(x) +#define log_error(x) perror(x) #endif #include "event.h" +#include "lam/types.h" +#include "lam/lfc/lam_object.h" +#include "lam/threads/mutex.h" +#include "lam/util/output.h" #ifdef HAVE_SELECT extern const struct lam_eventop lam_selectops; @@ -85,38 +89,41 @@ extern const struct lam_eventop lam_win32ops; /* In order of preference */ static const struct lam_eventop *lam_eventops[] = { #ifdef HAVE_WORKING_KQUEUE - &lam_kqops, + &lam_kqops, #endif #ifdef HAVE_EPOLL - &lam_epollops, + &lam_epollops, #endif #ifdef HAVE_RTSIG - &lam_rtsigops, + &lam_rtsigops, #endif #ifdef HAVE_POLL - &lam_pollops, + &lam_pollops, #endif #ifdef HAVE_SELECT - &lam_selectops, + &lam_selectops, #endif #ifdef WIN32 - &lam_win32ops, + &lam_win32ops, #endif - NULL + NULL }; const struct lam_eventop *lam_evsel; void *lam_evbase; /* Handle signals */ -int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */ -int lam_event_gotsig; /* Set in signal handler */ +int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */ +int lam_event_gotsig; /* Set in signal handler */ /* Prototypes */ static void lam_event_queue_insert(struct lam_event *, int); static void lam_event_queue_remove(struct lam_event *, int); static int lam_event_haveevents(void); static void lam_event_process_active(void); +static int lam_timeout_next(struct timeval *tv); +static void lam_timeout_correct(struct timeval *off); +static void lam_timeout_process(void); static void lam_timeout_insert(struct 
lam_event *); static RB_HEAD(lam_event_tree, lam_event) lam_timetree; @@ -124,15 +131,16 @@ static struct lam_event_list lam_activequeue; struct lam_event_list lam_signalqueue; struct lam_event_list lam_eventqueue; static struct timeval lam_event_tv; +lam_mutex_t lam_event_lock; static int compare(struct lam_event *a, struct lam_event *b) { - if (timercmp(&a->ev_timeout, &b->ev_timeout, <)) - return (-1); - else if (timercmp(&a->ev_timeout, &b->ev_timeout, >)) - return (1); - return (0); + if (timercmp(&a->ev_timeout, &b->ev_timeout, <)) + return (-1); + else if (timercmp(&a->ev_timeout, &b->ev_timeout, >)) + return (1); + return (0); } static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare); @@ -140,154 +148,160 @@ static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare); static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare); + void lam_event_init(void) { - int i; + int i; - lam_event_sigcb = NULL; - lam_event_gotsig = 0; - gettimeofday(&lam_event_tv, NULL); - - RB_INIT(&lam_timetree); - TAILQ_INIT(&lam_eventqueue); - TAILQ_INIT(&lam_activequeue); - TAILQ_INIT(&lam_signalqueue); - - lam_evbase = NULL; - for (i = 0; lam_eventops[i] && !lam_evbase; i++) { - lam_evsel = lam_eventops[i]; - lam_evbase = lam_evsel->init(); - } + lam_event_sigcb = NULL; + lam_event_gotsig = 0; + gettimeofday(&lam_event_tv, NULL); + + OBJ_CONSTRUCT(&lam_event_lock, lam_mutex_t); + RB_INIT(&lam_timetree); + TAILQ_INIT(&lam_eventqueue); + TAILQ_INIT(&lam_activequeue); + TAILQ_INIT(&lam_signalqueue); + + lam_evbase = NULL; + for (i = 0; lam_eventops[i] && !lam_evbase; i++) { + lam_evsel = lam_eventops[i]; + lam_evbase = lam_evsel->init(); + } - if (lam_evbase == NULL) - errx(1, "%s: no event mechanism available", __func__); + if (lam_evbase == NULL) + errx(1, "%s: no event mechanism available", __func__); - if (getenv("EVENT_SHOW_METHOD")) - fprintf(stderr, "libevent using: %s\n", lam_evsel->name); + if (getenv("EVENT_SHOW_METHOD")) + 
fprintf(stderr, "libevent using: %s\n", lam_evsel->name); #if defined(USE_LOG) && defined(USE_DEBUG) - log_to(stderr); - log_debug_cmd(LOG_MISC, 80); + log_to(stderr); + log_debug_cmd(LOG_MISC, 80); #endif } static int lam_event_haveevents(void) { - return (RB_ROOT(&lam_timetree) || TAILQ_FIRST(&lam_eventqueue) || - TAILQ_FIRST(&lam_signalqueue) || TAILQ_FIRST(&lam_activequeue)); + return (RB_ROOT(&lam_timetree) || TAILQ_FIRST(&lam_eventqueue) || + TAILQ_FIRST(&lam_signalqueue) || TAILQ_FIRST(&lam_activequeue)); } static void lam_event_process_active(void) { - struct lam_event *ev; - short ncalls; + struct lam_event *ev; + short ncalls; - for (ev = TAILQ_FIRST(&lam_activequeue); ev; - ev = TAILQ_FIRST(&lam_activequeue)) { - lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); - - /* Allows deletes to work */ - ncalls = ev->ev_ncalls; - ev->ev_pncalls = &ncalls; - while (ncalls) { - ncalls--; - ev->ev_ncalls = ncalls; - (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); - } - } + for (ev = TAILQ_FIRST(&lam_activequeue); ev; + ev = TAILQ_FIRST(&lam_activequeue)) { + lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); + + /* Allows deletes to work */ + ncalls = ev->ev_ncalls; + ev->ev_pncalls = &ncalls; + while (ncalls) { + ncalls--; + ev->ev_ncalls = ncalls; + lam_mutex_unlock(&lam_event_lock); + (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); + lam_mutex_lock(&lam_event_lock); + } + } } int lam_event_dispatch(void) { - return (lam_event_loop(0)); + return (lam_event_loop(0)); } int lam_event_loop(int flags) { - struct timeval tv; - int res, done; + struct timeval tv; + int res, done; - /* Calculate the initial events that we are waiting for */ - if (lam_evsel->recalc(lam_evbase, 0) == -1) - return (-1); + /* Calculate the initial events that we are waiting for */ + if (lam_evsel->recalc(lam_evbase, 0) == -1) { + lam_output(0, "lam_event_loop: lam_evsel->recalc() failed."); + return (-1); + } - done = 0; - while (!done) { - while (lam_event_gotsig) { - 
lam_event_gotsig = 0; - if (lam_event_sigcb) { - res = (*lam_event_sigcb)(); - if (res == -1) { - errno = EINTR; - return (-1); - } - } - } + done = 0; + while (!done) { + while (lam_event_gotsig) { + lam_event_gotsig = 0; + if (lam_event_sigcb) { + res = (*lam_event_sigcb)(); + if (res == -1) { + lam_output(0, "lam_event_loop: lam_event_sigcb() failed."); + errno = EINTR; + return (-1); + } + } + } - /* Check if time is running backwards */ - gettimeofday(&tv, NULL); - if (timercmp(&tv, &lam_event_tv, <)) { - struct timeval off; - LOG_DBG((LOG_MISC, 10, - "%s: time is running backwards, corrected", - __func__)); + /* Check if time is running backwards */ + gettimeofday(&tv, NULL); + if (timercmp(&tv, &lam_event_tv, <)) { + struct timeval off; + LOG_DBG((LOG_MISC, 10, + "%s: time is running backwards, corrected", + __func__)); - timersub(&lam_event_tv, &tv, &off); - lam_timeout_correct(&off); - } - lam_event_tv = tv; + timersub(&lam_event_tv, &tv, &off); + lam_timeout_correct(&off); + } + lam_event_tv = tv; - if (!(flags & LAM_EVLOOP_NONBLOCK)) - lam_timeout_next(&tv); - else - timerclear(&tv); - - /* If we have no events, we just exit */ - if (!lam_event_haveevents()) - return (1); + if (!(flags & LAM_EVLOOP_NONBLOCK)) + lam_timeout_next(&tv); + else + timerclear(&tv); + + res = lam_evsel->dispatch(lam_evbase, &tv); + if (res == -1) { + lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed."); + return (-1); + } - res = lam_evsel->dispatch(lam_evbase, &tv); + lam_timeout_process(); - if (res == -1) - return (-1); + if (TAILQ_FIRST(&lam_activequeue)) { + lam_event_process_active(); + if (flags & LAM_EVLOOP_ONCE) + done = 1; + } else if (flags & LAM_EVLOOP_NONBLOCK) + done = 1; - lam_timeout_process(); - - if (TAILQ_FIRST(&lam_activequeue)) { - lam_event_process_active(); - if (flags & LAM_EVLOOP_ONCE) - done = 1; - } else if (flags & LAM_EVLOOP_NONBLOCK) - done = 1; - - if (lam_evsel->recalc(lam_evbase, 0) == -1) - return (-1); - } - - return (0); + if 
(lam_evsel->recalc(lam_evbase, 0) == -1) { + lam_output(0, "lam_event_loop: lam_evesel->recalc() failed."); + return (-1); + } + } + lam_output(0, "lam_event_loop: done"); + return (0); } void lam_event_set(struct lam_event *ev, int fd, short events, - void (*callback)(int, short, void *), void *arg) + void (*callback)(int, short, void *), void *arg) { - ev->ev_callback = callback; - ev->ev_arg = arg; + ev->ev_callback = callback; + ev->ev_arg = arg; #ifdef WIN32 - ev->ev_fd = (HANDLE)fd; - ev->overlap.hEvent = ev; + ev->ev_fd = (HANDLE)fd; + ev->overlap.hEvent = ev; #else - ev->ev_fd = fd; + ev->ev_fd = fd; #endif - ev->ev_events = events; - ev->ev_flags = LAM_EVLIST_INIT; - ev->ev_ncalls = 0; - ev->ev_pncalls = NULL; + ev->ev_events = events; + ev->ev_flags = LAM_EVLIST_INIT; + ev->ev_ncalls = 0; + ev->ev_pncalls = NULL; } /* @@ -297,270 +311,297 @@ lam_event_set(struct lam_event *ev, int fd, short events, int lam_event_pending(struct lam_event *ev, short event, struct timeval *tv) { - int flags = 0; + int flags = 0; - if (ev->ev_flags & LAM_EVLIST_INSERTED) - flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)); - if (ev->ev_flags & LAM_EVLIST_ACTIVE) - flags |= ev->ev_res; - if (ev->ev_flags & LAM_EVLIST_TIMEOUT) - flags |= LAM_EV_TIMEOUT; + if (ev->ev_flags & LAM_EVLIST_INSERTED) + flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)); + if (ev->ev_flags & LAM_EVLIST_ACTIVE) + flags |= ev->ev_res; + if (ev->ev_flags & LAM_EVLIST_TIMEOUT) + flags |= LAM_EV_TIMEOUT; - event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE); + event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE); - /* See if there is a timeout that we should report */ - if (tv != NULL && (flags & event & LAM_EV_TIMEOUT)) - *tv = ev->ev_timeout; + /* See if there is a timeout that we should report */ + if (tv != NULL && (flags & event & LAM_EV_TIMEOUT)) + *tv = ev->ev_timeout; - return (flags & event); + return (flags & event); +} + +int +lam_event_add_i(struct lam_event *ev, struct timeval *tv) +{ + 
LOG_DBG((LOG_MISC, 55, + "event_add: event: %p, %s%s%scall %p", + ev, + ev->ev_events & LAM_EV_READ ? "LAM_EV_READ " : " ", + ev->ev_events & LAM_EV_WRITE ? "LAM_EV_WRITE " : " ", + tv ? "LAM_EV_TIMEOUT " : " ", + ev->ev_callback)); + + assert(!(ev->ev_flags & ~LAM_EVLIST_ALL)); + + if (tv != NULL) { + struct timeval now; + + if (ev->ev_flags & LAM_EVLIST_TIMEOUT) + lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); + + /* Check if it is active due to a timeout. Rescheduling + * this timeout before the callback can be executed + * removes it from the active list. */ + if ((ev->ev_flags & LAM_EVLIST_ACTIVE) && + (ev->ev_res & LAM_EV_TIMEOUT)) { + /* See if we are just active executing this + * event in a loop + */ + if (ev->ev_ncalls && ev->ev_pncalls) { + /* Abort loop */ + *ev->ev_pncalls = 0; + } + + lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); + } + + gettimeofday(&now, NULL); + timeradd(&now, tv, &ev->ev_timeout); + + LOG_DBG((LOG_MISC, 55, + "event_add: timeout in %d seconds, call %p", + tv->tv_sec, ev->ev_callback)); + + lam_event_queue_insert(ev, LAM_EVLIST_TIMEOUT); + } + + if ((ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)) && + !(ev->ev_flags & (LAM_EVLIST_INSERTED|LAM_EVLIST_ACTIVE))) { + lam_event_queue_insert(ev, LAM_EVLIST_INSERTED); + return (lam_evsel->add(lam_evbase, ev)); + } else if ((ev->ev_events & LAM_EV_SIGNAL) && + !(ev->ev_flags & LAM_EVLIST_SIGNAL)) { + lam_event_queue_insert(ev, LAM_EVLIST_SIGNAL); + return (lam_evsel->add(lam_evbase, ev)); + } + return (0); } int lam_event_add(struct lam_event *ev, struct timeval *tv) { - LOG_DBG((LOG_MISC, 55, - "event_add: event: %p, %s%s%scall %p", - ev, - ev->ev_events & LAM_EV_READ ? "LAM_EV_READ " : " ", - ev->ev_events & LAM_EV_WRITE ? "LAM_EV_WRITE " : " ", - tv ? 
"LAM_EV_TIMEOUT " : " ", - ev->ev_callback)); - - assert(!(ev->ev_flags & ~LAM_EVLIST_ALL)); - - if (tv != NULL) { - struct timeval now; - - if (ev->ev_flags & LAM_EVLIST_TIMEOUT) - lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); - - /* Check if it is active due to a timeout. Rescheduling - * this timeout before the callback can be executed - * removes it from the active list. */ - if ((ev->ev_flags & LAM_EVLIST_ACTIVE) && - (ev->ev_res & LAM_EV_TIMEOUT)) { - /* See if we are just active executing this - * event in a loop - */ - if (ev->ev_ncalls && ev->ev_pncalls) { - /* Abort loop */ - *ev->ev_pncalls = 0; - } - - lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); - } - - gettimeofday(&now, NULL); - timeradd(&now, tv, &ev->ev_timeout); - - LOG_DBG((LOG_MISC, 55, - "event_add: timeout in %d seconds, call %p", - tv->tv_sec, ev->ev_callback)); - - lam_event_queue_insert(ev, LAM_EVLIST_TIMEOUT); - } - - if ((ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)) && - !(ev->ev_flags & (LAM_EVLIST_INSERTED|LAM_EVLIST_ACTIVE))) { - lam_event_queue_insert(ev, LAM_EVLIST_INSERTED); - - return (lam_evsel->add(lam_evbase, ev)); - } else if ((ev->ev_events & LAM_EV_SIGNAL) && - !(ev->ev_flags & LAM_EVLIST_SIGNAL)) { - lam_event_queue_insert(ev, LAM_EVLIST_SIGNAL); - - return (lam_evsel->add(lam_evbase, ev)); - } - - return (0); + int rc; + lam_mutex_lock(&lam_event_lock); + rc = lam_event_add_i(ev, tv); + lam_mutex_unlock(&lam_event_lock); + return rc; } -int -lam_event_del(struct lam_event *ev) +int lam_event_del_i(struct lam_event *ev) { - LOG_DBG((LOG_MISC, 80, "event_del: %p, callback %p", - ev, ev->ev_callback)); + LOG_DBG((LOG_MISC, 80, "event_del: %p, callback %p", + ev, ev->ev_callback)); - assert(!(ev->ev_flags & ~LAM_EVLIST_ALL)); + assert(!(ev->ev_flags & ~LAM_EVLIST_ALL)); - /* See if we are just active executing this event in a loop */ - if (ev->ev_ncalls && ev->ev_pncalls) { - /* Abort loop */ - *ev->ev_pncalls = 0; - } + /* See if we are just active executing this event in a 
loop */ + if (ev->ev_ncalls && ev->ev_pncalls) { + /* Abort loop */ + *ev->ev_pncalls = 0; + } - if (ev->ev_flags & LAM_EVLIST_TIMEOUT) - lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); + if (ev->ev_flags & LAM_EVLIST_TIMEOUT) + lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); - if (ev->ev_flags & LAM_EVLIST_ACTIVE) - lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); + if (ev->ev_flags & LAM_EVLIST_ACTIVE) + lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); - if (ev->ev_flags & LAM_EVLIST_INSERTED) { - lam_event_queue_remove(ev, LAM_EVLIST_INSERTED); - return (lam_evsel->del(lam_evbase, ev)); - } else if (ev->ev_flags & LAM_EVLIST_SIGNAL) { - lam_event_queue_remove(ev, LAM_EVLIST_SIGNAL); - return (lam_evsel->del(lam_evbase, ev)); - } + if (ev->ev_flags & LAM_EVLIST_INSERTED) { + lam_event_queue_remove(ev, LAM_EVLIST_INSERTED); + return (lam_evsel->del(lam_evbase, ev)); + } else if (ev->ev_flags & LAM_EVLIST_SIGNAL) { + lam_event_queue_remove(ev, LAM_EVLIST_SIGNAL); + return (lam_evsel->del(lam_evbase, ev)); + } + return (0); +} - return (0); +int lam_event_del(struct lam_event *ev) +{ + int rc; + lam_mutex_lock(&lam_event_lock); + rc = lam_event_del_i(ev); + lam_mutex_unlock(&lam_event_lock); + return rc; } void -lam_event_active(struct lam_event *ev, int res, short ncalls) +lam_event_active_i(struct lam_event *ev, int res, short ncalls) { - /* We get different kinds of events, add them together */ - if (ev->ev_flags & LAM_EVLIST_ACTIVE) { - ev->ev_res |= res; - return; - } + /* We get different kinds of events, add them together */ + if (ev->ev_flags & LAM_EVLIST_ACTIVE) { + ev->ev_res |= res; + return; + } - ev->ev_res = res; - ev->ev_ncalls = ncalls; - ev->ev_pncalls = NULL; - lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE); + ev->ev_res = res; + ev->ev_ncalls = ncalls; + ev->ev_pncalls = NULL; + lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE); } -int + +void +lam_event_active(struct lam_event* ev, int res, short ncalls) +{ + lam_mutex_lock(&lam_event_lock); + 
lam_event_active_i(ev, res, ncalls); + lam_mutex_unlock(&lam_event_lock); +} + + + +static int lam_timeout_next(struct timeval *tv) { - struct timeval dflt = LAM_TIMEOUT_DEFAULT; + struct timeval dflt = LAM_TIMEOUT_DEFAULT; - struct timeval now; - struct lam_event *ev; + struct timeval now; + struct lam_event *ev; - if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) { - *tv = dflt; - return (0); - } + if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) { + *tv = dflt; + return (0); + } - if (gettimeofday(&now, NULL) == -1) - return (-1); + if (gettimeofday(&now, NULL) == -1) + return (-1); - if (timercmp(&ev->ev_timeout, &now, <=)) { - timerclear(tv); - return (0); - } + if (timercmp(&ev->ev_timeout, &now, <=)) { + timerclear(tv); + return (0); + } - timersub(&ev->ev_timeout, &now, tv); + timersub(&ev->ev_timeout, &now, tv); - assert(tv->tv_sec >= 0); - assert(tv->tv_usec >= 0); + assert(tv->tv_sec >= 0); + assert(tv->tv_usec >= 0); - LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec)); - return (0); + LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec)); + return (0); } -void + +static void lam_timeout_correct(struct timeval *off) { - struct lam_event *ev; + struct lam_event *ev; - /* We can modify the key element of the node without destroying - * the key, beause we apply it to all in the right order. - */ - RB_FOREACH(ev, lam_event_tree, &lam_timetree) - timersub(&ev->ev_timeout, off, &ev->ev_timeout); + /* We can modify the key element of the node without destroying + * the key, beause we apply it to all in the right order. 
+ */ + RB_FOREACH(ev, lam_event_tree, &lam_timetree) + timersub(&ev->ev_timeout, off, &ev->ev_timeout); } -void + +static void lam_timeout_process(void) { - struct timeval now; - struct lam_event *ev, *next; + struct timeval now; + struct lam_event *ev, *next; - gettimeofday(&now, NULL); + gettimeofday(&now, NULL); - for (ev = RB_MIN(lam_event_tree, &lam_timetree); ev; ev = next) { - if (timercmp(&ev->ev_timeout, &now, >)) - break; - next = RB_NEXT(lam_event_tree, &lam_timetree, ev); + for (ev = RB_MIN(lam_event_tree, &lam_timetree); ev; ev = next) { + if (timercmp(&ev->ev_timeout, &now, >)) + break; + next = RB_NEXT(lam_event_tree, &lam_timetree, ev); - lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); + lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); - /* delete this event from the I/O queues */ - lam_event_del(ev); + /* delete this event from the I/O queues */ + lam_event_del_i(ev); - LOG_DBG((LOG_MISC, 60, "timeout_process: call %p", - ev->ev_callback)); - lam_event_active(ev, LAM_EV_TIMEOUT, 1); - } + LOG_DBG((LOG_MISC, 60, "timeout_process: call %p", + ev->ev_callback)); + lam_event_active_i(ev, LAM_EV_TIMEOUT, 1); + } } static void lam_timeout_insert(struct lam_event *ev) { - struct lam_event *tmp; + struct lam_event *tmp; - tmp = RB_FIND(lam_event_tree, &lam_timetree, ev); + tmp = RB_FIND(lam_event_tree, &lam_timetree, ev); - if (tmp != NULL) { - struct timeval tv; - struct timeval add = {0,1}; + if (tmp != NULL) { + struct timeval tv; + struct timeval add = {0,1}; - /* Find unique time */ - tv = ev->ev_timeout; - do { - timeradd(&tv, &add, &tv); - tmp = RB_NEXT(lam_event_tree, &lam_timetree, tmp); - } while (tmp != NULL && timercmp(&tmp->ev_timeout, &tv, ==)); + /* Find unique time */ + tv = ev->ev_timeout; + do { + timeradd(&tv, &add, &tv); + tmp = RB_NEXT(lam_event_tree, &lam_timetree, tmp); + } while (tmp != NULL && timercmp(&tmp->ev_timeout, &tv, ==)); - ev->ev_timeout = tv; - } + ev->ev_timeout = tv; + } - tmp = RB_INSERT(lam_event_tree, &lam_timetree, 
ev); - assert(tmp == NULL); + tmp = RB_INSERT(lam_event_tree, &lam_timetree, ev); + assert(tmp == NULL); } static void lam_event_queue_remove(struct lam_event *ev, int queue) { - if (!(ev->ev_flags & queue)) - errx(1, "%s: %p(fd %d) not on queue %x", __func__, - ev, ev->ev_fd, queue); + if (!(ev->ev_flags & queue)) + errx(1, "%s: %p(fd %d) not on queue %x", __func__, + ev, ev->ev_fd, queue); - ev->ev_flags &= ~queue; - switch (queue) { - case LAM_EVLIST_ACTIVE: - TAILQ_REMOVE(&lam_activequeue, ev, ev_active_next); - break; - case LAM_EVLIST_SIGNAL: - TAILQ_REMOVE(&lam_signalqueue, ev, ev_signal_next); - break; - case LAM_EVLIST_TIMEOUT: - RB_REMOVE(lam_event_tree, &lam_timetree, ev); - break; - case LAM_EVLIST_INSERTED: - TAILQ_REMOVE(&lam_eventqueue, ev, ev_next); - break; - default: - errx(1, "%s: unknown queue %x", __func__, queue); - } + ev->ev_flags &= ~queue; + switch (queue) { + case LAM_EVLIST_ACTIVE: + TAILQ_REMOVE(&lam_activequeue, ev, ev_active_next); + break; + case LAM_EVLIST_SIGNAL: + TAILQ_REMOVE(&lam_signalqueue, ev, ev_signal_next); + break; + case LAM_EVLIST_TIMEOUT: + RB_REMOVE(lam_event_tree, &lam_timetree, ev); + break; + case LAM_EVLIST_INSERTED: + TAILQ_REMOVE(&lam_eventqueue, ev, ev_next); + break; + default: + errx(1, "%s: unknown queue %x", __func__, queue); + } } static void lam_event_queue_insert(struct lam_event *ev, int queue) { - if (ev->ev_flags & queue) - errx(1, "%s: %p(fd %d) already on queue %x", __func__, - ev, ev->ev_fd, queue); + if (ev->ev_flags & queue) + errx(1, "%s: %p(fd %d) already on queue %x", __func__, + ev, ev->ev_fd, queue); - ev->ev_flags |= queue; - switch (queue) { - case LAM_EVLIST_ACTIVE: - TAILQ_INSERT_TAIL(&lam_activequeue, ev, ev_active_next); - break; - case LAM_EVLIST_SIGNAL: - TAILQ_INSERT_TAIL(&lam_signalqueue, ev, ev_signal_next); - break; - case LAM_EVLIST_TIMEOUT: - lam_timeout_insert(ev); - break; - case LAM_EVLIST_INSERTED: - TAILQ_INSERT_TAIL(&lam_eventqueue, ev, ev_next); - break; - default: - 
errx(1, "%s: unknown queue %x", __func__, queue); - } + ev->ev_flags |= queue; + switch (queue) { + case LAM_EVLIST_ACTIVE: + TAILQ_INSERT_TAIL(&lam_activequeue, ev, ev_active_next); + break; + case LAM_EVLIST_SIGNAL: + TAILQ_INSERT_TAIL(&lam_signalqueue, ev, ev_signal_next); + break; + case LAM_EVLIST_TIMEOUT: + lam_timeout_insert(ev); + break; + case LAM_EVLIST_INSERTED: + TAILQ_INSERT_TAIL(&lam_eventqueue, ev, ev_next); + break; + default: + errx(1, "%s: unknown queue %x", __func__, queue); + } } diff --git a/src/lam/event/event.h b/src/lam/event/event.h index 0f76da4bc4..f8c231913e 100644 --- a/src/lam/event/event.h +++ b/src/lam/event/event.h @@ -129,18 +129,14 @@ struct lam_eventop { int (*dispatch)(void *, struct timeval *); }; -#define LAM_TIMEOUT_DEFAULT {5, 0} - -void lam_event_init(void); -int lam_event_dispatch(void); - +#define LAM_TIMEOUT_DEFAULT {1, 0} #define LAM_EVLOOP_ONCE 0x01 #define LAM_EVLOOP_NONBLOCK 0x02 + +void lam_event_init(void); +int lam_event_dispatch(void); int lam_event_loop(int); -int lam_timeout_next(struct timeval *); -void lam_timeout_correct(struct timeval *); -void lam_timeout_process(void); #define lam_evtimer_add(ev, tv) lam_event_add(ev, tv) #define lam_evtimer_set(ev, cb, arg) lam_event_set(ev, -1, 0, cb, arg) @@ -164,8 +160,8 @@ void lam_timeout_process(void); void lam_event_set(struct lam_event *, int, short, void (*)(int, short, void *), void *); int lam_event_add(struct lam_event *, struct timeval *); int lam_event_del(struct lam_event *); -void lam_event_active(struct lam_event *, int, short); int lam_event_pending(struct lam_event *, short, struct timeval *); +void lam_event_active(struct lam_event *, int, short); #ifdef WIN32 #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE) @@ -173,6 +169,11 @@ int lam_event_pending(struct lam_event *, short, struct timeval *); #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) #endif +/* for internal use 
only */ +int lam_event_add_i(struct lam_event *, struct timeval *); +int lam_event_del_i(struct lam_event *); +void lam_event_active_i(struct lam_event *, int, short); + #ifdef __cplusplus } #endif diff --git a/src/lam/event/kqueue.c b/src/lam/event/kqueue.c index a49a990c16..5579b35d40 100644 --- a/src/lam/event/kqueue.c +++ b/src/lam/event/kqueue.c @@ -248,10 +248,10 @@ kq_dispatch(void *arg, struct timeval *tv) if (!(ev->ev_events & LAM_EV_PERSIST)) { ev->ev_flags &= ~EVLIST_X_KQINKERNEL; - lam_event_del(ev); + lam_event_del_i(ev); } - lam_event_active(ev, which, + lam_event_active_i(ev, which, ev->ev_events & LAM_EV_SIGNAL ? events[i].data : 1); } diff --git a/src/lam/event/poll.c b/src/lam/event/poll.c index 47dca957ae..f8b1fa0306 100644 --- a/src/lam/event/poll.c +++ b/src/lam/event/poll.c @@ -55,9 +55,12 @@ #include "event.h" #include "evsignal.h" +#include "lam/threads/mutex.h" + extern struct lam_event_list lam_eventqueue; extern volatile sig_atomic_t lam_evsignal_caught; +extern lam_mutex_t lam_event_lock; struct pollop { int event_count; /* Highest number alloc */ @@ -166,7 +169,9 @@ poll_dispatch(void *arg, struct timeval *tv) return (-1); sec = tv->tv_sec * 1000 + tv->tv_usec / 1000; + lam_mutex_unlock(&lam_event_lock); res = poll(pop->event_set, nfds, sec); + lam_mutex_lock(&lam_event_lock); if (lam_evsignal_recalc(&pop->evsigmask) == -1) return (-1); @@ -209,8 +214,8 @@ poll_dispatch(void *arg, struct timeval *tv) if (res) { if (!(ev->ev_events & LAM_EV_PERSIST)) - lam_event_del(ev); - lam_event_active(ev, res, 1); + lam_event_del_i(ev); + lam_event_active_i(ev, res, 1); } } diff --git a/src/lam/event/select.c b/src/lam/event/select.c index 6deecb200f..025043c68a 100644 --- a/src/lam/event/select.c +++ b/src/lam/event/select.c @@ -197,8 +197,8 @@ select_dispatch(void *arg, struct timeval *tv) if (res) { if (!(ev->ev_events & LAM_EV_PERSIST)) - lam_event_del(ev); - lam_event_active(ev, res, 1); + lam_event_del_i(ev); + lam_event_active_i(ev, res, 1); 
} else if (ev->ev_fd > maxfd) maxfd = ev->ev_fd; } diff --git a/src/lam/event/signal.c b/src/lam/event/signal.c index e834dfc9ee..3dcfa96b68 100644 --- a/src/lam/event/signal.c +++ b/src/lam/event/signal.c @@ -147,8 +147,8 @@ lam_evsignal_process(void) ncalls = lam_evsigcaught[LAM_EVENT_SIGNAL(ev)]; if (ncalls) { if (!(ev->ev_events & LAM_EV_PERSIST)) - lam_event_del(ev); - lam_event_active(ev, LAM_EV_SIGNAL, ncalls); + lam_event_del_i(ev); + lam_event_active_i(ev, LAM_EV_SIGNAL, ncalls); } } diff --git a/src/mca/mpi/pml/base/pml_base_request.c b/src/mca/mpi/pml/base/pml_base_request.c index 38a8263768..2db0df7ebc 100644 --- a/src/mca/mpi/pml/base/pml_base_request.c +++ b/src/mca/mpi/pml/base/pml_base_request.c @@ -14,11 +14,9 @@ lam_class_t mca_pml_base_request_t_class = { void mca_pml_base_request_construct(mca_pml_base_request_t* req) { - OBJ_CONSTRUCT(&req->req_lock, lam_mutex_t); } void mca_pml_base_request_destruct(mca_pml_base_request_t* req) { - OBJ_DESTRUCT(&req->req_lock); } diff --git a/src/mca/mpi/pml/base/pml_base_request.h b/src/mca/mpi/pml/base/pml_base_request.h index 0fb90e27d2..686af944c9 100644 --- a/src/mca/mpi/pml/base/pml_base_request.h +++ b/src/mca/mpi/pml/base/pml_base_request.h @@ -13,16 +13,7 @@ extern lam_class_t mca_pml_base_request_t_class; -/* MPI request status */ -typedef enum { - MCA_PML_STATUS_INVALID = 1, - MCA_PML_STATUS_INITED = 2, - MCA_PML_STATUS_INCOMPLETE = 3, - MCA_PML_STATUS_COMPLETE = 4, - MCA_PML_STATUS_INACTIVE = 5 -} mca_pml_base_request_status_t; - - +/* request type */ typedef enum { MCA_PML_REQUEST_SEND, MCA_PML_REQUEST_RECV @@ -30,7 +21,7 @@ typedef enum { /* MPI pml (point-to-point) request */ -typedef struct { +struct mca_pml_base_request_t { /* base request */ lam_request_t super; /* pointer to application buffer */ @@ -42,24 +33,21 @@ typedef struct { /* user defined tag */ int32_t req_tag; /* communicator pointer */ - lam_communicator_t *req_communicator; + lam_communicator_t *req_comm; /* pointer to data 
type */ lam_datatype_t *req_datatype; /* MPI request type - used for test */ mca_pml_base_request_type_t req_type; - /* MPI request status */ - mca_pml_base_request_status_t req_status; /* completion status */ - lam_status_public_t req_status_public; + lam_status_public_t req_status; /* flag indicating if the this is a persistent request */ bool req_persistent; /* flag indicating if MPI is done with this request */ bool req_mpi_done; /* flag indicating if the pt-2-pt layer is done with this request */ - bool req_pml_layer_done; - /* lock to update request status */ - lam_mutex_t req_lock; -} mca_pml_base_request_t; + bool req_pml_done; +}; +typedef struct mca_pml_base_request_t mca_pml_base_request_t; void mca_pml_base_request_construct(mca_pml_base_request_t*); diff --git a/src/mca/mpi/pml/teg/src/pml_teg.h b/src/mca/mpi/pml/teg/src/pml_teg.h index ca74febe94..1b374a947e 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg.h +++ b/src/mca/mpi/pml/teg/src/pml_teg.h @@ -10,6 +10,7 @@ #ifndef MCA_PML_TEG_H #define MCA_PML_TEG_H +#include "lam/threads/thread.h" #include "lam/threads/condition.h" #include "lam/mem/free_list.h" #include "lam/util/cmd_line.h" @@ -34,7 +35,8 @@ struct mca_pml_teg_t { lam_list_t teg_procs; lam_mutex_t teg_lock; lam_condition_t teg_condition; - int teg_reqs_waiting; + int teg_req_waiting; + lam_thread_t teg_thread; int teg_free_list_num; /* initial size of free list */ int teg_free_list_max; /* maximum size of free list */ diff --git a/src/mca/mpi/pml/teg/src/pml_teg_irecv.c b/src/mca/mpi/pml/teg/src/pml_teg_irecv.c index 9499b21bd7..e75b88372d 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_irecv.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_irecv.c @@ -12,6 +12,11 @@ int mca_pml_teg_irecv_init( struct lam_request_t **request) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_irecv_init: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid); +#endif + mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); if(NULL == recvreq) return rc; 
@@ -40,10 +45,15 @@ int mca_pml_teg_irecv( struct lam_request_t **request) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_irecv: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid); +#endif + mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); if(NULL == recvreq) return rc; - + mca_ptl_base_recv_request_reinit( recvreq, addr, @@ -73,6 +83,11 @@ int mca_pml_teg_recv( lam_status_public_t* status) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_recv: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid); +#endif + mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); if(NULL == recvreq) return rc; diff --git a/src/mca/mpi/pml/teg/src/pml_teg_isend.c b/src/mca/mpi/pml/teg/src/pml_teg_isend.c index cf5d06af77..93f0ba0cb8 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_isend.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_isend.c @@ -19,6 +19,11 @@ int mca_pml_teg_isend_init( lam_request_t **request) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_isend_init: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid); +#endif + mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); if(rc != LAM_SUCCESS) return rc; @@ -51,10 +56,14 @@ int mca_pml_teg_isend( lam_request_t **request) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_isend: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid); +#endif + mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); if(rc != LAM_SUCCESS) return rc; - mca_ptl_base_send_request_reinit( sendreq, buf, @@ -84,6 +93,11 @@ int mca_pml_teg_send( lam_communicator_t* comm) { int rc; + +#if 0 + lam_output(0, "mca_pml_teg_send: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid); +#endif + mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); if(rc != LAM_SUCCESS) return rc; @@ -102,6 +116,6 @@ int mca_pml_teg_send( if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) return rc; - return 
mca_pml_teg_wait(&sendreq->super, NULL); + return mca_pml_teg_wait((lam_request_t*)sendreq, NULL); } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_module.c b/src/mca/mpi/pml/teg/src/pml_teg_module.c index 3b17baf3f9..74515a7ea1 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_module.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_module.c @@ -2,6 +2,7 @@ * $HEADER$ */ +#include "lam/event/event.h" #include "mpi.h" #include "mca/mpi/pml/pml.h" #include "mca/mpi/ptl/ptl.h" @@ -85,13 +86,20 @@ int mca_pml_teg_module_close(void) } +static void* mca_pml_teg_thread(lam_object_t* thread) +{ + lam_event_dispatch(); + return NULL; +} + + mca_pml_t* mca_pml_teg_module_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads) { *priority = 0; *allow_multi_user_threads = true; - *have_hidden_threads = false; + *have_hidden_threads = true; OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t); mca_pml_teg.teg_ptl_modules = NULL; @@ -108,6 +116,11 @@ mca_pml_t* mca_pml_teg_module_init(int* priority, mca_pml_teg.teg_free_list_inc, NULL); + OBJ_CONSTRUCT(&mca_pml_teg.teg_thread, lam_thread_t); + mca_pml_teg.teg_thread.t_run = mca_pml_teg_thread; + if(lam_thread_start(&mca_pml_teg.teg_thread) != LAM_SUCCESS) + return NULL; + mca_pml_teg.teg_recv_sequence = 0; return &mca_pml_teg.super; } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c index ba025ea3cf..5dfe938ccf 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.c @@ -2,9 +2,17 @@ void mca_pml_teg_recv_request_progress( - mca_ptl_base_recv_request_t* request, + mca_ptl_base_recv_request_t* req, mca_ptl_base_recv_frag_t* frag) { - + lam_mutex_lock(&mca_pml_teg.teg_lock); + req->req_bytes_recvd += frag->super.frag_size; + if (req->req_bytes_recvd >= req->super.req_status.MPI_LENGTH) { + req->super.req_mpi_done = true; + if(mca_pml_teg.teg_req_waiting) { + lam_condition_broadcast(&mca_pml_teg.teg_condition); + } + } + 
lam_mutex_unlock(&mca_pml_teg.teg_lock); } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.h b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.h index 0b3c09ad51..fe3acaa6b4 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_recvreq.h +++ b/src/mca/mpi/pml/teg/src/pml_teg_recvreq.h @@ -21,7 +21,6 @@ static inline mca_ptl_base_recv_request_t* mca_pml_teg_recv_request_alloc(int *r static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t* request) { - request->super.req_status = MCA_PML_STATUS_INVALID; lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request); } @@ -32,8 +31,7 @@ static inline int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* re { THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock, (req->req_sequence = mca_pml_teg.teg_recv_sequence++)); - - req->super.req_status = MCA_PML_STATUS_INCOMPLETE; + if(req->super.req_peer == LAM_ANY_TAG) { mca_ptl_base_recv_request_match_wild(req); } else { diff --git a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c index 83896191ef..7336ebaf2d 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.c @@ -81,5 +81,14 @@ void mca_pml_teg_send_request_progress( mca_ptl_base_send_request_t* req, mca_ptl_base_send_frag_t* frag) { + lam_mutex_lock(&mca_pml_teg.teg_lock); + req->req_bytes_sent += frag->super.frag_size; + if (req->req_bytes_sent >= req->super.req_length) { + req->super.req_mpi_done = true; + if(mca_pml_teg.teg_req_waiting) { + lam_condition_broadcast(&mca_pml_teg.teg_condition); + } + } + lam_mutex_unlock(&mca_pml_teg.teg_lock); } diff --git a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h index 7238d35d47..2c185b5fa8 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h +++ b/src/mca/mpi/pml/teg/src/pml_teg_sendreq.h @@ -36,7 +36,6 @@ static inline mca_ptl_base_send_request_t* mca_pml_teg_send_request_alloc( static inline void 
mca_ptl_base_send_request_return( mca_ptl_base_send_request_t* request) { - request->super.req_status = MCA_PML_STATUS_INVALID; request->req_owner->ptl_request_return(request->req_owner, request); } @@ -48,8 +47,8 @@ static inline int mca_pml_teg_send_request_start( int rc; // start the first fragment - if(req->req_length < first_fragment_size) - first_fragment_size = req->req_length; + if(req->super.req_length < first_fragment_size) + first_fragment_size = req->super.req_length; rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size); if(rc != LAM_SUCCESS) return rc; diff --git a/src/mca/mpi/pml/teg/src/pml_teg_wait.c b/src/mca/mpi/pml/teg/src/pml_teg_wait.c index 3bcb5d4698..483becfbac 100644 --- a/src/mca/mpi/pml/teg/src/pml_teg_wait.c +++ b/src/mca/mpi/pml/teg/src/pml_teg_wait.c @@ -1,22 +1,22 @@ #include "pml_teg.h" +#include "mca/mpi/pml/base/pml_base_request.h" int mca_pml_teg_wait( lam_request_t* request, lam_status_public_t* status) { -#if 0 mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)request; if(pml_request->req_mpi_done == false) { lam_mutex_lock(&mca_pml_teg.teg_lock); mca_pml_teg.teg_req_waiting++; - while(request->req_mpi_done == false) - lam_condition_wait(&mca_pml_teg.teg_condition); + while(pml_request->req_mpi_done == false) + lam_condition_wait(&mca_pml_teg.teg_condition, &mca_pml_teg.teg_lock); mca_pml_teg.teg_req_waiting--; lam_mutex_unlock(&mca_pml_teg.teg_lock); } - *status = request->req_status; -#endif + if (status != NULL) + *status = pml_request->req_status; return LAM_SUCCESS; } diff --git a/src/mca/mpi/ptl/base/ptl_base_comm.c b/src/mca/mpi/ptl/base/ptl_base_comm.c index b2d95b6cc0..a5b8cd5f17 100644 --- a/src/mca/mpi/ptl/base/ptl_base_comm.c +++ b/src/mca/mpi/ptl/base/ptl_base_comm.c @@ -43,46 +43,58 @@ int mca_pml_ptl_comm_init_size(mca_pml_comm_t* comm, size_t size) comm->c_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); if(NULL == comm->c_msg_seq) return LAM_ERR_OUT_OF_RESOURCE; + 
memset(comm->c_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size); /* send message sequence-number support - receiver side */ comm->c_next_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); if(NULL == comm->c_next_msg_seq) return LAM_ERR_OUT_OF_RESOURCE; + memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size); /* matching lock */ comm->c_matching_lock = malloc(sizeof(lam_mutex_t) * size); if(NULL == comm->c_matching_lock) return LAM_ERR_OUT_OF_RESOURCE; - for(i=0; ic_matching_lock+i, lam_mutex_t); + for(i=0; ic_matching_lock+i; + OBJ_CONSTRUCT(object, lam_mutex_t); + } /* unexpected fragments queues */ comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size); if(NULL == comm->c_unexpected_frags) return LAM_ERR_OUT_OF_RESOURCE; - for(i=0; ic_unexpected_frags+i, lam_list_t); + for(i=0; ic_unexpected_frags+i; + OBJ_CONSTRUCT(object, lam_list_t); + } /* these locks are needed to avoid a probe interfering with a match */ comm->c_unexpected_frags_lock = malloc(sizeof(lam_mutex_t) * size); if(NULL == comm->c_unexpected_frags_lock) return LAM_ERR_OUT_OF_RESOURCE; - for(i=0; ic_unexpected_frags_lock+i, lam_mutex_t); + for(i=0; ic_unexpected_frags_lock+i; + OBJ_CONSTRUCT(object, lam_mutex_t); + } /* out-of-order fragments queues */ comm->c_frags_cant_match = malloc(sizeof(lam_list_t) * size); if(NULL == comm->c_frags_cant_match) return LAM_ERR_OUT_OF_RESOURCE; - for(i=0; ic_frags_cant_match+i, lam_list_t); + for(i=0; ic_frags_cant_match+i; + OBJ_CONSTRUCT(object, lam_list_t); + } /* queues of unmatched specific (source process specified) receives */ comm->c_specific_receives = malloc(sizeof(lam_list_t) * size); if(NULL == comm->c_specific_receives) return LAM_ERR_OUT_OF_RESOURCE; - for(i=0; ic_specific_receives+i, lam_list_t); + for(i=0; ic_specific_receives+i; + OBJ_CONSTRUCT(object, lam_list_t); + } return LAM_SUCCESS; } diff --git a/src/mca/mpi/ptl/base/ptl_base_comm.h b/src/mca/mpi/ptl/base/ptl_base_comm.h index 21ffdda83c..da11a71f6f 100644 
--- a/src/mca/mpi/ptl/base/ptl_base_comm.h +++ b/src/mca/mpi/ptl/base/ptl_base_comm.h @@ -26,8 +26,8 @@ struct mca_pml_comm_t { /* unexpected fragments queues */ lam_list_t *c_unexpected_frags; - /* these locks are needed to avoid a probe interfering with a match - */ + + /* these locks are needed to avoid a probe interfering with a match */ lam_mutex_t *c_unexpected_frags_lock; /* out-of-order fragments queues */ @@ -47,7 +47,16 @@ struct mca_pml_comm_t { typedef struct mca_pml_comm_t mca_pml_comm_t; -extern int mca_pml_ptl_comm_init_size(struct mca_pml_comm_t*, size_t); +extern int mca_pml_ptl_comm_init_size(struct mca_pml_comm_t*, size_t); + +static inline mca_ptl_base_sequence_t mca_pml_ptl_comm_send_sequence(struct mca_pml_comm_t* comm, int dst) +{ + mca_ptl_base_sequence_t sequence; + lam_mutex_lock(comm->c_matching_lock+dst); + sequence = comm->c_msg_seq[dst]++; + lam_mutex_unlock(comm->c_matching_lock+dst); + return sequence; +} #endif diff --git a/src/mca/mpi/ptl/base/ptl_base_header.h b/src/mca/mpi/ptl/base/ptl_base_header.h index d30a9d8b86..50cf576aa7 100644 --- a/src/mca/mpi/ptl/base/ptl_base_header.h +++ b/src/mca/mpi/ptl/base/ptl_base_header.h @@ -61,11 +61,11 @@ struct mca_ptl_base_match_header_t { /* communicator index */ uint32_t hdr_contextid; /* source rank */ - int32_t hdr_src_rank; + int32_t hdr_src; /* destination rank */ - int32_t hdr_dst_rank; + int32_t hdr_dst; /* user tag */ - int32_t hdr_user_tag; + int32_t hdr_tag; /* message length */ uint32_t hdr_msg_length; /* message sequence number */ diff --git a/src/mca/mpi/ptl/base/ptl_base_match.c b/src/mca/mpi/ptl/base/ptl_base_match.c index 4cc6cc56e7..a7fce738bd 100644 --- a/src/mca/mpi/ptl/base/ptl_base_match.c +++ b/src/mca/mpi/ptl/base/ptl_base_match.c @@ -95,7 +95,7 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header, frag_msg_seq = frag_header->hdr_msg_seq; /* get fragment communicator source rank */ - frag_src = frag_header->hdr_src_rank; + frag_src = 
frag_header->hdr_src; /* get next expected message sequence number - if threaded * run, lock to make sure that if another thread is processing @@ -218,7 +218,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match * look only at "specific" receives, or "wild" receives, * or if we need to traverse both sets at the same time. */ - frag_src = frag_header->hdr_src_rank; + frag_src = frag_header->hdr_src; if (lam_list_get_size((pml_comm->c_specific_receives)+frag_src) == 0 ){ /* @@ -268,11 +268,11 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match( { /* local parameters */ mca_ptl_base_recv_request_t *return_match, *wild_recv; - int frag_user_tag,recv_user_tag; + int frag_tag,recv_tag; /* initialization */ return_match=(mca_ptl_base_recv_request_t *)NULL; - frag_user_tag=frag_header->hdr_user_tag; + frag_tag=frag_header->hdr_tag; /* * Loop over the wild irecvs - no need to lock, the upper level @@ -286,14 +286,14 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match( wild_recv = (mca_ptl_base_recv_request_t *) ((lam_list_item_t *)wild_recv)->lam_list_next) { - recv_user_tag = wild_recv->super.req_tag; + recv_tag = wild_recv->super.req_tag; if ( /* exact tag match */ - (frag_user_tag == recv_user_tag) || + (frag_tag == recv_tag) || /* wild tag match - negative tags (except for * LAM_ANY_TAG) are reserved for internal use, and will * not be matched with LAM_ANY_TAG */ - ( (recv_user_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) + ( (recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) ) { /* @@ -335,13 +335,13 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat { /* local variables */ mca_ptl_base_recv_request_t *specific_recv, *return_match; - int frag_src,recv_user_tag,frag_user_tag; + int frag_src,recv_tag,frag_tag; /* initialization */ return_match=(mca_ptl_base_recv_request_t *)NULL; - frag_src = frag_header->hdr_src_rank; - 
frag_user_tag=frag_header->hdr_user_tag; + frag_src = frag_header->hdr_src; + frag_tag=frag_header->hdr_tag; /* * Loop over the specific irecvs. @@ -355,9 +355,9 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat /* * Check for a match */ - recv_user_tag = specific_recv->super.req_tag; - if ( (frag_user_tag == recv_user_tag) || - ( (recv_user_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) { + recv_tag = specific_recv->super.req_tag; + if ( (frag_tag == recv_tag) || + ( (recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) ) { /* * Match made @@ -398,12 +398,12 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive /* local variables */ mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match; mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq; - int frag_src,frag_user_tag, wild_recv_tag, specific_recv_tag; + int frag_src,frag_tag, wild_recv_tag, specific_recv_tag; /* initialization */ return_match=(mca_ptl_base_recv_request_t *)NULL; - frag_src = frag_header->hdr_src_rank; - frag_user_tag=frag_header->hdr_user_tag; + frag_src = frag_header->hdr_src; + frag_tag=frag_header->hdr_tag; /* * We know that when this is called, both specific and wild irecvs @@ -426,8 +426,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive * try and match */ wild_recv_tag = wild_recv->super.req_tag; - if ( (frag_user_tag == wild_recv_tag) || - ( (wild_recv_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) { + if ( (frag_tag == wild_recv_tag) || + ( (wild_recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) ) { /* * Match made @@ -471,8 +471,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive * specific recv is earlier than the wild one. 
*/ specific_recv_tag=specific_recv->super.req_tag; - if ( (frag_user_tag == specific_recv_tag) || - ( (specific_recv_tag == LAM_ANY_TAG) && (0<=frag_user_tag)) ) + if ( (frag_tag == specific_recv_tag) || + ( (specific_recv_tag == LAM_ANY_TAG) && (0<=frag_tag)) ) { /* @@ -540,12 +540,6 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche mca_ptl_base_recv_frag_t *frag_desc; mca_ptl_base_recv_request_t *matched_receive; - /* - * Initialize list size - assume that most of the time this search - * will come up empty, so just initialize count - not pointers - */ - lam_list_set_size(additional_matches,0); - /* * Loop over all the out of sequence messages. No ordering is assumed * in the c_frags_cant_match list. @@ -577,16 +571,6 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche frag_seqber=frag_desc->super.frag_header.hdr_match.hdr_msg_seq; if (frag_seqber == next_msg_seq_expected) { - /* initialize list on first entry - assume that most - * of the time nothing is found, so initially we just - * set the count to zero, and don't initialize any - * other parameters - */ - if(0 == lam_list_get_size(additional_matches)) - { - OBJ_CONSTRUCT(additional_matches, lam_list_t); - } - /* We're now expecting the next sequence number. 
*/ (pml_comm->c_next_msg_seq[frag_src])++; diff --git a/src/mca/mpi/ptl/base/ptl_base_recvfrag.h b/src/mca/mpi/ptl/base/ptl_base_recvfrag.h index 71af3a4ae8..f42c41b6ab 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvfrag.h +++ b/src/mca/mpi/ptl/base/ptl_base_recvfrag.h @@ -3,8 +3,8 @@ */ /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ -#ifndef MCA_PML_BASE_RECVFRAG_H -#define MCA_PML_BASE_RECVFRAG_H +#ifndef MCA_PTL_BASE_RECVFRAG_H +#define MCA_PTL_BASE_RECVFRAG_H #include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/base/ptl_base_fragment.h" @@ -60,8 +60,10 @@ static inline int mca_ptl_base_recv_frag_match( { bool matched; lam_list_t matched_frags; - int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags); - if(rc != LAM_SUCCESS) + int rc; + + OBJ_CONSTRUCT(&matched_frags, lam_list_t); + if((rc = mca_ptl_base_match(header, frag, &matched, &matched_frags)) != LAM_SUCCESS) return rc; if(matched) { diff --git a/src/mca/mpi/ptl/base/ptl_base_recvreq.c b/src/mca/mpi/ptl/base/ptl_base_recvreq.c index f24cad1990..45327949da 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvreq.c +++ b/src/mca/mpi/ptl/base/ptl_base_recvreq.c @@ -11,7 +11,7 @@ static void mca_ptl_base_recv_request_construct(mca_ptl_base_recv_request_t*); static void mca_ptl_base_recv_request_destruct(mca_ptl_base_recv_request_t*); -static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t*, int); +static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t*, int); lam_class_t mca_ptl_base_recv_request_t_class = { @@ -39,21 +39,23 @@ static void mca_ptl_base_recv_request_destruct(mca_ptl_base_recv_request_t* requ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* request) { - lam_communicator_t *comm = request->super.req_communicator; + lam_communicator_t *comm = request->super.req_comm; mca_pml_comm_t* pml_comm = comm->c_pml_comm; int req_peer = request->super.req_peer; - + 
mca_ptl_base_recv_frag_t* frag; + /* check for a specific match */ + THREAD_LOCK(pml_comm->c_matching_lock+req_peer); if (lam_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 && - mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) { + (frag = mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) != NULL) { + THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer); + mca_ptl_base_recv_frag_process(frag); return; /* match found */ } /* We didn't find any matches. Record this irecv so we can match * it when the message comes in. */ - - THREAD_LOCK(pml_comm->c_matching_lock+req_peer); lam_list_append(pml_comm->c_specific_receives+req_peer, (lam_list_item_t*)request); THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer); } @@ -66,7 +68,7 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) { - lam_communicator_t *comm = request->super.req_communicator; + lam_communicator_t *comm = request->super.req_comm; mca_pml_comm_t* pml_comm = comm->c_pml_comm; int proc_count = comm->c_remote_group->grp_proc_count; int proc; @@ -78,15 +80,22 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) * process. 
*/ for (proc = 0; proc < proc_count; proc++) { - + mca_ptl_base_recv_frag_t* frag; + /* continue if no frags to match */ - if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) + THREAD_LOCK(pml_comm->c_matching_lock+proc); + if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) { + THREAD_UNLOCK(pml_comm->c_matching_lock+proc); continue; + } /* loop over messages from the current proc */ - if (mca_ptl_base_recv_request_match_specific_proc(request, proc)) { + if ((frag = mca_ptl_base_recv_request_match_specific_proc(request, proc)) != NULL) { + THREAD_UNLOCK(pml_comm->c_matching_lock+proc); + mca_ptl_base_recv_frag_process(frag); return; /* match found */ } + THREAD_UNLOCK(pml_comm->c_matching_lock+proc); } /* We didn't find any matches. Record this irecv so we can match to @@ -104,41 +113,33 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) * it places the request in the appropriate matched receive list. */ -static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t* request, int proc) +static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc( + mca_ptl_base_recv_request_t* request, int proc) { - mca_pml_comm_t *pml_comm = request->super.req_communicator->c_pml_comm; + mca_pml_comm_t *pml_comm = request->super.req_comm->c_pml_comm; lam_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc; mca_ptl_base_recv_frag_t* frag; int tag = request->super.req_tag; - /* lock for thread safety */ - THREAD_LOCK(pml_comm->c_matching_lock+proc); for (frag = (mca_ptl_base_recv_frag_t*)lam_list_get_first(unexpected_frags); frag != (mca_ptl_base_recv_frag_t*)lam_list_get_end(unexpected_frags); frag = (mca_ptl_base_recv_frag_t*)lam_list_get_next(frag)) { mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match; /* check first frag - we assume that process matching has been done already */ - if (((tag == LAM_ANY_TAG) || (tag == header->hdr_user_tag))) { + if 
(((tag == LAM_ANY_TAG) || (tag == header->hdr_tag))) { - if (tag == LAM_ANY_TAG && header->hdr_user_tag < 0) { + if (tag == LAM_ANY_TAG && header->hdr_tag < 0) { continue; } - + lam_list_remove_item(unexpected_frags, frag); request->req_sequence = header->hdr_msg_seq; - request->super.req_tag = tag = header->hdr_user_tag; - request->super.req_peer = header->hdr_src_rank; + request->super.req_tag = tag = header->hdr_tag; + request->super.req_peer = header->hdr_src; frag->frag_request = request; - - /* notify ptl fragment has been matched - send cts to peer */ - THREAD_UNLOCK(pml_comm->c_matching_lock+proc); - - mca_ptl_base_recv_frag_process(frag); - return true; + return frag; } } - - THREAD_UNLOCK(pml_comm->c_matching_lock+proc); - return false; + return NULL; } diff --git a/src/mca/mpi/ptl/base/ptl_base_recvreq.h b/src/mca/mpi/ptl/base/ptl_base_recvreq.h index 5ca958a9d1..b7b1d5485e 100644 --- a/src/mca/mpi/ptl/base/ptl_base_recvreq.h +++ b/src/mca/mpi/ptl/base/ptl_base_recvreq.h @@ -14,7 +14,10 @@ struct mca_ptl_base_recv_frag_t; struct mca_ptl_base_recv_request_t { mca_pml_base_request_t super; + /* request sequence number */ mca_ptl_base_sequence_t req_sequence; + /* number of bytes delivered */ + size_t req_bytes_recvd; }; typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t; @@ -34,18 +37,17 @@ static inline void mca_ptl_base_recv_request_reinit( bool persistent) { request->req_sequence = 0; + request->req_bytes_recvd = 0; request->super.req_addr = addr; request->super.req_length = length; request->super.req_datatype = datatype; request->super.req_peer = src; request->super.req_tag = tag; - request->super.req_communicator = comm; + request->super.req_comm = comm; request->super.req_type = MCA_PML_REQUEST_RECV; - request->super.req_status = MCA_PML_STATUS_INITED; request->super.req_persistent = persistent; request->super.req_mpi_done = false; - request->super.req_pml_layer_done = false; + request->super.req_pml_done = false; } - #endif diff 
--git a/src/mca/mpi/ptl/base/ptl_base_sendfrag.h b/src/mca/mpi/ptl/base/ptl_base_sendfrag.h index 8b4a4c0db8..a92b340ea1 100644 --- a/src/mca/mpi/ptl/base/ptl_base_sendfrag.h +++ b/src/mca/mpi/ptl/base/ptl_base_sendfrag.h @@ -1,8 +1,8 @@ /* * $HEADER$ */ -#ifndef MCA_PML_BASE_SEND_FRAG_H -#define MCA_PML_BASE_SEND_FRAG_H +#ifndef MCA_PTL_BASE_SEND_FRAG_H +#define MCA_PTL_BASE_SEND_FRAG_H #include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/base/ptl_base_fragment.h" diff --git a/src/mca/mpi/ptl/base/ptl_base_sendreq.h b/src/mca/mpi/ptl/base/ptl_base_sendreq.h index bb8452faa9..9f28914df0 100644 --- a/src/mca/mpi/ptl/base/ptl_base_sendreq.h +++ b/src/mca/mpi/ptl/base/ptl_base_sendreq.h @@ -9,6 +9,7 @@ #include "mca/mpi/ptl/ptl.h" #include "mca/mpi/pml/base/pml_base_request.h" +#include "mca/mpi/ptl/base/ptl_base_comm.h" extern lam_class_t mca_ptl_base_send_request_t_class; @@ -18,10 +19,6 @@ struct mca_ptl_base_send_frag_t; struct mca_ptl_base_send_request_t { /* request object - common data structure for use by wait/test */ mca_pml_base_request_t super; - /* pointer to user data */ - unsigned char *req_data; - /* size of send/recv in bytes */ - size_t req_length; /* number of bytes that have already been assigned to a fragment */ size_t req_offset; /* number of fragments that have been allocated */ @@ -35,7 +32,7 @@ struct mca_ptl_base_send_request_t { /* type of send */ mca_pml_base_send_mode_t req_send_mode; /* sequence number for MPI pt-2-pt ordering */ - mca_ptl_base_sequence_t req_msg_sequence_number; + mca_ptl_base_sequence_t req_msg_seq; /* queue of fragments that are waiting to be acknowledged */ mca_ptl_base_queue_t req_unacked_frags; /* PTL that allocated this descriptor */ @@ -66,17 +63,17 @@ static inline void mca_ptl_base_send_request_reinit( request->req_bytes_acked = 0; request->req_send_mode = sendmode; request->req_peer_request.lval = 0; + request->req_msg_seq = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer); request->super.req_addr = addr; 
request->super.req_length = length; request->super.req_datatype = datatype; request->super.req_peer = peer; request->super.req_tag = tag; - request->super.req_communicator = comm; + request->super.req_comm = comm; request->super.req_type = MCA_PML_REQUEST_SEND; - request->super.req_status = MCA_PML_STATUS_INITED; request->super.req_persistent = persistent; request->super.req_mpi_done = false; - request->super.req_pml_layer_done = false; + request->super.req_pml_done = false; } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp.c index dcd6a4542a..11068fd3b4 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp.c @@ -6,14 +6,18 @@ #include "lam/util/if.h" #include "mca/mpi/pml/pml.h" #include "mca/mpi/ptl/ptl.h" +#include "mca/mpi/ptl/base/ptl_base_header.h" #include "mca/mpi/ptl/base/ptl_base_sendreq.h" #include "mca/mpi/ptl/base/ptl_base_sendfrag.h" +#include "mca/mpi/ptl/base/ptl_base_recvreq.h" +#include "mca/mpi/ptl/base/ptl_base_recvfrag.h" #include "mca/lam/base/mca_base_module_exchange.h" #include "ptl_tcp.h" #include "ptl_tcp_addr.h" #include "ptl_tcp_peer.h" #include "ptl_tcp_proc.h" #include "ptl_tcp_sendreq.h" +#include "ptl_tcp_recvfrag.h" mca_ptl_tcp_t mca_ptl_tcp = { @@ -151,11 +155,25 @@ int mca_ptl_tcp_send( void mca_ptl_tcp_recv( - struct mca_ptl_t* ptl, - struct mca_ptl_base_recv_frag_t* frag) + mca_ptl_t* ptl, + mca_ptl_base_recv_frag_t* frag) { - if(mca_ptl_tcp_recv_frag_cts((mca_ptl_tcp_recv_frag_t*)frag) == false) { - lam_list_append(&mca_ptl_tcp_module.tcp_acks, (lam_list_item_t*)frag); + /* fill in match */ + mca_pml_base_request_t* request = &frag->frag_request->super; + mca_ptl_base_header_t* header = &frag->super.frag_header; + request->req_status.MPI_SOURCE = header->hdr_match.hdr_src; + request->req_status.MPI_TAG = header->hdr_match.hdr_tag; + request->req_status.MPI_ERROR = LAM_SUCCESS; + request->req_status.MPI_LENGTH = header->hdr_match.hdr_msg_length; + + /* send cts ack 
back to peer */ + if(request->req_status.MPI_LENGTH > frag->super.frag_size) { + if(mca_ptl_tcp_recv_frag_cts((mca_ptl_tcp_recv_frag_t*)frag) == false) { + lam_list_append(&mca_ptl_tcp_module.tcp_acks, (lam_list_item_t*)frag); + } } + + /* process fragment if complete */ + mca_ptl_tcp_recv_frag_process((mca_ptl_tcp_recv_frag_t*)frag); } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp.h index c6ce6c46fc..8fb61d1975 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp.h @@ -24,8 +24,8 @@ struct mca_ptl_tcp_module_1_0_0_t { struct mca_ptl_tcp_t** tcp_ptls; size_t tcp_num_ptls; /**< number of ptls actually used */ size_t tcp_max_ptls; /**< maximum number of ptls - available kernel ifs */ - int tcp_listen; - unsigned short tcp_port; + int tcp_listen_sd; + unsigned short tcp_listen_port; char* tcp_if_include; /**< comma seperated list of interface to include */ char* tcp_if_exclude; /**< comma seperated list of interface to exclude */ int tcp_free_list_num; /**< initial size of free lists */ diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c index 6427b4213c..e786cbcc1d 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_module.c @@ -3,6 +3,11 @@ */ #include #include +#include +#include +#include +#include +#include #include "lam/constants.h" #include "lam/util/if.h" @@ -211,36 +216,57 @@ static int mca_ptl_tcp_module_create_instances(void) static int mca_ptl_tcp_module_create_listen(void) { + int flags; + /* create a listen socket for incoming connections */ - mca_ptl_tcp_module.tcp_listen = socket(AF_INET, SOCK_STREAM, 0); - if(mca_ptl_tcp_module.tcp_listen < 0) { + mca_ptl_tcp_module.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0); + if(mca_ptl_tcp_module.tcp_listen_sd < 0) { lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno); return LAM_ERROR; } /* bind to all addresses and dynamically 
assigned port */ struct sockaddr_in inaddr; + memset(&inaddr, 0, sizeof(inaddr)); inaddr.sin_family = AF_INET; inaddr.sin_addr.s_addr = INADDR_ANY; inaddr.sin_port = 0; - if(bind(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { + if(bind(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { lam_output(0,"mca_ptl_tcp_module_init: bind() failed with errno=%d", errno); return LAM_ERROR; } /* resolve system assignend port */ lam_socklen_t addrlen = sizeof(struct sockaddr_in); - if(getsockname(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, &addrlen) < 0) { + if(getsockname(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) { lam_output(0, "mca_ptl_tcp_module_init: getsockname() failed with errno=%d", errno); return LAM_ERROR; } - mca_ptl_tcp_module.tcp_port = inaddr.sin_port; + mca_ptl_tcp_module.tcp_listen_port = inaddr.sin_port; + /* setup listen backlog to maximum allowed by kernel */ + if(listen(mca_ptl_tcp_module.tcp_listen_sd, SOMAXCONN) < 0) { + lam_output(0, "mca_ptl_tcp_module_init: listen() failed with errno=%d", errno); + return LAM_ERROR; + } + + /* set socket up to be non-blocking, otherwise accept could block */ + if((flags = fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_GETFL, 0)) < 0) { + lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_GETFL) failed with errno=%d", errno); + return LAM_ERROR; + } else { + flags |= O_NONBLOCK; + if(fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_SETFL, flags) < 0) { + lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_SETFL) failed with errno=%d", errno); + return LAM_ERROR; + } + } + /* register listen port */ lam_event_set( &mca_ptl_tcp_module.tcp_recv_event, - mca_ptl_tcp_module.tcp_listen, + mca_ptl_tcp_module.tcp_listen_sd, LAM_EV_READ|LAM_EV_PERSIST, mca_ptl_tcp_module_recv_handler, 0); @@ -261,7 +287,7 @@ static int mca_ptl_tcp_module_exchange(void) for(i=0; iptl_ifaddr.sin_addr; - addrs[i].addr_port = 
mca_ptl_tcp_module.tcp_listen; + addrs[i].addr_port = mca_ptl_tcp_module.tcp_listen_port; addrs[i].addr_inuse = 0; } int rc = mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version, addrs, size); @@ -353,10 +379,10 @@ void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp) static void mca_ptl_tcp_module_accept(void) { - lam_socklen_t addrlen = sizeof(struct sockaddr_in); while(true) { + lam_socklen_t addrlen = sizeof(struct sockaddr_in); struct sockaddr_in addr; - int sd = accept(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&addr, &addrlen); + int sd = accept(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen); if(sd < 0) { if(errno == EINTR) continue; @@ -385,7 +411,7 @@ static void mca_ptl_tcp_module_recv_handler(int sd, short flags, void* user) lam_socklen_t addr_len = sizeof(addr); /* accept new connections on the listen socket */ - if(mca_ptl_tcp_module.tcp_listen == sd) { + if(mca_ptl_tcp_module.tcp_listen_sd == sd) { mca_ptl_tcp_module_accept(); return; } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_peer.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_peer.c index 8b508996cb..ea3fbca263 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_peer.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_peer.c @@ -11,6 +11,7 @@ #include "ptl_tcp_addr.h" #include "ptl_tcp_peer.h" #include "ptl_tcp_proc.h" +#include "ptl_tcp_sendfrag.h" static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer); @@ -50,6 +51,22 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer) } +static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, int sd) +{ + lam_event_set( + &ptl_peer->peer_recv_event, + ptl_peer->peer_sd, + LAM_EV_READ|LAM_EV_PERSIST, + mca_ptl_tcp_peer_recv_handler, + ptl_peer); + lam_event_set( + &ptl_peer->peer_send_event, + ptl_peer->peer_sd, + LAM_EV_WRITE|LAM_EV_PERSIST, + mca_ptl_tcp_peer_send_handler, + ptl_peer); +} + /* * Cleanup any resources held by the peer. 
*/ @@ -88,6 +105,8 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) { ptl_peer->peer_send_frag = frag; lam_event_add(&mca_ptl_tcp_module.tcp_send_event, 0); + } else { + ptl_peer->peer_ptl->super.ptl_send_progress(frag->super.frag_request, &frag->super); } } break; @@ -96,6 +115,50 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t return rc; } + +/* + * A blocking send on a non-blocking socket, retried on EINTR/EAGAIN/EWOULDBLOCK. Used to send the small amount of connection + * information that identifies the peer's endpoint. Returns the byte count sent (size), or -1 after closing the peer on a hard send() error. + */ +static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size) +{ + unsigned char* ptr = (unsigned char*)data; + size_t cnt = 0; + while(cnt < size) { + int retval = send(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0); + if(retval < 0) { + if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno); + mca_ptl_tcp_peer_close_i(ptl_peer); + return -1; + } + /* transient failure: retry without adding the -1 return value to cnt */ + continue; + } + cnt += retval; + } + return cnt; +} + + +/* + * Send the globally unique identifier for this process to a peer on + * a newly connected socket. + */ + +static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer) +{ + /* send process identifier to remote peer */ + mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local(); + uint32_t size_n = htonl(ptl_proc->proc_guid_size); + if(mca_ptl_tcp_peer_send_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n) || + mca_ptl_tcp_peer_send_blocking(ptl_peer, ptl_proc->proc_guid, ptl_proc->proc_guid_size) != + ptl_proc->proc_guid_size) { + return LAM_ERR_UNREACH; + } + return LAM_SUCCESS; +} + /* * Check the state of this peer.
If the incoming connection request matches * our peers address, check the state of our connection: @@ -118,6 +181,12 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in* peer_proc->proc_lam->proc_vpid < this_proc->proc_lam->proc_vpid)) { mca_ptl_tcp_peer_close_i(ptl_peer); ptl_peer->peer_sd = sd; + if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) != LAM_SUCCESS) { + mca_ptl_tcp_peer_close_i(ptl_peer); + return false; + } + mca_ptl_tcp_peer_event_init(ptl_peer, sd); + lam_event_add(&ptl_peer->peer_recv_event, 0); mca_ptl_tcp_peer_connected(ptl_peer); THREAD_UNLOCK(&ptl_peer->peer_lock); return true; @@ -172,19 +241,6 @@ static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer) lam_list_remove_first(&ptl_peer->peer_frags); lam_event_add(&ptl_peer->peer_send_event, 0); } - lam_event_set( - &ptl_peer->peer_recv_event, - ptl_peer->peer_sd, - LAM_EV_READ|LAM_EV_PERSIST, - mca_ptl_tcp_peer_recv_handler, - ptl_peer); - lam_event_set( - &ptl_peer->peer_send_event, - ptl_peer->peer_sd, - LAM_EV_WRITE|LAM_EV_PERSIST, - mca_ptl_tcp_peer_send_handler, - ptl_peer); - lam_event_add(&ptl_peer->peer_recv_event, 0); } @@ -221,30 +277,6 @@ static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_base_peer_t* ptl_peer, void* d } -/* - * A blocking send on a non-blocking socket. Used to send the small amount of connection - * information that identifies the peers endpoint. 
- */ -static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size) -{ - unsigned char* ptr = (unsigned char*)data; - size_t cnt = 0; - while(cnt < size) { - int retval = send(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0); - if(retval < 0) { - if(errno == EINTR) - continue; - if(errno != EAGAIN && errno != EWOULDBLOCK) { - lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno); - mca_ptl_tcp_peer_close_i(ptl_peer); - return -1; - } - } - cnt += retval; - } - return cnt; -} - /* * Receive the peers globally unique process identification from a newly @@ -258,7 +290,7 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer) void* guid; mca_ptl_tcp_proc_t* ptl_proc = ptl_peer->peer_proc; - if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, &size_n, sizeof(size_n)) != size_n) + if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n)) return LAM_ERR_UNREACH; size_h = ntohl(size_n); guid = malloc(size_h); @@ -283,24 +315,6 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer) } -/* - * Send the globally unique identifier for this process to a peer on - * a newly connected socket. - */ - -static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer) -{ - /* send process identifier to remote peer */ - mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local(); - uint32_t size_n = htonl(ptl_proc->proc_guid_size); - if(mca_ptl_tcp_peer_send_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n) || - mca_ptl_tcp_peer_send_blocking(ptl_peer, ptl_proc->proc_guid, ptl_proc->proc_guid_size) != - ptl_proc->proc_guid_size) { - return LAM_ERR_UNREACH; - } - return LAM_SUCCESS; -} - /* * Start a connection to the peer. 
This will likely not complete, @@ -319,6 +333,9 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer) return LAM_ERR_UNREACH; } + /* setup event callbacks */ + mca_ptl_tcp_peer_event_init(ptl_peer, ptl_peer->peer_sd); + /* setup the socket as non-blocking */ int flags; if((flags = fcntl(ptl_peer->peer_sd, F_GETFL, 0)) < 0) { @@ -330,13 +347,16 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer) } /* start the connect - will likely fail with EINPROGRESS */ - if(connect(ptl_peer->peer_sd, (struct sockaddr*)&ptl_peer->peer_addr->addr_inet, - sizeof(struct sockaddr_in)) < 0) { + struct sockaddr_in peer_addr; + peer_addr.sin_family = AF_INET; + peer_addr.sin_addr = ptl_peer->peer_addr->addr_inet; + peer_addr.sin_port = ptl_peer->peer_addr->addr_port; + if(connect(ptl_peer->peer_sd, (struct sockaddr*)&peer_addr, sizeof(peer_addr)) < 0) { /* non-blocking so wait for completion */ if(errno == EINPROGRESS) { ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING; lam_event_add(&ptl_peer->peer_send_event, 0); - return rc; + return LAM_SUCCESS; } mca_ptl_tcp_peer_close_i(ptl_peer); ptl_peer->peer_retries++; @@ -379,7 +399,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer) return; } if(so_error != 0) { - lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failedd with errno=%d\n", so_error); + lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error); mca_ptl_tcp_peer_close_i(ptl_peer); return; } @@ -452,11 +472,16 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user) case MCA_PTL_TCP_CONNECTING: mca_ptl_tcp_peer_complete_connect(ptl_peer); break; - case MCA_PTL_TCP_CONNECTED: + case MCA_PTL_TCP_CONNECTED: + { /* complete the current send */ + mca_ptl_t *ptl = (mca_ptl_t*)ptl_peer->peer_ptl; do { - if(mca_ptl_tcp_send_frag_handler(ptl_peer->peer_send_frag, ptl_peer->peer_sd) == false) + mca_ptl_tcp_send_frag_t* send_frag = 
ptl_peer->peer_send_frag; + if(mca_ptl_tcp_send_frag_handler(send_frag, ptl_peer->peer_sd) == false) break; + ptl->ptl_send_progress(send_frag->super.frag_request, &send_frag->super); + /* progress any pending sends */ ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*) lam_list_remove_first(&ptl_peer->peer_frags); @@ -467,6 +492,7 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user) lam_event_del(&ptl_peer->peer_send_event); } break; + } default: lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)", ptl_peer->peer_state); diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c index 8073f954e3..e1214830ea 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -10,6 +10,11 @@ #include "ptl_tcp_recvfrag.h" +#define frag_header super.super.frag_header +#define frag_peer super.super.frag_peer +#define frag_owner super.super.frag_owner + + static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag); static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t); @@ -43,8 +48,7 @@ void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_pe frag->frag_owner = &peer->peer_ptl->super; frag->super.frag_request = 0; frag->frag_peer = peer; - frag->frag_addr = 0; - frag->frag_size = 0; + frag->frag_buff = NULL; frag->frag_hdr_cnt = 0; frag->frag_msg_cnt = 0; } @@ -124,21 +128,33 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) return false; if(frag->frag_msg_cnt == 0) { + /* attempt to match a posted recv */ mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match); /* match was not made - so allocate buffer for eager send */ - if(NULL == frag->super.frag_request) { - frag->frag_addr = malloc(frag->frag_header.hdr_frag.hdr_frag_length); - 
frag->frag_size = frag->frag_header.hdr_frag.hdr_frag_length; - } else { - frag->frag_addr = (unsigned char*)frag->super.super.frag_addr; - frag->frag_size = frag->super.super.frag_size; - } - if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) - return false; + if (NULL == frag->super.frag_request) { - } else if(frag->frag_msg_cnt < frag->super.super.frag_size) { +#if 0 + lam_output(0, "mca_ptl_tcp_recv_frag_match: src=%d tag=%d comm=%d seq=%ld", + frag->frag_header.hdr_match.hdr_src, + frag->frag_header.hdr_match.hdr_tag, + frag->frag_header.hdr_match.hdr_contextid, + frag->frag_header.hdr_match.hdr_msg_seq); +#endif + + if(frag->frag_header.hdr_frag.hdr_frag_length > 0) { + frag->frag_buff = malloc(frag->frag_header.hdr_frag.hdr_frag_length); + frag->super.super.frag_size = frag->frag_header.hdr_frag.hdr_frag_length; + } + + /* match was made - use application buffer */ + } else { + frag->frag_buff = (unsigned char*)frag->super.super.frag_addr; + } + } + + if(frag->frag_msg_cnt < frag->super.super.frag_size) { if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) return false; } @@ -147,8 +163,8 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) return false; - /* if match has already been made process the fragment */ - if(NULL != frag->super.frag_request) + /* if fragment has already been matched - go ahead and process */ + if (frag->super.frag_request != NULL) mca_ptl_tcp_recv_frag_process(frag); return true; } @@ -187,7 +203,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) { int cnt = -1; while(cnt < 0) { - cnt = recv(sd, (unsigned char*)frag->frag_addr+frag->frag_msg_cnt, frag->frag_size-frag->frag_msg_cnt, 0); + cnt = recv(sd, (unsigned char*)frag->frag_buff+frag->frag_msg_cnt, + frag->super.super.frag_size-frag->frag_msg_cnt, 0); if(cnt == 0) { mca_ptl_tcp_peer_close(frag->frag_peer); lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, 
(lam_list_item_t*)frag); @@ -208,7 +225,7 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd) } } frag->frag_msg_cnt += cnt; - return (frag->frag_msg_cnt >= frag->frag_size); + return (frag->frag_msg_cnt >= frag->super.super.frag_size); } @@ -270,7 +287,7 @@ bool mca_ptl_tcp_recv_frag_cts(mca_ptl_tcp_recv_frag_t* frag) ack->frag_owner = &ptl_peer->peer_ptl->super; ack->frag_peer = ptl_peer; ack->super.frag_request = 0; - ack->super.super.frag_addr = 0; + ack->super.super.frag_addr = NULL; ack->super.super.frag_size = 0; ack->frag_vec_ptr = ack->frag_vec; ack->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr; @@ -282,17 +299,13 @@ bool mca_ptl_tcp_recv_frag_cts(mca_ptl_tcp_recv_frag_t* frag) /* - * Copy data into application buffer if required and update - * status of the request. */ void mca_ptl_tcp_recv_frag_process(mca_ptl_tcp_recv_frag_t* frag) { - /* are we done receiving data */ if(frag->frag_msg_cnt >= frag->frag_header.hdr_frag.hdr_frag_length) { - /* was a temporary buffer allocated */ - if(frag->frag_addr != frag->super.super.frag_addr) { - memcpy(frag->super.super.frag_addr, frag->frag_addr, frag->super.super.frag_size); + if(frag->frag_buff != frag->super.super.frag_addr) { + memcpy(frag->super.super.frag_addr, frag->frag_buff, frag->super.super.frag_size); } frag->frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super); if(frag->frag_acked == true) { diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h index d4101ea717..e96c95c039 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_recvfrag.h @@ -20,14 +20,10 @@ extern lam_class_t mca_ptl_tcp_recv_frag_t_class; struct mca_ptl_tcp_recv_frag_t { mca_ptl_base_recv_frag_t super; mca_ptl_base_ack_header_t frag_ack; - unsigned char* frag_addr; - size_t frag_size; + unsigned char* frag_buff; size_t frag_hdr_cnt; size_t frag_msg_cnt; bool frag_acked; -#define frag_peer 
super.super.frag_peer -#define frag_owner super.super.frag_owner -#define frag_header super.super.frag_header }; typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; @@ -39,8 +35,8 @@ static inline mca_ptl_tcp_recv_frag_t* mca_ptl_tcp_recv_frag_alloc(int* rc) static inline void mca_ptl_tcp_recv_frag_return(mca_ptl_tcp_recv_frag_t* frag) { - if(frag->frag_addr != frag->super.super.frag_addr) - free(frag->frag_addr); + if(frag->frag_buff != frag->super.super.frag_addr) + free(frag->frag_buff); lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); } diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c index f0922022b2..033632d6d8 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -10,6 +10,10 @@ #include "ptl_tcp_peer.h" #include "ptl_tcp_sendfrag.h" +#define frag_header super.super.frag_header +#define frag_owner super.super.frag_owner +#define frag_peer super.super.frag_peer + static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag); static void mca_ptl_tcp_send_frag_destruct(mca_ptl_tcp_send_frag_t* frag); @@ -54,12 +58,12 @@ void mca_ptl_tcp_send_frag_reinit( hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; hdr->hdr_frag.hdr_dst_ptr.pval = 0; - hdr->hdr_match.hdr_contextid = sendreq->super.req_communicator->c_contextid; - hdr->hdr_match.hdr_src_rank = sendreq->super.req_communicator->c_my_rank; - hdr->hdr_match.hdr_dst_rank = sendreq->super.req_peer; - hdr->hdr_match.hdr_user_tag = sendreq->super.req_tag; - hdr->hdr_match.hdr_msg_length = sendreq->req_length; - hdr->hdr_match.hdr_msg_seq = 0; + hdr->hdr_match.hdr_contextid = sendreq->super.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank; + hdr->hdr_match.hdr_dst = sendreq->super.req_peer; + hdr->hdr_match.hdr_tag = sendreq->super.req_tag; + hdr->hdr_match.hdr_msg_length = 
sendreq->super.req_length; + hdr->hdr_match.hdr_msg_seq = sendreq->req_msg_seq; } else { hdr->hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_flags = 0; @@ -71,8 +75,8 @@ void mca_ptl_tcp_send_frag_reinit( } /* update request */ - if(sendreq->req_offset + size > sendreq->req_length) - size = sendreq->req_length = sendreq->req_offset; + if(sendreq->req_offset + size > sendreq->super.req_length) + size = sendreq->super.req_length - sendreq->req_offset; /* clamp fragment to the bytes remaining in the request */ hdr->hdr_frag.hdr_frag_length = size; sendreq->req_offset += size; sendreq->req_frags++; diff --git a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h index aefd50dd26..7ab7bf1b4b 100644 --- a/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h +++ b/src/mca/mpi/ptl/tcp/src/ptl_tcp_sendfrag.h @@ -21,9 +21,6 @@ struct mca_ptl_tcp_send_frag_t { struct iovec *frag_vec_ptr; size_t frag_vec_cnt; struct iovec frag_vec[2]; -#define frag_peer super.super.frag_peer -#define frag_header super.super.frag_header -#define frag_owner super.super.frag_owner }; typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t; diff --git a/src/mpi/communicator/communicator.h b/src/mpi/communicator/communicator.h index 0bb9546482..e34f4d0031 100644 --- a/src/mpi/communicator/communicator.h +++ b/src/mpi/communicator/communicator.h @@ -61,21 +61,22 @@ static inline int lam_comm_rank(lam_communicator_t* comm) { return comm->c_my_rank; } +/** + * size of the communicator + */ +static inline int lam_comm_size(lam_communicator_t* comm) +{ + return comm->c_remote_group->grp_proc_count; +} + /* return pointer to communicator associated with context id cid, * No error checking is done*/ static inline lam_communicator_t *lam_comm_lookup(uint32_t cid) { /* array of pointers to communicators, indexed by context ID */ - extern lam_communicator_t **lam_mpi_comm_array; -#if LAM_ENABLE_DEBUG - extern uint32_t lam_mpi_comm_array_size; - if (cid >= lam_mpi_comm_array_size) { - lam_output(0, "lam_comm_lookup: invalid communicator
index (%d)", cid); - return (lam_communicator_t *) NULL; - } -#endif - return lam_mpi_comm_array[cid]; + extern lam_pointer_array_t lam_mpi_communicators; + return (lam_communicator_t*)lam_pointer_array_get_item(&lam_mpi_communicators, cid); } static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, int peer_id) diff --git a/src/mpi/communicator/lam_comm_init.c b/src/mpi/communicator/lam_comm_init.c index 1a05184ddd..eba8d9c38e 100644 --- a/src/mpi/communicator/lam_comm_init.c +++ b/src/mpi/communicator/lam_comm_init.c @@ -15,11 +15,9 @@ * Global variables */ -lam_communicator_t **lam_mpi_comm_array; -size_t lam_mpi_comm_array_size; - -lam_communicator_t lam_mpi_comm_world; -lam_communicator_t lam_mpi_comm_self; +lam_pointer_array_t lam_mpi_communicators; +lam_communicator_t lam_mpi_comm_world; +lam_communicator_t lam_mpi_comm_self; static void lam_comm_construct(lam_communicator_t* comm) @@ -62,6 +60,9 @@ int lam_comm_init(void) lam_group_t *group; size_t size; + /* Setup communicator array */ + OBJ_CONSTRUCT(&lam_mpi_communicators, lam_pointer_array_t); + /* Setup MPI_COMM_WORLD */ OBJ_CONSTRUCT(&lam_mpi_comm_world, lam_communicator_t); group = OBJ_NEW(lam_group_t); @@ -75,6 +76,7 @@ int lam_comm_init(void) lam_mpi_comm_world.c_local_group = group; lam_mpi_comm_world.c_remote_group = group; mca_pml.pml_add_comm(&lam_mpi_comm_world); + lam_pointer_array_set_item(&lam_mpi_communicators, 0, &lam_mpi_comm_world); /* Setup MPI_COMM_SELF */ OBJ_CONSTRUCT(&lam_mpi_comm_self, lam_communicator_t); @@ -89,6 +91,7 @@ int lam_comm_init(void) lam_mpi_comm_self.c_local_group = group; lam_mpi_comm_self.c_remote_group = group; mca_pml.pml_add_comm(&lam_mpi_comm_self); + lam_pointer_array_set_item(&lam_mpi_communicators, 1, &lam_mpi_comm_self); return LAM_SUCCESS; } diff --git a/src/mpi/interface/c/comm_size.c b/src/mpi/interface/c/comm_size.c index 99c6bd443e..6e1af18004 100644 --- a/src/mpi/interface/c/comm_size.c +++ b/src/mpi/interface/c/comm_size.c @@ -6,11 
+6,13 @@ #include "mpi.h" #include "mpi/interface/c/bindings.h" +#include "mpi/communicator/communicator.h" #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #pragma weak MPI_Comm_size = PMPI_Comm_size #endif int MPI_Comm_size(MPI_Comm comm, int *size) { + *size = lam_comm_size(comm); return MPI_SUCCESS; } diff --git a/src/mpi/interface/c/wtime.c b/src/mpi/interface/c/wtime.c index 3c55c80b09..29cb1036fe 100644 --- a/src/mpi/interface/c/wtime.c +++ b/src/mpi/interface/c/wtime.c @@ -3,6 +3,7 @@ */ #include "lam_config.h" #include +#include #include "mpi.h" #include "mpi/interface/c/bindings.h" @@ -12,5 +13,10 @@ #endif double MPI_Wtime(void) { - return (double)0; + struct timeval tv; + double wtime; + gettimeofday(&tv, NULL); + wtime = tv.tv_sec; + wtime += (double)tv.tv_usec / 1000000.0; + return wtime; } diff --git a/src/mpi/proc/proc.c b/src/mpi/proc/proc.c index fcea6ee751..258bf683c2 100644 --- a/src/mpi/proc/proc.c +++ b/src/mpi/proc/proc.c @@ -77,7 +77,6 @@ int lam_proc_init(void) proc->proc_vpid = vpid; if(proc->proc_vpid == local->vpid && strcmp(proc->proc_job, local->job_handle) == 0) { lam_proc_local_proc = proc; - lam_output(0, "myrank=%d\n", local->vpid); } } free(procs); diff --git a/src/tools/laminfo/Makefile.am b/src/tools/laminfo/Makefile.am index e65d94dd74..1ee75f3764 100644 --- a/src/tools/laminfo/Makefile.am +++ b/src/tools/laminfo/Makefile.am @@ -27,7 +27,7 @@ laminfo_SOURCES = \ param.cc \ version.cc laminfo_LDADD = $(libs) $(LIBMPI_EXTRA_LIBS) $(LIBLAM_EXTRA_LIBS) -laminfo_LDFLAGS = $(LIBMPI_EXTRA_LDFLAGS) $(LIBLAM_EXTRA_LDFLAGS) +laminfo_LDFLAGS = $(LIBMPI_EXTRA_LDFLAGS) $(LIBLAM_EXTRA_LDFLAGS) -lpthread laminfo_DEPENDENCIES = $(libs) clean-local: diff --git a/test/mpi/environment/Makefile.am b/test/mpi/environment/Makefile.am index 1856444dc9..92294cc713 100644 --- a/test/mpi/environment/Makefile.am +++ b/test/mpi/environment/Makefile.am @@ -12,6 +12,7 @@ noinst_PROGRAMS = chello chello_SOURCES = chello.c chello_LDADD = \ 
$(top_builddir)/src/libmpi.la \ - $(top_builddir)/src/liblam.la + $(top_builddir)/src/liblam.la \ + -lpthread chello_DEPENDENCIES = $(chello_LDADD)