toasty/pipeline.py: make the local backend writeable, and add list_items()

The new ABC method will be usefil for the final stage of the pipeline.
This commit is contained in:
Peter Williams 2020-03-18 00:13:23 -04:00
Родитель 69207a1932
Коммит b9e7ed287d
2 изменённых файлов: 56 добавлений и 8 удалений

Просмотреть файл

@ -327,14 +327,14 @@ def _pipeline_add_io_args(parser):
parser.add_argument(
'--local',
metavar = 'PATH',
help = 'Use the local testing I/O backend'
help = 'Use the local-disk I/O backend'
)
def _pipeline_io_from_settings(settings):
from .pipeline import AzureBlobPipelineIo, LocalTestPipelineIo
from .pipeline import AzureBlobPipelineIo, LocalPipelineIo
if settings.local:
return LocalTestPipelineIo(settings.local)
return LocalPipelineIo(settings.local)
if settings.azure_conn_env:
conn_str = os.environ.get(settings.azure_conn_env)

Просмотреть файл

@ -17,7 +17,7 @@ BitmapInputImage
CandidateInput
ImageSource
InputImage
LocalTestPipelineIo
LocalPipelineIo
PipelineIo
'''.split()
@ -127,6 +127,23 @@ class PipelineIo(ABC):
"""
pass
@abstractmethod
def list_items(self, *path):
"""List the items contained in the folder at the specified path.
Parameters
----------
*path : strings
The path to the item, intepreted as components in a folder hierarchy.
Returns
-------
An iterable of ``(stem, is_folder)``, where *stem* is the "basename" of an
item contained within the specified folder and *is_folder* is a boolean
indicating whether this item appears to be a folder itself.
"""
pass
class AzureBlobPipelineIo(PipelineIo):
"""I/O for pipeline processing that uses Microsoft Azure Blob Storage.
@ -181,9 +198,29 @@ class AzureBlobPipelineIo(PipelineIo):
source,
)
def list_items(self, *path):
from azure.storage.blob.models import BlobPrefix
prefix = self._make_blob_name(path) + '/'
class LocalTestPipelineIo(PipelineIo):
"""I/O for pipeline processing that is entirely local and read-only.
for item in self._svc.list_blobs(
self._container_name,
prefix = prefix,
delimiter = '/'
):
assert item.name.startswith(prefix)
stem = item.name[len(prefix):]
is_folder = isinstance(item, BlobPrefix)
if is_folder:
# Returned names end with a '/' too
assert stem[-1] == '/'
stem = stem[:-1]
yield stem, is_folder
class LocalPipelineIo(PipelineIo):
"""I/O for pipeline processing using the local disk.
Parameters
----------
@ -204,8 +241,19 @@ class LocalTestPipelineIo(PipelineIo):
shutil.copyfileobj(f, dest)
def put_item(self, *path, source=None):
print('*** would write data to %s' % self._make_item_name(path))
source.close()
fpath = self._make_item_name(path)
cdir = os.path.split(fpath)[0]
os.makedirs(cdir, exist_ok=True)
with open(fpath, 'wb') as f:
shutil.copyfileobj(source, f)
def list_items(self, *path):
dpath = self._make_item_name(path)
for stem in os.listdir(dpath):
yield stem, os.path.isdir(os.path.join(dpath, stem))
# The `ImageSource` ABC and implementations