Make schema validation part of dryrun (#2069)

This commit is contained in:
Daniel Thorn 2021-05-25 11:53:09 -07:00 committed by GitHub
Parent 13fc30f169
Commit 3c8894fdf1
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 193 additions and 200 deletions
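The net effect: CI's dry run step (script/dryrun, which now execs script/bqetl dryrun) can pass --validate-schemas so the dry run and the schema check happen in one pass. A minimal sketch of the same flow through the Python API shown in this diff; the query path is hypothetical:

from bigquery_etl.dryrun import DryRun

# Hypothetical query path, used only for illustration.
sql_file = "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql"

run = DryRun(sql_file)
if run.is_valid() and run.validate_schema():
    # The dry run succeeded and the dry-run schema is compatible with the
    # deployed destination table and matches schema.yaml when one exists.
    print(f"{sql_file} OK")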

View File

@@ -62,6 +62,8 @@ jobs:
docker: *docker
steps:
- checkout
- *restore_venv_cache
- *build
- run:
name: Dry run queries
# yamllint disable rule:line-length
@@ -80,7 +82,7 @@ jobs:
PATHS="$(git diff origin/main... --name-only --diff-filter=d -- sql)"
fi
echo $PATHS
script/dryrun $PATHS
PATH="venv/bin:$PATH" script/dryrun --validate-schemas $PATHS
# yamllint enable rule:line-length
validate-metadata:
docker: *docker
@@ -230,17 +232,6 @@ jobs:
- run:
name: Validate views
command: PATH="venv/bin:$PATH" script/validate_views
validate-schemas:
docker: *docker
steps:
- checkout
- *restore_venv_cache
- *build
- run:
name: Validate query schemas
command: |
./bqetl bootstrap
./bqetl query schema validate "*"
docs:
docker: *docker
steps:
@@ -356,7 +347,6 @@ workflows:
- verify-requirements
- dry-run-sql
- validate-metadata
- validate-schemas
- integration
- validate-dags
- verify-dags-up-to-date

View File

@@ -1,9 +1,12 @@
"""bigquery-etl CLI dryrun command."""
import fnmatch
import glob
import os
import re
import sys
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import Set
import click
from google.cloud import bigquery
@@ -27,8 +30,8 @@ from ..dryrun import SKIP, DryRun
""",
)
@click.argument(
"path",
default="sql/",
"paths",
nargs=-1,
type=click.Path(file_okay=True),
)
@click.option(
@@ -42,61 +45,60 @@ from ..dryrun import SKIP, DryRun
type=bool,
default=True,
)
@click.option(
"--validate_schemas",
"--validate-schemas",
help="Require dry run schema to match destination table and file if present.",
is_flag=True,
default=False,
)
@click.option(
"--project",
help="GCP project to perform dry run in when --use_cloud_function=False",
default="moz-fx-data-shared-prod",
)
def dryrun(path, use_cloud_function, project):
def dryrun(paths, use_cloud_function, validate_schemas, project):
"""Perform a dry run."""
if os.path.isdir(path) and os.path.exists(path):
sql_files = [f for f in Path(path).rglob("*.sql") if str(f) not in SKIP]
elif os.path.isfile(path) and os.path.exists(path):
sql_files = [path]
else:
click.echo(f"Invalid path {path}", err=True)
sys.exit(1)
file_names = ("query.sql", "view.sql", "part*.sql", "init.sql")
file_re = re.compile("|".join(map(fnmatch.translate, file_names)))
sql_files: Set[str] = set()
for path in paths:
if os.path.isdir(path):
sql_files |= {
sql_file
for pattern in file_names
for sql_file in glob.glob(f"{path}/**/{pattern}", recursive=True)
}
elif os.path.isfile(path):
if file_re.fullmatch(os.path.basename(path)):
sql_files.add(path)
else:
click.echo(f"Invalid path {path}", err=True)
sys.exit(1)
sql_files -= SKIP
if not sql_files:
print("Skipping dry run because no queries matched")
sys.exit(0)
if use_cloud_function:
def cloud_function_dryrun(sqlfile):
"""Dry run SQL files."""
return DryRun(sqlfile).is_valid()
sql_file_valid = cloud_function_dryrun
client = None
else:
if not is_authenticated():
click.echo("Not authenticated to GCP. Run `gcloud auth login` to login.")
sys.exit(1)
client = bigquery.Client(project=project)
client = bigquery.Client()
def gcp_dryrun(sqlfile):
"""Dry run the SQL file."""
dataset = Path(sqlfile).parent.parent.name
job_config = bigquery.QueryJobConfig(
dry_run=True,
use_query_cache=False,
default_dataset=f"{project}.{dataset}",
query_parameters=[
bigquery.ScalarQueryParameter(
"submission_date", "DATE", "2019-01-01"
)
],
)
with open(sqlfile) as query_stream:
query = query_stream.read()
try:
client.query(query, job_config=job_config)
click.echo(f"{sqlfile:59} OK")
return True
except Exception as e:
click.echo(f"{sqlfile:59} ERROR: {e}")
return False
sql_file_valid = gcp_dryrun
def sql_file_valid(sqlfile):
"""Dry run the SQL file."""
result = DryRun(sqlfile, use_cloud_function=use_cloud_function, client=client)
if validate_schemas:
valid = result.validate_schema()
if not valid:
click.echo(f"{sqlfile:59} ERROR schema invalid")
return valid
return result.is_valid()
with ThreadPool(8) as p:
result = p.map(sql_file_valid, sql_files, chunksize=1)
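The dryrun command now accepts any number of paths and keeps only files whose base names match query.sql, view.sql, part*.sql, or init.sql, the same selection the removed bigquery_etl.dryrun.main performed. A standalone sketch of that matching, with hypothetical paths:

import fnmatch
import re
from os.path import basename

file_names = ("query.sql", "view.sql", "part*.sql", "init.sql")
file_re = re.compile("|".join(map(fnmatch.translate, file_names)))

for path in (
    "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql",
    "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/metadata.yaml",
    "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/part1.sql",
):
    # Only the SQL entry points match; metadata and other files are skipped.
    print(path, "->", bool(file_re.fullmatch(basename(path))))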

View File

@@ -15,19 +15,20 @@ import click
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from ..cli.dryrun import SKIP, dryrun
from ..cli.format import format
from ..cli.utils import is_authenticated, is_valid_dir, is_valid_project
from ..dependency import get_dependency_graph
from ..dryrun import SKIP, DryRun
from ..format_sql.formatter import reformat
from ..metadata import validate_metadata
from ..metadata.parse_metadata import METADATA_FILE, Metadata, DatasetMetadata
from ..metadata.parse_metadata import METADATA_FILE, DatasetMetadata, Metadata
from ..query_scheduling.dag_collection import DagCollection
from ..query_scheduling.generate_airflow_dags import get_dags
from ..run_query import run
from ..schema import SCHEMA_FILE, Schema
from ..util import extract_from_query_path
from ..util.common import random_str
from .dryrun import dryrun
QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)")
SQL_FILE_RE = re.compile(
@@ -673,8 +674,15 @@ def backfill(
type=bool,
default=True,
)
@click.option(
"--validate_schemas",
"--validate-schemas",
help="Require dry run schema to match destination table and file if present.",
is_flag=True,
default=False,
)
@click.pass_context
def validate(ctx, name, sql_dir, project_id, use_cloud_function):
def validate(ctx, name, sql_dir, project_id, use_cloud_function, validate_schemas):
"""Validate queries by dry running, formatting and checking scheduling configs."""
if name is None:
name = "*.*"
@@ -686,9 +694,10 @@ def validate(ctx, name, sql_dir, project_id, use_cloud_function):
ctx.invoke(format, path=str(query))
ctx.invoke(
dryrun,
path=str(query),
paths=[str(query)],
use_cloud_function=use_cloud_function,
project=project,
validate_schemas=validate_schemas,
)
validate_metadata.validate(query.parent)
dataset_dirs.add(query.parent.parent)
@@ -696,8 +705,6 @@ def validate(ctx, name, sql_dir, project_id, use_cloud_function):
for dataset_dir in dataset_dirs:
validate_metadata.validate_datasets(dataset_dir)
# todo: validate if new fields get added
@query.command(
help="""Create and initialize the destination table for the query.
@@ -1114,70 +1121,6 @@ def deploy(ctx, name, sql_dir, project_id, force):
click.echo(f"No schema file found for {query_file}")
def _validate_schema(query_file):
"""
Check whether schema is valid.
Returns tuple for whether schema is valid and path to schema.
"""
if str(query_file) in SKIP or query_file.name == "script.sql":
click.echo(f"{query_file} dry runs are skipped. Cannot validate schemas.")
return (True, query_file)
query_file_path = Path(query_file)
query_schema = Schema.from_query_file(query_file_path)
existing_schema_path = query_file_path.parent / SCHEMA_FILE
if not existing_schema_path.is_file():
click.echo(f"No schema file defined for {query_file_path}", err=True)
return (True, query_file_path)
table_name = query_file_path.parent.name
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name
partitioned_by = None
try:
metadata = Metadata.of_query_file(query_file_path)
if metadata.bigquery and metadata.bigquery.time_partitioning:
partitioned_by = metadata.bigquery.time_partitioning.field
except FileNotFoundError:
pass
table_schema = Schema.for_table(
project_name, dataset_name, table_name, partitioned_by
)
if not query_schema.compatible(table_schema):
click.echo(
click.style(
f"ERROR: Schema for query in {query_file_path} "
f"incompatible with schema deployed for "
f"{project_name}.{dataset_name}.{table_name}",
fg="red",
),
err=True,
)
return (False, query_file_path)
else:
existing_schema = Schema.from_schema_file(existing_schema_path)
if not existing_schema.equal(query_schema):
click.echo(
click.style(
f"Schema defined in {existing_schema_path} "
f"incompatible with query {query_file_path}",
fg="red",
),
err=True,
)
return (False, query_file_path)
click.echo(f"Schemas for {query_file_path} are valid.")
return (True, query_file_path)
@schema.command(
help="""Validate the query schema
@@ -1200,6 +1143,9 @@ def validate_schema(name, sql_dir, project_id):
"""Validate the defined query schema against the query and destination table."""
query_files = _queries_matching_name_pattern(name, sql_dir, project_id)
def _validate_schema(query_file_path):
return DryRun(query_file_path).validate_schema(), query_file_path
with Pool(8) as p:
result = p.map(_validate_schema, query_files, chunksize=1)
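Both query validate --validate-schemas and query schema validate now funnel into the same DryRun.validate_schema check, with the validate command forwarding the flag to the dryrun command via ctx.invoke. A rough programmatic equivalent of that CLI call, using click's test runner; the path is hypothetical and an actual run needs access to the dry run endpoint or GCP credentials:

from click.testing import CliRunner
from bigquery_etl.cli.dryrun import dryrun

runner = CliRunner()
result = runner.invoke(
    dryrun,
    [
        "--validate-schemas",
        # Hypothetical path; any mix of files and directories is accepted.
        "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql",
    ],
)
print(result.exit_code, result.output)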

View File

@@ -10,18 +10,21 @@ only dry runs can be performed. In order to reduce risk of CI or local users
accidentally running queries during tests and overwriting production data, we
proxy the queries through the dry run service endpoint.
"""
import fnmatch
import glob
import json
import re
import sys
from argparse import ArgumentParser
from enum import Enum
from multiprocessing.pool import Pool
from os.path import basename, dirname, exists, isdir
from typing import Set
from os.path import basename, dirname, exists
from pathlib import Path
from urllib.request import Request, urlopen
import click
from google.cloud import bigquery
from .metadata.parse_metadata import Metadata
from .schema import SCHEMA_FILE, Schema
try:
from functools import cached_property # type: ignore
except ImportError:
@@ -224,11 +227,20 @@ class DryRun:
"bigquery-etl-dryrun"
)
def __init__(self, sqlfile, content=None, strip_dml=False):
def __init__(
self,
sqlfile,
content=None,
strip_dml=False,
use_cloud_function=True,
client=None,
):
"""Instantiate DryRun class."""
self.sqlfile = sqlfile
self.content = content
self.strip_dml = strip_dml
self.use_cloud_function = use_cloud_function
self.client = client if use_cloud_function or client else bigquery.Client()
def get_sql(self):
"""Get SQL content."""
@@ -253,26 +265,54 @@ class DryRun:
sql = self.content
else:
sql = self.get_sql()
dataset = basename(dirname(dirname(self.sqlfile)))
try:
r = urlopen(
Request(
self.DRY_RUN_URL,
headers={"Content-Type": "application/json"},
data=json.dumps(
{
"dataset": basename(dirname(dirname(self.sqlfile))),
"query": sql,
}
).encode("utf8"),
method="POST",
if self.use_cloud_function:
r = urlopen(
Request(
self.DRY_RUN_URL,
headers={"Content-Type": "application/json"},
data=json.dumps(
{
"dataset": dataset,
"query": sql,
}
).encode("utf8"),
method="POST",
)
)
)
return json.load(r)
else:
project = basename(dirname(dirname(dirname(self.sqlfile))))
job_config = bigquery.QueryJobConfig(
dry_run=True,
use_query_cache=False,
default_dataset=f"{project}.{dataset}",
query_parameters=[
bigquery.ScalarQueryParameter(
"submission_date", "DATE", "2019-01-01"
)
],
)
job = self.client.query(sql, job_config=job_config)
return {
"valid": True,
"referencedTables": [
ref.to_api_repr() for ref in job.referenced_tables
],
"schema": (
job._properties.get("statistics", {})
.get("query", {})
.get("schema", {})
),
"datasetLabels": (
self.client.get_dataset(job.default_dataset).labels
),
}
except Exception as e:
print(f"{self.sqlfile:59} ERROR\n", e)
return None
return json.load(r)
def get_referenced_tables(self):
"""Return referenced tables by dry running the SQL file."""
if self.sqlfile not in SKIP and not self.is_valid():
@@ -433,6 +473,65 @@ class DryRun:
return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
return error
def validate_schema(self):
"""Check whether schema is valid."""
if self.sqlfile in SKIP or basename(self.sqlfile) == "script.sql":
print(f"\t...Ignoring dryrun results for {self.sqlfile}")
return True
query_file_path = Path(self.sqlfile)
query_schema = Schema.from_json(self.get_schema())
existing_schema_path = query_file_path.parent / SCHEMA_FILE
if not existing_schema_path.is_file():
click.echo(f"No schema file defined for {query_file_path}", err=True)
return True
table_name = query_file_path.parent.name
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name
partitioned_by = None
try:
metadata = Metadata.of_query_file(query_file_path)
if metadata.bigquery and metadata.bigquery.time_partitioning:
partitioned_by = metadata.bigquery.time_partitioning.field
except FileNotFoundError:
pass
table_schema = Schema.for_table(
project_name, dataset_name, table_name, partitioned_by
)
if not query_schema.compatible(table_schema):
click.echo(
click.style(
f"ERROR: Schema for query in {query_file_path} "
f"incompatible with schema deployed for "
f"{project_name}.{dataset_name}.{table_name}",
fg="red",
),
err=True,
)
return False
else:
existing_schema = Schema.from_schema_file(existing_schema_path)
if not existing_schema.equal(query_schema):
click.echo(
click.style(
f"Schema defined in {existing_schema_path} "
f"incompatible with query {query_file_path}",
fg="red",
),
err=True,
)
return False
click.echo(f"Schemas for {query_file_path} are valid.")
return True
def sql_file_valid(sqlfile):
"""Dry run SQL files."""
@@ -446,47 +545,3 @@ def find_next_word(target, source):
if w == target:
# get the next word, and remove quotations from column name
return split[i + 1].replace("'", "")
def main():
"""Dry run all SQL files in the project directories."""
parser = ArgumentParser(description=main.__doc__)
parser.add_argument(
"paths",
metavar="PATH",
nargs="*",
help="Paths to search for queries to dry run. CI passes 'sql' on the default "
"branch, and the paths that have been modified since branching otherwise",
)
args = parser.parse_args()
file_names = ("query.sql", "view.sql", "part*.sql", "init.sql")
file_re = re.compile("|".join(map(fnmatch.translate, file_names)))
sql_files: Set[str] = set()
for path in args.paths:
if isdir(path):
sql_files |= {
sql_file
for pattern in file_names
for sql_file in glob.glob(f"{path}/**/{pattern}", recursive=True)
}
elif file_re.fullmatch(basename(path)):
sql_files.add(path)
sql_files -= SKIP
if not sql_files:
print("Skipping dry run because no queries matched")
sys.exit(0)
with Pool(8) as p:
result = p.map(sql_file_valid, sorted(sql_files), chunksize=1)
if all(result):
exitcode = 0
else:
exitcode = 1
sys.exit(exitcode)
if __name__ == "__main__":
main()
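DryRun can now skip the cloud function and dry run directly against BigQuery, returning the same result shape (valid, referencedTables, schema, datasetLabels). A hedged usage sketch that assumes working GCP credentials; the query path is hypothetical:

from google.cloud import bigquery
from bigquery_etl.dryrun import DryRun

client = bigquery.Client()  # requires gcloud auth / application default credentials
run = DryRun(
    "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql",
    use_cloud_function=False,
    client=client,
)
print(run.get_schema())       # schema reported by the BigQuery dry run
print(run.validate_schema())  # compare it to schema.yaml and the deployed table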

View File

@@ -1,16 +1,16 @@
"""Query schema."""
from google.cloud import bigquery
import json
import os
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional, List
from typing import Any, Dict, List, Optional
import attr
import os
import yaml
from google.cloud import bigquery
from bigquery_etl.dryrun import DryRun
from .. import dryrun
SCHEMA_FILE = "schema.yaml"
@@ -27,7 +27,7 @@ class Schema:
if not query_file.is_file() or query_file.suffix != ".sql":
raise Exception(f"{query_file} is not a valid SQL file.")
schema = DryRun(str(query_file), content=content).get_schema()
schema = dryrun.DryRun(str(query_file), content=content).get_schema()
return cls(schema)
@classmethod
@@ -55,7 +55,7 @@ class Schema:
try:
return cls(
DryRun(
dryrun.DryRun(
os.path.join(project, dataset, table, "query.sql"), query
).get_schema()
)
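schema.py now imports the dryrun module itself rather than the DryRun class, presumably to avoid a circular import now that dryrun.py imports Schema. The Schema helpers used by validate_schema can also be called directly; a sketch with hypothetical table coordinates:

from pathlib import Path
from bigquery_etl.schema import Schema

query_file = Path("sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql")

# Derive the query's output schema by dry running it, then compare it with the
# schema deployed for the destination table and with the checked-in schema.yaml.
query_schema = Schema.from_query_file(query_file)
table_schema = Schema.for_table(
    "moz-fx-data-shared-prod", "telemetry_derived", "example_v1", "submission_date"
)
print(query_schema.compatible(table_schema))
print(Schema.from_schema_file(query_file.parent / "schema.yaml").equal(query_schema))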

View File

@@ -5,4 +5,4 @@
cd "$(dirname "$0")/.."
exec python3 -m bigquery_etl.dryrun "$@"
script/bqetl dryrun "$@"