Set start_date, end_date & duration for tasks failing without DagRun (#11358)

This commit is contained in:
Kaxil Naik 2020-10-09 15:21:39 +01:00 коммит произвёл GitHub
Родитель fe0bf6e1f0
Коммит ff1a2aaff8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 27 добавлений и 3 удалений

Просмотреть файл

@ -1162,6 +1162,19 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
tis_changed += 1
else:
subq = query.subquery()
current_time = timezone.utcnow()
ti_prop_update = {
models.TaskInstance.state: new_state,
models.TaskInstance.start_date: current_time,
}
# Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
if new_state in State.finished():
ti_prop_update.update({
models.TaskInstance.end_date: current_time,
models.TaskInstance.duration: 0,
})
tis_changed = session \
.query(models.TaskInstance) \
.filter(
@ -1169,7 +1182,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date) \
.update({models.TaskInstance.state: new_state}, synchronize_session=False)
.update(ti_prop_update, synchronize_session=False)
session.commit()
if tis_changed > 0:

Просмотреть файл

@ -611,10 +611,13 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
:param session: SQLAlchemy ORM Session
:type session: Session
"""
current_time = timezone.utcnow()
self.log.debug("Setting task state for %s to %s", self, state)
self.state = state
self.start_date = timezone.utcnow()
self.end_date = timezone.utcnow()
self.start_date = current_time
if self.state in State.finished():
self.end_date = current_time
self.duration = 0
session.merge(self)
@property

Просмотреть файл

@ -2235,6 +2235,9 @@ class TestSchedulerJob(unittest.TestCase):
ti3.refresh_from_db(session=session)
self.assertEqual(ti3.state, State.NONE)
self.assertIsNotNone(ti3.start_date)
self.assertIsNone(ti3.end_date)
self.assertIsNone(ti3.duration)
dr1.refresh_from_db(session=session)
dr1.state = State.FAILED
@ -2396,6 +2399,11 @@ class TestSchedulerJob(unittest.TestCase):
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
self.assertEqual(ti.state, expected_task_state)
self.assertIsNotNone(ti.start_date)
if expected_task_state in State.finished():
self.assertIsNotNone(ti.end_date)
self.assertEqual(ti.start_date, ti.end_date)
self.assertIsNotNone(ti.duration)
@provide_session
def evaluate_dagrun(