1
1
openmpi/opal/runtime/opal_progress_threads.c

296 строки
7.0 KiB
C
Исходник Обычный вид История

/*
2015-06-23 20:59:57 -07:00
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
* $COPYRIGHT$
2015-06-23 20:59:57 -07:00
*
* Additional copyrights may follow
2015-06-23 20:59:57 -07:00
*
* $HEADER$
*/
#include "opal_config.h"
#include "opal/constants.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "opal/threads/threads.h"
#include "opal/util/error.h"
#include "opal/util/fd.h"
#include "opal/runtime/opal_progress_threads.h"
/* create a tracking object for progress threads */
typedef struct {
opal_list_item_t super;
int refcount;
char *name;
opal_event_base_t *ev_base;
/* This will be set to false when it is time for the progress
thread to exit */
volatile bool ev_active;
/* This event will always be set on the ev_base (so that the
ev_base is not empty!) */
opal_event_t block;
bool engine_constructed;
opal_thread_t engine;
} opal_progress_tracker_t;
static void tracker_constructor(opal_progress_tracker_t *p)
{
p->refcount = 1; // start at one since someone created it
p->name = NULL;
p->ev_base = NULL;
p->ev_active = false;
p->engine_constructed = false;
}
static void tracker_destructor(opal_progress_tracker_t *p)
{
opal_event_del(&p->block);
if (NULL != p->name) {
free(p->name);
}
if (NULL != p->ev_base) {
opal_event_base_free(p->ev_base);
}
if (p->engine_constructed) {
OBJ_DESTRUCT(&p->engine);
}
}
static OBJ_CLASS_INSTANCE(opal_progress_tracker_t,
opal_list_item_t,
tracker_constructor,
tracker_destructor);
static bool inited = false;
static opal_list_t tracking;
static struct timeval long_timeout = {
.tv_sec = 3600,
.tv_usec = 0
};
static const char *shared_thread_name = "OPAL-wide async progress thread";
/*
* If this event is fired, just restart it so that this event base
* continues to have something to block on.
*/
static void dummy_timeout_cb(int fd, short args, void *cbdata)
{
opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata;
opal_event_add(&trk->block, &long_timeout);
}
/*
* Main for the progress thread
*/
static void* progress_engine(opal_object_t *obj)
{
opal_thread_t *t = (opal_thread_t*)obj;
opal_progress_tracker_t *trk = (opal_progress_tracker_t*)t->t_arg;
while (trk->ev_active) {
opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
}
return OPAL_THREAD_CANCELLED;
}
static void stop_progress_engine(opal_progress_tracker_t *trk)
{
assert(trk->ev_active);
trk->ev_active = false;
/* break the event loop - this will cause the loop to exit upon
completion of any current event */
opal_event_base_loopbreak(trk->ev_base);
opal_thread_join(&trk->engine, NULL);
}
static int start_progress_engine(opal_progress_tracker_t *trk)
{
assert(!trk->ev_active);
trk->ev_active = true;
/* fork off a thread to progress it */
trk->engine.t_run = progress_engine;
trk->engine.t_arg = trk;
int rc = opal_thread_start(&trk->engine);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
}
return rc;
}
opal_event_base_t *opal_progress_thread_init(const char *name)
{
opal_progress_tracker_t *trk;
int rc;
if (!inited) {
OBJ_CONSTRUCT(&tracking, opal_list_t);
inited = true;
}
if (NULL == name) {
name = shared_thread_name;
}
/* check if we already have this thread */
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
if (0 == strcmp(name, trk->name)) {
/* we do, so up the refcount on it */
++trk->refcount;
/* return the existing base */
return trk->ev_base;
}
}
trk = OBJ_NEW(opal_progress_tracker_t);
if (NULL == trk) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
return NULL;
}
trk->name = strdup(name);
if (NULL == trk->name) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(trk);
return NULL;
}
if (NULL == (trk->ev_base = opal_event_base_create())) {
OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(trk);
return NULL;
}
/* add an event to the new event base (if there are no events,
opal_event_loop() will return immediately) */
opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
dummy_timeout_cb, trk);
opal_event_add(&trk->block, &long_timeout);
/* construct the thread object */
OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
trk->engine_constructed = true;
if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) {
OPAL_ERROR_LOG(rc);
OBJ_RELEASE(trk);
return NULL;
}
opal_list_append(&tracking, &trk->super);
return trk->ev_base;
}
int opal_progress_thread_finalize(const char *name)
{
opal_progress_tracker_t *trk;
if (!inited) {
/* nothing we can do */
return OPAL_ERR_NOT_FOUND;
}
if (NULL == name) {
name = shared_thread_name;
}
/* find the specified engine */
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
if (0 == strcmp(name, trk->name)) {
/* decrement the refcount */
--trk->refcount;
/* If the refcount is still above 0, we're done here */
if (trk->refcount > 0) {
return OPAL_SUCCESS;
}
/* If the progress thread is active, stop it */
if (trk->ev_active) {
stop_progress_engine(trk);
}
opal_list_remove_item(&tracking, &trk->super);
OBJ_RELEASE(trk);
return OPAL_SUCCESS;
}
}
return OPAL_ERR_NOT_FOUND;
}
/*
* Stop the progress thread, but don't delete the tracker (or event base)
*/
int opal_progress_thread_pause(const char *name)
{
opal_progress_tracker_t *trk;
if (!inited) {
/* nothing we can do */
return OPAL_ERR_NOT_FOUND;
}
if (NULL == name) {
name = shared_thread_name;
}
/* find the specified engine */
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
if (0 == strcmp(name, trk->name)) {
if (trk->ev_active) {
stop_progress_engine(trk);
}
return OPAL_SUCCESS;
}
}
return OPAL_ERR_NOT_FOUND;
}
int opal_progress_thread_resume(const char *name)
{
opal_progress_tracker_t *trk;
if (!inited) {
/* nothing we can do */
return OPAL_ERR_NOT_FOUND;
}
if (NULL == name) {
name = shared_thread_name;
}
/* find the specified engine */
OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
if (0 == strcmp(name, trk->name)) {
if (trk->ev_active) {
return OPAL_ERR_RESOURCE_BUSY;
}
return start_progress_engine(trk);
}
}
return OPAL_ERR_NOT_FOUND;
}