Stop scheduler from thinking that upstream_failed tasks are running (#11730)

This was messing up the "max_active_runs" calculation. The fix is a
"hack" until we take the better approach of adding a queued state to
DagRuns -- at which point we won't need to do this calculation at
all.
Ash Berlin-Taylor authored 2020-10-22 13:11:37 +01:00, committed by GitHub
Parent 0eaa688796
Commit 8045cc215d
5 changed files: 69 additions and 7 deletions
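
The mechanics in brief: before this change UPSTREAM_FAILED was not part of
State.finished (call sites had to union it in by hand), so any check that
treats "not finished" as "still active" counted upstream_failed tasks against
max_active_runs. A minimal sketch of that failure mode, using stand-in strings
rather than Airflow's real State constants:

    OLD_FINISHED = frozenset({"success", "failed", "skipped"})
    NEW_FINISHED = OLD_FINISHED | {"upstream_failed"}

    def looks_active(ti_state, finished):
        # The style of check that fed the max_active_runs calculation.
        return ti_state not in finished

    assert looks_active("upstream_failed", OLD_FINISHED)      # old: counted as running
    assert not looks_active("upstream_failed", NEW_FINISHED)  # new: terminal, ignored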


@@ -387,7 +387,7 @@ class DagRun(Base, LoggingMixin):
             ti.task = dag.get_task(ti.task_id)
 
         unfinished_tasks = [t for t in tis if t.state in State.unfinished]
-        finished_tasks = [t for t in tis if t.state in State.finished | {State.UPSTREAM_FAILED}]
+        finished_tasks = [t for t in tis if t.state in State.finished]
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
         none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
         if unfinished_tasks:


@@ -100,7 +100,7 @@ class DepContext:
         self.finished_tasks = dag.get_task_instances(
             start_date=execution_date,
             end_date=execution_date,
-            state=State.finished | {State.UPSTREAM_FAILED},
+            state=State.finished,
             session=session,
         )
     return self.finished_tasks
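
This removal and the DagRun one above are the same simplification, and both
are behaviour-preserving only in combination with the State change in the
next hunk: once UPSTREAM_FAILED is a member of State.finished, unioning it in
again adds nothing. Illustrated with stand-in strings:

    finished = frozenset({"success", "failed", "skipped", "upstream_failed"})
    assert finished | {"upstream_failed"} == finished  # explicit union is now a no-op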


@@ -107,11 +107,15 @@ class State:
         SUCCESS,
         FAILED,
         SKIPPED,
+        UPSTREAM_FAILED,
     ])
     """
-    A list of states indicating that a task started and completed a
-    run attempt. Note that the attempt could have resulted in failure or
-    have been interrupted; in any case, it is no longer running.
+    A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
+    further action.
+
+    Note that the attempt could have resulted in failure or have been
+    interrupted; or perhaps never run at all (skip, or upstream_failed) in any
+    case, it is no longer running.
     """
 
     unfinished = frozenset([
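
With UPSTREAM_FAILED folded in, the invariant the two earlier hunks rely on
can be sanity-checked directly against this revision (not part of the commit,
just a consequence of it):

    from airflow.utils.state import State

    assert State.UPSTREAM_FAILED in State.finished
    assert State.finished | {State.UPSTREAM_FAILED} == State.finished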


@@ -50,6 +50,7 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
 from airflow.utils.callback_requests import DagCallbackRequest, TaskCallbackRequest
+from airflow.utils.dag_processing import DagFileProcessorAgent
 from airflow.utils.dates import days_ago
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
@@ -3514,6 +3515,64 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert dag.get_last_dagrun().creating_job_id == scheduler.id
 
+    def test_do_schedule_max_active_runs_upstream_failed(self):
+        """
+        Test that tasks in upstream failed don't count as actively running.
+
+        This test can be removed when adding a queued state to DagRuns.
+        """
+        with DAG(
+            dag_id='test_max_active_run_plus_manual_trigger',
+            start_date=DEFAULT_DATE,
+            schedule_interval='@once',
+            max_active_runs=1,
+        ) as dag:
+            # Cant use DummyOperator as that goes straight to success
+            task1 = BashOperator(task_id='dummy1', bash_command='true')
+
+        session = settings.Session()
+        dagbag = DagBag(
+            dag_folder=os.devnull,
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        run1 = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            session=session,
+        )
+
+        ti = run1.get_task_instance(task1.task_id, session)
+        ti.state = State.UPSTREAM_FAILED
+
+        run2 = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+            session=session,
+        )
+
+        dag.sync_to_db(session=session)  # Update the date fields
+
+        job = SchedulerJob()
+        job.executor = MockExecutor(do_update=False)
+        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        num_queued = job._do_scheduling(session)
+        session.flush()
+        assert num_queued == 1
+
+        ti = run2.get_task_instance(task1.task_id, session)
+        assert ti.state == State.QUEUED
+
+        session.rollback()
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
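
The scenario the new test pins down: run1's only task instance is
UPSTREAM_FAILED, so run1 should count as finished rather than active, and
with max_active_runs=1 the scheduler is then free to queue run2's single
task -- hence num_queued == 1 and the QUEUED assertion. To run just this
test (file path assumed from the hunk header's class; adjust to your
checkout):

    pytest tests/jobs/test_scheduler_job.py -k test_do_schedule_max_active_runs_upstream_failed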


@@ -547,8 +547,7 @@ class TestTriggerRuleDep(unittest.TestCase):
         finished_tasks = DepContext().ensure_finished_tasks(ti_op2.task.dag, ti_op2.execution_date, session)
         self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op2),
                          (1, 0, 0, 0, 1))
-        finished_tasks = dr.get_task_instances(state=State.finished | {State.UPSTREAM_FAILED},
-                                               session=session)
+        finished_tasks = dr.get_task_instances(state=State.finished, session=session)
         self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op4),
                          (1, 0, 1, 0, 2))
         self.assertEqual(get_states_count_upstream_ti(finished_tasks=finished_tasks, ti=ti_op5),