1
1

Add support for postgres database

This commit was SVN r30337.
Этот коммит содержится в:
Ralph Castain 2014-01-20 19:56:26 +00:00
родитель e0edc29029
Коммит 12e4f8a71d
5 изменённых файлов: 500 добавлений и 0 удалений

34
opal/mca/db/postgres/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,34 @@
#
# Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
db_postgres.h \
db_postgres_component.c \
db_postgres.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if MCA_BUILD_opal_db_postgres_DSO
component_noinst =
component_install = mca_db_postgres.la
else
component_noinst = libmca_db_postgres.la
component_install =
endif
mcacomponentdir = $(ompilibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_db_postgres_la_SOURCES = $(sources)
mca_db_postgres_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_db_postgres_la_SOURCES =$(sources)
libmca_db_postgres_la_LDFLAGS = -module -avoid-version

43
opal/mca/db/postgres/configure.m4 Обычный файл
Просмотреть файл

@ -0,0 +1,43 @@
dnl -*- shell-script -*-
dnl
dnl Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
dnl Copyright (c) 2013 Intel, Inc. All rights reserved.
dnl $COPYRIGHT$
dnl
dnl Additional copyrights may follow
dnl
dnl $HEADER$
dnl
# MCA_db_postgres_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
AC_DEFUN([MCA_opal_db_postgres_CONFIG], [
AC_CONFIG_FILES([opal/mca/db/postgres/Makefile])
AC_ARG_WITH([postgres],
[AC_HELP_STRING([--with-postgres],
[Build postgres support (default: no)])],
[], with_postgres=no)
# do not build if support not requested
AS_IF([test "$with_postgres" != "no"],
[AS_IF([test ! -z "$with_postgres" -a "$with_postgres" != "yes"],
[opal_check_postgres_dir="$with_postgres"])
OMPI_CHECK_PACKAGE([db_postgres],
[libpq-fe.h],
[pq],
[PQconnectdb],
[],
[$opal_check_postgres_dir],
[],
[$1],
[AC_MSG_WARN([POSTGRES DATABASE SUPPORT REQUESTED])
AC_MSG_WARN([BUT REQUIRED LIBRARY OR HEADER NOT FOUND])
AC_MSG_ERROR([CANNOT CONTINUE])
$2])],
[$2])
AC_SUBST(db_postgres_CPPFLAGS)
AC_SUBST(db_postgres_LDFLAGS)
AC_SUBST(db_postgres_LIBS)
])dnl

218
opal/mca/db/postgres/db_postgres.c Обычный файл
Просмотреть файл

@ -0,0 +1,218 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#include "opal_config.h"
#include "opal/constants.h"
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_LIMITS_H
#include <limits.h>
#endif
#include <stdio.h>
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "libpq-fe.h"
#include "opal_stdint.h"
#include "opal/util/argv.h"
#include "opal/util/error.h"
#include "opal/mca/db/base/base.h"
#include "db_postgres.h"
#define OPAL_PG_MAX_LINE_LENGTH 4096
static int init(void);
static void finalize(void);
static int add_log(const char *table,
const opal_value_t *kvs, int nkvs);
opal_db_base_module_t opal_db_postgres_module = {
init,
finalize,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
add_log
};
static PGconn *conn = NULL;
/* local variables */
static int init(void)
{
char **login=NULL;
char **connection=NULL;
/* break the user info into its login parts */
login = opal_argv_split(mca_db_postgres_component.user, ':');
if (2 != opal_argv_count(login)) {
opal_output(0, "db:postgres: User info is invalid: %s",
mca_db_postgres_component.user);
opal_argv_free(login);
return OPAL_ERR_BAD_PARAM;
}
/* break the uri */
connection = opal_argv_split(mca_db_postgres_component.pguri, ':');
if (2 != opal_argv_count(connection)) {
opal_argv_free(login);
opal_argv_free(connection);
opal_output(0, "db:postgres: Connection info is invalid: %s",
mca_db_postgres_component.pguri);
return OPAL_ERR_BAD_PARAM;
}
conn = PQsetdbLogin(connection[0], connection[1],
mca_db_postgres_component.pgoptions,
mca_db_postgres_component.pgtty,
mca_db_postgres_component.dbname,
login[0], login[1]);
opal_argv_free(login);
opal_argv_free(connection);
if (PQstatus(conn) != CONNECTION_OK) {
conn = NULL;
opal_output(0, "***********************************************\n");
opal_output(0, "db:postgres: Connection failed:\n\tURI: %s\n\tOPTIONS: %s\n\tTTY: %s\n\tDBNAME: %s\n\tUSER: %s",
mca_db_postgres_component.pguri,
(NULL == mca_db_postgres_component.pgoptions) ? "NULL" : mca_db_postgres_component.pgoptions,
(NULL == mca_db_postgres_component.pgtty) ? "NULL" : mca_db_postgres_component.pgtty,
mca_db_postgres_component.dbname,
mca_db_postgres_component.user);
opal_output(0, "\n***********************************************");
exit(OPAL_ERR_CONNECTION_FAILED);
return OPAL_ERR_CONNECTION_FAILED;
}
opal_output_verbose(5, opal_db_base_framework.framework_output,
"db:postgres: Connection established to %s",
mca_db_postgres_component.dbname);
return OPAL_SUCCESS;
}
static void finalize(void)
{
if (NULL != conn) {
PQfinish(conn);
}
}
/* NOTE: at this time, we only support data from the
* "sigar" source. We'll have to add a mapping from
* source to table later
*/
static int add_log(const char *source,
const opal_value_t *kvs, int nkvs)
{
char *query, *vstr;
PGresult *res;
char **cmdargs=NULL;
time_t nowtime;
struct tm *nowtm;
char tbuf[1024], buf[64];
int i;
opal_output_verbose(2, opal_db_base_framework.framework_output,
"Logging data for source %s", source);
if (0 != strcmp(source, "sigar")) {
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
return OPAL_ERR_NOT_SUPPORTED;
}
/* cycle through the provided values and construct
* an insert command for them - note that the values
* MUST be in column-order for the database!
*/
for (i=0; i < nkvs; i++) {
switch (kvs[i].type) {
case OPAL_STRING:
snprintf(tbuf, sizeof(tbuf), "\'%s\'", kvs[i].data.string);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_INT32:
snprintf(tbuf, sizeof(tbuf), "%d", kvs[i].data.int32);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_INT16:
snprintf(tbuf, sizeof(tbuf), "%d", (int)kvs[i].data.int16);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_PID:
snprintf(tbuf, sizeof(tbuf), "%lu", (unsigned long)kvs[i].data.pid);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_INT64:
snprintf(tbuf, sizeof(tbuf), "%" PRIi64 "", kvs[i].data.int64);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_UINT64:
snprintf(tbuf, sizeof(tbuf), "%" PRIu64 "", kvs[i].data.uint64);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_FLOAT:
snprintf(tbuf, sizeof(tbuf), "%f", kvs[i].data.fval);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
case OPAL_TIMEVAL:
/* we only care about seconds */
nowtime = kvs[i].data.tv.tv_sec;
nowtm = localtime(&nowtime);
strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(tbuf, sizeof(tbuf), "\'%s\'", buf);
opal_argv_append_nosize(&cmdargs, tbuf);
break;
default:
OPAL_ERROR_LOG(OPAL_ERR_NOT_SUPPORTED);
break;
}
}
/* assemble the value string */
vstr = opal_argv_join(cmdargs, ',');
/* create the query */
asprintf(&query, "INSERT INTO %s values (%s)", mca_db_postgres_component.table, vstr);
free(vstr);
opal_output_verbose(2, opal_db_base_framework.framework_output,
"Executing query %s", query);
/* execute it */
res = PQexec(conn, query);
free(query);
if ((!res) || (PQresultStatus(res) != PGRES_COMMAND_OK)) {
opal_output(0, "***********************************************\n");
opal_output(0, "POSTGRES INSERT COMMAND FAILED - UNABLE TO LOG");
opal_output(0, "SENSOR DATA. ABORTING");
opal_output(0, "\n***********************************************");
PQclear(res);
return OPAL_ERROR;
}
opal_output_verbose(2, opal_db_base_framework.framework_output,
"Query succeeded");
PQclear(res);
return OPAL_SUCCESS;
}

34
opal/mca/db/postgres/db_postgres.h Обычный файл
Просмотреть файл

@ -0,0 +1,34 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OPAL_DB_POSTGRES_H
#define OPAL_DB_POSTGRES_H
#include "opal/mca/db/db.h"
BEGIN_C_DECLS
typedef struct {
opal_db_base_component_t super;
int num_worker_threads;
char *dbname;
char *table;
char *user;
char *pguri;
char *pgoptions;
char *pgtty;
} opal_db_postgres_component_t;
OPAL_MODULE_DECLSPEC extern opal_db_postgres_component_t mca_db_postgres_component;
OPAL_DECLSPEC extern opal_db_base_module_t opal_db_postgres_module;
END_C_DECLS
#endif /* OPAL_DB_POSTGRES_H */

171
opal/mca/db/postgres/db_postgres_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,171 @@
/*
* Copyright (c) 2013 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "ompi_info" from having to import
* entire components just to query their version and parameters.
*/
#include "opal_config.h"
#include "opal/constants.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_var.h"
#include "opal/mca/db/db.h"
#include "opal/mca/db/base/base.h"
#include "db_postgres.h"
extern opal_db_base_module_t opal_db_postgres_module;
char *opal_db_postgres_file;
static int postgres_component_open(void);
static int postgres_component_close(void);
static int postgres_component_query(mca_base_module_t **module, int *priority);
static int postgres_component_register(void);
/*
* Instantiate the public struct with all of our public information
* and pointers to our public functions in it
*/
opal_db_postgres_component_t mca_db_postgres_component = {
{
{
OPAL_DB_BASE_VERSION_1_0_0,
/* Component name and version */
"postgres",
OPAL_MAJOR_VERSION,
OPAL_MINOR_VERSION,
OPAL_RELEASE_VERSION,
/* Component open and close functions */
postgres_component_open,
postgres_component_close,
postgres_component_query,
postgres_component_register
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
}
}
};
static int postgres_component_open(void)
{
return OPAL_SUCCESS;
}
static int postgres_component_query(mca_base_module_t **module, int *priority)
{
if (NULL != mca_db_postgres_component.dbname &&
NULL != mca_db_postgres_component.table &&
NULL != mca_db_postgres_component.user &&
NULL != mca_db_postgres_component.pguri) {
*priority = 3; /* ahead of sqlite3 */
*module = (mca_base_module_t*)&opal_db_postgres_module;
return OPAL_SUCCESS;
}
*priority = 0;
*module = NULL;
return OPAL_ERROR;
}
static int postgres_component_close(void)
{
if (NULL != mca_db_postgres_component.dbname) {
free(mca_db_postgres_component.dbname);
}
if (NULL != mca_db_postgres_component.table) {
free(mca_db_postgres_component.table);
}
if (NULL != mca_db_postgres_component.user) {
free(mca_db_postgres_component.user);
}
if (NULL != mca_db_postgres_component.pguri) {
free(mca_db_postgres_component.pguri);
}
if (NULL != mca_db_postgres_component.pgoptions) {
free(mca_db_postgres_component.pgoptions);
}
if (NULL != mca_db_postgres_component.pgtty) {
free(mca_db_postgres_component.pgtty);
}
return OPAL_SUCCESS;
}
static int postgres_component_register(void) {
mca_base_component_t *c = &mca_db_postgres_component.super.base_version;
/* retrieve the name of the database to be used */
mca_db_postgres_component.dbname = NULL;
(void) mca_base_component_var_register (c, "database", "Name of database",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.dbname);
/* retrieve the name of the table to be used */
mca_db_postgres_component.table = NULL;
(void) mca_base_component_var_register (c, "table", "Name of table",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.table);
/* retrieve the name of the user to be used */
mca_db_postgres_component.user = NULL;
(void) mca_base_component_var_register (c, "user", "Name of database user:password",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.table);
/* retrieve the server:port */
mca_db_postgres_component.pguri = NULL;
(void) mca_base_component_var_register (c, "uri", "Contact info for Postgres server as ip:port",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.pguri);
/* retrieve any options to be used */
mca_db_postgres_component.pgoptions = NULL;
(void) mca_base_component_var_register (c, "options", "Options to pass to the database",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.pgoptions);
/* retrieve the tty argument */
mca_db_postgres_component.pgtty = NULL;
(void) mca_base_component_var_register (c, "tty", "TTY option for database",
MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.pgtty);
/* retrieve the number of worker threads to be used */
mca_db_postgres_component.num_worker_threads = 0;
(void) mca_base_component_var_register (c, "num_worker_threads",
"Number of worker threads to be used",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_db_postgres_component.num_worker_threads);
return OPAL_SUCCESS;
}