[AIRFLOW-6392] Remove cyclic dependency baseoperator <-> helpers (#6950)

There is a hidden cyclic dependency between baseoperator and helpers module.
It's hidden by local import but it is detected when baseoperator/helpers are
removed from pylint_todo.txt (and it's really there).

The dependency comes from BaseOperator using helpers and two helpers methods
(chain and cross_downstream) using BaseOperator. This can be solved by
converting the chain and cross_downstream methods to be static methods in
BaseOperator class.
This commit is contained in:
Jarek Potiuk 2019-12-31 17:39:13 +01:00 коммит произвёл Tomek Urbaszek
Родитель 505e5d940e
Коммит e9e0203ebc
7 изменённых файлов: 176 добавлений и 131 удалений

Просмотреть файл

@ -67,6 +67,34 @@ Now users instead of `import from airflow.utils.files import TemporaryDirectory`
do `from tempfile import TemporaryDirectory`. Both context managers provide the same
interface, thus no additional changes should be required.
### Chain and cross_downstream moved from helpers to BaseOperator
The `chain` and `cross_downstream` methods are now moved to airflow.models.baseoperator module from
`airflow.utils.helpers` module.
The baseoperator module seems to be a better choice to keep
closely coupled methods together. Helpers module is supposed to contain standalone helper methods
that can be imported by all classes.
The `chain` method and `cross_downstream` method both use BaseOperator. If any other package imports
any classes or functions from helpers module, then it automatically has an
implicit dependency to BaseOperator. That can often lead to cyclic dependencies.
More information in [AIRFLOW-6392](https://issues.apache.org/jira/browse/AIRFLOW-6392)
In Airflow <2.0 you imported those two methods like this:
```python
from airflow.utils.helpers import chain
from airflow.utils.helpers import cross_downstream
```
In Airflow 2.0 it should be changed to:
```python
from airflow.models.baseoperator import chain
from airflow.models.baseoperator import cross_downstream
```
### Change python3 as Dataflow Hooks/Operators default interpreter
Now the `py_interpreter` argument for DataFlow Hooks/Operators has been changed from python2 to python3.

Просмотреть файл

@ -18,15 +18,15 @@
# under the License.
"""Example DAG demonstrating the usage of the ShortCircuitOperator."""
import airflow.utils.helpers
from airflow.models import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils import dates
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'start_date': dates.days_ago(2),
}
dag = DAG(dag_id='example_short_circuit_operator', default_args=args)
@ -46,5 +46,5 @@ cond_false = ShortCircuitOperator(
ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]
airflow.utils.helpers.chain(cond_true, *ds_true)
airflow.utils.helpers.chain(cond_false, *ds_false)
chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

Просмотреть файл

@ -26,7 +26,9 @@ import sys
import warnings
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Callable, ClassVar, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple, Type, Union
from typing import (
Any, Callable, ClassVar, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple, Type, Union, cast,
)
import attr
import jinja2
@ -670,6 +672,7 @@ class BaseOperator(Operator, LoggingMixin):
for k, v in self.__dict__.items():
if k not in shallow_copy:
# noinspection PyArgumentList
setattr(result, k, copy.deepcopy(v, memo))
else:
setattr(result, k, copy.copy(v))
@ -1117,6 +1120,96 @@ class BaseOperator(Operator, LoggingMixin):
return cls.__serialized_fields
def chain(*tasks: Union[BaseOperator, List[BaseOperator]]):
    r"""
    Given a number of tasks, builds a dependency chain.

    Supports a mix of airflow.models.BaseOperator and List[airflow.models.BaseOperator].
    If you want to chain between two List[airflow.models.BaseOperator], you have to
    make sure they have the same length.

    .. code-block:: python

        chain(t1, [t2, t3], [t4, t5], t6)

    is equivalent to::

          / -> t2 -> t4 \
        t1               -> t6
          \ -> t3 -> t5 /

    .. code-block:: python

        t1.set_downstream(t2)
        t1.set_downstream(t3)
        t2.set_downstream(t4)
        t3.set_downstream(t5)
        t4.set_downstream(t6)
        t5.set_downstream(t6)

    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
    :raises TypeError: if an argument is neither a BaseOperator nor a list
    :raises AirflowException: if two adjacent lists differ in length
    """
    # Walk adjacent pairs; each pair is wired before moving to the next one.
    for index, up_task in enumerate(tasks[:-1]):
        down_task = tasks[index + 1]
        if isinstance(up_task, BaseOperator):
            # operator -> (operator | list): set_downstream handles both.
            up_task.set_downstream(down_task)
            continue
        if isinstance(down_task, BaseOperator):
            # list -> operator: set_upstream fans in from every list member.
            down_task.set_upstream(up_task)
            continue
        # Check against the built-in ``list``: isinstance checks against
        # typing.List are deprecated and behave identically here.
        if not isinstance(up_task, list) or not isinstance(down_task, list):
            raise TypeError(
                'Chain not supported between instances of {up_type} and {down_type}'.format(
                    up_type=type(up_task), down_type=type(down_task)))
        up_task_list = cast(List[BaseOperator], up_task)
        down_task_list = cast(List[BaseOperator], down_task)
        # list -> list only makes sense pairwise, so lengths must match.
        if len(up_task_list) != len(down_task_list):
            raise AirflowException(
                f'Chain not supported different length Iterable '
                f'but get {len(up_task_list)} and {len(down_task_list)}')
        for up_t, down_t in zip(up_task_list, down_task_list):
            up_t.set_downstream(down_t)
def cross_downstream(from_tasks: List[BaseOperator],
                     to_tasks: Union[BaseOperator, List[BaseOperator]]):
    r"""
    Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks.

    .. code-block:: python

        cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])

    is equivalent to::

        t1 ---> t4
           \ /
        t2 -X -> t5
           / \
        t3 ---> t6

    .. code-block:: python

        t1.set_downstream(t4)
        t1.set_downstream(t5)
        t1.set_downstream(t6)
        t2.set_downstream(t4)
        t2.set_downstream(t5)
        t2.set_downstream(t6)
        t3.set_downstream(t4)
        t3.set_downstream(t5)
        t3.set_downstream(t6)

    :param from_tasks: List of tasks to start from.
    :type from_tasks: List[airflow.models.BaseOperator]
    :param to_tasks: List of tasks to set as downstream dependencies.
    :type to_tasks: List[airflow.models.BaseOperator]
    """
    # set_downstream accepts either a single operator or a list, so each
    # upstream task fans out to the whole of ``to_tasks`` in one call.
    for upstream_task in from_tasks:
        upstream_task.set_downstream(to_tasks)
@attr.s(auto_attribs=True)
class BaseOperatorLink(metaclass=ABCMeta):
"""

Просмотреть файл

@ -31,13 +31,6 @@ from jinja2 import Template
from airflow.configuration import conf
from airflow.exceptions import AirflowException
try:
# Fix Python > 3.7 deprecation
from collections.abc import Iterable
except ImportError:
# Preserve Python < 3.3 compatibility
from collections import Iterable
# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint(
@ -142,83 +135,6 @@ def as_flattened_list(iterable):
return [e for i in iterable for e in i]
def chain(*tasks):
    r"""
    Given a number of tasks, builds a dependency chain.
    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
    If you want to chain between two List[airflow.models.BaseOperator], have to
    make sure they have same length.

    chain(t1, [t2, t3], [t4, t5], t6)

    is equivalent to

      / -> t2 -> t4 \
    t1               -> t6
      \ -> t3 -> t5 /

    t1.set_downstream(t2)
    t1.set_downstream(t3)
    t2.set_downstream(t4)
    t3.set_downstream(t5)
    t4.set_downstream(t6)
    t5.set_downstream(t6)

    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
    :raises TypeError: if an argument is neither a BaseOperator nor an Iterable
    :raises AirflowException: if two adjacent Iterables differ in length
    """
    # Local import to break the helpers <-> baseoperator circular dependency
    # (baseoperator itself imports from this helpers module).
    from airflow.models.baseoperator import BaseOperator
    # Walk adjacent pairs of arguments and wire each pair.
    for index, up_task in enumerate(tasks[:-1]):
        down_task = tasks[index + 1]
        if isinstance(up_task, BaseOperator):
            # operator -> (operator | iterable): set_downstream handles both.
            up_task.set_downstream(down_task)
        elif isinstance(down_task, BaseOperator):
            # iterable -> operator: fan in from every upstream member.
            down_task.set_upstream(up_task)
        else:
            # iterable -> iterable: elements are paired one-to-one,
            # so both sides must be Iterable and of equal length.
            # NOTE(review): len() is called on plain Iterables here -- an
            # Iterable without __len__ would raise TypeError; presumably
            # callers always pass lists. Verify against call sites.
            if not isinstance(up_task, Iterable) or not isinstance(down_task, Iterable):
                raise TypeError(
                    'Chain not supported between instances of {up_type} and {down_type}'.format(
                        up_type=type(up_task), down_type=type(down_task)))
            elif len(up_task) != len(down_task):
                raise AirflowException(
                    'Chain not supported different length Iterable but get {up_len} and {down_len}'.format(
                        up_len=len(up_task), down_len=len(down_task)))
            else:
                for up, down in zip(up_task, down_task):
                    up.set_downstream(down)
def cross_downstream(from_tasks, to_tasks):
    r"""
    Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks.
    E.g.: cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])
    Is equivalent to:

    t1 --> t4
       \ /
    t2 -X> t5
       / \
    t3 --> t6

    t1.set_downstream(t4)
    t1.set_downstream(t5)
    t1.set_downstream(t6)
    t2.set_downstream(t4)
    t2.set_downstream(t5)
    t2.set_downstream(t6)
    t3.set_downstream(t4)
    t3.set_downstream(t5)
    t3.set_downstream(t6)

    :param from_tasks: List of tasks to start from.
    :type from_tasks: List[airflow.models.BaseOperator]
    :param to_tasks: List of tasks to set as downstream dependencies.
    :type to_tasks: List[airflow.models.BaseOperator]
    """
    # Each upstream task receives the whole to_tasks collection at once;
    # set_downstream accepts a single operator or a list of operators.
    for source_task in from_tasks:
        source_task.set_downstream(to_tasks)
def pprinttable(rows):
"""Returns a pretty ascii table from tuples

Просмотреть файл

@ -271,8 +271,12 @@ and equivalent to:
op1.set_downstream([op2, op3])
Relationship Helper
--------------------
Relationship Builders
---------------------
*Moved in Airflow 2.0*
In Airflow 2.0 those two methods moved from ``airflow.utils.helpers`` to ``airflow.models.baseoperator``.
The ``chain`` and ``cross_downstream`` functions provide easier ways to set relationships
between operators in specific situations.

Просмотреть файл

@ -17,15 +17,17 @@
# specific language governing permissions and limitations
# under the License.
import datetime
import unittest
import uuid
from datetime import date, datetime
from unittest import mock
import jinja2
from parameterized import parameterized
from airflow.exceptions import AirflowException
from airflow.models import DAG
from airflow.models.baseoperator import chain, cross_downstream
from airflow.operators.dummy_operator import DummyOperator
from tests.models import DEFAULT_DATE
from tests.test_utils.mock_operators import MockNamedTuple, MockOperator
@ -81,8 +83,8 @@ class TestBaseOperator(unittest.TestCase):
{"foo": "bar"},
{"key_{{ foo }}_1": 1, "key_2": "bar_2"},
),
(datetime.date(2018, 12, 6), {"foo": "bar"}, datetime.date(2018, 12, 6)),
(datetime.datetime(2018, 12, 6, 10, 55), {"foo": "bar"}, datetime.datetime(2018, 12, 6, 10, 55)),
(date(2018, 12, 6), {"foo": "bar"}, date(2018, 12, 6)),
(datetime(2018, 12, 6, 10, 55), {"foo": "bar"}, datetime(2018, 12, 6, 10, 55)),
(MockNamedTuple("{{ foo }}_1", "{{ foo }}_2"), {"foo": "bar"}, MockNamedTuple("bar_1", "bar_2")),
({"{{ foo }}_1", "{{ foo }}_2"}, {"foo": "bar"}, {"bar_1", "bar_2"}),
(None, {}, None),
@ -245,3 +247,41 @@ class TestBaseOperator(unittest.TestCase):
task = DummyOperator(task_id="custom-resources", resources={"cpus": 1, "ram": 1024})
self.assertEqual(task.resources.cpus.qty, 1)
self.assertEqual(task.resources.ram.qty, 1024)
class TestBaseOperatorMethods(unittest.TestCase):
    """Tests for the module-level ``chain``/``cross_downstream`` relationship helpers."""

    def test_cross_downstream(self):
        """Test if all dependencies between tasks are all set correctly."""
        dag = DAG(dag_id="test_dag", start_date=datetime.now())
        start_tasks = [DummyOperator(task_id="t{i}".format(i=i), dag=dag) for i in range(1, 4)]
        end_tasks = [DummyOperator(task_id="t{i}".format(i=i), dag=dag) for i in range(4, 7)]
        cross_downstream(from_tasks=start_tasks, to_tasks=end_tasks)
        for task in start_tasks:
            self.assertCountEqual(task.get_direct_relatives(upstream=False), end_tasks)

    def test_chain(self):
        """Chaining a mix of operators and equal-length lists wires them pairwise."""
        dag = DAG(dag_id='test_chain', start_date=datetime.now())
        op1, op2, op3, op4, op5, op6 = (
            DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 7)
        )
        chain(op1, [op2, op3], [op4, op5], op6)
        self.assertCountEqual([op2, op3], op1.get_direct_relatives(upstream=False))
        self.assertEqual([op4], op2.get_direct_relatives(upstream=False))
        self.assertEqual([op5], op3.get_direct_relatives(upstream=False))
        self.assertCountEqual([op4, op5], op6.get_direct_relatives(upstream=True))

    def test_chain_not_support_type(self):
        """chain() raises TypeError for arguments that are neither operators nor lists."""
        dag = DAG(dag_id='test_chain', start_date=datetime.now())
        op1, op2 = (DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 3))
        with self.assertRaises(TypeError):
            # noinspection PyTypeChecker
            chain([op1, op2], 1)

    def test_chain_different_length_iterable(self):
        """chain() raises AirflowException when adjacent lists differ in length."""
        dag = DAG(dag_id='test_chain', start_date=datetime.now())
        op1, op2, op3, op4, op5 = (
            DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 6)
        )
        with self.assertRaises(AirflowException):
            chain([op1, op2], [op3, op4, op5])

Просмотреть файл

@ -28,7 +28,6 @@ from datetime import datetime
import psutil
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import helpers
@ -173,41 +172,6 @@ class TestHelpers(unittest.TestCase):
as_tup = helpers.as_tuple(test_str)
self.assertTupleEqual((test_str,), as_tup)
def test_cross_downstream(self):
    """Test if all dependencies between tasks are all set correctly."""
    dag = DAG(dag_id="test_dag", start_date=datetime.now())
    start_tasks = [DummyOperator(task_id="t{i}".format(i=i), dag=dag) for i in range(1, 4)]
    end_tasks = [DummyOperator(task_id="t{i}".format(i=i), dag=dag) for i in range(4, 7)]
    helpers.cross_downstream(from_tasks=start_tasks, to_tasks=end_tasks)
    # Every upstream task must point at the full set of downstream tasks.
    for task in start_tasks:
        self.assertCountEqual(task.get_direct_relatives(upstream=False), end_tasks)
def test_chain(self):
    """helpers.chain wires a mixed operator/list sequence pairwise, in order."""
    dag = DAG(dag_id='test_chain', start_date=datetime.now())
    op1, op2, op3, op4, op5, op6 = (
        DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 7)
    )
    helpers.chain(op1, [op2, op3], [op4, op5], op6)
    self.assertCountEqual([op2, op3], op1.get_direct_relatives(upstream=False))
    self.assertEqual([op4], op2.get_direct_relatives(upstream=False))
    self.assertEqual([op5], op3.get_direct_relatives(upstream=False))
    self.assertCountEqual([op4, op5], op6.get_direct_relatives(upstream=True))
def test_chain_not_support_type(self):
    """helpers.chain raises TypeError for arguments that are not operators or iterables."""
    dag = DAG(dag_id='test_chain', start_date=datetime.now())
    op1, op2 = (DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 3))
    with self.assertRaises(TypeError):
        helpers.chain([op1, op2], 1)
def test_chain_different_length_iterable(self):
    """helpers.chain raises AirflowException when adjacent lists differ in length."""
    dag = DAG(dag_id='test_chain', start_date=datetime.now())
    op1, op2, op3, op4, op5 = (
        DummyOperator(task_id='t{i}'.format(i=i), dag=dag) for i in range(1, 6)
    )
    with self.assertRaises(AirflowException):
        helpers.chain([op1, op2], [op3, op4, op5])
def test_convert_camel_to_snake(self):
self.assertEqual(helpers.convert_camel_to_snake('LocalTaskJob'), 'local_task_job')
self.assertEqual(helpers.convert_camel_to_snake('somethingVeryRandom'),