1
1

Extend the db framework to add support for logging data to databases without duplicating all the modex-related storage.

This commit was SVN r27746.
Этот коммит содержится в:
Ralph Castain 2013-01-05 06:28:09 +00:00
родитель c4acd20eb9
Коммит c71e119bbb
21 изменённых файлов: 1176 добавлений и 35 удалений

Просмотреть файл

@ -16,4 +16,5 @@ headers += \
libmca_db_la_SOURCES += \
base/db_base_open.c \
base/db_base_close.c \
base/db_base_select.c
base/db_base_select.c \
base/db_base_fns.c

Просмотреть файл

@ -39,19 +39,41 @@ ORTE_DECLSPEC int orte_db_base_select(void);
*/
ORTE_DECLSPEC int orte_db_base_close(void);
typedef struct {
opal_list_item_t super;
int pri;
orte_db_base_module_t *module;
mca_base_component_t *component;
} orte_db_active_module_t;
OBJ_CLASS_DECLARATION(orte_db_active_module_t);
typedef struct {
int output;
opal_list_t available_components;
opal_list_t active_modules;
} orte_db_base_t;
ORTE_DECLSPEC extern orte_db_base_t orte_db_base;
ORTE_DECLSPEC int orte_db_base_send_modex_string(const char* key,
const void *buffer,
size_t size);
ORTE_DECLSPEC int orte_db_base_store(const orte_process_name_t *proc,
const char *key, const void *object,
opal_data_type_t type);
ORTE_DECLSPEC int orte_db_base_store_pointer(const orte_process_name_t *proc,
opal_value_t *kv);
ORTE_DECLSPEC int orte_db_base_fetch(const orte_process_name_t *proc,
const char *key, void **data,
opal_data_type_t type);
ORTE_DECLSPEC int orte_db_base_fetch_pointer(const orte_process_name_t *proc,
const char *key,
void **data, opal_data_type_t type);
ORTE_DECLSPEC int orte_db_base_fetch_multiple(const orte_process_name_t *proc,
const char *key,
opal_list_t *kvs);
ORTE_DECLSPEC int orte_db_base_remove_data(const orte_process_name_t *proc,
const char *key);
ORTE_DECLSPEC int orte_db_base_add_log(const char *table,
const opal_value_t *kvs, int nkvs);
ORTE_DECLSPEC int orte_db_base_send_modex_key_value(const char* key,
const void *value,
opal_data_type_t dtype);
END_C_DECLS

293
orte/mca/db/base/db_base_fns.c Обычный файл
Просмотреть файл

@ -0,0 +1,293 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/mca/mca.h"
#include "opal/util/output.h"
#include "opal/mca/base/base.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/db/base/base.h"
int orte_db_base_store(const orte_process_name_t *proc,
const char *key, const void *object,
opal_data_type_t type)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->store) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->store(proc, key, object, type))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_store_pointer(const orte_process_name_t *proc,
opal_value_t *kv)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->store_pointer) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->store_pointer(proc, kv))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_fetch(const orte_process_name_t *proc,
const char *key, void **data,
opal_data_type_t type)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->fetch) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->fetch(proc, key, data, type))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_fetch_pointer(const orte_process_name_t *proc,
const char *key,
void **data, opal_data_type_t type)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->fetch_pointer) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->fetch_pointer(proc, key, data, type))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_fetch_multiple(const orte_process_name_t *proc,
const char *key,
opal_list_t *kvs)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->fetch_multiple) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->fetch_multiple(proc, key, kvs))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_remove_data(const orte_process_name_t *proc,
const char *key)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the actiove modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->remove) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->remove(proc, key))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* if we get here without performing the operation, that's an error */
if (!did_op) {
ORTE_ERROR_LOG(ORTE_ERROR);
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}
int orte_db_base_add_log(const char *table,
const opal_value_t *kvs, int nkvs)
{
bool did_op;
opal_list_item_t *item;
orte_db_active_module_t *mod;
int rc;
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
for (item = opal_list_get_first(&orte_db_base.active_modules);
item != opal_list_get_end(&orte_db_base.active_modules);
item = opal_list_get_next(item)) {
mod = (orte_db_active_module_t*)item;
if (NULL == mod->module->add_log) {
continue;
}
if (ORTE_SUCCESS == (rc = mod->module->add_log(table, kvs, nkvs))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (ORTE_ERR_TAKE_NEXT_OPTION != rc) {
/* don't error log it here */
return rc;
}
}
/* if we get here without performing the operation, let the caller know */
if (!did_op) {
/* don't error log it here */
return ORTE_ERROR;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -28,7 +28,17 @@
#include "orte/mca/db/base/static-components.h"
orte_db_base_module_t orte_db;
orte_db_base_module_t orte_db = {
NULL,
NULL,
orte_db_base_store,
orte_db_base_store_pointer,
orte_db_base_fetch,
orte_db_base_fetch_pointer,
orte_db_base_fetch_multiple,
orte_db_base_remove_data,
orte_db_base_add_log
};
orte_db_base_t orte_db_base;
int orte_db_base_open(void)
@ -36,6 +46,7 @@ int orte_db_base_open(void)
orte_db_base.output = opal_output_open(NULL);
OBJ_CONSTRUCT(&orte_db_base.available_components, opal_list_t);
OBJ_CONSTRUCT(&orte_db_base.active_modules, opal_list_t);
/* Open up all available components */
if (ORTE_SUCCESS !=
@ -47,3 +58,7 @@ int orte_db_base_open(void)
return ORTE_SUCCESS;
}
OBJ_CLASS_INSTANCE(orte_db_active_module_t,
opal_list_item_t,
NULL, NULL);

Просмотреть файл

@ -1,5 +1,6 @@
/*
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -15,34 +16,107 @@
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_component_repository.h"
#include "orte/util/name_fns.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/db/base/base.h"
extern opal_list_t orte_db_base_components_available;
static bool selected = false;
int
orte_db_base_select(void)
{
orte_db_base_component_t *best_component = NULL;
orte_db_base_module_t *best_module = NULL;
opal_list_item_t *item, *itm2;
mca_base_component_list_item_t *cli = NULL;
mca_base_component_t *component = NULL;
mca_base_module_t *module = NULL;
orte_db_base_module_t *nmodule;
orte_db_active_module_t *newmodule, *mod;
int rc, priority;
bool inserted;
/*
* Select the best component
*/
if( OPAL_SUCCESS != mca_base_select("db", orte_db_base.output,
&orte_db_base.available_components,
(mca_base_module_t **) &best_module,
(mca_base_component_t **) &best_component) ) {
/* It is okay to not select a component - default
* to using the base NULL component
*/
if (selected) {
/* ensure we don't do this twice */
return ORTE_SUCCESS;
}
selected = true;
/* Query all available components and ask if they have a module */
for (item = opal_list_get_first(&orte_db_base.available_components);
opal_list_get_end(&orte_db_base.available_components) != item;
item = opal_list_get_next(item)) {
cli = (mca_base_component_list_item_t *) item;
component = (mca_base_component_t *) cli->cli_component;
/* Save and init the winner */
orte_db = *best_module;
if (NULL != orte_db.init) {
orte_db.init();
opal_output_verbose(5, orte_db_base.output,
"mca:db:select: checking available component %s", component->mca_component_name);
/* If there's no query function, skip it */
if (NULL == component->mca_query_component) {
opal_output_verbose(5, orte_db_base.output,
"mca:db:select: Skipping component [%s]. It does not implement a query function",
component->mca_component_name );
continue;
}
/* Query the component */
opal_output_verbose(5, orte_db_base.output,
"mca:db:select: Querying component [%s]",
component->mca_component_name);
rc = component->mca_query_component(&module, &priority);
/* If no module was returned, then skip component */
if (ORTE_SUCCESS != rc || NULL == module) {
opal_output_verbose(5, orte_db_base.output,
"mca:db:select: Skipping component [%s]. Query failed to return a module",
component->mca_component_name );
continue;
}
nmodule = (orte_db_base_module_t*) module;
/* attempt to initialize the module */
if (NULL != nmodule->init) {
if (ORTE_SUCCESS != (rc = nmodule->init())) {
/* skip the module */
continue;
}
}
/* If we got a module, add to the list of selected modules */
newmodule = OBJ_NEW(orte_db_active_module_t);
newmodule->pri = priority;
newmodule->module = nmodule;
newmodule->component = component;
/* maintain priority order */
inserted = false;
for (itm2 = opal_list_get_first(&orte_db_base.active_modules);
itm2 != opal_list_get_end(&orte_db_base.active_modules);
itm2 = opal_list_get_next(itm2)) {
mod = (orte_db_active_module_t*)itm2;
if (priority > mod->pri) {
opal_list_insert_pos(&orte_db_base.active_modules,
itm2, &newmodule->super);
inserted = true;
break;
}
}
if (!inserted) {
/* must be lowest priority - add to end */
opal_list_append(&orte_db_base.active_modules, &newmodule->super);
}
}
return ORTE_SUCCESS;
if (4 < opal_output_get_verbosity(orte_db_base.output)) {
opal_output(0, "%s: Final db priorities", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* show the prioritized list */
for (itm2 = opal_list_get_first(&orte_db_base.active_modules);
itm2 != opal_list_get_end(&orte_db_base.active_modules);
itm2 = opal_list_get_next(itm2)) {
mod = (orte_db_active_module_t*)itm2;
opal_output(0, "\tComponent: %s Priority: %d", mod->component->mca_component_name, mod->pri);
}
}
return ORTE_SUCCESS;;
}

Просмотреть файл

@ -105,6 +105,13 @@ typedef int (*orte_db_base_module_fetch_multiple_fn_t)(const orte_process_name_t
*/
typedef int (*orte_db_base_module_remove_fn_t)(const orte_process_name_t *proc, const char *key);
/*
* Log data
*
* Insert statistical, non-process oriented data into a logging system.
*/
typedef int (*orte_db_base_module_add_log_fn_t)(const char *table, const opal_value_t *kvs, int nkvs);
/*
* the standard module data structure
*/
@ -117,6 +124,7 @@ struct orte_db_base_module_1_0_0_t {
orte_db_base_module_fetch_pointer_fn_t fetch_pointer;
orte_db_base_module_fetch_multiple_fn_t fetch_multiple;
orte_db_base_module_remove_fn_t remove;
orte_db_base_module_add_log_fn_t add_log;
};
typedef struct orte_db_base_module_1_0_0_t orte_db_base_module_1_0_0_t;
typedef struct orte_db_base_module_1_0_0_t orte_db_base_module_t;

38
orte/mca/db/gpdb/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,38 @@
#
# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
db_gpdb.h \
db_gpdb_component.c \
db_gpdb.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_db_gpdb_DSO
component_noinst =
component_install = mca_db_gpdb.la
else
component_noinst = libmca_db_gpdb.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_db_gpdb_la_CPPFLAGS = $(db_gpdb_CPPFLAGS)
mca_db_gpdb_la_SOURCES = $(sources)
mca_db_gpdb_la_LDFLAGS = -module -avoid-version $(db_gpdb_LDFLAGS)
mca_db_gpdb_la_LIBADD = $(db_gpdb_LIBS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_db_gpdb_la_CPPFLAGS = $(db_gpdb_CPPFLAGS)
libmca_db_gpdb_la_SOURCES =$(sources)
libmca_db_gpdb_la_LDFLAGS = -module -avoid-version $(db_gpdb_LDFLAGS)
libmca_db_gpdb_la_LIBADD = $(db_gpdb_LIBS)

39
orte/mca/db/gpdb/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
dnl -*- shell-script -*-
dnl
dnl Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
dnl $COPYRIGHT$
dnl
dnl Additional copyrights may follow
dnl
dnl $HEADER$
dnl
# MCA_db_gpdb_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
AC_DEFUN([MCA_orte_db_gpdb_CONFIG], [
AC_CONFIG_FILES([orte/mca/db/gpdb/Makefile])
AC_ARG_WITH([gpdb],
[AC_HELP_STRING([--with-gpdb],
[Build gpdb support (default: no)])],
[], with_gpdb=no)
# do not build if rte is disabled or support not requested
AS_IF([test "$orte_without_full_support" = 0 -a "$with_gpdb" != "no"],
[AS_IF([test ! -z "$with_gpdb" -a "$with_gpdb" != "yes"],
[orte_check_gpdb_dir="$with_gpdb"])
OMPI_CHECK_PACKAGE([db_gpdb],
[gpdb.h],
[gpdb],
[gpdb_open],
[],
[$orte_check_gpdb_dir],
[],
[$1],
[$2])],
[$2])
AC_SUBST(db_gpdb_CPPFLAGS)
AC_SUBST(db_gpdb_LDFLAGS)
AC_SUBST(db_gpdb_LIBS)
])dnl

66
orte/mca/db/gpdb/db_gpdb.c Обычный файл
Просмотреть файл

@ -0,0 +1,66 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_LIMITS_H
#include <limits.h>
#endif
#include <stdio.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "orte/mca/db/base/base.h"
#include "db_gpdb.h"
static int init(void);
static void finalize(void);
static int add_log(const char *table,
const opal_value_t *kvs, int nkvs);
orte_db_base_module_t orte_db_gpdb_module = {
init,
finalize,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
add_log
};
/* local variables */
static int init(void)
{
return ORTE_SUCCESS;
}
static void finalize(void)
{
}
static int add_log(const char *table,
const opal_value_t *kvs, int nkvs)
{
opal_output_verbose(2, orte_db_base.output,
"%s Logging data for table %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), table);
return ORTE_ERR_NOT_IMPLEMENTED;
}

28
orte/mca/db/gpdb/db_gpdb.h Обычный файл
Просмотреть файл

@ -0,0 +1,28 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef ORTE_DB_GPDB_H
#define ORTE_DB_GPDB_H
#include "orte/mca/db/db.h"
BEGIN_C_DECLS
typedef struct {
orte_db_base_component_t super;
int num_worker_threads;
char *db_file;
} orte_db_gpdb_component_t;
ORTE_MODULE_DECLSPEC extern orte_db_gpdb_component_t mca_db_gpdb_component;
ORTE_DECLSPEC extern orte_db_base_module_t orte_db_gpdb_module;
END_C_DECLS
#endif /* ORTE_DB_GPDB_H */

102
orte/mca/db/gpdb/db_gpdb_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,102 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/util/proc_info.h"
#include "orte/mca/db/db.h"
#include "orte/mca/db/base/base.h"
#include "db_gpdb.h"
extern orte_db_base_module_t orte_db_gpdb_module;
char *orte_db_gpdb_file;
static int gpdb_component_open(void);
static int gpdb_component_close(void);
static int gpdb_component_query(mca_base_module_t **module, int *priority);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
orte_db_gpdb_component_t mca_db_gpdb_component = {
{
{
ORTE_DB_BASE_VERSION_1_0_0,
/* Component name and version */
"gpdb",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
gpdb_component_open,
gpdb_component_close,
gpdb_component_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
}
};
static int gpdb_component_open(void)
{
return ORTE_SUCCESS;
}
static int gpdb_component_query(mca_base_module_t **module, int *priority)
{
mca_base_component_t *c = &mca_db_gpdb_component.super.base_version;
/* retrieve the name of the database to be used */
mca_base_param_reg_string(c, "database",
"Name of database",
false, false, NULL, &mca_db_gpdb_component.db_file);
/* retrieve the number of worker threads to be used */
mca_base_param_reg_int(c, "num_worker_threads",
"Number of worker threads to be used",
false, false, 0, &mca_db_gpdb_component.num_worker_threads);
if (NULL != mca_db_gpdb_component.db_file) {
*priority = 3; /* ahead of sqlite3 */
*module = (mca_base_module_t*)&orte_db_gpdb_module;
return ORTE_SUCCESS;
}
*priority = 0;
*module = NULL;
return ORTE_ERROR;
}
static int gpdb_component_close(void)
{
if (NULL != mca_db_gpdb_component.db_file) {
free(mca_db_gpdb_component.db_file);
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -56,7 +56,8 @@ orte_db_base_module_t orte_db_hash_module = {
fetch,
fetch_pointer,
fetch_multiple,
remove_data
remove_data,
NULL
};
/* Local "globals" */

Просмотреть файл

@ -60,8 +60,10 @@ static int db_hash_component_open(void)
static int db_hash_component_query(mca_base_module_t **module, int *priority)
{
/* this is the default module */
*priority = 1;
/* we are the default - the ESS modules will set the db selection
* envar if they need someone else
*/
*priority = 50;
*module = (mca_base_module_t*)&orte_db_hash_module;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -57,7 +57,8 @@ orte_db_base_module_t orte_db_pmi_module = {
fetch,
fetch_pointer,
fetch_multiple,
remove_data
remove_data,
NULL
};
static int pmi_encode(char *outdata, const void *val, size_t vallen);

Просмотреть файл

@ -62,14 +62,15 @@ static int db_pmi_component_open(void)
static int db_pmi_component_query(mca_base_module_t **module, int *priority)
{
/* only use PMI when direct launched */
if (NULL == orte_process_info.my_hnp_uri &&
ORTE_PROC_IS_MPI &&
mca_common_pmi_init()) {
*priority = 100;
/* only use PMI if available - the ESS pmi module
* will force our selection if we are direct-launched
*/
if (mca_common_pmi_init()) {
*priority = 10;
*module = (mca_base_module_t*)&orte_db_pmi_module;
return ORTE_SUCCESS;
}
*priority = 0;
*module = NULL;
return ORTE_ERROR;

40
orte/mca/db/sqlite/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,40 @@
#
# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
dist_pkgdata_DATA = help-db-sqlite.txt
sources = \
db_sqlite.h \
db_sqlite_component.c \
db_sqlite.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_orte_db_sqlite_DSO
component_noinst =
component_install = mca_db_sqlite.la
else
component_noinst = libmca_db_sqlite.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_db_sqlite_la_CPPFLAGS = $(db_sqlite_CPPFLAGS)
mca_db_sqlite_la_SOURCES = $(sources)
mca_db_sqlite_la_LDFLAGS = -module -avoid-version $(db_sqlite_LDFLAGS)
mca_db_sqlite_la_LIBADD = $(db_sqlite_LIBS)
noinst_LTLIBRARIES = $(component_noinst)
libmca_db_sqlite_la_CPPFLAGS = $(db_sqlite_CPPFLAGS)
libmca_db_sqlite_la_SOURCES =$(sources)
libmca_db_sqlite_la_LDFLAGS = -module -avoid-version $(db_sqlite_LDFLAGS)
libmca_db_sqlite_la_LIBADD = $(db_sqlite_LIBS)

39
orte/mca/db/sqlite/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
dnl -*- shell-script -*-
dnl
dnl Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
dnl $COPYRIGHT$
dnl
dnl Additional copyrights may follow
dnl
dnl $HEADER$
dnl
# MCA_db_sqlite_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
AC_DEFUN([MCA_orte_db_sqlite_CONFIG], [
AC_CONFIG_FILES([orte/mca/db/sqlite/Makefile])
AC_ARG_WITH([sqlite3],
[AC_HELP_STRING([--with-sqlite3],
[Build sqlite3 support (default: no)])],
[], with_sqlite3=no)
# do not build if rte is disabled or support not requested
AS_IF([test "$orte_without_full_support" = 0 -a "$with_sqlite3" != "no"],
[AS_IF([test ! -z "$with_sqlite3" -a "$with_sqlite3" != "yes"],
[orte_check_sqlite3_dir="$with_sqlite3"])
OMPI_CHECK_PACKAGE([db_sqlite],
[sqlite3.h],
[sqlite3],
[sqlite3_open],
[],
[$orte_check_sqlite3_dir],
[],
[$1],
[$2])],
[$2])
AC_SUBST(db_sqlite_CPPFLAGS)
AC_SUBST(db_sqlite_LDFLAGS)
AC_SUBST(db_sqlite_LIBS)
])dnl

191
orte/mca/db/sqlite/db_sqlite.c Обычный файл
Просмотреть файл

@ -0,0 +1,191 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_LIMITS_H
#include <limits.h>
#endif
#include <stdio.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <sqlite3.h>
#include "opal/dss/dss_types.h"
#include "opal/util/basename.h"
#include "opal/util/os_dirpath.h"
#include "opal/util/os_path.h"
#include "opal/util/output.h"
#include "opal/util/malloc.h"
#include "opal/util/basename.h"
#include "opal/mca/pstat/base/base.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/base/base.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/db/base/base.h"
#include "db_sqlite.h"
static int init(void);
static void finalize(void);
static int add_log(const char *table,
const opal_value_t *kvs, int nkvs);
orte_db_base_module_t orte_db_sqlite_module = {
init,
finalize,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
add_log
};
/* local variables */
static sqlite3 **dbhandles=NULL;
static int nthreads = 1;
static int active = 0;
static int init(void)
{
char *dir;
int i;
/* initialize sqlite3 */
if (SQLITE_OK != sqlite3_initialize()) {
return ORTE_ERR_UNREACH;
}
/* check if sqlite was built thread-safe - if not, we won't
* use worker threads for thruput
*/
if (0 == mca_db_sqlite_component.num_worker_threads || 0 != sqlite3_threadsafe()) {
nthreads = 1;
} else {
nthreads = mca_db_sqlite_component.num_worker_threads;
}
/* get the required number of database handles */
dbhandles = (sqlite3**)malloc(nthreads * sizeof(sqlite3*));
/* open the database - this will create the database file if
* it doesn't already exist
*/
for (i=0; i < nthreads; i++) {
if (SQLITE_OK != sqlite3_open(mca_db_sqlite_component.db_file, &dbhandles[i])) {
orte_show_help("help-db-sqlite.txt", "cannot-create-sqlite", true, mca_db_sqlite_component.db_file);
return ORTE_ERR_FILE_OPEN_FAILURE;
}
}
return ORTE_SUCCESS;
}
static void finalize(void)
{
int i;
/* if we are normally terminating, remove the recovery file */
if (NULL != dbhandles) {
for (i=0; i < nthreads; i++) {
if (SQLITE_OK != sqlite3_close(dbhandles[i])) {
opal_output(0, "sqlite failed to close");
}
}
}
}
static int add_log(const char *table,
const opal_value_t *kvs, int nkvs)
{
int i, rc;
char *sql, **cmd = NULL, *tmp;
sqlite3_stmt *stmt;
opal_output_verbose(2, orte_db_base.output,
"%s Logging data for table %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), table);
/* setup the insert statement */
for (i=0; i < nkvs; i++) {
opal_argv_append_nosize(&cmd, "?");
}
tmp = opal_argv_join(cmd, ',');
asprintf(&sql, "INSERT INTO %s VALUES (%s)", table, tmp);
free(tmp);
opal_argv_free(cmd);
/* use the next worker thread */
ORTE_SQLITE_CMD(prepare_v2(dbhandles[active], sql, strlen(sql)+1, &stmt, NULL), dbhandles[active], &rc);
if (SQLITE_OK != rc) {
return ORTE_ERROR;
}
/* cycle through the provided values and construct
* an insert command for them - note that the values
* MUST be in column-order for the database!
*/
for (i=0; i < nkvs; i++) {
switch (kvs[i].type) {
case OPAL_STRING:
ORTE_SQLITE_CMD(bind_text(stmt, i, kvs[i].data.string, strlen(kvs[i].data.string), NULL),
dbhandles[active], &rc);
break;
case OPAL_INT32:
ORTE_SQLITE_CMD(bind_int(stmt, i, kvs[i].data.int32), dbhandles[active], &rc);
break;
case OPAL_INT16:
ORTE_SQLITE_CMD(bind_int(stmt, i, kvs[i].data.int16), dbhandles[active], &rc);
break;
case OPAL_PID:
ORTE_SQLITE_CMD(bind_int64(stmt, i, kvs[i].data.pid), dbhandles[active], &rc);
break;
case OPAL_INT64:
ORTE_SQLITE_CMD(bind_int64(stmt, i, kvs[i].data.int64), dbhandles[active], &rc);
break;
case OPAL_FLOAT:
ORTE_SQLITE_CMD(bind_double(stmt, i, kvs[i].data.fval), dbhandles[active], &rc);
break;
case OPAL_TIMEVAL:
asprintf(&tmp, "%d.%06d", (int)kvs[i].data.tv.tv_sec, (int)kvs[i].data.tv.tv_usec);
ORTE_SQLITE_CMD(bind_text(stmt, i, tmp, strlen(tmp), NULL),
dbhandles[active], &rc);
free(tmp);
break;
}
if (SQLITE_OK != rc) {
return ORTE_ERROR;
}
}
ORTE_SQLITE_OP(step(stmt), DONE, dbhandles[active], &rc);
if (SQLITE_OK != rc) {
return ORTE_ERROR;
}
opal_output(0, "%s INSERTED ROW %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)sqlite3_last_insert_rowid(dbhandles[active]));
/* cycle to the next worker thread */
active++;
if (nthreads < active) {
active = 0;
}
return ORTE_SUCCESS;
}

49
orte/mca/db/sqlite/db_sqlite.h Обычный файл
Просмотреть файл

@ -0,0 +1,49 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef ORTE_DB_SQLITE_H
#define ORTE_DB_SQLITE_H
#include "orte/mca/db/db.h"
BEGIN_C_DECLS
typedef struct {
orte_db_base_component_t super;
int num_worker_threads;
char *db_file;
} orte_db_sqlite_component_t;
ORTE_MODULE_DECLSPEC extern orte_db_sqlite_component_t mca_db_sqlite_component;
ORTE_DECLSPEC extern orte_db_base_module_t orte_db_sqlite_module;
/* Macros for manipulating sqlite */
#define ORTE_SQLITE_CMD(f, db, r) \
{ \
*(r) = sqlite3_ ## f; \
if (*(r) != SQLITE_OK) { \
opal_output(0, "%s: %s failed with status %d: %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
#f, *(r), sqlite3_errmsg(db)); \
} \
} \
#define ORTE_SQLITE_OP(f, x, db, r) \
{ \
*(r) = sqlite3_ ## f; \
if (*(r) != SQLITE_ ## x) { \
opal_output(0, "%s: %s failed with status %d: %s", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
#f, *(r), sqlite3_errmsg(db)); \
} \
} \
END_C_DECLS
#endif /* ORTE_DB_SQLITE_H */

116
orte/mca/db/sqlite/db_sqlite_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,116 @@
/*
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "orte_config.h"
#include "orte/constants.h"
#include <sys/stat.h>
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/util/proc_info.h"
#include "orte/util/show_help.h"
#include "orte/mca/db/db.h"
#include "orte/mca/db/base/base.h"
#include "db_sqlite.h"
extern orte_db_base_module_t orte_db_sqlite_module;
char *orte_db_sqlite_file;
static int sqlite_component_open(void);
static int sqlite_component_close(void);
static int sqlite_component_query(mca_base_module_t **module, int *priority);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
orte_db_sqlite_component_t mca_db_sqlite_component = {
{
{
ORTE_DB_BASE_VERSION_1_0_0,
/* Component name and version */
"sqlite",
ORTE_MAJOR_VERSION,
ORTE_MINOR_VERSION,
ORTE_RELEASE_VERSION,
/* Component open and close functions */
sqlite_component_open,
sqlite_component_close,
sqlite_component_query
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
}
};
static int sqlite_component_open(void)
{
return ORTE_SUCCESS;
}
static int sqlite_component_query(mca_base_module_t **module, int *priority)
{
mca_base_component_t *c = &mca_db_sqlite_component.super.base_version;
struct stat buf;
/* retrieve the name of the file to be used */
mca_base_param_reg_string(c, "database",
"Name of file to be used for database",
false, false, NULL, &mca_db_sqlite_component.db_file);
/* retrieve the number of worker threads to be used, if sqlite3 is thread-safe */
mca_base_param_reg_int(c, "num_worker_threads",
"Number of worker threads to be used",
false, false, 0, &mca_db_sqlite_component.num_worker_threads);
if (NULL != mca_db_sqlite_component.db_file) {
/* if the database file doesn't exist, then we can't operate */
if (0 != stat(mca_db_sqlite_component.db_file, &buf)) {
/* not found */
orte_show_help("help-db-sqlite.txt", "file-not-found",
true, mca_db_sqlite_component.db_file);
*priority = 0;
*module = NULL;
return ORTE_ERROR;
}
*priority = 1;
*module = (mca_base_module_t*)&orte_db_sqlite_module;
return ORTE_SUCCESS;
}
*priority = 0;
*module = NULL;
return ORTE_ERROR;
}
static int sqlite_component_close(void)
{
if (NULL != mca_db_sqlite_component.db_file) {
free(mca_db_sqlite_component.db_file);
}
return ORTE_SUCCESS;
}

15
orte/mca/db/sqlite/help-db-sqlite.txt Обычный файл
Просмотреть файл

@ -0,0 +1,15 @@
-*- text -*-
#
# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
[file-not-found]
WARNING: Could not find specified database file
File: %s
Data logging into sqlite3 has been disabled.