diff --git a/orte/mca/rmcast/basic/rmcast_basic.c b/orte/mca/rmcast/basic/rmcast_basic.c index a03e62b754..128ec20b9e 100644 --- a/orte/mca/rmcast/basic/rmcast_basic.c +++ b/orte/mca/rmcast/basic/rmcast_basic.c @@ -153,8 +153,8 @@ orte_rmcast_module_t orte_rmcast_basic_module = { */ static int init(void) { - rmcast_base_channel_t *chan; int rc; + orte_rmcast_channel_t channel; if (init_completed) { return ORTE_SUCCESS; @@ -171,47 +171,34 @@ static int init(void) next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS; /* setup the respective public address channel */ - chan = OBJ_NEW(rmcast_base_channel_t); if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) { - chan->name = strdup("system"); - chan->channel = ORTE_RMCAST_SYS_CHANNEL; - chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_SYS_CHANNEL; - chan->port = orte_rmcast_base.ports[ORTE_RMCAST_SYS_CHANNEL]; - chan->interface = orte_rmcast_base.interface; + channel = ORTE_RMCAST_SYS_CHANNEL; + if (ORTE_SUCCESS != (rc = open_channel(&channel, "system", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } } else if (ORTE_PROC_IS_APP) { - chan->name = strdup("app-announce"); - chan->channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL; - chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_APP_PUBLIC_CHANNEL; - chan->port = orte_rmcast_base.ports[ORTE_RMCAST_APP_PUBLIC_CHANNEL]; - chan->interface = orte_rmcast_base.interface; + channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL; + if (ORTE_SUCCESS != (rc = open_channel(&channel, "app-announce", + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } } else { opal_output(0, "rmcast:basic:init - unknown process type"); return ORTE_ERR_SILENT; } - OPAL_THREAD_LOCK(&lock); - opal_list_append(&channels, &chan->item); - OPAL_THREAD_UNLOCK(&lock); - if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) { - ORTE_ERROR_LOG(rc); - return rc; - } /* finally, if we are an app, setup our grp channel, if one was given */ if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { - chan = OBJ_NEW(rmcast_base_channel_t); - chan->name = strdup(orte_rmcast_base.my_group_name); - chan->channel = orte_rmcast_base.my_group_number; - chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number; - chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number]; - chan->interface = orte_rmcast_base.interface; - OPAL_THREAD_LOCK(&lock); - opal_list_append(&channels, &chan->item); - OPAL_THREAD_UNLOCK(&lock); - if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_XMIT))) { + channel = orte_rmcast_base.my_group_number; + if (ORTE_SUCCESS != (rc = open_channel(&channel, orte_rmcast_base.my_group_name, + NULL, -1, NULL, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; } - my_group_channel = chan; + my_group_channel = (rmcast_base_channel_t*)opal_list_get_last(&channels); } return ORTE_SUCCESS; @@ -432,6 +419,7 @@ static int queue_recv(rmcast_base_recv_t *recvptr, { opal_list_item_t *item; rmcast_base_channel_t *ch, *chptr; + rmcast_base_recv_t *rptr; /* find the channel */ ch = NULL; @@ -450,8 +438,8 @@ static int queue_recv(rmcast_base_recv_t *recvptr, } OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); + "%s rmcast:basic: queue_recv called on multicast channel %03d.%03d.%03d.%03d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag)); if (!blocking) { /* do we already have a recv for this channel/tag/cbfunc? */ @@ -459,15 +447,15 @@ static int queue_recv(rmcast_base_recv_t *recvptr, for (item = opal_list_get_first(&recvs); item != opal_list_get_end(&recvs); item = opal_list_get_next(item)) { - recvptr = (rmcast_base_recv_t*)item; - if (channel == recvptr->channel && - tag == recvptr->tag && - ((NULL != cbfunc_iovec && cbfunc_iovec == recvptr->cbfunc_iovec) || - (NULL != cbfunc_buffer && cbfunc_buffer == recvptr->cbfunc_buffer))) { + rptr = (rmcast_base_recv_t*)item; + if (channel == rptr->channel && + tag == rptr->tag && + ((NULL != cbfunc_iovec && cbfunc_iovec == rptr->cbfunc_iovec) || + (NULL != cbfunc_buffer && cbfunc_buffer == rptr->cbfunc_buffer))) { /* matching recv in place */ OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); + "%s rmcast:basic: matching recv already active on multicast channel %03d.%03d.%03d.%03d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag)); OPAL_THREAD_UNLOCK(&lock); return ORTE_EXISTS; } @@ -475,6 +463,9 @@ static int queue_recv(rmcast_base_recv_t *recvptr, OPAL_THREAD_UNLOCK(&lock); } + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:basic: adding non-blocking recv on multicast channel %03d.%03d.%03d.%03d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network), tag)); OPAL_THREAD_LOCK(&lock); opal_list_append(&recvs, &recvptr->item); OPAL_THREAD_UNLOCK(&lock); @@ -605,8 +596,8 @@ static int basic_recv_buffer_nb(orte_rmcast_channel_t channel, int ret; OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:basic: recv_buffer_nb called on multicast channel %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel)); + "%s rmcast:basic: recv_buffer_nb called on multicast channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag)); recvptr = OBJ_NEW(rmcast_base_recv_t); recvptr->channel = channel; @@ -702,6 +693,12 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, /* already exists - check that the requested * sockets are setup */ + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:basic using existing channel network %03d.%03d.%03d.%03d port %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + OPAL_IF_FORMAT_ADDR(chan->network), + (int)chan->port)); + if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { ORTE_ERROR_LOG(rc); return rc; @@ -710,7 +707,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, } /* we didn't find an existing match, so create a new channel */ - chan = OBJ_NEW(rmcast_base_channel_t); /* puts it on list */ + chan = OBJ_NEW(rmcast_base_channel_t); chan->name = strdup(name); /* if we were given a channel, then just use it */ if (ORTE_RMCAST_INVALID_CHANNEL != *channel) { @@ -741,6 +738,14 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, opal_list_append(&channels, &chan->item); OPAL_THREAD_UNLOCK(&lock); + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:basic opening new channel network %03d.%03d.%03d.%03d port %d for%s%s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + OPAL_IF_FORMAT_ADDR(chan->network), + (int)chan->port, + (ORTE_RMCAST_RECV & direction) ? " RECV" : " ", + (ORTE_RMCAST_XMIT & direction) ? " XMIT" : " ")); + if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { ORTE_ERROR_LOG(rc); return rc; @@ -831,7 +836,7 @@ static void process_recv(int fd, short event, void *cbdata) next = opal_list_get_next(item); ptr = (rmcast_base_recv_t*)item; - OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic:recv checking channel %d tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ptr->channel, (int)ptr->tag)); @@ -879,6 +884,10 @@ static void process_recv(int fd, short event, void *cbdata) } } + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:basic:recv delivering message to channel %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag)); + if (0 == flag) { /* dealing with iovecs */ if (NULL != ptr->cbfunc_iovec) { @@ -979,7 +988,8 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction) chan->addr.sin_port = htons(chan->port); OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "setup:channel addr %03d.%03d.%03d.%03d port %d", + "%s setup:channel addr %03d.%03d.%03d.%03d port %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port)); if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) { @@ -1003,6 +1013,10 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction) chan->recv = recvsd; /* setup an event to catch messages */ + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s setup:channel activating recv event on fd %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),(int)chan->recv)); + opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); opal_event_add(&chan->recv_ev, 0); } @@ -1216,9 +1230,9 @@ static void xmit_data(int sd, short flags, void* send_req) } OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:basic sending %d bytes to tag %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - outbound, (int)snd->tag)); + "%s rmcast:basic multicasting %d bytes to network %03d.%03d.%03d.%03d port %d tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), outbound, + OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port, (int)snd->tag)); if (outbound != (rc = sendto(chan->xmit, chan->send_data, outbound, 0, (struct sockaddr *)&(chan->addr), sizeof(struct sockaddr_in)))) { diff --git a/orte/test/system/orte_mcast.c b/orte/test/system/orte_mcast.c index bcc264a48e..ef0620351c 100644 --- a/orte/test/system/orte_mcast.c +++ b/orte/test/system/orte_mcast.c @@ -36,6 +36,45 @@ static void cbfunc_iovec(int status, orte_process_name_t *sender, struct iovec *msg, int count, void* cbdata); +orte_rmcast_channel_t chan=4; + +static void send_data(int fd, short flags, void *arg) +{ + opal_buffer_t buf, *bfptr; + int32_t i32; + struct iovec iovec_array[3]; + int rc, i; + opal_event_t *tmp = (opal_event_t*)arg; + struct timeval now; + + bfptr = OBJ_NEW(opal_buffer_t); + i32 = -1; + opal_dss.pack(bfptr, &i32, 1, OPAL_INT32); + if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(chan, + ORTE_RMCAST_TAG_OUTPUT, bfptr, + cbfunc_buf_snt, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(bfptr); + return; + } + /* create an iovec array */ + for (i=0; i < 3; i++) { + iovec_array[i].iov_base = (uint8_t*)malloc(30); + iovec_array[i].iov_len = 30; + } + /* send it out */ + if (ORTE_SUCCESS != (rc = orte_rmcast.send(chan, + ORTE_RMCAST_TAG_OUTPUT, + iovec_array, 3))) { + ORTE_ERROR_LOG(rc); + return; + } + /* reset the timer */ + now.tv_sec = 5; + now.tv_usec = 0; + opal_evtimer_add(tmp, &now); +} + int main(int argc, char* argv[]) { int rc, i; @@ -44,7 +83,6 @@ int main(int argc, char* argv[]) opal_buffer_t buf, *bfptr; int32_t i32=1; struct iovec iovec_array[3]; - orte_rmcast_channel_t chan=4; if (0 > (rc = orte_init(ORTE_PROC_NON_MPI))) { fprintf(stderr, "orte_nodename: couldn't init orte - error code %d\n", rc); @@ -57,17 +95,20 @@ int main(int argc, char* argv[]) printf("orte_mcast: Node %s Name %s Pid %ld\n", hostname, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)pid); - + if (0 == ORTE_PROC_MY_NAME->vpid) { + orte_grpcomm.barrier(); + /* open a new channel */ if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_XMIT))) { ORTE_ERROR_LOG(rc); goto blast; } - orte_grpcomm.barrier(); OBJ_CONSTRUCT(&buf, opal_buffer_t); + /* pass the new channel number */ + i32 = chan; opal_dss.pack(&buf, &i32, 1, OPAL_INT32); if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_APP_PUBLIC_CHANNEL, ORTE_RMCAST_TAG_ANNOUNCE, &buf))) { @@ -75,60 +116,18 @@ int main(int argc, char* argv[]) OBJ_DESTRUCT(&buf); goto blast; } - orte_grpcomm.barrier(); OBJ_DESTRUCT(&buf); - bfptr = OBJ_NEW(opal_buffer_t); - i32 = 2; - opal_dss.pack(bfptr, &i32, 1, OPAL_INT32); - if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer_nb(chan, - ORTE_RMCAST_TAG_OUTPUT, bfptr, - cbfunc_buf_snt, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(bfptr); - goto blast; - } - orte_grpcomm.barrier(); - /* create an iovec array */ - for (i=0; i < 3; i++) { - iovec_array[i].iov_base = (uint8_t*)malloc(30); - iovec_array[i].iov_len = 30; - } - /* send it out */ - if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_APP_PUBLIC_CHANNEL, - ORTE_RMCAST_TAG_WILDCARD, - iovec_array, 3))) { - ORTE_ERROR_LOG(rc); - goto blast; - } - orte_grpcomm.barrier(); - orte_finalize(); - return 0; + + /* wake up every 5 seconds and send something */ + ORTE_TIMER_EVENT(5, 0, send_data); } else { - /* open a new channel */ - if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_RECV))) { - ORTE_ERROR_LOG(rc); - } if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL, ORTE_RMCAST_TAG_WILDCARD, ORTE_RMCAST_PERSISTENT, cbfunc, NULL))) { ORTE_ERROR_LOG(rc); } - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(chan, - ORTE_RMCAST_TAG_WILDCARD, - ORTE_RMCAST_PERSISTENT, - cbfunc, NULL))) { - ORTE_ERROR_LOG(rc); - } - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL, - ORTE_RMCAST_TAG_WILDCARD, - ORTE_RMCAST_PERSISTENT, - cbfunc_iovec, NULL))) { - ORTE_ERROR_LOG(rc); - } - - orte_grpcomm.barrier(); - + orte_grpcomm.barrier(); /* ensure the public recv is ready */ } opal_event_dispatch(); @@ -153,29 +152,29 @@ static void cbfunc(int status, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), channel, tag, i32); - orte_grpcomm.barrier(); - -#if 0 - opal_buffer_t *buf; - int32_t i32=2, rc; - - buf = OBJ_NEW(opal_buffer_t); - opal_dss.pack(buf, &i32, 1, OPAL_INT32); - opal_dss.pack(buf, &i32, 1, OPAL_INT32); - opal_dss.pack(buf, &i32, 1, OPAL_INT32); - - - - if (0 != ORTE_PROC_MY_NAME->vpid) { - if (1 == i32) { - if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_APP_PUBLIC_CHANNEL, - ORTE_RMCAST_TAG_WILDCARD, buf))) { - ORTE_ERROR_LOG(rc); - } - } - OBJ_RELEASE(buf); + if (i32 < 0) { + return; + } + + /* open a new channel */ + chan = i32; + if (ORTE_SUCCESS != (rc = orte_rmcast.open_channel(&chan, "orte_mcast", NULL, -1, NULL, ORTE_RMCAST_RECV))) { + ORTE_ERROR_LOG(rc); + } + + /* setup to recv data on it */ + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(chan, + ORTE_RMCAST_TAG_OUTPUT, + ORTE_RMCAST_PERSISTENT, + cbfunc, NULL))) { + ORTE_ERROR_LOG(rc); + } + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(chan, + ORTE_RMCAST_TAG_OUTPUT, + ORTE_RMCAST_PERSISTENT, + cbfunc_iovec, NULL))) { + ORTE_ERROR_LOG(rc); } -#endif } static void cbfunc_iovec(int status, @@ -189,18 +188,6 @@ static void cbfunc_iovec(int status, opal_output(0, "%s GOT IOVEC MESSAGE from %s of %d elements\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender), count); -#if 0 - if (0 != ORTE_PROC_MY_NAME->vpid) { - /* send it back */ - if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_APP_PUBLIC_CHANNEL, - ORTE_RMCAST_TAG_WILDCARD, msg, count))) { - ORTE_ERROR_LOG(rc); - } - } -#endif - orte_grpcomm.barrier(); - orte_finalize(); - exit(0); } static void cbfunc_buf_snt(int status,