1
1

Update the ORTE thread acquire/release/wakeup macros to trigger release from event_loop so that conditions can be checked.

Add macro versions of condition_wait and friends for debug use.

This commit was SVN r24115.
Этот коммит содержится в:
Ralph Castain 2010-11-30 17:27:58 +00:00
родитель 83723037d5
Коммит 09f02b3087
3 изменённых файлов: 154 добавлений и 71 удалений

Просмотреть файл

@ -18,8 +18,8 @@
*
* $HEADER$
*/
#ifndef ORTE_CONDITION_SPINLOCK_H
#define ORTE_CONDITION_SPINLOCK_H
#ifndef ORTE_CONDITION_H
#define ORTE_CONDITION_H
#include "orte_config.h"
#ifdef HAVE_SYS_TIME_H
@ -38,6 +38,8 @@
#include "opal/threads/threads.h"
#include "opal/runtime/opal_cr.h"
#include "orte/runtime/orte_globals.h"
BEGIN_C_DECLS
static inline int orte_condition_wait(opal_condition_t *c, opal_mutex_t *m)
@ -45,11 +47,13 @@ static inline int orte_condition_wait(opal_condition_t *c, opal_mutex_t *m)
int rc = 0;
c->c_waiting++;
/*
#if OPAL_HAVE_POSIX_THREADS
if (orte_progress_threads_enabled) {
rc = pthread_cond_wait(&c->c_cond, &m->m_lock_pthread);
#elif OPAL_HAVE_SOLARIS_THREADS
rc = cond_wait(&c->c_cond, &m->m_lock_solaris);
#else
} else {
#endif
*/
if (c->c_signaled) {
c->c_waiting--;
opal_mutex_unlock(m);
@ -64,34 +68,48 @@ static inline int orte_condition_wait(opal_condition_t *c, opal_mutex_t *m)
OPAL_CR_TEST_CHECKPOINT_READY_STALL();
opal_mutex_lock(m);
}
/*
#if OPAL_HAVE_POSIX_THREADS
}
#endif
*/
c->c_signaled--;
c->c_waiting--;
return rc;
}
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_WAIT(x, y) \
do { \
if (opal_debug_threads) { \
opal_output(0, "Entering condition wait for %s at %s:%d", \
(NULL == (x)->name) ? "NULL" : (x)->name, \
__FILE__, __LINE__); \
} \
orte_condition_wait((x), (y)); \
} while (0);
#else
#define ORTE_CONDITION_WAIT(x, y) orte_condition_wait(x, y)
#endif
static inline int orte_condition_timedwait(opal_condition_t *c,
opal_mutex_t *m,
const struct timespec *abstime)
{
int rc = 0;
struct timeval tv;
struct timeval absolute;
c->c_waiting++;
/*
#if OPAL_HAVE_POSIX_THREADS
rc = pthread_cond_timedwait(&c->c_cond, &m->m_lock_pthread, abstime);
#elif OPAL_HAVE_SOLARIS_THREADS
{
/* deal with const-ness */
timestruc_t to;
to.tv_sec = abstime->tv_sec;
to.tv_nsec = abstime->tv_nsec;
rc = cond_timedwait(&c->c_cond, &m->m_lock_solaris, &to);
}
#else
{
struct timeval tv;
struct timeval absolute;
if (orte_progress_threads_enabled) {
rc = pthread_cond_timedwait(&c->c_cond, &m->m_lock_pthread, abstime);
} else {
#endif
*/
absolute.tv_sec = abstime->tv_sec;
absolute.tv_usec = abstime->tv_nsec * 1000;
gettimeofday(&tv,NULL);
@ -105,9 +123,11 @@ static inline int orte_condition_timedwait(opal_condition_t *c,
(tv.tv_sec <= absolute.tv_sec ||
(tv.tv_sec == absolute.tv_sec && tv.tv_usec < absolute.tv_usec)));
}
/*
#if OPAL_HAVE_POSIX_THREADS
}
#endif
*/
if (c->c_signaled != 0) c->c_signaled--;
c->c_waiting--;
return rc;
@ -117,30 +137,62 @@ static inline int orte_condition_signal(opal_condition_t *c)
{
if (c->c_waiting) {
c->c_signaled++;
/*
#if OPAL_HAVE_POSIX_THREADS
if (orte_progress_threads_enabled) {
pthread_cond_signal(&c->c_cond);
#elif OPAL_HAVE_SOLARIS_THREADS
cond_signal(&c->c_cond);
}
#endif
*/
}
return 0;
}
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_SIGNAL(x) \
do { \
if (opal_debug_threads) { \
opal_output(0, "Signaling condition %s at %s:%d", \
(NULL == (x)->name) ? "NULL" : (x)->name, \
__FILE__, __LINE__); \
} \
orte_condition_signal((x)); \
} while(0);
#else
#define ORTE_CONDITION_SIGNAL(x) orte_condition_signal(x)
#endif
static inline int orte_condition_broadcast(opal_condition_t *c)
{
c->c_signaled = c->c_waiting;
/*
#if OPAL_HAVE_POSIX_THREADS
if( 1 == c->c_waiting ) {
pthread_cond_signal(&c->c_cond);
} else {
pthread_cond_broadcast(&c->c_cond);
if (orte_progress_threads_enabled) {
if( 1 == c->c_waiting ) {
pthread_cond_signal(&c->c_cond);
} else {
pthread_cond_broadcast(&c->c_cond);
}
}
#elif OPAL_HAVE_SOLARIS_THREADS
cond_broadcast(&c->c_cond);
#endif
*/
return 0;
}
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_BROADCAST(x) \
do { \
if (opal_debug_threads) { \
opal_output(0, "Broadcasting condition %s at %s:%d", \
(NULL == (x)->name) ? "NULL" : (x)->name, \
__FILE__, __LINE__); \
} \
orte_condition_broadcast((x)); \
} while(0);
#else
#define ORTE_CONDITION_BROADCAST(x) orte_condition_broadcast(x)
#endif
END_C_DECLS
#endif

Просмотреть файл

@ -9,9 +9,9 @@
#include "orte_config.h"
#include "orte/threads/threads.h"
#include "opal/mca/event/event.h"
bool orte_debug_threads = false;
#include "orte/threads/threads.h"
static void constructor(orte_thread_ctl_t *ptr)
{
@ -20,15 +20,18 @@ static void constructor(orte_thread_ctl_t *ptr)
ptr->active = false;
ptr->running = false;
ptr->stop = false;
ptr->rate.tv_sec = 0;
ptr->rate.tv_usec = 0;
ptr->name = NULL;
/* default to waking up the global base */
ptr->wakeup_pipe = opal_event_base->wakeup_pipe[1];
}
static void destructor(orte_thread_ctl_t *ptr)
{
OBJ_DESTRUCT(&ptr->lock);
OBJ_DESTRUCT(&ptr->cond);
if (NULL != ptr->name) {
free(ptr->name);
}
}
OBJ_CLASS_INSTANCE(orte_thread_ctl_t,
opal_object_t,
constructor, destructor);

Просмотреть файл

@ -26,6 +26,8 @@
#if OPAL_ENABLE_DEBUG
#include "opal/util/output.h"
#endif
#include "opal/util/fd.h"
#include "opal/mca/event/event.h"
#include "mutex.h"
#include "condition.h"
@ -39,35 +41,37 @@ typedef struct {
volatile bool active;
volatile bool running;
volatile bool stop;
struct timeval rate;
int wakeup_pipe;
char *name;
} orte_thread_ctl_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_thread_ctl_t);
ORTE_DECLSPEC extern bool orte_debug_threads;
#if OPAL_ENABLE_DEBUG
#define ORTE_ACQUIRE_THREAD(ctl) \
do { \
ORTE_THREAD_LOCK(&(ctl)->lock); \
if (orte_debug_threads) { \
opal_output(0, "Waiting for thread %s:%d", \
__FILE__, __LINE__); \
} \
while ((ctl)->active) { \
orte_condition_wait(&(ctl)->cond, &(ctl)->lock); \
} \
if (orte_debug_threads) { \
opal_output(0, "Thread obtained %s:%d", \
__FILE__, __LINE__); \
} \
(ctl)->active = true; \
#define ORTE_ACQUIRE_THREAD(ctl) \
do { \
ORTE_THREAD_LOCK(&(ctl)->lock); \
if (opal_debug_threads) { \
opal_output(0, "Waiting for thread %s at %s:%d:%s", \
(NULL == (ctl)->name) ? "NULL" : (ctl)->name, \
__FILE__, __LINE__, \
((ctl)->active) ? "TRUE" : "FALSE"); \
} \
while ((ctl)->active) { \
ORTE_CONDITION_WAIT(&(ctl)->cond, &(ctl)->lock); \
} \
if (opal_debug_threads) { \
opal_output(0, "Thread %s acquired at %s:%d", \
(NULL == (ctl)->name) ? "NULL" : (ctl)->name, \
__FILE__, __LINE__); \
} \
(ctl)->active = true; \
} while(0);
#else
#define ORTE_ACQUIRE_THREAD(ctl) \
do { \
ORTE_THREAD_LOCK(&(ctl)->lock); \
while ((ctl)->active) { \
orte_condition_wait(&(ctl)->cond, &(ctl)->lock); \
ORTE_CONDITION_WAIT(&(ctl)->cond, &(ctl)->lock); \
} \
(ctl)->active = true; \
} while(0);
@ -75,32 +79,56 @@ ORTE_DECLSPEC extern bool orte_debug_threads;
#if OPAL_ENABLE_DEBUG
#define ORTE_RELEASE_THREAD(ctl) \
do { \
if (orte_debug_threads) { \
opal_output(0, "Releasing thread %s:%d", \
__FILE__, __LINE__); \
} \
(ctl)->active = false; \
orte_condition_broadcast(&(ctl)->cond); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
#define ORTE_RELEASE_THREAD(ctl) \
do { \
char byte='a'; \
if (opal_debug_threads) { \
opal_output(0, "Releasing thread %s at %s:%d", \
(NULL == (ctl)->name) ? "NULL" : (ctl)->name, \
__FILE__, __LINE__); \
} \
(ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0);
#else
#define ORTE_RELEASE_THREAD(ctl) \
do { \
(ctl)->active = false; \
orte_condition_broadcast(&(ctl)->cond); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
#define ORTE_RELEASE_THREAD(ctl) \
do { \
char byte='a'; \
(ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0);
#endif
#define ORTE_WAKEUP_THREAD(ctl) \
do { \
(ctl)->active = false; \
orte_condition_broadcast(&(ctl)->cond); \
#if OPAL_ENABLE_DEBUG
#define ORTE_WAKEUP_THREAD(ctl) \
do { \
char byte='a'; \
ORTE_THREAD_LOCK(&(ctl)->lock); \
if (opal_debug_threads) { \
opal_output(0, "Waking up thread %s at %s:%d", \
(NULL == (ctl)->name) ? "NULL" : (ctl)->name, \
__FILE__, __LINE__); \
} \
(ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0);
#else
#define ORTE_WAKEUP_THREAD(ctl) \
do { \
char byte='a'; \
ORTE_THREAD_LOCK(&(ctl)->lock); \
(ctl)->active = false; \
ORTE_CONDITION_BROADCAST(&(ctl)->cond); \
opal_fd_write((ctl)->wakeup_pipe, 1, &byte); \
ORTE_THREAD_UNLOCK(&(ctl)->lock); \
} while(0);
#endif
END_C_DECLS