version 0.0.11 of azure-datalake-store package
* Address a request to indicate that files are still in progress while they are being downloaded. * All files will have a suffix `.inprogress` until they are fully downloaded, at which point the suffix will be removed.
This commit is contained in:
Parent
579c4f2863
Commit
9d4942e2cd
|
@ -2,6 +2,10 @@
|
|||
|
||||
Release History
|
||||
===============
|
||||
0.0.11 (2017-06-02)
|
||||
-------------------
|
||||
* Update to name incomplete file downloads with a `.inprogress` suffix. This suffix is removed when the download completes successfully.
|
||||
|
||||
0.0.10 (2017-05-24)
|
||||
-------------------
|
||||
* Allow users to explicitly use or invalidate the internal, local cache of the filesystem that is built up from previous `ls` calls. It is now set to always call the service instead of the cache by default.
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
__version__ = "0.0.10"
|
||||
__version__ = "0.0.11"
|
||||
|
||||
from .core import AzureDLFileSystem
|
||||
from .multithread import ADLDownloader
|
||||
|
|
|
@ -191,11 +191,11 @@ class ADLDownloader(object):
|
|||
rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True)
|
||||
if len(rfiles) > 1:
|
||||
prefix = commonprefix([f['name'] for f in rfiles])
|
||||
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f)
|
||||
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', prefix)), f)
|
||||
for f in rfiles]
|
||||
elif len(rfiles) == 1:
|
||||
if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
|
||||
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'])),
|
||||
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'] + '.inprogress')),
|
||||
rfiles[0])]
|
||||
else:
|
||||
file_pairs = [(self.lpath, rfiles[0])]
|
||||
|
@ -208,8 +208,11 @@ class ADLDownloader(object):
|
|||
|
||||
existing_files = []
|
||||
for lfile, rfile in file_pairs:
|
||||
if not self._overwrite and os.path.exists(lfile):
|
||||
existing_files.append(lfile)
|
||||
# only interested in the final destination file name for existence,
|
||||
# not the initial inprogress target
|
||||
destination_file = lfile.replace('.inprogress', '')
|
||||
if not self._overwrite and os.path.exists(destination_file):
|
||||
existing_files.append(destination_file)
|
||||
else:
|
||||
self.client.submit(rfile['name'], lfile, rfile['length'])
|
||||
|
||||
|
@ -254,7 +257,6 @@ class ADLDownloader(object):
|
|||
|
||||
__repr__ = __str__
|
||||
|
||||
|
||||
def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
|
||||
shutdown_event=None, retries=10, delay=0.01, backoff=3):
|
||||
""" Download a piece of a remote file and write locally
|
||||
|
|
|
@ -21,6 +21,7 @@ import threading
|
|||
import time
|
||||
import uuid
|
||||
import operator
|
||||
import os
|
||||
|
||||
from .exceptions import DatalakeIncompleteTransferException
|
||||
|
||||
|
@ -361,6 +362,25 @@ class ADLTransferClient(object):
|
|||
chunks=chunks,
|
||||
exception=self._files[key]['exception']))
|
||||
return files
|
||||
|
||||
def _rename_file(self, src, dst, overwrite=False):
|
||||
""" Rename a file from file_name.inprogress to just file_name. Invoked once download completes on a file.
|
||||
|
||||
Internal function used by `download`.
|
||||
"""
|
||||
try:
|
||||
# we do a final check to make sure someone didn't create the destination file while download was occuring
|
||||
# if the user did not specify overwrite.
|
||||
if os.path.isfile(dst):
|
||||
if not overwrite:
|
||||
raise FileExistsError(dst)
|
||||
os.remove(dst)
|
||||
os.rename(src, dst)
|
||||
except Exception as e:
|
||||
logger.error('Rename failed for source file: %r; %r', src, e)
|
||||
raise e
|
||||
|
||||
logger.debug('Renamed %r to %r', src, dst)
|
||||
|
||||
def _update(self, future):
|
||||
if future in self._cfutures:
|
||||
|
@ -405,6 +425,12 @@ class ADLTransferClient(object):
|
|||
overwrite=self._parent._overwrite)
|
||||
self._ffutures[merge_future] = parent
|
||||
else:
|
||||
if not self._chunked and str(dst).endswith('.inprogress'):
|
||||
logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])
|
||||
self._fstates[parent] = 'merging'
|
||||
self._rename_file(dst, dst.replace('.inprogress',''), overwrite=self._parent._overwrite)
|
||||
dst = dst.replace('.inprogress', '')
|
||||
|
||||
self._fstates[parent] = 'finished'
|
||||
logger.info("Transferred %s -> %s", src, dst)
|
||||
elif cstates.contains_none('running'):
|
||||
|
|
|
@ -154,7 +154,7 @@ def test_download_glob(tempdir, azure):
|
|||
assert len(file_pair_dict.keys()) == 2
|
||||
|
||||
lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
|
||||
assert sorted(lfiles) == sorted(['x.csv', 'y.csv'])
|
||||
assert sorted(lfiles) == sorted(['x.csv.inprogress', 'y.csv.inprogress'])
|
||||
|
||||
remote_path = test_dir / 'data' / '*' / '*.csv'
|
||||
down = ADLDownloader(azure, remote_path, tempdir, run=False,
|
||||
|
@ -165,10 +165,10 @@ def test_download_glob(tempdir, azure):
|
|||
|
||||
lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
|
||||
assert sorted(lfiles) == sorted([
|
||||
os.path.join('a', 'x.csv'),
|
||||
os.path.join('a', 'y.csv'),
|
||||
os.path.join('b', 'x.csv'),
|
||||
os.path.join('b', 'y.csv')])
|
||||
os.path.join('a', 'x.csv.inprogress'),
|
||||
os.path.join('a', 'y.csv.inprogress'),
|
||||
os.path.join('b', 'x.csv.inprogress'),
|
||||
os.path.join('b', 'y.csv.inprogress')])
|
||||
|
||||
remote_path = test_dir / 'data' / '*' / 'z.txt'
|
||||
down = ADLDownloader(azure, remote_path, tempdir, run=False,
|
||||
|
@ -178,8 +178,8 @@ def test_download_glob(tempdir, azure):
|
|||
|
||||
lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
|
||||
assert sorted(lfiles) == sorted([
|
||||
os.path.join('a', 'z.txt'),
|
||||
os.path.join('b', 'z.txt')])
|
||||
os.path.join('a', 'z.txt.inprogress'),
|
||||
os.path.join('b', 'z.txt.inprogress')])
|
||||
|
||||
|
||||
@my_vcr.use_cassette
|
||||
|
|
Loading…
Link in a new issue