1
1

Extend the rmcast APIs to allow enable/disable of comm, required for clean termination by upper layer users.

Point the recv thread event base to the right place so it can wakeup when required.

Add a new error code for "comm disabled" when attempting to communicate after disabling comm.

This commit was SVN r24129.
Этот коммит содержится в:
Ralph Castain 2010-12-01 13:41:19 +00:00
родитель 9224302c10
Коммит eba65e97f3
6 изменённых файлов: 138 добавлений и 7 удалений

Просмотреть файл

@ -117,7 +117,8 @@ enum {
ORTE_ERR_UNRECOVERABLE = (ORTE_ERR_BASE - 35),
ORTE_ERR_MEM_LIMIT_EXCEEDED = (ORTE_ERR_BASE - 36),
ORTE_ERR_HEARTBEAT_LOST = (ORTE_ERR_BASE - 37),
ORTE_ERR_PROC_STALLED = (ORTE_ERR_BASE - 38)
ORTE_ERR_PROC_STALLED = (ORTE_ERR_BASE - 38),
ORTE_ERR_COMM_DISABLED = (ORTE_ERR_BASE - 39)
};
#define ORTE_ERR_MAX (ORTE_ERR_BASE - 100)

Просмотреть файл

@ -40,10 +40,14 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
return ORTE_SUCCESS;
}
if (rcv_thread) {
if (rcv_thread && !orte_rmcast_base.recv_ctl.running) {
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the update to target the rmcast event base since the
* recv thread will be progressing that event base
*/
orte_rmcast_base.recv_ctl.evbase = orte_rmcast_base.event_base;
/* start the thread */
orte_rmcast_base.recv_thread.t_run = rcv_progress_thread;
if (ORTE_SUCCESS != (rc = opal_thread_start(&orte_rmcast_base.recv_thread))) {
@ -56,7 +60,7 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
if (processing_thread) {
if (processing_thread && !orte_rmcast_base.recv_process_ctl.running) {
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
"%s rmcast:base: starting recv processing thread",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

Просмотреть файл

@ -137,6 +137,17 @@ typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t
typedef int (*orte_rmcast_base_module_query_channel_fn_t)(orte_rmcast_channel_t *output,
orte_rmcast_channel_t *input);
/* disable comm - includes terminating all threads. This is
* required for clean shutdown of codes that use this framework
* as otherwise rmcast can segfault if it is executing a cbfunc
* for a recvd message and the receiver goes away!
*/
typedef void (*orte_rmcast_base_module_disable_comm_fn_t)(void);
/* reverses the effect */
typedef void (*orte_rmcast_base_module_enable_comm_fn_t)(void);
/*
* rmcast component
*/
@ -169,6 +180,8 @@ struct orte_rmcast_base_module_t {
orte_rmcast_base_module_open_channel_fn_t open_channel;
orte_rmcast_base_module_close_channel_fn_t close_channel;
orte_rmcast_base_module_query_channel_fn_t query_channel;
orte_rmcast_base_module_enable_comm_fn_t enable_comm;
orte_rmcast_base_module_disable_comm_fn_t disable_comm;
};
/** Convienence typedef */
typedef struct orte_rmcast_base_module_t orte_rmcast_module_t;

Просмотреть файл

@ -44,6 +44,7 @@
/* LOCAL DATA */
static bool init_completed = false;
static orte_job_t *daemons=NULL;
static bool comm_enabled = false;
/* LOCAL FUNCTIONS */
static void recv_handler(int status, orte_process_name_t* sender,
@ -109,6 +110,10 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel,
static int open_channel(orte_rmcast_channel_t channel, char *name,
char *network, int port, char *interface, uint8_t direction);
static void enable_comm(void);
static void disable_comm(void);
/* Define the module */
orte_rmcast_module_t orte_rmcast_tcp_module = {
@ -125,7 +130,9 @@ orte_rmcast_module_t orte_rmcast_tcp_module = {
orte_rmcast_base_cancel_recv,
open_channel,
orte_rmcast_base_close_channel,
orte_rmcast_base_query
orte_rmcast_base_query,
enable_comm,
disable_comm
};
/* during init, we setup two channels for both xmit and recv:
@ -267,6 +274,7 @@ static int init(void)
return rc;
}
comm_enabled = true;
return ORTE_SUCCESS;
}
@ -276,6 +284,9 @@ static void finalize(void)
"%s rmcast:tcp: finalize called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* stop the chatter */
comm_enabled = false;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST);
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MULTICAST_RELAY);
@ -284,9 +295,22 @@ static void finalize(void)
/* stop the processing thread */
orte_rmcast_base_stop_threads();
init_completed = false;
return;
}
static void enable_comm(void)
{
orte_rmcast_base_start_threads(false, true);
comm_enabled = true;
}
static void disable_comm(void)
{
comm_enabled = false;
orte_rmcast_base_stop_threads();
}
/* internal blocking send support */
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
@ -322,6 +346,10 @@ static int send_data(rmcast_base_send_t *snd,
opal_buffer_t *buf;
rmcast_base_channel_t *ch;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: send of %d %s"
" called on multicast channel %d",
@ -452,6 +480,10 @@ static int tcp_send(orte_rmcast_channel_t channel,
rmcast_base_send_t snd;
int ret;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
@ -482,6 +514,10 @@ static int tcp_send_nb(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.iovec_array = msg;
@ -507,6 +543,10 @@ static int tcp_send_buffer(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
@ -536,6 +576,10 @@ static int tcp_send_buffer_nb(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
OBJ_CONSTRUCT(&snd, rmcast_base_send_t);
snd.buf = buf;
@ -563,6 +607,10 @@ static int tcp_recv(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -640,6 +688,10 @@ static int tcp_recv_buffer(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp: recv_buffer called on multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
@ -781,6 +833,10 @@ static void recv_handler(int status, orte_process_name_t* sender,
uint8_t *data;
int32_t siz;
if (!comm_enabled) {
return;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:tcp recvd multicast msg",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

Просмотреть файл

@ -41,6 +41,7 @@
/* LOCAL DATA */
static bool init_completed = false;
static opal_pointer_array_t msg_log;
static bool comm_enabled = false;
/* LOCAL FUNCTIONS */
static void recv_handler(int sd, short flags, void* user);
@ -103,6 +104,10 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
static int open_channel(orte_rmcast_channel_t channel, char *name,
char *network, int port, char *interface, uint8_t direction);
static void enable_comm(void);
static void disable_comm(void);
/* Define the module */
orte_rmcast_module_t orte_rmcast_udp_module = {
@ -119,7 +124,9 @@ orte_rmcast_module_t orte_rmcast_udp_module = {
orte_rmcast_base_cancel_recv,
open_channel,
orte_rmcast_base_close_channel,
orte_rmcast_base_query
orte_rmcast_base_query,
enable_comm,
disable_comm
};
/* during init, we setup two channels for both xmit and recv:
@ -234,7 +241,7 @@ static int init(void)
}
init_completed = true;
comm_enabled = true;
return ORTE_SUCCESS;
}
@ -246,6 +253,9 @@ static void finalize(void)
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:udp: finalize called",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* stop the chatter */
comm_enabled = false;
/* stop the threads */
orte_rmcast_base_stop_threads();
@ -260,6 +270,18 @@ static void finalize(void)
return;
}
static void enable_comm(void)
{
orte_rmcast_base_start_threads(true, true);
comm_enabled = true;
}
static void disable_comm(void)
{
comm_enabled = false;
orte_rmcast_base_stop_threads();
}
/* internal blocking send support */
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
@ -290,6 +312,10 @@ static int udp_send(orte_rmcast_channel_t channel,
rmcast_base_send_t *snd;
int ret;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->iovec_array = msg;
@ -323,6 +349,10 @@ static int udp_send_nb(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t *snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->iovec_array = msg;
@ -346,6 +376,10 @@ static int udp_send_buffer(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t *snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->buf = buf;
@ -378,6 +412,10 @@ static int udp_send_buffer_nb(orte_rmcast_channel_t channel,
int ret;
rmcast_base_send_t *snd;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
/* queue it to be sent - preserves order! */
snd = OBJ_NEW(rmcast_base_send_t);
snd->buf = buf;
@ -404,6 +442,10 @@ static int udp_recv(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
if (ORTE_RMCAST_GROUP_INPUT_CHANNEL == channel) {
chan = orte_rmcast_base.my_input_channel->channel;
} else if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) {
@ -480,6 +522,10 @@ static int udp_recv_buffer(orte_process_name_t *name,
int ret;
orte_rmcast_channel_t chan;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp: recv_buffer called on multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
@ -680,6 +726,11 @@ static void recv_handler(int sd, short flags, void* cbdata)
data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size * sizeof(uint8_t));
siz = read(sd, data, orte_rmcast_udp_sndbuf_size);
if (!comm_enabled) {
free(data);
return;
}
if (siz <= 0) {
/* this shouldn't happen - report the errno */
opal_output(0, "%s Error on multicast recv socket event: %s(%d)",
@ -901,6 +952,10 @@ static int send_data(rmcast_base_send_t *snd, orte_rmcast_channel_t channel)
opal_buffer_t *buf;
rmcast_base_channel_t *ch;
if (!comm_enabled) {
return ORTE_ERR_COMM_DISABLED;
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s transmitting data for channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));

Просмотреть файл

@ -138,7 +138,9 @@ const char *orte_err2str(int errnum)
case ORTE_ERR_UNRECOVERABLE:
retval = "Unrecoverable error";
break;
case ORTE_ERR_COMM_DISABLED:
retval = "Communications have been disabled";
break;
default:
if (orte_report_silent_errors) {