Pass SQLAlchemy engine options to FAB based UI (#11395)
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
This commit is contained in:
Родитель
0823d46a7f
Коммит
91484b938f
|
@ -158,12 +158,35 @@ def configure_orm(disable_connection_pool=False):
|
|||
log.debug("Setting up DB connection pool (PID %s)", os.getpid())
|
||||
global engine
|
||||
global Session
|
||||
engine_args = {}
|
||||
engine_args = prepare_engine_args(disable_connection_pool)
|
||||
|
||||
# Allow the user to specify an encoding for their DB otherwise default
|
||||
# to utf-8 so jobs & users with non-latin1 characters can still use us.
|
||||
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
|
||||
|
||||
if conf.has_option('core', 'sql_alchemy_connect_args'):
|
||||
connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
|
||||
else:
|
||||
connect_args = {}
|
||||
|
||||
engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
|
||||
setup_event_handlers(engine)
|
||||
|
||||
Session = scoped_session(sessionmaker(
|
||||
autocommit=False,
|
||||
autoflush=False,
|
||||
bind=engine,
|
||||
expire_on_commit=False,
|
||||
))
|
||||
|
||||
|
||||
def prepare_engine_args(disable_connection_pool=False):
|
||||
"""Prepare SQLAlchemy engine args"""
|
||||
engine_args = {}
|
||||
pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
|
||||
if disable_connection_pool or not pool_connections:
|
||||
engine_args['poolclass'] = NullPool
|
||||
log.debug("settings.configure_orm(): Using NullPool")
|
||||
log.debug("settings.prepare_engine_args(): Using NullPool")
|
||||
elif 'sqlite' not in SQL_ALCHEMY_CONN:
|
||||
# Pool size engine args not supported by sqlite.
|
||||
# If no config value is defined for the pool size, select a reasonable value.
|
||||
|
@ -195,30 +218,13 @@ def configure_orm(disable_connection_pool=False):
|
|||
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
|
||||
pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)
|
||||
|
||||
log.debug("settings.configure_orm(): Using pool settings. pool_size=%d, max_overflow=%d, "
|
||||
log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
|
||||
"pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid())
|
||||
engine_args['pool_size'] = pool_size
|
||||
engine_args['pool_recycle'] = pool_recycle
|
||||
engine_args['pool_pre_ping'] = pool_pre_ping
|
||||
engine_args['max_overflow'] = max_overflow
|
||||
|
||||
# Allow the user to specify an encoding for their DB otherwise default
|
||||
# to utf-8 so jobs & users with non-latin1 characters can still use us.
|
||||
engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
|
||||
|
||||
if conf.has_option('core', 'sql_alchemy_connect_args'):
|
||||
connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
|
||||
else:
|
||||
connect_args = {}
|
||||
|
||||
engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
|
||||
setup_event_handlers(engine)
|
||||
|
||||
Session = scoped_session(
|
||||
sessionmaker(autocommit=False,
|
||||
autoflush=False,
|
||||
bind=engine,
|
||||
expire_on_commit=False))
|
||||
return engine_args
|
||||
|
||||
|
||||
def dispose_orm():
|
||||
|
|
|
@ -82,6 +82,9 @@ def create_app(config=None, testing=False, app_name="Airflow"):
|
|||
if config:
|
||||
flask_app.config.from_mapping(config)
|
||||
|
||||
if 'SQLALCHEMY_ENGINE_OPTIONS' not in flask_app.config:
|
||||
flask_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args()
|
||||
|
||||
# Configure the JSON encoder used by `|tojson` filter from Flask
|
||||
flask_app.json_encoder = AirflowJsonEncoder
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from werkzeug.routing import Rule
|
||||
from werkzeug.test import create_environ
|
||||
from werkzeug.wrappers import Response
|
||||
|
@ -195,3 +196,22 @@ class TestApp(unittest.TestCase):
|
|||
|
||||
self.assertEqual(b"success", response.get_data())
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@conf_vars({
|
||||
('core', 'sql_alchemy_pool_enabled'): 'True',
|
||||
('core', 'sql_alchemy_pool_size'): '3',
|
||||
('core', 'sql_alchemy_max_overflow'): '5',
|
||||
('core', 'sql_alchemy_pool_recycle'): '120',
|
||||
('core', 'sql_alchemy_pool_pre_ping'): 'True',
|
||||
})
|
||||
@mock.patch("airflow.www.app.app", None)
|
||||
@pytest.mark.backend("mysql", "postgres")
|
||||
def test_should_set_sqlalchemy_engine_options(self):
|
||||
app = application.cached_app(testing=True)
|
||||
engine_params = {
|
||||
'pool_size': 3,
|
||||
'pool_recycle': 120,
|
||||
'pool_pre_ping': True,
|
||||
'max_overflow': 5
|
||||
}
|
||||
self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)
|
||||
|
|
Загрузка…
Ссылка в новой задаче