# gdal/autotest/ogr/ogr_openfilegdb.py
#
# 2505 lines
# 84 KiB
# Python
# Executable file

#!/usr/bin/env pytest
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR Test Suite
# Purpose: OpenFileGDB driver testing.
# Author: Even Rouault <even dot rouault at spatialys.com>
#
###############################################################################
# Copyright (c) 2014, Even Rouault <even dot rouault at spatialys.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
###############################################################################
import os
import shutil
import gdaltest
import ogrtest
import pytest
from osgeo import gdal, ogr, osr
# Module-wide pytest marker. NOTE(review): presumably makes pytest skip this
# module when the OpenFileGDB driver is unavailable - the marker itself is
# implemented outside this file.
pytestmark = pytest.mark.require_driver("OpenFileGDB")
# Reference layers without M dimension.
# Each entry is [layer name, OGR geometry type, WKT written to the layer]
# optionally followed by a 4th item: the WKT expected when the layer is read
# back (used instead of the 3rd item by the reading tests when present).
# A None WKT means that no geometry is set / expected.
ogrtest.openfilegdb_datalist = [
    ["none", ogr.wkbNone, None],
    ["point", ogr.wkbPoint, "POINT (1 2)"],
    ["multipoint", ogr.wkbMultiPoint, "MULTIPOINT (1 2,3 4)"],
    [
        "linestring",
        ogr.wkbLineString,
        "LINESTRING (1 2,3 4)",
        "MULTILINESTRING ((1 2,3 4))",
    ],
    ["multilinestring", ogr.wkbMultiLineString, "MULTILINESTRING ((1 2,3 4))"],
    [
        "multilinestring_multipart",
        ogr.wkbMultiLineString,
        "MULTILINESTRING ((1 2,3 4),(5 6,7 8))",
    ],
    [
        "polygon",
        ogr.wkbPolygon,
        "POLYGON ((0 0,0 1,1 1,1 0,0 0))",
        "MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0)))",
    ],
    [
        "multipolygon",
        ogr.wkbMultiPolygon,
        "MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0),(0.25 0.25,0.75 0.25,0.75 0.75,0.25 0.75,0.25 0.25)),((2 0,2 1,3 1,3 0,2 0)))",
    ],
    ["point25D", ogr.wkbPoint25D, "POINT (1 2 3)"],
    ["multipoint25D", ogr.wkbMultiPoint25D, "MULTIPOINT (1 2 -10,3 4 -20)"],
    [
        "linestring25D",
        ogr.wkbLineString25D,
        "LINESTRING (1 2 -10,3 4 -20)",
        "MULTILINESTRING ((1 2 -10,3 4 -20))",
    ],
    [
        "multilinestring25D",
        ogr.wkbMultiLineString25D,
        "MULTILINESTRING ((1 2 -10,3 4 -20))",
    ],
    [
        "multilinestring25D_multipart",
        ogr.wkbMultiLineString25D,
        "MULTILINESTRING ((1 2 -10,3 4 -20),(5 6 -30,7 8 -40))",
    ],
    [
        "polygon25D",
        ogr.wkbPolygon25D,
        "POLYGON ((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10))",
        "MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))",
    ],
    [
        "multipolygon25D",
        ogr.wkbMultiPolygon25D,
        "MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))",
    ],
    # The generator creates this layer with the CREATE_MULTIPATCH=YES option.
    [
        "multipatch",
        ogr.wkbGeometryCollection25D,
        "GEOMETRYCOLLECTION Z (TIN Z (((0.0 0.0 0,0.0 1.0 0,1.0 0.0 0,0.0 0.0 0)),((0.0 1.0 0,1.0 0.0 0,1.0 1.0 0,0.0 1.0 0))),TIN Z (((10.0 0.0 0,10.0 1.0 0,11.0 0.0 0,10.0 0.0 0)),((10.0 0.0 0,11.0 0.0 0,10.0 -1.0 0,10.0 0.0 0))),TIN Z (((5.0 0.0 0,5.0 1.0 0,6.0 0.0 0,5.0 0.0 0))),MULTIPOLYGON Z (((100.0 0.0 0,100.0 1.0 0,101.0 1.0 0,101.0 0.0 0,100.0 0.0 0),(100.25 0.25 0,100.75 0.25 0,100.75 0.75 0,100.75 0.25 0,100.25 0.25 0))))",
    ],
    # Null / empty geometries: the expected read-back geometry is None.
    ["null_polygon", ogr.wkbPolygon, None],
    ["empty_polygon", ogr.wkbPolygon, "POLYGON EMPTY", None],
    ["empty_multipoint", ogr.wkbMultiPoint, "MULTIPOINT EMPTY", None],
]
# Reference layers with an M (measure) dimension, with or without Z.
# Same entry layout as ogrtest.openfilegdb_datalist:
# [layer name, OGR geometry type, WKT written to the layer] optionally
# followed by the WKT expected when reading the layer back (None meaning
# that no geometry is expected).
ogrtest.openfilegdb_datalist_m = [
    ["pointm", ogr.wkbPointM, "POINT M (1 2 3)"],
    ["pointzm", ogr.wkbPointZM, "POINT ZM (1 2 3 4)"],
    ["multipointm", ogr.wkbMultiPointM, "MULTIPOINT M ((1 2 3),(4 5 6))"],
    ["multipointzm", ogr.wkbMultiPointZM, "MULTIPOINT ZM ((1 2 3 4),(5 6 7 8))"],
    [
        "linestringm",
        ogr.wkbLineStringM,
        "LINESTRING M (1 2 3,4 5 6)",
        "MULTILINESTRING M ((1 2 3,4 5 6))",
    ],
    [
        "linestringzm",
        ogr.wkbLineStringZM,
        "LINESTRING ZM (1 2 3 4,5 6 7 8)",
        "MULTILINESTRING ZM ((1 2 3 4,5 6 7 8))",
    ],
    ["multilinestringm", ogr.wkbMultiLineStringM, "MULTILINESTRING M ((1 2 3,4 5 6))"],
    [
        "multilinestringzm",
        ogr.wkbMultiLineStringZM,
        "MULTILINESTRING ZM ((1 2 3 4,5 6 7 8))",
    ],
    [
        "polygonm",
        ogr.wkbPolygonM,
        "POLYGON M ((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1))",
        "MULTIPOLYGON M (((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1)))",
    ],
    [
        "polygonzm",
        ogr.wkbPolygonZM,
        "POLYGON ZM ((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1))",
        "MULTIPOLYGON ZM (((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1)))",
    ],
    [
        "multipolygonm",
        ogr.wkbMultiPolygonM,
        "MULTIPOLYGON M (((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1)))",
    ],
    [
        "multipolygonzm",
        ogr.wkbMultiPolygonZM,
        "MULTIPOLYGON ZM (((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1)))",
    ],
    ["empty_polygonm", ogr.wkbPolygonM, "POLYGON M EMPTY", None],
]
@pytest.fixture(scope="module", autouse=True)
def setup_driver():
    """Hide the FileGDB driver for the duration of this module's tests.

    When a "FileGDB" driver is registered it is deregistered before the
    tests run and registered again afterwards. NOTE(review): presumably this
    guarantees that the datasets are opened by OpenFileGDB rather than by
    FileGDB - confirm.
    """
    fgdb = ogr.GetDriverByName("FileGDB")
    was_registered = fgdb is not None

    if was_registered:
        fgdb.Deregister()

    yield

    # Nothing to restore when the driver was not there in the first place.
    if not was_registered:
        return
    print("Reregistering FileGDB driver")
    fgdb.Register()
@pytest.fixture()
def ogrsf_path():
    """Return the path of the test_ogrsf utility.

    The requesting test is skipped when the utility cannot be located.
    """
    import test_cli_utilities

    located = test_cli_utilities.get_test_ogrsf_path()
    if located is not None:
        return located

    pytest.skip("ogrsf test utility not found")
@pytest.fixture(
    params=[
        dict(src="data/filegdb/testopenfilegdb.gdb.zip", version_10=True),
        dict(src="data/filegdb/testopenfilegdb92.gdb.zip", version_10=False),
        dict(src="data/filegdb/testopenfilegdb93.gdb.zip", version_10=False),
    ]
)
def gdb_source(request):
    """Parametrized description of one zipped test geodatabase.

    Yields a dict with the dataset path under "src" and a "version_10"
    boolean that the tests use to enable the checks that only apply to the
    first dataset.
    """
    selected = request.param
    return selected
###############################################################################
# Make test data
def _make_test_data_reference_layer(ds, data, srs, options):
    """Create one layer described by an ogrtest.openfilegdb_datalist entry."""
    name, geom_type, wkt = data[0], data[1], data[2]

    if geom_type == ogr.wkbNone:
        lyr = ds.CreateLayer(name, geom_type=geom_type, options=options)
    elif name == "multipatch":
        lyr = ds.CreateLayer(
            name,
            geom_type=geom_type,
            srs=srs,
            options=["CREATE_MULTIPATCH=YES", options[0]],
        )
    else:
        lyr = ds.CreateLayer(name, geom_type=geom_type, srs=srs, options=options)

    # The creation order of the fields is kept as is: it defines the schema
    # of the generated tables.
    for field_name, field_type in (
        ("id", ogr.OFTInteger),
        ("str", ogr.OFTString),
        ("smallint", ogr.OFTInteger),
        ("int", ogr.OFTInteger),
        ("float", ogr.OFTReal),
        ("real", ogr.OFTReal),
        ("adate", ogr.OFTDateTime),
        ("guid", ogr.OFTString),
        ("xml", ogr.OFTString),
        ("binary", ogr.OFTBinary),
        ("nullint", ogr.OFTInteger),
        ("binary2", ogr.OFTBinary),
    ):
        lyr.CreateField(ogr.FieldDefn(field_name, field_type))

    # We need at least 5 features so that test_ogrsf can test SetFeature()
    for i in range(5):
        feat = ogr.Feature(lyr.GetLayerDefn())
        if geom_type != ogr.wkbNone and wkt is not None:
            feat.SetGeometry(ogr.CreateGeometryFromWkt(wkt))
        feat.SetField("id", i + 1)
        feat.SetField("str", "foo_é")
        feat.SetField("smallint", -13)
        feat.SetField("int", 123)
        feat.SetField("float", 1.5)
        feat.SetField("real", 4.56)
        feat.SetField("adate", "2013/12/26 12:34:56")
        feat.SetField("guid", "{12345678-9abc-DEF0-1234-567890ABCDEF}")
        feat.SetFieldBinaryFromHexString("binary", "00FF7F")
        feat.SetField("xml", "<foo></foo>")
        feat.SetFieldBinaryFromHexString("binary2", "123456")
        lyr.CreateFeature(feat)

    if name == "none":
        # Create empty feature
        lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))


def _make_test_data_sparse_layer(ds):
    """Create "sparse_layer": 4096 created-then-deleted features plus one."""
    lyr = ds.CreateLayer("sparse_layer", geom_type=ogr.wkbPoint)
    for _ in range(4096):
        feat = ogr.Feature(lyr.GetLayerDefn())
        lyr.CreateFeature(feat)
        lyr.DeleteFeature(feat.GetFID())
    lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))


def _make_test_data_big_layer(ds):
    """Create "big_layer": 341 features whose "real" field cycles 0,1,2,3."""
    lyr = ds.CreateLayer("big_layer", geom_type=ogr.wkbNone)
    lyr.CreateField(ogr.FieldDefn("real", ogr.OFTReal))
    with gdal.config_option("FGDB_BULK_LOAD", "YES"):
        for i in range(340 + 1):
            feat = ogr.Feature(lyr.GetLayerDefn())
            feat.SetField(0, i % 4)
            lyr.CreateFeature(feat)


def _make_test_data_hole_layer(ds):
    """Create "hole": fields added between inserts, and FID 1 deleted."""
    lyr = ds.CreateLayer("hole", geom_type=ogr.wkbPoint, srs=None)
    lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
    for value in ("f1", "fid2", "fid3"):
        feat = ogr.Feature(lyr.GetLayerDefn())
        feat.SetField("str", value)
        lyr.CreateFeature(feat)

    # Fields added after the first features have been written.
    lyr.CreateField(ogr.FieldDefn("int0", ogr.OFTInteger))
    lyr.CreateField(ogr.FieldDefn("str2", ogr.OFTString))

    for i in range(8):
        feat = ogr.Feature(lyr.GetLayerDefn())
        feat.SetField("str", "fid%d" % (4 + i))
        feat.SetField("int0", 4 + i)
        feat.SetField("str2", " ")
        lyr.CreateFeature(feat)

    for i in range(8):
        lyr.CreateField(ogr.FieldDefn("int%d" % (i + 1), ogr.OFTInteger))

    lyr.DeleteFeature(1)

    # One feature without any field set, then a last regular one.
    lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))

    feat = ogr.Feature(lyr.GetLayerDefn())
    feat.SetField("str", "fid13")
    lyr.CreateFeature(feat)


def _make_test_data_no_field_layer(ds):
    """Create "no_field": 5 features, no attribute field, no geometry."""
    lyr = ds.CreateLayer("no_field", geom_type=ogr.wkbNone, srs=None)
    for _ in range(5):
        lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))


def _make_test_data_several_polygons_layer(ds):
    """Create "several_polygons": a 3x3 grid of unit squares spaced by 2."""
    lyr = ds.CreateLayer("several_polygons", geom_type=ogr.wkbPolygon, srs=None)
    for i in range(3):
        for j in range(3):
            x1, x2 = 2 * i, 2 * i + 1
            y1, y2 = 2 * j, 2 * j + 1
            feat = ogr.Feature(lyr.GetLayerDefn())
            feat.SetGeometry(
                ogr.CreateGeometryFromWkt(
                    "POLYGON((%d %d,%d %d,%d %d,%d %d,%d %d))"
                    % (x1, y1, x1, y2, x2, y2, x2, y1, x1, y1)
                )
            )
            lyr.CreateFeature(feat)


def _make_test_data_not_nullable_layer(ds):
    """Create "testnotnullable" with non-nullable geometry and string field."""
    lyr = ds.CreateLayer(
        "testnotnullable",
        geom_type=ogr.wkbPoint,
        srs=None,
        options=["GEOMETRY_NULLABLE=NO"],
    )
    field_defn = ogr.FieldDefn("field_not_nullable", ogr.OFTString)
    field_defn.SetNullable(0)
    lyr.CreateField(field_defn)
    lyr.CreateField(ogr.FieldDefn("field_nullable", ogr.OFTString))

    f = ogr.Feature(lyr.GetLayerDefn())
    f.SetField("field_not_nullable", "not_null")
    f.SetGeomFieldDirectly(
        "geomfield_not_nullable", ogr.CreateGeometryFromWkt("POINT(0 0)")
    )
    lyr.CreateFeature(f)


def ogr_openfilegdb_make_test_data(with_sparse_layer=False):
    """Regenerate data/filegdb/testopenfilegdb.gdb.zip with the FileGDB driver.

    The geodatabase is written to data/filegdb/testopenfilegdb.gdb, zipped
    with the external ``zip`` command, and the unzipped directory is then
    removed. Paths are relative to the current working directory.

    :param with_sparse_layer: also create the "sparse_layer" layer, which is
        disabled by default.
    :raises subprocess.CalledProcessError: when ``zip`` fails; the unzipped
        geodatabase is left in place in that case.
    """
    import subprocess

    try:
        shutil.rmtree("data/filegdb/testopenfilegdb.gdb")
    except OSError:
        # Best effort: there is nothing to remove on a first run.
        pass
    ds = ogr.GetDriverByName("FileGDB").CreateDataSource(
        "data/filegdb/testopenfilegdb.gdb"
    )

    srs = osr.SpatialReference()
    srs.SetFromUserInput("WGS84")

    options = [
        "COLUMN_TYPES=smallint=esriFieldTypeSmallInteger,float=esriFieldTypeSingle,guid=esriFieldTypeGUID,xml=esriFieldTypeXML"
    ]

    # The order of creation of the layers is significant: a test opens
    # a00000009.gdbtable and expects it to be the "none" layer.
    for data in ogrtest.openfilegdb_datalist:
        _make_test_data_reference_layer(ds, data, srs, options)

    if with_sparse_layer:
        _make_test_data_sparse_layer(ds)
    _make_test_data_big_layer(ds)
    _make_test_data_hole_layer(ds)
    _make_test_data_no_field_layer(ds)
    _make_test_data_several_polygons_layer(ds)
    _make_test_data_not_nullable_layer(ds)

    for data in ogrtest.openfilegdb_datalist_m:
        lyr = ds.CreateLayer(data[0], geom_type=data[1], srs=srs, options=[])
        feat = ogr.Feature(lyr.GetLayerDefn())
        feat.SetGeometry(ogr.CreateGeometryFromWkt(data[2]))
        lyr.CreateFeature(feat)
    # Drop the references to the last layer / feature before closing.
    lyr = None
    feat = None

    for fld_name in [
        "id",
        "str",
        "smallint",
        "int",
        "float",
        "real",
        "adate",
        "guid",
        "nullint",
    ]:
        ds.ExecuteSQL("CREATE INDEX idx_%s ON point(%s)" % (fld_name, fld_name))
    ds.ExecuteSQL("CREATE INDEX idx_id ON none(id)")
    ds.ExecuteSQL("CREATE INDEX idx_real ON big_layer(real)")
    ds = None

    gdal.Unlink("data/filegdb/testopenfilegdb.gdb.zip")
    # Run zip from the data directory without changing the working directory
    # of the process, and stop before deleting the source if zipping failed.
    subprocess.run(
        ["zip", "-r", "-9", "testopenfilegdb.gdb.zip", "testopenfilegdb.gdb"],
        cwd="data/filegdb",
        check=True,
    )
    shutil.rmtree("data/filegdb/testopenfilegdb.gdb")
###############################################################################
# Basic tests
@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_1(gdb_source):
    """Check schema, SRS and first feature of every reference layer.

    For each entry of ogrtest.openfilegdb_datalist the layer geometry type,
    field definitions, spatial reference, first geometry and first attribute
    values are verified. For the dataset flagged "version_10" the multipart
    layers, the "xml" field, the GetLayerDefinition / GetLayerMetadata SQL
    requests and the layers of ogrtest.openfilegdb_datalist_m are checked too.
    """
    filename = gdb_source["src"]
    version10 = gdb_source["version_10"]

    srs = osr.SpatialReference()
    srs.SetFromUserInput("WGS84")

    # Layers created as single-part line / polygon are expected to be
    # reported with the corresponding multi geometry type.
    promoted_type = {
        ogr.wkbLineString: ogr.wkbMultiLineString,
        ogr.wkbLineString25D: ogr.wkbMultiLineString25D,
        ogr.wkbPolygon: ogr.wkbMultiPolygon,
        ogr.wkbPolygon25D: ogr.wkbMultiPolygon25D,
    }
    promoted_type_m = {
        ogr.wkbLineStringM: ogr.wkbMultiLineStringM,
        ogr.wkbLineStringZM: ogr.wkbMultiLineStringZM,
        ogr.wkbPolygonM: ogr.wkbMultiPolygonM,
        ogr.wkbPolygonZM: ogr.wkbMultiPolygonZM,
    }

    def expected_wkt_of(entry):
        # The optional 4th item overrides the WKT used at creation time.
        return entry[3] if len(entry) > 3 else entry[2]

    # NOTE(review): the decorator presumably makes OpenEx() return None
    # instead of raising, in which case it is the failing assert that
    # satisfies pytest.raises() - confirm.
    with pytest.raises(Exception):
        assert gdal.OpenEx(filename, gdal.OF_RASTER)
    assert gdal.OpenEx(filename, gdal.OF_RASTER | gdal.OF_VECTOR) is not None

    ds = ogr.Open(filename)
    for data in ogrtest.openfilegdb_datalist:
        lyr_name = data[0]
        if not version10 and lyr_name in (
            "multilinestring_multipart",
            "multilinestring25D_multipart",
        ):
            continue

        lyr = ds.GetLayerByName(lyr_name)
        expected_geom_type = promoted_type.get(data[1], data[1])
        assert lyr.GetGeomType() == expected_geom_type, lyr.GetName()
        # Value comparison: wkbNone is an integer constant, not a singleton.
        assert (
            expected_geom_type == ogr.wkbNone
            or lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() == 1
        )
        assert (
            lyr.GetLayerDefn()
            .GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("str"))
            .GetWidth()
            == 0
        )
        assert (
            lyr.GetLayerDefn()
            .GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("smallint"))
            .GetSubType()
            == ogr.OFSTInt16
        )
        assert (
            lyr.GetLayerDefn()
            .GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("float"))
            .GetSubType()
            == ogr.OFSTFloat32
        )
        if data[1] != ogr.wkbNone:
            assert (
                lyr.GetSpatialRef().IsSame(
                    srs, options=["IGNORE_DATA_AXIS_TO_SRS_AXIS_MAPPING=YES"]
                )
                == 1
            )

        feat = lyr.GetNextFeature()
        if data[1] != ogr.wkbNone:
            expected_wkt = expected_wkt_of(data)
            geom = feat.GetGeometryRef()
            if geom:
                geom = geom.ExportToWkt()
            # An exact WKT match is tried first, then the geometry comparison
            # of ogrtest.
            if (
                geom != expected_wkt
                and ogrtest.check_feature_geometry(feat, expected_wkt) == 1
            ):
                feat.DumpReadable()
                # pytest.fail() only accepts a string as reason.
                pytest.fail(str(expected_wkt))

        if (
            feat.GetField("id") != 1
            or feat.GetField("smallint") != -13
            or feat.GetField("int") != 123
            or feat.GetField("float") != 1.5
            or feat.GetField("real") != 4.56
            or feat.GetField("adate") != "2013/12/26 12:34:56"
            or feat.GetField("guid") != "{12345678-9ABC-DEF0-1234-567890ABCDEF}"
            or (version10 and feat.GetField("xml") != "<foo></foo>")
            or feat.GetField("binary") != "00FF7F"
            or feat.GetField("binary2") != "123456"
        ):
            feat.DumpReadable()
            pytest.fail()

        if version10:
            # Exactly one feature is expected from GetLayerDefinition.
            sql_lyr = ds.ExecuteSQL("GetLayerDefinition %s" % lyr.GetName())
            assert sql_lyr is not None
            feat = sql_lyr.GetNextFeature()
            assert feat is not None
            feat = sql_lyr.GetNextFeature()
            assert feat is None
            lyr.ResetReading()
            lyr.TestCapability("foo")
            ds.ReleaseResultSet(sql_lyr)

            sql_lyr = ds.ExecuteSQL("GetLayerMetadata %s" % lyr.GetName())
            assert sql_lyr is not None
            feat = sql_lyr.GetNextFeature()
            assert feat is not None
            ds.ReleaseResultSet(sql_lyr)

    if version10:
        # Both requests must return nothing for a layer that does not exist.
        sql_lyr = ds.ExecuteSQL("GetLayerDefinition foo")
        assert sql_lyr is None

        sql_lyr = ds.ExecuteSQL("GetLayerMetadata foo")
        assert sql_lyr is None

    if version10:
        for data in ogrtest.openfilegdb_datalist_m:
            lyr = ds.GetLayerByName(data[0])
            expected_geom_type = promoted_type_m.get(data[1], data[1])
            assert lyr.GetGeomType() == expected_geom_type, data

            feat = lyr.GetNextFeature()
            expected_wkt = expected_wkt_of(data)
            if expected_wkt is None:
                if feat.GetGeometryRef() is not None:
                    feat.DumpReadable()
                    pytest.fail(str(data))
            elif ogrtest.check_feature_geometry(feat, expected_wkt) != 0:
                feat.DumpReadable()
                pytest.fail(str(data))

    ds = None
###############################################################################
# Run test_ogrsf
@pytest.fixture()
def ogrsf_run(ogrsf_path, gdb_source):
    """Run test_ogrsf in read-only mode on the parametrized dataset.

    The run is considered successful when its output contains "INFO" and
    does not contain "ERROR".
    """
    command = " ".join([ogrsf_path, "-ro", gdb_source["src"]])
    output = gdaltest.runexternal(command)
    assert "INFO" in output and "ERROR" not in output
def test_ogr_openfilegdb_2(ogrsf_run, gdb_source):
    """Run test_ogrsf on each dataset; the ogrsf_run fixture does the checks."""
###############################################################################
# Open a .gdbtable directly
def test_ogr_openfilegdb_3():
    """Open a single .gdbtable file of the zipped geodatabase directly."""
    gdbtable = "/vsizip/data/filegdb/testopenfilegdb.gdb.zip/testopenfilegdb.gdb/a00000009.gdbtable"
    system_table = "GDB_SystemCatalog"

    ds = ogr.Open(gdbtable)
    assert ds.GetLayerCount() == 1
    only_layer = ds.GetLayer(0)
    assert only_layer.GetName() == "none"

    # Try opening a system table
    catalog = ds.GetLayerByName(system_table)
    assert catalog.GetName() == system_table
    first_row = catalog.GetNextFeature()
    assert first_row.GetField("Name") == system_table

    # Requesting the same system table a second time must work too.
    catalog = ds.GetLayerByName(system_table)
    assert catalog.GetName() == system_table

    ds = None
###############################################################################
# Test use of attribute indexes
def _check_attr_index_filters(ds, lyr, tests):
    """Apply each attribute filter to lyr and check index use and FIDs.

    Each item of tests is (where_clause, expected_fids), optionally followed
    by the value expected from the "GetLayerAttrIndexUse" SQL request; 2 is
    assumed when that third item is omitted.
    """
    for test in tests:
        if len(test) == 2:
            where_clause, fids = test
            expected_attr_index_use = 2
        else:
            where_clause, fids, expected_attr_index_use = test

        lyr.SetAttributeFilter(where_clause)

        sql_lyr = ds.ExecuteSQL("GetLayerAttrIndexUse %s" % lyr.GetName())
        attr_index_use = int(sql_lyr.GetNextFeature().GetField(0))
        ds.ReleaseResultSet(sql_lyr)
        assert attr_index_use == expected_attr_index_use, (
            where_clause,
            fids,
            expected_attr_index_use,
        )

        assert lyr.GetFeatureCount() == len(fids), (where_clause, fids)
        for fid in fids:
            feat = lyr.GetNextFeature()
            assert feat.GetFID() == fid, (where_clause, fids)
        # No extra feature may follow the expected ones.
        feat = lyr.GetNextFeature()
        assert feat is None, (where_clause, fids)


def test_ogr_openfilegdb_4():
    """Check attribute filters and their use of attribute indexes.

    The filters are evaluated on the "point", "none" and "big_layer" layers.
    In the tables, the optional third item is the expected index use:
    0 when no index is used, 1 when it is partially used; the default, 2,
    presumably means that the filter is fully resolved by indexes.
    """
    ds = ogr.Open("/vsizip/data/filegdb/testopenfilegdb.gdb.zip/testopenfilegdb.gdb")

    lyr = ds.GetLayerByName("point")
    tests = [
        ("id = 1", [1]),
        ("1 = id", [1]),
        ("id = 5", [5]),
        ("id = 0", []),
        ("id = 6", []),
        ("id <= 1", [1]),
        ("1 >= id", [1]),
        ("id >= 5", [5]),
        ("5 <= id", [5]),
        ("id < 1", []),
        ("1 > id", []),
        ("id >= 1", [1, 2, 3, 4, 5]),
        ("id > 0", [1, 2, 3, 4, 5]),
        ("0 < id", [1, 2, 3, 4, 5]),
        ("id <= 5", [1, 2, 3, 4, 5]),
        ("id < 6", [1, 2, 3, 4, 5]),
        ("id <> 0", [1, 2, 3, 4, 5]),
        ("id IS NOT NULL", [1, 2, 3, 4, 5]),
        ("id IS NULL", []),
        ("nullint IS NOT NULL", []),
        ("nullint IS NULL", [1, 2, 3, 4, 5]),
        ("str = 'foo_e'", [], 1),
        ("str = 'foo_é'", [1, 2, 3, 4, 5], 1),
        ("str <= 'foo_é'", [1, 2, 3, 4, 5], 0),
        ("str >= 'foo_é'", [1, 2, 3, 4, 5], 1),
        ("str <> 'foo_é'", [], 0),
        ("str < 'foo_é'", [], 0),
        ("str > 'foo_é'", [], 0),
        ("smallint = -13", [1, 2, 3, 4, 5]),
        ("smallint <= -13", [1, 2, 3, 4, 5]),
        ("smallint >= -13", [1, 2, 3, 4, 5]),
        ("smallint < -13", []),
        ("smallint > -13", []),
        ("int = 123", [1, 2, 3, 4, 5]),
        ("int <= 123", [1, 2, 3, 4, 5]),
        ("int >= 123", [1, 2, 3, 4, 5]),
        ("int < 123", []),
        ("int > 123", []),
        ("float = 1.5", [1, 2, 3, 4, 5]),
        ("float <= 1.5", [1, 2, 3, 4, 5]),
        ("float >= 1.5", [1, 2, 3, 4, 5]),
        ("float < 1.5", []),
        ("float > 1.5", []),
        ("real = 4.56", [1, 2, 3, 4, 5]),
        ("real <= 4.56", [1, 2, 3, 4, 5]),
        ("real >= 4.56", [1, 2, 3, 4, 5]),
        ("real < 4.56", []),
        ("real > 4.56", []),
        ("adate = '2013/12/26 12:34:56'", [1, 2, 3, 4, 5]),
        ("adate <= '2013/12/26 12:34:56'", [1, 2, 3, 4, 5]),
        ("adate >= '2013/12/26 12:34:56'", [1, 2, 3, 4, 5]),
        ("adate < '2013/12/26 12:34:56'", []),
        ("adate > '2013/12/26 12:34:56'", []),
        ("guid = '{12345678-9ABC-DEF0-1234-567890ABCDEF}'", [1, 2, 3, 4, 5]),
        ("guid <= '{12345678-9ABC-DEF0-1234-567890ABCDEF}'", [1, 2, 3, 4, 5]),
        ("guid >= '{12345678-9ABC-DEF0-1234-567890ABCDEF}'", [1, 2, 3, 4, 5]),
        ("guid < '{12345678-9ABC-DEF0-1234-567890ABCDEF}'", []),
        ("guid > '{12345678-9ABC-DEF0-1234-567890ABCDEF}'", []),
        ("guid = '{'", []),
        ("guid > '{'", [1, 2, 3, 4, 5]),
        ("NOT(id = 1)", [2, 3, 4, 5]),
        ("id = 1 OR id = -1", [1]),
        ("id = -1 OR id = 1", [1]),
        ("id = 1 OR id = 1", [1]),
        ("id = 1 OR id = 2", [1, 2]),  # exclusive branches
        ("id < 3 OR id > 3", [1, 2, 4, 5]),  # exclusive branches
        ("id > 3 OR id < 3", [1, 2, 4, 5]),  # exclusive branches
        ("id <= 3 OR id >= 4", [1, 2, 3, 4, 5]),  # exclusive branches
        ("id >= 4 OR id <= 3", [1, 2, 3, 4, 5]),  # exclusive branches
        ("id < 3 OR id >= 3", [1, 2, 3, 4, 5]),
        ("id <= 3 OR id >= 3", [1, 2, 3, 4, 5]),
        ("id <= 5 OR id >= 1", [1, 2, 3, 4, 5]),
        ("id <= 1.5 OR id >= 2", [1, 2, 3, 4, 5]),
        ("id IS NULL OR id IS NOT NULL", [1, 2, 3, 4, 5]),
        ("float < 1.5 OR float > 1.5", []),
        ("float <= 1.5 OR float >= 1.5", [1, 2, 3, 4, 5]),
        ("float < 1.5 OR float > 2", []),
        ("float < 1 OR float > 2.5", []),
        ("str < 'foo_é' OR str > 'z'", [], 0),
        ("adate < '2013/12/26 12:34:56' OR adate > '2014/01/01'", []),
        ("id = 1 AND id = -1", []),
        ("id = -1 AND id = 1", []),
        ("id = 1 AND id = 1", [1]),
        ("id = 1 AND id = 2", []),
        ("id <= 5 AND id >= 1", [1, 2, 3, 4, 5]),
        ("id <= 3 AND id >= 3", [3]),
        ("id = 1 AND float = 1.5", [1]),
        ("id BETWEEN 1 AND 5", [1, 2, 3, 4, 5]),
        ("id IN (1)", [1]),
        ("id IN (5,4,3,2,1)", [1, 2, 3, 4, 5]),
        ("fid = 1", [1], 0),  # no index used
        ("fid BETWEEN 1 AND 1", [1], 0),  # no index used
        ("fid IN (1)", [1], 0),  # no index used
        ("fid IS NULL", [], 0),  # no index used
        ("fid IS NOT NULL", [1, 2, 3, 4, 5], 0),  # no index used
        ("xml <> ''", [1, 2, 3, 4, 5], 0),  # no index used
        ("id = 1 AND xml <> ''", [1], 1),  # index partially used
        ("xml <> '' AND id = 1", [1], 1),  # index partially used
        ("NOT(id = 1 AND xml <> '')", [2, 3, 4, 5], 0),  # no index used
        ("id = 1 OR xml <> ''", [1, 2, 3, 4, 5], 0),  # no index used
        ("id = id", [1, 2, 3, 4, 5], 0),  # no index used
        ("id = 1 + 0", [1], 0),  # no index used (currently...)
    ]
    _check_attr_index_filters(ds, lyr, tests)

    # The "none" layer has a 6th feature whose fields are all unset.
    lyr = ds.GetLayerByName("none")
    tests = [
        ("id = 1", [1]),
        ("id IS NULL", [6]),
        ("id IS NOT NULL", [1, 2, 3, 4, 5]),
        ("id IS NULL OR id IS NOT NULL", [1, 2, 3, 4, 5, 6]),
        ("id = 1 OR id IS NULL", [1, 6]),
        ("id IS NULL OR id = 1", [1, 6]),
    ]
    _check_attr_index_filters(ds, lyr, tests)

    # (filter, expected feature count, FID of the first expected feature);
    # when the first FID is given, the following ones are expected 4 apart.
    lyr = ds.GetLayerByName("big_layer")
    tests = [
        ("real = 0", 86, 1),
        ("real = 1", 85, 2),
        ("real = 2", 85, 3),
        ("real = 3", 85, 4),
        ("real >= 0", 86 + 3 * 85, None),
        ("real < 4", 86 + 3 * 85, None),
        ("real > 1 AND real < 2", 0, None),
        ("real < 0", 0, None),
    ]
    for where_clause, count, start in tests:
        lyr.SetAttributeFilter(where_clause)
        assert lyr.GetFeatureCount() == count, (where_clause, count)
        for i in range(count):
            feat = lyr.GetNextFeature()
            assert feat is not None, (where_clause, count)
            if start is not None:
                assert feat.GetFID() == i * 4 + start, (where_clause, count)
        feat = lyr.GetNextFeature()
        assert feat is None, (where_clause, count)

    ds = None
###############################################################################
# Test use of attribute indexes on truncated strings
def test_ogr_openfilegdb_str_indexed_truncated():
    """Check filters on the indexed "str" field of the "test" layer.

    For every filter, both the value reported by the "GetLayerAttrIndexUse"
    SQL request and the FIDs of the selected features are verified.
    """
    IDX_NOT_USED = 0
    IDX_USED = 1

    # (where clause, expected FIDs, expected index use)
    cases = [
        ("str = 'a'", [1], IDX_USED),
        ("str = 'aa'", [2], IDX_USED),
        ("str != 'aa'", [1, 3], IDX_NOT_USED),
        ("str = 'aaa'", [3], IDX_USED),
        ("str >= 'aaa'", [3], IDX_USED),
        ("str > 'aaa'", [], IDX_NOT_USED),
        ("str > 'aa_'", [3], IDX_NOT_USED),
        ("str <= 'aab'", [1, 2, 3], IDX_NOT_USED),
        ("str = 'aaa '", [], IDX_USED),
        ("str != 'aaa '", [1, 2, 3], IDX_NOT_USED),
        ("str <= 'aaa '", [1, 2, 3], IDX_NOT_USED),
        ("str <= 'aaaX'", [1, 2, 3], IDX_NOT_USED),
        ("str >= 'aaa '", [], IDX_USED),
        ("str = 'aaaX'", [], IDX_USED),
        ("str = 'aaaXX'", [], IDX_USED),
        ("str = 'aaa '", [], IDX_USED),
        ("str IN ('a', 'b')", [1], IDX_USED),
        ("str IN ('aaa')", [3], IDX_USED),
        ("str IN ('aaa', 'aaa ')", [3], IDX_USED),
        ("str IN ('aaa ')", [], IDX_USED),
        ("str IN ('aaaX')", [], IDX_USED),
        ("str IN ('aaaXX')", [], IDX_USED),
    ]

    ds = ogr.Open("data/filegdb/test_str_indexed_truncated.gdb")
    lyr = ds.GetLayerByName("test")

    for case in cases:
        where_clause, fids, expected_attr_index_use = case
        lyr.SetAttributeFilter(where_clause)

        sql_lyr = ds.ExecuteSQL("GetLayerAttrIndexUse %s" % lyr.GetName())
        reported = sql_lyr.GetNextFeature().GetField(0)
        ds.ReleaseResultSet(sql_lyr)
        assert int(reported) == expected_attr_index_use, (
            where_clause,
            fids,
            expected_attr_index_use,
        )

        selected_fids = [f.GetFID() for f in lyr]
        assert selected_fids == fids, (where_clause, fids)
###############################################################################
# Test opening an unzipped dataset
def test_ogr_openfilegdb_5():
    """Unzip the test geodatabase into tmp/ and open the unzipped copy.

    The test is skipped when unzipping raises OSError or when the unzipped
    directory is missing afterwards.
    """
    unzipped = "tmp/testopenfilegdb.gdb"

    # Start from a clean state; a missing directory is not an error.
    try:
        shutil.rmtree("tmp/testopenfilegdb.gdb")
    except OSError:
        pass

    try:
        gdaltest.unzip("tmp/", "data/filegdb/testopenfilegdb.gdb.zip")
    except OSError:
        pytest.skip()

    if not os.path.exists(unzipped):
        pytest.skip()

    assert ogr.Open(unzipped) is not None
###############################################################################
# Test special SQL processing for min/max/count/sum/avg values
def test_ogr_openfilegdb_6():
    """Check min/max/count/sum/avg SQL requests, with and without indexes."""
    ds = ogr.Open("data/filegdb/testopenfilegdb.gdb.zip")

    def dump_release_and_fail(feature, result_layer):
        # Show the offending feature, free the result set, fail the test.
        feature.DumpReadable()
        ds.ReleaseResultSet(result_layer)
        pytest.fail()

    def any_mismatch(feature, expectations):
        return any(
            feature.GetField(name) != value for name, value in expectations
        )

    # With indices
    sql_lyr = ds.ExecuteSQL(
        "select min(id), max(id), count(id), sum(id), avg(id), min(str), min(smallint), "
        "avg(smallint), min(float), avg(float), min(real), avg(real), min(adate), avg(adate), min(guid), min(nullint), avg(nullint) from point"
    )
    assert sql_lyr is not None
    feat = sql_lyr.GetNextFeature()

    # Expectations are split around MIN_str, of which only a prefix is
    # compared, so that the fields are evaluated in the same order as before.
    before_str = (
        ("MIN_id", 1),
        ("MAX_id", 5),
        ("COUNT_id", 5),
        ("SUM_id", 15.0),
        ("AVG_id", 3.0),
    )
    after_str = (
        ("MIN_smallint", -13),
        ("AVG_smallint", -13),
        ("MIN_float", 1.5),
        ("AVG_float", 1.5),
        ("MIN_real", 4.56),
        ("AVG_real", 4.56),
        ("MIN_adate", "2013/12/26 12:34:56"),
        ("AVG_adate", "2013/12/26 12:34:56"),
        ("MIN_guid", "{12345678-9ABC-DEF0-1234-567890ABCDEF}"),
    )
    unexpected = (
        any_mismatch(feat, before_str)
        or feat.GetField("MIN_str")[0:4] != "foo_"
        or any_mismatch(feat, after_str)
        or feat.IsFieldSet("MIN_nullint")
        or feat.IsFieldSet("AVG_nullint")
    )
    if unexpected:
        dump_release_and_fail(feat, sql_lyr)
    ds.ReleaseResultSet(sql_lyr)

    # No index
    sql_lyr = ds.ExecuteSQL("select min(id), avg(id) from multipoint")
    feat = sql_lyr.GetNextFeature()
    if any_mismatch(feat, (("MIN_id", 1), ("AVG_id", 3.0))):
        dump_release_and_fail(feat, sql_lyr)
    ds.ReleaseResultSet(sql_lyr)
###############################################################################
# Test special SQL processing for ORDER BY
@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_7():
    """Check the special processing of SQL requests with ORDER BY.

    Each test is (SQL, expected feature count, expected FID of the first
    feature, expected optimization flag). A flag of None marks an invalid
    request that must not return a layer; otherwise the flag is compared
    with the value reported by "GetLastSQLUsedOptimizedImplementation", and
    count / first FID are only checked when the flag is truthy.
    """
    ds = ogr.Open("data/filegdb/testopenfilegdb.gdb.zip")

    tests = [  # Optimized:
        ("select * from point order by id", 5, 1, 1),
        ("select id, str from point order by id desc", 5, 5, 1),
        ("select * from point where id = 1 order by id", 1, 1, 1),
        ("select * from big_layer order by real", 86 + 3 * 85, 1, 1),
        ("select * from big_layer order by real limit 0", 0, None, 1),
        ("select * from big_layer order by real offset 10000", 0, None, 1),
        ("select * from big_layer order by real limit 1", 1, 1, 1),
        ("select * from big_layer order by real limit 1 offset 0", 1, 1, 1),
        ("select * from big_layer order by real limit 1 offset 1", 1, 5, 1),
        ("select * from big_layer order by real limit 2", 2, 1, 1),
        ("select * from big_layer order by real limit 100000", 86 + 3 * 85, 1, 1),
        (
            "select * from big_layer order by real limit 100000 offset 1",
            86 + 3 * 85 - 1,
            5,
            1,
        ),
        ("select * from big_layer order by real desc", 86 + 3 * 85, 4 * 85, 1),
        # Invalid :
        ("select foo from", None, None, None),
        ("select foo from bar", None, None, None),
        ("select * from point order by foo", None, None, None),
        # Non-optimized :
        ("select * from point order by xml", None, None, 0),
        ("select fid from point order by id", None, None, 0),
        ("select cast(id as float) from point order by id", None, None, 0),
        ("select distinct id from point order by id", None, None, 0),
        ("select 1 from point order by id", None, None, 0),
        ("select count(*) from point order by id", None, None, 0),
        ("select * from point order by nullint", None, None, 0),
        ("select * from point where id = 1 or id = 2 order by id", None, None, 0),
        ("select * from point where id = 1 order by id, float", None, None, 0),
        ("select * from point where float > 0 order by id", None, None, 0),
    ]

    for sql, feat_count, first_fid, expected_optimized in tests:
        # pytest.fail() takes a single string reason: format the context once.
        context = str((sql, feat_count, first_fid))

        if expected_optimized is None:
            # Invalid request: silence the expected error, and always restore
            # the previous error handler.
            gdal.PushErrorHandler("CPLQuietErrorHandler")
            try:
                sql_lyr = ds.ExecuteSQL(sql)
            finally:
                gdal.PopErrorHandler()
            if sql_lyr is not None:
                ds.ReleaseResultSet(sql_lyr)
                pytest.fail(context)
            continue

        sql_lyr = ds.ExecuteSQL(sql)
        assert sql_lyr is not None, context

        if expected_optimized:
            if sql_lyr.GetFeatureCount() != feat_count:
                ds.ReleaseResultSet(sql_lyr)
                pytest.fail(context)
            feat = sql_lyr.GetNextFeature()
            if feat_count > 0:
                if feat.GetFID() != first_fid:
                    # Dump the feature while its result set is still alive.
                    feat.DumpReadable()
                    ds.ReleaseResultSet(sql_lyr)
                    pytest.fail(context)
            else:
                assert first_fid is None
                assert feat is None
        ds.ReleaseResultSet(sql_lyr)

        sql_lyr = ds.ExecuteSQL("GetLastSQLUsedOptimizedImplementation")
        optimized = int(sql_lyr.GetNextFeature().GetField(0))
        ds.ReleaseResultSet(sql_lyr)
        assert optimized == expected_optimized, context

        if optimized and "big_layer" not in sql:
            import test_cli_utilities

            ogrsf = test_cli_utilities.get_test_ogrsf_path()
            if ogrsf is not None:
                ret = gdaltest.runexternal(
                    ogrsf + ' -ro data/filegdb/testopenfilegdb.gdb.zip -sql "%s"' % sql
                )
                assert "INFO" in ret and "ERROR" not in ret, context
###############################################################################
# Test reading a .gdbtable without .gdbtablx
def test_ogr_openfilegdb_8():
    """Read the dataset with OPENFILEGDB_IGNORE_GDBTABLX set to YES.

    The per-layer feature counts must be the same as without the option,
    and the "str" values of the "hole" layer must come in a known order.
    """
    gdb_zip = "data/filegdb/testopenfilegdb.gdb.zip"

    def count_features_per_layer(dataset):
        counts = {}
        for idx in range(dataset.GetLayerCount()):
            layer = dataset.GetLayer(idx)
            counts[layer.GetName()] = layer.GetFeatureCount()
        return counts

    ds = ogr.Open(gdb_zip)
    counts_with_tablx = count_features_per_layer(ds)
    ds = None

    with gdal.config_option("OPENFILEGDB_IGNORE_GDBTABLX", "YES"):
        ds = ogr.Open(gdb_zip)
        counts_without_tablx = count_features_per_layer(ds)
        assert counts_with_tablx == counts_without_tablx

        lyr = ds.GetLayerByName("hole")
        # Not exactly in the order that one might expect, but logical when
        # looking at the structure of the .gdbtable
        expected_str = [
            "fid13",
            "fid2",
            "fid3",
            "fid4",
            "fid5",
            "fid6",
            "fid7",
            "fid8",
            "fid9",
            "fid10",
            "fid11",
            None,
        ]
        position = 0
        while True:
            feat = lyr.GetNextFeature()
            if feat is None:
                break
            if feat.GetField("str") != expected_str[position]:
                feat.DumpReadable()
                pytest.fail()
            position += 1
###############################################################################
# Test reading a .gdbtable outside a .gdb
def test_ogr_openfilegdb_9():
    """Open a .gdbtable that was copied outside of its .gdb directory.

    Skipped when tmp/testopenfilegdb.gdb does not exist.
    """
    if not os.path.exists("tmp/testopenfilegdb.gdb"):
        pytest.skip()

    # The table is copied together with its .gdbtablx companion.
    copies = (
        ("tmp/testopenfilegdb.gdb/a00000009.gdbtable", "tmp/a00000009.gdbtable"),
        ("tmp/testopenfilegdb.gdb/a00000009.gdbtablx", "tmp/a00000009.gdbtablx"),
    )
    for source, destination in copies:
        shutil.copy(source, destination)

    ds = ogr.Open("tmp/a00000009.gdbtable")
    assert ds is not None
    first_layer = ds.GetLayer(0)
    first_feature = first_layer.GetNextFeature()
    assert first_feature is not None
###############################################################################
# Test various error conditions
def fuzz(filename, offset):
    """Replace the byte at offset of filename by 255 minus its value.

    The file is modified in place. Returns the (filename, offset, original
    byte value) tuple that unfuzz() takes to restore the byte.
    """
    with open(filename, "rb+") as stream:
        stream.seek(offset)
        original = ord(stream.read(1))
        inverted = chr(255 - original).encode("ISO-8859-1")
        # Reading moved the position forward: go back before overwriting.
        stream.seek(offset)
        stream.write(inverted)
    return filename, offset, original
def unfuzz(backup):
    """Write back a byte saved by fuzz().

    backup is the (filename, offset, byte value) tuple returned by fuzz().
    """
    path, position, value = backup
    restored = chr(value).encode("ISO-8859-1")
    with open(path, "rb+") as stream:
        stream.seek(position)
        stream.write(restored)
def _read_first_feature_of_fuzzed_gdb(layer_name, attribute_filter=None):
    """Open tmp/testopenfilegdb_fuzzed.gdb and try to read a feature of a layer.

    ``attribute_filter``, when not None, is set on the layer before reading.

    Returns a ``(got_feature, error_msg)`` tuple. ``error_msg`` is the first
    non-empty GDAL error message seen while opening the dataset, getting the
    layer (and setting the filter) or reading the feature, or "" if there
    was none.
    """
    gdal.ErrorReset()
    ds = ogr.Open("tmp/testopenfilegdb_fuzzed.gdb")
    error_msg = gdal.GetLastErrorMsg()
    feat = None
    lyr = None
    if ds is not None:
        gdal.ErrorReset()
        lyr = ds.GetLayerByName(layer_name)
        # The layer may not be available on a corrupted file: only set the
        # filter on an actual layer object.
        if lyr is not None and attribute_filter is not None:
            lyr.SetAttributeFilter(attribute_filter)
        if error_msg == "":
            error_msg = gdal.GetLastErrorMsg()
        if lyr is not None:
            gdal.ErrorReset()
            feat = lyr.GetNextFeature()
            if error_msg == "":
                error_msg = gdal.GetLastErrorMsg()
    got_feature = feat is not None
    # Drop the references, dataset last, before the caller restores the file
    feat = None
    lyr = None
    ds = None
    return got_feature, error_msg


def _discover_fuzz_offsets(filename, layer_name, attribute_filter=None):
    """Fuzz every byte of ``filename`` and return the offsets causing a problem.

    A problem is a missing feature or a GDAL error. Problems at an offset
    less than 4 bytes after the previous problematic one, with the same error
    message, are not reported.
    """
    offsets = []
    last_error_msg = ""
    last_offset = -1
    for offset in range(os.stat(filename).st_size):
        backup = fuzz(filename, offset)
        try:
            got_feature, error_msg = _read_first_feature_of_fuzzed_gdb(
                layer_name, attribute_filter
            )
        finally:
            # Always restore the byte, so that a failure at one offset does
            # not leave the file corrupted
            unfuzz(backup)
        if not got_feature or error_msg != "":
            if offset - last_offset >= 4 or last_error_msg != error_msg:
                offsets.append(offset)
            last_offset = offset
            last_error_msg = error_msg
    return offsets


def _check_fuzz_offsets(filename, offsets, layer_name, attribute_filter=None):
    """Fuzz ``filename`` at each of ``offsets`` and report undetected corruptions.

    A message is printed for each offset where a feature is still read
    without any GDAL error.
    """
    for offset in offsets:
        backup = fuzz(filename, offset)
        try:
            with gdaltest.error_handler():
                got_feature, error_msg = _read_first_feature_of_fuzzed_gdb(
                    layer_name, attribute_filter
                )
        finally:
            unfuzz(backup)
        if got_feature and error_msg == "":
            print(
                "%s: expected problem at offset %d, but did not find"
                % (filename, offset)
            )


@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_10():
    """Read a geodatabase whose table and index files have one byte corrupted.

    Works on a copy of tmp/testopenfilegdb.gdb (the test is skipped if it
    does not exist). The offsets to corrupt are hard-coded; switching
    ``regenerate_offsets`` to True prints candidate offsets instead.
    """
    if not os.path.exists("tmp/testopenfilegdb.gdb"):
        pytest.skip()

    shutil.copytree("tmp/testopenfilegdb.gdb", "tmp/testopenfilegdb_fuzzed.gdb")

    items_filter = "PhysicalName = 'NO_FIELD'"

    # Maintenance switch: set to True to recompute the lists of offsets below
    regenerate_offsets = False
    if regenerate_offsets:
        for filename in [
            "tmp/testopenfilegdb_fuzzed.gdb/a00000001.gdbtable",
            "tmp/testopenfilegdb_fuzzed.gdb/a00000001.gdbtablx",
        ]:
            print(_discover_fuzz_offsets(filename, "GDB_SystemCatalog"))
        for filename in [
            "tmp/testopenfilegdb_fuzzed.gdb/a00000004.gdbindexes",
            "tmp/testopenfilegdb_fuzzed.gdb/a00000004.CatItemsByPhysicalName.atx",
        ]:
            print(_discover_fuzz_offsets(filename, "GDB_Items", items_filter))
        return

    for filename, offsets in [
        (
            "tmp/testopenfilegdb_fuzzed.gdb/a00000001.gdbtable",
            [
                4, 5, 6, 7, 32, 33, 41, 42, 52, 59, 60, 63, 64,
                72, 73, 77, 78, 79, 80, 81, 101, 102, 104, 105, 111, 180,
            ],
        ),
        (
            "tmp/testopenfilegdb_fuzzed.gdb/a00000001.gdbtablx",
            [4, 7, 11, 12, 16, 31, 5136, 5140, 5142, 5144],
        ),
    ]:
        _check_fuzz_offsets(filename, offsets, "GDB_SystemCatalog")

    for filename, offsets in [
        (
            "tmp/testopenfilegdb_fuzzed.gdb/a00000004.gdbindexes",
            [
                0, 4, 5, 44, 45, 66, 67, 100, 101, 116, 117,
                148, 149, 162, 163, 206, 207, 220, 221, 224, 280, 281,
            ],
        ),
        (
            "tmp/testopenfilegdb_fuzzed.gdb/a00000004.CatItemsByPhysicalName.atx",
            [4, 12, 8196, 8300, 8460, 8620, 8780, 8940, 9100, 12290, 12294, 12298],
        ),
    ]:
        _check_fuzz_offsets(filename, offsets, "GDB_Items", items_filter)
###############################################################################
# Test spatial filtering
# Values returned by get_spi_state()
SPI_IN_BUILDING = 0
SPI_COMPLETED = 1
SPI_INVALID = 2


def get_spi_state(ds, lyr):
    """Return, as an int, the result of "GetLayerSpatialIndexState <layer name>".

    The request is run with ds.ExecuteSQL() and the value is read from the
    first field of the first feature of the result, which is then released.
    """
    request = "GetLayerSpatialIndexState %s" % lyr.GetName()
    result_lyr = ds.ExecuteSQL(request)
    state = int(result_lyr.GetNextFeature().GetField(0))
    ds.ReleaseResultSet(result_lyr)
    return state
@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_in_memory_spatial_filter():
    """Check spatial filtering with OPENFILEGDB_USE_SPATIAL_INDEX set to NO.

    The state reported by get_spi_state() is checked after various sequences
    of layer operations, together with the number of filtered features.
    """
    gdb_path = "data/filegdb/testopenfilegdb.gdb.zip"
    small_rect = (0.25, 0.25, 0.5, 0.5)
    other_rect = (1.4, 0.4, 1.6, 0.6)

    def _open_layer(layer_name):
        dataset = ogr.Open(gdb_path)
        return dataset, dataset.GetLayerByName(layer_name)

    def _count_remaining(layer):
        # Only calls GetNextFeature(): reading goes on from the current
        # position of the layer.
        total = 0
        while layer.GetNextFeature() is not None:
            total += 1
        return total

    with gdaltest.config_option("OPENFILEGDB_USE_SPATIAL_INDEX", "NO"):

        # Test building spatial index with GetFeatureCount()
        ds, lyr = _open_layer("several_polygons")
        assert get_spi_state(ds, lyr) == SPI_IN_BUILDING
        lyr.ResetReading()
        assert get_spi_state(ds, lyr) == SPI_IN_BUILDING
        assert lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
        lyr.SetSpatialFilterRect(*small_rect)
        assert lyr.GetFeatureCount() == 1
        assert get_spi_state(ds, lyr) == SPI_COMPLETED
        # Should return cached value
        assert lyr.GetFeatureCount() == 1
        # Should use index
        assert _count_remaining(lyr) == 1
        lyr = None
        ds = None

        # Test iterating without spatial index already built
        ds, lyr = _open_layer("several_polygons")
        lyr.SetSpatialFilterRect(*small_rect)
        first = lyr.GetNextFeature()
        assert get_spi_state(ds, lyr) == SPI_IN_BUILDING
        matched = 0 if first is None else 1 + _count_remaining(lyr)
        assert matched == 1
        assert get_spi_state(ds, lyr) == SPI_COMPLETED
        first = None
        lyr = None
        ds = None

        # Test GetFeatureCount() without spatial index already built, with no
        # matching feature when GEOS is available
        expected_count = 0 if ogrtest.have_geos() else 5
        ds, lyr = _open_layer("multipolygon")
        lyr.SetSpatialFilterRect(*other_rect)
        assert lyr.GetFeatureCount() == expected_count
        lyr = None
        ds = None

        # Test iterating without spatial index already built, with no matching
        # feature when GEOS is available
        ds, lyr = _open_layer("multipolygon")
        lyr.SetSpatialFilterRect(*other_rect)
        assert _count_remaining(lyr) == expected_count
        assert lyr.GetFeatureCount() == expected_count
        lyr = None
        ds = None

        # GetFeature() should not impact spatial index building
        ds, lyr = _open_layer("several_polygons")
        lyr.SetSpatialFilterRect(*small_rect)
        lyr.GetFeature(1)
        lyr.GetFeature(1)
        assert get_spi_state(ds, lyr) == SPI_IN_BUILDING
        _count_remaining(lyr)
        assert get_spi_state(ds, lyr) == SPI_COMPLETED
        lyr.ResetReading()
        assert _count_remaining(lyr) == 1
        assert get_spi_state(ds, lyr) == SPI_COMPLETED
        # This will create an array of filtered features
        lyr.SetSpatialFilterRect(*small_rect)
        assert lyr.TestCapability(ogr.OLCFastSetNextByIndex) == 1
        # Test SetNextByIndex() with filtered features
        assert lyr.SetNextByIndex(-1) != 0
        assert lyr.SetNextByIndex(1) != 0
        assert lyr.SetNextByIndex(0) == 0
        assert lyr.GetNextFeature().GetFID() == 1
        assert get_spi_state(ds, lyr) == SPI_COMPLETED
        lyr = None
        ds = None

        # SetNextByIndex() impacts spatial index building
        ds, lyr = _open_layer("multipolygon")
        lyr.SetNextByIndex(3)
        assert get_spi_state(ds, lyr) == SPI_INVALID
        lyr = None
        ds = None

        # and ResetReading() as well
        ds, lyr = _open_layer("multipolygon")
        lyr.GetNextFeature()
        lyr.ResetReading()
        assert get_spi_state(ds, lyr) == SPI_INVALID
        lyr = None
        ds = None

        # and SetAttributeFilter() with an index too
        ds, lyr = _open_layer("point")
        lyr.SetAttributeFilter("id = 1")
        assert get_spi_state(ds, lyr) == SPI_INVALID
        lyr = None
        ds = None
def test_ogr_openfilegdb_spx_spatial_filter():
    """Check feature counts under spatial filters, without disabling the .spx.

    Counts are obtained both with GetFeatureCount() and by iterating over the
    layer, alone and combined with an attribute filter.
    """
    gdb_path = "data/filegdb/testopenfilegdb.gdb.zip"
    small_rect = (0.25, 0.25, 0.5, 0.5)
    other_rect = (1.4, 0.4, 1.6, 0.6)

    def _iterated_count(layer):
        return sum(1 for _ in layer)

    # Test GetFeatureCount() and then iterating
    ds = ogr.Open(gdb_path)
    lyr = ds.GetLayerByName("several_polygons")
    for capability in (ogr.OLCFastFeatureCount, ogr.OLCFastSpatialFilter):
        assert lyr.TestCapability(capability) == 1
    lyr.SetSpatialFilterRect(*small_rect)
    assert lyr.GetFeatureCount() == 1
    assert _iterated_count(lyr) == 1

    # Set another spatial filter
    lyr.SetSpatialFilterRect(0, 2, 1, 5)
    assert lyr.GetFeatureCount() == 2

    # Unset spatial filter
    lyr.SetSpatialFilter(None)
    assert lyr.GetFeatureCount() == 9

    # Set again a spatial filter
    lyr.SetSpatialFilterRect(*small_rect)
    assert lyr.GetFeatureCount() == 1
    lyr = None
    ds = None

    # Test iterating without spatial index already built
    ds = ogr.Open(gdb_path)
    lyr = ds.GetLayerByName("several_polygons")
    lyr.SetSpatialFilterRect(*small_rect)
    assert _iterated_count(lyr) == 1
    lyr = None
    ds = None

    # Test GetFeatureCount(), with no matching feature when GEOS is available
    expected_count = 0 if ogrtest.have_geos() else 5
    ds = ogr.Open(gdb_path)
    lyr = ds.GetLayerByName("multipolygon")
    lyr.SetSpatialFilterRect(*other_rect)
    assert lyr.GetFeatureCount() == expected_count
    lyr = None
    ds = None

    # Test iterating, with no matching feature when GEOS is available
    ds = ogr.Open(gdb_path)
    lyr = ds.GetLayerByName("multipolygon")
    lyr.SetSpatialFilterRect(*other_rect)
    assert _iterated_count(lyr) == expected_count
    assert lyr.GetFeatureCount() == expected_count
    lyr = None
    ds = None

    # test with a SetAttributeFilter() with an index too
    ds = ogr.Open("data/filegdb/test_spatial_index.gdb.zip")
    lyr = ds.GetLayerByName("test")
    lyr.SetAttributeFilter("id = 1")
    lyr.SetSpatialFilterRect(400000, 0, 500100, 4500100)
    assert lyr.GetFeatureCount() == 1
    assert _iterated_count(lyr) == 1

    # No intersection between filters
    lyr.SetSpatialFilterRect(500100, 4500000, 500200, 4500100)
    assert lyr.GetFeatureCount() == 0
    assert _iterated_count(lyr) == 0

    lyr.SetAttributeFilter(None)
    assert lyr.GetFeatureCount() == 154
###############################################################################
# Test reading a broken .spx that has an index depth of 1 instead of 2
# Simulates scenario of SWISSTLM3D_2022_LV95_LN02.gdb/a00000019.spx
# from https://data.geo.admin.ch/ch.swisstopo.swisstlm3d/swisstlm3d_2022-03/swisstlm3d_2022-03_2056_5728.gdb.zip
# which advertises nIndexDepth == 1 whereas it seems it should be 2.
@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_read_broken_spx_wrong_index_depth():
    """Apply a spatial filter on a layer whose .spx file has been patched.

    A 50 x 50 grid of points is written to an in-memory geodatabase, then the
    byte at (file size - 22 + 6) of a00000009.spx is changed from 2 to 1. The
    filtered feature count must still be right, and an error mentioning that
    the .spx file cannot be used must have been emitted.
    """
    dirname = "/vsimem/test_ogr_openfilegdb_read_broken_spx_wrong_index_depth.gdb"

    ds = ogr.GetDriverByName("OpenFileGDB").CreateDataSource(dirname)
    lyr = ds.CreateLayer("test", geom_type=ogr.wkbPoint)
    for y in range(50):
        for x in range(50):
            point = ogr.CreateGeometryFromWkt("POINT(%d %d)" % (x, y))
            feature = ogr.Feature(lyr.GetLayerDefn())
            feature.SetGeometry(point)
            lyr.CreateFeature(feature)
    ds = None

    # Manually patch index depth from 2 to 1
    spx = gdal.VSIFOpenL(dirname + "/a00000009.spx", "rb+")
    assert spx
    gdal.VSIFSeekL(spx, 0, os.SEEK_END)
    depth_offset = gdal.VSIFTellL(spx) - 22 + 6
    gdal.VSIFSeekL(spx, depth_offset, os.SEEK_SET)
    assert gdal.VSIFReadL(1, 1, spx) == b"\x02"
    gdal.VSIFSeekL(spx, depth_offset, os.SEEK_SET)
    gdal.VSIFWriteL(b"\x01", 1, 1, spx)
    gdal.VSIFCloseL(spx)

    ds = ogr.Open(dirname)
    lyr = ds.GetLayer(0)
    lyr.SetSpatialFilterRect(0.5, 0.5, 48.5, 48.5)
    expected_error = (
        "Cannot use /vsimem/test_ogr_openfilegdb_read_broken_spx_wrong_index_depth.gdb/a00000009.spx"
    )
    with gdaltest.error_handler():
        assert lyr.GetFeatureCount() == 48 * 48
        assert expected_error in gdal.GetLastErrorMsg()
    ds = None

    gdal.RmdirRecursive(dirname)
###############################################################################
# Test opening a FGDB with both SRID and LatestSRID set (#5638)
def test_ogr_openfilegdb_12():
    """Check that the SRS of the first layer of test3005.gdb.zip is EPSG:3005."""
    reference_srs = osr.SpatialReference()
    reference_srs.ImportFromEPSG(3005)

    ds = ogr.Open("/vsizip/data/filegdb/test3005.gdb.zip")
    lyr = ds.GetLayer(0)
    layer_wkt = lyr.GetSpatialRef().ExportToWkt()
    assert layer_wkt == reference_srs.ExportToWkt()
    ds = None
###############################################################################
# Test opening a FGDB v9 with a non spatial table (#5673)
def test_ogr_openfilegdb_13():
    """Read the first layer of ESSENCE_NAIPF_ORI_PROV_sub93.gdb.zip.

    The layer must have no SRS and no geometry type, and the GEOCODE field of
    its first feature must have the expected value.
    """
    ds = ogr.Open("/vsizip/data/filegdb/ESSENCE_NAIPF_ORI_PROV_sub93.gdb.zip")
    table = ds.GetLayer(0)
    assert table.GetName() == "DDE_ESSEN_NAIPF_ORI_VUE"
    assert table.GetSpatialRef() is None
    assert table.GetGeomType() == ogr.wkbNone

    first = table.GetNextFeature()
    geocode_matches = first.GetField("GEOCODE") == "-673985,22+745574,77"
    if not geocode_matches:
        first.DumpReadable()
        pytest.fail()
    ds = None
###############################################################################
# Test not nullable fields
def test_ogr_openfilegdb_14():
    """Check the nullable flag of the fields of the "testnotnullable" layer."""
    ds = ogr.Open("data/filegdb/testopenfilegdb.gdb.zip")
    lyr = ds.GetLayerByName("testnotnullable")
    defn = lyr.GetLayerDefn()

    for field_name, nullable in (("field_not_nullable", 0), ("field_nullable", 1)):
        field_defn = defn.GetFieldDefn(defn.GetFieldIndex(field_name))
        assert field_defn.IsNullable() == nullable

    assert defn.GetGeomFieldDefn(0).IsNullable() == 0
    ds = None
###############################################################################
# Test default values
def test_ogr_openfilegdb_15():
    """Check field default values (and one width) in test_default_val.gdb.zip."""
    ds = ogr.Open("data/filegdb/test_default_val.gdb.zip")
    lyr = ds.GetLayer(0)
    defn = lyr.GetLayerDefn()

    def _field(name):
        return defn.GetFieldDefn(defn.GetFieldIndex(name))

    assert _field("STR").GetDefault() == "'default_val'"
    assert _field("STR").GetWidth() == 50
    assert _field("INT32").GetDefault() == "123456788"
    assert _field("INT16").GetDefault() == "12345"
    # Only the leading digits of the floating point defaults are checked
    assert _field("FLOAT32").GetDefault().startswith("1.23")
    assert _field("FLOAT64").GetDefault().startswith("1.23456")
    assert _field("DATETIME").GetDefault() == "'2015/06/30 12:34:56'"
###############################################################################
# Read layers with sparse pages
def test_ogr_openfilegdb_16():
    """Read sparse.gdb.zip sequentially and by FID.

    Sequential reading must return the expected FIDs, in order, then None;
    GetFeature() must return None for FIDs 100000 and 9999999, and a feature
    for FID 16384.
    """
    ds = ogr.Open("data/filegdb/sparse.gdb.zip")
    lyr = ds.GetLayer(0)

    expected_fids = (2, 3, 4, 7, 8, 9, 10, 2049, 8191, 16384, 10000000, 10000001)
    for expected_fid in expected_fids:
        assert lyr.GetNextFeature().GetFID() == expected_fid

    assert lyr.GetFeature(100000) is None
    assert lyr.GetFeature(10000000 - 1) is None
    assert lyr.GetNextFeature() is None
    assert lyr.GetFeature(16384) is not None
###############################################################################
# Read a MULTILINESTRING ZM with a dummy M array (#6528)
def test_ogr_openfilegdb_17():
    """Check that the first feature of the dataset has a geometry."""
    ds = ogr.Open("data/filegdb/multilinestringzm_with_dummy_m_array.gdb.zip")
    first = ds.GetLayer(0).GetNextFeature()
    geometry = first.GetGeometryRef()
    assert geometry is not None
###############################################################################
# Read curves
def test_ogr_openfilegdb_18():
    """Compare the geometries of curve layers with reference CSV files."""

    def _check_against_csv(lyr, csv_path):
        # Features of both layers are compared pairwise, in reading order.
        ds_ref = ogr.Open(csv_path)
        lyr_ref = ds_ref.GetLayer(0)
        for feat in lyr:
            feat_ref = lyr_ref.GetNextFeature()
            if ogrtest.check_feature_geometry(feat, feat_ref.GetGeometryRef()) == 0:
                continue
            print(feat.GetGeometryRef().ExportToWkt())
            pytest.fail(feat_ref.GetGeometryRef().ExportToWkt())

    ds = ogr.Open("data/filegdb/curves.gdb")
    _check_against_csv(ds.GetLayerByName("line"), "data/filegdb/curves_line.csv")
    _check_against_csv(
        ds.GetLayerByName("polygon"), "data/filegdb/curves_polygon.csv"
    )

    ds = ogr.Open("data/filegdb/curve_circle_by_center.gdb")
    _check_against_csv(ds.GetLayer(0), "data/filegdb/curve_circle_by_center.csv")
###############################################################################
# Test opening '.'
def test_ogr_openfilegdb_19():
    """Open a geodatabase as "." from inside its directory.

    The working directory is temporarily changed to data/filegdb/curves.gdb.
    """
    previous_cwd = os.getcwd()
    os.chdir("data/filegdb/curves.gdb")
    try:
        ds = ogr.Open(".")
    finally:
        # Restore the working directory even if opening raises, so that the
        # relative paths used by the other tests remain valid
        os.chdir(previous_cwd)
    assert ds is not None
###############################################################################
# Read polygons with M component where the M of the closing point is not the
# one of the starting point (#7017)
def test_ogr_openfilegdb_20():
    """Compare polygon ZM geometries of two datasets with reference CSV files."""

    def _check_against_csv(gdb_path, csv_path):
        # Features of both layers are compared pairwise, in reading order.
        ds = ogr.Open(gdb_path)
        lyr = ds.GetLayer(0)
        ds_ref = ogr.Open(csv_path)
        lyr_ref = ds_ref.GetLayer(0)
        for feat in lyr:
            feat_ref = lyr_ref.GetNextFeature()
            if ogrtest.check_feature_geometry(feat, feat_ref.GetGeometryRef()) == 0:
                continue
            print(feat.GetGeometryRef().ExportToIsoWkt())
            pytest.fail(feat_ref.GetGeometryRef().ExportToIsoWkt())

    _check_against_csv(
        "data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb",
        "data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb.csv",
    )
    _check_against_csv(
        "data/filegdb/filegdb_polygonzm_nan_m_with_curves.gdb",
        "data/filegdb/filegdb_polygonzm_nan_m_with_curves.gdb.csv",
    )
###############################################################################
# Test selecting FID column with OGRSQL
def test_ogr_openfilegdb_21():
    """Select the feature with OBJECTID = 2, with OGR SQL and an attribute filter."""

    def _expect_fid_2(feature):
        if feature.GetFID() == 2:
            return
        feature.DumpReadable()
        pytest.fail()

    ds = ogr.Open("data/filegdb/curves.gdb")

    sql_lyr = ds.ExecuteSQL("SELECT OBJECTID FROM polygon WHERE OBJECTID = 2")
    assert sql_lyr is not None
    _expect_fid_2(sql_lyr.GetNextFeature())
    # Only one feature must be returned
    assert sql_lyr.GetNextFeature() is None
    ds.ReleaseResultSet(sql_lyr)

    lyr = ds.GetLayerByName("polygon")
    lyr.SetAttributeFilter("OBJECTID = 2")
    _expect_fid_2(lyr.GetNextFeature())
###############################################################################
# Test bugfix for https://github.com/OSGeo/gdal/issues/1369
# where a polygon with inner rings has its exterior ring with wrong orientation
@pytest.mark.require_geos
def test_ogr_openfilegdb_weird_winding_order():
    """Check the number of parts and rings of the first geometry of the layer."""
    gdb_path = (
        "/vsizip/data/filegdb/weird_winding_order_fgdb.zip/roads_clip Drawing.gdb"
    )
    ds = ogr.Open(gdb_path)
    first = ds.GetLayer(0).GetNextFeature()
    geom = first.GetGeometryRef()
    assert geom.GetGeometryCount() == 1
    first_part = geom.GetGeometryRef(0)
    assert first_part.GetGeometryCount() == 17
###############################################################################
# Test that a datetime field expressed in UTC is read with its +00 timezone
def test_ogr_openfilegdb_utc_datetime():
    """Check the string value of the EditDate field of the first feature."""
    ds = ogr.Open("data/filegdb/testdatetimeutc.gdb")
    first = ds.GetLayer(0).GetNextFeature()
    edit_date = first.GetFieldAsString("EditDate")
    # Check that the timezone +00 is present
    assert edit_date == "2020/06/22 07:49:36+00"
###############################################################################
# Test that field aliases are correctly read and mapped to OGR field alternative
# names
def test_ogr_fgdb_alias():
    """Check the alternative name of each field of field_alias.gdb."""
    ds = ogr.Open("data/filegdb/field_alias.gdb")
    lyr = ds.GetLayer(0)
    defn = lyr.GetLayerDefn()

    expected_aliases = [
        ("text", "My Text Field"),
        ("short_int", "My Short Int Field"),
        ("long_int", "My Long Int Field"),
        ("float", "My Float Field"),
        ("double", "My Double Field"),
        ("date", "My Date Field"),
        ("blob", "My Blob Field"),
        ("guid", "My GUID field"),
        ("raster", "My Raster Field"),
        ("SHAPE_Length", ""),
    ]
    for field_name, alias in expected_aliases:
        field_defn = defn.GetFieldDefn(defn.GetFieldIndex(field_name))
        assert field_defn.GetAlternativeName() == alias
###############################################################################
# Test reading field domains
def _check_domains(ds):
    """Assert that ``ds`` exposes the expected field domains.

    Checks the list of domain names, then the properties of the "SpeedLimit"
    range domain and of the "MedianType" coded domain, including their use by
    the "MaxSpeed" and "MedianType" fields of the first layer.
    """
    expected_names = {"MedianType", "RoadSurfaceType", "SpeedLimit"}
    assert set(ds.GetFieldDomainNames()) == expected_names

    with gdaltest.error_handler():
        assert ds.GetFieldDomain("i_dont_exist") is None

    lyr_defn = ds.GetLayer(0).GetLayerDefn()

    # Range domain
    speed_fld = lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("MaxSpeed"))
    assert speed_fld.GetDomainName() == "SpeedLimit"
    speed_domain = ds.GetFieldDomain("SpeedLimit")
    assert speed_domain is not None
    assert speed_domain.GetName() == "SpeedLimit"
    assert speed_domain.GetDescription() == "The maximun speed of the road"
    assert speed_domain.GetDomainType() == ogr.OFDT_RANGE
    assert speed_domain.GetFieldType() == speed_fld.GetType()
    assert speed_domain.GetFieldSubType() == speed_fld.GetSubType()
    assert speed_domain.GetMinAsDouble() == 40.0
    assert speed_domain.GetMaxAsDouble() == 100.0

    # Coded domain
    median_fld = lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("MedianType"))
    assert median_fld.GetDomainName() == "MedianType"
    median_domain = ds.GetFieldDomain("MedianType")
    assert median_domain is not None
    assert median_domain.GetName() == "MedianType"
    assert median_domain.GetDescription() == "Road median types."
    assert median_domain.GetDomainType() == ogr.OFDT_CODED
    assert median_domain.GetFieldType() == median_fld.GetType()
    assert median_domain.GetFieldSubType() == median_fld.GetSubType()
    assert median_domain.GetEnumeration() == {"0": "None", "1": "Cement"}
###############################################################################
# Test reading field domains
def test_ogr_openfilegdb_read_domains():
    """Run _check_domains() on data/filegdb/Domains.gdb."""
    domains_ds = gdal.OpenEx("data/filegdb/Domains.gdb", gdal.OF_VECTOR)
    _check_domains(domains_ds)
###############################################################################
# Test writing field domains
def test_ogr_openfilegdb_write_domains_from_other_gdb():
    """Copy Domains.gdb with the OpenFileGDB driver, then edit its domains.

    The copy must pass _check_domains(); a domain is then added and deleted,
    and "SpeedLimit" is updated. The result is checked after re-opening.
    """
    out_dir = "tmp/test_ogr_fgdb_write_domains.gdb"

    def _remove_out_dir():
        # The directory may legitimately not exist
        try:
            shutil.rmtree(out_dir)
        except OSError:
            pass

    _remove_out_dir()

    ds = gdal.VectorTranslate(
        out_dir, "data/filegdb/Domains.gdb", options="-f OpenFileGDB"
    )
    _check_domains(ds)

    for capability in (
        ogr.ODsCAddFieldDomain,
        ogr.ODsCDeleteFieldDomain,
        ogr.ODsCUpdateFieldDomain,
    ):
        assert ds.TestCapability(capability) == 1

    with gdaltest.error_handler():
        assert not ds.DeleteFieldDomain("not_existing")

    coded_domain = ogr.CreateCodedFieldDomain(
        "unused_domain", "desc", ogr.OFTInteger, ogr.OFSTNone, {1: "one", "2": None}
    )
    assert ds.AddFieldDomain(coded_domain)
    assert ds.DeleteFieldDomain("unused_domain")
    assert ds.GetFieldDomain("unused_domain") is None

    range_domain = ogr.CreateRangeFieldDomain(
        "SpeedLimit", "desc", ogr.OFTInteger, ogr.OFSTNone, 1, True, 2, True
    )
    assert ds.UpdateFieldDomain(range_domain)
    ds = None

    # Check the changes after re-opening
    ds = gdal.OpenEx(out_dir, allowed_drivers=["OpenFileGDB"])
    assert ds.GetFieldDomain("unused_domain") is None
    assert ds.GetFieldDomain("SpeedLimit").GetDescription() == "desc"
    ds = None

    _remove_out_dir()
###############################################################################
# Test reading layer hierarchy
@gdaltest.disable_exceptions()
def test_ogr_openfilegdb_read_layer_hierarchy():
    """Check the groups and vector layers of data/filegdb/featuredataset.gdb.

    The root group must contain the "fd1" and "fd2" groups and the
    "standalone" layer; "fd1" must contain "fd1_lyr1" and "fd1_lyr2", and
    "fd2" must contain "fd2_lyr".
    """
    # Test dataset produced with:
    #
    #   from osgeo import ogr, osr
    #   srs = osr.SpatialReference()
    #   srs.SetFromUserInput("WGS84")
    #   ds = ogr.GetDriverByName("FileGDB").CreateDataSource("featuredataset.gdb")
    #   ds.CreateLayer(
    #       "fd1_lyr1", srs=srs, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd1"]
    #   )
    #   ds.CreateLayer(
    #       "fd1_lyr2", srs=srs, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd1"]
    #   )
    #   srs2 = osr.SpatialReference()
    #   srs2.ImportFromEPSG(32631)
    #   ds.CreateLayer("standalone", srs=srs2, geom_type=ogr.wkbPoint)
    #   srs3 = osr.SpatialReference()
    #   srs3.ImportFromEPSG(32632)
    #   ds.CreateLayer(
    #       "fd2_lyr", srs=srs3, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd2"]
    #   )
    #
    # (kept as a comment: an import statement inside the function, even in a
    # branch that is never run, would make "ogr" and "osr" local names of the
    # whole function)

    ds = gdal.OpenEx("data/filegdb/featuredataset.gdb")
    rg = ds.GetRootGroup()
    assert rg.GetGroupNames() == ["fd1", "fd2"]
    assert rg.OpenGroup("not_existing") is None

    fd1 = rg.OpenGroup("fd1")
    assert fd1 is not None
    assert fd1.GetVectorLayerNames() == ["fd1_lyr1", "fd1_lyr2"]
    assert fd1.OpenVectorLayer("not_existing") is None
    assert fd1.GetGroupNames() is None

    fd1_lyr1 = fd1.OpenVectorLayer("fd1_lyr1")
    assert fd1_lyr1 is not None
    assert fd1_lyr1.GetName() == "fd1_lyr1"

    fd1_lyr2 = fd1.OpenVectorLayer("fd1_lyr2")
    assert fd1_lyr2 is not None
    assert fd1_lyr2.GetName() == "fd1_lyr2"

    fd2 = rg.OpenGroup("fd2")
    assert fd2 is not None
    assert fd2.GetVectorLayerNames() == ["fd2_lyr"]
    fd2_lyr = fd2.OpenVectorLayer("fd2_lyr")
    assert fd2_lyr is not None

    assert rg.GetVectorLayerNames() == ["standalone"]
    standalone = rg.OpenVectorLayer("standalone")
    assert standalone is not None
###############################################################################
# Test LIST_ALL_TABLES open option
def test_ogr_openfilegdb_list_all_tables_v10():
    """Check which layers testopenfilegdb.gdb.zip lists, by default and with
    LIST_ALL_TABLES=YES, and which of them are reported as private."""
    gdb_path = "data/filegdb/testopenfilegdb.gdb.zip"
    user_layers = ["linestring", "point", "multipoint"]
    system_tables = [
        "GDB_DBTune",
        "GDB_ItemRelationshipTypes",
        "GDB_ItemRelationships",
        "GDB_ItemTypes",
        "GDB_Items",
        "GDB_SpatialRefs",
        "GDB_SystemCatalog",
    ]

    ds = ogr.Open(gdb_path)
    assert ds is not None
    assert ds.GetLayerCount() == 37, "did not get expected layer count"
    default_names = [ds.GetLayer(i).GetName() for i in range(ds.GetLayerCount())]

    # should not be exposed by default
    for table in system_tables:
        assert table not in default_names

    # Test LIST_ALL_TABLES=YES open option
    ds_all_table = gdal.OpenEx(
        gdb_path, gdal.OF_VECTOR, open_options=["LIST_ALL_TABLES=YES"]
    )
    assert ds_all_table.GetLayerCount() == 44, "did not get expected layer count"
    all_names = [
        ds_all_table.GetLayer(i).GetName() for i in range(ds_all_table.GetLayerCount())
    ]
    for name in user_layers + system_tables:
        assert name in all_names

    private_layers = [
        ds_all_table.GetLayer(i).GetName()
        for i in range(ds_all_table.GetLayerCount())
        if ds_all_table.IsLayerPrivate(i)
    ]
    for table in system_tables:
        assert table in private_layers
    for name in user_layers:
        assert name not in private_layers
def test_ogr_openfilegdb_list_all_tables_v9():
    """Check which layers testopenfilegdb93.gdb.zip lists, by default and with
    LIST_ALL_TABLES=YES, and which of them are reported as private."""
    gdb_path = "data/filegdb/testopenfilegdb93.gdb.zip"
    user_layers = ["linestring", "point", "multipoint"]
    system_tables = [
        "GDB_DBTune",
        "GDB_FeatureClasses",
        "GDB_FeatureDataset",
        "GDB_FieldInfo",
        "GDB_ObjectClasses",
        "GDB_Release",
        "GDB_SpatialRefs",
        "GDB_SystemCatalog",
        "GDB_UserMetadata",
    ]

    ds = ogr.Open(gdb_path)
    assert ds is not None
    assert ds.GetLayerCount() == 22, "did not get expected layer count"
    default_names = [ds.GetLayer(i).GetName() for i in range(ds.GetLayerCount())]

    # should not be exposed by default
    for table in system_tables:
        assert table not in default_names

    # Test LIST_ALL_TABLES=YES open option
    ds_all_table = gdal.OpenEx(
        gdb_path, gdal.OF_VECTOR, open_options=["LIST_ALL_TABLES=YES"]
    )
    assert ds_all_table.GetLayerCount() == 31, "did not get expected layer count"
    all_names = [
        ds_all_table.GetLayer(i).GetName() for i in range(ds_all_table.GetLayerCount())
    ]
    print(all_names)
    for name in user_layers + system_tables:
        assert name in all_names

    private_layers = [
        ds_all_table.GetLayer(i).GetName()
        for i in range(ds_all_table.GetLayerCount())
        if ds_all_table.IsLayerPrivate(i)
    ]
    for table in system_tables:
        assert table in private_layers
    for name in user_layers:
        assert name not in private_layers
###############################################################################
# Test that non-spatial tables which are not present in GDB_Items are listed
# see https://github.com/OSGeo/gdal/issues/4463
def test_ogr_openfilegdb_non_spatial_table_outside_gdb_items():
    """Check the layers listed for table_outside_gdbitems.gdb, by default and
    with LIST_ALL_TABLES=YES, and which of them are reported as private."""
    gdb_path = "data/filegdb/table_outside_gdbitems.gdb"
    user_layers = ["aquaduct", "flat_table1", "flat_table2"]
    system_tables = [
        "GDB_DBTune",
        "GDB_ItemRelationshipTypes",
        "GDB_ItemRelationships",
        "GDB_ItemTypes",
        "GDB_Items",
        "GDB_SpatialRefs",
        "GDB_SystemCatalog",
    ]

    ds = ogr.Open(gdb_path)
    assert ds is not None
    assert ds.GetLayerCount() == 3, "did not get expected layer count"
    default_names = {ds.GetLayer(i).GetName() for i in range(ds.GetLayerCount())}
    assert default_names == {"aquaduct", "flat_table1", "flat_table2"}

    # Test with the LIST_ALL_TABLES=YES open option
    ds_all_table = gdal.OpenEx(
        gdb_path, gdal.OF_VECTOR, open_options=["LIST_ALL_TABLES=YES"]
    )
    assert ds_all_table.GetLayerCount() == 10, "did not get expected layer count"
    all_names = {
        ds_all_table.GetLayer(i).GetName() for i in range(ds_all_table.GetLayerCount())
    }
    for name in user_layers + system_tables:
        assert name in all_names

    private_layers = {
        ds_all_table.GetLayer(i).GetName()
        for i in range(ds_all_table.GetLayerCount())
        if ds_all_table.IsLayerPrivate(i)
    }
    for table in system_tables:
        assert table in private_layers
    for name in user_layers:
        assert name not in private_layers
###############################################################################
# Test reading .gdb with strings encoded as UTF16 instead of UTF8
# (e.g. using -lco CONFIGURATION_KEYWORD=TEXT_UTF16 of FileGDB driver)
def test_ogr_openfilegdb_strings_utf16():
    """Check a field default value and a string value of test_utf16.gdb.zip."""
    ds = ogr.Open("data/filegdb/test_utf16.gdb.zip")
    assert ds is not None
    lyr = ds.GetLayer(0)

    first_field = lyr.GetLayerDefn().GetFieldDefn(0)
    assert first_field.GetDefault() == "'éven'"

    first_feature = lyr.GetNextFeature()
    assert first_feature["str"] == "évenéven"
###############################################################################
# Test reading .gdb where the CRS in the XML definition of the feature
# table is not consistent with the one of the feature dataset
def test_ogr_openfilegdb_inconsistent_crs_feature_dataset_and_feature_table():
    """Check that the SRS of the first layer has "4326" as authority code."""
    gdb_path = "data/filegdb/inconsistent_crs_feature_dataset_and_feature_table.gdb"
    ds = ogr.Open(gdb_path)
    assert ds is not None
    layer_srs = ds.GetLayer(0).GetSpatialRef()
    assert layer_srs is not None
    assert layer_srs.GetAuthorityCode(None) == "4326"
###############################################################################
# Test reading a .spx file with the value_count field at 0
# (https://github.com/OSGeo/gdal/issues/5888)
def test_ogr_openfilegdb_spx_zero_in_value_count_trailer():
    """Check the feature count of the first layer under a spatial filter."""
    ds = ogr.Open("data/filegdb/spx_zero_in_value_count_trailer.gdb")
    assert ds is not None
    filtered_lyr = ds.GetLayer(0)
    filtered_lyr.SetSpatialFilterRect(1, 1, 2, 2)
    matching = filtered_lyr.GetFeatureCount()
    assert matching == 1
###############################################################################
# Test reading .gdb with LengthFieldName / AreaFieldName
def test_ogr_openfilegdb_shape_length_shape_area_as_default_in_field_defn():
    """Check the default expressions of the Shape_Area / Shape_Length fields.

    Each of the two fields must report the matching
    FILEGEODATABASE_SHAPE_* marker as its default value.
    """
    dataset = ogr.Open("data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb")
    layer = dataset.GetLayer(0)
    defn = layer.GetLayerDefn()

    # (field name, expected default) pairs, checked in this order.
    expected_defaults = (
        ("Shape_Area", "FILEGEODATABASE_SHAPE_AREA"),
        ("Shape_Length", "FILEGEODATABASE_SHAPE_LENGTH"),
    )
    for field_name, expected_default in expected_defaults:
        field_defn = defn.GetFieldDefn(defn.GetFieldIndex(field_name))
        assert field_defn.GetDefault() == expected_default
###############################################################################
# Test reading relationships
def test_ogr_openfilegdb_read_relationships():
    """Check relationship names and per-relationship metadata.

    A dataset without relationships must report no names. For the
    relationships dataset, the full set of names is verified, an unknown
    name must yield None, and a selection of relationships is inspected
    getter by getter against an expectation table.
    """
    # no relationships
    dataset = gdal.OpenEx("data/filegdb/Domains.gdb", gdal.OF_VECTOR)
    assert dataset.GetRelationshipNames() is None

    # has relationships
    dataset = gdal.OpenEx("data/filegdb/relationships.gdb", gdal.OF_VECTOR)
    expected_names = {
        "composite_many_to_many",
        "composite_one_to_many",
        "composite_one_to_one",
        "simple_attributed",
        "simple_backward_message_direction",
        "simple_both_message_direction",
        "simple_forward_message_direction",
        "simple_many_to_many",
        "simple_one_to_many",
        "simple_relationship_one_to_one",
        "points__ATTACHREL",
    }
    assert set(dataset.GetRelationshipNames()) == expected_names

    # Looking up a name that does not exist returns nothing.
    assert dataset.GetRelationship("xxxx") is None

    # Ordered table: relationship name -> list of (getter, expected value).
    # An expected value of None means the getter must return None itself.
    expectations = [
        (
            "simple_relationship_one_to_one",
            [
                ("GetLeftTableName", "table1"),
                ("GetRightTableName", "table2"),
                ("GetMappingTableName", ""),
                ("GetCardinality", gdal.GRC_ONE_TO_ONE),
                ("GetType", gdal.GRT_ASSOCIATION),
                ("GetLeftTableFields", ["pk"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetLeftMappingTableFields", None),
                ("GetRightMappingTableFields", None),
                ("GetForwardPathLabel", "my forward path label"),
                ("GetBackwardPathLabel", "my backward path label"),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "simple_one_to_many",
            [
                ("GetLeftTableName", "table1"),
                ("GetRightTableName", "table2"),
                ("GetMappingTableName", ""),
                ("GetCardinality", gdal.GRC_ONE_TO_MANY),
                ("GetType", gdal.GRT_ASSOCIATION),
                ("GetLeftTableFields", ["pk"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "simple_many_to_many",
            [
                ("GetLeftTableName", "table1"),
                ("GetRightTableName", "table2"),
                ("GetMappingTableName", "simple_many_to_many"),
                ("GetCardinality", gdal.GRC_MANY_TO_MANY),
                ("GetType", gdal.GRT_ASSOCIATION),
                ("GetLeftTableFields", ["pk"]),
                ("GetLeftMappingTableFields", ["origin_foreign_key"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetRightMappingTableFields", ["destination_foreign_key"]),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "composite_one_to_one",
            [
                ("GetLeftTableName", "table1"),
                ("GetRightTableName", "table3"),
                ("GetMappingTableName", ""),
                ("GetCardinality", gdal.GRC_ONE_TO_ONE),
                ("GetType", gdal.GRT_COMPOSITE),
                ("GetLeftTableFields", ["pk"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "composite_one_to_many",
            [
                ("GetLeftTableName", "table5"),
                ("GetRightTableName", "table4"),
                ("GetMappingTableName", ""),
                ("GetCardinality", gdal.GRC_ONE_TO_MANY),
                ("GetType", gdal.GRT_COMPOSITE),
                ("GetLeftTableFields", ["pk"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "composite_many_to_many",
            [
                ("GetLeftTableName", "table6"),
                ("GetRightTableName", "table7"),
                ("GetMappingTableName", "composite_many_to_many"),
                ("GetCardinality", gdal.GRC_MANY_TO_MANY),
                ("GetType", gdal.GRT_COMPOSITE),
                ("GetLeftTableFields", ["pk"]),
                ("GetLeftMappingTableFields", ["origin_foreign_key"]),
                ("GetRightTableFields", ["parent_pk"]),
                ("GetRightMappingTableFields", ["dest_foreign_key"]),
                ("GetRelatedTableType", "feature"),
            ],
        ),
        (
            "points__ATTACHREL",
            [
                ("GetLeftTableName", "points"),
                ("GetRightTableName", "points__ATTACH"),
                ("GetMappingTableName", ""),
                ("GetCardinality", gdal.GRC_ONE_TO_MANY),
                ("GetType", gdal.GRT_COMPOSITE),
                ("GetLeftTableFields", ["OBJECTID"]),
                ("GetRightTableFields", ["REL_OBJECTID"]),
                ("GetForwardPathLabel", "attachment"),
                ("GetBackwardPathLabel", "object"),
                ("GetRelatedTableType", "media"),
            ],
        ),
    ]

    for rel_name, getter_checks in expectations:
        relationship = dataset.GetRelationship(rel_name)
        assert relationship is not None
        assert relationship.GetName() == rel_name

        for getter_name, wanted in getter_checks:
            actual = getattr(relationship, getter_name)()
            if wanted is None:
                # Identity check: the getter must return None, not merely
                # something that compares equal to it.
                assert actual is None
            else:
                assert actual == wanted
###############################################################################
# Cleanup
def test_ogr_openfilegdb_cleanup():
    """Remove the temporary files and directories left under tmp/.

    Cleanup is best-effort: every path is removed independently and any
    OSError (typically because the path does not exist) is ignored, so a
    missing artefact never prevents the remaining ones from being removed.
    """
    # (removal function, path) pairs, processed in order. Each removal gets
    # its own try/except so that a failure on one path (e.g. the .gdbtable
    # file being absent) does not skip the paths that follow it.
    cleanup_steps = (
        (shutil.rmtree, "tmp/testopenfilegdb.gdb"),
        (os.remove, "tmp/a00000009.gdbtable"),
        (os.remove, "tmp/a00000009.gdbtablx"),
        (shutil.rmtree, "tmp/testopenfilegdb_fuzzed.gdb"),
    )
    for remove, path in cleanup_steps:
        try:
            remove(path)
        except OSError:
            # Nothing to clean up for this path (or it cannot be removed);
            # deliberately ignored, matching the best-effort intent.
            pass