1
1

Checkpoint the threading support for multicast - will be revised shortly, but this version currently works.

This commit was SVN r24117.
Этот коммит содержится в:
Ralph Castain 2010-11-30 17:30:16 +00:00
родитель 0465605a9c
Коммит d20c023348
5 изменённых файлов: 39 добавлений и 58 удалений

Просмотреть файл

@ -56,12 +56,11 @@ typedef struct {
rmcast_base_channel_t *my_output_channel; rmcast_base_channel_t *my_output_channel;
rmcast_base_channel_t *my_input_channel; rmcast_base_channel_t *my_input_channel;
bool enable_progress_thread; bool enable_progress_thread;
int recv_ctl_pipe[2];
int process_ctl_pipe[2];
opal_list_t msg_list; opal_list_t msg_list;
opal_event_base_t *event_base; opal_event_base_t *event_base;
opal_thread_t recv_thread; opal_thread_t recv_thread;
orte_thread_ctl_t recv_ctl; orte_thread_ctl_t recv_ctl;
int recv_pipe[2];
opal_thread_t recv_process; opal_thread_t recv_process;
orte_thread_ctl_t recv_process_ctl; orte_thread_ctl_t recv_process_ctl;
} orte_rmcast_base_t; } orte_rmcast_base_t;

Просмотреть файл

@ -142,23 +142,23 @@ typedef struct {
} rmcast_send_log_t; } rmcast_send_log_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t); ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \ #define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \
do { \ do { \
orte_mcast_msg_event_t *mev; \ char byte='a'; \
char byte='a'; \ orte_mcast_msg_event_t *mev; \
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \ OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \
"defining mcast msg event: %s %d", \ "defining mcast msg event: %s %d", \
__FILE__, __LINE__)); \ __FILE__, __LINE__)); \
mev = OBJ_NEW(orte_mcast_msg_event_t); \ mev = OBJ_NEW(orte_mcast_msg_event_t); \
opal_dss.load(mev->buf, (dt), (sz)); \ opal_dss.load(mev->buf, (dt), (sz)); \
if (orte_rmcast_base.enable_progress_thread) { \ if (orte_rmcast_base.enable_progress_thread) { \
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); \ ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); \
opal_list_append(&orte_rmcast_base.msg_list, &mev->super); \ opal_list_append(&orte_rmcast_base.msg_list, &mev->super); \
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); \ ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); \
opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte); \ opal_fd_write(orte_rmcast_base.recv_pipe[1], 1, &byte); \
} else { \ } else { \
orte_rmcast_base_process_msg(mev); \ orte_rmcast_base_process_msg(mev); \
} \ } \
} while(0); } while(0);

Просмотреть файл

@ -131,8 +131,8 @@ int orte_rmcast_base_open(void)
/* whether or not to use progress thread */ /* whether or not to use progress thread */
mca_base_param_reg_int_name("rmcast", "enable_progress_thread", mca_base_param_reg_int_name("rmcast", "enable_progress_thread",
"Whether or not to enable progress thread (default: false)", "Whether or not to enable progress thread (default: true)",
false, false, (int)false, &value); false, false, (int)true, &value);
orte_rmcast_base.enable_progress_thread = OPAL_INT_TO_BOOL(value); orte_rmcast_base.enable_progress_thread = OPAL_INT_TO_BOOL(value);
if (orte_rmcast_base.enable_progress_thread) { if (orte_rmcast_base.enable_progress_thread) {

Просмотреть файл

@ -44,11 +44,9 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv thread", "%s rmcast:base: starting recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (pipe(orte_rmcast_base.recv_ctl_pipe) < 0) { /* set the wakeup pipe to target the rmcast event base */
opal_output(0, "%s Cannot open recv thread ctl pipe", orte_rmcast_base.recv_ctl.wakeup_pipe = orte_rmcast_base.event_base->wakeup_pipe[1];
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* start the thread */
return ORTE_ERR_OUT_OF_RESOURCE;
}
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread; orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) { if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -64,11 +62,19 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv processing thread", "%s rmcast:base: starting recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (pipe(orte_rmcast_base.process_ctl_pipe) < 0) { /* set the wakeup pipe to target the rmcast event base */
opal_output(0, "%s Cannot open processing thread ctl pipe", orte_rmcast_base.recv_process_ctl.wakeup_pipe = orte_rmcast_base.event_base->wakeup_pipe[1];
/* setup a pipe that we will use to signal the thread that a message
* is waiting to be processed - don't define an event for it
*/
if (pipe(orte_rmcast_base.recv_pipe) < 0) {
opal_output(0, "%s Cannot open recv processing thread ctl pipe",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return ORTE_ERR_OUT_OF_RESOURCE; return ORTE_ERR_OUT_OF_RESOURCE;
} }
/* start the thread - we will send it a specific character when
* we want it to stop
*/
orte_rmcast_base.recv_process.t_run = rcv_processing_thread; orte_rmcast_base.recv_process.t_run = rcv_processing_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_process))) { if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_process))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -100,17 +106,13 @@ void orte_rmcast_base_stop_threads(void)
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
orte_rmcast_base.recv_ctl.stop = true; orte_rmcast_base.recv_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl); ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_ctl);
opal_fd_write(orte_rmcast_base.recv_ctl_pipe[1], 1, &byte);
opal_thread_join(&orte_rmcast_base.recv_thread, NULL); opal_thread_join(&orte_rmcast_base.recv_thread, NULL);
} }
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: stopping recv processing thread", "%s rmcast:base: stopping recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_rmcast_base.recv_process_ctl.running) { if (orte_rmcast_base.recv_process_ctl.running) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); opal_fd_write(orte_rmcast_base.recv_pipe[1], 1, &byte);
orte_rmcast_base.recv_process_ctl.stop = true;
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte);
opal_thread_join(&orte_rmcast_base.recv_process, NULL); opal_thread_join(&orte_rmcast_base.recv_process, NULL);
} }
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
@ -338,21 +340,14 @@ static void* rcv_processing_thread(opal_object_t *obj)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
while (1) { while (1) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);
if (orte_rmcast_base.recv_process_ctl.stop) {
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
return OPAL_THREAD_CANCELLED;
}
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
/* block here until a trigger arrives */ /* block here until a trigger arrives */
if (0 > (rc = opal_fd_read(orte_rmcast_base.process_ctl_pipe[0], 1, &byte))) { if (0 > (rc = opal_fd_read(orte_rmcast_base.recv_pipe[0], 1, &byte))) {
/* if something bad happened, punt */ /* if something bad happened, punt */
opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); opal_output(0, "%s PUNTING THREAD", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return OPAL_THREAD_CANCELLED; return OPAL_THREAD_CANCELLED;
} }
/* check to see if this is a shutdown note */ /* check to see if we were told to stop */
if (byte == 's') { if ('s' == byte) {
return OPAL_THREAD_CANCELLED; return OPAL_THREAD_CANCELLED;
} }
@ -374,13 +369,6 @@ static void* rcv_processing_thread(opal_object_t *obj)
} }
} }
static opal_event_t stop_event;
static void stop_handler(int sd, short flags, void* cbdata)
{
opal_event_del(&stop_event);
return;
}
static void* rcv_progress_thread(opal_object_t *obj) static void* rcv_progress_thread(opal_object_t *obj)
{ {
int events=0; int events=0;
@ -389,13 +377,6 @@ static void* rcv_progress_thread(opal_object_t *obj)
"%s rmcast:base: recv thread operational", "%s rmcast:base: recv thread operational",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* define an event that will be used to kick us out of a blocking
* situation when we want to exit
*/
opal_event_set(orte_rmcast_base.event_base, &stop_event,
orte_rmcast_base.recv_ctl_pipe[0], OPAL_EV_READ, stop_handler, NULL);
opal_event_add(&stop_event, 0);
while (1) { while (1) {
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl); ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_ctl);
if (orte_rmcast_base.recv_ctl.stop) { if (orte_rmcast_base.recv_ctl.stop) {

Просмотреть файл

@ -741,7 +741,8 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
"%s setup:channel activating recv event on fd %d", "%s setup:channel activating recv event on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv)); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv));
opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); opal_event_set(orte_rmcast_base.event_base, &chan->recv_ev,
chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0); opal_event_add(&chan->recv_ev, 0);
} }