From 011dbd636774e98dfc72ab9d7bd867eb8591a16a Mon Sep 17 00:00:00 2001 From: Anna Scholtz Date: Wed, 16 Jun 2021 15:26:16 -0700 Subject: [PATCH] Add sla and execution_timeout parameters to Airflow tasks --- bigquery_etl/query_scheduling/task.py | 20 ++++++++++++++++ .../query_scheduling/templates/airflow_dag.j2 | 24 +++++++++++++++++++ tests/data/dags/test_public_data_json_dag | 4 ++++ 3 files changed, 48 insertions(+) diff --git a/bigquery_etl/query_scheduling/task.py b/bigquery_etl/query_scheduling/task.py index d1803e57db..632dee719b 100644 --- a/bigquery_etl/query_scheduling/task.py +++ b/bigquery_etl/query_scheduling/task.py @@ -160,6 +160,8 @@ class Task: retry_delay: Optional[str] = attr.ib(None) retries: Optional[int] = attr.ib(None) email_on_retry: Optional[bool] = attr.ib(None) + sla: Optional[str] = attr.ib(None) # todo: set to 3h + execution_timeout: Optional[str] = attr.ib(None) # todo: set to 24h @owner.validator def validate_owner(self, attribute, value): @@ -210,6 +212,24 @@ class Task: "Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..." ) + @sla.validator + def validate_sla(self, attribute, value): + """Check that sla is in a valid timedelta format.""" + if value is not None and not is_timedelta_string(value): + raise ValueError( + f"Invalid timedelta definition for {attribute}: {value}." + "Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..." + ) + + @execution_timeout.validator + def validate_execution_timeout(self, attribute, value): + """Check that execution_timeout is in a valid timedelta format.""" + if value is not None and not is_timedelta_string(value): + raise ValueError( + f"Invalid timedelta definition for {attribute}: {value}." + "Timedeltas should be specified like: 1h, 30m, 1h15m, 1d4h45m, ..." + ) + def __attrs_post_init__(self): """Extract information from the query file name.""" query_file_re = re.search(QUERY_FILE_RE, self.query_file) diff --git a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 index b2e46012c5..d3a756bc1c 100644 --- a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 @@ -41,6 +41,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest', owner='{{ task.owner }}', email={{ task.email | sort }}, + {%+ if task.sla != None -%} + sla={{ task.sla }}, + {%+ endif -%} + {%+ if task.execution_timeout != None -%} + execution_timeout={{ task.execution_timeout }}, + {%+ endif -%} ) {%+ else -%} {{ task.task_name }} = bigquery_etl_query( @@ -85,6 +91,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None {%+ if task.email_on_retry != None -%} email_on_retry={{ task.email_on_retry }}, {%+ endif -%} + {%+ if task.sla != None -%} + sla={{ task.sla }}, + {%+ endif -%} + {%+ if task.execution_timeout != None -%} + execution_timeout={{ task.execution_timeout }}, + {%+ endif -%} dag=dag, ) {% endif -%} @@ -109,6 +121,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None check_existence=True, mode='reschedule', pool='DATA_ENG_EXTERNALTASKSENSOR', + {%+ if task.sla != None -%} + sla={{ task.sla }}, + {%+ endif -%} + {%+ if task.execution_timeout != None -%} + execution_timeout={{ task.execution_timeout }}, + {%+ endif -%} ) {{ wait_for_seen.append((dependency.dag_name, dependency.task_id)) or "" }} {% endif -%} @@ -131,6 +149,12 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None mode='reschedule', pool='DATA_ENG_EXTERNALTASKSENSOR', dag=dag, + {%+ if task.sla != None -%} + sla={{ task.sla }}, + {%+ endif -%} + {%+ if task.execution_timeout != None -%} + execution_timeout={{ task.execution_timeout }}, + {%+ endif -%} ) {{ wait_for_seen.append((task_ref.dag_name, task_ref.task_id)) or "" }} {%+ endif -%} diff --git a/tests/data/dags/test_public_data_json_dag b/tests/data/dags/test_public_data_json_dag index be9ced7404..79453c6c48 100644 --- a/tests/data/dags/test_public_data_json_dag +++ b/tests/data/dags/test_public_data_json_dag @@ -49,6 +49,8 @@ with DAG( + ["--project_id=moz-fx-data-test-project"] + ["--parameter=submission_date:DATE:{{ds}}"], image=docker_image, + sla=datetime.timedelta(minutes=5), + execution_timeout=datetime.timedelta(minutes=10), dag=dag, ) @@ -59,6 +61,8 @@ with DAG( check_existence=True, mode="reschedule", pool="DATA_ENG_EXTERNALTASKSENSOR", + sla=datetime.timedelta(minutes=5), + execution_timeout=datetime.timedelta(minutes=10), ) export_public_data_json_test__non_incremental_query__v1.set_upstream(