1
1

Cleanup auto-wireup and enable tools to "discover" the HNP via multicast

This commit was SVN r22012.
Этот коммит содержится в:
Ralph Castain 2009-09-25 01:00:09 +00:00
родитель 2af7657db1
Коммит 709b36efb4
5 изменённых файлов: 194 добавлений и 97 удалений

@ -127,9 +127,7 @@ int orte_ess_base_tool_setup(void)
/* setup I/O forwarding system - must come after we init routes */
if (NULL != orte_process_info.my_hnp_uri) {
/* only do this if we were NOT given an HNP - i.e., if we
* are a standalone tool
*/
/* only do this if we were given an HNP */
if (ORTE_SUCCESS != (ret = orte_iof_base_open())) {
ORTE_ERROR_LOG(ret);
error = "orte_iof_base_open";

@ -31,6 +31,7 @@
#include "orte/mca/rmcast/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/plm/base/base.h"
#include "orte/util/show_help.h"
#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
@ -81,10 +82,6 @@ static int rte_init(void)
* be calling this module
*/
/* initialize the global list of local children and job data */
OBJ_CONSTRUCT(&orte_local_children, opal_list_t);
OBJ_CONSTRUCT(&orte_local_jobdata, opal_list_t);
/* run the prolog */
if (ORTE_SUCCESS != (ret = orte_ess_base_std_prolog())) {
error = "orte_ess_base_std_prolog";
@ -106,25 +103,65 @@ static int rte_init(void)
goto error;
}
/* get a name for ourselves */
if (ORTE_SUCCESS != (ret = cm_set_name())) {
error = "set_name";
goto error;
if (ORTE_PROC_IS_DAEMON) {
/* get a name for ourselves */
if (ORTE_SUCCESS != (ret = cm_set_name())) {
error = "set_name";
goto error;
}
/* initialize the global list of local children and job data */
OBJ_CONSTRUCT(&orte_local_children, opal_list_t);
OBJ_CONSTRUCT(&orte_local_jobdata, opal_list_t);
/* get the list of nodes used for this job */
nodelist = getenv("OMPI_MCA_orte_nodelist");
if (NULL != nodelist) {
/* split the node list into an argv array */
hosts = opal_argv_split(nodelist, ',');
}
if (ORTE_SUCCESS != (ret = orte_ess_base_orted_setup(hosts))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_orted_setup";
goto error;
}
opal_argv_free(hosts);
} else if (ORTE_PROC_IS_TOOL) {
if (ORTE_SUCCESS != (ret = orte_plm_base_open())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_base_open";
goto error;
}
if (ORTE_SUCCESS != (ret = orte_plm_base_select())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_base_select";
goto error;
}
if (ORTE_SUCCESS != (ret = orte_plm.set_hnp_name())) {
ORTE_ERROR_LOG(ret);
error = "orte_plm_set_hnp_name";
goto error;
}
/* close the plm since we opened it to set our
* name, but have no further use for it
*/
orte_plm_base_close();
/* checkin with the HNP */
if (ORTE_SUCCESS != (ret = cm_set_name())) {
error = "set_name";
goto error;
}
/* do the rest of the standard tool init */
if (ORTE_SUCCESS != (ret = orte_ess_base_tool_setup())) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_tool_setup";
goto error;
}
}
/* get the list of nodes used for this job */
nodelist = getenv("OMPI_MCA_orte_nodelist");
if (NULL != nodelist) {
/* split the node list into an argv array */
hosts = opal_argv_split(nodelist, ',');
}
if (ORTE_SUCCESS != (ret = orte_ess_base_orted_setup(hosts))) {
ORTE_ERROR_LOG(ret);
error = "orte_ess_base_orted_setup";
goto error;
}
opal_argv_free(hosts);
return ORTE_SUCCESS;
error:
@ -139,13 +176,19 @@ static int rte_finalize(void)
{
int ret;
if (ORTE_SUCCESS != (ret = orte_ess_base_orted_finalize())) {
ORTE_ERROR_LOG(ret);
if (ORTE_PROC_IS_DAEMON) {
if (ORTE_SUCCESS != (ret = orte_ess_base_orted_finalize())) {
ORTE_ERROR_LOG(ret);
}
/* deconstruct the nidmap and jobmap arrays */
orte_util_nidmap_finalize();
} else if (ORTE_PROC_IS_TOOL) {
if (ORTE_SUCCESS != (ret = orte_ess_base_tool_finalize())) {
ORTE_ERROR_LOG(ret);
}
}
/* deconstruct the nidmap and jobmap arrays */
orte_util_nidmap_finalize();
return ret;
}
@ -320,6 +363,7 @@ static bool name_success = false;
static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
{
int32_t n;
orte_daemon_cmd_flag_t cmd;
orte_process_name_t name;
int rc;
char *uri;
@ -327,18 +371,26 @@ static void cbfunc(int channel, opal_buffer_t *buf, void *cbdata)
/* ensure we default to failure */
name_success = false;
/* unpack the name */
/* unpack the cmd */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &cmd, &n, ORTE_DAEMON_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
ORTE_PROC_MY_NAME->jobid = name.jobid;
ORTE_PROC_MY_NAME->vpid = name.vpid;
OPAL_OUTPUT_VERBOSE((1, orte_ess_base_output,
"set my name to %s", ORTE_NAME_PRINT(&name)));
if (ORTE_DAEMON_NAME_REQ_CMD == cmd) {
/* unpack the name */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &name, &n, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
}
ORTE_PROC_MY_NAME->jobid = name.jobid;
ORTE_PROC_MY_NAME->vpid = name.vpid;
OPAL_OUTPUT_VERBOSE((1, orte_ess_base_output,
"set my name to %s", ORTE_NAME_PRINT(&name)));
}
/* unpack the HNP uri */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &uri, &n, OPAL_STRING))) {
@ -370,47 +422,67 @@ static int cm_set_name(void)
opal_buffer_t buf;
orte_daemon_cmd_flag_t cmd;
/* try constructing the name from the IP address - first,
* find an appropriate interface
*/
for (i=0; NULL != ifnames[i]; i++) {
if (ORTE_SUCCESS != (rc = opal_ifnametoaddr(ifnames[i],
(struct sockaddr*)&if_addr,
sizeof(struct sockaddr_in)))) {
continue;
}
addr = htonl(if_addr.sin_addr.s_addr);
/* break address into sections */
net = 0x000000FF & ((0xFF000000 & addr) >> 24);
rack = 0x000000FF & ((0x00FF0000 & addr) >> 16);
slot = 0x000000FF & ((0x0000FF00 & addr) >> 8);
function = 0x000000FF & addr;
/* is this an appropriate interface to use */
if (10 == net) {
/* set our vpid - add 1 to ensure it cannot be zero */
ORTE_PROC_MY_NAME->vpid = (rack * mca_ess_cm_component.max_slots) + slot + function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
return ORTE_SUCCESS;
} else if (192 == net && 168 == rack) {
/* just use function */
ORTE_PROC_MY_NAME->vpid = function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
return ORTE_SUCCESS;
}
}
/* if we get here, then we didn't find a usable interface.
* use the reliable multicast system to contact the HNP and
* get a name
*/
/* setup the query */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
cmd = ORTE_DAEMON_NAME_REQ_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
if (ORTE_PROC_IS_DAEMON) {
/* try constructing the name from the IP address - first,
* find an appropriate interface
*/
for (i=0; NULL != ifnames[i]; i++) {
if (ORTE_SUCCESS != (rc = opal_ifnametoaddr(ifnames[i],
(struct sockaddr*)&if_addr,
sizeof(struct sockaddr_in)))) {
continue;
}
addr = htonl(if_addr.sin_addr.s_addr);
/* break address into sections */
net = 0x000000FF & ((0xFF000000 & addr) >> 24);
rack = 0x000000FF & ((0x00FF0000 & addr) >> 16);
slot = 0x000000FF & ((0x0000FF00 & addr) >> 8);
function = 0x000000FF & addr;
/* is this an appropriate interface to use */
if (10 == net) {
/* set our vpid - add 1 to ensure it cannot be zero */
ORTE_PROC_MY_NAME->vpid = (rack * mca_ess_cm_component.max_slots) + slot + function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
/* notify the HNP of our existence */
cmd = ORTE_DAEMON_CHECKIN_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
goto checkin;
} else if (192 == net && 168 == rack) {
/* just use function */
ORTE_PROC_MY_NAME->vpid = function + 1;
/* set our jobid to 0 */
ORTE_PROC_MY_NAME->jobid = 0;
/* notify the HNP of our existence */
cmd = ORTE_DAEMON_CHECKIN_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
goto checkin;
}
}
/* if we get here, then we didn't find a usable interface.
* use the reliable multicast system to contact the HNP and
* get a name
*/
cmd = ORTE_DAEMON_NAME_REQ_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
} else if (ORTE_PROC_IS_TOOL) {
cmd = ORTE_TOOL_CHECKIN_CMD;
opal_dss.pack(&buf, &cmd, 1, ORTE_DAEMON_CMD_T);
/* provide our name */
opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
}
checkin:
/* always include our node name */
opal_dss.pack(&buf, &orte_process_info.nodename, 1, OPAL_STRING);
/* set the recv to get the answer */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_SYS_ADDR,
ORTE_RMCAST_NON_PERSISTENT,
@ -420,6 +492,7 @@ static int cm_set_name(void)
OBJ_DESTRUCT(&buf);
return rc;
}
/* send the request */
if (ORTE_SUCCESS != (rc = orte_rmcast.send(ORTE_RMCAST_SYS_ADDR,
ORTE_RMCAST_TAG_BOOTSTRAP,
@ -439,4 +512,3 @@ static int cm_set_name(void)
}
return ORTE_ERR_NOT_FOUND;
}

@ -75,7 +75,8 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* bootstrap */
#define ORTE_DAEMON_NAME_REQ_CMD (orte_daemon_cmd_flag_t) 23
#define ORTE_DAEMON_CHECKIN_CMD (orte_daemon_cmd_flag_t) 24
#define ORTE_TOOL_CHECKIN_CMD (orte_daemon_cmd_flag_t) 25
/*
* List object to locally store the process names and pids of
* our children. This can subsequently be used to order termination

@ -41,7 +41,6 @@ static opal_mutex_t lock;
static opal_list_t recvs;
static opal_list_t channels;
static unsigned int next_channel;
static uint8_t my_packed_name[8];
static bool init_completed = false;
/* LOCAL FUNCTIONS */
@ -237,7 +236,6 @@ static int init(void)
rmcast_basic_channel_t *chan;
int channel;
char *name;
uint32_t tmp;
if (init_completed) {
return ORTE_SUCCESS;
@ -252,12 +250,6 @@ static int init(void)
OBJ_CONSTRUCT(&recvs, opal_list_t);
OBJ_CONSTRUCT(&channels, opal_list_t);
/* convert my name to get it into network-byte-order */
tmp = htonl(ORTE_PROC_MY_NAME->jobid);
memcpy(&my_packed_name[0], &tmp, 4);
tmp = htonl(ORTE_PROC_MY_NAME->vpid);
memcpy(&my_packed_name[4], &tmp, 4);
/* define the starting point for new channels */
next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS;
@ -783,6 +775,7 @@ static void xmit_data(int sd, short flags, void* send_req)
int32_t sz;
int rc;
uint16_t tmp;
uint32_t nm;
OPAL_THREAD_LOCK(&chan->send_lock);
while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) {
@ -792,7 +785,10 @@ static void xmit_data(int sd, short flags, void* send_req)
opal_dss.unload(snd->data, (void**)&bytes, &sz);
/* start the send data area with our name in network-byte-order */
memcpy(chan->send_data, my_packed_name, 8);
nm = htonl(ORTE_PROC_MY_NAME->jobid);
memcpy(&chan->send_data[0], &nm, 4);
nm = htonl(ORTE_PROC_MY_NAME->vpid);
memcpy(&chan->send_data[4], &nm, 4);
/* add the tag data, also converted */
tmp = htons(snd->tag);

@ -97,9 +97,16 @@ static int finalize(void)
{
int rc;
/* if I am a tool without a daemon, just cleanout
* the basics and leave
*/
if (ORTE_PROC_IS_TOOL && NULL == orte_process_info.my_daemon_uri) {
goto cleanup;
}
/* if I am an application process, indicate that I am
* truly finalizing prior to departure
*/
* truly finalizing prior to departure
*/
if (ORTE_PROC_IS_APP) {
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync(false))) {
ORTE_ERROR_LOG(rc);
@ -112,6 +119,7 @@ static int finalize(void)
orte_routed_base_comm_stop();
}
cleanup:
OBJ_DESTRUCT(&jobfam_list);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&cond);
@ -291,18 +299,18 @@ static orte_process_name_t get_route(orte_process_name_t *target)
goto found;
}
/* if I am a tool without a daemon, the route is direct */
if (ORTE_PROC_IS_TOOL && NULL == orte_process_info.my_daemon_uri) {
ret = target;
goto found;
}
/* if I am an application process, always route via my local daemon */
if (ORTE_PROC_IS_APP) {
ret = ORTE_PROC_MY_DAEMON;
goto found;
}
/* if I am a tool, the route is direct */
if (ORTE_PROC_IS_TOOL) {
ret = target;
goto found;
}
/* if the job family is zero, then this is going to a local slave,
* so the path is direct
*/
@ -469,8 +477,30 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
*/
int rc;
/* if I am a tool, then I stand alone - there is nothing to do */
/* if I am a tool, then see if I stand alone - otherwise,
* setup the HNP info
*/
if (ORTE_PROC_IS_TOOL) {
if (NULL == orte_process_info.my_hnp_uri) {
return ORTE_SUCCESS;
}
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set our lifeline to the HNP - we will abort if that connection is lost */
lifeline = ORTE_PROC_MY_HNP;
return ORTE_SUCCESS;
}