User-friendly error messages when the configuration is incorrect (#8463)

* Clearer error messages when the configuration is incorrect
This commit is contained in:
Kamil Breguła 2020-04-27 09:37:07 +02:00 коммит произвёл GitHub
Родитель 74bc316c56
Коммит 5a864f0e45
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 114 добавлений и 106 удалений

Просмотреть файл

@ -62,6 +62,12 @@ https://developers.google.com/style/inclusive-documentation
-->
### Unify `hostname_callable` option in `core` section
The previous option used a colon (`:`) to split the module from the function. Now the dot (`.`) is used.
The change aims to unify the format of all options that refer to objects in the `airflow.cfg` file.
### Changes in BigQueryHook
- `create_empty_table` method now accepts a `table_resource` parameter. If provided, all
other parameters are ignored.

Просмотреть файл

@ -31,17 +31,17 @@
- name: hostname_callable
description: |
Hostname by providing a path to a callable, which will resolve the hostname.
The format is "package:function".
The format is "package.function".
For example, default value "socket:getfqdn" means that result from getfqdn() of "socket"
For example, default value "socket.getfqdn" means that result from getfqdn() of "socket"
package will be used as hostname.
No argument should be required in the function specified.
If using IP address as hostname is preferred, use value ``airflow.utils.net:get_host_ip_address``
If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address``
version_added: ~
type: string
example: ~
default: "socket:getfqdn"
default: "socket.getfqdn"
- name: default_timezone
description: |
Default timezone in case supplied date times are naive

Просмотреть файл

@ -34,14 +34,14 @@
dags_folder = {AIRFLOW_HOME}/dags
# Hostname by providing a path to a callable, which will resolve the hostname.
# The format is "package:function".
# The format is "package.function".
#
# For example, default value "socket:getfqdn" means that result from getfqdn() of "socket"
# For example, default value "socket.getfqdn" means that result from getfqdn() of "socket"
# package will be used as hostname.
#
# No argument should be required in the function specified.
# If using IP address as hostname is preferred, use value ``airflow.utils.net:get_host_ip_address``
hostname_callable = socket:getfqdn
# If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address``
hostname_callable = socket.getfqdn
# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)

Просмотреть файл

@ -42,7 +42,7 @@ dags_are_paused_at_creation = False
fernet_key = {FERNET_KEY}
enable_xcom_pickling = False
killed_task_cleanup_time = 5
hostname_callable = socket:getfqdn
hostname_callable = socket.getfqdn
worker_precheck = False
default_task_retries = 0

Просмотреть файл

@ -34,6 +34,7 @@ import yaml
from cryptography.fernet import Fernet
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string
log = logging.getLogger(__name__)
@ -304,6 +305,27 @@ class AirflowConfigParser(ConfigParser):
def getfloat(self, section, key, **kwargs):
    """Return the configuration option converted to ``float``."""
    raw_value = self.get(section, key, **kwargs)
    return float(raw_value)
def getimport(self, section, key, **kwargs):
    """
    Reads options, imports the fully qualified name, and returns the object.

    In case of failure, it throws an exception with a clear message
    containing the key and the section names.

    :return: The object, or None if the option is empty
    :raises AirflowConfigException: if the configured path cannot be imported
    """
    # Use self.get (not the module-level conf) so the method works on any
    # parser instance, not only the global configuration object.
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.error(e)
        # Chain the original ImportError so the root cause stays in the traceback.
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        ) from e
def read(self, filenames, **kwargs):
    """Read the given config file(s) and validate the resulting configuration."""
    super().read(filenames, **kwargs)
    # Re-validate after every read so deprecated/inconsistent options are caught early.
    self._validate()

Просмотреть файл

@ -38,7 +38,6 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
from airflow.utils.module_loading import import_string
from airflow.utils.timeout import timeout
log = logging.getLogger(__name__)
@ -56,9 +55,7 @@ airflow celery worker
'''
if conf.has_option('celery', 'celery_config_options'):
celery_configuration = import_string(
conf.get('celery', 'celery_config_options')
)
celery_configuration = conf.getimport('celery', 'celery_config_options')
else:
celery_configuration = DEFAULT_CELERY_CONFIG

Просмотреть файл

@ -19,6 +19,7 @@ import logging
from contextlib import suppress
from typing import Optional
from airflow.exceptions import AirflowConfigException
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.module_loading import import_string
@ -65,7 +66,7 @@ class ExecutorLoader:
"""
Loads the executor.
This supports the following following formats:
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path.
@ -87,8 +88,14 @@ class ExecutorLoader:
return import_string(f"airflow.executors.{executor_name}")()
log.debug("Loading executor from custom path: %s", executor_name)
executor = import_string(executor_name)()
try:
executor = import_string(executor_name)()
except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
f'Current value: "{executor_name}".'
)
log.info("Loaded executor: %s", executor_name)
return executor

Просмотреть файл

@ -76,18 +76,17 @@ def initialize_secrets_backends() -> List[BaseSecretsBackend]:
* import secrets backend classes
* instantiate them and return them in a list
"""
alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='')
try:
alternative_secrets_config_dict = json.loads(
conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
)
except JSONDecodeError:
alternative_secrets_config_dict = {}
secrets_backend_cls = conf.getimport(section=CONFIG_SECTION, key='backend')
backend_list = []
if alternative_secrets_backend:
secrets_backend_cls = import_string(alternative_secrets_backend)
if secrets_backend_cls:
try:
alternative_secrets_config_dict = json.loads(
conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
)
except JSONDecodeError:
alternative_secrets_config_dict = {}
backend_list.append(secrets_backend_cls(**alternative_secrets_config_dict))
for class_name in DEFAULT_SECRETS_SEARCH_PATH:

Просмотреть файл

@ -34,7 +34,6 @@ from sqlalchemy.pool import NullPool
from airflow import api
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # NOQA F401
from airflow.logging_config import configure_logging
from airflow.utils.module_loading import import_string
from airflow.utils.sqlalchemy import setup_event_handlers
log = logging.getLogger(__name__)
@ -186,9 +185,7 @@ def configure_orm(disable_connection_pool=False):
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
if conf.has_option('core', 'sql_alchemy_connect_args'):
connect_args = import_string(
conf.get('core', 'sql_alchemy_connect_args')
)
connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
else:
connect_args = {}

Просмотреть файл

@ -25,9 +25,8 @@ from functools import wraps
from typing import Callable, Optional
from airflow.configuration import conf
from airflow.exceptions import InvalidStatsNameException
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
from airflow.typing_compat import Protocol
from airflow.utils.module_loading import import_string
log = logging.getLogger(__name__)
@ -86,12 +85,7 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
def get_current_handle_stat_name_func() -> Callable[[str], str]:
stat_name_handler_name = conf.get('scheduler', 'stat_name_handler')
if stat_name_handler_name:
handle_stat_name_func = import_string(stat_name_handler_name)
else:
handle_stat_name_func = stat_name_default_handler
return handle_stat_name_func
return conf.getimport('scheduler', 'stat_name_handler') or stat_name_default_handler
def validate_stat(fn):
@ -207,21 +201,16 @@ class _Stats(type):
from statsd import StatsClient
if conf.has_option('scheduler', 'statsd_custom_client_path'):
custom_statsd_module_path = conf.get('scheduler', 'statsd_custom_client_path')
stats_class = conf.getimport('scheduler', 'statsd_custom_client_path')
try:
stats_class = import_string(custom_statsd_module_path)
if not issubclass(stats_class, StatsClient):
raise Exception(
"""Your custom Statsd client must extend the statsd.StatsClient in order to ensure backwards
compatibility.""")
else:
log.info("Successfully loaded custom Statsd client "
f"from {custom_statsd_module_path}")
if not issubclass(stats_class, StatsClient):
raise AirflowConfigException(
"Your custom Statsd client must extend the statsd.StatsClient in order to ensure "
"backwards compatibility."
)
else:
log.info("Successfully loaded custom Statsd client")
except Exception as err:
raise ImportError('Unable to load custom Statsd client from '
f'{custom_statsd_module_path} due to {err}')
else:
stats_class = StatsClient

Просмотреть файл

@ -20,6 +20,7 @@
import logging
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string
log = logging.getLogger(__name__)
@ -48,11 +49,16 @@ def get_task_runner(local_task_job):
"""
if _TASK_RUNNER_NAME in CORE_TASK_RUNNERS:
log.debug("Loading core task runner: %s", _TASK_RUNNER_NAME)
task_runner_class_name = CORE_TASK_RUNNERS[_TASK_RUNNER_NAME]
task_runner_class = import_string(CORE_TASK_RUNNERS[_TASK_RUNNER_NAME])
else:
log.debug("Loading task runner from custom path: %s", _TASK_RUNNER_NAME)
task_runner_class_name = _TASK_RUNNER_NAME
try:
task_runner_class = import_string(_TASK_RUNNER_NAME)
except ImportError:
raise AirflowConfigException(
f'The task runner could not be loaded. Please check "executor" key in "core" section. '
f'Current value: "{_TASK_RUNNER_NAME}".'
)
task_runner_class = import_string(task_runner_class_name)
task_runner = task_runner_class(local_task_job)
return task_runner

Просмотреть файл

@ -17,7 +17,6 @@
# under the License.
import collections
import importlib
import logging
import os
import smtplib
@ -39,9 +38,7 @@ def send_email(to, subject, html_content,
"""
Send email using backend specified in EMAIL_BACKEND.
"""
path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
module = importlib.import_module(path)
backend = getattr(module, attr)
backend = conf.getimport('email', 'EMAIL_BACKEND')
to = get_email_address_list(to)
to = ", ".join(to)

Просмотреть файл

@ -16,10 +16,9 @@
# specific language governing permissions and limitations
# under the License.
#
import importlib
import socket
from airflow.configuration import AirflowConfigException, conf
from airflow.configuration import conf
def get_host_ip_address():
@ -34,19 +33,4 @@ def get_hostname():
Fetch the hostname using the callable from the config or using
`socket.getfqdn` as a fallback.
"""
# First we attempt to fetch the callable path from the config.
try:
callable_path = conf.get('core', 'hostname_callable')
except AirflowConfigException:
callable_path = None
# Then we handle the case when the config is missing or empty. This is the
# default behavior.
if not callable_path:
return socket.getfqdn()
# Since we have a callable path, we try to import and run it next.
module_path, attr_name = callable_path.split(':')
module = importlib.import_module(module_path)
func = getattr(module, attr_name)
return func()
return conf.getimport('core', 'hostname_callable', fallback='socket.getfqdn')()

Просмотреть файл

@ -22,6 +22,7 @@ from mock import patch
from sqlalchemy.pool import NullPool
from airflow import settings
from airflow.exceptions import AirflowConfigException
from tests.test_utils.config import conf_vars
SQL_ALCHEMY_CONNECT_ARGS = {
@ -100,6 +101,6 @@ class TestSqlAlchemySettings(unittest.TestCase):
('core', 'sql_alchemy_connect_args'): 'does.not.exist',
('core', 'sql_alchemy_pool_enabled'): 'False'
}
with self.assertRaises(ImportError):
with self.assertRaises(AirflowConfigException):
with conf_vars(config):
settings.configure_orm()

Просмотреть файл

@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import importlib
import re
import unittest
from unittest import mock
from unittest.mock import Mock
@ -23,7 +24,7 @@ from unittest.mock import Mock
import statsd
import airflow
from airflow.exceptions import InvalidStatsNameException
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
from airflow.stats import AllowListValidator, SafeDogStatsdLogger, SafeStatsdLogger
from tests.test_utils.config import conf_vars
@ -124,9 +125,14 @@ class TestStats(unittest.TestCase):
("scheduler", "statsd_custom_client_path"): "tests.test_stats.InvalidCustomStatsd",
})
def test_load_invalid_custom_stats_client(self):
importlib.reload(airflow.stats)
airflow.stats.Stats.incr("dummy_key")
assert InvalidCustomStatsd.incr_calls == 0
with self.assertRaisesRegex(
AirflowConfigException,
re.escape(
'Your custom Statsd client must extend the statsd.'
'StatsClient in order to ensure backwards compatibility.'
)
):
importlib.reload(airflow.stats)
def tearDown(self) -> None:
# To avoid side-effect

Просмотреть файл

@ -15,11 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import unittest
from unittest import mock
from airflow.exceptions import AirflowConfigException
from airflow.utils import net
from tests.test_utils.config import conf_vars
def get_hostname():
@ -28,32 +30,27 @@ def get_hostname():
class TestGetHostname(unittest.TestCase):
@mock.patch('airflow.utils.net.socket')
@mock.patch('airflow.utils.net.conf')
def test_get_hostname_unset(self, patched_conf, patched_socket):
patched_conf.get = mock.Mock(return_value=None)
patched_socket.getfqdn = mock.Mock(return_value='first')
self.assertTrue(net.get_hostname() == 'first')
@mock.patch('socket.getfqdn', return_value='first')
@conf_vars({('core', 'hostname_callable'): None})
def test_get_hostname_unset(self, mock_getfqdn):
self.assertEqual('first', net.get_hostname())
@mock.patch('airflow.utils.net.conf')
def test_get_hostname_set(self, patched_conf):
patched_conf.get = mock.Mock(
return_value='tests.utils.test_net:get_hostname'
)
self.assertTrue(net.get_hostname() == 'awesomehostname')
@conf_vars({('core', 'hostname_callable'): 'tests.utils.test_net.get_hostname'})
def test_get_hostname_set(self):
self.assertEqual('awesomehostname', net.get_hostname())
@mock.patch('airflow.utils.net.conf')
def test_get_hostname_set_incorrect(self, patched_conf):
patched_conf.get = mock.Mock(
return_value='tests.utils.test_net'
)
with self.assertRaises(ValueError):
@conf_vars({('core', 'hostname_callable'): 'tests.utils.test_net'})
def test_get_hostname_set_incorrect(self):
with self.assertRaises(TypeError):
net.get_hostname()
@mock.patch('airflow.utils.net.conf')
def test_get_hostname_set_missing(self, patched_conf):
patched_conf.get = mock.Mock(
return_value='tests.utils.test_net:missing_func'
)
with self.assertRaises(AttributeError):
@conf_vars({('core', 'hostname_callable'): 'tests.utils.test_net.missing_func'})
def test_get_hostname_set_missing(self):
with self.assertRaisesRegex(
AirflowConfigException,
re.escape(
'The object could not be loaded. Please check "hostname_callable" key in "core" section. '
'Current value: "tests.utils.test_net.missing_func"'
)
):
net.get_hostname()