Improve sas_blob_utils and add unit tests

There are many bugs in Azure's Python SDK
This commit is contained in:
Chris Yeh 2020-07-15 23:53:44 -06:00
Родитель 10f37be86b
Коммит 4e1f207282
2 изменённых файлов: 316 добавлений и 61 удалений

Просмотреть файл

@ -5,6 +5,8 @@ Licensed under the MIT License.
This module contains helper functions for dealing with Shared Access Signatures
(SAS) tokens for Azure Blob Storage.
This module assumes azure-storage-blob version 12.3.
Documentation for Azure Blob Storage:
https://docs.microsoft.com/en-us/azure/developer/python/sdk/storage/storage-blob-readme
@ -38,48 +40,70 @@ class SasBlob:
@staticmethod
def get_client_from_uri(sas_uri: str) -> ContainerClient:
    """Builds a ContainerClient for the container that sas_uri points to.

    Args:
        sas_uri: str, URI to a container, passed unchanged to
            ContainerClient.from_container_url()

    Returns: ContainerClient
    """
    container_client = ContainerClient.from_container_url(sas_uri)
    return container_client
@staticmethod
def get_account_from_uri(sas_uri: str) -> str:
    """Gets the storage account name from an Azure Blob Storage URI.

    Assumes that sas_uri points to Azure Blob Storage account hosted at
    a default Azure URI. Does not work for locally-emulated Azure Storage
    or Azure Storage hosted at custom endpoints.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token

    Returns: str, everything in the network location before the first "."
    """
    # network location looks like "<account>.blob.core.windows.net"
    netloc = parse.urlsplit(sas_uri).netloc
    account, _, _ = netloc.partition('.')
    return account
@staticmethod
def get_container_from_uri(sas_uri: str, unquote: bool = True) -> str:
    """Gets the container name from an Azure Blob Storage URI.

    Assumes that sas_uri points to Azure Blob Storage account hosted at
    a default Azure URI. Does not work for locally-emulated Azure Storage
    or Azure Storage hosted at custom endpoints.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token
        unquote: bool, whether to replace any %xx escapes by their
            single-character equivalent, default True

    Returns: str, container name

    Raises: ValueError, if sas_uri does not include a container
    """
    url_parts = parse.urlsplit(sas_uri)
    # the container is the first path segment; strip the leading "/" so
    # that the split below does not start with an empty string
    raw_path = url_parts.path.lstrip('/')
    container = raw_path.split('/')[0]
    if container == '':
        raise ValueError('Given sas_uri does not include a container.')
    if unquote:
        container = parse.unquote(container)
    return container
@staticmethod
def get_blob_from_uri(sas_uri: str, unquote: bool = True) -> str:
    """Gets the blob name (path to the blob from the root container) from
    an Azure Blob Storage URI.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token
        unquote: bool, whether to replace any %xx escapes by their
            single-character equivalent, default True

    Returns: str, blob name (path to the blob from the root container)

    Raises: ValueError, if sas_uri does not include a blob name
    """
    # Get the entire path with all slashes after the container
    url_parts = parse.urlsplit(sas_uri)
    raw_path = url_parts.path.lstrip('/')  # remove leading "/" from path
    # split only once so that slashes inside the blob name are preserved
    parts = raw_path.split('/', maxsplit=1)
    if len(parts) < 2 or parts[1] == '':
        raise ValueError('Given sas_uri does not include a blob name')
    blob = parts[1]  # first item is the container name
    if unquote:
        blob = parse.unquote(blob)
    return blob
@staticmethod
@ -88,7 +112,7 @@ class SasBlob:
times and signature.
Args:
sas_uri: Azure blob storage SAS token
sas_uri: str, Azure blob storage SAS token
Returns: Query part of the SAS token, or None if URI has no token.
"""
@ -101,7 +125,7 @@ class SasBlob:
"""Get the resource type pointed to by this SAS token.
Args:
sas_uri: Azure blob storage SAS token
sas_uri: str, Azure blob storage SAS token
Returns: A string (either 'blob' or 'container') or None.
"""
@ -120,7 +144,7 @@ class SasBlob:
"""Get the permissions given by this SAS token.
Args:
sas_uri: Azure blob storage SAS token
sas_uri: str, Azure blob storage SAS token
Returns: A set containing some of 'read', 'write', 'delete' and 'list'.
Empty set returned if no permission specified in sas_uri.
@ -148,32 +172,31 @@ class SasBlob:
@staticmethod
def check_blob_existence(sas_uri: str,
provided_blob_name: Optional[str] = None) -> bool:
blob_name: Optional[str] = None) -> bool:
"""Checks whether a given URI points to an actual blob.
Args:
sas_uri: str, URI to a container or a blob
provided_blob_name: optional str, name of blob
blob_name: optional str, name of blob
must be given if sas_uri is a URI to a container
overrides blob name in sas_uri if sas_uri is a URI to a blob
Returns: bool, whether the sas_uri given points to an existing blob
"""
blob_name = provided_blob_name or SasBlob.get_blob_from_uri(sas_uri)
if blob_name is None:
raise ValueError('Blob name not provided as an argument, and '
'cannot be identified from the URI.')
if blob_name is not None:
account = SasBlob.get_account_from_uri(sas_uri)
container = SasBlob.get_container_from_uri(sas_uri)
sas_token = SasBlob.get_sas_key_from_uri(sas_uri)
container_url = f'https://{account}.blob.core.windows.net/{container}'
container_client = ContainerClient.from_container_url(
container_url, credential=SasBlob.get_sas_key_from_uri(sas_uri))
if sas_token is not None:
container_url += f'?{sas_token}'
sas_uri = SasBlob.generate_blob_sas_uri(
container_url, blob_name=blob_name)
# until Azure implements a proper BlobClient.exists() method, we can
# only use try/except to determine blob existence
# see: https://github.com/Azure/azure-sdk-for-python/issues/9507
blob_client = container_client.get_blob_client(blob_name)
with BlobClient.from_blob_url(sas_uri) as blob_client:
try:
blob_client.get_blob_properties()
except ResourceNotFoundError:
@ -182,22 +205,24 @@ class SasBlob:
@staticmethod
def list_blobs_in_container(
sas_uri: str, max_number_to_list: int,
sas_uri: str,
limit: Optional[int] = None,
blob_prefix: Optional[str] = None,
blob_suffix: Optional[Union[str, Tuple[str]]] = None) -> List[str]:
"""Get a list of blob names/paths in this container.
"""Get a list of blob names in this container.
Args:
sas_uri: Azure blob storage SAS token
max_number_to_list: Maximum number of blob names/paths to list
blob_prefix: Optional, a string as the prefix to blob names/paths to
sas_uri: str, Azure blob storage SAS token
limit: int, maximum # of blob names to list
if None, then returns all blob names
blob_prefix: Optional, a string as the prefix to blob names to
filter the results to those with this prefix
blob_suffix: Optional, a string or a tuple of strings, to filter the
results to those with this/these suffix(s). The blob names will
be lowercased first before comparing with the suffix(es).
Returns:
sorted list of blob names, of length max_number_to_list or shorter.
sorted list of blob names, of length limit or shorter.
"""
print('listing blobs...')
if (SasBlob.get_sas_key_from_uri(sas_uri) is not None
@ -212,17 +237,14 @@ class SasBlob:
and not isinstance(blob_suffix, tuple)):
raise ValueError('blob_suffix must be a str or a tuple of strings')
container_client = SasBlob.get_client_from_uri(sas_uri)
list_blobs = []
with SasBlob.get_client_from_uri(sas_uri) as container_client:
generator = container_client.list_blobs(name_starts_with=blob_prefix)
list_blobs = []
with tqdm() as pbar:
for blob in generator:
for blob in tqdm(generator):
if blob_suffix is None or blob.name.lower().endswith(blob_suffix):
list_blobs.append(blob.name)
pbar.update(1)
if len(list_blobs) == max_number_to_list:
if limit is not None and len(list_blobs) == limit:
break
return sorted(list_blobs) # sort for determinism
@ -230,7 +252,9 @@ class SasBlob:
def generate_writable_container_sas(account_name: str,
account_key: str,
container_name: str,
access_duration_hrs: float) -> str:
access_duration_hrs: float,
account_url: Optional[str] = None
) -> str:
"""Creates a container and returns a SAS URI with read/write/list
permissions.
@ -240,12 +264,19 @@ class SasBlob:
container_name: str, name of container to create, must not match an
existing container in the given storage account
access_duration_hrs: float
account_url: str, optional, defaults to default Azure Storage URL
Returns: str, URL to newly created container
Raises: azure.core.exceptions.ResourceExistsError, if container already
exists
NOTE: This method currently fails on non-default Azure Storage URLs. The
initializer for ContainerClient() assumes the default Azure Storage URL
format, which is a bug that has been reported here:
https://github.com/Azure/azure-sdk-for-python/issues/12568
"""
if account_url is None:
account_url = f'https://{account_name}.blob.core.windows.net'
container_client = ContainerClient(account_url=account_url,
container_name=container_name,
@ -287,15 +318,20 @@ class SasBlob:
sas_uri: str, URI to a blob
Returns:
output_stream: io.BytesIO
output_stream: io.BytesIO, remember to close it when finished using
blob_properties: BlobProperties
Raises: azure.core.exceptions.ResourceNotFoundError, if sas_uri points
to a non-existent blob
NOTE: the returned BlobProperties object may have incorrect values for
the blob name and container name. This is a bug which has been reported
here: https://github.com/Azure/azure-sdk-for-python/issues/12563
"""
blob_client = BlobClient.from_blob_url(sas_uri)
with io.BytesIO() as output_stream:
with BlobClient.from_blob_url(sas_uri) as blob_client:
output_stream = io.BytesIO()
blob_client.download_blob().readinto(output_stream)
output_stream.seek(0)
blob_properties = blob_client.get_blob_properties()
return output_stream, blob_properties
@ -304,15 +340,15 @@ class SasBlob:
"""
Args:
container_sas_uri: str, URI to blob storage container
<account_url>/<container_name>?<sas_token>
blob_name: str, name of blob
Returns: str, blob URI
<account_url>/<container_name>/<blob_name>?<sas_token>
"""
account_name = SasBlob.get_account_from_uri(container_sas_uri)
container_name = SasBlob.get_container_from_uri(container_sas_uri)
account_container = container_sas_uri.split('?', maxsplit=1)[0]
account_url, container_name = account_container.rsplit('/', maxsplit=1)
sas_token = SasBlob.get_sas_key_from_uri(container_sas_uri)
account_url = f'https://{account_name}.blob.core.windows.net'
blob_uri = f'{account_url}/{container_name}/{blob_name}'
if sas_token is not None:
blob_uri += f'?{sas_token}'

219
sas_blob_utils_test.py Normal file
Просмотреть файл

@ -0,0 +1,219 @@
"""
Unit tests for sas_blob_utils.py
In order to test "uploading" blobs without exposing private SAS keys, we use
the Azurite blob storage emulator instead. Instructions for installing:
Azurite documentation:
https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite
https://github.com/azure/azurite
1) Install nodejs. Many ways to do so, but perhaps the easiest is via conda:
conda create -n node -c conda-forge nodejs
conda activate node
2) Install Azurite from npm. The -g option installs the package globally.
npm install -g azurite
3) Run Azurite. The -l flag sets a temp folder where Azurite can store data to
disk. By default, Azurite's blob service runs at 127.0.0.1:10000, which can be
changed by the parameters --blobHost 1.2.3.4 --blobPort 5678.
mkdir $HOME/tmp/azurite
azurite-blob -l $HOME/tmp/azurite
4) Now we can run this unit test:
python sas_blob_utils_test.py -v
Azurite by default supports the following storage account:
- Account name: devstoreaccount1
- Account key: Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
"""
import unittest
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from sas_blob_utils import BlobClient, ContainerClient, SasBlob
# Public container used by the read-only tests, with and without a SAS token.
# The tests treat this token as read-only: uploading with it is expected to
# fail.
PUBLIC_CONTAINER_URI = 'https://lilablobssc.blob.core.windows.net/nacti-unzipped'
PUBLIC_CONTAINER_SAS = 'st=2020-01-01T00%3A00%3A00Z&se=2034-01-01T00%3A00%3A00Z&sp=rl&sv=2019-07-07&sr=c&sig=rsgUcvoniBu/Vjkjzubh6gliU3XGvpE2A30Y0XPW4Vc%3D'
PUBLIC_CONTAINER_URI_WITH_SAS = f'{PUBLIC_CONTAINER_URI}?{PUBLIC_CONTAINER_SAS}'
# A blob the tests expect to exist in the public container, and one they
# expect NOT to exist.
PUBLIC_BLOB_NAME = 'part0/sub000/2010_Unit150_Ivan097_img0003.jpg'
PUBLIC_INVALID_BLOB_NAME = 'part0/sub000/2010_Unit150_Ivan000_img0003.jpg'
PUBLIC_BLOB_URI = f'{PUBLIC_CONTAINER_URI}/{PUBLIC_BLOB_NAME}'
PUBLIC_BLOB_URI_WITH_SAS = f'{PUBLIC_BLOB_URI}?{PUBLIC_CONTAINER_SAS}'
PUBLIC_INVALID_BLOB_URI = f'{PUBLIC_CONTAINER_URI}/{PUBLIC_INVALID_BLOB_NAME}'
# Second public container, used by the blob-listing test.
PUBLIC_ZIPPED_CONTAINER_URI = 'https://lilablobssc.blob.core.windows.net/wcs'
# Azurite defaults
# The account URI is Azurite's default blob endpoint (127.0.0.1:10000)
# followed by the account name, not the "<account>.blob.core.windows.net" form.
PRIVATE_ACCOUNT_NAME = 'devstoreaccount1'
PRIVATE_ACCOUNT_KEY = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=='
PRIVATE_ACCOUNT_URI = f'http://127.0.0.1:10000/{PRIVATE_ACCOUNT_NAME}'
PRIVATE_CONTAINER_NAME = 'mycontainer'
PRIVATE_CONTAINER_URI = f'{PRIVATE_ACCOUNT_URI}/{PRIVATE_CONTAINER_NAME}'
PRIVATE_BLOB_NAME = 'successdir/successblob'
PRIVATE_BLOB_URI = f'{PRIVATE_CONTAINER_URI}/{PRIVATE_BLOB_NAME}'
class TestSasBlobUtils(unittest.TestCase):
    """Unit tests for the SasBlob helper functions.

    The tests use the module-level PUBLIC_* and PRIVATE_* constants. Parts
    that need a working private emulated account are skipped or commented out.
    """

    # set to True by tests that may leave data in the private emulated account
    needs_cleanup = False

    def tearDown(self):
        """Runs the (currently disabled) cleanup and resets needs_cleanup."""
        if self.needs_cleanup:
            # cleanup: delete the private emulated container
            print('running cleanup')

            # until the private emulated account is able to work, skip cleanup
            # with ContainerClient.from_container_url(
            #         PRIVATE_CONTAINER_URI,
            #         credential=PRIVATE_ACCOUNT_KEY) as cc:
            #     try:
            #         cc.get_container_properties()
            #         cc.delete_container()
            #     except ResourceNotFoundError:
            #         pass

            # if SasBlob.check_blob_existence(PRIVATE_BLOB_URI):
            #     with BlobClient.from_blob_url(
            #             PRIVATE_BLOB_URI,
            #             credential=PRIVATE_ACCOUNT_KEY) as bc:
            #         bc.delete_blob(delete_snapshots=True)
        self.needs_cleanup = False

    def test_get_account_from_uri(self):
        """The account name is parsed from a public blob URI."""
        self.assertEqual(
            SasBlob.get_account_from_uri(PUBLIC_BLOB_URI),
            'lilablobssc')

    def test_get_container_from_uri(self):
        """The container name is parsed from a public blob URI."""
        self.assertEqual(
            SasBlob.get_container_from_uri(PUBLIC_BLOB_URI),
            'nacti-unzipped')

    def test_get_blob_from_uri(self):
        """The blob name is parsed from a blob URI; a container URI raises."""
        self.assertEqual(
            SasBlob.get_blob_from_uri(PUBLIC_BLOB_URI),
            PUBLIC_BLOB_NAME)
        with self.assertRaises(ValueError):
            SasBlob.get_blob_from_uri(PUBLIC_CONTAINER_URI)

    def test_get_sas_key_from_uri(self):
        """A URI without a token gives None; otherwise the token is returned."""
        self.assertIsNone(SasBlob.get_sas_key_from_uri(PUBLIC_CONTAINER_URI))
        self.assertEqual(
            SasBlob.get_sas_key_from_uri(PUBLIC_CONTAINER_URI_WITH_SAS),
            PUBLIC_CONTAINER_SAS)

    def test_check_blob_existence(self):
        """Checks existing, missing, and malformed inputs to the existence check."""
        print('PUBLIC_BLOB_URI')
        self.assertTrue(SasBlob.check_blob_existence(PUBLIC_BLOB_URI))

        print('PUBLIC_CONTAINER_URI + PUBLIC_BLOB_NAME')
        self.assertTrue(SasBlob.check_blob_existence(
            PUBLIC_CONTAINER_URI, blob_name=PUBLIC_BLOB_NAME))

        # a container URI given without a blob name
        print('PUBLIC_CONTAINER_URI')
        with self.assertRaises(IndexError):
            SasBlob.check_blob_existence(PUBLIC_CONTAINER_URI)

        print('PUBLIC_INVALID_BLOB_URI')
        self.assertFalse(SasBlob.check_blob_existence(PUBLIC_INVALID_BLOB_URI))

        print('PRIVATE_BLOB_URI')
        with self.assertRaises(HttpResponseError):
            SasBlob.check_blob_existence(PRIVATE_BLOB_URI)

    def test_list_blobs_in_container(self):
        """Listing the public zipped container returns the expected sorted names."""
        blobs_list = SasBlob.list_blobs_in_container(
            PUBLIC_ZIPPED_CONTAINER_URI, limit=100)
        expected = sorted([
            'wcs_20200403_bboxes.json.zip', 'wcs_camera_traps.json.zip',
            'wcs_camera_traps_00.zip', 'wcs_camera_traps_01.zip',
            'wcs_camera_traps_02.zip', 'wcs_camera_traps_03.zip',
            'wcs_camera_traps_04.zip', 'wcs_camera_traps_05.zip',
            'wcs_camera_traps_06.zip', 'wcs_specieslist.csv',
            'wcs_splits.json'])
        self.assertEqual(blobs_list, expected)

    def test_generate_writable_container_sas(self):
        """Currently skipped; would create a container in the private account."""
        # until the private emulated account is able to work, skip this test
        self.skipTest('skipping private account tests for now')

        self.needs_cleanup = True
        new_sas_uri = SasBlob.generate_writable_container_sas(
            account_name=PRIVATE_ACCOUNT_NAME,
            account_key=PRIVATE_ACCOUNT_KEY,
            container_name=PRIVATE_CONTAINER_NAME,
            access_duration_hrs=1,
            account_url=PRIVATE_ACCOUNT_URI)
        # assertIsInstance reports the actual type on failure
        self.assertIsInstance(new_sas_uri, str)
        self.assertNotEqual(new_sas_uri, '')
        self.assertEqual(len(SasBlob.list_blobs_in_container(new_sas_uri)), 0)

    def test_upload_blob(self):
        """Uploads without sufficient permissions raise the expected errors."""
        self.needs_cleanup = True
        # uploading to a read-only public container without a SAS token yields
        # ResourceNotFoundError('The specified resource does not exist.')
        print('PUBLIC_CONTAINER_URI')
        with self.assertRaises(ResourceNotFoundError):
            SasBlob.upload_blob(PUBLIC_CONTAINER_URI,
                                blob_name='failblob', data='fail')

        # uploading to a public container with a read-only SAS token yields
        # HttpResponseError('This request is not authorized to perform this '
        #                   'operation using this permission.')
        print('PUBLIC_CONTAINER_URI_WITH_SAS')
        with self.assertRaises(HttpResponseError):
            SasBlob.upload_blob(PUBLIC_CONTAINER_URI_WITH_SAS,
                                blob_name='failblob', data='fail')

        # uploading to a private container without a SAS token yields
        # HttpResponseError('Server failed to authenticate the request. Make '
        #                   'sure the value of the Authorization header is '
        #                   'formed correctly including the signature.')
        print('PRIVATE_CONTAINER_URI')
        with self.assertRaises(HttpResponseError):
            SasBlob.upload_blob(PRIVATE_CONTAINER_URI,
                                blob_name=PRIVATE_BLOB_NAME, data='success')

        # until the private emulated account is able to work, skip this test
        # private_container_uri_with_sas = SasBlob.generate_writable_container_sas(
        #     account_name=PRIVATE_ACCOUNT_NAME,
        #     account_key=PRIVATE_ACCOUNT_KEY,
        #     container_name=PRIVATE_CONTAINER_NAME,
        #     access_duration_hrs=1,
        #     account_url=PRIVATE_ACCOUNT_URI)
        # blob_url = SasBlob.upload_blob(
        #     private_container_uri_with_sas,
        #     blob_name=PRIVATE_BLOB_NAME, data='success')
        # self.assertEqual(blob_url, PRIVATE_BLOB_URI)

    def test_get_blob_to_stream(self):
        """Downloads a public blob and checks its length and size property."""
        output, props = SasBlob.get_blob_to_stream(PUBLIC_BLOB_URI)
        # the context manager closes the stream even if the assertion fails
        with output:
            x = output.read()
            self.assertEqual(len(x), 376645)

        # see https://github.com/Azure/azure-sdk-for-python/issues/12563
        expected_properties = {
            'size': 376645,
            # 'name': PUBLIC_BLOB_NAME,
            # 'container': 'nacti-unzipped'
        }
        for k, v in expected_properties.items():
            self.assertEqual(props[k], v)

    def test_generate_blob_sas_uri(self):
        """Blob URIs are built from a container URI, with and without a token."""
        generated = SasBlob.generate_blob_sas_uri(
            container_sas_uri=PUBLIC_CONTAINER_URI,
            blob_name=PUBLIC_BLOB_NAME)
        self.assertEqual(generated, PUBLIC_BLOB_URI)

        generated = SasBlob.generate_blob_sas_uri(
            container_sas_uri=PUBLIC_CONTAINER_URI_WITH_SAS,
            blob_name=PUBLIC_BLOB_NAME)
        self.assertEqual(generated, PUBLIC_BLOB_URI_WITH_SAS)
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()