Skip views to stable tables and UDFs; Dynamically extract view refs (#2663)

This commit is contained in:
Alexander Nicholson 2022-01-17 13:38:02 -05:00 коммит произвёл GitHub
Родитель 9677405044
Коммит a837866bc5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
1 изменённых файлов: 18 добавлений и 14 удалений

Просмотреть файл

@ -6,17 +6,19 @@ from pathlib import Path
import click
from pathos.multiprocessing import ProcessingPool
NON_USER_FACING_DATASET_SUFFIXES = (
NON_USER_FACING_DATASET_SUBSTRINGS = (
"_derived",
"_external",
"_bi",
"_restricted",
"udf",
)
def _generate_view_schema(sql_dir, view_directory):
import logging
from bigquery_etl.dependency import extract_table_references
from bigquery_etl.metadata.parse_metadata import Metadata
from bigquery_etl.schema import Schema
@ -31,24 +33,19 @@ def _generate_view_schema(sql_dir, view_directory):
# (to dry run views to partitioned tables).
# 2. Get the reference table schema and use it to enrich the
# view schema we get from dry-running.
# Note this works because dependencies (references) are recorded
# as a part of sql-generation.
def _get_reference_dir_path(view_dir):
try:
metadata = Metadata.from_file(view_dir / METADATA_FILE)
except Exception as metadata_exception:
logging.warning(f"Unable to get view metadata: {metadata_exception}")
view_file = view_dir / VIEW_FILE
if not view_file.exists():
return
view_references = metadata.references.get(VIEW_FILE, [])
if len(view_references) == 1:
target_reference = view_references[0]
else:
view_references = extract_table_references(view_file.read_text())
if len(view_references) != 1:
return
target_project = view_dir.parent.parent.name
target_dataset = view_dir.parent.name
target_reference = view_references[0]
parts = target_reference.split(".")
if len(parts) == 3:
reference_project_id, reference_dataset_id, reference_table_id = parts
@ -96,6 +93,12 @@ def _generate_view_schema(sql_dir, view_directory):
reference_path = _get_reference_dir_path(view_directory)
# If this is a view to a stable table, don't try to write the schema:
if reference_path is not None:
reference_dataset = reference_path.parent.name
if reference_dataset.endswith("_stable"):
return
# Optionally get the upstream partition key
reference_partition_key = _get_reference_partition_key(reference_path)
if reference_partition_key is None:
@ -110,7 +113,8 @@ def _generate_view_schema(sql_dir, view_directory):
)
if len(schema.schema.get("fields")) == 0:
logging.warning(
"Got empty schema potentially due to dry-run error. Won't write yaml."
f"Got empty schema for {project_id}.{dataset_id}.{view_id} potentially "
f"due to dry-run error. Won't write yaml."
)
return
@ -164,8 +168,8 @@ def generate(target_project, output_dir, parallelism):
for dataset_path in project_path.iterdir()
if dataset_path.is_dir()
and all(
suffix not in str(dataset_path)
for suffix in NON_USER_FACING_DATASET_SUFFIXES
substring not in str(dataset_path)
for substring in NON_USER_FACING_DATASET_SUBSTRINGS
)
]