Fix backfill crash on task retry or reschedule (#13712)

When a retry happens, the task key needs to be recorded with try number + 1
to avoid a KeyError exception.
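
To illustrate the keying problem behind the fix, here is a minimal, self-contained sketch. It uses a simplified stand-in for Airflow's TaskInstanceKey (the Key class, dict contents, and values below are illustrative only, not the real backfill structures): the backfill keeps its in-memory dicts keyed by the try number a task is about to run with, while the executor reports keys whose try number has already been incremented, so a retried task must be re-queued under the full key rather than the reduced one.

from typing import NamedTuple


class Key(NamedTuple):
    # Simplified stand-in for TaskInstanceKey; illustration only.
    dag_id: str
    task_id: str
    try_number: int

    @property
    def reduced(self):
        # Undo the try-number bump that happens when a task runs, so a key
        # reported after the run can be matched against the key recorded before it.
        return self._replace(try_number=max(self.try_number - 1, 1))


# The backfill recorded the task under the try number it was about to run with.
running = {Key("dag", "task1", try_number=1): "ti"}

# The executor ran try 1 and reported UP_FOR_RETRY; the task's key now says try 2.
reported = Key("dag", "task1", try_number=2)
running.pop(reported.reduced)  # matches the recorded try-1 key

to_run = {}
# Old behaviour: re-queue under reported.reduced (try 1). After the retry runs,
# the next reported key is try 3, whose .reduced is try 2, which is not in the
# dict, hence the KeyError this commit fixes.
# New behaviour: re-queue under the full key (try 2), preserving the invariant
# that in-memory entries are keyed by the next try to run.
to_run[reported] = "ti"
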
QP Hou 2021-01-17 19:18:36 -08:00 committed by GitHub
Parent 58b36b861c
Commit 1ec63123c4
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 13 deletions


@@ -204,32 +204,32 @@ class BackfillJob(BaseJob):
         for ti in refreshed_tis:
             # Here we remake the key by subtracting 1 to match in memory information
-            key = ti.key.reduced
+            reduced_key = ti.key.reduced
             if ti.state == State.SUCCESS:
-                ti_status.succeeded.add(key)
+                ti_status.succeeded.add(reduced_key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
-                ti_status.running.pop(key)
+                ti_status.running.pop(reduced_key)
                 continue
             if ti.state == State.SKIPPED:
-                ti_status.skipped.add(key)
+                ti_status.skipped.add(reduced_key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
-                ti_status.running.pop(key)
+                ti_status.running.pop(reduced_key)
                 continue
             if ti.state == State.FAILED:
                 self.log.error("Task instance %s failed", ti)
-                ti_status.failed.add(key)
-                ti_status.running.pop(key)
+                ti_status.failed.add(reduced_key)
+                ti_status.running.pop(reduced_key)
                 continue
             # special case: if the task needs to run again put it back
             if ti.state == State.UP_FOR_RETRY:
                 self.log.warning("Task instance %s is up for retry", ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
             # special case: if the task needs to be rescheduled put it back
             elif ti.state == State.UP_FOR_RESCHEDULE:
                 self.log.warning("Task instance %s is up for reschedule", ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
             # is changed externally, e.g. by clearing tasks from the ui. We need to cover
@@ -242,8 +242,8 @@ class BackfillJob(BaseJob):
                     ti,
                 )
                 tis_to_be_scheduled.append(ti)
-                ti_status.running.pop(key)
-                ti_status.to_run[key] = ti
+                ti_status.running.pop(reduced_key)
+                ti_status.to_run[ti.key] = ti
 
         # Batch schedule of task instances
         if tis_to_be_scheduled:
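
Prior to this change, the crash could be reproduced by backfilling a DAG whose task fails on its first try and then retries. The following reproduction sketch is hypothetical and not part of this commit; the DAG id, marker path, and callable are made up for illustration:

# Hypothetical reproduction DAG (not part of the commit): the task fails once,
# goes UP_FOR_RETRY, and succeeds on the retry, which exercises the retry path.
import datetime
import os

from airflow import DAG
from airflow.operators.python import PythonOperator

MARKER = "/tmp/backfill_retry_repro_marker"  # hypothetical marker file


def fail_first_try():
    if not os.path.exists(MARKER):
        open(MARKER, "w").close()
        raise RuntimeError("simulated intermittent failure")


with DAG(
    dag_id="backfill_retry_repro",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args={"retries": 2, "retry_delay": datetime.timedelta(seconds=0)},
) as dag:
    PythonOperator(task_id="flaky", python_callable=fail_first_try)

Running "airflow dags backfill -s 2021-01-01 -e 2021-01-01 backfill_retry_repro" against a build without this fix would be expected to hit the KeyError once the retried attempt reports back; with the fix the backfill completes normally.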


@@ -33,6 +33,7 @@ from airflow.cli import cli_parser
 from airflow.exceptions import (
     AirflowException,
     AirflowTaskTimeout,
+    BackfillUnfinished,
     DagConcurrencyLimitReached,
     NoAvailablePoolSlot,
     TaskConcurrencyLimitReached,
@@ -40,6 +41,7 @@ from airflow.exceptions import (
 from airflow.jobs.backfill_job import BackfillJob
 from airflow.models import DAG, DagBag, Pool, TaskInstance as TI
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstanceKey
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -674,6 +676,61 @@ class TestBackfillJob(unittest.TestCase):
         with pytest.raises(AirflowException):
             job.run()
 
+    def test_backfill_retry_intermittent_failed_task(self):
+        dag = DAG(
+            dag_id='test_intermittent_failure_job',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily",
+            default_args={
+                'retries': 2,
+                'retry_delay': datetime.timedelta(seconds=0),
+            },
+        )
+        task1 = DummyOperator(task_id="task1", dag=dag)
+        dag.clear()
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+        )
+        job.run()
+
+    def test_backfill_retry_always_failed_task(self):
+        dag = DAG(
+            dag_id='test_always_failure_job',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily",
+            default_args={
+                'retries': 1,
+                'retry_delay': datetime.timedelta(seconds=0),
+            },
+        )
+        task1 = DummyOperator(task_id="task1", dag=dag)
+        dag.clear()
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_fail(dag.dag_id, task1.task_id, DEFAULT_DATE, try_number=2)
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+        )
+        with self.assertRaises(BackfillUnfinished):
+            job.run()
+
     def test_backfill_ordered_concurrent_execute(self):
         dag = DAG(
             dag_id='test_backfill_ordered_concurrent_execute',


@@ -69,6 +69,7 @@ class MockExecutor(BaseExecutor):
         for index in range(min((open_slots, len(sorted_queue)))):
             (key, (_, _, _, ti)) = sorted_queue[index]
             self.queued_tasks.pop(key)
+            ti._try_number += 1
             state = self.mock_task_results[key]
             ti.set_state(state, session=session)
             self.change_state(key, state)