[AIRFLOW-4543] Update slack operator to support slackclient v2 (#5519)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Kaxil Naik <8811558+kaxil@users.noreply.github.com>
This commit is contained in:
Sergio Kef 2020-05-12 16:15:18 +02:00 коммит произвёл GitHub
Родитель 01db738ded
Коммит 578fc514cd
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 187 добавлений и 111 удалений

Просмотреть файл

@ -16,9 +16,10 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for Slack"""
from typing import Optional
from typing import Any, Optional
from slackclient import SlackClient
from slack import WebClient
from slack.errors import SlackClientError # pylint: disable=E0611
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@ -27,40 +28,88 @@ from airflow.hooks.base_hook import BaseHook
# noinspection PyAbstractClass
class SlackHook(BaseHook):
"""
Creates a Slack connection, to be used for calls.
Takes both Slack API token directly and connection that has Slack API token.
If both supplied, Slack API token will be used.
Exposes also the rest of slack.WebClient args.
Examples:
.. code-block:: python
# Create hook
slack_hook = SlackHook(token="xxx") # or slack_hook = SlackHook(slack_conn_id="slack")
# Call generic API with parameters (errors are handled by hook)
# For more details check https://api.slack.com/methods/chat.postMessage
slack_hook.call("chat.postMessage", json={"channel": "#random", "text": "Hello world!"})
# Call method from Slack SDK (you have to handle errors yourself)
# For more details check https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
slack_hook.client.chat_postMessage(channel="#random", text="Hello world!")
:param token: Slack API token
:type token: str
:param slack_conn_id: connection that has Slack API token in the password field
:type slack_conn_id: str
:param use_session: A boolean specifying if the client should take advantage of
connection pooling. Default is True.
:type use_session: bool
:param base_url: A string representing the Slack API base URL. Default is
``https://www.slack.com/api/``
:type base_url: str
:param timeout: The maximum number of seconds the client will wait
to connect and receive a response from Slack. Default is 30 seconds.
:type timeout: int
"""
def __init__(self, token: Optional[str] = None, slack_conn_id: Optional[str] = None) -> None:
def __init__(
self,
token: Optional[str] = None,
slack_conn_id: Optional[str] = None,
**client_args: Any,
) -> None:
super().__init__()
self.token = self.__get_token(token, slack_conn_id)
self.client = WebClient(token, **client_args)
def __get_token(self, token, slack_conn_id):
if token is not None:
return token
elif slack_conn_id is not None:
if slack_conn_id is not None:
conn = self.get_connection(slack_conn_id)
if not getattr(conn, 'password', None):
raise AirflowException('Missing token(password) in Slack connection')
return conn.password
else:
raise AirflowException('Cannot get token: '
'No valid Slack token nor slack_conn_id supplied.')
def call(self, method: str, api_params: dict) -> None:
raise AirflowException('Cannot get token: '
'No valid Slack token nor slack_conn_id supplied.')
def call(self, *args, **kwargs) -> None:
"""
Calls the Slack client.
Calls Slack WebClient `WebClient.api_call` with given arguments.
:param method: method
:param api_params: parameters of the API
:param api_method: The target Slack API method. e.g. 'chat.postMessage'. Required.
:type api_method: str
:param http_verb: HTTP Verb. Optional (defaults to 'POST')
:type http_verb: str
:param files: Files to multipart upload. e.g. {imageORfile: file_objectORfile_path}
:type files: dict
:param data: The body to attach to the request. If a dictionary is provided,
form-encoding will take place. Optional.
:type data: dict or aiohttp.FormData
:param params: The URL parameters to append to the URL. Optional.
:type params: dict
:param json: JSON for the body to attach to the request. Optional.
:type json: dict
"""
slack_client = SlackClient(self.token)
return_code = slack_client.api_call(method, **api_params)
if not return_code['ok']:
msg = "Slack API call failed ({})".format(return_code['error'])
return_code = self.client.api_call(*args, **kwargs)
try:
return_code.validate()
except SlackClientError as exc:
msg = f"Slack API call failed ({exc})"
raise AirflowException(msg)

Просмотреть файл

@ -19,7 +19,6 @@
import json
from typing import Dict, List, Optional
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.utils.decorators import apply_defaults
@ -30,14 +29,17 @@ class SlackAPIOperator(BaseOperator):
Base Slack Operator
The SlackAPIPostOperator is derived from this operator.
In the future additional Slack API Operators will be derived from this class as well
Only one of `slack_conn_id` and `token` is required.
:param slack_conn_id: Slack connection ID which its password is Slack API token
:param slack_conn_id: Slack connection ID which its password is Slack API token. Optional
:type slack_conn_id: str
:param token: Slack API token (https://api.slack.com/web)
:param token: Slack API token (https://api.slack.com/web). Optional
:type token: str
:param method: The Slack API Method to Call (https://api.slack.com/methods)
:param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
:type method: str
:param api_params: API Method call parameters (https://api.slack.com/methods)
:param api_params: API Method call parameters (https://api.slack.com/methods). Optional
:type api_params: dict
:param client_args: Slack Hook parameters. Optional. Check airflow.providers.slack.hooks.SlackHook
:type api_params: dict
"""
@ -50,12 +52,6 @@ class SlackAPIOperator(BaseOperator):
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
if token is None and slack_conn_id is None:
raise AirflowException('No valid Slack token nor slack_conn_id supplied.')
if token is not None and slack_conn_id is not None:
raise AirflowException('Cannot determine Slack credential '
'when both token and slack_conn_id are supplied.')
self.token = token # type: Optional[str]
self.slack_conn_id = slack_conn_id # type: Optional[str]
@ -73,6 +69,9 @@ class SlackAPIOperator(BaseOperator):
which sets self.api_call_params with a dict of
API call parameters (https://api.slack.com/methods)
"""
raise NotImplementedError(
"SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
)
def execute(self, **kwargs):
"""
@ -82,13 +81,25 @@ class SlackAPIOperator(BaseOperator):
if not self.api_params:
self.construct_api_call_params()
slack = SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
slack.call(self.method, self.api_params)
slack.call(self.method, json=self.api_params)
class SlackAPIPostOperator(SlackAPIOperator):
"""
Posts messages to a slack channel
Examples:
.. code-block:: python
slack = SlackAPIPostOperator(
task_id="post_hello",
dag=dag,
token="XXX",
text="hello there!",
channel="#random",
)
:param channel: channel in which to post message on slack name (#general) or
ID (C12318391). (templated)
:type channel: str

Просмотреть файл

@ -33,6 +33,7 @@ Unidecode==1.1.1
WTForms==2.3.1
Werkzeug==0.16.1
adal==1.2.3
aiohttp==3.6.2
alabaster==0.7.12
alembic==1.4.2
amqp==2.5.2
@ -45,6 +46,7 @@ argcomplete==1.11.1
asn1crypto==1.3.0
astroid==2.3.3
async-generator==1.10
async-timeout==3.0.1
atlasclient==1.0.0
attrs==19.3.0
aws-sam-translator==1.23.0
@ -120,7 +122,7 @@ fastavro==0.23.3
filelock==3.0.12
fissix==19.2b1
flake8-colors==0.1.6
flake8==3.8.0
flake8==3.8.1
flaky==3.6.1
flask-swagger==0.2.13
flower==0.9.4
@ -174,6 +176,7 @@ httplib2==0.17.3
humanize==0.5.1
hvac==0.10.1
identify==1.4.15
idna-ssl==1.1.0
idna==2.9
ijson==2.6.1
imagesize==1.2.0
@ -216,6 +219,7 @@ msgpack==1.0.0
msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
multidict==4.7.5
mypy-extensions==0.4.3
mypy==0.770
mysql-connector-python==8.0.18
@ -314,10 +318,10 @@ setproctitle==1.1.10
sh==1.13.1
simple-salesforce==1.0.0
six==1.14.0
slackclient==1.3.2
slackclient==2.5.0
smmap==3.0.4
snowballstemmer==2.0.0
snowflake-connector-python==2.2.5
snowflake-connector-python==2.2.6
snowflake-sqlalchemy==1.2.3
sortedcontainers==2.1.0
soupsieve==2.0
@ -369,6 +373,7 @@ wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.23.0
yandexcloud==0.35.0
yarl==1.4.2
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0

Просмотреть файл

@ -33,6 +33,7 @@ Unidecode==1.1.1
WTForms==2.3.1
Werkzeug==0.16.1
adal==1.2.3
aiohttp==3.6.2
alabaster==0.7.12
alembic==1.4.2
amqp==2.5.2
@ -45,6 +46,7 @@ argcomplete==1.11.1
asn1crypto==1.3.0
astroid==2.3.3
async-generator==1.10
async-timeout==3.0.1
atlasclient==1.0.0
attrs==19.3.0
aws-sam-translator==1.23.0
@ -119,7 +121,7 @@ fastavro==0.23.3
filelock==3.0.12
fissix==19.2b1
flake8-colors==0.1.6
flake8==3.8.0
flake8==3.8.1
flaky==3.6.1
flask-swagger==0.2.13
flower==0.9.4
@ -213,6 +215,7 @@ msgpack==1.0.0
msrest==0.6.13
msrestazure==0.6.3
multi-key-dict==2.0.3
multidict==4.7.5
mypy-extensions==0.4.3
mypy==0.770
mysql-connector-python==8.0.18
@ -310,10 +313,10 @@ setproctitle==1.1.10
sh==1.13.1
simple-salesforce==1.0.0
six==1.14.0
slackclient==1.3.2
slackclient==2.5.0
smmap==3.0.4
snowballstemmer==2.0.0
snowflake-connector-python==2.2.5
snowflake-connector-python==2.2.6
snowflake-sqlalchemy==1.2.3
sortedcontainers==2.1.0
soupsieve==2.0
@ -364,6 +367,7 @@ wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.23.0
yandexcloud==0.35.0
yarl==1.4.2
zdesk==2.7.1
zict==2.0.0
zipp==3.1.0

Просмотреть файл

@ -1 +1 @@
4311e187f8fc829ca83a33356a9d1947 /opt/airflow/setup.py
22f14063a514a325525c530b7c30f562 /opt/airflow/setup.py

Просмотреть файл

@ -1 +1 @@
4311e187f8fc829ca83a33356a9d1947 /opt/airflow/setup.py
22f14063a514a325525c530b7c30f562 /opt/airflow/setup.py

Просмотреть файл

@ -380,7 +380,7 @@ sentry = [
]
singularity = ['spython>=0.0.56']
slack = [
'slackclient>=1.0.0,<2.0.0',
'slackclient>=2.0.0,<3.0.0',
]
snowflake = [
'snowflake-connector-python>=1.5.2',

Просмотреть файл

@ -19,60 +19,95 @@
import unittest
import mock
from slack.errors import SlackApiError
from airflow.exceptions import AirflowException
from airflow.providers.slack.hooks.slack import SlackHook
class TestSlackHook(unittest.TestCase):
def test_init_with_token_only(self):
test_token = 'test_token'
slack_hook = SlackHook(token=test_token, slack_conn_id=None)
self.assertEqual(slack_hook.token, test_token)
def test_get_token_with_token_only(self):
""" tests `__get_token` method when only token is provided """
# Given
test_token = 'test_token'
test_conn_id = None
# Run
hook = SlackHook(test_token, test_conn_id)
# Assert
output = hook.token
expected = test_token
self.assertEqual(output, expected)
@mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
def test_get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
""" tests `__get_token` method when only connection is provided """
# Given
test_token = None
test_conn_id = 'x'
test_password = 'test_password'
# Mock
get_connection_mock.return_value = mock.Mock(password=test_password)
test_slack_conn_id = 'test_slack_conn_id'
slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
# Run
hook = SlackHook(test_token, test_conn_id)
get_connection_mock.assert_called_once_with(test_slack_conn_id)
self.assertEqual(slack_hook.token, test_password)
# Assert
output = hook.token
expected = test_password
self.assertEqual(output, expected)
@mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
def test_init_with_no_password_slack_conn_id_only(self, get_connection_mock):
def test_get_token_with_no_password_slack_conn_id_only(self, get_connection_mock):
""" tests `__get_token` method when only connection is provided """
# Mock
conn = mock.Mock()
del conn.password
get_connection_mock.return_value = conn
test_slack_conn_id = 'test_slack_conn_id'
self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=test_slack_conn_id)
# Assert
self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id='x')
@mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
def test_init_with_empty_password_slack_conn_id_only(self, get_connection_mock):
def test_get_token_with_empty_password_slack_conn_id_only(self, get_connection_mock):
""" tests `__get_token` method when only connection is provided """
# Mock
get_connection_mock.return_value = mock.Mock(password=None)
test_slack_conn_id = 'test_slack_conn_id'
self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=test_slack_conn_id)
# Assert
self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id='x')
def test_init_with_token_and_slack_conn_id(self):
def test_get_token_with_token_and_slack_conn_id(self):
""" tests `__get_token` method when both arguments are provided """
# Given
test_token = 'test_token'
test_slack_conn_id = 'test_slack_conn_id'
slack_hook = SlackHook(token=test_token, slack_conn_id=test_slack_conn_id)
test_conn_id = 'x'
self.assertEqual(slack_hook.token, test_token)
# Run
hook = SlackHook(test_token, test_conn_id)
# Assert
output = hook.token
expected = test_token
self.assertEqual(output, expected)
def test_get_token_with_out_token_nor_slack_conn_id(self):
""" tests `__get_token` method when no arguments are provided """
def test_init_with_out_token_nor_slack_conn_id(self):
self.assertRaises(AirflowException, SlackHook, token=None, slack_conn_id=None)
@mock.patch('airflow.providers.slack.hooks.slack.SlackClient')
@mock.patch('airflow.providers.slack.hooks.slack.WebClient')
def test_call_with_success(self, slack_client_class_mock):
slack_client_mock = mock.Mock()
slack_client_class_mock.return_value = slack_client_mock
slack_client_mock.api_call.return_value = {'ok': True}
slack_response = mock.Mock()
slack_client_mock.api_call.return_value = slack_response
slack_response.validate.return_value = True
test_token = 'test_token'
test_slack_conn_id = 'test_slack_conn_id'
@ -80,16 +115,20 @@ class TestSlackHook(unittest.TestCase):
test_method = 'test_method'
test_api_params = {'key1': 'value1', 'key2': 'value2'}
slack_hook.call(test_method, test_api_params)
slack_hook.call(test_method, json=test_api_params)
slack_client_class_mock.assert_called_once_with(test_token)
slack_client_mock.api_call.assert_called_once_with(test_method, **test_api_params)
slack_client_mock.api_call.assert_called_once_with(test_method, json=test_api_params)
self.assertEqual(slack_response.validate.call_count, 1)
@mock.patch('airflow.providers.slack.hooks.slack.SlackClient')
@mock.patch('airflow.providers.slack.hooks.slack.WebClient')
def test_call_with_failure(self, slack_client_class_mock):
slack_client_mock = mock.Mock()
slack_client_class_mock.return_value = slack_client_mock
slack_client_mock.api_call.return_value = {'ok': False, 'error': 'test_error'}
slack_response = mock.Mock()
slack_client_mock.api_call.return_value = slack_response
expected_exception = SlackApiError(message='foo', response='bar')
slack_response.validate = mock.Mock(side_effect=expected_exception)
test_token = 'test_token'
test_slack_conn_id = 'test_slack_conn_id'
@ -97,4 +136,19 @@ class TestSlackHook(unittest.TestCase):
test_method = 'test_method'
test_api_params = {'key1': 'value1', 'key2': 'value2'}
self.assertRaises(AirflowException, slack_hook.call, test_method, test_api_params)
try:
slack_hook.call(test_method, test_api_params)
self.fail()
except AirflowException as exc:
self.assertIn("foo", str(exc))
self.assertIn("bar", str(exc))
@mock.patch('airflow.providers.slack.hooks.slack.WebClient.api_call', autospec=True)
@mock.patch('airflow.providers.slack.hooks.slack.WebClient')
def test_api_call(self, mock_slack_client, mock_slack_api_call):
slack_hook = SlackHook(token='test_token')
test_api_json = {'channel': 'test_channel'}
slack_hook.call("chat.postMessage", json=test_api_json)
mock_slack_api_call.assert_called_once_with(
mock_slack_client, "chat.postMessage", json=test_api_json)

Просмотреть файл

@ -21,7 +21,6 @@ import unittest
import mock
from airflow.exceptions import AirflowException
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
@ -104,52 +103,6 @@ class TestSlackAPIPostOperator(unittest.TestCase):
api_params=test_api_params,
)
@mock.patch('airflow.providers.slack.operators.slack.SlackHook')
def test_execute_with_token_only(self, slack_hook_class_mock):
slack_hook_mock = mock.Mock()
slack_hook_class_mock.return_value = slack_hook_mock
test_token = 'test_token'
slack_api_post_operator = self.__construct_operator(test_token, None)
slack_api_post_operator.execute()
slack_hook_class_mock.assert_called_once_with(token=test_token, slack_conn_id=None)
slack_hook_mock.call.assert_called_once_with(self.expected_method, self.expected_api_params)
slack_hook_mock.reset_mock()
slack_hook_class_mock.reset_mock()
slack_api_post_operator = self.__construct_operator(test_token, None, self.test_api_params)
slack_api_post_operator.execute()
slack_hook_class_mock.assert_called_once_with(token=test_token, slack_conn_id=None)
slack_hook_mock.call.assert_called_once_with(self.expected_method, self.test_api_params)
@mock.patch('airflow.providers.slack.operators.slack.SlackHook')
def test_execute_with_slack_conn_id_only(self, slack_hook_class_mock):
slack_hook_mock = mock.Mock()
slack_hook_class_mock.return_value = slack_hook_mock
test_slack_conn_id = 'test_slack_conn_id'
slack_api_post_operator = self.__construct_operator(None, test_slack_conn_id)
slack_api_post_operator.execute()
slack_hook_class_mock.assert_called_once_with(token=None, slack_conn_id=test_slack_conn_id)
slack_hook_mock.call.assert_called_once_with(self.expected_method, self.expected_api_params)
def test_init_with_invalid_params(self):
test_token = 'test_token'
test_slack_conn_id = 'test_slack_conn_id'
self.assertRaises(AirflowException, self.__construct_operator, test_token, test_slack_conn_id)
self.assertRaises(AirflowException, self.__construct_operator, None, None)
def test_init_with_valid_params(self):
test_token = 'test_token'
test_slack_conn_id = 'test_slack_conn_id'