diff --git a/bigquery_etl/schema/__init__.py b/bigquery_etl/schema/__init__.py index 4bb78b17cc..7c90f07d8e 100644 --- a/bigquery_etl/schema/__init__.py +++ b/bigquery_etl/schema/__init__.py @@ -100,6 +100,7 @@ class Schema: add_missing_fields=True, attributes: Optional[List[str]] = None, ignore_incompatible_fields: bool = False, + ignore_missing_fields: bool = False, ): """Merge another schema into the schema.""" if "fields" in other.schema and "fields" in self.schema: @@ -112,6 +113,7 @@ class Schema: add_missing_fields=add_missing_fields, attributes=attributes, ignore_incompatible_fields=ignore_incompatible_fields, + ignore_missing_fields=ignore_missing_fields, ) def equal(self, other: "Schema") -> bool: diff --git a/sql_generators/stable_views/__init__.py b/sql_generators/stable_views/__init__.py index cbd39f7ae2..948fb06b5f 100644 --- a/sql_generators/stable_views/__init__.py +++ b/sql_generators/stable_views/__init__.py @@ -136,6 +136,7 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF full_source_id = f"{target_project}.{schema.stable_table}" full_view_id = f"{target_project}.{schema.user_facing_view}" replacements = ["mozfun.norm.metadata(metadata) AS metadata"] + key_value_metrics_removed = False if schema.schema_id == "moz://mozilla.org/schemas/glean/ping/1": replacements += ["mozfun.norm.glean_ping_info(ping_info) AS ping_info"] if schema.bq_table == "baseline_v1": @@ -154,25 +155,78 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF ) else: metrics_source = "metrics" - if metrics_datetime_fields := [ - metrics_datetime_field["name"] - for field in schema.schema - if field["name"] == "metrics" - for metrics_field in field["fields"] - if metrics_field["name"] == "datetime" - for metrics_datetime_field in metrics_field["fields"] - ]: - replacements += [ - f"(SELECT AS STRUCT {metrics_source}.* REPLACE (STRUCT(" - + ", ".join( - field_select - for field in metrics_datetime_fields - for 
field_select in ( - f"mozfun.glean.parse_datetime(metrics.datetime.{field}) AS {field}", - f"metrics.datetime.{field} AS raw_{field}", + + datetime_replacements_clause = "" + metrics_2_aliases = [] + metrics_2_exclusions = [] + + if metrics_struct := next( + (field for field in schema.schema if field["name"] == "metrics"), None + ): + if metrics_datetime_fields := [ + metrics_datetime_field["name"] + for metrics_field in metrics_struct["fields"] + if metrics_field["name"] == "datetime" + for metrics_datetime_field in metrics_field["fields"] + ]: + datetime_replacements_clause = ( + f"REPLACE (STRUCT(" + + ", ".join( + field_select + for field in metrics_datetime_fields + for field_select in ( + f"mozfun.glean.parse_datetime(metrics.datetime.{field}) AS {field}", + f"metrics.datetime.{field} AS raw_{field}", + ) ) + + ") AS datetime)" ) - + ") AS datetime)) AS metrics" + + # The following metrics were incorrectly deployed as repeated key/value fields and are suffixed with `2` + # at ingestion to match deployed schemas + # (see https://github.com/mozilla/gcp-ingestion/blob/9911895cf49ed6364b1b2fb8008310fb60ff9c8e/ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java#L376-L384) # noqa E501 + # We're aliasing them to their original names in views + # (see https://bugzilla.mozilla.org/show_bug.cgi?id=1741487) + # TODO: Later, after consumers switch to aliased fields we'll remove the `2`-suffixed fields + metrics_2_types_to_rename = { + "url2": "url", + "text2": "text", + "jwe2": "jwe", + "labeled_rate2": "labeled_rate", + } + + # Due to past error in the deployment all of the following fields are considered invalid (`url`, `text`, `jwe`, `labeled_rate`) + # after the issue has been resolved these fields were corrected and have the suffix `2`. + # This suffix is automatically added by the ingestion pipeline and is a hidden detail from the data producers. 
+ # For this reason, all of the mentioned fields without the suffix can be excluded from the view + # and any of the suffixed fields can have the suffix removed to make the data available more consistent with how it is provided. + # + # We have to handle these fields in two stages via `EXCEPT` and aliases instead of + # a single `REPLACE` because there are some tables that have `url2` field but not `url` field. + for metrics_field in metrics_struct["fields"]: + # Excluding non-suffixed fields as they are considered invalid + if metrics_field["name"] in metrics_2_types_to_rename.values(): + metrics_2_exclusions += [metrics_field["name"]] + key_value_metrics_removed = True + + # Using aliasing to remove the suffix from the valid fields + if metrics_field["name"] in metrics_2_types_to_rename: + metrics_2_aliases += [ + f"metrics.{metrics_field['name']} AS {metrics_2_types_to_rename[metrics_field['name']]}" + ] + + if datetime_replacements_clause or metrics_2_aliases or metrics_2_exclusions: + except_clause = "" + if metrics_2_exclusions: + except_clause = "EXCEPT (" + ", ".join(metrics_2_exclusions) + ")" + metrics_select = ( + f"{metrics_source}.* {except_clause} {datetime_replacements_clause}" + ) + + replacements += [ + f"(SELECT AS STRUCT " + + ", ".join([metrics_select] + metrics_2_aliases) + + ") AS metrics" ] elif metrics_source != "metrics": replacements += [f"{metrics_source} AS metrics"] @@ -219,10 +273,16 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF view_schema = Schema.from_query_file(target_file, content=content) stable_table_schema = Schema.from_json({"fields": schema.schema}) + + # This is needed if we removed any of the `url`, `text`, `jwe`, or `labeled_rate` + # from the view schema since these fields exist in the source table + ignore_missing_fields = key_value_metrics_removed + view_schema.merge( stable_table_schema, attributes=["description"], add_missing_fields=False, + 
ignore_missing_fields=ignore_missing_fields, ) view_schema.to_yaml_file(target_dir / "schema.yaml") except Exception as e: