Fix some bugs in the spread rmcast component
This commit was SVN r23086.
Этот коммит содержится в:
родитель
8e7faf9119
Коммит
8c5f442ee0
@ -53,6 +53,7 @@ static opal_list_t channels;
|
|||||||
static bool init_completed = false;
|
static bool init_completed = false;
|
||||||
static orte_rmcast_channel_t next_channel;
|
static orte_rmcast_channel_t next_channel;
|
||||||
static opal_pointer_array_t msg_log;
|
static opal_pointer_array_t msg_log;
|
||||||
|
static char groups[256][MAX_GROUP_NAME];
|
||||||
|
|
||||||
static char private_group[MAX_GROUP_NAME];
|
static char private_group[MAX_GROUP_NAME];
|
||||||
static mailbox Mbox;
|
static mailbox Mbox;
|
||||||
@ -235,14 +236,14 @@ static int init(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ((rc = SP_connect(SPREAD_NAME, getlogin(), 0, 1, &Mbox, private_group)) < 0) {
|
if ((rc = SP_connect(SPREAD_NAME, getlogin(), 0, 1, &Mbox, private_group)) < 0) {
|
||||||
char error_string[1024];
|
char error_string[1024];
|
||||||
|
|
||||||
SP_error2str(rc, error_string);
|
SP_error2str(rc, error_string);
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
"rmcast:spread: init SP_connect failed %s ",
|
"rmcast:spread: init SP_connect failed %s ",
|
||||||
error_string));
|
error_string));
|
||||||
rc = ORTE_ERROR;
|
rc = ORTE_ERROR;
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
init_completed = true;
|
init_completed = true;
|
||||||
@ -324,20 +325,6 @@ static void finalize(void)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void iovec2scatter(scatter *out, struct iovec *msg, int count)
|
|
||||||
{
|
|
||||||
int i;
|
|
||||||
|
|
||||||
for (i=0;i<count;i++) {
|
|
||||||
|
|
||||||
out->elements[i].buf = msg[i].iov_base;
|
|
||||||
out->elements[i].len = msg[i].iov_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
out->num_elements = count;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void internal_snd_cb(int status,
|
static void internal_snd_cb(int status,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
@ -366,7 +353,7 @@ static rmcast_base_channel_t *get_chan_from_name(char *name)
|
|||||||
item = opal_list_get_next(item)) {
|
item = opal_list_get_next(item)) {
|
||||||
ret = (rmcast_base_channel_t*)item;
|
ret = (rmcast_base_channel_t*)item;
|
||||||
if (!strcasecmp(name, ret->name)) {
|
if (!strcasecmp(name, ret->name)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -437,8 +424,8 @@ process:
|
|||||||
|
|
||||||
|
|
||||||
static int spread_send(orte_rmcast_channel_t channel,
|
static int spread_send(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
struct iovec *msg, int count)
|
struct iovec *msg, int count)
|
||||||
{
|
{
|
||||||
rmcast_base_send_t *snd;
|
rmcast_base_send_t *snd;
|
||||||
int ret = ORTE_ERROR;
|
int ret = ORTE_ERROR;
|
||||||
@ -464,10 +451,10 @@ static int spread_send(orte_rmcast_channel_t channel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_send_nb(orte_rmcast_channel_t channel,
|
static int spread_send_nb(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
struct iovec *msg, int count,
|
struct iovec *msg, int count,
|
||||||
orte_rmcast_callback_fn_t cbfunc,
|
orte_rmcast_callback_fn_t cbfunc,
|
||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rmcast_base_send_t *snd;
|
rmcast_base_send_t *snd;
|
||||||
@ -489,10 +476,10 @@ static int spread_send_nb(orte_rmcast_channel_t channel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_send_buffer(orte_rmcast_channel_t channel,
|
static int spread_send_buffer(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
opal_buffer_t *buf)
|
opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rmcast_base_send_t *snd;
|
rmcast_base_send_t *snd;
|
||||||
|
|
||||||
/* queue it to be sent - preserves order! */
|
/* queue it to be sent - preserves order! */
|
||||||
@ -515,10 +502,10 @@ static int spread_send_buffer(orte_rmcast_channel_t channel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_send_buffer_nb(orte_rmcast_channel_t channel,
|
static int spread_send_buffer_nb(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
opal_buffer_t *buf,
|
opal_buffer_t *buf,
|
||||||
orte_rmcast_callback_buffer_fn_t cbfunc,
|
orte_rmcast_callback_buffer_fn_t cbfunc,
|
||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
rmcast_base_send_t *snd;
|
rmcast_base_send_t *snd;
|
||||||
@ -609,9 +596,9 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_recv(orte_process_name_t *name,
|
static int spread_recv(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
struct iovec **msg, int *count)
|
struct iovec **msg, int *count)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
int ret;
|
int ret;
|
||||||
@ -648,9 +635,9 @@ static int spread_recv(orte_process_name_t *name,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_recv_nb(orte_rmcast_channel_t channel,
|
static int spread_recv_nb(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_rmcast_flag_t flags,
|
orte_rmcast_flag_t flags,
|
||||||
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
|
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
int ret;
|
int ret;
|
||||||
@ -681,9 +668,9 @@ static int spread_recv_nb(orte_rmcast_channel_t channel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_recv_buffer(orte_process_name_t *name,
|
static int spread_recv_buffer(orte_process_name_t *name,
|
||||||
orte_rmcast_channel_t channel,
|
orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
opal_buffer_t *buf)
|
opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
int ret;
|
int ret;
|
||||||
@ -726,9 +713,9 @@ cleanup:
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int spread_recv_buffer_nb(orte_rmcast_channel_t channel,
|
static int spread_recv_buffer_nb(orte_rmcast_channel_t channel,
|
||||||
orte_rmcast_tag_t tag,
|
orte_rmcast_tag_t tag,
|
||||||
orte_rmcast_flag_t flags,
|
orte_rmcast_flag_t flags,
|
||||||
orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata)
|
orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata)
|
||||||
{
|
{
|
||||||
rmcast_base_recv_t *recvptr;
|
rmcast_base_recv_t *recvptr;
|
||||||
int ret;
|
int ret;
|
||||||
@ -953,7 +940,7 @@ static void xmit_data(int sd, short flags, void* send_req)
|
|||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
"%s transmitting data for channel %d(%s)",
|
"%s transmitting data for channel %d(%s)",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), chan->channel,
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), chan->channel,
|
||||||
chan->name));
|
chan->name));
|
||||||
|
|
||||||
while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) {
|
while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) {
|
||||||
snd = (rmcast_base_send_t*)item;
|
snd = (rmcast_base_send_t*)item;
|
||||||
@ -1099,7 +1086,7 @@ static void xmit_data(int sd, short flags, void* send_req)
|
|||||||
|
|
||||||
/* roll to next message sequence number */
|
/* roll to next message sequence number */
|
||||||
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num);
|
ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num);
|
||||||
CLEANUP:
|
CLEANUP:
|
||||||
/* cleanup */
|
/* cleanup */
|
||||||
OBJ_RELEASE(item);
|
OBJ_RELEASE(item);
|
||||||
}
|
}
|
||||||
@ -1399,70 +1386,101 @@ cleanup:
|
|||||||
static void recv_handler(int sd, short flags, void* cbdata)
|
static void recv_handler(int sd, short flags, void* cbdata)
|
||||||
{
|
{
|
||||||
uint8_t *data;
|
uint8_t *data;
|
||||||
ssize_t sz;
|
int sz;
|
||||||
rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata;
|
rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata;
|
||||||
service srvc;
|
service srvc;
|
||||||
char sender[MAX_GROUP_NAME], groups[2][MAX_GROUP_NAME];
|
char sender[MAX_GROUP_NAME];
|
||||||
int num_groups;
|
int num_groups, size_data;
|
||||||
int16 mess_type;
|
int16 mess_type;
|
||||||
int endian_mismatch;
|
int endian_mismatch;
|
||||||
|
|
||||||
/* Read all available spread messages. */
|
/* Read all available spread messages. */
|
||||||
while (SP_poll(sd) > 0) {
|
while (SP_poll(sd) > 0) {
|
||||||
|
|
||||||
data = (uint8_t*)malloc(mca_rmcast_spread_component.max_msg_size * sizeof(uint8_t));
|
size_data = mca_rmcast_spread_component.max_msg_size;
|
||||||
|
data = (uint8_t*)malloc(size_data * sizeof(uint8_t));
|
||||||
|
|
||||||
srvc = 0;
|
srvc = 0;
|
||||||
sz = SP_receive(sd, &srvc, sender, 2, &num_groups, groups, &mess_type, &endian_mismatch, mca_rmcast_spread_component.max_msg_size, (char *)data);
|
do {
|
||||||
|
sz = SP_receive(sd, &srvc, sender, 256, &num_groups, groups, &mess_type, &endian_mismatch, size_data, (char *)data);
|
||||||
|
|
||||||
if (sz <= 0) {
|
if (sz < 0) {
|
||||||
char error_string[1024];
|
char error_string[1024];
|
||||||
|
|
||||||
SP_error2str(sz, error_string);
|
SP_error2str(sz, error_string);
|
||||||
/* this shouldn't happen - report the errno */
|
/* this shouldn't happen - report the errno */
|
||||||
opal_output(0, "%s Error on multicast recv spread event: %s(%d:%d:%d)",
|
opal_output(0, "%s Error on multicast recv spread event: %s(%d:%d:%d)",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), error_string, sz, num_groups, endian_mismatch);
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), error_string, sz, num_groups, endian_mismatch);
|
||||||
return;
|
switch (sz) {
|
||||||
}
|
case GROUPS_TOO_SHORT:
|
||||||
|
/*
|
||||||
|
* Just error out
|
||||||
|
*/
|
||||||
|
ORTE_ERROR_LOG(ORTE_ERR_TEMP_OUT_OF_RESOURCE);
|
||||||
|
exit(-1);
|
||||||
|
break;
|
||||||
|
case BUFFER_TOO_SHORT:
|
||||||
|
/*
|
||||||
|
* Size of buffer required is "-endian_mismatch" so we
|
||||||
|
* free the old data array and malloc a new one of the
|
||||||
|
* right size (-endian_mismatch)*sizeof(uint8_t).
|
||||||
|
*/
|
||||||
|
size_data = -endian_mismatch;
|
||||||
|
free(data);
|
||||||
|
data = (uint8_t*)malloc(size_data * sizeof(uint8_t));
|
||||||
|
if (!data) {
|
||||||
|
opal_output(0," %s Error in allocating data buffer for incoming message (%d)\n",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),size_data);
|
||||||
|
exit (-1);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case ILLEGAL_SESSION:
|
||||||
|
case ILLEGAL_MESSAGE:
|
||||||
|
case CONNECTION_CLOSED:
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
} while (sz < 0);
|
||||||
"%s rmcast:spread recvd %d bytes from channel %d(%s)",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(int)sz, num_groups, groups[0]));
|
|
||||||
|
|
||||||
if (Is_regular_mess(srvc)) {
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
int i;
|
"%s rmcast:spread recvd %d bytes from channel %d(%s)",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(int)sz, num_groups, groups[num_groups]));
|
||||||
|
|
||||||
for (i=0;i<num_groups;i++) {
|
if (Is_regular_mess(srvc)) {
|
||||||
chan = get_chan_from_name(groups[i]);
|
int i;
|
||||||
if (chan) {
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
|
||||||
"%s rmcast:spread recvd %d bytes from channel %d(%s)",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(int)sz, (int)chan->channel, chan->name));
|
|
||||||
|
|
||||||
/* clear the way for the next message */
|
for (i=0;i<num_groups;i++) {
|
||||||
ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv);
|
chan = get_chan_from_name(groups[i]);
|
||||||
} else {
|
if (chan) {
|
||||||
/*
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
* We've just received a message on a channel whose name
|
"%s rmcast:spread recvd %d bytes from channel %d(%s)",
|
||||||
* we don't recognize, so log a message and drop the
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
* message.
|
(int)sz, (int)chan->channel, chan->name));
|
||||||
*/
|
|
||||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
|
||||||
"%s rmcast:spread recvd %d bytes from unknown channel named (%s)",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
(int)sz, groups[i]));
|
|
||||||
free(data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/*
|
|
||||||
* We ignore all membership change messages for now.
|
|
||||||
*/
|
|
||||||
free(data);
|
|
||||||
|
|
||||||
}
|
/* clear the way for the next message */
|
||||||
|
ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv);
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* We've just received a message on a channel whose name
|
||||||
|
* we don't recognize, so log a message and drop the
|
||||||
|
* message.
|
||||||
|
*/
|
||||||
|
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||||
|
"%s rmcast:spread recvd %d bytes from unknown channel named (%s)",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
(int)sz, groups[i]));
|
||||||
|
free(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/*
|
||||||
|
* We ignore all membership change messages for now.
|
||||||
|
*/
|
||||||
|
free(data);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user