[AIRFLOW-4220] Change CloudantHook to a new major version and add tests (#5023)

- upgrade cloudant version from `>=0.5.9,<2.0` to `>=2.0`
- remove the use of the `schema` attribute in the connection
- remove `db` function since the database object can also be retrieved by calling `cloudant_session['database_name']`
- update docs
- refactor code
This commit is contained in:
Felix Uellendall 2019-04-05 23:49:25 +02:00 коммит произвёл Kaxil Naik
Родитель e732006fdd
Коммит b93f2649ae
4 изменённых файлов: 92 добавлений и 48 удалений

Просмотреть файл

@ -24,12 +24,28 @@ assists users migrating to a new version.
## Airflow Master
### Changes to CloudantHook
* upgraded cloudant version from `>=0.5.9,<2.0` to `>=2.0`
* removed the use of the `schema` attribute in the connection
* removed `db` function since the database object can also be retrieved by calling `cloudant_session['database_name']`
For example:
```python
from airflow.contrib.hooks.cloudant_hook import CloudantHook
with CloudantHook().get_conn() as cloudant_session:
database = cloudant_session['database_name']
```
See the [docs](https://python-cloudant.readthedocs.io/en/latest/) for more information on how to use the new cloudant version.
### Changes to DatastoreHook
* removed argument `version` from `get_conn` function and added it to the hook's `__init__` function instead and renamed it to `api_version`
* renamed the `partialKeys` argument of function `allocate_ids` to `partial_keys`
#### Unify default conn_id for Google Cloud Platform
### Unify default conn_id for Google Cloud Platform
Previously not all hooks and operators related to Google Cloud Platform use
``google_cloud_default`` as a default conn_id. There is currently one default

Просмотреть файл

@ -17,67 +17,49 @@
# specific language governing permissions and limitations
# under the License.
from past.builtins import unicode
import cloudant
from cloudant import cloudant
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
class CloudantHook(BaseHook):
"""Interact with Cloudant.
This class is a thin wrapper around the cloudant python library. See the
documentation `here <https://github.com/cloudant-labs/cloudant-python>`_.
"""
Interact with Cloudant. This class is a thin wrapper around the cloudant python library.
.. seealso:: the latest documentation `here <https://python-cloudant.readthedocs.io/en/latest/>`_.
:param cloudant_conn_id: The connection id to authenticate and get a session object from cloudant.
:type cloudant_conn_id: str
"""
def __init__(self, cloudant_conn_id='cloudant_default'):
super(CloudantHook, self).__init__('cloudant')
super(CloudantHook, self).__init__(cloudant_conn_id)
self.cloudant_conn_id = cloudant_conn_id
def get_conn(self):
def _str(s):
# cloudant-python doesn't support unicode.
if isinstance(s, unicode):
log = LoggingMixin().log
log.debug(
'cloudant-python does not support unicode. Encoding %s as '
'ascii using "ignore".', s
)
return s.encode('ascii', 'ignore')
"""
Opens a connection to the cloudant service and closes it automatically if used as context manager.
return s
.. note::
In the connection form:
- 'host' equals the 'Account' (optional)
- 'login' equals the 'Username (or API Key)' (required)
- 'password' equals the 'Password' (required)
:return: an authorized cloudant session context manager object.
:rtype: cloudant
"""
conn = self.get_connection(self.cloudant_conn_id)
for conn_param in ['host', 'password', 'schema']:
if not hasattr(conn, conn_param) or not getattr(conn, conn_param):
raise AirflowException(
'missing connection parameter {0}'.format(conn_param)
)
self._validate_connection(conn)
# In the connection form:
# - 'host' is renamed to 'Account'
# - 'login' is renamed 'Username (or API Key)'
# - 'schema' is renamed to 'Database'
#
# So, use the 'host' attribute as the account name, and, if login is
# defined, use that as the username.
account = cloudant.Account(_str(conn.host))
cloudant_session = cloudant(user=conn.login, passwd=conn.password, account=conn.host)
username = _str(conn.login or conn.host)
return cloudant_session
account.login(
username,
_str(conn.password)).raise_for_status()
return account.database(_str(conn.schema))
def db(self):
"""Returns the Database object for this hook.
See the documentation for cloudant-python here
https://github.com/cloudant-labs/cloudant-python.
"""
return self.get_conn()
def _validate_connection(self, conn):
for conn_param in ['login', 'password']:
if not getattr(conn, conn_param):
raise AirflowException('missing connection parameter {conn_param}'.format(
conn_param=conn_param))

Просмотреть файл

@ -152,7 +152,7 @@ cgroups = [
'cgroupspy>=0.1.4',
]
# major update coming soon, clamp to 0.x
cloudant = ['cloudant>=0.5.9,<2.0']
cloudant = ['cloudant>=2.0']
crypto = ['cryptography>=0.9.3']
dask = [
'distributed>=1.17.1, <2'

Просмотреть файл

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from airflow.contrib.hooks.cloudant_hook import CloudantHook
from airflow.exceptions import AirflowException
from airflow.models.connection import Connection
from tests.compat import patch
class TestCloudantHook(unittest.TestCase):
def setUp(self):
self.cloudant_hook = CloudantHook()
@patch('airflow.contrib.hooks.cloudant_hook.CloudantHook.get_connection',
return_value=Connection(login='user', password='password', host='account'))
@patch('airflow.contrib.hooks.cloudant_hook.cloudant')
def test_get_conn(self, mock_cloudant, mock_get_connection):
cloudant_session = self.cloudant_hook.get_conn()
conn = mock_get_connection.return_value
mock_cloudant.assert_called_once_with(user=conn.login, passwd=conn.password, account=conn.host)
self.assertEqual(cloudant_session, mock_cloudant.return_value)
@patch('airflow.contrib.hooks.cloudant_hook.CloudantHook.get_connection',
return_value=Connection(login='user'))
def test_get_conn_invalid_connection(self, mock_get_connection):
with self.assertRaises(AirflowException):
self.cloudant_hook.get_conn()