.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

Lineage
=======

.. note:: Lineage support is very experimental and subject to change.

Airflow can help track the origins of data, what happens to it, and where it moves over time. This supports
audit trails and data governance, and also helps with debugging data flows.

Airflow tracks data by means of the inlets and outlets of tasks. Let's work through an example and see how it
works.

.. code:: python

    import airflow
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.lineage.datasets import File
    from airflow.models import DAG
    from datetime import timedelta

    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

    args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = DAG(
        dag_id='example_lineage', default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60))

    f_final = File("/tmp/final")
    run_this_last = DummyOperator(
        task_id='run_this_last', dag=dag,
        inlets={"auto": True},
        outlets={"datasets": [f_final]})

    f_in = File("/tmp/whole_directory/")
    outlets = []
    for file in FILE_CATEGORIES:
        f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
        outlets.append(f_out)
    run_this = BashOperator(
        task_id='run_me_first', bash_command='echo 1', dag=dag,
        inlets={"datasets": [f_in]},
        outlets={"datasets": outlets})
    run_this.set_downstream(run_this_last)

Tasks take the parameters ``inlets`` and ``outlets``.

Inlets can be defined manually in the following ways:

- as a list of datasets: ``{"datasets": [dataset1, dataset2]}``

- by looking up the outlets of specific upstream tasks: ``{"task_ids": ["task_id1", "task_id2"]}``

- by automatically picking up the outlets of all direct upstream tasks: ``{"auto": True}``

- as any combination of the above

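To make the three declaration styles concrete, here is a simplified, standalone sketch of how such a declaration could be resolved into a list of datasets. The function name ``resolve_inlets`` and its logic are illustrative assumptions for this example only; they are not Airflow's internal implementation.

```python
# Illustrative sketch only -- not Airflow's actual resolution code.
def resolve_inlets(declaration, upstream_outlets, direct_upstream_ids):
    """Collect inlet datasets for a task from its inlet declaration.

    declaration         -- dict such as {"datasets": [...], "task_ids": [...], "auto": True}
    upstream_outlets    -- mapping of task_id -> list of that task's outlet datasets
    direct_upstream_ids -- task_ids of the task's direct upstream tasks
    """
    inlets = []
    # 1. Explicitly listed datasets.
    inlets.extend(declaration.get("datasets", []))
    # 2. Outlets of explicitly named upstream tasks.
    for task_id in declaration.get("task_ids", []):
        inlets.extend(upstream_outlets.get(task_id, []))
    # 3. "auto": outlets of all direct upstream tasks.
    if declaration.get("auto", False):
        for task_id in direct_upstream_ids:
            inlets.extend(upstream_outlets.get(task_id, []))
    return inlets


upstream = {"extract": ["/tmp/raw"], "clean": ["/tmp/clean"]}
combined = {"datasets": ["/tmp/static"], "task_ids": ["extract"], "auto": True}
print(resolve_inlets(combined, upstream, ["clean"]))
# -> ['/tmp/static', '/tmp/raw', '/tmp/clean']
```

The combined declaration simply merges all three sources, which is why the options can be mixed freely.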
Outlets are defined as a list of datasets: ``{"datasets": [dataset1, dataset2]}``. Any fields of a dataset are
templated with the context in which the task is executed.

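The two-stage naming in the example DAG can be seen in isolation: ``str.format`` fills in the category at DAG definition time and leaves the Jinja placeholder, which is then rendered with the task context at execution time. Airflow uses Jinja2 with the full context for this; the tiny ``render_dataset_name`` helper below is a dependency-free stand-in that only handles the single ``{{ execution_date }}`` placeholder from the example.

```python
# Minimal stand-in for Jinja2 rendering; illustrative only.
def render_dataset_name(name_template, context):
    rendered = name_template
    for key, value in context.items():
        rendered = rendered.replace("{{ %s }}" % key, str(value))
    return rendered

# Stage 1: .format() fills the category, the escaped braces survive
# as a Jinja placeholder.
name = "/tmp/{}/{{{{ execution_date }}}}".format("CAT1")
print(name)  # /tmp/CAT1/{{ execution_date }}

# Stage 2: the placeholder is rendered from the execution context.
print(render_dataset_name(name, {"execution_date": "2019-01-01"}))
# /tmp/CAT1/2019-01-01
```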
.. note:: Operators can add inlets and outlets automatically if the operator supports it.

In the example DAG, the task ``run_me_first`` is a BashOperator that takes one inlet, ``/tmp/whole_directory/``, and
three outlets, generated from the categories ``CAT1``, ``CAT2`` and ``CAT3`` in a list. Note that ``execution_date`` is a
templated field and will be rendered when the task is running.

.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task
    has finished execution, ``post_execute`` is called and the lineage metadata is pushed to XCom. Thus, if you are creating
    your own operators that override these methods, make sure to decorate them with ``prepare_lineage`` and ``apply_lineage``
    respectively.

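The decorator pattern the note describes can be sketched in plain Python: one decorator resolves lineage before the task body runs, the other publishes it afterwards. The real ``prepare_lineage`` and ``apply_lineage`` live in ``airflow.lineage``; the decorator bodies and the ``resolved_inlets``/``pushed_lineage`` attributes below are simplified assumptions for illustration, not the actual implementation.

```python
# Illustrative sketch of the prepare_lineage / apply_lineage pattern.
import functools

def prepare_lineage(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Resolve declared inlets before the task body runs.
        self.resolved_inlets = list(self.inlets.get("datasets", []))
        return func(self, *args, **kwargs)
    return wrapper

def apply_lineage(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        # After the task body finishes, publish the lineage metadata
        # (Airflow pushes it to XCom; here we just record it on the task).
        self.pushed_lineage = {
            "inlets": self.resolved_inlets,
            "outlets": list(self.outlets.get("datasets", [])),
        }
        return result
    return wrapper

class MyOperator:
    """Toy operator whose hooks are wrapped like Airflow's."""
    def __init__(self, inlets, outlets):
        self.inlets, self.outlets = inlets, outlets

    @prepare_lineage
    def pre_execute(self, context):
        pass

    @apply_lineage
    def post_execute(self, context):
        pass

op = MyOperator({"datasets": ["in.csv"]}, {"datasets": ["out.csv"]})
op.pre_execute({})
op.post_execute({})
print(op.pushed_lineage)  # {'inlets': ['in.csv'], 'outlets': ['out.csv']}
```

The point of the decorators is that an overriding operator keeps the lineage behaviour without reimplementing it, which is exactly why custom ``pre_execute``/``post_execute`` overrides must re-apply them.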
Apache Atlas
------------

Airflow can send its lineage metadata to Apache Atlas. You need to enable the ``atlas`` backend and configure it
properly, e.g. in your ``airflow.cfg``:

.. code:: ini

    [lineage]
    backend = airflow.lineage.backend.atlas.AtlasBackend

    [atlas]
    username = my_username
    password = my_password
    host = host
    port = 21000

Please make sure to have the ``atlasclient`` package installed.