[AIRFLOW-1104] Update jobs.py so Airflow does not over schedule tasks (#3568)

This change will prevent tasks from getting scheduled and queued over
the concurrency limits set for the dag
This commit is contained in:
Dan Fowler 2018-07-31 13:25:04 -07:00 коммит произвёл Kaxil Naik
Родитель 3b35d360f6
Коммит ed972042a8
2 изменённых файлов: 34 добавлений и 4 удалений

Просмотреть файл

@ -1075,9 +1075,6 @@ class SchedulerJob(BaseJob):
:type states: Tuple[State]
:return: List[TaskInstance]
"""
# TODO(saguziel): Change this to include QUEUED, for concurrency
# purposes we may want to count queued tasks
states_to_count_as_running = [State.RUNNING]
executable_tis = []
# Get all the queued task instances from associated with scheduled
@ -1123,6 +1120,7 @@ class SchedulerJob(BaseJob):
for task_instance in task_instances_to_examine:
pool_to_task_instances[task_instance.pool].append(task_instance)
states_to_count_as_running = [State.RUNNING, State.QUEUED]
task_concurrency_map = self.__get_task_concurrency_map(
states=states_to_count_as_running, session=session)
@ -1173,7 +1171,6 @@ class SchedulerJob(BaseJob):
simple_dag = simple_dag_bag.get_dag(dag_id)
if dag_id not in dag_id_to_possibly_running_task_count:
# TODO(saguziel): also check against QUEUED state, see AIRFLOW-1104
dag_id_to_possibly_running_task_count[dag_id] = \
DAG.get_num_task_instances(
dag_id,

Просмотреть файл

@ -1493,6 +1493,39 @@ class SchedulerJobTest(unittest.TestCase):
self.assertEqual(0, len(res))
def test_find_executable_task_instances_concurrency_queued(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency_queued'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
task1 = DummyOperator(dag=dag, task_id='dummy1')
task2 = DummyOperator(dag=dag, task_id='dummy2')
task3 = DummyOperator(dag=dag, task_id='dummy3')
dagbag = self._make_simple_dag_bag([dag])
scheduler = SchedulerJob()
session = settings.Session()
dag_run = scheduler.create_dag_run(dag)
ti1 = TI(task1, dag_run.execution_date)
ti2 = TI(task2, dag_run.execution_date)
ti3 = TI(task3, dag_run.execution_date)
ti1.state = State.RUNNING
ti2.state = State.QUEUED
ti3.state = State.SCHEDULED
session.merge(ti1)
session.merge(ti2)
session.merge(ti3)
session.commit()
res = scheduler._find_executable_task_instances(
dagbag,
states=[State.SCHEDULED],
session=session)
self.assertEqual(1, len(res))
self.assertEqual(res[0].key, ti3.key)
def test_find_executable_task_instances_task_concurrency(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
task_id_1 = 'dummy'