[AIRFLOW-6341] Make tests/models pylint compatible (#6897)

Tomek 2019-12-25 11:09:59 +01:00 committed by Jarek Potiuk
Parent c1ecad8f69
Commit 66604142bc
7 changed files with 379 additions and 373 deletions
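Nearly all of the diff below is mechanical pylint compliance: camelCase locals such as opA and UPPER_CASE locals such as DAG_NAME become snake_case, names that shadow builtins are renamed, and the few warnings without a clean fix are silenced with targeted # pylint: disable comments. A minimal sketch of the dominant rename pattern (hypothetical snippet, not taken from the diff):

# pylint's invalid-name check (C0103) matches local variables against a
# snake_case regex, so camelCase and UPPER_CASE names inside a function fail.
def build():
    op_a = 'A'           # was: opA       (camelCase fails C0103)
    dag_name = 'master'  # was: DAG_NAME  (upper case implies a module constant)
    return op_a, dag_name

print(build())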

View file

@@ -277,12 +277,6 @@
./tests/lineage/backend/test_atlas.py
./tests/lineage/test_lineage.py
./tests/macros/test_hive.py
./tests/models/test_cleartasks.py
./tests/models/test_dag.py
./tests/models/test_dagbag.py
./tests/models/test_dagrun.py
./tests/models/test_pool.py
./tests/models/test_taskinstance.py
./tests/operators/test_bash_operator.py
./tests/operators/test_gcs_to_s3.py
./tests/operators/test_hive_operator.py

View file: tests/models/test_cleartasks.py

@@ -215,19 +215,19 @@ class TestClearTasks(unittest.TestCase):
def test_operator_clear(self):
dag = DAG('test_operator_clear', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
t1 = DummyOperator(task_id='bash_op', owner='test', dag=dag)
t2 = DummyOperator(task_id='dummy_op', owner='test', dag=dag, retries=1)
op1 = DummyOperator(task_id='bash_op', owner='test', dag=dag)
op2 = DummyOperator(task_id='dummy_op', owner='test', dag=dag, retries=1)
t2.set_upstream(t1)
op2.set_upstream(op1)
ti1 = TI(task=t1, execution_date=DEFAULT_DATE)
ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
ti2.run()
# Dependency not met
self.assertEqual(ti2.try_number, 1)
self.assertEqual(ti2.max_tries, 1)
t2.clear(upstream=True)
op2.clear(upstream=True)
ti1.run()
ti2.run()
self.assertEqual(ti1.try_number, 2)

View file: tests/models/test_dag.py

@@ -56,7 +56,7 @@ class TestDag(unittest.TestCase):
a_index = i
if e.task_id == b:
b_index = i
return 0 <= a_index and 0 <= b_index and a_index < b_index
return 0 <= a_index < b_index
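# Reviewer note (illustration, not part of the commit): the chained form is
# equivalent to the three-clause original, because 0 <= a_index < b_index
# already implies 0 <= b_index; pylint proposes exactly this rewrite via its
# chained-comparison check (R1716). A standalone check of the equivalence:
assert all(
    (0 <= a and 0 <= b and a < b) == (0 <= a < b)
    for a in range(-2, 3) for b in range(-2, 3)
)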
def test_params_not_passed_is_empty_dict(self):
"""
@@ -302,11 +302,11 @@ class TestDag(unittest.TestCase):
task_id='stage{}.{}'.format(i, j), priority_weight=weight)
for j in range(0, width)] for i in range(0, depth)
]
for d, stage in enumerate(pipeline):
if d == 0:
for i, stage in enumerate(pipeline):
if i == 0:
continue
for current_task in stage:
for prev_task in pipeline[d - 1]:
for prev_task in pipeline[i - 1]:
current_task.set_upstream(prev_task)
for task in dag.task_dict.values():
@@ -328,11 +328,11 @@ class TestDag(unittest.TestCase):
weight_rule=WeightRule.UPSTREAM)
for j in range(0, width)] for i in range(0, depth)
]
for d, stage in enumerate(pipeline):
if d == 0:
for i, stage in enumerate(pipeline):
if i == 0:
continue
for current_task in stage:
for prev_task in pipeline[d - 1]:
for prev_task in pipeline[i - 1]:
current_task.set_upstream(prev_task)
for task in dag.task_dict.values():
@@ -354,11 +354,11 @@ class TestDag(unittest.TestCase):
weight_rule=WeightRule.ABSOLUTE)
for j in range(0, width)] for i in range(0, depth)
]
for d, stage in enumerate(pipeline):
if d == 0:
for i, stage in enumerate(pipeline):
if i == 0:
continue
for current_task in stage:
for prev_task in pipeline[d - 1]:
for prev_task in pipeline[i - 1]:
current_task.set_upstream(prev_task)
for task in dag.task_dict.values():
@@ -482,7 +482,9 @@ class TestDag(unittest.TestCase):
self.assertEqual(task.test_field, ['{{ ds }}', 'some_string'])
def test_cycle(self):
def test_cycle(self): # pylint: disable=too-many-statements
# TODO: split this into many single tests
# test empty
dag = DAG(
'dag',
@@ -498,7 +500,7 @@ class TestDag(unittest.TestCase):
default_args={'owner': 'owner1'})
with dag:
opA = DummyOperator(task_id='A')
op1 = DummyOperator(task_id='A')
self.assertFalse(dag.test_cycle())
@@ -512,16 +514,16 @@ class TestDag(unittest.TestCase):
# B -> D
# E -> F
with dag:
opA = DummyOperator(task_id='A')
opB = DummyOperator(task_id='B')
opC = DummyOperator(task_id='C')
opD = DummyOperator(task_id='D')
opE = DummyOperator(task_id='E')
opF = DummyOperator(task_id='F')
opA.set_downstream(opB)
opB.set_downstream(opC)
opB.set_downstream(opD)
opE.set_downstream(opF)
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op5 = DummyOperator(task_id='E')
op6 = DummyOperator(task_id='F')
op1.set_downstream(op2)
op2.set_downstream(op3)
op2.set_downstream(op4)
op5.set_downstream(op6)
self.assertFalse(dag.test_cycle())
@@ -533,8 +535,8 @@ class TestDag(unittest.TestCase):
# A -> A
with dag:
opA = DummyOperator(task_id='A')
opA.set_downstream(opA)
op1 = DummyOperator(task_id='A')
op1.set_downstream(op1)
with self.assertRaises(AirflowDagCycleException):
dag.test_cycle()
@@ -547,16 +549,16 @@ class TestDag(unittest.TestCase):
# A -> B -> C -> D -> E -> E
with dag:
opA = DummyOperator(task_id='A')
opB = DummyOperator(task_id='B')
opC = DummyOperator(task_id='C')
opD = DummyOperator(task_id='D')
opE = DummyOperator(task_id='E')
opA.set_downstream(opB)
opB.set_downstream(opC)
opC.set_downstream(opD)
opD.set_downstream(opE)
opE.set_downstream(opE)
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op5 = DummyOperator(task_id='E')
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_downstream(op4)
op4.set_downstream(op5)
op5.set_downstream(op5)
with self.assertRaises(AirflowDagCycleException):
dag.test_cycle()
@@ -569,16 +571,16 @@ class TestDag(unittest.TestCase):
# A -> B -> C -> D -> E -> A
with dag:
opA = DummyOperator(task_id='A')
opB = DummyOperator(task_id='B')
opC = DummyOperator(task_id='C')
opD = DummyOperator(task_id='D')
opE = DummyOperator(task_id='E')
opA.set_downstream(opB)
opB.set_downstream(opC)
opC.set_downstream(opD)
opD.set_downstream(opE)
opE.set_downstream(opA)
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op5 = DummyOperator(task_id='E')
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_downstream(op4)
op4.set_downstream(op5)
op5.set_downstream(op1)
with self.assertRaises(AirflowDagCycleException):
dag.test_cycle()
@@ -592,18 +594,18 @@ class TestDag(unittest.TestCase):
# E-> A -> B -> F -> A
# -> C -> F
with dag:
opA = DummyOperator(task_id='A')
opB = DummyOperator(task_id='B')
opC = DummyOperator(task_id='C')
opD = DummyOperator(task_id='D')
opE = DummyOperator(task_id='E')
opF = DummyOperator(task_id='F')
opA.set_downstream(opB)
opA.set_downstream(opC)
opE.set_downstream(opA)
opC.set_downstream(opF)
opB.set_downstream(opF)
opF.set_downstream(opA)
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op5 = DummyOperator(task_id='E')
op6 = DummyOperator(task_id='F')
op1.set_downstream(op2)
op1.set_downstream(op3)
op5.set_downstream(op1)
op3.set_downstream(op6)
op2.set_downstream(op6)
op6.set_downstream(op1)
with self.assertRaises(AirflowDagCycleException):
dag.test_cycle()
@@ -638,7 +640,7 @@ class TestDag(unittest.TestCase):
self.assertEqual(prev_local.isoformat(), "2018-10-28T02:55:00+02:00")
self.assertEqual(prev, utc)
def test_following_previous_schedule_daily_dag_CEST_to_CET(self):
def test_following_previous_schedule_daily_dag_cest_to_cet(self):
"""
Make sure DST transitions are properly observed
"""
@@ -668,7 +670,7 @@ class TestDag(unittest.TestCase):
self.assertEqual(prev_local.isoformat(), "2018-10-27T03:00:00+02:00")
self.assertEqual(prev.isoformat(), "2018-10-27T01:00:00+00:00")
def test_following_previous_schedule_daily_dag_CET_to_CEST(self):
def test_following_previous_schedule_daily_dag_cet_to_cest(self):
"""
Make sure DST transitions are properly observed
"""
@@ -879,34 +881,34 @@ class TestDag(unittest.TestCase):
def test_roots(self):
"""Verify if dag.roots returns the root tasks of a DAG."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t4 = DummyOperator(task_id="t4")
t5 = DummyOperator(task_id="t5")
[t1, t2] >> t3 >> [t4, t5]
op1 = DummyOperator(task_id="t1")
op2 = DummyOperator(task_id="t2")
op3 = DummyOperator(task_id="t3")
op4 = DummyOperator(task_id="t4")
op5 = DummyOperator(task_id="t5")
[op1, op2] >> op3 >> [op4, op5]
self.assertCountEqual(dag.roots, [t1, t2])
self.assertCountEqual(dag.roots, [op1, op2])
def test_leaves(self):
"""Verify if dag.leaves returns the leaf tasks of a DAG."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t4 = DummyOperator(task_id="t4")
t5 = DummyOperator(task_id="t5")
[t1, t2] >> t3 >> [t4, t5]
op1 = DummyOperator(task_id="t1")
op2 = DummyOperator(task_id="t2")
op3 = DummyOperator(task_id="t3")
op4 = DummyOperator(task_id="t4")
op5 = DummyOperator(task_id="t5")
[op1, op2] >> op3 >> [op4, op5]
self.assertCountEqual(dag.leaves, [t4, t5])
self.assertCountEqual(dag.leaves, [op4, op5])
def test_tree_view(self):
"""Verify correctness of dag.tree_view()."""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = DummyOperator(task_id="t2")
t3 = DummyOperator(task_id="t3")
t1 >> t2 >> t3
op1 = DummyOperator(task_id="t1")
op2 = DummyOperator(task_id="t2")
op3 = DummyOperator(task_id="t3")
op1 >> op2 >> op3
with redirect_stdout(io.StringIO()) as stdout:
dag.tree_view()
@@ -923,19 +925,19 @@ class TestDag(unittest.TestCase):
DuplicateTaskIdFound, "Task id 't1' has already been added to the DAG"
):
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id="t1")
t2 = BashOperator(task_id="t1", bash_command="sleep 1")
t1 >> t2
op1 = DummyOperator(task_id="t1")
op2 = BashOperator(task_id="t1", bash_command="sleep 1")
op1 >> op2
self.assertEqual(dag.task_dict, {t1.task_id: t1})
self.assertEqual(dag.task_dict, {op1.task_id: op1})
# Also verify that DAGs with duplicate task_ids don't raise errors
with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
t3 = DummyOperator(task_id="t3")
t4 = BashOperator(task_id="t4", bash_command="sleep 1")
t3 >> t4
op3 = DummyOperator(task_id="t3")
op4 = BashOperator(task_id="t4", bash_command="sleep 1")
op3 >> op4
self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
"""Verify tasks with Duplicate task_id raises error"""
@@ -943,39 +945,39 @@ class TestDag(unittest.TestCase):
DuplicateTaskIdFound, "Task id 't1' has already been added to the DAG"
):
dag = DAG("test_dag", start_date=DEFAULT_DATE)
t1 = DummyOperator(task_id="t1", dag=dag)
t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
t1 >> t2
op1 = DummyOperator(task_id="t1", dag=dag)
op2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
op1 >> op2
self.assertEqual(dag.task_dict, {t1.task_id: t1})
self.assertEqual(dag.task_dict, {op1.task_id: op1})
# Also verify that DAGs with duplicate task_ids don't raise errors
dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
t3 = DummyOperator(task_id="t3", dag=dag1)
t4 = DummyOperator(task_id="t4", dag=dag1)
t3 >> t4
op3 = DummyOperator(task_id="t3", dag=dag1)
op4 = DummyOperator(task_id="t4", dag=dag1)
op3 >> op4
self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
def test_duplicate_task_ids_for_same_task_is_allowed(self):
"""Verify that same tasks with Duplicate task_id do not raise error"""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = t2 = DummyOperator(task_id="t1")
t3 = DummyOperator(task_id="t3")
t1 >> t3
t2 >> t3
op1 = op2 = DummyOperator(task_id="t1")
op3 = DummyOperator(task_id="t3")
op1 >> op3
op2 >> op3
self.assertEqual(t1, t2)
self.assertEqual(dag.task_dict, {t1.task_id: t1, t3.task_id: t3})
self.assertEqual(dag.task_dict, {t2.task_id: t2, t3.task_id: t3})
self.assertEqual(op1, op2)
self.assertEqual(dag.task_dict, {op1.task_id: op1, op3.task_id: op3})
self.assertEqual(dag.task_dict, {op2.task_id: op2, op3.task_id: op3})
def test_sub_dag_updates_all_references_while_deepcopy(self):
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
t1 = DummyOperator(task_id='t1')
t2 = DummyOperator(task_id='t2')
t3 = DummyOperator(task_id='t3')
t1 >> t2
t2 >> t3
op1 = DummyOperator(task_id='t1')
op2 = DummyOperator(task_id='t2')
op3 = DummyOperator(task_id='t3')
op1 >> op2
op2 >> op3
sub_dag = dag.sub_dag('t2', include_upstream=True, include_downstream=False)
self.assertEqual(id(sub_dag.task_dict['t1'].downstream_list[0].dag), id(sub_dag))

View file: tests/models/test_dagbag.py

@@ -84,10 +84,10 @@ class TestDagBag(unittest.TestCase):
"""With safe mode enabled, a file matching the discovery heuristics
should be discovered.
"""
with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as fp:
fp.write(b"# airflow")
fp.write(b"# DAG")
fp.flush()
with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as f:
f.write(b"# airflow")
f.write(b"# DAG")
f.flush()
with conf_vars({('core', 'dags_folder'): self.empty_dir}):
dagbag = models.DagBag(include_examples=False, safe_mode=True)
@@ -95,7 +95,7 @@ class TestDagBag(unittest.TestCase):
self.assertEqual(len(dagbag.dagbag_stats), 1)
self.assertEqual(
dagbag.dagbag_stats[0].file,
"/{}".format(os.path.basename(fp.name)))
"/{}".format(os.path.basename(f.name)))
def test_safe_mode_heuristic_mismatch(self):
"""With safe mode enabled, a file not matching the discovery heuristics
@@ -109,13 +109,13 @@ class TestDagBag(unittest.TestCase):
def test_safe_mode_disabled(self):
"""With safe mode disabled, an empty python file should be discovered.
"""
with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as fp:
with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as f:
with conf_vars({('core', 'dags_folder'): self.empty_dir}):
dagbag = models.DagBag(include_examples=False, safe_mode=False)
self.assertEqual(len(dagbag.dagbag_stats), 1)
self.assertEqual(
dagbag.dagbag_stats[0].file,
"/{}".format(os.path.basename(fp.name)))
"/{}".format(os.path.basename(f.name)))
def test_process_file_that_contains_multi_bytes_char(self):
"""
@@ -160,8 +160,8 @@ class TestDagBag(unittest.TestCase):
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
self.assertEqual(len(dagbag.import_errors), 0)
for d in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, d))
for file in invalid_dag_files:
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file))
self.assertEqual(len(dagbag.import_errors), len(invalid_dag_files))
@patch.object(DagModel, 'get_current')
@@ -176,15 +176,15 @@ class TestDagBag(unittest.TestCase):
mock_dagmodel.return_value.last_expired = None
mock_dagmodel.return_value.fileloc = 'foo'
class TestDagBag(models.DagBag):
class _TestDagBag(models.DagBag):
process_file_calls = 0
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if 'example_bash_operator.py' == os.path.basename(filepath):
TestDagBag.process_file_calls += 1
if os.path.basename(filepath) == 'example_bash_operator.py':
_TestDagBag.process_file_calls += 1
super().process_file(filepath, only_if_updated, safe_mode)
dagbag = TestDagBag(include_examples=True)
dagbag = _TestDagBag(include_examples=True)
dagbag.process_file_calls
# Should not call process_file again, since it's already loaded during init.
@@ -216,11 +216,11 @@ class TestDagBag(unittest.TestCase):
"""
Test that we can refresh an ordinary .py DAG
"""
EXAMPLE_DAGS_FOLDER = airflow.example_dags.__path__[0]
example_dags_folder = airflow.example_dags.__path__[0]
dag_id = "example_bash_operator"
fileloc = os.path.realpath(
os.path.join(EXAMPLE_DAGS_FOLDER, "example_bash_operator.py")
os.path.join(example_dags_folder, "example_bash_operator.py")
)
mock_dagmodel.return_value = DagModel()
@@ -229,15 +229,15 @@ class TestDagBag(unittest.TestCase):
)
mock_dagmodel.return_value.fileloc = fileloc
class TestDagBag(DagBag):
class _TestDagBag(DagBag):
process_file_calls = 0
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if filepath == fileloc:
TestDagBag.process_file_calls += 1
_TestDagBag.process_file_calls += 1
return super().process_file(filepath, only_if_updated, safe_mode)
dagbag = TestDagBag(dag_folder=self.empty_dir, include_examples=True)
dagbag = _TestDagBag(dag_folder=self.empty_dir, include_examples=True)
self.assertEqual(1, dagbag.process_file_calls)
dag = dagbag.get_dag(dag_id)
@@ -261,15 +261,15 @@ class TestDagBag(unittest.TestCase):
)
mock_dagmodel.return_value.fileloc = fileloc
class TestDagBag(DagBag):
class _TestDagBag(DagBag):
process_file_calls = 0
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if filepath in fileloc:
TestDagBag.process_file_calls += 1
_TestDagBag.process_file_calls += 1
return super().process_file(filepath, only_if_updated, safe_mode)
dagbag = TestDagBag(dag_folder=os.path.realpath(TEST_DAGS_FOLDER), include_examples=False)
dagbag = _TestDagBag(dag_folder=os.path.realpath(TEST_DAGS_FOLDER), include_examples=False)
self.assertEqual(1, dagbag.process_file_calls)
dag = dagbag.get_dag(dag_id)
@@ -318,15 +318,15 @@ class TestDagBag(unittest.TestCase):
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
import datetime
DAG_NAME = 'master'
DEFAULT_ARGS = {
import datetime # pylint: disable=redefined-outer-name,reimported
dag_name = 'master'
default_args = {
'owner': 'owner1',
'start_date': datetime.datetime(2016, 1, 1)
}
dag = DAG(
DAG_NAME,
default_args=DEFAULT_ARGS)
dag_name,
default_args=default_args)
# master:
# A -> opSubDag_0
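The dag-builder helpers in this file import datetime inside the function body even though the module already imports it at the top, so the commit marks those imports with disables instead of removing them, presumably because each helper is materialised as a standalone DAG file and must import everything it needs. A standalone sketch of why pylint objects (hypothetical names):

import datetime  # module-level import

def build_default_args():
    # Re-importing inside the function trips pylint's reimported (W0404) and
    # redefined-outer-name (W0621) checks; the disable marks it as deliberate.
    import datetime  # pylint: disable=redefined-outer-name,reimported
    return {'start_date': datetime.datetime(2016, 1, 1)}

print(build_default_args())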
@@ -338,127 +338,127 @@ class TestDagBag(unittest.TestCase):
with dag:
def subdag_0():
subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
subdag_0 = DAG('master.op_subdag_0', default_args=default_args)
DummyOperator(task_id='subdag_0.task', dag=subdag_0)
return subdag_0
def subdag_1():
subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
subdag_1 = DAG('master.op_subdag_1', default_args=default_args)
DummyOperator(task_id='subdag_1.task', dag=subdag_1)
return subdag_1
opSubdag_0 = SubDagOperator(
task_id='opSubdag_0', dag=dag, subdag=subdag_0())
opSubdag_1 = SubDagOperator(
task_id='opSubdag_1', dag=dag, subdag=subdag_1())
op_subdag_0 = SubDagOperator(
task_id='op_subdag_0', dag=dag, subdag=subdag_0())
op_subdag_1 = SubDagOperator(
task_id='op_subdag_1', dag=dag, subdag=subdag_1())
opA = DummyOperator(task_id='A')
opA.set_downstream(opSubdag_0)
opA.set_downstream(opSubdag_1)
op_a = DummyOperator(task_id='A')
op_a.set_downstream(op_subdag_0)
op_a.set_downstream(op_subdag_1)
return dag
testDag = standard_subdag()
test_dag = standard_subdag()
# sanity check to make sure DAG.subdag is still functioning properly
self.assertEqual(len(testDag.subdags), 2)
self.assertEqual(len(test_dag.subdags), 2)
# Perform processing dag
dagbag, found_dags, _ = self.process_dag(standard_subdag)
# Validate correctness
# all dags from testDag should be listed
self.validate_dags(testDag, found_dags, dagbag)
# all dags from test_dag should be listed
self.validate_dags(test_dag, found_dags, dagbag)
# Define Dag to load
def nested_subdags():
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
import datetime
DAG_NAME = 'master'
DEFAULT_ARGS = {
import datetime # pylint: disable=redefined-outer-name,reimported
dag_name = 'master'
default_args = {
'owner': 'owner1',
'start_date': datetime.datetime(2016, 1, 1)
}
dag = DAG(
DAG_NAME,
default_args=DEFAULT_ARGS)
dag_name,
default_args=default_args)
# master:
# A -> opSubdag_0
# master.opSubdag_0:
# A -> op_subdag_0
# master.op_subdag_0:
# -> opSubDag_A
# master.opSubdag_0.opSubdag_A:
# -> subdag_A.task
# master.op_subdag_0.opSubdag_A:
# -> subdag_a.task
# -> opSubdag_B
# master.opSubdag_0.opSubdag_B:
# -> subdag_B.task
# A -> opSubdag_1
# master.opSubdag_1:
# master.op_subdag_0.opSubdag_B:
# -> subdag_b.task
# A -> op_subdag_1
# master.op_subdag_1:
# -> opSubdag_C
# master.opSubdag_1.opSubdag_C:
# -> subdag_C.task
# master.op_subdag_1.opSubdag_C:
# -> subdag_c.task
# -> opSubDag_D
# master.opSubdag_1.opSubdag_D:
# -> subdag_D.task
# master.op_subdag_1.opSubdag_D:
# -> subdag_d.task
with dag:
def subdag_A():
subdag_A = DAG(
'master.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_A.task', dag=subdag_A)
return subdag_A
def subdag_a():
subdag_a = DAG(
'master.op_subdag_0.opSubdag_A', default_args=default_args)
DummyOperator(task_id='subdag_a.task', dag=subdag_a)
return subdag_a
def subdag_B():
subdag_B = DAG(
'master.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_B.task', dag=subdag_B)
return subdag_B
def subdag_b():
subdag_b = DAG(
'master.op_subdag_0.opSubdag_B', default_args=default_args)
DummyOperator(task_id='subdag_b.task', dag=subdag_b)
return subdag_b
def subdag_C():
subdag_C = DAG(
'master.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_C.task', dag=subdag_C)
return subdag_C
def subdag_c():
subdag_c = DAG(
'master.op_subdag_1.opSubdag_C', default_args=default_args)
DummyOperator(task_id='subdag_c.task', dag=subdag_c)
return subdag_c
def subdag_D():
subdag_D = DAG(
'master.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_D.task', dag=subdag_D)
return subdag_D
def subdag_d():
subdag_d = DAG(
'master.op_subdag_1.opSubdag_D', default_args=default_args)
DummyOperator(task_id='subdag_d.task', dag=subdag_d)
return subdag_d
def subdag_0():
subdag_0 = DAG('master.opSubdag_0', default_args=DEFAULT_ARGS)
SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
subdag_0 = DAG('master.op_subdag_0', default_args=default_args)
SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_a())
SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_b())
return subdag_0
def subdag_1():
subdag_1 = DAG('master.opSubdag_1', default_args=DEFAULT_ARGS)
SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
subdag_1 = DAG('master.op_subdag_1', default_args=default_args)
SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_c())
SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_d())
return subdag_1
opSubdag_0 = SubDagOperator(
task_id='opSubdag_0', dag=dag, subdag=subdag_0())
opSubdag_1 = SubDagOperator(
task_id='opSubdag_1', dag=dag, subdag=subdag_1())
op_subdag_0 = SubDagOperator(
task_id='op_subdag_0', dag=dag, subdag=subdag_0())
op_subdag_1 = SubDagOperator(
task_id='op_subdag_1', dag=dag, subdag=subdag_1())
opA = DummyOperator(task_id='A')
opA.set_downstream(opSubdag_0)
opA.set_downstream(opSubdag_1)
op_a = DummyOperator(task_id='A')
op_a.set_downstream(op_subdag_0)
op_a.set_downstream(op_subdag_1)
return dag
testDag = nested_subdags()
test_dag = nested_subdags()
# sanity check to make sure DAG.subdag is still functioning properly
self.assertEqual(len(testDag.subdags), 6)
self.assertEqual(len(test_dag.subdags), 6)
# Perform processing dag
dagbag, found_dags, _ = self.process_dag(nested_subdags)
# Validate correctness
# all dags from testDag should be listed
self.validate_dags(testDag, found_dags, dagbag)
# all dags from test_dag should be listed
self.validate_dags(test_dag, found_dags, dagbag)
def test_skip_cycle_dags(self):
"""
@@ -470,33 +470,33 @@ class TestDagBag(unittest.TestCase):
def basic_cycle():
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
import datetime
DAG_NAME = 'cycle_dag'
DEFAULT_ARGS = {
import datetime # pylint: disable=redefined-outer-name,reimported
dag_name = 'cycle_dag'
default_args = {
'owner': 'owner1',
'start_date': datetime.datetime(2016, 1, 1)
}
dag = DAG(
DAG_NAME,
default_args=DEFAULT_ARGS)
dag_name,
default_args=default_args)
# A -> A
with dag:
opA = DummyOperator(task_id='A')
opA.set_downstream(opA)
op_a = DummyOperator(task_id='A')
op_a.set_downstream(op_a)
return dag
testDag = basic_cycle()
test_dag = basic_cycle()
# sanity check to make sure DAG.subdag is still functioning properly
self.assertEqual(len(testDag.subdags), 0)
self.assertEqual(len(test_dag.subdags), 0)
# Perform processing dag
dagbag, found_dags, file_path = self.process_dag(basic_cycle)
# Validate correctness
# None of the dags should be found
self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
self.validate_dags(test_dag, found_dags, dagbag, should_be_found=False)
self.assertIn(file_path, dagbag.import_errors)
# Define Dag to load
@@ -504,95 +504,95 @@ class TestDagBag(unittest.TestCase):
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
import datetime
DAG_NAME = 'nested_cycle'
DEFAULT_ARGS = {
import datetime # pylint: disable=redefined-outer-name,reimported
dag_name = 'nested_cycle'
default_args = {
'owner': 'owner1',
'start_date': datetime.datetime(2016, 1, 1)
}
dag = DAG(
DAG_NAME,
default_args=DEFAULT_ARGS)
dag_name,
default_args=default_args)
# cycle:
# A -> opSubdag_0
# cycle.opSubdag_0:
# A -> op_subdag_0
# cycle.op_subdag_0:
# -> opSubDag_A
# cycle.opSubdag_0.opSubdag_A:
# -> subdag_A.task
# cycle.op_subdag_0.opSubdag_A:
# -> subdag_a.task
# -> opSubdag_B
# cycle.opSubdag_0.opSubdag_B:
# -> subdag_B.task
# A -> opSubdag_1
# cycle.opSubdag_1:
# cycle.op_subdag_0.opSubdag_B:
# -> subdag_b.task
# A -> op_subdag_1
# cycle.op_subdag_1:
# -> opSubdag_C
# cycle.opSubdag_1.opSubdag_C:
# -> subdag_C.task -> subdag_C.task >Invalid Loop<
# cycle.op_subdag_1.opSubdag_C:
# -> subdag_c.task -> subdag_c.task >Invalid Loop<
# -> opSubDag_D
# cycle.opSubdag_1.opSubdag_D:
# -> subdag_D.task
# cycle.op_subdag_1.opSubdag_D:
# -> subdag_d.task
with dag:
def subdag_A():
subdag_A = DAG(
'nested_cycle.opSubdag_0.opSubdag_A', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_A.task', dag=subdag_A)
return subdag_A
def subdag_a():
subdag_a = DAG(
'nested_cycle.op_subdag_0.opSubdag_A', default_args=default_args)
DummyOperator(task_id='subdag_a.task', dag=subdag_a)
return subdag_a
def subdag_B():
subdag_B = DAG(
'nested_cycle.opSubdag_0.opSubdag_B', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_B.task', dag=subdag_B)
return subdag_B
def subdag_b():
subdag_b = DAG(
'nested_cycle.op_subdag_0.opSubdag_B', default_args=default_args)
DummyOperator(task_id='subdag_b.task', dag=subdag_b)
return subdag_b
def subdag_C():
subdag_C = DAG(
'nested_cycle.opSubdag_1.opSubdag_C', default_args=DEFAULT_ARGS)
opSubdag_C_task = DummyOperator(
task_id='subdag_C.task', dag=subdag_C)
def subdag_c():
subdag_c = DAG(
'nested_cycle.op_subdag_1.opSubdag_C', default_args=default_args)
op_subdag_c_task = DummyOperator(
task_id='subdag_c.task', dag=subdag_c)
# introduce a loop in opSubdag_C
opSubdag_C_task.set_downstream(opSubdag_C_task)
return subdag_C
op_subdag_c_task.set_downstream(op_subdag_c_task)
return subdag_c
def subdag_D():
subdag_D = DAG(
'nested_cycle.opSubdag_1.opSubdag_D', default_args=DEFAULT_ARGS)
DummyOperator(task_id='subdag_D.task', dag=subdag_D)
return subdag_D
def subdag_d():
subdag_d = DAG(
'nested_cycle.op_subdag_1.opSubdag_D', default_args=default_args)
DummyOperator(task_id='subdag_d.task', dag=subdag_d)
return subdag_d
def subdag_0():
subdag_0 = DAG('nested_cycle.opSubdag_0', default_args=DEFAULT_ARGS)
SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_A())
SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_B())
subdag_0 = DAG('nested_cycle.op_subdag_0', default_args=default_args)
SubDagOperator(task_id='opSubdag_A', dag=subdag_0, subdag=subdag_a())
SubDagOperator(task_id='opSubdag_B', dag=subdag_0, subdag=subdag_b())
return subdag_0
def subdag_1():
subdag_1 = DAG('nested_cycle.opSubdag_1', default_args=DEFAULT_ARGS)
SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_C())
SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_D())
subdag_1 = DAG('nested_cycle.op_subdag_1', default_args=default_args)
SubDagOperator(task_id='opSubdag_C', dag=subdag_1, subdag=subdag_c())
SubDagOperator(task_id='opSubdag_D', dag=subdag_1, subdag=subdag_d())
return subdag_1
opSubdag_0 = SubDagOperator(
task_id='opSubdag_0', dag=dag, subdag=subdag_0())
opSubdag_1 = SubDagOperator(
task_id='opSubdag_1', dag=dag, subdag=subdag_1())
op_subdag_0 = SubDagOperator(
task_id='op_subdag_0', dag=dag, subdag=subdag_0())
op_subdag_1 = SubDagOperator(
task_id='op_subdag_1', dag=dag, subdag=subdag_1())
opA = DummyOperator(task_id='A')
opA.set_downstream(opSubdag_0)
opA.set_downstream(opSubdag_1)
op_a = DummyOperator(task_id='A')
op_a.set_downstream(op_subdag_0)
op_a.set_downstream(op_subdag_1)
return dag
testDag = nested_subdag_cycle()
test_dag = nested_subdag_cycle()
# sanity check to make sure DAG.subdag is still functioning properly
self.assertEqual(len(testDag.subdags), 6)
self.assertEqual(len(test_dag.subdags), 6)
# Perform processing dag
dagbag, found_dags, file_path = self.process_dag(nested_subdag_cycle)
# Validate correctness
# None of the dags should be found
self.validate_dags(testDag, found_dags, dagbag, should_be_found=False)
self.validate_dags(test_dag, found_dags, dagbag, should_be_found=False)
self.assertIn(file_path, dagbag.import_errors)
def test_process_file_with_none(self):

View file: tests/models/test_dagrun.py

@@ -56,9 +56,9 @@ class TestDagRun(unittest.TestCase):
if task_states is not None:
session = settings.Session()
for task_id, state in task_states.items():
for task_id, task_state in task_states.items():
ti = dag_run.get_task_instance(task_id)
ti.set_state(state, session)
ti.set_state(task_state, session)
session.close()
return dag_run

View file: tests/models/test_pool.py

@@ -46,10 +46,10 @@ class TestPool(unittest.TestCase):
dag = DAG(
dag_id='test_open_slots',
start_date=DEFAULT_DATE, )
t1 = DummyOperator(task_id='dummy1', dag=dag, pool='test_pool')
t2 = DummyOperator(task_id='dummy2', dag=dag, pool='test_pool')
ti1 = TI(task=t1, execution_date=DEFAULT_DATE)
ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
op1 = DummyOperator(task_id='dummy1', dag=dag, pool='test_pool')
op2 = DummyOperator(task_id='dummy2', dag=dag, pool='test_pool')
ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
ti1.state = State.RUNNING
ti2.state = State.QUEUED
@@ -60,20 +60,20 @@ class TestPool(unittest.TestCase):
session.commit()
session.close()
self.assertEqual(3, pool.open_slots())
self.assertEqual(1, pool.used_slots())
self.assertEqual(1, pool.queued_slots())
self.assertEqual(2, pool.occupied_slots())
self.assertEqual(3, pool.open_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(1, pool.used_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(1, pool.queued_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(2, pool.occupied_slots()) # pylint: disable=no-value-for-parameter
def test_infinite_slots(self):
pool = Pool(pool='test_pool', slots=-1)
dag = DAG(
dag_id='test_infinite_slots',
start_date=DEFAULT_DATE, )
t1 = DummyOperator(task_id='dummy1', dag=dag, pool='test_pool')
t2 = DummyOperator(task_id='dummy2', dag=dag, pool='test_pool')
ti1 = TI(task=t1, execution_date=DEFAULT_DATE)
ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
op1 = DummyOperator(task_id='dummy1', dag=dag, pool='test_pool')
op2 = DummyOperator(task_id='dummy2', dag=dag, pool='test_pool')
ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
ti1.state = State.RUNNING
ti2.state = State.QUEUED
@@ -84,10 +84,10 @@ class TestPool(unittest.TestCase):
session.commit()
session.close()
self.assertEqual(float('inf'), pool.open_slots())
self.assertEqual(1, pool.used_slots())
self.assertEqual(1, pool.queued_slots())
self.assertEqual(2, pool.occupied_slots())
self.assertEqual(float('inf'), pool.open_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(1, pool.used_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(1, pool.queued_slots()) # pylint: disable=no-value-for-parameter
self.assertEqual(2, pool.occupied_slots()) # pylint: disable=no-value-for-parameter
def test_default_pool_open_slots(self):
set_default_pool_slots(5)
@@ -96,10 +96,10 @@ class TestPool(unittest.TestCase):
dag = DAG(
dag_id='test_default_pool_open_slots',
start_date=DEFAULT_DATE, )
t1 = DummyOperator(task_id='dummy1', dag=dag)
t2 = DummyOperator(task_id='dummy2', dag=dag)
ti1 = TI(task=t1, execution_date=DEFAULT_DATE)
ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
op1 = DummyOperator(task_id='dummy1', dag=dag)
op2 = DummyOperator(task_id='dummy2', dag=dag)
ti1 = TI(task=op1, execution_date=DEFAULT_DATE)
ti2 = TI(task=op2, execution_date=DEFAULT_DATE)
ti1.state = State.RUNNING
ti2.state = State.QUEUED
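The no-value-for-parameter disables in this file (the same disable recurs in test_taskinstance.py below) all stem from one pattern: the called methods take a session argument that Airflow's provide_session decorator injects at call time, which pylint cannot infer from the signature. A simplified sketch of that decorator's shape, an assumption about its mechanics rather than the real implementation:

import functools

def provide_session(func):
    # If the caller omits session, inject one. Pylint reasons about the
    # undecorated signature, so a bare open_slots() call still looks like it
    # is missing a required argument.
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is None:
            session = object()  # stand-in for a real SQLAlchemy session
        return func(*args, session=session, **kwargs)
    return wrapper

@provide_session
def open_slots(session):  # hypothetical free function, not Pool's method
    return 3

print(open_slots())  # works at runtime; pylint reports no-value-for-parameter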

View file: tests/models/test_taskinstance.py

@@ -48,6 +48,30 @@ from tests.models import DEFAULT_DATE
from tests.test_utils import db
class CallbackWrapper:
task_id = None
dag_id = None
execution_date = None
task_state_in_callback = None
callback_ran = False
def wrap_task_instance(self, ti):
self.task_id = ti.task_id
self.dag_id = ti.dag_id
self.execution_date = ti.execution_date
self.task_state_in_callback = ""
self.callback_ran = False
def success_handler(self, context): # pylint: disable=unused-argument
self.callback_ran = True
session = settings.Session()
temp_instance = session.query(TI).filter(
TI.task_id == self.task_id).filter(
TI.dag_id == self.dag_id).filter(
TI.execution_date == self.execution_date).one()
self.task_state_in_callback = temp_instance.state
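CallbackWrapper moves from inside test_success_callbak_no_race_condition (see the deletion further down) to module level, with its attributes declared on the class before wrap_task_instance assigns them. The up-front declarations are what keep pylint's attribute-defined-outside-init check (W0201) quiet when attributes are first set outside __init__; a minimal illustration:

class Wrapper:
    task_id = None  # declared at class level, so the later assignment is clean

    def wrap(self, task_id):
        # Without the class-level declaration, this first assignment outside
        # __init__ would trigger W0201 (attribute-defined-outside-init).
        self.task_id = task_id

wrapper = Wrapper()
wrapper.wrap('op')
print(wrapper.task_id)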
class TestTaskInstance(unittest.TestCase):
def setUp(self):
@@ -105,11 +129,11 @@ class TestTaskInstance(unittest.TestCase):
op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9))
def test_timezone_awareness(self):
NAIVE_DATETIME = DEFAULT_DATE.replace(tzinfo=None)
naive_datetime = DEFAULT_DATE.replace(tzinfo=None)
# check ti without dag (just for bw compat)
op_no_dag = DummyOperator(task_id='op_no_dag')
ti = TI(task=op_no_dag, execution_date=NAIVE_DATETIME)
ti = TI(task=op_no_dag, execution_date=naive_datetime)
self.assertEqual(ti.execution_date, DEFAULT_DATE)
@@ -117,23 +141,23 @@ class TestTaskInstance(unittest.TestCase):
dag = DAG('dag', start_date=DEFAULT_DATE)
op1 = DummyOperator(task_id='op_1')
dag.add_task(op1)
ti = TI(task=op1, execution_date=NAIVE_DATETIME)
ti = TI(task=op1, execution_date=naive_datetime)
self.assertEqual(ti.execution_date, DEFAULT_DATE)
# with dag and localized execution_date
tz = pendulum.timezone("Europe/Amsterdam")
execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tz)
tzinfo = pendulum.timezone("Europe/Amsterdam")
execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
utc_date = timezone.convert_to_utc(execution_date)
ti = TI(task=op1, execution_date=execution_date)
self.assertEqual(ti.execution_date, utc_date)
def test_task_naive_datetime(self):
NAIVE_DATETIME = DEFAULT_DATE.replace(tzinfo=None)
naive_datetime = DEFAULT_DATE.replace(tzinfo=None)
op_no_dag = DummyOperator(task_id='test_task_naive_datetime',
start_date=NAIVE_DATETIME,
end_date=NAIVE_DATETIME)
start_date=naive_datetime,
end_date=naive_datetime)
self.assertTrue(op_no_dag.start_date.tzinfo)
self.assertTrue(op_no_dag.end_date.tzinfo)
@@ -521,30 +545,30 @@ class TestTaskInstance(unittest.TestCase):
task=task, execution_date=DEFAULT_DATE)
ti.end_date = pendulum.instance(timezone.utcnow())
dt = ti.next_retry_datetime()
date = ti.next_retry_datetime()
# between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
period = ti.end_date.add(seconds=30) - ti.end_date.add(seconds=15)
self.assertTrue(dt in period)
self.assertTrue(date in period)
ti.try_number = 3
dt = ti.next_retry_datetime()
date = ti.next_retry_datetime()
# between 30 * 2^2 and 30 * 2^3 (120 and 240)
period = ti.end_date.add(seconds=240) - ti.end_date.add(seconds=120)
self.assertTrue(dt in period)
self.assertTrue(date in period)
ti.try_number = 5
dt = ti.next_retry_datetime()
date = ti.next_retry_datetime()
# between 30 * 2^4 and 30 * 2^5 (480 and 960)
period = ti.end_date.add(seconds=960) - ti.end_date.add(seconds=480)
self.assertTrue(dt in period)
self.assertTrue(date in period)
ti.try_number = 9
dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date + max_delay)
date = ti.next_retry_datetime()
self.assertEqual(date, ti.end_date + max_delay)
ti.try_number = 50
dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date + max_delay)
date = ti.next_retry_datetime()
self.assertEqual(date, ti.end_date + max_delay)
def test_next_retry_datetime_short_intervals(self):
delay = datetime.timedelta(seconds=1)
@@ -565,10 +589,10 @@ class TestTaskInstance(unittest.TestCase):
task=task, execution_date=DEFAULT_DATE)
ti.end_date = pendulum.instance(timezone.utcnow())
dt = ti.next_retry_datetime()
date = ti.next_retry_datetime()
# between 1 * 2^0.5 and 1 * 2^1 (15 and 30)
period = ti.end_date.add(seconds=1) - ti.end_date.add(seconds=15)
self.assertTrue(dt in period)
self.assertTrue(date in period)
@patch.object(TI, 'pool_full')
def test_reschedule_handling(self, mock_pool_full):
@@ -579,7 +603,7 @@ class TestTaskInstance(unittest.TestCase):
done = False
fail = False
def callable():
def func():
if fail:
raise AirflowException()
return done
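# Reviewer note (illustration, not part of the commit): the rename from
# callable to func clears pylint's redefined-builtin warning (W0622) -- a
# local named callable shadows the builtin of the same name. After the
# rename, the builtin is usable again right next to the helper:
assert callable(len) and not callable(42)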
@@ -589,7 +613,7 @@ class TestTaskInstance(unittest.TestCase):
task_id='test_reschedule_handling_sensor',
poke_interval=0,
mode='reschedule',
python_callable=callable,
python_callable=func,
retries=1,
retry_delay=datetime.timedelta(seconds=0),
dag=dag,
@@ -618,7 +642,7 @@ class TestTaskInstance(unittest.TestCase):
self.assertEqual(ti.start_date, expected_start_date)
self.assertEqual(ti.end_date, expected_end_date)
self.assertEqual(ti.duration, expected_duration)
trs = TaskReschedule.find_for_task_instance(ti)
trs = TaskReschedule.find_for_task_instance(ti) # pylint: disable=no-value-for-parameter
self.assertEqual(len(trs), expected_task_reschedule_count)
date1 = timezone.utcnow()
@@ -675,7 +699,7 @@ class TestTaskInstance(unittest.TestCase):
done = False
fail = False
def callable():
def func():
if fail:
raise AirflowException()
return done
@@ -685,7 +709,7 @@ class TestTaskInstance(unittest.TestCase):
task_id='test_reschedule_handling_sensor',
poke_interval=0,
mode='reschedule',
python_callable=callable,
python_callable=func,
retries=1,
retry_delay=datetime.timedelta(seconds=0),
dag=dag,
@@ -714,7 +738,7 @@ class TestTaskInstance(unittest.TestCase):
self.assertEqual(ti.start_date, expected_start_date)
self.assertEqual(ti.end_date, expected_end_date)
self.assertEqual(ti.duration, expected_duration)
trs = TaskReschedule.find_for_task_instance(ti)
trs = TaskReschedule.find_for_task_instance(ti) # pylint: disable=no-value-for-parameter
self.assertEqual(len(trs), expected_task_reschedule_count)
date1 = timezone.utcnow()
@@ -728,7 +752,7 @@ class TestTaskInstance(unittest.TestCase):
self.assertEqual(ti.state, State.NONE)
self.assertEqual(ti._try_number, 0)
# Check that reschedules for ti have also been cleared.
trs = TaskReschedule.find_for_task_instance(ti)
trs = TaskReschedule.find_for_task_instance(ti) # pylint: disable=no-value-for-parameter
self.assertFalse(trs)
def test_depends_on_past(self):
@@ -821,14 +845,15 @@ class TestTaskInstance(unittest.TestCase):
run_date = task.start_date + datetime.timedelta(days=5)
ti = TI(downstream, run_date)
dep_results = TriggerRuleDep()._evaluate_trigger_rule(
dep_results = TriggerRuleDep()._evaluate_trigger_rule( # pylint: disable=no-value-for-parameter
ti=ti,
successes=successes,
skipped=skipped,
failed=failed,
upstream_failed=upstream_failed,
done=done,
flag_upstream_failed=flag_upstream_failed)
flag_upstream_failed=flag_upstream_failed,
)
completed = all([dep.passed for dep in dep_results])
self.assertEqual(completed, expect_completed)
@@ -976,7 +1001,7 @@ class TestTaskInstance(unittest.TestCase):
pass
class TestOperator(PythonOperator):
def post_execute(self, context, result):
def post_execute(self, context, result=None):
if result == 'error':
raise TestError('expected error.')
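# Reviewer note (illustration, not part of the commit): adding the
# result=None default keeps the override compatible with BaseOperator's
# post_execute(self, context, result=None); pylint's override checks
# (arguments-differ / signature-differs) flag subclasses that turn an
# optional parameter into a required one, as in this minimal stand-in:
class _Base:  # hypothetical stand-in for BaseOperator
    def post_execute(self, context, result=None):
        pass

class _Child(_Base):
    # Dropping the default makes the parameter required and trips pylint.
    def post_execute(self, context, result):
        pass

_Child().post_execute(context={}, result='ok')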
@@ -1079,12 +1104,12 @@ class TestTaskInstance(unittest.TestCase):
dag = DAG('dag', start_date=DEFAULT_DATE)
task = DummyOperator(task_id='op', dag=dag)
ti = TI(task=task, execution_date=now)
d = urllib.parse.parse_qs(
query = urllib.parse.parse_qs(
urllib.parse.urlparse(ti.mark_success_url).query,
keep_blank_values=True, strict_parsing=True)
self.assertEqual(d['dag_id'][0], 'dag')
self.assertEqual(d['task_id'][0], 'op')
self.assertEqual(pendulum.parse(d['execution_date'][0]), now)
self.assertEqual(query['dag_id'][0], 'dag')
self.assertEqual(query['task_id'][0], 'op')
self.assertEqual(pendulum.parse(query['execution_date'][0]), now)
def test_overwrite_params_with_dag_run_conf(self):
task = DummyOperator(task_id='op')
@@ -1185,37 +1210,20 @@ class TestTaskInstance(unittest.TestCase):
self.assertIsNone(ti.duration)
def test_success_callbak_no_race_condition(self):
class CallbackWrapper:
def wrap_task_instance(self, ti):
self.task_id = ti.task_id
self.dag_id = ti.dag_id
self.execution_date = ti.execution_date
self.task_state_in_callback = ""
self.callback_ran = False
def success_handler(self, context): # pylint: disable=unused-argument
self.callback_ran = True
session = settings.Session()
temp_instance = session.query(TI).filter(
TI.task_id == self.task_id).filter(
TI.dag_id == self.dag_id).filter(
TI.execution_date == self.execution_date).one()
self.task_state_in_callback = temp_instance.state
cw = CallbackWrapper()
callback_wrapper = CallbackWrapper()
dag = DAG('test_success_callbak_no_race_condition', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
task = DummyOperator(task_id='op', email='test@test.test',
on_success_callback=cw.success_handler, dag=dag)
on_success_callback=callback_wrapper.success_handler, dag=dag)
ti = TI(task=task, execution_date=datetime.datetime.now())
ti.state = State.RUNNING
session = settings.Session()
session.merge(ti)
session.commit()
cw.wrap_task_instance(ti)
callback_wrapper.wrap_task_instance(ti)
ti._run_raw_task()
self.assertTrue(cw.callback_ran)
self.assertEqual(cw.task_state_in_callback, State.RUNNING)
self.assertTrue(callback_wrapper.callback_ran)
self.assertEqual(callback_wrapper.task_state_in_callback, State.RUNNING)
ti.refresh_from_db()
self.assertEqual(ti.state, State.SUCCESS)
@@ -1240,13 +1248,13 @@ class TestTaskInstance(unittest.TestCase):
with create_session() as session: # type: Session
d0 = pendulum.parse('2019-01-01T00:00:00+00:00')
date = pendulum.parse('2019-01-01T00:00:00+00:00')
ret = []
for idx, state in enumerate(scenario):
ed = d0.add(days=idx)
ti = get_test_ti(session, ed, state)
new_date = date.add(days=idx)
ti = get_test_ti(session, new_date, state)
ret.append(ti)
return ret
@@ -1442,33 +1450,35 @@ class TestTaskInstance(unittest.TestCase):
assert ti.state == State.SUCCESS
def test_handle_failure(self):
from unittest import mock
import mock
start_date = timezone.datetime(2016, 6, 1)
dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)
with mock.MagicMock() as mock_on_failure_1, mock.MagicMock() as mock_on_retry_1:
task1 = DummyOperator(task_id="test_handle_failure_on_failure",
on_failure_callback=mock_on_failure_1,
on_retry_callback=mock_on_retry_1,
dag=dag)
ti1 = TI(task=task1, execution_date=start_date)
ti1.state = State.FAILED
ti1.handle_failure("test failure handling")
mock_on_failure_1 = mock.MagicMock()
mock_on_retry_1 = mock.MagicMock()
task1 = DummyOperator(task_id="test_handle_failure_on_failure",
on_failure_callback=mock_on_failure_1,
on_retry_callback=mock_on_retry_1,
dag=dag)
ti1 = TI(task=task1, execution_date=start_date)
ti1.state = State.FAILED
ti1.handle_failure("test failure handling")
context_arg_1 = mock_on_failure_1.call_args[0][0]
assert context_arg_1 and "task_instance" in context_arg_1
mock_on_retry_1.assert_not_called()
with mock.MagicMock() as mock_on_failure_2, mock.MagicMock() as mock_on_retry_2:
task2 = DummyOperator(task_id="test_handle_failure_on_retry",
on_failure_callback=mock_on_failure_2,
on_retry_callback=mock_on_retry_2,
retries=1,
dag=dag)
ti2 = TI(task=task2, execution_date=start_date)
ti2.state = State.FAILED
ti2.handle_failure("test retry handling")
mock_on_failure_2 = mock.MagicMock()
mock_on_retry_2 = mock.MagicMock()
task2 = DummyOperator(task_id="test_handle_failure_on_retry",
on_failure_callback=mock_on_failure_2,
on_retry_callback=mock_on_retry_2,
retries=1,
dag=dag)
ti2 = TI(task=task2, execution_date=start_date)
ti2.state = State.FAILED
ti2.handle_failure("test retry handling")
mock_on_failure_2.assert_not_called()
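The restructuring in this last hunk is more than cosmetic: with mock.MagicMock() as m binds m to MagicMock().__enter__(), a child mock rather than the mock itself, so assertions can silently inspect the wrong object; plain assignment avoids the trap. A quick demonstration (using unittest.mock here, while the test itself switches to the standalone mock backport):

from unittest import mock

cm = mock.MagicMock()
with cm as entered:
    pass

# The name bound by the with-statement is the return value of __enter__(),
# not the context manager itself.
print(entered is cm)                         # False
print(entered is cm.__enter__.return_value)  # True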