Bug 1602540 - mozpack/files.py and test_files.py support Python3 r=mars

Differential Revision: https://phabricator.services.mozilla.com/D57004

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Ricky Stewart 2019-12-18 23:23:12 +00:00
Родитель 606961d7d0
Коммит 7c927ba04b
6 изменённых файлов: 176 добавлений и 151 удалений

Просмотреть файл

@ -95,17 +95,21 @@ class Dest(object):
def name(self):
return self.path
def read(self, length=-1):
def read(self, length=-1, mode='rb'):
if self.mode != 'r':
self.file = open(self.path, 'rb')
self.file = open(self.path, mode)
self.mode = 'r'
return self.file.read(length)
def write(self, data):
def write(self, data, mode='wb'):
if self.mode != 'w':
self.file = open(self.path, 'wb')
self.file = open(self.path, mode)
self.mode = 'w'
return self.file.write(data)
if 'b' in mode:
to_write = six.ensure_binary(data)
else:
to_write = six.ensure_text(data)
return self.file.write(to_write)
def exists(self):
return os.path.exists(self.path)
@ -226,7 +230,8 @@ class BaseFile(object):
break
# If the read content differs between origin and destination,
# write what was read up to now, and copy the remainder.
if dest_content != src_content:
if (six.ensure_binary(dest_content) !=
six.ensure_binary(src_content)):
dest.write(copy_content)
shutil.copyfileobj(src, dest)
break
@ -234,14 +239,14 @@ class BaseFile(object):
shutil.copystat(self.path, dest.path)
return True
def open(self):
def open(self, mode='rb'):
'''
Return a file-like object allowing to read() the content of the
associated file. This is meant to be overloaded in subclasses to return
a custom file-like object.
'''
assert self.path is not None
return open(self.path, 'rb')
return open(self.path, mode=mode)
def read(self):
raise NotImplementedError('BaseFile.read() not implemented. Bug 1170329.')
@ -575,7 +580,7 @@ class PreprocessedFile(BaseFile):
# dependencies from that file to our list.
if self.depfile and os.path.exists(self.depfile):
target = mozpath.normpath(dest.name)
with open(self.depfile, 'rb') as fileobj:
with open(self.depfile, 'rt') as fileobj:
for rule in makeutil.read_dep_makefile(fileobj):
if target in rule.targets():
pp_deps.update(rule.dependencies())
@ -616,19 +621,23 @@ class GeneratedFile(BaseFile):
def __init__(self, content):
self._content = content
self._mode = 'rb'
@property
def content(self):
ensure = (six.ensure_binary if 'b' in self._mode else six.ensure_text)
if inspect.isfunction(self._content):
self._content = self._content()
return self._content
self._content = ensure(self._content())
return ensure(self._content)
@content.setter
def content(self, content):
self._content = content
def open(self):
return BytesIO(self.content)
def open(self, mode='rb'):
self._mode = mode
return (BytesIO(self.content) if 'b' in self._mode
else six.StringIO(self.content))
def read(self):
return self.content
@ -666,11 +675,11 @@ class ExtractedTarFile(GeneratedFile):
assert isinstance(info, TarInfo)
assert isinstance(tar, TarFile)
GeneratedFile.__init__(self, tar.extractfile(info).read())
self._mode = self.normalize_mode(info.mode)
self._unix_mode = self.normalize_mode(info.mode)
@property
def mode(self):
return self._mode
return self._unix_mode
def read(self):
return self.content
@ -755,13 +764,18 @@ class MinifiedProperties(BaseFile):
assert isinstance(file, BaseFile)
self._file = file
def open(self):
def open(self, mode='r'):
'''
Return a file-like object allowing to read() the minified content of
the properties file.
'''
return BytesIO(b''.join(l for l in self._file.open().readlines()
if not l.startswith(b'#')))
content = ''.join(
l for l in [
six.ensure_text(s) for s in self._file.open(mode).readlines()
] if not l.startswith('#'))
if 'b' in mode:
return BytesIO(six.ensure_binary(content))
return six.StringIO(content)
class MinifiedJavaScript(BaseFile):
@ -774,19 +788,19 @@ class MinifiedJavaScript(BaseFile):
self._file = file
self._verify_command = verify_command
def open(self):
output = BytesIO()
minify = JavascriptMinify(self._file.open(), output, quote_chars="'\"`")
def open(self, mode='r'):
output = six.StringIO()
minify = JavascriptMinify(self._file.open('r'), output, quote_chars="'\"`")
minify.minify()
output.seek(0)
if not self._verify_command:
return output
input_source = self._file.open().read()
input_source = self._file.open('r').read()
output_source = output.getvalue()
with NamedTemporaryFile() as fh1, NamedTemporaryFile() as fh2:
with NamedTemporaryFile('w+') as fh1, NamedTemporaryFile('w+') as fh2:
fh1.write(input_source)
fh2.write(output_source)
fh1.flush()
@ -795,7 +809,8 @@ class MinifiedJavaScript(BaseFile):
try:
args = list(self._verify_command)
args.extend([fh1.name, fh2.name])
subprocess.check_output(args, stderr=subprocess.STDOUT)
subprocess.check_output(args, stderr=subprocess.STDOUT,
universal_newlines=True)
except subprocess.CalledProcessError as e:
errors.warn('JS minification verification failed for %s:' %
(getattr(self._file, 'path', '<unknown>')))
@ -1130,7 +1145,8 @@ class MercurialFile(BaseFile):
"""File class for holding data from Mercurial."""
def __init__(self, client, rev, path):
self._content = client.cat([path], rev=rev)
self._content = client.cat([six.ensure_binary(path)],
rev=six.ensure_binary(rev))
def read(self):
return self._content
@ -1169,16 +1185,18 @@ class MercurialRevisionFinder(BaseFinder):
self._client = hglib.open(path=repo, encoding=b'utf-8')
finally:
os.chdir(oldcwd)
self._rev = rev if rev is not None else b'.'
self._rev = rev if rev is not None else '.'
self._files = OrderedDict()
# Immediately populate the list of files in the repo since nearly every
# operation requires this list.
out = self._client.rawcommand([b'files', b'--rev', str(self._rev)])
out = self._client.rawcommand([
b'files', b'--rev', six.ensure_binary(self._rev),
])
for relpath in out.splitlines():
# Mercurial may use \ as path separator on Windows. So use
# normpath().
self._files[mozpath.normpath(relpath)] = None
self._files[six.ensure_text(mozpath.normpath(relpath))] = None
def _find(self, pattern):
if self._recognize_repo_paths:

Просмотреть файл

@ -278,9 +278,9 @@ class JarFileReader(object):
assert header['compression'] in [JAR_DEFLATED, JAR_STORED, JAR_BROTLI]
self._data = data
# Copy some local file header fields.
for name in ['filename', 'compressed_size',
'uncompressed_size', 'crc32']:
for name in ['compressed_size', 'uncompressed_size', 'crc32']:
setattr(self, name, header[name])
self.filename = six.ensure_text(header['filename'])
self.compressed = header['compression'] != JAR_STORED
self.compress = header['compression']
@ -413,9 +413,9 @@ class JarReader(object):
if (host == 0 and xattr & 0x10) or (host == 3 and
xattr & (0o040000 << 16)):
continue
entries[entry['filename']] = entry
entries[six.ensure_text(entry['filename'])] = entry
if entry['offset'] < preload:
self._last_preloaded = entry['filename']
self._last_preloaded = six.ensure_text(entry['filename'])
self._entries = entries
return entries
@ -563,7 +563,7 @@ class JarWriter(object):
header[name] = entry[name]
entry['offset'] = offset
offset += len(content) + header.size
if entry['filename'] == self._last_preloaded:
if six.ensure_text(entry['filename']) == self._last_preloaded:
preload_size = offset
headers[entry] = header
# Prepare end of central directory
@ -586,7 +586,10 @@ class JarWriter(object):
# Store local file entries followed by compressed data
for entry, content in six.itervalues(self._contents):
self._data.write(headers[entry].serialize())
self._data.write(content)
if isinstance(content, memoryview):
self._data.write(content.tobytes())
else:
self._data.write(content)
# On non optimized archives, store the central directory entries.
if not preload_size:
end['cdir_offset'] = offset
@ -615,7 +618,7 @@ class JarWriter(object):
JarFileReader instance. The latter two allow to avoid uncompressing
data to recompress it.
'''
name = mozpath.normsep(name)
name = mozpath.normsep(six.ensure_text(name))
if name in self._contents and not skip_duplicates:
raise JarWriterError("File %s already in JarWriter" % name)
@ -663,7 +666,7 @@ class JarWriter(object):
entry['crc32'] = deflater.crc32
entry['compressed_size'] = deflater.compressed_size
entry['uncompressed_size'] = deflater.uncompressed_size
entry['filename'] = name
entry['filename'] = six.ensure_binary(name)
self._contents[name] = entry, deflater.compressed_data
def preload(self, files):
@ -718,10 +721,10 @@ class Deflater(object):
'''
Append a buffer to the Deflater.
'''
self._data.write(data)
if isinstance(data, memoryview):
data = data.tobytes()
data = six.ensure_binary(data)
self._data.write(data)
if self.compress:
if self._deflater:

Просмотреть файл

@ -4,5 +4,6 @@ subsuite = mozbuild
[test_archive.py]
[test_chrome_flags.py]
[test_chrome_manifest.py]
[test_files.py]
[test_mozjar.py]
[test_path.py]

Просмотреть файл

@ -4,7 +4,6 @@ skip-if = python == 3
[test_copier.py]
[test_errors.py]
[test_files.py]
[test_manifests.py]
[test_packager.py]
[test_packager_formats.py]

Просмотреть файл

@ -147,20 +147,20 @@ class TestDest(TestWithTmpDir):
def test_dest(self):
dest = Dest(self.tmppath('dest'))
self.assertFalse(dest.exists())
dest.write('foo')
dest.write(b'foo')
self.assertTrue(dest.exists())
dest.write('foo')
self.assertEqual(dest.read(4), 'foof')
self.assertEqual(dest.read(), 'oo')
self.assertEqual(dest.read(), '')
dest.write('bar')
self.assertEqual(dest.read(4), 'bar')
dest.write(b'foo')
self.assertEqual(dest.read(4), b'foof')
self.assertEqual(dest.read(), b'oo')
self.assertEqual(dest.read(), b'')
dest.write(b'bar')
self.assertEqual(dest.read(4), b'bar')
dest.close()
self.assertEqual(dest.read(), 'bar')
dest.write('foo')
self.assertEqual(dest.read(), b'bar')
dest.write(b'foo')
dest.close()
dest.write('qux')
self.assertEqual(dest.read(), 'qux')
dest.write(b'qux')
self.assertEqual(dest.read(), b'qux')
rand = bytes(random.choice(b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')
@ -228,7 +228,7 @@ class TestFile(TestWithTmpDir):
Test whether File.open returns an appropriately reset file object.
'''
src = self.tmppath('src')
content = ''.join(samples)
content = b''.join(samples)
with open(src, 'wb') as tmp:
tmp.write(content)
@ -245,7 +245,7 @@ class TestFile(TestWithTmpDir):
dest = self.tmppath('dest')
with open(src, 'wb') as tmp:
tmp.write('test')
tmp.write(b'test')
# Initial copy
f = File(src)
@ -253,23 +253,23 @@ class TestFile(TestWithTmpDir):
# Ensure subsequent copies won't trigger writes
f.copy(DestNoWrite(dest))
self.assertEqual('test', open(dest, 'rb').read())
self.assertEqual(b'test', open(dest, 'rb').read())
# When the source file is newer, but with the same content, no copy
# should occur
time = os.path.getmtime(src) - 1
os.utime(dest, (time, time))
f.copy(DestNoWrite(dest))
self.assertEqual('test', open(dest, 'rb').read())
self.assertEqual(b'test', open(dest, 'rb').read())
# When the source file is older than the destination file, even with
# different content, no copy should occur.
with open(src, 'wb') as tmp:
tmp.write('fooo')
tmp.write(b'fooo')
time = os.path.getmtime(dest) - 1
os.utime(src, (time, time))
f.copy(DestNoWrite(dest))
self.assertEqual('test', open(dest, 'rb').read())
self.assertEqual(b'test', open(dest, 'rb').read())
# Double check that under conditions where a copy occurs, we would get
# an exception.
@ -279,7 +279,7 @@ class TestFile(TestWithTmpDir):
# skip_if_older=False is expected to force a copy in this situation.
f.copy(dest, skip_if_older=False)
self.assertEqual('fooo', open(dest, 'rb').read())
self.assertEqual(b'fooo', open(dest, 'rb').read())
class TestAbsoluteSymlinkFile(TestWithTmpDir):
@ -471,12 +471,12 @@ class TestPreprocessedFile(TestWithTmpDir):
dest = self.tmppath('dest')
with open(src, 'wb') as tmp:
tmp.write('#ifdef FOO\ntest\n#endif')
tmp.write(b'#ifdef FOO\ntest\n#endif')
f = PreprocessedFile(src, depfile_path=None, marker='#', defines={'FOO': True})
self.assertTrue(f.copy(dest))
self.assertEqual('test\n', open(dest, 'rb').read())
self.assertEqual(b'test\n', open(dest, 'rb').read())
def test_preprocess_file_no_write(self):
'''
@ -488,7 +488,7 @@ class TestPreprocessedFile(TestWithTmpDir):
depfile = self.tmppath('depfile')
with open(src, 'wb') as tmp:
tmp.write('#ifdef FOO\ntest\n#endif')
tmp.write(b'#ifdef FOO\ntest\n#endif')
# Initial copy
f = PreprocessedFile(src, depfile_path=depfile, marker='#', defines={'FOO': True})
@ -496,20 +496,20 @@ class TestPreprocessedFile(TestWithTmpDir):
# Ensure subsequent copies won't trigger writes
self.assertFalse(f.copy(DestNoWrite(dest)))
self.assertEqual('test\n', open(dest, 'rb').read())
self.assertEqual(b'test\n', open(dest, 'rb').read())
# When the source file is older than the destination file, even with
# different content, no copy should occur.
with open(src, 'wb') as tmp:
tmp.write('#ifdef FOO\nfooo\n#endif')
tmp.write(b'#ifdef FOO\nfooo\n#endif')
time = os.path.getmtime(dest) - 1
os.utime(src, (time, time))
self.assertFalse(f.copy(DestNoWrite(dest)))
self.assertEqual('test\n', open(dest, 'rb').read())
self.assertEqual(b'test\n', open(dest, 'rb').read())
# skip_if_older=False is expected to force a copy in this situation.
self.assertTrue(f.copy(dest, skip_if_older=False))
self.assertEqual('fooo\n', open(dest, 'rb').read())
self.assertEqual(b'fooo\n', open(dest, 'rb').read())
def test_preprocess_file_dependencies(self):
'''
@ -521,10 +521,10 @@ class TestPreprocessedFile(TestWithTmpDir):
deps = self.tmppath('src.pp')
with open(src, 'wb') as tmp:
tmp.write('#ifdef FOO\ntest\n#endif')
tmp.write(b'#ifdef FOO\ntest\n#endif')
with open(incl, 'wb') as tmp:
tmp.write('foo bar')
tmp.write(b'foo bar')
# Initial copy
f = PreprocessedFile(src, depfile_path=deps, marker='#', defines={'FOO': True})
@ -532,21 +532,21 @@ class TestPreprocessedFile(TestWithTmpDir):
# Update the source so it #includes the include file.
with open(src, 'wb') as tmp:
tmp.write('#include incl\n')
tmp.write(b'#include incl\n')
time = os.path.getmtime(dest) + 1
os.utime(src, (time, time))
self.assertTrue(f.copy(dest))
self.assertEqual('foo bar', open(dest, 'rb').read())
self.assertEqual(b'foo bar', open(dest, 'rb').read())
# If one of the dependencies changes, the file should be updated. The
# mtime of the dependency is set after the destination file, to avoid
# both files having the same time.
with open(incl, 'wb') as tmp:
tmp.write('quux')
tmp.write(b'quux')
time = os.path.getmtime(dest) + 1
os.utime(incl, (time, time))
self.assertTrue(f.copy(dest))
self.assertEqual('quux', open(dest, 'rb').read())
self.assertEqual(b'quux', open(dest, 'rb').read())
# Perform one final copy to confirm that we don't run the preprocessor
# again. We update the mtime of the destination so it's newer than the
@ -575,15 +575,15 @@ class TestPreprocessedFile(TestWithTmpDir):
self.assertTrue(os.path.islink(dest))
with open(pp_source, 'wb') as tmp:
tmp.write('#define FOO\nPREPROCESSED')
tmp.write(b'#define FOO\nPREPROCESSED')
f = PreprocessedFile(pp_source, depfile_path=deps, marker='#',
defines={'FOO': True})
self.assertTrue(f.copy(dest))
self.assertEqual('PREPROCESSED', open(dest, 'rb').read())
self.assertEqual(b'PREPROCESSED', open(dest, 'rb').read())
self.assertFalse(os.path.islink(dest))
self.assertEqual('', open(source, 'rb').read())
self.assertEqual(b'', open(source, 'rb').read())
class TestExistingFile(TestWithTmpDir):
@ -712,9 +712,10 @@ class TestDeflatedFile(TestWithTmpDir):
contents = {}
with JarWriter(src) as jar:
for content in samples:
name = b''.join(random.choice(
b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')
for i in range(8))
name = ''.join(
random.choice(
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
) for i in range(8))
jar.add(name, content, compress=True)
contents[name] = content
@ -811,7 +812,7 @@ class TestManifestFile(TestWithTmpDir):
)
f.copy(self.tmppath('chrome.manifest'))
content = open(self.tmppath('chrome.manifest')).read()
content = open(self.tmppath('chrome.manifest'), 'rb').read()
self.assertEqual(content[:42], f.open().read(42))
self.assertEqual(content, f.open().read())
@ -869,15 +870,15 @@ foo2_xpt = GeneratedFile(
class TestMinifiedProperties(TestWithTmpDir):
def test_minified_properties(self):
propLines = [
b'# Comments are removed',
b'foo = bar',
b'',
b'# Another comment',
'# Comments are removed',
'foo = bar',
'',
'# Another comment',
]
prop = GeneratedFile(b'\n'.join(propLines))
prop = GeneratedFile('\n'.join(propLines))
self.assertEqual(MinifiedProperties(prop).open().readlines(),
['foo = bar\n', '\n'])
open(self.tmppath('prop'), 'wb').write('\n'.join(propLines))
open(self.tmppath('prop'), 'w').write('\n'.join(propLines))
MinifiedProperties(File(self.tmppath('prop'))) \
.copy(self.tmppath('prop2'))
self.assertEqual(open(self.tmppath('prop2')).readlines(),
@ -886,15 +887,15 @@ class TestMinifiedProperties(TestWithTmpDir):
class TestMinifiedJavaScript(TestWithTmpDir):
orig_lines = [
b'// Comment line',
b'let foo = "bar";',
b'var bar = true;',
b'',
b'// Another comment',
'// Comment line',
'let foo = "bar";',
'var bar = true;',
'',
'// Another comment',
]
def test_minified_javascript(self):
orig_f = GeneratedFile(b'\n'.join(self.orig_lines))
orig_f = GeneratedFile('\n'.join(self.orig_lines))
min_f = MinifiedJavaScript(orig_f)
mini_lines = min_f.open().readlines()
@ -910,7 +911,7 @@ class TestMinifiedJavaScript(TestWithTmpDir):
]
def test_minified_verify_success(self):
orig_f = GeneratedFile(b'\n'.join(self.orig_lines))
orig_f = GeneratedFile('\n'.join(self.orig_lines))
min_f = MinifiedJavaScript(orig_f,
verify_command=self._verify_command('0'))
@ -919,7 +920,7 @@ class TestMinifiedJavaScript(TestWithTmpDir):
self.assertTrue(len(mini_lines) < len(self.orig_lines))
def test_minified_verify_failure(self):
orig_f = GeneratedFile(b'\n'.join(self.orig_lines))
orig_f = GeneratedFile('\n'.join(self.orig_lines))
errors.out = six.StringIO()
min_f = MinifiedJavaScript(orig_f,
verify_command=self._verify_command('1'))
@ -1020,7 +1021,7 @@ def do_check(test, finder, pattern, result):
class TestFileFinder(MatchTestTemplate, TestWithTmpDir):
def add(self, path):
ensureParentDir(self.tmppath(path))
open(self.tmppath(path), 'wb').write(path)
open(self.tmppath(path), 'wb').write(six.ensure_binary(path))
def do_check(self, pattern, result):
do_check(self, self.finder, pattern, result)
@ -1148,7 +1149,7 @@ class TestComposedFinder(MatchTestTemplate, TestWithTmpDir):
real_path = mozpath.join('a', path)
ensureParentDir(self.tmppath(real_path))
if not content:
content = path
content = six.ensure_binary(path)
open(self.tmppath(real_path), 'wb').write(content)
def do_check(self, pattern, result):
@ -1161,8 +1162,8 @@ class TestComposedFinder(MatchTestTemplate, TestWithTmpDir):
# Also add files in $tmp/a/foo/qux because ComposedFinder is
# expected to mask foo/qux entirely with content from $tmp/b.
ensureParentDir(self.tmppath('a/foo/qux/hoge'))
open(self.tmppath('a/foo/qux/hoge'), 'wb').write('hoge')
open(self.tmppath('a/foo/qux/bar'), 'wb').write('not the right content')
open(self.tmppath('a/foo/qux/hoge'), 'wb').write(b'hoge')
open(self.tmppath('a/foo/qux/bar'), 'wb').write(b'not the right content')
self.finder = ComposedFinder({
'': FileFinder(self.tmppath('a')),
'foo/qux': FileFinder(self.tmppath('b')),
@ -1174,6 +1175,8 @@ class TestComposedFinder(MatchTestTemplate, TestWithTmpDir):
@unittest.skipUnless(hglib, 'hglib not available')
@unittest.skipIf(six.PY3 and os.name == 'nt',
'Does not currently work in Python3 on Windows')
class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir):
def setUp(self):
super(TestMercurialRevisionFinder, self).setUp()
@ -1197,7 +1200,7 @@ class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir):
# b'' because py2 needs !unicode
b'ui.username="Dummy User <dummy@example.com>"',
)
client = hglib.open(self.tmpdir,
client = hglib.open(six.ensure_binary(self.tmpdir),
encoding=b'UTF-8', # b'' because py2 needs !unicode
configs=configs)
self._clients.append(client)
@ -1207,8 +1210,8 @@ class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir):
with self._client() as c:
ensureParentDir(self.tmppath(path))
with open(self.tmppath(path), 'wb') as fh:
fh.write(path)
c.add(self.tmppath(path))
fh.write(six.ensure_binary(path))
c.add(six.ensure_binary(self.tmppath(path)))
def do_check(self, pattern, result):
do_check(self, self.finder, pattern, result)
@ -1232,42 +1235,43 @@ class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir):
def test_old_revision(self):
with self._client() as c:
with open(self.tmppath('foo'), 'wb') as fh:
fh.write('foo initial')
c.add(self.tmppath('foo'))
fh.write(b'foo initial')
c.add(six.ensure_binary(self.tmppath('foo')))
c.commit('initial')
with open(self.tmppath('foo'), 'wb') as fh:
fh.write('foo second')
fh.write(b'foo second')
with open(self.tmppath('bar'), 'wb') as fh:
fh.write('bar second')
c.add(self.tmppath('bar'))
fh.write(b'bar second')
c.add(six.ensure_binary(self.tmppath('bar')))
c.commit('second')
# This wipes out the working directory, ensuring the finder isn't
# finding anything from the filesystem.
c.rawcommand(['update', 'null'])
c.rawcommand([b'update', b'null'])
finder = self._get_finder(self.tmpdir, 0)
finder = self._get_finder(self.tmpdir, '0')
f = finder.get('foo')
self.assertEqual(f.read(), 'foo initial')
self.assertEqual(f.read(), 'foo initial', 'read again for good measure')
self.assertEqual(f.read(), b'foo initial')
self.assertEqual(f.read(), b'foo initial',
'read again for good measure')
self.assertIsNone(finder.get('bar'))
finder = self._get_finder(self.tmpdir, rev=1)
finder = self._get_finder(self.tmpdir, rev='1')
f = finder.get('foo')
self.assertEqual(f.read(), 'foo second')
self.assertEqual(f.read(), b'foo second')
f = finder.get('bar')
self.assertEqual(f.read(), 'bar second')
self.assertEqual(f.read(), b'bar second')
f = None
def test_recognize_repo_paths(self):
with self._client() as c:
with open(self.tmppath('foo'), 'wb') as fh:
fh.write('initial')
c.add(self.tmppath('foo'))
fh.write(b'initial')
c.add(six.ensure_binary(self.tmppath('foo')))
c.commit('initial')
c.rawcommand(['update', 'null'])
c.rawcommand([b'update', b'null'])
finder = self._get_finder(self.tmpdir, 0,
finder = self._get_finder(self.tmpdir, '0',
recognize_repo_paths=True)
with self.assertRaises(NotImplementedError):
list(finder.find(''))
@ -1279,7 +1283,7 @@ class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir):
f = finder.get(self.tmppath('foo'))
self.assertIsInstance(f, MercurialFile)
self.assertEqual(f.read(), 'initial')
self.assertEqual(f.read(), b'initial')
f = None

Просмотреть файл

@ -151,23 +151,23 @@ class TestJar(unittest.TestCase):
files = [j for j in JarReader(fileobj=s)]
self.assertEqual(files[0].filename, b'foo')
self.assertEqual(files[0].filename, 'foo')
self.assertFalse(files[0].compressed)
self.assertEqual(files[0].read(), b'foo')
self.assertEqual(files[1].filename, b'bar')
self.assertEqual(files[1].filename, 'bar')
self.assertTrue(files[1].compressed)
self.assertEqual(files[1].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
self.assertEqual(files[2].filename, b'baz/qux')
self.assertEqual(files[2].filename, 'baz/qux')
self.assertFalse(files[2].compressed)
self.assertEqual(files[2].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
if os.sep == '\\':
self.assertEqual(files[3].filename, b'baz/backslash',
self.assertEqual(files[3].filename, 'baz/backslash',
'backslashes in filenames on Windows should get normalized')
else:
self.assertEqual(files[3].filename, b'baz\\backslash',
self.assertEqual(files[3].filename, 'baz\\backslash',
'backslashes in filenames on POSIX platform are untouched')
s = MockDest()
@ -179,47 +179,47 @@ class TestJar(unittest.TestCase):
jar = JarReader(fileobj=s)
files = [j for j in jar]
self.assertEqual(files[0].filename, b'bar')
self.assertEqual(files[0].filename, 'bar')
self.assertFalse(files[0].compressed)
self.assertEqual(files[0].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
self.assertEqual(files[1].filename, b'foo')
self.assertEqual(files[1].filename, 'foo')
self.assertFalse(files[1].compressed)
self.assertEqual(files[1].read(), b'foo')
self.assertEqual(files[2].filename, b'baz/qux')
self.assertEqual(files[2].filename, 'baz/qux')
self.assertTrue(files[2].compressed)
self.assertEqual(files[2].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
self.assertTrue(b'bar' in jar)
self.assertTrue(b'foo' in jar)
self.assertFalse(b'baz' in jar)
self.assertTrue(b'baz/qux' in jar)
self.assertTrue(jar[b'bar'], files[1])
self.assertTrue(jar[b'foo'], files[0])
self.assertTrue(jar[b'baz/qux'], files[2])
self.assertTrue('bar' in jar)
self.assertTrue('foo' in jar)
self.assertFalse('baz' in jar)
self.assertTrue('baz/qux' in jar)
self.assertTrue(jar['bar'], files[1])
self.assertTrue(jar['foo'], files[0])
self.assertTrue(jar['baz/qux'], files[2])
s.seek(0)
jar = JarReader(fileobj=s)
self.assertTrue(b'bar' in jar)
self.assertTrue(b'foo' in jar)
self.assertFalse(b'baz' in jar)
self.assertTrue(b'baz/qux' in jar)
self.assertTrue('bar' in jar)
self.assertTrue('foo' in jar)
self.assertFalse('baz' in jar)
self.assertTrue('baz/qux' in jar)
files[0].seek(0)
self.assertEqual(jar[b'bar'].filename, files[0].filename)
self.assertEqual(jar[b'bar'].compressed, files[0].compressed)
self.assertEqual(jar[b'bar'].read(), files[0].read())
self.assertEqual(jar['bar'].filename, files[0].filename)
self.assertEqual(jar['bar'].compressed, files[0].compressed)
self.assertEqual(jar['bar'].read(), files[0].read())
files[1].seek(0)
self.assertEqual(jar[b'foo'].filename, files[1].filename)
self.assertEqual(jar[b'foo'].compressed, files[1].compressed)
self.assertEqual(jar[b'foo'].read(), files[1].read())
self.assertEqual(jar['foo'].filename, files[1].filename)
self.assertEqual(jar['foo'].compressed, files[1].compressed)
self.assertEqual(jar['foo'].read(), files[1].read())
files[2].seek(0)
self.assertEqual(jar[b'baz/qux'].filename, files[2].filename)
self.assertEqual(jar[b'baz/qux'].compressed, files[2].compressed)
self.assertEqual(jar[b'baz/qux'].read(), files[2].read())
self.assertEqual(jar['baz/qux'].filename, files[2].filename)
self.assertEqual(jar['baz/qux'].compressed, files[2].compressed)
self.assertEqual(jar['baz/qux'].read(), files[2].read())
def test_rejar(self):
s = MockDest()
@ -236,15 +236,15 @@ class TestJar(unittest.TestCase):
jar = JarReader(fileobj=new)
files = [j for j in jar]
self.assertEqual(files[0].filename, b'foo')
self.assertEqual(files[0].filename, 'foo')
self.assertFalse(files[0].compressed)
self.assertEqual(files[0].read(), b'foo')
self.assertEqual(files[1].filename, b'bar')
self.assertEqual(files[1].filename, 'bar')
self.assertTrue(files[1].compressed)
self.assertEqual(files[1].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
self.assertEqual(files[2].filename, b'baz/qux')
self.assertEqual(files[2].filename, 'baz/qux')
self.assertTrue(files[2].compressed)
self.assertEqual(files[2].read(), b'aaaaaaaaaaaaanopqrstuvwxyz')
@ -258,7 +258,7 @@ class TestJar(unittest.TestCase):
jar = JarReader(fileobj=s)
files = [j for j in jar]
self.assertEqual(files[0].filename, b'test_data')
self.assertEqual(files[0].filename, 'test_data')
self.assertFalse(files[0].compressed)
self.assertEqual(files[0].read(), b'test_data')
@ -281,12 +281,12 @@ class TestPreload(unittest.TestCase):
jar.preload(['baz/qux', 'bar'])
jar = JarReader(fileobj=s)
self.assertEqual(jar.last_preloaded, b'bar')
self.assertEqual(jar.last_preloaded, 'bar')
files = [j for j in jar]
self.assertEqual(files[0].filename, b'baz/qux')
self.assertEqual(files[1].filename, b'bar')
self.assertEqual(files[2].filename, b'foo')
self.assertEqual(files[0].filename, 'baz/qux')
self.assertEqual(files[1].filename, 'bar')
self.assertEqual(files[2].filename, 'foo')
class TestJarLog(unittest.TestCase):