682 строки
18 KiB
Python
Исполняемый файл
682 строки
18 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
||
###############################################################################
|
||
# $Id$
|
||
#
|
||
# Project: GDAL/OGR Test Suite
|
||
# Purpose: Test read functionality for OGR ODBC driver.
|
||
# Author: Even Rouault <even dot rouault at spatialys.com>
|
||
#
|
||
###############################################################################
|
||
# Copyright (c) 2012, Even Rouault <even dot rouault at spatialys.com>
|
||
#
|
||
# Permission is hereby granted, free of charge, to any person obtaining a
|
||
# copy of this software and associated documentation files (the "Software"),
|
||
# to deal in the Software without restriction, including without limitation
|
||
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||
# and/or sell copies of the Software, and to permit persons to whom the
|
||
# Software is furnished to do so, subject to the following conditions:
|
||
#
|
||
# The above copyright notice and this permission notice shall be included
|
||
# in all copies or substantial portions of the Software.
|
||
#
|
||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
||
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||
# DEALINGS IN THE SOFTWARE.
|
||
###############################################################################
|
||
|
||
import os
|
||
import shutil
|
||
import sys
|
||
|
||
import gdaltest
|
||
import pytest
|
||
|
||
from osgeo import gdal, ogr
|
||
|
||
|
||
###############################################################################
|
||
@pytest.fixture(autouse=True, scope="module")
def module_disable_exceptions():
    """Keep the gdaltest.disable_exceptions() context active for this module.

    Declared autouse with module scope, so the context is entered before the
    module's tests and left once the fixture is torn down.
    """
    exceptions_disabled = gdaltest.disable_exceptions()
    with exceptions_disabled:
        yield
|
||
|
||
|
||
@pytest.fixture(scope="module", autouse=True)
def setup_driver():
    """Skip the module unless the ODBC driver can actually open an .mdb file.

    Two conditions lead to a skip: the GDAL "ODBC" driver is not registered,
    or the probe database cannot be opened. When the MDB_ODBC_DRIVER_INSTALLED
    environment variable is present, a failed open is an assertion failure
    instead of a skip.
    """
    odbc_driver = ogr.GetDriverByName("ODBC")
    if odbc_driver is None:
        pytest.skip("ODBC driver not available", allow_module_level=True)

    # The GDAL ODBC driver may be present while the ODBC driver for MS Access
    # is missing from the test environment, so probe with a real dataset.
    with gdaltest.disable_exceptions():
        probe_ds = odbc_driver.Open("data/mdb/empty.mdb")

    mdb_driver_declared = "MDB_ODBC_DRIVER_INSTALLED" in os.environ
    if mdb_driver_declared:
        # The environment promises an installed ODBC driver, so a failed open
        # means something unexpected happened (i.e. the GDAL driver is broken).
        assert probe_ds is not None
        return

    if probe_ds is None:
        pytest.skip(
            "could not open DB. MDB ODBC driver probably missing or misconfigured",
            allow_module_level=True,
        )
|
||
|
||
|
||
@pytest.fixture()
def create_tmp_table():
    """Provide tmp/odbc.mdb populated with the "test" and "test_with_pk" tables.

    The fixture skips on every platform other than win32. It copies the empty
    template database, runs the SQL statements below against the copy, yields
    to the test, and finally removes the copy with gdal.Unlink().
    """
    if sys.platform != "win32":
        pytest.skip("Requires an ODBC driver with write capabilities")

    shutil.copy("data/mdb/empty.mdb", "tmp/odbc.mdb")

    # Statements are executed in this exact order: each table is created
    # before the rows that go into it.
    setup_statements = (
        "CREATE TABLE test (intfield INT, doublefield DOUBLE, stringfield VARCHAR)",
        "INSERT INTO test (intfield, doublefield, stringfield) VALUES (1, 2.34, 'foo')",
        "CREATE TABLE test_with_pk (OGR_FID INT PRIMARY KEY, intfield INT, doublefield DOUBLE, stringfield VARCHAR)",
        "INSERT INTO test_with_pk (OGR_FID, intfield) VALUES (1, 2)",
        "INSERT INTO test_with_pk (OGR_FID, intfield) VALUES (2, 3)",
        "INSERT INTO test_with_pk (OGR_FID, intfield) VALUES (3, 4)",
        "INSERT INTO test_with_pk (OGR_FID, intfield) VALUES (4, 5)",
        "INSERT INTO test_with_pk (OGR_FID, intfield) VALUES (5, 6)",
    )

    writable_ds = ogr.GetDriverByName("ODBC").Open("tmp/odbc.mdb")
    for statement in setup_statements:
        writable_ds.ExecuteSQL(statement)
    # Drop the reference so the dataset is released before the test runs.
    writable_ds = None

    yield

    gdal.Unlink("tmp/odbc.mdb")
|
||
|
||
|
||
@pytest.fixture()
def ogrsf_path():
    """Return the test_ogrsf path reported by test_cli_utilities.

    The requesting test is skipped when test_cli_utilities reports no path.
    """
    from test_cli_utilities import get_test_ogrsf_path

    utility_path = get_test_ogrsf_path()
    if utility_path is not None:
        return utility_path

    pytest.skip("ogrsf test utility not found")
|
||
|
||
|
||
def recent_enough_mdb_odbc_driver():
    """Tell whether tests needing a capable MDB ODBC driver should run.

    At the time of writing, mdbtools <= 0.9.4 has some deficiencies, see
    https://github.com/OSGeo/gdal/pull/4354#issuecomment-907455798 for
    details. Some tests are therefore only allowed on Windows, or on a local
    machine that does not have the CI environment variable set.

    Returns True on win32, or when "CI" is absent from the environment.
    """
    if sys.platform == "win32":
        return True
    return "CI" not in os.environ
|
||
|
||
|
||
###############################################################################
|
||
# Basic testing
|
||
|
||
|
||
def test_ogr_odbc_1(create_tmp_table):
    """Open tmp/odbc.mdb through several connection syntaxes and read it back.

    Covers the ODBC:user/pwd@dsn, ODBC:dsn and ODBC:dsn,table_list forms, then
    reopens the file by path and checks layer count, field values,
    GetFeatureCount(), GetFeature(), SetAttributeFilter() and ExecuteSQL()
    against the rows inserted by the create_tmp_table fixture.
    """
    driver = ogr.GetDriverByName("ODBC")

    # ODBC:user/pwd@dsn syntax
    conn = driver.Open(
        "ODBC:user/pwd@DRIVER=Microsoft Access Driver (*.mdb);DBQ=tmp/odbc.mdb"
    )
    assert conn is not None
    conn = None

    # ODBC:dsn syntax
    conn = driver.Open("ODBC:DRIVER=Microsoft Access Driver (*.mdb);DBQ=tmp/odbc.mdb")
    assert conn is not None
    conn = None

    # ODBC:dsn,table_list syntax: only the listed table is exposed
    conn = driver.Open(
        "ODBC:DRIVER=Microsoft Access Driver (*.mdb);DBQ=tmp/odbc.mdb,test"
    )
    assert conn is not None
    assert conn.GetLayerCount() == 1
    conn = None

    # Reopen by file name and inspect the content
    conn = driver.Open("tmp/odbc.mdb")
    assert conn.GetLayerCount() == 2

    table = conn.GetLayerByName("test")
    row = table.GetNextFeature()
    row_matches = (
        row.GetField("intfield") == 1
        and row.GetField("doublefield") == 2.34
        and row.GetField("stringfield") == "foo"
    )
    if not row_matches:
        row.DumpReadable()
        pytest.fail()

    table = conn.GetLayerByName("test_with_pk")

    # GetFeatureCount()
    assert table.GetFeatureCount() == 5

    # GetFeature()
    row = table.GetFeature(4)
    if not row.GetField("intfield") == 5:
        row.DumpReadable()
        pytest.fail()

    # SetAttributeFilter()
    table.SetAttributeFilter("intfield = 6")
    row = table.GetNextFeature()
    if not row.GetFID() == 5:
        row.DumpReadable()
        pytest.fail()

    # ExecuteSQL()
    result_lyr = conn.ExecuteSQL("SELECT * FROM test")
    row = result_lyr.GetNextFeature()
    row_matches = (
        row.GetField("intfield") == 1
        and row.GetField("doublefield") == 2.34
        and row.GetField("stringfield") == "foo"
    )
    if not row_matches:
        row.DumpReadable()
        pytest.fail()
    conn.ReleaseResultSet(result_lyr)

    conn = None
|
||
|
||
|
||
###############################################################################
|
||
# Run test_ogrsf
|
||
|
||
|
||
def test_ogr_odbc_2(create_tmp_table, ogrsf_path):
    """Run test_ogrsf on tmp/odbc.mdb; expect INFO and no ERROR in its output."""
    command = ogrsf_path + " tmp/odbc.mdb"
    output = gdaltest.runexternal(command)

    assert "INFO" in output and "ERROR" not in output
|
||
|
||
|
||
###############################################################################
|
||
# Test that alternative MS Access file extensions can be read
|
||
|
||
|
||
def test_extensions():
    """Check that alternative MS Access extensions (.style, .accdb) open."""
    driver = ogr.GetDriverByName("ODBC")

    dataset = driver.Open("data/mdb/empty.style")
    assert dataset is not None
    symbols_lyr = dataset.GetLayerByName("Line Symbols")
    assert symbols_lyr is not None

    # The Github "Windows builds" workflow only has the older
    # 'Microsoft Access Driver (*.mdb)' ODBC driver, which doesn't support
    # accdb databases, so the remaining check cannot run there.
    if os.environ.get("GITHUB_WORKFLOW", "") == "Windows builds":
        return

    dataset = driver.Open("data/mdb/empty.accdb")
    assert dataset is not None
|
||
|
||
|
||
###############################################################################
|
||
# Test reading mdb with null memo fields (https://github.com/OSGeo/gdal/pull/3458)
|
||
|
||
|
||
def test_null_memo():
    """Read an mdb with null memo fields (https://github.com/OSGeo/gdal/pull/3458).

    Every feature of the PROP layer is compared, attribute by attribute,
    with the expected rows below. Attributes 5, 22 and 27 are ignored because
    they exhibit cross-platform variations that are not relevant here.

    The test fails when the layer yields fewer or more features than there
    are expected rows, so an empty or truncated read cannot pass unnoticed.
    """
    if not recent_enough_mdb_odbc_driver():
        pytest.skip(
            "test skipped because of assumption that a not enough version of MDBTools is available"
        )

    odbc_drv = ogr.GetDriverByName("ODBC")
    ds = odbc_drv.Open("data/mdb/null_memo.mdb")
    assert ds is not None

    lyr = ds.GetLayerByName("PROP")
    assert lyr is not None

    # fmt: off
    expected_str = [
        [
            7400002, "1", 0, 0, "0101", None, 981.156, 900, None, None,
            None, "0", "0", "2", "0", None, None, None, None, "4000",
            None, None, None, "01", "074", 285310, 4250300, "Κ", None,
        ],
        [
            7400013, "2", 0, 0, "0101", None, 391.468, 368.15, None, None,
            None, "0", "0", "1", "0", None, None, None, None, "4000",
            None, None, None, "01", "074", 285273.0, 4250275.0, "Κ", None,
        ],
        [
            7400014, "3", 0, 0, "0101", None, 1109.932, 850.5, None, None,
            None, "0", "0", "2", "1", None, None, None, None, "4000",
            None, None, None, "01", "074", 285273.401, 4250229.261, "Κ", None,
        ],
        [
            7400015.0, "4", 1, 0, "0201", "Ι", None, None, None, None,
            510.0, None, None, "2", None, None, None, None, None, None,
            None, None, None, "01", "074", 285273.401, 4250229.261, "Κ", None,
        ],
        [
            7400016.0, "5", 1, 1, "0401", "4", None, 111.63, None, None,
            None, "0", "0", "2", None, 300, 1000, 500, 1000, "4000",
            None, None, None, "01", "074", 285273.401, 4250229.261, "Κ", None,
        ],
        [
            7400017.0, "6", 1, 2, "0401", "2", None, 111.63, None, None,
            None, "0", "0", "2", None, 300, 1000, 500, 1000, "4000",
            None, None, None, "01", "074", 285275.0, 4250227.0, "Κ", None,
        ],
    ]
    # fmt: on

    field_count = 29
    # attributes which exhibit cross-platform variations -- they aren't
    # relevant for this test!
    ignored_fields = (5, 22, 27)

    i = 0
    feat = lyr.GetNextFeature()
    while feat is not None:
        # An unexpected extra feature is reported explicitly rather than
        # through an IndexError on expected_str.
        assert i < len(expected_str), "layer returned more features than expected"

        attrs = [feat.GetField(n) for n in range(field_count)]
        for k in range(field_count):
            if k in ignored_fields:
                continue

            if attrs[k] != expected_str[i][k]:
                feat.DumpReadable()
                pytest.fail(
                    str(k) + ": " + str(attrs[k]) + " <> " + str(expected_str[i][k])
                )
        i = i + 1
        feat = lyr.GetNextFeature()

    # Without this check the loop would pass vacuously on an empty layer.
    assert i == len(expected_str), "did not get expected feature count"
|
||
|
||
|
||
###############################################################################
|
||
# Test LIST_ALL_TABLES open option
|
||
|
||
|
||
def test_ogr_odbc_list_all_tables():
    """Check the LIST_ALL_TABLES open option on data/mdb/null_memo.mdb.

    A default open is expected to list a single layer. With
    LIST_ALL_TABLES=YES the MSys* tables must be listed too, and each of
    them must be flagged private while "PROP" must not be.
    """
    if sys.platform == "win32":
        pytest.skip(
            "MS Access ODBC driver always culls system tables, nothing left to test"
        )

    system_tables = (
        "MSysObjects",
        "MSysACEs",
        "MSysQueries",
        "MSysRelationships",
        "MSysAccessObjects",
        "MSysAccessXML",
        "MSysNameMap",
        "MSysNavPaneGroupCategories",
        "MSysNavPaneGroups",
        "MSysNavPaneGroupToObjects",
        "MSysNavPaneObjectIDs",
    )

    driver = ogr.GetDriverByName("ODBC")
    default_ds = driver.Open("data/mdb/null_memo.mdb")
    assert default_ds is not None

    assert default_ds.GetLayerCount() == 1, "did not get expected layer count"

    # Same file, this time with the LIST_ALL_TABLES=YES open option
    full_ds = gdal.OpenEx(
        "data/mdb/null_memo.mdb", gdal.OF_VECTOR, open_options=["LIST_ALL_TABLES=YES"]
    )

    assert full_ds.GetLayerCount() == 12, "did not get expected layer count"

    listed_names = set()
    for idx in range(full_ds.GetLayerCount()):
        listed_names.add(full_ds.GetLayer(idx).GetName())

    assert listed_names == set(system_tables) | {"PROP"}

    private_layers = []
    for idx in range(full_ds.GetLayerCount()):
        if full_ds.IsLayerPrivate(idx):
            private_layers.append(full_ds.GetLayer(idx).GetName())

    for table_name in system_tables:
        assert table_name in private_layers
    assert "PROP" not in private_layers
|
||
|
||
|
||
###############################################################################
|
||
# Test opening a private table by name
|
||
|
||
|
||
def test_ogr_odbc_open_private_table():
    """Open a private (system) table of null_memo.mdb by name.

    The dataset lists a single layer, but GetLayerByName() is still expected
    to return the unlisted "MSysObjects" table. On win32 only the regular
    "PROP" layer is checked, because the MS Access ODBC driver always culls
    system tables.
    """
    odbc_drv = ogr.GetDriverByName("ODBC")
    ds = odbc_drv.Open("data/mdb/null_memo.mdb")
    assert ds is not None

    assert ds.GetLayerCount() == 1, "did not get expected layer count"

    # open a standard layer by name
    prop_lyr = ds.GetLayerByName("PROP")
    assert prop_lyr is not None
    assert prop_lyr.GetFeatureCount() == 6, "did not get expected feature count"

    if sys.platform == "win32":
        # nothing more to test -- the MS Access ODBC driver always culls
        # system tables, so we can't open those
        return

    msys_objects_lyr = ds.GetLayerByName("MSysObjects")
    assert msys_objects_lyr is not None
    assert (
        msys_objects_lyr.GetFeatureCount() == 28
    ), "did not get expected feature count"

    feat = msys_objects_lyr.GetNextFeature()
    assert feat.GetField("Name") == "Tables"

    # try a second time, same layer should be returned
    msys_objects_lyr2 = ds.GetLayerByName("MSysObjects")
    assert msys_objects_lyr2 is not None
    # A non-None result alone would not show that the lookup resolved to the
    # same table, so the layer names are compared as well.
    assert msys_objects_lyr2.GetName() == msys_objects_lyr.GetName()

    # a layer which doesn't exist
    other_lyr = ds.GetLayerByName("xxx")
    assert other_lyr is None
|
||
|
||
|
||
###############################################################################
|
||
# Run test_ogrsf on null_memo database
|
||
|
||
|
||
def test_ogr_odbc_ogrsf_null_memo(ogrsf_path):
    """Run test_ogrsf on the null_memo database; expect INFO and no ERROR."""
    driver_is_usable = recent_enough_mdb_odbc_driver()
    if not driver_is_usable:
        pytest.skip(
            "test skipped because of assumption that a not enough version of MDBTools is available"
        )

    command = ogrsf_path + " data/mdb/null_memo.mdb"
    output = gdaltest.runexternal(command)

    assert "INFO" in output and "ERROR" not in output
|
||
|
||
|
||
###############################################################################
|
||
# Test reading MDB with real values (https://github.com/OSGeo/gdal/issues/3885)
|
||
|
||
|
||
def test_numeric_read():
    """Read an MDB with real values (https://github.com/OSGeo/gdal/issues/3885).

    The first features of the INVENTORY layer are compared with the expected
    rows below. The test fails when fewer features than expected rows could
    be read, so an empty or truncated read cannot pass unnoticed.
    """
    if not recent_enough_mdb_odbc_driver():
        pytest.skip(
            "test skipped because of assumption that a not enough version of MDBTools is available"
        )

    odbc_drv = ogr.GetDriverByName("ODBC")
    ds = odbc_drv.Open("data/mdb/numeric.mdb")
    assert ds is not None

    # NOTE that the bug from https://github.com/OSGeo/gdal/issues/3885 only
    # gets triggered using the Windows Access ODBC driver AFTER reading a
    # number of features. It can't be reproduced if we request only a single
    # failing feature.
    lyr = ds.GetLayerByName("INVENTORY")
    assert lyr is not None

    # fmt: off
    expected_str = [
        [
            "dr8v0myu0nnx", "Riverside Drive", "East River Road", "DEAD END",
            0, 0.15268780291080475, 0.15268780291080475,
        ],
        [
            "dr8v0qp6p01f", "Remington Parkway", "East River Road", "DEAD END",
            0, 0.2121122032403946, 0.2121122032403946,
        ],
        [
            "dr8v0tepmsfv", "Bronx Drive", "East River Road", "DEAD END",
            0, 0.8030499815940857, 0.8030499815940857,
        ],
        [
            "dr8v0w0pf123", "Delaware Avenue", "East River Road", "DEAD END",
            0, 0.2692877948284149, 0.2692877948284149,
        ],
        [
            "dr8v0xxj74uw", "Idle Lane", "East River Road", "DEAD END",
            0, 0.1881732940673828, 0.1881732940673828,
        ],
        [
            "dr8v0yr2et5g", "Park Circle", "Crittenden Road", "DEAD END",
            0, 0.45297008752822876, 0.45297008752822876,
        ],
        [
            "dr8v1j5meh6x", "Tower Drive", "Brighton Henrietta Town Line Road", "DEAD END",
            0, 0.19818930327892303, 0.19818930327892303,
        ],
        [
            "dr8v1m1624qj", "Western Drive", "Brighton Henrietta Town Line Road", "Southern Drive",
            0, 0.07699214667081833, 0.07699214667081833,
        ],
    ]
    # fmt: on

    # NOTE(review): each expected row carries 7 values but only the first 6
    # fields are read and compared -- confirm whether the 7th should be
    # checked too.
    compared_fields = 6

    i = 0
    feat = lyr.GetNextFeature()
    while feat is not None:
        attrs = [feat.GetField(n) for n in range(compared_fields)]
        for k in range(compared_fields):
            if attrs[k] != expected_str[i][k]:
                feat.DumpReadable()
                pytest.fail(
                    str(k) + ": " + str(attrs[k]) + " <> " + str(expected_str[i][k])
                )
        i = i + 1
        if i == len(expected_str):
            # that's enough to reproduce the bug...
            break
        feat = lyr.GetNextFeature()

    # Without this check the loop would pass vacuously on an empty layer.
    assert i == len(expected_str), "did not get expected feature count"
|
||
|
||
|
||
###############################################################################
|
||
# Run test_ogrsf on numeric database
|
||
|
||
|
||
def test_ogr_odbc_ogrsf_numeric(ogrsf_path):
    """Run test_ogrsf on the numeric database; expect INFO and no ERROR."""
    driver_is_usable = recent_enough_mdb_odbc_driver()
    if not driver_is_usable:
        pytest.skip(
            "test skipped because of assumption that a not enough version of MDBTools is available"
        )

    command = ogrsf_path + " data/mdb/numeric.mdb"
    output = gdaltest.runexternal(command)

    assert "INFO" in output and "ERROR" not in output
|