* Fix for test case failure by adding randomized file path

* Fix for empty folder upload issue

* Fix chunked downloader to make block size requests
This commit is contained in:
akharit 2018-10-04 15:20:31 -07:00 коммит произвёл GitHub
Родитель d64d9d836c
Коммит 1f9fe1871b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 50 добавлений и 11 удалений

Просмотреть файл

@ -3,6 +3,12 @@
Release History
===============
0.0.32 (2018-10-04)
+++++++++++++++++++
* Fixed a test case failure by using a randomized file path
* Fixed empty folder upload bug
* Fixed ADL Downloader block size bug
0.0.31 (2018-09-10)
+++++++++++++++++++
* Added support for batched ls

Просмотреть файл

@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------
__version__ = "0.0.31"
__version__ = "0.0.32"
from .core import AzureDLFileSystem
from .multithread import ADLDownloader

Просмотреть файл

@ -289,17 +289,23 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
exponential_factor=backoff)
try:
nbytes = 0
with closing(_fetch_range(adlfs.azure, src, start=offset,
end=offset+size, stream=True, retry_policy=retry_policy)) as response:
with open(dst, 'rb+') as fout:
fout.seek(offset)
for chunk in response.iter_content(chunk_size=blocksize):
start = offset
with open(dst, 'rb+') as fout:
fout.seek(start)
while start < offset+size:
with closing(_fetch_range(adlfs.azure, src, start=start,
end=min(start+blocksize, offset+size), stream=True, retry_policy=retry_policy)) as response:
chunk = response.content
if shutdown_event and shutdown_event.is_set():
return total_bytes_downloaded, None
if chunk:
nwritten = fout.write(chunk)
if nwritten:
nbytes += nwritten
start += nwritten
else:
raise IOError("Failed to write to disk for {0} at location {1} with blocksize {2}".format(dst, start, blocksize))
logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)
# There are certain cases where we will be throttled and receive less than the expected amount of data.
@ -456,9 +462,12 @@ class ADLUploader(object):
"""
is_path_walk_empty = False
if "*" not in self.lpath:
out = os.walk(self.lpath)
lfiles = sum(([os.path.join(dir, f) for f in fnames] for
(dir, _, fnames) in out), [])
lfiles = []
for directory, subdir, fnames in os.walk(self.lpath):
lfiles.extend([os.path.join(directory, f) for f in fnames])
if not subdir and not fnames: # Empty Directory
self.client._adlfs._emptyDirs.append(directory)
if (not lfiles and os.path.exists(self.lpath) and
not os.path.isdir(self.lpath)):
lfiles = [self.lpath]
@ -502,6 +511,11 @@ class ADLUploader(object):
monitor: bool [True]
To watch and wait (block) until completion.
"""
for empty_directory in self.client._adlfs._empty_dirs_to_add():
local_rel_path = os.path.relpath(empty_directory, self.lpath)
rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix / local_rel_path)
self.client._adlfs.mkdir(rel_rpath)
self.client.run(nthreads, monitor)
def active(self):

Просмотреть файл

@ -836,7 +836,7 @@ def test_tail_head(azure):
@my_vcr.use_cassette
def test_read_delimited_block(azure):
fn = '/tmp/test/a'
fn = a
delimiter = b'\n'
data = delimiter.join([b'123', b'456', b'789'])
with azure_teardown(azure):

Просмотреть файл

@ -412,4 +412,23 @@ def test_download_root_folder(azure, tempdir):
rpath = AzureDLPath('/'/test_dir / 'data/single/single'/ 'single.txt')
ADLDownloader(azure, rpath=rpath, lpath=tempdir)
assert os.path.isfile(os.path.join(tempdir, 'single.txt'))
@my_vcr.use_cassette
def test_upload_empty_folder(tempdir, azure):
    """Regression test: uploading a tree that contains an empty
    subdirectory must recreate that directory remotely.

    Parameters
    ----------
    tempdir : str
        Local scratch directory fixture.
    azure : AzureDLFileSystem
        Test filesystem fixture (VCR-recorded).
    """
    with azure_teardown(azure):
        # Local layout: dir1/file.txt plus an empty subdirectory dir1/b.
        os.mkdir(os.path.join(tempdir, "dir1"))
        os.mkdir(os.path.join(tempdir, "dir1", "b"))
        with open(os.path.join(tempdir, "dir1", "file.txt"), 'wb') as f:
            f.write(b'0123456789')

        # Upload the whole tree; the uploader should mkdir the empty
        # subdirectory even though it produces no file transfers.
        # NOTE: the unused ADLTransferClient/put_chunk setup from the
        # original (a copy-paste leftover) has been removed.
        ADLUploader(azure, test_dir / "dir1",
                    os.path.join(tempdir, "dir1"), nthreads=1,
                    overwrite=True)

        # The empty directory must exist remotely as a DIRECTORY entry.
        assert azure.info(test_dir / "dir1" / "b")['type'] == 'DIRECTORY'
        azure.rm(test_dir / "dir1", recursive=True)