* Add markers to the check CLI command to differentiate warnings from hard failures

* Fix CI issues

* Fix dag generation

* Incorporate Feedback

* Generate Airflow tasks for #fail and #warn checks

---------

Co-authored-by: Alekhya Kommasani <akommasani@mozilla.com>
Co-authored-by: Alekhya <88394696+alekhyamoz@users.noreply.github.com>
Anna Scholtz 2023-09-13 10:22:39 -07:00 committed by GitHub
Parent 0cf1e2d3bb
Commit 3f79cc5151
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
19 changed files: 146 additions and 78 deletions

@@ -46,6 +46,18 @@ def _build_jinja_parameters(query_args):
return parameters
def _render_result_split_by_marker(marker, rendered_result):
"""Filter the rendered sql checks with the set marker."""
extracted_result = []
rendered_result = sqlparse.split(rendered_result)
for sql_statement in rendered_result:
sql_statement = sql_statement.strip()
if re.search(f"^#{marker}", sql_statement, re.IGNORECASE):
extracted_result.append(sql_statement)
return " ".join(extracted_result)
def _parse_check_output(output: str) -> str:
output = output.replace("\n", " ")
if "ETL Data Check Failed:" in output:
@@ -182,6 +194,7 @@ def _render(
@click.argument("dataset")
@project_id_option()
@sql_dir_option
@click.option("--marker", default="fail", help="Marker to filter checks.")
@click.option(
"--dry_run",
"--dry-run",
@@ -190,7 +203,7 @@ def _render(
help="To dry run the query to make sure it is valid",
)
@click.pass_context
def run(ctx, dataset, project_id, sql_dir, dry_run):
def run(ctx, dataset, project_id, sql_dir, marker, dry_run):
"""Run a check."""
if not is_authenticated():
click.echo(
@@ -210,6 +223,7 @@ def run(ctx, dataset, project_id, sql_dir, dry_run):
table,
ctx.args,
dry_run=dry_run,
marker=marker,
)
@@ -219,6 +233,7 @@ def _run_check(
dataset_id,
table,
query_arguments,
marker,
dry_run=False,
):
"""Run the check."""
@@ -249,8 +264,8 @@ def _run_check(
format=False,
**jinja_params,
)
checks = sqlparse.split(rendered_result)
result_split_by_marker = _render_result_split_by_marker(marker, rendered_result)
checks = sqlparse.split(result_split_by_marker)
seek_location = 0
check_failed = False
@@ -277,3 +292,6 @@ def _run_check(
if check_failed:
sys.exit(1)
# todo: add validate method -- there must always be #fail checks
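
A minimal, self-contained sketch of the new filtering behavior: with the --marker option defaulting to "fail", an invocation along the lines of "bqetl check run <dataset>.<table> --marker=warn" would execute only the #warn-prefixed statements. The entrypoint spelling and the sample SQL below are illustrative assumptions, not taken from this commit; the function body mirrors _render_result_split_by_marker above.

import re

import sqlparse


def render_result_split_by_marker(marker, rendered_result):
    """Keep only the SQL statements prefixed with #<marker>."""
    extracted = []
    for sql_statement in sqlparse.split(rendered_result):
        sql_statement = sql_statement.strip()
        # sqlparse.split() leaves the leading #fail/#warn comment attached
        # to the statement that follows it, so a prefix match suffices.
        if re.search(f"^#{marker}", sql_statement, re.IGNORECASE):
            extracted.append(sql_statement)
    return " ".join(extracted)


rendered = """#fail
ASSERT((SELECT COUNT(*) FROM my_table) > 0) AS 'table is empty';
#warn
ASSERT((SELECT COUNT(*) FROM my_table) > 100) AS 'table looks small';
"""

print(render_result_split_by_marker("fail", rendered))  # only the #fail check
print(render_result_split_by_marker("warn", rendered))  # only the #warn check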

@@ -78,7 +78,7 @@ class DagCollection:
return None
def checks_task_for_table(self, project, dataset, table):
def fail_checks_task_for_table(self, project, dataset, table):
"""Return the task that schedules the checks for the provided table."""
for dag in self.dags:
for task in dag.tasks:
@@ -87,6 +87,7 @@
and dataset == task.dataset
and table == f"{task.table}_{task.version}"
and task.is_dq_check
and task.is_dq_check_fail
):
return task

@@ -95,17 +95,44 @@ def get_dags(project_id, dags_config):
else:
if CHECKS_FILE in files:
checks_file = os.path.join(root, CHECKS_FILE)
checks_task = copy.deepcopy(
Task.of_dq_check(checks_file, dag_collection=dag_collection)
)
tasks.append(checks_task)
task_ref = TaskRef(
dag_name=task.dag_name,
task_id=task.task_name,
)
checks_task.upstream_dependencies.append(task_ref)
tasks.append(task)
# todo: validate checks file
with open(checks_file, "r") as file:
file_contents = file.read()
# check if file contains fail and warn and create checks task accordingly
checks_tasks = []
if "#fail" in file_contents:
checks_task = copy.deepcopy(
Task.of_dq_check(
checks_file,
is_check_fail=True,
dag_collection=dag_collection,
)
)
checks_tasks.append(checks_task)
if "#warn" in file_contents:
checks_task = copy.deepcopy(
Task.of_dq_check(
checks_file,
is_check_fail=False,
dag_collection=dag_collection,
)
)
checks_tasks.append(checks_task)
for checks_task in checks_tasks:
tasks.append(checks_task)
upstream_task_ref = TaskRef(
dag_name=task.dag_name,
task_id=task.task_name,
)
checks_task.upstream_dependencies.append(
upstream_task_ref
)
tasks.append(task)
else:
logging.error(
"""

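The fail/warn split above boils down to a plain substring scan of the checks file. A compact sketch of just that decision (the helper name is hypothetical; the real code builds full Task.of_dq_check tasks as shown):

def dq_check_flags(checks_file_contents: str) -> list:
    """Return one is_check_fail flag per DQ-check task to generate."""
    flags = []
    if "#fail" in checks_file_contents:
        flags.append(True)  # blocking task: a failure stops the DAG
    if "#warn" in checks_file_contents:
        flags.append(False)  # non-blocking task: a failure only warns
    return flags


assert dq_check_flags("#fail\n{{ min_rows(1) }}") == [True]
assert dq_check_flags("#fail\n...\n#warn\n...") == [True, False]
assert dq_check_flags("-- no markers") == []

A checks file containing only #fail statements therefore yields a single blocking task, which is why most regenerated DAGs below gain exactly one checks__fail_* task.
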
@@ -221,6 +221,8 @@ class Task:
destination_table: Optional[str] = attr.ib(default=DEFAULT_DESTINATION_TABLE_STR)
is_python_script: bool = attr.ib(False)
is_dq_check: bool = attr.ib(False)
# Failure of the checks task will stop the dag from executing further
is_dq_check_fail: bool = attr.ib(True)
task_concurrency: Optional[int] = attr.ib(None)
retry_delay: Optional[str] = attr.ib(None)
retries: Optional[int] = attr.ib(None)
@@ -299,7 +301,6 @@ class Task:
def __attrs_post_init__(self):
"""Extract information from the query file name."""
query_file_re = re.search(QUERY_FILE_RE, self.query_file)
check_file_re = re.search(CHECKS_FILE_RE, self.query_file)
if query_file_re:
self.project = query_file_re.group(1)
self.dataset = query_file_re.group(2)
@@ -313,14 +314,6 @@
]
self.validate_task_name(None, self.task_name)
if check_file_re is not None:
self.task_name = (
f"checks__{self.dataset}__{self.table}__{self.version}"[
-MAX_TASK_NAME_LENGTH:
]
)
self.validate_task_name(None, self.task_name)
if self.destination_table == DEFAULT_DESTINATION_TABLE_STR:
self.destination_table = f"{self.table}_{self.version}"
@@ -467,12 +460,27 @@ class Task:
return task
@classmethod
def of_dq_check(cls, query_file, metadata=None, dag_collection=None):
def of_dq_check(cls, query_file, is_check_fail, metadata=None, dag_collection=None):
"""Create a task that schedules DQ check file in Airflow."""
task = cls.of_query(query_file, metadata, dag_collection)
task.query_file_path = query_file
task.is_dq_check = True
task.is_dq_check_fail = is_check_fail
task.depends_on_fivetran = []
if task.is_dq_check_fail:
task.task_name = (
f"checks__fail_{task.dataset}__{task.table}__{task.version}"[
-MAX_TASK_NAME_LENGTH:
]
)
task.validate_task_name(None, task.task_name)
else:
task.task_name = (
f"checks__warn_{task.dataset}__{task.table}__{task.version}"[
-MAX_TASK_NAME_LENGTH:
]
)
task.validate_task_name(None, task.task_name)
return task
def to_ref(self, dag_collection):
@@ -527,7 +535,7 @@ class Task:
for table in self._get_referenced_tables():
# check if upstream task is accompanied by a check
# the task running the check will be set as the upstream task instead
checks_upstream_task = dag_collection.checks_task_for_table(
checks_upstream_task = dag_collection.fail_checks_task_for_table(
table[0], table[1], table[2]
)
upstream_task = dag_collection.task_for_table(table[0], table[1], table[2])
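
Note that the renamed fail_checks_task_for_table lookup returns only the blocking variant, so downstream tables wait on #fail checks but never on #warn checks. For reference, a sketch of the naming scheme from of_dq_check that surfaces in the regenerated DAGs below, assuming MAX_TASK_NAME_LENGTH is Airflow's 250-character task_id limit (an assumption, not confirmed by this diff):

MAX_TASK_NAME_LENGTH = 250  # assumed value; Airflow caps task_id length


def dq_check_task_name(dataset, table, version, is_check_fail):
    prefix = "fail" if is_check_fail else "warn"
    # Slice from the right so the most specific parts of the name
    # (table and version) survive truncation of overly long names.
    return f"checks__{prefix}_{dataset}__{table}__{version}"[-MAX_TASK_NAME_LENGTH:]


assert (
    dq_check_task_name("telemetry_derived", "ssl_ratios", "v1", True)
    == "checks__fail_telemetry_derived__ssl_ratios__v1"
)
assert dq_check_task_name("test", "table1", "v1", False) == "checks__warn_test__table1__v1"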

@@ -71,6 +71,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
dataset_id='{{ task.dataset }}',
project_id='{{ task.project }}',
{%+ endif -%}
is_dq_check_fail = {{ task.is_dq_check_fail }},
owner='{{ task.owner }}',
{%+ if task.email | length > 0 -%}
email={{ task.email | sort }},

@@ -43,11 +43,12 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__ga_derived__downloads_with_attribution__v2 = bigquery_dq_check(
task_id="checks__ga_derived__downloads_with_attribution__v2",
checks__fail_ga_derived__downloads_with_attribution__v2 = bigquery_dq_check(
task_id="checks__fail_ga_derived__downloads_with_attribution__v2",
source_table='downloads_with_attribution_v2${{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%Y%m%d") }}',
dataset_id="ga_derived",
project_id="moz-fx-data-marketing-prod",
is_dq_check_fail=True,
owner="gleonard@mozilla.com",
email=["gleonard@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
@@ -66,7 +67,7 @@ with DAG(
parameters=["download_date:DATE:{{macros.ds_add(ds, -1)}}"],
)
checks__ga_derived__downloads_with_attribution__v2.set_upstream(
checks__fail_ga_derived__downloads_with_attribution__v2.set_upstream(
ga_derived__downloads_with_attribution__v2
)

@@ -43,17 +43,6 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__firefox_ios_derived__app_store_funnel__v1 = bigquery_dq_check(
task_id="checks__firefox_ios_derived__app_store_funnel__v1",
source_table="app_store_funnel_v1",
dataset_id="firefox_ios_derived",
project_id="moz-fx-data-shared-prod",
owner="kik@mozilla.com",
email=["kik@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
task_concurrency=1,
)
firefox_ios_derived__app_store_funnel__v1 = bigquery_etl_query(
task_id="firefox_ios_derived__app_store_funnel__v1",
destination_table="app_store_funnel_v1",
@@ -130,10 +119,6 @@ with DAG(
email=["kik@mozilla.com", "telemetry-alerts@mozilla.com"],
)
checks__firefox_ios_derived__app_store_funnel__v1.set_upstream(
firefox_ios_derived__app_store_funnel__v1
)
wait_for_app_store_external__firefox_app_store_territory_source_type_report__v1 = ExternalTaskSensor(
task_id="wait_for_app_store_external__firefox_app_store_territory_source_type_report__v1",
external_dag_id="bqetl_fivetran_copied_tables",

@@ -48,33 +48,36 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__fivetran_costs_derived__daily_connector_costs__v1 = bigquery_dq_check(
task_id="checks__fivetran_costs_derived__daily_connector_costs__v1",
checks__fail_fivetran_costs_derived__daily_connector_costs__v1 = bigquery_dq_check(
task_id="checks__fail_fivetran_costs_derived__daily_connector_costs__v1",
source_table="daily_connector_costs_v1",
dataset_id="fivetran_costs_derived",
project_id="moz-fx-data-shared-prod",
is_dq_check_fail=True,
owner="lschiestl@mozilla.com",
email=["lschiestl@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
task_concurrency=1,
)
checks__fivetran_costs_derived__incremental_mar__v1 = bigquery_dq_check(
task_id="checks__fivetran_costs_derived__incremental_mar__v1",
checks__fail_fivetran_costs_derived__incremental_mar__v1 = bigquery_dq_check(
task_id="checks__fail_fivetran_costs_derived__incremental_mar__v1",
source_table="incremental_mar_v1",
dataset_id="fivetran_costs_derived",
project_id="moz-fx-data-shared-prod",
is_dq_check_fail=True,
owner="lschiestl@mozilla.com",
email=["lschiestl@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
task_concurrency=1,
)
checks__fivetran_costs_derived__monthly_costs__v1 = bigquery_dq_check(
task_id="checks__fivetran_costs_derived__monthly_costs__v1",
checks__fail_fivetran_costs_derived__monthly_costs__v1 = bigquery_dq_check(
task_id="checks__fail_fivetran_costs_derived__monthly_costs__v1",
source_table="monthly_costs_v1",
dataset_id="fivetran_costs_derived",
project_id="moz-fx-data-shared-prod",
is_dq_check_fail=True,
owner="lschiestl@mozilla.com",
email=["lschiestl@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
@@ -129,24 +132,24 @@ with DAG(
task_concurrency=1,
)
checks__fivetran_costs_derived__daily_connector_costs__v1.set_upstream(
checks__fail_fivetran_costs_derived__daily_connector_costs__v1.set_upstream(
fivetran_costs_derived__daily_connector_costs__v1
)
checks__fivetran_costs_derived__incremental_mar__v1.set_upstream(
checks__fail_fivetran_costs_derived__incremental_mar__v1.set_upstream(
fivetran_costs_derived__incremental_mar__v1
)
checks__fivetran_costs_derived__monthly_costs__v1.set_upstream(
checks__fail_fivetran_costs_derived__monthly_costs__v1.set_upstream(
fivetran_costs_derived__monthly_costs__v1
)
fivetran_costs_derived__daily_connector_costs__v1.set_upstream(
checks__fivetran_costs_derived__incremental_mar__v1
checks__fail_fivetran_costs_derived__incremental_mar__v1
)
fivetran_costs_derived__daily_connector_costs__v1.set_upstream(
checks__fivetran_costs_derived__monthly_costs__v1
checks__fail_fivetran_costs_derived__monthly_costs__v1
)
fivetran_costs_derived__daily_connector_costs__v1.set_upstream(

@@ -43,11 +43,12 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__telemetry_derived__ssl_ratios__v1 = bigquery_dq_check(
task_id="checks__telemetry_derived__ssl_ratios__v1",
checks__fail_telemetry_derived__ssl_ratios__v1 = bigquery_dq_check(
task_id="checks__fail_telemetry_derived__ssl_ratios__v1",
source_table="ssl_ratios_v1",
dataset_id="telemetry_derived",
project_id="moz-fx-data-shared-prod",
is_dq_check_fail=True,
owner="chutten@mozilla.com",
email=["chutten@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
@@ -65,7 +66,7 @@ with DAG(
depends_on_past=False,
)
checks__telemetry_derived__ssl_ratios__v1.set_upstream(
checks__fail_telemetry_derived__ssl_ratios__v1.set_upstream(
telemetry_derived__ssl_ratios__v1
)

@@ -1,3 +1,5 @@
#fail
ASSERT(
(
SELECT
@@ -9,4 +11,4 @@ ASSERT(
) > 50000
)
AS
'ETL Data Check Failed: Table {{project_id}}.{{dataset_id}}.{{table_name}} contains less than 50,000 rows for date: {{ download_date }}.'
'ETL Data Check Failed: Table {{project_id}}.{{dataset_id}}.{{table_name}} contains less than 50,000 rows for date: {{ download_date }}.'

@@ -1,5 +1,8 @@
#fail
{{ is_unique(columns=["destination", "measured_date", "connector", "billing_type"]) }}
#fail
{{ not_null(columns=["destination", "measured_date", "connector", "billing_type", "active_rows", "cost_in_usd"]) }}
#fail
{{ min_rows(1) }}

@@ -1,3 +1,5 @@
#fail
{{ not_null(columns=["measured_date", "measured_month", "destination_id", "connector", "table_name", "billing_type", "active_rows"]) }}
#fail
{{ min_rows(1) }}

@@ -1,5 +1,8 @@
#fail
{{ is_unique(columns=["destination_id", "measured_month"]) }}
#fail
{{ not_null(columns=["destination_id", "measured_month"]) }}
#fail
{{ min_rows(1) }}

@@ -1,4 +1,11 @@
#fail
{{ not_null(["submission_date", "os"], "submission_date = @submission_date") }}
#fail
{{ min_rows(1, "submission_date = @submission_date") }}
#fail
{{ is_unique(["submission_date", "os", "country"], "submission_date = @submission_date")}}
#fail
{{ in_range(["non_ssl_loads", "ssl_loads", "reporting_ratio"], 0, none, "submission_date = @submission_date") }}

@@ -54,7 +54,8 @@ class TestCheck:
)
expected = dedent(
"""\
"""
#fail
ASSERT(
(
SELECT

@@ -40,11 +40,12 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__test__external_table__v1 = bigquery_dq_check(
task_id="checks__test__external_table__v1",
checks__fail_test__external_table__v1 = bigquery_dq_check(
task_id="checks__fail_test__external_table__v1",
source_table="external_table_v1",
dataset_id="test",
project_id="test-project",
is_dq_check_fail=True,
owner="test@example.org",
email=["test@example.org"],
depends_on_past=False,
@@ -52,16 +53,16 @@ with DAG(
)
with TaskGroup(
"checks__test__external_table__v1_external"
) as checks__test__external_table__v1_external:
"checks__fail_test__external_table__v1_external"
) as checks__fail_test__external_table__v1_external:
ExternalTaskMarker(
task_id="bqetl_test_dag__wait_for_checks__test__external_table__v1",
task_id="bqetl_test_dag__wait_for_checks__fail_test__external_table__v1",
external_dag_id="bqetl_test_dag",
external_task_id="wait_for_checks__test__external_table__v1",
external_task_id="wait_for_checks__fail_test__external_table__v1",
)
checks__test__external_table__v1_external.set_upstream(
checks__test__external_table__v1
checks__fail_test__external_table__v1_external.set_upstream(
checks__fail_test__external_table__v1
)
test__external_table__v1 = bigquery_etl_query(
@@ -75,4 +76,4 @@ with DAG(
depends_on_past=False,
)
checks__test__external_table__v1.set_upstream(test__external_table__v1)
checks__fail_test__external_table__v1.set_upstream(test__external_table__v1)

@@ -40,11 +40,12 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__test__table1__v1 = bigquery_dq_check(
task_id="checks__test__table1__v1",
checks__fail_test__table1__v1 = bigquery_dq_check(
task_id="checks__fail_test__table1__v1",
source_table="table1_v1",
dataset_id="test",
project_id="test-project",
is_dq_check_fail=True,
owner="test@example.org",
email=["test@example.org"],
depends_on_past=False,
@@ -84,12 +85,12 @@ with DAG(
depends_on_past=False,
)
checks__test__table1__v1.set_upstream(test__table1__v1)
checks__fail_test__table1__v1.set_upstream(test__table1__v1)
wait_for_checks__test__external_table__v1 = ExternalTaskSensor(
task_id="wait_for_checks__test__external_table__v1",
wait_for_checks__fail_test__external_table__v1 = ExternalTaskSensor(
task_id="wait_for_checks__fail_test__external_table__v1",
external_dag_id="bqetl_external_test_dag",
external_task_id="checks__test__external_table__v1",
external_task_id="checks__fail_test__external_table__v1",
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
@@ -97,8 +98,8 @@ with DAG(
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
test__query__v1.set_upstream(wait_for_checks__test__external_table__v1)
test__query__v1.set_upstream(wait_for_checks__fail_test__external_table__v1)
test__query__v1.set_upstream(checks__test__table1__v1)
test__query__v1.set_upstream(checks__fail_test__table1__v1)
test__query__v1.set_upstream(test__table2__v1)

@@ -567,7 +567,8 @@ class TestDagCollection:
checks_task1 = Task.of_dq_check(
tmp_path / "test-project" / "test" / "table1_v1" / "checks.sql",
metadata,
is_check_fail=True,
metadata=metadata,
)
checks_task1.upstream_dependencies.append(table_task1_ref)
@@ -611,7 +612,8 @@ class TestDagCollection:
checks_task2 = Task.of_dq_check(
tmp_path / "test-project" / "test" / "external_table_v1" / "checks.sql",
metadata,
is_check_fail=True,
metadata=metadata,
)
checks_task2.upstream_dependencies.append(external_table_task_ref)

@@ -1,3 +1,4 @@
#fail
ASSERT(
(
SELECT