From 8c5f442ee0ec7dfdc13b389bc4a5475f39c47810 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 4 May 2010 02:38:37 +0000 Subject: [PATCH] Fix some bugs in the spread rmcast component This commit was SVN r23086. --- orte/mca/rmcast/spread/rmcast_spread.c | 294 +++++++++++++------------ 1 file changed, 156 insertions(+), 138 deletions(-) diff --git a/orte/mca/rmcast/spread/rmcast_spread.c b/orte/mca/rmcast/spread/rmcast_spread.c index 4bdee7124e..dabc7dcf65 100644 --- a/orte/mca/rmcast/spread/rmcast_spread.c +++ b/orte/mca/rmcast/spread/rmcast_spread.c @@ -53,6 +53,7 @@ static opal_list_t channels; static bool init_completed = false; static orte_rmcast_channel_t next_channel; static opal_pointer_array_t msg_log; +static char groups[256][MAX_GROUP_NAME]; static char private_group[MAX_GROUP_NAME]; static mailbox Mbox; @@ -233,18 +234,18 @@ static int init(void) if (init_completed) { return ORTE_SUCCESS; } - + if ((rc = SP_connect(SPREAD_NAME, getlogin(), 0, 1, &Mbox, private_group)) < 0) { - char error_string[1024]; - - SP_error2str(rc, error_string); - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "rmcast:spread: init SP_connect failed %s ", - error_string)); - rc = ORTE_ERROR; - return rc; + char error_string[1024]; + + SP_error2str(rc, error_string); + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "rmcast:spread: init SP_connect failed %s ", + error_string)); + rc = ORTE_ERROR; + return rc; } - + init_completed = true; OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:spread: init called", @@ -320,24 +321,10 @@ static void finalize(void) OBJ_DESTRUCT(&lock); - + return; } -static void iovec2scatter(scatter *out, struct iovec *msg, int count) -{ - int i; - - for (i=0;ielements[i].buf = msg[i].iov_base; - out->elements[i].len = msg[i].iov_len; - } - - out->num_elements = count; -} - - static void internal_snd_cb(int status, orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, @@ -360,16 +347,16 @@ static rmcast_base_channel_t *get_chan_from_name(char *name) { rmcast_base_channel_t *ret = NULL; opal_list_item_t *item; - + for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { ret = (rmcast_base_channel_t*)item; if (!strcasecmp(name, ret->name)) { - return ret; + return ret; } } - + return NULL; } @@ -384,7 +371,7 @@ static int queue_xmit(rmcast_base_send_t *snd, "%s rmcast:spread: queue_xmit to %d:%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); - + /* if we were asked to send this on our group output * channel, substitute it */ @@ -437,8 +424,8 @@ process: static int spread_send(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - struct iovec *msg, int count) + orte_rmcast_tag_t tag, + struct iovec *msg, int count) { rmcast_base_send_t *snd; int ret = ORTE_ERROR; @@ -459,15 +446,15 @@ static int spread_send(orte_rmcast_channel_t channel, /* now wait for the send to complete */ ORTE_PROGRESSED_WAIT(snd->send_complete, 0, 1); - + return ORTE_SUCCESS; } static int spread_send_nb(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - struct iovec *msg, int count, - orte_rmcast_callback_fn_t cbfunc, - void *cbdata) + orte_rmcast_tag_t tag, + struct iovec *msg, int count, + orte_rmcast_callback_fn_t cbfunc, + void *cbdata) { int ret; rmcast_base_send_t *snd; @@ -489,10 +476,10 @@ static int spread_send_nb(orte_rmcast_channel_t channel, } static int spread_send_buffer(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - opal_buffer_t *buf) + orte_rmcast_tag_t tag, + opal_buffer_t *buf) { - int ret; + int ret; rmcast_base_send_t *snd; /* queue it to be sent - preserves order! */ @@ -502,7 +489,7 @@ static int spread_send_buffer(orte_rmcast_channel_t channel, snd->cbfunc_buffer = internal_snd_buf_cb; snd->cbdata = snd; snd->send_complete = false; - + if (ORTE_SUCCESS != (ret = queue_xmit(snd, channel, tag))) { ORTE_ERROR_LOG(ret); OBJ_RELEASE(snd); @@ -515,10 +502,10 @@ static int spread_send_buffer(orte_rmcast_channel_t channel, } static int spread_send_buffer_nb(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - opal_buffer_t *buf, - orte_rmcast_callback_buffer_fn_t cbfunc, - void *cbdata) + orte_rmcast_tag_t tag, + opal_buffer_t *buf, + orte_rmcast_callback_buffer_fn_t cbfunc, + void *cbdata) { int ret; rmcast_base_send_t *snd; @@ -604,14 +591,14 @@ static int queue_recv(rmcast_base_recv_t *recvptr, OPAL_THREAD_LOCK(&lock); opal_list_append(&recvs, &recvptr->item); OPAL_THREAD_UNLOCK(&lock); - + return ORTE_SUCCESS; } static int spread_recv(orte_process_name_t *name, - orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - struct iovec **msg, int *count) + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + struct iovec **msg, int *count) { rmcast_base_recv_t *recvptr; int ret; @@ -643,14 +630,14 @@ static int spread_recv(orte_process_name_t *name, opal_list_remove_item(&recvs, &recvptr->item); OPAL_THREAD_UNLOCK(&lock); OBJ_RELEASE(recvptr); - + return ORTE_SUCCESS; } static int spread_recv_nb(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - orte_rmcast_flag_t flags, - orte_rmcast_callback_fn_t cbfunc, void *cbdata) + orte_rmcast_tag_t tag, + orte_rmcast_flag_t flags, + orte_rmcast_callback_fn_t cbfunc, void *cbdata) { rmcast_base_recv_t *recvptr; int ret; @@ -676,14 +663,14 @@ static int spread_recv_nb(orte_rmcast_channel_t channel, OBJ_RELEASE(recvptr); return ret; } - + return ORTE_SUCCESS; } static int spread_recv_buffer(orte_process_name_t *name, - orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - opal_buffer_t *buf) + orte_rmcast_channel_t channel, + orte_rmcast_tag_t tag, + opal_buffer_t *buf) { rmcast_base_recv_t *recvptr; int ret; @@ -691,7 +678,7 @@ static int spread_recv_buffer(orte_process_name_t *name, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:spread: recv_buffer called on multicast channel %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); - + recvptr = OBJ_NEW(rmcast_base_recv_t); recvptr->channel = channel; recvptr->tag = tag; @@ -722,13 +709,13 @@ cleanup: OBJ_RELEASE(recvptr); return ret; - + } static int spread_recv_buffer_nb(orte_rmcast_channel_t channel, - orte_rmcast_tag_t tag, - orte_rmcast_flag_t flags, - orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata) + orte_rmcast_tag_t tag, + orte_rmcast_flag_t flags, + orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata) { rmcast_base_recv_t *recvptr; int ret; @@ -797,7 +784,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, OPAL_OUTPUT_VERBOSE((7, orte_rmcast_base.rmcast_output, "%s open_channel: searching for %s:%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), name, *channel)); - + chan = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); @@ -808,10 +795,10 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, "%s open_channel: channel %s:%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nchan->name, *channel)); - + if (nchan->channel == *channel || 0 == strcasecmp(nchan->name, name)) { - + /* check the channel, if one was given */ if (ORTE_RMCAST_INVALID_CHANNEL != *channel && nchan->channel != *channel) { @@ -848,7 +835,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, chan->channel = next_channel++; *channel = chan->channel; } - + OPAL_THREAD_LOCK(&lock); opal_list_append(&channels, &chan->item); OPAL_THREAD_UNLOCK(&lock); @@ -859,12 +846,12 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, chan->name, chan->channel, (ORTE_RMCAST_RECV & direction) ? " RECV" : " ", (ORTE_RMCAST_XMIT & direction) ? " XMIT" : " ")); - + if (ORTE_SUCCESS != (rc = setup_channel(chan, direction, Mbox))) { ORTE_ERROR_LOG(rc); return rc; } - + return ORTE_SUCCESS; } @@ -872,7 +859,7 @@ static int close_channel(orte_rmcast_channel_t channel) { opal_list_item_t *item; rmcast_base_channel_t *chan; - + OPAL_THREAD_LOCK(&lock); for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); @@ -907,12 +894,12 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction, mailbox chan->channel)); return ORTE_SUCCESS; } - + SP_join( Mbox, chan->name); - + if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) { - + chan->xmit = Mbox; chan->send_data = (uint8_t*)malloc(mca_rmcast_spread_component.max_msg_size); /* setup the event to xmit messages, but don't activate it */ @@ -920,7 +907,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction, mailbox } if (0 > chan->recv && ORTE_RMCAST_RECV & direction) { - + chan->recv = Mbox; /* setup an event to catch messages */ @@ -931,7 +918,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction, mailbox opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); opal_event_add(&chan->recv_ev, 0); } - + return ORTE_SUCCESS; } @@ -953,14 +940,14 @@ static void xmit_data(int sd, short flags, void* send_req) OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s transmitting data for channel %d(%s)", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), chan->channel, - chan->name)); + chan->name)); while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) { snd = (rmcast_base_send_t*)item; /* setup a tmp buffer for a working area */ OBJ_CONSTRUCT(&buf, opal_buffer_t); - + /* start the send data area with our header */ ORTE_MULTICAST_MESSAGE_HDR_HTON(chan->send_data, snd->tag, chan->seq_num); @@ -1049,7 +1036,7 @@ static void xmit_data(int sd, short flags, void* send_req) /* done with the working buf */ OBJ_DESTRUCT(&buf); - + /* add the payload, up to the limit */ ORTE_MULTICAST_LOAD_MESSAGE(chan->send_data, bytes, sz, mca_rmcast_spread_component.max_msg_size, @@ -1074,7 +1061,7 @@ static void xmit_data(int sd, short flags, void* send_req) "%s rmcast:spread multicasting %d bytes to group %s tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), outbound, chan->name, (int)snd->tag)); - + if (outbound != (rc = SP_multicast(chan->xmit, RELIABLE_MESS, chan->name, 0, outbound, (const char *)chan->send_data))) { /* didn't get the message out */ opal_output(0, "%s failed to send message to spread group %s", @@ -1096,10 +1083,10 @@ static void xmit_data(int sd, short flags, void* send_req) snd->iovec_array, snd->iovec_count, snd->cbdata); } } - + /* roll to next message sequence number */ ORTE_MULTICAST_NEXT_SEQUENCE_NUM(chan->seq_num); -CLEANUP: + CLEANUP: /* cleanup */ OBJ_RELEASE(item); } @@ -1107,7 +1094,7 @@ CLEANUP: /* cleanup */ opal_event_del(&chan->send_ev); chan->sends_in_progress = false; - + OPAL_THREAD_UNLOCK(&chan->send_lock); } @@ -1130,7 +1117,7 @@ static void process_recv(int fd, short event, void *cbdata) int rc; orte_rmcast_seq_t recvd_seq_num; rmcast_recv_log_t *log, *lg; - + /* extract the header */ ORTE_MULTICAST_MESSAGE_HDR_NTOH(msg->data, &name, tag, recvd_seq_num); @@ -1399,70 +1386,101 @@ cleanup: static void recv_handler(int sd, short flags, void* cbdata) { uint8_t *data; - ssize_t sz; + int sz; rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata; service srvc; - char sender[MAX_GROUP_NAME], groups[2][MAX_GROUP_NAME]; - int num_groups; + char sender[MAX_GROUP_NAME]; + int num_groups, size_data; int16 mess_type; int endian_mismatch; /* Read all available spread messages. */ while (SP_poll(sd) > 0) { - - data = (uint8_t*)malloc(mca_rmcast_spread_component.max_msg_size * sizeof(uint8_t)); - - srvc = 0; - sz = SP_receive(sd, &srvc, sender, 2, &num_groups, groups, &mess_type, &endian_mismatch, mca_rmcast_spread_component.max_msg_size, (char *)data); - - if (sz <= 0) { - char error_string[1024]; - - SP_error2str(sz, error_string); - /* this shouldn't happen - report the errno */ - opal_output(0, "%s Error on multicast recv spread event: %s(%d:%d:%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), error_string, sz, num_groups, endian_mismatch); - return; - } - - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:spread recvd %d bytes from channel %d(%s)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)sz, num_groups, groups[0])); - - if (Is_regular_mess(srvc)) { - int i; - - for (i=0;ichannel, chan->name)); - - /* clear the way for the next message */ - ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv); - } else { - /* - * We've just received a message on a channel whose name - * we don't recognize, so log a message and drop the - * message. - */ - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:spread recvd %d bytes from unknown channel named (%s)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)sz, groups[i])); - free(data); - } - } - } else { - /* - * We ignore all membership change messages for now. - */ - free(data); - - } + + size_data = mca_rmcast_spread_component.max_msg_size; + data = (uint8_t*)malloc(size_data * sizeof(uint8_t)); + + srvc = 0; + do { + sz = SP_receive(sd, &srvc, sender, 256, &num_groups, groups, &mess_type, &endian_mismatch, size_data, (char *)data); + + if (sz < 0) { + char error_string[1024]; + + SP_error2str(sz, error_string); + /* this shouldn't happen - report the errno */ + opal_output(0, "%s Error on multicast recv spread event: %s(%d:%d:%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), error_string, sz, num_groups, endian_mismatch); + switch (sz) { + case GROUPS_TOO_SHORT: + /* + * Just error out + */ + ORTE_ERROR_LOG(ORTE_ERR_TEMP_OUT_OF_RESOURCE); + exit(-1); + break; + case BUFFER_TOO_SHORT: + /* + * Size of buffer required is "-endian_mismatch" so we + * free the old data array and malloc a new one of the + * right size (-endian_mismatch)*sizeof(uint8_t). + */ + size_data = -endian_mismatch; + free(data); + data = (uint8_t*)malloc(size_data * sizeof(uint8_t)); + if (!data) { + opal_output(0," %s Error in allocating data buffer for incoming message (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),size_data); + exit (-1); + } + break; + case ILLEGAL_SESSION: + case ILLEGAL_MESSAGE: + case CONNECTION_CLOSED: + exit(-1); + } + } + + } while (sz < 0); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:spread recvd %d bytes from channel %d(%s)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)sz, num_groups, groups[num_groups])); + + if (Is_regular_mess(srvc)) { + int i; + + for (i=0;ichannel, chan->name)); + + /* clear the way for the next message */ + ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv); + } else { + /* + * We've just received a message on a channel whose name + * we don't recognize, so log a message and drop the + * message. + */ + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:spread recvd %d bytes from unknown channel named (%s)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)sz, groups[i])); + free(data); + } + } + } else { + /* + * We ignore all membership change messages for now. + */ + free(data); + + } } return; }