Pylint fixes and deprecation of rare used methods in Connection (#9419)
This commit is contained in:
Родитель
c7a454aa32
Коммит
7256f4caa2
26
UPDATING.md
26
UPDATING.md
|
@ -79,6 +79,32 @@ To clean up, modules in `airflow.contrib.utils.log` have been moved into `airflo
|
|||
this includes:
|
||||
* `TaskHandlerWithCustomFormatter` class
|
||||
|
||||
### Deprecated method in Connection
|
||||
|
||||
The connection module has new deprecated methods:
|
||||
|
||||
- `Connection.parse_from_uri`
|
||||
- `Connection.log_info`
|
||||
- `Connection.debug_info`
|
||||
|
||||
and one deprecated function:
|
||||
- `parse_netloc_to_hostname`
|
||||
|
||||
Previously, users could create a connection object in two ways:
|
||||
```
|
||||
conn_1 = Connection(conn_id="conn_a", uri="mysql://AAA/")
|
||||
# or
|
||||
conn_2 = Connection(conn_id="conn_a")
|
||||
conn_2.parse_from_uri(uri="mysql://AAA/")
|
||||
```
|
||||
The second way is now deprecated; pass the URI to the constructor through the `uri` parameter instead.
|
||||
|
||||
The `Connection.log_info` and `Connection.debug_info` methods have been deprecated. Read each Connection field individually or use the
|
||||
default representation (`__repr__`).
|
||||
|
||||
The old methods still work but may be removed at any time. The changes are intended to remove methods
|
||||
that are rarely used.
|
||||
|
||||
### BaseOperator uses metaclass
|
||||
|
||||
`BaseOperator` class uses a `BaseOperatorMeta` as a metaclass. This meta class is based on
|
||||
|
|
|
@ -56,7 +56,17 @@ class BaseHook(LoggingMixin):
|
|||
"""
|
||||
conn = random.choice(list(cls.get_connections(conn_id)))
|
||||
if conn.host:
|
||||
log.info("Using connection to: %s", conn.log_info())
|
||||
log.info(
|
||||
"Using connection to: id: %s. Host: %s, Port: %s, Schema: %s, Login: %s, Password: %s, "
|
||||
"extra: %s",
|
||||
conn.conn_id,
|
||||
conn.host,
|
||||
conn.port,
|
||||
conn.schema,
|
||||
conn.login,
|
||||
"XXXXXXXX" if conn.password else None,
|
||||
"XXXXXXXX" if conn.extra_dejson else None
|
||||
)
|
||||
return conn
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
# under the License.
|
||||
|
||||
import json
|
||||
import warnings
|
||||
from json import JSONDecodeError
|
||||
from typing import Dict, Optional
|
||||
from urllib.parse import parse_qsl, quote, unquote, urlencode, urlparse
|
||||
|
||||
from sqlalchemy import Boolean, Column, Integer, String
|
||||
|
@ -86,9 +88,18 @@ CONN_TYPE_TO_HOOK = {
|
|||
# PLEASE KEEP ABOVE LIST IN ALPHABETICAL ORDER.
|
||||
|
||||
|
||||
def parse_netloc_to_hostname(*args, **kwargs):
    """Deprecated public alias that forwards to ``_parse_netloc_to_hostname``.

    Emits a ``DeprecationWarning`` and then returns whatever the private helper
    returns for the same positional and keyword arguments.
    """
    warnings.warn(
        "This method is deprecated.",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this wrapper.
        stacklevel=2
    )
    return _parse_netloc_to_hostname(*args, **kwargs)
|
||||
|
||||
|
||||
# Python automatically converts all letters to lowercase in hostname
|
||||
# See: https://issues.apache.org/jira/browse/AIRFLOW-3615
|
||||
def parse_netloc_to_hostname(uri_parts):
|
||||
def _parse_netloc_to_hostname(uri_parts):
|
||||
"""Parse a URI string to get correct Hostname."""
|
||||
hostname = unquote(uri_parts.hostname or '')
|
||||
if '/' in hostname:
|
||||
|
@ -107,6 +118,29 @@ class Connection(Base, LoggingMixin):
|
|||
connection information. The idea here is that scripts use references to
|
||||
database instances (conn_id) instead of hard coding hostname, logins and
|
||||
passwords when using operators or hooks.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this class, see: :doc:`/howto/connection/index`
|
||||
|
||||
:param conn_id: The connection ID.
|
||||
:type conn_id: str
|
||||
:param conn_type: The connection type.
|
||||
:type conn_type: str
|
||||
:param host: The host.
|
||||
:type host: str
|
||||
:param login: The login.
|
||||
:type login: str
|
||||
:param password: The password.
|
||||
:type password: str
|
||||
:param schema: The schema.
|
||||
:type schema: str
|
||||
:param port: The port number.
|
||||
:type port: int
|
||||
:param extra: Extra metadata. Non-standard data such as private/SSH keys can be saved here. JSON
|
||||
encoded object.
|
||||
:type extra: str
|
||||
:param uri: URI address describing connection parameters.
|
||||
:type uri: str
|
||||
"""
|
||||
__tablename__ = "connection"
|
||||
|
||||
|
@ -123,13 +157,29 @@ class Connection(Base, LoggingMixin):
|
|||
_extra = Column('extra', String(5000))
|
||||
|
||||
def __init__(
|
||||
self, conn_id=None, conn_type=None,
|
||||
host=None, login=None, password=None,
|
||||
schema=None, port=None, extra=None,
|
||||
uri=None):
|
||||
self,
|
||||
conn_id: Optional[str] = None,
|
||||
conn_type: Optional[str] = None,
|
||||
host: Optional[str] = None,
|
||||
login: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
schema: Optional[str] = None,
|
||||
port: Optional[int] = None,
|
||||
extra: Optional[str] = None,
|
||||
uri: Optional[str] = None
|
||||
):
|
||||
super().__init__()
|
||||
self.conn_id = conn_id
|
||||
if uri and ( # pylint: disable=too-many-boolean-expressions
|
||||
conn_type or host or login or password or schema or port or extra
|
||||
):
|
||||
raise AirflowException(
|
||||
"You must create an object using the URI or individual values "
|
||||
"(conn_type, host, login, password, schema, port or extra)."
|
||||
"You can't mix these two ways to create this object."
|
||||
)
|
||||
if uri:
|
||||
self.parse_from_uri(uri)
|
||||
self._parse_from_uri(uri)
|
||||
else:
|
||||
self.conn_type = conn_type
|
||||
self.host = host
|
||||
|
@ -139,7 +189,15 @@ class Connection(Base, LoggingMixin):
|
|||
self.port = port
|
||||
self.extra = extra
|
||||
|
||||
def parse_from_uri(self, uri):
|
||||
def parse_from_uri(self, uri: str):
    """Deprecated: populate this connection from ``uri``.

    Emits a ``DeprecationWarning`` and then delegates to ``_parse_from_uri``.
    Prefer passing ``uri`` to the constructor instead.

    :param uri: URI address describing connection parameters.
    :type uri: str
    """
    warnings.warn(
        "This method is deprecated. Please use uri parameter in constructor.",
        DeprecationWarning,
        # Attribute the warning to the caller's line rather than to this shim.
        stacklevel=2
    )
    # A plain ``uri`` parameter accepts both the positional call and the
    # ``uri=...`` keyword call; ``**uri`` rejected the positional form.
    self._parse_from_uri(uri)
|
||||
|
||||
def _parse_from_uri(self, uri: str):
|
||||
uri_parts = urlparse(uri)
|
||||
conn_type = uri_parts.scheme
|
||||
if conn_type == 'postgresql':
|
||||
|
@ -147,7 +205,7 @@ class Connection(Base, LoggingMixin):
|
|||
elif '-' in conn_type:
|
||||
conn_type = conn_type.replace('-', '_')
|
||||
self.conn_type = conn_type
|
||||
self.host = parse_netloc_to_hostname(uri_parts)
|
||||
self.host = _parse_netloc_to_hostname(uri_parts)
|
||||
quoted_schema = uri_parts.path[1:]
|
||||
self.schema = unquote(quoted_schema) if quoted_schema else quoted_schema
|
||||
self.login = unquote(uri_parts.username) \
|
||||
|
@ -159,6 +217,7 @@ class Connection(Base, LoggingMixin):
|
|||
self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
|
||||
|
||||
def get_uri(self) -> str:
|
||||
"""Return connection in URI format"""
|
||||
uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
|
||||
|
||||
authority_block = ''
|
||||
|
@ -193,7 +252,8 @@ class Connection(Base, LoggingMixin):
|
|||
|
||||
return uri
|
||||
|
||||
def get_password(self):
|
||||
def get_password(self) -> Optional[str]:
|
||||
"""Return encrypted password."""
|
||||
if self._password and self.is_encrypted:
|
||||
fernet = get_fernet()
|
||||
if not fernet.is_encrypted:
|
||||
|
@ -204,18 +264,21 @@ class Connection(Base, LoggingMixin):
|
|||
else:
|
||||
return self._password
|
||||
|
||||
def set_password(self, value):
|
||||
def set_password(self, value: Optional[str]):
|
||||
"""Encrypt password and set in object attribute."""
|
||||
if value:
|
||||
fernet = get_fernet()
|
||||
self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
|
||||
self.is_encrypted = fernet.is_encrypted
|
||||
|
||||
@declared_attr
|
||||
def password(cls):
|
||||
def password(cls): # pylint: disable=no-self-argument
|
||||
"""Password. The value is decrypted/encrypted when reading/setting the value."""
|
||||
return synonym('_password',
|
||||
descriptor=property(cls.get_password, cls.set_password))
|
||||
|
||||
def get_extra(self):
|
||||
def get_extra(self) -> Dict:
|
||||
"""Return encrypted extra-data."""
|
||||
if self._extra and self.is_extra_encrypted:
|
||||
fernet = get_fernet()
|
||||
if not fernet.is_encrypted:
|
||||
|
@ -226,7 +289,8 @@ class Connection(Base, LoggingMixin):
|
|||
else:
|
||||
return self._extra
|
||||
|
||||
def set_extra(self, value):
|
||||
def set_extra(self, value: str):
|
||||
"""Encrypt extra-data and save in object attribute to object."""
|
||||
if value:
|
||||
fernet = get_fernet()
|
||||
self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
|
||||
|
@ -236,11 +300,13 @@ class Connection(Base, LoggingMixin):
|
|||
self.is_extra_encrypted = False
|
||||
|
||||
@declared_attr
|
||||
def extra(cls):
|
||||
def extra(cls): # pylint: disable=no-self-argument
|
||||
"""Extra data. The value is decrypted/encrypted when reading/setting the value."""
|
||||
return synonym('_extra',
|
||||
descriptor=property(cls.get_extra, cls.set_extra))
|
||||
|
||||
def rotate_fernet_key(self):
|
||||
"""Encrypts data with a new key. See: :ref:`security/fernet`. """
|
||||
fernet = get_fernet()
|
||||
if self._password and self.is_encrypted:
|
||||
self._password = fernet.rotate(self._password.encode('utf-8')).decode()
|
||||
|
@ -248,6 +314,7 @@ class Connection(Base, LoggingMixin):
|
|||
self._extra = fernet.rotate(self._extra.encode('utf-8')).decode()
|
||||
|
||||
def get_hook(self):
|
||||
"""Return hook based on conn_type."""
|
||||
hook_class_name, conn_id_param = CONN_TYPE_TO_HOOK.get(self.conn_type, (None, None))
|
||||
if not hook_class_name:
|
||||
raise AirflowException('Unknown hook type "{}"'.format(self.conn_type))
|
||||
|
@ -258,6 +325,16 @@ class Connection(Base, LoggingMixin):
|
|||
return self.conn_id
|
||||
|
||||
def log_info(self):
|
||||
"""
|
||||
This method is deprecated. You can read each field individually or use the
|
||||
default representation (`__repr__`).
|
||||
"""
|
||||
warnings.warn(
|
||||
"This method is deprecated. You can read each field individually or "
|
||||
"use the default representation (__repr__).",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
return ("id: {}. Host: {}, Port: {}, Schema: {}, "
|
||||
"Login: {}, Password: {}, extra: {}".
|
||||
format(self.conn_id,
|
||||
|
@ -269,6 +346,16 @@ class Connection(Base, LoggingMixin):
|
|||
"XXXXXXXX" if self.extra_dejson else None))
|
||||
|
||||
def debug_info(self):
|
||||
"""
|
||||
This method is deprecated. You can read each field individually or use the
|
||||
default representation (`__repr__`).
|
||||
"""
|
||||
warnings.warn(
|
||||
"This method is deprecated. You can read each field individually or "
|
||||
"use the default representation (__repr__).",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
return ("id: {}. Host: {}, Port: {}, Schema: {}, "
|
||||
"Login: {}, Password: {}, extra: {}".
|
||||
format(self.conn_id,
|
||||
|
@ -280,7 +367,7 @@ class Connection(Base, LoggingMixin):
|
|||
self.extra_dejson))
|
||||
|
||||
@property
|
||||
def extra_dejson(self):
|
||||
def extra_dejson(self) -> Dict:
|
||||
"""Returns the extra property by deserializing json."""
|
||||
obj = {}
|
||||
if self.extra:
|
||||
|
|
|
@ -933,10 +933,9 @@ class CloudSQLDatabaseHook(BaseHook):
|
|||
Create Connection object, according to whether it uses proxy, TCP, UNIX sockets, SSL.
|
||||
Connection ID will be randomly generated.
|
||||
"""
|
||||
connection = Connection(conn_id=self.db_conn_id)
|
||||
uri = self._generate_connection_uri()
|
||||
connection = Connection(conn_id=self.db_conn_id, uri=uri)
|
||||
self.log.info("Creating connection %s", self.db_conn_id)
|
||||
connection.parse_from_uri(uri)
|
||||
return connection
|
||||
|
||||
def get_sqlproxy_runner(self) -> CloudSqlProxyRunner:
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
./airflow/logging_config.py
|
||||
./airflow/macros/hive.py
|
||||
./airflow/migrations/env.py
|
||||
./airflow/models/connection.py
|
||||
./airflow/models/crypto.py
|
||||
./airflow/models/dag.py
|
||||
./airflow/models/dagbag.py
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import json
|
||||
import re
|
||||
import unittest
|
||||
from collections import namedtuple
|
||||
from unittest import mock
|
||||
|
@ -24,6 +25,7 @@ import sqlalchemy
|
|||
from cryptography.fernet import Fernet
|
||||
from parameterized import parameterized
|
||||
|
||||
from airflow import AirflowException
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
from airflow.models import Connection, crypto
|
||||
from airflow.models.connection import CONN_TYPE_TO_HOOK
|
||||
|
@ -343,7 +345,7 @@ class TestConnection(unittest.TestCase):
|
|||
else:
|
||||
conn_kwargs.update({k: v})
|
||||
|
||||
connection = Connection(conn_id='test_conn', **conn_kwargs)
|
||||
connection = Connection(conn_id='test_conn', **conn_kwargs) # type: ignore
|
||||
gen_uri = connection.get_uri()
|
||||
new_conn = Connection(conn_id='test_conn', uri=gen_uri)
|
||||
for conn_attr, expected_val in test_config.test_conn_attributes.items():
|
||||
|
@ -514,6 +516,16 @@ class TestConnection(unittest.TestCase):
|
|||
assert conns[0].password == 'password'
|
||||
assert conns[0].port == 5432
|
||||
|
||||
def test_connection_mixed(self):
    """Combining ``uri`` with an individual field must raise ``AirflowException``."""
    # The two literals are joined with no separator on purpose: the pattern
    # has to match the raised message character for character.
    expected_pattern = re.escape(
        "You must create an object using the URI or individual values (conn_type, host, login, "
        "password, schema, port or extra).You can't mix these two ways to create this object."
    )
    conn_kwargs = {"conn_id": "TEST_ID", "uri": "mysql://", "schema": "AAA"}
    with self.assertRaisesRegex(AirflowException, expected_pattern):
        Connection(**conn_kwargs)
|
||||
|
||||
|
||||
class TestConnTypeToHook(unittest.TestCase):
|
||||
def test_enforce_alphabetical_order(self):
|
||||
|
|
|
@ -384,7 +384,7 @@ class TestHiveMetastoreHook(TestHiveEnvironment):
|
|||
self.assertIsInstance(max_partition, str)
|
||||
|
||||
@mock.patch("airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook.get_connection",
|
||||
return_value=[Connection(host="localhost", port="9802")])
|
||||
return_value=[Connection(host="localhost", port=9802)])
|
||||
@mock.patch("airflow.providers.apache.hive.hooks.hive.socket")
|
||||
def test_error_metastore_client(self, socket_mock, _find_valid_server_mock):
|
||||
socket_mock.socket.return_value.connect_ex.return_value = 0
|
||||
|
|
|
@ -35,7 +35,7 @@ class TestLivyHook(unittest.TestCase):
|
|||
@classmethod
|
||||
def setUpClass(cls):
|
||||
db.merge_conn(
|
||||
Connection(conn_id='livy_default', conn_type='http', host='host', schema='http', port='8998'))
|
||||
Connection(conn_id='livy_default', conn_type='http', host='host', schema='http', port=8998))
|
||||
db.merge_conn(Connection(conn_id='default_port', conn_type='http', host='http://host'))
|
||||
db.merge_conn(Connection(conn_id='default_protocol', conn_type='http', host='host'))
|
||||
db.merge_conn(Connection(conn_id='port_set', host='host', conn_type='http', port=1234))
|
||||
|
@ -43,7 +43,7 @@ class TestLivyHook(unittest.TestCase):
|
|||
db.merge_conn(
|
||||
Connection(conn_id='dont_override_schema', conn_type='http', host='http://host', schema='zzz'))
|
||||
db.merge_conn(Connection(conn_id='missing_host', conn_type='http', port=1234))
|
||||
db.merge_conn(Connection(conn_id='invalid_uri', conn_type='http', uri='http://invalid_uri:4321'))
|
||||
db.merge_conn(Connection(conn_id='invalid_uri', uri='http://invalid_uri:4321'))
|
||||
|
||||
def test_build_get_hook(self):
|
||||
|
||||
|
|
|
@ -821,8 +821,7 @@ class TestCloudSqlDatabaseHook(unittest.TestCase):
|
|||
])
|
||||
@mock.patch('airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection')
|
||||
def test_cloudsql_database_hook_create_connection_missing_fields(self, uri, get_connection):
|
||||
connection = Connection()
|
||||
connection.parse_from_uri(uri)
|
||||
connection = Connection(uri=uri)
|
||||
params = {
|
||||
"location": "test",
|
||||
"instance": "instance",
|
||||
|
@ -841,8 +840,7 @@ class TestCloudSqlDatabaseHook(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection')
|
||||
def test_cloudsql_database_hook_get_sqlproxy_runner_no_proxy(self, get_connection):
|
||||
connection = Connection()
|
||||
connection.parse_from_uri("http://user:password@host:80/database")
|
||||
connection = Connection(uri="http://user:password@host:80/database")
|
||||
connection.set_extra(json.dumps({
|
||||
"location": "test",
|
||||
"instance": "instance",
|
||||
|
@ -858,8 +856,7 @@ class TestCloudSqlDatabaseHook(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection')
|
||||
def test_cloudsql_database_hook_get_sqlproxy_runner(self, get_connection):
|
||||
connection = Connection()
|
||||
connection.parse_from_uri("http://user:password@host:80/database")
|
||||
connection = Connection(uri="http://user:password@host:80/database")
|
||||
connection.set_extra(json.dumps({
|
||||
"location": "test",
|
||||
"instance": "instance",
|
||||
|
@ -876,8 +873,7 @@ class TestCloudSqlDatabaseHook(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection')
|
||||
def test_cloudsql_database_hook_get_database_hook(self, get_connection):
|
||||
connection = Connection()
|
||||
connection.parse_from_uri("http://user:password@host:80/database")
|
||||
connection = Connection(uri="http://user:password@host:80/database")
|
||||
connection.set_extra(json.dumps({
|
||||
"location": "test",
|
||||
"instance": "instance",
|
||||
|
|
|
@ -714,10 +714,8 @@ class TestCloudSqlQueryValidation(unittest.TestCase):
|
|||
gcp_connection = mock.MagicMock()
|
||||
gcp_connection.extra_dejson = mock.MagicMock()
|
||||
gcp_connection.extra_dejson.get.return_value = 'empty_project'
|
||||
cloudsql_connection = Connection()
|
||||
cloudsql_connection.parse_from_uri(uri)
|
||||
cloudsql_connection2 = Connection()
|
||||
cloudsql_connection2.parse_from_uri(uri)
|
||||
cloudsql_connection = Connection(uri=uri)
|
||||
cloudsql_connection2 = Connection(uri=uri)
|
||||
get_connections.side_effect = [[gcp_connection], [cloudsql_connection],
|
||||
[cloudsql_connection2]]
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче