1
1
openmpi/orte/mca/rml/base/rml_base_contact.c
Ralph Castain bd8b4f7f1e Sorry for mid-day commit, but I had promised on the call to do this upon my return.
Roll in the ORTE state machine. Remove last traces of opal_sos. Remove UTK epoch code.

Please see the various emails about the state machine change for details. I'll send something out later with more info on the new arch.

This commit was SVN r26242.
2012-04-06 14:23:13 +00:00

184 строки
5.8 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/rml/base/base.h"
int orte_rml_base_get_contact_info(orte_jobid_t job, opal_buffer_t *data)
{
int i;
orte_job_t *jdata;
orte_proc_t *proc;
int rc;
/* lookup the job */
if (NULL == (jdata = orte_get_job_data_object(job))) {
/* bad jobid */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* cycle through all procs in the job, adding their contact info to the buffer */
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
/* if this proc doesn't have any contact info, ignore it */
if (NULL == proc->rml_uri) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &proc->rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}
int orte_rml_base_update_contact_info(opal_buffer_t* data)
{
orte_std_cntr_t cnt;
orte_vpid_t num_procs;
char *rml_uri;
orte_process_name_t name;
bool got_name;
int rc;
/* unpack the data for each entry */
num_procs = 0;
name.jobid = ORTE_JOBID_INVALID;
got_name = false;
cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_output,
"%s rml:base:update:contact:info got uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
NULL == rml_uri ? "NULL" : rml_uri));
if (NULL != rml_uri) {
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return(rc);
}
if (!got_name) {
/* we only get an update from a single jobid - the command
* that creates these doesn't cross jobid boundaries - so
* record it here
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return rc;
}
got_name = true;
/* if this is for a different job family, update the route to this proc */
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&name, &name))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return rc;
}
}
}
free(rml_uri);
}
/* track how many procs were in the message */
++num_procs;
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are a daemon and this was info about our jobid, this update would
* include updated contact info
* for all daemons in the system - indicating that the number of daemons
* changed since we were initially launched. Thus, update the num_procs
* in our process_info struct so we can correctly route any messages
*/
if (ORTE_PROC_MY_NAME->jobid == name.jobid &&
ORTE_PROC_IS_DAEMON &&
orte_process_info.num_procs < num_procs) {
orte_process_info.num_procs = num_procs;
if (orte_process_info.max_procs < orte_process_info.num_procs) {
orte_process_info.max_procs = orte_process_info.num_procs;
}
/* if we changed it, then we better update the routing
* plan so daemon collectives work correctly
*/
orte_routed.update_routing_plan();
}
return ORTE_SUCCESS;
}
int
orte_rml_base_parse_uris(const char* uri,
orte_process_name_t* peer,
char*** uris)
{
int rc;
/* parse the process name */
char* cinfo = strdup(uri);
char* ptr = strchr(cinfo, ';');
if(NULL == ptr) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(cinfo);
return ORTE_ERR_BAD_PARAM;
}
*ptr = '\0';
ptr++;
if (ORTE_SUCCESS != (rc = orte_util_convert_string_to_process_name(peer, cinfo))) {
ORTE_ERROR_LOG(rc);
free(cinfo);
return rc;
}
if (NULL != uris) {
/* parse the remainder of the string into an array of uris */
*uris = opal_argv_split(ptr, ';');
}
free(cinfo);
return ORTE_SUCCESS;
}