Enable connect_accept between multiple singleton jobs without the presence of an external rendezvous agent (e.g., ompi-server). This also enables connect_accept between processes in more than two jobs regardless of how they were started.
Create an ability to store the contact info for multiple HNPs being used to route between different job families. Modify the dpm orte module to pass the resulting store during the connect_accept procedure so that all jobs involved in the resulting communicator know how to route OOB messages between them. Add a test provided by Philippe that tests this ability. This commit was SVN r23438.
Этот коммит содержится в:
родитель
d9f7947a42
Коммит
248320b91a
@ -206,6 +206,19 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
ompi_proc_pack(proc_list, size, nbuf);
|
||||
}
|
||||
|
||||
/* pack wireup info - this is required so that all involved parties can
|
||||
* discover how to talk to each other. For example, consider the case
|
||||
* where we connect_accept to one independent job (B), and then connect_accept
|
||||
* to another one (C) to wire all three of us together. Job B will not know
|
||||
* how to talk to job C at the OOB level because the two of them didn't
|
||||
* directly connect_accept to each other. Hence, we include the required
|
||||
* wireup info at this first exchange
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(nbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (NULL != cabuf) {
|
||||
OBJ_RELEASE(cabuf);
|
||||
}
|
||||
@ -332,6 +345,15 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
opal_list_t all_procs;
|
||||
orte_namelist_t *name;
|
||||
|
||||
/* we first need to give the wireup info to our routed module.
|
||||
* Not every routed module will need it, but some do require
|
||||
* this info before we can do any comm
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(rprocs[0]->proc_name.jobid, nrbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&all_procs, opal_list_t);
|
||||
|
||||
if (send_first) {
|
||||
@ -376,11 +398,19 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept executing modex",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (OMPI_SUCCESS != (rc = orte_grpcomm.modex(&all_procs))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept modex complete",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/*
|
||||
while (NULL != (item = opal_list_remove_first(&all_procs))) {
|
||||
OBJ_RELEASE(item);
|
||||
@ -388,7 +418,15 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
OBJ_DESTRUCT(&all_procs);
|
||||
*/
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept adding procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
MCA_PML_CALL(add_procs(new_proc_list, new_proc_len));
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept new procs added",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
OBJ_RELEASE(nrbuf);
|
||||
@ -396,6 +434,10 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
OBJ_RELEASE(nbuf);
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept allocating group size %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), rsize));
|
||||
|
||||
new_group_pointer=ompi_group_allocate(rsize);
|
||||
if( NULL == new_group_pointer ) {
|
||||
rc = OMPI_ERR_OUT_OF_RESOURCE;
|
||||
@ -410,6 +452,10 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
/* increment proc reference counters */
|
||||
ompi_group_increment_proc_count(new_group_pointer);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept setting up communicator",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* set up communicator structure */
|
||||
rc = ompi_comm_set ( &newcomp, /* new comm */
|
||||
comm, /* old comm */
|
||||
@ -432,6 +478,10 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
OBJ_RELEASE(new_group_pointer);
|
||||
new_group_pointer = MPI_GROUP_NULL;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept allocate comm_cid",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* allocate comm_cid */
|
||||
rc = ompi_comm_nextcid ( newcomp, /* new communicator */
|
||||
comm, /* old communicator */
|
||||
@ -444,6 +494,10 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
goto exit;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept activate comm",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* activate comm and init coll-component */
|
||||
rc = ompi_comm_activate ( &newcomp, /* new communicator */
|
||||
comm, /* old communicator */
|
||||
@ -480,6 +534,10 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
}
|
||||
|
||||
*newcomm = newcomp;
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm:orte:connect_accept complete",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -869,7 +927,6 @@ cleanup:
|
||||
static int route_to_port(char *rml_uri, orte_process_name_t *rproc)
|
||||
{
|
||||
opal_buffer_t route;
|
||||
orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
|
||||
int rc;
|
||||
|
||||
/* We need to ask the routed module to init_routes so it can do the
|
||||
@ -880,7 +937,6 @@ static int route_to_port(char *rml_uri, orte_process_name_t *rproc)
|
||||
*/
|
||||
/* pack a cmd so the buffer can be unpacked correctly */
|
||||
OBJ_CONSTRUCT(&route, opal_buffer_t);
|
||||
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
|
||||
|
||||
/* pack the provided uri */
|
||||
opal_dss.pack(&route, &rml_uri, 1, OPAL_STRING);
|
||||
|
@ -75,13 +75,11 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
||||
orte_vpid_t num_procs;
|
||||
char *rml_uri;
|
||||
orte_process_name_t name;
|
||||
bool got_name;
|
||||
int rc;
|
||||
|
||||
/* unpack the data for each entry */
|
||||
num_procs = 0;
|
||||
name.jobid = ORTE_JOBID_INVALID;
|
||||
got_name = false;
|
||||
cnt = 1;
|
||||
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
|
||||
|
||||
@ -97,24 +95,23 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
||||
free(rml_uri);
|
||||
return(rc);
|
||||
}
|
||||
if (!got_name) {
|
||||
/* we only get an update from a single jobid - the command
|
||||
* that creates these doesn't cross jobid boundaries - so
|
||||
* record it here
|
||||
*/
|
||||
/* update the routing table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(rml_uri);
|
||||
return rc;
|
||||
}
|
||||
got_name = true;
|
||||
/* if this is for a different job family, update the route to this proc */
|
||||
/* if this is for a different job family */
|
||||
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
if (!orte_routed.route_is_defined(&name)) {
|
||||
/* update the route to this proc */
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&name, &name))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(rml_uri);
|
||||
return rc;
|
||||
}
|
||||
/* and store the uri */
|
||||
opal_dss.pack(&orte_remote_hnps, &rml_uri, 1, OPAL_STRING);
|
||||
}
|
||||
}
|
||||
free(rml_uri);
|
||||
|
@ -16,9 +16,13 @@
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/class/opal_bitmap.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/mca/base/mca_base_component_repository.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/mca/routed/base/base.h"
|
||||
|
||||
@ -52,6 +56,9 @@ static void destruct(orte_routed_tree_t *rt)
|
||||
OBJ_CLASS_INSTANCE(orte_routed_tree_t, opal_list_item_t,
|
||||
construct, destruct);
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_routed_jobfam_t, opal_object_t,
|
||||
NULL, NULL);
|
||||
|
||||
int orte_routed_base_output = -1;
|
||||
orte_routed_module_t orte_routed = {0};
|
||||
opal_list_t orte_routed_base_components;
|
||||
@ -83,6 +90,11 @@ orte_routed_base_open(void)
|
||||
/* Initialize globals */
|
||||
OBJ_CONSTRUCT(&orte_routed_base_components, opal_list_t);
|
||||
|
||||
/* Initialize storage of remote hnp uris */
|
||||
OBJ_CONSTRUCT(&orte_remote_hnps, opal_buffer_t);
|
||||
/* prime it with our HNP uri */
|
||||
opal_dss.pack(&orte_remote_hnps, &orte_process_info.my_hnp_uri, 1, OPAL_STRING);
|
||||
|
||||
/* Open up all available components */
|
||||
ret = mca_base_components_open("routed",
|
||||
orte_routed_base_output,
|
||||
@ -144,6 +156,8 @@ orte_routed_base_close(void)
|
||||
orte_routed.finalize();
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&orte_remote_hnps);
|
||||
|
||||
/* shutdown any remaining opened components */
|
||||
if (component_open_called) {
|
||||
mca_base_components_close(orte_routed_base_output,
|
||||
|
@ -11,9 +11,11 @@
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/class/opal_bitmap.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
#include "opal/util/output.h"
|
||||
@ -74,7 +76,7 @@ orte_routed_module_t orte_routed_binomial_module = {
|
||||
};
|
||||
|
||||
/* local globals */
|
||||
static opal_hash_table_t jobfam_list;
|
||||
static opal_pointer_array_t jobfams;
|
||||
static opal_condition_t cond;
|
||||
static opal_mutex_t lock;
|
||||
static orte_process_name_t *lifeline=NULL;
|
||||
@ -87,8 +89,8 @@ static bool ack_recvd;
|
||||
|
||||
static int init(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&jobfam_list, opal_hash_table_t);
|
||||
opal_hash_table_init(&jobfam_list, 128);
|
||||
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
|
||||
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
|
||||
|
||||
/* setup the global condition and lock */
|
||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||
@ -106,8 +108,9 @@ static int init(void)
|
||||
|
||||
static int finalize(void)
|
||||
{
|
||||
int rc;
|
||||
int rc, i;
|
||||
opal_list_item_t *item;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
|
||||
/* if I am an application process, indicate that I am
|
||||
* truly finalizing prior to departure
|
||||
@ -121,7 +124,13 @@ static int finalize(void)
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&jobfam_list);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
OBJ_RELEASE(jfam);
|
||||
}
|
||||
}
|
||||
OBJ_DESTRUCT(&jobfams);
|
||||
|
||||
/* destruct the global condition and lock */
|
||||
OBJ_DESTRUCT(&cond);
|
||||
OBJ_DESTRUCT(&lock);
|
||||
@ -140,8 +149,9 @@ static int finalize(void)
|
||||
|
||||
static int delete_route(orte_process_name_t *proc)
|
||||
{
|
||||
int rc;
|
||||
orte_process_name_t *route_copy;
|
||||
int i;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
|
||||
if (proc->jobid == ORTE_JOBID_INVALID ||
|
||||
proc->vpid == ORTE_VPID_INVALID) {
|
||||
@ -176,23 +186,22 @@ static int delete_route(orte_process_name_t *proc)
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* see if this proc is present - it will have a wildcard vpid,
|
||||
* so we have to look for it with that condition
|
||||
*/
|
||||
rc = opal_hash_table_get_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(proc->jobid),
|
||||
(void**)&route_copy);
|
||||
if (ORTE_SUCCESS == rc && NULL != route_copy) {
|
||||
/* proc is present - remove the data */
|
||||
free(route_copy);
|
||||
rc = opal_hash_table_remove_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(proc->jobid));
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
/* see if this job family is present */
|
||||
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: deleting route to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
||||
OBJ_RELEASE(jfam);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* not present - nothing to do */
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -208,8 +217,9 @@ static int delete_route(orte_process_name_t *proc)
|
||||
static int update_route(orte_process_name_t *target,
|
||||
orte_process_name_t *route)
|
||||
{
|
||||
int rc;
|
||||
orte_process_name_t *route_copy;
|
||||
int i;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
|
||||
if (target->jobid == ORTE_JOBID_INVALID ||
|
||||
target->vpid == ORTE_VPID_INVALID) {
|
||||
@ -256,34 +266,35 @@ static int update_route(orte_process_name_t *target,
|
||||
ORTE_JOBID_PRINT(target->jobid),
|
||||
ORTE_NAME_PRINT(route)));
|
||||
|
||||
/* see if this target is already present - it will have a wildcard vpid,
|
||||
* so we have to look for it with that condition
|
||||
*/
|
||||
rc = opal_hash_table_get_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(target->jobid),
|
||||
(void**)&route_copy);
|
||||
if (ORTE_SUCCESS == rc && NULL != route_copy) {
|
||||
/* target already present - update the route info
|
||||
* in case it has changed
|
||||
*/
|
||||
*route_copy = *route;
|
||||
rc = opal_hash_table_set_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(target->jobid), route_copy);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
/* see if this target is already present */
|
||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: updating route to %s via %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(target->jobid),
|
||||
ORTE_NAME_PRINT(route)));
|
||||
jfam->route.jobid = route->jobid;
|
||||
jfam->route.vpid = route->vpid;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* not there, so add the route FOR THE JOB FAMILY*/
|
||||
route_copy = (orte_process_name_t *) malloc(sizeof(orte_process_name_t));
|
||||
*route_copy = *route;
|
||||
rc = opal_hash_table_set_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(target->jobid), route_copy);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: adding route to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(target->jobid)));
|
||||
jfam = OBJ_NEW(orte_routed_jobfam_t);
|
||||
jfam->job_family = jfamily;
|
||||
jfam->route.jobid = route->jobid;
|
||||
jfam->route.vpid = route->vpid;
|
||||
opal_pointer_array_add(&jobfams, jfam);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* THIS CAME FROM OUR OWN JOB FAMILY... */
|
||||
@ -299,7 +310,9 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
||||
orte_process_name_t *ret, daemon;
|
||||
opal_list_item_t *item;
|
||||
orte_routed_tree_t *child;
|
||||
int rc;
|
||||
int i;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
|
||||
if (target->jobid == ORTE_JOBID_INVALID ||
|
||||
target->vpid == ORTE_VPID_INVALID) {
|
||||
@ -346,12 +359,20 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
||||
/* if I am the HNP or a tool, then I stored a route to
|
||||
* this job family, so look it up
|
||||
*/
|
||||
rc = opal_hash_table_get_value_uint32(&jobfam_list,
|
||||
ORTE_JOB_FAMILY(target->jobid), (void**)&ret);
|
||||
if (ORTE_SUCCESS == rc) {
|
||||
/* got a good result - return it */
|
||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: route to %s found",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(target->jobid)));
|
||||
ret = &jfam->route;
|
||||
goto found;
|
||||
}
|
||||
}
|
||||
/* not found - so we have no route */
|
||||
ret = ORTE_NAME_INVALID;
|
||||
goto found;
|
||||
@ -569,7 +590,12 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
* the server - we need to pass the routing info to our HNP
|
||||
*/
|
||||
if (NULL != ndat) {
|
||||
int rc;
|
||||
int rc, n;
|
||||
opal_buffer_t xfer;
|
||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||
ptrdiff_t unpack_rel;
|
||||
bool found;
|
||||
char *uri, *hnps;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||
"%s routed_binomial: init routes w/non-NULL data",
|
||||
@ -590,15 +616,59 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
* start by sending the contact info to the HNP for update
|
||||
*/
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||
"%s routed_binomial_init_routes: diff job family - sending update to %s",
|
||||
"%s routed_binomial_init_routes: diff job family %s - sending update to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(job),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
|
||||
/* prep the buffer for transmission to the HNP */
|
||||
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
|
||||
opal_dss.pack(&xfer, &cmd, 1, ORTE_RML_CMD);
|
||||
opal_dss.copy_payload(&xfer, ndat);
|
||||
|
||||
/* save any new connections for use in subsequent connect_accept calls */
|
||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
||||
found = false;
|
||||
n = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
||||
/* check if we already have the incoming one */
|
||||
if (0 == strcmp(uri, hnps)) {
|
||||
found = true;
|
||||
free(hnps);
|
||||
break;
|
||||
}
|
||||
free(hnps);
|
||||
}
|
||||
if (!found) {
|
||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
||||
}
|
||||
free(uri);
|
||||
found = false;
|
||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
||||
}
|
||||
|
||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
||||
opal_buffer_t dng;
|
||||
char *dmn;
|
||||
int grr;
|
||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
||||
grr = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
||||
free(dmn);
|
||||
}
|
||||
OBJ_DESTRUCT(&dng);
|
||||
}
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
@ -702,12 +772,35 @@ static int route_lost(const orte_process_name_t *route)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
orte_routed_tree_t *child;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
int i;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s route to %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(route)));
|
||||
|
||||
/* if the route is to a different job family and we are the HNP, look it up */
|
||||
if ((ORTE_JOB_FAMILY(route->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) &&
|
||||
ORTE_PROC_IS_HNP) {
|
||||
jfamily = ORTE_JOB_FAMILY(route->jobid);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: route to %s lost",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(route->jobid)));
|
||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
||||
OBJ_RELEASE(jfam);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* if we lose the connection to the lifeline and we are NOT already,
|
||||
* in finalize, tell the OOB to abort.
|
||||
* NOTE: we cannot call abort from here as the OOB needs to first
|
||||
@ -746,6 +839,34 @@ static int route_lost(const orte_process_name_t *route)
|
||||
|
||||
static bool route_is_defined(const orte_process_name_t *target)
|
||||
{
|
||||
int i;
|
||||
orte_routed_jobfam_t *jfam;
|
||||
uint16_t jfamily;
|
||||
|
||||
/* if the route is to a different job family and we are the HNP, look it up */
|
||||
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||
for (i=0; i < jobfams.size; i++) {
|
||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
||||
continue;
|
||||
}
|
||||
if (jfam->job_family == jfamily) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
|
||||
"%s routed_binomial: route to %s is defined",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOB_FAMILY_PRINT(target->jobid)));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
/* if we are not the HNP, then the answer is always true as
|
||||
* we send it via the HNP
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
|
||||
/* find out what daemon hosts this proc */
|
||||
if (ORTE_VPID_INVALID == orte_ess.proc_get_daemon((orte_process_name_t*)target)) {
|
||||
return false;
|
||||
@ -907,14 +1028,7 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* if I am anything other than the HNP, this
|
||||
* is a meaningless command as I cannot get
|
||||
* the requested info
|
||||
*/
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are not using static ports, then we need to share the
|
||||
* comm info - otherwise, just return
|
||||
*/
|
||||
@ -924,7 +1038,18 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if I am an application, this is occurring during connect_accept.
|
||||
* We need to return the stored information of other HNPs we
|
||||
* know about, if any
|
||||
*/
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,8 @@
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
@ -597,7 +599,12 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
* the server - we need to pass the routing info to our HNP
|
||||
*/
|
||||
if (NULL != ndat) {
|
||||
int rc;
|
||||
int rc, n;
|
||||
opal_buffer_t xfer;
|
||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||
ptrdiff_t unpack_rel;
|
||||
bool found;
|
||||
char *uri, *hnps;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||
"%s routed_cm: init routes w/non-NULL data",
|
||||
@ -622,11 +629,54 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
|
||||
/* prep the buffer for transmission to the HNP */
|
||||
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
|
||||
opal_dss.pack(&xfer, &cmd, 1, ORTE_RML_CMD);
|
||||
opal_dss.copy_payload(&xfer, ndat);
|
||||
|
||||
/* save any new connections for use in subsequent connect_accept calls */
|
||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
||||
found = false;
|
||||
n = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
||||
/* check if we already have the incoming one */
|
||||
if (0 == strcmp(uri, hnps)) {
|
||||
found = true;
|
||||
free(hnps);
|
||||
break;
|
||||
}
|
||||
free(hnps);
|
||||
}
|
||||
if (!found) {
|
||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
||||
}
|
||||
free(uri);
|
||||
found = false;
|
||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
||||
}
|
||||
|
||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
||||
opal_buffer_t dng;
|
||||
char *dmn;
|
||||
int grr;
|
||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
||||
grr = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
||||
free(dmn);
|
||||
}
|
||||
OBJ_DESTRUCT(&dng);
|
||||
}
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
@ -854,14 +904,7 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* if I am anything other than the HNP, this
|
||||
* is a meaningless command as I cannot get
|
||||
* the requested info
|
||||
*/
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are not using static ports, then we need to share the
|
||||
* comm info - otherwise, just return
|
||||
*/
|
||||
@ -871,7 +914,18 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if I am an application, this is occurring during connect_accept.
|
||||
* We need to return the stored information of other HNPs we
|
||||
* know about, if any
|
||||
*/
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,8 @@
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/class/opal_bitmap.h"
|
||||
@ -526,7 +528,12 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
* the server - we need to pass the routing info to our HNP
|
||||
*/
|
||||
if (NULL != ndat) {
|
||||
int rc;
|
||||
int rc, n;
|
||||
opal_buffer_t xfer;
|
||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||
ptrdiff_t unpack_rel;
|
||||
bool found;
|
||||
char *uri, *hnps;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||
"%s routed_linear: init routes w/non-NULL data",
|
||||
@ -551,11 +558,54 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
|
||||
/* prep the buffer for transmission to the HNP */
|
||||
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
|
||||
opal_dss.pack(&xfer, &cmd, 1, ORTE_RML_CMD);
|
||||
opal_dss.copy_payload(&xfer, ndat);
|
||||
|
||||
/* save any new connections for use in subsequent connect_accept calls */
|
||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
||||
found = false;
|
||||
n = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
||||
/* check if we already have the incoming one */
|
||||
if (0 == strcmp(uri, hnps)) {
|
||||
found = true;
|
||||
free(hnps);
|
||||
break;
|
||||
}
|
||||
free(hnps);
|
||||
}
|
||||
if (!found) {
|
||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
||||
}
|
||||
free(uri);
|
||||
found = false;
|
||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
||||
}
|
||||
|
||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
||||
opal_buffer_t dng;
|
||||
char *dmn;
|
||||
int grr;
|
||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
||||
grr = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
||||
free(dmn);
|
||||
}
|
||||
OBJ_DESTRUCT(&dng);
|
||||
}
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
@ -755,14 +805,7 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* if I am anything other than the HNP, this
|
||||
* is a meaningless command as I cannot get
|
||||
* the requested info
|
||||
*/
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are not using static ports, then we need to share the
|
||||
* comm info - otherwise, just return
|
||||
*/
|
||||
@ -772,7 +815,18 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if I am an application, this is occurring during connect_accept.
|
||||
* We need to return the stored information of other HNPs we
|
||||
* know about, if any
|
||||
*/
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,8 @@
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
@ -558,7 +560,12 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
* the server - we need to pass the routing info to our HNP
|
||||
*/
|
||||
if (NULL != ndat) {
|
||||
int rc;
|
||||
int rc, n;
|
||||
opal_buffer_t xfer;
|
||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||
ptrdiff_t unpack_rel;
|
||||
bool found;
|
||||
char *uri, *hnps;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||
"%s routed_radix: init routes w/non-NULL data",
|
||||
@ -583,11 +590,54 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, ndat,
|
||||
/* prep the buffer for transmission to the HNP */
|
||||
OBJ_CONSTRUCT(&xfer, opal_buffer_t);
|
||||
opal_dss.pack(&xfer, &cmd, 1, ORTE_RML_CMD);
|
||||
opal_dss.copy_payload(&xfer, ndat);
|
||||
|
||||
/* save any new connections for use in subsequent connect_accept calls */
|
||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
||||
found = false;
|
||||
n = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
||||
/* check if we already have the incoming one */
|
||||
if (0 == strcmp(uri, hnps)) {
|
||||
found = true;
|
||||
free(hnps);
|
||||
break;
|
||||
}
|
||||
free(hnps);
|
||||
}
|
||||
if (!found) {
|
||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
||||
}
|
||||
free(uri);
|
||||
found = false;
|
||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
||||
}
|
||||
|
||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
||||
opal_buffer_t dng;
|
||||
char *dmn;
|
||||
int grr;
|
||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
||||
grr = 1;
|
||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
||||
free(dmn);
|
||||
}
|
||||
OBJ_DESTRUCT(&dng);
|
||||
}
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&xfer);
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
@ -894,14 +944,7 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* if I am anything other than the HNP, this
|
||||
* is a meaningless command as I cannot get
|
||||
* the requested info
|
||||
*/
|
||||
if (!ORTE_PROC_IS_HNP) {
|
||||
return ORTE_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
if (ORTE_PROC_IS_HNP) {
|
||||
/* if we are not using static ports, then we need to share the
|
||||
* comm info - otherwise, just return
|
||||
*/
|
||||
@ -911,7 +954,18 @@ static int get_wireup_info(opal_buffer_t *buf)
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if I am an application, this is occurring during connect_accept.
|
||||
* We need to return the stored information of other HNPs we
|
||||
* know about, if any
|
||||
*/
|
||||
if (ORTE_PROC_IS_APP) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ typedef bool (*orte_routed_module_route_is_defined_fn_t)(const orte_process_name
|
||||
* Get wireup data for daemons
|
||||
*
|
||||
* Add whatever routing data
|
||||
* this module requires to allow inter-process messaging. Only callable by HNP.
|
||||
* this module requires to allow inter-process messaging.
|
||||
*/
|
||||
typedef int (*orte_routed_module_get_wireup_info_fn_t)(opal_buffer_t *buf);
|
||||
|
||||
|
@ -38,6 +38,14 @@ typedef struct {
|
||||
} orte_routed_tree_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_tree_t);
|
||||
|
||||
/* struct for tracking external routes */
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
uint16_t job_family;
|
||||
orte_process_name_t route;
|
||||
} orte_routed_jobfam_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_jobfam_t);
|
||||
|
||||
#endif
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -185,6 +185,9 @@ orte_default_comm_fn_t orte_comm;
|
||||
bool orte_report_child_jobs_separately;
|
||||
struct timeval orte_child_time_to_exit;
|
||||
|
||||
/* record uri's of remote hnps */
|
||||
opal_buffer_t orte_remote_hnps;
|
||||
|
||||
#endif /* !ORTE_DISABLE_FULL_RTE */
|
||||
|
||||
int orte_debug_output = -1;
|
||||
|
@ -708,6 +708,9 @@ ORTE_DECLSPEC int orte_global_comm(orte_process_name_t *recipient,
|
||||
ORTE_DECLSPEC extern bool orte_report_child_jobs_separately;
|
||||
ORTE_DECLSPEC extern struct timeval orte_child_time_to_exit;
|
||||
|
||||
/* record uri's of remote hnps */
|
||||
ORTE_DECLSPEC extern opal_buffer_t orte_remote_hnps;
|
||||
|
||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -1,4 +1,4 @@
|
||||
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave_spawn slave cell_spawn reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster hello++ hellof90 early_abort debugger
|
||||
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave_spawn slave cell_spawn reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster hello++ hellof90 early_abort debugger singleton_client_server
|
||||
|
||||
all: $(PROGS)
|
||||
|
||||
|
@ -53,4 +53,5 @@ EXTRA_DIST += \
|
||||
test/mpi/slave.c \
|
||||
test/mpi/spawn_multiple.c \
|
||||
test/mpi/ziatest.c \
|
||||
test/mpi/ziaprobe.c
|
||||
test/mpi/ziaprobe.c \
|
||||
test/mpi/singleton_client_server.c
|
||||
|
213
orte/test/mpi/singleton_client_server.c
Обычный файл
213
orte/test/mpi/singleton_client_server.c
Обычный файл
@ -0,0 +1,213 @@
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include <mpi.h>
|
||||
|
||||
/*
|
||||
|
||||
LOGIC:
|
||||
|
||||
- the 'server' opens a port and write the info to a file
|
||||
- the 'clients' open the file and connect to the port
|
||||
- after each accept, the server and client do a merge to
|
||||
convert the intercomm to an intracomm
|
||||
|
||||
DETAIL STEPS:
|
||||
|
||||
- server open port
|
||||
- server does accept
|
||||
- client #1 does connect
|
||||
- server and client #1 do merge
|
||||
- server does accept
|
||||
- client #2 does connect
|
||||
- server, client #1 and client #2 do merge
|
||||
- server does accept
|
||||
- client #3 does connect
|
||||
- server, client #1, client #2 and client #3 do merge
|
||||
|
||||
*/
|
||||
|
||||
#define TAG 0
|
||||
|
||||
#define CHK(code) do \
|
||||
{ \
|
||||
int retval = code ; \
|
||||
if (retval != MPI_SUCCESS) \
|
||||
{ \
|
||||
fprintf(stderr, "Error: " #code "\n") ; \
|
||||
exit(1) ; \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
char hostname[255] ;
|
||||
char buff[255] ;
|
||||
|
||||
int role ;
|
||||
int num_clients ;
|
||||
int size, rank ;
|
||||
|
||||
FILE *fp ;
|
||||
char server_port_name[MPI_MAX_PORT_NAME] ;
|
||||
|
||||
MPI_Comm intercomm, intracomm ;
|
||||
MPI_Status status ;
|
||||
int msg_count ;
|
||||
int i ;
|
||||
|
||||
/* sanity check the args */
|
||||
if(argc != 3)
|
||||
{
|
||||
fprintf(stderr, "usage %s <num clients> <1:server | 0:client>\n", argv[0]) ;
|
||||
exit(1) ;
|
||||
}
|
||||
|
||||
num_clients = atoi(argv[1]) ;
|
||||
role = atoi(argv[2]) ;
|
||||
|
||||
if (num_clients <= 0 || (role != 0 && role != 1))
|
||||
{
|
||||
fprintf(stderr, "usage %s <num clients> <1:server | 0:client>\n", argv[0]) ;
|
||||
exit(1) ;
|
||||
}
|
||||
|
||||
/* initialize MPI */
|
||||
CHK(MPI_Init(&argc, &argv)) ;
|
||||
|
||||
/* get the node name */
|
||||
{
|
||||
int retval = gethostname(hostname, 255) ;
|
||||
if(retval == -1)
|
||||
{
|
||||
fprintf(stderr, "gethostname failed: %s\n", strerror(errno)) ;
|
||||
exit(1) ;
|
||||
}
|
||||
}
|
||||
|
||||
/* server */
|
||||
if(role == 1)
|
||||
{
|
||||
printf("SERVER: on node '%s'\n", hostname) ;
|
||||
|
||||
/* open port to establish connections */
|
||||
CHK(MPI_Open_port(MPI_INFO_NULL, server_port_name)) ;
|
||||
|
||||
printf("SERVER: opened port=%s\n", server_port_name) ;
|
||||
|
||||
/* store the port name */
|
||||
fp = fopen("server_port_name.txt", "w") ;
|
||||
if(fp == NULL)
|
||||
{
|
||||
fprintf(stderr, "fopen failed: %s\n", strerror(errno)) ;
|
||||
exit(1) ;
|
||||
}
|
||||
fprintf(fp, "%s", server_port_name) ;
|
||||
fclose(fp) ;
|
||||
|
||||
/* the server accepts connections from all the clients */
|
||||
for(i = 0 ; i < num_clients ; i++ )
|
||||
{
|
||||
/* accept connections at this port */
|
||||
CHK(MPI_Comm_accept(server_port_name, MPI_INFO_NULL, 0,
|
||||
i == 0 ? MPI_COMM_WORLD : intracomm,
|
||||
&intercomm)) ;
|
||||
|
||||
printf("SERVER: accepted connection from client %d\n", i+1) ;
|
||||
|
||||
/* merge, to form one intra communicator */
|
||||
CHK(MPI_Intercomm_merge(intercomm, 0, &intracomm)) ;
|
||||
|
||||
printf("SERVER: merged with client %d\n", i+1) ;
|
||||
|
||||
CHK(MPI_Comm_size(intracomm, &size)) ;
|
||||
CHK(MPI_Comm_rank(intracomm, &rank)) ;
|
||||
|
||||
printf("SERVER: after merging with client %d: size=%d rank=%d\n", i+1, size, rank) ;
|
||||
}
|
||||
} /* end server */
|
||||
|
||||
/* client */
|
||||
if(role == 0)
|
||||
{
|
||||
printf("CLIENT: on node '%s'\n", hostname) ;
|
||||
|
||||
fp = fopen("server_port_name.txt", "r") ;
|
||||
if(fp == NULL)
|
||||
{
|
||||
fprintf(stderr, "fopen failed: %s\n", strerror(errno)) ;
|
||||
exit(1) ;
|
||||
}
|
||||
fscanf(fp, "%s", server_port_name) ;
|
||||
fclose(fp) ;
|
||||
|
||||
printf("CLIENT: attempting to connect to server on port=%s\n", server_port_name) ;
|
||||
|
||||
/* connect to the server */
|
||||
CHK(MPI_Comm_connect (server_port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &intercomm)) ;
|
||||
|
||||
printf("CLIENT: connected to server on port\n") ;
|
||||
|
||||
/* merge the server and client to one intra communicator */
|
||||
CHK(MPI_Intercomm_merge(intercomm, 1, &intracomm)) ;
|
||||
|
||||
printf("CLIENT: merged with existing intracomm\n") ;
|
||||
|
||||
CHK(MPI_Comm_size(intracomm, &size)) ;
|
||||
CHK(MPI_Comm_rank(intracomm, &rank)) ;
|
||||
|
||||
printf("CLIENT: after merging, new comm: size=%d rank=%d\n", size, rank) ;
|
||||
|
||||
for (i = rank ; i < num_clients ; i++)
|
||||
{
|
||||
/* client performs a collective accept */
|
||||
CHK(MPI_Comm_accept(server_port_name, MPI_INFO_NULL, 0, intracomm, &intercomm)) ;
|
||||
|
||||
printf("CLIENT: connected to server on port\n") ;
|
||||
|
||||
/* merge the two intra comms back to one communicator */
|
||||
CHK(MPI_Intercomm_merge(intercomm, 0, &intracomm)) ;
|
||||
|
||||
printf("CLIENT: merged with existing members\n") ;
|
||||
|
||||
CHK(MPI_Comm_size(intracomm, &size)) ;
|
||||
CHK(MPI_Comm_rank(intracomm, &rank)) ;
|
||||
|
||||
printf("CLIENT: new size after merging with existing members: size=%d rank=%d\n", size, rank) ;
|
||||
}
|
||||
|
||||
} /* end client */
|
||||
|
||||
CHK(MPI_Comm_size(intracomm, &size)) ;
|
||||
CHK(MPI_Comm_rank(intracomm, &rank)) ;
|
||||
|
||||
printf("After fusion: size=%d rank=%d\n", size, rank) ;
|
||||
|
||||
if(rank == 0)
|
||||
{
|
||||
msg_count = num_clients ;
|
||||
|
||||
while(msg_count)
|
||||
{
|
||||
CHK(MPI_Recv(buff, 255, MPI_CHAR, MPI_ANY_SOURCE,
|
||||
MPI_ANY_TAG, intracomm, &status)) ;
|
||||
|
||||
printf("Received hello msg from '%s'\n", buff) ;
|
||||
msg_count-- ;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* all ranks > 0 */
|
||||
|
||||
CHK(MPI_Send(hostname, strlen(hostname) + 1, MPI_CHAR, 0, TAG, intracomm)) ;
|
||||
}
|
||||
|
||||
CHK(MPI_Finalize()) ;
|
||||
|
||||
fprintf(stderr, "Rank %d is exiting\n", rank);
|
||||
return 0 ;
|
||||
}
|
Загрузка…
Ссылка в новой задаче
Block a user