From 6bcbd487925c2cb3bd10f1a379a0d8d23133c420 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Wed, 6 Nov 2019 12:46:58 -0800 Subject: [PATCH] [AIRFLOW-5811] add metric for externally killed task count (#6466) --- airflow/jobs/scheduler_job.py | 1 + docs/metrics.rst | 1 + tests/jobs/test_scheduler_job.py | 5 ++++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 324895a832..a6b42bc2ff 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1265,6 +1265,7 @@ class SchedulerJob(BaseJob): msg = ("Executor reports task instance {} finished ({}) " "although the task says its {}. Was the task " "killed externally?".format(ti, state, ti.state)) + Stats.incr('scheduler.tasks.killed_externally') self.log.error(msg) try: simple_dag = simple_dag_bag.get_dag(dag_id) diff --git a/docs/metrics.rst b/docs/metrics.rst index 1e2f23d95d..afbd7c916b 100644 --- a/docs/metrics.rst +++ b/docs/metrics.rst @@ -64,6 +64,7 @@ Name Description ``zombies_killed`` Zombie tasks killed ``scheduler_heartbeat`` Scheduler heartbeats ``dag_processing.processes`` Number of currently running DAG parsing processes +``scheduler.tasks.killed_externally`` Number of tasks killed externally ======================================= ================================================================ Gauges diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 7e9efa2a4c..fa2789ebb5 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -140,7 +140,8 @@ class TestSchedulerJob(unittest.TestCase): old_children) self.assertFalse(current_children) - def test_process_executor_events(self): + @mock.patch('airflow.stats.Stats.incr') + def test_process_executor_events(self, mock_stats_incr): dag_id = "test_process_executor_events" dag_id2 = "test_process_executor_events_2" task_id_1 = 'dummy_task' @@ -185,6 +186,8 @@ class TestSchedulerJob(unittest.TestCase): ti1.refresh_from_db() self.assertEqual(ti1.state, State.SUCCESS) + mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally') + def test_execute_task_instances_is_paused_wont_execute(self): dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute' task_id_1 = 'dummy_task'