Optional import error tracebacks in web ui (#10663)
This PR allows partial import error tracebacks to be exposed in the UI when enabled. The extra context is very helpful for users who do not have access to the parsing logs and need to determine why their DAGs are failing to import.
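The truncation relies on the standard library's negative `limit` argument to `traceback.format_exc`, which keeps only the innermost `abs(limit)` frames. A minimal sketch of that behavior (the `inner`/`outer` helpers are illustrative only, not part of this change):

```python
import traceback


def inner():
    return airflow_DAG  # NameError: name 'airflow_DAG' is not defined


def outer():
    inner()


try:
    outer()
except Exception:
    # A negative limit keeps only the last abs(limit) frames: here the two
    # innermost frames (outer -> inner), dropping the <module> frame, which is
    # what DagBag does with dagbag_import_error_traceback_depth = 2.
    print(traceback.format_exc(limit=-2))
```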
Parent: 3ca11eb9b0
Commit: c74b3ac79a
@@ -229,6 +229,21 @@
       type: string
       example: ~
       default: "30"
+    - name: dagbag_import_error_tracebacks
+      description: |
+        Should a traceback be shown in the UI for dagbag import errors,
+        instead of just the exception message
+      version_added: 2.0.0
+      type: boolean
+      example: ~
+      default: "True"
+    - name: dagbag_import_error_traceback_depth
+      description: |
+        If tracebacks are shown, how many entries from the traceback should be shown
+      version_added: 2.0.0
+      type: integer
+      example: ~
+      default: "2"
     - name: dag_file_processor_timeout
       description: |
         How long before timing out a DagFileProcessor, which processes a dag file
@@ -142,6 +142,13 @@ donot_pickle = True
 # How long before timing out a python file import
 dagbag_import_timeout = 30
 
+# Should a traceback be shown in the UI for dagbag import errors,
+# instead of just the exception message
+dagbag_import_error_tracebacks = True
+
+# If tracebacks are shown, how many entries from the traceback should be shown
+dagbag_import_error_traceback_depth = 2
+
 # How long before timing out a DagFileProcessor, which processes a dag file
 dag_file_processor_timeout = 50
 
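For reference, the same options can be set without editing `airflow.cfg` by using Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment-variable convention; a minimal sketch (the values shown are illustrative, not the defaults):

```python
import os

# Assumes Airflow's usual environment-variable override convention
# (AIRFLOW__{SECTION}__{KEY}); must be set before the scheduler/webserver starts.
os.environ["AIRFLOW__CORE__DAGBAG_IMPORT_ERROR_TRACEBACKS"] = "False"
os.environ["AIRFLOW__CORE__DAGBAG_IMPORT_ERROR_TRACEBACK_DEPTH"] = "5"
```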
@@ -23,6 +23,7 @@ import importlib.util
 import os
 import sys
 import textwrap
+import traceback
 import warnings
 import zipfile
 from datetime import datetime, timedelta
@@ -115,6 +116,9 @@ class DagBag(BaseDagBag, LoggingMixin):
         # Only used by read_dags_from_db=True
         self.dags_last_fetched: Dict[str, datetime] = {}
 
+        self.dagbag_import_error_tracebacks = conf.getboolean('core', 'dagbag_import_error_tracebacks')
+        self.dagbag_import_error_traceback_depth = conf.getint('core', 'dagbag_import_error_traceback_depth')
+
         self.collect_dags(
             dag_folder=dag_folder,
             include_examples=include_examples,
@@ -275,7 +279,12 @@
             return [new_module]
         except Exception as e:  # pylint: disable=broad-except
             self.log.exception("Failed to import: %s", filepath)
-            self.import_errors[filepath] = str(e)
+            if self.dagbag_import_error_tracebacks:
+                self.import_errors[filepath] = traceback.format_exc(
+                    limit=-self.dagbag_import_error_traceback_depth
+                )
+            else:
+                self.import_errors[filepath] = str(e)
             return []
 
     def _load_modules_from_zip(self, filepath, safe_mode):
@@ -314,7 +323,12 @@
                 mods.append(current_module)
             except Exception as e:  # pylint: disable=broad-except
                 self.log.exception("Failed to import: %s", filepath)
-                self.import_errors[filepath] = str(e)
+                if self.dagbag_import_error_tracebacks:
+                    self.import_errors[filepath] = traceback.format_exc(
+                        limit=-self.dagbag_import_error_traceback_depth
+                    )
+                else:
+                    self.import_errors[filepath] = str(e)
         return mods
 
     def _process_modules(self, filepath, mods, file_last_changed_on_disk):
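With these changes, `DagBag.import_errors` maps each failing file to a truncated traceback rather than a bare exception message when the option is enabled. A rough usage sketch (the dag folder path is illustrative):

```python
from airflow.models.dagbag import DagBag

# Illustrative path: any folder containing a DAG file that raises at import time.
bag = DagBag(dag_folder="/path/to/dags", include_examples=False)

for filepath, message in bag.import_errors.items():
    # With dagbag_import_error_tracebacks=True this is a multi-line traceback,
    # limited to the last dagbag_import_error_traceback_depth frames;
    # otherwise it is just str(exception).
    print(filepath)
    print(message)
```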
@@ -277,6 +277,10 @@ label[for="timezone-other"],
   margin-bottom: 15px;
 }
 
+.dag-import-error {
+  white-space: pre;
+}
+
 /* stylelint-disable declaration-block-single-line-max-declarations */
 .hll { background-color: #ffc; }
 .c { color: #408080; font-style: italic; } /* Comment */
@@ -65,9 +65,7 @@
       <div id="alerts" class="panel-collapse collapse in">
         <div class="panel-body" >
           {% for category, m in dag_import_errors %}
-            <div class="alert alert-error">
-              {{ m }}
-            </div>
+            <div class="alert alert-error dag-import-error">{{ m }}</div>
           {% endfor %}
         </div>
       </div>
@@ -1262,6 +1262,8 @@ tokopedia
 tolerations
 tooltip
 tooltips
+traceback
+tracebacks
 travis
 trojan
 tsv
@@ -23,6 +23,7 @@ import shutil
 import unittest
 from datetime import timedelta
 from tempfile import NamedTemporaryFile, mkdtemp
+from zipfile import ZipFile
 
 import mock
 import psutil
@@ -75,6 +76,11 @@ TRY_NUMBER = 1
 # files contain a DAG (otherwise Airflow will skip them)
 PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
 UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
+INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = (
+    "def something():\n"
+    "    return airflow_DAG\n"
+    "something()"
+)
 
 # Filename to be used for dags that are created in an ad-hoc manner and can be removed/
 # created at runtime
@@ -3133,6 +3139,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_before_sched_start_creates_import_error(self):
         dags_folder = mkdtemp()
         try:
@@ -3154,6 +3161,7 @@
         self.assertEqual(import_error.stacktrace,
                          "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME))
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_add_unparseable_file_after_sched_start_creates_import_error(self):
         dags_folder = mkdtemp()
         try:
@@ -3192,6 +3200,7 @@
 
         self.assertEqual(len(import_errors), 0)
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
     def test_new_import_error_replaces_old(self):
         try:
             dags_folder = mkdtemp()
@@ -3265,6 +3274,120 @@
 
         self.assertEqual(len(import_errors), 0)
 
+    def test_import_error_tracebacks(self):
+        dags_folder = mkdtemp()
+        try:
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            with open(unparseable_filename, "w") as unparseable_file:
+                unparseable_file.writelines(INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        with create_session() as session:
+            import_errors = session.query(errors.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename, unparseable_filename)
+        expected_stacktrace = (
+            "Traceback (most recent call last):\n"
+            '  File "{}", line 3, in <module>\n'
+            "    something()\n"
+            '  File "{}", line 2, in something\n'
+            "    return airflow_DAG\n"
+            "NameError: name 'airflow_DAG' is not defined\n"
+        )
+        self.assertEqual(
+            import_error.stacktrace,
+            expected_stacktrace.format(unparseable_filename, unparseable_filename)
+        )
+
+    @conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
+    def test_import_error_traceback_depth(self):
+        dags_folder = mkdtemp()
+        try:
+            unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME)
+            with open(unparseable_filename, "w") as unparseable_file:
+                unparseable_file.writelines(INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        with create_session() as session:
+            import_errors = session.query(errors.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename, unparseable_filename)
+        expected_stacktrace = (
+            "Traceback (most recent call last):\n"
+            '  File "{}", line 2, in something\n'
+            "    return airflow_DAG\n"
+            "NameError: name 'airflow_DAG' is not defined\n"
+        )
+        self.assertEqual(
+            import_error.stacktrace, expected_stacktrace.format(unparseable_filename)
+        )
+
+    def test_import_error_tracebacks_zip(self):
+        dags_folder = mkdtemp()
+        try:
+            invalid_zip_filename = os.path.join(dags_folder, "test_zip_invalid.zip")
+            invalid_dag_filename = os.path.join(dags_folder, "test_zip_invalid.zip", TEMP_DAG_FILENAME)
+            with ZipFile(invalid_zip_filename, "w") as invalid_zip_file:
+                invalid_zip_file.writestr(TEMP_DAG_FILENAME, INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        with create_session() as session:
+            import_errors = session.query(errors.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename, invalid_zip_filename)
+        expected_stacktrace = (
+            "Traceback (most recent call last):\n"
+            '  File "{}", line 3, in <module>\n'
+            "    something()\n"
+            '  File "{}", line 2, in something\n'
+            "    return airflow_DAG\n"
+            "NameError: name 'airflow_DAG' is not defined\n"
+        )
+        self.assertEqual(
+            import_error.stacktrace,
+            expected_stacktrace.format(invalid_dag_filename, invalid_dag_filename)
+        )
+
+    @conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
+    def test_import_error_tracebacks_zip_depth(self):
+        dags_folder = mkdtemp()
+        try:
+            invalid_zip_filename = os.path.join(dags_folder, "test_zip_invalid.zip")
+            invalid_dag_filename = os.path.join(dags_folder, "test_zip_invalid.zip", TEMP_DAG_FILENAME)
+            with ZipFile(invalid_zip_filename, "w") as invalid_zip_file:
+                invalid_zip_file.writestr(TEMP_DAG_FILENAME, INVALID_DAG_WITH_DEPTH_FILE_CONTENTS)
+            self.run_single_scheduler_loop_with_no_dags(dags_folder)
+        finally:
+            shutil.rmtree(dags_folder)
+
+        with create_session() as session:
+            import_errors = session.query(errors.ImportError).all()
+
+        self.assertEqual(len(import_errors), 1)
+        import_error = import_errors[0]
+        self.assertEqual(import_error.filename, invalid_zip_filename)
+        expected_stacktrace = (
+            "Traceback (most recent call last):\n"
+            '  File "{}", line 2, in something\n'
+            "    return airflow_DAG\n"
+            "NameError: name 'airflow_DAG' is not defined\n"
+        )
+        self.assertEqual(
+            import_error.stacktrace, expected_stacktrace.format(invalid_dag_filename)
+        )
+
     def test_list_py_file_paths(self):
         """
         [JIRA-1357] Test the 'list_py_file_paths' function used by the