DENG-941 initial impl of check rendering and execution. (#3885)
* Initial implementation
* Updated based on PR feedback
* Moved check from query to a separate command
* Expanded from the --partition option to a generic --parameter option
* Removed the `query check` command (check moved to the new command)
* Updated bigquery_etl/cli/check.py: removed the date param format check
* Removed the 'parameter' parameter; everything is passed through ctx.args and then converted to a dict for Jinja rendering. There are no restrictions on ctx.args values.
* Fixed a merge error

Co-authored-by: Anna Scholtz <anna@scholtzan.net>
Parent: 65365226b5
Commit: c69fee0b5f
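
As a quick orientation before the diff: a minimal sketch (not part of the commit) of driving the new command in-process with Click's CliRunner, mirroring the help example that appears in the diff below. A real run additionally needs `gcloud auth login` and a checks.sql file under the sql/ tree.

    from click.testing import CliRunner

    from bigquery_etl.cli.check import check

    # Illustrative invocation; the table name and parameter come from the
    # command's own help example further down in this diff.
    runner = CliRunner()
    result = runner.invoke(
        check,
        [
            "run",
            "ga_derived.downloads_with_attribution_v2",
            "--parameter=download_date:DATE:2023-05-01",
        ],
    )
    print(result.exit_code, result.output)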

@@ -10,6 +10,7 @@ from .._version import __version__
 # We rename the import, otherwise it affects monkeypatching in tests
 from ..cli.alchemer import alchemer as alchemer_
 from ..cli.backfill import backfill
+from ..cli.check import check
 from ..cli.dag import dag
 from ..cli.dryrun import dryrun
 from ..cli.format import format

@@ -48,6 +49,7 @@ def cli(prog_name=None):
         "copy_deduplicate": copy_deduplicate,
         "stage": stage,
         "backfill": backfill,
+        "check": check,
     }

     @click.group(commands=commands)

@@ -0,0 +1,146 @@
+"""bigquery-etl CLI check command."""
+import re
+import subprocess
+import sys
+import tempfile
+from subprocess import CalledProcessError
+
+import click
+
+from ..cli.utils import (
+    is_authenticated,
+    paths_matching_checks_pattern,
+    project_id_option,
+    sql_dir_option,
+)
+from ..util.common import render as render_template
+
+
+def _build_jinja_parameters(query_args):
+    """Convert the bqetl parameters to a dictionary for use by the Jinja template."""
+    parameters = {}
+    for query_arg in query_args:
+        param_and_value = query_arg.split("=")
+        if len(param_and_value) == 2:
+            # e.g. --parameter=download_date:DATE:2023-05-28
+            # the dict result is {"download_date": "2023-05-28"}
+            bq_parameter = param_and_value[1].split(":")
+            if len(bq_parameter) == 3:
+                if re.match(r"^\w+$", bq_parameter[0]):
+                    parameters[bq_parameter[0]] = bq_parameter[2]
+            else:
+                # e.g. --project_id=moz-fx-data-marketing-prod
+                # the dict result is {"project_id": "moz-fx-data-marketing-prod"}
+                if param_and_value[0].startswith("--"):
+                    parameters[param_and_value[0].strip("--")] = param_and_value[1]
+        else:
+            print(f"parameter {query_arg} will not be used to render Jinja template.")
+    return parameters
+
+
+def _parse_check_output(output: str) -> str:
+    output = output.replace("\n", " ")
+    if "ETL Data Check Failed:" in output:
+        return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}"
+    return output
+
+
+@click.group(
+    help="""
+        Commands for managing data checks.
+        \b
+
+        UNDER ACTIVE DEVELOPMENT. See https://mozilla-hub.atlassian.net/browse/DENG-919
+        """
+)
+@click.pass_context
+def check(ctx):
+    """Create the CLI group for the check command."""
+    # create a temporary directory that generated content is written to;
+    # the directory will be deleted automatically after the command exits
+    ctx.ensure_object(dict)
+    ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())
+
+
+@check.command(
+    help="""
+        Run ETL checks.
+        \b
+
+        Example:
+        ./bqetl check run ga_derived.downloads_with_attribution_v2 --parameter=download_date:DATE:2023-05-01
+        """,
+    context_settings=dict(
+        ignore_unknown_options=True,
+        allow_extra_args=True,
+    ),
+)
+@click.argument("name")
+@project_id_option()
+@sql_dir_option
+@click.pass_context
+def run(ctx, name, project_id, sql_dir):
+    """Run a check."""
+    if not is_authenticated():
+        click.echo(
+            "Authentication to GCP required. Run `gcloud auth login` "
+            "and check that the project is set correctly."
+        )
+        sys.exit(1)
+
+    checks_file, project_id, dataset_id, table = paths_matching_checks_pattern(
+        name, sql_dir, project_id=project_id
+    )
+
+    _run_check(
+        checks_file,
+        project_id,
+        dataset_id,
+        table,
+        ctx.args,
+    )
+
+
+def _run_check(
+    checks_file,
+    project_id,
+    dataset_id,
+    table,
+    query_arguments,
+):
+    """Run the check."""
+    if checks_file is None:
+        return
+
+    query_arguments.append("--use_legacy_sql=false")
+    if project_id is not None:
+        query_arguments.append(f"--project_id={project_id}")
+
+    # Convert all the Airflow params to a Jinja-usable dict.
+    parameters = _build_jinja_parameters(query_arguments)
+
+    jinja_params = {
+        **{"dataset_id": dataset_id, "table_name": table},
+        **parameters,
+    }
+
+    with tempfile.NamedTemporaryFile(mode="w+") as query_stream:
+        query_stream.write(
+            render_template(
+                checks_file.name,
+                template_folder=str(checks_file.parent),
+                templates_dir="",
+                format=False,
+                **jinja_params,
+            )
+        )
+        query_stream.seek(0)
+
+        # run the query as a shell command so that passed parameters can be used as-is
+        try:
+            subprocess.check_output(
+                ["bq", "query"] + query_arguments, stdin=query_stream, encoding="UTF-8"
+            )
+        except CalledProcessError as e:
+            print(_parse_check_output(e.output))
+            sys.exit(1)
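
The two helpers above are easiest to read from their inputs and outputs. An illustrative sketch, matching the expectations in the test file at the bottom of this diff:

    from bigquery_etl.cli.check import _build_jinja_parameters, _parse_check_output

    # _build_jinja_parameters: '--name=a:b:c' style args become {a: c},
    # '--flag=value' args become {flag: value}, and args without '=' are
    # reported and skipped.
    _build_jinja_parameters(
        [
            "--parameter=download_date:DATE:2023-05-28",
            "--project_id=moz-fx-data-marketing-prod",
            "--debug",  # no '=': printed as unused, not rendered
        ]
    )
    # -> {"download_date": "2023-05-28", "project_id": "moz-fx-data-marketing-prod"}

    # _parse_check_output: flatten newlines, then trim bq's output down to the
    # part starting at the failure marker.
    _parse_check_output("bq stderr noise\nETL Data Check Failed: too few rows")
    # -> "ETL Data Check Failed: too few rows"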

@@ -1,6 +1,7 @@
 """bigquery-etl CLI query command."""

 import copy
+import datetime
 import logging
 import os
 import re

@@ -1273,6 +1274,40 @@ def render(name, sql_dir, output_dir):
     click.echo(rendered_sql)


+def _parse_partition_setting(partition_date):
+    params = partition_date.split(":")
+    if len(params) != 3:
+        return None
+
+    # Check date format
+    try:
+        datetime.datetime.strptime(params[2], "%Y-%m-%d").date()
+    except ValueError:
+        return None
+
+    # Check column name
+    if re.match(r"^\w+$", params[0]):
+        return {params[0]: params[2]}
+
+
+def _validate_partition_date(ctx, param, partition_date):
+    """Process the CLI parameter partition_date and set the parameter for BigQuery."""
+    # Will be None if launched from Airflow. Also ctx.args is not populated at this stage.
+    if partition_date:
+        parsed = _parse_partition_setting(partition_date)
+        if parsed is None:
+            raise click.BadParameter("Format must be <column-name>::<yyyy-mm-dd>")
+        return parsed
+    return None
+
+
+def _parse_check_output(output: str) -> str:
+    output = output.replace("\n", " ")
+    if "ETL Data Check Failed:" in output:
+        return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}"
+    return output
+
+
 @query.group(help="Commands for managing query schemas.")
 def schema():
     """Create the CLI group for the query schema command."""
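
The partition helpers in the hunk above round-trip like this (illustrative, not part of the diff; the import assumes the module path bigquery_etl.cli.query):

    from bigquery_etl.cli.query import _parse_partition_setting

    # Valid input '<column>::<yyyy-mm-dd>' yields a {column: date} dict.
    _parse_partition_setting("submission_date::2023-06-01")
    # -> {"submission_date": "2023-06-01"}

    # Anything malformed yields None, which _validate_partition_date turns
    # into a click.BadParameter error.
    _parse_partition_setting("submission_date:2023-06-01")   # only two fields -> None
    _parse_partition_setting("submission_date::2023-13-01")  # invalid date   -> None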

@@ -17,6 +17,10 @@ QUERY_FILE_RE = re.compile(
     r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
     r"(?:query\.sql|part1\.sql|script\.sql|query\.py|view\.sql|metadata\.yaml|backfill\.yaml)$"
 )
+CHECKS_FILE_RE = re.compile(
+    r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
+    r"(?:checks\.sql)$"
+)
 QUALIFIED_TABLE_NAME_RE = re.compile(
     r"(?P<project_id>[a-zA-z0-9_-]+)\.(?P<dataset_id>[a-zA-z0-9_-]+)\.(?P<table_id>[a-zA-z0-9_-]+)"
 )

@@ -83,7 +87,27 @@ def table_matches_patterns(pattern, invert, table):
     return matching != invert


-def paths_matching_name_pattern(pattern, sql_path, project_id, files=["*.sql"]):
+def paths_matching_checks_pattern(pattern, sql_path, project_id):
+    """Return single path to checks.sql matching the name pattern."""
+    checks_files = paths_matching_name_pattern(
+        pattern, sql_path, project_id, ["checks.sql"], CHECKS_FILE_RE
+    )
+
+    if len(checks_files) == 1:
+        match = CHECKS_FILE_RE.match(str(checks_files[0]))
+        if match:
+            project = match.group(1)
+            dataset = match.group(2)
+            table = match.group(3)
+            return checks_files[0], project, dataset, table
+    else:
+        print(f"No checks.sql file found in {sql_path}/{project_id}/{pattern}")
+        return None, None, None, None
+
+
+def paths_matching_name_pattern(
+    pattern, sql_path, project_id, files=["*.sql"], file_regex=QUERY_FILE_RE
+):
     """Return paths to queries matching the name pattern."""
     matching_files = []


@@ -107,7 +131,7 @@ def paths_matching_name_pattern(pattern, sql_path, project_id, files=["*.sql"]):
             all_matching_files.extend(Path(sql_path).rglob(file))

     for query_file in all_matching_files:
-        match = QUERY_FILE_RE.match(str(query_file))
+        match = file_regex.match(str(query_file))
         if match:
             project = match.group(1)
             dataset = match.group(2)
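
A hedged sketch of the new checks-file helper above in use; the sql/ layout and the values shown in comments are illustrative assumptions, not output captured from the repo:

    from bigquery_etl.cli.utils import paths_matching_checks_pattern

    # Assumes a single matching file exists, e.g.
    # sql/moz-fx-data-marketing-prod/ga_derived/downloads_with_attribution_v2/checks.sql
    checks_file, project, dataset, table = paths_matching_checks_pattern(
        "ga_derived.downloads_with_attribution_v2",
        "sql/",
        project_id="moz-fx-data-marketing-prod",
    )
    # checks_file -> Path to that checks.sql
    # (project, dataset, table) -> ("moz-fx-data-marketing-prod", "ga_derived",
    #                               "downloads_with_attribution_v2")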

@@ -0,0 +1,2 @@
+ASSERT((SELECT COUNT(*) FROM `{{project_id}}.{{dataset_id}}.{{table_name}}` WHERE download_date = @download_date) > 250000)
+AS 'ETL Data Check Failed: Table {{project_id}}.{{dataset_id}}.{{table_name}} contains less than 250,000 rows for date: {{ download_date }}.'
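
To make the checks.sql template above concrete: a minimal sketch (using plain jinja2 rather than the repo's render helper; the parameter values are illustrative) of how it is rendered before being piped to `bq query`. Note that @download_date is left in place as a BigQuery query parameter, supplied at run time via --parameter=download_date:DATE:2023-05-01:

    from jinja2 import Template

    checks_sql = (
        "ASSERT((SELECT COUNT(*) FROM "
        "`{{project_id}}.{{dataset_id}}.{{table_name}}` "
        "WHERE download_date = @download_date) > 250000)"
    )
    print(
        Template(checks_sql).render(
            project_id="moz-fx-data-marketing-prod",
            dataset_id="ga_derived",
            table_name="downloads_with_attribution_v2",
        )
    )
    # ASSERT((SELECT COUNT(*) FROM
    # `moz-fx-data-marketing-prod.ga_derived.downloads_with_attribution_v2`
    # WHERE download_date = @download_date) > 250000)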

@@ -0,0 +1,36 @@
+import pytest
+from click.testing import CliRunner
+
+from bigquery_etl.cli.check import _build_jinja_parameters, _parse_check_output
+
+
+class TestCheck:
+    @pytest.fixture
+    def runner(self):
+        return CliRunner()
+
+    def test_parse_check_output(self):
+        expected = "ETL Data Check Failed: a check failed"
+        assert _parse_check_output(expected) == expected
+
+        test2 = "remove prepended text ETL Data Check Failed: a check failed"
+        assert _parse_check_output(test2) == expected
+
+        test3 = "no match for text Data Check Failed: a check failed"
+        assert _parse_check_output(test3) == test3
+
+    def test_build_jinja_parameters(self):
+        test = [
+            "--parameter=submission_date::2023-06-01",
+            "--parameter=id::asdf",
+            "--use_legacy_sql=false",
+            "--project_id=moz-fx-data-marketing-prod",
+            "--debug",
+        ]
+        expected = {
+            "submission_date": "2023-06-01",
+            "id": "asdf",
+            "use_legacy_sql": "false",
+            "project_id": "moz-fx-data-marketing-prod",
+        }
+        assert _build_jinja_parameters(test) == expected