gdal/autotest/ogr/ogr_fgdb.py

3150 строки
97 KiB
Python
Исполняемый файл

#!/usr/bin/env pytest
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR Test Suite
# Purpose: FGDB driver testing.
# Author: Even Rouault <even dot rouault at spatialys.com>
#
###############################################################################
# Copyright (c) 2011-2014, Even Rouault <even dot rouault at spatialys.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
###############################################################################
import os
import shutil
import sys
import gdaltest
import ogrtest
import pytest
from osgeo import gdal, ogr, osr
pytestmark = pytest.mark.require_driver("FileGDB")
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def module_disable_exceptions():
with gdaltest.disable_exceptions():
yield
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def startup_and_cleanup():
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
yield
# The SDK messes somehow with the locale, which cause issues in
# other test files, such as gcore/basic_test.py, which assumes English
# error messages
import locale
locale.setlocale(locale.LC_ALL, "C")
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
try:
shutil.rmtree("tmp/test2.gdb")
except OSError:
pass
try:
shutil.rmtree("tmp/poly.gdb")
except OSError:
pass
try:
shutil.rmtree("tmp/test3005.gdb")
except OSError:
pass
try:
shutil.rmtree("tmp/roads_clip Drawing.gdb")
except OSError:
pass
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def openfilegdb_drv():
drv = ogr.GetDriverByName("OpenFileGDB")
yield drv
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def fgdb_drv(openfilegdb_drv):
drv = ogr.GetDriverByName("FileGDB")
if drv is None:
pytest.skip("FileGDB driver not available", allow_module_level=True)
if openfilegdb_drv is not None:
openfilegdb_drv.Deregister()
yield drv
if openfilegdb_drv is not None:
drv.Deregister()
# Force OpenFileGDB first
openfilegdb_drv.Register()
drv.Register()
###############################################################################
@pytest.fixture()
def fgdb_sdk_1_4_or_later(fgdb_drv):
fgdb_is_sdk_1_4 = False
try:
shutil.rmtree("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
except OSError:
pass
ds = fgdb_drv.CreateDataSource("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
srs = osr.SpatialReference()
srs.ImportFromProj4("+proj=tmerc +datum=WGS84 +no_defs")
with gdaltest.error_handler():
lyr = ds.CreateLayer("test", srs=srs, geom_type=ogr.wkbPoint)
if lyr is not None:
fgdb_is_sdk_1_4 = True
ds = None
shutil.rmtree("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
if not fgdb_is_sdk_1_4:
pytest.skip("SDK 1.4 required")
###############################################################################
@pytest.fixture()
def ogrsf_path():
import test_cli_utilities
path = test_cli_utilities.get_test_ogrsf_path()
if path is None:
pytest.skip("ogrsf test utility not found")
return path
###############################################################################
# Write and read back various geometry types
def test_ogr_fgdb_1(fgdb_drv):
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
datalist = [
["none", ogr.wkbNone, None],
["point", ogr.wkbPoint, "POINT (1 2)"],
["multipoint", ogr.wkbMultiPoint, "MULTIPOINT (1 2,3 4)"],
[
"linestring",
ogr.wkbLineString,
"LINESTRING (1 2,3 4)",
"MULTILINESTRING ((1 2,3 4))",
],
[
"multilinestring",
ogr.wkbMultiLineString,
"MULTILINESTRING ((1 2,3 4),(5 6,7 8))",
],
[
"polygon",
ogr.wkbPolygon,
"POLYGON ((0 0,0 1,1 1,1 0,0 0))",
"MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0)))",
],
[
"multipolygon",
ogr.wkbMultiPolygon,
"MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0),(0.25 0.25,0.75 0.25,0.75 0.75,0.25 0.75,0.25 0.25)),((2 0,2 1,3 1,3 0,2 0)))",
],
["point25D", ogr.wkbPoint25D, "POINT (1 2 3)"],
["multipoint25D", ogr.wkbMultiPoint25D, "MULTIPOINT (1 2 -10,3 4 -20)"],
[
"linestring25D",
ogr.wkbLineString25D,
"LINESTRING (1 2 -10,3 4 -20)",
"MULTILINESTRING ((1 2 -10,3 4 -20))",
],
[
"multilinestring25D",
ogr.wkbMultiLineString25D,
"MULTILINESTRING ((1 2 -10,3 4 -20))",
],
[
"polygon25D",
ogr.wkbPolygon25D,
"POLYGON ((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10))",
"MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))",
],
[
"multipolygon25D",
ogr.wkbMultiPolygon25D,
"MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))",
],
[
"multipatch",
ogr.wkbMultiPolygon25D,
"GEOMETRYCOLLECTION Z (TIN Z (((0.0 0.0 0,0.0 1.0 0,1.0 0.0 0,0.0 0.0 0)),((0.0 1.0 0,1.0 0.0 0,1.0 1.0 0,0.0 1.0 0))),TIN Z (((10.0 0.0 0,10.0 1.0 0,11.0 0.0 0,10.0 0.0 0)),((10.0 0.0 0,11.0 0.0 0,10.0 -1.0 0,10.0 0.0 0))),TIN Z (((5.0 0.0 0,5.0 1.0 0,6.0 0.0 0,5.0 0.0 0))),MULTIPOLYGON Z (((100.0 0.0 0,100.0 1.0 0,101.0 1.0 0,101.0 0.0 0,100.0 0.0 0),(100.25 0.25 0,100.75 0.25 0,100.75 0.75 0,100.75 0.25 0,100.25 0.25 0))))",
],
[
"tin",
ogr.wkbTINZ,
"TIN Z (((0.0 0.0 0,0.0 1.0 0,1.0 0.0 0,0.0 0.0 0)),((0.0 1.0 0,1.0 0.0 0,1.0 1.0 0,0.0 1.0 0)))",
],
["null_polygon", ogr.wkbPolygon, None],
["empty_polygon", ogr.wkbPolygon, "POLYGON EMPTY", None],
]
options = [
"COLUMN_TYPES=smallint=esriFieldTypeSmallInteger,float=esriFieldTypeSingle,guid=esriFieldTypeGUID,xml=esriFieldTypeXML"
]
for data in datalist:
if data[1] == ogr.wkbNone:
lyr = ds.CreateLayer(data[0], geom_type=data[1], options=options)
elif data[0] == "multipatch":
lyr = ds.CreateLayer(
data[0],
geom_type=data[1],
srs=srs,
options=["CREATE_MULTIPATCH=YES", options[0]],
)
else:
lyr = ds.CreateLayer(data[0], geom_type=data[1], srs=srs, options=options)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("smallint", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("int", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("float", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("real", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("adate", ogr.OFTDateTime))
lyr.CreateField(ogr.FieldDefn("guid", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("xml", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("binary", ogr.OFTBinary))
lyr.CreateField(ogr.FieldDefn("binary2", ogr.OFTBinary))
fld_defn = ogr.FieldDefn("smallint2", ogr.OFTInteger)
fld_defn.SetSubType(ogr.OFSTInt16)
lyr.CreateField(fld_defn)
fld_defn = ogr.FieldDefn("float2", ogr.OFTReal)
fld_defn.SetSubType(ogr.OFSTFloat32)
lyr.CreateField(fld_defn)
# We need at least 5 features so that test_ogrsf can test SetFeature()
for i in range(5):
feat = ogr.Feature(lyr.GetLayerDefn())
if data[1] != ogr.wkbNone and data[2] is not None:
feat.SetGeometry(ogr.CreateGeometryFromWkt(data[2]))
feat.SetField("id", i + 1)
feat.SetField("str", "foo_\xc3\xa9")
feat.SetField("smallint", -13)
feat.SetField("int", 123)
feat.SetField("float", 1.5)
feat.SetField("real", 4.56)
feat.SetField("adate", "2013/12/26 12:34:56")
feat.SetField("guid", "{12345678-9abc-DEF0-1234-567890ABCDEF}")
feat.SetField("xml", "<foo></foo>")
feat.SetFieldBinaryFromHexString("binary", "00FF7F")
feat.SetFieldBinaryFromHexString("binary2", "123456")
feat.SetField("smallint2", -32768)
feat.SetField("float2", 1.5)
lyr.CreateFeature(feat)
for data in datalist:
lyr = ds.GetLayerByName(data[0])
if data[1] != ogr.wkbNone:
assert (
lyr.GetSpatialRef().IsSame(
srs, options=["IGNORE_DATA_AXIS_TO_SRS_AXIS_MAPPING=YES"]
)
== 1
)
feat = lyr.GetNextFeature()
if data[1] != ogr.wkbNone:
try:
expected_wkt = data[3]
except IndexError:
expected_wkt = data[2]
if expected_wkt is None:
if feat.GetGeometryRef() is not None:
feat.DumpReadable()
pytest.fail(data)
elif ogrtest.check_feature_geometry(feat, expected_wkt) != 0:
feat.DumpReadable()
pytest.fail(data)
if (
feat.GetField("id") != 1
or feat.GetField("smallint") != -13
or feat.GetField("int") != 123
or feat.GetField("float") != 1.5
or feat.GetField("real") != 4.56
or feat.GetField("adate") != "2013/12/26 12:34:56"
or feat.GetField("guid") != "{12345678-9ABC-DEF0-1234-567890ABCDEF}"
or feat.GetField("xml") != "<foo></foo>"
or feat.GetField("binary") != "00FF7F"
or feat.GetField("binary2") != "123456"
or feat.GetField("smallint2") != -32768
):
feat.DumpReadable()
pytest.fail()
sql_lyr = ds.ExecuteSQL("GetLayerDefinition %s" % lyr.GetName())
assert sql_lyr is not None
feat = sql_lyr.GetNextFeature()
assert feat is not None
feat = sql_lyr.GetNextFeature()
assert feat is None
lyr.ResetReading()
lyr.TestCapability("foo")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("GetLayerMetadata %s" % lyr.GetName())
assert sql_lyr is not None
feat = sql_lyr.GetNextFeature()
assert feat is not None
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("GetLayerDefinition foo")
assert sql_lyr is None
sql_lyr = ds.ExecuteSQL("GetLayerMetadata foo")
assert sql_lyr is None
ds = None
###############################################################################
# Test DeleteField()
def test_ogr_fgdb_DeleteField():
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByIndex(0)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("smallint"))
.GetSubType()
== ogr.OFSTInt16
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("smallint2"))
.GetSubType()
== ogr.OFSTInt16
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("float"))
.GetSubType()
== ogr.OFSTFloat32
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("float2"))
.GetSubType()
== ogr.OFSTFloat32
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("smallint"))
.GetWidth()
== 0
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("str"))
.GetWidth()
== 0
)
assert lyr.DeleteField(lyr.GetLayerDefn().GetFieldIndex("str")) == 0
# Needed since FileGDB v1.4, otherwise crash/error ...
if True: # pylint: disable=using-constant-test
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByIndex(0)
fld_defn = ogr.FieldDefn("str2", ogr.OFTString)
fld_defn.SetWidth(80)
lyr.CreateField(fld_defn)
feat = lyr.GetNextFeature()
feat.SetField("str2", "foo2_\xc3\xa9")
lyr.SetFeature(feat)
# Test updating non-existing feature
feat.SetFID(-10)
with gdaltest.disable_exceptions():
assert (
lyr.SetFeature(feat) == ogr.OGRERR_NON_EXISTING_FEATURE
), "Expected failure of SetFeature()."
# Test deleting non-existing feature
assert (
lyr.DeleteFeature(-10) == ogr.OGRERR_NON_EXISTING_FEATURE
), "Expected failure of DeleteFeature()."
sql_lyr = ds.ExecuteSQL("REPACK")
assert sql_lyr
f = sql_lyr.GetNextFeature()
assert f[0] == "true"
ds.ReleaseResultSet(sql_lyr)
feat = None
ds = None
ds = ogr.Open("tmp/test.gdb")
lyr = ds.GetLayerByIndex(0)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("str2"))
.GetWidth()
== 80
)
assert lyr.GetLayerDefn().GetFieldIndex("str") == -1
feat = lyr.GetNextFeature()
assert feat.GetFieldAsString("str2") == "foo2_\xc3\xa9"
ds = None
###############################################################################
# Run test_ogrsf
def test_ogr_fgdb_2(ogrsf_path):
ret = gdaltest.runexternal(
ogrsf_path + " -ro tmp/test.gdb --config OGR_SKIP OpenFileGDB"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
###############################################################################
# Run ogr2ogr
def test_ogr_fgdb_3(openfilegdb_drv):
import test_cli_utilities
if test_cli_utilities.get_ogr2ogr_path() is None:
pytest.skip()
try:
shutil.rmtree("tmp/poly.gdb")
except OSError:
pass
gdaltest.runexternal(
test_cli_utilities.get_ogr2ogr_path()
+ " -f filegdb tmp/poly.gdb data/poly.shp -nlt MULTIPOLYGON -a_srs None"
)
ds = ogr.Open("tmp/poly.gdb")
assert not (ds is None or ds.GetLayerCount() == 0), "ogr2ogr failed"
ds = None
if test_cli_utilities.get_test_ogrsf_path() is None:
pytest.skip()
if openfilegdb_drv is None:
# OpenFileGDB is required for CreateFeature() with a FID set
pytest.skip("skipping test_ogrsf due to missing OpenFileGDB driver")
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path()
+ " tmp/poly.gdb --config DRIVER_WISHED FileGDB"
)
# print ret
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
###############################################################################
# Test SQL support
def test_ogr_fgdb_sql():
import test_cli_utilities
if test_cli_utilities.get_ogr2ogr_path() is None:
pytest.skip()
ds = ogr.Open("tmp/poly.gdb")
ds.ExecuteSQL("CREATE INDEX idx_poly_eas_id ON poly(EAS_ID)")
sql_lyr = ds.ExecuteSQL("SELECT * FROM POLY WHERE EAS_ID = 170", dialect="FileGDB")
feat = sql_lyr.GetNextFeature()
assert feat is not None
feat = sql_lyr.GetNextFeature()
assert feat is None
feat = None
ds.ReleaseResultSet(sql_lyr)
ds = None
###############################################################################
# Test delete layer
def test_ogr_fgdb_4():
for j in range(2):
# Create a layer
ds = ogr.Open("tmp/test.gdb", update=1)
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
lyr = ds.CreateLayer("layer_to_remove", geom_type=ogr.wkbPoint, srs=srs)
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(2 49)"))
feat.SetField("str", "foo")
feat = None
lyr = None
if j == 1:
ds = None
ds = ogr.Open("tmp/test.gdb", update=1)
# Delete it
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == "layer_to_remove":
ds.DeleteLayer(i)
break
# Check it no longer exists
lyr = ds.GetLayerByName("layer_to_remove")
ds = None
assert lyr is None, "failed at iteration %d" % j
###############################################################################
# Test DeleteDataSource()
def test_ogr_fgdb_5(fgdb_drv):
assert fgdb_drv.DeleteDataSource("tmp/test.gdb") == 0, "DeleteDataSource() failed"
assert not os.path.exists("tmp/test.gdb")
###############################################################################
# Test adding a layer to an existing feature dataset
def test_ogr_fgdb_6(fgdb_drv):
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
ds.CreateLayer(
"layer1",
srs=srs,
geom_type=ogr.wkbPoint,
options=["FEATURE_DATASET=featuredataset"],
)
ds.CreateLayer(
"layer2",
srs=srs,
geom_type=ogr.wkbPoint,
options=["FEATURE_DATASET=featuredataset"],
)
ds = None
ds = ogr.Open("tmp/test.gdb")
assert ds.GetLayerCount() == 2
ds = None
###############################################################################
# Test bulk loading (#4420)
def test_ogr_fgdb_7(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
lyr = ds.CreateLayer("test", srs=srs, geom_type=ogr.wkbPoint)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
with gdal.config_option("FGDB_BULK_LOAD", "YES"):
for i in range(1000):
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, i)
geom = ogr.CreateGeometryFromWkt("POINT(0 1)")
feat.SetGeometry(geom)
lyr.CreateFeature(feat)
feat = None
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert feat.GetField(0) == 0
ds = None
###############################################################################
# Test field name laundering (#4458)
def test_ogr_fgdb_8(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
lyr = ds.CreateLayer("test", srs=srs, geom_type=ogr.wkbPoint)
with gdaltest.error_handler():
lyr.CreateField(ogr.FieldDefn("FROM", ogr.OFTInteger)) # reserved keyword
lyr.CreateField(
ogr.FieldDefn("1NUMBER", ogr.OFTInteger)
) # starting with a number
lyr.CreateField(
ogr.FieldDefn("WITH SPACE AND !$*!- special characters", ogr.OFTInteger)
) # unallowed characters
lyr.CreateField(ogr.FieldDefn("é" * 64, ogr.OFTInteger)) # OK
lyr.CreateField(
ogr.FieldDefn(
"A123456789012345678901234567890123456789012345678901234567890123",
ogr.OFTInteger,
)
) # 64 characters : ok
lyr.CreateField(
ogr.FieldDefn(
"A1234567890123456789012345678901234567890123456789012345678901234",
ogr.OFTInteger,
)
) # 65 characters : nok
lyr.CreateField(
ogr.FieldDefn(
"A12345678901234567890123456789012345678901234567890123456789012345",
ogr.OFTInteger,
)
) # 66 characters : nok
lyr_defn = lyr.GetLayerDefn()
expected_names = [
"FROM_",
"_1NUMBER",
"WITH_SPACE_AND_______special_characters",
"é" * 64,
"A123456789012345678901234567890123456789012345678901234567890123",
"A1234567890123456789012345678901234567890123456789012345678901_1",
"A1234567890123456789012345678901234567890123456789012345678901_2",
]
for i in range(5):
assert lyr_defn.GetFieldIndex(expected_names[i]) == i, (
"did not find %s" % expected_names[i]
)
###############################################################################
# Test layer name laundering (#4466)
def test_ogr_fgdb_9(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
_160char = "A123456789" * 16
in_names = [
"FROM", # reserved keyword
"1NUMBER", # starting with a number
"WITH SPACE AND !$*!- special characters", # banned characters
"sde_foo", # reserved prefixes
_160char, # OK
_160char + "A", # too long
_160char + "B", # still too long
]
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
with gdaltest.error_handler():
for in_name in in_names:
lyr = ds.CreateLayer(in_name, srs=srs, geom_type=ogr.wkbPoint)
lyr.GetLayerDefn()
expected_names = [
"FROM_",
"_1NUMBER",
"WITH_SPACE_AND_______special_characters",
"_sde_foo",
_160char,
_160char[0:158] + "_1",
_160char[0:158] + "_2",
]
for i, exp_name in enumerate(expected_names):
assert ds.GetLayerByIndex(i).GetName() == exp_name, "did not find %s" % exp_name
###############################################################################
# Test SRS support
def test_ogr_fgdb_10(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs_exact_4326 = osr.SpatialReference()
srs_exact_4326.ImportFromEPSG(4326)
srs_approx_4326 = srs_exact_4326.Clone()
srs_approx_4326.MorphToESRI()
srs_approx_4326.ImportFromWkt(srs_approx_4326.ExportToWkt())
srs_exact_2193 = osr.SpatialReference()
srs_exact_2193.ImportFromEPSG(2193)
srs_approx_2193 = srs_exact_2193.Clone()
srs_approx_2193.MorphToESRI()
srs_approx_2193.ImportFromWkt(srs_approx_2193.ExportToWkt())
srs_not_in_db = osr.SpatialReference(
"""PROJCS["foo",
GEOGCS["foo",
DATUM["foo",
SPHEROID["foo",6000000,300]],
PRIMEM["Greenwich",0],
UNIT["Degree",0.017453292519943295]],
PROJECTION["Transverse_Mercator"],
PARAMETER["latitude_of_origin",0],
PARAMETER["central_meridian",0],
PARAMETER["scale_factor",1],
PARAMETER["false_easting",500000],
PARAMETER["false_northing",0],
UNIT["Meter",1]]"""
)
srs_exact_4230 = osr.SpatialReference()
srs_exact_4230.ImportFromEPSG(4230)
srs_approx_4230 = srs_exact_4230.Clone()
srs_approx_4230.MorphToESRI()
srs_approx_4230.ImportFromWkt(srs_approx_4230.ExportToWkt())
srs_approx_intl = osr.SpatialReference()
srs_approx_intl.ImportFromProj4("+proj=longlat +ellps=intl +no_defs")
srs_exact_4233 = osr.SpatialReference()
srs_exact_4233.ImportFromEPSG(4233)
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
lyr = ds.CreateLayer("srs_exact_4326", srs=srs_exact_4326, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_4326", srs=srs_approx_4326, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_exact_2193", srs=srs_exact_2193, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_2193", srs=srs_approx_2193, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_4230", srs=srs_approx_4230, geom_type=ogr.wkbPoint)
# will fail
with gdaltest.error_handler():
lyr = ds.CreateLayer(
"srs_approx_intl", srs=srs_approx_intl, geom_type=ogr.wkbPoint
)
# will fail: 4233 doesn't exist in DB
with gdaltest.error_handler():
lyr = ds.CreateLayer(
"srs_exact_4233", srs=srs_exact_4233, geom_type=ogr.wkbPoint
)
# will fail
with gdaltest.error_handler():
lyr = ds.CreateLayer("srs_not_in_db", srs=srs_not_in_db, geom_type=ogr.wkbPoint)
ds = None
ds = ogr.Open("tmp/test.gdb")
lyr = ds.GetLayerByName("srs_exact_4326")
assert lyr.GetSpatialRef().ExportToWkt().find("4326") != -1
lyr = ds.GetLayerByName("srs_approx_4326")
assert lyr.GetSpatialRef().ExportToWkt().find("4326") != -1
lyr = ds.GetLayerByName("srs_exact_2193")
assert lyr.GetSpatialRef().ExportToWkt().find("2193") != -1
lyr = ds.GetLayerByName("srs_approx_2193")
assert lyr.GetSpatialRef().ExportToWkt().find("2193") != -1
lyr = ds.GetLayerByName("srs_approx_4230")
assert lyr.GetSpatialRef().ExportToWkt().find("4230") != -1
ds = None
###############################################################################
# Test all data types
def test_ogr_fgdb_11(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
f = open("data/filegdb/test_filegdb_field_types.xml", "rt")
xml_def = f.read()
f.close()
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
lyr = ds.CreateLayer(
"test", geom_type=ogr.wkbNone, options=["XML_DEFINITION=%s" % xml_def]
)
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("esriFieldTypeSmallInteger", 12)
feat.SetField("esriFieldTypeInteger", 3456)
feat.SetField("esriFieldTypeSingle", 78.9)
feat.SetField("esriFieldTypeDouble", 1.23)
feat.SetField("esriFieldTypeDate", "2012/12/31 12:34:56")
feat.SetField("esriFieldTypeString", "astr")
feat.SetField(
"esriFieldTypeGlobalID", "{12345678-9ABC-DEF0-1234-567890ABCDEF}"
) # This is ignored and value is generated by FileGDB SDK itself
feat.SetField("esriFieldTypeGUID", "{12345678-9abc-DEF0-1234-567890ABCDEF}")
lyr.CreateFeature(feat)
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(feat)
feat = None
# Create a esriFieldTypeGlobalID field
lyr = ds.CreateLayer(
"test2",
geom_type=ogr.wkbNone,
options=["COLUMN_TYPES=global_id=esriFieldTypeGlobalID"],
)
lyr.CreateField(ogr.FieldDefn("global_id", ogr.OFTString))
feat = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(feat)
feat = None
ds = None
ds = ogr.Open("tmp/test.gdb")
lyr = ds.GetLayerByName("test")
feat = lyr.GetNextFeature()
if (
feat.GetField("esriFieldTypeSmallInteger") != 12
or feat.GetField("esriFieldTypeInteger") != 3456
or feat.GetField("esriFieldTypeSingle") != pytest.approx(78.9, abs=1e-2)
or feat.GetField("esriFieldTypeDouble") != 1.23
or feat.GetField("esriFieldTypeDate") != "2012/12/31 12:34:56"
or feat.GetField("esriFieldTypeString") != "astr"
or feat.GetField("esriFieldTypeGUID")
!= "{12345678-9ABC-DEF0-1234-567890ABCDEF}"
or (not feat.IsFieldSet("esriFieldTypeGlobalID"))
):
feat.DumpReadable()
pytest.fail()
feat = lyr.GetNextFeature()
if not feat.IsFieldSet("esriFieldTypeGlobalID"):
feat.DumpReadable()
pytest.fail()
lyr = ds.GetLayerByName("test2")
feat = lyr.GetNextFeature()
if not feat.IsFieldSet("global_id"):
feat.DumpReadable()
pytest.fail()
ds = None
###############################################################################
# Test failed Open()
@gdaltest.disable_exceptions()
def test_ogr_fgdb_12():
ds = ogr.Open("tmp/non_existing.gdb")
assert ds is None
gdal.Unlink("tmp/dummy.gdb")
try:
shutil.rmtree("tmp/dummy.gdb")
except OSError:
pass
f = open("tmp/dummy.gdb", "wb")
f.close()
ds = ogr.Open("tmp/dummy.gdb")
assert ds is None
os.unlink("tmp/dummy.gdb")
os.mkdir("tmp/dummy.gdb")
with gdaltest.error_handler():
ds = ogr.Open("tmp/dummy.gdb")
assert ds is None
shutil.rmtree("tmp/dummy.gdb")
###############################################################################
# Test failed CreateDataSource() and DeleteDataSource()
def test_ogr_fgdb_13(fgdb_drv):
with gdaltest.error_handler():
ds = fgdb_drv.CreateDataSource("tmp/foo")
assert ds is None
f = open("tmp/dummy.gdb", "wb")
f.close()
with gdaltest.error_handler():
ds = fgdb_drv.CreateDataSource("tmp/dummy.gdb")
assert ds is None
os.unlink("tmp/dummy.gdb")
try:
shutil.rmtree("/nonexistingdir")
except OSError:
pass
if sys.platform == "win32":
name = "/nonexistingdrive:/nonexistingdir/dummy.gdb"
else:
name = "/proc/dummy.gdb"
with gdaltest.error_handler():
ds = fgdb_drv.CreateDataSource(name)
assert ds is None
with gdaltest.error_handler():
ret = fgdb_drv.DeleteDataSource(name)
assert ret != 0
###############################################################################
# Test interleaved opening and closing of databases (#4270)
def test_ogr_fgdb_14():
for _ in range(3):
ds1 = ogr.Open("tmp/test.gdb")
assert ds1 is not None
ds2 = ogr.Open("tmp/test.gdb")
assert ds2 is not None
ds2 = None
ds1 = None
###############################################################################
# Test opening a FGDB with both SRID and LatestSRID set (#5638)
def test_ogr_fgdb_15():
try:
shutil.rmtree("tmp/test3005.gdb")
except OSError:
pass
gdaltest.unzip("tmp", "data/filegdb/test3005.gdb.zip")
ds = ogr.Open("tmp/test3005.gdb")
lyr = ds.GetLayer(0)
got_wkt = lyr.GetSpatialRef().ExportToWkt()
sr = osr.SpatialReference()
sr.ImportFromEPSG(3005)
expected_wkt = sr.ExportToWkt()
assert got_wkt == expected_wkt
ds = None
###############################################################################
# Test fix for #5674
def test_ogr_fgdb_16(openfilegdb_drv, fgdb_drv):
if fgdb_drv is None or openfilegdb_drv is None:
pytest.skip()
try:
gdaltest.unzip("tmp/cache", "data/filegdb/ESSENCE_NAIPF_ORI_PROV_sub93.gdb.zip")
except OSError:
pass
try:
os.stat("tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb")
except OSError:
pytest.skip()
fgdb_drv.Deregister()
# Force FileGDB first
fgdb_drv.Register()
openfilegdb_drv.Register()
try:
ds = ogr.Open("tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb")
assert ds is not None
finally:
# Deregister OpenFileGDB again
openfilegdb_drv.Deregister()
shutil.rmtree("tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb")
###############################################################################
# Test not nullable fields
def test_ogr_fgdb_17(fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
lyr = ds.CreateLayer(
"test", geom_type=ogr.wkbPoint, srs=sr, options=["GEOMETRY_NULLABLE=NO"]
)
assert lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() == 0
field_defn = ogr.FieldDefn("field_not_nullable", ogr.OFTString)
field_defn.SetNullable(0)
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_nullable", ogr.OFTString)
lyr.CreateField(field_defn)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("field_not_nullable", "not_null")
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT(0 0)"))
ret = lyr.CreateFeature(f)
assert ret == 0
f = None
# Error case: missing geometry
f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("field_not_nullable", "not_null")
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
f = None
# Error case: missing non-nullable field
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT(0 0)"))
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
f = None
ds = None
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByName("test")
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_not_nullable"))
.IsNullable()
== 0
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_nullable"))
.IsNullable()
== 1
)
assert lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() == 0
ds = None
###############################################################################
# Test default values
def test_ogr_fgdb_18(openfilegdb_drv, fgdb_drv):
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
lyr = ds.CreateLayer("test", geom_type=ogr.wkbNone)
field_defn = ogr.FieldDefn("field_string", ogr.OFTString)
field_defn.SetDefault("'a''b'")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_int", ogr.OFTInteger)
field_defn.SetDefault("123")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_real", ogr.OFTReal)
field_defn.SetDefault("1.23")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_nodefault", ogr.OFTInteger)
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_datetime", ogr.OFTDateTime)
field_defn.SetDefault("CURRENT_TIMESTAMP")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn("field_datetime2", ogr.OFTDateTime)
field_defn.SetDefault("'2015/06/30 12:34:56'")
lyr.CreateField(field_defn)
f = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(f)
f = None
ds = None
if openfilegdb_drv is not None:
openfilegdb_drv.Register()
ret = ogr_fgdb_18_test_results(openfilegdb_drv)
if openfilegdb_drv is not None:
openfilegdb_drv.Deregister()
return ret
def ogr_fgdb_18_test_results(openfilegdb_drv):
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByName("test")
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_string"))
.GetDefault()
== "'a''b'"
)
if openfilegdb_drv is not None:
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_int"))
.GetDefault()
== "123"
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_real"))
.GetDefault()
== "1.23"
)
assert (
lyr.GetLayerDefn()
.GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex("field_nodefault"))
.GetDefault()
is None
)
# if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime')).GetDefault() != 'CURRENT_TIMESTAMP':
# gdaltest.post_reason('fail')
# return 'fail'
# if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime2')).GetDefault() != "'2015/06/30 12:34:56'":
# gdaltest.post_reason('fail')
# print(lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime2')).GetDefault())
# return 'fail'
f = lyr.GetNextFeature()
if (
f.GetField("field_string") != "a'b"
or f.GetField("field_int") != 123
or f.GetField("field_real") != 1.23
or not f.IsFieldNull("field_nodefault")
or not f.IsFieldSet("field_datetime")
or f.GetField("field_datetime2") != "2015/06/30 12:34:56"
):
f.DumpReadable()
pytest.fail()
ds = None
###############################################################################
# Test transaction support
def ogr_fgdb_19_open_update(openfilegdb_drv, fgdb_drv, filename):
# We need the OpenFileGDB driver for Linux improved StartTransaction()
bPerLayerCopyingForTransaction = False
if openfilegdb_drv is not None:
openfilegdb_drv.Register()
if os.name != "nt":
val = gdal.GetConfigOption("FGDB_PER_LAYER_COPYING_TRANSACTION", "TRUE")
if val == "TRUE" or val == "YES" or val == "ON":
bPerLayerCopyingForTransaction = True
ds = fgdb_drv.Open(filename, update=1)
if openfilegdb_drv is not None:
openfilegdb_drv.Deregister()
fgdb_drv.Deregister()
# Force OpenFileGDB first
openfilegdb_drv.Register()
fgdb_drv.Register()
return (bPerLayerCopyingForTransaction, ds)
def test_ogr_fgdb_19(openfilegdb_drv, fgdb_drv):
# FIXME likely due to too old FileGDB SDK on those targets
# fails with ERROR 1: Failed to open Geodatabase (The system cannot find the file specified.)
# File "ogr_fgdb.py", line 1664, in ogr_fgdb_19
# if ds.StartTransaction(force=True) != 0:
if (
gdaltest.is_travis_branch("ubuntu_2004")
or gdaltest.is_travis_branch("ubuntu_1804")
or gdaltest.is_travis_branch("ubuntu_1604")
or gdaltest.is_travis_branch("trusty_clang")
or gdaltest.is_travis_branch("python3")
or gdaltest.is_travis_branch("trunk_with_coverage")
):
pytest.skip()
try:
shutil.rmtree("tmp/test.gdb.ogrtmp")
except OSError:
pass
try:
shutil.rmtree("tmp/test.gdb.ogredited")
except OSError:
pass
# Error case: try in read-only
ds = fgdb_drv.Open("tmp/test.gdb")
with gdaltest.error_handler():
ret = ds.StartTransaction(force=True)
assert ret != 0
ds = None
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
assert ds.TestCapability(ogr.ODsCEmulatedTransactions) == 1
# Error case: try in non-forced mode
with gdaltest.error_handler():
ret = ds.StartTransaction(force=False)
assert ret != 0
# Error case: try StartTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL("SELECT * FROM test")
with gdaltest.error_handler():
ret = ds.StartTransaction(force=True)
assert ret != 0
ds.ReleaseResultSet(sql_lyr)
# Error case: call CommitTransaction() while there is no transaction
with gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
# Error case: call RollbackTransaction() while there is no transaction
with gdaltest.error_handler():
ret = ds.RollbackTransaction()
assert ret != 0
# Error case: try StartTransaction() with another active connection
ds2 = fgdb_drv.Open("tmp/test.gdb", update=1)
with gdaltest.error_handler():
ret = ds2.StartTransaction(force=True)
assert ret != 0
ds2 = None
# Successful StartTransaction() finally!
lyr = ds.GetLayer(0)
lyr = ds.GetLayer(0) # again
old_count = lyr.GetFeatureCount()
lyr_defn = lyr.GetLayerDefn()
layer_created_before_transaction = ds.CreateLayer(
"layer_created_before_transaction", geom_type=ogr.wkbNone
)
layer_created_before_transaction_defn = (
layer_created_before_transaction.GetLayerDefn()
)
assert ds.StartTransaction(force=True) == 0
assert os.path.exists("tmp/test.gdb.ogredited")
assert not os.path.exists("tmp/test.gdb.ogrtmp")
ret = lyr.CreateField(ogr.FieldDefn("foobar", ogr.OFTString))
assert ret == 0
ret = lyr.DeleteField(lyr.GetLayerDefn().GetFieldIndex("foobar"))
assert ret == 0
with gdaltest.error_handler():
ret = lyr.CreateGeomField(ogr.GeomFieldDefn("foobar", ogr.wkbPoint))
assert ret != 0
with gdaltest.error_handler():
ret = lyr.ReorderFields([i for i in range(lyr.GetLayerDefn().GetFieldCount())])
assert ret != 0
with gdaltest.error_handler():
ret = lyr.AlterFieldDefn(0, ogr.FieldDefn("foo", ogr.OFTString), 0)
assert ret != 0
f = ogr.Feature(lyr_defn)
f.SetField("field_string", "foo")
lyr.CreateFeature(f)
lyr.SetFeature(f)
fid = f.GetFID()
assert fid > 0
lyr.ResetReading()
for i in range(fid):
f = lyr.GetNextFeature()
assert f.GetFID() == fid and f.GetField("field_string") == "foo"
f = lyr.GetFeature(fid)
assert f.GetFID() == fid and f.GetField("field_string") == "foo"
f = ogr.Feature(layer_created_before_transaction_defn)
layer_created_before_transaction.CreateFeature(f)
# Error case: call StartTransaction() while there is an active transaction
with gdaltest.error_handler():
ret = ds.StartTransaction(force=True)
assert ret != 0
# Error case: try CommitTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL("SELECT * FROM test")
with gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds.ReleaseResultSet(sql_lyr)
# Error case: try RollbackTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL("SELECT * FROM test")
with gdaltest.error_handler():
ret = ds.RollbackTransaction()
assert ret != 0
ds.ReleaseResultSet(sql_lyr)
# Test that CommitTransaction() works
assert ds.CommitTransaction() == 0
assert not os.path.exists("tmp/test.gdb.ogredited")
assert not os.path.exists("tmp/test.gdb.ogrtmp")
lst = gdal.ReadDir("tmp/test.gdb")
for filename in lst:
assert ".tmp" not in filename, lst
lyr_tmp = ds.GetLayer(0)
lyr_tmp = ds.GetLayer(0)
new_count = lyr_tmp.GetFeatureCount()
assert new_count == old_count + 1
old_count = new_count
assert layer_created_before_transaction.GetFeatureCount() == 1
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == layer_created_before_transaction.GetName():
ds.DeleteLayer(i)
break
layer_created_before_transaction = None
# Test suppression of layer within transaction
lyr_count = ds.GetLayerCount()
ds.CreateLayer("layer_tmp", geom_type=ogr.wkbNone)
ret = ds.StartTransaction(force=True)
assert ret == 0
ds.DeleteLayer(ds.GetLayerCount() - 1)
assert ds.CommitTransaction() == 0
new_lyr_count = ds.GetLayerCount()
assert new_lyr_count == lyr_count
# Test that RollbackTransaction() works
ret = ds.StartTransaction(force=True)
assert ret == 0
f = ogr.Feature(lyr_defn)
lyr.CreateFeature(f)
layer_created_during_transaction = ds.CreateLayer(
"layer_created_during_transaction", geom_type=ogr.wkbNone
)
layer_created_during_transaction.CreateField(ogr.FieldDefn("foo", ogr.OFTString))
assert ds.RollbackTransaction() == 0
assert not os.path.exists("tmp/test.gdb.ogredited")
assert not os.path.exists("tmp/test.gdb.ogrtmp")
assert lyr.GetFeatureCount() == old_count
# Cannot retrieve the layer any more from fresh
assert ds.GetLayerByName("layer_created_during_transaction") is None
# Pointer is in ghost state
assert layer_created_during_transaction.GetLayerDefn().GetFieldCount() == 0
# Simulate an error case where StartTransaction() cannot copy backup files
lyr_count = ds.GetLayerCount()
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE1"), gdaltest.error_handler():
ret = ds.StartTransaction(force=True)
assert ret != 0
assert ds.GetLayerCount() == lyr_count
# Simulate an error case where StartTransaction() cannot reopen database
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE2"), gdaltest.error_handler():
ret = ds.StartTransaction(force=True)
assert ret != 0
assert ds.GetLayerCount() == 0
shutil.rmtree("tmp/test.gdb.ogredited")
# Test method on ghost datasource and layer
ds.GetName()
ds.GetLayerCount()
ds.GetLayer(0)
ds.GetLayerByName("test")
ds.DeleteLayer(0)
ds.TestCapability("foo")
ds.CreateLayer("bar", geom_type=ogr.wkbNone)
ds.CopyLayer(lyr, "baz")
ds.GetStyleTable()
# ds.SetStyleTableDirectly(None)
ds.SetStyleTable(None)
sql_lyr = ds.ExecuteSQL("SELECT * FROM test")
ds.ReleaseResultSet(sql_lyr)
ds.FlushCache()
ds.GetMetadata()
ds.GetMetadataItem("foo")
ds.SetMetadata(None)
ds.SetMetadataItem("foo", None)
lyr.GetSpatialFilter()
lyr.SetSpatialFilter(None)
lyr.SetSpatialFilterRect(0, 0, 0, 0)
lyr.SetSpatialFilter(0, None)
lyr.SetSpatialFilterRect(0, 0, 0, 0, 0)
lyr.SetAttributeFilter(None)
lyr.ResetReading()
lyr.GetNextFeature()
lyr.SetNextByIndex(0)
lyr.GetFeature(0)
lyr.SetFeature(ogr.Feature(lyr.GetLayerDefn()))
lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))
lyr.DeleteFeature(0)
lyr.GetName()
lyr.GetGeomType()
lyr.GetLayerDefn()
lyr.GetSpatialRef()
lyr.GetFeatureCount()
lyr.GetExtent()
lyr.GetExtent(0)
lyr.TestCapability("foo")
lyr.CreateField(ogr.FieldDefn("foo", ogr.OFTString))
lyr.DeleteField(0)
lyr.ReorderFields([i for i in range(lyr.GetLayerDefn().GetFieldCount())])
lyr.AlterFieldDefn(0, ogr.FieldDefn("foo", ogr.OFTString), 0)
lyr.SyncToDisk()
lyr.GetStyleTable()
# lyr.SetStyleTableDirectly(None)
lyr.SetStyleTable(None)
lyr.StartTransaction()
lyr.CommitTransaction()
lyr.RollbackTransaction()
lyr.SetIgnoredFields([])
lyr.GetMetadata()
lyr.GetMetadataItem("foo")
lyr.SetMetadata(None)
lyr.SetMetadataItem("foo", None)
ds = None
if bPerLayerCopyingForTransaction:
# Test an error case where we simulate a failure of destroying a
# layer destroyed during transaction
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
layer_tmp = ds.CreateLayer("layer_tmp", geom_type=ogr.wkbNone)
layer_tmp.CreateField(ogr.FieldDefn("foo", ogr.OFTString))
assert ds.StartTransaction(force=True) == 0
ds.DeleteLayer(ds.GetLayerCount() - 1)
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE1"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds = None
shutil.rmtree("tmp/test.gdb.ogredited")
lst = gdal.ReadDir("tmp/test.gdb")
for filename in lst:
assert ".tmp" not in filename, lst
# Test an error case where we simulate a failure in renaming
# a file in original directory
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == "layer_tmp":
ds.DeleteLayer(i)
break
assert ds.StartTransaction(force=True) == 0
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE2"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds = None
shutil.rmtree("tmp/test.gdb.ogredited")
lst = gdal.ReadDir("tmp/test.gdb")
for filename in lst:
assert ".tmp" not in filename, lst
# Test an error case where we simulate a failure in moving
# a file into original directory
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
assert ds.StartTransaction(force=True) == 0
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE3"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds = None
shutil.rmtree("tmp/test.gdb.ogredited")
# Remove left over .tmp files
lst = gdal.ReadDir("tmp/test.gdb")
for filename in lst:
if ".tmp" in filename:
os.remove("tmp/test.gdb/" + filename)
# Test not critical error in removing a temporary file
for case in ("CASE4", "CASE5"):
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
assert ds.StartTransaction(force=True) == 0
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
with gdal.config_option("FGDB_SIMUL_FAIL", case), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret == 0, case
ds = None
if case == "CASE4":
assert not os.path.exists("tmp/test.gdb.ogredited"), case
else:
shutil.rmtree("tmp/test.gdb.ogredited")
# Remove left over .tmp files
lst = gdal.ReadDir("tmp/test.gdb")
for filename in lst:
if ".tmp" in filename:
os.remove("tmp/test.gdb/" + filename)
else:
# Test an error case where we simulate a failure of rename from .gdb to .gdb.ogrtmp during commit
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE1"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds = None
# Test an error case where we simulate a failure of rename from .gdb.ogredited to .gdb during commit
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE2"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
ds = None
os.rename("tmp/test.gdb.ogrtmp", "tmp/test.gdb")
# Test an error case where we simulate a failure of removing from .gdb.ogrtmp during commit
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE3"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret == 0
ds = None
shutil.rmtree("tmp/test.gdb.ogrtmp")
# Test an error case where we simulate a failure of reopening the committed DB
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE_REOPEN"), gdaltest.error_handler():
ret = ds.CommitTransaction()
assert ret != 0
assert ds.GetLayerCount() == 0
ds = None
# Test an error case where we simulate a failure of removing from .gdb.ogredited during rollback
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE1"), gdaltest.error_handler():
ret = ds.RollbackTransaction()
assert ret != 0
ds = None
shutil.rmtree("tmp/test.gdb.ogredited")
# Test an error case where we simulate a failure of reopening the rollbacked DB
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert ds.StartTransaction(force=True) == 0
with gdal.config_option("FGDB_SIMUL_FAIL", "CASE2"), gdaltest.error_handler():
ret = ds.RollbackTransaction()
assert ret != 0
assert ds.GetLayerCount() == 0
ds = None
if openfilegdb_drv is not None:
openfilegdb_drv.Deregister()
# Same, but retry without per-layer copying optimization (in the case
# this was what was tested in previous step)
def test_ogr_fgdb_19bis(openfilegdb_drv, fgdb_drv):
if (
gdaltest.is_travis_branch("ubuntu_2004")
or gdaltest.is_travis_branch("ubuntu_1804")
or gdaltest.is_travis_branch("ubuntu_1604")
or gdaltest.is_travis_branch("trusty_clang")
or gdaltest.is_travis_branch("python3")
or gdaltest.is_travis_branch("trunk_with_coverage")
):
pytest.skip()
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update(
openfilegdb_drv, fgdb_drv, "tmp/test.gdb"
)
del ds
if not bPerLayerCopyingForTransaction:
pytest.skip()
with gdal.config_option("FGDB_PER_LAYER_COPYING_TRANSACTION", "FALSE"):
ret = test_ogr_fgdb_19(openfilegdb_drv, fgdb_drv)
return ret
###############################################################################
# Test CreateFeature() with user defined FID
def test_ogr_fgdb_20(openfilegdb_drv, fgdb_drv):
if openfilegdb_drv is None:
pytest.skip("No OpenFileGDB driver available")
if (
gdaltest.is_travis_branch("ubuntu_2004")
or gdaltest.is_travis_branch("ubuntu_1804")
or gdaltest.is_travis_branch("ubuntu_1604")
or gdaltest.is_travis_branch("trusty_clang")
or gdaltest.is_travis_branch("python3")
or gdaltest.is_travis_branch("trunk_with_coverage")
):
pytest.skip()
if not os.path.exists("tmp/test.gdb"):
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
ds = None
# We need the OpenFileGDB driver for CreateFeature() with user defined FID
openfilegdb_drv.Register()
ds = fgdb_drv.Open("tmp/test.gdb", update=1)
openfilegdb_drv.Deregister()
fgdb_drv.Deregister()
# Force OpenFileGDB first
openfilegdb_drv.Register()
fgdb_drv.Register()
lyr = ds.CreateLayer("test_2147483647", geom_type=ogr.wkbNone)
lyr.CreateField(ogr.FieldDefn("int", ogr.OFTInteger))
f = ogr.Feature(lyr.GetLayerDefn())
fid = 2147483647
f.SetFID(fid)
f.SetField(0, fid)
lyr.CreateFeature(f)
ds = None
ds = openfilegdb_drv.Open("tmp/test.gdb")
lyr = ds.GetLayerByName("test_2147483647")
f = lyr.GetNextFeature()
assert f
assert f.GetFID() == 2147483647
ds = None
ds = fgdb_drv.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByName("test_2147483647")
# GetNextFeature() is excruciatingly slow on such huge FID with the SDK driver
f = lyr.GetFeature(2147483647)
assert f
lyr = ds.CreateLayer("ogr_fgdb_20", geom_type=ogr.wkbNone)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
ds.ExecuteSQL("CREATE INDEX ogr_fgdb_20_id ON ogr_fgdb_20(id)")
f = ogr.Feature(lyr.GetLayerDefn())
f.SetField("id", 1)
ret = lyr.CreateFeature(f)
assert (
ret == 0
and f.GetFID() == 1
and lyr.GetMetadataItem("1", "MAP_OGR_FID_TO_FGDB_FID") is None
)
# Existing FID
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
for invalid_fid in [-2, 0, 9876543210]:
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(invalid_fid)
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0, invalid_fid
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(2)
f.SetField("id", 2)
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != 2
or lyr.GetMetadataItem("2", "MAP_OGR_FID_TO_FGDB_FID") is not None
):
f.DumpReadable()
pytest.fail()
# OGR FID = 4, FileGDB FID = 3
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(4)
f.SetField("id", 4)
# Cannot call CreateFeature() with a set FID when a dataset is opened more than once
ds2 = fgdb_drv.Open("tmp/test.gdb", update=1)
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
ds2 = None
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != 4
or lyr.GetMetadataItem("4", "MAP_OGR_FID_TO_FGDB_FID") != "3"
):
f.DumpReadable()
pytest.fail(lyr.GetMetadataItem("4", "MAP_OGR_FID_TO_FGDB_FID"))
# Cannot open geodatabase at the moment since it is in 'FID hack mode'
with gdaltest.error_handler():
ds2 = fgdb_drv.Open("tmp/test.gdb", update=1)
assert ds2 is None
ds2 = None
# Existing FID, but only in OGR space
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
# This FID exists as a FGDB ID, but should not be user visible.
f.SetFID(3)
ret = lyr.SetFeature(f)
assert ret == ogr.OGRERR_NON_EXISTING_FEATURE
ret = lyr.DeleteFeature(3)
assert ret == ogr.OGRERR_NON_EXISTING_FEATURE
ret = lyr.GetFeature(3)
assert ret is None
# Trying to set OGR FID = 3 --> FileGDB FID = 4
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(3)
f.SetField("id", 3)
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != 3
or lyr.GetMetadataItem("3", "MAP_OGR_FID_TO_FGDB_FID") != "4"
):
f.DumpReadable()
pytest.fail()
lyr.ResetReading()
expected = [(1, None), (2, None), (4, 3), (3, 4)]
for i in range(2):
for (fid, fgdb_fid) in expected:
if i == 0:
f = lyr.GetNextFeature()
else:
f = lyr.GetFeature(fid)
assert f is not None
if f.GetFID() != fid or f.GetField("id") != fid:
f.DumpReadable()
pytest.fail(fid)
got_fgdb_fid = lyr.GetMetadataItem(
str(f.GetFID()), "MAP_OGR_FID_TO_FGDB_FID"
)
if got_fgdb_fid is None:
assert fgdb_fid is None
elif int(got_fgdb_fid) != fgdb_fid:
print(fgdb_fid)
pytest.fail(got_fgdb_fid)
for fid in [-9876543210, 0, 100]:
f = lyr.GetFeature(fid)
if f is not None:
f.DumpReadable()
pytest.fail()
for invalid_fid in [-2, 0, 9876543210]:
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(invalid_fid)
ret = lyr.SetFeature(f)
assert ret == ogr.OGRERR_NON_EXISTING_FEATURE
ret = lyr.DeleteFeature(invalid_fid)
assert ret == ogr.OGRERR_NON_EXISTING_FEATURE
f = lyr.GetFeature(3)
f.SetField("str", "3")
ret = lyr.SetFeature(f)
assert ret == 0
f = lyr.GetFeature(3)
assert f.GetField("str") == "3"
ret = lyr.DeleteFeature(1)
assert ret == 0
ret = lyr.DeleteFeature(3)
assert ret == 0
for (fid, fgdb_fid) in [
(3, 5),
(2049, 6),
(10, 7),
(7, 8),
(9, None),
(8, 10),
(12, 11),
]:
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(fid)
f.SetField("id", fid)
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != fid
or str(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
!= str(fgdb_fid)
):
f.DumpReadable()
print(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
pytest.fail(fid)
# Normally 12 should be attributed, but it has already been reserved
f = ogr.Feature(lyr.GetLayerDefn())
ret = lyr.CreateFeature(f)
if ret != 0 or f.GetFID() != 13:
f.DumpReadable()
pytest.fail()
f.SetField("id", f.GetFID())
lyr.SetFeature(f)
lyr.ResetReading()
expected = [
(2, None),
(4, 3),
(3, 5),
(2049, 6),
(10, 7),
(7, 8),
(9, None),
(8, 10),
]
for (fid, fgdb_fid) in expected:
f = lyr.GetNextFeature()
assert f is not None
if (
f.GetFID() != fid
or f.GetField("id") != fid
or str(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
!= str(fgdb_fid)
):
f.DumpReadable()
print(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
pytest.fail(fid)
lyr.SetAttributeFilter("id = 3")
lyr.ResetReading()
f = lyr.GetNextFeature()
if f.GetFID() != 3:
f.DumpReadable()
pytest.fail()
# This will cause a resync of indexes
lyr.SetAttributeFilter("OBJECTID = 3")
lyr.ResetReading()
f = lyr.GetNextFeature()
if f.GetFID() != 3:
f.DumpReadable()
pytest.fail()
# No sparse pages
lyr = ds.CreateLayer("ogr_fgdb_20_simple", geom_type=ogr.wkbNone)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(2)
f.SetField("id", 2)
lyr.CreateFeature(f)
# This will cause a resync of indexes
sql_lyr = ds.ExecuteSQL("SELECT * FROM ogr_fgdb_20_simple")
f = sql_lyr.GetNextFeature()
if f.GetFID() != 2:
f.DumpReadable()
pytest.fail()
# Do not allow user set FID while a select layer is in progress
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(3)
f.SetField("id", 3)
with gdaltest.error_handler():
ret = lyr.CreateFeature(f)
assert ret != 0
ds.ReleaseResultSet(sql_lyr)
# Do it in transaction, but this is completely orthogonal
ds.StartTransaction(force=True)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(3)
f.SetField("id", 3)
lyr.CreateFeature(f)
f = None
ds.CommitTransaction()
# Multi-page indexes
srs = osr.SpatialReference()
srs.ImportFromEPSG(32630)
with gdal.config_option("FGDB_RESYNC_THRESHOLD", "600"):
lyr = ds.CreateLayer("ogr_fgdb_20_indexes", geom_type=ogr.wkbPoint, srs=srs)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
ds.ExecuteSQL("CREATE INDEX ogr_fgdb_20_indexes_id ON ogr_fgdb_20_indexes(id)")
with gdal.config_option("FGDB_BULK_LOAD", "YES"):
for i in range(1000):
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(i + 2)
f.SetField("id", i + 2)
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT (%d 0)" % i))
lyr.CreateFeature(f)
ds = None
# Check consistency after re-opening
gdal.ErrorReset()
for update in [0, 1]:
ds = fgdb_drv.Open("tmp/test.gdb", update=update)
lyr = ds.GetLayerByName("ogr_fgdb_20")
assert lyr.GetFeatureCount() == 10
lyr.ResetReading()
expected = [2, 3, 4, 7, 8, 9, 10, 12, 13, 2049]
for fid in expected:
f = lyr.GetNextFeature()
assert gdal.GetLastErrorType() == 0
assert f is not None, fid
if f.GetFID() != fid or f.GetField("id") != fid:
f.DumpReadable()
pytest.fail(fid)
for fid in expected:
lyr.SetAttributeFilter("id = %d" % fid)
lyr.ResetReading()
f = lyr.GetNextFeature()
if f.GetFID() != fid or f.GetField("id") != fid:
f.DumpReadable()
pytest.fail(fid)
lyr = ds.GetLayerByName("ogr_fgdb_20_simple")
f = lyr.GetNextFeature()
assert f.GetFID() == 2
f = lyr.GetNextFeature()
assert f.GetFID() == 3
# Check attribute index
lyr = ds.GetLayerByName("ogr_fgdb_20_indexes")
for i in range(1000):
fid = i + 2
lyr.SetAttributeFilter("id = %d" % fid)
lyr.ResetReading()
f = lyr.GetNextFeature()
assert f.GetFID() == fid
# Check spatial index
lyr.SetAttributeFilter(None)
if update == 1:
for i in range(1000):
fid = i + 2
lyr.SetSpatialFilterRect(i - 0.01, -0.01, i + 0.01, 0.01)
lyr.ResetReading()
f = lyr.GetNextFeature()
assert f.GetFID() == fid
# Insert new features
ds = fgdb_drv.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByName("ogr_fgdb_20")
for (fid, fgdb_fid) in [
(10000000, 2050),
(10000001, 2051),
(8191, 2052),
(16384, 2053),
]:
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(fid)
f.SetField("id", fid)
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != fid
or str(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
!= str(fgdb_fid)
):
f.DumpReadable()
pytest.fail(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
ds = None
# Insert a new intermediate FIDs
for (fid, fgdb_fid) in [(1000000, 10000002), (1000001, 10000002)]:
ds = fgdb_drv.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByName("ogr_fgdb_20")
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(fid)
f.SetField("id", fid)
ret = lyr.CreateFeature(f)
if (
ret != 0
or f.GetFID() != fid
or lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID") != str(fgdb_fid)
):
f.DumpReadable()
pytest.fail(lyr.GetMetadataItem(str(fid), "MAP_OGR_FID_TO_FGDB_FID"))
ds = None
# Check consistency after re-opening
gdal.ErrorReset()
for update in [0, 1]:
ds = fgdb_drv.Open("tmp/test.gdb", update=update)
lyr = ds.GetLayerByName("ogr_fgdb_20")
assert lyr.GetFeatureCount() == 16
lyr.ResetReading()
expected = [
2,
3,
4,
7,
8,
9,
10,
12,
13,
2049,
8191,
16384,
1000000,
1000001,
10000000,
10000001,
]
for fid in expected:
f = lyr.GetNextFeature()
assert gdal.GetLastErrorType() == 0
assert f is not None, fid
if f.GetFID() != fid or f.GetField("id") != fid:
f.DumpReadable()
pytest.fail(fid)
# Simulate different errors when database reopening is done
# to sync ids
for case in ("CASE1", "CASE2", "CASE3"):
try:
shutil.rmtree("tmp/test2.gdb")
except OSError:
pass
ds = fgdb_drv.CreateDataSource("tmp/test2.gdb")
lyr = ds.CreateLayer("foo", geom_type=ogr.wkbNone)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(2)
f.SetField("id", 2)
lyr.CreateFeature(f)
with gdaltest.error_handler():
with gdal.config_option("FGDB_SIMUL_FAIL_REOPEN", case):
sql_lyr = ds.ExecuteSQL("SELECT * FROM foo")
if case == "CASE3":
assert sql_lyr is not None, case
ds.ReleaseResultSet(sql_lyr)
else:
assert sql_lyr is None, case
# Everything will fail, but hopefully without crashing
lyr.ResetReading()
assert lyr.GetNextFeature() is None
assert lyr.GetFeature(1) is None
assert lyr.DeleteFeature(1) != 0
assert lyr.CreateFeature(f) != 0
assert lyr.SetFeature(f) != 0
if case != "CASE3":
assert ds.CreateLayer("bar", geom_type=ogr.wkbNone) is None
assert ds.DeleteLayer(0) != 0
sql_lyr = ds.ExecuteSQL("SELECT * FROM foo")
assert case == "CASE3" or sql_lyr is None
ds.ReleaseResultSet(sql_lyr)
ds = None
openfilegdb_drv.Deregister()
# sys.exit(0)
###############################################################################
# Test M support
def test_ogr_fgdb_21(fgdb_drv, fgdb_sdk_1_4_or_later):
# Fails on MULTIPOINT ZM
if (
gdaltest.is_travis_branch("ubuntu_2004")
or gdaltest.is_travis_branch("ubuntu_1804")
or gdaltest.is_travis_branch("ubuntu_1604")
or gdaltest.is_travis_branch("python3")
or gdaltest.is_ci()
):
pytest.skip()
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
ds = fgdb_drv.CreateDataSource("tmp/test.gdb")
datalist = [
["pointm", ogr.wkbPointM, "POINT M (1 2 3)"],
["pointzm", ogr.wkbPointM, "POINT ZM (1 2 3 4)"],
["multipointm", ogr.wkbMultiPointM, "MULTIPOINT M ((1 2 3),(4 5 6))"],
["multipointzm", ogr.wkbMultiPointZM, "MULTIPOINT ZM ((1 2 3 4),(5 6 7 8))"],
[
"linestringm",
ogr.wkbLineStringM,
"LINESTRING M (1 2 3,4 5 6)",
"MULTILINESTRING M ((1 2 3,4 5 6))",
],
[
"linestringzm",
ogr.wkbLineStringZM,
"LINESTRING ZM (1 2 3 4,5 6 7 8)",
"MULTILINESTRING ZM ((1 2 3 4,5 6 7 8))",
],
[
"multilinestringm",
ogr.wkbMultiLineStringM,
"MULTILINESTRING M ((1 2 3,4 5 6))",
],
[
"multilinestringzm",
ogr.wkbMultiLineStringZM,
"MULTILINESTRING ZM ((1 2 3 4,5 6 7 8))",
],
[
"polygonm",
ogr.wkbPolygonM,
"POLYGON M ((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1))",
"MULTIPOLYGON M (((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1)))",
],
[
"polygonzm",
ogr.wkbPolygonZM,
"POLYGON ZM ((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1))",
"MULTIPOLYGON ZM (((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1)))",
],
[
"multipolygonm",
ogr.wkbMultiPolygonM,
"MULTIPOLYGON M (((0 0 1,0 1 2,1 1 3,1 0 4,0 0 1)))",
],
[
"multipolygonzm",
ogr.wkbMultiPolygonZM,
"MULTIPOLYGON ZM (((0 0 1 -1,0 1 2 -2,1 1 3 -3,1 0 4 -4,0 0 1 -1)))",
],
["empty_polygonm", ogr.wkbPolygonM, "POLYGON M EMPTY", None],
]
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
for data in datalist:
lyr = ds.CreateLayer(data[0], geom_type=data[1], srs=srs, options=[])
feat = ogr.Feature(lyr.GetLayerDefn())
# print(data[2])
feat.SetGeometry(ogr.CreateGeometryFromWkt(data[2]))
lyr.CreateFeature(feat)
ds = None
ds = ogr.Open("tmp/test.gdb")
for data in datalist:
lyr = ds.GetLayerByName(data[0])
expected_geom_type = data[1]
if expected_geom_type == ogr.wkbLineStringM:
expected_geom_type = ogr.wkbMultiLineStringM
elif expected_geom_type == ogr.wkbLineStringZM:
expected_geom_type = ogr.wkbMultiLineStringZM
elif expected_geom_type == ogr.wkbPolygonM:
expected_geom_type = ogr.wkbMultiPolygonM
elif expected_geom_type == ogr.wkbPolygonZM:
expected_geom_type = ogr.wkbMultiPolygonZM
assert lyr.GetGeomType() == expected_geom_type, data
feat = lyr.GetNextFeature()
try:
expected_wkt = data[3]
except IndexError:
expected_wkt = data[2]
if expected_wkt is None:
if feat.GetGeometryRef() is not None:
feat.DumpReadable()
pytest.fail(data)
elif ogrtest.check_feature_geometry(feat, expected_wkt) != 0:
feat.DumpReadable()
pytest.fail(data)
###############################################################################
# Read curves
@pytest.mark.require_driver("CSV")
def test_ogr_fgdb_22():
ds = ogr.Open("data/filegdb/curves.gdb")
lyr = ds.GetLayerByName("line")
ds_ref = ogr.Open("data/filegdb/curves_line.csv")
lyr_ref = ds_ref.GetLayer(0)
for f in lyr:
f_ref = lyr_ref.GetNextFeature()
if ogrtest.check_feature_geometry(f, f_ref.GetGeometryRef()) != 0:
print(f.GetGeometryRef().ExportToWkt())
pytest.fail(f_ref.GetGeometryRef().ExportToWkt())
lyr = ds.GetLayerByName("polygon")
ds_ref = ogr.Open("data/filegdb/curves_polygon.csv")
lyr_ref = ds_ref.GetLayer(0)
for f in lyr:
f_ref = lyr_ref.GetNextFeature()
if ogrtest.check_feature_geometry(f, f_ref.GetGeometryRef()) != 0:
print(f.GetGeometryRef().ExportToWkt())
pytest.fail(f_ref.GetGeometryRef().ExportToWkt())
ds = ogr.Open("data/filegdb/curve_circle_by_center.gdb")
lyr = ds.GetLayer(0)
ds_ref = ogr.Open("data/filegdb/curve_circle_by_center.csv")
lyr_ref = ds_ref.GetLayer(0)
for f in lyr:
f_ref = lyr_ref.GetNextFeature()
if ogrtest.check_feature_geometry(f, f_ref.GetGeometryRef()) != 0:
print(f.GetGeometryRef().ExportToWkt())
pytest.fail(f_ref.GetGeometryRef().ExportToWkt())
###############################################################################
# Test opening '.'
def test_ogr_fgdb_23():
os.chdir("data/filegdb/curves.gdb")
ds = ogr.Open(".")
os.chdir("../../..")
assert ds is not None
###############################################################################
# Read polygons with M component where the M of the closing point is not the
# one of the starting point (#7017)
@pytest.mark.require_driver("CSV")
def test_ogr_fgdb_24():
ds = ogr.Open("data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb")
lyr = ds.GetLayer(0)
ds_ref = ogr.Open(
"data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb.csv"
)
lyr_ref = ds_ref.GetLayer(0)
for f in lyr:
f_ref = lyr_ref.GetNextFeature()
if ogrtest.check_feature_geometry(f, f_ref.GetGeometryRef()) != 0:
print(f.GetGeometryRef().ExportToIsoWkt())
pytest.fail(f_ref.GetGeometryRef().ExportToIsoWkt())
ds = ogr.Open("data/filegdb/filegdb_polygonzm_nan_m_with_curves.gdb")
lyr = ds.GetLayer(0)
ds_ref = ogr.Open("data/filegdb/filegdb_polygonzm_nan_m_with_curves.gdb.csv")
lyr_ref = ds_ref.GetLayer(0)
for f in lyr:
f_ref = lyr_ref.GetNextFeature()
if ogrtest.check_feature_geometry(f, f_ref.GetGeometryRef()) != 0:
print(f.GetGeometryRef().ExportToIsoWkt())
pytest.fail(f_ref.GetGeometryRef().ExportToIsoWkt())
###############################################################################
# Test selecting FID column with OGRSQL
def test_ogr_fgdb_25():
ds = ogr.Open("data/filegdb/curves.gdb")
sql_lyr = ds.ExecuteSQL("SELECT OBJECTID FROM polygon WHERE OBJECTID = 2")
assert sql_lyr is not None
f = sql_lyr.GetNextFeature()
if f.GetFID() != 2:
f.DumpReadable()
pytest.fail()
f = sql_lyr.GetNextFeature()
assert f is None
ds.ReleaseResultSet(sql_lyr)
lyr = ds.GetLayerByName("polygon")
lyr.SetAttributeFilter("OBJECTID = 2")
f = lyr.GetNextFeature()
if f.GetFID() != 2:
f.DumpReadable()
pytest.fail()
###############################################################################
# Test bugfix for https://github.com/OSGeo/gdal/issues/1369
# where a polygon with inner rings has its exterior ring with wrong orientation
@pytest.mark.require_geos
def test_ogr_fgdb_weird_winding_order(fgdb_sdk_1_4_or_later):
try:
shutil.rmtree("tmp/roads_clip Drawing.gdb")
except OSError:
pass
gdaltest.unzip("tmp", "data/filegdb/weird_winding_order_fgdb.zip")
ds = ogr.Open("tmp/roads_clip Drawing.gdb")
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
g = f.GetGeometryRef()
assert g.GetGeometryCount() == 1
assert g.GetGeometryRef(0).GetGeometryCount() == 17
###############################################################################
# Test bugfix for https://github.com/OSGeo/gdal/issues/1369
# where a polygon with inner rings has its exterior ring with wrong orientation
def test_ogr_fgdb_utc_datetime():
ds = ogr.Open("data/filegdb/testdatetimeutc.gdb")
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
# Check that the timezone +00 is present
assert f.GetFieldAsString("EditDate") == "2020/06/22 07:49:36+00"
###############################################################################
# Test field alias
def test_ogr_fgdb_alias(fgdb_drv):
try:
shutil.rmtree("tmp/alias.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/alias.gdb")
lyr = ds.CreateLayer("test", srs=srs, geom_type=ogr.wkbPoint)
fld_defn = ogr.FieldDefn("short_name", ogr.OFTInteger)
fld_defn.SetAlternativeName("longer name")
lyr.CreateField(fld_defn)
fld_defn = ogr.FieldDefn("regular_name", ogr.OFTInteger)
lyr.CreateField(fld_defn)
ds = None
ds = ogr.Open("tmp/alias.gdb")
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert lyr_defn.GetFieldDefn(0).GetAlternativeName() == "longer name"
assert lyr_defn.GetFieldDefn(1).GetAlternativeName() == ""
try:
shutil.rmtree("tmp/alias.gdb")
except OSError:
pass
###############################################################################
# Test field alias with ampersand character. Requires OpenFileGDB to be read back
@pytest.mark.require_driver("OpenFileGDB")
def test_ogr_fgdb_alias_with_ampersand(fgdb_drv, openfilegdb_drv):
try:
shutil.rmtree("tmp/alias.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = fgdb_drv.CreateDataSource("tmp/alias.gdb")
lyr = ds.CreateLayer("test", srs=srs, geom_type=ogr.wkbPoint)
fld_defn = ogr.FieldDefn("short_name", ogr.OFTInteger)
fld_defn.SetAlternativeName("longer & name")
lyr.CreateField(fld_defn)
fld_defn = ogr.FieldDefn("regular_name", ogr.OFTInteger)
lyr.CreateField(fld_defn)
ds = None
openfilegdb_drv.Register()
ds = fgdb_drv.Open("tmp/alias.gdb")
openfilegdb_drv.Deregister()
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert lyr_defn.GetFieldDefn(0).GetAlternativeName() == "longer & name"
assert lyr_defn.GetFieldDefn(1).GetAlternativeName() == ""
try:
shutil.rmtree("tmp/alias.gdb")
except OSError:
pass
###############################################################################
# Test reading field domains
def _check_domains(ds):
assert set(ds.GetFieldDomainNames()) == {
"MedianType",
"RoadSurfaceType",
"SpeedLimit",
}
with gdaltest.error_handler():
assert ds.GetFieldDomain("i_dont_exist") is None
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
fld_defn = lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("MaxSpeed"))
assert fld_defn.GetDomainName() == "SpeedLimit"
domain = ds.GetFieldDomain("SpeedLimit")
assert domain is not None
assert domain.GetName() == "SpeedLimit"
assert domain.GetDescription() == "The maximun speed of the road"
assert domain.GetDomainType() == ogr.OFDT_RANGE
assert domain.GetFieldType() == fld_defn.GetType()
assert domain.GetFieldSubType() == fld_defn.GetSubType()
assert domain.GetMinAsDouble() == 40.0
assert domain.GetMaxAsDouble() == 100.0
fld_defn = lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("MedianType"))
assert fld_defn.GetDomainName() == "MedianType"
domain = ds.GetFieldDomain("MedianType")
assert domain is not None
assert domain.GetName() == "MedianType"
assert domain.GetDescription() == "Road median types."
assert domain.GetDomainType() == ogr.OFDT_CODED
assert domain.GetFieldType() == fld_defn.GetType()
assert domain.GetFieldSubType() == fld_defn.GetSubType()
assert domain.GetEnumeration() == {"0": "None", "1": "Cement"}
def test_ogr_fgdb_read_domains():
ds = gdal.OpenEx("data/filegdb/Domains.gdb", gdal.OF_VECTOR)
_check_domains(ds)
###############################################################################
# Test writing field domains
def test_ogr_fgdb_write_domains(fgdb_drv):
out_dir = "tmp/test_ogr_fgdb_write_domains.gdb"
try:
shutil.rmtree(out_dir)
except OSError:
pass
ds = gdal.VectorTranslate(out_dir, "data/filegdb/Domains.gdb", options="-f FileGDB")
_check_domains(ds)
assert ds.TestCapability(ogr.ODsCAddFieldDomain) == 1
assert ds.TestCapability(ogr.ODsCDeleteFieldDomain) == 1
assert ds.TestCapability(ogr.ODsCUpdateFieldDomain) == 1
with pytest.raises(Exception):
with gdal.ExceptionMgr():
ds.DeleteFieldDomain("not_existing")
domain = ogr.CreateCodedFieldDomain(
"unused_domain", "desc", ogr.OFTInteger, ogr.OFSTNone, {1: "one", "2": None}
)
assert ds.AddFieldDomain(domain)
assert ds.DeleteFieldDomain("unused_domain")
domain = ds.GetFieldDomain("unused_domain")
assert domain is None
domain = ogr.CreateRangeFieldDomain(
"SpeedLimit", "desc", ogr.OFTInteger, ogr.OFSTNone, 1, True, 2, True
)
assert ds.UpdateFieldDomain(domain)
ds = None
ds = gdal.OpenEx(out_dir, allowed_drivers=["FileGDB"])
domain = ds.GetFieldDomain("SpeedLimit")
assert domain.GetDescription() == "desc"
ds = None
try:
shutil.rmtree(out_dir)
except OSError:
pass
###############################################################################
# Test reading layer hierarchy
@gdaltest.disable_exceptions()
def test_ogr_fgdb_read_layer_hierarchy():
if False:
# Test dataset produced with:
from osgeo import ogr, osr
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = ogr.GetDriverByName("FileGDB").CreateDataSource("featuredataset.gdb")
ds.CreateLayer(
"fd1_lyr1", srs=srs, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd1"]
)
ds.CreateLayer(
"fd1_lyr2", srs=srs, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd1"]
)
srs2 = osr.SpatialReference()
srs2.ImportFromEPSG(32631)
ds.CreateLayer("standalone", srs=srs2, geom_type=ogr.wkbPoint)
srs3 = osr.SpatialReference()
srs3.ImportFromEPSG(32632)
ds.CreateLayer(
"fd2_lyr", srs=srs3, geom_type=ogr.wkbPoint, options=["FEATURE_DATASET=fd2"]
)
ds = gdal.OpenEx("data/filegdb/featuredataset.gdb")
rg = ds.GetRootGroup()
assert rg.GetGroupNames() == ["fd1", "fd2"]
assert rg.OpenGroup("not_existing") is None
fd1 = rg.OpenGroup("fd1")
assert fd1 is not None
assert fd1.GetVectorLayerNames() == ["fd1_lyr1", "fd1_lyr2"]
assert fd1.OpenVectorLayer("not_existing") is None
assert fd1.GetGroupNames() is None
fd1_lyr1 = fd1.OpenVectorLayer("fd1_lyr1")
assert fd1_lyr1 is not None
assert fd1_lyr1.GetName() == "fd1_lyr1"
fd1_lyr2 = fd1.OpenVectorLayer("fd1_lyr2")
assert fd1_lyr2 is not None
assert fd1_lyr2.GetName() == "fd1_lyr2"
fd2 = rg.OpenGroup("fd2")
assert fd2 is not None
assert fd2.GetVectorLayerNames() == ["fd2_lyr"]
fd2_lyr = fd2.OpenVectorLayer("fd2_lyr")
assert fd2_lyr is not None
assert rg.GetVectorLayerNames() == ["standalone"]
standalone = rg.OpenVectorLayer("standalone")
assert standalone is not None
###############################################################################
# Test renaming a layer
@pytest.mark.parametrize("options", [[], ["FEATURE_DATASET=fd1"]])
def test_ogr_fgdb_rename_layer(fgdb_drv, options):
try:
shutil.rmtree("tmp/rename.gdb")
except OSError:
pass
srs4326 = osr.SpatialReference()
srs4326.ImportFromEPSG(4326)
ds = fgdb_drv.CreateDataSource("tmp/rename.gdb")
ds.CreateLayer("other_layer", geom_type=ogr.wkbNone)
lyr = ds.CreateLayer("foo", geom_type=ogr.wkbPoint, srs=srs4326, options=options)
assert lyr.TestCapability(ogr.OLCRename) == 1
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT (1 2)"))
lyr.CreateFeature(f)
assert lyr.Rename("bar") == ogr.OGRERR_NONE
assert lyr.GetDescription() == "bar"
assert lyr.GetLayerDefn().GetName() == "bar"
with gdaltest.error_handler():
assert lyr.Rename("bar") != ogr.OGRERR_NONE
with gdaltest.error_handler():
assert lyr.Rename("other_layer") != ogr.OGRERR_NONE
# Second renaming
assert lyr.Rename("baz") == ogr.OGRERR_NONE
assert lyr.GetDescription() == "baz"
assert lyr.GetLayerDefn().GetName() == "baz"
lyr.ResetReading()
f = lyr.GetNextFeature()
assert f.GetGeometryRef() is not None
ds = None
ds = ogr.Open("tmp/rename.gdb")
lyr = ds.GetLayerByName("baz")
assert lyr is not None, [
ds.GetLayer(i).GetName() for i in range(ds.GetLayerCount())
]
lyr.ResetReading()
f = lyr.GetNextFeature()
assert f.GetGeometryRef() is not None
ds = None
try:
shutil.rmtree("tmp/rename.gdb")
except OSError:
pass
###############################################################################
# Test that non-spatial tables which are not present in GDB_Items are listed
# see https://github.com/OSGeo/gdal/issues/4463
@pytest.mark.require_driver("OpenFileGDB")
def test_ogr_filegdb_non_spatial_table_outside_gdb_items(openfilegdb_drv, fgdb_drv):
openfilegdb_drv.Deregister()
fgdb_drv.Deregister()
# Force FileGDB first
fgdb_drv.Register()
openfilegdb_drv.Register()
ds = ogr.Open("data/filegdb/table_outside_gdbitems.gdb")
assert ds is not None
assert ds.GetDriver().GetName() == "FileGDB"
assert ds.GetLayerCount() == 3, "did not get expected layer count"
layer_names = set(ds.GetLayer(i).GetName() for i in range(ds.GetLayerCount()))
assert layer_names == {"aquaduct", "flat_table1", "flat_table2"}
###############################################################################
# Test reading .gdb where the CRS in the XML definition of the feature
# table is not consistent with the one of the feature dataset
def test_ogr_filegdb_inconsistent_crs_feature_dataset_and_feature_table():
ds = ogr.Open("data/filegdb/inconsistent_crs_feature_dataset_and_feature_table.gdb")
assert ds is not None
lyr = ds.GetLayer(0)
srs = lyr.GetSpatialRef()
assert srs is not None
assert srs.GetAuthorityCode(None) == "4326"
###############################################################################
# Test reading .gdb with LengthFieldName / AreaFieldName
def test_ogr_filegdb_shape_length_shape_area_as_default_in_field_defn(fgdb_drv):
ds = ogr.Open("data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb")
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Area")).GetDefault()
== "FILEGEODATABASE_SHAPE_AREA"
)
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Length")).GetDefault()
== "FILEGEODATABASE_SHAPE_LENGTH"
)
###############################################################################
# Test explicit CREATE_SHAPE_AREA_AND_LENGTH_FIELDS=YES option
def test_ogr_filegdb_CREATE_SHAPE_AREA_AND_LENGTH_FIELDS_explicit(fgdb_drv):
dirname = "tmp/test_ogr_filegdb_CREATE_SHAPE_AREA_AND_LENGTH_FIELDS_explicit.gdb"
ds = fgdb_drv.CreateDataSource(dirname)
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
lyr = ds.CreateLayer(
"line",
srs=srs,
geom_type=ogr.wkbLineString,
options=["CREATE_SHAPE_AREA_AND_LENGTH_FIELDS=YES"],
)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("LINESTRING(0 0,2 0)"))
lyr.CreateFeature(f)
lyr = ds.CreateLayer(
"area",
srs=srs,
geom_type=ogr.wkbPolygon,
options=["CREATE_SHAPE_AREA_AND_LENGTH_FIELDS=YES"],
)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(
ogr.CreateGeometryFromWkt(
"POLYGON((0 0,0 1,1 1,1 0,0 0),(0.2 0.2,0.2 0.8,0.8 0.8,0.8 0.2,0.2 0.2))"
)
)
lyr.CreateFeature(f)
ds = None
ds = ogr.Open(dirname)
lyr = ds.GetLayerByName("line")
f = lyr.GetNextFeature()
lyr_defn = lyr.GetLayerDefn()
assert lyr_defn.GetFieldIndex("Shape_Length") >= 0
assert lyr_defn.GetFieldIndex("Shape_Area") < 0
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Length")).GetDefault()
== "FILEGEODATABASE_SHAPE_LENGTH"
)
assert f["Shape_Length"] == 2
lyr = ds.GetLayerByName("area")
f = lyr.GetNextFeature()
lyr_defn = lyr.GetLayerDefn()
assert lyr_defn.GetFieldIndex("Shape_Length") >= 0
assert lyr_defn.GetFieldIndex("Shape_Area") >= 0
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Area")).GetDefault()
== "FILEGEODATABASE_SHAPE_AREA"
)
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Length")).GetDefault()
== "FILEGEODATABASE_SHAPE_LENGTH"
)
assert f["Shape_Length"] == pytest.approx(6.4)
assert f["Shape_Area"] == pytest.approx(0.64)
ds = None
try:
shutil.rmtree(dirname)
except OSError:
pass
###############################################################################
# Test explicit CREATE_SHAPE_AREA_AND_LENGTH_FIELDS=YES option
def test_ogr_filegdb_CREATE_SHAPE_AREA_AND_LENGTH_FIELDS_implicit(fgdb_drv):
dirname = "tmp/test_ogr_filegdb_CREATE_SHAPE_AREA_AND_LENGTH_FIELDS_implicit.gdb"
gdal.VectorTranslate(
dirname,
"data/filegdb/filegdb_polygonzm_m_not_closing_with_curves.gdb",
options="-f FileGDB -unsetfid -fid 1",
)
ds = ogr.Open(dirname)
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Area")).GetDefault()
== "FILEGEODATABASE_SHAPE_AREA"
)
assert (
lyr_defn.GetFieldDefn(lyr_defn.GetFieldIndex("Shape_Length")).GetDefault()
== "FILEGEODATABASE_SHAPE_LENGTH"
)
ds = None
try:
shutil.rmtree(dirname)
except OSError:
pass
@pytest.mark.require_driver("OpenFileGDB")
def test_ogr_filegdb_read_relationships(openfilegdb_drv, fgdb_drv):
openfilegdb_drv.Deregister()
fgdb_drv.Deregister()
# Force FileGDB first
fgdb_drv.Register()
openfilegdb_drv.Register()
# no relationships
ds = gdal.OpenEx("data/filegdb/Domains.gdb", gdal.OF_VECTOR)
assert ds.GetRelationshipNames() is None
# has relationships
ds = gdal.OpenEx("data/filegdb/relationships.gdb", gdal.OF_VECTOR)
assert ds.GetDriver().GetDescription() == "FileGDB"
assert set(ds.GetRelationshipNames()) == {
"composite_many_to_many",
"composite_one_to_many",
"composite_one_to_one",
"simple_attributed",
"simple_backward_message_direction",
"simple_both_message_direction",
"simple_forward_message_direction",
"simple_many_to_many",
"simple_one_to_many",
"simple_relationship_one_to_one",
"points__ATTACHREL",
}
assert ds.GetRelationship("xxxx") is None
rel = ds.GetRelationship("simple_relationship_one_to_one")
assert rel is not None
assert rel.GetName() == "simple_relationship_one_to_one"
assert rel.GetLeftTableName() == "table1"
assert rel.GetRightTableName() == "table2"
assert rel.GetMappingTableName() == ""
assert rel.GetCardinality() == gdal.GRC_ONE_TO_ONE
assert rel.GetType() == gdal.GRT_ASSOCIATION
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetLeftMappingTableFields() is None
assert rel.GetRightMappingTableFields() is None
assert rel.GetForwardPathLabel() == "my forward path label"
assert rel.GetBackwardPathLabel() == "my backward path label"
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("simple_one_to_many")
assert rel is not None
assert rel.GetName() == "simple_one_to_many"
assert rel.GetLeftTableName() == "table1"
assert rel.GetRightTableName() == "table2"
assert rel.GetMappingTableName() == ""
assert rel.GetCardinality() == gdal.GRC_ONE_TO_MANY
assert rel.GetType() == gdal.GRT_ASSOCIATION
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("simple_many_to_many")
assert rel is not None
assert rel.GetName() == "simple_many_to_many"
assert rel.GetLeftTableName() == "table1"
assert rel.GetRightTableName() == "table2"
assert rel.GetMappingTableName() == "simple_many_to_many"
assert rel.GetCardinality() == gdal.GRC_MANY_TO_MANY
assert rel.GetType() == gdal.GRT_ASSOCIATION
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetLeftMappingTableFields() == ["origin_foreign_key"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetRightMappingTableFields() == ["destination_foreign_key"]
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("composite_one_to_one")
assert rel is not None
assert rel.GetName() == "composite_one_to_one"
assert rel.GetLeftTableName() == "table1"
assert rel.GetRightTableName() == "table3"
assert rel.GetMappingTableName() == ""
assert rel.GetCardinality() == gdal.GRC_ONE_TO_ONE
assert rel.GetType() == gdal.GRT_COMPOSITE
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("composite_one_to_many")
assert rel is not None
assert rel.GetName() == "composite_one_to_many"
assert rel.GetLeftTableName() == "table5"
assert rel.GetRightTableName() == "table4"
assert rel.GetMappingTableName() == ""
assert rel.GetCardinality() == gdal.GRC_ONE_TO_MANY
assert rel.GetType() == gdal.GRT_COMPOSITE
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("composite_many_to_many")
assert rel is not None
assert rel.GetName() == "composite_many_to_many"
assert rel.GetLeftTableName() == "table6"
assert rel.GetRightTableName() == "table7"
assert rel.GetMappingTableName() == "composite_many_to_many"
assert rel.GetCardinality() == gdal.GRC_MANY_TO_MANY
assert rel.GetType() == gdal.GRT_COMPOSITE
assert rel.GetLeftTableFields() == ["pk"]
assert rel.GetLeftMappingTableFields() == ["origin_foreign_key"]
assert rel.GetRightTableFields() == ["parent_pk"]
assert rel.GetRightMappingTableFields() == ["dest_foreign_key"]
assert rel.GetRelatedTableType() == "feature"
rel = ds.GetRelationship("points__ATTACHREL")
assert rel is not None
assert rel.GetName() == "points__ATTACHREL"
assert rel.GetLeftTableName() == "points"
assert rel.GetRightTableName() == "points__ATTACH"
assert rel.GetMappingTableName() == ""
assert rel.GetCardinality() == gdal.GRC_ONE_TO_MANY
assert rel.GetType() == gdal.GRT_COMPOSITE
assert rel.GetLeftTableFields() == ["OBJECTID"]
assert rel.GetRightTableFields() == ["REL_OBJECTID"]
assert rel.GetForwardPathLabel() == "attachment"
assert rel.GetBackwardPathLabel() == "object"
assert rel.GetRelatedTableType() == "media"
###############################################################################
# Test inserting geometries of type incompatible with the layer geometry type
@pytest.mark.parametrize(
"layer_geom_type,wkt",
[
(ogr.wkbLineString, "POLYGON((0 0,0 1,1 1,0 0))"),
(ogr.wkbPolygon, "LINESTRING(0 0,1 1)"),
(ogr.wkbPoint, "MULTIPOINT((0 0))"),
(ogr.wkbMultiPoint, "POINT(0 0)"),
],
)
def test_ogr_filegdb_incompatible_geometry_types(fgdb_drv, layer_geom_type, wkt):
dirname = "tmp/test_ogr_filegdb_incompatible_geometry_types.gdb"
try:
shutil.rmtree(dirname)
except OSError:
pass
ds = fgdb_drv.CreateDataSource(dirname)
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
lyr = ds.CreateLayer("test", srs=srs, geom_type=layer_geom_type)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(wkt))
with gdaltest.error_handler():
assert lyr.CreateFeature(f) == ogr.OGRERR_FAILURE
ds = None
try:
shutil.rmtree(dirname)
except OSError:
pass
###############################################################################
# Test reading an empty polygon
def test_ogr_filegdb_read_empty_polygon():
# Dataset generated by OpenFileGDB driver
ds = ogr.Open("data/filegdb/empty_polygon.gdb")
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
assert f.GetGeometryRef().IsEmpty()