bug(1741487): Rename url2 and related fields in stable views (#4029)
* Bug 1741487 - Rename url2 and related fields in stable views This removes the following unpopulated fields from Glean views: `metrics.url`, `metrics.text`, `metrics.jwe`, and `metrics.labeled_rate`. If any of these metrics exist in the source table under `2`-suffixed name, it is also aliased to its original name (`url2` to `url` and so on). Suffixed fields are still preserved until view consumers migrate. * Remove redundant comma from generated sql * Ignore missing fields in views if any of them were removed * added a todo comment * Added additional context around why we are excluding some of the non-suffixed fields and why alising to remove suffix 2 from some fields --------- Co-authored-by: Arkadiusz Komarzewski <akomarzewski@mozilla.com> Co-authored-by: Anna Scholtz <anna@scholtzan.net>
This commit is contained in:
Родитель
ae2edf2234
Коммит
9b5c04a7bb
|
@ -100,6 +100,7 @@ class Schema:
|
|||
add_missing_fields=True,
|
||||
attributes: Optional[List[str]] = None,
|
||||
ignore_incompatible_fields: bool = False,
|
||||
ignore_missing_fields: bool = False,
|
||||
):
|
||||
"""Merge another schema into the schema."""
|
||||
if "fields" in other.schema and "fields" in self.schema:
|
||||
|
@ -112,6 +113,7 @@ class Schema:
|
|||
add_missing_fields=add_missing_fields,
|
||||
attributes=attributes,
|
||||
ignore_incompatible_fields=ignore_incompatible_fields,
|
||||
ignore_missing_fields=ignore_missing_fields,
|
||||
)
|
||||
|
||||
def equal(self, other: "Schema") -> bool:
|
||||
|
|
|
@ -136,6 +136,7 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
|
|||
full_source_id = f"{target_project}.{schema.stable_table}"
|
||||
full_view_id = f"{target_project}.{schema.user_facing_view}"
|
||||
replacements = ["mozfun.norm.metadata(metadata) AS metadata"]
|
||||
key_value_metrics_removed = False
|
||||
if schema.schema_id == "moz://mozilla.org/schemas/glean/ping/1":
|
||||
replacements += ["mozfun.norm.glean_ping_info(ping_info) AS ping_info"]
|
||||
if schema.bq_table == "baseline_v1":
|
||||
|
@ -154,16 +155,22 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
|
|||
)
|
||||
else:
|
||||
metrics_source = "metrics"
|
||||
|
||||
datetime_replacements_clause = ""
|
||||
metrics_2_aliases = []
|
||||
metrics_2_exclusions = []
|
||||
|
||||
if metrics_struct := next(
|
||||
(field for field in schema.schema if field["name"] == "metrics"), None
|
||||
):
|
||||
if metrics_datetime_fields := [
|
||||
metrics_datetime_field["name"]
|
||||
for field in schema.schema
|
||||
if field["name"] == "metrics"
|
||||
for metrics_field in field["fields"]
|
||||
for metrics_field in metrics_struct["fields"]
|
||||
if metrics_field["name"] == "datetime"
|
||||
for metrics_datetime_field in metrics_field["fields"]
|
||||
]:
|
||||
replacements += [
|
||||
f"(SELECT AS STRUCT {metrics_source}.* REPLACE (STRUCT("
|
||||
datetime_replacements_clause = (
|
||||
f"REPLACE (STRUCT("
|
||||
+ ", ".join(
|
||||
field_select
|
||||
for field in metrics_datetime_fields
|
||||
|
@ -172,7 +179,54 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
|
|||
f"metrics.datetime.{field} AS raw_{field}",
|
||||
)
|
||||
)
|
||||
+ ") AS datetime)) AS metrics"
|
||||
+ ") AS datetime)"
|
||||
)
|
||||
|
||||
# The following metrics were incorrectly deployed as repeated key/value fields and are suffixed with `2`
|
||||
# at ingestion to match deployed schemas
|
||||
# (see https://github.com/mozilla/gcp-ingestion/blob/9911895cf49ed6364b1b2fb8008310fb60ff9c8e/ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java#L376-L384) # noqa E501
|
||||
# We're aliasing them to their original names in views
|
||||
# (see https://bugzilla.mozilla.org/show_bug.cgi?id=1741487)
|
||||
# TODO: Later, after consumers switch to aliased fields we'll remove the `2`-suffixed fields
|
||||
metrics_2_types_to_rename = {
|
||||
"url2": "url",
|
||||
"text2": "text",
|
||||
"jwe2": "jwe",
|
||||
"labeled_rate2": "labeled_rate",
|
||||
}
|
||||
|
||||
# Due to past error in the deployment all of the following fields are considered invalid (`url`, `text`, `jwe`, `labeled_rate`)
|
||||
# after the issue has been resolved these fields were corrected and have the suffix `2`.
|
||||
# This suffix is automatically added by the ingestion pipeline and is a hidden detail from the data producers.
|
||||
# For this reason, all of the mentioned fields without the suffix can be excluded from the view
|
||||
# and any of the suffixed fields can have the suffix removed to make the data available more consistent with how it is provided.
|
||||
#
|
||||
# We have to handle these fields in two stages via `EXCEPT` and aliases instead of
|
||||
# a single `REPLACE` because there are some tables that have `url2` field but not `url` field.
|
||||
for metrics_field in metrics_struct["fields"]:
|
||||
# Excluding non-suffixed fields are they are considered invalid
|
||||
if metrics_field["name"] in metrics_2_types_to_rename.values():
|
||||
metrics_2_exclusions += [metrics_field["name"]]
|
||||
key_value_metrics_removed = True
|
||||
|
||||
# Using aliasing to remove the suffix from the valid fields
|
||||
if metrics_field["name"] in metrics_2_types_to_rename:
|
||||
metrics_2_aliases += [
|
||||
f"metrics.{metrics_field['name']} AS {metrics_2_types_to_rename[metrics_field['name']]}"
|
||||
]
|
||||
|
||||
if datetime_replacements_clause or metrics_2_aliases or metrics_2_exclusions:
|
||||
except_clause = ""
|
||||
if metrics_2_exclusions:
|
||||
except_clause = "EXCEPT (" + ", ".join(metrics_2_exclusions) + ")"
|
||||
metrics_select = (
|
||||
f"{metrics_source}.* {except_clause} {datetime_replacements_clause}"
|
||||
)
|
||||
|
||||
replacements += [
|
||||
f"(SELECT AS STRUCT "
|
||||
+ ", ".join([metrics_select] + metrics_2_aliases)
|
||||
+ ") AS metrics"
|
||||
]
|
||||
elif metrics_source != "metrics":
|
||||
replacements += [f"{metrics_source} AS metrics"]
|
||||
|
@ -219,10 +273,16 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
|
|||
view_schema = Schema.from_query_file(target_file, content=content)
|
||||
|
||||
stable_table_schema = Schema.from_json({"fields": schema.schema})
|
||||
|
||||
# This is needed if we removed any of the `url`, `text`, `jwe`, or `labeled_rate`
|
||||
# from the view schema since these fields exist in the source table
|
||||
ignore_missing_fields = key_value_metrics_removed
|
||||
|
||||
view_schema.merge(
|
||||
stable_table_schema,
|
||||
attributes=["description"],
|
||||
add_missing_fields=False,
|
||||
ignore_missing_fields=ignore_missing_fields,
|
||||
)
|
||||
view_schema.to_yaml_file(target_dir / "schema.yaml")
|
||||
except Exception as e:
|
||||
|
|
Загрузка…
Ссылка в новой задаче