From b93f2649aed7fe566b987ec4cb135a2ec643acd6 Mon Sep 17 00:00:00 2001 From: Felix Uellendall Date: Fri, 5 Apr 2019 23:49:25 +0200 Subject: [PATCH] [AIRFLOW-4220] Change CloudantHook to a new major version and add tests (#5023) - upgrade cloudant version from `>=0.5.9,<2.0` to `>=2.0` - remove the use of the `schema` attribute in the connection - remove `db` function since the database object can also be retrieved by calling `cloudant_session['database_name']` - update docs - refactor code --- UPDATING.md | 18 +++++- airflow/contrib/hooks/cloudant_hook.py | 74 +++++++++-------------- setup.py | 2 +- tests/contrib/hooks/test_cloudant_hook.py | 46 ++++++++++++++ 4 files changed, 92 insertions(+), 48 deletions(-) create mode 100644 tests/contrib/hooks/test_cloudant_hook.py diff --git a/UPDATING.md b/UPDATING.md index e93c6b53b4..70e6e16af9 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,12 +24,28 @@ assists users migrating to a new version. ## Airflow Master +### Changes to CloudantHook + +* upgraded cloudant version from `>=0.5.9,<2.0` to `>=2.0` +* removed the use of the `schema` attribute in the connection +* removed `db` function since the database object can also be retrieved by calling `cloudant_session['database_name']` + +For example: +```python +from airflow.contrib.hooks.cloudant_hook import CloudantHook + +with CloudantHook().get_conn() as cloudant_session: + database = cloudant_session['database_name'] +``` + +See the [docs](https://python-cloudant.readthedocs.io/en/latest/) for more information on how to use the new cloudant version. + ### Changes to DatastoreHook * removed argument `version` from `get_conn` function and added it to the hook's `__init__` function instead and renamed it to `api_version` * renamed the `partialKeys` argument of function `allocate_ids` to `partial_keys` -#### Unify default conn_id for Google Cloud Platform +### Unify default conn_id for Google Cloud Platform Previously not all hooks and operators related to Google Cloud Platform use ``google_cloud_default`` as a default conn_id. There is currently one default diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py index 5d39f3fa8a..64a49ca2b8 100644 --- a/airflow/contrib/hooks/cloudant_hook.py +++ b/airflow/contrib/hooks/cloudant_hook.py @@ -17,67 +17,49 @@ # specific language governing permissions and limitations # under the License. -from past.builtins import unicode - -import cloudant +from cloudant import cloudant from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.logging_mixin import LoggingMixin class CloudantHook(BaseHook): - """Interact with Cloudant. - - This class is a thin wrapper around the cloudant python library. See the - documentation `here `_. """ + Interact with Cloudant. This class is a thin wrapper around the cloudant python library. + + .. seealso:: the latest documentation `here `_. + + :param cloudant_conn_id: The connection id to authenticate and get a session object from cloudant. + :type cloudant_conn_id: str + """ + def __init__(self, cloudant_conn_id='cloudant_default'): - super(CloudantHook, self).__init__('cloudant') + super(CloudantHook, self).__init__(cloudant_conn_id) self.cloudant_conn_id = cloudant_conn_id def get_conn(self): - def _str(s): - # cloudant-python doesn't support unicode. - if isinstance(s, unicode): - log = LoggingMixin().log - log.debug( - 'cloudant-python does not support unicode. Encoding %s as ' - 'ascii using "ignore".', s - ) - return s.encode('ascii', 'ignore') + """ + Opens a connection to the cloudant service and closes it automatically if used as context manager. - return s + .. note:: + In the connection form: + - 'host' equals the 'Account' (optional) + - 'login' equals the 'Username (or API Key)' (required) + - 'password' equals the 'Password' (required) + :return: an authorized cloudant session context manager object. + :rtype: cloudant + """ conn = self.get_connection(self.cloudant_conn_id) - for conn_param in ['host', 'password', 'schema']: - if not hasattr(conn, conn_param) or not getattr(conn, conn_param): - raise AirflowException( - 'missing connection parameter {0}'.format(conn_param) - ) + self._validate_connection(conn) - # In the connection form: - # - 'host' is renamed to 'Account' - # - 'login' is renamed 'Username (or API Key)' - # - 'schema' is renamed to 'Database' - # - # So, use the 'host' attribute as the account name, and, if login is - # defined, use that as the username. - account = cloudant.Account(_str(conn.host)) + cloudant_session = cloudant(user=conn.login, passwd=conn.password, account=conn.host) - username = _str(conn.login or conn.host) + return cloudant_session - account.login( - username, - _str(conn.password)).raise_for_status() - - return account.database(_str(conn.schema)) - - def db(self): - """Returns the Database object for this hook. - - See the documentation for cloudant-python here - https://github.com/cloudant-labs/cloudant-python. - """ - return self.get_conn() + def _validate_connection(self, conn): + for conn_param in ['login', 'password']: + if not getattr(conn, conn_param): + raise AirflowException('missing connection parameter {conn_param}'.format( + conn_param=conn_param)) diff --git a/setup.py b/setup.py index a7c90bf5f3..f6ff21577d 100644 --- a/setup.py +++ b/setup.py @@ -152,7 +152,7 @@ cgroups = [ 'cgroupspy>=0.1.4', ] # major update coming soon, clamp to 0.x -cloudant = ['cloudant>=0.5.9,<2.0'] +cloudant = ['cloudant>=2.0'] crypto = ['cryptography>=0.9.3'] dask = [ 'distributed>=1.17.1, <2' diff --git a/tests/contrib/hooks/test_cloudant_hook.py b/tests/contrib/hooks/test_cloudant_hook.py new file mode 100644 index 0000000000..ed12ae529e --- /dev/null +++ b/tests/contrib/hooks/test_cloudant_hook.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from airflow.contrib.hooks.cloudant_hook import CloudantHook +from airflow.exceptions import AirflowException +from airflow.models.connection import Connection +from tests.compat import patch + + +class TestCloudantHook(unittest.TestCase): + + def setUp(self): + self.cloudant_hook = CloudantHook() + + @patch('airflow.contrib.hooks.cloudant_hook.CloudantHook.get_connection', + return_value=Connection(login='user', password='password', host='account')) + @patch('airflow.contrib.hooks.cloudant_hook.cloudant') + def test_get_conn(self, mock_cloudant, mock_get_connection): + cloudant_session = self.cloudant_hook.get_conn() + + conn = mock_get_connection.return_value + mock_cloudant.assert_called_once_with(user=conn.login, passwd=conn.password, account=conn.host) + self.assertEqual(cloudant_session, mock_cloudant.return_value) + + @patch('airflow.contrib.hooks.cloudant_hook.CloudantHook.get_connection', + return_value=Connection(login='user')) + def test_get_conn_invalid_connection(self, mock_get_connection): + with self.assertRaises(AirflowException): + self.cloudant_hook.get_conn()