From 1ec63123c4310f2343dcd7c349f90063c401c0d9 Mon Sep 17 00:00:00 2001
From: QP Hou
Date: Sun, 17 Jan 2021 19:18:36 -0800
Subject: [PATCH] Fix backfill crash on task retry or reschedule (#13712)

When a retry happens, the task key needs to be recorded with try number + 1
to avoid a KeyError exception.
---
 airflow/jobs/backfill_job.py      | 26 +++++++-------
 tests/jobs/test_backfill_job.py   | 57 +++++++++++++++++++++++++++++++
 tests/test_utils/mock_executor.py |  1 +
 3 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index d9b6da62e8..da64b218a8 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -204,32 +204,32 @@ class BackfillJob(BaseJob):
 
         for ti in refreshed_tis:
             # Here we remake the key by subtracting 1 to match in memory information
-            key = ti.key.reduced
+            reduced_key = ti.key.reduced
             if ti.state == State.SUCCESS:
-                ti_status.succeeded.add(key)
+                ti_status.succeeded.add(reduced_key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
-                ti_status.running.pop(key)
+                ti_status.running.pop(reduced_key)
                 continue
             if ti.state == State.SKIPPED:
-                ti_status.skipped.add(key)
+                ti_status.skipped.add(reduced_key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
-                ti_status.running.pop(key)
+                ti_status.running.pop(reduced_key)
                 continue
             if ti.state == State.FAILED:
                 self.log.error("Task instance %s failed", ti)
-                ti_status.failed.add(key)
-                ti_status.running.pop(key)
+                ti_status.failed.add(reduced_key)
+                ti_status.running.pop(reduced_key)
                 continue
             # special case: if the task needs to run again put it back
             if ti.state == State.UP_FOR_RETRY:
                 self.log.warning("Task instance %s is up for retry", ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
             # special case: if the task needs to be rescheduled put it back
             elif ti.state == State.UP_FOR_RESCHEDULE:
                 self.log.warning("Task instance %s is up for reschedule", ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
             # is changed externally, e.g. by clearing tasks from the ui. We need to cover
@@ -242,8 +242,8 @@ class BackfillJob(BaseJob):
                     ti,
                 )
                 tis_to_be_scheduled.append(ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
 
         # Batch schedule of task instances
         if tis_to_be_scheduled:
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index aa221f5237..9826f18299 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -33,6 +33,7 @@ from airflow.cli import cli_parser
 from airflow.exceptions import (
     AirflowException,
     AirflowTaskTimeout,
+    BackfillUnfinished,
     DagConcurrencyLimitReached,
     NoAvailablePoolSlot,
     TaskConcurrencyLimitReached,
@@ -40,6 +41,7 @@ from airflow.exceptions import (
 from airflow.jobs.backfill_job import BackfillJob
 from airflow.models import DAG, DagBag, Pool, TaskInstance as TI
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstanceKey
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -674,6 +676,61 @@ class TestBackfillJob(unittest.TestCase):
         with pytest.raises(AirflowException):
             job.run()
 
+    def test_backfill_retry_intermittent_failed_task(self):
+        dag = DAG(
+            dag_id='test_intermittent_failure_job',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily",
+            default_args={
+                'retries': 2,
+                'retry_delay': datetime.timedelta(seconds=0),
+            },
+        )
+        task1 = DummyOperator(task_id="task1", dag=dag)
+        dag.clear()
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+        )
+        job.run()
+
+    def test_backfill_retry_always_failed_task(self):
+        dag = DAG(
+            dag_id='test_always_failure_job',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily",
+            default_args={
+                'retries': 1,
+                'retry_delay': datetime.timedelta(seconds=0),
+            },
+        )
+        task1 = DummyOperator(task_id="task1", dag=dag)
+        dag.clear()
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_fail(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=2)
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+        )
+        with self.assertRaises(BackfillUnfinished):
+            job.run()
+
     def test_backfill_ordered_concurrent_execute(self):
         dag = DAG(
             dag_id='test_backfill_ordered_concurrent_execute',
diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py
index 746caf4dfc..104995e915 100644
--- a/tests/test_utils/mock_executor.py
+++ b/tests/test_utils/mock_executor.py
@@ -69,6 +69,7 @@ class MockExecutor(BaseExecutor):
         for index in range(min((open_slots, len(sorted_queue)))):
             (key, (_, _, _, ti)) = sorted_queue[index]
             self.queued_tasks.pop(key)
+            ti._try_number += 1
             state = self.mock_task_results[key]
             ti.set_state(state, session=session)
             self.change_state(key, state)
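A rough sketch of the bookkeeping this patch fixes, assuming simplified stand-ins: Key and reduced() below are illustrative substitutes for Airflow's TaskInstanceKey and its reduced property, and the running/to_run dicts stand in for ti_status.running and ti_status.to_run; the names and literal values are hypothetical, not Airflow's real objects.

from collections import namedtuple

# Illustrative stand-in for airflow.models.taskinstance.TaskInstanceKey.
Key = namedtuple("Key", ["dag_id", "task_id", "execution_date", "try_number"])


def reduced(key):
    # Roughly what the patch's comment describes for TaskInstanceKey.reduced:
    # remake the key by subtracting 1 so it matches the in-memory maps,
    # which were keyed before the attempt bumped the try number.
    return key._replace(try_number=max(1, key.try_number - 1))


# Simplified versions of ti_status.running / ti_status.to_run.
running = {}
to_run = {}

# The backfill queues attempt #1, so the in-memory map is keyed at try_number=1.
first_key = Key("example_dag", "task1", "2021-01-17", 1)
running[first_key] = "task instance"

# After the attempt ends UP_FOR_RETRY, the refreshed task instance's key
# already carries the next try number.
current_key = Key("example_dag", "task1", "2021-01-17", 2)

# Pop the finished attempt with the reduced key, which matches the stored entry...
running.pop(reduced(current_key))

# ...and re-queue under the current key, i.e. "try number + 1" as the commit
# message puts it. Re-queuing under the stale reduced key is what left later
# lookups, keyed by the new try number, to fail with KeyError.
to_run[current_key] = "task instance"

print(to_run)  # {Key(..., try_number=2): 'task instance'}

The one-line MockExecutor change appears to serve the same purpose in the tests: bumping ti._try_number before recording the mocked result advances the try number the way an actual task run would, so the new retry tests can reproduce the key mismatch that used to crash the backfill.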