From 8b4caa28ad145b85de3f5aa62c8f640fff11980a Mon Sep 17 00:00:00 2001 From: Peter Williams Date: Tue, 17 Nov 2020 16:51:19 -0500 Subject: [PATCH] pipeline: start working on polished CLI approach It turns out that the Azure Storage Python API has evolved significantly. Annoying. --- README.md | 2 + docs/api/toasty.pipeline.PipelineIo.rst | 4 ++ toasty/pipeline/__init__.py | 79 +++++++++++++++++++++++-- toasty/pipeline/azure_io.py | 58 ++++++++++++------ toasty/pipeline/cli.py | 60 +++++++++++++++++-- toasty/pipeline/local_io.py | 10 ++++ 6 files changed, 186 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 514121a..d84ab7c 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ and [PyPI](https://pypi.org/project/toasty/#history). [toasty] is a Python package so, yes, Python is required. - [astropy] if using FITS files or WCS coordinates +- [azure-storage-blob] >= 12.0 if using the Azure storage backend for pipeline processing - [cython] - [filelock] - [healpy] if using [HEALPix] maps @@ -80,6 +81,7 @@ and [PyPI](https://pypi.org/project/toasty/#history). - [wwt_data_formats] [astropy]: https://www.astropy.org/ +[azure-storage-blob]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob [cython]: https://cython.org/ [filelock]: https://github.com/benediktschmitt/py-filelock [healpy]: https://healpy.readthedocs.io/ diff --git a/docs/api/toasty.pipeline.PipelineIo.rst b/docs/api/toasty.pipeline.PipelineIo.rst index e4beec6..cccda8f 100644 --- a/docs/api/toasty.pipeline.PipelineIo.rst +++ b/docs/api/toasty.pipeline.PipelineIo.rst @@ -13,11 +13,15 @@ PipelineIo ~PipelineIo.check_exists ~PipelineIo.get_item ~PipelineIo.list_items + ~PipelineIo.load_from_config ~PipelineIo.put_item + ~PipelineIo.save_config .. rubric:: Methods Documentation .. automethod:: check_exists .. automethod:: get_item .. automethod:: list_items + .. automethod:: load_from_config .. automethod:: put_item + .. automethod:: save_config diff --git a/toasty/pipeline/__init__.py b/toasty/pipeline/__init__.py index def51ba..129a9ce 100644 --- a/toasty/pipeline/__init__.py +++ b/toasty/pipeline/__init__.py @@ -82,11 +82,82 @@ EXTENSION_REMAPPING = { # The `PipelineIo` ABC class PipelineIo(ABC): - """An abstract base class for I/O relating to pipeline processing. An instance + """ + An abstract base class for I/O relating to pipeline processing. An instance of this class might be used to fetch files from, and send them to, a cloud storage system like S3 or Azure Storage. - """ + + @abstractmethod + def _export_config(self): + """ + Export this object's configuration for serialization. + + Returns + ------- + A dictionary of settings that can be saved as YAML format. There should + be a key named "_type" with a string value identifying the I/O + implementation type. + """ + + def save_config(self, path): + """ + Save this object's configuration to the specified filesystem path. + """ + cfg = self._export_config() + + # The config contains secrets, so create it privately and securely. + opener = lambda path, _mode: os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode=0o600) + + with open(path, 'wt', opener=opener, encoding='utf8') as f: + yaml.dump(cfg, f, yaml.SafeDumper) + + @abstractclassmethod + def _new_from_config(cls, config): + """ + Create a new instance of this class based on serialized configuration. + + Parameters + ---------- + config : dict + A dict of configuration that was created with ``_export_config`` + + Returns + ------- + A new instance of the class. + """ + + @classmethod + def load_from_config(self, path): + """ + Create a new I/O backend from saved configuration. + + Parameters + ---------- + path : path-like + The path where the configuration was saved. + + Returns + ------- + A new instance implementing the PipelineIO abstract base class. + """ + + with open(path, 'rt', encoding='utf8') as f: + config = yaml.safe_load(f) + + ty = config.get('_type') + + if ty == 'local': + from .local_io import LocalPipelineIo + cls = LocalPipelineIo + elif ty == 'azure-blob': + from .azure_io import AzureBlobPipelineIo + cls = AzureBlobPipelineIo + else: + raise Exception(f'unrecognized pipeline I/O storage type {ty!r}') + + return cls._new_from_config(config) + @abstractmethod def check_exists(self, *path): """Test whether an item at the specified path exists. @@ -699,9 +770,9 @@ class PipelineManager(object): _workdir = None _img_source = None - def __init__(self, pipeio, workdir): - self._pipeio = pipeio + def __init__(self, workdir): self._workdir = workdir + self._pipeio = PipelineIo.load_from_config(self._path('toasty-store-config.yaml')) def _path(self, *path): return os.path.join(self._workdir, *path) diff --git a/toasty/pipeline/azure_io.py b/toasty/pipeline/azure_io.py index 4d45e3c..1f0292e 100644 --- a/toasty/pipeline/azure_io.py +++ b/toasty/pipeline/azure_io.py @@ -18,10 +18,12 @@ ENABLED assert_enabled '''.split() +import shutil + from . import PipelineIo try: - from azure.storage.blob import BlockBlobService + from azure.storage.blob import BlobServiceClient ENABLED = True except ImportError: ENABLED = False @@ -47,7 +49,9 @@ class AzureBlobPipelineIo(PipelineIo): prepended to all paths accessed through this object. """ - _svc = None + _connection_string = None + _svc_client = None + _cnt_client = None _container_name = None _path_prefix = None @@ -65,40 +69,56 @@ class AzureBlobPipelineIo(PipelineIo): raise ValueError('path_prefix should be a string or iterable of strings; ' 'got %r' % (path_prefix, )) - self._svc = BlockBlobService(connection_string=connection_string) + self._connection_string = connection_string self._container_name = container_name + self._svc_client = BlobServiceClient.from_connection_string(connection_string) + self._cnt_client = self._svc_client.get_container_client(container_name) self._path_prefix = path_prefix + def _export_config(self): + return { + '_type': 'azure-blob', + 'connection_secret': self._connection_string, + 'container_name': self._container_name, + 'path_prefix': self._path_prefix, + } + + @classmethod + def _new_from_config(cls, config): + return cls( + config['connection_secret'], + config['container_name'], + config['path_prefix'], + ) + def _make_blob_name(self, path_array): """TODO: is this actually correct? Escaping?""" return '/'.join(self._path_prefix + tuple(path_array)) def check_exists(self, *path): - return self._svc.exists( - self._container_name, - self._make_blob_name(path), - ) + from azure.core.exceptions import ResourceNotFoundError + + blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) + + try: + blob_client.get_blob_properties() + except ResourceNotFoundError: + return False + return True def get_item(self, *path, dest=None): - self._svc.get_blob_to_stream( - self._container_name, - self._make_blob_name(path), - dest, - ) + blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) + blob_client.download_blob().readinto(dest) def put_item(self, *path, source=None): - self._svc.create_blob_from_stream( - self._container_name, - self._make_blob_name(path), - source, - ) + blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path)) + blob_client.upload_blob(source) def list_items(self, *path): from azure.storage.blob.models import BlobPrefix prefix = self._make_blob_name(path) + '/' - for item in self._svc.list_blobs( - self._container_name, + for item in self._cnt_client.list_blobs( prefix = prefix, delimiter = '/' ): diff --git a/toasty/pipeline/cli.py b/toasty/pipeline/cli.py index 731f6d3..6d2e83f 100644 --- a/toasty/pipeline/cli.py +++ b/toasty/pipeline/cli.py @@ -18,7 +18,9 @@ import sys from ..cli import die, warn -def _pipeline_add_io_args(parser): +# The "init" subcommand + +def init_setup_parser(parser): parser.add_argument( '--azure-conn-env', metavar = 'ENV-VAR-NAME', @@ -40,6 +42,13 @@ def _pipeline_add_io_args(parser): metavar = 'PATH', help = 'Use the local-disk I/O backend' ) + parser.add_argument( + 'workdir', + nargs = '?', + metavar = 'PATH', + default = '.', + help = 'The working directory for this processing session' + ) def _pipeline_io_from_settings(settings): @@ -72,6 +81,40 @@ def _pipeline_io_from_settings(settings): die('An I/O backend must be specified with the arguments --local or --azure-*') +def init_impl(settings): + pipeio = _pipeline_io_from_settings(settings) + os.makedirs(settings.workdir, exist_ok=True) + pipeio.save_config(os.path.join(settings.workdir, 'toasty-store-config.yaml')) + + +# Other subcommands not yet split out. + +def _pipeline_add_io_args(parser): + parser.add_argument( + '--azure-conn-env', + metavar = 'ENV-VAR-NAME', + help = 'The name of an environment variable contain an Azure Storage ' + 'connection string' + ) + parser.add_argument( + '--azure-container', + metavar = 'CONTAINER-NAME', + help = 'The name of a blob container in the Azure storage account' + ) + parser.add_argument( + '--azure-path-prefix', + metavar = 'PATH-PREFIX', + help = 'A slash-separated path prefix for blob I/O within the container' + ) + parser.add_argument( + '--local', + metavar = 'PATH', + help = 'Use the local-disk I/O backend' + ) + + + + def pipeline_getparser(parser): subparsers = parser.add_subparsers(dest='pipeline_command') @@ -79,15 +122,19 @@ def pipeline_getparser(parser): _pipeline_add_io_args(parser) parser.add_argument( 'workdir', + nargs = '?', metavar = 'WORKDIR', default = '.', help = 'The local working directory', ) + init_setup_parser(subparsers.add_parser('init')) + parser = subparsers.add_parser('process-todos') _pipeline_add_io_args(parser) parser.add_argument( 'workdir', + nargs = '?', metavar = 'WORKDIR', default = '.', help = 'The local working directory', @@ -97,6 +144,7 @@ def pipeline_getparser(parser): _pipeline_add_io_args(parser) parser.add_argument( 'workdir', + nargs = '?', metavar = 'WORKDIR', default = '.', help = 'The local working directory', @@ -106,6 +154,7 @@ def pipeline_getparser(parser): _pipeline_add_io_args(parser) parser.add_argument( 'workdir', + nargs = '?', metavar = 'WORKDIR', default = '.', help = 'The local working directory', @@ -119,16 +168,19 @@ def pipeline_impl(settings): print('Run the "pipeline" command with `--help` for help on its subcommands') return - pipeio = _pipeline_io_from_settings(settings) - mgr = PipelineManager(pipeio, settings.workdir) - if settings.pipeline_command == 'fetch-inputs': + mgr = PipelineManager(settings.workdir) mgr.fetch_inputs() + elif settings.pipeline_command == 'init': + init_impl(settings) elif settings.pipeline_command == 'process-todos': + mgr = PipelineManager(settings.workdir) mgr.process_todos() elif settings.pipeline_command == 'publish-todos': + mgr = PipelineManager(settings.workdir) mgr.publish_todos() elif settings.pipeline_command == 'reindex': + mgr = PipelineManager(settings.workdir) mgr.reindex() else: die('unrecognized "pipeline" subcommand ' + settings.pipeline_command) diff --git a/toasty/pipeline/local_io.py b/toasty/pipeline/local_io.py index 715d164..f8a1f06 100644 --- a/toasty/pipeline/local_io.py +++ b/toasty/pipeline/local_io.py @@ -36,6 +36,16 @@ class LocalPipelineIo(PipelineIo): def __init__(self, path_prefix): self._path_prefix = path_prefix + def _export_config(self): + return { + '_type': 'local', + 'path': self._path_prefix, + } + + @classmethod + def _new_from_config(cls, config): + return cls(config['path']) + def _make_item_name(self, path_array): return os.path.join(self._path_prefix, *path_array)