diff --git a/bigquery_etl/cli/__init__.py b/bigquery_etl/cli/__init__.py
index 965e4342eb..04b86dcb1b 100644
--- a/bigquery_etl/cli/__init__.py
+++ b/bigquery_etl/cli/__init__.py
@@ -10,6 +10,7 @@ from .._version import __version__
 # We rename the import, otherwise it affects monkeypatching in tests
 from ..cli.alchemer import alchemer as alchemer_
 from ..cli.backfill import backfill
+from ..cli.check import check
 from ..cli.dag import dag
 from ..cli.dryrun import dryrun
 from ..cli.format import format
@@ -48,6 +49,7 @@ def cli(prog_name=None):
         "copy_deduplicate": copy_deduplicate,
         "stage": stage,
         "backfill": backfill,
+        "check": check,
     }
 
     @click.group(commands=commands)
diff --git a/bigquery_etl/cli/check.py b/bigquery_etl/cli/check.py
new file mode 100644
index 0000000000..bbe1167acb
--- /dev/null
+++ b/bigquery_etl/cli/check.py
@@ -0,0 +1,146 @@
+"""bigquery-etl CLI check command."""
+import re
+import subprocess
+import sys
+import tempfile
+from subprocess import CalledProcessError
+
+import click
+
+from ..cli.utils import (
+    is_authenticated,
+    paths_matching_checks_pattern,
+    project_id_option,
+    sql_dir_option,
+)
+from ..util.common import render as render_template
+
+
+def _build_jinja_parameters(query_args):
+    """Convert the bqetl parameters to a dictionary for use by the Jinja template."""
+    parameters = {}
+    for query_arg in query_args:
+        param_and_value = query_arg.split("=")
+        if len(param_and_value) == 2:
+            # e.g. --parameter=download_date:DATE:2023-05-28
+            # the dict result is {"download_date": "2023-05-28"}
+            bq_parameter = param_and_value[1].split(":")
+            if len(bq_parameter) == 3:
+                if re.match(r"^\w+$", bq_parameter[0]):
+                    parameters[bq_parameter[0]] = bq_parameter[2]
+            else:
+                # e.g. --project_id=moz-fx-data-marketing-prod
+                # the dict result is {"project_id": "moz-fx-data-marketing-prod"}
+                if param_and_value[0].startswith("--"):
+                    parameters[param_and_value[0].strip("--")] = param_and_value[1]
+                else:
+                    print(f"parameter {query_arg} will not be used to render Jinja template.")
+    return parameters
+
+
+def _parse_check_output(output: str) -> str:
+    output = output.replace("\n", " ")
+    if "ETL Data Check Failed:" in output:
+        return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}"
+    return output
+
+
+@click.group(
+    help="""
+    Commands for managing data checks.
+    \b
+
+    UNDER ACTIVE DEVELOPMENT See https://mozilla-hub.atlassian.net/browse/DENG-919
+    """
+)
+@click.pass_context
+def check(ctx):
+    """Create the CLI group for the check command."""
+    # create a temporary directory that generated content is written to;
+    # the directory will be deleted automatically after the command exits
+    ctx.ensure_object(dict)
+    ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())
+
+
+@check.command(
+    help="""
+    Run ETL checks.
+    \b
+
+    Example:
+    ./bqetl check run ga_derived.downloads_with_attribution_v2 --parameter=download_date:DATE:2023-05-01
+    """,
+    context_settings=dict(
+        ignore_unknown_options=True,
+        allow_extra_args=True,
+    ),
+)
+@click.argument("name")
+@project_id_option()
+@sql_dir_option
+@click.pass_context
+def run(ctx, name, project_id, sql_dir):
+    """Run a check."""
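+    # Checks are executed through the `bq` CLI, so valid GCP credentials are required.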
+    if not is_authenticated():
+        click.echo(
+            "Authentication to GCP required. Run `gcloud auth login` "
+            "and check that the project is set correctly."
+        )
+        sys.exit(1)
+
+    checks_file, project_id, dataset_id, table = paths_matching_checks_pattern(
+        name, sql_dir, project_id=project_id
+    )
+
+    _run_check(
+        checks_file,
+        project_id,
+        dataset_id,
+        table,
+        ctx.args,
+    )
+
+
+def _run_check(
+    checks_file,
+    project_id,
+    dataset_id,
+    table,
+    query_arguments,
+):
+    """Run the check."""
+    if checks_file is None:
+        return
+
+    query_arguments.append("--use_legacy_sql=false")
+    if project_id is not None:
+        query_arguments.append(f"--project_id={project_id}")
+
+    # Convert all the Airflow params to jinja usable dict.
+    parameters = _build_jinja_parameters(query_arguments)
+
+    jinja_params = {
+        **{"dataset_id": dataset_id, "table_name": table},
+        **parameters,
+    }
+
+    with tempfile.NamedTemporaryFile(mode="w+") as query_stream:
+        query_stream.write(
+            render_template(
+                checks_file.name,
+                template_folder=str(checks_file.parent),
+                templates_dir="",
+                format=False,
+                **jinja_params,
+            )
+        )
+        query_stream.seek(0)
+
+        # run the query as shell command so that passed parameters can be used as is
+        try:
+            subprocess.check_output(
+                ["bq", "query"] + query_arguments, stdin=query_stream, encoding="UTF-8"
+            )
+        except CalledProcessError as e:
+            print(_parse_check_output(e.output))
+            sys.exit(1)
diff --git a/bigquery_etl/cli/query.py b/bigquery_etl/cli/query.py
index 505540d968..be10bce20f 100644
--- a/bigquery_etl/cli/query.py
+++ b/bigquery_etl/cli/query.py
@@ -1,6 +1,7 @@
 """bigquery-etl CLI query command."""
 
 import copy
+import datetime
 import logging
 import os
 import re
@@ -1273,6 +1274,40 @@ def render(name, sql_dir, output_dir):
     click.echo(rendered_sql)
 
 
+def _parse_partition_setting(partition_date):
+    params = partition_date.split(":")
+    if len(params) != 3:
+        return None
+
+    # Check date format
+    try:
+        datetime.datetime.strptime(params[2], "%Y-%m-%d").date()
+    except ValueError:
+        return None
+
+    # Check column name
+    if re.match(r"^\w+$", params[0]):
+        return {params[0]: params[2]}
+
+
+def _validate_partition_date(ctx, param, partition_date):
+    """Process the CLI parameter check_date and set the parameter for BigQuery."""
+    # Will be None if launched from Airflow. Also ctx.args is not populated at this stage.
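+    # e.g. "download_date:DATE:2023-05-01" is parsed into {"download_date": "2023-05-01"}.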
+    if partition_date:
+        parsed = _parse_partition_setting(partition_date)
+        if parsed is None:
+            raise click.BadParameter("Format must be <column_name>:<type>:<date in YYYY-MM-DD>")
+        return parsed
+    return None
+
+
+def _parse_check_output(output: str) -> str:
+    output = output.replace("\n", " ")
+    if "ETL Data Check Failed:" in output:
+        return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}"
+    return output
+
+
 @query.group(help="Commands for managing query schemas.")
 def schema():
     """Create the CLI group for the query schema command."""
diff --git a/bigquery_etl/cli/utils.py b/bigquery_etl/cli/utils.py
index 78d76ff1e1..5b8c6d6e09 100644
--- a/bigquery_etl/cli/utils.py
+++ b/bigquery_etl/cli/utils.py
@@ -17,6 +17,10 @@ QUERY_FILE_RE = re.compile(
     r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
     r"(?:query\.sql|part1\.sql|script\.sql|query\.py|view\.sql|metadata\.yaml|backfill\.yaml)$"
 )
+CHECKS_FILE_RE = re.compile(
+    r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
+    r"(?:checks\.sql)$"
+)
 QUALIFIED_TABLE_NAME_RE = re.compile(
     r"(?P<project_id>[a-zA-z0-9_-]+)\.(?P<dataset_id>[a-zA-z0-9_-]+)\.(?P<table_id>[a-zA-z0-9_-]+)"
 )
@@ -83,7 +87,27 @@ def table_matches_patterns(pattern, invert, table):
     return matching != invert
 
 
-def paths_matching_name_pattern(pattern, sql_path, project_id, files=["*.sql"]):
+def paths_matching_checks_pattern(pattern, sql_path, project_id):
+    """Return single path to checks.sql matching the name pattern."""
+    checks_files = paths_matching_name_pattern(
+        pattern, sql_path, project_id, ["checks.sql"], CHECKS_FILE_RE
+    )
+
+    if len(checks_files) == 1:
+        match = CHECKS_FILE_RE.match(str(checks_files[0]))
+        if match:
+            project = match.group(1)
+            dataset = match.group(2)
+            table = match.group(3)
+            return checks_files[0], project, dataset, table
+    else:
+        print(f"No checks.sql file found in {sql_path}/{project_id}/{pattern}")
+
+    return None, None, None, None
+
+
+def paths_matching_name_pattern(
+    pattern, sql_path, project_id, files=["*.sql"], file_regex=QUERY_FILE_RE
+):
     """Return paths to queries matching the name pattern."""
     matching_files = []
 
@@ -107,7 +131,7 @@ def paths_matching_name_pattern(pattern, sql_path, project_id, files=["*.sql"]):
             all_matching_files.extend(Path(sql_path).rglob(file))
 
     for query_file in all_matching_files:
-        match = QUERY_FILE_RE.match(str(query_file))
+        match = file_regex.match(str(query_file))
         if match:
             project = match.group(1)
             dataset = match.group(2)
diff --git a/sql/moz-fx-data-marketing-prod/ga_derived/downloads_with_attribution_v2/checks.sql b/sql/moz-fx-data-marketing-prod/ga_derived/downloads_with_attribution_v2/checks.sql
new file mode 100644
index 0000000000..ccb2958954
--- /dev/null
+++ b/sql/moz-fx-data-marketing-prod/ga_derived/downloads_with_attribution_v2/checks.sql
@@ -0,0 +1,2 @@
+ASSERT((SELECT COUNT(*) FROM `{{project_id}}.{{dataset_id}}.{{table_name}}` WHERE download_date = @download_date) > 250000)
+AS 'ETL Data Check Failed: Table {{project_id}}.{{dataset_id}}.{{table_name}} contains less than 250,000 rows for date: {{ download_date }}.'
\ No newline at end of file
diff --git a/tests/cli/test_cli_check.py b/tests/cli/test_cli_check.py
new file mode 100644
index 0000000000..22eef0e401
--- /dev/null
+++ b/tests/cli/test_cli_check.py
@@ -0,0 +1,36 @@
+import pytest
+from click.testing import CliRunner
+
+from bigquery_etl.cli.check import _build_jinja_parameters, _parse_check_output
+
+
+class TestCheck:
+    @pytest.fixture
+    def runner(self):
+        return CliRunner()
+
+    def test_parse_check_output(self):
+        expected = "ETL Data Check Failed: a check failed"
+        assert _parse_check_output(expected) == expected
+
+        test2 = "remove prepended text ETL Data Check Failed: a check failed"
+        assert _parse_check_output(test2) == expected
+
+        test3 = "no match for text Data Check Failed: a check failed"
+        assert _parse_check_output(test3) == test3
+
+    def test_build_jinja_parameters(self):
+        test = [
+            "--parameter=submission_date::2023-06-01",
+            "--parameter=id::asdf",
+            "--use_legacy_sql=false",
+            "--project_id=moz-fx-data-marketing-prod",
+            "--debug",
+        ]
+        expected = {
+            "submission_date": "2023-06-01",
+            "id": "asdf",
+            "use_legacy_sql": "false",
+            "project_id": "moz-fx-data-marketing-prod",
+        }
+        assert _build_jinja_parameters(test) == expected
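+        # Note: "--debug" carries no "=<value>" pair, so _build_jinja_parameters skips it
+        # and it does not appear in the expected result above.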