Roberto Agostino Vitillo 2015-06-12 17:32:41 +01:00
Parent 6bced3f4f6
Commit 1d16d0af97
4 changed files with 135 additions and 41 deletions

View file

@@ -5,39 +5,42 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 import psycopg2
 import pandas as pd
 import ujson as json
 import boto.rds2
 import os

 from moztelemetry.spark import Histogram
 from boto.s3.connection import S3Connection

 # Use latest revision, we don't really care about histograms that have
 # been removed. This only works though if histogram definitions are
 # immutable, which has been the case so far.
-_revision_map = {"nightly": "https://hg.mozilla.org/mozilla-central/rev/tip",
-                 "aurora": "https://hg.mozilla.org/releases/mozilla-aurora/rev/tip",
-                 "beta": "https://hg.mozilla.org/releases/mozilla-beta/rev/tip",
-                 "release": "https://hg.mozilla.org/releases/mozilla-release/rev/tip"}
+histogram_revision_map = {"nightly": "https://hg.mozilla.org/mozilla-central/rev/tip",
+                          "aurora": "https://hg.mozilla.org/releases/mozilla-aurora/rev/tip",
+                          "beta": "https://hg.mozilla.org/releases/mozilla-beta/rev/tip",
+                          "release": "https://hg.mozilla.org/releases/mozilla-release/rev/tip"}

-def create_connection(autocommit=True, host=None):
-    # import boto.rds2 # The serializer doesn't pick this one up for some reason when using emacs...
-    s3 = S3Connection()
-    config = s3.get_bucket("telemetry-spark-emr").get_key("aggregator_credentials").get_contents_as_string()
-    config = json.loads(config)
+def create_connection(autocommit=True, host_override=None):
+    connection_string = os.getenv("DB_TEST_URL")  # Used only for testing
+    if connection_string:
+        conn = psycopg2.connect(connection_string)
+    else:
+        s3 = S3Connection()
+        config = s3.get_bucket("telemetry-spark-emr").get_key("aggregator_credentials").get_contents_as_string()
+        config = json.loads(config)

-    rds = boto.rds2.connect_to_region("us-west-2")
-    db = rds.describe_db_instances("telemetry-aggregates")["DescribeDBInstancesResponse"]["DescribeDBInstancesResult"]["DBInstances"][0]
-    host = host or db["Endpoint"]["Address"]
-    dbname = db["DBName"]
-    user = db["MasterUsername"]
-    conn = psycopg2.connect(dbname=dbname, user=user, password=config["password"], host=host)
+        rds = boto.rds2.connect_to_region("us-west-2")
+        db = rds.describe_db_instances("telemetry-aggregates")["DescribeDBInstancesResponse"]["DescribeDBInstancesResult"]["DBInstances"][0]
+        host = host_override or db["Endpoint"]["Address"]
+        dbname = db["DBName"]
+        user = db["MasterUsername"]
+        conn = psycopg2.connect(dbname=dbname, user=user, password=config["password"], host=host)

     if autocommit:
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
@@ -47,8 +50,13 @@ def create_connection(autocommit=True, host=None):

 def submit_aggregates(aggregates, dry_run=False):
     _preparedb()
-    aggregates.groupBy(lambda x: x[0][:4]).map(lambda x: _upsert_aggregates(x, dry_run)).count()
+    count = aggregates.groupBy(lambda x: x[0][:4]).\
+            map(lambda x: _upsert_aggregates(x, dry_run=dry_run)).\
+            count()
     _vacuumdb()
+    return count

 def _preparedb():
@@ -155,11 +163,11 @@ begin
         E'select t.matches[2], t.matches[3] from
             (select regexp_matches(table_name::text, \\'([^_]*)_([0-9]*)_([0-9]*)\\')
             from information_schema.tables
-            where table_schema=\\'public\\' and table_type=\\'BASE TABLE\\' and table_name like \'' || channel || E'%\\'
+            where table_schema=\\'public\\' and table_type=\\'BASE TABLE\\' and table_name like \\'' || channel || E'%\\'
             order by table_name desc) as t (matches)';
 end
 $$ language plpgsql strict;

 create or replace function list_channels() returns table(channel text) as $$
 begin
     return query execute
@@ -194,14 +202,15 @@ select create_tables();

 def _get_complete_histogram(channel, metric, values):
-    revision = _revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown
+    revision = histogram_revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown

     if metric.endswith("_SCALAR"):
         histogram = pd.Series(values).values  # histogram is already complete
+        metric = metric[:-7]
     else:
         histogram = Histogram(metric, {"values": values}, revision=revision).get_value(autocast=False).values

-    return map(int, list(histogram))
+    return metric, map(int, list(histogram))

 def _upsert_aggregate(cursor, aggregate):
@@ -218,15 +227,16 @@ def _upsert_aggregate(cursor, aggregate):
         metric, label, child = metric
         label = label.replace("'", "")  # Postgres doesn't like quotes

+        try:
+            metric, histogram = _get_complete_histogram(channel, metric, payload["histogram"])
+            histogram += [payload["count"]]
+        except KeyError:  # TODO: ignore expired histograms
+            continue
+
         dimensions["metric"] = metric
         dimensions["label"] = label
         dimensions["child"] = child

-        try:
-            histogram = _get_complete_histogram(channel, metric, payload["histogram"]) + [payload["count"]]  # Append count at the end
-        except KeyError:  # TODO: ignore expired histograms
-            continue
-
         cursor.execute("select add_buildid_metric(%s, %s, %s, %s, %s)", (channel, version, build_id, json.dumps(dimensions), histogram))
@@ -235,10 +245,17 @@ def _upsert_aggregates(aggregates, dry_run=False):
     cursor = conn.cursor()
     submission_date, channel, version, build_id = aggregates[0]

-    cursor.execute(u"select was_buildid_processed(%s, %s, %s, %s)", (channel, version, build_id, submission_date))
-    if cursor.fetchone()[0]:
-        # This aggregate has already been processed
-        return
+    while(True):
+        try:
+            cursor.execute(u"select was_buildid_processed(%s, %s, %s, %s)", (channel, version, build_id, submission_date))
+            if cursor.fetchone()[0]:
+                # This aggregate has already been processed
+                return
+            else:
+                break
+        except psycopg2.IntegrityError:  # Multiple aggregates from different submission dates might try to insert at the same time
+            conn.rollback()  # Transaction is aborted after an error
+            continue

     for aggregate in aggregates[1]:
         _upsert_aggregate(cursor, aggregate)
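The retry loop above exists because concurrent workers handling different submission dates can collide while `was_buildid_processed` records its check, aborting the transaction. A minimal standalone sketch of the same guard, assuming a psycopg2 connection and the `was_buildid_processed` SQL function defined in `_preparedb` (the helper name is illustrative, not part of the commit):

import psycopg2

def can_claim_aggregate(conn, cursor, channel, version, build_id, submission_date):
    # Illustrative helper: re-check until the claim either succeeds or
    # reports the aggregate as already processed.
    while True:
        try:
            cursor.execute("select was_buildid_processed(%s, %s, %s, %s)",
                           (channel, version, build_id, submission_date))
            return not cursor.fetchone()[0]  # False -> already processed, skip
        except psycopg2.IntegrityError:
            # A concurrent worker raced us; the transaction is aborted
            # after the error, so roll back and try again.
            conn.rollback()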

View file

@@ -1,14 +1,16 @@
-import ujson as json
 import argparse
+import ujson as json

 from flask import Flask, request, abort
-from db import create_connection
+from db import create_connection, histogram_revision_map
+from moztelemetry.histogram import Histogram
+from aggregator import scalar_histogram_labels

 app = Flask(__name__)

 def execute_query(query, params=tuple()):
-    db = create_connection(host=host)
+    db = create_connection(host_override=host)
     cursor = db.cursor()
     cursor.execute(query, params)
     return cursor.fetchall()
@@ -38,8 +40,29 @@ def get_buildid(channel, version, buildid):
     try:
         dimensions = json.dumps({k: v for k, v in request.args.iteritems()})
         result = execute_query("select * from get_buildid_metric(%s, %s, %s, %s)", (channel, version, buildid, dimensions))
-        pretty_result = map(lambda r: {"label": r[0], "histogram": r[1]}, result)
+        if not result:  # Metric not found
+            abort(404)
+
+        pretty_result = []
+        for row in result:
+            label = row[0]
+            histogram = row[1][:-1]
+            count = row[1][-1]
+
+            # Retrieve labels for histogram
+            revision = histogram_revision_map.get(channel, "nightly")  # Use nightly revision if the channel is unknown
+            try:
+                labels = Histogram(request.args["metric"], histogram, revision=revision).get_value().keys().tolist()
+            except Exception as e:
+                # Count histogram or simple measurement
+                # TODO: deal properly with those
+                labels = scalar_histogram_labels
+
+            pretty_result.append({"label": label, "histogram": dict(zip(labels, histogram)), "count": count})
+
         return json.dumps(pretty_result)
     except:
         abort(404)
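With the new response shape, each row carries a labelled histogram plus the trailing submission count split off the end of the array. A hypothetical request against a local instance (the channel, version, build id, and metric values below are made up for illustration):

import requests

resp = requests.get("http://localhost:5000/channel/nightly/buildid/41_20150601",
                    params={"metric": "GC_MS"})
for entry in resp.json():
    # Each entry: {"label": ..., "histogram": {bucket: value, ...}, "count": N}
    print entry["label"], entry["count"]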

View file

@@ -39,4 +39,9 @@ ${PGSQL_PATH}/postgres -F -k ${PGSQL_DATA} -D ${PGSQL_DATA} &> ${PGSQL_DATA}/out
 wait_for_line "database system is ready to accept connections" ${PGSQL_DATA}/out
 export DB_TEST_URL="postgresql:///?host=${PGSQL_DATA}&dbname=template1"

+# Launch db service
+mkfifo ${PGSQL_DATA}/out_service
+python ./mozaggregator/service.py -d &> ${PGSQL_DATA}/out_service &
+wait_for_line "* Restarting with reloader" ${PGSQL_DATA}/out_service
+
 nosetests ./tests/test_db.py
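The exported `DB_TEST_URL` is the hook that `create_connection` in db.py now checks first, so everything under test talks to the throwaway Postgres instance rather than RDS. A sketch of that path, with an illustrative connection string:

import os
from mozaggregator.db import create_connection

# Any libpq-style connection string works; the test harness points it
# at the local socket directory created above.
os.environ["DB_TEST_URL"] = "postgresql:///?host=/tmp/pgsql_data&dbname=template1"
conn = create_connection()  # short-circuits the S3 credentials and RDS lookup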

View file

@@ -1,27 +1,76 @@
 import pyspark
 import logging
 import psycopg2
 import os
+import requests

 from mozaggregator.aggregator import _aggregate_metrics
 from mozaggregator.db import create_connection, submit_aggregates
 from dataset import *

 def setup_module():
     global aggregates
-    global db
+    global sc

     logger = logging.getLogger("py4j")
     logger.setLevel(logging.ERROR)

     sc = pyspark.SparkContext(master="local[*]")
     raw_pings = list(generate_pings())
-    aggregates = _aggregate_metrics(sc.parallelize(raw_pings)).collect()
-    sc.stop()
+    aggregates = _aggregate_metrics(sc.parallelize(raw_pings))

-    url = os.getenv("DB_TEST_URL")
-    db = psycopg2.connect(url)
+def teardown_module():
+    sc.stop()

 def test_connection():
     db = create_connection()
     assert(db)

+SERVICE_URI = "http://localhost:5000"
+
+def test_submit():
+    count = submit_aggregates(aggregates)
+
+    n_submission_dates = len(ping_dimensions["submission_date"])
+    n_channels = len(ping_dimensions["channel"])
+    n_versions = len(ping_dimensions["version"])
+    n_build_ids = len(ping_dimensions["build_id"])
+    assert(count == n_submission_dates*n_channels*n_versions*n_build_ids)
+
+def test_channels():
+    channels = requests.get("{}/channel/".format(SERVICE_URI)).json()
+    assert(set(channels) == set(ping_dimensions["channel"]))
+
+def test_buildids():
+    template_channel = ping_dimensions["channel"]
+    template_version = ping_dimensions["version"]
+    template_build_id = ping_dimensions["build_id"]
+
+    for channel in template_channel:
+        buildids = requests.get("{}/channel/{}/buildid/".format(SERVICE_URI, channel)).json()
+        assert(len(buildids) == len(template_version)*len(template_build_id))
+
+        for buildid in buildids:
+            assert(set(buildid.keys()) == set(["buildid", "version"]))
+            assert(buildid["buildid"] in [x[:-6] for x in template_build_id])
+            assert(buildid["version"] in [x.split('.')[0] for x in template_version])
+
+def test_metric():
+    template_channel = ping_dimensions["channel"]
+    template_version = [x.split('.')[0] for x in ping_dimensions["version"]]
+    template_build_id = [x[:-6] for x in ping_dimensions["build_id"]]
+
+    for channel in template_channel:
+        for version in template_version:
+            for buildid in template_build_id:
+                for histogram in histograms_template.iteritems():
+                    metric, value = histogram
+                    res = requests.get("{}/channel/{}/buildid/{}_{}?metric={}".format(SERVICE_URI, channel, version, buildid, metric)).json()
+                    assert(len(res) == 1)
+                    print metric
+                    print res
+                    assert(False)
+                    #res[0]["count"] ==
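`test_metric` is left unfinished in this commit: the prints and `assert(False)` are debugging leftovers, and the final count assertion is only stubbed out. A hedged sketch of a shape check it could make, based on the response `get_buildid` builds above; the exact expected count value is not derivable from this diff, so only the structure is asserted:

def check_buildid_response(res):
    # res: parsed JSON from /channel/<channel>/buildid/<version>_<buildid>?metric=...
    assert(len(res) == 1)
    entry = res[0]
    assert(set(entry.keys()) == set(["label", "histogram", "count"]))
    assert(isinstance(entry["histogram"], dict))
    assert(entry["count"] >= 1)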