diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index ad1d580bd8..7afdfc03c8 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -121,6 +121,9 @@ def set_state( tis_altered += qry_sub_dag.with_for_update().all() for task_instance in tis_altered: task_instance.state = state + if state in State.finished(): + task_instance.end_date = timezone.utcnow() + task_instance.set_duration() else: tis_altered = qry_dag.all() if sub_dag_run_ids: diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py index 6db6ce7910..1a47b77e4b 100644 --- a/tests/api/common/experimental/test_mark_tasks.py +++ b/tests/api/common/experimental/test_mark_tasks.py @@ -106,6 +106,8 @@ class TestMarkTasks(unittest.TestCase): for ti in tis: # pylint: disable=too-many-nested-blocks if ti.task_id in task_ids and ti.execution_date in execution_dates: self.assertEqual(ti.state, state) + if state in State.finished(): + self.assertIsNotNone(ti.end_date) else: for old_ti in old_tis: if old_ti.task_id == ti.task_id and old_ti.execution_date == ti.execution_date: