# python_mozaggregator/mozaggregator/aggregator.py
#!/usr/bin/env python
# encoding: utf-8
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import bisect
from collections import defaultdict

from moztelemetry.dataset import Dataset
from moztelemetry.histogram import cached_exponential_buckets

from mozaggregator.bigquery import BigQueryDataset
# Simple measurement, count histogram, and numeric scalars labels & prefixes
# Exponential buckets (range 1..30000, 50 buckets) used to histogram simple measures.
SIMPLE_MEASURES_LABELS = cached_exponential_buckets(1, 30000, 50)
# Fixed ascending bucket labels used for count histograms (and numeric scalars below).
COUNT_HISTOGRAM_LABELS = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 23, 25, 27, 29, 31, 34,
    37, 40, 43, 46, 50, 54, 58, 63, 68, 74, 80, 86, 93, 101, 109, 118, 128, 138, 149, 161, 174, 188,
    203, 219, 237, 256, 277, 299, 323, 349, 377, 408, 441, 477, 516, 558, 603, 652, 705, 762, 824,
    891, 963, 1041, 1125, 1216, 1315, 1422, 1537, 1662, 1797, 1943, 2101, 2271, 2455, 2654, 2869,
    3102, 3354, 3626, 3920, 4238, 4582, 4954, 5356, 5791, 6261, 6769, 7318, 7912, 8554, 9249, 10000,
]
# Numeric scalars reuse the count-histogram bucketing.
NUMERIC_SCALARS_LABELS = COUNT_HISTOGRAM_LABELS
# Prefixes prepended to metric names so scalar-like metrics are distinguishable.
SIMPLE_MEASURES_PREFIX = 'SIMPLE_MEASURES'
COUNT_HISTOGRAM_PREFIX = '[[COUNT]]'
NUMERIC_SCALARS_PREFIX = 'SCALARS'
# Maps each scalar-like prefix to the bucket labels used to aggregate its values.
SCALAR_MEASURE_MAP = {
    SIMPLE_MEASURES_PREFIX: SIMPLE_MEASURES_LABELS,
    COUNT_HISTOGRAM_PREFIX: COUNT_HISTOGRAM_LABELS,
    NUMERIC_SCALARS_PREFIX: NUMERIC_SCALARS_LABELS
}
# Process types whose scalar sections are extracted by _extract_scalars.
PROCESS_TYPES = {"parent", "content", "gpu"}
def aggregate_metrics(
    sc,
    channels,
    submission_date,
    main_ping_fraction=1,
    num_reducers=10000,
    source="moztelemetry",
    project_id=None,
    dataset_id=None,
    avro_prefix=None,
):
    """Return the build-id and submission-date aggregates for a given submission date.

    :param sc: A SparkContext instance
    :param channels: Either the name of a channel or a list/tuple of names
    :param submission_date: The submission date for which the data will be aggregated
    :param main_ping_fraction: An approximative fraction of "main" pings to consider
        for aggregation (only applied to the default "moztelemetry" source)
    :param num_reducers: Number of partitions used when aggregating by key
    :param source: "bigquery", "avro", or the default "moztelemetry"
    :param project_id: BigQuery project id (required when source == "bigquery")
    :param dataset_id: BigQuery dataset id (required when source == "bigquery")
    :param avro_prefix: Path prefix of the Avro files (required when source == "avro")
    :return: A pair (build_id_aggregates, submission_date_aggregates)
    """
    if not isinstance(channels, (tuple, list)):
        channels = [channels]
    if source == "bigquery" and project_id and dataset_id:
        dataset = BigQueryDataset()
        # Fennec pings are excluded from desktop aggregation in every source.
        pings = dataset.load(
            project_id,
            dataset_id,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    elif source == "avro" and avro_prefix:
        dataset = BigQueryDataset()
        pings = dataset.load_avro(
            avro_prefix,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    else:
        pings = Dataset.from_source('telemetry') \
            .where(appUpdateChannel=lambda x: x in channels,
                   submissionDate=submission_date,
                   docType='main',
                   sourceVersion='4',
                   appName=lambda x: x != 'Fennec') \
            .records(sc, sample=main_ping_fraction)
    return _aggregate_metrics(pings, num_reducers)
def _aggregate_metrics(pings, num_reducers=10000):
    """Build the per-build-id and per-submission-date aggregate RDDs from raw pings."""
    keyed = (pings
             .filter(_sample_clients)
             .map(_map_ping_to_dimensions)
             .filter(lambda dims: dims))
    by_build_id = keyed.aggregateByKey(
        defaultdict(dict), _aggregate_ping, _aggregate_aggregates, num_reducers)
    by_submission_date = (by_build_id
                          .map(_map_build_id_key_to_submission_date_key)
                          .reduceByKey(_aggregate_aggregates))
    return by_build_id, by_submission_date
def _map_build_id_key_to_submission_date_key(aggregate):
return tuple(aggregate[0][:3] + aggregate[0][4:]), aggregate[1]
def _sample_clients(ping):
try:
sample_id = ping.get("meta", {}).get("sampleId")
if not isinstance(sample_id, (int, float)):
return False
# Here "aurora" is actually the dev edition.
if ping.get("application", {}).get("channel") not in ("nightly", "aurora", "beta", "release"):
return False
return sample_id < 100
except: # noqa
return False
def _extract_histograms(state, payload, process_type="parent"):
    """Fold the plain and keyed histograms of one payload section into *state*."""
    if not isinstance(payload, dict):
        return
    _extract_main_histograms(state, payload.get("histograms", {}), process_type)
    keyed = payload.get("keyedHistograms", {})
    if not isinstance(keyed, dict):
        return
    # See Bug 1275010 and 1275019
    skipped = ("MESSAGE_MANAGER_MESSAGE_SIZE",
               "VIDEO_DETAILED_DROPPED_FRAMES_PROPORTION")
    for name, histograms in keyed.items():
        if name in skipped:
            continue
        _extract_keyed_histograms(state, name, histograms, process_type)
def _extract_histogram(state, histogram, histogram_name, label, process_type):
if not isinstance(histogram, dict):
return
values = histogram.get("values")
if not isinstance(values, dict):
return
sum = histogram.get("sum")
if not isinstance(sum, int) or sum < 0:
return
histogram_type = histogram.get("histogram_type")
if not isinstance(histogram_type, int):
return
if histogram_type == 4: # Count histogram
return _extract_scalar_value(
state, '_'.join((COUNT_HISTOGRAM_PREFIX, histogram_name)), label,
sum, COUNT_HISTOGRAM_LABELS, process_type=process_type)
# Note that some dimensions don't vary within a single submissions
# (e.g. channel) while some do (e.g. process type).
# The latter should appear within the key of a single metric.
accessor = (histogram_name, label, process_type)
aggregated_histogram = state[accessor]["histogram"] = state[accessor].get("histogram", {})
state[accessor]["sum"] = state[accessor].get("sum", 0) + sum
state[accessor]["count"] = state[accessor].get("count", 0) + 1
for k, v in values.items():
try:
int(k)
except ValueError:
# We have seen some histograms with non-integer bucket keys.
continue
v = v if isinstance(v, int) else 0
aggregated_histogram[k] = aggregated_histogram.get(k, 0) + v
def _extract_main_histograms(state, histograms, process_type):
    """Fold every entry of a plain histograms dict into *state* (empty label)."""
    if isinstance(histograms, dict):
        for name, histogram in histograms.items():
            _extract_histogram(state, histogram, name, "", process_type)
def _extract_keyed_histograms(state, histogram_name, histograms, process_type):
    """Fold every entry of a keyed histogram into *state*, using its key as label."""
    if isinstance(histograms, dict):
        for label, histogram in histograms.items():
            _extract_histogram(state, histogram, histogram_name, label, process_type)
def _extract_simple_measures(state, simple, process_type="parent"):
    """Fold simpleMeasurements (flat or one level nested) into *state* as scalars."""
    if not isinstance(simple, dict):
        return
    for name, value in simple.items():
        if isinstance(value, dict):
            # One level of nesting: FOO: {bar: 1} becomes SIMPLE_MEASURES_FOO_BAR.
            for sub_name, sub_value in value.items():
                if not isinstance(sub_value, (int, float)):
                    continue
                metric = "_".join((SIMPLE_MEASURES_PREFIX, name.upper(), sub_name.upper()))
                _extract_scalar_value(
                    state, metric, "", sub_value, SIMPLE_MEASURES_LABELS, process_type)
        elif isinstance(value, (int, float)):
            metric = "_".join((SIMPLE_MEASURES_PREFIX, name.upper()))
            _extract_scalar_value(
                state, metric, "", value, SIMPLE_MEASURES_LABELS, process_type)
def _extract_scalars(state, process_payloads):
    """Fold the scalar and keyed-scalar sections of every known process into *state*."""
    for process in PROCESS_TYPES:
        section = process_payloads.get(process, {})
        _extract_numeric_scalars(state, section.get("scalars", {}), process)
        _extract_keyed_numeric_scalars(state, section.get("keyedScalars", {}), process)
def _extract_numeric_scalars(state, scalar_dict, process):
    """Fold numeric scalars into *state*, skipping non-numeric values and navigation scalars."""
    if not isinstance(scalar_dict, dict):
        return
    for name, value in scalar_dict.items():
        if not isinstance(value, (int, float)):
            continue
        if name.startswith("browser.engagement.navigation"):
            continue
        metric = "_".join((NUMERIC_SCALARS_PREFIX, name.upper()))
        _extract_scalar_value(state, metric, "", value, NUMERIC_SCALARS_LABELS, process)
def _extract_keyed_numeric_scalars(state, scalar_dict, process):
    """Fold keyed numeric scalars into *state*; the upper-cased key becomes the label."""
    if not isinstance(scalar_dict, dict):
        return
    for name, keyed_values in scalar_dict.items():
        if not isinstance(keyed_values, dict):
            continue
        if name.startswith("browser.engagement.navigation"):
            continue
        metric = "_".join((NUMERIC_SCALARS_PREFIX, name.upper()))
        for key, value in keyed_values.items():
            if isinstance(value, (int, float)):
                _extract_scalar_value(
                    state, metric, key.upper(), value, NUMERIC_SCALARS_LABELS, process)
def _extract_scalar_value(state, name, label, value, bucket_labels, process_type="parent"):
if value < 0: # Afaik we are collecting only positive values
return
accessor = (name, label, process_type)
aggregated_histogram = state[accessor]["histogram"] = state[accessor].get("histogram", {})
state[accessor]["sum"] = state[accessor].get("sum", 0) + value
state[accessor]["count"] = state[accessor].get("count", 0) + 1
insert_bucket = bucket_labels[0] # Initialized to underflow bucket
for bucket in reversed(bucket_labels):
if value >= bucket:
insert_bucket = bucket
break
aggregated_histogram[str(insert_bucket)] = aggregated_histogram.get(str(insert_bucket), 0) + 1
def _extract_child_payloads(state, child_payloads):
    """Fold the histograms and simple measures of each child payload in as "content"."""
    if isinstance(child_payloads, (list, tuple)):
        for payload in child_payloads:
            _extract_histograms(state, payload, "content")
            _extract_simple_measures(state, payload.get("simpleMeasurements", {}), "content")
def _aggregate_ping(state, ping):
    """Fold every metric section of one main ping into *state* and return it."""
    if not isinstance(ping, dict):
        return
    payload = ping.get("payload", {})
    processes = payload.get("processes", {})
    _extract_scalars(state, processes)
    _extract_histograms(state, payload)
    _extract_simple_measures(state, payload.get("simpleMeasurements", {}))
    _extract_child_payloads(state, payload.get("childPayloads", {}))
    _extract_histograms(state, processes.get("content", {}), "content")
    _extract_histograms(state, processes.get("gpu", {}), "gpu")
    return state
def _aggregate_aggregates(agg1, agg2):
for metric, payload in agg2.items():
if metric not in agg1:
agg1[metric] = payload
continue
agg1[metric]["count"] += payload["count"]
agg1[metric]["sum"] += payload["sum"]
for k, v in payload["histogram"].items():
agg1[metric]["histogram"][k] = agg1[metric]["histogram"].get(k, 0) + v
return agg1
def _trim_payload(payload):
return {k: v for k, v in payload.items()
if k in ["histograms", "keyedHistograms", "simpleMeasurements", "processes"]}
def _map_ping_to_dimensions(ping):
    """Map a ping to ((submission_date, channel, version, build_id, application,
    architecture, os, os_version), trimmed payload), or None when malformed."""
    try:
        application = ping["application"]
        system_os = ping["environment"]["system"]["os"]
        submission_date = ping["meta"]["submissionDate"]
        channel = application["channel"]
        version = application["version"].split('.')[0]
        build_id = application["buildId"][:8]
        app_name = application["name"]
        architecture = application["architecture"]
        os = system_os["name"]
        os_version = system_os["version"]
        if os == "Linux":
            os_version = str(os_version)[:3]
        # Reject pings whose build id isn't purely numeric.
        try:
            int(build_id)
        except ValueError:
            return None
        subset = {"payload": _trim_payload(ping["payload"])}
        subset["payload"]["childPayloads"] = [
            _trim_payload(child) for child in ping["payload"].get("childPayloads", [])
        ]
        # Note that some dimensions don't vary within a single submissions
        # (e.g. channel) while some do (e.g. process type).
        # Dimensions that don't vary should appear in the submission key, while
        # the ones that do vary should appear within the key of a single metric.
        return ((submission_date, channel, version, build_id, app_name,
                 architecture, os, os_version), subset)
    except:  # noqa
        return None