3428 строки
113 KiB
Python
Исполняемый файл
3428 строки
113 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
# -*- coding: utf-8 -*-
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: GMLAS driver testing.
|
|
# Author: Even Rouault, <even dot rouault at spatialys dot com>
|
|
#
|
|
# Initial development funded by the European Earth observation programme
|
|
# Copernicus
|
|
#
|
|
# ******************************************************************************
|
|
# Copyright (c) 2016, Even Rouault, <even dot rouault at spatialys dot com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import os
|
|
import os.path
|
|
from http.server import BaseHTTPRequestHandler
|
|
|
|
import gdaltest
|
|
import ogrtest
|
|
import pytest
|
|
import webserver
|
|
|
|
from osgeo import gdal, ogr
|
|
|
|
pytestmark = pytest.mark.require_driver("GMLAS")
|
|
|
|
###############################################################################
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def module_disable_exceptions():
|
|
with gdaltest.disable_exceptions():
|
|
yield
|
|
|
|
|
|
###############################################################################
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def startup_and_cleanup():
|
|
|
|
gdal.SetConfigOption("GMLAS_WARN_UNEXPECTED", "YES")
|
|
|
|
# FileGDB embedded libxml2 cause random crashes with CPLValidateXML() use of external libxml2
|
|
old_val_GDAL_XML_VALIDATION = gdal.GetConfigOption("GDAL_XML_VALIDATION")
|
|
if (
|
|
ogr.GetDriverByName("FileGDB") is not None
|
|
and old_val_GDAL_XML_VALIDATION is None
|
|
):
|
|
gdal.SetConfigOption("GDAL_XML_VALIDATION", "NO")
|
|
|
|
yield
|
|
|
|
files = gdal.ReadDir("/vsimem/")
|
|
if files is not None:
|
|
print("Remaining files: " + str(files))
|
|
|
|
gdal.SetConfigOption("GMLAS_WARN_UNEXPECTED", None)
|
|
gdal.SetConfigOption("GDAL_XML_VALIDATION", old_val_GDAL_XML_VALIDATION)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def compare_ogrinfo_output(gmlfile, reffile, options=""):
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_ogrinfo_path() is None:
|
|
pytest.skip()
|
|
|
|
tmpfilename = "tmp/" + os.path.basename(gmlfile) + ".txt"
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_ogrinfo_path()
|
|
+ " -ro -al GMLAS:"
|
|
+ gmlfile
|
|
+ " -oo EXPOSE_METADATA_LAYERS=YES "
|
|
+ "-oo @KEEP_RELATIVE_PATHS_FOR_METADATA=YES "
|
|
+ "-oo @EXPOSE_SCHEMAS_NAME_IN_METADATA=NO "
|
|
+ "-oo @EXPOSE_CONFIGURATION_IN_METADATA=NO"
|
|
+ " "
|
|
+ options,
|
|
encoding="utf-8",
|
|
)
|
|
ret = ret.replace("\r\n", "\n")
|
|
ret = ret.replace("data\\gmlas\\", "data/gmlas/") # Windows
|
|
ret = ret.replace("data/gmlas\\", "data/gmlas/") # Windows
|
|
expected = open(reffile, "rb").read().decode("utf-8")
|
|
expected = expected.replace("\r\n", "\n")
|
|
if ret != expected:
|
|
print(ret.encode("utf-8"))
|
|
open(tmpfilename, "wb").write(ret.encode("utf-8"))
|
|
print("Diff:")
|
|
os.system("diff -u " + reffile + " " + tmpfilename)
|
|
# os.unlink(tmpfilename)
|
|
pytest.fail("Got:")
|
|
|
|
|
|
###############################################################################
|
|
# Basic test
|
|
|
|
|
|
def test_ogr_gmlas_basic():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_test1.xml")
|
|
assert ds is not None
|
|
ds = None
|
|
|
|
# Skip tests when -fsanitize is used
|
|
if gdaltest.is_travis_branch("sanitize"):
|
|
pytest.skip("Skipping because of -sanitize")
|
|
|
|
return compare_ogrinfo_output(
|
|
"data/gmlas/gmlas_test1.xml", "data/gmlas/gmlas_test1.txt"
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Run test_ogrsf
|
|
|
|
|
|
def test_ogr_gmlas_test_ogrsf():
|
|
|
|
# Skip tests when -fsanitize is used
|
|
if gdaltest.is_travis_branch("sanitize"):
|
|
pytest.skip("Skipping because of -sanitize")
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_test_ogrsf_path() is None:
|
|
pytest.skip()
|
|
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_test_ogrsf_path()
|
|
+ " -ro GMLAS:data/gmlas/gmlas_test1.xml"
|
|
)
|
|
|
|
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
|
|
|
|
|
|
###############################################################################
|
|
# Test virtual file support
|
|
|
|
|
|
def test_ogr_gmlas_virtual_file():
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_8.xml",
|
|
"""<myns:main_elt xmlns:myns="http://myns"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xsi:schemaLocation="http://myns ogr_gmlas_8.xsd"/>""",
|
|
)
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_8.xsd",
|
|
"""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
|
|
xmlns:myns="http://myns"
|
|
targetNamespace="http://myns"
|
|
elementFormDefault="qualified" attributeFormDefault="unqualified">
|
|
<xs:element name="main_elt" type="xs:string"/>
|
|
</xs:schema>""",
|
|
)
|
|
|
|
ds = gdal.OpenEx("GMLAS:/vsimem/ogr_gmlas_8.xml")
|
|
assert ds is not None
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_8.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_8.xsd")
|
|
|
|
|
|
###############################################################################
|
|
# Test opening with XSD option
|
|
|
|
|
|
def test_ogr_gmlas_datafile_with_xsd_option():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=["XSD=data/gmlas/gmlas_test1.xsd"],
|
|
)
|
|
assert ds is not None
|
|
|
|
|
|
###############################################################################
|
|
# Test opening with just XSD option
|
|
|
|
|
|
def test_ogr_gmlas_no_datafile_with_xsd_option():
|
|
|
|
ds = gdal.OpenEx("GMLAS:", open_options=["XSD=data/gmlas/gmlas_test1.xsd"])
|
|
assert ds is not None
|
|
|
|
|
|
###############################################################################
|
|
# Test opening with just XSD option but pointing to a non-xsd filename
|
|
|
|
|
|
def test_ogr_gmlas_no_datafile_xsd_which_is_not_xsd():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:", open_options=["XSD=data/gmlas/gmlas_test1.xml"])
|
|
assert ds is None
|
|
assert gdal.GetLastErrorMsg().find("invalid content in 'schema' element") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test opening with nothing
|
|
|
|
|
|
def test_ogr_gmlas_no_datafile_no_xsd():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:")
|
|
assert ds is None
|
|
assert (
|
|
gdal.GetLastErrorMsg().find(
|
|
"XSD open option must be provided when no XML data file is passed"
|
|
)
|
|
>= 0
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test opening an inexisting GML file
|
|
|
|
|
|
def test_ogr_gmlas_non_existing_gml():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:/vsimem/i_do_not_exist.gml")
|
|
assert ds is None
|
|
assert gdal.GetLastErrorMsg().find("Cannot open /vsimem/i_do_not_exist.gml") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test opening with just XSD option but pointing to a non existing file
|
|
|
|
|
|
def test_ogr_gmlas_non_existing_xsd():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:", open_options=["XSD=/vsimem/i_do_not_exist.xsd"])
|
|
assert ds is None
|
|
assert gdal.GetLastErrorMsg().find("Cannot resolve /vsimem/i_do_not_exist.xsd") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test opening a GML file without schemaLocation
|
|
|
|
|
|
def test_ogr_gmlas_gml_without_schema_location():
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_gml_without_schema_location.xml",
|
|
"""<MYNS:main_elt xmlns:MYNS="http://myns"/>""",
|
|
)
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:/vsimem/ogr_gmlas_gml_without_schema_location.xml")
|
|
assert ds is None
|
|
assert (
|
|
gdal.GetLastErrorMsg().find(
|
|
"No schema locations found when analyzing data file: XSD open option must be provided"
|
|
)
|
|
>= 0
|
|
)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_gml_without_schema_location.xml")
|
|
|
|
|
|
###############################################################################
|
|
# Test invalid schema
|
|
|
|
|
|
def test_ogr_gmlas_invalid_schema():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_invalid_schema.xml")
|
|
assert ds is None
|
|
assert gdal.GetLastErrorMsg().find("invalid content") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test invalid XML
|
|
|
|
|
|
def test_ogr_gmlas_invalid_xml():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_invalid_xml.xml")
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
assert f is None
|
|
assert (
|
|
gdal.GetLastErrorMsg().find("input ended before all started tags were ended")
|
|
>= 0
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test links with gml:ReferenceType
|
|
|
|
|
|
def test_ogr_gmlas_gml_Reference():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_test_targetelement.xml")
|
|
assert ds.GetLayerCount() == 3
|
|
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["reference_existing_target_elt_with_required_id_href"] != "#BAZ"
|
|
or f["reference_existing_target_elt_with_required_id_pkid"] != "BAZ"
|
|
or f["reference_existing_target_elt_with_optional_id_href"] != "#BAZ2"
|
|
or f["refe_exis_targ_elt_with_opti_id_targe_elt_with_optio_id_pkid"]
|
|
!= "F36BAD21BD2F14DDCA8852DBF8C90DBC_target_elt_with_optional_id_1"
|
|
or f["reference_existing_abstract_target_elt_href"] != "#BAW"
|
|
or f.IsFieldSet("reference_existing_abstract_target_elt_nillable_href")
|
|
or f["reference_existing_abstract_target_elt_nillable_nil"] != 1
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test that we fix ambiguities in class names
|
|
|
|
|
|
def test_ogr_gmlas_same_element_in_different_ns():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_same_element_in_different_ns.xml")
|
|
assert ds is not None
|
|
# for i in range(ds.GetLayerCount()):
|
|
# print(ds.GetLayer(i).GetName())
|
|
|
|
assert ds.GetLayerCount() == 5
|
|
lyr = ds.GetLayerByName("elt")
|
|
f = lyr.GetNextFeature()
|
|
if f.IsFieldSet("abstractElt_other_ns_realizationOfAbstractElt_pkid") == 0:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
assert ds.GetLayerByName("myns_realizationOfAbstractElt") is not None
|
|
assert ds.GetLayerByName("other_ns_realizationOfAbstractElt") is not None
|
|
assert (
|
|
ds.GetLayerByName("elt_elt2_abstractElt_myns_realizationOfAbstractElt")
|
|
is not None
|
|
)
|
|
assert (
|
|
ds.GetLayerByName("elt_elt2_abstractElt_other_ns_realizationOfAbstractElt")
|
|
is not None
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test a corner case of relative path resolution
|
|
|
|
|
|
def test_ogr_gmlas_corner_case_relative_path():
|
|
|
|
ds = ogr.Open("GMLAS:../ogr/data/gmlas/gmlas_test1.xml")
|
|
assert ds is not None
|
|
|
|
|
|
###############################################################################
|
|
# Test unexpected repeated element
|
|
|
|
|
|
def test_ogr_gmlas_unexpected_repeated_element():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_unexpected_repeated_element.xml")
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f is None or f["foo"] != "foo_again"
|
|
): # somewhat arbitrary to keep the latest one!
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
assert gdal.GetLastErrorMsg().find("Unexpected element myns:main_elt/myns:foo") >= 0
|
|
f = lyr.GetNextFeature()
|
|
assert f is None
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test unexpected repeated element
|
|
|
|
|
|
def test_ogr_gmlas_unexpected_repeated_element_variant():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_unexpected_repeated_element_variant.xml")
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f is None or f["foo"] != "foo_again"
|
|
): # somewhat arbitrary to keep the latest one!
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
assert gdal.GetLastErrorMsg().find("Unexpected element myns:main_elt/myns:foo") >= 0
|
|
f = lyr.GetNextFeature()
|
|
assert f is None
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test reading geometries embedded in a geometry property element
|
|
|
|
|
|
def test_ogr_gmlas_geometryproperty():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32.gml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><GML><IncludeGeometryXML>true</IncludeGeometryXML></GML></LayerBuildingRules></Configuration>"
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
geom_field_count = lyr.GetLayerDefn().GetGeomFieldCount()
|
|
assert geom_field_count == 15
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["geometryProperty_xml"]
|
|
!= ' <gml:Point gml:id="poly.geom.Geometry" srsName="urn:ogc:def:crs:EPSG::4326"> <gml:pos>49 2</gml:pos> </gml:Point> '
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if not f.IsFieldNull("geometryPropertyEmpty_xml"):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if (
|
|
f["pointProperty_xml"]
|
|
!= '<gml:Point gml:id="poly.geom.Point"><gml:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">50 3</gml:pos></gml:Point>'
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if f["pointPropertyRepeated_xml"] != [
|
|
'<gml:Point gml:id="poly.geom.pointPropertyRepeated.1"><gml:pos>0 1</gml:pos></gml:Point>',
|
|
'<gml:Point gml:id="poly.geom.pointPropertyRepeated.2"><gml:pos>1 2</gml:pos></gml:Point>',
|
|
'<gml:Point gml:id="poly.geom.pointPropertyRepeated.3"><gml:pos>3 4</gml:pos></gml:Point>',
|
|
]:
|
|
f.DumpReadable()
|
|
pytest.fail(f["pointPropertyRepeated_xml"])
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
sr = lyr.GetLayerDefn().GetGeomFieldDefn(geom_idx).GetSpatialRef()
|
|
assert not (sr is None or sr.ExportToWkt().find("4326") < 0)
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
# Axis swapping
|
|
if wkt != "POINT (2 49)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryPropertyEmpty")
|
|
if f.GetGeomFieldRef(geom_idx) is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("pointProperty")
|
|
sr = lyr.GetLayerDefn().GetGeomFieldDefn(geom_idx).GetSpatialRef()
|
|
assert not (sr is None or sr.ExportToWkt().find("4326") < 0)
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
if wkt != "POINT (3 50)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("lineStringProperty")
|
|
sr = lyr.GetLayerDefn().GetGeomFieldDefn(geom_idx).GetSpatialRef()
|
|
assert not (sr is None or sr.ExportToWkt().find("4326") < 0)
|
|
assert lyr.GetLayerDefn().GetGeomFieldDefn(geom_idx).GetType() == ogr.wkbLineString
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
if wkt != "LINESTRING (2 49)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("pointPropertyRepeated")
|
|
assert lyr.GetLayerDefn().GetGeomFieldDefn(geom_idx).GetType() == ogr.wkbUnknown
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
if wkt != "GEOMETRYCOLLECTION (POINT (0 1),POINT (1 2),POINT (3 4))":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("mycustompointproperty_point")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
if wkt != "POINT (5 6)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# Test that on-the-fly reprojection works
|
|
f = lyr.GetNextFeature()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
geom = f.GetGeomFieldRef(geom_idx)
|
|
if ogrtest.check_feature_geometry(geom, "POINT (3.0 0.0)") != 0:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# Failed reprojection
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
if f.GetGeomFieldRef(geom_idx) is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# Test SWAP_COORDINATES=NO
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32.gml",
|
|
open_options=["SWAP_COORDINATES=NO"],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
# Axis swapping
|
|
if wkt != "POINT (49 2)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("lineStringProperty")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
# Axis swapping
|
|
if wkt != "LINESTRING (2 49)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# Test SWAP_COORDINATES=YES
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32.gml",
|
|
open_options=["SWAP_COORDINATES=YES"],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
# Axis swapping
|
|
if wkt != "POINT (2 49)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("lineStringProperty")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
# Axis swapping
|
|
if wkt != "LINESTRING (49 2)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test reading geometries referenced by a AbstractGeometry element
|
|
|
|
|
|
def test_ogr_gmlas_abstractgeometry():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_abstractgeometry_gml32.gml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><GML><IncludeGeometryXML>true</IncludeGeometryXML></GML></LayerBuildingRules></Configuration>"
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
assert lyr.GetLayerDefn().GetGeomFieldCount() == 2
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["AbstractGeometry_xml"]
|
|
!= '<gml:Point gml:id="test.geom.0"><gml:pos>0 1</gml:pos></gml:Point>'
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if f["repeated_AbstractGeometry_xml"] != [
|
|
'<gml:Point gml:id="test.geom.repeated.1"><gml:pos>0 1</gml:pos>',
|
|
'<gml:Point gml:id="test.geom.repeated.2"><gml:pos>1 2</gml:pos>',
|
|
]:
|
|
f.DumpReadable()
|
|
pytest.fail(f["repeated_AbstractGeometry_xml"])
|
|
wkt = f.GetGeomFieldRef(0).ExportToWkt()
|
|
if wkt != "POINT (0 1)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
wkt = f.GetGeomFieldRef(1).ExportToWkt()
|
|
if wkt != "GEOMETRYCOLLECTION (POINT (0 1),POINT (1 2))":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test validation against schema
|
|
|
|
|
|
class MyHandler(object):
|
|
def __init__(self):
|
|
self.error_list = []
|
|
|
|
def error_handler(self, err_type, err_no, err_msg):
|
|
if err_type != 1: # 1 == Debug
|
|
self.error_list.append((err_type, err_no, err_msg))
|
|
|
|
|
|
def test_ogr_gmlas_validate():
|
|
|
|
# By default check we are silent about validation error
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_validate.xml")
|
|
assert ds is not None
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
gdal.SetConfigOption("GMLAS_WARN_UNEXPECTED", None)
|
|
lyr = ds.GetLayer(0)
|
|
lyr.GetFeatureCount()
|
|
gdal.SetConfigOption("GMLAS_WARN_UNEXPECTED", "YES")
|
|
gdal.PopErrorHandler()
|
|
assert not myhandler.error_list
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_validate.xml")
|
|
assert ds is not None
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
lyr = ds.GetLayer(0)
|
|
lyr.GetFeatureCount()
|
|
gdal.PopErrorHandler()
|
|
# Unexpected element with xpath=myns:main_elt/myns:bar (subxpath=myns:main_elt/myns:bar) found
|
|
assert len(myhandler.error_list) >= 2
|
|
|
|
# Enable validation on a doc without validation errors
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_test1.xml", open_options=["VALIDATE=YES"])
|
|
gdal.PopErrorHandler()
|
|
assert ds is not None, myhandler.error_list
|
|
assert not myhandler.error_list
|
|
|
|
# Enable validation on a doc without validation error, and with explicit XSD
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/gmlas_test1.xml", open("data/gmlas/gmlas_test1.xml").read()
|
|
)
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/gmlas_test1.xml",
|
|
open_options=[
|
|
"XSD=" + os.getcwd() + "/data/gmlas/gmlas_test1.xsd",
|
|
"VALIDATE=YES",
|
|
],
|
|
)
|
|
gdal.PopErrorHandler()
|
|
gdal.Unlink("/vsimem/gmlas_test1.xml")
|
|
assert ds is not None, myhandler.error_list
|
|
assert not myhandler.error_list
|
|
|
|
# Validation errors, but do not prevent dataset opening
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_validate.xml", open_options=["VALIDATE=YES"]
|
|
)
|
|
gdal.PopErrorHandler()
|
|
assert ds is not None
|
|
assert len(myhandler.error_list) == 5
|
|
|
|
# Validation errors and do prevent dataset opening
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_validate.xml",
|
|
open_options=["VALIDATE=YES", "FAIL_IF_VALIDATION_ERROR=YES"],
|
|
)
|
|
gdal.PopErrorHandler()
|
|
assert ds is None
|
|
assert len(myhandler.error_list) == 6
|
|
|
|
# Test that validation without doc doesn't crash
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:", open_options=["XSD=data/gmlas/gmlas_test1.xsd", "VALIDATE=YES"]
|
|
)
|
|
gdal.PopErrorHandler()
|
|
assert ds is not None, myhandler.error_list
|
|
assert not myhandler.error_list
|
|
|
|
|
|
###############################################################################
|
|
# Test correct namespace prefix handling
|
|
|
|
|
|
def test_ogr_gmlas_test_ns_prefix():
|
|
|
|
# The schema doesn't directly import xlink, but indirectly references it
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:", open_options=["XSD=data/gmlas/gmlas_test_targetelement.xsd"]
|
|
)
|
|
|
|
lyr = ds.GetLayerByName("_ogr_fields_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["field_xpath"]
|
|
!= "myns:main_elt/myns:reference_missing_target_elt/@xlink:href"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test parsing documents without namespace
|
|
|
|
|
|
def test_ogr_gmlas_no_namespace():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_no_namespace.xml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["foo"] != "bar":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test CONFIG_FILE
|
|
|
|
|
|
def test_ogr_gmlas_conf():
|
|
|
|
# Non existing file
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=["CONFIG_FILE=not_existing"],
|
|
)
|
|
assert ds is None
|
|
|
|
# Broken conf file
|
|
gdal.FileFromMemBuffer("/vsimem/my_conf.xml", "<broken>")
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=["CONFIG_FILE=/vsimem/my_conf.xml"],
|
|
)
|
|
gdal.Unlink("/vsimem/my_conf.xml")
|
|
assert ds is None
|
|
|
|
# Valid XML, but not validating
|
|
gdal.FileFromMemBuffer("/vsimem/my_conf.xml", "<not_validating/>")
|
|
with gdaltest.error_handler():
|
|
gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=["CONFIG_FILE=/vsimem/my_conf.xml"],
|
|
)
|
|
gdal.Unlink("/vsimem/my_conf.xml")
|
|
|
|
# Inlined conf file + UseArrays = false
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><UseArrays>false</UseArrays></LayerBuildingRules></Configuration>"
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayerByName("main_elt_string_array")
|
|
assert lyr.GetFeatureCount() == 2
|
|
|
|
# AlwaysGenerateOGRId = true
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><AlwaysGenerateOGRId>true</AlwaysGenerateOGRId></LayerBuildingRules></Configuration>"
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
f = lyr.GetNextFeature()
|
|
if f["ogr_pkid"].find("main_elt_1") < 0 or f["otherns_id"] != "otherns_id":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# IncludeGeometryXML = false
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32.gml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><GML><IncludeGeometryXML>false</IncludeGeometryXML></GML></LayerBuildingRules></Configuration>"
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
assert lyr.GetLayerDefn().GetFieldIndex("geometryProperty_xml") < 0
|
|
f = lyr.GetNextFeature()
|
|
geom_idx = lyr.GetLayerDefn().GetGeomFieldIndex("geometryProperty")
|
|
wkt = f.GetGeomFieldRef(geom_idx).ExportToWkt()
|
|
if wkt != "POINT (2 49)":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
# ExposeMetadataLayers = true
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_abstractgeometry_gml32.gml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><ExposeMetadataLayers>true</ExposeMetadataLayers></Configuration>"
|
|
],
|
|
)
|
|
assert ds is not None
|
|
assert ds.GetLayerCount() == 5
|
|
# Test override with open option
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_abstractgeometry_gml32.gml",
|
|
open_options=[
|
|
"EXPOSE_METADATA_LAYERS=NO",
|
|
"CONFIG_FILE=<Configuration><ExposeMetadataLayers>true</ExposeMetadataLayers></Configuration>",
|
|
],
|
|
)
|
|
assert ds is not None
|
|
assert ds.GetLayerCount() == 1
|
|
|
|
# Turn on validation and error on validation
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_validate.xml",
|
|
open_options=[
|
|
'CONFIG_FILE=<Configuration><Validation enabled="true"><FailIfError>true</FailIfError></Validation></Configuration>'
|
|
],
|
|
)
|
|
assert ds is None and gdal.GetLastErrorMsg().find("Validation") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test IgnoredXPaths aspect of config file
|
|
|
|
|
|
def test_ogr_gmlas_conf_ignored_xpath():
|
|
|
|
# Test unsupported and invalid XPaths
|
|
for xpath in [
|
|
"",
|
|
"1",
|
|
"@",
|
|
"@/",
|
|
".",
|
|
":",
|
|
"/:",
|
|
"a:",
|
|
"a:1",
|
|
"foo[1]",
|
|
"foo[@bar='baz']",
|
|
]:
|
|
with gdaltest.error_handler():
|
|
gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_test1.xsd",
|
|
"""CONFIG_FILE=<Configuration>
|
|
<IgnoredXPaths>
|
|
<WarnIfIgnoredXPathFoundInDocInstance>true</WarnIfIgnoredXPathFoundInDocInstance>
|
|
<XPath>%s</XPath>
|
|
</IgnoredXPaths>
|
|
</Configuration>"""
|
|
% xpath,
|
|
],
|
|
)
|
|
assert gdal.GetLastErrorMsg().find("XPath syntax") >= 0, xpath
|
|
|
|
# Test duplicating mapping
|
|
with gdaltest.error_handler():
|
|
gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_test1.xsd",
|
|
"""CONFIG_FILE=<Configuration>
|
|
<IgnoredXPaths>
|
|
<Namespaces>
|
|
<Namespace prefix="ns" uri="http://ns1"/>
|
|
<Namespace prefix="ns" uri="http://ns2"/>
|
|
</Namespaces>
|
|
</IgnoredXPaths>
|
|
</Configuration>""",
|
|
],
|
|
)
|
|
assert gdal.GetLastErrorMsg().find("Prefix ns was already mapped") >= 0
|
|
|
|
# Test XPath with implicit namespace, and warning
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<IgnoredXPaths>
|
|
<WarnIfIgnoredXPathFoundInDocInstance>true</WarnIfIgnoredXPathFoundInDocInstance>
|
|
<XPath>@otherns:id</XPath>
|
|
</IgnoredXPaths>
|
|
</Configuration>"""
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
assert lyr.GetLayerDefn().GetFieldIndex("otherns_id") < 0
|
|
with gdaltest.error_handler():
|
|
lyr.GetNextFeature()
|
|
assert (
|
|
gdal.GetLastErrorMsg().find(
|
|
"Attribute with xpath=myns:main_elt/@otherns:id found in document but ignored"
|
|
)
|
|
>= 0
|
|
)
|
|
|
|
# Test XPath with explicit namespace, and warning suppression
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<IgnoredXPaths>
|
|
<Namespaces>
|
|
<Namespace prefix="other_ns" uri="http://other_ns"/>
|
|
</Namespaces>
|
|
<XPath warnIfIgnoredXPathFoundInDocInstance="false">@other_ns:id</XPath>
|
|
</IgnoredXPaths>
|
|
</Configuration>"""
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
lyr.GetNextFeature()
|
|
assert gdal.GetLastErrorMsg() == ""
|
|
|
|
# Test various XPath syntaxes
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_test1.xsd",
|
|
"""CONFIG_FILE=<Configuration>
|
|
<IgnoredXPaths>
|
|
<WarnIfIgnoredXPathFoundInDocInstance>false</WarnIfIgnoredXPathFoundInDocInstance>
|
|
<XPath>myns:main_elt/@optionalStrAttr</XPath>
|
|
<XPath>myns:main_elt//@fixedValUnset</XPath>
|
|
<XPath>myns:main_elt/myns:base_int</XPath>
|
|
<XPath>//myns:string</XPath>
|
|
<XPath>myns:main_elt//myns:string_array</XPath>
|
|
|
|
<XPath>a</XPath> <!-- no match -->
|
|
<XPath>unknown_ns:foo</XPath> <!-- no match -->
|
|
<XPath>myns:main_elt/myns:int_arra</XPath> <!-- no match -->
|
|
<XPath>foo/myns:long</XPath> <!-- no match -->
|
|
</IgnoredXPaths>
|
|
</Configuration>""",
|
|
],
|
|
)
|
|
assert ds is not None
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
|
|
# Ignored fields
|
|
assert lyr.GetLayerDefn().GetFieldIndex("optionalStrAttr") < 0
|
|
assert lyr.GetLayerDefn().GetFieldIndex("fixedValUnset") < 0
|
|
assert lyr.GetLayerDefn().GetFieldIndex("base_int") < 0
|
|
assert lyr.GetLayerDefn().GetFieldIndex("string") < 0
|
|
assert lyr.GetLayerDefn().GetFieldIndex("string_array") < 0
|
|
|
|
# Present fields
|
|
assert lyr.GetLayerDefn().GetFieldIndex("int_array") >= 0
|
|
assert lyr.GetLayerDefn().GetFieldIndex("long") >= 0
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
do_log = False
|
|
|
|
|
|
class GMLASHTTPHandler(BaseHTTPRequestHandler):
|
|
def log_request(self, code="-", size="-"):
|
|
pass
|
|
|
|
def do_GET(self):
|
|
|
|
try:
|
|
if do_log:
|
|
f = open("/tmp/log.txt", "a")
|
|
f.write("GET %s\n" % self.path)
|
|
f.close()
|
|
|
|
if self.path.startswith("/vsimem/"):
|
|
f = gdal.VSIFOpenL(self.path, "rb")
|
|
if f is None:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
else:
|
|
gdal.VSIFSeekL(f, 0, 2)
|
|
size = gdal.VSIFTellL(f)
|
|
gdal.VSIFSeekL(f, 0, 0)
|
|
content = gdal.VSIFReadL(1, size, f)
|
|
gdal.VSIFCloseL(f)
|
|
self.protocol_version = "HTTP/1.0"
|
|
self.send_response(200)
|
|
self.end_headers()
|
|
self.wfile.write(content)
|
|
return
|
|
|
|
return
|
|
except IOError:
|
|
pass
|
|
|
|
self.send_error(404, "File Not Found: %s" % self.path)
|
|
|
|
|
|
###############################################################################
|
|
# Test schema caching
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"SKIP_OGR_GMLAS_HTTP_RELATED" in os.environ,
|
|
reason="test skipped on CI due to timeout on Windows Conda builds with parallel ctest",
|
|
)
|
|
@pytest.mark.require_curl()
|
|
def test_ogr_gmlas_cache():
|
|
|
|
(webserver_process, webserver_port) = webserver.launch(handler=GMLASHTTPHandler)
|
|
if webserver_port == 0:
|
|
pytest.skip()
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_cache.xml",
|
|
"""<main_elt xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xsi:noNamespaceSchemaLocation="http://localhost:%d/vsimem/ogr_gmlas_cache.xsd">
|
|
<foo>bar</foo>
|
|
</main_elt>
|
|
"""
|
|
% webserver_port,
|
|
)
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_cache.xsd",
|
|
"""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
|
|
elementFormDefault="qualified" attributeFormDefault="unqualified">
|
|
<xs:include schemaLocation="ogr_gmlas_cache_2.xsd"/>
|
|
</xs:schema>""",
|
|
)
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_cache_2.xsd",
|
|
"""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
|
|
elementFormDefault="qualified" attributeFormDefault="unqualified">
|
|
<xs:element name="main_elt">
|
|
<xs:complexType>
|
|
<xs:sequence>
|
|
<xs:element name="foo" type="xs:string"/>
|
|
</xs:sequence>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
</xs:schema>""",
|
|
)
|
|
|
|
# First try with remote schema download disabled
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><AllowRemoteSchemaDownload>false</AllowRemoteSchemaDownload><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
assert ds is None and gdal.GetLastErrorMsg().find("Cannot resolve") >= 0
|
|
|
|
# Test invalid cache directory
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/inexisting_directory/not/exist</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
if ds is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
if ds.GetLayerCount() != 1:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail(ds.GetLayerCount())
|
|
|
|
# Will create the directory and download and cache
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
if ds is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
if ds.GetLayerCount() != 1:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail(ds.GetLayerCount())
|
|
|
|
gdal.Unlink("/vsimem/my/gmlas_cache/" + gdal.ReadDir("/vsimem/my/gmlas_cache")[0])
|
|
|
|
# Will reuse the directory and download and cache
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
if ds is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
if ds.GetLayerCount() != 1:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
# With XSD open option
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"XSD=http://localhost:%d/vsimem/ogr_gmlas_cache.xsd" % webserver_port,
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>",
|
|
],
|
|
)
|
|
if ds is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
if ds.GetLayerCount() != 1:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
|
|
# Now re-open with the webserver turned off
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
assert ds is not None
|
|
assert ds.GetLayerCount() == 1
|
|
|
|
# Re try but ask for refresh
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"REFRESH_CACHE=YES",
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>",
|
|
],
|
|
)
|
|
if ds is not None or gdal.GetLastErrorMsg().find("Cannot resolve") < 0:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail(gdal.GetLastErrorMsg())
|
|
|
|
# Re try with non existing cached schema
|
|
gdal.Unlink("/vsimem/my/gmlas_cache/" + gdal.ReadDir("/vsimem/my/gmlas_cache")[0])
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_cache.xml",
|
|
open_options=[
|
|
"CONFIG_FILE=<Configuration><SchemaCache><Directory>/vsimem/my/gmlas_cache</Directory></SchemaCache></Configuration>"
|
|
],
|
|
)
|
|
assert ds is None and gdal.GetLastErrorMsg().find("Cannot resolve") >= 0
|
|
|
|
# Cleanup
|
|
gdal.Unlink("/vsimem/ogr_gmlas_cache.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_cache.xsd")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_cache_2.xsd")
|
|
|
|
files = gdal.ReadDir("/vsimem/my/gmlas_cache")
|
|
for my_file in files:
|
|
gdal.Unlink("/vsimem/my/gmlas_cache/" + my_file)
|
|
gdal.Rmdir("/vsimem/my/gmlas_cache")
|
|
gdal.Rmdir("/vsimem/my")
|
|
|
|
|
|
###############################################################################
|
|
# Test good working of linking to a child through its id attribute
|
|
|
|
|
|
def test_ogr_gmlas_link_nested_independant_child():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_link_nested_independant_child.xml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["second_my_id"] != "second_id":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test some pattern found in geosciml schemas
|
|
|
|
|
|
def test_ogr_gmlas_composition_compositionPart():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_composition_compositionPart.xml")
|
|
|
|
lyr = ds.GetLayerByName("first_composition")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.IsFieldSet("parent_ogr_pkid") == 0
|
|
or f.IsFieldSet("CompositionPart_pkid") == 0
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.IsFieldSet("parent_ogr_pkid") == 0
|
|
or f.IsFieldSet("CompositionPart_pkid") == 0
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("CompositionPart")
|
|
f = lyr.GetNextFeature()
|
|
if f.IsFieldSet("my_id") == 0 or f.IsFieldSet("a") == 0:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f.IsFieldSet("my_id") == 0 or f.IsFieldSet("a") == 0:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test that when importing GML we expose by default only elements deriving
|
|
# from _Feature/AbstractFeature
|
|
|
|
|
|
def test_ogr_gmlas_instantiate_only_gml_feature():
|
|
|
|
with gdaltest.tempfile(
|
|
"/vsimem/with space/gmlas_instantiate_only_gml_feature.xsd",
|
|
open("data/gmlas/gmlas_instantiate_only_gml_feature.xsd", "rb").read(),
|
|
):
|
|
with gdaltest.tempfile(
|
|
"/vsimem/with space/gmlas_fake_gml32.xsd",
|
|
open("data/gmlas/gmlas_fake_gml32.xsd", "rb").read(),
|
|
):
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=/vsimem/with space/gmlas_instantiate_only_gml_feature.xsd"
|
|
],
|
|
)
|
|
assert ds.GetLayerCount() == 1
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test that WFS style timeStamp are ignored for hash generation
|
|
|
|
|
|
def test_ogr_gmlas_timestamp_ignored_for_hash():
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_timestamp_ignored_for_hash_foo.xml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
pkid = f["ogr_pkid"]
|
|
|
|
ds = ogr.Open("GMLAS:data/gmlas/gmlas_timestamp_ignored_for_hash_bar.xml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["ogr_pkid"] != pkid:
|
|
f.DumpReadable()
|
|
pytest.fail(pkid)
|
|
|
|
|
|
###############################################################################
|
|
# Test dataset GetNextFeature()
|
|
|
|
|
|
def test_ogr_gmlas_dataset_getnextfeature():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_test1.xml")
|
|
|
|
assert ds.TestCapability(ogr.ODsCRandomLayerRead) == 1
|
|
|
|
count = 0
|
|
last_l = None
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
last_l = lyr
|
|
|
|
base_count = 59
|
|
assert count == base_count
|
|
|
|
assert last_l.GetName() == "main_elt"
|
|
|
|
f, lyr = ds.GetNextFeature()
|
|
assert f is None and lyr is None
|
|
|
|
ds.ResetReading()
|
|
last_pct = 0
|
|
while True:
|
|
f, l, pct = ds.GetNextFeature(include_pct=True)
|
|
last_pct = pct
|
|
if f is None:
|
|
assert l is None
|
|
break
|
|
assert last_pct == 1.0
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml", open_options=["EXPOSE_METADATA_LAYERS=YES"]
|
|
)
|
|
fc_map = {}
|
|
for layer_name in (
|
|
"_ogr_fields_metadata",
|
|
"_ogr_layers_metadata",
|
|
"_ogr_layer_relationships",
|
|
"_ogr_other_metadata",
|
|
):
|
|
fc_map[layer_name] = ds.GetLayerByName(layer_name).GetFeatureCount()
|
|
ds = None
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml", open_options=["EXPOSE_METADATA_LAYERS=YES"]
|
|
)
|
|
count = 0
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
|
|
expected_count = base_count
|
|
expected_count += fc_map["_ogr_fields_metadata"]
|
|
expected_count += fc_map["_ogr_layers_metadata"]
|
|
expected_count += fc_map["_ogr_layer_relationships"]
|
|
expected_count += fc_map["_ogr_other_metadata"]
|
|
assert count == expected_count
|
|
|
|
f, lyr = ds.GetNextFeature()
|
|
assert f is None and lyr is None
|
|
|
|
ds.ResetReading()
|
|
|
|
count = 0
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
|
|
assert count == expected_count
|
|
|
|
for layers in [
|
|
["_ogr_fields_metadata"],
|
|
["_ogr_layers_metadata"],
|
|
["_ogr_layer_relationships"],
|
|
["_ogr_fields_metadata", "_ogr_layers_metadata"],
|
|
["_ogr_fields_metadata", "_ogr_layer_relationships"],
|
|
["_ogr_layers_metadata", "_ogr_layer_relationships"],
|
|
]:
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_test1.xml")
|
|
expected_count = base_count
|
|
for layer in layers:
|
|
ds.GetLayerByName(layer)
|
|
expected_count += fc_map[layer]
|
|
|
|
count = 0
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
|
|
assert count == expected_count
|
|
|
|
f, lyr = ds.GetNextFeature()
|
|
assert f is None and lyr is None
|
|
|
|
# Test iterating over metadata layers on XSD-only based dataset
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=["XSD=data/gmlas/gmlas_test1.xsd", "EXPOSE_METADATA_LAYERS=YES"],
|
|
)
|
|
count = 0
|
|
last_l = None
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
last_l = lyr
|
|
|
|
assert count != 0
|
|
|
|
|
|
###############################################################################
|
|
# Test that with schemas that have a structure like a base:identifier, we
|
|
# will inline it.
|
|
|
|
|
|
def test_ogr_gmlas_inline_identifier():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:", open_options=["XSD=data/gmlas/gmlas_inline_identifier.xsd"]
|
|
)
|
|
if ds.GetLayerCount() != 2:
|
|
for i in range(ds.GetLayerCount()):
|
|
print(ds.GetLayer(i).GetName())
|
|
pytest.fail(ds.GetLayerCount())
|
|
lyr = ds.GetLayer(0)
|
|
assert lyr.GetLayerDefn().GetFieldIndex("identifier_foo") >= 0
|
|
|
|
|
|
###############################################################################
|
|
# Test that we can handle things like gml:name and au:name
|
|
|
|
|
|
def test_ogr_gmlas_avoid_same_name_inlined_classes():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=["XSD=data/gmlas/gmlas_avoid_same_name_inlined_classes.xsd"],
|
|
)
|
|
assert ds.GetLayerCount() == 3
|
|
lyr = ds.GetLayerByName("myFeature_ns1_dt")
|
|
assert lyr is not None
|
|
lyr = ds.GetLayerByName("myFeature_ns2_dt")
|
|
assert lyr is not None
|
|
|
|
|
|
###############################################################################
|
|
# Test validation with an optional fixed attribute that is ignored
|
|
|
|
|
|
def test_ogr_gmlas_validate_ignored_fixed_attribute():
|
|
|
|
myhandler = MyHandler()
|
|
gdal.PushErrorHandler(myhandler.error_handler)
|
|
gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_validate_ignored_fixed_attribute.xml",
|
|
open_options=[
|
|
"VALIDATE=YES",
|
|
"CONFIG_FILE=<Configuration><IgnoredXPaths><XPath>@bar</XPath></IgnoredXPaths></Configuration>",
|
|
],
|
|
)
|
|
gdal.PopErrorHandler()
|
|
assert not myhandler.error_list
|
|
|
|
|
|
###############################################################################
|
|
# Test REMOVE_UNUSED_LAYERS and REMOVE_UNUSED_FIELDS options
|
|
|
|
|
|
def test_ogr_gmlas_remove_unused_layers_and_fields():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_remove_unused_layers_and_fields.xml",
|
|
open_options=["REMOVE_UNUSED_LAYERS=YES", "REMOVE_UNUSED_FIELDS=YES"],
|
|
)
|
|
assert ds.GetLayerCount() == 1
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if lyr.GetLayerDefn().GetFieldCount() != 4:
|
|
f.DumpReadable()
|
|
pytest.fail(lyr.GetLayerDefn().GetFieldCount())
|
|
assert (
|
|
f["used1"] == "foo"
|
|
and f["used2"] == "bar"
|
|
and f["nillable_nilReason"] == "unknown"
|
|
)
|
|
|
|
lyr = ds.GetLayerByName("_ogr_layers_metadata")
|
|
if lyr.GetFeatureCount() != 1:
|
|
for f in lyr:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("_ogr_fields_metadata")
|
|
if lyr.GetFeatureCount() != 7:
|
|
for f in lyr:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("_ogr_layer_relationships")
|
|
if lyr.GetFeatureCount() != 0:
|
|
for f in lyr:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test xlink resolution
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"SKIP_OGR_GMLAS_HTTP_RELATED" in os.environ,
|
|
reason="test skipped on CI due to timeout on Windows Conda builds with parallel ctest",
|
|
)
|
|
@pytest.mark.require_curl()
|
|
def test_ogr_gmlas_xlink_resolver():
|
|
|
|
(webserver_process, webserver_port) = webserver.launch(handler=GMLASHTTPHandler)
|
|
if webserver_port == 0:
|
|
pytest.skip()
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_xlink_resolver.xsd",
|
|
"""<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
|
|
xmlns:xlink="http://fake_xlink"
|
|
elementFormDefault="qualified"
|
|
attributeFormDefault="unqualified">
|
|
|
|
<xs:import namespace="http://fake_xlink" schemaLocation="ogr_gmlas_xlink_resolver_fake_xlink.xsd"/>
|
|
|
|
<xs:element name="FeatureCollection">
|
|
<xs:complexType>
|
|
<xs:sequence>
|
|
<xs:element ref="main_elt" minOccurs="0" maxOccurs="unbounded"/>
|
|
</xs:sequence>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
|
|
<xs:element name="main_elt">
|
|
<xs:complexType>
|
|
<xs:sequence>
|
|
<xs:element name="my_link">
|
|
<xs:complexType>
|
|
<xs:sequence/>
|
|
<xs:attribute name="attr_before" type="xs:string"/>
|
|
<xs:attribute ref="xlink:href"/>
|
|
<xs:attribute name="attr_after" type="xs:string"/>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
<xs:element name="my_link2" minOccurs="0">
|
|
<xs:complexType>
|
|
<xs:sequence/>
|
|
<xs:attribute name="attr_before" type="xs:string"/>
|
|
<xs:attribute ref="xlink:href"/>
|
|
<xs:attribute name="attr_after" type="xs:string"/>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
</xs:sequence>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
|
|
</xs:schema>""",
|
|
)
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_xlink_resolver_fake_xlink.xsd",
|
|
open("data/gmlas/gmlas_fake_xlink.xsd", "rb").read(),
|
|
)
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
"""
|
|
<FeatureCollection xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:xlink="http://fake_xlink"
|
|
xsi:noNamespaceSchemaLocation="ogr_gmlas_xlink_resolver.xsd">
|
|
<main_elt>
|
|
<my_link attr_before="a" xlink:href="http://localhost:%d/vsimem/resource.xml" attr_after="b"/>
|
|
</main_elt>
|
|
<main_elt>
|
|
<my_link xlink:href="http://localhost:%d/vsimem/resource2.xml"/>
|
|
</main_elt>
|
|
</FeatureCollection>"""
|
|
% (webserver_port, webserver_port),
|
|
)
|
|
|
|
# By default, no resolution
|
|
ds = gdal.OpenEx("GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml")
|
|
lyr = ds.GetLayer(0)
|
|
if lyr.GetLayerDefn().GetFieldIndex("my_link_rawcontent") >= 0:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Enable resolution, but only from local cache
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>false</AllowRemoteDownload>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
if lyr.GetLayerDefn().GetFieldIndex("my_link_rawcontent") < 0:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f.IsFieldSet("my_link_rawcontent"):
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
# Try again but this time with the cached file
|
|
cached_file = (
|
|
"/vsimem/gmlas_xlink_cache/localhost_%d_vsimem_resource.xml" % webserver_port
|
|
)
|
|
gdal.FileFromMemBuffer(cached_file, "foo")
|
|
lyr.ResetReading()
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "foo":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
gdal.Unlink(cached_file)
|
|
|
|
# Enable remote resolution (but local caching disabled)
|
|
gdal.FileFromMemBuffer("/vsimem/resource.xml", "bar")
|
|
gdal.FileFromMemBuffer("/vsimem/resource2.xml", "baz")
|
|
with gdal.config_option("GMLAS_XLINK_RAM_CACHE_SIZE", "5"):
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "bar":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
# Check that the content is not cached
|
|
if gdal.VSIStatL(cached_file) is not None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
# Delete the remote file and check that we can retrieve it from RAM cache
|
|
gdal.Unlink("/vsimem/resource.xml")
|
|
lyr.ResetReading()
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "bar":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "baz":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
gdal.Unlink("/vsimem/resource2.xml")
|
|
lyr.ResetReading()
|
|
# /vsimem/resource.xml has been evicted from the cache
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] is not None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "baz":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
ds = None
|
|
|
|
# Enable remote resolution and caching
|
|
gdal.FileFromMemBuffer("/vsimem/resource.xml", "bar")
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<CacheResults>true</CacheResults>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "bar":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
# Check that the content is cached
|
|
if gdal.VSIStatL(cached_file) is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Enable remote resolution and caching and REFRESH_CACHE
|
|
gdal.FileFromMemBuffer("/vsimem/resource.xml", "baz")
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"REFRESH_CACHE=YES",
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<CacheResults>true</CacheResults>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>""",
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link_rawcontent"] != "baz":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
# Check that the content is cached
|
|
if gdal.VSIStatL(cached_file) is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Test absent remote resource
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_xlink_resolver_absent_resource.xml",
|
|
"""
|
|
<main_elt xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:xlink="http://fake_xlink"
|
|
xsi:noNamespaceSchemaLocation="ogr_gmlas_xlink_resolver.xsd">
|
|
<my_link xlink:href="http://localhost:%d/vsimem/resource_not_existing.xml"/>
|
|
</main_elt>"""
|
|
% webserver_port,
|
|
)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver_absent_resource.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if f.IsFieldSet("my_link_rawcontent"):
|
|
f.DumpReadable()
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Test file size limit
|
|
gdal.Unlink(cached_file)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<MaxFileSize>1</MaxFileSize>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<DefaultResolution enabled="true">
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<CacheResults>true</CacheResults>
|
|
</DefaultResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if gdal.GetLastErrorMsg() == "":
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
# Check that the content is not cached
|
|
if gdal.VSIStatL(cached_file) is not None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Test with URL specific rule with RawContent resolution
|
|
gdal.FileFromMemBuffer("/vsimem/resource.xml", "bar")
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<URLSpecificResolution>
|
|
<URLPrefix>http://localhost:%d/vsimem/</URLPrefix>
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<ResolutionMode>RawContent</ResolutionMode>
|
|
<CacheResults>true</CacheResults>
|
|
</URLSpecificResolution>
|
|
</XLinkResolution></Configuration>"""
|
|
% webserver_port
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["my_link_attr_before"] != "a"
|
|
or f["my_link_href"]
|
|
!= "http://localhost:%d/vsimem/resource.xml" % webserver_port
|
|
or f["my_link_rawcontent"] != "bar"
|
|
or f["my_link_attr_after"] != "b"
|
|
):
|
|
f.DumpReadable()
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
# Check that the content is cached
|
|
if gdal.VSIStatL(cached_file) is None:
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# Test with URL specific rule with FieldsFromXPath resolution
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/subdir1/resource.xml",
|
|
"""
|
|
<?xml version='1.0' encoding='UTF-8'?>
|
|
<myns:top>
|
|
<myns:foo>fooVal</myns:foo>
|
|
<myns:bar>123</myns:bar>
|
|
</myns:top>""",
|
|
)
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml",
|
|
"""
|
|
<?xml version='1.0' encoding='UTF-8'?>
|
|
<myns:top>
|
|
<myns:foo>fooVal2</myns:foo>
|
|
<myns:foo>fooVal3</myns:foo>
|
|
<myns:baz val="345"/>
|
|
<myns:xml_blob>foo<blob/>bar</myns:xml_blob>
|
|
<long>1234567890123</long>
|
|
<double>1.25</double>
|
|
<datetime>2016-10-07T12:34:56Z</datetime>
|
|
</myns:top>""",
|
|
)
|
|
gdal.FileFromMemBuffer("/vsimem/non_matching_resource.xml", "foo")
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
"""
|
|
<FeatureCollection xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:xlink="http://fake_xlink"
|
|
xsi:noNamespaceSchemaLocation="ogr_gmlas_xlink_resolver.xsd">
|
|
<main_elt>
|
|
<my_link attr_before="a" xlink:href="http://localhost:%d/vsimem/subdir1/resource.xml" attr_after="b"/>
|
|
<my_link2 attr_before="a2" xlink:href="http://localhost:%d/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml" attr_after="b2"/>
|
|
</main_elt>
|
|
<main_elt>
|
|
<my_link attr_before="a" xlink:href="http://localhost:%d/vsimem/non_matching_resource.xml" attr_after="b"/>
|
|
<my_link2 attr_before="a2" xlink:href="http://localhost:%d/vsimem/subdir1/resource.xml" attr_after="b2"/>
|
|
</main_elt>
|
|
</FeatureCollection>"""
|
|
% (webserver_port, webserver_port, webserver_port, webserver_port),
|
|
)
|
|
|
|
config_file = """<Configuration>
|
|
<XLinkResolution>
|
|
<CacheDirectory>/vsimem/gmlas_xlink_cache</CacheDirectory>
|
|
<URLSpecificResolution>
|
|
<URLPrefix>http://localhost:%d/vsimem/subdir1</URLPrefix>
|
|
<HTTPHeader>
|
|
<Name>Accept</Name>
|
|
<Value>application/x-iso19135+xml</Value>
|
|
</HTTPHeader>
|
|
<HTTPHeader>
|
|
<Name>Accept-Language</Name>
|
|
<Value>en</Value>
|
|
</HTTPHeader>
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<ResolutionMode>FieldsFromXPath</ResolutionMode>
|
|
<CacheResults>true</CacheResults>
|
|
<Field>
|
|
<Name>foo</Name>
|
|
<Type>string</Type>
|
|
<XPath>myns:top/myns:foo</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>bar</Name>
|
|
<Type>integer</Type>
|
|
<XPath>myns:top/myns:bar</XPath>
|
|
</Field>
|
|
</URLSpecificResolution>
|
|
<URLSpecificResolution>
|
|
<URLPrefix>http://localhost:%d/vsimem/subdir2</URLPrefix>
|
|
<AllowRemoteDownload>true</AllowRemoteDownload>
|
|
<ResolutionMode>FieldsFromXPath</ResolutionMode>
|
|
<CacheResults>true</CacheResults>
|
|
<Field>
|
|
<Name>foo</Name>
|
|
<Type>string</Type>
|
|
<XPath>myns:top/myns:foo</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>baz</Name>
|
|
<Type>integer</Type>
|
|
<XPath>/myns:top/myns:baz/@val</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>xml_blob</Name>
|
|
<Type>string</Type>
|
|
<XPath>//myns:xml_blob</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>long</Name>
|
|
<Type>long</Type>
|
|
<XPath>//long</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>double</Name>
|
|
<Type>double</Type>
|
|
<XPath>//double</XPath>
|
|
</Field>
|
|
<Field>
|
|
<Name>datetime</Name>
|
|
<Type>dateTime</Type>
|
|
<XPath>//datetime</XPath>
|
|
</Field>
|
|
</URLSpecificResolution>
|
|
</XLinkResolution></Configuration>""" % (
|
|
webserver_port,
|
|
webserver_port,
|
|
)
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=["CONFIG_FILE=" + config_file],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["my_link_attr_before"] != "a"
|
|
or f["my_link_href"]
|
|
!= "http://localhost:%d/vsimem/subdir1/resource.xml" % webserver_port
|
|
or f["my_link_foo"] != "fooVal"
|
|
or f["my_link_bar"] != 123
|
|
or f["my_link_attr_after"] != "b"
|
|
or f["my_link2_attr_before"] != "a2"
|
|
or f["my_link2_href"]
|
|
!= "http://localhost:%d/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml"
|
|
% webserver_port
|
|
or f["my_link2_foo"] != "fooVal2 fooVal3"
|
|
or f["my_link2_baz"] != 345
|
|
or f["my_link2_xml_blob"]
|
|
!= """foo<blob />
|
|
bar"""
|
|
or f["my_link2_long"] != 1234567890123
|
|
or f["my_link2_double"] != 1.25
|
|
or f["my_link2_datetime"] != "2016/10/07 12:34:56+00"
|
|
or f["my_link2_bar"] is not None
|
|
or f["my_link2_attr_after"] != "b2"
|
|
):
|
|
f.DumpReadable()
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if f["my_link2_bar"] != 123:
|
|
f.DumpReadable()
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
gdal.Unlink("/vsimem/subdir1/resource.xml")
|
|
gdal.Unlink(
|
|
"/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml"
|
|
)
|
|
|
|
# Test caching
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_xlink_resolver.xml",
|
|
open_options=["CONFIG_FILE=" + config_file],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["my_link_attr_before"] != "a"
|
|
or f["my_link_href"]
|
|
!= "http://localhost:%d/vsimem/subdir1/resource.xml" % webserver_port
|
|
or f["my_link_foo"] != "fooVal"
|
|
or f["my_link_bar"] != 123
|
|
or f["my_link_attr_after"] != "b"
|
|
or f["my_link2_attr_before"] != "a2"
|
|
or f["my_link2_href"]
|
|
!= "http://localhost:%d/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml"
|
|
% webserver_port
|
|
or f["my_link2_foo"] != "fooVal2 fooVal3"
|
|
or f["my_link2_baz"] != 345
|
|
or f["my_link2_bar"] is not None
|
|
or f["my_link2_attr_after"] != "b2"
|
|
):
|
|
f.DumpReadable()
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
pytest.fail()
|
|
|
|
ds = None
|
|
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_xlink_resolver.xsd")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_xlink_resolver_fake_xlink.xsd")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_xlink_resolver.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_xlink_resolver_absent_resource.xml")
|
|
fl = gdal.ReadDir("/vsimem/gmlas_xlink_cache")
|
|
if fl is not None:
|
|
for filename in fl:
|
|
gdal.Unlink("/vsimem/gmlas_xlink_cache/" + filename)
|
|
gdal.Unlink("/vsimem/gmlas_xlink_cache")
|
|
gdal.Unlink("/vsimem/resource.xml")
|
|
gdal.Unlink("/vsimem/resource2.xml")
|
|
gdal.Unlink("/vsimem/subdir1/resource.xml")
|
|
gdal.Unlink(
|
|
"/vsimem/subdir2/resource2_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_super_very_long.xml"
|
|
)
|
|
gdal.Unlink("/vsimem/non_matching_resource.xml")
|
|
|
|
|
|
###############################################################################
|
|
# Test UTF-8 support
|
|
|
|
|
|
def test_ogr_gmlas_recoding():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_recoding.xml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if f["attr"] != "\u00e9":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test schema without namespace prefix
|
|
|
|
|
|
def test_ogr_gmlas_schema_without_namespace_prefix():
|
|
|
|
# Generic http:// namespace URI
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_schema_without_namespace_prefix_generic_http_uri.xsd"
|
|
],
|
|
)
|
|
lyr = ds.GetLayerByName("_ogr_layers_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if f["layer_xpath"] != "my_ns:main_elt":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_schema_without_namespace_prefix.xsd")
|
|
|
|
# http://www.opengis.net/ namespace URI
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_schema_without_namespace_prefix_opengis_uri.xsd"
|
|
],
|
|
)
|
|
lyr = ds.GetLayerByName("_ogr_layers_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if f["layer_xpath"] != "fake_3_0:main_elt":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_schema_without_namespace_prefix.xsd")
|
|
|
|
# Non http:// namespace URI
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_schema_without_namespace_prefix_non_http_uri.xsd"
|
|
],
|
|
)
|
|
lyr = ds.GetLayerByName("_ogr_layers_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if f["layer_xpath"] != "my_namespace:main_elt":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test parsing truncated XML
|
|
|
|
|
|
def test_ogr_gmlas_truncated_xml():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_truncated_xml.xml")
|
|
lyr = ds.GetLayer(0)
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
if f is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test identifier truncation
|
|
|
|
|
|
def test_ogr_gmlas_identifier_truncation():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_identifier_truncation.xsd",
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><IdentifierMaxLength>10</IdentifierMaxLength><PostgreSQLIdentifierLaundering>false</PostgreSQLIdentifierLaundering></LayerBuildingRules></Configuration>",
|
|
],
|
|
)
|
|
lyr = ds.GetLayerByName("v_l_i_clas")
|
|
assert lyr is not None, ds.GetLayer(0).GetName()
|
|
s = lyr.GetLayerDefn().GetFieldDefn(1).GetName()
|
|
assert s == "v_l_idTifi"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(2).GetName()
|
|
assert s == "an_lo_ide1"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(3).GetName()
|
|
assert s == "an_lo_ide2"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(4).GetName()
|
|
assert s == "x"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(5).GetName()
|
|
assert s == "noTCAMELCa"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(6).GetName()
|
|
assert s == "suuuuuuuuu"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(7).GetName()
|
|
assert s == "_r_l_o_n_g"
|
|
lyr = ds.GetLayerByName("a_l_i_cla1")
|
|
assert lyr is not None, ds.GetLayer(1).GetName()
|
|
lyr = ds.GetLayerByName("a_l_i_cla2")
|
|
assert lyr is not None, ds.GetLayer(2).GetName()
|
|
lyr = ds.GetLayerByName("y")
|
|
assert lyr is not None, ds.GetLayer(3).GetName()
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test behaviour when identifiers have same case
|
|
|
|
|
|
def test_ogr_gmlas_identifier_case_ambiguity():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=[
|
|
"XSD=data/gmlas/gmlas_identifier_case_ambiguity.xsd",
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><PostgreSQLIdentifierLaundering>false</PostgreSQLIdentifierLaundering></LayerBuildingRules></Configuration>",
|
|
],
|
|
)
|
|
lyr = ds.GetLayerByName("differentcase1")
|
|
assert lyr is not None, ds.GetLayer(0).GetName()
|
|
s = lyr.GetLayerDefn().GetFieldDefn(1).GetName()
|
|
assert s == "differentcase1"
|
|
s = lyr.GetLayerDefn().GetFieldDefn(2).GetName()
|
|
assert s == "DifferentCASE2"
|
|
lyr = ds.GetLayerByName("DifferentCASE2")
|
|
assert lyr is not None, ds.GetLayer(0).GetName()
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support
|
|
|
|
|
|
@pytest.mark.require_driver("SQLite")
|
|
def test_ogr_gmlas_writer():
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml", open_options=["EXPOSE_METADATA_LAYERS=YES"]
|
|
)
|
|
tmp_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer.db", src_ds, format="SQLite"
|
|
)
|
|
src_ds = None
|
|
ret_ds = gdal.VectorTranslate(
|
|
"tmp/gmlas_test1_generated.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["WRAPPING=GMLAS_FEATURECOLLECTION"],
|
|
)
|
|
tmp_ds = None
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer.db")
|
|
|
|
assert ret_ds is not None
|
|
|
|
|
|
###############################################################################
|
|
# Check the generated .xml and .xsd
|
|
|
|
|
|
@pytest.mark.require_driver("SQLite")
|
|
def test_ogr_gmlas_writer_check_xml_xsd():
|
|
|
|
got = open("tmp/gmlas_test1_generated.xml", "rt").read()
|
|
got = got.replace("\r\n", "\n")
|
|
pos = got.find("http://myns ") + len("http://myns ")
|
|
pos_end = got.find('"', pos)
|
|
absolute_xsd = got[pos:pos_end]
|
|
assert absolute_xsd.endswith("gmlas_test1.xsd") and os.path.exists(absolute_xsd)
|
|
got = got.replace(absolute_xsd, "gmlas_test1.xsd")
|
|
|
|
expected = open("data/gmlas/gmlas_test1_generated.xml", "rt").read()
|
|
expected = expected.replace("\r\n", "\n")
|
|
|
|
if got != expected:
|
|
print(got)
|
|
print("")
|
|
|
|
print("Diff:")
|
|
os.system(
|
|
"diff -u data/gmlas/gmlas_test1_generated.xml tmp/gmlas_test1_generated.xml"
|
|
)
|
|
pytest.fail("Got:")
|
|
|
|
got = open("tmp/gmlas_test1_generated.xsd", "rt").read()
|
|
got = got.replace("\r\n", "\n")
|
|
pos = got.find('schemaLocation="') + len('schemaLocation="')
|
|
pos_end = got.find('"', pos)
|
|
absolute_xsd = got[pos:pos_end]
|
|
assert absolute_xsd.endswith("gmlas_test1.xsd") and os.path.exists(absolute_xsd)
|
|
got = got.replace(absolute_xsd, "gmlas_test1.xsd")
|
|
|
|
expected = open("data/gmlas/gmlas_test1_generated.xsd", "rt").read()
|
|
expected = expected.replace("\r\n", "\n")
|
|
|
|
if got != expected:
|
|
print(got)
|
|
print("")
|
|
|
|
print("Diff:")
|
|
os.system(
|
|
"diff -u data/gmlas/gmlas_test1_generated.xsd tmp/gmlas_test1_generated.xsd"
|
|
)
|
|
pytest.fail("Got:")
|
|
|
|
|
|
###############################################################################
|
|
# Check that the .xml read back by the GMLAS driver has the same content
|
|
# as the original one.
|
|
|
|
|
|
@pytest.mark.require_driver("SQLite")
|
|
def test_ogr_gmlas_writer_check_xml_read_back():
|
|
|
|
# Skip tests when -fsanitize is used
|
|
if gdaltest.is_travis_branch("sanitize"):
|
|
pytest.skip("Skipping because of -sanitize")
|
|
|
|
import test_cli_utilities
|
|
|
|
if test_cli_utilities.get_ogrinfo_path() is None:
|
|
gdal.Unlink("tmp/gmlas_test1_generated.xml")
|
|
gdal.Unlink("tmp/gmlas_test1_generated.xsd")
|
|
pytest.skip()
|
|
|
|
# Compare the ogrinfo dump of the generated .xml with a reference one
|
|
ret = gdaltest.runexternal(
|
|
test_cli_utilities.get_ogrinfo_path()
|
|
+ " -ro -al GMLAS:tmp/gmlas_test1_generated.xml -oo VALIDATE=YES "
|
|
+ "-oo EXPOSE_METADATA_LAYERS=YES "
|
|
+ "-oo @KEEP_RELATIVE_PATHS_FOR_METADATA=YES "
|
|
+ "-oo @EXPOSE_SCHEMAS_NAME_IN_METADATA=NO "
|
|
+ "-oo @EXPOSE_CONFIGURATION_IN_METADATA=NO -oo @HASH=fake_hash"
|
|
)
|
|
expected = open("data/gmlas/gmlas_test1.txt", "rt").read()
|
|
expected = expected.replace("\r\n", "\n")
|
|
expected = expected.replace(
|
|
"data/gmlas/gmlas_test1.xml", "tmp/gmlas_test1_generated.xml"
|
|
)
|
|
expected = expected.replace(
|
|
"data/gmlas/gmlas_test1.xsd",
|
|
os.path.join(os.getcwd(), "data/gmlas/gmlas_test1.xsd"),
|
|
)
|
|
expected = expected.replace("\\", "/")
|
|
ret_for_comparison = ret.replace("\r\n", "\n")
|
|
ret_for_comparison = ret_for_comparison.replace("\\", "/")
|
|
ret_for_comparison = ret_for_comparison.replace(
|
|
"fake_hash", "3CF9893502A592E8CF5EA6EF3D8F8C7B"
|
|
)
|
|
|
|
if ret_for_comparison != expected:
|
|
print(open("tmp/gmlas_test1_generated.xml", "rt").read())
|
|
print("")
|
|
|
|
print("XSD:")
|
|
print(open("tmp/gmlas_test1_generated.xsd", "rt").read())
|
|
print("")
|
|
|
|
print("ogrinfo dump:")
|
|
print(ret)
|
|
print("")
|
|
|
|
open("tmp/gmlas_test1_generated_got.txt", "wt").write(ret_for_comparison)
|
|
open("tmp/gmlas_test1_generated_expected.txt", "wt").write(expected)
|
|
print("Diff:")
|
|
os.system(
|
|
"diff -u tmp/gmlas_test1_generated_expected.txt tmp/gmlas_test1_generated_got.txt"
|
|
)
|
|
|
|
os.unlink("tmp/gmlas_test1_generated_expected.txt")
|
|
os.unlink("tmp/gmlas_test1_generated_got.txt")
|
|
pytest.fail("XML:")
|
|
|
|
gdal.Unlink("tmp/gmlas_test1_generated.xml")
|
|
gdal.Unlink("tmp/gmlas_test1_generated.xsd")
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support with geometries
|
|
|
|
|
|
def test_ogr_gmlas_writer_gml():
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["EXPOSE_METADATA_LAYERS=YES", "@HASH=hash"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
# Test also with GMLAS: prefix as it is likely people might use it
|
|
# as it is needed for the read side.
|
|
ret_ds = gdal.VectorTranslate(
|
|
"GMLAS:/vsimem/ogr_gmlas_writer_gml.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=[
|
|
"WRAPPING=GMLAS_FEATURECOLLECTION",
|
|
"LAYERS={SPATIAL_LAYERS}",
|
|
],
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_gml.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xsd")
|
|
|
|
assert 'xmlns:gml="http://fake_gml32"' in content
|
|
|
|
assert (
|
|
'<ogr:geometryProperty><gml:Point srsName="http://www.opengis.net/def/crs/EPSG/0/4326" gml:id="hash_test_1.geom0"><gml:pos>49 2</gml:pos></gml:Point></ogr:geometryProperty>'
|
|
in content
|
|
)
|
|
|
|
assert (
|
|
'<ogr:pointProperty><gml:Point srsName="http://www.opengis.net/def/crs/EPSG/0/4326" gml:id="hash_test_1.geom2"><gml:pos>50 3</gml:pos></gml:Point></ogr:pointProperty>'
|
|
in content
|
|
)
|
|
|
|
assert (
|
|
' <ogr:pointPropertyRepeated><gml:Point gml:id="hash_test_1.geom13.0"><gml:pos>0 1</gml:pos></gml:Point></ogr:pointPropertyRepeated>'
|
|
in content
|
|
)
|
|
|
|
assert (
|
|
' <ogr:pointPropertyRepeated><gml:Point gml:id="hash_test_1.geom13.1"><gml:pos>1 2</gml:pos></gml:Point></ogr:pointPropertyRepeated>'
|
|
in content
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support with geometries and -a_srs
|
|
|
|
|
|
@pytest.mark.require_driver("SQLite")
|
|
def test_ogr_gmlas_writer_gml_assign_srs():
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["EXPOSE_METADATA_LAYERS=YES", "@HASH=hash"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer_gml.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
dstSRS="EPSG:32631",
|
|
reproject=False,
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_gml.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xsd")
|
|
|
|
assert "http://www.opengis.net/def/crs/EPSG/0/32631" in content
|
|
|
|
# No geometry, but to test that the proxied ExecuteSQL() works
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_test1.xml", open_options=["EXPOSE_METADATA_LAYERS=YES"]
|
|
)
|
|
tmp_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer.db", src_ds, format="SQLite"
|
|
)
|
|
src_ds = None
|
|
gdal.VectorTranslate(
|
|
"/vsimem/gmlas_test1_generated_ref0.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
dstSRS="EPSG:32631",
|
|
reproject=False,
|
|
datasetCreationOptions=["WRAPPING=GMLAS_FEATURECOLLECTION"],
|
|
)
|
|
gdal.VectorTranslate(
|
|
"/vsimem/gmlas_test1_generated_asrs.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
dstSRS="EPSG:32631",
|
|
reproject=False,
|
|
datasetCreationOptions=["WRAPPING=GMLAS_FEATURECOLLECTION"],
|
|
)
|
|
tmp_ds = None
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer.db")
|
|
|
|
assert (
|
|
gdal.VSIStatL("/vsimem/gmlas_test1_generated_ref0.xml").size
|
|
== gdal.VSIStatL("/vsimem/gmlas_test1_generated_asrs.xml").size
|
|
)
|
|
|
|
gdal.Unlink("/vsimem/gmlas_test1_generated_ref0.xml")
|
|
gdal.Unlink("/vsimem/gmlas_test1_generated_ref0.xsd")
|
|
gdal.Unlink("/vsimem/gmlas_test1_generated_asrs.xml")
|
|
gdal.Unlink("/vsimem/gmlas_test1_generated_asrs.xsd")
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support with geometries with original XML content preserved
|
|
|
|
|
|
def test_ogr_gmlas_writer_gml_original_xml():
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=[
|
|
"EXPOSE_METADATA_LAYERS=YES",
|
|
"CONFIG_FILE=<Configuration><LayerBuildingRules><GML><IncludeGeometryXML>true</IncludeGeometryXML></GML></LayerBuildingRules></Configuration>",
|
|
],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer_gml.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["WRAPPING=GMLAS_FEATURECOLLECTION"],
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
ret_ds = None
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_writer_gml.xml", open_options=["VALIDATE=YES"]
|
|
)
|
|
assert ds is not None and gdal.GetLastErrorMsg() == ""
|
|
ds = None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_gml.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xml")
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_gml.xsd")
|
|
|
|
assert (
|
|
'<ogr:geometryProperty> <gml:Point gml:id="poly.geom.Geometry" srsName="urn:ogc:def:crs:EPSG::4326"> <gml:pos>49 2</gml:pos> </gml:Point> </ogr:geometryProperty>'
|
|
in content
|
|
)
|
|
|
|
assert (
|
|
' <ogr:pointPropertyRepeated><gml:Point gml:id="poly.geom.pointPropertyRepeated.1"><gml:pos>0 1</gml:pos></gml:Point></ogr:pointPropertyRepeated>'
|
|
in content
|
|
)
|
|
|
|
assert (
|
|
' <ogr:pointPropertyRepeated><gml:Point gml:id="poly.geom.pointPropertyRepeated.2"><gml:pos>1 2</gml:pos></gml:Point></ogr:pointPropertyRepeated>'
|
|
in content
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support with XSD, INDENT_SIZE, COMMENT, OUTPUT_XSD_FILENAME, TIMESTAMP options
|
|
|
|
|
|
def test_ogr_gmlas_writer_options():
|
|
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["@HASH=hash"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer_options.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=[
|
|
"LAYERS=test",
|
|
"WRAPPING=GMLAS_FEATURECOLLECTION",
|
|
"INPUT_XSD=data/gmlas/gmlas_geometryproperty_gml32.xsd",
|
|
"INDENT_SIZE=4",
|
|
"COMMENT=---a comment---",
|
|
"SRSNAME_FORMAT=OGC_URN",
|
|
"OUTPUT_XSD_FILENAME=/vsimem/my_schema.xsd",
|
|
],
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_options.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_options.xml")
|
|
|
|
assert gdal.VSIStatL("/vsimem/my_schema.xsd") is not None
|
|
|
|
gdal.Unlink("/vsimem/my_schema.xsd")
|
|
|
|
# Test indentation size
|
|
assert '\n <ogr:test gml:id="poly.0">' in content
|
|
|
|
# Test comment
|
|
assert "\n<!-- - - -a comment- - - -->" in content
|
|
|
|
# Test OUTPUT_XSD_FILENAME
|
|
assert "/vsimem/my_schema.xsd" in content
|
|
|
|
# Test SRSNAME_FORMAT=OGC_URN
|
|
assert (
|
|
'<ogr:geometryProperty><gml:Point srsName="urn:ogc:def:crs:EPSG::4326" gml:id="hash_test_1.geom0"><gml:pos>49 2</gml:pos></gml:Point></ogr:geometryProperty>'
|
|
in content
|
|
)
|
|
|
|
# Test TIMESTAMP option
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["@HASH=hash", "EXPOSE_METADATA_LAYERS=YES"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer_options.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=[
|
|
"TIMESTAMP=1970-01-01T12:34:56Z",
|
|
"@REOPEN_DATASET_WITH_GMLAS=NO",
|
|
],
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_options.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_options.xml")
|
|
|
|
assert gdal.VSIStatL("/vsimem/my_schema.xsd") is None
|
|
|
|
assert (
|
|
'timeStamp="1970-01-01T12:34:56Z"' in content
|
|
and 'xsi:schemaLocation="http://www.opengis.net/wfs/2.0 http://schemas.opengis.net/wfs/2.0/wfs.xsd '
|
|
in content
|
|
)
|
|
|
|
# Test WFS20_SCHEMALOCATION option
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["@HASH=hash", "EXPOSE_METADATA_LAYERS=YES"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/ogr_gmlas_writer_options.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["WFS20_SCHEMALOCATION=/vsimem/fake_wfs.xsd"],
|
|
)
|
|
tmp_ds = None
|
|
|
|
assert ret_ds is not None
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/fake_wfs.xsd",
|
|
"""
|
|
<!-- fake wfs schema enough for our purposes -->
|
|
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
|
|
targetNamespace="http://www.opengis.net/wfs/2.0"
|
|
elementFormDefault="qualified">
|
|
<xs:element name="FeatureCollection">
|
|
<xs:complexType>
|
|
<xs:sequence>
|
|
<xs:element name="member" minOccurs="0" maxOccurs="unbounded"/>
|
|
</xs:sequence>
|
|
<xs:attribute name="timeStamp" type="xs:dateTime" use="required"/>
|
|
<xs:attribute name="numberMatched" type="xs:string" fixed="unknown" use="required"/>
|
|
<xs:attribute name="numberReturned" type="xs:nonNegativeInteger" use="required"/>
|
|
</xs:complexType>
|
|
</xs:element>
|
|
</xs:schema>
|
|
""",
|
|
)
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:/vsimem/ogr_gmlas_writer_options.xml", open_options=["VALIDATE=YES"]
|
|
)
|
|
gdal.Unlink("/vsimem/fake_wfs.xsd")
|
|
|
|
assert ds is not None and gdal.GetLastErrorMsg() == ""
|
|
ds = None
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/ogr_gmlas_writer_options.xml", "rb")
|
|
assert f is not None
|
|
content = gdal.VSIFReadL(1, 10000, f).decode("utf-8")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/ogr_gmlas_writer_options.xml")
|
|
|
|
assert gdal.VSIStatL("/vsimem/my_schema.xsd") is None
|
|
|
|
assert (
|
|
'xsi:schemaLocation="http://www.opengis.net/wfs/2.0 /vsimem/fake_wfs.xsd '
|
|
in content
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test writing support error handle
|
|
|
|
|
|
def test_ogr_gmlas_writer_errors():
|
|
|
|
# Source dataset is empty
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/valid.xml",
|
|
gdal.GetDriverByName("Memory").Create("", 0, 0, 0, 0),
|
|
format="GMLAS",
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("Source dataset has no layers") >= 0
|
|
)
|
|
|
|
# Missing input schemas
|
|
src_ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml")
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate("/vsimem/valid.xml", tmp_ds, format="GMLAS")
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find(
|
|
"Cannot establish schema since no INPUT_XSD creation option specified and no _ogr_other_metadata found in source dataset"
|
|
)
|
|
>= 0
|
|
)
|
|
|
|
# Invalid input schema
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/valid.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["INPUT_XSD=/i_do_not/exist.xsd"],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("Cannot resolve /i_do_not/exist.xsd") >= 0
|
|
)
|
|
|
|
# Invalid output .xml name
|
|
src_ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_geometryproperty_gml32_no_error.gml",
|
|
open_options=["EXPOSE_METADATA_LAYERS=YES"],
|
|
)
|
|
tmp_ds = gdal.VectorTranslate("", src_ds, format="Memory")
|
|
src_ds = None
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/i_am/not/valid.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["GENERATE_XSD=NO"],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("Cannot create /i_am/not/valid.xml") >= 0
|
|
)
|
|
|
|
# .xsd extension not allowed
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/i_am/not/valid.xsd",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["GENERATE_XSD=NO"],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find(".xsd extension is not valid") >= 0
|
|
)
|
|
|
|
# Invalid output .xsd name
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/valid.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=[
|
|
"WRAPPING=GMLAS_FEATURECOLLECTION",
|
|
"OUTPUT_XSD_FILENAME=/i_am/not/valid.xsd",
|
|
],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("Cannot create /i_am/not/valid.xsd") >= 0
|
|
)
|
|
gdal.Unlink("/vsimem/valid.xml")
|
|
|
|
# Invalid CONFIG_FILE
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/valid.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["CONFIG_FILE=/i/do_not/exist"],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("Loading of configuration failed") >= 0
|
|
)
|
|
|
|
# Invalid layer name
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate(
|
|
"/vsimem/valid.xml",
|
|
tmp_ds,
|
|
format="GMLAS",
|
|
datasetCreationOptions=["LAYERS=foo"],
|
|
)
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find(
|
|
"Layer foo specified in LAYERS option does not exist"
|
|
)
|
|
>= 0
|
|
)
|
|
gdal.Unlink("/vsimem/valid.xml")
|
|
|
|
# _ogr_layers_metadata not found
|
|
src_ds = gdal.GetDriverByName("Memory").Create("", 0, 0, 0, 0)
|
|
src_ds.CreateLayer("_ogr_other_metadata")
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate("/vsimem/valid.xml", src_ds, format="GMLAS")
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("_ogr_layers_metadata not found") >= 0
|
|
)
|
|
|
|
# _ogr_fields_metadata not found
|
|
src_ds = gdal.GetDriverByName("Memory").Create("", 0, 0, 0, 0)
|
|
src_ds.CreateLayer("_ogr_other_metadata")
|
|
src_ds.CreateLayer("_ogr_layers_metadata")
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate("/vsimem/valid.xml", src_ds, format="GMLAS")
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("_ogr_fields_metadata not found") >= 0
|
|
)
|
|
|
|
# _ogr_layer_relationships not found
|
|
src_ds = gdal.GetDriverByName("Memory").Create("", 0, 0, 0, 0)
|
|
src_ds.CreateLayer("_ogr_other_metadata")
|
|
src_ds.CreateLayer("_ogr_layers_metadata")
|
|
src_ds.CreateLayer("_ogr_fields_metadata")
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate("/vsimem/valid.xml", src_ds, format="GMLAS")
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find("_ogr_layer_relationships not found") >= 0
|
|
)
|
|
|
|
# Cannot find field layer_name in _ogr_layers_metadata layer
|
|
src_ds = gdal.GetDriverByName("Memory").Create("", 0, 0, 0, 0)
|
|
src_ds.CreateLayer("_ogr_other_metadata")
|
|
src_ds.CreateLayer("_ogr_layers_metadata")
|
|
src_ds.CreateLayer("_ogr_fields_metadata")
|
|
src_ds.CreateLayer("_ogr_layer_relationships")
|
|
with gdaltest.error_handler():
|
|
ret_ds = gdal.VectorTranslate("/vsimem/valid.xml", src_ds, format="GMLAS")
|
|
assert (
|
|
ret_ds is None
|
|
and gdal.GetLastErrorMsg().find(
|
|
"Cannot find field layer_name in _ogr_layers_metadata layer"
|
|
)
|
|
>= 0
|
|
)
|
|
gdal.Unlink("/vsimem/valid.xml")
|
|
gdal.Unlink("/vsimem/valid.xsd")
|
|
|
|
|
|
###############################################################################
|
|
# Test reading a particular construct with group, etc... that could cause
|
|
# crashes
|
|
|
|
|
|
def test_ogr_gmlas_read_fake_gmljp2():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/fake_gmljp2.xml")
|
|
|
|
count = 0
|
|
while True:
|
|
f, lyr = ds.GetNextFeature()
|
|
if f is None:
|
|
assert lyr is None
|
|
break
|
|
count += 1
|
|
|
|
assert count == 5
|
|
|
|
|
|
###############################################################################
|
|
# Test TypingConstraints
|
|
|
|
|
|
def test_ogr_gmlas_typing_constraints():
|
|
|
|
# One substitution, no repetition
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_typing_constraints_one_subst_no_repetition.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<TypingConstraints>
|
|
<Namespaces>
|
|
<Namespace prefix="myns_modified_for_fun" uri="http://myns"/>
|
|
</Namespaces>
|
|
<ChildConstraint>
|
|
<ContainerXPath>myns_modified_for_fun:main_elt/myns_modified_for_fun:foo</ContainerXPath>
|
|
<ChildrenElements>
|
|
<Element>myns_modified_for_fun:bar</Element>
|
|
</ChildrenElements>
|
|
</ChildConstraint>
|
|
</TypingConstraints>
|
|
</Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if not f.IsFieldSetAndNotNull("foo_bar_pkid"):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
lyr = ds.GetLayer(1)
|
|
f = lyr.GetNextFeature()
|
|
if f.GetField("value") != "baz":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# One substitution, with repetition
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_typing_constraints_one_subst_with_repetition.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<TypingConstraints>
|
|
<Namespaces>
|
|
<Namespace prefix="myns_modified_for_fun" uri="http://myns"/>
|
|
</Namespaces>
|
|
<ChildConstraint>
|
|
<ContainerXPath>myns_modified_for_fun:main_elt/myns_modified_for_fun:foo</ContainerXPath>
|
|
<ChildrenElements>
|
|
<Element>myns_modified_for_fun:bar</Element>
|
|
</ChildrenElements>
|
|
</ChildConstraint>
|
|
</TypingConstraints>
|
|
</Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer("main_elt_foo_bar")
|
|
assert lyr.GetFeatureCount() == 2
|
|
lyr = ds.GetLayer("bar")
|
|
f = lyr.GetNextFeature()
|
|
if f.GetField("value") != "baz":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f.GetField("value") != "baz2":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
# 2 substitutions
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_typing_constraints_two_subst.xml",
|
|
open_options=[
|
|
"""CONFIG_FILE=<Configuration>
|
|
<TypingConstraints>
|
|
<Namespaces>
|
|
<Namespace prefix="myns_modified_for_fun" uri="http://myns"/>
|
|
</Namespaces>
|
|
<ChildConstraint>
|
|
<ContainerXPath>myns_modified_for_fun:main_elt/myns_modified_for_fun:foo</ContainerXPath>
|
|
<ChildrenElements>
|
|
<Element>myns_modified_for_fun:bar</Element>
|
|
<Element>myns_modified_for_fun:baz</Element>
|
|
</ChildrenElements>
|
|
</ChildConstraint>
|
|
</TypingConstraints>
|
|
</Configuration>"""
|
|
],
|
|
)
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
if not f.IsFieldSetAndNotNull("foo_bar_pkid"):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if f.IsFieldSetAndNotNull("foo_baz_pkid"):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
lyr = ds.GetLayer(1)
|
|
f = lyr.GetNextFeature()
|
|
if f.GetField("value") != "baz":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test swe:DataArray
|
|
|
|
|
|
def test_ogr_gmlas_swe_dataarray():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_swe_dataarray.xml")
|
|
|
|
lyr = ds.GetLayerByName("dataarray_1_components")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
not f.IsFieldSetAndNotNull("parent_ogr_pkid")
|
|
or f.GetField("myTime") != "2016/09/01 00:00:00+01"
|
|
or f.GetField("myCategory") != "1"
|
|
or f.GetField("myQuantity") != 2.34
|
|
or f.GetField("myCount") != 3
|
|
or f.GetField("myText") != "foo"
|
|
or f.GetField("myBoolean") is False
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("myTime") != "2017/09/01 00:00:00"
|
|
or f.GetField("myCategory") != "2"
|
|
or f.GetField("myQuantity") != 3.45
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("dataarray_2")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("myTime") != "2016/09/01 00:00:00+01"
|
|
or f.GetField("myCategory") != "1"
|
|
or f.GetField("myQuantity") != 2.34
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("myTime") != "2017/09/01 00:00:00"
|
|
or f.GetField("myCategory") is not None
|
|
or f.GetField("myQuantity") != 3.45
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("dataarray_3")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("myTime") != "2016/09/01 00:00:00+01"
|
|
or f.GetField("myCategory") != "1"
|
|
or f.GetField("myQuantity") != 2.34
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("myTime") != "2017/09/01 00:00:00"
|
|
or f.GetField("myCategory") is not None
|
|
or f.GetField("myQuantity") != 3.45
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
f = lyr.GetNextFeature()
|
|
if f is not None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test swe:DataRecord
|
|
|
|
|
|
def test_ogr_gmlas_swe_datarecord():
|
|
|
|
gdal.ErrorReset()
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:data/gmlas/gmlas_swe_datarecord.xml", open_options=["VALIDATE=YES"]
|
|
)
|
|
assert gdal.GetLastErrorMsg() == ""
|
|
ds = None
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_swe_datarecord.xml")
|
|
lyr = ds.GetLayerByName("main_elt_foo")
|
|
assert lyr.GetLayerDefn().GetFieldCount() == 12
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f.GetField("mytime_value") != "2017/09/01 00:00:00"
|
|
or f.GetField("mycategory_value") != "myvalue"
|
|
or f.GetField("mycategory_identifier") != "myidentifier"
|
|
or f.GetField("mycategory_codespace_href") != "http://example.com"
|
|
or f.GetField("myquantity_value") != 1.23
|
|
or f.GetField("mycount_value") != 2
|
|
or f.GetField("mytext_value") != "foo"
|
|
or f.GetField("myboolean_value") is False
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test a xs:any field at end of a type declaration
|
|
|
|
|
|
def test_ogr_gmlas_any_field_at_end_of_declaration():
|
|
|
|
# Simplified test case for
|
|
# http://schemas.earthresourceml.org/earthresourceml-lite/1.0/erml-lite.xsd
|
|
# http://services.ga.gov.au/earthresource/ows?service=wfs&version=2.0.0&request=GetFeature&typenames=erl:CommodityResourceView&count=10
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_any_field_at_end_of_declaration.xml")
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
# Will warn about 'Unexpected element with xpath=main_elt/extra (subxpath=main_elt/extra) found'
|
|
# This should be fixed at some point
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = lyr.GetNextFeature()
|
|
assert gdal.GetLastErrorMsg() != ""
|
|
if f.GetField("foo") != "bar":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
if f.GetField("value") != "<something>baz</something>":
|
|
print("Expected fail: value != <something>baz</something>")
|
|
|
|
|
|
###############################################################################
|
|
# Test auxiliary schema without namespace prefix
|
|
|
|
|
|
def test_ogr_gmlas_aux_schema_without_namespace_prefix():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_aux_schema_without_namespace_prefix.xml")
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
f = lyr.GetNextFeature()
|
|
if not f.IsFieldSetAndNotNull("generic_pkid"):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test importing a GML geometry that is in an element that is a substitutionGroup
|
|
# of another one (#6990)
|
|
|
|
|
|
def test_ogr_gmlas_geometry_as_substitutiongroup():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_geometry_as_substitutiongroup.xml")
|
|
lyr = ds.GetLayerByName("foo")
|
|
f = lyr.GetNextFeature()
|
|
if f.GetGeometryRef() is None:
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
# Test importing a GML geometry whose coordinates elements has leading spaces
|
|
|
|
|
|
def test_ogr_gmlas_coordinates_with_leading_space():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/coordinates_with_leading_space.gml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
assert (
|
|
f.GetGeometryRef().ExportToIsoWkt()
|
|
== "POLYGON Z ((1372074.89354568 6205942.4974615 40.4258117361937,1372074.55352024 6205942.4356387 40.4258117361937,1372074.50715313 6205942.69065778 40.4258117361937,1372074.84717857 6205942.75248059 40.4258117361937,1372074.89354568 6205942.4974615 40.4258117361937))"
|
|
)
|
|
ds = None
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_gmlas_extra_piezometre():
|
|
|
|
return compare_ogrinfo_output(
|
|
"data/gmlas/real_world/Piezometre.06512X0037.STREMY.2.gml",
|
|
"data/gmlas/real_world/output/Piezometre.06512X0037.STREMY.2.txt",
|
|
options="-oo REMOVE_UNUSED_LAYERS=YES",
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
@pytest.mark.require_run_on_demand
|
|
def test_ogr_gmlas_extra_eureg():
|
|
|
|
return compare_ogrinfo_output(
|
|
"data/gmlas/real_world/EUReg.example.gml",
|
|
"data/gmlas/real_world/output/EUReg.example.txt",
|
|
options="-oo REMOVE_UNUSED_LAYERS=YES",
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test a schema that has nothing interesting in it but imports another
|
|
# schema
|
|
|
|
|
|
def test_ogr_gmlas_no_element_in_first_choice_schema():
|
|
|
|
ds = gdal.OpenEx(
|
|
"GMLAS:",
|
|
open_options=["XSD=data/gmlas/gmlas_no_element_in_first_choice_schema.xsd"],
|
|
)
|
|
lyr = ds.GetLayerByName("_ogr_layers_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if f["layer_xpath"] != "my_ns:main_elt":
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test cross-layer links with xlink:href="#my_id"
|
|
|
|
|
|
def test_ogr_gmlas_internal_xlink_href():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_internal_xlink_href.xml")
|
|
lyr = ds.GetLayerByName("main_elt")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["link_to_second_or_third_elt_href"] != "#does_not_exist"
|
|
or f.IsFieldSet("link_to_second_or_third_elt_second_elt_pkid")
|
|
or f.IsFieldSet("link_to_second_or_third_elt_third_elt_pkid")
|
|
or f.IsFieldSet("link_to_third_elt_third_elt_pkid")
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["link_to_second_or_third_elt_href"] != "#id2"
|
|
or f["link_to_second_or_third_elt_second_elt_pkid"] != "id2"
|
|
or f.IsFieldSet("link_to_second_or_third_elt_third_elt_pkid")
|
|
or f.IsFieldSet("link_to_third_elt_third_elt_pkid")
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["link_to_second_or_third_elt_href"] != "#id3"
|
|
or f["link_to_second_or_third_elt_second_elt_pkid"] != "id3"
|
|
or f.IsFieldSet("link_to_second_or_third_elt_third_elt_pkid")
|
|
or f.IsFieldSet("link_to_third_elt_third_elt_pkid")
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["link_to_second_or_third_elt_href"] != "#id4"
|
|
or f.IsFieldSet("link_to_second_or_third_elt_second_elt_pkid")
|
|
or f["link_to_second_or_third_elt_third_elt_pkid"]
|
|
!= "D1013B7E44F28C976B976A4314FA4A09_third_elt_1"
|
|
or f.IsFieldSet("link_to_third_elt_third_elt_pkid")
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["link_to_third_elt_href"] != "#id4"
|
|
or f.IsFieldSet("link_to_second_or_third_elt_second_elt_pkid")
|
|
or f.IsFieldSet("link_to_second_or_third_elt_third_elt_pkid")
|
|
or f["link_to_third_elt_third_elt_pkid"]
|
|
!= "D1013B7E44F28C976B976A4314FA4A09_third_elt_1"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("_ogr_fields_metadata")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 1
|
|
or f["field_name"] != "link_to_second_or_third_elt_href"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 2
|
|
or f["field_name"] != "link_to_second_or_third_elt_second_elt_pkid"
|
|
or f["field_xpath"] != "main_elt/link_to_second_or_third_elt/second_elt"
|
|
or f["field_type"] != "string"
|
|
or f["field_is_list"] != 0
|
|
or f["field_min_occurs"] != 0
|
|
or f["field_max_occurs"] != 1
|
|
or f["field_category"] != "PATH_TO_CHILD_ELEMENT_WITH_LINK"
|
|
or f["field_related_layer"] != "second_elt"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 3
|
|
or f["field_name"] != "link_to_second_or_third_elt_third_elt_pkid"
|
|
or f["field_xpath"] != "main_elt/link_to_second_or_third_elt/third_elt"
|
|
or f["field_type"] != "string"
|
|
or f["field_is_list"] != 0
|
|
or f["field_min_occurs"] != 0
|
|
or f["field_max_occurs"] != 1
|
|
or f["field_category"] != "PATH_TO_CHILD_ELEMENT_WITH_LINK"
|
|
or f["field_related_layer"] != "third_elt"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 4
|
|
or f["field_name"] != "link_to_second_or_third_elt"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 5
|
|
or f["field_name"] != "link_to_third_elt_href"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "main_elt"
|
|
or f["field_index"] != 6
|
|
or f["field_name"] != "link_to_third_elt_third_elt_pkid"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["layer_name"] != "third_elt"
|
|
or f["field_index"] != 1
|
|
or f["field_name"] != "id"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
lyr = ds.GetLayerByName("_ogr_layer_relationships")
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["parent_layer"] != "main_elt"
|
|
or f["parent_pkid"] != "ogr_pkid"
|
|
or f["parent_element_name"] != "link_to_third_elt_third_elt_pkid"
|
|
or f["child_layer"] != "third_elt"
|
|
or f["child_pkid"] != "ogr_pkid"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["parent_layer"] != "main_elt"
|
|
or f["parent_pkid"] != "ogr_pkid"
|
|
or f["parent_element_name"] != "link_to_second_or_third_elt_second_elt_pkid"
|
|
or f["child_layer"] != "second_elt"
|
|
or f["child_pkid"] != "id"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
f = lyr.GetNextFeature()
|
|
if (
|
|
f["parent_layer"] != "main_elt"
|
|
or f["parent_pkid"] != "ogr_pkid"
|
|
or f["parent_element_name"] != "link_to_second_or_third_elt_third_elt_pkid"
|
|
or f["child_layer"] != "third_elt"
|
|
or f["child_pkid"] != "ogr_pkid"
|
|
):
|
|
f.DumpReadable()
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test opening a file whose .xsd has a bad version attribute in the <?xml processing instruction
|
|
|
|
|
|
def test_ogr_gmlas_invalid_version_xsd():
|
|
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/gmlas_invalid_version_xsd.xml")
|
|
assert ds is None
|
|
|
|
|
|
###############################################################################
|
|
# Test opening a file whose .xsd leads to huge memory allocation
|
|
# Related to https://issues.apache.org/jira/browse/XERCESC-1051
|
|
|
|
|
|
def test_ogr_gmlas_huge_memory_allocation():
|
|
|
|
with gdaltest.config_option("OGR_GMLAS_XERCES_MAX_TIME", "0"):
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/test_max_mem_xerces.xml")
|
|
assert ds is None
|
|
|
|
|
|
###############################################################################
|
|
# Test opening a file whose .xsd leads to huge processing time
|
|
# Related to https://issues.apache.org/jira/browse/XERCESC-1051
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"SKIP_OGR_GMLAS_HUGE_PROCESSING_TIME" in os.environ,
|
|
reason="test skipped on CI due to random crash on Windows Conda builds with parallel ctest",
|
|
)
|
|
def test_ogr_gmlas_huge_processing_time():
|
|
|
|
with gdaltest.config_option("OGR_GMLAS_XERCES_MAX_TIME", "0.5"):
|
|
with gdaltest.error_handler():
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/test_max_time_xerces.xml")
|
|
assert ds is None
|
|
|
|
|
|
###############################################################################
|
|
# Test opening a file whose .xsd points to GML schemas to a non-OGC official
|
|
# location, which causes some issues when the other namespaces imported by
|
|
# the GML schemas are not edited to point to the unofficial location.
|
|
|
|
|
|
def test_ogr_gmlas_schema_not_in_ogc_schema_location():
|
|
|
|
gdal.ErrorReset()
|
|
assert (
|
|
gdal.OpenEx("GMLAS:data/gmlas/test_gml_schema_not_in_ogc_schema_location.gml")
|
|
is not None
|
|
)
|
|
assert gdal.GetLastErrorMsg() == ""
|
|
|
|
|
|
###############################################################################
|
|
# Test reading a file with srsDimension="3" only on top gml:Envelope (#6986)
|
|
|
|
|
|
def test_ogr_gmlas_read_srsDimension_3_on_top_gml_Envelope():
|
|
|
|
ds = gdal.OpenEx("GMLAS:data/gmlas/global_srsDimension_3.gml")
|
|
lyr = ds.GetLayer(0)
|
|
f = lyr.GetNextFeature()
|
|
assert (
|
|
f.GetGeometryRef().ExportToIsoWkt()
|
|
== "LINESTRING Z (1 2 3,4 5 6,7 8 9,10 11 12)"
|
|
)
|