From ddf3c9b2f7a4ccecd5bb2e3d27cf7ebc964e59e8 Mon Sep 17 00:00:00 2001 From: Sean Rose <1994030+sean-rose@users.noreply.github.com> Date: Wed, 24 Aug 2022 13:36:53 -0700 Subject: [PATCH] Use Jinja's expression statement extension. (#3180) --- bigquery_etl/query_scheduling/dag.py | 3 ++- bigquery_etl/query_scheduling/templates/airflow_dag.j2 | 8 ++++---- .../templates/public_data_json_airflow_dag.j2 | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/bigquery_etl/query_scheduling/dag.py b/bigquery_etl/query_scheduling/dag.py index 76025a9d4c..038d24ec84 100644 --- a/bigquery_etl/query_scheduling/dag.py +++ b/bigquery_etl/query_scheduling/dag.py @@ -205,7 +205,8 @@ class Dag: def _jinja_env(self): """Prepare and load custom formatters into the jinja environment.""" env = Environment( - loader=PackageLoader("bigquery_etl", "query_scheduling/templates") + loader=PackageLoader("bigquery_etl", "query_scheduling/templates"), + extensions=["jinja2.ext.do"], ) # load custom formatters into Jinja env diff --git a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 index db46b3b6fd..237f9b0dde 100644 --- a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 @@ -142,7 +142,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None execution_date="{% raw %}{{{% endraw %} (execution_date - {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}", {% endif -%} ) - {{ seenDownstreamDags.append(downstream_task.dag_name) or "" }} + {% do seenDownstreamDags.append(downstream_task.dag_name) %} {% endif -%} {% endfor -%} {% for downstream_task in task.external_downstream_tasks | sort(attribute='task_id') -%} @@ -155,7 +155,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}", {% endif -%} ) - {{ seenDownstreamDags.append(downstream_task.dag_name) or "" }} + {% do seenDownstreamDags.append(downstream_task.dag_name) %} {% endif -%} {% endfor -%} @@ -186,7 +186,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None failed_states=FAILED_STATES, pool='DATA_ENG_EXTERNALTASKSENSOR', ) - {{ wait_for_seen.append(dependency.task_key) or "" }} + {% do wait_for_seen.append(dependency.task_key) %} {% endif -%} {{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_id }}) @@ -208,7 +208,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None ) {{ fivetran_task.task_id }}_sync_wait.set_upstream({{ fivetran_task.task_id }}_sync_start) - {{ fivetran_seen.append(fivetran_task) or "" }} + {% do fivetran_seen.append(fivetran_task) %} {% endif -%} {{ task.task_name }}.set_upstream({{ fivetran_task.task_id }}_sync_wait) diff --git a/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 index 623493d7be..d07024a4ef 100644 --- a/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2 @@ -88,7 +88,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None execution_date="{% raw %}{{{% endraw %} (execution_date - {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}", {% endif -%} ) - {{ seenDownstreamDags.append(downstream_task.dag_name) or "" }} + {% do seenDownstreamDags.append(downstream_task.dag_name) %} {% endif -%} {% endfor -%} {% for downstream_task in task.external_downstream_tasks | sort(attribute='task_id') -%} @@ -101,7 +101,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}", {% endif -%} ) - {{ seenDownstreamDags.append(downstream_task.dag_name) or "" }} + {% do seenDownstreamDags.append(downstream_task.dag_name) %} {% endif -%} {% endfor -%}