diff --git a/opal/runtime/opal_progress_threads.c b/opal/runtime/opal_progress_threads.c index da371f6be6..0860e27df0 100644 --- a/opal/runtime/opal_progress_threads.c +++ b/opal/runtime/opal_progress_threads.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -22,66 +23,79 @@ #include "opal/runtime/opal_progress_threads.h" + /* create a tracking object for progress threads */ typedef struct { opal_list_item_t super; + int refcount; char *name; + opal_event_base_t *ev_base; + + /* This will be set to false when it is time for the progress + thread to exit */ volatile bool ev_active; - bool block_active; + + /* This event will always be set on the ev_base (so that the + ev_base is not empty!) */ opal_event_t block; - bool engine_defined; + + bool engine_constructed; opal_thread_t engine; - int pipe[2]; } opal_progress_tracker_t; -static void trkcon(opal_progress_tracker_t *p) + +static void tracker_constructor(opal_progress_tracker_t *p) { p->refcount = 1; // start at one since someone created it p->name = NULL; p->ev_base = NULL; - p->ev_active = true; - p->block_active = false; - p->engine_defined = false; - p->pipe[0] = -1; - p->pipe[1] = -1; + p->ev_active = false; + p->engine_constructed = false; } -static void trkdes(opal_progress_tracker_t *p) + +static void tracker_destructor(opal_progress_tracker_t *p) { if (NULL != p->name) { free(p->name); } - if (p->block_active) { - opal_event_del(&p->block); - } if (NULL != p->ev_base) { + /* block is only initialized once ev_base exists */ + opal_event_del(&p->block); opal_event_base_free(p->ev_base); } - if (0 <= p->pipe[0]) { - close(p->pipe[0]); - } - if (0 <= p->pipe[1]) { - close(p->pipe[1]); - } - if (p->engine_defined) { + if (p->engine_constructed) { OBJ_DESTRUCT(&p->engine); } } + static OBJ_CLASS_INSTANCE(opal_progress_tracker_t, opal_list_item_t, - trkcon, trkdes); + tracker_constructor, + tracker_destructor); 
-static opal_list_t tracking; static bool inited = false; -static void wakeup(int fd, short args, void *cbdata) +static opal_list_t tracking; +static struct timeval long_timeout = { + .tv_sec = 3600, + .tv_usec = 0 +}; +static const char *shared_thread_name = "OPAL-wide async progress thread"; + +/* + * If this event is fired, just restart it so that this event base + * continues to have something to block on. + */ +static void dummy_timeout_cb(int fd, short args, void *cbdata) { opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata; - /* if this event fired, then the blocker event will - * be deleted from the event base by libevent, so flag - * it so we don't try to delete it again */ - trk->block_active = false; + opal_event_add(&trk->block, &long_timeout); } + +/* + * Main for the progress thread + */ static void* progress_engine(opal_object_t *obj) { opal_thread_t *t = (opal_thread_t*)obj; @@ -90,11 +104,41 @@ static void* progress_engine(opal_object_t *obj) while (trk->ev_active) { opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE); } + return OPAL_THREAD_CANCELLED; } -opal_event_base_t *opal_start_progress_thread(char *name, - bool create_block) +static void stop_progress_engine(opal_progress_tracker_t *trk) +{ + assert(trk->ev_active); + trk->ev_active = false; + + /* break the event loop - this will cause the loop to exit upon + completion of any current event */ + opal_event_base_loopbreak(trk->ev_base); + + opal_thread_join(&trk->engine, NULL); +} + +static int start_progress_engine(opal_progress_tracker_t *trk) +{ + assert(!trk->ev_active); + trk->ev_active = true; + + /* fork off a thread to progress it */ + trk->engine.t_run = progress_engine; + trk->engine.t_arg = trk; + + int rc = opal_thread_start(&trk->engine); + if (OPAL_SUCCESS != rc) { + OPAL_ERROR_LOG(rc); + trk->ev_active = false; /* trk is owned by the caller; do not release it here */ + } + + return rc; +} + +opal_event_base_t *opal_progress_thread_init(const char *name) { opal_progress_tracker_t *trk; int rc; @@ -104,6 +148,10 @@ 
opal_event_base_t *opal_start_progress_thread(char *name, inited = true; } + if (NULL == name) { + name = shared_thread_name; + } + /* check if we already have this thread */ OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { if (0 == strcmp(name, trk->name)) { @@ -115,129 +163,132 @@ opal_event_base_t *opal_start_progress_thread(char *name, } trk = OBJ_NEW(opal_progress_tracker_t); + if (NULL == trk) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + return NULL; + } + trk->name = strdup(name); + if (NULL == trk->name) { + OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); + OBJ_RELEASE(trk); + return NULL; + } + if (NULL == (trk->ev_base = opal_event_base_create())) { OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); OBJ_RELEASE(trk); return NULL; } - if (create_block) { - /* add an event it can block on */ - if (0 > pipe(trk->pipe)) { - OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO); - OBJ_RELEASE(trk); - return NULL; - } - /* Make sure the pipe FDs are set to close-on-exec so that - they don't leak into children */ - if (opal_fd_set_cloexec(trk->pipe[0]) != OPAL_SUCCESS || - opal_fd_set_cloexec(trk->pipe[1]) != OPAL_SUCCESS) { - OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO); - OBJ_RELEASE(trk); - return NULL; - } - opal_event_set(trk->ev_base, &trk->block, trk->pipe[0], OPAL_EV_READ, wakeup, trk); - opal_event_add(&trk->block, 0); - trk->block_active = true; - } + /* add an event to the new event base (if there are no events, + opal_event_loop() will return immediately) */ + opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST, + dummy_timeout_cb, trk); + opal_event_add(&trk->block, &long_timeout); /* construct the thread object */ OBJ_CONSTRUCT(&trk->engine, opal_thread_t); - trk->engine_defined = true; - /* fork off a thread to progress it */ - trk->engine.t_run = progress_engine; - trk->engine.t_arg = trk; - if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) { + trk->engine_constructed = true; + if (OPAL_SUCCESS != (rc = start_progress_engine(trk))) { OPAL_ERROR_LOG(rc); 
OBJ_RELEASE(trk); return NULL; } opal_list_append(&tracking, &trk->super); + return trk->ev_base; } -void opal_stop_progress_thread(char *name, bool cleanup) +int opal_progress_thread_finalize(const char *name) { opal_progress_tracker_t *trk; - int i; if (!inited) { /* nothing we can do */ - return; + return OPAL_ERR_NOT_FOUND; + } + + if (NULL == name) { + name = shared_thread_name; } /* find the specified engine */ OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { if (0 == strcmp(name, trk->name)) { - /* if it is already inactive, then just cleanup if that - * is the request */ - if (!trk->ev_active) { - if (cleanup) { - opal_list_remove_item(&tracking, &trk->super); - OBJ_RELEASE(trk); - } - return; - } /* decrement the refcount */ --trk->refcount; - /* if we have reached zero, then it's time to stop it */ - if (0 < trk->refcount) { - return; - } - /* mark it as inactive */ - trk->ev_active = false; - /* break the event loop - this will cause the loop to exit - * upon completion of any current event */ - opal_event_base_loopbreak(trk->ev_base); - /* if present, use the block to break it loose just in - * case the thread is blocked in a call to select for - * a long time */ - if (trk->block_active) { - i=1; - write(trk->pipe[1], &i, sizeof(int)); - } - /* wait for thread to exit */ - opal_thread_join(&trk->engine, NULL); - /* cleanup, if they indicated they are done with this event base */ - if (cleanup) { - opal_list_remove_item(&tracking, &trk->super); - OBJ_RELEASE(trk); - } - return; - } - } -} -int opal_restart_progress_thread(char *name) -{ - opal_progress_tracker_t *trk; - int rc; + /* If the refcount is still above 0, we're done here */ + if (trk->refcount > 0) { + return OPAL_SUCCESS; + } - if (!inited) { - /* nothing we can do */ - return OPAL_ERROR; - } + /* If the progress thread is active, stop it */ + if (trk->ev_active) { + stop_progress_engine(trk); + } - /* find the specified engine */ - OPAL_LIST_FOREACH(trk, &tracking, 
opal_progress_tracker_t) { - if (0 == strcmp(name, trk->name)) { - if (!trk->engine_defined) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED); - return OPAL_ERR_NOT_SUPPORTED; - } - /* up the refcount */ - ++trk->refcount; - /* ensure the block is set, if requested */ - if (0 <= trk->pipe[0] && !trk->block_active) { - opal_event_add(&trk->block, 0); - trk->block_active = true; - } - /* start the thread again */ - if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) { - OPAL_ERROR_LOG(rc); - return rc; - } + opal_list_remove_item(&tracking, &trk->super); + OBJ_RELEASE(trk); + return OPAL_SUCCESS; + } + } + + return OPAL_ERR_NOT_FOUND; +} + +/* + * Stop the progress thread, but don't delete the tracker (or event base) + */ +int opal_progress_thread_pause(const char *name) +{ + opal_progress_tracker_t *trk; + + if (!inited) { + /* nothing we can do */ + return OPAL_ERR_NOT_FOUND; + } + + if (NULL == name) { + name = shared_thread_name; + } + + /* find the specified engine */ + OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { + if (0 == strcmp(name, trk->name)) { + if (trk->ev_active) { + stop_progress_engine(trk); + } + + return OPAL_SUCCESS; + } + } + + return OPAL_ERR_NOT_FOUND; +} + +int opal_progress_thread_resume(const char *name) +{ + opal_progress_tracker_t *trk; + + if (!inited) { + /* nothing we can do */ + return OPAL_ERR_NOT_FOUND; + } + + if (NULL == name) { + name = shared_thread_name; + } + + /* find the specified engine */ + OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { + if (0 == strcmp(name, trk->name)) { + if (trk->ev_active) { + return OPAL_ERR_RESOURCE_BUSY; + } + + return start_progress_engine(trk); } } diff --git a/opal/runtime/opal_progress_threads.h b/opal/runtime/opal_progress_threads.h index ec1f8e2adf..45321553c6 100644 --- a/opal/runtime/opal_progress_threads.h +++ b/opal/runtime/opal_progress_threads.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Intel, Inc. All rights reserved. 
+ * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -14,20 +15,56 @@ #include "opal/mca/event/event.h" -/* start a progress thread, assigning it the provided name for - * tracking purposes. If create_block is true, then this function - * will also create a pipe so that libevent has something to block - * against, thus keeping the thread from free-running + +/** + * Initialize a progress thread name; if a progress thread is not + * already associated with that name, start a progress thread. + * + * If you have general events that need to run in *a* progress thread + * (but not necessarily your own, dedicated progress thread), pass + * NULL as the "name" argument to the opal_progress_thread_init() function + * to glom on to the general OPAL-wide progress thread. + * + * If a name is passed that was already used in a prior call to + * opal_progress_thread_init(), the event base associated with that + * already-running progress thread will be returned (i.e., no new + * progress thread will be started). */ -OPAL_DECLSPEC opal_event_base_t *opal_start_progress_thread(char *name, - bool create_block); +OPAL_DECLSPEC opal_event_base_t *opal_progress_thread_init(const char *name); -/* stop the progress thread of the provided name. This function will - * also cleanup the blocking pipes and release the event base if - * the cleanup param is true */ -OPAL_DECLSPEC void opal_stop_progress_thread(char *name, bool cleanup); +/** + * Finalize a progress thread name (reference counted). + * + * Once this function is invoked as many times as + * opal_progress_thread_init() was invoked on this name (or NULL), the + * progress thread is shut down and the event base associated with + * it is destroyed. + * + * Will return OPAL_ERR_NOT_FOUND if the progress thread name does not + * exist; OPAL_SUCCESS otherwise. 
+ */ +OPAL_DECLSPEC int opal_progress_thread_finalize(const char *name); -/* restart the progress thread of the provided name */ -OPAL_DECLSPEC int opal_restart_progress_thread(char *name); +/** + * Temporarily pause the progress thread associated with this name. + * + * This function does not destroy the event base associated with this + * progress thread name, but it does stop processing all events on + * that event base until opal_progress_thread_resume() is invoked on + * that name. + * + * Will return OPAL_ERR_NOT_FOUND if the progress thread name does not + * exist; OPAL_SUCCESS otherwise. + */ +OPAL_DECLSPEC int opal_progress_thread_pause(const char *name); + +/** + * Restart a previously-paused progress thread associated with this name. + * + * Will return OPAL_ERR_NOT_FOUND if the progress thread name does not + * exist, OPAL_ERR_RESOURCE_BUSY if the thread is already running, or + * the status of starting the thread (OPAL_SUCCESS on success). + */ +OPAL_DECLSPEC int opal_progress_thread_resume(const char *name); #endif