Bug 1920544 Create view to union firefox desktop crashes (#6257)

Ben Wu 2024-09-27 18:48:44 +01:00 committed by GitHub
Parent 2978c55341
Commit 66443eee29
7 changed files with 493 additions and 104 deletions

View file

@ -4,7 +4,7 @@ import json
import os
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional
import attr
import yaml
@ -295,3 +295,148 @@ class Schema:
def from_bigquery_schema(cls, fields: List[SchemaField]) -> "Schema":
"""Construct a Schema from the BigQuery representation."""
return cls({"fields": [field.to_api_repr() for field in fields]})
def generate_compatible_select_expression(
self,
target_schema: "Schema",
fields_to_remove: Optional[Iterable[str]] = None,
unnest_structs: bool = False,
max_unnest_depth: int = 0,
unnest_allowlist: Optional[Iterable[str]] = None,
) -> str:
"""Generate the select expression for the source schema based on the target schema.
The output will include all fields of the target schema, in the same order as the target.
Any fields that are missing in the source schema are set to NULL.
:param target_schema: The schema to coerce the current schema to.
:param fields_to_remove: Given fields are removed from the output expression. Expressed as a
list of strings with `.` separating each level of nesting, e.g. record_name.field.
:param unnest_structs: If true, all record fields are expressed as structs with all nested
fields explicitly listed. This allows the expression to be compatible even if the
source schemas get new fields added. Otherwise, records are only unnested if they
do not match the target schema.
:param max_unnest_depth: Maximum level of struct nesting to explicitly unnest in
the expression.
:param unnest_allowlist: If set, only the given top-level structs are unnested.
"""
def _type_info(node):
"""Determine the BigQuery type information from Schema object field."""
dtype = node["type"]
if dtype == "RECORD":
dtype = (
"STRUCT<"
+ ", ".join(
f"`{field['name']}` {_type_info(field)}"
for field in node["fields"]
)
+ ">"
)
elif dtype == "FLOAT":
dtype = "FLOAT64"
if node.get("mode") == "REPEATED":
return f"ARRAY<{dtype}>"
return dtype
def recurse_fields(
_source_schema_nodes: List[Dict],
_target_schema_nodes: List[Dict],
path=None,
) -> str:
if path is None:
path = []
select_expr = []
source_schema_nodes = {n["name"]: n for n in _source_schema_nodes}
target_schema_nodes = {n["name"]: n for n in _target_schema_nodes}
# iterate through fields
for node_name, node in target_schema_nodes.items():
dtype = node["type"]
node_path = path + [node_name]
node_path_str = ".".join(node_path)
if node_name in source_schema_nodes: # field exists in app schema
# field matches, can query as-is
if node == source_schema_nodes[node_name] and (
# don't need to unnest scalar
dtype != "RECORD"
or not unnest_structs
# reached max record depth to unnest
or len(node_path) > max_unnest_depth > 0
# field not in unnest allowlist
or (
unnest_allowlist is not None
and node_path[0] not in unnest_allowlist
)
):
if (
fields_to_remove is None
or node_path_str not in fields_to_remove
):
select_expr.append(node_path_str)
elif (
dtype == "RECORD"
): # for nested fields, recursively generate select expression
if (
node.get("mode", None) == "REPEATED"
): # unnest repeated record
select_expr.append(
f"""
ARRAY(
SELECT
STRUCT(
{recurse_fields(
source_schema_nodes[node_name]['fields'],
node['fields'],
[node_name],
)}
)
FROM UNNEST({node_path_str}) AS `{node_name}`
) AS `{node_name}`
"""
)
else: # select struct fields
select_expr.append(
f"""
STRUCT(
{recurse_fields(
source_schema_nodes[node_name]['fields'],
node['fields'],
node_path,
)}
) AS `{node_name}`
"""
)
else: # scalar value doesn't match, e.g. different types
select_expr.append(
f"CAST(NULL AS {_type_info(node)}) AS `{node_name}`"
)
else: # field not found in source schema
select_expr.append(
f"CAST(NULL AS {_type_info(node)}) AS `{node_name}`"
)
return ", ".join(select_expr)
return recurse_fields(
self.schema["fields"],
target_schema.schema["fields"],
)
def generate_select_expression(
self,
remove_fields: Optional[Iterable[str]] = None,
unnest_structs: bool = False,
max_unnest_depth: int = 0,
unnest_allowlist: Optional[Iterable[str]] = None,
) -> str:
"""Generate the select expression for the schema which includes each field."""
return self.generate_compatible_select_expression(
self,
remove_fields,
unnest_structs,
max_unnest_depth,
unnest_allowlist,
)
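
A minimal usage sketch of the new API (the field names here are hypothetical; Schema.from_json is the same constructor the tests below use): coercing a source schema to a target schema pads fields missing from the source with typed NULLs, so the resulting SELECTs can be unioned.

from bigquery_etl.schema import Schema

source = Schema.from_json(
    {
        "fields": [
            {"name": "scalar", "type": "INTEGER"},
            {
                "name": "record",
                "type": "RECORD",
                "fields": [{"name": "key", "type": "STRING"}],
            },
        ]
    }
)
target = Schema.from_json(
    {
        "fields": [
            {"name": "scalar", "type": "INTEGER"},
            {
                "name": "record",
                "type": "RECORD",
                "fields": [
                    {"name": "key", "type": "STRING"},
                    {"name": "extra", "type": "STRING"},
                ],
            },
        ]
    }
)

# `record.extra` is missing from the source, so it is padded with a typed NULL:
# scalar, STRUCT(record.key, CAST(NULL AS STRING) AS `extra`) AS `record`
print(source.generate_compatible_select_expression(target))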

View file

@ -0,0 +1,6 @@
# Generated by bqetl generate desktop_crashes
friendly_name: Firefox Desktop Crashes
description: |-
Union of multiple Firefox desktop crash ping sources.
owners:
- bewu@mozilla.com

View file

@ -0,0 +1,6 @@
# Desktop Crashes
Generate a view that unions different sources for Firefox desktop crash pings.
Schemas for the different tables should be equivalent, but fields may be in a different order, or metrics that were removed may be missing from newer tables.
If fields do not match, the output will be a union of all the fields.
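
As a sketch of that behavior (hypothetical fields, using the same Schema helpers the generator below relies on): merging the per-table schemas produces the combined field list, and each table's branch of the UNION ALL pads whatever it lacks with typed NULLs.

from bigquery_etl.schema import Schema

stable = Schema.from_json({"fields": [{"name": "crash_id", "type": "STRING"}]})
crashreporter = Schema.from_json(
    {
        "fields": [
            {"name": "crash_id", "type": "STRING"},
            {"name": "minidump_hash", "type": "STRING"},
        ]
    }
)

combined = Schema.empty()
combined.merge(stable)
combined.merge(crashreporter)

# The first table lacks `minidump_hash`, so its branch selects:
# crash_id, CAST(NULL AS STRING) AS `minidump_hash`
print(stable.generate_compatible_select_expression(combined))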

View file

@ -0,0 +1,76 @@
from pathlib import Path
import click
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.schema import Schema
from bigquery_etl.util.common import write_sql
CRASH_TABLES = [
("moz-fx-data-shared-prod", "firefox_desktop_stable", "crash_v1"),
("moz-fx-data-shared-prod", "firefox_crashreporter_stable", "crash_v1"),
]
@click.command("generate")
@click.option(
"--target-project",
"--target_project",
help="Which project the queries should be written to.",
default="moz-fx-data-shared-prod",
)
@click.option(
"--output-dir",
"--output_dir",
help="The location to write to. Defaults to sql/.",
default=Path("sql"),
type=click.Path(file_okay=False),
)
@use_cloud_function_option
def generate(target_project, output_dir, use_cloud_function):
schemas = {
f"{project}.{dataset}.{table}": Schema.for_table(
project=project,
dataset=dataset,
table=table,
partitioned_by="submission_timestamp",
use_cloud_function=use_cloud_function,
)
for project, dataset, table in CRASH_TABLES
}
combined_schema = Schema.empty()
for table_name, schema in schemas.items():
if len(schema.schema["fields"]) == 0:
raise ValueError(
f"Could not get schema for {table_name} from dry run, "
f"possible authentication issue."
)
combined_schema.merge(schema)
fields_per_table = {
table_name: schema.generate_compatible_select_expression(
combined_schema,
unnest_structs=False,
)
for table_name, schema in schemas.items()
}
template_dir = Path(__file__).parent / "templates"
env = Environment(loader=FileSystemLoader(template_dir))
template = env.get_template("desktop_crashes.view.sql")
query = template.render(tables=fields_per_table, project_id=target_project)
write_sql(
Path(output_dir) / target_project,
f"{target_project}.firefox_desktop.desktop_crashes",
"view.sql",
reformat(query),
)
if __name__ == "__main__":
generate()
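
The header comment in the generated files shows this command is wired up as `bqetl generate desktop_crashes`. As a hedged local sketch, the click command can also be invoked directly; the output directory here is hypothetical, and Schema.for_table needs working dry-run credentials:

from click.testing import CliRunner

# `generate` is the click command defined above.
runner = CliRunner()
result = runner.invoke(generate, ["--output-dir", "/tmp/sql"])
assert result.exit_code == 0, result.output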

View file

@ -0,0 +1,13 @@
-- Generated by bqetl generate desktop_crashes
CREATE OR REPLACE VIEW
`{{ project_id }}.firefox_desktop.desktop_crashes`
AS
{% for table_name, fields in tables.items() %}
SELECT
{{ fields }}
FROM
`{{ table_name }}`
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
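
As a sketch of the rendered output (the toy `tables` values below stand in for the full select expressions the generator computes), the template can be exercised directly with jinja2:

from jinja2 import Template

template_text = '''
CREATE OR REPLACE VIEW `{{ project_id }}.firefox_desktop.desktop_crashes` AS
{% for table_name, fields in tables.items() %}
SELECT {{ fields }} FROM `{{ table_name }}`
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
'''

print(
    Template(template_text).render(
        project_id="moz-fx-data-shared-prod",
        tables={
            "moz-fx-data-shared-prod.firefox_desktop_stable.crash_v1": "crash_id",
            "moz-fx-data-shared-prod.firefox_crashreporter_stable.crash_v1": "crash_id",
        },
    )
)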

View file

@ -59,7 +59,7 @@ class GleanAppPingViews(GleanTable):
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None
id_token=None,
):
"""
Generate per-app ping views across channels.
@ -112,7 +112,7 @@ class GleanAppPingViews(GleanTable):
view_name,
partitioned_by="submission_timestamp",
use_cloud_function=use_cloud_function,
id_token=id_token
id_token=id_token,
)
cached_schemas[channel_dataset] = deepcopy(schema)
@ -147,10 +147,14 @@ class GleanAppPingViews(GleanTable):
channel_dataset = channel_app["bq_dataset_family"]
# compare table schema with unioned schema to determine fields that need to be NULL
select_expression = self._generate_select_expression(
unioned_schema.schema["fields"],
cached_schemas[channel_dataset].schema["fields"],
restructure_metrics=restructure_metrics,
select_expression = cached_schemas[
channel_dataset
].generate_compatible_select_expression(
unioned_schema,
fields_to_remove=OVERRIDDEN_FIELDS,
unnest_structs=restructure_metrics,
max_unnest_depth=2,
unnest_allowlist="metrics",
)
queries.append(
@ -246,100 +250,3 @@ class GleanAppPingViews(GleanTable):
_process_ping,
p.get_pings(),
)
def _generate_select_expression(
self, unioned_schema_nodes, app_schema_nodes, path=[], restructure_metrics=False
) -> str:
"""
Generate the select expression based on the unioned schema and the app channel schema.
Any fields that are missing in the app_schema are set to NULL.
"""
select_expr = []
unioned_schema_nodes = {n["name"]: n for n in unioned_schema_nodes}
app_schema_nodes = {n["name"]: n for n in app_schema_nodes}
# iterate through fields
for node_name, node in unioned_schema_nodes.items():
dtype = node["type"]
node_path = path + [node_name]
if node_name in app_schema_nodes:
# field exists in app schema
# We sometimes fully specify the `metrics` fields structure to try to avoid problems
# when the underlying table/view schemas change due to new metrics being added.
if node == app_schema_nodes[node_name] and not (
restructure_metrics
and node_path[0] == "metrics"
and len(node_path) <= 2
and dtype == "RECORD"
):
if node_name not in OVERRIDDEN_FIELDS:
# field (and all nested fields) are identical, so just query it
select_expr.append(f"{'.'.join(node_path)}")
else:
# fields and/or nested fields are not identical, or this is within `metrics`
if dtype == "RECORD":
# for nested fields, recursively generate select expression
if node.get("mode", None) == "REPEATED":
# unnest repeated record
select_expr.append(
f"""
ARRAY(
SELECT
STRUCT(
{self._generate_select_expression(
node['fields'],
app_schema_nodes[node_name]['fields'],
[node_name],
restructure_metrics
)}
)
FROM UNNEST({'.'.join(node_path)}) AS `{node_name}`
) AS `{node_name}`
"""
)
else:
# select struct fields
select_expr.append(
f"""
STRUCT(
{self._generate_select_expression(
node['fields'],
app_schema_nodes[node_name]['fields'],
node_path,
restructure_metrics
)}
) AS `{node_name}`
"""
)
else:
select_expr.append(
f"CAST(NULL AS {self._type_info(node)}) AS `{node_name}`"
)
else:
select_expr.append(
f"CAST(NULL AS {self._type_info(node)}) AS `{node_name}`"
)
return ", ".join(select_expr)
def _type_info(self, node):
"""Determine the type information."""
dtype = node["type"]
if dtype == "RECORD":
dtype = (
"STRUCT<"
+ ", ".join(
f"`{field['name']}` {self._type_info(field)}"
for field in node["fields"]
)
+ ">"
)
elif dtype == "FLOAT":
dtype = "FLOAT64"
if node.get("mode") == "REPEATED":
return f"ARRAY<{dtype}>"
return dtype

View file

@ -4,6 +4,7 @@ from textwrap import dedent
import yaml
from google.cloud.bigquery import SchemaField
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.schema import Schema
TEST_DIR = Path(__file__).parent.parent
@ -453,3 +454,238 @@ class TestQuerySchema:
),
),
)
def test_generate_compatible_select_expression():
source_schema = {
"fields": [
{"name": "scalar", "type": "INTEGER"},
{"name": "mismatch_scalar", "type": "INTEGER"},
{
"name": "record",
"type": "RECORD",
"fields": [
{"name": "nested_extra", "type": "DATE"},
{
"name": "nested_record",
"type": "RECORD",
"fields": [
{"name": "v1", "type": "INTEGER"},
{"name": "v2", "type": "INTEGER"},
],
},
{
"name": "mismatch_record",
"type": "RECORD",
"fields": [{"name": "nested_str", "type": "STRING"}],
},
],
},
{"name": "array_scalar", "type": "INTEGER", "mode": "REPEATED"},
{
"name": "array_record",
"type": "RECORD",
"mode": "REPEATED",
"fields": [
{"name": "key", "type": "STRING"},
{"name": "value", "type": "STRING"},
],
},
{"name": "extra", "type": "STRING"},
]
}
target_schema = {
"fields": [
{"name": "scalar", "type": "INTEGER"},
{"name": "mismatch_scalar", "type": "STRING"},
{
"name": "record",
"type": "RECORD",
"fields": [
{"name": "nested_missing", "type": "DATE"},
{
"name": "nested_record",
"type": "RECORD",
"fields": [
{"name": "v1", "type": "INTEGER"},
{"name": "v2", "type": "INTEGER"},
],
},
{
"name": "mismatch_record",
"type": "RECORD",
"fields": [{"name": "nested_int", "type": "INTEGER"}],
},
],
},
{"name": "array_scalar", "type": "INTEGER", "mode": "REPEATED"},
{
"name": "array_record",
"type": "RECORD",
"mode": "REPEATED",
"fields": [
{"name": "value", "type": "STRING"},
{"name": "key", "type": "STRING"},
],
},
{"name": "missing", "type": "STRING"},
]
}
expected_expr = """
scalar,
CAST(NULL AS STRING) AS `mismatch_scalar`,
STRUCT(
CAST(NULL AS DATE) AS `nested_missing`,
record.nested_record,
STRUCT(CAST(NULL AS INTEGER) AS `nested_int`) AS `mismatch_record`
) AS `record`,
array_scalar,
ARRAY(
SELECT
STRUCT(
array_record.value,
array_record.key
)
FROM
UNNEST(array_record) AS `array_record`
) AS `array_record`,
CAST(NULL AS STRING) AS `missing`
"""
source = Schema.from_json(source_schema)
target = Schema.from_json(target_schema)
select_expr = source.generate_compatible_select_expression(
target, unnest_structs=False
)
assert reformat(select_expr) == reformat(expected_expr)
def test_generate_select_expression_unnest_struct():
"""unnest_struct argument should unnest records even when they match."""
source_schema = {
"fields": [
{
"name": "record",
"type": "RECORD",
"fields": [
{"name": "key", "type": "STRING"},
{"name": "value", "type": "STRING"},
],
},
]
}
source = Schema.from_json(source_schema)
unnest_expr = source.generate_select_expression(unnest_structs=True)
assert reformat(unnest_expr) == reformat(
"STRUCT(record.key, record.value) AS `record`"
)
no_unnest_expr = source.generate_select_expression(unnest_structs=False)
assert reformat(no_unnest_expr) == reformat("record")
def test_generate_select_expression_remove_fields():
"""remove_fields argument remove the given fields from the output."""
source_schema = {
"fields": [
{
"name": "record",
"type": "RECORD",
"fields": [
{"name": "key", "type": "STRING"},
{"name": "value", "type": "STRING"},
],
},
{"name": "scalar", "type": "INTEGER"},
]
}
source = Schema.from_json(source_schema)
unnest_expr = source.generate_select_expression(
unnest_structs=True, remove_fields=["record.value", "scalar"]
)
assert reformat(unnest_expr) == reformat("STRUCT(record.key) AS `record`")
def test_generate_select_expression_max_unnest_depth():
"""max_unnest_depth argument should stop unnesting at the given depth."""
source_schema = {
"fields": [
{
"name": "record",
"type": "RECORD",
"fields": [
{
"name": "record2",
"type": "RECORD",
"fields": [
{
"name": "record3",
"type": "RECORD",
"fields": [{"name": "key", "type": "STRING"}],
},
],
},
],
},
]
}
source = Schema.from_json(source_schema)
expected_expr = """
STRUCT(
STRUCT(
record.record2.record3
) AS `record2`
) AS `record`
"""
unnest_expr = source.generate_select_expression(
unnest_structs=True, max_unnest_depth=2
)
assert reformat(unnest_expr) == reformat(expected_expr)
def test_generate_select_expression_unnest_allowlist():
"""unnest_allowlist argument should cause only the given fields to be unnested."""
source_schema = {
"fields": [
{
"name": "record",
"type": "RECORD",
"fields": [{"name": "key", "type": "STRING"}],
},
{
"name": "record2",
"type": "RECORD",
"fields": [
{
"name": "record3",
"type": "RECORD",
"fields": [{"name": "key", "type": "STRING"}],
}
],
},
]
}
source = Schema.from_json(source_schema)
expected_expr = """
record,
STRUCT(
STRUCT(
record2.record3.key
) AS `record3`
) AS `record2`
"""
unnest_expr = source.generate_select_expression(
unnest_structs=True, unnest_allowlist=["record2"]
)
assert reformat(unnest_expr) == reformat(expected_expr)