# mozilla-schema-generator/mozilla_schema_generator/glean_ping.py
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
import logging
from pathlib import Path
from typing import Dict, List, Set
from requests import HTTPError
from .config import Config
from .generic_ping import GenericPing
from .probes import GleanProbe
from .schema import Schema
# Directory containing this module; used to locate bundled config data.
ROOT_DIR = Path(__file__).parent
# Tables listed in this file keep their legacy (incorrect) struct layout as
# mitigation for bug 1737656; read at class-definition time by GleanPing.
BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
logger = logging.getLogger(__name__)
# Full Glean ping schema (includes the standard info sections).
DEFAULT_SCHEMA_URL = (
"https://raw.githubusercontent.com"
"/mozilla-services/mozilla-pipeline-schemas"
"/{branch}/schemas/glean/glean/glean.1.schema.json"
)
# Minimal Glean ping schema, used when a ping sets include_info_sections=false.
MINIMUM_SCHEMA_URL = (
"https://raw.githubusercontent.com"
"/mozilla-services/mozilla-pipeline-schemas"
"/{branch}/schemas/glean/glean/glean-min.1.schema.json"
)
class GleanPing(GenericPing):
    """Schema generator for the Glean pings of a single application repository.

    Fetches metric, ping, and dependency information for ``repo`` from the
    probe-info service and produces per-ping schemas with pipeline metadata
    attached.
    """

    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )

    # Fallback when an application's dependency lookup fails or yields nothing.
    default_dependencies = ["glean-core"]

    # Tables created before the bug 1737656 mitigation keep their legacy
    # struct layout; see generate_schema for how this list is applied.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [line.strip() for line in f if line.strip()]

    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
        """:param repo: a repository entry from the probe-info repositories
            listing; must contain at least ``name`` and ``app_id``.
        """
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        super().__init__(
            DEFAULT_SCHEMA_URL,
            DEFAULT_SCHEMA_URL,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some
        modifications to allow some workarounds for proper injection of
        metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as
        # part of mitigation for
        # https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
            placeholder = schema.get(
                ("properties", "metrics", "properties", metric_name)
            ).copy()
            schema.set_schema_elem(
                ("properties", "metrics", "properties", metric_name + "2"),
                placeholder,
            )

        return schema

    def get_dependencies(self):
        """Return the repository names of this application's library dependencies.

        The probe-info service reports dependencies by library name; these are
        mapped back to repository names via the repositories listing. Falls
        back to ``default_dependencies`` when the lookup fails or resolves to
        nothing.
        """
        try:
            dependencies = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        dependency_library_names = list(dependencies.keys())

        # Build library-name -> repository-name lookup from the repo listing.
        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = [
            repos_by_dependency_name[name]
            for name in dependency_library_names
            if name in repos_by_dependency_name
        ]

        if not dependencies:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        logger.info(
            "For %s, found Glean dependencies: %s", self.repo_name, dependencies
        )
        return dependencies

    def get_probes(self) -> List[GleanProbe]:
        """Return all probes for this app and its dependencies.

        Also appends extra probe entries for historic incompatible type
        changes (issue 118 and bug 1870317) so legacy columns are preserved.
        """
        data = self._get_json(self.probes_url)
        probes = list(data.items())

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )
            probes += list(dependency_probes.items())

        pings = self.get_pings()

        # Manual handling of incompatible schema changes.
        # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
        issue_118_affected = {
            "fenix",
            "fenix-nightly",
            "firefox-android-nightly",
            "firefox-android-beta",
            "firefox-android-release",
        }

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            if (
                self.repo_name in issue_118_affected
                and probe.get_name() == "installation.timestamp"
            ):
                logger.info("Writing column %s for compatibility.", probe.get_name())

                # Search through history for the "string" type and add a copy
                # of the probe at that time in history. The changepoint
                # signifies this event.
                changepoint_index = 0
                for definition in probe.definition_history:
                    if definition["type"] != probe.get_type():
                        break
                    changepoint_index += 1

                # Modify the definition with the truncated history.
                hist_defn = defn.copy()
                hist_defn[probe.history_key] = probe.definition_history[
                    changepoint_index:
                ]
                hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
                processed.append(GleanProbe(_id, hist_defn, pings=pings))

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create a schema entry for each historic type.
                # No new entry needs to be created for the current probe type.
                probe_types.remove(defn["type"])
                for hist in defn[probe.history_key]:
                    if hist["type"] in probe_types:
                        # Use a fresh copy per historic type so each GleanProbe
                        # gets its own definition dict; mutating one shared
                        # dict would alias all the probes created here.
                        hist_defn = defn.copy()
                        hist_defn["type"] = hist["type"]
                        processed.append(GleanProbe(_id, hist_defn, pings=pings))
                        # Keep track of the types entries were already
                        # created for.
                        probe_types.remove(hist["type"])

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return ping definitions for this app merged with its dependencies'."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        for dependency in self.get_dependencies():
            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
            ping_data.update(dependency_pings)
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return only this app's own ping definitions."""
        url = self.ping_url_template.format(self.repo_name)
        return GleanPing._get_json(url)

    def _get_dependency_pings(self, dependency):
        """Return the ping definitions published by a single dependency."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings for this app and its dependencies."""
        return set(self._get_ping_data().keys())

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """apply_default_metadata recurses down into dicts nested
        to an arbitrary depth, updating keys. The ``default_metadata`` is
        merged into ``ping_metadata``.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dict merged into ping_metadata
        :return: None
        """
        for k, v in default_metadata.items():
            if (
                k in ping_metadata
                and isinstance(ping_metadata[k], dict)
                and isinstance(v, dict)
            ):
                GleanPing.apply_default_metadata(ping_metadata[k], v)
            else:
                ping_metadata[k] = v

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return all ping data with repo-level metadata defaults applied
        to the pings defined in dependencies.
        """
        # Get the ping data with the pipeline metadata.
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo
        # defined moz_pipeline_metadata_defaults so they need to be applied
        # here.

        # 1. Get repo and pipeline default metadata.
        repos = self.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
        app_metadata = current_repo.get("moz_pipeline_metadata", {})

        # 2. Apply the default metadata to each dependency defined ping.
        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Ensure the metadata dict exists before merging into it;
                # merging into None would raise.
                ping_metadata = dependency_ping.setdefault(
                    "moz_pipeline_metadata", {}
                )
                # Although it is counter intuitive to apply the default
                # metadata on top of the existing dependency ping metadata it
                # does set the repo specific value for bq_dataset_family
                # instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    ping_metadata, copy.deepcopy(default_metadata)
                )
                # App-level ping properties take priority over the app
                # defaults.
                metadata_override = app_metadata.get(dependency_ping["name"])
                if metadata_override is not None:
                    GleanPing.apply_default_metadata(
                        ping_metadata, metadata_override
                    )
            ping_data.update(dependency_pings)

        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return ``metadata`` re-ordered to match the currently deployed
        key order; unknown keys are appended at the end.
        """
        desired_order_list = [
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "include_info_sections",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        ]
        reordered_metadata = {
            k: metadata[k] for k in desired_order_list if k in metadata
        }

        # Re-order jwe-mappings.
        desired_order_list = ["source_field_path", "decrypted_field_path"]
        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
        if jwe_mapping_metadata:
            reordered_metadata["jwe_mappings"] = [
                {k: mapping[k] for k in desired_order_list if k in mapping}
                for mapping in jwe_mapping_metadata
            ]

        # Future proofing, in case there are other fields added at the ping
        # top level: add them to the end.
        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
        return {**reordered_metadata, **leftovers}

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Return ping names mapped to their (reordered) pipeline metadata."""
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        for ping_name, ping_data in pings.items():
            metadata = ping_data.get("moz_pipeline_metadata")
            if not metadata:
                continue
            metadata["include_info_sections"] = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_client_id"] = self._is_field_included(
                ping_data, "include_client_id"
            )

            # While technically unnecessary, the dictionary elements are
            # re-ordered to match the currently deployed order and used to
            # verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
        return pings

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Return ping names mapped to their latest history description."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    @staticmethod
    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
        """Return false if the field exists and is false.

        If `consider_all_history` is False, then only check the latest value
        in the ping history. Otherwise, if the field is not found or true in
        one or more history entries, true is returned.
        """
        # Default to true if not specified.
        if "history" not in ping_data or len(ping_data["history"]) == 0:
            return True

        # Check if at some point in the past the field has already been
        # deployed. And if the caller of this method wants to consider this
        # history of the field. Keep them in the schema, even if the field has
        # changed, as removing fields is currently not supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list
        if consider_all_history:
            ping_history = ping_data["history"]
        else:
            ping_history = [ping_data["history"][-1]]
        for history in ping_history:
            if field_name not in history or history[field_name]:
                return True

        # The ping was created with include_info_sections = False.
        # The fields can be excluded.
        return False

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not
        require info sections as specified in the parsed ping info in probe
        scraper.
        """
        if not metadata["include_info_sections"]:
            self.schema_url = MINIMUM_SCHEMA_URL.format(branch=self.branch_name)
        else:
            self.schema_url = DEFAULT_SCHEMA_URL.format(branch=self.branch_name)

    def generate_schema(self, config, generic_schema=False) -> Dict[str, Schema]:
        """Generate one schema per ping, with pipeline metadata injected.

        :param config: base Config whose matchers are cloned per ping.
        :param generic_schema: when true, use the unmodified glean schema
            instead of running metric injection.
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing
            # prior to November 2021. We maintain the incorrect fields for
            # existing tables by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2",
            # "url2", etc.) defined that will allow metrics of these types to
            # be injected into proper structs. The gcp-ingestion repository
            # includes logic to rewrite these metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info
            # sections.
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled
                    # defaults, but keep values _inside_ them if they have
                    # been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories.
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]