[AIRFLOW-591] Add datadog hook & sensor
Closes #1851 from gtoonstra/contrib_datadog
This commit is contained in:
Родитель
f8f7d1af20
Коммит
d8383038ac
|
@ -0,0 +1,136 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import time
|
||||
import logging
|
||||
|
||||
from airflow.hooks.base_hook import BaseHook
|
||||
from airflow.exceptions import AirflowException
|
||||
from datadog import initialize, api
|
||||
|
||||
|
||||
class DatadogHook(BaseHook):
    """
    Uses datadog API to send metrics of practically anything measurable,
    so it's possible to track # of db records inserted/deleted, records read
    from file and many other useful metrics.

    Depends on the datadog API, which has to be deployed on the same server where
    Airflow runs.

    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
    :type datadog_conn_id: string
    """

    def __init__(self, datadog_conn_id='datadog_default'):
        """
        Reads the credentials from the connection and initializes the datadog client.

        ``api_key``, ``app_key`` and ``source_type_name`` are taken from the
        connection's extra JSON; ``host`` is taken from the connection itself.

        :raises AirflowException: if ``api_key`` or ``app_key`` is missing from
            the connection extras.
        """
        conn = self.get_connection(datadog_conn_id)
        self.api_key = conn.extra_dejson.get('api_key', None)
        self.app_key = conn.extra_dejson.get('app_key', None)
        self.source_type_name = conn.extra_dejson.get('source_type_name', None)

        # If the host is populated on the connection, it is passed as the
        # ``host`` argument of every metric and event submission.
        self.host = conn.host

        if self.api_key is None:
            raise AirflowException("api_key must be specified in the Datadog connection details")
        if self.app_key is None:
            raise AirflowException("app_key must be specified in the Datadog connection details")

        logging.info("Setting up api keys for datadog")
        options = {
            'api_key': self.api_key,
            'app_key': self.app_key
        }
        # ``initialize`` configures the datadog module itself rather than
        # this hook instance.
        initialize(**options)

    def validate_response(self, response):
        """
        Checks that a datadog response reports success.

        :param response: The response returned by a datadog API call; it is
            indexed with the ``'status'`` key.
        :raises AirflowException: if the status is anything other than ``'ok'``.
        """
        if response['status'] != 'ok':
            # Pass the response as a logging argument: concatenating it to the
            # message with ``+`` raises TypeError when the response is not a
            # string, which would hide the AirflowException below.
            logging.error("Data dog returned: %s", response)
            raise AirflowException("Error status received from datadog")

    def send_metric(self, metric_name, datapoint, tags=None):
        """
        Sends a single datapoint metric to DataDog

        :param metric_name: The name of the metric
        :type metric_name: string
        :param datapoint: A single integer or float related to the metric
        :type datapoint: integer or float
        :param tags: A list of tags associated with the metric
        :type tags: list
        :return: The response returned by ``api.Metric.send``
        :raises AirflowException: if the response status is not ``'ok'``
        """
        response = api.Metric.send(
            metric=metric_name,
            points=datapoint,
            host=self.host,
            tags=tags)

        self.validate_response(response)
        return response

    def query_metric(self,
                     query,
                     from_seconds_ago,
                     to_seconds_ago):
        """
        Queries datadog for a specific metric, potentially with some function applied to it
        and returns the results.

        :param query: The datadog query to execute (see datadog docs)
        :type query: string
        :param from_seconds_ago: How many seconds ago to start querying for.
        :type from_seconds_ago: int
        :param to_seconds_ago: Up to how many seconds ago to query for.
        :type to_seconds_ago: int
        :return: The response returned by ``api.Metric.query``
        :raises AirflowException: if the response status is not ``'ok'``
        """
        # Both offsets are subtracted from the same "now" so the window is
        # expressed in absolute epoch seconds.
        now = int(time.time())

        response = api.Metric.query(
            start=now - from_seconds_ago,
            end=now - to_seconds_ago,
            query=query)

        self.validate_response(response)
        return response

    def post_event(self, title, text, tags=None, alert_type=None, aggregation_key=None):
        """
        Posts an event to datadog (processing finished, potentially alerts, other issues)
        Think about this as a means to maintain persistence of alerts, rather than alerting
        itself.

        :param title: The title of the event
        :type title: string
        :param text: The body of the event (more information)
        :type text: string
        :param tags: List of string tags to apply to the event
        :type tags: list
        :param alert_type: The alert type for the event, one of
            ["error", "warning", "info", "success"]
        :type alert_type: string
        :param aggregation_key: Key that can be used to aggregate this event in a stream
        :type aggregation_key: string
        :return: The response returned by ``api.Event.create``
        :raises AirflowException: if the response status is not ``'ok'``
        """
        response = api.Event.create(
            title=title,
            text=text,
            host=self.host,
            tags=tags,
            alert_type=alert_type,
            aggregation_key=aggregation_key,
            source_type_name=self.source_type_name)

        self.validate_response(response)
        return response
|
|
@ -0,0 +1,81 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from airflow.operators.sensors import BaseSensorOperator
|
||||
from airflow.contrib.hooks.datadog_hook import DatadogHook
|
||||
from airflow.utils import apply_defaults
|
||||
from airflow.exceptions import AirflowException
|
||||
from datadog import api
|
||||
|
||||
|
||||
class DatadogSensor(BaseSensorOperator):
    """
    A sensor to listen, with a filter, to datadog event streams and determine
    if some event was emitted.

    Depends on the datadog API, which has to be deployed on the same server where
    Airflow runs.

    :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
    :type datadog_conn_id: string
    :param from_seconds_ago: Forwarded unchanged as ``start`` to ``api.Event.query``.
    :param up_to_seconds_from_now: Forwarded unchanged as ``end`` to ``api.Event.query``.
    :param priority: Forwarded as ``priority`` to ``api.Event.query``.
    :param sources: Forwarded as ``sources`` to ``api.Event.query``.
    :param tags: Forwarded as ``tags`` to ``api.Event.query``.
    :param response_check: Optional callable that receives the query response;
        its return value becomes the result of ``poke``. When not given, any
        non-empty response counts as success.
    """
    ui_color = '#66c3dd'

    @apply_defaults
    def __init__(
            self,
            datadog_conn_id='datadog_default',
            from_seconds_ago=3600,
            up_to_seconds_from_now=0,
            priority=None,
            sources=None,
            tags=None,
            response_check=None,
            *args,
            **kwargs):
        super(DatadogSensor, self).__init__(*args, **kwargs)
        self.datadog_conn_id = datadog_conn_id
        self.from_seconds_ago = from_seconds_ago
        self.up_to_seconds_from_now = up_to_seconds_from_now
        self.priority = priority
        self.sources = sources
        self.tags = tags
        self.response_check = response_check

    def poke(self, context):
        """
        Queries the datadog event stream once and reports whether it matched.

        :param context: The task context; not used by this method.
        :return: The result of ``response_check(response)`` when a check was
            supplied, otherwise ``True`` if the response is non-empty.
        :raises AirflowException: if the response is a dict whose ``status``
            is present and not ``'ok'``.
        """
        # This instantiates the hook, but doesn't need it further,
        # because the API authenticates globally (unfortunately),
        # but for airflow this shouldn't matter too much, because each
        # task instance runs in its own process anyway.
        DatadogHook(datadog_conn_id=self.datadog_conn_id)

        # NOTE(review): the two window values are forwarded unmodified even
        # though their names suggest offsets relative to the current time --
        # confirm against the datadog Event.query documentation whether
        # absolute timestamps are expected here.
        response = api.Event.query(
            start=self.from_seconds_ago,
            end=self.up_to_seconds_from_now,
            priority=self.priority,
            sources=self.sources,
            tags=self.tags)

        # A missing ``status`` key is treated as success.
        if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
            logging.error("Unexpected datadog result: %s", response)
            raise AirflowException("Datadog returned unexpected result")

        if self.response_check:
            # run content check on response
            return self.response_check(response)

        # If no check was inserted, assume any event that matched yields true.
        return len(response) > 0
|
2
setup.py
2
setup.py
|
@ -109,6 +109,7 @@ celery = [
|
|||
'flower>=0.7.3'
|
||||
]
|
||||
crypto = ['cryptography>=0.9.3']
|
||||
datadog = ['datadog>=0.14.0']
|
||||
doc = [
|
||||
'sphinx>=1.2.3',
|
||||
'sphinx-argparse>=0.1.13',
|
||||
|
@ -213,6 +214,7 @@ def do_setup():
|
|||
'celery': celery,
|
||||
'cloudant': cloudant,
|
||||
'crypto': crypto,
|
||||
'datadog': datadog,
|
||||
'devel': devel_minreq,
|
||||
'devel_hadoop': devel_hadoop,
|
||||
'doc': doc,
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from mock import patch
|
||||
|
||||
from airflow.contrib.sensors.datadog_sensor import DatadogSensor
|
||||
|
||||
|
||||
def _sample_event(event_id, happened_at):
    """Build one canned datadog event payload for the sensor tests.

    The payloads differ only in their id (which also appears in the
    ``resource`` and ``url`` fields) and in ``date_happened``.
    """
    return {
        'alert_type': 'info',
        'comments': [],
        'date_happened': happened_at,
        'device_name': None,
        'host': None,
        'id': event_id,
        'is_aggregate': False,
        'priority': 'normal',
        'resource': '/api/v1/events/%d' % event_id,
        'source': 'My Apps',
        'tags': ['application:web', 'version:1'],
        'text': 'And let me tell you all about it here!',
        'title': 'Something big happened!',
        'url': '/event/jump_to?event_id=%d' % event_id,
    }


# Canned query result containing two matching events.
at_least_one_event = [
    _sample_event(2603387619536318140, 1419436860),
    _sample_event(2603387619536318141, 1419436865),
]

# Canned query result containing no events.
zero_events = []
|
||||
|
||||
|
||||
class TestDatadogSensor(unittest.TestCase):
    """Checks ``DatadogSensor.poke`` against mocked ``api.Event.query`` results."""

    @staticmethod
    def _build_sensor(from_seconds_ago):
        """Create a sensor with no filters and no response check."""
        return DatadogSensor(
            task_id='test_datadog',
            datadog_conn_id='datadog_default',
            from_seconds_ago=from_seconds_ago,
            up_to_seconds_from_now=0,
            priority=None,
            sources=None,
            tags=None,
            response_check=None)

    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
    def test_sensor_ok(self, sensor_query_mock, hook_query_mock):
        """A non-empty event list makes the sensor succeed."""
        # The bottom-most patch supplies the first mock argument.
        sensor_query_mock.return_value = at_least_one_event
        hook_query_mock.return_value = at_least_one_event

        sensor = self._build_sensor(from_seconds_ago=3600)

        self.assertTrue(sensor.poke({}))

    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
    def test_sensor_fail(self, sensor_query_mock, hook_query_mock):
        """An empty event list makes the sensor report failure."""
        sensor_query_mock.return_value = zero_events
        hook_query_mock.return_value = zero_events

        sensor = self._build_sensor(from_seconds_ago=0)

        self.assertFalse(sensor.poke({}))


if __name__ == '__main__':
    unittest.main()
|
Загрузка…
Ссылка в новой задаче