Add sla and execution_timeout parameters to Airflow tasks

This commit is contained in:
Anna Scholtz 2021-06-16 15:26:16 -07:00
Родитель 6532fee02c
Коммит 011dbd6367
3 изменённых файлов: 48 добавлений и 0 удалений

Просмотреть файл

@ -160,6 +160,8 @@ class Task:
retry_delay: Optional[str] = attr.ib(None)
retries: Optional[int] = attr.ib(None)
email_on_retry: Optional[bool] = attr.ib(None)
sla: Optional[str] = attr.ib(None) # todo: set to 3h
execution_timeout: Optional[str] = attr.ib(None) # todo: set to 24h
@owner.validator
def validate_owner(self, attribute, value):
@ -210,6 +212,24 @@ class Task:
"Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..."
)
@sla.validator
def validate_sla(self, attribute, value):
"""Check that sla is in a valid timedelta format."""
if value is not None and not is_timedelta_string(value):
raise ValueError(
f"Invalid timedelta definition for {attribute}: {value}."
"Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..."
)
@execution_timeout.validator
def validate_execution_timeout(self, attribute, value):
"""Check that execution_timeout is in a valid timedelta format."""
if value is not None and not is_timedelta_string(value):
raise ValueError(
f"Invalid timedelta definition for {attribute}: {value}."
"Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..."
)
def __attrs_post_init__(self):
"""Extract information from the query file name."""
query_file_re = re.search(QUERY_FILE_RE, self.query_file)

Просмотреть файл

@ -41,6 +41,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest',
owner='{{ task.owner }}',
email={{ task.email | sort }},
{%+ if task.sla != None -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)
{%+ else -%}
{{ task.task_name }} = bigquery_etl_query(
@ -85,6 +91,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{%+ if task.email_on_retry != None -%}
email_on_retry={{ task.email_on_retry }},
{%+ endif -%}
{%+ if task.sla != None -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
dag=dag,
)
{% endif -%}
@ -109,6 +121,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
check_existence=True,
mode='reschedule',
pool='DATA_ENG_EXTERNALTASKSENSOR',
{%+ if task.sla != None -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)
{{ wait_for_seen.append((dependency.dag_name, dependency.task_id)) or "" }}
{% endif -%}
@ -131,6 +149,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
mode='reschedule',
pool='DATA_ENG_EXTERNALTASKSENSOR',
dag=dag,
{%+ if task.sla != None -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)
{{ wait_for_seen.append((task_ref.dag_name, task_ref.task_id)) or "" }}
{%+ endif -%}

Просмотреть файл

@ -49,6 +49,8 @@ with DAG(
+ ["--project_id=moz-fx-data-test-project"]
+ ["--parameter=submission_date:DATE:{{ds}}"],
image=docker_image,
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
dag=dag,
)
@ -59,6 +61,8 @@ with DAG(
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
)
export_public_data_json_test__non_incremental_query__v1.set_upstream(