[AIRFLOW-2548] Output plugin import errors to web UI (#3930)
This commit is contained in:
Родитель
aa2dc603ab
Коммит
9266c0fb60
|
@ -34,6 +34,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
|
|||
|
||||
log = LoggingMixin().log
|
||||
|
||||
import_errors = {}
|
||||
|
||||
|
||||
class AirflowPluginException(Exception):
|
||||
pass
|
||||
|
@ -99,6 +101,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
|
|||
except Exception as e:
|
||||
log.exception(e)
|
||||
log.error('Failed to import plugin %s', filepath)
|
||||
import_errors[filepath] = str(e)
|
||||
|
||||
|
||||
def make_module(name, objects):
|
||||
|
|
|
@ -2130,6 +2130,14 @@ class HomeView(AdminIndexView):
|
|||
"Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
|
||||
"error")
|
||||
|
||||
from airflow.plugins_manager import import_errors as plugin_import_errors
|
||||
for filename, stacktrace in plugin_import_errors.items():
|
||||
flash(
|
||||
"Broken plugin: [{filename}] {stacktrace}".format(
|
||||
stacktrace=stacktrace,
|
||||
filename=filename),
|
||||
"error")
|
||||
|
||||
# get a list of all non-subdag dags visible to everyone
|
||||
# optionally filter out "paused" dags
|
||||
if hide_paused:
|
||||
|
|
|
@ -185,15 +185,20 @@ class Airflow(AirflowBaseView):
|
|||
if hide_paused:
|
||||
sql_query = sql_query.filter(~DM.is_paused)
|
||||
|
||||
# Get all the dag id the user could access
|
||||
filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
|
||||
|
||||
import_errors = session.query(models.ImportError).all()
|
||||
for ie in import_errors:
|
||||
flash(
|
||||
"Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
|
||||
"error")
|
||||
|
||||
from airflow.plugins_manager import import_errors as plugin_import_errors
|
||||
for filename, stacktrace in plugin_import_errors.items():
|
||||
flash(
|
||||
"Broken plugin: [{filename}] {stacktrace}".format(
|
||||
stacktrace=stacktrace,
|
||||
filename=filename),
|
||||
"error")
|
||||
|
||||
# get a list of all non-subdag dags visible to everyone
|
||||
# optionally filter out "paused" dags
|
||||
if hide_paused:
|
||||
|
@ -204,6 +209,9 @@ class Airflow(AirflowBaseView):
|
|||
unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
|
||||
not dag.parent_dag]
|
||||
|
||||
# Get all the dag id the user could access
|
||||
filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
|
||||
|
||||
if 'all_dags' in filter_dag_ids:
|
||||
orm_dags = {dag.dag_id: dag for dag
|
||||
in sql_query
|
||||
|
|
|
@ -119,6 +119,8 @@ For example,
|
|||
* For ``Operator`` plugin, an ``execute`` method is compulsory.
|
||||
* For ``Sensor`` plugin, a ``poke`` method returning a Boolean value is compulsory.
|
||||
|
||||
Make sure you restart the webserver and scheduler after making changes to plugins so that they take effect.
|
||||
|
||||
|
||||
Example
|
||||
-------
|
||||
|
|
Загрузка…
Ссылка в новой задаче