diff --git a/mozaggregator/aggregator.py b/mozaggregator/aggregator.py index a8b543d..a5ef825 100644 --- a/mozaggregator/aggregator.py +++ b/mozaggregator/aggregator.py @@ -7,7 +7,7 @@ import sys -from moztelemetry.spark import get_pings +from moztelemetry.dataset import Dataset from moztelemetry.histogram import cached_exponential_buckets from collections import defaultdict @@ -28,8 +28,12 @@ def aggregate_metrics(sc, channels, submission_date, fraction=1): channels = [channels] channels = set(channels) - rdds = [get_pings(sc, channel=ch, submission_date=submission_date, doc_type="saved_session", schema="v4", fraction=fraction) for ch in channels] - pings = reduce(lambda x, y: x.union(y), rdds) + pings = Dataset.from_source('telemetry') \ + .where(appUpdateChannel=lambda x : x in channels, + submissionDate=submission_date, + docType="saved_session", + sourceVersion='4') \ + .records(sc, sample=fraction) return _aggregate_metrics(pings)