Use Jinja's expression statement extension. (#3180)
This commit is contained in:
Родитель
0fef8be958
Коммит
ddf3c9b2f7
|
@ -205,7 +205,8 @@ class Dag:
|
|||
def _jinja_env(self):
|
||||
"""Prepare and load custom formatters into the jinja environment."""
|
||||
env = Environment(
|
||||
loader=PackageLoader("bigquery_etl", "query_scheduling/templates")
|
||||
loader=PackageLoader("bigquery_etl", "query_scheduling/templates"),
|
||||
extensions=["jinja2.ext.do"],
|
||||
)
|
||||
|
||||
# load custom formatters into Jinja env
|
||||
|
|
|
@ -142,7 +142,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
execution_date="{% raw %}{{{% endraw %} (execution_date - {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
|
||||
{% endif -%}
|
||||
)
|
||||
{{ seenDownstreamDags.append(downstream_task.dag_name) or "" }}
|
||||
{% do seenDownstreamDags.append(downstream_task.dag_name) %}
|
||||
{% endif -%}
|
||||
{% endfor -%}
|
||||
{% for downstream_task in task.external_downstream_tasks | sort(attribute='task_id') -%}
|
||||
|
@ -155,7 +155,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
|
||||
{% endif -%}
|
||||
)
|
||||
{{ seenDownstreamDags.append(downstream_task.dag_name) or "" }}
|
||||
{% do seenDownstreamDags.append(downstream_task.dag_name) %}
|
||||
{% endif -%}
|
||||
{% endfor -%}
|
||||
|
||||
|
@ -186,7 +186,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
failed_states=FAILED_STATES,
|
||||
pool='DATA_ENG_EXTERNALTASKSENSOR',
|
||||
)
|
||||
{{ wait_for_seen.append(dependency.task_key) or "" }}
|
||||
{% do wait_for_seen.append(dependency.task_key) %}
|
||||
{% endif -%}
|
||||
|
||||
{{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_id }})
|
||||
|
@ -208,7 +208,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
)
|
||||
|
||||
{{ fivetran_task.task_id }}_sync_wait.set_upstream({{ fivetran_task.task_id }}_sync_start)
|
||||
{{ fivetran_seen.append(fivetran_task) or "" }}
|
||||
{% do fivetran_seen.append(fivetran_task) %}
|
||||
{% endif -%}
|
||||
{{ task.task_name }}.set_upstream({{ fivetran_task.task_id }}_sync_wait)
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
execution_date="{% raw %}{{{% endraw %} (execution_date - {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
|
||||
{% endif -%}
|
||||
)
|
||||
{{ seenDownstreamDags.append(downstream_task.dag_name) or "" }}
|
||||
{% do seenDownstreamDags.append(downstream_task.dag_name) %}
|
||||
{% endif -%}
|
||||
{% endfor -%}
|
||||
{% for downstream_task in task.external_downstream_tasks | sort(attribute='task_id') -%}
|
||||
|
@ -101,7 +101,7 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
|
|||
execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
|
||||
{% endif -%}
|
||||
)
|
||||
{{ seenDownstreamDags.append(downstream_task.dag_name) or "" }}
|
||||
{% do seenDownstreamDags.append(downstream_task.dag_name) %}
|
||||
{% endif -%}
|
||||
{% endfor -%}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче