rmcast: add a query_channel module entry point and let open_channel
honor a caller-supplied channel number.

Review notes on this revision of the patch:
  * basic_recv_nb: release the lock before the early return taken when
    a matching recv is already posted; that path previously returned
    with the lock still held.
  * open_channel: keep writing the matched channel number back through
    *channel when an existing channel is reused, so a caller that passed
    ORTE_RMCAST_INVALID_CHANNEL still learns which channel it got.
  * NOTE(review): the rest of basic_recv_nb lies outside these hunks;
    confirm that the fall-through path releases (and does not
    re-acquire) the lock taken before the duplicate-recv scan.
  * NOTE(review): indentation of context lines was reconstructed from a
    whitespace-collapsed copy of the patch; verify against the tree
    before applying.

diff --git a/orte/mca/rmcast/base/rmcast_base_open.c b/orte/mca/rmcast/base/rmcast_base_open.c
index d587975a17..7c7a15cabe 100644
--- a/orte/mca/rmcast/base/rmcast_base_open.c
+++ b/orte/mca/rmcast/base/rmcast_base_open.c
@@ -69,6 +69,7 @@ orte_rmcast_module_t orte_rmcast = {
     NULL,
     NULL,
     NULL,
+    NULL,
     NULL
 };
 orte_rmcast_base_t orte_rmcast_base;
diff --git a/orte/mca/rmcast/basic/rmcast_basic.c b/orte/mca/rmcast/basic/rmcast_basic.c
index 191733a8e6..3073298c64 100644
--- a/orte/mca/rmcast/basic/rmcast_basic.c
+++ b/orte/mca/rmcast/basic/rmcast_basic.c
@@ -87,6 +87,8 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
 
 static int close_channel(orte_rmcast_channel_t channel);
 
+static orte_rmcast_channel_t query(void);
+
 /* Define the module */
 
 orte_rmcast_module_t orte_rmcast_basic_module = {
@@ -98,7 +100,8 @@ orte_rmcast_module_t orte_rmcast_basic_module = {
     basic_recv_nb,
     cancel_recv,
     open_channel,
-    close_channel
+    close_channel,
+    query
 };
 
 /* during init, we setup two channels for both xmit and recv:
@@ -444,6 +447,24 @@ static int basic_recv_nb(orte_rmcast_channel_t channel,
         return ORTE_ERR_NOT_FOUND;
     }
 
+    /* do we already have a recv for this channel/tag/cbfunc? */
+    OPAL_THREAD_LOCK(&lock);
+    for (item = opal_list_get_first(&recvs);
+         item != opal_list_get_end(&recvs);
+         item = opal_list_get_next(item)) {
+        recvptr = (rmcast_base_recv_t*)item;
+        if (channel == recvptr->channel &&
+            tag == recvptr->tag &&
+            cbfunc == recvptr->cbfunc) {
+            /* matching recv in place */
+            OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
+                                 "%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
+            OPAL_THREAD_UNLOCK(&lock);
+            return ORTE_SUCCESS;
+        }
+    }
+
     OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                          "%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
@@ -520,6 +541,11 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
             if (0 != netaddr && netaddr != (nchan->network & netmask)) {
                 continue;
             }
+            /* check the channel, if one was given */
+            if (ORTE_RMCAST_INVALID_CHANNEL != *channel &&
+                nchan->channel != *channel) {
+                continue;
+            }
             chan = nchan;
             break;
         }
@@ -533,14 +559,20 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
             ORTE_ERROR_LOG(rc);
             return rc;
         }
         *channel = chan->channel;
         return ORTE_SUCCESS;
     }
 
     /* we didn't find an existing match, so create a new channel */
     chan = OBJ_NEW(rmcast_base_channel_t);  /* puts it on list */
     chan->name = strdup(name);
-    chan->channel = next_channel++;
+    /* if we were given a channel, then just use it */
+    if (ORTE_RMCAST_INVALID_CHANNEL != *channel) {
+        chan->channel = *channel;
+    } else {
+        chan->channel = next_channel++;
+        *channel = chan->channel;
+    }
     /* if we were not given a network, use the default */
     if (NULL == network) {
         chan->network = orte_rmcast_base.xmit_network + chan->channel;
@@ -567,8 +599,6 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
         ORTE_ERROR_LOG(rc);
         return rc;
     }
-
-    *channel = chan->channel;
     return ORTE_SUCCESS;
 }
 
@@ -596,6 +626,14 @@ static int close_channel(orte_rmcast_channel_t channel)
     return ORTE_ERR_NOT_FOUND;
 }
 
+static orte_rmcast_channel_t query(void)
+{
+    return orte_rmcast_base.my_group_number;
+}
+
+
+/**** LOCAL FUNCTIONS ****/
+
 static void process_recv(int fd, short event, void *cbdata)
 {
     orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata;
diff --git a/orte/mca/rmcast/rmcast.h b/orte/mca/rmcast/rmcast.h
index 32504b4021..0259bba11f 100644
--- a/orte/mca/rmcast/rmcast.h
+++ b/orte/mca/rmcast/rmcast.h
@@ -80,6 +80,9 @@ typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t *
 /* close the channel */
 typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t channel);
 
+/* return my group's channel */
+typedef orte_rmcast_channel_t (*orte_rmcast_base_module_query_channel_fn_t)(void);
+
 /*
  * rmcast component
  */
@@ -107,6 +110,7 @@ struct orte_rmcast_base_module_t {
     orte_rmcast_base_module_cancel_recv_fn_t cancel_recv;
     orte_rmcast_base_module_open_channel_fn_t open_channel;
     orte_rmcast_base_module_close_channel_fn_t close_channel;
+    orte_rmcast_base_module_query_channel_fn_t query_channel;
 };
 /** Convienence typedef */
 typedef struct orte_rmcast_base_module_t orte_rmcast_module_t;