From f9ffff59f8ecd10b299d7702c63ebe1a33110776 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 2 Dec 2010 00:23:42 +0000 Subject: [PATCH] Ensure clean termination of threads and tcp multicast This commit was SVN r24134. --- orte/mca/rmcast/base/rmcast_base_threads.c | 40 ++++++++++++++++++++-- orte/mca/rmcast/tcp/rmcast_tcp.c | 3 ++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/orte/mca/rmcast/base/rmcast_base_threads.c b/orte/mca/rmcast/base/rmcast_base_threads.c index 847252e8e8..6aae2ca48b 100644 --- a/orte/mca/rmcast/base/rmcast_base_threads.c +++ b/orte/mca/rmcast/base/rmcast_base_threads.c @@ -52,9 +52,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread) orte_rmcast_base.recv_thread.t_run = rcv_progress_thread; if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) { ORTE_ERROR_LOG(rc); + orte_rmcast_base.recv_ctl.running = false; return rc; } - orte_rmcast_base.recv_ctl.running = true; OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: recv thread started", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -82,9 +82,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread) orte_rmcast_base.recv_process.t_run = rcv_processing_thread; if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_process))) { ORTE_ERROR_LOG(rc); + orte_rmcast_base.recv_process_ctl.running = false; return rc; } - orte_rmcast_base.recv_process_ctl.running = true; OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: recv processing thread started", @@ -106,19 +106,27 @@ void orte_rmcast_base_stop_threads(void) "%s rmcast:base: stopping recv thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); /* if the thread is active, stop it */ + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); if (orte_rmcast_base.recv_ctl.running) { - ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); orte_rmcast_base.recv_ctl.stop = true; ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); opal_thread_join(&orte_rmcast_base.recv_thread, NULL); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); } + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); + OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: stopping recv processing thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); if (orte_rmcast_base.recv_process_ctl.running) { + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); opal_fd_write(orte_rmcast_base.recv_pipe[1], sizeof(opal_buffer_t*), &msg); opal_thread_join(&orte_rmcast_base.recv_process, NULL); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); } + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); + OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: all threads stopped", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -342,21 +350,36 @@ static void* rcv_processing_thread(opal_object_t *obj) { opal_buffer_t *msg; int rc; + struct timespec tp={0, 10}; OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: recv processing thread operational", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); + orte_rmcast_base.recv_process_ctl.running = true; + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); + while (1) { /* block here until a trigger arrives */ if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0], sizeof(opal_buffer_t*), &msg))) { /* if something bad happened, punt */ opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); + orte_rmcast_base.recv_process_ctl.running = false; + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); + /* give a little delay to ensure the main thread gets into + * opal_thread_join before we exit + */ + nanosleep(&tp, NULL); return OPAL_THREAD_CANCELLED; } /* check to see if we were told to stop */ if (NULL == msg) { + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); + orte_rmcast_base.recv_process_ctl.running = false; + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); return OPAL_THREAD_CANCELLED; } @@ -369,14 +392,25 @@ static void* rcv_processing_thread(opal_object_t *obj) static void* rcv_progress_thread(opal_object_t *obj) { + struct timespec tp={0, 10}; + OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, "%s rmcast:base: recv thread operational", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); + orte_rmcast_base.recv_ctl.running = true; + ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); + while (1) { ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); if (orte_rmcast_base.recv_ctl.stop) { + orte_rmcast_base.recv_ctl.running = false; ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); + /* give a little delay to ensure the main thread gets into + * opal_thread_join before we exit + */ + nanosleep(&tp, NULL); return OPAL_THREAD_CANCELLED; } ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); diff --git a/orte/mca/rmcast/tcp/rmcast_tcp.c b/orte/mca/rmcast/tcp/rmcast_tcp.c index 17fb9b1283..0d3e41da71 100644 --- a/orte/mca/rmcast/tcp/rmcast_tcp.c +++ b/orte/mca/rmcast/tcp/rmcast_tcp.c @@ -896,6 +896,9 @@ static void relay(int fd, short event, void *cbdata) item != opal_list_get_end(&orte_local_children); item = opal_list_get_next(item)) { child = (orte_odls_child_t*)item; + if (!child->alive) { + continue; + } if (NULL == child->rml_uri) { /* race condition */ OPAL_OUTPUT_VERBOSE((7, orte_rmcast_base.rmcast_output,