[AIRFLOW-4573] Import airflow_local_settings after prepare_classpath (#5330)
Moves the airflow_local_settings import code into a dedicated function in settings.py and adds a call to it in initialize after prepare_syspath
This commit is contained in:
Родитель
89bc657553
Коммит
d1626d80b5
|
@ -251,16 +251,27 @@ def prepare_syspath():
|
|||
sys.path.append(PLUGINS_FOLDER)
|
||||
|
||||
|
||||
try:
|
||||
from airflow_local_settings import * # noqa F403 F401
|
||||
log.info("Loaded airflow_local_settings.")
|
||||
except Exception:
|
||||
pass
|
||||
def import_local_settings():
|
||||
try:
|
||||
import airflow_local_settings
|
||||
|
||||
if hasattr(airflow_local_settings, "__all__"):
|
||||
for i in airflow_local_settings.__all__:
|
||||
globals()[i] = getattr(airflow_local_settings, i)
|
||||
else:
|
||||
for k, v in airflow_local_settings.__dict__.items():
|
||||
if not k.startswith("__"):
|
||||
globals()[k] = v
|
||||
|
||||
log.info("Loaded airflow_local_settings from " + airflow_local_settings.__file__ + ".")
|
||||
except ImportError:
|
||||
log.debug("Failed to import airflow_local_settings.", exc_info=True)
|
||||
|
||||
|
||||
def initialize():
|
||||
configure_vars()
|
||||
prepare_syspath()
|
||||
import_local_settings()
|
||||
global LOGGING_CLASS_PATH
|
||||
LOGGING_CLASS_PATH = configure_logging()
|
||||
configure_adapters()
|
||||
|
|
|
@ -930,6 +930,7 @@
|
|||
./tests/test_configuration.py
|
||||
./tests/test_impersonation.py
|
||||
./tests/test_logging_config.py
|
||||
./tests/test_local_settings.py
|
||||
./tests/test_stats.py
|
||||
./tests/test_utils/__init__.py
|
||||
./tests/test_utils/db.py
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, call
|
||||
|
||||
|
||||
SETTINGS_FILE_POLICY = """
|
||||
def policy(task_instance):
|
||||
task_instance.run_as_user = "myself"
|
||||
"""
|
||||
|
||||
SETTINGS_FILE_POLICY_WITH_DUNDER_ALL = """
|
||||
__all__ = ["policy"]
|
||||
|
||||
def policy(task_instance):
|
||||
task_instance.run_as_user = "myself"
|
||||
|
||||
def not_policy():
|
||||
print("This shouldn't be imported")
|
||||
"""
|
||||
|
||||
|
||||
class SettingsContext:
|
||||
def __init__(self, content: str, module_name: str):
|
||||
self.content = content
|
||||
self.settings_root = tempfile.mkdtemp()
|
||||
filename = "{}.py".format(module_name)
|
||||
self.settings_file = os.path.join(self.settings_root, filename)
|
||||
|
||||
def __enter__(self):
|
||||
with open(self.settings_file, 'w') as handle:
|
||||
handle.writelines(self.content)
|
||||
sys.path.append(self.settings_root)
|
||||
return self.settings_file
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
sys.path.remove(self.settings_root)
|
||||
|
||||
|
||||
class LocalSettingsTest(unittest.TestCase):
|
||||
# Make sure that the configure_logging is not cached
|
||||
def setUp(self):
|
||||
self.old_modules = dict(sys.modules)
|
||||
|
||||
def tearDown(self):
|
||||
# Remove any new modules imported during the test run. This lets us
|
||||
# import the same source files for more than one test.
|
||||
for mod in [m for m in sys.modules if m not in self.old_modules]:
|
||||
del sys.modules[mod]
|
||||
|
||||
@unittest.mock.patch("airflow.settings.import_local_settings")
|
||||
@unittest.mock.patch("airflow.settings.prepare_syspath")
|
||||
def test_initialize_order(self, prepare_syspath, import_local_settings):
|
||||
"""
|
||||
Tests that import_local_settings is called after prepare_classpath
|
||||
"""
|
||||
mock = unittest.mock.Mock()
|
||||
mock.attach_mock(prepare_syspath, "prepare_syspath")
|
||||
mock.attach_mock(import_local_settings, "import_local_settings")
|
||||
|
||||
import airflow.settings
|
||||
airflow.settings.initialize()
|
||||
|
||||
mock.assert_has_calls([call.prepare_syspath(), call.import_local_settings()])
|
||||
|
||||
def test_import_with_dunder_all_not_specified(self):
|
||||
"""
|
||||
Tests that if __all__ is specified in airflow_local_settings,
|
||||
only module attributes specified within are imported.
|
||||
"""
|
||||
with SettingsContext(SETTINGS_FILE_POLICY_WITH_DUNDER_ALL, "airflow_local_settings"):
|
||||
from airflow import settings
|
||||
settings.import_local_settings() # pylint: ignore
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
settings.not_policy()
|
||||
|
||||
def test_import_with_dunder_all(self):
|
||||
"""
|
||||
Tests that if __all__ is specified in airflow_local_settings,
|
||||
only module attributes specified within are imported.
|
||||
"""
|
||||
with SettingsContext(SETTINGS_FILE_POLICY_WITH_DUNDER_ALL, "airflow_local_settings"):
|
||||
from airflow import settings
|
||||
settings.import_local_settings() # pylint: ignore
|
||||
|
||||
task_instance = MagicMock()
|
||||
settings.policy(task_instance)
|
||||
|
||||
assert task_instance.run_as_user == "myself"
|
||||
|
||||
@unittest.mock.patch("airflow.settings.log.debug")
|
||||
def test_import_local_settings_without_syspath(self, log_mock):
|
||||
"""
|
||||
Tests that an ImportError is raised in import_local_settings
|
||||
if there is no airflow_local_settings module on the syspath.
|
||||
"""
|
||||
from airflow import settings
|
||||
settings.import_local_settings()
|
||||
log_mock.assert_called_with("Failed to import airflow_local_settings.", exc_info=True)
|
||||
|
||||
def test_policy_function(self):
|
||||
"""
|
||||
Tests that task instances are mutated by the policy
|
||||
function in airflow_local_settings.
|
||||
"""
|
||||
with SettingsContext(SETTINGS_FILE_POLICY, "airflow_local_settings"):
|
||||
from airflow import settings
|
||||
settings.import_local_settings() # pylint: ignore
|
||||
|
||||
task_instance = MagicMock()
|
||||
settings.policy(task_instance)
|
||||
|
||||
assert task_instance.run_as_user == "myself"
|
|
@ -217,6 +217,16 @@ class TestDagFileProcessorManager(unittest.TestCase):
|
|||
|
||||
|
||||
class TestDagFileProcessorAgent(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Make sure that the configure_logging is not cached
|
||||
self.old_modules = dict(sys.modules)
|
||||
|
||||
def tearDown(self):
|
||||
# Remove any new modules imported during the test run. This lets us
|
||||
# import the same source files for more than one test.
|
||||
for m in [m for m in sys.modules if m not in self.old_modules]:
|
||||
del sys.modules[m]
|
||||
|
||||
def test_reload_module(self):
|
||||
"""
|
||||
Configure the context to have core.logging_config_class set to a fake logging
|
||||
|
|
|
@ -647,6 +647,9 @@ class TestLogView(TestBase):
|
|||
execution_date=DEFAULT_DATE)
|
||||
|
||||
def setUp(self):
|
||||
# Make sure that the configure_logging is not cached
|
||||
self.old_modules = dict(sys.modules)
|
||||
|
||||
conf.load_test_config()
|
||||
|
||||
# Create a custom logging configuration
|
||||
|
@ -688,6 +691,12 @@ class TestLogView(TestBase):
|
|||
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
|
||||
self.clear_table(TaskInstance)
|
||||
|
||||
# Remove any new modules imported during the test run. This lets us
|
||||
# import the same source files for more than one test.
|
||||
for m in [m for m in sys.modules if m not in self.old_modules]:
|
||||
del sys.modules[m]
|
||||
|
||||
sys.path.remove(self.settings_folder)
|
||||
shutil.rmtree(self.settings_folder)
|
||||
conf.set('core', 'logging_config_class', '')
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче