usnic: move cchecker to OPAL-wide progress thread
There's no longer any need for the usnic BTL to have its own progress thread: it can use the opal_progress_thread() infrastructure. This commit removes the code that started up and shut down the usnic-BTL-specific progress thread and instead adds its events to the OPAL-wide progress thread.

This necessitated a small change in the finalization step. Previously, we would stop the progress thread and then tear down the events. We can no longer stop the progress thread, and if we start tearing down events, shutdowns/hangups will be sent across sockets, potentially firing some of the still-remaining events while some (but not all) of the data structures have been torn down. Chaos ensues.

Instead, queue up a single event whose job is to tear down all the pending events. Since the progress thread only fires one event at a time, that teardown event can remove all the pending events "atomically", without worrying that one of them will fire in the middle of the teardown process.
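To make that finalize-via-event pattern concrete, here is a minimal sketch (not the BTL's actual code; the example_* names are hypothetical, and the "opal/mca/event/event.h" header path is assumed) of a component that attaches to the shared OPAL progress thread and later queues a one-shot teardown event, using only the opal_progress_thread_*() and opal_event_*() calls that appear in the diff below:

/* Minimal sketch -- hypothetical example_* names, not the usNIC BTL's code. */
#include <stdbool.h>
#include <time.h>

#include "opal/constants.h"
#include "opal/mca/event/event.h"               /* assumed OPAL event framework header */
#include "opal/runtime/opal_progress_threads.h"

static opal_event_base_t *example_evbase = NULL;
static volatile bool example_running = false;

/* Runs in the OPAL-wide progress thread.  Because that thread fires only
   one event at a time, everything deleted here is torn down "atomically"
   with respect to the component's other registered events. */
static void example_teardown_cb(int fd, short flags, void *context)
{
    /* opal_event_del() each still-registered event and free the
       associated data structures here ... */
    example_running = false;
}

int example_init(void)
{
    /* Attach to the shared OPAL progress thread instead of spawning
       a private one. */
    example_evbase = opal_progress_thread_init(NULL);
    example_running = true;
    return OPAL_SUCCESS;
}

int example_finalize(void)
{
    opal_event_t ev;

    if (!example_running) {
        return OPAL_SUCCESS;
    }

    /* Queue the teardown as an event rather than deleting events from the
       main thread while the progress thread may still be firing them. */
    opal_event_set(example_evbase, &ev, -1, OPAL_EV_WRITE,
                   example_teardown_cb, NULL);
    opal_event_active(&ev, OPAL_EV_WRITE, 1);

    /* Wait for the teardown event to run to completion. */
    while (example_running) {
        struct timespec tp = { .tv_sec = 0, .tv_nsec = 1000 };
        nanosleep(&tp, NULL);
    }

    opal_progress_thread_finalize(NULL);
    return OPAL_SUCCESS;
}

The spin-wait mirrors the finalize path in the diff: the main thread cannot join a progress thread it no longer owns, so it polls a flag that the teardown callback flips when it has finished.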
This commit is contained in:
parent e77ff6b84e
commit bad508687e
@@ -226,6 +226,9 @@ typedef struct opal_btl_usnic_component_t {
        / API >=v1.1, this is the endpoint.msg_prefix_size (i.e.,
        component.transport_header_len). */
     uint32_t prefix_send_offset;
+
+    /* OPAL async progress event base */
+    opal_event_base_t *opal_evbase;
 } opal_btl_usnic_component_t;
 
 OPAL_MODULE_DECLSPEC extern opal_btl_usnic_component_t mca_btl_usnic_component;
@@ -44,8 +44,7 @@ static opal_list_t ipc_listeners;
    tables for more efficient lookups */
 static opal_list_t pings_pending;
 static opal_list_t ping_results;
-static volatile bool agent_thread_time_to_exit = false;
-static opal_event_base_t *evbase = NULL;
+static volatile bool agent_initialized = false;
 
 
 /*
@@ -195,7 +194,7 @@ static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
 }
 
     /* If the "active" flag is set, then the event is active and the
-       item is on the ipc_listeners list */
+       item is on the udp_port_listeners list */
     if (obj->active) {
         opal_event_del(&obj->event);
         opal_list_remove_item(&udp_port_listeners, &obj->super);
@@ -316,16 +315,6 @@ static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
  * All of the following functions run in agent thread
  **************************************************************************/
 
-/*
- * A dummy function invoked in an event just for the purposes of
- * waking up the agent main thread (in case it was blocked in the
- * event loop with no other events to wake it up).
- */
-static void agent_thread_noop(int fd, short flags, void *context)
-{
-    /* Intentionally a no op */
-}
-
 /*
  * Check to ensure that we expected to receive a ping from this sender
  * on the interface in which it was received (i.e., did the usnic
@@ -697,7 +686,8 @@ static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
     }
 
     /* Create a listening event */
-    opal_event_set(evbase, &udp_listener->event, udp_listener->fd,
+    opal_event_set(mca_btl_usnic_component.opal_evbase,
+                   &udp_listener->event, udp_listener->fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    agent_thread_receive_ping, udp_listener);
     opal_event_add(&udp_listener->event, 0);
@@ -801,7 +791,7 @@ static void agent_thread_send_ping(int fd, short flags, void *context)
     }
 
     /* Set a timer to check if these pings are ACKed */
-    opal_event_set(evbase, &ap->timer,
+    opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
                    -1, 0, agent_thread_send_ping, ap);
     opal_event_add(&ap->timer, &ack_timeout);
     ap->timer_active = true;
@@ -1043,7 +1033,8 @@ static void agent_thread_accept(int fd, short flags, void *context)
     }
 
     /* Add this IPC listener to the event base */
-    opal_event_set(evbase, &listener->event, client_fd,
+    opal_event_set(mca_btl_usnic_component.opal_evbase,
+                   &listener->event, client_fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    agent_thread_ipc_receive, listener);
     opal_event_add(&listener->event, 0);
@@ -1057,24 +1048,54 @@ static void agent_thread_accept(int fd, short flags, void *context)
 }
 
 /*
- * Agent progress thread main entry point
+ * Tear down all active events.
+ *
+ * This is done as an event callback in the agent thread so that there
+ * is no race condition in the teardown.  Specifically: the progress
+ * thread will only fire one event at a time.  Therefore, this one
+ * event can "atomically" delete all the events and data structures
+ * and not have to worry about concurrent access from some event
+ * firing in the middle of the teardown process.
  */
-static void *agent_thread_main(opal_object_t *obj)
+static void agent_thread_finalize(int fd, short flags, void *context)
 {
-    while (!agent_thread_time_to_exit) {
-        opal_event_loop(evbase, OPAL_EVLOOP_ONCE);
+    /* Remove the agent listening event from the opal async event
+       base */
+    opal_event_del(&ipc_event);
+
+    /* Shut down all active udp_port_listeners */
+    agent_udp_port_listener_t *udp_listener, *ulnext;
+    OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
+                           agent_udp_port_listener_t) {
+        OBJ_RELEASE(udp_listener);
     }
 
-    return NULL;
+    /* Destroy the pending pings and ping results */
+    agent_ping_t *request, *pnext;
+    OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
+        opal_list_remove_item(&pings_pending, &request->super);
+        OBJ_RELEASE(request);
+    }
+
+    OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
+        opal_list_remove_item(&ping_results, &request->super);
+        OBJ_RELEASE(request);
+    }
+
+    /* Shut down all active ipc_listeners */
+    agent_ipc_listener_t *ipc_listener, *inext;
+    OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
+                           agent_ipc_listener_t) {
+        OBJ_RELEASE(ipc_listener);
+    }
+
+    agent_initialized = false;
 }
 
 /**************************************************************************
  * All of the following functions run in the main application thread
  **************************************************************************/
 
-static bool agent_initialized = false;
-static opal_thread_t agent_thread;
-
 /*
  * Setup the agent and start its event loop running in a dedicated
  * thread
@@ -1090,9 +1111,6 @@ int opal_btl_usnic_connectivity_agent_init(void)
         return OPAL_SUCCESS;
     }
 
-    /* Create the event base */
-    evbase = opal_event_base_create();
-
     /* Make a struct timeval for use with timer events.  Note that the
        MCA param is expressed in terms of *milli*seconds, but the
        timeval timeout is expressed in terms of *micro*seconds. */
@@ -1154,23 +1172,12 @@ int opal_btl_usnic_connectivity_agent_init(void)
     }
 
     /* Add the socket to the event base */
-    opal_event_set(evbase, &ipc_event, ipc_accept_fd,
+    opal_event_set(mca_btl_usnic_component.opal_evbase,
+                   &ipc_event, ipc_accept_fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    agent_thread_accept, NULL);
     opal_event_add(&ipc_event, 0);
 
-    /* Spawn the agent thread event loop */
-    OBJ_CONSTRUCT(&agent_thread, opal_thread_t);
-    agent_thread.t_run = agent_thread_main;
-    agent_thread.t_arg = NULL;
-    int ret;
-    ret = opal_thread_start(&agent_thread);
-    if (OPAL_SUCCESS != ret) {
-        OPAL_ERROR_LOG(ret);
-        ABORT("Failed to start usNIC agent thread");
-        /* Will not return */
-    }
-
     opal_output_verbose(20, USNIC_OUT,
                         "usNIC connectivity agent initialized");
     agent_initialized = true;
@@ -1182,45 +1189,26 @@ int opal_btl_usnic_connectivity_agent_init(void)
  */
 int opal_btl_usnic_connectivity_agent_finalize(void)
 {
-    agent_initialized = false;
-
     /* Only do this if I have the agent running */
-    if (NULL == evbase) {
+    if (!agent_initialized) {
         return OPAL_SUCCESS;
     }
 
-    /* Shut down the event loop.  Send it a no-op event so that it
-       wakes up and exits the loop. */
+    /* Submit an event to the async thread and tell it to delete all
+       the usNIC events.  See the rationale for doing this in the
+       comment in the agent_thread_finalize() function. */
     opal_event_t ev;
-    agent_thread_time_to_exit = true;
-    opal_event_set(evbase, &ev, -1, OPAL_EV_WRITE, agent_thread_noop, NULL);
+    opal_event_set(mca_btl_usnic_component.opal_evbase,
+                   &ev, -1, OPAL_EV_WRITE, agent_thread_finalize, NULL);
     opal_event_active(&ev, OPAL_EV_WRITE, 1);
-    opal_thread_join(&agent_thread, NULL);
-
-    /* Shut down all active udp_port_listeners */
-    agent_udp_port_listener_t *udp_listener, *ulnext;
-    OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
-                           agent_udp_port_listener_t) {
-        OBJ_RELEASE(udp_listener);
-    }
-
-    /* Destroy the pending pings and ping results */
-    agent_ping_t *request, *pnext;
-    OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
-        opal_list_remove_item(&pings_pending, &request->super);
-        OBJ_RELEASE(request);
-    }
-
-    OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
-        opal_list_remove_item(&ping_results, &request->super);
-        OBJ_RELEASE(request);
-    }
-
-    /* Shut down all active ipc_listeners */
-    agent_ipc_listener_t *ipc_listener, *inext;
-    OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
-                           agent_ipc_listener_t) {
-        OBJ_RELEASE(ipc_listener);
+    /* Wait for the event to fire and complete */
+    while (agent_initialized) {
+        struct timespec tp = {
+            .tv_sec = 0,
+            .tv_nsec = 1000
+        };
+        nanosleep(&tp, NULL);
     }
 
     /* Close the local IPC socket and remove the file */
@@ -60,6 +60,7 @@
 #include "opal/mca/memchecker/base/base.h"
 #include "opal/util/show_help.h"
 #include "opal/constants.h"
+#include "opal/runtime/opal_progress_threads.h"
 
 #if BTL_IN_OPAL
 #include "opal/mca/btl/btl.h"
@@ -210,6 +211,9 @@ static int usnic_component_close(void)
         opal_btl_usnic_connectivity_client_finalize();
         opal_btl_usnic_connectivity_agent_finalize();
     }
+    if (mca_btl_usnic_component.opal_evbase) {
+        opal_progress_thread_finalize(NULL);
+    }
 
     free(mca_btl_usnic_component.usnic_all_modules);
     free(mca_btl_usnic_component.usnic_active_modules);
@@ -907,8 +911,10 @@ static mca_btl_base_module_t** usnic_component_init(int* num_btl_modules,
        checking agent and client. */
     if (mca_btl_usnic_component.num_modules > 0 &&
         mca_btl_usnic_component.connectivity_enabled) {
+        mca_btl_usnic_component.opal_evbase = opal_progress_thread_init(NULL);
         if (OPAL_SUCCESS != opal_btl_usnic_connectivity_agent_init() ||
             OPAL_SUCCESS != opal_btl_usnic_connectivity_client_init()) {
+            opal_progress_thread_finalize(NULL);
             return NULL;
         }
     }
@@ -2101,7 +2101,8 @@ static void init_async_event(opal_btl_usnic_module_t *module)
         return;
     }
 
-    /* Get the fd to receive events on this device */
+    /* Get the fd to receive events on this device.  Keep this in the
+       sync event base (not the async event base) */
     opal_event_set(opal_sync_event_base, &(module->device_async_event), fd,
                    OPAL_EV_READ | OPAL_EV_PERSIST,
                    module_async_event_callback, module);
@@ -212,7 +212,8 @@ int opal_btl_usnic_stats_init(opal_btl_usnic_module_t *module)
     module->stats.timeout.tv_sec = mca_btl_usnic_component.stats_frequency;
     module->stats.timeout.tv_usec = 0;
 
-    opal_event_set(opal_sync_event_base, &(module->stats.timer_event),
+    opal_event_set(mca_btl_usnic_component.opal_evbase,
+                   &(module->stats.timer_event),
                    -1, EV_TIMEOUT | EV_PERSIST,
                    &usnic_stats_callback, module);
     opal_event_add(&(module->stats.timer_event),