DENG-877 Auto-generate checks during shredder mitigation (#6243)

* Reference to the shredder mitigation process during backfills.

* Missing dash.

* Auto-generate and run data checks. Validate shredder_mitigation label.

* Checks template.

* Fix tests.

* Update checks to include an additional EXCEPT, use table_id in the staging dataset, and ensure that the checks run and generate a failure that stops the backfill.
Lucia 2024-09-24 17:35:25 +02:00, committed by GitHub
Parent 5bc2904263
Commit 29bb468663
8 changed files with 927 additions and 190 deletions
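
For orientation before the file diffs: a minimal sketch (not part of this commit) of how the generator entry point added in bigquery_etl/backfill/shredder_mitigation.py can be called. The signature, constants, and output file names follow the diff below; the dataset, table, and staging-table names are hypothetical.

from google.cloud import bigquery

from bigquery_etl.backfill.shredder_mitigation import (
    SHREDDER_MITIGATION_CHECKS_NAME,
    SHREDDER_MITIGATION_QUERY_NAME,
    generate_query_with_shredder_mitigation,
)

client = bigquery.Client(project="moz-fx-data-shared-prod")
# The destination table's metadata.yaml must carry the shredder_mitigation label,
# otherwise the call prints a warning and exits. Dataset, table, and staging table
# below are hypothetical placeholders.
query_path, query_sql = generate_query_with_shredder_mitigation(
    client=client,
    project_id="moz-fx-data-shared-prod",
    dataset="telemetry_derived",
    destination_table="example_aggregates_v2",
    staging_table_name="moz-fx-data-shared-prod.tmp.example_aggregates_v2__2024_01_01",
    backfill_date="2024-01-01",
)
# Both generated files are written next to the table's query.sql:
print(query_path / f"{SHREDDER_MITIGATION_QUERY_NAME}.sql")   # shredder_mitigation_query.sql
print(query_path / f"{SHREDDER_MITIGATION_CHECKS_NAME}.sql")  # shredder_mitigation_checks.sql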

View file

@@ -2,6 +2,7 @@
import os
import re
import sys
from datetime import date
from datetime import datetime as dt
from datetime import time, timedelta
@@ -19,6 +20,7 @@ from jinja2 import Environment, FileSystemLoader
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.metadata.parse_metadata import METADATA_FILE, Metadata
from bigquery_etl.metadata.validate_metadata import SHREDDER_MITIGATION_LABEL
from bigquery_etl.schema import Schema
from bigquery_etl.util.common import extract_last_group_by_from_query, write_sql
@@ -26,12 +28,13 @@ PREVIOUS_DATE = (dt.now() - timedelta(days=2)).date()
TEMP_DATASET = "tmp"
THIS_PATH = Path(os.path.dirname(__file__))
DEFAULT_PROJECT_ID = "moz-fx-data-shared-prod"
QUERY_WITH_MITIGATION_NAME = "query_with_shredder_mitigation"
SHREDDER_MITIGATION_QUERY_NAME = "shredder_mitigation_query"
SHREDDER_MITIGATION_CHECKS_NAME = "shredder_mitigation_checks"
WILDCARD_STRING = "???????"
WILDCARD_NUMBER = -9999999
QUERY_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|query_with_shredder_mitigation\.sql|part1\.sql|script\.sql|"
r"(?:query\.sql|shredder_mitigation_query\.sql|part1\.sql|script\.sql|"
r"query\.py|view\.sql|metadata\.yaml|backfill\.yaml)$"
)
@@ -174,6 +177,18 @@ class Subset:
partitioning = {"type": None, "field": None}
return partitioning
@property
def labels(self):
"""Return the labels in the metadata of the destination table."""
metadata = Metadata.from_file(
Path("sql")
/ self.project_id
/ self.dataset
/ self.destination_table
/ METADATA_FILE
)
return metadata.labels
def generate_query(
self,
select_list,
@@ -248,18 +263,12 @@ class Subset:
query_results = query_job.result()
except NotFound as e:
raise click.ClickException(
f"Unable to query data for {backfill_date}. Table {self.full_table_id} not found."
f"Unable to query data for {backfill_date}. Table {self.full_table_id} not found"
f" or partitioning in metadata missing."
) from e
rows = [dict(row) for row in query_results]
return rows
def compare_current_and_previous_version(
self,
date_partition_parameter,
):
"""Generate and run a data check to compare existing and backfilled data."""
return NotImplemented
def get_bigquery_type(value) -> DataTypeGroup:
"""Find the datatype of a value, grouping similar types."""
@@ -441,13 +450,29 @@
def generate_query_with_shredder_mitigation(
client, project_id, dataset, destination_table, backfill_date=PREVIOUS_DATE
client,
project_id,
dataset,
destination_table,
staging_table_name,
backfill_date=PREVIOUS_DATE,
) -> Tuple[Path, str]:
"""Generate a query to backfill with shredder mitigation."""
query_with_mitigation_path = Path("sql") / project_id
# Find query files and grouping of previous and new queries.
new = Subset(client, destination_table, "new_version", dataset, project_id, None)
if SHREDDER_MITIGATION_LABEL not in new.labels:
click.echo(
click.style(
"The required label `shredder_mitigation` is missing in the metadata of the "
"table. The process will now terminate.",
fg="yellow",
)
)
sys.exit(1)
if new.version < 2:
raise click.ClickException(
f"The new version of the table is expected >= 2. Actual is {new.version}."
@@ -761,7 +786,10 @@ def generate_query_with_shredder_mitigation(
# Generate query using the template.
env = Environment(loader=FileSystemLoader(str(THIS_PATH)))
query_with_mitigation_template = env.get_template(
f"{QUERY_WITH_MITIGATION_NAME}_template.sql"
f"{SHREDDER_MITIGATION_QUERY_NAME}_template.sql"
)
checks_for_mitigation_template = env.get_template(
f"{SHREDDER_MITIGATION_CHECKS_NAME}_template.sql"
)
query_with_mitigation_sql = reformat(
@@ -780,16 +808,62 @@ def generate_query_with_shredder_mitigation(
write_sql(
output_dir=query_with_mitigation_path,
full_table_id=new.full_table_id,
basename=f"{QUERY_WITH_MITIGATION_NAME}.sql",
basename=f"{SHREDDER_MITIGATION_QUERY_NAME}.sql",
sql=query_with_mitigation_sql,
skip_existing=False,
)
# Generate checks to compare versions after each partition backfill.
checks_select = (
[new.partitioning["field"]]
+ [
dim.name
for dim in common_dimensions
if (dim.name != new.partitioning["field"])
]
+ [f"SUM({metric.name})" f" AS {metric.name}" for metric in metrics]
)
previous_checks_query = previous.generate_query(
select_list=checks_select,
from_clause=f"`{previous.full_table_id}`",
where_clause=f"{previous.partitioning['field']} = @{previous.partitioning['field']}",
group_by_clause="ALL",
)
new_checks_query = new.generate_query(
select_list=checks_select,
from_clause=f"`{staging_table_name}`",
where_clause=f"{previous.partitioning['field']} = @{previous.partitioning['field']}",
group_by_clause="ALL",
)
checks_for_mitigation_sql = reformat(
checks_for_mitigation_template.render(
previous_version_cte=previous.query_cte,
previous_version=previous_checks_query,
new_version_cte=new.query_cte,
new_version=new_checks_query,
)
)
write_sql(
output_dir=query_with_mitigation_path,
full_table_id=new.full_table_id,
basename=f"{SHREDDER_MITIGATION_CHECKS_NAME}.sql",
sql=checks_for_mitigation_sql,
skip_existing=False,
)
query_path = Path("sql") / new.project_id / new.dataset / new.destination_table
click.echo(
click.style(
f"Files for shredder mitigation written to path `{query_path}`.\n"
f"Query: `{SHREDDER_MITIGATION_QUERY_NAME}.sql`\n"
f"Data checks: `{SHREDDER_MITIGATION_CHECKS_NAME}.sql`\n",
fg="blue",
)
)
return (
Path("sql")
/ new.project_id
/ new.dataset
/ new.destination_table
/ f"{QUERY_WITH_MITIGATION_NAME}.sql",
query_path,
query_with_mitigation_sql,
)
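
To make the select list that feeds the generated checks concrete, here is an illustrative expansion of checks_select (not part of the commit) for a hypothetical table partitioned by submission_date, with one additional common dimension and one metric. The real code above iterates over Column objects and uses dim.name / metric.name; plain strings stand in for them here.

# Hypothetical inputs standing in for the Column objects used above.
partition_field = "submission_date"
common_dimensions = ["submission_date", "channel"]
metrics = ["clients"]

checks_select = (
    [partition_field]
    + [dim for dim in common_dimensions if dim != partition_field]
    + [f"SUM({metric}) AS {metric}" for metric in metrics]
)
assert checks_select == ["submission_date", "channel", "SUM(clients) AS clients"]
# Both the previous table and the staging table are aggregated with this list
# (GROUP BY ALL) and compared with EXCEPT DISTINCT in the generated checks.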

View file

@@ -0,0 +1,69 @@
-- Checks generated using a template for shredder mitigation.
-- Rows in previous version not matching in new version. Mismatches can happen when the row is
-- missing or any column doesn't match, including a single metric difference.
#fail
WITH {{ previous_version_cte | default('previous_version') }} AS (
{{ previous_version | default('SELECT 1') }}
),
{{ new_version_cte | default('new_version') }} AS (
{{ new_version | default('SELECT 1') }}
),
previous_not_matching AS (
SELECT
*
FROM
{{ previous_version_cte | default('previous_version') }}
EXCEPT DISTINCT
SELECT
*
FROM
{{ new_version_cte | default('new_version') }}
)
SELECT
IF(
(SELECT COUNT(*) FROM previous_not_matching) > 0,
ERROR(
CONCAT(
((SELECT COUNT(*) FROM previous_not_matching)),
" rows in the previous data don't match backfilled data! Run auto-generated checks for ",
"all mismatches & search for rows missing or with differences in metrics. 5 sample rows: ",
(SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM previous_not_matching LIMIT 5)))
)
),
NULL
);
-- Rows in new version not matching in previous version. It could be rows added by the process or rows with differences.
#fail
WITH {{ previous_version_cte | default('previous_version') }} AS (
{{ previous_version | default('SELECT 1') }}
),
{{ new_version_cte | default('new_version') }} AS (
{{ new_version | default('SELECT 1') }}
),
backfilled_not_matching AS (
SELECT
*
FROM
{{ new_version_cte | default('new_version') }}
EXCEPT DISTINCT
SELECT
*
FROM
{{ previous_version_cte | default('previous_version') }}
)
SELECT
IF(
(SELECT COUNT(*) FROM backfilled_not_matching) > 0,
ERROR(
CONCAT(
((SELECT COUNT(*) FROM backfilled_not_matching)),
" rows in backfill don't match previous version of data! Run auto-generated checks for ",
"all mismatches & search for rows added or with differences in metrics. 5 sample rows: ",
(SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM backfilled_not_matching LIMIT 5)))
)
),
NULL
);
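
The template expects four variables: the CTE names and the aggregated queries for the previous data and for the staged backfill. A minimal rendering sketch follows, assuming the template lives next to shredder_mitigation.py and is named after SHREDDER_MITIGATION_CHECKS_NAME as in the diff above; the table and column names are placeholders.

from jinja2 import Environment, FileSystemLoader

# Assumed location of the template within the repository.
env = Environment(loader=FileSystemLoader("bigquery_etl/backfill"))
checks_template = env.get_template("shredder_mitigation_checks_template.sql")
checks_sql = checks_template.render(
    previous_version_cte="previous",
    previous_version=(
        "SELECT submission_date, channel, SUM(clients) AS clients"
        " FROM `project.dataset.example_v1`"
        " WHERE submission_date = @submission_date GROUP BY ALL"
    ),
    new_version_cte="new_version",
    new_version=(
        "SELECT submission_date, channel, SUM(clients) AS clients"
        " FROM `project.dataset.example_v2__2024_01_01`"
        " WHERE submission_date = @submission_date GROUP BY ALL"
    ),
)
# Each #fail block wraps a non-empty EXCEPT DISTINCT result in ERROR(), so a single
# mismatched row fails the check and stops the backfill for that partition.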

View file

@@ -22,7 +22,11 @@ from ..backfill.parse import (
Backfill,
BackfillStatus,
)
from ..backfill.shredder_mitigation import generate_query_with_shredder_mitigation
from ..backfill.shredder_mitigation import (
SHREDDER_MITIGATION_CHECKS_NAME,
SHREDDER_MITIGATION_QUERY_NAME,
generate_query_with_shredder_mitigation,
)
from ..backfill.utils import (
get_backfill_backup_table_name,
get_backfill_file_from_qualified_table_name,
@@ -508,6 +512,8 @@ def _initiate_backfill(
log.info(logging_str)
custom_query_path = None
checks = None
custom_checks_name = None
if entry.shredder_mitigation is True:
click.echo(
click.style(
@@ -515,14 +521,17 @@ def _initiate_backfill(
fg="blue",
)
)
query, _ = generate_query_with_shredder_mitigation(
query_path, _ = generate_query_with_shredder_mitigation(
client=bigquery.Client(project=project),
project_id=project,
dataset=dataset,
destination_table=table,
staging_table_name=backfill_staging_qualified_table_name,
backfill_date=entry.start_date.isoformat(),
)
custom_query_path = Path(query)
custom_query_path = Path(query_path) / f"{SHREDDER_MITIGATION_QUERY_NAME}.sql"
checks = True
custom_checks_name = f"{SHREDDER_MITIGATION_CHECKS_NAME}.sql"
click.echo(
click.style(
f"Starting backfill with custom query: '{custom_query_path}'.",
@@ -532,7 +541,7 @@ def _initiate_backfill(
elif entry.custom_query_path:
custom_query_path = Path(entry.custom_query_path)
# backfill table
# Backfill table
# in the long-run we should remove the query backfill command and require a backfill entry for all backfills
try:
ctx.invoke(
@@ -545,7 +554,17 @@ def _initiate_backfill(
destination_table=backfill_staging_qualified_table_name,
parallelism=parallelism,
dry_run=dry_run,
**({"custom_query_path": custom_query_path} if custom_query_path else {}),
**(
{
k: param
for k, param in [
("custom_query_path", custom_query_path),
("checks", checks),
("checks_file_name", custom_checks_name),
]
if param is not None
}
),
billing_project=billing_project,
)
except subprocess.CalledProcessError as e:

View file

@@ -78,6 +78,7 @@ VERSION_RE = re.compile(r"_v[0-9]+")
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
DEFAULT_DAG_NAME = "bqetl_default"
DEFAULT_INIT_PARALLELISM = 10
DEFAULT_CHECKS_FILE_NAME = "checks.sql"
@click.group(help="Commands for managing queries.")
@@ -464,6 +465,7 @@ def _backfill_query(
backfill_date,
destination_table,
run_checks,
checks_file_name,
billing_project,
):
"""Run a query backfill for a specific date."""
@@ -532,7 +534,7 @@ def _backfill_query(
)
# Run checks on the query
checks_file = query_file_path.parent / "checks.sql"
checks_file = query_file_path.parent / checks_file_name
if run_checks and checks_file.exists():
table_name = checks_file.parent.name
# query_args have things like format, which we don't want to push
@@ -637,6 +639,12 @@ def _backfill_query(
help="Name of a custom query to run the backfill. If not given, the proces runs as usual.",
default=None,
)
@click.option(
"--checks_file_name",
"--checks_file_name",
help="Name of a custom data checks file to run after each partition backfill. E.g. custom_checks.sql. Optional.",
default=None,
)
@click.option(
"--scheduling_overrides",
"--scheduling-overrides",
@@ -663,6 +671,7 @@ def backfill(
parallelism,
destination_table,
checks,
checks_file_name,
custom_query_path,
scheduling_overrides,
):
@@ -760,6 +769,7 @@
partitioning_type,
destination_table=destination_table,
run_checks=checks,
checks_file_name=checks_file_name or DEFAULT_CHECKS_FILE_NAME,
billing_project=billing_project,
)
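
The new --checks_file_name option only changes which checks file is run after each backfilled partition; when it is omitted, the default checks.sql (DEFAULT_CHECKS_FILE_NAME above) is used. A small sketch of that resolution, with hypothetical paths and a helper name invented for illustration:

from pathlib import Path
from typing import Optional

DEFAULT_CHECKS_FILE_NAME = "checks.sql"

def resolve_checks_file(query_file_path: Path, checks_file_name: Optional[str]) -> Path:
    """Pick the checks file that runs after each backfilled partition."""
    return query_file_path.parent / (checks_file_name or DEFAULT_CHECKS_FILE_NAME)

# Hypothetical paths:
query_file = Path("sql/moz-fx-data-shared-prod/test/test_query_v2/query.sql")
print(resolve_checks_file(query_file, None))  # .../test_query_v2/checks.sql
print(resolve_checks_file(query_file, "shredder_mitigation_checks.sql"))  # .../shredder_mitigation_checks.sql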

View file

@@ -17,7 +17,7 @@ from bigquery_etl.util.common import TempDatasetReference, project_dirs
QUERY_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|query_with_shredder_mitigation\.sql|part1\.sql|script\.sql|query\.py|view\.sql|metadata\.yaml|backfill\.yaml)$"
r"(?:query\.sql|shredder_mitigation_query\.sql|part1\.sql|script\.sql|query\.py|view\.sql|metadata\.yaml|backfill\.yaml)$"
)
CHECKS_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"

View file

@@ -294,8 +294,8 @@ def extract_last_group_by_from_query(
"""Return the list of columns in the latest group by of a query."""
if not sql_path and not sql_text:
raise click.ClickException(
"Function extract_last_group_by_from_query() called without an "
"sql file or text to extract the group by."
"Extracting GROUP BY from query failed due to sql file"
" or sql text not available."
)
if sql_path:
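
For context, extract_last_group_by_from_query is what both query versions are parsed with before the mitigation query and checks can be built. Its exact return value is not shown in this diff, so the result below is an assumption based on the docstring ("Return the list of columns in the latest group by of a query"); only the keyword arguments visible above are used.

from bigquery_etl.util.common import extract_last_group_by_from_query

group_by = extract_last_group_by_from_query(
    sql_text="SELECT column_1, column_2, SUM(metric_1) FROM upstream GROUP BY column_1, column_2"
)
# Assumed result: ["column_1", "column_2"]. When neither sql_path nor sql_text is
# provided, the function raises the ClickException shown above instead.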

View file

@@ -14,7 +14,8 @@ from gcloud import bigquery  # type: ignore
from bigquery_etl.backfill.shredder_mitigation import (
PREVIOUS_DATE,
QUERY_WITH_MITIGATION_NAME,
SHREDDER_MITIGATION_CHECKS_NAME,
SHREDDER_MITIGATION_QUERY_NAME,
Column,
ColumnStatus,
ColumnType,
@@ -795,6 +796,40 @@ class TestSubset:
)
assert test_subset.partitioning == {"field": None, "type": None}
@patch("google.cloud.bigquery.Client")
def test_labels(self, mock_client, runner):
"""Test that partitioning type and value associated to a subset are returned as expected."""
test_subset = Subset(
mock_client,
self.destination_table,
None,
self.dataset,
self.project_id,
None,
)
with runner.isolated_filesystem():
os.makedirs(Path(self.path), exist_ok=True)
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n field: submission_date\n"
"friendly_name: Test\ndescription: Test\nlabels:\n change_controlled: true"
)
assert "change_controlled" in test_subset.labels
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"friendly_name: Test\ndescription: Test\nlabels:\n "
"incremental: true\n change_controlled: true\n shredder_mitigation: true"
)
for label in ["change_controlled", "shredder_mitigation", "incremental"]:
assert label in test_subset.labels
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write("friendly_name: Test\ndescription: Test")
assert test_subset.labels == {}
assert "shredder_mitigation" not in test_subset.labels
@patch("google.cloud.bigquery.Client")
def test_generate_query(self, mock_client):
"""Test method generate_query with expected subset queries and exceptions."""
@@ -931,10 +966,6 @@ class TestSubset:
result = test_subset.get_query_path_results(None)
assert result == expected
def test_generate_check_with_previous_version(self):
"""Test the auto-generation of data checks."""
assert True
class TestGenerateQueryWithShredderMitigation:
"""Test function generate_query_with_shredder_mitigation and returned query for backfill."""
@@ -942,6 +973,7 @@ class TestGenerateQueryWithShredderMitigation:
project_id = "moz-fx-data-shared-prod"
dataset = "test"
destination_table = "test_query_v2"
staging_table_name = f"{project_id}.{dataset}.test_query_v2__2021_01_01"
destination_table_previous = "test_query_v1"
path = Path("sql") / project_id / dataset / destination_table
path_previous = Path("sql") / project_id / dataset / destination_table_previous
@@ -959,11 +991,7 @@
"""Test that query is generated as expected given a set of mock dimensions and metrics."""
expected = (
Path("sql")
/ self.project_id
/ self.dataset
/ self.destination_table
/ f"{QUERY_WITH_MITIGATION_NAME}.sql",
Path("sql") / self.project_id / self.dataset / self.destination_table,
"""-- Query generated using a template for shredder mitigation.
WITH new_version AS (
SELECT
@@ -1062,12 +1090,14 @@ class TestGenerateQueryWithShredderMitigation:
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: submission_date\n require_partition_filter: true"
"field: submission_date\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: submission_date\n require_partition_filter: true"
"field: submission_date\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
mock_classify_columns.return_value = (
@@ -1111,142 +1141,22 @@ class TestGenerateQueryWithShredderMitigation:
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert result[0] == expected[0]
assert result[1] == expected[1].replace(" ", "")
@patch("google.cloud.bigquery.Client")
def test_missing_previous_version(self, mock_client, runner):
"""Test that the process raises an exception when previous query version is missing."""
expected_exc = (
"Function extract_last_group_by_from_query() called without an sql file or "
"text to extract the group by."
)
with runner.isolated_filesystem():
path = f"sql/{self.project_id}/{self.dataset}/{self.destination_table}"
os.makedirs(path, exist_ok=True)
with open(Path(path) / "query.sql", "w") as f:
f.write("SELECT column_1, column_2 FROM upstream_1 GROUP BY column_1")
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
backfill_date=PREVIOUS_DATE,
assert os.path.isfile(
expected[0] / f"{SHREDDER_MITIGATION_QUERY_NAME}.sql"
)
assert (str(e.value.message)) == expected_exc
assert (e.type) == ClickException
@patch("google.cloud.bigquery.Client")
def test_invalid_group_by(self, mock_client, runner):
"""Test that the process raises an exception when the GROUP BY is invalid for any query."""
expected_exc = (
"GROUP BY must use an explicit list of columns. "
"Avoid expressions like `GROUP BY ALL` or `GROUP BY 1, 2, 3`."
)
# client = bigquery.Client()
project_id = "moz-fx-data-shared-prod"
dataset = "test"
destination_table = "test_query_v2"
destination_table_previous = "test_query_v1"
# GROUP BY including a number
with runner.isolated_filesystem():
previous_group_by = "column_1, column_2, column_3"
new_group_by = "3, column_4, column_5"
path = f"sql/{project_id}/{dataset}/{destination_table}"
path_previous = f"sql/{project_id}/{dataset}/{destination_table_previous}"
os.makedirs(path, exist_ok=True)
os.makedirs(path_previous, exist_ok=True)
with open(Path(path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=project_id,
dataset=dataset,
destination_table=destination_table,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY 1, 2, 3
previous_group_by = "1, 2, 3"
new_group_by = "column_1, column_2, column_3"
with open(Path(path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=project_id,
dataset=dataset,
destination_table=destination_table,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY ALL
previous_group_by = "column_1, column_2, column_3"
new_group_by = "ALL"
with open(Path(path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=project_id,
dataset=dataset,
destination_table=destination_table,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY is missing
previous_group_by = "column_1, column_2, column_3"
with open(Path(path) / "query.sql", "w") as f:
f.write("SELECT column_1, column_2 FROM upstream_1")
with open(Path(path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=project_id,
dataset=dataset,
destination_table=destination_table,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
@patch("google.cloud.bigquery.Client")
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")
def test_generate_query_called_with_correct_parameters(
def test_generate_query_failed_for_missing_partitioning(
self, mock_classify_columns, mock_client, runner
):
"""Test that function generate_query is called with the correct parameters."""
"""Test that function raises exception for required query parameter missing
in metadata, instead of generating wrong query."""
existing_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
@@ -1264,27 +1174,31 @@ class TestGenerateQueryWithShredderMitigation:
with runner.isolated_filesystem():
os.makedirs(self.path, exist_ok=True)
os.makedirs(self.path_previous, exist_ok=True)
with open(Path(self.path) / "query.sql", "w") as f:
f.write("SELECT column_1 FROM upstream_1 GROUP BY column_1")
with open(Path(self.path) / "metadata.yaml", "w") as f:
with open(self.path / "query.sql", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: submission_date\n require_partition_filter: true"
"SELECT column_1, column_2, metric_1 FROM upstream_1"
" WHERE column_1 = @column_1 GROUP BY column_1, column_2"
)
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write("SELECT column_1 FROM upstream_1 GROUP BY column_1")
with open(self.path_previous / "query.sql", "w") as f:
f.write(
"SELECT column_1, metric_1 FROM upstream_1"
" WHERE column_1 = @column_1 GROUP BY column_1"
)
with open(self.path / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(new_schema))
with open(self.path_previous / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(existing_schema))
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write("labels:\n shredder_mitigation: true")
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: submission_date\n require_partition_filter: true"
"field: submission_date\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
with open(self.path / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(new_schema))
with open(
self.path_previous / "schema.yaml",
"w",
) as f:
f.write(yaml.safe_dump(existing_schema))
mock_classify_columns.return_value = (
[
@@ -1315,6 +1229,503 @@ class TestGenerateQueryWithShredderMitigation:
[],
)
with pytest.raises(TypeError) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert str(e.value) == "'NoneType' object is not iterable"
@patch("google.cloud.bigquery.Client")
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")
def test_generate_checks_as_expected(
self, mock_classify_columns, mock_client, runner
):
"""Test that checks are generated as expected when calling the function."""
expected = (
Path("sql") / self.project_id / self.dataset / self.destination_table,
"""-- dummy query""",
)
expected_checks_content = """-- Checks generated using a template for shredder mitigation.
-- Rows in previous version not matching in new version. Mismatches can happen when the row is
-- missing or any column doesn't match, including a single metric difference.
#fail
WITH previous AS (
SELECT
column_1,
column_2,
SUM(metric_1) AS metric_1,
SUM(metric_2) AS metric_2
FROM
`moz-fx-data-shared-prod.test.test_query_v1`
WHERE
column_1 = @column_1
GROUP BY
ALL
),
new_version AS (
SELECT
column_1,
column_2,
SUM(metric_1) AS metric_1,
SUM(metric_2) AS metric_2
FROM
`moz-fx-data-shared-prod.test.test_query_v2__2021_01_01`
WHERE
column_1 = @column_1
GROUP BY
ALL
),
previous_not_matching AS (
SELECT
*
FROM
previous
EXCEPT DISTINCT
SELECT
*
FROM
new_version
)
SELECT
IF(
(SELECT COUNT(*) FROM previous_not_matching) > 0,
ERROR(
CONCAT(
((SELECT COUNT(*) FROM previous_not_matching)),
" rows in the previous data don't match backfilled data! Run auto-generated checks for ",
"all mismatches & search for rows missing or with differences in metrics. 5 sample rows: ",
(SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM previous_not_matching LIMIT 5)))
)
),
NULL
);
-- Rows in new version not matching in previous version. It could be rows added by the process or rows with differences.
#fail
WITH previous AS (
SELECT
column_1,
column_2,
SUM(metric_1) AS metric_1,
SUM(metric_2) AS metric_2
FROM
`moz-fx-data-shared-prod.test.test_query_v1`
WHERE
column_1 = @column_1
GROUP BY
ALL
),
new_version AS (
SELECT
column_1,
column_2,
SUM(metric_1) AS metric_1,
SUM(metric_2) AS metric_2
FROM
`moz-fx-data-shared-prod.test.test_query_v2__2021_01_01`
WHERE
column_1 = @column_1
GROUP BY
ALL
),
backfilled_not_matching AS (
SELECT
*
FROM
new_version
EXCEPT DISTINCT
SELECT
*
FROM
previous
)
SELECT
IF(
(SELECT COUNT(*) FROM backfilled_not_matching) > 0,
ERROR(
CONCAT(
((SELECT COUNT(*) FROM backfilled_not_matching)),
" rows in backfill don't match previous version of data! Run auto-generated checks for ",
"all mismatches & search for rows added or with differences in metrics. 5 sample rows: ",
(SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM backfilled_not_matching LIMIT 5)))
)
),
NULL
);
"""
existing_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "column_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "metric_2", "type": "NUMERIC", "mode": "NULLABLE"},
]
}
new_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "column_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "column_3", "type": "STRING", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "metric_2", "type": "NUMERIC", "mode": "NULLABLE"},
]
}
with runner.isolated_filesystem():
os.makedirs(self.path, exist_ok=True)
os.makedirs(self.path_previous, exist_ok=True)
with open(self.path / "query.sql", "w") as f:
f.write(
"SELECT column_1, column_2, column_3, metric_1, metric_2 FROM upstream_1"
" GROUP BY column_1, column_2, column_3"
)
with open(self.path_previous / "query.sql", "w") as f:
f.write(
"SELECT column_1, column_2, metric_1, metric_2 FROM upstream_1"
" GROUP BY column_1, column_2"
)
with open(self.path / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(new_schema))
with open(self.path_previous / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(existing_schema))
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: column_1\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: column_1\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
mock_classify_columns.return_value = (
[
Column(
"column_1",
DataTypeGroup.STRING,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
),
Column(
"column_2",
DataTypeGroup.STRING,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
),
],
[
Column(
"column_3",
DataTypeGroup.STRING,
ColumnType.DIMENSION,
ColumnStatus.ADDED,
)
],
[],
[
Column(
"metric_1",
DataTypeGroup.INTEGER,
ColumnType.METRIC,
ColumnStatus.COMMON,
),
Column(
"metric_2",
DataTypeGroup.NUMERIC,
ColumnType.METRIC,
ColumnStatus.COMMON,
),
],
[],
)
with patch.object(
Subset,
"get_query_path_results",
return_value=[
{
"column_1": "2021-01-01",
"column_2": "DEF",
"column_3": "ABC",
"metric_1": 10.0,
"metric_2": 1028374.439587349875643,
}
],
):
assert os.path.isfile(self.path / "query.sql")
assert os.path.isfile(self.path_previous / "query.sql")
result = generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
checks_file = self.path / f"{SHREDDER_MITIGATION_CHECKS_NAME}.sql"
assert result[0] == expected[0]
assert os.path.isfile(checks_file)
with open(checks_file) as file:
checks_content = file.read()
# Normalize whitespace per line to avoid assert failures due to indentation.
checks_content_normalized = "\n".join(
[
line.strip()
for line in checks_content.splitlines()
if line.strip() != ""
]
).strip()
expected_checks_content_normalized = "\n".join(
[
line.strip()
for line in expected_checks_content.splitlines()
if line.strip() != ""
]
).strip()
assert checks_content_normalized == expected_checks_content_normalized
@patch("google.cloud.bigquery.Client")
def test_missing_previous_version(self, mock_client, runner):
"""Test that the process raises an exception when previous query version is missing."""
expected_exc = "Extracting GROUP BY from query failed due to sql file or sql text not available."
with runner.isolated_filesystem():
os.makedirs(self.path, exist_ok=True)
with open(Path(self.path) / "query.sql", "w") as f:
f.write("SELECT column_1, column_2 FROM upstream_1 GROUP BY column_1")
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"Friendly name: Test\n" "labels:\n shredder_mitigation: true"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
assert (e.type) == ClickException
@patch("google.cloud.bigquery.Client")
def test_invalid_group_by(self, mock_client, runner):
"""Test that the process raises an exception when the GROUP BY is invalid for any query."""
expected_exc = (
"GROUP BY must use an explicit list of columns. "
"Avoid expressions like `GROUP BY ALL` or `GROUP BY 1, 2, 3`."
)
# GROUP BY including a number
with runner.isolated_filesystem():
previous_group_by = "column_1, column_2, column_3"
new_group_by = "3, column_4, column_5"
os.makedirs(self.path, exist_ok=True)
os.makedirs(self.path_previous, exist_ok=True)
with open(Path(self.path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"Friendly name: Test\n" "labels:\n shredder_mitigation: true"
)
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write(
"Friendly name: Test\n" "labels:\n shredder_mitigation: true"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY 1, 2, 3
previous_group_by = "1, 2, 3"
new_group_by = "column_1, column_2, column_3"
with open(Path(self.path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY ALL
previous_group_by = "column_1, column_2, column_3"
new_group_by = "ALL"
with open(Path(self.path) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {new_group_by}"
)
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
# GROUP BY is missing
previous_group_by = "column_1, column_2, column_3"
with open(Path(self.path) / "query.sql", "w") as f:
f.write("SELECT column_1, column_2 FROM upstream_1")
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
f"SELECT column_1, column_2 FROM upstream_1 GROUP BY {previous_group_by}"
)
with pytest.raises(ClickException) as e:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert (str(e.value.message)) == expected_exc
@patch("google.cloud.bigquery.Client")
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")
def test_generate_query_called_with_correct_parameters(
self, mock_classify_columns, mock_client, runner
):
"""Test that function generate_query is called with the correct parameters."""
existing_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "column_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
]
}
new_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "column_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "column_3", "type": "STRING", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
]
}
with runner.isolated_filesystem():
os.makedirs(self.path, exist_ok=True)
os.makedirs(self.path_previous, exist_ok=True)
with open(Path(self.path) / "query.sql", "w") as f:
f.write(
"SELECT column_1, column_2, column_3 FROM upstream_1 "
"GROUP BY column_1, column_2, column_3"
)
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: column_1\n require_partition_filter: true"
)
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
"SELECT column_1, column_2 FROM upstream_1 GROUP BY column_1, column_2"
)
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: column_1\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write(
"bigquery:\n time_partitioning:\n type: day\n "
"field: column_1\n require_partition_filter: true\n"
"labels:\n shredder_mitigation: true"
)
with open(self.path / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(new_schema))
with open(
self.path_previous / "schema.yaml",
"w",
) as f:
f.write(yaml.safe_dump(existing_schema))
mock_classify_columns.return_value = (
[
Column(
"column_1",
DataTypeGroup.DATE,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
),
Column(
"column_2",
DataTypeGroup.STRING,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
),
],
[
Column(
"column_3",
DataTypeGroup.STRING,
ColumnType.DIMENSION,
ColumnStatus.ADDED,
)
],
[],
[
Column(
"metric_1",
DataTypeGroup.INTEGER,
ColumnType.METRIC,
ColumnStatus.COMMON,
)
],
[],
)
with patch.object(
Subset,
"get_query_path_results",
@@ -1326,15 +1737,16 @@ class TestGenerateQueryWithShredderMitigation:
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert mock_generate_query.call_count == 3
assert mock_generate_query.call_count == 5
assert mock_generate_query.call_args_list == (
[
call(
select_list=[
"submission_date",
"COALESCE(column_1, '???????') AS column_1",
"column_1",
"COALESCE(column_2, '???????') AS column_2",
"SUM(metric_1) AS metric_1",
],
from_clause="new_version",
@@ -1342,27 +1754,180 @@ class TestGenerateQueryWithShredderMitigation:
),
call(
select_list=[
"submission_date",
"COALESCE(column_1, '???????') AS column_1",
"column_1",
"COALESCE(column_2, '???????') AS column_2",
"SUM(metric_1) AS metric_1",
],
from_clause="`moz-fx-data-shared-prod.test.test_query_v1`",
where_clause="submission_date = @submission_date",
where_clause="column_1 = @column_1",
group_by_clause="ALL",
),
call(
select_list=[
"previous_agg.submission_date",
"previous_agg.column_1",
"CAST(NULL AS STRING) AS column_2",
"previous_agg.column_2",
"CAST(NULL AS STRING) AS column_3",
"COALESCE(previous_agg.metric_1, 0) - "
"COALESCE(new_agg.metric_1, 0) AS metric_1",
],
from_clause="previous_agg LEFT JOIN new_agg ON "
"previous_agg.submission_date = new_agg.submission_date"
" AND previous_agg.column_1 = new_agg.column_1 ",
"previous_agg.column_1 = new_agg.column_1"
" AND previous_agg.column_2 = new_agg.column_2 ",
where_clause="COALESCE(previous_agg.metric_1, 0) >"
" COALESCE(new_agg.metric_1, 0)",
),
call(
select_list=[
"column_1",
"column_2",
"SUM(metric_1) AS metric_1",
],
from_clause="`moz-fx-data-shared-prod.test.test_query_v1`",
where_clause="column_1 = @column_1",
group_by_clause="ALL",
),
call(
select_list=[
"column_1",
"column_2",
"SUM(metric_1) AS metric_1",
],
from_clause="`moz-fx-data-shared-prod.test.test_query_v2__2021_01_01`",
where_clause="column_1 = @column_1",
group_by_clause="ALL",
),
]
)
@patch("google.cloud.bigquery.Client")
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")
def test_generate_query_and_shredder_mitigation_label(
self, mock_classify_columns, mock_client, runner, capfd
):
"""Test that query is generated as expected given a set of mock dimensions and metrics."""
expected_failure_output = (
"The required label `shredder_mitigation` is missing in the metadata of the "
"table. The process will now terminate.\n"
)
existing_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
]
}
new_schema = {
"fields": [
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
{"name": "column_2", "type": "STRING", "mode": "NULLABLE"},
{"name": "metric_1", "type": "INTEGER", "mode": "NULLABLE"},
]
}
with runner.isolated_filesystem():
os.makedirs(self.path, exist_ok=True)
os.makedirs(self.path_previous, exist_ok=True)
with open(Path(self.path) / "query.sql", "w") as f:
f.write("SELECT column_1 FROM upstream_1 GROUP BY column_1")
with open(Path(self.path_previous) / "query.sql", "w") as f:
f.write(
"SELECT column_1, column_2 FROM upstream_1 GROUP BY column_1, column_2"
)
with open(self.path / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(new_schema))
with open(self.path_previous / "schema.yaml", "w") as f:
f.write(yaml.safe_dump(existing_schema))
# Label missing in both versions.
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write("Friendly name: Test\n")
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write("Friendly name: Test\n")
with pytest.raises(SystemExit) as result:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert result.type == SystemExit
assert result.value.code == 1
captured = capfd.readouterr()
assert expected_failure_output == captured.out
# Label missing in backfilled version generates failure even if present in previous version.
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write("Friendly name: Test\nlabels:\n shredder_mitigation: true")
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write("Friendly name: Test\n")
with pytest.raises(SystemExit) as result:
generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert result.type == SystemExit
assert result.value.code == 1
captured = capfd.readouterr()
assert expected_failure_output == captured.out
# Label missing in previous version doesn't generate failure
# as long as it's present in backfilled version.
with open(Path(self.path_previous) / "metadata.yaml", "w") as f:
f.write("Friendly name: Test\n")
with open(Path(self.path) / "metadata.yaml", "w") as f:
f.write(
"Friendly name: Test\n" "labels:\n shredder_mitigation: true"
)
mock_classify_columns.return_value = (
[
Column(
"column_1",
DataTypeGroup.DATE,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
)
],
[
Column(
"column_2",
DataTypeGroup.DATE,
ColumnType.DIMENSION,
ColumnStatus.COMMON,
)
],
[],
[
Column(
"metric_1",
DataTypeGroup.INTEGER,
ColumnType.METRIC,
ColumnStatus.COMMON,
)
],
[],
)
with patch.object(
Subset,
"get_query_path_results",
return_value=[{"column_1": "ABC", "column_2": "DEF", "metric_1": 10.0}],
):
result = generate_query_with_shredder_mitigation(
client=mock_client,
project_id=self.project_id,
dataset=self.dataset,
destination_table=self.destination_table,
staging_table_name=self.staging_table_name,
backfill_date=PREVIOUS_DATE,
)
assert result[0] == self.path