Add uptake events to datadog job

This commit is contained in:
Sunah Suh 2019-05-28 22:12:10 -05:00 коммит произвёл Sunah Suh
Родитель 6741ce6c8e
Коммит 05db8901e5
3 изменённых файлов: 185 добавлений и 13 удалений

Просмотреть файл

@ -42,6 +42,10 @@ case class EventPing(application: Application,
def getNormandyEvents: Seq[Event] = {
events.filter(_.category == "normandy")
}
def getUptakeEvents: Seq[Event] = {
events.filter(e => e.category == "normandy" || e.category == "uptake.remotecontent.result")
}
}
object EventPing {

Просмотреть файл

@ -5,15 +5,17 @@ package com.mozilla.telemetry.streaming
import com.mozilla.telemetry.heka.Message
import com.mozilla.telemetry.pings.{EventPing, MainPing}
import com.mozilla.telemetry.monitoring.DogStatsDCounter
import com.mozilla.telemetry.sinks.DogStatsDCounterSink
import com.mozilla.telemetry.monitoring.{DogStatsDMetric}
import com.mozilla.telemetry.sinks.{DogStatsDMetricSink}
import com.mozilla.telemetry.streaming.StreamingJobBase.TelemetryKafkaTopic
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.rogach.scallop.ScallopOption
import scala.util.Try
object ExperimentsEnrollmentsToDatadog extends StreamingJobBase {
override val JobName: String = "experiment_enrollments_to_datadog"
object UptakeEventsToDatadog extends StreamingJobBase {
override val JobName: String = "uptake_events_enrollments_to_datadog"
val kafkaCacheMaxCapacity = 100
@ -23,7 +25,7 @@ object ExperimentsEnrollmentsToDatadog extends StreamingJobBase {
val opts = new Opts(args)
val spark = SparkSession.builder()
.appName("Experiment Enrollments to Datadog")
.appName("Uptake Events to Datadog")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()
@ -40,11 +42,12 @@ object ExperimentsEnrollmentsToDatadog extends StreamingJobBase {
.option("spark.streaming.kafka.consumer.cache.maxCapacity", kafkaCacheMaxCapacity)
.option("subscribe", TelemetryKafkaTopic)
.option("startingOffsets", opts.startingOffsets())
.option("failOnDataLoss", false)
.load()
val writer = new DogStatsDCounterSink("localhost", 8125)
val writer = new DogStatsDMetricSink("localhost", 8125)
eventsToCounter(pings.select("value"), opts.raiseOnError())
eventsToMetrics(pings.select("value"), opts.raiseOnError())
.writeStream
.queryName(QueryName)
.foreach(writer)
@ -53,10 +56,10 @@ object ExperimentsEnrollmentsToDatadog extends StreamingJobBase {
.awaitTermination()
}
private[streaming] def eventsToCounter(messages: DataFrame, raiseOnError: Boolean): Dataset[DogStatsDCounter] = {
private[streaming] def eventsToMetrics(messages: DataFrame, raiseOnError: Boolean): Dataset[DogStatsDMetric] = {
import messages.sparkSession.implicits._
val empty = Array.empty[DogStatsDCounter]
val empty = Array.empty[DogStatsDMetric]
messages.flatMap(v => {
try {
@ -67,20 +70,41 @@ object ExperimentsEnrollmentsToDatadog extends StreamingJobBase {
if (!allowedDocTypes.contains(docType)) {
empty
} else {
val normandyEvents = {
val uptakeEvents = {
if (docType == "main") {
val mainPing = MainPing(m)
mainPing.getNormandyEvents
} else {
val eventPing = EventPing(m)
eventPing.getNormandyEvents
eventPing.getUptakeEvents
}
}
normandyEvents.map { e =>
val normandyCounters = uptakeEvents.filter(_.category == "normandy").map { e =>
val tags = Map("experiment" -> e.value.getOrElse(""), "branch" -> e.extra.flatMap(_.get("branch")).getOrElse(""))
DogStatsDCounter(s"telemetry.${e.category}.${e.`object`}.${e.method}", kvTags = Some(tags))
DogStatsDMetric.makeCounter(s"telemetry.${e.category}.${e.`object`}.${e.method}", kvTags = Some(tags))
}
val uptakeMetrics = uptakeEvents.filter(_.category == "uptake.remotecontent.result").flatMap { e =>
// Split the "source" key in the extra field, if it exists, to tag uptake events (bug 1539249)
val source = e.extra.flatMap(_.get("source").map(_.split("/")))
val source_type = source.flatMap(_.lift(0)).map(s => Seq("source_type" -> s)).getOrElse(Nil)
val source_subtype = source.flatMap(_.lift(1)).map(s => Seq("source_subtype" -> s)).getOrElse((Nil))
val source_details = source.flatMap(_.lift(2)).map(s => Seq("source_details" -> s)).getOrElse(Nil)
val tags = Map(source_type ++ source_subtype ++ source_details:_*)
val metricName = s"telemetry.uptake.${e.`object`}.${e.method}.${e.value.getOrElse("null")}"
// Uptake events all create a counter, and if there's a "duration" or "age" key in the event map field,
// those are sent as timer metrics
val counter = Seq(DogStatsDMetric.makeCounter(metricName, kvTags = Some(tags)))
val duration = e.extra.flatMap(_.get("duration").flatMap(d => Try(d.toInt).toOption)).map(d =>
Seq(DogStatsDMetric.makeTimer(metricName + ".duration", metricValue = d, kvTags = Some(tags)))).getOrElse(Nil)
val age = e.extra.flatMap(_.get("age").flatMap(d => Try(d.toInt).toOption)).map(d =>
Seq(DogStatsDMetric.makeTimer(metricName + ".age", metricValue = d, kvTags = Some(tags)))).getOrElse(Nil)
counter ++ duration ++ age
}
normandyCounters ++ uptakeMetrics
}
} catch {
// TODO: track parse errors

Просмотреть файл

@ -0,0 +1,144 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.streaming
import com.holdenkarau.spark.testing.StructuredStreamingBase
import org.json4s.jackson.Serialization.write
import org.scalatest._
import com.mozilla.telemetry.streaming.UptakeEvents._
class UptakeEventsToDatadogTest extends FlatSpec with Matchers with GivenWhenThen with StructuredStreamingBase with BeforeAndAfterEach {
val k = 2
"UptakeEventsToDatadog" should "parse events into DogStatsDMetric messages" in {
import spark.implicits._
val uptakeEvents = Seq(
NormandyRecipe(123),
NormandyRecipe(123, "custom_2_error"),
NormandyAction("ShowHeartbeatAction"),
NormandyRunner(),
RemoteSettings(None, None, "blocklists/certificates"),
RemoteSettings(Some(4778), None, "settings-changes-monitoring", "success"),
RemoteSettings(None, Some(1401), "main/normandy-recipes", "success")
)
val pings = (
TestUtils.generateEventMessages(k, customPayload=EnrollmentEvents.eventPingEnrollmentEventsJson(EnrollmentEvents.ExperimentA, Some("six"),
enroll = true, process="parent"))
++ TestUtils.generateEventMessages(k, customPayload = eventPingUptakeEventsJson(uptakeEvents))
).map(_.toByteArray).seq
val pingsDf = spark.createDataset(pings).toDF()
val metrics = UptakeEventsToDatadog.eventsToMetrics(pingsDf, raiseOnError = true)
val expected = Seq(
"telemetry.normandy.preference_study.enroll:1|c|#experiment:pref-flip-timer-speed-up-60-1443940,branch:six",
"telemetry.normandy.preference_study.enroll:1|c|#experiment:pref-flip-timer-speed-up-60-1443940,branch:six",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:recipe,source_details:123",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:recipe,source_details:123",
"telemetry.uptake.uptake.normandy.custom_2_error:1|c|#source_type:normandy,source_subtype:recipe,source_details:123",
"telemetry.uptake.uptake.normandy.custom_2_error:1|c|#source_type:normandy,source_subtype:recipe,source_details:123",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:action,source_details:ShowHeartbeatAction",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:action,source_details:ShowHeartbeatAction",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:runner",
"telemetry.uptake.uptake.normandy.success:1|c|#source_type:normandy,source_subtype:runner",
"telemetry.uptake.uptake.remotesettings.up_to_date:1|c|#source_type:blocklists,source_subtype:certificates",
"telemetry.uptake.uptake.remotesettings.success:1|c|#source_type:settings-changes-monitoring",
"telemetry.uptake.uptake.remotesettings.up_to_date:1|c|#source_type:blocklists,source_subtype:certificates",
"telemetry.uptake.uptake.remotesettings.success.age:4778|ms|#source_type:settings-changes-monitoring",
"telemetry.uptake.uptake.remotesettings.success:1|c|#source_type:main,source_subtype:normandy-recipes",
"telemetry.uptake.uptake.remotesettings.success:1|c|#source_type:settings-changes-monitoring",
"telemetry.uptake.uptake.remotesettings.success.duration:1401|ms|#source_type:main,source_subtype:normandy-recipes",
"telemetry.uptake.uptake.remotesettings.success.age:4778|ms|#source_type:settings-changes-monitoring",
"telemetry.uptake.uptake.remotesettings.success:1|c|#source_type:main,source_subtype:normandy-recipes",
"telemetry.uptake.uptake.remotesettings.success.duration:1401|ms|#source_type:main,source_subtype:normandy-recipes"
)
metrics.map(_.format()).collect should contain theSameElementsAs expected
}
it should "handle missing source correctly" in {
import spark.implicits._
val pings = TestUtils.generateEventMessages(
k, customPayload = eventPingUptakeEventsJson(Seq(TestNoSource(), TestNoSource(emptyMap = false)))
).map(_.toByteArray).seq
val pingsDf = spark.createDataset(pings).toDF()
val metrics = UptakeEventsToDatadog.eventsToMetrics(pingsDf, raiseOnError = true)
val expected = Seq(
"telemetry.uptake.uptake.normandy.success:1|c",
"telemetry.uptake.uptake.normandy.success:1|c",
"telemetry.uptake.uptake.normandy.success:1|c",
"telemetry.uptake.uptake.normandy.success:1|c"
)
metrics.map(_.format()).collect should contain theSameElementsAs expected
}
}
object UptakeEvents {
sealed trait UptakeType {
val eventObject: String
val eventStringValue: String
val eventMap: Option[Map[String, String]]
override def toString: String = {
implicit val formats = org.json4s.DefaultFormats
(s"""[1234, "uptake.remotecontent.result", "$eventObject", "uptake", "$eventStringValue""""
+ eventMap.map(m => s", ${write(m)}").getOrElse("") + "]")
}
}
case class NormandyRecipe(id: Int, status: String = "success") extends UptakeType {
override val eventObject = "normandy"
override val eventStringValue = status
override val eventMap = Some(Map("source" -> s"normandy/recipe/$id"))
}
case class NormandyAction(action: String, status: String = "success") extends UptakeType {
override val eventObject = "normandy"
override val eventStringValue = status
override val eventMap = Some(Map("source" -> s"normandy/action/$action"))
}
case class NormandyRunner(status: String = "success") extends UptakeType {
override val eventObject = "normandy"
override val eventStringValue = status
override val eventMap = Some(Map("source" -> "normandy/runner"))
}
case class RemoteSettings(age: Option[Int], duration: Option[Int], source: String, status: String = "up_to_date")
extends UptakeType {
override val eventObject = "remotesettings"
override val eventStringValue = status
override val eventMap = Some((
Seq("source" -> source)
++ age.map(a => Seq("age" -> a.toString)).getOrElse(Nil)
++ duration.map(d => Seq("duration" -> d.toString)).getOrElse(Nil)
).toMap)
}
case class TestNoSource(status: String = "success", emptyMap: Boolean = true) extends UptakeType {
override val eventObject = "normandy"
override val eventStringValue: String = status
// If this codebase ever gets upgraded to scala 2.13 (unlikely), we could use Option.when
override val eventMap = if (emptyMap) Some(Map.empty[String, String]) else None
}
def eventPingUptakeEventsJson(uptakeEvents: Seq[UptakeType]): Option[String] = {
val events = uptakeEvents.map(_.toString).mkString(",")
Some(
s"""
|"reason": "periodic",
|"processStartTimestamp": 1530291900000,
|"sessionId": "dd302e9d-569b-4058-b7e8-02b2ff83522c",
|"subsessionId": "79a2728f-af12-4ed3-b56d-0531a03c2f26",
|"lostEventsCount": 0,
|"events": {
| "parent": [
| $events
| ]
|}
""".stripMargin)
}
}