Fix Multiprocessing Offloads for Python 3.7+

- Resolves #104
This commit is contained in:
Fred Park 2019-08-16 14:36:32 -07:00
Parent 4aaab4e7bb
Commit 8f46622853
No known key found for this signature
GPG key ID: 3C4D545F457737EB
9 changed files: 90 additions and 55 deletions

View file

@ -43,6 +43,11 @@ logger = logging.getLogger(__name__)
class _MultiprocessOffload(object):
__slots__ = [
'_task_queue', '_done_queue', '_done_cv', '_term_signal', '_procs',
'_check_thread'
]
def __init__(self, target, num_workers, description=None):
# type: (_MultiprocessOffload, function, int, str) -> None
"""Ctor for Multiprocess Offload
@ -92,7 +97,15 @@ class _MultiprocessOffload(object):
logger.debug('initializing {}{} processes'.format(
num_workers, ' ' + description if not None else ''))
for _ in range(num_workers):
proc = multiprocessing.Process(target=target)
proc = multiprocessing.Process(
target=target,
args=(
self._term_signal,
self._task_queue,
self._done_cv,
self._done_queue
)
)
proc.start()
self._procs.append(proc)

View file

@ -235,16 +235,21 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload):
:param CryptoOffload self: this
:param int num_workers: number of worker processes
"""
super().__init__(self._worker_process, num_workers, 'Crypto')
super().__init__(CryptoOffload._worker_process, num_workers, 'Crypto')
def _worker_process(self):
# type: (CryptoOffload) -> None
@staticmethod
def _worker_process(term_signal, task_queue, done_cv, done_queue):
# type: (multiprocessing.Value, multiprocessing.Queue,
# multiprocessing.Condition, multiprocessing.Queue) -> None
"""Crypto worker
:param CryptoOffload self: this
:param multiprocessing.Value term_signal: termination signal
:param multiprocessing.Queue task_queue: task queue
:param multiprocessing.Condition done_cv: done condition variable
:param multiprocessing.Queue done_queue: done queue
"""
while not self.terminated:
while term_signal.value != 1:
try:
inst = self._task_queue.get(True, 0.1)
inst = task_queue.get(True, 0.1)
except queue.Empty:
continue
# UNUSED due to AES256-CBC FullBlob mode
@ -259,8 +264,8 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload):
mode='wb', delete=False) as fd:
fpath = fd.name
fd.write(encdata)
self._done_cv.acquire()
self._done_queue.put(fpath)
done_cv.acquire()
done_queue.put(fpath)
elif inst[0] == CryptoAction.Decrypt:
final_path, internal_fdstart, offsets, symkey, iv, \
hmac_datafile = \
@ -275,11 +280,11 @@ class CryptoOffload(blobxfer.models.offload._MultiprocessOffload):
with open(final_path, 'r+b') as fd:
fd.seek(internal_fdstart + offsets.fd_start, 0)
fd.write(data)
self._done_cv.acquire()
self._done_queue.put((final_path, offsets))
done_cv.acquire()
done_queue.put((final_path, offsets))
# notify and release condition var
self._done_cv.notify()
self._done_cv.release()
done_cv.notify()
done_cv.release()
def add_decrypt_chunk(
self, final_path, internal_fdstart, offsets, symkey, iv,

View file

@ -297,7 +297,7 @@ class Downloader(object):
key = blobxfer.operations.download.Downloader.\
create_unique_transfer_operation_id(rfile)
with self._md5_meta_lock:
self._md5_map[key] = rfile
self._md5_map[key] = (rfile, md5)
slpath = str(lpath)
# temporarily create a download descriptor view for vectored io
if rfile.vectored_io is not None:
@ -312,17 +312,22 @@ class Downloader(object):
self._md5_offload.add_localfile_for_md5_check(
key, slpath, fpath, md5, rfile.mode, view)
def _post_md5_skip_on_check(self, key, filename, size, md5_match):
# type: (Downloader, str, str, int, bool) -> None
def _post_md5_skip_on_check(
self, key, filename, size, local_md5, md5_match):
# type: (Downloader, str, str, int, str, bool) -> None
"""Perform post MD5 skip on check
:param Downloader self: this
:param str key: md5 map key
:param str filename: local filename
:param int size: size of checked data
:param str local_md5: local md5
:param bool md5_match: if MD5 matches
"""
with self._md5_meta_lock:
rfile = self._md5_map.pop(key)
rfile, remote_md5 = self._md5_map.pop(key)
if self._general_options.verbose:
logger.debug('pre-transfer MD5 check: {} <L..R> {} {}'.format(
local_md5, remote_md5, filename))
lpath = pathlib.Path(filename)
if md5_match:
if size is None:
@ -372,7 +377,7 @@ class Downloader(object):
cv.release()
if result is not None:
self._post_md5_skip_on_check(
result[0], result[1], result[2], result[3])
result[0], result[1], result[2], result[3], result[4])
def _check_for_crypto_done(self):
# type: (Downloader) -> None

View file

@ -123,17 +123,24 @@ class LocalFileMd5Offload(blobxfer.models.offload._MultiprocessOffload):
:param LocalFileMd5Offload self: this
:param int num_workers: number of worker processes
"""
super().__init__(self._worker_process, num_workers, 'MD5')
super().__init__(
LocalFileMd5Offload._worker_process, num_workers, 'MD5'
)
def _worker_process(self):
# type: (LocalFileMd5Offload) -> None
@staticmethod
def _worker_process(term_signal, task_queue, done_cv, done_queue):
# type: (multiprocessing.Value, multiprocessing.Queue,
# multiprocessing.Condition, multiprocessing.Queue) -> None
"""Compute MD5 for local file
:param LocalFileMd5Offload self: this
:param multiprocessing.Value term_signal: termination signal
:param multiprocessing.Queue task_queue: task queue
:param multiprocessing.Condition done_cv: done condition variable
:param multiprocessing.Queue done_queue: done queue
"""
while not self.terminated:
while term_signal.value != 1:
try:
key, lpath, fpath, remote_md5, pagealign, lpview = \
self._task_queue.get(True, 0.1)
task_queue.get(True, 0.1)
except queue.Empty:
continue
if lpview is None:
@ -146,12 +153,10 @@ class LocalFileMd5Offload(blobxfer.models.offload._MultiprocessOffload):
size = end - start
md5 = blobxfer.operations.md5.compute_md5_for_file_asbase64(
fpath, pagealign, start, end)
logger.debug('pre-transfer MD5 check: {} <L..R> {} {}'.format(
md5, remote_md5, fpath))
self._done_cv.acquire()
self._done_queue.put((key, lpath, size, md5 == remote_md5))
self._done_cv.notify()
self._done_cv.release()
done_cv.acquire()
done_queue.put((key, lpath, size, md5, md5 == remote_md5))
done_cv.notify()
done_cv.release()
def add_localfile_for_md5_check(
self, key, lpath, fpath, remote_md5, mode, lpview):

View file

@ -226,20 +226,24 @@ class Uploader(object):
md5 = blobxfer.models.metadata.get_md5_from_metadata(rfile)
key = blobxfer.operations.upload.Uploader.create_unique_id(src, rfile)
with self._md5_meta_lock:
self._md5_map[key] = (src, rfile)
self._md5_map[key] = (src, rfile, md5)
self._md5_offload.add_localfile_for_md5_check(
key, None, str(src.absolute_path), md5, rfile.mode, src.view)
def _post_md5_skip_on_check(self, key, md5_match):
# type: (Uploader, str, bool) -> None
def _post_md5_skip_on_check(self, key, local_md5, md5_match):
# type: (Uploader, str, str, bool) -> None
"""Perform post MD5 skip on check
:param Uploader self: this
:param str key: md5 map key
:param str local_md5: local md5
:param bool md5_match: if MD5 matches
"""
with self._md5_meta_lock:
src, rfile = self._md5_map.pop(key)
src, rfile, remote_md5 = self._md5_map.pop(key)
uid = blobxfer.operations.upload.Uploader.create_unique_id(src, rfile)
if self._general_options.verbose:
logger.debug('pre-transfer MD5 check: {} <L..R> {} {}'.format(
local_md5, remote_md5, src.relative_path))
if md5_match:
with self._upload_lock:
self._upload_set.remove(uid)
@ -278,7 +282,7 @@ class Uploader(object):
break
cv.release()
if result is not None:
self._post_md5_skip_on_check(result[0], result[3])
self._post_md5_skip_on_check(result[0], result[3], result[4])
def _add_to_upload_queue(self, src, rfile, uid):
# type: (Uploader, blobxfer.models.upload.LocalPath,

View file

@ -38,7 +38,7 @@ install_requires = [
'pathlib2>=2.3.4;python_version<"3.5"',
'python-dateutil~=2.8.0',
'requests~=2.22.0',
'ruamel.yaml~=0.15.100',
'ruamel.yaml~=0.16.4',
'scandir>=1.10.0;python_version<"3.5"',
]

View file

@ -457,20 +457,20 @@ def test_post_md5_skip_on_check(tmpdir):
assert key in d._md5_map
d._general_options.dry_run = True
d._post_md5_skip_on_check(key, lpath, None, True)
d._post_md5_skip_on_check(key, lpath, None, 'abc', True)
assert key not in d._md5_map
d._general_options.dry_run = False
d._add_to_download_queue = mock.MagicMock()
d._pre_md5_skip_on_check(lpath, rfile)
d._transfer_set.add(key)
d._post_md5_skip_on_check(key, lpath, rfile._size, False)
d._post_md5_skip_on_check(key, lpath, rfile._size, 'labc', False)
assert d._add_to_download_queue.call_count == 1
d._general_options.dry_run = True
d._pre_md5_skip_on_check(lpath, rfile)
d._transfer_set.add(key)
d._post_md5_skip_on_check(key, lpath, rfile._size, False)
d._post_md5_skip_on_check(key, lpath, rfile._size, 'labc', False)
assert d._add_to_download_queue.call_count == 1
@ -492,7 +492,7 @@ def test_check_for_downloads_from_md5():
d._md5_offload.done_cv = multiprocessing.Condition()
d._md5_offload.pop_done_queue.side_effect = [
None,
(key, lpath, rfile._size, False),
(key, lpath, rfile._size, 'labc', False),
]
d._add_to_download_queue = mock.MagicMock()
d._all_remote_files_processed = False
@ -507,13 +507,13 @@ def test_check_for_downloads_from_md5():
d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.dry_run = False
d._md5_map[key] = rfile
d._md5_map[key] = (rfile, 'labc')
d._transfer_set.add(key)
d._md5_offload = mock.MagicMock()
d._md5_offload.done_cv = multiprocessing.Condition()
d._md5_offload.pop_done_queue.side_effect = [
None,
(key, lpath, rfile._size, False),
(key, lpath, rfile._size, 'labc', False),
]
d._add_to_download_queue = mock.MagicMock()
patched_tc.side_effect = [False, False, True]
@ -527,7 +527,7 @@ def test_check_for_downloads_from_md5():
d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.dry_run = False
d._md5_map[key] = rfile
d._md5_map[key] = (rfile, 'labc')
d._transfer_set.add(key)
d._md5_offload = mock.MagicMock()
d._md5_offload.done_cv = multiprocessing.Condition()

View file

@ -103,11 +103,12 @@ def test_from_add_to_done_non_pagealigned(tmpdir):
time.sleep(0.3)
i -= 1
continue
assert len(result) == 4
assert len(result) == 5
assert result[0] == key
assert result[1] == str(file)
assert result[2] is None
assert result[3]
assert result[3] == remote_md5
assert result[4]
checked = True
break
assert checked
@ -148,11 +149,12 @@ def test_from_add_to_done_lpview(tmpdir):
time.sleep(0.3)
i -= 1
continue
assert len(result) == 4
assert len(result) == 5
assert result[0] == key
assert result[1] == str(file)
assert result[2] == 3
assert result[3]
assert result[3] == remote_md5
assert result[4]
checked = True
break
assert checked
@ -185,11 +187,12 @@ def test_from_add_to_done_pagealigned(tmpdir):
time.sleep(0.3)
i -= 1
continue
assert len(result) == 4
assert len(result) == 5
assert result[0] == key
assert result[1] == str(file)
assert result[2] is None
assert result[3]
assert result[3] == remote_md5
assert result[4]
checked = True
break
assert checked

View file

@ -111,22 +111,22 @@ def test_post_md5_skip_on_check():
ase.path = 'asepath'
id = ops.Uploader.create_unique_id(src, ase)
u._md5_map[id] = (src, ase)
u._md5_map[id] = (src, ase, 'md5')
u._upload_set.add(id)
u._upload_total += 1
u._general_options.dry_run = True
u._post_md5_skip_on_check(id, True)
u._post_md5_skip_on_check(id, 'md5', True)
assert len(u._md5_map) == 0
assert id not in u._upload_set
assert u._upload_total == 0
u._general_options.dry_run = False
u._md5_map[id] = (src, ase)
u._md5_map[id] = (src, ase, 'md5')
u._upload_set.add(id)
u._upload_total += 1
u._add_to_upload_queue = mock.MagicMock()
u._post_md5_skip_on_check(id, False)
u._post_md5_skip_on_check(id, 'lmd5', False)
assert len(u._md5_map) == 0
assert id in u._upload_set
assert u._upload_total == 1
@ -134,11 +134,11 @@ def test_post_md5_skip_on_check():
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = True
u._md5_map[id] = (src, ase)
u._md5_map[id] = (src, ase, 'md5')
u._upload_set.add(id)
u._upload_total += 1
u._add_to_upload_queue = mock.MagicMock()
u._post_md5_skip_on_check(id, False)
u._post_md5_skip_on_check(id, 'lmd5', False)
assert len(u._md5_map) == 0
assert id not in u._upload_set
assert u._upload_total == 0