217 строки
7.5 KiB
Python
Исполняемый файл
217 строки
7.5 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
# -*- coding: utf-8 -*-
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: FGDB driver stress testing of CreateFeature() with user set FID
|
|
# Author: Even Rouault <even dot rouault at spatialys dot com>
|
|
#
|
|
###############################################################################
|
|
# Copyright (c) 2015, Even Rouault <even dot rouault at spatialys dot com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
|
|
import random
|
|
import shutil
|
|
|
|
import gdaltest
|
|
import ogrtest
|
|
import pytest
|
|
|
|
from osgeo import gdal, ogr, osr
|
|
|
|
###############################################################################
|
|
# Test if driver is available
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_fgdb_stress_init():
|
|
|
|
ogrtest.fgdb_drv = None
|
|
ogrtest.openfilegdb_drv = None
|
|
|
|
ogrtest.fgdb_drv = ogr.GetDriverByName("FileGDB")
|
|
ogrtest.reference_drv = ogr.GetDriverByName("GPKG")
|
|
ogrtest.reference_ext = "gpkg"
|
|
ogrtest.openfilegdb_drv = ogr.GetDriverByName("OpenFileGDB")
|
|
|
|
if ogrtest.fgdb_drv is None:
|
|
pytest.skip()
|
|
if ogrtest.reference_drv is None:
|
|
pytest.skip()
|
|
if ogrtest.openfilegdb_drv is None:
|
|
pytest.skip()
|
|
|
|
try:
|
|
shutil.rmtree("tmp/test.gdb")
|
|
except OSError:
|
|
pass
|
|
|
|
gdal.Unlink("tmp/test." + ogrtest.reference_ext)
|
|
|
|
|
|
###############################################################################
|
|
# Generate databases from random operations
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_fgdb_stress_1():
|
|
if ogrtest.fgdb_drv is None:
|
|
pytest.skip()
|
|
|
|
verbose = False
|
|
|
|
ds_test = ogrtest.fgdb_drv.CreateDataSource("tmp/test.gdb")
|
|
ds_ref = ogrtest.reference_drv.CreateDataSource("tmp/test." + ogrtest.reference_ext)
|
|
sr = osr.SpatialReference()
|
|
sr.ImportFromEPSG(4326)
|
|
lyr_test = ds_test.CreateLayer("test", geom_type=ogr.wkbPoint, srs=sr)
|
|
lyr_ref = ds_ref.CreateLayer("test", geom_type=ogr.wkbPoint, srs=sr)
|
|
for lyr in [lyr_test, lyr_ref]:
|
|
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
|
|
ds_test.ExecuteSQL("CREATE INDEX idx_test_str ON test(str)")
|
|
ds_ref.ExecuteSQL("CREATE INDEX idx_test_str ON test(str)")
|
|
random.seed(0)
|
|
in_transaction = False
|
|
nfeatures_created = 0
|
|
for _ in range(100000):
|
|
function = random.randrange(0, 500)
|
|
if function == 0:
|
|
if not in_transaction:
|
|
if verbose:
|
|
print("StartTransaction")
|
|
ds_test.StartTransaction(force=1)
|
|
else:
|
|
if verbose:
|
|
print("CommitTransaction")
|
|
ds_test.CommitTransaction()
|
|
in_transaction = not in_transaction
|
|
elif function < 500 / 3:
|
|
ret = []
|
|
fid = -1
|
|
if random.randrange(0, 2) == 0:
|
|
fid = 1 + random.randrange(0, 1000)
|
|
wkt = "POINT (%d %d)" % (random.randrange(0, 100), random.randrange(0, 100))
|
|
if verbose:
|
|
print("Create(%d)" % fid)
|
|
for lyr in [lyr_test, lyr_ref]:
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f.SetFID(fid)
|
|
f.SetField(0, "%d" % random.randrange(0, 1000))
|
|
f.SetGeometry(ogr.CreateGeometryFromWkt(wkt))
|
|
with gdaltest.error_handler():
|
|
ret.append(lyr.CreateFeature(f))
|
|
# So to ensure lyr_ref will use the same FID as the tested layer
|
|
fid = f.GetFID()
|
|
# print("created %d" % fid)
|
|
assert ret[0] == ret[1]
|
|
if ret[0] == 0:
|
|
nfeatures_created += 1
|
|
# For some odd reason, the .spx file is no longer updated when doing
|
|
# a SetFeature() before having creating at least 2 features !
|
|
elif function < 500 * 2 / 3 and nfeatures_created >= 2:
|
|
ret = []
|
|
fid = 1 + random.randrange(0, 1000)
|
|
if verbose:
|
|
print("Update(%d)" % fid)
|
|
wkt = "POINT (%d %d)" % (random.randrange(0, 100), random.randrange(0, 100))
|
|
for lyr in [lyr_test, lyr_ref]:
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f.SetFID(fid)
|
|
f.SetField(0, "%d" % random.randrange(0, 1000))
|
|
f.SetGeometry(ogr.CreateGeometryFromWkt(wkt))
|
|
# gdal.PushErrorHandler()
|
|
ret.append(lyr.SetFeature(f))
|
|
# gdal.PopErrorHandler()
|
|
assert ret[0] == ret[1]
|
|
# Same for DeleteFeature()
|
|
elif nfeatures_created >= 2:
|
|
ret = []
|
|
fid = 1 + random.randrange(0, 1000)
|
|
if verbose:
|
|
print("Delete(%d)" % fid)
|
|
for lyr in [lyr_test, lyr_ref]:
|
|
# gdal.PushErrorHandler()
|
|
ret.append(lyr.DeleteFeature(fid))
|
|
# gdal.PopErrorHandler()
|
|
assert ret[0] == ret[1]
|
|
|
|
if in_transaction:
|
|
ds_test.CommitTransaction()
|
|
|
|
|
|
###############################################################################
|
|
# Compare databases
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_fgdb_stress_2():
|
|
if ogrtest.fgdb_drv is None:
|
|
pytest.skip()
|
|
|
|
ds_test = ogr.Open("tmp/test.gdb")
|
|
ds_ref = ogr.Open("tmp/test." + ogrtest.reference_ext)
|
|
|
|
lyr_test = ds_test.GetLayer(0)
|
|
lyr_ref = ds_ref.GetLayer(0)
|
|
|
|
while True:
|
|
f_test = lyr_test.GetNextFeature()
|
|
f_ref = lyr_ref.GetNextFeature()
|
|
assert not (f_test is None and f_ref is not None) or (
|
|
f_test is not None and f_ref is None
|
|
)
|
|
if f_test is None:
|
|
break
|
|
if (
|
|
f_test.GetFID() != f_ref.GetFID()
|
|
or f_test["str"] != f_ref["str"]
|
|
or ogrtest.check_feature_geometry(f_test, f_ref.GetGeometryRef()) != 0
|
|
):
|
|
f_test.DumpReadable()
|
|
f_ref.DumpReadable()
|
|
pytest.fail()
|
|
|
|
for val in range(1000):
|
|
lyr_test.SetAttributeFilter("str = '%d'" % val)
|
|
lyr_ref.SetAttributeFilter("str = '%d'" % val)
|
|
assert lyr_test.GetFeatureCount() == lyr_ref.GetFeatureCount(), val
|
|
|
|
# sys.exit(0)
|
|
|
|
|
|
###############################################################################
|
|
# Cleanup
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_fgdb_stress_cleanup():
|
|
if ogrtest.fgdb_drv is None:
|
|
pytest.skip()
|
|
|
|
try:
|
|
shutil.rmtree("tmp/test.gdb")
|
|
except OSError:
|
|
pass
|
|
|
|
gdal.Unlink("tmp/test." + ogrtest.reference_ext)
|