[AIRFLOW-3742] Respect the `fallback` arg in airflow.configuration.get (#4567)
This argument is part of the API from our parent class, but we didn't support it because of the various steps we perform in `get()` - this makes it behave more like the parent class, and can simplify a few instances in our code (I've only included one that I found here)
This commit is contained in:
Родитель
fc22c6efad
Коммит
0d64fd8aac
|
@ -36,7 +36,7 @@ import subprocess
|
||||||
import sys
|
import sys
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
from backports.configparser import ConfigParser
|
from backports.configparser import ConfigParser, _UNSET, NoOptionError
|
||||||
from zope.deprecation import deprecated
|
from zope.deprecation import deprecated
|
||||||
|
|
||||||
from airflow.exceptions import AirflowConfigException
|
from airflow.exceptions import AirflowConfigException
|
||||||
|
@ -247,7 +247,7 @@ class AirflowConfigParser(ConfigParser):
|
||||||
return option
|
return option
|
||||||
|
|
||||||
# ...then the default config
|
# ...then the default config
|
||||||
if self.airflow_defaults.has_option(section, key):
|
if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
|
||||||
return expand_env_var(
|
return expand_env_var(
|
||||||
self.airflow_defaults.get(section, key, **kwargs))
|
self.airflow_defaults.get(section, key, **kwargs))
|
||||||
|
|
||||||
|
@ -291,9 +291,10 @@ class AirflowConfigParser(ConfigParser):
|
||||||
try:
|
try:
|
||||||
# Using self.get() to avoid reimplementing the priority order
|
# Using self.get() to avoid reimplementing the priority order
|
||||||
# of config variables (env, config, cmd, defaults)
|
# of config variables (env, config, cmd, defaults)
|
||||||
self.get(section, option)
|
# UNSET to avoid logging a warning about missing values
|
||||||
|
self.get(section, option, fallback=_UNSET)
|
||||||
return True
|
return True
|
||||||
except AirflowConfigException:
|
except NoOptionError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def remove_option(self, section, option, remove_default=True):
|
def remove_option(self, section, option, remove_default=True):
|
||||||
|
|
|
@ -176,13 +176,10 @@ def configure_orm(disable_connection_pool=False):
|
||||||
engine_args['pool_size'] = pool_size
|
engine_args['pool_size'] = pool_size
|
||||||
engine_args['pool_recycle'] = pool_recycle
|
engine_args['pool_recycle'] = pool_recycle
|
||||||
|
|
||||||
try:
|
# Allow the user to specify an encoding for their DB otherwise default
|
||||||
# Allow the user to specify an encoding for their DB otherwise default
|
# to utf-8 so jobs & users with non-latin1 characters can still use
|
||||||
# to utf-8 so jobs & users with non-latin1 characters can still use
|
# us.
|
||||||
# us.
|
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
|
||||||
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING')
|
|
||||||
except conf.AirflowConfigException:
|
|
||||||
engine_args['encoding'] = 'utf-8'
|
|
||||||
# For Python2 we get back a newstr and need a str
|
# For Python2 we get back a newstr and need a str
|
||||||
engine_args['encoding'] = engine_args['encoding'].__str__()
|
engine_args['encoding'] = engine_args['encoding'].__str__()
|
||||||
|
|
||||||
|
@ -226,10 +223,7 @@ def configure_adapters():
|
||||||
|
|
||||||
|
|
||||||
def validate_session():
|
def validate_session():
|
||||||
try:
|
worker_precheck = conf.getboolean('core', 'worker_precheck', fallback=False)
|
||||||
worker_precheck = conf.getboolean('core', 'worker_precheck')
|
|
||||||
except conf.AirflowConfigException:
|
|
||||||
worker_precheck = False
|
|
||||||
if not worker_precheck:
|
if not worker_precheck:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -59,8 +59,10 @@ class TestWorkerPrecheck(unittest.TestCase):
|
||||||
Test to check the behaviour of validate_session method
|
Test to check the behaviour of validate_session method
|
||||||
when worker_precheck is absent in airflow configuration
|
when worker_precheck is absent in airflow configuration
|
||||||
"""
|
"""
|
||||||
mock_getboolean.side_effect = airflow.configuration.AirflowConfigException
|
mock_getboolean.return_value = False
|
||||||
self.assertEqual(airflow.settings.validate_session(), True)
|
|
||||||
|
self.assertTrue(airflow.settings.validate_session())
|
||||||
|
mock_getboolean.assert_called_once_with('core', 'worker_precheck', fallback=False)
|
||||||
|
|
||||||
@mock.patch('sqlalchemy.orm.session.Session.execute')
|
@mock.patch('sqlalchemy.orm.session.Session.execute')
|
||||||
@mock.patch('airflow.configuration.getboolean')
|
@mock.patch('airflow.configuration.getboolean')
|
||||||
|
|
|
@ -98,6 +98,8 @@ class ConfTest(unittest.TestCase):
|
||||||
opt = conf.get('testsection', 'testpercent')
|
opt = conf.get('testsection', 'testpercent')
|
||||||
self.assertEqual(opt, 'with%percent')
|
self.assertEqual(opt, 'with%percent')
|
||||||
|
|
||||||
|
self.assertTrue(conf.has_option('testsection', 'testkey'))
|
||||||
|
|
||||||
def test_conf_as_dict(self):
|
def test_conf_as_dict(self):
|
||||||
cfg_dict = conf.as_dict()
|
cfg_dict = conf.as_dict()
|
||||||
|
|
||||||
|
@ -165,6 +167,10 @@ key6 = value6
|
||||||
self.assertEqual('key4_result', test_conf.get('test', 'key4'))
|
self.assertEqual('key4_result', test_conf.get('test', 'key4'))
|
||||||
self.assertEqual('value6', test_conf.get('another', 'key6'))
|
self.assertEqual('value6', test_conf.get('another', 'key6'))
|
||||||
|
|
||||||
|
self.assertEqual('hello', test_conf.get('test', 'key1', fallback='fb'))
|
||||||
|
self.assertEqual('value6', test_conf.get('another', 'key6', fallback='fb'))
|
||||||
|
self.assertEqual('fb', test_conf.get('another', 'key7', fallback='fb'))
|
||||||
|
|
||||||
self.assertTrue(test_conf.has_option('test', 'key1'))
|
self.assertTrue(test_conf.has_option('test', 'key1'))
|
||||||
self.assertTrue(test_conf.has_option('test', 'key2'))
|
self.assertTrue(test_conf.has_option('test', 'key2'))
|
||||||
self.assertTrue(test_conf.has_option('test', 'key3'))
|
self.assertTrue(test_conf.has_option('test', 'key3'))
|
||||||
|
|
Загрузка…
Ссылка в новой задаче