diff --git a/blobxfer/models/__init__.py b/blobxfer/models/__init__.py
index b9e9fbc..1d2e850 100644
--- a/blobxfer/models/__init__.py
+++ b/blobxfer/models/__init__.py
@@ -60,17 +60,6 @@ class _BaseSourcePaths(object):
         """
         return self._paths
 
-    def add_include(self, incl):
-        # type: (_BaseSourcePaths, str) -> None
-        """Add an include
-        :param _BaseSourcePaths self: this
-        :param str incl: include filter
-        """
-        if self._include is None:
-            self._include = list(incl)
-        else:
-            self._include.append(incl)
-
     def add_includes(self, includes):
         # type: (_BaseSourcePaths, list) -> None
         """Add a list of includes
@@ -78,23 +67,12 @@ class _BaseSourcePaths(object):
         :param list includes: list of includes
         """
         if not isinstance(includes, list):
-            includes = list(includes)
+            includes = [includes]
         if self._include is None:
             self._include = includes
         else:
            self._include.extend(includes)
 
-    def add_exclude(self, excl):
-        # type: (_BaseSourcePaths, str) -> None
-        """Add an exclude
-        :param _BaseSourcePaths self: this
-        :param str excl: exclude filter
-        """
-        if self._exclude is None:
-            self._exclude = list(excl)
-        else:
-            self._exclude.append(excl)
-
     def add_excludes(self, excludes):
         # type: (_BaseSourcePaths, list) -> None
         """Add a list of excludes
@@ -102,7 +80,7 @@ class _BaseSourcePaths(object):
         :param list excludes: list of excludes
         """
         if not isinstance(excludes, list):
-            excludes = list(excludes)
+            excludes = [excludes]
         if self._exclude is None:
             self._exclude = excludes
         else:
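Note on the `[includes]` fix above: the old fallback called `list()` on the argument, and `list()` iterates a string character by character, silently exploding a single filter such as `'*.txt'` into five one-character filters. A minimal standalone sketch (plain Python, independent of blobxfer) of the corrected normalization:

    # list() iterates a string, splitting it into characters
    assert list('*.txt') == ['*', '.', 't', 'x', 't']

    def normalize(filters):
        # wrap a lone string in a list instead of iterating it
        if not isinstance(filters, list):
            filters = [filters]
        return filters

    assert normalize('*.txt') == ['*.txt']
    assert normalize(['a', 'b']) == ['a', 'b']

This is also why the tests further down can now call `add_includes('*.txt')` with a bare string, and the separate `add_include`/`add_exclude` methods could be dropped.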
diff --git a/blobxfer/models/download.py b/blobxfer/models/download.py
index a197a25..cc363ef 100644
--- a/blobxfer/models/download.py
+++ b/blobxfer/models/download.py
@@ -345,7 +345,7 @@ class Descriptor(object):
                 fd_start=0,
                 fd_end=slicesize,
             )
-            total_size = ase.size
+            total_size = slicesize
         else:
             view = LocalPathView(
                 fd_start=ase.vectored_io.offset_start,
@@ -529,7 +529,7 @@ class Descriptor(object):
                 pass
         # iterate unchecked chunks and delete
         for key in self._unchecked_chunks:
-            ucc = self._unchecked_chunks[key]
+            ucc = self._unchecked_chunks[key]['ucc']
             if ucc.temp:
                 try:
                     ucc.file_path.unlink()
diff --git a/blobxfer/models/options.py b/blobxfer/models/options.py
index cdc32df..c516d01 100644
--- a/blobxfer/models/options.py
+++ b/blobxfer/models/options.py
@@ -122,24 +122,24 @@ class Concurrency(object):
         if self.crypto_processes is None or self.crypto_processes < 1:
             self.crypto_processes = 0
         if self.md5_processes is None or self.md5_processes < 1:
-            self.md5_processes = multiprocessing.cpu_count() // 2
+            self.md5_processes = multiprocessing.cpu_count() >> 1
         if self.md5_processes < 1:
             self.md5_processes = 1
         auto_disk = False
         if self.disk_threads is None or self.disk_threads < 1:
-            self.disk_threads = multiprocessing.cpu_count() * 4
-            # cap maximum number of disk threads from cpu count to 96
-            if self.disk_threads > 96:
-                self.transfer_threads = 96
+            self.disk_threads = multiprocessing.cpu_count() << 1
+            # cap maximum number of disk threads from cpu count to 64
+            if self.disk_threads > 64:
+                self.disk_threads = 64
             auto_disk = True
         if self.transfer_threads is None or self.transfer_threads < 1:
             if auto_disk:
                 self.transfer_threads = self.disk_threads << 1
             else:
-                self.transfer_threads = multiprocessing.cpu_count() * 2
-                # cap maximum number of threads from cpu count to 64
-                if self.transfer_threads > 64:
-                    self.transfer_threads = 64
+                self.transfer_threads = multiprocessing.cpu_count() << 2
+                # cap maximum number of threads from cpu count to 96
+                if self.transfer_threads > 96:
+                    self.transfer_threads = 96
 
 
 class General(object):
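The shifts above are plain integer halving and doubling (`x >> 1 == x // 2`, `x << 1 == x * 2`, `x << 2 == x * 4`). A hedged restatement of the new default resolution as a standalone function — the name and return shape are illustrative, not blobxfer API — for tracing the swapped caps (disk threads now capped at 64, transfer threads at 96):

    import multiprocessing

    def resolve_defaults(md5=None, disk=None, transfer=None):
        # mirrors the Concurrency defaults introduced in the hunk above
        cpu = multiprocessing.cpu_count()
        if md5 is None or md5 < 1:
            md5 = max(cpu >> 1, 1)        # half the cores, at least one
        auto_disk = False
        if disk is None or disk < 1:
            disk = min(cpu << 1, 64)      # 2x cores, capped at 64
            auto_disk = True
        if transfer is None or transfer < 1:
            if auto_disk:
                transfer = disk << 1      # twice the disk threads
            else:
                transfer = min(cpu << 2, 96)  # 4x cores, capped at 96
        return md5, disk, transfer

Note the removed lines also contained a genuine bug that this hunk fixes: the disk-thread cap assigned to `self.transfer_threads` instead of `self.disk_threads`.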
diff --git a/blobxfer/operations/azure/__init__.py b/blobxfer/operations/azure/__init__.py
index 177d41d..67d531f 100644
--- a/blobxfer/operations/azure/__init__.py
+++ b/blobxfer/operations/azure/__init__.py
@@ -287,7 +287,7 @@ class SourcePath(blobxfer.models._BaseSourcePaths):
                         encryption_metadata_exists(entity.metadata):
                     ed = blobxfer.models.crypto.EncryptionMetadata()
                     ed.convert_from_json(
-                        entity.metadata, file.name, options.rsa_private_key)
+                        entity.metadata, entity.name, options.rsa_private_key)
                 else:
                     ed = None
                 ase = blobxfer.models.azure.StorageEntity(container, ed)
diff --git a/blobxfer/operations/download.py b/blobxfer/operations/download.py
index 3df37bc..a369d0f 100644
--- a/blobxfer/operations/download.py
+++ b/blobxfer/operations/download.py
@@ -298,6 +298,7 @@ class Downloader(object):
                 convert_vectored_io_slice_to_final_path_name(lpath, rfile)
             )
         else:
+            view = None
             fpath = slpath
         self._md5_offload.add_localfile_for_md5_check(
             key, slpath, fpath, md5, rfile.mode, view)
@@ -453,11 +454,10 @@ class Downloader(object):
         """Worker thread download
         :param Downloader self: this
         """
+        max_set_len = self._general_options.concurrency.disk_threads << 2
         while not self.termination_check:
             try:
-                if (len(self._disk_set) >
-                        self._general_options.concurrency.
-                        disk_threads * 4):
+                if len(self._disk_set) > max_set_len:
                     time.sleep(0.2)
                     continue
                 else:
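Hoisting `max_set_len` out of the loop avoids re-reading the options chain on every iteration; the worker then simply sleeps while more than `disk_threads * 4` chunks are in flight. The same bounded-backlog pattern is applied to the upload worker in `blobxfer/operations/upload.py` below. A generic sketch of the loop shape (all names hypothetical):

    import time

    def worker_loop(backlog, max_backlog, get_next, process, should_stop):
        # back off while too many items are in flight, else pull and process
        while not should_stop():
            if len(backlog) > max_backlog:
                time.sleep(0.2)
                continue
            process(get_next())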
diff --git a/blobxfer/operations/progress.py b/blobxfer/operations/progress.py
index 07a9281..0bf132e 100644
--- a/blobxfer/operations/progress.py
+++ b/blobxfer/operations/progress.py
@@ -39,6 +39,8 @@ import azure.storage
 import cryptography
 import requests
 # local imports
+import blobxfer.models.download
+import blobxfer.models.upload
 import blobxfer.util
 import blobxfer.version
@@ -158,7 +160,7 @@ def output_parameters(general_options, spec):
             spec.skip_on.filesize_match,
             spec.skip_on.lmt_ge,
             spec.skip_on.md5_match))
-    log.append('         chunk size: {} bytes'.format(
+    log.append('    chunk size bytes: {}'.format(
         spec.options.chunk_size_bytes))
     log.append('  delete extraneous: {}'.format(
         spec.options.delete_extraneous_destination))
diff --git a/blobxfer/operations/resume.py b/blobxfer/operations/resume.py
index 0458bec..88172e4 100644
--- a/blobxfer/operations/resume.py
+++ b/blobxfer/operations/resume.py
@@ -44,7 +44,8 @@ import blobxfer.util
 logger = logging.getLogger(__name__)
 
 
-class _BaseResumeManager():
+class _BaseResumeManager(object):
+    """Base Resume Manager"""
     def __init__(self, resume_file):
         # type: (_BaseResumeManager, str) -> None
         """Ctor for _BaseResumeManager
@@ -99,18 +100,21 @@ class _BaseResumeManager():
         :rtype: str
         :return: record key
         """
-        return '{}:{}'.format(ase._client.primary_endpoint, ase.path)
+        key = '{}:{}'.format(ase._client.primary_endpoint, ase.path)
+        if blobxfer.util.on_python2():
+            return key.encode('utf8')
+        else:
+            return key
 
     def get_record(self, ase, key=None, lock=True):
-        # type: (_BaseResumeManager, str,
-        #        bool) -> object
+        # type: (_BaseResumeManager, str, bool) -> object
         """Get a resume record
         :param _BaseResumeManager self: this
         :param blobxfer.models.azure.StorageEntity ase: Storage Entity
         :param str key: record key
         :param bool lock: acquire lock
-        :rtype: blobxfer.models.resume._Base
-        :return: _Base record
+        :rtype: object
+        :return: resume record object
         """
         if key is None:
             key = blobxfer.operations.resume._BaseResumeManager.\
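Resume records are now keyed by `primary_endpoint:path` instead of by local file path, which is why the tests further down pass a storage entity to `add_or_update_record` and `get_record`. On Python 2 the key is additionally encoded to UTF-8 bytes, presumably to satisfy the backing store's `str`-key requirement there. A tiny illustration with assumed values:

    def record_key(endpoint, path, python2=False):
        # e.g. 'sa.blob.core.windows.net:cont/remote/name'
        key = '{}:{}'.format(endpoint, path)
        return key.encode('utf8') if python2 else key

    assert record_key('ep', 'cont/blob') == 'ep:cont/blob'
    assert record_key('ep', 'cont/blob', python2=True) == b'ep:cont/blob'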
diff --git a/blobxfer/operations/upload.py b/blobxfer/operations/upload.py
index a946d7c..02447c9 100644
--- a/blobxfer/operations/upload.py
+++ b/blobxfer/operations/upload.py
@@ -446,11 +446,10 @@ class Uploader(object):
         """Worker thread upload
         :param Uploader self: this
         """
+        max_set_len = self._general_options.concurrency.transfer_threads << 2
         while not self.termination_check:
             try:
-                if (len(self._transfer_set) >
-                        self._general_options.concurrency.
-                        transfer_threads * 4):
+                if len(self._transfer_set) > max_set_len:
                     time.sleep(0.2)
                     continue
                 else:
diff --git a/blobxfer/retry.py b/blobxfer/retry.py
index 892b25c..daee22a 100644
--- a/blobxfer/retry.py
+++ b/blobxfer/retry.py
@@ -37,23 +37,34 @@ import azure.storage.retry
 class ExponentialRetryWithMaxWait(azure.storage.retry._Retry):
     """Exponential Retry with Max Wait (infinite retries)"""
 
-    def __init__(self, initial_backoff=0.1, max_backoff=2, reset_at_max=True):
-        # type: (ExponentialRetryWithMaxWait, int, int, bool) -> None
+    def __init__(
+            self, initial_backoff=0.1, max_backoff=1, max_retries=None,
+            reset_at_max=True):
+        # type: (ExponentialRetryWithMaxWait, int, int, int, bool) -> None
         """Ctor for ExponentialRetryWithMaxWait
         :param ExponentialRetryWithMaxWait self: this
         :param int initial_backoff: initial backoff
         :param int max_backoff: max backoff
+        :param int max_retries: max retries
         :param bool reset_at_max: reset after reaching max wait
         """
+        if max_backoff <= 0:
+            raise ValueError(
+                'max backoff is non-positive: {}'.format(max_backoff))
+        if max_retries is not None and max_retries < 0:
+            raise ValueError(
+                'max retries is invalid: {}'.format(max_retries))
         if max_backoff < initial_backoff:
             raise ValueError(
                 'max backoff {} less than initial backoff {}'.format(
                     max_backoff, initial_backoff))
+        self._backoff_count = 0
+        self._last_backoff = initial_backoff
         self.initial_backoff = initial_backoff
         self.max_backoff = max_backoff
         self.reset_at_max = reset_at_max
         super(ExponentialRetryWithMaxWait, self).__init__(
-            max_backoff if self.reset_at_max else 2147483647, False)
+            max_retries if max_retries is not None else 2147483647, False)
 
     def retry(self, context):
         # type: (ExponentialRetryWithMaxWait,
@@ -75,11 +86,12 @@ class ExponentialRetryWithMaxWait(azure.storage.retry._Retry):
         :rtype: int
         :return: backoff amount
         """
-        if context.count == 1:
-            backoff = self.initial_backoff
+        self._backoff_count += 1
+        if self._backoff_count == 1:
+            self._last_backoff = self.initial_backoff
         else:
-            backoff = self.initial_backoff * (context.count - 1)
-            if backoff > self.max_backoff and self.reset_at_max:
-                backoff = self.initial_backoff
-                context.count = 1
-        return backoff
+            self._last_backoff *= 2
+            if self._last_backoff > self.max_backoff and self.reset_at_max:
+                self._backoff_count = 1
+                self._last_backoff = self.initial_backoff
+        return self._last_backoff
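The retry policy changes from a linear ramp (`initial_backoff * (count - 1)`) to true exponential doubling, tracked in instance state (`_backoff_count`, `_last_backoff`) rather than in the shared `context.count`, which is no longer mutated. With the new defaults (`initial_backoff=0.1`, `max_backoff=1`), the waits run 0.1, 0.2, 0.4, 0.8 and then reset — exactly what the updated `tests/test_blobxfer_retry.py` below expects. A standalone sketch of the progression:

    def backoffs(initial=0.1, maximum=1, reset_at_max=True):
        # yields the wait applied before each retry attempt
        count, last = 0, initial
        while True:
            count += 1
            if count == 1:
                last = initial
            else:
                last *= 2
                if last > maximum and reset_at_max:
                    count, last = 1, initial
            yield last

    gen = backoffs()
    assert [round(next(gen), 2) for _ in range(5)] == [0.1, 0.2, 0.4, 0.8, 0.1]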
diff --git a/blobxfer/util.py b/blobxfer/util.py
index cce84f0..166b98f 100644
--- a/blobxfer/util.py
+++ b/blobxfer/util.py
@@ -65,7 +65,7 @@ def on_python2():
     return future.utils.PY2
 
 
-def on_windows():
+def on_windows():  # noqa
     # type: (None) -> bool
     """Execution on Windows
     :rtype: bool
diff --git a/cli/cli.py b/cli/cli.py
index bf397b0..ec4f3f9 100644
--- a/cli/cli.py
+++ b/cli/cli.py
@@ -41,7 +41,11 @@ import ruamel.yaml
 import blobxfer.api
 import blobxfer.util
 # local imports
-from . import settings
+try:
+    from . import settings
+except (SystemError, ImportError):  # noqa
+    # for local testing
+    import settings
 
 # create logger
 logger = logging.getLogger('blobxfer')
diff --git a/test_requirements.txt b/test_requirements.txt
index bc58365..c576b44 100644
--- a/test_requirements.txt
+++ b/test_requirements.txt
@@ -1,5 +1,5 @@
 flake8>=3.3.0
 mock>=2.0.0; python_version < '3.3'
-pypandoc>=1.3.3
-pytest>=3.0.7
-pytest-cov>=2.4.0
+pypandoc>=1.4
+pytest>=3.1.1
+pytest-cov>=2.5.1
diff --git a/tests/test_blobxfer_models_azure.py b/tests/test_blobxfer_models_azure.py
index 6ddc95b..f075092 100644
--- a/tests/test_blobxfer_models_azure.py
+++ b/tests/test_blobxfer_models_azure.py
@@ -49,6 +49,6 @@ def test_azurestorageentity():
     assert ase.snapshot is not None
 
     blob.snapshot = None
-    ase.populate_from_file(mock.MagicMock(), blob)
+    ase.populate_from_file(mock.MagicMock(), blob, 'path')
     assert ase.mode == azmodels.StorageModes.File
     assert ase.snapshot is None
diff --git a/tests/test_blobxfer_models_download.py b/tests/test_blobxfer_models_download.py
index 918a7f0..c1b568e 100644
--- a/tests/test_blobxfer_models_download.py
+++ b/tests/test_blobxfer_models_download.py
@@ -110,32 +110,33 @@ def test_downloaddescriptor(tmpdir):
     d._allocate_disk_space()
 
     assert d.entity == ase
+    assert d.entity.is_encrypted
     assert not d.must_compute_md5
+    assert d.hmac is not None
     assert d._total_chunks == 64
     assert d._offset == 0
     assert d.final_path == lp
-    assert str(d.local_path) == str(lp) + '.bxtmp'
     assert d._allocated
-    assert d.local_path.stat().st_size == 1024 - 16
+    assert d.final_path.stat().st_size == ase._size - 16
 
     d._allocate_disk_space()
     assert d._allocated
 
-    d.local_path.unlink()
-    ase._size = 1
+    d.final_path.unlink()
+    ase._size = 32
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
-    assert d._total_chunks == 1
+    assert d._total_chunks == 2
     assert d._allocated
-    assert d.local_path.stat().st_size == 0
+    assert d.final_path.stat().st_size == ase._size - 16
 
-    d.local_path.unlink()
+    d.final_path.unlink()
     ase._encryption = None
     ase._size = 1024
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
     assert d._allocated
-    assert d.local_path.stat().st_size == 1024
+    assert d.final_path.stat().st_size == ase._size
 
     # pre-existing file check
     ase._size = 0
@@ -143,13 +144,12 @@ def test_downloaddescriptor(tmpdir):
     d._allocate_disk_space()
     assert d._total_chunks == 0
     assert d._allocated
-    assert d.local_path.stat().st_size == 0
+    assert d.final_path.stat().st_size == ase._size
 
 
 @unittest.skipIf(util.on_python2(), 'fallocate does not exist')
 def test_downloaddescriptor_allocate_disk_space_via_seek(tmpdir):
     fp = pathlib.Path(str(tmpdir.join('fp')))
-    lp = pathlib.Path(str(tmpdir.join('fp.bxtmp')))
     opts = mock.MagicMock()
     opts.check_file_md5 = True
     opts.chunk_size_bytes = 256
@@ -162,14 +162,13 @@ def test_downloaddescriptor_allocate_disk_space_via_seek(tmpdir):
         patched_fallocate.side_effect = [AttributeError()]
         d._allocate_disk_space()
     assert d._allocated
-    assert not fp.exists()
-    assert lp.stat().st_size == ase._size
+    assert fp.exists()
+    assert fp.stat().st_size == ase._size
 
 
 def test_downloaddescriptor_resume(tmpdir):
     resumefile = pathlib.Path(str(tmpdir.join('resume')))
     fp = pathlib.Path(str(tmpdir.join('fp')))
-    lp = pathlib.Path(str(tmpdir.join('fp.bxtmp')))
 
     opts = mock.MagicMock()
     opts.check_file_md5 = True
@@ -177,6 +176,7 @@ def test_downloaddescriptor_resume(tmpdir):
     ase = azmodels.StorageEntity('cont')
     ase._size = 128
     ase._name = 'blob'
+    ase._client = mock.MagicMock()
 
     # test no record
     rmgr = rops.DownloadResumeManager(resumefile)
@@ -185,7 +185,7 @@ def test_downloaddescriptor_resume(tmpdir):
     assert rb is None
 
     # test length mismatch
-    rmgr.add_or_update_record(str(fp), str(lp), 127, 0, 0, False, None)
+    rmgr.add_or_update_record(str(fp), ase, 0, 0, False, None)
     rb = d._resume()
     assert rb is None
 
@@ -193,7 +193,7 @@ def test_downloaddescriptor_resume(tmpdir):
     rmgr.delete()
     rmgr = rops.DownloadResumeManager(resumefile)
 
-    rmgr.add_or_update_record(str(fp), str(lp), ase._size, 0, 0, False, None)
+    rmgr.add_or_update_record(str(fp), ase, 0, 0, False, None)
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
     assert rb is None
 
@@ -202,7 +202,7 @@ def test_downloaddescriptor_resume(tmpdir):
     rmgr.delete()
     rmgr = rops.DownloadResumeManager(resumefile)
 
-    rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, True, None)
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, True, None)
     d = models.Descriptor(fp, ase, opts, rmgr)
     fp.touch()
     rb = d._resume()
@@ -215,22 +215,23 @@ def test_downloaddescriptor_resume(tmpdir):
     ase._encryption = mock.MagicMock()
     ase._encryption.symmetric_key = b'123'
 
-    rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None)
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
     assert rb is None
 
-    # test if intermediate file not exists
+    # test up to chunk
     rmgr.delete()
     rmgr = rops.DownloadResumeManager(resumefile)
     ase = azmodels.StorageEntity('cont')
     ase._size = 128
     ase._name = 'blob'
+    ase._client = mock.MagicMock()
 
-    rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None)
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
-    assert rb is None
+    assert rb == 32
 
     # ensure hmac not populated
     rmgr.delete()
@@ -238,9 +239,10 @@ def test_downloaddescriptor_resume(tmpdir):
     ase = azmodels.StorageEntity('cont')
     ase._size = 128
     ase._name = 'blob'
-    lp.touch()
+    ase._client = mock.MagicMock()
+    fp.touch()
 
-    rmgr.add_or_update_record(str(fp), str(lp), ase._size, 32, 1, False, None)
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, None)
     d = models.Descriptor(fp, ase, opts, rmgr)
     d.hmac = True
     with pytest.raises(RuntimeError):
@@ -251,13 +253,12 @@ def test_downloaddescriptor_resume(tmpdir):
     rmgr = rops.DownloadResumeManager(resumefile)
 
     data = os.urandom(32)
-    with lp.open('wb') as f:
+    with fp.open('wb') as f:
         f.write(data)
     md5 = util.new_md5_hasher()
     md5.update(data)
 
-    rmgr.add_or_update_record(
-        str(fp), str(lp), ase._size, 32, 1, False, md5.hexdigest())
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, md5.hexdigest())
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
     assert rb == 32
 
@@ -265,8 +266,7 @@ def test_downloaddescriptor_resume(tmpdir):
     # md5 hash mismatch
     rmgr.delete()
     rmgr = rops.DownloadResumeManager(resumefile)
-    rmgr.add_or_update_record(
-        str(fp), str(lp), ase._size, 32, 1, False, 'abc')
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, 'abc')
     ase._md5 = 'abc'
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
@@ -278,10 +278,10 @@ def test_downloaddescriptor_resume(tmpdir):
     ase = azmodels.StorageEntity('cont')
     ase._size = 128
     ase._name = 'blob'
+    ase._client = mock.MagicMock()
     ase._mode = azmodels.StorageModes.Page
 
-    rmgr.add_or_update_record(
-        str(fp), str(lp), ase._size, 32, 1, False, md5.hexdigest())
+    rmgr.add_or_update_record(str(fp), ase, 32, 1, False, md5.hexdigest())
     d = models.Descriptor(fp, ase, opts, rmgr)
     rb = d._resume()
     assert rb == 32
 
@@ -443,10 +443,11 @@ def test_write_unchecked_data(tmpdir):
 
     assert offsets.chunk_num in d._unchecked_chunks
     ucc = d._unchecked_chunks[offsets.chunk_num]
-    assert ucc.data_len == ase._size
-    assert ucc.fd_start == offsets.fd_start
-    assert ucc.file_path == d.local_path
-    assert not ucc.temp
+    assert ucc['ucc'].data_len == ase._size
+    assert ucc['ucc'].fd_start == offsets.fd_start
+    assert ucc['ucc'].file_path == d.final_path
+    assert not ucc['ucc'].temp
+    assert ucc['decrypted']
 
 
 def test_write_unchecked_hmac_data(tmpdir):
@@ -464,10 +465,11 @@ def test_write_unchecked_hmac_data(tmpdir):
 
     assert offsets.chunk_num in d._unchecked_chunks
     ucc = d._unchecked_chunks[offsets.chunk_num]
-    assert ucc.data_len == ase._size
-    assert ucc.fd_start == offsets.fd_start
-    assert ucc.file_path != d.local_path
-    assert ucc.temp
+    assert ucc['ucc'].data_len == ase._size
+    assert ucc['ucc'].fd_start == offsets.fd_start
+    assert ucc['ucc'].file_path != d.final_path
+    assert ucc['ucc'].temp
+    assert not ucc['decrypted']
 
 
 def test_perform_chunked_integrity_check(tmpdir):
@@ -505,10 +507,12 @@ def test_perform_chunked_integrity_check(tmpdir):
     offsets1, _ = d.next_offsets()
     d.write_unchecked_hmac_data(offsets1, data)
     ucc1 = d._unchecked_chunks[offsets1.chunk_num]
+    ucc['decrypted'] = True
+    ucc1['decrypted'] = True
     d.perform_chunked_integrity_check()
 
-    assert not ucc.file_path.exists()
-    assert not ucc1.file_path.exists()
+    assert ucc['ucc'].file_path != d.final_path
+    assert ucc1['ucc'].file_path != d.final_path
     assert d._next_integrity_chunk == 2
     assert 0 not in d._unchecked_chunks
     assert 1 not in d._unchecked_chunks
 
@@ -529,6 +533,7 @@ def test_perform_chunked_integrity_check(tmpdir):
     ase = azmodels.StorageEntity('cont')
     ase._size = 32
     ase._name = 'blob'
+    ase._client = mock.MagicMock()
     ase._md5 = md5.hexdigest()
 
     rmgr = rops.DownloadResumeManager(resumefile)
@@ -539,7 +544,7 @@ def test_perform_chunked_integrity_check(tmpdir):
     d.perform_chunked_integrity_check()
     assert d._next_integrity_chunk == 1
     assert len(d._unchecked_chunks) == 0
-    dr = rmgr.get_record(str(fp))
+    dr = rmgr.get_record(ase)
     assert dr.next_integrity_chunk == 1
     assert dr.md5hexdigest == md5.hexdigest()
 
@@ -553,11 +558,12 @@ def test_update_resume_for_completed(tmpdir):
     ase = azmodels.StorageEntity('cont')
     ase._size = 32
     ase._name = 'blob'
+    ase._client = mock.MagicMock()
     rmgr = rops.DownloadResumeManager(resumefile)
     d = models.Descriptor(fp, ase, opts, rmgr)
     offsets, _ = d.next_offsets()
     d._update_resume_for_completed()
-    dr = rmgr.get_record(str(fp))
+    dr = rmgr.get_record(ase)
     assert dr.completed
 
@@ -575,8 +581,8 @@ def test_cleanup_all_temporary_files(tmpdir):
     d.write_unchecked_data(offsets, data)
     assert len(d._unchecked_chunks) == 1
     d.cleanup_all_temporary_files()
-    assert not d.local_path.exists()
-    assert not d._unchecked_chunks[0].file_path.exists()
+    assert not d.final_path.exists()
+    assert not d._unchecked_chunks[0]['ucc'].file_path.exists()
 
     lp = pathlib.Path(str(tmpdir.join('b')))
     d = models.Descriptor(lp, ase, opts, None)
@@ -585,11 +591,10 @@ def test_cleanup_all_temporary_files(tmpdir):
     data = b'0' * opts.chunk_size_bytes
     d.write_unchecked_hmac_data(offsets, data)
     assert len(d._unchecked_chunks) == 1
-    d.local_path.unlink()
-    d._unchecked_chunks[0].file_path.unlink()
+    d._unchecked_chunks[0]['ucc'].file_path.unlink()
     d.cleanup_all_temporary_files()
-    assert not d.local_path.exists()
-    assert not d._unchecked_chunks[0].file_path.exists()
+    assert not d.final_path.exists()
+    assert not d._unchecked_chunks[0]['ucc'].file_path.exists()
 
 
 def test_write_data(tmpdir):
@@ -606,11 +611,11 @@ def test_write_data(tmpdir):
     data = b'0' * ase._size
     d.write_data(offsets, data)
 
-    assert d.local_path.exists()
-    assert d.local_path.stat().st_size == len(data)
+    assert d.final_path.exists()
+    assert d.final_path.stat().st_size == len(data)
 
 
-def test_finalize_file(tmpdir):
+def test_finalize_integrity_and_file(tmpdir):
     # already finalized
     lp = pathlib.Path(str(tmpdir.join('af')))
     opts = mock.MagicMock()
@@ -624,11 +629,12 @@ def test_finalize_file(tmpdir):
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
     d._finalized = True
+    d.finalize_integrity()
     d.finalize_file()
 
-    assert d.local_path.exists()
-    assert not d.final_path.exists()
-    d.local_path.unlink()
+    assert d.final_path.exists()
+    assert d.final_path.stat().st_size == ase._size
+    d.final_path.unlink()
 
     # hmac check success
     lp = pathlib.Path(str(tmpdir.join('a')))
@@ -654,9 +660,9 @@ def test_finalize_file(tmpdir):
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
     d.hmac.update(data)
+    d.finalize_integrity()
     d.finalize_file()
 
-    assert not d.local_path.exists()
     assert d.final_path.exists()
     assert d.final_path.stat().st_size == len(data)
 
@@ -676,9 +682,9 @@ def test_finalize_file(tmpdir):
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
     d.md5.update(data)
+    d.finalize_integrity()
     d.finalize_file()
 
-    assert not d.local_path.exists()
     assert d.final_path.exists()
     assert d.final_path.stat().st_size == len(data)
 
@@ -694,9 +700,9 @@ def test_finalize_file(tmpdir):
 
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
+    d.finalize_integrity()
     d.finalize_file()
 
-    assert not d.local_path.exists()
     assert d.final_path.exists()
     assert d.final_path.stat().st_size == len(data)
 
@@ -714,9 +720,9 @@ def test_finalize_file(tmpdir):
 
     d = models.Descriptor(lp, ase, opts, None)
     d._allocate_disk_space()
     d.md5.update(data)
+    d.finalize_integrity()
     d.finalize_file()
 
-    assert not d.local_path.exists()
     assert not d.final_path.exists()
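As the rewritten assertions above show, `Descriptor._unchecked_chunks` values are no longer bare chunk records: each entry is now a dict carrying the chunk under `'ucc'` plus a `'decrypted'` flag, so the integrity checker can skip chunks whose ciphertext has not been processed yet. A sketch of the assumed entry shape (the namedtuple is a stand-in, not blobxfer's actual type):

    from collections import namedtuple

    UncheckedChunk = namedtuple(
        'UncheckedChunk', ['data_len', 'fd_start', 'file_path', 'temp'])

    entry = {
        'ucc': UncheckedChunk(16, 0, '/tmp/fp.bxtmp', True),
        'decrypted': False,  # later flipped via mark_unchecked_chunk_decrypted
    }
    assert entry['ucc'].temp and not entry['decrypted']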
diff --git a/tests/test_blobxfer_models_options.py b/tests/test_blobxfer_models_options.py
index 1ee72bb..31edde7 100644
--- a/tests/test_blobxfer_models_options.py
+++ b/tests/test_blobxfer_models_options.py
@@ -21,22 +21,38 @@ def test_concurrency_options(patched_cc):
     a = options.Concurrency(
         crypto_processes=-1,
         md5_processes=0,
+        disk_threads=-1,
         transfer_threads=-2,
     )
 
     assert a.crypto_processes == 0
     assert a.md5_processes == 1
+    assert a.disk_threads == 2
+    assert a.transfer_threads == 4
+
+    a = options.Concurrency(
+        crypto_processes=-1,
+        md5_processes=0,
+        disk_threads=1,
+        transfer_threads=-1,
+    )
+
+    assert a.crypto_processes == 0
+    assert a.md5_processes == 1
+    assert a.disk_threads == 1
     assert a.transfer_threads == 4
 
 
 @mock.patch('multiprocessing.cpu_count', return_value=64)
-def test_concurrency_options_max_transfer_threads(patched_cc):
+def test_concurrency_options_max_disk_and_transfer_threads(patched_cc):
     a = options.Concurrency(
         crypto_processes=1,
         md5_processes=1,
+        disk_threads=None,
         transfer_threads=None,
     )
 
+    assert a.disk_threads == 64
     assert a.transfer_threads == 96
 
 
@@ -45,7 +61,8 @@ def test_general_options():
         concurrency=options.Concurrency(
             crypto_processes=1,
             md5_processes=2,
-            transfer_threads=3,
+            disk_threads=3,
+            transfer_threads=4,
         ),
         log_file='abc.log',
         progress_bar=False,
@@ -56,7 +73,8 @@ def test_general_options():
 
     assert a.concurrency.crypto_processes == 1
     assert a.concurrency.md5_processes == 2
-    assert a.concurrency.transfer_threads == 3
+    assert a.concurrency.disk_threads == 3
+    assert a.concurrency.transfer_threads == 4
     assert a.log_file == 'abc.log'
     assert not a.progress_bar
     assert a.resume_file == pathlib.Path('abc')
@@ -67,7 +85,8 @@ def test_general_options():
         concurrency=options.Concurrency(
             crypto_processes=1,
             md5_processes=2,
-            transfer_threads=3,
+            disk_threads=3,
+            transfer_threads=4,
         ),
         progress_bar=False,
         resume_file=None,
@@ -77,7 +96,8 @@ def test_general_options():
 
     assert a.concurrency.crypto_processes == 1
     assert a.concurrency.md5_processes == 2
-    assert a.concurrency.transfer_threads == 3
+    assert a.concurrency.disk_threads == 3
+    assert a.concurrency.transfer_threads == 4
     assert a.log_file is None
     assert not a.progress_bar
     assert a.resume_file is None
diff --git a/tests/test_blobxfer_models_resume.py b/tests/test_blobxfer_models_resume.py
index 55a6009..7fb12a3 100644
--- a/tests/test_blobxfer_models_resume.py
+++ b/tests/test_blobxfer_models_resume.py
@@ -8,9 +8,8 @@ import blobxfer.models.resume as rmodels
 
 
 def test_download():
-    d = rmodels.Download('fp', 'tp', 1, 2, 0, False, '')
+    d = rmodels.Download('fp', 1, 2, 0, False, '')
     assert d.final_path == 'fp'
-    assert d.temp_path == 'tp'
     assert d.length == 1
     assert d.chunk_size == 2
     assert d.next_integrity_chunk == 0
diff --git a/tests/test_blobxfer_models_upload.py b/tests/test_blobxfer_models_upload.py
index e6447d7..7d9e057 100644
--- a/tests/test_blobxfer_models_upload.py
+++ b/tests/test_blobxfer_models_upload.py
@@ -7,7 +7,6 @@ try:
 except ImportError:  # noqa
     import pathlib
 # non-stdlib imports
-import pytest
 # module under test
 import blobxfer.models.upload as upload
 
@@ -26,14 +25,10 @@ def test_localsourcepaths_files(tmpdir):
     defpath.join('moo.cow').write('y')
 
     a = upload.LocalSourcePath()
-    a.add_include('*.txt')
+    a.add_includes('*.txt')
     a.add_includes(['moo.cow', '*blah*'])
-    with pytest.raises(ValueError):
-        a.add_includes('abc')
-    a.add_exclude('**/blah.x')
+    a.add_excludes('**/blah.x')
     a.add_excludes(['world.txt'])
-    with pytest.raises(ValueError):
-        a.add_excludes('abc')
     a.add_path(str(tmpdir))
     a_set = set()
     for file in a.files():
@@ -47,9 +42,9 @@ def test_localsourcepaths_files(tmpdir):
 
     b = upload.LocalSourcePath()
     b.add_includes(['moo.cow', '*blah*'])
-    b.add_include('*.txt')
+    b.add_includes('*.txt')
     b.add_excludes(['world.txt'])
-    b.add_exclude('**/blah.x')
+    b.add_excludes('**/blah.x')
     b.add_paths([pathlib.Path(str(tmpdir))])
     for file in a.files():
         sfile = str(file.parent_path / file.relative_path)
diff --git a/tests/test_blobxfer_operations_azure.py b/tests/test_blobxfer_operations_azure.py
index 346fab6..0322aa4 100644
--- a/tests/test_blobxfer_operations_azure.py
+++ b/tests/test_blobxfer_operations_azure.py
@@ -106,7 +106,7 @@ def test_azuresourcepath_files(patched_lf, patched_em):
     i = 0
     for file in asp.files(creds, options, mock.MagicMock()):
         i += 1
-        assert file.name == 'name'
+        assert file.name == 'remote/name'
         assert file.encryption_metadata is None
     assert i == 1
 
@@ -119,7 +119,7 @@ def test_azuresourcepath_files(patched_lf, patched_em):
     i = 0
     for file in asp.files(creds, options, mock.MagicMock()):
         i += 1
-        assert file.name == 'name'
+        assert file.name == 'remote/name'
         assert file.encryption_metadata is not None
     assert i == 1
diff --git a/tests/test_blobxfer_operations_crypto.py b/tests/test_blobxfer_operations_crypto.py
index d3fdc62..f3dfc61 100644
--- a/tests/test_blobxfer_operations_crypto.py
+++ b/tests/test_blobxfer_operations_crypto.py
@@ -118,7 +118,7 @@ def test_cryptooffload_decrypt(tmpdir):
         unpad=False,
     )
     a.add_decrypt_chunk(
-        'fp', str(bfile), offsets, symkey, iv, hmacfile)
+        str(bfile), 0, offsets, symkey, iv, hmacfile)
     i = 33
     checked = False
     while i > 0:
@@ -127,7 +127,7 @@ def test_cryptooffload_decrypt(tmpdir):
             time.sleep(0.3)
             i -= 1
             continue
-        assert result == 'fp'
+        assert result == (str(bfile), offsets)
         checked = True
         break
     assert checked
diff --git a/tests/test_blobxfer_operations_download.py b/tests/test_blobxfer_operations_download.py
index 08702b1..fcc2865 100644
--- a/tests/test_blobxfer_operations_download.py
+++ b/tests/test_blobxfer_operations_download.py
@@ -285,18 +285,26 @@ def test_pre_md5_skip_on_check():
     rfile = azmodels.StorageEntity('cont')
     rfile._encryption = mock.MagicMock()
     rfile._encryption.blobxfer_extensions = mock.MagicMock()
-    rfile._encryption.blobxfer_extensions.pre_encrypted_content_md5 = \
-        'abc'
+    rfile._encryption.blobxfer_extensions.pre_encrypted_content_md5 = 'abc'
+    rfile._client = mock.MagicMock()
+    rfile._client.primary_endpoint = 'ep'
+    rfile._name = 'name'
+    rfile._vio = None
 
     lpath = 'lpath'
+    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
     d._pre_md5_skip_on_check(lpath, rfile)
-    assert lpath in d._md5_map
+    assert key in d._md5_map
 
+    rfile._name = 'name2'
     lpath = 'lpath2'
     rfile._encryption = None
     rfile._md5 = 'abc'
+    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
     d._pre_md5_skip_on_check(lpath, rfile)
-    assert lpath in d._md5_map
+    assert key in d._md5_map
+
+    assert len(d._md5_map) == 2
 
 
 def test_post_md5_skip_on_check(tmpdir):
@@ -309,28 +317,45 @@ def test_post_md5_skip_on_check(tmpdir):
     lpath = str(lp)
     rfile = azmodels.StorageEntity('cont')
     rfile._md5 = 'abc'
+    rfile._client = mock.MagicMock()
+    rfile._client.primary_endpoint = 'ep'
+    rfile._name = 'name'
+    rfile._vio = None
+    rfile._size = 256
     d._pre_md5_skip_on_check(lpath, rfile)
-    d._download_set.add(pathlib.Path(lpath))
-    assert lpath in d._md5_map
+    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
+    d._transfer_set.add(key)
+    assert key in d._md5_map
 
-    d._post_md5_skip_on_check(lpath, True)
-    assert lpath not in d._md5_map
+    d._post_md5_skip_on_check(key, lpath, rfile._size, True)
+    assert key not in d._md5_map
 
     d._add_to_download_queue = mock.MagicMock()
     d._pre_md5_skip_on_check(lpath, rfile)
-    d._download_set.add(pathlib.Path(lpath))
-    d._post_md5_skip_on_check(lpath, False)
+    d._transfer_set.add(key)
+    d._post_md5_skip_on_check(key, lpath, rfile._size, False)
     assert d._add_to_download_queue.call_count == 1
 
 
 def test_check_for_downloads_from_md5():
     lpath = 'lpath'
+    rfile = azmodels.StorageEntity('cont')
+    rfile._md5 = 'abc'
+    rfile._client = mock.MagicMock()
+    rfile._client.primary_endpoint = 'ep'
+    rfile._name = 'name'
+    rfile._vio = None
+    rfile._size = 256
+    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
     d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-    d._md5_map[lpath] = mock.MagicMock()
-    d._download_set.add(pathlib.Path(lpath))
+    d._md5_map[key] = rfile
+    d._transfer_set.add(key)
     d._md5_offload = mock.MagicMock()
     d._md5_offload.done_cv = multiprocessing.Condition()
-    d._md5_offload.pop_done_queue.side_effect = [None, (lpath, False)]
+    d._md5_offload.pop_done_queue.side_effect = [
+        None,
+        (key, lpath, rfile._size, False),
+    ]
     d._add_to_download_queue = mock.MagicMock()
     d._all_remote_files_processed = False
     d._download_terminate = True
@@ -343,11 +368,14 @@ def test_check_for_downloads_from_md5():
             new_callable=mock.PropertyMock) as patched_tc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-        d._md5_map[lpath] = mock.MagicMock()
-        d._download_set.add(pathlib.Path(lpath))
+        d._md5_map[key] = rfile
+        d._transfer_set.add(key)
         d._md5_offload = mock.MagicMock()
         d._md5_offload.done_cv = multiprocessing.Condition()
-        d._md5_offload.pop_done_queue.side_effect = [None, (lpath, False)]
+        d._md5_offload.pop_done_queue.side_effect = [
+            None,
+            (key, lpath, rfile._size, False),
+        ]
         d._add_to_download_queue = mock.MagicMock()
         patched_tc.side_effect = [False, False, True]
         d._check_for_downloads_from_md5()
 
@@ -359,8 +387,8 @@ def test_check_for_downloads_from_md5():
             new_callable=mock.PropertyMock) as patched_tc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-        d._md5_map[lpath] = mock.MagicMock()
-        d._download_set.add(pathlib.Path(lpath))
+        d._md5_map[key] = rfile
+        d._transfer_set.add(key)
         d._md5_offload = mock.MagicMock()
         d._md5_offload.done_cv = multiprocessing.Condition()
         d._md5_offload.pop_done_queue.side_effect = [None]
 
@@ -372,15 +400,25 @@ def test_check_for_downloads_from_md5():
 
 def test_check_for_crypto_done():
     lpath = 'lpath'
+    rfile = azmodels.StorageEntity('cont')
+    rfile._md5 = 'abc'
+    rfile._client = mock.MagicMock()
+    rfile._client.primary_endpoint = 'ep'
+    rfile._name = 'name'
+    rfile._vio = None
+    rfile._size = 256
+    key = ops.Downloader.create_unique_transfer_operation_id(rfile)
     d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-    d._download_set.add(pathlib.Path(lpath))
+    d._transfer_set.add(key)
     dd = mock.MagicMock()
     d._dd_map[lpath] = dd
+    offsets = mock.MagicMock()
+    offsets.range_start = 0
     d._crypto_offload = mock.MagicMock()
     d._crypto_offload.done_cv = multiprocessing.Condition()
     d._crypto_offload.pop_done_queue.side_effect = [
         None,
-        lpath,
+        (lpath, offsets)
     ]
     d._all_remote_files_processed = False
     d._download_terminate = True
 
@@ -393,14 +431,16 @@ def test_check_for_crypto_done():
             new_callable=mock.PropertyMock) as patched_tc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-        d._download_set.add(pathlib.Path(lpath))
+        d._transfer_set.add(key)
         dd = mock.MagicMock()
+        dd.entity = rfile
+        dd.final_path = lpath
         d._dd_map[lpath] = dd
         d._crypto_offload = mock.MagicMock()
         d._crypto_offload.done_cv = multiprocessing.Condition()
         d._crypto_offload.pop_done_queue.side_effect = [
             None,
-            lpath,
+            (lpath, offsets),
         ]
         patched_tc.side_effect = [False, False, True]
         d._complete_chunk_download = mock.MagicMock()
 
@@ -413,13 +453,15 @@ def test_check_for_crypto_done():
             new_callable=mock.PropertyMock) as patched_tc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
-        d._download_set.add(pathlib.Path(lpath))
+        d._transfer_set.add(key)
         dd = mock.MagicMock()
+        dd.entity = rfile
+        dd.final_path = lpath
         d._crypto_offload = mock.MagicMock()
         d._crypto_offload.done_cv = multiprocessing.Condition()
         d._crypto_offload.pop_done_queue.side_effect = [
             None,
-            lpath,
+            (lpath, offsets),
         ]
         patched_tc.side_effect = [False, False, True]
         d._complete_chunk_download = mock.MagicMock()
@@ -438,39 +480,41 @@ def test_add_to_download_queue(tmpdir):
     d._spec.options.chunk_size_bytes = 1
 
     d._add_to_download_queue(lpath, ase)
-    assert d._download_queue.qsize() == 1
+    assert d._transfer_queue.qsize() == 1
     assert path in d._dd_map
 
 
-def test_initialize_and_terminate_download_threads():
+def test_initialize_and_terminate_transfer_threads():
     opts = mock.MagicMock()
     opts.concurrency.transfer_threads = 2
     d = ops.Downloader(opts, mock.MagicMock(), mock.MagicMock())
-    d._worker_thread_download = mock.MagicMock()
+    d._worker_thread_transfer = mock.MagicMock()
 
-    d._initialize_download_threads()
-    assert len(d._download_threads) == 2
+    d._initialize_transfer_threads()
+    assert len(d._transfer_threads) == 2
 
-    d._wait_for_download_threads(terminate=True)
+    d._wait_for_transfer_threads(terminate=True)
     assert d._download_terminate
-    for thr in d._download_threads:
+    for thr in d._transfer_threads:
         assert not thr.is_alive()
 
 
 @mock.patch('blobxfer.operations.crypto.aes_cbc_decrypt_data')
 @mock.patch('blobxfer.operations.azure.file.get_file_range')
 @mock.patch('blobxfer.operations.azure.blob.get_blob_range')
-def test_worker_thread_download(
+def test_worker_thread_transfer(
         patched_gbr, patched_gfr, patched_acdd, tmpdir):
     d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
     d._complete_chunk_download = mock.MagicMock()
     d._download_terminate = True
+    d._general_options.concurrency.transfer_threads = 1
+    d._general_options.concurrency.disk_threads = 1
-    d._worker_thread_download()
+    d._worker_thread_transfer()
     assert d._complete_chunk_download.call_count == 0
 
     d._download_terminate = False
     d._all_remote_files_processed = True
-    d._worker_thread_download()
+    d._worker_thread_transfer()
     assert d._complete_chunk_download.call_count == 0
 
     with mock.patch(
@@ -486,11 +530,11 @@ def test_worker_thread_download(
         opts.check_file_md5 = False
         opts.chunk_size_bytes = 16
         dd = models.Descriptor(lp, ase, opts, None)
-        d._download_queue = mock.MagicMock()
-        d._download_queue.get.side_effect = [queue.Empty, dd]
+        d._transfer_queue = mock.MagicMock()
+        d._transfer_queue.get.side_effect = [queue.Empty, dd]
        d._process_download_descriptor = mock.MagicMock()
         d._process_download_descriptor.side_effect = RuntimeError('oops')
-        d._worker_thread_download()
+        d._worker_thread_transfer()
         assert len(d._exceptions) == 1
         assert d._process_download_descriptor.call_count == 1
 
@@ -503,26 +547,35 @@ def test_worker_thread_download(
             new_callable=mock.PropertyMock) as patched_aoc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+        d._general_options.concurrency.transfer_threads = 1
+        d._general_options.concurrency.disk_threads = 1
         opts = mock.MagicMock()
         opts.check_file_md5 = False
         opts.chunk_size_bytes = 16
         ase = azmodels.StorageEntity('cont')
         ase._size = 16
+        ase._client = mock.MagicMock()
+        ase._client.primary_endpoint = 'ep'
+        ase._name = 'name'
+        ase._vio = None
+        key = ops.Downloader.create_unique_transfer_operation_id(ase)
         ase._encryption = mock.MagicMock()
         ase._encryption.symmetric_key = b'abc'
         lp = pathlib.Path(str(tmpdir.join('a')))
         dd = models.Descriptor(lp, ase, opts, None)
         dd.next_offsets = mock.MagicMock(
             side_effect=[(None, 1), (None, 2)])
+        dd.finalize_integrity = mock.MagicMock()
         dd.finalize_file = mock.MagicMock()
         dd.perform_chunked_integrity_check = mock.MagicMock()
+        dd.all_operations_completed.side_effect = [False, True]
         patched_aoc.side_effect = [False, True]
         patched_tc.side_effect = [False, False, False, True]
         d._dd_map[str(lp)] = dd
-        d._download_set.add(lp)
-        d._download_queue = mock.MagicMock()
-        d._download_queue.get.side_effect = [queue.Empty, dd, dd]
-        d._worker_thread_download()
+        d._transfer_set.add(key)
+        d._transfer_queue = mock.MagicMock()
+        d._transfer_queue.get.side_effect = [queue.Empty, dd, dd]
+        d._worker_thread_transfer()
         assert str(lp) not in d._dd_map
         assert dd.finalize_file.call_count == 1
         assert d._download_sofar == 1
@@ -533,23 +586,33 @@
             new_callable=mock.PropertyMock) as patched_tc:
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+        d._general_options.concurrency.transfer_threads = 1
+        d._general_options.concurrency.disk_threads = 1
         opts = mock.MagicMock()
         opts.check_file_md5 = True
         opts.chunk_size_bytes = 16
         ase = azmodels.StorageEntity('cont')
         ase._mode = azmodels.StorageModes.File
         ase._size = 16
+        ase._client = mock.MagicMock()
+        ase._client.primary_endpoint = 'ep'
+        ase._name = 'name'
+        ase._vio = None
+        key = ops.Downloader.create_unique_transfer_operation_id(ase)
         patched_gfr.return_value = b'0' * ase._size
         lp = pathlib.Path(str(tmpdir.join('b')))
         dd = models.Descriptor(lp, ase, opts, None)
         dd.finalize_file = mock.MagicMock()
         dd.perform_chunked_integrity_check = mock.MagicMock()
         d._dd_map[str(lp)] = mock.MagicMock()
-        d._download_set.add(lp)
-        d._download_queue = mock.MagicMock()
-        d._download_queue.get.side_effect = [dd]
+        d._transfer_set.add(key)
+        d._transfer_queue = mock.MagicMock()
+        d._transfer_queue.get.side_effect = [dd]
         patched_tc.side_effect = [False, True]
-        d._worker_thread_download()
+        d._worker_thread_transfer()
+        assert len(d._disk_set) == 1
+        a, b, c = d._disk_queue.get()
+        d._process_data(a, b, c)
         assert dd.perform_chunked_integrity_check.call_count == 1
 
     with mock.patch(
@@ -557,6 +620,8 @@ def test_worker_thread_download(
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+        d._general_options.concurrency.transfer_threads = 1
+        d._general_options.concurrency.disk_threads = 1
         opts = mock.MagicMock()
         opts.check_file_md5 = False
         opts.chunk_size_bytes = 16
@@ -566,6 +631,11 @@ def test_worker_thread_download(
         ase._encryption = mock.MagicMock()
         ase._encryption.symmetric_key = b'abc'
         ase._encryption.content_encryption_iv = b'0' * 16
+        ase._client = mock.MagicMock()
+        ase._client.primary_endpoint = 'ep'
+        ase._name = 'name'
+        ase._vio = None
+        key = ops.Downloader.create_unique_transfer_operation_id(ase)
         patched_gfr.return_value = b'0' * ase._size
         lp = pathlib.Path(str(tmpdir.join('c')))
         dd = models.Descriptor(lp, ase, opts, None)
@@ -575,11 +645,14 @@ def test_worker_thread_download(
         d._crypto_offload = mock.MagicMock()
         d._crypto_offload.add_decrypt_chunk = mock.MagicMock()
         d._dd_map[str(lp)] = dd
-        d._download_set.add(lp)
-        d._download_queue = mock.MagicMock()
-        d._download_queue.get.side_effect = [dd]
+        d._transfer_set.add(key)
+        d._transfer_queue = mock.MagicMock()
+        d._transfer_queue.get.side_effect = [dd]
         patched_tc.side_effect = [False, True]
-        d._worker_thread_download()
+        d._worker_thread_transfer()
+        assert len(d._disk_set) == 1
+        a, b, c = d._disk_queue.get()
+        d._process_data(a, b, c)
         assert d._crypto_offload.add_decrypt_chunk.call_count == 1
         assert dd.write_unchecked_hmac_data.call_count == 1
 
@@ -589,6 +662,8 @@ def test_worker_thread_download(
         d = ops.Downloader(
             mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
         d._general_options.concurrency.crypto_processes = 0
+        d._general_options.concurrency.transfer_threads = 1
+        d._general_options.concurrency.disk_threads = 1
         opts = mock.MagicMock()
         opts.check_file_md5 = False
         opts.chunk_size_bytes = 16
@@ -598,19 +673,28 @@ def test_worker_thread_download(
         ase._encryption = mock.MagicMock()
         ase._encryption.symmetric_key = b'abc'
         ase._encryption.content_encryption_iv = b'0' * 16
+        ase._client = mock.MagicMock()
+        ase._client.primary_endpoint = 'ep'
+        ase._name = 'name'
+        ase._vio = None
+        key = ops.Downloader.create_unique_transfer_operation_id(ase)
         patched_gfr.return_value = b'0' * ase._size
         lp = pathlib.Path(str(tmpdir.join('d')))
         dd = models.Descriptor(lp, ase, opts, None)
         dd.next_offsets()
         dd.write_unchecked_hmac_data = mock.MagicMock()
         dd.perform_chunked_integrity_check = mock.MagicMock()
+        dd.mark_unchecked_chunk_decrypted = mock.MagicMock()
         patched_acdd.return_value = b'0' * 16
         d._dd_map[str(lp)] = mock.MagicMock()
-        d._download_set.add(lp)
-        d._download_queue = mock.MagicMock()
-        d._download_queue.get.side_effect = [dd]
+        d._transfer_set.add(key)
+        d._transfer_queue = mock.MagicMock()
+        d._transfer_queue.get.side_effect = [dd, dd]
         patched_tc.side_effect = [False, True]
-        d._worker_thread_download()
+        d._worker_thread_transfer()
+        assert len(d._disk_set) == 1
+        a, b, c = d._disk_queue.get()
+        d._process_data(a, b, c)
         assert patched_acdd.call_count == 1
         assert dd.write_unchecked_hmac_data.call_count == 1
         assert dd.perform_chunked_integrity_check.call_count == 1
 
@@ -631,7 +715,7 @@ def test_cleanup_temporary_files(tmpdir):
     d._general_options.resume_file = pathlib.Path('abc')
     d._dd_map[0] = dd
     d._cleanup_temporary_files()
-    assert dd.local_path.exists()
+    assert dd.final_path.exists()
 
     lp = pathlib.Path(str(tmpdir.join('b')))
     opts = mock.MagicMock()
@@ -645,7 +729,7 @@ def test_cleanup_temporary_files(tmpdir):
     d._general_options.resume_file = None
     d._dd_map[0] = dd
     d._cleanup_temporary_files()
-    assert not dd.local_path.exists()
+    assert not dd.final_path.exists()
 
     lp = pathlib.Path(str(tmpdir.join('c')))
     opts = mock.MagicMock()
@@ -661,7 +745,7 @@ def test_cleanup_temporary_files(tmpdir):
     d._general_options.resume_file = None
     d._dd_map[0] = dd
     d._cleanup_temporary_files()
-    assert dd.local_path.exists()
+    assert dd.final_path.exists()
 
 
 def test_catalog_local_files_for_deletion(tmpdir):
@@ -699,21 +783,16 @@ def test_delete_extraneous_files(tmpdir):
     d._delete_extraneous_files()
 
 
-@mock.patch('blobxfer.operations.md5.LocalFileMd5Offload')
-@mock.patch('blobxfer.operations.azure.blob.list_blobs')
-@mock.patch(
-    'blobxfer.operations.download.Downloader.ensure_local_destination',
-    return_value=True
-)
-def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir):
+def _create_downloader_for_start(td):
     d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
     d._cleanup_temporary_files = mock.MagicMock()
     d._download_start = datetime.datetime.now(tz=dateutil.tz.tzlocal())
-    d._initialize_download_threads = mock.MagicMock()
-    patched_lfmo._check_thread = mock.MagicMock()
-    d._general_options.concurrency.crypto_processes = 1
+    d._initialize_transfer_threads = mock.MagicMock()
+    d._general_options.concurrency.crypto_processes = 0
     d._general_options.concurrency.md5_processes = 1
-    d._general_options.resume_file = pathlib.Path(str(tmpdir.join('rf')))
+    d._general_options.concurrency.disk_threads = 1
+    d._general_options.concurrency.transfer_threads = 1
+    d._general_options.resume_file = pathlib.Path(str(td.join('rf')))
     d._spec.sources = []
     d._spec.options = mock.MagicMock()
     d._spec.options.chunk_size_bytes = 1
@@ -725,50 +804,84 @@ def test_start(patched_eld, patched_lb, patched_lfmo, tmpdir):
     d._spec.skip_on.lmt_ge = False
     d._spec.skip_on.filesize_match = False
     d._spec.destination = mock.MagicMock()
-    d._spec.destination.path = pathlib.Path(str(tmpdir))
+    d._spec.destination.path = pathlib.Path(str(td))
     d._download_start_time = util.datetime_now()
+    d._pre_md5_skip_on_check = mock.MagicMock()
+    d._check_download_conditions = mock.MagicMock()
+    d._all_remote_files_processed = False
 
     p = '/cont/remote/path'
     asp = azops.SourcePath()
     asp.add_path_with_storage_account(p, 'sa')
     d._spec.sources.append(asp)
 
-    b = azure.storage.blob.models.Blob(name='name')
+    return d
+
+
+@mock.patch('blobxfer.operations.md5.LocalFileMd5Offload')
+@mock.patch('blobxfer.operations.azure.blob.list_blobs')
+@mock.patch(
+    'blobxfer.operations.download.Downloader.ensure_local_destination',
+    return_value=True
+)
+@mock.patch(
+    'blobxfer.operations.download.Downloader.'
+    'create_unique_transfer_operation_id',
+    return_value='id'
+)
+@mock.patch(
+    'blobxfer.operations.download.Downloader._wait_for_transfer_threads',
+    return_value=None
+)
+@mock.patch(
+    'blobxfer.operations.download.Downloader._wait_for_disk_threads',
+    return_value=None
+)
+def test_start(
+        patched_wdt, patched_wtt, patched_cutoi, patched_eld, patched_lb,
+        patched_lfmo, tmpdir):
+    patched_lfmo._check_thread = mock.MagicMock()
+
+    b = azure.storage.blob.models.Blob(name='remote/path/name')
     b.properties.content_length = 1
     patched_lb.side_effect = [[b]]
-    d._pre_md5_skip_on_check = mock.MagicMock()
-    d._check_download_conditions = mock.MagicMock()
+    d = _create_downloader_for_start(tmpdir)
     d._check_download_conditions.return_value = ops.DownloadAction.Skip
+    d._download_sofar = -1
+    d._download_bytes_sofar = -1
     d.start()
     assert d._pre_md5_skip_on_check.call_count == 0
 
     patched_lb.side_effect = [[b]]
-    d._all_remote_files_processed = False
+    d = _create_downloader_for_start(tmpdir)
     d._check_download_conditions.return_value = ops.DownloadAction.CheckMd5
+    d._download_sofar = -1
     with pytest.raises(RuntimeError):
         d.start()
+    d._download_terminate = True
     assert d._pre_md5_skip_on_check.call_count == 1
 
     b.properties.content_length = 0
     patched_lb.side_effect = [[b]]
-    d._all_remote_files_processed = False
+    d = _create_downloader_for_start(tmpdir)
     d._check_download_conditions.return_value = ops.DownloadAction.Download
     with pytest.raises(RuntimeError):
         d.start()
-    assert d._download_queue.qsize() == 1
+    d._download_terminate = True
+    assert d._transfer_queue.qsize() == 1
 
     # test exception count
     b = azure.storage.blob.models.Blob(name='name')
     b.properties.content_length = 1
     patched_lb.side_effect = [[b]]
+    d = _create_downloader_for_start(tmpdir)
     d._spec.destination.is_dir = False
     d._spec.options.rename = True
-    d._pre_md5_skip_on_check = mock.MagicMock()
-    d._check_download_conditions = mock.MagicMock()
     d._check_download_conditions.return_value = ops.DownloadAction.Skip
     d._exceptions = [RuntimeError('oops')]
     with pytest.raises(RuntimeError):
         d.start()
+    d._download_terminate = True
     assert d._pre_md5_skip_on_check.call_count == 0
 
@@ -776,11 +889,11 @@ def test_start_keyboard_interrupt():
     d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
     d._general_options.resume_file = None
     d._run = mock.MagicMock(side_effect=KeyboardInterrupt)
-    d._wait_for_download_threads = mock.MagicMock()
+    d._wait_for_transfer_threads = mock.MagicMock()
     d._cleanup_temporary_files = mock.MagicMock()
     d._md5_offload = mock.MagicMock()
 
     with pytest.raises(KeyboardInterrupt):
         d.start()
-    assert d._wait_for_download_threads.call_count == 1
+    assert d._wait_for_transfer_threads.call_count == 1
     assert d._cleanup_temporary_files.call_count == 1
diff --git a/tests/test_blobxfer_operations_md5.py b/tests/test_blobxfer_operations_md5.py
index 5bd7b20..02be647 100644
--- a/tests/test_blobxfer_operations_md5.py
+++ b/tests/test_blobxfer_operations_md5.py
@@ -57,6 +57,8 @@ def test_finalize_md5_processes():
 def test_from_add_to_done_non_pagealigned(tmpdir):
     file = tmpdir.join('a')
     file.write('abc')
+    fpath = str(file)
+    key = 'key'
 
     remote_md5 = ops.compute_md5_for_file_asbase64(str(file))
 
@@ -67,7 +69,7 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
         assert result is None
 
         a.add_localfile_for_md5_check(
-            str(file), remote_md5, azmodels.StorageModes.Block)
+            key, fpath, fpath, remote_md5, azmodels.StorageModes.Block, None)
         i = 33
         checked = False
         while i > 0:
@@ -76,9 +78,11 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
                 time.sleep(0.3)
                 i -= 1
                 continue
-            assert len(result) == 2
-            assert result[0] == str(file)
-            assert result[1]
+            assert len(result) == 4
+            assert result[0] == key
+            assert result[1] == str(file)
+            assert result[2] is None
+            assert result[3]
             checked = True
             break
         assert checked
 
@@ -90,6 +94,8 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
 def test_from_add_to_done_pagealigned(tmpdir):
     file = tmpdir.join('a')
     file.write('abc')
+    fpath = str(file)
+    key = 'key'
 
     remote_md5 = ops.compute_md5_for_file_asbase64(str(file), True)
 
@@ -100,7 +106,7 @@ def test_from_add_to_done_pagealigned(tmpdir):
         assert result is None
 
         a.add_localfile_for_md5_check(
-            str(file), remote_md5, azmodels.StorageModes.Page)
+            key, fpath, fpath, remote_md5, azmodels.StorageModes.Page, None)
         i = 33
         checked = False
         while i > 0:
@@ -109,9 +115,11 @@ def test_from_add_to_done_pagealigned(tmpdir):
                 time.sleep(0.3)
                 i -= 1
                 continue
-            assert len(result) == 2
-            assert result[0] == str(file)
-            assert result[1]
+            assert len(result) == 4
+            assert result[0] == key
+            assert result[1] == str(file)
+            assert result[2] is None
+            assert result[3]
             checked = True
             break
         assert checked
diff --git a/tests/test_blobxfer_operations_progress.py b/tests/test_blobxfer_operations_progress.py
index 75f9f79..721501e 100644
--- a/tests/test_blobxfer_operations_progress.py
+++ b/tests/test_blobxfer_operations_progress.py
@@ -13,12 +13,12 @@ import blobxfer.util as util
 import blobxfer.operations.progress as ops
 
 
-def test_output_download_parameters():
+def test_output_parameters():
     go = mock.MagicMock()
     spec = mock.MagicMock()
 
     go.log_file = 'abc'
-    ops.output_download_parameters(go, spec)
+    ops.output_parameters(go, spec)
     assert util.is_not_empty(go.log_file)
diff --git a/tests/test_blobxfer_operations_resume.py b/tests/test_blobxfer_operations_resume.py
index 52f11b8..9894d3b 100644
--- a/tests/test_blobxfer_operations_resume.py
+++ b/tests/test_blobxfer_operations_resume.py
@@ -2,6 +2,10 @@
 """Tests for operations resume"""
 
 # stdlib imports
+try:
+    import unittest.mock as mock
+except ImportError:  # noqa
+    import mock
 try:
     import pathlib2 as pathlib
 except ImportError:  # noqa
@@ -23,23 +27,28 @@ def test_download_resume_manager(tmpdir):
     assert drm._data is None
     assert not tmpdb.exists()
 
+    ase = mock.MagicMock()
+    ase._name = 'name'
+    ase._client.primary_endpoint = 'ep'
+    ase._size = 16
+
     final_path = 'fp'
     drm = ops.DownloadResumeManager(tmpdb)
-    drm.add_or_update_record(final_path, 'tp', 1, 2, 0, False, None)
-    d = drm.get_record(final_path)
+    drm.add_or_update_record(final_path, ase, 2, 0, False, None)
+    d = drm.get_record(ase)
 
     assert d.final_path == final_path
 
-    drm.add_or_update_record(final_path, 'tp', 1, 2, 1, False, 'abc')
-    d = drm.get_record(final_path)
+    drm.add_or_update_record(final_path, ase, 2, 1, False, 'abc')
+    d = drm.get_record(ase)
 
     assert d.final_path == final_path
     assert not d.completed
     assert d.next_integrity_chunk == 1
     assert d.md5hexdigest == 'abc'
 
-    drm.add_or_update_record(final_path, 'tp', 1, 2, 1, True, None)
-    d = drm.get_record(final_path)
+    drm.add_or_update_record(final_path, ase, 2, 1, True, None)
+    d = drm.get_record(ase)
 
     assert d.final_path == final_path
     assert d.completed
@@ -47,8 +56,8 @@ def test_download_resume_manager(tmpdir):
     assert d.md5hexdigest == 'abc'
 
     # idempotent check after completed
-    drm.add_or_update_record(final_path, 'tp', 1, 2, 1, True, None)
-    d = drm.get_record(final_path)
+    drm.add_or_update_record(final_path, ase, 2, 1, True, None)
+    d = drm.get_record(ase)
 
     assert d.final_path == final_path
     assert d.completed
diff --git a/tests/test_blobxfer_retry.py b/tests/test_blobxfer_retry.py
index 9d84b90..d44fa21 100644
--- a/tests/test_blobxfer_retry.py
+++ b/tests/test_blobxfer_retry.py
@@ -17,26 +17,34 @@ def test_exponentialretrywithmaxwait():
         er = retry.ExponentialRetryWithMaxWait(
             initial_backoff=1, max_backoff=0)
 
+    with pytest.raises(ValueError):
+        er = retry.ExponentialRetryWithMaxWait(
+            initial_backoff=1, max_backoff=1, max_retries=-1)
+
+    with pytest.raises(ValueError):
+        er = retry.ExponentialRetryWithMaxWait(
+            initial_backoff=2, max_backoff=1)
+
     er = retry.ExponentialRetryWithMaxWait()
     context = mock.MagicMock()
     context.count = 0
     context.response.status = 500
     bo = er.retry(context)
     assert context.count == 1
-    assert bo == 1
+    assert bo == 0.1
 
     bo = er.retry(context)
     assert context.count == 2
-    assert bo == 2
+    assert bo == 0.2
 
     bo = er.retry(context)
     assert context.count == 3
-    assert bo == 4
+    assert bo == 0.4
 
     bo = er.retry(context)
     assert context.count == 4
-    assert bo == 8
+    assert bo == 0.8
 
     bo = er.retry(context)
-    assert context.count == 1
-    assert bo == 1
+    assert context.count == 5
+    assert bo == 0.1
diff --git a/tox.ini b/tox.ini
index 58a6df6..d05615f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,8 +4,8 @@ envlist = py27, py35
[testenv]
 deps = -rtest_requirements.txt
 commands =
-    #flake8 {envsitepackagesdir}/blobxfer_cli/
-    #flake8 {envsitepackagesdir}/blobxfer/
+    flake8 {envsitepackagesdir}/blobxfer_cli/
+    flake8 {envsitepackagesdir}/blobxfer/
     py.test \
         -x -l -s \
         --ignore venv/ \