Use ExternalTaskCompletedSensor for bqetl_public_data_json DAG

This commit is contained in:
Anna Scholtz 2021-06-23 14:13:37 -07:00
Родитель 287b41b489
Коммит 1c8180b085
3 изменённых файлов: 14 добавлений и 7 удалений

Просмотреть файл

@ -1,9 +1,10 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from utils.gcp import gke_command
docs = """
@ -54,7 +55,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{% if dependency.dag_name == name -%}
{{ task.task_name }}.set_upstream({{ dependency.task_id }})
{% else -%}
wait_for_{{ dependency.task_id }} = ExternalTaskSensor(
wait_for_{{ dependency.task_id }} = ExternalTaskCompletedSensor(
task_id='wait_for_{{ dependency.task_id }}',
external_dag_id='{{ dependency.dag_name }}',
external_task_id='{{ dependency.task_id }}',
@ -63,6 +64,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{% endif -%}
check_existence=True,
mode='reschedule',
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool='DATA_ENG_EXTERNALTASKSENSOR',
)

Просмотреть файл

@ -1,9 +1,10 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from utils.gcp import gke_command
docs = """
@ -74,13 +75,14 @@ with DAG(
dag=dag,
)
wait_for_mozregression_aggregates__v1 = ExternalTaskSensor(
wait_for_mozregression_aggregates__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_mozregression_aggregates__v1",
external_dag_id="bqetl_internal_tooling",
external_task_id="mozregression_aggregates__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
@ -88,13 +90,14 @@ with DAG(
wait_for_mozregression_aggregates__v1
)
wait_for_telemetry_derived__ssl_ratios__v1 = ExternalTaskSensor(
wait_for_telemetry_derived__ssl_ratios__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__ssl_ratios__v1",
external_dag_id="bqetl_ssl_ratios",
external_task_id="telemetry_derived__ssl_ratios__v1",
execution_delta=datetime.timedelta(seconds=10800),
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool="DATA_ENG_EXTERNALTASKSENSOR",
)

Просмотреть файл

@ -1,9 +1,10 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from utils.gcp import gke_command
docs = """
@ -52,12 +53,13 @@ with DAG(
dag=dag,
)
wait_for_test__non_incremental_query__v1 = ExternalTaskSensor(
wait_for_test__non_incremental_query__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_test__non_incremental_query__v1",
external_dag_id="bqetl_core",
external_task_id="test__non_incremental_query__v1",
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool="DATA_ENG_EXTERNALTASKSENSOR",
)