1
1

Ensure clean termination of threads and tcp multicast

This commit was SVN r24134.
Этот коммит содержится в:
Ralph Castain 2010-12-02 00:23:42 +00:00
родитель 94d4aa7253
Коммит f9ffff59f8
2 изменённых файлов: 40 добавлений и 3 удалений

Просмотреть файл

@ -52,9 +52,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
ORTE_ERROR_LOG(rc);
orte_rmcast_base.recv_ctl.running = false;
return rc;
}
orte_rmcast_base.recv_ctl.running = true;
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread started",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -82,9 +82,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
orte_rmcast_base.recv_process.t_run = rcv_processing_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_process))) {
ORTE_ERROR_LOG(rc);
orte_rmcast_base.recv_process_ctl.running = false;
return rc;
}
orte_rmcast_base.recv_process_ctl.running = true;
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv processing thread started",
@ -106,19 +106,27 @@ void orte_rmcast_base_stop_threads(void)
"%s rmcast:base: stopping recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if the thread is active, stop it */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.running) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
orte_rmcast_base.recv_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
opal_thread_join(&orte_rmcast_base.recv_thread, NULL);
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: stopping recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
if (orte_rmcast_base.recv_process_ctl.running) {
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
opal_fd_write(orte_rmcast_base.recv_pipe[1], sizeof(opal_buffer_t*), &msg);
opal_thread_join(&orte_rmcast_base.recv_process, NULL);
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: all threads stopped",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -342,21 +350,36 @@ static void* rcv_processing_thread(opal_object_t *obj)
{
opal_buffer_t *msg;
int rc;
struct timespec tp={0, 10};
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv processing thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
orte_rmcast_base.recv_process_ctl.running = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
while (1) {
/* block here until a trigger arrives */
if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0],
sizeof(opal_buffer_t*), &msg))) {
/* if something bad happened, punt */
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
orte_rmcast_base.recv_process_ctl.running = false;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
/* give a little delay to ensure the main thread gets into
* opal_thread_join before we exit
*/
nanosleep(&tp, NULL);
return OPAL_THREAD_CANCELLED;
}
/* check to see if we were told to stop */
if (NULL == msg) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
orte_rmcast_base.recv_process_ctl.running = false;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
return OPAL_THREAD_CANCELLED;
}
@ -369,14 +392,25 @@ static void* rcv_processing_thread(opal_object_t *obj)
static void* rcv_progress_thread(opal_object_t *obj)
{
struct timespec tp={0, 10};
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
orte_rmcast_base.recv_ctl.running = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
while (1) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.stop) {
orte_rmcast_base.recv_ctl.running = false;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
/* give a little delay to ensure the main thread gets into
* opal_thread_join before we exit
*/
nanosleep(&tp, NULL);
return OPAL_THREAD_CANCELLED;
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);

Просмотреть файл

@ -896,6 +896,9 @@ static void relay(int fd, short event, void *cbdata)
item != opal_list_get_end(&orte_local_children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
if (!child->alive) {
continue;
}
if (NULL == child->rml_uri) {
/* race condition */
OPAL_OUTPUT_VERBOSE((7, orte_rmcast_base.rmcast_output,