Get the direct routed component to work with both TCP and USOCK OOB components. We previously had setup the direct component so it would only support direct-launched applications. Thus, all routes went direct between processes. However, if the job had been launched by mpirun, this made no sense - what you wanted instead was to have each app proc talk directly to its daemon, but have the daemons all directly connect to each other.
So we need all the routing code for dealing with cross-job communications, lifelines, etc. The HNP will be directly connected to all daemons as they must callback at startup, and so we need to track those children correctly so we know when it is okay to terminate. We still have to support direct launch, though, as this is the only component we can use in that scenario. So if the app doesn't have daemon URI info, then it must fall back to directly connecting to everything.
Этот коммит содержится в:
родитель
595740a8e3
Коммит
d6d69e2b13
@ -386,6 +386,11 @@ static void process_uri(char *uri)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
|
||||
opal_bitmap_set_bit(&pr->addressable, component->idx);
|
||||
} else {
|
||||
opal_output_verbose(5, orte_oob_base_framework.framework_output,
|
||||
"%s: peer %s is NOT reachable via component %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,6 @@
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/parse_options.h"
|
||||
#include "orte/util/show_help.h"
|
||||
|
@ -61,7 +61,6 @@
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/mca/state/state.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/parse_options.h"
|
||||
@ -302,21 +301,25 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
* by me via my daemon
|
||||
*/
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
ui64 = (uint64_t*)peer;
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers,
|
||||
(*ui64), (void**)&pr) || NULL == pr) {
|
||||
pr = OBJ_NEW(mca_oob_usock_peer_t);
|
||||
pr->name = *peer;
|
||||
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr);
|
||||
}
|
||||
if (ORTE_PROC_MY_DAEMON->jobid == peer->jobid) {
|
||||
/* if this is my daemon, then take it - otherwise, ignore */
|
||||
if (ORTE_PROC_MY_DAEMON->jobid == peer->jobid &&
|
||||
ORTE_PROC_MY_DAEMON->vpid == peer->vpid) {
|
||||
ui64 = (uint64_t*)peer;
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers,
|
||||
(*ui64), (void**)&pr) || NULL == pr) {
|
||||
pr = OBJ_NEW(mca_oob_usock_peer_t);
|
||||
pr->name = *peer;
|
||||
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr);
|
||||
}
|
||||
/* we have to initiate the connection because otherwise the
|
||||
* daemon has no way to communicate to us via this component
|
||||
* as the app doesn't have a listening port */
|
||||
pr->state = MCA_OOB_USOCK_CONNECTING;
|
||||
ORTE_ACTIVATE_USOCK_CONN_STATE(pr, mca_oob_usock_peer_try_connect);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
/* otherwise, indicate that we cannot reach this peer */
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
|
||||
/* if I am a daemon or HNP, I can only reach my
|
||||
@ -397,10 +400,12 @@ void mca_oob_usock_component_lost_connection(int fd, short args, void *cbdata)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* activate the proc state */
|
||||
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer->name)) {
|
||||
/* activate the proc state - since an app only connects to its parent daemon,
|
||||
* and the daemon is *always* its lifeline, activate the lifeline lost state */
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_LIFELINE_LOST);
|
||||
} else {
|
||||
/* we are the daemon end, so notify that the child's comm failed */
|
||||
ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_COMM_FAILED);
|
||||
}
|
||||
|
||||
@ -457,10 +462,12 @@ void mca_oob_usock_component_failed_to_connect(int fd, short args, void *cbdata)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&pop->peer->name));
|
||||
|
||||
/* if this was a lifeline, then alert */
|
||||
if (ORTE_SUCCESS != orte_routed.route_lost(&pop->peer->name)) {
|
||||
/* since an app only connects to its parent daemon,
|
||||
* and the daemon is *always* its lifeline, activate the lifeline lost state */
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_LIFELINE_LOST);
|
||||
} else {
|
||||
/* we are the daemon end, so notify that the child's comm failed */
|
||||
ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_COMM_FAILED);
|
||||
}
|
||||
OBJ_RELEASE(pop);
|
||||
|
@ -66,7 +66,6 @@
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "oob_usock.h"
|
||||
@ -707,9 +706,6 @@ static void usock_peer_connected(mca_oob_usock_peer_t* peer)
|
||||
}
|
||||
peer->state = MCA_OOB_USOCK_CONNECTED;
|
||||
|
||||
/* update the route */
|
||||
orte_routed.update_route(&peer->name, &peer->name);
|
||||
|
||||
/* initiate send of first message on queue */
|
||||
if (NULL == peer->send_msg) {
|
||||
peer->send_msg = (mca_oob_usock_send_t*)
|
||||
|
@ -65,7 +65,6 @@
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/mca/state/state.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
|
@ -476,8 +476,6 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* extract the hnp name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
@ -485,6 +483,8 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* if we are using static ports, set my lifeline to point at my parent */
|
||||
if (orte_static_ports) {
|
||||
@ -659,18 +659,18 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
/* extract the daemon's name so we can update the routing table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
|
||||
ORTE_PROC_MY_DAEMON, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
|
||||
/* set our lifeline to the local daemon - we will abort if this connection is lost */
|
||||
lifeline = ORTE_PROC_MY_DAEMON;
|
||||
orte_routing_is_enabled = true;
|
||||
|
@ -459,8 +459,6 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* extract the hnp name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
@ -468,6 +466,8 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* if we are using static ports, set my lifeline to point at my parent */
|
||||
if (orte_static_ports) {
|
||||
@ -633,18 +633,18 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
/* extract the daemon's name so we can update the routing table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
|
||||
ORTE_PROC_MY_DAEMON, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
|
||||
/* set our lifeline to the local daemon - we will abort if this connection is lost */
|
||||
lifeline = ORTE_PROC_MY_DAEMON;
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/data_type_support/orte_dt_support.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
|
||||
@ -70,13 +71,18 @@ orte_routed_module_t orte_routed_direct_module = {
|
||||
#endif
|
||||
};
|
||||
|
||||
static orte_process_name_t *lifeline = NULL;
|
||||
static opal_list_t my_children;
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&my_children, opal_list_t);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
OPAL_LIST_DESTRUCT(&my_children);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -125,7 +131,16 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
||||
daemon.vpid = ORTE_PROC_MY_DAEMON->vpid;
|
||||
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
ret = ORTE_PROC_MY_DAEMON;
|
||||
/* if I am an application, AND I have knowledge of
|
||||
* my daemon (i.e., a daemon launched me), then I
|
||||
* always route thru the daemon */
|
||||
if (NULL != orte_process_info.my_daemon_uri) {
|
||||
ret = ORTE_PROC_MY_DAEMON;
|
||||
} else {
|
||||
/* I was direct launched and do not have
|
||||
* a daemon, so I have to route direct */
|
||||
ret = target;
|
||||
}
|
||||
goto found;
|
||||
}
|
||||
|
||||
@ -210,6 +225,16 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
||||
}
|
||||
|
||||
|
||||
static void recv_ack(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
{
|
||||
bool *ack_waiting = (bool*)cbdata;
|
||||
|
||||
/* flag as complete */
|
||||
*ack_waiting = false;
|
||||
}
|
||||
|
||||
static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
{
|
||||
int rc;
|
||||
@ -240,8 +265,6 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* extract the hnp name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
@ -249,7 +272,11 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
/* the HNP is my lifeline */
|
||||
lifeline = ORTE_PROC_MY_HNP;
|
||||
|
||||
/* daemons will send their contact info back to the HNP as
|
||||
* part of the message confirming they are read to go. HNP's
|
||||
* load their contact info during orte_init
|
||||
@ -296,50 +323,180 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
|
||||
/*** MUST BE A PROC ***/
|
||||
if (NULL == ndat) {
|
||||
/* if we were direct launched, there is nothing we need to do. If we
|
||||
* were launched by mpirun, then we need to set the HNP and daemon info */
|
||||
if (NULL != orte_process_info.my_hnp_uri) {
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* extract the hnp name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
ORTE_PROC_MY_HNP, NULL))) {
|
||||
ORTE_PROC_MY_HNP, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* we don't set the HNP's contact info as we don't need it - we
|
||||
* only contact our local daemon, which might be the HNP (in which
|
||||
* case it will have also been passed as our daemon uri) */
|
||||
}
|
||||
|
||||
if (NULL != orte_process_info.my_daemon_uri) {
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
/* extract the daemon's name so we can update the routing table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
|
||||
ORTE_PROC_MY_DAEMON, NULL))) {
|
||||
ORTE_PROC_MY_DAEMON, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
/* my daemon is my lifeline */
|
||||
lifeline = ORTE_PROC_MY_DAEMON;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if ndat != NULL, then this is being invoked by the proc to
|
||||
* init a route to a specified process that is outside of our
|
||||
* job family. It really doesn't matter as everything must
|
||||
* go direct
|
||||
* job family. We want that route to go through our HNP, routed via
|
||||
* out local daemon - however, we cannot know for
|
||||
* certain that the HNP already knows how to talk to the specified
|
||||
* procs. For example, in OMPI's publish/subscribe procedures, the
|
||||
* DPM framework looks for an mca param containing the global ompi-server's
|
||||
* uri. This info will come here so the proc can setup a route to
|
||||
* the server - we need to pass the routing info to our HNP.
|
||||
*
|
||||
* Obviously, if we were direct launched, we won't have an HNP, in
|
||||
* which case we just update our own contact info and go direct
|
||||
*/
|
||||
if (NULL != ndat) {
|
||||
if (NULL == orte_process_info.my_hnp_uri) {
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_framework.framework_output,
|
||||
"%s routed_direct: init routes w/non-NULL data",
|
||||
"%s routed_direct: init routes w/non-NULL data and direct launched",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(ndat))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
} else {
|
||||
opal_buffer_t *xfer;
|
||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||
bool ack_waiting;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_framework.framework_output,
|
||||
"%s routed_direct: init routes w/non-NULL data",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) != ORTE_JOB_FAMILY(job)) {
|
||||
/* if this is for a different job family, then we route via our HNP
|
||||
* to minimize connection counts to entities such as ompi-server, so
|
||||
* start by sending the contact info to the HNP for update
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_framework.framework_output,
|
||||
"%s routed_direct_init_routes: diff job family - sending update to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
/* prep the buffer for transmission to the HNP */
|
||||
xfer = OBJ_NEW(opal_buffer_t);
|
||||
opal_dss.pack(xfer, &cmd, 1, ORTE_RML_CMD);
|
||||
opal_dss.copy_payload(xfer, ndat);
|
||||
|
||||
/* save any new connections for use in subsequent connect_accept calls */
|
||||
orte_routed_base_update_hnps(ndat);
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, xfer,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(xfer);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
*/
|
||||
ack_waiting = true;
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_UPDATE_ROUTE_ACK,
|
||||
ORTE_RML_NON_PERSISTENT,
|
||||
recv_ack, &ack_waiting);
|
||||
ORTE_WAIT_FOR_COMPLETION(ack_waiting);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_framework.framework_output,
|
||||
"%s routed_direct_init_routes: ack recvd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* our get_route function automatically routes all messages for
|
||||
* other job families via the HNP, so nothing more to do here
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int route_lost(const orte_process_name_t *route)
|
||||
{
|
||||
/* there is no lifeline, so we don't care */
|
||||
opal_list_item_t *item;
|
||||
orte_routed_tree_t *child;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
int i;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
|
||||
"%s route to %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(route)));
|
||||
|
||||
/* if the route is to a different job family and we are the HNP, look it up */
|
||||
if ((ORTE_JOB_FAMILY(route->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) &&
|
||||
ORTE_PROC_IS_HNP) {
|
||||
jfamily = ORTE_JOB_FAMILY(route->jobid);
|
||||
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
|
||||
"%s routed_direct: route to %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(route->jobid)));
|
||||
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||
OBJ_RELEASE(jfam);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if we lose the connection to the lifeline and we are NOT already,
|
||||
* in finalize, tell the OOB to abort.
|
||||
* NOTE: we cannot call abort from here as the OOB needs to first
|
||||
* release a thread-lock - otherwise, we will hang!!
|
||||
*/
|
||||
if (!orte_finalizing &&
|
||||
NULL != lifeline &&
|
||||
OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, route, lifeline)) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
|
||||
"%s routed:direct: Connection to lifeline %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(lifeline)));
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
|
||||
/* if we are the HNP, and the route is a daemon,
|
||||
* see if it is one of our children - if so, remove it
|
||||
*/
|
||||
if (ORTE_PROC_IS_HNP &&
|
||||
route->jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
for (item = opal_list_get_first(&my_children);
|
||||
item != opal_list_get_end(&my_children);
|
||||
item = opal_list_get_next(item)) {
|
||||
child = (orte_routed_tree_t*)item;
|
||||
if (child->vpid == route->vpid) {
|
||||
opal_list_remove_item(&my_children, item);
|
||||
OBJ_RELEASE(item);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* we don't care about this one, so return success */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -358,29 +515,25 @@ static int set_lifeline(orte_process_name_t *proc)
|
||||
|
||||
static void update_routing_plan(void)
|
||||
{
|
||||
/* nothing to do here */
|
||||
return;
|
||||
}
|
||||
|
||||
static void get_routing_list(opal_list_t *coll)
|
||||
{
|
||||
orte_namelist_t *nm;
|
||||
orte_routed_tree_t *child;
|
||||
int32_t i;
|
||||
orte_job_t *jdata;
|
||||
orte_proc_t *proc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
|
||||
"%s routed:direct: update routing plan",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am anything other than daemons and the HNP, this
|
||||
* is a meaningless command as I am not allowed to route
|
||||
*/
|
||||
if (!ORTE_PROC_IS_DAEMON && !ORTE_PROC_IS_HNP) {
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
/* nothing to do */
|
||||
return;
|
||||
}
|
||||
|
||||
/* clear the current list */
|
||||
OPAL_LIST_DESTRUCT(&my_children);
|
||||
OBJ_CONSTRUCT(&my_children, opal_list_t);
|
||||
|
||||
/* daemons don't route */
|
||||
if (ORTE_PROC_IS_DAEMON) {
|
||||
return;
|
||||
}
|
||||
/* HNP sends direct to each daemon */
|
||||
/* HNP is directly connected to each daemon */
|
||||
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return;
|
||||
@ -389,26 +542,29 @@ static void get_routing_list(opal_list_t *coll)
|
||||
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
|
||||
continue;
|
||||
}
|
||||
if( proc->state <= ORTE_PROC_STATE_UNTERMINATED &&
|
||||
NULL != proc->rml_uri ) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_framework.framework_output,
|
||||
"%s get_routing_tree: Adding process %s state %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(proc->name)),
|
||||
orte_proc_state_to_str(proc->state)));
|
||||
|
||||
nm = OBJ_NEW(orte_namelist_t);
|
||||
nm->name.jobid = proc->name.jobid;
|
||||
nm->name.vpid = proc->name.vpid;
|
||||
opal_list_append(coll, &nm->super);
|
||||
} else {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_routed_base_framework.framework_output,
|
||||
"%s get_routing_tree: Skipped process %15s state %s (non functional daemon)",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&(proc->name)),
|
||||
orte_proc_state_to_str(proc->state)));
|
||||
}
|
||||
child = OBJ_NEW(orte_routed_tree_t);
|
||||
child->vpid = proc->name.vpid;
|
||||
opal_list_append(&my_children, &child->super);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static void get_routing_list(opal_list_t *coll)
|
||||
{
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
|
||||
"%s routed:direct: get routing list",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am anything other than daemons and the HNP, this
|
||||
* is a meaningless command as I am not allowed to route
|
||||
*/
|
||||
if (!ORTE_PROC_IS_DAEMON && !ORTE_PROC_IS_HNP) {
|
||||
return;
|
||||
}
|
||||
|
||||
orte_routed_base_xcast_routing(coll, &my_children);
|
||||
}
|
||||
|
||||
static int get_wireup_info(opal_buffer_t *buf)
|
||||
@ -426,21 +582,10 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
|
||||
static size_t num_routes(void)
|
||||
{
|
||||
orte_job_t *jdata;
|
||||
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* if I am the HNP, then the number of routes is
|
||||
* the number of daemons still alive (other than me)
|
||||
*/
|
||||
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (jdata->num_procs - jdata->num_terminated - 1);
|
||||
return opal_list_get_size(&my_children);
|
||||
}
|
||||
|
||||
#if OPAL_ENABLE_FT_CR == 1
|
||||
|
@ -480,8 +480,6 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
|
||||
return ORTE_ERR_FATAL;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* extract the hnp name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
|
||||
@ -489,6 +487,8 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
|
||||
|
||||
/* if we are using static ports, set my lifeline to point at my parent */
|
||||
if (orte_static_ports) {
|
||||
@ -660,18 +660,18 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
/* extract the daemon's name so we can update the routing table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
|
||||
ORTE_PROC_MY_DAEMON, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Set the contact info in the RML - this won't actually establish
|
||||
* the connection, but just tells the RML how to reach the daemon
|
||||
* if/when we attempt to send to it
|
||||
*/
|
||||
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
|
||||
|
||||
/* set our lifeline to the local daemon - we will abort if this connection is lost */
|
||||
lifeline = ORTE_PROC_MY_DAEMON;
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user