From 3c8894fdf12dfaeafda324d1cda2c51f7437259f Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Tue, 25 May 2021 11:53:09 -0700 Subject: [PATCH] Make schema validation part of dryrun (#2069) --- .circleci/config.yml | 16 +-- bigquery_etl/cli/dryrun.py | 92 ++++++++-------- bigquery_etl/cli/query.py | 86 +++------------ bigquery_etl/dryrun.py | 185 +++++++++++++++++++++----------- bigquery_etl/schema/__init__.py | 12 +-- script/dryrun | 2 +- 6 files changed, 193 insertions(+), 200 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 93f3d06976..a4d2265bd4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,6 +62,8 @@ jobs: docker: *docker steps: - checkout + - *restore_venv_cache + - *build - run: name: Dry run queries # yamllint disable rule:line-length @@ -80,7 +82,7 @@ jobs: PATHS="$(git diff origin/main... --name-only --diff-filter=d -- sql)" fi echo $PATHS - script/dryrun $PATHS + PATH="venv/bin:$PATH" script/dryrun --validate-schemas $PATHS # yamllint enable rule:line-length validate-metadata: docker: *docker @@ -230,17 +232,6 @@ jobs: - run: name: Validate views command: PATH="venv/bin:$PATH" script/validate_views - validate-schemas: - docker: *docker - steps: - - checkout - - *restore_venv_cache - - *build - - run: - name: Validate query schemas - command: | - ./bqetl bootstrap - ./bqetl query schema validate "*" docs: docker: *docker steps: @@ -356,7 +347,6 @@ workflows: - verify-requirements - dry-run-sql - validate-metadata - - validate-schemas - integration - validate-dags - verify-dags-up-to-date diff --git a/bigquery_etl/cli/dryrun.py b/bigquery_etl/cli/dryrun.py index 8478dfdad0..42cbb21a6d 100644 --- a/bigquery_etl/cli/dryrun.py +++ b/bigquery_etl/cli/dryrun.py @@ -1,9 +1,12 @@ """bigquery-etl CLI dryrun command.""" +import fnmatch +import glob import os +import re import sys from multiprocessing.pool import ThreadPool -from pathlib import Path +from typing import Set import click from google.cloud import bigquery @@ -27,8 +30,8 @@ from ..dryrun import SKIP, DryRun """, ) @click.argument( - "path", - default="sql/", + "paths", + nargs=-1, type=click.Path(file_okay=True), ) @click.option( @@ -42,61 +45,60 @@ from ..dryrun import SKIP, DryRun type=bool, default=True, ) +@click.option( + "--validate_schemas", + "--validate-schemas", + help="Require dry run schema to match destination table and file if present.", + is_flag=True, + default=False, +) @click.option( "--project", help="GCP project to perform dry run in when --use_cloud_function=False", default="moz-fx-data-shared-prod", ) -def dryrun(path, use_cloud_function, project): +def dryrun(paths, use_cloud_function, validate_schemas, project): """Perform a dry run.""" - if os.path.isdir(path) and os.path.exists(path): - sql_files = [f for f in Path(path).rglob("*.sql") if str(f) not in SKIP] - elif os.path.isfile(path) and os.path.exists(path): - sql_files = [path] - else: - click.echo(f"Invalid path {path}", err=True) - sys.exit(1) + file_names = ("query.sql", "view.sql", "part*.sql", "init.sql") + file_re = re.compile("|".join(map(fnmatch.translate, file_names))) + + sql_files: Set[str] = set() + for path in paths: + if os.path.isdir(path): + sql_files |= { + sql_file + for pattern in file_names + for sql_file in glob.glob(f"{path}/**/{pattern}", recursive=True) + } + elif os.path.isfile(path): + if file_re.fullmatch(os.path.basename(path)): + sql_files.add(path) + else: + click.echo(f"Invalid path {path}", err=True) + sys.exit(1) + sql_files -= SKIP + + if not sql_files: + 
print("Skipping dry run because no queries matched") + sys.exit(0) if use_cloud_function: - - def cloud_function_dryrun(sqlfile): - """Dry run SQL files.""" - return DryRun(sqlfile).is_valid() - - sql_file_valid = cloud_function_dryrun + client = None else: if not is_authenticated(): click.echo("Not authenticated to GCP. Run `gcloud auth login` to login.") sys.exit(1) + client = bigquery.Client(project=project) - client = bigquery.Client() - - def gcp_dryrun(sqlfile): - """Dry run the SQL file.""" - dataset = Path(sqlfile).parent.parent.name - job_config = bigquery.QueryJobConfig( - dry_run=True, - use_query_cache=False, - default_dataset=f"{project}.{dataset}", - query_parameters=[ - bigquery.ScalarQueryParameter( - "submission_date", "DATE", "2019-01-01" - ) - ], - ) - - with open(sqlfile) as query_stream: - query = query_stream.read() - - try: - client.query(query, job_config=job_config) - click.echo(f"{sqlfile:59} OK") - return True - except Exception as e: - click.echo(f"{sqlfile:59} ERROR: {e}") - return False - - sql_file_valid = gcp_dryrun + def sql_file_valid(sqlfile): + """Dry run the SQL file.""" + result = DryRun(sqlfile, use_cloud_function=use_cloud_function, client=client) + if validate_schemas: + valid = result.validate_schema() + if not valid: + click.echo(f"{sqlfile:59} ERROR schema invalid") + return valid + return result.is_valid() with ThreadPool(8) as p: result = p.map(sql_file_valid, sql_files, chunksize=1) diff --git a/bigquery_etl/cli/query.py b/bigquery_etl/cli/query.py index 6838b5907a..1fedbbf136 100644 --- a/bigquery_etl/cli/query.py +++ b/bigquery_etl/cli/query.py @@ -15,19 +15,20 @@ import click from google.cloud import bigquery from google.cloud.exceptions import NotFound -from ..cli.dryrun import SKIP, dryrun from ..cli.format import format from ..cli.utils import is_authenticated, is_valid_dir, is_valid_project from ..dependency import get_dependency_graph +from ..dryrun import SKIP, DryRun from ..format_sql.formatter import reformat from ..metadata import validate_metadata -from ..metadata.parse_metadata import METADATA_FILE, Metadata, DatasetMetadata +from ..metadata.parse_metadata import METADATA_FILE, DatasetMetadata, Metadata from ..query_scheduling.dag_collection import DagCollection from ..query_scheduling.generate_airflow_dags import get_dags from ..run_query import run from ..schema import SCHEMA_FILE, Schema from ..util import extract_from_query_path from ..util.common import random_str +from .dryrun import dryrun QUERY_NAME_RE = re.compile(r"(?P[a-zA-z0-9_]+)\.(?P[a-zA-z0-9_]+)") SQL_FILE_RE = re.compile( @@ -673,8 +674,15 @@ def backfill( type=bool, default=True, ) +@click.option( + "--validate_schemas", + "--validate-schemas", + help="Require dry run schema to match destination table and file if present.", + is_flag=True, + default=False, +) @click.pass_context -def validate(ctx, name, sql_dir, project_id, use_cloud_function): +def validate(ctx, name, sql_dir, project_id, use_cloud_function, validate_schemas): """Validate queries by dry running, formatting and checking scheduling configs.""" if name is None: name = "*.*" @@ -686,9 +694,10 @@ def validate(ctx, name, sql_dir, project_id, use_cloud_function): ctx.invoke(format, path=str(query)) ctx.invoke( dryrun, - path=str(query), + paths=[str(query)], use_cloud_function=use_cloud_function, project=project, + validate_schemas=validate_schemas, ) validate_metadata.validate(query.parent) dataset_dirs.add(query.parent.parent) @@ -696,8 +705,6 @@ def validate(ctx, name, sql_dir, project_id, 
use_cloud_function): for dataset_dir in dataset_dirs: validate_metadata.validate_datasets(dataset_dir) - # todo: validate if new fields get added - @query.command( help="""Create and initialize the destination table for the query. @@ -1114,70 +1121,6 @@ def deploy(ctx, name, sql_dir, project_id, force): click.echo(f"No schema file found for {query_file}") -def _validate_schema(query_file): - """ - Check whether schema is valid. - - Returns tuple for whether schema is valid and path to schema. - """ - if str(query_file) in SKIP or query_file.name == "script.sql": - click.echo(f"{query_file} dry runs are skipped. Cannot validate schemas.") - return (True, query_file) - - query_file_path = Path(query_file) - query_schema = Schema.from_query_file(query_file_path) - existing_schema_path = query_file_path.parent / SCHEMA_FILE - - if not existing_schema_path.is_file(): - click.echo(f"No schema file defined for {query_file_path}", err=True) - return (True, query_file_path) - - table_name = query_file_path.parent.name - dataset_name = query_file_path.parent.parent.name - project_name = query_file_path.parent.parent.parent.name - - partitioned_by = None - try: - metadata = Metadata.of_query_file(query_file_path) - - if metadata.bigquery and metadata.bigquery.time_partitioning: - partitioned_by = metadata.bigquery.time_partitioning.field - except FileNotFoundError: - pass - - table_schema = Schema.for_table( - project_name, dataset_name, table_name, partitioned_by - ) - - if not query_schema.compatible(table_schema): - click.echo( - click.style( - f"ERROR: Schema for query in {query_file_path} " - f"incompatible with schema deployed for " - f"{project_name}.{dataset_name}.{table_name}", - fg="red", - ), - err=True, - ) - return (False, query_file_path) - else: - existing_schema = Schema.from_schema_file(existing_schema_path) - - if not existing_schema.equal(query_schema): - click.echo( - click.style( - f"Schema defined in {existing_schema_path} " - f"incompatible with query {query_file_path}", - fg="red", - ), - err=True, - ) - return (False, query_file_path) - - click.echo(f"Schemas for {query_file_path} are valid.") - return (True, query_file_path) - - @schema.command( help="""Validate the query schema @@ -1200,6 +1143,9 @@ def validate_schema(name, sql_dir, project_id): """Validate the defined query schema against the query and destination table.""" query_files = _queries_matching_name_pattern(name, sql_dir, project_id) + def _validate_schema(query_file_path): + return DryRun(query_file_path).validate_schema(), query_file_path + with Pool(8) as p: result = p.map(_validate_schema, query_files, chunksize=1) diff --git a/bigquery_etl/dryrun.py b/bigquery_etl/dryrun.py index e5a30d4a26..1f8edd1edf 100644 --- a/bigquery_etl/dryrun.py +++ b/bigquery_etl/dryrun.py @@ -10,18 +10,21 @@ only dry runs can be performed. In order to reduce risk of CI or local users accidentally running queries during tests and overwriting production data, we proxy the queries through the dry run service endpoint. 
""" -import fnmatch + import glob import json import re -import sys -from argparse import ArgumentParser from enum import Enum -from multiprocessing.pool import Pool -from os.path import basename, dirname, exists, isdir -from typing import Set +from os.path import basename, dirname, exists +from pathlib import Path from urllib.request import Request, urlopen +import click +from google.cloud import bigquery + +from .metadata.parse_metadata import Metadata +from .schema import SCHEMA_FILE, Schema + try: from functools import cached_property # type: ignore except ImportError: @@ -224,11 +227,20 @@ class DryRun: "bigquery-etl-dryrun" ) - def __init__(self, sqlfile, content=None, strip_dml=False): + def __init__( + self, + sqlfile, + content=None, + strip_dml=False, + use_cloud_function=True, + client=None, + ): """Instantiate DryRun class.""" self.sqlfile = sqlfile self.content = content self.strip_dml = strip_dml + self.use_cloud_function = use_cloud_function + self.client = client if use_cloud_function or client else bigquery.Client() def get_sql(self): """Get SQL content.""" @@ -253,26 +265,54 @@ class DryRun: sql = self.content else: sql = self.get_sql() + dataset = basename(dirname(dirname(self.sqlfile))) try: - r = urlopen( - Request( - self.DRY_RUN_URL, - headers={"Content-Type": "application/json"}, - data=json.dumps( - { - "dataset": basename(dirname(dirname(self.sqlfile))), - "query": sql, - } - ).encode("utf8"), - method="POST", + if self.use_cloud_function: + r = urlopen( + Request( + self.DRY_RUN_URL, + headers={"Content-Type": "application/json"}, + data=json.dumps( + { + "dataset": dataset, + "query": sql, + } + ).encode("utf8"), + method="POST", + ) ) - ) + return json.load(r) + else: + project = basename(dirname(dirname(dirname(self.sqlfile)))) + job_config = bigquery.QueryJobConfig( + dry_run=True, + use_query_cache=False, + default_dataset=f"{project}.{dataset}", + query_parameters=[ + bigquery.ScalarQueryParameter( + "submission_date", "DATE", "2019-01-01" + ) + ], + ) + job = self.client.query(sql, job_config=job_config) + return { + "valid": True, + "referencedTables": [ + ref.to_api_repr() for ref in job.referenced_tables + ], + "schema": ( + job._properties.get("statistics", {}) + .get("query", {}) + .get("schema", {}) + ), + "datasetLabels": ( + self.client.get_dataset(job.default_dataset).labels + ), + } except Exception as e: print(f"{self.sqlfile:59} ERROR\n", e) return None - return json.load(r) - def get_referenced_tables(self): """Return referenced tables by dry running the SQL file.""" if self.sqlfile not in SKIP and not self.is_valid(): @@ -433,6 +473,65 @@ class DryRun: return Errors.DATE_FILTER_NEEDED_AND_SYNTAX return error + def validate_schema(self): + """Check whether schema is valid.""" + if self.sqlfile in SKIP or basename(self.sqlfile) == "script.sql": + print(f"\t...Ignoring dryrun results for {self.sqlfile}") + return True + + query_file_path = Path(self.sqlfile) + query_schema = Schema.from_json(self.get_schema()) + existing_schema_path = query_file_path.parent / SCHEMA_FILE + + if not existing_schema_path.is_file(): + click.echo(f"No schema file defined for {query_file_path}", err=True) + return True + + table_name = query_file_path.parent.name + dataset_name = query_file_path.parent.parent.name + project_name = query_file_path.parent.parent.parent.name + + partitioned_by = None + try: + metadata = Metadata.of_query_file(query_file_path) + + if metadata.bigquery and metadata.bigquery.time_partitioning: + partitioned_by = 
metadata.bigquery.time_partitioning.field + except FileNotFoundError: + pass + + table_schema = Schema.for_table( + project_name, dataset_name, table_name, partitioned_by + ) + + if not query_schema.compatible(table_schema): + click.echo( + click.style( + f"ERROR: Schema for query in {query_file_path} " + f"incompatible with schema deployed for " + f"{project_name}.{dataset_name}.{table_name}", + fg="red", + ), + err=True, + ) + return False + else: + existing_schema = Schema.from_schema_file(existing_schema_path) + + if not existing_schema.equal(query_schema): + click.echo( + click.style( + f"Schema defined in {existing_schema_path} " + f"incompatible with query {query_file_path}", + fg="red", + ), + err=True, + ) + return False + + click.echo(f"Schemas for {query_file_path} are valid.") + return True + def sql_file_valid(sqlfile): """Dry run SQL files.""" @@ -446,47 +545,3 @@ def find_next_word(target, source): if w == target: # get the next word, and remove quotations from column name return split[i + 1].replace("'", "") - - -def main(): - """Dry run all SQL files in the project directories.""" - parser = ArgumentParser(description=main.__doc__) - parser.add_argument( - "paths", - metavar="PATH", - nargs="*", - help="Paths to search for queries to dry run. CI passes 'sql' on the default " - "branch, and the paths that have been modified since branching otherwise", - ) - args = parser.parse_args() - - file_names = ("query.sql", "view.sql", "part*.sql", "init.sql") - file_re = re.compile("|".join(map(fnmatch.translate, file_names))) - - sql_files: Set[str] = set() - for path in args.paths: - if isdir(path): - sql_files |= { - sql_file - for pattern in file_names - for sql_file in glob.glob(f"{path}/**/{pattern}", recursive=True) - } - elif file_re.fullmatch(basename(path)): - sql_files.add(path) - sql_files -= SKIP - - if not sql_files: - print("Skipping dry run because no queries matched") - sys.exit(0) - - with Pool(8) as p: - result = p.map(sql_file_valid, sorted(sql_files), chunksize=1) - if all(result): - exitcode = 0 - else: - exitcode = 1 - sys.exit(exitcode) - - -if __name__ == "__main__": - main() diff --git a/bigquery_etl/schema/__init__.py b/bigquery_etl/schema/__init__.py index 7b8729aa13..8260da773b 100644 --- a/bigquery_etl/schema/__init__.py +++ b/bigquery_etl/schema/__init__.py @@ -1,16 +1,16 @@ """Query schema.""" -from google.cloud import bigquery import json +import os from pathlib import Path from tempfile import NamedTemporaryFile -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List, Optional import attr -import os import yaml +from google.cloud import bigquery -from bigquery_etl.dryrun import DryRun +from .. import dryrun SCHEMA_FILE = "schema.yaml" @@ -27,7 +27,7 @@ class Schema: if not query_file.is_file() or query_file.suffix != ".sql": raise Exception(f"{query_file} is not a valid SQL file.") - schema = DryRun(str(query_file), content=content).get_schema() + schema = dryrun.DryRun(str(query_file), content=content).get_schema() return cls(schema) @classmethod @@ -55,7 +55,7 @@ class Schema: try: return cls( - DryRun( + dryrun.DryRun( os.path.join(project, dataset, table, "query.sql"), query ).get_schema() ) diff --git a/script/dryrun b/script/dryrun index bbd020da90..78e0bb1e17 100755 --- a/script/dryrun +++ b/script/dryrun @@ -5,4 +5,4 @@ cd "$(dirname "$0")/.." -exec python3 -m bigquery_etl.dryrun "$@" +script/bqetl dryrun "$@"
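
A minimal usage sketch of the behavior this patch introduces, based on the `DryRun` API as it appears in the diff above; the query path used here is a hypothetical example, not one from the patch:

```python
# Sketch (assumptions: a bigquery-etl checkout on PYTHONPATH and a query file
# at the hypothetical path below). DryRun defaults to the dry run cloud
# function; pass use_cloud_function=False plus a bigquery.Client to dry run
# directly against BigQuery, as bqetl dryrun does when --use_cloud_function=False.
from bigquery_etl.dryrun import DryRun

query_file = "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql"

result = DryRun(query_file)

# is_valid() checks the dry run itself; validate_schema() additionally compares
# the dry run schema against schema.yaml and the deployed destination table.
if result.is_valid() and result.validate_schema():
    print(f"{query_file} dry runs cleanly and its schema is valid")
```

In CI this same path is exercised through `script/dryrun --validate-schemas $PATHS`, which after this patch forwards to `script/bqetl dryrun`, replacing the separate `validate-schemas` CircleCI job.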