[AIRFLOW-6316] Use exampleinclude directives in tutorial.rst (#6868)

Currently the example code is hard-coded in tutorial.rst, which
is hard to maintain: for example, `set_upstream`
was changed to the bitshift operator in tutorial.py, but the old
form remained in tutorial.rst. Using Sphinx include directives is a better way.
This commit is contained in:
Jiajie Zhong 2019-12-22 19:48:28 +08:00 committed by Kamil Breguła
Parent a6bc4165ff
Commit 072dab62c4
3 changed files with 68 additions and 180 deletions

View file

@ -22,12 +22,19 @@
Documentation that goes along with the Airflow tutorial located Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html) [here](https://airflow.apache.org/tutorial.html)
""" """
# [START tutorial]
from datetime import timedelta from datetime import timedelta
import airflow import airflow
# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator from airflow.operators.bash_operator import BashOperator
# [END import_module]
# [START default_args]
# These args will get passed on to each operator # These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization # You can override them on a per-task basis during operator initialization
default_args = { default_args = {
@ -53,21 +60,37 @@ default_args = {
# 'sla_miss_callback': yet_another_function, # 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success' # 'trigger_rule': 'all_success'
} }
# [END default_args]
# [START instantiate_dag]
dag = DAG( dag = DAG(
'tutorial', 'tutorial',
default_args=default_args, default_args=default_args,
description='A simple tutorial DAG', description='A simple tutorial DAG',
schedule_interval=timedelta(days=1), schedule_interval=timedelta(days=1),
) )
# [END instantiate_dag]
# t1, t2 and t3 are examples of tasks created by instantiating operators # t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator( t1 = BashOperator(
task_id='print_date', task_id='print_date',
bash_command='date', bash_command='date',
dag=dag, dag=dag,
) )
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
# [END basic_task]
# [START documentation]
dag.doc_md = __doc__
t1.doc_md = """\ t1.doc_md = """\
#### Task Documentation #### Task Documentation
You can document your task using the attributes `doc_md` (markdown), You can document your task using the attributes `doc_md` (markdown),
@ -75,16 +98,9 @@ You can document your task using the attributes `doc_md` (markdown),
rendered in the UI's Task Instance Details page. rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
""" """
# [END documentation]
dag.doc_md = __doc__ # [START jinja_template]
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag,
)
templated_command = """ templated_command = """
{% for i in range(5) %} {% for i in range(5) %}
echo "{{ ds }}" echo "{{ ds }}"
@ -100,5 +116,7 @@ t3 = BashOperator(
params={'my_param': 'Parameter I passed in'}, params={'my_param': 'Parameter I passed in'},
dag=dag, dag=dag,
) )
# [END jinja_template]
t1 >> [t2, t3] t1 >> [t2, t3]
# [END tutorial]

View file

@ -127,7 +127,8 @@ extensions = [
'sphinx.ext.intersphinx', 'sphinx.ext.intersphinx',
'autoapi.extension', 'autoapi.extension',
'exampleinclude', 'exampleinclude',
'docroles' 'docroles',
'removemarktransform',
] ]
autodoc_default_options = { autodoc_default_options = {

View file

@ -30,63 +30,10 @@ Example Pipeline definition
Here is an example of a basic pipeline definition. Do not worry if this looks
complicated, a line by line explanation follows below.
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
""" :start-after: [START tutorial]
Code that goes along with the Airflow tutorial located at: :end-before: [END tutorial]
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
It's a DAG definition file
--------------------------
@ -113,13 +60,10 @@ Importing Modules
An Airflow pipeline is just a Python script that happens to define an
Airflow DAG object. Let's start by importing the libraries we will need.
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
# The DAG object; we'll need this to instantiate a DAG :start-after: [START import_module]
from airflow import DAG :end-before: [END import_module]
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
Default Arguments
-----------------
@ -128,24 +72,10 @@ explicitly pass a set of arguments to each task's constructor
(which would become redundant), or (better!) we can define a dictionary
of default parameters that we can use when creating tasks.
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
from datetime import datetime, timedelta :start-after: [START default_args]
:end-before: [END default_args]
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
For more information about the BaseOperator's parameters and what they do,
refer to the :py:class:`airflow.models.BaseOperator` documentation.
@ -163,10 +93,10 @@ that defines the ``dag_id``, which serves as a unique identifier for your DAG.
We also pass the default argument dictionary that we just defined and
define a ``schedule_interval`` of 1 day for the DAG.
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
dag = DAG( :start-after: [START instantiate_dag]
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) :end-before: [END instantiate_dag]
Tasks
-----
@ -174,18 +104,10 @@ Tasks are generated when instantiating operator objects. An object
instantiated from an operator is called a constructor. The first argument
``task_id`` acts as a unique identifier for the task.
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
t1 = BashOperator( :start-after: [START basic_task]
task_id='print_date', :end-before: [END basic_task]
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
Notice how we pass a mix of operator specific arguments (``bash_command``) and
an argument common to all operators (``retries``) inherited
@ -217,21 +139,10 @@ this feature exists, get you familiar with double curly brackets, and
point to the most common template variable: ``{{ ds }}`` (today's "date
stamp").
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
templated_command = """ :start-after: [START jinja_template]
{% for i in range(5) %} :end-before: [END jinja_template]
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
references parameters like ``{{ ds }}``, calls a function as in
@ -264,6 +175,17 @@ regarding custom filters have a look at the
For more information on the variables and macros that can be referenced
in templates, make sure to read through the :doc:`macros-ref`
Adding DAG and Tasks documentation
----------------------------------
We can add documentation for DAG or each single task. DAG documentation only support
markdown so far and task documentation support plain text, markdown, reStructuredText,
json, yaml
.. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
:start-after: [START documentation]
:end-before: [END documentation]
Setting up Dependencies
-----------------------
We have tasks ``t1``, ``t2`` and ``t3`` that do not depend on each other. Here are a few ways
@ -306,63 +228,10 @@ Recap
Alright, so we have a pretty basic DAG. At this point your code should look
something like this:
.. code:: python .. exampleinclude:: ../airflow/example_dags/tutorial.py
:language: python
""" :start-after: [START tutorial]
Code that goes along with the Airflow tutorial located at: :end-before: [END tutorial]
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
.. _testing: