1
1

Merge pull request #8053 from shintaro-iwasaki/topic/fix_issue_8036

opal/mca/threads/qthreads: Fix #8036
Этот коммит содержится в:
Jeff Squyres 2020-10-08 09:40:58 -04:00 коммит произвёл GitHub
родитель 4c86172886 84dcb233bf
Коммит 0bcef049c9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 447 добавлений и 100 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -688,3 +688,4 @@ test/util/bipartite_graph
opal/test/reachable/reachable_netlink
opal/test/reachable/reachable_weighted
opal/mca/threads/argobots/threads_argobots.h
opal/mca/threads/qthreads/threads_qthreads.h

Просмотреть файл

@ -123,12 +123,6 @@ AC_DEFUN([MCA_opal_threads_argobots_POST_CONFIG],[
THREAD_CXXCPPFLAGS="$TPKG_CXXCPPFLAGS"
THREAD_LDFLAGS="$TPKG_LDFLAGS"
THREAD_LIBS="$TPKG_LIBS"
AC_SUBST(THREAD_CFLAGS)
AC_SUBST(THREAD_FCFLAGS)
AC_SUBST(THREAD_CXXFLAGS)
AC_SUBST(THREAD_CPPFLAGS)
AC_SUBST(THREAD_LDFLAGS)
AC_SUBST(THREAD_LIBS)
LIBS="$LIBS $THREAD_LIBS"
LDFLAGS="$LDFLAGS $THREAD_LDFLAGS"
])

Просмотреть файл

@ -56,5 +56,12 @@ AS_IF([test x"$opal_thread_type_found" = x""],
AC_MSG_RESULT([Found thread type $opal_thread_type_found])
AC_SUBST(THREAD_CFLAGS)
AC_SUBST(THREAD_FCFLAGS)
AC_SUBST(THREAD_CXXFLAGS)
AC_SUBST(THREAD_CPPFLAGS)
AC_SUBST(THREAD_LDFLAGS)
AC_SUBST(THREAD_LIBS)
OPAL_SUMMARY_ADD([[Miscellaneous]],[[Threading Package]],[], [$opal_thread_type_found])
])dnl

Просмотреть файл

@ -824,12 +824,6 @@ AC_DEFUN([MCA_opal_threads_pthreads_POST_CONFIG],[
THREAD_CXXCPPFLAGS="$TPKG_CXXCPPFLAGS"
THREAD_LDFLAGS="$TPKG_LDFLAGS"
THREAD_LIBS="$TPKG_LIBS"
AC_SUBST(THREAD_CFLAGS)
AC_SUBST(THREAD_FCFLAGS)
AC_SUBST(THREAD_CXXFLAGS)
AC_SUBST(THREAD_CPPFLAGS)
AC_SUBST(THREAD_LDFLAGS)
AC_SUBST(THREAD_LIBS)
])
])dnl

Просмотреть файл

@ -19,12 +19,38 @@
# $HEADER$
#
noinst_LTLIBRARIES = libmca_threads_qthreads.la
AM_CPPFLAGS = $(opal_qthreads_CPPFLAGS)
libmca_threads_qthreads_la_SOURCES = \
sources = \
threads_qthreads.h \
threads_qthreads_component.c \
threads_qthreads_mutex.c \
threads_qthreads_condition.c \
threads_qthreads_module.c
threads_qthreads_module.c \
threads_qthreads_mutex.c \
threads_qthreads_wait_sync.c \
threads_qthreads_mutex.h \
threads_qthreads_threads.h \
threads_qthreads_tsd.h \
threads_qthreads_wait_sync.h
AM_LDFLAGS = -lqthread
lib_sources = $(sources)
if MCA_BUILD_opal_threads_qthreads_DSO
component_noinst =
component_install = mca_threads_qthreads.la
else
component_noinst = libmca_threads_qthreads.la
component_install =
endif
mcacomponentdir = $(opallibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_threads_qthreads_la_SOURCES = $(sources)
mca_threads_qthreads_la_LDFLAGS = -module -avoid-version
mca_threads_qthreads_la_LIBADD = $(top_builddir)/opal/lib@OPAL_LIB_PREFIX@open-pal.la \
$(opal_qthreads_LIBS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_threads_qthreads_la_SOURCES = $(sources)
libmca_threads_qthreads_la_LDFLAGS = -module -avoid-version $(opal_qthreads_LDFLAGS)
libmca_threads_qthreads_la_LIBADD = $(opal_qthreads_LIBS)

Просмотреть файл

@ -25,13 +25,68 @@
AC_DEFUN([OPAL_CONFIG_QTHREADS],[
AC_CHECK_HEADERS([qthread/qthread.h],
[AC_CHECK_LIB([qthread],[qthread_initialize],
[threads_qthreads_happy="yes"],
[threads_qthreads_happy="no"])],
[threads_qthreads_happy="no"])
AC_ARG_WITH([qthreads],
[AC_HELP_STRING([--with-qthreads=DIR],
[Specify location of qthreads installation. Error if qthreads support cannot be found.])])
AS_IF([test "$threads_qthreads_happy" = "yes"],
AC_ARG_WITH([qthreads-libdir],
[AC_HELP_STRING([--with-qthreads-libdir=DIR],
[Search for qthreads libraries in DIR])])
opal_check_qthreads_save_CPPFLAGS=$CPPFLAGS
opal_check_qthreads_save_LDFLAGS=$LDFLAGS
opal_check_qthreads_save_LIBS=$LIBS
opal_qthreads_happy=yes
AS_IF([test "$with_qthreads" = "no"],
[opal_qthreads_happy=no])
AS_IF([test $opal_qthreads_happy = yes],
[AC_MSG_CHECKING([looking for qthreads in])
AS_IF([test "$with_qthreads" != "yes"],
[opal_qthreads_dir=$with_qthreads
AC_MSG_RESULT([($opal_qthreads_dir)])],
[AC_MSG_RESULT([(default search paths)])])
AS_IF([test ! -z "$with_qthreads_libdir" && \
test "$with_qthreads_libdir" != "yes"],
[opal_qthreads_libdir=$with_qthreads_libdir])
])
AS_IF([test $opal_qthreads_happy = yes],
[OPAL_CHECK_PACKAGE([opal_qthreads],
[qthread.h],
[qthread],
[qthread_initialize],
[],
[$opal_qthreads_dir],
[$opal_qthreads_libdir],
[],
[opal_qthreads_happy=no])])
AS_IF([test $opal_qthreads_happy = yes && test -n "$opal_qthreads_dir"],
[OPAL_QTHREADS_INCLUDE_PATH="$opal_qthreads_dir/include/"],
[OPAL_QTHREADS_INCLUDE_PATH=""])
AS_IF([test $opal_qthreads_happy = yes],
[TPKG_CFLAGS="$opal_qthreads_CPPFLAGS"
TPKG_FCFLAGS="$opal_qthreads_CPPFLAGS"
TPKG_CXXFLAGS="$opal_qthreads_CPPFLAGS"
TPKG_CPPFLAGS="$opal_qthreads_CPPFLAGS"
TPKG_CXXCPPFLAGS="$opal_qthreads_CPPFLAGS"
TPKG_LDFLAGS="$opal_qthreads_LDFLAGS"
TPKG_LIBS="$opal_qthreads_LIBS"])
AC_CONFIG_FILES([opal/mca/threads/qthreads/threads_qthreads.h])
AC_SUBST([OPAL_QTHREADS_INCLUDE_PATH])
AC_SUBST([opal_qthreads_CPPFLAGS])
AC_SUBST([opal_qthreads_LDFLAGS])
AC_SUBST([opal_qthreads_LIBS])
CPPFLAGS=$opal_check_qthreads_save_CPPFLAGS
LDFLAGS=$opal_check_qthreads_save_LDFLAGS
LIBS=$opal_check_qthreads_save_LIBS
AS_IF([test "$opal_qthreads_happy" = "yes"],
[$1],
[$2])
])dnl
@ -60,6 +115,15 @@ AC_DEFUN([MCA_opal_threads_qthreads_POST_CONFIG],[
AC_DEFINE_UNQUOTED([MCA_threads_wait_sync_base_include_HEADER],
["opal/mca/threads/qthreads/threads_qthreads_wait_sync.h"],
[Header to include for wait_sync implementation])
THREAD_CFLAGS="$TPKG_CFLAGS"
THREAD_FCFLAGS="$TPKG_FCFLAGS"
THREAD_CXXFLAGS="$TPKG_CXXFLAGS"
THREAD_CPPFLAGS="$TPKG_CPPFLAGS"
THREAD_CXXCPPFLAGS="$TPKG_CXXCPPFLAGS"
THREAD_LDFLAGS="$TPKG_LDFLAGS"
THREAD_LIBS="$TPKG_LIBS"
LIBS="$LIBS $THREAD_LIBS"
LDFLAGS="$LDFLAGS $THREAD_LDFLAGS"
])
])dnl

Просмотреть файл

@ -26,9 +26,10 @@
#ifndef OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_H
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_H 1
#include <qthread/qthread.h>
#include "@OPAL_QTHREADS_INCLUDE_PATH@qthread.h"
#include "@OPAL_QTHREADS_INCLUDE_PATH@qthread/tls.h"
static inline void ensure_init_qthreads(void)
static inline void opal_threads_ensure_init_qthreads(void)
{
qthread_initialize();
}

Просмотреть файл

@ -23,6 +23,7 @@
#include "opal_config.h"
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/mca/threads/thread.h"
#include "opal/mca/threads/threads.h"
#include "opal/constants.h"

Просмотреть файл

@ -19,13 +19,8 @@
* $HEADER$
*/
#include <unistd.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/constants.h"
#include "opal/util/sys_limits.h"
#include "opal/util/output.h"
#include "opal/prefetch.h"
#include "opal/mca/threads/threads.h"
#include "opal/mca/threads/tsd.h"
@ -34,40 +29,82 @@ struct opal_tsd_key_value {
opal_tsd_destructor_t destructor;
};
static int opal_main_thread;
struct opal_tsd_key_value *opal_tsd_key_values = NULL;
static int opal_tsd_key_values_count = 0;
/* false: uninitialized, true: initialized. */
static opal_atomic_lock_t opal_thread_self_key_lock = OPAL_ATOMIC_LOCK_INIT;
static bool opal_thread_self_key_init = false;
static opal_tsd_key_t opal_thread_self_key;
static inline void self_key_ensure_init(void)
{
if (false == opal_thread_self_key_init) {
/* not initialized yet. */
opal_atomic_lock(&opal_thread_self_key_lock);
/* check again. */
if (false == opal_thread_self_key_init) {
/* This thread is responsible for initializing this key. */
qthread_key_create(&opal_thread_self_key, NULL);
opal_atomic_mb();
opal_thread_self_key_init = true;
}
opal_atomic_unlock(&opal_thread_self_key_lock);
}
/* opal_thread_self_key has been already initialized. */
}
/*
* Constructor
*/
static void opal_thread_construct(opal_thread_t *t)
{
t->t_run = 0;
t->t_thread_ret = 0;
}
OBJ_CLASS_INSTANCE(opal_thread_t,
opal_object_t,
opal_thread_construct, NULL);
static inline aligned_t *opal_thread_get_qthreads_self(void)
{
self_key_ensure_init();
void *ptr = qthread_getspecific(opal_thread_self_key);
return (aligned_t *)ptr;
}
static aligned_t opal_thread_qthreads_wrapper(void *arg)
{
opal_thread_t *t = (opal_thread_t *)arg;
/* Register itself. */
self_key_ensure_init();
qthread_setspecific(opal_thread_self_key, t->t_thread_ret_ptr);
t->t_ret = ((void *(*)(void *))t->t_run)(t);
return 0;
}
opal_thread_t *opal_thread_get_self(void)
{
return NULL;
opal_threads_ensure_init_qthreads();
opal_thread_t *t = OBJ_NEW(opal_thread_t);
t->t_thread_ret_ptr = opal_thread_get_qthreads_self();
return t;
}
bool opal_thread_self_compare(opal_thread_t *t)
{
return OPAL_ERR_NOT_IMPLEMENTED;
}
int sync_wait_mt(void *p)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
return opal_thread_get_qthreads_self() == &t->t_thread_ret;
}
int opal_thread_join(opal_thread_t *t, void **thr_return)
{
return OPAL_ERR_NOT_IMPLEMENTED;
qthread_readFF(NULL, t->t_thread_ret_ptr);
if (thr_return) {
*thr_return = t->t_ret;
}
t->t_thread_ret = 0;
return OPAL_SUCCESS;
}
void opal_thread_set_main(void)
@ -76,12 +113,17 @@ void opal_thread_set_main(void)
int opal_thread_start(opal_thread_t *t)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
t->t_thread_ret_ptr = &t->t_thread_ret;
qthread_fork(opal_thread_qthreads_wrapper, t, &t->t_thread_ret);
return OPAL_SUCCESS;
}
OBJ_CLASS_DECLARATION(opal_thread_t);
int opal_tsd_key_create(opal_tsd_key_t *key, opal_tsd_destructor_t destructor)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_threads_ensure_init_qthreads();
qthread_key_create(key, destructor);
return OPAL_SUCCESS;
}

Просмотреть файл

@ -36,6 +36,8 @@ bool opal_uses_threads = false;
static void opal_mutex_construct(opal_mutex_t *m)
{
opal_threads_ensure_init_qthreads();
opal_mutex_create(m);
}
static void opal_mutex_destruct(opal_mutex_t *m)
@ -56,36 +58,82 @@ OBJ_CLASS_INSTANCE(opal_recursive_mutex_t,
opal_recursive_mutex_construct,
opal_mutex_destruct);
static void opal_cond_create(opal_cond_t *cond)
{
ensure_init_qthreads();
}
int opal_cond_init(opal_cond_t *cond)
{
return OPAL_ERR_NOT_IMPLEMENTED;
opal_atomic_lock_init(&cond->m_lock, 0);
cond->m_waiter_head = NULL;
cond->m_waiter_tail = NULL;
return OPAL_SUCCESS;
}
typedef struct {
int m_signaled;
void *m_prev;
} cond_waiter_t;
int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock)
{
ensure_init_qthreads();
return 0 == qthread_readFE(*cond, &lock->m_lock_qthreads) ? OPAL_SUCCESS
: OPAL_ERROR;
opal_threads_ensure_init_qthreads();
/* This thread is taking "lock", so only this thread can access this
* condition variable. */
opal_atomic_lock(&cond->m_lock);
cond_waiter_t waiter = { 0, NULL };
if (NULL == cond->m_waiter_head) {
cond->m_waiter_tail = (void *)&waiter;
} else {
((cond_waiter_t *)cond->m_waiter_head)->m_prev = (void *)&waiter;
}
cond->m_waiter_head = (void *)&waiter;
opal_atomic_unlock(&cond->m_lock);
while (1) {
opal_mutex_unlock(lock);
qthread_yield();
opal_mutex_lock(lock);
/* Check if someone woke me up. */
opal_atomic_lock(&cond->m_lock);
int signaled = waiter.m_signaled;
opal_atomic_unlock(&cond->m_lock);
if (1 == signaled) {
break;
}
/* Unlock the lock again. */
}
return OPAL_SUCCESS;
}
int opal_cond_broadcast(opal_cond_t *cond)
{
ensure_init_qthreads();
return 0 == qthread_fill(*cond) ? OPAL_SUCCESS : OPAL_ERROR;
opal_atomic_lock(&cond->m_lock);
while (NULL != cond->m_waiter_tail) {
cond_waiter_t *p_cur_tail = (cond_waiter_t *)cond->m_waiter_tail;
cond->m_waiter_tail = p_cur_tail->m_prev;
/* Awaken one of threads in a FIFO manner. */
p_cur_tail->m_signaled = 1;
}
/* No waiters. */
cond->m_waiter_head = NULL;
opal_atomic_unlock(&cond->m_lock);
return OPAL_SUCCESS;
}
int opal_cond_signal(opal_cond_t *cond)
{
ensure_init_qthreads();
return 0 == qthread_fill(*cond) ? OPAL_SUCCESS : OPAL_ERROR;
opal_atomic_lock(&cond->m_lock);
if (NULL != cond->m_waiter_tail) {
cond_waiter_t *p_cur_tail = (cond_waiter_t *)cond->m_waiter_tail;
cond->m_waiter_tail = p_cur_tail->m_prev;
/* Awaken one of threads. */
p_cur_tail->m_signaled = 1;
if (NULL == cond->m_waiter_tail) {
cond->m_waiter_head = NULL;
}
}
opal_atomic_unlock(&cond->m_lock);
return OPAL_SUCCESS;
}
int opal_cond_destroy(opal_cond_t *cond)
{
return OPAL_ERR_NOT_IMPLEMENTED;
return OPAL_SUCCESS;
}

Просмотреть файл

@ -29,33 +29,23 @@
#ifndef OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_MUTEX_H
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_MUTEX_H 1
/**
* @file:
*
* Mutual exclusion functions: Unix implementation.
*
* Functions for locking of critical sections.
*
*/
#include "opal_config.h"
#include <errno.h>
#include <stdio.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/constants.h"
#include "opal/class/opal_object.h"
#include "opal/sys/atomic.h"
#include "opal/util/output.h"
#include <qthread/qthread.h>
BEGIN_C_DECLS
struct opal_mutex_t {
opal_object_t super;
aligned_t m_lock_qthreads;
opal_atomic_lock_t m_lock;
int m_recursive;
#if OPAL_ENABLE_DEBUG
@ -74,7 +64,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 0, \
.m_lock_debug = 0, \
.m_lock_file = NULL, \
@ -85,7 +75,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 0, \
.m_lock_atomic = OPAL_ATOMIC_LOCK_INIT, \
}
@ -95,7 +85,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_RECURSIVE_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 1, \
.m_lock_debug = 0, \
.m_lock_file = NULL, \
@ -106,7 +96,7 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
#define OPAL_RECURSIVE_MUTEX_STATIC_INIT \
{ \
.super = OPAL_OBJ_STATIC_INIT(opal_mutex_t), \
.m_lock_qthreads = 0, \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_recursive = 1, \
.m_lock_atomic = OPAL_ATOMIC_LOCK_INIT, \
}
@ -120,28 +110,42 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_recursive_mutex_t);
static inline void opal_mutex_create(struct opal_mutex_t *m)
{
opal_atomic_lock_init(&m->m_lock, 0);
opal_atomic_lock_init(&m->m_lock_atomic, 0);
m->m_recursive = 0;
#if OPAL_ENABLE_DEBUG
m->m_lock_debug = 0;
m->m_lock_file = NULL;
m->m_lock_line = 0;
#endif
}
static inline int opal_mutex_trylock(opal_mutex_t *m)
{
ensure_init_qthreads();
return 0 == qthread_readFE_nb(NULL, &m->m_lock_qthreads) ? OPAL_SUCCESS
: OPAL_ERROR;
opal_threads_ensure_init_qthreads();
int ret = opal_atomic_trylock(&m->m_lock);
if (0 != ret) {
/* Yield to avoid a deadlock. */
qthread_yield();
}
return ret;
}
static inline void opal_mutex_lock(opal_mutex_t *m)
{
ensure_init_qthreads();
qthread_lock(&m->m_lock_qthreads);
opal_threads_ensure_init_qthreads();
int ret = opal_atomic_trylock(&m->m_lock);
while (0 != ret) {
qthread_yield();
ret = opal_atomic_trylock(&m->m_lock);
}
}
static inline void opal_mutex_unlock(opal_mutex_t *m)
{
ensure_init_qthreads();
opal_threads_ensure_init_qthreads();
qthread_unlock(&m->m_lock_qthreads);
opal_atomic_unlock(&m->m_lock);
/* For fairness of locking. */
qthread_yield();
}
@ -196,8 +200,18 @@ static inline void opal_mutex_atomic_unlock(opal_mutex_t *m)
#endif
typedef aligned_t *opal_cond_t;
#define OPAL_CONDITION_STATIC_INIT 0
typedef struct opal_cond_t {
opal_atomic_lock_t m_lock;
void *m_waiter_head;
void *m_waiter_tail;
} opal_cond_t;
#define OPAL_CONDITION_STATIC_INIT \
{ \
.m_lock = OPAL_ATOMIC_LOCK_INIT, \
.m_waiter_head = NULL, \
.m_waiter_tail = NULL, \
}
int opal_cond_init(opal_cond_t *cond);
int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock);

Просмотреть файл

@ -26,15 +26,16 @@
#ifndef OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_THREADS_H
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_THREADS_H 1
#include <qthread/qthread.h>
#include <signal.h>
#include "opal/mca/threads/qthreads/threads_qthreads.h"
struct opal_thread_t {
opal_object_t super;
opal_thread_fn_t t_run;
void *t_arg;
unsigned *t_handle;
void *t_ret;
aligned_t t_thread_ret;
aligned_t *t_thread_ret_ptr;
};
#endif /* OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_THREADS_H */

Просмотреть файл

@ -28,14 +28,12 @@
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_TSD_H 1
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include <qthread/qthread.h>
#include <qthread/tls.h>
typedef qthread_key_t opal_tsd_key_t;
static inline int opal_tsd_key_delete(opal_tsd_key_t key)
{
return 0 == qthread_key_delete(&key) ? OPAL_SUCCESS : OPAL_ERROR;
return 0 == qthread_key_delete(key) ? OPAL_SUCCESS : OPAL_ERROR;
}
static inline int opal_tsd_set(opal_tsd_key_t key, void *value)
@ -45,7 +43,7 @@ static inline int opal_tsd_set(opal_tsd_key_t key, void *value)
static inline int opal_tsd_get(opal_tsd_key_t key, void **valuep)
{
qthread_getspecific(key);
*valuep = qthread_getspecific(key);
return OPAL_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,116 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2019 Sandia National Laboratories. All rights reserved.
*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include "opal/mca/threads/wait_sync.h"
static opal_mutex_t wait_sync_lock = OPAL_MUTEX_STATIC_INIT;
static ompi_wait_sync_t *wait_sync_list = NULL;
static opal_atomic_int32_t num_thread_in_progress = 0;
#define WAIT_SYNC_PASS_OWNERSHIP(who) \
do { \
opal_mutex_lock(&(who)->lock); \
opal_cond_signal(&(who)->condition); \
opal_mutex_unlock(&(who)->lock); \
} while (0)
int ompi_sync_wait_mt(ompi_wait_sync_t *sync)
{
/* Don't stop if the waiting synchronization is completed. We avoid the
* race condition around the release of the synchronization using the
* signaling field.
*/
if (sync->count <= 0) {
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}
/* lock so nobody can signal us during the list updating */
opal_mutex_lock(&sync->lock);
/* Now that we hold the lock make sure another thread has not already
* call cond_signal.
*/
if (sync->count <= 0) {
opal_mutex_unlock(&sync->lock);
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}
/* Insert sync on the list of pending synchronization constructs */
OPAL_THREAD_LOCK(&wait_sync_lock);
if (NULL == wait_sync_list) {
sync->next = sync->prev = sync;
wait_sync_list = sync;
} else {
sync->prev = wait_sync_list->prev;
sync->prev->next = sync;
sync->next = wait_sync_list;
wait_sync_list->prev = sync;
}
OPAL_THREAD_UNLOCK(&wait_sync_lock);
/**
* If we are not responsible for progressing, go silent until something
* worth noticing happen:
* - this thread has been promoted to take care of the progress
* - our sync has been triggered.
*/
check_status:
if (sync != wait_sync_list &&
num_thread_in_progress >= opal_max_thread_in_progress) {
opal_cond_wait(&sync->condition, &sync->lock);
/**
* At this point either the sync was completed in which case
* we should remove it from the wait list, or/and I was
* promoted as the progress manager.
*/
if (sync->count <= 0) { /* Completed? */
opal_mutex_unlock(&sync->lock);
goto i_am_done;
}
/* either promoted, or spurious wakeup ! */
goto check_status;
}
opal_mutex_unlock(&sync->lock);
OPAL_THREAD_ADD_FETCH32(&num_thread_in_progress, 1);
while (sync->count > 0) { /* progress till completion */
/* don't progress with the sync lock locked or you'll deadlock */
opal_progress();
qthread_yield();
}
OPAL_THREAD_ADD_FETCH32(&num_thread_in_progress, -1);
i_am_done:
/* My sync is now complete. Trim the list: remove self, wake next */
OPAL_THREAD_LOCK(&wait_sync_lock);
sync->prev->next = sync->next;
sync->next->prev = sync->prev;
/* In case I am the progress manager, pass the duties on */
if (sync == wait_sync_list) {
wait_sync_list = (sync == sync->next) ? NULL : sync->next;
if (NULL != wait_sync_list) {
WAIT_SYNC_PASS_OWNERSHIP(wait_sync_list);
}
}
OPAL_THREAD_UNLOCK(&wait_sync_lock);
return (0 == sync->status) ? OPAL_SUCCESS : OPAL_ERROR;
}

Просмотреть файл

@ -28,19 +28,20 @@
#define OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_WAIT_SYNC_H 1
#include "opal/mca/threads/qthreads/threads_qthreads.h"
#include <abt.h>
#include "opal/mca/threads/mutex.h"
typedef struct ompi_wait_sync_t {
opal_atomic_int32_t count;
int32_t status;
ABT_cond condition;
ABT_mutex lock;
opal_cond_t condition;
opal_mutex_t lock;
struct ompi_wait_sync_t *next;
struct ompi_wait_sync_t *prev;
volatile bool signaling;
} ompi_wait_sync_t;
#define SYNC_WAIT(sync) (opal_using_threads() ? ompi_sync_wait_mt (sync) : sync_wait_st (sync))
#define SYNC_WAIT(sync) \
(opal_using_threads() ? ompi_sync_wait_mt (sync) : sync_wait_st (sync))
/* The loop in release handles a race condition between the signaling
* thread and the destruction of the condition variable. The signaling
@ -50,17 +51,56 @@ typedef struct ompi_wait_sync_t {
* as possible. Note that the race window is small so spinning here
* is more optimal than sleeping since this macro is called in
* the critical path. */
#define WAIT_SYNC_RELEASE(sync) do { } while (0)
#define WAIT_SYNC_RELEASE_NOWAIT(sync) do { } while (0)
#define WAIT_SYNC_SIGNAL(sync) do { } while (0)
#define WAIT_SYNC_SIGNALLED(sync) do { } while (0)
#define WAIT_SYNC_RELEASE(sync) \
if (opal_using_threads()) { \
while ((sync)->signaling) { \
qthread_yield(); \
continue; \
} \
opal_cond_destroy(&(sync)->condition); \
}
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
if (opal_using_threads()) { \
opal_cond_destroy(&(sync)->condition); \
}
#define WAIT_SYNC_SIGNAL(sync) \
if (opal_using_threads()) { \
opal_mutex_lock(&(sync)->lock); \
opal_cond_signal(&(sync)->condition); \
opal_mutex_unlock(&(sync)->lock); \
(sync)->signaling = false; \
}
#define WAIT_SYNC_SIGNALLED(sync) \
{ \
(sync)->signaling = false; \
}
OPAL_DECLSPEC int ompi_sync_wait_mt(ompi_wait_sync_t *sync);
static inline int sync_wait_st (ompi_wait_sync_t *sync)
static inline int sync_wait_st(ompi_wait_sync_t *sync)
{
while (sync->count > 0) {
opal_progress();
qthread_yield();
}
return sync->status;
}
#define WAIT_SYNC_INIT(sync,c) do { } while (0)
#define WAIT_SYNC_INIT(sync,c) \
do { \
(sync)->count = (c); \
(sync)->next = NULL; \
(sync)->prev = NULL; \
(sync)->status = 0; \
(sync)->signaling = (0 != (c)); \
if (opal_using_threads()) { \
opal_cond_init(&(sync)->condition); \
opal_mutex_create(&(sync)->lock); \
} \
} while (0)
#endif /* OPAL_MCA_THREADS_QTHREADS_THREADS_QTHREADS_WAIT_SYNC_H */