diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index a64ae5d240..8232c55994 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -29,7 +29,8 @@ from typing import Any, Dict, List, Optional, Type import pkg_resources -from airflow import settings +from airflow import settings # type: ignore +from airflow.utils.file import find_path_from_directory # type: ignore log = logging.getLogger(__name__) @@ -158,40 +159,37 @@ def load_entrypoint_plugins(): def load_plugins_from_plugin_directory(): """ - Load and register Airflow Plugins from plugins directory. + Load and register Airflow Plugins from plugins directory """ global import_errors # pylint: disable=global-statement global plugins # pylint: disable=global-statement log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER) - # Crawl through the plugins folder to find AirflowPlugin derivatives - for root, _, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True): # noqa # pylint: disable=too-many-nested-blocks - for f in files: - filepath = os.path.join(root, f) - try: - if not os.path.isfile(filepath): - continue - mod_name, file_ext = os.path.splitext( - os.path.split(filepath)[-1]) - if file_ext != '.py': - continue + for file_path in find_path_from_directory( + settings.PLUGINS_FOLDER, ".airflowignore"): - log.debug('Importing plugin module %s', filepath) + if not os.path.isfile(file_path): + continue + mod_name, file_ext = os.path.splitext(os.path.split(file_path)[-1]) + if file_ext != '.py': + continue - loader = importlib.machinery.SourceFileLoader(mod_name, filepath) - spec = importlib.util.spec_from_loader(mod_name, loader) - mod = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = mod - loader.exec_module(mod) - for mod_attr_value in list(mod.__dict__.values()): - if is_valid_plugin(mod_attr_value): - plugin_instance = mod_attr_value() - plugins.append(plugin_instance) - except Exception as e: # pylint: disable=broad-except - 
def find_path_from_directory(
        base_dir_path: str,
        ignore_file_name: str) -> Generator[str, None, None]:
    """
    Walk ``base_dir_path`` and yield every file path that is not excluded by an ignore file.

    Each visited directory may contain a file named ``ignore_file_name``. Every
    non-empty line of that file (after ``#`` comments are stripped) is compiled as a
    regular expression. The patterns found in a directory apply to that directory
    and are inherited by all of its sub-directories.

    * A sub-directory is pruned from the walk when a pattern matches its path
      relative to ``base_dir_path``.
    * A file is skipped when a pattern matches its path (``root`` joined with the file name).
    * The ignore file itself is never yielded.

    :param base_dir_path: the directory to search; symbolic links are followed.
    :param ignore_file_name: name of the per-directory ignore file (e.g. ``.airflowignore``).
    :return: generator of the paths of the files that should not be ignored.
    :raises re.error: if a line of an ignore file is not a valid regular expression.
    """
    base_dir = str(base_dir_path)
    comment_pattern = re.compile(r"\s*#.*")

    # Patterns inherited by each directory that has been discovered but not yet visited.
    patterns_by_dir: Dict[str, List[Pattern[str]]] = {}

    for root, dirs, files in os.walk(base_dir, followlinks=True):
        # Each directory is visited once, so its entry can be consumed here.
        patterns: List[Pattern[str]] = patterns_by_dir.pop(root, [])

        ignore_file_path = os.path.join(root, ignore_file_name)
        if os.path.isfile(ignore_file_path):
            with open(ignore_file_path, 'r') as ignore_file:
                lines_no_comments = [
                    comment_pattern.sub("", line) for line in ignore_file.read().split("\n")
                ]
            patterns += [re.compile(line) for line in lines_no_comments if line]
            # Drop duplicates while keeping a deterministic order.
            patterns = list(dict.fromkeys(patterns))

        # ``dirs`` must be modified in place for the pruning to affect os.walk.
        rel_root = os.path.relpath(root, base_dir)
        dirs[:] = [
            subdir
            for subdir in dirs
            if not any(p.search(os.path.join(rel_root, subdir)) for p in patterns)
        ]

        # ``update`` (not rebinding) so that entries recorded for sibling directories
        # that are still waiting to be walked are preserved. Each sub-directory gets
        # its own copy so that its own ignore file cannot leak into its siblings.
        patterns_by_dir.update(
            {os.path.join(root, subdir): patterns.copy() for subdir in dirs}
        )

        for file_name in files:
            if file_name == ignore_file_name:
                continue
            file_path = os.path.join(root, str(file_name))
            if any(p.search(file_path) for p in patterns):
                continue
            yield str(file_path)
def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
    """
    Append the paths of all candidate DAG files found under ``directory`` to ``file_paths``.

    Candidates come from ``find_path_from_directory`` using ``.airflowignore`` as the
    ignore file. A candidate is kept when it is a regular file, is either a ``.py``
    file or a zip archive, and ``might_contain_dag`` accepts it.

    :param directory: the directory to search.
    :param file_paths: list that is extended in place with the accepted paths.
    :param safe_mode: forwarded unchanged to ``might_contain_dag``.
    """
    for candidate in find_path_from_directory(directory, ".airflowignore"):
        # noinspection PyBroadException
        try:
            if not os.path.isfile(candidate):
                continue

            extension = os.path.splitext(os.path.basename(candidate))[1]
            # The zip check only runs for files that are not plain ``.py`` modules.
            if not (extension == '.py' or zipfile.is_zipfile(candidate)):
                continue

            if might_contain_dag(candidate, safe_mode):
                file_paths.append(candidate)
        except Exception:  # pylint: disable=broad-except
            # One unreadable file must not abort the whole scan.
            log.exception("Error while examining %s", candidate)
'''''''''''''' A ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER`` -that Airflow should intentionally ignore. Each line in ``.airflowignore`` -specifies a regular expression pattern, and directories or files whose names -(not DAG id) match any of the patterns would be ignored (under the hood, -``re.findall()`` is used to match the pattern). Overall it works like a -``.gitignore`` file. Use the ``#`` character to indicate a comment; all -characters on a line following a ``#`` will be ignored. +or ``PLUGINS_FOLDER`` that Airflow should intentionally ignore. +Each line in ``.airflowignore`` specifies a regular expression pattern, +and directories or files whose names (not DAG id) match any of the patterns +would be ignored (under the hood, ``re.findall()`` is used to match the pattern). +Overall it works like a ``.gitignore`` file. +Use the ``#`` character to indicate a comment; all characters +on a line following a ``#`` will be ignored. ``.airflowignore`` file should be put in your ``DAG_FOLDER``. For example, you can prepare a ``.airflowignore`` file with contents diff --git a/tests/plugins/test_plugin_ignore.py b/tests/plugins/test_plugin_ignore.py new file mode 100644 index 0000000000..77c022bdfc --- /dev/null +++ b/tests/plugins/test_plugin_ignore.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import shutil
import tempfile
import unittest
from unittest.mock import patch

from airflow import settings  # type: ignore
from airflow.utils.file import find_path_from_directory  # type: ignore


class TestIgnorePluginFile(unittest.TestCase):
    """
    Test that ``.airflowignore`` is honoured and that ignored files are not returned.
    """

    def setUp(self):
        """
        Create a temporary plugins folder containing files that should and should not be ignored.
        """
        self.test_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.test_dir, 'test_file.txt')
        self.plugin_folder_path = os.path.join(self.test_dir, 'test_ignore')
        os.mkdir(self.plugin_folder_path)
        os.mkdir(os.path.join(self.plugin_folder_path, "subdir1"))
        os.mkdir(os.path.join(self.plugin_folder_path, "subdir2"))
        files_content = [
            ["test_load.py", "#Should not be ignored file"],
            ["test_notload.py", 'raise Exception("This file should have been ignored!")'],
            [".airflowignore", "#ignore test\nnot\nsubdir2"],
            ["subdir1/.airflowignore", "#ignore test\nnone"],
            ["subdir1/test_load_sub1.py", "#Should not be ignored file"],
            ["test_notload_sub.py", 'raise Exception("This file should have been ignored!")'],
            ["subdir1/test_noneload_sub1.py", 'raise Exception("This file should have been ignored!")'],
            ["subdir2/test_shouldignore.py", 'raise Exception("This file should have been ignored!")'],
        ]
        for file_path, content in files_content:
            with open(os.path.join(self.plugin_folder_path, file_path), "w") as f:
                f.write(content)

        # Replace the attribute with the path itself (``return_value=`` would install a
        # MagicMock instead), and actually activate the patch for the test's duration.
        self.mock_plugins_folder = patch.object(
            settings, 'PLUGINS_FOLDER', self.plugin_folder_path
        )
        self.mock_plugins_folder.start()
        self.addCleanup(self.mock_plugins_folder.stop)

    def tearDown(self):
        """
        Delete the temporary folder.
        """
        shutil.rmtree(self.test_dir)

    def test_find_not_should_ignore_path(self):
        """
        Only the ``.py`` files not matched by any ``.airflowignore`` pattern are found.
        """
        should_ignore_files = {
            'test_notload.py',
            'test_notload_sub.py',
            'test_noneload_sub1.py',
            'test_shouldignore.py',
        }
        should_not_ignore_files = {
            'test_load.py',
            'test_load_sub1.py',
        }
        ignore_list_file = ".airflowignore"

        detected_files = set()
        for file_path in find_path_from_directory(self.plugin_folder_path, ignore_list_file):
            if not os.path.isfile(file_path):
                continue
            _, file_ext = os.path.splitext(os.path.split(file_path)[-1])
            if file_ext != '.py':
                continue
            detected_files.add(os.path.basename(file_path))

        self.assertEqual(detected_files, should_not_ignore_files)
        self.assertEqual(detected_files & should_ignore_files, set())