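"""Aggregate Telemetry main pings into build-id and submission-date keyed
histogram aggregates, and write them out as a partitioned Parquet dataset.

Pings can be read from the legacy moztelemetry Dataset API, from BigQuery,
or from an Avro export (see `aggregate_metrics`).
"""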
import datetime
from collections import defaultdict
from os import environ

import pandas as pd
from pyspark.sql.types import (
    LongType, MapType, StringType, StructField, StructType)

from mozaggregator.aggregator import (
    SCALAR_MEASURE_MAP, _aggregate_aggregates, _aggregate_ping,
    _sample_clients, _trim_payload)
from mozaggregator.bigquery import BigQueryDataset
from mozaggregator.db import histogram_revision_map
from moztelemetry.dataset import Dataset
from moztelemetry.histogram import Histogram


PATH_BUCKET = environ.get('bucket', 'telemetry-parquet')
PATH_PREFIX = 'aggregates_poc'
PATH_VERSION = 'v1'

DEFAULT_PATH = 's3://{bucket}/{prefix}/{version}'.format(
    bucket=PATH_BUCKET, prefix=PATH_PREFIX, version=PATH_VERSION
)

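# Schema of one output row: a single (metric, key, process_type) aggregate
# for one combination of dimensions and one aggregation period.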
SCHEMA = StructType([
    StructField('period', StringType(), False),
    StructField('aggregate_type', StringType(), False),
    StructField('submission_date', StringType(), False),
    StructField('channel', StringType(), False),
    StructField('version', StringType(), False),
    StructField('build_id', StringType(), True),
    StructField('application', StringType(), False),
    StructField('architecture', StringType(), False),
    StructField('os', StringType(), False),
    StructField('os_version', StringType(), False),
    StructField('metric', StringType(), False),
    StructField('key', StringType(), True),
    StructField('process_type', StringType(), False),
    StructField('count', LongType(), False),
    StructField('sum', LongType(), False),
    StructField('histogram', MapType(StringType(), LongType(), False), False),
])

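# Maximum build-id age, in days, for a ping to be aggregated; pings with
# older builds are dropped in _map_ping_to_dimensions(). Channels not listed
# here fall back to BUILD_ID_CUTOFF_UNKNOWN.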
BUILD_ID_CUTOFF_UNKNOWN = 45
BUILD_ID_CUTOFFS = {
    'release': 84,
    'esr': 84,
    'beta': 30,
    'aurora': 30,
    'nightly': 10,
}


def write_aggregates(sc, aggregates, path=DEFAULT_PATH, mode="append"):
    """Write a (build-id, submission-date) aggregate pair as Parquet.

    `aggregates` is the pair returned by `aggregate_metrics`. Note that,
    despite its name, `sc` must provide `createDataFrame`, i.e. it must be
    a SparkSession (or SQLContext) rather than a bare SparkContext.
    """
    build_id_agg = aggregates[0].flatMap(lambda row: _explode(row, 'build_id'))
    submission_date_agg = aggregates[1].flatMap(
        lambda row: _explode(row, 'submission_date'))
    df = sc.createDataFrame(build_id_agg, SCHEMA)
    df = df.union(sc.createDataFrame(submission_date_agg, SCHEMA))

    (df.repartition('metric', 'aggregate_type', 'period')
       .sortWithinPartitions(['channel', 'version', 'submission_date'])
       .write
       .partitionBy('metric', 'aggregate_type', 'period')
       .parquet(path, mode=mode))


def _explode(row, aggregate_type):
    """Flatten one aggregate row into per-metric tuples matching SCHEMA."""
    dimensions, metrics = row

    # dimensions[3] is the build-id and dimensions[0] the submission date
    # (see _map_ping_to_dimensions); the period comes from whichever date
    # drives this aggregate type.
    period = _period(dimensions[3] if aggregate_type == 'build_id' else dimensions[0])

    for k, v in metrics.items():
        try:
            histogram = _get_complete_histogram(dimensions[1], k[0], v['histogram'])
        except KeyError:
            # Skip metrics whose histogram definition cannot be resolved.
            continue
        yield (period, aggregate_type,) + dimensions + k + (v['count'], v['sum'], histogram)


def _period(date_str):
    """
    Return the month period ("YYYYMM") of a "YYYYMMDD" date string.

    Note: Make sure the return value sorts as expected as a string, as queries
    against this will likely use `BETWEEN` or other comparisons.
    """
    return date_str[:6]


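# Submitted histograms are sparse (only populated buckets are present);
# the output histograms are re-materialized so that every bucket defined
# for the metric carries a value, zero-filled where nothing was recorded.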
def _get_complete_histogram(channel, metric, values):
    revision = histogram_revision_map[channel]

    for prefix, labels in SCALAR_MEASURE_MAP.items():
        if metric.startswith(prefix):
            # Scalar measures: reindex over the known labels, zero-filling
            # buckets with no submissions.
            histogram = pd.Series({int(k): v for k, v in values.items()},
                                  index=labels).fillna(0)
            break
    else:
        histogram = Histogram(metric, {"values": values},
                              revision=revision).get_value(autocast=False)

    return {str(k): int(v) for k, v in histogram.to_dict().items()}


def _aggregate_metrics(pings, num_reducers=10000):
    trimmed = (
        pings.filter(_sample_clients)
             .map(_map_ping_to_dimensions)
             .filter(lambda x: x))
    build_id_aggregates = (
        trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping,
                               _aggregate_aggregates, num_reducers))
    # The submission-date aggregates are derived from the build-id aggregates
    # by blanking the build-id in the key and re-reducing.
    submission_date_aggregates = (
        build_id_aggregates.map(_map_build_id_key_to_submission_date_key)
                           .reduceByKey(_aggregate_aggregates))
    return build_id_aggregates, submission_date_aggregates


def _map_build_id_key_to_submission_date_key(aggregate):
    # Replace the build_id column (index 3 of the key) with `None` so that
    # aggregates collapse down to per-submission-date keys.
    return tuple(aggregate[0][:3] + (None,) + aggregate[0][4:]), aggregate[1]


def _map_ping_to_dimensions(ping):
    """Map a ping to a ((dimension, ...), payload) pair, or None to drop it."""
    try:
        submission_date = ping["meta"]["submissionDate"]
        channel = ping["application"]["channel"]
        version = ping["application"]["version"].split('.')[0]
        build_id = ping["application"]["buildId"]
        application = ping["application"]["name"]
        architecture = ping["application"]["architecture"]
        os = ping["environment"]["system"]["os"]["name"]
        os_version = ping["environment"]["system"]["os"]["version"]

        if os == "Linux":
            os_version = str(os_version)[:3]

        try:
            build_id_as_date = datetime.datetime.strptime(build_id, '%Y%m%d%H%M%S')
        except ValueError:
            return None

        # Remove pings whose build-id is older than the per-channel cutoff.
        cutoff = (
            datetime.date.today() -
            datetime.timedelta(days=BUILD_ID_CUTOFFS.get(channel, BUILD_ID_CUTOFF_UNKNOWN)))
        if build_id_as_date.date() <= cutoff:
            return None

        # TODO: Validate build_id string against the whitelist from build hub.

        subset = {}
        subset["payload"] = _trim_payload(ping["payload"])

        # Note that some dimensions don't vary within a single submission
        # (e.g. channel) while some do (e.g. process type).
        # Dimensions that don't vary should appear in the submission key, while
        # the ones that do vary should appear within the key of a single metric.
        return (
            (submission_date, channel, version, build_id, application,
             architecture, os, os_version),
            subset,
        )
    except KeyError:
        return None


def aggregate_metrics(sc, channels, submission_date, main_ping_fraction=1,
                      num_reducers=10000, source="moztelemetry",
                      project_id=None, dataset_id=None,
                      avro_prefix=None):
    """
    Returns the build-id and submission-date aggregates for a given submission date.

    :param sc: A SparkContext instance
    :param channels: Either the name of a channel or a list/tuple of names
    :param submission_date: The submission date for which the data will be aggregated
    :param main_ping_fraction: An approximate fraction of submissions to consider for
                               aggregation (only honored by the moztelemetry source)
    :param num_reducers: The number of partitions used when reducing the aggregates
    :param source: One of "moztelemetry" (default), "bigquery", or "avro"
    :param project_id: BigQuery project id, required when source is "bigquery"
    :param dataset_id: BigQuery dataset id, required when source is "bigquery"
    :param avro_prefix: Path prefix of the Avro export, required when source is "avro"
    """
    if not isinstance(channels, (tuple, list)):
        channels = [channels]

    if source == "bigquery" and project_id and dataset_id:
        dataset = BigQueryDataset()
        pings = dataset.load(
            project_id,
            dataset_id,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    elif source == "avro" and avro_prefix:
        dataset = BigQueryDataset()
        pings = dataset.load_avro(
            avro_prefix,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    else:
        # Fall back to the legacy moztelemetry Dataset source.
        channels = set(channels)
        source = 'telemetry'
        where = {
            'appUpdateChannel': lambda x: x in channels,
            'submissionDate': submission_date,
            'sourceVersion': '4',
        }
        pings = (Dataset.from_source(source)
                        .where(docType='main',
                               appName=lambda x: x != 'Fennec',
                               **where)
                        .records(sc, sample=main_ping_fraction))

    return _aggregate_metrics(pings, num_reducers)
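

if __name__ == '__main__':
    # Minimal usage sketch, for illustration only: assumes a local Spark
    # session and the default moztelemetry source; the channel and date
    # below are placeholder values, not part of this module.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('mozaggregator-parquet').getOrCreate()
    aggregates = aggregate_metrics(
        spark.sparkContext, ['nightly'], submission_date='20190901')
    # write_aggregates needs the SparkSession itself (for createDataFrame).
    write_aggregates(spark, aggregates)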