Fix pylint issues in airflow/models/dagbag.py (#9666)

This commit is contained in:
Kamil Breguła 2020-07-05 23:45:14 +02:00 committed by GitHub
Parent 01044ff549
Commit 444051d32c
No key found matching this signature
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 183 additions and 164 deletions

View file

@ -18,6 +18,7 @@
import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
@ -37,7 +38,7 @@ from airflow.plugins_manager import integrate_dag_plugins
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import test_cycle
from airflow.utils.file import correct_maybe_zipped
from airflow.utils.file import correct_maybe_zipped, list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timeout import timeout
@ -65,15 +66,9 @@ class DagBag(BaseDagBag, LoggingMixin):
:param dag_folder: the folder to scan to find DAGs
:type dag_folder: unicode
:param executor: the executor to use when executing task instances
in this DagBag
:param include_examples: whether to include the examples that ship
with airflow or not
:type include_examples: bool
:param has_logged: an instance boolean that gets flipped from False to True after a
file has been skipped. This prevents overloading the user with logging
messages about skipped files, so a skipped file is logged only once per DagBag.
:param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``.
If ``False`` DAGs are read from python files.
:type store_serialized_dags: bool
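# Illustrative usage sketch (not part of this diff; the folder and dag_id are
# hypothetical): a DagBag reading plain Python files. With
# store_serialized_dags=True it would read from the serialized-DAG table instead.
from airflow.models.dagbag import DagBag

dag_bag = DagBag(dag_folder='/dags', include_examples=False)
example_dag = dag_bag.get_dag('example_dag_id')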
@ -89,7 +84,7 @@ class DagBag(BaseDagBag, LoggingMixin):
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
store_serialized_dags=False,
):
super().__init__()
dag_folder = dag_folder or settings.DAGS_FOLDER
self.dag_folder = dag_folder
self.dags = {}
@ -180,16 +175,13 @@ class DagBag(BaseDagBag, LoggingMixin):
Given a path to a python module or zip file, this method imports
the module and looks for DAG objects within it.
"""
from airflow.models.dag import DAG # Avoid circular import
integrate_dag_plugins()
found_dags = []
# if the source file no longer exists in the DB or in the filesystem,
# return an empty list
# todo: raise exception?
if filepath is None or not os.path.isfile(filepath):
return found_dags
return []
try:
# This failed before in what may have been a git sync
@ -198,117 +190,128 @@ class DagBag(BaseDagBag, LoggingMixin):
if only_if_updated \
and filepath in self.file_last_changed \
and file_last_changed_on_disk == self.file_last_changed[filepath]:
return found_dags
except Exception as e:
return []
except Exception as e: # pylint: disable=broad-except
self.log.exception(e)
return found_dags
mods = []
is_zipfile = zipfile.is_zipfile(filepath)
if not is_zipfile:
if safe_mode:
with open(filepath, 'rb') as file:
content = file.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = file_last_changed_on_disk
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info(
"File %s assumed to contain no DAGs. Skipping.",
filepath)
return found_dags
self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
mod_name = ('unusual_prefix_' +
hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
'_' + org_mod_name)
if mod_name in sys.modules:
del sys.modules[mod_name]
with timeout(self.DAGBAG_IMPORT_TIMEOUT):
try:
loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
spec = importlib.util.spec_from_loader(mod_name, loader)
m = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = m
loader.exec_module(m)
mods.append(m)
except Exception as e:
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
return []
if not zipfile.is_zipfile(filepath):
mods = self._load_modules_from_file(filepath, safe_mode)
else:
zip_file = zipfile.ZipFile(filepath)
for mod in zip_file.infolist():
head, _ = os.path.split(mod.filename)
mod_name, ext = os.path.splitext(mod.filename)
if not head and ext in [".py", ".pyc"]:
if mod_name == '__init__':
self.log.warning("Found __init__.%s at root of %s", ext, filepath)
if safe_mode:
with zip_file.open(mod.filename) as zf:
self.log.debug("Reading %s from %s", mod.filename, filepath)
content = zf.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = (
file_last_changed_on_disk)
# todo: create ignore list
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info(
"File %s assumed to contain no DAGs. Skipping.",
filepath)
mods = self._load_modules_from_zip(filepath, safe_mode)
if mod_name in sys.modules:
del sys.modules[mod_name]
try:
sys.path.insert(0, filepath)
m = importlib.import_module(mod_name)
mods.append(m)
except Exception as e:
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk
for m in mods:
for dag in list(m.__dict__.values()):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag.normalized_schedule_interval, str):
croniter(dag.normalized_schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = \
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
def bag_dag(self, dag, parent_dag, root_dag):
def _load_modules_from_file(self, filepath, safe_mode):
if not might_contain_dag(filepath, safe_mode):
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
return []
self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
path_hash = hashlib.sha1(filepath.encode('utf-8')).hexdigest()
mod_name = f'unusual_prefix_{path_hash}_{org_mod_name}'
if mod_name in sys.modules:
del sys.modules[mod_name]
with timeout(self.DAGBAG_IMPORT_TIMEOUT):
try:
loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
spec = importlib.util.spec_from_loader(mod_name, loader)
new_module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = new_module
loader.exec_module(new_module)
return [new_module]
except Exception as e: # pylint: disable=broad-except
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
return []
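# Illustrative (path hypothetical): the module name generated above embeds a
# SHA-1 of the full file path, so same-named files in different folders cannot
# collide in sys.modules.
import hashlib

path = '/dags/my_dag.py'
print(f"unusual_prefix_{hashlib.sha1(path.encode('utf-8')).hexdigest()}_my_dag")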
def _load_modules_from_zip(self, filepath, safe_mode):
mods = []
current_zip_file = zipfile.ZipFile(filepath)
for zip_info in current_zip_file.infolist():
head, _ = os.path.split(zip_info.filename)
mod_name, ext = os.path.splitext(zip_info.filename)
if ext not in [".py", ".pyc"]:
continue
if head:
continue
if mod_name == '__init__':
self.log.warning("Found __init__.%s at root of %s", ext, filepath)
self.log.debug("Reading %s from %s", zip_info.filename, filepath)
if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
# todo: create ignore list
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info(
"File %s:%s assumed to contain no DAGs. Skipping.",
filepath, zip_info.filename
)
continue
if mod_name in sys.modules:
del sys.modules[mod_name]
try:
sys.path.insert(0, filepath)
current_module = importlib.import_module(mod_name)
mods.append(current_module)
except Exception as e: # pylint: disable=broad-except
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
return mods
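# Illustrative (entry names hypothetical): only top-level .py/.pyc archive
# entries qualify in the loop above; anything in a subdirectory or with
# another extension is skipped.
import os

for name in ['my_dag.py', 'pkg/helper.py', 'README.md']:
    head, _ = os.path.split(name)
    _, ext = os.path.splitext(name)
    print(name, 'considered' if not head and ext in ('.py', '.pyc') else 'skipped')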
def _process_modules(self, filepath, mods, file_last_changed_on_disk):
from airflow.models.dag import DAG # Avoid circular import
is_zipfile = zipfile.is_zipfile(filepath)
top_level_dags = [
o
for m in mods
for o in list(m.__dict__.values())
if isinstance(o, DAG)
]
found_dags = []
for dag in top_level_dags:
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
self.bag_dag(dag=dag, root_dag=dag)
if isinstance(dag.normalized_schedule_interval, str):
croniter(dag.normalized_schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = f"Invalid Cron expression: {cron_e}"
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk
return found_dags
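# Illustrative: croniter() is instantiated above only to validate a string
# schedule_interval; an invalid expression raises one of the Croniter* errors
# caught in the except clause.
from croniter import croniter

try:
    croniter('every day at noon')  # not a valid cron expression
except Exception as err:  # CroniterBadCronError and friends
    print(f'Invalid Cron expression: {err}')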
def bag_dag(self, dag, root_dag):
"""
Adds the DAG into the bag, recurses into sub dags.
Throws AirflowDagCycleException if a cycle is detected in this dag or its subdags
@ -329,7 +332,7 @@ class DagBag(BaseDagBag, LoggingMixin):
subdag.full_filepath = dag.full_filepath
subdag.parent_dag = dag
subdag.is_subdag = True
self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
self.bag_dag(dag=subdag, root_dag=root_dag)
self.dags[dag.dag_id] = dag
self.log.debug('Loaded DAG %s', dag)
@ -371,30 +374,31 @@ class DagBag(BaseDagBag, LoggingMixin):
# Used to store stats around DagBag processing
stats = []
from airflow.utils.file import correct_maybe_zipped, list_py_file_paths
dag_folder = correct_maybe_zipped(dag_folder)
for filepath in list_py_file_paths(dag_folder, safe_mode=safe_mode,
include_examples=include_examples):
try:
ts = timezone.utcnow()
file_parse_start_dttm = timezone.utcnow()
found_dags = self.process_file(
filepath, only_if_updated=only_if_updated,
safe_mode=safe_mode)
dag_ids = [dag.dag_id for dag in found_dags]
dag_id_names = str(dag_ids)
filepath,
only_if_updated=only_if_updated,
safe_mode=safe_mode
)
td = timezone.utcnow() - ts
file_parse_end_dttm = timezone.utcnow()
stats.append(FileLoadStat(
filepath.replace(settings.DAGS_FOLDER, ''),
td,
len(found_dags),
sum([len(dag.tasks) for dag in found_dags]),
dag_id_names,
file=filepath.replace(settings.DAGS_FOLDER, ''),
duration=file_parse_end_dttm - file_parse_start_dttm,
dag_num=len(found_dags),
task_num=sum([len(dag.tasks) for dag in found_dags]),
dags=str([dag.dag_id for dag in found_dags]),
))
except Exception as e:
except Exception as e: # pylint: disable=broad-except
self.log.exception(e)
Stats.gauge(
'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1)
end_dttm = timezone.utcnow()
durations = (end_dttm - start_dttm).total_seconds()
Stats.gauge('collect_dags', durations, 1)
Stats.gauge('dagbag_size', len(self.dags), 1)
Stats.gauge('dagbag_import_errors', len(self.import_errors), 1)
self.dagbag_stats = sorted(
@ -430,7 +434,14 @@ class DagBag(BaseDagBag, LoggingMixin):
def dagbag_report(self):
"""Prints a report around DagBag loading stats"""
report = textwrap.dedent("""\n
stats = self.dagbag_stats
dag_folder = self.dag_folder
duration = sum([o.duration for o in stats], timedelta()).total_seconds()
dag_num = sum([o.dag_num for o in stats])
task_num = sum([o.task_num for o in stats])
table = tabulate(stats, headers="keys")
report = textwrap.dedent(f"""\n
-------------------------------------------------------------------
DagBag loading stats for {dag_folder}
-------------------------------------------------------------------
@ -439,14 +450,7 @@ class DagBag(BaseDagBag, LoggingMixin):
DagBag parsing time: {duration}
{table}
""")
stats = self.dagbag_stats
return report.format(
dag_folder=self.dag_folder,
duration=sum([o.duration for o in stats], timedelta()).total_seconds(),
dag_num=sum([o.dag_num for o in stats]),
task_num=sum([o.task_num for o in stats]),
table=tabulate(stats, headers="keys"),
)
return report
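# Design note, standalone sketch (values hypothetical): the report is now
# built by computing the stats first and interpolating with an f-string,
# replacing the old str.format() call on a {placeholder} template.
duration, dag_num, task_num = 1.5, 3, 12
print(f'DagBag parsing time: {duration}; dags: {dag_num}; tasks: {task_num}')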
def sync_to_db(self):
"""

View file

@ -23,7 +23,7 @@ from typing import Iterable, Optional
from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models import Base
from airflow.models.base import Base
from airflow.settings import STORE_DAG_CODE
from airflow.utils import timezone
from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped

View file

@ -25,7 +25,9 @@ from typing import Any, Dict, Iterable
from airflow.exceptions import (
AirflowException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException,
)
from airflow.models import BaseOperator, SkipMixin, TaskReschedule
from airflow.models import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults

View file

@ -28,8 +28,8 @@ from dateutil import relativedelta
from pendulum.tz.timezone import Timezone
from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.connection import Connection
from airflow.models.dag import DAG
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import serialize_template_field

View file

@ -188,10 +188,23 @@ def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
COMMENT_PATTERN = re.compile(r"\s*#.*")
def might_contain_dag(file_path, safe_mode):
"""Heuristic that guesses whether a Python file contains an Airflow DAG definition."""
if safe_mode and not zipfile.is_zipfile(file_path):
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: Optional[zipfile.ZipFile] = None):
"""
Heuristic that guesses whether a Python file contains an Airflow DAG definition.
:param file_path: Path to the file to be checked.
:param safe_mode: Is safe mode active? If not, this function always returns True.
:param zip_file: If passed, check the archive. Otherwise, check the local filesystem.
:return: True if the file might contain DAGs.
"""
if not safe_mode:
return True
if zip_file:
with zip_file.open(file_path) as current_file:
content = current_file.read()
else:
if zipfile.is_zipfile(file_path):
return True
with open(file_path, 'rb') as dag_file:
content = dag_file.read()
return all([s in content for s in (b'DAG', b'airflow')])
return True
return all([s in content for s in (b'DAG', b'airflow')])
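# Illustrative usage (path hypothetical): in safe mode the file body must
# contain both b'DAG' and b'airflow'; with safe_mode=False no read happens.
from airflow.utils.file import might_contain_dag

print(might_contain_dag('/dags/my_dag.py', safe_mode=False))  # always True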

View file

@ -3,7 +3,6 @@
./airflow/migrations/env.py
./airflow/models/crypto.py
./airflow/models/dag.py
./airflow/models/dagbag.py
./airflow/models/dagrun.py
./airflow/models/taskinstance.py
./airflow/stats.py

View file

@ -97,7 +97,7 @@ class TestGetLog(unittest.TestCase):
dagbag = self.app.dag_bag # pylint: disable=no-member
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dag.sync_to_db()
dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
with create_session() as session:
self.ti = TaskInstance(
task=DummyOperator(task_id=self.TASK_ID, dag=dag),

View file

@ -197,7 +197,7 @@ class TestDagFileProcessor(unittest.TestCase):
dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"))
dag = self.create_test_dag()
dag.clear()
dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
dag = self.create_test_dag()
dag.clear()
task = DummyOperator(
@ -2867,7 +2867,7 @@ class TestSchedulerJob(unittest.TestCase):
orm_dag.is_paused = False
session.merge(orm_dag)
dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
@mock.patch('airflow.models.DagBag', return_value=dagbag)
@mock.patch('airflow.models.DagBag.collect_dags')
@ -2925,7 +2925,7 @@ class TestSchedulerJob(unittest.TestCase):
orm_dag.is_paused = False
session.merge(orm_dag)
dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
@mock.patch('airflow.models.DagBag', return_value=dagbag)
@mock.patch('airflow.models.DagBag.collect_dags')

View file

@ -129,15 +129,16 @@ class TestDagBag(unittest.TestCase):
test the loading of a DAG from within a zip file that skips another file because
it doesn't have "airflow" and "DAG"
"""
from unittest.mock import Mock
with patch('airflow.models.DagBag.log') as log_mock:
log_mock.info = Mock()
with self.assertLogs() as cm:
test_zip_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
dagbag = models.DagBag(dag_folder=test_zip_path, include_examples=False)
self.assertTrue(dagbag.has_logged)
log_mock.info.assert_any_call("File %s assumed to contain no DAGs. Skipping.",
test_zip_path)
self.assertIn(
f'INFO:airflow.models.dagbag.DagBag:File {test_zip_path}:file_no_airflow_dag.py '
'assumed to contain no DAGs. Skipping.',
cm.output
)
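# The rewritten test relies on unittest's assertLogs, which records captured
# messages as 'LEVEL:logger_name:message' strings; a minimal standalone example:
import logging
import unittest

class ExampleTest(unittest.TestCase):
    def test_logs(self):
        with self.assertLogs() as cm:
            logging.getLogger('airflow.models.dagbag.DagBag').info('hello')
        self.assertIn('INFO:airflow.models.dagbag.DagBag:hello', cm.output)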
def test_zip(self):
"""

View file

@ -457,7 +457,7 @@ def dag_bag_ext():
task_a_3 >> task_b_3
for dag in [dag_0, dag_1, dag_2, dag_3]:
dag_bag.bag_dag(dag, None, dag)
dag_bag.bag_dag(dag=dag, root_dag=dag)
return dag_bag
@ -562,7 +562,7 @@ def dag_bag_cyclic():
task_a_1 >> task_b_1
for dag in [dag_0, dag_1]:
dag_bag.bag_dag(dag, None, dag)
dag_bag.bag_dag(dag=dag, root_dag=dag)
return dag_bag
@ -587,8 +587,8 @@ def dag_bag_multiple():
dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE, schedule_interval="@daily")
agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE, schedule_interval="@daily")
dag_bag.bag_dag(daily_dag, None, daily_dag)
dag_bag.bag_dag(agg_dag, None, agg_dag)
dag_bag.bag_dag(dag=daily_dag, root_dag=daily_dag)
dag_bag.bag_dag(dag=agg_dag, root_dag=agg_dag)
daily_task = DummyOperator(task_id="daily_tas", dag=daily_dag)

View file

@ -1101,7 +1101,7 @@ class TestLogView(TestBase):
dag.sync_to_db()
dag_removed = DAG(self.DAG_ID_REMOVED, start_date=self.DEFAULT_DATE)
dag_removed.sync_to_db()
dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
with create_session() as session:
self.ti = TaskInstance(
task=DummyOperator(task_id=self.TASK_ID, dag=dag),
@ -1393,7 +1393,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
def setup(self):
dagbag = self.test.app.dag_bag
dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
dagbag.bag_dag(dag=dag, root_dag=dag)
for run_data in self.RUNS_DATA:
run = dag.create_dagrun(
run_id=run_data[0],