Improve upload performance for numerous files

- Skip the per-file remote existence (point) query when overwrite is enabled
- Further improvements may include moving from per-file point queries to a one-time list cache
This commit is contained in:
Fred Park 2019-09-05 17:05:18 +00:00
Родитель 2a798b26bb
Коммит c590f5b4dc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 3C4D545F457737EB
3 изменённых файлов: 23 добавлений и 9 удалений

Просмотреть файл

@ -820,7 +820,7 @@ class Downloader(object):
'was specified')
else:
logger.debug(
('{0} files {1:.4f} MiB filesize and/or lmt_ge '
('{0} files {1:.4f} MiB filesize, lmt_ge, or no overwrite '
'skipped').format(
skipped_files, skipped_size / blobxfer.util.MEGABYTE))
logger.debug(

Просмотреть файл

@ -886,6 +886,9 @@ class Uploader(object):
:rtype: blobxfer.models.azure.StorageEntity
:return: remote storage entity
"""
ase = None
if self._spec.options.overwrite or not sa.can_read_object:
return ase
if self._spec.options.mode == blobxfer.models.azure.StorageModes.File:
fp = blobxfer.operations.azure.file.get_file_properties(
sa.file_client, cont, name)
@ -918,8 +921,6 @@ class Uploader(object):
self._spec.options.store_file_properties.content_type or
blobxfer.util.get_mime_type(ase.name)
)
else:
ase = None
return ase
def _generate_destination_for_source(self, local_path):
@ -958,10 +959,7 @@ class Uploader(object):
VectoredIoDistributionMode.Stripe):
ase = None
else:
if sa.can_read_object:
ase = self._check_for_existing_remote(sa, cont, name)
else:
ase = None
ase = self._check_for_existing_remote(sa, cont, name)
if ase is None:
# encryption metadata will be populated later, if required
ase = blobxfer.models.azure.StorageEntity(cont, ed=None)
@ -1212,7 +1210,7 @@ class Uploader(object):
'was specified')
else:
logger.debug(
('{0} files {1:.4f} MiB filesize and/or lmt_ge '
('{0} files {1:.4f} MiB filesize, lmt_ge, or no overwrite '
'skipped').format(
skipped_files, skipped_size / blobxfer.util.MEGABYTE))
logger.debug(

Просмотреть файл

@ -928,15 +928,22 @@ def test_check_upload_conditions(gmfm):
def test_check_for_existing_remote(gbp, gfp):
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = True
sa = mock.MagicMock()
sa.name = 'name'
sa.endpoint = 'ep'
sa.can_read_object = True
u._spec.options.mode = azmodels.StorageModes.File
gfp.return_value = None
assert u._check_for_existing_remote(sa, 'cont', 'name') is None
u._spec.options.overwrite = False
gfp.return_value = None
assert u._check_for_existing_remote(sa, 'cont', 'name') is None
with mock.patch(
'blobxfer.models.crypto.EncryptionMetadata.'
'encryption_metadata_exists', return_value=False):
@ -989,6 +996,7 @@ def test_check_for_existing_remote(gbp, gfp):
def test_generate_destination_for_source():
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._check_for_existing_remote = mock.MagicMock()
lp = mock.MagicMock()
@ -1041,7 +1049,7 @@ def test_generate_destination_for_source():
a, b = next(u._generate_destination_for_source(lp))
assert a == sa
assert b is not None
assert u._check_for_existing_remote.call_count == 1 # should not change
assert u._check_for_existing_remote.call_count == 2
def test_vectorize_and_bind():
@ -1080,6 +1088,7 @@ def test_vectorize_and_bind():
# no vectorization
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._spec.options.vectored_io.distribution_mode = \
models.VectoredIoDistributionMode.Disabled
u._check_upload_conditions = mock.MagicMock()
@ -1102,6 +1111,7 @@ def test_vectorize_and_bind():
# stripe vectorization 1 slice
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._check_upload_conditions = mock.MagicMock()
u._check_upload_conditions.return_value = ops.UploadAction.Upload
u._spec.options.vectored_io.distribution_mode = \
@ -1164,6 +1174,7 @@ def test_vectorize_and_bind():
# replication single target
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._spec.options.vectored_io.distribution_mode = \
models.VectoredIoDistributionMode.Replica
u._check_upload_conditions = mock.MagicMock()
@ -1216,6 +1227,7 @@ def test_run(lfmo, urm, tmpdir):
u._general_options.concurrency.md5_processes = 1
u._general_options.concurrency.crypto_processes = 1
u._general_options.resume_file = 'resume'
u._spec.options.overwrite = False
u._spec.options.store_file_properties.md5 = True
u._spec.skip_on.md5_match = True
u._spec.options.rsa_public_key = 'abc'
@ -1313,6 +1325,7 @@ def test_run(lfmo, urm, tmpdir):
# regular execution
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._general_options.concurrency.disk_threads = 1
u._general_options.concurrency.transfer_threads = 1
u._general_options.concurrency.md5_processes = 1
@ -1352,6 +1365,7 @@ def test_run(lfmo, urm, tmpdir):
u._general_options.concurrency.md5_processes = 1
u._general_options.concurrency.crypto_processes = 0
u._general_options.resume_file = 'resume'
u._spec.options.overwrite = False
u._spec.options.store_file_properties.md5 = True
u._spec.skip_on.md5_match = True
u._spec.options.rsa_public_key = None
@ -1392,6 +1406,7 @@ def test_run(lfmo, urm, tmpdir):
u._general_options.concurrency.md5_processes = 1
u._general_options.concurrency.crypto_processes = 0
u._general_options.resume_file = 'resume'
u._spec.options.overwrite = False
u._spec.options.store_file_properties.md5 = True
u._spec.skip_on.md5_match = True
u._spec.options.rsa_public_key = None
@ -1455,6 +1470,7 @@ def test_run(lfmo, urm, tmpdir):
def test_start():
u = ops.Uploader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
u._general_options.dry_run = False
u._spec.options.overwrite = False
u._spec.options.delete_only = False
u._wait_for_transfer_threads = mock.MagicMock()
u._wait_for_disk_threads = mock.MagicMock()