Publish views serially and default to only publishing changes (#3424)

This commit is contained in:
Daniel Thorn 2022-12-08 10:17:01 -08:00 коммит произвёл GitHub
Родитель 1707dffd20
Коммит cf6ce002d3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 99 добавлений и 88 удалений

Просмотреть файл

@ -1,10 +1,11 @@
"""bigquery-etl CLI view command."""
import functools
import logging
import re
import string
import sys
from graphlib import TopologicalSorter
from multiprocessing.pool import Pool, ThreadPool
from traceback import print_exc
import click
@ -17,7 +18,6 @@ from ..cli.utils import (
use_cloud_function_option,
)
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..util.string_dag import StringDag
from ..view import View, broken_views
from .dryrun import dryrun
@ -180,6 +180,11 @@ def _view_is_valid(view):
is_flag=True,
help="Don't publish views with labels: {authorized: true} in metadata.yaml",
)
@click.option(
"--force",
is_flag=True,
help="Publish views even if there are no changes to the view query",
)
def publish(
name,
sql_dir,
@ -190,6 +195,7 @@ def publish(
dry_run,
user_facing_only,
skip_authorized,
force,
):
"""Publish views."""
# set log level
@ -216,23 +222,29 @@ def publish(
and v.metadata.labels.get("authorized") == ""
)
]
if not force:
# only views with changes
with ThreadPool(parallelism) as p:
changes = p.map(lambda v: v.has_changes(target_project), views, chunksize=1)
views = [v for v, has_changes in zip(views, changes) if has_changes]
views_by_id = {v.view_identifier: v for v in views}
dag = StringDag(
dependencies={
view.view_identifier: {
ref for ref in view.table_references if ref in views_by_id
}
for view in views
view_id_graph = {
view.view_identifier: {
ref for ref in view.table_references if ref in views_by_id
}
)
dag.validate()
for view in views
}
with ThreadPool(parallelism) as p:
publish_view = functools.partial(
_publish_view, target_project, dry_run, views_by_id
)
result = p.map(publish_view, [dag for _ in views], chunksize=1)
view_id_order = TopologicalSorter(view_id_graph).static_order()
result = []
for view_id in view_id_order:
try:
result.append(views_by_id[view_id].publish(target_project, dry_run))
except Exception:
print_exc()
result.append(False)
if not all(result):
sys.exit(1)
@ -240,12 +252,6 @@ def publish(
click.echo("All have been published.")
def _publish_view(target_project, dry_run, views_by_id, dag):
with dag.get() as view_id:
view = views_by_id[view_id]
return view.publish(target_project, dry_run)
@view.command(
help="""List broken views.
Examples:

Просмотреть файл

@ -1,53 +0,0 @@
"""String Dag."""
from contextlib import contextmanager
from copy import deepcopy
from queue import Empty, Queue
from threading import Lock
from typing import Dict, Set
class StringDag:
"""Abstraction for using a queue to iterate in parallel over a set of values.
All dependencies must be acked before a given value is added to the queue. Values in
the queue may be processed in parallel in any order.
"""
def __init__(self, dependencies: Dict[str, Set[str]]):
"""Initialize."""
self.dependencies = deepcopy(dependencies)
self.lock: Lock = Lock()
self.q: Queue = Queue()
self.ready: Set[str] = set()
# ack a nonexisting value to initialize queue
self._ack(None)
def _ack(self, ack_value):
with self.lock:
for value, deps in self.dependencies.items():
deps.discard(ack_value)
if not deps and value not in self.ready:
self.ready.add(value)
self.q.put(value)
@contextmanager
def get(self):
"""Context manager for getting a ready value from the queue."""
value = self.q.get()
try:
yield value
finally:
self._ack(value)
def validate(self):
"""Validate all dependencies eventually resolve."""
_dag = StringDag(self.dependencies)
for _ in range(len(self.dependencies)):
try:
value = _dag.q.get_nowait()
_dag._ack(value)
except Empty:
raise ValueError(
"DAG not valid, some values will never be ready:"
f" {_dag.dependencies.keys() - _dag.ready}"
)

Просмотреть файл

@ -1,5 +1,6 @@
"""Represents a SQL view."""
import re
import string
import time
from functools import cached_property
@ -7,7 +8,7 @@ from pathlib import Path
import attr
import sqlparse
from google.api_core.exceptions import BadRequest
from google.api_core.exceptions import BadRequest, NotFound
from google.cloud import bigquery
from bigquery_etl.format_sql.formatter import reformat
@ -73,6 +74,11 @@ NON_USER_FACING_DATASET_SUFFIXES = (
"glam_etl",
)
# Regex matching CREATE VIEW statement so it can be removed to get the view query
CREATE_VIEW_PATTERN = re.compile(
r"CREATE\s+OR\s+REPLACE\s+VIEW\s+[^\s]+\s+AS", re.IGNORECASE
)
@attr.s(auto_attribs=True)
class View:
@ -216,6 +222,58 @@ class View:
return False
return True
def target_view_identifier(self, target_project=None):
"""Return the view identifier after replacing project with target_project.
Result must be a fully-qualified BigQuery Standard SQL table identifier, which
is of the form f"{project_id}.{dataset_id}.{table_id}". dataset_id and table_id
may not contain "." or "`". Each component may be a backtick (`) quoted
identifier, or the whole thing may be a backtick quoted identifier, but not
both. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some
project IDs also include domain name separated by a colon. IDs must start with a
letter and may not end with a dash. For more information see also
https://github.com/mozilla/bigquery-etl/pull/1427#issuecomment-707376291
"""
if target_project:
return self.view_identifier.replace(self.project, target_project, 1)
return self.view_identifier
def has_changes(self, target_project=None):
"""Determine whether there are any changes that would be published."""
client = bigquery.Client()
target_view_id = self.target_view_identifier(target_project)
try:
table = client.get_table(target_view_id)
except NotFound:
print(f"view {target_view_id} will change: does not exist in BigQuery")
return True
expected_view_query = CREATE_VIEW_PATTERN.sub(
"", sqlparse.format(self.content, strip_comments=True), count=1
).strip()
actual_view_query = sqlparse.format(
table.view_query, strip_comments=True
).strip()
if expected_view_query != actual_view_query:
print(f"view {target_view_id} will change: query does not match")
return True
# check schema
schema_file = Path(self.path).parent / "schema.yaml"
if schema_file.is_file():
view_schema = Schema.from_schema_file(schema_file)
table_schema = Schema.from_json(
{"fields": [f.to_api_repr() for f in table.schema]}
)
if not view_schema.equal(table_schema):
print(f"view {target_view_id} will change: schema does not match")
return True
if self.metadata and self.metadata.labels != table.labels:
print(f"view {target_view_id} will change: labels do not match")
return True
return False
def publish(self, target_project=None, dry_run=False):
"""
Publish this view to BigQuery.
@ -234,25 +292,13 @@ class View:
):
client = bigquery.Client()
sql = self.content
target_view = self.view_identifier
target_view = self.target_view_identifier(target_project)
if target_project:
if self.project != "moz-fx-data-shared-prod":
print(f"Skipping {self.path} because --target-project is set")
return True
# target_view must be a fully-qualified BigQuery Standard SQL table
# identifier, which is of the form f"{project_id}.{dataset_id}.{table_id}".
# dataset_id and table_id may not contain "." or "`". Each component may be
# a backtick (`) quoted identifier, or the whole thing may be a backtick
# quoted identifier, but not both.
# Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some
# project IDs also include domain name separated by a colon. IDs must start
# with a letter and may not end with a dash. For more information see also
# https://github.com/mozilla/bigquery-etl/pull/1427#issuecomment-707376291
target_view = self.view_identifier.replace(
self.project, target_project, 1
)
# We only change the first occurrence, which is in the target view name.
sql = sql.replace(self.project, target_project, 1)
@ -281,6 +327,18 @@ class View:
view_schema.deploy(target_view)
except Exception as e:
print(f"Could not update field descriptions for {target_view}: {e}")
if self.metadata:
table = client.get_table(self.view_identifier)
if table.labels != self.metadata.labels:
labels = self.metadata.labels.copy()
for key in table.labels:
if key not in labels:
# To delete a label its value must be set to None
labels[key] = None
table.labels = labels
client.update_table(table, ["labels"])
print(f"Published view {target_view}")
else:
print(f"Error publishing {self.path}. Invalid view definition.")