1
1

Next phase of auto-wireup using multicast. Enable use of multicast groups to separate comm from different application groups. Have the orted bootstrap message go to a different rml tag so the node can be added to the pool.

This commit was SVN r22083.
Этот коммит содержится в:
Ralph Castain 2009-10-10 01:19:56 +00:00
родитель 40e2299fa7
Коммит 84cc847be8
14 изменённых файлов: 600 добавлений и 480 удалений

Просмотреть файл

@ -70,10 +70,25 @@ orte_ess_cm_component_open(void)
int orte_ess_cm_component_query(mca_base_module_t **module, int *priority) int orte_ess_cm_component_query(mca_base_module_t **module, int *priority)
{ {
#if ORTE_ENABLE_MULTICAST
char *spec;
/* only select us if specified */ /* only select us if specified */
*priority = 0; spec = getenv("OMPI_MCA_ess");
if (NULL == spec || 0 != strcmp("cm", spec)) {
*priority = 0;
*module = NULL;
return ORTE_ERROR;
}
*priority = 1000;
*module = (mca_base_module_t *)&orte_ess_cm_module; *module = (mca_base_module_t *)&orte_ess_cm_module;
return ORTE_SUCCESS; return ORTE_SUCCESS;
#else
/* cannot be used */
*priority = 0;
*module = NULL;
return ORTE_ERROR;
#endif
} }

Просмотреть файл

@ -110,10 +110,6 @@ static int rte_init(void)
goto error; goto error;
} }
/* initialize the global list of local children and job data */
OBJ_CONSTRUCT(&orte_local_children, opal_list_t);
OBJ_CONSTRUCT(&orte_local_jobdata, opal_list_t);
/* get the list of nodes used for this job */ /* get the list of nodes used for this job */
nodelist = getenv("OMPI_MCA_orte_nodelist"); nodelist = getenv("OMPI_MCA_orte_nodelist");
@ -375,6 +371,7 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
n = 1; n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &cmd, &n, ORTE_DAEMON_CMD_T))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &cmd, &n, ORTE_DAEMON_CMD_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
arrived = true;
return; return;
} }
@ -383,6 +380,14 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
n = 1; n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
arrived = true;
return;
}
/* if we got an invalid name, then declare failure */
if (ORTE_JOBID_INVALID == name.jobid &&
ORTE_VPID_INVALID == name.vpid) {
opal_output(0, "got invalid name");
arrived = true;
return; return;
} }
ORTE_PROC_MY_NAME->jobid = name.jobid; ORTE_PROC_MY_NAME->jobid = name.jobid;
@ -395,6 +400,7 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
n = 1; n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &uri, &n, OPAL_STRING))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &uri, &n, OPAL_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
arrived = true;
return; return;
} }
OPAL_OUTPUT_VERBOSE((1, orte_ess_base_output, OPAL_OUTPUT_VERBOSE((1, orte_ess_base_output,
@ -403,22 +409,12 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
orte_process_info.my_hnp_uri = uri; orte_process_info.my_hnp_uri = uri;
name_success = true; name_success = true;
arrived = true; arrived = true;
} }
static int cm_set_name(void) static int cm_set_name(void)
{ {
int i, rc; int rc;
struct sockaddr_in if_addr;
char *ifnames[] = {
"ce",
"eth0",
"eth1",
NULL
};
int32_t net, rack, slot, function;
int32_t addr;
opal_buffer_t buf; opal_buffer_t buf;
orte_daemon_cmd_flag_t cmd; orte_daemon_cmd_flag_t cmd;
@ -426,48 +422,7 @@ static int cm_set_name(void)
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_PROC_IS_DAEMON) { if (ORTE_PROC_IS_DAEMON) {
/* try constructing the name from the IP address - first, /* use the reliable multicast system to contact the HNP and
* find an appropriate interface
*/
for (i=0; NULL != ifnames[i]; i++) {
if (ORTE_SUCCESS != (rc = opal_ifnametoaddr(ifnames[i],
(struct sockaddr*)&if_addr,
sizeof(struct sockaddr_in)))) {
continue;
}
addr = htonl(if_addr.sin_addr.s_addr);
/* break address into sections */
net = 0x000000FF & ((0xFF000000 & addr) >> 24);
rack = 0x000000FF & ((0x00FF0000 & addr) >> 16);
slot = 0x000000FF & ((0x0000FF00 & addr) >> 8);
function = 0x000000FF & addr;
/* is this an appropriate interface to use */
if (10 == net) {
/* set our vpid - add 1 to ensure it cannot be zero */
ORTE_PROC_MY_NAME->vpid = (rack * mca_ess_cm_component.max_slots) + slot + function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
/* notify the HNP of our existence */
cmd = ORTE_DAEMON_CHECKIN_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
goto checkin;
} else if (192 == net && 168 == rack) {
/* just use function */
ORTE_PROC_MY_NAME->vpid = function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
/* notify the HNP of our existence */
cmd = ORTE_DAEMON_CHECKIN_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
goto checkin;
}
}
/* if we get here, then we didn't find a usable interface.
* use the reliable multicast system to contact the HNP and
* get a name * get a name
*/ */
cmd = ORTE_DAEMON_NAME_REQ_CMD; cmd = ORTE_DAEMON_NAME_REQ_CMD;
@ -479,12 +434,11 @@ static int cm_set_name(void)
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME); opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
} }
checkin:
/* always include our node name */ /* always include our node name */
opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING); opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING);
/* set the recv to get the answer */ /* set the recv to get the answer */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(0, ORTE_RMCAST_NON_PERSISTENT, if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_NON_PERSISTENT,
ORTE_RMCAST_TAG_BOOTSTRAP, ORTE_RMCAST_TAG_BOOTSTRAP,
cbfunc, NULL))) { cbfunc, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
@ -492,21 +446,24 @@ checkin:
return rc; return rc;
} }
opal_output(0, "sending name request");
/* send the request */ /* send the request */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(0, ORTE_RMCAST_TAG_BOOTSTRAP, if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_TAG_BOOTSTRAP,
&buf))) { &buf))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&buf);
return rc; return rc;
} }
/* OBJ_DESTRUCT(&buf); */ OBJ_DESTRUCT(&buf);
/* wait for response */ /* wait for response */
ORTE_PROGRESSED_WAIT(arrived, 0, 1); ORTE_PROGRESSED_WAIT(arrived, 0, 1);
/* if we got a valid name, return success */ /* if we got a valid name, return success */
if (name_success) { if (name_success) {
opal_output(0, "returning success");
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
opal_output(0, "returning not found");
return ORTE_ERR_NOT_FOUND; return ORTE_ERR_NOT_FOUND;
} }

Просмотреть файл

@ -1279,6 +1279,14 @@ int orte_odls_base_default_launch_local(orte_jobid_t job,
rc = ORTE_ERR_NOT_FOUND; rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP; goto CLEANUP;
} }
/* do we have any local procs to launch? */
if (0 == jobdat->num_local_procs) {
/* no - just return */
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return ORTE_SUCCESS;
}
apps = jobdat->apps; apps = jobdat->apps;
num_apps = jobdat->num_apps; num_apps = jobdat->num_apps;
@ -1874,10 +1882,17 @@ CLEANUP:
* instead, we queue it up for local processing * instead, we queue it up for local processing
*/ */
if (ORTE_PROC_IS_HNP) { if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:launch flagging launch report to myself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_APP_LAUNCH_CALLBACK, ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch); orte_plm_base_app_report_launch);
} else { } else {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:launch sending launch report to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* go ahead and send the update to the HNP */ /* go ahead and send the update to the HNP */
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) { if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);

Просмотреть файл

@ -17,6 +17,10 @@
*/ */
#include "orte_config.h" #include "orte_config.h"
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include "orte/mca/rmcast/rmcast.h" #include "orte/mca/rmcast/rmcast.h"
BEGIN_C_DECLS BEGIN_C_DECLS
@ -31,10 +35,10 @@ ORTE_DECLSPEC int orte_rmcast_base_open(void);
typedef struct { typedef struct {
int rmcast_output; int rmcast_output;
opal_list_t rmcast_opened; opal_list_t rmcast_opened;
uint8_t octet1[2]; uint32_t xmit_network;
uint8_t octet2[2]; char *my_group_name;
uint8_t octet3[2]; uint8_t my_group_number;
uint8_t channel_offset; uint32_t interface;
uint16_t ports[256]; uint16_t ports[256];
} orte_rmcast_base_t; } orte_rmcast_base_t;

Просмотреть файл

@ -9,17 +9,26 @@
# #
# This is the US/English general help file for Open RTE. # This is the US/English general help file for Open RTE.
# #
[unrecognized-scope] [unrecognized-network]
An out-of-range value for the scope of the multicast address range was specified: An out-of-range value for the multicast transmit network was specified:
Scope: %s Network: %s
Please specify a valid scope - as a reminder, you can use Please specify a valid network - as a reminder, you can use
ompi_info -param rmcast all ompi_info -param rmcast all
to see the allowed values. to see the allowed values.
# #
[value-out-of-range]
The specified parameter is outside of the range or in an
incorrect format:
Parameter: %s
Range/Format: %s
Please adjust the value and try again.
#
[value-range] [value-range]
The specified parameter is outside of valid range: The specified parameter is outside of valid range:
@ -28,3 +37,34 @@ The specified parameter is outside of valid range:
Valid range: %s Valid range: %s
Please adjust the value and try again. Please adjust the value and try again.
#
[no-avail-interfaces]
No multicast interfaces are available. Please contact
your system administrator for assistance.
#
[invalid-net-mask]
We were unable to parse the provided network interface:
Interface: %s
Error: %s
The interface must be one of the following forms:
123.456.789.123
123.456/16
123.456.789
The system can parse any one of these, and will find an interface
that matches within the provided scope. Please revise your input
and try again.
#
[too-many-values]
The specified parameter includes too many values:
Paramater: %s
Value: %s
Number of results: %d
Allowed number: %d
Please adjust the value and try again.

Просмотреть файл

@ -24,7 +24,9 @@
#include "opal/mca/base/base.h" #include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h" #include "opal/mca/base/mca_base_param.h"
#include "opal/util/argv.h" #include "opal/util/argv.h"
#include "opal/util/if.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/parse_options.h" #include "orte/util/parse_options.h"
#include "orte/util/show_help.h" #include "orte/util/show_help.h"
@ -66,10 +68,12 @@ orte_rmcast_module_t orte_rmcast = {
NULL, NULL,
NULL, NULL,
NULL, NULL,
NULL,
NULL NULL
}; };
orte_rmcast_base_t orte_rmcast_base; orte_rmcast_base_t orte_rmcast_base;
static bool opened = false;
/** /**
* Function for finding and opening either all MCA components, or the one * Function for finding and opening either all MCA components, or the one
@ -78,71 +82,163 @@ orte_rmcast_base_t orte_rmcast_base;
int orte_rmcast_base_open(void) int orte_rmcast_base_open(void)
{ {
int value, pval, i; int value, pval, i;
char *tmp, **ports=NULL; char *tmp, **nets=NULL, **ports=NULL, *ptr;
int idx;
struct sockaddr_in inaddr;
uint32_t addr, netaddr, netmask;
bool assigned;
int rc;
if (opened) {
/* ensure we don't go through here twice */
return ORTE_SUCCESS;
}
opened = true;
/* ensure all global values are initialized */
orte_rmcast_base.xmit_network = 0;
orte_rmcast_base.my_group_name = NULL;
orte_rmcast_base.my_group_number = 0;
orte_rmcast_base.interface = 0;
for (i=0; i < 255; i++) {
orte_rmcast_base.ports[i] = 0;
}
/* public multicast channel for this job */ /* public multicast channel for this job */
mca_base_param_reg_string_name("rmcast", "base_scope", mca_base_param_reg_string_name("rmcast", "base_multicast_network",
"Scope of the multicast system [link (default) | site | org | global]", "Network to use for multicast xmissions [link (default) | site | org | global | tuple-addr]",
false, false, "link", &tmp); false, false, "link", &tmp);
rc = ORTE_ERR_SILENT;
if (0 == strcasecmp(tmp, "site")) { if (0 == strcasecmp(tmp, "site")) {
orte_rmcast_base.octet1[0] = 239; rc = opal_iftupletoaddr("239.255.0.0", &orte_rmcast_base.xmit_network, NULL);
orte_rmcast_base.octet1[1] = 239;
orte_rmcast_base.octet2[0] = 255;
orte_rmcast_base.octet2[1] = 255;
orte_rmcast_base.octet3[0] = 0;
orte_rmcast_base.octet3[1] = 255;
} else if (0 == strcasecmp(tmp, "org")) { } else if (0 == strcasecmp(tmp, "org")) {
orte_rmcast_base.octet1[0] = 239; rc = opal_iftupletoaddr("239.192.0.0", &orte_rmcast_base.xmit_network, NULL);
orte_rmcast_base.octet1[1] = 239;
orte_rmcast_base.octet2[0] = 192;
orte_rmcast_base.octet2[1] = 195;
orte_rmcast_base.octet3[0] = 0;
orte_rmcast_base.octet3[1] = 255;
} else if (0 == strcasecmp(tmp, "global")) { } else if (0 == strcasecmp(tmp, "global")) {
orte_rmcast_base.octet1[0] = 224; rc = opal_iftupletoaddr("224.0.1.0", &orte_rmcast_base.xmit_network, NULL);
orte_rmcast_base.octet1[1] = 238;
orte_rmcast_base.octet2[0] = 0;
orte_rmcast_base.octet2[1] = 255;
orte_rmcast_base.octet3[0] = 1;
orte_rmcast_base.octet3[1] = 255;
} else if (0 == strcasecmp(tmp, "link")) { } else if (0 == strcasecmp(tmp, "link")) {
/* default to link */ /* default to link */
orte_rmcast_base.octet1[0] = 224; rc = opal_iftupletoaddr("224.0.0.0", &orte_rmcast_base.xmit_network, NULL);
orte_rmcast_base.octet1[1] = 224; } else if (NULL != strchr(tmp, '.')) {
orte_rmcast_base.octet2[0] = 0; /* must have been given an actual network address */
orte_rmcast_base.octet2[1] = 0; rc = opal_iftupletoaddr(tmp, &orte_rmcast_base.xmit_network, NULL);
orte_rmcast_base.octet3[0] = 0; }
orte_rmcast_base.octet3[1] = 0;
} else { if (ORTE_SUCCESS != rc) {
orte_show_help("help-rmcast-base.txt", "unrecognized-scope", true, tmp); orte_show_help("help-rmcast-base.txt", "unrecognized-network", true, tmp);
return ORTE_ERR_SILENT; return ORTE_ERR_SILENT;
} }
/* channel offset */ /* channel offset */
mca_base_param_reg_int_name("rmcast", "base_starting_channel", mca_base_param_reg_string_name("rmcast", "base_group",
"Offset to use within each network when computing channel (default: 0)", "Multicast group of this process (name:number)",
false, false, 0, &value); false, false, NULL, &tmp);
/* check for correctness of value */ /* parse the value */
if (value < 0 || value > 255) { if (NULL != tmp) {
orte_show_help("help-rmcast-base.txt", "value-range", true, if (NULL == (ptr = strrchr(tmp, ':'))) {
"starting channel", value, "0-255"); orte_show_help("help-rmcast-base.txt", "value-out-of-range", true,
return ORTE_ERR_SILENT; tmp, "string-name:number");
return ORTE_ERR_SILENT;
}
*ptr = '\0';
orte_rmcast_base.my_group_name = strdup(tmp);
ptr++;
value = strtoul(ptr, NULL, 10);
if (value < 2 || value > 255) {
orte_show_help("help-rmcast-base.txt", "value-out-of-range", true,
ptr, "2-255");
return ORTE_ERR_SILENT;
}
orte_rmcast_base.my_group_number = value;
}
/* multicast interfaces */
mca_base_param_reg_string_name("rmcast", "base_if_include",
"Comma-separated list of interfaces (given in IP form) to use for multicast messages",
false, false, NULL, &tmp);
/* if nothing was provided, default to first non-loopback interface */
if (NULL == tmp) {
idx = opal_ifbegin();
while (0 < idx) {
/* ignore the loopback interface */
if (opal_ifisloopback(idx)) {
idx = opal_ifnext(idx);
continue;
}
if (ORTE_SUCCESS != (rc = opal_ifindextoaddr(idx, (struct sockaddr*)&inaddr, sizeof(inaddr)))) {
ORTE_ERROR_LOG(rc);
return rc;
}
orte_rmcast_base.interface = ntohl(inaddr.sin_addr.s_addr);
break;
}
if (idx < 0) {
orte_show_help("help-rmcast-base.txt", "no-avail-interfaces", true);
return ORTE_ERR_SILENT;
}
} else {
/* separate the list */
nets = opal_argv_split(tmp, ',');
free(tmp);
idx = -1;
assigned = false;
for (i=0; NULL != nets[i] && !assigned; i++) {
if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(nets[i], &netaddr, &netmask))) {
orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, nets[i], ORTE_ERROR_NAME(rc));
return ORTE_ERR_SILENT;
}
/* search for a matching interface - take the first one within the returned scope */
idx = opal_ifbegin();
while (0 < idx) {
/* ignore the loopback interface */
if (opal_ifisloopback(idx)) {
idx = opal_ifnext(idx);
continue;
}
if (ORTE_SUCCESS != (rc = opal_ifindextoaddr(idx, (struct sockaddr*)&inaddr, sizeof(inaddr)))) {
ORTE_ERROR_LOG(rc);
return rc;
}
addr = ntohl(inaddr.sin_addr.s_addr);
if (netaddr == (addr & netmask)) {
orte_rmcast_base.interface = ntohl(inaddr.sin_addr.s_addr);
assigned = true;
break;
}
idx = opal_ifnext(idx);
}
}
opal_argv_free(nets);
if (idx < 0) {
orte_show_help("help-rmcast-base.txt", "no-avail-interfaces", true);
return ORTE_ERR_SILENT;
}
} }
orte_rmcast_base.channel_offset = (uint8_t)value;
/* range of available ports */ /* range of available ports */
mca_base_param_reg_string_name("rmcast", "base_multicast_ports", mca_base_param_reg_string_name("rmcast", "base_multicast_ports",
"Ports available for multicast channels (default: 6900-7155)", "Ports available for multicast channels (default: 6900-7155)",
false, false, "6900-7155", &tmp); false, false, "6900-7154", &tmp);
ports = NULL;
orte_util_parse_range_options(tmp, &ports); orte_util_parse_range_options(tmp, &ports);
if (255 < opal_argv_count(ports)) {
orte_show_help("help-rmcast-base.txt", "too-many-values", true,
"ports", tmp, opal_argv_count(ports), "255");
free(tmp);
opal_argv_free(nets);
return ORTE_ERR_SILENT;
}
for (i=0; i < opal_argv_count(ports); i++) { for (i=0; i < opal_argv_count(ports); i++) {
pval = strtoul(ports[i], NULL, 10); pval = strtoul(ports[i], NULL, 10);
if (pval >= UINT16_MAX) { if (pval >= UINT16_MAX) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(tmp);
opal_argv_free(ports);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
orte_rmcast_base.ports[i] = pval; orte_rmcast_base.ports[i] = pval;
} }
free(tmp);
opal_argv_free(ports);
/* Debugging / verbose output. Always have stream open, with /* Debugging / verbose output. Always have stream open, with
verbose set by the mca open system... */ verbose set by the mca open system... */

Просмотреть файл

@ -17,6 +17,7 @@
#include "orte/mca/rmcast/base/base.h" #include "orte/mca/rmcast/base/base.h"
static bool selected = false;
/* /*
* Select one RMCAST component from all those that are available. * Select one RMCAST component from all those that are available.
@ -27,6 +28,12 @@ int orte_rmcast_base_select(void)
orte_rmcast_module_t *best_module = NULL; orte_rmcast_module_t *best_module = NULL;
int rc; int rc;
if (selected) {
/* ensure we don't do this twice */
return ORTE_SUCCESS;
}
selected = true;
/* /*
* Select the best component * Select the best component
*/ */

Просмотреть файл

@ -40,45 +40,15 @@
static opal_mutex_t lock; static opal_mutex_t lock;
static opal_list_t recvs; static opal_list_t recvs;
static opal_list_t channels; static opal_list_t channels;
static opal_list_t networks;
static bool init_completed = false; static bool init_completed = false;
static uint8_t next_octet1 = 0; static orte_rmcast_channel_t next_channel;
static uint8_t next_octet2 = 0;
static uint8_t next_octet3 = 0;
static unsigned int next_channel = 0;
/* LOCAL FUNCTIONS */
#define CLOSE_THE_SOCKET(socket) \ #define CLOSE_THE_SOCKET(socket) \
do { \ do { \
shutdown(socket, 2); \ shutdown(socket, 2); \
close(socket); \ close(socket); \
} while(0) } while(0)
static void recv_handler(int sd, short flags, void* user);
static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket);
static void xmit_data(int sd, short flags, void* send_req);
static uint32_t parse_network(char *network);
/*
* Data structure for tracking networks and their channels
*/
typedef struct {
opal_list_item_t item;
uint32_t network;
uint8_t next_freq;
} rmcast_basic_network_t;
static void newtork_construct(rmcast_basic_network_t *ptr)
{
ptr->network = 0;
ptr->next_freq = 0;
}
OBJ_CLASS_INSTANCE(rmcast_basic_network_t,
opal_list_item_t,
newtork_construct,
NULL);
/* /*
* Data structure for tracking assigned channels * Data structure for tracking assigned channels
@ -86,10 +56,10 @@ OBJ_CLASS_INSTANCE(rmcast_basic_network_t,
typedef struct { typedef struct {
opal_list_item_t item; opal_list_item_t item;
char *name; char *name;
unsigned int channel; orte_rmcast_channel_t channel;
uint32_t full_addr; uint32_t network;
uint16_t port; uint16_t port;
uint8_t freq; uint32_t interface;
int xmit; int xmit;
int recv; int recv;
struct sockaddr_in addr; struct sockaddr_in addr;
@ -105,10 +75,10 @@ typedef struct {
static void channel_construct(rmcast_basic_channel_t *ptr) static void channel_construct(rmcast_basic_channel_t *ptr)
{ {
ptr->name = NULL; ptr->name = NULL;
ptr->channel = 0; ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->full_addr = 0; ptr->network = 0;
ptr->port = 0; ptr->port = 0;
ptr->freq = 0; ptr->interface = 0;
ptr->xmit = -1; ptr->xmit = -1;
ptr->recv = -1; ptr->recv = -1;
memset(&ptr->addr, 0, sizeof(ptr->addr)); memset(&ptr->addr, 0, sizeof(ptr->addr));
@ -156,7 +126,7 @@ OBJ_CLASS_INSTANCE(rmcast_basic_channel_t,
*/ */
typedef struct { typedef struct {
opal_list_item_t item; opal_list_item_t item;
uint32_t channel; orte_rmcast_channel_t channel;
bool recvd; bool recvd;
opal_buffer_t *data; opal_buffer_t *data;
orte_rmcast_tag_t tag; orte_rmcast_tag_t tag;
@ -167,7 +137,7 @@ typedef struct {
static void recv_construct(rmcast_basic_recv_t *ptr) static void recv_construct(rmcast_basic_recv_t *ptr)
{ {
ptr->channel = 0; ptr->channel = ORTE_RMCAST_INVALID_CHANNEL;
ptr->recvd = false; ptr->recvd = false;
ptr->data = NULL; ptr->data = NULL;
ptr->tag = ORTE_RMCAST_TAG_INVALID; ptr->tag = ORTE_RMCAST_TAG_INVALID;
@ -214,35 +184,47 @@ OBJ_CLASS_INSTANCE(rmcast_basic_send_t,
send_construct, send_construct,
NULL); NULL);
/* LOCAL FUNCTIONS */
static void recv_handler(int sd, short flags, void* user);
static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction);
static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket);
static void xmit_data(int sd, short flags, void* send_req);
/* API FUNCTIONS */ /* API FUNCTIONS */
static int init(void); static int init(void);
static void finalize(void); static void finalize(void);
static int basic_send(unsigned int channel, static int basic_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf);
static int basic_send_nb(unsigned int channel, static int basic_send_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf, opal_buffer_t *buf,
orte_rmcast_callback_fn_t cbfunc, orte_rmcast_callback_fn_t cbfunc,
void *cbdata); void *cbdata);
static int basic_recv(unsigned int channel, static int basic_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf);
static int basic_recv_nb(unsigned int channel, static int basic_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags, orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_rmcast_callback_fn_t cbfunc, void *cbdata); orte_rmcast_callback_fn_t cbfunc, void *cbdata);
static void cancel_recv(unsigned int channel, static void cancel_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag); orte_rmcast_tag_t tag);
static unsigned int open_channel(char *name, char *network, uint8_t direction); static int open_channel(orte_rmcast_channel_t *channel, char *name,
char *network, int port, char *interface, uint8_t direction);
static int close_channel(orte_rmcast_channel_t channel);
/* The API's in this module are solely used to support LOCAL /* The API's in this module are solely used to support LOCAL
* procs - i.e., procs that are co-located to the HNP. Remote * procs - i.e., procs that are co-located to the HNP. Remote
@ -258,18 +240,33 @@ orte_rmcast_module_t orte_rmcast_basic_module = {
basic_recv, basic_recv,
basic_recv_nb, basic_recv_nb,
cancel_recv, cancel_recv,
open_channel open_channel,
close_channel
}; };
/* during init, we setup two channels for both xmit and recv:
* (a) a public address announcement channel. There are two variants
* of this:
* (1) system processes - e.g., daemons, tools. This channel
* is reserved solely for their use in performing admin
* functions
* (2) application processes. This channel is used to announce
* their existence and contact info for auto-wireup
* (b) our own group's channel, which is where our own output
* will be sent. At this time, we assume that we always
* want to hear our peers, so this channels is also
* bidirectional
*
* In addition, the HNP opens a third channel which is used solely
* for cmd-control purposes. This is where a tool, for example, might
* send a cmd to the HNP to take some action - there is no point in
* having that cmd echo around to every daemon and/or other tool
* in the system.
*/
static int init(void) static int init(void)
{ {
int xmitsd, recvsd;
rmcast_basic_channel_t *chan; rmcast_basic_channel_t *chan;
rmcast_basic_network_t *net;
uint8_t freq;
char *name;
int rc; int rc;
uint16_t port;
if (init_completed) { if (init_completed) {
return ORTE_SUCCESS; return ORTE_SUCCESS;
@ -283,74 +280,44 @@ static int init(void)
OBJ_CONSTRUCT(&lock, opal_mutex_t); OBJ_CONSTRUCT(&lock, opal_mutex_t);
OBJ_CONSTRUCT(&recvs, opal_list_t); OBJ_CONSTRUCT(&recvs, opal_list_t);
OBJ_CONSTRUCT(&channels, opal_list_t); OBJ_CONSTRUCT(&channels, opal_list_t);
OBJ_CONSTRUCT(&networks, opal_list_t); next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS;
/* set the last octets to point at the beginning of the /* setup the respective public address channel */
* specified octet ranges
*/
next_octet1 = orte_rmcast_base.octet1[0];
next_octet2 = orte_rmcast_base.octet2[0];
next_octet3 = orte_rmcast_base.octet3[0];
/* form our base-level network */
net = OBJ_NEW(rmcast_basic_network_t);
net->network = OPAL_IF_ASSEMBLE_NETWORK(next_octet1, next_octet2, next_octet3);
/* select our frequency and port */
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) {
freq = ORTE_RMCAST_SYS_ADDR + orte_rmcast_base.channel_offset;
name = "system";
} else if (ORTE_PROC_IS_APP) {
freq = ORTE_RMCAST_APP_PUBLIC_ADDR + orte_rmcast_base.channel_offset;
name = "app-public";
} else {
return ORTE_ERR_NOT_SUPPORTED;
}
port = orte_rmcast_base.ports[freq-orte_rmcast_base.channel_offset-1];
/* setup the next freq for this network */
net->next_freq = freq + 1;
/* add this channel to our list */
chan = OBJ_NEW(rmcast_basic_channel_t); chan = OBJ_NEW(rmcast_basic_channel_t);
chan->name = strdup(name); if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) {
chan->full_addr = net->network + freq; chan->name = strdup("system");
chan->port = port; chan->channel = ORTE_RMCAST_SYS_CHANNEL;
chan->channel = next_channel++; chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_SYS_CHANNEL;
chan->port = orte_rmcast_base.ports[ORTE_RMCAST_SYS_CHANNEL];
/* setup the IPv4 addr info */ chan->interface = orte_rmcast_base.interface;
chan->addr.sin_family = AF_INET; } else if (ORTE_PROC_IS_APP) {
chan->addr.sin_addr.s_addr = htonl(chan->full_addr); chan->name = strdup("app-announce");
chan->addr.sin_port = htons(chan->port); chan->channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL;
chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_APP_PUBLIC_CHANNEL;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, chan->port = orte_rmcast_base.ports[ORTE_RMCAST_APP_PUBLIC_CHANNEL];
"addr %03d.%03d.%03d.%03d port %d freq %d offset %d ports %d", chan->interface = orte_rmcast_base.interface;
OPAL_IF_FORMAT_ADDR(chan->full_addr), (int)port, } else {
(int)freq, (int)orte_rmcast_base.channel_offset, opal_output(0, "rmcast:basic:init - unknown process type");
(int)orte_rmcast_base.ports[0])); return ORTE_ERR_SILENT;
}
/* create a xmit socket */ if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) {
if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, port, false))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
/* create a recv socket */ /* finally, if we are an app, setup our grp channel, if one was given */
if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, port, true))) { if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) {
ORTE_ERROR_LOG(rc); chan = OBJ_NEW(rmcast_basic_channel_t);
return rc; chan->name = strdup(orte_rmcast_base.my_group_name);
chan->channel = orte_rmcast_base.my_group_number;
chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number;
chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number];
chan->interface = orte_rmcast_base.interface;
if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
} }
chan->recv = recvsd;
chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
/* setup an event to catch messages */
opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0);
/* setup the event to xmit messages, but don't activate it */
opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan);
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
@ -372,10 +339,6 @@ static void finalize(void)
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OBJ_DESTRUCT(&channels); OBJ_DESTRUCT(&channels);
while (NULL != (item = opal_list_remove_first(&networks))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&networks);
OPAL_THREAD_UNLOCK(&lock); OPAL_THREAD_UNLOCK(&lock);
OBJ_DESTRUCT(&lock); OBJ_DESTRUCT(&lock);
@ -384,14 +347,14 @@ static void finalize(void)
} }
/* internal blocking send support */ /* internal blocking send support */
static void internal_snd_cb(int channel, opal_buffer_t *buf, void *cbdata) static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata)
{ {
rmcast_basic_send_t *snd = (rmcast_basic_send_t*)cbdata; rmcast_basic_send_t *snd = (rmcast_basic_send_t*)cbdata;
snd->send_complete = true; snd->send_complete = true;
} }
static int basic_send(unsigned int channel, static int basic_send(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf) opal_buffer_t *buf)
{ {
@ -420,14 +383,14 @@ static int basic_send(unsigned int channel,
" called on multicast channel %03d.%03d.%03d.%03d %0x", " called on multicast channel %03d.%03d.%03d.%03d %0x",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)buf->bytes_used, (unsigned long)buf->bytes_used,
OPAL_IF_FORMAT_ADDR(ch->full_addr), ch->full_addr)); OPAL_IF_FORMAT_ADDR(ch->network), ch->network));
/* check the msg size to ensure it isn't too big */ /* check the msg size to ensure it isn't too big */
if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) { if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) {
orte_show_help("help-orte-rmcast-basic.txt", orte_show_help("help-orte-rmcast-basic.txt",
"orte-rmcast-basic:msg-too-large", true, "orte-rmcast-basic:msg-too-large", true,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(ch->full_addr), tag, OPAL_IF_FORMAT_ADDR(ch->network), tag,
buf->bytes_used, buf->bytes_used,
ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10); ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10);
return ORTE_ERR_NOT_SUPPORTED; return ORTE_ERR_NOT_SUPPORTED;
@ -457,7 +420,7 @@ static int basic_send(unsigned int channel,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int basic_send_nb(unsigned int channel, static int basic_send_nb(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf, opal_buffer_t *buf,
orte_rmcast_callback_fn_t cbfunc, orte_rmcast_callback_fn_t cbfunc,
@ -488,13 +451,13 @@ static int basic_send_nb(unsigned int channel,
" called on multicast channel %03d.%03d.%03d.%03d %0x", " called on multicast channel %03d.%03d.%03d.%03d %0x",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)buf->bytes_used, (unsigned long)buf->bytes_used,
OPAL_IF_FORMAT_ADDR(ch->full_addr), ch->full_addr)); OPAL_IF_FORMAT_ADDR(ch->network), ch->network));
if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) { if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) {
orte_show_help("help-orte-rmcast-basic.txt", orte_show_help("help-orte-rmcast-basic.txt",
"orte-rmcast-basic:msg-too-large", true, "orte-rmcast-basic:msg-too-large", true,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(ch->full_addr), tag, OPAL_IF_FORMAT_ADDR(ch->network), tag,
buf->bytes_used, buf->bytes_used,
ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10); ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10);
return ORTE_ERR_NOT_SUPPORTED; return ORTE_ERR_NOT_SUPPORTED;
@ -521,7 +484,7 @@ static int basic_send_nb(unsigned int channel,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int basic_recv(unsigned int channel, static int basic_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf) opal_buffer_t *buf)
{ {
@ -547,7 +510,7 @@ static int basic_recv(unsigned int channel,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d", "%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->full_addr))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
recvptr = OBJ_NEW(rmcast_basic_recv_t); recvptr = OBJ_NEW(rmcast_basic_recv_t);
recvptr->channel = channel; recvptr->channel = channel;
@ -564,7 +527,7 @@ static int basic_recv(unsigned int channel,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static int basic_recv_nb(unsigned int channel, static int basic_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags, orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_rmcast_callback_fn_t cbfunc, void *cbdata) orte_rmcast_callback_fn_t cbfunc, void *cbdata)
@ -591,7 +554,7 @@ static int basic_recv_nb(unsigned int channel,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d", "%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->full_addr))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
recvptr = OBJ_NEW(rmcast_basic_recv_t); recvptr = OBJ_NEW(rmcast_basic_recv_t);
recvptr->channel = channel; recvptr->channel = channel;
@ -603,7 +566,7 @@ static int basic_recv_nb(unsigned int channel,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
static void cancel_recv(unsigned int channel, static void cancel_recv(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag) orte_rmcast_tag_t tag)
{ {
opal_list_item_t *item, *next; opal_list_item_t *item, *next;
@ -626,16 +589,31 @@ static void cancel_recv(unsigned int channel,
} }
} }
static unsigned int open_channel(char *name, char *network, uint8_t direction) static int open_channel(orte_rmcast_channel_t *channel, char *name,
char *network, int port, char *interface, uint8_t direction)
{ {
opal_list_item_t *item; opal_list_item_t *item;
rmcast_basic_channel_t *nchan, *chan; rmcast_basic_channel_t *nchan, *chan;
rmcast_basic_network_t *net, *netitem; uint32_t netaddr=0, netmask=0, intr=0;
uint32_t netaddr;
int rc; int rc;
int xmitsd, recvsd;
/* see if this name has already been assigned a channel */ /* parse the network, if provided */
if (NULL != network) {
if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(network, &netaddr, &netmask))) {
orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, network, ORTE_ERROR_NAME(rc));
return ORTE_ERR_SILENT;
}
}
/* parse the interface, if provided */
if (NULL != interface) {
if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(interface, &intr, NULL))) {
orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, interface, ORTE_ERROR_NAME(rc));
return ORTE_ERR_SILENT;
}
}
/* see if this name has already been assigned a channel on the specified network */
chan = NULL; chan = NULL;
for (item = opal_list_get_first(&channels); for (item = opal_list_get_first(&channels);
item != opal_list_get_end(&channels); item != opal_list_get_end(&channels);
@ -643,6 +621,10 @@ static unsigned int open_channel(char *name, char *network, uint8_t direction)
nchan = (rmcast_basic_channel_t*)item; nchan = (rmcast_basic_channel_t*)item;
if (0 == strcasecmp(nchan->name, name)) { if (0 == strcasecmp(nchan->name, name)) {
/* check the network, if one was specified */
if (0 != netaddr && netaddr != (nchan->network & netmask)) {
continue;
}
chan = nchan; chan = nchan;
break; break;
} }
@ -650,133 +632,70 @@ static unsigned int open_channel(char *name, char *network, uint8_t direction)
if (NULL != chan) { if (NULL != chan) {
/* already exists - check that the requested /* already exists - check that the requested
* socket is setup * sockets are setup
*/ */
if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) { if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
setup_socket(&chan->xmit, chan->full_addr, chan->port, false); ORTE_ERROR_LOG(rc);
return rc;
} }
if (0 > chan->recv && ORTE_RMCAST_RECV & direction) { *channel = chan->channel;
setup_socket(&chan->recv, chan->full_addr, chan->port, true); return ORTE_SUCCESS;
/* setup an event to catch messages */
opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0);
}
return chan->channel;
}
/* the named channel doesn't exist - did they give
* us a specific network to use?
*/
if (NULL != network) {
net = NULL;
netaddr = parse_network(network);
for (item = opal_list_get_first(&networks);
item != opal_list_get_end(&networks);
item = opal_list_get_next(item)) {
netitem = (rmcast_basic_network_t*)item;
if (netaddr == netitem->network) {
net = netitem;
break;
}
}
if (NULL == net) {
/* new network - create it */
net = OBJ_NEW(rmcast_basic_network_t);
net->network = OPAL_IF_ASSEMBLE_NETWORK(next_octet1, next_octet2, next_octet3);
net->next_freq = orte_rmcast_base.channel_offset + 1;
}
/* assign next freq to the next available channel */
chan = OBJ_NEW(rmcast_basic_channel_t);
chan->name = strdup(name);
chan->full_addr = net->network + net->next_freq;
chan->port = orte_rmcast_base.ports[net->next_freq-orte_rmcast_base.channel_offset-1];
chan->channel = next_channel++;
/* setup the IPv4 addr info */
chan->addr.sin_family = AF_INET;
chan->addr.sin_addr.s_addr = htonl(chan->full_addr);
chan->addr.sin_port = htons(chan->port);
if (ORTE_RMCAST_XMIT & direction) {
/* create a xmit socket */
if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, chan->port, false))) {
ORTE_ERROR_LOG(rc);
return 0;
}
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
}
if (ORTE_RMCAST_RECV & direction) {
/* create a recv socket */
if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, chan->port, true))) {
ORTE_ERROR_LOG(rc);
return 0;
}
chan->recv = recvsd;
chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
}
/* change to the next freq */
net->next_freq++;
return chan->channel;
}
/* if we get here, then we couldn't find a channel of the given name
* AND we were not given a network address to use. In this case, use
* the next available network/freq
*
* RHC: for now, we are not going to worry about balancing loads
* across available networks. We are just going to use the next
* network with an available freq
*/
net = NULL;
for (item = opal_list_get_first(&networks);
item != opal_list_get_end(&networks);
item = opal_list_get_next(item)) {
netitem = (rmcast_basic_network_t*)item;
if (netitem->next_freq < 255) {
net = netitem;
break;
}
}
if (NULL == net) {
/* we are hosed */
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return 255;
} }
/* we didn't find an existing match, so create a new channel */
chan = OBJ_NEW(rmcast_basic_channel_t); /* puts it on list */ chan = OBJ_NEW(rmcast_basic_channel_t); /* puts it on list */
chan->name = strdup(name); chan->name = strdup(name);
chan->full_addr = net->network + net->next_freq;
chan->port = orte_rmcast_base.ports[net->next_freq-orte_rmcast_base.channel_offset-1];
chan->channel = next_channel++; chan->channel = next_channel++;
/* if we were not given a network, use the default */
/* setup the IPv4 addr info */ if (NULL == network) {
chan->addr.sin_family = AF_INET; chan->network = orte_rmcast_base.xmit_network + chan->channel;
chan->addr.sin_addr.s_addr = htonl(chan->full_addr); } else {
chan->addr.sin_port = htons(chan->port); chan->network = netaddr;
if (ORTE_RMCAST_XMIT & direction) {
/* create a xmit socket */
if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, chan->port, false))) {
ORTE_ERROR_LOG(rc);
return 255;
}
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
} }
if (ORTE_RMCAST_RECV & direction) { /* if we were not given an interface, use the default */
/* create a recv socket */ if (NULL == interface) {
if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, chan->port, true))) { chan->interface = orte_rmcast_base.interface;
ORTE_ERROR_LOG(rc); } else {
return 255; chan->interface = intr;
} }
chan->recv = recvsd; /* if we were not given a port, use a default one */
chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); if (port < 0) {
chan->port = orte_rmcast_base.ports[chan->channel];
} else {
chan->port = port;
} }
/* change to the next freq */
net->next_freq++;
return chan->channel; if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) {
ORTE_ERROR_LOG(rc);
return rc;
}
*channel = chan->channel;
return ORTE_SUCCESS;
}
static int close_channel(orte_rmcast_channel_t channel)
{
opal_list_item_t *item;
rmcast_basic_channel_t *chan;
OPAL_THREAD_LOCK(&lock);
for (item = opal_list_get_first(&channels);
item != opal_list_get_end(&channels);
item = opal_list_get_next(item)) {
chan = (rmcast_basic_channel_t*)item;
if (channel == chan->channel) {
opal_list_remove_item(&channels, item);
OBJ_RELEASE(chan);
OPAL_THREAD_UNLOCK(&lock);
return ORTE_SUCCESS;
}
}
OPAL_THREAD_UNLOCK(&lock);
return ORTE_ERR_NOT_FOUND;
} }
static void recv_handler(int sd, short flags, void* cbdata) static void recv_handler(int sd, short flags, void* cbdata)
@ -811,11 +730,11 @@ static void recv_handler(int sd, short flags, void* cbdata)
ORTE_NAME_PRINT(&name))); ORTE_NAME_PRINT(&name)));
/* if this message is from myself, ignore it */ /* if this message is from myself, ignore it */
if (name.jobid == ORTE_PROC_MY_NAME->jobid && if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
name.vpid == ORTE_PROC_MY_NAME->vpid) {
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
"%s rmcast:basic:recv sent to myself!", "%s rmcast:basic:recv sent from myself: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&name)));
return; return;
} }
@ -839,8 +758,8 @@ static void recv_handler(int sd, short flags, void* cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)ptr->channel, (int)ptr->tag)); (int)ptr->channel, (int)ptr->tag));
if (chan->channel == ptr->channel && if ((chan->channel == ptr->channel || ORTE_RMCAST_WILDCARD_CHANNEL == ptr->channel) &&
tag == ptr->tag) { (tag == ptr->tag || ORTE_RMCAST_TAG_WILDCARD == ptr->tag)) {
/* data must be placed in malloc'd area for buffer */ /* data must be placed in malloc'd area for buffer */
payload = (uint8_t*)malloc(sz-10); payload = (uint8_t*)malloc(sz-10);
memcpy(payload, &chan->recvd_data[10], sz-10); memcpy(payload, &chan->recvd_data[10], sz-10);
@ -871,7 +790,50 @@ static void recv_handler(int sd, short flags, void* cbdata)
return; return;
} }
static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction)
{
int rc;
int xmitsd, recvsd;
/* setup the IPv4 addr info */
chan->addr.sin_family = AF_INET;
chan->addr.sin_addr.s_addr = htonl(chan->network);
chan->addr.sin_port = htons(chan->port);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"setup:channel addr %03d.%03d.%03d.%03d port %d",
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port));
if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) {
/* create a xmit socket */
if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan, false))) {
ORTE_ERROR_LOG(rc);
return rc;
}
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
/* setup the event to xmit messages, but don't activate it */
opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan);
}
if (0 > chan->recv && ORTE_RMCAST_RECV & direction) {
/* create a recv socket */
if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan, true))) {
ORTE_ERROR_LOG(rc);
return rc;
}
chan->recv = recvsd;
chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size);
/* setup an event to catch messages */
opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan);
opal_event_add(&chan->recv_ev, 0);
}
return ORTE_SUCCESS;
}
static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket)
{ {
uint8_t ttl = 1; uint8_t ttl = 1;
struct sockaddr_in inaddr; struct sockaddr_in inaddr;
@ -880,6 +842,10 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket)
int target_sd; int target_sd;
int flags; int flags;
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"setup:socket addr %03d.%03d.%03d.%03d port %d",
OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port));
target_sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); target_sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(target_sd < 0) { if(target_sd < 0) {
if (EAFNOSUPPORT != opal_socket_errno) { if (EAFNOSUPPORT != opal_socket_errno) {
@ -892,7 +858,7 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket)
/* set the multicast flags */ /* set the multicast flags */
if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_TTL, if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_TTL,
(void *)&ttl, sizeof(ttl))) < 0) { (void *)&ttl, sizeof(ttl))) < 0) {
opal_output(0,"rmcast:init: socketopt() failed: %s (%d)", opal_output(0,"rmcast:init: socketopt() failed on MULTICAST_TTL: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno); strerror(opal_socket_errno), opal_socket_errno);
return ORTE_ERR_IN_ERRNO; return ORTE_ERR_IN_ERRNO;
} }
@ -907,36 +873,60 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket)
return ORTE_ERROR; return ORTE_ERROR;
} }
/* Bind the socket if requested */ /* if this is the recv side... */
if (bindsocket) { if (recvsocket) {
memset(&inaddr, 0, sizeof(inaddr)); memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET; inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = htonl(chan); inaddr.sin_addr.s_addr = htonl(chan->network);
inaddr.sin_port = htons(port); inaddr.sin_port = htons(chan->port);
addrlen = sizeof(struct sockaddr_in); addrlen = sizeof(struct sockaddr_in);
/* bind the socket */ /* bind the socket */
if (bind(target_sd, (struct sockaddr*)&inaddr, addrlen) < 0) { if (bind(target_sd, (struct sockaddr*)&inaddr, addrlen) < 0) {
opal_output(0, "%s rmcast:init: bind() failed: %s (%d)", opal_output(0, "%s rmcast:init: bind() failed for addr %03d.%03d.%03d.%03d port %d\n\tError: %s (%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port,
strerror(opal_socket_errno), opal_socket_errno); strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(target_sd); CLOSE_THE_SOCKET(target_sd);
return ORTE_ERROR; return ORTE_ERROR;
} }
} /* set membership on the multicast interface */
memset(&req, 0, sizeof (req));
req.imr_multiaddr.s_addr = htonl(chan->network);
req.imr_interface.s_addr = htonl(chan->interface);
/* set membership to "any" */ OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
memset(&req, 0, sizeof (req)); "setup:socket:membership addr %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d",
req.imr_multiaddr.s_addr = htonl(chan); OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface)));
req.imr_interface.s_addr = htonl(INADDR_ANY);
if ((setsockopt(target_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, if ((setsockopt(target_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(void *)&req, sizeof (req))) < 0) { (void *)&req, sizeof (req))) < 0) {
opal_output(0, "%s rmcast:init: setsockopt() failed: %s (%d)", opal_output(0, "%s rmcast:init: setsockopt() failed on ADD_MEMBERSHIP\n"
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), "\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
strerror(opal_socket_errno), opal_socket_errno); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
CLOSE_THE_SOCKET(target_sd); OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface),
return ORTE_ERROR; strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(target_sd);
return ORTE_ERROR;
}
} else {
/* on the xmit side, need to set the interface */
memset(&req, 0, sizeof (req));
req.imr_interface.s_addr = htonl(chan->interface);
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"setup:socket:xmit interface %03d.%03d.%03d.%03d",
OPAL_IF_FORMAT_ADDR(chan->interface)));
if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_IF,
(void *)&req, sizeof (req))) < 0) {
opal_output(0, "%s rmcast:init: setsockopt() failed on MULTICAST_IF\n"
"\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface),
strerror(opal_socket_errno), opal_socket_errno);
CLOSE_THE_SOCKET(target_sd);
return ORTE_ERROR;
}
} }
/* set socket up to be non-blocking */ /* set socket up to be non-blocking */
@ -1025,22 +1015,3 @@ static void xmit_data(int sd, short flags, void* send_req)
OPAL_THREAD_UNLOCK(&chan->send_lock); OPAL_THREAD_UNLOCK(&chan->send_lock);
} }
static uint32_t parse_network(char *network)
{
char **octets=NULL;
uint32_t net, oct1, oct2, oct3;
/* the network will be provided as a set of dot-separated
* octets, so split at those points
*/
octets = opal_argv_split(network, '.');
oct1 = strtoul(octets[0], NULL, 10);
oct2 = strtoul(octets[1], NULL, 10);
oct3 = strtoul(octets[2], NULL, 10);
net = ((oct1 >> 24) & 0x000000FF) |
((oct2 >> 16) & 0x000000FF) |
((oct3 >> 8) & 0x000000FF);
opal_argv_free(octets);
return net;
}

Просмотреть файл

@ -38,7 +38,7 @@ BEGIN_C_DECLS
/** /**
* Function prototype for callback from receiving multicast messages * Function prototype for callback from receiving multicast messages
*/ */
typedef void (*orte_rmcast_callback_fn_t)(int channel, opal_buffer_t *buf, void* cbdata); typedef void (*orte_rmcast_callback_fn_t)(orte_rmcast_channel_t channel, opal_buffer_t *buf, void* cbdata);
/* initialize the selected module */ /* initialize the selected module */
typedef int (*orte_rmcast_base_module_init_fn_t)(void); typedef int (*orte_rmcast_base_module_init_fn_t)(void);
@ -47,34 +47,38 @@ typedef int (*orte_rmcast_base_module_init_fn_t)(void);
typedef void (*orte_rmcast_base_module_finalize_fn_t)(void); typedef void (*orte_rmcast_base_module_finalize_fn_t)(void);
/* send a buffered message across a multicast channel */ /* send a buffered message across a multicast channel */
typedef int (*orte_rmcast_base_module_send_fn_t)(unsigned int channel, typedef int (*orte_rmcast_base_module_send_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf);
/* non-blocking send messages from a multicast channel */ /* non-blocking send messages from a multicast channel */
typedef int (*orte_rmcast_base_module_send_nb_fn_t)(unsigned int channel, typedef int (*orte_rmcast_base_module_send_nb_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf, opal_buffer_t *buf,
orte_rmcast_callback_fn_t cbfunc, orte_rmcast_callback_fn_t cbfunc,
void *cbdata); void *cbdata);
/* non-blocking receive messages from a multicast channel */ /* non-blocking receive messages from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(unsigned int channel, typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags, orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
orte_rmcast_callback_fn_t cbfunc, void *cbdata); orte_rmcast_callback_fn_t cbfunc, void *cbdata);
/* blocking receive from a multicast channel */ /* blocking receive from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_fn_t)(unsigned int channel, typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag, orte_rmcast_tag_t tag,
opal_buffer_t *buf); opal_buffer_t *buf);
/* cancel a receive */ /* cancel a receive */
typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(unsigned int channel, typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag); orte_rmcast_tag_t tag);
/* open the next available channel */ /* open the next available channel */
typedef unsigned int (*orte_rmcast_base_module_open_rmcast_channel_fn_t)(char *name, char *network, uint8_t direction); typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t *channel, char *name,
char *network, int port, char *interface, uint8_t direction);
/* close the channel */
typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t channel);
/* /*
* rmcast component * rmcast component
@ -94,14 +98,15 @@ typedef orte_rmcast_base_component_1_0_0_t orte_rmcast_base_component_t;
* Component modules Ver 1.0 * Component modules Ver 1.0
*/ */
struct orte_rmcast_base_module_t { struct orte_rmcast_base_module_t {
orte_rmcast_base_module_init_fn_t init; orte_rmcast_base_module_init_fn_t init;
orte_rmcast_base_module_finalize_fn_t finalize; orte_rmcast_base_module_finalize_fn_t finalize;
orte_rmcast_base_module_send_fn_t send; orte_rmcast_base_module_send_fn_t send;
orte_rmcast_base_module_send_nb_fn_t send_nb; orte_rmcast_base_module_send_nb_fn_t send_nb;
orte_rmcast_base_module_recv_fn_t recv; orte_rmcast_base_module_recv_fn_t recv;
orte_rmcast_base_module_recv_nb_fn_t recv_nb; orte_rmcast_base_module_recv_nb_fn_t recv_nb;
orte_rmcast_base_module_cancel_recv_fn_t cancel_recv; orte_rmcast_base_module_cancel_recv_fn_t cancel_recv;
orte_rmcast_base_module_open_rmcast_channel_fn_t open_channel; orte_rmcast_base_module_open_channel_fn_t open_channel;
orte_rmcast_base_module_close_channel_fn_t close_channel;
}; };
/** Convienence typedef */ /** Convienence typedef */
typedef struct orte_rmcast_base_module_t orte_rmcast_module_t; typedef struct orte_rmcast_base_module_t orte_rmcast_module_t;

Просмотреть файл

@ -21,12 +21,17 @@
BEGIN_C_DECLS BEGIN_C_DECLS
/* channel type */
typedef int32_t orte_rmcast_channel_t;
#define ORTE_RMCAST_CHANNEL_T OPAL_INT32
/* ORTE IP multicast channels */ /* ORTE IP multicast channels */
#define ORTE_RMCAST_SYS_ADDR 1 #define ORTE_RMCAST_WILDCARD_CHANNEL -1
#define ORTE_RMCAST_APP_PUBLIC_ADDR 2 #define ORTE_RMCAST_INVALID_CHANNEL 0
#define ORTE_RMCAST_SYS_CHANNEL 1
#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 2
#define ORTE_RMCAST_DYNAMIC_CHANNELS 3
#define ORTE_RMCAST_DYNAMIC_CHANNELS 100
/* define channel directions */ /* define channel directions */
@ -35,12 +40,13 @@ BEGIN_C_DECLS
#define ORTE_RMCAST_BIDIR 0x03 #define ORTE_RMCAST_BIDIR 0x03
/* Message matching tag */ /* Message matching tag */
typedef uint16_t orte_rmcast_tag_t; typedef int32_t orte_rmcast_tag_t;
#define ORTE_RMCAST_TAG_T OPAL_UINT16 #define ORTE_RMCAST_TAG_T OPAL_INT32
/* tag values for well-known services */ /* tag values for well-known services */
#define ORTE_RMCAST_TAG_INVALID 0 #define ORTE_RMCAST_TAG_WILDCARD -1
#define ORTE_RMCAST_TAG_BOOTSTRAP 1 #define ORTE_RMCAST_TAG_INVALID 0
#define ORTE_RMCAST_TAG_BOOTSTRAP 1
/* starting value for dynamicall assignable tags */ /* starting value for dynamicall assignable tags */
#define ORTE_RMCAST_TAG_DYNAMIC 100 #define ORTE_RMCAST_TAG_DYNAMIC 100

Просмотреть файл

@ -168,6 +168,9 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t);
/* onesided barrier */ /* onesided barrier */
#define ORTE_RML_TAG_ONESIDED_BARRIER 34 #define ORTE_RML_TAG_ONESIDED_BARRIER 34
/* bootstrap */
#define ORTE_RML_TAG_BOOTSTRAP 35
#define ORTE_RML_TAG_MAX 100 #define ORTE_RML_TAG_MAX 100

Просмотреть файл

@ -219,13 +219,6 @@ static int update_route(orte_process_name_t *target,
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
/* if the job family is zero, then this is going to a local slave,
* so the path is direct and there is nothing to do here
*/
if (0 == ORTE_JOB_FAMILY(target->jobid)) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_cm_update: %s --> %s", "%s routed_cm_update: %s --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -311,16 +304,13 @@ static orte_process_name_t get_route(orte_process_name_t *target)
goto found; goto found;
} }
/* if the job family is zero, then this is going to a local slave,
* so the path is direct
*/
if (0 == ORTE_JOB_FAMILY(target->jobid)) {
ret = target;
goto found;
}
/* IF THIS IS FOR A DIFFERENT JOB FAMILY... */ /* IF THIS IS FOR A DIFFERENT JOB FAMILY... */
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) { if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* if I am a daemon, send it to the HNP */
if (ORTE_PROC_IS_DAEMON) {
ret = ORTE_PROC_MY_HNP;
goto found;
}
/* if I am the HNP, then I stored a route to /* if I am the HNP, then I stored a route to
* this job family, so look it up * this job family, so look it up
*/ */
@ -350,17 +340,13 @@ static orte_process_name_t get_route(orte_process_name_t *target)
ret = target; ret = target;
goto found; goto found;
} else { } else {
/* if I am a daemon, route it through the HNP to avoid /* otherwise, if I am the HNP, send to the daemon */
* opening unnecessary sockets if (ORTE_PROC_IS_HNP) {
*/ ret = &daemon;
if (ORTE_PROC_IS_DAEMON) { } else {
/* send to the HNP for routing */
ret = ORTE_PROC_MY_HNP; ret = ORTE_PROC_MY_HNP;
goto found;
} }
/* otherwise, if I am the HNP, send to that daemon */
ret = &daemon;
goto found; goto found;
} }
@ -510,7 +496,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
*/ */
if (ORTE_PROC_IS_DAEMON) { if (ORTE_PROC_IS_DAEMON) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, OPAL_OUTPUT_VERBOSE((0, orte_routed_base_output,
"%s routed_cm: init routes for daemon job %s\n\thnp_uri %s", "%s routed_cm: init routes for daemon job %s\n\thnp_uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job), ORTE_JOBID_PRINT(job),
@ -545,15 +531,9 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
* part of the message confirming they are read to go. HNP's * part of the message confirming they are read to go. HNP's
* load their contact info during orte_init * load their contact info during orte_init
*/ */
} else { }
/* ndat != NULL means we are getting an update of RML info
* for the daemons - so update our contact info and routes /* ignore any other call as we only talk to the HNP */
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm: completed init routes", "%s routed_cm: completed init routes",
@ -565,7 +545,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
if (ORTE_PROC_IS_HNP) { if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, OPAL_OUTPUT_VERBOSE((0, orte_routed_base_output,
"%s routed_cm: init routes for HNP job %s", "%s routed_cm: init routes for HNP job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job))); ORTE_JOBID_PRINT(job)));

Просмотреть файл

@ -46,8 +46,17 @@ orte_routed_component_t mca_routed_cm_component = {
static int orte_routed_cm_component_query(mca_base_module_t **module, int *priority) static int orte_routed_cm_component_query(mca_base_module_t **module, int *priority)
{ {
/* only pick us if we were specifically directed to be used */ char *spec;
*priority = 0;
*module = (mca_base_module_t *) &orte_routed_cm_module; /* only select us if specified */
spec = getenv("OMPI_MCA_routed");
if (NULL == spec || 0 != strcmp("cm", spec)) {
*priority = 0;
*module = NULL;
return ORTE_ERROR;
}
*priority = 1000;
*module = (mca_base_module_t *)&orte_routed_cm_module;
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }

Просмотреть файл

@ -718,14 +718,26 @@ int orte_daemon(int argc, char *argv[])
OBJ_RELEASE(buffer); OBJ_RELEASE(buffer);
goto DONE; goto DONE;
} }
} else if (orte_daemon_bootstrap) {
/* send to a different callback location as the
* HNP didn't launch us and isn't waiting for a
* callback
*/
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer,
ORTE_RML_TAG_BOOTSTRAP, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
} else {
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer,
ORTE_RML_TAG_ORTED_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
} }
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer,
ORTE_RML_TAG_ORTED_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OBJ_RELEASE(buffer); /* done with this */ OBJ_RELEASE(buffer); /* done with this */
} }