From fedc5a092c4ceb74c7d02ff932ad7de796705d43 Mon Sep 17 00:00:00 2001 From: Edward Wells Date: Fri, 2 Mar 2018 09:29:06 +0100 Subject: [PATCH] [AIRFLOW-2147] Plugin manager: added 'sensors' attribute AirflowPlugin required both BaseOperator and BaseSensorOperator to be included in its `operators` attribute. Add a `sensors` attribute and updated import logic so that anything added to the new attribute can be imported from `airflow.sensors.{plugin_name}` The integration/`make_module` calls in `airflow.plugins_manager` for operators is also updated to maintain the ability to import sensors from `operators` to avoid breaking existing plugins - Update unit tests and documentation to reflect this - Added exclusion for flake8 module level import not at top of file Closes #3075 from arcward/AIRFLOW-2147 --- airflow/__init__.py | 2 ++ airflow/plugins_manager.py | 5 +++-- airflow/sensors/__init__.py | 10 +++++----- docs/plugins.rst | 12 ++++++++++-- tests/plugins/test_plugin.py | 10 +++++++++- tests/plugins_manager.py | 5 +++++ 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/airflow/__init__.py b/airflow/__init__.py index 3c5f24c218..4c4509e00e 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -76,11 +76,13 @@ class AirflowMacroPlugin(object): self.namespace = namespace from airflow import operators +from airflow import sensors # noqa: E402 from airflow import hooks from airflow import executors from airflow import macros operators._integrate_plugins() +sensors._integrate_plugins() # noqa: E402 hooks._integrate_plugins() executors._integrate_plugins() macros._integrate_plugins() diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 22a873ca45..aaae4230b7 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -36,6 +36,7 @@ class AirflowPluginException(Exception): class AirflowPlugin(object): name = None operators = [] + sensors = [] hooks = [] executors = [] macros = [] @@ -115,9 +116,9 @@ menu_links = [] for p in plugins: operators_modules.append( - make_module('airflow.operators.' + p.name, p.operators)) + make_module('airflow.operators.' + p.name, p.operators + p.sensors)) sensors_modules.append( - make_module('airflow.sensors.' + p.name, p.operators) + make_module('airflow.sensors.' + p.name, p.sensors) ) hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks)) executors_modules.append( diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py index 2239467051..9c936f75fb 100644 --- a/airflow/sensors/__init__.py +++ b/airflow/sensors/__init__.py @@ -48,13 +48,13 @@ def _integrate_plugins(): if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): from zope.deprecation import deprecated as _deprecated - for _operator in sensors_module._objects: - operator_name = _operator.__name__ - globals()[operator_name] = _operator + for _sensor in sensors_module._objects: + sensor_name = _sensor.__name__ + globals()[sensor_name] = _sensor _deprecated( - operator_name, + sensor_name, "Importing plugin operator '{i}' directly from " "'airflow.operators' has been deprecated. Please " "import from 'airflow.operators.[plugin_module]' " "instead. Support for direct imports will be dropped " - "entirely in Airflow 2.0.".format(i=operator_name)) + "entirely in Airflow 2.0.".format(i=sensor_name)) diff --git a/docs/plugins.rst b/docs/plugins.rst index feccb5becf..eaba1a1bb0 100644 --- a/docs/plugins.rst +++ b/docs/plugins.rst @@ -6,7 +6,7 @@ features to its core by simply dropping files in your ``$AIRFLOW_HOME/plugins`` folder. The python modules in the ``plugins`` folder get imported, -and **hooks**, **operators**, **macros**, **executors** and web **views** +and **hooks**, **operators**, **sensors**, **macros**, **executors** and web **views** get integrated to Airflow's main collections and become available for use. What for? @@ -61,6 +61,8 @@ looks like: name = None # A list of class(es) derived from BaseOperator operators = [] + # A list of class(es) derived from BaseSensorOperator + sensors = [] # A list of class(es) derived from BaseHook hooks = [] # A list of class(es) derived from BaseExecutor @@ -93,7 +95,8 @@ definitions in Airflow. # Importing base classes that we need to derive from airflow.hooks.base_hook import BaseHook - from airflow.models import BaseOperator + from airflow.models import BaseOperator + from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.executors.base_executor import BaseExecutor # Will show up under airflow.hooks.test_plugin.PluginHook @@ -104,6 +107,10 @@ definitions in Airflow. class PluginOperator(BaseOperator): pass + # Will show up under airflow.sensors.test_plugin.PluginSensorOperator + class PluginSensorOperator(BaseSensorOperator): + pass + # Will show up under airflow.executors.test_plugin.PluginExecutor class PluginExecutor(BaseExecutor): pass @@ -136,6 +143,7 @@ definitions in Airflow. class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" operators = [PluginOperator] + sensors = [PluginSensorOperator] hooks = [PluginHook] executors = [PluginExecutor] macros = [plugin_macro] diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py index e342fe6ce2..49325e68b7 100644 --- a/tests/plugins/test_plugin.py +++ b/tests/plugins/test_plugin.py @@ -21,7 +21,8 @@ from flask_admin.base import MenuLink # Importing base classes that we need to derive from airflow.hooks.base_hook import BaseHook -from airflow.models import BaseOperator +from airflow.models import BaseOperator +from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.executors.base_executor import BaseExecutor # Will show up under airflow.hooks.test_plugin.PluginHook @@ -32,6 +33,12 @@ class PluginHook(BaseHook): class PluginOperator(BaseOperator): pass + +# Will show up under airflow.sensors.test_plugin.PluginSensorOperator +class PluginSensorOperator(BaseSensorOperator): + pass + + # Will show up under airflow.executors.test_plugin.PluginExecutor class PluginExecutor(BaseExecutor): pass @@ -65,6 +72,7 @@ ml = MenuLink( class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" operators = [PluginOperator] + sensors = [PluginSensorOperator] hooks = [PluginHook] executors = [PluginExecutor] macros = [plugin_macro] diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py index 65eb12fcd0..a00d476f03 100644 --- a/tests/plugins_manager.py +++ b/tests/plugins_manager.py @@ -27,6 +27,7 @@ from flask_admin.menu import MenuLink, MenuView from airflow.hooks.base_hook import BaseHook from airflow.models import BaseOperator +from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.executors.base_executor import BaseExecutor from airflow.www.app import cached_app @@ -37,6 +38,10 @@ class PluginsTest(unittest.TestCase): from airflow.operators.test_plugin import PluginOperator self.assertTrue(issubclass(PluginOperator, BaseOperator)) + def test_sensors(self): + from airflow.sensors.test_plugin import PluginSensorOperator + self.assertTrue(issubclass(PluginSensorOperator, BaseSensorOperator)) + def test_hooks(self): from airflow.hooks.test_plugin import PluginHook self.assertTrue(issubclass(PluginHook, BaseHook))