diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 6ce6cb498f..1cfb34fc25 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -62,18 +62,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): :type pickle_dags: bool :param dag_id_white_list: If specified, only look at these DAG ID's :type dag_id_white_list: list[unicode] + :param zombies: zombie task instances to kill + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] """ # Counter that increments every time an instance of this class is created class_creation_counter = 0 - def __init__(self, file_path, pickle_dags, dag_id_white_list): + def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies): self._file_path = file_path # The process that was launched to process the given . self._process = None self._dag_id_white_list = dag_id_white_list self._pickle_dags = pickle_dags + self._zombies = zombies # The result of Scheduler.process_file(file_path). self._result = None # Whether the process is done running. @@ -94,7 +97,8 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): file_path, pickle_dags, dag_id_white_list, - thread_name): + thread_name, + zombies): """ Process the given file. @@ -112,6 +116,8 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): :type thread_name: unicode :return: the process that was launched :rtype: multiprocessing.Process + :param zombies: zombie task instances to kill + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] """ # This helper runs in the newly created process log = logging.getLogger("airflow.processor") @@ -139,7 +145,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path) scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log) - result = scheduler_job.process_file(file_path, pickle_dags) + result = scheduler_job.process_file(file_path, + zombies, + pickle_dags) result_channel.send(result) end_time = time.time() log.info( @@ -170,6 +178,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): self._pickle_dags, self._dag_id_white_list, "DagFileProcessor{}".format(self._instance_id), + self._zombies ), name="DagFileProcessor{}-Process".format(self._instance_id) ) @@ -1285,10 +1294,11 @@ class SchedulerJob(BaseJob): known_file_paths = list_py_file_paths(self.subdir) self.log.info("There are %s files in %s", len(known_file_paths), self.subdir) - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, pickle_dags, - self.dag_ids) + self.dag_ids, + zombies) # When using sqlite, we do not use async_mode # so the scheduler job and DAG parser don't access the DB at the same time. @@ -1465,7 +1475,7 @@ class SchedulerJob(BaseJob): return dags @provide_session - def process_file(self, file_path, pickle_dags=False, session=None): + def process_file(self, file_path, zombies, pickle_dags=False, session=None): """ Process a Python file containing Airflow DAGs. @@ -1484,6 +1494,8 @@ class SchedulerJob(BaseJob): :param file_path: the path to the Python file that should be executed :type file_path: unicode + :param zombies: zombie task instances to kill. + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] :param pickle_dags: whether serialize the DAGs found in the file and save them to the db :type pickle_dags: bool @@ -1567,7 +1579,7 @@ class SchedulerJob(BaseJob): except Exception: self.log.exception("Error logging import errors!") try: - dagbag.kill_zombies() + dagbag.kill_zombies(zombies) except Exception: self.log.exception("Error killing zombies!") diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index b09f47f095..64195fbb9b 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -25,7 +25,7 @@ import sys import textwrap import zipfile from collections import namedtuple -from datetime import datetime, timedelta +from datetime import datetime from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter from sqlalchemy import or_ @@ -41,7 +41,6 @@ from airflow.utils.dag_processing import correct_maybe_zipped, list_py_file_path from airflow.utils.db import provide_session from airflow.utils.helpers import pprinttable from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -273,43 +272,34 @@ class DagBag(BaseDagBag, LoggingMixin): return found_dags @provide_session - def kill_zombies(self, session=None): + def kill_zombies(self, zombies, session=None): """ - Fail zombie tasks, which are tasks that haven't + Fail given zombie tasks, which are tasks that haven't had a heartbeat for too long, in the current DagBag. + :param zombies: zombie task instances to kill. + :type zombies: airflow.utils.dag_processing.SimpleTaskInstance :param session: DB session. :type session: sqlalchemy.orm.session.Session """ - # Avoid circular import - from airflow.models.taskinstance import TaskInstance as TI - from airflow.jobs import LocalTaskJob as LJ + from airflow.models.taskinstance import TaskInstance # Avoid circular import - # How many seconds do we wait for tasks to heartbeat before mark them as zombies. - limit_dttm = timezone.utcnow() - timedelta(seconds=self.SCHEDULER_ZOMBIE_TASK_THRESHOLD) - self.log.debug("Failing jobs without heartbeat after %s", limit_dttm) - - tis = ( - session.query(TI) - .join(LJ, TI.job_id == LJ.id) - .filter(TI.state == State.RUNNING) - .filter(TI.dag_id.in_(self.dags)) - .filter( - or_( - LJ.state != State.RUNNING, - LJ.latest_heartbeat < limit_dttm, - ) - ).all() - ) - for ti in tis: - self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s", - ti.dag_id, ti.task_id, ti.execution_date.isoformat()) - ti.test_mode = self.UNIT_TEST_MODE - ti.task = self.dags[ti.dag_id].get_task(ti.task_id) - ti.handle_failure("{} detected as zombie".format(ti), - ti.test_mode, ti.get_template_context()) - self.log.info('Marked zombie job %s as %s', ti, ti.state) - Stats.incr('zombies_killed') + for zombie in zombies: + if zombie.dag_id in self.dags: + dag = self.dags[zombie.dag_id] + if zombie.task_id in dag.task_ids: + task = dag.get_task(zombie.task_id) + ti = TaskInstance(task, zombie.execution_date) + # Get properties needed for failure handling from SimpleTaskInstance. + ti.start_date = zombie.start_date + ti.end_date = zombie.end_date + ti.try_number = zombie.try_number + ti.state = zombie.state + ti.test_mode = self.UNIT_TEST_MODE + ti.handle_failure("{} detected as zombie".format(ti), + ti.test_mode, ti.get_template_context()) + self.log.info('Marked zombie job %s as %s', ti, ti.state) + Stats.incr('zombies_killed') session.commit() def bag_dag(self, dag, parent_dag, root_dag): diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 733d8bdfd4..40fa45ba15 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -28,7 +28,9 @@ import sys import time import zipfile from abc import ABCMeta, abstractmethod -from datetime import datetime +from collections import defaultdict +from collections import namedtuple +from datetime import datetime, timedelta from importlib import import_module from typing import Iterable, NamedTuple, Optional @@ -47,6 +49,8 @@ from airflow.utils import timezone from airflow.utils.db import provide_session from airflow.utils.helpers import reap_process_group from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State +from sqlalchemy import or_ class SimpleDag(BaseDag): @@ -754,6 +758,9 @@ class DagFileProcessorManager(LoggingMixin): # 30 seconds. self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval') + # How many seconds do we wait for tasks to heartbeat before mark them as zombies. + self._zombie_threshold_secs = ( + conf.getint('scheduler', 'scheduler_zombie_task_threshold')) # Map from file path to the processor self._processors = {} @@ -1233,11 +1240,13 @@ class DagFileProcessorManager(LoggingMixin): self._file_path_queue.extend(files_paths_to_queue) + zombies = self._find_zombies() + # Start more processors if we have enough slots and files to process while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0): file_path = self._file_path_queue.pop(0) - processor = self._processor_factory(file_path) + processor = self._processor_factory(file_path, zombies) Stats.incr('dag_processing.processes') processor.start() @@ -1252,6 +1261,45 @@ class DagFileProcessorManager(LoggingMixin): return simple_dags + @provide_session + def _find_zombies(self, session): + """ + Find zombie task instances, which are tasks haven't heartbeated for too long. + :return: Zombie task instances in SimpleTaskInstance format. + """ + now = timezone.utcnow() + zombies = [] + if (now - self._last_zombie_query_time).total_seconds() \ + > self._zombie_query_interval: + # to avoid circular imports + from airflow.jobs import LocalTaskJob as LJ + self.log.info("Finding 'running' jobs without a recent heartbeat") + TI = airflow.models.TaskInstance + limit_dttm = timezone.utcnow() - timedelta( + seconds=self._zombie_threshold_secs) + self.log.info("Failing jobs without heartbeat after %s", limit_dttm) + + tis = ( + session.query(TI) + .join(LJ, TI.job_id == LJ.id) + .filter(TI.state == State.RUNNING) + .filter( + or_( + LJ.state != State.RUNNING, + LJ.latest_heartbeat < limit_dttm, + ) + ).all() + ) + self._last_zombie_query_time = timezone.utcnow() + for ti in tis: + sti = SimpleTaskInstance(ti) + self.log.info( + "Detected zombie job with dag_id %s, task_id %s, and execution date %s", + sti.dag_id, sti.task_id, sti.execution_date.isoformat()) + zombies.append(sti) + + return zombies + def _kill_timed_out_processors(self): """ Kill any file processors that timeout to defend against process hangs. diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index e3ab787449..99ba947319 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. +from datetime import datetime, timezone import inspect import os import shutil @@ -29,6 +30,7 @@ from unittest.mock import ANY, patch import airflow.example_dags from airflow import models from airflow.configuration import conf +from airflow.utils.dag_processing import SimpleTaskInstance from airflow.jobs import LocalTaskJob as LJ from airflow.models import DagBag, DagModel, TaskInstance as TI from airflow.utils.db import create_session @@ -605,92 +607,29 @@ class TestDagBag(unittest.TestCase): self.assertEqual([], dagbag.process_file(None)) @patch.object(TI, 'handle_failure') - def test_kill_zombies_when_job_state_is_not_running(self, mock_ti_handle_failure): + def test_kill_zombies(self, mock_ti_handle_failure): """ - Test that kill zombies calls TI's failure handler with proper context + Test that kill zombies call TIs failure handler with proper context """ dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) with create_session() as session: session.query(TI).delete() - session.query(LJ).delete() dag = dagbag.get_dag('example_branch_operator') task = dag.get_task(task_id='run_this_first') ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.state = State.SHUTDOWN - lj.id = 1 - ti.job_id = lj.id - session.add(lj) session.add(ti) session.commit() - dagbag.kill_zombies() + zombies = [SimpleTaskInstance(ti)] + dagbag.kill_zombies(zombies) mock_ti_handle_failure.assert_called_once_with( ANY, conf.getboolean('core', 'unit_test_mode'), ANY ) - @patch.object(TI, 'handle_failure') - def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure): - """ - Test that kill zombies calls TI's failure handler with proper context - """ - zombie_threshold_secs = ( - conf.getint('scheduler', 'scheduler_zombie_task_threshold')) - dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) - with create_session() as session: - session.query(TI).delete() - session.query(LJ).delete() - dag = dagbag.get_dag('example_branch_operator') - task = dag.get_task(task_id='run_this_first') - - ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.latest_heartbeat = utcnow() - timedelta(seconds=zombie_threshold_secs) - lj.state = State.RUNNING - lj.id = 1 - ti.job_id = lj.id - - session.add(lj) - session.add(ti) - session.commit() - - dagbag.kill_zombies() - mock_ti_handle_failure.assert_called_once_with( - ANY, - conf.getboolean('core', 'unit_test_mode'), - ANY - ) - - @patch.object(TI, 'handle_failure') - def test_kill_zombies_doesn_nothing(self, mock_ti_handle_failure): - """ - Test that kill zombies does nothing when job is running and received heartbeat - """ - dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) - with create_session() as session: - session.query(TI).delete() - session.query(LJ).delete() - dag = dagbag.get_dag('example_branch_operator') - task = dag.get_task(task_id='run_this_first') - - ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.latest_heartbeat = utcnow() - lj.state = State.RUNNING - lj.id = 1 - ti.job_id = lj.id - - session.add(lj) - session.add(ti) - session.commit() - - dagbag.kill_zombies() - mock_ti_handle_failure.assert_not_called() - def test_deactivate_unknown_dags(self): """ Test that dag_ids not passed into deactivate_unknown_dags diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index b1e20ff52f..ac5ade5d1c 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -18,20 +18,24 @@ # under the License. import os -import pathlib import sys import tempfile import unittest from datetime import datetime, timedelta from unittest import mock -from unittest.mock import MagicMock, PropertyMock + +import pathlib +from unittest.mock import (MagicMock, PropertyMock) from airflow.configuration import conf from airflow.jobs import DagFileProcessor +from airflow.jobs import LocalTaskJob as LJ +from airflow.models import DagBag, TaskInstance as TI from airflow.utils import timezone -from airflow.utils.dag_processing import ( - DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, correct_maybe_zipped, -) +from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, + SimpleTaskInstance, correct_maybe_zipped) +from airflow.utils.db import create_session +from airflow.utils.state import State TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags') @@ -171,6 +175,44 @@ class TestDagFileProcessorManager(unittest.TestCase): manager.set_file_paths(['abc.txt']) self.assertDictEqual(manager._processors, {'abc.txt': mock_processor}) + def test_find_zombies(self): + manager = DagFileProcessorManager( + dag_directory='directory', + file_paths=['abc.txt'], + max_runs=1, + processor_factory=MagicMock().return_value, + processor_timeout=timedelta.max, + signal_conn=MagicMock(), + async_mode=True) + + dagbag = DagBag(TEST_DAG_FOLDER) + with create_session() as session: + session.query(LJ).delete() + dag = dagbag.get_dag('example_branch_operator') + task = dag.get_task(task_id='run_this_first') + + ti = TI(task, DEFAULT_DATE, State.RUNNING) + lj = LJ(ti) + lj.state = State.SHUTDOWN + lj.id = 1 + ti.job_id = lj.id + + session.add(lj) + session.add(ti) + session.commit() + + manager._last_zombie_query_time = timezone.utcnow() - timedelta( + seconds=manager._zombie_threshold_secs + 1) + zombies = manager._find_zombies() + self.assertEqual(1, len(zombies)) + self.assertIsInstance(zombies[0], SimpleTaskInstance) + self.assertEqual(ti.dag_id, zombies[0].dag_id) + self.assertEqual(ti.task_id, zombies[0].task_id) + self.assertEqual(ti.execution_date, zombies[0].execution_date) + + session.query(TI).delete() + session.query(LJ).delete() + @mock.patch("airflow.jobs.DagFileProcessor.pid", new_callable=PropertyMock) @mock.patch("airflow.jobs.DagFileProcessor.kill") def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid): @@ -184,7 +226,7 @@ class TestDagFileProcessorManager(unittest.TestCase): signal_conn=MagicMock(), async_mode=True) - processor = DagFileProcessor('abc.txt', False, []) + processor = DagFileProcessor('abc.txt', False, [], []) processor._start_time = timezone.make_aware(datetime.min) manager._processors = {'abc.txt': processor} manager._kill_timed_out_processors() @@ -203,7 +245,7 @@ class TestDagFileProcessorManager(unittest.TestCase): signal_conn=MagicMock(), async_mode=True) - processor = DagFileProcessor('abc.txt', False, []) + processor = DagFileProcessor('abc.txt', False, [], []) processor._start_time = timezone.make_aware(datetime.max) manager._processors = {'abc.txt': processor} manager._kill_timed_out_processors() @@ -230,10 +272,11 @@ class TestDagFileProcessorAgent(unittest.TestCase): with settings_context(SETTINGS_FILE_VALID): # Launch a process through DagFileProcessorAgent, which will try # reload the logging module. - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') @@ -262,10 +305,11 @@ class TestDagFileProcessorAgent(unittest.TestCase): self.assertFalse(os.path.isfile(log_file_loc)) def test_parse_once(self): - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') @@ -288,10 +332,11 @@ class TestDagFileProcessorAgent(unittest.TestCase): self.assertEqual(dag_ids.count('test_start_date_scheduling'), 1) def test_launch_process(self): - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')