bd9265c560
A few changes were required to support this move:

1. The PMI component used to identify RTE-related data (e.g., host name, bind level) and package it as a unit to reduce the number of PMI keys. This code was moved up to the ORTE layer, as the OPAL layer has no understanding of these concepts. In addition, the component locally stored data based on process jobid/vpid; this could no longer be supported (see below for the solution).

2. The hash component was updated to use the new opal_identifier_t instead of orte_process_name_t as its index for storing data in the hash tables. Previously, we hashed on the vpid and stored the data in a 32-bit hash table. In the revised system, we don't see a separate "vpid" field; we only have a 64-bit opaque value. The orte_process_name_t hash turned out to do nothing useful, so we now store the data in a 64-bit hash table (see the keying sketch below). Preliminary tests didn't show any identifiable change in behavior or performance, but we'll have to see whether a move back to the 32-bit table is required at some later time.

3. The db framework was a "select one" system. However, since the PMI component could no longer use its internal storage system, the framework has been changed to a "select many" mode of operation. This allows the hash component to handle all internal storage, while the PMI component only handles pushing/pulling things from the PMI system. This was something we had planned for some time: when fetching data, we first check internal storage to see if we already have it, and only go to the global system to look for it if we don't. Accordingly, the framework was given a custom query function, used during "select", that lets you separately specify the "store" and "fetch" ordering.

4. The ORTE grpcomm and ess/pmi components, and the nidmap code, were updated to work with the new db framework and to specify internal/global storage options.

No changes were made to the MPI layer, except for modifying the ORTE component of the OMPI/rte framework to support the new db framework.

This commit was SVN r28112.
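A minimal sketch of the keying change described in item 2, assuming the standard opal_hash_table uint64 accessors. The demo_* names and the table are hypothetical, not the hash db component's actual code; the includes mirror the source file below, which is where opal_identifier_t and orte_process_name_t come from. The point is simply that the opaque 64-bit opal_identifier_t is used directly as the key, rather than hashing the vpid out of an orte_process_name_t into a 32-bit table.

/* Sketch only: demo_modex_table, demo_init, demo_store, and demo_fetch are
 * illustrative. Assumes opal_identifier_t is the 64-bit opaque form of
 * orte_process_name_t, as used in the source file below. */
#include "orte_config.h"
#include "orte/types.h"
#include "orte/runtime/orte_globals.h"

#include <string.h>

#include "opal/class/opal_hash_table.h"
#include "opal/mca/db/db.h"

static opal_hash_table_t demo_modex_table;   /* hypothetical 64-bit keyed table */

static void demo_init(void)
{
    OBJ_CONSTRUCT(&demo_modex_table, opal_hash_table_t);
    opal_hash_table_init(&demo_modex_table, 256);
}

static int demo_store(const orte_process_name_t *name, void *payload)
{
    opal_identifier_t id;

    /* the entire name is treated as one opaque 64-bit key - no separate
     * vpid hash and no 32-bit table */
    memcpy(&id, name, sizeof(id));
    return opal_hash_table_set_value_uint64(&demo_modex_table, id, payload);
}

static int demo_fetch(const orte_process_name_t *name, void **payload)
{
    opal_identifier_t id;

    memcpy(&id, name, sizeof(id));
    return opal_hash_table_get_value_uint64(&demo_modex_table, id, payload);
}

The grpcomm code below follows the same pattern, casting an orte_process_name_t pointer to an opal_identifier_t pointer before handing it to opal_db.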
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
 * Copyright (c) 2004-2009 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2009 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Sun Microsystems, Inc. All rights reserved.
 * Copyright (c) 2011      Cisco Systems, Inc. All rights reserved.
 * Copyright (c) 2011-2013 Los Alamos National Security, LLC.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"

#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */

#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "opal/class/opal_hash_table.h"
#include "opal/dss/dss.h"
#include "opal/mca/db/db.h"
#include "opal/mca/hwloc/base/base.h"

#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"

#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/grpcomm/grpcomm.h"

orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void)
{
    orte_grpcomm_coll_id_t id;

    /* assign the next collective id */
    id = orte_grpcomm_base.coll_id;
    /* rotate to the next value */
    orte_grpcomm_base.coll_id++;
    return id;
}


/*************** MODEX SECTION **************/
void orte_grpcomm_base_modex(int fd, short args, void *cbdata)
{
    orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
    orte_grpcomm_collective_t *modex = caddy->op;
    int rc;
    orte_namelist_t *nm;
    opal_list_item_t *item;
    bool found;
    orte_grpcomm_collective_t *cptr;

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
                         "%s grpcomm:base:modex: performing modex",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    if (0 == opal_list_get_size(&modex->participants)) {
        /* record the collective */
        modex->next_cbdata = modex;
        opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);

        /* put our process name in the buffer so it can be unpacked later */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* add a wildcard name to the participants so the daemon knows
         * the jobid that is involved in this collective
         */
        nm = OBJ_NEW(orte_namelist_t);
        nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
        nm->name.vpid = ORTE_VPID_WILDCARD;
        opal_list_append(&modex->participants, &nm->super);
        modex->next_cb = orte_grpcomm_base_store_modex;
    } else {
        /* see if the collective is already present - a race condition
         * exists where other participants may have already sent us their
         * contribution. This would place the collective on the global
         * array, but leave it marked as "inactive" until we call
         * modex with the list of participants
         */
        found = false;
        for (item = opal_list_get_first(&orte_grpcomm_base.active_colls);
             item != opal_list_get_end(&orte_grpcomm_base.active_colls);
             item = opal_list_get_next(item)) {
            cptr = (orte_grpcomm_collective_t*)item;
            OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                 "%s CHECKING COLL id %d",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 cptr->id));

            if (modex->id == cptr->id) {
                found = true;
                /* remove the old entry - we will replace it
                 * with the modex one
                 */
                opal_list_remove_item(&orte_grpcomm_base.active_colls, item);
                break;
            }
        }
        if (found) {
            /* since it already exists, the list of
             * targets contains the list of procs
             * that have already sent us their info. Cycle
             * thru the targets and move those entries to
             * the modex object
             */
            while (NULL != (item = opal_list_remove_first(&cptr->targets))) {
                opal_list_append(&modex->targets, item);
            }
            /* copy the previously-saved data across */
            opal_dss.copy_payload(&modex->local_bucket, &cptr->local_bucket);
            /* cleanup */
            OBJ_RELEASE(cptr);
        }
        /* now add the modex to the global list of active collectives */
        modex->next_cb = orte_grpcomm_base_store_peer_modex;
        modex->next_cbdata = modex;
        opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);

        /* this is not amongst our peers, but rather between a select
         * group of processes - e.g., during a connect/accept operation.
         * Thus, this requires we send additional info
         */

        /* pack the collective id */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &modex->id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* pack our name */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* pack our hostname */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.nodename, 1, OPAL_STRING))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* pack our daemon's vpid */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &ORTE_PROC_MY_DAEMON->vpid, 1, ORTE_VPID))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* pack our node rank */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.my_node_rank, 1, ORTE_NODE_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* pack our local rank */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.my_local_rank, 1, ORTE_LOCAL_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

#if OPAL_HAVE_HWLOC
        /* pack our binding info so other procs can determine our locality */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.bind_level, 1, OPAL_HWLOC_LEVEL_T))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &orte_process_info.bind_idx, 1, OPAL_UINT))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
#endif
    }

    /* pack the entries we have received */
    if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&modex->buffer))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                         "%s grpcomm:base:full:modex: executing allgather",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* execute the allgather */
    if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(modex))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                         "%s grpcomm:base:modex: modex posted",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    return;

 cleanup:
    return;
}

void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata)
{
    int rc, cnt;
    orte_process_name_t pname;
    opal_identifier_t *id;
    char *hostname;
    orte_vpid_t daemon;
    orte_node_rank_t node_rank;
    orte_local_rank_t local_rank;
    orte_grpcomm_collective_t *modex = (orte_grpcomm_collective_t*)cbdata;
    opal_hwloc_locality_t locality;

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                         "%s STORING PEER MODEX DATA",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* unpack the process name */
    cnt=1;
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(rbuf, &pname, &cnt, ORTE_NAME))) {
        id = (opal_identifier_t*)&pname;
        /* unpack and store the hostname */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &hostname, &cnt, OPAL_STRING))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack and store the daemon vpid */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &daemon, &cnt, ORTE_VPID))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_DAEMON_VPID, &daemon, OPAL_UINT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack and store the node rank */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &node_rank, &cnt, ORTE_NODE_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* unpack the local rank */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &local_rank, &cnt, ORTE_LOCAL_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* compute the locality and store in the database */
#if OPAL_HAVE_HWLOC
        {
            opal_hwloc_level_t bind_level;
            unsigned int bind_idx;

            /* unpack and store the locality info */
            cnt = 1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &bind_level, &cnt, OPAL_HWLOC_LEVEL_T))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_BIND_LEVEL, &bind_level, OPAL_HWLOC_LEVEL_T))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            cnt = 1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &bind_idx, &cnt, OPAL_UINT))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_BIND_INDEX, &bind_idx, OPAL_UINT))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                                 "%s store:peer:modex setting proc %s level %s idx %u",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&pname),
                                 opal_hwloc_base_print_level(bind_level), bind_idx));

            if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &pname, ORTE_PROC_MY_NAME)) {
                /* if this data is from myself, then set locality to all */
                OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                     "%s store:peer:modex setting proc %s locale ALL",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                     ORTE_NAME_PRINT(&pname)));
                locality = OPAL_PROC_ALL_LOCAL;
            } else if (daemon != ORTE_PROC_MY_DAEMON->vpid) {
                /* this is on a different node, then mark as non-local */
                OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                     "%s store:peer:modex setting proc %s locale NONLOCAL",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                     ORTE_NAME_PRINT(&pname)));
                locality = OPAL_PROC_NON_LOCAL;
            } else if (OPAL_HWLOC_NODE_LEVEL == orte_process_info.bind_level ||
                       OPAL_HWLOC_NODE_LEVEL == bind_level) {
                /* one or both of us is not bound, so all we can say is we are on the
                 * same node
                 */
                locality = OPAL_PROC_ON_NODE;
            } else {
                /* determine relative location on our node */
                locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
                                                                 orte_process_info.bind_level,
                                                                 orte_process_info.bind_idx,
                                                                 bind_level, bind_idx);
                OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                     "%s store:peer:modex setting proc %s locale %s",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                     ORTE_NAME_PRINT(&pname),
                                     opal_hwloc_base_print_locality(locality)));
            }
        }
#else
        if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &pname, ORTE_PROC_MY_NAME)) {
            /* if this data is from myself, then set locality to all */
            OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                 "%s grpcomm:base:modex setting proc %s locale ALL",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&pname)));
            locality = OPAL_PROC_ALL_LOCAL;
        } else if (daemon != ORTE_PROC_MY_DAEMON->vpid) {
            /* this is on a different node, then mark as non-local */
            OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                                 "%s store:peer:modex setting proc %s locale NONLOCAL",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&pname)));
            locality = OPAL_PROC_NON_LOCAL;
        } else {
            /* must be on our node */
            locality = OPAL_PROC_ON_NODE;
        }
#endif
        if (ORTE_SUCCESS != (rc = opal_db.store((*id), OPAL_DB_INTERNAL, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                             "%s store:peer:modex: adding modex entry for proc %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&pname)));

        /* update the modex database */
        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&pname, rbuf))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                             "%s store:peer:modex: completed modex entry for proc %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&pname)));
    }

 cleanup:
    /* flag the collective as complete */
    modex->active = false;
    /* cleanup the list, but don't release the
     * collective object as it was passed into us
     */
    opal_list_remove_item(&orte_grpcomm_base.active_colls, &modex->super);
    /* notify that the modex is complete */
    if (NULL != modex->cbfunc) {
        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                             "%s CALLING MODEX RELEASE",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        modex->cbfunc(NULL, modex->cbdata);
    } else {
        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                             "%s store:peer:modex NO MODEX RELEASE CBFUNC",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    }
}

void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata)
{
    orte_std_cntr_t cnt;
    orte_process_name_t proc_name;
    int rc=ORTE_SUCCESS;
    orte_grpcomm_collective_t *modex = (orte_grpcomm_collective_t*)cbdata;

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                         "%s STORING MODEX DATA",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* unpack the process name */
    cnt=1;
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(rbuf, &proc_name, &cnt, ORTE_NAME))) {

        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                             "%s grpcomm:base:store_modex adding modex entry for proc %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&proc_name)));

        /* update the modex database */
        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, rbuf))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
    }
    if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
        ORTE_ERROR_LOG(rc);
    }

 cleanup:
    /* flag the modex as complete */
    modex->active = false;
    /* cleanup */
    opal_list_remove_item(&orte_grpcomm_base.active_colls, &modex->super);
    /* execute user callback, if requested */
    if (NULL != modex->cbfunc) {
        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
                             "%s CALLING MODEX RELEASE",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        modex->cbfunc(NULL, modex->cbdata);
    }
}


int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
                                           opal_buffer_t *rbuf)
{
    int rc = ORTE_SUCCESS;
    int32_t num_recvd_entries;
    orte_std_cntr_t cnt;
    orte_std_cntr_t j;
    opal_identifier_t *id;

    /* unpack the number of entries for this proc */
    cnt=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_recvd_entries, &cnt, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                         "%s grpcomm:base:update_modex_entries: adding %d entries for proc %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
                         ORTE_NAME_PRINT(proc_name)));

    id = (opal_identifier_t*)proc_name;

    /*
     * Extract the attribute names and values
     */
    for (j = 0; j < num_recvd_entries; j++) {
        opal_value_t *kv;
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &kv, &cnt, OPAL_VALUE))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* if this is me, dump the data - we already have it in the db */
        if (ORTE_PROC_MY_NAME->jobid == proc_name->jobid &&
            ORTE_PROC_MY_NAME->vpid == proc_name->vpid) {
            OBJ_RELEASE(kv);
        } else {
            /* store it in the database */
            if (ORTE_SUCCESS != (rc = opal_db.store_pointer((*id), OPAL_DB_INTERNAL, kv))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* do not release the kv - the db holds that pointer */
        }
    }

 cleanup:
    return rc;
}

int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf)
{
    int rc;
    int32_t num_entries;
    opal_value_t *kv;
    opal_list_t data;
    opal_list_item_t *item, *next;
    opal_identifier_t *id;

    /* fetch our data */
    OBJ_CONSTRUCT(&data, opal_list_t);
    id = (opal_identifier_t*)ORTE_PROC_MY_NAME;
    if (ORTE_SUCCESS != (rc = opal_db.fetch_multiple((*id), NULL, &data))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* count the number of entries we will send, purging the rest */
    num_entries = 0;
    item = opal_list_get_first(&data);
    while (item != opal_list_get_end(&data)) {
        kv = (opal_value_t*)item;
        next = opal_list_get_next(item);
        /* if this is an entry we get from the nidmap, then don't include it here */
        if (0 == strcmp(kv->key, ORTE_DB_HOSTNAME) ||
            0 == strcmp(kv->key, ORTE_DB_DAEMON_VPID) ||
            0 == strcmp(kv->key, ORTE_DB_NODERANK) ||
            0 == strcmp(kv->key, ORTE_DB_LOCALRANK) ||
            0 == strcmp(kv->key, ORTE_DB_BIND_LEVEL) ||
            0 == strcmp(kv->key, ORTE_DB_BIND_INDEX)) {
            opal_list_remove_item(&data, item);
        } else {
            num_entries++;
        }
        item = next;
    }

    OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
                         "%s grpcomm:base:pack_modex: reporting %d entries",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_entries));

    /* put the number of entries into the buffer */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &num_entries, 1, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* if there are entries, store them */
    while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kv, 1, OPAL_VALUE))) {
            ORTE_ERROR_LOG(rc);
            break;
        }
        OBJ_RELEASE(kv);
    }

 cleanup:
    while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
        OBJ_RELEASE(kv);
    }
    OBJ_DESTRUCT(&data);

    return rc;
}