[AIRFLOW-4194] Set dag_run state to failed when user terminate backfill (#5016)

This commit is contained in:
Chao-Han Tsai 2019-04-02 12:37:22 -07:00 коммит произвёл Tao Feng
Родитель a99179f981
Коммит 8ca3541f63
3 изменённых файлов: 70 добавлений и 1 удалений

Просмотреть файл

@ -163,6 +163,8 @@ def backfill(args, dag=None):
level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
dag = dag or get_dag(args)
if not args.start_date and not args.end_date:

Просмотреть файл

@ -2436,6 +2436,22 @@ class BackfillJob(BaseJob):
ti_status.executed_dag_run_dates.update(processed_dag_run_dates)
@provide_session
def _set_unfinished_dag_runs_to_failed(self, dag_runs, session=None):
"""
Go through the dag_runs and update the state based on the task_instance state.
Then set DAG runs that are not finished to failed.
:param dag_runs: DAG runs
:param session: session
:return: None
"""
for dag_run in dag_runs:
dag_run.update_state()
if dag_run.state not in State.finished():
dag_run.set_state(State.FAILED)
session.merge(dag_run)
@provide_session
def _execute(self, session=None):
"""
@ -2502,9 +2518,15 @@ class BackfillJob(BaseJob):
self.dag_id
)
time.sleep(self.delay_on_limit_secs)
except (KeyboardInterrupt, SystemExit):
self.log.warning("Backfill terminated by user.")
# TODO: we will need to terminate running task instances and set the
# state to failed.
self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
finally:
executor.end()
session.commit()
executor.end()
self.log.info("Backfill done. Exiting.")

Просмотреть файл

@ -149,6 +149,51 @@ class BackfillJobTest(unittest.TestCase):
self.parser = cli.CLIFactory.get_parser()
self.dagbag = DagBag(include_examples=True)
def test_unfinished_dag_runs_set_to_failed(self):
dag = self._get_dummy_dag('dummy_dag')
dag_run = dag.create_dagrun(
run_id='test',
state=State.RUNNING,
)
job = BackfillJob(
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=8),
ignore_first_depends_on_past=True
)
job._set_unfinished_dag_runs_to_failed([dag_run])
dag_run.refresh_from_db()
self.assertEquals(State.FAILED, dag_run.state)
def test_dag_run_with_finished_tasks_set_to_success(self):
dag = self._get_dummy_dag('dummy_dag')
dag_run = dag.create_dagrun(
run_id='test',
state=State.RUNNING,
)
for ti in dag_run.get_task_instances():
ti.set_state(State.SUCCESS)
job = BackfillJob(
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=8),
ignore_first_depends_on_past=True
)
job._set_unfinished_dag_runs_to_failed([dag_run])
dag_run.refresh_from_db()
self.assertEquals(State.SUCCESS, dag_run.state)
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
"concurrent access not supported in sqlite")
def test_trigger_controller_dag(self):