Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)

closes https://github.com/apache/airflow/issues/13434

(cherry picked from commit de277c69e7)
This commit is contained in:
Kaxil Naik 2021-01-30 12:02:53 +00:00
Родитель 073d0b13c9
Коммит 958e25836f
2 изменённых файлов: 63 добавлений и 1 удалений

Просмотреть файл

@ -1846,7 +1846,6 @@ class DAG(LoggingMixin):
or_(
DagRun.run_type == DagRunType.BACKFILL_JOB,
DagRun.run_type == DagRunType.SCHEDULED,
DagRun.external_trigger.is_(True),
),
)
.group_by(DagRun.dag_id)

Просмотреть файл

@ -3595,6 +3595,69 @@ class TestSchedulerJob(unittest.TestCase):
"'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table"
) in log_output.output[0]
def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self):
"""
Test that externally triggered Dag Runs should not affect (by skipping) next
scheduled DAG runs
"""
dag = DAG(
dag_id='test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run',
start_date=DEFAULT_DATE,
schedule_interval="*/1 * * * *",
max_active_runs=5,
catchup=True,
)
DummyOperator(task_id='dummy', dag=dag, owner='airflow')
session = settings.Session()
dag.clear()
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
include_examples=False,
read_dags_from_db=True,
)
dagbag.bag_dag(dag=dag, root_dag=dag)
# Write to dag and serialized_dag table
dagbag.sync_to_db(session)
dag = dagbag.get_dag(dag.dag_id)
# Verify that dag_model.next_dagrun is equal to next execution_date
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE
job = SchedulerJob(subdir=os.devnull)
job.executor = MockExecutor(do_update=False)
job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
# Verify a DagRun is created with the correct execution_date
# when Scheduler._do_scheduling is run in the Scheduler Loop
job._do_scheduling(session)
dr1 = dag.get_dagrun(DEFAULT_DATE, session)
assert dr1 is not None
assert dr1.state == State.RUNNING
# Verify that dag_model.next_dagrun is set to next execution_date
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
# Trigger the Dag externally
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=timezone.utcnow(),
run_type=DagRunType.MANUAL,
session=session,
external_trigger=True,
)
assert dr is not None
# Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file
DAG.bulk_write_to_db([dag], session)
# Test that 'dag_model.next_dagrun' has not been changed because of newly created external
# triggered DagRun.
dag_model = session.query(DagModel).get(dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
def test_do_schedule_max_active_runs_upstream_failed(self):
"""
Test that tasks in upstream failed don't count as actively running.