Add snowflake to slack operator (#9023)
Addition of a new `snowflake_to_slack` operator. The operator allows you to run a SQL statement against Snowflake and render the results into a Slack message. * Add snowflake_to_slack operator to example dag * Add type hints to operator and clean example dag
This commit is contained in:
Родитель
2b45d8f0cb
Коммит
1c9374d257
|
@ -468,6 +468,7 @@ opsgenie http
|
||||||
postgres amazon
|
postgres amazon
|
||||||
sftp ssh
|
sftp ssh
|
||||||
slack http
|
slack http
|
||||||
|
snowflake slack
|
||||||
========================== ===========================
|
========================== ===========================
|
||||||
|
|
||||||
.. END PACKAGE DEPENDENCIES HERE
|
.. END PACKAGE DEPENDENCIES HERE
|
||||||
|
|
|
@ -64,5 +64,8 @@
|
||||||
],
|
],
|
||||||
"slack": [
|
"slack": [
|
||||||
"http"
|
"http"
|
||||||
|
],
|
||||||
|
"snowflake": [
|
||||||
|
"slack"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,11 @@ import os
|
||||||
from airflow import DAG
|
from airflow import DAG
|
||||||
from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransferOperator
|
from airflow.providers.snowflake.operators.s3_to_snowflake import S3ToSnowflakeTransferOperator
|
||||||
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
|
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
|
||||||
|
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
|
||||||
from airflow.utils.dates import days_ago
|
from airflow.utils.dates import days_ago
|
||||||
|
|
||||||
SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')
|
SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')
|
||||||
|
SLACK_CONN_ID = os.environ.get('SLACK_CONN_ID', 'slack_default')
|
||||||
# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
|
# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
|
||||||
SNOWFLAKE_SCHEMA = os.environ.get('SNOWFLAKE_SCHEMA', 'public')
|
SNOWFLAKE_SCHEMA = os.environ.get('SNOWFLAKE_SCHEMA', 'public')
|
||||||
SNOWFLAKE_STAGE = os.environ.get('SNOWFLAKE_STAGE', 'airflow')
|
SNOWFLAKE_STAGE = os.environ.get('SNOWFLAKE_STAGE', 'airflow')
|
||||||
|
@ -33,6 +35,12 @@ SNOWFLAKE_SAMPLE_TABLE = os.environ.get('SNOWFLAKE_SAMPLE_TABLE', 'snowflake_sam
|
||||||
SNOWFLAKE_LOAD_TABLE = os.environ.get('SNOWFLAKE_LOAD_TABLE', 'airflow_example')
|
SNOWFLAKE_LOAD_TABLE = os.environ.get('SNOWFLAKE_LOAD_TABLE', 'airflow_example')
|
||||||
SNOWFLAKE_LOAD_JSON_PATH = os.environ.get('SNOWFLAKE_LOAD_PATH', 'example.json')
|
SNOWFLAKE_LOAD_JSON_PATH = os.environ.get('SNOWFLAKE_LOAD_PATH', 'example.json')
|
||||||
|
|
||||||
|
SNOWFLAKE_SELECT_SQL = f"SELECT * FROM {SNOWFLAKE_SAMPLE_TABLE} LIMIT 100;"
|
||||||
|
SNOWFLAKE_SLACK_SQL = f"SELECT O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS FROM {SNOWFLAKE_SAMPLE_TABLE} LIMIT 10;"
|
||||||
|
SNOWFLAKE_SLACK_MESSAGE = "Results in an ASCII table:\n" \
|
||||||
|
"```{{ results_df | tabulate(tablefmt='pretty', headers='keys') }}```"
|
||||||
|
SNOWFLAKE_CREATE_TABLE_SQL = f"CREATE TRANSIENT TABLE IF NOT EXISTS {SNOWFLAKE_LOAD_TABLE}(data VARIANT);"
|
||||||
|
|
||||||
default_args = {
|
default_args = {
|
||||||
'owner': 'airflow',
|
'owner': 'airflow',
|
||||||
'start_date': days_ago(2),
|
'start_date': days_ago(2),
|
||||||
|
@ -47,22 +55,23 @@ dag = DAG(
|
||||||
select = SnowflakeOperator(
|
select = SnowflakeOperator(
|
||||||
task_id='select',
|
task_id='select',
|
||||||
snowflake_conn_id=SNOWFLAKE_CONN_ID,
|
snowflake_conn_id=SNOWFLAKE_CONN_ID,
|
||||||
sql="""
|
sql=SNOWFLAKE_SELECT_SQL,
|
||||||
SELECT *
|
|
||||||
FROM {0}
|
|
||||||
LIMIT 100;
|
|
||||||
""".format(SNOWFLAKE_SAMPLE_TABLE),
|
|
||||||
dag=dag,
|
dag=dag,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
slack_report = SnowflakeToSlackOperator(
|
||||||
|
task_id="slack_report",
|
||||||
|
sql=SNOWFLAKE_SLACK_SQL,
|
||||||
|
slack_message=SNOWFLAKE_SLACK_MESSAGE,
|
||||||
|
snowflake_conn_id=SNOWFLAKE_CONN_ID,
|
||||||
|
slack_conn_id=SLACK_CONN_ID,
|
||||||
|
dag=dag
|
||||||
|
)
|
||||||
|
|
||||||
create_table = SnowflakeOperator(
|
create_table = SnowflakeOperator(
|
||||||
task_id='create_table',
|
task_id='create_table',
|
||||||
snowflake_conn_id='snowflake_conn_id',
|
snowflake_conn_id=SNOWFLAKE_CONN_ID,
|
||||||
sql="""
|
sql=SNOWFLAKE_CREATE_TABLE_SQL,
|
||||||
CREATE TRANSIENT TABLE IF NOT EXISTS {0} (
|
|
||||||
data VARIANT
|
|
||||||
);
|
|
||||||
""".format(SNOWFLAKE_LOAD_TABLE),
|
|
||||||
schema=SNOWFLAKE_SCHEMA,
|
schema=SNOWFLAKE_SCHEMA,
|
||||||
dag=dag,
|
dag=dag,
|
||||||
)
|
)
|
||||||
|
@ -78,4 +87,5 @@ copy_into_table = S3ToSnowflakeTransferOperator(
|
||||||
dag=dag,
|
dag=dag,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
select >> slack_report
|
||||||
create_table >> copy_into_table
|
create_table >> copy_into_table
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from typing import Iterable, Mapping, Optional, Union
|
||||||
|
|
||||||
|
from pandas import DataFrame
|
||||||
|
from tabulate import tabulate
|
||||||
|
|
||||||
|
from airflow.exceptions import AirflowException
|
||||||
|
from airflow.models import BaseOperator
|
||||||
|
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
|
||||||
|
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
|
||||||
|
from airflow.utils.decorators import apply_defaults
|
||||||
|
|
||||||
|
|
||||||
|
class SnowflakeToSlackOperator(BaseOperator):
    """
    Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
    rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called
    '{{ results_df }}'. The 'results_df' variable name can be changed by specifying a different
    'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
    allow the dataframe to be rendered nicely. For example, set 'slack_message' to
    {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an
    ascii rendered table.

    :param sql: The SQL statement to execute on Snowflake (templated)
    :type sql: str
    :param slack_message: The templated Slack message to send with the data returned from Snowflake.
        You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing
        the SQL results
    :type slack_message: str
    :param snowflake_conn_id: The Snowflake connection id
    :type snowflake_conn_id: str
    :param slack_conn_id: The connection id for Slack
    :type slack_conn_id: str
    :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
    :type results_df_name: str
    :param parameters: The parameters to pass to the SQL query
    :type parameters: Optional[Union[Iterable, Mapping]]
    :param warehouse: The Snowflake virtual warehouse to use to run the SQL query
    :type warehouse: Optional[str]
    :param database: The Snowflake database to use for the SQL query
    :type database: Optional[str]
    :param schema: The schema to run the SQL against in Snowflake
    :type schema: Optional[str]
    :param role: The role to use when connecting to Snowflake
    :type role: Optional[str]
    :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
    :type slack_token: Optional[str]
    """
    template_fields = ['sql', 'slack_message']
    template_ext = ['.sql', '.jinja', '.j2']
    # Number of completed render_template_fields() calls. Starts as a class-level default of 0 and
    # becomes an instance attribute on the first increment.
    times_rendered = 0

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
        self,
        sql: str,
        slack_message: str,
        snowflake_conn_id: str = 'snowflake_default',
        slack_conn_id: str = 'slack_default',
        results_df_name: str = 'results_df',
        parameters: Optional[Union[Iterable, Mapping]] = None,
        warehouse: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        role: Optional[str] = None,
        slack_token: Optional[str] = None,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        self.snowflake_conn_id = snowflake_conn_id
        self.sql = sql
        self.parameters = parameters
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self.role = role
        self.slack_conn_id = slack_conn_id
        self.slack_token = slack_token
        self.slack_message = slack_message
        self.results_df_name = results_df_name

    def _get_query_results(self) -> DataFrame:
        """Run ``self.sql`` (with ``self.parameters``) through the Snowflake hook; return its dataframe."""
        snowflake_hook = self._get_snowflake_hook()

        self.log.info('Running SQL query: %s', self.sql)
        return snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)

    def _render_and_send_slack_message(self, context, df: DataFrame) -> None:
        """
        Expose ``df`` to the templates under ``self.results_df_name``, re-render the template fields
        and send the resulting ``slack_message`` through the Slack webhook hook.
        """
        # Put the dataframe into the context and render the JINJA template fields
        context[self.results_df_name] = df
        self.render_template_fields(context)

        # The hook must be built after rendering: it receives the rendered message at construction time.
        slack_hook = self._get_slack_hook()
        self.log.info('Sending slack message: %s', self.slack_message)
        slack_hook.execute()

    def _get_snowflake_hook(self) -> SnowflakeHook:
        """Build a SnowflakeHook from the connection id and the optional warehouse/database/role/schema."""
        return SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id,
                             warehouse=self.warehouse, database=self.database,
                             role=self.role, schema=self.schema)

    def _get_slack_hook(self) -> SlackWebhookHook:
        """Build a SlackWebhookHook carrying the current ``slack_message`` and the optional token."""
        return SlackWebhookHook(http_conn_id=self.slack_conn_id, message=self.slack_message,
                                webhook_token=self.slack_token)

    def render_template_fields(self, context, jinja_env=None) -> None:
        """
        Render the template fields, skipping ``slack_message`` on the very first call.

        :param context: The template context used for rendering
        :param jinja_env: Optional JINJA environment; when falsy, ``self.get_template_env()`` is used.
            The ``tabulate`` filter is registered on whichever environment ends up being used.
        """
        # If this is the first render of the template fields, exclude slack_message from rendering since
        # the snowflake results haven't been retrieved yet.
        if self.times_rendered == 0:
            fields_to_render: Iterable[str] = [
                field for field in self.template_fields if field != 'slack_message'
            ]
        else:
            fields_to_render = self.template_fields

        if not jinja_env:
            jinja_env = self.get_template_env()

        # Add the tabulate library into the JINJA environment
        jinja_env.filters['tabulate'] = tabulate

        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
        self.times_rendered += 1

    def execute(self, context) -> None:
        """
        Validate ``sql`` and ``slack_message``, run the query and post the rendered message to Slack.

        :param context: The task context; the query results are added to it under ``results_df_name``
        :raises AirflowException: if ``sql`` or ``slack_message`` is None, not a string, or blank
        """
        # Check for None before checking the type, so that a missing value is reported as "missing"
        # and a non-string value never reaches .strip() (which would raise a bare AttributeError).
        for name, value in (('sql', self.sql), ('slack_message', self.slack_message)):
            if value is None:
                raise AirflowException(f"Expected '{name}' parameter is missing.")
            if not isinstance(value, str):
                raise AirflowException(f"Expected '{name}' parameter should be a string.")
            if value.strip() == "":
                raise AirflowException(f"Expected '{name}' parameter is missing.")

        df = self._get_query_results()
        self._render_and_send_slack_message(context, df)

        self.log.debug('Finished sending Snowflake data to Slack')
|
|
@ -1218,7 +1218,8 @@ These integrations allow you to perform various operations within various servic
|
||||||
* - `Snowflake <https://www.snowflake.com/>`__
|
* - `Snowflake <https://www.snowflake.com/>`__
|
||||||
-
|
-
|
||||||
- :mod:`airflow.providers.snowflake.hooks.snowflake`
|
- :mod:`airflow.providers.snowflake.hooks.snowflake`
|
||||||
- :mod:`airflow.providers.snowflake.operators.snowflake`
|
- :mod:`airflow.providers.snowflake.operators.snowflake`,
|
||||||
|
:mod:`airflow.providers.snowflake.operators.snowflake_to_slack`
|
||||||
-
|
-
|
||||||
|
|
||||||
* - `Vertica <https://www.vertica.com/>`__
|
* - `Vertica <https://www.vertica.com/>`__
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from airflow.models import DAG
|
||||||
|
from airflow.providers.snowflake.operators.snowflake_to_slack import SnowflakeToSlackOperator
|
||||||
|
from airflow.utils import timezone
|
||||||
|
|
||||||
|
TEST_DAG_ID = 'snowflake_to_slack_unit_test'
|
||||||
|
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSnowflakeToSlackOperator(unittest.TestCase):
    """Unit test for SnowflakeToSlackOperator with both the Snowflake and Slack hooks mocked out."""

    def setUp(self):
        # A fresh DAG per test for the operator under test to attach to
        self.example_dag = DAG('unit_test_dag_snowflake_to_slack', start_date=DEFAULT_DATE)

    @staticmethod
    def _construct_operator(**kwargs):
        """Create the operator under test, forwarding ``kwargs`` to its constructor."""
        return SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)

    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SnowflakeHook')
    @mock.patch('airflow.providers.snowflake.operators.snowflake_to_slack.SlackWebhookHook')
    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class):
        """Run the operator once and check how both hooks were constructed and called."""
        # Stub the query result before the task runs; a plain string stands in for the dataframe
        snowflake_hook = mock_snowflake_hook_class.return_value
        snowflake_hook.get_pandas_df.return_value = '1234'
        slack_webhook_hook = mock_slack_hook_class.return_value

        task = self._construct_operator(
            sql="sql {{ ds }}",
            slack_message='message: {{ ds }}, {{ xxxx }}',
            results_df_name='xxxx',
            parameters=['1', '2', '3'],
            snowflake_conn_id='snowflake_connection',
            warehouse='test_warehouse',
            database='test_database',
            schema='test_schema',
            role='test_role',
            slack_conn_id='slack_connection',
            slack_token='test_token',
            dag=self.example_dag,
        )
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        # The Snowflake hook is instantiated exactly once, with the connection settings given above
        mock_snowflake_hook_class.assert_called_once_with(
            database='test_database',
            role='test_role',
            schema='test_schema',
            snowflake_conn_id='snowflake_connection',
            warehouse='test_warehouse',
        )

        # get_pandas_df is executed on the Snowflake hook with the rendered sql and the correct params
        snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3'])

        # The Slack hook is instantiated exactly once, with the fully rendered message
        mock_slack_hook_class.assert_called_once_with(
            http_conn_id='slack_connection',
            message='message: 2017-01-01, 1234',
            webhook_token='test_token',
        )

        # The Slack hook's execute method gets run once
        slack_webhook_hook.execute.assert_called_once()
|
Загрузка…
Ссылка в новой задаче