gdal/autotest/ogr/ogr_virtualogr.py

435 строки
14 KiB
Python
Исполняемый файл

#!/usr/bin/env pytest
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR Test Suite
# Purpose: VirtualOGR testing
# Author: Even Rouault <even dot rouault at spatialys.com>
#
###############################################################################
# Copyright (c) 2012-2013, Even Rouault <even dot rouault at spatialys.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
###############################################################################
import os
import sys
import gdaltest
import pytest
# Linter says this isn't used, but it actually is via pytest magic :)
from ogr.ogr_sql_sqlite import require_ogr_sql_sqlite # noqa
from osgeo import gdal, ogr
require_ogr_sql_sqlite
# to make pyflakes happy
###############################################################################
pytestmark = pytest.mark.usefixtures("require_ogr_sql_sqlite")
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def module_disable_exceptions():
with gdaltest.disable_exceptions():
yield
###############################################################################
@pytest.fixture()
def require_auto_load_extension():
if ogr.GetDriverByName("SQLite") is None:
pytest.skip("SQLite missing")
ds = ogr.Open(":memory:")
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL("PRAGMA compile_options")
if sql_lyr:
for f in sql_lyr:
if f.GetField(0) == "OMIT_LOAD_EXTENSION":
ds.ReleaseResultSet(sql_lyr)
pytest.skip("SQLite3 built with OMIT_LOAD_EXTENSION")
ds.ReleaseResultSet(sql_lyr)
###############################################################################
def ogr_virtualogr_run_sql(sql_statement):
ds = ogr.GetDriverByName("SQLite").CreateDataSource(":memory:")
gdal.ErrorReset()
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL(sql_statement)
success = gdal.GetLastErrorMsg() == ""
ds.ReleaseResultSet(sql_lyr)
ds = None
if not success:
return success
ds = ogr.GetDriverByName("Memory").CreateDataSource("")
gdal.ErrorReset()
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL(sql_statement, dialect="SQLITE")
success = gdal.GetLastErrorMsg() == ""
ds.ReleaseResultSet(sql_lyr)
ds = None
return success
###############################################################################
# Basic tests
def test_ogr_virtualogr_1(require_auto_load_extension):
# Invalid syntax
assert not ogr_virtualogr_run_sql("CREATE VIRTUAL TABLE poly USING VirtualOGR()")
# Nonexistent dataset
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('foo')"
)
# Dataset with 0 layer
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('<OGRVRTDataSource></OGRVRTDataSource>')"
)
# Dataset with more than 1 layer
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data')"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp')"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0)"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 1)"
)
# Invalid value for update_mode
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 'foo')"
)
# Nonexistent layer
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'foo')"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly')"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 0)"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1)"
)
assert ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1, 1)"
)
# Too many arguments
assert not ogr_virtualogr_run_sql(
"CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1, 1, bla)"
)
###############################################################################
# Test detection of suspicious use of VirtualOGR
def test_ogr_virtualogr_2(require_auto_load_extension):
ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_2.db")
ds.ExecuteSQL("CREATE VIRTUAL TABLE foo USING VirtualOGR('data/poly.shp')")
ds.ExecuteSQL("CREATE TABLE spy_table (spy_content VARCHAR)")
ds.ExecuteSQL("CREATE TABLE regular_table (bar VARCHAR)")
ds = None
# Check that foo isn't listed
ds = ogr.Open("/vsimem/ogr_virtualogr_2.db")
for i in range(ds.GetLayerCount()):
assert ds.GetLayer(i).GetName() != "foo"
ds = None
# Check that it is listed if OGR_SQLITE_LIST_VIRTUAL_OGR=YES
with gdal.config_option("OGR_SQLITE_LIST_VIRTUAL_OGR", "YES"):
ds = ogr.Open("/vsimem/ogr_virtualogr_2.db")
found = False
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == "foo":
found = True
assert found
ds = None
# Add suspicious trigger
ds = ogr.Open("/vsimem/ogr_virtualogr_2.db", update=1)
ds.ExecuteSQL(
"CREATE TRIGGER spy_trigger INSERT ON regular_table BEGIN "
+ "INSERT OR REPLACE INTO spy_table (spy_content) "
+ "SELECT OGR_STYLE FROM foo; END;"
)
ds = None
gdal.ErrorReset()
ds = ogr.Open("/vsimem/ogr_virtualogr_2.db")
for i in range(ds.GetLayerCount()):
assert ds.GetLayer(i).GetName() != "foo"
# An error will be triggered at the time the trigger is used
with gdaltest.error_handler():
ds.ExecuteSQL("INSERT INTO regular_table (bar) VALUES ('bar')")
did_not_get_error = gdal.GetLastErrorMsg() == ""
ds = None
if did_not_get_error:
gdal.Unlink("/vsimem/ogr_virtualogr_2.db")
pytest.fail("expected a failure")
gdal.ErrorReset()
with gdal.config_option(
"OGR_SQLITE_LIST_VIRTUAL_OGR", "YES"
), gdaltest.error_handler():
ds = ogr.Open("/vsimem/ogr_virtualogr_2.db")
if gdal.GetLastErrorMsg() == "":
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_2.db")
pytest.fail("expected an error message")
did_not_get_error = gdal.GetLastErrorMsg() == ""
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_2.db")
assert not did_not_get_error, "expected a failure"
###############################################################################
# Test GDAL as a SQLite3 dynamically loaded extension
def test_ogr_virtualogr_3(require_auto_load_extension):
# Find path of libgdal
libgdal_name = gdaltest.find_lib("gdal")
if libgdal_name is None:
pytest.skip()
print("Found " + libgdal_name)
# Find path of libsqlite3 or libspatialite
libsqlite_name = gdaltest.find_lib("sqlite3")
if libsqlite_name is None:
libsqlite_name = gdaltest.find_lib("spatialite")
if libsqlite_name is None:
pytest.skip()
print("Found " + libsqlite_name)
python_exe = sys.executable
if sys.platform == "win32":
python_exe = python_exe.replace("\\", "/")
libgdal_name = libgdal_name.replace("\\", "/")
libsqlite_name = libsqlite_name.replace("\\", "/")
ret = gdaltest.runexternal(
python_exe
+ ' ogr_as_sqlite_extension.py "%s" "%s"' % (libsqlite_name, libgdal_name),
check_memleak=False,
)
if ret.startswith("skip"):
pytest.skip()
assert gdal.VersionInfo("RELEASE_NAME") in ret
###############################################################################
# Test ogr_datasource_load_layers()
def test_ogr_virtualogr_4(require_auto_load_extension):
ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db")
sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp')")
ds.ReleaseResultSet(sql_lyr)
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp')")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("SELECT * FROM poly")
ret = sql_lyr.GetFeatureCount()
ds.ReleaseResultSet(sql_lyr)
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_4.db")
assert ret == 10
ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db")
sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp', 0)")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("SELECT * FROM poly")
ret = sql_lyr.GetFeatureCount()
ds.ReleaseResultSet(sql_lyr)
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_4.db")
assert ret == 10
ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db")
sql_lyr = ds.ExecuteSQL(
"SELECT ogr_datasource_load_layers('data/poly.shp', 0, 'prefix')"
)
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("SELECT * FROM prefix_poly")
ret = sql_lyr.GetFeatureCount()
ds.ReleaseResultSet(sql_lyr)
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_4.db")
assert ret == 10
# Various error conditions
ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db")
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers(0)")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('foo')")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL(
"SELECT ogr_datasource_load_layers('data/poly.shp','a')"
)
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL(
"SELECT ogr_datasource_load_layers('data/poly.shp', 0, 0)"
)
ds.ReleaseResultSet(sql_lyr)
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_4.db")
###############################################################################
# Test failed CREATE VIRTUAL TABLE USING VirtualOGR
def test_ogr_virtualogr_5(require_auto_load_extension):
# Create a CSV with duplicate column name
fp = gdal.VSIFOpenL("/vsimem/ogr_virtualogr_5.csv", "wt")
line = "foo,foo\n"
gdal.VSIFWriteL(line, 1, len(line), fp)
line = "bar,baz\n"
gdal.VSIFWriteL(line, 1, len(line), fp)
gdal.VSIFCloseL(fp)
ds = ogr.GetDriverByName("Memory").CreateDataSource("")
with gdaltest.error_handler():
sql_lyr = ds.ExecuteSQL(
"CREATE VIRTUAL TABLE lyr2 USING VirtualOGR('/vsimem/ogr_virtualogr_5.csv')",
dialect="SQLITE",
)
assert sql_lyr is None
ds = None
gdal.Unlink("/vsimem/ogr_virtualogr_5.csv")
###############################################################################
def test_ogr_sqlite_load_extensions_load_self(require_auto_load_extension):
# Find path of libgdal
libgdal_name = gdaltest.find_lib("gdal")
if libgdal_name is None:
pytest.skip()
# Load ourselves ! not allowed
with gdaltest.config_option("OGR_SQLITE_LOAD_EXTENSIONS", libgdal_name):
with gdaltest.error_handler():
ds = ogr.Open(":memory:")
assert ds is not None
assert gdal.GetLastErrorMsg() != ""
# Load ourselves ! not allowed
with gdaltest.config_option(
"OGR_SQLITE_LOAD_EXTENSIONS", "ENABLE_SQL_LOAD_EXTENSION"
):
ds = ogr.Open(":memory:")
assert ds is not None
with gdaltest.error_handler():
ds.ReleaseResultSet(
ds.ExecuteSQL("SELECT load_extension('" + libgdal_name + "')")
)
assert gdal.GetLastErrorMsg() != ""
###############################################################################
def test_ogr_sqlite_load_extensions_load_my_test_sqlite3_ext_name(
require_auto_load_extension,
):
found = False
gdal_driver_path = gdal.GetConfigOption("GDAL_DRIVER_PATH")
if gdal_driver_path:
for name in [
"my_test_sqlite3_ext.so",
"my_test_sqlite3_ext.dll",
"my_test_sqlite3_ext.dylib",
]:
filename = os.path.join(gdal_driver_path, name)
if os.path.exists(filename):
found = True
break
if not found:
pytest.skip()
with gdaltest.config_option("OGR_SQLITE_LOAD_EXTENSIONS", filename):
ds = ogr.Open(":memory:")
assert ds is not None
sql_lyr = ds.ExecuteSQL("SELECT myext()")
assert sql_lyr
f = sql_lyr.GetNextFeature()
assert f.GetField(0) == "this works!"
ds.ReleaseResultSet(sql_lyr)
with gdaltest.config_option(
"OGR_SQLITE_LOAD_EXTENSIONS", "ENABLE_SQL_LOAD_EXTENSION"
):
ds = ogr.Open(":memory:")
assert ds is not None
ds.ReleaseResultSet(ds.ExecuteSQL("SELECT load_extension('" + filename + "')"))
sql_lyr = ds.ExecuteSQL("SELECT myext()")
assert sql_lyr
f = sql_lyr.GetNextFeature()
assert f.GetField(0) == "this works!"
ds.ReleaseResultSet(sql_lyr)