Fix bug generating task dependencies when queries reference stable views in `mozdata`. (#3504)

This commit is contained in:
Sean Rose 2023-01-13 16:07:18 -08:00 коммит произвёл GitHub
Родитель a96e389597
Коммит 550752cf6f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файла: 54 добавления и 1 удаление

Просмотреть файл

@ -76,7 +76,7 @@ def extract_table_references_without_views(path: Path) -> Iterator[str]:
while len(parts) < 3:
parts = (ref_base.name, *parts)
ref_base = ref_base.parent
if parts[:-2] == ("moz-fx-data-shared-prod",):
if parts[:-2] in (("moz-fx-data-shared-prod",), ("mozdata",)):
if stable_views is None:
# lazy read stable views
stable_views = {

Просмотреть файл

@ -55,6 +55,19 @@ with DAG(
depends_on_past=False,
)
# Sensor on task "copy_deduplicate_all" of the external DAG "copy_deduplicate".
# ALLOWED_STATES and FAILED_STATES are defined outside this excerpt.
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
# 10800 seconds = 3 hours.
execution_delta=datetime.timedelta(seconds=10800),
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
failed_states=FAILED_STATES,
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
# Register the sensor as an upstream task of fog_decision_support_v1.
fog_decision_support_v1.set_upstream(wait_for_copy_deduplicate_all)
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",

Просмотреть файл

@ -529,6 +529,20 @@ with DAG(
depends_on_past=False,
)
# Sensor on task "copy_deduplicate_all" of the external DAG "copy_deduplicate".
# ALLOWED_STATES and FAILED_STATES are defined outside this excerpt.
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
# 3600 seconds = 1 hour.
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
failed_states=FAILED_STATES,
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
# Register the sensor as an upstream task of crashes_daily_v1.
crashes_daily_v1.set_upstream(wait_for_copy_deduplicate_all)
# Register telemetry_derived__clients_last_seen__v1 as an upstream task of
# firefox_desktop_exact_mau28_by_client_count_dimensions.
firefox_desktop_exact_mau28_by_client_count_dimensions.set_upstream(
telemetry_derived__clients_last_seen__v1
)

Просмотреть файл

@ -56,6 +56,32 @@ with DAG(
parameters=["submission_date:DATE:{{macros.ds_add(ds, -1)}}"],
)
# Sensor on task "copy_deduplicate_all" of the external DAG "copy_deduplicate".
# ALLOWED_STATES and FAILED_STATES are defined outside this excerpt.
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
# 10800 seconds = 3 hours.
execution_delta=datetime.timedelta(seconds=10800),
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
failed_states=FAILED_STATES,
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
# Register the sensor as an upstream task of sponsored_tiles_clients_daily_v1.
sponsored_tiles_clients_daily_v1.set_upstream(wait_for_copy_deduplicate_all)
# Sensor on task "copy_deduplicate_main_ping" of the external DAG
# "copy_deduplicate". ALLOWED_STATES and FAILED_STATES are defined outside
# this excerpt.
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
# 10800 seconds = 3 hours.
execution_delta=datetime.timedelta(seconds=10800),
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
failed_states=FAILED_STATES,
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
# Register the sensor as an upstream task of sponsored_tiles_clients_daily_v1.
sponsored_tiles_clients_daily_v1.set_upstream(wait_for_copy_deduplicate_main_ping)
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily_joined__v1",
external_dag_id="bqetl_main_summary",