Add some tests for the Spark aggregation job.
Parent: 97aa4520f1
Commit: 5cad4a86a6
@@ -11,7 +11,7 @@ from moztelemetry.spark import get_pings
from moztelemetry.histogram import cached_exponential_buckets
from collections import defaultdict


# Simple measurement histogram
_exponential_index = cached_exponential_buckets(1, 30000, 50)

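Note: `cached_exponential_buckets(1, 30000, 50)` supplies the fixed lower bounds of a 50-bucket exponential histogram over [1, 30000]. As a minimal sketch of how Telemetry-style exponential buckets are typically computed (an assumption about the helper's internals, not taken from this commit):

import math

def exponential_buckets(dmin, dmax, n_buckets):
    # Assumed sketch: log-spaced lower bounds, bucket 0 reserved for underflow.
    log_max = math.log(dmax)
    buckets = [0]
    current = dmin
    for i in range(1, n_buckets):
        buckets.append(current)
        log_current = math.log(current)
        # Spread the remaining log-range evenly over the remaining buckets.
        log_next = log_current + (log_max - log_current) / (n_buckets - i)
        next_value = int(math.floor(math.exp(log_next) + 0.5))
        current = next_value if next_value > current else current + 1
    return buckets
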
@@ -29,17 +29,29 @@ def aggregate_metrics(sc, channels, submission_date, fraction=1):
     channels = set(channels)
     rdds = [get_pings(sc, channel=ch, submission_date=submission_date, doc_type="saved_session", schema="v4", fraction=fraction) for ch in channels]
     pings = reduce(lambda x, y: x.union(y), rdds)
     return _aggregate_metrics(pings)


 def _aggregate_metrics(pings):
     trimmed = pings.filter(_sample_clients).map(_map_ping_to_dimensions)
     return trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates)


 def _sample_clients(ping):
     client_id = ping.get("clientId", None)

     if not client_id:
         return False

     channel = ping["application"]["channel"]
     percentage = {"nightly": 100,
                   "aurora": 100,
-                  "beta": 100}
+                  "beta": 100,
+                  "release": 100}

     if channel not in percentage:
         return False

     return client_id and ((binascii.crc32(client_id) % 100) < percentage[channel])

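Note: the sampling in `_sample_clients` is deterministic. CRC32 maps each clientId to a stable bucket in [0, 100), and a client is kept exactly when its bucket falls below the channel's sampling percentage, so reruns keep the same client population. A self-contained sketch (the function name is illustrative, not from the diff):

import binascii

def keep_client(client_id, sample_percentage):
    # CRC32 is stable across runs; Python's % returns a value in [0, 100)
    # here even where crc32() is negative (as on Python 2).
    return (binascii.crc32(client_id) % 100) < sample_percentage

# A channel at 100 keeps every client; at e.g. 10 it keeps a stable ~10% subset.
assert keep_client(b"29a3a667-1d22-4ac2-ac0a-a6be2072e09e", 100)
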
@@ -93,19 +105,20 @@ def _extract_simple_measures(state, simple):

 def _extract_simple_measure(state, name, value):
     accessor = (name, u"", False)
-    aggregated_histogram = state[accessor]["histogram"] = {}
-    state[accessor]["count"] = 1
-    bucket_found = False
+    aggregated_histogram = state[accessor]["histogram"] = state[accessor].get("histogram", {})
+    state[accessor]["count"] = state[accessor].get("count", 0) + 1
+
+    if not aggregated_histogram:  # First time initialization
+        for bucket in _exponential_index:
+            aggregated_histogram[unicode(bucket)] = 0

     insert_bucket = _exponential_index[0]  # Initialized to underflow bucket

     for bucket in reversed(_exponential_index):
-        if not bucket_found and value >= bucket:
+        if value >= bucket:
             insert_bucket = bucket
-            bucket_found = True
-        else:
-            aggregated_histogram[bucket] = 0
+            break

-    aggregated_histogram[insert_bucket] = 1
+    aggregated_histogram[unicode(insert_bucket)] += 1


 def _extract_children_histograms(state, payload):

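Note: the rewritten loop scans the bucket lower bounds from the top and stops at the first bound the value reaches, defaulting to the underflow bucket. The lookup is equivalent to a binary search over the ascending bounds; a sketch with `bisect` (the short bucket list is a stand-in, not the real index):

from bisect import bisect_right

def find_bucket(buckets, value):
    # buckets holds ascending lower bounds; anything below buckets[0]
    # lands in the underflow bucket, as in the loop above.
    index = bisect_right(buckets, value) - 1
    return buckets[max(index, 0)]

buckets = [0, 1, 2, 5, 30000]  # stand-in for _exponential_index
assert find_bucket(buckets, -1) == 0          # underflow
assert find_bucket(buckets, 3) == 2           # 2 <= 3 < 5
assert find_bucket(buckets, 99999) == 30000   # clamps to the last bound
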
@@ -0,0 +1,209 @@
import pyspark
import uuid
import logging
import pandas as pd

from collections import defaultdict
from mozaggregator.aggregator import _aggregate_metrics
from moztelemetry.histogram import cached_exponential_buckets

NUM_CHILDREN_PER_PING = 3
NUM_PINGS_PER_DIMENSIONS = 2

ping_dimensions = {"submission_date": [u"20150601", u"20150602"],
                   "channel": [u"nightly", u"aurora", u"beta", u"release"],
                   "version": [u"40.0a1", u"41"],
                   "build_id": [u"20150601000000", u"20150602000000"],
                   "application": [u"Firefox", u"Fennec"],
                   "arch": [u"x86", u"x86-64"],
                   "revision": [u"https://hg.mozilla.org/mozilla-central/rev/ac277e615f8f",
                                u"https://hg.mozilla.org/mozilla-central/rev/bc277e615f9f"],
                   "os": [u"Linux", u"Windows_NT"],
                   "os_version": [u"6.1", u"3.1.12"]}

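Note: the expected sizes in the tests below follow directly from the list lengths above:

# 2 * 4 * 2 * 2 * 2 * 2 * 2 * 2 * 2 = 1024 distinct dimension combinations;
# with NUM_PINGS_PER_DIMENSIONS = 2 the generator yields 2048 pings, which
# collapse back into 1024 aggregates (see test_count below).
num_combinations = 1
for values in ping_dimensions.values():
    num_combinations *= len(values)
assert num_combinations == 1024
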
histograms_template = {u"EVENTLOOP_UI_LAG_EXP_MS": {u'bucket_count': 20,
                                                    u'histogram_type': 0,
                                                    u'log_sum': 0,
                                                    u'log_sum_squares': 0,
                                                    u'range': [50, 60000],
                                                    u'sum': 9362,
                                                    u'values': {u'0': 0,
                                                                u'110': 16,
                                                                u'1170': 0,
                                                                u'163': 8,
                                                                u'242': 5,
                                                                u'359': 2,
                                                                u'50': 18,
                                                                u'74': 16,
                                                                u'789': 1}}}

keyed_histograms_template = {u'BLOCKED_ON_PLUGIN_INSTANCE_DESTROY_MS':
                             {u'Shockwave Flash17.0.0.188': {u'bucket_count': 20,
                                                             u'histogram_type': 0,
                                                             u'log_sum': 696.68039953709,
                                                             u'log_sum_squares': 3202.8813306447,
                                                             u'range': [1, 10000],
                                                             u'sum': 19568,
                                                             u'values': {u'103': 78,
                                                                         u'13': 4,
                                                                         u'1306': 0,
                                                                         u'171': 12,
                                                                         u'2': 0,
                                                                         u'22': 2,
                                                                         u'284': 6,
                                                                         u'3': 8,
                                                                         u'37': 4,
                                                                         u'472': 1,
                                                                         u'5': 22,
                                                                         u'62': 21,
                                                                         u'785': 1,
                                                                         u'8': 8}}}}

simple_measurements_template = {"uptime": 42, "addonManager": {u'XPIDB_parseDB_MS': 42}}

def generate_pings():
    for submission_date in ping_dimensions["submission_date"]:
        for channel in ping_dimensions["channel"]:
            for version in ping_dimensions["version"]:
                for build_id in ping_dimensions["build_id"]:
                    for application in ping_dimensions["application"]:
                        for arch in ping_dimensions["arch"]:
                            for revision in ping_dimensions["revision"]:
                                for os in ping_dimensions["os"]:
                                    for os_version in ping_dimensions["os_version"]:
                                        for i in range(NUM_PINGS_PER_DIMENSIONS):
                                            dimensions = {u"submission_date": submission_date,
                                                          u"channel": channel,
                                                          u"version": version,
                                                          u"build_id": build_id,
                                                          u"application": application,
                                                          u"arch": arch,
                                                          u"revision": revision,
                                                          u"os": os,
                                                          u"os_version": os_version}
                                            yield generate_payload(dimensions)

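Note: the nine nested loops simply enumerate the Cartesian product of all dimension values. An equivalent, flatter formulation with `itertools.product` (a sketch; `generate_pings_flat` is a hypothetical name, not part of this commit):

import itertools

def generate_pings_flat():
    # Fix a key order so each product tuple can be zipped back into a dict.
    names = sorted(ping_dimensions.keys())
    for combination in itertools.product(*(ping_dimensions[n] for n in names)):
        dimensions = dict(zip(names, combination))
        for _ in range(NUM_PINGS_PER_DIMENSIONS):
            yield generate_payload(dimensions)
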
def generate_payload(dimensions):
    meta = {u"submissionDate": dimensions["submission_date"]}
    application = {u"channel": dimensions["channel"],
                   u"version": dimensions["version"],
                   u"buildId": dimensions["build_id"],
                   u"name": dimensions["application"],
                   u"architecture": dimensions["arch"]}

    child_payloads = [{"histograms": histograms_template,
                       "keyedHistograms": keyed_histograms_template}
                      for i in range(NUM_CHILDREN_PER_PING)]

    payload = {u"info": {u"revision": dimensions["revision"]},
               u"simpleMeasurements": simple_measurements_template,
               u"histograms": histograms_template,
               u"keyedHistograms": keyed_histograms_template,
               u"childPayloads": child_payloads}
    environment = {u"system": {u"os": {u"name": dimensions["os"],
                                       u"version": dimensions["os_version"]}}}

    return {u"clientId": str(uuid.uuid4()),
            u"meta": meta,
            u"application": application,
            u"payload": payload,
            u"environment": environment}

def setup_module():
    global aggregates

    logger = logging.getLogger("py4j")
    logger.setLevel(logging.ERROR)

    sc = pyspark.SparkContext(master="local[*]")
    raw_pings = list(generate_pings())
    aggregates = _aggregate_metrics(sc.parallelize(raw_pings)).collect()
    sc.stop()

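Note: for readability, this is the shape of one collected aggregate that the assertions below unpack. The concrete metric name and histogram contents are inferred from the tests, not spelled out in this commit:

# (dimension key, {(metric, label, child): aggregate}) -- illustrative only.
example_aggregate = (
    (u"20150601", u"nightly", u"40", u"20150601", u"Firefox", u"x86",
     u"ac277e615f8f", u"Linux", u"6.1"),
    {(u"SIMPLE_MEASURES_UPTIME", u"", False):   # hypothetical metric name
     {"count": 2, "histogram": {u"35": 2}}})    # histogram truncated for brevity
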
def test_count():
    # Two pings per dimension combination collapse into one aggregate each.
    assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS == len(aggregates))

def test_keys():
    for aggregate in aggregates:
        submission_date, channel, version, build_id, app, arch, revision, os, os_version = aggregate[0]

        assert(submission_date in ping_dimensions["submission_date"])
        assert(channel in ping_dimensions["channel"])
        assert(version in [x.split('.')[0] for x in ping_dimensions["version"]])
        assert(build_id in [x[:8] for x in ping_dimensions["build_id"]])
        assert(app in ping_dimensions["application"])
        assert(arch in ping_dimensions["arch"])
        assert(revision in [x[-12:] for x in ping_dimensions["revision"]])
        assert(os in ping_dimensions["os"])
        if os == "Linux":
            assert(os_version in [x[:3] for x in ping_dimensions["os_version"]])
        else:
            assert(os_version in ping_dimensions["os_version"])

def test_simple_measurements():
    metric_count = defaultdict(int)
    exponential_index = set([unicode(x) for x in cached_exponential_buckets(1, 30000, 50)])

    for aggregate in aggregates:
        for key, value in aggregate[1].iteritems():
            metric, label, child = key

            if metric.startswith("SIMPLE_MEASURES"):
                metric_count[metric] += 1
                assert(label == "")
                assert(child is False)
                assert(value["count"] == NUM_PINGS_PER_DIMENSIONS)
                assert(set(value["histogram"].keys()) == exponential_index)
                # The template value 42 lands in the bucket with lower bound 35.
                assert(value["histogram"]["35"] == NUM_PINGS_PER_DIMENSIONS)

    assert len(metric_count) == len(simple_measurements_template)
    for v in metric_count.values():
        assert(v == len(aggregates))

def test_classic_histograms():
    metric_count = defaultdict(int)

    for aggregate in aggregates:
        for key, value in aggregate[1].iteritems():
            metric, label, child = key

            if metric in histograms_template.keys():
                metric_count[metric] += 1
                assert(label == "")
                assert(value["count"] == NUM_PINGS_PER_DIMENSIONS*(NUM_CHILDREN_PER_PING if child else 1))

                histogram_template = histograms_template[metric]["values"]
                assert(set(histogram_template.keys()) == set(value["histogram"].keys()))
                assert((pd.Series(histogram_template)*value["count"] == pd.Series(value["histogram"])).all())

    assert(len(metric_count) == len(histograms_template))
    for v in metric_count.values():
        assert(v == 2*len(aggregates))  # Count both child and parent metrics

def test_keyed_histograms():
    metric_count = defaultdict(int)

    for aggregate in aggregates:
        for key, value in aggregate[1].iteritems():
            metric, label, child = key

            if metric in keyed_histograms_template.keys():
                metric_count["{}_{}".format(metric, label)] += 1
                assert(label != "")
                assert(value["count"] == NUM_PINGS_PER_DIMENSIONS*(NUM_CHILDREN_PER_PING if child else 1))

                histogram_template = keyed_histograms_template[metric][label]["values"]
                assert(set(histogram_template.keys()) == set(value["histogram"].keys()))
                assert((pd.Series(histogram_template)*value["count"] == pd.Series(value["histogram"])).all())

    assert(len(metric_count) == len(keyed_histograms_template))  # Assume one label per keyed histogram
    for v in metric_count.values():
        assert(v == 2*len(aggregates))  # Count both child and parent metrics