diff --git a/src/main/scala/com/mozilla/telemetry/streaming/ErrorAggregator.scala b/src/main/scala/com/mozilla/telemetry/streaming/ErrorAggregator.scala index 6e70948..baa72ac 100644 --- a/src/main/scala/com/mozilla/telemetry/streaming/ErrorAggregator.scala +++ b/src/main/scala/com/mozilla/telemetry/streaming/ErrorAggregator.scala @@ -141,37 +141,44 @@ object ErrorAggregator { dimensions.build } - private def parseCrashPing(ping: CrashPing): Tuple1[Row] = { - // Non-main crashes are already retrieved from main pings - if(!ping.isMain()) return Tuple1(null) + class ParsableCrashPing(ping: CrashPing) { + def parse(): Tuple1[Row] = { + // Non-main crashes are already retrieved from main pings + if(!ping.isMain()) return Tuple1(null) - val dimensions = buildDimensions(ping.meta) - val stats = new RowBuilder(statsSchema) - stats("count") = Some(1) - stats("main_crashes") = Some(1) - Tuple1(RowBuilder.merge(dimensions, stats.build)) + val dimensions = buildDimensions(ping.meta) + val stats = new RowBuilder(statsSchema) + stats("count") = Some(1) + stats("main_crashes") = Some(1) + Tuple1(RowBuilder.merge(dimensions, stats.build)) + } } + implicit def ParsableCrashPingToCrashPing(ping: CrashPing) = new ParsableCrashPing(ping) - private def parseMainPing(ping: MainPing): Tuple1[Row] = { - // If a main ping has no usage hours discard it. - val usageHours = ping.usageHours() - if (usageHours == 0) return Tuple1(null) + class ParsableMainPing(ping: MainPing) { + def parse(): Tuple1[Row] = { + // If a main ping has no usage hours discard it. + val usageHours = ping.usageHours() + if (usageHours == 0) return Tuple1(null) - val dimensions = buildDimensions(ping.meta) - val stats = new RowBuilder(statsSchema) - stats("count") = Some(1) - stats("usage_hours") = Some(usageHours) - countHistogramErrorsSchema.fieldNames.foreach(stats_name => { - stats(stats_name) = Some(ping.getCountHistogramValue(stats_name)) - }) - stats("content_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "content")) - stats("gpu_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "gpu")) - stats("plugin_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "plugin")) - stats("gmplugin_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "gmplugin")) - stats("content_shutdown_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_KILL_HARD", "ShutDownKill")) + val dimensions = buildDimensions(ping.meta) + val stats = new RowBuilder(statsSchema) + stats("count") = Some(1) + stats("usage_hours") = Some(usageHours) + countHistogramErrorsSchema.fieldNames.foreach(stats_name => { + stats(stats_name) = Some(ping.getCountHistogramValue(stats_name)) + }) + stats("content_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "content")) + stats("gpu_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "gpu")) + stats("plugin_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "plugin")) + stats("gmplugin_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "gmplugin")) + stats("content_shutdown_crashes") = Some(ping.getCountKeyedHistogramValue("SUBPROCESS_KILL_HARD", "ShutDownKill")) - Tuple1(RowBuilder.merge(dimensions, stats.build)) + Tuple1(RowBuilder.merge(dimensions, stats.build)) + } } + implicit def ParsableCrashPingToMainPing(ping: MainPing) = new ParsableMainPing(ping) + /* We can't use an Option[Row] because entire rows cannot be null in Spark SQL. The best we can do is to resort to Tuple1[Row]. See https://github.com/apache/spark/blob/38b9e69623c14a675b14639e8291f5d29d2a0bc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L53 @@ -185,9 +192,9 @@ object ErrorAggregator { return Tuple1(null) } if(docType == "crash") { - parseCrashPing(messageToCrashPing(message)) + messageToCrashPing(message).parse() } else { - parseMainPing(messageToMainPing(message)) + messageToMainPing(message).parse() } }