#!/usr/bin/env pytest # -*- coding: utf-8 -*- ############################################################################### # $Id$ # # Project: GDAL/OGR Test Suite # Purpose: VirtualOGR testing # Author: Even Rouault # ############################################################################### # Copyright (c) 2012-2013, Even Rouault # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the "Software"), # to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, # and/or sell copies of the Software, and to permit persons to whom the # Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. ############################################################################### import os import sys import gdaltest import pytest # Linter says this isn't used, but it actually is via pytest magic :) from ogr.ogr_sql_sqlite import require_ogr_sql_sqlite # noqa from osgeo import gdal, ogr require_ogr_sql_sqlite # to make pyflakes happy ############################################################################### pytestmark = pytest.mark.usefixtures("require_ogr_sql_sqlite") ############################################################################### @pytest.fixture(autouse=True, scope="module") def module_disable_exceptions(): with gdaltest.disable_exceptions(): yield ############################################################################### @pytest.fixture() def require_auto_load_extension(): if ogr.GetDriverByName("SQLite") is None: pytest.skip("SQLite missing") ds = ogr.Open(":memory:") with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL("PRAGMA compile_options") if sql_lyr: for f in sql_lyr: if f.GetField(0) == "OMIT_LOAD_EXTENSION": ds.ReleaseResultSet(sql_lyr) pytest.skip("SQLite3 built with OMIT_LOAD_EXTENSION") ds.ReleaseResultSet(sql_lyr) ############################################################################### def ogr_virtualogr_run_sql(sql_statement): ds = ogr.GetDriverByName("SQLite").CreateDataSource(":memory:") gdal.ErrorReset() with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL(sql_statement) success = gdal.GetLastErrorMsg() == "" ds.ReleaseResultSet(sql_lyr) ds = None if not success: return success ds = ogr.GetDriverByName("Memory").CreateDataSource("") gdal.ErrorReset() with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL(sql_statement, dialect="SQLITE") success = gdal.GetLastErrorMsg() == "" ds.ReleaseResultSet(sql_lyr) ds = None return success ############################################################################### # Basic tests def test_ogr_virtualogr_1(require_auto_load_extension): # Invalid syntax assert not ogr_virtualogr_run_sql("CREATE VIRTUAL TABLE poly USING VirtualOGR()") # Nonexistent dataset assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('foo')" ) # Dataset with 0 layer assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('')" ) # Dataset with more than 1 layer assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data')" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp')" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0)" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 1)" ) # Invalid value for update_mode assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 'foo')" ) # Nonexistent layer assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'foo')" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly')" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 0)" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1)" ) assert ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1, 1)" ) # Too many arguments assert not ogr_virtualogr_run_sql( "CREATE VIRTUAL TABLE poly USING VirtualOGR('data/poly.shp', 0, 'poly', 1, 1, bla)" ) ############################################################################### # Test detection of suspicious use of VirtualOGR def test_ogr_virtualogr_2(require_auto_load_extension): ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_2.db") ds.ExecuteSQL("CREATE VIRTUAL TABLE foo USING VirtualOGR('data/poly.shp')") ds.ExecuteSQL("CREATE TABLE spy_table (spy_content VARCHAR)") ds.ExecuteSQL("CREATE TABLE regular_table (bar VARCHAR)") ds = None # Check that foo isn't listed ds = ogr.Open("/vsimem/ogr_virtualogr_2.db") for i in range(ds.GetLayerCount()): assert ds.GetLayer(i).GetName() != "foo" ds = None # Check that it is listed if OGR_SQLITE_LIST_VIRTUAL_OGR=YES with gdal.config_option("OGR_SQLITE_LIST_VIRTUAL_OGR", "YES"): ds = ogr.Open("/vsimem/ogr_virtualogr_2.db") found = False for i in range(ds.GetLayerCount()): if ds.GetLayer(i).GetName() == "foo": found = True assert found ds = None # Add suspicious trigger ds = ogr.Open("/vsimem/ogr_virtualogr_2.db", update=1) ds.ExecuteSQL( "CREATE TRIGGER spy_trigger INSERT ON regular_table BEGIN " + "INSERT OR REPLACE INTO spy_table (spy_content) " + "SELECT OGR_STYLE FROM foo; END;" ) ds = None gdal.ErrorReset() ds = ogr.Open("/vsimem/ogr_virtualogr_2.db") for i in range(ds.GetLayerCount()): assert ds.GetLayer(i).GetName() != "foo" # An error will be triggered at the time the trigger is used with gdaltest.error_handler(): ds.ExecuteSQL("INSERT INTO regular_table (bar) VALUES ('bar')") did_not_get_error = gdal.GetLastErrorMsg() == "" ds = None if did_not_get_error: gdal.Unlink("/vsimem/ogr_virtualogr_2.db") pytest.fail("expected a failure") gdal.ErrorReset() with gdal.config_option( "OGR_SQLITE_LIST_VIRTUAL_OGR", "YES" ), gdaltest.error_handler(): ds = ogr.Open("/vsimem/ogr_virtualogr_2.db") if gdal.GetLastErrorMsg() == "": ds = None gdal.Unlink("/vsimem/ogr_virtualogr_2.db") pytest.fail("expected an error message") did_not_get_error = gdal.GetLastErrorMsg() == "" ds = None gdal.Unlink("/vsimem/ogr_virtualogr_2.db") assert not did_not_get_error, "expected a failure" ############################################################################### # Test GDAL as a SQLite3 dynamically loaded extension def test_ogr_virtualogr_3(require_auto_load_extension): # Find path of libgdal libgdal_name = gdaltest.find_lib("gdal") if libgdal_name is None: pytest.skip() print("Found " + libgdal_name) # Find path of libsqlite3 or libspatialite libsqlite_name = gdaltest.find_lib("sqlite3") if libsqlite_name is None: libsqlite_name = gdaltest.find_lib("spatialite") if libsqlite_name is None: pytest.skip() print("Found " + libsqlite_name) python_exe = sys.executable if sys.platform == "win32": python_exe = python_exe.replace("\\", "/") libgdal_name = libgdal_name.replace("\\", "/") libsqlite_name = libsqlite_name.replace("\\", "/") ret = gdaltest.runexternal( python_exe + ' ogr_as_sqlite_extension.py "%s" "%s"' % (libsqlite_name, libgdal_name), check_memleak=False, ) if ret.startswith("skip"): pytest.skip() assert gdal.VersionInfo("RELEASE_NAME") in ret ############################################################################### # Test ogr_datasource_load_layers() def test_ogr_virtualogr_4(require_auto_load_extension): ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db") sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp')") ds.ReleaseResultSet(sql_lyr) with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp')") ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL("SELECT * FROM poly") ret = sql_lyr.GetFeatureCount() ds.ReleaseResultSet(sql_lyr) ds = None gdal.Unlink("/vsimem/ogr_virtualogr_4.db") assert ret == 10 ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db") sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('data/poly.shp', 0)") ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL("SELECT * FROM poly") ret = sql_lyr.GetFeatureCount() ds.ReleaseResultSet(sql_lyr) ds = None gdal.Unlink("/vsimem/ogr_virtualogr_4.db") assert ret == 10 ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db") sql_lyr = ds.ExecuteSQL( "SELECT ogr_datasource_load_layers('data/poly.shp', 0, 'prefix')" ) ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL("SELECT * FROM prefix_poly") ret = sql_lyr.GetFeatureCount() ds.ReleaseResultSet(sql_lyr) ds = None gdal.Unlink("/vsimem/ogr_virtualogr_4.db") assert ret == 10 # Various error conditions ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_virtualogr_4.db") with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers(0)") ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL("SELECT ogr_datasource_load_layers('foo')") ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL( "SELECT ogr_datasource_load_layers('data/poly.shp','a')" ) ds.ReleaseResultSet(sql_lyr) sql_lyr = ds.ExecuteSQL( "SELECT ogr_datasource_load_layers('data/poly.shp', 0, 0)" ) ds.ReleaseResultSet(sql_lyr) ds = None gdal.Unlink("/vsimem/ogr_virtualogr_4.db") ############################################################################### # Test failed CREATE VIRTUAL TABLE USING VirtualOGR def test_ogr_virtualogr_5(require_auto_load_extension): # Create a CSV with duplicate column name fp = gdal.VSIFOpenL("/vsimem/ogr_virtualogr_5.csv", "wt") line = "foo,foo\n" gdal.VSIFWriteL(line, 1, len(line), fp) line = "bar,baz\n" gdal.VSIFWriteL(line, 1, len(line), fp) gdal.VSIFCloseL(fp) ds = ogr.GetDriverByName("Memory").CreateDataSource("") with gdaltest.error_handler(): sql_lyr = ds.ExecuteSQL( "CREATE VIRTUAL TABLE lyr2 USING VirtualOGR('/vsimem/ogr_virtualogr_5.csv')", dialect="SQLITE", ) assert sql_lyr is None ds = None gdal.Unlink("/vsimem/ogr_virtualogr_5.csv") ############################################################################### def test_ogr_sqlite_load_extensions_load_self(require_auto_load_extension): # Find path of libgdal libgdal_name = gdaltest.find_lib("gdal") if libgdal_name is None: pytest.skip() # Load ourselves ! not allowed with gdaltest.config_option("OGR_SQLITE_LOAD_EXTENSIONS", libgdal_name): with gdaltest.error_handler(): ds = ogr.Open(":memory:") assert ds is not None assert gdal.GetLastErrorMsg() != "" # Load ourselves ! not allowed with gdaltest.config_option( "OGR_SQLITE_LOAD_EXTENSIONS", "ENABLE_SQL_LOAD_EXTENSION" ): ds = ogr.Open(":memory:") assert ds is not None with gdaltest.error_handler(): ds.ReleaseResultSet( ds.ExecuteSQL("SELECT load_extension('" + libgdal_name + "')") ) assert gdal.GetLastErrorMsg() != "" ############################################################################### def test_ogr_sqlite_load_extensions_load_my_test_sqlite3_ext_name( require_auto_load_extension, ): found = False gdal_driver_path = gdal.GetConfigOption("GDAL_DRIVER_PATH") if gdal_driver_path: for name in [ "my_test_sqlite3_ext.so", "my_test_sqlite3_ext.dll", "my_test_sqlite3_ext.dylib", ]: filename = os.path.join(gdal_driver_path, name) if os.path.exists(filename): found = True break if not found: pytest.skip() with gdaltest.config_option("OGR_SQLITE_LOAD_EXTENSIONS", filename): ds = ogr.Open(":memory:") assert ds is not None sql_lyr = ds.ExecuteSQL("SELECT myext()") assert sql_lyr f = sql_lyr.GetNextFeature() assert f.GetField(0) == "this works!" ds.ReleaseResultSet(sql_lyr) with gdaltest.config_option( "OGR_SQLITE_LOAD_EXTENSIONS", "ENABLE_SQL_LOAD_EXTENSION" ): ds = ogr.Open(":memory:") assert ds is not None ds.ReleaseResultSet(ds.ExecuteSQL("SELECT load_extension('" + filename + "')")) sql_lyr = ds.ExecuteSQL("SELECT myext()") assert sql_lyr f = sql_lyr.GetNextFeature() assert f.GetField(0) == "this works!" ds.ReleaseResultSet(sql_lyr)