feat(DENG-949): Added `render` subcommand and `--dry-run` flag to the bqetl check command (#4045)
* added render subcommand to the bqetl check command * added a dry_run flag to bqetl check run command * added a test to make sure run command exits with status code 0 * added test for check render subcommand * fixing linter checks * attempting using an alternative way of testing the render command * fixing render test by testing the _render() directly rather than the render cli wrapper * removed dead test * Apply suggestions from code review by ascholtz Co-authored-by: Anna Scholtz <anna@scholtzan.net> * fixed black and mypy errors * fixed app_store_funnel_v1 check formatting * reformatted tests checks --------- Co-authored-by: Anna Scholtz <anna@scholtzan.net>
This commit is contained in:
Родитель
6e84fe7b6a
Коммит
b927ed22be
|
@ -5,10 +5,13 @@ import sys
|
|||
import tempfile
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import click
|
||||
import sqlparse
|
||||
|
||||
from bigquery_etl.format_sql.formatter import reformat
|
||||
|
||||
from ..cli.utils import (
|
||||
is_authenticated,
|
||||
paths_matching_checks_pattern,
|
||||
|
@ -36,6 +39,9 @@ def _build_jinja_parameters(query_args):
|
|||
if param_and_value[0].startswith("--"):
|
||||
parameters[param_and_value[0].strip("--")] = param_and_value[1]
|
||||
else:
|
||||
if query_arg == "--dry_run":
|
||||
continue
|
||||
|
||||
print(f"parameter {query_arg} will not be used to render Jinja template.")
|
||||
return parameters
|
||||
|
||||
|
@ -64,6 +70,89 @@ def check(ctx):
|
|||
ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())
|
||||
|
||||
|
||||
# NOTE: Click only honours the "\b" no-rewrap marker when it sits alone on the
# first line of a paragraph, so it is placed directly above the example block
# (separated from the summary by a blank line).
@check.command(
    help="""
    Render ETL checks query. Also, renders query parameters if passed.

    \b
    Example:
    ./bqetl check render ga_derived.downloads_with_attribution_v2 --parameter=download_date:DATE:2023-05-01
    """,
    context_settings=dict(
        # Unknown options / extra args are kept in ``ctx.args`` so they can be
        # forwarded to ``_render`` as query arguments.
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.argument("name")
@project_id_option()
@sql_dir_option
@click.pass_context
def render(
    ctx: click.Context, name: str, project_id: Optional[str], sql_dir: Optional[str]
) -> None:
    """Render a check's Jinja template and echo the result.

    Args:
        ctx: Click context; its extra arguments (``ctx.args``) are forwarded
            to ``_render`` as query arguments.
        name: Identifier handed to ``paths_matching_checks_pattern`` to locate
            the checks file.
        project_id: Optional project id, passed to the path lookup.
        sql_dir: Optional SQL directory, passed to the path lookup.
    """
    checks_file, project_id, dataset_id, table = paths_matching_checks_pattern(
        name, sql_dir, project_id=project_id
    )

    click.echo(
        _render(
            checks_file,
            dataset_id,
            table,
            project_id=project_id,
            # Pass a copy so the context's own argument list is left untouched.
            query_arguments=ctx.args[:],
        )
    )
|
||||
|
||||
|
||||
def _render(
    checks_file: Path,
    dataset_id: str,
    table: str,
    project_id: Union[str, None] = None,
    query_arguments: Optional[List[str]] = None,
) -> Optional[str]:
    """Render a checks file's Jinja template and return the reformatted query.

    Args:
        checks_file: Path to the checks template; ``None`` short-circuits and
            nothing is rendered.
        dataset_id: Exposed to the template as ``dataset_id``.
        table: Exposed to the template as ``table_name``.
        project_id: When given, appended to the query arguments as
            ``--project_id=<project_id>``.
        query_arguments: Extra CLI-style arguments; they are converted to
            template parameters by ``_build_jinja_parameters``. The list passed
            in is never modified.

    Returns:
        The rendered query after ``reformat``, or ``None`` when ``checks_file``
        is ``None``.
    """
    # Function-scope import keeps this block self-contained.
    import re

    if checks_file is None:
        return None

    checks_file = Path(checks_file)

    # Build a fresh list instead of appending to the argument: a mutable
    # default would accumulate flags across calls, and appending in place
    # would leak the extra flags into the caller's list.
    query_arguments = [*(query_arguments or []), "--use_legacy_sql=false"]

    if project_id is not None:
        query_arguments.append(f"--project_id={project_id}")

    # Convert all the Airflow params to jinja usable dict.
    parameters = _build_jinja_parameters(query_arguments)

    jinja_params = {
        "dataset_id": dataset_id,
        "table_name": table,
        **parameters,
    }

    rendered_check_query = render_template(
        checks_file.name,
        template_folder=str(checks_file.parent),
        templates_dir="",
        format=False,
        **jinja_params,
    )

    # Replace query @params with the values passed via the CLI. The negative
    # lookahead stops "@submission_date" from also rewriting the prefix of a
    # longer placeholder such as "@submission_date_end". A callable replacement
    # is used so backslashes in the value are inserted literally.
    for param, value in parameters.items():
        rendered_check_query = re.sub(
            rf"@{re.escape(param)}(?!\w)",
            lambda _match, _value=value: f'"{_value}"',
            rendered_check_query,
        )

    return reformat(rendered_check_query)
|
||||
|
||||
|
||||
@check.command(
|
||||
help="""
|
||||
Run ETL checks.
|
||||
|
@ -80,8 +169,15 @@ s \b
|
|||
@click.argument("name")
|
||||
@project_id_option()
|
||||
@sql_dir_option
|
||||
@click.option(
|
||||
"--dry_run",
|
||||
"--dry-run",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="To dry run the query to make sure it is valid",
|
||||
)
|
||||
@click.pass_context
|
||||
def run(ctx, name, project_id, sql_dir):
|
||||
def run(ctx, name, project_id, sql_dir, dry_run):
|
||||
"""Run a check."""
|
||||
if not is_authenticated():
|
||||
click.echo(
|
||||
|
@ -100,6 +196,7 @@ def run(ctx, name, project_id, sql_dir):
|
|||
dataset_id,
|
||||
table,
|
||||
ctx.args,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
|
||||
|
@ -109,6 +206,7 @@ def _run_check(
|
|||
dataset_id,
|
||||
table,
|
||||
query_arguments,
|
||||
dry_run=False,
|
||||
):
|
||||
"""Run the check."""
|
||||
if checks_file is None:
|
||||
|
@ -120,6 +218,9 @@ def _run_check(
|
|||
if project_id is not None:
|
||||
query_arguments.append(f"--project_id={project_id}")
|
||||
|
||||
if dry_run is True:
|
||||
query_arguments.append("--dry_run")
|
||||
|
||||
# Convert all the Airflow params to jinja usable dict.
|
||||
parameters = _build_jinja_parameters(query_arguments)
|
||||
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
{{ is_unique(["`date`", "country"]) }}
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
{{ not_null(["submission_date", "os"], "submission_date = @submission_date") }}
|
||||
{{ min_rows(1, "submission_date = @submission_date") }}
|
||||
{{ is_unique(["submission_date", "os", "country"], "submission_date = @submission_date")}}
|
||||
{{ in_range(["non_ssl_loads", "ssl_loads", "reporting_ratio"], 0, none, "submission_date = @submission_date") }}
|
||||
|
||||
{{ min_rows(1, "submission_date = @submission_date") }}
|
||||
{{ is_unique(["submission_date", "os", "country"], "submission_date = @submission_date")}}
|
||||
{{ in_range(["non_ssl_loads", "ssl_loads", "reporting_ratio"], 0, none, "submission_date = @submission_date") }}
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
{% elif max is not none -%}
|
||||
{{ col }} > {{ max }}
|
||||
{% endif -%}
|
||||
) > 0,
|
||||
) > 0,
|
||||
"{{ col }}",
|
||||
NULL
|
||||
){% if not loop.last -%},{% endif -%}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from bigquery_etl.cli.check import _build_jinja_parameters, _parse_check_output
|
||||
from bigquery_etl.cli.check import _build_jinja_parameters, _parse_check_output, _render
|
||||
|
||||
|
||||
class TestCheck:
|
||||
|
@ -34,3 +37,36 @@ class TestCheck:
|
|||
"project_id": "moz-fx-data-marketing-prod",
|
||||
}
|
||||
assert _build_jinja_parameters(test) == expected
|
||||
|
||||
def test_check_render(self):
    """_render() fills in the template variables and the @submission_date parameter."""
    # NOTE(review): the literal below is reproduced exactly as it appears in
    # this view of the file; confirm its leading whitespace against the
    # formatter's real output.
    expected = dedent(
        """\
ASSERT(
(
SELECT
COUNT(*)
FROM
`moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6`
WHERE
submission_date = "2023-07-01"
) > 0
)
AS
'ETL Data Check Failed: Table moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6 contains 0 rows for date: 2023-07-01.'"""
    )

    template_path = Path(
        "tests/sql/moz-fx-data-shared-prod/telemetry_derived/clients_daily_v6/checks.sql"
    )
    cli_arguments = ["--parameter=submission_date:DATE:2023-07-01"]

    rendered = _render(
        checks_file=template_path,
        dataset_id="telemetry_derived",
        table="clients_daily_v6",
        project_id="moz-fx-data-shared-prod",
        query_arguments=cli_arguments,
    )

    assert rendered == expected
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
ASSERT(
|
||||
(
|
||||
SELECT
|
||||
COUNT(*)
|
||||
FROM
|
||||
`{{project_id}}.{{dataset_id}}.{{table_name}}`
|
||||
WHERE
|
||||
submission_date = @submission_date
|
||||
) > 0
|
||||
)
|
||||
AS
|
||||
'ETL Data Check Failed: Table {{project_id}}.{{dataset_id}}.{{table_name}} contains 0 rows for date: {{ submission_date }}.'
|
Загрузка…
Ссылка в новой задаче