Write aggregates to parquet (bug 1345064)

Rob Hudson 2018-08-13 13:26:12 -07:00
Parent de96552e50
Commit 26d8ee0a4f
1 changed file with 232 additions and 0 deletions

mozaggregator/parquet.py (new file, 232 lines)

@@ -0,0 +1,232 @@
import datetime
from collections import defaultdict
from os import environ
import pandas as pd
from pyspark.sql.types import (
LongType, MapType, StringType, StructField, StructType)
from mozaggregator.aggregator import (
SCALAR_MEASURE_MAP, _aggregate_aggregates, _aggregate_ping,
_sample_clients, _trim_payload)
from mozaggregator.db import histogram_revision_map
from moztelemetry.dataset import Dataset
from moztelemetry.histogram import Histogram

PATH_BUCKET = environ.get('bucket', 'telemetry-parquet')
PATH_PREFIX = 'aggregates_poc'
PATH_VERSION = 'v1'
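
# Row schema shared by the build-id and submission-date aggregate DataFrames.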
SCHEMA = StructType([
StructField('period', StringType(), False),
StructField('aggregate_type', StringType(), False),
StructField('submission_date', StringType(), False),
StructField('channel', StringType(), False),
StructField('version', StringType(), False),
StructField('build_id', StringType(), True),
StructField('application', StringType(), False),
StructField('architecture', StringType(), False),
StructField('os', StringType(), False),
StructField('os_version', StringType(), False),
StructField('metric', StringType(), False),
StructField('key', StringType(), True),
StructField('process_type', StringType(), False),
StructField('count', LongType(), False),
StructField('sum', LongType(), False),
StructField('histogram', MapType(StringType(), LongType(), False), False),
])
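
# Days of build-id history to keep per channel; pings with older build ids
# are dropped (see _map_ping_to_dimensions).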
BUILD_ID_CUTOFF_UNKNOWN = 45
BUILD_ID_CUTOFFS = {
'release': 84,
'esr': 84,
'beta': 30,
'aurora': 30,
'nightly': 10,
}


def write_aggregates(sc, aggregates, mode):
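    """
    Explode the (build-id, submission-date) aggregate pair into flat rows,
    union them into a single DataFrame matching SCHEMA, and write it to
    Parquet with the given save mode.
    """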
build_id_agg = aggregates[0].flatMap(lambda row: _explode(row, 'build_id'))
submission_date_agg = aggregates[1].flatMap(lambda row: _explode(row, 'submission_date'))
df = sc.createDataFrame(build_id_agg, SCHEMA)
df = df.union(sc.createDataFrame(submission_date_agg, SCHEMA))
write_parquet(sc, df, mode)


def write_parquet(sc, df, mode):
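    """
    Write the aggregate DataFrame to S3 as Parquet, partitioned by metric,
    aggregate type and period so queries for a single metric and period only
    read the partitions they need.
    """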
path = 's3://{bucket}/{prefix}/{version}'.format(
bucket=PATH_BUCKET, prefix=PATH_PREFIX, version=PATH_VERSION)
(df.repartition('metric', 'aggregate_type', 'period')
.sortWithinPartitions(['channel', 'version', 'submission_date'])
.write
.partitionBy('metric', 'aggregate_type', 'period')
.parquet(path, mode=mode))


def _explode(row, aggregate_type):
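    """
    Flatten one aggregate row into SCHEMA-shaped tuples, one per metric key.
    Entries that cannot be turned into a complete histogram are skipped.
    """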
dimensions, metrics = row
period = _period(dimensions[3] if aggregate_type == 'build_id' else dimensions[0])
for k, v in metrics.iteritems():
try:
histogram = _get_complete_histogram(dimensions[1], k[0], v['histogram'])
except KeyError:
continue
yield (period, aggregate_type,) + dimensions + k + (v['count'], v['sum'], histogram)


def _period(date_str):
"""
Returns a period string given a string of "YYYYMMDD".
Note: Make sure the return value is sortable as expected as a string, as queries
against this will likely use `BETWEEN` or other comparisons.
"""
return date_str[:6]


def _get_complete_histogram(channel, metric, values):
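    """
    Build a complete histogram for `metric` from the raw `values` dict.

    Scalar-derived measures are re-indexed against the full label set from
    SCALAR_MEASURE_MAP; everything else is rebuilt through moztelemetry's
    Histogram so missing buckets are filled in. Keys are returned as strings
    and counts as longs to match SCHEMA.
    """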
revision = histogram_revision_map[channel]
for prefix, labels in SCALAR_MEASURE_MAP.iteritems():
if metric.startswith(prefix):
histogram = pd.Series({int(k): v for k, v in values.iteritems()},
index=labels).fillna(0)
break
else:
histogram = Histogram(metric, {"values": values},
revision=revision).get_value(autocast=False)
return {str(k): long(v) for k, v in histogram.to_dict().iteritems()}


def _telemetry_enabled(ping):
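    """Return True if the ping's environment settings report telemetry as enabled."""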
try:
return ping.get('environment', {}) \
.get('settings', {}) \
.get('telemetryEnabled', False)
except Exception:
return False


def _aggregate_metrics(pings, num_reducers=10000):
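    """
    Aggregate the sampled pings by their build-id dimensions, then fold those
    aggregates again by submission date (with build_id set to None). Returns
    the (build_id_aggregates, submission_date_aggregates) pair of RDDs.
    """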
trimmed = (
pings.filter(_sample_clients)
.map(_map_ping_to_dimensions)
.filter(lambda x: x))
build_id_aggregates = (
trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping,
_aggregate_aggregates, num_reducers))
submission_date_aggregates = (
build_id_aggregates.map(_map_build_id_key_to_submission_date_key)
.reduceByKey(_aggregate_aggregates))
return build_id_aggregates, submission_date_aggregates


def _map_build_id_key_to_submission_date_key(aggregate):
# This skips the build_id column and replaces it with `None`.
return tuple(aggregate[0][:3] + (None,) + aggregate[0][4:]), aggregate[1]


def _map_ping_to_dimensions(ping):
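    """
    Map a ping to its (dimensions, trimmed payload) pair for aggregation.
    Returns None if required fields are missing, the build_id is malformed,
    or the build_id is older than the per-channel cutoff.
    """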
try:
submission_date = ping["meta"]["submissionDate"]
channel = ping["application"]["channel"]
version = ping["application"]["version"].split('.')[0]
build_id = ping["application"]["buildId"]
application = ping["application"]["name"]
architecture = ping["application"]["architecture"]
os = ping["environment"]["system"]["os"]["name"]
os_version = ping["environment"]["system"]["os"]["version"]
if os == "Linux":
os_version = str(os_version)[:3]
try:
build_id_as_date = datetime.datetime.strptime(build_id, '%Y%m%d%H%M%S')
except ValueError:
return None
# Remove pings with build_id older than the specified cutoff days.
cutoff = (
datetime.date.today() -
datetime.timedelta(days=BUILD_ID_CUTOFFS.get(channel, BUILD_ID_CUTOFF_UNKNOWN)))
if build_id_as_date.date() <= cutoff:
return None
# TODO: Validate build_id string against the whitelist from build hub.
subset = {}
subset["payload"] = _trim_payload(ping["payload"])
# Note that some dimensions don't vary within a single submission
# (e.g. channel) while some do (e.g. process type).
# Dimensions that don't vary should appear in the submission key, while
# the ones that do vary should appear within the key of a single metric.
return (
(submission_date, channel, version, build_id, application,
architecture, os, os_version),
subset
)
except KeyError:
return None


def aggregate_metrics(sc, channels, submission_date, main_ping_fraction=1,
fennec_ping_fraction=1, num_reducers=10000):
"""
Returns the build-id and submission date aggregates for a given submission date.
:param sc: A SparkContext instance
:param channel: Either the name of a channel or a list/tuple of names
:param submission_date: The submission date for which the data will be aggregated
:param main_ping_fraction: An approximative fraction of submissions to consider for aggregation
:param fennec_ping_fraction: An approximative fraction of submissions to consider for aggregation
"""
if not isinstance(channels, (tuple, list)):
channels = [channels]
channels = set(channels)
source = 'telemetry'
where = {
'appUpdateChannel': lambda x: x in channels,
'submissionDate': submission_date,
'sourceVersion': '4',
}
pings = (Dataset.from_source(source)
.where(docType='main',
appName=lambda x: x != 'Fennec',
**where)
.records(sc, sample=main_ping_fraction)
.filter(_telemetry_enabled))
fennec_pings = (Dataset.from_source(source)
.where(docType='saved_session',
appName='Fennec',
**where)
.records(sc, sample=fennec_ping_fraction))
all_pings = pings.union(fennec_pings)
return _aggregate_metrics(all_pings)


if __name__ == '__main__':
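    # Ad-hoc driver: aggregate one day of nightly pings (yesterday, unless a
    # `date` is provided in the environment) and append the result to the
    # Parquet dataset.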
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('tmo-poc').getOrCreate()
process_date = environ.get('date')
if not process_date:
# If no date in environment, use yesterday's date.
process_date = (datetime.date.today() -
datetime.timedelta(days=1)).strftime('%Y%m%d')
aggs = aggregate_metrics(sparkSession.sparkContext, 'nightly', process_date)
write_aggregates(sparkSession, aggs, 'append')
# TODO: Merge parquet files.