Test sla and execution_timeout on public data dag

This commit is contained in:
Anna Scholtz 2021-06-16 15:26:34 -07:00
Родитель 011dbd6367
Коммит e1f530d168
4 изменённых файлов: 33 добавлений и 14 удалений

Просмотреть файл

@ -29,7 +29,7 @@ default_args = {{
format_attr("retry_delay", "format_timedelta")
}}
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs) as dag:
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval is not none -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs) as dag:
{% for task in tasks | sort(attribute='task_name') %}
{% if task.is_python_script -%}
{{ task.task_name }} = gke_command(
@ -41,10 +41,10 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest',
owner='{{ task.owner }}',
email={{ task.email | sort }},
{%+ if task.sla != None -%}
{%+ if task.sla is not none -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
{%+ if task.execution_timeout is not none -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)
@ -82,19 +82,19 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{%+ if task.priority -%}
priority_weight={{ task.priority }},
{%+ endif -%}
{%+ if task.retry_delay != None -%}
{%+ if task.retry_delay is not none -%}
retry_delay={{ task.retry_delay | format_timedelta | format_repr }},
{%+ endif -%}
{%+ if task.retries != None -%}
{%+ if task.retries is not none -%}
retries={{ task.retries }},
{%+ endif -%}
{%+ if task.email_on_retry != None -%}
{%+ if task.email_on_retry is not none -%}
email_on_retry={{ task.email_on_retry }},
{%+ endif -%}
{%+ if task.sla != None -%}
{%+ if task.sla is not none -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
{%+ if task.execution_timeout is not none -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
dag=dag,
@ -121,10 +121,10 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
check_existence=True,
mode='reschedule',
pool='DATA_ENG_EXTERNALTASKSENSOR',
{%+ if task.sla != None -%}
{%+ if task.sla is not none -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
{%+ if task.execution_timeout is not none -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)
@ -149,10 +149,10 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
mode='reschedule',
pool='DATA_ENG_EXTERNALTASKSENSOR',
dag=dag,
{%+ if task.sla != None -%}
{%+ if task.sla is not none -%}
sla={{ task.sla }},
{%+ endif -%}
{%+ if task.execution_timeout != None -%}
{%+ if task.execution_timeout is not none -%}
execution_timeout={{ task.execution_timeout }},
{%+ endif -%}
)

Просмотреть файл

@ -30,7 +30,7 @@ default_args = {{
format_attr("retry_delay", "format_timedelta")
}}
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs) as dag:
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval is not none -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs) as dag:
docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"
{% for task in tasks | sort(attribute='task_name') %}
{{ task.task_name }} = GKEPodOperator(
@ -45,6 +45,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
+ ["--parameter={{ task.date_partition_parameter }}:DATE:{% raw %}{{ds}}{% endraw -%}"]
{%- endif -%},
image=docker_image,
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
dag=dag
)
{% endfor -%}
@ -64,7 +67,9 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
check_existence=True,
mode='reschedule',
pool='DATA_ENG_EXTERNALTASKSENSOR',
sla=datetime.timedelta(hours=3)
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
)
{{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_id }})

Просмотреть файл

@ -56,6 +56,9 @@ with DAG(
+ ["--project_id=moz-fx-data-shared-prod"]
+ ["--parameter=submission_date:DATE:{{ds}}"],
image=docker_image,
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
dag=dag,
)
@ -71,6 +74,9 @@ with DAG(
+ ["--project_id=moz-fx-data-shared-prod"]
+ ["--parameter=submission_date:DATE:{{ds}}"],
image=docker_image,
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
dag=dag,
)
@ -82,6 +88,9 @@ with DAG(
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
)
export_public_data_json_mozregression_aggregates__v1.set_upstream(
@ -96,6 +105,9 @@ with DAG(
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
)
export_public_data_json_telemetry_derived__ssl_ratios__v1.set_upstream(

Просмотреть файл

@ -51,6 +51,7 @@ with DAG(
image=docker_image,
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
dag=dag,
)
@ -63,6 +64,7 @@ with DAG(
pool="DATA_ENG_EXTERNALTASKSENSOR",
sla=datetime.timedelta(minutes=5),
execution_timeout=datetime.timedelta(minutes=10),
email_on_retry=False,
)
export_public_data_json_test__non_incremental_query__v1.set_upstream(