1
1
openmpi/orte/mca/oob/base/oob_base_stubs.c
Ralph Castain d6d69e2b13 Get the direct routed component to work with both TCP and USOCK OOB components. We previously had setup the direct component so it would only support direct-launched applications. Thus, all routes went direct between processes. However, if the job had been launched by mpirun, this made no sense - what you wanted instead was to have each app proc talk directly to its daemon, but have the daemons all directly connect to each other.
So we need all the routing code for dealing with cross-job communications, lifelines, etc. The HNP will be directly connected to all daemons as they must callback at startup, and so we need to track those children correctly so we know when it is okay to terminate.

We still have to support direct launch, though, as this is the only component we can use in that scenario. So if the app doesn't have daemon URI info, then it must fall back to directly connecting to everything.
2014-12-07 09:11:48 -08:00

432 строки
16 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/util/output.h"
#include "opal/mca/dstore/dstore.h"
#include "opal/util/argv.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/state/state.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/oob/base/base.h"
#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/state/base/base.h"
#endif
static void process_uri(char *uri);
void orte_oob_base_send_nb(int fd, short args, void *cbdata)
{
orte_oob_send_t *cd = (orte_oob_send_t*)cbdata;
orte_rml_send_t *msg = cd->msg;
mca_base_component_list_item_t *cli;
orte_oob_base_peer_t *pr;
int rc;
uint64_t ui64;
bool msg_sent;
mca_oob_base_component_t *component;
bool reachable;
opal_list_t myvals;
opal_value_t *kv;
/* done with this. release it now */
OBJ_RELEASE(cd);
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send to target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->dst));
/* check if we have this peer in our hash table */
memcpy(&ui64, (char*)&msg->dst, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&pr) ||
NULL == pr) {
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send unknown peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->dst));
/* for direct launched procs, the URI might be in the database,
* so check there next - if it is, the peer object will be added
* to our hash table
*/
OBJ_CONSTRUCT(&myvals, opal_list_t);
if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_internal, &msg->dst,
OPAL_DSTORE_URI, &myvals)) {
kv = (opal_value_t*)opal_list_get_first(&myvals);
if (NULL != kv) {
process_uri(kv->data.string);
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&pr) ||
NULL == pr) {
/* that is just plain wrong */
ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
msg->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(msg);
OPAL_LIST_DESTRUCT(&myvals);
return;
}
} else {
ORTE_ERROR_LOG(ORTE_ERR_ADDRESSEE_UNKNOWN);
msg->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(msg);
OPAL_LIST_DESTRUCT(&myvals);
return;
}
OPAL_LIST_DESTRUCT(&myvals);
} else {
/* even though we don't know about this peer yet, we still might
* be able to get to it via routing, so ask each component if
* it can reach it
*/
reachable = false;
pr = NULL;
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
if (NULL != component->is_reachable) {
if (component->is_reachable(&msg->dst)) {
/* there is a way to reach this peer - record it
* so we don't waste this time again
*/
if (NULL == pr) {
pr = OBJ_NEW(orte_oob_base_peer_t);
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, ui64, (void*)pr))) {
ORTE_ERROR_LOG(rc);
msg->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(msg);
return;
}
}
/* mark that this component can reach the peer */
opal_bitmap_set_bit(&pr->addressable, component->idx);
/* flag that at least one component can reach this peer */
reachable = true;
}
}
}
/* if nobody could reach it, then that's an error */
if (!reachable) {
msg->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(msg);
return;
}
}
}
/* if we already have a connection to this peer, use it */
if (NULL != pr->component) {
/* post this msg for send by this transport - the component
* runs on our event base, so we can just call their function
*/
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send known transport for peer %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->dst));
if (ORTE_SUCCESS == (rc = pr->component->send_nb(msg))) {
return;
}
}
/* if we haven't identified a transport to this peer,
* loop across all available components in priority order until
* one replies that it has a module that can reach this peer.
* Let it try to make the connection
*/
msg_sent = false;
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
/* is this peer addressable by this component? */
if (!opal_bitmap_is_set_bit(&pr->addressable, component->idx)) {
continue;
}
/* it is addressable, so attempt to send via that transport */
if (ORTE_SUCCESS == (rc = component->send_nb(msg))) {
/* the msg status will be set upon send completion/failure */
msg_sent = true;
/* point to this transport for any future messages */
pr->component = component;
break;
} else if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
/* components return "next option" if they can't connect
* to this peer. anything else is a true error.
*/
ORTE_ERROR_LOG(rc);
msg->status = rc;
ORTE_RML_SEND_COMPLETE(msg);
return;
}
}
/* if no component can reach this peer, that's an error - post
* it back to the RML for handling
*/
if (!msg_sent) {
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:send no path to target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&msg->dst));
msg->status = ORTE_ERR_NO_PATH_TO_TARGET;
ORTE_RML_SEND_COMPLETE(msg);
}
}
/**
* Obtain a uri for initial connection purposes
*
* During initial wireup, we can only transfer contact info on the daemon
* command line. This limits what we can send to a string representation of
* the actual contact info, which gets sent in a uri-like form. Not every
* oob module can support this transaction, so this function will loop
* across all oob components/modules, letting each add to the uri string if
* it supports bootstrap operations. An error will be returned in the cbfunc
* if NO component can successfully provide a contact.
*
* Note: since there is a limit to what an OS will allow on a cmd line, we
* impose a limit on the length of the resulting uri via an MCA param. The
* default value of -1 implies unlimited - however, users with large numbers
* of interfaces on their nodes may wish to restrict the size.
*/
void orte_oob_base_get_addr(char **uri)
{
char *turi, *final=NULL, *tmp;
size_t len = 0;
int rc=ORTE_SUCCESS;
bool one_added = false;
mca_base_component_list_item_t *cli;
mca_oob_base_component_t *component;
/* start with our process name */
if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) {
ORTE_ERROR_LOG(rc);
goto unblock;
}
len = strlen(final);
/* loop across all available modules to get their input
* up to the max length
*/
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
/* ask the component for its input, obtained when it
* opened its modules
*/
if (NULL == component->get_addr) {
/* doesn't support this ability */
continue;
}
/* the components operate within our event base, so we
* can directly call their get_uri function to get the
* pointer to the uri - this is not a copy, so
* do NOT free it!
*/
turi = component->get_addr();
if (NULL != turi) {
/* check overall length for limits */
if (0 < orte_oob_base.max_uri_length &&
orte_oob_base.max_uri_length < (int)(len + strlen(turi))) {
/* cannot accept the payload */
continue;
}
/* add new value to final one */
asprintf(&tmp, "%s;%s", final, turi);
free(turi);
free(final);
final = tmp;
len = strlen(final);
/* flag that at least one contributed */
one_added = true;
}
}
if (!one_added) {
/* nobody could contribute */
if (NULL != final) {
free(final);
final = NULL;
}
}
unblock:
*uri = final;
}
/**
* This function will loop
* across all oob components, letting each look at the uri and extract
* info from it if it can. An error is to be returned if NO component
* can successfully extract a contact.
*/
static void req_cons(mca_oob_uri_req_t *ptr)
{
ptr->uri = NULL;
}
static void req_des(mca_oob_uri_req_t *ptr)
{
if (NULL != ptr->uri) {
free(ptr->uri);
}
}
OBJ_CLASS_INSTANCE(mca_oob_uri_req_t,
opal_object_t,
req_cons, req_des);
void orte_oob_base_set_addr(int fd, short args, void *cbdata)
{
mca_oob_uri_req_t *req = (mca_oob_uri_req_t*)cbdata;
char *uri = req->uri;
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s: set_addr to uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == uri) ? "NULL" : uri);
/* if the request doesn't contain a URI, then we
* have an error
*/
if (NULL == uri) {
opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_FORCED_TERMINATE(1);
OBJ_RELEASE(req);
return;
}
process_uri(uri);
OBJ_RELEASE(req);
}
static void process_uri(char *uri)
{
orte_process_name_t peer;
char *cptr;
mca_base_component_list_item_t *cli;
mca_oob_base_component_t *component;
char **uris=NULL;
int rc;
uint64_t ui64;
orte_oob_base_peer_t *pr;
/* find the first semi-colon in the string */
cptr = strchr(uri, ';');
if (NULL == cptr) {
/* got a problem - there must be at least two fields,
* the first containing the process name of our peer
* and all others containing the OOB contact info
*/
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return;
}
*cptr = '\0';
cptr++;
/* the first field is the process name, so convert it */
orte_util_convert_string_to_process_name(&peer, uri);
/* if the peer is us, no need to go further as we already
* know our own contact info
*/
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid) {
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s:set_addr peer %s is me",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer));
return;
}
/* split the rest of the uri into component parts */
uris = opal_argv_split(cptr, ';');
/* get the peer object for this process */
memcpy(&ui64, (char*)&peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers,
ui64, (void**)&pr) ||
NULL == pr) {
pr = OBJ_NEW(orte_oob_base_peer_t);
if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, ui64, (void*)pr))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(uris);
return;
}
}
/* loop across all available components and let them extract
* whatever piece(s) of the uri they find relevant - they
* are all operating on our event base, so we can just
* directly call their functions
*/
rc = ORTE_ERR_UNREACH;
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s:set_addr checking if peer %s is reachable via component %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
if (NULL != component->set_addr) {
if (ORTE_SUCCESS == component->set_addr(&peer, uris)) {
/* this component found reachable addresses
* in the uris
*/
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s: peer %s is reachable via component %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
opal_bitmap_set_bit(&pr->addressable, component->idx);
} else {
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s: peer %s is NOT reachable via component %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer), component->oob_base.mca_component_name);
}
}
}
opal_argv_free(uris);
}
#if OPAL_ENABLE_FT_CR == 1
void orte_oob_base_ft_event(int sd, short argc, void *cbdata)
{
int rc;
mca_base_component_list_item_t *cli;
mca_oob_base_component_t *component;
orte_state_caddy_t *state = (orte_state_caddy_t*)cbdata;
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s oob:base:ft_event %s(%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
orte_job_state_to_str(state->job_state),
state->job_state);
/* loop across all available modules in priority order
* and call each one's ft_event handler
*/
OPAL_LIST_FOREACH(cli, &orte_oob_base.actives, mca_base_component_list_item_t) {
component = (mca_oob_base_component_t*)cli->cli_component;
if (NULL == component->ft_event) {
/* doesn't support this ability */
continue;
}
if (ORTE_SUCCESS != (rc = component->ft_event(state->job_state))) {
ORTE_ERROR_LOG(rc);
}
}
OBJ_RELEASE(state);
}
#endif