Revert "Add content-based version detection."
This reverts commit 7e4d61d3e4.
This commit is contained in:
Родитель
1001d4e6fb
Коммит
eae4a8963e
|
@ -14,12 +14,17 @@ v2 - Addition of document version as a partition value
|
|||
v3 - Retain whitelisted metadata fields and simplify schema
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
import click
|
||||
from moztelemetry.dataset import Dataset
|
||||
from pyspark.sql import Window, SparkSession
|
||||
from pyspark.sql.functions import col, row_number
|
||||
from pyspark.sql.types import StructType, StructField, StringType
|
||||
|
||||
# Regex for capturing the telemetry version from the uri arguments.
# The `v` parameter must start the argument string or follow a `?`/`&`
# separator, so that longer names ending in "v" (e.g. `dev=1`, `rev=7`)
# are not mistaken for the version. Group 1 holds the version digits.
META_ARG_VERSION = re.compile(r"(?:^|[?&])v=(\d+)")
|
||||
|
||||
# whitelist for fields to keep from the ingestion metadata
|
||||
META_WHITELIST = {
|
||||
"Content-Length",
|
||||
|
@ -51,21 +56,6 @@ def extract(sc, submission_date, sample=0.01):
|
|||
return landfill
|
||||
|
||||
|
||||
# Detect document version from the payload itself.
|
||||
# Should match with the logic here:
|
||||
# https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/decoders/moz_ingest/telemetry.lua#L162
|
||||
def _detect_telemetry_version(content):
    """Return the telemetry document version found in the payload, as a string.

    The payload is probed in a fixed priority order: the ``ver`` key, then
    ``version``, then the mere presence of ``deviceinfo`` (reported as
    ``"3"``), and finally ``v``. If none of these are present the version
    defaults to ``"1"``.

    :param content: the document payload, supporting ``in`` and item lookup
        by key; ``None`` is accepted and treated as an empty payload.
    :return: the detected version, always as a ``str``.
    """
    # A missing payload has nothing to inspect; fall back to the default
    # instead of raising TypeError on the membership tests below.
    if content is None:
        return "1"

    # Explicit version fields take precedence over the deviceinfo heuristic.
    for key in ("ver", "version"):
        if key in content:
            return str(content[key])

    if "deviceinfo" in content:
        return "3"

    if "v" in content:
        return str(content["v"])

    return "1"
|
||||
|
||||
|
||||
def _process(message):
|
||||
"""Process the URI specification from the tagged metadata
|
||||
|
||||
|
@ -80,18 +70,18 @@ def _process(message):
|
|||
# Some paths do not adhere to the spec, so append empty values to avoid index errors.
|
||||
path = meta["uri"].split("/")[2:] + [None, None, None, None]
|
||||
namespace = path[0]
|
||||
content = message.get("content")
|
||||
|
||||
if namespace == "telemetry":
|
||||
doc_type = path[TELEMETRY_DOC_TYPE]
|
||||
doc_version = _detect_telemetry_version(content)
|
||||
arg = META_ARG_VERSION.search(meta.get("args", ""))
|
||||
doc_version = arg.group(1) if arg else None
|
||||
doc_id = path[TELEMETRY_DOC_ID]
|
||||
else:
|
||||
doc_type = path[GENERIC_DOC_TYPE]
|
||||
doc_version = path[GENERIC_DOC_VER]
|
||||
doc_id = path[GENERIC_DOC_ID]
|
||||
|
||||
return namespace, doc_type, doc_version, doc_id, meta, content
|
||||
return namespace, doc_type, doc_version, doc_id, meta, message.get("content")
|
||||
|
||||
|
||||
def transform(landfill, n_documents=1000):
|
||||
|
|
|
@ -52,61 +52,9 @@ def test_process_namespace_telemetry(sample_document):
|
|||
uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
|
||||
sample_document["meta"]["uri"] = uri
|
||||
row = sampler._process(sample_document)
|
||||
assert row[:4] == ("telemetry", "main", "1", "doc-id")
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_deviceinfo(sample_document):
    """A telemetry payload carrying ``deviceinfo`` is reported as version "3"."""
    sample_document["meta"]["uri"] = (
        "/submit/telemetry/doc-id/appusage/Firefox/61.0.2/release/20180807170231"
    )
    sample_document["content"]["deviceinfo"] = "foo"

    expected = ("telemetry", "appusage", "3", "doc-id")
    assert sampler._process(sample_document)[:4] == expected
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_version_4(sample_document):
    """A ``version`` field of 4 in the payload yields document version "4"."""
    sample_document["meta"]["uri"] = (
        "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
    )
    sample_document["content"]["version"] = 4

    expected = ("telemetry", "main", "4", "doc-id")
    assert sampler._process(sample_document)[:4] == expected
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_version_5(sample_document):
    """A ``version`` field of 5 in the payload yields document version "5"."""
    sample_document["meta"]["uri"] = (
        "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
    )
    sample_document["content"]["version"] = 5

    expected = ("telemetry", "main", "5", "doc-id")
    assert sampler._process(sample_document)[:4] == expected
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_ver_6(sample_document):
    """A ``ver`` field of 6 in the payload yields document version "6"."""
    sample_document["meta"]["uri"] = (
        "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
    )
    sample_document["content"]["ver"] = 6

    expected = ("telemetry", "main", "6", "doc-id")
    assert sampler._process(sample_document)[:4] == expected
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_v_7(sample_document):
    """A ``v`` field of 7 in the payload yields document version "7"."""
    uri = "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
    sample_document["meta"]["uri"] = uri
    # Exercise the `v` key specifically; the `ver` key has its own test.
    sample_document["content"]["v"] = 7
    row = sampler._process(sample_document)
    assert row[:4] == ("telemetry", "main", "7", "doc-id")
|
||||
|
||||
|
||||
def test_process_namespace_telemetry_ver_version_v(sample_document):
    """With every version-related field populated, the ``ver`` value is reported."""
    sample_document["meta"]["uri"] = (
        "/submit/telemetry/doc-id/main/Firefox/61.0.2/release/20180807170231"
    )

    # Populate all the version-related fields.
    payload = sample_document["content"]
    payload["ver"] = 8
    payload["version"] = 9
    payload["v"] = 10
    payload["deviceinfo"] = "foo"

    expected = ("telemetry", "main", "8", "doc-id")
    assert sampler._process(sample_document)[:4] == expected
|
||||
|
||||
|
||||
def test_process_namespace_generic(sample_document):
|
||||
# /submit/<namespace>/<doc_type>/<doc_version>/<doc_id>
|
||||
uri = "/submit/namespace/doc-type/doc-version/doc-id"
|
||||
|
|
Загрузка…
Ссылка в новой задаче