Bug 1447851 - Add content decoding for landfill
Documents from landfill will be decoded directly into their string representation. The logic for _parse_heka_record is generally unnecessary because fields are not extracted when dumped to landfill.
This commit is contained in:
Родитель
238d9cbb71
Коммит
3e48c04a51
|
@ -41,29 +41,17 @@ def _parse_heka_record(record):
|
|||
# Further special case: the content field (bytes) in landfill
|
||||
# messages is an unprocessed form of the data, usually the original
|
||||
# gzipped payload from the client.
|
||||
# (a) In this case we pass the byte string along as-is.
|
||||
elif field.name == 'content':
|
||||
payload = {"content": field.value_bytes[0]}
|
||||
break
|
||||
# (b) In this case we attempt to decompress it, and if that fails,
|
||||
#
|
||||
# We attempt to decompress it, and if that fails,
|
||||
# attempt to decode it as a UTF-8 string.
|
||||
elif field.name == 'content':
|
||||
try:
|
||||
string = zlib.decompress(field.value_bytes[0], 16+zlib.MAX_WBITS)
|
||||
except: # noqa
|
||||
except Exception as e: # noqa
|
||||
raise e
|
||||
string = field.value_bytes[0].decode('utf-8')
|
||||
payload = {"content": string}
|
||||
break
|
||||
# (c) In this case we attempt to decompress it, and if that fails,
|
||||
# attempt to decode it as a UTF-8 string, and if either of those
|
||||
# succeeds, parse it as the payload.
|
||||
elif field.name == 'content':
|
||||
try:
|
||||
string = zlib.decompress(field.value_bytes[0], 16+zlib.MAX_WBITS)
|
||||
except: # noqa
|
||||
string = field.value_bytes[0].decode('utf-8')
|
||||
payload = _parse_json(string)
|
||||
break
|
||||
|
||||
if payload is None:
|
||||
payload = {}
|
||||
|
|
|
@ -2,13 +2,24 @@
|
|||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
import copy
|
||||
import gzip
|
||||
import io
|
||||
import json
|
||||
import pytest
|
||||
import ujson
|
||||
from collections import namedtuple
|
||||
|
||||
import pytest
|
||||
from google.protobuf.message import DecodeError
|
||||
|
||||
from moztelemetry.heka import message_parser
|
||||
from moztelemetry.util.streaming_gzip import streaming_gzip_wrapper
|
||||
|
||||
Message = namedtuple("Message",
|
||||
["timestamp", "type", "hostname", "payload", "fields"])
|
||||
Field = namedtuple("Field",
|
||||
["name", "value_string", "value_type", "value_bytes"])
|
||||
Record = namedtuple("Record", ["message"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("heka_format,try_snappy,strict,expected_count,expected_exception", [
|
||||
# snappy disabled
|
||||
|
@ -97,49 +108,81 @@ def test_json_fallback():
|
|||
|
||||
|
||||
def test_json_keys():
|
||||
class Message():
|
||||
pass
|
||||
|
||||
class Field():
|
||||
pass
|
||||
|
||||
class Record():
|
||||
def __init__(self):
|
||||
self.message = Message()
|
||||
|
||||
record = Record()
|
||||
record.message.timestamp = 1
|
||||
record.message.type = "t"
|
||||
record.message.hostname = "h"
|
||||
record.message.payload = '{"a": 1}'
|
||||
|
||||
f1 = Field()
|
||||
f1.name = "f1.test"
|
||||
f1.value_string = ['{"b": "bee"}']
|
||||
f1.value_type = 0
|
||||
|
||||
# nested field
|
||||
f2 = Field()
|
||||
f2.name = "f2.nested.test"
|
||||
f2.value_string = ['{"c": "cat"}']
|
||||
f2.value_type = 0
|
||||
|
||||
record.message.fields = [f1, f2]
|
||||
|
||||
parsed = message_parser._parse_heka_record(record)
|
||||
message = Message(
|
||||
timestamp=1,
|
||||
type="t",
|
||||
hostname="h",
|
||||
payload='{"a": 1}',
|
||||
fields=[
|
||||
Field(
|
||||
name="f1.test",
|
||||
value_string=['{"b": "bee"}'],
|
||||
value_bytes=None,
|
||||
value_type=0,
|
||||
),
|
||||
# nested field
|
||||
Field(
|
||||
name="f2.nested.test",
|
||||
value_string=['{"c": "cat"}'],
|
||||
value_bytes=None,
|
||||
value_type=0
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
expected = {
|
||||
"a": 1,
|
||||
"f1": {"test": {"b": "bee"}},
|
||||
"f2": {"nested": {"test": {"c": "cat"}}},
|
||||
"meta": {
|
||||
"Timestamp": 1,
|
||||
"Type": "t",
|
||||
"Hostname": "h",
|
||||
}
|
||||
}
|
||||
expected["meta"] = {
|
||||
"Timestamp": 1,
|
||||
"Type": "t",
|
||||
"Hostname": "h",
|
||||
}
|
||||
|
||||
parsed = message_parser._parse_heka_record(Record(message))
|
||||
|
||||
serialized = json.dumps(parsed)
|
||||
e_serialized = json.dumps(expected)
|
||||
|
||||
assert serialized == e_serialized
|
||||
|
||||
|
||||
def test_landfill_message():
|
||||
# Landfill messages are tagged with meta-information and the contents are
|
||||
# directly gzipped into a content field.
|
||||
|
||||
def compress(string):
|
||||
out = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=out, mode="wb") as f:
|
||||
f.write(string)
|
||||
return out.getvalue()
|
||||
|
||||
message = Message(
|
||||
timestamp=1,
|
||||
type="t",
|
||||
hostname="h",
|
||||
payload=None,
|
||||
fields=[
|
||||
Field(
|
||||
name="content",
|
||||
value_string=None,
|
||||
value_bytes=[compress('{"b": "bee"}')],
|
||||
value_type=1
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
expected = {
|
||||
"meta": {
|
||||
"Timestamp": 1,
|
||||
"Type": "t",
|
||||
"Hostname": "h",
|
||||
},
|
||||
"content": '{"b": "bee"}',
|
||||
}
|
||||
|
||||
parsed = message_parser._parse_heka_record(Record(message))
|
||||
|
||||
assert json.dumps(parsed) == json.dumps(expected)
|
||||
|
|
Загрузка…
Ссылка в новой задаче