[AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
This commit is contained in:
Родитель
f410d64de5
Коммит
3ad4f96bae
|
@ -583,7 +583,7 @@ class DagFileProcessor(LoggingMixin):
|
|||
now = timezone.utcnow()
|
||||
next_start = dag.following_schedule(now)
|
||||
last_start = dag.previous_schedule(now)
|
||||
if next_start <= now:
|
||||
if next_start <= now or isinstance(dag.schedule_interval, timedelta):
|
||||
new_start = last_start
|
||||
else:
|
||||
new_start = dag.previous_schedule(last_start)
|
||||
|
|
|
@ -28,6 +28,7 @@ import mock
|
|||
import psutil
|
||||
import pytest
|
||||
import six
|
||||
from freezegun import freeze_time
|
||||
from mock import MagicMock, patch
|
||||
from parameterized import parameterized
|
||||
|
||||
|
@ -96,6 +97,13 @@ class TestDagFileProcessor(unittest.TestCase):
|
|||
# enqueue!
|
||||
self.null_exec = MockExecutor()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
clear_db_runs()
|
||||
clear_db_pools()
|
||||
clear_db_dags()
|
||||
clear_db_sla_miss()
|
||||
clear_db_errors()
|
||||
|
||||
def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
|
||||
dag = DAG(
|
||||
dag_id='test_scheduler_reschedule',
|
||||
|
@ -410,6 +418,52 @@ class TestDagFileProcessor(unittest.TestCase):
|
|||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNone(dr)
|
||||
|
||||
@freeze_time(timezone.datetime(2020, 1, 5))
|
||||
def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_false(self):
|
||||
"""
|
||||
Test that the dag file processor does not create multiple dagruns
|
||||
if a dag is scheduled with 'timedelta' and catchup=False
|
||||
"""
|
||||
dag = DAG(
|
||||
'test_scheduler_dagrun_once_with_timedelta_and_catchup_false',
|
||||
start_date=timezone.datetime(2015, 1, 1),
|
||||
schedule_interval=timedelta(days=1),
|
||||
catchup=False)
|
||||
|
||||
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
|
||||
dag.clear()
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNotNone(dr)
|
||||
self.assertEqual(dr.execution_date, timezone.datetime(2020, 1, 4))
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNone(dr)
|
||||
|
||||
@freeze_time(timezone.datetime(2020, 5, 4))
|
||||
def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_true(self):
|
||||
"""
|
||||
Test that the dag file processor creates multiple dagruns
|
||||
if a dag is scheduled with 'timedelta' and catchup=True
|
||||
"""
|
||||
dag = DAG(
|
||||
'test_scheduler_dagrun_once_with_timedelta_and_catchup_true',
|
||||
start_date=timezone.datetime(2020, 5, 1),
|
||||
schedule_interval=timedelta(days=1),
|
||||
catchup=True)
|
||||
|
||||
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
|
||||
dag.clear()
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNotNone(dr)
|
||||
self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 1))
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNotNone(dr)
|
||||
self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 2))
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNotNone(dr)
|
||||
self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 3))
|
||||
dr = dag_file_processor.create_dag_run(dag)
|
||||
self.assertIsNone(dr)
|
||||
|
||||
@parameterized.expand([
|
||||
[State.NONE, None, None],
|
||||
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
|
||||
|
|
Загрузка…
Ссылка в новой задаче