diff --git a/CHANGELOG.md b/CHANGELOG.md
index 41cc96f..0eef7e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,23 @@
 
 ## [Unreleased]
 
+### Added
+- Support for `upload` from `stdin` to page blob destinations. An optional
+`--stdin-as-page-blob-size` parameter has been added; please see the current
+limitations doc for more information.
+- `upload` from `stdin` `--rename` support
+- `synccopy` single object `--rename` support
+
+### Changed
+- AppVeyor integration
+- PyPI releases automatically generated for tags
+- PyInstaller-based releases uploaded to GitHub for Windows and Linux
+
+### Fixed
+- YAML config merge with CLI options when YAML options are not present
+- `synccopy` invocation without a YAML config
+- Test failures on Windows
+
 ## [1.0.0b1] - 2017-08-28
 ### Added
 - Cross-mode synchronous copy support
diff --git a/README.md b/README.md
index 2df45e5..e18d4bf 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,7 @@ copies for Block blobs)
 * Support all Azure Blob types and Azure Files for both upload and download
 * Advanced skip options for rsync-like operations
 * Store/restore POSIX filemode and uid/gid
-* Support reading/pipe from `stdin`
+* Support reading/pipe from `stdin` including page blob destinations
 * Support reading from blob snapshots for downloading and synchronous copy
 * Configurable one-shot block upload support
 * Configurable chunk size for both upload and download
diff --git a/blobxfer/models/options.py b/blobxfer/models/options.py
index a13ee80..da6963c 100644
--- a/blobxfer/models/options.py
+++ b/blobxfer/models/options.py
@@ -76,6 +76,7 @@ Upload = collections.namedtuple(
         'recursive',
         'rename',
         'rsa_public_key',
+        'stdin_as_page_blob_size',
         'store_file_properties',
         'strip_components',
         'vectored_io',
diff --git a/blobxfer/models/upload.py b/blobxfer/models/upload.py
index c214513..2b8d92a 100644
--- a/blobxfer/models/upload.py
+++ b/blobxfer/models/upload.py
@@ -214,7 +214,11 @@ class LocalSourcePath(blobxfer.models._BaseSourcePaths):
         :rtype: bool
         :return: if rename possible
         """
-        return len(self._paths) == 1 and self._paths[0].is_file()
+        return len(self._paths) == 1 and (
+            self._paths[0].is_file() or
+            blobxfer.models.upload.LocalSourcePath.is_stdin(
+                str(self._paths[0]))
+        )
 
     @staticmethod
     def is_stdin(path):
@@ -349,15 +353,20 @@ class Descriptor(object):
         self._chunk_num = 0
         self._next_integrity_chunk = 0
         self._finalized = False
+        self._needs_resize = False
         self._meta_lock = threading.Lock()
         self._hasher_lock = threading.Lock()
-        self._resume_mgr = resume_mgr
+        if resume_mgr and self.local_path.use_stdin:
+            logger.warning('ignoring resume option for stdin source')
+            self._resume_mgr = None
+        else:
+            self._resume_mgr = resume_mgr
         self._ase = ase
         self._store_file_attr = options.store_file_properties.attributes
         self.current_iv = None
         self._initialize_encryption(options)
         # calculate the total number of ops required for transfer
-        self._compute_remote_size()
+        self._compute_remote_size(options)
         self._adjust_chunk_size(options)
         self._total_chunks = self._compute_total_chunks(self._chunk_size)
         self._outstanding_ops = self._total_chunks
@@ -501,6 +510,16 @@ class Descriptor(object):
         return (not self.entity.is_encrypted and self.must_compute_md5 and
                 self.remote_is_file)
 
+    def requires_resize(self):
+        # type: (Descriptor) -> tuple
+        """Remote destination requires a resize operation
+        :param Descriptor self: this
+        :rtype: tuple
+        :return: blob requires a resize, length
+        """
+        with self._meta_lock:
+            return (self._needs_resize, self._offset)
+
     def complete_offset_upload(self, chunk_num):
         # type: (Descriptor, int) -> None
         """Complete the upload for the offset
@@ -573,15 +592,23 @@ class Descriptor(object):
             self.current_iv = em.content_encryption_iv
             self._ase.encryption_metadata = em
 
-    def _compute_remote_size(self):
-        # type: (Descriptor, int) -> None
+    def _compute_remote_size(self, options):
+        # type: (Descriptor, blobxfer.models.options.Upload) -> None
         """Compute total remote file size
         :param Descriptor self: this
+        :param blobxfer.models.options.Upload options: upload options
         :rtype: int
         :return: remote file size
         """
         size = self.local_path.size
-        if size > 0:
+        if (self._ase.mode == blobxfer.models.azure.StorageModes.Page and
+                self.local_path.use_stdin):
+            if options.stdin_as_page_blob_size == 0:
+                allocatesize = _MAX_PAGE_BLOB_SIZE
+                self._needs_resize = True
+            else:
+                allocatesize = options.stdin_as_page_blob_size
+        elif size > 0:
             if self._ase.is_encrypted:
                 # cipher_len_without_iv = (clear_len / aes_bs + 1) * aes_bs
                 allocatesize = (size // self._AES_BLOCKSIZE + 1) * \
@@ -678,7 +705,9 @@ class Descriptor(object):
             chunks = int(math.ceil(self._ase.size / chunk_size))
         except ZeroDivisionError:
             chunks = 1
-        if self.local_path.use_stdin and chunks == 0:
+        # for stdin, override and use 1 chunk to start; this will change
+        # dynamically as data is read
+        if self.local_path.use_stdin:
             chunks = 1
         if (self._ase.mode != blobxfer.models.azure.StorageModes.Page and
                 chunks > 50000):
@@ -877,12 +906,16 @@ class Descriptor(object):
             data = blobxfer.STDIN.read(self._chunk_size)
             if not data:
                 with self._meta_lock:
+                    self._offset -= offsets.num_bytes
+                    self._ase.size -= offsets.num_bytes
                     self._total_chunks -= 1
                     self._chunk_num -= 1
                     self._outstanding_ops -= 1
             else:
                 num_bytes = len(data)
                 with self._meta_lock:
+                    self._offset -= offsets.num_bytes
+                    self._ase.size -= offsets.num_bytes
                     newoffset = Offsets(
                         chunk_num=self._chunk_num - 1,
                         num_bytes=num_bytes,
diff --git a/blobxfer/operations/azure/blob/page.py b/blobxfer/operations/azure/blob/page.py
index c99f226..744a48a 100644
--- a/blobxfer/operations/azure/blob/page.py
+++ b/blobxfer/operations/azure/blob/page.py
@@ -102,3 +102,17 @@ def put_page(ase, page_start, page_end, data, timeout=None):
         end_range=page_end,
         validate_content=False,  # integrity is enforced with HTTPS
         timeout=timeout)  # noqa
+
+
+def resize_blob(ase, size, timeout=None):
+    # type: (blobxfer.models.azure.StorageEntity, int, int) -> None
+    """Resizes a page blob
+    :param blobxfer.models.azure.StorageEntity ase: Azure StorageEntity
+    :param int size: content length
+    :param int timeout: timeout
+    """
+    ase.client.resize_blob(
+        container_name=ase.container,
+        blob_name=ase.name,
+        content_length=blobxfer.util.page_align_content_length(size),
+        timeout=timeout)  # noqa
diff --git a/blobxfer/operations/upload.py b/blobxfer/operations/upload.py
index 974ac21..a14d998 100644
--- a/blobxfer/operations/upload.py
+++ b/blobxfer/operations/upload.py
@@ -628,6 +628,18 @@ class Uploader(object):
             for ase in ud.entity.replica_targets:
                 blobxfer.operations.azure.blob.set_blob_metadata(ase, metadata)
 
+    def _resize_blob(self, ud, size):
+        # type: (Uploader, blobxfer.models.upload.Descriptor, int) -> None
+        """Resize page blob
+        :param Uploader self: this
+        :param blobxfer.models.upload.Descriptor ud: upload descriptor
+        :param int size: content length
+        """
+        blobxfer.operations.azure.blob.page.resize_blob(ud.entity, size)
+        if blobxfer.util.is_not_empty(ud.entity.replica_targets):
+            for ase in ud.entity.replica_targets:
+                blobxfer.operations.azure.blob.page.resize_blob(ase, size)
+
     def _finalize_nonblock_blob(self, ud, metadata):
         # type: (Uploader, blobxfer.models.upload.Descriptor, dict) -> None
         """Finalize Non-Block blob
@@ -635,6 +647,10 @@
         :param blobxfer.models.upload.Descriptor ud: upload descriptor
         :param dict metadata: metadata dict
         """
+        # resize page blobs to final size if required
+        needs_resize, final_size = ud.requires_resize()
+        if needs_resize:
+            self._resize_blob(ud, final_size)
         # set md5 page blob property if required
         if ud.requires_non_encrypted_md5_put:
             self._set_blob_md5(ud)
@@ -1030,7 +1046,7 @@ class Uploader(object):
         skipped_size = 0
         approx_total_bytes = 0
         # iterate through source paths to upload
-        dupes = set()
+        seen = set()
         for src in self._spec.sources.files():
             # create a destination array for the source
             dest = [
@@ -1040,11 +1056,11 @@ class Uploader(object):
             for action, lp, ase in self._vectorize_and_bind(src, dest):
                 dest_id = blobxfer.operations.upload.Uploader.\
                     create_destination_id(ase._client, ase.container, ase.name)
-                if dest_id in dupes:
+                if dest_id in seen:
                     raise RuntimeError(
                         'duplicate destination entity detected: {}/{}'.format(
                             ase._client.primary_endpoint, ase.path))
-                dupes.add(dest_id)
+                seen.add(dest_id)
                 if self._spec.options.delete_extraneous_destination:
                     self._delete_exclude.add(dest_id)
                 if action == UploadAction.Skip:
@@ -1064,7 +1080,7 @@ class Uploader(object):
                     self._pre_md5_skip_on_check(lp, ase)
                 elif action == UploadAction.Upload:
                     self._add_to_upload_queue(lp, ase, uid)
-        del dupes
+        del seen
         # set remote files processed
         with self._md5_meta_lock:
             self._all_files_processed = True
diff --git a/cli/cli.py b/cli/cli.py
index a62d12c..db9a185 100644
--- a/cli/cli.py
+++ b/cli/cli.py
@@ -609,6 +609,20 @@ def _skip_on_md5_match_option(f):
         callback=callback)(f)
 
 
+def _stdin_as_page_blob_size_option(f):
+    def callback(ctx, param, value):
+        clictx = ctx.ensure_object(CliContext)
+        clictx.cli_options['stdin_as_page_blob_size'] = value
+        return value
+    return click.option(
+        '--stdin-as-page-blob-size',
+        expose_value=False,
+        type=int,
+        default=None,
+        help='Size of page blob with input from stdin [0]',
+        callback=callback)(f)
+
+
 def _strip_components_option(f):
     def callback(ctx, param, value):
         clictx = ctx.ensure_object(CliContext)
@@ -708,6 +722,7 @@ def _sync_copy_dest_storage_account_option(f):
 def upload_options(f):
     f = _stripe_chunk_size_bytes_option(f)
     f = _strip_components_option(f)
+    f = _stdin_as_page_blob_size_option(f)
     f = _skip_on_md5_match_option(f)
     f = _skip_on_lmt_ge_option(f)
     f = _skip_on_filesize_match_option(f)
diff --git a/cli/settings.py b/cli/settings.py
index e9253bf..27aa162 100644
--- a/cli/settings.py
+++ b/cli/settings.py
@@ -97,6 +97,8 @@ def add_cli_options(cli_options, action):
                 'lmt_ge': cli_options.get('skip_on_lmt_ge'),
                 'md5_match': cli_options.get('skip_on_md5_match'),
             },
+            'stdin_as_page_blob_size': cli_options.get(
+                'stdin_as_page_blob_size'),
             'store_file_properties': {
                 'attributes': cli_options.get('file_attributes'),
                 'md5': cli_options.get('file_md5'),
@@ -609,6 +611,9 @@ def create_upload_specifications(cli_options, config):
                     cli_options, sfp, 'md5', name_cli='file_md5',
                     default=False),
             ),
+            stdin_as_page_blob_size=_merge_setting(
+                cli_options, conf['options'], 'stdin_as_page_blob_size',
+                default=0),
             strip_components=_merge_setting(
                 cli_options, conf['options'], 'strip_components',
                 default=0),
diff --git a/docs/10-cli-usage.md b/docs/10-cli-usage.md
index 89544df..1ca9f43 100644
--- a/docs/10-cli-usage.md
+++ b/docs/10-cli-usage.md
@@ -193,6 +193,8 @@ behavior.
 * `--rename` renames a single file to the target destination or source path.
 This can only be used when transferring a single source file to a destination
 and can be used with any command.
+* `--stdin-as-page-blob-size` sets the page blob size up front, if known
+beforehand, when using `stdin` as a source with a page blob destination.
 * `--strip-components N` will strip the leading `N` components from the local
 file path on upload. The default is `0`.
 
diff --git a/docs/20-yaml-configuration.md b/docs/20-yaml-configuration.md
index b430246..5bcd148 100644
--- a/docs/20-yaml-configuration.md
+++ b/docs/20-yaml-configuration.md
@@ -165,6 +165,7 @@ upload:
       filesize_match: false
       lmt_ge: false
       md5_match: true
+    stdin_as_page_blob_size: 0
     store_file_properties:
       attributes: true
       md5: true
@@ -201,6 +202,9 @@ upload:
   * `lmt_ge` skip if remote file has a last modified time greater than or
     equal to the local file
   * `md5_match` skip if MD5 match
+* `stdin_as_page_blob_size` is the page blob size to preallocate if the
+  amount of data to be streamed from `stdin` is known beforehand and the
+  `mode` is `page`
 * `store_file_properties` stores the following file properties if enabled
   * `attributes` will store POSIX file mode and ownership
   * `md5` will store the MD5 of the file
diff --git a/docs/99-current-limitations.md b/docs/99-current-limitations.md
index f1ef51d..e7af51f 100644
--- a/docs/99-current-limitations.md
+++ b/docs/99-current-limitations.md
@@ -5,7 +5,6 @@ Please read this section carefully for any current known limitations to
 ### Client-side Encryption
 * Client-side encryption is currently only available for block blobs and
 Azure Files.
-* `stdin` sources cannot be encrypted.
 * Azure KeyVault key references are currently not supported.
 
 ### Platform-specific
@@ -16,6 +15,19 @@ Azure Files.
 SHA256 object cannot be pickled.
 * Append blobs currently cannot be resumed for upload.
 
+### `stdin` Limitations
+* `stdin` uploads without the `--stdin-as-page-blob-size` parameter will
+allocate a maximum-sized page blob that is then resized once the `stdin`
+source completes. If the upload fails, the blob will remain at its maximum
+size and will be charged as such; no cleanup is performed on failure.
+* `stdin` sources cannot be resumed.
+* `stdin` sources cannot be encrypted.
+* `stdin` sources cannot be stripe vectorized for upload.
+* For optimal performance, `--chunk-size-bytes` should match the "chunk size"
+that is being written to `stdin`. For example, if you were using `dd`, you
+should set the block size (`bs`) parameter to be the same as the
+`--chunk-size-bytes` parameter.
+
 ### General Azure File Limitations
 * Please see [this article](https://msdn.microsoft.com/en-us/library/azure/dn744326.aspx)
 for more information.
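The behavior described in the limitations doc reduces to a simple rule: with `--stdin-as-page-blob-size 0` (the default), preallocate the maximum page blob size and resize after the stream ends; with a known size, preallocate exactly that and skip the resize. The following is a minimal sketch of that rule, mirroring the branch added to `_compute_remote_size` above; the 1 TiB value for `_MAX_PAGE_BLOB_SIZE` is assumed for illustration only, and the actual constant is defined in `blobxfer/models/upload.py`.

```python
# Illustrative sketch of the stdin -> page blob sizing rule; not blobxfer code.
_MAX_PAGE_BLOB_SIZE = 1024 ** 4  # assumed 1 TiB; see blobxfer/models/upload.py


def page_blob_allocation(stdin_as_page_blob_size):
    # type: (int) -> tuple
    """Return (bytes to preallocate, whether a final resize is needed)."""
    if stdin_as_page_blob_size == 0:
        # size unknown up front: preallocate the maximum, resize at the end
        return _MAX_PAGE_BLOB_SIZE, True
    # size known up front: preallocate exactly, no resize required
    return stdin_as_page_blob_size, False


assert page_blob_allocation(0) == (_MAX_PAGE_BLOB_SIZE, True)
assert page_blob_allocation(4194304) == (4194304, False)
```

In practice this pairs with the `dd` guidance above: an invocation along the lines of `dd if=disk.img bs=4194304 | blobxfer upload ... --mode page --stdin-as-page-blob-size 8589934592 --chunk-size-bytes 4194304` (remote arguments elided) would avoid both the maximum-size allocation and the final resize.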
diff --git a/tests/test_blobxfer_models_upload.py b/tests/test_blobxfer_models_upload.py
index 99f1948..a275425 100644
--- a/tests/test_blobxfer_models_upload.py
+++ b/tests/test_blobxfer_models_upload.py
@@ -154,6 +154,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=True,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -182,6 +183,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=True,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -210,6 +212,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -236,6 +239,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -262,6 +266,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -288,6 +293,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -313,6 +319,7 @@ def test_specification(tmpdir):
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
@@ -374,6 +381,7 @@ def test_descriptor(tmpdir):
     assert ud.requires_put_block_list
     assert not ud.requires_non_encrypted_md5_put
     assert not ud.requires_set_file_properties_md5
+    assert ud.requires_resize() == (False, ud._offset)
 
     # test sym key
     ase = azmodels.StorageEntity('cont')
@@ -495,7 +503,7 @@ def test_descriptor_compute_remote_size(tmpdir):
     ase.replica_targets = [ase2]
 
     ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
-    ud._compute_remote_size()
+    ud._compute_remote_size(opts)
     assert ud.entity.size == 48
     for rt in ase.replica_targets:
         assert rt.size == ud.entity.size
@@ -513,7 +521,7 @@ def test_descriptor_compute_remote_size(tmpdir):
     ase._encryption = None
 
     ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
-    ud._compute_remote_size()
+    ud._compute_remote_size(opts)
     assert ud.entity.size == 32
 
     # remote size of zero
@@ -526,9 +534,27 @@ def test_descriptor_compute_remote_size(tmpdir):
     ase._encryption = None
 
     ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
-    ud._compute_remote_size()
+    ud._compute_remote_size(opts)
     assert ud.entity.size == 0
 
+    # stdin as page, resize
+    lp = upload.LocalPath(pathlib.Path('-'), pathlib.Path('-'), use_stdin=True)
+    opts.stdin_as_page_blob_size = 0
+    ase._mode = azmodels.StorageModes.Page
+    ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
+    ud._compute_remote_size(opts)
+    assert ud.entity.size == upload._MAX_PAGE_BLOB_SIZE
+    assert ud._needs_resize
+
+    # stdin as page, no resize
+    lp = upload.LocalPath(pathlib.Path('-'), pathlib.Path('-'), use_stdin=True)
+    opts.stdin_as_page_blob_size = 32
+    ase._mode = azmodels.StorageModes.Page
+    ud = upload.Descriptor(lp, ase, 'uid', opts, mock.MagicMock())
+    ud._compute_remote_size(opts)
+    assert ud.entity.size == 32
+    assert not ud._needs_resize
+
 
 def test_descriptor_adjust_chunk_size(tmpdir):
     tmpdir.join('a').ensure(file=True)
@@ -944,13 +970,13 @@ def test_descriptor_read_data(tmpdir):
     assert data == b'z'
     assert newoffset.chunk_num == 0
     assert newoffset.num_bytes == 1
-    assert newoffset.range_start == 1
-    assert newoffset.range_end == 1
+    assert newoffset.range_start == 0
+    assert newoffset.range_end == 0
     assert not newoffset.pad
     assert ud._total_chunks == 3
     assert ud._outstanding_ops == 3
-    assert ud._offset == 2
-    assert ud.entity.size == 3
+    assert ud._offset == 1
+    assert ud.entity.size == 2
 
     with mock.patch(
             'blobxfer.STDIN', new_callable=mock.PropertyMock) as patched_stdin:
diff --git a/tests/test_blobxfer_operations_progress.py b/tests/test_blobxfer_operations_progress.py
index f689d98..fc63a8e 100644
--- a/tests/test_blobxfer_operations_progress.py
+++ b/tests/test_blobxfer_operations_progress.py
@@ -54,6 +54,7 @@ def test_output_parameters():
             recursive=True,
             rename=False,
             rsa_public_key=None,
+            stdin_as_page_blob_size=0,
             store_file_properties=options.FileProperties(
                 attributes=True,
                 md5=True,
diff --git a/tests/test_blobxfer_operations_upload.py b/tests/test_blobxfer_operations_upload.py
index 84ca04c..1c304ec 100644
--- a/tests/test_blobxfer_operations_upload.py
+++ b/tests/test_blobxfer_operations_upload.py
@@ -556,6 +556,32 @@ def test_set_blob_metadata(sbm):
     assert sbm.call_count == 2
 
 
+@mock.patch('blobxfer.operations.azure.blob.page.resize_blob')
+def test_resize_blob(rb):
+    u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+
+    ase = mock.MagicMock()
+    ase._client.primary_endpoint = 'ep'
+    ase.path = 'asepath'
+    ase.size = 10
+    ase.mode = azmodels.StorageModes.Block
+    ase.is_encrypted = False
+    ase.replica_targets = [ase]
+
+    lp = mock.MagicMock()
+    lp.absolute_path = 'lpabspath'
+    lp.view.fd_start = 0
+    lp.use_stdin = True
+
+    ud = mock.MagicMock()
+    ud.entity = ase
+    ud.local_path = lp
+    ud.unique_id = 'uid'
+
+    u._resize_blob(ud, 512)
+    assert rb.call_count == 2
+
+
 def test_finalize_nonblock_blob():
     u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
 
@@ -574,17 +600,24 @@ def test_finalize_nonblock_blob():
 
     ud = mock.MagicMock()
     ud.entity = ase
-    ud.complete_offset_upload = mock.MagicMock()
     ud.local_path = lp
     ud.unique_id = 'uid'
     ud.requires_non_encrypted_md5_put = True
+    ud.requires_resize.return_value = (False, None)
 
     u._set_blob_md5 = mock.MagicMock()
     u._set_blob_metadata = mock.MagicMock()
+    u._resize_blob = mock.MagicMock()
 
     u._finalize_nonblock_blob(ud, {'a': 0})
     assert u._set_blob_md5.call_count == 1
     assert u._set_blob_metadata.call_count == 1
+    assert u._resize_blob.call_count == 0
+
+    # resize required
+    ud.requires_resize.return_value = (True, 512)
+    u._finalize_nonblock_blob(ud, {'a': 0})
+    assert u._resize_blob.call_count == 1
 
 
 @mock.patch('blobxfer.operations.azure.file.set_file_md5')
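One detail of the resize path is worth calling out: `resize_blob` routes the final length through `blobxfer.util.page_align_content_length`, since Azure page blob lengths must fall on 512-byte page boundaries. Below is a minimal sketch of such a round-up helper; this is an assumption about the utility's behavior, not its actual code.

```python
# Illustrative 512-byte page alignment helper; mirrors the behavior assumed
# of blobxfer.util.page_align_content_length (sketch, not the real code).
_PAGE_SIZE = 512


def page_align_content_length(length):
    # type: (int) -> int
    """Round length up to the nearest 512-byte page boundary."""
    remainder = length % _PAGE_SIZE
    if remainder != 0:
        return length + _PAGE_SIZE - remainder
    return length


assert page_align_content_length(0) == 0
assert page_align_content_length(1) == 512
assert page_align_content_length(512) == 512
assert page_align_content_length(513) == 1024
```

Rounding up rather than truncating is the safe direction here: truncating to the lower page boundary could discard bytes already written with `put_page`.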