- Fix various issues uncovered from UTs
This commit is contained in:
Fred Park 2017-05-31 21:01:37 -07:00
Родитель a04a5724bc
Коммит e308ed8595
25 изменённых файлов: 403 добавлений и 246 удалений

Просмотреть файл

@ -60,17 +60,6 @@ class _BaseSourcePaths(object):
""" """
return self._paths return self._paths
def add_include(self, incl):
# type: (_BaseSourcePaths, str) -> None
"""Add an include
:param _BaseSourcePaths self: this
:param str incl: include filter
"""
if self._include is None:
self._include = list(incl)
else:
self._include.append(incl)
def add_includes(self, includes): def add_includes(self, includes):
# type: (_BaseSourcePaths, list) -> None # type: (_BaseSourcePaths, list) -> None
"""Add a list of includes """Add a list of includes
@ -78,23 +67,12 @@ class _BaseSourcePaths(object):
:param list includes: list of includes :param list includes: list of includes
""" """
if not isinstance(includes, list): if not isinstance(includes, list):
includes = list(includes) includes = [includes]
if self._include is None: if self._include is None:
self._include = includes self._include = includes
else: else:
self._include.extend(includes) self._include.extend(includes)
def add_exclude(self, excl):
# type: (_BaseSourcePaths, str) -> None
"""Add an exclude
:param _BaseSourcePaths self: this
:param str excl: exclude filter
"""
if self._exclude is None:
self._exclude = list(excl)
else:
self._exclude.append(excl)
def add_excludes(self, excludes): def add_excludes(self, excludes):
# type: (_BaseSourcePaths, list) -> None # type: (_BaseSourcePaths, list) -> None
"""Add a list of excludes """Add a list of excludes
@ -102,7 +80,7 @@ class _BaseSourcePaths(object):
:param list excludes: list of excludes :param list excludes: list of excludes
""" """
if not isinstance(excludes, list): if not isinstance(excludes, list):
excludes = list(excludes) excludes = [excludes]
if self._exclude is None: if self._exclude is None:
self._exclude = excludes self._exclude = excludes
else: else:

Просмотреть файл

@ -345,7 +345,7 @@ class Descriptor(object):
fd_start=0, fd_start=0,
fd_end=slicesize, fd_end=slicesize,
) )
total_size = ase.size total_size = slicesize
else: else:
view = LocalPathView( view = LocalPathView(
fd_start=ase.vectored_io.offset_start, fd_start=ase.vectored_io.offset_start,
@ -529,7 +529,7 @@ class Descriptor(object):
pass pass
# iterate unchecked chunks and delete # iterate unchecked chunks and delete
for key in self._unchecked_chunks: for key in self._unchecked_chunks:
ucc = self._unchecked_chunks[key] ucc = self._unchecked_chunks[key]['ucc']
if ucc.temp: if ucc.temp:
try: try:
ucc.file_path.unlink() ucc.file_path.unlink()

Просмотреть файл

@ -122,24 +122,24 @@ class Concurrency(object):
if self.crypto_processes is None or self.crypto_processes < 1: if self.crypto_processes is None or self.crypto_processes < 1:
self.crypto_processes = 0 self.crypto_processes = 0
if self.md5_processes is None or self.md5_processes < 1: if self.md5_processes is None or self.md5_processes < 1:
self.md5_processes = multiprocessing.cpu_count() // 2 self.md5_processes = multiprocessing.cpu_count() >> 1
if self.md5_processes < 1: if self.md5_processes < 1:
self.md5_processes = 1 self.md5_processes = 1
auto_disk = False auto_disk = False
if self.disk_threads is None or self.disk_threads < 1: if self.disk_threads is None or self.disk_threads < 1:
self.disk_threads = multiprocessing.cpu_count() * 4 self.disk_threads = multiprocessing.cpu_count() << 1
# cap maximum number of disk threads from cpu count to 96 # cap maximum number of disk threads from cpu count to 64
if self.disk_threads > 96: if self.disk_threads > 64:
self.transfer_threads = 96 self.disk_threads = 64
auto_disk = True auto_disk = True
if self.transfer_threads is None or self.transfer_threads < 1: if self.transfer_threads is None or self.transfer_threads < 1:
if auto_disk: if auto_disk:
self.transfer_threads = self.disk_threads << 1 self.transfer_threads = self.disk_threads << 1
else: else:
self.transfer_threads = multiprocessing.cpu_count() * 2 self.transfer_threads = multiprocessing.cpu_count() << 2
# cap maximum number of threads from cpu count to 64 # cap maximum number of threads from cpu count to 96
if self.transfer_threads > 64: if self.transfer_threads > 96:
self.transfer_threads = 64 self.transfer_threads = 96
class General(object): class General(object):

Просмотреть файл

@ -287,7 +287,7 @@ class SourcePath(blobxfer.models._BaseSourcePaths):
encryption_metadata_exists(entity.metadata): encryption_metadata_exists(entity.metadata):
ed = blobxfer.models.crypto.EncryptionMetadata() ed = blobxfer.models.crypto.EncryptionMetadata()
ed.convert_from_json( ed.convert_from_json(
entity.metadata, file.name, options.rsa_private_key) entity.metadata, entity.name, options.rsa_private_key)
else: else:
ed = None ed = None
ase = blobxfer.models.azure.StorageEntity(container, ed) ase = blobxfer.models.azure.StorageEntity(container, ed)

Просмотреть файл

@ -298,6 +298,7 @@ class Downloader(object):
convert_vectored_io_slice_to_final_path_name(lpath, rfile) convert_vectored_io_slice_to_final_path_name(lpath, rfile)
) )
else: else:
view = None
fpath = slpath fpath = slpath
self._md5_offload.add_localfile_for_md5_check( self._md5_offload.add_localfile_for_md5_check(
key, slpath, fpath, md5, rfile.mode, view) key, slpath, fpath, md5, rfile.mode, view)
@ -453,11 +454,10 @@ class Downloader(object):
"""Worker thread download """Worker thread download
:param Downloader self: this :param Downloader self: this
""" """
max_set_len = self._general_options.concurrency.disk_threads << 2
while not self.termination_check: while not self.termination_check:
try: try:
if (len(self._disk_set) > if len(self._disk_set) > max_set_len:
self._general_options.concurrency.
disk_threads * 4):
time.sleep(0.2) time.sleep(0.2)
continue continue
else: else:

Просмотреть файл

@ -39,6 +39,8 @@ import azure.storage
import cryptography import cryptography
import requests import requests
# local imports # local imports
import blobxfer.models.download
import blobxfer.models.upload
import blobxfer.util import blobxfer.util
import blobxfer.version import blobxfer.version
@ -158,7 +160,7 @@ def output_parameters(general_options, spec):
spec.skip_on.filesize_match, spec.skip_on.filesize_match,
spec.skip_on.lmt_ge, spec.skip_on.lmt_ge,
spec.skip_on.md5_match)) spec.skip_on.md5_match))
log.append(' chunk size: {} bytes'.format( log.append(' chunk size bytes: {}'.format(
spec.options.chunk_size_bytes)) spec.options.chunk_size_bytes))
log.append(' delete extraneous: {}'.format( log.append(' delete extraneous: {}'.format(
spec.options.delete_extraneous_destination)) spec.options.delete_extraneous_destination))

Просмотреть файл

@ -44,7 +44,8 @@ import blobxfer.util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _BaseResumeManager(): class _BaseResumeManager(object):
"""Base Resume Manager"""
def __init__(self, resume_file): def __init__(self, resume_file):
# type: (_BaseResumeManager, str) -> None # type: (_BaseResumeManager, str) -> None
"""Ctor for _BaseResumeManager """Ctor for _BaseResumeManager
@ -99,18 +100,21 @@ class _BaseResumeManager():
:rtype: str :rtype: str
:return: record key :return: record key
""" """
return '{}:{}'.format(ase._client.primary_endpoint, ase.path) key = '{}:{}'.format(ase._client.primary_endpoint, ase.path)
if blobxfer.util.on_python2():
return key.encode('utf8')
else:
return key
def get_record(self, ase, key=None, lock=True): def get_record(self, ase, key=None, lock=True):
# type: (_BaseResumeManager, str, # type: (_BaseResumeManager, str, bool) -> object
# bool) -> object
"""Get a resume record """Get a resume record
:param _BaseResumeManager self: this :param _BaseResumeManager self: this
:param blobxfer.models.azure.StorageEntity ase: Storage Entity :param blobxfer.models.azure.StorageEntity ase: Storage Entity
:param str key: record key :param str key: record key
:param bool lock: acquire lock :param bool lock: acquire lock
:rtype: blobxfer.models.resume._Base :rtype: object
:return: _Base record :return: resume record object
""" """
if key is None: if key is None:
key = blobxfer.operations.resume._BaseResumeManager.\ key = blobxfer.operations.resume._BaseResumeManager.\

Просмотреть файл

@ -446,11 +446,10 @@ class Uploader(object):
"""Worker thread upload """Worker thread upload
:param Uploader self: this :param Uploader self: this
""" """
max_set_len = self._general_options.concurrency.transfer_threads << 2
while not self.termination_check: while not self.termination_check:
try: try:
if (len(self._transfer_set) > if len(self._transfer_set) > max_set_len:
self._general_options.concurrency.
transfer_threads * 4):
time.sleep(0.2) time.sleep(0.2)
continue continue
else: else:

Просмотреть файл

@ -37,23 +37,34 @@ import azure.storage.retry
class ExponentialRetryWithMaxWait(azure.storage.retry._Retry): class ExponentialRetryWithMaxWait(azure.storage.retry._Retry):
"""Exponential Retry with Max Wait (infinite retries)""" """Exponential Retry with Max Wait (infinite retries)"""
def __init__(self, initial_backoff=0.1, max_backoff=2, reset_at_max=True): def __init__(
# type: (ExponentialRetryWithMaxWait, int, int, bool) -> None self, initial_backoff=0.1, max_backoff=1, max_retries=None,
reset_at_max=True):
# type: (ExponentialRetryWithMaxWait, int, int, int, bool) -> None
"""Ctor for ExponentialRetryWithMaxWait """Ctor for ExponentialRetryWithMaxWait
:param ExponentialRetryWithMaxWait self: this :param ExponentialRetryWithMaxWait self: this
:param int initial_backoff: initial backoff :param int initial_backoff: initial backoff
:param int max_backoff: max backoff :param int max_backoff: max backoff
:param int max_retries: max retries
:param bool reset_at_max: reset after reaching max wait :param bool reset_at_max: reset after reaching max wait
""" """
if max_backoff <= 0:
raise ValueError(
'max backoff is non-positive: {}'.format(max_backoff))
if max_retries is not None and max_retries < 0:
raise ValueError(
'max retries is invalid: {}'.format(max_retries))
if max_backoff < initial_backoff: if max_backoff < initial_backoff:
raise ValueError( raise ValueError(
'max backoff {} less than initial backoff {}'.format( 'max backoff {} less than initial backoff {}'.format(
max_backoff, initial_backoff)) max_backoff, initial_backoff))
self._backoff_count = 0
self._last_backoff = initial_backoff
self.initial_backoff = initial_backoff self.initial_backoff = initial_backoff
self.max_backoff = max_backoff self.max_backoff = max_backoff
self.reset_at_max = reset_at_max self.reset_at_max = reset_at_max
super(ExponentialRetryWithMaxWait, self).__init__( super(ExponentialRetryWithMaxWait, self).__init__(
max_backoff if self.reset_at_max else 2147483647, False) max_retries if max_retries is not None else 2147483647, False)
def retry(self, context): def retry(self, context):
# type: (ExponentialRetryWithMaxWait, # type: (ExponentialRetryWithMaxWait,
@ -75,11 +86,12 @@ class ExponentialRetryWithMaxWait(azure.storage.retry._Retry):
:rtype: int :rtype: int
:return: backoff amount :return: backoff amount
""" """
if context.count == 1: self._backoff_count += 1
backoff = self.initial_backoff if self._backoff_count == 1:
self._last_backoff = self.initial_backoff
else: else:
backoff = self.initial_backoff * (context.count - 1) self._last_backoff *= 2
if backoff > self.max_backoff and self.reset_at_max: if self._last_backoff > self.max_backoff and self.reset_at_max:
backoff = self.initial_backoff self._backoff_count = 1
context.count = 1 self._last_backoff = self.initial_backoff
return backoff return self._last_backoff

Просмотреть файл

@ -65,7 +65,7 @@ def on_python2():
return future.utils.PY2 return future.utils.PY2
def on_windows(): def on_windows(): # noqa
# type: (None) -> bool # type: (None) -> bool
"""Execution on Windows """Execution on Windows
:rtype: bool :rtype: bool

Просмотреть файл

@ -41,7 +41,11 @@ import ruamel.yaml
import blobxfer.api import blobxfer.api
import blobxfer.util import blobxfer.util
# local imports # local imports
from . import settings try:
from . import settings
except (SystemError, ImportError): # noqa
# for local testing
import settings
# create logger # create logger
logger = logging.getLogger('blobxfer') logger = logging.getLogger('blobxfer')

Просмотреть файл

@ -1,5 +1,5 @@
flake8>=3.3.0 flake8>=3.3.0
mock>=2.0.0; python_version < '3.3' mock>=2.0.0; python_version < '3.3'
pypandoc>=1.3.3 pypandoc>=1.4
pytest>=3.0.7 pytest>=3.1.1
pytest-cov>=2.4.0 pytest-cov>=2.5.1

Просмотреть файл

@ -49,6 +49,6 @@ def test_azurestorageentity():
assert ase.snapshot is not None assert ase.snapshot is not None
blob.snapshot = None blob.snapshot = None
ase.populate_from_file(mock.MagicMock(), blob) ase.populate_from_file(mock.MagicMock(), blob, 'path')
assert ase.mode == azmodels.StorageModes.File assert ase.mode == azmodels.StorageModes.File
assert ase.snapshot is None assert ase.snapshot is None

Просмотреть файл

@ -110,32 +110,33 @@ def test_downloaddescriptor(tmpdir):
d._allocate_disk_space() d._allocate_disk_space()
assert d.entity == ase assert d.entity == ase
assert d.entity.is_encrypted
assert not d.must_compute_md5 assert not d.must_compute_md5
assert d.hmac is not None
assert d._total_chunks == 64 assert d._total_chunks == 64
assert d._offset == 0 assert d._offset == 0
assert d.final_path == lp assert d.final_path == lp
assert str(d.local_path) == str(lp) + '.bxtmp'
assert d._allocated assert d._allocated
assert d.local_path.stat().st_size == 1024 - 16 assert d.final_path.stat().st_size == ase._size - 16
d._allocate_disk_space() d._allocate_disk_space()
assert d._allocated assert d._allocated
d.local_path.unlink() d.final_path.unlink()
ase._size = 1 ase._size = 32
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
assert d._total_chunks == 1 assert d._total_chunks == 2
assert d._allocated assert d._allocated
assert d.local_path.stat().st_size == 0 assert d.final_path.stat().st_size == ase._size - 16
d.local_path.unlink() d.final_path.unlink()
ase._encryption = None ase._encryption = None
ase._size = 1024 ase._size = 1024
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
assert d._allocated assert d._allocated
assert d.local_path.stat().st_size == 1024 assert d.final_path.stat().st_size == ase._size
# pre-existing file check # pre-existing file check
ase._size = 0 ase._size = 0
@ -143,13 +144,12 @@ def test_downloaddescriptor(tmpdir):
d._allocate_disk_space() d._allocate_disk_space()
assert d._total_chunks == 0 assert d._total_chunks == 0
assert d._allocated assert d._allocated
assert d.local_path.stat().st_size == 0 assert d.final_path.stat().st_size == ase._size
@unittest.skipIf(util.on_python2(), 'fallocate does not exist') @unittest.skipIf(util.on_python2(), 'fallocate does not exist')
def test_downloaddescriptor_allocate_disk_space_via_seek(tmpdir): def test_downloaddescriptor_allocate_disk_space_via_seek(tmpdir):
fp = pathlib.Path(str(tmpdir.join('fp'))) fp = pathlib.Path(str(tmpdir.join('fp')))
lp = pathlib.Path(str(tmpdir.join('fp.bxtmp')))
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = True opts.check_file_md5 = True
opts.chunk_size_bytes = 256 opts.chunk_size_bytes = 256
@ -162,14 +162,13 @@ def test_downloaddescriptor_allocate_disk_space_via_seek(tmpdir):
patched_fallocate.side_effect = [AttributeError()] patched_fallocate.side_effect = [AttributeError()]
d._allocate_disk_space() d._allocate_disk_space()
assert d._allocated assert d._allocated
assert not fp.exists() assert fp.exists()
assert lp.stat().st_size == ase._size assert fp.stat().st_size == ase._size
def test_downloaddescriptor_resume(tmpdir): def test_downloaddescriptor_resume(tmpdir):
resumefile = pathlib.Path(str(tmpdir.join('resume'))) resumefile = pathlib.Path(str(tmpdir.join('resume')))
fp = pathlib.Path(str(tmpdir.join('fp'))) fp = pathlib.Path(str(tmpdir.join('fp')))
lp = pathlib.Path(str(tmpdir.join('fp.bxtmp')))
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = True opts.check_file_md5 = True
@ -177,6 +176,7 @@ def test_downloaddescriptor_resume(tmpdir):
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 128 ase._size = 128
ase._name = 'blob' ase._name = 'blob'
ase._client = mock.MagicMock()
# test no record # test no record
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
@ -185,7 +185,7 @@ def test_downloaddescriptor_resume(tmpdir):
assert rb is None assert rb is None
# test length mismatch # test length mismatch
rmgr.add_or_update_record(str(fp), str(lp), 127, 0, 0, False, None) rmgr.add_or_update_record(str(fp), ase, 0, 0, False, None)
rb = d._resume() rb = d._resume()
assert rb is None assert rb is None
@ -193,7 +193,7 @@ def test_downloaddescriptor_resume(tmpdir):
rmgr.delete() rmgr.delete()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
rmgr.add_or_update_record(str(fp), str(lp), ase._size, 0, 0, False, None) rmgr.add_or_update_record(str(fp), ase, 0, 0, False, None)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
assert rb is None assert rb is None
@ -202,7 +202,7 @@ def test_downloaddescriptor_resume(tmpdir):
rmgr.delete() rmgr.delete()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, True, None) rmgr.add_or_update_record(str(fp), ase, 32, 1, True, None)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
fp.touch() fp.touch()
rb = d._resume() rb = d._resume()
@ -215,22 +215,23 @@ def test_downloaddescriptor_resume(tmpdir):
ase._encryption = mock.MagicMock() ase._encryption = mock.MagicMock()
ase._encryption.symmetric_key = b'123' ase._encryption.symmetric_key = b'123'
rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None) rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
assert rb is None assert rb is None
# test if intermediate file not exists # test up to chunk
rmgr.delete() rmgr.delete()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 128 ase._size = 128
ase._name = 'blob' ase._name = 'blob'
ase._client = mock.MagicMock()
rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None) rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
assert rb is None assert rb == 32
# ensure hmac not populated # ensure hmac not populated
rmgr.delete() rmgr.delete()
@ -238,9 +239,10 @@ def test_downloaddescriptor_resume(tmpdir):
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 128 ase._size = 128
ase._name = 'blob' ase._name = 'blob'
lp.touch() ase._client = mock.MagicMock()
fp.touch()
rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None) rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
d.hmac = True d.hmac = True
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
@ -251,13 +253,12 @@ def test_downloaddescriptor_resume(tmpdir):
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
data = os.urandom(32) data = os.urandom(32)
with lp.open('wb') as f: with fp.open('wb') as f:
f.write(data) f.write(data)
md5 = util.new_md5_hasher() md5 = util.new_md5_hasher()
md5.update(data) md5.update(data)
rmgr.add_or_update_record( rmgr.add_or_update_record(str(fp), ase, 32, 1, False, md5.hexdigest())
str(fp), str(lp), ase._size, 32, 1, False, md5.hexdigest())
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
assert rb == 32 assert rb == 32
@ -265,8 +266,7 @@ def test_downloaddescriptor_resume(tmpdir):
# md5 hash mismatch # md5 hash mismatch
rmgr.delete() rmgr.delete()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
rmgr.add_or_update_record( rmgr.add_or_update_record(str(fp), ase, 32, 1, False, 'abc')
str(fp), str(lp), ase._size, 32, 1, False, 'abc')
ase._md5 = 'abc' ase._md5 = 'abc'
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
@ -278,10 +278,10 @@ def test_downloaddescriptor_resume(tmpdir):
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 128 ase._size = 128
ase._name = 'blob' ase._name = 'blob'
ase._client = mock.MagicMock()
ase._mode = azmodels.StorageModes.Page ase._mode = azmodels.StorageModes.Page
rmgr.add_or_update_record( rmgr.add_or_update_record(str(fp), ase, 32, 1, False, md5.hexdigest())
str(fp), str(lp), ase._size, 32, 1, False, md5.hexdigest())
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
rb = d._resume() rb = d._resume()
assert rb == 32 assert rb == 32
@ -443,10 +443,11 @@ def test_write_unchecked_data(tmpdir):
assert offsets.chunk_num in d._unchecked_chunks assert offsets.chunk_num in d._unchecked_chunks
ucc = d._unchecked_chunks[offsets.chunk_num] ucc = d._unchecked_chunks[offsets.chunk_num]
assert ucc.data_len == ase._size assert ucc['ucc'].data_len == ase._size
assert ucc.fd_start == offsets.fd_start assert ucc['ucc'].fd_start == offsets.fd_start
assert ucc.file_path == d.local_path assert ucc['ucc'].file_path == d.final_path
assert not ucc.temp assert not ucc['ucc'].temp
assert ucc['decrypted']
def test_write_unchecked_hmac_data(tmpdir): def test_write_unchecked_hmac_data(tmpdir):
@ -464,10 +465,11 @@ def test_write_unchecked_hmac_data(tmpdir):
assert offsets.chunk_num in d._unchecked_chunks assert offsets.chunk_num in d._unchecked_chunks
ucc = d._unchecked_chunks[offsets.chunk_num] ucc = d._unchecked_chunks[offsets.chunk_num]
assert ucc.data_len == ase._size assert ucc['ucc'].data_len == ase._size
assert ucc.fd_start == offsets.fd_start assert ucc['ucc'].fd_start == offsets.fd_start
assert ucc.file_path != d.local_path assert ucc['ucc'].file_path != d.final_path
assert ucc.temp assert ucc['ucc'].temp
assert not ucc['decrypted']
def test_perform_chunked_integrity_check(tmpdir): def test_perform_chunked_integrity_check(tmpdir):
@ -505,10 +507,12 @@ def test_perform_chunked_integrity_check(tmpdir):
offsets1, _ = d.next_offsets() offsets1, _ = d.next_offsets()
d.write_unchecked_hmac_data(offsets1, data) d.write_unchecked_hmac_data(offsets1, data)
ucc1 = d._unchecked_chunks[offsets1.chunk_num] ucc1 = d._unchecked_chunks[offsets1.chunk_num]
ucc['decrypted'] = True
ucc1['decrypted'] = True
d.perform_chunked_integrity_check() d.perform_chunked_integrity_check()
assert not ucc.file_path.exists() assert ucc['ucc'].file_path != d.final_path
assert not ucc1.file_path.exists() assert ucc1['ucc'].file_path != d.final_path
assert d._next_integrity_chunk == 2 assert d._next_integrity_chunk == 2
assert 0 not in d._unchecked_chunks assert 0 not in d._unchecked_chunks
assert 1 not in d._unchecked_chunks assert 1 not in d._unchecked_chunks
@ -529,6 +533,7 @@ def test_perform_chunked_integrity_check(tmpdir):
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 32 ase._size = 32
ase._name = 'blob' ase._name = 'blob'
ase._client = mock.MagicMock()
ase._md5 = md5.hexdigest() ase._md5 = md5.hexdigest()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
@ -539,7 +544,7 @@ def test_perform_chunked_integrity_check(tmpdir):
d.perform_chunked_integrity_check() d.perform_chunked_integrity_check()
assert d._next_integrity_chunk == 1 assert d._next_integrity_chunk == 1
assert len(d._unchecked_chunks) == 0 assert len(d._unchecked_chunks) == 0
dr = rmgr.get_record(str(fp)) dr = rmgr.get_record(ase)
assert dr.next_integrity_chunk == 1 assert dr.next_integrity_chunk == 1
assert dr.md5hexdigest == md5.hexdigest() assert dr.md5hexdigest == md5.hexdigest()
@ -553,11 +558,12 @@ def test_update_resume_for_completed(tmpdir):
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 32 ase._size = 32
ase._name = 'blob' ase._name = 'blob'
ase._client = mock.MagicMock()
rmgr = rops.DownloadResumeManager(resumefile) rmgr = rops.DownloadResumeManager(resumefile)
d = models.Descriptor(fp, ase, opts, rmgr) d = models.Descriptor(fp, ase, opts, rmgr)
offsets, _ = d.next_offsets() offsets, _ = d.next_offsets()
d._update_resume_for_completed() d._update_resume_for_completed()
dr = rmgr.get_record(str(fp)) dr = rmgr.get_record(ase)
assert dr.completed assert dr.completed
@ -575,8 +581,8 @@ def test_cleanup_all_temporary_files(tmpdir):
d.write_unchecked_data(offsets, data) d.write_unchecked_data(offsets, data)
assert len(d._unchecked_chunks) == 1 assert len(d._unchecked_chunks) == 1
d.cleanup_all_temporary_files() d.cleanup_all_temporary_files()
assert not d.local_path.exists() assert not d.final_path.exists()
assert not d._unchecked_chunks[0].file_path.exists() assert not d._unchecked_chunks[0]['ucc'].file_path.exists()
lp = pathlib.Path(str(tmpdir.join('b'))) lp = pathlib.Path(str(tmpdir.join('b')))
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
@ -585,11 +591,10 @@ def test_cleanup_all_temporary_files(tmpdir):
data = b'0' * opts.chunk_size_bytes data = b'0' * opts.chunk_size_bytes
d.write_unchecked_hmac_data(offsets, data) d.write_unchecked_hmac_data(offsets, data)
assert len(d._unchecked_chunks) == 1 assert len(d._unchecked_chunks) == 1
d.local_path.unlink() d._unchecked_chunks[0]['ucc'].file_path.unlink()
d._unchecked_chunks[0].file_path.unlink()
d.cleanup_all_temporary_files() d.cleanup_all_temporary_files()
assert not d.local_path.exists() assert not d.final_path.exists()
assert not d._unchecked_chunks[0].file_path.exists() assert not d._unchecked_chunks[0]['ucc'].file_path.exists()
def test_write_data(tmpdir): def test_write_data(tmpdir):
@ -606,11 +611,11 @@ def test_write_data(tmpdir):
data = b'0' * ase._size data = b'0' * ase._size
d.write_data(offsets, data) d.write_data(offsets, data)
assert d.local_path.exists() assert d.final_path.exists()
assert d.local_path.stat().st_size == len(data) assert d.final_path.stat().st_size == len(data)
def test_finalize_file(tmpdir): def test_finalize_integrity_and_file(tmpdir):
# already finalized # already finalized
lp = pathlib.Path(str(tmpdir.join('af'))) lp = pathlib.Path(str(tmpdir.join('af')))
opts = mock.MagicMock() opts = mock.MagicMock()
@ -624,11 +629,12 @@ def test_finalize_file(tmpdir):
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
d._finalized = True d._finalized = True
d.finalize_integrity()
d.finalize_file() d.finalize_file()
assert d.local_path.exists() assert d.final_path.exists()
assert not d.final_path.exists() assert d.final_path.stat().st_size == ase._size
d.local_path.unlink() d.final_path.unlink()
# hmac check success # hmac check success
lp = pathlib.Path(str(tmpdir.join('a'))) lp = pathlib.Path(str(tmpdir.join('a')))
@ -654,9 +660,9 @@ def test_finalize_file(tmpdir):
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
d.hmac.update(data) d.hmac.update(data)
d.finalize_integrity()
d.finalize_file() d.finalize_file()
assert not d.local_path.exists()
assert d.final_path.exists() assert d.final_path.exists()
assert d.final_path.stat().st_size == len(data) assert d.final_path.stat().st_size == len(data)
@ -676,9 +682,9 @@ def test_finalize_file(tmpdir):
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
d.md5.update(data) d.md5.update(data)
d.finalize_integrity()
d.finalize_file() d.finalize_file()
assert not d.local_path.exists()
assert d.final_path.exists() assert d.final_path.exists()
assert d.final_path.stat().st_size == len(data) assert d.final_path.stat().st_size == len(data)
@ -694,9 +700,9 @@ def test_finalize_file(tmpdir):
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
d.finalize_integrity()
d.finalize_file() d.finalize_file()
assert not d.local_path.exists()
assert d.final_path.exists() assert d.final_path.exists()
assert d.final_path.stat().st_size == len(data) assert d.final_path.stat().st_size == len(data)
@ -714,9 +720,9 @@ def test_finalize_file(tmpdir):
d = models.Descriptor(lp, ase, opts, None) d = models.Descriptor(lp, ase, opts, None)
d._allocate_disk_space() d._allocate_disk_space()
d.md5.update(data) d.md5.update(data)
d.finalize_integrity()
d.finalize_file() d.finalize_file()
assert not d.local_path.exists()
assert not d.final_path.exists() assert not d.final_path.exists()

Просмотреть файл

@ -21,22 +21,38 @@ def test_concurrency_options(patched_cc):
a = options.Concurrency( a = options.Concurrency(
crypto_processes=-1, crypto_processes=-1,
md5_processes=0, md5_processes=0,
disk_threads=-1,
transfer_threads=-2, transfer_threads=-2,
) )
assert a.crypto_processes == 0 assert a.crypto_processes == 0
assert a.md5_processes == 1 assert a.md5_processes == 1
assert a.disk_threads == 2
assert a.transfer_threads == 4
a = options.Concurrency(
crypto_processes=-1,
md5_processes=0,
disk_threads=1,
transfer_threads=-1,
)
assert a.crypto_processes == 0
assert a.md5_processes == 1
assert a.disk_threads == 1
assert a.transfer_threads == 4 assert a.transfer_threads == 4
@mock.patch('multiprocessing.cpu_count', return_value=64) @mock.patch('multiprocessing.cpu_count', return_value=64)
def test_concurrency_options_max_transfer_threads(patched_cc): def test_concurrency_options_max_disk_and_transfer_threads(patched_cc):
a = options.Concurrency( a = options.Concurrency(
crypto_processes=1, crypto_processes=1,
md5_processes=1, md5_processes=1,
disk_threads=None,
transfer_threads=None, transfer_threads=None,
) )
assert a.disk_threads == 64
assert a.transfer_threads == 96 assert a.transfer_threads == 96
@ -45,7 +61,8 @@ def test_general_options():
concurrency=options.Concurrency( concurrency=options.Concurrency(
crypto_processes=1, crypto_processes=1,
md5_processes=2, md5_processes=2,
transfer_threads=3, disk_threads=3,
transfer_threads=4,
), ),
log_file='abc.log', log_file='abc.log',
progress_bar=False, progress_bar=False,
@ -56,7 +73,8 @@ def test_general_options():
assert a.concurrency.crypto_processes == 1 assert a.concurrency.crypto_processes == 1
assert a.concurrency.md5_processes == 2 assert a.concurrency.md5_processes == 2
assert a.concurrency.transfer_threads == 3 assert a.concurrency.disk_threads == 3
assert a.concurrency.transfer_threads == 4
assert a.log_file == 'abc.log' assert a.log_file == 'abc.log'
assert not a.progress_bar assert not a.progress_bar
assert a.resume_file == pathlib.Path('abc') assert a.resume_file == pathlib.Path('abc')
@ -67,7 +85,8 @@ def test_general_options():
concurrency=options.Concurrency( concurrency=options.Concurrency(
crypto_processes=1, crypto_processes=1,
md5_processes=2, md5_processes=2,
transfer_threads=3, disk_threads=3,
transfer_threads=4,
), ),
progress_bar=False, progress_bar=False,
resume_file=None, resume_file=None,
@ -77,7 +96,8 @@ def test_general_options():
assert a.concurrency.crypto_processes == 1 assert a.concurrency.crypto_processes == 1
assert a.concurrency.md5_processes == 2 assert a.concurrency.md5_processes == 2
assert a.concurrency.transfer_threads == 3 assert a.concurrency.disk_threads == 3
assert a.concurrency.transfer_threads == 4
assert a.log_file is None assert a.log_file is None
assert not a.progress_bar assert not a.progress_bar
assert a.resume_file is None assert a.resume_file is None

Просмотреть файл

@ -8,9 +8,8 @@ import blobxfer.models.resume as rmodels
def test_download(): def test_download():
d = rmodels.Download('fp', 'tp', 1, 2, 0, False, '') d = rmodels.Download('fp', 1, 2, 0, False, '')
assert d.final_path == 'fp' assert d.final_path == 'fp'
assert d.temp_path == 'tp'
assert d.length == 1 assert d.length == 1
assert d.chunk_size == 2 assert d.chunk_size == 2
assert d.next_integrity_chunk == 0 assert d.next_integrity_chunk == 0

Просмотреть файл

@ -7,7 +7,6 @@ try:
except ImportError: # noqa except ImportError: # noqa
import pathlib import pathlib
# non-stdlib imports # non-stdlib imports
import pytest
# module under test # module under test
import blobxfer.models.upload as upload import blobxfer.models.upload as upload
@ -26,14 +25,10 @@ def test_localsourcepaths_files(tmpdir):
defpath.join('moo.cow').write('y') defpath.join('moo.cow').write('y')
a = upload.LocalSourcePath() a = upload.LocalSourcePath()
a.add_include('*.txt') a.add_includes('*.txt')
a.add_includes(['moo.cow', '*blah*']) a.add_includes(['moo.cow', '*blah*'])
with pytest.raises(ValueError): a.add_excludes('**/blah.x')
a.add_includes('abc')
a.add_exclude('**/blah.x')
a.add_excludes(['world.txt']) a.add_excludes(['world.txt'])
with pytest.raises(ValueError):
a.add_excludes('abc')
a.add_path(str(tmpdir)) a.add_path(str(tmpdir))
a_set = set() a_set = set()
for file in a.files(): for file in a.files():
@ -47,9 +42,9 @@ def test_localsourcepaths_files(tmpdir):
b = upload.LocalSourcePath() b = upload.LocalSourcePath()
b.add_includes(['moo.cow', '*blah*']) b.add_includes(['moo.cow', '*blah*'])
b.add_include('*.txt') b.add_includes('*.txt')
b.add_excludes(['world.txt']) b.add_excludes(['world.txt'])
b.add_exclude('**/blah.x') b.add_excludes('**/blah.x')
b.add_paths([pathlib.Path(str(tmpdir))]) b.add_paths([pathlib.Path(str(tmpdir))])
for file in a.files(): for file in a.files():
sfile = str(file.parent_path / file.relative_path) sfile = str(file.parent_path / file.relative_path)

Просмотреть файл

@ -106,7 +106,7 @@ def test_azuresourcepath_files(patched_lf, patched_em):
i = 0 i = 0
for file in asp.files(creds, options, mock.MagicMock()): for file in asp.files(creds, options, mock.MagicMock()):
i += 1 i += 1
assert file.name == 'name' assert file.name == 'remote/name'
assert file.encryption_metadata is None assert file.encryption_metadata is None
assert i == 1 assert i == 1
@ -119,7 +119,7 @@ def test_azuresourcepath_files(patched_lf, patched_em):
i = 0 i = 0
for file in asp.files(creds, options, mock.MagicMock()): for file in asp.files(creds, options, mock.MagicMock()):
i += 1 i += 1
assert file.name == 'name' assert file.name == 'remote/name'
assert file.encryption_metadata is not None assert file.encryption_metadata is not None
assert i == 1 assert i == 1

Просмотреть файл

@ -118,7 +118,7 @@ def test_cryptooffload_decrypt(tmpdir):
unpad=False, unpad=False,
) )
a.add_decrypt_chunk( a.add_decrypt_chunk(
'fp', str(bfile), offsets, symkey, iv, hmacfile) str(bfile), 0, offsets, symkey, iv, hmacfile)
i = 33 i = 33
checked = False checked = False
while i > 0: while i > 0:
@ -127,7 +127,7 @@ def test_cryptooffload_decrypt(tmpdir):
time.sleep(0.3) time.sleep(0.3)
i -= 1 i -= 1
continue continue
assert result == 'fp' assert result == (str(bfile), offsets)
checked = True checked = True
break break
assert checked assert checked

Просмотреть файл

@ -285,18 +285,26 @@ def test_pre_md5_skip_on_check():
rfile = azmodels.StorageEntity('cont') rfile = azmodels.StorageEntity('cont')
rfile._encryption = mock.MagicMock() rfile._encryption = mock.MagicMock()
rfile._encryption.blobxfer_extensions = mock.MagicMock() rfile._encryption.blobxfer_extensions = mock.MagicMock()
rfile._encryption.blobxfer_extensions.pre_encrypted_content_md5 = \ rfile._encryption.blobxfer_extensions.pre_encrypted_content_md5 = 'abc'
'abc' rfile._client = mock.MagicMock()
rfile._client.primary_endpoint = 'ep'
rfile._name = 'name'
rfile._vio = None
lpath = 'lpath' lpath = 'lpath'
key = ops.Downloader.create_unique_transfer_operation_id(rfile)
d._pre_md5_skip_on_check(lpath, rfile) d._pre_md5_skip_on_check(lpath, rfile)
assert lpath in d._md5_map assert key in d._md5_map
rfile._name = 'name2'
lpath = 'lpath2' lpath = 'lpath2'
rfile._encryption = None rfile._encryption = None
rfile._md5 = 'abc' rfile._md5 = 'abc'
key = ops.Downloader.create_unique_transfer_operation_id(rfile)
d._pre_md5_skip_on_check(lpath, rfile) d._pre_md5_skip_on_check(lpath, rfile)
assert lpath in d._md5_map assert key in d._md5_map
assert len(d._md5_map) == 2
def test_post_md5_skip_on_check(tmpdir): def test_post_md5_skip_on_check(tmpdir):
@ -309,28 +317,45 @@ def test_post_md5_skip_on_check(tmpdir):
lpath = str(lp) lpath = str(lp)
rfile = azmodels.StorageEntity('cont') rfile = azmodels.StorageEntity('cont')
rfile._md5 = 'abc' rfile._md5 = 'abc'
rfile._client = mock.MagicMock()
rfile._client.primary_endpoint = 'ep'
rfile._name = 'name'
rfile._vio = None
rfile._size = 256
d._pre_md5_skip_on_check(lpath, rfile) d._pre_md5_skip_on_check(lpath, rfile)
d._download_set.add(pathlib.Path(lpath)) key = ops.Downloader.create_unique_transfer_operation_id(rfile)
assert lpath in d._md5_map d._transfer_set.add(key)
assert key in d._md5_map
d._post_md5_skip_on_check(lpath, True) d._post_md5_skip_on_check(key, lpath, rfile._size, True)
assert lpath not in d._md5_map assert key not in d._md5_map
d._add_to_download_queue = mock.MagicMock() d._add_to_download_queue = mock.MagicMock()
d._pre_md5_skip_on_check(lpath, rfile) d._pre_md5_skip_on_check(lpath, rfile)
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
d._post_md5_skip_on_check(lpath, False) d._post_md5_skip_on_check(key, lpath, rfile._size, False)
assert d._add_to_download_queue.call_count == 1 assert d._add_to_download_queue.call_count == 1
def test_check_for_downloads_from_md5(): def test_check_for_downloads_from_md5():
lpath = 'lpath' lpath = 'lpath'
rfile = azmodels.StorageEntity('cont')
rfile._md5 = 'abc'
rfile._client = mock.MagicMock()
rfile._client.primary_endpoint = 'ep'
rfile._name = 'name'
rfile._vio = None
rfile._size = 256
key = ops.Downloader.create_unique_transfer_operation_id(rfile)
d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._md5_map[lpath] = mock.MagicMock() d._md5_map[key] = rfile
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
d._md5_offload = mock.MagicMock() d._md5_offload = mock.MagicMock()
d._md5_offload.done_cv = multiprocessing.Condition() d._md5_offload.done_cv = multiprocessing.Condition()
d._md5_offload.pop_done_queue.side_effect = [None, (lpath, False)] d._md5_offload.pop_done_queue.side_effect = [
None,
(key, lpath, rfile._size, False),
]
d._add_to_download_queue = mock.MagicMock() d._add_to_download_queue = mock.MagicMock()
d._all_remote_files_processed = False d._all_remote_files_processed = False
d._download_terminate = True d._download_terminate = True
@ -343,11 +368,14 @@ def test_check_for_downloads_from_md5():
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._md5_map[lpath] = mock.MagicMock() d._md5_map[key] = rfile
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
d._md5_offload = mock.MagicMock() d._md5_offload = mock.MagicMock()
d._md5_offload.done_cv = multiprocessing.Condition() d._md5_offload.done_cv = multiprocessing.Condition()
d._md5_offload.pop_done_queue.side_effect = [None, (lpath, False)] d._md5_offload.pop_done_queue.side_effect = [
None,
(key, lpath, rfile._size, False),
]
d._add_to_download_queue = mock.MagicMock() d._add_to_download_queue = mock.MagicMock()
patched_tc.side_effect = [False, False, True] patched_tc.side_effect = [False, False, True]
d._check_for_downloads_from_md5() d._check_for_downloads_from_md5()
@ -359,8 +387,8 @@ def test_check_for_downloads_from_md5():
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._md5_map[lpath] = mock.MagicMock() d._md5_map[key] = rfile
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
d._md5_offload = mock.MagicMock() d._md5_offload = mock.MagicMock()
d._md5_offload.done_cv = multiprocessing.Condition() d._md5_offload.done_cv = multiprocessing.Condition()
d._md5_offload.pop_done_queue.side_effect = [None] d._md5_offload.pop_done_queue.side_effect = [None]
@ -372,15 +400,25 @@ def test_check_for_downloads_from_md5():
def test_check_for_crypto_done(): def test_check_for_crypto_done():
lpath = 'lpath' lpath = 'lpath'
rfile = azmodels.StorageEntity('cont')
rfile._md5 = 'abc'
rfile._client = mock.MagicMock()
rfile._client.primary_endpoint = 'ep'
rfile._name = 'name'
rfile._vio = None
rfile._size = 256
key = ops.Downloader.create_unique_transfer_operation_id(rfile)
d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
dd = mock.MagicMock() dd = mock.MagicMock()
d._dd_map[lpath] = dd d._dd_map[lpath] = dd
offsets = mock.MagicMock()
offsets.range_start = 0
d._crypto_offload = mock.MagicMock() d._crypto_offload = mock.MagicMock()
d._crypto_offload.done_cv = multiprocessing.Condition() d._crypto_offload.done_cv = multiprocessing.Condition()
d._crypto_offload.pop_done_queue.side_effect = [ d._crypto_offload.pop_done_queue.side_effect = [
None, None,
lpath, (lpath, offsets)
] ]
d._all_remote_files_processed = False d._all_remote_files_processed = False
d._download_terminate = True d._download_terminate = True
@ -393,14 +431,16 @@ def test_check_for_crypto_done():
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
dd = mock.MagicMock() dd = mock.MagicMock()
dd.entity = rfile
dd.final_path = lpath
d._dd_map[lpath] = dd d._dd_map[lpath] = dd
d._crypto_offload = mock.MagicMock() d._crypto_offload = mock.MagicMock()
d._crypto_offload.done_cv = multiprocessing.Condition() d._crypto_offload.done_cv = multiprocessing.Condition()
d._crypto_offload.pop_done_queue.side_effect = [ d._crypto_offload.pop_done_queue.side_effect = [
None, None,
lpath, (lpath, offsets),
] ]
patched_tc.side_effect = [False, False, True] patched_tc.side_effect = [False, False, True]
d._complete_chunk_download = mock.MagicMock() d._complete_chunk_download = mock.MagicMock()
@ -413,13 +453,15 @@ def test_check_for_crypto_done():
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._download_set.add(pathlib.Path(lpath)) d._transfer_set.add(key)
dd = mock.MagicMock() dd = mock.MagicMock()
dd.entity = rfile
dd.final_path = lpath
d._crypto_offload = mock.MagicMock() d._crypto_offload = mock.MagicMock()
d._crypto_offload.done_cv = multiprocessing.Condition() d._crypto_offload.done_cv = multiprocessing.Condition()
d._crypto_offload.pop_done_queue.side_effect = [ d._crypto_offload.pop_done_queue.side_effect = [
None, None,
lpath, (lpath, offsets),
] ]
patched_tc.side_effect = [False, False, True] patched_tc.side_effect = [False, False, True]
d._complete_chunk_download = mock.MagicMock() d._complete_chunk_download = mock.MagicMock()
@ -438,39 +480,41 @@ def test_add_to_download_queue(tmpdir):
d._spec.options.chunk_size_bytes = 1 d._spec.options.chunk_size_bytes = 1
d._add_to_download_queue(lpath, ase) d._add_to_download_queue(lpath, ase)
assert d._download_queue.qsize() == 1 assert d._transfer_queue.qsize() == 1
assert path in d._dd_map assert path in d._dd_map
def test_initialize_and_terminate_download_threads(): def test_initialize_and_terminate_transfer_threads():
opts = mock.MagicMock() opts = mock.MagicMock()
opts.concurrency.transfer_threads = 2 opts.concurrency.transfer_threads = 2
d = ops.Downloader(opts, mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(opts, mock.MagicMock(), mock.MagicMock())
d._worker_thread_download = mock.MagicMock() d._worker_thread_transfer = mock.MagicMock()
d._initialize_download_threads() d._initialize_transfer_threads()
assert len(d._download_threads) == 2 assert len(d._transfer_threads) == 2
d._wait_for_download_threads(terminate=True) d._wait_for_transfer_threads(terminate=True)
assert d._download_terminate assert d._download_terminate
for thr in d._download_threads: for thr in d._transfer_threads:
assert not thr.is_alive() assert not thr.is_alive()
@mock.patch('blobxfer.operations.crypto.aes_cbc_decrypt_data') @mock.patch('blobxfer.operations.crypto.aes_cbc_decrypt_data')
@mock.patch('blobxfer.operations.azure.file.get_file_range') @mock.patch('blobxfer.operations.azure.file.get_file_range')
@mock.patch('blobxfer.operations.azure.blob.get_blob_range') @mock.patch('blobxfer.operations.azure.blob.get_blob_range')
def test_worker_thread_download( def test_worker_thread_transfer(
patched_gbr, patched_gfr, patched_acdd, tmpdir): patched_gbr, patched_gfr, patched_acdd, tmpdir):
d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._complete_chunk_download = mock.MagicMock() d._complete_chunk_download = mock.MagicMock()
d._download_terminate = True d._download_terminate = True
d._worker_thread_download() d._general_options.concurrency.transfer_threads = 1
d._general_options.concurrency.disk_threads = 1
d._worker_thread_transfer()
assert d._complete_chunk_download.call_count == 0 assert d._complete_chunk_download.call_count == 0
d._download_terminate = False d._download_terminate = False
d._all_remote_files_processed = True d._all_remote_files_processed = True
d._worker_thread_download() d._worker_thread_transfer()
assert d._complete_chunk_download.call_count == 0 assert d._complete_chunk_download.call_count == 0
with mock.patch( with mock.patch(
@ -486,11 +530,11 @@ def test_worker_thread_download(
opts.check_file_md5 = False opts.check_file_md5 = False
opts.chunk_size_bytes = 16 opts.chunk_size_bytes = 16
dd = models.Descriptor(lp, ase, opts, None) dd = models.Descriptor(lp, ase, opts, None)
d._download_queue = mock.MagicMock() d._transfer_queue = mock.MagicMock()
d._download_queue.get.side_effect = [queue.Empty, dd] d._transfer_queue.get.side_effect = [queue.Empty, dd]
d._process_download_descriptor = mock.MagicMock() d._process_download_descriptor = mock.MagicMock()
d._process_download_descriptor.side_effect = RuntimeError('oops') d._process_download_descriptor.side_effect = RuntimeError('oops')
d._worker_thread_download() d._worker_thread_transfer()
assert len(d._exceptions) == 1 assert len(d._exceptions) == 1
assert d._process_download_descriptor.call_count == 1 assert d._process_download_descriptor.call_count == 1
@ -503,26 +547,35 @@ def test_worker_thread_download(
new_callable=mock.PropertyMock) as patched_aoc: new_callable=mock.PropertyMock) as patched_aoc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.transfer_threads = 1
d._general_options.concurrency.disk_threads = 1
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = False opts.check_file_md5 = False
opts.chunk_size_bytes = 16 opts.chunk_size_bytes = 16
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._size = 16 ase._size = 16
ase._client = mock.MagicMock()
ase._client.primary_endpoint = 'ep'
ase._name = 'name'
ase._vio = None
key = ops.Downloader.create_unique_transfer_operation_id(ase)
ase._encryption = mock.MagicMock() ase._encryption = mock.MagicMock()
ase._encryption.symmetric_key = b'abc' ase._encryption.symmetric_key = b'abc'
lp = pathlib.Path(str(tmpdir.join('a'))) lp = pathlib.Path(str(tmpdir.join('a')))
dd = models.Descriptor(lp, ase, opts, None) dd = models.Descriptor(lp, ase, opts, None)
dd.next_offsets = mock.MagicMock( dd.next_offsets = mock.MagicMock(
side_effect=[(None, 1), (None, 2)]) side_effect=[(None, 1), (None, 2)])
dd.finalize_integrity = mock.MagicMock()
dd.finalize_file = mock.MagicMock() dd.finalize_file = mock.MagicMock()
dd.perform_chunked_integrity_check = mock.MagicMock() dd.perform_chunked_integrity_check = mock.MagicMock()
dd.all_operations_completed.side_effect = [False, True]
patched_aoc.side_effect = [False, True] patched_aoc.side_effect = [False, True]
patched_tc.side_effect = [False, False, False, True] patched_tc.side_effect = [False, False, False, True]
d._dd_map[str(lp)] = dd d._dd_map[str(lp)] = dd
d._download_set.add(lp) d._transfer_set.add(key)
d._download_queue = mock.MagicMock() d._transfer_queue = mock.MagicMock()
d._download_queue.get.side_effect = [queue.Empty, dd, dd] d._transfer_queue.get.side_effect = [queue.Empty, dd, dd]
d._worker_thread_download() d._worker_thread_transfer()
assert str(lp) not in d._dd_map assert str(lp) not in d._dd_map
assert dd.finalize_file.call_count == 1 assert dd.finalize_file.call_count == 1
assert d._download_sofar == 1 assert d._download_sofar == 1
@ -533,23 +586,33 @@ def test_worker_thread_download(
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.transfer_threads = 1
d._general_options.concurrency.disk_threads = 1
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = True opts.check_file_md5 = True
opts.chunk_size_bytes = 16 opts.chunk_size_bytes = 16
ase = azmodels.StorageEntity('cont') ase = azmodels.StorageEntity('cont')
ase._mode = azmodels.StorageModes.File ase._mode = azmodels.StorageModes.File
ase._size = 16 ase._size = 16
ase._client = mock.MagicMock()
ase._client.primary_endpoint = 'ep'
ase._name = 'name'
ase._vio = None
key = ops.Downloader.create_unique_transfer_operation_id(ase)
patched_gfr.return_value = b'0' * ase._size patched_gfr.return_value = b'0' * ase._size
lp = pathlib.Path(str(tmpdir.join('b'))) lp = pathlib.Path(str(tmpdir.join('b')))
dd = models.Descriptor(lp, ase, opts, None) dd = models.Descriptor(lp, ase, opts, None)
dd.finalize_file = mock.MagicMock() dd.finalize_file = mock.MagicMock()
dd.perform_chunked_integrity_check = mock.MagicMock() dd.perform_chunked_integrity_check = mock.MagicMock()
d._dd_map[str(lp)] = mock.MagicMock() d._dd_map[str(lp)] = mock.MagicMock()
d._download_set.add(lp) d._transfer_set.add(key)
d._download_queue = mock.MagicMock() d._transfer_queue = mock.MagicMock()
d._download_queue.get.side_effect = [dd] d._transfer_queue.get.side_effect = [dd]
patched_tc.side_effect = [False, True] patched_tc.side_effect = [False, True]
d._worker_thread_download() d._worker_thread_transfer()
assert len(d._disk_set) == 1
a, b, c = d._disk_queue.get()
d._process_data(a, b, c)
assert dd.perform_chunked_integrity_check.call_count == 1 assert dd.perform_chunked_integrity_check.call_count == 1
with mock.patch( with mock.patch(
@ -557,6 +620,8 @@ def test_worker_thread_download(
new_callable=mock.PropertyMock) as patched_tc: new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.transfer_threads = 1
d._general_options.concurrency.disk_threads = 1
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = False opts.check_file_md5 = False
opts.chunk_size_bytes = 16 opts.chunk_size_bytes = 16
@ -566,6 +631,11 @@ def test_worker_thread_download(
ase._encryption = mock.MagicMock() ase._encryption = mock.MagicMock()
ase._encryption.symmetric_key = b'abc' ase._encryption.symmetric_key = b'abc'
ase._encryption.content_encryption_iv = b'0' * 16 ase._encryption.content_encryption_iv = b'0' * 16
ase._client = mock.MagicMock()
ase._client.primary_endpoint = 'ep'
ase._name = 'name'
ase._vio = None
key = ops.Downloader.create_unique_transfer_operation_id(ase)
patched_gfr.return_value = b'0' * ase._size patched_gfr.return_value = b'0' * ase._size
lp = pathlib.Path(str(tmpdir.join('c'))) lp = pathlib.Path(str(tmpdir.join('c')))
dd = models.Descriptor(lp, ase, opts, None) dd = models.Descriptor(lp, ase, opts, None)
@ -575,11 +645,14 @@ def test_worker_thread_download(
d._crypto_offload = mock.MagicMock() d._crypto_offload = mock.MagicMock()
d._crypto_offload.add_decrypt_chunk = mock.MagicMock() d._crypto_offload.add_decrypt_chunk = mock.MagicMock()
d._dd_map[str(lp)] = dd d._dd_map[str(lp)] = dd
d._download_set.add(lp) d._transfer_set.add(key)
d._download_queue = mock.MagicMock() d._transfer_queue = mock.MagicMock()
d._download_queue.get.side_effect = [dd] d._transfer_queue.get.side_effect = [dd]
patched_tc.side_effect = [False, True] patched_tc.side_effect = [False, True]
d._worker_thread_download() d._worker_thread_transfer()
assert len(d._disk_set) == 1
a, b, c = d._disk_queue.get()
d._process_data(a, b, c)
assert d._crypto_offload.add_decrypt_chunk.call_count == 1 assert d._crypto_offload.add_decrypt_chunk.call_count == 1
assert dd.write_unchecked_hmac_data.call_count == 1 assert dd.write_unchecked_hmac_data.call_count == 1
@ -589,6 +662,8 @@ def test_worker_thread_download(
d = ops.Downloader( d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.crypto_processes = 0 d._general_options.concurrency.crypto_processes = 0
d._general_options.concurrency.transfer_threads = 1
d._general_options.concurrency.disk_threads = 1
opts = mock.MagicMock() opts = mock.MagicMock()
opts.check_file_md5 = False opts.check_file_md5 = False
opts.chunk_size_bytes = 16 opts.chunk_size_bytes = 16
@ -598,19 +673,28 @@ def test_worker_thread_download(
ase._encryption = mock.MagicMock() ase._encryption = mock.MagicMock()
ase._encryption.symmetric_key = b'abc' ase._encryption.symmetric_key = b'abc'
ase._encryption.content_encryption_iv = b'0' * 16 ase._encryption.content_encryption_iv = b'0' * 16
ase._client = mock.MagicMock()
ase._client.primary_endpoint = 'ep'
ase._name = 'name'
ase._vio = None
key = ops.Downloader.create_unique_transfer_operation_id(ase)
patched_gfr.return_value = b'0' * ase._size patched_gfr.return_value = b'0' * ase._size
lp = pathlib.Path(str(tmpdir.join('d'))) lp = pathlib.Path(str(tmpdir.join('d')))
dd = models.Descriptor(lp, ase, opts, None) dd = models.Descriptor(lp, ase, opts, None)
dd.next_offsets() dd.next_offsets()
dd.write_unchecked_hmac_data = mock.MagicMock() dd.write_unchecked_hmac_data = mock.MagicMock()
dd.perform_chunked_integrity_check = mock.MagicMock() dd.perform_chunked_integrity_check = mock.MagicMock()
dd.mark_unchecked_chunk_decrypted = mock.MagicMock()
patched_acdd.return_value = b'0' * 16 patched_acdd.return_value = b'0' * 16
d._dd_map[str(lp)] = mock.MagicMock() d._dd_map[str(lp)] = mock.MagicMock()
d._download_set.add(lp) d._transfer_set.add(key)
d._download_queue = mock.MagicMock() d._transfer_queue = mock.MagicMock()
d._download_queue.get.side_effect = [dd] d._transfer_queue.get.side_effect = [dd, dd]
patched_tc.side_effect = [False, True] patched_tc.side_effect = [False, True]
d._worker_thread_download() d._worker_thread_transfer()
assert len(d._disk_set) == 1
a, b, c = d._disk_queue.get()
d._process_data(a, b, c)
assert patched_acdd.call_count == 1 assert patched_acdd.call_count == 1
assert dd.write_unchecked_hmac_data.call_count == 1 assert dd.write_unchecked_hmac_data.call_count == 1
assert dd.perform_chunked_integrity_check.call_count == 1 assert dd.perform_chunked_integrity_check.call_count == 1
@ -631,7 +715,7 @@ def test_cleanup_temporary_files(tmpdir):
d._general_options.resume_file = pathlib.Path('abc') d._general_options.resume_file = pathlib.Path('abc')
d._dd_map[0] = dd d._dd_map[0] = dd
d._cleanup_temporary_files() d._cleanup_temporary_files()
assert dd.local_path.exists() assert dd.final_path.exists()
lp = pathlib.Path(str(tmpdir.join('b'))) lp = pathlib.Path(str(tmpdir.join('b')))
opts = mock.MagicMock() opts = mock.MagicMock()
@ -645,7 +729,7 @@ def test_cleanup_temporary_files(tmpdir):
d._general_options.resume_file = None d._general_options.resume_file = None
d._dd_map[0] = dd d._dd_map[0] = dd
d._cleanup_temporary_files() d._cleanup_temporary_files()
assert not dd.local_path.exists() assert not dd.final_path.exists()
lp = pathlib.Path(str(tmpdir.join('c'))) lp = pathlib.Path(str(tmpdir.join('c')))
opts = mock.MagicMock() opts = mock.MagicMock()
@ -661,7 +745,7 @@ def test_cleanup_temporary_files(tmpdir):
d._general_options.resume_file = None d._general_options.resume_file = None
d._dd_map[0] = dd d._dd_map[0] = dd
d._cleanup_temporary_files() d._cleanup_temporary_files()
assert dd.local_path.exists() assert dd.final_path.exists()
def test_catalog_local_files_for_deletion(tmpdir): def test_catalog_local_files_for_deletion(tmpdir):
@ -699,21 +783,16 @@ def test_delete_extraneous_files(tmpdir):
d._delete_extraneous_files() d._delete_extraneous_files()
@mock.patch('blobxfer.operations.md5.LocalFileMd5Offload') def _create_downloader_for_start(td):
@mock.patch('blobxfer.operations.azure.blob.list_blobs')
@mock.patch(
'blobxfer.operations.download.Downloader.ensure_local_destination',
return_value=True
)
def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir):
d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._cleanup_temporary_files = mock.MagicMock() d._cleanup_temporary_files = mock.MagicMock()
d._download_start = datetime.datetime.now(tz=dateutil.tz.tzlocal()) d._download_start = datetime.datetime.now(tz=dateutil.tz.tzlocal())
d._initialize_download_threads = mock.MagicMock() d._initialize_transfer_threads = mock.MagicMock()
patched_lfmo._check_thread = mock.MagicMock() d._general_options.concurrency.crypto_processes = 0
d._general_options.concurrency.crypto_processes = 1
d._general_options.concurrency.md5_processes = 1 d._general_options.concurrency.md5_processes = 1
d._general_options.resume_file = pathlib.Path(str(tmpdir.join('rf'))) d._general_options.concurrency.disk_threads = 1
d._general_options.concurrency.transfer_threads = 1
d._general_options.resume_file = pathlib.Path(str(td.join('rf')))
d._spec.sources = [] d._spec.sources = []
d._spec.options = mock.MagicMock() d._spec.options = mock.MagicMock()
d._spec.options.chunk_size_bytes = 1 d._spec.options.chunk_size_bytes = 1
@ -725,50 +804,84 @@ def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir):
d._spec.skip_on.lmt_ge = False d._spec.skip_on.lmt_ge = False
d._spec.skip_on.filesize_match = False d._spec.skip_on.filesize_match = False
d._spec.destination = mock.MagicMock() d._spec.destination = mock.MagicMock()
d._spec.destination.path = pathlib.Path(str(tmpdir)) d._spec.destination.path = pathlib.Path(str(td))
d._download_start_time = util.datetime_now() d._download_start_time = util.datetime_now()
d._pre_md5_skip_on_check = mock.MagicMock()
d._check_download_conditions = mock.MagicMock()
d._all_remote_files_processed = False
p = '/cont/remote/path' p = '/cont/remote/path'
asp = azops.SourcePath() asp = azops.SourcePath()
asp.add_path_with_storage_account(p, 'sa') asp.add_path_with_storage_account(p, 'sa')
d._spec.sources.append(asp) d._spec.sources.append(asp)
b = azure.storage.blob.models.Blob(name='name') return d
@mock.patch('blobxfer.operations.md5.LocalFileMd5Offload')
@mock.patch('blobxfer.operations.azure.blob.list_blobs')
@mock.patch(
'blobxfer.operations.download.Downloader.ensure_local_destination',
return_value=True
)
@mock.patch(
'blobxfer.operations.download.Downloader.'
'create_unique_transfer_operation_id',
return_value='id'
)
@mock.patch(
'blobxfer.operations.download.Downloader._wait_for_transfer_threads',
return_value=None
)
@mock.patch(
'blobxfer.operations.download.Downloader._wait_for_disk_threads',
return_value=None
)
def test_start(
patched_wdt, patched_wtt, patched_cutoi, patched_eld, patched_lb,
patched_lfmo, tmpdir):
patched_lfmo._check_thread = mock.MagicMock()
b = azure.storage.blob.models.Blob(name='remote/path/name')
b.properties.content_length = 1 b.properties.content_length = 1
patched_lb.side_effect = [[b]] patched_lb.side_effect = [[b]]
d._pre_md5_skip_on_check = mock.MagicMock() d = _create_downloader_for_start(tmpdir)
d._check_download_conditions = mock.MagicMock()
d._check_download_conditions.return_value = ops.DownloadAction.Skip d._check_download_conditions.return_value = ops.DownloadAction.Skip
d._download_sofar = -1
d._download_bytes_sofar = -1
d.start() d.start()
assert d._pre_md5_skip_on_check.call_count == 0 assert d._pre_md5_skip_on_check.call_count == 0
patched_lb.side_effect = [[b]] patched_lb.side_effect = [[b]]
d._all_remote_files_processed = False d = _create_downloader_for_start(tmpdir)
d._check_download_conditions.return_value = ops.DownloadAction.CheckMd5 d._check_download_conditions.return_value = ops.DownloadAction.CheckMd5
d._download_sofar = -1
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
d.start() d.start()
d._download_terminate = True
assert d._pre_md5_skip_on_check.call_count == 1 assert d._pre_md5_skip_on_check.call_count == 1
b.properties.content_length = 0 b.properties.content_length = 0
patched_lb.side_effect = [[b]] patched_lb.side_effect = [[b]]
d._all_remote_files_processed = False d = _create_downloader_for_start(tmpdir)
d._check_download_conditions.return_value = ops.DownloadAction.Download d._check_download_conditions.return_value = ops.DownloadAction.Download
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
d.start() d.start()
assert d._download_queue.qsize() == 1 d._download_terminate = True
assert d._transfer_queue.qsize() == 1
# test exception count # test exception count
b = azure.storage.blob.models.Blob(name='name') b = azure.storage.blob.models.Blob(name='name')
b.properties.content_length = 1 b.properties.content_length = 1
patched_lb.side_effect = [[b]] patched_lb.side_effect = [[b]]
d = _create_downloader_for_start(tmpdir)
d._spec.destination.is_dir = False d._spec.destination.is_dir = False
d._spec.options.rename = True d._spec.options.rename = True
d._pre_md5_skip_on_check = mock.MagicMock()
d._check_download_conditions = mock.MagicMock()
d._check_download_conditions.return_value = ops.DownloadAction.Skip d._check_download_conditions.return_value = ops.DownloadAction.Skip
d._exceptions = [RuntimeError('oops')] d._exceptions = [RuntimeError('oops')]
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
d.start() d.start()
d._download_terminate = True
assert d._pre_md5_skip_on_check.call_count == 0 assert d._pre_md5_skip_on_check.call_count == 0
@ -776,11 +889,11 @@ def test_start_keyboard_interrupt():
d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.resume_file = None d._general_options.resume_file = None
d._run = mock.MagicMock(side_effect=KeyboardInterrupt) d._run = mock.MagicMock(side_effect=KeyboardInterrupt)
d._wait_for_download_threads = mock.MagicMock() d._wait_for_transfer_threads = mock.MagicMock()
d._cleanup_temporary_files = mock.MagicMock() d._cleanup_temporary_files = mock.MagicMock()
d._md5_offload = mock.MagicMock() d._md5_offload = mock.MagicMock()
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
d.start() d.start()
assert d._wait_for_download_threads.call_count == 1 assert d._wait_for_transfer_threads.call_count == 1
assert d._cleanup_temporary_files.call_count == 1 assert d._cleanup_temporary_files.call_count == 1

Просмотреть файл

@ -57,6 +57,8 @@ def test_finalize_md5_processes():
def test_from_add_to_done_non_pagealigned(tmpdir): def test_from_add_to_done_non_pagealigned(tmpdir):
file = tmpdir.join('a') file = tmpdir.join('a')
file.write('abc') file.write('abc')
fpath = str(file)
key = 'key'
remote_md5 = ops.compute_md5_for_file_asbase64(str(file)) remote_md5 = ops.compute_md5_for_file_asbase64(str(file))
@ -67,7 +69,7 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
assert result is None assert result is None
a.add_localfile_for_md5_check( a.add_localfile_for_md5_check(
str(file), remote_md5, azmodels.StorageModes.Block) key, fpath, fpath, remote_md5, azmodels.StorageModes.Block, None)
i = 33 i = 33
checked = False checked = False
while i > 0: while i > 0:
@ -76,9 +78,11 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
time.sleep(0.3) time.sleep(0.3)
i -= 1 i -= 1
continue continue
assert len(result) == 2 assert len(result) == 4
assert result[0] == str(file) assert result[0] == key
assert result[1] assert result[1] == str(file)
assert result[2] is None
assert result[3]
checked = True checked = True
break break
assert checked assert checked
@ -90,6 +94,8 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
def test_from_add_to_done_pagealigned(tmpdir): def test_from_add_to_done_pagealigned(tmpdir):
file = tmpdir.join('a') file = tmpdir.join('a')
file.write('abc') file.write('abc')
fpath = str(file)
key = 'key'
remote_md5 = ops.compute_md5_for_file_asbase64(str(file), True) remote_md5 = ops.compute_md5_for_file_asbase64(str(file), True)
@ -100,7 +106,7 @@ def test_from_add_to_done_pagealigned(tmpdir):
assert result is None assert result is None
a.add_localfile_for_md5_check( a.add_localfile_for_md5_check(
str(file), remote_md5, azmodels.StorageModes.Page) key, fpath, fpath, remote_md5, azmodels.StorageModes.Page, None)
i = 33 i = 33
checked = False checked = False
while i > 0: while i > 0:
@ -109,9 +115,11 @@ def test_from_add_to_done_pagealigned(tmpdir):
time.sleep(0.3) time.sleep(0.3)
i -= 1 i -= 1
continue continue
assert len(result) == 2 assert len(result) == 4
assert result[0] == str(file) assert result[0] == key
assert result[1] assert result[1] == str(file)
assert result[2] is None
assert result[3]
checked = True checked = True
break break
assert checked assert checked

Просмотреть файл

@ -13,12 +13,12 @@ import blobxfer.util as util
import blobxfer.operations.progress as ops import blobxfer.operations.progress as ops
def test_output_download_parameters(): def test_output_parameters():
go = mock.MagicMock() go = mock.MagicMock()
spec = mock.MagicMock() spec = mock.MagicMock()
go.log_file = 'abc' go.log_file = 'abc'
ops.output_download_parameters(go, spec) ops.output_parameters(go, spec)
assert util.is_not_empty(go.log_file) assert util.is_not_empty(go.log_file)

Просмотреть файл

@ -2,6 +2,10 @@
"""Tests for operations resume""" """Tests for operations resume"""
# stdlib imports # stdlib imports
try:
import unittest.mock as mock
except ImportError: # noqa
import mock
try: try:
import pathlib2 as pathlib import pathlib2 as pathlib
except ImportError: # noqa except ImportError: # noqa
@ -23,23 +27,28 @@ def test_download_resume_manager(tmpdir):
assert drm._data is None assert drm._data is None
assert not tmpdb.exists() assert not tmpdb.exists()
ase = mock.MagicMock()
ase._name = 'name'
ase._client.primary_endpoint = 'ep'
ase._size = 16
final_path = 'fp' final_path = 'fp'
drm = ops.DownloadResumeManager(tmpdb) drm = ops.DownloadResumeManager(tmpdb)
drm.add_or_update_record(final_path, 'tp', 1, 2, 0, False, None) drm.add_or_update_record(final_path, ase, 2, 0, False, None)
d = drm.get_record(final_path) d = drm.get_record(ase)
assert d.final_path == final_path assert d.final_path == final_path
drm.add_or_update_record(final_path, 'tp', 1, 2, 1, False, 'abc') drm.add_or_update_record(final_path, ase, 2, 1, False, 'abc')
d = drm.get_record(final_path) d = drm.get_record(ase)
assert d.final_path == final_path assert d.final_path == final_path
assert not d.completed assert not d.completed
assert d.next_integrity_chunk == 1 assert d.next_integrity_chunk == 1
assert d.md5hexdigest == 'abc' assert d.md5hexdigest == 'abc'
drm.add_or_update_record(final_path, 'tp', 1, 2, 1, True, None) drm.add_or_update_record(final_path, ase, 2, 1, True, None)
d = drm.get_record(final_path) d = drm.get_record(ase)
assert d.final_path == final_path assert d.final_path == final_path
assert d.completed assert d.completed
@ -47,8 +56,8 @@ def test_download_resume_manager(tmpdir):
assert d.md5hexdigest == 'abc' assert d.md5hexdigest == 'abc'
# idempotent check after completed # idempotent check after completed
drm.add_or_update_record(final_path, 'tp', 1, 2, 1, True, None) drm.add_or_update_record(final_path, ase, 2, 1, True, None)
d = drm.get_record(final_path) d = drm.get_record(ase)
assert d.final_path == final_path assert d.final_path == final_path
assert d.completed assert d.completed

Просмотреть файл

@ -17,26 +17,34 @@ def test_exponentialretrywithmaxwait():
er = retry.ExponentialRetryWithMaxWait( er = retry.ExponentialRetryWithMaxWait(
initial_backoff=1, max_backoff=0) initial_backoff=1, max_backoff=0)
with pytest.raises(ValueError):
er = retry.ExponentialRetryWithMaxWait(
initial_backoff=1, max_backoff=1, max_retries=-1)
with pytest.raises(ValueError):
er = retry.ExponentialRetryWithMaxWait(
initial_backoff=2, max_backoff=1)
er = retry.ExponentialRetryWithMaxWait() er = retry.ExponentialRetryWithMaxWait()
context = mock.MagicMock() context = mock.MagicMock()
context.count = 0 context.count = 0
context.response.status = 500 context.response.status = 500
bo = er.retry(context) bo = er.retry(context)
assert context.count == 1 assert context.count == 1
assert bo == 1 assert bo == 0.1
bo = er.retry(context) bo = er.retry(context)
assert context.count == 2 assert context.count == 2
assert bo == 2 assert bo == 0.2
bo = er.retry(context) bo = er.retry(context)
assert context.count == 3 assert context.count == 3
assert bo == 4 assert bo == 0.4
bo = er.retry(context) bo = er.retry(context)
assert context.count == 4 assert context.count == 4
assert bo == 8 assert bo == 0.8
bo = er.retry(context) bo = er.retry(context)
assert context.count == 1 assert context.count == 5
assert bo == 1 assert bo == 0.1

Просмотреть файл

@ -4,8 +4,8 @@ envlist = py27, py35
[testenv] [testenv]
deps = -rtest_requirements.txt deps = -rtest_requirements.txt
commands = commands =
#flake8 {envsitepackagesdir}/blobxfer_cli/ flake8 {envsitepackagesdir}/blobxfer_cli/
#flake8 {envsitepackagesdir}/blobxfer/ flake8 {envsitepackagesdir}/blobxfer/
py.test \ py.test \
-x -l -s \ -x -l -s \
--ignore venv/ \ --ignore venv/ \