From 18a7a35eabc8aef5a9ba2f2cf0f9b64d5f5ebd58 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 9 Dec 2020 01:40:45 -0800 Subject: [PATCH] Enhanced TaskFlow API tutorial to use @dag decorator (#12937) Updated both the tutorial python file in the example_dags directory and the tutorial documentation --- .../example_dags/tutorial_taskflow_api_etl.py | 57 +++++++++---------- docs/apache-airflow/tutorial_taskflow_api.rst | 13 ++++- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py index 5ecd371d2a..ff2503bc54 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl.py @@ -17,21 +17,12 @@ # under the License. # pylint: disable=missing-function-docstring -""" -### TaskFlow API Tutorial Documentation -This is a simple ETL data pipeline example which demonstrates the use of the TaskFlow API -using three simple tasks for Extract, Transform, and Load. - -Documentation that goes along with the Airflow TaskFlow API tutorial located -[here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html) -""" # [START tutorial] # [START import_module] import json -# The DAG object; we'll need this to instantiate a DAG -from airflow import DAG +from airflow.decorators import dag, task from airflow.utils.dates import days_ago # [END import_module] @@ -44,28 +35,28 @@ default_args = { } # [END default_args] + # [START instantiate_dag] -with DAG( - 'tutorial_taskflow_api_etl_dag', - default_args=default_args, - description='TaskFlow API ETL DAG tutorial', - schedule_interval=None, - start_date=days_ago(2), - tags=['example'], -) as dag: +@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2)) +def tutorial_taskflow_api_etl(): + """ + ### TaskFlow API Tutorial Documentation + This is a simple ETL data pipeline example which demonstrates the use of + the TaskFlow API using three simple tasks for Extract, Transform, and Load. + Documentation that goes along with the Airflow TaskFlow API tutorial is + located + [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html) + """ # [END instantiate_dag] - # [START documentation] - dag.doc_md = __doc__ - # [END documentation] - # [START extract] - @dag.task() + @task() def extract(): """ #### Extract task - A simple Extract task to get data ready for the rest of the data pipeline. - In this case, getting data is simulated by reading from a hardcoded JSON string. + A simple Extract task to get data ready for the rest of the data + pipeline. In this case, getting data is simulated by reading from a + hardcoded JSON string. """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' @@ -75,12 +66,12 @@ with DAG( # [END extract] # [START transform] - @dag.task(multiple_outputs=True) + @task(multiple_outputs=True) def transform(order_data_dict: dict): """ #### Transform task - A simple Transform task which takes in the collection of order data and computes - the total order value. + A simple Transform task which takes in the collection of order data and + computes the total order value. """ total_order_value = 0 @@ -92,12 +83,12 @@ with DAG( # [END transform] # [START load] - @dag.task() + @task() def load(total_order_value: float): """ #### Load task - A simple Load task which takes in the result of the Transform task and instead of - saving it to end user review, just prints it out. + A simple Load task which takes in the result of the Transform task and + instead of saving it to end user review, just prints it out. """ print("Total order value is: %.2f" % total_order_value) @@ -111,4 +102,8 @@ with DAG( # [END main_flow] +# [START dag_invocation] +tutorial_etl_dag = tutorial_taskflow_api_etl() +# [END dag_invocation] + # [END tutorial] diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst index 29422dabbc..8154bd87d9 100644 --- a/docs/apache-airflow/tutorial_taskflow_api.rst +++ b/docs/apache-airflow/tutorial_taskflow_api.rst @@ -54,6 +54,8 @@ Instantiate a DAG We are creating a DAG which is the collection of our tasks with dependencies between the tasks. This is a very simple definition, since we just want the DAG to be run when we set this up with Airflow, without any retries or complex scheduling. +In this example, please notice that we are creating this DAG using the ``@dag`` decorator +as shown below, with the python function name acting as the DAG identifier. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py :language: python @@ -62,7 +64,7 @@ when we set this up with Airflow, without any retries or complex scheduling. Tasks ----- -In this data pipeline, tasks are created based on Python functions using the decorator +In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator as shown below. The function name acts as a unique identifier for the task. .. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py @@ -90,6 +92,15 @@ the Transform task for summarization, and then invoked the Load task with the su The dependencies between the tasks and the passing of data between these tasks which could be running on different workers on different nodes on the network is all handled by Airflow. +Now to actually enable this to be run as a DAG, we invoke the python function +``tutorial_taskflow_api_etl`` set up using the ``@dag`` decorator earlier, as shown below. + +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py + :language: python + :start-after: [START dag_invocation] + :end-before: [END dag_invocation] + + But how? -------- For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with