Split sas_blob_utils.SasBlob classmethods into stand-alone module functions
This commit is contained in:
Chris Yeh 2020-07-17 18:29:59 -06:00
Родитель e102a6dce5
Коммит 1b372e84a9
4 изменённых файлов: 628 добавлений и 626 удалений

Просмотреть файл

@ -3,11 +3,15 @@
#
# Miscellaneous Azure utilities
#
import json
import re
from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union
from azure.storage.blob._models import BlobPrefix
from azure.storage.blob import BlobServiceClient
from azure.storage.blob import ContainerClient
import sas_blob_utils
# Based on:
#
@ -39,7 +43,7 @@ def walk_container(container_client, max_depth=-1, prefix='',
if store_folders:
folders.append(prefix + short_name)
depth += 1
walk_blob_hierarchy(prefix=item.name,folders=folders,blobs=blobs)
walk_blob_hierarchy(prefix=item.name, folders=folders, blobs=blobs)
if (debug_max_items > 0) and (len(folders)+len(blobs) > debug_max_items):
return folders, blobs
depth -= 1
@ -47,153 +51,101 @@ def walk_container(container_client, max_depth=-1, prefix='',
if store_blobs:
blobs.append(prefix + short_name)
return folders,blobs
return folders, blobs
folders,blobs = walk_blob_hierarchy()
folders, blobs = walk_blob_hierarchy()
assert(all([s.endswith('/') for s in folders]))
folders = [s.strip('/') for s in folders]
return folders,blobs
return folders, blobs
def list_top_level_blob_folders(container_client: ContainerClient) -> List[str]:
    """Lists all top-level folders in a container.

    Args:
        container_client: ContainerClient, client for the container to walk

    Returns: list of str, the folders reported by walk_container() when it is
        limited to a depth of 1 and told not to store blobs
    """
    top_level_folders, _ = walk_container(
        container_client, max_depth=1, store_blobs=False)
    return top_level_folders
#%% Blob enumeration
def concatenate_json_lists(input_files: Iterable[str],
                           output_file: Optional[str] = None
                           ) -> List[Any]:
    """Concatenates the lists stored in several JSON files.

    Args:
        input_files: iterable of str, paths to JSON files that each contain a
            list (typically of string filenames)
        output_file: optional str, path to a JSON file; if given, the
            concatenated list is also written there

    Returns: list, the items of every input list, in input order
    """
    output_list = []
    for fn in input_files:
        # use a context manager so each input file is closed promptly
        with open(fn, 'r') as f:
            file_list = json.load(f)
        output_list.extend(file_list)
    if output_file is not None:
        with open(output_file, 'w') as f:
            json.dump(output_list, f, indent=1)
    return output_list
def write_list_to_file(output_file: str, strings: Sequence[str]) -> None:
    """Writes a list of strings to either a JSON file or a text file.

    Args:
        output_file: str, path to the output file; a name ending in '.json'
            selects JSON output, anything else selects plain text
        strings: sequence of str to write

    For text output every string is written on its own line, including a
    newline after the last one, so that the file stays a well-formed list of
    lines and can safely be appended to or concatenated with other lists.
    """
    with open(output_file, 'w') as f:
        if output_file.endswith('.json'):
            json.dump(strings, f, indent=1)
        else:
            f.writelines(s + '\n' for s in strings)
def read_list_from_file(filename: str) -> List[str]:
    """Reads a JSON-formatted list of strings from a file.

    Args:
        filename: str, path to a file whose name ends in '.json'

    Returns: list of str, the contents of the file

    Raises: AssertionError, if the file name does not end in '.json' or the
        file does not contain a list made up entirely of strings
    """
    assert filename.endswith('.json'), 'filename must end in .json'
    with open(filename, 'r') as f:
        file_list = json.load(f)
    assert isinstance(file_list, list), 'file does not contain a list'
    assert all(isinstance(s, str) for s in file_list), \
        'list contains non-string items'
    return file_list
def account_name_to_url(account_name):
    """Builds the blob endpoint URL for a storage account name."""
    pieces = ('https://', account_name, '.blob.core.windows.net')
    return ''.join(pieces)
def upload_file_to_blob(account_name: str,
                        container_name: str,
                        local_path: str,
                        blob_name: str,
                        sas_token: Optional[str] = None) -> str:
    """Uploads a local file to Azure Blob Storage.

    Args:
        account_name: str, name of the storage account
        container_name: str, name of the destination container
        local_path: str, path of the local file to upload
        blob_name: str, name to give the uploaded blob
        sas_token: optional str, SAS token used to build the container URI

    Returns: str, the value returned by sas_blob_utils.upload_blob() for the
        uploaded blob
    """
    container_uri = sas_blob_utils.build_azure_storage_uri(
        account=account_name, container=container_name, sas_token=sas_token)
    # open in binary mode so the bytes are uploaded unmodified
    with open(local_path, 'rb') as data:
        return sas_blob_utils.upload_blob(
            container_uri=container_uri, blob_name=blob_name, data=data)
def enumerate_blobs(account_name, sas_token, container_name,
                    rmatch=None, prefix=None, max_blobs=None):
    """Enumerates blobs in a container, optionally filtering with a regex.

    Using the prefix parameter is faster than using a regex starting with ^.

    Args:
        account_name: str, name of the storage account
        sas_token: str, SAS token, should start with st=
        container_name: str, name of the container to enumerate
        rmatch: optional regex; only blob names that match it from the
            beginning of the name (re.match semantics) are returned
        prefix: optional str, only blobs whose names start with this prefix
            are requested from the service
        max_blobs: optional int, stop once this many matching blobs have been
            collected

    Returns: list of str, names of the matching blobs
    """
    folder_string = '{}/{}'.format(account_name, container_name)
    if prefix is not None:
        folder_string += '/{}'.format(prefix)
    if rmatch is not None:
        folder_string += ' (matching {})'.format(rmatch)
    print('Enumerating blobs from {}'.format(folder_string))

    blob_service_client = BlobServiceClient(
        account_url=account_name_to_url(account_name), credential=sas_token)
    container_client = blob_service_client.get_container_client(container_name)
    generator = container_client.list_blobs(name_starts_with=prefix)

    # compile once instead of resolving the pattern for every blob
    pattern = re.compile(rmatch) if rmatch is not None else None

    matched_blobs = []
    # counts every blob seen, matching or not, so that the progress and
    # summary messages report a real total
    n_enumerated = 0
    for blob in generator:
        n_enumerated += 1
        if (n_enumerated % 1000) == 0:
            print('.', end='')
        if (n_enumerated % 50000) == 0:
            print('{} blobs enumerated ({} matches)'.format(
                n_enumerated, len(matched_blobs)))

        if pattern is not None and pattern.match(blob.name) is None:
            continue
        matched_blobs.append(blob.name)

        # the cap applies to matches, so the returned list is capped
        if (max_blobs is not None) and (len(matched_blobs) >= max_blobs):
            print('Terminating enumeration after {} blobs'.format(max_blobs))
            break

    print('Enumerated {} matching blobs (of {} total) from {}/{}'.format(
        len(matched_blobs), n_enumerated, account_name, container_name))
    return matched_blobs
def enumerate_blobs_to_file(
        output_file: str,
        account_name: str,
        container_name: str,
        sas_token: Optional[str] = None,
        blob_prefix: Optional[str] = None,
        blob_suffix: Optional[Union[str, Tuple[str]]] = None,
        rsearch: Optional[str] = None,
        limit: Optional[int] = None
        ) -> List[str]:
    """Enumerates the blobs in a container and writes their names to a file.

    Writes a JSON list if output_file ends in ".json", otherwise a
    newline-delimited list.

    Args:
        output_file: str, path of the file to write
        account_name: str, name of the storage account
        container_name: str, name of the container to enumerate
        sas_token: optional str, SAS token used to build the container URI
        blob_prefix, blob_suffix, rsearch, limit: passed through unchanged to
            sas_blob_utils.list_blobs_in_container()

    Returns: list of str, the blob names that were written
    """
    container_uri = sas_blob_utils.build_azure_storage_uri(
        account=account_name, container=container_name, sas_token=sas_token)
    matched_blobs = sas_blob_utils.list_blobs_in_container(
        container_uri=container_uri, blob_prefix=blob_prefix,
        blob_suffix=blob_suffix, rsearch=rsearch, limit=limit)
    write_list_to_file(output_file, matched_blobs)
    return matched_blobs

Просмотреть файл

@ -8,33 +8,35 @@
#%% Constants and imports
import os
import glob
import datetime
import glob
import ntpath
import os
import string
from typing import Container, Iterable, List
import unicodedata
#%% General path functions
def recursive_file_list(baseDir, bConvertSlashes=True):
    r"""Enumerates files (not directories) under [baseDir], recursively.

    Args:
        baseDir: str, directory to walk
        bConvertSlashes: bool, whether to convert \ to / in returned paths

    Returns: list of str, one path per file, each prefixed with [baseDir]
    """
    allFiles = []
    for root, _, filenames in os.walk(baseDir):
        for filename in filenames:
            fullPath = os.path.join(root, filename)
            if bConvertSlashes:
                fullPath = fullPath.replace('\\', '/')
            allFiles.append(fullPath)
    return allFiles
def split_path(path, maxdepth=100):
"""
r"""
Splits [path] into all its constituent tokens, e.g.:
c:\blah\boo\goo.txt
@ -53,18 +55,18 @@ def split_path(path, maxdepth=100):
def fileparts(n):
    r"""Splits a file name into directory, base name and extension.

    p, n, e = fileparts(filename)

    fileparts(r'c:\blah\BLAH.jpg') returns ('c:\blah', 'BLAH', '.jpg')

    Note that the '.' lives with the extension, and separators have been
    removed. Uses ntpath, so both '\' and '/' are treated as separators
    regardless of the host OS.
    """
    p = ntpath.dirname(n)
    basename = ntpath.basename(n)
    n, e = ntpath.splitext(basename)
    return p, n, e
if False:
@ -82,13 +84,13 @@ if False:
]
for s in TEST_STRINGS:
p,n,e = fileparts(s)
print('{}:\n[{}],[{}],[{}]\n'.format(s,p,n,e))
p, n, e = fileparts(s)
print('{}:\n[{}],[{}],[{}]\n'.format(s, p, n, e))
def insert_before_extension(filename,s=''):
def insert_before_extension(filename, s=''):
"""
function filename = insert_before_extension(filename,s)
function filename = insert_before_extension(filename, s)
Inserts the string [s] before the extension in [filename], separating with '.'.
@ -102,10 +104,10 @@ def insert_before_extension(filename,s=''):
if len(s) == 0:
s = datetime.datetime.now().strftime('%Y.%m.%d.%H.%M.%S')
p,n,e = fileparts(filename);
p, n, e = fileparts(filename)
fn = n + '.' + s + e
filename = os.path.join(p,fn);
filename = os.path.join(p, fn)
return filename
@ -127,7 +129,7 @@ if False:
for s in TEST_STRINGS:
sOut = insert_before_extension(s)
print('{}: {}'.format(s,sOut))
print('{}: {}'.format(s, sOut))
def top_level_folder(p):
@ -139,15 +141,15 @@ def top_level_folder(p):
if p == '':
return ''
# Path('/blah').parts is ('/','blah')
# Path('/blah').parts is ('/', 'blah')
parts = split_path(p)
if len(parts) == 1:
return parts[0]
drive = os.path.splitdrive(p)[0]
if parts[0] == drive or parts[0] == drive + '/' or parts[0] == drive + '\\' or parts[0] in ['\\','/']:
return os.path.join(parts[0],parts[1])
if parts[0] == drive or parts[0] == drive + '/' or parts[0] == drive + '\\' or parts[0] in ['\\', '/']:
return os.path.join(parts[0], parts[1])
else:
return parts[0]
@ -165,92 +167,75 @@ if False:
#%% Image-related path functions
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.gif', '.png')


def is_image_file(s: str, img_extensions: Container[str] = IMG_EXTENSIONS
                  ) -> bool:
    """Checks a file's extension against a set of image file extensions.

    Args:
        s: str, file name or path
        img_extensions: container of lowercase extensions, each including the
            leading '.'; defaults to IMG_EXTENSIONS

    Returns: bool, whether the lowercased extension of s is in img_extensions
    """
    ext = os.path.splitext(s)[1]
    return ext.lower() in img_extensions
def find_image_strings(strings: Iterable[str]) -> List[str]:
    """Given strings that are potentially image file names, returns the ones
    that actually look like image file names (based on extension).

    Args:
        strings: iterable of str, candidate file names

    Returns: list of str, the candidates accepted by is_image_file(), in
        their original order
    """
    return [s for s in strings if is_image_file(s)]


def find_images(dirName: str, bRecursive=False) -> List[str]:
    """Finds all files in a directory that look like image file names.

    Args:
        dirName: str, directory to search
        bRecursive: bool, whether to also search subdirectories

    Returns: list of str, matching paths as produced by glob, i.e. prefixed
        with dirName exactly as given (absolute only if dirName is absolute)
    """
    # the '*.*' pattern only considers names that contain a '.'
    if bRecursive:
        strings = glob.glob(os.path.join(dirName, '**', '*.*'), recursive=True)
    else:
        strings = glob.glob(os.path.join(dirName, '*.*'))
    return find_image_strings(strings)
#%% Filename-cleaning functions
import unicodedata
import string
VALID_FILENAME_CHARS = f"~-_.() {string.ascii_letters}{string.digits}"
SEPARATOR_CHARS = r":\/"
VALID_PATH_CHARS = VALID_FILENAME_CHARS + SEPARATOR_CHARS
CHAR_LIMIT = 255


def clean_filename(filename: str, whitelist: str = VALID_FILENAME_CHARS,
                   char_limit: int = CHAR_LIMIT) -> str:
    r"""Removes non-ASCII and other invalid filename characters (on any
    reasonable OS) from a filename, then trims to a maximum length.

    Does not allow :\/, use clean_path if you want to preserve those.

    Adapted from
    https://gist.github.com/wassname/1393c4a57cfcbf03641dbc31886123b8

    Args:
        filename: str, name to clean
        whitelist: str, every character that is allowed to remain
        char_limit: int, maximum length of the returned name

    Returns: str, the cleaned name
    """
    # NFKD splits accented characters into base + combining mark, so the
    # ASCII encode step keeps the base letter and drops the mark
    cleaned_filename = (unicodedata.normalize('NFKD', filename)
                        .encode('ASCII', 'ignore').decode())

    # keep only whitelisted chars
    cleaned_filename = ''.join(c for c in cleaned_filename if c in whitelist)
    return cleaned_filename[:char_limit]
def clean_path(pathname: str, whitelist: str = VALID_PATH_CHARS,
               char_limit: int = CHAR_LIMIT) -> str:
    """Removes non-ASCII and other invalid path characters (on any reasonable
    OS) from a path, then trims to a maximum length.

    Args:
        pathname: str, path to clean
        whitelist: str, every character that is allowed to remain; the
            default also allows path separators
        char_limit: int, maximum length of the returned path

    Returns: str, the cleaned path
    """
    return clean_filename(pathname, whitelist=whitelist, char_limit=char_limit)
def flatten_path(pathname: str, separator_chars: str = SEPARATOR_CHARS) -> str:
    """Removes non-ASCII and other invalid path characters (on any reasonable
    OS) from a path, then trims to a maximum length. Replaces all valid
    separators with '~'.

    Args:
        pathname: str, path to flatten
        separator_chars: str, every character to treat as a separator

    Returns: str, the cleaned path with each separator replaced by '~'
    """
    s = clean_path(pathname)
    for c in separator_chars:
        s = s.replace(c, '~')
    return s

Просмотреть файл

@ -5,6 +5,9 @@ Licensed under the MIT License.
This module contains helper functions for dealing with Shared Access Signatures
(SAS) tokens for Azure Blob Storage.
The default Azure Storage SAS URI format is:
https://<account>.blob.core.windows.net/<container>/<blob>?<sas_token>
This module assumes azure-storage-blob version 12.3.
Documentation for Azure Blob Storage:
@ -15,6 +18,7 @@ https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
"""
from datetime import datetime, timedelta
import io
import re
from typing import (
Any, AnyStr, Dict, IO, Iterable, List, Optional, Set, Tuple, Union)
from urllib import parse
@ -32,323 +36,367 @@ from azure.storage.blob import (
from azure.core.exceptions import ResourceNotFoundError
class SasBlob:
"""Convenience methods for managing SAS URIs.
Default Azure Storage SAS URI:
https://<account>.blob.core.windows.net/<container>/<blob>?<sas_token>
def build_azure_storage_uri(
        account: str,
        container: Optional[str] = None,
        blob: Optional[str] = None,
        sas_token: Optional[str] = None,
        account_url_template: str = 'https://{account}.blob.core.windows.net'
        ) -> str:
    """Builds the URI of a storage account, container, or blob.

    Args:
        account: str, name of Azure Storage account
        container: optional str, name of Azure Blob Storage container
        blob: optional str, name of blob
            if blob is given, must also specify container
        sas_token: optional str, Shared Access Signature (SAS)
            does not start with '?'
        account_url_template: str, Python 3 string formatting template,
            contains '{account}' placeholder, defaults to default Azure
            Storage URL format. Set this value if using Azurite Azure Storage
            emulator.

    Returns: str, Azure storage URI

    Raises: AssertionError, if blob is given without container
    """
    uri = account_url_template.format(account=account)
    if container is not None:
        uri = f'{uri}/{container}'
    if blob is not None:
        assert container is not None, 'blob given without a container'
        uri = f'{uri}/{blob}'
    if sas_token is not None:
        uri = f'{uri}?{sas_token}'
    return uri
@staticmethod
def get_account_from_uri(sas_uri: str) -> str:
"""
Assumes that sas_uri points to Azure Blob Storage account hosted at
a default Azure URI. Does not work for locally-emulated Azure Storage
or Azure Storage hosted at custom endpoints.
"""
url_parts = parse.urlsplit(sas_uri)
loc = url_parts.netloc # "<account>.blob.windows.net"
return loc.split('.')[0]
@staticmethod
def get_container_from_uri(sas_uri: str, unquote: bool = True) -> str:
"""Gets the container name from a Azure Blob Storage URI.
def _get_resource_reference(prefix: str) -> str:
return '{}{}'.format(prefix, str(uuid.uuid4()).replace('-', ''))
Assumes that sas_uri points to Azure Blob Storage account hosted at
a default Azure URI. Does not work for locally-emulated Azure Storage
or Azure Storage hosted at custom endpoints.
Args:
sas_uri: str, Azure blob storage URI, may include SAS token
unquote: bool, whether to replace any %xx escapes by their
single-character equivalent, default True
def get_client_from_uri(container_uri: str) -> ContainerClient:
    """Gets a ContainerClient for the given container URI.

    Args:
        container_uri: str, URI to a container, may include SAS token

    Returns: ContainerClient, built by ContainerClient.from_container_url()
    """
    return ContainerClient.from_container_url(container_uri)
def get_account_from_uri(sas_uri: str) -> str:
    """Gets the storage account name from an Azure Blob Storage URI.

    Assumes that sas_uri points to Azure Blob Storage account hosted at
    a default Azure URI. Does not work for locally-emulated Azure Storage
    or Azure Storage hosted at custom endpoints.
    """
    # netloc looks like "<account>.blob.core.windows.net"
    netloc = parse.urlsplit(sas_uri).netloc
    account, _, _ = netloc.partition('.')
    return account
def get_container_from_uri(sas_uri: str, unquote: bool = True) -> str:
    """Gets the container name from a Azure Blob Storage URI.

    Assumes that sas_uri points to Azure Blob Storage account hosted at
    a default Azure URI. Does not work for locally-emulated Azure Storage
    or Azure Storage hosted at custom endpoints.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token
        unquote: bool, whether to replace any %xx escapes by their
            single-character equivalent, default True

    Returns: str, container name

    Raises: ValueError, if sas_uri does not include a container
    """
    url_parts = parse.urlsplit(sas_uri)
    raw_path = url_parts.path.lstrip('/')  # remove leading "/" from path
    # the container is the first path segment
    container = raw_path.split('/')[0]
    if container == '':
        raise ValueError('Given sas_uri does not include a container.')
    if unquote:
        container = parse.unquote(container)
    return container
def get_blob_from_uri(sas_uri: str, unquote: bool = True) -> str:
    """Gets the blob name (its path from the root of the container) from an
    Azure Blob Storage URI.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token
        unquote: bool, whether to replace any %xx escapes by their
            single-character equivalent, default True

    Returns: str, blob name (path to the blob from the root container)

    Raises: ValueError, if sas_uri does not include a blob name
    """
    # Get the entire path with all slashes after the container
    url_parts = parse.urlsplit(sas_uri)
    raw_path = url_parts.path.lstrip('/')  # remove leading "/" from path
    parts = raw_path.split('/', maxsplit=1)
    if len(parts) < 2 or parts[1] == '':
        raise ValueError('Given sas_uri does not include a blob name')

    blob = parts[1]  # first item is the container name
    if unquote:
        blob = parse.unquote(blob)
    return blob
def get_sas_token_from_uri(sas_uri: str) -> Optional[str]:
    """Get the query part of the SAS URI that contains permissions, access
    times and signature.

    Args:
        sas_uri: str, Azure blob storage URI, may include SAS token

    Returns: Query part of the SAS URI (without the leading '?'), or None if
        the URI has no query part.
    """
    url_parts = parse.urlsplit(sas_uri)
    sas_token = url_parts.query or None  # None if query is empty string
    return sas_token
def get_resource_type_from_uri(sas_uri: str) -> Optional[str]:
    """Get the resource type pointed to by this SAS token.

    Args:
        sas_uri: str, Azure blob storage URI with SAS token

    Returns: A string (either 'blob' or 'container') or None.
    """
    url_parts = parse.urlsplit(sas_uri)
    data = parse.parse_qs(url_parts.query)
    # 'sr' is the query parameter this function reads the resource type from
    if 'sr' in data:
        types = data['sr']
        if 'b' in types:
            return 'blob'
        elif 'c' in types:
            return 'container'
    return None
def get_permissions_from_uri(sas_uri: str) -> Set[str]:
    """Get the permissions given by this SAS token.

    Args:
        sas_uri: str, Azure blob storage URI with SAS token

    Returns: A set containing some of 'read', 'write', 'delete' and 'list'.
        Empty set returned if no permission specified in sas_uri.
    """
    url_parts = parse.urlsplit(sas_uri)
    data = parse.parse_qs(url_parts.query)
    permissions_set = set()
    # 'sp' holds one letter per granted permission; only its first value is
    # inspected
    if 'sp' in data:
        permissions = data['sp'][0]
        if 'r' in permissions:
            permissions_set.add('read')
        if 'w' in permissions:
            permissions_set.add('write')
        if 'd' in permissions:
            permissions_set.add('delete')
        if 'l' in permissions:
            permissions_set.add('list')
    return permissions_set
def get_all_query_parts(sas_uri: str) -> Dict[str, Any]:
    """Gets the SAS token parameters.

    Returns: dict mapping each query parameter name to the list of values
        given for it in sas_uri
    """
    query = parse.urlsplit(sas_uri).query
    return parse.parse_qs(query)
def check_blob_existence(sas_uri: str,
                         blob_name: Optional[str] = None) -> bool:
    """Checks whether a given URI points to an actual blob.

    Args:
        sas_uri: str, URI to a container or a blob
            if blob_name is given, sas_uri is treated as a container URI
            otherwise, sas_uri is treated as a blob URI
        blob_name: optional str, name of blob
            must be given if sas_uri is a URI to a container

    Returns: bool, whether the sas_uri given points to an existing blob
    """
    if blob_name is None:
        blob_uri = sas_uri
    else:
        blob_uri = build_blob_uri(container_uri=sas_uri, blob_name=blob_name)

    # until Azure implements a proper BlobClient.exists() method, we can
    # only use try/except to determine blob existence
    # see: https://github.com/Azure/azure-sdk-for-python/issues/9507
    with BlobClient.from_blob_url(blob_uri) as blob_client:
        try:
            blob_client.get_blob_properties()
        except ResourceNotFoundError:
            return False
        else:
            return True
def list_blobs_in_container(
        container_uri: str,
        limit: Optional[int] = None,
        blob_prefix: Optional[str] = None,
        blob_suffix: Optional[Union[str, Tuple[str]]] = None,
        rsearch: Optional[str] = None
        ) -> List[str]:
    """Get a sorted list of blob names in this container.

    Args:
        container_uri: str, URI to a container, may include SAS token
        limit: int, maximum # of blob names to list
            if None, then returns all blob names
        blob_prefix: optional str, returned results will only contain blob
            names that start with this prefix
        blob_suffix: optional str or tuple of str, returned results will only
            contain blob names with this/these suffix(es). The blob names will
            be lowercased first before comparing with the suffix(es).
        rsearch: optional str, returned results will only contain blob names
            that match this Python regex pattern at any point in the blob name.
            Use '^' character to only match from the beginning of the blob name.

    Returns:
        sorted list of blob names, of length limit or shorter.

    Raises: ValueError, if the URI carries a SAS token that is not for a
        container, or if blob_prefix / blob_suffix has the wrong type
    """
    print('listing blobs...')
    if (get_sas_token_from_uri(container_uri) is not None
            and get_resource_type_from_uri(container_uri) != 'container'):
        raise ValueError('The SAS token provided is not for a container.')

    if blob_prefix is not None and not isinstance(blob_prefix, str):
        raise ValueError('blob_prefix must be a str.')

    if (blob_suffix is not None
            and not isinstance(blob_suffix, str)
            and not isinstance(blob_suffix, tuple)):
        raise ValueError('blob_suffix must be a str or a tuple of strings')

    # compile once rather than resolving the pattern for every blob
    pattern = re.compile(rsearch) if rsearch is not None else None

    list_blobs = []
    i = 0  # number of blobs enumerated, matching or not
    with get_client_from_uri(container_uri) as container_client:
        generator = container_client.list_blobs(
            name_starts_with=blob_prefix)

        # A single loop serves both the filtered and the unfiltered case so
        # that limit is honoured even when no suffix or regex is given.
        # A non-positive limit yields an empty list.
        if limit is None or limit > 0:
            for blob in tqdm(generator):
                i += 1
                suffix_ok = (blob_suffix is None
                             or blob.name.lower().endswith(blob_suffix))
                regex_ok = (pattern is None
                            or pattern.search(blob.name) is not None)
                if suffix_ok and regex_ok:
                    list_blobs.append(blob.name)
                    if limit is not None and len(list_blobs) >= limit:
                        break

    print(f'Enumerated {len(list_blobs)} matching blobs out of {i} total')
    return sorted(list_blobs)  # sort for determinism
def generate_writable_container_sas(account_name: str,
                                    account_key: str,
                                    container_name: str,
                                    access_duration_hrs: float,
                                    account_url: Optional[str] = None
                                    ) -> str:
    """Creates a container and returns a SAS URI with read/write/list
    permissions.

    Args:
        account_name: str, name of blob storage account
        account_key: str, account SAS token or account shared access key
        container_name: str, name of container to create, must not match an
            existing container in the given storage account
        access_duration_hrs: float, number of hours from now until the
            generated SAS token expires
        account_url: str, optional, defaults to default Azure Storage URL

    Returns: str, URL to newly created container

    Raises: azure.core.exceptions.ResourceExistsError, if container already
        exists

    NOTE: This method currently fails on non-default Azure Storage URLs. The
    initializer for ContainerClient() assumes the default Azure Storage URL
    format, which is a bug that has been reported here:
        https://github.com/Azure/azure-sdk-for-python/issues/12568
    """
    if account_url is None:
        account_url = build_azure_storage_uri(account=account_name)
    container_client = ContainerClient(account_url=account_url,
                                       container_name=container_name,
                                       credential=account_key)
    container_client.create_container()

    permissions = ContainerSasPermissions(read=True, write=True, list=True)
    # expiry is computed from the current UTC time
    container_sas_token = generate_container_sas(
        account_name=account_name,
        container_name=container_name,
        account_key=account_key,
        permission=permissions,
        expiry=datetime.utcnow() + timedelta(hours=access_duration_hrs))

    return f'{account_url}/{container_name}?{container_sas_token}'
Returns: str, URI to blob
"""
blob_url = SasBlob.generate_blob_sas_uri(container_sas_uri, blob_name)
upload_blob_to_url(blob_url, data=data)
return blob_url
@staticmethod
def get_blob_to_stream(sas_uri: str) -> Tuple[io.BytesIO, BlobProperties]:
"""Downloads a blob to an IO stream.
def upload_blob(container_uri: str, blob_name: str,
data: Union[Iterable[AnyStr], IO[AnyStr]]) -> str:
"""Creates a new blob of the given name from an IO stream.
Args:
sas_uri: str, URI to a blob
Args:
container_uri: str, URI to a container, may include SAS token
blob_name: str, name of blob to upload
data: str, bytes, or IO stream
if str, assumes utf-8 encoding
Returns:
output_stream: io.BytesIO, remember to close it when finished using
blob_properties: BlobProperties
Returns: str, URI to blob
"""
blob_url = build_blob_uri(container_uri, blob_name)
upload_blob_to_url(blob_url, data=data)
return blob_url
Raises: azure.core.exceptions.ResourceNotFoundError, if sas_uri points
to a non-existent blob
NOTE: the returned BlobProperties object may have incorrect values for
the blob name and container name. This is a bug which has been reported
here: https://github.com/Azure/azure-sdk-for-python/issues/12563
"""
with BlobClient.from_blob_url(sas_uri) as blob_client:
output_stream = io.BytesIO()
blob_client.download_blob().readinto(output_stream)
output_stream.seek(0)
blob_properties = blob_client.get_blob_properties()
return output_stream, blob_properties
def download_blob_to_stream(sas_uri: str) -> Tuple[io.BytesIO, BlobProperties]:
"""Downloads a blob to an IO stream.
@staticmethod
def generate_blob_sas_uri(container_sas_uri: str, blob_name: str) -> str:
"""
Args:
container_sas_uri: str, URI to blob storage container
<account_url>/<container_name>?<sas_token>
blob_name: str, name of blob
Args:
sas_uri: str, URI to a blob
Returns: str, blob URI
<account_url>/<container_name>/<blob_name>?<sas_token>
"""
account_container = container_sas_uri.split('?', maxsplit=1)[0]
account_url, container_name = account_container.rsplit('/', maxsplit=1)
sas_token = SasBlob.get_sas_key_from_uri(container_sas_uri)
blob_uri = f'{account_url}/{container_name}/{blob_name}'
if sas_token is not None:
blob_uri += f'?{sas_token}'
return blob_uri
Returns:
output_stream: io.BytesIO, remember to close it when finished using
blob_properties: BlobProperties
Raises: azure.core.exceptions.ResourceNotFoundError, if sas_uri points
to a non-existent blob
NOTE: the returned BlobProperties object may have incorrect values for
the blob name and container name. This is a bug which has been reported
here: https://github.com/Azure/azure-sdk-for-python/issues/12563
"""
with BlobClient.from_blob_url(sas_uri) as blob_client:
output_stream = io.BytesIO()
blob_client.download_blob().readinto(output_stream)
output_stream.seek(0)
blob_properties = blob_client.get_blob_properties()
return output_stream, blob_properties
def build_blob_uri(container_uri: str, blob_name: str) -> str:
    """Builds the URI of a blob from its container URI and the blob name.

    Any SAS token present on the container URI is carried over to the
    returned blob URI.

    Args:
        container_uri: str, URI to blob storage container
            <account_url>/<container_name>?<sas_token>
            the SAS token is optional, and a trailing '/' after the
            container name is ignored
        blob_name: str, name of blob

    Returns: str, blob URI
        <account_url>/<container_name>/<blob_name>?<sas_token>

    Raises: ValueError, if container_uri (excluding the SAS token) contains
        no '/' separating the account URL from the container name
    """
    account_container = container_uri.split('?', maxsplit=1)[0]
    # A trailing '/' would otherwise produce an empty container name and leave
    # a '//' in front of the blob name in the resulting URI.
    account_container = account_container.rstrip('/')
    account_url, container_name = account_container.rsplit('/', maxsplit=1)
    sas_token = get_sas_token_from_uri(container_uri)
    blob_uri = f'{account_url}/{container_name}/{blob_name}'
    if sas_token is not None:
        blob_uri += f'?{sas_token}'
    return blob_uri

Просмотреть файл

@ -23,18 +23,32 @@ changed by the parameters --blobHost 1.2.3.4 --blobPort 5678.
4) In a separate terminal, activate a virtual environment with the Azure Storage
Python SDK v12, then run this unit test:
# run all tests, -v for verbose output
python sas_blob_utils_test.py -v
# run a specific test
python -m unittest -v sas_blob_utils_test.Tests.test_list_blobs_in_container
Azurite by default supports the following storage account:
- Account name: devstoreaccount1
- Account key: Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== # pylint: disable=line-too-long
"""
import unittest
from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
from azure.storage.blob import BlobClient, ContainerClient
from sas_blob_utils import BlobClient, ContainerClient, SasBlob
from sas_blob_utils import (
build_blob_uri,
check_blob_existence,
download_blob_to_stream,
generate_writable_container_sas,
get_account_from_uri,
get_blob_from_uri,
get_container_from_uri,
get_sas_token_from_uri,
list_blobs_in_container,
upload_blob)
PUBLIC_CONTAINER_URI = 'https://lilablobssc.blob.core.windows.net/nacti-unzipped' # pylint: disable=line-too-long
@ -58,7 +72,7 @@ PRIVATE_BLOB_NAME = 'successdir/successblob'
PRIVATE_BLOB_URI = f'{PRIVATE_CONTAINER_URI}/{PRIVATE_BLOB_NAME}'
class TestSasBlobUtils(unittest.TestCase):
class Tests(unittest.TestCase):
"""Tests for sas_blob_utils.py"""
needs_cleanup = False
@ -78,7 +92,7 @@ class TestSasBlobUtils(unittest.TestCase):
# except ResourceNotFoundError:
# pass
# if SasBlob.check_blob_existence(PRIVATE_BLOB_URI):
# if check_blob_existence(PRIVATE_BLOB_URI):
# with BlobClient.from_blob_url(
# PRIVATE_BLOB_URI,
# credential=PRIVATE_ACCOUNT_KEY) as bc:
@ -86,47 +100,43 @@ class TestSasBlobUtils(unittest.TestCase):
self.needs_cleanup = False
def test_get_account_from_uri(self):
self.assertEqual(
SasBlob.get_account_from_uri(PUBLIC_BLOB_URI),
'lilablobssc')
self.assertEqual(get_account_from_uri(PUBLIC_BLOB_URI), 'lilablobssc')
def test_get_container_from_uri(self):
self.assertEqual(
SasBlob.get_container_from_uri(PUBLIC_BLOB_URI),
get_container_from_uri(PUBLIC_BLOB_URI),
'nacti-unzipped')
def test_get_blob_from_uri(self):
self.assertEqual(
SasBlob.get_blob_from_uri(PUBLIC_BLOB_URI),
PUBLIC_BLOB_NAME)
self.assertEqual(get_blob_from_uri(PUBLIC_BLOB_URI), PUBLIC_BLOB_NAME)
with self.assertRaises(ValueError):
SasBlob.get_blob_from_uri(PUBLIC_CONTAINER_URI)
get_blob_from_uri(PUBLIC_CONTAINER_URI)
def test_get_sas_key_from_uri(self):
self.assertIsNone(SasBlob.get_sas_key_from_uri(PUBLIC_CONTAINER_URI))
def test_get_sas_token_from_uri(self):
self.assertIsNone(get_sas_token_from_uri(PUBLIC_CONTAINER_URI))
self.assertEqual(
SasBlob.get_sas_key_from_uri(PUBLIC_CONTAINER_URI_SAS),
get_sas_token_from_uri(PUBLIC_CONTAINER_URI_SAS),
PUBLIC_CONTAINER_SAS)
def test_check_blob_existence(self):
print('PUBLIC_BLOB_URI')
self.assertTrue(SasBlob.check_blob_existence(PUBLIC_BLOB_URI))
self.assertTrue(check_blob_existence(PUBLIC_BLOB_URI))
print('PUBLIC_CONTAINER_URI + PUBLIC_BLOB_NAME')
self.assertTrue(SasBlob.check_blob_existence(
self.assertTrue(check_blob_existence(
PUBLIC_CONTAINER_URI, blob_name=PUBLIC_BLOB_NAME))
print('PUBLIC_CONTAINER_URI')
with self.assertRaises(IndexError):
SasBlob.check_blob_existence(PUBLIC_CONTAINER_URI)
check_blob_existence(PUBLIC_CONTAINER_URI)
print('PUBLIC_INVALID_BLOB_URI')
self.assertFalse(SasBlob.check_blob_existence(PUBLIC_INVALID_BLOB_URI))
self.assertFalse(check_blob_existence(PUBLIC_INVALID_BLOB_URI))
print('PRIVATE_BLOB_URI')
with self.assertRaises(HttpResponseError):
SasBlob.check_blob_existence(PRIVATE_BLOB_URI)
check_blob_existence(PRIVATE_BLOB_URI)
def test_list_blobs_in_container(self):
blobs_list = SasBlob.list_blobs_in_container(
blobs_list = list_blobs_in_container(
PUBLIC_ZIPPED_CONTAINER_URI, limit=100)
expected = sorted([
'wcs_20200403_bboxes.json.zip', 'wcs_camera_traps.json.zip',
@ -137,12 +147,19 @@ class TestSasBlobUtils(unittest.TestCase):
'wcs_splits.json'])
self.assertEqual(blobs_list, expected)
blobs_list = list_blobs_in_container(
PUBLIC_ZIPPED_CONTAINER_URI, rsearch=r'_\d[0-3]\.zip')
expected = sorted([
'wcs_camera_traps_00.zip', 'wcs_camera_traps_01.zip',
'wcs_camera_traps_02.zip', 'wcs_camera_traps_03.zip'])
self.assertEqual(blobs_list, expected)
def test_generate_writable_container_sas(self):
# until the private emulated account is able to work, skip this test
self.skipTest('skipping private account tests for now')
self.needs_cleanup = True
new_sas_uri = SasBlob.generate_writable_container_sas(
new_sas_uri = generate_writable_container_sas(
account_name=PRIVATE_ACCOUNT_NAME,
account_key=PRIVATE_ACCOUNT_KEY,
container_name=PRIVATE_CONTAINER_NAME,
@ -150,7 +167,7 @@ class TestSasBlobUtils(unittest.TestCase):
account_url=PRIVATE_ACCOUNT_URI)
self.assertTrue(isinstance(new_sas_uri, str))
self.assertNotEqual(new_sas_uri, '')
self.assertEqual(len(SasBlob.list_blobs_in_container(new_sas_uri)), 0)
self.assertEqual(len(list_blobs_in_container(new_sas_uri)), 0)
def test_upload_blob(self):
self.needs_cleanup = True
@ -158,16 +175,16 @@ class TestSasBlobUtils(unittest.TestCase):
# ResourceNotFoundError('The specified resource does not exist.')
print('PUBLIC_CONTAINER_URI')
with self.assertRaises(ResourceNotFoundError):
SasBlob.upload_blob(PUBLIC_CONTAINER_URI,
blob_name='failblob', data='fail')
upload_blob(PUBLIC_CONTAINER_URI,
blob_name='failblob', data='fail')
# uploading to a public container with a read-only SAS token yields
# HttpResponseError('This request is not authorized to perform this '
# 'operation using this permission.')
print('PUBLIC_CONTAINER_URI_SAS')
with self.assertRaises(HttpResponseError):
SasBlob.upload_blob(PUBLIC_CONTAINER_URI_SAS,
blob_name='failblob', data='fail')
upload_blob(PUBLIC_CONTAINER_URI_SAS,
blob_name='failblob', data='fail')
# uploading to a private container without a SAS token yields
# HttpResponseError('Server failed to authenticate the request. Make '
@ -175,23 +192,23 @@ class TestSasBlobUtils(unittest.TestCase):
# 'formed correctly including the signature.')
print('PRIVATE_CONTAINER_URI')
with self.assertRaises(HttpResponseError):
SasBlob.upload_blob(PRIVATE_CONTAINER_URI,
blob_name=PRIVATE_BLOB_NAME, data='success')
upload_blob(PRIVATE_CONTAINER_URI,
blob_name=PRIVATE_BLOB_NAME, data='success')
# until the private emulated account is able to work, skip this test
# private_container_uri_sas = SasBlob.generate_writable_container_sas(
# private_container_uri_sas = generate_writable_container_sas(
# account_name=PRIVATE_ACCOUNT_NAME,
# account_key=PRIVATE_ACCOUNT_KEY,
# container_name=PRIVATE_CONTAINER_NAME,
# access_duration_hrs=1,
# account_url=PRIVATE_ACCOUNT_URI)
# blob_url = SasBlob.upload_blob(
# blob_url = upload_blob(
# private_container_uri_sas,
# blob_name=PRIVATE_BLOB_NAME, data='success')
# self.assertEqual(blob_url, PRIVATE_BLOB_URI)
def test_get_blob_to_stream(self):
output, props = SasBlob.get_blob_to_stream(PUBLIC_BLOB_URI)
def test_download_blob_to_stream(self):
output, props = download_blob_to_stream(PUBLIC_BLOB_URI)
x = output.read()
self.assertEqual(len(x), 376645)
output.close()
@ -206,14 +223,14 @@ class TestSasBlobUtils(unittest.TestCase):
for k, v in expected_properties.items():
self.assertEqual(props[k], v)
def test_generate_blob_sas_uri(self):
generated = SasBlob.generate_blob_sas_uri(
container_sas_uri=PUBLIC_CONTAINER_URI,
def test_build_blob_uri(self):
generated = build_blob_uri(
container_uri=PUBLIC_CONTAINER_URI,
blob_name=PUBLIC_BLOB_NAME)
self.assertEqual(generated, PUBLIC_BLOB_URI)
generated = SasBlob.generate_blob_sas_uri(
container_sas_uri=PUBLIC_CONTAINER_URI_SAS,
generated = build_blob_uri(
container_uri=PUBLIC_CONTAINER_URI_SAS,
blob_name=PUBLIC_BLOB_NAME)
self.assertEqual(generated, PUBLIC_BLOB_URI_SAS)