This commit is contained in:
Roberto Agostino Vitillo 2015-06-15 12:00:42 +01:00
Родитель d315f9b2bf
Коммит f904c7b3fb
3 изменённых файлов: 45 добавлений и 15 удалений

Просмотреть файл

@ -16,7 +16,7 @@ scalar_histogram_labels = cached_exponential_buckets(1, 30000, 50)
def aggregate_metrics(sc, channels, submission_date, fraction=1): def aggregate_metrics(sc, channels, submission_date, fraction=1):
""" Aggregates metrics over build-ids for a given submission date. """ Returns the build-id and submission date aggregates for a given submission date.
:param sc: A SparkContext instance :param sc: A SparkContext instance
:param channel: Either the name of a channel or a list/tuple of names :param channel: Either the name of a channel or a list/tuple of names
@ -34,7 +34,13 @@ def aggregate_metrics(sc, channels, submission_date, fraction=1):
def _aggregate_metrics(pings): def _aggregate_metrics(pings):
trimmed = pings.filter(_sample_clients).map(_map_ping_to_dimensions) trimmed = pings.filter(_sample_clients).map(_map_ping_to_dimensions)
return trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates) build_id_aggregates = trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates)
submission_date_aggregates = build_id_aggregates.map(_map_build_id_key_to_submission_date_key).reduceByKey(_aggregate_aggregates)
return build_id_aggregates, submission_date_aggregates
def _map_build_id_key_to_submission_date_key(aggregate):
return tuple(aggregate[0][:3] + aggregate[0][4:]), aggregate[1]
def _sample_clients(ping): def _sample_clients(ping):

Просмотреть файл

@ -9,23 +9,32 @@ from dataset import *
def setup_module(): def setup_module():
global aggregates global build_id_aggregates
global submission_date_aggregates
logger = logging.getLogger("py4j") logger = logging.getLogger("py4j")
logger.setLevel(logging.ERROR) logger.setLevel(logging.ERROR)
sc = pyspark.SparkContext(master="local[*]") sc = pyspark.SparkContext(master="local[*]")
raw_pings = list(generate_pings()) raw_pings = list(generate_pings())
aggregates = _aggregate_metrics(sc.parallelize(raw_pings)).collect() build_id_aggregates, submission_date_aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
build_id_aggregates = build_id_aggregates.collect()
submission_date_aggregates = submission_date_aggregates.collect()
# Note: most tests are based on the build-id aggregates as the aggregation
# code is the same for both scenarios.
sc.stop() sc.stop()
def test_count(): def test_count():
assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS == len(aggregates)) num_build_ids = len(ping_dimensions["build_id"])
assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS == len(build_id_aggregates))
assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS/num_build_ids == len(submission_date_aggregates))
def test_keys(): def test_keys():
for aggregate in aggregates: for aggregate in build_id_aggregates:
submission_date, channel, version, build_id, app, arch, os, os_version, e10s = aggregate[0] submission_date, channel, version, build_id, app, arch, os, os_version, e10s = aggregate[0]
assert(submission_date in ping_dimensions["submission_date"]) assert(submission_date in ping_dimensions["submission_date"])
@ -40,11 +49,25 @@ def test_keys():
else: else:
assert(os_version in ping_dimensions["os_version"]) assert(os_version in ping_dimensions["os_version"])
for aggregate in submission_date_aggregates:
submission_date, channel, version, app, arch, os, os_version, e10s = aggregate[0]
assert(submission_date in ping_dimensions["submission_date"])
assert(channel in ping_dimensions["channel"])
assert(version in [x.split('.')[0] for x in ping_dimensions["version"]])
assert(app in ping_dimensions["application"])
assert(arch in ping_dimensions["arch"])
assert(os in ping_dimensions["os"])
if os == "Linux":
assert(os_version in [x[:3] for x in ping_dimensions["os_version"]])
else:
assert(os_version in ping_dimensions["os_version"])
def test_simple_measurements(): def test_simple_measurements():
metric_count = defaultdict(int) metric_count = defaultdict(int)
for aggregate in aggregates: for aggregate in build_id_aggregates:
for key, value in aggregate[1].iteritems(): for key, value in aggregate[1].iteritems():
metric, label, child = key metric, label, child = key
@ -57,14 +80,14 @@ def test_simple_measurements():
assert len(metric_count) == len(simple_measurements_template) assert len(metric_count) == len(simple_measurements_template)
for v in metric_count.values(): for v in metric_count.values():
assert(v == len(aggregates)) assert(v == len(build_id_aggregates))
def test_classic_histograms(): def test_classic_histograms():
metric_count = defaultdict(int) metric_count = defaultdict(int)
histograms = {k: v for k, v in histograms_template.iteritems() if v["histogram_type"] != 4} histograms = {k: v for k, v in histograms_template.iteritems() if v["histogram_type"] != 4}
for aggregate in aggregates: for aggregate in build_id_aggregates:
for key, value in aggregate[1].iteritems(): for key, value in aggregate[1].iteritems():
metric, label, child = key metric, label, child = key
histogram = histograms.get(metric, None) histogram = histograms.get(metric, None)
@ -78,14 +101,14 @@ def test_classic_histograms():
assert(len(metric_count) == len(histograms)) assert(len(metric_count) == len(histograms))
for v in metric_count.values(): for v in metric_count.values():
assert(v == 2*len(aggregates)) # Count both child and parent metrics assert(v == 2*len(build_id_aggregates)) # Count both child and parent metrics
def test_count_histograms(): def test_count_histograms():
metric_count = defaultdict(int) metric_count = defaultdict(int)
histograms = {"[[SCALAR]]_{}".format(k): v for k, v in histograms_template.iteritems() if v["histogram_type"] == 4} histograms = {"[[SCALAR]]_{}".format(k): v for k, v in histograms_template.iteritems() if v["histogram_type"] == 4}
for aggregate in aggregates: for aggregate in build_id_aggregates:
for key, value in aggregate[1].iteritems(): for key, value in aggregate[1].iteritems():
metric, label, child = key metric, label, child = key
histogram = histograms.get(metric, None) histogram = histograms.get(metric, None)
@ -98,13 +121,13 @@ def test_count_histograms():
assert len(metric_count) == len(histograms) assert len(metric_count) == len(histograms)
for v in metric_count.values(): for v in metric_count.values():
assert(v == 2*len(aggregates)) # Count both child and parent metrics assert(v == 2*len(build_id_aggregates)) # Count both child and parent metrics
def test_keyed_histograms(): def test_keyed_histograms():
metric_count = defaultdict(int) metric_count = defaultdict(int)
for aggregate in aggregates: for aggregate in build_id_aggregates:
for key, value in aggregate[1].iteritems(): for key, value in aggregate[1].iteritems():
metric, label, child = key metric, label, child = key
@ -119,4 +142,4 @@ def test_keyed_histograms():
assert(len(metric_count) == len(keyed_histograms_template)) # Assume one label per keyed histogram assert(len(metric_count) == len(keyed_histograms_template)) # Assume one label per keyed histogram
for v in metric_count.values(): for v in metric_count.values():
assert(v == 2*len(aggregates)) # Count both child and parent metrics assert(v == 2*len(build_id_aggregates)) # Count both child and parent metrics

Просмотреть файл

@ -22,7 +22,8 @@ def setup_module():
sc = pyspark.SparkContext(master="local[*]") sc = pyspark.SparkContext(master="local[*]")
raw_pings = list(generate_pings()) raw_pings = list(generate_pings())
aggregates = _aggregate_metrics(sc.parallelize(raw_pings)) build_id_aggregates, submission_date_aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
aggregates = build_id_aggregates
def teardown_module(): def teardown_module():