Added control loop and helper functions for monitoring metrics.

This commit is contained in:
Eric Wiseblatt 2016-11-12 02:47:15 +00:00
Родитель 1a0dc6c3ce
Коммит fb2ad5fbba
2 изменённых файлов: 108 добавлений и 7 удалений

Просмотреть файл

@ -17,23 +17,81 @@
# pylint: disable=missing-docstring
import argparse
import collections
import logging
import logging.config
import collections
import socket
import threading
import time
from http_server import (HttpServer, StdoutRequestHandler)
import metric_collector_handlers as handlers
import stackdriver_handlers as stackdriver_handlers
from spectator_client import SpectatorClient
from stackdriver_client import StackdriverClient
import spectator_client
import stackdriver_client
HandlerDefinition = collections.namedtuple(
'HandlersDefinition', ['handler', 'url_path', 'command_name', 'description'])
class DummyMetricService(object):
def store(self, service_metrics):
spectator_client.foreach_metric_in_service_map(service_metrics, self.dump)
def dump(self, service, name, metadata, instance):
name, tags = spectator_client.normalize_name_and_tags(
name, metadata, instance)
if tags is None:
print 'IGNORE {0}.{1}'.format(service, name)
else:
print '{0}.{1} {2}'.format(service, name,
[(tag['key'], tag['value']) for tag in tags])
def __determine_host(host):
if host in ['localhost', '127.0.0.1', None, '']:
host = socket.getfqdn()
return host
class Monitor(object):
def __init__(self, spectator, metric_service, options):
self.__period = options.period
self.__spectator = spectator
self.__metric_service = metric_service
self.__services = options.services
self.__params = {}
def __data_map_to_service_metrics(self, data_map):
result = {}
count = 0
for service, metrics in data_map.items():
actual_metrics = metrics.get('metrics', None)
if actual_metrics is None:
logging.error('Unexpected response from "%s"', service)
else:
count += len(actual_metrics)
result[service] = actual_metrics
return result, count
def __call__(self):
logging.info('Starting Monitor')
while True:
start = time.time()
data_map = self.__spectator.scan_by_service(
self.__services, params=self.__params)
collected = time.time()
service_metrics, count = self.__data_map_to_service_metrics(data_map)
self.__metric_service.store(service_metrics)
done = time.time()
logging.info('Wrote %d metrics in %d ms + %d ms',
count, collected - start, done - collected)
time.sleep(self.__period)
def init_logging(log_file):
log_config = {
'version':1,
@ -75,9 +133,11 @@ def get_options():
parser.add_argument('--project', default='')
parser.add_argument('--credentials_path', default='')
parser.add_argument('--host', default='localhost')
parser.add_argument('--period', default=30)
parser.add_argument('--period', default=60, type=int)
parser.add_argument('--prototype_path', default='')
parser.add_argument('--command', default='')
parser.add_argument('--monitor', default=False, action='store_true')
parser.add_argument('--nomonitor', dest='monitor', action='store_false')
# Either space or ',' delimited
parser.add_argument('services', nargs='*', default=['all'])
@ -107,9 +167,9 @@ def main():
init_logging('metric_collector.log')
options = get_options()
spectator = SpectatorClient(options)
spectator = spectator_client.SpectatorClient(options)
try:
stackdriver = StackdriverClient.make_client(options)
stackdriver = stackdriver_client.StackdriverClient.make_client(options)
except IOError as ioerror:
logging.error('Could not create stackdriver client'
' -- Stackdriver will be unavailable\n%s',
@ -158,6 +218,15 @@ def main():
process_command(options.command, registry)
return
if options.monitor:
logging.info('Starting Monitor every %d s', options.period)
# TODO: Replace this with a real service.
metric_service = DummyMetricService()
monitor = Monitor(spectator, metric_service, options)
threading.Thread(target=monitor, name='monitor').start()
logging.info('Starting HTTP server on port %d', options.port)
url_path_to_handler = {entry.url_path: entry.handler for entry in registry}
httpd = HttpServer(options.port, url_path_to_handler)

Просмотреть файл

@ -19,6 +19,38 @@ import logging
import urllib2
def __foreach_metric_tag_binding(
service, metric_name, metric_data,
visitor, visitor_pos_args, visitor_kwargs):
for metric_instance in metric_data['values']:
visitor(service, metric_name, metric_data, metric_instance,
*visitor_pos_args, **visitor_kwargs)
def foreach_metric_in_service_map(
service_map, visitor, *visitor_pos_args, **visitor_kwargs):
for service, service_metrics in service_map.items():
for metric_name, metric_data in service_metrics.items():
__foreach_metric_tag_binding(
service, metric_name, metric_data,
visitor, visitor_pos_args, visitor_kwargs)
def normalize_name_and_tags(name, metric_metadata, metric_instance):
tags = metric_instance.get('tags', None)
if not tags:
return name, None # signal this metric had no tags so we can ignore it.
is_timer = metric_metadata['kind'] == 'Timer'
if is_timer:
for index, tag in enumerate(tags):
if tag['key'] == 'statistic':
name = name + '__{0}'.format(tag['value'])
del tags[index]
break
return name, tags
class SpectatorClient(object):
"""Helper class for pulling data from Spectator servers."""
@ -129,7 +161,7 @@ class SpectatorClient(object):
@staticmethod
def ingest_metrics(service, service_response, type_map):
"""Add JSON metrics |response| from |service| name and add them into |type_map|"""
"""Add JSON metrics |response| from |service| name and add to |type_map|"""
for key, value in service_response['metrics'].items():
if key in type_map:
type_map[key][service] = value