Support .airflowignore for plugins (#9531)

This commit is contained in:
j-y-matsubara 2020-07-04 18:04:05 +09:00 коммит произвёл GitHub
Родитель 5e4b801b32
Коммит 5670e6f185
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 176 добавлений и 70 удалений

Просмотреть файл

@ -29,7 +29,8 @@ from typing import Any, Dict, List, Optional, Type
import pkg_resources import pkg_resources
from airflow import settings from airflow import settings # type: ignore
from airflow.utils.file import find_path_from_directory # type: ignore
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -158,40 +159,37 @@ def load_entrypoint_plugins():
def load_plugins_from_plugin_directory(): def load_plugins_from_plugin_directory():
""" """
Load and register Airflow Plugins from plugins directory. Load and register Airflow Plugins from plugins directory
""" """
global import_errors # pylint: disable=global-statement global import_errors # pylint: disable=global-statement
global plugins # pylint: disable=global-statement global plugins # pylint: disable=global-statement
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER) log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
# Crawl through the plugins folder to find AirflowPlugin derivatives for file_path in find_path_from_directory(
for root, _, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True): # noqa # pylint: disable=too-many-nested-blocks settings.PLUGINS_FOLDER, ".airflowignore"):
for f in files:
filepath = os.path.join(root, f)
try:
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
log.debug('Importing plugin module %s', filepath) if not os.path.isfile(file_path):
continue
mod_name, file_ext = os.path.splitext(os.path.split(file_path)[-1])
if file_ext != '.py':
continue
loader = importlib.machinery.SourceFileLoader(mod_name, filepath) try:
spec = importlib.util.spec_from_loader(mod_name, loader) loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
mod = importlib.util.module_from_spec(spec) spec = importlib.util.spec_from_loader(mod_name, loader)
sys.modules[spec.name] = mod mod = importlib.util.module_from_spec(spec)
loader.exec_module(mod) sys.modules[spec.name] = mod
for mod_attr_value in list(mod.__dict__.values()): loader.exec_module(mod)
if is_valid_plugin(mod_attr_value): log.debug('Importing plugin module %s', file_path)
plugin_instance = mod_attr_value()
plugins.append(plugin_instance) for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
except Exception as e: # pylint: disable=broad-except plugin_instance = mod_attr_value()
log.exception(e) plugins.append(plugin_instance)
path = filepath or str(f)
log.error('Failed to import plugin %s', path) except Exception as e: # pylint: disable=broad-except
import_errors[path] = str(e) log.exception(e)
log.error('Failed to import plugin %s', file_path)
import_errors[file_path] = str(e)
# pylint: disable=protected-access # pylint: disable=protected-access

Просмотреть файл

@ -20,9 +20,9 @@ import logging
import os import os
import re import re
import zipfile import zipfile
from typing import Dict, List, Optional, Pattern from typing import Dict, Generator, List, Optional, Pattern
from airflow.configuration import conf from airflow.configuration import conf # type: ignore
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -90,6 +90,47 @@ def open_maybe_zipped(fileloc, mode='r'):
return io.open(fileloc, mode=mode) return io.open(fileloc, mode=mode)
def find_path_from_directory(
base_dir_path: str,
ignore_file_name: str) -> Generator[str, None, None]:
"""
Search the file and return the path of the file that should not be ignored.
:param base_dir_path: the base path to be searched for.
:param ignore_file_name: the file name in which specifies a regular expression pattern is written.
:return : file path not to be ignored.
"""
patterns_by_dir: Dict[str, List[Pattern[str]]] = {}
for root, dirs, files in os.walk(str(base_dir_path), followlinks=True):
patterns: List[Pattern[str]] = patterns_by_dir.get(root, [])
ignore_file_path = os.path.join(root, ignore_file_name)
if os.path.isfile(ignore_file_path):
with open(ignore_file_path, 'r') as file:
lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in file.read().split("\n")]
patterns += [re.compile(line) for line in lines_no_comments if line]
patterns = list(set(patterns))
dirs[:] = [
subdir
for subdir in dirs
if not any(p.search(
os.path.join(os.path.relpath(root, str(base_dir_path)), subdir)) for p in patterns)
]
patterns_by_dir = {os.path.join(root, sd): patterns.copy() for sd in dirs}
for file in files: # type: ignore
if file == ignore_file_name:
continue
file_path = os.path.join(root, str(file))
if any(re.findall(p, file_path) for p in patterns):
continue
yield str(file_path)
def list_py_file_paths(directory: str, def list_py_file_paths(directory: str,
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
include_examples: Optional[bool] = None): include_examples: Optional[bool] = None):
@ -116,59 +157,32 @@ def list_py_file_paths(directory: str,
elif os.path.isfile(directory): elif os.path.isfile(directory):
return [directory] return [directory]
elif os.path.isdir(directory): elif os.path.isdir(directory):
patterns_by_dir: Dict[str, List[Pattern[str]]] = {} find_dag_file_paths(directory, file_paths, safe_mode)
for root, dirs, files in os.walk(directory, followlinks=True):
patterns: List[Pattern[str]] = patterns_by_dir.get(root, [])
ignore_file = os.path.join(root, '.airflowignore')
if os.path.isfile(ignore_file):
with open(ignore_file, 'r') as file:
# If we have new patterns create a copy so we don't change
# the previous list (which would affect other subdirs)
lines_no_comments = [COMMENT_PATTERN.sub("", line) for line in file.read().split("\n")]
patterns += [re.compile(line) for line in lines_no_comments if line]
# If we can ignore any subdirs entirely we should - fewer paths
# to walk is better. We have to modify the ``dirs`` array in
# place for this to affect os.walk
dirs[:] = [
subdir
for subdir in dirs
if not any(p.search(os.path.join(root, subdir)) for p in patterns)
]
# We want patterns defined in a parent folder's .airflowignore to
# apply to subdirs too
for subdir in dirs:
patterns_by_dir[os.path.join(root, subdir)] = patterns.copy()
find_dag_file_paths(file_paths, files, patterns, root, safe_mode)
if include_examples: if include_examples:
from airflow import example_dags from airflow import example_dags # type: ignore
example_dag_folder = example_dags.__path__[0] # type: ignore example_dag_folder = example_dags.__path__[0] # type: ignore
file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False)) file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False))
return file_paths return file_paths
def find_dag_file_paths(file_paths, files, patterns, root, safe_mode): def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
"""Finds file paths of all DAG files.""" """Finds file paths of all DAG files."""
for f in files:
for file_path in find_path_from_directory(
directory, ".airflowignore"):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
file_path = os.path.join(root, f)
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
continue continue
_, file_ext = os.path.splitext(os.path.split(file_path)[-1]) _, file_ext = os.path.splitext(os.path.split(file_path)[-1])
if file_ext != '.py' and not zipfile.is_zipfile(file_path): if file_ext != '.py' and not zipfile.is_zipfile(file_path):
continue continue
if any([re.findall(p, file_path) for p in patterns]):
continue
if not might_contain_dag(file_path, safe_mode): if not might_contain_dag(file_path, safe_mode):
continue continue
file_paths.append(file_path) file_paths.append(file_path)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
log.exception("Error while examining %s", f) log.exception("Error while examining %s", file_path)
COMMENT_PATTERN = re.compile(r"\s*#.*") COMMENT_PATTERN = re.compile(r"\s*#.*")

Просмотреть файл

@ -1441,12 +1441,13 @@ do the same, but then it is more suitable to use a virtualenv and pip.
'''''''''''''' ''''''''''''''
A ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER`` A ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER``
that Airflow should intentionally ignore. Each line in ``.airflowignore`` or ``PLUGINS_FOLDER`` that Airflow should intentionally ignore.
specifies a regular expression pattern, and directories or files whose names Each line in ``.airflowignore`` specifies a regular expression pattern,
(not DAG id) match any of the patterns would be ignored (under the hood, and directories or files whose names (not DAG id) match any of the patterns
``re.findall()`` is used to match the pattern). Overall it works like a would be ignored (under the hood,``re.findall()`` is used to match the pattern).
``.gitignore`` file. Use the ``#`` character to indicate a comment; all Overall it works like a ``.gitignore`` file.
characters on a line following a ``#`` will be ignored. Use the ``#`` character to indicate a comment; all characters
on a line following a ``#`` will be ignored.
``.airflowignore`` file should be put in your ``DAG_FOLDER``. ``.airflowignore`` file should be put in your ``DAG_FOLDER``.
For example, you can prepare a ``.airflowignore`` file with contents For example, you can prepare a ``.airflowignore`` file with contents

Просмотреть файл

@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import shutil
import tempfile
import unittest
from unittest.mock import patch
from airflow import settings # type: ignore
from airflow.utils.file import find_path_from_directory # type: ignore
class TestIgnorePluginFile(unittest.TestCase):
"""
Test that the .airflowignore work and whether the file is properly ignored.
"""
def setUp(self):
"""
Make tmp folder and files that should be ignored. And set base path.
"""
self.test_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.test_dir, 'test_file.txt')
self.plugin_folder_path = os.path.join(self.test_dir, 'test_ignore')
os.mkdir(os.path.join(self.test_dir, "test_ignore"))
os.mkdir(os.path.join(self.plugin_folder_path, "subdir1"))
os.mkdir(os.path.join(self.plugin_folder_path, "subdir2"))
files_content = [
["test_load.py", "#Should not be ignored file"],
["test_notload.py", 'raise Exception("This file should have been ignored!")'],
[".airflowignore", "#ignore test\nnot\nsubdir2"],
["subdir1/.airflowignore", "#ignore test\nnone"],
["subdir1/test_load_sub1.py", "#Should not be ignored file"],
["test_notload_sub.py", 'raise Exception("This file should have been ignored!")'],
["subdir1/test_noneload_sub1.py", 'raise Exception("This file should have been ignored!")'],
["subdir2/test_shouldignore.py", 'raise Exception("This file should have been ignored!")'],
]
for file_path, content in files_content:
with open(os.path.join(self.plugin_folder_path, file_path), "w") as f:
f.write(content)
self.mock_plugins_folder = patch.object(
settings, 'PLUGINS_FOLDER', return_value=self.plugin_folder_path
)
def tearDown(self):
"""
Delete tmp folder
"""
shutil.rmtree(self.test_dir)
def test_find_not_should_ignore_path(self):
"""
Test that the .airflowignore work and whether the file is properly ignored.
"""
detected_files = set()
should_ignore_files = {
'test_notload.py',
'test_notload_sub.py',
'test_noneload_sub1.py',
'test_shouldignore.py',
}
should_not_ignore_files = {
'test_load.py',
'test_load_sub1.py',
}
ignore_list_file = ".airflowignore"
for file_path in find_path_from_directory(self.plugin_folder_path, ignore_list_file):
if not os.path.isfile(file_path):
continue
_, file_ext = os.path.splitext(os.path.split(file_path)[-1])
if file_ext != '.py':
continue
detected_files.add(os.path.basename(file_path))
self.assertEqual(detected_files, should_not_ignore_files)
self.assertEqual(detected_files & should_ignore_files, set())