/* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2008 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2007-2013 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2008 Institut National de Recherche en Informatique * et Automatique. All rights reserved. * Copyright (c) 2014 Intel Corporation. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "orte_config.h" #include #include #ifdef HAVE_UNISTD_H #include #endif #ifdef HAVE_SYS_QUEUE_H #include #endif #include #ifdef HAVE_SYS_TIME_H #include #endif #ifdef HAVE_SYS_TYPES_H #include #endif #include #include #include #include #include #ifdef HAVE_SYS_WAIT_H #include #endif #include "opal/dss/dss_types.h" #include "opal/class/opal_object.h" #include "opal/util/output.h" #include "opal/class/opal_list.h" #include "opal/mca/event/event.h" #include "opal/threads/mutex.h" #include "opal/threads/condition.h" #include "opal/sys/atomic.h" #include "orte/constants.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" /* Timer Object Declaration */ static void timer_const(orte_timer_t *tm) { tm->ev = opal_event_alloc(); tm->payload = NULL; } static void timer_dest(orte_timer_t *tm) { opal_event_free(tm->ev); } OBJ_CLASS_INSTANCE(orte_timer_t, opal_object_t, timer_const, timer_dest); /* Local objects */ typedef struct { opal_list_item_t super; opal_event_t ev; orte_proc_t *child; orte_wait_fn_t cbfunc; void *cbdata; } orte_wait_tracker_t; static void wccon(orte_wait_tracker_t *p) { p->child = NULL; p->cbfunc = NULL; p->cbdata = NULL; } static void wcdes(orte_wait_tracker_t *p) { if (NULL != p->child) { OBJ_RELEASE(p->child); } } static OBJ_CLASS_INSTANCE(orte_wait_tracker_t, opal_list_item_t, wccon, wcdes); /* Local Variables */ static opal_event_t handler; static opal_list_t pending_cbs; /* Local Function Prototypes */ static void wait_signal_callback(int fd, short event, void *arg); /* Interface Functions */ void orte_wait_disable(void) { opal_event_del(&handler); } void orte_wait_enable(void) { opal_event_add(&handler, NULL); } int orte_wait_init(void) { OBJ_CONSTRUCT(&pending_cbs, opal_list_t); opal_event_set(orte_event_base, &handler, SIGCHLD, OPAL_EV_SIGNAL|OPAL_EV_PERSIST, wait_signal_callback, &handler); opal_event_set_priority(&handler, ORTE_SYS_PRI); opal_event_add(&handler, NULL); return ORTE_SUCCESS; } int orte_wait_finalize(void) { opal_event_del(&handler); /* clear out the pending cbs */ OPAL_LIST_DESTRUCT(&pending_cbs); return ORTE_SUCCESS; } static void register_callback(int fd, short args, void *cbdata) { orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata; orte_wait_tracker_t *t2; /* see if this proc is still alive */ if (!ORTE_FLAG_TEST(trk->child, ORTE_PROC_FLAG_ALIVE)) { /* already heard this proc is dead, so just do the callback */ if (NULL != trk->cbfunc) { trk->cbfunc(trk->child, trk->cbdata); OBJ_RELEASE(trk); return; } } /* we just override any existing registration */ OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) { if (t2->child == trk->child) { t2->cbfunc = trk->cbfunc; t2->cbdata = trk->cbdata; OBJ_RELEASE(trk); return; } } /* get here if this is a new registration */ opal_list_append(&pending_cbs, &trk->super); } void orte_wait_cb(orte_proc_t *child, orte_wait_fn_t callback, void *data) { orte_wait_tracker_t *trk; if (NULL == child || NULL == callback) { /* bozo protection */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return; } /* push this into the event library for handling */ trk = OBJ_NEW(orte_wait_tracker_t); OBJ_RETAIN(child); // protect against race conditions trk->child = child; trk->cbfunc = callback; trk->cbdata = data; opal_event_set(orte_event_base, &trk->ev, -1, OPAL_EV_WRITE, register_callback, trk); opal_event_set_priority(&trk->ev, ORTE_SYS_PRI); opal_event_active(&trk->ev, OPAL_EV_WRITE, 1); } static void cancel_callback(int fd, short args, void *cbdata) { orte_wait_tracker_t *trk = (orte_wait_tracker_t*)cbdata; orte_wait_tracker_t *t2; OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) { if (t2->child == trk->child) { opal_list_remove_item(&pending_cbs, &t2->super); OBJ_RELEASE(t2); OBJ_RELEASE(trk); return; } } OBJ_RELEASE(trk); } void orte_wait_cb_cancel(orte_proc_t *child) { orte_wait_tracker_t *trk; if (NULL == child) { /* bozo protection */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return; } /* push this into the event library for handling */ trk = OBJ_NEW(orte_wait_tracker_t); OBJ_RETAIN(child); // protect against race conditions trk->child = child; opal_event_set(orte_event_base, &trk->ev, -1, OPAL_EV_WRITE, cancel_callback, trk); opal_event_set_priority(&trk->ev, ORTE_SYS_PRI); opal_event_active(&trk->ev, OPAL_EV_WRITE, 1); } /* callback from the event library whenever a SIGCHLD is received */ static void wait_signal_callback(int fd, short event, void *arg) { opal_event_t *signal = (opal_event_t*) arg; int status; pid_t pid; orte_wait_tracker_t *t2; if (SIGCHLD != OPAL_EVENT_SIGNAL(signal)) { return; } /* we can have multiple children leave but only get one * sigchild callback, so reap all the waitpids until we * don't get anything valid back */ while (1) { pid = waitpid(-1, &status, WNOHANG); if (-1 == pid && EINTR == errno) { /* try it again */ continue; } /* if we got garbage, then nothing we can do */ if (pid <= 0) { return; } /* we are already in an event, so it is safe to access the list */ OPAL_LIST_FOREACH(t2, &pending_cbs, orte_wait_tracker_t) { if (pid == t2->child->pid) { /* found it! */ t2->child->exit_code = status; if (NULL != t2->cbfunc) { t2->cbfunc(t2->child, t2->cbdata); } opal_list_remove_item(&pending_cbs, &t2->super); OBJ_RELEASE(t2); break; } } } }