From 1c8180b0856d069fb2c39bff24b6bbfa5b05e54b Mon Sep 17 00:00:00 2001 From: Anna Scholtz Date: Wed, 23 Jun 2021 14:13:37 -0700 Subject: [PATCH] Use ExternalTaskCompletedSensor for bqetl_public_data_json DAG --- .../templates/public_data_json_airflow_dag.j2 | 6 ++++-- dags/bqetl_public_data_json.py | 9 ++++++--- tests/data/dags/test_public_data_json_dag | 6 ++++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 index c41f261f25..d6ef748798 100644 --- a/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 @@ -1,9 +1,10 @@ # Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py from airflow import DAG -from airflow.operators.sensors import ExternalTaskSensor +from airflow.utils.state import State import datetime from operators.gcp_container_operator import GKEPodOperator +from operators.task_sensor import ExternalTaskCompletedSensor from utils.gcp import gke_command docs = """ @@ -54,7 +55,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None {% if dependency.dag_name == name -%} {{ task.task_name }}.set_upstream({{ dependency.task_id }}) {% else -%} - wait_for_{{ dependency.task_id }} = ExternalTaskSensor( + wait_for_{{ dependency.task_id }} = ExternalTaskCompletedSensor( task_id='wait_for_{{ dependency.task_id }}', external_dag_id='{{ dependency.dag_name }}', external_task_id='{{ dependency.task_id }}', @@ -63,6 +64,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None {% endif -%} check_existence=True, mode='reschedule', + failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED], pool='DATA_ENG_EXTERNALTASKSENSOR', ) diff --git a/dags/bqetl_public_data_json.py b/dags/bqetl_public_data_json.py index 2766f34597..365a010b7c 100644 --- a/dags/bqetl_public_data_json.py +++ b/dags/bqetl_public_data_json.py @@ -1,9 +1,10 @@ # Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py from airflow import DAG -from airflow.operators.sensors import ExternalTaskSensor +from airflow.utils.state import State import datetime from operators.gcp_container_operator import GKEPodOperator +from operators.task_sensor import ExternalTaskCompletedSensor from utils.gcp import gke_command docs = """ @@ -74,13 +75,14 @@ with DAG( dag=dag, ) - wait_for_mozregression_aggregates__v1 = ExternalTaskSensor( + wait_for_mozregression_aggregates__v1 = ExternalTaskCompletedSensor( task_id="wait_for_mozregression_aggregates__v1", external_dag_id="bqetl_internal_tooling", external_task_id="mozregression_aggregates__v1", execution_delta=datetime.timedelta(seconds=3600), check_existence=True, mode="reschedule", + failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED], pool="DATA_ENG_EXTERNALTASKSENSOR", ) @@ -88,13 +90,14 @@ with DAG( wait_for_mozregression_aggregates__v1 ) - wait_for_telemetry_derived__ssl_ratios__v1 = ExternalTaskSensor( + wait_for_telemetry_derived__ssl_ratios__v1 = ExternalTaskCompletedSensor( task_id="wait_for_telemetry_derived__ssl_ratios__v1", external_dag_id="bqetl_ssl_ratios", external_task_id="telemetry_derived__ssl_ratios__v1", execution_delta=datetime.timedelta(seconds=10800), check_existence=True, mode="reschedule", + failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED], pool="DATA_ENG_EXTERNALTASKSENSOR", ) diff --git a/tests/data/dags/test_public_data_json_dag b/tests/data/dags/test_public_data_json_dag index be9ced7404..63b52c7d62 100644 --- a/tests/data/dags/test_public_data_json_dag +++ b/tests/data/dags/test_public_data_json_dag @@ -1,9 +1,10 @@ # Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py from airflow import DAG -from airflow.operators.sensors import ExternalTaskSensor +from airflow.utils.state import State import datetime from operators.gcp_container_operator import GKEPodOperator +from operators.task_sensor import ExternalTaskCompletedSensor from utils.gcp import gke_command docs = """ @@ -52,12 +53,13 @@ with DAG( dag=dag, ) - wait_for_test__non_incremental_query__v1 = ExternalTaskSensor( + wait_for_test__non_incremental_query__v1 = ExternalTaskCompletedSensor( task_id="wait_for_test__non_incremental_query__v1", external_dag_id="bqetl_core", external_task_id="test__non_incremental_query__v1", check_existence=True, mode="reschedule", + failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED], pool="DATA_ENG_EXTERNALTASKSENSOR", )