[AIRFLOW-5911] Simplify lineage API and improve robustness (#6564)

This simplifies the lineage API, which was needlessly cluttered.
You can now set "inlets='auto'" rather than "inlets={'auto': True}"
and Airflow will figure out what to do.
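
As a rough sketch of the change (the operator, file path, and task id below are illustrative, not part of this commit):

    from airflow.lineage import AUTO
    from airflow.lineage.entities import File
    from airflow.operators.dummy_operator import DummyOperator

    f_out = File("/tmp/report.csv")

    # Old API: options wrapped in dicts
    #   DummyOperator(task_id="report", inlets={"auto": True},
    #                 outlets={"datasets": [f_out]})

    # New API: plain values, lists, or the AUTO sentinel
    report = DummyOperator(task_id="report", inlets=AUTO, outlets=f_out)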

Co-Authored-By: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-Authored-By: Kaxil Naik <kaxilnaik@gmail.com>
bolkedebruin authored 2019-11-27 10:32:59 +01:00; committed by GitHub
Parent 5d08c54f71
Commit 1ef56df74a
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
15 changed files: 265 additions and 639 deletions

View file

@@ -16,36 +16,67 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=missing-docstring
"""
Provides lineage support functions
"""
import json
from functools import wraps
from itertools import chain
from typing import Any, Dict, Optional
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.lineage.datasets import DataSet
import attr
import jinja2
from cattr import structure, unstructure
from airflow.models.base import Operator
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
ENV = jinja2.Environment()
PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"
AUTO = "auto"
log = LoggingMixin().log
def _get_backend():
backend = None
@attr.s(auto_attribs=True)
class Metadata:
"""
Class for serialized entities.
"""
type_name: str = attr.ib()
data: Dict = attr.ib()
try:
_backend_str = conf.get("lineage", "backend")
backend = import_string(_backend_str) # pylint:disable=protected-access
except ImportError as err:
log.debug("Cannot import %s due to %s", _backend_str, err) # pylint:disable=protected-access
except AirflowConfigException:
log.debug("Could not find lineage backend key in config")
return backend
def _get_instance(meta: Metadata):
"""
Instantiate an object from Metadata
"""
cls = import_string(meta.type_name)
return structure(meta.data, cls)
def _render_object(obj: Any, context) -> Any:
"""
Renders an attr-annotated object. Sets non-serializable attributes to None.
"""
return structure(json.loads(ENV.from_string(
json.dumps(unstructure(obj), default=lambda o: None)
).render(**context).encode('utf-8')), type(obj))
def _to_dataset(obj: Any) -> Optional[Metadata]:
"""
Create Metadata from attr annotated object
"""
if not attr.has(obj):
return None
type_name = obj.__module__ + '.' + obj.__class__.__name__
data = unstructure(obj)
return Metadata(type_name, data)
def apply_lineage(func):
@@ -53,16 +84,17 @@ def apply_lineage(func):
Saves the lineage to XCom and if configured to do so sends it
to the backend.
"""
backend = _get_backend()
@wraps(func)
def wrapper(self, context, *args, **kwargs):
self.log.debug("Backend: %s, Lineage called with inlets: %s, outlets: %s",
backend, self.inlets, self.outlets)
self.log.debug("Lineage called with inlets: %s, outlets: %s",
self.inlets, self.outlets)
ret_val = func(self, context, *args, **kwargs)
outlets = [x.as_dict() for x in self.outlets]
inlets = [x.as_dict() for x in self.inlets]
outlets = [unstructure(_to_dataset(x))
for x in self.outlets]
inlets = [unstructure(_to_dataset(x))
for x in self.inlets]
if self.outlets:
self.xcom_push(context,
@@ -76,10 +108,6 @@ def apply_lineage(func):
value=inlets,
execution_date=context['ti'].execution_date)
if backend:
backend.send_lineage(operator=self, inlets=self.inlets,
outlets=self.outlets, context=context)
return ret_val
return wrapper
@@ -92,52 +120,53 @@ def prepare_lineage(func):
* "auto" -> picks up any outlets from direct upstream tasks that have outlets defined, as such that
if A -> B -> C and B does not have outlets but A does, these are provided as inlets.
* "list of task_ids" -> picks up outlets from the upstream task_ids
* "list of datasets" -> manually defined list of DataSet
* "list of datasets" -> manually defined list of data
"""
# pylint:disable=protected-access
@wraps(func)
def wrapper(self, context, *args, **kwargs):
self.log.debug("Preparing lineage inlets and outlets")
task_ids = set(self._inlets['task_ids']).intersection( # pylint:disable=protected-access
self.get_flat_relative_ids(upstream=True)
)
if task_ids:
inlets = self.xcom_pull(context,
task_ids=task_ids,
dag_id=self.dag_id,
key=PIPELINE_OUTLETS)
inlets = [item for sublist in inlets if sublist for item in sublist]
inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
for i in inlets]
self.inlets.extend(inlets)
if isinstance(self._inlets, (str, Operator)) or attr.has(self._inlets):
self._inlets = [self._inlets, ]
if self._inlets['auto']: # pylint:disable=protected-access
# don't append twice
task_ids = set(self._inlets['task_ids']).symmetric_difference( # pylint:disable=protected-access
self.upstream_task_ids
)
inlets = self.xcom_pull(context,
task_ids=task_ids,
dag_id=self.dag_id,
key=PIPELINE_OUTLETS)
inlets = [item for sublist in inlets if sublist for item in sublist]
inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
for i in inlets]
self.inlets.extend(inlets)
if self._inlets and isinstance(self._inlets, list):
# get task_ids that are specified as parameter and make sure they are upstream
task_ids = set(
filter(lambda x: isinstance(x, str) and x.lower() != AUTO, self._inlets)
).union(
map(lambda op: op.task_id,
filter(lambda op: isinstance(op, Operator), self._inlets))
).intersection(self.get_flat_relative_ids(upstream=True))
if self._inlets['datasets']: # pylint:disable=protected-access
self.inlets.extend(self._inlets['datasets']) # pylint:disable=protected-access
# pick up unique direct upstream task_ids if AUTO is specified
if AUTO.upper() in self._inlets or AUTO.lower() in self._inlets:
task_ids = task_ids.union(task_ids.symmetric_difference(self.upstream_task_ids))
# outlets
if self._outlets['datasets']: # pylint:disable=protected-access
self.outlets.extend(self._outlets['datasets']) # pylint:disable=protected-access
_inlets = self.xcom_pull(context, task_ids=task_ids,
dag_id=self.dag_id, key=PIPELINE_OUTLETS)
# re-instantiate and render the obtained inlets
_inlets = [_get_instance(structure(item, Metadata))
for sublist in _inlets if sublist for item in sublist]
_inlets.extend([_render_object(i, context)
for i in self._inlets if attr.has(i)])
self.inlets.extend(_inlets)
elif self._inlets:
raise AttributeError("inlets is not a list, operator, string or attr annotated object")
if not isinstance(self._outlets, list):
self._outlets = [self._outlets, ]
_outlets = list(map(lambda i: _render_object(i, context),
filter(attr.has, self._outlets)))
self.outlets.extend(_outlets)
self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)
for dataset in chain(self.inlets, self.outlets):
dataset.set_context(context)
return func(self, context, *args, **kwargs)
return wrapper
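
For orientation, a self-contained sketch of the round-trip these helpers implement, using only attr and cattr as the diff does (the File class here is a stand-in):

    import attr
    from cattr import structure, unstructure

    @attr.s(auto_attribs=True)
    class File:
        url: str

    f = File(url="/tmp/data.csv")
    # what _to_dataset records in Metadata
    type_name = f.__module__ + '.' + f.__class__.__name__
    data = unstructure(f)                # {'url': '/tmp/data.csv'}
    # what _get_instance does on the way back
    restored = structure(data, type(f))  # File(url='/tmp/data.csv')
    assert restored == f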

View file

@@ -1,36 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Sends lineage metadata to a backend"""
class LineageBackend:
"""
Sends lineage metadata to a backend
"""
def send_lineage(self,
operator=None, inlets=None, outlets=None, context=None):
"""
Sends lineage metadata to a backend
:param operator: the operator executing a transformation on the inlets and outlets
:param inlets: the inlets to this operator
:param outlets: the outlets from this operator
:param context: the current context of the task instance
"""
raise NotImplementedError()

View file

@@ -1,102 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# pylint:disable=missing-docstring
from atlasclient.client import Atlas
from atlasclient.exceptions import HttpError
from airflow.configuration import conf
from airflow.lineage import datasets
from airflow.lineage.backend import LineageBackend
from airflow.lineage.backend.atlas.typedefs import operator_typedef
from airflow.utils.timezone import convert_to_utc
SERIALIZED_DATE_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%fZ"
_username = conf.get("atlas", "username")
_password = conf.get("atlas", "password")
_port = conf.get("atlas", "port")
_host = conf.get("atlas", "host")
class AtlasBackend(LineageBackend): # pylint:disable=missing-docstring
@staticmethod
def send_lineage(operator, inlets, outlets, context): # pylint:disable=signature-differs
client = Atlas(_host, port=_port, username=_username, password=_password)
try:
client.typedefs.create(data=operator_typedef)
except HttpError:
client.typedefs.update(data=operator_typedef)
_execution_date = convert_to_utc(context['ti'].execution_date)
_start_date = convert_to_utc(context['ti'].start_date)
_end_date = convert_to_utc(context['ti'].end_date)
inlet_list = []
if inlets:
for entity in inlets:
if entity is None:
continue
entity.set_context(context)
client.entity_post.create(data={"entity": entity.as_dict()})
inlet_list.append({"typeName": entity.type_name,
"uniqueAttributes": {
"qualifiedName": entity.qualified_name
}})
outlet_list = []
if outlets:
for entity in outlets:
if not entity:
continue
entity.set_context(context)
client.entity_post.create(data={"entity": entity.as_dict()})
outlet_list.append({"typeName": entity.type_name,
"uniqueAttributes": {
"qualifiedName": entity.qualified_name
}})
operator_name = operator.__class__.__name__
name = "{} {} ({})".format(operator.dag_id, operator.task_id, operator_name)
qualified_name = "{}_{}_{}@{}".format(operator.dag_id,
operator.task_id,
_execution_date,
operator_name)
data = {
"dag_id": operator.dag_id,
"task_id": operator.task_id,
"execution_date": _execution_date.strftime(SERIALIZED_DATE_FORMAT_STR),
"name": name,
"inputs": inlet_list,
"outputs": outlet_list,
"command": operator.lineage_data,
}
if _start_date:
data["start_date"] = _start_date.strftime(SERIALIZED_DATE_FORMAT_STR)
if _end_date:
data["end_date"] = _end_date.strftime(SERIALIZED_DATE_FORMAT_STR)
process = datasets.Operator(qualified_name=qualified_name, data=data)
client.entity_post.create(data={"entity": process.as_dict()})

View file

@@ -1,110 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
operator_typedef = {
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"superTypes": [
"Process"
],
"name": "airflow_operator",
"description": "Airflow Operator",
"createdBy": "airflow",
"updatedBy": "airflow",
"attributeDefs": [
# "name" will be set to Operator name
# "qualifiedName" will be set to dag_id_task_id@operator_name
{
"name": "dag_id",
"isOptional": False,
"isUnique": False,
"isIndexable": True,
"typeName": "string",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "task_id",
"isOptional": False,
"isUnique": False,
"isIndexable": True,
"typeName": "string",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "command",
"isOptional": True,
"isUnique": False,
"isIndexable": False,
"typeName": "string",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "conn_id",
"isOptional": True,
"isUnique": False,
"isIndexable": False,
"typeName": "string",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "execution_date",
"isOptional": False,
"isUnique": False,
"isIndexable": True,
"typeName": "date",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "start_date",
"isOptional": True,
"isUnique": False,
"isIndexable": False,
"typeName": "date",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
{
"name": "end_date",
"isOptional": True,
"isUnique": False,
"isIndexable": False,
"typeName": "date",
"valuesMaxCount": 1,
"cardinality": "SINGLE",
"valuesMinCount": 0
},
],
},
],
}

View file

@@ -1,144 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from typing import List
from jinja2 import Environment
def _inherited(cls):
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in _inherited(c)]
)
class DataSet:
attributes = [] # type: List[str]
type_name = "dataSet"
def __init__(self, qualified_name=None, data=None, **kwargs):
self._qualified_name = qualified_name
self.context = None
self._data = dict()
self._data.update({key: value for key, value in kwargs.items() if key in set(self.attributes)})
if data:
if "qualifiedName" in data:
self._qualified_name = data.pop("qualifiedName")
self._data = {key: value for key, value in data.items() if key in set(self.attributes)}
def set_context(self, context):
self.context = context
@property
def qualified_name(self):
if self.context:
env = Environment()
return env.from_string(self._qualified_name).render(**self.context)
return self._qualified_name
def __getattr__(self, attr):
if attr in self.attributes:
if self.context:
env = Environment()
# dump to json here in order to be able to manage dicts and lists
rendered = env.from_string(
json.dumps(self._data.get(attr))
).render(**self.context)
return json.loads(rendered)
return self._data.get(attr)
raise AttributeError(attr)
def __getitem__(self, item):
return self.__getattr__(item)
def __iter__(self):
yield from self._data.items()
def as_dict(self):
attributes = dict(self._data)
attributes.update({"qualifiedName": self.qualified_name})
env = Environment()
if self.context:
for key, value in attributes.items():
attributes[key] = json.loads(
env.from_string(json.dumps(value)).render(**self.context)
)
d = {
"typeName": self.type_name,
"attributes": attributes,
}
return d
@staticmethod
def map_type(name):
for cls in _inherited(DataSet):
if cls.type_name == name:
return cls
raise NotImplementedError("No known mapping for {}".format(name))
class DataBase(DataSet):
type_name = "dbStore"
attributes = ["dbStoreType", "storeUse", "source", "description", "userName",
"storeUri", "operation", "startTime", "endTime", "commandlineOpts",
"attribute_db"]
class File(DataSet):
type_name = "fs_path"
attributes = ["name", "path", "isFile", "isSymlink"]
def __init__(self, name=None, data=None):
super().__init__(name=name, data=data)
self._qualified_name = 'file://' + self.name
self._data['path'] = self.name
class HadoopFile(File):
cluster_name = "none"
attributes = ["name", "path", "clusterName"]
type_name = "hdfs_file"
def __init__(self, name=None, data=None):
super().__init__(name=name, data=data)
self._qualified_name = "{}@{}".format(self.name, self.cluster_name)
self._data['path'] = self.name
self._data['clusterName'] = self.cluster_name
class Operator(DataSet):
type_name = "airflow_operator"
# todo we can derive this from the spec
attributes = ["dag_id", "task_id", "command", "conn_id", "name", "execution_date",
"start_date", "end_date", "inputs", "outputs"]

airflow/lineage/entities.py (new file, 105 lines)

View file

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Defines the base entities that can be used for providing lineage
information.
"""
from typing import Any, Dict, List, Optional
import attr
@attr.s(auto_attribs=True)
class File:
"""
File entity. Refers to a file
"""
url: str = attr.ib()
type_hint: Optional[str] = None
@attr.s(auto_attribs=True, kw_only=True)
class User:
"""
User entity. Identifies a user
"""
email: str = attr.ib()
first_name: Optional[str] = None
last_name: Optional[str] = None
@attr.s(auto_attribs=True, kw_only=True)
class Tag:
"""
Tag or classification entity.
"""
tag_name: str = attr.ib()
@attr.s(auto_attribs=True, kw_only=True)
class Column:
"""
Column of a Table
"""
name: str = attr.ib()
description: Optional[str] = None
data_type: str = attr.ib()
tags: List[Tag] = []
# this is a temporary hack to satisfy mypy. Once
# https://github.com/python/mypy/issues/6136 is resolved, use
# `attr.converters.default_if_none(default=False)`
# pylint: disable=missing-docstring
def default_if_none(arg: Optional[bool]) -> bool:
return arg or False
@attr.s(auto_attribs=True, kw_only=True)
class Table:
"""
Table entity
"""
database: str = attr.ib()
cluster: str = attr.ib()
name: str = attr.ib()
tags: List[Tag] = []
description: Optional[str] = None
columns: List[Column] = []
owners: List[User] = []
extra: Dict[str, Any] = {}
type_hint: Optional[str] = None
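
A short usage sketch for these entities (all values invented):

    from airflow.lineage.entities import Column, Table, Tag, User

    orders = Table(
        database="analytics",
        cluster="prod",
        name="orders",
        columns=[Column(name="order_id", data_type="bigint")],
        owners=[User(email="data-team@example.com")],
        tags=[Tag(tag_name="pii")],
    )

Since Table, Column, User and Tag are declared with kw_only=True, all of their fields must be passed by keyword.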

View file

@@ -34,3 +34,8 @@ metadata = (
Base = declarative_base(metadata=metadata) # type: Any
ID_LEN = 250
# used for typing
class Operator:
pass

View file

@@ -34,7 +34,8 @@ from dateutil.relativedelta import relativedelta
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DuplicateTaskIdFound
from airflow.lineage import DataSet, apply_lineage, prepare_lineage
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.base import Operator
from airflow.models.pool import Pool
# noinspection PyPep8Naming
from airflow.models.taskinstance import TaskInstance, clear_task_instances
@@ -56,7 +57,7 @@ ScheduleInterval = Union[str, timedelta, relativedelta]
# pylint: disable=too-many-instance-attributes,too-many-public-methods
@functools.total_ordering
class BaseOperator(LoggingMixin):
class BaseOperator(Operator, LoggingMixin):
"""
Abstract base class for all operators. Since operators create objects that
become nodes in the dag, BaseOperator contains many recursive methods for
@@ -312,8 +313,8 @@ class BaseOperator(LoggingMixin):
task_concurrency: Optional[int] = None,
executor_config: Optional[Dict] = None,
do_xcom_push: bool = True,
inlets: Optional[Dict] = None,
outlets: Optional[Dict] = None,
inlets: Optional[Any] = None,
outlets: Optional[Any] = None,
*args,
**kwargs
):
@@ -406,26 +407,18 @@ class BaseOperator(LoggingMixin):
self._log = logging.getLogger("airflow.task.operators")
# lineage
self.inlets: List[DataSet] = []
self.outlets: List[DataSet] = []
self.lineage_data = None
# Lineage
self.inlets: List = []
self.outlets: List = []
self._inlets = {
"auto": False,
"task_ids": [],
"datasets": [],
}
self._outlets: Dict[str, Iterable] = {
"datasets": [],
}
self._inlets: List = []
self._outlets: List = []
if inlets:
self._inlets.update(inlets)
self._inlets = inlets if isinstance(inlets, list) else [inlets, ]
if outlets:
self._outlets.update(outlets)
self._outlets = outlets if isinstance(outlets, list) else [outlets, ]
def __eq__(self, other):
if type(self) is type(other) and self.task_id == other.task_id:
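
The net effect of the constructor change above, sketched as comments (assuming the normalization shown in this diff):

    # BaseOperator(task_id="t", inlets="auto")     -> self._inlets == ["auto"]
    # BaseOperator(task_id="t", inlets=[f1, "x"])  -> self._inlets == [f1, "x"]
    # BaseOperator(task_id="t", outlets=f2)        -> self._outlets == [f2]
    # i.e. any non-list value is wrapped in a one-element list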

View file

@@ -16,18 +16,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict
from typing import Dict, Optional
import attr
import papermill as pm
from airflow.lineage.datasets import DataSet
from airflow.lineage.entities import File
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class NoteBook(DataSet):
type_name = "jupyter_notebook"
attributes = ['location', 'parameters']
@attr.s(auto_attribs=True)
class NoteBook(File):
type_hint: Optional[str] = "jupyter_notebook"
parameters: Dict = {}
meta_schema: str = __name__ + '.NoteBook'
class PapermillOperator(BaseOperator):
@@ -49,14 +53,12 @@ class PapermillOperator(BaseOperator):
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.inlets.append(NoteBook(qualified_name=input_nb,
location=input_nb,
self.inlets.append(NoteBook(url=input_nb,
parameters=parameters))
self.outlets.append(NoteBook(qualified_name=output_nb,
location=output_nb))
self.outlets.append(NoteBook(url=output_nb))
def execute(self, context):
for i in range(len(self.inlets)):
pm.execute_notebook(self.inlets[i].location, self.outlets[i].location,
pm.execute_notebook(self.inlets[i].url, self.outlets[i].url,
parameters=self.inlets[i].parameters,
progress_bar=False, report_mode=True)
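
A usage sketch of the operator after this change (notebook paths and parameters are invented; the import path is assumed to match this era of Airflow):

    from airflow.operators.papermill_operator import PapermillOperator

    render_report = PapermillOperator(
        task_id="render_report",
        input_nb="/tmp/input.ipynb",
        output_nb="/tmp/output-{{ execution_date }}.ipynb",
        parameters={"msg": "Run from Airflow at {{ execution_date }}"},
    )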

View file

@@ -32,7 +32,8 @@ works.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from datetime import timedelta
@@ -50,19 +51,20 @@ works.
f_final = File("/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
inlets={"auto": True},
outlets={"datasets": [f_final,]})
inlets=AUTO,
outlets=f_final)
f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(
task_id='run_me_first', bash_command='echo 1', dag=dag,
inlets={"datasets": [f_in,]},
outlets={"datasets": outlets}
)
inlets=f_in,
outlets=outlets
)
run_this.set_downstream(run_this_last)
@@ -70,20 +72,22 @@ Tasks take the parameters ``inlets`` and ``outlets``.
Inlets can be manually defined by the following options:
- by a list of dataset ``{"datasets": [dataset1, dataset2]}``
- by a list of ``airflow.lineage.entities.Dataset`` or a subclass
- can be configured to look for outlets from upstream tasks ``{"task_ids": ["task_id1", "task_id2"]}``
- can be configured to look for outlets from upstream tasks
- can be configured to pick up outlets from direct upstream tasks ``{"auto": True}``
- can be configured to pick up outlets from direct upstream tasks 'AUTO'
- a combination of the above (see the sketch below)
Outlets are defined as list of dataset ``{"datasets": [dataset1, dataset2]}``. Any fields for the dataset are templated with
the context when the task is being executed.
Outlets are defined as a list of datasets, e.g. ``[dataset1, dataset2]``.
All fields of a dataset are templated with the context when the task is being executed. They are templated during
execution of the originating task and will not be re-templated when picked up downstream.
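
For instance, a task combining the inlet options might look like this (a hypothetical task reusing ``f_in`` and the ``run_me_first`` task id from the example above):

    run_combined = DummyOperator(task_id='run_combined', dag=dag,
                                 inlets=[AUTO, "run_me_first", f_in])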
.. note:: Operators can add inlets and outlets automatically if the operator supports it.
In the example DAG task ``run_me_first`` is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are
In the example DAG, task ``run_this`` (task_id ``run_me_first``) is a BashOperator that takes three inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are
generated from a list. Note that ``execution_date`` is a templated field and will be rendered when the task is running.
.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task

View file

@@ -358,7 +358,9 @@ def do_setup():
install_requires=[
'alembic>=1.0, <2.0',
'argcomplete~=1.10',
'attrs~=19.3',
'cached_property~=1.5',
'cattrs~=0.9',
'colorlog==4.0.2',
'croniter>=0.3.17, <0.4',
'cryptography>=0.9.3',

View file

@@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View file

@@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from configparser import DuplicateSectionError
from airflow.configuration import AirflowConfigException, conf
from airflow.lineage.backend.atlas import AtlasBackend
from airflow.lineage.datasets import File
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import timezone
from tests.compat import mock
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class TestAtlas(unittest.TestCase):
def setUp(self):
try:
conf.add_section("atlas")
except AirflowConfigException:
pass
except DuplicateSectionError:
pass
conf.set("atlas", "username", "none")
conf.set("atlas", "password", "none")
conf.set("atlas", "host", "none")
conf.set("atlas", "port", "0")
self.atlas = AtlasBackend()
@mock.patch("airflow.lineage.backend.atlas.Atlas")
def test_lineage_send(self, atlas_mock):
td = mock.MagicMock()
en = mock.MagicMock()
atlas_mock.return_value = mock.Mock(typedefs=td, entity_post=en)
dag = DAG(
dag_id='test_prepare_lineage',
start_date=DEFAULT_DATE
)
f1 = File("/tmp/does_not_exist_1")
f2 = File("/tmp/does_not_exist_2")
inlets_d = [f1, ]
outlets_d = [f2, ]
with dag:
op1 = DummyOperator(task_id='leave1',
inlets={"datasets": inlets_d},
outlets={"datasets": outlets_d})
ctx = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)}
self.atlas.send_lineage(operator=op1, inlets=inlets_d,
outlets=outlets_d, context=ctx)
self.assertEqual(td.create.call_count, 1)
self.assertTrue(en.create.called)
self.assertEqual(len(en.mock_calls), 3)
# test can be broader

View file

@@ -18,26 +18,18 @@
# under the License.
import unittest
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.lineage.datasets import File
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import timezone
from tests.compat import mock
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class TestLineage(unittest.TestCase):
@mock.patch("airflow.lineage._get_backend")
def test_lineage(self, _get_backend):
backend = mock.Mock()
send_mock = mock.Mock()
backend.send_lineage = send_mock
_get_backend.return_value = backend
def test_lineage(self):
dag = DAG(
dag_id='test_prepare_lineage',
start_date=DEFAULT_DATE
@@ -49,32 +41,30 @@ class TestLineage(unittest.TestCase):
with dag:
op1 = DummyOperator(task_id='leave1',
inlets={"datasets": [f1, ]},
outlets={"datasets": [f2, ]})
inlets=f1,
outlets=[f2, ])
op2 = DummyOperator(task_id='leave2')
op3 = DummyOperator(task_id='upstream_level_1',
inlets={"auto": True},
outlets={"datasets": [f3, ]})
inlets=AUTO,
outlets=f3)
op4 = DummyOperator(task_id='upstream_level_2')
op5 = DummyOperator(task_id='upstream_level_3',
inlets={"task_ids": ["leave1", "upstream_level_1"]})
inlets=["leave1", "upstream_level_1"])
op1.set_downstream(op3)
op2.set_downstream(op3)
op3.set_downstream(op4)
op4.set_downstream(op5)
dag.clear()
ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE)}
ctx2 = {"ti": TI(task=op2, execution_date=DEFAULT_DATE)}
ctx3 = {"ti": TI(task=op3, execution_date=DEFAULT_DATE)}
ctx5 = {"ti": TI(task=op5, execution_date=DEFAULT_DATE)}
func = mock.Mock()
func.__name__ = 'foo'
# prepare with manual inlets and outlets
prep = prepare_lineage(func)
prep(op1, ctx1)
op1.pre_execute(ctx1)
self.assertEqual(len(op1.inlets), 1)
self.assertEqual(op1.inlets[0], f1)
@@ -83,28 +73,19 @@ class TestLineage(unittest.TestCase):
self.assertEqual(op1.outlets[0], f2)
# post process with no backend
post = apply_lineage(func)
post(op1, ctx1)
self.assertEqual(send_mock.call_count, 1)
send_mock.reset_mock()
op1.post_execute(ctx1)
prep(op2, ctx2)
op2.pre_execute(ctx2)
self.assertEqual(len(op2.inlets), 0)
post(op2, ctx2)
self.assertEqual(send_mock.call_count, 1)
send_mock.reset_mock()
op2.post_execute(ctx2)
prep(op3, ctx3)
op3.pre_execute(ctx3)
self.assertEqual(len(op3.inlets), 1)
self.assertEqual(op3.inlets[0].qualified_name, f2.qualified_name)
post(op3, ctx3)
self.assertEqual(send_mock.call_count, 1)
send_mock.reset_mock()
self.assertEqual(op3.inlets[0].url, f2.url)
op3.post_execute(ctx3)
# skip 4
prep(op5, ctx5)
op5.pre_execute(ctx5)
self.assertEqual(len(op5.inlets), 2)
post(op5, ctx5)
self.assertEqual(send_mock.call_count, 1)
send_mock.reset_mock()
op5.post_execute(ctx5)

View file

@@ -64,10 +64,8 @@ serialized_simple_dag_ground_truth = {
"retries": 1,
"retry_delay": 300.0,
"_downstream_task_ids": [],
"_inlets": {
"auto": False, "task_ids": [], "datasets": []
},
"_outlets": {"datasets": []},
"_inlets": [],
"_outlets": [],
"ui_color": "#fff",
"ui_fgcolor": "#000",
"template_fields": [],
@@ -79,10 +77,8 @@ serialized_simple_dag_ground_truth = {
"retries": 1,
"retry_delay": 300.0,
"_downstream_task_ids": [],
"_inlets": {
"auto": False, "task_ids": [], "datasets": []
},
"_outlets": {"datasets": []},
"_inlets": [],
"_outlets": [],
"ui_color": "#fff",
"ui_fgcolor": "#000",
"template_fields": [],