[AIRFLOW-4107] instrument executor (#4928)
This commit is contained in:
Родитель
f7f69a2739
Коммит
dce92a5419
|
@ -23,6 +23,7 @@ from collections import OrderedDict
|
|||
# To avoid circular imports
|
||||
import airflow.utils.dag_processing
|
||||
from airflow import configuration
|
||||
from airflow.settings import Stats
|
||||
from airflow.utils.log.logging_mixin import LoggingMixin
|
||||
from airflow.utils.state import State
|
||||
|
||||
|
@ -117,10 +118,17 @@ class BaseExecutor(LoggingMixin):
|
|||
else:
|
||||
open_slots = self.parallelism - len(self.running)
|
||||
|
||||
self.log.debug("%s running task instances", len(self.running))
|
||||
self.log.debug("%s in queue", len(self.queued_tasks))
|
||||
num_running_tasks = len(self.running)
|
||||
num_queued_tasks = len(self.queued_tasks)
|
||||
|
||||
self.log.debug("%s running task instances", num_running_tasks)
|
||||
self.log.debug("%s in queue", num_queued_tasks)
|
||||
self.log.debug("%s open slots", open_slots)
|
||||
|
||||
Stats.gauge('executor.open_slots', open_slots)
|
||||
Stats.gauge('executor.queued_tasks', num_queued_tasks)
|
||||
Stats.gauge('executor.running_tasks', num_running_tasks)
|
||||
|
||||
sorted_queue = sorted(
|
||||
[(k, v) for k, v in self.queued_tasks.items()],
|
||||
key=lambda x: x[1][1],
|
||||
|
|
|
@ -59,6 +59,9 @@ dagbag_import_errors DAG import errors
|
|||
dagbag_size DAG bag size
|
||||
dag_processing.last_runtime.<dag_file> Seconds spent processing <dag_file> (in most recent iteration)
|
||||
dag_processing.last_run.seconds_ago.<dag_file> Seconds since <dag_file> was last processed
|
||||
executor.open_slots Number of of open slots on executor
|
||||
executor.queued_tasks Number of queued tasks on executor
|
||||
executor.running_tasks Number of running tasks on executor
|
||||
=============================================== ========================================================================
|
||||
|
||||
Timers
|
||||
|
|
Загрузка…
Ссылка в новой задаче