1689 строки
62 KiB
Python
Исполняемый файл
1689 строки
62 KiB
Python
Исполняемый файл
#!/usr/bin/env pytest
|
|
###############################################################################
|
|
# $Id$
|
|
#
|
|
# Project: GDAL/OGR Test Suite
|
|
# Purpose: Test /vsigs
|
|
# Author: Even Rouault <even dot rouault at spatialys dot com>
|
|
#
|
|
###############################################################################
|
|
# Copyright (c) 2017 Even Rouault <even dot rouault at spatialys dot com>
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included
|
|
# in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
###############################################################################
|
|
|
|
import stat
|
|
import sys
|
|
|
|
import gdaltest
|
|
import pytest
|
|
import webserver
|
|
|
|
from osgeo import gdal
|
|
|
|
pytestmark = pytest.mark.require_curl()
|
|
|
|
###############################################################################
|
|
@pytest.fixture(autouse=True, scope="module")
|
|
def module_disable_exceptions():
|
|
with gdaltest.disable_exceptions():
|
|
yield
|
|
|
|
|
|
def open_for_read(uri):
|
|
"""
|
|
Opens a test file for reading.
|
|
"""
|
|
return gdal.VSIFOpenExL(uri, "rb", 1)
|
|
|
|
|
|
@pytest.fixture()
|
|
def gs_test_config():
|
|
# To avoid user credentials in ~/.boto
|
|
# to mess up our tests
|
|
options = {
|
|
"CPL_GS_CREDENTIALS_FILE": "",
|
|
"GS_OAUTH2_REFRESH_TOKEN": "",
|
|
"GS_OAUTH2_CLIENT_EMAIL": "",
|
|
"GS_OAUTH2_CLIENT_SECRET": "",
|
|
"GS_OAUTH2_CLIENT_ID": "",
|
|
"GOOGLE_APPLICATION_CREDENTIALS": "",
|
|
"GS_USER_PROJECT": "",
|
|
}
|
|
|
|
with gdaltest.config_options(options, thread_local=False):
|
|
yield
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def webserver_port():
|
|
|
|
webserver_process, webserver_port = webserver.launch(
|
|
handler=webserver.DispatcherHttpHandler
|
|
)
|
|
try:
|
|
if webserver_port == 0:
|
|
pytest.skip()
|
|
with gdaltest.config_options(
|
|
{"CPL_GS_ENDPOINT": "http://127.0.0.1:%d/" % webserver_port},
|
|
thread_local=False,
|
|
):
|
|
yield webserver_port
|
|
finally:
|
|
gdal.VSICurlClearCache()
|
|
|
|
webserver.server_stop(webserver_process, webserver_port)
|
|
|
|
|
|
###############################################################################
|
|
|
|
|
|
def test_vsigs_init(gs_test_config):
|
|
|
|
with gdaltest.config_options(
|
|
{"CPL_GCE_SKIP": "YES", "CPL_GS_ENDPOINT": ""}, thread_local=False
|
|
):
|
|
assert gdal.GetSignedURL("/vsigs/foo/bar") is None
|
|
|
|
|
|
###############################################################################
|
|
# Error cases
|
|
|
|
|
|
def test_vsigs_1(gs_test_config):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{"CPL_GCE_SKIP": "YES", "CPL_GS_ENDPOINT": ""}, thread_local=False
|
|
):
|
|
# Invalid header filename
|
|
gdal.ErrorReset()
|
|
with gdaltest.config_option(
|
|
"GDAL_HTTP_HEADER_FILE", "/i_dont/exist.py", thread_local=False
|
|
):
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs/foo/bar")
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
pytest.fail()
|
|
last_err = gdal.GetLastErrorMsg()
|
|
assert "Cannot read" in last_err
|
|
|
|
# Invalid content for header file
|
|
with gdaltest.config_option(
|
|
"GDAL_HTTP_HEADER_FILE", "vsigs.py", thread_local=False
|
|
):
|
|
f = open_for_read("/vsigs/foo/bar")
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
pytest.fail()
|
|
|
|
# Missing GS_SECRET_ACCESS_KEY
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs/foo/bar")
|
|
assert f is None and gdal.VSIGetLastErrorMsg().find("GS_SECRET_ACCESS_KEY") >= 0
|
|
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs_streaming/foo/bar")
|
|
assert f is None and gdal.VSIGetLastErrorMsg().find("GS_SECRET_ACCESS_KEY") >= 0
|
|
|
|
with gdaltest.config_option(
|
|
"GS_SECRET_ACCESS_KEY", "GS_SECRET_ACCESS_KEY", thread_local=False
|
|
):
|
|
|
|
# Missing GS_ACCESS_KEY_ID
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs/foo/bar")
|
|
assert f is None and gdal.VSIGetLastErrorMsg().find("GS_ACCESS_KEY_ID") >= 0
|
|
|
|
with gdaltest.config_option(
|
|
"GS_ACCESS_KEY_ID", "GS_ACCESS_KEY_ID", thread_local=False
|
|
):
|
|
|
|
# ERROR 1: The User Id you provided does not exist in our records.
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs/foo/bar.baz")
|
|
if f is not None or gdal.VSIGetLastErrorMsg() == "":
|
|
if f is not None:
|
|
gdal.VSIFCloseL(f)
|
|
if gdal.GetConfigOption("APPVEYOR") is not None:
|
|
return
|
|
pytest.fail(gdal.VSIGetLastErrorMsg())
|
|
|
|
gdal.ErrorReset()
|
|
with gdaltest.error_handler():
|
|
f = open_for_read("/vsigs_streaming/foo/bar.baz")
|
|
assert f is None and gdal.VSIGetLastErrorMsg() != ""
|
|
|
|
|
|
###############################################################################
|
|
# Test GS_NO_SIGN_REQUEST=YES
|
|
|
|
|
|
def test_vsigs_no_sign_request(gs_test_config):
|
|
|
|
with gdaltest.config_options({"CPL_GS_ENDPOINT": ""}, thread_local=False):
|
|
|
|
object_key = "gcp-public-data-landsat/LC08/01/044/034/LC08_L1GT_044034_20130330_20170310_01_T2/LC08_L1GT_044034_20130330_20170310_01_T2_B1.TIF"
|
|
expected_url = "https://storage.googleapis.com/" + object_key
|
|
|
|
with gdaltest.config_option("GS_NO_SIGN_REQUEST", "YES", thread_local=False):
|
|
actual_url = gdal.GetActualURL("/vsigs/" + object_key)
|
|
assert actual_url == expected_url
|
|
|
|
actual_url = gdal.GetActualURL("/vsigs_streaming/" + object_key)
|
|
assert actual_url == expected_url
|
|
|
|
f = open_for_read("/vsigs/" + object_key)
|
|
|
|
if f is None:
|
|
if gdaltest.gdalurlopen(expected_url) is None:
|
|
pytest.skip("cannot open URL")
|
|
pytest.fail()
|
|
gdal.VSIFCloseL(f)
|
|
|
|
|
|
###############################################################################
|
|
# Test with a fake Google Cloud Storage server
|
|
|
|
|
|
@pytest.mark.parametrize("use_config_options", [True, False])
|
|
def test_vsigs_2(gs_test_config, webserver_port, use_config_options):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
# header file
|
|
gdal.FileFromMemBuffer("/vsimem/my_headers.txt", "foo: bar")
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket_http_header_file/resource",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"Y",
|
|
expected_headers={"foo": "bar"},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
"GDAL_HTTP_HEADER_FILE": "/vsimem/my_headers.txt",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
f = open_for_read("/vsigs/gs_fake_bucket_http_header_file/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert len(data) == 1
|
|
gdal.Unlink("/vsimem/my_headers.txt")
|
|
|
|
options = {
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
"CPL_GS_TIMESTAMP": "my_timestamp",
|
|
}
|
|
with gdaltest.config_options(
|
|
options, thread_local=False
|
|
) if use_config_options else gdaltest.credentials("/vsigs/", options):
|
|
|
|
signed_url = gdal.GetSignedURL(
|
|
"/vsigs/gs_fake_bucket/resource", ["START_DATE=20180212T123456Z"]
|
|
)
|
|
assert signed_url in (
|
|
"http://127.0.0.1:8080/gs_fake_bucket/resource?Expires=1518442496&GoogleAccessId=GS_ACCESS_KEY_ID&Signature=xTphUyMqtKA6UmAX3PEr5VL3EOg%3D",
|
|
"http://127.0.0.1:8081/gs_fake_bucket/resource?Expires=1518442496&GoogleAccessId=GS_ACCESS_KEY_ID&Signature=xTphUyMqtKA6UmAX3PEr5VL3EOg%3D",
|
|
)
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket/resource",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"foo",
|
|
expected_headers={
|
|
"Authorization": "GOOG1 GS_ACCESS_KEY_ID:8tndu9//BfmN+Kg4AFLdUMZMBDQ="
|
|
},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket/resource",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"foo",
|
|
expected_headers={
|
|
"Authorization": "GOOG1 GS_ACCESS_KEY_ID:8tndu9//BfmN+Kg4AFLdUMZMBDQ="
|
|
},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs_streaming/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket/resource2.bin",
|
|
206,
|
|
{"Content-Range": "bytes 0-0/1000000"},
|
|
"x",
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
stat_res = gdal.VSIStatL("/vsigs/gs_fake_bucket/resource2.bin")
|
|
if stat_res is None or stat_res.size != 1000000:
|
|
if stat_res is not None:
|
|
print(stat_res.size)
|
|
else:
|
|
print(stat_res)
|
|
pytest.fail()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"HEAD", "/gs_fake_bucket/resource2.bin", 200, {"Content-Length": 1000000}
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
stat_res = gdal.VSIStatL("/vsigs_streaming/gs_fake_bucket/resource2.bin")
|
|
if stat_res is None or stat_res.size != 1000000:
|
|
if stat_res is not None:
|
|
print(stat_res.size)
|
|
else:
|
|
print(stat_res)
|
|
pytest.fail()
|
|
|
|
# Test GS_USER_PROJECT
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket/resource_under_requester_pays",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"foo",
|
|
expected_headers={
|
|
"Authorization": "GOOG1 GS_ACCESS_KEY_ID:q7i3g4lJD1c4OwiFtn/N/ePxxS0=",
|
|
"x-goog-user-project": "my_project_id",
|
|
},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
with gdaltest.config_option(
|
|
"GS_USER_PROJECT", "my_project_id", thread_local=False
|
|
):
|
|
f = open_for_read(
|
|
"/vsigs_streaming/gs_fake_bucket/resource_under_requester_pays"
|
|
)
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
|
|
###############################################################################
|
|
# Test GDAL_HTTP_HEADERS
|
|
|
|
|
|
def test_vsigs_GDAL_HTTP_HEADERS(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket_http_header_file/resource",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"Y",
|
|
expected_headers={"Authorization": "Bearer MY_BEARER"},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GDAL_HTTP_HEADERS": "Authorization: Bearer MY_BEARER",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
f = open_for_read("/vsigs/gs_fake_bucket_http_header_file/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert len(data) == 1
|
|
|
|
|
|
###############################################################################
|
|
# Test ReadDir() with a fake Google Cloud Storage server
|
|
|
|
|
|
def test_vsigs_readdir(gs_test_config, webserver_port):
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket2/?delimiter=%2F&prefix=a_dir%2F",
|
|
200,
|
|
{"Content-type": "application/xml"},
|
|
"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<ListBucketResult>
|
|
<Prefix>a_dir/</Prefix>
|
|
<NextMarker>bla</NextMarker>
|
|
<Contents>
|
|
<Key>a_dir/resource3.bin</Key>
|
|
<LastModified>1970-01-01T00:00:01.000Z</LastModified>
|
|
<Size>123456</Size>
|
|
</Contents>
|
|
</ListBucketResult>
|
|
""",
|
|
)
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket2/?delimiter=%2F&marker=bla&prefix=a_dir%2F",
|
|
200,
|
|
{"Content-type": "application/xml"},
|
|
"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<ListBucketResult>
|
|
<Prefix>a_dir/</Prefix>
|
|
<Contents>
|
|
<Key>a_dir/resource4.bin</Key>
|
|
<LastModified>2015-10-16T12:34:56.000Z</LastModified>
|
|
<Size>456789</Size>
|
|
</Contents>
|
|
<CommonPrefixes>
|
|
<Prefix>a_dir/subdir/</Prefix>
|
|
</CommonPrefixes>
|
|
</ListBucketResult>
|
|
""",
|
|
)
|
|
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket2/a_dir/resource3.bin")
|
|
if f is None:
|
|
|
|
if gdaltest.is_travis_branch("trusty"):
|
|
pytest.skip("Skipped on trusty branch, but should be investigated")
|
|
|
|
pytest.fail()
|
|
gdal.VSIFCloseL(f)
|
|
|
|
dir_contents = gdal.ReadDir("/vsigs/gs_fake_bucket2/a_dir")
|
|
assert dir_contents == ["resource3.bin", "resource4.bin", "subdir"]
|
|
assert (
|
|
gdal.VSIStatL("/vsigs/gs_fake_bucket2/a_dir/resource3.bin").size == 123456
|
|
)
|
|
assert gdal.VSIStatL("/vsigs/gs_fake_bucket2/a_dir/resource3.bin").mtime == 1
|
|
|
|
# ReadDir on something known to be a file shouldn't cause network access
|
|
dir_contents = gdal.ReadDir("/vsigs/gs_fake_bucket2/a_dir/resource3.bin")
|
|
assert dir_contents is None
|
|
|
|
# List buckets
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/",
|
|
200,
|
|
{"Content-type": "application/xml"},
|
|
"""<?xml version="1.0" encoding="UTF-8"?>
|
|
<ListAllMyBucketsResult>
|
|
<Buckets>
|
|
<Bucket>
|
|
<Name>mybucket</Name>
|
|
</Bucket>
|
|
</Buckets>
|
|
</ListAllMyBucketsResult>
|
|
""",
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
dir_contents = gdal.ReadDir("/vsigs/")
|
|
assert dir_contents == ["mybucket"]
|
|
|
|
|
|
###############################################################################
|
|
# Test write
|
|
|
|
|
|
def test_vsigs_write(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
with webserver.install_http_handler(webserver.SequentialHandler()):
|
|
f = gdal.VSIFOpenExL(
|
|
"/vsigs/gs_fake_bucket3/another_file.bin",
|
|
"wb",
|
|
0,
|
|
[
|
|
"Content-Type=foo",
|
|
"Content-Encoding=bar",
|
|
"x-goog-storage-class=NEARLINE",
|
|
],
|
|
)
|
|
assert f is not None
|
|
assert gdal.VSIFSeekL(f, gdal.VSIFTellL(f), 0) == 0
|
|
assert gdal.VSIFSeekL(f, 0, 1) == 0
|
|
assert gdal.VSIFSeekL(f, 0, 2) == 0
|
|
assert gdal.VSIFWriteL("foo", 1, 3, f) == 3
|
|
assert gdal.VSIFSeekL(f, gdal.VSIFTellL(f), 0) == 0
|
|
assert gdal.VSIFWriteL("bar", 1, 3, f) == 3
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
if (
|
|
request.headers["Content-Length"] != "6"
|
|
or request.headers["Content-Type"] != "foo"
|
|
or request.headers["Content-Encoding"] != "bar"
|
|
or request.headers["x-goog-storage-class"] != "NEARLINE"
|
|
):
|
|
sys.stderr.write(
|
|
"Did not get expected headers: %s\n" % str(request.headers)
|
|
)
|
|
request.send_response(400)
|
|
request.send_header("Content-Length", 0)
|
|
request.end_headers()
|
|
return
|
|
|
|
request.wfile.write("HTTP/1.1 100 Continue\r\n\r\n".encode("ascii"))
|
|
|
|
content = request.rfile.read(6).decode("ascii")
|
|
if content != "foobar":
|
|
sys.stderr.write("Did not get expected content: %s\n" % content)
|
|
request.send_response(400)
|
|
request.send_header("Content-Length", 0)
|
|
request.end_headers()
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-Length", 0)
|
|
request.end_headers()
|
|
|
|
handler.add("PUT", "/gs_fake_bucket3/another_file.bin", custom_method=method)
|
|
|
|
gdal.ErrorReset()
|
|
with webserver.install_http_handler(handler):
|
|
gdal.VSIFCloseL(f)
|
|
assert gdal.GetLastErrorMsg() == ""
|
|
|
|
|
|
###############################################################################
|
|
# Test rename
|
|
|
|
|
|
def test_vsigs_fake_rename(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/test/source.txt",
|
|
206,
|
|
{"Content-Length": "3", "Content-Range": "bytes 0-2/3"},
|
|
"foo",
|
|
)
|
|
handler.add("GET", "/test/target.txt", 404)
|
|
handler.add(
|
|
"GET", "/test/?delimiter=%2F&max-keys=100&prefix=target.txt%2F", 200
|
|
)
|
|
|
|
def method(request):
|
|
if request.headers["Content-Length"] != "0":
|
|
sys.stderr.write(
|
|
"Did not get expected headers: %s\n" % str(request.headers)
|
|
)
|
|
request.send_response(400)
|
|
return
|
|
if request.headers["x-goog-copy-source"] != "/test/source.txt":
|
|
sys.stderr.write(
|
|
"Did not get expected headers: %s\n" % str(request.headers)
|
|
)
|
|
request.send_response(400)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-Length", 0)
|
|
request.end_headers()
|
|
|
|
handler.add("PUT", "/test/target.txt", custom_method=method)
|
|
handler.add("DELETE", "/test/source.txt", 204)
|
|
|
|
with webserver.install_http_handler(handler):
|
|
assert gdal.Rename("/vsigs/test/source.txt", "/vsigs/test/target.txt") == 0
|
|
|
|
|
|
###############################################################################
|
|
# Test reading/writing ACL
|
|
|
|
|
|
def test_vsigs_acl(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("GET", "/test_metadata/foo.txt?acl", 200, {}, "<foo/>")
|
|
with webserver.install_http_handler(handler):
|
|
md = gdal.GetFileMetadata("/vsigs/test_metadata/foo.txt", "ACL")
|
|
assert "XML" in md and md["XML"] == "<foo/>"
|
|
|
|
# Error cases
|
|
with gdaltest.error_handler():
|
|
assert (
|
|
gdal.GetFileMetadata("/vsigs/test_metadata/foo.txt", "UNSUPPORTED")
|
|
== {}
|
|
)
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("GET", "/test_metadata/foo.txt?acl", 400)
|
|
with webserver.install_http_handler(handler):
|
|
with gdaltest.error_handler():
|
|
assert not gdal.GetFileMetadata("/vsigs/test_metadata/foo.txt", "ACL")
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("PUT", "/test_metadata/foo.txt?acl", 200, expected_body=b"<foo/>")
|
|
with webserver.install_http_handler(handler):
|
|
assert gdal.SetFileMetadata(
|
|
"/vsigs/test_metadata/foo.txt", {"XML": "<foo/>"}, "ACL"
|
|
)
|
|
|
|
# Error cases
|
|
with gdaltest.error_handler():
|
|
assert not gdal.SetFileMetadata(
|
|
"/vsigs/test_metadata/foo.txt", {}, "UNSUPPORTED"
|
|
)
|
|
assert not gdal.SetFileMetadata("/vsigs/test_metadata/foo.txt", {}, "ACL")
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("PUT", "/test_metadata/foo.txt?acl", 400)
|
|
with webserver.install_http_handler(handler):
|
|
with gdaltest.error_handler():
|
|
assert not gdal.SetFileMetadata(
|
|
"/vsigs/test_metadata/foo.txt", {"XML": "<foo/>"}, "ACL"
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Test reading/writing HEADERS
|
|
|
|
|
|
def test_vsigs_headers(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GS_SECRET_ACCESS_KEY": "GS_SECRET_ACCESS_KEY",
|
|
"GS_ACCESS_KEY_ID": "GS_ACCESS_KEY_ID",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("GET", "/test_metadata/foo.txt", 200, {"x-goog-meta-foo": "bar"})
|
|
with webserver.install_http_handler(handler):
|
|
md = gdal.GetFileMetadata("/vsigs/test_metadata/foo.txt", "HEADERS")
|
|
assert "x-goog-meta-foo" in md and md["x-goog-meta-foo"] == "bar"
|
|
|
|
# Write HEADERS domain
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"PUT",
|
|
"/test_metadata/foo.txt",
|
|
200,
|
|
{},
|
|
expected_headers={
|
|
"x-goog-meta-foo": "bar",
|
|
"x-goog-metadata-directive": "REPLACE",
|
|
"x-goog-copy-source": "/test_metadata/foo.txt",
|
|
},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
assert gdal.SetFileMetadata(
|
|
"/vsigs/test_metadata/foo.txt", {"x-goog-meta-foo": "bar"}, "HEADERS"
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials with OAuth2 refresh_token
|
|
|
|
|
|
def test_vsigs_read_credentials_refresh_token_default_gdal_app(
|
|
gs_test_config, webserver_port
|
|
):
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GOA2_AUTH_URL_TOKEN": "http://localhost:%d/accounts.google.com/o/oauth2/token"
|
|
% webserver_port,
|
|
"GS_OAUTH2_REFRESH_TOKEN": "REFRESH_TOKEN",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
with gdaltest.error_handler():
|
|
assert gdal.GetSignedURL("/vsigs/foo/bar") is None
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(int(request.headers["Content-Length"])).decode(
|
|
"ascii"
|
|
)
|
|
if (
|
|
content
|
|
!= "refresh_token=REFRESH_TOKEN&client_id=265656308688.apps.googleusercontent.com&client_secret=0IbTUDOYzaL6vnIdWTuQnvLz&grant_type=refresh_token"
|
|
):
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/accounts.google.com/o/oauth2/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
# Test GS_USER_PROJECT
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/gs_fake_bucket/resource_under_requester_pays",
|
|
200,
|
|
{"Content-type": "text/plain"},
|
|
"foo",
|
|
expected_headers={"x-goog-user-project": "my_project_id"},
|
|
)
|
|
with webserver.install_http_handler(handler):
|
|
with gdaltest.config_option(
|
|
"GS_USER_PROJECT", "my_project_id", thread_local=False
|
|
):
|
|
f = open_for_read(
|
|
"/vsigs_streaming/gs_fake_bucket/resource_under_requester_pays"
|
|
)
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials with OAuth2 refresh_token
|
|
|
|
|
|
def test_vsigs_read_credentials_refresh_token_custom_app(
|
|
gs_test_config, webserver_port
|
|
):
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GOA2_AUTH_URL_TOKEN": "http://localhost:%d/accounts.google.com/o/oauth2/token"
|
|
% webserver_port,
|
|
"GS_OAUTH2_REFRESH_TOKEN": "REFRESH_TOKEN",
|
|
"GS_OAUTH2_CLIENT_ID": "CLIENT_ID",
|
|
"GS_OAUTH2_CLIENT_SECRET": "CLIENT_SECRET",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(int(request.headers["Content-Length"])).decode(
|
|
"ascii"
|
|
)
|
|
if (
|
|
content
|
|
!= "refresh_token=REFRESH_TOKEN&client_id=CLIENT_ID&client_secret=CLIENT_SECRET&grant_type=refresh_token"
|
|
):
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/accounts.google.com/o/oauth2/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials with OAuth2 service account
|
|
|
|
|
|
def test_vsigs_read_credentials_oauth2_service_account(gs_test_config, webserver_port):
|
|
|
|
# Generated with 'openssl genrsa -out rsa-openssl.pem 1024' and
|
|
# 'openssl pkcs8 -nocrypt -in rsa-openssl.pem -inform PEM -topk8 -outform PEM -out rsa-openssl.pkcs8.pem'
|
|
# DO NOT USE in production !!!!
|
|
key = """-----BEGIN PRIVATE KEY-----
|
|
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAOlwJQLLDG1HeLrk
|
|
VNcFR5Qptto/rJE5emRuy0YmkVINT4uHb1be7OOo44C2Ev8QPVtNHHS2XwCY5gTm
|
|
i2RfIBLv+VDMoVQPqqE0LHb0WeqGmM5V1tHbmVnIkCcKMn3HpK30grccuBc472LQ
|
|
DVkkGqIiGu0qLAQ89JP/r0LWWySRAgMBAAECgYAWjsS00WRBByAOh1P/dz4kfidy
|
|
TabiXbiLDf3MqJtwX2Lpa8wBjAc+NKrPXEjXpv0W3ou6Z4kkqKHJpXGg4GRb4N5I
|
|
2FA+7T1lA0FCXa7dT2jvgJLgpBepJu5b//tqFqORb4A4gMZw0CiPN3sUsWsSw5Hd
|
|
DrRXwp6sarzG77kvZQJBAPgysAmmXIIp9j1hrFSkctk4GPkOzZ3bxKt2Nl4GFrb+
|
|
bpKSon6OIhP1edrxTz1SMD1k5FiAAVUrMDKSarbh5osCQQDwxq4Tvf/HiYz79JBg
|
|
Wz5D51ySkbg01dOVgFW3eaYAdB6ta/o4vpHhnbrfl6VO9oUb3QR4hcrruwnDHsw3
|
|
4mDTAkEA9FPZjbZSTOSH/cbgAXbdhE4/7zWOXj7Q7UVyob52r+/p46osAk9i5qj5
|
|
Kvnv2lrFGDrwutpP9YqNaMtP9/aLnwJBALLWf9n+GAv3qRZD0zEe1KLPKD1dqvrj
|
|
j+LNjd1Xp+tSVK7vMs4PDoAMDg+hrZF3HetSQM3cYpqxNFEPgRRJOy0CQQDQlZHI
|
|
yzpSgEiyx8O3EK1iTidvnLXbtWabvjZFfIE/0OhfBmN225MtKG3YLV2HoUvpajLq
|
|
gwE6fxOLyJDxuWRf
|
|
-----END PRIVATE KEY-----
|
|
"""
|
|
gdal.FileFromMemBuffer("/vsimem/pkey", key)
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GO2A_AUD": "http://localhost:%d/oauth2/v4/token" % webserver_port,
|
|
"GOA2_NOW": "123456",
|
|
"GS_OAUTH2_CLIENT_EMAIL": "CLIENT_EMAIL",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
for i in range(2):
|
|
|
|
with gdaltest.config_options(
|
|
{"GS_OAUTH2_PRIVATE_KEY": key}
|
|
if i == 0
|
|
else {"GS_OAUTH2_PRIVATE_KEY_FILE": "/vsimem/pkey"},
|
|
thread_local=False,
|
|
):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(
|
|
int(request.headers["Content-Length"])
|
|
).decode("ascii")
|
|
content_8080 = "grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiAiQ0xJRU5UX0VNQUlMIiwgInNjb3BlIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL2F1dGgvZGV2c3RvcmFnZS5yZWFkX3dyaXRlIiwgImF1ZCI6ICJodHRwOi8vbG9jYWxob3N0OjgwODAvb2F1dGgyL3Y0L3Rva2VuIiwgImlhdCI6IDEyMzQ1NiwgImV4cCI6IDEyNzA1Nn0%3D.DAhqWtBgKpObxZ%2BGiXqwF%2Fa4SS%2FNWQRhLCI7DYZCuOTuf2w7dL8j4CdpiwwzQg1diIus7dyViRfzpsFmuZKAXwL%2B84iBoVVqnJJZ4TgwH49NdfMAnc4Rgm%2Bo2a2nEcMjX%2FbQ3jRY%2B9WNVl96hzULGvLrVeyego2f06wivqmvxHA%3D"
|
|
content_8081 = "grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiAiQ0xJRU5UX0VNQUlMIiwgInNjb3BlIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL2F1dGgvZGV2c3RvcmFnZS5yZWFkX3dyaXRlIiwgImF1ZCI6ICJodHRwOi8vbG9jYWxob3N0OjgwODEvb2F1dGgyL3Y0L3Rva2VuIiwgImlhdCI6IDEyMzQ1NiwgImV4cCI6IDEyNzA1Nn0%3D.0abOEg4%2FRApWTSeAs6YTHaNzdwOgZLm8DTMO2MKlOA%2Fiagyb4cBJxDpkD5gECPvi7qhkg7LsyFuj0a%2BK48Bsuj%2FgLHOU4MpB0dHwYnDO2UXzH%2FUPdgFCVak1P1V%2ByiDA%2B%2Ft4aDI5fD9qefKQiu3wsMDHzP71MNLzayrjqaqKKS4%3D"
|
|
if content not in [content_8080, content_8081]:
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/oauth2/v4/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n"
|
|
% str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
try:
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
except Exception:
|
|
if (
|
|
gdal.GetLastErrorMsg().find(
|
|
"CPLRSASHA256Sign() not implemented"
|
|
)
|
|
>= 0
|
|
):
|
|
pytest.skip()
|
|
|
|
assert data == "foo"
|
|
|
|
gdal.Unlink("/vsimem/pkey")
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials with OAuth2 service account through a json configuration file
|
|
|
|
|
|
def test_vsigs_read_credentials_oauth2_service_account_json_file(
|
|
gs_test_config, webserver_port
|
|
):
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/service_account.json",
|
|
"""{
|
|
"private_key": "-----BEGIN PRIVATE KEY-----\nMIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAOlwJQLLDG1HeLrk\nVNcFR5Qptto/rJE5emRuy0YmkVINT4uHb1be7OOo44C2Ev8QPVtNHHS2XwCY5gTm\ni2RfIBLv+VDMoVQPqqE0LHb0WeqGmM5V1tHbmVnIkCcKMn3HpK30grccuBc472LQ\nDVkkGqIiGu0qLAQ89JP/r0LWWySRAgMBAAECgYAWjsS00WRBByAOh1P/dz4kfidy\nTabiXbiLDf3MqJtwX2Lpa8wBjAc+NKrPXEjXpv0W3ou6Z4kkqKHJpXGg4GRb4N5I\n2FA+7T1lA0FCXa7dT2jvgJLgpBepJu5b//tqFqORb4A4gMZw0CiPN3sUsWsSw5Hd\nDrRXwp6sarzG77kvZQJBAPgysAmmXIIp9j1hrFSkctk4GPkOzZ3bxKt2Nl4GFrb+\nbpKSon6OIhP1edrxTz1SMD1k5FiAAVUrMDKSarbh5osCQQDwxq4Tvf/HiYz79JBg\nWz5D51ySkbg01dOVgFW3eaYAdB6ta/o4vpHhnbrfl6VO9oUb3QR4hcrruwnDHsw3\n4mDTAkEA9FPZjbZSTOSH/cbgAXbdhE4/7zWOXj7Q7UVyob52r+/p46osAk9i5qj5\nKvnv2lrFGDrwutpP9YqNaMtP9/aLnwJBALLWf9n+GAv3qRZD0zEe1KLPKD1dqvrj\nj+LNjd1Xp+tSVK7vMs4PDoAMDg+hrZF3HetSQM3cYpqxNFEPgRRJOy0CQQDQlZHI\nyzpSgEiyx8O3EK1iTidvnLXbtWabvjZFfIE/0OhfBmN225MtKG3YLV2HoUvpajLq\ngwE6fxOLyJDxuWRf\n-----END PRIVATE KEY-----\n",
|
|
"client_email": "CLIENT_EMAIL",
|
|
"type": "service_account"
|
|
}""",
|
|
)
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GOOGLE_APPLICATION_CREDENTIALS": "/vsimem/service_account.json",
|
|
"GO2A_AUD": "http://localhost:%d/oauth2/v4/token" % webserver_port,
|
|
"GOA2_NOW": "123456",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(int(request.headers["Content-Length"])).decode(
|
|
"ascii"
|
|
)
|
|
content_8080 = "grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiAiQ0xJRU5UX0VNQUlMIiwgInNjb3BlIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL2F1dGgvZGV2c3RvcmFnZS5yZWFkX3dyaXRlIiwgImF1ZCI6ICJodHRwOi8vbG9jYWxob3N0OjgwODAvb2F1dGgyL3Y0L3Rva2VuIiwgImlhdCI6IDEyMzQ1NiwgImV4cCI6IDEyNzA1Nn0%3D.DAhqWtBgKpObxZ%2BGiXqwF%2Fa4SS%2FNWQRhLCI7DYZCuOTuf2w7dL8j4CdpiwwzQg1diIus7dyViRfzpsFmuZKAXwL%2B84iBoVVqnJJZ4TgwH49NdfMAnc4Rgm%2Bo2a2nEcMjX%2FbQ3jRY%2B9WNVl96hzULGvLrVeyego2f06wivqmvxHA%3D"
|
|
content_8081 = "grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiAiQ0xJRU5UX0VNQUlMIiwgInNjb3BlIjogImh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL2F1dGgvZGV2c3RvcmFnZS5yZWFkX3dyaXRlIiwgImF1ZCI6ICJodHRwOi8vbG9jYWxob3N0OjgwODEvb2F1dGgyL3Y0L3Rva2VuIiwgImlhdCI6IDEyMzQ1NiwgImV4cCI6IDEyNzA1Nn0%3D.0abOEg4%2FRApWTSeAs6YTHaNzdwOgZLm8DTMO2MKlOA%2Fiagyb4cBJxDpkD5gECPvi7qhkg7LsyFuj0a%2BK48Bsuj%2FgLHOU4MpB0dHwYnDO2UXzH%2FUPdgFCVak1P1V%2ByiDA%2B%2Ft4aDI5fD9qefKQiu3wsMDHzP71MNLzayrjqaqKKS4%3D"
|
|
if content not in [content_8080, content_8081]:
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/oauth2/v4/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
try:
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
if f is None:
|
|
gdal.Unlink("/vsimem/service_account.json")
|
|
pytest.fail()
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
signed_url = gdal.GetSignedURL(
|
|
"/vsigs/gs_fake_bucket/resource", ["START_DATE=20180212T123456Z"]
|
|
)
|
|
if signed_url not in (
|
|
"http://127.0.0.1:8080/gs_fake_bucket/resource?Expires=1518442496&GoogleAccessId=CLIENT_EMAIL&Signature=b19I62KdqV51DpWGxhxGXLGJIA8MHvSJofwOygoeQuIxkM6PmmQFvJYTNWRt9zUVTUoVC0UHVB7ee5Z35NqDC8K4i0quu1hb8Js2B4h0W6OAupvyF3nSQ5D0OJmiSbomGMq0Ehyro5cqJ%2FU%2Fd8oAaKrGKVQScKfXoFrSJBbWkNs%3D",
|
|
"http://127.0.0.1:8081/gs_fake_bucket/resource?Expires=1518442496&GoogleAccessId=CLIENT_EMAIL&Signature=b19I62KdqV51DpWGxhxGXLGJIA8MHvSJofwOygoeQuIxkM6PmmQFvJYTNWRt9zUVTUoVC0UHVB7ee5Z35NqDC8K4i0quu1hb8Js2B4h0W6OAupvyF3nSQ5D0OJmiSbomGMq0Ehyro5cqJ%2FU%2Fd8oAaKrGKVQScKfXoFrSJBbWkNs%3D",
|
|
):
|
|
gdal.Unlink("/vsimem/service_account.json")
|
|
pytest.fail(signed_url)
|
|
|
|
except Exception:
|
|
if gdal.GetLastErrorMsg().find("CPLRSASHA256Sign() not implemented") >= 0:
|
|
pytest.skip()
|
|
|
|
gdal.Unlink("/vsimem/service_account.json")
|
|
|
|
assert data == "foo"
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials with OAuth2 authorized user through a json configuration file
|
|
|
|
|
|
def test_vsigs_read_credentials_oauth2_authorized_user_json_file(
|
|
gs_test_config, webserver_port
|
|
):
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/authorized_user.json",
|
|
"""{
|
|
"client_id": "CLIENT_ID",
|
|
"client_secret": "CLIENT_SECRET",
|
|
"refresh_token": "REFRESH_TOKEN",
|
|
"type": "authorized_user"
|
|
}""",
|
|
)
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"GOOGLE_APPLICATION_CREDENTIALS": "/vsimem/authorized_user.json",
|
|
"GOA2_AUTH_URL_TOKEN": "http://localhost:%d/accounts.google.com/o/oauth2/token"
|
|
% webserver_port,
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(int(request.headers["Content-Length"])).decode(
|
|
"ascii"
|
|
)
|
|
if (
|
|
content
|
|
!= "refresh_token=REFRESH_TOKEN&client_id=CLIENT_ID&client_secret=CLIENT_SECRET&grant_type=refresh_token"
|
|
):
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/accounts.google.com/o/oauth2/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
gdal.Unlink("/vsimem/service_account.json")
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials from simulated ~/.boto
|
|
|
|
|
|
def test_vsigs_read_credentials_file(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/.boto",
|
|
"""
|
|
[unrelated]
|
|
gs_access_key_id = foo
|
|
gs_secret_access_key = bar
|
|
[Credentials]
|
|
gs_access_key_id = GS_ACCESS_KEY_ID
|
|
gs_secret_access_key = GS_SECRET_ACCESS_KEY
|
|
[unrelated]
|
|
gs_access_key_id = foo
|
|
gs_secret_access_key = bar
|
|
""",
|
|
)
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"CPL_GS_TIMESTAMP": "my_timestamp",
|
|
"CPL_GS_CREDENTIALS_FILE": "/vsimem/.boto",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = (
|
|
"GOOG1 GS_ACCESS_KEY_ID:8tndu9//BfmN+Kg4AFLdUMZMBDQ="
|
|
)
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
gdal.Unlink("/vsimem/.boto")
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials from simulated ~/.boto
|
|
|
|
|
|
def test_vsigs_read_credentials_file_refresh_token(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
gdal.FileFromMemBuffer(
|
|
"/vsimem/.boto",
|
|
"""
|
|
[Credentials]
|
|
gs_oauth2_refresh_token = REFRESH_TOKEN
|
|
[OAuth2]
|
|
client_id = CLIENT_ID
|
|
client_secret = CLIENT_SECRET
|
|
""",
|
|
)
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"CPL_GS_CREDENTIALS_FILE": "/vsimem/.boto",
|
|
"GOA2_AUTH_URL_TOKEN": "http://localhost:%d/accounts.google.com/o/oauth2/token"
|
|
% webserver_port,
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
|
|
def method(request):
|
|
content = request.rfile.read(int(request.headers["Content-Length"])).decode(
|
|
"ascii"
|
|
)
|
|
if (
|
|
content
|
|
!= "refresh_token=REFRESH_TOKEN&client_id=CLIENT_ID&client_secret=CLIENT_SECRET&grant_type=refresh_token"
|
|
):
|
|
sys.stderr.write("Bad POST content: %s\n" % content)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
content = """{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}"""
|
|
request.send_header("Content-Length", len(content))
|
|
request.end_headers()
|
|
request.wfile.write(content.encode("ascii"))
|
|
|
|
handler.add("POST", "/accounts.google.com/o/oauth2/token", custom_method=method)
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
# Test UnlinkBatch()
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"POST",
|
|
"/batch/storage/v1",
|
|
200,
|
|
{
|
|
"content-type": "multipart/mixed; boundary=batch_nWfTDwb9aAhYucqUtdLRWUX93qsJaf3T"
|
|
},
|
|
"""--batch_phVs0DE8tHbyfvlYTZEeI5_snlh9XJR5
|
|
Content-Type: application/http
|
|
Content-ID: <response-1>
|
|
|
|
HTTP/1.1 204 No Content
|
|
Content-Length: 0
|
|
|
|
|
|
-batch_phVs0DE8tHbyfvlYTZEeI5_snlh9XJR5
|
|
Content-Type: application/http
|
|
Content-ID: <response-2>
|
|
|
|
HTTP/1.1 204 No Content
|
|
Content-Length: 0
|
|
|
|
|
|
--batch_phVs0DE8tHbyfvlYTZEeI5_snlh9XJR5--
|
|
""",
|
|
expected_body=b"--===============7330845974216740156==\r\nContent-Type: application/http\r\nContent-ID: <1>\r\n\r\n\r\nDELETE /storage/v1/b/unlink_batch/o/foo HTTP/1.1\r\n\r\n\r\n--===============7330845974216740156==\r\nContent-Type: application/http\r\nContent-ID: <2>\r\n\r\n\r\nDELETE /storage/v1/b/unlink_batch/o/bar%2Fbaz HTTP/1.1\r\n\r\n\r\n--===============7330845974216740156==--\r\n",
|
|
)
|
|
handler.add(
|
|
"POST",
|
|
"/batch/storage/v1",
|
|
200,
|
|
{
|
|
"content-type": "multipart/mixed; boundary=batch_nWfTDwb9aAhYucqUtdLRWUX93qsJaf3T"
|
|
},
|
|
"""--batch_phVs0DE8tHbyfvlYTZEeI5_snlh9XJR5
|
|
Content-Type: application/http
|
|
Content-ID: <response-3>
|
|
|
|
HTTP/1.1 204 No Content
|
|
Content-Length: 0
|
|
|
|
|
|
--batch_phVs0DE8tHbyfvlYTZEeI5_snlh9XJR5--
|
|
""",
|
|
expected_body=b"--===============7330845974216740156==\r\nContent-Type: application/http\r\nContent-ID: <3>\r\n\r\n\r\nDELETE /storage/v1/b/unlink_batch/o/baw HTTP/1.1\r\n\r\n\r\n--===============7330845974216740156==--\r\n",
|
|
)
|
|
with gdaltest.config_option(
|
|
"CPL_VSIGS_UNLINK_BATCH_SIZE", "2", thread_local=False
|
|
):
|
|
with webserver.install_http_handler(handler):
|
|
ret = gdal.UnlinkBatch(
|
|
[
|
|
"/vsigs/unlink_batch/foo",
|
|
"/vsigs/unlink_batch/bar/baz",
|
|
"/vsigs/unlink_batch/baw",
|
|
]
|
|
)
|
|
assert ret
|
|
|
|
gdal.Unlink("/vsimem/.boto")
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials from simulated GCE instance
|
|
@pytest.mark.skipif(sys.platform not in ("linux", "win32"), reason="Incorrect platform")
|
|
def test_vsigs_read_credentials_gce(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"CPL_GS_CREDENTIALS_FILE": "",
|
|
"CPL_GCE_CREDENTIALS_URL": "http://localhost:%d/computeMetadata/v1/instance/service-accounts/default/token"
|
|
% webserver_port,
|
|
# Disable hypervisor related check to test if we are really on EC2
|
|
"CPL_GCE_CHECK_LOCAL_FILES": "NO",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add(
|
|
"GET",
|
|
"/computeMetadata/v1/instance/service-accounts/default/token",
|
|
200,
|
|
{},
|
|
"""{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 3600,
|
|
}""",
|
|
)
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
# Set a fake URL to check that credentials re-use works
|
|
with gdaltest.config_option("CPL_GCE_CREDENTIALS_URL", "", thread_local=False):
|
|
|
|
handler = webserver.SequentialHandler()
|
|
handler.add("GET", "/gs_fake_bucket/bar", 200, {}, "bar")
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/bar")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "bar"
|
|
|
|
with gdaltest.error_handler():
|
|
assert gdal.GetSignedURL("/vsigs/foo/bar") is None
|
|
|
|
|
|
###############################################################################
|
|
# Read credentials from simulated GCE instance with expiration of the
|
|
# cached credentials
|
|
@pytest.mark.skipif(sys.platform not in ("linux", "win32"), reason="Incorrect platform")
|
|
def test_vsigs_read_credentials_gce_expiration(gs_test_config, webserver_port):
|
|
|
|
gdal.VSICurlClearCache()
|
|
|
|
with gdaltest.config_options(
|
|
{
|
|
"CPL_GS_CREDENTIALS_FILE": "",
|
|
"CPL_GCE_CREDENTIALS_URL": "http://localhost:%d/computeMetadata/v1/instance/service-accounts/default/token"
|
|
% webserver_port,
|
|
# Disable hypervisor related check to test if we are really on EC2
|
|
"CPL_GCE_CHECK_LOCAL_FILES": "NO",
|
|
},
|
|
thread_local=False,
|
|
):
|
|
|
|
def method(request):
|
|
if "Authorization" not in request.headers:
|
|
sys.stderr.write("Bad headers: %s\n" % str(request.headers))
|
|
request.send_response(403)
|
|
return
|
|
expected_authorization = "Bearer ACCESS_TOKEN"
|
|
if request.headers["Authorization"] != expected_authorization:
|
|
sys.stderr.write(
|
|
"Bad Authorization: '%s'\n" % str(request.headers["Authorization"])
|
|
)
|
|
request.send_response(403)
|
|
return
|
|
|
|
request.send_response(200)
|
|
request.send_header("Content-type", "text/plain")
|
|
request.send_header("Content-Length", 3)
|
|
request.end_headers()
|
|
request.wfile.write("""foo""".encode("ascii"))
|
|
|
|
handler = webserver.SequentialHandler()
|
|
# First time is used when trying to establish if GCE authentication is available
|
|
handler.add(
|
|
"GET",
|
|
"/computeMetadata/v1/instance/service-accounts/default/token",
|
|
200,
|
|
{},
|
|
"""{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 0,
|
|
}""",
|
|
)
|
|
# Second time is needed because f the access to th file
|
|
handler.add(
|
|
"GET",
|
|
"/computeMetadata/v1/instance/service-accounts/default/token",
|
|
200,
|
|
{},
|
|
"""{
|
|
"access_token" : "ACCESS_TOKEN",
|
|
"token_type" : "Bearer",
|
|
"expires_in" : 0,
|
|
}""",
|
|
)
|
|
handler.add("GET", "/gs_fake_bucket/resource", custom_method=method)
|
|
with webserver.install_http_handler(handler):
|
|
f = open_for_read("/vsigs/gs_fake_bucket/resource")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 4, f).decode("ascii")
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert data == "foo"
|
|
|
|
|
|
###############################################################################
|
|
# Nominal cases (require valid credentials)
|
|
|
|
|
|
def test_vsigs_extra_1():
|
|
|
|
gs_resource = gdal.GetConfigOption("GS_RESOURCE")
|
|
if gs_resource is None:
|
|
pytest.skip("Missing GS_RESOURCE")
|
|
|
|
if "/" not in gs_resource:
|
|
path = "/vsigs/" + gs_resource
|
|
statres = gdal.VSIStatL(path)
|
|
assert statres is not None and stat.S_ISDIR(statres.mode), (
|
|
"%s is not a valid bucket" % path
|
|
)
|
|
|
|
readdir = gdal.ReadDir(path)
|
|
assert readdir is not None, "ReadDir() should not return empty list"
|
|
for filename in readdir:
|
|
if filename != ".":
|
|
subpath = path + "/" + filename
|
|
assert gdal.VSIStatL(subpath) is not None, (
|
|
"Stat(%s) should not return an error" % subpath
|
|
)
|
|
|
|
unique_id = "vsigs_test"
|
|
subpath = path + "/" + unique_id
|
|
ret = gdal.Mkdir(subpath, 0)
|
|
assert ret >= 0, "Mkdir(%s) should not return an error" % subpath
|
|
|
|
readdir = gdal.ReadDir(path)
|
|
assert unique_id in readdir, "ReadDir(%s) should contain %s" % (path, unique_id)
|
|
|
|
ret = gdal.Mkdir(subpath, 0)
|
|
assert ret != 0, "Mkdir(%s) repeated should return an error" % subpath
|
|
|
|
ret = gdal.Rmdir(subpath)
|
|
assert ret >= 0, "Rmdir(%s) should not return an error" % subpath
|
|
|
|
readdir = gdal.ReadDir(path)
|
|
assert unique_id not in readdir, "ReadDir(%s) should not contain %s" % (
|
|
path,
|
|
unique_id,
|
|
)
|
|
|
|
ret = gdal.Rmdir(subpath)
|
|
assert ret != 0, "Rmdir(%s) repeated should return an error" % subpath
|
|
|
|
ret = gdal.Mkdir(subpath, 0)
|
|
assert ret >= 0, "Mkdir(%s) should not return an error" % subpath
|
|
|
|
f = gdal.VSIFOpenExL(
|
|
subpath + "/test.txt", "wb", 0, ["Content-Type=foo", "Content-Encoding=bar"]
|
|
)
|
|
assert f is not None
|
|
gdal.VSIFWriteL("hello", 1, 5, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
md = gdal.GetFileMetadata(subpath + "/test.txt", "HEADERS")
|
|
new_md = {}
|
|
for key in md:
|
|
new_md[key.lower()] = md[key]
|
|
md = new_md
|
|
assert "content-type" in md
|
|
assert md["content-type"] == "foo"
|
|
assert "content-encoding" in md
|
|
assert md["content-encoding"] == "bar"
|
|
|
|
ret = gdal.Rmdir(subpath)
|
|
assert ret != 0, (
|
|
"Rmdir(%s) on non empty directory should return an error" % subpath
|
|
)
|
|
|
|
f = gdal.VSIFOpenL(subpath + "/test.txt", "rb")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 5, f).decode("utf-8")
|
|
assert data == "hello"
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert gdal.Rename(subpath + "/test.txt", subpath + "/test2.txt") == 0
|
|
|
|
f = gdal.VSIFOpenL(subpath + "/test2.txt", "rb")
|
|
assert f is not None
|
|
data = gdal.VSIFReadL(1, 5, f).decode("utf-8")
|
|
assert data == "hello"
|
|
gdal.VSIFCloseL(f)
|
|
|
|
ret = gdal.Unlink(subpath + "/test2.txt")
|
|
assert ret >= 0, "Unlink(%s) should not return an error" % (
|
|
subpath + "/test2.txt"
|
|
)
|
|
|
|
ret = gdal.Rmdir(subpath)
|
|
assert ret >= 0, "Rmdir(%s) should not return an error" % subpath
|
|
|
|
return
|
|
|
|
f = open_for_read("/vsigs/" + gs_resource)
|
|
assert f is not None
|
|
ret = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert len(ret) == 1
|
|
|
|
# Same with /vsigs_streaming/
|
|
f = open_for_read("/vsigs_streaming/" + gs_resource)
|
|
assert f is not None
|
|
ret = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert len(ret) == 1
|
|
|
|
if False: # pylint: disable=using-constant-test
|
|
# we actually try to read at read() time and bSetError = false
|
|
# Invalid bucket : "The specified bucket does not exist"
|
|
gdal.ErrorReset()
|
|
f = open_for_read("/vsigs/not_existing_bucket/foo")
|
|
with gdaltest.error_handler():
|
|
gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
assert gdal.VSIGetLastErrorMsg() != ""
|
|
|
|
# Invalid resource
|
|
gdal.ErrorReset()
|
|
f = open_for_read("/vsigs_streaming/" + gs_resource + "/invalid_resource.baz")
|
|
assert f is None, gdal.VSIGetLastErrorMsg()
|
|
|
|
# Test GetSignedURL()
|
|
signed_url = gdal.GetSignedURL("/vsigs/" + gs_resource)
|
|
f = open_for_read("/vsicurl_streaming/" + signed_url)
|
|
assert f is not None
|
|
ret = gdal.VSIFReadL(1, 1, f)
|
|
gdal.VSIFCloseL(f)
|
|
|
|
assert len(ret) == 1
|