diff --git a/mozaggregator/aggregator.py b/mozaggregator/aggregator.py
index 6161160..6185665 100644
--- a/mozaggregator/aggregator.py
+++ b/mozaggregator/aggregator.py
@@ -16,7 +16,7 @@ scalar_histogram_labels = cached_exponential_buckets(1, 30000, 50)
 
 
 def aggregate_metrics(sc, channels, submission_date, fraction=1):
-    """ Aggregates metrics over build-ids for a given submission date.
+    """ Returns the build-id and submission date aggregates for a given submission date.
 
     :param sc: A SparkContext instance
     :param channel: Either the name of a channel or a list/tuple of names
@@ -34,7 +34,13 @@ def aggregate_metrics(sc, channels, submission_date, fraction=1):
 
 def _aggregate_metrics(pings):
     trimmed = pings.filter(_sample_clients).map(_map_ping_to_dimensions)
-    return trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates)
+    build_id_aggregates = trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates)
+    submission_date_aggregates = build_id_aggregates.map(_map_build_id_key_to_submission_date_key).reduceByKey(_aggregate_aggregates)
+    return build_id_aggregates, submission_date_aggregates
+
+
+def _map_build_id_key_to_submission_date_key(aggregate):
+    return tuple(aggregate[0][:3] + aggregate[0][4:]), aggregate[1]
 
 
 def _sample_clients(ping):
diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py
index 64a5df4..9c0dc2c 100644
--- a/tests/test_aggregator.py
+++ b/tests/test_aggregator.py
@@ -9,23 +9,32 @@ from dataset import *
 
 
 def setup_module():
-    global aggregates
+    global build_id_aggregates
+    global submission_date_aggregates
 
     logger = logging.getLogger("py4j")
     logger.setLevel(logging.ERROR)
 
     sc = pyspark.SparkContext(master="local[*]")
     raw_pings = list(generate_pings())
-    aggregates = _aggregate_metrics(sc.parallelize(raw_pings)).collect()
+    build_id_aggregates, submission_date_aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
+    build_id_aggregates = build_id_aggregates.collect()
+    submission_date_aggregates = submission_date_aggregates.collect()
+
+    # Note: most tests are based on the build-id aggregates as the aggregation
+    # code is the same for both scenarios.
+    sc.stop()
 
 
 def test_count():
-    assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS == len(aggregates))
+    num_build_ids = len(ping_dimensions["build_id"])
+    assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS == len(build_id_aggregates))
+    assert(len(list(generate_pings()))/NUM_PINGS_PER_DIMENSIONS/num_build_ids == len(submission_date_aggregates))
 
 
 def test_keys():
-    for aggregate in aggregates:
+    for aggregate in build_id_aggregates:
         submission_date, channel, version, build_id, app, arch, os, os_version, e10s = aggregate[0]
 
         assert(submission_date in ping_dimensions["submission_date"])
@@ -40,11 +49,25 @@ def test_keys():
         else:
             assert(os_version in ping_dimensions["os_version"])
 
+    for aggregate in submission_date_aggregates:
+        submission_date, channel, version, app, arch, os, os_version, e10s = aggregate[0]
+
+        assert(submission_date in ping_dimensions["submission_date"])
+        assert(channel in ping_dimensions["channel"])
+        assert(version in [x.split('.')[0] for x in ping_dimensions["version"]])
+        assert(app in ping_dimensions["application"])
+        assert(arch in ping_dimensions["arch"])
+        assert(os in ping_dimensions["os"])
+        if os == "Linux":
+            assert(os_version in [x[:3] for x in ping_dimensions["os_version"]])
+        else:
+            assert(os_version in ping_dimensions["os_version"])
+
 
 def test_simple_measurements():
     metric_count = defaultdict(int)
 
-    for aggregate in aggregates:
+    for aggregate in build_id_aggregates:
         for key, value in aggregate[1].iteritems():
             metric, label, child = key
 
@@ -57,14 +80,14 @@ def test_simple_measurements():
     assert len(metric_count) == len(simple_measurements_template)
 
     for v in metric_count.values():
-        assert(v == len(aggregates))
+        assert(v == len(build_id_aggregates))
 
 
 def test_classic_histograms():
     metric_count = defaultdict(int)
     histograms = {k: v for k, v in histograms_template.iteritems() if v["histogram_type"] != 4}
 
-    for aggregate in aggregates:
+    for aggregate in build_id_aggregates:
         for key, value in aggregate[1].iteritems():
             metric, label, child = key
             histogram = histograms.get(metric, None)
@@ -78,14 +101,14 @@ def test_classic_histograms():
     assert(len(metric_count) == len(histograms))
 
     for v in metric_count.values():
-        assert(v == 2*len(aggregates))  # Count both child and parent metrics
+        assert(v == 2*len(build_id_aggregates))  # Count both child and parent metrics
 
 
 def test_count_histograms():
     metric_count = defaultdict(int)
     histograms = {"[[SCALAR]]_{}".format(k): v for k, v in histograms_template.iteritems() if v["histogram_type"] == 4}
 
-    for aggregate in aggregates:
+    for aggregate in build_id_aggregates:
         for key, value in aggregate[1].iteritems():
             metric, label, child = key
             histogram = histograms.get(metric, None)
@@ -98,13 +121,13 @@ def test_count_histograms():
     assert len(metric_count) == len(histograms)
 
     for v in metric_count.values():
-        assert(v == 2*len(aggregates))  # Count both child and parent metrics
+        assert(v == 2*len(build_id_aggregates))  # Count both child and parent metrics
 
 
 def test_keyed_histograms():
     metric_count = defaultdict(int)
 
-    for aggregate in aggregates:
+    for aggregate in build_id_aggregates:
         for key, value in aggregate[1].iteritems():
             metric, label, child = key
 
@@ -119,4 +142,4 @@ def test_keyed_histograms():
     assert(len(metric_count) == len(keyed_histograms_template))  # Assume one label per keyed histogram
 
     for v in metric_count.values():
-        assert(v == 2*len(aggregates))  # Count both child and parent metrics
+        assert(v == 2*len(build_id_aggregates))  # Count both child and parent metrics
diff --git a/tests/test_db.py b/tests/test_db.py
index 6522700..7840419 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -22,7 +22,8 @@ def setup_module():
 
     sc = pyspark.SparkContext(master="local[*]")
     raw_pings = list(generate_pings())
-    aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
+    build_id_aggregates, submission_date_aggregates = _aggregate_metrics(sc.parallelize(raw_pings))
+    aggregates = build_id_aggregates
 
 
 def teardown_module():
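
For illustration, a minimal, self-contained sketch of what the new key mapping does to a single build-id aggregate. The function body is copied from the patch; the dimension values are made up for the example and are not taken from the test dataset:

from collections import defaultdict

def _map_build_id_key_to_submission_date_key(aggregate):
    # As in the patch: drop the build_id dimension (index 3) from the
    # 9-tuple key and keep the metrics payload unchanged.
    return tuple(aggregate[0][:3] + aggregate[0][4:]), aggregate[1]

# Hypothetical build-id aggregate; the key layout matches the unpacking
# in test_keys above.
key = ("20150603", "nightly", "41", "20150601000000",
       "Firefox", "x86", "Linux", "3.15", True)
aggregate = (key, defaultdict(dict))  # per-metric payload elided

new_key, payload = _map_build_id_key_to_submission_date_key(aggregate)
assert new_key == ("20150603", "nightly", "41",
                   "Firefox", "x86", "Linux", "3.15", True)

Build-id aggregates that collide on this reduced key are merged by reduceByKey with _aggregate_aggregates, which is why test_count expects the number of submission-date aggregates to equal the number of build-id aggregates divided by num_build_ids.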