From 0e31f186d38b776710080ba07be50eedf42c48a7 Mon Sep 17 00:00:00 2001 From: pulsar314 Date: Thu, 25 Jun 2020 22:42:03 +0300 Subject: [PATCH] Fixes treatment of open slots in scheduler (#9316) (#9505) Makes the scheduler account for the number of pool slots required by each task. If there are fewer open slots than a task requires, the task is not taken to the queue. --- airflow/jobs/scheduler_job.py | 16 +++- tests/jobs/test_scheduler_job.py | 145 +++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 3 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 60e7d98982..d6a616c71d 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1242,8 +1242,9 @@ class SchedulerJob(BaseJob): open_slots, pool ) # Can't schedule any more since there are no more open slots. - num_starving_tasks = len(priority_sorted_task_instances) - current_index - num_starving_tasks_total += num_starving_tasks + num_unhandled = len(priority_sorted_task_instances) - current_index + num_starving_tasks += num_unhandled + num_starving_tasks_total += num_unhandled break # Check to make sure that the task concurrency of the DAG hasn't been @@ -1286,8 +1287,17 @@ class SchedulerJob(BaseJob): num_tasks_in_executor += 1 continue + if task_instance.pool_slots > open_slots: + self.log.info("Not executing %s since it requires %s slots " + "but there are %s open slots in the pool %s.", + task_instance, task_instance.pool_slots, open_slots, pool) + num_starving_tasks += 1 + num_starving_tasks_total += 1 + # Though we can execute tasks with lower priority if there's enough room + continue + executable_tis.append(task_instance) - open_slots -= 1 + open_slots -= task_instance.pool_slots dag_concurrency_map[dag_id] += 1 task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1 diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 70b7c35ee7..52ab25ef98 100644 --- a/tests/jobs/test_scheduler_job.py +++ 
b/tests/jobs/test_scheduler_job.py @@ -2694,6 +2694,151 @@ class TestSchedulerJob(unittest.TestCase): self.assertEqual(len(scheduler.executor.queued_tasks), 1) + def test_scheduler_verify_pool_full_2_slots_per_task(self): + """ + Test task instances not queued when pool is full. + + Variation with non-default pool_slots + """ + dag = DAG( + dag_id='test_scheduler_verify_pool_full_2_slots_per_task', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_pool_full_2_slots_per_task', + pool_slots=2, + ) + + session = settings.Session() + pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6) + session.add(pool) + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + scheduler = SchedulerJob(executor=self.null_exec) + + # Create 5 dagruns, which will create 5 task instances. + for _ in range(5): + dag_file_processor.create_dag_run(dag) + dag_runs = DagRun.find(dag_id="test_scheduler_verify_pool_full_2_slots_per_task") + task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) + self.assertEqual(len(task_instances_list), 5) + dagbag = self._make_simple_dag_bag([dag]) + + # Recreated part of the scheduler here, to kick off tasks -> executor + for ti_key in task_instances_list: + task = dag.get_task(ti_key[1]) + ti = TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. 
+ session.merge(ti) + session.commit() + + self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") + scheduler._execute_task_instances(dagbag, session=session) + + # As tasks require 2 slots, only 3 can fit into 6 available + self.assertEqual(len(scheduler.executor.queued_tasks), 3) + + def test_scheduler_verify_priority_and_slots(self): + """ + Test task instances with higher priority are not queued + when pool does not have enough slots. + + Though tasks with lower priority might be executed. + """ + dag = DAG( + dag_id='test_scheduler_verify_priority_and_slots', + start_date=DEFAULT_DATE) + + # Medium priority, not enough slots + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t0', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=2, + priority_weight=2, + ) + # High priority, occupies first slot + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t1', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=1, + priority_weight=3, + ) + # Low priority, occupies second slot + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t2', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=1, + priority_weight=1, + ) + + session = settings.Session() + pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2) + session.add(pool) + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + scheduler = SchedulerJob(executor=self.null_exec) + + dag_file_processor.create_dag_run(dag) + dag_runs = DagRun.find(dag_id="test_scheduler_verify_priority_and_slots") + task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) + 
self.assertEqual(len(task_instances_list), 3) + dagbag = self._make_simple_dag_bag([dag]) + + # Recreated part of the scheduler here, to kick off tasks -> executor + for ti_key in task_instances_list: + task = dag.get_task(ti_key[1]) + ti = TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. + session.merge(ti) + session.commit() + + self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") + scheduler._execute_task_instances(dagbag, session=session) + + # Only second and third + self.assertEqual(len(scheduler.executor.queued_tasks), 2) + + ti0 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first() + self.assertEqual(ti0.state, State.SCHEDULED) + + ti1 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first() + self.assertEqual(ti1.state, State.QUEUED) + + ti2 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first() + self.assertEqual(ti2.state, State.QUEUED) + def test_scheduler_reschedule(self): """ Checks if tasks that are not taken up by the executor