diff --git a/google/stackdriver_monitoring/metric_collector.py b/google/stackdriver_monitoring/metric_collector.py index 262479f..9957a4c 100644 --- a/google/stackdriver_monitoring/metric_collector.py +++ b/google/stackdriver_monitoring/metric_collector.py @@ -17,23 +17,81 @@ # pylint: disable=missing-docstring import argparse +import collections import logging import logging.config -import collections +import socket +import threading +import time from http_server import (HttpServer, StdoutRequestHandler) import metric_collector_handlers as handlers import stackdriver_handlers as stackdriver_handlers -from spectator_client import SpectatorClient -from stackdriver_client import StackdriverClient +import spectator_client +import stackdriver_client HandlerDefinition = collections.namedtuple( 'HandlersDefinition', ['handler', 'url_path', 'command_name', 'description']) +class DummyMetricService(object): + def store(self, service_metrics): + spectator_client.foreach_metric_in_service_map(service_metrics, self.dump) + + def dump(self, service, name, metadata, instance): + name, tags = spectator_client.normalize_name_and_tags( + name, metadata, instance) + if tags is None: + print 'IGNORE {0}.{1}'.format(service, name) + else: + print '{0}.{1} {2}'.format(service, name, + [(tag['key'], tag['value']) for tag in tags]) + + +def __determine_host(host): + if host in ['localhost', '127.0.0.1', None, '']: + host = socket.getfqdn() + return host + + +class Monitor(object): + def __init__(self, spectator, metric_service, options): + self.__period = options.period + self.__spectator = spectator + self.__metric_service = metric_service + self.__services = options.services + self.__params = {} + + def __data_map_to_service_metrics(self, data_map): + result = {} + count = 0 + for service, metrics in data_map.items(): + actual_metrics = metrics.get('metrics', None) + if actual_metrics is None: + logging.error('Unexpected response from "%s"', service) + else: + count += len(actual_metrics) + result[service] = actual_metrics + return result, count + + def __call__(self): + logging.info('Starting Monitor') + while True: + start = time.time() + data_map = self.__spectator.scan_by_service( + self.__services, params=self.__params) + collected = time.time() + service_metrics, count = self.__data_map_to_service_metrics(data_map) + self.__metric_service.store(service_metrics) + done = time.time() + logging.info('Wrote %d metrics in %d ms + %d ms', + count, collected - start, done - collected) + time.sleep(self.__period) + + def init_logging(log_file): log_config = { 'version':1, @@ -75,9 +133,11 @@ def get_options(): parser.add_argument('--project', default='') parser.add_argument('--credentials_path', default='') parser.add_argument('--host', default='localhost') - parser.add_argument('--period', default=30) + parser.add_argument('--period', default=60, type=int) parser.add_argument('--prototype_path', default='') parser.add_argument('--command', default='') + parser.add_argument('--monitor', default=False, action='store_true') + parser.add_argument('--nomonitor', dest='monitor', action='store_false') # Either space or ',' delimited parser.add_argument('services', nargs='*', default=['all']) @@ -107,9 +167,9 @@ def main(): init_logging('metric_collector.log') options = get_options() - spectator = SpectatorClient(options) + spectator = spectator_client.SpectatorClient(options) try: - stackdriver = StackdriverClient.make_client(options) + stackdriver = stackdriver_client.StackdriverClient.make_client(options) except IOError as ioerror: logging.error('Could not create stackdriver client' ' -- Stackdriver will be unavailable\n%s', @@ -158,6 +218,15 @@ def main(): process_command(options.command, registry) return + if options.monitor: + logging.info('Starting Monitor every %d s', options.period) + + # TODO: Replace this with a real service. + metric_service = DummyMetricService() + + monitor = Monitor(spectator, metric_service, options) + threading.Thread(target=monitor, name='monitor').start() + logging.info('Starting HTTP server on port %d', options.port) url_path_to_handler = {entry.url_path: entry.handler for entry in registry} httpd = HttpServer(options.port, url_path_to_handler) diff --git a/google/stackdriver_monitoring/spectator_client.py b/google/stackdriver_monitoring/spectator_client.py index ad6ec28..71c8040 100644 --- a/google/stackdriver_monitoring/spectator_client.py +++ b/google/stackdriver_monitoring/spectator_client.py @@ -19,6 +19,38 @@ import logging import urllib2 +def __foreach_metric_tag_binding( + service, metric_name, metric_data, + visitor, visitor_pos_args, visitor_kwargs): + for metric_instance in metric_data['values']: + visitor(service, metric_name, metric_data, metric_instance, + *visitor_pos_args, **visitor_kwargs) + + +def foreach_metric_in_service_map( + service_map, visitor, *visitor_pos_args, **visitor_kwargs): + for service, service_metrics in service_map.items(): + for metric_name, metric_data in service_metrics.items(): + __foreach_metric_tag_binding( + service, metric_name, metric_data, + visitor, visitor_pos_args, visitor_kwargs) + + +def normalize_name_and_tags(name, metric_metadata, metric_instance): + tags = metric_instance.get('tags', None) + if not tags: + return name, None # signal this metric had no tags so we can ignore it. + + is_timer = metric_metadata['kind'] == 'Timer' + if is_timer: + for index, tag in enumerate(tags): + if tag['key'] == 'statistic': + name = name + '__{0}'.format(tag['value']) + del tags[index] + break + return name, tags + + class SpectatorClient(object): """Helper class for pulling data from Spectator servers.""" @@ -129,7 +161,7 @@ class SpectatorClient(object): @staticmethod def ingest_metrics(service, service_response, type_map): - """Add JSON metrics |response| from |service| name and add them into |type_map|""" + """Add JSON metrics |response| from |service| name and add to |type_map|""" for key, value in service_response['metrics'].items(): if key in type_map: type_map[key][service] = value