1
1

first cut at blocking send/recv (via tcp)

This commit was SVN r842.
Этот коммит содержится в:
Tim Woodall 2004-03-11 22:02:01 +00:00
родитель 61d83a1875
Коммит 7b590f00b4
44 изменённых файлов: 797 добавлений и 606 удалений

Просмотреть файл

@ -40,6 +40,7 @@ struct lam_status_public_t {
int MPI_SOURCE; int MPI_SOURCE;
int MPI_TAG; int MPI_TAG;
int MPI_ERROR; int MPI_ERROR;
int MPI_LENGTH;
}; };
typedef struct lam_status_public_t lam_status_public_t; typedef struct lam_status_public_t lam_status_public_t;

Просмотреть файл

@ -37,7 +37,7 @@ libevent_la_DEPENDENCIES = $(lt_libobj)
include_HEADERS = event.h include_HEADERS = event.h
# LAM: Make it work in a VPATH environment # LAM: Make it work in a VPATH environment
INCLUDES = -I$(top_srcdir)/compat INCLUDES = -I$(top_srcdir)/compat -I$(top_srcdir)/../.. -I$(top_srcdir)/../../include
man_MANS = event.3 man_MANS = event.3

Просмотреть файл

@ -222,7 +222,7 @@ libevent_la_DEPENDENCIES = $(lt_libobj)
include_HEADERS = event.h include_HEADERS = event.h
# LAM: Make it work in a VPATH environment # LAM: Make it work in a VPATH environment
INCLUDES = -I$(top_srcdir)/compat INCLUDES = -I$(top_srcdir)/compat -I$(top_srcdir)/../.. -I$(top_srcdir)/../../include
man_MANS = event.3 man_MANS = event.3
DISTCLEANFILES = *~ DISTCLEANFILES = *~
all: config.h all: config.h

Просмотреть файл

@ -1,4 +1,4 @@
/* $OpenBSD: event.c,v 1.2 2002/06/25 15:50:15 mickey Exp $ */ /* $OpenBSD: event.c,v 1.2 2002/06/25 15:50:15 mickey Exp $ */
/* /*
* Copyright 2000-2002 Niels Provos <provos@citi.umich.edu> * Copyright 2000-2002 Niels Provos <provos@citi.umich.edu>
@ -58,10 +58,14 @@
#include "log.h" #include "log.h"
#else #else
#define LOG_DBG(x) #define LOG_DBG(x)
#define log_error(x) perror(x) #define log_error(x) perror(x)
#endif #endif
#include "event.h" #include "event.h"
#include "lam/types.h"
#include "lam/lfc/lam_object.h"
#include "lam/threads/mutex.h"
#include "lam/util/output.h"
#ifdef HAVE_SELECT #ifdef HAVE_SELECT
extern const struct lam_eventop lam_selectops; extern const struct lam_eventop lam_selectops;
@ -85,38 +89,41 @@ extern const struct lam_eventop lam_win32ops;
/* In order of preference */ /* In order of preference */
static const struct lam_eventop *lam_eventops[] = { static const struct lam_eventop *lam_eventops[] = {
#ifdef HAVE_WORKING_KQUEUE #ifdef HAVE_WORKING_KQUEUE
&lam_kqops, &lam_kqops,
#endif #endif
#ifdef HAVE_EPOLL #ifdef HAVE_EPOLL
&lam_epollops, &lam_epollops,
#endif #endif
#ifdef HAVE_RTSIG #ifdef HAVE_RTSIG
&lam_rtsigops, &lam_rtsigops,
#endif #endif
#ifdef HAVE_POLL #ifdef HAVE_POLL
&lam_pollops, &lam_pollops,
#endif #endif
#ifdef HAVE_SELECT #ifdef HAVE_SELECT
&lam_selectops, &lam_selectops,
#endif #endif
#ifdef WIN32 #ifdef WIN32
&lam_win32ops, &lam_win32ops,
#endif #endif
NULL NULL
}; };
const struct lam_eventop *lam_evsel; const struct lam_eventop *lam_evsel;
void *lam_evbase; void *lam_evbase;
/* Handle signals */ /* Handle signals */
int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */ int (*lam_event_sigcb)(void); /* Signal callback when gotsig is set */
int lam_event_gotsig; /* Set in signal handler */ int lam_event_gotsig; /* Set in signal handler */
/* Prototypes */ /* Prototypes */
static void lam_event_queue_insert(struct lam_event *, int); static void lam_event_queue_insert(struct lam_event *, int);
static void lam_event_queue_remove(struct lam_event *, int); static void lam_event_queue_remove(struct lam_event *, int);
static int lam_event_haveevents(void); static int lam_event_haveevents(void);
static void lam_event_process_active(void); static void lam_event_process_active(void);
static int lam_timeout_next(struct timeval *tv);
static void lam_timeout_correct(struct timeval *off);
static void lam_timeout_process(void);
static void lam_timeout_insert(struct lam_event *); static void lam_timeout_insert(struct lam_event *);
static RB_HEAD(lam_event_tree, lam_event) lam_timetree; static RB_HEAD(lam_event_tree, lam_event) lam_timetree;
@ -124,15 +131,16 @@ static struct lam_event_list lam_activequeue;
struct lam_event_list lam_signalqueue; struct lam_event_list lam_signalqueue;
struct lam_event_list lam_eventqueue; struct lam_event_list lam_eventqueue;
static struct timeval lam_event_tv; static struct timeval lam_event_tv;
lam_mutex_t lam_event_lock;
static int static int
compare(struct lam_event *a, struct lam_event *b) compare(struct lam_event *a, struct lam_event *b)
{ {
if (timercmp(&a->ev_timeout, &b->ev_timeout, <)) if (timercmp(&a->ev_timeout, &b->ev_timeout, <))
return (-1); return (-1);
else if (timercmp(&a->ev_timeout, &b->ev_timeout, >)) else if (timercmp(&a->ev_timeout, &b->ev_timeout, >))
return (1); return (1);
return (0); return (0);
} }
static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare); static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare);
@ -140,154 +148,160 @@ static RB_PROTOTYPE(lam_event_tree, lam_event, ev_timeout_node, compare);
static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare); static RB_GENERATE(lam_event_tree, lam_event, ev_timeout_node, compare);
void void
lam_event_init(void) lam_event_init(void)
{ {
int i; int i;
lam_event_sigcb = NULL; lam_event_sigcb = NULL;
lam_event_gotsig = 0; lam_event_gotsig = 0;
gettimeofday(&lam_event_tv, NULL); gettimeofday(&lam_event_tv, NULL);
RB_INIT(&lam_timetree); OBJ_CONSTRUCT(&lam_event_lock, lam_mutex_t);
TAILQ_INIT(&lam_eventqueue); RB_INIT(&lam_timetree);
TAILQ_INIT(&lam_activequeue); TAILQ_INIT(&lam_eventqueue);
TAILQ_INIT(&lam_signalqueue); TAILQ_INIT(&lam_activequeue);
TAILQ_INIT(&lam_signalqueue);
lam_evbase = NULL;
for (i = 0; lam_eventops[i] && !lam_evbase; i++) { lam_evbase = NULL;
lam_evsel = lam_eventops[i]; for (i = 0; lam_eventops[i] && !lam_evbase; i++) {
lam_evbase = lam_evsel->init(); lam_evsel = lam_eventops[i];
} lam_evbase = lam_evsel->init();
}
if (lam_evbase == NULL) if (lam_evbase == NULL)
errx(1, "%s: no event mechanism available", __func__); errx(1, "%s: no event mechanism available", __func__);
if (getenv("EVENT_SHOW_METHOD")) if (getenv("EVENT_SHOW_METHOD"))
fprintf(stderr, "libevent using: %s\n", lam_evsel->name); fprintf(stderr, "libevent using: %s\n", lam_evsel->name);
#if defined(USE_LOG) && defined(USE_DEBUG) #if defined(USE_LOG) && defined(USE_DEBUG)
log_to(stderr); log_to(stderr);
log_debug_cmd(LOG_MISC, 80); log_debug_cmd(LOG_MISC, 80);
#endif #endif
} }
static int static int
lam_event_haveevents(void) lam_event_haveevents(void)
{ {
return (RB_ROOT(&lam_timetree) || TAILQ_FIRST(&lam_eventqueue) || return (RB_ROOT(&lam_timetree) || TAILQ_FIRST(&lam_eventqueue) ||
TAILQ_FIRST(&lam_signalqueue) || TAILQ_FIRST(&lam_activequeue)); TAILQ_FIRST(&lam_signalqueue) || TAILQ_FIRST(&lam_activequeue));
} }
static void static void
lam_event_process_active(void) lam_event_process_active(void)
{ {
struct lam_event *ev; struct lam_event *ev;
short ncalls; short ncalls;
for (ev = TAILQ_FIRST(&lam_activequeue); ev; for (ev = TAILQ_FIRST(&lam_activequeue); ev;
ev = TAILQ_FIRST(&lam_activequeue)) { ev = TAILQ_FIRST(&lam_activequeue)) {
lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE);
/* Allows deletes to work */ /* Allows deletes to work */
ncalls = ev->ev_ncalls; ncalls = ev->ev_ncalls;
ev->ev_pncalls = &ncalls; ev->ev_pncalls = &ncalls;
while (ncalls) { while (ncalls) {
ncalls--; ncalls--;
ev->ev_ncalls = ncalls; ev->ev_ncalls = ncalls;
(*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg); lam_mutex_unlock(&lam_event_lock);
} (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
} lam_mutex_lock(&lam_event_lock);
}
}
} }
int int
lam_event_dispatch(void) lam_event_dispatch(void)
{ {
return (lam_event_loop(0)); return (lam_event_loop(0));
} }
int int
lam_event_loop(int flags) lam_event_loop(int flags)
{ {
struct timeval tv; struct timeval tv;
int res, done; int res, done;
/* Calculate the initial events that we are waiting for */ /* Calculate the initial events that we are waiting for */
if (lam_evsel->recalc(lam_evbase, 0) == -1) if (lam_evsel->recalc(lam_evbase, 0) == -1) {
return (-1); lam_output(0, "lam_event_loop: lam_evsel->recalc() failed.");
return (-1);
}
done = 0; done = 0;
while (!done) { while (!done) {
while (lam_event_gotsig) { while (lam_event_gotsig) {
lam_event_gotsig = 0; lam_event_gotsig = 0;
if (lam_event_sigcb) { if (lam_event_sigcb) {
res = (*lam_event_sigcb)(); res = (*lam_event_sigcb)();
if (res == -1) { if (res == -1) {
errno = EINTR; lam_output(0, "lam_event_loop: lam_event_sigcb() failed.");
return (-1); errno = EINTR;
} return (-1);
} }
} }
}
/* Check if time is running backwards */ /* Check if time is running backwards */
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
if (timercmp(&tv, &lam_event_tv, <)) { if (timercmp(&tv, &lam_event_tv, <)) {
struct timeval off; struct timeval off;
LOG_DBG((LOG_MISC, 10, LOG_DBG((LOG_MISC, 10,
"%s: time is running backwards, corrected", "%s: time is running backwards, corrected",
__func__)); __func__));
timersub(&lam_event_tv, &tv, &off); timersub(&lam_event_tv, &tv, &off);
lam_timeout_correct(&off); lam_timeout_correct(&off);
} }
lam_event_tv = tv; lam_event_tv = tv;
if (!(flags & LAM_EVLOOP_NONBLOCK)) if (!(flags & LAM_EVLOOP_NONBLOCK))
lam_timeout_next(&tv); lam_timeout_next(&tv);
else else
timerclear(&tv); timerclear(&tv);
/* If we have no events, we just exit */ res = lam_evsel->dispatch(lam_evbase, &tv);
if (!lam_event_haveevents()) if (res == -1) {
return (1); lam_output(0, "lam_event_loop: lam_evesel->dispatch() failed.");
return (-1);
}
res = lam_evsel->dispatch(lam_evbase, &tv); lam_timeout_process();
if (res == -1) if (TAILQ_FIRST(&lam_activequeue)) {
return (-1); lam_event_process_active();
if (flags & LAM_EVLOOP_ONCE)
done = 1;
} else if (flags & LAM_EVLOOP_NONBLOCK)
done = 1;
lam_timeout_process(); if (lam_evsel->recalc(lam_evbase, 0) == -1) {
lam_output(0, "lam_event_loop: lam_evesel->recalc() failed.");
if (TAILQ_FIRST(&lam_activequeue)) { return (-1);
lam_event_process_active(); }
if (flags & LAM_EVLOOP_ONCE) }
done = 1; lam_output(0, "lam_event_loop: done");
} else if (flags & LAM_EVLOOP_NONBLOCK) return (0);
done = 1;
if (lam_evsel->recalc(lam_evbase, 0) == -1)
return (-1);
}
return (0);
} }
void void
lam_event_set(struct lam_event *ev, int fd, short events, lam_event_set(struct lam_event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg) void (*callback)(int, short, void *), void *arg)
{ {
ev->ev_callback = callback; ev->ev_callback = callback;
ev->ev_arg = arg; ev->ev_arg = arg;
#ifdef WIN32 #ifdef WIN32
ev->ev_fd = (HANDLE)fd; ev->ev_fd = (HANDLE)fd;
ev->overlap.hEvent = ev; ev->overlap.hEvent = ev;
#else #else
ev->ev_fd = fd; ev->ev_fd = fd;
#endif #endif
ev->ev_events = events; ev->ev_events = events;
ev->ev_flags = LAM_EVLIST_INIT; ev->ev_flags = LAM_EVLIST_INIT;
ev->ev_ncalls = 0; ev->ev_ncalls = 0;
ev->ev_pncalls = NULL; ev->ev_pncalls = NULL;
} }
/* /*
@ -297,270 +311,297 @@ lam_event_set(struct lam_event *ev, int fd, short events,
int int
lam_event_pending(struct lam_event *ev, short event, struct timeval *tv) lam_event_pending(struct lam_event *ev, short event, struct timeval *tv)
{ {
int flags = 0; int flags = 0;
if (ev->ev_flags & LAM_EVLIST_INSERTED) if (ev->ev_flags & LAM_EVLIST_INSERTED)
flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)); flags |= (ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE));
if (ev->ev_flags & LAM_EVLIST_ACTIVE) if (ev->ev_flags & LAM_EVLIST_ACTIVE)
flags |= ev->ev_res; flags |= ev->ev_res;
if (ev->ev_flags & LAM_EVLIST_TIMEOUT) if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
flags |= LAM_EV_TIMEOUT; flags |= LAM_EV_TIMEOUT;
event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE); event &= (LAM_EV_TIMEOUT|LAM_EV_READ|LAM_EV_WRITE);
/* See if there is a timeout that we should report */ /* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & LAM_EV_TIMEOUT)) if (tv != NULL && (flags & event & LAM_EV_TIMEOUT))
*tv = ev->ev_timeout; *tv = ev->ev_timeout;
return (flags & event); return (flags & event);
}
int
lam_event_add_i(struct lam_event *ev, struct timeval *tv)
{
LOG_DBG((LOG_MISC, 55,
"event_add: event: %p, %s%s%scall %p",
ev,
ev->ev_events & LAM_EV_READ ? "LAM_EV_READ " : " ",
ev->ev_events & LAM_EV_WRITE ? "LAM_EV_WRITE " : " ",
tv ? "LAM_EV_TIMEOUT " : " ",
ev->ev_callback));
assert(!(ev->ev_flags & ~LAM_EVLIST_ALL));
if (tv != NULL) {
struct timeval now;
if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT);
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
if ((ev->ev_flags & LAM_EVLIST_ACTIVE) &&
(ev->ev_res & LAM_EV_TIMEOUT)) {
/* See if we are just active executing this
* event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE);
}
gettimeofday(&now, NULL);
timeradd(&now, tv, &ev->ev_timeout);
LOG_DBG((LOG_MISC, 55,
"event_add: timeout in %d seconds, call %p",
tv->tv_sec, ev->ev_callback));
lam_event_queue_insert(ev, LAM_EVLIST_TIMEOUT);
}
if ((ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)) &&
!(ev->ev_flags & (LAM_EVLIST_INSERTED|LAM_EVLIST_ACTIVE))) {
lam_event_queue_insert(ev, LAM_EVLIST_INSERTED);
return (lam_evsel->add(lam_evbase, ev));
} else if ((ev->ev_events & LAM_EV_SIGNAL) &&
!(ev->ev_flags & LAM_EVLIST_SIGNAL)) {
lam_event_queue_insert(ev, LAM_EVLIST_SIGNAL);
return (lam_evsel->add(lam_evbase, ev));
}
return (0);
} }
int int
lam_event_add(struct lam_event *ev, struct timeval *tv) lam_event_add(struct lam_event *ev, struct timeval *tv)
{ {
LOG_DBG((LOG_MISC, 55, int rc;
"event_add: event: %p, %s%s%scall %p", lam_mutex_lock(&lam_event_lock);
ev, rc = lam_event_add_i(ev, tv);
ev->ev_events & LAM_EV_READ ? "LAM_EV_READ " : " ", lam_mutex_unlock(&lam_event_lock);
ev->ev_events & LAM_EV_WRITE ? "LAM_EV_WRITE " : " ", return rc;
tv ? "LAM_EV_TIMEOUT " : " ",
ev->ev_callback));
assert(!(ev->ev_flags & ~LAM_EVLIST_ALL));
if (tv != NULL) {
struct timeval now;
if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT);
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
if ((ev->ev_flags & LAM_EVLIST_ACTIVE) &&
(ev->ev_res & LAM_EV_TIMEOUT)) {
/* See if we are just active executing this
* event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE);
}
gettimeofday(&now, NULL);
timeradd(&now, tv, &ev->ev_timeout);
LOG_DBG((LOG_MISC, 55,
"event_add: timeout in %d seconds, call %p",
tv->tv_sec, ev->ev_callback));
lam_event_queue_insert(ev, LAM_EVLIST_TIMEOUT);
}
if ((ev->ev_events & (LAM_EV_READ|LAM_EV_WRITE)) &&
!(ev->ev_flags & (LAM_EVLIST_INSERTED|LAM_EVLIST_ACTIVE))) {
lam_event_queue_insert(ev, LAM_EVLIST_INSERTED);
return (lam_evsel->add(lam_evbase, ev));
} else if ((ev->ev_events & LAM_EV_SIGNAL) &&
!(ev->ev_flags & LAM_EVLIST_SIGNAL)) {
lam_event_queue_insert(ev, LAM_EVLIST_SIGNAL);
return (lam_evsel->add(lam_evbase, ev));
}
return (0);
} }
int int lam_event_del_i(struct lam_event *ev)
lam_event_del(struct lam_event *ev)
{ {
LOG_DBG((LOG_MISC, 80, "event_del: %p, callback %p", LOG_DBG((LOG_MISC, 80, "event_del: %p, callback %p",
ev, ev->ev_callback)); ev, ev->ev_callback));
assert(!(ev->ev_flags & ~LAM_EVLIST_ALL)); assert(!(ev->ev_flags & ~LAM_EVLIST_ALL));
/* See if we are just active executing this event in a loop */ /* See if we are just active executing this event in a loop */
if (ev->ev_ncalls && ev->ev_pncalls) { if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */ /* Abort loop */
*ev->ev_pncalls = 0; *ev->ev_pncalls = 0;
} }
if (ev->ev_flags & LAM_EVLIST_TIMEOUT) if (ev->ev_flags & LAM_EVLIST_TIMEOUT)
lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT);
if (ev->ev_flags & LAM_EVLIST_ACTIVE) if (ev->ev_flags & LAM_EVLIST_ACTIVE)
lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE); lam_event_queue_remove(ev, LAM_EVLIST_ACTIVE);
if (ev->ev_flags & LAM_EVLIST_INSERTED) { if (ev->ev_flags & LAM_EVLIST_INSERTED) {
lam_event_queue_remove(ev, LAM_EVLIST_INSERTED); lam_event_queue_remove(ev, LAM_EVLIST_INSERTED);
return (lam_evsel->del(lam_evbase, ev)); return (lam_evsel->del(lam_evbase, ev));
} else if (ev->ev_flags & LAM_EVLIST_SIGNAL) { } else if (ev->ev_flags & LAM_EVLIST_SIGNAL) {
lam_event_queue_remove(ev, LAM_EVLIST_SIGNAL); lam_event_queue_remove(ev, LAM_EVLIST_SIGNAL);
return (lam_evsel->del(lam_evbase, ev)); return (lam_evsel->del(lam_evbase, ev));
} }
return (0);
}
return (0); int lam_event_del(struct lam_event *ev)
{
int rc;
lam_mutex_lock(&lam_event_lock);
rc = lam_event_del_i(ev);
lam_mutex_unlock(&lam_event_lock);
return rc;
} }
void void
lam_event_active(struct lam_event *ev, int res, short ncalls) lam_event_active_i(struct lam_event *ev, int res, short ncalls)
{ {
/* We get different kinds of events, add them together */ /* We get different kinds of events, add them together */
if (ev->ev_flags & LAM_EVLIST_ACTIVE) { if (ev->ev_flags & LAM_EVLIST_ACTIVE) {
ev->ev_res |= res; ev->ev_res |= res;
return; return;
} }
ev->ev_res = res; ev->ev_res = res;
ev->ev_ncalls = ncalls; ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL; ev->ev_pncalls = NULL;
lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE); lam_event_queue_insert(ev, LAM_EVLIST_ACTIVE);
} }
int
void
lam_event_active(struct lam_event* ev, int res, short ncalls)
{
lam_mutex_lock(&lam_event_lock);
lam_event_active_i(ev, res, ncalls);
lam_mutex_unlock(&lam_event_lock);
}
static int
lam_timeout_next(struct timeval *tv) lam_timeout_next(struct timeval *tv)
{ {
struct timeval dflt = LAM_TIMEOUT_DEFAULT; struct timeval dflt = LAM_TIMEOUT_DEFAULT;
struct timeval now; struct timeval now;
struct lam_event *ev; struct lam_event *ev;
if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) { if ((ev = RB_MIN(lam_event_tree, &lam_timetree)) == NULL) {
*tv = dflt; *tv = dflt;
return (0); return (0);
} }
if (gettimeofday(&now, NULL) == -1) if (gettimeofday(&now, NULL) == -1)
return (-1); return (-1);
if (timercmp(&ev->ev_timeout, &now, <=)) { if (timercmp(&ev->ev_timeout, &now, <=)) {
timerclear(tv); timerclear(tv);
return (0); return (0);
} }
timersub(&ev->ev_timeout, &now, tv); timersub(&ev->ev_timeout, &now, tv);
assert(tv->tv_sec >= 0); assert(tv->tv_sec >= 0);
assert(tv->tv_usec >= 0); assert(tv->tv_usec >= 0);
LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec)); LOG_DBG((LOG_MISC, 60, "timeout_next: in %d seconds", tv->tv_sec));
return (0); return (0);
} }
void
static void
lam_timeout_correct(struct timeval *off) lam_timeout_correct(struct timeval *off)
{ {
struct lam_event *ev; struct lam_event *ev;
/* We can modify the key element of the node without destroying /* We can modify the key element of the node without destroying
* the key, beause we apply it to all in the right order. * the key, beause we apply it to all in the right order.
*/ */
RB_FOREACH(ev, lam_event_tree, &lam_timetree) RB_FOREACH(ev, lam_event_tree, &lam_timetree)
timersub(&ev->ev_timeout, off, &ev->ev_timeout); timersub(&ev->ev_timeout, off, &ev->ev_timeout);
} }
void
static void
lam_timeout_process(void) lam_timeout_process(void)
{ {
struct timeval now; struct timeval now;
struct lam_event *ev, *next; struct lam_event *ev, *next;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
for (ev = RB_MIN(lam_event_tree, &lam_timetree); ev; ev = next) { for (ev = RB_MIN(lam_event_tree, &lam_timetree); ev; ev = next) {
if (timercmp(&ev->ev_timeout, &now, >)) if (timercmp(&ev->ev_timeout, &now, >))
break; break;
next = RB_NEXT(lam_event_tree, &lam_timetree, ev); next = RB_NEXT(lam_event_tree, &lam_timetree, ev);
lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT); lam_event_queue_remove(ev, LAM_EVLIST_TIMEOUT);
/* delete this event from the I/O queues */ /* delete this event from the I/O queues */
lam_event_del(ev); lam_event_del_i(ev);
LOG_DBG((LOG_MISC, 60, "timeout_process: call %p", LOG_DBG((LOG_MISC, 60, "timeout_process: call %p",
ev->ev_callback)); ev->ev_callback));
lam_event_active(ev, LAM_EV_TIMEOUT, 1); lam_event_active_i(ev, LAM_EV_TIMEOUT, 1);
} }
} }
static void static void
lam_timeout_insert(struct lam_event *ev) lam_timeout_insert(struct lam_event *ev)
{ {
struct lam_event *tmp; struct lam_event *tmp;
tmp = RB_FIND(lam_event_tree, &lam_timetree, ev); tmp = RB_FIND(lam_event_tree, &lam_timetree, ev);
if (tmp != NULL) { if (tmp != NULL) {
struct timeval tv; struct timeval tv;
struct timeval add = {0,1}; struct timeval add = {0,1};
/* Find unique time */ /* Find unique time */
tv = ev->ev_timeout; tv = ev->ev_timeout;
do { do {
timeradd(&tv, &add, &tv); timeradd(&tv, &add, &tv);
tmp = RB_NEXT(lam_event_tree, &lam_timetree, tmp); tmp = RB_NEXT(lam_event_tree, &lam_timetree, tmp);
} while (tmp != NULL && timercmp(&tmp->ev_timeout, &tv, ==)); } while (tmp != NULL && timercmp(&tmp->ev_timeout, &tv, ==));
ev->ev_timeout = tv; ev->ev_timeout = tv;
} }
tmp = RB_INSERT(lam_event_tree, &lam_timetree, ev); tmp = RB_INSERT(lam_event_tree, &lam_timetree, ev);
assert(tmp == NULL); assert(tmp == NULL);
} }
static void static void
lam_event_queue_remove(struct lam_event *ev, int queue) lam_event_queue_remove(struct lam_event *ev, int queue)
{ {
if (!(ev->ev_flags & queue)) if (!(ev->ev_flags & queue))
errx(1, "%s: %p(fd %d) not on queue %x", __func__, errx(1, "%s: %p(fd %d) not on queue %x", __func__,
ev, ev->ev_fd, queue); ev, ev->ev_fd, queue);
ev->ev_flags &= ~queue; ev->ev_flags &= ~queue;
switch (queue) { switch (queue) {
case LAM_EVLIST_ACTIVE: case LAM_EVLIST_ACTIVE:
TAILQ_REMOVE(&lam_activequeue, ev, ev_active_next); TAILQ_REMOVE(&lam_activequeue, ev, ev_active_next);
break; break;
case LAM_EVLIST_SIGNAL: case LAM_EVLIST_SIGNAL:
TAILQ_REMOVE(&lam_signalqueue, ev, ev_signal_next); TAILQ_REMOVE(&lam_signalqueue, ev, ev_signal_next);
break; break;
case LAM_EVLIST_TIMEOUT: case LAM_EVLIST_TIMEOUT:
RB_REMOVE(lam_event_tree, &lam_timetree, ev); RB_REMOVE(lam_event_tree, &lam_timetree, ev);
break; break;
case LAM_EVLIST_INSERTED: case LAM_EVLIST_INSERTED:
TAILQ_REMOVE(&lam_eventqueue, ev, ev_next); TAILQ_REMOVE(&lam_eventqueue, ev, ev_next);
break; break;
default: default:
errx(1, "%s: unknown queue %x", __func__, queue); errx(1, "%s: unknown queue %x", __func__, queue);
} }
} }
static void static void
lam_event_queue_insert(struct lam_event *ev, int queue) lam_event_queue_insert(struct lam_event *ev, int queue)
{ {
if (ev->ev_flags & queue) if (ev->ev_flags & queue)
errx(1, "%s: %p(fd %d) already on queue %x", __func__, errx(1, "%s: %p(fd %d) already on queue %x", __func__,
ev, ev->ev_fd, queue); ev, ev->ev_fd, queue);
ev->ev_flags |= queue; ev->ev_flags |= queue;
switch (queue) { switch (queue) {
case LAM_EVLIST_ACTIVE: case LAM_EVLIST_ACTIVE:
TAILQ_INSERT_TAIL(&lam_activequeue, ev, ev_active_next); TAILQ_INSERT_TAIL(&lam_activequeue, ev, ev_active_next);
break; break;
case LAM_EVLIST_SIGNAL: case LAM_EVLIST_SIGNAL:
TAILQ_INSERT_TAIL(&lam_signalqueue, ev, ev_signal_next); TAILQ_INSERT_TAIL(&lam_signalqueue, ev, ev_signal_next);
break; break;
case LAM_EVLIST_TIMEOUT: case LAM_EVLIST_TIMEOUT:
lam_timeout_insert(ev); lam_timeout_insert(ev);
break; break;
case LAM_EVLIST_INSERTED: case LAM_EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&lam_eventqueue, ev, ev_next); TAILQ_INSERT_TAIL(&lam_eventqueue, ev, ev_next);
break; break;
default: default:
errx(1, "%s: unknown queue %x", __func__, queue); errx(1, "%s: unknown queue %x", __func__, queue);
} }
} }

Просмотреть файл

@ -129,18 +129,14 @@ struct lam_eventop {
int (*dispatch)(void *, struct timeval *); int (*dispatch)(void *, struct timeval *);
}; };
#define LAM_TIMEOUT_DEFAULT {5, 0} #define LAM_TIMEOUT_DEFAULT {1, 0}
void lam_event_init(void);
int lam_event_dispatch(void);
#define LAM_EVLOOP_ONCE 0x01 #define LAM_EVLOOP_ONCE 0x01
#define LAM_EVLOOP_NONBLOCK 0x02 #define LAM_EVLOOP_NONBLOCK 0x02
void lam_event_init(void);
int lam_event_dispatch(void);
int lam_event_loop(int); int lam_event_loop(int);
int lam_timeout_next(struct timeval *);
void lam_timeout_correct(struct timeval *);
void lam_timeout_process(void);
#define lam_evtimer_add(ev, tv) lam_event_add(ev, tv) #define lam_evtimer_add(ev, tv) lam_event_add(ev, tv)
#define lam_evtimer_set(ev, cb, arg) lam_event_set(ev, -1, 0, cb, arg) #define lam_evtimer_set(ev, cb, arg) lam_event_set(ev, -1, 0, cb, arg)
@ -164,8 +160,8 @@ void lam_timeout_process(void);
void lam_event_set(struct lam_event *, int, short, void (*)(int, short, void *), void *); void lam_event_set(struct lam_event *, int, short, void (*)(int, short, void *), void *);
int lam_event_add(struct lam_event *, struct timeval *); int lam_event_add(struct lam_event *, struct timeval *);
int lam_event_del(struct lam_event *); int lam_event_del(struct lam_event *);
void lam_event_active(struct lam_event *, int, short);
int lam_event_pending(struct lam_event *, short, struct timeval *); int lam_event_pending(struct lam_event *, short, struct timeval *);
void lam_event_active(struct lam_event *, int, short);
#ifdef WIN32 #ifdef WIN32
#define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE) #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT && (ev)->ev_fd != INVALID_HANDLE_VALUE)
@ -173,6 +169,11 @@ int lam_event_pending(struct lam_event *, short, struct timeval *);
#define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT) #define lam_event_initialized(ev) ((ev)->ev_flags & LAM_EVLIST_INIT)
#endif #endif
/* for internal use only */
int lam_event_add_i(struct lam_event *, struct timeval *);
int lam_event_del_i(struct lam_event *);
void lam_event_active_i(struct lam_event *, int, short);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

Просмотреть файл

@ -248,10 +248,10 @@ kq_dispatch(void *arg, struct timeval *tv)
if (!(ev->ev_events & LAM_EV_PERSIST)) { if (!(ev->ev_events & LAM_EV_PERSIST)) {
ev->ev_flags &= ~EVLIST_X_KQINKERNEL; ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
lam_event_del(ev); lam_event_del_i(ev);
} }
lam_event_active(ev, which, lam_event_active_i(ev, which,
ev->ev_events & LAM_EV_SIGNAL ? events[i].data : 1); ev->ev_events & LAM_EV_SIGNAL ? events[i].data : 1);
} }

Просмотреть файл

@ -55,9 +55,12 @@
#include "event.h" #include "event.h"
#include "evsignal.h" #include "evsignal.h"
#include "lam/threads/mutex.h"
extern struct lam_event_list lam_eventqueue; extern struct lam_event_list lam_eventqueue;
extern volatile sig_atomic_t lam_evsignal_caught; extern volatile sig_atomic_t lam_evsignal_caught;
extern lam_mutex_t lam_event_lock;
struct pollop { struct pollop {
int event_count; /* Highest number alloc */ int event_count; /* Highest number alloc */
@ -166,7 +169,9 @@ poll_dispatch(void *arg, struct timeval *tv)
return (-1); return (-1);
sec = tv->tv_sec * 1000 + tv->tv_usec / 1000; sec = tv->tv_sec * 1000 + tv->tv_usec / 1000;
lam_mutex_unlock(&lam_event_lock);
res = poll(pop->event_set, nfds, sec); res = poll(pop->event_set, nfds, sec);
lam_mutex_lock(&lam_event_lock);
if (lam_evsignal_recalc(&pop->evsigmask) == -1) if (lam_evsignal_recalc(&pop->evsigmask) == -1)
return (-1); return (-1);
@ -209,8 +214,8 @@ poll_dispatch(void *arg, struct timeval *tv)
if (res) { if (res) {
if (!(ev->ev_events & LAM_EV_PERSIST)) if (!(ev->ev_events & LAM_EV_PERSIST))
lam_event_del(ev); lam_event_del_i(ev);
lam_event_active(ev, res, 1); lam_event_active_i(ev, res, 1);
} }
} }

Просмотреть файл

@ -197,8 +197,8 @@ select_dispatch(void *arg, struct timeval *tv)
if (res) { if (res) {
if (!(ev->ev_events & LAM_EV_PERSIST)) if (!(ev->ev_events & LAM_EV_PERSIST))
lam_event_del(ev); lam_event_del_i(ev);
lam_event_active(ev, res, 1); lam_event_active_i(ev, res, 1);
} else if (ev->ev_fd > maxfd) } else if (ev->ev_fd > maxfd)
maxfd = ev->ev_fd; maxfd = ev->ev_fd;
} }

Просмотреть файл

@ -147,8 +147,8 @@ lam_evsignal_process(void)
ncalls = lam_evsigcaught[LAM_EVENT_SIGNAL(ev)]; ncalls = lam_evsigcaught[LAM_EVENT_SIGNAL(ev)];
if (ncalls) { if (ncalls) {
if (!(ev->ev_events & LAM_EV_PERSIST)) if (!(ev->ev_events & LAM_EV_PERSIST))
lam_event_del(ev); lam_event_del_i(ev);
lam_event_active(ev, LAM_EV_SIGNAL, ncalls); lam_event_active_i(ev, LAM_EV_SIGNAL, ncalls);
} }
} }

Просмотреть файл

@ -14,11 +14,9 @@ lam_class_t mca_pml_base_request_t_class = {
void mca_pml_base_request_construct(mca_pml_base_request_t* req) void mca_pml_base_request_construct(mca_pml_base_request_t* req)
{ {
OBJ_CONSTRUCT(&req->req_lock, lam_mutex_t);
} }
void mca_pml_base_request_destruct(mca_pml_base_request_t* req) void mca_pml_base_request_destruct(mca_pml_base_request_t* req)
{ {
OBJ_DESTRUCT(&req->req_lock);
} }

Просмотреть файл

@ -13,16 +13,7 @@
extern lam_class_t mca_pml_base_request_t_class; extern lam_class_t mca_pml_base_request_t_class;
/* MPI request status */ /* request type */
typedef enum {
MCA_PML_STATUS_INVALID = 1,
MCA_PML_STATUS_INITED = 2,
MCA_PML_STATUS_INCOMPLETE = 3,
MCA_PML_STATUS_COMPLETE = 4,
MCA_PML_STATUS_INACTIVE = 5
} mca_pml_base_request_status_t;
typedef enum { typedef enum {
MCA_PML_REQUEST_SEND, MCA_PML_REQUEST_SEND,
MCA_PML_REQUEST_RECV MCA_PML_REQUEST_RECV
@ -30,7 +21,7 @@ typedef enum {
/* MPI pml (point-to-point) request */ /* MPI pml (point-to-point) request */
typedef struct { struct mca_pml_base_request_t {
/* base request */ /* base request */
lam_request_t super; lam_request_t super;
/* pointer to application buffer */ /* pointer to application buffer */
@ -42,24 +33,21 @@ typedef struct {
/* user defined tag */ /* user defined tag */
int32_t req_tag; int32_t req_tag;
/* communicator pointer */ /* communicator pointer */
lam_communicator_t *req_communicator; lam_communicator_t *req_comm;
/* pointer to data type */ /* pointer to data type */
lam_datatype_t *req_datatype; lam_datatype_t *req_datatype;
/* MPI request type - used for test */ /* MPI request type - used for test */
mca_pml_base_request_type_t req_type; mca_pml_base_request_type_t req_type;
/* MPI request status */
mca_pml_base_request_status_t req_status;
/* completion status */ /* completion status */
lam_status_public_t req_status_public; lam_status_public_t req_status;
/* flag indicating if the this is a persistent request */ /* flag indicating if the this is a persistent request */
bool req_persistent; bool req_persistent;
/* flag indicating if MPI is done with this request */ /* flag indicating if MPI is done with this request */
bool req_mpi_done; bool req_mpi_done;
/* flag indicating if the pt-2-pt layer is done with this request */ /* flag indicating if the pt-2-pt layer is done with this request */
bool req_pml_layer_done; bool req_pml_done;
/* lock to update request status */ };
lam_mutex_t req_lock; typedef struct mca_pml_base_request_t mca_pml_base_request_t;
} mca_pml_base_request_t;
void mca_pml_base_request_construct(mca_pml_base_request_t*); void mca_pml_base_request_construct(mca_pml_base_request_t*);

Просмотреть файл

@ -10,6 +10,7 @@
#ifndef MCA_PML_TEG_H #ifndef MCA_PML_TEG_H
#define MCA_PML_TEG_H #define MCA_PML_TEG_H
#include "lam/threads/thread.h"
#include "lam/threads/condition.h" #include "lam/threads/condition.h"
#include "lam/mem/free_list.h" #include "lam/mem/free_list.h"
#include "lam/util/cmd_line.h" #include "lam/util/cmd_line.h"
@ -34,7 +35,8 @@ struct mca_pml_teg_t {
lam_list_t teg_procs; lam_list_t teg_procs;
lam_mutex_t teg_lock; lam_mutex_t teg_lock;
lam_condition_t teg_condition; lam_condition_t teg_condition;
int teg_reqs_waiting; int teg_req_waiting;
lam_thread_t teg_thread;
int teg_free_list_num; /* initial size of free list */ int teg_free_list_num; /* initial size of free list */
int teg_free_list_max; /* maximum size of free list */ int teg_free_list_max; /* maximum size of free list */

Просмотреть файл

@ -12,6 +12,11 @@ int mca_pml_teg_irecv_init(
struct lam_request_t **request) struct lam_request_t **request)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_irecv_init: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid);
#endif
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;
@ -40,10 +45,15 @@ int mca_pml_teg_irecv(
struct lam_request_t **request) struct lam_request_t **request)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_irecv: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid);
#endif
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;
mca_ptl_base_recv_request_reinit( mca_ptl_base_recv_request_reinit(
recvreq, recvreq,
addr, addr,
@ -73,6 +83,11 @@ int mca_pml_teg_recv(
lam_status_public_t* status) lam_status_public_t* status)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_recv: src=%d tag=%d comm=%d\n", src, tag, comm->c_contextid);
#endif
mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc); mca_ptl_base_recv_request_t *recvreq = mca_pml_teg_recv_request_alloc(&rc);
if(NULL == recvreq) if(NULL == recvreq)
return rc; return rc;

Просмотреть файл

@ -19,6 +19,11 @@ int mca_pml_teg_isend_init(
lam_request_t **request) lam_request_t **request)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_isend_init: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid);
#endif
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
@ -51,10 +56,14 @@ int mca_pml_teg_isend(
lam_request_t **request) lam_request_t **request)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_isend: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid);
#endif
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
mca_ptl_base_send_request_reinit( mca_ptl_base_send_request_reinit(
sendreq, sendreq,
buf, buf,
@ -84,6 +93,11 @@ int mca_pml_teg_send(
lam_communicator_t* comm) lam_communicator_t* comm)
{ {
int rc; int rc;
#if 0
lam_output(0, "mca_pml_teg_send: dst=%d tag=%d comm=%d\n", dst, tag, comm->c_contextid);
#endif
mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc); mca_ptl_base_send_request_t* sendreq = mca_pml_teg_send_request_alloc(comm,dst,&rc);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
@ -102,6 +116,6 @@ int mca_pml_teg_send(
if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS) if((rc = mca_pml_teg_send_request_start(sendreq)) != LAM_SUCCESS)
return rc; return rc;
return mca_pml_teg_wait(&sendreq->super, NULL); return mca_pml_teg_wait((lam_request_t*)sendreq, NULL);
} }

Просмотреть файл

@ -2,6 +2,7 @@
* $HEADER$ * $HEADER$
*/ */
#include "lam/event/event.h"
#include "mpi.h" #include "mpi.h"
#include "mca/mpi/pml/pml.h" #include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
@ -85,13 +86,20 @@ int mca_pml_teg_module_close(void)
} }
static void* mca_pml_teg_thread(lam_object_t* thread)
{
lam_event_dispatch();
return NULL;
}
mca_pml_t* mca_pml_teg_module_init(int* priority, mca_pml_t* mca_pml_teg_module_init(int* priority,
bool *allow_multi_user_threads, bool *allow_multi_user_threads,
bool *have_hidden_threads) bool *have_hidden_threads)
{ {
*priority = 0; *priority = 0;
*allow_multi_user_threads = true; *allow_multi_user_threads = true;
*have_hidden_threads = false; *have_hidden_threads = true;
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t); OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, lam_mutex_t);
mca_pml_teg.teg_ptl_modules = NULL; mca_pml_teg.teg_ptl_modules = NULL;
@ -108,6 +116,11 @@ mca_pml_t* mca_pml_teg_module_init(int* priority,
mca_pml_teg.teg_free_list_inc, mca_pml_teg.teg_free_list_inc,
NULL); NULL);
OBJ_CONSTRUCT(&mca_pml_teg.teg_thread, lam_thread_t);
mca_pml_teg.teg_thread.t_run = mca_pml_teg_thread;
if(lam_thread_start(&mca_pml_teg.teg_thread) != LAM_SUCCESS)
return NULL;
mca_pml_teg.teg_recv_sequence = 0; mca_pml_teg.teg_recv_sequence = 0;
return &mca_pml_teg.super; return &mca_pml_teg.super;
} }

Просмотреть файл

@ -2,9 +2,17 @@
void mca_pml_teg_recv_request_progress( void mca_pml_teg_recv_request_progress(
mca_ptl_base_recv_request_t* request, mca_ptl_base_recv_request_t* req,
mca_ptl_base_recv_frag_t* frag) mca_ptl_base_recv_frag_t* frag)
{ {
lam_mutex_lock(&mca_pml_teg.teg_lock);
req->req_bytes_recvd += frag->super.frag_size;
if (req->req_bytes_recvd >= req->super.req_status.MPI_LENGTH) {
req->super.req_mpi_done = true;
if(mca_pml_teg.teg_req_waiting) {
lam_condition_broadcast(&mca_pml_teg.teg_condition);
}
}
lam_mutex_unlock(&mca_pml_teg.teg_lock);
} }

Просмотреть файл

@ -21,7 +21,6 @@ static inline mca_ptl_base_recv_request_t* mca_pml_teg_recv_request_alloc(int *r
static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t* request) static inline void mca_pml_teg_recv_request_return(mca_ptl_base_recv_request_t* request)
{ {
request->super.req_status = MCA_PML_STATUS_INVALID;
lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request); lam_free_list_return(&mca_pml_teg.teg_recv_requests, (lam_list_item_t*)request);
} }
@ -32,8 +31,7 @@ static inline int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* re
{ {
THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock, THREAD_SCOPED_LOCK(&mca_pml_teg.teg_lock,
(req->req_sequence = mca_pml_teg.teg_recv_sequence++)); (req->req_sequence = mca_pml_teg.teg_recv_sequence++));
req->super.req_status = MCA_PML_STATUS_INCOMPLETE;
if(req->super.req_peer == LAM_ANY_TAG) { if(req->super.req_peer == LAM_ANY_TAG) {
mca_ptl_base_recv_request_match_wild(req); mca_ptl_base_recv_request_match_wild(req);
} else { } else {

Просмотреть файл

@ -81,5 +81,14 @@ void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* req, mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag) mca_ptl_base_send_frag_t* frag)
{ {
lam_mutex_lock(&mca_pml_teg.teg_lock);
req->req_bytes_sent += frag->super.frag_size;
if (req->req_bytes_sent >= req->super.req_length) {
req->super.req_mpi_done = true;
if(mca_pml_teg.teg_req_waiting) {
lam_condition_broadcast(&mca_pml_teg.teg_condition);
}
}
lam_mutex_unlock(&mca_pml_teg.teg_lock);
} }

Просмотреть файл

@ -36,7 +36,6 @@ static inline mca_ptl_base_send_request_t* mca_pml_teg_send_request_alloc(
static inline void mca_ptl_base_send_request_return( static inline void mca_ptl_base_send_request_return(
mca_ptl_base_send_request_t* request) mca_ptl_base_send_request_t* request)
{ {
request->super.req_status = MCA_PML_STATUS_INVALID;
request->req_owner->ptl_request_return(request->req_owner, request); request->req_owner->ptl_request_return(request->req_owner, request);
} }
@ -48,8 +47,8 @@ static inline int mca_pml_teg_send_request_start(
int rc; int rc;
// start the first fragment // start the first fragment
if(req->req_length < first_fragment_size) if(req->super.req_length < first_fragment_size)
first_fragment_size = req->req_length; first_fragment_size = req->super.req_length;
rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size); rc = ptl->ptl_send(ptl, req->req_peer, req, first_fragment_size);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;

Просмотреть файл

@ -1,22 +1,22 @@
#include "pml_teg.h" #include "pml_teg.h"
#include "mca/mpi/pml/base/pml_base_request.h"
int mca_pml_teg_wait( int mca_pml_teg_wait(
lam_request_t* request, lam_request_t* request,
lam_status_public_t* status) lam_status_public_t* status)
{ {
#if 0
mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)request; mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)request;
if(pml_request->req_mpi_done == false) { if(pml_request->req_mpi_done == false) {
lam_mutex_lock(&mca_pml_teg.teg_lock); lam_mutex_lock(&mca_pml_teg.teg_lock);
mca_pml_teg.teg_req_waiting++; mca_pml_teg.teg_req_waiting++;
while(request->req_mpi_done == false) while(pml_request->req_mpi_done == false)
lam_condition_wait(&mca_pml_teg.teg_condition); lam_condition_wait(&mca_pml_teg.teg_condition, &mca_pml_teg.teg_lock);
mca_pml_teg.teg_req_waiting--; mca_pml_teg.teg_req_waiting--;
lam_mutex_unlock(&mca_pml_teg.teg_lock); lam_mutex_unlock(&mca_pml_teg.teg_lock);
} }
*status = request->req_status; if (status != NULL)
#endif *status = pml_request->req_status;
return LAM_SUCCESS; return LAM_SUCCESS;
} }

Просмотреть файл

@ -43,46 +43,58 @@ int mca_pml_ptl_comm_init_size(mca_pml_comm_t* comm, size_t size)
comm->c_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); comm->c_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size);
if(NULL == comm->c_msg_seq) if(NULL == comm->c_msg_seq)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
memset(comm->c_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size);
/* send message sequence-number support - receiver side */ /* send message sequence-number support - receiver side */
comm->c_next_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); comm->c_next_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size);
if(NULL == comm->c_next_msg_seq) if(NULL == comm->c_next_msg_seq)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size);
/* matching lock */ /* matching lock */
comm->c_matching_lock = malloc(sizeof(lam_mutex_t) * size); comm->c_matching_lock = malloc(sizeof(lam_mutex_t) * size);
if(NULL == comm->c_matching_lock) if(NULL == comm->c_matching_lock)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->c_matching_lock+i, lam_mutex_t); lam_mutex_t *object = comm->c_matching_lock+i;
OBJ_CONSTRUCT(object, lam_mutex_t);
}
/* unexpected fragments queues */ /* unexpected fragments queues */
comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size); comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size);
if(NULL == comm->c_unexpected_frags) if(NULL == comm->c_unexpected_frags)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->c_unexpected_frags+i, lam_list_t); lam_list_t* object = comm->c_unexpected_frags+i;
OBJ_CONSTRUCT(object, lam_list_t);
}
/* these locks are needed to avoid a probe interfering with a match */ /* these locks are needed to avoid a probe interfering with a match */
comm->c_unexpected_frags_lock = malloc(sizeof(lam_mutex_t) * size); comm->c_unexpected_frags_lock = malloc(sizeof(lam_mutex_t) * size);
if(NULL == comm->c_unexpected_frags_lock) if(NULL == comm->c_unexpected_frags_lock)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->c_unexpected_frags_lock+i, lam_mutex_t); lam_mutex_t* object = comm->c_unexpected_frags_lock+i;
OBJ_CONSTRUCT(object, lam_mutex_t);
}
/* out-of-order fragments queues */ /* out-of-order fragments queues */
comm->c_frags_cant_match = malloc(sizeof(lam_list_t) * size); comm->c_frags_cant_match = malloc(sizeof(lam_list_t) * size);
if(NULL == comm->c_frags_cant_match) if(NULL == comm->c_frags_cant_match)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->c_frags_cant_match+i, lam_list_t); lam_list_t* object = comm->c_frags_cant_match+i;
OBJ_CONSTRUCT(object, lam_list_t);
}
/* queues of unmatched specific (source process specified) receives */ /* queues of unmatched specific (source process specified) receives */
comm->c_specific_receives = malloc(sizeof(lam_list_t) * size); comm->c_specific_receives = malloc(sizeof(lam_list_t) * size);
if(NULL == comm->c_specific_receives) if(NULL == comm->c_specific_receives)
return LAM_ERR_OUT_OF_RESOURCE; return LAM_ERR_OUT_OF_RESOURCE;
for(i=0; i<size; i++) for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->c_specific_receives+i, lam_list_t); lam_list_t *object = comm->c_specific_receives+i;
OBJ_CONSTRUCT(object, lam_list_t);
}
return LAM_SUCCESS; return LAM_SUCCESS;
} }

Просмотреть файл

@ -26,8 +26,8 @@ struct mca_pml_comm_t {
/* unexpected fragments queues */ /* unexpected fragments queues */
lam_list_t *c_unexpected_frags; lam_list_t *c_unexpected_frags;
/* these locks are needed to avoid a probe interfering with a match
*/ /* these locks are needed to avoid a probe interfering with a match */
lam_mutex_t *c_unexpected_frags_lock; lam_mutex_t *c_unexpected_frags_lock;
/* out-of-order fragments queues */ /* out-of-order fragments queues */
@ -47,7 +47,16 @@ struct mca_pml_comm_t {
typedef struct mca_pml_comm_t mca_pml_comm_t; typedef struct mca_pml_comm_t mca_pml_comm_t;
extern int mca_pml_ptl_comm_init_size(struct mca_pml_comm_t*, size_t); extern int mca_pml_ptl_comm_init_size(struct mca_pml_comm_t*, size_t);
static inline mca_ptl_base_sequence_t mca_pml_ptl_comm_send_sequence(struct mca_pml_comm_t* comm, int dst)
{
mca_ptl_base_sequence_t sequence;
lam_mutex_lock(comm->c_matching_lock+dst);
sequence = comm->c_msg_seq[dst]++;
lam_mutex_unlock(comm->c_matching_lock+dst);
return sequence;
}
#endif #endif

Просмотреть файл

@ -61,11 +61,11 @@ struct mca_ptl_base_match_header_t {
/* communicator index */ /* communicator index */
uint32_t hdr_contextid; uint32_t hdr_contextid;
/* source rank */ /* source rank */
int32_t hdr_src_rank; int32_t hdr_src;
/* destination rank */ /* destination rank */
int32_t hdr_dst_rank; int32_t hdr_dst;
/* user tag */ /* user tag */
int32_t hdr_user_tag; int32_t hdr_tag;
/* message length */ /* message length */
uint32_t hdr_msg_length; uint32_t hdr_msg_length;
/* message sequence number */ /* message sequence number */

Просмотреть файл

@ -95,7 +95,7 @@ int mca_ptl_base_match(mca_ptl_base_match_header_t *frag_header,
frag_msg_seq = frag_header->hdr_msg_seq; frag_msg_seq = frag_header->hdr_msg_seq;
/* get fragment communicator source rank */ /* get fragment communicator source rank */
frag_src = frag_header->hdr_src_rank; frag_src = frag_header->hdr_src;
/* get next expected message sequence number - if threaded /* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing * run, lock to make sure that if another thread is processing
@ -218,7 +218,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_receives_for_match
* look only at "specific" receives, or "wild" receives, * look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time. * or if we need to traverse both sets at the same time.
*/ */
frag_src = frag_header->hdr_src_rank; frag_src = frag_header->hdr_src;
if (lam_list_get_size((pml_comm->c_specific_receives)+frag_src) == 0 ){ if (lam_list_get_size((pml_comm->c_specific_receives)+frag_src) == 0 ){
/* /*
@ -268,11 +268,11 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
{ {
/* local parameters */ /* local parameters */
mca_ptl_base_recv_request_t *return_match, *wild_recv; mca_ptl_base_recv_request_t *return_match, *wild_recv;
int frag_user_tag,recv_user_tag; int frag_tag,recv_tag;
/* initialization */ /* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL; return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_user_tag=frag_header->hdr_user_tag; frag_tag=frag_header->hdr_tag;
/* /*
* Loop over the wild irecvs - no need to lock, the upper level * Loop over the wild irecvs - no need to lock, the upper level
@ -286,14 +286,14 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
wild_recv = (mca_ptl_base_recv_request_t *) wild_recv = (mca_ptl_base_recv_request_t *)
((lam_list_item_t *)wild_recv)->lam_list_next) { ((lam_list_item_t *)wild_recv)->lam_list_next) {
recv_user_tag = wild_recv->super.req_tag; recv_tag = wild_recv->super.req_tag;
if ( if (
/* exact tag match */ /* exact tag match */
(frag_user_tag == recv_user_tag) || (frag_tag == recv_tag) ||
/* wild tag match - negative tags (except for /* wild tag match - negative tags (except for
* LAM_ANY_TAG) are reserved for internal use, and will * LAM_ANY_TAG) are reserved for internal use, and will
* not be matched with LAM_ANY_TAG */ * not be matched with LAM_ANY_TAG */
( (recv_user_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) ( (recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) )
{ {
/* /*
@ -335,13 +335,13 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat
{ {
/* local variables */ /* local variables */
mca_ptl_base_recv_request_t *specific_recv, *return_match; mca_ptl_base_recv_request_t *specific_recv, *return_match;
int frag_src,recv_user_tag,frag_user_tag; int frag_src,recv_tag,frag_tag;
/* initialization */ /* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL; return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_src_rank; frag_src = frag_header->hdr_src;
frag_user_tag=frag_header->hdr_user_tag; frag_tag=frag_header->hdr_tag;
/* /*
* Loop over the specific irecvs. * Loop over the specific irecvs.
@ -355,9 +355,9 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat
/* /*
* Check for a match * Check for a match
*/ */
recv_user_tag = specific_recv->super.req_tag; recv_tag = specific_recv->super.req_tag;
if ( (frag_user_tag == recv_user_tag) || if ( (frag_tag == recv_tag) ||
( (recv_user_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) { ( (recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) ) {
/* /*
* Match made * Match made
@ -398,12 +398,12 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
/* local variables */ /* local variables */
mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match; mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match;
mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq; mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq;
int frag_src,frag_user_tag, wild_recv_tag, specific_recv_tag; int frag_src,frag_tag, wild_recv_tag, specific_recv_tag;
/* initialization */ /* initialization */
return_match=(mca_ptl_base_recv_request_t *)NULL; return_match=(mca_ptl_base_recv_request_t *)NULL;
frag_src = frag_header->hdr_src_rank; frag_src = frag_header->hdr_src;
frag_user_tag=frag_header->hdr_user_tag; frag_tag=frag_header->hdr_tag;
/* /*
* We know that when this is called, both specific and wild irecvs * We know that when this is called, both specific and wild irecvs
@ -426,8 +426,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
* try and match * try and match
*/ */
wild_recv_tag = wild_recv->super.req_tag; wild_recv_tag = wild_recv->super.req_tag;
if ( (frag_user_tag == wild_recv_tag) || if ( (frag_tag == wild_recv_tag) ||
( (wild_recv_tag == LAM_ANY_TAG) && (0 <= frag_user_tag) ) ) { ( (wild_recv_tag == LAM_ANY_TAG) && (0 <= frag_tag) ) ) {
/* /*
* Match made * Match made
@ -471,8 +471,8 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
* specific recv is earlier than the wild one. * specific recv is earlier than the wild one.
*/ */
specific_recv_tag=specific_recv->super.req_tag; specific_recv_tag=specific_recv->super.req_tag;
if ( (frag_user_tag == specific_recv_tag) || if ( (frag_tag == specific_recv_tag) ||
( (specific_recv_tag == LAM_ANY_TAG) && (0<=frag_user_tag)) ) ( (specific_recv_tag == LAM_ANY_TAG) && (0<=frag_tag)) )
{ {
/* /*
@ -540,12 +540,6 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
mca_ptl_base_recv_frag_t *frag_desc; mca_ptl_base_recv_frag_t *frag_desc;
mca_ptl_base_recv_request_t *matched_receive; mca_ptl_base_recv_request_t *matched_receive;
/*
* Initialize list size - assume that most of the time this search
* will come up empty, so just initialize count - not pointers
*/
lam_list_set_size(additional_matches,0);
/* /*
* Loop over all the out of sequence messages. No ordering is assumed * Loop over all the out of sequence messages. No ordering is assumed
* in the c_frags_cant_match list. * in the c_frags_cant_match list.
@ -577,16 +571,6 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
frag_seqber=frag_desc->super.frag_header.hdr_match.hdr_msg_seq; frag_seqber=frag_desc->super.frag_header.hdr_match.hdr_msg_seq;
if (frag_seqber == next_msg_seq_expected) { if (frag_seqber == next_msg_seq_expected) {
/* initialize list on first entry - assume that most
* of the time nothing is found, so initially we just
* set the count to zero, and don't initialize any
* other parameters
*/
if(0 == lam_list_get_size(additional_matches))
{
OBJ_CONSTRUCT(additional_matches, lam_list_t);
}
/* We're now expecting the next sequence number. */ /* We're now expecting the next sequence number. */
(pml_comm->c_next_msg_seq[frag_src])++; (pml_comm->c_next_msg_seq[frag_src])++;

Просмотреть файл

@ -3,8 +3,8 @@
*/ */
/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
#ifndef MCA_PML_BASE_RECVFRAG_H #ifndef MCA_PTL_BASE_RECVFRAG_H
#define MCA_PML_BASE_RECVFRAG_H #define MCA_PTL_BASE_RECVFRAG_H
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_fragment.h" #include "mca/mpi/ptl/base/ptl_base_fragment.h"
@ -60,8 +60,10 @@ static inline int mca_ptl_base_recv_frag_match(
{ {
bool matched; bool matched;
lam_list_t matched_frags; lam_list_t matched_frags;
int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags); int rc;
if(rc != LAM_SUCCESS)
OBJ_CONSTRUCT(&matched_frags, lam_list_t);
if((rc = mca_ptl_base_match(header, frag, &matched, &matched_frags)) != LAM_SUCCESS)
return rc; return rc;
if(matched) { if(matched) {

Просмотреть файл

@ -11,7 +11,7 @@
static void mca_ptl_base_recv_request_construct(mca_ptl_base_recv_request_t*); static void mca_ptl_base_recv_request_construct(mca_ptl_base_recv_request_t*);
static void mca_ptl_base_recv_request_destruct(mca_ptl_base_recv_request_t*); static void mca_ptl_base_recv_request_destruct(mca_ptl_base_recv_request_t*);
static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t*, int); static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t*, int);
lam_class_t mca_ptl_base_recv_request_t_class = { lam_class_t mca_ptl_base_recv_request_t_class = {
@ -39,21 +39,23 @@ static void mca_ptl_base_recv_request_destruct(mca_ptl_base_recv_request_t* requ
void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* request) void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* request)
{ {
lam_communicator_t *comm = request->super.req_communicator; lam_communicator_t *comm = request->super.req_comm;
mca_pml_comm_t* pml_comm = comm->c_pml_comm; mca_pml_comm_t* pml_comm = comm->c_pml_comm;
int req_peer = request->super.req_peer; int req_peer = request->super.req_peer;
mca_ptl_base_recv_frag_t* frag;
/* check for a specific match */ /* check for a specific match */
THREAD_LOCK(pml_comm->c_matching_lock+req_peer);
if (lam_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 && if (lam_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 &&
mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) { (frag = mca_ptl_base_recv_request_match_specific_proc(request, req_peer)) != NULL) {
THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer);
mca_ptl_base_recv_frag_process(frag);
return; /* match found */ return; /* match found */
} }
/* We didn't find any matches. Record this irecv so we can match /* We didn't find any matches. Record this irecv so we can match
* it when the message comes in. * it when the message comes in.
*/ */
THREAD_LOCK(pml_comm->c_matching_lock+req_peer);
lam_list_append(pml_comm->c_specific_receives+req_peer, (lam_list_item_t*)request); lam_list_append(pml_comm->c_specific_receives+req_peer, (lam_list_item_t*)request);
THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer); THREAD_UNLOCK(pml_comm->c_matching_lock+req_peer);
} }
@ -66,7 +68,7 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque
void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request) void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
{ {
lam_communicator_t *comm = request->super.req_communicator; lam_communicator_t *comm = request->super.req_comm;
mca_pml_comm_t* pml_comm = comm->c_pml_comm; mca_pml_comm_t* pml_comm = comm->c_pml_comm;
int proc_count = comm->c_remote_group->grp_proc_count; int proc_count = comm->c_remote_group->grp_proc_count;
int proc; int proc;
@ -78,15 +80,22 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
* process. * process.
*/ */
for (proc = 0; proc < proc_count; proc++) { for (proc = 0; proc < proc_count; proc++) {
mca_ptl_base_recv_frag_t* frag;
/* continue if no frags to match */ /* continue if no frags to match */
if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) THREAD_LOCK(pml_comm->c_matching_lock+proc);
if (lam_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) {
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
continue; continue;
}
/* loop over messages from the current proc */ /* loop over messages from the current proc */
if (mca_ptl_base_recv_request_match_specific_proc(request, proc)) { if ((frag = mca_ptl_base_recv_request_match_specific_proc(request, proc)) != NULL) {
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
mca_ptl_base_recv_frag_process(frag);
return; /* match found */ return; /* match found */
} }
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
} }
/* We didn't find any matches. Record this irecv so we can match to /* We didn't find any matches. Record this irecv so we can match to
@ -104,41 +113,33 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
* it places the request in the appropriate matched receive list. * it places the request in the appropriate matched receive list.
*/ */
static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_request_t* request, int proc) static mca_ptl_base_recv_frag_t* mca_ptl_base_recv_request_match_specific_proc(
mca_ptl_base_recv_request_t* request, int proc)
{ {
mca_pml_comm_t *pml_comm = request->super.req_communicator->c_pml_comm; mca_pml_comm_t *pml_comm = request->super.req_comm->c_pml_comm;
lam_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc; lam_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc;
mca_ptl_base_recv_frag_t* frag; mca_ptl_base_recv_frag_t* frag;
int tag = request->super.req_tag; int tag = request->super.req_tag;
/* lock for thread safety */
THREAD_LOCK(pml_comm->c_matching_lock+proc);
for (frag = (mca_ptl_base_recv_frag_t*)lam_list_get_first(unexpected_frags); for (frag = (mca_ptl_base_recv_frag_t*)lam_list_get_first(unexpected_frags);
frag != (mca_ptl_base_recv_frag_t*)lam_list_get_end(unexpected_frags); frag != (mca_ptl_base_recv_frag_t*)lam_list_get_end(unexpected_frags);
frag = (mca_ptl_base_recv_frag_t*)lam_list_get_next(frag)) { frag = (mca_ptl_base_recv_frag_t*)lam_list_get_next(frag)) {
mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match; mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match;
/* check first frag - we assume that process matching has been done already */ /* check first frag - we assume that process matching has been done already */
if (((tag == LAM_ANY_TAG) || (tag == header->hdr_user_tag))) { if (((tag == LAM_ANY_TAG) || (tag == header->hdr_tag))) {
if (tag == LAM_ANY_TAG && header->hdr_user_tag < 0) { if (tag == LAM_ANY_TAG && header->hdr_tag < 0) {
continue; continue;
} }
lam_list_remove_item(unexpected_frags, frag);
request->req_sequence = header->hdr_msg_seq; request->req_sequence = header->hdr_msg_seq;
request->super.req_tag = tag = header->hdr_user_tag; request->super.req_tag = tag = header->hdr_tag;
request->super.req_peer = header->hdr_src_rank; request->super.req_peer = header->hdr_src;
frag->frag_request = request; frag->frag_request = request;
return frag;
/* notify ptl fragment has been matched - send cts to peer */
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
mca_ptl_base_recv_frag_process(frag);
return true;
} }
} }
return NULL;
THREAD_UNLOCK(pml_comm->c_matching_lock+proc);
return false;
} }

Просмотреть файл

@ -14,7 +14,10 @@ struct mca_ptl_base_recv_frag_t;
struct mca_ptl_base_recv_request_t { struct mca_ptl_base_recv_request_t {
mca_pml_base_request_t super; mca_pml_base_request_t super;
/* request sequence number */
mca_ptl_base_sequence_t req_sequence; mca_ptl_base_sequence_t req_sequence;
/* number of bytes delivered */
size_t req_bytes_recvd;
}; };
typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t; typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
@ -34,18 +37,17 @@ static inline void mca_ptl_base_recv_request_reinit(
bool persistent) bool persistent)
{ {
request->req_sequence = 0; request->req_sequence = 0;
request->req_bytes_recvd = 0;
request->super.req_addr = addr; request->super.req_addr = addr;
request->super.req_length = length; request->super.req_length = length;
request->super.req_datatype = datatype; request->super.req_datatype = datatype;
request->super.req_peer = src; request->super.req_peer = src;
request->super.req_tag = tag; request->super.req_tag = tag;
request->super.req_communicator = comm; request->super.req_comm = comm;
request->super.req_type = MCA_PML_REQUEST_RECV; request->super.req_type = MCA_PML_REQUEST_RECV;
request->super.req_status = MCA_PML_STATUS_INITED;
request->super.req_persistent = persistent; request->super.req_persistent = persistent;
request->super.req_mpi_done = false; request->super.req_mpi_done = false;
request->super.req_pml_layer_done = false; request->super.req_pml_done = false;
} }
#endif #endif

Просмотреть файл

@ -1,8 +1,8 @@
/* /*
* $HEADER$ * $HEADER$
*/ */
#ifndef MCA_PML_BASE_SEND_FRAG_H #ifndef MCA_PTL_BASE_SEND_FRAG_H
#define MCA_PML_BASE_SEND_FRAG_H #define MCA_PTL_BASE_SEND_FRAG_H
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_fragment.h" #include "mca/mpi/ptl/base/ptl_base_fragment.h"

Просмотреть файл

@ -9,6 +9,7 @@
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/pml/base/pml_base_request.h" #include "mca/mpi/pml/base/pml_base_request.h"
#include "mca/mpi/ptl/base/ptl_base_comm.h"
extern lam_class_t mca_ptl_base_send_request_t_class; extern lam_class_t mca_ptl_base_send_request_t_class;
@ -18,10 +19,6 @@ struct mca_ptl_base_send_frag_t;
struct mca_ptl_base_send_request_t { struct mca_ptl_base_send_request_t {
/* request object - common data structure for use by wait/test */ /* request object - common data structure for use by wait/test */
mca_pml_base_request_t super; mca_pml_base_request_t super;
/* pointer to user data */
unsigned char *req_data;
/* size of send/recv in bytes */
size_t req_length;
/* number of bytes that have already been assigned to a fragment */ /* number of bytes that have already been assigned to a fragment */
size_t req_offset; size_t req_offset;
/* number of fragments that have been allocated */ /* number of fragments that have been allocated */
@ -35,7 +32,7 @@ struct mca_ptl_base_send_request_t {
/* type of send */ /* type of send */
mca_pml_base_send_mode_t req_send_mode; mca_pml_base_send_mode_t req_send_mode;
/* sequence number for MPI pt-2-pt ordering */ /* sequence number for MPI pt-2-pt ordering */
mca_ptl_base_sequence_t req_msg_sequence_number; mca_ptl_base_sequence_t req_msg_seq;
/* queue of fragments that are waiting to be acknowledged */ /* queue of fragments that are waiting to be acknowledged */
mca_ptl_base_queue_t req_unacked_frags; mca_ptl_base_queue_t req_unacked_frags;
/* PTL that allocated this descriptor */ /* PTL that allocated this descriptor */
@ -66,17 +63,17 @@ static inline void mca_ptl_base_send_request_reinit(
request->req_bytes_acked = 0; request->req_bytes_acked = 0;
request->req_send_mode = sendmode; request->req_send_mode = sendmode;
request->req_peer_request.lval = 0; request->req_peer_request.lval = 0;
request->req_msg_seq = mca_pml_ptl_comm_send_sequence(comm->c_pml_comm, peer);
request->super.req_addr = addr; request->super.req_addr = addr;
request->super.req_length = length; request->super.req_length = length;
request->super.req_datatype = datatype; request->super.req_datatype = datatype;
request->super.req_peer = peer; request->super.req_peer = peer;
request->super.req_tag = tag; request->super.req_tag = tag;
request->super.req_communicator = comm; request->super.req_comm = comm;
request->super.req_type = MCA_PML_REQUEST_SEND; request->super.req_type = MCA_PML_REQUEST_SEND;
request->super.req_status = MCA_PML_STATUS_INITED;
request->super.req_persistent = persistent; request->super.req_persistent = persistent;
request->super.req_mpi_done = false; request->super.req_mpi_done = false;
request->super.req_pml_layer_done = false; request->super.req_pml_done = false;
} }

Просмотреть файл

@ -6,14 +6,18 @@
#include "lam/util/if.h" #include "lam/util/if.h"
#include "mca/mpi/pml/pml.h" #include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/ptl_base_header.h"
#include "mca/mpi/ptl/base/ptl_base_sendreq.h" #include "mca/mpi/ptl/base/ptl_base_sendreq.h"
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h" #include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
#include "mca/mpi/ptl/base/ptl_base_recvreq.h"
#include "mca/mpi/ptl/base/ptl_base_recvfrag.h"
#include "mca/lam/base/mca_base_module_exchange.h" #include "mca/lam/base/mca_base_module_exchange.h"
#include "ptl_tcp.h" #include "ptl_tcp.h"
#include "ptl_tcp_addr.h" #include "ptl_tcp_addr.h"
#include "ptl_tcp_peer.h" #include "ptl_tcp_peer.h"
#include "ptl_tcp_proc.h" #include "ptl_tcp_proc.h"
#include "ptl_tcp_sendreq.h" #include "ptl_tcp_sendreq.h"
#include "ptl_tcp_recvfrag.h"
mca_ptl_tcp_t mca_ptl_tcp = { mca_ptl_tcp_t mca_ptl_tcp = {
@ -151,11 +155,25 @@ int mca_ptl_tcp_send(
void mca_ptl_tcp_recv( void mca_ptl_tcp_recv(
struct mca_ptl_t* ptl, mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* frag) mca_ptl_base_recv_frag_t* frag)
{ {
if(mca_ptl_tcp_recv_frag_cts((mca_ptl_tcp_recv_frag_t*)frag) == false) { /* fill in match */
lam_list_append(&mca_ptl_tcp_module.tcp_acks, (lam_list_item_t*)frag); mca_pml_base_request_t* request = &frag->frag_request->super;
mca_ptl_base_header_t* header = &frag->super.frag_header;
request->req_status.MPI_SOURCE = header->hdr_match.hdr_src;
request->req_status.MPI_TAG = header->hdr_match.hdr_tag;
request->req_status.MPI_ERROR = LAM_SUCCESS;
request->req_status.MPI_LENGTH = header->hdr_match.hdr_msg_length;
/* send cts ack back to peer */
if(request->req_status.MPI_LENGTH > frag->super.frag_size) {
if(mca_ptl_tcp_recv_frag_cts((mca_ptl_tcp_recv_frag_t*)frag) == false) {
lam_list_append(&mca_ptl_tcp_module.tcp_acks, (lam_list_item_t*)frag);
}
} }
/* process fragment if complete */
mca_ptl_tcp_recv_frag_process((mca_ptl_tcp_recv_frag_t*)frag);
} }

Просмотреть файл

@ -24,8 +24,8 @@ struct mca_ptl_tcp_module_1_0_0_t {
struct mca_ptl_tcp_t** tcp_ptls; struct mca_ptl_tcp_t** tcp_ptls;
size_t tcp_num_ptls; /**< number of ptls actually used */ size_t tcp_num_ptls; /**< number of ptls actually used */
size_t tcp_max_ptls; /**< maximum number of ptls - available kernel ifs */ size_t tcp_max_ptls; /**< maximum number of ptls - available kernel ifs */
int tcp_listen; int tcp_listen_sd;
unsigned short tcp_port; unsigned short tcp_listen_port;
char* tcp_if_include; /**< comma seperated list of interface to include */ char* tcp_if_include; /**< comma seperated list of interface to include */
char* tcp_if_exclude; /**< comma seperated list of interface to exclude */ char* tcp_if_exclude; /**< comma seperated list of interface to exclude */
int tcp_free_list_num; /**< initial size of free lists */ int tcp_free_list_num; /**< initial size of free lists */

Просмотреть файл

@ -3,6 +3,11 @@
*/ */
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "lam/constants.h" #include "lam/constants.h"
#include "lam/util/if.h" #include "lam/util/if.h"
@ -211,36 +216,57 @@ static int mca_ptl_tcp_module_create_instances(void)
static int mca_ptl_tcp_module_create_listen(void) static int mca_ptl_tcp_module_create_listen(void)
{ {
int flags;
/* create a listen socket for incoming connections */ /* create a listen socket for incoming connections */
mca_ptl_tcp_module.tcp_listen = socket(AF_INET, SOCK_STREAM, 0); mca_ptl_tcp_module.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_ptl_tcp_module.tcp_listen < 0) { if(mca_ptl_tcp_module.tcp_listen_sd < 0) {
lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno); lam_output(0,"mca_ptl_tcp_module_init: socket() failed with errno=%d", errno);
return LAM_ERROR; return LAM_ERROR;
} }
/* bind to all addresses and dynamically assigned port */ /* bind to all addresses and dynamically assigned port */
struct sockaddr_in inaddr; struct sockaddr_in inaddr;
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET; inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = INADDR_ANY; inaddr.sin_addr.s_addr = INADDR_ANY;
inaddr.sin_port = 0; inaddr.sin_port = 0;
if(bind(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) { if(bind(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
lam_output(0,"mca_ptl_tcp_module_init: bind() failed with errno=%d", errno); lam_output(0,"mca_ptl_tcp_module_init: bind() failed with errno=%d", errno);
return LAM_ERROR; return LAM_ERROR;
} }
/* resolve system assignend port */ /* resolve system assignend port */
lam_socklen_t addrlen = sizeof(struct sockaddr_in); lam_socklen_t addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&inaddr, &addrlen) < 0) { if(getsockname(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: getsockname() failed with errno=%d", errno); lam_output(0, "mca_ptl_tcp_module_init: getsockname() failed with errno=%d", errno);
return LAM_ERROR; return LAM_ERROR;
} }
mca_ptl_tcp_module.tcp_port = inaddr.sin_port; mca_ptl_tcp_module.tcp_listen_port = inaddr.sin_port;
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_ptl_tcp_module.tcp_listen_sd, SOMAXCONN) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: listen() failed with errno=%d", errno);
return LAM_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_GETFL, 0)) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_GETFL) failed with errno=%d", errno);
return LAM_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_ptl_tcp_module.tcp_listen_sd, F_SETFL, flags) < 0) {
lam_output(0, "mca_ptl_tcp_module_init: fcntl(F_SETFL) failed with errno=%d", errno);
return LAM_ERROR;
}
}
/* register listen port */ /* register listen port */
lam_event_set( lam_event_set(
&mca_ptl_tcp_module.tcp_recv_event, &mca_ptl_tcp_module.tcp_recv_event,
mca_ptl_tcp_module.tcp_listen, mca_ptl_tcp_module.tcp_listen_sd,
LAM_EV_READ|LAM_EV_PERSIST, LAM_EV_READ|LAM_EV_PERSIST,
mca_ptl_tcp_module_recv_handler, mca_ptl_tcp_module_recv_handler,
0); 0);
@ -261,7 +287,7 @@ static int mca_ptl_tcp_module_exchange(void)
for(i=0; i<mca_ptl_tcp_module.tcp_num_ptls; i++) { for(i=0; i<mca_ptl_tcp_module.tcp_num_ptls; i++) {
mca_ptl_tcp_t* ptl = mca_ptl_tcp_module.tcp_ptls[i]; mca_ptl_tcp_t* ptl = mca_ptl_tcp_module.tcp_ptls[i];
addrs[i].addr_inet = ptl->ptl_ifaddr.sin_addr; addrs[i].addr_inet = ptl->ptl_ifaddr.sin_addr;
addrs[i].addr_port = mca_ptl_tcp_module.tcp_listen; addrs[i].addr_port = mca_ptl_tcp_module.tcp_listen_port;
addrs[i].addr_inuse = 0; addrs[i].addr_inuse = 0;
} }
int rc = mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version, addrs, size); int rc = mca_base_modex_send(&mca_ptl_tcp_module.super.ptlm_version, addrs, size);
@ -353,10 +379,10 @@ void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp)
static void mca_ptl_tcp_module_accept(void) static void mca_ptl_tcp_module_accept(void)
{ {
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
while(true) { while(true) {
lam_socklen_t addrlen = sizeof(struct sockaddr_in);
struct sockaddr_in addr; struct sockaddr_in addr;
int sd = accept(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&addr, &addrlen); int sd = accept(mca_ptl_tcp_module.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
if(sd < 0) { if(sd < 0) {
if(errno == EINTR) if(errno == EINTR)
continue; continue;
@ -385,7 +411,7 @@ static void mca_ptl_tcp_module_recv_handler(int sd, short flags, void* user)
lam_socklen_t addr_len = sizeof(addr); lam_socklen_t addr_len = sizeof(addr);
/* accept new connections on the listen socket */ /* accept new connections on the listen socket */
if(mca_ptl_tcp_module.tcp_listen == sd) { if(mca_ptl_tcp_module.tcp_listen_sd == sd) {
mca_ptl_tcp_module_accept(); mca_ptl_tcp_module_accept();
return; return;
} }

Просмотреть файл

@ -11,6 +11,7 @@
#include "ptl_tcp_addr.h" #include "ptl_tcp_addr.h"
#include "ptl_tcp_peer.h" #include "ptl_tcp_peer.h"
#include "ptl_tcp_proc.h" #include "ptl_tcp_proc.h"
#include "ptl_tcp_sendfrag.h"
static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer); static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer);
@ -50,6 +51,22 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
} }
static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, int sd)
{
lam_event_set(
&ptl_peer->peer_recv_event,
ptl_peer->peer_sd,
LAM_EV_READ|LAM_EV_PERSIST,
mca_ptl_tcp_peer_recv_handler,
ptl_peer);
lam_event_set(
&ptl_peer->peer_send_event,
ptl_peer->peer_sd,
LAM_EV_WRITE|LAM_EV_PERSIST,
mca_ptl_tcp_peer_send_handler,
ptl_peer);
}
/* /*
* Cleanup any resources held by the peer. * Cleanup any resources held by the peer.
*/ */
@ -88,6 +105,8 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) { if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) {
ptl_peer->peer_send_frag = frag; ptl_peer->peer_send_frag = frag;
lam_event_add(&mca_ptl_tcp_module.tcp_send_event, 0); lam_event_add(&mca_ptl_tcp_module.tcp_send_event, 0);
} else {
ptl_peer->peer_ptl->super.ptl_send_progress(frag->super.frag_request, &frag->super);
} }
} }
break; break;
@ -96,6 +115,50 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
return rc; return rc;
} }
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(errno == EINTR)
continue;
if(errno != EAGAIN && errno != EWOULDBLOCK) {
lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno);
mca_ptl_tcp_peer_close_i(ptl_peer);
return -1;
}
}
cnt += retval;
}
return cnt;
}
/*
* Send the globally unique identifier for this process to a peer on
* a newly connected socket.
*/
static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer)
{
/* send process identifier to remote peer */
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local();
uint32_t size_n = htonl(ptl_proc->proc_guid_size);
if(mca_ptl_tcp_peer_send_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n) ||
mca_ptl_tcp_peer_send_blocking(ptl_peer, ptl_proc->proc_guid, ptl_proc->proc_guid_size) !=
ptl_proc->proc_guid_size) {
return LAM_ERR_UNREACH;
}
return LAM_SUCCESS;
}
/* /*
* Check the state of this peer. If the incoming connection request matches * Check the state of this peer. If the incoming connection request matches
* our peers address, check the state of our connection: * our peers address, check the state of our connection:
@ -118,6 +181,12 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
peer_proc->proc_lam->proc_vpid < this_proc->proc_lam->proc_vpid)) { peer_proc->proc_lam->proc_vpid < this_proc->proc_lam->proc_vpid)) {
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
ptl_peer->peer_sd = sd; ptl_peer->peer_sd = sd;
if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) != LAM_SUCCESS) {
mca_ptl_tcp_peer_close_i(ptl_peer);
return false;
}
mca_ptl_tcp_peer_event_init(ptl_peer, sd);
lam_event_add(&ptl_peer->peer_recv_event, 0);
mca_ptl_tcp_peer_connected(ptl_peer); mca_ptl_tcp_peer_connected(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_lock); THREAD_UNLOCK(&ptl_peer->peer_lock);
return true; return true;
@ -172,19 +241,6 @@ static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer)
lam_list_remove_first(&ptl_peer->peer_frags); lam_list_remove_first(&ptl_peer->peer_frags);
lam_event_add(&ptl_peer->peer_send_event, 0); lam_event_add(&ptl_peer->peer_send_event, 0);
} }
lam_event_set(
&ptl_peer->peer_recv_event,
ptl_peer->peer_sd,
LAM_EV_READ|LAM_EV_PERSIST,
mca_ptl_tcp_peer_recv_handler,
ptl_peer);
lam_event_set(
&ptl_peer->peer_send_event,
ptl_peer->peer_sd,
LAM_EV_WRITE|LAM_EV_PERSIST,
mca_ptl_tcp_peer_send_handler,
ptl_peer);
lam_event_add(&ptl_peer->peer_recv_event, 0);
} }
@ -221,30 +277,6 @@ static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_base_peer_t* ptl_peer, void* d
} }
/*
* A blocking send on a non-blocking socket. Used to send the small amount of connection
* information that identifies the peers endpoint.
*/
static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* data, size_t size)
{
unsigned char* ptr = (unsigned char*)data;
size_t cnt = 0;
while(cnt < size) {
int retval = send(ptl_peer->peer_sd, ptr+cnt, size-cnt, 0);
if(retval < 0) {
if(errno == EINTR)
continue;
if(errno != EAGAIN && errno != EWOULDBLOCK) {
lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno);
mca_ptl_tcp_peer_close_i(ptl_peer);
return -1;
}
}
cnt += retval;
}
return cnt;
}
/* /*
* Receive the peers globally unique process identification from a newly * Receive the peers globally unique process identification from a newly
@ -258,7 +290,7 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
void* guid; void* guid;
mca_ptl_tcp_proc_t* ptl_proc = ptl_peer->peer_proc; mca_ptl_tcp_proc_t* ptl_proc = ptl_peer->peer_proc;
if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, &size_n, sizeof(size_n)) != size_n) if(mca_ptl_tcp_peer_recv_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n))
return LAM_ERR_UNREACH; return LAM_ERR_UNREACH;
size_h = ntohl(size_n); size_h = ntohl(size_n);
guid = malloc(size_h); guid = malloc(size_h);
@ -283,24 +315,6 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
} }
/*
* Send the globally unique identifier for this process to a peer on
* a newly connected socket.
*/
static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer)
{
/* send process identifier to remote peer */
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_local();
uint32_t size_n = htonl(ptl_proc->proc_guid_size);
if(mca_ptl_tcp_peer_send_blocking(ptl_peer, &size_n, sizeof(size_n)) != sizeof(size_n) ||
mca_ptl_tcp_peer_send_blocking(ptl_peer, ptl_proc->proc_guid, ptl_proc->proc_guid_size) !=
ptl_proc->proc_guid_size) {
return LAM_ERR_UNREACH;
}
return LAM_SUCCESS;
}
/* /*
* Start a connection to the peer. This will likely not complete, * Start a connection to the peer. This will likely not complete,
@ -319,6 +333,9 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
return LAM_ERR_UNREACH; return LAM_ERR_UNREACH;
} }
/* setup event callbacks */
mca_ptl_tcp_peer_event_init(ptl_peer, ptl_peer->peer_sd);
/* setup the socket as non-blocking */ /* setup the socket as non-blocking */
int flags; int flags;
if((flags = fcntl(ptl_peer->peer_sd, F_GETFL, 0)) < 0) { if((flags = fcntl(ptl_peer->peer_sd, F_GETFL, 0)) < 0) {
@ -330,13 +347,16 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
} }
/* start the connect - will likely fail with EINPROGRESS */ /* start the connect - will likely fail with EINPROGRESS */
if(connect(ptl_peer->peer_sd, (struct sockaddr*)&ptl_peer->peer_addr->addr_inet, struct sockaddr_in peer_addr;
sizeof(struct sockaddr_in)) < 0) { peer_addr.sin_family = AF_INET;
peer_addr.sin_addr = ptl_peer->peer_addr->addr_inet;
peer_addr.sin_port = ptl_peer->peer_addr->addr_port;
if(connect(ptl_peer->peer_sd, (struct sockaddr*)&peer_addr, sizeof(peer_addr)) < 0) {
/* non-blocking so wait for completion */ /* non-blocking so wait for completion */
if(errno == EINPROGRESS) { if(errno == EINPROGRESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING; ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING;
lam_event_add(&ptl_peer->peer_send_event, 0); lam_event_add(&ptl_peer->peer_send_event, 0);
return rc; return LAM_SUCCESS;
} }
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
ptl_peer->peer_retries++; ptl_peer->peer_retries++;
@ -379,7 +399,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
return; return;
} }
if(so_error != 0) { if(so_error != 0) {
lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failedd with errno=%d\n", so_error); lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
return; return;
} }
@ -452,11 +472,16 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
case MCA_PTL_TCP_CONNECTING: case MCA_PTL_TCP_CONNECTING:
mca_ptl_tcp_peer_complete_connect(ptl_peer); mca_ptl_tcp_peer_complete_connect(ptl_peer);
break; break;
case MCA_PTL_TCP_CONNECTED: case MCA_PTL_TCP_CONNECTED:
{
/* complete the current send */ /* complete the current send */
mca_ptl_t *ptl = (mca_ptl_t*)ptl_peer->peer_ptl;
do { do {
if(mca_ptl_tcp_send_frag_handler(ptl_peer->peer_send_frag, ptl_peer->peer_sd) == false) mca_ptl_tcp_send_frag_t* send_frag = ptl_peer->peer_send_frag;
if(mca_ptl_tcp_send_frag_handler(send_frag, ptl_peer->peer_sd) == false)
break; break;
ptl->ptl_send_progress(send_frag->super.frag_request, &send_frag->super);
/* progress any pending sends */ /* progress any pending sends */
ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*) ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)
lam_list_remove_first(&ptl_peer->peer_frags); lam_list_remove_first(&ptl_peer->peer_frags);
@ -467,6 +492,7 @@ static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
lam_event_del(&ptl_peer->peer_send_event); lam_event_del(&ptl_peer->peer_send_event);
} }
break; break;
}
default: default:
lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)", lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)",
ptl_peer->peer_state); ptl_peer->peer_state);

Просмотреть файл

@ -10,6 +10,11 @@
#include "ptl_tcp_recvfrag.h" #include "ptl_tcp_recvfrag.h"
#define frag_header super.super.frag_header
#define frag_peer super.super.frag_peer
#define frag_owner super.super.frag_owner
static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag);
static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag); static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag);
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t); static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t);
@ -43,8 +48,7 @@ void mca_ptl_tcp_recv_frag_reinit(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_pe
frag->frag_owner = &peer->peer_ptl->super; frag->frag_owner = &peer->peer_ptl->super;
frag->super.frag_request = 0; frag->super.frag_request = 0;
frag->frag_peer = peer; frag->frag_peer = peer;
frag->frag_addr = 0; frag->frag_buff = NULL;
frag->frag_size = 0;
frag->frag_hdr_cnt = 0; frag->frag_hdr_cnt = 0;
frag->frag_msg_cnt = 0; frag->frag_msg_cnt = 0;
} }
@ -124,21 +128,33 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false; return false;
if(frag->frag_msg_cnt == 0) { if(frag->frag_msg_cnt == 0) {
/* attempt to match a posted recv */ /* attempt to match a posted recv */
mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match); mca_ptl_base_recv_frag_match(&frag->super, &frag->frag_header.hdr_match);
/* match was not made - so allocate buffer for eager send */ /* match was not made - so allocate buffer for eager send */
if(NULL == frag->super.frag_request) { if (NULL == frag->super.frag_request) {
frag->frag_addr = malloc(frag->frag_header.hdr_frag.hdr_frag_length);
frag->frag_size = frag->frag_header.hdr_frag.hdr_frag_length;
} else {
frag->frag_addr = (unsigned char*)frag->super.super.frag_addr;
frag->frag_size = frag->super.super.frag_size;
}
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false;
} else if(frag->frag_msg_cnt < frag->super.super.frag_size) { #if 0
lam_output(0, "mca_ptl_tcp_recv_frag_match: src=%d tag=%d comm=%d seq=%ld",
frag->frag_header.hdr_match.hdr_src,
frag->frag_header.hdr_match.hdr_tag,
frag->frag_header.hdr_match.hdr_contextid,
frag->frag_header.hdr_match.hdr_msg_seq);
#endif
if(frag->frag_header.hdr_frag.hdr_frag_length > 0) {
frag->frag_buff = malloc(frag->frag_header.hdr_frag.hdr_frag_length);
frag->super.super.frag_size = frag->frag_header.hdr_frag.hdr_frag_length;
}
/* match was made - use application buffer */
} else {
frag->frag_buff = (unsigned char*)frag->super.super.frag_addr;
}
}
if(frag->frag_msg_cnt < frag->super.super.frag_size) {
if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
return false; return false;
} }
@ -147,8 +163,8 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
return false; return false;
/* if match has already been made process the fragment */ /* if fragment has already been matched - go ahead and process */
if(NULL != frag->super.frag_request) if (frag->super.frag_request != NULL)
mca_ptl_tcp_recv_frag_process(frag); mca_ptl_tcp_recv_frag_process(frag);
return true; return true;
} }
@ -187,7 +203,8 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
{ {
int cnt = -1; int cnt = -1;
while(cnt < 0) { while(cnt < 0) {
cnt = recv(sd, (unsigned char*)frag->frag_addr+frag->frag_msg_cnt, frag->frag_size-frag->frag_msg_cnt, 0); cnt = recv(sd, (unsigned char*)frag->frag_buff+frag->frag_msg_cnt,
frag->super.super.frag_size-frag->frag_msg_cnt, 0);
if(cnt == 0) { if(cnt == 0) {
mca_ptl_tcp_peer_close(frag->frag_peer); mca_ptl_tcp_peer_close(frag->frag_peer);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
@ -208,7 +225,7 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
} }
} }
frag->frag_msg_cnt += cnt; frag->frag_msg_cnt += cnt;
return (frag->frag_msg_cnt >= frag->frag_size); return (frag->frag_msg_cnt >= frag->super.super.frag_size);
} }
@ -270,7 +287,7 @@ bool mca_ptl_tcp_recv_frag_cts(mca_ptl_tcp_recv_frag_t* frag)
ack->frag_owner = &ptl_peer->peer_ptl->super; ack->frag_owner = &ptl_peer->peer_ptl->super;
ack->frag_peer = ptl_peer; ack->frag_peer = ptl_peer;
ack->super.frag_request = 0; ack->super.frag_request = 0;
ack->super.super.frag_addr = 0; ack->super.super.frag_addr = NULL;
ack->super.super.frag_size = 0; ack->super.super.frag_size = 0;
ack->frag_vec_ptr = ack->frag_vec; ack->frag_vec_ptr = ack->frag_vec;
ack->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr; ack->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
@ -282,17 +299,13 @@ bool mca_ptl_tcp_recv_frag_cts(mca_ptl_tcp_recv_frag_t* frag)
/* /*
* Copy data into application buffer if required and update
* status of the request.
*/ */
void mca_ptl_tcp_recv_frag_process(mca_ptl_tcp_recv_frag_t* frag) void mca_ptl_tcp_recv_frag_process(mca_ptl_tcp_recv_frag_t* frag)
{ {
/* are we done receiving data */
if(frag->frag_msg_cnt >= frag->frag_header.hdr_frag.hdr_frag_length) { if(frag->frag_msg_cnt >= frag->frag_header.hdr_frag.hdr_frag_length) {
/* was a temporary buffer allocated */ if(frag->frag_buff != frag->super.super.frag_addr) {
if(frag->frag_addr != frag->super.super.frag_addr) { memcpy(frag->super.super.frag_addr, frag->frag_buff, frag->super.super.frag_size);
memcpy(frag->super.super.frag_addr, frag->frag_addr, frag->super.super.frag_size);
} }
frag->frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super); frag->frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super);
if(frag->frag_acked == true) { if(frag->frag_acked == true) {

Просмотреть файл

@ -20,14 +20,10 @@ extern lam_class_t mca_ptl_tcp_recv_frag_t_class;
struct mca_ptl_tcp_recv_frag_t { struct mca_ptl_tcp_recv_frag_t {
mca_ptl_base_recv_frag_t super; mca_ptl_base_recv_frag_t super;
mca_ptl_base_ack_header_t frag_ack; mca_ptl_base_ack_header_t frag_ack;
unsigned char* frag_addr; unsigned char* frag_buff;
size_t frag_size;
size_t frag_hdr_cnt; size_t frag_hdr_cnt;
size_t frag_msg_cnt; size_t frag_msg_cnt;
bool frag_acked; bool frag_acked;
#define frag_peer super.super.frag_peer
#define frag_owner super.super.frag_owner
#define frag_header super.super.frag_header
}; };
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t; typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;
@ -39,8 +35,8 @@ static inline mca_ptl_tcp_recv_frag_t* mca_ptl_tcp_recv_frag_alloc(int* rc)
static inline void mca_ptl_tcp_recv_frag_return(mca_ptl_tcp_recv_frag_t* frag) static inline void mca_ptl_tcp_recv_frag_return(mca_ptl_tcp_recv_frag_t* frag)
{ {
if(frag->frag_addr != frag->super.super.frag_addr) if(frag->frag_buff != frag->super.super.frag_addr)
free(frag->frag_addr); free(frag->frag_buff);
lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag); lam_free_list_return(&mca_ptl_tcp_module.tcp_recv_frags, (lam_list_item_t*)frag);
} }

Просмотреть файл

@ -10,6 +10,10 @@
#include "ptl_tcp_peer.h" #include "ptl_tcp_peer.h"
#include "ptl_tcp_sendfrag.h" #include "ptl_tcp_sendfrag.h"
#define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner
#define frag_peer super.super.frag_peer
static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag); static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag);
static void mca_ptl_tcp_send_frag_destruct(mca_ptl_tcp_send_frag_t* frag); static void mca_ptl_tcp_send_frag_destruct(mca_ptl_tcp_send_frag_t* frag);
@ -54,12 +58,12 @@ void mca_ptl_tcp_send_frag_reinit(
hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr.pval = 0; hdr->hdr_frag.hdr_dst_ptr.pval = 0;
hdr->hdr_match.hdr_contextid = sendreq->super.req_communicator->c_contextid; hdr->hdr_match.hdr_contextid = sendreq->super.req_comm->c_contextid;
hdr->hdr_match.hdr_src_rank = sendreq->super.req_communicator->c_my_rank; hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst_rank = sendreq->super.req_peer; hdr->hdr_match.hdr_dst = sendreq->super.req_peer;
hdr->hdr_match.hdr_user_tag = sendreq->super.req_tag; hdr->hdr_match.hdr_tag = sendreq->super.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_length; hdr->hdr_match.hdr_msg_length = sendreq->super.req_length;
hdr->hdr_match.hdr_msg_seq = 0; hdr->hdr_match.hdr_msg_seq = sendreq->req_msg_seq;
} else { } else {
hdr->hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_flags = 0; hdr->hdr_flags = 0;
@ -71,8 +75,8 @@ void mca_ptl_tcp_send_frag_reinit(
} }
/* update request */ /* update request */
if(sendreq->req_offset + size > sendreq->req_length) if(sendreq->req_offset + size > sendreq->super.req_length)
size = sendreq->req_length = sendreq->req_offset; size = sendreq->super.req_length = sendreq->req_offset;
hdr->hdr_frag.hdr_frag_length = size; hdr->hdr_frag.hdr_frag_length = size;
sendreq->req_offset += size; sendreq->req_offset += size;
sendreq->req_frags++; sendreq->req_frags++;

Просмотреть файл

@ -21,9 +21,6 @@ struct mca_ptl_tcp_send_frag_t {
struct iovec *frag_vec_ptr; struct iovec *frag_vec_ptr;
size_t frag_vec_cnt; size_t frag_vec_cnt;
struct iovec frag_vec[2]; struct iovec frag_vec[2];
#define frag_peer super.super.frag_peer
#define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner
}; };
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t; typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;

Просмотреть файл

@ -61,21 +61,22 @@ static inline int lam_comm_rank(lam_communicator_t* comm)
{ {
return comm->c_my_rank; return comm->c_my_rank;
} }
/**
* size of the communicator
*/
static inline int lam_comm_size(lam_communicator_t* comm)
{
return comm->c_remote_group->grp_proc_count;
}
/* return pointer to communicator associated with context id cid, /* return pointer to communicator associated with context id cid,
* No error checking is done*/ * No error checking is done*/
static inline lam_communicator_t *lam_comm_lookup(uint32_t cid) static inline lam_communicator_t *lam_comm_lookup(uint32_t cid)
{ {
/* array of pointers to communicators, indexed by context ID */ /* array of pointers to communicators, indexed by context ID */
extern lam_communicator_t **lam_mpi_comm_array; extern lam_pointer_array_t lam_mpi_communicators;
#if LAM_ENABLE_DEBUG return (lam_communicator_t*)lam_pointer_array_get_item(&lam_mpi_communicators, cid);
extern uint32_t lam_mpi_comm_array_size;
if (cid >= lam_mpi_comm_array_size) {
lam_output(0, "lam_comm_lookup: invalid communicator index (%d)", cid);
return (lam_communicator_t *) NULL;
}
#endif
return lam_mpi_comm_array[cid];
} }
static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, int peer_id) static inline lam_proc_t* lam_comm_lookup_peer(lam_communicator_t* comm, int peer_id)

Просмотреть файл

@ -15,11 +15,9 @@
* Global variables * Global variables
*/ */
lam_communicator_t **lam_mpi_comm_array; lam_pointer_array_t lam_mpi_communicators;
size_t lam_mpi_comm_array_size; lam_communicator_t lam_mpi_comm_world;
lam_communicator_t lam_mpi_comm_self;
lam_communicator_t lam_mpi_comm_world;
lam_communicator_t lam_mpi_comm_self;
static void lam_comm_construct(lam_communicator_t* comm) static void lam_comm_construct(lam_communicator_t* comm)
@ -62,6 +60,9 @@ int lam_comm_init(void)
lam_group_t *group; lam_group_t *group;
size_t size; size_t size;
/* Setup communicator array */
OBJ_CONSTRUCT(&lam_mpi_communicators, lam_pointer_array_t);
/* Setup MPI_COMM_WORLD */ /* Setup MPI_COMM_WORLD */
OBJ_CONSTRUCT(&lam_mpi_comm_world, lam_communicator_t); OBJ_CONSTRUCT(&lam_mpi_comm_world, lam_communicator_t);
group = OBJ_NEW(lam_group_t); group = OBJ_NEW(lam_group_t);
@ -75,6 +76,7 @@ int lam_comm_init(void)
lam_mpi_comm_world.c_local_group = group; lam_mpi_comm_world.c_local_group = group;
lam_mpi_comm_world.c_remote_group = group; lam_mpi_comm_world.c_remote_group = group;
mca_pml.pml_add_comm(&lam_mpi_comm_world); mca_pml.pml_add_comm(&lam_mpi_comm_world);
lam_pointer_array_set_item(&lam_mpi_communicators, 0, &lam_mpi_comm_world);
/* Setup MPI_COMM_SELF */ /* Setup MPI_COMM_SELF */
OBJ_CONSTRUCT(&lam_mpi_comm_self, lam_communicator_t); OBJ_CONSTRUCT(&lam_mpi_comm_self, lam_communicator_t);
@ -89,6 +91,7 @@ int lam_comm_init(void)
lam_mpi_comm_self.c_local_group = group; lam_mpi_comm_self.c_local_group = group;
lam_mpi_comm_self.c_remote_group = group; lam_mpi_comm_self.c_remote_group = group;
mca_pml.pml_add_comm(&lam_mpi_comm_self); mca_pml.pml_add_comm(&lam_mpi_comm_self);
lam_pointer_array_set_item(&lam_mpi_communicators, 1, &lam_mpi_comm_self);
return LAM_SUCCESS; return LAM_SUCCESS;
} }

Просмотреть файл

@ -6,11 +6,13 @@
#include "mpi.h" #include "mpi.h"
#include "mpi/interface/c/bindings.h" #include "mpi/interface/c/bindings.h"
#include "mpi/communicator/communicator.h"
#if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES #if LAM_HAVE_WEAK_SYMBOLS && LAM_PROFILING_DEFINES
#pragma weak MPI_Comm_size = PMPI_Comm_size #pragma weak MPI_Comm_size = PMPI_Comm_size
#endif #endif
int MPI_Comm_size(MPI_Comm comm, int *size) { int MPI_Comm_size(MPI_Comm comm, int *size) {
*size = lam_comm_size(comm);
return MPI_SUCCESS; return MPI_SUCCESS;
} }

Просмотреть файл

@ -3,6 +3,7 @@
*/ */
#include "lam_config.h" #include "lam_config.h"
#include <stdio.h> #include <stdio.h>
#include <sys/time.h>
#include "mpi.h" #include "mpi.h"
#include "mpi/interface/c/bindings.h" #include "mpi/interface/c/bindings.h"
@ -12,5 +13,10 @@
#endif #endif
double MPI_Wtime(void) { double MPI_Wtime(void) {
return (double)0; struct timeval tv;
double wtime;
gettimeofday(&tv, NULL);
wtime = tv.tv_sec;
wtime += (double)tv.tv_usec / 1000000.0;
return wtime;
} }

Просмотреть файл

@ -77,7 +77,6 @@ int lam_proc_init(void)
proc->proc_vpid = vpid; proc->proc_vpid = vpid;
if(proc->proc_vpid == local->vpid && strcmp(proc->proc_job, local->job_handle) == 0) { if(proc->proc_vpid == local->vpid && strcmp(proc->proc_job, local->job_handle) == 0) {
lam_proc_local_proc = proc; lam_proc_local_proc = proc;
lam_output(0, "myrank=%d\n", local->vpid);
} }
} }
free(procs); free(procs);

Просмотреть файл

@ -27,7 +27,7 @@ laminfo_SOURCES = \
param.cc \ param.cc \
version.cc version.cc
laminfo_LDADD = $(libs) $(LIBMPI_EXTRA_LIBS) $(LIBLAM_EXTRA_LIBS) laminfo_LDADD = $(libs) $(LIBMPI_EXTRA_LIBS) $(LIBLAM_EXTRA_LIBS)
laminfo_LDFLAGS = $(LIBMPI_EXTRA_LDFLAGS) $(LIBLAM_EXTRA_LDFLAGS) laminfo_LDFLAGS = $(LIBMPI_EXTRA_LDFLAGS) $(LIBLAM_EXTRA_LDFLAGS) -lpthread
laminfo_DEPENDENCIES = $(libs) laminfo_DEPENDENCIES = $(libs)
clean-local: clean-local:

Просмотреть файл

@ -12,6 +12,7 @@ noinst_PROGRAMS = chello
chello_SOURCES = chello.c chello_SOURCES = chello.c
chello_LDADD = \ chello_LDADD = \
$(top_builddir)/src/libmpi.la \ $(top_builddir)/src/libmpi.la \
$(top_builddir)/src/liblam.la $(top_builddir)/src/liblam.la \
-lpthread
chello_DEPENDENCIES = $(chello_LDADD) chello_DEPENDENCIES = $(chello_LDADD)