Add `bqetl view clean` command (#3433)

to remove managed views from BigQuery when they are removed from sql dir
This commit is contained in:
Daniel Thorn 2022-12-08 14:48:03 -08:00 коммит произвёл GitHub
Родитель c807de71c8
Коммит f1699b21ed
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 181 добавлений и 31 удалений

Просмотреть файл

@ -3,6 +3,7 @@ import logging
import re
import string
import sys
from fnmatch import fnmatchcase
from graphlib import TopologicalSorter
from multiprocessing.pool import Pool, ThreadPool
from traceback import print_exc
@ -18,7 +19,9 @@ from ..cli.utils import (
use_cloud_function_option,
)
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..view import View, broken_views
from ..util.bigquery_id import sql_table_id
from ..util.client_queue import ClientQueue
from ..view import NON_USER_FACING_DATASET_SUFFIXES, View, broken_views
from .dryrun import dryrun
# Matches "<dataset>.<name>"; each part is limited to ASCII letters, digits and
# underscores. The upper-case range must be "A-Z": "A-z" would also admit the
# punctuation characters that sit between "Z" and "a" in ASCII ([ \ ] ^ `).
VIEW_NAME_RE = re.compile(r"(?P<dataset>[a-zA-Z0-9_]+)\.(?P<name>[a-zA-Z0-9_]+)")
@ -185,6 +188,15 @@ def _view_is_valid(view):
is_flag=True,
help="Publish views even if there are no changes to the view query",
)
@click.option(
"--add-managed-label",
"--add_managed_label",
is_flag=True,
help=(
'Add a label "managed" to views, that can be used to remove views from BigQuery'
" when they are removed from --sql-dir."
),
)
def publish(
name,
sql_dir,
@ -196,32 +208,19 @@ def publish(
user_facing_only,
skip_authorized,
force,
add_managed_label,
):
"""Publish views."""
# set log level
try:
logging.basicConfig(level=log_level, format="%(levelname)s %(message)s")
except ValueError as e:
click.error(f"argument --log-level: {e}")
raise click.ClickException(f"argument --log-level: {e}")
view_files = paths_matching_name_pattern(
name, sql_dir, project_id, files=("view.sql",)
)
views = [View.from_file(f) for f in view_files]
if user_facing_only:
views = [v for v in views if v.is_user_facing]
if skip_authorized:
views = [
v
for v in views
if not (
v.metadata
and v.metadata.labels
# labels with boolean true are translated to ""
and v.metadata.labels.get("authorized") == ""
)
]
views = _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized)
if add_managed_label:
for view in views:
view.labels["managed"] = ""
if not force:
# only views with changes
with ThreadPool(parallelism) as p:
@ -252,6 +251,148 @@ def publish(
click.echo("All have been published.")
def _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized):
    """Load the views found under the sql dir and apply the optional filters.

    Every ``view.sql`` path returned by ``paths_matching_name_pattern`` is
    turned into a ``View`` via ``View.from_file``. With ``user_facing_only``
    only views whose ``is_user_facing`` is truthy are kept; with
    ``skip_authorized`` views whose metadata has the ``authorized`` label are
    dropped.
    """

    def _has_authorized_label(view):
        # labels with boolean true are translated to ""
        return (
            view.metadata
            and view.metadata.labels
            and view.metadata.labels.get("authorized") == ""
        )

    matching_paths = paths_matching_name_pattern(
        name, sql_dir, project_id, files=("view.sql",)
    )
    candidates = list(map(View.from_file, matching_paths))

    if user_facing_only:
        candidates = [view for view in candidates if view.is_user_facing]

    if not skip_authorized:
        return candidates

    return [view for view in candidates if not _has_authorized_label(view)]
@view.command(
    help="""Remove managed views that are not present in the sql dir.
    Examples:
    # Clean managed views in shared prod
    ./bqetl view clean --target-project=moz-fx-data-shared-prod --skip-authorized
    # Clean managed user facing views in mozdata
    ./bqetl view clean --target-project=mozdata --user-facing-only --skip-authorized
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option(default=None)
@click.option(
    "--target-project",
    help=(
        "If specified, clean views in the target project rather than"
        " the project specified in the file. Only views for "
        " moz-fx-data-shared-prod will be included if this is set."
    ),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option
@click.option(
    "--dry_run",
    "--dry-run",
    is_flag=True,
    help="Identify views to delete, but do not delete them.",
)
@click.option(
    "--user-facing-only",
    "--user_facing_only",
    is_flag=True,
    help=(
        "Remove user-facing views only. User-facing views are views"
        " part of datasets without suffixes (such as telemetry,"
        " but not telemetry_derived)."
    ),
)
@click.option(
    "--skip-authorized",
    "--skip_authorized",
    is_flag=True,
    # This command deletes views; the wording previously said "publish".
    help="Don't remove views with labels: {authorized: true}",
)
def clean(
    name,
    sql_dir,
    project_id,
    target_project,
    log_level,
    parallelism,
    dry_run,
    user_facing_only,
    skip_authorized,
):
    """Clean managed views.

    Compares the views defined under ``sql_dir`` with the views in BigQuery
    that carry the ``managed`` label, and deletes the deployed ones that have
    no definition left. With ``dry_run`` the views are only reported.

    Raises ``click.ClickException`` when ``log_level`` is rejected by
    ``logging.basicConfig`` or when neither ``project_id`` nor
    ``target_project`` is given.
    """
    # set log level
    try:
        logging.basicConfig(level=log_level, format="%(levelname)s %(message)s")
    except ValueError as e:
        raise click.ClickException(f"argument --log-level: {e}")

    if project_id is None and target_project is None:
        raise click.ClickException("command requires --project-id or --target-project")

    # Identifiers of every view that still has a definition in the sql dir.
    expected_view_ids = {
        view.target_view_identifier(target_project)
        for view in _collect_views(
            name, sql_dir, project_id, user_facing_only, skip_authorized
        )
    }

    client_q = ClientQueue([project_id], parallelism)
    with client_q.client() as client:
        # With --user-facing-only, datasets whose id ends in one of the
        # non-user-facing suffixes are left out entirely.
        datasets = [
            dataset
            for dataset in client.list_datasets(target_project)
            if not user_facing_only
            or not dataset.dataset_id.endswith(NON_USER_FACING_DATASET_SUFFIXES)
        ]

    with ThreadPool(parallelism) as p:
        # List the managed views of each dataset in parallel and flatten the
        # per-dataset results into one set of ids.
        managed_view_ids = {
            sql_table_id(view)
            for views in p.starmap(
                client_q.with_client,
                ((_list_managed_views, dataset, name) for dataset in datasets),
                chunksize=1,
            )
            for view in views
            if not skip_authorized or "authorized" not in view.labels
        }

        # Managed in BigQuery but no longer defined locally; sorted so the
        # work is submitted in a stable order.
        remove_view_ids = sorted(managed_view_ids - expected_view_ids)
        p.starmap(
            client_q.with_client,
            ((_remove_view, view_id, dry_run) for view_id in remove_view_ids),
            chunksize=1,
        )
def _list_managed_views(client, dataset, pattern):
return [
table
for table in client.list_tables(dataset)
if table.table_type == "VIEW"
and "managed" in table.labels
and (pattern is None or fnmatchcase(sql_table_id(table), f"*{pattern}"))
]
def _remove_view(client, view_id, dry_run):
    """Delete ``view_id`` through ``client``, or only announce it on a dry run."""
    if not dry_run:
        click.echo(f"Deleting {view_id}")
        client.delete_table(view_id)
        return
    click.echo(f"Would delete {view_id}")
@view.command(
help="""List broken views.
Examples:

Просмотреть файл

@ -114,6 +114,16 @@ class View:
return None
return Metadata.from_file(path)
@property
def labels(self):
    """Return the view labels.

    The labels are taken from the view metadata the first time the property
    is read and stored on the instance as ``_labels``; later reads return that
    same dict, so changes made to it by callers are kept. Views without
    metadata, or whose metadata has no labels, get an empty dict.
    """
    if not hasattr(self, "_labels"):
        # Read the metadata once and reuse it for both the check and the copy.
        metadata = self.metadata
        # Copy so that editing the view labels never mutates the metadata
        # object; fall back to {} when labels is missing or empty.
        self._labels = dict(metadata.labels or {}) if metadata else {}
    return self._labels
@classmethod
def create(cls, project, dataset, name, sql_dir, base_table=None):
"""
@ -266,7 +276,7 @@ class View:
print(f"view {target_view_id} will change: schema does not match")
return True
if self.metadata and self.metadata.labels != table.labels:
if self.labels != table.labels:
print(f"view {target_view_id} will change: labels do not match")
return True
return False
@ -325,16 +335,15 @@ class View:
except Exception as e:
print(f"Could not update field descriptions for {target_view}: {e}")
if self.metadata:
table = client.get_table(target_view)
if table.labels != self.metadata.labels:
labels = self.metadata.labels.copy()
for key in table.labels:
if key not in labels:
# To delete a label its value must be set to None
labels[key] = None
table.labels = labels
client.update_table(table, ["labels"])
table = client.get_table(target_view)
if table.labels != self.labels:
labels = self.labels.copy()
for key in table.labels:
if key not in labels:
# To delete a label its value must be set to None
labels[key] = None
table.labels = labels
client.update_table(table, ["labels"])
print(f"Published view {target_view}")
else: