A simple plugin system for Airflow

This commit is contained in:
Maxime Beauchemin 2015-06-10 23:09:22 -07:00 коммит произвёл Maxime
Родитель b0d35a0fb7
Коммит 22ac771af1
12 изменённых файлов: 244 добавлений и 4 удалений

Просмотреть файл

@ -1,14 +1,16 @@
__version__ = "1.0.1"
"""
Authentication is implemented using flask_login and different environments can
implement their own login mechanisms by providing an `airflow_login` module
in their PYTHONPATH. airflow_login should be based off the
`airflow.www.login`
"""
__version__ = "1.0.1"
import logging
from airflow.configuration import conf
from airflow.models import DAG
from flask.ext.admin import BaseView
from airflow import default_login as login
if conf.getboolean('webserver', 'AUTHENTICATE'):
@ -20,3 +22,19 @@ if conf.getboolean('webserver', 'AUTHENTICATE'):
"authenticate is set to True in airflow.cfg, "
"but airflow_login failed to import")
class AirflowViewPlugin(BaseView):
pass
class AirflowMacroPlugin(object):
def __init__(self, namespace):
self.namespace = namespace
from airflow import operators
from airflow import hooks
from airflow import executors
from airflow import macros
operators.integrate_plugins()
hooks.integrate_plugins()
macros.integrate_plugins()

Просмотреть файл

@ -11,6 +11,7 @@ defaults = {
'unit_test_mode': False,
'parallelism': 32,
'load_examples': True,
'plugins_folder': None,
},
'webserver': {
'base_url': 'http://localhost:8080',
@ -64,6 +65,9 @@ parallelism = 32
# environment
load_examples = True
# Where your Airflow plugins are stored
{AIRFLOW_HOME}/plugins
[webserver]
# The base url of your website as airflow cannot guess what domain or

Просмотреть файл

@ -1,6 +1,7 @@
import logging
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.sequential_executor import SequentialExecutor
@ -15,6 +16,14 @@ elif _EXECUTOR == 'CeleryExecutor':
elif _EXECUTOR == 'SequentialExecutor':
DEFAULT_EXECUTOR = SequentialExecutor()
else:
raise AirflowException("Executor {0} not supported.".format(_EXECUTOR))
# Loading plugins
from airflow.plugins_manager import get_plugins
executor_plugins = {}
for _plugin in get_plugins(BaseExecutor):
globals()[_plugin.__name__] = _plugin
if _EXECUTOR in globals():
DEFAULT_EXECUTOR = globals()[_EXECUTOR]
else:
raise AirflowException("Executor {0} not supported.".format(_EXECUTOR))
logging.info("Using executor " + _EXECUTOR)

Просмотреть файл

@ -3,6 +3,7 @@ Imports the hooks dynamically while keeping the package API clean,
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook as _BaseHook
_hooks = {
'hive_hooks': [
@ -20,3 +21,10 @@ _hooks = {
}
_import_module_attrs(globals(), _hooks)
def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
for _plugin in get_plugins(_BaseHook):
globals()[_plugin.__name__] = _plugin

Просмотреть файл

@ -97,6 +97,7 @@ class SqliteHook(BaseHook):
def get_pandas_df(self, sql):
"""
Executes the sql and returns a pandas dataframe
>>> h = SqliteHook()
>>> sql = "SELECT * FROM test_table WHERE i=1 LIMIT 1;"
>>> h.get_pandas_df(sql)

Просмотреть файл

@ -44,3 +44,11 @@ def ds_format(ds, input_format, output_format):
'2015-01-05'
"""
return datetime.strptime(ds, input_format).strftime(output_format)
def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
from airflow import AirflowMacroPlugin
for _plugin in get_plugins(AirflowMacroPlugin, expect_class=False):
globals()[_plugin.namespace] = _plugin

Просмотреть файл

@ -3,6 +3,7 @@ Imports operators dynamically while keeping the package API clean,
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.models import BaseOperator as _BaseOperator
_operators = {
'bash_operator': ['BashOperator'],
@ -37,3 +38,9 @@ _operators = {
}
_import_module_attrs(globals(), _operators)
def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins
for _plugin in get_plugins(_BaseOperator):
globals()[_plugin.__name__] = _plugin

Просмотреть файл

@ -0,0 +1,58 @@
import imp
import inspect
import logging
import os
from airflow.configuration import conf
plugins_folder = conf.get('core', 'plugins_folder')
if not plugins_folder:
plugins_folder = conf.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)
plugin_modules = []
# Crawl through the plugins folder to find pluggable_classes
templates_dirs = []
for root, dirs, files in os.walk(plugins_folder):
if os.path.basename(root) == 'templates':
templates_dirs.append(root)
for f in files:
try:
filepath = os.path.join(root, f)
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
m = imp.load_source(mod_name, filepath)
plugin_modules.append(m)
except Exception() as e:
logging.exception(e)
logging.error('Failed to import plugin ' + filepath)
def register_templates_folders(app):
from jinja2 import ChoiceLoader, FileSystemLoader
new_loaders = [FileSystemLoader(s) for s in templates_dirs]
app.jinja_env.loader = ChoiceLoader([app.jinja_env.loader] + new_loaders)
def get_plugins(baseclass, expect_class=True):
"""
Set expect_class=False if you want instances of baseclass
"""
# Late Imports to aoid circular imort
plugins = []
for m in plugin_modules:
for obj in m.__dict__.values():
if ((
expect_class and inspect.isclass(obj) and
issubclass(obj, baseclass) and
obj is not baseclass)
or
(not expect_class and isinstance(obj, baseclass))):
plugins.append(obj)
return plugins

Просмотреть файл

@ -45,6 +45,19 @@ current_user = login.current_user
logout_user = login.logout_user
from airflow import default_login as login
if conf.getboolean('webserver', 'AUTHENTICATE'):
try:
# Environment specific login
import airflow_login as login
except ImportError:
logging.error(
"authenticate is set to True in airflow.cfg, "
"but airflow_login failed to import")
login_required = login.login_required
current_user = login.current_user
logout_user = login.logout_user
AUTHENTICATE = conf.getboolean('webserver', 'AUTHENTICATE')
if AUTHENTICATE is False:
login_required = lambda x: x
@ -1762,3 +1775,14 @@ class PoolModelView(SuperUserMixin, ModelView):
named_filter_urls = True
mv = PoolModelView(models.Pool, Session, name="Pools", category="Admin")
admin.add_view(mv)
def integrate_plugins():
"""Integrate plugins to the context"""
from airflow.plugins_manager import get_plugins, register_templates_folders
from airflow import AirflowViewPlugin
for view in get_plugins(AirflowViewPlugin, expect_class=False):
admin.add_view(view)
register_templates_folders(app)
integrate_plugins()

Просмотреть файл

@ -129,9 +129,10 @@ accessible and modifiable through the UI.
.. code:: python
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("foo", deser_json=True)
bar = Variable.get("bar", deser_json=True)
The second call assumes ``json`` content and will be deserialized into
``bar``. Note that ``Variable`` is a sqlalchemy model and can be used

Просмотреть файл

@ -35,4 +35,5 @@ Content
profiling
cli
scheduler
plugins
code

101
docs/plugins.rst Normal file
Просмотреть файл

@ -0,0 +1,101 @@
Plugins
=======
Airflow has a simple plugin manager built-in that can integrate external
features at its core by simply dropping files in your
``$AIRFLOW_HOME/plugins`` folder.
The python modules in the ``plugins`` folder get imported,
and **hooks**, **operators**, **macros**, **executors** and web **views**
get integrated to Airflow's main collections and become available for use.
Objects
-------
* Classes derived from ``BaseOperator`` land in ``airflow.operators``
* Classes derived from ``BaseHook`` land in ``airflow.hooks``
* Classes derived from ``BaseExecutor`` land ``airflow.executors``
* object created from a class derived from ``airflow.PluginView`` get integrated in the Flask app
* object created from ``AirflowMacroPlugin(namespace="foo")`` land in ``airflow.macros.foo``
* Files located in subfolders named ``templates`` folders become available when rendering pages
* (upcoming) CLI subcommands
What for?
---------
Airflow offers a generic toolbox for working with data. Different
organizations have different stacks and different needs. Using Airflow
plugins can be a way for companies to customize their Airflow installation
to reflect their ecosystem.
Plugins can be used as an easy way to write, share and activate new sets of
features.
There's also a need for a set of more complex application to interact with
different flavors of data and metadata.
Examples:
* A set of tools to parse Hive logs and expose Hive metadata (CPU /IO / phases/ skew /...)
* An anomaly detection framework, allowing people to collect metrics, set thresholds and alerts
* An auditing tool, helping understand who accesses what
* A config-driven SLA monitoring tool, allowing you to set monitored tables and at what time
they should land, alert people and exposes visualization of outages
* ...
Why build on top Airflow?
-------------------------
Airflow has many components that can be reused when building an application:
* A web server you can use to render your views
* A metadata database to store your models
* Access to your database, and knowledge of how to connect to them
* An array of workers that can allow your application to push workload to
* Airflow is deployed, you can just piggy back on it's deployment logistics
* Basic charting capabilities, underlying libraries and abstractions
Example
-------
The code bellow defines a plugin that injects a set of dummy object
definitions in Airflow.
.. code:: python
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.executors.base_executor import BaseExecutor
from airflow import AirflowViewPlugin, AirflowMacroPlugin
from flask_admin import expose
# Will show up under airflow.hooks.PluginHook
class PluginHook(BaseHook):
pass
# Will show up under airflow.operators.PluginOperator
class PluginOperator(BaseOperator):
pass
# Will show up under airflow.executors.PluginExecutor
class PluginExecutor(BaseExecutor):
pass
# Shows up in the UI in menu -> Plugins -> Test
class TestView(AirflowViewPlugin):
@expose('/')
def query(self):
return self.render("test.html", content="Hello galaxy!")
v = TestView(category="Plugins", name="Test")
# Available as other macros in templates {{ macros.foo_plugin.success() }}
def success():
return "Success!"
obj = AirflowMacroPlugin(namespace="foo_plugin")
obj.success = success