Add bqetl view validate command

Anna Scholtz 2021-07-22 13:43:25 -07:00
Parent 6733000e25
Commit 62c85cb36c
12 changed files with 439 additions and 151 deletions
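For orientation before the diff: the new subcommand checks view formatting, naming, and reference qualification, then dry-runs the views. A minimal sketch of driving it from Python with click's test runner, mirroring how this repo's CLI tests invoke commands (assumes a bigquery-etl checkout and GCP credentials for the dry run):

from click.testing import CliRunner

from bigquery_etl.cli.view import view

runner = CliRunner()
# equivalent to: ./bqetl view validate telemetry.clients_daily
result = runner.invoke(view, ["validate", "telemetry.clients_daily"])
print(result.exit_code, result.output)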

bigquery_etl/cli/common.py (new file)
View file

@@ -0,0 +1,146 @@
"""Utility functions used by the CLI."""
import os
import fnmatch
from fnmatch import fnmatchcase
from pathlib import Path
import click
import re
from google.cloud import bigquery
from bigquery_etl.util.common import project_dirs
QUERY_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|part1\.sql|script\.sql|query\.py|view\.sql)$"
)
def is_valid_dir(ctx, param, value):
"""Check if the parameter provided via click is an existing directory."""
if not os.path.isdir(value) or not os.path.exists(value):
raise click.BadParameter(f"Invalid directory path to {value}")
return value
def is_valid_file(ctx, param, value):
"""Check if the parameter provided via click is an existing file."""
if not os.path.isfile(value) or not os.path.exists(value):
raise click.BadParameter(f"Invalid file path to {value}")
return value
def is_authenticated(project_id=None):
"""Check if the user is authenticated to GCP and can access the project."""
client = bigquery.Client()
if project_id:
return client.project == project_id
return True
def is_valid_project(ctx, param, value):
"""Check if the provided project_id corresponds to an existing project."""
if value is None or value in [Path(p).name for p in project_dirs()]:
return value
raise click.BadParameter(f"Invalid project {value}")
def table_matches_patterns(pattern, invert, table):
"""Check if tables match pattern."""
pattern = re.compile(fnmatch.translate(pattern))
return (pattern.match(table) is not None) != invert
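A quick illustration of the invert flag, which flips the match so the same helper can drive both include and exclude filters (a sketch; this is plain fnmatch semantics):

# "telemetry*" translates to a regex matching names with that prefix
table_matches_patterns("telemetry*", False, "telemetry_derived")  # True
# invert=True turns the matcher into an exclusion filter
table_matches_patterns("telemetry*", True, "telemetry_derived")   # False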
def paths_matching_name_pattern(pattern, sql_path, project_id, files=("*.sql",)):
    """Return paths to queries matching the name pattern."""
    matching_files = []

    if pattern is None:
        pattern = "*.*"

    if os.path.isdir(pattern):
        # pattern is a directory; collect all matching files beneath it
        for root, _, _ in os.walk(pattern):
            for file in files:
                matching_files.extend(Path(root).rglob(file))
    elif os.path.isfile(pattern):
        # pattern is an explicit file path; use it directly
        matching_files.append(Path(pattern))
    else:
        sql_path = Path(sql_path)
        if project_id is not None:
            sql_path = sql_path / project_id

        all_matching_files = []
        for file in files:
            all_matching_files.extend(Path(sql_path).rglob(file))

        for query_file in all_matching_files:
            match = QUERY_FILE_RE.match(str(query_file))
            if match:
                project = match.group(1)
                dataset = match.group(2)
                table = match.group(3)
                query_name = f"{project}.{dataset}.{table}"
                if fnmatchcase(query_name, f"*{pattern}"):
                    matching_files.append(query_file)
                elif project_id and fnmatchcase(query_name, f"{project_id}.{pattern}"):
                    matching_files.append(query_file)

    if len(matching_files) == 0:
        print(f"No files matching: {pattern}")

    return matching_files
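Patterns are matched against the fully qualified project.dataset.table name reconstructed via QUERY_FILE_RE, so globs work at any level. A hedged sketch of the expected behavior, mirroring the assertions in the test file at the end of this diff (hypothetical layout with a single query file):

# given sql/moz-fx-data-shared-prod/telemetry_derived/query_v1/query.sql
paths_matching_name_pattern("telemetry_derived.*", "sql/", None)
# -> [PosixPath(".../telemetry_derived/query_v1/query.sql")]

paths_matching_name_pattern(
    "moz-fx-data-shared-prod.telemetry_derived.query_v1", "sql/", None
)
# -> the same file, matched against the fully qualified name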
sql_dir_option = click.option(
    "--sql_dir",
    help="Path to directory which contains queries.",
    type=click.Path(file_okay=False),
    default="sql",
    callback=is_valid_dir,
)

use_cloud_function_option = click.option(
    "--use_cloud_function",
    "--use-cloud-function",
    help=(
        "Use the Cloud Function for dry running SQL, if set to `True`. "
        "The Cloud Function can only access tables in shared-prod. "
        "If set to `False`, use active GCP credentials for the dry run."
    ),
    type=bool,
    default=True,
)

parallelism_option = click.option(
    "--parallelism",
    "-p",
    default=8,
    type=int,
    help="Number of threads for parallel processing",
)


def project_id_option(default=None):
    """Generate a project-id option, with optional default."""
    return click.option(
        "--project-id",
        "--project_id",
        help="GCP project ID",
        default=default,
        callback=is_valid_project,
    )


def respect_dryrun_skip_option(default=True):
    """Generate a respect_dryrun_skip option."""
    flags = {True: "--respect-dryrun-skip", False: "--ignore-dryrun-skip"}
    return click.option(
        f"{flags[True]}/{flags[False]}",
        help="Respect or ignore dry run SKIP configuration. "
        f"Default is {flags[default]}.",
        default=default,
    )
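These module-level options and the two option factories are designed to be stacked as decorators; the reworked query commands and the new view commands in this diff consume them exactly this way. A minimal sketch of a consumer (hypothetical command name):

@click.command()
@click.argument("name", required=False)
@sql_dir_option
@project_id_option()
@use_cloud_function_option
@parallelism_option
@respect_dryrun_skip_option()
def my_command(name, sql_dir, project_id, use_cloud_function, parallelism, respect_dryrun_skip):
    """Hypothetical consumer showing the parameter names click derives."""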

View file

@@ -7,7 +7,7 @@ from pathlib import Path
import click
import yaml
from ..cli.utils import is_valid_dir, is_valid_file
from ..cli.common import is_valid_dir, is_valid_file
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..query_scheduling.dag import Dag
from ..query_scheduling.dag_collection import DagCollection

View file

@@ -11,7 +11,7 @@ from typing import Set
import click
from google.cloud import bigquery
from ..cli.utils import is_authenticated
from ..cli.common import is_authenticated
from ..dryrun import SKIP, DryRun

View file

@@ -4,7 +4,7 @@ from multiprocessing.pool import ThreadPool
from pathlib import Path

import click

from ..cli.utils import (
from ..cli.common import (
    is_valid_project,
    table_matches_patterns,
)

View file

@@ -5,7 +5,7 @@ import re
import string
import sys
from datetime import date, timedelta
from fnmatch import fnmatchcase
from functools import partial
from multiprocessing.pool import Pool
from pathlib import Path
@@ -16,7 +16,15 @@ from google.cloud import bigquery
from google.cloud.exceptions import NotFound

from ..cli.format import format
from ..cli.utils import is_authenticated, is_valid_dir, is_valid_project
from ..cli.common import (
    is_authenticated,
    is_valid_project,
    sql_dir_option,
    use_cloud_function_option,
    paths_matching_name_pattern,
    project_id_option,
    respect_dryrun_skip_option,
)
from ..dependency import get_dependency_graph
from ..dryrun import SKIP, DryRun
from ..format_sql.formatter import reformat
@@ -31,80 +39,9 @@ from ..util.common import random_str
from .dryrun import dryrun

QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-Z0-9_]+)\.(?P<name>[a-zA-Z0-9_]+)")

SQL_FILE_RE = re.compile(
    r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+_v[0-9]+)/"
    r"(?:query\.sql|part1\.sql|script\.sql)$"
)
VERSION_RE = re.compile(r"_v[0-9]+")


def _queries_matching_name_pattern(pattern, sql_path, project_id):
    """Return paths to queries matching the name pattern."""
    sql_path = Path(sql_path)
    if project_id is not None:
        sql_path = sql_path / project_id

    all_sql_files = Path(sql_path).rglob("*.sql")
    sql_files = []

    for sql_file in all_sql_files:
        match = SQL_FILE_RE.match(str(sql_file))
        if match:
            project = match.group(1)
            dataset = match.group(2)
            table = match.group(3)
            query_name = f"{project}.{dataset}.{table}"
            if fnmatchcase(query_name, f"*{pattern}"):
                sql_files.append(sql_file)
            elif project_id and fnmatchcase(query_name, f"{project_id}.{pattern}"):
                sql_files.append(sql_file)

    return sql_files


sql_dir_option = click.option(
    "--sql_dir",
    help="Path to directory which contains queries.",
    type=click.Path(file_okay=False),
    default="sql",
    callback=is_valid_dir,
)

use_cloud_function_option = click.option(
    "--use_cloud_function",
    "--use-cloud-function",
    help=(
        "Use the Cloud Function for dry running SQL, if set to `True`. "
        "The Cloud Function can only access tables in shared-prod. "
        "If set to `False`, use active GCP credentials for the dry run."
    ),
    type=bool,
    default=True,
)


def respect_dryrun_skip_option(default=True):
    """Generate a respect_dryrun_skip option."""
    flags = {True: "--respect-dryrun-skip", False: "--ignore-dryrun-skip"}
    return click.option(
        f"{flags[True]}/{flags[False]}",
        help="Respect or ignore dry run SKIP configuration. "
        f"Default is {flags[default]}.",
        default=default,
    )


def project_id_option(default=None):
    """Generate a project-id option, with optional default."""
    return click.option(
        "--project-id",
        "--project_id",
        help="GCP project ID",
        default=default,
        callback=is_valid_project,
    )


@click.group(help="Commands for managing queries.")
def query():
    """Create the CLI group for the query command."""
@@ -315,7 +252,7 @@ def create(name, sql_dir, project_id, owner, init):
)
def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):
    """CLI command for scheduling a query."""
    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    if query_files == []:
        click.echo(f"Name doesn't refer to any queries: {name}", err=True)
@@ -413,7 +350,7 @@ def info(name, sql_dir, project_id, cost, last_updated):
    if name is None:
        name = "*.*"

    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    for query_file in query_files:
        query_file_path = Path(query_file)
@@ -619,7 +556,7 @@ def backfill(
        )
        sys.exit(1)

    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)
    dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)]

    for query_file in query_files:
@@ -709,7 +646,7 @@ def validate(
    if name is None:
        name = "*.*"

    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)
    dataset_dirs = set()

    for query in query_files:
        project = query.parent.parent.parent.name
@@ -755,7 +692,7 @@ def initialize(name, sql_dir, project_id, dry_run):
        # allow name to be a path
        query_files = [Path(name)]
    else:
        query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
        query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    for query_file in query_files:
        init_files = Path(query_file.parent).rglob("init.sql")
@@ -846,7 +783,7 @@ def update(
            "and check that the project is set correctly."
        )
        sys.exit(1)

    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)
    dependency_graph = get_dependency_graph([sql_dir], without_views=True)
    tmp_tables = {}
@@ -886,7 +823,7 @@ def update(
    dependencies = [
        p
        for k, refs in dependency_graph.items()
        for p in _queries_matching_name_pattern(k, sql_dir, project_id)
        for p in paths_matching_name_pattern(k, sql_dir, project_id)
        if identifier in refs
    ]
@@ -934,7 +871,7 @@ def _update_query_schema(
    for derived_from in metadata.schema.derived_from:
        parent_queries = [
            query
            for query in _queries_matching_name_pattern(
            for query in paths_matching_name_pattern(
                ".".join(derived_from.table), sql_dir, project_id
            )
        ]
@@ -1112,7 +1049,7 @@ def deploy(
        sys.exit(1)

    client = bigquery.Client()
    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    for query_file in query_files:
        if respect_dryrun_skip and str(query_file) in SKIP:
@@ -1203,7 +1140,7 @@ def deploy(
@respect_dryrun_skip_option(default=True)
def validate_schema(name, sql_dir, project_id, use_cloud_function, respect_dryrun_skip):
    """Validate the defined query schema against the query and destination table."""
    query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    def _validate_schema(query_file_path):
        return (

View file

@@ -14,7 +14,7 @@ import pytest
import yaml
from ..cli.format import format
from ..cli.utils import is_authenticated, is_valid_dir, is_valid_project
from ..cli.common import is_authenticated, is_valid_dir, is_valid_project
from ..docs import validate_docs
from ..format_sql.formatter import reformat
from ..routine import publish_routines

View file

@@ -1,48 +0,0 @@
"""Utility functions used by the CLI."""
import os
import fnmatch
from pathlib import Path
import click
import re
from google.cloud import bigquery
from bigquery_etl.util.common import project_dirs
def is_valid_dir(ctx, param, value):
"""Check if the parameter provided via click is an existing directory."""
if not os.path.isdir(value) or not os.path.exists(value):
raise click.BadParameter(f"Invalid directory path to {value}")
return value
def is_valid_file(ctx, param, value):
"""Check if the parameter provided via click is an existing file."""
if not os.path.isfile(value) or not os.path.exists(value):
raise click.BadParameter(f"Invalid file path to {value}")
return value
def is_authenticated(project_id=None):
"""Check if the user is authenticated to GCP and can access the project."""
client = bigquery.Client()
if project_id:
return client.project == project_id
return True
def is_valid_project(ctx, param, value):
"""Check if the provided project_id corresponds to an existing project."""
if value is None or value in [Path(p).name for p in project_dirs()]:
return value
raise click.BadParameter(f"Invalid project {value}")
def table_matches_patterns(pattern, invert, table):
"""Check if tables match pattern."""
pattern = re.compile(fnmatch.translate(pattern))
return (pattern.match(table) is not None) != invert

View file

@@ -1,7 +1,19 @@
"""bigquery-etl CLI view command."""
import click
import sys
from bigquery_etl.view import publish_views
from multiprocessing.pool import Pool
from ..view import View
from .dryrun import dryrun
from ..cli.common import (
sql_dir_option,
use_cloud_function_option,
parallelism_option,
paths_matching_name_pattern,
project_id_option,
respect_dryrun_skip_option,
)
@click.group(help="Commands for managing views.")
@ -10,4 +22,120 @@ def view():
pass
view.add_command(publish_views.main, "publish")
@view.command(
    help="""Validate a view.

    Checks formatting, naming, references and dry runs the view.

    Examples:

    ./bqetl view validate telemetry.clients_daily
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option()
@use_cloud_function_option
@click.option(
    "--validate_schemas",
    "--validate-schemas",
    help="Require dry run schema to match destination table and file if present.",
    is_flag=True,
    default=False,
)
@parallelism_option
@respect_dryrun_skip_option()
@click.pass_context
def validate(
    ctx,
    name,
    sql_dir,
    project_id,
    use_cloud_function,
    validate_schemas,
    parallelism,
    respect_dryrun_skip,
):
    """Validate the view definition."""
    view_files = paths_matching_name_pattern(
        name, sql_dir, project_id, files=("*view.sql",)
    )
    views = [View.from_file(f) for f in view_files]

    with Pool(parallelism) as p:
        result = p.map(_view_is_valid, views, chunksize=1)
    if not all(result):
        sys.exit(1)

    # dry run views
    ctx.invoke(
        dryrun,
        paths=[str(f) for f in view_files],
        use_cloud_function=use_cloud_function,
        project=project_id,
        validate_schemas=validate_schemas,
        respect_skip=respect_dryrun_skip,
    )

    click.echo("All views are valid.")


def _view_is_valid(view):
    return view.is_valid()
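A note on the module-level wrapper: Pool.map pickles the callable it sends to worker processes, so the helper is kept at module scope, where it is reliably picklable; a lambda would break under spawn-based start methods. A tiny self-contained illustration of the same pattern:

from multiprocessing.pool import Pool

def _is_positive(n):
    # module-level, therefore picklable by multiprocessing
    return n > 0

if __name__ == "__main__":
    with Pool(4) as p:
        results = p.map(_is_positive, [1, -2, 3], chunksize=1)
    # results == [True, False, True]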
@view.command(
    help="""Publish views.

    Examples:

    # Publish all views
    ./bqetl view publish

    # Publish a specific view
    ./bqetl view publish telemetry.clients_daily
    """
)
@click.argument("name", required=False)
@sql_dir_option
@project_id_option()
@click.option(
    "--target-project",
    help=(
        "If specified, create views in the target project rather than"
        " the project specified in the file. Only views for"
        " moz-fx-data-shared-prod will be published if this is set."
    ),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option
@click.option(
    "--dry_run",
    "--dry-run",
    is_flag=True,
    help="Validate view definitions, but do not publish them.",
)
@click.option(
    "--user-facing-only",
    "--user_facing_only",
    is_flag=True,
    help=(
        "Publish user-facing views only. User-facing views are views"
        " that are part of datasets without suffixes (such as telemetry,"
        " but not telemetry_derived)."
    ),
)
def publish(
    name,
    sql_dir,
    project_id,
    target_project,
    log_level,
    parallelism,
    dry_run,
    user_facing_only,
):
    """Publish views."""
    view_files = paths_matching_name_pattern(
        name, sql_dir, project_id, files=("view.sql",)
    )
    views = [View.from_file(f) for f in view_files]

    with Pool(parallelism) as p:
        result = p.map(_publish_view, views, chunksize=1)
    if not all(result):
        sys.exit(1)

    click.echo("All views have been published.")


def _publish_view(view):
    view.publish()

View file

@@ -9,7 +9,7 @@ from typing import Dict, Iterator, List, Tuple
import click
import yaml
from .view.generate_stable_views import get_stable_table_schemas
from bigquery_etl.view.generate_stable_views import get_stable_table_schemas
stable_views = None

View file

@@ -0,0 +1,125 @@
"""Represents a SQL view."""
import attr
import sqlparse
from google.cloud import bigquery
from pathlib import Path
from bigquery_etl.util import extract_from_query_path
# skip validation for these views
SKIP_VALIDATION = {
# not matching directory structure, but created before validation was enforced
"sql/moz-fx-data-shared-prod/stripe/subscription/view.sql",
"sql/moz-fx-data-shared-prod/stripe/product/view.sql",
"sql/moz-fx-data-shared-prod/stripe/plan/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/client_probe_counts_v1/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/clients_daily_histogram_aggregates_v1/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/clients_scalar_aggregates_v1/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/clients_daily_scalar_aggregates_v1/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/clients_histogram_aggregates_v1/view.sql",
"sql/moz-fx-data-shared-prod/telemetry/clients_probe_processes/view.sql",
}
# skip publishing these views
SKIP_PUBLISHING = {
# Access Denied
"activity_stream/tile_id_types/view.sql",
"pocket/pocket_reach_mau/view.sql",
"telemetry/buildhub2/view.sql",
# Dataset glam-fenix-dev:glam_etl was not found
# TODO: this should be removed if views are to be automatically deployed
*[str(path) for path in Path("sql/glam-fenix-dev").glob("glam_etl/**/view.sql")],
}
# suffixes of datasets with non-user-facing views
NON_USER_FACING_DATASET_SUFFIXES = (
"_derived",
"_external",
"_bi",
"_restricted",
)
@attr.s(auto_attribs=True)
class View:
"""Representation of a SQL view stored in a view.sql file."""
path: str = attr.ib()
name: str = attr.ib()
dataset: str = attr.ib()
project: str = attr.ib()
# todo: validators
def content(self):
"""Return the view SQL."""
return Path(self.path).read_text()
@classmethod
def from_file(cls, path):
"""View from SQL file."""
project, dataset, name = extract_from_query_path(path)
return cls(path=str(path), name=name, dataset=dataset, project=project)
def is_valid(self):
"""Validate the SQL view definition."""
if self.path in SKIP_VALIDATION:
print(f"Skipped validation for {self.path}")
return True
return self._valid_fully_qualified_references() and self._valid_view_naming()
def _valid_fully_qualified_references(self):
"""Check that referenced tables and views are fully qualified."""
from bigquery_etl.dependency import extract_table_references
for table in extract_table_references(self.content()):
if len(table.split(".")) < 3:
print(f"{self.path} ERROR\n{table} missing project_id qualifier")
return False
return True
def _valid_view_naming(self):
"""Validate that the created view naming matches the directory structure."""
parsed = sqlparse.parse(self.content())[0]
tokens = [
t
for t in parsed.tokens
if not (t.is_whitespace or isinstance(t, sqlparse.sql.Comment))
]
is_view_statement = (
" ".join(tokens[0].normalized.split()) == "CREATE OR REPLACE"
and tokens[1].normalized == "VIEW"
)
if is_view_statement:
target_view = str(tokens[2]).strip().split()[0]
try:
[project_id, dataset_id, view_id] = target_view.replace("`", "").split(
"."
)
if not (
self.name == view_id
and self.dataset == dataset_id
and self.project == project_id
):
print(
f"{self.path} ERROR\n"
f"View name {target_view} not matching directory structure."
)
return False
except Exception:
print(f"{self.path} ERROR\n{target_view} missing project ID qualifier.")
return False
else:
print(
f"ERROR: {self.path} does not appear to be "
"a CREATE OR REPLACE VIEW statement! Quitting..."
)
return False
return True
def publish(self):
"""Publish this view to BigQuery."""
client = bigquery.Client()
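A hedged usage sketch of the class above (hypothetical path; assumes the file exists and contains a CREATE OR REPLACE VIEW statement whose target matches the directory layout):

from pathlib import Path

from bigquery_etl.view import View  # module added in this commit

view = View.from_file(
    Path("sql/moz-fx-data-shared-prod/telemetry/clients_daily/view.sql")
)
print(view.project, view.dataset, view.name)
# moz-fx-data-shared-prod telemetry clients_daily
if view.is_valid():  # fully qualified references + naming both check out
    view.publish()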

View file

@@ -5,7 +5,7 @@ import yaml
from click.testing import CliRunner

from bigquery_etl.cli.query import (
    _queries_matching_name_pattern,
    paths_matching_name_pattern,
    create,
    info,
    schedule,
@@ -285,7 +285,7 @@ class TestQuery:
        assert "telemetry_derived.query_v2" in result.output
        assert "telemetry_derived.query_v1" not in result.output

    def test_queries_matching_name_pattern(self, runner):
    def test_paths_matching_name_pattern(self, runner):
        with runner.isolated_filesystem():
            os.makedirs("sql/moz-fx-data-shared-prod/telemetry_derived/query_v1")
            with open(
@@ -311,10 +311,10 @@ class TestQuery:
            ) as f:
                f.write("SELECT 1")

            assert len(_queries_matching_name_pattern("*", "sql/", None)) == 4
            assert len(paths_matching_name_pattern("*", "sql/", None)) == 4
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "*.sql", "sql/", "moz-fx-data-shared-prod"
                    )
                )
@@ -322,7 +322,7 @@ class TestQuery:
            )
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "test", "sql/", "moz-fx-data-shared-prod"
                    )
                )
@@ -330,7 +330,7 @@ class TestQuery:
            )
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "foo_derived", "sql/", "moz-fx-data-shared-prod"
                    )
                )
@@ -338,16 +338,16 @@ class TestQuery:
            )
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "foo_derived*", "sql/", "moz-fx-data-shared-prod"
                    )
                )
                == 1
            )
            assert len(_queries_matching_name_pattern("*query*", "sql/", None)) == 4
            assert len(paths_matching_name_pattern("*query*", "sql/", None)) == 4
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "foo_derived.query_v2", "sql/", "moz-fx-data-shared-prod"
                    )
                )
@@ -356,7 +356,7 @@ class TestQuery:
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "telemetry_derived.query_v1", "sql/", "moz-fx-data-test-project"
                    )
                )
@@ -365,7 +365,7 @@ class TestQuery:
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "moz-fx-data-test-project.telemetry_derived.query_v1",
                        "sql/",
                        None,
@@ -376,7 +376,7 @@ class TestQuery:
            assert (
                len(
                    _queries_matching_name_pattern(
                    paths_matching_name_pattern(
                        "moz-fx-data-test-project.telemetry_derived.*", "sql/", None
                    )
                )

View file

@@ -3,7 +3,7 @@ from pathlib import Path
import pytest
from click.exceptions import BadParameter

from bigquery_etl.cli.utils import (
from bigquery_etl.cli.common import (
    is_authenticated,
    is_valid_dir,
    is_valid_file,