/* * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "opal_config.h" #include "opal/constants.h" #include "opal/class/opal_list.h" #include "opal/mca/event/event.h" #include "opal/threads/threads.h" #include "opal/util/error.h" #include "opal/util/fd.h" #include "opal/runtime/opal_progress_threads.h" /* create a tracking object for progress threads */ typedef struct { opal_list_item_t super; char *name; opal_event_base_t *ev_base; volatile bool ev_active; bool block_active; opal_event_t block; bool engine_defined; opal_thread_t engine; int pipe[2]; } opal_progress_tracker_t; static void trkcon(opal_progress_tracker_t *p) { p->name = NULL; p->ev_base = NULL; p->ev_active = true; p->block_active = false; p->engine_defined = false; p->pipe[0] = -1; p->pipe[1] = -1; } static void trkdes(opal_progress_tracker_t *p) { if (NULL != p->name) { free(p->name); } if (p->block_active) { opal_event_del(&p->block); } if (NULL != p->ev_base) { opal_event_base_free(p->ev_base); } if (0 <= p->pipe[0]) { close(p->pipe[0]); } if (0 <= p->pipe[1]) { close(p->pipe[1]); } if (p->engine_defined) { OBJ_DESTRUCT(&p->engine); } } static OBJ_CLASS_INSTANCE(opal_progress_tracker_t, opal_list_item_t, trkcon, trkdes); static opal_list_t tracking; static bool inited = false; static void wakeup(int fd, short args, void *cbdata) { opal_progress_tracker_t *trk = (opal_progress_tracker_t*)cbdata; /* if this event fired, then the blocker event will * be deleted from the event base by libevent, so flag * it so we don't try to delete it again */ trk->block_active = false; } static void* progress_engine(opal_object_t *obj) { opal_thread_t *t = (opal_thread_t*)obj; opal_progress_tracker_t *trk = (opal_progress_tracker_t*)t->t_arg; while (trk->ev_active) { opal_event_loop(trk->ev_base, OPAL_EVLOOP_ONCE); } return OPAL_THREAD_CANCELLED; } opal_event_base_t *opal_start_progress_thread(char *name, bool create_block) { opal_progress_tracker_t *trk; int rc; trk = OBJ_NEW(opal_progress_tracker_t); trk->name = strdup(name); if (NULL == (trk->ev_base = opal_event_base_create())) { OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); OBJ_RELEASE(trk); return NULL; } if (create_block) { /* add an event it can block on */ if (0 > pipe(trk->pipe)) { OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO); OBJ_RELEASE(trk); return NULL; } /* Make sure the pipe FDs are set to close-on-exec so that they don't leak into children */ if (opal_fd_set_cloexec(trk->pipe[0]) != OPAL_SUCCESS || opal_fd_set_cloexec(trk->pipe[1]) != OPAL_SUCCESS) { OPAL_ERROR_LOG(OPAL_ERR_IN_ERRNO); OBJ_RELEASE(trk); return NULL; } opal_event_set(trk->ev_base, &trk->block, trk->pipe[0], OPAL_EV_READ, wakeup, trk); opal_event_add(&trk->block, 0); trk->block_active = true; } /* construct the thread object */ OBJ_CONSTRUCT(&trk->engine, opal_thread_t); trk->engine_defined = true; /* fork off a thread to progress it */ trk->engine.t_run = progress_engine; trk->engine.t_arg = trk; if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) { OPAL_ERROR_LOG(rc); OBJ_RELEASE(trk); return NULL; } if (!inited) { OBJ_CONSTRUCT(&tracking, opal_list_t); inited = true; } opal_list_append(&tracking, &trk->super); return trk->ev_base; } void opal_stop_progress_thread(char *name, bool cleanup) { opal_progress_tracker_t *trk; int i; if (!inited) { /* nothing we can do */ return; } /* find the specified engine */ OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { if (0 == strcmp(name, trk->name)) { /* mark it as inactive */ trk->ev_active = false; /* break the event loop - this will cause the loop to exit * upon completion of any current event */ opal_event_base_loopbreak(trk->ev_base); /* if present, use the block to break it loose just in * case the thread is blocked in a call to select for * a long time */ if (trk->block_active) { i=1; write(trk->pipe[1], &i, sizeof(int)); } /* wait for thread to exit */ opal_thread_join(&trk->engine, NULL); /* cleanup, if they indicated they are done with this event base */ if (cleanup) { opal_list_remove_item(&tracking, &trk->super); OBJ_RELEASE(trk); } return; } } } int opal_restart_progress_thread(char *name) { opal_progress_tracker_t *trk; int rc; if (!inited) { /* nothing we can do */ return OPAL_ERROR; } /* find the specified engine */ OPAL_LIST_FOREACH(trk, &tracking, opal_progress_tracker_t) { if (0 == strcmp(name, trk->name)) { if (!trk->engine_defined) { OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED); return OPAL_ERR_NOT_SUPPORTED; } /* ensure the block is set, if requested */ if (0 <= trk->pipe[0] && !trk->block_active) { opal_event_add(&trk->block, 0); trk->block_active = true; } /* start the thread again */ if (OPAL_SUCCESS != (rc = opal_thread_start(&trk->engine))) { OPAL_ERROR_LOG(rc); return rc; } } } return OPAL_ERR_NOT_FOUND; }