Update sas_blob_utils.py and unit tests

azure-storage-blob is now on version 12.5.0, which has fixed a lot of the bugs that placed the earlier 12.x versions. Those fixes mean that we can actually run the unit tests now
This commit is contained in:
Christopher Yeh 2020-09-11 01:10:20 +00:00
Родитель 44251e597a
Коммит f64dcd10a6
2 изменённых файлов: 76 добавлений и 55 удалений

Просмотреть файл

@ -31,8 +31,7 @@ from azure.storage.blob import (
BlobProperties,
ContainerClient,
ContainerSasPermissions,
generate_container_sas,
upload_blob_to_url)
generate_container_sas)
def build_azure_storage_uri(
@ -149,7 +148,8 @@ def get_sas_token_from_uri(sas_uri: str) -> Optional[str]:
Args:
sas_uri: str, Azure blob storage SAS token
Returns: Query part of the SAS token, or None if URI has no token.
Returns: str, query part of the SAS token (without leading '?'),
or None if URI has no token.
"""
url_parts = parse.urlsplit(sas_uri)
sas_token = url_parts.query or None # None if query is empty string
@ -222,6 +222,12 @@ def get_all_query_parts(sas_uri: str) -> Dict[str, Any]:
def check_blob_exists(sas_uri: str, blob_name: Optional[str] = None) -> bool:
"""Checks whether a given URI points to an actual blob.
Assumes that sas_uri points to Azure Blob Storage account hosted at
a default Azure URI. Does not work for locally-emulated Azure Storage
or Azure Storage hosted at custom endpoints. In these cases, create a
BlobClient using the default constructor, instead of from_blob_url(),
and use the BlobClient.exists() method directly.
Args:
sas_uri: str, URI to a container or a blob
if blob_name is given, sas_uri is treated as a container URI
@ -323,18 +329,13 @@ def generate_writable_container_sas(account_name: str,
Raises: azure.core.exceptions.ResourceExistsError, if container already
exists
NOTE: This method currently fails on non-default Azure Storage URLs. The
initializer for ContainerClient() assumes the default Azure Storage URL
format, which is a bug that has been reported here:
https://github.com/Azure/azure-sdk-for-python/issues/12568
"""
if account_url is None:
account_url = build_azure_storage_uri(account=account_name)
container_client = ContainerClient(account_url=account_url,
container_name=container_name,
credential=account_key)
container_client.create_container()
with ContainerClient(account_url=account_url,
container_name=container_name,
credential=account_key) as container_client:
container_client.create_container()
permissions = ContainerSasPermissions(read=True, write=True, list=True)
container_sas_token = generate_container_sas(
@ -348,7 +349,8 @@ def generate_writable_container_sas(account_name: str,
def upload_blob(container_uri: str, blob_name: str,
data: Union[Iterable[AnyStr], IO[AnyStr]]) -> str:
data: Union[Iterable[AnyStr], IO[AnyStr]],
overwrite: bool = False) -> str:
"""Creates a new blob of the given name from an IO stream.
Args:
@ -356,12 +358,15 @@ def upload_blob(container_uri: str, blob_name: str,
blob_name: str, name of blob to upload
data: str, bytes, or IO stream
if str, assumes utf-8 encoding
overwrite: bool, whether to overwrite existing blob (if any)
Returns: str, URL to blob, includes SAS token if container_uri has SAS token
"""
blob_url = build_blob_uri(container_uri, blob_name)
upload_blob_to_url(blob_url, data=data)
return blob_url
account_url, container, sas_token = decompose_container_uri(container_uri)
with BlobClient(account_url=account_url, container_name=container,
blob_name=blob_name, credential=sas_token) as blob_client:
blob_client.upload_blob(data, overwrite=overwrite)
return blob_client.url
def download_blob_to_stream(sas_uri: str) -> Tuple[io.BytesIO, BlobProperties]:
@ -385,6 +390,20 @@ def download_blob_to_stream(sas_uri: str) -> Tuple[io.BytesIO, BlobProperties]:
return output_stream, blob_properties
def decompose_container_uri(container_uri) -> Tuple[str, str, Optional[str]]:
"""
Args:
container_uri: str, URI to blob storage container
<account_url>/<container>?<sas_token>
Returns: account_url, container_name, sas_token
"""
account_container = container_uri.split('?', maxsplit=1)[0]
account_url, container_name = account_container.rsplit('/', maxsplit=1)
sas_token = get_sas_token_from_uri(container_uri)
return account_url, container_name, sas_token
def build_blob_uri(container_uri: str, blob_name: str) -> str:
"""
Args:
@ -395,12 +414,10 @@ def build_blob_uri(container_uri: str, blob_name: str) -> str:
Returns: str, blob URI <account_url>/<container>/<blob_name>?<sas_token>,
<blob_name> is URL-escaped
"""
account_container = container_uri.split('?', maxsplit=1)[0]
account_url, container_name = account_container.rsplit('/', maxsplit=1)
sas_token = get_sas_token_from_uri(container_uri)
account_url, container, sas_token = decompose_container_uri(container_uri)
blob_name = parse.quote(blob_name)
blob_uri = f'{account_url}/{container_name}/{blob_name}'
blob_uri = f'{account_url}/{container}/{blob_name}'
if sas_token is not None:
blob_uri += f'?{sas_token}'
return blob_uri

Просмотреть файл

@ -18,7 +18,8 @@ https://github.com/azure/azurite
3) Run Azurite. The -l flag sets a temp folder where Azurite can store data to
disk. By default, Azurite's blob service runs at 127.0.0.1:10000, which can be
changed by the parameters --blobHost 1.2.3.4 --blobPort 5678.
mkdir $HOME/tmp/azurite
mkdir -p $HOME/tmp/azurite
rm -r $HOME/tmp/azurite/* # if the folder already existed, clear it
azurite-blob -l $HOME/tmp/azurite
4) In a separate terminal, activate a virtual environment with the Azure Storage
@ -82,21 +83,23 @@ class Tests(unittest.TestCase):
# cleanup: delete the private emulated container
print('running cleanup')
# until the private emulated account is able to work, skip cleanup
# with ContainerClient.from_container_url(
# PRIVATE_CONTAINER_URI,
# credential=PRIVATE_ACCOUNT_KEY) as cc:
# try:
# cc.get_container_properties()
# cc.delete_container()
# except ResourceNotFoundError:
# pass
with BlobClient(account_url=PRIVATE_ACCOUNT_URI,
container_name=PRIVATE_CONTAINER_NAME,
blob_name=PRIVATE_BLOB_NAME,
credential=PRIVATE_ACCOUNT_KEY) as bc:
if bc.exists():
print('deleted blob')
bc.delete_blob(delete_snapshots='include')
# if check_blob_exists(PRIVATE_BLOB_URI):
# with BlobClient.from_blob_url(
# PRIVATE_BLOB_URI,
# credential=PRIVATE_ACCOUNT_KEY) as bc:
# bc.delete_blob(delete_snapshots=True)
with ContainerClient.from_container_url(
PRIVATE_CONTAINER_URI,
credential=PRIVATE_ACCOUNT_KEY) as cc:
try:
cc.get_container_properties()
cc.delete_container()
print('deleted container')
except ResourceNotFoundError:
pass
self.needs_cleanup = False
def test_get_account_from_uri(self):
@ -131,10 +134,6 @@ class Tests(unittest.TestCase):
print('PUBLIC_INVALID_BLOB_URI')
self.assertFalse(check_blob_exists(PUBLIC_INVALID_BLOB_URI))
print('PRIVATE_BLOB_URI')
with self.assertRaises(HttpResponseError):
check_blob_exists(PRIVATE_BLOB_URI)
def test_list_blobs_in_container(self):
blobs_list = list_blobs_in_container(
PUBLIC_ZIPPED_CONTAINER_URI, limit=100)
@ -155,9 +154,6 @@ class Tests(unittest.TestCase):
self.assertEqual(blobs_list, expected)
def test_generate_writable_container_sas(self):
# until the private emulated account is able to work, skip this test
self.skipTest('skipping private account tests for now')
self.needs_cleanup = True
new_sas_uri = generate_writable_container_sas(
account_name=PRIVATE_ACCOUNT_NAME,
@ -172,9 +168,9 @@ class Tests(unittest.TestCase):
def test_upload_blob(self):
self.needs_cleanup = True
# uploading to a read-only public container without a SAS token yields
# ResourceNotFoundError('The specified resource does not exist.')
# HttpResponseError('Server failed to authenticate the request.')
print('PUBLIC_CONTAINER_URI')
with self.assertRaises(ResourceNotFoundError):
with self.assertRaises(HttpResponseError):
upload_blob(PUBLIC_CONTAINER_URI,
blob_name='failblob', data='fail')
@ -195,17 +191,25 @@ class Tests(unittest.TestCase):
upload_blob(PRIVATE_CONTAINER_URI,
blob_name=PRIVATE_BLOB_NAME, data='success')
# until the private emulated account is able to work, skip this test
# private_container_uri_sas = generate_writable_container_sas(
# account_name=PRIVATE_ACCOUNT_NAME,
# account_key=PRIVATE_ACCOUNT_KEY,
# container_name=PRIVATE_CONTAINER_NAME,
# access_duration_hrs=1,
# account_url=PRIVATE_ACCOUNT_URI)
# blob_url = upload_blob(
# private_container_uri_sas,
# blob_name=PRIVATE_BLOB_NAME, data='success')
# self.assertEqual(blob_url, PRIVATE_BLOB_URI)
# upload to a private container with a SAS token
private_container_uri_sas = generate_writable_container_sas(
account_name=PRIVATE_ACCOUNT_NAME,
account_key=PRIVATE_ACCOUNT_KEY,
container_name=PRIVATE_CONTAINER_NAME,
access_duration_hrs=1,
account_url=PRIVATE_ACCOUNT_URI)
container_sas = get_sas_token_from_uri(private_container_uri_sas)
private_blob_uri_sas = f'{PRIVATE_BLOB_URI}?{container_sas}'
blob_url = upload_blob(
private_container_uri_sas,
blob_name=PRIVATE_BLOB_NAME, data='success')
self.assertEqual(blob_url, private_blob_uri_sas)
with BlobClient(account_url=PRIVATE_ACCOUNT_URI,
container_name=PRIVATE_CONTAINER_NAME,
blob_name=PRIVATE_BLOB_NAME,
credential=container_sas) as blob_client:
self.assertTrue(blob_client.exists())
def test_download_blob_to_stream(self):
output, props = download_blob_to_stream(PUBLIC_BLOB_URI)