Revert "Restrict derived view schema generation to views with upstream schema…" (#4941)
This reverts commit f5ee129b63
.
This commit is contained in:
Родитель
e7c7651935
Коммит
f36e75ab2b
|
@@ -379,22 +379,3 @@ class View:
             return False

         return True

-    @property
-    def is_default_view(self) -> bool:
-        """Determine whether view just SELECTS * FROM its (one) upstream reference."""
-        if len(self.table_references) != 1:
-            return False
-
-        default_view_content = reformat(
-            f"""
-            CREATE OR REPLACE VIEW `{self.project}.{self.dataset}.{self.name}` AS
-            SELECT * FROM `{self.table_references[0]}`
-            """
-        )
-
-        formatted_content = sqlparse.format(self.content, strip_comments=True).strip(
-            ";" + string.whitespace
-        )
-
-        return formatted_content == default_view_content
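For context on what is being reverted: the removed is_default_view property decided whether a view is just SELECT * FROM its single upstream reference by normalizing both the expected statement and the view's actual SQL before comparing them. Below is a minimal standalone sketch of that comparison; it collapses whitespace by hand instead of calling bigquery-etl's reformat helper, so the normalization is only an approximation of the deleted code:

    import string

    import sqlparse


    def normalize(sql: str) -> str:
        """Strip comments, trailing semicolons, and formatting differences."""
        stripped = sqlparse.format(sql, strip_comments=True)
        return " ".join(stripped.split()).strip(";" + string.whitespace)


    def is_default_view(project, dataset, name, content, table_references):
        # Only single-reference views can be "default" views.
        if len(table_references) != 1:
            return False
        expected = (
            f"CREATE OR REPLACE VIEW `{project}.{dataset}.{name}` AS "
            f"SELECT * FROM `{table_references[0]}`"
        )
        return normalize(content) == normalize(expected)


    # Mirrors the deleted test fixture further down (test/default/view.sql):
    print(
        is_default_view(
            "moz-fx-data-test-project",
            "test",
            "default",
            "-- Test comment\nCREATE OR REPLACE VIEW\n"
            "  `moz-fx-data-test-project.test.default`\nAS\nSELECT\n  *\nFROM\n"
            "  `moz-fx-data-test-project.test_derived.default_v1`",
            ["moz-fx-data-test-project.test_derived.default_v1"],
        )
    )  # True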
@@ -2,20 +2,28 @@

 from functools import partial
 from pathlib import Path
-from typing import Optional

 import click
 from pathos.multiprocessing import ProcessingPool

 from bigquery_etl.cli.utils import use_cloud_function_option

+NON_USER_FACING_DATASET_SUBSTRINGS = (
+    "_derived",
+    "_external",
+    "_bi",
+    "_restricted",
+    "udf",
+)
+

-def _generate_view_schema(sql_dir: Path, view_directory: Path) -> None:
+def _generate_view_schema(sql_dir, view_directory):
     import logging

+    from bigquery_etl.dependency import extract_table_references
     from bigquery_etl.metadata.parse_metadata import Metadata
     from bigquery_etl.schema import Schema
-    from bigquery_etl.view import View
+    from bigquery_etl.util.common import render

     logging.basicConfig(format="%(levelname)s (%(filename)s:%(lineno)d) - %(message)s")
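Two shape notes on this hunk: the restored NON_USER_FACING_DATASET_SUBSTRINGS tuple is consumed by the dataset filter in the generate hunks further down, and _generate_view_schema keeps its bigquery-etl imports inside the function body, plausibly so the module stays cheap to import when the function is shipped to pathos worker processes (the commit does not state a reason). A self-contained sketch of that dispatch pattern with a stand-in worker; the worker name and paths here are illustrative, not from the commit:

    from functools import partial
    from pathlib import Path

    from pathos.multiprocessing import ProcessingPool


    def _stand_in_worker(sql_dir, view_directory):
        # In the real generator, the heavy bigquery_etl imports happen here,
        # inside each worker process, rather than at module import time.
        return f"{sql_dir}/{view_directory.name}"


    if __name__ == "__main__":
        view_directories = [Path("sql/proj/ds_a/view_x"), Path("sql/proj/ds_b/view_y")]
        with ProcessingPool(2) as pool:
            # partial() pre-binds the shared sql_dir argument, so map() only
            # varies the view directory -- the same shape as the generate command.
            print(pool.map(partial(_stand_in_worker, Path("sql")), view_directories))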
@@ -23,81 +31,107 @@ def _generate_view_schema(sql_dir: Path, view_directory: Path) -> None:
     METADATA_FILE = "metadata.yaml"
     SCHEMA_FILE = "schema.yaml"

-    if (view_schema_path := view_directory / SCHEMA_FILE).exists():
-        return None
-
-    view = View.from_file(view_directory / VIEW_FILE)
-
-    # View schemas are only used for descriptions. If we have multiple
-    # upstream references, it's unclear which one to copy/reference.
-    if len(view.table_references) != 1:
-        return None
-
-    ref_project, ref_dataset, ref_table = view.table_references[0].split(".")
-    ref_dir = sql_dir / ref_project / ref_dataset / ref_table
-
-    # Stable view schemas are generated in the stable_view generator.
-    if ref_dataset.endswith("_stable"):
-        return None
-
-    # If there's no upstream schema, there's no descriptions to copy.
-    if not (ref_schema_path := ref_dir / SCHEMA_FILE).exists():
-        return None
-    ref_schema = Schema.from_schema_file(ref_schema_path)
-
-    if view.is_default_view:
-        ref_schema.to_yaml_file(view_schema_path)
-        return None
+    # If the view references only one table, we can:
+    # 1. Get the reference table partition key if it exists.
+    #    (to dry run views to partitioned tables).
+    # 2. Get the reference table schema and use it to enrich the
+    #    view schema we get from dry-running.
+    def _get_reference_dir_path(view_dir):
+        view_file = view_dir / VIEW_FILE
+        if not view_file.exists():
+            return
+
+        view_references = extract_table_references(render(view_file.name, view_dir))
+        if len(view_references) != 1:
+            return
+
+        target_project = view_dir.parent.parent.name
+        target_dataset = view_dir.parent.name
+
+        target_reference = view_references[0]
+        parts = target_reference.split(".")
+        if len(parts) == 3:
+            reference_project_id, reference_dataset_id, reference_table_id = parts
+        # Fully qualify the reference:
+        elif len(parts) == 2:
+            reference_project_id = target_project
+            reference_dataset_id, reference_table_id = parts
+        elif len(parts) == 1:
+            reference_project_id = target_project
+            reference_dataset_id = target_dataset
+            reference_table_id = parts[0]
+        else:
+            return
+
+        return (
+            sql_dir / reference_project_id / reference_dataset_id / reference_table_id
+        )

-    def _get_reference_partition_key(ref_path: Optional[Path]) -> Optional[str]:
+    def _get_reference_partition_key(ref_path):
         if ref_path is None:
-            return None
+            logging.debug("No table reference, skipping partition key.")
+            return

         try:
             reference_metadata = Metadata.from_file(ref_path / METADATA_FILE)
         except Exception as metadata_exception:
             logging.warning(f"Unable to get reference metadata: {metadata_exception}")
-            return None
+            return

         bigquery_metadata = reference_metadata.bigquery
-        if bigquery_metadata is None or bigquery_metadata.time_partitioning is None:
+        if bigquery_metadata is None:
             logging.warning(
                 f"No bigquery metadata at {ref_path}, unable to get partition key."
             )
-            return None
+            return
+
+        partition_metadata = bigquery_metadata.time_partitioning
+        if partition_metadata is None:
+            logging.warning(
+                f"No partition metadata at {ref_path}, unable to get partition key."
+            )
+            return

-        return bigquery_metadata.time_partitioning.field
+        return partition_metadata.field
+
+    reference_path = _get_reference_dir_path(view_directory)
+
+    # If this is a view to a stable table, don't try to write the schema:
+    if reference_path is not None:
+        reference_dataset = reference_path.parent.name
+        if reference_dataset.endswith("_stable"):
+            return

     # Optionally get the upstream partition key
-    reference_partition_key = _get_reference_partition_key(ref_dir)
+    reference_partition_key = _get_reference_partition_key(reference_path)
     if reference_partition_key is None:
         logging.debug("No reference partition key, dry running without one.")

-    view_schema = Schema.for_table(
-        view.project, view.dataset, view.name, partitioned_by=reference_partition_key
-    )
-
-    if len(view_schema.schema.get("fields")) == 0:
+    project_id = view_directory.parent.parent.name
+    dataset_id = view_directory.parent.name
+    view_id = view_directory.name
+
+    schema = Schema.for_table(
+        project_id, dataset_id, view_id, partitioned_by=reference_partition_key
+    )
+    if len(schema.schema.get("fields")) == 0:
         logging.warning(
-            f"Got empty schema for {view.path} potentially "
+            f"Got empty schema for {project_id}.{dataset_id}.{view_id} potentially "
            f"due to dry-run error. Won't write yaml."
         )
-        return None
+        return

-    # Enrich the view schema if possible:
-    try:
-        view_schema.merge(
-            ref_schema,
-            attributes=["description"],
-            ignore_missing_fields=True,
-            add_missing_fields=False,
-        )
-    except Exception as e:
-        # This is a broad exception raised upstream
-        # TODO: Update this and upstream to raise more specific exception
-        logging.warning(f"Failed to merge schemas {ref_dir} and {view.path}: {e}")
+    # Optionally enrich the view schema if we have a valid table reference
+    if reference_path:
+        try:
+            reference_schema = Schema.from_schema_file(reference_path / SCHEMA_FILE)
+            schema.merge(reference_schema, add_missing_fields=False)
+        except Exception as e:
+            logging.info(
+                f"Unable to open reference schema; unable to enrich schema: {e}"
+            )

-    view_schema.to_yaml_file(view_schema_path)
+    schema.to_yaml_file(view_directory / SCHEMA_FILE)


 @click.command("generate")
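The core of the restored _get_reference_dir_path is the qualification step: a 3-part reference (project.dataset.table) maps straight to a directory, while 2- and 1-part references inherit the view's own project and, if needed, dataset. A stripped-down sketch of just that step; the function name and example reference are illustrative, not from the commit:

    from pathlib import Path
    from typing import Optional


    def qualify_reference(
        reference: str, target_project: str, target_dataset: str, sql_dir: Path
    ) -> Optional[Path]:
        """Map a possibly partial table reference onto its sql/ directory."""
        parts = reference.split(".")
        if len(parts) == 3:
            project_id, dataset_id, table_id = parts
        elif len(parts) == 2:
            # dataset.table: inherit the view's project
            project_id = target_project
            dataset_id, table_id = parts
        elif len(parts) == 1:
            # bare table name: inherit the view's project and dataset
            project_id, dataset_id, table_id = target_project, target_dataset, parts[0]
        else:
            return None
        return sql_dir / project_id / dataset_id / table_id


    print(
        qualify_reference(
            "test_derived.default_v1", "moz-fx-data-test-project", "test", Path("sql")
        )
    )  # sql/moz-fx-data-test-project/test_derived/default_v1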
@@ -131,10 +165,20 @@ def generate(target_project, output_dir, parallelism, use_cloud_function):
     We dry-run to get the schema data and where possible we enrich the
     view schemas with underlying table descriptions.
     """
     print("Generating schemas for derived views")

     project_path = Path(f"{output_dir}/{target_project}")
-    view_files = project_path.glob("*/*/view.sql")

+    dataset_paths = [
+        dataset_path
+        for dataset_path in project_path.iterdir()
+        if dataset_path.is_dir()
+        and all(
+            substring not in str(dataset_path)
+            for substring in NON_USER_FACING_DATASET_SUBSTRINGS
+        )
+    ]
+
+    for dataset_path in dataset_paths:
+        view_directories = [path for path in dataset_path.iterdir() if path.is_dir()]

-    with ProcessingPool(parallelism) as pool:
-        pool.map(
+        with ProcessingPool(parallelism) as pool:
+            pool.map(
@@ -142,5 +186,5 @@ def generate(target_project, output_dir, parallelism, use_cloud_function):
                     _generate_view_schema,
                     Path(output_dir),
                 ),
-                [path.parent for path in view_files],
+                view_directories,
             )
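The net effect of these last two hunks: instead of globbing */*/view.sql, generate walks the dataset directories, drops any whose path contains a non-user-facing substring, and fans the remaining view directories out to the pool one dataset at a time. The filter in isolation, run over a hypothetical directory listing:

    NON_USER_FACING_DATASET_SUBSTRINGS = (
        "_derived",
        "_external",
        "_bi",
        "_restricted",
        "udf",
    )

    # Hypothetical dataset directory names under sql/<project>/:
    candidates = ["telemetry", "telemetry_derived", "search", "udf", "firefox_external"]

    user_facing = [
        name
        for name in candidates
        if all(substring not in name for substring in NON_USER_FACING_DATASET_SUBSTRINGS)
    ]
    print(user_facing)  # ['telemetry', 'search']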
@@ -1,8 +0,0 @@
---- Test comment
-CREATE OR REPLACE VIEW
-  `moz-fx-data-test-project.test.default`
-AS
-SELECT
-  *
-FROM
-  `moz-fx-data-test-project.test_derived.default_v1`
@@ -136,16 +136,3 @@ class TestView:
         )
         assert mock_bigquery_table().friendly_name == "Test metadata file"
         assert mock_bigquery_table().description == "Test description"
-
-    def test_simple_views(self):
-        view = View.from_file(
-            TEST_DIR
-            / "data"
-            / "test_sql"
-            / "moz-fx-data-test-project"
-            / "test"
-            / "default"
-            / "view.sql"
-        )
-
-        assert view.is_default_view