1193 строки
38 KiB
Python
Исполняемый файл
1193 строки
38 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
# -*- coding: utf-8 -*-
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: Test VSI file primitives
|
|
# Author: Even Rouault <even dot rouault at spatialys.com>
|
|
#
|
|
###############################################################################
|
|
# Copyright (c) 2011-2013, Even Rouault <even dot rouault at spatialys.com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
import gdaltest
|
|
import pytest
|
|
from lxml import etree
|
|
|
|
from osgeo import gdal
|
|
|
|
|
|
###############################################################################
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def module_disable_exceptions():
|
|
with gdaltest.disable_exceptions():
|
|
yield
|
|
|
|
|
|
###############################################################################
|
|
# Generic test
|
|
|
|
|
|
def vsifile_generic(filename):
|
|
|
|
start_time = time.time()
|
|
|
|
fp = gdal.VSIFOpenL(filename, "wb+")
|
|
assert fp is not None
|
|
|
|
assert gdal.VSIFWriteL("0123456789", 1, 10, fp) == 10
|
|
|
|
assert gdal.VSIFFlushL(fp) == 0
|
|
|
|
assert gdal.VSIFTruncateL(fp, 20) == 0
|
|
|
|
assert gdal.VSIFTellL(fp) == 10
|
|
|
|
assert gdal.VSIFTruncateL(fp, 5) == 0
|
|
|
|
assert gdal.VSIFTellL(fp) == 10
|
|
|
|
assert gdal.VSIFSeekL(fp, 0, 2) == 0
|
|
|
|
assert gdal.VSIFTellL(fp) == 5
|
|
|
|
gdal.VSIFWriteL("XX", 1, 2, fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
statBuf = gdal.VSIStatL(
|
|
filename,
|
|
gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG | gdal.VSI_STAT_SIZE_FLAG,
|
|
)
|
|
assert statBuf.size == 7
|
|
assert start_time == pytest.approx(statBuf.mtime, abs=2)
|
|
|
|
fp = gdal.VSIFOpenL(filename, "rb")
|
|
assert gdal.VSIFReadL(1, 0, fp) is None
|
|
assert gdal.VSIFReadL(0, 1, fp) is None
|
|
buf = gdal.VSIFReadL(1, 7, fp)
|
|
assert gdal.VSIFWriteL("a", 1, 1, fp) == 0
|
|
assert gdal.VSIFTruncateL(fp, 0) != 0
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
assert buf.decode("ascii") == "01234XX"
|
|
|
|
# Test append mode on existing file
|
|
fp = gdal.VSIFOpenL(filename, "ab")
|
|
gdal.VSIFWriteL("XX", 1, 2, fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
statBuf = gdal.VSIStatL(
|
|
filename,
|
|
gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG | gdal.VSI_STAT_SIZE_FLAG,
|
|
)
|
|
assert statBuf.size == 9
|
|
|
|
assert gdal.Unlink(filename) == 0
|
|
|
|
statBuf = gdal.VSIStatL(filename, gdal.VSI_STAT_EXISTS_FLAG)
|
|
assert statBuf is None
|
|
|
|
# Test append mode on non existing file
|
|
fp = gdal.VSIFOpenL(filename, "ab")
|
|
gdal.VSIFWriteL("XX", 1, 2, fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
statBuf = gdal.VSIStatL(
|
|
filename,
|
|
gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG | gdal.VSI_STAT_SIZE_FLAG,
|
|
)
|
|
assert statBuf.size == 2
|
|
|
|
assert gdal.Unlink(filename) == 0
|
|
|
|
|
|
###############################################################################
|
|
# Test /vsimem
|
|
|
|
|
|
def test_vsifile_1():
|
|
vsifile_generic("/vsimem/vsifile_1.bin")
|
|
|
|
|
|
###############################################################################
|
|
# Test regular file system
|
|
|
|
|
|
def test_vsifile_2():
|
|
vsifile_generic("tmp/vsifile_2.bin")
|
|
|
|
|
|
###############################################################################
|
|
# Test ftruncate >= 32 bit
|
|
|
|
|
|
def test_vsifile_3():
|
|
|
|
if not gdaltest.filesystem_supports_sparse_files("tmp"):
|
|
pytest.skip()
|
|
|
|
filename = "tmp/vsifile_3"
|
|
|
|
fp = gdal.VSIFOpenL(filename, "wb+")
|
|
gdal.VSIFTruncateL(fp, 10 * 1024 * 1024 * 1024)
|
|
gdal.VSIFSeekL(fp, 0, 2)
|
|
pos = gdal.VSIFTellL(fp)
|
|
if pos != 10 * 1024 * 1024 * 1024:
|
|
gdal.VSIFCloseL(fp)
|
|
gdal.Unlink(filename)
|
|
pytest.fail(pos)
|
|
gdal.VSIFSeekL(fp, 0, 0)
|
|
gdal.VSIFSeekL(fp, pos, 0)
|
|
pos = gdal.VSIFTellL(fp)
|
|
if pos != 10 * 1024 * 1024 * 1024:
|
|
gdal.VSIFCloseL(fp)
|
|
gdal.Unlink(filename)
|
|
pytest.fail(pos)
|
|
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
statBuf = gdal.VSIStatL(
|
|
filename,
|
|
gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG | gdal.VSI_STAT_SIZE_FLAG,
|
|
)
|
|
gdal.Unlink(filename)
|
|
|
|
assert statBuf.size == 10 * 1024 * 1024 * 1024
|
|
|
|
|
|
###############################################################################
|
|
# Test fix for #4583 (short reads)
|
|
|
|
|
|
def test_vsifile_4():
|
|
|
|
fp = gdal.VSIFOpenL("vsifile.py", "rb")
|
|
data = gdal.VSIFReadL(1000000, 1, fp)
|
|
# print(len(data))
|
|
gdal.VSIFSeekL(fp, 0, 0)
|
|
data = gdal.VSIFReadL(1, 1000000, fp)
|
|
assert data
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
|
|
###############################################################################
|
|
# Test vsicache
|
|
|
|
|
|
@pytest.mark.parametrize("cache_size", ("0", "65536", None))
|
|
def test_vsifile_5(cache_size):
|
|
|
|
fp = gdal.VSIFOpenL("tmp/vsifile_5.bin", "wb")
|
|
ref_data = "".join(["%08X" % i for i in range(5 * 32768)])
|
|
gdal.VSIFWriteL(ref_data, 1, len(ref_data), fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
with gdal.config_options({"VSI_CACHE": "YES", "VSI_CACHE_SIZE": cache_size}):
|
|
fp = gdal.VSIFOpenL("tmp/vsifile_5.bin", "rb")
|
|
|
|
gdal.VSIFSeekL(fp, 50000, 0)
|
|
if gdal.VSIFTellL(fp) != 50000:
|
|
pytest.fail()
|
|
|
|
gdal.VSIFSeekL(fp, 50000, 1)
|
|
if gdal.VSIFTellL(fp) != 100000:
|
|
pytest.fail()
|
|
|
|
gdal.VSIFSeekL(fp, 0, 2)
|
|
if gdal.VSIFTellL(fp) != 5 * 32768 * 8:
|
|
pytest.fail()
|
|
gdal.VSIFReadL(1, 1, fp)
|
|
|
|
gdal.VSIFSeekL(fp, 0, 0)
|
|
data = gdal.VSIFReadL(1, 3 * 32768, fp)
|
|
if data.decode("ascii") != ref_data[0 : 3 * 32768]:
|
|
pytest.fail()
|
|
|
|
gdal.VSIFSeekL(fp, 16384, 0)
|
|
data = gdal.VSIFReadL(1, 5 * 32768, fp)
|
|
if data.decode("ascii") != ref_data[16384 : 16384 + 5 * 32768]:
|
|
pytest.fail()
|
|
|
|
data = gdal.VSIFReadL(1, 50 * 32768, fp)
|
|
if data[0:1130496].decode("ascii") != ref_data[16384 + 5 * 32768 :]:
|
|
pytest.fail()
|
|
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
gdal.Unlink("tmp/vsifile_5.bin")
|
|
|
|
|
|
###############################################################################
|
|
# Test vsicache an read errors (https://github.com/qgis/QGIS/issues/45293)
|
|
|
|
|
|
def test_vsifile_vsicache_read_error():
|
|
|
|
tmpfilename = "tmp/test_vsifile_vsicache_read_error.bin"
|
|
f = gdal.VSIFOpenL(tmpfilename, "wb")
|
|
assert f
|
|
try:
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
|
|
with gdaltest.config_option("VSI_CACHE", "YES"):
|
|
f2 = gdal.VSIFOpenL(tmpfilename, "rb")
|
|
assert f2
|
|
try:
|
|
gdal.VSIFSeekL(f2, 500 * 1000, 0)
|
|
|
|
# Truncate the file to simulate a read error
|
|
gdal.VSIFTruncateL(f, 0)
|
|
|
|
assert len(gdal.VSIFReadL(1, 5000 * 1000, f2)) == 0
|
|
|
|
# Extend the file again
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
|
|
# Read again
|
|
# Note: reading after truncating / extending seems to not play
|
|
# very well with FILE* and depends on the libc implementation,
|
|
# in particular musl seems to behave differently from glibc and BSDs
|
|
gdal.VSIFSeekL(f2, 500 * 1000, 0)
|
|
# just test we don't crash
|
|
gdal.VSIFReadL(1, 50 * 1000, f2)
|
|
# assert len(gdal.VSIFReadL(1, 50 * 1000, f2)) == 50 * 1000
|
|
|
|
# Truncate the file to simulate a read error
|
|
gdal.VSIFTruncateL(f, 10)
|
|
|
|
CHUNK_SIZE = 32768
|
|
|
|
gdal.VSIFSeekL(f2, 0, 0)
|
|
assert len(gdal.VSIFReadL(1, CHUNK_SIZE, f2)) == 10
|
|
|
|
gdal.VSIFSeekL(f2, 100, 0)
|
|
assert len(gdal.VSIFReadL(1, CHUNK_SIZE, f2)) == 0
|
|
|
|
finally:
|
|
gdal.VSIFCloseL(f2)
|
|
finally:
|
|
gdal.VSIFCloseL(f)
|
|
gdal.Unlink(tmpfilename)
|
|
|
|
|
|
###############################################################################
|
|
# Test vsicache above 2 GB
|
|
|
|
|
|
def test_vsifile_6():
|
|
|
|
if not gdaltest.filesystem_supports_sparse_files("tmp"):
|
|
pytest.skip()
|
|
|
|
offset = 4 * 1024 * 1024 * 1024
|
|
|
|
ref_data = "abcd".encode("ascii")
|
|
fp = gdal.VSIFOpenL("tmp/vsifile_6.bin", "wb")
|
|
gdal.VSIFSeekL(fp, offset, 0)
|
|
gdal.VSIFWriteL(ref_data, 1, len(ref_data), fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
# Sanity check without VSI_CACHE
|
|
fp = gdal.VSIFOpenL("tmp/vsifile_6.bin", "rb")
|
|
gdal.VSIFSeekL(fp, offset, 0)
|
|
got_data = gdal.VSIFReadL(1, len(ref_data), fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
assert ref_data == got_data
|
|
|
|
# Real test now
|
|
with gdal.config_option("VSI_CACHE", "YES"):
|
|
fp = gdal.VSIFOpenL("tmp/vsifile_6.bin", "rb")
|
|
gdal.VSIFSeekL(fp, offset, 0)
|
|
got_data = gdal.VSIFReadL(1, len(ref_data), fp)
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
assert ref_data == got_data
|
|
|
|
gdal.Unlink("tmp/vsifile_6.bin")
|
|
|
|
|
|
###############################################################################
|
|
# Test limit cases on /vsimem
|
|
|
|
|
|
def test_vsifile_7():
|
|
|
|
if gdal.GetConfigOption("SKIP_MEM_INTENSIVE_TEST") is not None:
|
|
pytest.skip()
|
|
|
|
# Test extending file beyond reasonable limits in write mode
|
|
fp = gdal.VSIFOpenL("/vsimem/vsifile_7.bin", "wb")
|
|
assert gdal.VSIFSeekL(fp, 0x7FFFFFFFFFFFFFFF, 0) == 0
|
|
assert gdal.VSIStatL("/vsimem/vsifile_7.bin").size == 0
|
|
with gdaltest.error_handler():
|
|
ret = gdal.VSIFWriteL("a", 1, 1, fp)
|
|
assert ret == 0
|
|
assert gdal.VSIStatL("/vsimem/vsifile_7.bin").size == 0
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
# Test seeking beyond file size in read-only mode
|
|
fp = gdal.VSIFOpenL("/vsimem/vsifile_7.bin", "rb")
|
|
assert gdal.VSIFSeekL(fp, 0x7FFFFFFFFFFFFFFF, 0) == 0
|
|
assert gdal.VSIFEofL(fp) == 0
|
|
assert gdal.VSIFTellL(fp) == 0x7FFFFFFFFFFFFFFF
|
|
assert not gdal.VSIFReadL(1, 1, fp)
|
|
assert gdal.VSIFEofL(fp) == 1
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
gdal.Unlink("/vsimem/vsifile_7.bin")
|
|
|
|
|
|
###############################################################################
|
|
# Test renaming directory in /vsimem
|
|
|
|
|
|
def test_vsifile_8():
|
|
|
|
# octal 0666 = decimal 438
|
|
gdal.Mkdir("/vsimem/mydir", 438)
|
|
fp = gdal.VSIFOpenL("/vsimem/mydir/a", "wb")
|
|
gdal.VSIFCloseL(fp)
|
|
gdal.Rename("/vsimem/mydir", "/vsimem/newdir".encode("ascii").decode("ascii"))
|
|
assert gdal.VSIStatL("/vsimem/newdir") is not None
|
|
assert gdal.VSIStatL("/vsimem/newdir/a") is not None
|
|
gdal.Unlink("/vsimem/newdir/a")
|
|
gdal.Rmdir("/vsimem/newdir")
|
|
|
|
|
|
###############################################################################
|
|
# Test ReadDir()
|
|
|
|
|
|
def test_vsifile_9():
|
|
|
|
lst = gdal.ReadDir(".")
|
|
assert len(lst) >= 4
|
|
# Test truncation
|
|
lst_truncated = gdal.ReadDir(".", int(len(lst) / 2))
|
|
assert len(lst_truncated) > int(len(lst) / 2)
|
|
|
|
gdal.Mkdir("/vsimem/mydir", 438)
|
|
for i in range(10):
|
|
fp = gdal.VSIFOpenL("/vsimem/mydir/%d" % i, "wb")
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
lst = gdal.ReadDir("/vsimem/mydir")
|
|
assert len(lst) >= 4
|
|
# Test truncation
|
|
lst_truncated = gdal.ReadDir("/vsimem/mydir", int(len(lst) / 2))
|
|
assert len(lst_truncated) > int(len(lst) / 2)
|
|
|
|
for i in range(10):
|
|
gdal.Unlink("/vsimem/mydir/%d" % i)
|
|
gdal.Rmdir("/vsimem/mydir")
|
|
|
|
|
|
###############################################################################
|
|
# Test fuzzer friendly archive
|
|
|
|
|
|
def test_vsifile_10():
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/vsifile_10.tar",
|
|
"""FUZZER_FRIENDLY_ARCHIVE
|
|
***NEWFILE***:test.txt
|
|
abc***NEWFILE***:huge.txt
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
|
|
0123456789012345678901234567890123456789012345678901234567890123456789012345678X
|
|
***NEWFILE***:small.txt
|
|
a""",
|
|
)
|
|
contents = gdal.ReadDir("/vsitar//vsimem/vsifile_10.tar")
|
|
if contents is None:
|
|
gdal.Unlink("/vsimem/vsifile_10.tar")
|
|
pytest.skip()
|
|
assert contents == ["test.txt", "huge.txt", "small.txt"]
|
|
assert gdal.VSIStatL("/vsitar//vsimem/vsifile_10.tar/test.txt").size == 3
|
|
assert gdal.VSIStatL("/vsitar//vsimem/vsifile_10.tar/huge.txt").size == 3888
|
|
assert gdal.VSIStatL("/vsitar//vsimem/vsifile_10.tar/small.txt").size == 1
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/vsifile_10.tar",
|
|
"""FUZZER_FRIENDLY_ARCHIVE
|
|
***NEWFILE***:x
|
|
abc""",
|
|
)
|
|
contents = gdal.ReadDir("/vsitar//vsimem/vsifile_10.tar")
|
|
assert contents == ["x"]
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/vsifile_10.tar",
|
|
"""FUZZER_FRIENDLY_ARCHIVE
|
|
***NEWFILE***:x
|
|
abc***NEWFILE***:""",
|
|
)
|
|
contents = gdal.ReadDir("/vsitar//vsimem/vsifile_10.tar")
|
|
assert contents == ["x"]
|
|
|
|
gdal.Unlink("/vsimem/vsifile_10.tar")
|
|
|
|
|
|
###############################################################################
|
|
# Test generic Truncate implementation for file extension
|
|
|
|
|
|
def test_vsifile_11():
|
|
f = gdal.VSIFOpenL("/vsimem/vsifile_11", "wb")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
f = gdal.VSIFOpenL("/vsisubfile/0_,/vsimem/vsifile_11", "wb")
|
|
gdal.VSIFWriteL("0123456789", 1, 10, f)
|
|
assert gdal.VSIFTruncateL(f, 10 + 4096 + 2) == 0
|
|
assert gdal.VSIFTellL(f) == 10
|
|
assert gdal.VSIFTruncateL(f, 0) == -1
|
|
gdal.VSIFCloseL(f)
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/vsifile_11", "rb")
|
|
data = gdal.VSIFReadL(1, 10 + 4096 + 2, f)
|
|
gdal.VSIFCloseL(f)
|
|
import struct
|
|
|
|
data = struct.unpack("B" * len(data), data)
|
|
assert (
|
|
data[0] == 48
|
|
and data[9] == 57
|
|
and data[10] == 0
|
|
and data[10 + 4096 + 2 - 1] == 0
|
|
)
|
|
|
|
gdal.Unlink("/vsimem/vsifile_11")
|
|
|
|
|
|
###############################################################################
|
|
# Test regular file system sparse file support
|
|
|
|
|
|
def test_vsifile_12():
|
|
|
|
target_dir = "tmp"
|
|
|
|
if gdal.VSISupportsSparseFiles(target_dir) == 0:
|
|
pytest.skip()
|
|
|
|
# Minimum value to make it work on NTFS
|
|
block_size = 65536
|
|
f = gdal.VSIFOpenL(target_dir + "/vsifile_12", "wb")
|
|
gdal.VSIFWriteL("a", 1, 1, f)
|
|
assert gdal.VSIFTruncateL(f, block_size * 2) == 0
|
|
ret = gdal.VSIFGetRangeStatusL(f, 0, 1)
|
|
# We could get unknown on nfs
|
|
if ret == gdal.VSI_RANGE_STATUS_UNKNOWN:
|
|
print("Range status unknown")
|
|
else:
|
|
assert ret == gdal.VSI_RANGE_STATUS_DATA
|
|
ret = gdal.VSIFGetRangeStatusL(f, block_size * 2 - 1, 1)
|
|
assert ret == gdal.VSI_RANGE_STATUS_HOLE
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink(target_dir + "/vsifile_12")
|
|
|
|
|
|
###############################################################################
|
|
# Test reading filename with prefixes without terminating slash
|
|
|
|
|
|
def test_vsifile_13():
|
|
|
|
gdal.VSIFOpenL("/vsigzip", "rb")
|
|
gdal.VSIFOpenL("/vsizip", "rb")
|
|
gdal.VSIFOpenL("/vsitar", "rb")
|
|
gdal.VSIFOpenL("/vsimem", "rb")
|
|
gdal.VSIFOpenL("/vsisparse", "rb")
|
|
gdal.VSIFOpenL("/vsisubfile", "rb")
|
|
gdal.VSIFOpenL("/vsicurl", "rb")
|
|
gdal.VSIFOpenL("/vsis3", "rb")
|
|
gdal.VSIFOpenL("/vsicurl_streaming", "rb")
|
|
gdal.VSIFOpenL("/vsis3_streaming", "rb")
|
|
gdal.VSIFOpenL("/vsistdin", "rb")
|
|
|
|
fp = gdal.VSIFOpenL("/vsistdout", "wb")
|
|
if fp is not None:
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
gdal.VSIStatL("/vsigzip")
|
|
gdal.VSIStatL("/vsizip")
|
|
gdal.VSIStatL("/vsitar")
|
|
gdal.VSIStatL("/vsimem")
|
|
gdal.VSIStatL("/vsisparse")
|
|
gdal.VSIStatL("/vsisubfile")
|
|
gdal.VSIStatL("/vsicurl")
|
|
gdal.VSIStatL("/vsis3")
|
|
gdal.VSIStatL("/vsicurl_streaming")
|
|
gdal.VSIStatL("/vsis3_streaming")
|
|
gdal.VSIStatL("/vsistdin")
|
|
gdal.VSIStatL("/vsistdout")
|
|
|
|
|
|
###############################################################################
|
|
# Check performance issue (https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=1673)
|
|
|
|
|
|
def test_vsifile_14():
|
|
|
|
with gdaltest.error_handler():
|
|
gdal.VSIFOpenL(
|
|
"/vsitar//vsitar//vsitar//vsitar//vsitar//vsitar//vsitar//vsitar/a.tgzb.tgzc.tgzd.tgze.tgzf.tgz.h.tgz.i.tgz",
|
|
"rb",
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test issue with Eof() not detecting end of corrupted gzip stream (#6944)
|
|
|
|
|
|
def test_vsifile_15():
|
|
|
|
fp = gdal.VSIFOpenL("/vsigzip/data/corrupted_z_buf_error.gz", "rb")
|
|
assert fp is not None
|
|
file_len = 0
|
|
while not gdal.VSIFEofL(fp):
|
|
with gdaltest.error_handler():
|
|
file_len += len(gdal.VSIFReadL(1, 4, fp))
|
|
assert file_len == 6469
|
|
|
|
with gdaltest.error_handler():
|
|
file_len += len(gdal.VSIFReadL(1, 4, fp))
|
|
assert file_len == 6469
|
|
|
|
with gdaltest.error_handler():
|
|
assert gdal.VSIFSeekL(fp, 0, 2) != 0
|
|
|
|
assert gdal.VSIFSeekL(fp, 0, 0) == 0
|
|
|
|
len_read = len(gdal.VSIFReadL(1, file_len, fp))
|
|
assert len_read == file_len
|
|
|
|
gdal.VSIFCloseL(fp)
|
|
|
|
|
|
###############################################################################
|
|
# Test failed gdal.Rename() with exceptions enabled
|
|
|
|
|
|
def test_vsifile_16():
|
|
|
|
with gdal.ExceptionMgr(useExceptions=True):
|
|
with pytest.raises(Exception):
|
|
gdal.Rename("/tmp/i_do_not_exist_vsifile_16.tif", "/tmp/me_neither.tif")
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.GetActualURL() on a non-network based filesystem
|
|
|
|
|
|
def test_vsifile_17():
|
|
|
|
assert gdal.GetActualURL("foo") is None
|
|
|
|
assert gdal.GetSignedURL("foo") is None
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.GetFileSystemsPrefixes()
|
|
|
|
|
|
def test_vsifile_18():
|
|
|
|
prefixes = gdal.GetFileSystemsPrefixes()
|
|
assert "/vsimem/" in prefixes
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.GetFileSystemOptions()
|
|
|
|
|
|
def test_vsifile_19():
|
|
|
|
for prefix in gdal.GetFileSystemsPrefixes():
|
|
options = gdal.GetFileSystemOptions(prefix)
|
|
# Check that the options is XML correct
|
|
if options is not None:
|
|
try:
|
|
etree.fromstring(options)
|
|
except Exception:
|
|
assert False, (prefix, options)
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.VSIFReadL with None fp
|
|
|
|
|
|
def test_vsifile_20():
|
|
|
|
try:
|
|
gdal.VSIFReadL(1, 1, None)
|
|
except ValueError:
|
|
return
|
|
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.VSIGetMemFileBuffer_unsafe() and gdal.VSIFWriteL() reading buffers
|
|
|
|
|
|
def test_vsifile_21():
|
|
|
|
filename = "/vsimem/read.tif"
|
|
filename_write = "/vsimem/write.tif"
|
|
data = "This is some data"
|
|
|
|
vsifile = gdal.VSIFOpenL(filename, "wb")
|
|
assert gdal.VSIFWriteL(data, 1, len(data), vsifile) == len(data)
|
|
gdal.VSIFCloseL(vsifile)
|
|
|
|
vsifile = gdal.VSIFOpenL(filename, "rb")
|
|
gdal.VSIFSeekL(vsifile, 0, 2)
|
|
vsilen = gdal.VSIFTellL(vsifile)
|
|
gdal.VSIFSeekL(vsifile, 0, 0)
|
|
data_read = gdal.VSIFReadL(1, vsilen, vsifile)
|
|
data_mem = gdal.VSIGetMemFileBuffer_unsafe(filename)
|
|
assert data_read == data_mem[:]
|
|
gdal.VSIFCloseL(vsifile)
|
|
vsifile_write = gdal.VSIFOpenL(filename_write, "wb")
|
|
assert gdal.VSIFWriteL(data_mem, 1, len(data_mem), vsifile_write) == len(data_mem)
|
|
gdal.VSIFCloseL(vsifile_write)
|
|
gdal.Unlink(filename)
|
|
gdal.Unlink(filename_write)
|
|
with gdaltest.error_handler():
|
|
data3 = gdal.VSIGetMemFileBuffer_unsafe(filename)
|
|
assert data3 == None
|
|
|
|
|
|
def test_vsifile_22():
|
|
# VSIOpenL doesn't set errorno
|
|
gdal.VSIErrorReset()
|
|
assert gdal.VSIGetLastErrorNo() == 0, (
|
|
"Expected Err=0 after VSIErrorReset(), got %d" % gdal.VSIGetLastErrorNo()
|
|
)
|
|
|
|
fp = gdal.VSIFOpenL("tmp/not-existing", "r")
|
|
assert fp is None, "Expected None from VSIFOpenL"
|
|
assert gdal.VSIGetLastErrorNo() == 0, (
|
|
"Expected Err=0 from VSIFOpenL, got %d" % gdal.VSIGetLastErrorNo()
|
|
)
|
|
|
|
# VSIOpenExL does
|
|
fp = gdal.VSIFOpenExL("tmp/not-existing", "r", 1)
|
|
assert fp is None, "Expected None from VSIFOpenExL"
|
|
assert gdal.VSIGetLastErrorNo() == 1, (
|
|
"Expected Err=1 from VSIFOpenExL, got %d" % gdal.VSIGetLastErrorNo()
|
|
)
|
|
assert len(gdal.VSIGetLastErrorMsg()) != 0, "Expected a VSI error message"
|
|
gdal.VSIErrorReset()
|
|
assert gdal.VSIGetLastErrorNo() == 0, (
|
|
"Expected Err=0 after VSIErrorReset(), got %d" % gdal.VSIGetLastErrorNo()
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test bugfix for https://github.com/OSGeo/gdal/issues/675
|
|
|
|
|
|
def test_vsitar_bug_675():
|
|
|
|
content = gdal.ReadDir("/vsitar/data/tar_with_star_base256_fields.tar")
|
|
assert len(content) == 1
|
|
|
|
|
|
###############################################################################
|
|
# Test multithreaded compression
|
|
|
|
|
|
def test_vsigzip_multi_thread():
|
|
|
|
with gdaltest.config_options(
|
|
{"GDAL_NUM_THREADS": "ALL_CPUS", "CPL_VSIL_DEFLATE_CHUNK_SIZE": "32K"}
|
|
):
|
|
f = gdal.VSIFOpenL("/vsigzip//vsimem/vsigzip_multi_thread.gz", "wb")
|
|
for i in range(100000):
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
f = gdal.VSIFOpenL("/vsigzip//vsimem/vsigzip_multi_thread.gz", "rb")
|
|
data = gdal.VSIFReadL(100000, 5, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/vsigzip_multi_thread.gz")
|
|
|
|
if data != "hello" * 100000:
|
|
for i in range(10000):
|
|
if data[i * 5 : i * 5 + 5] != "hello":
|
|
print(i * 5, data[i * 5 : i * 5 + 5], data[i * 5 - 5 : i * 5 + 5 - 5])
|
|
break
|
|
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test vsisync()
|
|
|
|
|
|
def test_vsisync():
|
|
|
|
with gdaltest.error_handler():
|
|
assert not gdal.Sync("/i_do/not/exist", "/vsimem/")
|
|
|
|
with gdaltest.error_handler():
|
|
assert not gdal.Sync("vsifile.py", "/i_do/not/exist")
|
|
|
|
# Test copying a file
|
|
for i in range(2):
|
|
assert gdal.Sync("vsifile.py", "/vsimem/")
|
|
assert (
|
|
gdal.VSIStatL("/vsimem/vsifile.py").size == gdal.VSIStatL("vsifile.py").size
|
|
)
|
|
gdal.Unlink("/vsimem/vsifile.py")
|
|
|
|
# Test copying the content of a directory
|
|
gdal.Mkdir("/vsimem/test_sync", 0)
|
|
gdal.FileFromMemBuffer("/vsimem/test_sync/foo.txt", "bar")
|
|
gdal.Mkdir("/vsimem/test_sync/subdir", 0)
|
|
gdal.FileFromMemBuffer("/vsimem/test_sync/subdir/bar.txt", "baz")
|
|
|
|
if sys.platform == "linux":
|
|
with gdaltest.error_handler():
|
|
# even root cannot write into /proc
|
|
assert not gdal.Sync("/vsimem/test_sync/", "/proc/i_do_not/exist")
|
|
|
|
assert gdal.Sync("/vsimem/test_sync/", "/vsimem/out")
|
|
assert gdal.ReadDir("/vsimem/out") == ["foo.txt", "subdir"]
|
|
assert gdal.ReadDir("/vsimem/out/subdir") == ["bar.txt"]
|
|
# Again
|
|
assert gdal.Sync("/vsimem/test_sync/", "/vsimem/out")
|
|
|
|
gdal.RmdirRecursive("/vsimem/out")
|
|
|
|
# Test copying a directory
|
|
pct_values = []
|
|
|
|
def my_progress(pct, message, user_data):
|
|
pct_values.append(pct)
|
|
|
|
assert gdal.Sync("/vsimem/test_sync", "/vsimem/out", callback=my_progress)
|
|
|
|
assert pct_values == [0.5, 1.0]
|
|
|
|
assert gdal.ReadDir("/vsimem/out") == ["test_sync"]
|
|
assert gdal.ReadDir("/vsimem/out/test_sync") == ["foo.txt", "subdir"]
|
|
|
|
gdal.RmdirRecursive("/vsimem/test_sync")
|
|
gdal.RmdirRecursive("/vsimem/out")
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.OpenDir()
|
|
|
|
|
|
@pytest.mark.parametrize("basepath", ["/vsimem/", "tmp/"])
|
|
def test_vsifile_opendir(basepath):
|
|
|
|
# Non existing dir
|
|
d = gdal.OpenDir(basepath + "/i_dont_exist")
|
|
assert not d
|
|
|
|
gdal.RmdirRecursive(basepath + "/vsifile_opendir")
|
|
|
|
gdal.Mkdir(basepath + "/vsifile_opendir", 0o755)
|
|
|
|
# Empty dir
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir")
|
|
assert d
|
|
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
|
|
gdal.CloseDir(d)
|
|
|
|
f = gdal.VSIFOpenL(basepath + "/vsifile_opendir/test", "wb")
|
|
assert f
|
|
gdal.VSIFWriteL("foo", 1, 3, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Mkdir(basepath + "/vsifile_opendir/subdir", 0o755)
|
|
gdal.Mkdir(basepath + "/vsifile_opendir/subdir/subdir2", 0o755)
|
|
|
|
f = gdal.VSIFOpenL(basepath + "/vsifile_opendir/subdir/subdir2/test2", "wb")
|
|
assert f
|
|
gdal.VSIFWriteL("bar", 1, 3, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
# Unlimited depth
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir")
|
|
|
|
entries_found = []
|
|
for i in range(4):
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry
|
|
if entry.name == "test":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 32768) != 0
|
|
assert entry.modeKnown
|
|
assert entry.size == 3
|
|
assert entry.sizeKnown
|
|
assert entry.mtime != 0
|
|
assert entry.mtimeKnown
|
|
assert not entry.extra
|
|
elif entry.name == "subdir":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 16384) != 0
|
|
elif entry.name == "subdir/subdir2":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 16384) != 0
|
|
elif entry.name == "subdir/subdir2/test2":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 32768) != 0
|
|
else:
|
|
assert False, entry.name
|
|
assert len(entries_found) == 4, entries_found
|
|
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
# Unlimited depth, do not require stating (only honoured on Unix)
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", -1, ["NAME_AND_TYPE_ONLY=YES"])
|
|
|
|
entries_found = []
|
|
for i in range(4):
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry
|
|
if entry.name == "test":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 32768) != 0
|
|
if os.name == "posix" and basepath == "tmp/":
|
|
assert entry.size == 0
|
|
elif entry.name == "subdir":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 16384) != 0
|
|
elif entry.name == "subdir/subdir2":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 16384) != 0
|
|
elif entry.name == "subdir/subdir2/test2":
|
|
entries_found.append(entry.name)
|
|
assert (entry.mode & 32768) != 0
|
|
if os.name == "posix" and basepath == "tmp/":
|
|
assert entry.size == 0
|
|
else:
|
|
assert False, entry.name
|
|
assert len(entries_found) == 4, entries_found
|
|
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
# Only top level
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", 0)
|
|
entries_found = set()
|
|
for i in range(2):
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry
|
|
entries_found.add(entry.name)
|
|
assert entries_found == set(["test", "subdir"])
|
|
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
# Depth 1
|
|
files = set(
|
|
[l_entry.name for l_entry in gdal.listdir(basepath + "/vsifile_opendir", 1)]
|
|
)
|
|
assert files == set(["test", "subdir", "subdir/subdir2"])
|
|
|
|
# Prefix filtering
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", -1, ["PREFIX=t"])
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "test"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", -1, ["PREFIX=testtoolong"])
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", -1, ["PREFIX=subd"])
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "subdir"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "subdir/subdir2"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "subdir/subdir2/test2"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
d = gdal.OpenDir(basepath + "/vsifile_opendir", -1, ["PREFIX=subdir/sub"])
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "subdir/subdir2"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert entry.name == "subdir/subdir2/test2"
|
|
entry = gdal.GetNextDirEntry(d)
|
|
assert not entry
|
|
gdal.CloseDir(d)
|
|
|
|
# Cleanup
|
|
gdal.RmdirRecursive(basepath + "/vsifile_opendir")
|
|
|
|
|
|
###############################################################################
|
|
# Test bugfix for https://github.com/OSGeo/gdal/issues/1559
|
|
|
|
|
|
def test_vsitar_verylongfilename_posix():
|
|
|
|
f = gdal.VSIFOpenL(
|
|
"/vsitar/data/verylongfilename.tar/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/ccccccccccccccccccccccccccccccccccc/ddddddddddddddddddddddddddddddddddddddd/eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee/fffffffffffffffffffffffffffffffffffffffffffffff/foo",
|
|
"rb",
|
|
)
|
|
assert f
|
|
data = gdal.VSIFReadL(1, 3, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
assert data == "bar"
|
|
|
|
|
|
###############################################################################
|
|
# Test bugfix for https://github.com/OSGeo/gdal/issues/4625
|
|
|
|
|
|
def test_vsitar_longfilename_ustar():
|
|
|
|
assert (
|
|
gdal.VSIStatL(
|
|
"/vsitar/data/longfilename_ustar.tar/zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz/bbbbbbbbbbbbbbbbbbbbbbb/ccccccccccccccccccccccccc/dddddddddd/e/byte.tif"
|
|
)
|
|
is not None
|
|
)
|
|
|
|
|
|
def test_unlink_batch():
|
|
|
|
gdal.FileFromMemBuffer("/vsimem/foo", "foo")
|
|
gdal.FileFromMemBuffer("/vsimem/bar", "bar")
|
|
assert gdal.UnlinkBatch(["/vsimem/foo", "/vsimem/bar"])
|
|
assert not gdal.VSIStatL("/vsimem/foo")
|
|
assert not gdal.VSIStatL("/vsimem/bar")
|
|
|
|
assert not gdal.UnlinkBatch([])
|
|
|
|
gdal.FileFromMemBuffer("/vsimem/foo", "foo")
|
|
open("tmp/bar", "wt").write("bar")
|
|
with gdaltest.error_handler():
|
|
assert not gdal.UnlinkBatch(["/vsimem/foo", "tmp/bar"])
|
|
gdal.Unlink("/vsimem/foo")
|
|
gdal.Unlink("tmp/bar")
|
|
|
|
|
|
###############################################################################
|
|
# Test gdal.RmdirRecursive()
|
|
|
|
|
|
def test_vsifile_rmdirrecursive():
|
|
|
|
gdal.Mkdir("tmp/rmdirrecursive", 493)
|
|
gdal.Mkdir("tmp/rmdirrecursive/subdir", 493)
|
|
open("tmp/rmdirrecursive/foo.bin", "wb").close()
|
|
open("tmp/rmdirrecursive/subdir/bar.bin", "wb").close()
|
|
assert gdal.RmdirRecursive("tmp/rmdirrecursive") == 0
|
|
assert not os.path.exists("tmp/rmdirrecursive")
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def test_vsifile_vsizip_error():
|
|
|
|
for i in range(128):
|
|
filename = "/vsimem/tmp||maxlength=%d.zip" % i
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip/%s/out.bin" % filename, "wb")
|
|
if f is not None:
|
|
assert gdal.VSIFCloseL(f) < 0
|
|
gdal.Unlink(filename)
|
|
|
|
|
|
###############################################################################
|
|
# Test bugfix for https://github.com/OSGeo/gdal/issues/5225
|
|
|
|
|
|
def test_vsifile_vsitar_gz_with_tar_multiple_of_65536_bytes():
|
|
|
|
f = gdal.VSIFOpenL("/vsitar/data/tar_of_65536_bytes.tar.gz/zero.bin", "rb")
|
|
assert f is not None
|
|
read_bytes = gdal.VSIFReadL(1, 65024, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert read_bytes == b"\x00" * 65024
|
|
gdal.Unlink("data/tar_of_65536_bytes.tar.gz.properties")
|
|
|
|
|
|
###############################################################################
|
|
# Test bugfix for https://github.com/OSGeo/gdal/issues/5468
|
|
|
|
|
|
def test_vsifile_vsizip_stored():
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/data/stored.zip/foo.txt", "rb")
|
|
assert f
|
|
assert gdal.VSIFReadL(1, 5, f) == b"foo\n"
|
|
assert gdal.VSIFEofL(f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
|
|
###############################################################################
|
|
# Test that VSIFTruncateL() zeroize beyond the truncated area
|
|
|
|
|
|
def test_vsifile_vsimem_truncate_zeroize():
|
|
|
|
filename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(filename, "wb+")
|
|
data = b"\xFF" * 10000
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFTruncateL(f, 0)
|
|
gdal.VSIFSeekL(f, 10000, 0)
|
|
gdal.VSIFWriteL(b"\x00", 1, 1, f)
|
|
gdal.VSIFSeekL(f, 0, 0)
|
|
assert gdal.VSIFReadL(1, 1, f) == b"\x00"
|
|
gdal.VSIFCloseL(f)
|
|
|
|
|
|
###############################################################################
|
|
# Test VSICopyFile()
|
|
|
|
|
|
def test_vsifile_copyfile():
|
|
|
|
# Most simple invocation
|
|
dstfilename = "/vsimem/test_vsifile_copyfile.tif"
|
|
assert gdal.CopyFile("data/byte.tif", dstfilename) == 0
|
|
assert gdal.VSIStatL(dstfilename).size == gdal.VSIStatL("data/byte.tif").size
|
|
|
|
# Test srcfilename passed to None
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
assert gdal.CopyFile(None, dstfilename, f) == 0
|
|
gdal.VSIFCloseL(f)
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size == 1000 * 1000
|
|
|
|
# Test progress callback
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
def progress(pct, msg, user_data):
|
|
user_data.append(pct)
|
|
return 1
|
|
|
|
tab = []
|
|
assert (
|
|
gdal.CopyFile(srcfilename, dstfilename, callback=progress, callback_data=tab)
|
|
== 0
|
|
)
|
|
assert tab[-1] == 1.0
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size == 1000 * 1000
|
|
|
|
# Test progress callback in error situation
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
def progress(pct, msg, user_data):
|
|
if pct > 0.5:
|
|
return 0
|
|
user_data.append(pct)
|
|
return 1
|
|
|
|
tab = []
|
|
assert (
|
|
gdal.CopyFile(srcfilename, dstfilename, callback=progress, callback_data=tab)
|
|
!= 0
|
|
)
|
|
assert tab[-1] != 1.0
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size != 1000 * 1000
|
|
|
|
gdal.Unlink(dstfilename)
|