Added scripts for enumerating folders in a blob container a specified depth, enumerating blobs in parallel in a large blob container, and removing empty folder in a local file system

This commit is contained in:
Dan Morris 2020-10-29 12:55:09 -07:00
Родитель 88b75a42ef
Коммит ecaa5ef90c
3 изменённых файлов: 454 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,143 @@
#
# enumerate_folders_at_depth.py
#
# List folders in a blob container at a particular depth (not including
# folders at shallower depths.
#
# Typically used this to prepare a prefix list for parallel_enumerate_blobs.py.
#
#%% Constants and imports
import sys
import datetime
import argparse
from azure.storage.blob import BlobServiceClient
# Assumes that the parent folder of the ai4eutils repo is on the PYTHONPATH
#
# import sys; sys.path.append('/home/dmorris/git/ai4eutils')
# export PYTHONPATH="$PYTHONPATH:/home/dmorris/git/ai4eutils"
from ai4e_azure_utils import walk_container
from ai4e_azure_utils import sas_blob_utils
account_name = ''
container_name = ''
output_file = ''
ro_sas_token = ''
depth = 3
#%% Main function
def enumerate_folders():
#%% Derived constants
storage_account_url_blob = 'https://' + account_name + '.blob.core.windows.net'
#%% Create client handle
blob_service_client = BlobServiceClient(account_url=storage_account_url_blob,
credential=ro_sas_token)
container_client = blob_service_client.get_container_client(container_name)
#%% Enumerate row-level folders
start = datetime.datetime.now()
#
# Uses ContainerClient.walk_blobs()
#
folders, _ = walk_container(
container_client, max_depth=depth, store_blobs=False)
end = datetime.datetime.now()
elapsed = end - start
folders = [s for s in folders if s.count('/') == (depth-1)]
print("Enumerated {} folders in {}s".format(len(folders),str(elapsed.seconds)))
for s in folders:
print(s)
#%% Write results to file
folders_with_newlines = [s + '\n' for s in folders]
with open(output_file,'w') as f:
f.writelines(folders_with_newlines)
#%% Interactive driver
if False:
#%%
enumerate_folders()
#%% Command-line driver
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(
description='List folders in a blob container at a particular depth (not including folders at shallower depths.')
parser.add_argument(
'--sas_url',
help='Container-level SAS URL (exclusive with account_name, container_name, ro_sas_token)')
parser.add_argument(
'--account_name',
help='Storage account name')
parser.add_argument(
'--container_name',
help='Blob container name')
parser.add_argument(
'--ro_sas_token',
help='Read-only SAS token for the container, with or without a leading ?')
parser.add_argument(
'depth',
type=int,
help='Recursion depth, must be >= 1. A depth value of 1 enumerates root-level folders.')
parser.add_argument(
'output_file',
help='Output file')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
# [URL] and [account,container,token] are mutually exclusive
if args.sas_url is not None:
assert args.account_name is None and args.container_name is None and args.ro_sas_token is None
account_name = sas_blob_utils.get_account_from_uri(args.sas_url)
container_name = sas_blob_utils.get_container_from_uri(args.sas_url)
ro_sas_token = sas_blob_utils.get_sas_token_from_uri(args.sas_url)
assert not ro_sas_token.startswith('?')
ro_sas_token = '?' + ro_sas_token
else:
assert args.account_name is not None and args.container_name is not None and args.ro_sas_token is not None
account_name = args.account_name
container_name = args.container_name
ro_sas_token = args.ro_sas_token
if not ro_sas_token.startswith('?'):
ro_sas_token = '?' + ro_sas_token
depth = args.depth
assert depth > 0, 'Depth must be >= 1'
output_file = args.output_file
enumerate_folders()

249
parallel_enumerate_blobs.py Normal file
Просмотреть файл

@ -0,0 +1,249 @@
#
# parallel_enumerate_blobs.py
#
# Read a list of prefixes from a text file, then enumerates a blob
# container, parallelizing across those prefixes (on thread/process per prefix).
#
# Creates one output file per prefix, which we typically just cat together
# after the fact.
#
# In practice, the prefix list is generated using enumerate_folders_at_depth.py,
# but it's just a flat list, so you can generate it however like.
#
# Uses one thread/process per prefix.
#
#%% Constants and imports
import os
import sys
import time
import argparse
import multiprocessing
from azure.storage.blob import BlobServiceClient
# Assumes that the parent folder of the ai4eutils repo is on the PYTHONPATH
#
# import sys; sys.path.append('/home/dmorris/git/ai4eutils')
# export PYTHONPATH="$PYTHONPATH:/home/dmorris/git/ai4eutils"
import path_utils
from ai4e_azure_utils import sas_blob_utils
n_blobs_per_page = 5000
n_print = 10000
# Toggles between threads (True) and processes (False)
use_threads = False
verbose = False
# This is a bit of a hack, but it has a *massive* impact on performance and on
# minimizing storage-account-level throttling. So... don't set this to zero.
sleep_time_per_page = 0.001
# Limit the number of files to enumerate per thread; used only for debugging
debug_max_files = -1
#%% Read prefix list
def read_prefix_list(prefix_list_file):
with open(prefix_list_file,'r') as f:
prefixes = f.readlines()
prefixes = [s.strip() for s in prefixes]
print('Read {} prefixes from {}'.format(len(prefixes),
prefix_list_file))
return prefixes
#%% Multiprocessing init
def pinit(c):
global cnt
cnt = c
class Counter(object):
def __init__(self, total):
# 'i' means integer
self.val = multiprocessing.Value('i', 0)
self.total = multiprocessing.Value('i', total)
self.last_print = multiprocessing.Value('i', 0)
def increment(self, n=1):
b_print = False
with self.val.get_lock():
self.val.value += n
if ((self.val.value - self.last_print.value) >= n_print):
self.last_print.value = self.val.value
b_print = True
if b_print:
total_string = ''
if self.total.value > 0:
total_string = ' of {}'.format(self.total.value)
print('{}: iteration {}{}'.format(time.strftime("%Y-%m-%d %H:%M:%S"),
self.val.value,total_string),flush=True)
@property
def value(self):
return self.val.value
def last_print_value(self):
return self.last_print.value
pinit(Counter(-1))
#%% Enumeration function
def enumerate_prefix(prefix,sas_url,output_folder):
account_name = sas_blob_utils.get_account_from_uri(sas_url)
container_name = sas_blob_utils.get_container_from_uri(sas_url)
ro_sas_token = sas_blob_utils.get_sas_token_from_uri(sas_url)
assert not ro_sas_token.startswith('?')
ro_sas_token = '?' + ro_sas_token
storage_account_url_blob = 'https://' + account_name + '.blob.core.windows.net'
# prefix = prefixes[0]; print(prefix)
print('Starting enumeration for prefix {}'.format(prefix))
# Open the output file
fn = path_utils.clean_filename(prefix)
output_file = os.path.join(output_folder,fn)
# Create the container
blob_service_client = BlobServiceClient(
account_url=storage_account_url_blob,
credential=ro_sas_token)
container_client = blob_service_client.get_container_client(container_name)
# Enumerate
with open(output_file,'w') as output_f:
continuation_token = ''
hit_debug_limit = False
i_blob = 0
while (continuation_token is not None) and (not hit_debug_limit):
blobs_iter = container_client.list_blobs(
name_starts_with=prefix,
results_per_page=n_blobs_per_page).by_page(
continuation_token=continuation_token)
blobs = next(blobs_iter)
n_blobs_this_page = 0
for blob in blobs:
i_blob += 1
n_blobs_this_page += 1
if (debug_max_files > 0) and (i_blob > debug_max_files):
print('Hit debug path limit for prefix {}'.format(prefix))
i_blob -= 1
hit_debug_limit = True
break
else:
output_f.write(blob.name + '\n')
# print('Enumerated {} blobs'.format(n_blobs_this_page))
cnt.increment(n=n_blobs_this_page)
continuation_token = blobs_iter.continuation_token
if sleep_time_per_page > 0:
time.sleep(sleep_time_per_page)
# ...while we're enumerating
# ...with open(output_file)
print('Finished enumerating {} blobs for prefix {}'.format(
i_blob,prefix))
#%% Thread-based implementation
from threading import Thread
def enumerate_blobs_threads(prefixes,sas_url,output_folder):
all_threads = []
for s in prefixes:
# print('Starting thread for prefix {}'.format(s))
t = Thread(name=s,target=enumerate_prefix,args=(s,sas_url,output_folder,))
t.daemon = False
t.start()
all_threads.append(t)
for t in all_threads:
t.join()
# print('Thread {} finished'.format(t.name))
#%% Process-based implementation
from multiprocessing import Process
def enumerate_blobs_processes(prefixes,sas_url,output_folder):
all_processes = []
for s in prefixes:
# print('Starting process for prefix {}'.format(s))
p = Process(name=s,target=enumerate_prefix,args=(s,sas_url,output_folder,))
p.daemon = False
p.start()
all_processes.append(p)
for p in all_processes:
p.join()
# print('Process {} finished'.format(p.name))
#%% Main function
def enumerate_blobs(prefix_list_file,sas_url,output_folder):
assert(os.path.isfile(prefix_list_file))
os.makedirs(output_folder,exist_ok=True)
pinit(Counter(-1))
prefixes = read_prefix_list(prefix_list_file)
if use_threads:
enumerate_blobs_threads(prefixes,sas_url,output_folder)
else:
enumerate_blobs_processes(prefixes,sas_url,output_folder)
#%% Driver
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(
description='Enumerate blobs in a container, using one thread/process per prefix from a specified list of prefixes.')
parser.add_argument(
'prefix_list_file',
help='Text file containing one prefix per line')
parser.add_argument(
'sas_url',
help='Read-/list-capable, container-level SAS URL to the target container')
parser.add_argument(
'output_folder',
help='Output folder; one flat file per prefix will be written to this folder')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
enumerate_blobs(args.prefix_list_file,args.sas_url,args.output_folder)

62
remove_empty_folders.py Normal file
Просмотреть файл

@ -0,0 +1,62 @@
#
# Recursively remove empty folders from a folder.
#
# Usage:
#
# remove_empty_folders /top/level/folder
#
#%% Imports
import os
import sys
#%% Functions
def remove_empty_folders(path, removeRoot=True):
try:
# https://www.jacobtomlinson.co.uk/posts/2014/python-script-recursively-remove-empty-folders/directories/
if not os.path.isdir(path):
return
# Remove empty subfolders
files = os.listdir(path)
if len(files):
for f in files:
fullpath = os.path.join(path, f)
if os.path.isdir(fullpath):
remove_empty_folders(fullpath)
# List files again; we may have removed subfolders
files = os.listdir(path)
# If the folder is still empty, delete it
if len(files) == 0 and removeRoot:
print('Removing empty folder: {}'.format(path))
try:
os.rmdir(path)
except:
print('Error removing {}'.format(path))
except:
print('Error processing {}'.format(path))
#%% Command-line driver
if __name__ == '__main__' and '__file__' in globals():
if len(sys.argv) < 2:
print('No base dir specified')
sys.exit()
base_dir = sys.argv[1]
if not os.path.isdir(base_dir):
print('{} is not a directory'.format(base_dir))
sys.exit()
remove_empty_folders(base_dir)