bug 439050, add MozZipFile.py with tests, r=ted, dtownsend

This commit is contained in:
Axel Hecht 2008-09-19 18:19:52 +02:00
Родитель 3f2d4e1947
Коммит 821e61ed10
2 изменённых файлов: 384 добавлений и 0 удалений

150
config/MozZipFile.py Normal file
Просмотреть файл

@ -0,0 +1,150 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Mozilla build system.
#
# The Initial Developer of the Original Code is
# Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2007
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Axel Hecht <axel@pike.org>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import zipfile
import time
import binascii, struct
import zlib
class ZipFile(zipfile.ZipFile):
    """zipfile.ZipFile variant whose writestr() may replace existing entries.

    Only writestr() gets this treatment; write() is inherited unchanged.
    A replaced entry is either overwritten in place (when that is cheap) or
    remembered in ``_remove`` so that close() can squeeze its bytes out of
    the archive.
    """

    def __init__(self, file, mode="r", compression=zipfile.ZIP_STORED):
        """Open the archive; all arguments are passed to zipfile.ZipFile."""
        zipfile.ZipFile.__init__(self, file, mode, compression)
        # superseded entries whose bytes close() still has to drop
        self._remove = []
        # file offset at which the next appended entry starts
        self.end = self.fp.tell()
        self.debug = 0

    def _find_last(self, filename):
        """Return the index of the last filelist entry named filename, or -1.

        The last one is wanted because that is the live entry after
        multiple overwrites.
        """
        for idx in range(len(self.filelist) - 1, -1, -1):
            if self.filelist[idx].filename == filename:
                return idx
        return -1

    def _sync_start_dir(self):
        """Point the base class' ``start_dir`` at the current file position.

        NOTE(review): newer zipfile implementations appear to write members
        and the central directory at ``self.start_dir`` instead of at the
        current file position, while older ones only read that attribute
        when opening an archive -- confirm against the Python versions this
        build has to support. The attribute is left alone if it is absent.
        """
        if hasattr(self, 'start_dir'):
            self.start_dir = self.fp.tell()

    def writestr(self, zinfo_or_arcname, bytes):
        """Write contents into the archive, replacing an entry of that name.

        'bytes' is the content; 'zinfo_or_arcname' is either a ZipInfo
        instance or the name of the file in the archive.

        Replacement strategy:
        * the old entry is the last one in the file: truncate and rewrite;
        * we store uncompressed and the new content has the size of the old
          entry's data: overwrite the old entry in place;
        * otherwise: append the new entry and let close() drop the old one.

        Raises whatever the base class raises for an archive that cannot be
        written to; in that case no bookkeeping has been modified.
        """
        if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zinfo_or_arcname
        else:
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            # Add some standard UNIX file access permissions (-rw-r--r--).
            zinfo.external_attr = (0x81a4 & 0xFFFF) << 16

        seek_back = False  # do we have to return to the end of the file?
        idx = -1
        if zinfo.filename in self.NameToInfo:
            idx = self._find_last(zinfo.filename)
        if idx >= 0:
            # Validate *before* touching any bookkeeping: a refused write
            # must not leave an entry queued for removal, or close() would
            # rewrite an archive we were never allowed to modify.
            self._writecheck(zinfo)
            old = self.filelist.pop(idx)
            is_last = idx == len(self.filelist)
            same_size = (zinfo.compress_type == zipfile.ZIP_STORED
                         and old.compress_size == len(bytes))
            if is_last or same_size:
                # overwrite the existing entry where it sits
                self.fp.seek(old.header_offset)
                self._sync_start_dir()
                if is_last:
                    # this is the last item in the file, just truncate
                    self.fp.truncate()
                else:
                    # we need to move to the end of the file afterwards
                    seek_back = True
                # fully unhook the old entry, the base class adds a new one
                self.NameToInfo.pop(zinfo.filename)
            else:
                # Couldn't optimize, remember the old entry for removal
                self._remove.append(old)
        zipfile.ZipFile.writestr(self, zinfo, bytes)
        self.filelist.sort(key=lambda zi: zi.header_offset)
        if seek_back:
            self.fp.seek(self.end)
            self._sync_start_dir()
        self.end = self.fp.tell()

    def close(self):
        """Close the file, and for mode "w" and "a" write the ending records.

        Overridden to compact the archive first: every entry still listed
        is moved towards the start of the file over the space of entries
        that writestr() queued for removal. Calling close() again on an
        already closed archive is a no-op, as in the base class.
        """
        if self.fp is None or not self._remove:
            # nothing special to do, let's just call base
            return zipfile.ZipFile.close(self)

        if self.fp.mode != 'r+b':
            # adjust file mode if we originally just wrote, now we rewrite
            self.fp.close()
            self.fp = open(self.filename, 'r+b')
        entries = [(zi, True) for zi in self.filelist]
        entries.extend([(zi, False) for zi in self._remove])
        entries.sort(key=lambda pair: pair[0].header_offset)
        # an entry spans from its own header to the next entry's header,
        # the last one up to the end of the entry data
        lengths = [entries[i + 1][0].header_offset
                   - entries[i][0].header_offset
                   for i in range(len(entries) - 1)]
        lengths.append(self.end - entries[-1][0].header_offset)
        to_pos = 0
        for (zi, keep), length in zip(entries, lengths):
            if not keep:
                continue
            oldoff = zi.header_offset
            # python <= 2.4 has file_offset
            if hasattr(zi, 'file_offset'):
                zi.file_offset = zi.file_offset + to_pos - oldoff
            zi.header_offset = to_pos
            self.fp.seek(oldoff)
            content = self.fp.read(length)
            self.fp.seek(to_pos)
            self.fp.write(content)
            to_pos += length
        self.fp.truncate()
        self._sync_start_dir()
        # everything queued has been dropped; a repeated close() must not
        # try to compact again
        self._remove = []
        zipfile.ZipFile.close(self)

Просмотреть файл

@ -0,0 +1,234 @@
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Mozilla build system.
#
# The Initial Developer of the Original Code is
# Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2007
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Axel Hecht <axel@pike.org>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import unittest
import shutil
import os
import re
import sys
import random
import copy
from string import letters
'''
Test case infrastructure for MozZipFile.
This isn't really a unit test, but a test case generator and runner.
For a given set of files, lengths, and number of writes, we create
a testcase for every combination of the three. There are some
symmetries used to reduce the number of test cases, the first file
written is always the first file, the second is either the first or
the second, the third is one of the first three. That is, if we
had 4 files, but only three writes, the fourth file would never even
get tried.
The content written to the jars is pseudorandom with a fixed seed.
'''
# __file__ may be missing altogether (not merely empty) depending on how the
# script is started; looking it up through globals() avoids the NameError a
# bare reference would raise in that case. Fall back to the script path.
__file__ = globals().get('__file__') or sys.argv[0]
# make modules in the directory above this file importable
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from MozZipFile import ZipFile
import zipfile
# archive member names the generated tests write to
leafs = (
    'firstdir/oneleaf',
    'seconddir/twoleaf',
    'thirddir/with/sub/threeleaf',
)
# entry sizes in bytes, given as multiples of 64
_lengths = [16 * 64, 64 * 64, 80 * 64]
# how many of the sizes above the generated tests iterate over
lengths = 3
# maximum number of writes within one generated test
writes = 5
def givenlength(i):
    """Map the length index i to the entry size stored in _lengths.

    Going through the _lengths table allows manual tuning of which sizes
    of zip entries the tests use.
    """
    size = _lengths[i]
    return size
def prod(*iterables):
    """Tensor product of the given iterables.

    Generator over all possible combinations: each yielded list holds one
    item of every iterable, in argument order. All iterables but the first
    are walked once per item of their predecessors, so they have to
    support repeated iteration.
    """
    head = iterables[0]
    tail = iterables[1:]
    if not tail:
        for item in head:
            yield [item]
        return
    for item in head:
        for others in prod(*tail):
            yield [item] + others
def getid(descs):
    """Convert a sequence of int pairs to a string of their digits.

    Every element of descs has to hold exactly two ints; they are appended
    in order, so [(0, 1), (2, 0)] becomes '0120'. An empty sequence yields
    the empty string.
    """
    # join builds the result in one pass instead of growing a string
    # pair by pair
    return ''.join(['%d%d' % tuple(pair) for pair in descs])
def getContent(length):
    """Return a string of 'length' pseudo random letters.

    The characters are drawn one by one with random.choice, so seeding the
    random module beforehand makes the content reproducible.
    """
    return ''.join([random.choice(letters) for _ in range(length)])
def createWriter(sizer, *items):
    """Create a test helper method doing one set of writes, one per item.

    Each item is a dict of keyword arguments for the test case's _write
    method. Its 'length' value (0 if absent) is passed through sizer to get
    the actual length. The given dicts are copied and never modified.

    The returned function expects a test case instance as self, opens
    self.f for appending if the file exists and for writing otherwise,
    performs all writes and closes the archive.
    """
    locitems = copy.deepcopy(items)
    for item in locitems:
        item['length'] = sizer(item.pop('length', 0))

    def helper(self):
        mode = 'w'
        if os.path.isfile(self.f):
            mode = 'a'
        zf = ZipFile(self.f, mode, self.compression)
        try:
            for item in locitems:
                self._write(zf, **item)
        finally:
            # close explicitly rather than relying on the object being
            # finalized once the last reference is dropped; the archive is
            # read back right after this helper returns
            zf.close()
    return helper
def createTester(name, *writes):
    """Create a test method that runs a list of write helper methods.

    'writes' are the names of methods to look up on the test case; they are
    called in order, then the resulting archive is checked with _verifyZip.
    """
    steps = copy.copy(writes)

    def tester(self):
        for step in steps:
            write_method = getattr(self, step)
            write_method()
        self._verifyZip()

    # unit tests get confused if the method name isn't test...
    tester.__name__ = name
    return tester
class TestExtensiveStored(unittest.TestCase):
    """Unit tests for MozZipFile.

    The test cases are actually populated by code following the class
    definition.
    """

    # directory all files of a test run are created in
    stage = "mozzipfilestage"
    compression = zipfile.ZIP_STORED

    def leaf(self, *leafs):
        """Return the path made of the given segments below the stage dir."""
        return os.path.join(self.stage, *leafs)

    def setUp(self):
        """Start from an empty stage directory and empty reference data."""
        if os.path.exists(self.stage):
            shutil.rmtree(self.stage)
        os.mkdir(self.stage)
        self.f = self.leaf('test.jar')
        # expected archive content, entry name -> content
        self.ref = {}
        # seed used by the next _write call that is not given one
        self.seed = 0

    def tearDown(self):
        self.f = None
        self.ref = None

    def _verifyZip(self):
        """Check that the archive self.f holds exactly what self.ref says."""
        zf = zipfile.ZipFile(self.f)
        try:
            badEntry = zf.testzip()
            self.assertFalse(badEntry, badEntry)
            self.assertEqual(sorted(zf.namelist()), sorted(self.ref.keys()))
            for leaf, content in self.ref.items():
                self.assertEqual(content, zf.read(leaf))
        finally:
            # don't keep the archive open, later writes reopen the file
            zf.close()

    def _write(self, zf, seed=None, leaf=0, length=0):
        """Write pseudo random content of the given length to one entry.

        'leaf' is an index into the module level leafs; 'seed' seeds the
        random module and defaults to a per test counter. The content is
        recorded in self.ref, written to the archive zf, and also stored
        as a plain file below the stage directory.
        """
        if seed is None:
            seed = self.seed
            self.seed += 1
        random.seed(seed)
        leaf = leafs[leaf]
        content = getContent(length)
        self.ref[leaf] = content
        zf.writestr(leaf, content)
        target = self.leaf('stage', leaf)
        parent = os.path.dirname(target)
        if not os.path.isdir(parent):
            os.makedirs(parent)
        out = open(target, 'w')
        try:
            out.write(content)
        finally:
            out.close()
# every leaf combined with every length
atomics = list(prod(range(len(leafs)), range(lengths)))

# populate TestExtensiveStored with test cases
for w in range(writes):
    # Don't iterate over all files for the first n passes,
    # those are redundant as long as w < lengths.
    # There are symmetries in the trailing end, too, but I don't know
    # how to reduce those out right now.
    nonatomics = []
    for i in range(1, w + 1):
        nonatomics.append(
            list(prod(range(min(i, len(leafs))), range(lengths))))
    nonatomics.append(atomics)
    for descs in prod(*nonatomics):
        suffix = getid(descs)
        dicts = []
        for leaf, length in descs:
            dicts.append({'leaf': leaf, 'length': length})
        setattr(TestExtensiveStored, '_write' + suffix,
                createWriter(givenlength, *dicts))
        setattr(TestExtensiveStored, 'test' + suffix,
                createTester('test' + suffix, '_write' + suffix))

# Now create another round of tests, with two writing passes:
# first, write all file combinations into the jar, close it,
# and then write all atomics again.
# This should catch more or less all artifacts generated
# by the final ordering step when closing the jar.
files = []
for i in range(len(leafs)):
    files.append(list(prod([i], range(lengths))))
allfiles = []
for i in range(len(leafs)):
    allfiles.extend(prod(*files[:i + 1]))

for first in allfiles:
    testbasename = 'test%s_' % getid(first)
    # name of the test, name of the first pass, name of the second pass
    test = [None, '_write' + getid(first), None]
    for second in atomics:
        test[0] = testbasename + getid([second])
        test[2] = '_write' + getid([second])
        setattr(TestExtensiveStored, test[0], createTester(*test))
class TestExtensiveDeflated(TestExtensiveStored):
    """Run everything tested with ZIP_STORED once more with ZIP_DEFLATED."""

    compression = zipfile.ZIP_DEFLATED
# Hand over to the unittest command line runner when executed as a script.
if __name__ == '__main__':
    unittest.main()