pipeline: start working on polished CLI approach

It turns out that the Azure Storage Python API has evolved significantly. Annoying.
Peter Williams 2020-11-17 16:51:19 -05:00
Parent bce75b7ee6
Commit 8b4caa28ad
6 changed files with 186 additions and 27 deletions
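For context, the heart of the problem: azure-storage-blob 12 replaced the old flat BlockBlobService with a layered client hierarchy. A minimal sketch of the new pattern, where the connection string, container, and blob names are all placeholders:

from azure.storage.blob import BlobServiceClient

# Legacy SDK (< 12) was one flat object:
#   svc = BlockBlobService(connection_string=conn_str)
#   svc.create_blob_from_stream('mycontainer', 'some/blob', stream)

# SDK v12 is a hierarchy: service -> container -> blob clients.
conn_str = 'DefaultEndpointsProtocol=https;...'  # placeholder secret
svc_client = BlobServiceClient.from_connection_string(conn_str)
cnt_client = svc_client.get_container_client('mycontainer')
blob_client = cnt_client.get_blob_client('some/blob')

with open('local-copy.bin', 'wb') as f:
    blob_client.download_blob().readinto(f)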

View File

@@ -69,6 +69,7 @@ and [PyPI](https://pypi.org/project/toasty/#history).
[toasty] is a Python package so, yes, Python is required.
- [astropy] if using FITS files or WCS coordinates
- [azure-storage-blob] >= 12.0 if using the Azure storage backend for pipeline processing
- [cython]
- [filelock]
- [healpy] if using [HEALPix] maps
@@ -80,6 +81,7 @@ and [PyPI](https://pypi.org/project/toasty/#history).
- [wwt_data_formats]
[astropy]: https://www.astropy.org/
[azure-storage-blob]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob
[cython]: https://cython.org/
[filelock]: https://github.com/benediktschmitt/py-filelock
[healpy]: https://healpy.readthedocs.io/

View File

@@ -13,11 +13,15 @@ PipelineIo
~PipelineIo.check_exists
~PipelineIo.get_item
~PipelineIo.list_items
~PipelineIo.load_from_config
~PipelineIo.put_item
~PipelineIo.save_config
.. rubric:: Methods Documentation
.. automethod:: check_exists
.. automethod:: get_item
.. automethod:: list_items
.. automethod:: load_from_config
.. automethod:: put_item
.. automethod:: save_config

View File

@@ -82,11 +82,82 @@ EXTENSION_REMAPPING = {
# The `PipelineIo` ABC
class PipelineIo(ABC):
"""An abstract base class for I/O relating to pipeline processing. An instance
"""
An abstract base class for I/O relating to pipeline processing. An instance
of this class might be used to fetch files from, and send them to, a cloud
storage system like S3 or Azure Storage.
"""
@abstractmethod
def _export_config(self):
"""
Export this object's configuration for serialization.
Returns
-------
A dictionary of settings that can be saved in YAML format. There should
be a key named "_type" with a string value identifying the I/O
implementation type.
"""
def save_config(self, path):
"""
Save this object's configuration to the specified filesystem path.
"""
cfg = self._export_config()
# The config contains secrets, so create it privately and securely.
opener = lambda path, _flags: os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode=0o600)
with open(path, 'wt', opener=opener, encoding='utf8') as f:
yaml.dump(cfg, f, yaml.SafeDumper)
@classmethod
@abstractmethod
def _new_from_config(cls, config):
"""
Create a new instance of this class based on serialized configuration.
Parameters
----------
config : dict
A dict of configuration that was created with ``_export_config``
Returns
-------
A new instance of the class.
"""
@classmethod
def load_from_config(cls, path):
"""
Create a new I/O backend from saved configuration.
Parameters
----------
path : path-like
The path where the configuration was saved.
Returns
-------
A new instance implementing the PipelineIo abstract base class.
"""
with open(path, 'rt', encoding='utf8') as f:
config = yaml.safe_load(f)
ty = config.get('_type')
if ty == 'local':
from .local_io import LocalPipelineIo
impl = LocalPipelineIo
elif ty == 'azure-blob':
from .azure_io import AzureBlobPipelineIo
impl = AzureBlobPipelineIo
else:
raise Exception(f'unrecognized pipeline I/O storage type {ty!r}')
return impl._new_from_config(config)
@abstractmethod
def check_exists(self, *path):
"""Test whether an item at the specified path exists.
@@ -699,9 +770,9 @@ class PipelineManager(object):
_workdir = None
_img_source = None
def __init__(self, pipeio, workdir):
self._pipeio = pipeio
def __init__(self, workdir):
self._workdir = workdir
self._pipeio = PipelineIo.load_from_config(self._path('toasty-store-config.yaml'))
def _path(self, *path):
return os.path.join(self._workdir, *path)
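The net effect of these changes: backend credentials are configured once and persisted, after which PipelineManager needs only a working directory. A rough usage sketch, assuming toasty.pipeline and toasty.pipeline.local_io as the public module paths:

import os
from toasty.pipeline import PipelineManager            # assumed module path
from toasty.pipeline.local_io import LocalPipelineIo   # assumed module path

workdir = 'mywork'
os.makedirs(workdir, exist_ok=True)

# One-time setup: persist the backend config (written 0o600, since it
# can hold secrets like a storage connection string).
pipeio = LocalPipelineIo('/data/store')
pipeio.save_config(os.path.join(workdir, 'toasty-store-config.yaml'))

# Later sessions: the manager rehydrates the backend from the YAML
# file via PipelineIo.load_from_config().
mgr = PipelineManager(workdir)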

View File

@@ -18,10 +18,12 @@ ENABLED
assert_enabled
'''.split()
import shutil
from . import PipelineIo
try:
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
ENABLED = True
except ImportError:
ENABLED = False
@@ -47,7 +49,9 @@ class AzureBlobPipelineIo(PipelineIo):
prepended to all paths accessed through this object.
"""
_svc = None
_connection_string = None
_svc_client = None
_cnt_client = None
_container_name = None
_path_prefix = None
@@ -65,40 +69,56 @@
raise ValueError('path_prefix should be a string or iterable of strings; '
'got %r' % (path_prefix, ))
self._svc = BlockBlobService(connection_string=connection_string)
self._connection_string = connection_string
self._container_name = container_name
self._svc_client = BlobServiceClient.from_connection_string(connection_string)
self._cnt_client = self._svc_client.get_container_client(container_name)
self._path_prefix = path_prefix
def _export_config(self):
return {
'_type': 'azure-blob',
'connection_secret': self._connection_string,
'container_name': self._container_name,
'path_prefix': self._path_prefix,
}
@classmethod
def _new_from_config(cls, config):
return cls(
config['connection_secret'],
config['container_name'],
config['path_prefix'],
)
def _make_blob_name(self, path_array):
"""TODO: is this actually correct? Escaping?"""
return '/'.join(self._path_prefix + tuple(path_array))
def check_exists(self, *path):
return self._svc.exists(
self._container_name,
self._make_blob_name(path),
)
from azure.core.exceptions import ResourceNotFoundError
blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path))
try:
blob_client.get_blob_properties()
except ResourceNotFoundError:
return False
return True
def get_item(self, *path, dest=None):
self._svc.get_blob_to_stream(
self._container_name,
self._make_blob_name(path),
dest,
)
blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path))
blob_client.download_blob().readinto(dest)
def put_item(self, *path, source=None):
self._svc.create_blob_from_stream(
self._container_name,
self._make_blob_name(path),
source,
)
blob_client = self._cnt_client.get_blob_client(self._make_blob_name(path))
blob_client.upload_blob(source, overwrite=True)  # the legacy create_blob_from_stream() overwrote by default
def list_items(self, *path):
from azure.storage.blob import BlobPrefix  # the .models submodule went away in SDK v12
prefix = self._make_blob_name(path) + '/'
for item in self._svc.list_blobs(
self._container_name,
# SDK v12 moves delimiter-based listing to walk_blobs();
# list_blobs() no longer accepts a delimiter.
for item in self._cnt_client.walk_blobs(
name_starts_with = prefix,
delimiter = '/'
):
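For reference, the two v12 idioms the rewritten methods lean on, as a standalone sketch; names are placeholders, and the probe-and-catch existence test avoids depending on newer SDK conveniences:

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient

conn_str = 'DefaultEndpointsProtocol=https;...'  # placeholder secret
cnt = BlobServiceClient.from_connection_string(conn_str) \
    .get_container_client('mycontainer')

# Existence test: probe the blob's properties and treat a 404 as "no".
try:
    cnt.get_blob_client('prefix/item.fits').get_blob_properties()
    exists = True
except ResourceNotFoundError:
    exists = False

# Hierarchical listing: walk_blobs() yields BlobProperties for blobs
# and BlobPrefix objects for virtual "subdirectories".
for item in cnt.walk_blobs(name_starts_with='prefix/', delimiter='/'):
    print(item.name)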

View File

@@ -18,7 +18,9 @@ import sys
from ..cli import die, warn
def _pipeline_add_io_args(parser):
# The "init" subcommand
def init_setup_parser(parser):
parser.add_argument(
'--azure-conn-env',
metavar = 'ENV-VAR-NAME',
@@ -40,6 +42,13 @@ def _pipeline_add_io_args(parser):
metavar = 'PATH',
help = 'Use the local-disk I/O backend'
)
parser.add_argument(
'workdir',
nargs = '?',
metavar = 'PATH',
default = '.',
help = 'The working directory for this processing session'
)
def _pipeline_io_from_settings(settings):
@@ -72,6 +81,40 @@ def _pipeline_io_from_settings(settings):
die('An I/O backend must be specified with the arguments --local or --azure-*')
def init_impl(settings):
pipeio = _pipeline_io_from_settings(settings)
os.makedirs(settings.workdir, exist_ok=True)
pipeio.save_config(os.path.join(settings.workdir, 'toasty-store-config.yaml'))
# Other subcommands not yet split out.
def _pipeline_add_io_args(parser):
parser.add_argument(
'--azure-conn-env',
metavar = 'ENV-VAR-NAME',
help = 'The name of an environment variable containing an Azure Storage '
'connection string'
)
parser.add_argument(
'--azure-container',
metavar = 'CONTAINER-NAME',
help = 'The name of a blob container in the Azure storage account'
)
parser.add_argument(
'--azure-path-prefix',
metavar = 'PATH-PREFIX',
help = 'A slash-separated path prefix for blob I/O within the container'
)
parser.add_argument(
'--local',
metavar = 'PATH',
help = 'Use the local-disk I/O backend'
)
def pipeline_getparser(parser):
subparsers = parser.add_subparsers(dest='pipeline_command')
@@ -79,15 +122,19 @@ def pipeline_getparser(parser):
_pipeline_add_io_args(parser)
parser.add_argument(
'workdir',
nargs = '?',
metavar = 'WORKDIR',
default = '.',
help = 'The local working directory',
)
init_setup_parser(subparsers.add_parser('init'))
parser = subparsers.add_parser('process-todos')
_pipeline_add_io_args(parser)
parser.add_argument(
'workdir',
nargs = '?',
metavar = 'WORKDIR',
default = '.',
help = 'The local working directory',
@@ -97,6 +144,7 @@ def pipeline_getparser(parser):
_pipeline_add_io_args(parser)
parser.add_argument(
'workdir',
nargs = '?',
metavar = 'WORKDIR',
default = '.',
help = 'The local working directory',
@@ -106,6 +154,7 @@ def pipeline_getparser(parser):
_pipeline_add_io_args(parser)
parser.add_argument(
'workdir',
nargs = '?',
metavar = 'WORKDIR',
default = '.',
help = 'The local working directory',
@@ -119,16 +168,19 @@ def pipeline_impl(settings):
print('Run the "pipeline" command with `--help` for help on its subcommands')
return
pipeio = _pipeline_io_from_settings(settings)
mgr = PipelineManager(pipeio, settings.workdir)
if settings.pipeline_command == 'fetch-inputs':
mgr = PipelineManager(settings.workdir)
mgr.fetch_inputs()
elif settings.pipeline_command == 'init':
init_impl(settings)
elif settings.pipeline_command == 'process-todos':
mgr = PipelineManager(settings.workdir)
mgr.process_todos()
elif settings.pipeline_command == 'publish-todos':
mgr = PipelineManager(settings.workdir)
mgr.publish_todos()
elif settings.pipeline_command == 'reindex':
mgr = PipelineManager(settings.workdir)
mgr.reindex()
else:
die('unrecognized "pipeline" subcommand ' + settings.pipeline_command)
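A hypothetical driver showing the intended two-step flow, using the functions above; the toasty.pipeline.cli import path is an assumption:

import argparse
from toasty.pipeline.cli import pipeline_getparser, pipeline_impl  # assumed path

parser = argparse.ArgumentParser(prog='toasty-pipeline')
pipeline_getparser(parser)

# Step 1: "init" captures the I/O settings and writes
# toasty-store-config.yaml into the working directory.
settings = parser.parse_args(['init', '--local', '/data/store', 'mywork'])
pipeline_impl(settings)

# Step 2: later subcommands need only the workdir; the stored config
# supplies the backend and its secrets.
settings = parser.parse_args(['process-todos', 'mywork'])
pipeline_impl(settings)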

View File

@@ -36,6 +36,16 @@ class LocalPipelineIo(PipelineIo):
def __init__(self, path_prefix):
self._path_prefix = path_prefix
def _export_config(self):
return {
'_type': 'local',
'path': self._path_prefix,
}
@classmethod
def _new_from_config(cls, config):
return cls(config['path'])
def _make_item_name(self, path_array):
return os.path.join(self._path_prefix, *path_array)
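The `_type` key written by `_export_config()` is what lets `PipelineIo.load_from_config()` pick the implementation class. A quick round-trip sketch, with the module path assumed as above:

import yaml
from toasty.pipeline.local_io import LocalPipelineIo  # assumed module path

LocalPipelineIo('/data/store').save_config('store.yaml')

with open('store.yaml', 'rt', encoding='utf8') as f:
    print(yaml.safe_load(f))
# -> {'_type': 'local', 'path': '/data/store'}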