Merge pull request #1373 from ewiseblatt/prometheus

Support prometheus as a metric server.
This commit is contained in:
Eric Wiseblatt 2017-01-18 16:08:12 -05:00 коммит произвёл GitHub
Родитель 3338be8718 d7499cdaaf
Коммит 9bb7b6fb57
5 изменённых файлов: 3334 добавлений и 33 удалений

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -15,6 +15,8 @@
"""Implements HTTP Server.""" """Implements HTTP Server."""
import BaseHTTPServer import BaseHTTPServer
import traceback
def build_html_document(body, title=None): def build_html_document(body, title=None):
"""Produces the HTML document wrapper for a text/html response.""" """Produces the HTML document wrapper for a text/html response."""
@ -88,7 +90,15 @@ class DelegatingRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
if handler is None: if handler is None:
self.respond(404, {'Content-Type': 'text/html'}, "Unknown") self.respond(404, {'Content-Type': 'text/html'}, "Unknown")
else: else:
try:
handler(self, path, parameters, fragment) handler(self, path, parameters, fragment)
except:
self.send_error(500, traceback.format_exc())
raise
def log_message(self, format, *args):
"""Suppress HTTP request logging."""
pass
class StdoutRequestHandler(DelegatingRequestHandler): class StdoutRequestHandler(DelegatingRequestHandler):

Просмотреть файл

@ -0,0 +1,203 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements metric service for interacting with Prometheus.
Rather than pushing into prometheus, we'll let prometheus call us
and collect on demand. However the base interface assumes it can call
us so we'll stub that out with a no-op.
To use this service, configure prometheus.yml as follows:
scrape_configs:
- job_name: 'spinnaker'
static_configs:
- targets: ['localhost:8008']
metrics_path: '/prometheus_metrics'
honor_labels: true
Where 'localhost:8008' is the <host>:<port> this monitoring daemon's web
server is listening on (NOTE(review): the text previously said this was
--prometheus_port, whose default is 8003, which contradicts the example
above -- confirm which port Prometheus should scrape). The 'honor_labels: true'
is to take the job and service labels injected from this service
(which will be the spinnaker microservices the metrics came from)
rather than the job and instance labels of this service which is
what prometheus is scraping to collect the metrics.
"""
import collections
import logging
import command_processor
import spectator_client
from prometheus_client import (
CONTENT_TYPE_LATEST,
generate_latest)
from prometheus_client.core import (
GaugeMetricFamily,
CounterMetricFamily,
REGISTRY)
# One metric sample scraped from a single service instance:
#   service: [string] name of the spinnaker microservice the data came from.
#   netloc:  [string] "<host>:<port>" of that instance (used as metalabel).
#   data:    [dict] the raw spectator measurement instance.
InstanceRecord = collections.namedtuple(
    'InstanceRecord', ['service', 'netloc', 'data'])

# Aggregation of all the records for one metric name:
#   kind:    [string] spectator metric kind (e.g. 'Counter', 'Timer', 'Gauge').
#   tags:    [set] union of tag key names seen across all the records.
#   records: [list of InstanceRecord] the individual samples.
MetricInfo = collections.namedtuple('MetricInfo', ['kind', 'tags', 'records'])
class PrometheusMetricsService(object):
  """Implements monitoring service that implements a Prometheus client.

  This service implements the Prometheus client library interface which
  collects metrics in response to a collect() call.
  """

  @staticmethod
  def add_service_parser_arguments(parser):
    """Adds commandline arguments to configure prometheus client."""
    # Client library has its own http server. Not sure what we need to
    # do to hook it into ours so we'll let the client library use its server
    # for now.
    parser.add_argument(
        '--prometheus_port', default=8003, type=int,
        help='Port for Prometheus HTTP Server')
    parser.add_argument(
        '--prometheus_add_source_metalabels', default=True,
        action='store_true',
        help='Add Spinnaker job/instance labels for prometheus.')

  def __init__(self, options):
    """Constructs the service and registers it with the client REGISTRY.

    Args:
      options: [dict] Configuration options (see add_service_parser_arguments).
    """
    self.__service_endpoints = spectator_client.determine_service_endpoints(
        options)
    self.__spectator = spectator_client.SpectatorClient(options)
    self.__add_metalabels = options.get('prometheus_add_source_metalabels',
                                        True)
    # Registering ourselves makes generate_latest() invoke our collect().
    REGISTRY.register(self)

  def __collect_instance_info(
      self, service, name,
      instance, metric_metadata, service_metadata, service_to_name_to_info):
    """Accumulates a spectator measurement into the metric info map.

    This is a callback for spectator_client.foreach_metric_in_service_map
    that groups individual instance samples by service and metric name so
    collect() can later emit one Prometheus metric family per name.

    Args:
      service: [string] The name of the service that the metric is from.
      name: [string] The name of the metric coming from the service.
      instance: [dict] The spectator entry for a specific metric value
         for a specific tag binding instance that we're going to append.
      metric_metadata: [dict] The spectator JSON object for the metric
         is used to get the kind and possibly other metadata.
      service_metadata: [dict] Metadata about the service replica the
         sample came from; '__host' and '__port' identify the instance.
      service_to_name_to_info: [dict] A dictionary keyed by service to
         A dictionary mapping metric names to MetricInfo being built.
    """
    # In practice this converts a Spinnaker Timer into either
    # <name>__count or <name>__totalTime and removes the "statistic" tag.
    name, tags = spectator_client.normalize_name_and_tags(
        name, instance, metric_metadata)
    if tags is None:
      return  # ignore metrics that had no tags because these are bogus.

    record = InstanceRecord(service,
                            '{0}:{1}'.format(service_metadata['__host'],
                                             service_metadata['__port']),
                            instance)

    name_to_info = service_to_name_to_info.get(service)
    if name_to_info is None:
      name_to_info = {}
      service_to_name_to_info[service] = name_to_info

    tag_names = set([tag['key'] for tag in tags])
    info = name_to_info.get(name)
    if info is None:
      info = MetricInfo(metric_metadata['kind'], tag_names, [record])
      name_to_info[name] = info
      return

    info.records.append(record)
    info.tags.update(tag_names)

  def collect(self):
    """Implements Prometheus Client interface.

    Scrapes all the spectator endpoints and yields the current samples
    as Prometheus metric families.
    """
    service_to_name_to_info = {}
    service_metric_map = self.__spectator.scan_by_service(
        self.__service_endpoints)
    spectator_client.foreach_metric_in_service_map(
        service_metric_map, self.__collect_instance_info,
        service_to_name_to_info)

    all_members = []
    for service, name_to_info in service_to_name_to_info.items():
      for name, info in name_to_info.items():
        # Prometheus has no Timer; spectator Timers decompose into counters.
        family = (CounterMetricFamily
                  if info.kind in ('Counter', 'Timer')
                  else GaugeMetricFamily)

        member_name = '{service}:{name}'.format(
            service=service, name=name.replace('.', ':'))

        tags = list(info.tags)
        all_tags = list(tags)
        if self.__add_metalabels:
          all_tags.extend(['job', 'instance'])
        member = family(member_name, '', labels=all_tags)
        all_members.append(member)

        for record in info.records:
          if isinstance(record, dict):
            # NOTE(review): unexpected record shape; log it rather than
            # writing debug print statements to stdout. The subsequent
            # attribute access will still fail for dicts -- this preserves
            # the original (diagnostic) behavior.
            logging.error('*** RECORD %s', record)
            logging.error('*** INFO %s', info)
          instance = record.data
          labels = [''] * len(tags)
          for elem in instance['tags']:
            labels[tags.index(elem['key'])] = elem['value']
          if self.__add_metalabels:
            labels.append(record.service)
            labels.append(record.netloc)

          # Just use the first value. We aren't controlling the timestamp
          # so multiple values would be meaningless anyway.
          member.add_metric(labels=labels, value=instance['values'][0]['v'])

    for metric in all_members:
      yield metric
def make_service(options):
  """Create a Prometheus service instance for interacting with Prometheus."""
  return PrometheusMetricsService(options)
class ScrapeHandler(command_processor.CommandHandler):
  """Handles requests from Prometheus Server.

  The server should be configured to hit this URL.
  """

  def __init__(self):
    """Construct handler for Prometheus Server to call."""
    super(ScrapeHandler, self).__init__(
        '/prometheus_metrics',
        'Collect Prometheus Metrics',
        'Forces a server scrape and returns current metrics in'
        ' the current Prometheus format.')

  def process_web_request(self, request, path, params, fragment):
    """Implements CommandHandler.

    Responds with the current metrics in Prometheus exposition format.
    """
    output = generate_latest()
    # Header key must be 'Content-Type' (was misspelled 'ContentType',
    # so CONTENT_TYPE_LATEST was never actually communicated to scrapers).
    request.respond(200, {'Content-Type': CONTENT_TYPE_LATEST}, output)

Просмотреть файл

@ -3,8 +3,7 @@
datadog datadog
google-api-python-client google-api-python-client
oauth2client oauth2client
urllib3[secure] prometheus_client
pyopenssl
mock mock

Просмотреть файл

@ -22,9 +22,10 @@ import traceback
import http_server import http_server
import command_processor import command_processor
import datadog_service
import prometheus_service
import spectator_client import spectator_client
import stackdriver_service import stackdriver_service
import datadog_service
class HomePageHandler(command_processor.CommandHandler): class HomePageHandler(command_processor.CommandHandler):
@ -61,6 +62,12 @@ class HomePageHandler(command_processor.CommandHandler):
class WebserverCommandHandler(command_processor.CommandHandler): class WebserverCommandHandler(command_processor.CommandHandler):
"""Implements the embedded Web Server.""" """Implements the embedded Web Server."""
@property
def command_handlers(self):
"""Return list of CommandHandlers available to the server."""
return self.__handler_list
def __init__(self, handler_list, url_path, command_name, description): def __init__(self, handler_list, url_path, command_name, description):
"""Constructor. """Constructor.
@ -79,10 +86,12 @@ class WebserverCommandHandler(command_processor.CommandHandler):
""" """
command_processor.set_global_options(options) command_processor.set_global_options(options)
logging.info('Starting HTTP server on port %d', options['port']) port = options['port']
logging.info('Starting HTTP server on port %d', port)
url_path_to_handler = {handler.url_path: handler.process_web_request url_path_to_handler = {handler.url_path: handler.process_web_request
for handler in self.__handler_list} for handler in self.__handler_list}
httpd = http_server.HttpServer(options['port'], url_path_to_handler)
httpd = http_server.HttpServer(port, url_path_to_handler)
httpd.serve_forever() httpd.serve_forever()
def add_argparser(self, subparsers): def add_argparser(self, subparsers):
@ -97,13 +106,24 @@ class WebserverCommandHandler(command_processor.CommandHandler):
class MonitorCommandHandler(WebserverCommandHandler): class MonitorCommandHandler(WebserverCommandHandler):
"""Runs the embedded Web Server with a metric publishing loop.""" """Runs the embedded Web Server with a metric publishing loop."""
def make_metric_service(self, options): def make_metric_services(self, options):
"""Create the metric service we'll use to publish metrics to a backend. """Create the metric services we'll use to publish metrics to a backend.
""" """
service_list = []
if options['stackdriver']: if options['stackdriver']:
return stackdriver_service.make_service(options) service_list.append(stackdriver_service.make_service(options))
if options['datadog']: if options['datadog']:
return datadog_service.make_datadog_service(options) service_list.append(datadog_service.make_datadog_service(options))
if options['prometheus']:
service_list.append(prometheus_service.make_service(options))
# This endpoint will be conditionally added only when prometheus is
# configured. It doesn't have to be like this, but might as well to
# avoid exposing it if it isn't needed.
self.command_handlers.append(prometheus_service.ScrapeHandler())
if service_list:
return service_list
raise ValueError('No metric service specified.') raise ValueError('No metric service specified.')
def __data_map_to_service_metrics(self, data_map): def __data_map_to_service_metrics(self, data_map):
@ -125,18 +145,18 @@ class MonitorCommandHandler(WebserverCommandHandler):
result[service] = actual_metrics result[service] = actual_metrics
return result return result
def process_commandline_request(self, options, metric_service=None): def process_commandline_request(self, options, metric_service_list=None):
"""Impements CommandHandler.""" """Impements CommandHandler."""
if metric_service is None: if metric_service_list is None:
metric_service = self.make_metric_service(options) metric_service_list = self.make_metric_services(options)
daemon = threading.Thread(target=self, name='monitor', daemon = threading.Thread(target=self, name='monitor',
args=(options, metric_service)) args=(options, metric_service_list))
daemon.daemon = True daemon.daemon = True
daemon.start() daemon.start()
super(MonitorCommandHandler, self).process_commandline_request(options) super(MonitorCommandHandler, self).process_commandline_request(options)
def __call__(self, options, metric_service): def __call__(self, options, metric_service_list):
"""This is the actual method that implements the CommandHandler. """This is the actual method that implements the CommandHandler.
It is put here in a callable so that we can run this in a separate thread. It is put here in a callable so that we can run this in a separate thread.
@ -146,44 +166,63 @@ class MonitorCommandHandler(WebserverCommandHandler):
service_endpoints = spectator_client.determine_service_endpoints(options) service_endpoints = spectator_client.determine_service_endpoints(options)
spectator = spectator_client.SpectatorClient(options) spectator = spectator_client.SpectatorClient(options)
publishing_services = [service
for service in metric_service_list
if 'publish_metrics' in dir(service)]
logging.info('Starting Monitor') logging.info('Starting Monitor')
time_offset = int(time.time()) time_offset = int(time.time())
while True: while True:
if not publishing_services:
# we still need this loop to keep the server running
# but the loop doesnt do anything.
time.sleep(period)
continue
start = time.time() start = time.time()
done = start done = start
service_metric_map = spectator.scan_by_service(service_endpoints) service_metric_map = spectator.scan_by_service(service_endpoints)
collected = time.time() collected = time.time()
for service in publishing_services:
try: try:
count = metric_service.publish_metrics(service_metric_map) start_publish = time.time()
count = service.publish_metrics(service_metric_map)
if count is None: if count is None:
count = 0 count = 0
done = time.time() done = time.time()
logging.info( logging.info(
'Wrote %d metrics in %d ms + %d ms', 'Wrote %d metrics to %s in %d ms + %d ms',
count, (collected - start) * 1000, (done - collected) * 1000) count, service.__class__.__name__,
except BaseException as ex: (collected - start) * 1000, (done - start_publish) * 1000)
traceback.print_exc(ex) except:
logging.error(ex) logging.error(traceback.format_exc())
# ignore exception, continue server.
# Try to align time increments so we always collect around the same time # Try to align time increments so we always collect around the same time
# so that the measurements we report are in even intervals. # so that the measurements we report are in even intervals.
# There is still going to be jitter on the collection end but we'll at # There is still going to be jitter on the collection end but we'll at
# least always start with a steady rhythm. # least always start with a steady rhythm.
delta_time = (period - (int(done) - time_offset)) % period now = time.time()
if delta_time == 0 and (int(done) == time_offset delta_time = (period - (int(now) - time_offset)) % period
or (done - start <= 1)): if delta_time == 0 and (int(now) == time_offset
or (now - start <= 1)):
delta_time = period delta_time = period
time.sleep(delta_time) time.sleep(delta_time)
def add_argparser(self, subparsers): def add_argparser(self, subparsers):
"""Implements CommandHandler.""" """Implements CommandHandler."""
parser = super(MonitorCommandHandler, self).add_argparser(subparsers) parser = super(MonitorCommandHandler, self).add_argparser(subparsers)
backend = parser.add_mutually_exclusive_group() parser.add_argument('--stackdriver', default=False, action='store_true',
backend.add_argument('--stackdriver', default=False, action='store_true',
help='Publish metrics to stackdriver.') help='Publish metrics to stackdriver.')
backend.add_argument('--datadog', default=False, action='store_true', parser.add_argument('--datadog', default=False, action='store_true',
help='Publish metrics to Datadog.') help='Publish metrics to Datadog.')
parser.add_argument('--prometheus', default=False, action='store_true',
help='Publish metrics to Prometheus.')
prometheus_service.PrometheusMetricsService.add_service_parser_arguments(
parser)
parser.add_argument( parser.add_argument(
'--fix_stackdriver_labels_unsafe', default=True, '--fix_stackdriver_labels_unsafe', default=True,
action='store_true', action='store_true',