Add upload to page blob support

This commit is contained in:
Fred Park 2017-08-31 19:25:14 -07:00
Родитель 5c3c093f88
Коммит 8c684419a6
14 изменённых файлов: 200 добавлений и 21 удалений

Просмотреть файл

@ -2,6 +2,23 @@
## [Unreleased]
### Added
- `upload` from `stdin` to page blob support. Optional
`--stdin-as-page-blob-size` parameter added. Please see current limitations
doc for more information.
- `upload` from `stdin` `--rename` support
- `synccopy` single object `--rename` support
### Changed
- AppVeyor integration
- PyPI releases automatically generated for tags
- PyInstaller-based releases uploaded to GitHub for Windows and Linux
### Fixed
- YAML config merge with CLI options when YAML options not present
- `synccopy` invocation without YAML config
- Test failures on Windows
## [1.0.0b1] - 2017-08-28
### Added
- Cross-mode synchronous copy support

Просмотреть файл

@ -30,7 +30,7 @@ copies for Block blobs)
* Support all Azure Blob types and Azure Files for both upload and download
* Advanced skip options for rsync-like operations
* Store/restore POSIX filemode and uid/gid
* Support reading/pipe from `stdin`
* Support reading/pipe from `stdin` including page blob destinations
* Support reading from blob snapshots for downloading and synchronous copy
* Configurable one-shot block upload support
* Configurable chunk size for both upload and download

Просмотреть файл

@ -76,6 +76,7 @@ Upload = collections.namedtuple(
'recursive',
'rename',
'rsa_public_key',
'stdin_as_page_blob_size',
'store_file_properties',
'strip_components',
'vectored_io',

Просмотреть файл

@ -214,7 +214,11 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
:rtype: bool
:return: if rename possible
"""
return len(self._paths) == 1 and self._paths[0].is_file()
return len(self._paths) == 1 and (
self._paths[0].is_file() or
blobxfer.models.upload.LocalSourcePath.is_stdin(
str(self._paths[0]))
)
@staticmethod
def is_stdin(path):
@ -349,15 +353,20 @@ class Descriptor(object):
self._chunk_num = 0
self._next_integrity_chunk = 0
self._finalized = False
self._needs_resize = False
self._meta_lock = threading.Lock()
self._hasher_lock = threading.Lock()
if resume_mgr and self.local_path.use_stdin:
logger.warning('ignoring resume option for stdin source')
self._resume_mgr = None
else:
self._resume_mgr = resume_mgr
self._ase = ase
self._store_file_attr = options.store_file_properties.attributes
self.current_iv = None
self._initialize_encryption(options)
# calculate the total number of ops required for transfer
self._compute_remote_size()
self._compute_remote_size(options)
self._adjust_chunk_size(options)
self._total_chunks = self._compute_total_chunks(self._chunk_size)
self._outstanding_ops = self._total_chunks
@ -501,6 +510,16 @@ class Descriptor(object):
return (not self.entity.is_encrypted and self.must_compute_md5 and
self.remote_is_file)
def requires_resize(self):
# type: (Descriptor) -> tuple
"""Remote destination requires a resize operation
:param Descriptor self: this
:rtype: tuple
:return: blob requires a resize, length
"""
with self._meta_lock:
return (self._needs_resize, self._offset)
def complete_offset_upload(self, chunk_num):
# type: (Descriptor, int) -> None
"""Complete the upload for the offset
@ -573,15 +592,23 @@ class Descriptor(object):
self.current_iv = em.content_encryption_iv
self._ase.encryption_metadata = em
def _compute_remote_size(self):
# type: (Descriptor, int) -> None
def _compute_remote_size(self, options):
# type: (Descriptor, blobxfer.models.options.Upload) -> None
"""Compute total remote file size
:param Descriptor self: this
:param blobxfer.models.options.Upload options: upload options
:rtype: int
:return: remote file size
"""
size = self.local_path.size
if size > 0:
if (self._ase.mode == blobxfer.models.azure.StorageModes.Page and
self.local_path.use_stdin):
if options.stdin_as_page_blob_size == 0:
allocatesize = _MAX_PAGE_BLOB_SIZE
self._needs_resize = True
else:
allocatesize = options.stdin_as_page_blob_size
elif size > 0:
if self._ase.is_encrypted:
# cipher_len_without_iv = (clear_len / aes_bs + 1) * aes_bs
allocatesize = (size // self._AES_BLOCKSIZE + 1) * \
@ -678,7 +705,9 @@ class Descriptor(object):
chunks = int(math.ceil(self._ase.size / chunk_size))
except ZeroDivisionError:
chunks = 1
if self.local_path.use_stdin and chunks == 0:
# for stdin, override and use 1 chunk to start, this will change
# dynamically as data as read
if self.local_path.use_stdin:
chunks = 1
if (self._ase.mode != blobxfer.models.azure.StorageModes.Page and
chunks > 50000):
@ -877,12 +906,16 @@ class Descriptor(object):
data = blobxfer.STDIN.read(self._chunk_size)
if not data:
with self._meta_lock:
self._offset -= offsets.num_bytes
self._ase.size -= offsets.num_bytes
self._total_chunks -= 1
self._chunk_num -= 1
self._outstanding_ops -= 1
else:
num_bytes = len(data)
with self._meta_lock:
self._offset -= offsets.num_bytes
self._ase.size -= offsets.num_bytes
newoffset = Offsets(
chunk_num=self._chunk_num - 1,
num_bytes=num_bytes,

Просмотреть файл

@ -102,3 +102,17 @@ def put_page(ase, page_start, page_end, data, timeout=None):
end_range=page_end,
validate_content=False, # integrity is enforced with HTTPS
timeout=timeout) # noqa
def resize_blob(ase, size, timeout=None):
# type: (blobxfer.models.azure.StorageEntity, int, int) -> None
"""Resizes a page blob
:param blobxfer.models.azure.StorageEntity ase: Azure StorageEntity
:param int size: content length
:param int timeout: timeout
"""
ase.client.resize_blob(
container_name=ase.container,
blob_name=ase.name,
content_length=blobxfer.util.page_align_content_length(size),
timeout=timeout) # noqa

Просмотреть файл

@ -628,6 +628,18 @@ class Uploader(object):
for ase in ud.entity.replica_targets:
blobxfer.operations.azure.blob.set_blob_metadata(ase, metadata)
def _resize_blob(self, ud, size):
# type: (Uploader, blobxfer.models.upload.Descriptor, int) -> None
"""Resize page blob
:param Uploader self: this
:param blobxfer.models.upload.Descriptor ud: upload descriptor
:param int size: content length
"""
blobxfer.operations.azure.blob.page.resize_blob(ud.entity, size)
if blobxfer.util.is_not_empty(ud.entity.replica_targets):
for ase in ud.entity.replica_targets:
blobxfer.operations.azure.blob.page.resize_blob(ase, size)
def _finalize_nonblock_blob(self, ud, metadata):
# type: (Uploader, blobxfer.models.upload.Descriptor, dict) -> None
"""Finalize Non-Block blob
@ -635,6 +647,10 @@ class Uploader(object):
:param blobxfer.models.upload.Descriptor ud: upload descriptor
:param dict metadata: metadata dict
"""
# resize page blobs to final size if required
needs_resize, final_size = ud.requires_resize()
if needs_resize:
self._resize_blob(ud, final_size)
# set md5 page blob property if required
if ud.requires_non_encrypted_md5_put:
self._set_blob_md5(ud)
@ -1030,7 +1046,7 @@ class Uploader(object):
skipped_size = 0
approx_total_bytes = 0
# iterate through source paths to upload
dupes = set()
seen = set()
for src in self._spec.sources.files():
# create a destination array for the source
dest = [
@ -1040,11 +1056,11 @@ class Uploader(object):
for action, lp, ase in self._vectorize_and_bind(src, dest):
dest_id = blobxfer.operations.upload.Uploader.\
create_destination_id(ase._client, ase.container, ase.name)
if dest_id in dupes:
if dest_id in seen:
raise RuntimeError(
'duplicate destination entity detected: {}/{}'.format(
ase._client.primary_endpoint, ase.path))
dupes.add(dest_id)
seen.add(dest_id)
if self._spec.options.delete_extraneous_destination:
self._delete_exclude.add(dest_id)
if action == UploadAction.Skip:
@ -1064,7 +1080,7 @@ class Uploader(object):
self._pre_md5_skip_on_check(lp, ase)
elif action == UploadAction.Upload:
self._add_to_upload_queue(lp, ase, uid)
del dupes
del seen
# set remote files processed
with self._md5_meta_lock:
self._all_files_processed = True

Просмотреть файл

@ -609,6 +609,20 @@ def _skip_on_md5_match_option(f):
callback=callback)(f)
def _stdin_as_page_blob_size_option(f):
def callback(ctx, param, value):
clictx = ctx.ensure_object(CliContext)
clictx.cli_options['stdin_as_page_blob_size'] = value
return value
return click.option(
'--stdin-as-page-blob-size',
expose_value=False,
type=int,
default=None,
help='Size of page blob with input from stdin [0]',
callback=callback)(f)
def _strip_components_option(f):
def callback(ctx, param, value):
clictx = ctx.ensure_object(CliContext)
@ -708,6 +722,7 @@ def _sync_copy_dest_storage_account_option(f):
def upload_options(f):
f = _stripe_chunk_size_bytes_option(f)
f = _strip_components_option(f)
f = _stdin_as_page_blob_size_option(f)
f = _skip_on_md5_match_option(f)
f = _skip_on_lmt_ge_option(f)
f = _skip_on_filesize_match_option(f)

Просмотреть файл

@ -97,6 +97,8 @@ def add_cli_options(cli_options, action):
'lmt_ge': cli_options.get('skip_on_lmt_ge'),
'md5_match': cli_options.get('skip_on_md5_match'),
},
'stdin_as_page_blob_size': cli_options.get(
'stdin_as_page_blob_size'),
'store_file_properties': {
'attributes': cli_options.get('file_attributes'),
'md5': cli_options.get('file_md5'),
@ -609,6 +611,9 @@ def create_upload_specifications(cli_options, config):
cli_options, sfp, 'md5', name_cli='file_md5',
default=False),
),
stdin_as_page_blob_size=_merge_setting(
cli_options, conf['options'], 'stdin_as_page_blob_size',
default=0),
strip_components=_merge_setting(
cli_options, conf['options'], 'strip_components',
default=0),

Просмотреть файл

@ -193,6 +193,8 @@ behavior.
* `--rename` renames a single file to the target destination or source path.
This can only be used when transferring a single source file to a destination
and can be used with any command.
* `--stdin-as-page-blob-size` allows a page blob size to be set if known
beforehand when using `stdin` as a source and the destination is a page blob
* `--strip-components N` will strip the leading `N` components from the
local file path on upload. The default is `0`.

Просмотреть файл

@ -165,6 +165,7 @@ upload:
filesize_match: false
lmt_ge: false
md5_match: true
stdin_as_page_blob_size: 0
store_file_properties:
attributes: true
md5: true
@ -201,6 +202,9 @@ upload:
* `lmt_ge` skip if remote file has a last modified time greater than or
equal to the local file
* `md5_match` skip if MD5 match
* `stdin_as_page_blob_size` is the page blob size to preallocate if the
amount of data to be streamed from stdin is known beforehand and the
`mode` is `page`
* `store_file_properties` stores the following file properties if enabled
* `attributes` will store POSIX file mode and ownership
* `md5` will store the MD5 of the file

Просмотреть файл

@ -5,7 +5,6 @@ Please read this section carefully for any current known limitations to
### Client-side Encryption
* Client-side encryption is currently only available for block blobs and
Azure Files.
* `stdin` sources cannot be encrypted.
* Azure KeyVault key references are currently not supported.
### Platform-specific
@ -16,6 +15,19 @@ Azure Files.
SHA256 object cannot be pickled.
* Append blobs currently cannot be resumed for upload.
### `stdin` Limitations
* `stdin` uploads without the `--stdin-as-page-blob-size` parameter will
allocate a maximum-sized page blob and then will be resized once the `stdin`
source completes. If the upload fails, the file will remain maximum sized
and will be charged as such; no cleanup is performed if the upload fails.
* `stdin` sources cannot be resumed.
* `stdin` sources cannot be encrypted.
* `stdin` sources cannot be stripe vectorized for upload.
* For optimal performance, `--chunk-size-bytes` should match the "chunk size"
that is being written to `stdin`. For example, if you were using `dd` you
should set the block size (`bs`) parameter to be the same as the
`--chunk-size-bytes` parameter.
### General Azure File Limitations
* Please see [this article](https://msdn.microsoft.com/en-us/library/azure/dn744326.aspx)
for more information.

Просмотреть файл

@ -154,6 +154,7 @@ def test_specification(tmpdir):
recursive=True,
rename=True,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -182,6 +183,7 @@ def test_specification(tmpdir):
recursive=True,
rename=True,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -210,6 +212,7 @@ def test_specification(tmpdir):
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -236,6 +239,7 @@ def test_specification(tmpdir):
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -262,6 +266,7 @@ def test_specification(tmpdir):
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -288,6 +293,7 @@ def test_specification(tmpdir):
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -313,6 +319,7 @@ def test_specification(tmpdir):
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,
@ -374,6 +381,7 @@ def test_descriptor(tmpdir):
assert ud.requires_put_block_list
assert not ud.requires_non_encrypted_md5_put
assert not ud.requires_set_file_properties_md5
assert ud.requires_resize() == (False, ud._offset)
# test sym key
ase = azmodels.StorageEntity('cont')
@ -495,7 +503,7 @@ def test_descriptor_compute_remote_size(tmpdir):
ase.replica_targets = [ase2]
ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
ud._compute_remote_size()
ud._compute_remote_size(opts)
assert ud.entity.size == 48
for rt in ase.replica_targets:
assert rt.size == ud.entity.size
@ -513,7 +521,7 @@ def test_descriptor_compute_remote_size(tmpdir):
ase._encryption = None
ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
ud._compute_remote_size()
ud._compute_remote_size(opts)
assert ud.entity.size == 32
# remote size of zero
@ -526,9 +534,27 @@ def test_descriptor_compute_remote_size(tmpdir):
ase._encryption = None
ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
ud._compute_remote_size()
ud._compute_remote_size(opts)
assert ud.entity.size == 0
# stdin as page, resize
lp = upload.LocalPath(pathlib.Path('-'), pathlib.Path('-'), use_stdin=True)
opts.stdin_as_page_blob_size = 0
ase._mode = azmodels.StorageModes.Page
ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
ud._compute_remote_size(opts)
assert ud.entity.size == upload._MAX_PAGE_BLOB_SIZE
assert ud._needs_resize
# stdin as page, no resize
lp = upload.LocalPath(pathlib.Path('-'), pathlib.Path('-'), use_stdin=True)
opts.stdin_as_page_blob_size = 32
ase._mode = azmodels.StorageModes.Page
ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
ud._compute_remote_size(opts)
assert ud.entity.size == 32
assert not ud._needs_resize
def test_descriptor_adjust_chunk_size(tmpdir):
tmpdir.join('a').ensure(file=True)
@ -944,13 +970,13 @@ def test_descriptor_read_data(tmpdir):
assert data == b'z'
assert newoffset.chunk_num == 0
assert newoffset.num_bytes == 1
assert newoffset.range_start == 1
assert newoffset.range_end == 1
assert newoffset.range_start == 0
assert newoffset.range_end == 0
assert not newoffset.pad
assert ud._total_chunks == 3
assert ud._outstanding_ops == 3
assert ud._offset == 2
assert ud.entity.size == 3
assert ud._offset == 1
assert ud.entity.size == 2
with mock.patch(
'blobxfer.STDIN', new_callable=mock.PropertyMock) as patched_stdin:

Просмотреть файл

@ -54,6 +54,7 @@ def test_output_parameters():
recursive=True,
rename=False,
rsa_public_key=None,
stdin_as_page_blob_size=0,
store_file_properties=options.FileProperties(
attributes=True,
md5=True,

Просмотреть файл

@ -556,6 +556,32 @@ def test_set_blob_metadata(sbm):
assert sbm.call_count == 2
@mock.patch('blobxfer.operations.azure.blob.page.resize_blob')
def test_resize_blob(rb):
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
ase = mock.MagicMock()
ase._client.primary_endpoint = 'ep'
ase.path = 'asepath'
ase.size = 10
ase.mode = azmodels.StorageModes.Block
ase.is_encrypted = False
ase.replica_targets = [ase]
lp = mock.MagicMock()
lp.absolute_path = 'lpabspath'
lp.view.fd_start = 0
lp.use_stdin = True
ud = mock.MagicMock()
ud.entity = ase
ud.local_path = lp
ud.unique_id = 'uid'
u._resize_blob(ud, 512)
assert rb.call_count == 2
def test_finalize_nonblock_blob():
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
@ -574,17 +600,24 @@ def test_finalize_nonblock_blob():
ud = mock.MagicMock()
ud.entity = ase
ud.complete_offset_upload = mock.MagicMock()
ud.local_path = lp
ud.unique_id = 'uid'
ud.requires_non_encrypted_md5_put = True
ud.requires_resize.return_value = (False, None)
u._set_blob_md5 = mock.MagicMock()
u._set_blob_metadata = mock.MagicMock()
u._resize_blob = mock.MagicMock()
u._finalize_nonblock_blob(ud, {'a': 0})
assert u._set_blob_md5.call_count == 1
assert u._set_blob_metadata.call_count == 1
assert u._resize_blob.call_count == 0
# resize required
ud.requires_resize.return_value = (True, 512)
u._finalize_nonblock_blob(ud, {'a': 0})
assert u._resize_blob.call_count == 1
@mock.patch('blobxfer.operations.azure.file.set_file_md5')