[AIRFLOW-2147] Plugin manager: added 'sensors' attribute
AirflowPlugin required both BaseOperator and BaseSensorOperator to be included in its `operators` attribute. Add a `sensors` attribute and updated import logic so that anything added to the new attribute can be imported from `airflow.sensors.{plugin_name}` The integration/`make_module` calls in `airflow.plugins_manager` for operators is also updated to maintain the ability to import sensors from `operators` to avoid breaking existing plugins - Update unit tests and documentation to reflect this - Added exclusion for flake8 module level import not at top of file Closes #3075 from arcward/AIRFLOW-2147
This commit is contained in:
Родитель
d4dfe2654e
Коммит
fedc5a092c
|
@ -76,11 +76,13 @@ class AirflowMacroPlugin(object):
|
|||
self.namespace = namespace
|
||||
|
||||
from airflow import operators
|
||||
from airflow import sensors # noqa: E402
|
||||
from airflow import hooks
|
||||
from airflow import executors
|
||||
from airflow import macros
|
||||
|
||||
operators._integrate_plugins()
|
||||
sensors._integrate_plugins() # noqa: E402
|
||||
hooks._integrate_plugins()
|
||||
executors._integrate_plugins()
|
||||
macros._integrate_plugins()
|
||||
|
|
|
@ -36,6 +36,7 @@ class AirflowPluginException(Exception):
|
|||
class AirflowPlugin(object):
|
||||
name = None
|
||||
operators = []
|
||||
sensors = []
|
||||
hooks = []
|
||||
executors = []
|
||||
macros = []
|
||||
|
@ -115,9 +116,9 @@ menu_links = []
|
|||
|
||||
for p in plugins:
|
||||
operators_modules.append(
|
||||
make_module('airflow.operators.' + p.name, p.operators))
|
||||
make_module('airflow.operators.' + p.name, p.operators + p.sensors))
|
||||
sensors_modules.append(
|
||||
make_module('airflow.sensors.' + p.name, p.operators)
|
||||
make_module('airflow.sensors.' + p.name, p.sensors)
|
||||
)
|
||||
hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
|
||||
executors_modules.append(
|
||||
|
|
|
@ -48,13 +48,13 @@ def _integrate_plugins():
|
|||
|
||||
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
|
||||
from zope.deprecation import deprecated as _deprecated
|
||||
for _operator in sensors_module._objects:
|
||||
operator_name = _operator.__name__
|
||||
globals()[operator_name] = _operator
|
||||
for _sensor in sensors_module._objects:
|
||||
sensor_name = _sensor.__name__
|
||||
globals()[sensor_name] = _sensor
|
||||
_deprecated(
|
||||
operator_name,
|
||||
sensor_name,
|
||||
"Importing plugin operator '{i}' directly from "
|
||||
"'airflow.operators' has been deprecated. Please "
|
||||
"import from 'airflow.operators.[plugin_module]' "
|
||||
"instead. Support for direct imports will be dropped "
|
||||
"entirely in Airflow 2.0.".format(i=operator_name))
|
||||
"entirely in Airflow 2.0.".format(i=sensor_name))
|
||||
|
|
|
@ -6,7 +6,7 @@ features to its core by simply dropping files in your
|
|||
``$AIRFLOW_HOME/plugins`` folder.
|
||||
|
||||
The python modules in the ``plugins`` folder get imported,
|
||||
and **hooks**, **operators**, **macros**, **executors** and web **views**
|
||||
and **hooks**, **operators**, **sensors**, **macros**, **executors** and web **views**
|
||||
get integrated to Airflow's main collections and become available for use.
|
||||
|
||||
What for?
|
||||
|
@ -61,6 +61,8 @@ looks like:
|
|||
name = None
|
||||
# A list of class(es) derived from BaseOperator
|
||||
operators = []
|
||||
# A list of class(es) derived from BaseSensorOperator
|
||||
sensors = []
|
||||
# A list of class(es) derived from BaseHook
|
||||
hooks = []
|
||||
# A list of class(es) derived from BaseExecutor
|
||||
|
@ -94,6 +96,7 @@ definitions in Airflow.
|
|||
# Importing base classes that we need to derive
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
||||
from airflow.executors.base_executor import BaseExecutor
|
||||
|
||||
# Will show up under airflow.hooks.test_plugin.PluginHook
|
||||
|
@ -104,6 +107,10 @@ definitions in Airflow.
|
|||
class PluginOperator(BaseOperator):
|
||||
pass
|
||||
|
||||
# Will show up under airflow.sensors.test_plugin.PluginSensorOperator
|
||||
class PluginSensorOperator(BaseSensorOperator):
|
||||
pass
|
||||
|
||||
# Will show up under airflow.executors.test_plugin.PluginExecutor
|
||||
class PluginExecutor(BaseExecutor):
|
||||
pass
|
||||
|
@ -136,6 +143,7 @@ definitions in Airflow.
|
|||
class AirflowTestPlugin(AirflowPlugin):
|
||||
name = "test_plugin"
|
||||
operators = [PluginOperator]
|
||||
sensors = [PluginSensorOperator]
|
||||
hooks = [PluginHook]
|
||||
executors = [PluginExecutor]
|
||||
macros = [plugin_macro]
|
||||
|
|
|
@ -22,6 +22,7 @@ from flask_admin.base import MenuLink
|
|||
# Importing base classes that we need to derive
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
||||
from airflow.executors.base_executor import BaseExecutor
|
||||
|
||||
# Will show up under airflow.hooks.test_plugin.PluginHook
|
||||
|
@ -32,6 +33,12 @@ class PluginHook(BaseHook):
|
|||
class PluginOperator(BaseOperator):
|
||||
pass
|
||||
|
||||
|
||||
# Will show up under airflow.sensors.test_plugin.PluginSensorOperator
|
||||
class PluginSensorOperator(BaseSensorOperator):
|
||||
pass
|
||||
|
||||
|
||||
# Will show up under airflow.executors.test_plugin.PluginExecutor
|
||||
class PluginExecutor(BaseExecutor):
|
||||
pass
|
||||
|
@ -65,6 +72,7 @@ ml = MenuLink(
|
|||
class AirflowTestPlugin(AirflowPlugin):
|
||||
name = "test_plugin"
|
||||
operators = [PluginOperator]
|
||||
sensors = [PluginSensorOperator]
|
||||
hooks = [PluginHook]
|
||||
executors = [PluginExecutor]
|
||||
macros = [plugin_macro]
|
||||
|
|
|
@ -27,6 +27,7 @@ from flask_admin.menu import MenuLink, MenuView
|
|||
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
||||
from airflow.executors.base_executor import BaseExecutor
|
||||
from airflow.www.app import cached_app
|
||||
|
||||
|
@ -37,6 +38,10 @@ class PluginsTest(unittest.TestCase):
|
|||
from airflow.operators.test_plugin import PluginOperator
|
||||
self.assertTrue(issubclass(PluginOperator, BaseOperator))
|
||||
|
||||
def test_sensors(self):
|
||||
from airflow.sensors.test_plugin import PluginSensorOperator
|
||||
self.assertTrue(issubclass(PluginSensorOperator, BaseSensorOperator))
|
||||
|
||||
def test_hooks(self):
|
||||
from airflow.hooks.test_plugin import PluginHook
|
||||
self.assertTrue(issubclass(PluginHook, BaseHook))
|
||||
|
|
Загрузка…
Ссылка в новой задаче