This adds back the base lineage backend which can be extended to send lineage metadata to any custom backend.
closes: #14106

Co-authored-by: Joao Ponte <jpe@plista.com>
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
(cherry picked from commit af2d11e36e)
This commit is contained in:
João Ponte 2021-04-03 10:26:59 +02:00 коммит произвёл Ash Berlin-Taylor
Родитель def7133bd0
Коммит 08b632e669
4 изменённых файлов: 138 добавлений и 1 удалений

Просмотреть файл

@ -25,6 +25,8 @@ import attr
import jinja2
from cattr import structure, unstructure
from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend
from airflow.utils.module_loading import import_string
ENV = jinja2.Environment()
@ -45,6 +47,22 @@ class Metadata:
data: Dict = attr.ib()
def get_backend() -> Optional[LineageBackend]:
"""Gets the lineage backend if defined in the configs"""
clazz = conf.getimport("lineage", "backend", fallback=None)
if clazz:
if not issubclass(clazz, LineageBackend):
raise TypeError(
f"Your custom Lineage class `{clazz.__name__}` "
f"is not a subclass of `{LineageBackend.__name__}`."
)
else:
return clazz()
return None
def _get_instance(meta: Metadata):
"""Instantiate an object from Metadata"""
cls = import_string(meta.type_name)
@ -82,6 +100,7 @@ def apply_lineage(func: T) -> T:
Saves the lineage to XCom and if configured to do so sends it
to the backend.
"""
_backend = get_backend()
@wraps(func)
def wrapper(self, context, *args, **kwargs):
@ -101,6 +120,9 @@ def apply_lineage(func: T) -> T:
context, key=PIPELINE_INLETS, value=inlets, execution_date=context['ti'].execution_date
)
if _backend:
_backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
return ret_val
return cast(T, wrapper)

Просмотреть файл

@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Sends lineage metadata to a backend"""
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator # pylint: disable=cyclic-import
class LineageBackend:
"""Sends lineage metadata to a backend"""
def send_lineage(
self,
operator: 'BaseOperator',
inlets: Optional[list] = None,
outlets: Optional[list] = None,
context: Optional[dict] = None,
):
"""
Sends lineage metadata to a backend
:param operator: the operator executing a transformation on the inlets and outlets
:type operator: airflow.models.baseoperator.BaseOperator
:param inlets: the inlets to this operator
:type inlets: list
:param outlets: the outlets from this operator
:type outlets: list
:param context: the current context of the task instance
:type context: dict
"""
raise NotImplementedError()

Просмотреть файл

@ -95,3 +95,24 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
f_in > run_this | (run_this_last > outlets)
.. _precedence: https://docs.python.org/3/reference/expressions.html
Lineage Backend
---------------
It's possible to push the lineage metrics to a custom backend by providing an instance of a LinageBackend in the config:
.. code-block:: ini
[lineage]
backend = my.lineage.CustomBackend
The backend should inherit from ``airflow.lineage.LineageBackend``.
.. code-block:: python
from airflow.lineage.backend import LineageBackend
class ExampleBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
# Send the info to some external service

Просмотреть файл

@ -16,16 +16,24 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from unittest import mock
from airflow.lineage import AUTO
from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
from airflow.lineage.backend import LineageBackend
from airflow.lineage.entities import File
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
from tests.test_utils.config import conf_vars
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class CustomLineageBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
pass
class TestLineage(unittest.TestCase):
def test_lineage(self):
dag = DAG(dag_id='test_prepare_lineage', start_date=DEFAULT_DATE)
@ -111,3 +119,42 @@ class TestLineage(unittest.TestCase):
op1.pre_execute(ctx1)
assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
@mock.patch("airflow.lineage.get_backend")
def test_lineage_is_sent_to_backend(self, mock_get_backend):
class TestBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
assert len(inlets) == 1
assert len(outlets) == 1
func = mock.Mock()
func.__name__ = 'foo'
mock_get_backend.return_value = TestBackend()
dag = DAG(dag_id='test_lineage_is_sent_to_backend', start_date=DEFAULT_DATE)
with dag:
op1 = DummyOperator(task_id='task1')
file1 = File("/tmp/some_file")
op1.inlets.append(file1)
op1.outlets.append(file1)
ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE), "execution_date": DEFAULT_DATE}
prep = prepare_lineage(func)
prep(op1, ctx1)
post = apply_lineage(func)
post(op1, ctx1)
def test_empty_lineage_backend(self):
backend = get_backend()
assert backend is None
@conf_vars({("lineage", "backend"): "tests.lineage.test_lineage.CustomLineageBackend"})
def test_resolve_lineage_class(self):
backend = get_backend()
assert issubclass(backend.__class__, LineageBackend)
assert isinstance(backend, CustomLineageBackend)