Fix bugs in `derived_view_schemas` SQL generator (#5592)

* Limit `derived_view_schemas` SQL generator to actual view directories.

* Fix the `derived_view_schemas` SQL generator to get the view schemas by dry-running their latest SQL and/or from their latest `schema.yaml` file.  Getting the schema from the currently deployed view wasn't appropriate because it wouldn't reflect the latest view code.

* Rename `View.view_schema` to `View.schema`.

* Change `View` so its schema dry-runs use the cloud function (CI doesn't have permission to run dry-run queries directly).

* Apply partition column filters in view dry-run queries when possible for speed/efficiency (see the wrapper-query sketch after this list).

* Don't allow missing fields to prevent view schema enrichment.

* Only copy column descriptions during view schema enrichment (see the merge sketch after this list).

* Only try enriching view schemas from their reference table `schema.yaml` files if those files actually exist.

* Change `main_1pct` view to select directly from the `main_remainder_1pct_v1` table, so the `derived_view_schemas` SQL generator can detect the partition column to use and successfully dry-run the view to determine its schema.

* Formalize the order in which `bqetl generate all` runs the SQL generators.

* Have `bqetl generate all` run `derived_view_schemas` last, in case other SQL generators create derived views.

* Fix `Schema._traverse()` to only recurse if both fields are records.
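
For illustration, the new dry-run path strips the `CREATE OR REPLACE VIEW ... AS` prefix from the view SQL, wraps the view body in a CTE, and filters on the reference table's partition column so partition-filter-required tables can still be dry-run. The following is a minimal sketch of that query construction, using a hypothetical view body and a hypothetical `submission_date` partition column (the real logic lives in `View.dryrun_schema` in the diff below):

    from textwrap import dedent

    # Hypothetical inputs; in the generator these come from the view file and the
    # reference table's metadata.yaml.
    view_body = "SELECT * FROM `moz-fx-data-shared-prod.telemetry_derived.main_remainder_1pct_v1`"
    partition_column = "submission_date"

    # Filter on the partition column when it's known; otherwise fall back to WHERE FALSE
    # so tables that require a partition filter still accept the query.
    schema_query_filter = (
        f"DATE(`{partition_column}`) = DATE('2020-01-01')"
        if partition_column
        else "FALSE"
    )
    schema_query = dedent(
        f"""
        WITH view_query AS (
            {view_body}
        )
        SELECT *
        FROM view_query
        WHERE {schema_query_filter}
        """
    )
    print(schema_query)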
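
Similarly, a rough sketch of the enrichment behavior described in the bullets above, using only `Schema.from_schema_file` and the `Schema.merge` flags that appear in this diff (the file paths and YAML contents are hypothetical, and the comments assume the flags behave as their names suggest):

    from pathlib import Path
    from tempfile import TemporaryDirectory

    from bigquery_etl.schema import Schema

    with TemporaryDirectory() as tmp:
        view_schema_file = Path(tmp) / "view_schema.yaml"
        reference_schema_file = Path(tmp) / "reference_schema.yaml"
        # The view only exposes `document_id`; the reference table also has `extra_col`.
        view_schema_file.write_text(
            "fields:\n"
            "- name: document_id\n  type: STRING\n  mode: NULLABLE\n"
        )
        reference_schema_file.write_text(
            "fields:\n"
            "- name: document_id\n  type: STRING\n  mode: NULLABLE\n"
            "  description: A unique document ID.\n"
            "- name: extra_col\n  type: STRING\n  mode: NULLABLE\n"
        )

        view_schema = Schema.from_schema_file(view_schema_file)
        reference_schema = Schema.from_schema_file(reference_schema_file)
        view_schema.merge(
            reference_schema,
            attributes=["description"],  # only copy column descriptions
            add_missing_fields=False,    # don't add `extra_col` to the view schema
            ignore_missing_fields=True,  # fields missing from the reference aren't fatal
        )
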
Sean Rose 2024-06-07 19:05:36 -07:00 committed by GitHub
Parent 80de43a57f
Commit 4b8574f3bd
6 changed files with 143 additions and 75 deletions


@@ -99,7 +99,22 @@ def generate_all(ctx, output_dir, target_project, ignore, use_cloud_function):
"""Run all SQL generators."""
click.echo(f"Generating SQL content in {output_dir}.")
click.echo(ROOT / SQL_GENERATORS_DIR)
- for _, cmd in reversed(generate.commands.items()):
+ def generator_command_sort_key(command):
+ match command.name:
+ # Run `glean_usage` after `stable_views` because both update `dataset_metadata.yaml` files
+ # and we want the `glean_usage` updates to take precedence.
+ case "stable_views":
+ return (1, command.name)
+ case "glean_usage":
+ return (2, command.name)
+ # Run `derived_view_schemas` last in case other SQL generators create derived views.
+ case "derived_view_schemas":
+ return (4, command.name)
+ case _:
+ return (3, command.name)
+ for cmd in sorted(generate.commands.values(), key=generator_command_sort_key):
if cmd.name != "all" and cmd.name not in ignore:
ctx.invoke(
cmd,


@@ -247,7 +247,7 @@ class Schema:
f"for {prefix}.{field_path} are incompatible"
)
- if dtype == "RECORD":
+ if dtype == "RECORD" and nodes[node_name]["type"] == "RECORD":
# keep traversing nested fields
self._traverse(
f"{prefix}.{field_path}",


@@ -6,10 +6,12 @@ import string
import time
from functools import cached_property
from pathlib import Path
+ from textwrap import dedent
+ from typing import Optional
import attr
import sqlparse
- from google.api_core.exceptions import BadRequest, Forbidden, NotFound
+ from google.api_core.exceptions import BadRequest, NotFound
from google.cloud import bigquery
from bigquery_etl.config import ConfigLoader
@@ -20,7 +22,7 @@ from bigquery_etl.metadata.parse_metadata import (
DatasetMetadata,
Metadata,
)
- from bigquery_etl.schema import Schema
+ from bigquery_etl.schema import SCHEMA_FILE, Schema
from bigquery_etl.util import extract_from_query_path
from bigquery_etl.util.common import render
@@ -38,6 +40,7 @@ class View:
name: str = attr.ib()
dataset: str = attr.ib()
project: str = attr.ib()
+ partition_column: Optional[str] = attr.ib(None)
@path.validator
def validate_path(self, attribute, value):
@@ -52,10 +55,12 @@ class View:
return render(path.name, template_folder=path.parent)
@classmethod
- def from_file(cls, path):
+ def from_file(cls, path, **kwargs):
"""View from SQL file."""
project, dataset, name = extract_from_query_path(path)
- return cls(path=str(path), name=name, dataset=dataset, project=project)
+ return cls(
+ path=str(path), name=name, dataset=dataset, project=project, **kwargs
+ )
@property
def view_identifier(self):
@@ -175,28 +180,48 @@ class View:
)
@cached_property
- def view_schema(self):
+ def schema(self):
"""Derive view schema from a schema file or a dry run result."""
- schema_file = Path(self.path).parent / "schema.yaml"
- # check schema based on schema file
- if schema_file.is_file():
- return Schema.from_schema_file(schema_file)
- else: # check schema based on dry run results
- try:
- client = bigquery.Client()
- query_job = client.query(
- query=self.content,
- job_config=bigquery.QueryJobConfig(
- dry_run=True, use_legacy_sql=False
- ),
- )
- return Schema.from_bigquery_schema(query_job.schema)
- except Forbidden:
- print(
- f"Missing permission to dry run view {self.view_identifier} to get schema"
- )
- return None
+ return self.configured_schema or self.dryrun_schema
+ @cached_property
+ def schema_path(self):
+ """Return the schema file path."""
+ return Path(self.path).parent / SCHEMA_FILE
+ @cached_property
+ def configured_schema(self):
+ """Derive view schema from a schema file."""
+ if self.schema_path.is_file():
+ return Schema.from_schema_file(self.schema_path)
+ return None
+ @cached_property
+ def dryrun_schema(self):
+ """Derive view schema from a dry run result."""
+ try:
+ # We have to remove `CREATE OR REPLACE VIEW ... AS` from the query to avoid
+ # view-creation-permission-denied errors, and we have to apply a `WHERE`
+ # filter to avoid partition-column-filter-missing errors.
+ schema_query_filter = (
+ f"DATE(`{self.partition_column}`) = DATE('2020-01-01')"
+ if self.partition_column
+ else "FALSE"
+ )
+ schema_query = dedent(
+ f"""
+ WITH view_query AS (
+ {CREATE_VIEW_PATTERN.sub("", self.content)}
+ )
+ SELECT *
+ FROM view_query
+ WHERE {schema_query_filter}
+ """
+ )
+ return Schema.from_query_file(Path(self.path), content=schema_query)
+ except Exception as e:
+ print(f"Error dry-running view {self.view_identifier} to get schema: {e}")
+ return None
def _valid_fully_qualified_references(self):
"""Check that referenced tables and views are fully qualified."""
@@ -314,7 +339,7 @@ class View:
table_schema = Schema.from_bigquery_schema(table.schema)
- if self.view_schema is not None and not self.view_schema.equal(table_schema):
+ if self.schema is not None and not self.schema.equal(table_schema):
print(f"view {target_view_id} will change: schema does not match")
return True
@@ -369,9 +394,8 @@ class View:
raise
try:
- schema_path = Path(self.path).parent / "schema.yaml"
- if schema_path.is_file():
- self.view_schema.deploy(target_view)
+ if self.schema_path.is_file():
+ self.schema.deploy(target_view)
except Exception as e:
print(f"Could not update field descriptions for {target_view}: {e}")


@@ -4,4 +4,4 @@ AS
SELECT
*
FROM
- `moz-fx-data-shared-prod.telemetry.main_remainder_1pct`
+ `moz-fx-data-shared-prod.telemetry_derived.main_remainder_1pct_v1`


@@ -16,6 +16,10 @@ NON_USER_FACING_DATASET_SUBSTRINGS = (
"udf",
)
+ VIEW_FILE = "view.sql"
+ METADATA_FILE = "metadata.yaml"
+ SCHEMA_FILE = "schema.yaml"
def _generate_view_schema(sql_dir, view_directory):
import logging
@@ -24,29 +28,24 @@ def _generate_view_schema(sql_dir, view_directory):
from bigquery_etl.metadata.parse_metadata import Metadata
from bigquery_etl.schema import Schema
from bigquery_etl.util.common import render
+ from bigquery_etl.view import View
logging.basicConfig(format="%(levelname)s (%(filename)s:%(lineno)d) - %(message)s")
- VIEW_FILE = "view.sql"
- METADATA_FILE = "metadata.yaml"
- SCHEMA_FILE = "schema.yaml"
# If the view references only one table, we can:
- # 1. Get the reference table partition key if it exists.
+ # 1. Get the reference table partition column if it exists.
# (to dry run views to partitioned tables).
# 2. Get the reference table schema and use it to enrich the
# view schema we get from dry-running.
- def _get_reference_dir_path(view_dir):
- view_file = view_dir / VIEW_FILE
- if not view_file.exists():
- return
- view_references = extract_table_references(render(view_file.name, view_dir))
+ def _get_reference_dir_path(view_file):
+ view_references = extract_table_references(
+ render(view_file.name, view_file.parent)
+ )
if len(view_references) != 1:
return
- target_project = view_dir.parent.parent.name
- target_dataset = view_dir.parent.name
+ target_project = view_file.parent.parent.parent.name
+ target_dataset = view_file.parent.parent.name
target_reference = view_references[0]
parts = target_reference.split(".")
@@ -67,9 +66,9 @@ def _generate_view_schema(sql_dir, view_directory):
sql_dir / reference_project_id / reference_dataset_id / reference_table_id
)
- def _get_reference_partition_key(ref_path):
+ def _get_reference_partition_column(ref_path):
if ref_path is None:
- logging.debug("No table reference, skipping partition key.")
+ logging.debug("No table reference, skipping partition column.")
return
try:
@@ -81,20 +80,24 @@ def _generate_view_schema(sql_dir, view_directory):
bigquery_metadata = reference_metadata.bigquery
if bigquery_metadata is None:
logging.warning(
- f"No bigquery metadata at {ref_path}, unable to get partition key."
+ f"No bigquery metadata at {ref_path}, unable to get partition column."
)
return
partition_metadata = bigquery_metadata.time_partitioning
if partition_metadata is None:
logging.warning(
- f"No partition metadata at {ref_path}, unable to get partition key."
+ f"No partition metadata at {ref_path}, unable to get partition column."
)
return
return partition_metadata.field
- reference_path = _get_reference_dir_path(view_directory)
+ view_file = view_directory / VIEW_FILE
+ if not view_file.exists():
+ return
+ reference_path = _get_reference_dir_path(view_file)
# If this is a view to a stable table, don't try to write the schema:
if reference_path is not None:
@@ -102,34 +105,51 @@ def _generate_view_schema(sql_dir, view_directory):
if reference_dataset.endswith("_stable"):
return
- # Optionally get the upstream partition key
- reference_partition_key = _get_reference_partition_key(reference_path)
- if reference_partition_key is None:
- logging.debug("No reference partition key, dry running without one.")
- project_id = view_directory.parent.parent.name
- dataset_id = view_directory.parent.name
- view_id = view_directory.name
- schema = Schema.for_table(
- project_id, dataset_id, view_id, partitioned_by=reference_partition_key
- )
- if len(schema.schema.get("fields")) == 0:
+ # Optionally get the upstream partition column
+ reference_partition_column = _get_reference_partition_column(reference_path)
+ if reference_partition_column is None:
+ logging.debug("No reference partition column, dry running without one.")
+ view = View.from_file(view_file, partition_column=reference_partition_column)
+ # `View.schema` prioritizes the configured schema over the dryrun schema, but here
+ # we prioritize the dryrun schema because the `schema.yaml` file might be out of date.
+ schema = view.dryrun_schema or view.configured_schema
+ if view.dryrun_schema and view.configured_schema:
+ try:
+ schema.merge(
+ view.configured_schema,
+ attributes=["description"],
+ add_missing_fields=False,
+ ignore_missing_fields=True,
+ )
+ except Exception as e:
+ logging.warning(
+ f"Error enriching {view.view_identifier} view schema from {view.schema_path}: {e}"
+ )
+ if not schema:
logging.warning(
- f"Got empty schema for {project_id}.{dataset_id}.{view_id} potentially "
+ f"Couldn't get schema for {view.view_identifier} potentially "
f"due to dry-run error. Won't write yaml."
)
return
# Optionally enrich the view schema if we have a valid table reference
if reference_path:
- try:
- reference_schema = Schema.from_schema_file(reference_path / SCHEMA_FILE)
- schema.merge(reference_schema, add_missing_fields=False)
- except Exception as e:
- logging.info(
- f"Unable to open reference schema; unable to enrich schema: {e}"
- )
+ reference_schema_file = reference_path / SCHEMA_FILE
+ if reference_schema_file.exists():
+ try:
+ reference_schema = Schema.from_schema_file(reference_schema_file)
+ schema.merge(
+ reference_schema,
+ attributes=["description"],
+ add_missing_fields=False,
+ ignore_missing_fields=True,
+ )
+ except Exception as e:
+ logging.warning(
+ f"Error enriching {view.view_identifier} view schema from {reference_schema_file}: {e}"
+ )
schema.to_yaml_file(view_directory / SCHEMA_FILE)
@@ -178,7 +198,11 @@ def generate(target_project, output_dir, parallelism, use_cloud_function):
]
for dataset_path in dataset_paths:
- view_directories = [path for path in dataset_path.iterdir() if path.is_dir()]
+ view_directories = [
+ path
+ for path in dataset_path.iterdir()
+ if path.is_dir() and (path / VIEW_FILE).exists()
+ ]
with ProcessingPool(parallelism) as pool:
pool.map(


@@ -146,13 +146,16 @@ class TestView:
assert mock_bigquery_table().friendly_name == "Test metadata file"
assert mock_bigquery_table().description == "Test description"
+ @patch("bigquery_etl.dryrun.DryRun")
@patch("google.cloud.bigquery.Client")
- def test_view_has_changes_no_changes(self, mock_client, simple_view):
+ def test_view_has_changes_no_changes(self, mock_client, mock_dryrun, simple_view):
deployed_view = Mock()
deployed_view.view_query = CREATE_VIEW_PATTERN.sub("", simple_view.content)
deployed_view.schema = [SchemaField("a", "INT")]
mock_client.return_value.get_table.return_value = deployed_view
- mock_client.return_value.query.return_value.schema = [SchemaField("a", "INT")]
+ mock_dryrun.return_value.get_schema.return_value = {
+ "fields": [{"name": "a", "type": "INT"}]
+ }
assert not simple_view.has_changes()
@@ -194,16 +197,18 @@ class TestView:
assert metadata_view.has_changes()
assert "friendly_name" in capsys.readouterr().out
+ @patch("bigquery_etl.dryrun.DryRun")
@patch("google.cloud.bigquery.Client")
- def test_view_has_changes_changed_schema(self, mock_client, simple_view, capsys):
+ def test_view_has_changes_changed_schema(
+ self, mock_client, mock_dryrun, simple_view, capsys
+ ):
deployed_view = Mock()
deployed_view.view_query = CREATE_VIEW_PATTERN.sub("", simple_view.content)
deployed_view.schema = [SchemaField("a", "INT")]
mock_client.return_value.get_table.return_value = deployed_view
- mock_client.return_value.query.return_value.schema = [
- SchemaField("a", "INT"),
- SchemaField("b", "INT"),
- ]
+ mock_dryrun.return_value.get_schema.return_value = {
+ "fields": [{"name": "a", "type": "INT"}, {"name": "b", "type": "INT"}]
+ }
assert simple_view.has_changes()
assert "schema" in capsys.readouterr().out