1
1

Use ports as multicast channels instead of networks so we avoid stepping into reserved spaces.

This commit was SVN r24666.
Этот коммит содержится в:
Ralph Castain 2011-04-29 18:46:40 +00:00
родитель 7b29a6153e
Коммит 138928fcf4
8 изменённых файлов: 206 добавлений и 47 удалений

Просмотреть файл

@ -46,7 +46,10 @@ typedef struct {
char *my_group_name;
uint8_t my_group_number;
uint32_t interface;
uint16_t ports[256];
struct {
char **start;
char **end;
} ports;
int cache_size;
bool opened;
orte_thread_ctl_t main_ctl;
@ -72,6 +75,8 @@ ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base;
ORTE_DECLSPEC int orte_rmcast_base_select(void);
ORTE_DECLSPEC int orte_rmcast_base_close(void);
ORTE_DECLSPEC void orte_rmcast_print_buffer_finalize(void);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

Просмотреть файл

@ -186,6 +186,8 @@ ORTE_DECLSPEC int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel);
ORTE_DECLSPEC int orte_rmcast_base_query(orte_rmcast_channel_t *output,
orte_rmcast_channel_t *input);
ORTE_DECLSPEC char* orte_rmcast_base_print_channel(orte_rmcast_channel_t channel);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

Просмотреть файл

@ -15,6 +15,7 @@
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/threads/threads.h"
#include "opal/threads/tsd.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
@ -381,3 +382,107 @@ static int insert_hdr(opal_buffer_t *buf,
}
return rc;
}
/* Size limit passed to snprintf when formatting a channel number; each
 * buffer is allocated with one byte more than this.
 */
#define ORTE_RMCAST_PRINT_MAX_SIZE 50
/* Number of scratch buffers held in one orte_rmcast_print_buffers_t ring */
#define ORTE_RMCAST_PRINT_NUM_BUFS 16
/* true once print_tsd_key has been created by get_print_buffer() */
static bool fns_init=false;
/* opal_tsd key under which the ring of print buffers is stored/retrieved */
static opal_tsd_key_t print_tsd_key;
/* string returned by orte_rmcast_base_print_channel() when no print
 * buffer could be obtained
 */
static char* orte_rmcast_print_null = "NULL";
/* Ring of scratch strings used to print non-system channel numbers.
 * cntr is the index of the next buffer to hand out; it wraps back to
 * zero when it reaches ORTE_RMCAST_PRINT_NUM_BUFS.
 */
typedef struct {
char *buffers[ORTE_RMCAST_PRINT_NUM_BUFS];
int cntr;
} orte_rmcast_print_buffers_t;
/*
 * Tear down the print-buffer support: if the TSD key was ever created,
 * delete it. The init flag is cleared as well so that a later call to
 * get_print_buffer() creates a fresh key instead of using the one that
 * was just deleted.
 */
void orte_rmcast_print_buffer_finalize(void)
{
    if (fns_init) {
        opal_tsd_key_delete(print_tsd_key);
        /* the key is gone - force re-creation on next use */
        fns_init = false;
    }
}
/*
 * Destructor registered with opal_tsd_key_create() for the print-buffer
 * ring.
 *
 * @param value  the orte_rmcast_print_buffers_t stored under the key,
 *               or NULL if nothing was ever stored (no-op in that case)
 *
 * Releases every scratch buffer in the ring and then the ring structure
 * itself, since get_print_buffer() allocates both from the heap.
 */
static void buffer_cleanup(void *value)
{
    orte_rmcast_print_buffers_t *ring = (orte_rmcast_print_buffers_t*)value;
    int i;

    if (NULL == ring) {
        return;
    }

    for (i = 0; i < ORTE_RMCAST_PRINT_NUM_BUFS; i++) {
        free(ring->buffers[i]);
    }
    /* the container was malloc'd too - don't leak it */
    free(ring);
}
/*
 * Return the ring of print buffers stored under print_tsd_key, creating
 * the key (first call) and the ring (first call that finds nothing
 * stored) on demand.
 *
 * @return pointer to the ring, or NULL if the key could not be created,
 *         the stored value could not be retrieved, memory could not be
 *         allocated, or the new ring could not be stored under the key.
 *         On any failure nothing allocated here is left behind.
 */
static orte_rmcast_print_buffers_t *get_print_buffer(void)
{
    orte_rmcast_print_buffers_t *ptr = NULL;
    int ret, i;

    if (!fns_init) {
        /* setup the print_args function */
        if (ORTE_SUCCESS != (ret = opal_tsd_key_create(&print_tsd_key, buffer_cleanup))) {
            ORTE_ERROR_LOG(ret);
            return NULL;
        }
        fns_init = true;
    }

    ret = opal_tsd_getspecific(print_tsd_key, (void**)&ptr);
    if (OPAL_SUCCESS != ret) {
        return NULL;
    }
    if (NULL != ptr) {
        /* already have a ring stored under the key */
        return ptr;
    }

    /* zero the ring so a partial allocation can be unwound with free() */
    ptr = (orte_rmcast_print_buffers_t*)calloc(1, sizeof(*ptr));
    if (NULL == ptr) {
        return NULL;
    }
    for (i = 0; i < ORTE_RMCAST_PRINT_NUM_BUFS; i++) {
        ptr->buffers[i] = (char*)malloc((ORTE_RMCAST_PRINT_MAX_SIZE + 1) * sizeof(char));
        if (NULL == ptr->buffers[i]) {
            goto unwind;
        }
    }
    ptr->cntr = 0;

    ret = opal_tsd_setspecific(print_tsd_key, (void*)ptr);
    if (OPAL_SUCCESS != ret) {
        /* if the ring cannot be stored it would be re-allocated (and
         * leaked) on every call, so report the failure instead
         */
        ORTE_ERROR_LOG(ret);
        goto unwind;
    }
    return ptr;

unwind:
    for (i = 0; i < ORTE_RMCAST_PRINT_NUM_BUFS; i++) {
        free(ptr->buffers[i]);
    }
    free(ptr);
    return NULL;
}
/*
 * Produce a printable string for a multicast channel.
 *
 * @param channel  channel identifier to print
 * @return a constant name for the system-defined channels; otherwise the
 *         channel value formatted with "%d" into the next scratch buffer
 *         of the ring returned by get_print_buffer(), or the "NULL"
 *         fallback string if no ring is available. The caller must not
 *         free the result; a scratch buffer is overwritten once the ring
 *         wraps around to it again.
 */
char* orte_rmcast_base_print_channel(orte_rmcast_channel_t channel)
{
    orte_rmcast_print_buffers_t *ring;
    char *slot;

    /* system-defined channels map to fixed names */
    switch(channel) {
    case ORTE_RMCAST_GROUP_INPUT_CHANNEL:   return "INPUT";
    case ORTE_RMCAST_DIRECT_CHANNEL:        return "DIRECT";
    case ORTE_RMCAST_GROUP_OUTPUT_CHANNEL:  return "OUTPUT";
    case ORTE_RMCAST_WILDCARD_CHANNEL:      return "WILDCARD";
    case ORTE_RMCAST_INVALID_CHANNEL:       return "INVALID";
    case ORTE_RMCAST_SYS_CHANNEL:           return "SYSTEM";
    case ORTE_RMCAST_APP_PUBLIC_CHANNEL:    return "PUBLIC";
    case ORTE_RMCAST_DATA_SERVER_CHANNEL:   return "DATA_SERVER";
    case ORTE_RMCAST_ERROR_CHANNEL:         return "ERROR";
    case ORTE_RMCAST_HEARTBEAT_CHANNEL:     return "HEARTBEAT";
    default:
        break;
    }

    /* anything else gets its numeric value printed into a scratch buffer */
    ring = get_print_buffer();
    if (NULL == ring) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return orte_rmcast_print_null;
    }

    /* wrap to the front of the ring once the end has been reached */
    if (ORTE_RMCAST_PRINT_NUM_BUFS == ring->cntr) {
        ring->cntr = 0;
    }

    slot = ring->buffers[ring->cntr];
    ring->cntr++;
    snprintf(slot, ORTE_RMCAST_PRINT_MAX_SIZE, "%d", channel);
    return slot;
}

Просмотреть файл

@ -90,8 +90,8 @@ static bool opened=false;
*/
int orte_rmcast_base_open(void)
{
int value, pval, i;
char *tmp, **nets=NULL, **ports=NULL, *ptr;
int value, i;
char *tmp, **nets=NULL, *ptr;
int idx, lb;
struct sockaddr_in inaddr;
uint32_t addr, netaddr, netmask;
@ -122,9 +122,8 @@ int orte_rmcast_base_open(void)
orte_rmcast_base.my_group_name = NULL;
orte_rmcast_base.my_group_number = 0;
orte_rmcast_base.interface = 0;
for (i=0; i < 255; i++) {
orte_rmcast_base.ports[i] = 0;
}
orte_rmcast_base.ports.start = NULL;
orte_rmcast_base.ports.end = NULL;
orte_rmcast_base.my_output_channel = NULL;
orte_rmcast_base.my_input_channel = NULL;
@ -134,14 +133,14 @@ int orte_rmcast_base_open(void)
false, false, "link", &tmp);
rc = ORTE_ERR_SILENT;
if (0 == strcasecmp(tmp, "site")) {
rc = opal_iftupletoaddr("239.255.0.0", &orte_rmcast_base.xmit_network, NULL);
rc = opal_iftupletoaddr("239.255.0.150", &orte_rmcast_base.xmit_network, NULL);
} else if (0 == strcasecmp(tmp, "org")) {
rc = opal_iftupletoaddr("239.192.0.0", &orte_rmcast_base.xmit_network, NULL);
rc = opal_iftupletoaddr("239.192.0.150", &orte_rmcast_base.xmit_network, NULL);
} else if (0 == strcasecmp(tmp, "global")) {
rc = opal_iftupletoaddr("224.0.1.0", &orte_rmcast_base.xmit_network, NULL);
rc = opal_iftupletoaddr("224.0.1.150", &orte_rmcast_base.xmit_network, NULL);
} else if (0 == strcasecmp(tmp, "link")) {
/* default to link */
rc = opal_iftupletoaddr("224.0.0.0", &orte_rmcast_base.xmit_network, NULL);
rc = opal_iftupletoaddr("224.0.0.150", &orte_rmcast_base.xmit_network, NULL);
} else if (NULL != strchr(tmp, '.')) {
/* must have been given an actual network address */
rc = opal_iftupletoaddr(tmp, &orte_rmcast_base.xmit_network, NULL);
@ -262,27 +261,7 @@ int orte_rmcast_base_open(void)
mca_base_param_reg_string_name("rmcast", "base_multicast_ports",
"Ports available for multicast channels (default: 6900-7155)",
false, false, "6900-7154", &tmp);
ports = NULL;
orte_util_parse_range_options(tmp, &ports);
if (255 < opal_argv_count(ports)) {
orte_show_help("help-rmcast-base.txt", "too-many-values", true,
"ports", tmp, opal_argv_count(ports), "255");
free(tmp);
opal_argv_free(nets);
return ORTE_ERR_SILENT;
}
for (i=0; i < opal_argv_count(ports); i++) {
pval = strtoul(ports[i], NULL, 10);
if (pval >= UINT16_MAX) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(tmp);
opal_argv_free(ports);
return ORTE_ERR_BAD_PARAM;
}
orte_rmcast_base.ports[i] = pval;
}
free(tmp);
opal_argv_free(ports);
orte_util_get_ranges(tmp, &orte_rmcast_base.ports.start, &orte_rmcast_base.ports.end);
/* send cache size */
mca_base_param_reg_int_name("rmcast", "base_cache_size",

Просмотреть файл

@ -647,6 +647,8 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
rmcast_base_channel_t *nchan, *chan;
uint32_t netaddr=0, netmask=0, intr=0;
int rc;
unsigned int i, n, start, end, range;
bool port_assigned;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s opening channel %d for %s",
@ -687,11 +689,7 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
if (nchan->channel == channel ||
0 == strcasecmp(nchan->name, name)) {
/* check the network, if one was specified */
if (0 != netaddr && netaddr != (nchan->network & netmask)) {
continue;
}
chan = nchan;
chan = nchan;
break;
}
}
@ -718,15 +716,16 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
/* we didn't find an existing match, so create a new channel */
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s creating new channel %d for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, name));
"%s creating new channel %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
orte_rmcast_base_print_channel(channel), name));
chan = OBJ_NEW(rmcast_base_channel_t);
chan->name = strdup(name);
chan->channel = channel;
/* if we were not given a network, use the default */
if (NULL == network) {
chan->network = orte_rmcast_base.xmit_network + chan->channel;
chan->network = orte_rmcast_base.xmit_network;
} else {
chan->network = netaddr;
}
@ -738,7 +737,30 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
}
/* if we were not given a port, use a default one */
if (port < 0) {
chan->port = orte_rmcast_base.ports[chan->channel];
/* cycle thru the port ranges until we find the
* port corresponding to this channel number
*/
n=0;
port_assigned = false;
for (i=0; NULL != orte_rmcast_base.ports.start[i]; i++) {
/* how many ports are in this range? */
start = strtol(orte_rmcast_base.ports.start[i], NULL, 10);
end = strtol(orte_rmcast_base.ports.end[i], NULL, 10);
range = end - start + 1;
if (chan->channel < (n + range)) {
/* take the corresponding port */
chan->port = start + (chan->channel - n);
port_assigned = true;
break;
}
n += range;
}
if (!port_assigned) {
opal_output(0, "%s CANNOT ASSIGN PORT TO CHANNEL %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
orte_rmcast_base_print_channel(chan->channel));
return ORTE_ERROR;
}
} else {
chan->port = port;
}
@ -746,9 +768,9 @@ static int open_channel(orte_rmcast_channel_t channel, char *name,
ORTE_RELEASE_THREAD(&orte_rmcast_base.main_ctl);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:udp opening new channel %s:%d network %03d.%03d.%03d.%03d port %d for%s%s",
"%s rmcast:udp opening new channel %s:%s network %03d.%03d.%03d.%03d port %d for%s%s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
chan->name, chan->channel,
chan->name, orte_rmcast_base_print_channel(chan->channel),
OPAL_IF_FORMAT_ADDR(chan->network),
(int)chan->port,
(ORTE_RMCAST_RECV & direction) ? " RECV" : " ",

Просмотреть файл

@ -80,7 +80,7 @@ static void send_data(int fd, short flags, void *arg)
sent_seq_num++;
if (0 == (sent_seq_num % 100)) {
opal_output(0, "SENT SEQ_NUM %lu", sent_seq_num);
opal_output(0, "SENT SEQ_NUM %d", sent_seq_num);
}
/* reset the timer */
@ -158,13 +158,13 @@ static void cbfunc(int status,
if (0 < recvd_seq_num) {
if ((seq_num - recvd_seq_num) != 1) {
opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu",
opal_output(0, "%s MESSAGE LOST seq %d recvd_seq %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num);
}
}
recvd_seq_num = seq_num;
if (0 == (recvd_seq_num % 100)) {
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
opal_output(0, "RECVD SEQ_NUM %d", recvd_seq_num);
}
}
@ -178,13 +178,13 @@ static void cbfunc_iovec(int status,
{
if (0 < recvd_seq_num) {
if ((seq_num - recvd_seq_num) != 1) {
opal_output(0, "%s MESSAGE LOST seq %lu recvd_seq %lu",
opal_output(0, "%s MESSAGE LOST seq %d recvd_seq %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), seq_num, recvd_seq_num);
}
}
recvd_seq_num = seq_num;
if (0 == (recvd_seq_num % 100)) {
opal_output(0, "RECVD SEQ_NUM %lu", recvd_seq_num);
opal_output(0, "RECVD SEQ_NUM %d", recvd_seq_num);
}
}

Просмотреть файл

@ -31,7 +31,9 @@
#endif
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/parse_options.h"
@ -96,3 +98,45 @@ cleanup:
opal_argv_free(r2);
}
/*
 * Split a comma-separated list of values/ranges into parallel argv-style
 * arrays of start and end points.
 *
 * @param inp       input string such as "6900-7154,8000"; NULL is ignored
 * @param startpts  argv array to which each element's start is appended
 * @param endpts    argv array to which each element's end is appended
 *
 * An element of the form "a-b" appends "a" to startpts and "b" to endpts.
 * An element that yields a single token appends that token to both. Any
 * other element is reported via opal_output and skipped. The input string
 * is not modified.
 */
void orte_util_get_ranges(char *inp, char ***startpts, char ***endpts)
{
    char **r1=NULL, **r2=NULL;
    int i, nelems, nparts;
    char *input;

    /* protect against null input */
    if (NULL == inp) {
        return;
    }

    /* protect the provided input */
    input = strdup(inp);
    if (NULL == input) {
        return;
    }

    /* split on commas */
    r1 = opal_argv_split(input, ',');
    nelems = opal_argv_count(r1);

    /* for each resulting element, check for range */
    for (i=0; i < nelems; i++) {
        r2 = opal_argv_split(r1[i], '-');
        nparts = opal_argv_count(r2);
        if (2 == nparts) {
            /* given range - get start and end */
            opal_argv_append_nosize(startpts, r2[0]);
            opal_argv_append_nosize(endpts, r2[1]);
        } else if (1 == nparts) {
            /* only one value provided, so it is both the start
             * and the end
             */
            opal_argv_append_nosize(startpts, r2[0]);
            opal_argv_append_nosize(endpts, r2[0]);
        } else {
            /* no idea how to parse this */
            opal_output(0, "%s Unknown parse error on string: %s(%s)",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), inp, r1[i]);
        }
        /* release this element's split before the next iteration
         * overwrites the pointer - otherwise every split but the
         * last one is leaked
         */
        opal_argv_free(r2);
        r2 = NULL;
    }

    free(input);
    opal_argv_free(r1);
}

Просмотреть файл

@ -30,5 +30,7 @@ BEGIN_C_DECLS
ORTE_DECLSPEC void orte_util_parse_range_options(char *input, char ***output);
ORTE_DECLSPEC void orte_util_get_ranges(char *inp, char ***startpts, char ***endpts);
END_C_DECLS
#endif