Spend less time waiting for DagFileProcessor processes to complete (#8814)
In debugging another test I noticed that the scheduler was spending a long time waiting for a "simple" dag to be parsed. But upon closer inspection, the parsing process itself was done in a few milliseconds — we just weren't harvesting the results in a timely fashion. This change uses the `sentinel` attribute of multiprocessing.Connection (added in Python 3.3) to wait on all the processes, so that as soon as one has finished we are woken up and can immediately harvest and pass on the parsed dags. This makes test_scheduler_job.py about twice as quick, and also reduces the time the scheduler spends between tasks. In real workloads, or where there are lots of dags, this likely won't equate to such a huge speed-up, but it does for our (synthetic) elastic performance test dag. These were the timings for the dag to run all the tasks in a single dag run to completion, with PERF_SCHEDULE_INTERVAL='1d' PERF_DAGS_COUNT=1: PERF_SHAPE=linear PERF_TASKS_COUNT=12: **Before**: 45.4166s **After**: 16.9499s PERF_SHAPE=linear PERF_TASKS_COUNT=24: **Before**: 82.6426s **After**: 34.0672s PERF_SHAPE=binary_tree PERF_TASKS_COUNT=24: **Before**: 20.3802s **After**: 9.1400s PERF_SHAPE=grid PERF_TASKS_COUNT=24: **Before**: 27.4735s **After**: 11.5607s If you have many more dag **files**, this likely won't be your bottleneck.
This commit is contained in:
Parent
92585ca4cb
Commit
82de6f74ae
|
@ -309,6 +309,10 @@ class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, Mul
|
|||
raise AirflowException("Tried to get start time before it started!")
|
||||
return self._start_time
|
||||
|
||||
@property
|
||||
def waitable_handle(self):
|
||||
return self._process.sentinel
|
||||
|
||||
|
||||
class DagFileProcessor(LoggingMixin):
|
||||
"""
|
||||
|
|
|
@ -252,6 +252,14 @@ class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
|
|||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def waitable_handle(self):
|
||||
"""
|
||||
A "waitable" handle that can be passed to ``multiprocessing.connection.wait()``
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class DagParsingStat(NamedTuple):
|
||||
"""Information on processing progress"""
|
||||
|
@ -652,6 +660,8 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
|
||||
self._log = logging.getLogger('airflow.processor_manager')
|
||||
|
||||
self.waitables = {self._signal_conn: self._signal_conn}
|
||||
|
||||
def register_exit_signals(self):
|
||||
"""
|
||||
Register signals that stop child processes
|
||||
|
@ -698,13 +708,20 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
else:
|
||||
poll_time = None
|
||||
|
||||
# Used to track how long it takes us to get once around every file in the DAG folder.
|
||||
self._parsing_start_time = timezone.utcnow()
|
||||
self._refresh_dag_dir()
|
||||
self.prepare_file_path_queue()
|
||||
|
||||
if self._async_mode:
|
||||
# If we're in async mode, we can start up straight away. If we're
|
||||
# in sync mode we need to be told to start a "loop"
|
||||
self.start_new_processes()
|
||||
|
||||
while True:
|
||||
loop_start_time = time.time()
|
||||
|
||||
# pylint: disable=no-else-break
|
||||
if self._signal_conn.poll(poll_time):
|
||||
ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
|
||||
if self._signal_conn in ready:
|
||||
agent_signal = self._signal_conn.recv()
|
||||
self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
|
||||
if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
|
||||
|
@ -719,13 +736,36 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
elif isinstance(agent_signal, FailureCallbackRequest):
|
||||
self._add_callback_to_queue(agent_signal)
|
||||
else:
|
||||
raise AirflowException("Invalid message")
|
||||
elif not self._async_mode:
|
||||
raise ValueError(f"Invalid message {type(agent_signal)}")
|
||||
|
||||
if not ready and not self._async_mode:
|
||||
# In "sync" mode we don't want to parse the DAGs until we
|
||||
# are told to (as that would open another connection to the
|
||||
# SQLite DB which isn't a good practice
|
||||
|
||||
# This shouldn't happen, as in sync mode poll should block for
|
||||
# ever. Lets be defensive about that.
|
||||
self.log.warning(
|
||||
"wait() unexpectedly returned nothing ready after infinite timeout (%r)!",
|
||||
poll_time
|
||||
)
|
||||
|
||||
continue
|
||||
# pylint: enable=no-else-break
|
||||
|
||||
for sentinel in ready:
|
||||
if sentinel is self._signal_conn:
|
||||
continue
|
||||
|
||||
processor = self.waitables.get(sentinel)
|
||||
if not processor:
|
||||
continue
|
||||
|
||||
simple_dags = self._collect_results_from_processor(processor)
|
||||
self.waitables.pop(sentinel)
|
||||
self._processors.pop(processor.file_path)
|
||||
for simple_dag in simple_dags:
|
||||
self._signal_conn.send(simple_dag)
|
||||
|
||||
self._refresh_dag_dir()
|
||||
self._find_zombies() # pylint: disable=no-value-for-parameter
|
||||
|
||||
|
@ -742,10 +782,6 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
# Update number of loop iteration.
|
||||
self._num_run += 1
|
||||
|
||||
simple_dags = self.collect_results()
|
||||
for simple_dag in simple_dags:
|
||||
self._signal_conn.send(simple_dag)
|
||||
|
||||
if not self._async_mode:
|
||||
self.log.debug(
|
||||
"Waiting for processors to finish since we're using sqlite")
|
||||
|
@ -755,10 +791,10 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
# this type of message
|
||||
self.wait_until_finished()
|
||||
|
||||
# Collect anything else that has finished, but don't kick off any more processors
|
||||
simple_dags = self.collect_results()
|
||||
for simple_dag in simple_dags:
|
||||
self._signal_conn.send(simple_dag)
|
||||
# Collect anything else that has finished, but don't kick off any more processors
|
||||
simple_dags = self.collect_results()
|
||||
for simple_dag in simple_dags:
|
||||
self._signal_conn.send(simple_dag)
|
||||
|
||||
self._print_stat()
|
||||
|
||||
|
@ -1032,6 +1068,31 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
while not processor.done:
|
||||
time.sleep(0.1)
|
||||
|
||||
def _collect_results_from_processor(self, processor):
|
||||
self.log.debug("Processor for %s finished", processor.file_path)
|
||||
Stats.decr('dag_processing.processes')
|
||||
last_finish_time = timezone.utcnow()
|
||||
|
||||
if processor.result is not None:
|
||||
dags, count_import_errors = processor.result
|
||||
else:
|
||||
self.log.error(
|
||||
"Processor for %s exited with return code %s.",
|
||||
processor.file_path, processor.exit_code
|
||||
)
|
||||
dags, count_import_errors = [], -1
|
||||
|
||||
stat = DagFileStat(
|
||||
num_dags=len(dags),
|
||||
import_errors=count_import_errors,
|
||||
last_finish_time=last_finish_time,
|
||||
last_duration=(last_finish_time - processor.start_time).total_seconds(),
|
||||
run_count=self.get_run_count(processor.file_path) + 1,
|
||||
)
|
||||
self._file_stats[processor.file_path] = stat
|
||||
|
||||
return dags
|
||||
|
||||
def collect_results(self):
|
||||
"""
|
||||
Collect the result from any finished DAG processors
|
||||
|
@ -1040,32 +1101,16 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
have finished since the last time this was called
|
||||
:rtype: list[airflow.utils.dag_processing.SimpleDag]
|
||||
"""
|
||||
finished_processors: Dict[str, AbstractDagFileProcessorProcess] = {}
|
||||
running_processors: Dict[str, AbstractDagFileProcessorProcess] = {}
|
||||
# Collect all the DAGs that were found in the processed files
|
||||
simple_dags = []
|
||||
|
||||
for file_path, processor in self._processors.items():
|
||||
if processor.done:
|
||||
self.log.debug("Processor for %s finished", file_path)
|
||||
Stats.decr('dag_processing.processes')
|
||||
last_finish_time = timezone.utcnow()
|
||||
finished_processors[file_path] = processor
|
||||
ready = multiprocessing.connection.wait(self.waitables.keys() - [self._signal_conn], timeout=0)
|
||||
|
||||
if processor.result is not None:
|
||||
dags, count_import_errors = processor.result
|
||||
else:
|
||||
dags, count_import_errors = [], -1
|
||||
|
||||
stat = DagFileStat(
|
||||
num_dags=len(dags),
|
||||
import_errors=count_import_errors,
|
||||
last_finish_time=last_finish_time,
|
||||
last_duration=(last_finish_time - processor.start_time).total_seconds(),
|
||||
run_count=self.get_run_count(file_path) + 1,
|
||||
)
|
||||
self._file_stats[file_path] = stat
|
||||
else:
|
||||
running_processors[file_path] = processor
|
||||
self._processors = running_processors
|
||||
for sentinel in ready:
|
||||
processor = self.waitables[sentinel]
|
||||
self.waitables.pop(processor.waitable_handle)
|
||||
self._processors.pop(processor.file_path)
|
||||
simple_dags += self._collect_results_from_processor(processor)
|
||||
|
||||
self.log.debug("%s/%s DAG parsing processes running",
|
||||
len(self._processors), self._parallelism)
|
||||
|
@ -1073,18 +1118,6 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
self.log.debug("%s file paths queued for processing",
|
||||
len(self._file_path_queue))
|
||||
|
||||
# Collect all the DAGs that were found in the processed files
|
||||
simple_dags = []
|
||||
for file_path, processor in finished_processors.items():
|
||||
if processor.result is None:
|
||||
self.log.error(
|
||||
"Processor for %s exited with return code %s.",
|
||||
processor.file_path, processor.exit_code
|
||||
)
|
||||
else:
|
||||
for simple_dag in processor.result[0]:
|
||||
simple_dags.append(simple_dag)
|
||||
|
||||
return simple_dags
|
||||
|
||||
def start_new_processes(self):
|
||||
|
@ -1109,6 +1142,7 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
|
|||
processor.pid, file_path
|
||||
)
|
||||
self._processors[file_path] = processor
|
||||
self.waitables[processor.waitable_handle] = processor
|
||||
|
||||
def prepare_file_path_queue(self):
|
||||
"""
|
||||
|
|
|
@ -2814,8 +2814,12 @@ class TestSchedulerJob(unittest.TestCase):
|
|||
num_runs=1)
|
||||
scheduler.run()
|
||||
with create_session() as session:
|
||||
self.assertEqual(
|
||||
len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 1)
|
||||
tis = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()
|
||||
# Since this dag has no end date, and there's a chance that we'll
|
||||
# start a and finish two dag parsing processes twice in one loop!
|
||||
self.assertGreaterEqual(
|
||||
len(tis), 1,
|
||||
repr(tis))
|
||||
|
||||
def test_dag_get_active_runs(self):
|
||||
"""
|
||||
|
|
|
@ -21,6 +21,7 @@ import os
|
|||
import sys
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock, PropertyMock
|
||||
|
||||
|
@ -52,6 +53,11 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
|
|||
# as its processing result w/o actually parsing anything.
|
||||
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
|
||||
super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
|
||||
# We need a "real" selectable handle for waitable_handle to work
|
||||
readable, writable = multiprocessing.Pipe(duplex=False)
|
||||
writable.send('abc')
|
||||
writable.close()
|
||||
self._waitable_handle = readable
|
||||
self._result = zombies, 0
|
||||
|
||||
def start(self):
|
||||
|
@ -82,6 +88,10 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
|
|||
zombies
|
||||
)
|
||||
|
||||
@property
|
||||
def waitable_handle(self):
|
||||
return self._waitable_handle
|
||||
|
||||
|
||||
class TestDagFileProcessorManager(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -102,6 +112,28 @@ class TestDagFileProcessorManager(unittest.TestCase):
|
|||
results.append(obj)
|
||||
elif obj.done:
|
||||
return results
|
||||
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")
|
||||
|
||||
@conf_vars({('core', 'load_examples'): 'False'})
|
||||
def test_max_runs_when_no_files(self):
|
||||
|
||||
child_pipe, parent_pipe = multiprocessing.Pipe()
|
||||
|
||||
with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
|
||||
async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
|
||||
manager = DagFileProcessorManager(
|
||||
dag_directory=dags_folder,
|
||||
max_runs=1,
|
||||
processor_factory=FakeDagFileProcessorRunner._fake_dag_processor_factory,
|
||||
processor_timeout=timedelta.max,
|
||||
signal_conn=child_pipe,
|
||||
dag_ids=[],
|
||||
pickle_dags=False,
|
||||
async_mode=async_mode)
|
||||
|
||||
self.run_processor_manager_one_loop(manager, parent_pipe)
|
||||
child_pipe.close()
|
||||
parent_pipe.close()
|
||||
|
||||
def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
|
||||
manager = DagFileProcessorManager(
|
||||
|
|
Loading…
Link in new issue