965 строки
30 KiB
Python
Исполняемый файл
965 строки
30 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
# -*- coding: utf-8 -*-
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: Test /vsizip/vsimem/
|
|
# Author: Even Rouault <even dot rouault at spatialys.com>
|
|
#
|
|
###############################################################################
|
|
# Copyright (c) 2010-2014, Even Rouault <even dot rouault at spatialys.com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import random
|
|
|
|
import gdaltest
|
|
import pytest
|
|
|
|
from osgeo import gdal
|
|
|
|
###############################################################################
|
|
# Test writing a ZIP with multiple files and directories
|
|
|
|
|
|
@gdaltest.disable_exceptions()
|
|
def test_vsizip_1():
|
|
|
|
# We can keep the handle open during all the ZIP writing
|
|
hZIP = gdal.VSIFOpenL("/vsizip/vsimem/test.zip", "wb")
|
|
assert hZIP is not None, "fail 1"
|
|
|
|
# One way to create a directory
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/subdir2/", "wb")
|
|
assert f is not None, "fail 2"
|
|
gdal.VSIFCloseL(f)
|
|
|
|
# A more natural one
|
|
gdal.Mkdir("/vsizip/vsimem/test.zip/subdir1", 0)
|
|
|
|
# Create 1st file
|
|
f2 = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/subdir3/abcd", "wb")
|
|
assert f2 is not None, "fail 3"
|
|
gdal.VSIFWriteL("abcd", 1, 4, f2)
|
|
gdal.VSIFCloseL(f2)
|
|
|
|
# Test that we cannot read a zip file being written
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/subdir3/abcd", "rb")
|
|
assert (
|
|
gdal.GetLastErrorMsg() == "Cannot read a zip file being written"
|
|
), "expected error"
|
|
assert f is None, "should not have been successful 1"
|
|
|
|
# Create 2nd file
|
|
f3 = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/subdir3/efghi", "wb")
|
|
assert f3 is not None, "fail 4"
|
|
gdal.VSIFWriteL("efghi", 1, 5, f3)
|
|
|
|
# Try creating a 3d file
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f4 = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/that_wont_work", "wb")
|
|
assert (
|
|
gdal.GetLastErrorMsg()
|
|
== "Cannot create that_wont_work while another file is being written in the .zip"
|
|
), "expected error"
|
|
assert f4 is None, "should not have been successful 2"
|
|
|
|
gdal.VSIFCloseL(f3)
|
|
|
|
# Now we can close the main handle
|
|
gdal.VSIFCloseL(hZIP)
|
|
|
|
# ERROR 6: Support only 1 file in archive file /vsimem/test.zip when no explicit in-archive filename is specified
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test.zip", "rb")
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
assert gdal.GetLastErrorMsg() != "", "expected error"
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test.zip/subdir3/abcd", "rb")
|
|
assert f is not None, "fail 5"
|
|
data = gdal.VSIFReadL(1, 4, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data.decode("ASCII") == "abcd"
|
|
|
|
# Test alternate uri syntax
|
|
gdal.Rename("/vsimem/test.zip", "/vsimem/test.xxx")
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.xxx}/subdir3/abcd", "rb")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data.decode("ASCII") == "abcd"
|
|
|
|
# With a trailing slash
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.xxx}/subdir3/abcd/", "rb")
|
|
assert f is not None
|
|
gdal.VSIFCloseL(f)
|
|
|
|
# Test ReadDir()
|
|
assert len(gdal.ReadDir("/vsizip/{/vsimem/test.xxx}")) == 3
|
|
|
|
# Unbalanced curls
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.xxx", "rb")
|
|
assert f is None
|
|
|
|
# Non existing mainfile
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.xxx}/bla", "rb")
|
|
assert f is None
|
|
|
|
# Non existing subfile
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.zzz}/bla", "rb")
|
|
assert f is None
|
|
|
|
# Wrong syntax
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/test.xxx}.aux.xml", "rb")
|
|
assert f is None
|
|
|
|
# Test nested { { } }
|
|
hZIP = gdal.VSIFOpenL("/vsizip/{/vsimem/zipinzip.yyy}", "wb")
|
|
assert hZIP is not None, "fail 1"
|
|
f = gdal.VSIFOpenL("/vsizip/{/vsimem/zipinzip.yyy}/test.xxx", "wb")
|
|
f_src = gdal.VSIFOpenL("/vsimem/test.xxx", "rb")
|
|
data = gdal.VSIFReadL(1, 10000, f_src)
|
|
gdal.VSIFCloseL(f_src)
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(hZIP)
|
|
|
|
f = gdal.VSIFOpenL(
|
|
"/vsizip/{/vsizip/{/vsimem/zipinzip.yyy}/test.xxx}/subdir3/abcd/", "rb"
|
|
)
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data.decode("ASCII") == "abcd"
|
|
|
|
gdal.Unlink("/vsimem/test.xxx")
|
|
gdal.Unlink("/vsimem/zipinzip.yyy")
|
|
|
|
# Test VSIStatL on a non existing file
|
|
assert gdal.VSIStatL("/vsizip//vsimem/foo.zip") is None
|
|
|
|
# Test ReadDir on a non existing file
|
|
assert gdal.ReadDir("/vsizip//vsimem/foo.zip") is None
|
|
|
|
|
|
###############################################################################
|
|
# Test writing 2 files in the ZIP by closing it completely between the 2
|
|
|
|
|
|
@gdaltest.disable_exceptions()
|
|
def test_vsizip_2():
|
|
|
|
zip_name = "/vsimem/test2.zip"
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/" + zip_name + "/foo.bar", "wb")
|
|
assert fmain is not None, "fail 1"
|
|
gdal.VSIFWriteL("12345", 1, 5, fmain)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
content = gdal.ReadDir("/vsizip/" + zip_name)
|
|
assert content == ["foo.bar"], "bad content 1"
|
|
|
|
# Now append a second file
|
|
fmain = gdal.VSIFOpenL("/vsizip/" + zip_name + "/bar.baz", "wb")
|
|
assert fmain is not None, "fail 2"
|
|
gdal.VSIFWriteL("67890", 1, 5, fmain)
|
|
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
content = gdal.ReadDir("/vsizip/" + zip_name)
|
|
assert (
|
|
gdal.GetLastErrorMsg() == "Cannot read a zip file being written"
|
|
), "expected error"
|
|
assert content is None, "bad content 2"
|
|
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
content = gdal.ReadDir("/vsizip/" + zip_name)
|
|
assert content == ["foo.bar", "bar.baz"], "bad content 3"
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/" + zip_name + "/foo.bar", "rb")
|
|
assert fmain is not None, "fail 3"
|
|
data = gdal.VSIFReadL(1, 5, fmain)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
assert data.decode("ASCII") == "12345"
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/" + zip_name + "/bar.baz", "rb")
|
|
assert fmain is not None, "fail 4"
|
|
data = gdal.VSIFReadL(1, 5, fmain)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
assert data.decode("ASCII") == "67890"
|
|
|
|
gdal.Unlink(zip_name)
|
|
|
|
|
|
###############################################################################
|
|
# Test opening in write mode a file inside a zip archive whose content has been listed before (testcase for fix of r22625)
|
|
|
|
|
|
def test_vsizip_3():
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/test3.zip", "wb")
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test3.zip/foo", "wb")
|
|
gdal.VSIFWriteL("foo", 1, 3, f)
|
|
gdal.VSIFCloseL(f)
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test3.zip/bar", "wb")
|
|
gdal.VSIFWriteL("bar", 1, 3, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
gdal.ReadDir("/vsizip/vsimem/test3.zip")
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test3.zip/baz", "wb")
|
|
gdal.VSIFWriteL("baz", 1, 3, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
res = gdal.ReadDir("/vsizip/vsimem/test3.zip")
|
|
|
|
gdal.Unlink("/vsimem/test3.zip")
|
|
|
|
assert res == ["foo", "bar", "baz"]
|
|
|
|
|
|
###############################################################################
|
|
# Test ReadRecursive on valid zip
|
|
|
|
|
|
def test_vsizip_4():
|
|
|
|
# read recursive and validate content
|
|
res = gdal.ReadDirRecursive("/vsizip/data/testzip.zip")
|
|
assert res is not None, "fail read"
|
|
assert res == [
|
|
"subdir/",
|
|
"subdir/subdir/",
|
|
"subdir/subdir/uint16.tif",
|
|
"subdir/subdir/test_rpc.txt",
|
|
"subdir/test_rpc.txt",
|
|
"test_rpc.txt",
|
|
"uint16.tif",
|
|
], "bad content"
|
|
|
|
|
|
###############################################################################
|
|
# Test ReadRecursive on deep zip
|
|
|
|
|
|
def test_vsizip_5():
|
|
|
|
# make file in memory
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/bigdepthzip.zip", "wb")
|
|
assert fmain is not None
|
|
|
|
filename = "a" + "/a" * 1000
|
|
finside = gdal.VSIFOpenL("/vsizip/vsimem/bigdepthzip.zip/" + filename, "wb")
|
|
assert finside is not None
|
|
gdal.VSIFCloseL(finside)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
# read recursive and validate content
|
|
res = gdal.ReadDirRecursive("/vsizip/vsimem/bigdepthzip.zip")
|
|
assert res is not None, "fail read"
|
|
assert len(res) == 1001, "wrong size: " + str(len(res))
|
|
assert res[10] == "a/a/a/a/a/a/a/a/a/a/a/", "bad content: " + res[10]
|
|
|
|
gdal.Unlink("/vsimem/bigdepthzip.zip")
|
|
|
|
|
|
###############################################################################
|
|
# Test writing 2 files with same name in a ZIP (#4785)
|
|
|
|
|
|
@gdaltest.disable_exceptions()
|
|
def test_vsizip_6():
|
|
|
|
# Maintain ZIP file opened
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/test6.zip", "wb")
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test6.zip/foo.bar", "wb")
|
|
assert f is not None
|
|
gdal.VSIFWriteL("12345", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
f = None
|
|
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test6.zip/foo.bar", "wb")
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
pytest.fail()
|
|
gdal.VSIFCloseL(fmain)
|
|
fmain = None
|
|
|
|
gdal.Unlink("/vsimem/test6.zip")
|
|
|
|
# Now close it each time
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test6.zip/foo.bar", "wb")
|
|
assert f is not None
|
|
gdal.VSIFWriteL("12345", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
f = None
|
|
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/test6.zip/foo.bar", "wb")
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
pytest.fail()
|
|
|
|
gdal.Unlink("/vsimem/test6.zip")
|
|
|
|
|
|
###############################################################################
|
|
# Test that we use the extended field for UTF-8 filenames (#5361).
|
|
|
|
|
|
def test_vsizip_7():
|
|
|
|
content = gdal.ReadDir("/vsizip/data/cp866_plus_utf8.zip")
|
|
ok = 0
|
|
try:
|
|
local_vars = {"content": content, "ok": ok}
|
|
exec(
|
|
"if content == [u'\u0430\u0431\u0432\u0433\u0434\u0435', u'\u0436\u0437\u0438\u0439\u043a\u043b']: ok = 1",
|
|
None,
|
|
local_vars,
|
|
)
|
|
ok = local_vars["ok"]
|
|
except Exception:
|
|
if content == [
|
|
"\u0430\u0431\u0432\u0433\u0434\u0435",
|
|
"\u0436\u0437\u0438\u0439\u043a\u043b",
|
|
]:
|
|
ok = 1
|
|
|
|
if ok == 0:
|
|
print(content)
|
|
pytest.fail("bad content")
|
|
|
|
|
|
###############################################################################
|
|
# Basic test for ZIP64 support (5 GB file that compresses in less than 4 GB)
|
|
|
|
|
|
def test_vsizip_8():
|
|
|
|
assert (
|
|
gdal.VSIStatL("/vsizip/vsizip/data/zero.bin.zip.zip/zero.bin.zip").size
|
|
== 5000 * 1000 * 1000 + 1
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Basic test for ZIP64 support (5 GB file that is stored)
|
|
|
|
|
|
def test_vsizip_9():
|
|
|
|
assert (
|
|
gdal.VSIStatL("/vsizip//vsisparse/data/zero_stored.bin.xml.zip/zero.bin").size
|
|
== 5000 * 1000 * 1000 + 1
|
|
)
|
|
|
|
assert (
|
|
gdal.VSIStatL("/vsizip//vsisparse/data/zero_stored.bin.xml.zip/hello.txt").size
|
|
== 6
|
|
)
|
|
|
|
f = gdal.VSIFOpenL("/vsizip//vsisparse/data/zero_stored.bin.xml.zip/zero.bin", "rb")
|
|
gdal.VSIFSeekL(f, 5000 * 1000 * 1000, 0)
|
|
data = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert data.decode("ascii") == "\x03"
|
|
|
|
f = gdal.VSIFOpenL(
|
|
"/vsizip//vsisparse/data/zero_stored.bin.xml.zip/hello.txt", "rb"
|
|
)
|
|
data = gdal.VSIFReadL(1, 6, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert data.decode("ascii") == "HELLO\n"
|
|
|
|
|
|
###############################################################################
|
|
# Test that we recode filenames in ZIP (#5361)
|
|
|
|
|
|
def test_vsizip_10():
|
|
|
|
with gdal.config_option("CPL_ZIP_ENCODING", "CP866"):
|
|
content = gdal.ReadDir("/vsizip/data/cp866.zip")
|
|
ok = 0
|
|
try:
|
|
local_vars = {"content": content, "ok": ok}
|
|
exec(
|
|
"if content == [u'\u0430\u0431\u0432\u0433\u0434\u0435', u'\u0436\u0437\u0438\u0439\u043a\u043b']: ok = 1",
|
|
None,
|
|
local_vars,
|
|
)
|
|
ok = local_vars["ok"]
|
|
except Exception:
|
|
if content == [
|
|
"\u0430\u0431\u0432\u0433\u0434\u0435",
|
|
"\u0436\u0437\u0438\u0439\u043a\u043b",
|
|
]:
|
|
ok = 1
|
|
|
|
if ok == 0:
|
|
if gdal.GetLastErrorMsg().find("Recode from CP866 to UTF-8 not supported") >= 0:
|
|
pytest.skip()
|
|
|
|
print(content)
|
|
pytest.fail("bad content")
|
|
|
|
|
|
###############################################################################
|
|
# Test that we don't do anything with ZIP with filenames in UTF-8 already (#5361)
|
|
|
|
|
|
def test_vsizip_11():
|
|
|
|
content = gdal.ReadDir("/vsizip/data/utf8.zip")
|
|
ok = 0
|
|
try:
|
|
local_vars = {"content": content, "ok": ok}
|
|
exec(
|
|
"if content == [u'\u0430\u0431\u0432\u0433\u0434\u0435', u'\u0436\u0437\u0438\u0439\u043a\u043b']: ok = 1",
|
|
None,
|
|
local_vars,
|
|
)
|
|
ok = local_vars["ok"]
|
|
except Exception:
|
|
if content == [
|
|
"\u0430\u0431\u0432\u0433\u0434\u0435",
|
|
"\u0436\u0437\u0438\u0439\u043a\u043b",
|
|
]:
|
|
ok = 1
|
|
|
|
if ok == 0:
|
|
print(content)
|
|
pytest.fail("bad content")
|
|
|
|
|
|
###############################################################################
|
|
# Test changing the content of a zip file (#6005)
|
|
|
|
|
|
def test_vsizip_12():
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_12_src1.zip", "wb")
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_12_src1.zip/foo.bar", "wb")
|
|
data = "0123456"
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_12_src2.zip", "wb")
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_12_src2.zip/bar.baz", "wb")
|
|
data = "01234567"
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
# Copy vsizip_12_src1 into vsizip_12
|
|
f = gdal.VSIFOpenL("/vsimem/vsizip_12_src1.zip", "rb")
|
|
data = gdal.VSIFReadL(1, 10000, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/vsizip_12.zip", "wb")
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.ReadDir("/vsizip/vsimem/vsizip_12.zip")
|
|
|
|
# Copy vsizip_12_src2 into vsizip_12
|
|
f = gdal.VSIFOpenL("/vsimem/vsizip_12_src2.zip", "rb")
|
|
data = gdal.VSIFReadL(1, 10000, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
f = gdal.VSIFOpenL("/vsimem/vsizip_12.zip", "wb")
|
|
gdal.VSIFWriteL(data, 1, len(data), f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
content = gdal.ReadDir("/vsizip/vsimem/vsizip_12.zip")
|
|
|
|
gdal.Unlink("/vsimem/vsizip_12_src1.zip")
|
|
gdal.Unlink("/vsimem/vsizip_12_src2.zip")
|
|
gdal.Unlink("/vsimem/vsizip_12.zip")
|
|
|
|
assert content == ["bar.baz"]
|
|
|
|
|
|
###############################################################################
|
|
# Test ReadDir() truncation
|
|
|
|
|
|
def test_vsizip_13():
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_13.zip", "wb")
|
|
for i in range(10):
|
|
f = gdal.VSIFOpenL("/vsizip/vsimem/vsizip_13.zip/%d" % i, "wb")
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
lst = gdal.ReadDir("/vsizip/vsimem/vsizip_13.zip")
|
|
assert len(lst) >= 4
|
|
# Test truncation
|
|
lst_truncated = gdal.ReadDir("/vsizip/vsimem/vsizip_13.zip", int(len(lst) / 2))
|
|
assert len(lst_truncated) > int(len(lst) / 2)
|
|
|
|
gdal.Unlink("/vsimem/vsizip_13.zip")
|
|
|
|
|
|
###############################################################################
|
|
# Test that we can recode filenames in ZIP when writing (#6631)
|
|
|
|
|
|
def test_vsizip_14():
|
|
|
|
fmain = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_14.zip", "wb")
|
|
try:
|
|
x = [""]
|
|
exec("x[0] = u'\u0430\u0431\u0432\u0433\u0434\u0435'")
|
|
cp866_filename = x[0]
|
|
except Exception:
|
|
cp866_filename = "\u0430\u0431\u0432\u0433\u0434\u0435"
|
|
|
|
with gdaltest.error_handler():
|
|
f = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_14.zip/" + cp866_filename, "wb")
|
|
if f is None:
|
|
gdal.VSIFCloseL(fmain)
|
|
gdal.Unlink("/vsimem/vsizip_14.zip")
|
|
pytest.skip()
|
|
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
content = gdal.ReadDir("/vsizip//vsimem/vsizip_14.zip")
|
|
assert content == [cp866_filename], "bad content"
|
|
|
|
gdal.Unlink("/vsimem/vsizip_14.zip")
|
|
|
|
|
|
###############################################################################
|
|
# Test multithreaded compression
|
|
|
|
|
|
def test_vsizip_multi_thread():
|
|
|
|
with gdaltest.config_options(
|
|
{"GDAL_NUM_THREADS": "ALL_CPUS", "CPL_VSIL_DEFLATE_CHUNK_SIZE": "32K"}
|
|
):
|
|
fmain = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip", "wb")
|
|
f = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip/test", "wb")
|
|
for i in range(100000):
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
f = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip/test", "rb")
|
|
data = gdal.VSIFReadL(100000, 5, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/vsizip_multi_thread.zip")
|
|
|
|
if data != "hello" * 100000:
|
|
for i in range(10000):
|
|
if data[i * 5 : i * 5 + 5] != "hello":
|
|
print(i * 5, data[i * 5 : i * 5 + 5], data[i * 5 - 5 : i * 5 + 5 - 5])
|
|
break
|
|
|
|
pytest.fail()
|
|
|
|
|
|
###############################################################################
|
|
# Test multithreaded compression, with I/O error
|
|
|
|
|
|
@gdaltest.disable_exceptions()
|
|
def test_vsizip_multi_thread_error():
|
|
|
|
with gdaltest.error_handler():
|
|
with gdaltest.config_options(
|
|
{"GDAL_NUM_THREADS": "ALL_CPUS", "CPL_VSIL_DEFLATE_CHUNK_SIZE": "16K"}
|
|
):
|
|
fmain = gdal.VSIFOpenL(
|
|
"/vsizip/{/vsimem/vsizip_multi_thread.zip||maxlength=1000}", "wb"
|
|
)
|
|
f = gdal.VSIFOpenL(
|
|
"/vsizip/{/vsimem/vsizip_multi_thread.zip||maxlength=1000}/test", "wb"
|
|
)
|
|
for i in range(100000):
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
gdal.Unlink("/vsimem/vsizip_multi_thread.zip")
|
|
|
|
|
|
###############################################################################
|
|
# Test multithreaded compression, below the threshold where it triggers
|
|
|
|
|
|
def test_vsizip_multi_thread_below_threshold():
|
|
|
|
with gdaltest.config_options({"GDAL_NUM_THREADS": "ALL_CPUS"}):
|
|
fmain = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip", "wb")
|
|
f = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip/test", "wb")
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
f = gdal.VSIFOpenL("/vsizip//vsimem/vsizip_multi_thread.zip/test", "rb")
|
|
data = gdal.VSIFReadL(1, 5, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
gdal.Unlink("/vsimem/vsizip_multi_thread.zip")
|
|
|
|
assert data == "hello"
|
|
|
|
|
|
###############################################################################
|
|
# Test creating ZIP64 file: uncompressed larger than 4GB, but compressed
|
|
# data stream < 4 GB
|
|
|
|
|
|
@pytest.mark.slow()
|
|
def test_vsizip_create_zip64():
|
|
|
|
niters = 1000
|
|
s = "hello" * 1000 * 1000
|
|
zip_name = "/vsimem/vsizip_create_zip64.zip"
|
|
with gdaltest.config_options({"GDAL_NUM_THREADS": "ALL_CPUS"}):
|
|
fmain = gdal.VSIFOpenL("/vsizip/" + zip_name, "wb")
|
|
f = gdal.VSIFOpenL("/vsizip/" + zip_name + "/test", "wb")
|
|
for i in range(niters):
|
|
gdal.VSIFWriteL(s, 1, len(s), f)
|
|
gdal.VSIFCloseL(f)
|
|
gdal.VSIFCloseL(fmain)
|
|
|
|
size = gdal.VSIStatL(zip_name).size
|
|
assert size <= 0xFFFFFFFF
|
|
|
|
size = gdal.VSIStatL("/vsizip/" + zip_name + "/test").size
|
|
assert size == len(s) * niters
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/" + zip_name + "/test", "rb")
|
|
data = gdal.VSIFReadL(1, len(s), f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
assert data == s
|
|
|
|
gdal.Unlink(zip_name)
|
|
|
|
|
|
###############################################################################
|
|
# Test creating ZIP64 file: compressed data stream > 4 GB
|
|
|
|
|
|
@pytest.mark.slow()
|
|
def test_vsizip_create_zip64_stream_larger_than_4G():
|
|
|
|
zip_name = "tmp/vsizip_create_zip64_stream_larger_than_4G.zip"
|
|
|
|
gdal.Unlink(zip_name)
|
|
|
|
niters = 999
|
|
s = "".join([chr(random.randint(0, 127)) for i in range(5 * 1000 * 1000)])
|
|
with gdaltest.config_options({"GDAL_NUM_THREADS": "ALL_CPUS"}):
|
|
f = gdal.VSIFOpenL("/vsizip/" + zip_name + "/test2", "wb")
|
|
for i in range(niters):
|
|
gdal.VSIFWriteL(s, 1, len(s), f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
size = gdal.VSIStatL(zip_name).size
|
|
assert size > 0xFFFFFFFF
|
|
|
|
size = gdal.VSIStatL("/vsizip/" + zip_name + "/test2").size
|
|
assert size == len(s) * niters
|
|
|
|
f = gdal.VSIFOpenL("/vsizip/" + zip_name + "/test2", "rb")
|
|
data = gdal.VSIFReadL(1, len(s), f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
assert data == s
|
|
|
|
gdal.Unlink(zip_name)
|
|
|
|
|
|
###############################################################################
|
|
def test_vsizip_byte_zip64_local_header_zeroed():
|
|
|
|
size = gdal.VSIStatL(
|
|
"/vsizip/data/byte_zip64_local_header_zeroed.zip/byte.tif"
|
|
).size
|
|
assert size == 736
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def test_vsizip_deflate64():
|
|
|
|
filename = "/vsizip/data/deflate64.zip/100k_lines.txt"
|
|
size = gdal.VSIStatL(filename).size
|
|
assert size == 2188890
|
|
|
|
f = gdal.VSIFOpenL(filename, "rb")
|
|
assert f
|
|
try:
|
|
data = gdal.VSIFReadL(1, size, f)
|
|
assert len(data) == size
|
|
assert len(gdal.VSIFReadL(1, 1, f)) == 0
|
|
assert gdal.VSIFSeekL(f, 0, 0) == 0
|
|
data2 = gdal.VSIFReadL(1, size, f)
|
|
len_data2 = len(data2)
|
|
assert len_data2 == size
|
|
assert data2 == data
|
|
for (pos, nread) in [
|
|
(10000, 1000),
|
|
(1, 1),
|
|
(size - 1, 1),
|
|
(size // 2, size // 2 + 10),
|
|
]:
|
|
assert gdal.VSIFSeekL(f, pos, 0) == 0
|
|
data2 = gdal.VSIFReadL(1, nread, f)
|
|
len_data2 = len(data2)
|
|
assert len_data2 == min(nread, size - pos), (pos, nread)
|
|
assert data2 == data[pos : pos + len_data2], (pos, nread)
|
|
finally:
|
|
gdal.VSIFCloseL(f)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
@gdaltest.disable_exceptions()
|
|
def test_vsizip_byte_copyfile_regular():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_copyfile_regular.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
try:
|
|
assert gdal.CopyFile("data/byte.tif", dstfilename) == 0
|
|
assert gdal.VSIStatL(dstfilename).size == gdal.VSIStatL("data/byte.tif").size
|
|
|
|
md = gdal.GetFileMetadata(dstfilename, "ZIP")
|
|
assert md is not None
|
|
assert md.keys() == {
|
|
"START_DATA_OFFSET",
|
|
"COMPRESSION_METHOD",
|
|
"COMPRESSED_SIZE",
|
|
"UNCOMPRESSED_SIZE",
|
|
}
|
|
assert md["START_DATA_OFFSET"] == "38"
|
|
assert md["COMPRESSION_METHOD"] == "8 (DEFLATE)"
|
|
assert md["UNCOMPRESSED_SIZE"] == str(gdal.VSIStatL(dstfilename).size)
|
|
assert int(md["COMPRESSED_SIZE"]) < int(md["UNCOMPRESSED_SIZE"])
|
|
|
|
# The file already exists:
|
|
with gdaltest.error_handler():
|
|
assert gdal.CopyFile("data/byte.tif", dstfilename) == -1
|
|
finally:
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
def test_vsizip_byte_copyfile_srcfilename_is_none():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_copyfile_srcfilename_is_none.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
try:
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 5 * 1000 * 1000)
|
|
assert gdal.CopyFile(None, dstfilename, f) == 0
|
|
gdal.VSIFCloseL(f)
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size == 5 * 1000 * 1000
|
|
md = gdal.GetFileMetadata(dstfilename, "ZIP")
|
|
assert md is not None
|
|
assert md.keys() == {
|
|
"START_DATA_OFFSET",
|
|
"COMPRESSION_METHOD",
|
|
"COMPRESSED_SIZE",
|
|
"UNCOMPRESSED_SIZE",
|
|
"SOZIP_FOUND",
|
|
"SOZIP_VERSION",
|
|
"SOZIP_OFFSET_SIZE",
|
|
"SOZIP_CHUNK_SIZE",
|
|
"SOZIP_START_DATA_OFFSET",
|
|
"SOZIP_VALID",
|
|
}
|
|
assert md["START_DATA_OFFSET"] == "38"
|
|
assert md["COMPRESSION_METHOD"] == "8 (DEFLATE)"
|
|
assert md["UNCOMPRESSED_SIZE"] == str(gdal.VSIStatL(dstfilename).size)
|
|
assert int(md["COMPRESSED_SIZE"]) < int(md["UNCOMPRESSED_SIZE"])
|
|
assert md["SOZIP_FOUND"] == "YES"
|
|
assert md["SOZIP_VALID"] == "YES"
|
|
assert md["SOZIP_VERSION"] == "1"
|
|
assert md["SOZIP_OFFSET_SIZE"] == "8"
|
|
assert md["SOZIP_CHUNK_SIZE"] == "32768"
|
|
assert int(md["SOZIP_START_DATA_OFFSET"]) > int(md["START_DATA_OFFSET"])
|
|
|
|
finally:
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
def test_vsizip_byte_copyfile_progress_cbk():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_copyfile_progress_cbk.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
try:
|
|
# Test progress callback
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 1000 * 1000)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
def progress(pct, msg, user_data):
|
|
user_data.append(pct)
|
|
return 1
|
|
|
|
tab = []
|
|
assert (
|
|
gdal.CopyFile(
|
|
srcfilename, dstfilename, callback=progress, callback_data=tab
|
|
)
|
|
== 0
|
|
)
|
|
assert tab[-1] == 1.0
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size == 1000 * 1000
|
|
finally:
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
def test_vsizip_byte_copyfile_progress_cbk_error():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_copyfile_progress_cbk_error.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
try:
|
|
srcfilename = "/vsimem/test.bin"
|
|
f = gdal.VSIFOpenL(srcfilename, "wb+")
|
|
gdal.VSIFTruncateL(f, 10 * 1000 * 1000)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
def progress(pct, msg, user_data):
|
|
if pct > 0.5:
|
|
return 0
|
|
user_data.append(pct)
|
|
return 1
|
|
|
|
tab = []
|
|
assert (
|
|
gdal.CopyFile(
|
|
srcfilename, dstfilename, callback=progress, callback_data=tab
|
|
)
|
|
!= 0
|
|
)
|
|
assert len(tab) == 0 or tab[-1] != 1.0
|
|
gdal.Unlink(srcfilename)
|
|
assert gdal.VSIStatL(dstfilename).size != 10 * 1000 * 1000
|
|
finally:
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
def test_vsizip_byte_copyfile_file_already_open():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_copyfile_regular.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
dstfilename2 = f"/vsizip/{zipfilename}/test2.tif"
|
|
try:
|
|
fmain = gdal.VSIFOpenL(zipfilename, "wb")
|
|
assert gdal.CopyFile("data/byte.tif", dstfilename) == 0
|
|
assert gdal.VSIStatL(dstfilename).size == gdal.VSIStatL("data/byte.tif").size
|
|
|
|
assert gdal.CopyFile("data/uint16.tif", dstfilename2) == 0
|
|
assert gdal.VSIStatL(dstfilename2).size == gdal.VSIStatL("data/uint16.tif").size
|
|
|
|
finally:
|
|
gdal.VSIFCloseL(fmain)
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def test_vsizip_byte_sozip():
|
|
|
|
zipfilename = "/vsimem/test_vsizip_byte_sozip.zip"
|
|
dstfilename = f"/vsizip/{zipfilename}/test.tif"
|
|
try:
|
|
options = ["SOZIP_ENABLED=YES", "SOZIP_CHUNK_SIZE=128"]
|
|
assert gdal.CopyFile("data/byte.tif", dstfilename, options=options) == 0
|
|
assert gdal.VSIStatL(dstfilename).size == gdal.VSIStatL("data/byte.tif").size
|
|
|
|
md = gdal.GetFileMetadata(dstfilename, "ZIP")
|
|
assert md is not None
|
|
assert md["SOZIP_VALID"] == "YES"
|
|
assert md["SOZIP_CHUNK_SIZE"] == "128"
|
|
assert md["SOZIP_OFFSET_SIZE"] == "8"
|
|
|
|
ds = gdal.Open(dstfilename)
|
|
assert ds.GetRasterBand(1).Checksum() == 4672
|
|
|
|
finally:
|
|
gdal.Unlink(zipfilename)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def test_vsizip_sozip_of_file_bigger_than_4GB():
|
|
|
|
md = gdal.GetFileMetadata(
|
|
"/vsizip/{data/zero_5GB_sozip_of_sozip.zip}/zero_5GB.bin.zip", "ZIP"
|
|
)
|
|
assert md["SOZIP_VALID"] == "YES"
|
|
assert md["SOZIP_CHUNK_SIZE"] == "32768"
|
|
|
|
md = gdal.GetFileMetadata(
|
|
"/vsizip/{/vsizip/{data/zero_5GB_sozip_of_sozip.zip}/zero_5GB.bin.zip}/zero_5GB.bin",
|
|
"ZIP",
|
|
)
|
|
assert md["SOZIP_VALID"] == "YES"
|
|
assert md["SOZIP_CHUNK_SIZE"] == "10485760"
|
|
|
|
f = gdal.VSIFOpenL(
|
|
"/vsizip/{/vsizip/{data/zero_5GB_sozip_of_sozip.zip}/zero_5GB.bin.zip}/zero_5GB.bin",
|
|
"rb",
|
|
)
|
|
assert f is not None
|
|
try:
|
|
assert gdal.VSIFSeekL(f, 5 * 1024 * 1024 * 1024 - 1, 0) == 0
|
|
assert gdal.VSIFReadL(1, 2, f) == b"\x00"
|
|
finally:
|
|
gdal.VSIFCloseL(f)
|