1
1
openmpi/orte/mca/rml/base/rml_base_contact.c
2015-06-23 20:59:57 -07:00

180 строки
5.6 KiB
C

/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/rml/base/base.h"
int orte_rml_base_get_contact_info(orte_jobid_t job, opal_buffer_t *data)
{
int i;
orte_job_t *jdata;
orte_proc_t *proc;
int rc;
/* lookup the job */
if (NULL == (jdata = orte_get_job_data_object(job))) {
/* bad jobid */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* cycle through all procs in the job, adding their contact info to the buffer */
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
/* if this proc doesn't have any contact info, ignore it */
if (NULL == proc->rml_uri) {
continue;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &proc->rml_uri, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
return ORTE_SUCCESS;
}
int orte_rml_base_update_contact_info(opal_buffer_t* data)
{
orte_std_cntr_t cnt;
orte_vpid_t num_procs;
char *rml_uri;
orte_process_name_t name;
bool got_name;
int rc;
/* unpack the data for each entry */
num_procs = 0;
name.jobid = ORTE_JOBID_INVALID;
got_name = false;
cnt = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
"%s rml:base:update:contact:info got uri %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
NULL == rml_uri ? "NULL" : rml_uri));
if (NULL != rml_uri) {
/* set the contact info into the hash table */
orte_rml.set_contact_info(rml_uri);
if (!got_name) {
/* we only get an update from a single jobid - the command
* that creates these doesn't cross jobid boundaries - so
* record it here
*/
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return rc;
}
got_name = true;
/* if this is for a different job family, update the route to this proc */
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&name, &name))) {
ORTE_ERROR_LOG(rc);
free(rml_uri);
return rc;
}
}
}
free(rml_uri);
}
/* track how many procs were in the message */
++num_procs;
}
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are a daemon and this was info about our jobid, this update would
* include updated contact info
* for all daemons in the system - indicating that the number of daemons
* changed since we were initially launched. Thus, update the num_procs
* in our process_info struct so we can correctly route any messages
*/
if (ORTE_PROC_MY_NAME->jobid == name.jobid &&
ORTE_PROC_IS_DAEMON &&
orte_process_info.num_procs < num_procs) {
orte_process_info.num_procs = num_procs;
if (orte_process_info.max_procs < orte_process_info.num_procs) {
orte_process_info.max_procs = orte_process_info.num_procs;
}
/* if we changed it, then we better update the routing
* plan so daemon collectives work correctly
*/
orte_routed.update_routing_plan();
}
return ORTE_SUCCESS;
}
int
orte_rml_base_parse_uris(const char* uri,
orte_process_name_t* peer,
char*** uris)
{
int rc;
/* parse the process name */
char* cinfo = strdup(uri);
char* ptr = strchr(cinfo, ';');
if(NULL == ptr) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
free(cinfo);
return ORTE_ERR_BAD_PARAM;
}
*ptr = '\0';
ptr++;
if (ORTE_SUCCESS != (rc = orte_util_convert_string_to_process_name(peer, cinfo))) {
ORTE_ERROR_LOG(rc);
free(cinfo);
return rc;
}
if (NULL != uris) {
/* parse the remainder of the string into an array of uris */
*uris = opal_argv_split(ptr, ';');
}
free(cinfo);
return ORTE_SUCCESS;
}