1
1

Clean up a few initialization issues - don't think these are impacting the shared memory situation as it didn't fix the problem.

Setup the event API to support multiple bases in preparation for splitting the OMPI and ORTE events. Holding here pending shared memory resolution.

This commit was SVN r23943.
This commit is contained in:
Ralph Castain 2010-10-26 02:41:42 +00:00
parent fc46dfa78a
commit 86c7365e8e
52 changed files with 265 additions and 217 deletions

View File

@ -37,9 +37,10 @@ int mca_btl_base_close(void)
} else if (--mca_btl_base_already_opened > 0) {
return OMPI_SUCCESS;
}
#if 0
/* disable event processing while cleaning up btls */
opal_event.disable();
#endif
/* Finalize all the btl components and free their list items */
for (item = opal_list_remove_first(&mca_btl_base_modules_initialized);
@ -69,9 +70,10 @@ int mca_btl_base_close(void)
if(NULL != mca_btl_base_exclude)
free(mca_btl_base_exclude);
#if 0
/* restore event processing */
opal_event.enable();
#endif
/* All done */
return OMPI_SUCCESS;
}

View File

@ -175,7 +175,7 @@ static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd)
/* Make an event for this fd */
ri->ri_event_used = true;
OBJ_CONSTRUCT(&ri->ri_event, opal_event_t);
opal_event.set(&ri->ri_event, ri->ri_fd,
opal_event.set(opal_event_base, &ri->ri_event, ri->ri_fd,
ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback,
ri);
opal_event.add(&ri->ri_event, 0);
@ -483,7 +483,7 @@ int ompi_btl_openib_fd_init(void)
/* Create a libevent event that is used in the main thread
to watch its pipe */
OBJ_CONSTRUCT(&main_thread_event, opal_event_t);
opal_event.set(&main_thread_event, pipe_to_main_thread[0],
opal_event.set(opal_event_base, &main_thread_event, pipe_to_main_thread[0],
OPAL_EV_READ | OPAL_EV_PERSIST,
main_thread_event_callback, NULL);
opal_event.add(&main_thread_event, 0);

View File

@ -602,7 +602,7 @@ static int mca_btl_sctp_component_create_listen(void)
/* register listen port */
opal_event.set(
opal_event.set(opal_event_base,
&mca_btl_sctp_component.sctp_recv_event,
mca_btl_sctp_component.sctp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
@ -652,7 +652,7 @@ static int mca_btl_sctp_component_register_listen(void)
/* register listen port */
opal_event.set(
opal_event.set(opal_event_base,
&mca_btl_sctp_component.sctp_recv_event,
mca_btl_sctp_component.sctp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
@ -893,7 +893,7 @@ void mca_btl_sctp_component_accept(void)
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_sctp_event_t);
opal_event.set(&event->event, sd, OPAL_EV_READ, mca_btl_sctp_component_recv_handler, event);
opal_event.set(opal_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_sctp_component_recv_handler, event);
opal_event.add(&event->event, 0);
}
}
@ -913,7 +913,7 @@ void mca_btl_sctp_component_accept(void)
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_sctp_event_t);
opal_event.set(&event->event, sd, OPAL_EV_READ, mca_btl_sctp_recv_handler, event);
opal_event.set(opal_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_sctp_recv_handler, event);
opal_event.add(&event->event, 0);
}
}

View File

@ -273,12 +273,12 @@ static inline void mca_btl_sctp_endpoint_event_init(mca_btl_base_endpoint_t* btl
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
opal_event.set( &btl_endpoint->endpoint_recv_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_sctp_endpoint_recv_handler,
btl_endpoint );
opal_event.set( &btl_endpoint->endpoint_send_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE|OPAL_EV_PERSIST,
mca_btl_sctp_endpoint_send_handler,
@ -297,12 +297,12 @@ static inline void mca_btl_sctp_endpoint_event_init(mca_btl_base_endpoint_t* btl
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_SCTP_ENDPOINT_CACHE */
opal_event.set( &btl_endpoint->endpoint_recv_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_sctp_recv_handler,
btl_endpoint );
opal_event.set( &btl_endpoint->endpoint_send_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE|OPAL_EV_PERSIST,
mca_btl_sctp_endpoint_send_handler,

View File

@ -799,7 +799,7 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
/* register listen port */
if (AF_INET == af_family) {
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp_recv_event, opal_event_t);
opal_event.set( &mca_btl_tcp_component.tcp_recv_event,
opal_event.set(opal_event_base, &mca_btl_tcp_component.tcp_recv_event,
mca_btl_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
@ -809,7 +809,7 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
#if OPAL_WANT_IPV6
if (AF_INET6 == af_family) {
OBJ_CONSTRUCT(&mca_btl_tcp_component.tcp6_recv_event, opal_event_t);
opal_event.set( &mca_btl_tcp_component.tcp6_recv_event,
opal_event.set(opal_event_base, &mca_btl_tcp_component.tcp6_recv_event,
mca_btl_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_component_accept_handler,
@ -1032,7 +1032,7 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_btl_tcp_event_t);
opal_event.set(&event->event, sd, OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
opal_event.set(opal_event_base, &event->event, sd, OPAL_EV_READ, mca_btl_tcp_component_recv_handler, event);
opal_event.add(&event->event, 0);
}
}

View File

@ -214,7 +214,7 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
opal_event.set( &btl_endpoint->endpoint_recv_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_recv_event,
btl_endpoint->endpoint_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_recv_handler,
@ -225,7 +225,7 @@ static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* btl_
* will be fired only once, and when the endpoint is marked as
* CONNECTED the event should be recreated with the correct flags.
*/
opal_event.set( &btl_endpoint->endpoint_send_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE,
mca_btl_tcp_endpoint_send_handler,
@ -420,7 +420,7 @@ static void mca_btl_tcp_endpoint_connected(mca_btl_base_endpoint_t* btl_endpoint
btl_endpoint->endpoint_retries = 0;
/* Create the send event in a persistent manner. */
opal_event.set( &btl_endpoint->endpoint_send_event,
opal_event.set(opal_event_base, &btl_endpoint->endpoint_send_event,
btl_endpoint->endpoint_sd,
OPAL_EV_WRITE | OPAL_EV_PERSIST,
mca_btl_tcp_endpoint_send_handler,

View File

@ -4445,7 +4445,7 @@ static int ft_event_exchange_bookmarks(void)
/* Wait for all bookmarks to arrive */
START_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
while( total_recv_bookmarks > 0 ) {
opal_event.loop(OPAL_EVLOOP_NONBLOCK);
opal_event.loop(opal_event_base, OPAL_EVLOOP_NONBLOCK);
}
total_recv_bookmarks = 0;
END_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
@ -5240,7 +5240,7 @@ static int wait_quiesce_drain_ack(void)
}
}
opal_event.loop(OPAL_EVLOOP_NONBLOCK);
opal_event.loop(opal_event_base, OPAL_EVLOOP_NONBLOCK);
}
/* Clear the ack queue if it isn't already clear (it should already be) */

View File

@ -43,9 +43,9 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_wdog_cnt = 0;
vfrag->vf_ack_cnt = 0;
OBJ_CONSTRUCT(&vfrag->vf_wdog_ev, opal_event_t);
opal_event.evtimer_set(&vfrag->vf_wdog_ev, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag);
opal_event.evtimer_set(opal_event_base, &vfrag->vf_wdog_ev, mca_pml_dr_vfrag_wdog_timeout, (void*) vfrag);
OBJ_CONSTRUCT(&vfrag->vf_ack_ev, opal_event_t);
opal_event.evtimer_set(&vfrag->vf_ack_ev, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag);
opal_event.evtimer_set(opal_event_base, &vfrag->vf_ack_ev, mca_pml_dr_vfrag_ack_timeout, (void*) vfrag);
}

View File

@ -242,11 +242,11 @@ int main(int argc, char *argv[])
* after ourselves.
*/
OBJ_CONSTRUCT(&term_handler, opal_event_t);
opal_event.set(&term_handler, SIGTERM, OPAL_EV_SIGNAL,
opal_event.set(opal_event_base, &term_handler, SIGTERM, OPAL_EV_SIGNAL,
shutdown_callback, NULL);
opal_event.add(&term_handler, NULL);
OBJ_CONSTRUCT(&int_handler, opal_event_t);
opal_event.set(&int_handler, SIGINT, OPAL_EV_SIGNAL,
opal_event.set(opal_event_base, &int_handler, SIGINT, OPAL_EV_SIGNAL,
shutdown_callback, NULL);
opal_event.add(&int_handler, NULL);
@ -284,7 +284,7 @@ int main(int argc, char *argv[])
}
/* wait to hear we are done */
opal_event.dispatch();
opal_event.dispatch(opal_event_base);
/* should never get here, but if we do... */

View File

@ -282,9 +282,7 @@ void ompi_info_open_components(void)
map->components = &opal_memory_base_components_opened;
opal_pointer_array_add(&component_map, map);
if (OPAL_SUCCESS != opal_event_base_open()) {
goto error;
}
/* the event framework is already open - just get its components */
map = OBJ_NEW(ompi_info_component_map_t);
map->type = strdup("event");
map->components = &opal_event_components;

View File

@ -19,8 +19,10 @@ int opal_event_base_close(void)
{
opal_list_item_t *item;
/* If there is a selected event module, finalize it */
/* release the event base */
OBJ_RELEASE(opal_event_base);
/* If there is a selected event module, finalize it */
if (NULL != opal_event.finalize) {
opal_event.finalize();
}

View File

@ -33,7 +33,7 @@
int opal_event_base_output = -1;
opal_event_module_t opal_event = {0};
opal_list_t opal_event_components;
opal_event_base_t *opal_event_base=NULL;
/*
* Only ONE event component can compile at any time, so
* just open that one - it will be statically built
@ -81,12 +81,13 @@ int opal_event_base_open(void)
}
}
/* be sure to init the final module */
/* Init the final module */
if (NULL != opal_event.init) {
rc = opal_event.init();
}
/* All done */
/* get our event base */
opal_event_base = OBJ_NEW(opal_event_base_t);
return rc;
}
@ -108,3 +109,21 @@ OBJ_CLASS_INSTANCE(opal_event_t,
opal_object_t,
ev_construct,
ev_destruct);
static void evbase_construct(opal_event_base_t *ptr)
{
ptr->base = NULL;
if (NULL != opal_event.construct_base) {
opal_event.construct_base(ptr);
}
}
static void evbase_destruct(opal_event_base_t *ptr)
{
if (NULL != opal_event.destruct_base) {
opal_event.destruct_base(ptr);
}
}
OBJ_CLASS_INSTANCE(opal_event_base_t,
opal_object_t,
evbase_construct,
evbase_destruct);

View File

@ -33,6 +33,7 @@
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/class/opal_object.h"
#include "opal/class/opal_list.h"
BEGIN_C_DECLS
@ -69,6 +70,12 @@ typedef struct {
} opal_event_t;
OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_event_t);
typedef struct {
opal_object_t super;
void *base;
} opal_event_base_t;
OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_event_base_t);
#define OPAL_TIMEOUT_DEFAULT {1, 0}
typedef void (*opal_event_callback_fn_t)(int, short, void *);
@ -77,11 +84,8 @@ typedef int (*opal_event_base_module_init_fn_t)(void);
typedef int (*opal_event_base_module_fini_fn_t)(void);
typedef void (*opal_event_base_module_set_debug_output_fn_t)(bool output);
typedef int (*opal_event_base_module_enable_fn_t)(void);
typedef int (*opal_event_base_module_disable_fn_t)(void);
typedef int (*opal_event_base_module_restart_fn_t)(void);
typedef int (*opal_event_base_module_set_fn_t)(opal_event_t *ev, int fd, short events,
typedef int (*opal_event_base_module_set_fn_t)(opal_event_base_t *evbase,
opal_event_t *ev, int fd, short events,
opal_event_callback_fn_t cbfunc, void *arg);
typedef int (*opal_event_base_module_add_fn_t)(opal_event_t *ev, const struct timeval *tv);
@ -89,12 +93,14 @@ typedef int (*opal_event_base_module_del_fn_t)(opal_event_t *ev);
typedef int (*opal_event_base_module_get_signal_fn_t)(opal_event_t *ev);
typedef int (*opal_event_base_module_dispatch_fn_t)(void);
typedef int (*opal_event_base_module_dispatch_fn_t)(opal_event_base_t *evbase);
/**
Create a timer event
*/
typedef opal_event_t* (*opal_event_base_module_evtimer_new_fn_t)(opal_event_callback_fn_t cbfunc, void *cbdata);
typedef opal_event_t* (*opal_event_base_module_evtimer_new_fn_t)(opal_event_base_t *evbase,
opal_event_callback_fn_t cbfunc,
void *cbdata);
/**
Add a timer event.
@ -111,7 +117,9 @@ typedef int (*opal_event_base_module_evtimer_add_fn_t)(opal_event_t *ev, const s
@param cb callback function
@param arg argument that will be passed to the callback function
*/
typedef void (*opal_event_base_module_evtimer_set_fn_t)(opal_event_t *ev, opal_event_callback_fn_t cbfunc, void *cbdata);
typedef void (*opal_event_base_module_evtimer_set_fn_t)(opal_event_base_t *evbase,
opal_event_t *ev,
opal_event_callback_fn_t cbfunc, void *cbdata);
/**
* Delete a timer event.
@ -127,7 +135,8 @@ typedef int (*opal_event_base_module_evtimer_initialized_fn_t)(opal_event_t *ev)
typedef int (*opal_event_base_module_signal_add_fn_t)(opal_event_t *ev, struct timeval *tv);
typedef int (*opal_event_base_module_signal_set_fn_t)(opal_event_t *ev, int fd,
typedef int (*opal_event_base_module_signal_set_fn_t)(opal_event_base_t *evbase,
opal_event_t *ev, int fd,
opal_event_callback_fn_t cbfunc, void *cbdata);
typedef int (*opal_event_base_module_signal_del_fn_t)(opal_event_t *ev);
@ -136,13 +145,17 @@ typedef int (*opal_event_base_module_signal_pending_fn_t)(opal_event_t *ev, stru
typedef int (*opal_event_base_module_signal_initialized_fn_t)(opal_event_t *ev);
typedef int (*opal_event_base_module_loop_fn_t)(int flags);
typedef int (*opal_event_base_module_loop_fn_t)(opal_event_base_t *evbase, int flags);
/* construct/destruct the event struct hidden inside the opal_event_t object */
typedef void (*opal_event_base_module_construct_fn_t)(opal_event_t *ev);
typedef void (*opal_event_base_module_destruct_fn_t)(opal_event_t *ev);
/* construct/destruct the event base hidden inside the opal_event_base_t object */
typedef void (*opal_event_base_construct_base_fn_t)(opal_event_base_t *evbase);
typedef void (*opal_event_base_destruct_base_fn_t)(opal_event_base_t *evbase);
/* This is to prevent event library from picking up the win32_ops
since this will be picked up over select(). By using select, we can
@ -176,13 +189,13 @@ struct opal_event_base_module_1_0_0_t {
/* constructor/destructor needed for event struct */
opal_event_base_module_construct_fn_t construct;
opal_event_base_module_destruct_fn_t destruct;
/* constructor/destructor needed for event_base struct */
opal_event_base_construct_base_fn_t construct_base;
opal_event_base_destruct_base_fn_t destruct_base;
/* all API functions */
opal_event_base_module_init_fn_t init;
opal_event_base_module_fini_fn_t finalize;
opal_event_base_module_set_debug_output_fn_t set_debug_output;
opal_event_base_module_enable_fn_t enable;
opal_event_base_module_disable_fn_t disable;
opal_event_base_module_restart_fn_t restart;
opal_event_base_module_set_fn_t set;
opal_event_base_module_add_fn_t add;
opal_event_base_module_del_fn_t del;
@ -217,6 +230,7 @@ typedef struct opal_event_base_module_1_0_0_t opal_event_module_t;
/* Global structure for accessing event functions */
OPAL_DECLSPEC extern opal_event_module_t opal_event;
OPAL_DECLSPEC extern opal_event_base_t *opal_event_base;
END_C_DECLS

View File

@ -2110,8 +2110,11 @@ event_del(struct event *ev)
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
return 0;
#if 0
event_warnx("%s: event has no event_base set.", __func__);
return -1;
#endif
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

View File

@ -47,46 +47,47 @@
#include "opal/mca/event/event.h"
static int opal_event_inited = 0;
static bool opal_event_enabled = false;
static struct event_base *current_base = NULL;
static void constructor(opal_event_t *ev);
static void destructor(opal_event_t *ev);
static void construct_base(opal_event_base_t *evbase);
static void destruct_base(opal_event_base_t *evbase);
static int init(void);
static int finalize(void);
static void set_debug_output(bool output);
static int enable(void);
static int disable(void);
static int restart(void);
static int set(opal_event_t *ev, int fd, short events,
static int set(opal_event_base_t *evbase,
opal_event_t *ev, int fd, short events,
opal_event_callback_fn_t cbfunc, void *arg);
static int add(opal_event_t *ev, const struct timeval *tv);
static int del(opal_event_t *ev);
static int get_signal(opal_event_t *ev);
static int dispatch(void);
static opal_event_t* module_evtimer_new(opal_event_callback_fn_t cbfunc, void *cbdata);
static int dispatch(opal_event_base_t *evbase);
static opal_event_t* module_evtimer_new(opal_event_base_t *evbase,
opal_event_callback_fn_t cbfunc,
void *cbdata);
static int module_evtimer_add(opal_event_t *ev, const struct timeval *tv);
static void module_evtimer_set(opal_event_t *ev, opal_event_callback_fn_t cbfunc, void *cbdata);
static void module_evtimer_set(opal_event_base_t *evbase,
opal_event_t *ev,
opal_event_callback_fn_t cbfunc, void *cbdata);
static int module_evtimer_del(opal_event_t *ev);
static int module_evtimer_pending(opal_event_t *ev, struct timeval *tv);
static int module_evtimer_initialized(opal_event_t *ev);
static int module_signal_add(opal_event_t *ev, struct timeval *tv);
static int module_signal_set(opal_event_t *ev, int fd, opal_event_callback_fn_t cbfunc, void *cbdata);
static int module_signal_set(opal_event_base_t *evbase,
opal_event_t *ev, int fd,
opal_event_callback_fn_t cbfunc, void *cbdata);
static int module_signal_del(opal_event_t *ev);
static int module_signal_pending(opal_event_t *ev, struct timeval *tv);
static int module_signal_initialized(opal_event_t *ev);
static int loop(int flags);
static int loop(opal_event_base_t *evbase, int flags);
const opal_event_module_t opal_event_libevent207 = {
constructor,
destructor,
construct_base,
destruct_base,
init,
finalize,
set_debug_output,
enable,
disable,
restart,
set,
add,
del,
@ -156,6 +157,7 @@ static const struct eventop *eventops[] = {
};
static int debug_output = -1;
static struct event_config *config=NULL;
static void constructor(opal_event_t *ev)
{
@ -166,16 +168,34 @@ static void constructor(opal_event_t *ev)
static void destructor(opal_event_t *ev)
{
if (NULL != ev->event) {
free(ev->event);
event_free(ev->event);
}
}
static void construct_base(opal_event_base_t *evbase)
{
struct event_base *base;
base = event_base_new_with_config(config);
if (NULL == base) {
/* there is no backend method that does what we want */
opal_output(0, "No event method available");
evbase->base = NULL;
return;
}
evbase->base = (void*)base;
}
static void destruct_base(opal_event_base_t *evbase)
{
if (NULL != evbase->base) {
event_base_free(evbase->base);
evbase->base = NULL;
}
}
static int init(void)
{
if(opal_event_inited++ != 0) {
return OPAL_SUCCESS;
}
if (4 < opal_output_get_verbosity(opal_event_base_output)) {
debug_output = opal_output_open(NULL);
event_enable_debug_mode();
@ -209,7 +229,6 @@ static int init(void)
* - ...?
*/
{
struct event_config *config;
char* event_module_include=NULL;
char **modules=NULL, **includes=NULL;
bool dumpit;
@ -283,17 +302,6 @@ static int init(void)
}
opal_argv_free(includes);
opal_argv_free(modules);
current_base = event_base_new_with_config(config);
if (NULL == current_base) {
/* there is no backend method that does what we want */
opal_output(0, "No event method available");
event_config_free(config);
return OPAL_ERR_FATAL;
}
event_config_free(config);
enable();
}
return OPAL_SUCCESS;
@ -301,11 +309,6 @@ static int init(void)
static int finalize(void)
{
OPAL_OUTPUT((debug_output, "event: finalized event library"));
disable();
opal_event_inited--;
return OPAL_SUCCESS;
}
@ -314,34 +317,12 @@ static void set_debug_output(bool output)
event_set_debug_output(output);
}
static int enable(void)
{
OPAL_OUTPUT((debug_output, "event: event library enabled"));
opal_event_enabled = true;
return OPAL_SUCCESS;
}
static int disable(void)
{
OPAL_OUTPUT((debug_output, "event: event library disabled"));
opal_event_enabled = false;
return OPAL_SUCCESS;
}
static int restart(void)
{
enable();
return (OPAL_SUCCESS);
}
static int set(opal_event_t *ev, int fd, short events,
static int set(opal_event_base_t *evbase,
opal_event_t *ev, int fd, short events,
opal_event_callback_fn_t cbfunc, void *arg)
{
OPAL_OUTPUT((debug_output, "event: event set called"));
return event_assign(ev->event, current_base, fd, events, cbfunc, arg);
return event_assign(ev->event, evbase->base, fd, events, cbfunc, arg);
}
static int add(opal_event_t *ev, const struct timeval *tv)
@ -358,12 +339,13 @@ int del(opal_event_t *ev)
/**** TIMER APIs ****/
static opal_event_t* module_evtimer_new(opal_event_callback_fn_t cbfunc, void *cbdata)
static opal_event_t* module_evtimer_new(opal_event_base_t *evbase,
opal_event_callback_fn_t cbfunc, void *cbdata)
{
opal_event_t *tmp;
tmp = OBJ_NEW(opal_event_t);
event_assign(tmp->event, current_base, -1, 0, cbfunc, cbdata);
event_assign(tmp->event, evbase->base, -1, 0, cbfunc, cbdata);
OPAL_OUTPUT((debug_output, "event: timer event created"));
return tmp;
@ -375,10 +357,12 @@ static int module_evtimer_add(opal_event_t *ev, const struct timeval *tv)
return event_add(ev->event, tv);
}
static void module_evtimer_set(opal_event_t *ev, opal_event_callback_fn_t cbfunc, void *cbdata)
static void module_evtimer_set(opal_event_base_t *evbase,
opal_event_t *ev,
opal_event_callback_fn_t cbfunc, void *cbdata)
{
OPAL_OUTPUT((debug_output, "event: timer event set"));
event_assign(ev->event, current_base, -1, 0, cbfunc, cbdata);
event_assign(ev->event, evbase->base, -1, 0, cbfunc, cbdata);
}
static int module_evtimer_del(opal_event_t *ev)
@ -405,10 +389,11 @@ static int module_signal_add(opal_event_t *ev, struct timeval *tv)
return event_add(ev->event, tv);
}
static int module_signal_set(opal_event_t *ev, int fd, opal_event_callback_fn_t cbfunc, void *cbdata)
static int module_signal_set(opal_event_base_t *evbase,
opal_event_t *ev, int fd, opal_event_callback_fn_t cbfunc, void *cbdata)
{
OPAL_OUTPUT((debug_output, "event: signal event set"));
return event_assign(ev->event, current_base, fd, EV_SIGNAL|EV_PERSIST, cbfunc, cbdata);
return event_assign(ev->event, evbase->base, fd, EV_SIGNAL|EV_PERSIST, cbfunc, cbdata);
}
static int module_signal_del(opal_event_t *ev)
@ -432,19 +417,19 @@ static int get_signal(opal_event_t *ev)
return event_get_signal(ev->event);
}
static int loop(int flags)
static int loop(opal_event_base_t *evbase, int flags)
{
int rc;
OPAL_OUTPUT((debug_output, "event: looping event library"));
rc = event_base_loop(current_base, flags);
rc = event_base_loop(evbase->base, flags);
assert(rc >= 0);
return rc;
}
static int dispatch(void)
static int dispatch(opal_event_base_t *evbase)
{
OPAL_OUTPUT((debug_output, "event: dispatching event library"));
return event_base_loop(current_base, 0);
return event_base_loop(evbase->base, 0);
}

View File

@ -186,7 +186,7 @@ opal_progress(void)
event_progress_last_time = (num_event_users > 0) ?
now - event_progress_delta : now;
events += opal_event.loop(opal_progress_event_flag);
events += opal_event.loop(opal_event_base, opal_progress_event_flag);
}
#else /* OPAL_PROGRESS_USE_TIMERS */
@ -195,7 +195,7 @@ opal_progress(void)
if (OPAL_THREAD_ADD32(&event_progress_counter, -1) <= 0 ) {
event_progress_counter =
(num_event_users > 0) ? 0 : event_progress_delta;
events += opal_event.loop(opal_progress_event_flag);
events += opal_event.loop(opal_event_base, opal_progress_event_flag);
}
#endif /* OPAL_PROGRESS_USE_TIMERS */

View File

@ -156,7 +156,7 @@ void init_before_spawn(orte_job_t *jdata)
attach_fifo);
free(attach_fifo);
fifo_active = true;
opal_event.set(&attach, attach_fd, OPAL_EV_READ, attach_debugger, NULL);
opal_event.set(opal_event_base, &attach, attach_fd, OPAL_EV_READ, attach_debugger, NULL);
opal_event.add(&attach, 0);
}
return;

View File

@ -512,7 +512,7 @@ static void errmgr_autor_process_fault_app(orte_job_t *jdata,
if( !autor_timer_active ) {
autor_timer_active = true;
opal_event.evtimer_set(autor_timer_event, errmgr_autor_recover_processes, NULL);
opal_event.evtimer_set(opal_event_base, autor_timer_event, errmgr_autor_recover_processes, NULL);
soon.tv_sec = mca_errmgr_hnp_component.autor_recovery_delay;
soon.tv_usec = 0;
opal_event.evtimer_add(autor_timer_event, &soon);

View File

@ -99,28 +99,28 @@ int orte_ess_base_orted_setup(char **hosts)
#ifndef __WINDOWS__
/* setup callback for SIGPIPE */
OBJ_CONSTRUCT(&epipe_handler, opal_event_t);
opal_event.signal_set(&epipe_handler, SIGPIPE,
opal_event.signal_set(opal_event_base, &epipe_handler, SIGPIPE,
epipe_signal_callback, &epipe_handler);
opal_event.signal_add(&epipe_handler, NULL);
/* Set signal handlers to catch kill signals so we can properly clean up
* after ourselves.
*/
OBJ_CONSTRUCT(&term_handler, opal_event_t);
opal_event.set(&term_handler, SIGTERM, OPAL_EV_SIGNAL,
opal_event.set(opal_event_base, &term_handler, SIGTERM, OPAL_EV_SIGNAL,
shutdown_signal, NULL);
opal_event.add(&term_handler, NULL);
OBJ_CONSTRUCT(&int_handler, opal_event_t);
opal_event.set(&int_handler, SIGINT, OPAL_EV_SIGNAL,
opal_event.set(opal_event_base, &int_handler, SIGINT, OPAL_EV_SIGNAL,
shutdown_signal, NULL);
opal_event.add(&int_handler, NULL);
/** setup callbacks for signals we should ignore */
OBJ_CONSTRUCT(&sigusr1_handler, opal_event_t);
opal_event.signal_set(&sigusr1_handler, SIGUSR1,
opal_event.signal_set(opal_event_base, &sigusr1_handler, SIGUSR1,
signal_callback, &sigusr1_handler);
opal_event.signal_add(&sigusr1_handler, NULL);
OBJ_CONSTRUCT(&sigusr2_handler, opal_event_t);
opal_event.signal_set(&sigusr2_handler, SIGUSR2,
opal_event.signal_set(opal_event_base, &sigusr2_handler, SIGUSR2,
signal_callback, &sigusr2_handler);
opal_event.signal_add(&sigusr2_handler, NULL);
#endif /* __WINDOWS__ */

View File

@ -149,7 +149,7 @@ static int rte_init(void)
#ifndef __WINDOWS__
/* setup callback for SIGPIPE */
OBJ_CONSTRUCT(&epipe_handler, opal_event_t);
opal_event.signal_set(&epipe_handler, SIGPIPE,
opal_event.signal_set(opal_event_base, &epipe_handler, SIGPIPE,
epipe_signal_callback, &epipe_handler);
opal_event.signal_add(&epipe_handler, NULL);
/** setup callbacks for abort signals - from this point
@ -157,30 +157,30 @@ static int rte_init(void)
* to cleanup
*/
OBJ_CONSTRUCT(&term_handler, opal_event_t);
opal_event.signal_set(&term_handler, SIGTERM,
opal_event.signal_set(opal_event_base, &term_handler, SIGTERM,
abort_signal_callback, &term_handler);
opal_event.signal_add(&term_handler, NULL);
OBJ_CONSTRUCT(&int_handler, opal_event_t);
opal_event.signal_set(&int_handler, SIGINT,
opal_event.signal_set(opal_event_base, &int_handler, SIGINT,
abort_signal_callback, &int_handler);
opal_event.signal_add(&int_handler, NULL);
/** setup callbacks for signals we should foward */
OBJ_CONSTRUCT(&sigusr1_handler, opal_event_t);
opal_event.signal_set(&sigusr1_handler, SIGUSR1,
opal_event.signal_set(opal_event_base, &sigusr1_handler, SIGUSR1,
signal_forward_callback, &sigusr1_handler);
opal_event.signal_add(&sigusr1_handler, NULL);
OBJ_CONSTRUCT(&sigusr2_handler, opal_event_t);
opal_event.signal_set(&sigusr2_handler, SIGUSR2,
opal_event.signal_set(opal_event_base, &sigusr2_handler, SIGUSR2,
signal_forward_callback, &sigusr2_handler);
opal_event.signal_add(&sigusr2_handler, NULL);
if (orte_forward_job_control) {
OBJ_CONSTRUCT(&sigtstp_handler, opal_event_t);
opal_event.signal_set(&sigtstp_handler, SIGTSTP,
opal_event.signal_set(opal_event_base, &sigtstp_handler, SIGTSTP,
signal_forward_callback, &sigtstp_handler);
opal_event.signal_add(&sigtstp_handler, NULL);
OBJ_CONSTRUCT(&sigcont_handler, opal_event_t);
opal_event.signal_set(&sigcont_handler, SIGCONT,
opal_event.signal_set(opal_event_base, &sigcont_handler, SIGCONT,
signal_forward_callback, &sigcont_handler);
opal_event.signal_add(&sigcont_handler, NULL);
}

View File

@ -138,7 +138,8 @@ typedef struct orte_iof_base_t orte_iof_base_t;
ep->tag = (tg); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
opal_event.set(&(ep->wev->ev), ep->wev->fd, \
opal_event.set(opal_event_base, \
&(ep->wev->ev), ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep); \
} \
@ -172,7 +173,8 @@ typedef struct orte_iof_base_t orte_iof_base_t;
*(rv) = rev; \
rev->file = strdup(__FILE__); \
rev->line = __LINE__; \
opal_event.set(&rev->ev, (fid), \
opal_event.set(opal_event_base, \
&rev->ev, (fid), \
OPAL_EV_READ, \
(cbfunc), rev); \
if ((actv)) { \
@ -193,7 +195,8 @@ typedef struct orte_iof_base_t orte_iof_base_t;
ep->tag = (tg); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
opal_event.set(&(ep->wev->ev), ep->wev->fd, \
opal_event.set(opal_event_base, \
&(ep->wev->ev), ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep); \
} \
@ -211,7 +214,8 @@ typedef struct orte_iof_base_t orte_iof_base_t;
rev->name.vpid = (nm)->vpid; \
rev->tag = (tg); \
*(rv) = rev; \
opal_event.set(&rev->ev, (fid), \
opal_event.set(opal_event_base, \
&rev->ev, (fid), \
OPAL_EV_READ, \
(cbfunc), rev); \
if ((actv)) { \

View File

@ -313,7 +313,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* filedescriptor is not a tty, don't worry about it
* and always stay connected.
*/
opal_event.signal_set(&mca_iof_hnp_component.stdinsig,
opal_event.signal_set(opal_event_base, &mca_iof_hnp_component.stdinsig,
SIGCONT, orte_iof_hnp_stdin_cb,
NULL);

View File

@ -484,7 +484,7 @@ mca_oob_tcp_create_connection(const int accepted_fd,
/* wait for receipt of peers process identifier to complete this connection */
event = OBJ_NEW(mca_oob_tcp_event_t);
opal_event.set(&event->event, accepted_fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event.set(opal_event_base, &event->event, accepted_fd, OPAL_EV_READ, mca_oob_tcp_recv_handler, event);
opal_event.add(&event->event, 0);
}
@ -1082,12 +1082,12 @@ mca_oob_tcp_accept_thread_handler(int sd, short flags, void* user)
tv.tv_sec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec;
tv.tv_usec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec;
#ifdef HAVE_PIPE
opal_event.set(&mca_oob_tcp_component.tcp_listen_thread_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp_listen_thread_event,
mca_oob_tcp_component.tcp_connections_pipe[0],
OPAL_EV_READ,
mca_oob_tcp_accept_thread_handler, NULL);
#else
opal_event.set(&mca_oob_tcp_component.tcp_listen_thread_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp_listen_thread_event,
-1, 0,
mca_oob_tcp_accept_thread_handler, NULL);
#endif
@ -1118,12 +1118,12 @@ mca_oob_tcp_create_listen_thread(void)
tv.tv_sec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_sec;
tv.tv_usec = mca_oob_tcp_component.tcp_listen_thread_tv.tv_usec;
#ifdef HAVE_PIPE
opal_event.set(&mca_oob_tcp_component.tcp_listen_thread_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp_listen_thread_event,
mca_oob_tcp_component.tcp_connections_pipe[0],
OPAL_EV_READ,
mca_oob_tcp_accept_thread_handler, NULL);
#else
opal_event.set(&mca_oob_tcp_component.tcp_listen_thread_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp_listen_thread_event,
-1, 0,
mca_oob_tcp_accept_thread_handler, NULL);
#endif
@ -1588,7 +1588,7 @@ int mca_oob_tcp_init(void)
mca_oob_tcp_component.tcp_listen_thread_sds[idx] =
mca_oob_tcp_component.tcp_listen_sd;
} else {
opal_event.set(&mca_oob_tcp_component.tcp_recv_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp_recv_event,
mca_oob_tcp_component.tcp_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
@ -1620,7 +1620,7 @@ int mca_oob_tcp_init(void)
mca_oob_tcp_component.tcp_listen_thread_sds[idx] =
mca_oob_tcp_component.tcp6_listen_sd;
} else {
opal_event.set(&mca_oob_tcp_component.tcp6_recv_event,
opal_event.set(opal_event_base, &mca_oob_tcp_component.tcp6_recv_event,
mca_oob_tcp_component.tcp6_listen_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_recv_handler,
@ -1671,8 +1671,9 @@ int mca_oob_tcp_fini(void)
void *data;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
#if 0
opal_event.disable(); /* disable event processing */
#endif
/* shut down the listening system */
if (OOB_TCP_LISTEN_THREAD == mca_oob_tcp_component.tcp_listen_type) {
mca_oob_tcp_component.tcp_shutdown = true;
@ -1720,7 +1721,6 @@ int mca_oob_tcp_fini(void)
OBJ_RELEASE(event);
}
opal_event.enable();
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return ORTE_SUCCESS;
}
@ -1976,20 +1976,24 @@ int mca_oob_tcp_ft_event(int state) {
opal_list_item_t *item;
if(OPAL_CRS_CHECKPOINT == state) {
#if 0
/*
* Disable event processing while we are working
*/
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_event.disable();
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
#endif
}
else if(OPAL_CRS_CONTINUE == state) {
#if 0
/*
* Resume event processing
*/
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
opal_event.enable();
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
#endif
}
else if(OPAL_CRS_RESTART == state) {
/*
@ -2021,7 +2025,9 @@ int mca_oob_tcp_ft_event(int state) {
/*
* Resume event processing
*/
#if 0
opal_event.enable();
#endif
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}
else if(OPAL_CRS_TERM == state ) {

View File

@ -83,7 +83,7 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
if(opal_event_progress_thread()) {
int rc;
OPAL_THREAD_UNLOCK(&msg->msg_lock);
rc = opal_event.loop(OPAL_EVLOOP_ONCE);
rc = opal_event.loop(opal_event_base, OPAL_EVLOOP_ONCE);
assert(rc >= 0);
OPAL_THREAD_LOCK(&msg->msg_lock);
} else {
@ -112,7 +112,7 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
library, as EVLOOP_ONCE may block for a short period of
time. */
opal_progress();
opal_event.loop(OPAL_EVLOOP_NONBLOCK);
opal_event.loop(opal_event_base, OPAL_EVLOOP_NONBLOCK);
OPAL_CR_TEST_CHECKPOINT_READY();
}
#endif
@ -146,7 +146,7 @@ int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* rc, struct timespec*
if(opal_event_progress_thread()) {
int rc;
OPAL_THREAD_UNLOCK(&msg->msg_lock);
rc = opal_event.loop(OPAL_EVLOOP_ONCE);
rc = opal_event.loop(opal_event_base, OPAL_EVLOOP_ONCE);
assert(rc >= 0);
OPAL_THREAD_LOCK(&msg->msg_lock);
} else {
@ -162,7 +162,7 @@ int mca_oob_tcp_msg_timedwait(mca_oob_tcp_msg_t* msg, int* rc, struct timespec*
((uint32_t)tv.tv_sec == secs && (uint32_t)tv.tv_usec < usecs))) {
/* see comment in tcp_msg_wait, above */
opal_progress();
opal_event.loop(OPAL_EVLOOP_NONBLOCK);
opal_event.loop(opal_event_base, OPAL_EVLOOP_NONBLOCK);
gettimeofday(&tv,NULL);
}
#endif

View File

@ -106,7 +106,7 @@ static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
peer->peer_sd = -1;
peer->peer_current_af = AF_UNSPEC;
OBJ_CONSTRUCT(&peer->peer_timer_event, opal_event_t);
opal_event.evtimer_set(&peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
opal_event.evtimer_set(opal_event_base, &peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
}
/*
@ -139,13 +139,13 @@ static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
OBJ_CONSTRUCT(&peer->peer_send_event, opal_event_t);
if (peer->peer_sd >= 0) {
opal_event.set(
opal_event.set(opal_event_base,
&peer->peer_recv_event,
peer->peer_sd,
OPAL_EV_READ|OPAL_EV_PERSIST,
mca_oob_tcp_peer_recv_handler,
peer);
opal_event.set(
opal_event.set(opal_event_base,
&peer->peer_send_event,
peer->peer_sd,
OPAL_EV_WRITE|OPAL_EV_PERSIST,

View File

@ -188,7 +188,7 @@ mca_oob_tcp_ping(const orte_process_name_t* name,
/* Ignore SIGPIPE in the write -- determine success or failure in
the ping by looking at the return code from write() */
OBJ_CONSTRUCT(&sigpipe_handler, opal_event_t);
opal_event.signal_set(&sigpipe_handler, SIGPIPE,
opal_event.signal_set(opal_event_base, &sigpipe_handler, SIGPIPE,
noop, &sigpipe_handler);
opal_event.signal_add(&sigpipe_handler, NULL);
#endif

View File

@ -92,7 +92,7 @@ int orte_plm_base_comm_start(void)
#endif
OBJ_CONSTRUCT(&ready, opal_event_t);
opal_event.set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL);
opal_event.set(opal_event_base, &ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL);
opal_event.add(&ready, 0);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,

View File

@ -1484,8 +1484,9 @@ static int orte_plm_process_launch_threaded(orte_jobid_t jobid)
if( opal_event_progress_thread() ) {
stack.rc = orte_plm_process_launch( jobid );
} else {
opal_evtimer_set(&event, orte_plm_process_launch_cb, &stack);
opal_evtimer_add(&event, &tv);
OBJ_CONSTRUCT(&event, opal_event_t);
opal_event.evtimer_set(opal_event_base, &event, orte_plm_process_launch_cb, &stack);
opal_event.evtimer_add(&event, &tv);
OPAL_THREAD_LOCK(&stack.mutex);
while (stack.complete == false) {

View File

@ -1569,7 +1569,8 @@ static int orte_plm_rsh_launch_threaded(orte_jobid_t jobid)
if( opal_event_progress_thread() ) {
stack.rc = orte_plm_rsh_launch( jobid );
} else {
opal_evtimer_set(&event, orte_plm_rsh_launch_cb, &stack);
OBJ_CONSTRUCT(&event, opal_event_t);
opal_evtimer_set(opal_event_base, &event, orte_plm_rsh_launch_cb, &stack);
opal_evtimer_add(&event, &tv);
OPAL_THREAD_LOCK(&stack.mutex);

View File

@ -478,7 +478,8 @@ static int orte_plm_rshd_launch_threaded(orte_jobid_t jobid)
if( opal_event_progress_thread() ) {
stack.rc = orte_plm_rshd_launch( jobid );
} else {
opal_evtimer_set(&event, orte_plm_rshd_launch_cb, &stack);
OBJ_CONSTRUCT(&event, opal_event_t);
opal_evtimer_set(opal_event_base, &event, orte_plm_rshd_launch_cb, &stack);
opal_evtimer_add(&event, &tv);
OPAL_THREAD_LOCK(&stack.mutex);

View File

@ -1034,7 +1034,8 @@ static int orte_plm_submit_launch_threaded(orte_jobid_t jobid)
if( opal_event_progress_thread() ) {
stack.rc = orte_plm_submit_launch( jobid );
} else {
opal_evtimer_set(&event, orte_plm_submit_launch_cb, &stack);