Add trigger_rule as an option for generated airflow tasks (#3772)

* Add trigger_rule as an option for generated airflow tasks

* Add test

* Move trigger rule options to enum and add to documentation
Curtis Morales 2023-06-27 13:58:52 -04:00 committed by GitHub
Parent d98ae6bf2e
Commit b7b1b835ba
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 0 deletions

View file

@@ -3,6 +3,7 @@
import logging
import os
import re
from enum import Enum
from fnmatch import fnmatchcase
from pathlib import Path
from typing import List, Optional, Tuple
@@ -32,6 +33,19 @@ DEFAULT_DESTINATION_TABLE_STR = "use-default-destination-table"
MAX_TASK_NAME_LENGTH = 250
class TriggerRule(Enum):
    """Options for task trigger rules."""

    ALL_SUCCESS = "all_success"
    ALL_FAILED = "all_failed"
    ALL_DONE = "all_done"
    ONE_FAILED = "one_failed"
    ONE_SUCCESS = "one_success"
    NONE_FAILED = "none_failed"
    NONE_SKIPPED = "none_skipped"
    DUMMY = "dummy"
class TaskParseException(Exception):
    """Raised when task scheduling config is invalid."""
@@ -186,6 +200,8 @@ class Task:
    # manually specified upstream dependencies
    depends_on: List[TaskRef] = attr.ib([])
    depends_on_fivetran: List[FivetranTask] = attr.ib([])
    # task trigger rule, used to override the default of "all_success"
    trigger_rule: Optional[str] = attr.ib(None)
    # manually specified downstream dependencies
    external_downstream_tasks: List[TaskRef] = attr.ib([])
    # automatically determined upstream and downstream dependencies
@@ -256,6 +272,15 @@ class Task:
f"The task name has to be 1 to {MAX_TASK_NAME_LENGTH} characters long." f"The task name has to be 1 to {MAX_TASK_NAME_LENGTH} characters long."
) )
    @trigger_rule.validator
    def validate_trigger_rule(self, attribute, value):
        """Check that trigger_rule is a valid option."""
        if value is not None and value not in set(rule.value for rule in TriggerRule):
            raise ValueError(
                f"Invalid trigger rule {value}. "
                "See https://airflow.apache.org/docs/apache-airflow/1.10.3/concepts.html#trigger-rules "
                "for a list of trigger rules."
            )
    @retry_delay.validator
    def validate_retry_delay(self, attribute, value):
        """Check that retry_delay is in a valid timedelta format."""

View file

@@ -90,6 +90,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
        table_partition_template='{{ task.table_partition_template }}',
        {%+ endif -%}
        depends_on_past={{ task.depends_on_past }},
        {%+ if task.trigger_rule -%}
        trigger_rule="{{ task.trigger_rule }}",
        {%+ endif -%}
        {%+ if (
            task.destination_table
            and not task.date_partition_parameter
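To make the template change concrete, a hedged rendering sketch using jinja2 directly; the inline fragment and the plain dict standing in for the real task object are illustrative only:

```python
# Renders only the fragment added above; a dict stands in for the Task object.
from jinja2 import Template

fragment = Template(
    '{%+ if task.trigger_rule -%}trigger_rule="{{ task.trigger_rule }}",{%+ endif -%}'
)

print(fragment.render(task={"trigger_rule": "all_done"}))
# -> trigger_rule="all_done",

print(repr(fragment.render(task={"trigger_rule": None})))
# -> '' (the kwarg is omitted, so Airflow falls back to its all_success default)
```

Guarding the kwarg with `{%+ if task.trigger_rule -%}` means DAGs generated for tasks that never set the option come out unchanged.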

View file

@@ -44,6 +44,7 @@
- `task_id`: name of task query depends on
- `dag_name`: name of the DAG the external task is part of
- `execution_delta`: time difference between the `schedule_intervals` of the external DAG and the DAG the query is part of
- `trigger_rule`: The rule that determines when the Airflow task that runs this query should run. The default is `all_success` ("trigger this task when all directly upstream tasks have succeeded"); other rules can allow a task to run even if not all preceding tasks have succeeded (see the sketch below). See [the Airflow docs](https://airflow.apache.org/docs/apache-airflow/1.10.3/concepts.html?highlight=trigger%20rule#trigger-rules) for the list of trigger rule options.
- `destination_table`: The table to write to. If unspecified, defaults to the query destination; if None, no destination table is used (the query is simply run as-is). Note that if no destination table is specified, you will need to specify the `submission_date` parameter manually
- `external_downstream_tasks` defines external downstream dependencies for which [`ExternalTaskMarker`s](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external_task_sensor.html#externaltaskmarker) will be added to the generated DAG. These task markers ensure that when the task is cleared for triggering a rerun, all downstream tasks are automatically cleared as well.
```yaml
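For readers unfamiliar with trigger rules, a hedged Airflow 1.10-style sketch of the behavior the option controls; the DAG and operators below are placeholders, not generated bqetl code:

```python
# Placeholder DAG illustrating trigger_rule semantics; not generated code.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("example_dag", start_date=datetime(2023, 1, 1), schedule_interval=None):
    upstream = DummyOperator(task_id="upstream")
    # With the default "all_success", cleanup is skipped when upstream fails;
    # "all_done" makes it run once upstream finishes, pass or fail.
    cleanup = DummyOperator(task_id="cleanup", trigger_rule="all_done")
    upstream >> cleanup
```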

View file

@@ -757,6 +757,34 @@ class TestTask:
        assert task.depends_on[1].task_id == "external_task2"
        assert task.depends_on[1].execution_delta == "15m"
    def test_task_trigger_rule(self):
        query_file = (
            TEST_DIR
            / "data"
            / "test_sql"
            / "moz-fx-data-test-project"
            / "test"
            / "incremental_query_v1"
            / "query.sql"
        )

        task = Task(
            dag_name="bqetl_test_dag",
            owner="test@example.org",
            query_file=str(query_file),
            trigger_rule="all_success",
        )
        assert task.trigger_rule == "all_success"

        with pytest.raises(ValueError, match=r"Invalid trigger rule an_invalid_rule"):
            Task(
                dag_name="bqetl_test_dag",
                owner="test@example.org",
                query_file=str(query_file),
                trigger_rule="an_invalid_rule",
            )
    def test_task_ref(self):
        task_ref = TaskRef(dag_name="test_dag", task_id="task")