Remove backtracking.
This is a fix for https://bugzilla.mozilla.org/show_bug.cgi?id=1248845. Chunking was introduced to reduce the memory pressure. Since then some configuration changes have landed that deal with the issue in a more general way so that chunking should be no longer required.
This commit is contained in:
Родитель
d88f18dd98
Коммит
e019888629
2
setup.py
2
setup.py
|
@ -2,7 +2,7 @@ from distutils.core import setup
|
||||||
|
|
||||||
setup(name='telemetry-tools',
|
setup(name='telemetry-tools',
|
||||||
description='Utility code to work with Mozilla Telemetry data.',
|
description='Utility code to work with Mozilla Telemetry data.',
|
||||||
version='1.0.9',
|
version='1.1.0',
|
||||||
author='Mozilla',
|
author='Mozilla',
|
||||||
url='https://github.com/mozilla/telemetry-tools',
|
url='https://github.com/mozilla/telemetry-tools',
|
||||||
packages=['telemetry', 'telemetry.util'],
|
packages=['telemetry', 'telemetry.util'],
|
||||||
|
|
|
@ -18,43 +18,6 @@ from google.protobuf.message import DecodeError
|
||||||
_record_separator = 0x1e
|
_record_separator = 0x1e
|
||||||
|
|
||||||
|
|
||||||
class BacktrackableFile:
|
|
||||||
def __init__(self, stream):
|
|
||||||
self._stream = stream
|
|
||||||
self._buffer = StringIO()
|
|
||||||
|
|
||||||
def read(self, size):
|
|
||||||
buffer_data = self._buffer.read(size)
|
|
||||||
to_read = size - len(buffer_data)
|
|
||||||
|
|
||||||
if to_read == 0:
|
|
||||||
return buffer_data
|
|
||||||
|
|
||||||
stream_data = self._stream.read(to_read)
|
|
||||||
self._buffer.write(stream_data)
|
|
||||||
|
|
||||||
return buffer_data + stream_data
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self._buffer.close()
|
|
||||||
if type(self._stream) == boto.s3.key.Key:
|
|
||||||
if self._stream.resp: # Hack! Connections are kept around otherwise!
|
|
||||||
self._stream.resp.close()
|
|
||||||
|
|
||||||
self._stream.close(True)
|
|
||||||
else:
|
|
||||||
self._stream.close()
|
|
||||||
|
|
||||||
def backtrack(self):
|
|
||||||
buffer = self._buffer.getvalue()
|
|
||||||
index = buffer.find(chr(_record_separator), 1)
|
|
||||||
|
|
||||||
self._buffer = StringIO()
|
|
||||||
if index >= 0:
|
|
||||||
self._buffer.write(buffer[index:])
|
|
||||||
self._buffer.seek(0)
|
|
||||||
|
|
||||||
|
|
||||||
class UnpackedRecord():
|
class UnpackedRecord():
|
||||||
def __init__(self, raw, header, message=None, error=None):
|
def __init__(self, raw, header, message=None, error=None):
|
||||||
self.raw = raw
|
self.raw = raw
|
||||||
|
@ -168,7 +131,7 @@ def unpack_string(string, **kwargs):
|
||||||
return unpack(StringIO(string), **kwargs)
|
return unpack(StringIO(string), **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_snappy=True):
|
def unpack(fin, raw=False, verbose=False, strict=False, try_snappy=True):
|
||||||
record_count = 0
|
record_count = 0
|
||||||
bad_records = 0
|
bad_records = 0
|
||||||
total_bytes = 0
|
total_bytes = 0
|
||||||
|
@ -184,10 +147,6 @@ def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_sna
|
||||||
elif verbose:
|
elif verbose:
|
||||||
print e
|
print e
|
||||||
|
|
||||||
if backtrack and type(e) == DecodeError:
|
|
||||||
fin.backtrack()
|
|
||||||
continue
|
|
||||||
|
|
||||||
if r is None:
|
if r is None:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
|
@ -52,28 +52,6 @@ class TestHekaMessage(unittest.TestCase):
|
||||||
threw = True
|
threw = True
|
||||||
self.assertEquals(expected_exceptions[t], threw)
|
self.assertEquals(expected_exceptions[t], threw)
|
||||||
|
|
||||||
def test_backtracking_with_initial_separator(self):
|
|
||||||
# Test backtracking when the separator appears at the first character
|
|
||||||
w = hm.BacktrackableFile(StringIO("\x1eFOOBAR"))
|
|
||||||
self.assertEquals("\x1eFOOB", w.read(5))
|
|
||||||
w.backtrack()
|
|
||||||
self.assertEquals("AR", w.read(5))
|
|
||||||
|
|
||||||
def test_backtracking_with_mid_separator(self):
|
|
||||||
# Test backtracking when separator was read
|
|
||||||
w = hm.BacktrackableFile(StringIO("FOOBAR\x1eFOOBAR"))
|
|
||||||
self.assertEquals("FOOBAR\x1eFOO", w.read(10))
|
|
||||||
w.backtrack()
|
|
||||||
self.assertEquals("\x1eFOOBAR", w.read(10))
|
|
||||||
|
|
||||||
def test_backtracking_without_separator(self):
|
|
||||||
# Test backtracking when separator wasn't read
|
|
||||||
w = hm.BacktrackableFile(StringIO("FOOBAR\x1eFOOBAR"))
|
|
||||||
self.assertEquals("FOOBA", w.read(5))
|
|
||||||
w.backtrack()
|
|
||||||
self.assertEquals("R\x1eFOO", w.read(5))
|
|
||||||
self.assertEquals("BAR", w.read(5))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Загрузка…
Ссылка в новой задаче