From e82890ada974bbba7b2dd98a06b1f3761bbfa3b3 Mon Sep 17 00:00:00 2001 From: Fred Park Date: Thu, 23 Feb 2017 23:47:16 -0800 Subject: [PATCH] Download offsets - Bind ASE to DownloadDescriptor instead - Add chunk size option to download and synccopy --- blobxfer/crypto/operations.py | 26 +++++ blobxfer/download.py | 34 +++++- blobxfer/models.py | 132 ++++++++++++++------- cli/cli.py | 4 +- cli/settings.py | 3 + tests/test_blobxfer_crypto_operations.py | 7 ++ tests/test_blobxfer_download.py | 6 + tests/test_blobxfer_models.py | 142 ++++++++++++++++++----- tests/test_blobxfer_operations.py | 2 + 9 files changed, 282 insertions(+), 74 deletions(-) diff --git a/blobxfer/crypto/operations.py b/blobxfer/crypto/operations.py index 9a0f099..1931407 100644 --- a/blobxfer/crypto/operations.py +++ b/blobxfer/crypto/operations.py @@ -128,3 +128,29 @@ def rsa_encrypt_key_base64_encoded(rsaprivatekey, rsapublickey, plainkey): algorithm=cryptography.hazmat.primitives.hashes.SHA1(), label=None)) return blobxfer.util.base64_encode_as_string(enckey) + + +def pad_pkcs7(buf): + # type: (bytes) -> bytes + """Appends PKCS7 padding to an input buffer + :param bytes buf: buffer to add padding + :rtype: bytes + :return: buffer with PKCS7_PADDING + """ + padder = cryptography.hazmat.primitives.padding.PKCS7( + cryptography.hazmat.primitives.ciphers. + algorithms.AES.block_size).padder() + return padder.update(buf) + padder.finalize() + + +def unpad_pkcs7(buf): + # type: (bytes) -> bytes + """Removes PKCS7 padding a decrypted object + :param bytes buf: buffer to remove padding + :rtype: bytes + :return: buffer without PKCS7_PADDING + """ + unpadder = cryptography.hazmat.primitives.padding.PKCS7( + cryptography.hazmat.primitives.ciphers. + algorithms.AES.block_size).unpadder() + return unpadder.update(buf) + unpadder.finalize() diff --git a/blobxfer/download.py b/blobxfer/download.py index dbbecac..49eea54 100644 --- a/blobxfer/download.py +++ b/blobxfer/download.py @@ -47,6 +47,7 @@ import threading import dateutil # local imports import blobxfer.md5 +import blobxfer.models import blobxfer.operations import blobxfer.util @@ -211,9 +212,10 @@ class Downloader(object): :param blobxfer.models.AzureStorageEntity rfile: remote file """ # prepare remote file for download - rfile.prepare_for_download(lpath, self._spec.options) + dd = blobxfer.models.DownloadDescriptor( + lpath, rfile, self._spec.options) # add remote file to queue - self._download_queue.put(rfile) + self._download_queue.put(dd) def _initialize_download_threads(self): # type: (Downloader) -> None @@ -243,12 +245,34 @@ class Downloader(object): if self._download_terminate: break try: - rfile = self._download_queue.get(False, 1) + dd = self._download_queue.get(False, 1) except queue.Empty: continue - # TODO - # get next offset with respect to chunk size + # get download offsets + # issue get range + + # if encryption: + # 1. compute rolling hmac if present + # - roll through any subsequent unchecked parts + # 2. decrypt chunk + + # compute rolling md5 if present + # - roll through any subsequent unchecked parts + + # write data to disk + + # if no integrity check could be performed due to current + # integrity offset mismatch, add to unchecked set + + # check if last chunk to write + # 1. complete integrity checks + # 2. set file uid/gid + # 3. set file modes + + # pickle dd to resume file + + rfile = dd._ase print('<<', rfile.container, rfile.name, rfile.lmt, rfile.size, rfile.md5, rfile.mode, rfile.encryption_metadata) diff --git a/blobxfer/models.py b/blobxfer/models.py index 34d05ce..6d0f753 100644 --- a/blobxfer/models.py +++ b/blobxfer/models.py @@ -100,6 +100,7 @@ UploadOptions = collections.namedtuple( DownloadOptions = collections.namedtuple( 'DownloadOptions', [ 'check_file_md5', + 'chunk_size_bytes', 'delete_extraneous_destination', 'mode', 'overwrite', @@ -110,16 +111,24 @@ DownloadOptions = collections.namedtuple( ) SyncCopyOptions = collections.namedtuple( 'SyncCopyOptions', [ - 'exclude', - 'include', + 'chunk_size_bytes', 'mode', 'overwrite', - 'skip_on', ] ) LocalPath = collections.namedtuple( 'LocalPath', [ - 'parent_path', 'relative_path' + 'parent_path', + 'relative_path', + ] +) +DownloadOffsets = collections.namedtuple( + 'DownloadOffsets', [ + 'fd_start', + 'num_bytes', + 'range_end', + 'range_start', + 'unpad', ] ) @@ -749,58 +758,60 @@ class AzureStorageEntity(object): self._md5 = file.properties.content_settings.content_md5 self._mode = AzureStorageModes.File - def prepare_for_download(self, lpath, options): - # type: (AzureStorageEntity, pathlib.Path, DownloadOptions) -> None - """Prepare entity for download - :param AzureStorageEntity self: this - :param pathlib.Path lpath: local path - :param DownloadOptions options: download options - """ - if self._encryption is not None: - hmac = self._encryption.initialize_hmac() - else: - hmac = None - if hmac is None and options.check_file_md5: - md5 = blobxfer.md5.new_md5_hasher() - else: - md5 = None - self.download = DownloadDescriptor(lpath, hmac, md5) - self.download.allocate_disk_space( - self._size, self._encryption is not None) - class DownloadDescriptor(object): - """DownloadDescriptor""" - def __init__(self, lpath, hmac, md5): - # type: (DownloadDescriptior, pathlib.Path, hmac.HMAC, md5.MD5) -> None - """Ctor for Download Descriptor + """Download Descriptor""" + + _AES_BLOCKSIZE = blobxfer.crypto.models._AES256_BLOCKSIZE_BYTES + + def __init__(self, lpath, ase, options): + # type: (DownloadDescriptior, pathlib.Path, AzureStorageEntity, + # DownloadOptions) -> None + """Ctor for DownloadDescriptor :param DownloadDescriptor self: this :param pathlib.Path lpath: local path - :param hmac.HMAC hmac: hmac - :param md5.MD5 md5: md5 + :param AzureStorageEntity ase: Azure Storage Entity + :param DownloadOptions options: download options """ self.final_path = lpath # create path holding the temporary file to download to _tmp = list(lpath.parts[:-1]) _tmp.append(lpath.name + '.bxtmp') self.local_path = pathlib.Path(*_tmp) - self.hmac = hmac - self.md5 = md5 - self.current_position = 0 + self._ase = ase + self._chunk_size = min((options.chunk_size_bytes, self._ase.size)) + self.hmac = None + self.md5 = None + self.offset = 0 + self.integrity_counter = 0 + self.unchecked_chunks = set() + self._initialize_integrity_checkers(options) + self._allocate_disk_space() - def allocate_disk_space(self, size, encryption): - # type: (DownloadDescriptor, int, bool) -> None - """Perform file allocation (possibly sparse), if encrypted this may - be an underallocation + def _initialize_integrity_checkers(self, options): + # type: (DownloadDescriptor, DownloadOptions) -> None + """Initialize file integrity checkers + :param DownloadDescriptor self: this + :param DownloadOptions options: download options + """ + if self._ase.encryption_metadata is not None: + self.hmac = self._ase.encryption_metadata.initialize_hmac() + if self.hmac is None and options.check_file_md5: + self.md5 = blobxfer.md5.new_md5_hasher() + + def _allocate_disk_space(self): + # type: (DownloadDescriptor, int) -> None + """Perform file allocation (possibly sparse) :param DownloadDescriptor self: this :param int size: size - :param bool encryption: encryption enabled """ + size = self._ase.size # compute size if size > 0: - if encryption: - allocatesize = size - \ - blobxfer.crypto.models._AES256_BLOCKSIZE_BYTES + if self._ase.encryption_metadata is not None: + # cipher_len_without_iv = (clear_len / aes_bs + 1) * aes_bs + allocatesize = (size // self._AES_BLOCKSIZE - 1) * \ + self._AES_BLOCKSIZE else: allocatesize = size if allocatesize < 0: @@ -818,6 +829,47 @@ class DownloadDescriptor(object): fd.seek(allocatesize - 1) fd.write(b'\0') + def next_offsets(self): + # type: (DownloadDescriptor) -> DownloadOffsets + """Retrieve the next offsets + :param DownloadDescriptor self: this + :rtype: DownloadOffsets + :return: download offsets + """ + if self.offset >= self._ase.size: + return None + if self.offset + self._chunk_size > self._ase.size: + chunk = self._ase.size - self.offset + else: + chunk = self._chunk_size + # on download, num_bytes must be offset by -1 as the x-ms-range + # header expects it that way. x -> y bytes means first bits of the + # (x+1)th byte to the last bits of the (y+1)th byte. for example, + # 0 -> 511 means byte 1 to byte 512 + num_bytes = chunk - 1 + fd_start = self.offset + range_start = self.offset + if self._ase.encryption_metadata is not None: + # ensure start is AES block size aligned + range_start = range_start - (range_start % self._AES_BLOCKSIZE) - \ + self._AES_BLOCKSIZE + if range_start <= 0: + range_start = 0 + range_end = self.offset + num_bytes + self.offset += chunk + if (self._ase.encryption_metadata is not None and + self.offset >= self._ase.size): + unpad = True + else: + unpad = False + return DownloadOffsets( + fd_start=fd_start, + num_bytes=num_bytes, + range_start=range_start, + range_end=range_end, + unpad=unpad, + ) + class AzureDestinationPaths(object): def __init__(self): diff --git a/cli/cli.py b/cli/cli.py index 64be863..0c085c7 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -286,7 +286,7 @@ def _chunk_size_bytes_option(f): expose_value=False, type=int, default=4194304, - help='Chunk size in bytes [4194304]', + help='Block or chunk size in bytes [4194304]', callback=callback)(f) @@ -580,6 +580,7 @@ def download_options(f): f = _exclude_option(f) f = _endpoint_option(f) f = _delete_option(f) + f = _chunk_size_bytes_option(f) f = _access_key_option(f) return f @@ -596,6 +597,7 @@ def sync_copy_options(f): f = _include_option(f) f = _exclude_option(f) f = _endpoint_option(f) + f = _chunk_size_bytes_option(f) f = _access_key_option(f) return f diff --git a/cli/settings.py b/cli/settings.py index f567bea..448d0a8 100644 --- a/cli/settings.py +++ b/cli/settings.py @@ -117,6 +117,7 @@ def add_cli_options( 'exclude': cli_options['exclude'], 'options': { 'check_file_md5': cli_options['file_md5'], + 'chunk_size_bytes': cli_options['chunk_size_bytes'], 'delete_extraneous_destination': cli_options['delete'], 'mode': cli_options['mode'], 'overwrite': cli_options['overwrite'], @@ -148,6 +149,7 @@ def add_cli_options( 'include': cli_options['include'], 'exclude': cli_options['exclude'], 'options': { + 'chunk_size_bytes': cli_options['chunk_size_bytes'], 'mode': cli_options['mode'], 'overwrite': cli_options['overwrite'], 'skip_on': { @@ -279,6 +281,7 @@ def create_download_specifications(config): ds = blobxfer.models.DownloadSpecification( download_options=blobxfer.models.DownloadOptions( check_file_md5=conf['options']['check_file_md5'], + chunk_size_bytes=conf['options']['chunk_size_bytes'], delete_extraneous_destination=conf[ 'options']['delete_extraneous_destination'], mode=mode, diff --git a/tests/test_blobxfer_crypto_operations.py b/tests/test_blobxfer_crypto_operations.py index 1760701..a37be4f 100644 --- a/tests/test_blobxfer_crypto_operations.py +++ b/tests/test_blobxfer_crypto_operations.py @@ -42,3 +42,10 @@ def test_rsa_encrypt_decrypt_keys(): assert enckey is not None plainkey = ops.rsa_decrypt_base64_encoded_key(_RSAKEY, enckey) assert symkey == plainkey + + +def test_pkcs7_padding(): + buf = os.urandom(32) + pbuf = ops.pad_pkcs7(buf) + buf2 = ops.unpad_pkcs7(pbuf) + assert buf == buf2 diff --git a/tests/test_blobxfer_download.py b/tests/test_blobxfer_download.py index 56e8999..a80c629 100644 --- a/tests/test_blobxfer_download.py +++ b/tests/test_blobxfer_download.py @@ -29,6 +29,7 @@ def test_check_download_conditions(tmpdir): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=False, @@ -52,6 +53,7 @@ def test_check_download_conditions(tmpdir): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=True, @@ -73,6 +75,7 @@ def test_check_download_conditions(tmpdir): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=True, @@ -94,6 +97,7 @@ def test_check_download_conditions(tmpdir): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=True, @@ -123,6 +127,7 @@ def test_check_download_conditions(tmpdir): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=True, @@ -236,6 +241,7 @@ def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir): d._md5_check_thread = mock.MagicMock() d._spec.sources = [] d._spec.options = mock.MagicMock() + d._spec.options.chunk_size_bytes = 1 d._spec.options.mode = models.AzureStorageModes.Auto d._spec.options.overwrite = True d._spec.skip_on = mock.MagicMock() diff --git a/tests/test_blobxfer_models.py b/tests/test_blobxfer_models.py index 7b81332..3227250 100644 --- a/tests/test_blobxfer_models.py +++ b/tests/test_blobxfer_models.py @@ -279,6 +279,7 @@ def test_downloadspecification(): ds = models.DownloadSpecification( download_options=models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=models.AzureStorageModes.Auto, overwrite=True, @@ -341,45 +342,130 @@ def test_azurestorageentity(): assert ase.mode == models.AzureStorageModes.File -def test_azurestorageentity_prepare_for_download(tmpdir): - lp = pathlib.Path(str(tmpdir.join('a'))) - opts = mock.MagicMock() - opts.check_file_md5 = True - - ase = models.AzureStorageEntity('cont') - ase._size = 0 - ase.prepare_for_download(lp, opts) - - assert ase.download.hmac is None - assert ase.download.md5 is not None - assert ase.download.final_path == lp - assert ase.download.current_position == 0 - - ase._encryption = mock.MagicMock() - ase.prepare_for_download(lp, opts) - - assert ase.download.hmac is not None - assert ase.download.md5 is None - - def test_downloaddescriptor(tmpdir): lp = pathlib.Path(str(tmpdir.join('a'))) - d = models.DownloadDescriptor(lp, None, None) - assert d.current_position == 0 + + opts = mock.MagicMock() + opts.check_file_md5 = True + opts.chunk_size_bytes = 1 + ase = models.AzureStorageEntity('cont') + ase._size = 1024 + ase._encryption = mock.MagicMock() + d = models.DownloadDescriptor(lp, ase, opts) + + assert d.offset == 0 assert d.final_path == lp assert str(d.local_path) == str(lp) + '.bxtmp' - - d.allocate_disk_space(1024, True) assert d.local_path.stat().st_size == 1024 - 16 d.local_path.unlink() - d.allocate_disk_space(1, True) + ase._size = 1 + d._allocate_disk_space() assert d.local_path.stat().st_size == 0 d.local_path.unlink() - d.allocate_disk_space(1024, False) + ase._encryption = None + ase._size = 1024 + d._allocate_disk_space() assert d.local_path.stat().st_size == 1024 # pre-existing file check - d.allocate_disk_space(0, False) + ase._size = 0 + d._allocate_disk_space() assert d.local_path.stat().st_size == 0 + + +def test_downloaddescriptor_next_offsets(tmpdir): + lp = pathlib.Path(str(tmpdir.join('a'))) + + opts = mock.MagicMock() + opts.check_file_md5 = True + opts.chunk_size_bytes = 256 + ase = models.AzureStorageEntity('cont') + ase._size = 128 + d = models.DownloadDescriptor(lp, ase, opts) + + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 127 + assert offsets.range_start == 0 + assert offsets.range_end == 127 + assert not offsets.unpad + assert d.next_offsets() is None + + ase._size = 0 + d = models.DownloadDescriptor(lp, ase, opts) + assert d.next_offsets() is None + + ase._size = 1 + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 0 + assert offsets.range_start == 0 + assert offsets.range_end == 0 + assert not offsets.unpad + assert d.next_offsets() is None + + ase._size = 256 + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 255 + assert offsets.range_start == 0 + assert offsets.range_end == 255 + assert not offsets.unpad + assert d.next_offsets() is None + + ase._size = 256 + 16 + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 255 + assert offsets.range_start == 0 + assert offsets.range_end == 255 + assert not offsets.unpad + offsets = d.next_offsets() + assert offsets.fd_start == 256 + assert offsets.num_bytes == 15 + assert offsets.range_start == 256 + assert offsets.range_end == 256 + 15 + assert not offsets.unpad + assert d.next_offsets() is None + + ase._encryption = mock.MagicMock() + ase._size = 128 + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 127 + assert offsets.range_start == 0 + assert offsets.range_end == 127 + assert offsets.unpad + assert d.next_offsets() is None + + ase._size = 256 + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 255 + assert offsets.range_start == 0 + assert offsets.range_end == 255 + assert offsets.unpad + assert d.next_offsets() is None + + ase._size = 256 + 32 # 16 bytes over + padding + d = models.DownloadDescriptor(lp, ase, opts) + offsets = d.next_offsets() + assert offsets.fd_start == 0 + assert offsets.num_bytes == 255 + assert offsets.range_start == 0 + assert offsets.range_end == 255 + assert not offsets.unpad + offsets = d.next_offsets() + assert offsets.fd_start == 256 + assert offsets.num_bytes == 31 + assert offsets.range_start == 256 - 16 + assert offsets.range_end == 256 + 31 + assert offsets.unpad + assert d.next_offsets() is None diff --git a/tests/test_blobxfer_operations.py b/tests/test_blobxfer_operations.py index 9926bab..9b648f6 100644 --- a/tests/test_blobxfer_operations.py +++ b/tests/test_blobxfer_operations.py @@ -20,6 +20,7 @@ def test_ensure_local_destination(patched_blob, patched_file, tmpdir): ds = blobxfer.models.DownloadSpecification( download_options=blobxfer.models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=blobxfer.models.AzureStorageModes.Auto, overwrite=True, @@ -54,6 +55,7 @@ def test_ensure_local_destination(patched_blob, patched_file, tmpdir): ds = blobxfer.models.DownloadSpecification( download_options=blobxfer.models.DownloadOptions( check_file_md5=True, + chunk_size_bytes=4194304, delete_extraneous_destination=False, mode=blobxfer.models.AzureStorageModes.File, overwrite=True,