gdal/autotest/ogr/ogr_vrt.py

3541 строка
107 KiB
Python
Исполняемый файл

#!/usr/bin/env pytest
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR Test Suite
# Purpose: Test OGR VRT driver functionality.
# Author: Frank Warmerdam <warmerdam@pobox.com>
#
###############################################################################
# Copyright (c) 2003, Frank Warmerdam <warmerdam@pobox.com>
# Copyright (c) 2009-2014, Even Rouault <even dot rouault at spatialys.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
###############################################################################
import os
import gdaltest
import ogrtest
import pytest
import test_cli_utilities
from osgeo import gdal, ogr, osr
pytestmark = pytest.mark.require_driver("OGR_VRT")
###############################################################################
@pytest.fixture(autouse=True, scope="module")
def module_disable_exceptions():
with gdaltest.disable_exceptions():
yield
###############################################################################
# Open VRT datasource.
def test_ogr_vrt_1():
with gdaltest.error_handler():
# Complains about dummySrcDataSource as expected.
gdaltest.vrt_ds = ogr.Open("data/vrt/vrt_test.vrt")
if gdaltest.vrt_ds is not None:
return
pytest.fail()
###############################################################################
# Verify the geometries, in the "test2" layer based on x,y,z columns.
#
# Also tests FID-copied-from-source.
def test_ogr_vrt_2():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test2")
extent = lyr.GetExtent()
assert extent == (12.5, 100.0, 17.0, 200.0), "wrong extent"
expect = ["First", "Second"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT(12.5 17 1.2)", max_error=0.000000001
)
== 0
)
assert feat.GetFID() == 0, "Unexpected fid"
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT Z (100 200 0)", max_error=0.000000001
)
== 0
), feat.GetGeometryRef().ExportToIsoWkt()
assert feat.GetFID() == 1, "Unexpected fid"
###############################################################################
# Same test on layer 3 derived from WKT column.
#
# Also tests FID-from-attribute.
def test_ogr_vrt_3():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test3")
expect = ["First", "Second"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT(12.5 17 1.2)", max_error=0.000000001
)
== 0
)
assert feat.GetFID() == 1, "Unexpected fid"
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(feat, "POINT(100 200 0)", max_error=0.000000001)
== 0
), feat.GetGeometryRef().ExportToIsoWkt()
assert feat.GetFID() == 2, "Unexpected fid"
###############################################################################
# Test a spatial query.
def test_ogr_vrt_4():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test3")
lyr.ResetReading()
lyr.SetSpatialFilterRect(90, 90, 300, 300)
expect = ["Second"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(feat, "POINT(100 200 0)", max_error=0.000000001)
== 0
), feat.GetGeometryRef().ExportToIsoWkt()
lyr.SetSpatialFilter(None)
###############################################################################
# Test an attribute query.
def test_ogr_vrt_5():
lyr = gdaltest.vrt_ds.GetLayerByName("test3")
lyr.ResetReading()
lyr.SetAttributeFilter("x < 50")
expect = ["First"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT(12.5 17 1.2)", max_error=0.000000001
)
== 0
)
lyr.SetAttributeFilter(None)
###############################################################################
# Test GetFeature() on layer with FID coming from a column.
def test_ogr_vrt_6():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test3")
lyr.ResetReading()
feat = lyr.GetFeature(2)
assert feat.GetField("other") == "Second", "GetFeature() did not work properly."
###############################################################################
# Same as test 3, but on the result of an SQL query.
#
def test_ogr_vrt_7():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test4")
expect = ["First", "Second"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT(12.5 17 1.2)", max_error=0.000000001
)
== 0
)
assert feat.GetFID() == 1, "Unexpected fid"
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(feat, "POINT(100 200 0)", max_error=0.000000001)
== 0
), feat.GetGeometryRef().ExportToIsoWkt()
assert feat.GetFID() == 2, "Unexpected fid"
###############################################################################
# Similar test, but now we put the whole VRT contents directly into the
# "filename".
#
def test_ogr_vrt_8():
if gdaltest.vrt_ds is None:
pytest.skip()
vrt_xml = '<OGRVRTDataSource><OGRVRTLayer name="test4"><SrcDataSource relativeToVRT="0">data/flat.dbf</SrcDataSource><SrcSQL>SELECT * FROM flat</SrcSQL><FID>fid</FID><GeometryType>wkbPoint</GeometryType><GeometryField encoding="PointFromColumns" x="x" y="y" z="z"/></OGRVRTLayer></OGRVRTDataSource>'
ds = ogr.Open(vrt_xml)
lyr = ds.GetLayerByName("test4")
expect = ["First", "Second"]
tr = ogrtest.check_features_against_list(lyr, "other", expect)
assert tr
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(
feat, "POINT(12.5 17 1.2)", max_error=0.000000001
)
== 0
)
assert feat.GetFID() == 1, "Unexpected fid"
feat = lyr.GetNextFeature()
assert (
ogrtest.check_feature_geometry(feat, "POINT(100 200 0)", max_error=0.000000001)
== 0
), feat.GetGeometryRef().ExportToIsoWkt()
assert feat.GetFID() == 2, "Unexpected fid"
###############################################################################
# Test that attribute filters are passed through to an underlying layer.
def test_ogr_vrt_9():
if gdaltest.vrt_ds is None:
pytest.skip()
lyr = gdaltest.vrt_ds.GetLayerByName("test3")
lyr.SetAttributeFilter("other = 'Second'")
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert feat.GetField("other") == "Second", "attribute filter did not work."
sub_ds = ogr.OpenShared("data/flat.dbf")
sub_layer = sub_ds.GetLayerByName("flat")
sub_layer.ResetReading()
assert sub_layer.GetFeatureCount() == 1, "attribute filter not passed to sublayer."
lyr.SetAttributeFilter(None)
sub_ds.Release()
sub_ds = None
###############################################################################
# Test capabilities
#
def test_ogr_vrt_10():
if gdaltest.vrt_ds is None:
pytest.skip()
vrt_xml = '<OGRVRTDataSource><OGRVRTLayer name="test"><SrcDataSource relativeToVRT="0">data/shp/testpoly.shp</SrcDataSource><SrcLayer>testpoly</SrcLayer></OGRVRTLayer></OGRVRTDataSource>'
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("test")
src_ds = ogr.Open("data/shp/testpoly.shp")
src_lyr = src_ds.GetLayer(0)
assert vrt_lyr.TestCapability(ogr.OLCFastFeatureCount) == src_lyr.TestCapability(
ogr.OLCFastFeatureCount
)
assert vrt_lyr.TestCapability(ogr.OLCFastGetExtent) == src_lyr.TestCapability(
ogr.OLCFastGetExtent
)
assert vrt_lyr.TestCapability(ogr.OLCRandomRead) == src_lyr.TestCapability(
ogr.OLCRandomRead
)
###############################################################################
# Test VRT write capabilities with PointFromColumns geometries
# Test also the reportGeomSrcColumn attribute
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_11():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("x,val1,y,val2,style\n".encode("ascii"))
f.write(
'2,"val11",49,"val12","PEN(c:#FF0000,w:5pt,p:""2px 1pt"")"\n'.encode("ascii")
)
f.close()
try:
os.remove("tmp/test.csvt")
except OSError:
pass
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="PointFromColumns" x="x" y="y" reportSrcColumn="false"/>
<Style>style</Style>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml, update=1)
vrt_lyr = vrt_ds.GetLayerByName("test")
# Only val1, val2, style attributes should be reported
assert vrt_lyr.GetLayerDefn().GetFieldCount() == 3
assert vrt_lyr.GetLayerDefn().GetFieldDefn(0).GetNameRef() == "val1"
assert vrt_lyr.GetLayerDefn().GetFieldDefn(1).GetNameRef() == "val2"
feat = vrt_lyr.GetNextFeature()
if feat.GetStyleString() != 'PEN(c:#FF0000,w:5pt,p:"2px 1pt")':
feat.DumpReadable()
pytest.fail()
feat = ogr.Feature(vrt_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (3 50)")
feat.SetGeometryDirectly(geom)
feat.SetField("val1", "val21")
vrt_lyr.CreateFeature(feat)
vrt_lyr.ResetReading()
feat = vrt_lyr.GetFeature(2)
geom = feat.GetGeometryRef()
if geom.ExportToWkt() != "POINT (3 50)":
feat.DumpReadable()
pytest.fail()
assert feat.GetFieldAsString("val1") == "val21"
# The x and y fields are considered as string by default, so spatial
# filter cannot be turned into attribute filter
with gdaltest.error_handler():
vrt_lyr.SetSpatialFilterRect(0, 40, 10, 49.5)
ret = vrt_lyr.GetFeatureCount()
assert gdal.GetLastErrorMsg().find("not declared as numeric fields") != -1
assert ret == 1
vrt_ds = None
# Add a .csvt file to specify the x and y columns as reals
f = open("tmp/test.csvt", "wb")
f.write("Real,String,Real,String\n".encode("ascii"))
f.close()
vrt_ds = ogr.Open(vrt_xml, update=1)
vrt_lyr = vrt_ds.GetLayerByName("test")
vrt_lyr.SetSpatialFilterRect(0, 40, 10, 49.5)
assert vrt_lyr.GetFeatureCount() == 1
assert gdal.GetLastErrorMsg() == ""
vrt_lyr.SetAttributeFilter("1 = 1")
assert vrt_lyr.GetFeatureCount() == 1
vrt_lyr.SetAttributeFilter("1 = 0")
assert vrt_lyr.GetFeatureCount() == 0
vrt_ds = None
os.remove("tmp/test.csv")
os.remove("tmp/test.csvt")
###############################################################################
# Test VRT write capabilities with WKT geometries
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_12():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("wkt_geom,val1,val2\n".encode("ascii"))
f.write('POINT (2 49),"val11","val12"\n'.encode("ascii"))
f.close()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="WKT" field="wkt_geom"/>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml, update=1)
vrt_lyr = vrt_ds.GetLayerByName("test")
feat = ogr.Feature(vrt_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (3 50)")
feat.SetGeometryDirectly(geom)
feat.SetField("val1", "val21")
vrt_lyr.CreateFeature(feat)
vrt_lyr.ResetReading()
feat = vrt_lyr.GetFeature(2)
geom = feat.GetGeometryRef()
assert geom.ExportToWkt() == "POINT (3 50)"
assert feat.GetFieldAsString("val1") == "val21"
vrt_ds = None
os.remove("tmp/test.csv")
###############################################################################
# Test VRT write capabilities with WKB geometries
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_13():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("wkb_geom,val1,val2\n".encode("ascii"))
f.close()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="WKB" field="wkb_geom"/>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml, update=1)
vrt_lyr = vrt_ds.GetLayerByName("test")
feat = ogr.Feature(vrt_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (3 50)")
feat.SetGeometryDirectly(geom)
feat.SetField("val1", "val21")
vrt_lyr.CreateFeature(feat)
vrt_lyr.ResetReading()
feat = vrt_lyr.GetFeature(1)
geom = feat.GetGeometryRef()
assert geom.ExportToWkt() == "POINT (3 50)"
assert feat.GetFieldAsString("val1") == "val21"
vrt_ds = None
os.remove("tmp/test.csv")
###############################################################################
# Test SrcRegion element for VGS_Direct
def test_ogr_vrt_14():
if gdaltest.vrt_ds is None:
pytest.skip()
with gdaltest.error_handler():
try:
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
except AttributeError:
pass
shp_ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource("tmp/test.shp")
shp_lyr = shp_ds.CreateLayer("test")
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (2 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
shp_ds.ExecuteSQL("CREATE SPATIAL INDEX on test")
shp_ds = None
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="mytest">
<SrcDataSource relativeToVRT="0">tmp/test.shp</SrcDataSource>
<SrcLayer>test</SrcLayer>
<SrcRegion>POLYGON((0 40,0 50,10 50,10 40,0 40))</SrcRegion>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("mytest")
assert vrt_lyr.TestCapability(ogr.OLCFastSpatialFilter) == 1, "Fast filter not set."
extent = vrt_lyr.GetExtent()
assert extent == (2.0, 2.0, 49.0, 49.0), "wrong extent"
assert vrt_lyr.GetFeatureCount() == 1, "Feature count not one as expected."
feat = vrt_lyr.GetNextFeature()
assert feat.GetFID() == 2, "did not get fid 2."
geom = feat.GetGeometryRef()
assert geom.ExportToWkt() == "POINT (2 49)", "did not get expected point geometry."
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 49.5)
if vrt_lyr.GetFeatureCount() != 1:
if gdal.GetLastErrorMsg().find("GEOS support not enabled") != -1:
vrt_ds = None
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
pytest.skip()
print(vrt_lyr.GetFeatureCount())
pytest.fail("did not get one feature on rect spatial filter.")
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 48.5)
assert vrt_lyr.GetFeatureCount() == 0, "Did not get expected zero feature count."
vrt_lyr.SetSpatialFilter(None)
assert (
vrt_lyr.GetFeatureCount() == 1
), "Did not get expected one feature count with no filter."
vrt_ds = None
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
###############################################################################
# Test SrcRegion element for VGS_WKT
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_15():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("wkt_geom,val1,val2\n".encode("ascii"))
f.write("POINT (-10 49),,\n".encode("ascii"))
f.write("POINT (-10 49),,\n".encode("ascii"))
f.write("POINT (2 49),,\n".encode("ascii"))
f.write("POINT (-10 49),,\n".encode("ascii"))
f.close()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="WKT" field="wkt_geom"/>
<SrcRegion>POLYGON((0 40,0 50,10 50,10 40,0 40))</SrcRegion>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("test")
assert vrt_lyr.TestCapability(ogr.OLCFastSpatialFilter) == 0
assert vrt_lyr.GetFeatureCount() == 1
feat = vrt_lyr.GetNextFeature()
assert feat.GetFID() == 3
geom = feat.GetGeometryRef()
assert geom.ExportToWkt() == "POINT (2 49)"
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 49.5)
assert vrt_lyr.GetFeatureCount() == 1
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 48.5)
assert vrt_lyr.GetFeatureCount() == 0
vrt_lyr.SetSpatialFilter(None)
assert vrt_lyr.GetFeatureCount() == 1
vrt_ds = None
os.remove("tmp/test.csv")
###############################################################################
# Test SrcRegion element for VGS_PointFromColumns
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_16():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csvt", "wb")
f.write("Real,Real,String,String\n".encode("ascii"))
f.close()
f = open("tmp/test.csv", "wb")
f.write("x,y,val1,val2\n".encode("ascii"))
f.write("-10,49,,\n".encode("ascii"))
f.write("-10,49,,\n".encode("ascii"))
f.write("2,49,,\n".encode("ascii"))
f.write("-10,49,,\n".encode("ascii"))
f.close()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="PointFromColumns" x="x" y="y"/>
<SrcRegion>POLYGON((0 40,0 50,10 50,10 40,0 40))</SrcRegion>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("test")
assert vrt_lyr.TestCapability(ogr.OLCFastSpatialFilter) == 0
assert vrt_lyr.GetFeatureCount() == 1
feat = vrt_lyr.GetNextFeature()
assert feat.GetFID() == 3
geom = feat.GetGeometryRef()
assert geom.ExportToWkt() == "POINT (2 49)"
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 49.5)
if vrt_lyr.GetFeatureCount() != 1:
if gdal.GetLastErrorMsg().find("GEOS support not enabled") != -1:
vrt_ds = None
os.remove("tmp/test.csv")
os.remove("tmp/test.csvt")
pytest.skip()
pytest.fail()
vrt_lyr.SetSpatialFilterRect(1, 41, 3, 48.5)
assert vrt_lyr.GetFeatureCount() == 0
vrt_lyr.SetSpatialFilter(None)
assert vrt_lyr.GetFeatureCount() == 1
vrt_ds = None
os.remove("tmp/test.csv")
os.remove("tmp/test.csvt")
###############################################################################
# Test explicit field definitions.
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_17():
if gdaltest.vrt_ds is None:
pytest.skip()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">data/prime_meridian.csv</SrcDataSource>
<SrcLayer>prime_meridian</SrcLayer>
<Field name="pm_code" src="PRIME_MERIDIAN_CODE" type="integer" width="4" />
<Field name="prime_meridian_name" width="24" />
<Field name="new_col" type="Real" width="12" precision="3" />
<Field name="DEPRECATED" type="Integer" subtype="Boolean" />
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("test")
assert vrt_lyr.GetLayerDefn().GetFieldCount() == 4, "unexpected field count."
flddef = vrt_lyr.GetLayerDefn().GetFieldDefn(0)
assert (
flddef.GetName() == "pm_code"
and flddef.GetType() == ogr.OFTInteger
and flddef.GetWidth() == 4
and flddef.GetPrecision() == 0
), "pm_code field definition wrong."
flddef = vrt_lyr.GetLayerDefn().GetFieldDefn(1)
assert (
flddef.GetName() == "prime_meridian_name"
and flddef.GetType() == ogr.OFTString
and flddef.GetWidth() == 24
and flddef.GetPrecision() == 0
), "prime_meridian_name field definition wrong."
flddef = vrt_lyr.GetLayerDefn().GetFieldDefn(2)
assert (
flddef.GetName() == "new_col"
and flddef.GetType() == ogr.OFTReal
and flddef.GetWidth() == 12
and flddef.GetPrecision() == 3
), "new_col field definition wrong."
flddef = vrt_lyr.GetLayerDefn().GetFieldDefn(3)
assert (
flddef.GetName() == "DEPRECATED"
and flddef.GetType() == ogr.OFTInteger
and flddef.GetSubType() == ogr.OFSTBoolean
), "DEPRECATED field definition wrong."
feat = vrt_lyr.GetNextFeature()
assert (
feat.GetField(0) == 8901
and feat.GetField(1) == "Greenwich"
and feat.GetField(2) is None
), "did not get expected field value(s)."
###############################################################################
# Test that attribute filters are *not* passed to sublayer by default
# when explicit fields are defined.
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_18():
if gdaltest.vrt_ds is None:
pytest.skip()
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">data/prime_meridian.csv</SrcDataSource>
<SrcLayer>prime_meridian</SrcLayer>
<Field name="pm_code" src="PRIME_MERIDIAN_CODE" type="integer" width="4" />
<Field name="prime_meridian_name" width="24" />
<Field name="new_col" type="Real" width="12" precision="3" />
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml)
vrt_lyr = vrt_ds.GetLayerByName("test")
vrt_lyr.SetAttributeFilter("pm_code=8904")
feat = vrt_lyr.GetNextFeature()
assert feat.GetField(0) == 8904, "Attribute filter not working properly"
###############################################################################
# Run test_ogrsf (optimized path)
def test_ogr_vrt_19_optimized():
if gdaltest.vrt_ds is None:
pytest.skip()
if test_cli_utilities.get_test_ogrsf_path() is None:
pytest.skip()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro data/vrt/poly_vrt.vrt"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
###############################################################################
# Run test_ogrsf (non optimized path)
def test_ogr_vrt_19_nonoptimized():
if gdaltest.vrt_ds is None:
pytest.skip()
if test_cli_utilities.get_test_ogrsf_path() is None:
pytest.skip()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path()
+ " -ro data/vrt/poly_nonoptimized_vrt.vrt"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
###############################################################################
# Test VGS_Direct
def test_ogr_vrt_20():
if gdaltest.vrt_ds is None:
pytest.skip()
with gdaltest.error_handler():
try:
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
except AttributeError:
pass
shp_ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource("tmp/test.shp")
shp_lyr = shp_ds.CreateLayer("test")
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 45)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (2 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
feat = ogr.Feature(shp_lyr.GetLayerDefn())
geom = ogr.CreateGeometryFromWkt("POINT (-10 49)")
feat.SetGeometryDirectly(geom)
shp_lyr.CreateFeature(feat)
shp_ds.ExecuteSQL("CREATE SPATIAL INDEX on test")
shp_ds = None
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="mytest">
<SrcDataSource relativeToVRT="0">tmp/test.shp</SrcDataSource>
<SrcLayer>test</SrcLayer>
</OGRVRTLayer>
</OGRVRTDataSource>"""
vrt_ds = ogr.Open(vrt_xml, update=1)
vrt_lyr = vrt_ds.GetLayerByName("mytest")
assert (
vrt_lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
), "Fast feature count not set."
assert vrt_lyr.TestCapability(ogr.OLCFastSpatialFilter) == 1, "Fast filter not set."
assert vrt_lyr.TestCapability(ogr.OLCFastGetExtent) == 1, "Fast extent not set."
extent = vrt_lyr.GetExtent()
assert extent == (-10.0, 2.0, 45.0, 49.0), "wrong extent"
assert vrt_lyr.GetFeatureCount() == 4, "Feature count not 4 as expected."
vrt_lyr.SetSpatialFilterRect(1, 48.5, 3, 49.5)
if vrt_lyr.GetFeatureCount() != 1:
if gdal.GetLastErrorMsg().find("GEOS support not enabled") != -1:
vrt_ds = None
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
pytest.skip()
print(vrt_lyr.GetFeatureCount())
pytest.fail("did not get one feature on rect spatial filter.")
assert (
vrt_lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
), "Fast feature count not set."
assert vrt_lyr.TestCapability(ogr.OLCFastGetExtent) == 1, "Fast extent not set."
extent = vrt_lyr.GetExtent()
# the shapefile driver currently doesn't change the extent even in the
# presence of a spatial filter, so that could change in the future
assert extent == (-10.0, 2.0, 45.0, 49.0), "wrong extent"
vrt_lyr.SetSpatialFilterRect(1, 48, 3, 48.5)
assert vrt_lyr.GetFeatureCount() == 0, "Did not get expected zero feature count."
vrt_lyr.SetSpatialFilter(None)
assert (
vrt_lyr.GetFeatureCount() == 4
), "Feature count not 4 as expected with no filter."
vrt_lyr.ResetReading()
feat = vrt_lyr.GetNextFeature()
feat.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT (1 2)"))
vrt_lyr.SetFeature(feat)
feat = None
vrt_lyr.ResetReading()
feat = vrt_lyr.GetNextFeature()
if (
feat.GetGeometryRef() is None
or feat.GetGeometryRef().ExportToWkt() != "POINT (1 2)"
):
feat.DumpReadable()
pytest.fail()
vrt_ds = None
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("tmp/test.shp")
###############################################################################
# Test lazy initialization with valid layer
def ogr_vrt_21_internal():
if gdaltest.vrt_ds is None:
pytest.skip()
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetName() == "test3"
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetGeomType() == ogr.wkbPoint
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetSpatialRef().ExportToWkt().find("84") != -1
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.ResetReading()
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetNextFeature() is not None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetFeature(1) is not None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetFeatureCount() != 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.SetNextByIndex(1) == 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetLayerDefn().GetFieldCount() != 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.SetAttributeFilter("") == 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.SetSpatialFilter(None)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetExtent() is not None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
assert lyr.GetFIDColumn() == "fid"
ds = None
feature_defn = ogr.FeatureDefn()
feat = ogr.Feature(feature_defn)
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.CreateFeature(feat)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.SetFeature(feat)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.DeleteFeature(0)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test3")
lyr.SyncToDisk()
ds = None
def test_ogr_vrt_21():
with gdaltest.error_handler():
try:
ret = ogr_vrt_21_internal()
except Exception:
ret = "fail"
return ret
###############################################################################
# Test lazy initialization with invalid layer
def ogr_vrt_22_internal():
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetName() == "test5"
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetGeomType() == ogr.wkbPoint
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetSpatialRef() is None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.ResetReading()
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetNextFeature() is None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetFeature(1) is None
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetFeatureCount() == 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.SetNextByIndex(1) != 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetLayerDefn().GetFieldCount() == 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.SetAttributeFilter("") != 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.SetSpatialFilter(None)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.TestCapability(ogr.OLCFastFeatureCount) == 0
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
assert lyr.GetFIDColumn() == ""
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.GetExtent()
ds = None
feature_defn = ogr.FeatureDefn()
feat = ogr.Feature(feature_defn)
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.CreateFeature(feat)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.SetFeature(feat)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.DeleteFeature(0)
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.SyncToDisk()
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.StartTransaction()
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.CommitTransaction()
ds = None
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test5")
lyr.RollbackTransaction()
ds = None
def test_ogr_vrt_22():
with gdaltest.error_handler():
ogr_vrt_22_internal()
###############################################################################
# Test anti-recursion mechanism
def test_ogr_vrt_23(shared_ds_flag=""):
if int(gdal.VersionInfo("VERSION_NUM")) < 1900:
pytest.skip("would crash")
rec1 = (
"""<OGRVRTDataSource>
<OGRVRTLayer name="rec1">
<SrcDataSource%s>/vsimem/rec2.vrt</SrcDataSource>
<SrcLayer>rec2</SrcLayer>
</OGRVRTLayer>
</OGRVRTDataSource>"""
% shared_ds_flag
)
rec2 = (
"""<OGRVRTDataSource>
<OGRVRTLayer name="rec2">
<SrcDataSource%s>/vsimem/rec1.vrt</SrcDataSource>
<SrcLayer>rec1</SrcLayer>
</OGRVRTLayer>
</OGRVRTDataSource>"""
% shared_ds_flag
)
gdal.FileFromMemBuffer("/vsimem/rec1.vrt", rec1)
gdal.FileFromMemBuffer("/vsimem/rec2.vrt", rec2)
ds = ogr.Open("/vsimem/rec1.vrt")
assert ds is not None
gdal.ErrorReset()
with gdaltest.error_handler():
ds.GetLayer(0).GetLayerDefn()
ds.GetLayer(0).GetFeatureCount()
assert gdal.GetLastErrorMsg() != "", "error expected !"
gdal.Unlink("/vsimem/rec1.vrt")
gdal.Unlink("/vsimem/rec2.vrt")
###############################################################################
# Test anti-recursion mechanism on shared DS
def test_ogr_vrt_24():
test_ogr_vrt_23(' shared="1"')
rec1 = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource shared="1">/vsimem/rec2.vrt</SrcDataSource>
</OGRVRTLayer>
</OGRVRTDataSource>"""
rec2 = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource shared="1">/vsimem/rec2.vrt</SrcDataSource>
</OGRVRTLayer>
</OGRVRTDataSource>"""
gdal.FileFromMemBuffer("/vsimem/rec1.vrt", rec1)
gdal.FileFromMemBuffer("/vsimem/rec2.vrt", rec2)
ds = ogr.Open("/vsimem/rec1.vrt")
assert ds is not None
gdal.ErrorReset()
with gdaltest.error_handler():
ds.GetLayer(0).GetLayerDefn()
ds.GetLayer(0).GetFeatureCount()
assert gdal.GetLastErrorMsg() != "", "error expected !"
gdal.Unlink("/vsimem/rec1.vrt")
gdal.Unlink("/vsimem/rec2.vrt")
###############################################################################
# Test GetFIDColumn()
def test_ogr_vrt_25():
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/vrt_test.vrt")
# test3 layer just declares fid, and implicit fields (so all source
# fields are taken as VRT fields), we can report the fid column
lyr = ds.GetLayerByName("test3")
assert lyr.GetFIDColumn() == "fid"
# test6 layer declares fid, and explicit fields without the fid
lyr = ds.GetLayerByName("test6")
assert lyr.GetFIDColumn() == "fid"
# test7 layer just declares fid with an external visible name
lyr = ds.GetLayerByName("test7")
assert lyr.GetFIDColumn() == "bar"
# test2 layer does not declare fid, and source layer has no fid column
# so nothing to report
lyr = ds.GetLayerByName("test2")
assert lyr.GetFIDColumn() == ""
ds = None
###############################################################################
# Test transaction support
@pytest.mark.require_driver("SQLite")
def test_ogr_vrt_26():
sqlite_ds = ogr.GetDriverByName("SQLite").CreateDataSource("/vsimem/ogr_vrt_26.db")
if sqlite_ds is None:
pytest.skip()
lyr = sqlite_ds.CreateLayer("test")
lyr.CreateField(ogr.FieldDefn("foo", ogr.OFTString))
lyr = None
sqlite_ds.SyncToDisk()
vrt_ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>/vsimem/ogr_vrt_26.db</SrcDataSource>
</OGRVRTLayer>
</OGRVRTDataSource>""",
update=1,
)
lyr = vrt_ds.GetLayer(0)
assert lyr.TestCapability(ogr.OLCTransactions) != 0
lyr.StartTransaction()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, "foo")
lyr.CreateFeature(feat)
feat = None
assert lyr.GetFeatureCount() == 1
lyr.RollbackTransaction()
assert lyr.GetFeatureCount() == 0
lyr.StartTransaction()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, "bar")
lyr.CreateFeature(feat)
feat = None
lyr.CommitTransaction()
assert lyr.GetFeatureCount() == 1
vrt_ds = None
sqlite_ds = None
ogr.GetDriverByName("SQLite").DeleteDataSource("/vsimem/ogr_vrt_26.db")
###############################################################################
# Test shapebin geometry
def test_ogr_vrt_27():
csv = """dummy,shapebin
"dummy","01000000000000000000F03F0000000000000040"
"dummy","0300000000000000000008400000000000001040000000000000144000000000000018400100000002000000000000000000000000000840000000000000104000000000000014400000000000001840"
"dummy","0500000000000000000000000000000000000000000000000000F03F000000000000F03F010000000500000000000000000000000000000000000000000000000000000000000000000000000000F03F000000000000F03F000000000000F03F000000000000F03F000000000000000000000000000000000000000000000000"
"""
gdal.FileFromMemBuffer("/vsimem/ogr_vrt_27.csv", csv)
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTLayer name="ogr_vrt_27">
<SrcDataSource relativeToVRT="0" shared="0">/vsimem/ogr_vrt_27.csv</SrcDataSource>
<GeometryField encoding="shape" field="shapebin"/>
<Field name="foo"/>
</OGRVRTLayer>
</OGRVRTDataSource>"""
)
assert ds is not None
lyr = ds.GetLayer(0)
wkt_list = [
"POINT (1 2)",
"LINESTRING (3 4,5 6)",
"POLYGON ((0 0,0 1,1 1,1 0,0 0))",
]
feat = lyr.GetNextFeature()
i = 0
while feat is not None:
assert ogrtest.check_feature_geometry(feat, wkt_list[i]) == 0
feat = lyr.GetNextFeature()
i = i + 1
ds = None
gdal.Unlink("/vsimem/ogr_vrt_27.csv")
###############################################################################
# Invalid VRT testing
def test_ogr_vrt_28():
with gdaltest.error_handler():
ds = ogr.Open("<OGRVRTDataSource></foo>")
assert ds is None
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_28_invalid.vrt",
"<bla><OGRVRTDataSource></OGRVRTDataSource></bla>",
)
with gdaltest.error_handler():
ds = ogr.Open("/vsimem/ogr_vrt_28_invalid.vrt")
assert ds is None
gdal.Unlink("/vsimem/ogr_vrt_28_invalid.vrt")
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/invalid.vrt")
assert ds is not None
for i in range(ds.GetLayerCount()):
gdal.ErrorReset()
with gdaltest.error_handler():
lyr = ds.GetLayer(i)
lyr.GetNextFeature()
assert (
gdal.GetLastErrorMsg() != ""
), "expected failure for layer %d of datasource %s" % (i, ds.GetName())
ds = None
with gdaltest.error_handler():
ogr.Open("<OGRVRTDataSource><OGRVRTLayer/></OGRVRTDataSource>")
assert gdal.GetLastErrorMsg() != "", "expected error message on datasource opening"
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/invalid2.vrt")
assert gdal.GetLastErrorMsg() != "", "expected error message on datasource opening"
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/invalid3.vrt")
assert gdal.GetLastErrorMsg() != "", "expected error message on datasource opening"
###############################################################################
# Test OGRVRTWarpedLayer
def test_ogr_vrt_29():
gdal.Unlink("tmp/ogr_vrt_29.shp")
gdal.Unlink("tmp/ogr_vrt_29.shx")
gdal.Unlink("tmp/ogr_vrt_29.dbf")
gdal.Unlink("tmp/ogr_vrt_29.prj")
ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource("tmp/ogr_vrt_29.shp")
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
lyr = ds.CreateLayer("ogr_vrt_29", srs=sr)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
for i in range(5):
for j in range(5):
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, i * 5 + j)
feat.SetGeometry(
ogr.CreateGeometryFromWkt("POINT(%f %f)" % (2 + i / 5.0, 49 + j / 5.0))
)
lyr.CreateFeature(feat)
feat = None
ds = None
# Invalid source layer
with gdaltest.error_handler():
ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/non_existing.shp</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
assert gdal.GetLastErrorMsg() != ""
# Non-spatial layer
with gdaltest.error_handler():
ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="flat">
<SrcDataSource>data/flat.dbf</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
assert gdal.GetLastErrorMsg() != ""
# Missing TargetSRS
with gdaltest.error_handler():
ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
assert gdal.GetLastErrorMsg() != ""
# Invalid TargetSRS
with gdaltest.error_handler():
ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>foo</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
assert gdal.GetLastErrorMsg() != ""
# Invalid SrcSRS
with gdaltest.error_handler():
ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
<SrcSRS>foo</SrcSRS>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
assert gdal.GetLastErrorMsg() != ""
# TargetSRS == source SRS
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:4326</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
lyr = ds.GetLayer(0)
ds = None
# Forced extent
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource>tmp/ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:32631</TargetSRS>
<ExtentXMin>426857</ExtentXMin>
<ExtentYMin>5427475</ExtentYMin>
<ExtentXMax>485608</ExtentXMax>
<ExtentYMax>5516874</ExtentYMax>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
)
lyr = ds.GetLayer(0)
bb = lyr.GetExtent()
assert bb == (426857, 485608, 5427475, 5516874)
ds = None
f = open("tmp/ogr_vrt_29.vrt", "wt")
f.write(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource relativetoVRT="1">ogr_vrt_29.shp</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>\n"""
)
f.close()
# Check reprojection in both directions
ds = ogr.Open("tmp/ogr_vrt_29.vrt", update=1)
lyr = ds.GetLayer(0)
sr = lyr.GetSpatialRef()
got_wkt = sr.ExportToWkt()
assert got_wkt.find("32631") != -1, "did not get expected WKT"
bb = lyr.GetExtent()
expected_bb = (
426857.98771727527,
485607.2165091355,
5427475.0501426803,
5516873.8591036052,
)
for i in range(4):
assert bb[i] == pytest.approx(
expected_bb[i], abs=1
), "did not get expected extent"
feat = lyr.GetNextFeature()
if (
ogrtest.check_feature_geometry(
feat, "POINT(426857.987717275274917 5427937.523466162383556)"
)
!= 0
):
feat.DumpReadable()
pytest.fail()
feat = lyr.GetNextFeature()
feat.SetGeometry(None)
assert lyr.SetFeature(feat) == 0
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(500000 0)"))
assert lyr.SetFeature(feat) == 0
feat = None
lyr.SetSpatialFilterRect(499999, -1, 500001, 1)
lyr.ResetReading()
feat = lyr.GetNextFeature()
if ogrtest.check_feature_geometry(feat, "POINT(500000 0)") != 0:
feat.DumpReadable()
pytest.fail()
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("id", -99)
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(500000 0)"))
assert lyr.CreateFeature(feat) == 0
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("id", -100)
assert lyr.CreateFeature(feat) == 0
feat = None
ds = None
# Check failed operations in read-only
ds = ogr.Open("tmp/ogr_vrt_29.vrt")
lyr = ds.GetLayer(0)
with gdaltest.error_handler():
ret = lyr.DeleteFeature(1)
assert ret != 0
feat = lyr.GetNextFeature()
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(500000 0)"))
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(500000 0)"))
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0
feat = None
ds = None
# Check in .shp file
ds = ogr.Open("tmp/ogr_vrt_29.shp", update=1)
lyr = ds.GetLayer(0)
feat = lyr.GetNextFeature()
feat = lyr.GetNextFeature()
if ogrtest.check_feature_geometry(feat, "POINT(3.0 0.0)") != 0:
feat.DumpReadable()
pytest.fail()
lyr.SetAttributeFilter("id = -99")
lyr.ResetReading()
feat = lyr.GetNextFeature()
if ogrtest.check_feature_geometry(feat, "POINT(3.0 0.0)") != 0:
feat.DumpReadable()
pytest.fail()
lyr.SetAttributeFilter("id = -100")
lyr.ResetReading()
feat = lyr.GetNextFeature()
if feat.GetGeometryRef() is not None:
feat.DumpReadable()
pytest.fail()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, 1000)
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(-180 91)"))
lyr.CreateFeature(feat)
feat = None
ds = None
# Check failed reprojection when reading through VRT
ds = ogr.Open("tmp/ogr_vrt_29.vrt", update=1)
lyr = ds.GetLayer(0)
lyr.SetAttributeFilter("id = 1000")
# Reprojection will fail
with gdaltest.error_handler():
feat = lyr.GetNextFeature()
fid = feat.GetFID()
if feat.GetGeometryRef() is not None:
feat.DumpReadable()
pytest.fail()
with gdaltest.error_handler():
feat = lyr.GetFeature(fid)
if feat.GetGeometryRef() is not None:
feat.DumpReadable()
pytest.fail()
feat = None
lyr.DeleteFeature(fid)
ds = None
f = open("tmp/ogr_vrt_29_2.vrt", "wt")
f.write(
"""<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="ogr_vrt_29">
<SrcDataSource relativetoVRT="1">ogr_vrt_29.vrt</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:4326</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>\n"""
)
f.close()
# Check failed reprojection when writing through VRT
ds = ogr.Open("tmp/ogr_vrt_29_2.vrt", update=1)
lyr = ds.GetLayer(0)
feat = lyr.GetNextFeature()
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(-180 91)"))
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT(-180 91)"))
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0
feat = None
ds = None
# Some sanity operations before running test_ogrsf
ds = ogr.Open("tmp/ogr_vrt_29.shp", update=1)
ds.ExecuteSQL("REPACK ogr_vrt_29")
ds.ExecuteSQL("RECOMPUTE EXTENT ON ogr_vrt_29")
ds = None
# Check with test_ogrsf
if test_cli_utilities.get_test_ogrsf_path() is not None:
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_29.vrt"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
gdal.Unlink("tmp/ogr_vrt_29.shp")
gdal.Unlink("tmp/ogr_vrt_29.shx")
gdal.Unlink("tmp/ogr_vrt_29.dbf")
gdal.Unlink("tmp/ogr_vrt_29.prj")
os.unlink("tmp/ogr_vrt_29.vrt")
os.unlink("tmp/ogr_vrt_29_2.vrt")
###############################################################################
# Test OGRVRTUnionLayer
def test_ogr_vrt_30():
for filename in [
"tmp/ogr_vrt_30_1.shp",
"tmp/ogr_vrt_30_1.shx",
"tmp/ogr_vrt_30_1.dbf",
"tmp/ogr_vrt_30_1.prj",
"tmp/ogr_vrt_30_1.qix",
"tmp/ogr_vrt_30_2.shp",
"tmp/ogr_vrt_30_2.shx",
"tmp/ogr_vrt_30_2.dbf",
"tmp/ogr_vrt_30_2.prj",
"tmp/ogr_vrt_30_2.qix",
]:
gdal.Unlink(filename)
ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource("tmp/ogr_vrt_30_1.shp")
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
lyr = ds.CreateLayer("ogr_vrt_30_1", srs=sr)
lyr.CreateField(ogr.FieldDefn("id1", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("id2", ogr.OFTInteger))
for i in range(5):
for j in range(5):
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, i * 5 + j)
feat.SetField(1, 100 + i * 5 + j)
feat.SetGeometry(
ogr.CreateGeometryFromWkt("POINT(%f %f)" % (2 + i / 5.0, 49 + j / 5.0))
)
lyr.CreateFeature(feat)
feat = None
ds.ExecuteSQL("CREATE SPATIAL INDEX ON ogr_vrt_30_1")
ds = None
ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource("tmp/ogr_vrt_30_2.shp")
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
lyr = ds.CreateLayer("ogr_vrt_30_2", srs=sr)
lyr.CreateField(ogr.FieldDefn("id2", ogr.OFTInteger64))
lyr.CreateField(ogr.FieldDefn("id3", ogr.OFTInteger))
for i in range(5):
for j in range(5):
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, 200 + i * 5 + j)
feat.SetField(1, 300 + i * 5 + j)
feat.SetGeometry(
ogr.CreateGeometryFromWkt("POINT(%f %f)" % (4 + i / 5.0, 49 + j / 5.0))
)
lyr.CreateFeature(feat)
feat = None
ds.ExecuteSQL("CREATE SPATIAL INDEX ON ogr_vrt_30_2")
ds = None
f = open("tmp/ogr_vrt_30.vrt", "wt")
f.write(
"""<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="ogr_vrt_30_1">
<SrcDataSource relativetoVRT="1">ogr_vrt_30_1.shp</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="ogr_vrt_30_2">
<SrcDataSource relativetoVRT="1">ogr_vrt_30_2.shp</SrcDataSource>
</OGRVRTLayer>
</OGRVRTUnionLayer>
</OGRVRTDataSource>\n"""
)
f.close()
# Check
for check in range(10):
ds = ogr.Open("tmp/ogr_vrt_30.vrt", update=1)
lyr = ds.GetLayer(0)
if check == 0:
sr = lyr.GetSpatialRef()
got_wkt = sr.ExportToWkt()
assert got_wkt.find('GEOGCS["WGS 84"') != -1, "did not get expected WKT"
elif check == 1:
bb = lyr.GetExtent()
expected_bb = (2.0, 4.7999999999999998, 49.0, 49.799999999999997)
for i in range(4):
assert bb[i] == pytest.approx(
expected_bb[i], abs=1
), "did not get expected extent"
elif check == 2:
feat_count = lyr.GetFeatureCount()
assert feat_count == 2 * 5 * 5, "did not get expected feature count"
elif check == 3:
assert (
lyr.GetLayerDefn().GetFieldCount() == 3
), "did not get expected field count"
assert (
lyr.GetLayerDefn().GetFieldDefn(1).GetType() == ogr.OFTInteger64
), "did not get expected field type"
elif check == 4:
feat = lyr.GetNextFeature()
i = 0
while feat is not None:
if i < 5 * 5:
assert feat.GetFID() == i, "did not get expected value"
assert (
feat.GetFieldAsInteger("id1") == i
), "did not get expected value"
assert (
feat.GetFieldAsInteger("id2") == 100 + i
), "did not get expected value"
assert not feat.IsFieldSet("id3"), "did not get expected value"
if (
ogrtest.check_feature_geometry(
feat,
"POINT(%f %f)"
% (2 + int(i / 5) / 5.0, 49 + int(i % 5) / 5.0),
)
!= 0
):
feat.DumpReadable()
pytest.fail("did not get expected value")
else:
assert feat.GetFID() == i, "did not get expected value"
assert not feat.IsFieldSet("id1"), "did not get expected value"
assert (
feat.GetFieldAsInteger("id2") == 200 + i - 5 * 5
), "did not get expected value"
assert (
feat.GetFieldAsInteger("id3") == 300 + i - 5 * 5
), "did not get expected value"
assert (
ogrtest.check_feature_geometry(
feat,
"POINT(%f %f)"
% (
4 + int((i - 5 * 5) / 5) / 5.0,
49 + int((i - 5 * 5) % 5) / 5.0,
),
)
== 0
), "did not get expected value"
i = i + 1
feat = lyr.GetNextFeature()
elif check == 5:
assert lyr.GetGeomType() == ogr.wkbPoint, "did not get expected geom type"
elif check == 6:
assert (
lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCFastGetExtent) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCFastSpatialFilter) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCStringsAsUTF8) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCIgnoreFields) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCRandomWrite) == 0
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCSequentialWrite) == 0
), "did not get expected capability"
elif check == 7:
lyr.SetSpatialFilterRect(2.49, 49.29, 4.49, 49.69)
assert lyr.GetFeatureCount() == 10, "did not get expected feature count"
elif check == 8:
lyr.SetAttributeFilter("id1 = 0")
assert lyr.GetFeatureCount() == 1, "did not get expected feature count"
elif check == 9:
# CreateFeature() should fail
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("id2", 12345)
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0, "should have failed"
feat = None
# SetFeature() should fail
lyr.ResetReading()
feat = lyr.GetNextFeature()
feat.SetField("id2", 45321)
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0, "should have failed"
feat = None
# Test feature existence : should fail
lyr.SetAttributeFilter("id2 = 12345 or id2 = 45321")
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert feat is None, "should have failed"
ds = None
# Check with test_ogrsf
if test_cli_utilities.get_test_ogrsf_path() is not None:
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path()
+ " -ro tmp/ogr_vrt_30.vrt --config OGR_VRT_MAX_OPENED 1"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " tmp/ogr_vrt_30.vrt"
)
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# Test various optional attributes
f = open("tmp/ogr_vrt_30.vrt", "wt")
f.write(
"""<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="ogr_vrt_30_1">
<SrcDataSource relativetoVRT="1">ogr_vrt_30_1.shp</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="ogr_vrt_30_2">
<SrcDataSource relativetoVRT="1">ogr_vrt_30_2.shp</SrcDataSource>
</OGRVRTLayer>
<SourceLayerFieldName>source_layer</SourceLayerFieldName>
<PreserveSrcFID>ON</PreserveSrcFID>
<FieldStrategy>Intersection</FieldStrategy>
<GeometryType>wkbPoint25D</GeometryType>
<LayerSRS>WGS72</LayerSRS>
<FeatureCount>100</FeatureCount>
<ExtentXMin>-180</ExtentXMin>
<ExtentYMin>-90</ExtentYMin>
<ExtentXMax>180</ExtentXMax>
<ExtentYMax>90</ExtentYMax>
</OGRVRTUnionLayer>
</OGRVRTDataSource>\n"""
)
f.close()
for check in range(9):
ds = ogr.Open("tmp/ogr_vrt_30.vrt", update=1)
lyr = ds.GetLayer(0)
if check == 0:
sr = lyr.GetSpatialRef()
got_wkt = sr.ExportToWkt()
assert got_wkt.find("WGS 72") != -1, "did not get expected WKT"
elif check == 1:
bb = lyr.GetExtent()
expected_bb = (-180.0, 180.0, -90.0, 90.0)
for i in range(4):
assert bb[i] == pytest.approx(
expected_bb[i], abs=1
), "did not get expected extent"
elif check == 2:
assert lyr.GetFeatureCount() == 100, "did not get expected feature count"
elif check == 3:
assert (
lyr.GetLayerDefn().GetFieldCount() == 2
), "did not get expected field count"
elif check == 4:
feat = lyr.GetNextFeature()
i = 0
while feat is not None:
if i < 5 * 5:
assert feat.GetFID() == i, "did not get expected value"
assert (
feat.GetFieldAsString("source_layer") == "ogr_vrt_30_1"
), "did not get expected value"
assert (
feat.GetFieldAsInteger("id2") == 100 + i
), "did not get expected value"
else:
assert feat.GetFID() == i - 5 * 5, "did not get expected value"
assert (
feat.GetFieldAsString("source_layer") == "ogr_vrt_30_2"
), "did not get expected value"
assert (
feat.GetFieldAsInteger("id2") == 200 + i - 5 * 5
), "did not get expected value"
i = i + 1
feat = lyr.GetNextFeature()
elif check == 5:
assert (
lyr.GetGeomType() == ogr.wkbPoint25D
), "did not get expected geom type"
elif check == 6:
assert (
lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCFastGetExtent) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCFastSpatialFilter) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCStringsAsUTF8) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCIgnoreFields) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCRandomWrite) == 1
), "did not get expected capability"
assert (
lyr.TestCapability(ogr.OLCSequentialWrite) == 1
), "did not get expected capability"
elif check == 7:
lyr.SetSpatialFilterRect(2.49, 49.29, 4.49, 49.69)
assert lyr.GetFeatureCount() == 10, "did not get expected feature count"
elif check == 8:
# invalid source_layer name with CreateFeature()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("source_layer", "random_name")
feat.SetField("id2", 12345)
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0, "should have failed"
feat = None
# unset source_layer name with CreateFeature()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("id2", 12345)
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0, "should have failed"
feat = None
# FID set with CreateFeature()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetFID(999999)
feat.SetField("source_layer", "ogr_vrt_30_2")
feat.SetField("id2", 12345)
with gdaltest.error_handler():
ret = lyr.CreateFeature(feat)
assert ret != 0, "should have failed"
feat = None
# CreateFeature() OK
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("source_layer", "ogr_vrt_30_2")
feat.SetField("id2", 12345)
assert lyr.CreateFeature(feat) == 0, "should have succeeded"
# SetFeature() OK
feat.SetField("id2", 45321)
assert lyr.SetFeature(feat) == 0, "should have succeeded"
# invalid source_layer name with SetFeature()
feat.SetField("source_layer", "random_name")
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0, "should have failed"
# unset source_layer name with SetFeature()
feat.UnsetField("source_layer")
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0, "should have failed"
# FID unset with SetFeature()
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("source_layer", "ogr_vrt_30_2")
feat.SetField("id2", 12345)
with gdaltest.error_handler():
ret = lyr.SetFeature(feat)
assert ret != 0, "should have failed"
feat = None
# Test feature existence (with passthru)
lyr.SetAttributeFilter("id2 = 45321 AND OGR_GEOMETRY IS NULL")
assert (
lyr.TestCapability(ogr.OLCFastFeatureCount) == 1
), "should have returned 1"
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert feat is not None, "should have succeeded"
# Test feature existence (without passthru)
lyr.SetAttributeFilter(
"id2 = 45321 AND OGR_GEOMETRY IS NULL AND source_layer = 'ogr_vrt_30_2'"
)
assert (
lyr.TestCapability(ogr.OLCFastFeatureCount) == 0
), "should have returned 0"
lyr.ResetReading()
feat = lyr.GetNextFeature()
assert feat is not None, "should have succeeded"
# Test SyncToDisk()
assert lyr.SyncToDisk() == 0, "should have succeeded"
ds = None
for filename in [
"tmp/ogr_vrt_30_1.shp",
"tmp/ogr_vrt_30_1.shx",
"tmp/ogr_vrt_30_1.dbf",
"tmp/ogr_vrt_30_1.prj",
"tmp/ogr_vrt_30_1.qix",
"tmp/ogr_vrt_30_2.shp",
"tmp/ogr_vrt_30_2.shx",
"tmp/ogr_vrt_30_2.dbf",
"tmp/ogr_vrt_30_2.prj",
"tmp/ogr_vrt_30_2.qix",
]:
gdal.Unlink(filename)
os.unlink("tmp/ogr_vrt_30.vrt")
###############################################################################
# Test anti-recursion mechanism with union layer
def test_ogr_vrt_31(shared_ds_flag=""):
rec1 = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="rec1">
<OGRVRTLayer name="rec2">
<SrcDataSource%s>/vsimem/rec2.vrt</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="rec1">
<SrcDataSource%s>/vsimem/rec1.vrt</SrcDataSource>
</OGRVRTLayer>
</OGRVRTUnionLayer>
</OGRVRTDataSource>""" % (
shared_ds_flag,
shared_ds_flag,
)
rec2 = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="rec2">
<OGRVRTLayer name="rec1">
<SrcDataSource%s>/vsimem/rec1.vrt</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="rec2">
<SrcDataSource%s>/vsimem/rec2.vrt</SrcDataSource>
</OGRVRTLayer>
</OGRVRTUnionLayer>
</OGRVRTDataSource>""" % (
shared_ds_flag,
shared_ds_flag,
)
gdal.FileFromMemBuffer("/vsimem/rec1.vrt", rec1)
gdal.FileFromMemBuffer("/vsimem/rec2.vrt", rec2)
ds = ogr.Open("/vsimem/rec1.vrt")
assert ds is not None
gdal.ErrorReset()
with gdaltest.error_handler():
ds.GetLayer(0).GetLayerDefn()
ds.GetLayer(0).GetFeatureCount()
assert gdal.GetLastErrorMsg() != "", "error expected !"
gdal.Unlink("/vsimem/rec1.vrt")
gdal.Unlink("/vsimem/rec2.vrt")
###############################################################################
# Test anti-recursion mechanism on shared DS
def test_ogr_vrt_32():
return test_ogr_vrt_31(' shared="1"')
###############################################################################
# Test multi-geometry support
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_33():
try:
import shutil
shutil.rmtree("tmp/ogr_vrt_33")
except OSError:
pass
ds = ogr.GetDriverByName("CSV").CreateDataSource(
"tmp/ogr_vrt_33", options=["GEOMETRY=AS_WKT"]
)
lyr = ds.CreateLayer("test", geom_type=ogr.wkbNone, options=["CREATE_CSVT=YES"])
lyr.CreateGeomField(ogr.GeomFieldDefn("geom__WKT_EPSG_4326_POINT", ogr.wkbPoint))
lyr.CreateGeomField(
ogr.GeomFieldDefn("geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon)
)
lyr.CreateGeomField(
ogr.GeomFieldDefn("geom__WKT_EPSG_4326_LINESTRING", ogr.wkbLineString)
)
lyr.CreateField(ogr.FieldDefn("X", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("Y", ogr.OFTReal))
lyr = ds.CreateLayer("test2", geom_type=ogr.wkbNone, options=["CREATE_CSVT=YES"])
lyr.CreateGeomField(
ogr.GeomFieldDefn("geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon)
)
lyr.CreateGeomField(ogr.GeomFieldDefn("geom__WKT_EPSG_4326_POINT", ogr.wkbPoint))
lyr.CreateGeomField(ogr.GeomFieldDefn("geom__WKT_EPSG_32631_POINT", ogr.wkbPoint))
lyr.CreateField(ogr.FieldDefn("Y", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("X", ogr.OFTReal))
ds = None
ds = ogr.Open("tmp/ogr_vrt_33", update=1)
lyr = ds.GetLayerByName("test")
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeomField(0, ogr.CreateGeometryFromWkt("POINT (1 2)"))
feat.SetGeomField(1, ogr.CreateGeometryFromWkt("POLYGON ((0 0,0 1,1 1,1 0,0 0))"))
feat.SetField("X", -1)
feat.SetField("Y", -2)
lyr.CreateFeature(feat)
lyr = ds.GetLayerByName("test2")
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeomField(0, ogr.CreateGeometryFromWkt("POLYGON ((1 1,1 2,2 2,2 1,1 1))"))
feat.SetGeomField(1, ogr.CreateGeometryFromWkt("POINT (3 4)"))
feat.SetGeomField(2, ogr.CreateGeometryFromWkt("POINT (5 6)"))
feat.SetField("X", -3)
feat.SetField("Y", -4)
lyr.CreateFeature(feat)
ds = None
for i in range(2):
if i == 0:
# Minimalist definition.
ds_str = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
</OGRVRTDataSource>"""
else:
# Implicit fields
ds_str = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
<GeometryField name="geom__WKT_EPSG_4326_POINT"/>
<GeometryField name="geom__WKT_EPSG_32632_POLYGON"/>
</OGRVRTLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
for j in range(5):
if j == 0:
assert lyr.GetGeomType() == ogr.wkbPoint
elif j == 1:
assert lyr.GetSpatialRef().ExportToWkt().find("GEOGCS") == 0
elif j == 2:
assert (
lyr.GetLayerDefn().GetGeomFieldDefn(1).GetType() == ogr.wkbPolygon
)
elif j == 3:
assert lyr.GetExtent(geom_field=1) == (0, 1, 0, 1)
elif j == 4:
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(1)
.GetSpatialRef()
.ExportToWkt()
.find("PROJCS")
== 0
)
feat = lyr.GetNextFeature()
if feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)":
feat.DumpReadable()
pytest.fail()
if (
feat.GetGeomFieldRef(1).ExportToWkt()
!= "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
):
feat.DumpReadable()
pytest.fail()
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
if ogrtest.have_geos():
# Select only second field and change various attributes
ds_str = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
<GeometryField name="foo" field="geom__WKT_EPSG_32632_POLYGON">
<GeometryType>wkbPolygon25D</GeometryType>
<SRS>+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +wktext +no_defs</SRS>
<ExtentXMin>1</ExtentXMin>
<ExtentYMin>2</ExtentYMin>
<ExtentXMax>3</ExtentXMax>
<ExtentYMax>4</ExtentYMax>
<SrcRegion clip="true">POLYGON((0.5 0.5,1.5 0.5,1.5 1.5,0.5 1.5,0.5 0.5))</SrcRegion>
</GeometryField>
</OGRVRTLayer>
</OGRVRTDataSource>"""
for i in range(6):
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
if i == 0:
assert lyr.GetGeomType() == ogr.wkbPolygon25D
elif i == 1:
assert (
lyr.GetSpatialRef()
.ExportToWkt()
.find(
"+proj=merc +a=6378137 +b=6378137 +lat_ts=0 +lon_0=0 +x_0=0 +y_0=0 +k=1 +units=m +nadgrids=@null +wktext +no_defs"
)
>= 0
)
elif i == 2:
assert lyr.GetGeometryColumn() == "foo"
elif i == 3:
assert lyr.TestCapability(ogr.OLCFastGetExtent) == 1
assert lyr.GetExtent(geom_field=0) == (1, 3, 2, 4)
elif i == 4:
assert lyr.TestCapability(ogr.OLCFastFeatureCount) == 0
elif i == 5:
assert lyr.GetLayerDefn().GetGeomFieldCount() == 1
feat = lyr.GetNextFeature()
if feat.GetGeomFieldRef(0).ExportToWkt() not in (
"POLYGON ((0.5 1.0,1 1,1.0 0.5,0.5 0.5,0.5 1.0))",
"POLYGON ((1 1,1.0 0.5,0.5 0.5,0.5 1.0,1 1))",
):
feat.DumpReadable()
pytest.fail()
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# No geometry fields
ds_str = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
<GeometryType>wkbNone</GeometryType>
</OGRVRTLayer>
</OGRVRTDataSource>"""
for i in range(4):
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
if i == 0:
assert lyr.GetGeomType() == ogr.wkbNone
elif i == 1:
assert lyr.GetSpatialRef() is None
elif i == 2:
lyr.TestCapability(ogr.OLCFastFeatureCount)
lyr.TestCapability(ogr.OLCFastGetExtent)
lyr.TestCapability(ogr.OLCFastSpatialFilter)
elif i == 3:
assert lyr.GetLayerDefn().GetGeomFieldCount() == 0
feat = lyr.GetNextFeature()
if feat.GetGeomFieldRef(0) is not None:
feat.DumpReadable()
pytest.fail()
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
ds_str = """<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
<GeometryField name="geom__WKT_EPSG_4326_POINT"/>
<GeometryField name="foo" encoding="PointFromColumns" x="X" y="Y" reportSrcColumn="false">
<SRS>EPSG:4326</SRS>
</GeometryField>
</OGRVRTLayer>
</OGRVRTDataSource>
"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
assert lyr.GetLayerDefn().GetGeomFieldDefn(1).GetType() == ogr.wkbPoint
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(1)
.GetSpatialRef()
.ExportToWkt()
.find("4326")
>= 0
)
feat = lyr.GetNextFeature()
if feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)":
feat.DumpReadable()
pytest.fail()
if feat.GetGeomFieldRef(1).ExportToWkt() != "POINT (-1 -2)":
feat.DumpReadable()
pytest.fail()
lyr.SetSpatialFilterRect(1, 0, 0, 0, 0)
assert lyr.GetFeatureCount() == 0
lyr.SetSpatialFilterRect(1, -1.1, -2.1, -0.9, -1.9)
assert lyr.GetFeatureCount() == 1
lyr.SetSpatialFilter(1, None)
assert lyr.GetFeatureCount() == 1
lyr.SetIgnoredFields(["geom__WKT_EPSG_4326_POINT"])
lyr.ResetReading()
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0) is not None
or feat.GetGeomFieldRef(1).ExportToWkt() != "POINT (-1 -2)"
):
feat.DumpReadable()
pytest.fail()
lyr.SetIgnoredFields(["foo"])
lyr.ResetReading()
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(1) is not None
or feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
):
feat.DumpReadable()
pytest.fail()
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# Warped layer without explicit WarpedGeomFieldName
ds_str = """<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
assert lyr.GetSpatialRef().ExportToWkt().find("32631") >= 0
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt().find("POINT") < 0
or feat.GetGeomFieldRef(0).ExportToWkt() == "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
):
feat.DumpReadable()
pytest.fail()
# Warped layer with explicit WarpedGeomFieldName (that is the first field)
ds_str = """<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<WarpedGeomFieldName>geom__WKT_EPSG_4326_POINT</WarpedGeomFieldName>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
assert lyr.GetSpatialRef().ExportToWkt().find("32631") >= 0
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt().find("POINT") < 0
or feat.GetGeomFieldRef(0).ExportToWkt() == "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
):
feat.DumpReadable()
pytest.fail()
# Warped layer with explicit WarpedGeomFieldName (that does NOT exist)
ds_str = """<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<WarpedGeomFieldName>foo</WarpedGeomFieldName>
<TargetSRS>EPSG:32631</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
with gdaltest.error_handler():
ogr.Open(ds_str)
assert gdal.GetLastErrorMsg() != ""
# Warped layer with explicit WarpedGeomFieldName (that is the second field)
ds_str = """<OGRVRTDataSource>
<OGRVRTWarpedLayer>
<OGRVRTLayer name="test">
<SrcDataSource>tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<WarpedGeomFieldName>geom__WKT_EPSG_32632_POLYGON</WarpedGeomFieldName>
<TargetSRS>EPSG:4326</TargetSRS>
</OGRVRTWarpedLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(1)
.GetSpatialRef()
.ExportToWkt()
.find("4326")
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(1).ExportToWkt().find("POLYGON") < 0
or feat.GetGeomFieldRef(1).ExportToWkt() == "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
or feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
):
feat.DumpReadable()
pytest.fail()
# UnionLayer with default union strategy
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
geom_fields = [
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4326"],
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon, "32632"],
["geom__WKT_EPSG_4326_LINESTRING", ogr.wkbLineString, "4326"],
["geom__WKT_EPSG_32631_POINT", ogr.wkbPoint, "32631"],
]
assert lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
for i, geom_field in enumerate(geom_fields):
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
or feat.GetGeomFieldRef(2) is not None
or feat.GetGeomFieldRef(3) is not None
):
feat.DumpReadable()
pytest.fail()
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (3 4)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((1 1,1 2,2 2,2 1,1 1))"
or feat.GetGeomFieldRef(2) is not None
or feat.GetGeomFieldRef(3).ExportToWkt() != "POINT (5 6)"
):
feat.DumpReadable()
pytest.fail()
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# UnionLayer with intersection strategy
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<FieldStrategy>Intersection</FieldStrategy>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
geom_fields = [
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4326"],
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon, "32632"],
]
assert lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
for i, geom_field in enumerate(geom_fields):
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
):
feat.DumpReadable()
pytest.fail()
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (3 4)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((1 1,1 2,2 2,2 1,1 1))"
):
feat.DumpReadable()
pytest.fail()
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# UnionLayer with FirstLayer strategy
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<FieldStrategy>FirstLayer</FieldStrategy>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
geom_fields = [
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4326"],
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon, "32632"],
["geom__WKT_EPSG_4326_LINESTRING", ogr.wkbLineString, "4326"],
]
assert lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
for i, geom_field in enumerate(geom_fields):
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
):
feat.DumpReadable()
pytest.fail()
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (3 4)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((1 1,1 2,2 2,2 1,1 1))"
):
feat.DumpReadable()
pytest.fail()
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# UnionLayer with explicit fields but without further information
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<GeometryField name="geom__WKT_EPSG_32632_POLYGON"/>
<GeometryField name="geom__WKT_EPSG_4326_POINT"/>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
geom_fields = [
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon, "32632"],
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4326"],
]
assert lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
for i, geom_field in enumerate(geom_fields):
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POINT (1 2)"
):
feat.DumpReadable()
pytest.fail()
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# UnionLayer with explicit fields with extra information
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<GeometryField name="geom__WKT_EPSG_32632_POLYGON">
<GeometryType>wkbPolygon25D</GeometryType>
</GeometryField>
<GeometryField name="geom__WKT_EPSG_4326_POINT">
<SRS>EPSG:4322</SRS> <!-- will trigger reprojection -->
<ExtentXMin>1</ExtentXMin>
<ExtentYMin>2</ExtentYMin>
<ExtentXMax>3</ExtentXMax>
<ExtentYMax>4</ExtentYMax>
</GeometryField>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
geom_fields = [
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon25D, "32632"],
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4322"],
]
assert lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
bb = lyr.GetExtent(geom_field=1)
assert bb == (1, 3, 2, 4)
for i, geom_field in enumerate(geom_fields):
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
or feat.GetGeomFieldRef(1).ExportToWkt().find("POINT (") != 0
or feat.GetGeomFieldRef(1).ExportToWkt() == "POINT (1 2)"
):
feat.DumpReadable()
pytest.fail()
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
# UnionLayer with geometry fields disabled
ds_str = """<OGRVRTDataSource>
<OGRVRTUnionLayer name="union_layer">
<OGRVRTLayer name="test">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<OGRVRTLayer name="test2">
<SrcDataSource shared="1">tmp/ogr_vrt_33</SrcDataSource>
</OGRVRTLayer>
<GeometryType>wkbNone</GeometryType>
</OGRVRTUnionLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(ds_str)
lyr = ds.GetLayer(0)
assert lyr.GetLayerDefn().GetGeomFieldCount() == 0
assert (
lyr.GetLayerDefn().GetFieldCount() == 6
), lyr.GetLayerDefn().GetGeomFieldCount()
feat = lyr.GetNextFeature()
assert feat is not None
ds = None
if test_cli_utilities.get_test_ogrsf_path() is not None:
f = open("tmp/ogr_vrt_33.vrt", "wb")
f.write(ds_str.encode("ascii"))
f.close()
ret = gdaltest.runexternal(
test_cli_utilities.get_test_ogrsf_path() + " -ro tmp/ogr_vrt_33.vrt"
)
os.unlink("tmp/ogr_vrt_33.vrt")
assert ret.find("INFO") != -1 and ret.find("ERROR") == -1
ds = ogr.Open("tmp/ogr_vrt_33")
sql_lyr = ds.ExecuteSQL("SELECT * FROM test UNION ALL SELECT * FROM test2")
geom_fields = [
["geom__WKT_EPSG_4326_POINT", ogr.wkbPoint, "4326"],
["geom__WKT_EPSG_32632_POLYGON", ogr.wkbPolygon, "32632"],
["geom__WKT_EPSG_4326_LINESTRING", ogr.wkbLineString, "4326"],
["geom__WKT_EPSG_32631_POINT", ogr.wkbPoint, "32631"],
]
assert sql_lyr.GetLayerDefn().GetGeomFieldCount() == len(geom_fields)
for i, geom_field in enumerate(geom_fields):
assert sql_lyr.GetLayerDefn().GetGeomFieldDefn(i).GetName() == geom_field[0]
assert sql_lyr.GetLayerDefn().GetGeomFieldDefn(i).GetType() == geom_field[1]
assert (
sql_lyr.GetLayerDefn()
.GetGeomFieldDefn(i)
.GetSpatialRef()
.ExportToWkt()
.find(geom_field[2])
>= 0
)
feat = sql_lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (1 2)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((0 0,0 1,1 1,1 0,0 0))"
or feat.GetGeomFieldRef(2) is not None
or feat.GetGeomFieldRef(3) is not None
):
feat.DumpReadable()
pytest.fail()
feat = sql_lyr.GetNextFeature()
if (
feat.GetGeomFieldRef(0).ExportToWkt() != "POINT (3 4)"
or feat.GetGeomFieldRef(1).ExportToWkt() != "POLYGON ((1 1,1 2,2 2,2 1,1 1))"
or feat.GetGeomFieldRef(2) is not None
or feat.GetGeomFieldRef(3).ExportToWkt() != "POINT (5 6)"
):
feat.DumpReadable()
pytest.fail()
ds.ReleaseResultSet(sql_lyr)
ds = None
###############################################################################
# Test SetIgnoredFields() with with PointFromColumns geometries
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_34():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("x,y\n".encode("ascii"))
f.write("2,49\n".encode("ascii"))
f.close()
try:
os.remove("tmp/test.csvt")
except OSError:
pass
vrt_xml = """
<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="0">tmp/test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="PointFromColumns" x="x" y="y"/>
<Field name="x" type="Real"/>
<Field name="y" type="Real"/>
</OGRVRTLayer>
</OGRVRTDataSource>"""
ds = ogr.Open(vrt_xml)
lyr = ds.GetLayerByName("test")
lyr.SetIgnoredFields(["x", "y"])
f = lyr.GetNextFeature()
if f.GetGeometryRef().ExportToWkt() != "POINT (2 49)":
f.DumpReadable()
pytest.fail()
ds = None
os.unlink("tmp/test.csv")
###############################################################################
# Test nullable fields
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_35():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("c1,c2,WKT,WKT2\n".encode("ascii"))
f.write('1,,"POINT(2 49),"\n'.encode("ascii"))
f.close()
try:
os.remove("tmp/test.csvt")
except OSError:
pass
# Explicit nullable
f = open("tmp/test.vrt", "wb")
f.write(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="1">test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<GeometryField encoding="WKT" field="WKT" name="g1" nullable="false"/>
<GeometryField encoding="WKT" field="WKT2" name="g2"/>
<Field name="c1" type="Integer" nullable="false"/>
<Field name="c2" type="Integer"/>
</OGRVRTLayer>
</OGRVRTDataSource>""".encode(
"ascii"
)
)
f.close()
ds = ogr.Open("tmp/test.vrt")
lyr = ds.GetLayerByName("test")
assert lyr.GetLayerDefn().GetFieldDefn(0).IsNullable() == 0
assert lyr.GetLayerDefn().GetFieldDefn(1).IsNullable() == 1
assert lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() == 0
assert lyr.GetLayerDefn().GetGeomFieldDefn(1).IsNullable() == 1
ds = None
# Implicit nullable
f = open("tmp/test2.vrt", "wb")
f.write(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="1">test.vrt</SrcDataSource>
<SrcLayer>test</SrcLayer>
</OGRVRTLayer>
</OGRVRTDataSource>""".encode(
"ascii"
)
)
f.close()
ds = ogr.Open("tmp/test2.vrt")
lyr = ds.GetLayerByName("test")
assert lyr.GetLayerDefn().GetFieldDefn(0).IsNullable() == 0
assert lyr.GetLayerDefn().GetFieldDefn(1).IsNullable() == 1
assert lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() == 0
assert lyr.GetLayerDefn().GetGeomFieldDefn(1).IsNullable() == 1
ds = None
os.unlink("tmp/test.csv")
os.unlink("tmp/test.vrt")
os.unlink("tmp/test2.vrt")
###############################################################################
# Test field alternative name and comment
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_alternative_name_comment():
if gdaltest.vrt_ds is None:
pytest.skip()
f = open("tmp/test.csv", "wb")
f.write("c1,c2\n".encode("ascii"))
f.write('1,,"\n'.encode("ascii"))
f.close()
try:
os.remove("tmp/test.csvt")
except OSError:
pass
f = open("tmp/test.vrt", "wb")
f.write(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource relativeToVRT="1">test.csv</SrcDataSource>
<SrcLayer>test</SrcLayer>
<Field name="c1" type="Integer" alternativeName="alternative_name" comment="this is a comment"/>
<Field name="c2" type="Integer"/>
</OGRVRTLayer>
</OGRVRTDataSource>""".encode(
"ascii"
)
)
f.close()
ds = ogr.Open("tmp/test.vrt")
lyr = ds.GetLayerByName("test")
assert lyr.GetLayerDefn().GetFieldDefn(0).GetName() == "c1"
assert lyr.GetLayerDefn().GetFieldDefn(0).GetAlternativeName() == "alternative_name"
assert lyr.GetLayerDefn().GetFieldDefn(0).GetComment() == "this is a comment"
assert lyr.GetLayerDefn().GetFieldDefn(1).GetName() == "c2"
assert lyr.GetLayerDefn().GetFieldDefn(1).GetAlternativeName() == ""
assert lyr.GetLayerDefn().GetFieldDefn(1).GetComment() == ""
ds = None
os.unlink("tmp/test.csv")
os.unlink("tmp/test.vrt")
###############################################################################
# Test editing direct geometries
def test_ogr_vrt_36():
if gdaltest.vrt_ds is None:
pytest.skip()
ds = ogr.GetDriverByName("ESRI Shapefile").CreateDataSource(
"/vsimem/ogr_vrt_36.shp"
)
lyr = ds.CreateLayer("ogr_vrt_36", geom_type=ogr.wkbPoint)
lyr.CreateField(ogr.FieldDefn("id"))
f = ogr.Feature(lyr.GetLayerDefn())
f["id"] = "1"
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT (0 1)"))
lyr.CreateFeature(f)
f = None
ds = None
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_36.vrt",
"""<OGRVRTDataSource>
<OGRVRTLayer name="ogr_vrt_36">
<SrcDataSource relativeToVRT="1">/vsimem/ogr_vrt_36.shp</SrcDataSource>
<GeometryType>wkbPoint</GeometryType>
<LayerSRS>WGS84</LayerSRS>
</OGRVRTLayer>
</OGRVRTDataSource>""",
)
ds = ogr.Open("/vsimem/ogr_vrt_36.vrt", update=1)
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
ds = None
ds = ogr.Open("/vsimem/ogr_vrt_36.shp")
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
if f["id"] != "1":
f.DumpReadable()
pytest.fail()
ds = None
ogr.GetDriverByName("ESRI Shapefile").DeleteDataSource("/vsimem/ogr_vrt_36.shp")
gdal.Unlink("/vsimem/ogr_vrt_36.vrt")
###############################################################################
# Test implicit non-spatial layers (#6336)
def test_ogr_vrt_37():
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test6")
assert lyr.GetGeomType() == ogr.wkbNone
with gdaltest.error_handler():
ds = ogr.Open("data/vrt/vrt_test.vrt")
lyr = ds.GetLayerByName("test6")
assert lyr.GetLayerDefn().GetGeomFieldCount() == 0
###############################################################################
# Test reading geometry type
def test_ogr_vrt_38():
if gdaltest.vrt_ds is None:
pytest.skip()
types = [
["Point", ogr.wkbPoint],
["LineString", ogr.wkbLineString],
["Polygon", ogr.wkbPolygon],
["MultiPoint", ogr.wkbMultiPoint],
["MultiLineString", ogr.wkbMultiLineString],
["MultiPolygon", ogr.wkbMultiPolygon],
["GeometryCollection", ogr.wkbGeometryCollection],
["CircularString", ogr.wkbCircularString],
["CompoundCurve", ogr.wkbCompoundCurve],
["CurvePolygon", ogr.wkbCurvePolygon],
["MultiCurve", ogr.wkbMultiCurve],
["MultiSurface", ogr.wkbMultiSurface],
["Curve", ogr.wkbCurve],
["Surface", ogr.wkbSurface],
]
for (type_str, ogr_type) in types:
for qualifier in ["", "Z", "M", "ZM", "25D"]:
if qualifier == "Z" and ogr_type <= ogr.wkbGeometryCollection:
continue
if qualifier == "25D" and ogr_type > ogr.wkbGeometryCollection:
continue
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_38.vrt",
"""<OGRVRTDataSource>
<OGRVRTLayer name="ogr_vrt_38">
<SrcDataSource relativeToVRT="1">/vsimem/ogr_vrt_38.shp</SrcDataSource>
<GeometryType>wkb%s%s</GeometryType>
</OGRVRTLayer>
</OGRVRTDataSource>"""
% (type_str, qualifier),
)
expected_geom_type = ogr_type
if qualifier == "Z" or qualifier == "ZM" or qualifier == "25D":
expected_geom_type = ogr.GT_SetZ(expected_geom_type)
if qualifier == "M" or qualifier == "ZM":
expected_geom_type = ogr.GT_SetM(expected_geom_type)
ds = ogr.Open("/vsimem/ogr_vrt_38.vrt", update=1)
lyr = ds.GetLayer(0)
assert lyr.GetGeomType() == expected_geom_type, (
type_str,
qualifier,
lyr.GetGeomType(),
)
gdal.Unlink("/vsimem/ogr_vrt_38.vrt")
###############################################################################
# Test that attribute filtering works with <FID>
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_39():
if gdaltest.vrt_ds is None:
pytest.skip()
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_39.csv",
"""my_fid,val
30,1
25,2
""",
)
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_39.csvt",
"""Integer,Integer
""",
)
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_39.vrt",
"""<OGRVRTDataSource>
<OGRVRTLayer name="ogr_vrt_39">
<SrcDataSource relativeToVRT="1">/vsimem/ogr_vrt_39.csv</SrcDataSource>
<GeometryType>wkbNone</GeometryType>
<FID>my_fid</FID>
<Field name="val" type="Integer" src="val"/>
</OGRVRTLayer>
</OGRVRTDataSource>""",
)
ds = ogr.Open("/vsimem/ogr_vrt_39.vrt")
lyr = ds.GetLayer(0)
lyr.SetAttributeFilter("fid = 25")
f = lyr.GetNextFeature()
if f["val"] != 2:
f.DumpReadable()
pytest.fail()
ds = None
gdal.Unlink("/vsimem/ogr_vrt_39.csv")
gdal.Unlink("/vsimem/ogr_vrt_39.csvt")
gdal.Unlink("/vsimem/ogr_vrt_39.vrt")
###############################################################################
# Test PointZM support with encoding="PointFromColumns"
@pytest.mark.require_driver("CSV")
def test_ogr_vrt_40():
if gdaltest.vrt_ds is None:
pytest.skip()
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_40.csv",
"""id,x,y,z,m
1,1,2,3,4
""",
)
gdal.FileFromMemBuffer(
"/vsimem/ogr_vrt_40.vrt",
"""<OGRVRTDataSource>
<OGRVRTLayer name="ogr_vrt_40">
<SrcDataSource relativeToVRT="1">ogr_vrt_40.csv</SrcDataSource>
<SrcLayer>ogr_vrt_40</SrcLayer>
<GeometryField encoding="PointFromColumns" x="x" y="y" z="z" m="m"/>
</OGRVRTLayer>
</OGRVRTDataSource>
""",
)
ds = ogr.Open("/vsimem/ogr_vrt_40.vrt")
lyr = ds.GetLayer(0)
assert lyr.GetGeomType() == ogr.wkbPointZM
f = lyr.GetNextFeature()
if f.GetGeometryRef().ExportToIsoWkt() != "POINT ZM (1 2 3 4)":
f.DumpReadable()
pytest.fail()
ds = None
gdal.Unlink("/vsimem/ogr_vrt_40.csv")
gdal.Unlink("/vsimem/ogr_vrt_40.vrt")
###############################################################################
# Test GetExtent() on erroneous definition
def test_ogr_vrt_41():
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>/i_dont/exist</SrcDataSource>
</OGRVRTLayer>
</OGRVRTDataSource>"""
)
lyr = ds.GetLayer(0)
with gdaltest.error_handler():
lyr.GetExtent()
###############################################################################
# Test reading nullable, default, unique
def test_ogr_vrt_nullable_unique():
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTLayer name="poly">
<SrcDataSource>data/poly.shp</SrcDataSource>
<SrcLayer>poly</SrcLayer>
<GeometryType>wkbPolygon</GeometryType>
<Field name="area" type="Real"/>
<Field name="eas_id" type="Integer64" width="11" nullable="false" unique="true"/>
<Field name="prfedea" type="String"/>
</OGRVRTLayer>
</OGRVRTDataSource>
"""
)
lyr = ds.GetLayer(0)
feat_defn = lyr.GetLayerDefn()
assert feat_defn.GetFieldDefn(0).IsNullable()
assert not feat_defn.GetFieldDefn(0).IsUnique()
assert not feat_defn.GetFieldDefn(1).IsNullable()
assert feat_defn.GetFieldDefn(1).IsUnique()
###############################################################################
# Test field names with same case
def test_ogr_vrt_field_names_same_case():
ds = ogr.Open(
"""<OGRVRTDataSource>
<OGRVRTLayer name="test">
<SrcDataSource>{"type":"Feature","id":"foo","geometry":null,"properties":{"ID":"bar"}}</SrcDataSource>
<SrcLayer>OGRGeoJSON</SrcLayer>
<Field name="id" type="String" src="id"/>
<Field name="id_from_uc" type="String" src="ID"/>
</OGRVRTLayer>
</OGRVRTDataSource>
"""
)
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
assert f["id"] == "foo"
assert f["id_from_uc"] == "bar"
###############################################################################
#
def test_ogr_vrt_cleanup():
if gdaltest.vrt_ds is None:
pytest.skip()
gdal.Unlink("/vsimem/rec1.vrt")
gdal.Unlink("/vsimem/rec2.vrt")
try:
os.unlink("tmp/ogr_vrt_33.vrt")
except OSError:
pass
try:
import shutil
shutil.rmtree("tmp/ogr_vrt_33")
except OSError:
pass
gdaltest.vrt_ds = None