/*
 * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
 * Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "opal_config.h"
#include "opal/constants.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <string.h>

#include "opal/class/opal_list.h"
#include "opal/mca/event/event.h"
#include "opal/mca/threads/threads.h"
#include "opal/util/error.h"
#include "opal/util/fd.h"

#include "opal/runtime/opal_progress_threads.h"

/*
 * Tracking object for one named progress thread.  One tracker exists
 * per distinct name; repeated init calls for the same name share it
 * via "refcount".
 *
 * NOTE(review): neither the tracker list nor the refcounts are
 * protected by any lock in this file -- presumably callers serialize
 * init/finalize/pause/resume; confirm.
 */
typedef struct {
    opal_list_item_t super;

    /* Number of outstanding opal_progress_thread_init() calls */
    int refcount;

    /* Name this tracker is looked up by (owned, strdup'ed) */
    char *name;

    /* Event base progressed by the thread (owned) */
    opal_event_base_t *ev_base;

    /* This will be set to false when it is time for the progress
       thread to exit */
    volatile bool ev_active;

    /* This event will always be set on the ev_base (so that the
       ev_base is not empty!) */
    opal_event_t block;

    /* True once opal_event_set() has been called on "block"; the
       destructor must not touch the event before that */
    bool block_set;

    /* True once "engine" has been OBJ_CONSTRUCT'ed */
    bool engine_constructed;
    opal_thread_t engine;
} opal_progress_tracker_t;

/*
 * Put a freshly allocated tracker into a state from which the
 * destructor can always run safely, no matter how far
 * opal_progress_thread_init() got before failing.
 */
static void tracker_constructor(opal_progress_tracker_t *p)
{
    p->refcount = 1; /* start at one since someone created it */
    p->name = NULL;
    p->ev_base = NULL;
    p->ev_active = false;
    p->block_set = false;
    p->engine_constructed = false;
}

/*
 * Release everything the tracker owns.  Each resource is only torn
 * down if it was actually set up: opal_progress_thread_init() releases
 * the tracker on its early failure paths (strdup / event base
 * creation) before the blocking event has been set.
 */
static void tracker_destructor(opal_progress_tracker_t *p)
{
    /* Remove the event before the base it is registered on goes away */
    if (p->block_set) {
        opal_event_del(&p->block);
    }

    free(p->name);

    if (NULL != p->ev_base) {
        opal_event_base_free(p->ev_base);
    }

    if (p->engine_constructed) {
        OBJ_DESTRUCT(&p->engine);
    }
}

static OBJ_CLASS_INSTANCE(opal_progress_tracker_t,
                          opal_list_item_t,
                          tracker_constructor,
                          tracker_destructor);

/* True once "tracking" has been constructed */
static bool inited = false;

/* List of all live opal_progress_tracker_t objects */
static opal_list_t tracking;

/* Timeout used for the dummy event that keeps each event base busy */
static struct timeval long_timeout = {
    .tv_sec = 3600,
    .tv_usec = 0
};

/* Name used when the caller passes NULL */
static const char *shared_thread_name = "OPAL-wide async progress thread";

/*
 * Map a caller-supplied name to the name actually used for lookup:
 * NULL selects the shared, OPAL-wide progress thread.
 */
static const char *resolve_name(const char *name)
{
    return (NULL != name) ? name : shared_thread_name;
}

/*
 * Return the tracker registered under "name" (already resolved, never
 * NULL), or NULL if there is none.  The tracking list must have been
 * constructed (inited == true) before calling this.
 */
static opal_progress_tracker_t *find_tracker(const char *name)
{
    opal_progress_tracker_t *trk;

    OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) {
        if (0 == strcmp(name, trk->name)) {
            return trk;
        }
    }

    return NULL;
}

/*
 * If this event is fired, just restart it so that this event base
 * continues to have something to block on.
 */
static void dummy_timeout_cb(int fd, short args, void *cbdata)
{
    opal_progress_tracker_t *trk = (opal_progress_tracker_t *) cbdata;

    opal_event_add(&trk->block, &long_timeout);
}

/*
 * Main for the progress thread: keep running one pass of the event
 * loop until the tracker's ev_active flag is cleared.
 */
static void *progress_engine(opal_object_t *obj)
{
    opal_thread_t *t = (opal_thread_t *) obj;
    opal_progress_tracker_t *trk = (opal_progress_tracker_t *) t->t_arg;

    while (trk->ev_active) {
        opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE);
    }

    return OPAL_THREAD_CANCELLED;
}

/*
 * Ask the progress thread to exit and wait for it.  The tracker must
 * currently be active.
 */
static void stop_progress_engine(opal_progress_tracker_t *trk)
{
    assert(trk->ev_active);
    trk->ev_active = false;

    /* break the event loop - this will cause the loop to exit upon
       completion of any current event */
    opal_event_base_loopbreak(trk->ev_base);

    opal_thread_join(&trk->engine, NULL);
}

/*
 * Launch the progress thread for a tracker that is not currently
 * active.
 *
 * Returns OPAL_SUCCESS, or the error from opal_thread_start() (which
 * is also logged here).  On failure the tracker is left inactive so
 * that a later pause/finalize does not try to join a thread that was
 * never started.
 */
static int start_progress_engine(opal_progress_tracker_t *trk)
{
    int rc;

    assert(!trk->ev_active);

    /* Must be set before the thread starts: progress_engine() exits
       as soon as it observes ev_active == false */
    trk->ev_active = true;

    /* fork off a thread to progress it */
    trk->engine.t_run = progress_engine;
    trk->engine.t_arg = trk;

    rc = opal_thread_start(&trk->engine);
    if (OPAL_SUCCESS != rc) {
        trk->ev_active = false;
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}

/*
 * Create (or take another reference on) the progress thread called
 * "name"; NULL selects the shared OPAL-wide thread.
 *
 * Returns the event base progressed by that thread, or NULL on
 * failure.  Every successful call must be balanced by a call to
 * opal_progress_thread_finalize() with the same name.
 */
opal_event_base_t *opal_progress_thread_init(const char *name)
{
    opal_progress_tracker_t *trk;

    if (!inited) {
        OBJ_CONSTRUCT(&tracking, opal_list_t);
        inited = true;
    }

    name = resolve_name(name);

    /* check if we already have this thread */
    trk = find_tracker(name);
    if (NULL != trk) {
        /* we do, so up the refcount on it and return the existing base */
        ++trk->refcount;
        return trk->ev_base;
    }

    trk = OBJ_NEW(opal_progress_tracker_t);
    if (NULL == trk) {
        OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
        return NULL;
    }

    trk->name = strdup(name);
    if (NULL == trk->name) {
        OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
        OBJ_RELEASE(trk);
        return NULL;
    }

    trk->ev_base = opal_event_base_create();
    if (NULL == trk->ev_base) {
        OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE);
        OBJ_RELEASE(trk);
        return NULL;
    }

    /* add an event to the new event base (if there are no events,
       opal_event_loop() will return immediately) */
    opal_event_set(trk->ev_base, &trk->block, -1, OPAL_EV_PERSIST,
                   dummy_timeout_cb, trk);
    trk->block_set = true;
    opal_event_add(&trk->block, &long_timeout);

    /* construct the thread object */
    OBJ_CONSTRUCT(&trk->engine, opal_thread_t);
    trk->engine_constructed = true;

    /* start_progress_engine() has already logged any failure */
    if (OPAL_SUCCESS != start_progress_engine(trk)) {
        OBJ_RELEASE(trk);
        return NULL;
    }

    opal_list_append(&tracking, &trk->super);

    return trk->ev_base;
}

/*
 * Drop one reference on the progress thread called "name" (NULL
 * selects the shared thread).  When the last reference goes away the
 * thread is stopped (if running) and the tracker, including its event
 * base, is destroyed.
 *
 * Returns OPAL_SUCCESS, or OPAL_ERR_NOT_FOUND if no such thread is
 * being tracked.
 */
int opal_progress_thread_finalize(const char *name)
{
    opal_progress_tracker_t *trk;

    if (!inited) {
        /* nothing we can do */
        return OPAL_ERR_NOT_FOUND;
    }

    /* find the specified engine */
    trk = find_tracker(resolve_name(name));
    if (NULL == trk) {
        return OPAL_ERR_NOT_FOUND;
    }

    /* If the refcount is still above 0 after the decrement, we're
       done here */
    --trk->refcount;
    if (trk->refcount > 0) {
        return OPAL_SUCCESS;
    }

    /* If the progress thread is active, stop it */
    if (trk->ev_active) {
        stop_progress_engine(trk);
    }

    opal_list_remove_item(&tracking, &trk->super);
    OBJ_RELEASE(trk);

    return OPAL_SUCCESS;
}

/*
 * Stop the progress thread, but don't delete the tracker (or event
 * base).  Pausing a thread that is already stopped is a no-op.
 *
 * Returns OPAL_SUCCESS, or OPAL_ERR_NOT_FOUND if no such thread is
 * being tracked.
 */
int opal_progress_thread_pause(const char *name)
{
    opal_progress_tracker_t *trk;

    if (!inited) {
        /* nothing we can do */
        return OPAL_ERR_NOT_FOUND;
    }

    /* find the specified engine */
    trk = find_tracker(resolve_name(name));
    if (NULL == trk) {
        return OPAL_ERR_NOT_FOUND;
    }

    if (trk->ev_active) {
        stop_progress_engine(trk);
    }

    return OPAL_SUCCESS;
}

/*
 * Restart a previously paused progress thread.
 *
 * Returns OPAL_ERR_NOT_FOUND if no such thread is being tracked,
 * OPAL_ERR_RESOURCE_BUSY if the thread is already running, and
 * otherwise the result of starting the thread.
 */
int opal_progress_thread_resume(const char *name)
{
    opal_progress_tracker_t *trk;

    if (!inited) {
        /* nothing we can do */
        return OPAL_ERR_NOT_FOUND;
    }

    /* find the specified engine */
    trk = find_tracker(resolve_name(name));
    if (NULL == trk) {
        return OPAL_ERR_NOT_FOUND;
    }

    if (trk->ev_active) {
        return OPAL_ERR_RESOURCE_BUSY;
    }

    return start_progress_engine(trk);
}