diff --git a/orte/mca/ess/cm/ess_cm_component.c b/orte/mca/ess/cm/ess_cm_component.c index 8d4729afe7..0b5d2d1348 100644 --- a/orte/mca/ess/cm/ess_cm_component.c +++ b/orte/mca/ess/cm/ess_cm_component.c @@ -70,10 +70,25 @@ orte_ess_cm_component_open(void) int orte_ess_cm_component_query(mca_base_module_t **module, int *priority) { +#if ORTE_ENABLE_MULTICAST + char *spec; + /* only select us if specified */ - *priority = 0; + spec = getenv("OMPI_MCA_ess"); + if (NULL == spec || 0 != strcmp("cm", spec)) { + *priority = 0; + *module = NULL; + return ORTE_ERROR; + } + *priority = 1000; *module = (mca_base_module_t *)&orte_ess_cm_module; return ORTE_SUCCESS; +#else + /* cannot be used */ + *priority = 0; + *module = NULL; + return ORTE_ERROR; +#endif } diff --git a/orte/mca/ess/cm/ess_cm_module.c b/orte/mca/ess/cm/ess_cm_module.c index 346eb35ab4..11e6a4d9f2 100644 --- a/orte/mca/ess/cm/ess_cm_module.c +++ b/orte/mca/ess/cm/ess_cm_module.c @@ -110,10 +110,6 @@ static int rte_init(void) goto error; } - /* initialize the global list of local children and job data */ - OBJ_CONSTRUCT(&orte_local_children, opal_list_t); - OBJ_CONSTRUCT(&orte_local_jobdata, opal_list_t); - /* get the list of nodes used for this job */ nodelist = getenv("OMPI_MCA_orte_nodelist"); @@ -370,11 +366,12 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata) /* ensure we default to failure */ name_success = false; - + /* unpack the cmd */ n = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &cmd, &n, ORTE_DAEMON_CMD_T))) { ORTE_ERROR_LOG(rc); + arrived = true; return; } @@ -383,6 +380,14 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata) n = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) { ORTE_ERROR_LOG(rc); + arrived = true; + return; + } + /* if we got an invalid name, then declare failure */ + if (ORTE_JOBID_INVALID == name.jobid && + ORTE_VPID_INVALID == name.vpid) { + opal_output(0, "got invalid name"); + arrived = true; return; } ORTE_PROC_MY_NAME->jobid = name.jobid; @@ -395,30 +400,21 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata) n = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &uri, &n, OPAL_STRING))) { ORTE_ERROR_LOG(rc); + arrived = true; return; } OPAL_OUTPUT_VERBOSE((1, orte_ess_base_output, "%s got hnp uri %s", ORTE_NAME_PRINT(&name), uri)); orte_process_info.my_hnp_uri = uri; - - name_success = true; + name_success = true; arrived = true; } static int cm_set_name(void) { - int i, rc; - struct sockaddr_in if_addr; - char *ifnames[] = { - "ce", - "eth0", - "eth1", - NULL - }; - int32_t net, rack, slot, function; - int32_t addr; + int rc; opal_buffer_t buf; orte_daemon_cmd_flag_t cmd; @@ -426,48 +422,7 @@ static int cm_set_name(void) OBJ_CONSTRUCT(&buf, opal_buffer_t); if (ORTE_PROC_IS_DAEMON) { - /* try constructing the name from the IP address - first, - * find an appropriate interface - */ - for (i=0; NULL != ifnames[i]; i++) { - if (ORTE_SUCCESS != (rc = opal_ifnametoaddr(ifnames[i], - (struct sockaddr*)&if_addr, - sizeof(struct sockaddr_in)))) { - continue; - } - addr = htonl(if_addr.sin_addr.s_addr); - - /* break address into sections */ - net = 0x000000FF & ((0xFF000000 & addr) >> 24); - rack = 0x000000FF & ((0x00FF0000 & addr) >> 16); - slot = 0x000000FF & ((0x0000FF00 & addr) >> 8); - function = 0x000000FF & addr; - - /* is this an appropriate interface to use */ - if (10 == net) { - /* set our vpid - add 1 to ensure it cannot be zero */ - ORTE_PROC_MY_NAME->vpid = (rack * mca_ess_cm_component.max_slots) + slot + function + 1; - /* set our jobid to 0 */ - ORTE_PROC_MY_NAME->jobid = 0; - /* notify the HNP of our existence */ - cmd = ORTE_DAEMON_CHECKIN_CMD; - opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T); - opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME); - goto checkin; - } else if (192 == net && 168 == rack) { - /* just use function */ - ORTE_PROC_MY_NAME->vpid = function + 1; - /* set our jobid to 0 */ - ORTE_PROC_MY_NAME->jobid = 0; - /* notify the HNP of our existence */ - cmd = ORTE_DAEMON_CHECKIN_CMD; - opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T); - opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME); - goto checkin; - } - } - /* if we get here, then we didn't find a usable interface. - * use the reliable multicast system to contact the HNP and + /* use the reliable multicast system to contact the HNP and * get a name */ cmd = ORTE_DAEMON_NAME_REQ_CMD; @@ -479,12 +434,11 @@ static int cm_set_name(void) opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME); } -checkin: /* always include our node name */ opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING); /* set the recv to get the answer */ - if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(0, ORTE_RMCAST_NON_PERSISTENT, + if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_NON_PERSISTENT, ORTE_RMCAST_TAG_BOOTSTRAP, cbfunc, NULL))) { ORTE_ERROR_LOG(rc); @@ -492,21 +446,24 @@ checkin: return rc; } + opal_output(0, "sending name request"); /* send the request */ - if (ORTE_SUCCESS != (rc = orte_rmcast.send(0, ORTE_RMCAST_TAG_BOOTSTRAP, + if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_SYS_CHANNEL, ORTE_RMCAST_TAG_BOOTSTRAP, &buf))) { ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&buf); return rc; } - /* OBJ_DESTRUCT(&buf); */ + OBJ_DESTRUCT(&buf); /* wait for response */ ORTE_PROGRESSED_WAIT(arrived, 0, 1); /* if we got a valid name, return success */ if (name_success) { + opal_output(0, "returning success"); return ORTE_SUCCESS; } + opal_output(0, "returning not found"); return ORTE_ERR_NOT_FOUND; } diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 1d957ff6f4..585b5f2bd7 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -1279,6 +1279,14 @@ int orte_odls_base_default_launch_local(orte_jobid_t job, rc = ORTE_ERR_NOT_FOUND; goto CLEANUP; } + + /* do we have any local procs to launch? */ + if (0 == jobdat->num_local_procs) { + /* no - just return */ + opal_condition_signal(&orte_odls_globals.cond); + OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex); + return ORTE_SUCCESS; + } apps = jobdat->apps; num_apps = jobdat->num_apps; @@ -1874,10 +1882,17 @@ CLEANUP: * instead, we queue it up for local processing */ if (ORTE_PROC_IS_HNP) { + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:launch flagging launch report to myself", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, orte_plm_base_app_report_launch); } else { + OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, + "%s odls:launch sending launch report to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(ORTE_PROC_MY_HNP))); /* go ahead and send the update to the HNP */ if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) { ORTE_ERROR_LOG(ret); diff --git a/orte/mca/rmcast/base/base.h b/orte/mca/rmcast/base/base.h index a7453505ae..caef895e6c 100644 --- a/orte/mca/rmcast/base/base.h +++ b/orte/mca/rmcast/base/base.h @@ -17,6 +17,10 @@ */ #include "orte_config.h" +#ifdef HAVE_NETINET_IN_H +#include +#endif + #include "orte/mca/rmcast/rmcast.h" BEGIN_C_DECLS @@ -31,10 +35,10 @@ ORTE_DECLSPEC int orte_rmcast_base_open(void); typedef struct { int rmcast_output; opal_list_t rmcast_opened; - uint8_t octet1[2]; - uint8_t octet2[2]; - uint8_t octet3[2]; - uint8_t channel_offset; + uint32_t xmit_network; + char *my_group_name; + uint8_t my_group_number; + uint32_t interface; uint16_t ports[256]; } orte_rmcast_base_t; diff --git a/orte/mca/rmcast/base/help-rmcast-base.txt b/orte/mca/rmcast/base/help-rmcast-base.txt index 35fe6f8bd0..3c23a40997 100644 --- a/orte/mca/rmcast/base/help-rmcast-base.txt +++ b/orte/mca/rmcast/base/help-rmcast-base.txt @@ -9,17 +9,26 @@ # # This is the US/English general help file for Open RTE. # -[unrecognized-scope] -An out-of-range value for the scope of the multicast address range was specified: +[unrecognized-network] +An out-of-range value for the multicast transmit network was specified: - Scope: %s + Network: %s -Please specify a valid scope - as a reminder, you can use +Please specify a valid network - as a reminder, you can use ompi_info -param rmcast all to see the allowed values. # +[value-out-of-range] +The specified parameter is outside of the range or in an +incorrect format: + + Parameter: %s + Range/Format: %s + +Please adjust the value and try again. +# [value-range] The specified parameter is outside of valid range: @@ -28,3 +37,34 @@ The specified parameter is outside of valid range: Valid range: %s Please adjust the value and try again. +# +[no-avail-interfaces] +No multicast interfaces are available. Please contact +your system administrator for assistance. +# +[invalid-net-mask] +We were unable to parse the provided network interface: + + Interface: %s + Error: %s + +The interface must be one of the following forms: + + 123.456.789.123 + 123.456/16 + 123.456.789 + +The system can parse any one of these, and will find an interface +that matches within the provided scope. Please revise your input +and try again. +# +[too-many-values] +The specified parameter includes too many values: + + Paramater: %s + Value: %s + Number of results: %d + Allowed number: %d + +Please adjust the value and try again. + diff --git a/orte/mca/rmcast/base/rmcast_base_open.c b/orte/mca/rmcast/base/rmcast_base_open.c index 6ba8d52d38..04efb086be 100644 --- a/orte/mca/rmcast/base/rmcast_base_open.c +++ b/orte/mca/rmcast/base/rmcast_base_open.c @@ -24,7 +24,9 @@ #include "opal/mca/base/base.h" #include "opal/mca/base/mca_base_param.h" #include "opal/util/argv.h" +#include "opal/util/if.h" +#include "orte/mca/errmgr/errmgr.h" #include "orte/util/parse_options.h" #include "orte/util/show_help.h" @@ -66,10 +68,12 @@ orte_rmcast_module_t orte_rmcast = { NULL, NULL, NULL, + NULL, NULL }; orte_rmcast_base_t orte_rmcast_base; +static bool opened = false; /** * Function for finding and opening either all MCA components, or the one @@ -78,71 +82,163 @@ orte_rmcast_base_t orte_rmcast_base; int orte_rmcast_base_open(void) { int value, pval, i; - char *tmp, **ports=NULL; + char *tmp, **nets=NULL, **ports=NULL, *ptr; + int idx; + struct sockaddr_in inaddr; + uint32_t addr, netaddr, netmask; + bool assigned; + int rc; + + if (opened) { + /* ensure we don't go through here twice */ + return ORTE_SUCCESS; + } + opened = true; + + /* ensure all global values are initialized */ + orte_rmcast_base.xmit_network = 0; + orte_rmcast_base.my_group_name = NULL; + orte_rmcast_base.my_group_number = 0; + orte_rmcast_base.interface = 0; + for (i=0; i < 255; i++) { + orte_rmcast_base.ports[i] = 0; + } /* public multicast channel for this job */ - mca_base_param_reg_string_name("rmcast", "base_scope", - "Scope of the multicast system [link (default) | site | org | global]", + mca_base_param_reg_string_name("rmcast", "base_multicast_network", + "Network to use for multicast xmissions [link (default) | site | org | global | tuple-addr]", false, false, "link", &tmp); + rc = ORTE_ERR_SILENT; if (0 == strcasecmp(tmp, "site")) { - orte_rmcast_base.octet1[0] = 239; - orte_rmcast_base.octet1[1] = 239; - orte_rmcast_base.octet2[0] = 255; - orte_rmcast_base.octet2[1] = 255; - orte_rmcast_base.octet3[0] = 0; - orte_rmcast_base.octet3[1] = 255; + rc = opal_iftupletoaddr("239.255.0.0", &orte_rmcast_base.xmit_network, NULL); } else if (0 == strcasecmp(tmp, "org")) { - orte_rmcast_base.octet1[0] = 239; - orte_rmcast_base.octet1[1] = 239; - orte_rmcast_base.octet2[0] = 192; - orte_rmcast_base.octet2[1] = 195; - orte_rmcast_base.octet3[0] = 0; - orte_rmcast_base.octet3[1] = 255; + rc = opal_iftupletoaddr("239.192.0.0", &orte_rmcast_base.xmit_network, NULL); } else if (0 == strcasecmp(tmp, "global")) { - orte_rmcast_base.octet1[0] = 224; - orte_rmcast_base.octet1[1] = 238; - orte_rmcast_base.octet2[0] = 0; - orte_rmcast_base.octet2[1] = 255; - orte_rmcast_base.octet3[0] = 1; - orte_rmcast_base.octet3[1] = 255; + rc = opal_iftupletoaddr("224.0.1.0", &orte_rmcast_base.xmit_network, NULL); } else if (0 == strcasecmp(tmp, "link")) { /* default to link */ - orte_rmcast_base.octet1[0] = 224; - orte_rmcast_base.octet1[1] = 224; - orte_rmcast_base.octet2[0] = 0; - orte_rmcast_base.octet2[1] = 0; - orte_rmcast_base.octet3[0] = 0; - orte_rmcast_base.octet3[1] = 0; - } else { - orte_show_help("help-rmcast-base.txt", "unrecognized-scope", true, tmp); + rc = opal_iftupletoaddr("224.0.0.0", &orte_rmcast_base.xmit_network, NULL); + } else if (NULL != strchr(tmp, '.')) { + /* must have been given an actual network address */ + rc = opal_iftupletoaddr(tmp, &orte_rmcast_base.xmit_network, NULL); + } + + if (ORTE_SUCCESS != rc) { + orte_show_help("help-rmcast-base.txt", "unrecognized-network", true, tmp); return ORTE_ERR_SILENT; } /* channel offset */ - mca_base_param_reg_int_name("rmcast", "base_starting_channel", - "Offset to use within each network when computing channel (default: 0)", - false, false, 0, &value); - /* check for correctness of value */ - if (value < 0 || value > 255) { - orte_show_help("help-rmcast-base.txt", "value-range", true, - "starting channel", value, "0-255"); - return ORTE_ERR_SILENT; + mca_base_param_reg_string_name("rmcast", "base_group", + "Multicast group of this process (name:number)", + false, false, NULL, &tmp); + /* parse the value */ + if (NULL != tmp) { + if (NULL == (ptr = strrchr(tmp, ':'))) { + orte_show_help("help-rmcast-base.txt", "value-out-of-range", true, + tmp, "string-name:number"); + return ORTE_ERR_SILENT; + } + *ptr = '\0'; + orte_rmcast_base.my_group_name = strdup(tmp); + ptr++; + value = strtoul(ptr, NULL, 10); + if (value < 2 || value > 255) { + orte_show_help("help-rmcast-base.txt", "value-out-of-range", true, + ptr, "2-255"); + return ORTE_ERR_SILENT; + } + orte_rmcast_base.my_group_number = value; + } + + /* multicast interfaces */ + mca_base_param_reg_string_name("rmcast", "base_if_include", + "Comma-separated list of interfaces (given in IP form) to use for multicast messages", + false, false, NULL, &tmp); + /* if nothing was provided, default to first non-loopback interface */ + if (NULL == tmp) { + idx = opal_ifbegin(); + while (0 < idx) { + /* ignore the loopback interface */ + if (opal_ifisloopback(idx)) { + idx = opal_ifnext(idx); + continue; + } + if (ORTE_SUCCESS != (rc = opal_ifindextoaddr(idx, (struct sockaddr*)&inaddr, sizeof(inaddr)))) { + ORTE_ERROR_LOG(rc); + return rc; + } + orte_rmcast_base.interface = ntohl(inaddr.sin_addr.s_addr); + break; + } + if (idx < 0) { + orte_show_help("help-rmcast-base.txt", "no-avail-interfaces", true); + return ORTE_ERR_SILENT; + } + } else { + /* separate the list */ + nets = opal_argv_split(tmp, ','); + free(tmp); + idx = -1; + assigned = false; + for (i=0; NULL != nets[i] && !assigned; i++) { + if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(nets[i], &netaddr, &netmask))) { + orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, nets[i], ORTE_ERROR_NAME(rc)); + return ORTE_ERR_SILENT; + } + /* search for a matching interface - take the first one within the returned scope */ + idx = opal_ifbegin(); + while (0 < idx) { + /* ignore the loopback interface */ + if (opal_ifisloopback(idx)) { + idx = opal_ifnext(idx); + continue; + } + if (ORTE_SUCCESS != (rc = opal_ifindextoaddr(idx, (struct sockaddr*)&inaddr, sizeof(inaddr)))) { + ORTE_ERROR_LOG(rc); + return rc; + } + addr = ntohl(inaddr.sin_addr.s_addr); + if (netaddr == (addr & netmask)) { + orte_rmcast_base.interface = ntohl(inaddr.sin_addr.s_addr); + assigned = true; + break; + } + idx = opal_ifnext(idx); + } + } + opal_argv_free(nets); + if (idx < 0) { + orte_show_help("help-rmcast-base.txt", "no-avail-interfaces", true); + return ORTE_ERR_SILENT; + } } - orte_rmcast_base.channel_offset = (uint8_t)value; /* range of available ports */ mca_base_param_reg_string_name("rmcast", "base_multicast_ports", "Ports available for multicast channels (default: 6900-7155)", - false, false, "6900-7155", &tmp); + false, false, "6900-7154", &tmp); + ports = NULL; orte_util_parse_range_options(tmp, &ports); + if (255 < opal_argv_count(ports)) { + orte_show_help("help-rmcast-base.txt", "too-many-values", true, + "ports", tmp, opal_argv_count(ports), "255"); + free(tmp); + opal_argv_free(nets); + return ORTE_ERR_SILENT; + } for (i=0; i < opal_argv_count(ports); i++) { pval = strtoul(ports[i], NULL, 10); if (pval >= UINT16_MAX) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + free(tmp); + opal_argv_free(ports); return ORTE_ERR_BAD_PARAM; } orte_rmcast_base.ports[i] = pval; } + free(tmp); + opal_argv_free(ports); /* Debugging / verbose output. Always have stream open, with verbose set by the mca open system... */ diff --git a/orte/mca/rmcast/base/rmcast_base_select.c b/orte/mca/rmcast/base/rmcast_base_select.c index f4047e37fd..bca84de58c 100644 --- a/orte/mca/rmcast/base/rmcast_base_select.c +++ b/orte/mca/rmcast/base/rmcast_base_select.c @@ -17,6 +17,7 @@ #include "orte/mca/rmcast/base/base.h" +static bool selected = false; /* * Select one RMCAST component from all those that are available. @@ -27,6 +28,12 @@ int orte_rmcast_base_select(void) orte_rmcast_module_t *best_module = NULL; int rc; + if (selected) { + /* ensure we don't do this twice */ + return ORTE_SUCCESS; + } + selected = true; + /* * Select the best component */ diff --git a/orte/mca/rmcast/basic/rmcast_basic.c b/orte/mca/rmcast/basic/rmcast_basic.c index 31014b66cc..7d9e0d2871 100644 --- a/orte/mca/rmcast/basic/rmcast_basic.c +++ b/orte/mca/rmcast/basic/rmcast_basic.c @@ -40,45 +40,15 @@ static opal_mutex_t lock; static opal_list_t recvs; static opal_list_t channels; -static opal_list_t networks; static bool init_completed = false; -static uint8_t next_octet1 = 0; -static uint8_t next_octet2 = 0; -static uint8_t next_octet3 = 0; -static unsigned int next_channel = 0; +static orte_rmcast_channel_t next_channel; -/* LOCAL FUNCTIONS */ #define CLOSE_THE_SOCKET(socket) \ do { \ shutdown(socket, 2); \ close(socket); \ } while(0) -static void recv_handler(int sd, short flags, void* user); - -static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket); - -static void xmit_data(int sd, short flags, void* send_req); - -static uint32_t parse_network(char *network); - -/* - * Data structure for tracking networks and their channels - */ -typedef struct { - opal_list_item_t item; - uint32_t network; - uint8_t next_freq; -} rmcast_basic_network_t; -static void newtork_construct(rmcast_basic_network_t *ptr) -{ - ptr->network = 0; - ptr->next_freq = 0; -} -OBJ_CLASS_INSTANCE(rmcast_basic_network_t, - opal_list_item_t, - newtork_construct, - NULL); /* * Data structure for tracking assigned channels @@ -86,10 +56,10 @@ OBJ_CLASS_INSTANCE(rmcast_basic_network_t, typedef struct { opal_list_item_t item; char *name; - unsigned int channel; - uint32_t full_addr; + orte_rmcast_channel_t channel; + uint32_t network; uint16_t port; - uint8_t freq; + uint32_t interface; int xmit; int recv; struct sockaddr_in addr; @@ -105,10 +75,10 @@ typedef struct { static void channel_construct(rmcast_basic_channel_t *ptr) { ptr->name = NULL; - ptr->channel = 0; - ptr->full_addr = 0; + ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; + ptr->network = 0; ptr->port = 0; - ptr->freq = 0; + ptr->interface = 0; ptr->xmit = -1; ptr->recv = -1; memset(&ptr->addr, 0, sizeof(ptr->addr)); @@ -156,7 +126,7 @@ OBJ_CLASS_INSTANCE(rmcast_basic_channel_t, */ typedef struct { opal_list_item_t item; - uint32_t channel; + orte_rmcast_channel_t channel; bool recvd; opal_buffer_t *data; orte_rmcast_tag_t tag; @@ -167,7 +137,7 @@ typedef struct { static void recv_construct(rmcast_basic_recv_t *ptr) { - ptr->channel = 0; + ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; ptr->recvd = false; ptr->data = NULL; ptr->tag = ORTE_RMCAST_TAG_INVALID; @@ -214,35 +184,47 @@ OBJ_CLASS_INSTANCE(rmcast_basic_send_t, send_construct, NULL); +/* LOCAL FUNCTIONS */ +static void recv_handler(int sd, short flags, void* user); + +static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction); + +static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket); + +static void xmit_data(int sd, short flags, void* send_req); + /* API FUNCTIONS */ static int init(void); static void finalize(void); -static int basic_send(unsigned int channel, +static int basic_send(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf); -static int basic_send_nb(unsigned int channel, +static int basic_send_nb(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf, orte_rmcast_callback_fn_t cbfunc, void *cbdata); -static int basic_recv(unsigned int channel, +static int basic_recv(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf); -static int basic_recv_nb(unsigned int channel, +static int basic_recv_nb(orte_rmcast_channel_t channel, orte_rmcast_flag_t flags, orte_rmcast_tag_t tag, orte_rmcast_callback_fn_t cbfunc, void *cbdata); -static void cancel_recv(unsigned int channel, +static void cancel_recv(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag); -static unsigned int open_channel(char *name, char *network, uint8_t direction); +static int open_channel(orte_rmcast_channel_t *channel, char *name, + char *network, int port, char *interface, uint8_t direction); + +static int close_channel(orte_rmcast_channel_t channel); /* The API's in this module are solely used to support LOCAL * procs - i.e., procs that are co-located to the HNP. Remote @@ -258,18 +240,33 @@ orte_rmcast_module_t orte_rmcast_basic_module = { basic_recv, basic_recv_nb, cancel_recv, - open_channel + open_channel, + close_channel }; +/* during init, we setup two channels for both xmit and recv: + * (a) a public address announcement channel. There are two variants + * of this: + * (1) system processes - e.g., daemons, tools. This channel + * is reserved solely for their use in performing admin + * functions + * (2) application processes. This channel is used to announce + * their existence and contact info for auto-wireup + * (b) our own group's channel, which is where our own output + * will be sent. At this time, we assume that we always + * want to hear our peers, so this channels is also + * bidirectional + * + * In addition, the HNP opens a third channel which is used solely + * for cmd-control purposes. This is where a tool, for example, might + * send a cmd to the HNP to take some action - there is no point in + * having that cmd echo around to every daemon and/or other tool + * in the system. + */ static int init(void) { - int xmitsd, recvsd; rmcast_basic_channel_t *chan; - rmcast_basic_network_t *net; - uint8_t freq; - char *name; int rc; - uint16_t port; if (init_completed) { return ORTE_SUCCESS; @@ -283,75 +280,45 @@ static int init(void) OBJ_CONSTRUCT(&lock, opal_mutex_t); OBJ_CONSTRUCT(&recvs, opal_list_t); OBJ_CONSTRUCT(&channels, opal_list_t); - OBJ_CONSTRUCT(&networks, opal_list_t); - - /* set the last octets to point at the beginning of the - * specified octet ranges - */ - next_octet1 = orte_rmcast_base.octet1[0]; - next_octet2 = orte_rmcast_base.octet2[0]; - next_octet3 = orte_rmcast_base.octet3[0]; + next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS; - /* form our base-level network */ - net = OBJ_NEW(rmcast_basic_network_t); - net->network = OPAL_IF_ASSEMBLE_NETWORK(next_octet1, next_octet2, next_octet3); - - /* select our frequency and port */ - if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) { - freq = ORTE_RMCAST_SYS_ADDR + orte_rmcast_base.channel_offset; - name = "system"; - } else if (ORTE_PROC_IS_APP) { - freq = ORTE_RMCAST_APP_PUBLIC_ADDR + orte_rmcast_base.channel_offset; - name = "app-public"; - } else { - return ORTE_ERR_NOT_SUPPORTED; - } - port = orte_rmcast_base.ports[freq-orte_rmcast_base.channel_offset-1]; - - /* setup the next freq for this network */ - net->next_freq = freq + 1; - - /* add this channel to our list */ + /* setup the respective public address channel */ chan = OBJ_NEW(rmcast_basic_channel_t); - chan->name = strdup(name); - chan->full_addr = net->network + freq; - chan->port = port; - chan->channel = next_channel++; - - /* setup the IPv4 addr info */ - chan->addr.sin_family = AF_INET; - chan->addr.sin_addr.s_addr = htonl(chan->full_addr); - chan->addr.sin_port = htons(chan->port); - - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "addr %03d.%03d.%03d.%03d port %d freq %d offset %d ports %d", - OPAL_IF_FORMAT_ADDR(chan->full_addr), (int)port, - (int)freq, (int)orte_rmcast_base.channel_offset, - (int)orte_rmcast_base.ports[0])); - - /* create a xmit socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, port, false))) { + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) { + chan->name = strdup("system"); + chan->channel = ORTE_RMCAST_SYS_CHANNEL; + chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_SYS_CHANNEL; + chan->port = orte_rmcast_base.ports[ORTE_RMCAST_SYS_CHANNEL]; + chan->interface = orte_rmcast_base.interface; + } else if (ORTE_PROC_IS_APP) { + chan->name = strdup("app-announce"); + chan->channel = ORTE_RMCAST_APP_PUBLIC_CHANNEL; + chan->network = orte_rmcast_base.xmit_network + ORTE_RMCAST_APP_PUBLIC_CHANNEL; + chan->port = orte_rmcast_base.ports[ORTE_RMCAST_APP_PUBLIC_CHANNEL]; + chan->interface = orte_rmcast_base.interface; + } else { + opal_output(0, "rmcast:basic:init - unknown process type"); + return ORTE_ERR_SILENT; + } + if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; } - chan->xmit = xmitsd; - chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); - - /* create a recv socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, port, true))) { - ORTE_ERROR_LOG(rc); - return rc; + + /* finally, if we are an app, setup our grp channel, if one was given */ + if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { + chan = OBJ_NEW(rmcast_basic_channel_t); + chan->name = strdup(orte_rmcast_base.my_group_name); + chan->channel = orte_rmcast_base.my_group_number; + chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number; + chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number]; + chan->interface = orte_rmcast_base.interface; + if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) { + ORTE_ERROR_LOG(rc); + return rc; + } } - chan->recv = recvsd; - chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); - /* setup an event to catch messages */ - opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); - opal_event_add(&chan->recv_ev, 0); - - /* setup the event to xmit messages, but don't activate it */ - opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan); - return ORTE_SUCCESS; } @@ -372,10 +339,6 @@ static void finalize(void) OBJ_RELEASE(item); } OBJ_DESTRUCT(&channels); - while (NULL != (item = opal_list_remove_first(&networks))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&networks); OPAL_THREAD_UNLOCK(&lock); OBJ_DESTRUCT(&lock); @@ -384,14 +347,14 @@ static void finalize(void) } /* internal blocking send support */ -static void internal_snd_cb(int channel, opal_buffer_t *buf, void *cbdata) +static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata) { rmcast_basic_send_t *snd = (rmcast_basic_send_t*)cbdata; snd->send_complete = true; } -static int basic_send(unsigned int channel, +static int basic_send(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf) { @@ -420,14 +383,14 @@ static int basic_send(unsigned int channel, " called on multicast channel %03d.%03d.%03d.%03d %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)buf->bytes_used, - OPAL_IF_FORMAT_ADDR(ch->full_addr), ch->full_addr)); + OPAL_IF_FORMAT_ADDR(ch->network), ch->network)); /* check the msg size to ensure it isn't too big */ if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) { orte_show_help("help-orte-rmcast-basic.txt", "orte-rmcast-basic:msg-too-large", true, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - OPAL_IF_FORMAT_ADDR(ch->full_addr), tag, + OPAL_IF_FORMAT_ADDR(ch->network), tag, buf->bytes_used, ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10); return ORTE_ERR_NOT_SUPPORTED; @@ -457,7 +420,7 @@ static int basic_send(unsigned int channel, return ORTE_SUCCESS; } -static int basic_send_nb(unsigned int channel, +static int basic_send_nb(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf, orte_rmcast_callback_fn_t cbfunc, @@ -488,13 +451,13 @@ static int basic_send_nb(unsigned int channel, " called on multicast channel %03d.%03d.%03d.%03d %0x", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)buf->bytes_used, - OPAL_IF_FORMAT_ADDR(ch->full_addr), ch->full_addr)); + OPAL_IF_FORMAT_ADDR(ch->network), ch->network)); if (buf->bytes_used > (ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10)) { orte_show_help("help-orte-rmcast-basic.txt", "orte-rmcast-basic:msg-too-large", true, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - OPAL_IF_FORMAT_ADDR(ch->full_addr), tag, + OPAL_IF_FORMAT_ADDR(ch->network), tag, buf->bytes_used, ORTE_RMCAST_BASIC_MAX_MSG_SIZE-10); return ORTE_ERR_NOT_SUPPORTED; @@ -521,7 +484,7 @@ static int basic_send_nb(unsigned int channel, return ORTE_SUCCESS; } -static int basic_recv(unsigned int channel, +static int basic_recv(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf) { @@ -547,7 +510,7 @@ static int basic_recv(unsigned int channel, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->full_addr))); + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); recvptr = OBJ_NEW(rmcast_basic_recv_t); recvptr->channel = channel; @@ -564,7 +527,7 @@ static int basic_recv(unsigned int channel, return ORTE_SUCCESS; } -static int basic_recv_nb(unsigned int channel, +static int basic_recv_nb(orte_rmcast_channel_t channel, orte_rmcast_flag_t flags, orte_rmcast_tag_t tag, orte_rmcast_callback_fn_t cbfunc, void *cbdata) @@ -591,7 +554,7 @@ static int basic_recv_nb(unsigned int channel, OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->full_addr))); + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); recvptr = OBJ_NEW(rmcast_basic_recv_t); recvptr->channel = channel; @@ -603,7 +566,7 @@ static int basic_recv_nb(unsigned int channel, return ORTE_SUCCESS; } -static void cancel_recv(unsigned int channel, +static void cancel_recv(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag) { opal_list_item_t *item, *next; @@ -626,16 +589,31 @@ static void cancel_recv(unsigned int channel, } } -static unsigned int open_channel(char *name, char *network, uint8_t direction) +static int open_channel(orte_rmcast_channel_t *channel, char *name, + char *network, int port, char *interface, uint8_t direction) { opal_list_item_t *item; rmcast_basic_channel_t *nchan, *chan; - rmcast_basic_network_t *net, *netitem; - uint32_t netaddr; + uint32_t netaddr=0, netmask=0, intr=0; int rc; - int xmitsd, recvsd; - /* see if this name has already been assigned a channel */ + /* parse the network, if provided */ + if (NULL != network) { + if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(network, &netaddr, &netmask))) { + orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, network, ORTE_ERROR_NAME(rc)); + return ORTE_ERR_SILENT; + } + } + + /* parse the interface, if provided */ + if (NULL != interface) { + if (ORTE_SUCCESS != (rc = opal_iftupletoaddr(interface, &intr, NULL))) { + orte_show_help("help-rmcast-base.txt", "invalid-net-mask", true, interface, ORTE_ERROR_NAME(rc)); + return ORTE_ERR_SILENT; + } + } + + /* see if this name has already been assigned a channel on the specified network */ chan = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); @@ -643,6 +621,10 @@ static unsigned int open_channel(char *name, char *network, uint8_t direction) nchan = (rmcast_basic_channel_t*)item; if (0 == strcasecmp(nchan->name, name)) { + /* check the network, if one was specified */ + if (0 != netaddr && netaddr != (nchan->network & netmask)) { + continue; + } chan = nchan; break; } @@ -650,133 +632,70 @@ static unsigned int open_channel(char *name, char *network, uint8_t direction) if (NULL != chan) { /* already exists - check that the requested - * socket is setup + * sockets are setup */ - if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) { - setup_socket(&chan->xmit, chan->full_addr, chan->port, false); + if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { + ORTE_ERROR_LOG(rc); + return rc; } - if (0 > chan->recv && ORTE_RMCAST_RECV & direction) { - setup_socket(&chan->recv, chan->full_addr, chan->port, true); - /* setup an event to catch messages */ - opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ, recv_handler, chan); - opal_event_add(&chan->recv_ev, 0); - } - return chan->channel; - } - - /* the named channel doesn't exist - did they give - * us a specific network to use? - */ - if (NULL != network) { - net = NULL; - netaddr = parse_network(network); - for (item = opal_list_get_first(&networks); - item != opal_list_get_end(&networks); - item = opal_list_get_next(item)) { - netitem = (rmcast_basic_network_t*)item; - - if (netaddr == netitem->network) { - net = netitem; - break; - } - } - if (NULL == net) { - /* new network - create it */ - net = OBJ_NEW(rmcast_basic_network_t); - net->network = OPAL_IF_ASSEMBLE_NETWORK(next_octet1, next_octet2, next_octet3); - net->next_freq = orte_rmcast_base.channel_offset + 1; - } - /* assign next freq to the next available channel */ - chan = OBJ_NEW(rmcast_basic_channel_t); - chan->name = strdup(name); - chan->full_addr = net->network + net->next_freq; - chan->port = orte_rmcast_base.ports[net->next_freq-orte_rmcast_base.channel_offset-1]; - chan->channel = next_channel++; - /* setup the IPv4 addr info */ - chan->addr.sin_family = AF_INET; - chan->addr.sin_addr.s_addr = htonl(chan->full_addr); - chan->addr.sin_port = htons(chan->port); - if (ORTE_RMCAST_XMIT & direction) { - /* create a xmit socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, chan->port, false))) { - ORTE_ERROR_LOG(rc); - return 0; - } - chan->xmit = xmitsd; - chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); - } - if (ORTE_RMCAST_RECV & direction) { - /* create a recv socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, chan->port, true))) { - ORTE_ERROR_LOG(rc); - return 0; - } - chan->recv = recvsd; - chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); - } - /* change to the next freq */ - net->next_freq++; - return chan->channel; - } - - /* if we get here, then we couldn't find a channel of the given name - * AND we were not given a network address to use. In this case, use - * the next available network/freq - * - * RHC: for now, we are not going to worry about balancing loads - * across available networks. We are just going to use the next - * network with an available freq - */ - - net = NULL; - for (item = opal_list_get_first(&networks); - item != opal_list_get_end(&networks); - item = opal_list_get_next(item)) { - netitem = (rmcast_basic_network_t*)item; - - if (netitem->next_freq < 255) { - net = netitem; - break; - } - } - if (NULL == net) { - /* we are hosed */ - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return 255; + *channel = chan->channel; + return ORTE_SUCCESS; } + /* we didn't find an existing match, so create a new channel */ chan = OBJ_NEW(rmcast_basic_channel_t); /* puts it on list */ chan->name = strdup(name); - chan->full_addr = net->network + net->next_freq; - chan->port = orte_rmcast_base.ports[net->next_freq-orte_rmcast_base.channel_offset-1]; chan->channel = next_channel++; + /* if we were not given a network, use the default */ + if (NULL == network) { + chan->network = orte_rmcast_base.xmit_network + chan->channel; + } else { + chan->network = netaddr; + } + /* if we were not given an interface, use the default */ + if (NULL == interface) { + chan->interface = orte_rmcast_base.interface; + } else { + chan->interface = intr; + } + /* if we were not given a port, use a default one */ + if (port < 0) { + chan->port = orte_rmcast_base.ports[chan->channel]; + } else { + chan->port = port; + } - /* setup the IPv4 addr info */ - chan->addr.sin_family = AF_INET; - chan->addr.sin_addr.s_addr = htonl(chan->full_addr); - chan->addr.sin_port = htons(chan->port); - if (ORTE_RMCAST_XMIT & direction) { - /* create a xmit socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan->full_addr, chan->port, false))) { - ORTE_ERROR_LOG(rc); - return 255; - } - chan->xmit = xmitsd; - chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); + if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { + ORTE_ERROR_LOG(rc); + return rc; } - if (ORTE_RMCAST_RECV & direction) { - /* create a recv socket */ - if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan->full_addr, chan->port, true))) { - ORTE_ERROR_LOG(rc); - return 255; - } - chan->recv = recvsd; - chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); - } - /* change to the next freq */ - net->next_freq++; - return chan->channel; + *channel = chan->channel; + + return ORTE_SUCCESS; +} + +static int close_channel(orte_rmcast_channel_t channel) +{ + opal_list_item_t *item; + rmcast_basic_channel_t *chan; + + OPAL_THREAD_LOCK(&lock); + for (item = opal_list_get_first(&channels); + item != opal_list_get_end(&channels); + item = opal_list_get_next(item)) { + chan = (rmcast_basic_channel_t*)item; + + if (channel == chan->channel) { + opal_list_remove_item(&channels, item); + OBJ_RELEASE(chan); + OPAL_THREAD_UNLOCK(&lock); + return ORTE_SUCCESS; + } + } + + OPAL_THREAD_UNLOCK(&lock); + return ORTE_ERR_NOT_FOUND; } static void recv_handler(int sd, short flags, void* cbdata) @@ -811,11 +730,11 @@ static void recv_handler(int sd, short flags, void* cbdata) ORTE_NAME_PRINT(&name))); /* if this message is from myself, ignore it */ - if (name.jobid == ORTE_PROC_MY_NAME->jobid && - name.vpid == ORTE_PROC_MY_NAME->vpid) { + if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) { OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, - "%s rmcast:basic:recv sent to myself!", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + "%s rmcast:basic:recv sent from myself: %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&name))); return; } @@ -839,8 +758,8 @@ static void recv_handler(int sd, short flags, void* cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)ptr->channel, (int)ptr->tag)); - if (chan->channel == ptr->channel && - tag == ptr->tag) { + if ((chan->channel == ptr->channel || ORTE_RMCAST_WILDCARD_CHANNEL == ptr->channel) && + (tag == ptr->tag || ORTE_RMCAST_TAG_WILDCARD == ptr->tag)) { /* data must be placed in malloc'd area for buffer */ payload = (uint8_t*)malloc(sz-10); memcpy(payload, &chan->recvd_data[10], sz-10); @@ -871,7 +790,50 @@ static void recv_handler(int sd, short flags, void* cbdata) return; } -static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) +static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction) +{ + int rc; + int xmitsd, recvsd; + + /* setup the IPv4 addr info */ + chan->addr.sin_family = AF_INET; + chan->addr.sin_addr.s_addr = htonl(chan->network); + chan->addr.sin_port = htons(chan->port); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "setup:channel addr %03d.%03d.%03d.%03d port %d", + OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port)); + + if (0 > chan->xmit && ORTE_RMCAST_XMIT & direction) { + /* create a xmit socket */ + if (ORTE_SUCCESS != (rc = setup_socket(&xmitsd, chan, false))) { + ORTE_ERROR_LOG(rc); + return rc; + } + chan->xmit = xmitsd; + chan->send_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); + /* setup the event to xmit messages, but don't activate it */ + opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan); + } + + if (0 > chan->recv && ORTE_RMCAST_RECV & direction) { + /* create a recv socket */ + if (ORTE_SUCCESS != (rc = setup_socket(&recvsd, chan, true))) { + ORTE_ERROR_LOG(rc); + return rc; + } + chan->recv = recvsd; + chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); + + /* setup an event to catch messages */ + opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); + opal_event_add(&chan->recv_ev, 0); + } + + return ORTE_SUCCESS; +} + +static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket) { uint8_t ttl = 1; struct sockaddr_in inaddr; @@ -880,6 +842,10 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) int target_sd; int flags; + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "setup:socket addr %03d.%03d.%03d.%03d port %d", + OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port)); + target_sd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); if(target_sd < 0) { if (EAFNOSUPPORT != opal_socket_errno) { @@ -892,7 +858,7 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) /* set the multicast flags */ if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_TTL, (void *)&ttl, sizeof(ttl))) < 0) { - opal_output(0,"rmcast:init: socketopt() failed: %s (%d)", + opal_output(0,"rmcast:init: socketopt() failed on MULTICAST_TTL: %s (%d)", strerror(opal_socket_errno), opal_socket_errno); return ORTE_ERR_IN_ERRNO; } @@ -907,36 +873,60 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) return ORTE_ERROR; } - /* Bind the socket if requested */ - if (bindsocket) { + /* if this is the recv side... */ + if (recvsocket) { memset(&inaddr, 0, sizeof(inaddr)); inaddr.sin_family = AF_INET; - inaddr.sin_addr.s_addr = htonl(chan); - inaddr.sin_port = htons(port); + inaddr.sin_addr.s_addr = htonl(chan->network); + inaddr.sin_port = htons(chan->port); addrlen = sizeof(struct sockaddr_in); /* bind the socket */ if (bind(target_sd, (struct sockaddr*)&inaddr, addrlen) < 0) { - opal_output(0, "%s rmcast:init: bind() failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + opal_output(0, "%s rmcast:init: bind() failed for addr %03d.%03d.%03d.%03d port %d\n\tError: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(chan->network), (int)chan->port, + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(target_sd); + return ORTE_ERROR; + } + /* set membership on the multicast interface */ + memset(&req, 0, sizeof (req)); + req.imr_multiaddr.s_addr = htonl(chan->network); + req.imr_interface.s_addr = htonl(chan->interface); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "setup:socket:membership addr %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d", + OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface))); + + if ((setsockopt(target_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (void *)&req, sizeof (req))) < 0) { + opal_output(0, "%s rmcast:init: setsockopt() failed on ADD_MEMBERSHIP\n" + "\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface), + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(target_sd); + return ORTE_ERROR; + } + } else { + /* on the xmit side, need to set the interface */ + memset(&req, 0, sizeof (req)); + req.imr_interface.s_addr = htonl(chan->interface); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "setup:socket:xmit interface %03d.%03d.%03d.%03d", + OPAL_IF_FORMAT_ADDR(chan->interface))); + + if ((setsockopt(target_sd, IPPROTO_IP, IP_MULTICAST_IF, + (void *)&req, sizeof (req))) < 0) { + opal_output(0, "%s rmcast:init: setsockopt() failed on MULTICAST_IF\n" + "\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + OPAL_IF_FORMAT_ADDR(chan->network), OPAL_IF_FORMAT_ADDR(chan->interface), strerror(opal_socket_errno), opal_socket_errno); CLOSE_THE_SOCKET(target_sd); return ORTE_ERROR; } - } - - /* set membership to "any" */ - memset(&req, 0, sizeof (req)); - req.imr_multiaddr.s_addr = htonl(chan); - req.imr_interface.s_addr = htonl(INADDR_ANY); - - if ((setsockopt(target_sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, - (void *)&req, sizeof (req))) < 0) { - opal_output(0, "%s rmcast:init: setsockopt() failed: %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(target_sd); - return ORTE_ERROR; } /* set socket up to be non-blocking */ @@ -952,7 +942,7 @@ static int setup_socket(int *sd, uint32_t chan, uint16_t port, bool bindsocket) return ORTE_ERROR; } } - + /* return the socket */ *sd = target_sd; @@ -1025,22 +1015,3 @@ static void xmit_data(int sd, short flags, void* send_req) OPAL_THREAD_UNLOCK(&chan->send_lock); } - -static uint32_t parse_network(char *network) -{ - char **octets=NULL; - uint32_t net, oct1, oct2, oct3; - - /* the network will be provided as a set of dot-separated - * octets, so split at those points - */ - octets = opal_argv_split(network, '.'); - oct1 = strtoul(octets[0], NULL, 10); - oct2 = strtoul(octets[1], NULL, 10); - oct3 = strtoul(octets[2], NULL, 10); - net = ((oct1 >> 24) & 0x000000FF) | - ((oct2 >> 16) & 0x000000FF) | - ((oct3 >> 8) & 0x000000FF); - opal_argv_free(octets); - return net; -} diff --git a/orte/mca/rmcast/rmcast.h b/orte/mca/rmcast/rmcast.h index de5a59ee05..32504b4021 100644 --- a/orte/mca/rmcast/rmcast.h +++ b/orte/mca/rmcast/rmcast.h @@ -38,7 +38,7 @@ BEGIN_C_DECLS /** * Function prototype for callback from receiving multicast messages */ -typedef void (*orte_rmcast_callback_fn_t)(int channel, opal_buffer_t *buf, void* cbdata); +typedef void (*orte_rmcast_callback_fn_t)(orte_rmcast_channel_t channel, opal_buffer_t *buf, void* cbdata); /* initialize the selected module */ typedef int (*orte_rmcast_base_module_init_fn_t)(void); @@ -47,34 +47,38 @@ typedef int (*orte_rmcast_base_module_init_fn_t)(void); typedef void (*orte_rmcast_base_module_finalize_fn_t)(void); /* send a buffered message across a multicast channel */ -typedef int (*orte_rmcast_base_module_send_fn_t)(unsigned int channel, +typedef int (*orte_rmcast_base_module_send_fn_t)(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf); /* non-blocking send messages from a multicast channel */ -typedef int (*orte_rmcast_base_module_send_nb_fn_t)(unsigned int channel, +typedef int (*orte_rmcast_base_module_send_nb_fn_t)(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf, orte_rmcast_callback_fn_t cbfunc, void *cbdata); /* non-blocking receive messages from a multicast channel */ -typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(unsigned int channel, +typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channel, orte_rmcast_flag_t flags, orte_rmcast_tag_t tag, orte_rmcast_callback_fn_t cbfunc, void *cbdata); /* blocking receive from a multicast channel */ -typedef int (*orte_rmcast_base_module_recv_fn_t)(unsigned int channel, +typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag, opal_buffer_t *buf); /* cancel a receive */ -typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(unsigned int channel, +typedef void (*orte_rmcast_base_module_cancel_recv_fn_t)(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag); /* open the next available channel */ -typedef unsigned int (*orte_rmcast_base_module_open_rmcast_channel_fn_t)(char *name, char *network, uint8_t direction); +typedef int (*orte_rmcast_base_module_open_channel_fn_t)(orte_rmcast_channel_t *channel, char *name, + char *network, int port, char *interface, uint8_t direction); + +/* close the channel */ +typedef int (*orte_rmcast_base_module_close_channel_fn_t)(orte_rmcast_channel_t channel); /* * rmcast component @@ -94,14 +98,15 @@ typedef orte_rmcast_base_component_1_0_0_t orte_rmcast_base_component_t; * Component modules Ver 1.0 */ struct orte_rmcast_base_module_t { - orte_rmcast_base_module_init_fn_t init; - orte_rmcast_base_module_finalize_fn_t finalize; - orte_rmcast_base_module_send_fn_t send; - orte_rmcast_base_module_send_nb_fn_t send_nb; - orte_rmcast_base_module_recv_fn_t recv; - orte_rmcast_base_module_recv_nb_fn_t recv_nb; - orte_rmcast_base_module_cancel_recv_fn_t cancel_recv; - orte_rmcast_base_module_open_rmcast_channel_fn_t open_channel; + orte_rmcast_base_module_init_fn_t init; + orte_rmcast_base_module_finalize_fn_t finalize; + orte_rmcast_base_module_send_fn_t send; + orte_rmcast_base_module_send_nb_fn_t send_nb; + orte_rmcast_base_module_recv_fn_t recv; + orte_rmcast_base_module_recv_nb_fn_t recv_nb; + orte_rmcast_base_module_cancel_recv_fn_t cancel_recv; + orte_rmcast_base_module_open_channel_fn_t open_channel; + orte_rmcast_base_module_close_channel_fn_t close_channel; }; /** Convienence typedef */ typedef struct orte_rmcast_base_module_t orte_rmcast_module_t; diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index e5f8bf2ebc..52283aac58 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -21,12 +21,17 @@ BEGIN_C_DECLS +/* channel type */ +typedef int32_t orte_rmcast_channel_t; +#define ORTE_RMCAST_CHANNEL_T OPAL_INT32 + /* ORTE IP multicast channels */ -#define ORTE_RMCAST_SYS_ADDR 1 -#define ORTE_RMCAST_APP_PUBLIC_ADDR 2 +#define ORTE_RMCAST_WILDCARD_CHANNEL -1 +#define ORTE_RMCAST_INVALID_CHANNEL 0 +#define ORTE_RMCAST_SYS_CHANNEL 1 +#define ORTE_RMCAST_APP_PUBLIC_CHANNEL 2 - -#define ORTE_RMCAST_DYNAMIC_CHANNELS 100 +#define ORTE_RMCAST_DYNAMIC_CHANNELS 3 /* define channel directions */ @@ -35,12 +40,13 @@ BEGIN_C_DECLS #define ORTE_RMCAST_BIDIR 0x03 /* Message matching tag */ -typedef uint16_t orte_rmcast_tag_t; -#define ORTE_RMCAST_TAG_T OPAL_UINT16 +typedef int32_t orte_rmcast_tag_t; +#define ORTE_RMCAST_TAG_T OPAL_INT32 /* tag values for well-known services */ -#define ORTE_RMCAST_TAG_INVALID 0 -#define ORTE_RMCAST_TAG_BOOTSTRAP 1 +#define ORTE_RMCAST_TAG_WILDCARD -1 +#define ORTE_RMCAST_TAG_INVALID 0 +#define ORTE_RMCAST_TAG_BOOTSTRAP 1 /* starting value for dynamicall assignable tags */ #define ORTE_RMCAST_TAG_DYNAMIC 100 diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index dcf5e6ee3e..8e8024e84f 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -168,6 +168,9 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t); /* onesided barrier */ #define ORTE_RML_TAG_ONESIDED_BARRIER 34 +/* bootstrap */ +#define ORTE_RML_TAG_BOOTSTRAP 35 + #define ORTE_RML_TAG_MAX 100 diff --git a/orte/mca/routed/cm/routed_cm.c b/orte/mca/routed/cm/routed_cm.c index 982b3c2556..0f604cf684 100644 --- a/orte/mca/routed/cm/routed_cm.c +++ b/orte/mca/routed/cm/routed_cm.c @@ -219,13 +219,6 @@ static int update_route(orte_process_name_t *target, return ORTE_SUCCESS; } - /* if the job family is zero, then this is going to a local slave, - * so the path is direct and there is nothing to do here - */ - if (0 == ORTE_JOB_FAMILY(target->jobid)) { - return ORTE_SUCCESS; - } - OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, "%s routed_cm_update: %s --> %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -311,16 +304,13 @@ static orte_process_name_t get_route(orte_process_name_t *target) goto found; } - /* if the job family is zero, then this is going to a local slave, - * so the path is direct - */ - if (0 == ORTE_JOB_FAMILY(target->jobid)) { - ret = target; - goto found; - } - /* IF THIS IS FOR A DIFFERENT JOB FAMILY... */ if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) { + /* if I am a daemon, send it to the HNP */ + if (ORTE_PROC_IS_DAEMON) { + ret = ORTE_PROC_MY_HNP; + goto found; + } /* if I am the HNP, then I stored a route to * this job family, so look it up */ @@ -350,17 +340,13 @@ static orte_process_name_t get_route(orte_process_name_t *target) ret = target; goto found; } else { - /* if I am a daemon, route it through the HNP to avoid - * opening unnecessary sockets - */ - if (ORTE_PROC_IS_DAEMON) { - + /* otherwise, if I am the HNP, send to the daemon */ + if (ORTE_PROC_IS_HNP) { + ret = &daemon; + } else { + /* send to the HNP for routing */ ret = ORTE_PROC_MY_HNP; - goto found; } - - /* otherwise, if I am the HNP, send to that daemon */ - ret = &daemon; goto found; } @@ -510,7 +496,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) */ if (ORTE_PROC_IS_DAEMON) { - OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, + OPAL_OUTPUT_VERBOSE((0, orte_routed_base_output, "%s routed_cm: init routes for daemon job %s\n\thnp_uri %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job), @@ -537,7 +523,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) ORTE_ERROR_LOG(rc); return rc; } - + /* set our lifeline to the HNP - we will abort if that connection is lost */ lifeline = ORTE_PROC_MY_HNP; @@ -545,16 +531,10 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) * part of the message confirming they are read to go. HNP's * load their contact info during orte_init */ - } else { - /* ndat != NULL means we are getting an update of RML info - * for the daemons - so update our contact info and routes - */ - if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) { - ORTE_ERROR_LOG(rc); - } - return rc; - } - + } + + /* ignore any other call as we only talk to the HNP */ + OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output, "%s routed_cm: completed init routes", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); @@ -562,10 +542,10 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat) return ORTE_SUCCESS; } - + if (ORTE_PROC_IS_HNP) { - OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output, + OPAL_OUTPUT_VERBOSE((0, orte_routed_base_output, "%s routed_cm: init routes for HNP job %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job))); diff --git a/orte/mca/routed/cm/routed_cm_component.c b/orte/mca/routed/cm/routed_cm_component.c index c2f82a91c8..54dce47233 100644 --- a/orte/mca/routed/cm/routed_cm_component.c +++ b/orte/mca/routed/cm/routed_cm_component.c @@ -46,8 +46,17 @@ orte_routed_component_t mca_routed_cm_component = { static int orte_routed_cm_component_query(mca_base_module_t **module, int *priority) { - /* only pick us if we were specifically directed to be used */ - *priority = 0; - *module = (mca_base_module_t *) &orte_routed_cm_module; + char *spec; + + /* only select us if specified */ + spec = getenv("OMPI_MCA_routed"); + if (NULL == spec || 0 != strcmp("cm", spec)) { + *priority = 0; + *module = NULL; + return ORTE_ERROR; + } + + *priority = 1000; + *module = (mca_base_module_t *)&orte_routed_cm_module; return ORTE_SUCCESS; } diff --git a/orte/orted/orted_main.c b/orte/orted/orted_main.c index 262cdf69a9..003ccefd52 100644 --- a/orte/orted/orted_main.c +++ b/orte/orted/orted_main.c @@ -718,14 +718,26 @@ int orte_daemon(int argc, char *argv[]) OBJ_RELEASE(buffer); goto DONE; } + } else if (orte_daemon_bootstrap) { + /* send to a different callback location as the + * HNP didn't launch us and isn't waiting for a + * callback + */ + if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer, + ORTE_RML_TAG_BOOTSTRAP, 0))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(buffer); + goto DONE; + } + } else { + if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer, + ORTE_RML_TAG_ORTED_CALLBACK, 0))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(buffer); + goto DONE; + } } - if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buffer, - ORTE_RML_TAG_ORTED_CALLBACK, 0))) { - ORTE_ERROR_LOG(ret); - OBJ_RELEASE(buffer); - goto DONE; - } OBJ_RELEASE(buffer); /* done with this */ }