From 1d16d0af977c6f588d198f6427ee7057abfeefb9 Mon Sep 17 00:00:00 2001
From: Roberto Agostino Vitillo
Date: Fri, 12 Jun 2015 17:32:41 +0100
Subject: [PATCH] Added more db tests.

---
 mozaggregator/db.py      | 77 ++++++++++++++++++++++++----------------
 mozaggregator/service.py | 31 +++++++++++++---
 run-tests.sh             |  5 +++
 tests/test_db.py         | 63 ++++++++++++++++++++++++++++----
 4 files changed, 135 insertions(+), 41 deletions(-)

diff --git a/mozaggregator/db.py b/mozaggregator/db.py
index f30f579..2da3207 100644
--- a/mozaggregator/db.py
+++ b/mozaggregator/db.py
@@ -5,39 +5,42 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-
 import psycopg2
 import pandas as pd
 import ujson as json
 import boto.rds2
+import os
 
 from moztelemetry.spark import Histogram
 from boto.s3.connection import S3Connection
 
-
 # Use latest revision, we don't really care about histograms that have
 # been removed. This only works though if histogram definitions are
 # immutable, which has been the case so far.
-_revision_map = {"nightly": "https://hg.mozilla.org/mozilla-central/rev/tip",
-                 "aurora": "https://hg.mozilla.org/releases/mozilla-aurora/rev/tip",
-                 "beta": "https://hg.mozilla.org/releases/mozilla-beta/rev/tip",
-                 "release": "https://hg.mozilla.org/releases/mozilla-release/rev/tip"}
+histogram_revision_map = {"nightly": "https://hg.mozilla.org/mozilla-central/rev/tip",
+                          "aurora": "https://hg.mozilla.org/releases/mozilla-aurora/rev/tip",
+                          "beta": "https://hg.mozilla.org/releases/mozilla-beta/rev/tip",
+                          "release": "https://hg.mozilla.org/releases/mozilla-release/rev/tip"}
 
 
-def create_connection(autocommit=True, host=None):
+def create_connection(autocommit=True, host_override=None):
     # import boto.rds2  # The serializer doesn't pick this one up for some reason when using emacs...
-    s3 = S3Connection()
-    config = s3.get_bucket("telemetry-spark-emr").get_key("aggregator_credentials").get_contents_as_string()
-    config = json.loads(config)
+    connection_string = os.getenv("DB_TEST_URL")  # Used only for testing
+    if connection_string:
+        conn = psycopg2.connect(connection_string)
+    else:
+        s3 = S3Connection()
+        config = s3.get_bucket("telemetry-spark-emr").get_key("aggregator_credentials").get_contents_as_string()
+        config = json.loads(config)
 
-    rds = boto.rds2.connect_to_region("us-west-2")
-    db = rds.describe_db_instances("telemetry-aggregates")["DescribeDBInstancesResponse"]["DescribeDBInstancesResult"]["DBInstances"][0]
-    host = host or db["Endpoint"]["Address"]
-    dbname = db["DBName"]
-    user = db["MasterUsername"]
+        rds = boto.rds2.connect_to_region("us-west-2")
+        db = rds.describe_db_instances("telemetry-aggregates")["DescribeDBInstancesResponse"]["DescribeDBInstancesResult"]["DBInstances"][0]
+        host = host_override or db["Endpoint"]["Address"]
+        dbname = db["DBName"]
+        user = db["MasterUsername"]
 
-    conn = psycopg2.connect(dbname=dbname, user=user, password=config["password"], host=host)
+        conn = psycopg2.connect(dbname=dbname, user=user, password=config["password"], host=host)
 
     if autocommit:
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
@@ -47,8 +50,13 @@ def create_connection(autocommit=True, host=None):
 
 def submit_aggregates(aggregates, dry_run=False):
     _preparedb()
-    aggregates.groupBy(lambda x: x[0][:4]).map(lambda x: _upsert_aggregates(x, dry_run)).count()
+
+    count = aggregates.groupBy(lambda x: x[0][:4]).\
+            map(lambda x: _upsert_aggregates(x, dry_run=dry_run)).\
+            count()
+
     _vacuumdb()
+    return count
 
 
 def _preparedb():
@@ -155,11 +163,11 @@ begin
            E'select t.matches[2], t.matches[3]
               from (select regexp_matches(table_name::text, \\'([^_]*)_([0-9]*)_([0-9]*)\\')
                       from information_schema.tables
-                     where table_schema=\\'public\\' and table_type=\\'BASE TABLE\\' and table_name like \'' || channel || E'%\\'
+                     where table_schema=\\'public\\' and table_type=\\'BASE TABLE\\' and table_name like \\'' || channel || E'%\\'
                 order by table_name desc) as t (matches)';
 end
-$$ language plpgsql strict;
+$$ language plpgsql strict;
 
 create or replace function list_channels() returns table(channel text) as $$
 begin
    return query execute
@@ -194,14 +202,15 @@ select create_tables();
 
 
 def _get_complete_histogram(channel, metric, values):
-    revision = _revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown
+    revision = histogram_revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown
 
     if metric.endswith("_SCALAR"):
         histogram = pd.Series(values).values  # histogram is already complete
+        metric = metric[:-7]
     else:
         histogram = Histogram(metric, {"values": values}, revision=revision).get_value(autocast=False).values
 
-    return map(int, list(histogram))
+    return metric, map(int, list(histogram))
 
 
 def _upsert_aggregate(cursor, aggregate):
@@ -218,15 +227,16 @@ def _upsert_aggregate(cursor, aggregate):
         metric, label, child = metric
         label = label.replace("'", "")  # Postgres doesn't like quotes
 
+        try:
+            metric, histogram = _get_complete_histogram(channel, metric, payload["histogram"])
+            histogram += [payload["count"]]
+        except KeyError:  # TODO: ignore expired histograms
+            continue
+
         dimensions["metric"] = metric
         dimensions["label"] = label
         dimensions["child"] = child
 
-        try:
-            histogram = _get_complete_histogram(channel, metric, payload["histogram"]) + [payload["count"]]  # Append count at the end
-        except KeyError:  # TODO: ignore expired histograms
-            continue
-
         cursor.execute("select add_buildid_metric(%s, %s, %s, %s, %s)", (channel, version, build_id, json.dumps(dimensions), histogram))
@@ -235,10 +245,17 @@ def _upsert_aggregates(aggregates, dry_run=False):
     cursor = conn.cursor()
     submission_date, channel, version, build_id = aggregates[0]
 
-    cursor.execute(u"select was_buildid_processed(%s, %s, %s, %s)", (channel, version, build_id, submission_date))
-    if cursor.fetchone()[0]:
-        # This aggregate has already been processed
-        return
+    while(True):
+        try:
+            cursor.execute(u"select was_buildid_processed(%s, %s, %s, %s)", (channel, version, build_id, submission_date))
+            if cursor.fetchone()[0]:
+                # This aggregate has already been processed
+                return
+            else:
+                break
+        except psycopg2.IntegrityError:  # Multiple aggregates from different submission dates might try to insert at the same time
+            conn.rollback()  # Transaction is aborted after an error
+            continue
 
     for aggregate in aggregates[1]:
         _upsert_aggregate(cursor, aggregate)
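The subtle part of the db.py changes is the retry loop in _upsert_aggregates: several Spark workers may race to mark the same (channel, version, build_id) as processed, the losers hit an IntegrityError, and psycopg2 leaves the transaction aborted until it is rolled back. A standalone sketch of that pattern follows; the helper name and key tuple are illustrative, not taken from the patch:

    import psycopg2

    def check_processed_with_retry(conn, cursor, key):
        # key is an illustrative (channel, version, build_id, submission_date) tuple.
        while True:
            try:
                cursor.execute("select was_buildid_processed(%s, %s, %s, %s)", key)
                return cursor.fetchone()[0]
            except psycopg2.IntegrityError:
                # After an error the transaction is aborted and every later
                # statement fails until we roll back, so roll back and retry.
                conn.rollback()
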
diff --git a/mozaggregator/service.py b/mozaggregator/service.py
index 6242f4e..96a4cb0 100644
--- a/mozaggregator/service.py
+++ b/mozaggregator/service.py
@@ -1,14 +1,16 @@
-import ujson as json
 import argparse
+import ujson as json
 
 from flask import Flask, request, abort
-from db import create_connection
+from db import create_connection, histogram_revision_map
+from moztelemetry.histogram import Histogram
+from aggregator import scalar_histogram_labels
 
 app = Flask(__name__)
 
 
 def execute_query(query, params=tuple()):
-    db = create_connection(host=host)
+    db = create_connection(host_override=host)
     cursor = db.cursor()
     cursor.execute(query, params)
     return cursor.fetchall()
@@ -38,8 +40,29 @@ def get_buildid(channel, version, buildid):
     try:
         dimensions = json.dumps({k: v for k, v in request.args.iteritems()})
         result = execute_query("select * from get_buildid_metric(%s, %s, %s, %s)", (channel, version, buildid, dimensions))
-        pretty_result = map(lambda r: {"label": r[0], "histogram": r[1]}, result)
+
+        if not result:  # Metric not found
+            abort(404)
+
+        pretty_result = []
+        for row in result:
+            label = row[0]
+            histogram = row[1][:-1]
+            count = row[1][-1]
+
+            # Retrieve labels for histogram
+            revision = histogram_revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown
+            try:
+                labels = Histogram(request.args["metric"], histogram, revision=revision).get_value().keys().tolist()
+            except Exception as e:
+                # Count histogram or simple measurement
+                # TODO: deal properly with those
+                labels = scalar_histogram_labels
+
+            pretty_result.append({"label": label, "histogram": dict(zip(labels, histogram)), "count": count})
+
         return json.dumps(pretty_result)
+
     except:
         abort(404)
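The endpoint above depends on the packing convention established in db.py: each stored array is the complete histogram followed by the submission count, so a reader splits it as values = row[:-1], count = row[-1] and zips the values with labels from the histogram definition. A minimal sketch with made-up numbers:

    # Hypothetical stored row: four bucket values plus a trailing count.
    packed = [3, 5, 2, 0, 42]
    values, count = packed[:-1], packed[-1]

    # Labels would normally come from Histogram(...).get_value().keys();
    # they are hard-coded here purely for illustration.
    labels = [0, 1, 2, 3]

    assert dict(zip(labels, values)) == {0: 3, 1: 5, 2: 2, 3: 0}
    assert count == 42
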
diff --git a/run-tests.sh b/run-tests.sh
index 2896805..1ee5040 100755
--- a/run-tests.sh
+++ b/run-tests.sh
@@ -39,4 +39,9 @@
 ${PGSQL_PATH}/postgres -F -k ${PGSQL_DATA} -D ${PGSQL_DATA} &> ${PGSQL_DATA}/out &
 wait_for_line "database system is ready to accept connections" ${PGSQL_DATA}/out
 export DB_TEST_URL="postgresql:///?host=${PGSQL_DATA}&dbname=template1"
+# Launch db service
+mkfifo ${PGSQL_DATA}/out_service
+python ./mozaggregator/service.py -d &> ${PGSQL_DATA}/out_service &
+wait_for_line "* Restarting with reloader" ${PGSQL_DATA}/out_service
+
 nosetests ./tests/test_db.py
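The script blocks on the Flask dev server's reloader banner (printed when the service runs with -d, presumably debug mode) so the tests only start once the service is actually serving. A rough Python equivalent of that readiness check, with an assumed URL and timeout rather than anything from the script:

    import time
    import requests

    def wait_for_service(url="http://localhost:5000/channel/", timeout=30):
        # Poll until the service answers or the deadline passes.
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                requests.get(url, timeout=1)
                return True
            except requests.ConnectionError:
                time.sleep(0.5)
        return False
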
diff --git a/tests/test_db.py b/tests/test_db.py
index 050ec69..175cbbc 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -1,27 +1,76 @@
 import pyspark
 import logging
-import psycopg2
-import os
+import requests
 
 from mozaggregator.aggregator import _aggregate_metrics
+from mozaggregator.db import create_connection, submit_aggregates
 from dataset import *
 
 
 def setup_module():
     global aggregates
-    global db
+    global sc
 
     logger = logging.getLogger("py4j")
     logger.setLevel(logging.ERROR)
 
     sc = pyspark.SparkContext(master="local[*]")
     raw_pings = list(generate_pings())
-    aggregates = _aggregate_metrics(sc.parallelize(raw_pings)).collect()
-    sc.stop()
+    aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
 
-    url = os.getenv("DB_TEST_URL")
-    db = psycopg2.connect(url)
+
+def teardown_module():
+    sc.stop()
 
 
 def test_connection():
+    db = create_connection()
     assert(db)
+
+SERVICE_URI = "http://localhost:5000"
+
+def test_submit():
+    count = submit_aggregates(aggregates)
+    n_submission_dates = len(ping_dimensions["submission_date"])
+    n_channels = len(ping_dimensions["channel"])
+    n_versions = len(ping_dimensions["version"])
+    n_build_ids = len(ping_dimensions["build_id"])
+    assert(count == n_submission_dates*n_channels*n_versions*n_build_ids)
+
+
+def test_channels():
+    channels = requests.get("{}/channel/".format(SERVICE_URI)).json()
+    assert(set(channels) == set(ping_dimensions["channel"]))
+
+
+def test_buildids():
+    template_channel = ping_dimensions["channel"]
+    template_version = ping_dimensions["version"]
+    template_build_id = ping_dimensions["build_id"]
+
+    for channel in template_channel:
+        buildids = requests.get("{}/channel/{}/buildid/".format(SERVICE_URI, channel)).json()
+        assert(len(buildids) == len(template_version)*len(template_build_id))
+
+        for buildid in buildids:
+            assert(set(buildid.keys()) == set(["buildid", "version"]))
+            assert(buildid["buildid"] in [x[:-6] for x in template_build_id])
+            assert(buildid["version"] in [x.split('.')[0] for x in template_version])
+
+
+def test_metric():
+    template_channel = ping_dimensions["channel"]
+    template_version = [x.split('.')[0] for x in ping_dimensions["version"]]
+    template_build_id = [x[:-6] for x in ping_dimensions["build_id"]]
+
+    for channel in template_channel:
+        for version in template_version:
+            for buildid in template_build_id:
+                for histogram in histograms_template.iteritems():
+                    metric, value = histogram
+                    res = requests.get("{}/channel/{}/buildid/{}_{}?metric={}".format(SERVICE_URI, channel, version, buildid, metric)).json()
+                    assert(len(res) == 1)
+                    print metric
+                    print res
+                    assert(False)
+                    #res[0]["count"] ==
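test_metric still ends in debugging leftovers (the prints and the deliberate assert(False)), and the final count assertion is left unfinished in the patch. One plausible shape for the finished check, assuming the expected count is derived from how many pings contribute to each dimension combination; the helper below is a guess at the intent, not part of the patch:

    def check_metric_row(res, expected_count):
        # res is the decoded JSON list returned by the /buildid/ endpoint.
        assert len(res) == 1
        assert res[0]["count"] == expected_count
        # Every bucket value in the labeled histogram should be non-negative.
        assert all(v >= 0 for v in res[0]["histogram"].values())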