From 4b8574f3bde23d8b9f02dc6f5673613356591639 Mon Sep 17 00:00:00 2001 From: Sean Rose <1994030+sean-rose@users.noreply.github.com> Date: Fri, 7 Jun 2024 19:05:36 -0700 Subject: [PATCH] Fix bugs in `derived_view_schemas` SQL generator (#5592) * Limit `derived_view_schemas` SQL generator to actual view directories. * Fix the `derived_view_schemas` SQL generator to get the view schemas by dry-running their latest SQL and/or from their latest `schema.yaml` file. Getting the schema from the currently deployed view wasn't appropriate because it wouldn't reflect the latest view code. * Rename `View.view_schema` to `View.schema`. * Change `View` so its schema dry-runs use the cloud function (CI doesn't have permission to run dry-run queries directly). * Apply partition column filters in view dry-run queries when possible for speed/efficiency. * Don't allow missing fields to prevent view schema enrichment. * Only copy column descriptions during view schema enrichment. * Only try enriching view schemas from their reference table `schema.yaml` files if those files actually exist. * Change `main_1pct` view to select directly from the `main_remainder_1pct_v1` table, so the `derived_view_schemas` SQL generator can detect the partition column to use and successfully dry-run the view to determine its schema. * Formalize the order `bqetl generate all` runs the SQL generators in. * Have `bqetl generate all` run `derived_view_schemas` last, in case other SQL generators create derived views. * Fix `Schema._traverse()` to only recurse if both fields are records. --- bigquery_etl/cli/generate.py | 17 ++- bigquery_etl/schema/__init__.py | 2 +- bigquery_etl/view/__init__.py | 78 +++++++++----- .../telemetry/main_1pct/view.sql | 2 +- .../derived_view_schemas/__init__.py | 100 +++++++++++------- tests/view/test_view.py | 19 ++-- 6 files changed, 143 insertions(+), 75 deletions(-) diff --git a/bigquery_etl/cli/generate.py b/bigquery_etl/cli/generate.py index 9c66dbe250..aada8e1603 100644 --- a/bigquery_etl/cli/generate.py +++ b/bigquery_etl/cli/generate.py @@ -99,7 +99,22 @@ def generate_all(ctx, output_dir, target_project, ignore, use_cloud_function): """Run all SQL generators.""" click.echo(f"Generating SQL content in {output_dir}.") click.echo(ROOT / SQL_GENERATORS_DIR) - for _, cmd in reversed(generate.commands.items()): + + def generator_command_sort_key(command): + match command.name: + # Run `glean_usage` after `stable_views` because both update `dataset_metadata.yaml` files + # and we want the `glean_usage` updates to take precedence. + case "stable_views": + return (1, command.name) + case "glean_usage": + return (2, command.name) + # Run `derived_view_schemas` last in case other SQL generators create derived views. 
+ case "derived_view_schemas": + return (4, command.name) + case _: + return (3, command.name) + + for cmd in sorted(generate.commands.values(), key=generator_command_sort_key): if cmd.name != "all" and cmd.name not in ignore: ctx.invoke( cmd, diff --git a/bigquery_etl/schema/__init__.py b/bigquery_etl/schema/__init__.py index 6b00d7b468..b5f499771c 100644 --- a/bigquery_etl/schema/__init__.py +++ b/bigquery_etl/schema/__init__.py @@ -247,7 +247,7 @@ class Schema: f"for {prefix}.{field_path} are incompatible" ) - if dtype == "RECORD": + if dtype == "RECORD" and nodes[node_name]["type"] == "RECORD": # keep traversing nested fields self._traverse( f"{prefix}.{field_path}", diff --git a/bigquery_etl/view/__init__.py b/bigquery_etl/view/__init__.py index c818c88bb5..af0d915b68 100644 --- a/bigquery_etl/view/__init__.py +++ b/bigquery_etl/view/__init__.py @@ -6,10 +6,12 @@ import string import time from functools import cached_property from pathlib import Path +from textwrap import dedent +from typing import Optional import attr import sqlparse -from google.api_core.exceptions import BadRequest, Forbidden, NotFound +from google.api_core.exceptions import BadRequest, NotFound from google.cloud import bigquery from bigquery_etl.config import ConfigLoader @@ -20,7 +22,7 @@ from bigquery_etl.metadata.parse_metadata import ( DatasetMetadata, Metadata, ) -from bigquery_etl.schema import Schema +from bigquery_etl.schema import SCHEMA_FILE, Schema from bigquery_etl.util import extract_from_query_path from bigquery_etl.util.common import render @@ -38,6 +40,7 @@ class View: name: str = attr.ib() dataset: str = attr.ib() project: str = attr.ib() + partition_column: Optional[str] = attr.ib(None) @path.validator def validate_path(self, attribute, value): @@ -52,10 +55,12 @@ class View: return render(path.name, template_folder=path.parent) @classmethod - def from_file(cls, path): + def from_file(cls, path, **kwargs): """View from SQL file.""" project, dataset, name = extract_from_query_path(path) - return cls(path=str(path), name=name, dataset=dataset, project=project) + return cls( + path=str(path), name=name, dataset=dataset, project=project, **kwargs + ) @property def view_identifier(self): @@ -175,28 +180,48 @@ class View: ) @cached_property - def view_schema(self): + def schema(self): """Derive view schema from a schema file or a dry run result.""" - schema_file = Path(self.path).parent / "schema.yaml" - # check schema based on schema file - if schema_file.is_file(): - return Schema.from_schema_file(schema_file) - else: # check schema based on dry run results - try: - client = bigquery.Client() + return self.configured_schema or self.dryrun_schema - query_job = client.query( - query=self.content, - job_config=bigquery.QueryJobConfig( - dry_run=True, use_legacy_sql=False - ), + @cached_property + def schema_path(self): + """Return the schema file path.""" + return Path(self.path).parent / SCHEMA_FILE + + @cached_property + def configured_schema(self): + """Derive view schema from a schema file.""" + if self.schema_path.is_file(): + return Schema.from_schema_file(self.schema_path) + return None + + @cached_property + def dryrun_schema(self): + """Derive view schema from a dry run result.""" + try: + # We have to remove `CREATE OR REPLACE VIEW ... AS` from the query to avoid + # view-creation-permission-denied errors, and we have to apply a `WHERE` + # filter to avoid partition-column-filter-missing errors. 
+ schema_query_filter = ( + f"DATE(`{self.partition_column}`) = DATE('2020-01-01')" + if self.partition_column + else "FALSE" + ) + schema_query = dedent( + f""" + WITH view_query AS ( + {CREATE_VIEW_PATTERN.sub("", self.content)} ) - return Schema.from_bigquery_schema(query_job.schema) - except Forbidden: - print( - f"Missing permission to dry run view {self.view_identifier} to get schema" - ) - return None + SELECT * + FROM view_query + WHERE {schema_query_filter} + """ + ) + return Schema.from_query_file(Path(self.path), content=schema_query) + except Exception as e: + print(f"Error dry-running view {self.view_identifier} to get schema: {e}") + return None def _valid_fully_qualified_references(self): """Check that referenced tables and views are fully qualified.""" @@ -314,7 +339,7 @@ class View: table_schema = Schema.from_bigquery_schema(table.schema) - if self.view_schema is not None and not self.view_schema.equal(table_schema): + if self.schema is not None and not self.schema.equal(table_schema): print(f"view {target_view_id} will change: schema does not match") return True @@ -369,9 +394,8 @@ class View: raise try: - schema_path = Path(self.path).parent / "schema.yaml" - if schema_path.is_file(): - self.view_schema.deploy(target_view) + if self.schema_path.is_file(): + self.schema.deploy(target_view) except Exception as e: print(f"Could not update field descriptions for {target_view}: {e}") diff --git a/sql/moz-fx-data-shared-prod/telemetry/main_1pct/view.sql b/sql/moz-fx-data-shared-prod/telemetry/main_1pct/view.sql index 602fef23fd..c466871e53 100644 --- a/sql/moz-fx-data-shared-prod/telemetry/main_1pct/view.sql +++ b/sql/moz-fx-data-shared-prod/telemetry/main_1pct/view.sql @@ -4,4 +4,4 @@ AS SELECT * FROM - `moz-fx-data-shared-prod.telemetry.main_remainder_1pct` + `moz-fx-data-shared-prod.telemetry_derived.main_remainder_1pct_v1` diff --git a/sql_generators/derived_view_schemas/__init__.py b/sql_generators/derived_view_schemas/__init__.py index f77a3427ea..38570feac0 100644 --- a/sql_generators/derived_view_schemas/__init__.py +++ b/sql_generators/derived_view_schemas/__init__.py @@ -16,6 +16,10 @@ NON_USER_FACING_DATASET_SUBSTRINGS = ( "udf", ) +VIEW_FILE = "view.sql" +METADATA_FILE = "metadata.yaml" +SCHEMA_FILE = "schema.yaml" + def _generate_view_schema(sql_dir, view_directory): import logging @@ -24,29 +28,24 @@ def _generate_view_schema(sql_dir, view_directory): from bigquery_etl.metadata.parse_metadata import Metadata from bigquery_etl.schema import Schema from bigquery_etl.util.common import render + from bigquery_etl.view import View logging.basicConfig(format="%(levelname)s (%(filename)s:%(lineno)d) - %(message)s") - VIEW_FILE = "view.sql" - METADATA_FILE = "metadata.yaml" - SCHEMA_FILE = "schema.yaml" - # If the view references only one table, we can: - # 1. Get the reference table partition key if it exists. + # 1. Get the reference table partition column if it exists. # (to dry run views to partitioned tables). # 2. Get the reference table schema and use it to enrich the # view schema we get from dry-running. 
- def _get_reference_dir_path(view_dir): - view_file = view_dir / VIEW_FILE - if not view_file.exists(): - return - - view_references = extract_table_references(render(view_file.name, view_dir)) + def _get_reference_dir_path(view_file): + view_references = extract_table_references( + render(view_file.name, view_file.parent) + ) if len(view_references) != 1: return - target_project = view_dir.parent.parent.name - target_dataset = view_dir.parent.name + target_project = view_file.parent.parent.parent.name + target_dataset = view_file.parent.parent.name target_reference = view_references[0] parts = target_reference.split(".") @@ -67,9 +66,9 @@ def _generate_view_schema(sql_dir, view_directory): sql_dir / reference_project_id / reference_dataset_id / reference_table_id ) - def _get_reference_partition_key(ref_path): + def _get_reference_partition_column(ref_path): if ref_path is None: - logging.debug("No table reference, skipping partition key.") + logging.debug("No table reference, skipping partition column.") return try: @@ -81,20 +80,24 @@ def _generate_view_schema(sql_dir, view_directory): bigquery_metadata = reference_metadata.bigquery if bigquery_metadata is None: logging.warning( - f"No bigquery metadata at {ref_path}, unable to get partition key." + f"No bigquery metadata at {ref_path}, unable to get partition column." ) return partition_metadata = bigquery_metadata.time_partitioning if partition_metadata is None: logging.warning( - f"No partition metadata at {ref_path}, unable to get partition key." + f"No partition metadata at {ref_path}, unable to get partition column." ) return return partition_metadata.field - reference_path = _get_reference_dir_path(view_directory) + view_file = view_directory / VIEW_FILE + if not view_file.exists(): + return + + reference_path = _get_reference_dir_path(view_file) # If this is a view to a stable table, don't try to write the schema: if reference_path is not None: @@ -102,34 +105,51 @@ def _generate_view_schema(sql_dir, view_directory): if reference_dataset.endswith("_stable"): return - # Optionally get the upstream partition key - reference_partition_key = _get_reference_partition_key(reference_path) - if reference_partition_key is None: - logging.debug("No reference partition key, dry running without one.") + # Optionally get the upstream partition column + reference_partition_column = _get_reference_partition_column(reference_path) + if reference_partition_column is None: + logging.debug("No reference partition column, dry running without one.") - project_id = view_directory.parent.parent.name - dataset_id = view_directory.parent.name - view_id = view_directory.name + view = View.from_file(view_file, partition_column=reference_partition_column) - schema = Schema.for_table( - project_id, dataset_id, view_id, partitioned_by=reference_partition_key - ) - if len(schema.schema.get("fields")) == 0: + # `View.schema` prioritizes the configured schema over the dryrun schema, but here + # we prioritize the dryrun schema because the `schema.yaml` file might be out of date. 
+ schema = view.dryrun_schema or view.configured_schema + if view.dryrun_schema and view.configured_schema: + try: + schema.merge( + view.configured_schema, + attributes=["description"], + add_missing_fields=False, + ignore_missing_fields=True, + ) + except Exception as e: + logging.warning( + f"Error enriching {view.view_identifier} view schema from {view.schema_path}: {e}" + ) + if not schema: logging.warning( - f"Got empty schema for {project_id}.{dataset_id}.{view_id} potentially " + f"Couldn't get schema for {view.view_identifier} potentially " f"due to dry-run error. Won't write yaml." ) return # Optionally enrich the view schema if we have a valid table reference if reference_path: - try: - reference_schema = Schema.from_schema_file(reference_path / SCHEMA_FILE) - schema.merge(reference_schema, add_missing_fields=False) - except Exception as e: - logging.info( - f"Unable to open reference schema; unable to enrich schema: {e}" - ) + reference_schema_file = reference_path / SCHEMA_FILE + if reference_schema_file.exists(): + try: + reference_schema = Schema.from_schema_file(reference_schema_file) + schema.merge( + reference_schema, + attributes=["description"], + add_missing_fields=False, + ignore_missing_fields=True, + ) + except Exception as e: + logging.warning( + f"Error enriching {view.view_identifier} view schema from {reference_schema_file}: {e}" + ) schema.to_yaml_file(view_directory / SCHEMA_FILE) @@ -178,7 +198,11 @@ def generate(target_project, output_dir, parallelism, use_cloud_function): ] for dataset_path in dataset_paths: - view_directories = [path for path in dataset_path.iterdir() if path.is_dir()] + view_directories = [ + path + for path in dataset_path.iterdir() + if path.is_dir() and (path / VIEW_FILE).exists() + ] with ProcessingPool(parallelism) as pool: pool.map( diff --git a/tests/view/test_view.py b/tests/view/test_view.py index 8440c09b1e..6e5a653ea5 100644 --- a/tests/view/test_view.py +++ b/tests/view/test_view.py @@ -146,13 +146,16 @@ class TestView: assert mock_bigquery_table().friendly_name == "Test metadata file" assert mock_bigquery_table().description == "Test description" + @patch("bigquery_etl.dryrun.DryRun") @patch("google.cloud.bigquery.Client") - def test_view_has_changes_no_changes(self, mock_client, simple_view): + def test_view_has_changes_no_changes(self, mock_client, mock_dryrun, simple_view): deployed_view = Mock() deployed_view.view_query = CREATE_VIEW_PATTERN.sub("", simple_view.content) deployed_view.schema = [SchemaField("a", "INT")] mock_client.return_value.get_table.return_value = deployed_view - mock_client.return_value.query.return_value.schema = [SchemaField("a", "INT")] + mock_dryrun.return_value.get_schema.return_value = { + "fields": [{"name": "a", "type": "INT"}] + } assert not simple_view.has_changes() @@ -194,16 +197,18 @@ class TestView: assert metadata_view.has_changes() assert "friendly_name" in capsys.readouterr().out + @patch("bigquery_etl.dryrun.DryRun") @patch("google.cloud.bigquery.Client") - def test_view_has_changes_changed_schema(self, mock_client, simple_view, capsys): + def test_view_has_changes_changed_schema( + self, mock_client, mock_dryrun, simple_view, capsys + ): deployed_view = Mock() deployed_view.view_query = CREATE_VIEW_PATTERN.sub("", simple_view.content) deployed_view.schema = [SchemaField("a", "INT")] mock_client.return_value.get_table.return_value = deployed_view - mock_client.return_value.query.return_value.schema = [ - SchemaField("a", "INT"), - SchemaField("b", "INT"), - ] + 
mock_dryrun.return_value.get_schema.return_value = { +            "fields": [{"name": "a", "type": "INT"}, {"name": "b", "type": "INT"}] +        } assert simple_view.has_changes() assert "schema" in capsys.readouterr().out
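Note (illustrative, not part of the patch): the tuple-valued sort key added to `generate_all` above orders the generator commands as `stable_views` first, then `glean_usage`, then all remaining generators alphabetically, with `derived_view_schemas` last. A minimal standalone sketch of that ordering, using plain strings and an assumed placeholder generator name (`example_generator`) in place of the real click command objects:

    def generator_command_sort_key(name):
        # Mirrors the match/case in bigquery_etl/cli/generate.py: lower tuples
        # sort first, and ties are broken alphabetically by command name.
        match name:
            case "stable_views":
                return (1, name)
            case "glean_usage":
                return (2, name)
            case "derived_view_schemas":
                return (4, name)
            case _:
                return (3, name)

    commands = ["glean_usage", "derived_view_schemas", "example_generator", "stable_views"]
    print(sorted(commands, key=generator_command_sort_key))
    # ['stable_views', 'glean_usage', 'example_generator', 'derived_view_schemas']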
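Note (illustrative, not part of the patch): `View.dryrun_schema` strips the `CREATE OR REPLACE VIEW ... AS` prefix from the view SQL, wraps what remains in a CTE, and appends a `WHERE` clause (a partition-column filter when one is known, otherwise `FALSE`) so the dry run satisfies partition-filter requirements without scanning data. A rough sketch of how that query string is assembled, using a hypothetical view body and partition column:

    from textwrap import dedent

    # Hypothetical inputs: a view body with the CREATE statement already stripped,
    # and a partition column detected from the referenced table's metadata.
    view_body = "SELECT * FROM `project.dataset_derived.example_table_v1`"
    partition_column = "submission_date"

    schema_query_filter = (
        f"DATE(`{partition_column}`) = DATE('2020-01-01')"
        if partition_column
        else "FALSE"
    )
    schema_query = dedent(
        f"""
        WITH view_query AS (
            {view_body}
        )
        SELECT *
        FROM view_query
        WHERE {schema_query_filter}
        """
    )
    print(schema_query)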