Fixes treatment of open slots in scheduler (#9316) (#9505)

Makes the scheduler take into account the number of slots required by tasks.
If there are fewer open slots than a task requires, the task is not queued.
This commit is contained in:
pulsar314 2020-06-25 22:42:03 +03:00 коммит произвёл GitHub
Родитель 87fdbd0708
Коммит 0e31f186d3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 158 добавлений и 3 удалений

Просмотреть файл

@ -1242,8 +1242,9 @@ class SchedulerJob(BaseJob):
open_slots, pool
)
# Can't schedule any more since there are no more open slots.
num_starving_tasks = len(priority_sorted_task_instances) - current_index
num_starving_tasks_total += num_starving_tasks
num_unhandled = len(priority_sorted_task_instances) - current_index
num_starving_tasks += num_unhandled
num_starving_tasks_total += num_unhandled
break
# Check to make sure that the task concurrency of the DAG hasn't been
@ -1286,8 +1287,17 @@ class SchedulerJob(BaseJob):
num_tasks_in_executor += 1
continue
if task_instance.pool_slots > open_slots:
self.log.info("Not executing %s since it requires %s slots "
"but there are %s open slots in the pool %s.",
task_instance, task_instance.pool_slots, open_slots, pool)
num_starving_tasks += 1
num_starving_tasks_total += 1
# Though we can execute tasks with lower priority if there's enough room
continue
executable_tis.append(task_instance)
open_slots -= 1
open_slots -= task_instance.pool_slots
dag_concurrency_map[dag_id] += 1
task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1

Просмотреть файл

@ -2694,6 +2694,151 @@ class TestSchedulerJob(unittest.TestCase):
self.assertEqual(len(scheduler.executor.queued_tasks), 1)
def test_scheduler_verify_pool_full_2_slots_per_task(self):
    """
    Test task instances not queued when pool is full.

    Variation with non-default pool_slots
    """
    # The dag_id and the pool name deliberately share the same identifier.
    dag = DAG(
        dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
        start_date=DEFAULT_DATE)
    DummyOperator(
        task_id='dummy',
        dag=dag,
        owner='airflow',
        pool='test_scheduler_verify_pool_full_2_slots_per_task',
        pool_slots=2,
    )

    session = settings.Session()
    session.add(Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6))
    dag_model = DagModel(dag_id=dag.dag_id)
    dag_model.is_paused = False
    session.merge(dag_model)
    session.commit()

    dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

    processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
    scheduler = SchedulerJob(executor=self.null_exec)

    # Five dag runs yield five task instances of the single task.
    for _ in range(5):
        processor.create_dag_run(dag)
    runs = DagRun.find(dag_id="test_scheduler_verify_pool_full_2_slots_per_task")
    ti_keys = processor._process_task_instances(dag, dag_runs=runs)
    self.assertEqual(len(ti_keys), 5)
    dagbag = self._make_simple_dag_bag([dag])

    # Mimic the relevant part of the scheduler: mark every task instance
    # SCHEDULED (only those get handed to the executor) and persist it.
    for key in ti_keys:
        ti = TaskInstance(dag.get_task(key[1]), key[2])
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.commit()

    self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
    scheduler._execute_task_instances(dagbag, session=session)

    # As tasks require 2 slots, only 3 can fit into 6 available
    self.assertEqual(len(scheduler.executor.queued_tasks), 3)
def test_scheduler_verify_priority_and_slots(self):
    """
    Test task instances with higher priority are not queued
    when pool does not have enough slots.

    Though tasks with lower priority might be executed.
    """
    dag = DAG(
        dag_id='test_scheduler_verify_priority_and_slots',
        start_date=DEFAULT_DATE)

    # t0: medium priority but needs 2 slots, so it can never fit
    # once t1 has taken a slot from the 2-slot pool.
    DummyOperator(
        task_id='test_scheduler_verify_priority_and_slots_t0',
        dag=dag,
        owner='airflow',
        pool='test_scheduler_verify_priority_and_slots',
        pool_slots=2,
        priority_weight=2,
    )
    # t1: highest priority, takes the first slot.
    DummyOperator(
        task_id='test_scheduler_verify_priority_and_slots_t1',
        dag=dag,
        owner='airflow',
        pool='test_scheduler_verify_priority_and_slots',
        pool_slots=1,
        priority_weight=3,
    )
    # t2: lowest priority, still fits into the remaining slot.
    DummyOperator(
        task_id='test_scheduler_verify_priority_and_slots_t2',
        dag=dag,
        owner='airflow',
        pool='test_scheduler_verify_priority_and_slots',
        pool_slots=1,
        priority_weight=1,
    )

    session = settings.Session()
    session.add(Pool(pool='test_scheduler_verify_priority_and_slots', slots=2))
    dag_model = DagModel(dag_id=dag.dag_id)
    dag_model.is_paused = False
    session.merge(dag_model)
    session.commit()

    dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

    processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
    scheduler = SchedulerJob(executor=self.null_exec)

    processor.create_dag_run(dag)
    runs = DagRun.find(dag_id="test_scheduler_verify_priority_and_slots")
    ti_keys = processor._process_task_instances(dag, dag_runs=runs)
    self.assertEqual(len(ti_keys), 3)
    dagbag = self._make_simple_dag_bag([dag])

    # Mimic the relevant part of the scheduler: mark every task instance
    # SCHEDULED (only those get handed to the executor) and persist it.
    for key in ti_keys:
        ti = TaskInstance(dag.get_task(key[1]), key[2])
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.commit()

    self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
    scheduler._execute_task_instances(dagbag, session=session)

    # Only second and third
    self.assertEqual(len(scheduler.executor.queued_tasks), 2)

    def _state_of(task_id):
        # Fetch the persisted state of a single task instance by task_id.
        return session.query(TaskInstance) \
            .filter(TaskInstance.task_id == task_id).first().state

    # t0 was skipped (needs 2 slots, but only 1 remained after t1),
    # while t1 and t2 were queued.
    self.assertEqual(
        _state_of('test_scheduler_verify_priority_and_slots_t0'), State.SCHEDULED)
    self.assertEqual(
        _state_of('test_scheduler_verify_priority_and_slots_t1'), State.QUEUED)
    self.assertEqual(
        _state_of('test_scheduler_verify_priority_and_slots_t2'), State.QUEUED)
def test_scheduler_reschedule(self):
"""
Checks if tasks that are not taken up by the executor