Add a new interface to the rmcast framework to query the output channel for the proc
This commit was SVN r22105.
Этот коммит содержится в:
родитель
c96af5654c
Коммит
49ce2b4342
@ -69,6 +69,7 @@ orte_rmcast_module_t orte_rmcast = {
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL
|
||||
};
|
||||
orte_rmcast_base_t orte_rmcast_base;
|
||||
|
@ -87,6 +87,8 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
|
||||
|
||||
static int close_channel(orte_rmcast_channel_t channel);
|
||||
|
||||
static orte_rmcast_channel_t query(void);
|
||||
|
||||
/* Define the module */
|
||||
|
||||
orte_rmcast_module_t orte_rmcast_basic_module = {
|
||||
@ -98,7 +100,8 @@ orte_rmcast_module_t orte_rmcast_basic_module = {
|
||||
basic_recv_nb,
|
||||
cancel_recv,
|
||||
open_channel,
|
||||
close_channel
|
||||
close_channel,
|
||||
query
|
||||
};
|
||||
|
||||
/* during init, we setup two channels for both xmit and recv:
|
||||
@ -444,6 +447,23 @@ static int basic_recv_nb(orte_rmcast_channel_t channel,
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
/* do we already have a recv for this channel/tag/cbfunc? */
|
||||
OPAL_THREAD_LOCK(&lock);
|
||||
for (item = opal_list_get_first(&recvs);
|
||||
item != opal_list_get_end(&recvs);
|
||||
item = opal_list_get_next(item)) {
|
||||
recvptr = (rmcast_base_recv_t*)item;
|
||||
if (channel == recvptr->channel &&
|
||||
tag == recvptr->tag &&
|
||||
cbfunc == recvptr->cbfunc) {
|
||||
/* matching recv in place */
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
|
||||
@ -520,6 +540,11 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
|
||||
if (0 != netaddr && netaddr != (nchan->network & netmask)) {
|
||||
continue;
|
||||
}
|
||||
/* check the channel, if one was given */
|
||||
if (ORTE_RMCAST_INVALID_CHANNEL != *channel &&
|
||||
nchan->channel != *channel) {
|
||||
continue;
|
||||
}
|
||||
chan = nchan;
|
||||
break;
|
||||
}
|
||||
@ -533,14 +558,19 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
*channel = chan->channel;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* we didn't find an existing match, so create a new channel */
|
||||
chan = OBJ_NEW(rmcast_base_channel_t); /* puts it on list */
|
||||
chan->name = strdup(name);
|
||||
chan->channel = next_channel++;
|
||||
/* if we were given a channel, then just use it */
|
||||
if (ORTE_RMCAST_INVALID_CHANNEL != *channel) {
|
||||
chan->channel = *channel;
|
||||
} else {
|
||||
chan->channel = next_channel++;
|
||||
*channel = chan->channel;
|
||||
}
|
||||
/* if we were not given a network, use the default */
|
||||
if (NULL == network) {
|
||||
chan->network = orte_rmcast_base.xmit_network + chan->channel;
|
||||
@ -567,8 +597,6 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
*channel = chan->channel;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -596,6 +624,14 @@ static int close_channel(orte_rmcast_channel_t channel)
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
static orte_rmcast_channel_t query(void)
|
||||
{
|
||||
return orte_rmcast_base.my_group_number;
|
||||
}
|
||||
|
||||
|
||||
/**** LOCAL FUNCTIONS ****/
|
||||
|
||||
static void process_recv(int fd, short event, void *cbdata)
|
||||
{
|
||||
orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata;
|
||||
|
@ -80,6 +80,9 @@ typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t *
|
||||
/* close the channel */
|
||||
typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t channel);
|
||||
|
||||
/* return my group's channel */
|
||||
typedef orte_rmcast_channel_t (*orte_rmcast_base_module_query_channel_fn_t)(void);
|
||||
|
||||
/*
|
||||
* rmcast component
|
||||
*/
|
||||
@ -107,6 +110,7 @@ struct orte_rmcast_base_module_t {
|
||||
orte_rmcast_base_module_cancel_recv_fn_t cancel_recv;
|
||||
orte_rmcast_base_module_open_channel_fn_t open_channel;
|
||||
orte_rmcast_base_module_close_channel_fn_t close_channel;
|
||||
orte_rmcast_base_module_query_channel_fn_t query_channel;
|
||||
};
|
||||
/** Convienence typedef */
|
||||
typedef struct orte_rmcast_base_module_t orte_rmcast_module_t;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user