[AIRFLOW-5811] add metric for externally killed task count (#6466)
This commit is contained in:
Родитель
8e789a33a3
Коммит
6bcbd48792
|
@ -1265,6 +1265,7 @@ class SchedulerJob(BaseJob):
|
|||
msg = ("Executor reports task instance {} finished ({}) "
|
||||
"although the task says its {}. Was the task "
|
||||
"killed externally?".format(ti, state, ti.state))
|
||||
Stats.incr('scheduler.tasks.killed_externally')
|
||||
self.log.error(msg)
|
||||
try:
|
||||
simple_dag = simple_dag_bag.get_dag(dag_id)
|
||||
|
|
|
@ -64,6 +64,7 @@ Name Description
|
|||
``zombies_killed`` Zombie tasks killed
|
||||
``scheduler_heartbeat`` Scheduler heartbeats
|
||||
``dag_processing.processes`` Number of currently running DAG parsing processes
|
||||
``scheduler.tasks.killed_externally`` Number of tasks killed externally
|
||||
======================================= ================================================================
|
||||
|
||||
Gauges
|
||||
|
|
|
@ -140,7 +140,8 @@ class TestSchedulerJob(unittest.TestCase):
|
|||
old_children)
|
||||
self.assertFalse(current_children)
|
||||
|
||||
def test_process_executor_events(self):
|
||||
@mock.patch('airflow.stats.Stats.incr')
|
||||
def test_process_executor_events(self, mock_stats_incr):
|
||||
dag_id = "test_process_executor_events"
|
||||
dag_id2 = "test_process_executor_events_2"
|
||||
task_id_1 = 'dummy_task'
|
||||
|
@ -185,6 +186,8 @@ class TestSchedulerJob(unittest.TestCase):
|
|||
ti1.refresh_from_db()
|
||||
self.assertEqual(ti1.state, State.SUCCESS)
|
||||
|
||||
mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
|
||||
|
||||
def test_execute_task_instances_is_paused_wont_execute(self):
|
||||
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
|
||||
task_id_1 = 'dummy_task'
|
||||
|
|
Загрузка…
Ссылка в новой задаче