Treat content field as a string

This commit is contained in:
Mark Reid 2019-03-25 10:47:07 -03:00 коммит произвёл Anthony Miyaguchi
Родитель 84c500750f
Коммит 25fa836b26
2 изменённых файлов: 49 добавлений и 13 удалений

Просмотреть файл

@ -15,6 +15,7 @@ v3 - Retain whitelisted metadata fields and simplify schema
"""
import click
import json
from moztelemetry.dataset import Dataset
from pyspark.sql import Window, SparkSession
from pyspark.sql.functions import col, row_number
@ -42,6 +43,9 @@ GENERIC_DOC_VER = 2
GENERIC_DOC_ID = 3
UNPARSEABLE_TELEMETRY_VERSION = "0"
def extract(sc, submission_date, sample=0.01):
landfill = (
Dataset.from_source("landfill")
@ -51,10 +55,21 @@ def extract(sc, submission_date, sample=0.01):
return landfill
# Detect document version from the payload itself.
# Should match with the logic here:
# https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/decoders/moz_ingest/telemetry.lua#L162
def _detect_telemetry_version(content):
def _detect_telemetry_version(content_string):
"""Detect document version from the payload itself.
Should match with the logic here:
https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/decoders/moz_ingest/telemetry.lua#L162
If the given content string is not parseable as JSON,
default to a version of "0".
"""
if content_string is None:
return UNPARSEABLE_TELEMETRY_VERSION
try:
content = json.loads(content_string)
except ValueError:
return UNPARSEABLE_TELEMETRY_VERSION
if "ver" in content:
return str(content["ver"])
if "version" in content:

Просмотреть файл

@ -1,5 +1,6 @@
from copy import deepcopy
import json
import pytest
from pyspark.sql import functions as F
@ -9,6 +10,9 @@ from mozetl.landfill import sampler
@pytest.fixture
def sample_document():
return {
# The "content" field is provided here as a dict for
# convenience, but should be serialized to a JSON
# string before passing through to the sampler code.
"content": {"payload": {"foo": "bar"}},
"meta": {
u"Content-Length": u"7094",
@ -47,11 +51,28 @@ def build_generic_uri(namespace, doc_type, doc_version, doc_id):
return "/submit/{}/{}/{}/{}".format(namespace, doc_type, doc_version, doc_id)
def serialize_content(unserialized):
"""Encode the content field as JSON"""
doc = deepcopy(unserialized)
doc["content"] = json.dumps(doc["content"])
return doc
def test_detect_version_bad_json():
v = sampler._detect_telemetry_version("bla")
assert v == "0"
def test_detect_version_good_json():
v = sampler._detect_telemetry_version("{}")
assert v == "1"
def test_process_namespace_telemetry(sample_document):
# /submit/<namespace>/<doc_id>/<doc_type>/<app_name>/<app_version>/<app_channel>/<app_build_id>
uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
sample_document["meta"]["uri"] = uri
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "1", "doc-id")
@ -59,7 +80,7 @@ def test_process_namespace_telemetry_deviceinfo(sample_document):
uri = "/submit/telemetry/doc-id/appusage/Firefox/61.0.2/release/20180807170231"
sample_document["meta"]["uri"] = uri
sample_document["content"]["deviceinfo"] = "foo"
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "appusage", "3", "doc-id")
@ -67,10 +88,10 @@ def test_process_namespace_telemetry_version(sample_document):
uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
sample_document["meta"]["uri"] = uri
sample_document["content"]["version"] = 4
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "4", "doc-id")
sample_document["content"]["version"] = 5
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "5", "doc-id")
@ -78,7 +99,7 @@ def test_process_namespace_telemetry_ver(sample_document):
uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
sample_document["meta"]["uri"] = uri
sample_document["content"]["ver"] = 6
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "6", "doc-id")
@ -86,7 +107,7 @@ def test_process_namespace_telemetry_v(sample_document):
uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
sample_document["meta"]["uri"] = uri
sample_document["content"]["v"] = 7
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "7", "doc-id")
@ -98,7 +119,7 @@ def test_process_namespace_telemetry_ver_version_v(sample_document):
sample_document["content"]["version"] = 9
sample_document["content"]["v"] = 10
sample_document["content"]["deviceinfo"] = "foo"
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("telemetry", "main", "8", "doc-id")
@ -106,12 +127,12 @@ def test_process_namespace_generic(sample_document):
# /submit/<namespace>/<doc_type>/<doc_version>/<doc_id>
uri = "/submit/namespace/doc-type/doc-version/doc-id"
sample_document["meta"]["uri"] = uri
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
assert row[:4] == ("namespace", "doc-type", "doc-version", "doc-id")
def test_process_meta_ignores_identifiable_information(sample_document):
row = sampler._process(sample_document)
row = sampler._process(serialize_content(sample_document))
meta = row[4]
intersection = {"Hostname", "remote_addr", "X-Forwarded-For"} & set(meta.keys())
assert len(intersection) == 0