[AIRFLOW-6054] Add a command that starts the database consoles (#6653)

This commit is contained in:
Kamil Breguła 2019-11-25 15:13:37 +01:00 коммит произвёл GitHub
Родитель db4be19b80
Коммит ae96f27fd8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 129 добавлений и 6 удалений

Просмотреть файл

@ -252,8 +252,18 @@ system tests. You can technically use local
virtualenv to run those tests, but that requires setting up a number of
external components (databases/queues/Kubernetes and the like). So, it is
much easier to use the `Breeze development environment <BREEZE.rst>`_
for integration and system tests.
Note: Soon we will separate the integration and system tests out
so that it is clear which tests are unit tests that can be run in
the local virtualenv and which should be run using Breeze.
Connecting to database
----------------------
When investigating a problem, it is helpful to be able to query the database directly. You can do so using
the built-in Airflow command:
.. code:: bash

    airflow db shell

Просмотреть файл

@ -834,6 +834,12 @@ class CLIFactory:
'help': "Upgrade the metadata database to latest version",
'args': tuple(),
},
{
'func': lazy_load_command('airflow.cli.commands.db_command.shell'),
'name': 'shell',
'help': "Runs a shell to access the database",
'args': tuple(),
},
),
}, {
'name': 'kerberos',

Просмотреть файл

@ -15,7 +15,12 @@
# specific language governing permissions and limitations
# under the License.
"""Database sub-commands"""
from airflow import settings
import os
import subprocess
import textwrap
from tempfile import NamedTemporaryFile
from airflow import AirflowException, settings
from airflow.utils import cli as cli_utils, db
@ -42,3 +47,37 @@ def upgradedb(args):
"""Upgrades the metadata database"""
print("DB: " + repr(settings.engine.url))
db.upgradedb()
@cli_utils.action_logging
def shell(args):
    """Run an interactive client shell connected to the configured database.

    The client program is chosen from the backend name of
    ``settings.engine.url``: ``mysql`` starts ``mysql``, ``sqlite`` starts
    ``sqlite3`` and ``postgresql`` starts ``psql``. The call blocks until the
    client process exits.

    :param args: parsed command-line arguments; not read by this function's body
    :raises AirflowException: if the backend is not one of the three above
    """
    url = settings.engine.url
    print("DB: " + repr(url))
    backend = url.get_backend_name()

    if backend == 'mysql':
        # The connection settings are handed to the client through a temporary
        # option file; the file only exists while the client is running.
        # Note: URL parts that are not set are written as the literal text "None".
        with NamedTemporaryFile(suffix="my.cnf") as f:
            content = textwrap.dedent(f"""
                [client]
                host     = {url.host}
                user     = {url.username}
                password = {url.password}
                port     = {url.port}
                database = {url.database}
                """).strip()
            f.write(content.encode())
            # Make sure the content is on disk before the client reads the file.
            f.flush()
            subprocess.Popen(["mysql", f"--defaults-extra-file={f.name}"]).wait()
    elif backend == 'sqlite':
        subprocess.Popen(["sqlite3", url.database]).wait()
    elif backend == 'postgresql':
        env = os.environ.copy()
        env['PGHOST'] = url.host or ""
        # Environment values must be strings, but the parsed port is an integer
        # when the URL specifies one, so it is converted explicitly.
        env['PGPORT'] = str(url.port or "")
        env['PGUSER'] = url.username or ""
        # PostgreSQL does not allow the use of PGPASSFILE if the current user is root.
        env["PGPASSWORD"] = url.password or ""
        # A URL without a database name yields None, which is not a valid
        # environment value; fall back to an empty string like the other parts.
        env['PGDATABASE'] = url.database or ""
        subprocess.Popen(["psql"], env=env).wait()
    else:
        raise AirflowException(f"Unknown driver: {url.drivername}")

Просмотреть файл

@ -18,6 +18,9 @@
import unittest
from unittest import mock
from sqlalchemy.engine.url import make_url
from airflow import AirflowException
from airflow.bin import cli
from airflow.cli.commands import db_command
@ -28,13 +31,78 @@ class TestCliDb(unittest.TestCase):
cls.parser = cli.CLIFactory.get_parser()
@mock.patch("airflow.cli.commands.db_command.db.initdb")
def test_cli_initdb(self, mock_initdb):
    """``db init`` should call ``db.initdb`` exactly once, without arguments."""
    db_command.initdb(self.parser.parse_args(['db', 'init']))

    mock_initdb.assert_called_once_with()
@mock.patch("airflow.cli.commands.db_command.db.resetdb")
def test_cli_resetdb(self, mock_resetdb):
    """``db reset --yes`` should call ``db.resetdb`` exactly once, without arguments."""
    db_command.resetdb(self.parser.parse_args(['db', 'reset', '--yes']))

    mock_resetdb.assert_called_once_with()
@mock.patch("airflow.cli.commands.db_command.db.upgradedb")
def test_cli_upgradedb(self, mock_upgradedb):
    """``db upgrade`` should call ``db.upgradedb`` exactly once, without arguments."""
    parsed_args = self.parser.parse_args(['db', 'upgrade'])

    db_command.upgradedb(parsed_args)

    mock_upgradedb.assert_called_once_with()
@mock.patch("airflow.cli.commands.db_command.subprocess")
@mock.patch(  # type: ignore
    "airflow.cli.commands.db_command.NamedTemporaryFile",
    **{'return_value.__enter__.return_value.name': "/tmp/name"}
)
@mock.patch(
    "airflow.cli.commands.db_command.settings.engine.url",
    make_url("mysql://root@mysql/airflow")
)
def test_cli_shell_mysql(self, mock_tmp_file, mock_subprocess):
    """``db shell`` on a MySQL URL starts ``mysql`` with a generated option file."""
    parsed_args = self.parser.parse_args(['db', 'shell'])

    db_command.shell(parsed_args)

    # The client must be pointed at the (mocked) temporary option file.
    mock_subprocess.Popen.assert_called_once_with(
        ['mysql', '--defaults-extra-file=/tmp/name']
    )
    # The URL has no password and no port, so both are written as "None".
    option_file = mock_tmp_file.return_value.__enter__.return_value
    expected_content = (
        b'[client]\nhost = mysql\nuser = root\npassword = None\nport = None'
        b'\ndatabase = airflow'
    )
    option_file.write.assert_called_once_with(expected_content)
@mock.patch("airflow.cli.commands.db_command.subprocess")
@mock.patch(
    "airflow.cli.commands.db_command.settings.engine.url",
    make_url("sqlite:////root/airflow/airflow.db")
)
def test_cli_shell_sqlite(self, mock_subprocess):
    """``db shell`` on a SQLite URL starts ``sqlite3`` with the database file path."""
    parsed_args = self.parser.parse_args(['db', 'shell'])

    db_command.shell(parsed_args)

    expected_command = ['sqlite3', '/root/airflow/airflow.db']
    mock_subprocess.Popen.assert_called_once_with(expected_command)
@mock.patch("airflow.cli.commands.db_command.subprocess")
@mock.patch(
    "airflow.cli.commands.db_command.settings.engine.url",
    make_url("postgresql+psycopg2://postgres:airflow@postgres/airflow")
)
def test_cli_shell_postgres(self, mock_subprocess):
    """``db shell`` on a PostgreSQL URL starts ``psql`` with PG* variables set from the URL."""
    db_command.shell(self.parser.parse_args(['db', 'shell']))

    mock_subprocess.Popen.assert_called_once_with(
        ['psql'], env=mock.ANY
    )
    _, kwargs = mock_subprocess.Popen.call_args
    env = kwargs['env']

    expected_env = {
        'PGDATABASE': 'airflow',
        'PGHOST': 'postgres',
        'PGPASSWORD': 'airflow',
        'PGPORT': '',
        'PGUSER': 'postgres'
    }
    # The command builds env from a copy of os.environ, so unrelated PG*
    # variables of the machine running the test may be present. Compare only
    # the variables the command itself sets to keep the test independent of
    # the ambient environment.
    postgres_env = {key: env.get(key) for key in expected_env}
    self.assertEqual(expected_env, postgres_env)
@mock.patch(
    "airflow.cli.commands.db_command.settings.engine.url",
    make_url("invalid+psycopg2://postgres:airflow@postgres/airflow")
)
def test_cli_shell_invalid(self):
    """``db shell`` on an unsupported backend raises ``AirflowException`` naming the driver."""
    parsed_args = self.parser.parse_args(['db', 'shell'])
    expected_message = r"Unknown driver: invalid\+psycopg2"

    with self.assertRaisesRegex(AirflowException, expected_message):
        db_command.shell(parsed_args)