584 строки
16 KiB
Python
Исполняемый файл
584 строки
16 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: Test read functionality for OGR VDV driver.
|
|
# Author: Even Rouault <even.rouault at spatialys.com>
|
|
#
|
|
###############################################################################
|
|
# Copyright (c) 2015, Even Rouault <even.rouault at spatialys.com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
|
|
import gdaltest
|
|
import ogrtest
|
|
import pytest
|
|
|
|
from osgeo import gdal, ogr
|
|
|
|
pytestmark = pytest.mark.require_driver("VDV")
|
|
|
|
###############################################################################
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def module_disable_exceptions():
|
|
with gdaltest.disable_exceptions():
|
|
yield
|
|
|
|
|
|
###############################################################################
|
|
# Basic test of .idf file
|
|
|
|
|
|
def test_ogr_idf_1():
|
|
|
|
ds = ogr.Open("data/vdv/test.idf")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["NODE_ID"] != 1
|
|
or f["foo"] != "U"
|
|
or f.GetGeometryRef().ExportToWkt() != "POINT (2 49)"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(1)
|
|
f = lyr.GetNextFeature()
|
|
if f.GetGeometryRef().ExportToWkt() != "LINESTRING (2 49,2.5 49.5,2.7 49.7,3 50)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(2)
|
|
f = lyr.GetNextFeature()
|
|
if f.GetGeometryRef().ExportToWkt() != "POINT (2.5 49.5)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(3)
|
|
f = lyr.GetNextFeature()
|
|
if f["FOO"] != 1:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
#
|
|
|
|
|
|
@pytest.mark.require_driver("SQLite")
|
|
def test_ogr_idf_1_with_temp_sqlite_db():
|
|
options = {"OGR_IDF_TEMP_DB_THRESHOLD": "0"}
|
|
with gdaltest.config_options(options):
|
|
return test_ogr_idf_1()
|
|
|
|
|
|
###############################################################################
|
|
# Basic test of .idf file
|
|
|
|
|
|
def test_ogr_idf_3d():
|
|
|
|
ds = ogr.Open("data/vdv/test_3d.idf")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["NODE_ID"] != 1
|
|
or f["foo"] != "U"
|
|
or f.GetGeometryRef().ExportToWkt() != "POINT (2 49 10)"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(1)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetGeometryRef().ExportToWkt()
|
|
!= "LINESTRING (2 49 10,2.5 49.5 10,2.7 49.7 20,3 50 20)"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(2)
|
|
f = lyr.GetNextFeature()
|
|
if f.GetGeometryRef().ExportToWkt() != "POINT (2.5 49.5 10)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayer(3)
|
|
f = lyr.GetNextFeature()
|
|
if f["FOO"] != 1:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Run test_ogrsf on .idf
|
|
|
|
|
|
def test_ogr_idf_2():
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_test_ogrsf_path() is None:
|
|
pytest.skip()
|
|
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_test_ogrsf_path() + " -ro data/vdv/test.idf"
|
|
)
|
|
|
|
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
|
|
|
|
|
|
###############################################################################
|
|
# Create a VDV file
|
|
|
|
|
|
def test_ogr_vdv_1(filename="tmp/test.x10", dsco=None, lco=None):
|
|
|
|
dsco = [] if dsco is None else dsco
|
|
lco = [] if lco is None else lco
|
|
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource(filename, options=dsco)
|
|
ds.CreateLayer("empty", options=lco)
|
|
lyr = ds.CreateLayer("lyr_1", options=lco)
|
|
lyr.CreateField(ogr.FieldDefn("str_field", ogr.OFTString))
|
|
lyr.CreateField(ogr.FieldDefn("int_field", ogr.OFTInteger))
|
|
lyr.CreateField(ogr.FieldDefn("int64_field", ogr.OFTInteger64))
|
|
|
|
bool_field = ogr.FieldDefn("bool_field", ogr.OFTInteger)
|
|
bool_field.SetSubType(ogr.OFSTBoolean)
|
|
lyr.CreateField(bool_field)
|
|
|
|
fld = ogr.FieldDefn("str2_field", ogr.OFTString)
|
|
fld.SetWidth(2)
|
|
lyr.CreateField(fld)
|
|
|
|
fld = ogr.FieldDefn("int2_field", ogr.OFTInteger)
|
|
fld.SetWidth(2)
|
|
lyr.CreateField(fld)
|
|
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f.SetField("str_field", 'a"b')
|
|
f.SetField("int_field", 12)
|
|
f.SetField("bool_field", 1)
|
|
lyr.CreateFeature(f)
|
|
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
lyr.CreateFeature(f)
|
|
|
|
lyr = ds.CreateLayer("another_layer", options=lco)
|
|
lyr.CreateField(ogr.FieldDefn("str_field", ogr.OFTString))
|
|
for i in range(5):
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f.SetField("str_field", i)
|
|
lyr.CreateFeature(f)
|
|
|
|
ds = None
|
|
|
|
# Do nothing
|
|
ds = ogr.Open(filename, update=1)
|
|
ds = None
|
|
|
|
ds = ogr.Open(filename, update=1)
|
|
ds.CreateLayer("empty2", options=lco)
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Read it
|
|
|
|
|
|
def test_ogr_vdv_2(src_filename="tmp/test.x10"):
|
|
|
|
out_filename = "/vsimem/vdv/ogr_vdv_2.x10"
|
|
gdal.Unlink(out_filename)
|
|
|
|
src_ds = ogr.Open(src_filename)
|
|
out_ds = ogr.GetDriverByName("VDV").CreateDataSource(out_filename)
|
|
layer_names = [
|
|
src_ds.GetLayer(idx).GetName() for idx in range(src_ds.GetLayerCount())
|
|
]
|
|
layer_names.sort()
|
|
for layer_name in layer_names:
|
|
src_lyr = src_ds.GetLayer(layer_name)
|
|
options = [
|
|
"HEADER_SRC_DATE=01.01.1970",
|
|
"HEADER_SRC_TIME=00.00.00",
|
|
"HEADER_foo=bar",
|
|
]
|
|
dst_lyr = out_ds.CreateLayer(src_lyr.GetName(), options=options)
|
|
for field_idx in range(src_lyr.GetLayerDefn().GetFieldCount()):
|
|
dst_lyr.CreateField(src_lyr.GetLayerDefn().GetFieldDefn(field_idx))
|
|
for src_f in src_lyr:
|
|
dst_f = ogr.Feature(dst_lyr.GetLayerDefn())
|
|
dst_f.SetFrom(src_f)
|
|
dst_lyr.CreateFeature(dst_f)
|
|
out_ds = None
|
|
|
|
expected = """mod; DD.MM.YYYY; HH:MM:SS; free
|
|
src; "UNKNOWN"; "01.01.1970"; "00.00.00"
|
|
chs; "ISO8859-1"
|
|
ver; "1.4"
|
|
ifv; "1.4"
|
|
dve; "1.4"
|
|
fft; ""
|
|
foo; "bar"
|
|
tbl; another_layer
|
|
atr; str_field
|
|
frm; char[80]
|
|
rec; "0"
|
|
rec; "1"
|
|
rec; "2"
|
|
rec; "3"
|
|
rec; "4"
|
|
end; 5
|
|
tbl; lyr_1
|
|
atr; str_field; int_field; int64_field; bool_field; str2_field; int2_field
|
|
frm; char[80]; num[10.0]; num[19.0]; boolean; char[2]; num[1.0]
|
|
rec; "a""b"; 12; NULL; 1; NULL; NULL
|
|
rec; NULL; NULL; NULL; NULL; NULL; NULL
|
|
end; 2
|
|
tbl; empty
|
|
atr;
|
|
frm;
|
|
end; 0
|
|
tbl; empty2
|
|
atr;
|
|
frm;
|
|
end; 0
|
|
eof; 4
|
|
"""
|
|
|
|
f = gdal.VSIFOpenL(out_filename, "rb")
|
|
got = gdal.VSIFReadL(1, 10000, f).decode("latin1")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert got == expected
|
|
|
|
gdal.Unlink(out_filename)
|
|
|
|
|
|
###############################################################################
|
|
# Run test_ogrsf on it
|
|
|
|
|
|
def test_ogr_vdv_3():
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_test_ogrsf_path() is None:
|
|
pytest.skip()
|
|
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/test.x10"
|
|
)
|
|
|
|
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
|
|
|
|
|
|
###############################################################################
|
|
# Create a VDV directory
|
|
|
|
|
|
def test_ogr_vdv_4():
|
|
return test_ogr_vdv_1(
|
|
filename="tmp/test_x10", dsco=["SINGLE_FILE=NO"], lco=["EXTENSION=txt"]
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Read it
|
|
|
|
|
|
def test_ogr_vdv_5():
|
|
return test_ogr_vdv_2(src_filename="tmp/test_x10")
|
|
|
|
|
|
###############################################################################
|
|
# Run test_ogrsf on it
|
|
|
|
|
|
def test_ogr_vdv_6():
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_test_ogrsf_path() is None:
|
|
pytest.skip()
|
|
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/test_x10"
|
|
)
|
|
|
|
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
|
|
|
|
|
|
###############################################################################
|
|
# Run VDV452
|
|
|
|
|
|
def test_ogr_vdv_7():
|
|
|
|
tests = [
|
|
("VDV-452", "STOP", "POINT_LONGITUDE", "POINT_LATITUDE"),
|
|
("VDV-452-ENGLISH", "STOP", "POINT_LONGITUDE", "POINT_LATITUDE"),
|
|
("VDV-452", "REC_ORT", "ORT_POS_LAENGE", "ORT_POS_BREITE"),
|
|
("VDV-452-GERMAN", "REC_ORT", "ORT_POS_LAENGE", "ORT_POS_BREITE"),
|
|
]
|
|
|
|
out_filename = "/vsimem/vdv/ogr_vdv_7.x10"
|
|
|
|
for (profile, lyrname, longname, latname) in tests:
|
|
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource(out_filename)
|
|
lyr = ds.CreateLayer(
|
|
lyrname, geom_type=ogr.wkbPoint, options=["PROFILE=" + profile]
|
|
)
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
lng = -(123 + 45.0 / 60 + 56.789 / 3600)
|
|
lat = -(23 + 45.0 / 60 + 56.789 / 3600)
|
|
f.SetGeometry(ogr.CreateGeometryFromWkt("POINT(%.10f %.10f)" % (lng, lat)))
|
|
lyr.CreateFeature(f)
|
|
ds = None
|
|
|
|
ds = ogr.Open(out_filename)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f[longname] != -1234556789
|
|
or f[latname] != -234556789
|
|
or ogrtest.check_feature_geometry(
|
|
f, "POINT (-123.765774722222 -23.7657747222222)"
|
|
)
|
|
!= 0
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
gdal.Unlink("/vsimem/vdv/ogr_vdv_7.x10")
|
|
|
|
tests = [("VDV-452", True), ("VDV-452-ENGLISH", False), ("VDV-452-GERMAN", False)]
|
|
|
|
for (profile, strict) in tests:
|
|
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource(out_filename)
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
lyr = ds.CreateLayer(
|
|
"UNKNOWN",
|
|
options=["PROFILE=" + profile, "PROFILE_STRICT=" + str(strict)],
|
|
)
|
|
assert gdal.GetLastErrorMsg() != ""
|
|
if strict and lyr is not None:
|
|
pytest.fail()
|
|
elif not strict and lyr is None:
|
|
pytest.fail()
|
|
|
|
if profile == "VDV-452-GERMAN":
|
|
lyr_name = "REC_ORT"
|
|
else:
|
|
lyr_name = "STOP"
|
|
lyr = ds.CreateLayer(
|
|
lyr_name, options=["PROFILE=" + profile, "PROFILE_STRICT=" + str(strict)]
|
|
)
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
ret = lyr.CreateField(ogr.FieldDefn("UNKNOWN"))
|
|
assert gdal.GetLastErrorMsg() != ""
|
|
if strict and ret == 0:
|
|
pytest.fail()
|
|
elif not strict and ret != 0:
|
|
pytest.fail()
|
|
|
|
ds = None
|
|
|
|
gdal.Unlink("/vsimem/vdv/ogr_vdv_7.x10")
|
|
|
|
|
|
###############################################################################
|
|
# Test a few error cases
|
|
|
|
|
|
def test_ogr_vdv_8():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource("/does/not_exist")
|
|
assert ds is None
|
|
|
|
with gdaltest.error_handler():
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource(
|
|
"/does/not_exist", options=["SINGLE_FILE=FALSE"]
|
|
)
|
|
assert ds is None
|
|
|
|
# Add layer in non writable directory
|
|
if sys.platform.startswith("linux"):
|
|
os.mkdir("tmp/ogr_vdv_8")
|
|
open("tmp/ogr_vdv_8/empty.x10", "wb").write(
|
|
"tbl; foo\natr;\nfrm;\n".encode("latin1")
|
|
)
|
|
# 0555 = 365
|
|
os.chmod("tmp/ogr_vdv_8", 365)
|
|
try:
|
|
open("tmp/ogr_vdv_8/another_file", "wb").close()
|
|
shutil.rmtree("tmp/ogr_vdv_8")
|
|
do_test = False
|
|
except Exception:
|
|
do_test = True
|
|
if do_test:
|
|
ds = ogr.Open("tmp/ogr_vdv_8", update=1)
|
|
with gdaltest.error_handler():
|
|
lyr = ds.CreateLayer("another_layer")
|
|
# 0755 = 493
|
|
os.chmod("tmp/ogr_vdv_8", 493)
|
|
ds = None
|
|
shutil.rmtree("tmp/ogr_vdv_8")
|
|
assert lyr is None
|
|
|
|
out_filename = "/vsimem/vdv/ogr_vdv_8.x10"
|
|
ds = ogr.GetDriverByName("VDV").CreateDataSource(out_filename)
|
|
|
|
# File already exists
|
|
with gdaltest.error_handler():
|
|
ds2 = ogr.GetDriverByName("VDV").CreateDataSource(out_filename)
|
|
assert ds2 is None
|
|
|
|
assert ds.TestCapability(ogr.ODsCCreateLayer) == 1
|
|
|
|
lyr1 = ds.CreateLayer("lyr1")
|
|
assert lyr1.TestCapability(ogr.OLCSequentialWrite) == 1
|
|
assert lyr1.TestCapability(ogr.OLCCreateField) == 1
|
|
|
|
lyr1.ResetReading()
|
|
|
|
with gdaltest.error_handler():
|
|
lyr1.GetNextFeature()
|
|
|
|
lyr1.CreateFeature(ogr.Feature(lyr1.GetLayerDefn()))
|
|
|
|
# Layer structure is now frozen
|
|
assert lyr1.TestCapability(ogr.OLCCreateField) == 0
|
|
|
|
with gdaltest.error_handler():
|
|
ret = lyr1.CreateField(ogr.FieldDefn("not_allowed"))
|
|
assert ret != 0
|
|
|
|
lyr2 = ds.CreateLayer("lyr2")
|
|
lyr2.CreateFeature(ogr.Feature(lyr2.GetLayerDefn()))
|
|
|
|
# Test interleaved writing
|
|
|
|
assert lyr1.TestCapability(ogr.OLCSequentialWrite) == 0
|
|
|
|
with gdaltest.error_handler():
|
|
ret = lyr1.CreateFeature(ogr.Feature(lyr1.GetLayerDefn()))
|
|
assert ret != 0
|
|
|
|
assert lyr1.GetFeatureCount() == 1
|
|
|
|
ds = None
|
|
|
|
# Test appending new layer to file without eof
|
|
gdal.FileFromMemBuffer(
|
|
out_filename, 'tbl; foo\natr; atr\nfrm; char[40]\nrec; "foo"\n'
|
|
)
|
|
ds = ogr.Open(out_filename, update=1)
|
|
lyr = ds.CreateLayer("new_layer")
|
|
lyr.CreateField(ogr.FieldDefn("atr"))
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f["atr"] = "bar"
|
|
lyr.CreateFeature(f)
|
|
f = None
|
|
ds = None
|
|
|
|
expected = """tbl; foo
|
|
atr; atr
|
|
frm; char[40]
|
|
rec; "foo"
|
|
tbl; new_layer
|
|
atr; atr
|
|
frm; char[80]
|
|
rec; "bar"
|
|
end; 1
|
|
eof; 2
|
|
"""
|
|
|
|
f = gdal.VSIFOpenL(out_filename, "rb")
|
|
got = gdal.VSIFReadL(1, 10000, f).decode("latin1")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert got == expected
|
|
|
|
# Test we are robust against missing end;
|
|
ds = ogr.Open(out_filename)
|
|
for i in range(2):
|
|
lyr = ds.GetLayer(i)
|
|
assert lyr.GetFeatureCount() == 1
|
|
lyr.ResetReading()
|
|
fc = 0
|
|
for f in lyr:
|
|
fc += 1
|
|
assert fc == 1
|
|
lyr = None
|
|
ds = None
|
|
|
|
# Test appending new layer to file without terminating \n
|
|
gdal.FileFromMemBuffer(
|
|
out_filename, 'tbl; foo\natr; atr\nfrm; char[40]\nrec; "foo"\neof; 1'
|
|
)
|
|
ds = ogr.Open(out_filename, update=1)
|
|
lyr = ds.CreateLayer("new_layer")
|
|
lyr.CreateField(ogr.FieldDefn("atr"))
|
|
f = ogr.Feature(lyr.GetLayerDefn())
|
|
f["atr"] = "bar"
|
|
lyr.CreateFeature(f)
|
|
f = None
|
|
ds = None
|
|
|
|
f = gdal.VSIFOpenL(out_filename, "rb")
|
|
got = gdal.VSIFReadL(1, 10000, f).decode("latin1")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert got == expected
|
|
|
|
gdal.Unlink(out_filename)
|
|
|
|
|
|
###############################################################################
|
|
# Cleanup
|
|
|
|
|
|
def test_ogr_vdv_cleanup():
|
|
|
|
gdal.Unlink("tmp/test.x10")
|
|
gdal.Unlink("/vsimem/vdv/ogr_vdv_2.x10")
|
|
gdal.Unlink("/vsimem/vdv/ogr_vdv_7.x10")
|
|
gdal.Unlink("/vsimem/vdv/ogr_vdv_8.x10")
|
|
files = gdal.ReadDir("tmp/test_x10")
|
|
if files is not None:
|
|
for f in files:
|
|
gdal.Unlink("tmp/test_x10/" + f)
|
|
gdal.Rmdir("tmp/test_x10")
|