From 8f46622853b43ae6a93d313942498cbca2f8e183 Mon Sep 17 00:00:00 2001 From: Fred Park Date: Fri, 16 Aug 2019 14:36:32 -0700 Subject: [PATCH] Fix Multiprocessing Offloads for Python 3.7+ - Resolves #104 --- blobxfer/models/offload.py | 15 ++++++++++- blobxfer/operations/crypto.py | 29 +++++++++++++--------- blobxfer/operations/download.py | 15 +++++++---- blobxfer/operations/md5.py | 29 +++++++++++++--------- blobxfer/operations/upload.py | 14 +++++++---- setup.py | 2 +- tests/test_blobxfer_operations_download.py | 14 +++++------ tests/test_blobxfer_operations_md5.py | 15 ++++++----- tests/test_blobxfer_operations_upload.py | 12 ++++----- 9 files changed, 90 insertions(+), 55 deletions(-) diff --git a/blobxfer/models/offload.py b/blobxfer/models/offload.py index 565df0b..0840441 100644 --- a/blobxfer/models/offload.py +++ b/blobxfer/models/offload.py @@ -43,6 +43,11 @@ logger = logging.getLogger(__name__) class _MultiprocessOffload(object): + __slots__ = [ + '_task_queue', '_done_queue', '_done_cv', '_term_signal', '_procs', + '_check_thread' + ] + def __init__(self, target, num_workers, description=None): # type: (_MultiprocessOffload, function, int, str) -> None """Ctor for Multiprocess Offload @@ -92,7 +97,15 @@ class _MultiprocessOffload(object): logger.debug('initializing {}{} processes'.format( num_workers, ' ' + description if not None else '')) for _ in range(num_workers): - proc = multiprocessing.Process(target=target) + proc = multiprocessing.Process( + target=target, + args=( + self._term_signal, + self._task_queue, + self._done_cv, + self._done_queue + ) + ) proc.start() self._procs.append(proc) diff --git a/blobxfer/operations/crypto.py b/blobxfer/operations/crypto.py index e1f1c38..ab266be 100644 --- a/blobxfer/operations/crypto.py +++ b/blobxfer/operations/crypto.py @@ -235,16 +235,21 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload): :param CryptoOffload self: this :param int num_workers: number of worker processes """ - super().__init__(self._worker_process, num_workers, 'Crypto') + super().__init__(CryptoOffload._worker_process, num_workers, 'Crypto') - def _worker_process(self): - # type: (CryptoOffload) -> None + @staticmethod + def _worker_process(term_signal, task_queue, done_cv, done_queue): + # type: (multiprocessing.Value, multiprocessing.Queue, + # multiprocessing.Condition, multiprocessing.Queue) -> None """Crypto worker - :param CryptoOffload self: this + :param multiprocessing.Value term_signal: termination signal + :param multiprocessing.Queue task_queue: task queue + :param multiprocessing.Condition done_cv: done condition variable + :param multiprocessing.Queue done_queue: done queue """ - while not self.terminated: + while term_signal.value != 1: try: - inst = self._task_queue.get(True, 0.1) + inst = task_queue.get(True, 0.1) except queue.Empty: continue # UNUSED due to AES256-CBC FullBlob mode @@ -259,8 +264,8 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload): mode='wb', delete=False) as fd: fpath = fd.name fd.write(encdata) - self._done_cv.acquire() - self._done_queue.put(fpath) + done_cv.acquire() + done_queue.put(fpath) elif inst[0] == CryptoAction.Decrypt: final_path, internal_fdstart, offsets, symkey, iv, \ hmac_datafile = \ @@ -275,11 +280,11 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload): with open(final_path, 'r+b') as fd: fd.seek(internal_fdstart + offsets.fd_start, 0) fd.write(data) - self._done_cv.acquire() - self._done_queue.put((final_path, offsets)) + done_cv.acquire() + done_queue.put((final_path, offsets)) # notify and release condition var - self._done_cv.notify() - self._done_cv.release() + done_cv.notify() + done_cv.release() def add_decrypt_chunk( self, final_path, internal_fdstart, offsets, symkey, iv, diff --git a/blobxfer/operations/download.py b/blobxfer/operations/download.py index aeb6d17..e08c31e 100644 --- a/blobxfer/operations/download.py +++ b/blobxfer/operations/download.py @@ -297,7 +297,7 @@ class Downloader(object): key = blobxfer.operations.download.Downloader.\ create_unique_transfer_operation_id(rfile) with self._md5_meta_lock: - self._md5_map[key] = rfile + self._md5_map[key] = (rfile, md5) slpath = str(lpath) # temporarily create a download descriptor view for vectored io if rfile.vectored_io is not None: @@ -312,17 +312,22 @@ class Downloader(object): self._md5_offload.add_localfile_for_md5_check( key, slpath, fpath, md5, rfile.mode, view) - def _post_md5_skip_on_check(self, key, filename, size, md5_match): - # type: (Downloader, str, str, int, bool) -> None + def _post_md5_skip_on_check( + self, key, filename, size, local_md5, md5_match): + # type: (Downloader, str, str, int, str, bool) -> None """Perform post MD5 skip on check :param Downloader self: this :param str key: md5 map key :param str filename: local filename :param int size: size of checked data + :param str local_md5: local md5 :param bool md5_match: if MD5 matches """ with self._md5_meta_lock: - rfile = self._md5_map.pop(key) + rfile, remote_md5 = self._md5_map.pop(key) + if self._general_options.verbose: + logger.debug('pre-transfer MD5 check: {} {} {}'.format( + local_md5, remote_md5, filename)) lpath = pathlib.Path(filename) if md5_match: if size is None: @@ -372,7 +377,7 @@ class Downloader(object): cv.release() if result is not None: self._post_md5_skip_on_check( - result[0], result[1], result[2], result[3]) + result[0], result[1], result[2], result[3], result[4]) def _check_for_crypto_done(self): # type: (Downloader) -> None diff --git a/blobxfer/operations/md5.py b/blobxfer/operations/md5.py index 5b5d6b2..a5c6343 100644 --- a/blobxfer/operations/md5.py +++ b/blobxfer/operations/md5.py @@ -123,17 +123,24 @@ class LocalFileMd5Offload(blobxfer.models.offload._MultiprocessOffload): :param LocalFileMd5Offload self: this :param int num_workers: number of worker processes """ - super().__init__(self._worker_process, num_workers, 'MD5') + super().__init__( + LocalFileMd5Offload._worker_process, num_workers, 'MD5' + ) - def _worker_process(self): - # type: (LocalFileMd5Offload) -> None + @staticmethod + def _worker_process(term_signal, task_queue, done_cv, done_queue): + # type: (multiprocessing.Value, multiprocessing.Queue, + # multiprocessing.Condition, multiprocessing.Queue) -> None """Compute MD5 for local file - :param LocalFileMd5Offload self: this + :param multiprocessing.Value term_signal: termination signal + :param multiprocessing.Queue task_queue: task queue + :param multiprocessing.Condition done_cv: done condition variable + :param multiprocessing.Queue done_queue: done queue """ - while not self.terminated: + while term_signal.value != 1: try: key, lpath, fpath, remote_md5, pagealign, lpview = \ - self._task_queue.get(True, 0.1) + task_queue.get(True, 0.1) except queue.Empty: continue if lpview is None: @@ -146,12 +153,10 @@ class LocalFileMd5Offload(blobxfer.models.offload._MultiprocessOffload): size = end - start md5 = blobxfer.operations.md5.compute_md5_for_file_asbase64( fpath, pagealign, start, end) - logger.debug('pre-transfer MD5 check: {} {} {}'.format( - md5, remote_md5, fpath)) - self._done_cv.acquire() - self._done_queue.put((key, lpath, size, md5 == remote_md5)) - self._done_cv.notify() - self._done_cv.release() + done_cv.acquire() + done_queue.put((key, lpath, size, md5, md5 == remote_md5)) + done_cv.notify() + done_cv.release() def add_localfile_for_md5_check( self, key, lpath, fpath, remote_md5, mode, lpview): diff --git a/blobxfer/operations/upload.py b/blobxfer/operations/upload.py index abcc70a..8f2f9da 100644 --- a/blobxfer/operations/upload.py +++ b/blobxfer/operations/upload.py @@ -226,20 +226,24 @@ class Uploader(object): md5 = blobxfer.models.metadata.get_md5_from_metadata(rfile) key = blobxfer.operations.upload.Uploader.create_unique_id(src, rfile) with self._md5_meta_lock: - self._md5_map[key] = (src, rfile) + self._md5_map[key] = (src, rfile, md5) self._md5_offload.add_localfile_for_md5_check( key, None, str(src.absolute_path), md5, rfile.mode, src.view) - def _post_md5_skip_on_check(self, key, md5_match): - # type: (Uploader, str, bool) -> None + def _post_md5_skip_on_check(self, key, local_md5, md5_match): + # type: (Uploader, str, str, bool) -> None """Perform post MD5 skip on check :param Uploader self: this :param str key: md5 map key + :param str local_md5: local md5 :param bool md5_match: if MD5 matches """ with self._md5_meta_lock: - src, rfile = self._md5_map.pop(key) + src, rfile, remote_md5 = self._md5_map.pop(key) uid = blobxfer.operations.upload.Uploader.create_unique_id(src, rfile) + if self._general_options.verbose: + logger.debug('pre-transfer MD5 check: {} {} {}'.format( + local_md5, remote_md5, src.relative_path)) if md5_match: with self._upload_lock: self._upload_set.remove(uid) @@ -278,7 +282,7 @@ class Uploader(object): break cv.release() if result is not None: - self._post_md5_skip_on_check(result[0], result[3]) + self._post_md5_skip_on_check(result[0], result[3], result[4]) def _add_to_upload_queue(self, src, rfile, uid): # type: (Uploader, blobxfer.models.upload.LocalPath, diff --git a/setup.py b/setup.py index f38fdec..03e7bbb 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ install_requires = [ 'pathlib2>=2.3.4;python_version<"3.5"', 'python-dateutil~=2.8.0', 'requests~=2.22.0', - 'ruamel.yaml~=0.15.100', + 'ruamel.yaml~=0.16.4', 'scandir>=1.10.0;python_version<"3.5"', ] diff --git a/tests/test_blobxfer_operations_download.py b/tests/test_blobxfer_operations_download.py index 92fbd4f..eeb3983 100644 --- a/tests/test_blobxfer_operations_download.py +++ b/tests/test_blobxfer_operations_download.py @@ -457,20 +457,20 @@ def test_post_md5_skip_on_check(tmpdir): assert key in d._md5_map d._general_options.dry_run = True - d._post_md5_skip_on_check(key, lpath, None, True) + d._post_md5_skip_on_check(key, lpath, None, 'abc', True) assert key not in d._md5_map d._general_options.dry_run = False d._add_to_download_queue = mock.MagicMock() d._pre_md5_skip_on_check(lpath, rfile) d._transfer_set.add(key) - d._post_md5_skip_on_check(key, lpath, rfile._size, False) + d._post_md5_skip_on_check(key, lpath, rfile._size, 'labc', False) assert d._add_to_download_queue.call_count == 1 d._general_options.dry_run = True d._pre_md5_skip_on_check(lpath, rfile) d._transfer_set.add(key) - d._post_md5_skip_on_check(key, lpath, rfile._size, False) + d._post_md5_skip_on_check(key, lpath, rfile._size, 'labc', False) assert d._add_to_download_queue.call_count == 1 @@ -492,7 +492,7 @@ def test_check_for_downloads_from_md5(): d._md5_offload.done_cv = multiprocessing.Condition() d._md5_offload.pop_done_queue.side_effect = [ None, - (key, lpath, rfile._size, False), + (key, lpath, rfile._size, 'labc', False), ] d._add_to_download_queue = mock.MagicMock() d._all_remote_files_processed = False @@ -507,13 +507,13 @@ def test_check_for_downloads_from_md5(): d = ops.Downloader( mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d._general_options.dry_run = False - d._md5_map[key] = rfile + d._md5_map[key] = (rfile, 'labc') d._transfer_set.add(key) d._md5_offload = mock.MagicMock() d._md5_offload.done_cv = multiprocessing.Condition() d._md5_offload.pop_done_queue.side_effect = [ None, - (key, lpath, rfile._size, False), + (key, lpath, rfile._size, 'labc', False), ] d._add_to_download_queue = mock.MagicMock() patched_tc.side_effect = [False, False, True] @@ -527,7 +527,7 @@ def test_check_for_downloads_from_md5(): d = ops.Downloader( mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) d._general_options.dry_run = False - d._md5_map[key] = rfile + d._md5_map[key] = (rfile, 'labc') d._transfer_set.add(key) d._md5_offload = mock.MagicMock() d._md5_offload.done_cv = multiprocessing.Condition() diff --git a/tests/test_blobxfer_operations_md5.py b/tests/test_blobxfer_operations_md5.py index 0e17b56..e526b40 100644 --- a/tests/test_blobxfer_operations_md5.py +++ b/tests/test_blobxfer_operations_md5.py @@ -103,11 +103,12 @@ def test_from_add_to_done_non_pagealigned(tmpdir): time.sleep(0.3) i -= 1 continue - assert len(result) == 4 + assert len(result) == 5 assert result[0] == key assert result[1] == str(file) assert result[2] is None - assert result[3] + assert result[3] == remote_md5 + assert result[4] checked = True break assert checked @@ -148,11 +149,12 @@ def test_from_add_to_done_lpview(tmpdir): time.sleep(0.3) i -= 1 continue - assert len(result) == 4 + assert len(result) == 5 assert result[0] == key assert result[1] == str(file) assert result[2] == 3 - assert result[3] + assert result[3] == remote_md5 + assert result[4] checked = True break assert checked @@ -185,11 +187,12 @@ def test_from_add_to_done_pagealigned(tmpdir): time.sleep(0.3) i -= 1 continue - assert len(result) == 4 + assert len(result) == 5 assert result[0] == key assert result[1] == str(file) assert result[2] is None - assert result[3] + assert result[3] == remote_md5 + assert result[4] checked = True break assert checked diff --git a/tests/test_blobxfer_operations_upload.py b/tests/test_blobxfer_operations_upload.py index ebbb6d7..28c75ea 100644 --- a/tests/test_blobxfer_operations_upload.py +++ b/tests/test_blobxfer_operations_upload.py @@ -111,22 +111,22 @@ def test_post_md5_skip_on_check(): ase.path = 'asepath' id = ops.Uploader.create_unique_id(src, ase) - u._md5_map[id] = (src, ase) + u._md5_map[id] = (src, ase, 'md5') u._upload_set.add(id) u._upload_total += 1 u._general_options.dry_run = True - u._post_md5_skip_on_check(id, True) + u._post_md5_skip_on_check(id, 'md5', True) assert len(u._md5_map) == 0 assert id not in u._upload_set assert u._upload_total == 0 u._general_options.dry_run = False - u._md5_map[id] = (src, ase) + u._md5_map[id] = (src, ase, 'md5') u._upload_set.add(id) u._upload_total += 1 u._add_to_upload_queue = mock.MagicMock() - u._post_md5_skip_on_check(id, False) + u._post_md5_skip_on_check(id, 'lmd5', False) assert len(u._md5_map) == 0 assert id in u._upload_set assert u._upload_total == 1 @@ -134,11 +134,11 @@ def test_post_md5_skip_on_check(): u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()) u._general_options.dry_run = True - u._md5_map[id] = (src, ase) + u._md5_map[id] = (src, ase, 'md5') u._upload_set.add(id) u._upload_total += 1 u._add_to_upload_queue = mock.MagicMock() - u._post_md5_skip_on_check(id, False) + u._post_md5_skip_on_check(id, 'lmd5', False) assert len(u._md5_map) == 0 assert id not in u._upload_set assert u._upload_total == 0