diff --git a/ompi/runtime/ompi_module_exchange.c b/ompi/runtime/ompi_module_exchange.c index a84e8bb95d..9a2a251e4d 100644 --- a/ompi/runtime/ompi_module_exchange.c +++ b/ompi/runtime/ompi_module_exchange.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights + * Copyright (c) 2006-2012 Los Alamos National Security, LLC. All rights * reserved. * $COPYRIGHT$ * @@ -64,7 +64,7 @@ ompi_modex_recv(mca_base_component_t * component, return OMPI_ERR_OUT_OF_RESOURCE; } - rc = orte_grpcomm.get_proc_attr(proc->proc_name, name, buffer, size); + rc = orte_grpcomm.get_proc_attr(&proc->proc_name, name, buffer, size); free(name); return rc; } @@ -82,7 +82,7 @@ ompi_modex_recv_string(const char* key, struct ompi_proc_t *source_proc, void **buffer, size_t *size) { - return orte_grpcomm.get_proc_attr(source_proc->proc_name, key, buffer, size); + return orte_grpcomm.get_proc_attr(&source_proc->proc_name, key, buffer, size); } int @@ -122,7 +122,7 @@ ompi_modex_recv_key_value(const char* key, bo.bytes = NULL; bo.size = 0; - if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(source_proc->proc_name, key, + if (ORTE_SUCCESS != (rc = orte_grpcomm.get_proc_attr(&source_proc->proc_name, key, (void**)&bo.bytes, &bsize))) { return rc; } diff --git a/ompi/tools/ompi_info/components.c b/ompi/tools/ompi_info/components.c index da64649222..fd356b5d89 100644 --- a/ompi/tools/ompi_info/components.c +++ b/ompi/tools/ompi_info/components.c @@ -99,6 +99,8 @@ #include "orte/mca/state/base/base.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/mca/grpcomm/base/base.h" +#include "orte/mca/db/db.h" +#include "orte/mca/db/base/base.h" #include "orte/mca/ess/ess.h" #include "orte/mca/ess/base/base.h" #include "orte/util/show_help.h" @@ -392,6 +394,14 @@ void ompi_info_open_components(void) map->components = 
&orte_grpcomm_base.components_available; opal_pointer_array_add(&component_map, map); + if (ORTE_SUCCESS != orte_db_base_open()) { + goto error; + } + map = OBJ_NEW(ompi_info_component_map_t); + map->type = strdup("db"); + map->components = &orte_db_base.available_components; + opal_pointer_array_add(&component_map, map); + if (ORTE_SUCCESS != orte_ess_base_open()) { goto error; } @@ -749,6 +759,7 @@ void ompi_info_close_components() (void) ompi_osc_base_close(); (void) orte_grpcomm_base_close(); + (void) orte_db_base_close(); (void) orte_ess_base_close(); (void) orte_show_help_finalize(); #if !ORTE_DISABLE_FULL_SUPPORT diff --git a/ompi/tools/ompi_info/ompi_info.c b/ompi/tools/ompi_info/ompi_info.c index 58f01cfb66..f5819bee5c 100644 --- a/ompi/tools/ompi_info/ompi_info.c +++ b/ompi/tools/ompi_info/ompi_info.c @@ -269,6 +269,7 @@ int main(int argc, char *argv[]) opal_pointer_array_add(&mca_types, "errmgr"); opal_pointer_array_add(&mca_types, "ess"); opal_pointer_array_add(&mca_types, "grpcomm"); + opal_pointer_array_add(&mca_types, "db"); opal_pointer_array_add(&mca_types, "notifier"); /* Execute the desired action(s) */ diff --git a/orte/mca/db/Makefile.am b/orte/mca/db/Makefile.am new file mode 100644 index 0000000000..5129aaf9a5 --- /dev/null +++ b/orte/mca/db/Makefile.am @@ -0,0 +1,32 @@ +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +AM_CPPFLAGS = $(LTDLINCL) + +# main library setup +noinst_LTLIBRARIES = libmca_db.la +libmca_db_la_SOURCES = + +# pkgdata setup +dist_pkgdata_DATA = + +# local files +headers = db.h +libmca_db_la_SOURCES += $(headers) + +# Conditionally install the header files +if WANT_INSTALL_HEADERS +ortedir = $(includedir)/openmpi/$(subdir) +nobase_orte_HEADERS = $(headers) +endif + +include base/Makefile.am + +distclean-local: + rm -f base/static-components.h diff --git a/orte/mca/db/base/Makefile.am b/orte/mca/db/base/Makefile.am new file mode 100644 index 0000000000..1fb7d7495c --- /dev/null +++ b/orte/mca/db/base/Makefile.am @@ -0,0 +1,18 @@ +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +dist_pkgdata_DATA += base/help-db-base.txt + +headers += \ + base/base.h + +libmca_db_la_SOURCES += \ + base/db_base_open.c \ + base/db_base_close.c \ + base/db_base_select.c diff --git a/orte/mca/db/base/base.h b/orte/mca/db/base/base.h new file mode 100644 index 0000000000..a557922fbb --- /dev/null +++ b/orte/mca/db/base/base.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + */ + +#ifndef MCA_DB_BASE_H +#define MCA_DB_BASE_H + +#include "orte_config.h" +#include "orte/types.h" + +#include "opal/mca/mca.h" +#include "opal/class/opal_list.h" +#include "opal/mca/event/event.h" + +#include "orte/mca/db/db.h" + +BEGIN_C_DECLS + +/** + * Open the db framework + */ +ORTE_DECLSPEC int orte_db_base_open(void); + +/** + * Select a db module + */ +ORTE_DECLSPEC int orte_db_base_select(void); + +/** + * Close the db framework + */ +ORTE_DECLSPEC int orte_db_base_close(void); + +typedef struct { + int output; + opal_list_t available_components; + struct timeval timeout; +} orte_db_base_t; +ORTE_DECLSPEC extern orte_db_base_t orte_db_base; + +typedef struct { + opal_list_item_t *super; + orte_process_name_t name; + char *key; + opal_event_t *ev; + orte_db_fetch_callback_fn_t cbfunc; + void *cbdata; +} orte_db_fetch_req_t; +OBJ_CLASS_DECLARATION(orte_db_fetch_req_t); + +END_C_DECLS + +#endif diff --git a/orte/mca/db/base/db_base_close.c b/orte/mca/db/base/db_base_close.c new file mode 100644 index 0000000000..c571ee9c68 --- /dev/null +++ b/orte/mca/db/base/db_base_close.c @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include + +#include "opal/mca/mca.h" +#include "opal/util/output.h" +#include "opal/mca/base/base.h" +#include "opal/mca/base/mca_base_component_repository.h" + +#include "orte/mca/db/base/base.h" + +extern opal_list_t orte_db_base_components_available; + +int +orte_db_base_close(void) +{ + if (NULL != orte_db.finalize) { + orte_db.finalize(); + } + + mca_base_components_close(orte_db_base.output, + &orte_db_base.available_components, NULL); + + OBJ_DESTRUCT(&orte_db_base.available_components); + opal_output_close(orte_db_base.output); + return ORTE_SUCCESS; +} + diff --git a/orte/mca/db/base/db_base_open.c b/orte/mca/db/base/db_base_open.c new file mode 100644 index 0000000000..496f00bbf3 --- /dev/null +++ b/orte/mca/db/base/db_base_open.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/mca/mca.h" +#include "opal/util/output.h" +#include "opal/mca/base/base.h" +#include "opal/dss/dss_types.h" + +#include "orte/mca/db/base/base.h" + + +/* + * The following file was created by configure. It contains extern + * dbments and the definition of an array of pointers to each + * module's public mca_base_module_t struct. 
+ */ + +#include "orte/mca/db/base/static-components.h" + +orte_db_base_module_t orte_db; +orte_db_base_t orte_db_base; + +int orte_db_base_open(void) +{ + orte_db_base.output = opal_output_open(NULL); + + OBJ_CONSTRUCT(&orte_db_base.available_components, opal_list_t); + + /* Open up all available components */ + if (ORTE_SUCCESS != + mca_base_components_open("db", orte_db_base.output, mca_db_base_static_components, + &orte_db_base.available_components, + true)) { + return ORTE_ERROR; + } + + return ORTE_SUCCESS; +} + +static void fetch_construct(orte_db_fetch_req_t *fetch) +{ + fetch->key = NULL; + fetch->ev = opal_event_alloc(); +} +static void fetch_destruct(orte_db_fetch_req_t *fetch) +{ + if (NULL != fetch->key) { + free(fetch->key); + } + if (NULL != fetch->ev) { + opal_event_free(fetch->ev); + } +} +OBJ_CLASS_INSTANCE(orte_db_fetch_req_t, + opal_list_item_t, + fetch_construct, + fetch_destruct); + +static void keyval_construct(orte_db_keyval_t *ptr) +{ + ptr->key = NULL; + ptr->value.bytes = NULL; + ptr->value.size = 0; +} + +static void keyval_destruct(orte_db_keyval_t *ptr) +{ + if (NULL != ptr->key) { + free(ptr->key); + } + if (NULL != ptr->value.bytes) { + free(ptr->value.bytes); + } +} +OBJ_CLASS_INSTANCE(orte_db_keyval_t, + opal_list_item_t, + keyval_construct, + keyval_destruct); + diff --git a/orte/mca/db/base/db_base_select.c b/orte/mca/db/base/db_base_select.c new file mode 100644 index 0000000000..b0c8aad60f --- /dev/null +++ b/orte/mca/db/base/db_base_select.c @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/class/opal_list.h" +#include "opal/mca/mca.h" +#include "opal/mca/base/base.h" +#include "opal/mca/base/mca_base_component_repository.h" + +#include "orte/mca/db/base/base.h" + +extern opal_list_t orte_db_base_components_available; + +int +orte_db_base_select(void) +{ + orte_db_base_component_t *best_component = NULL; + orte_db_base_module_t *best_module = NULL; + + /* + * Select the best component + */ + if( OPAL_SUCCESS != mca_base_select("db", orte_db_base.output, + &orte_db_base.available_components, + (mca_base_module_t **) &best_module, + (mca_base_component_t **) &best_component) ) { + /* It is okay to not select a component - default + * to using the base NULL component + */ + return ORTE_SUCCESS; + } + + /* Save and init the winner */ + orte_db = *best_module; + if (NULL != orte_db.init) { + orte_db.init(); + } + + return ORTE_SUCCESS; +} diff --git a/orte/mca/db/base/help-db-base.txt b/orte/mca/db/base/help-db-base.txt new file mode 100644 index 0000000000..d2715b34ee --- /dev/null +++ b/orte/mca/db/base/help-db-base.txt @@ -0,0 +1,19 @@ + -*- text -*- +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +# This is the US/English general help file for ORTE Errmgr HNP module. +# +[errmgr-hnp:unknown-job-error] +An error has occurred in an unknown job. This generally should not happen +except due to an internal ORTE error. + +Job state: %s + +This information should probably be reported to the OMPI developers. diff --git a/orte/mca/db/db.h b/orte/mca/db/db.h new file mode 100644 index 0000000000..cadec1c2b2 --- /dev/null +++ b/orte/mca/db/db.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. 
/**
 * Container for data for a particular key-value pair.
 *
 * This is an OPAL class derived from opal_list_item_t.  Its constructor
 * (keyval_construct in db_base_open.c) sets key and value.bytes to NULL
 * and value.size to 0; its destructor (keyval_destruct) releases both
 * key and value.bytes with free().  Anything stored in those two fields
 * must therefore be heap-allocated and becomes owned by the object.
 */
typedef struct orte_db_keyval_t {
    /** Structure can be put on lists */
    opal_list_item_t super;
    /** Key (freed by the destructor) */
    char *key;
    /** Byte object containing binary blob of data associated with this
     *  proc,key pair (value.bytes is freed by the destructor) */
    opal_byte_object_t value;
} orte_db_keyval_t;
OBJ_CLASS_DECLARATION(orte_db_keyval_t);

/* Callback function type for returning fetched data.
 *
 * Note that the memory backing the data belongs to the DB framework:
 * the receiver must NOT release it.
 *
 * NOTE(review): presumably "data" points to "num_entries" entries for
 * the given src/key - the base code visible here never invokes this
 * callback, so confirm against the modules that use it.
 */
typedef void (*orte_db_fetch_callback_fn_t)(orte_process_name_t *src,
                                            char *key,
                                            orte_db_keyval_t *data,
                                            int num_entries);

/*
 * Initialize the module.
 *
 * A module may leave this entry NULL: the base checks for NULL before
 * calling it when the module is selected.
 */
typedef int (*orte_db_base_module_init_fn_t)(void);

/*
 * Finalize the module.
 *
 * A module may leave this entry NULL: the base checks for NULL before
 * calling it when the framework is closed.
 */
typedef void (*orte_db_base_module_finalize_fn_t)(void);
/*
 * Retrieve data
 *
 * Retrieve the data for the given proc associated with the specified key. Wildcards
 * are supported here as well. This is a non-blocking
 * call - data will be returned via the callback function ONCE IT BECOMES AVAILABLE. Use
 * of the "timeout" MCA parameter is encouraged to avoid hanging on fetch requests for
 * "blocking" data that can never be resolved.
 *
 * NOTE: INTERIM IMPLEMENTATION WILL SIMPLY LOOKUP EXISTING DATA, RETURNING AN ERROR IF
 * NOT ALREADY PRESENT.
 *
 * NOTE(review): the signature below takes no callback argument, so the
 * callback-based behavior described above is not expressible through this
 * type.  Presumably the interim implementation returns the matching data
 * to the caller through the "values" list (as orte_db_keyval_t items?) -
 * confirm against the module implementations and state who owns the
 * returned items.
 */
typedef int (*orte_db_base_module_fetch_fn_t)(const orte_process_name_t *proc,
                                              const char *key,
                                              opal_list_t *values);
/*
 * The standard module data structure: the table of function pointers a
 * db component hands back when it is selected.  The selected module's
 * table is copied by value into the global "orte_db" (see
 * orte_db_base_select).  The base code checks init and finalize for
 * NULL before calling them; no such check is visible for the other
 * entries.
 */
struct orte_db_base_module_1_0_0_t {
    /* called when the module is selected */
    orte_db_base_module_init_fn_t init;
    /* called when the framework is closed */
    orte_db_base_module_finalize_fn_t finalize;
    /* store (or overwrite) the data for a proc/key pair */
    orte_db_base_module_store_fn_t store;
    /* retrieve the data for a proc/key pair */
    orte_db_base_module_fetch_fn_t fetch;
    /* delete the data for a proc/key pair */
    orte_db_base_module_remove_fn_t remove;
};
typedef struct orte_db_base_module_1_0_0_t orte_db_base_module_1_0_0_t;
typedef struct orte_db_base_module_1_0_0_t orte_db_base_module_t;
# MCA_orte_db_dbase_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
# Decide whether the dbase component can be built: it needs db.h and
# a db library providing dbopen.  OMPI_CHECK_PACKAGE stores what it
# finds in <prefix>_CPPFLAGS, <prefix>_LDFLAGS and <prefix>_LIBS, so
# the prefix passed to it has to be db_dbase - the same prefix used
# by the AC_SUBST calls below and by the component Makefile.am.  With
# any other prefix the substituted variables stay empty and the
# component is built without the flags needed to find and link the
# database library.
AC_DEFUN([MCA_orte_db_dbase_CONFIG], [
    AC_CONFIG_FILES([orte/mca/db/dbase/Makefile])

    # do not build if rte is disabled
    AS_IF([test "$orte_without_full_support" = 0],
          [OMPI_CHECK_PACKAGE([db_dbase],
                              [db.h],
                              [db],
                              [dbopen],
                              [],
                              [],
                              [],
                              [$1],
                              [$2])],
          [$2])

    AC_SUBST(db_dbase_CPPFLAGS)
    AC_SUBST(db_dbase_LDFLAGS)
    AC_SUBST(db_dbase_LIBS)
])dnl
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include +#include +#ifdef HAVE_LIMITS_H +#include +#endif +#include +#ifdef HAVE_FCNTL_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif +#include + +#include "opal/dss/dss_types.h" +#include "opal/util/os_dirpath.h" +#include "opal/util/os_path.h" +#include "opal/util/output.h" +#include "opal/util/malloc.h" +#include "opal/util/basename.h" +#include "opal/mca/pstat/base/base.h" +#include "opal/mca/paffinity/base/base.h" +#include "opal/mca/sysinfo/base/base.h" + +#include "orte/util/show_help.h" +#include "orte/mca/errmgr/base/base.h" +#include "orte/runtime/orte_globals.h" + +#include "db_dbase.h" + +static int init(void); +static int finalize(void); +static int store(orte_process_name_t *proc, char *key, void *object, size_t size); +static int fetch(orte_process_name_t *proc, + char *key, void **object, size_t *size); +static int delete_data(orte_process_name_t *proc,char *key); + +orte_db_base_module_t orte_db_dbase_module = { + init, + finalize, + store, + fetch, + delete_data +}; + +/* local variables */ +static DB *save_dbase=NULL, *recover_dbase=NULL; + +static int init(void) +{ + char *path, *name; + + /* setup the database */ + if (ORTE_SUCCESS != opal_os_dirpath_create(orte_db_dbase_directory, S_IRWXU)) { + orte_show_help("help-db-dbase.txt", "cannot-create-dir", true, + orte_db_dbase_directory); + return ORTE_ERR_FILE_OPEN_FAILURE; + } + orte_util_convert_process_name_to_string(&name, ORTE_PROC_MY_NAME); + path = opal_os_path(false, orte_db_dbase_directory, name, NULL); + free(name); + if (NULL == (save_dbase = dbaseopen(path, O_CREAT | O_RDWR | O_TRUNC, S_IRWXU, DB_HASH, NULL))) { + orte_show_help("help-db-dbase.txt", "cannot-create-dbase", true, path); + free(path); + return ORTE_ERR_FILE_OPEN_FAILURE; + } + free(path); + + return ORTE_SUCCESS; +} + +static int finalize(void) +{ + /* if we are 
normally terminating, remove the recovery file */ + + return ORTE_SUCCESS; +} + +static int store(orte_process_name_t *proc, char *key, void *object, size_t size) +{ + DBT key, data; + opal_buffer_t buf; + orte_job_t *jdata; + orte_proc_t *proc; + int rc=ORTE_SUCCESS, size; + + /* construct the buffer we will use for packing the data */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + key.data = inkey; + key.size = strlen(key.data); + + switch (type) { + case ORTE_JOB: + jdata = (orte_job_t*)object; + opal_dss.pack(&buf, &jdata, 1, ORTE_JOB); + break; + case ORTE_PROC: + proc = (orte_proc_t*)object; + opal_dss.pack(&buf, &proc, 1, ORTE_PROC); + break; + default: + orte_show_help("help-db-dbase.txt", "unrecognized-type", true, type); + rc = ORTE_ERR_BAD_PARAM; + goto cleanup; + break; + } + + /* unload the data */ + opal_dss.unload(&buf, (void**)&data.data, &size); + data.size = size; + OBJ_DESTRUCT(&buf); + + /* put the info into the dbase */ + if (0 > save_dbase->put(save_dbase, &key, &data, 0)) { + orte_show_help("help-db-dbase.txt", "error-writing-dbase", true, (char*)key.data, strerror(errno)); + rc = ORTE_ERR_FILE_WRITE_FAILURE; + } + /* sync it to force it to disk */ + if (0 > save_dbase->sync(save_dbase, 0)) { + orte_show_help("help-db-dbase.txt", "error-syncing-dbase", true, (char*)key.data, strerror(errno)); + rc = ORTE_ERR_FILE_WRITE_FAILURE; + } + +cleanup: + /* cleanup */ + if (NULL != key.data) { + free(key.data); + } + if (NULL != data.data) { + free(data.data); + } + return rc; +} + +static int fetch(orte_process_name_t *proc, + char *key, void **object, size_t *size) +{ + DBT key, data; + opal_buffer_t buf; + orte_job_t *jdata; + orte_proc_t *proc; + int rc=ORTE_SUCCESS; + int32_t n; + + if (NULL == recover_dbase) { + orte_show_help("help-db-dbase.txt", "recover-source-undef", true); + rc = ORTE_ERR_NOT_FOUND; + } + + /* construct the buffer we will use for unpacking the data */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + key.data = inkey; + key.size = 
strlen(key.data); + + /* get the specified data */ + if (0 > recover_dbase->get(recover_dbase, &key, &data, 0)) { + orte_show_help("help-db-dbase.txt", "error-reading-dbase", true, (char*)key.data, strerror(errno)); + rc = ORTE_ERR_FILE_READ_FAILURE; + goto cleanup; + } + + /* populate the recovered info */ + opal_dss.load(&buf, data.data, data.size); + switch (type) { + case ORTE_JOB: + n=1; + opal_dss.unpack(&buf, &jdata, &n, ORTE_JOB); + break; + case ORTE_PROC: + n=1; + opal_dss.unpack(&buf, &proc, &n, ORTE_PROC); + break; + default: + break; + } + +cleanup: + OBJ_DESTRUCT(&buf); + return rc; +} + +static int update(char *key, void *object, opal_data_type_t type) +{ + return ORTE_ERR_NOT_IMPLEMENTED; +} + +static int delete_data(char *key) +{ + return ORTE_ERR_NOT_IMPLEMENTED; +} diff --git a/orte/mca/db/dbase/db_dbase.h b/orte/mca/db/dbase/db_dbase.h new file mode 100644 index 0000000000..ede0b884c4 --- /dev/null +++ b/orte/mca/db/dbase/db_dbase.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef ORTE_DB_DBASE_H +#define ORTE_DB_DBASE_H + +#include "orte/mca/db/db.h" + +BEGIN_C_DECLS + +/* + * Module open / close + */ +int orte_db_dbase_component_open(void); +int orte_db_dbase_component_close(void); +int orte_db_dbase_component_query(mca_base_module_t **module, int *priority); + + +ORTE_MODULE_DECLSPEC extern orte_db_base_component_t mca_db_dbase_component; +ORTE_DECLSPEC extern orte_db_base_module_t orte_db_dbase_module; +extern char *orte_db_dbase_directory; + +END_C_DECLS + +#endif /* ORTE_DB_DBASE_H */ diff --git a/orte/mca/db/dbase/db_dbase_component.c b/orte/mca/db/dbase/db_dbase_component.c new file mode 100644 index 0000000000..764465540e --- /dev/null +++ b/orte/mca/db/dbase/db_dbase_component.c @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. 
+ * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * These symbols are in a file by themselves to provide nice linker + * semantics. Since linkers generally pull in symbols by object + * files, keeping these symbols as the only symbols in this file + * prevents utility programs such as "ompi_info" from having to import + * entire components just to query their version and parameters. + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/mca/base/base.h" +#include "opal/mca/base/mca_base_param.h" + +#include "orte/util/proc_info.h" + +#include "orte/mca/db/db.h" +#include "orte/mca/db/base/base.h" +#include "db_dbase.h" + +extern orte_db_base_module_t orte_db_dbase_module; +char *orte_db_dbase_directory; + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +orte_db_base_component_t mca_db_dbase_component = { + { + ORTE_DB_BASE_VERSION_1_0_0, + + /* Component name and version */ + "dbase", + ORTE_MAJOR_VERSION, + ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION, + + /* Component open and close functions */ + orte_db_dbase_component_open, + orte_db_dbase_component_close, + orte_db_dbase_component_query + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + } +}; + + +int +orte_db_dbase_component_open(void) +{ + return ORTE_SUCCESS; +} + + +int orte_db_dbase_component_query(mca_base_module_t **module, int *priority) +{ + + /* we are the file module - we need to be selected + * IFF we are requested + */ + bool is_required = false; + mca_base_component_t *c = &mca_db_dbase_component.base_version; + + /* retrieve the name of the file to be used */ + mca_base_param_reg_string(c, "dir", + "Name of directory to be used for storing and recovering db information", + false, false, NULL, &orte_db_dbase_directory); + + 
mca_base_is_component_required(&orte_db_base_components_available, + &mca_db_dbase_component.base_version, + true, + &is_required); + + if (is_required && NULL != orte_db_dbase_directory) { + *priority = 1000; + *module = (mca_base_module_t*)&orte_db_dbase_module; + return ORTE_SUCCESS; + } + + + *priority = 0; + *module = NULL; + return ORTE_ERROR; +} + + +int +orte_db_dbase_component_close(void) +{ + return ORTE_SUCCESS; +} + diff --git a/orte/mca/db/dbm/.ompi_ignore b/orte/mca/db/dbm/.ompi_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/orte/mca/db/dbm/Makefile.am b/orte/mca/db/dbm/Makefile.am new file mode 100644 index 0000000000..dbfbf51542 --- /dev/null +++ b/orte/mca/db/dbm/Makefile.am @@ -0,0 +1,38 @@ +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + db_dbm.h \ + db_dbm_component.c \ + db_dbm.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). 
# MCA_orte_db_dbm_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
# Decide whether the dbm component can be built: it needs ndbm.h and
# a dbm library providing dbm_open.  The db_dbm prefix passed to
# OMPI_CHECK_PACKAGE matches the db_dbm_CPPFLAGS, db_dbm_LDFLAGS and
# db_dbm_LIBS variables substituted below.
AC_DEFUN([MCA_orte_db_dbm_CONFIG], [
    AC_CONFIG_FILES([orte/mca/db/dbm/Makefile])

    # do not build if rte is disabled
    AS_IF([test "$orte_without_full_support" = 0],
          [OMPI_CHECK_PACKAGE([db_dbm],
                              [ndbm.h],
                              [dbm],
                              [dbm_open],
                              [],
                              [],
                              [],
                              [$1],
                              [$2])],
          [$2])

    AC_SUBST(db_dbm_CPPFLAGS)
    AC_SUBST(db_dbm_LDFLAGS)
    AC_SUBST(db_dbm_LIBS)
])dnl
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include +#include +#ifdef HAVE_LIMITS_H +#include +#endif +#include +#ifdef HAVE_FCNTL_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif +#include + +#include "opal/dss/dss_types.h" +#include "opal/util/os_dirpath.h" +#include "opal/util/os_path.h" +#include "opal/util/output.h" +#include "opal/util/malloc.h" +#include "opal/util/basename.h" +#include "opal/mca/pstat/base/base.h" +#include "opal/mca/paffinity/base/base.h" + +#include "orte/util/show_help.h" +#include "orte/mca/errmgr/base/base.h" +#include "orte/runtime/orte_globals.h" + +#include "db_dbm.h" + +static int init(void); +static int finalize(void); +static int insert(char *key, void *object, opal_data_type_t type); +static int set_recover_source(orte_process_name_t *name); +static int select_data(char *key, void *object, opal_data_type_t type); +static int update(char *key, void *object, opal_data_type_t type); +static int delete_data(char *key); + +orte_db_base_module_t orte_db_dbm_module = { + init, + finalize, + insert, + set_recover_source, + select_data, + update, + delete_data +}; + +/* local variables */ +static DBM *save_dbm=NULL, *recover_dbm=NULL; + +static int init(void) +{ + char *path, *name; + + /* setup the database */ + if (ORTE_SUCCESS != opal_os_dirpath_create(orte_db_dbm_directory, S_IRWXU)) { + orte_show_help("help-db-dbm.txt", "cannot-create-dir", true, + orte_db_dbm_directory); + return ORTE_ERR_FILE_OPEN_FAILURE; + } + orte_util_convert_process_name_to_string(&name, ORTE_PROC_MY_NAME); + path = opal_os_path(false, orte_db_dbm_directory, name, NULL); + free(name); + if (NULL == (save_dbm = dbm_open(path, O_CREAT | O_RDWR | O_TRUNC, S_IRWXU))) { + orte_show_help("help-db-dbm.txt", "cannot-create-dbm", true, path); + free(path); + return ORTE_ERR_FILE_OPEN_FAILURE; + } + free(path); + + return ORTE_SUCCESS; +} + +static int 
finalize(void)
+{
+    /* Close whichever database handles are still open so their
+     * descriptors and buffers are returned before the module goes away. */
+    if (NULL != save_dbm) {
+        dbm_close(save_dbm);
+        save_dbm = NULL;
+    }
+    if (NULL != recover_dbm) {
+        dbm_close(recover_dbm);
+        recover_dbm = NULL;
+    }
+
+    /* TODO: if we are normally terminating, remove the recovery file */
+
+    return ORTE_SUCCESS;
+}
+
+/*
+ * Pack an ORTE_JOB or ORTE_PROC object and write it to the "save"
+ * database under inkey, replacing any record already stored there.
+ *
+ * inkey stays owned by the caller; it is only borrowed for the store.
+ *
+ * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for any other data type,
+ * the opal_dss error code if packing or unloading fails, or
+ * ORTE_ERR_FILE_WRITE_FAILURE if dbm_store() reports an error.
+ */
+static int insert(char *inkey, void *object, opal_data_type_t type)
+{
+    datum key, data;
+    opal_buffer_t buf;
+    orte_job_t *jdata;
+    orte_proc_t *proc;
+    int rc;
+    int32_t size = 0;
+
+    /* construct the buffer we will use for packing the data */
+    OBJ_CONSTRUCT(&buf, opal_buffer_t);
+    key.dptr = inkey;
+    key.dsize = strlen(inkey);
+    data.dptr = NULL;
+    data.dsize = 0;
+
+    switch (type) {
+    case ORTE_JOB:
+        jdata = (orte_job_t*)object;
+        rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB);
+        break;
+    case ORTE_PROC:
+        proc = (orte_proc_t*)object;
+        rc = opal_dss.pack(&buf, &proc, 1, ORTE_PROC);
+        break;
+    default:
+        /* NOTE(review): every other message in this file is read from
+         * "help-db-dbm.txt" - confirm this file name is intended */
+        orte_show_help("help-db-db.txt", "unrecognized-type", true, type);
+        rc = ORTE_ERR_BAD_PARAM;
+        break;
+    }
+    if (ORTE_SUCCESS != rc) {
+        goto cleanup;
+    }
+
+    /* unload the data - the packed bytes are ours to free from here on */
+    if (ORTE_SUCCESS != (rc = opal_dss.unload(&buf, (void**)&data.dptr, &size))) {
+        goto cleanup;
+    }
+    data.dsize = size;
+
+    /* put the info into the dbm */
+    if (0 > dbm_store(save_dbm, key, data, DBM_REPLACE)) {
+        orte_show_help("help-db-dbm.txt", "error-writing-dbm", true, (char*)key.dptr, strerror(errno));
+        rc = ORTE_ERR_FILE_WRITE_FAILURE;
+    }
+
+cleanup:
+    /* The buffer is destructed on every path, including the
+     * unrecognized-type one.  key.dptr aliases the caller's inkey and
+     * must NOT be freed here; only the unloaded payload belongs to us. */
+    OBJ_DESTRUCT(&buf);
+    free(data.dptr);
+    return rc;
+}
+
+/*
+ * Open, read-only, the database previously written by process "name"
+ * so that select_data() can read from it.  A source opened by an
+ * earlier call is closed first.
+ *
+ * Returns ORTE_SUCCESS or ORTE_ERR_FILE_OPEN_FAILURE.
+ */
+static int set_recover_source(orte_process_name_t *name)
+{
+    char *path, *pname;
+    int rc=ORTE_SUCCESS;
+
+    /* drop any previously selected source so its handle is not leaked */
+    if (NULL != recover_dbm) {
+        dbm_close(recover_dbm);
+        recover_dbm = NULL;
+    }
+
+    /* setup the database */
+    orte_util_convert_process_name_to_string(&pname, name);
+    path = opal_os_path(false, orte_db_dbm_directory, pname, NULL);
+    free(pname);
+    if (NULL == (recover_dbm = dbm_open(path, O_RDONLY, S_IRWXU))) {
+        orte_show_help("help-db-dbm.txt", "cannot-open-dbm", true, path);
+        free(path);
+        return ORTE_ERR_FILE_OPEN_FAILURE;
+    }
+    free(path);
+
+    return rc;
+}
+
+/*
+ * Read the record stored under inkey from the recovery database and
+ * unpack it as an ORTE_JOB or ORTE_PROC.
+ *
+ * Returns ORTE_ERR_NOT_FOUND if set_recover_source() has not provided
+ * a database, ORTE_ERR_FILE_READ_FAILURE if the key cannot be fetched,
+ * ORTE_ERR_OUT_OF_RESOURCE if the record cannot be copied, otherwise
+ * the result of the unpack (ORTE_SUCCESS for an unrecognized type,
+ * which unpacks nothing).
+ *
+ * NOTE(review): the unpacked job/proc is never handed back through
+ * "object" - the intended output contract needs to be confirmed.
+ */
+static int select_data(char *inkey, void *object, opal_data_type_t type)
+{
+    datum key, data;
+    opal_buffer_t buf;
+    orte_job_t *jdata;
+    orte_proc_t *proc;
+    void *payload;
+    int rc=ORTE_SUCCESS;
+    int32_t n;
+
+    /* without a recovery source there is nothing to read - stop here
+     * instead of handing a NULL handle to dbm_fetch() */
+    if (NULL == recover_dbm) {
+        orte_show_help("help-db-dbm.txt", "recover-source-undef", true);
+        return ORTE_ERR_NOT_FOUND;
+    }
+
+    key.dptr = inkey;
+    key.dsize = strlen(inkey);
+
+    /* get the specified data */
+    data = dbm_fetch(recover_dbm, key);
+    if (NULL == data.dptr) {
+        orte_show_help("help-db-dbm.txt", "error-reading-dbm", true, (char*)key.dptr, strerror(errno));
+        return ORTE_ERR_FILE_READ_FAILURE;
+    }
+
+    /* The fetched bytes belong to the dbm library, while the buffer
+     * frees whatever it was loaded with when it is destructed - so
+     * give the buffer a private copy. */
+    if (NULL == (payload = malloc(data.dsize))) {
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+    memcpy(payload, data.dptr, data.dsize);
+
+    /* construct the buffer we will use for unpacking the data */
+    OBJ_CONSTRUCT(&buf, opal_buffer_t);
+
+    /* populate the recovered info */
+    opal_dss.load(&buf, payload, data.dsize);
+    switch (type) {
+    case ORTE_JOB:
+        n=1;
+        rc = opal_dss.unpack(&buf, &jdata, &n, ORTE_JOB);
+        break;
+    case ORTE_PROC:
+        n=1;
+        rc = opal_dss.unpack(&buf, &proc, &n, ORTE_PROC);
+        break;
+    default:
+        break;
+    }
+
+    OBJ_DESTRUCT(&buf);
+    return rc;
+}
+
+/* Updating a stored record is not supported by this module. */
+static int update(char *key, void *object, opal_data_type_t type)
+{
+    return ORTE_ERR_NOT_IMPLEMENTED;
+}
+
+/* Deleting a stored record is not supported by this module. */
+static int delete_data(char *key)
+{
+    return ORTE_ERR_NOT_IMPLEMENTED;
+}
diff --git a/orte/mca/db/dbm/db_dbm.h b/orte/mca/db/dbm/db_dbm.h
new file mode 100644
index 0000000000..c7b68d5fbd
--- /dev/null
+++ b/orte/mca/db/dbm/db_dbm.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef ORTE_DB_DBM_H +#define ORTE_DB_DBM_H + +#include "orte/mca/db/db.h" + +BEGIN_C_DECLS + +/* + * Module open / close + */ +int orte_db_dbm_component_open(void); +int orte_db_dbm_component_close(void); +int orte_db_dbm_component_query(mca_base_module_t **module, int *priority); + + +ORTE_MODULE_DECLSPEC extern orte_db_base_component_t mca_db_dbm_component; +ORTE_DECLSPEC extern orte_db_base_module_t orte_db_dbm_module; +extern char *orte_db_dbm_directory; + +END_C_DECLS + +#endif /* ORTE_DB_DBM_H */ diff --git a/orte/mca/db/dbm/db_dbm_component.c b/orte/mca/db/dbm/db_dbm_component.c new file mode 100644 index 0000000000..6e2a1c9161 --- /dev/null +++ b/orte/mca/db/dbm/db_dbm_component.c @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * These symbols are in a file by themselves to provide nice linker + * semantics. Since linkers generally pull in symbols by object + * files, keeping these symbols as the only symbols in this file + * prevents utility programs such as "ompi_info" from having to import + * entire components just to query their version and parameters. 
+ */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/mca/base/base.h" +#include "opal/mca/base/mca_base_param.h" + +#include "orte/util/proc_info.h" + +#include "orte/mca/db/db.h" +#include "orte/mca/db/base/base.h" +#include "db_dbm.h" + +extern orte_db_base_module_t orte_db_dbm_module; +char *orte_db_dbm_directory; /* storage for the "dir" MCA parameter registered in the query function below */ + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +orte_db_base_component_t mca_db_dbm_component = { + { + ORTE_DB_BASE_VERSION_1_0_0, + + /* Component name and version */ + "dbm", + ORTE_MAJOR_VERSION, + ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION, + + /* Component open and close functions */ + orte_db_dbm_component_open, + orte_db_dbm_component_close, + orte_db_dbm_component_query + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + } +}; + + +int +orte_db_dbm_component_open(void) +{ /* nothing to set up when the component is opened */ + return ORTE_SUCCESS; +} + + +int orte_db_dbm_component_query(mca_base_module_t **module, int *priority) +{ + mca_base_component_t *c = &mca_db_dbm_component.base_version; + + /* register the "dir" parameter (default NULL) and retrieve the name of the directory to be used */ + mca_base_param_reg_string(c, "dir", + "Name of directory to be used for storing and recovering db information", + false, false, NULL, &orte_db_dbm_directory); + + if (NULL != orte_db_dbm_directory) { /* a directory was given: offer the dbm module at priority 50 */ + *priority = 50; + *module = (mca_base_module_t*)&orte_db_dbm_module; + return ORTE_SUCCESS; + } + + /* no directory was given: report no module, priority 0, and ORTE_ERROR */ + + *priority = 0; + *module = NULL; + return ORTE_ERROR; +} + + +int +orte_db_dbm_component_close(void) +{ /* nothing to tear down when the component is closed */ + return ORTE_SUCCESS; +} + diff --git a/orte/mca/db/hash/Makefile.am b/orte/mca/db/hash/Makefile.am new file mode 100644 index 0000000000..d3898d60be --- /dev/null +++ b/orte/mca/db/hash/Makefile.am @@ -0,0 +1,36 @@ +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + db_hash.h \ + db_hash_component.c \ + db_hash.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if MCA_BUILD_orte_db_hash_DSO +component_noinst = +component_install = mca_db_hash.la +else +component_noinst = libmca_db_hash.la +component_install = +endif + +mcacomponentdir = $(pkglibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_db_hash_la_SOURCES = $(sources) +mca_db_hash_la_LDFLAGS = -module -avoid-version +mca_db_hash_la_LIBADD = $(db_hash_LIBS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_db_hash_la_SOURCES =$(sources) +libmca_db_hash_la_LDFLAGS = -module -avoid-version diff --git a/orte/mca/db/hash/configure.m4 b/orte/mca/db/hash/configure.m4 new file mode 100644 index 0000000000..c7b6da711b --- /dev/null +++ b/orte/mca/db/hash/configure.m4 @@ -0,0 +1,21 @@ +# -*- shell-script -*- +# +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_db_hash_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_orte_db_hash_CONFIG], [ + AC_CONFIG_FILES([orte/mca/db/hash/Makefile]) + + # do not build if rte is disabled + AS_IF([test "$orte_without_full_support" = 0], + [$1], + [$2]) +])dnl diff --git a/orte/mca/db/hash/db_hash.c b/orte/mca/db/hash/db_hash.c new file mode 100644 index 0000000000..480ac01906 --- /dev/null +++ b/orte/mca/db/hash/db_hash.c @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. 
All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include + +#include "opal/class/opal_hash_table.h" +#include "opal/class/opal_pointer_array.h" +#include "opal/dss/dss_types.h" +#include "opal/util/output.h" + +#include "orte/util/show_help.h" +#include "orte/util/name_fns.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/runtime/orte_globals.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/db/base/base.h" +#include "db_hash.h" + +static int init(void); +static void finalize(void); +static int store(const orte_process_name_t *proc, + const char *key, const void *object, size_t size); +static int fetch(const orte_process_name_t *proc, + const char *key, + opal_list_t *values); +static int remove_data(const orte_process_name_t *proc, const char *key); + +orte_db_base_module_t orte_db_hash_module = { + init, + finalize, + store, + fetch, + remove_data +}; + +/* Local "globals" */ +static opal_pointer_array_t job_data; + +/** + * Data for a particular orte process + * The name association is maintained in the + * proc_data hash table. + */ +typedef struct { + /** Structure can be put on lists (including in hash tables) */ + opal_list_item_t super; + /* List of local_data_t structures containing all data + received from this process, sorted by key. 
*/ + opal_list_t data; +} proc_data_t; + +static void proc_data_construct(proc_data_t *ptr) +{ + OBJ_CONSTRUCT(&ptr->data, opal_list_t); +} + +static void proc_data_destruct(proc_data_t *ptr) +{ + opal_list_item_t *item; + + while (NULL != (item = opal_list_remove_first(&ptr->data))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&ptr->data); +} +OBJ_CLASS_INSTANCE(proc_data_t, opal_list_item_t, + proc_data_construct, proc_data_destruct); + + +/* Data for a given job + */ +typedef struct { + opal_object_t super; + orte_jobid_t jobid; + opal_hash_table_t *data; +} job_data_t; + +static void jobdata_constructor(job_data_t *ptr) +{ + ptr->jobid = ORTE_JOBID_INVALID; + ptr->data = OBJ_NEW(opal_hash_table_t); + opal_hash_table_init(ptr->data, 256); +} +static void jobdata_destructor(job_data_t *ptr) +{ + opal_hash_table_remove_all(ptr->data); + OBJ_RELEASE(ptr->data); +} +OBJ_CLASS_INSTANCE(job_data_t, + opal_object_t, + jobdata_constructor, + jobdata_destructor); + + +static int init(void) +{ + OBJ_CONSTRUCT(&job_data, opal_pointer_array_t); + opal_pointer_array_init(&job_data, 1, INT_MAX, 1); + return ORTE_SUCCESS; +} + +static void finalize(void) +{ + int i; + job_data_t *jtable; + + for (i=0; i < job_data.size; i++) { + if (NULL == (jtable = (job_data_t*)opal_pointer_array_get_item(&job_data, i))) { + continue; + } + OBJ_RELEASE(jtable); + } + OBJ_DESTRUCT(&job_data); +} + + + +/** + * Find data for a given key in a given proc_data_t + * container. + */ +static orte_db_keyval_t* lookup_keyval(proc_data_t *proc_data, + const char *key) +{ + orte_db_keyval_t *kv = NULL; + for (kv = (orte_db_keyval_t *) opal_list_get_first(&proc_data->data); + kv != (orte_db_keyval_t *) opal_list_get_end(&proc_data->data); + kv = (orte_db_keyval_t *) opal_list_get_next(kv)) { + if (0 == strcmp(key, kv->key)) { + return kv; + } + } + + return NULL; +} + + +/** +* Find proc_data_t container associated with given + * orte_process_name_t. 
+ */
+static proc_data_t* lookup_orte_proc(opal_hash_table_t *jtable, orte_vpid_t vpid)
+{
+    proc_data_t *proc_data = NULL;
+
+    opal_hash_table_get_value_uint32(jtable, orte_util_hash_vpid(vpid), (void**)&proc_data);
+    if (NULL == proc_data) {
+        /* The proc clearly exists, so create a data structure for it */
+        proc_data = OBJ_NEW(proc_data_t);
+        if (NULL == proc_data) {
+            opal_output(0, "db:hash:lookup_orte_proc: unable to allocate proc_data_t\n");
+            return NULL;
+        }
+        opal_hash_table_set_value_uint32(jtable, orte_util_hash_vpid(vpid), proc_data);
+    }
+
+    return proc_data;
+}
+
+/**
+ * Find the job_data_t tracking the given jobid in the job_data array.
+ * When a match is found and index is non-NULL, the slot number is
+ * written to *index.  Returns NULL if the job has no entry.
+ */
+static job_data_t* lookup_job(orte_jobid_t jobid, int *index)
+{
+    int i;
+    job_data_t *jtab;
+
+    for (i=0; i < job_data.size; i++) {
+        if (NULL == (jtab = (job_data_t*)opal_pointer_array_get_item(&job_data, i))) {
+            continue;
+        }
+        if (jtab->jobid == jobid) {
+            if (NULL != index) {
+                *index = i;
+            }
+            return jtab;
+        }
+    }
+
+    return NULL;
+}
+
+/**
+ * Build an independent copy (key string and value bytes) of a stored
+ * keyval so it can be handed to a caller.  Returns NULL if any
+ * allocation fails; nothing is leaked in that case.
+ */
+static orte_db_keyval_t* copy_keyval(const orte_db_keyval_t *src)
+{
+    orte_db_keyval_t *dup;
+    char *name = NULL;
+    uint8_t *bytes = NULL;
+
+    if (NULL != src->key && NULL == (name = strdup(src->key))) {
+        return NULL;
+    }
+    if (0 < src->value.size) {
+        if (NULL == (bytes = (uint8_t*)malloc(src->value.size))) {
+            free(name);
+            return NULL;
+        }
+        memcpy(bytes, src->value.bytes, src->value.size);
+    }
+    if (NULL == (dup = OBJ_NEW(orte_db_keyval_t))) {
+        free(name);
+        free(bytes);
+        return NULL;
+    }
+    dup->key = name;
+    dup->value.bytes = bytes;
+    dup->value.size = src->value.size;
+
+    return dup;
+}
+
+/**
+ * Store a private copy of "size" bytes from "object" under "key" for
+ * the given proc, replacing any value already stored for that key.
+ * Job and proc containers are created on first use.
+ *
+ * Returns ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if an allocation
+ * fails - in which case a previously stored value is left untouched.
+ */
+static int store(const orte_process_name_t *proc,
+                 const char *key, const void *object, size_t size)
+{
+    job_data_t *jtable;
+    proc_data_t *proc_data;
+    orte_db_keyval_t *kv;
+    uint8_t *bytes = NULL;
+
+    OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
+                         "%s db:hash:store: storing key %s data size %lu for proc %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         key, (unsigned long)size, ORTE_NAME_PRINT(proc)));
+
+    /* copy the new value first, so that an allocation failure cannot
+     * leave an existing entry pointing at freed memory */
+    if (0 < size) {
+        if (NULL == (bytes = (uint8_t*)malloc(size))) {
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        memcpy(bytes, object, size);
+    }
+
+    /* get the job data object for this proc */
+    if (NULL == (jtable = lookup_job(proc->jobid, NULL))) {
+        /* need to add an entry for this job */
+        if (NULL == (jtable = OBJ_NEW(job_data_t))) {
+            free(bytes);
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        jtable->jobid = proc->jobid;
+        if (0 > opal_pointer_array_add(&job_data, jtable)) {
+            OBJ_RELEASE(jtable);
+            free(bytes);
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+    }
+
+    /* lookup the proc data object for this proc */
+    if (NULL == (proc_data = lookup_orte_proc(jtable->data, proc->vpid))) {
+        /* unrecoverable error */
+        free(bytes);
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+
+    /* see if we already have this key in the data - means we are updating
+     * a pre-existing value
+     */
+    if (NULL != (kv = lookup_keyval(proc_data, key))) {
+        /* release the old data */
+        free(kv->value.bytes);
+    } else {
+        if (NULL == (kv = OBJ_NEW(orte_db_keyval_t))) {
+            free(bytes);
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        if (NULL == (kv->key = strdup(key))) {
+            OBJ_RELEASE(kv);
+            free(bytes);
+            return ORTE_ERR_OUT_OF_RESOURCE;
+        }
+        opal_list_append(&proc_data->data, &kv->super);
+    }
+    kv->value.bytes = bytes;
+    kv->value.size = size;
+
+    return ORTE_SUCCESS;
+}
+
+/**
+ * Append copies of the data stored for "proc" to the "values" list:
+ * the single entry matching "key", or every entry when key is NULL.
+ * Each returned keyval carries its own key string and value bytes.
+ *
+ * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND if the job or the key is
+ * unknown, or ORTE_ERR_OUT_OF_RESOURCE if a copy cannot be allocated.
+ */
+static int fetch(const orte_process_name_t *proc,
+                 const char *key,
+                 opal_list_t *values)
+{
+    job_data_t *jtable;
+    proc_data_t *proc_data;
+    orte_db_keyval_t *kv, *ans;
+
+    OPAL_OUTPUT_VERBOSE((5, orte_db_base.output,
+                         "%s db:hash:fetch: searching for key %s on proc %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         (NULL == key) ? "NULL" : key, ORTE_NAME_PRINT(proc)));
+
+    /* get the job data object for this proc */
+    if (NULL == (jtable = lookup_job(proc->jobid, NULL))) {
+        /* eventually, we will fetch this data - but for now, this
+         * is simply an error
+         */
+        return ORTE_ERR_NOT_FOUND;
+    }
+
+    /* lookup the proc data object for this proc */
+    if (NULL == (proc_data = lookup_orte_proc(jtable->data, proc->vpid))) {
+        /* unrecoverable error */
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+
+    /* if the key is NULL, then return all data for this proc */
+    if (NULL == key) {
+        for (kv = (orte_db_keyval_t *) opal_list_get_first(&proc_data->data);
+             kv != (orte_db_keyval_t *) opal_list_get_end(&proc_data->data);
+             kv = (orte_db_keyval_t *) opal_list_get_next(kv)) {
+            if (NULL == (ans = copy_keyval(kv))) {
+                return ORTE_ERR_OUT_OF_RESOURCE;
+            }
+            opal_list_append(values, &ans->super);
+        }
+        return ORTE_SUCCESS;
+    }
+
+    /* find the value */
+    if (NULL == (kv = lookup_keyval(proc_data, key))) {
+        /* again, we eventually will attempt to fetch the data - for
+         * now, just report it as an error */
+        return ORTE_ERR_NOT_FOUND;
+    }
+
+    /* copy the data across - the key is included, as it is when all
+     * entries are returned */
+    if (NULL == (ans = copy_keyval(kv))) {
+        return ORTE_ERR_OUT_OF_RESOURCE;
+    }
+    opal_list_append(values, &ans->super);
+
+    return ORTE_SUCCESS;
+}
+
+/**
+ * Remove stored data:
+ *   - proc == NULL          : everything in the database
+ *   - vpid is the WILDCARD  : everything for proc->jobid
+ *   - key == NULL           : everything for that proc
+ *   - otherwise             : the single entry stored under key
+ * Asking to remove data that is not present is not an error.
+ */
+static int remove_data(const orte_process_name_t *proc, const char *key)
+{
+    int i, save_loc = -1;
+    job_data_t *jtable;
+    proc_data_t *proc_data;
+    orte_db_keyval_t *kv;
+
+    /* if proc is NULL, remove all data from the database */
+    if (NULL == proc) {
+        for (i=0; i < job_data.size; i++) {
+            if (NULL == (jtable = (job_data_t*)opal_pointer_array_get_item(&job_data, i))) {
+                continue;
+            }
+            /* clear the slot before releasing so that later lookups
+             * never see a pointer to a freed job table */
+            opal_pointer_array_set_item(&job_data, i, NULL);
+            OBJ_RELEASE(jtable);
+        }
+        /* leave the job pointer array itself as we may add data back */
+        return ORTE_SUCCESS;
+    }
+
+    /* lookup the specified jobid */
+    if (NULL == (jtable = lookup_job(proc->jobid, &save_loc))) {
+        /* don't have any data for this job */
+        return ORTE_SUCCESS;
+    }
+
+    /* if vpid is WILDCARD, remove all data for this job */
+    if (ORTE_VPID_WILDCARD == proc->vpid) {
+        opal_pointer_array_set_item(&job_data, save_loc, NULL);
+        OBJ_RELEASE(jtable);
+        return ORTE_SUCCESS;
+    }
+
+    /* lookup the specified proc - query the table directly so that a
+     * removal never creates an empty entry for an unknown proc */
+    proc_data = NULL;
+    opal_hash_table_get_value_uint32(jtable->data, orte_util_hash_vpid(proc->vpid), (void**)&proc_data);
+    if (NULL == proc_data) {
+        /* no data for this proc */
+        return ORTE_SUCCESS;
+    }
+
+    /* if key is NULL, remove all data for this proc */
+    if (NULL == key) {
+        while (NULL != (kv = (orte_db_keyval_t *) opal_list_remove_first(&proc_data->data))) {
+            OBJ_RELEASE(kv);
+        }
+        /* remove the proc_data object itself from the jtable */
+        opal_hash_table_remove_value_uint32(jtable->data, orte_util_hash_vpid(proc->vpid));
+        /* cleanup */
+        OBJ_RELEASE(proc_data);
+        return ORTE_SUCCESS;
+    }
+
+    /* remove this item - it must be unlinked from the list before it
+     * is released, otherwise the list keeps a pointer to freed memory */
+    if (NULL != (kv = lookup_keyval(proc_data, key))) {
+        opal_list_remove_item(&proc_data->data, &kv->super);
+        OBJ_RELEASE(kv);
+    }
+
+    return ORTE_SUCCESS;
+}
+
diff --git a/orte/mca/db/hash/db_hash.h b/orte/mca/db/hash/db_hash.h
new file mode 100644
index 0000000000..dd4c9849c2
--- /dev/null
+++ b/orte/mca/db/hash/db_hash.h
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
+ * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef ORTE_DB_HASH_H
+#define ORTE_DB_HASH_H
+
+#include "orte/mca/db/db.h"
+
+BEGIN_C_DECLS
+
+
+ORTE_MODULE_DECLSPEC extern orte_db_base_component_t mca_db_hash_component;
+ORTE_DECLSPEC extern orte_db_base_module_t orte_db_hash_module;
+
+END_C_DECLS
+
+#endif /* ORTE_DB_HASH_H */
diff --git a/orte/mca/db/hash/db_hash_component.c b/orte/mca/db/hash/db_hash_component.c
new file mode 100644
index 0000000000..96a63296d0
--- /dev/null
+++ b/orte/mca/db/hash/db_hash_component.c
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
+ * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ *
+ * These symbols are in a file by themselves to provide nice linker
+ * semantics. Since linkers generally pull in symbols by object
+ * files, keeping these symbols as the only symbols in this file
+ * prevents utility programs such as "ompi_info" from having to import
+ * entire components just to query their version and parameters.
+ */ + +#include "orte_config.h" +#include "orte/constants.h" + +#include "opal/mca/base/base.h" + +#include "orte/mca/db/db.h" +#include "orte/mca/db/base/base.h" +#include "db_hash.h" + +static int db_hash_component_open(void); +static int db_hash_component_query(mca_base_module_t **module, int *priority); +static int db_hash_component_close(void); + +/* + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +orte_db_base_component_t mca_db_hash_component = { + { + ORTE_DB_BASE_VERSION_1_0_0, + + /* Component name and version */ + "hash", + ORTE_MAJOR_VERSION, + ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION, + + /* Component open and close functions */ + db_hash_component_open, + db_hash_component_close, + db_hash_component_query + }, + { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + } +}; + + +static int db_hash_component_open(void) +{ /* nothing to set up when the component is opened */ + return ORTE_SUCCESS; +} + +static int db_hash_component_query(mca_base_module_t **module, int *priority) +{ + /* this is the default module: it is offered unconditionally, at priority 1 */ + *priority = 1; + *module = (mca_base_module_t*)&orte_db_hash_module; + return ORTE_SUCCESS; +} + + +static int db_hash_component_close(void) +{ /* nothing to tear down when the component is closed */ + return ORTE_SUCCESS; +} + diff --git a/orte/mca/ess/base/ess_base_std_app.c b/orte/mca/ess/base/ess_base_std_app.c index 8c33f3cbc4..753a7d8fea 100644 --- a/orte/mca/ess/base/ess_base_std_app.c +++ b/orte/mca/ess/base/ess_base_std_app.c @@ -10,7 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2010-2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. 
* $COPYRIGHT$ * @@ -41,6 +41,7 @@ #include "orte/mca/rml/base/base.h" #include "orte/mca/routed/base/base.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/db/base/base.h" #include "orte/mca/grpcomm/base/base.h" #include "orte/mca/rml/rml.h" #include "orte/mca/odls/odls_types.h" @@ -140,6 +141,18 @@ int orte_ess_base_app_setup(void) goto error; } + /* database */ + if (ORTE_SUCCESS != (ret = orte_db_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_db_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_select"; + goto error; + } + /* * Group communications */ @@ -302,6 +315,7 @@ int orte_ess_base_app_finalize(void) /* now can close the rml and its friendly group comm */ orte_grpcomm_base_close(); + orte_db_base_close(); orte_routed_base_close(); orte_rml_base_close(); diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index d9a971bc8c..33e615155c 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -11,8 +11,8 @@ * All rights reserved. * Copyright (c) 2009 Institut National de Recherche en Informatique * et Automatique. All rights reserved. - * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. 
* $COPYRIGHT$ * @@ -44,6 +44,7 @@ #include "orte/mca/rml/base/base.h" #include "orte/mca/routed/base/base.h" #include "orte/mca/routed/routed.h" +#include "orte/mca/db/base/base.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/mca/grpcomm/base/base.h" #include "orte/mca/iof/base/base.h" @@ -264,6 +265,18 @@ int orte_ess_base_orted_setup(char **hosts) goto error; } + /* database */ + if (ORTE_SUCCESS != (ret = orte_db_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_db_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_select"; + goto error; + } + /* * Group communications */ diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c index 6b05d8ace3..4b7efbfc72 100644 --- a/orte/mca/ess/hnp/ess_hnp_module.c +++ b/orte/mca/ess/hnp/ess_hnp_module.c @@ -50,6 +50,7 @@ #include "orte/mca/rml/rml_types.h" #include "orte/mca/routed/base/base.h" #include "orte/mca/routed/routed.h" +#include "orte/mca/db/base/base.h" #include "orte/mca/errmgr/base/base.h" #include "orte/mca/grpcomm/base/base.h" #include "orte/mca/iof/base/base.h" @@ -309,6 +310,18 @@ static int rte_init(void) goto error; } + /* database */ + if (ORTE_SUCCESS != (ret = orte_db_base_open())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_db_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_db_base_select"; + goto error; + } + /* * Group communications */ diff --git a/orte/mca/grpcomm/bad/grpcomm_bad_module.c b/orte/mca/grpcomm/bad/grpcomm_bad_module.c index 2ef16adc77..a32bbc6ed8 100644 --- a/orte/mca/grpcomm/bad/grpcomm_bad_module.c +++ b/orte/mca/grpcomm/bad/grpcomm_bad_module.c @@ -75,11 +75,6 @@ static int init(void) { int rc; - if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* setup recvs */ if (ORTE_SUCCESS != (rc = orte_grpcomm_base_comm_start())) { 
ORTE_ERROR_LOG(rc); @@ -93,8 +88,6 @@ static int init(void) */ static void finalize(void) { - orte_grpcomm_base_modex_finalize(); - /* cancel recv */ orte_grpcomm_base_comm_stop(); } diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index aa69b25728..4ad68e5cc6 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -82,20 +82,16 @@ ORTE_DECLSPEC void orte_grpcomm_base_rollup_recv(int status, orte_process_name_t ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name, const void *data, size_t size); -ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, - const char * attribute_name, void **val, +ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t *proc, + const char *attribute_name, void **val, size_t *size); ORTE_DECLSPEC void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata); ORTE_DECLSPEC void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata); ORTE_DECLSPEC int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex); ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void); -ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void); -ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void); ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf); ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, opal_buffer_t *rbuf); -ORTE_DECLSPEC int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc, char *attribute_name, - void *data, int num_bytes); /* comm support */ ORTE_DECLSPEC int orte_grpcomm_base_comm_start(void); diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index d46e673b4d..5b536fd300 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -37,6 +37,7 @@ #include "opal/mca/hwloc/base/base.h" #include "orte/util/proc_info.h" +#include 
"orte/mca/db/db.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" #include "orte/runtime/orte_globals.h" @@ -428,412 +429,19 @@ void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata) } } -/** - * MODEX DATABASE DESIGN - * - * Modex data is always associated with a given orte process name, in - * an opal hash table. The hash table is necessary because modex data is - * received for entire jobids and when working with - * dynamic processes, it is possible we will receive data for a - * process not yet in the ompi_proc_all() list of processes. This - * information must be kept for later use, because if accept/connect - * causes the proc to be added to the ompi_proc_all() list, it could - * cause a connection storm. Therefore, we use an - * opal_hast_table_t backing store to contain all modex information. - * - * While we could add the now discovered proc into the ompi_proc_all() - * list, this has some problems, in that we don't have the - * architecture and hostname information needed to properly fill in - * the ompi_proc_t structure and we don't want to cause RML - * communication to get it when we don't really need to know anything - * about the remote proc. - * - * All data put into the modex (or received from the modex) is - * associated with a given proc,attr_name pair. 
The data structures - * to maintain this data look something like: - * - * opal_hash_table_t modex_data -> list of attr_proc_t objects - * - * +-----------------------------+ - * | modex_proc_data_t | - * | - opal_list_item_t | - * +-----------------------------+ - * | opal_mutex_t modex_lock | - * | bool modex_received_data | 1 - * | opal_list_t modules | ---------+ - * +-----------------------------+ | - * * | - * +--------------------------------+ <--------+ - * | modex_module_data_t | - * | - opal_list_item_t | - * +--------------------------------+ - * | mca_base_component_t component | - * | void *module_data | - * | size_t module_data_size | - * +--------------------------------+ - * - */ -/* Local "globals" */ -static orte_std_cntr_t num_entries; -static opal_buffer_t *modex_buffer; -static opal_hash_table_t *modex_data; -static opal_mutex_t mutex; -static opal_condition_t cond; - -/** - * Modex data for a particular orte process - * - * Locking infrastructure and list of module data for a given orte - * process name. The name association is maintained in the - * modex_data hash table. - */ -struct modex_proc_data_t { - /** Structure can be put on lists (including in hash tables) */ - opal_list_item_t super; - /* Lock held whenever the modex data for this proc is being - modified */ - opal_mutex_t modex_lock; - /* True if modex data has ever been received from this process, - false otherwise. */ - bool modex_received_data; - /* List of modex_module_data_t structures containing all data - received from this process, sorted by component name. 
*/ - opal_list_t modex_module_data; -}; -typedef struct modex_proc_data_t modex_proc_data_t; - -static void -modex_construct(modex_proc_data_t * modex) -{ - OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); - modex->modex_received_data = false; - OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t); -} - -static void -modex_destruct(modex_proc_data_t * modex) -{ - OBJ_DESTRUCT(&modex->modex_module_data); - OBJ_DESTRUCT(&modex->modex_lock); -} - -OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, - modex_construct, modex_destruct); - - - -/** - * Data for a particular attribute - * - * Container for data for a particular module,attribute pair. This - * structure should be contained in the modex_module_data list in an - * modex_proc_data_t structure to maintain an association with a - * given proc. The list is then searched for a matching attribute - * name. - * - * While searching the list or reading from (or writing to) this - * structure, the lock in the proc_data_t should be held. - */ -struct modex_attr_data_t { - /** Structure can be put on lists */ - opal_list_item_t super; - /** Attribute name */ - char * attr_name; - /** Binary blob of data associated with this proc,component pair */ - void *attr_data; - /** Size (in bytes) of module_data */ - size_t attr_data_size; -}; -typedef struct modex_attr_data_t modex_attr_data_t; - -static void -modex_attr_construct(modex_attr_data_t * module) -{ - module->attr_name = NULL; - module->attr_data = NULL; - module->attr_data_size = 0; -} - -static void -modex_attr_destruct(modex_attr_data_t * module) -{ - if (NULL != module->attr_name) { - free(module->attr_name); - } - if (NULL != module->attr_data) { - free(module->attr_data); - } -} - -OBJ_CLASS_INSTANCE(modex_attr_data_t, - opal_list_item_t, - modex_attr_construct, - modex_attr_destruct); - - -/** - * Find data for a given attribute in a given modex_proc_data_t - * container. - * - * The proc_data's modex_lock must be held during this - * search. 
- */ -static modex_attr_data_t * -modex_lookup_attr_data(modex_proc_data_t *proc_data, - const char *attr_name, - bool create_if_not_found) -{ - modex_attr_data_t *attr_data = NULL; - for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); - attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); - attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { - if (0 == strcmp(attr_name, attr_data->attr_name)) { - return attr_data; - } - } - - if (create_if_not_found) { - attr_data = OBJ_NEW(modex_attr_data_t); - if (NULL == attr_data) return NULL; - - attr_data->attr_name = strdup(attr_name); - opal_list_append(&proc_data->modex_module_data, &attr_data->super); - - return attr_data; - } - - return NULL; -} - - -/** -* Find modex_proc_data_t container associated with given - * orte_process_name_t. - * - * The global lock should *NOT* be held when - * calling this function. - */ -static modex_proc_data_t* -modex_lookup_orte_proc(const orte_process_name_t *orte_proc) -{ - modex_proc_data_t *proc_data = NULL; - - OPAL_THREAD_LOCK(&mutex); - opal_hash_table_get_value_uint64(modex_data, - orte_util_hash_name(orte_proc), (void**)&proc_data); - if (NULL == proc_data) { - /* The proc clearly exists, so create a modex structure - for it */ - proc_data = OBJ_NEW(modex_proc_data_t); - if (NULL == proc_data) { - opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n"); - OPAL_THREAD_UNLOCK(&mutex); - return NULL; - } - opal_hash_table_set_value_uint64(modex_data, - orte_util_hash_name(orte_proc), proc_data); - } - OPAL_THREAD_UNLOCK(&mutex); - - return proc_data; -} - -int orte_grpcomm_base_modex_init(void) -{ - OBJ_CONSTRUCT(&mutex, opal_mutex_t); - OBJ_CONSTRUCT(&cond, opal_condition_t); - - modex_data = OBJ_NEW(opal_hash_table_t); - opal_hash_table_init(modex_data, 256); - num_entries = 0; - modex_buffer = OBJ_NEW(opal_buffer_t); - - return ORTE_SUCCESS; -} - -void 
orte_grpcomm_base_modex_finalize(void) -{ - OBJ_DESTRUCT(&mutex); - OBJ_DESTRUCT(&cond); - - opal_hash_table_remove_all(modex_data); - OBJ_RELEASE(modex_data); - - OBJ_RELEASE(modex_buffer); -} - -int orte_grpcomm_base_set_proc_attr(const char *attr_name, - const void *data, - size_t size) -{ - int rc; - - OPAL_THREAD_LOCK(&mutex); - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:set_proc_attr: setting attribute %s data size %lu", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - attr_name, (unsigned long)size)); - - /* Pack the attribute name information into the local buffer */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &attr_name, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the size */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &size, 1, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* Pack the actual data into the buffer */ - if (0 != size) { - if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, (void *) data, size, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* track the number of entries */ - ++num_entries; - -cleanup: - OPAL_THREAD_UNLOCK(&mutex); - - return rc; -} - -int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, - const char * attribute_name, void **val, - size_t *size) -{ - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: searching for attr %s on proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attribute_name, - ORTE_NAME_PRINT(&proc))); - - proc_data = modex_lookup_orte_proc(&proc); - if (NULL == proc_data) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: no modex entry for proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc))); - return ORTE_ERR_NOT_FOUND; - } - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* look up attribute */ - attr_data = modex_lookup_attr_data(proc_data, 
attribute_name, false); - - /* copy the data out to the user */ - if ((NULL == attr_data) || - (attr_data->attr_data_size == 0)) { - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc), attribute_name)); - *val = NULL; - *size = 0; - } else { - void *copy = malloc(attr_data->attr_data_size); - - if (copy == NULL) { - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - return ORTE_ERR_OUT_OF_RESOURCE; - } - memcpy(copy, attr_data->attr_data, attr_data->attr_data_size); - *val = copy; - *size = attr_data->attr_data_size; - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:get_proc_attr: found %d bytes for attr %s on proc %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr_data->attr_data_size, - attribute_name, ORTE_NAME_PRINT(&proc))); - - } - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - - return ORTE_SUCCESS; -} - - -int orte_grpcomm_base_purge_proc_attrs(void) -{ - /* - * Purge the attributes - */ - opal_hash_table_remove_all(modex_data); - OBJ_RELEASE(modex_data); - modex_data = OBJ_NEW(opal_hash_table_t); - opal_hash_table_init(modex_data, 256); - - /* - * Clear the modex buffer - */ - OBJ_RELEASE(modex_buffer); - num_entries = 0; - modex_buffer = OBJ_NEW(opal_buffer_t); - - return ORTE_SUCCESS; -} - -int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf) -{ - int rc; - - OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:pack_modex: reporting %ld entries", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)num_entries)); - - /* put the number of entries into the buffer */ - OPAL_THREAD_LOCK(&mutex); - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &num_entries, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* if there are entries, non-destructively copy the data across */ - if (0 < num_entries) { - if (ORTE_SUCCESS != (opal_dss.copy_payload(buf, modex_buffer))) { - 
ORTE_ERROR_LOG(rc); - } - } - -cleanup: - OPAL_THREAD_UNLOCK(&mutex); - return rc; -} int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, opal_buffer_t *rbuf) { - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; int rc = ORTE_SUCCESS; - orte_std_cntr_t num_recvd_entries; + int32_t num_recvd_entries; orte_std_cntr_t cnt; orte_std_cntr_t j; - /* look up the modex data structure */ - proc_data = modex_lookup_orte_proc(proc_name); - if (proc_data == NULL) { - /* report the error */ - opal_output(0, "grpcomm:base:update_modex: received modex info for unknown proc %s\n", - ORTE_NAME_PRINT(proc_name)); - return ORTE_ERR_NOT_FOUND; - } - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - /* unpack the number of entries for this proc */ cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_recvd_entries, &cnt, OPAL_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; } @@ -847,7 +455,7 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, * Extract the attribute names and values */ for (j = 0; j < num_recvd_entries; j++) { - size_t num_bytes; + int32_t num_bytes; void *bytes = NULL; char *attr_name; @@ -858,94 +466,164 @@ int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, } cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_bytes, &cnt, OPAL_INT32))) { ORTE_ERROR_LOG(rc); goto cleanup; } - if (num_bytes != 0) { + if (0 < num_bytes) { if (NULL == (bytes = malloc(num_bytes))) { ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); rc = ORTE_ERR_OUT_OF_RESOURCE; goto cleanup; } - cnt = (orte_std_cntr_t) num_bytes; + cnt = (orte_std_cntr_t)num_bytes; if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, bytes, &cnt, OPAL_BYTE))) { ORTE_ERROR_LOG(rc); goto cleanup; } num_bytes = cnt; + /* store it in the database - NOTE(review): bytes is never freed here; confirm orte_db.store takes ownership rather than copying */ + if
(ORTE_SUCCESS != (rc = orte_db.store(proc_name, attr_name, (void*)bytes, num_bytes))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } } - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, - attr_name, true))) { - opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n"); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - if (NULL != attr_data->attr_data) { - /* some pre-existing value must be here - release it */ - free(attr_data->attr_data); - } - attr_data->attr_data = bytes; - attr_data->attr_data_size = num_bytes; - proc_data->modex_received_data = true; } -cleanup: - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + cleanup: return rc; } -int orte_grpcomm_base_load_modex_data(orte_process_name_t *proc_name, char *attr_name, - void *data, int num_bytes) +int orte_grpcomm_base_set_proc_attr(const char *attr_name, + const void *data, + size_t size) { - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - int rc = ORTE_SUCCESS; - void *bytes; + int rc; OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, - "%s grpcomm:base:load_modex_data: loading %ld bytes for attr %s on proc %s", + "%s grpcomm:set_proc_attr: setting attribute %s data size %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)num_bytes, attr_name, ORTE_NAME_PRINT(proc_name))); + attr_name, (unsigned long)size)); - /* look up the modex data structure */ - proc_data = modex_lookup_orte_proc(proc_name); - if (proc_data == NULL) { - /* report the error */ - opal_output(0, "grpcomm:base:update_modex: received modex info for unknown proc %s\n", - ORTE_NAME_PRINT(proc_name)); - return ORTE_ERR_NOT_FOUND; - } + rc = orte_db.store(ORTE_PROC_MY_NAME, attr_name, data, size); - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, - attr_name, true))) { - opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data
failed\n"); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - if (NULL != attr_data->attr_data) { - /* some pre-existing value must be here - release it */ - free(attr_data->attr_data); - } - /* create space for the data - this is necessary since the data being - * passed to us may be static or released on the other end - */ - bytes = (void*)malloc(num_bytes); - memcpy(bytes, data, num_bytes); - attr_data->attr_data = bytes; - attr_data->attr_data_size = num_bytes; - proc_data->modex_received_data = true; - -cleanup: - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + return rc; +} + +/* + * Fetch one attribute of the given proc from the database. The value + * buffer is detached from the fetched entry and handed to the caller via + * val and size; both stay NULL/0 unless exactly one entry is fetched. + */ +int orte_grpcomm_base_get_proc_attr(const orte_process_name_t *proc, + const char *attribute_name, void **val, + size_t *size) +{ + orte_db_keyval_t *kv; + opal_list_t data; + int rc; + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s grpcomm:get_proc_attr: searching for attr %s on proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), attribute_name, + ORTE_NAME_PRINT(proc))); + + /* set defaults */ + *val = NULL; + *size = 0; + + /* fetch the data */ + OBJ_CONSTRUCT(&data, opal_list_t); + if (ORTE_SUCCESS != (rc = orte_db.fetch(proc, attribute_name, &data))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* this interface to the MPI layer only supports returning one value */ + if (1 < opal_list_get_size(&data)) { + rc = ORTE_ERROR; + goto cleanup; + } + /* an empty list leaves the NULL/0 defaults in place */ + if (NULL != (kv = (orte_db_keyval_t*)opal_list_remove_first(&data))) { + /* transfer the data */ + *val = kv->value.bytes; + *size = kv->value.size; + kv->value.bytes = NULL; + /* cleanup */ + OBJ_RELEASE(kv); + } + + cleanup: + /* release any entries still on the list before destructing it */ + while (NULL != (kv = (orte_db_keyval_t*)opal_list_remove_first(&data))) { + OBJ_RELEASE(kv); + } + OBJ_DESTRUCT(&data); + return rc; +} + +int orte_grpcomm_base_purge_proc_attrs(void) +{ + return orte_db.remove(NULL, NULL); +} + +/* + * Pack every database entry stored under our own name into buf: first + * the number of entries, then the key, size and (if any) bytes of each. + */ +int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf) +{ + int rc; + int32_t num_entries; + orte_db_keyval_t *kv; + opal_list_t data; + + /* fetch our data */ + OBJ_CONSTRUCT(&data, opal_list_t); + if (ORTE_SUCCESS != (rc = orte_db.fetch(ORTE_PROC_MY_NAME, NULL, &data))) { + ORTE_ERROR_LOG(rc); + goto
cleanup; + } + num_entries = opal_list_get_size(&data); + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, + "%s grpcomm:base:pack_modex: reporting %d entries", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_entries)); + + /* put the number of entries into the buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &num_entries, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* if there are entries, store them */ + while (NULL != (kv = (orte_db_keyval_t*)opal_list_remove_first(&data))) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &kv->key, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(kv); /* already off the list */ + break; + } + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &(kv->value.size), 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(kv); /* already off the list */ + break; + } + if (0 < kv->value.size) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, kv->value.bytes, kv->value.size, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(kv); /* already off the list */ + break; + } + } + OBJ_RELEASE(kv); + } + + cleanup: + while (NULL != (kv = (orte_db_keyval_t*)opal_list_remove_first(&data))) { + OBJ_RELEASE(kv); + } + OBJ_DESTRUCT(&data); + return rc; } diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index d8f79fde87..a3aae1424b 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -79,7 +79,7 @@ typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* att const void *buffer, size_t size); /* get an attribute buffer */ -typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name, +typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t *name, const char* attr_name, void **buffer, size_t *size); diff --git a/orte/tools/orte-info/components.c b/orte/tools/orte-info/components.c index 1b33c61ef5..6f9640dce7 100644 --- a/orte/tools/orte-info/components.c +++ b/orte/tools/orte-info/components.c @@ -56,6 +56,8 @@ #include "orte/mca/errmgr/base/base.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/mca/grpcomm/base/base.h" +#include
"orte/mca/db/db.h" +#include "orte/mca/db/base/base.h" #include "orte/mca/ess/ess.h" #include "orte/mca/ess/base/base.h" #include "orte/util/show_help.h" @@ -332,6 +334,14 @@ void orte_info_open_components(void) map->components = &orte_grpcomm_base.components_available; opal_pointer_array_add(&component_map, map); + if (ORTE_SUCCESS != orte_db_base_open()) { + goto error; + } + map = OBJ_NEW(orte_info_component_map_t); + map->type = strdup("db"); + map->components = &orte_db_base.available_components; + opal_pointer_array_add(&component_map, map); + if (ORTE_SUCCESS != orte_ess_base_open()) { goto error; } @@ -487,6 +497,7 @@ void orte_info_close_components() */ (void) orte_grpcomm_base_close(); + (void) orte_db_base_close(); (void) orte_ess_base_close(); (void) orte_show_help_finalize(); #if !ORTE_DISABLE_FULL_SUPPORT diff --git a/orte/tools/orte-info/orte-info.c b/orte/tools/orte-info/orte-info.c index b3b8e561f7..e9883c6df5 100644 --- a/orte/tools/orte-info/orte-info.c +++ b/orte/tools/orte-info/orte-info.c @@ -239,6 +239,7 @@ int main(int argc, char *argv[]) opal_pointer_array_add(&mca_types, "errmgr"); opal_pointer_array_add(&mca_types, "ess"); opal_pointer_array_add(&mca_types, "grpcomm"); + opal_pointer_array_add(&mca_types, "db"); opal_pointer_array_add(&mca_types, "notifier"); /* Execute the desired action(s) */ diff --git a/orte/util/name_fns.c b/orte/util/name_fns.c index 5e900beca2..3b31da7406 100644 --- a/orte/util/name_fns.c +++ b/orte/util/name_fns.c @@ -567,6 +567,22 @@ uint64_t orte_util_hash_name(const orte_process_name_t * name) { return hash; } +/* hash a vpid based on Robert Jenkin's algorithm - note + * that the precise values of the constants in the algo are + * irrelevant. 
+ */ +uint32_t orte_util_hash_vpid(orte_vpid_t vpid) { + uint32_t hash; + + hash = vpid; + hash = (hash + 0x7ed55d16) + (hash<<12); + hash = (hash ^ 0xc761c23c) ^ (hash>>19); + hash = (hash + 0x165667b1) + (hash<<5); + hash = (hash + 0xd3a2646c) ^ (hash<<9); + hash = (hash + 0xfd7046c5) + (hash<<3); + hash = (hash ^ 0xb55a4f09) ^ (hash>>16); + return hash; +} /* sysinfo conversion to and from string */ int orte_util_convert_string_to_sysinfo(char **cpu_type, char **cpu_model, diff --git a/orte/util/name_fns.h b/orte/util/name_fns.h index d5e35aa551..ad3acd90cd 100644 --- a/orte/util/name_fns.h +++ b/orte/util/name_fns.h @@ -125,6 +125,7 @@ ORTE_DECLSPEC int orte_util_compare_name_fields(orte_ns_cmp_bitmask_t fields, const orte_process_name_t* name2); /** This funtion returns a guaranteed unique hash value for the passed process name */ ORTE_DECLSPEC uint64_t orte_util_hash_name(const orte_process_name_t * name); +ORTE_DECLSPEC uint32_t orte_util_hash_vpid(orte_vpid_t vpid); ORTE_DECLSPEC int orte_util_convert_string_to_sysinfo(char **cpu_type, char **cpu_model, const char* sysinfo_string); ORTE_DECLSPEC int orte_util_convert_sysinfo_to_string(char** sysinfo_string,