Batched ls support (#240)

* Added batched ls support
* Added test utility to create files
* Added test for batched ls
* Updated version number

This commit is contained in:
Parent: eee1a4b16f
Commit: d64d9d836c

@@ -3,6 +3,10 @@
 Release History
 ===============
 
+0.0.31 (2018-09-10)
++++++++++++++++++++
+* Added support for batched ls
+
 0.0.30 (2018-08-28)
 +++++++++++++++++++
 * Fixed .travis.yml order to add azure-nspg dependency

@@ -6,7 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.30"
+__version__ = "0.0.31"
 
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader

@@ -111,7 +111,24 @@ class AzureDLFileSystem(object):
         return AzureDLFile(self, AzureDLPath(path), mode, blocksize=blocksize,
                            delimiter=delimiter)
 
-    def _ls(self, path, invalidate_cache=True):
+    def _ls_batched(self, path, batch_size=4000):
+        """Batched ListStatus calls. Internal method."""
+        if batch_size <= 1:
+            raise ValueError("Batch size must be strictly greater than 1")
+        parms = {'listSize': batch_size}
+        ret = []
+        data = [None]
+
+        while data:
+            data = self.azure.call('LISTSTATUS', path, **parms)['FileStatuses']['FileStatus']
+            ret.extend(data)
+            if len(data) < batch_size:
+                break
+            parms['listAfter'] = ret[-1]['pathSuffix']  # Last path to be used as listAfter
+
+        return ret
+
+    def _ls(self, path, invalidate_cache=True, batch_size=4000):
         """ List files at given path """
         path = AzureDLPath(path).trim()
         key = path.as_posix()
@@ -120,8 +137,7 @@ class AzureDLFileSystem(object):
             self.invalidate_cache(key)
 
         if key not in self.dirs:
-            out = self.azure.call('LISTSTATUS', key)
-            self.dirs[key] = out['FileStatuses']['FileStatus']
+            self.dirs[key] = self._ls_batched(key, batch_size=batch_size)
         for f in self.dirs[key]:
             f['name'] = (path / f['pathSuffix']).as_posix()
         return self.dirs[key]

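The continuation contract above is the whole trick: ask the service for at most listSize entries, and whenever a full page comes back, ask again starting after the last pathSuffix seen. Below is a minimal offline sketch of that loop, where fake_call and ENTRIES are hypothetical stand-ins for self.azure.call('LISTSTATUS', ...) and the service-side listing:

    # Sketch only: emulates LISTSTATUS paging so the loop can run without a service.
    ENTRIES = [{'pathSuffix': 'file%03d.txt' % i} for i in range(10)]

    def fake_call(listSize, listAfter=None):
        # Return up to listSize entries strictly after the name listAfter.
        names = [e['pathSuffix'] for e in ENTRIES]
        start = names.index(listAfter) + 1 if listAfter is not None else 0
        return {'FileStatuses': {'FileStatus': ENTRIES[start:start + listSize]}}

    def ls_batched(batch_size=4):
        if batch_size <= 1:
            raise ValueError("Batch size must be strictly greater than 1")
        parms = {'listSize': batch_size}
        ret, data = [], [None]          # [None] primes the first iteration
        while data:
            data = fake_call(**parms)['FileStatuses']['FileStatus']
            ret.extend(data)
            if len(data) < batch_size:
                break                   # short page: nothing left to fetch
            parms['listAfter'] = ret[-1]['pathSuffix']
        return ret

    assert len(ls_batched(batch_size=3)) == 10    # pages of 3 + 3 + 3 + 1
    assert len(ls_batched(batch_size=100)) == 10  # one round trip

Note the two termination cases: a page shorter than batch_size ends the loop early, and when the entry count is an exact multiple of batch_size the loop pays one extra round trip that returns an empty page.
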
@@ -230,7 +230,7 @@ class DatalakeRESTInterface:
         'DELETE': ('delete', set(), {'recursive'}),
         'GETCONTENTSUMMARY': ('get', set(), set()),
         'GETFILESTATUS': ('get', set(), set()),
-        'LISTSTATUS': ('get', set(), set()),
+        'LISTSTATUS': ('get', set(), {'listSize', 'listAfter'}),
         'MKDIRS': ('put', set(), set()),
         'OPEN': ('get', set(), {'offset', 'length', 'read', 'filesessionid'}),
         'RENAME': ('put', {'destination'}, {'destination'}),

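Each entry in this table pairs an operation with an (HTTP method, required parameters, allowed query parameters) tuple, so the one-line change above is what lets listSize and listAfter pass through to the REST call. A small hypothetical sketch of how such a table can gate keyword arguments (the real validation lives inside DatalakeRESTInterface.call; check_params is illustrative only):

    API = {
        'LISTSTATUS': ('get', set(), {'listSize', 'listAfter'}),
        'RENAME': ('put', {'destination'}, {'destination'}),
    }

    def check_params(op, **params):
        method, required, allowed = API[op]
        keys = set(params)
        if not required <= keys:
            raise ValueError("missing required params: %s" % sorted(required - keys))
        if not keys <= required | allowed:
            raise ValueError("unexpected params: %s" % sorted(keys - required - allowed))
        return method

    assert check_params('LISTSTATUS', listSize=4000, listAfter='a.txt') == 'get'
    # check_params('LISTSTATUS', recursive=True) would raise: not an allowed parameter
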
@@ -13,7 +13,7 @@ import pytest
 import datetime
 from azure.datalake.store import utils
 from azure.datalake.store.exceptions import PermissionError, FileNotFoundError
-from tests.testing import azure, second_azure, azure_teardown, my_vcr, posix, tmpfile, working_dir
+from tests.testing import azure, second_azure, azure_teardown, my_vcr, posix, tmpfile, working_dir, create_files
 test_dir = working_dir()
 
 a = posix(test_dir / 'a')

@@ -74,6 +74,26 @@ def test_ls_touch_invalidate_cache(azure, second_azure):
     assert set(d['name'] for d in L) == set([a, b])
     assert L == L_second
 
+@my_vcr.use_cassette
+def test_ls_batched(azure):
+
+    test_dir = working_dir() / 'abc'
+    azure.mkdir(test_dir)
+    with azure_teardown(azure):
+        test_size = 10
+        assert azure._ls(test_dir, batch_size=10) == []
+        create_files(azure, number_of_files=test_size, prefix='123', root_path=test_dir)
+        with pytest.raises(ValueError):
+            azure._ls(test_dir, batch_size=1)
+
+        assert len(azure._ls(test_dir, batch_size=9)) == test_size
+        assert len(azure._ls(test_dir, batch_size=10)) == test_size
+        assert len(azure._ls(test_dir, batch_size=11)) == test_size
+        assert len(azure._ls(test_dir, batch_size=2)) == test_size
+        assert len(azure._ls(test_dir, batch_size=100)) == test_size
+        assert len(azure._ls(test_dir)) == test_size
+
+
 @my_vcr.use_cassette
 def test_rm(azure):
     with azure_teardown(azure):

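The batch sizes straddle the directory size deliberately: 9 forces a short final page, 10 is the exact-multiple case, 11 and 100 return everything in a single page, and 2 exercises several continuations, while batch_size=1 must raise ValueError before any request is made.
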
|
@ -89,9 +89,27 @@ def second_azure():
|
|||
|
||||
# Clear filesystem cache to ensure we capture all requests from a test
|
||||
fs.invalidate_cache()
|
||||
|
||||
yield fs
|
||||
|
||||
|
||||
@contextmanager
|
||||
def create_files(azure, number_of_files, root_path = working_dir(), prefix=''):
|
||||
import itertools
|
||||
from string import ascii_lowercase
|
||||
|
||||
def generate_paths():
|
||||
def iter_all_strings():
|
||||
for size in itertools.count(1):
|
||||
for s in itertools.product(ascii_lowercase, repeat=size):
|
||||
yield "".join(s)
|
||||
|
||||
for s in itertools.islice(iter_all_strings(), number_of_files):
|
||||
s = AzureDLPath(prefix + s + ".txt")
|
||||
yield root_path / s
|
||||
for f in generate_paths():
|
||||
azure.touch(f)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def azure_teardown(fs):
|
||||
try:
|
||||
|
|
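For reference, the helper's inner generator walks 'a'..'z', then two-letter combinations, and so on, so every requested file gets a distinct name. A self-contained sketch of the same sequence:

    import itertools
    from string import ascii_lowercase

    def iter_all_strings():
        # 'a'..'z', then 'aa'..'zz', then three letters, without end
        for size in itertools.count(1):
            for s in itertools.product(ascii_lowercase, repeat=size):
                yield "".join(s)

    names = list(itertools.islice(iter_all_strings(), 30))
    assert names[:3] == ['a', 'b', 'c']
    assert names[25:28] == ['z', 'aa', 'ab']  # rolls over after 'z'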