Enhanced TaskFlow API tutorial to use @dag decorator (#12937)
Updated both the tutorial Python file in the example_dags directory and the tutorial documentation.
Parent: 5c74c3a5c1
Commit: 18a7a35eab
@@ -17,21 +17,12 @@
 # under the License.
 
+# pylint: disable=missing-function-docstring
-"""
-### TaskFlow API Tutorial Documentation
-
-This is a simple ETL data pipeline example which demonstrates the use of the TaskFlow API
-using three simple tasks for Extract, Transform, and Load.
-
-Documentation that goes along with the Airflow TaskFlow API tutorial located
-[here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
-"""
 # [START tutorial]
 # [START import_module]
 import json
 
-# The DAG object; we'll need this to instantiate a DAG
-from airflow import DAG
+from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 
 # [END import_module]
 
@@ -44,28 +35,28 @@ default_args = {
 }
 # [END default_args]
 
 
 # [START instantiate_dag]
-with DAG(
-    'tutorial_taskflow_api_etl_dag',
-    default_args=default_args,
-    description='TaskFlow API ETL DAG tutorial',
-    schedule_interval=None,
-    start_date=days_ago(2),
-    tags=['example'],
-) as dag:
+@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
+def tutorial_taskflow_api_etl():
+    """
+    ### TaskFlow API Tutorial Documentation
+    This is a simple ETL data pipeline example which demonstrates the use of
+    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
+    Documentation that goes along with the Airflow TaskFlow API tutorial is
+    located
+    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
+    """
     # [END instantiate_dag]
 
-    # [START documentation]
-    dag.doc_md = __doc__
-    # [END documentation]
-
     # [START extract]
-    @dag.task()
+    @task()
     def extract():
         """
         #### Extract task
-        A simple Extract task to get data ready for the rest of the data pipeline.
-        In this case, getting data is simulated by reading from a hardcoded JSON string.
+        A simple Extract task to get data ready for the rest of the data
+        pipeline. In this case, getting data is simulated by reading from a
+        hardcoded JSON string.
         """
         data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
 
@@ -75,12 +66,12 @@ with DAG(
     # [END extract]
 
     # [START transform]
-    @dag.task(multiple_outputs=True)
+    @task(multiple_outputs=True)
     def transform(order_data_dict: dict):
         """
         #### Transform task
-        A simple Transform task which takes in the collection of order data and computes
-        the total order value.
+        A simple Transform task which takes in the collection of order data and
+        computes the total order value.
         """
         total_order_value = 0
 
@@ -92,12 +83,12 @@ with DAG(
     # [END transform]
 
     # [START load]
-    @dag.task()
+    @task()
     def load(total_order_value: float):
         """
         #### Load task
-        A simple Load task which takes in the result of the Transform task and instead of
-        saving it to end user review, just prints it out.
+        A simple Load task which takes in the result of the Transform task and
+        instead of saving it to end user review, just prints it out.
         """
 
         print("Total order value is: %.2f" % total_order_value)
@@ -111,4 +102,8 @@ with DAG(
     # [END main_flow]
 
 
+# [START dag_invocation]
+tutorial_etl_dag = tutorial_taskflow_api_etl()
+# [END dag_invocation]
+
 # [END tutorial]
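Taken together, the hunks above convert the example from a ``with DAG(...)`` block to the decorator style. For orientation, here is a sketch of how the resulting file reads as a whole; the task bodies and the main flow sit outside the visible hunks, so those parts are filled in from the published tutorial and are illustrative rather than verbatim.

    import json

    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago

    # Assumed contents: the default_args hunk above only shows the closing brace.
    default_args = {
        'owner': 'airflow',
    }


    @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
    def tutorial_taskflow_api_etl():
        @task()
        def extract():
            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
            return json.loads(data_string)

        @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            total_order_value = 0
            for value in order_data_dict.values():
                total_order_value += value
            return {"total_order_value": total_order_value}

        @task()
        def load(total_order_value: float):
            print("Total order value is: %.2f" % total_order_value)

        # Main flow: calling the decorated functions wires the dependencies.
        order_data = extract()
        order_summary = transform(order_data)
        load(order_summary["total_order_value"])


    tutorial_etl_dag = tutorial_taskflow_api_etl()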
@@ -54,6 +54,8 @@ Instantiate a DAG
 We are creating a DAG which is the collection of our tasks with dependencies between
 the tasks. This is a very simple definition, since we just want the DAG to be run
 when we set this up with Airflow, without any retries or complex scheduling.
+In this example, please notice that we are creating this DAG using the ``@dag`` decorator
+as shown below, with the python function name acting as the DAG identifier.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
     :language: python
@@ -62,7 +64,7 @@ when we set this up with Airflow, without any retries or complex scheduling.
 
 Tasks
 -----
-In this data pipeline, tasks are created based on Python functions using the decorator
+In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator
 as shown below. The function name acts as a unique identifier for the task.
 
 .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
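One flag in the example deserves a note: ``transform`` is declared with ``@task(multiple_outputs=True)``, which tells Airflow to unroll a returned dict into one XCom entry per key, so a downstream task can consume a single key instead of the whole dict. A minimal sketch, using a hypothetical task name not taken from the tutorial:

    from airflow.decorators import task


    @task(multiple_outputs=True)
    def split_name(full_name: str):
        first, last = full_name.split(" ", 1)
        # Each key becomes its own XCom: downstream code can use
        # split_name(...)["first"] without pulling the whole dict.
        return {"first": first, "last": last}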
@@ -90,6 +92,15 @@ the Transform task for summarization, and then invoked the Load task with the su
 The dependencies between the tasks and the passing of data between these tasks which could be
 running on different workers on different nodes on the network is all handled by Airflow.
 
+Now to actually enable this to be run as a DAG, we invoke the python function
+``tutorial_taskflow_api_etl`` set up using the ``@dag`` decorator earlier, as shown below.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
+    :language: python
+    :start-after: [START dag_invocation]
+    :end-before: [END dag_invocation]
+
+
 But how?
 --------
 For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with
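As the new paragraph says, the ``@dag``-decorated function must actually be called at module level, or no DAG is registered. A minimal, self-contained sketch of the pattern (``demo_wiring``, ``one``, and ``plus_one`` are hypothetical names, not part of the tutorial):

    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago


    @dag(schedule_interval=None, start_date=days_ago(2))
    def demo_wiring():
        @task()
        def one():
            return 41

        @task()
        def plus_one(x: int):
            print(x + 1)

        # Calling one() here does not run it; it returns an XComArg placeholder.
        # Passing that placeholder into plus_one() both routes the value through
        # XCom at run time and creates the one -> plus_one dependency.
        plus_one(one())


    # Without this module-level call, Airflow would find no DAG in the file.
    demo_dag = demo_wiring()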