From 550752cf6f1c09e8e56e7bf7450422596971d198 Mon Sep 17 00:00:00 2001 From: Sean Rose <1994030+sean-rose@users.noreply.github.com> Date: Fri, 13 Jan 2023 16:07:18 -0800 Subject: [PATCH] Fix bug generating task dependencies when queries reference stable views in `mozdata`. (#3504) --- bigquery_etl/dependency.py | 2 +- dags/bqetl_fog_decision_support.py | 13 +++++++++++ dags/bqetl_main_summary.py | 14 +++++++++++ dags/bqetl_sponsored_tiles_clients_daily.py | 26 +++++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/bigquery_etl/dependency.py b/bigquery_etl/dependency.py index 8470738ca3..c1bdc79eb8 100644 --- a/bigquery_etl/dependency.py +++ b/bigquery_etl/dependency.py @@ -76,7 +76,7 @@ def extract_table_references_without_views(path: Path) -> Iterator[str]: while len(parts) < 3: parts = (ref_base.name, *parts) ref_base = ref_base.parent - if parts[:-2] == ("moz-fx-data-shared-prod",): + if parts[:-2] in (("moz-fx-data-shared-prod",), ("mozdata",)): if stable_views is None: # lazy read stable views stable_views = { diff --git a/dags/bqetl_fog_decision_support.py b/dags/bqetl_fog_decision_support.py index 823787a89e..4bb0829167 100644 --- a/dags/bqetl_fog_decision_support.py +++ b/dags/bqetl_fog_decision_support.py @@ -55,6 +55,19 @@ with DAG( depends_on_past=False, ) + wait_for_copy_deduplicate_all = ExternalTaskSensor( + task_id="wait_for_copy_deduplicate_all", + external_dag_id="copy_deduplicate", + external_task_id="copy_deduplicate_all", + execution_delta=datetime.timedelta(seconds=10800), + check_existence=True, + mode="reschedule", + allowed_states=ALLOWED_STATES, + failed_states=FAILED_STATES, + pool="DATA_ENG_EXTERNALTASKSENSOR", + ) + + fog_decision_support_v1.set_upstream(wait_for_copy_deduplicate_all) wait_for_copy_deduplicate_main_ping = ExternalTaskSensor( task_id="wait_for_copy_deduplicate_main_ping", external_dag_id="copy_deduplicate", diff --git a/dags/bqetl_main_summary.py b/dags/bqetl_main_summary.py index be1aeceaed..b6fb1a832d 100644 --- a/dags/bqetl_main_summary.py +++ b/dags/bqetl_main_summary.py @@ -529,6 +529,20 @@ with DAG( depends_on_past=False, ) + wait_for_copy_deduplicate_all = ExternalTaskSensor( + task_id="wait_for_copy_deduplicate_all", + external_dag_id="copy_deduplicate", + external_task_id="copy_deduplicate_all", + execution_delta=datetime.timedelta(seconds=3600), + check_existence=True, + mode="reschedule", + allowed_states=ALLOWED_STATES, + failed_states=FAILED_STATES, + pool="DATA_ENG_EXTERNALTASKSENSOR", + ) + + crashes_daily_v1.set_upstream(wait_for_copy_deduplicate_all) + firefox_desktop_exact_mau28_by_client_count_dimensions.set_upstream( telemetry_derived__clients_last_seen__v1 ) diff --git a/dags/bqetl_sponsored_tiles_clients_daily.py b/dags/bqetl_sponsored_tiles_clients_daily.py index 85a6f1700f..393029518f 100644 --- a/dags/bqetl_sponsored_tiles_clients_daily.py +++ b/dags/bqetl_sponsored_tiles_clients_daily.py @@ -56,6 +56,32 @@ with DAG( parameters=["submission_date:DATE:{{macros.ds_add(ds, -1)}}"], ) + wait_for_copy_deduplicate_all = ExternalTaskSensor( + task_id="wait_for_copy_deduplicate_all", + external_dag_id="copy_deduplicate", + external_task_id="copy_deduplicate_all", + execution_delta=datetime.timedelta(seconds=10800), + check_existence=True, + mode="reschedule", + allowed_states=ALLOWED_STATES, + failed_states=FAILED_STATES, + pool="DATA_ENG_EXTERNALTASKSENSOR", + ) + + sponsored_tiles_clients_daily_v1.set_upstream(wait_for_copy_deduplicate_all) + wait_for_copy_deduplicate_main_ping = ExternalTaskSensor( + task_id="wait_for_copy_deduplicate_main_ping", + external_dag_id="copy_deduplicate", + external_task_id="copy_deduplicate_main_ping", + execution_delta=datetime.timedelta(seconds=10800), + check_existence=True, + mode="reschedule", + allowed_states=ALLOWED_STATES, + failed_states=FAILED_STATES, + pool="DATA_ENG_EXTERNALTASKSENSOR", + ) + + sponsored_tiles_clients_daily_v1.set_upstream(wait_for_copy_deduplicate_main_ping) wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor( task_id="wait_for_telemetry_derived__clients_daily_joined__v1", external_dag_id="bqetl_main_summary",