- Bind ASE to DownloadDescriptor instead
- Add chunk size option to download and synccopy
This commit is contained in:
Fred Park 2017-02-23 23:47:16 -08:00
Родитель 146fc24d35
Коммит e82890ada9
9 изменённых файлов: 282 добавлений и 74 удалений

Просмотреть файл

@ -128,3 +128,29 @@ def rsa_encrypt_key_base64_encoded(rsaprivatekey, rsapublickey, plainkey):
algorithm=cryptography.hazmat.primitives.hashes.SHA1(),
label=None))
return blobxfer.util.base64_encode_as_string(enckey)
def pad_pkcs7(buf):
# type: (bytes) -> bytes
"""Appends PKCS7 padding to an input buffer
:param bytes buf: buffer to add padding
:rtype: bytes
:return: buffer with PKCS7_PADDING
"""
padder = cryptography.hazmat.primitives.padding.PKCS7(
cryptography.hazmat.primitives.ciphers.
algorithms.AES.block_size).padder()
return padder.update(buf) + padder.finalize()
def unpad_pkcs7(buf):
# type: (bytes) -> bytes
"""Removes PKCS7 padding a decrypted object
:param bytes buf: buffer to remove padding
:rtype: bytes
:return: buffer without PKCS7_PADDING
"""
unpadder = cryptography.hazmat.primitives.padding.PKCS7(
cryptography.hazmat.primitives.ciphers.
algorithms.AES.block_size).unpadder()
return unpadder.update(buf) + unpadder.finalize()

Просмотреть файл

@ -47,6 +47,7 @@ import threading
import dateutil
# local imports
import blobxfer.md5
import blobxfer.models
import blobxfer.operations
import blobxfer.util
@ -211,9 +212,10 @@ class Downloader(object):
:param blobxfer.models.AzureStorageEntity rfile: remote file
"""
# prepare remote file for download
rfile.prepare_for_download(lpath, self._spec.options)
dd = blobxfer.models.DownloadDescriptor(
lpath, rfile, self._spec.options)
# add remote file to queue
self._download_queue.put(rfile)
self._download_queue.put(dd)
def _initialize_download_threads(self):
# type: (Downloader) -> None
@ -243,12 +245,34 @@ class Downloader(object):
if self._download_terminate:
break
try:
rfile = self._download_queue.get(False, 1)
dd = self._download_queue.get(False, 1)
except queue.Empty:
continue
# TODO
# get next offset with respect to chunk size
# get download offsets
# issue get range
# if encryption:
# 1. compute rolling hmac if present
# - roll through any subsequent unchecked parts
# 2. decrypt chunk
# compute rolling md5 if present
# - roll through any subsequent unchecked parts
# write data to disk
# if no integrity check could be performed due to current
# integrity offset mismatch, add to unchecked set
# check if last chunk to write
# 1. complete integrity checks
# 2. set file uid/gid
# 3. set file modes
# pickle dd to resume file
rfile = dd._ase
print('<<', rfile.container, rfile.name, rfile.lmt, rfile.size,
rfile.md5, rfile.mode, rfile.encryption_metadata)

Просмотреть файл

@ -100,6 +100,7 @@ UploadOptions = collections.namedtuple(
DownloadOptions = collections.namedtuple(
'DownloadOptions', [
'check_file_md5',
'chunk_size_bytes',
'delete_extraneous_destination',
'mode',
'overwrite',
@ -110,16 +111,24 @@ DownloadOptions = collections.namedtuple(
)
SyncCopyOptions = collections.namedtuple(
'SyncCopyOptions', [
'exclude',
'include',
'chunk_size_bytes',
'mode',
'overwrite',
'skip_on',
]
)
LocalPath = collections.namedtuple(
'LocalPath', [
'parent_path', 'relative_path'
'parent_path',
'relative_path',
]
)
DownloadOffsets = collections.namedtuple(
'DownloadOffsets', [
'fd_start',
'num_bytes',
'range_end',
'range_start',
'unpad',
]
)
@ -749,58 +758,60 @@ class AzureStorageEntity(object):
self._md5 = file.properties.content_settings.content_md5
self._mode = AzureStorageModes.File
def prepare_for_download(self, lpath, options):
# type: (AzureStorageEntity, pathlib.Path, DownloadOptions) -> None
"""Prepare entity for download
:param AzureStorageEntity self: this
:param pathlib.Path lpath: local path
:param DownloadOptions options: download options
"""
if self._encryption is not None:
hmac = self._encryption.initialize_hmac()
else:
hmac = None
if hmac is None and options.check_file_md5:
md5 = blobxfer.md5.new_md5_hasher()
else:
md5 = None
self.download = DownloadDescriptor(lpath, hmac, md5)
self.download.allocate_disk_space(
self._size, self._encryption is not None)
class DownloadDescriptor(object):
"""DownloadDescriptor"""
def __init__(self, lpath, hmac, md5):
# type: (DownloadDescriptior, pathlib.Path, hmac.HMAC, md5.MD5) -> None
"""Ctor for Download Descriptor
"""Download Descriptor"""
_AES_BLOCKSIZE = blobxfer.crypto.models._AES256_BLOCKSIZE_BYTES
def __init__(self, lpath, ase, options):
# type: (DownloadDescriptior, pathlib.Path, AzureStorageEntity,
# DownloadOptions) -> None
"""Ctor for DownloadDescriptor
:param DownloadDescriptor self: this
:param pathlib.Path lpath: local path
:param hmac.HMAC hmac: hmac
:param md5.MD5 md5: md5
:param AzureStorageEntity ase: Azure Storage Entity
:param DownloadOptions options: download options
"""
self.final_path = lpath
# create path holding the temporary file to download to
_tmp = list(lpath.parts[:-1])
_tmp.append(lpath.name + '.bxtmp')
self.local_path = pathlib.Path(*_tmp)
self.hmac = hmac
self.md5 = md5
self.current_position = 0
self._ase = ase
self._chunk_size = min((options.chunk_size_bytes, self._ase.size))
self.hmac = None
self.md5 = None
self.offset = 0
self.integrity_counter = 0
self.unchecked_chunks = set()
self._initialize_integrity_checkers(options)
self._allocate_disk_space()
def allocate_disk_space(self, size, encryption):
# type: (DownloadDescriptor, int, bool) -> None
"""Perform file allocation (possibly sparse), if encrypted this may
be an underallocation
def _initialize_integrity_checkers(self, options):
# type: (DownloadDescriptor, DownloadOptions) -> None
"""Initialize file integrity checkers
:param DownloadDescriptor self: this
:param DownloadOptions options: download options
"""
if self._ase.encryption_metadata is not None:
self.hmac = self._ase.encryption_metadata.initialize_hmac()
if self.hmac is None and options.check_file_md5:
self.md5 = blobxfer.md5.new_md5_hasher()
def _allocate_disk_space(self):
# type: (DownloadDescriptor, int) -> None
"""Perform file allocation (possibly sparse)
:param DownloadDescriptor self: this
:param int size: size
:param bool encryption: encryption enabled
"""
size = self._ase.size
# compute size
if size > 0:
if encryption:
allocatesize = size - \
blobxfer.crypto.models._AES256_BLOCKSIZE_BYTES
if self._ase.encryption_metadata is not None:
# cipher_len_without_iv = (clear_len / aes_bs + 1) * aes_bs
allocatesize = (size // self._AES_BLOCKSIZE - 1) * \
self._AES_BLOCKSIZE
else:
allocatesize = size
if allocatesize < 0:
@ -818,6 +829,47 @@ class DownloadDescriptor(object):
fd.seek(allocatesize - 1)
fd.write(b'\0')
def next_offsets(self):
# type: (DownloadDescriptor) -> DownloadOffsets
"""Retrieve the next offsets
:param DownloadDescriptor self: this
:rtype: DownloadOffsets
:return: download offsets
"""
if self.offset >= self._ase.size:
return None
if self.offset + self._chunk_size > self._ase.size:
chunk = self._ase.size - self.offset
else:
chunk = self._chunk_size
# on download, num_bytes must be offset by -1 as the x-ms-range
# header expects it that way. x -> y bytes means first bits of the
# (x+1)th byte to the last bits of the (y+1)th byte. for example,
# 0 -> 511 means byte 1 to byte 512
num_bytes = chunk - 1
fd_start = self.offset
range_start = self.offset
if self._ase.encryption_metadata is not None:
# ensure start is AES block size aligned
range_start = range_start - (range_start % self._AES_BLOCKSIZE) - \
self._AES_BLOCKSIZE
if range_start <= 0:
range_start = 0
range_end = self.offset + num_bytes
self.offset += chunk
if (self._ase.encryption_metadata is not None and
self.offset >= self._ase.size):
unpad = True
else:
unpad = False
return DownloadOffsets(
fd_start=fd_start,
num_bytes=num_bytes,
range_start=range_start,
range_end=range_end,
unpad=unpad,
)
class AzureDestinationPaths(object):
def __init__(self):

Просмотреть файл

@ -286,7 +286,7 @@ def _chunk_size_bytes_option(f):
expose_value=False,
type=int,
default=4194304,
help='Chunk size in bytes [4194304]',
help='Block or chunk size in bytes [4194304]',
callback=callback)(f)
@ -580,6 +580,7 @@ def download_options(f):
f = _exclude_option(f)
f = _endpoint_option(f)
f = _delete_option(f)
f = _chunk_size_bytes_option(f)
f = _access_key_option(f)
return f
@ -596,6 +597,7 @@ def sync_copy_options(f):
f = _include_option(f)
f = _exclude_option(f)
f = _endpoint_option(f)
f = _chunk_size_bytes_option(f)
f = _access_key_option(f)
return f

Просмотреть файл

@ -117,6 +117,7 @@ def add_cli_options(
'exclude': cli_options['exclude'],
'options': {
'check_file_md5': cli_options['file_md5'],
'chunk_size_bytes': cli_options['chunk_size_bytes'],
'delete_extraneous_destination': cli_options['delete'],
'mode': cli_options['mode'],
'overwrite': cli_options['overwrite'],
@ -148,6 +149,7 @@ def add_cli_options(
'include': cli_options['include'],
'exclude': cli_options['exclude'],
'options': {
'chunk_size_bytes': cli_options['chunk_size_bytes'],
'mode': cli_options['mode'],
'overwrite': cli_options['overwrite'],
'skip_on': {
@ -279,6 +281,7 @@ def create_download_specifications(config):
ds = blobxfer.models.DownloadSpecification(
download_options=blobxfer.models.DownloadOptions(
check_file_md5=conf['options']['check_file_md5'],
chunk_size_bytes=conf['options']['chunk_size_bytes'],
delete_extraneous_destination=conf[
'options']['delete_extraneous_destination'],
mode=mode,

Просмотреть файл

@ -42,3 +42,10 @@ def test_rsa_encrypt_decrypt_keys():
assert enckey is not None
plainkey = ops.rsa_decrypt_base64_encoded_key(_RSAKEY, enckey)
assert symkey == plainkey
def test_pkcs7_padding():
buf = os.urandom(32)
pbuf = ops.pad_pkcs7(buf)
buf2 = ops.unpad_pkcs7(pbuf)
assert buf == buf2

Просмотреть файл

@ -29,6 +29,7 @@ def test_check_download_conditions(tmpdir):
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=False,
@ -52,6 +53,7 @@ def test_check_download_conditions(tmpdir):
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=True,
@ -73,6 +75,7 @@ def test_check_download_conditions(tmpdir):
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=True,
@ -94,6 +97,7 @@ def test_check_download_conditions(tmpdir):
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=True,
@ -123,6 +127,7 @@ def test_check_download_conditions(tmpdir):
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=True,
@ -236,6 +241,7 @@ def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir):
d._md5_check_thread = mock.MagicMock()
d._spec.sources = []
d._spec.options = mock.MagicMock()
d._spec.options.chunk_size_bytes = 1
d._spec.options.mode = models.AzureStorageModes.Auto
d._spec.options.overwrite = True
d._spec.skip_on = mock.MagicMock()

Просмотреть файл

@ -279,6 +279,7 @@ def test_downloadspecification():
ds = models.DownloadSpecification(
download_options=models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=models.AzureStorageModes.Auto,
overwrite=True,
@ -341,45 +342,130 @@ def test_azurestorageentity():
assert ase.mode == models.AzureStorageModes.File
def test_azurestorageentity_prepare_for_download(tmpdir):
lp = pathlib.Path(str(tmpdir.join('a')))
opts = mock.MagicMock()
opts.check_file_md5 = True
ase = models.AzureStorageEntity('cont')
ase._size = 0
ase.prepare_for_download(lp, opts)
assert ase.download.hmac is None
assert ase.download.md5 is not None
assert ase.download.final_path == lp
assert ase.download.current_position == 0
ase._encryption = mock.MagicMock()
ase.prepare_for_download(lp, opts)
assert ase.download.hmac is not None
assert ase.download.md5 is None
def test_downloaddescriptor(tmpdir):
lp = pathlib.Path(str(tmpdir.join('a')))
d = models.DownloadDescriptor(lp, None, None)
assert d.current_position == 0
opts = mock.MagicMock()
opts.check_file_md5 = True
opts.chunk_size_bytes = 1
ase = models.AzureStorageEntity('cont')
ase._size = 1024
ase._encryption = mock.MagicMock()
d = models.DownloadDescriptor(lp, ase, opts)
assert d.offset == 0
assert d.final_path == lp
assert str(d.local_path) == str(lp) + '.bxtmp'
d.allocate_disk_space(1024, True)
assert d.local_path.stat().st_size == 1024 - 16
d.local_path.unlink()
d.allocate_disk_space(1, True)
ase._size = 1
d._allocate_disk_space()
assert d.local_path.stat().st_size == 0
d.local_path.unlink()
d.allocate_disk_space(1024, False)
ase._encryption = None
ase._size = 1024
d._allocate_disk_space()
assert d.local_path.stat().st_size == 1024
# pre-existing file check
d.allocate_disk_space(0, False)
ase._size = 0
d._allocate_disk_space()
assert d.local_path.stat().st_size == 0
def test_downloaddescriptor_next_offsets(tmpdir):
lp = pathlib.Path(str(tmpdir.join('a')))
opts = mock.MagicMock()
opts.check_file_md5 = True
opts.chunk_size_bytes = 256
ase = models.AzureStorageEntity('cont')
ase._size = 128
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 127
assert offsets.range_start == 0
assert offsets.range_end == 127
assert not offsets.unpad
assert d.next_offsets() is None
ase._size = 0
d = models.DownloadDescriptor(lp, ase, opts)
assert d.next_offsets() is None
ase._size = 1
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 0
assert offsets.range_start == 0
assert offsets.range_end == 0
assert not offsets.unpad
assert d.next_offsets() is None
ase._size = 256
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 255
assert offsets.range_start == 0
assert offsets.range_end == 255
assert not offsets.unpad
assert d.next_offsets() is None
ase._size = 256 + 16
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 255
assert offsets.range_start == 0
assert offsets.range_end == 255
assert not offsets.unpad
offsets = d.next_offsets()
assert offsets.fd_start == 256
assert offsets.num_bytes == 15
assert offsets.range_start == 256
assert offsets.range_end == 256 + 15
assert not offsets.unpad
assert d.next_offsets() is None
ase._encryption = mock.MagicMock()
ase._size = 128
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 127
assert offsets.range_start == 0
assert offsets.range_end == 127
assert offsets.unpad
assert d.next_offsets() is None
ase._size = 256
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 255
assert offsets.range_start == 0
assert offsets.range_end == 255
assert offsets.unpad
assert d.next_offsets() is None
ase._size = 256 + 32 # 16 bytes over + padding
d = models.DownloadDescriptor(lp, ase, opts)
offsets = d.next_offsets()
assert offsets.fd_start == 0
assert offsets.num_bytes == 255
assert offsets.range_start == 0
assert offsets.range_end == 255
assert not offsets.unpad
offsets = d.next_offsets()
assert offsets.fd_start == 256
assert offsets.num_bytes == 31
assert offsets.range_start == 256 - 16
assert offsets.range_end == 256 + 31
assert offsets.unpad
assert d.next_offsets() is None

Просмотреть файл

@ -20,6 +20,7 @@ def test_ensure_local_destination(patched_blob, patched_file, tmpdir):
ds = blobxfer.models.DownloadSpecification(
download_options=blobxfer.models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=blobxfer.models.AzureStorageModes.Auto,
overwrite=True,
@ -54,6 +55,7 @@ def test_ensure_local_destination(patched_blob, patched_file, tmpdir):
ds = blobxfer.models.DownloadSpecification(
download_options=blobxfer.models.DownloadOptions(
check_file_md5=True,
chunk_size_bytes=4194304,
delete_extraneous_destination=False,
mode=blobxfer.models.AzureStorageModes.File,
overwrite=True,