1
1

mca/threads/qthreads: implement missing functionalities

Signed-off-by: Shintaro Iwasaki <siwasaki@anl.gov>
Этот коммит содержится в:
Shintaro Iwasaki 2020-10-05 11:39:18 -05:00
родитель db3e598b6a
Коммит 919a16300c
11 изменённых файлов: 340 добавлений и 76 удалений

Просмотреть файл

@ -27,6 +27,7 @@ sources = \
threads_qthreads_condition.c \
threads_qthreads_module.c \
threads_qthreads_mutex.c \
threads_qthreads_wait_sync.c \
threads_qthreads_mutex.h \
threads_qthreads_threads.h \
threads_qthreads_tsd.h \

Просмотреть файл

@ -25,12 +25,6 @@
AC_DEFUN([OPAL_CONFIG_QTHREADS],[
AC_CHECK_HEADERS([qthread/qthread.h],
[AC_CHECK_LIB([qthread],[qthread_initialize],
[threads_qthreads_happy="yes"],
[threads_qthreads_happy="no"])],
[threads_qthreads_happy="no"])
AC_ARG_WITH([qthreads],
[AC_HELP_STRING([--with-qthreads=DIR],
[Specify location of qthreads installation. Error if qthreads support cannot be found.])])
@ -60,7 +54,7 @@ AC_DEFUN([OPAL_CONFIG_QTHREADS],[
AS_IF([test $opal_qthreads_happy = yes],
[OPAL_CHECK_PACKAGE([opal_qthreads],
[qthread/qthread.h],
[qthread.h],
[qthread],
[qthread_initialize],
[],

Просмотреть файл

@ -26,9 +26,10 @@
#ifndef OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_H
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_H 1
#include "@OPAL_QTHREADS_INCLUDE_PATH@qthread/qthread.h"
#include "@OPAL_QTHREADS_INCLUDE_PATH@qthread.h"
#include "@OPAL_QTHREADS_INCLUDE_PATH@qthread/tls.h"
static inline void ensure_init_qthreads(void)
static inline void opal_threads_ensure_init_qthreads(void)
{
qthread_initialize();
}

Просмотреть файл

@ -23,6 +23,7 @@
#include "opal_config.h"
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/mca/threads/thread.h"
#include "opal/mca/threads/threads.h"
#include "opal/constants.h"

Просмотреть файл

@ -19,13 +19,8 @@
* $HEADER$
*/
#include <unistd.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/constants.h"
#include "opal/util/sys_limits.h"
#include "opal/util/output.h"
#include "opal/prefetch.h"
#include "opal/mca/threads/threads.h"
#include "opal/mca/threads/tsd.h"
@ -34,40 +29,82 @@ struct opal_tsd_key_value {
opal_tsd_destructor_t destructor;
};
static int opal_main_thread;
struct opal_tsd_key_value *opal_tsd_key_values = NULL;
static int opal_tsd_key_values_count = 0;
/* false: uninitialized, true: initialized. */
static opal_atomic_lock_t opal_thread_self_key_lock = OPAL_ATOMIC_LOCK_INIT;
static bool opal_thread_self_key_init = false;
static opal_tsd_key_t opal_thread_self_key;
static inline void self_key_ensure_init(void)
{
if (false == opal_thread_self_key_init) {
/* not initialized yet. */
opal_atomic_lock(&opal_thread_self_key_lock);
/* check again. */
if (false == opal_thread_self_key_init) {
/* This thread is responsible for initializing this key. */
qthread_key_create(&opal_thread_self_key, NULL);
opal_atomic_mb();
opal_thread_self_key_init = true;
}
opal_atomic_unlock(&opal_thread_self_key_lock);
}
/* opal_thread_self_key has been already initialized. */
}
/*
* Constructor
*/
static void opal_thread_construct(opal_thread_t *t)
{
t->t_run = 0;
t->t_thread_ret = 0;
}
OBJ_CLASS_INSTANCE(opal_thread_t,
opal_object_t,
opal_thread_construct, NULL);
static inline aligned_t *opal_thread_get_qthreads_self(void)
{
self_key_ensure_init();
void *ptr = qthread_getspecific(opal_thread_self_key);
return (aligned_t *)ptr;
}
static aligned_t opal_thread_qthreads_wrapper(void *arg)
{
opal_thread_t *t = (opal_thread_t *)arg;
/* Register itself. */
self_key_ensure_init();
qthread_setspecific(opal_thread_self_key, t->t_thread_ret_ptr);
t->t_ret = ((void *(*)(void *))t->t_run)(t);
return 0;
}
opal_thread_t *opal_thread_get_self(void)
{
return NULL;
opal_threads_ensure_init_qthreads();
opal_thread_t *t = OBJ_NEW(opal_thread_t);
t->t_thread_ret_ptr = opal_thread_get_qthreads_self();
return t;
}
bool opal_thread_self_compare(opal_thread_t *t)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
int sync_wait_mt(void *p)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
return opal_thread_get_qthreads_self() == &t->t_thread_ret;
}
int opal_thread_join(opal_thread_t *t, void **thr_return)
{
return OPAL_ERR_NOT_IMPLEMENTED;
qthread_readFF(NULL, t->t_thread_ret_ptr);
if (thr_return) {
*thr_return = t->t_ret;
}
t->t_thread_ret = 0;
return OPAL_SUCCESS;
}
void opal_thread_set_main(void)
@ -76,12 +113,17 @@ void opal_thread_set_main(void)
int opal_thread_start(opal_thread_t *t)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
t->t_thread_ret_ptr = &t->t_thread_ret;
qthread_fork(opal_thread_qthreads_wrapper, t, &t->t_thread_ret);
return OPAL_SUCCESS;
}
OBJ_CLASS_DECLARATION(opal_thread_t);
int opal_tsd_key_create(opal_tsd_key_t *key, opal_tsd_destructor_t destructor)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
qthread_key_create(key, destructor);
return OPAL_SUCCESS;
}

Просмотреть файл

@ -36,6 +36,8 @@ bool opal_uses_threads = false;
static void opal_mutex_construct(opal_mutex_t *m)
{
opal_threads_ensure_init_qthreads();
opal_mutex_create(m);
}
static void opal_mutex_destruct(opal_mutex_t *m)
@ -56,36 +58,82 @@ OBJ_CLASS_INSTANCE(opal_recursive_mutex_t,
opal_recursive_mutex_construct,
opal_mutex_destruct);
static void opal_cond_create(opal_cond_t *cond)
{
ensure_init_qthreads();
}
int opal_cond_init(opal_cond_t *cond)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_atomic_lock_init(&cond->m_lock, 0);
cond->m_waiter_head = NULL;
cond->m_waiter_tail = NULL;
return OPAL_SUCCESS;
}
typedef struct {
int m_signaled;
void *m_prev;
} cond_waiter_t;
int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock)
{
ensure_init_qthreads();
return 0 == qthread_readFE(*cond, &lock->m_lock_qthreads) ? OPAL_SUCCESS
: OPAL_ERROR;
opal_threads_ensure_init_qthreads();
/* This thread is taking "lock", so only this thread can access this
* condition variable. */
opal_atomic_lock(&cond->m_lock);
cond_waiter_t waiter = { 0, NULL };
if (NULL == cond->m_waiter_head) {
cond->m_waiter_tail = (void *)&waiter;
} else {
((cond_waiter_t *)cond->m_waiter_head)->m_prev = (void *)&waiter;
}
cond->m_waiter_head = (void *)&waiter;
opal_atomic_unlock(&cond->m_lock);
while (1) {
opal_mutex_unlock(lock);
qthread_yield();
opal_mutex_lock(lock);
/* Check if someone woke me up. */
opal_atomic_lock(&cond->m_lock);
int signaled = waiter.m_signaled;
opal_atomic_unlock(&cond->m_lock);
if (1 == signaled) {
break;
}
/* Unlock the lock again. */
}
return OPAL_SUCCESS;
}
int opal_cond_broadcast(opal_cond_t *cond)
{
ensure_init_qthreads();
return 0 == qthread_fill(*cond) ? OPAL_SUCCESS : OPAL_ERROR;
opal_atomic_lock(&cond->m_lock);
while (NULL != cond->m_waiter_tail) {
cond_waiter_t *p_cur_tail = (cond_waiter_t *)cond->m_waiter_tail;
cond->m_waiter_tail = p_cur_tail->m_prev;
/* Awaken one of threads in a FIFO manner. */
p_cur_tail->m_signaled = 1;
}
/* No waiters. */
cond->m_waiter_head = NULL;
opal_atomic_unlock(&cond->m_lock);
return OPAL_SUCCESS;
}
int opal_cond_signal(opal_cond_t *cond)
{
ensure_init_qthreads();
return 0 == qthread_fill(*cond) ? OPAL_SUCCESS : OPAL_ERROR;
opal_atomic_lock(&cond->m_lock);
if (NULL != cond->m_waiter_tail) {
cond_waiter_t *p_cur_tail = (cond_waiter_t *)cond->m_waiter_tail;
cond->m_waiter_tail = p_cur_tail->m_prev;
/* Awaken one of threads. */
p_cur_tail->m_signaled = 1;
if (NULL == cond->m_waiter_tail) {
cond->m_waiter_head = NULL;
}
}
opal_atomic_unlock(&cond->m_lock);
return OPAL_SUCCESS;
}
int opal_cond_destroy(opal_cond_t *cond)
{
return OPAL_ERR_NOT_IMPLEMENTED;
return OPAL_SUCCESS;
}

Просмотреть файл

@ -29,21 +29,13 @@
#ifndef OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_MUTEX_H
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_MUTEX_H 1
/**
* @file:
*
* Mutual exclusion functions: Unix implementation.
*
* Functions for locking of critical sections.
*
*/
#include "opal_config.h"
#include <errno.h>
#include <stdio.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/constants.h"
#include "opal/class/opal_object.h"
#include "opal/sys/atomic.h"
#include "opal/util/output.h"
@ -53,7 +45,7 @@ BEGIN_C_DECLS
struct opal_mutex_t {
opal_object_t super;
aligned_t m_lock_qthreads;
opal_atomic_lock_t m_lock;
int m_recursive;
#if OPAL_ENABLE_DEBUG
@ -72,7 +64,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 0, \
.m_lock_debug = 0, \
.m_lock_file = NULL, \
@ -83,7 +75,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 0, \
.m_lock_atomic = OPAL_ATOMIC_LOCK_INIT, \
}
@ -93,7 +85,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_RECURSIVE_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 1, \
.m_lock_debug = 0, \
.m_lock_file = NULL, \
@ -104,7 +96,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_RECURSIVE_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 1, \
.m_lock_atomic = OPAL_ATOMIC_LOCK_INIT, \
}
@ -118,28 +110,42 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
static inline void opal_mutex_create(struct opal_mutex_t *m)
{
opal_atomic_lock_init(&m->m_lock, 0);
opal_atomic_lock_init(&m->m_lock_atomic, 0);
m->m_recursive = 0;
#if OPAL_ENABLE_DEBUG
m->m_lock_debug = 0;
m->m_lock_file = NULL;
m->m_lock_line = 0;
#endif
}
static inline int opal_mutex_trylock(opal_mutex_t *m)
{
ensure_init_qthreads();
return 0 == qthread_readFE_nb(NULL, &m->m_lock_qthreads) ? OPAL_SUCCESS
: OPAL_ERROR;
opal_threads_ensure_init_qthreads();
int ret = opal_atomic_trylock(&m->m_lock);
if (0 != ret) {
/* Yield to avoid a deadlock. */
qthread_yield();
}
return ret;
}
static inline void opal_mutex_lock(opal_mutex_t *m)
{
ensure_init_qthreads();
qthread_lock(&m->m_lock_qthreads);
opal_threads_ensure_init_qthreads();
int ret = opal_atomic_trylock(&m->m_lock);
while (0 != ret) {
qthread_yield();
ret = opal_atomic_trylock(&m->m_lock);
}
}
static inline void opal_mutex_unlock(opal_mutex_t *m)
{
ensure_init_qthreads();
opal_threads_ensure_init_qthreads();
qthread_unlock(&m->m_lock_qthreads);
opal_atomic_unlock(&m->m_lock);
/* For fairness of locking. */
qthread_yield();
}
@ -194,8 +200,18 @@ static inline void opal_mutex_atomic_unlock(opal_mutex_t *m)
#endif
typedef aligned_t *opal_cond_t;
#define OPAL_CONDITION_STATIC_INIT 0
typedef struct opal_cond_t {
opal_atomic_lock_t m_lock;
void *m_waiter_head;
void *m_waiter_tail;
} opal_cond_t;
#define OPAL_CONDITION_STATIC_INIT \
{ \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_waiter_head = NULL, \
.m_waiter_tail = NULL, \
}
int opal_cond_init(opal_cond_t *cond);
int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock);

Просмотреть файл

@ -27,13 +27,15 @@
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_THREADS_H 1
#include <signal.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
struct opal_thread_t {
opal_object_t super;
opal_thread_fn_t t_run;
void *t_arg;
unsigned *t_handle;
void *t_ret;
aligned_t t_thread_ret;
aligned_t *t_thread_ret_ptr;
};
#endif /* OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_THREADS_H */

Просмотреть файл

@ -33,7 +33,7 @@ typedef qthread_key_t opal_tsd_key_t;
static inline int opal_tsd_key_delete(opal_tsd_key_t key)
{
return 0 == qthread_key_delete(&key) ? OPAL_SUCCESS : OPAL_ERROR;
return 0 == qthread_key_delete(key) ? OPAL_SUCCESS : OPAL_ERROR;
}
static inline int opal_tsd_set(opal_tsd_key_t key, void *value)
@ -43,7 +43,7 @@ static inline int opal_tsd_set(opal_tsd_key_t key, void *value)
static inline int opal_tsd_get(opal_tsd_key_t key, void **valuep)
{
qthread_getspecific(key);
*valuep = qthread_getspecific(key);
return OPAL_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,116 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2019 Sandia National Laboratories. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/mca/threads/wait_sync.h"
static opal_mutex_t wait_sync_lock = OPAL_MUTEX_STATIC_INIT;
static ompi_wait_sync_t *wait_sync_list = NULL;
static opal_atomic_int32_t num_thread_in_progress = 0;
#define WAIT_SYNC_PASS_OWNERSHIP(who) \
do { \
opal_mutex_lock(&(who)->lock); \
opal_cond_signal(&(who)->condition); \
opal_mutex_unlock(&(who)->lock); \
} while (0)
int ompi_sync_wait_mt(ompi_wait_sync_t *sync)
{
/* Don't stop if the waiting synchronization is completed. We avoid the
* race condition around the release of the synchronization using the
* signaling field.
*/
if (sync->count <= 0) {
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}
/* lock so nobody can signal us during the list updating */
opal_mutex_lock(&sync->lock);
/* Now that we hold the lock make sure another thread has not already
* call cond_signal.
*/
if (sync->count <= 0) {
opal_mutex_unlock(&sync->lock);
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}
/* Insert sync on the list of pending synchronization constructs */
OPAL_THREAD_LOCK(&wait_sync_lock);
if (NULL == wait_sync_list) {
sync->next = sync->prev = sync;
wait_sync_list = sync;
} else {
sync->prev = wait_sync_list->prev;
sync->prev->next = sync;
sync->next = wait_sync_list;
wait_sync_list->prev = sync;
}
OPAL_THREAD_UNLOCK(&wait_sync_lock);
/**
* If we are not responsible for progressing, go silent until something
* worth noticing happen:
* - this thread has been promoted to take care of the progress
* - our sync has been triggered.
*/
check_status:
if (sync != wait_sync_list &&
num_thread_in_progress >= opal_max_thread_in_progress) {
opal_cond_wait(&sync->condition, &sync->lock);
/**
* At this point either the sync was completed in which case
* we should remove it from the wait list, or/and I was
* promoted as the progress manager.
*/
if (sync->count <= 0) { /* Completed? */
opal_mutex_unlock(&sync->lock);
goto i_am_done;
}
/* either promoted, or spurious wakeup ! */
goto check_status;
}
opal_mutex_unlock(&sync->lock);
OPAL_THREAD_ADD_FETCH32(&num_thread_in_progress, 1);
while (sync->count > 0) { /* progress till completion */
/* don't progress with the sync lock locked or you'll deadlock */
opal_progress();
qthread_yield();
}
OPAL_THREAD_ADD_FETCH32(&num_thread_in_progress, -1);
i_am_done:
/* My sync is now complete. Trim the list: remove self, wake next */
OPAL_THREAD_LOCK(&wait_sync_lock);
sync->prev->next = sync->next;
sync->next->prev = sync->prev;
/* In case I am the progress manager, pass the duties on */
if (sync == wait_sync_list) {
wait_sync_list = (sync == sync->next) ? NULL : sync->next;
if (NULL != wait_sync_list) {
WAIT_SYNC_PASS_OWNERSHIP(wait_sync_list);
}
}
OPAL_THREAD_UNLOCK(&wait_sync_lock);
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}

Просмотреть файл

@ -28,16 +28,20 @@
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_WAIT_SYNC_H 1
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/mca/threads/mutex.h"
typedef struct ompi_wait_sync_t {
opal_atomic_int32_t count;
int32_t status;
opal_cond_t condition;
opal_mutex_t lock;
struct ompi_wait_sync_t *next;
struct ompi_wait_sync_t *prev;
volatile bool signaling;
} ompi_wait_sync_t;
#define SYNC_WAIT(sync) (opal_using_threads() ? ompi_sync_wait_mt (sync) : sync_wait_st (sync))
#define SYNC_WAIT(sync) \
(opal_using_threads() ? ompi_sync_wait_mt (sync) : sync_wait_st (sync))
/* The loop in release handles a race condition between the signaling
* thread and the destruction of the condition variable. The signaling
@ -47,17 +51,56 @@ typedef struct ompi_wait_sync_t {
* as possible. Note that the race window is small so spinning here
* is more optimal than sleeping since this macro is called in
* the critical path. */
#define WAIT_SYNC_RELEASE(sync) do { } while (0)
#define WAIT_SYNC_RELEASE_NOWAIT(sync) do { } while (0)
#define WAIT_SYNC_SIGNAL(sync) do { } while (0)
#define WAIT_SYNC_SIGNALLED(sync) do { } while (0)
#define WAIT_SYNC_RELEASE(sync) \
if (opal_using_threads()) { \
while ((sync)->signaling) { \
qthread_yield(); \
continue; \
} \
opal_cond_destroy(&(sync)->condition); \
}
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
if (opal_using_threads()) { \
opal_cond_destroy(&(sync)->condition); \
}
#define WAIT_SYNC_SIGNAL(sync) \
if (opal_using_threads()) { \
opal_mutex_lock(&(sync)->lock); \
opal_cond_signal(&(sync)->condition); \
opal_mutex_unlock(&(sync)->lock); \
(sync)->signaling = false; \
}
#define WAIT_SYNC_SIGNALLED(sync) \
{ \
(sync)->signaling = false; \
}
OPAL_DECLSPEC int ompi_sync_wait_mt(ompi_wait_sync_t *sync);
static inline int sync_wait_st (ompi_wait_sync_t *sync)
static inline int sync_wait_st(ompi_wait_sync_t *sync)
{
while (sync->count > 0) {
opal_progress();
qthread_yield();
}
return sync->status;
}
#define WAIT_SYNC_INIT(sync,c) do { } while (0)
#define WAIT_SYNC_INIT(sync,c) \
do { \
(sync)->count = (c); \
(sync)->next = NULL; \
(sync)->prev = NULL; \
(sync)->status = 0; \
(sync)->signaling = (0 != (c)); \
if (opal_using_threads()) { \
opal_cond_init(&(sync)->condition); \
opal_mutex_create(&(sync)->lock); \
} \
} while (0)
#endif /* OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_WAIT_SYNC_H */