1
1

Modify the routed framework to allow greater control/flexibility over response to lost routes and initial wireup of jobs as required by several soon-to-come new modules.

Specifically, add two new APIs:

1. route_lost: allows the OOB to report that a connection has failed, thereby giving the routed module an opportunity to respond appropriately to its topology. Creating the API also allows each routed component to hold its own definition of "lifeline" - in some cases, this may be a single connection, but in others it may be multiple connections. Some modules may choose to re-route messaging if the lifeline or any other connection is lost, while others may choose to abort the job.

Both the tree and unity modules retain the current behavior and abort the job if the lifeline connection is lost, while ignoring other lost connections.

2. get_wireup_info: returns (in a provided buffer) info required to wireup connections for the specified job. Some routed modules do not need to return any info as they can wireup via alternative means, while some need to exchange data with their peers. If info is inserted into the buffer, the plm_base_launch_apps function will xcast the contents to the specified job.

The commit also removes the "lifeline" entry from the orte_process_info struct (and the associated ORTE_PROC_MY_LIFELINE definition) as the lifeline info is now contained within the respective routed module.

This commit was SVN r17969.
Этот коммит содержится в:
Ralph Castain 2008-03-26 01:00:24 +00:00
родитель 64bc580c78
Коммит 60d931217f
12 изменённых файлов: 815 добавлений и 699 удалений

Просмотреть файл

@ -581,20 +581,18 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
peer->peer_state);
}
/* if we lose the connection to the lifeline and we are NOT already in finalize - abort */
if (!orte_finalizing &&
NULL != ORTE_PROC_MY_LIFELINE &&
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->peer_name, ORTE_PROC_MY_LIFELINE)) {
/* Should free the peer lock before we abort so we don't
* get stuck in the orte_wait_kill when receiving messages in the
* tcp OOB. */
OPAL_THREAD_UNLOCK(&peer->peer_lock);
opal_output(0, "%s OOB: Connection to lifeline %s lost",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_LIFELINE));
orte_errmgr.abort(1, NULL);
/* inform the routed framework that we have lost a connection so
* it can decide if this is important, what to do about it, etc.
*/
if (ORTE_SUCCESS != orte_routed.route_lost(&peer->peer_name)) {
/* Should free the peer lock before we abort so we don't
* get stuck in the orte_wait_kill when receiving messages in the
* tcp OOB
*/
OPAL_THREAD_UNLOCK(&peer->peer_lock);
orte_errmgr.abort(1, NULL);
}
mca_oob_tcp_peer_shutdown(peer);
}

Просмотреть файл

@ -300,8 +300,7 @@ CLEANUP:
int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
{
int rc;
opal_buffer_t *buf;
orte_rml_cmd_flag_t command;
opal_buffer_t *wireup;
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:daemon_callback",
@ -332,31 +331,23 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
/* all done launching - update the num_procs in my local structure */
orte_process_info.num_procs = jdatorted->num_procs;
/* update everyone's contact info so all daemons
* can talk to each other
*/
buf = OBJ_NEW(opal_buffer_t);
/* pack the update-RML command */
command = ORTE_RML_UPDATE_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) {
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
/* if anything was inserted, send it out */
if (0 < wireup->bytes_used) {
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, wireup, ORTE_RML_TAG_RML_INFO_UPDATE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, buf, ORTE_RML_TAG_RML_INFO_UPDATE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
/* done with the buffer */
OBJ_RELEASE(buf);
OBJ_RELEASE(wireup);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -135,11 +135,52 @@ typedef int (*orte_routed_module_finalize_fn_t)(void);
typedef int (*orte_routed_module_update_route_fn_t)(orte_process_name_t *target,
orte_process_name_t *route);
/**
* Get the next hop towards the target
*
* Obtain the next process on the route to the target. ORTE's routing system
* works one hop at-a-time, so this function doesn't return the entire path
* to the target - it only returns the next hop. This could be the target itself,
* or it could be an intermediate relay. By design, we -never- use application
* procs as relays, so any relay will be an orted.
*/
typedef orte_process_name_t (*orte_routed_module_get_route_fn_t)(orte_process_name_t *target);
/**
* Initialize the routing table
*
* Initialize the routing table for the specified job. This can be rather complex
* and depends entirely upon both the selected module AND whether the function
* is being called by the HNP, an orted, a tool, or an application proc. To
* understand what is happening, you really need to look at the specific module.
*
* Regardless, at the end of the function, the routes to any other process in the
* specified job -must- be defined (even if it is direct)
*/
typedef int (*orte_routed_module_init_routes_fn_t)(orte_jobid_t job, opal_buffer_t *ndat);
/**
* Report a route as "lost"
*
* Report that an existing connection has been lost, therefore potentially
* "breaking" a route in the routing table. It is critical that broken
* connections be reported so that the selected routing module has the
* option of dealing with it. This could consist of nothing more than
* removing that route from the routing table, or could - in the case
* of a "lifeline" connection - result in abort of the process.
*/
typedef int (*orte_routed_module_route_lost_fn_t)(const orte_process_name_t *route);
/**
* Get wireup data for the specified job
*
* Given a jobid and a pointer to a buffer, add whatever routing data
* this module requires to allow inter-process messaging for the
* job.
*/
typedef int (*orte_routed_module_get_wireup_info_fn_t)(orte_jobid_t job,
opal_buffer_t *buf);
/* ******************************************************************** */
@ -159,8 +200,10 @@ struct orte_routed_module_t {
orte_routed_module_update_route_fn_t update_route;
orte_routed_module_get_route_fn_t get_route;
orte_routed_module_init_routes_fn_t init_routes;
orte_routed_module_route_lost_fn_t route_lost;
orte_routed_module_get_wireup_info_fn_t get_wireup_info;
};
/** Convienence typedef */
/** Convenience typedef */
typedef struct orte_routed_module_t orte_routed_module_t;
/** Interface for routed communication */

Просмотреть файл

@ -30,9 +30,101 @@
#include "orte/mca/routed/base/base.h"
#include "routed_tree.h"
int
orte_routed_tree_update_route(orte_process_name_t *target,
orte_process_name_t *route)
/* forward declarations of this module's function-pointer entries */
static int init(void);
static int finalize(void);
static int update_route(orte_process_name_t *target,
                        orte_process_name_t *route);
static orte_process_name_t get_route(orte_process_name_t *target);
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
static int route_lost(const orte_process_name_t *route);
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf);

/* the connection whose loss is fatal for this process; NULL means
 * no lifeline is set (e.g. the HNP) - assigned in init_routes
 */
static orte_process_name_t *lifeline=NULL;

/* public module structure exposing the tree routing implementation */
orte_routed_module_t orte_routed_tree_module = {
    init,
    finalize,
    update_route,
    get_route,
    init_routes,
    route_lost,
    get_wireup_info
};

/* local globals */
static opal_hash_table_t peer_list;             /* exact-match routes, keyed by hashed proc name */
static opal_hash_table_t vpid_wildcard_list;    /* per-job routes (vpid == wildcard), keyed by jobid */
static orte_process_name_t wildcard_route;      /* fallback route for unrecognized names */
static opal_condition_t cond;
static opal_mutex_t lock;
/*
 * Construct the module's routing tables and synchronization objects.
 * Always returns ORTE_SUCCESS.
 */
static int init(void)
{
    /* create both route tables, then size them */
    OBJ_CONSTRUCT(&peer_list, opal_hash_table_t);
    OBJ_CONSTRUCT(&vpid_wildcard_list, opal_hash_table_t);
    opal_hash_table_init(&peer_list, 128);
    opal_hash_table_init(&vpid_wildcard_list, 128);

    /* no wildcard route is known until init_routes runs */
    wildcard_route.jobid = ORTE_NAME_INVALID->jobid;
    wildcard_route.vpid = ORTE_NAME_INVALID->vpid;

    /* setup the global condition and lock */
    OBJ_CONSTRUCT(&cond, opal_condition_t);
    OBJ_CONSTRUCT(&lock, opal_mutex_t);

    return ORTE_SUCCESS;
}
/*
 * Tear down the tree routing module.
 *
 * Order matters here: an application proc must send its "register sync"
 * message BEFORE the route tables are destructed, because the OOB will
 * ask this module how to route that very message.
 *
 * Returns ORTE_SUCCESS, or the error from orte_routed_base_register_sync.
 */
static int finalize(void)
{
    int rc;
    uint64_t key;
    void * value, *node, *next_node;

    /* if I am an application process, indicate that I am
     * truly finalizing prior to departure
     */
    if (!orte_process_info.hnp &&
        !orte_process_info.daemon &&
        !orte_process_info.tool) {
        if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
    }

    /* if I am the HNP, I need to stop the comm recv */
    if (orte_process_info.hnp) {
        orte_routed_base_comm_stop();
    }

    /* don't destruct the routes until *after* we send the
     * sync as the oob will be asking us how to route
     * the message!
     */
    /* walk the exact-match table and free the heap-allocated route copies */
    rc = opal_hash_table_get_first_key_uint64(&peer_list,
                                              &key, &value, &node);
    while(OPAL_SUCCESS == rc) {
        if(NULL != value) {
            free(value);
        }
        rc = opal_hash_table_get_next_key_uint64(&peer_list,
                                                 &key, &value, node, &next_node);
        node = next_node;
    }
    OBJ_DESTRUCT(&peer_list);
    /* NOTE(review): values stored in vpid_wildcard_list are not freed here,
     * although update_route appears to store malloc'd copies in it as well -
     * verify whether this leaks, or whether those entries alias peer_list
     */
    OBJ_DESTRUCT(&vpid_wildcard_list);

    /* destruct the global condition and lock */
    OBJ_DESTRUCT(&cond);
    OBJ_DESTRUCT(&lock);

    return ORTE_SUCCESS;
}
static int update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
int rc;
orte_process_name_t * route_copy;
@ -76,7 +168,7 @@ orte_routed_tree_update_route(orte_process_name_t *target,
/* exact match */
if (target->jobid != ORTE_JOBID_WILDCARD &&
target->vpid != ORTE_VPID_WILDCARD) {
rc = opal_hash_table_set_value_uint64(&orte_routed_tree_module.peer_list,
rc = opal_hash_table_set_value_uint64(&peer_list,
orte_util_hash_name(target), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
@ -87,7 +179,7 @@ orte_routed_tree_update_route(orte_process_name_t *target,
/* vpid wildcard */
if (target->jobid != ORTE_JOBID_WILDCARD &&
target->vpid == ORTE_VPID_WILDCARD) {
opal_hash_table_set_value_uint32(&orte_routed_tree_module.vpid_wildcard_list,
opal_hash_table_set_value_uint32(&vpid_wildcard_list,
target->jobid, route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
@ -100,8 +192,7 @@ orte_routed_tree_update_route(orte_process_name_t *target,
}
orte_process_name_t
orte_routed_tree_get_route(orte_process_name_t *target)
static orte_process_name_t get_route(orte_process_name_t *target)
{
orte_process_name_t *ret;
int rc;
@ -113,7 +204,7 @@ orte_routed_tree_get_route(orte_process_name_t *target)
}
/* check exact matches */
rc = opal_hash_table_get_value_uint64(&orte_routed_tree_module.peer_list,
rc = opal_hash_table_get_value_uint64(&peer_list,
orte_util_hash_name(target), (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
@ -121,7 +212,7 @@ orte_routed_tree_get_route(orte_process_name_t *target)
}
/* didn't find an exact match - check to see if a route for this job was defined */
rc = opal_hash_table_get_value_uint32(&orte_routed_tree_module.vpid_wildcard_list,
rc = opal_hash_table_get_value_uint32(&vpid_wildcard_list,
target->jobid, (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
@ -129,7 +220,7 @@ orte_routed_tree_get_route(orte_process_name_t *target)
}
/* default to wildcard route */
ret = &orte_routed_tree_module.wildcard_route;
ret = &wildcard_route;
found:
@ -207,7 +298,7 @@ static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
return ORTE_SUCCESS;
}
int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
{
/* the tree module routes all proc communications through
* the local daemon. Daemons must identify which of their
@ -262,19 +353,18 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
/* if ndat is NULL, then this is being called during init,
* so just seed the routing table with a path back to the HNP...
*/
if (ORTE_SUCCESS != (rc = orte_routed_tree_update_route(ORTE_PROC_MY_HNP,
ORTE_PROC_MY_HNP))) {
if (ORTE_SUCCESS != (rc = update_route(ORTE_PROC_MY_HNP, ORTE_PROC_MY_HNP))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* set the wildcard route for anybody whose name we don't recognize
* to be the HNP
*/
orte_routed_tree_module.wildcard_route.jobid = ORTE_PROC_MY_HNP->jobid;
orte_routed_tree_module.wildcard_route.vpid = ORTE_PROC_MY_HNP->vpid;
wildcard_route.jobid = ORTE_PROC_MY_HNP->jobid;
wildcard_route.vpid = ORTE_PROC_MY_HNP->vpid;
/* set our lifeline to the the HNP - we will abort if that connection is lost */
orte_process_info.lifeline = ORTE_PROC_MY_HNP;
lifeline = ORTE_PROC_MY_HNP;
/* daemons will send their contact info back to the HNP as
* part of the message confirming they are read to go. HNP's
@ -313,7 +403,8 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
ORTE_ERROR_LOG(rc);
return rc;
}
/* the HNP has no lifeline, so leave that field the default NULL */
/* the HNP has no lifeline */
lifeline = NULL;
} else {
/* if this is for my own jobid, then I am getting an update of RML info
* for the daemons - so update our contact info and routes
@ -416,11 +507,11 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
}
/* setup the route to all other procs to flow through the daemon */
orte_routed_tree_module.wildcard_route.jobid = ORTE_PROC_MY_DAEMON->jobid;
orte_routed_tree_module.wildcard_route.vpid = ORTE_PROC_MY_DAEMON->vpid;
wildcard_route.jobid = ORTE_PROC_MY_DAEMON->jobid;
wildcard_route.vpid = ORTE_PROC_MY_DAEMON->vpid;
/* set our lifeline to the local daemon - we will abort if this connection is lost */
orte_process_info.lifeline = ORTE_PROC_MY_DAEMON;
lifeline = ORTE_PROC_MY_DAEMON;
/* register ourselves -this sends a message to the daemon (warming up that connection)
* and sends our contact info to the HNP when all local procs have reported
@ -446,3 +537,45 @@ int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat)
return ORTE_SUCCESS;
}
}
/*
 * React to a lost connection.
 *
 * The tree module only cares about its lifeline: losing it is fatal.
 * We cannot call abort() from here because the OOB still holds a
 * thread-lock that must be released first - doing so would hang.
 * Instead we return ORTE_ERR_FATAL and let the OOB abort for us.
 */
static int route_lost(const orte_process_name_t *route)
{
    /* nothing to do if we are finalizing or have no lifeline */
    if (orte_finalizing || NULL == lifeline) {
        return ORTE_SUCCESS;
    }

    /* losing the lifeline connection is fatal */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route, lifeline)) {
        opal_output(0, "%s routed:tree: Connection to lifeline %s lost",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(lifeline));
        return ORTE_ERR_FATAL;
    }

    /* we don't care about this one, so return success */
    return ORTE_SUCCESS;
}
/*
 * Pack the wireup data the tree module needs for the given job:
 * an ORTE_RML_UPDATE_CMD command followed by RML contact info.
 *
 * @param job   jobid the wireup data is requested for
 * @param buf   caller-owned buffer to append the data to
 * @return ORTE_SUCCESS, or the pack/contact-info error code
 *
 * FIX: the caller owns "buf" and already releases it on error (see
 * orte_plm_base_daemon_callback) - releasing it here as well caused a
 * double OBJ_RELEASE. On error we now simply return the code.
 */
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
{
    orte_rml_cmd_flag_t command;
    int rc;

    /* pack the update-RML command so receivers know how to process the data */
    command = ORTE_RML_UPDATE_CMD;
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* NOTE(review): this packs contact info for our own jobid rather than
     * the requested "job" - presumably fine while wireup is only performed
     * for the daemon job, but confirm if other jobs ever call this
     */
    if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}

Просмотреть файл

@ -14,41 +14,13 @@
#include "orte_config.h"
#include "orte/types.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_hash_table.h"
#include "opal/threads/condition.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/routed/routed.h"
BEGIN_C_DECLS
struct orte_routed_tree_module_t {
orte_routed_module_t super;
opal_hash_table_t peer_list;
opal_hash_table_t vpid_wildcard_list;
orte_process_name_t wildcard_route;
opal_condition_t cond;
opal_mutex_t lock;
};
typedef struct orte_routed_tree_module_t orte_routed_tree_module_t;
ORTE_MODULE_DECLSPEC extern orte_routed_component_t mca_routed_tree_component;
extern orte_routed_tree_module_t orte_routed_tree_module;
int orte_routed_tree_module_init(void);
int orte_routed_tree_finalize(void);
int orte_routed_tree_update_route(orte_process_name_t *target,
orte_process_name_t *route);
orte_process_name_t orte_routed_tree_get_route(orte_process_name_t *target);
int orte_routed_tree_init_routes(orte_jobid_t job, opal_buffer_t *ndat);
extern orte_routed_module_t orte_routed_tree_module;
END_C_DECLS

Просмотреть файл

@ -53,88 +53,11 @@ orte_routed_component_t mca_routed_tree_component = {
routed_tree_init
};
orte_routed_tree_module_t orte_routed_tree_module = {
{
orte_routed_tree_module_init,
orte_routed_tree_finalize,
orte_routed_tree_update_route,
orte_routed_tree_get_route,
orte_routed_tree_init_routes
}
};
static orte_routed_module_t*
routed_tree_init(int* priority)
{
*priority = 70;
return &orte_routed_tree_module.super;
}
int
orte_routed_tree_module_init(void)
{
OBJ_CONSTRUCT(&orte_routed_tree_module.peer_list, opal_hash_table_t);
opal_hash_table_init(&orte_routed_tree_module.peer_list, 128);
OBJ_CONSTRUCT(&orte_routed_tree_module.vpid_wildcard_list, opal_hash_table_t);
opal_hash_table_init(&orte_routed_tree_module.vpid_wildcard_list, 128);
orte_routed_tree_module.wildcard_route.jobid = ORTE_NAME_INVALID->jobid;
orte_routed_tree_module.wildcard_route.vpid = ORTE_NAME_INVALID->vpid;
/* setup the global condition and lock */
OBJ_CONSTRUCT(&orte_routed_tree_module.cond, opal_condition_t);
OBJ_CONSTRUCT(&orte_routed_tree_module.lock, opal_mutex_t);
return ORTE_SUCCESS;
}
int
orte_routed_tree_finalize(void)
{
int rc;
uint64_t key;
void * value, *node, *next_node;
/* if I am an application process, indicate that I am
* truly finalizing prior to departure
*/
if (!orte_process_info.hnp &&
!orte_process_info.daemon &&
!orte_process_info.tool) {
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if I am the HNP, I need to stop the comm recv */
if (orte_process_info.hnp) {
orte_routed_base_comm_stop();
}
/* don't destruct the routes until *after* we send the
* sync as the oob will be asking us how to route
* the message!
*/
rc = opal_hash_table_get_first_key_uint64(&orte_routed_tree_module.peer_list,
&key, &value, &node);
while(OPAL_SUCCESS == rc) {
if(NULL != value) {
free(value);
}
rc = opal_hash_table_get_next_key_uint64(&orte_routed_tree_module.peer_list,
&key, &value, node, &next_node);
node = next_node;
}
OBJ_DESTRUCT(&orte_routed_tree_module.peer_list);
OBJ_DESTRUCT(&orte_routed_tree_module.vpid_wildcard_list);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&orte_routed_tree_module.cond);
OBJ_DESTRUCT(&orte_routed_tree_module.lock);
return ORTE_SUCCESS;
return &orte_routed_tree_module;
}

Просмотреть файл

@ -10,7 +10,8 @@
sources = \
routed_unity.h \
routed_unity_component.c
routed_unity_component.c \
routed_unity.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

587
orte/mca/routed/unity/routed_unity.c Обычный файл
Просмотреть файл

@ -0,0 +1,587 @@
/*
* Copyright (c) 2007 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/util/output.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/threads/condition.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/base/base.h"
#include "routed_unity.h"
/* local globals */
static opal_condition_t cond;
static opal_mutex_t lock;
static opal_hash_table_t peer_list;     /* routes to procs OUTSIDE our job family */

/* forward declarations of this module's function-pointer entries */
static int init(void);
static int finalize(void);
static int update_route(orte_process_name_t *target,
                        orte_process_name_t *route);
static orte_process_name_t get_route(orte_process_name_t *target);
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat);
static int route_lost(const orte_process_name_t *route);
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf);

/* the connection whose loss is fatal for this process; NULL means
 * no lifeline is set (e.g. the HNP) - assigned in init_routes
 */
static orte_process_name_t *lifeline=NULL;

/* public module structure exposing the unity (direct) routing implementation */
orte_routed_module_t orte_routed_unity_module = {
    init,
    finalize,
    update_route,
    get_route,
    init_routes,
    route_lost,
    get_wireup_info
};
/*
 * Construct the unity module's synchronization objects and its
 * table of routes to procs outside our job family.
 * Always returns ORTE_SUCCESS.
 */
static int init(void)
{
    /* setup the global condition and lock */
    OBJ_CONSTRUCT(&cond, opal_condition_t);
    OBJ_CONSTRUCT(&lock, opal_mutex_t);

    /* table of off-family routes, keyed by hashed proc name */
    OBJ_CONSTRUCT(&peer_list, opal_hash_table_t);
    opal_hash_table_init(&peer_list, 128);

    return ORTE_SUCCESS;
}
/*
 * Tear down the unity routing module.
 *
 * Order matters: an application proc must send its "register sync"
 * message BEFORE the route table is destructed, because the OOB will
 * ask this module how to route that very message.
 *
 * Returns ORTE_SUCCESS, or the error from orte_routed_base_register_sync.
 */
static int finalize(void)
{
    int rc;
    uint64_t key;
    void * value, *node, *next_node;

    /* if I am the HNP, I need to stop the comm recv */
    if (orte_process_info.hnp) {
        orte_routed_base_comm_stop();
    }

    /* if I am an application process (but NOT a tool), indicate that I am
     * truly finalizing prior to departure
     */
    if (!orte_process_info.hnp &&
        !orte_process_info.daemon &&
        !orte_process_info.tool) {
        if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }
    }

    /* don't destruct the routes until *after* we send the
     * sync as the oob will be asking us how to route
     * the message!
     */
    /* free the heap-allocated route copies stored by update_route */
    rc = opal_hash_table_get_first_key_uint64(&peer_list, &key, &value, &node);
    while(OPAL_SUCCESS == rc) {
        if(NULL != value) {
            free(value);
        }
        rc = opal_hash_table_get_next_key_uint64(&peer_list, &key, &value, node, &next_node);
        node = next_node;
    }
    OBJ_DESTRUCT(&peer_list);

    /* cleanup the global condition */
    OBJ_DESTRUCT(&cond);
    OBJ_DESTRUCT(&lock);

    return ORTE_SUCCESS;
}
/*
 * Record a route to a target process.
 *
 * The unity module always routes direct within its own job family, so
 * only routes to procs in a DIFFERENT job family are stored; everything
 * else is a no-op.
 *
 * @param target  process the route leads to
 * @param route   next hop to use when messaging target
 * @return ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE on allocation failure,
 *         or the hash-table error code
 *
 * FIX: the malloc'd route copy was dereferenced without checking for
 * allocation failure; we now report ORTE_ERR_OUT_OF_RESOURCE instead.
 */
static int update_route(orte_process_name_t *target,
                        orte_process_name_t *route)
{
    int rc;
    orte_process_name_t * route_copy;

    if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
        /* this message came from a different job family, so we will update
         * our local route table so we know how to get there
         */

        /* if the route is direct, do nothing - we default to direct routing */
        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
                                                        target, route)) {
            goto direct;
        }

        OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                             "%s routed_unity_update: diff job family routing %s --> %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(target),
                             ORTE_NAME_PRINT(route)));

        /* the table owns a heap copy of the route - freed in finalize */
        route_copy = malloc(sizeof(orte_process_name_t));
        if (NULL == route_copy) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        *route_copy = *route;
        /* if we are routing everything for this target through one place,
         * then the target vpid is ORTE_VPID_WILDCARD. So no need for
         * special cases, just add it
         */
        rc = opal_hash_table_set_value_uint64(&peer_list, orte_util_hash_name(target),
                                              route_copy);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
        }
        return rc;
    }

direct:
    /* if it came from our own job family or was direct, there is nothing to do */
    OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                         "%s routed_unity_update: %s --> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(target),
                         ORTE_NAME_PRINT(route)));

    return ORTE_SUCCESS;
}
/*
 * Return the next hop toward the target.
 *
 * Within our own job family the unity module always goes direct.
 * For a different job family we consult the stored routes, first by
 * exact name, then by a per-job (vpid wildcard) entry, falling back
 * to direct if neither exists.
 */
static orte_process_name_t get_route(orte_process_name_t *target)
{
    int rc;
    orte_process_name_t *ret;
    orte_process_name_t lookup;

    /* default: route directly to the target itself */
    ret = target;

    if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
        orte_process_name_t *entry;

        /* first try an exact-name match */
        rc = opal_hash_table_get_value_uint64(&peer_list, orte_util_hash_name(target),
                                              (void**)&entry);
        if (ORTE_SUCCESS != rc) {
            /* check to see if we specified the route to be for all vpids in the job */
            lookup = *target;
            lookup.vpid = ORTE_VPID_WILDCARD;
            rc = opal_hash_table_get_value_uint64(&peer_list, orte_util_hash_name(&lookup),
                                                  (void**)&entry);
        }
        if (ORTE_SUCCESS == rc) {
            ret = entry;
        }
    }

    OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
                         "%s routed_unity_get(%s) --> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(target),
                         ORTE_NAME_PRINT(ret)));

    return *ret;
}
/*
 * HNP-side handler for RML contact info reported by procs of "job".
 *
 * Unpacks each proc's RML URI from the buffer, loads it into the RML,
 * records it on the proc object, and counts the report. Once every proc
 * in the job has reported, the collected contact info is xcast back to
 * the whole job so the procs can wire up to each other.
 *
 * @param job     jobid the callback data belongs to
 * @param buffer  buffer of packed RML URI strings (one per proc)
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND if the job is unknown,
 *         or a pack/unpack/xcast error code
 */
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
    orte_proc_t **procs;
    orte_job_t *jdata;
    orte_process_name_t name;
    opal_buffer_t buf;
    orte_std_cntr_t cnt;
    char *rml_uri;
    int rc;

    /* lookup the job object */
    if (NULL == (jdata = orte_get_job_data_object(job))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }
    procs = (orte_proc_t**)jdata->procs->addr;

    /* unpack the data for each entry */
    cnt = 1;
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {

        OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
                             "%s routed_unity:callback got uri %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (NULL == rml_uri) ? "NULL" : rml_uri));

        if (rml_uri == NULL) continue;

        /* set the contact info into the hash table */
        if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
            ORTE_ERROR_LOG(rc);
            free(rml_uri);
            continue;
        }
        /* extract the proc's name */
        if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
            ORTE_ERROR_LOG(rc);
            free(rml_uri);
            continue;
        }
        /* the procs are stored in vpid order, so update the record */
        /* NOTE(review): any previously-set rml_uri on this proc is
         * overwritten without being freed - verify it is always NULL here
         */
        procs[name.vpid]->rml_uri = strdup(rml_uri);
        free(rml_uri);

        /* update the proc state */
        if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) {
            procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING;
        }

        ++jdata->num_reported;
        cnt = 1;
    }
    /* running past the end of the buffer is the normal loop exit */
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* if all procs have reported, then send out the info to complete the exchange */
    if (jdata->num_reported == jdata->num_procs) {
        OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
                             "%s routed_unity:callback trigger fired on job %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdata->jobid)));

        /* update the job state */
        if (jdata->state < ORTE_JOB_STATE_RUNNING) {
            jdata->state = ORTE_JOB_STATE_RUNNING;
        }

        OBJ_CONSTRUCT(&buf, opal_buffer_t);
        /* pack the RML contact info for each proc */
        if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(jdata->jobid, &buf))) {
            ORTE_ERROR_LOG(rc);
            OBJ_DESTRUCT(&buf);
            return rc;
        }
        /* send it to all procs via xcast */
        if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jdata->jobid, &buf, ORTE_RML_TAG_INIT_ROUTES))) {
            ORTE_ERROR_LOG(rc);
            OBJ_DESTRUCT(&buf);
            return rc;
        }
        OBJ_DESTRUCT(&buf);
    }

    return ORTE_SUCCESS;
}
static int init_routes(orte_jobid_t job, opal_buffer_t *ndata)
{
/* the unity module just sends direct to everyone, so it requires
* that the RML get loaded with contact info from all of our peers.
* We also look for and provide contact info for our local daemon
* so we can use it if needed
*/
/* if I am a tool, then I stand alone - there is nothing to do */
if (orte_process_info.tool) {
return ORTE_SUCCESS;
}
/* if I am a daemon... */
if (orte_process_info.daemon ) {
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes for daemon job %s\n\thnp_uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri));
if (NULL == ndata) {
/* indicates this is being called during orte_init.
* since the daemons in the unity component don't route messages,
* there is nothing for them to do - daemons will send their
* contact info as part of the message confirming they are ready
* to go. Just get the HNP's name for possible later use
*/
if (NULL == orte_process_info.my_hnp_uri) {
/* fatal error */
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* we don't have to update the route as the unity component is
* always "direct"
*/
/* set our lifeline as the HNP - we will abort if that connection fails */
lifeline = ORTE_PROC_MY_HNP;
return ORTE_SUCCESS;
}
/* if ndata isn't NULL, then we are getting this as part of an
* update due to a dynamic spawn of more daemons. We need to
* pass the buffer on to the rml for processing so the contact
* info can be added to our hash tables - thus allowing us to
* execute routing xcasts, for example.
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndata))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* if I am the HNP... */
if (orte_process_info.hnp) {
/* if this is for my own job, we handle
* updates of daemon contact info separately, so this
* shouldn't get called during daemon startup. This situation
* would occur, though, when we are doing orte_init within the HNP
* itself, but we store our data during orte_init anyway
* However, for the unity component, I do have to make myself
* available for processing incoming rml contact info messages
* from the procs - so setup that receive here
*/
int rc;
if (ORTE_PROC_MY_NAME->jobid == job) {
if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else {
/* if its from some other job, then this is info I need
* to process
*/
if (ORTE_SUCCESS != (rc = process_callback(job, ndata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* I do not have a lifeline */
lifeline = NULL;
return ORTE_SUCCESS;
}
{ /* MUST BE A PROC */
/* if ndata != NULL, then this is being invoked by the proc to
* init a route to a specified process. For example, in OMPI's
* publish/subscribe procedures, the DPM framework looks for an
* mca param containing the global ompi-server's uri. This info
* will come here so the proc can setup a route to
* the server
*/
if (NULL != ndata) {
int rc;
orte_std_cntr_t cnt;
orte_rml_cmd_flag_t command;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* extract the RML command from the buffer and discard it - this
* command is in there for compatibility with other routed
* components but is not needed here
*/
cnt=1;
opal_dss.unpack(ndata, &command, &cnt, ORTE_RML_CMD);
/* Set the contact info in the RML - this won't actually establish
* the connection, but just tells the RML how to reach the
* target proc(s)
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
{
/* if ndata=NULL, then we are being called during orte_init. In this
* case, we need to setup a few critical pieces of info
*/
int rc;
opal_buffer_t buf;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes for proc job %s\n\thnp_uri %s\n\tdaemon uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri,
(NULL == orte_process_info.my_daemon_uri) ? "NULL" : orte_process_info.my_daemon_uri));
/* get the local daemon's uri - this may not always be provided, so
* don't error if it isn't there
*/
if (NULL != orte_process_info.my_daemon_uri) {
/* Set the contact info in the RML and establish
* the connection so the daemon knows how to reach us.
* We have to do this as any non-direct xcast will come
* via our local daemon - and if it doesn't know how to
* reach us, then it will error out the message
*/
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_daemon_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the daemon's name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
ORTE_PROC_MY_DAEMON, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* we don't have to update the route as the unity component is
* always "direct"
*/
}
/* setup the hnp - this must always be provided, so
* error if it isn't there as we won't know how to complete
* the wireup for the unity component
*/
if (NULL == orte_process_info.my_hnp_uri) {
/* fatal error */
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: set hnp contact info and name",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* declare the HNP as our "lifeline" - this means that we will automatically
* abort if we lose that connection
*/
lifeline = ORTE_PROC_MY_HNP;
/* we don't have to update the route as the unity component is
* always "direct"
*/
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: register sync",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* register myself to require that I finalize before exiting
* This also will cause the local orted to send our contact
* info to the HNP once all my local peers have registered
*/
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: wait to recv contact info for peers",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* now setup a blocking receive and wait right here until we get
* the contact info for all of our peers
*/
OBJ_CONSTRUCT(&buf, opal_buffer_t);
rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &buf, ORTE_RML_TAG_INIT_ROUTES, 0);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: peer contact info recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* process it */
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
}
}
static int route_lost(const orte_process_name_t *route)
{
    /* Lost connections are of no concern once finalize has begun,
     * and cannot matter at all if no lifeline was ever declared.
     */
    if (orte_finalizing || NULL == lifeline) {
        return ORTE_SUCCESS;
    }

    /* Losing the lifeline is fatal - report it so the OOB can abort.
     * NOTE: we cannot call abort from here as the OOB needs to first
     * release a thread-lock - otherwise, we will hang!!
     */
    if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route, lifeline)) {
        opal_output(0, "%s routed:unity: Connection to lifeline %s lost",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                    ORTE_NAME_PRINT(lifeline));
        return ORTE_ERR_FATAL;
    }

    /* any other lost route is ignored - report success */
    return ORTE_SUCCESS;
}
static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf)
{
    orte_rml_cmd_flag_t command = ORTE_RML_UPDATE_CMD;
    int rc;

    /* Load the caller-supplied buffer with the data required to wire up
     * the specified job: an update-RML command followed by the RML
     * contact info for every proc in our own job. The launcher will
     * xcast the buffer contents to the target job.
     *
     * The job argument is unused here - the unity component always
     * returns its own job's contact info.
     *
     * NOTE(review): on error we OBJ_RELEASE the caller-provided buffer;
     * confirm callers do not also release it on failure paths.
     */
    rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return rc;
    }

    rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buf);
        return rc;
    }

    return ORTE_SUCCESS;
}

Просмотреть файл

@ -14,8 +14,6 @@
#include "orte_config.h"
#include "orte/types.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/routed/routed.h"
BEGIN_C_DECLS
@ -23,18 +21,8 @@ BEGIN_C_DECLS
ORTE_MODULE_DECLSPEC extern orte_routed_component_t mca_routed_unity_component;
int orte_routed_unity_module_init(void);
extern orte_routed_module_t orte_routed_unity_module;
int orte_routed_unity_finalize(void);
int orte_routed_unity_update_route(orte_process_name_t *target,
orte_process_name_t *route);
orte_process_name_t orte_routed_unity_get_route(orte_process_name_t *target);
int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndat);
int orte_routed_unity_warmup_routes(void);
END_C_DECLS

Просмотреть файл

@ -11,31 +11,10 @@
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/util/output.h"
#include "opal/class/opal_hash_table.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/threads/condition.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/base/base.h"
#include "routed_unity.h"
static orte_routed_module_t* routed_unity_init(int* priority);
static opal_condition_t cond;
static opal_mutex_t lock;
static opal_hash_table_t peer_list;
/**
@ -67,14 +46,6 @@ orte_routed_component_t mca_routed_unity_component = {
routed_unity_init
};
orte_routed_module_t orte_routed_unity_module = {
orte_routed_unity_module_init,
orte_routed_unity_finalize,
orte_routed_unity_update_route,
orte_routed_unity_get_route,
orte_routed_unity_init_routes
};
static orte_routed_module_t*
routed_unity_init(int* priority)
{
@ -82,492 +53,3 @@ routed_unity_init(int* priority)
return &orte_routed_unity_module;
}
int orte_routed_unity_module_init(void)
{
/* setup the global condition and lock */
OBJ_CONSTRUCT(&cond, opal_condition_t);
OBJ_CONSTRUCT(&lock, opal_mutex_t);
OBJ_CONSTRUCT(&peer_list, opal_hash_table_t);
opal_hash_table_init(&peer_list, 128);
return ORTE_SUCCESS;
}
int
orte_routed_unity_finalize(void)
{
int rc;
uint64_t key;
void * value, *node, *next_node;
/* if I am the HNP, I need to stop the comm recv */
if (orte_process_info.hnp) {
orte_routed_base_comm_stop();
}
/* if I am an application process (but NOT a tool), indicate that I am
* truly finalizing prior to departure
*/
if (!orte_process_info.hnp &&
!orte_process_info.daemon &&
!orte_process_info.tool) {
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* don't destruct the routes until *after* we send the
* sync as the oob will be asking us how to route
* the message!
*/
rc = opal_hash_table_get_first_key_uint64(&peer_list, &key, &value, &node);
while(OPAL_SUCCESS == rc) {
if(NULL != value) {
free(value);
}
rc = opal_hash_table_get_next_key_uint64(&peer_list, &key, &value, node, &next_node);
node = next_node;
}
OBJ_DESTRUCT(&peer_list);
/* cleanup the global condition */
OBJ_DESTRUCT(&cond);
OBJ_DESTRUCT(&lock);
return ORTE_SUCCESS;
}
int
orte_routed_unity_update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
int rc;
orte_process_name_t * route_copy;
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* this message came from a different job family, so we will update
* our local route table so we know how to get there
*/
/* if the route is direct, do nothing - we default to direct routing */
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
target, route)) {
goto direct;
}
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity_update: diff job family routing %s --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
ORTE_NAME_PRINT(route)));
route_copy = malloc(sizeof(orte_process_name_t));
*route_copy = *route;
/* if we are routing everything for this target through one place,
* then the target vpid is ORTE_VPID_WILDCARD. So no need for
* special cases, just add it
*/
rc = opal_hash_table_set_value_uint64(&peer_list, orte_util_hash_name(target),
route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
direct:
/* if it came from our own job family or was direct, there is nothing to do */
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity_update: %s --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
ORTE_NAME_PRINT(route)));
return ORTE_SUCCESS;
}
orte_process_name_t
orte_routed_unity_get_route(orte_process_name_t *target)
{
orte_process_name_t *ret, lookup;
int rc;
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
rc = opal_hash_table_get_value_uint64(&peer_list, orte_util_hash_name(target),
(void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
}
/* check to see if we specified the route to be for all vpids in the job */
lookup = *target;
lookup.vpid = ORTE_VPID_WILDCARD;
rc = opal_hash_table_get_value_uint64(&peer_list, orte_util_hash_name(&lookup),
(void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
}
}
/* if it is our own job family, or we didn't find it on the list, just go direct */
ret = target;
found:
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_output,
"%s routed_unity_get(%s) --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
ORTE_NAME_PRINT(ret)));
return *ret;
}
static int process_callback(orte_jobid_t job, opal_buffer_t *buffer)
{
orte_proc_t **procs;
orte_job_t *jdata;
orte_process_name_t name;
opal_buffer_t buf;
orte_std_cntr_t cnt;
char *rml_uri;
int rc;
/* lookup the job object */
if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
procs = (orte_proc_t**)jdata->procs->addr;
/* unpack the data for each entry */
cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(buffer, &rml_uri, &cnt, OPAL_STRING))) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity:callback got uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == rml_uri) ? "NULL" : rml_uri));
if (rml_uri == NULL) continue;
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
continue;
}
/* extract the proc's name */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
continue;
}
/* the procs are stored in vpid order, so update the record */
procs[name.vpid]->rml_uri = strdup(rml_uri);
free(rml_uri);
/* update the proc state */
if (procs[name.vpid]->state < ORTE_PROC_STATE_RUNNING) {
procs[name.vpid]->state = ORTE_PROC_STATE_RUNNING;
}
++jdata->num_reported;
cnt = 1;
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if all procs have reported, then send out the info to complete the exchange */
if (jdata->num_reported == jdata->num_procs) {
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity:callback trigger fired on job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdata->jobid)));
/* update the job state */
if (jdata->state < ORTE_JOB_STATE_RUNNING) {
jdata->state = ORTE_JOB_STATE_RUNNING;
}
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* pack the RML contact info for each proc */
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(jdata->jobid, &buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* send it to all procs via xcast */
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(jdata->jobid, &buf, ORTE_RML_TAG_INIT_ROUTES))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
}
return ORTE_SUCCESS;
}
int orte_routed_unity_init_routes(orte_jobid_t job, opal_buffer_t *ndata)
{
/* the unity module just sends direct to everyone, so it requires
* that the RML get loaded with contact info from all of our peers.
* We also look for and provide contact info for our local daemon
* so we can use it if needed
*/
/* if I am a tool, then I stand alone - there is nothing to do */
if (orte_process_info.tool) {
return ORTE_SUCCESS;
}
/* if I am a daemon... */
if (orte_process_info.daemon ) {
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes for daemon job %s\n\thnp_uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri));
if (NULL == ndata) {
/* indicates this is being called during orte_init.
* since the daemons in the unity component don't route messages,
* there is nothing for them to do - daemons will send their
* contact info as part of the message confirming they are ready
* to go. Just get the HNP's name for possible later use
*/
if (NULL == orte_process_info.my_hnp_uri) {
/* fatal error */
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* we don't have to update the route as the unity component is
* always "direct"
*/
/* set our lifeline as the HNP - we will abort if that connection fails */
orte_process_info.lifeline = ORTE_PROC_MY_HNP;
return ORTE_SUCCESS;
}
/* if ndata isn't NULL, then we are getting this as part of an
* update due to a dynamic spawn of more daemons. We need to
* pass the buffer on to the rml for processing so the contact
* info can be added to our hash tables - thus allowing us to
* execute routing xcasts, for example.
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndata))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* if I am the HNP... */
if (orte_process_info.hnp) {
/* if this is for my own job, we handle
* updates of daemon contact info separately, so this
* shouldn't get called during daemon startup. This situation
* would occur, though, when we are doing orte_init within the HNP
* itself, but we store our data during orte_init anyway
* However, for the unity component, I do have to make myself
* available for processing incoming rml contact info messages
* from the procs - so setup that receive here
*/
int rc;
if (ORTE_PROC_MY_NAME->jobid == job) {
if (ORTE_SUCCESS != (rc = orte_routed_base_comm_start())) {
ORTE_ERROR_LOG(rc);
return rc;
}
} else {
/* if its from some other job, then this is info I need
* to process
*/
if (ORTE_SUCCESS != (rc = process_callback(job, ndata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* I do not have a lifeline, so leave it as the default NULL */
return ORTE_SUCCESS;
}
{ /* MUST BE A PROC */
/* if ndata != NULL, then this is being invoked by the proc to
* init a route to a specified process. For example, in OMPI's
* publish/subscribe procedures, the DPM framework looks for an
* mca param containing the global ompi-server's uri. This info
* will come here so the proc can setup a route to
* the server
*/
if (NULL != ndata) {
int rc;
orte_std_cntr_t cnt;
orte_rml_cmd_flag_t command;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes w/non-NULL data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* extract the RML command from the buffer and discard it - this
* command is in there for compatibility with other routed
* components but is not needed here
*/
cnt=1;
opal_dss.unpack(ndata, &command, &cnt, ORTE_RML_CMD);
/* Set the contact info in the RML - this won't actually establish
* the connection, but just tells the RML how to reach the
* target proc(s)
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndata))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
{
/* if ndata=NULL, then we are being called during orte_init. In this
* case, we need to setup a few critical pieces of info
*/
int rc;
opal_buffer_t buf;
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
"%s routed_unity: init routes for proc job %s\n\thnp_uri %s\n\tdaemon uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(job),
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri,
(NULL == orte_process_info.my_daemon_uri) ? "NULL" : orte_process_info.my_daemon_uri));
/* get the local daemon's uri - this may not always be provided, so
* don't error if it isn't there
*/
if (NULL != orte_process_info.my_daemon_uri) {
/* Set the contact info in the RML and establish
* the connection so the daemon knows how to reach us.
* We have to do this as any non-direct xcast will come
* via our local daemon - and if it doesn't know how to
* reach us, then it will error out the message
*/
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_daemon_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the daemon's name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
ORTE_PROC_MY_DAEMON, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* we don't have to update the route as the unity component is
* always "direct"
*/
}
/* setup the hnp - this must always be provided, so
* error if it isn't there as we won't know how to complete
* the wireup for the unity component
*/
if (NULL == orte_process_info.my_hnp_uri) {
/* fatal error */
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: set hnp contact info and name",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(orte_process_info.my_hnp_uri))) {
ORTE_ERROR_LOG(rc);
return(rc);
}
/* extract the hnp name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* declare the HNP as our "lifeline" - this means that we will automatically
* abort if we lose that connection
*/
orte_process_info.lifeline = ORTE_PROC_MY_HNP;
/* we don't have to update the route as the unity component is
* always "direct"
*/
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: register sync",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* register myself to require that I finalize before exiting
* This also will cause the local orted to send our contact
* info to the HNP once all my local peers have registered
*/
if (ORTE_SUCCESS != (rc = orte_routed_base_register_sync())) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: wait to recv contact info for peers",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* now setup a blocking receive and wait right here until we get
* the contact info for all of our peers
*/
OBJ_CONSTRUCT(&buf, opal_buffer_t);
rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &buf, ORTE_RML_TAG_INIT_ROUTES, 0);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_unity_init: peer contact info recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* process it */
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
}
}

Просмотреть файл

@ -43,7 +43,6 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
/* .my_daemon_uri = */ NULL,
/* .my_hnp = */ {0, 0},
/* .my_hnp_uri = */ NULL,
/* .lifeline = */ NULL,
/* .hnp_pid = */ 0,
/* ,app_num = */ -1,
/* ,universe_size = */ -1,

Просмотреть файл

@ -50,7 +50,6 @@ struct orte_proc_info_t {
char *my_daemon_uri; /**< Contact info to local daemon */
orte_process_name_t my_hnp; /**< Name of my hnp */
char *my_hnp_uri; /**< Contact info for my hnp */
orte_process_name_t *lifeline; /**< Name of the contact I cannot live without */
pid_t hnp_pid; /**< hnp pid - used if singleton */
orte_std_cntr_t app_num; /**< our index into the app_context array */
orte_std_cntr_t universe_size; /**< the size of the universe we are in */