Airflow tutorial to use Decorated Flows (#11308)

Created a new Airflow tutorial to use Decorated Flows (a.k.a. functional
DAGs). Also created a DAG to perform the same operations without using
functional DAGs to be compatible with Airflow 1.10.x and to show the
difference.

* Apply suggestions from code review

It makes sense to simplify the return variables being passed around, without needlessly converting them to JSON and back.

* Update tutorial_functional_etl_dag.py

Fixed data passing between tasks to be more natural, without converting to JSON and back to variables.
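The change described here can be sketched roughly as follows; `extract_old`/`transform_old` and `extract_new`/`transform_new` are illustrative stand-ins, not the exact task bodies in this commit:

```python
import json

# Before: the Extract task serialized its result to a JSON string, and the
# Transform task had to parse it back before doing any work.
def extract_old() -> str:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.dumps(json.loads(data_string))  # needless round trip

def transform_old(order_data_json: str) -> float:
    order_data = json.loads(order_data_json)
    return sum(order_data.values())

# After: the task returns the dict directly; Airflow's XCom layer handles
# serialization, so the downstream task receives a plain Python object.
def extract_new() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

def transform_new(order_data: dict) -> float:
    return sum(order_data.values())
```

Either way the Transform step computes the same total; the second version simply avoids the JSON round trip in user code.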

* Updated DAG options and task doc formatting

Based on feedback on the PR, updated the DAG options (including the schedule) and fixed the task documentation to avoid indentation.

* Added documentation file for functional dag tutorial

Added the tutorial documentation to the docs directory. Fixed linting errors in the example dags.
Tweaked some doc references in the example dags for inclusion into the tutorial documentation.
Added the example dags to example tests.

* Removed multiple_outputs from task defn

The Extract task definition had multiple_outputs=True set, which was unnecessary; removed it based on review feedback.
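For context, `multiple_outputs=True` asks Airflow to unroll a dict return value into one XCom entry per key; since the Extract task's dict is consumed whole by the Transform task, that splitting buys nothing here. A rough plain-Python sketch of the distinction (`push_xcoms` is a hypothetical stand-in for Airflow's XCom machinery, not its real API):

```python
def push_xcoms(return_value, multiple_outputs=False):
    """Hypothetical sketch: map a task's return value to XCom entries."""
    if multiple_outputs and isinstance(return_value, dict):
        # Each dict key becomes its own XCom entry, pullable individually.
        return dict(return_value)
    # Otherwise the whole value is stored under the single default key.
    return {"return_value": return_value}

order_data = {"1001": 301.27, "1002": 433.21}
print(push_xcoms(order_data))                         # one entry, whole dict
print(push_xcoms(order_data, multiple_outputs=True))  # one entry per key
```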

Co-authored-by: Gerard Casas Saez <casassg@users.noreply.github.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
This commit is contained in:
Vikram Koka 2020-10-13 08:59:20 -07:00 committed by GitHub
Parent 16e7129719
Commit 095756c6e8
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 397 additions and 1 deletion

View file

@@ -201,7 +201,7 @@ repos:
args:
- --convention=pep257
- --add-ignore=D100,D102,D104,D105,D107,D200,D205,D400,D401
-      exclude: ^tests/.*\.py$|^scripts/.*\.py$|^dev|^provider_packages|^kubernetes_tests
+      exclude: ^tests/.*\.py$|^scripts/.*\.py$|^dev|^provider_packages|^kubernetes_tests|.*example_dags/.*
- repo: local
hooks:
- id: shellcheck

View file

@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=missing-function-docstring
"""
### Functional DAG Tutorial Documentation
This is a simple ETL data pipeline example which demonstrates the use of Functional DAGs
using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow Functional DAG tutorial is located
[here](https://airflow.apache.org/tutorial_decorated_flows.html)
"""
# [START tutorial]
# [START import_module]
import json

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    'tutorial_functional_etl_dag',
    default_args=default_args,
    description='Functional ETL DAG tutorial',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # [END instantiate_dag]

    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract]
    @dag.task()
    def extract():
        data_string = u'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict
    # [END extract]

    extract.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
"""

    # [START transform]
    @dag.task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}
    # [END transform]

    transform.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data and computes
the total order value.
"""

    # [START load]
    @dag.task()
    def load(total_order_value: float):
        print("Total order value is: %.2f" % total_order_value)
    # [END load]

    load.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task and, instead of
saving it for end user review, just prints it out.
"""

    # [START main_flow]
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
    # [END main_flow]
# [END tutorial]

View file

@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=missing-function-docstring
"""
### ETL DAG Tutorial Documentation
This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
as part of the documentation that goes along with the Airflow Functional DAG tutorial located
[here](https://airflow.apache.org/tutorial_decorated_flows.html)
"""
# [START tutorial]
# [START import_module]
import json

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    'tutorial_etl_dag',
    default_args=default_args,
    description='ETL DAG tutorial',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # [END instantiate_dag]

    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        ti = kwargs['ti']
        data_string = u'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push('order_data', data_string)
    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs['ti']
        extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push('total_order_value', total_value_json_string)
    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
        total_order_value = json.loads(total_value_string)
        print(total_order_value)
    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    transform_task.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )
    load_task.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom, and instead of saving it for end user review, just prints it out.
"""

    extract_task >> transform_task >> load_task
    # [END main_flow]
# [END tutorial]

View file

@@ -79,6 +79,7 @@ Content
start
installation
tutorial
tutorial_decorated_flows
howto/index
ui
concepts

View file

@@ -1224,6 +1224,7 @@ subtask
subtasks
sudo
sudoers
summarization
superclass
svg
swp

View file

@@ -0,0 +1,152 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.



Tutorial on Decorated Flows
===========================

This tutorial builds on the regular Airflow Tutorial and focuses specifically
on writing data pipelines using the Decorated Flow paradigm introduced in
Airflow 2.0, contrasting this with DAGs written using the traditional paradigm.

The data pipeline chosen here is a simple ETL pattern with
three separate tasks for Extract, Transform, and Load.

Example "Decorated Flow" ETL Pipeline
-------------------------------------

Here is a very simple ETL pipeline using the Decorated Flows paradigm. A more detailed
explanation is given below.

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]

It's a DAG definition file
--------------------------

If this is the first DAG file you are looking at, please note that this Python script
is interpreted by Airflow and is a configuration file for your data pipeline.
For a complete introduction to DAG files, please look at the core :doc:`Airflow tutorial<tutorial>`,
which covers DAG structure and definitions extensively.

Instantiate a DAG
-----------------

We are creating a DAG, which is the collection of our tasks with the dependencies between
them. This is a very simple definition, since we just want the DAG to be run
when we set this up with Airflow, without any retries or complex scheduling.

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START instantiate_dag]
    :end-before: [END instantiate_dag]

Tasks
-----

In this data pipeline, tasks are created based on Python functions using the decorator
as shown below. The function name acts as a unique identifier for the task.

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START extract]
    :end-before: [END extract]

The returned value, which in this case is a dictionary, will be made available for use in later tasks.

The Transform and Load tasks are created in the same manner as the Extract task shown above.

Main flow of the DAG
--------------------

Now that we have the Extract, Transform, and Load tasks defined based on the Python functions,
we can move to the main part of the DAG.

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START main_flow]
    :end-before: [END main_flow]

That's it, we are done! We have invoked the Extract task, obtained the order data from there,
sent it over to the Transform task for summarization, and then invoked the Load task with the
summarized data. The dependencies between the tasks, and the passing of data between them
(even though the tasks could be running on different workers on different nodes on the network),
are all handled by Airflow.

But how?
--------

For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with
how this DAG had to be written before Airflow 2.0:

.. exampleinclude:: /../airflow/example_dags/tutorial_etl_dag.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]

All of the processing shown above is being done in the new Airflow 2.0 DAG as well, but
it is all abstracted from the DAG developer.

Let's examine this in detail by looking at the Transform task in isolation, since it is
in the middle of the data pipeline. In Airflow 1.x, this task is defined as shown below:

.. exampleinclude:: /../airflow/example_dags/tutorial_etl_dag.py
    :language: python
    :start-after: [START transform_function]
    :end-before: [END transform_function]

As we see here, the data being processed in the Transform function is passed to it using XCom
variables. In turn, the summarized data from the Transform function is also placed
into another XCom variable, which will then be used by the Load task.

Contrast that with Decorated Flows in Airflow 2.0, as shown below:

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START transform]
    :end-before: [END transform]

All of the XCom usage for data passing between these tasks is abstracted away from the DAG author
in Airflow 2.0. However, XCom variables are used behind the scenes and can be viewed using
the Airflow UI as necessary for debugging or DAG monitoring.

Similarly, task dependencies are automatically generated within Decorated Flows based on the
functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly created and
dependencies specified as shown below.

.. exampleinclude:: /../airflow/example_dags/tutorial_etl_dag.py
    :language: python
    :start-after: [START main_flow]
    :end-before: [END main_flow]

In contrast, with Decorated Flows in Airflow 2.0, the invocation itself automatically generates
the dependencies as shown below.

.. exampleinclude:: /../airflow/example_dags/tutorial_functional_etl_dag.py
    :language: python
    :start-after: [START main_flow]
    :end-before: [END main_flow]

What's Next?
------------

You have seen how simple it is to write DAGs using the Decorated Flows paradigm within Airflow 2.0.
Please do read the :ref:`Concepts page<concepts>` for a detailed explanation of Airflow concepts
such as DAGs, Tasks, and Operators, and the :ref:`concepts:task_decorator` in particular.

View file

@@ -26,6 +26,8 @@ class TestExampleDagsSystem(SystemTest):
    @parameterized.expand([
        "example_bash_operator",
        "example_branch_operator",
        "tutorial_etl_dag",
        "tutorial_functional_etl_dag",
    ])
    def test_dag_example(self, dag_id):
        self.run_dag(dag_id=dag_id)