1
1
This commit was SVN r24031.
Этот коммит содержится в:
Ralph Castain 2010-11-10 19:36:44 +00:00
родитель e5e301b564
Коммит 22e40d92a0
2 изменённых файлов: 32 добавлений и 4 удалений

Просмотреть файл

@ -55,6 +55,7 @@ typedef struct {
opal_list_t channels;
rmcast_base_channel_t *my_output_channel;
rmcast_base_channel_t *my_input_channel;
int recv_ctl_pipe[2];
int process_ctl_pipe[2];
opal_list_t msg_list;
opal_event_base_t *event_base;

Просмотреть файл

@ -40,6 +40,11 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (pipe(orte_rmcast_base.recv_ctl_pipe) < 0) {
opal_output(0, "%s Cannot open recv thread ctl pipe",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_OUT_OF_RESOURCE;
}
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
ORTE_ERROR_LOG(rc);
@ -77,6 +82,8 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
void orte_rmcast_base_stop_threads(void)
{
char byte='s';
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: stopping recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -85,7 +92,7 @@ void orte_rmcast_base_stop_threads(void)
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
orte_rmcast_base.recv_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
opal_thread_kill(&orte_rmcast_base.recv_thread, SIGKILL);
opal_fd_write(orte_rmcast_base.recv_ctl_pipe[1], 1, &byte);
opal_thread_join(&orte_rmcast_base.recv_thread, NULL);
}
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
@ -95,7 +102,7 @@ void orte_rmcast_base_stop_threads(void)
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
orte_rmcast_base.recv_process_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
opal_thread_kill(&orte_rmcast_base.recv_process, SIGKILL);
opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte);
opal_thread_join(&orte_rmcast_base.recv_process, NULL);
}
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
@ -136,6 +143,11 @@ static void* rcv_processing_thread(opal_object_t *obj)
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return OPAL_THREAD_CANCELLED;
}
/* check to see if this is a shutdown note */
if (byte == 's') {
return OPAL_THREAD_CANCELLED;
}
/* get a message off the list */
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
if (NULL == (msg = (orte_mcast_msg_event_t*)opal_list_remove_first(&orte_rmcast_base.msg_list))) {
@ -336,11 +348,18 @@ static void* rcv_processing_thread(opal_object_t *obj)
iovec_array = NULL;
iovec_count = 0;
}
OBJ_RELEASE(msg);
if (NULL != msg) {
OBJ_RELEASE(msg);
}
}
}
static opal_event_t stop_event;
static void stop_handler(int sd, short flags, void* cbdata)
{
opal_event_del(&stop_event);
return;
}
static void* rcv_progress_thread(opal_object_t *obj)
{
@ -349,6 +368,14 @@ static void* rcv_progress_thread(opal_object_t *obj)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: recv thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* define an event that will be used to kick us out of a blocking
* situation when we want to exit
*/
opal_event_set(orte_rmcast_base.event_base, &stop_event,
orte_rmcast_base.recv_ctl_pipe[0], OPAL_EV_READ, stop_handler, NULL);
opal_event_add(&stop_event, 0);
while (1) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.stop) {