Deploy materialized views if they do not exist (#6540)

This commit is contained in:
Ben Wu 2024-11-21 17:31:46 +00:00 коммит произвёл GitHub
Родитель 60fea351c6
Коммит 11d854427c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
2 изменённых файлов: 81 добавлений и 13 удалений

Просмотреть файл

@ -1426,8 +1426,8 @@ def _initialize_in_parallel(
@click.option(
"--skip-existing",
"--skip_existing",
help="Skip initialization for existing artifacts. "
+ "This ensures that artifacts, like materialized views only get initialized if they don't already exist.",
help="Skip initialization for existing artifacts, "
"otherwise initialization is run for empty tables.",
default=False,
is_flag=True,
)
@ -1459,7 +1459,7 @@ def initialize(
else:
file_regex = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|init\.sql)$"
r"(?:query\.sql|init\.sql|materialized_view\.sql)$"
)
query_files = paths_matching_name_pattern(
name, sql_dir, project_id, file_regex=file_regex
@ -1501,7 +1501,7 @@ def initialize(
except NotFound:
# continue with creating the table
pass
else:
elif len(materialized_views) == 0:
return
try:
@ -1576,17 +1576,30 @@ def initialize(
default_dataset=f"{project}.{dataset}",
)
if file in materialized_views:
click.echo(f"Create materialized view for {file}")
# existing materialized view have to be deleted before re-creation
client.delete_table(full_table_id, not_found_ok=True)
else:
click.echo(f"Create destination table for {file}")
# only deploy materialized view if it doesn't exist
# TODO: https://github.com/mozilla/bigquery-etl/issues/5804
try:
materialized_view_table = client.get_table(full_table_id)
job = client.query(init_sql, job_config=job_config)
# Best-effort check, don't fail if there's an error
try:
has_changes = materialized_view_has_changes(
materialized_view_table.mview_query, init_sql
)
except Exception as e:
change_str = f"failed to compare changes: {e}"
else:
change_str = (
"sql changed" if has_changes else "sql not changed"
)
click.echo(
f"Skipping materialized view {full_table_id}, already exists, {change_str}"
)
except NotFound:
job = client.query(init_sql, job_config=job_config)
if not dry_run:
job.result()
if not dry_run:
job.result()
except Exception:
print_exc()
return query_file
@ -1601,6 +1614,27 @@ def initialize(
sys.exit(1)
def materialized_view_has_changes(deployed_sql: str, file_sql: str) -> bool:
"""Return true if the sql in the materialized view file doesn't match the deployed sql."""
file_sql_formatted = sqlparse.format(
re.sub(
r"CREATE+(?:\s+OR\s+REPLACE)?\s+MATERIALIZED\s+VIEW.*?AS",
"",
file_sql,
flags=re.DOTALL,
),
strip_comments=True,
strip_whitespace=True,
)
deployed_sql_formatted = sqlparse.format(
deployed_sql,
strip_comments=True,
strip_whitespace=True,
)
return file_sql_formatted != deployed_sql_formatted
@query.command(
help="""Render a query Jinja template.

Просмотреть файл

@ -12,6 +12,7 @@ from bigquery_etl.cli.query import (
create,
deploy,
info,
materialized_view_has_changes,
paths_matching_name_pattern,
schedule,
)
@ -918,3 +919,36 @@ class TestQuery:
assert result.exit_code == 0
mock_deploy_table.assert_not_called()
def test_materialized_view_has_changes(self):
deployed_sql = " SELECT * FROM project.dataset.table "
assert not materialized_view_has_changes(
deployed_sql,
"""
CREATE MATERIALIZED VIEW project.dataset.view AS SELECT * FROM
project.dataset.table
""",
)
assert not materialized_view_has_changes(
deployed_sql,
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS project.dataset.view AS SELECT * FROM
project.dataset.table
""",
)
assert materialized_view_has_changes(
deployed_sql,
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS project.dataset.view AS SELECT * FROM
project.dataset.table WHERE TRUE
""",
)
assert not materialized_view_has_changes(
deployed_sql,
"""
CREATE OR REPLACE MATERIALIZED VIEW
project.dataset.view AS SELECT * FROM
project.dataset.table
""",
)