1
1

Update the multicast test. Some cleanups to the basic rmcast module

This commit was SVN r22257.
Этот коммит содержится в:
Ralph Castain 2009-12-03 04:30:58 +00:00
родитель 66efa05a53
Коммит 93ebed48b1
2 изменённых файлов: 131 добавлений и 130 удалений

Просмотреть файл

@ -153,8 +153,8 @@ orte_rmcast_module_t orte_rmcast_basic_module = {
*/
static int init(void)
{
rmcast_base_channel_t *chan;
int rc;
orte_rmcast_channel_t channel;
if (init_completed) {
return ORTE_SUCCESS;
@ -171,47 +171,34 @@ static int init(void)
next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS;
/* setup the respective public address channel */
chan = OBJ_NEW(rmcast_base_channel_t);
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) {
chan->name = strdup("system");
chan->channel = ORTE_RMCAST_SYS_CHANNEL;
chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_SYS_CHANNEL;
chan->port = orte_rmcast_base.ports[ORTE_RMCAST_SYS_CHANNEL];
chan->interface = orte_rmcast_base.interface;
channel = ORTE_RMCAST_SYS_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "system",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else if (ORTE_PROC_IS_APP) {
chan->name = strdup("app-announce");
chan->channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL;
chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_APP_PUBLIC_CHANNEL;
chan->port = orte_rmcast_base.ports[ORTE_RMCAST_APP_PUBLIC_CHANNEL];
chan->interface = orte_rmcast_base.interface;
channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL;
if (ORTE_SUCCESS != (rc = open_channel(&channel, "app-announce",
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else {
opal_output(0, "rmcast:basic:init - unknown process type");
return ORTE_ERR_SILENT;
}
OPAL_THREAD_LOCK(&lock);
opal_list_append(&channels, &chan->item);
OPAL_THREAD_UNLOCK(&lock);
if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* finally, if we are an app, setup our grp channel, if one was given */
if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) {
chan = OBJ_NEW(rmcast_base_channel_t);
chan->name = strdup(orte_rmcast_base.my_group_name);
chan->channel = orte_rmcast_base.my_group_number;
chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number;
chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number];
chan->interface = orte_rmcast_base.interface;
OPAL_THREAD_LOCK(&lock);
opal_list_append(&channels, &chan->item);
OPAL_THREAD_UNLOCK(&lock);
if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_XMIT))) {
channel = orte_rmcast_base.my_group_number;
if (ORTE_SUCCESS != (rc = open_channel(&channel, orte_rmcast_base.my_group_name,
NULL, -1, NULL, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
my_group_channel = chan;
my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&channels);
}
return ORTE_SUCCESS;
@ -432,6 +419,7 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
{
opal_list_item_t *item;
rmcast_base_channel_t *ch, *chptr;
rmcast_base_recv_t *rptr;
/* find the channel */
ch = NULL;
@ -450,8 +438,8 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
"%s rmcast:basic: queue_recv called on multicast channel %03d.%03d.%03d.%03d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag));
if (!blocking) {
/* do we already have a recv for this channel/tag/cbfunc? */
@ -459,15 +447,15 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
for (item = opal_list_get_first(&recvs);
item != opal_list_get_end(&recvs);
item = opal_list_get_next(item)) {
recvptr = (rmcast_base_recv_t*)item;
if (channel == recvptr->channel &&
tag == recvptr->tag &&
((NULL != cbfunc_iovec && cbfunc_iovec == recvptr->cbfunc_iovec) ||
(NULL != cbfunc_buffer && cbfunc_buffer == recvptr->cbfunc_buffer))) {
rptr = (rmcast_base_recv_t*)item;
if (channel == rptr->channel &&
tag == rptr->tag &&
((NULL != cbfunc_iovec && cbfunc_iovec == rptr->cbfunc_iovec) ||
(NULL != cbfunc_buffer && cbfunc_buffer == rptr->cbfunc_buffer))) {
/* matching recv in place */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
"%s rmcast:basic: matching recv already active on multicast channel %03d.%03d.%03d.%03d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag));
OPAL_THREAD_UNLOCK(&lock);
return ORTE_EXISTS;
}
@ -475,6 +463,9 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
OPAL_THREAD_UNLOCK(&lock);
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: adding non-blocking recv on multicast channel %03d.%03d.%03d.%03d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag));
OPAL_THREAD_LOCK(&lock);
opal_list_append(&recvs, &recvptr->item);
OPAL_THREAD_UNLOCK(&lock);
@ -605,8 +596,8 @@ static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
int ret;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: recv_buffer_nb called on multicast channel %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel));
"%s rmcast:basic: recv_buffer_nb called on multicast channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
recvptr = OBJ_NEW(rmcast_base_recv_t);
recvptr->channel = channel;
@ -702,6 +693,12 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
/* already exists - check that the requested
* sockets are setup
*/
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic using existing channel network %03d.%03d.%03d.%03d port %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(chan->network),
(int)chan->port));
if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
ORTE_ERROR_LOG(rc);
return rc;
@ -710,7 +707,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
}
/* we didn't find an existing match, so create a new channel */
chan = OBJ_NEW(rmcast_base_channel_t); /* puts it on list */
chan = OBJ_NEW(rmcast_base_channel_t);
chan->name = strdup(name);
/* if we were given a channel, then just use it */
if (ORTE_RMCAST_INVALID_CHANNEL != *channel) {
@ -741,6 +738,14 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
opal_list_append(&channels, &chan->item);
OPAL_THREAD_UNLOCK(&lock);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic opening new channel network %03d.%03d.%03d.%03d port %d for%s%s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(chan->network),
(int)chan->port,
(ORTE_RMCAST_RECV & direction) ? " RECV" : " ",
(ORTE_RMCAST_XMIT & direction) ? " XMIT" : " "));
if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
ORTE_ERROR_LOG(rc);
return rc;
@ -831,7 +836,7 @@ static void process_recv(int fd, short event, void *cbdata)
next = opal_list_get_next(item);
ptr = (rmcast_base_recv_t*)item;
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic:recv checking channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)ptr->channel, (int)ptr->tag));
@ -879,6 +884,10 @@ static void process_recv(int fd, short event, void *cbdata)
}
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic:recv delivering message to channel %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
if (0 == flag) {
/* dealing with iovecs */
if (NULL != ptr->cbfunc_iovec) {
@ -979,7 +988,8 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
chan->addr.sin_port = htons(chan->port);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"setup:channel addr %03d.%03d.%03d.%03d port %d",
"%s setup:channel addr %03d.%03d.%03d.%03d port %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port));
if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) {
@ -1003,6 +1013,10 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
chan->recv = recvsd;
/* setup an event to catch messages */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s setup:channel activating recv event on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv));
opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0);
}
@ -1216,9 +1230,9 @@ static void xmit_data(int sd, short flags, void* send_req)
}
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic sending %d bytes to tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
outbound, (int)snd->tag));
"%s rmcast:basic multicasting %d bytes to network %03d.%03d.%03d.%03d port %d tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), outbound,
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port, (int)snd->tag));
if (outbound != (rc = sendto(chan->xmit, chan->send_data, outbound, 0,
(struct sockaddr *)&(chan->addr), sizeof(struct sockaddr_in)))) {

Просмотреть файл

@ -36,6 +36,45 @@ static void cbfunc_iovec(int status,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata);
orte_rmcast_channel_t chan=4;
static void send_data(int fd, short flags, void *arg)
{
opal_buffer_t buf, *bfptr;
int32_t i32;
struct iovec iovec_array[3];
int rc, i;
opal_event_t *tmp = (opal_event_t*)arg;
struct timeval now;
bfptr = OBJ_NEW(opal_buffer_t);
i32 = -1;
opal_dss.pack(bfptr, &i32, 1, OPAL_INT32);
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(chan,
ORTE_RMCAST_TAG_OUTPUT, bfptr,
cbfunc_buf_snt, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(bfptr);
return;
}
/* create an iovec array */
for (i=0; i < 3; i++) {
iovec_array[i].iov_base = (uint8_t*)malloc(30);
iovec_array[i].iov_len = 30;
}
/* send it out */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(chan,
ORTE_RMCAST_TAG_OUTPUT,
iovec_array, 3))) {
ORTE_ERROR_LOG(rc);
return;
}
/* reset the timer */
now.tv_sec = 5;
now.tv_usec = 0;
opal_evtimer_add(tmp, &now);
}
int main(int argc, char* argv[])
{
int rc, i;
@ -44,7 +83,6 @@ int main(int argc, char* argv[])
opal_buffer_t buf, *bfptr;
int32_t i32=1;
struct iovec iovec_array[3];
orte_rmcast_channel_t chan=4;
if (0 > (rc = orte_init(ORTE_PROC_NON_MPI))) {
fprintf(stderr, "orte_nodename: couldn't init orte - error code %d\n", rc);
@ -57,17 +95,20 @@ int main(int argc, char* argv[])
printf("orte_mcast: Node %s Name %s Pid %ld\n",
hostname, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)pid);
if (0 == ORTE_PROC_MY_NAME->vpid) {
orte_grpcomm.barrier();
/* open a new channel */
if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_XMIT))) {
ORTE_ERROR_LOG(rc);
goto blast;
}
orte_grpcomm.barrier();
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pass the new channel number */
i32 = chan;
opal_dss.pack(&buf, &i32, 1, OPAL_INT32);
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_ANNOUNCE, &buf))) {
@ -75,60 +116,18 @@ int main(int argc, char* argv[])
OBJ_DESTRUCT(&buf);
goto blast;
}
orte_grpcomm.barrier();
OBJ_DESTRUCT(&buf);
bfptr = OBJ_NEW(opal_buffer_t);
i32 = 2;
opal_dss.pack(bfptr, &i32, 1, OPAL_INT32);
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(chan,
ORTE_RMCAST_TAG_OUTPUT, bfptr,
cbfunc_buf_snt, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(bfptr);
goto blast;
}
orte_grpcomm.barrier();
/* create an iovec array */
for (i=0; i < 3; i++) {
iovec_array[i].iov_base = (uint8_t*)malloc(30);
iovec_array[i].iov_len = 30;
}
/* send it out */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD,
iovec_array, 3))) {
ORTE_ERROR_LOG(rc);
goto blast;
}
orte_grpcomm.barrier();
orte_finalize();
return 0;
/* wake up every 5 seconds and send something */
ORTE_TIMER_EVENT(5, 0, send_data);
} else {
/* open a new channel */
if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_RECV))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(chan,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
cbfunc_iovec, NULL))) {
ORTE_ERROR_LOG(rc);
}
orte_grpcomm.barrier();
orte_grpcomm.barrier(); /* ensure the public recv is ready */
}
opal_event_dispatch();
@ -153,29 +152,29 @@ static void cbfunc(int status,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), channel, tag, i32);
orte_grpcomm.barrier();
#if 0
opal_buffer_t *buf;
int32_t i32=2, rc;
buf = OBJ_NEW(opal_buffer_t);
opal_dss.pack(buf, &i32, 1, OPAL_INT32);
opal_dss.pack(buf, &i32, 1, OPAL_INT32);
opal_dss.pack(buf, &i32, 1, OPAL_INT32);
if (0 != ORTE_PROC_MY_NAME->vpid) {
if (1 == i32) {
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD, buf))) {
ORTE_ERROR_LOG(rc);
}
}
OBJ_RELEASE(buf);
if (i32 < 0) {
return;
}
/* open a new channel */
chan = i32;
if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_RECV))) {
ORTE_ERROR_LOG(rc);
}
/* setup to recv data on it */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(chan,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(chan,
ORTE_RMCAST_TAG_OUTPUT,
ORTE_RMCAST_PERSISTENT,
cbfunc_iovec, NULL))) {
ORTE_ERROR_LOG(rc);
}
#endif
}
static void cbfunc_iovec(int status,
@ -189,18 +188,6 @@ static void cbfunc_iovec(int status,
opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count);
#if 0
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* send it back */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_TAG_WILDCARD, msg, count))) {
ORTE_ERROR_LOG(rc);
}
}
#endif
orte_grpcomm.barrier();
orte_finalize();
exit(0);
}
static void cbfunc_buf_snt(int status,