1
1

Bring the ofi/rml component online by completing the wireup protocol for the daemons. Clean up the current confusion over how connection info gets created and passed, so that it all flows through the opal/pmix "put/get" operations. Update the PMIx code to the latest master to pick up some required behaviors.

Remove the no-longer-required get_contact_info and set_contact_info from the RML layer.

Add an MCA param to allow the ofi/rml component to route messages if desired. This is mainly for experimentation at this point as we aren't sure if routing will be beneficial at large scales. Leave it "off" by default.

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-07-06 09:48:48 -07:00
родитель 855b430632
Коммит b225366012
35 изменённых файлов: 527 добавлений и 699 удалений

@ -243,6 +243,7 @@ int pmix2x_store_local(const opal_process_name_t *proc, opal_value_t *val)
pmix_status_t rc;
pmix_proc_t p;
char *nsptr;
opal_pmix2x_jobid_trkr_t *job;
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
@ -254,7 +255,13 @@ int pmix2x_store_local(const opal_process_name_t *proc, opal_value_t *val)
if (NULL != proc) {
if (NULL == (nsptr = pmix2x_convert_jobid(proc->jobid))) {
return OPAL_ERR_NOT_FOUND;
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
(void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, proc->jobid);
job->jobid = proc->jobid;
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
nsptr = job->nspace;
}
(void)strncpy(p.nspace, nsptr, PMIX_MAX_NSLEN);
p.rank = pmix2x_convert_opalrank(proc->vpid);

@ -198,6 +198,8 @@ int orte_ess_base_app_setup(bool db_restrict_local)
}
}
if (NULL != orte_process_info.my_daemon_uri) {
opal_value_t val;
/* extract the daemon's name so we can update the routing table */
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
ORTE_PROC_MY_DAEMON, NULL))) {
@ -205,11 +207,25 @@ int orte_ess_base_app_setup(bool db_restrict_local)
error = "orte_rml_parse_daemon";
goto error;
}
/* Set the contact info in the RML - this won't actually establish
* the connection, but just tells the RML how to reach the daemon
/* Set the contact info in the database - this won't actually establish
* the connection, but just tells us how to reach the daemon
* if/when we attempt to send to it
*/
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = orte_process_info.my_daemon_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_DAEMON, &val))) {
ORTE_ERROR_LOG(ret);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
error = "store DAEMON URI";
goto error;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
}
/* setup the errmgr */

@ -419,6 +419,8 @@ int orte_ess_base_orted_setup(void)
pmix_server_start();
if (NULL != orte_process_info.my_hnp_uri) {
opal_value_t val;
/* extract the HNP's name so we can update the routing table */
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
ORTE_PROC_MY_HNP, NULL))) {
@ -430,7 +432,21 @@ int orte_ess_base_orted_setup(void)
* the connection, but just tells the RML how to reach the HNP
* if/when we attempt to send to it
*/
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = orte_process_info.my_hnp_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_HNP, &val))) {
ORTE_ERROR_LOG(ret);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
error = "store HNP URI";
goto error;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
}
/* select the errmgr */
@ -461,9 +477,6 @@ int orte_ess_base_orted_setup(void)
}
OPAL_LIST_DESTRUCT(&transports);
/* add our contact info to our proc object */
proc->rml_uri = orte_rml.get_contact_info();
/*
* Group communications
*/
@ -539,7 +552,7 @@ int orte_ess_base_orted_setup(void)
}
}
if (orte_static_ports) {
if (orte_static_ports || orte_fwd_mpirun_port) {
if (NULL == orte_node_regex) {
/* we didn't get the node info */
error = "cannot construct daemon map for static ports - no node map info";

@ -471,7 +471,10 @@ static int rte_init(void)
proc->name.jobid = ORTE_PROC_MY_NAME->jobid;
proc->name.vpid = ORTE_PROC_MY_NAME->vpid;
proc->pid = orte_process_info.pid;
proc->rml_uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&proc->rml_uri);
orte_process_info.my_hnp_uri = strdup(proc->rml_uri);
/* we are also officially a daemon, so better update that field too */
orte_process_info.my_daemon_uri = strdup(proc->rml_uri);
proc->state = ORTE_PROC_STATE_RUNNING;
OBJ_RETAIN(node); /* keep accounting straight */
proc->node = node;
@ -615,15 +618,6 @@ static int rte_init(void)
goto error;
}
/* we are an hnp, so update the contact info field for later use */
orte_process_info.my_hnp_uri = orte_rml.get_contact_info();
if (NULL != proc->rml_uri) {
free(proc->rml_uri);
}
proc->rml_uri = strdup(orte_process_info.my_hnp_uri);
/* we are also officially a daemon, so better update that field too */
orte_process_info.my_daemon_uri = strdup(orte_process_info.my_hnp_uri);
/* setup the orte_show_help system to recv remote output */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SHOW_HELP,
ORTE_RML_PERSISTENT, orte_show_help_recv, NULL);

@ -23,6 +23,7 @@
#include "opal/dss/dss.h"
#include "opal/class/opal_list.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/base/base.h"
@ -273,6 +274,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
char *rtmod, *nidmap;
size_t inlen, cmplen;
uint8_t *packed_data, *cmpdata;
int32_t nvals, i;
opal_value_t *kv;
orte_process_name_t dmn;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast:recv: with %d bytes",
@ -446,35 +450,49 @@ static void xcast_recv(int status, orte_process_name_t* sender,
/* routing is now possible */
orte_routed_base.routing_enabled = true;
/* see if we have wiring info as well */
/* unpack the byte object */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 != flag) {
/* unpack the byte object */
if (0 < bo->size) {
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* decode it, pushing the info into our database */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 < bo->size) {
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* pass it for processing */
if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&wireup);
goto relay;
break;
}
for (i=0; i < nvals; i++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
break;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s STORING MODEX DATA FOR PROC %s KEY %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dmn), kv->key));
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kv))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(kv);
break;
}
OBJ_RELEASE(kv);
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
free(bo);
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
ORTE_ERROR_LOG(ret);
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
free(bo);
}
/* copy the remainder of the payload - we don't pass wiring info
* to the odls */
@ -499,7 +517,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
opal_dss.copy_payload(relay, data);
}
relay:
relay:
if (!orte_do_not_launch) {
/* get the list of next recipients from the routed module */
orte_routed.get_routing_list(rtmod, &coll);

@ -106,7 +106,7 @@
int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
orte_jobid_t job)
{
int rc;
int rc, v;
orte_job_t *jdata=NULL, *jptr;
orte_job_map_t *map=NULL;
opal_buffer_t *wireup, jobdata;
@ -116,6 +116,9 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
void *nptr;
uint32_t key;
char *nidmap;
orte_proc_t *dmn;
opal_value_t *val = NULL, *kv;
opal_list_t *modex;
/* get the job data pointer */
if (NULL == (jdata = orte_get_job_data_object(job))) {
@ -156,36 +159,106 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
ORTE_ERROR_LOG(rc);
return rc;
}
if (!orte_static_ports && !orte_fwd_mpirun_port) {
/* pack a flag indicating wiring info is provided */
flag = 1;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
/* get wireup info for daemons */
if (NULL == (jptr = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
wireup = OBJ_NEW(opal_buffer_t);
/* always include data for mpirun as the daemons can't have it yet */
val = NULL;
if (OPAL_SUCCESS != (rc = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
} else {
/* pack a flag indicating no wireup data is provided */
flag = 0;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(wireup);
return ORTE_ERR_NOT_FOUND;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
modex = (opal_list_t*)val->data.ptr;
numbytes = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
/* if we didn't rollup the connection info, then we have
* to provide a complete map of connection info */
if (!orte_static_ports && !orte_fwd_mpirun_port) {
for (v=1; v < jptr->procs->size; v++) {
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, v))) {
continue;
}
val = NULL;
if (OPAL_SUCCESS != (rc = opal_pmix.get(&dmn->name, NULL, NULL, &val)) || NULL == val) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
return rc;
} else {
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(buffer);
return ORTE_ERR_NOT_FOUND;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &dmn->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(wireup);
return rc;
}
modex = (opal_list_t*)val->data.ptr;
numbytes = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(wireup);
return rc;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(wireup);
return rc;
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
}
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
OBJ_RELEASE(wireup);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
/* we need to ensure that any new daemons get a complete

@ -125,13 +125,7 @@ ORTE_DECLSPEC void orte_oob_base_send_nb(int fd, short args, void *cbdata);
orte_oob_base_send_nb, ORTE_MSG_PRI); \
}while(0)
/* Our contact info is actually subject to change as transports
* can fail at any time. So a request to obtain our URI requires
* that we get a snapshot in time. Since the request always comes
* thru the rml, and we share that event base, we can just cycle
* across the components to collect the info.
*
* During initial wireup, we can only transfer contact info on the daemon
/* During initial wireup, we can only transfer contact info on the daemon
* command line. This limits what we can send to a string representation of
* the actual contact info, which gets sent in a uri-like form. Not every
* oob module can support this transaction, so this function will loop
@ -147,37 +141,8 @@ ORTE_DECLSPEC void orte_oob_base_send_nb(int fd, short args, void *cbdata);
* Since all components define their address info at component start,
* it is unchanged and does not require access via event
*/
#define ORTE_OOB_GET_URI(u) orte_oob_base_get_addr(u)
ORTE_DECLSPEC void orte_oob_base_get_addr(char **uri);
/**
* Extract initial contact information from a string uri
*
* During initial wireup, we can only transfer contact info on the daemon
* command line. This limits what we can send to a string representation of
* the actual contact info, which gets sent in a uri-like form. Not every
* oob module can support this transaction, so this function will loop
* across all oob components/modules, letting each look at the uri and extract
* info from it if it can.
*/
typedef struct {
opal_object_t super;
opal_event_t ev;
char *uri;
} mca_oob_uri_req_t;
OBJ_CLASS_DECLARATION(mca_oob_uri_req_t);
#define ORTE_OOB_SET_URI(u) \
do { \
mca_oob_uri_req_t *rq; \
rq = OBJ_NEW(mca_oob_uri_req_t); \
rq->uri = strdup((u)); \
orte_oob_base_set_addr(0, 0, (void*)rq); \
}while(0)
ORTE_DECLSPEC void orte_oob_base_set_addr(int fd, short args, void *cbdata);
/* Get the available transports and their attributes */
#define ORTE_OOB_GET_TRANSPORTS(u) orte_oob_base_get_transports(u)
ORTE_DECLSPEC void orte_oob_base_get_transports(opal_list_t *transports);

@ -227,11 +227,13 @@ void orte_oob_base_get_addr(char **uri)
bool one_added = false;
mca_base_component_list_item_t *cli;
mca_oob_base_component_t *component;
opal_value_t val;
/* start with our process name */
if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) {
ORTE_ERROR_LOG(rc);
goto unblock;
*uri = NULL;
return;
}
len = strlen(final);
@ -279,55 +281,18 @@ void orte_oob_base_get_addr(char **uri)
}
}
unblock:
*uri = final;
}
/**
* This function will loop
* across all oob components, letting each look at the uri and extract
* info from it if it can. An error is to be returned if NO component
* can successfully extract a contact.
*/
static void req_cons(mca_oob_uri_req_t *ptr)
{
ptr->uri = NULL;
}
static void req_des(mca_oob_uri_req_t *ptr)
{
if (NULL != ptr->uri) {
free(ptr->uri);
/* push this into our modex storage */
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = final;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(ORTE_PROC_MY_NAME, &val))) {
ORTE_ERROR_LOG(rc);
}
}
OBJ_CLASS_INSTANCE(mca_oob_uri_req_t,
opal_object_t,
req_cons, req_des);
void orte_oob_base_set_addr(int fd, short args, void *cbdata)
{
mca_oob_uri_req_t *req = (mca_oob_uri_req_t*)cbdata;
char *uri;
ORTE_ACQUIRE_OBJECT(req);
uri = req->uri;
opal_output_verbose(5, orte_oob_base_framework.framework_output,
"%s: set_addr to uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == uri) ? "NULL" : uri);
/* if the request doesn't contain a URI, then we
* have an error
*/
if (NULL == uri) {
opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_FORCED_TERMINATE(1);
OBJ_RELEASE(req);
return;
}
process_uri(uri);
OBJ_RELEASE(req);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
}
static void process_uri(char *uri)

@ -10,7 +10,7 @@
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
# Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
# Copyright (c) 2015 Cisco Systems, Inc. All rights reserved.
# $COPYRIGHT$
#
@ -105,6 +105,11 @@ levels.
Listening port: %d
Remote host: %s
Remote port: %d
The connection was rejected.
#
[static-fwd]
Static ports were requested while orte_fwd_mpirun_port was set.
Both options cannot be simultaneously set. Please either set
orte_fwd_mpirun_port=false or remove any static port directives.

@ -330,6 +330,11 @@ static int tcp_component_register(void)
if (NULL != mca_oob_tcp_component.tcp_static_ports ||
NULL != mca_oob_tcp_component.tcp6_static_ports) {
/* can't fwd mpirun port _and_ have static ports */
if (ORTE_PROC_IS_HNP && orte_fwd_mpirun_port) {
orte_show_help("help-oob-tcp.txt", "static-fwd", true);
return ORTE_ERR_NOT_AVAILABLE;
}
orte_static_ports = true;
}

@ -1042,7 +1042,7 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
char *rml_uri = NULL, *ptr;
char *ptr;
int rc, idx;
orte_proc_t *daemon=NULL;
orte_job_t *jdata;
@ -1077,16 +1077,6 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
idx = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(buffer, &dname, &idx, ORTE_NAME))) {
char *nodename = NULL;
/* unpack its contact info */
idx = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &rml_uri, &idx, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
orted_failed_launch = true;
goto CLEANUP;
}
/* set the contact info into the hash table */
orte_rml.set_contact_info(rml_uri);
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:orted_report_launch from daemon %s",
@ -1100,7 +1090,6 @@ void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
goto CLEANUP;
}
daemon->state = ORTE_PROC_STATE_RUNNING;
daemon->rml_uri = rml_uri;
/* record that this daemon is alive */
ORTE_FLAG_SET(daemon, ORTE_PROC_FLAG_ALIVE);

@ -11,7 +11,7 @@
# All rights reserved.
# Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights
# reserved.
# Copyright (c) 2016 Intel, Inc. All rights reserved.
# Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@ -25,7 +25,6 @@ headers += \
libmca_rml_la_SOURCES += \
base/rml_base_frame.c \
base/rml_base_receive.c \
base/rml_base_contact.c \
base/rml_base_msg_handlers.c \
base/rml_base_stubs.c

@ -62,27 +62,6 @@ ORTE_DECLSPEC extern mca_base_framework_t orte_rml_base_framework;
/* select a component */
ORTE_DECLSPEC int orte_rml_base_select(void);
/**
* Post receive to get updates regarding contact information
*
* Post a non-blocking receive (likely during orte_init()) to receive
* updated contact information from the HNP when it becomes available.
* This should be called in any process that needs such updates, and
* the receive will continue to get update callbacks until
* orte_rml_base_comm_stop() is called.
*/
ORTE_DECLSPEC void orte_rml_base_comm_start(void);
/**
* Stop receiving contact information updates
*
* Shut down the receive posted during orte_rml_base_comm_start(),
* likely during orte_finalize().
*/
ORTE_DECLSPEC void orte_rml_base_comm_stop(void);
/*
* globals that might be needed
*/
@ -260,9 +239,6 @@ ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata);
/* Stub API interfaces to cycle through active plugins */
char* orte_rml_API_get_contact_info(void);
void orte_rml_API_set_contact_info(const char *contact_info);
int orte_rml_API_ping(orte_rml_conduit_t conduit_id,
const char* contact_info,
const struct timeval* tv);

@ -37,103 +37,9 @@
#include "orte/mca/rml/base/base.h"
int orte_rml_base_get_contact_info(orte_jobid_t job, opal_buffer_t *data)
{
int i;
orte_job_t *jdata;
orte_proc_t *proc;
int rc;
/* lookup the job */
if (NULL == (jdata = orte_get_job_data_object(job))) {
/* bad jobid */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* cycle through all procs in the job, adding their contact info to the buffer */
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
/* if this proc doesn't have any contact info, ignore it */
if (NULL == proc->rml_uri) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &proc->rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}
int orte_rml_base_update_contact_info(opal_buffer_t* data)
{
orte_std_cntr_t cnt;
orte_process_name_t peer;
orte_vpid_t num_procs;
char *rml_uri;
int rc;
/* unpack the data for each entry */
num_procs = 0;
cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
"%s rml:base:update:contact:info got uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
NULL == rml_uri ? "NULL" : rml_uri));
if (NULL != rml_uri) {
/* set the contact info into the hash table */
orte_rml.set_contact_info(rml_uri);
/* if this was an update to my own job, then
* track how many procs were in the message */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &peer, NULL))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return rc;
}
if (peer.jobid == ORTE_PROC_MY_NAME->jobid) {
++num_procs;
}
free(rml_uri);
}
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are a daemon, this update would include updated contact info
* for all daemons in the system - indicating that the number of daemons
* changed since we were initially launched. Thus, update the num_procs
* in our process_info struct so we can correctly route any messages
*/
if (ORTE_PROC_IS_DAEMON &&
orte_process_info.num_procs < num_procs) {
orte_process_info.num_procs = num_procs;
if (orte_process_info.max_procs < orte_process_info.num_procs) {
orte_process_info.max_procs = orte_process_info.num_procs;
}
/* if we changed it, then we better update the routing
* plans so daemon collectives work correctly.
*/
orte_routed.update_routing_plan(NULL);
}
return ORTE_SUCCESS;
}
int
orte_rml_base_parse_uris(const char* uri,
orte_process_name_t* peer,
char*** uris)
int orte_rml_base_parse_uris(const char* uri,
orte_process_name_t* peer,
char*** uris)
{
int rc;

@ -41,8 +41,6 @@
/* Initialising stub fns in the global var used by other modules */
orte_rml_base_API_t orte_rml = {
.get_contact_info = orte_rml_API_get_contact_info,
.set_contact_info = orte_rml_API_set_contact_info,
.ping = orte_rml_API_ping,
.send_nb = orte_rml_API_send_nb,
.send_buffer_nb = orte_rml_API_send_buffer_nb,

@ -1,127 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2012 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
*/
/*
* includes
*/
#include "orte_config.h"
#include <string.h>
#include "orte/constants.h"
#include "orte/types.h"
#include "opal/dss/dss.h"
#include "opal/util/output.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/util/name_fns.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/base.h"
#include "orte/mca/rml/base/rml_contact.h"
static bool recv_issued=false;
static void orte_rml_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
void orte_rml_base_comm_start(void)
{
if (recv_issued) {
return;
}
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_RML_INFO_UPDATE,
ORTE_RML_PERSISTENT,
orte_rml_base_recv,
NULL);
recv_issued = true;
}
void orte_rml_base_comm_stop(void)
{
if (!recv_issued) {
return;
}
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_RML_INFO_UPDATE);
recv_issued = false;
}
/* handle message from proxies
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
static void
orte_rml_base_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_rml_cmd_flag_t command;
orte_std_cntr_t count;
opal_buffer_t *buf;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
"%s rml:base:recv: processing message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_RML_CMD))) {
ORTE_ERROR_LOG(rc);
return;
}
switch (command) {
case ORTE_RML_UPDATE_CMD:
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(buffer))) {
ORTE_ERROR_LOG(rc);
return;
}
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
/* send an ack back - this is REQUIRED to ensure that the routing
* info gets updated -before- a message intending to use that info
* arrives. Because message ordering is NOT preserved in the OOB, it
* is possible for code that updates our contact info and then sends
* a message to fail because the update contact info message is
* processed too late
*/
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
"%s rml:base:recv: sending ack to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
buf = OBJ_NEW(opal_buffer_t);
if (0 > (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
sender, buf, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}

@ -117,56 +117,6 @@ void orte_rml_API_close_conduit(orte_rml_conduit_t id)
/** Get contact information for local process */
char* orte_rml_API_get_contact_info(void)
{
char **rc = NULL, *tmp;
orte_rml_base_active_t *active;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s rml:base:get_contact_info()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* cycle thru the actives and get their contact info */
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
if (NULL != active->component->get_contact_info) {
tmp = active->component->get_contact_info();
if (NULL != tmp) {
opal_argv_append_nosize(&rc, tmp);
free(tmp);
}
}
}
if (NULL != rc) {
tmp = opal_argv_join(rc, ';');
opal_argv_free(rc);
} else {
tmp = NULL;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s rml:base:get_contact_info() returning -> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),tmp);
return tmp;
}
/** Set contact information for remote process */
void orte_rml_API_set_contact_info(const char *contact_info)
{
orte_rml_base_active_t *active;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s rml:base:set_contact_info()",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* cycle thru the actives and let all modules parse the info
* to extract their relevant portions */
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
if (NULL != active->component->set_contact_info) {
active->component->set_contact_info(contact_info);
}
}
}
/** Ping process for connectivity check */
int orte_rml_API_ping(orte_rml_conduit_t conduit_id,
const char* contact_info,

@ -1,6 +1,7 @@
/*
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -26,37 +27,6 @@
BEGIN_C_DECLS
/**
* Create packed RML contact information for the given process names
*
* Create packed RML contact information for a given job.
* The information is provided packed in an opal_buffer_t
* structure.
*
* @param[in] jobid Job whose contact information is needed
* @param[out] data Contact information packed in buffer for
* \c name.
*
* @retval ORTE_SUCCESS Successfully found contact information
* @retval ORTE_ERROR Contact information could not be found or shared
*/
ORTE_DECLSPEC int orte_rml_base_get_contact_info(orte_jobid_t job,
opal_buffer_t *data);
/**
* Update the RML with contact information
*
* Update the RML with contact information provided from a call to
* orte_rml_base_get_contact_info(), likely on another host.
*
* @param[in] data Contact information in a packed buffer,
* obtained by call to orte_rml_base_get_contact_info()
*
* @retval ORTE_SUCCESS Successfully updated contact information
*/
ORTE_DECLSPEC int orte_rml_base_update_contact_info(opal_buffer_t* data);
/**
* Parse a contact information string
*

@ -82,6 +82,7 @@ static bool init_done = false;
static char *ofi_transports_supported = NULL;
static char *initial_ofi_transports_supported = NULL;
static bool ofi_desired = false;
static bool routing_desired = false;
/* return true if user override for choice of ofi provider */
bool user_override(void)
@ -242,7 +243,7 @@ static int rml_ofi_component_register(void)
{
mca_base_component_t *component = &mca_rml_ofi_component.base;
initial_ofi_transports_supported = strdup("fabric");
initial_ofi_transports_supported = "fabric,ethernet";
ofi_transports_supported = strdup(initial_ofi_transports_supported);
mca_base_component_var_register(component, "transports",
"Comma-delimited list of transports to support (default=\"fabric,ethernet\"",
@ -260,6 +261,14 @@ static int rml_ofi_component_register(void)
MCA_BASE_VAR_SCOPE_LOCAL,
&ofi_desired);
routing_desired = false;
mca_base_component_var_register(component, "routing",
"Route OFI messages",
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,
OPAL_INFO_LVL_2,
MCA_BASE_VAR_SCOPE_LOCAL,
&routing_desired);
return ORTE_SUCCESS;
}
@ -931,13 +940,11 @@ static int rml_ofi_component_init(void)
if transport is not found RML_OFI_PROV_ID_INVALID is returned.
@[in]attributes : the attributes passed in to open_conduit reg the transport requested
*/
int get_ofi_prov_id( opal_list_t *attributes)
int get_ofi_prov_id(opal_list_t *attributes)
{
bool choose_fabric = false, choice_made = false;
int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0;
char *provider = NULL, *transport = NULL;
char *ethernet="sockets", *fabric="psm2";
char **providers = NULL, *provider;
struct fi_info *cur_fi;
char *comp_attrib = NULL;
char **comps;
@ -954,64 +961,49 @@ int get_ofi_prov_id( opal_list_t *attributes)
if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) &&
NULL != comp_attrib) {
comps = opal_argv_split(comp_attrib, ',');
for (i=0; NULL != comps[i] && choice_made == false ; i++) {
for (i=0; NULL != comps[i]; i++) {
if (NULL != strstr(ofi_transports_supported, comps[i])) {
if (0 == strcmp( comps[i], "ethernet")) {
if (0 == strcmp(comps[i], "ethernet")) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Opening conduit using OFI ethernet/sockets provider",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_argv_free(comps);
provider = ethernet;
choose_fabric = false;
choice_made = false; /* continue to see if fabric is requested */
} else if ( 0 == strcmp ( comps[i], "fabric")) {
opal_argv_append_nosize(&providers, "sockets");
} else if (0 == strcmp(comps[i], "fabric")) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Opening conduit using OFI fabric provider",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_argv_free(comps);
choose_fabric = true;
provider = NULL;
choice_made = true; /* fabric is highest priority so don't check for anymore */
opal_argv_prepend_nosize(&providers, "fabric"); /* fabric is higher priority so prepend it */
}
}
}
}
/* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */
if ( NULL == provider) {
if (!orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING)) {
/* ensure it remains NULL */
provider = NULL;
if (NULL == providers) {
if (orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING)) {
opal_argv_append_nosize(&providers, provider);
} else {
ofi_prov_id = RML_OFI_PROV_ID_INVALID;
}
}
/* either ethernet-sockets or specific is requested. Proceed to choose that provider */
if ( NULL != provider) {
// loop the orte_rml_ofi.ofi_provs[] and find the provider name that matches
for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) {
if (NULL != providers) {
/* go down the list of preferences in order */
for (i=0; NULL != providers[i] && RML_OFI_PROV_ID_INVALID == ofi_prov_id; i++) {
// loop the orte_rml_ofi.ofi_provs[] and see if someone matches
for (prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num; prov_num++ ) {
cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - get_ofi_prov_id() -> comparing %s = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),provider,cur_fi->fabric_attr->prov_name);
if ( strcmp(provider,cur_fi->fabric_attr->prov_name) == 0) {
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
providers[i], cur_fi->fabric_attr->prov_name);
if (0 == strcmp(providers[i], cur_fi->fabric_attr->prov_name)) {
ofi_prov_id = prov_num;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Choosing provider %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cur_fi->fabric_attr->prov_name);
break;
}
}
} else if ( choose_fabric ) {
// "fabric" is requested, choose the first fabric(non-ethernet) provider
for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) {
cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s -choosing fabric -> comparing %s != %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ethernet,cur_fi->fabric_attr->prov_name);
if ( strcmp(ethernet, cur_fi->fabric_attr->prov_name) != 0) {
ofi_prov_id = prov_num;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Choosing fabric provider %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_fi->fabric_attr->prov_name);
}
}
}
opal_output_verbose(20,orte_rml_base_framework.framework_output,
@ -1032,7 +1024,7 @@ static orte_rml_base_module_t* make_module( int ofi_prov_id)
"%s - rml_ofi make_module() begin ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if ( RML_OFI_PROV_ID_INVALID == ofi_prov_id) {
if (RML_OFI_PROV_ID_INVALID == ofi_prov_id) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - open_conduit did not select any ofi provider, returning NULL ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
@ -1043,22 +1035,25 @@ static orte_rml_base_module_t* make_module( int ofi_prov_id)
/* create a new module */
mod = (orte_rml_ofi_module_t*)calloc(1,sizeof(orte_rml_ofi_module_t));
if (NULL == mod) {
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Module allocation failed, returning NULL ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return NULL;
}
/* copy the APIs over to it and the OFI provider information */
memcpy(mod, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t));
/* setup the remaining data locations in mod, associate conduit with ofi provider selected*/
mod->cur_transport_id = ofi_prov_id;
/* we always go direct to our target peer, so set the routed to "direct" */
mod->api.routed = orte_routed.assign_module("direct");
/* set the routed module */
if (routing_desired) {
mod->api.routed = orte_routed.assign_module(NULL);
} else {
mod->api.routed = orte_routed.assign_module("direct");
}
if (NULL == mod->api.routed) {
/* we can't work */
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Failed to get direct routed support, returning NULL ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"%s - Failed to get%srouted support, disqualifying ourselves",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
routing_desired ? " " : " direct ");
free(mod);
return NULL;
}
@ -1077,7 +1072,6 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes)
char **comps;
int i;
orte_attribute_t *attr;
opal_list_t provider;
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - Entering rml_ofi_open_conduit()",

@ -563,7 +563,7 @@ static void send_msg(int fd, short args, void *cbdata)
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(snd);
//OBJ_RELEASE( ofi_send_req);
return;
return ;
}
/* decide the provider we want to use from the list of providers in peer as per below order.
* 1. if the user specified the transport for this conduit (even giving us a prioritized list of candidates),

@ -59,8 +59,6 @@ static int rml_oob_open(void);
static int rml_oob_close(void);
static orte_rml_base_module_t* open_conduit(opal_list_t *attributes);
static orte_rml_pathway_t* query_transports(void);
static char* get_contact_info(void);
static void set_contact_info(const char *uri);
static void close_conduit(orte_rml_base_module_t *mod);
/**
* component definition
@ -86,8 +84,6 @@ orte_rml_component_t mca_rml_oob_component = {
.priority = 5,
.open_conduit = open_conduit,
.query_transports = query_transports,
.get_contact_info = get_contact_info,
.set_contact_info = set_contact_info,
.close_conduit = close_conduit
};
@ -296,16 +292,3 @@ static void close_conduit(orte_rml_base_module_t *md)
* and freeing the module */
return;
}
static char* get_contact_info(void)
{
char *ret;
ORTE_OOB_GET_URI(&ret);
return ret;
}
/**
 * Hand a contact URI to the OOB layer via ORTE_OOB_SET_URI().
 *
 * @param[in] uri contact string, passed to the macro unchanged
 *
 * NOTE(review): whether the macro copies the string or keeps the pointer
 * is not visible here -- confirm before freeing uri in a caller.
 */
static void set_contact_info(const char *uri)
{
ORTE_OOB_SET_URI(uri);
}

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
*
@ -295,45 +295,6 @@ typedef int (*orte_rml_API_query_transports_fn_t)(opal_list_t *transports);
/* query the routed module for a given conduit */
typedef char* (*orte_rml_API_query_routed_fn_t)(orte_rml_conduit_t id);
/**
* Get a "contact info" string for the local process
*
* Get a "contact info" string that can be used by other processes to
* share the contact information for the given process. The "contact
* info" string includes the process identifier for the given process
* and uses only basic ascii characters. It should be quoted when
* evaluated by a shell, although no special escaping is necessary.
*
* @note The function may return a contact info string which contains
* multiple addresses.
*
* @retval non-NULL The contact information for this process
* @retval NULL An error occurred when trying to get the current
* process contact info
*/
typedef char* (*orte_rml_API_get_contact_info_fn_t)(void);
/**
* Update the RML with a remote process's contact info
*
* Update the RML with a remote process's contact information, as
* returned from the get_contact_info() function on the remote
* process. Before a send can be initiated to a remote process,
* either this function must be called for that process or that
* process must have already established a connection to the local
* process.
*
* @note The user may not always explicitly call this function
* directly, but may instead cause it to be called through one of the
* contact setup functions available in
* orte/mca/rml/base/rml_contact.h.
*
* @param[in] contact_info The contact information string of a peer
*/
typedef void (*orte_rml_API_set_contact_info_fn_t)(const char *contact_info);
/**
* "Ping" another process to determine availability
*
@ -477,11 +438,6 @@ typedef struct {
/** Shutdown the conduit and clean up resources */
orte_rml_API_close_conduit_fn_t close_conduit;
/** Get contact information for local process */
orte_rml_API_get_contact_info_fn_t get_contact_info;
/** Set contact information for remote process */
orte_rml_API_set_contact_info_fn_t set_contact_info;
/** Ping process for connectivity check */
orte_rml_API_ping_fn_t ping;
@ -543,12 +499,6 @@ typedef orte_rml_base_module_t* (*orte_rml_component_open_conduit_fn_t)(opal_lis
*/
typedef orte_rml_pathway_t* (*orte_rml_component_query_transports_fn_t)(void);
/* Get the contact info for this component */
typedef char* (*orte_rml_component_get_contact_info_fn_t)(void);
/* Set contact info */
typedef void (*orte_rml_component_set_contact_info_fn_t)(const char *uri);
/** Close conduit - allow the specific component to
* cleanup the module for this conduit
*/
@ -571,8 +521,6 @@ typedef struct orte_rml_component_t {
/* Component interface functions */
orte_rml_component_open_conduit_fn_t open_conduit;
orte_rml_component_query_transports_fn_t query_transports;
orte_rml_component_get_contact_info_fn_t get_contact_info;
orte_rml_component_set_contact_info_fn_t set_contact_info;
orte_rml_module_close_conduit_fn_t close_conduit;
} orte_rml_component_t;

@ -247,10 +247,14 @@ static void vm_ready(int fd, short args, void *cbdata)
orte_daemon_cmd_flag_t command = ORTE_DAEMON_DVM_NIDMAP_CMD;
orte_grpcomm_signature_t *sig;
opal_buffer_t *wireup;
orte_job_t *jptr;
orte_proc_t *dmn;
opal_byte_object_t bo, *boptr;
int8_t flag;
int32_t numbytes;
int32_t numbytes, v;
char *nidmap;
opal_list_t *modex;
opal_value_t *val, *kv;
ORTE_ACQUIRE_OBJECT(caddy);
@ -291,13 +295,46 @@ static void vm_ready(int fd, short args, void *cbdata)
/* pack a flag indicating wiring info is provided */
flag = 1;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
/* get wireup info for daemons per the selected routing module */
/* get wireup info for daemons */
jptr = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
OBJ_RELEASE(buf);
return;
/* For every daemon in our job, fetch the key-value pairs stored for it and
 * append them to the wireup buffer as: <int32 count> followed by <count>
 * packed opal_value_t entries. Any failure abandons the whole message. */
for (v=0; v < jptr->procs->size; v++) {
    if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, v))) {
        continue;
    }
    val = NULL;
    rc = opal_pmix.get(&dmn->name, NULL, NULL, &val);
    if (OPAL_SUCCESS != rc || NULL == val) {
        /* a "successful" get that returns nothing is still a failure for
         * us - report it as not-found rather than logging a success code */
        ORTE_ERROR_LOG((OPAL_SUCCESS == rc) ? ORTE_ERR_NOT_FOUND : rc);
        if (NULL != val) {
            OBJ_RELEASE(val);
        }
        OBJ_RELEASE(buf);
        OBJ_RELEASE(wireup);
        return;
    }
    /* the data is returned as a list of key-value pairs in the opal_value_t */
    if (OPAL_PTR != val->type) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        /* release the returned value before bailing out */
        OBJ_RELEASE(val);
        OBJ_RELEASE(buf);
        OBJ_RELEASE(wireup);
        return;
    }
    modex = (opal_list_t*)val->data.ptr;
    numbytes = (int32_t)opal_list_get_size(modex);
    if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        /* same teardown order as the success path below */
        OPAL_LIST_RELEASE(modex);
        OBJ_RELEASE(val);
        OBJ_RELEASE(buf);
        OBJ_RELEASE(wireup);
        return;
    }
    OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
        if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
            ORTE_ERROR_LOG(rc);
            OPAL_LIST_RELEASE(modex);
            OBJ_RELEASE(val);
            OBJ_RELEASE(buf);
            OBJ_RELEASE(wireup);
            return;
        }
    }
    OPAL_LIST_RELEASE(modex);
    OBJ_RELEASE(val);
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);

@ -68,6 +68,7 @@
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/rmaps/rmaps_types.h"
@ -722,7 +723,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* send back contact info */
contact_info = orte_rml.get_contact_info();
orte_oob_base_get_addr(&contact_info);
if (NULL == contact_info) {
ORTE_ERROR_LOG(ORTE_ERROR);

@ -87,6 +87,7 @@
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/odls/base/odls_private.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/ras/ras.h"
#include "orte/mca/routed/routed.h"
@ -116,7 +117,7 @@ static void pipe_closed(int fd, short flags, void *arg);
static void rollup(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
static opal_buffer_t *bucket;
static opal_buffer_t *bucket, *mybucket = NULL;
static int ncollected = 0;
static char *orte_parent_uri;
@ -223,7 +224,6 @@ int orte_daemon(int argc, char *argv[])
{
int ret = 0;
opal_cmd_line_t *cmd_line = NULL;
char *rml_uri;
int i;
opal_buffer_t *buffer;
char hostname[OPAL_MAXHOSTNAMELEN];
@ -451,13 +451,18 @@ int orte_daemon(int argc, char *argv[])
/* insert our contact info into our process_info struct so we
* have it for later use and set the local daemon field to our name
*/
orte_process_info.my_daemon_uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&orte_process_info.my_daemon_uri);
if (NULL == orte_process_info.my_daemon_uri) {
/* no way to communicate */
ret = ORTE_ERROR;
goto DONE;
}
ORTE_PROC_MY_DAEMON->jobid = ORTE_PROC_MY_NAME->jobid;
ORTE_PROC_MY_DAEMON->vpid = ORTE_PROC_MY_NAME->vpid;
/* if I am also the hnp, then update that contact info field too */
if (ORTE_PROC_IS_HNP) {
orte_process_info.my_hnp_uri = orte_rml.get_contact_info();
orte_process_info.my_hnp_uri = strdup(orte_process_info.my_daemon_uri);
ORTE_PROC_MY_HNP->jobid = ORTE_PROC_MY_NAME->jobid;
ORTE_PROC_MY_HNP->vpid = ORTE_PROC_MY_NAME->vpid;
}
@ -662,9 +667,9 @@ int orte_daemon(int argc, char *argv[])
&orte_parent_uri);
if (NULL != orte_parent_uri) {
orte_process_name_t parent;
opal_value_t val;
/* set the contact info into the hash table */
orte_rml.set_contact_info(orte_parent_uri);
/* set the contact info into our local database */
ret = orte_rml_base_parse_uris(orte_parent_uri, &parent, NULL);
if (ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
@ -672,6 +677,18 @@ int orte_daemon(int argc, char *argv[])
orte_parent_uri = NULL;
goto DONE;
}
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = orte_parent_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&parent, &val))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&val);
goto DONE;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* don't need this value anymore */
free(orte_parent_uri);
@ -701,7 +718,7 @@ int orte_daemon(int argc, char *argv[])
orte_process_name_t target;
target.jobid = ORTE_PROC_MY_NAME->jobid;
if (orte_fwd_mpirun_port) {
if (orte_fwd_mpirun_port || orte_static_ports) {
/* setup the rollup callback */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ORTED_CALLBACK,
ORTE_RML_PERSISTENT, rollup, NULL);
@ -737,15 +754,6 @@ int orte_daemon(int argc, char *argv[])
OBJ_RELEASE(buffer);
goto DONE;
}
/* for now, always include our contact info, even if we are using
* static ports. Eventually, this will be removed
*/
rml_uri = orte_rml.get_contact_info();
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
/* get any connection info we may have pushed */
{
@ -763,24 +771,37 @@ int orte_daemon(int argc, char *argv[])
}
} else {
/* the data is returned as a list of key-value pairs in the opal_value_t */
if (OPAL_PTR != val->type) {
opal_output(0, "WRONG RETURNED TYPE");
}
modex = (opal_list_t*)val->data.ptr;
flag = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
if (OPAL_PTR == val->type) {
modex = (opal_list_t*)val->data.ptr;
flag = (int32_t)opal_list_get_size(modex);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
}
OPAL_LIST_RELEASE(modex);
} else {
opal_output(0, "VAL KEY: %s", (NULL == val->key) ? "NULL" : val->key);
/* single value */
flag = 1;
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &flag, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &val, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(buffer);
goto DONE;
}
}
OPAL_LIST_RELEASE(modex);
OBJ_RELEASE(val);
}
}
@ -1027,22 +1048,60 @@ static void rollup(int status, orte_process_name_t* sender,
int nreqd;
char *rtmod;
int ret;
orte_process_name_t child;
int32_t i, flag, cnt;
opal_value_t *kv;
/* xfer the contents of the rollup to our bucket */
opal_dss.copy_payload(bucket, buffer);
ncollected++;
/* get the number of children, and include ourselves */
/* if the sender is ourselves, then we save that buffer
* so we can insert it at the beginning */
if (sender->jobid == ORTE_PROC_MY_NAME->jobid &&
sender->vpid == ORTE_PROC_MY_NAME->vpid) {
mybucket = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(mybucket, buffer);
} else {
/* the first entry in the bucket will be from our
* direct child - harvest it for connection info */
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &child, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
goto report;
}
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
goto report;
}
for (i=0; i < flag; i++) {
cnt = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &kv, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(ret);
goto report;
}
/* store this in a daemon wireup buffer for later distribution */
opal_pmix.store_local(&child, kv);
OBJ_RELEASE(kv);
}
}
report:
/* get the number of children */
rtmod = orte_rml.get_routed(orte_mgmt_conduit);
nreqd = orte_routed.num_routes(rtmod) + 1;
if (nreqd == ncollected) {
if (nreqd == ncollected && NULL != mybucket) {
/* add the collection of our children's buckets to ours */
opal_dss.copy_payload(mybucket, bucket);
OBJ_RELEASE(bucket);
/* relay this on to our parent */
if (0 > (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
ORTE_PROC_MY_PARENT, bucket,
if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_PARENT, mybucket,
ORTE_RML_TAG_ORTED_CALLBACK,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(bucket);
OBJ_RELEASE(mybucket);
}
}
}

@ -92,6 +92,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
@ -539,13 +540,30 @@ int orte_submit_init(int argc, char *argv[],
opal_unsetenv(OPAL_MCA_PREFIX"pmix", &environ);
if (ORTE_PROC_IS_TOOL) {
/* set the info in our contact table */
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
opal_value_t val;
/* extract the name */
if (ORTE_SUCCESS != orte_rml_base_parse_uris(orte_process_info.my_hnp_uri, ORTE_PROC_MY_HNP, NULL)) {
orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, orte_process_info.my_hnp_uri);
exit(1);
}
/* set the info in our contact table: the entry is filed under
 * ORTE_PROC_MY_HNP, whose name was parsed from my_hnp_uri above, so the
 * URI stored with it must be the HNP's (not my_daemon_uri) */
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = orte_process_info.my_hnp_uri;
if (OPAL_SUCCESS != opal_pmix.store_local(ORTE_PROC_MY_HNP, &val)) {
    /* key and string are borrowed - detach them before destructing */
    val.key = NULL;
    val.data.string = NULL;
    OBJ_DESTRUCT(&val);
    orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, orte_process_info.my_hnp_uri);
    orte_finalize();
    exit(1);
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* set the route to be direct */
if (ORTE_SUCCESS != orte_routed.update_route(NULL, ORTE_PROC_MY_HNP, ORTE_PROC_MY_HNP)) {
orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, orte_process_info.my_hnp_uri);
@ -994,7 +1012,7 @@ int orte_submit_job(char *argv[], int *index,
if (NULL != orte_cmd_options.report_uri) {
FILE *fp;
char *rml_uri;
rml_uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&rml_uri);
if (0 == strcmp(orte_cmd_options.report_uri, "-")) {
/* if '-', then output to stdout */
printf("%s\n", (NULL == rml_uri) ? "NULL" : rml_uri);

@ -50,7 +50,7 @@
static int init_server(void)
{
char *server;
opal_buffer_t buf;
opal_value_t val;
char input[1024], *filename;
FILE *fp;
int rc;
@ -103,20 +103,27 @@ static int init_server(void)
} else {
server = strdup(orte_data_server_uri);
}
/* setup our route to the server */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &server, 1, OPAL_STRING);
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
ORTE_ERROR_LOG(rc);
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
return rc;
}
OBJ_DESTRUCT(&buf);
/* parse the URI to get the server's name */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* setup our route to the server */
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = server;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&orte_pmix_server_globals.server, &val))) {
ORTE_ERROR_LOG(rc);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
return rc;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* check if we are to wait for the server to start - resolves
* a race condition that can occur when the server is run
* as a background job - e.g., in scripts

@ -83,7 +83,7 @@ char *orte_data_server_uri = NULL;
bool orte_static_ports = false;
char *orte_oob_static_ports = NULL;
bool orte_standalone_operation = false;
bool orte_fwd_mpirun_port = false;
bool orte_fwd_mpirun_port = true;
bool orte_keep_fqdn_hostnames = false;
bool orte_have_fqdn_allocation = false;

@ -783,6 +783,7 @@ int orte_register_params(void)
OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
&orte_stack_trace_wait_timeout);
orte_fwd_mpirun_port = true;
(void) mca_base_var_register ("orte", "orte", NULL, "fwd_mpirun_port",
"Forward the port used by mpirun so all daemons will use it",
MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0,

@ -77,6 +77,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/state/state.h"
@ -324,7 +325,7 @@ int main(int argc, char *argv[])
opal_finalize();
/* check for request to report uri */
uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&uri);
if (NULL != myglobals.report_uri) {
FILE *fp;
if (0 == strcmp(myglobals.report_uri, "-")) {

@ -56,6 +56,7 @@
#include "orte/util/proc_info.h"
#include "orte/util/threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/orted/orted.h"
@ -204,7 +205,7 @@ int main(int argc, char *argv[])
* proposed in an email thread by Jeff Squyres
*/
if (NULL != report_uri) {
rml_uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&rml_uri);
if (0 == strcmp(report_uri, "-")) {
/* if '-', then output to stdout */
printf("%s\n", rml_uri);

@ -45,6 +45,7 @@
#include "opal/util/opal_environ.h"
#include "opal/dss/dss.h"
#include "opal/mca/base/base.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/runtime/opal.h"
#include "opal/mca/event/event.h"
@ -211,6 +212,7 @@ main(int argc, char *argv[])
orte_vpid_t vstart, vend;
int vint;
char *rtmod;
opal_value_t val;
/***************
* Initialize
@ -421,14 +423,30 @@ main(int argc, char *argv[])
target_hnp = OBJ_NEW(orte_hnp_contact_t);
target_hnp->rml_uri = strdup(hnpuristr);
}
/* set the info in our contact table */
orte_rml.set_contact_info(target_hnp->rml_uri);
/* extract the name */
if (ORTE_SUCCESS != orte_rml_base_parse_uris(target_hnp->rml_uri, &target_hnp->name, NULL)) {
orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, target_hnp->rml_uri);
orte_finalize();
exit(1);
}
/* set the info in our contact table */
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = target_hnp->rml_uri;
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&target_hnp->name, &val))) {
ORTE_ERROR_LOG(ret);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, target_hnp->rml_uri);
orte_finalize();
exit(1);
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* set the route to be direct */
if (ORTE_SUCCESS != orte_routed.update_route(rtmod, &target_hnp->name, &target_hnp->name)) {
orte_show_help("help-orte-top.txt", "orte-top:hnp-uri-bad", true, target_hnp->rml_uri);

@ -28,6 +28,7 @@
#include "opal/util/output.h"
#include "opal/threads/tsd.h"
#include "opal/mca/event/event.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/runtime/opal_progress.h"
#include "opal/dss/dss.h"
@ -110,9 +111,7 @@ static bool tool_connected = false;
int orte_util_comm_connect_tool(char *uri)
{
int rc;
/* set the contact info into the comm hash tables*/
orte_rml.set_contact_info(uri);
opal_value_t val;
/* extract the tool's name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(uri, &tool, NULL))) {
@ -120,6 +119,22 @@ int orte_util_comm_connect_tool(char *uri)
return rc;
}
/* set the contact info into the comm hash tables*/
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = uri;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&tool, &val))) {
ORTE_ERROR_LOG(rc);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
return rc;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* set the route to be direct */
if (ORTE_SUCCESS != (rc = orte_routed.update_route(NULL, &tool, &tool))) {
ORTE_ERROR_LOG(rc);

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -41,8 +41,10 @@
#include "opal/util/os_path.h"
#include "opal/util/output.h"
#include "opal/util/os_dirpath.h"
#include "opal/mca/pmix/pmix.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/oob/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/routed.h"
@ -77,7 +79,7 @@ int orte_write_hnp_contact_file(char *filename)
FILE *fp;
char *my_uri;
my_uri = orte_rml.get_contact_info();
orte_oob_base_get_addr(&my_uri);
if (NULL == my_uri) {
return ORTE_ERROR;
}
@ -104,6 +106,7 @@ int orte_read_hnp_contact_file(char *filename, orte_hnp_contact_t *hnp, bool con
char *hnp_uri, *pidstr;
FILE *fp;
int rc;
opal_value_t val;
fp = fopen(filename, "r");
if (NULL == fp) { /* failed on first read - wait and try again */
@ -133,9 +136,6 @@ int orte_read_hnp_contact_file(char *filename, orte_hnp_contact_t *hnp, bool con
fclose(fp);
if (connect) {
/* set the contact info into the comm hash tables*/
orte_rml.set_contact_info(hnp_uri);
/* extract the HNP's name and store it */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(hnp_uri, &hnp->name, NULL))) {
ORTE_ERROR_LOG(rc);
@ -143,6 +143,23 @@ int orte_read_hnp_contact_file(char *filename, orte_hnp_contact_t *hnp, bool con
return rc;
}
/* set the contact info into the comm hash tables*/
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = hnp_uri;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&hnp->name, &val))) {
ORTE_ERROR_LOG(rc);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
free(hnp_uri);
return rc;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
/* set the route to be direct */
if (ORTE_SUCCESS != (rc = orte_routed.update_route(NULL, &hnp->name, &hnp->name))) {
ORTE_ERROR_LOG(rc);

@ -101,11 +101,10 @@ int orte_util_build_daemon_nidmap(void)
int rc;
struct hostent *h;
orte_node_t *node;
opal_buffer_t buf;
opal_process_name_t proc;
char *uri, *addr;
char *proc_name;
opal_value_t kv;
opal_value_t kv, val;
/* install the entry for the HNP */
proc.jobid = ORTE_PROC_MY_NAME->jobid;
@ -122,7 +121,9 @@ int orte_util_build_daemon_nidmap(void)
OBJ_DESTRUCT(&kv);
/* we must have already built the node pool, so cycle across it */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
for (i=0; i < orte_node_pool->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
continue;
@ -180,20 +181,25 @@ int orte_util_build_daemon_nidmap(void)
"%s orte:util:build:daemon:nidmap node %s daemon %d addr %s uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
node->name, i+1, addr, uri));
/* if this is the HNP, then store it */
/* if this is the HNP, then save it */
if (!ORTE_PROC_IS_HNP && 0 == i) {
orte_process_info.my_hnp_uri = strdup(uri);
}
opal_dss.pack(&buf, &uri, 1, OPAL_STRING);
val.data.string = uri;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&proc, &val))) {
ORTE_ERROR_LOG(rc);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
return rc;
}
free(proc_name);
free(uri);
}
/* load the hash tables */
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&buf);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
return rc;
}