[AIRFLOW-333][AIRFLOW-258] Fix non-module plugin components

* Distinguish between module and non-module plugin
components
* Fix handling of non-module plugin components

  * admin views, flask blueprints, and menu links
need to not be
    wrapped in modules

* Fix improper use of zope.deprecation.deprecated

  * zope.deprecation.deprecated does NOT support
classes as
    first parameter
  * deprecating classes must be handled by calling
the deprecate
    function on the class name

* Added tests for plugin loading
* Updated plugin documentation to match test
plugin
* Updated executors to always load plugins
* More logging

Closes #1738 from gwax/plugin_module_fixes
This commit is contained in:
George Leslie-Waksman 2016-10-01 23:43:20 -07:00 коммит произвёл Siddharth Anand
Родитель c37740f531
Коммит eb5982d4aa
11 изменённых файлов: 232 добавлений и 48 удалений

Просмотреть файл

@ -82,4 +82,5 @@ from airflow import contrib
operators._integrate_plugins()
hooks._integrate_plugins()
executors._integrate_plugins()
macros._integrate_plugins()

Просмотреть файл

@ -402,6 +402,7 @@ TEST_CONFIG = """\
unit_test_mode = True
airflow_home = {AIRFLOW_HOME}
dags_folder = {TEST_DAGS_FOLDER}
plugins_folder = {TEST_PLUGINS_FOLDER}
base_log_folder = {AIRFLOW_HOME}/logs
executor = SequentialExecutor
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@ -683,6 +684,16 @@ if os.path.exists(_TEST_DAGS_FOLDER):
else:
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'plugins')
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins')
def parameterized_config(template):
"""

Просмотреть файл

@ -13,6 +13,7 @@
# limitations under the License.
import logging
import sys
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
@ -26,6 +27,14 @@ except:
from airflow.exceptions import AirflowException
def _integrate_plugins():
"""Integrate plugins to the context."""
from airflow.plugins_manager import executors_modules
for executors_module in executors_modules:
sys.modules[executors_module.__name__] = executors_module
globals()[executors_module._name] = executors_module
_EXECUTOR = configuration.get('core', 'EXECUTOR')
if _EXECUTOR == 'LocalExecutor':
@ -39,9 +48,7 @@ elif _EXECUTOR == 'MesosExecutor':
DEFAULT_EXECUTOR = MesosExecutor()
else:
# Loading plugins
from airflow.plugins_manager import executors as _executors
for _executor in _executors:
globals()[_executor.__name__] = _executor
_integrate_plugins()
if _EXECUTOR in globals():
DEFAULT_EXECUTOR = globals()[_EXECUTOR]()
else:

Просмотреть файл

@ -64,21 +64,24 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
def _integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import hooks as _hooks
for _hook_module in _hooks:
sys.modules[_hook_module.__name__] = _hook_module
globals()[_hook_module._name] = _hook_module
from airflow.plugins_manager import hooks_modules
for hooks_module in hooks_modules:
sys.modules[hooks_module.__name__] = hooks_module
globals()[hooks_module._name] = hooks_module
##########################################################
# TODO FIXME Remove in Airflow 2.0
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
for _hook in _hook_module._objects:
globals()[_hook.__name__] = _deprecated(
_hook,
for _hook in hooks_module._objects:
hook_name = _hook.__name__
globals()[hook_name] = _hook
_deprecated(
hook_name,
"Importing plugin hook '{i}' directly from "
"'airflow.hooks' has been deprecated. Please "
"import from 'airflow.hooks.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=_hook))
"entirely in Airflow 2.0.".format(i=hook_name))

Просмотреть файл

@ -65,10 +65,10 @@ def ds_format(ds, input_format, output_format):
def _integrate_plugins():
"""Integrate plugins to the context"""
import sys
from airflow.plugins_manager import macros as _macros
for _macro_module in _macros:
sys.modules[_macro_module.__name__] = _macro_module
globals()[_macro_module._name] = _macro_module
from airflow.plugins_manager import macros_modules
for macros_module in macros_modules:
sys.modules[macros_module.__name__] = macros_module
globals()[macros_module._name] = macros_module
##########################################################
# TODO FIXME Remove in Airflow 2.0
@ -76,11 +76,13 @@ def _integrate_plugins():
import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
for _macro in _macro_module._objects:
globals()[_macro.__name__] = _deprecated(
_macro,
for _macro in macros_module._objects:
macro_name = _macro.__name__
globals()[macro_name] = _macro
_deprecated(
macro_name,
"Importing plugin macro '{i}' directly from "
"'airflow.macros' has been deprecated. Please "
"import from 'airflow.macros.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=_macro))
"entirely in Airflow 2.0.".format(i=macro_name))

Просмотреть файл

@ -101,21 +101,23 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
def _integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import operators as _operators
for _operator_module in _operators:
sys.modules[_operator_module.__name__] = _operator_module
globals()[_operator_module._name] = _operator_module
from airflow.plugins_manager import operators_modules
for operators_module in operators_modules:
sys.modules[operators_module.__name__] = operators_module
globals()[operators_module._name] = operators_module
##########################################################
# TODO FIXME Remove in Airflow 2.0
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
for _operator in _operator_module._objects:
globals()[_operator.__name__] = _deprecated(
_operator,
for _operator in operators_module._objects:
operator_name = _operator.__name__
globals()[operator_name] = _operator
_deprecated(
operator_name,
"Importing plugin operator '{i}' directly from "
"'airflow.operators' has been deprecated. Please "
"import from 'airflow.operators.[plugin_module]' "
"instead. Support for direct imports will be dropped "
"entirely in Airflow 2.0.".format(i=_operator))
"entirely in Airflow 2.0.".format(i=operator_name))

Просмотреть файл

@ -24,8 +24,6 @@ import logging
import os
import re
import sys
from itertools import chain
merge = chain.from_iterable
from airflow import configuration
@ -74,6 +72,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
if file_ext != '.py':
continue
logging.info('Importing plugin module ' + filepath)
# normalize root path as namespace
namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
@ -93,6 +92,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
def make_module(name, objects):
logging.info('Creating module ' + name)
name = name.lower()
module = imp.new_module(name)
module._name = name.split('.')[-1]
@ -100,18 +100,25 @@ def make_module(name, objects):
module.__dict__.update((o.__name__, o) for o in objects)
return module
operators, hooks, executors, macros, admin_views = [], [], [], [], []
flask_blueprints, menu_links = [], []
# Plugin components to integrate as modules
operators_modules = []
hooks_modules = []
executors_modules = []
macros_modules = []
# Plugin components to integrate directly
admin_views = []
flask_blueprints = []
menu_links = []
for p in plugins:
operators.append(make_module('airflow.operators.' + p.name, p.operators))
hooks.append(make_module('airflow.hooks.' + p.name, p.hooks))
executors.append(make_module('airflow.executors.' + p.name, p.executors))
macros.append(make_module('airflow.macros.' + p.name, p.macros))
admin_views.append(
make_module('airflow.www.admin_views' + p.name, p.admin_views))
flask_blueprints.append(
make_module(
'airflow.www.flask_blueprints' + p.name, p.flask_blueprints))
menu_links.append(
make_module('airflow.www.menu_links' + p.name, p.menu_links))
operators_modules.append(
make_module('airflow.operators.' + p.name, p.operators))
hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
executors_modules.append(
make_module('airflow.executors.' + p.name, p.executors))
macros_modules.append(make_module('airflow.macros.' + p.name, p.macros))
admin_views.extend(p.admin_views)
flask_blueprints.extend(p.flask_blueprints)
menu_links.extend(p.menu_links)

Просмотреть файл

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import socket
from flask import Flask
@ -109,10 +110,13 @@ def create_app(config=None):
from airflow.plugins_manager import (
admin_views, flask_blueprints, menu_links)
for v in admin_views:
logging.info('Adding view ' + v.name)
admin.add_view(v)
for bp in flask_blueprints:
logging.info('Adding blueprint ' + bp.name)
app.register_blueprint(bp)
for ml in sorted(menu_links, key=lambda x: x.name):
logging.info('Adding menu link ' + ml.name)
admin.add_link(ml)
integrate_plugins()

Просмотреть файл

@ -96,18 +96,22 @@ definitions in Airflow.
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
# Will show up under airflow.hooks.PluginHook
# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook(BaseHook):
pass
# Will show up under airflow.operators.PluginOperator
# Will show up under airflow.operators.test_plugin.PluginOperator
class PluginOperator(BaseOperator):
pass
# Will show up under airflow.executors.PluginExecutor
# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor(BaseExecutor):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
def plugin_macro():
pass
# Creating a flask admin BaseView
class TestView(BaseView):
@expose('/')
@ -119,10 +123,10 @@ definitions in Airflow.
# Creating a flask blueprint to intergrate the templates and static folder
bp = Blueprint(
"test_plugin", __name__,
template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
static_folder='static',
static_url_path='/static/test_plugin')
ml = MenuLink(
category='Test Plugin',
name='Test Menu Link',
@ -132,8 +136,9 @@ definitions in Airflow.
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
operators = [PluginOperator]
flask_blueprints = [bp]
hooks = [PluginHook]
executors = [PluginExecutor]
macros = [plugin_macro]
admin_views = [v]
flask_blueprints = [bp]
menu_links = [ml]

Просмотреть файл

@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook(BaseHook):
pass
# Will show up under airflow.operators.test_plugin.PluginOperator
class PluginOperator(BaseOperator):
pass
# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor(BaseExecutor):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
def plugin_macro():
pass
# Creating a flask admin BaseView
class TestView(BaseView):
@expose('/')
def test(self):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self.render("test_plugin/test.html", content="Hello galaxy!")
v = TestView(category="Test Plugin", name="Test View")
# Creating a flask blueprint to intergrate the templates and static folder
bp = Blueprint(
"test_plugin", __name__,
template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
static_folder='static',
static_url_path='/static/test_plugin')
ml = MenuLink(
category='Test Plugin',
name='Test Menu Link',
url='http://pythonhosted.org/airflow/')
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
operators = [PluginOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
macros = [plugin_macro]
admin_views = [v]
flask_blueprints = [bp]
menu_links = [ml]

70
tests/plugins_manager.py Normal file
Просмотреть файл

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import inspect
import logging
import unittest
from flask.blueprints import Blueprint
from flask_admin import BaseView
from flask_admin.menu import MenuLink, MenuView
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
from airflow.www.app import cached_app
class PluginsTest(unittest.TestCase):
def test_operators(self):
from airflow.operators.test_plugin import PluginOperator
assert issubclass(PluginOperator, BaseOperator)
def test_hooks(self):
from airflow.hooks.test_plugin import PluginHook
assert issubclass(PluginHook, BaseHook)
def test_executors(self):
from airflow.executors.test_plugin import PluginExecutor
assert issubclass(PluginExecutor, BaseExecutor)
def test_macros(self):
from airflow.macros.test_plugin import plugin_macro
assert callable(plugin_macro)
def test_admin_views(self):
app = cached_app()
[admin] = app.extensions['admin']
category = admin._menu_categories['Test Plugin']
[admin_view] = [v for v in category.get_children()
if isinstance(v, MenuView)]
assert admin_view.name == 'Test View'
def test_flask_blueprints(self):
app = cached_app()
assert isinstance(app.blueprints['test_plugin'], Blueprint)
def test_menu_links(self):
app = cached_app()
[admin] = app.extensions['admin']
category = admin._menu_categories['Test Plugin']
[menu_link] = [ml for ml in category.get_children()
if isinstance(ml, MenuLink)]
assert menu_link.name == 'Test Menu Link'