Bug 1474987 Add "session split" meta-event
This commit is contained in:
Родитель
b106fae866
Коммит
04e5238b56
|
@ -13,6 +13,15 @@ Mozilla.
|
|||
|
||||
Please file bugs in the [Datasets: Error Aggregates](https://bugzilla.mozilla.org/enter_bug.cgi?product=Data%20Platform%20and%20Tools&component=Datasets%3A%20Error%20Aggregates) component.
|
||||
|
||||
## Amplitude Event Configuration
|
||||
|
||||
Some of the jobs defined in `telemetry-streaming` exist to transform telemetry events
|
||||
and republish to [Amplitude](https://amplitude.com/) for further analysis.
|
||||
Filtering and transforming events is accomplished via JSON configurations.
|
||||
If you're creating or updating such a schema, see:
|
||||
|
||||
- [Amplitude event configuration docs](docs/amplitude)
|
||||
|
||||
## Development
|
||||
|
||||
The recommended workflow for running tests is to use your favorite editor for editing
|
||||
|
|
|
@ -4,9 +4,6 @@
|
|||
"docType": [
|
||||
"main"
|
||||
],
|
||||
"appName": [
|
||||
"Firefox"
|
||||
],
|
||||
"experimentId": [
|
||||
"pref-flip-savant-1457226-de-existing-users",
|
||||
"pref-flip-savant-1457226-de-new-users",
|
||||
|
@ -15,6 +12,49 @@
|
|||
]
|
||||
},
|
||||
"eventGroups": [
|
||||
{
|
||||
"eventGroupName": "Meta",
|
||||
"events": [
|
||||
{
|
||||
"name": "session split",
|
||||
"description": "a ping was sent defining a subsession",
|
||||
"amplitudeProperties": {
|
||||
"subsession_length": "extra.subsession_length",
|
||||
"active_ticks": "extra.active_ticks",
|
||||
"uri_count": "extra.uri_count",
|
||||
"search_count": "extra.search_count",
|
||||
"reason": "extra.reason"
|
||||
},
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"timestamp": {
|
||||
"type": "number",
|
||||
"minimum": 0
|
||||
},
|
||||
"category": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"meta"
|
||||
]
|
||||
},
|
||||
"method": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"session_split"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"timestamp",
|
||||
"category",
|
||||
"method"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"eventGroupName": "SAVANT",
|
||||
"events": [
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
# Amplitude Event Configuration
|
||||
|
||||
This repository contains an
|
||||
[EventsToAmplitude](https://github.com/mozilla/telemetry-streaming/blob/master/src/main/scala/com/mozilla/telemetry/streaming/EventsToAmplitude.scala)
|
||||
class which defines a job for taking in telemetry events, transforming them,
|
||||
and publishing to Amplitude.
|
||||
|
||||
The particular filters and transformations applied are controlled via
|
||||
a JSON configuration file in the top-level `config` directory of this repository.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Amplitude documentation](https://amplitude.zendesk.com/hc/en-us)
|
||||
- [Amplitude HTTP API documentation](https://amplitude.zendesk.com/hc/en-us/articles/204771828-HTTP-API)
|
||||
- [Mozilla internal Amplitude Onboarding Guide (Mana)](https://mana.mozilla.org/wiki/display/BIDW/Amplitude+Onboarding+Guideline)
|
||||
|
||||
## Destination Projects in Amplitude
|
||||
|
||||
We can define many "projects" in Amplitude to receive different sets of events.
|
||||
Generally, one JSON configuration will correspond to one destination project.
|
||||
|
||||
The destination project is determined by which API key we use when making HTTP
|
||||
calls to Amplitude. The job looks for the API key as an environment variable,
|
||||
and it's generally the responsibility of the data engineer helping to deploy your
|
||||
job to make sure the job is invoked with the right API key for your project.
|
||||
|
||||
## Meta events
|
||||
|
||||
The events that flow through your schema are determined by the source batch files
|
||||
or Kafka topic. Generally, there's a one-to-one relation between entries
|
||||
appearing in the `events` field of incoming pings and events presented to the job.
|
||||
|
||||
For main pings, though, we also inject a special "meta event" for each ping.
|
||||
This event is prepended to the `events` list inside the ping and contains
|
||||
some summary info about the subsession that the ping represents.
|
||||
This event has a category of "meta" and a method of "session_split".
|
||||
The event will be ignored by default. If you want to send this event to Amplitude,
|
||||
you'll need to add an entry in your JSON configuration that matches the category
|
||||
and method.
|
||||
|
||||
## Schema for JSON Configuration Files
|
||||
|
||||
The schema for JSON configuration files is defined in
|
||||
[schemaFileSchema.json](https://github.com/mozilla/telemetry-streaming/blob/master/src/main/resources/schemas/schemaFileSchema.json).
|
||||
We describe the meaning of each of the fields in the following subsections.
|
||||
|
||||
### source
|
||||
|
||||
`source` is a string like `"source": "telemetry"` defining which raw ping dataset
|
||||
to use when the job runs in batch mode.
|
||||
Available datasets are listed in [sources.json](https://console.aws.amazon.com/s3/object/net-mozaws-prod-us-west-2-pipeline-metadata/sources.json?region=us-east-1&tab=overview) in S3.
|
||||
Note that each dataset is partitioned into files by a different set of fields
|
||||
and performance for the job in batch mode can be greatly improved by configuring
|
||||
efficient filters on those partitioning fields in the `filters` section of the config.
|
||||
|
||||
### filters
|
||||
|
||||
`filters` is a map of fieldNames to lists of acceptable values. Consider:
|
||||
|
||||
```json
|
||||
"docType": [
|
||||
"main"
|
||||
]
|
||||
```
|
||||
|
||||
This filter will ensure that we only consider pings where the `docType` is `main`
|
||||
(otherwise known as "main pings").
|
||||
If we add additional entries to that list, they are treated as a logical OR, so we
|
||||
would consider events with any of the listed docTypes.
|
||||
In batch mode, filters will be applied both to the content of the pings and
|
||||
to the partitioning fields. Configuring filters on partitioning fields of the source dataset
|
||||
means that we're able to greatly reduce the set of files that need to be read for the
|
||||
job to complete, greatly reducing runtime and cost.
|
||||
You can also configure filters for fields within the ping that aren't used for partitioning
|
||||
and in this case we can only apply the filter after the ping has been read from disk
|
||||
and we have access to its contents.
|
||||
For streaming mode, there is no concept of partitioning fields, so filters are always
|
||||
applied based solely on the contents of the pings.
|
||||
|
||||
### eventGroups
|
||||
|
||||
`eventGroups` is where the transformation logic is defined that determines what
|
||||
the events that make it to Amplitude look like. It has a more complex structure
|
||||
than the other top-level fields.
|
||||
|
||||
Each group in this list has an `eventGroupName` and a list of events. The name of an
|
||||
event in Amplitude will incorporate the group name like `eventGroupName - eventName`.
|
||||
|
||||
The next section describes the structure of each `event` entry.
|
||||
|
||||
### event
|
||||
|
||||
`name` determines the name of the event as seen in Amplitude.
|
||||
|
||||
`description` is a comment field. It doesn't get sent to Amplitude, but rather
|
||||
exists as an opportunity for you to document the purpose of the event or any
|
||||
additional caveats for future maintainters of the schema.
|
||||
|
||||
`sessionIdOffset` is an optional field created for the `devTools` job that gives
|
||||
you a chance to add an offset to the session start time, which is what Amplitude uses for sessionId.
|
||||
This allows projects where a Firefox session is insufficient to identify a usage stream to disambiguate multiple
|
||||
sessions. For instance, in the devtools example, many users will open multiple devtools panes in the course of
|
||||
a single Firefox session, sometimes concurrently. Devtools events contain an extra field which is the time in
|
||||
milliseconds since the start of the Firefox session when that particular pane was opened, which is then added to the
|
||||
Firefox session start time.
|
||||
|
||||
`amplitudeProperties` is an optional map of `eventPropertyName` to `sourceField`.
|
||||
The values configured here will be added as `event_properties` when we call
|
||||
Amplitude's [HTTP API](https://amplitude.zendesk.com/hc/en-us/articles/204771828-HTTP-API).
|
||||
The `eventPropertyName` will be used as the key in `event_properties` while `sourceField`
|
||||
is used to look up an appropriate value; see more on `sourceField` in a subsection below.
|
||||
|
||||
`userProperties` is an optional map very similar to `amplitudeProperties`, but
|
||||
entries here will be set in the `user_properties` map we send to amplitude rather
|
||||
than `event_properties`.
|
||||
|
||||
`schema` is a required field that defines which events match this entry.
|
||||
Any event that does not have the `required` fields populated will not match that entry.
|
||||
`category`, `method`, and `object` are the "coordinates" sent with every event
|
||||
and allow you to define lists of values to consider. Values within a list are
|
||||
treated as a logical OR,
|
||||
but the various fields are combined via a logical AND; for example, if you configure both
|
||||
`category` and `method` in a schema, an event must match both
|
||||
one of the listed categories and one of the listed methods in order to match.
|
||||
|
||||
#### sourceField
|
||||
|
||||
`sessionIdOffset`, `amplitudeProperties`, and `userProperties` all accept
|
||||
string values that are interpreted as looking up a field in the source event.
|
||||
|
||||
The following values will look up the corresponding top-level field of the event:
|
||||
|
||||
- timestamp
|
||||
- category
|
||||
- method
|
||||
- object
|
||||
- value
|
||||
|
||||
Events can also optionally have an `extra` object that's a map of key-value pairs.
|
||||
To look up a key from `extra` named `foo`, use:
|
||||
|
||||
- extra.foo
|
||||
|
||||
Finally, you can inject a literal value that's not looked up from the event.
|
||||
To inject the literal string "foo" as the value, use:
|
||||
|
||||
- literal.foo
|
|
@ -61,6 +61,13 @@ case class MainPing(application: Application,
|
|||
}
|
||||
}
|
||||
|
||||
def getScalarValue(processType: String, scalarName: String): Option[Long] = {
|
||||
this.payload.processes \ processType \ "scalars" \ scalarName match {
|
||||
case JInt(v) => Some(v.toLong)
|
||||
case _ => None
|
||||
}
|
||||
}
|
||||
|
||||
def usageHours: Option[Float] = {
|
||||
val max_hours = 25
|
||||
val min_hours = 0
|
||||
|
@ -74,6 +81,25 @@ case class MainPing(application: Application,
|
|||
}
|
||||
}
|
||||
|
||||
def searchCount: Long = meta.`payload.keyedHistograms` \ "SEARCH_COUNTS" match {
|
||||
case JObject(hists) =>
|
||||
hists
|
||||
.filter { case (name, _) =>
|
||||
name.split('.').toList match {
|
||||
case _ :: source :: _ => MainPing.directSearchSources.contains(source)
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
.foldLeft(0L) { case (sum, (name, hist)) =>
|
||||
val count = hist \ "sum" match {
|
||||
case JInt(x) => x.toLong
|
||||
case _ => 0L
|
||||
}
|
||||
sum + count
|
||||
}
|
||||
case _ => 0L
|
||||
}
|
||||
|
||||
/*
|
||||
* firstPaint is tricky because we only want to know this value if it
|
||||
* comes from the first subsession.
|
||||
|
@ -107,6 +133,21 @@ case class MainPing(application: Application,
|
|||
dynamicProcessEvents.filter(_.category == "normandy")
|
||||
}
|
||||
|
||||
override def sessionSplitEvents: Seq[Event] = {
|
||||
val extra: Map[String, String] = Map(
|
||||
"subsession_length" -> Some(subsessionLength),
|
||||
"active_ticks" -> activeTicks,
|
||||
"uri_count" -> getScalarValue("parent", "browser.engagement.total_uri_count"),
|
||||
"search_count" -> Some(searchCount),
|
||||
"reason" -> reason)
|
||||
.flatMap {
|
||||
case (k, Some(v)) => Some(k -> v.toString)
|
||||
case _ => None
|
||||
}
|
||||
val event = Event(sessionLength, "meta", "session_split", "", None, Some(extra))
|
||||
Seq(event)
|
||||
}
|
||||
|
||||
// Make events lazy so we don't extract them in error aggregates, which doesn't use them
|
||||
lazy val events: Seq[Event] = Ping.extractEvents(payload.processes, MainPing.eventLocations())
|
||||
|
||||
|
@ -119,10 +160,33 @@ case class MainPing(application: Application,
|
|||
// sessionStartDate is truncated to the hour anyway, so we just want to get somewhere near the correct timeframe
|
||||
case _ => ((meta.Timestamp / 1e9) - events.map(_.timestamp).max).toLong
|
||||
}
|
||||
|
||||
def sessionLength: Int = meta.`payload.info` \ "sessionLength" match {
|
||||
case JInt(v) => v.toInt
|
||||
case _ => 0
|
||||
}
|
||||
|
||||
def subsessionLength: Int = meta.`payload.info` \ "subsessionLength" match {
|
||||
case JInt(v) => v.toInt
|
||||
case _ => 0
|
||||
}
|
||||
|
||||
def activeTicks: Option[Long] = meta.`payload.simpleMeasurements` \ "activeTicks" match {
|
||||
case JInt(v) => Some(v.toLong)
|
||||
case _ => None
|
||||
}
|
||||
|
||||
def reason: Option[String] = meta.`payload.info` \ "reason" match {
|
||||
case JString(v) => Some(v)
|
||||
case _ => None
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object MainPing {
|
||||
|
||||
val directSearchSources = Set("urlbar", "searchbar", "newtab", "abouthome", "contextmenu", "system")
|
||||
|
||||
def apply(message: Message): MainPing = {
|
||||
implicit val formats = DefaultFormats
|
||||
val jsonFieldNames = List(
|
||||
|
|
|
@ -17,7 +17,7 @@ import org.json4s.JsonDSL._
|
|||
import org.json4s.jackson.JsonMethods.{parse, _}
|
||||
import org.json4s.{DefaultFormats, Extraction, JArray, JField, JNull, JObject, JValue, _}
|
||||
|
||||
import scala.util.Try
|
||||
import scala.util.{Success, Try}
|
||||
|
||||
trait Ping {
|
||||
val meta: Meta
|
||||
|
@ -297,8 +297,16 @@ trait SendsToAmplitude {
|
|||
|
||||
def pingAmplitudeProperties: JObject = JObject()
|
||||
|
||||
/**
|
||||
* Subclasses may override this to provide one or more "pseudo-events" containing ping-level information.
|
||||
*/
|
||||
def sessionSplitEvents: Seq[Event] = Seq.empty
|
||||
|
||||
def eventToAmplitudeEvent(eventGroup: String, e: Event, es: AmplitudeEvent): JObject = {
|
||||
val sessionIdOffset = Try(es.sessionIdOffset.map(o => e.getField(o).toLong).getOrElse(0L)).getOrElse(0L)
|
||||
val sessionIdOffset = Try(es.sessionIdOffset.map(o => e.getField(o).toLong)) match {
|
||||
case Success(Some(x)) => x
|
||||
case _ => 0L
|
||||
}
|
||||
|
||||
pingAmplitudeProperties merge
|
||||
("session_id" -> (sessionStart + sessionIdOffset)) ~
|
||||
|
@ -320,7 +328,8 @@ trait SendsToAmplitude {
|
|||
val factory = JsonSchemaFactory.byDefault
|
||||
val schemas = config.eventGroups.flatMap(g => g.events.map(e => factory.getJsonSchema(asJsonNode(e.schema))))
|
||||
|
||||
val eventsList = events.map{ e => e -> asJsonNode(Extraction.decompose(e)): (Event, JsonNode) }
|
||||
val eventsList = (sessionSplitEvents ++ events)
|
||||
.map{ e => e -> asJsonNode(Extraction.decompose(e)): (Event, JsonNode) }
|
||||
.map{ case(e, es) => // for each event, try each schema
|
||||
e -> schemas.map( ts => ts.validateUnchecked(es).isSuccess )
|
||||
.zip(config.eventGroups.flatMap(g => g.events.map((g.eventGroupName, _))))
|
||||
|
|
|
@ -115,12 +115,18 @@ object EventPingEvents extends StreamingJobBase {
|
|||
val ping = EventPing(m)
|
||||
ping.processEventMap.flatMap { case (process, events) =>
|
||||
events.map { e =>
|
||||
// Serializing a Map[String, Any] causes problems: https://issues.apache.org/jira/browse/SPARK-23251
|
||||
val extra: Option[Map[String, String]] =
|
||||
e.extra.map(
|
||||
_.map { case (k, v) =>
|
||||
k -> v.toString
|
||||
}.toMap
|
||||
)
|
||||
EventRow(ping.meta.documentId.get, ping.meta.clientId.get, ping.meta.normalizedChannel, ping.meta.geoCountry,
|
||||
ping.getLocale, ping.meta.appName, ping.meta.appVersion, ping.getOsName, ping.getOsVersion,
|
||||
ping.payload.sessionId, ping.payload.subsessionId, ping.sessionStart,
|
||||
(ping.meta.Timestamp / 1e9).toLong, ping.meta.sampleId.map(_.toString), ping.getMSStyleExperiments,
|
||||
e.timestamp, e.category, e.method, e.`object`, e.value, e.extra, process)
|
||||
|
||||
e.timestamp, e.category, e.method, e.`object`, e.value, extra, process)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ import scala.io.Source
|
|||
*/
|
||||
object EventsToAmplitude extends StreamingJobBase {
|
||||
|
||||
val log: org.apache.log4j.Logger = org.apache.log4j.LogManager.getLogger(this.getClass.getName)
|
||||
|
||||
val AMPLITUDE_API_KEY_KEY = "AMPLITUDE_API_KEY"
|
||||
|
||||
// Maps source dataset name to partition fields.
|
||||
|
@ -117,11 +119,8 @@ object EventsToAmplitude extends StreamingJobBase {
|
|||
eventGroups: Seq[AmplitudeEventGroup]) {
|
||||
|
||||
def getBatchFilters: Map[String, List[String]] = {
|
||||
filters.map{ case(k, v) => k ->
|
||||
(k match {
|
||||
case "docType" => v.map(_.replace("-", "_"))
|
||||
case _ => v
|
||||
})
|
||||
filters.map { case (k, v) =>
|
||||
k -> v.map(_.replace("-", "_"))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,7 +131,7 @@ object EventsToAmplitude extends StreamingJobBase {
|
|||
}
|
||||
|
||||
val nonTopLevelFilters: Map[String, List[String]] = filters.filter {
|
||||
case(name, _) => !topLevelFilters.contains(name)
|
||||
case(name, _) => !topLevelPingFields.contains(name)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,6 +140,12 @@ object EventsToAmplitude extends StreamingJobBase {
|
|||
val fields = message.fieldsAsMap
|
||||
|
||||
config.topLevelFilters
|
||||
.filter { case (name, _) =>
|
||||
// Some top-level fields exist in the partition structure of batch data, but not in the ping messages;
|
||||
// such filters get applied in batch mode when we create the Dataset, but we have to skip them here,
|
||||
// so they won't apply to streaming mode.
|
||||
fields.keySet.contains(name)
|
||||
}
|
||||
.map{ case(name, allowedVals) =>
|
||||
allowedVals.contains(fields.getOrElse(name, "").asInstanceOf[String])
|
||||
}.reduce(_ & _) match {
|
||||
|
@ -160,7 +165,9 @@ object EventsToAmplitude extends StreamingJobBase {
|
|||
try {
|
||||
parsePing(Message.parseFrom(v.get(0).asInstanceOf[Array[Byte]]), sample, config)
|
||||
} catch {
|
||||
case _: Throwable if !raiseOnError => Array[String]()
|
||||
case ex: Throwable if !raiseOnError =>
|
||||
log.error("Encountered an error; skipping this ping!", ex)
|
||||
Array[String]()
|
||||
}
|
||||
}).as[String]
|
||||
}
|
||||
|
@ -241,6 +248,8 @@ object EventsToAmplitude extends StreamingJobBase {
|
|||
val minDelay = opts.minDelay()
|
||||
val url = opts.url()
|
||||
|
||||
log.info(s"Processing events for ${pingsDataFrame.count()} pings on $currentDate")
|
||||
|
||||
getEvents(config, pingsDataFrame, opts.sample(), opts.raiseOnError())
|
||||
.repartition(maxParallelRequests)
|
||||
.foreachPartition{ it: Iterator[String] =>
|
||||
|
|
|
@ -10,6 +10,49 @@
|
|||
]
|
||||
},
|
||||
"eventGroups": [
|
||||
{
|
||||
"eventGroupName": "Meta",
|
||||
"events": [
|
||||
{
|
||||
"name": "session split",
|
||||
"description": "a ping was sent defining a subsession",
|
||||
"amplitudeProperties": {
|
||||
"subsession_length": "extra.subsession_length",
|
||||
"active_ticks": "extra.active_ticks",
|
||||
"uri_count": "extra.uri_count",
|
||||
"search_count": "extra.search_count",
|
||||
"reason": "extra.reason"
|
||||
},
|
||||
"schema": {
|
||||
"$schema": "http://json-schema.org/schema#",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"timestamp": {
|
||||
"type": "number",
|
||||
"minimum": 0
|
||||
},
|
||||
"category": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"meta"
|
||||
]
|
||||
},
|
||||
"method": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"session_split"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"timestamp",
|
||||
"category",
|
||||
"method"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"eventGroupName": "main_ping",
|
||||
"events": [
|
||||
|
|
|
@ -185,9 +185,30 @@ object TestUtils {
|
|||
| },
|
||||
| "SUBPROCESS_KILL_HARD": {
|
||||
| "ShutDownKill": {"values": {"0": 1}}
|
||||
| },
|
||||
| "SEARCH_COUNTS": {
|
||||
| "ddg.urlbar": {
|
||||
| "range": [
|
||||
| 1,
|
||||
| 2
|
||||
| ],
|
||||
| "bucket_count": 3,
|
||||
| "histogram_type": 4,
|
||||
| "values": {
|
||||
| "0": 4,
|
||||
| "1": 0
|
||||
| },
|
||||
| "sum": 4
|
||||
| }
|
||||
| }
|
||||
|}""".stripMargin,
|
||||
"payload.simpleMeasurements" -> """{"firstPaint": 1200}""",
|
||||
"payload.simpleMeasurements" ->
|
||||
"""
|
||||
|{
|
||||
| "activeTicks": 2000,
|
||||
| "firstPaint": 1200
|
||||
|}
|
||||
""".stripMargin,
|
||||
"payload.info" ->
|
||||
"""
|
||||
|{
|
||||
|
|
|
@ -12,13 +12,14 @@ class FocusEventPingTest extends FlatSpec with Matchers{
|
|||
val message = TestUtils.generateFocusEventMessages(1).head
|
||||
val ping = SendsToAmplitude(message)
|
||||
val ts = TestUtils.testTimestampMillis
|
||||
val config = Config("telemetry", Map.empty, Nil)
|
||||
|
||||
"Focus Event Ping" can "read events" in {
|
||||
ping.events should contain (Event(176078022, "action", "foreground", "app", None, None))
|
||||
}
|
||||
|
||||
it can "correctly sample itself" in {
|
||||
val noFilters = Config("telemetry", Map.empty, Nil)
|
||||
val noFilters = config.copy(filters = Map.empty)
|
||||
|
||||
ping.includePing(0.0, noFilters) should be (false)
|
||||
ping.includePing(0.72, noFilters) should be (false)
|
||||
|
@ -27,22 +28,22 @@ class FocusEventPingTest extends FlatSpec with Matchers{
|
|||
}
|
||||
|
||||
it can "correctly filter itself" in {
|
||||
val notIncluded = Config("telemetry", Map("os" -> List("iOS")), Nil)
|
||||
val notIncluded = config.copy(filters = Map("os" -> List("iOS")))
|
||||
ping.includePing(1.0, notIncluded) should be (false)
|
||||
}
|
||||
|
||||
it can "correctly include itself" in {
|
||||
val included = Config("telemetry", Map("os" -> List("Android")), Nil)
|
||||
val included = config.copy(filters = Map("os" -> List("Android")))
|
||||
ping.includePing(1.0, included) should be (true)
|
||||
}
|
||||
|
||||
it can "throw on non-existent properties in filter" in {
|
||||
val included = Config("telemetry", Map("nonexistent" -> List("Android")), Nil)
|
||||
val included = config.copy(filters = Map("nonexistent" -> List("Android")))
|
||||
an [NoSuchElementException] should be thrownBy ping.includePing(1.0, included) // scalastyle:ignore
|
||||
}
|
||||
|
||||
it can "filter on non-string properties" in {
|
||||
val included = Config("telemetry", Map("created" -> List("1506024685632")), Nil)
|
||||
val included = config.copy(filters = Map("created" -> List("1506024685632")))
|
||||
ping.includePing(1.0, included) should be (true)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,9 +27,25 @@ class PingsTest extends FlatSpec with Matchers{
|
|||
| }
|
||||
| }
|
||||
| }
|
||||
| },
|
||||
| "parent": {
|
||||
| "scalars": {
|
||||
| "media.page_count": 4,
|
||||
| "browser.engagement.unique_domains_count": 7,
|
||||
| "browser.engagement.tab_open_event_count": 11,
|
||||
| "browser.engagement.max_concurrent_window_count": 2,
|
||||
| "browser.engagement.max_concurrent_tab_count": 21,
|
||||
| "browser.engagement.unfiltered_uri_count": 128,
|
||||
| "browser.engagement.window_open_event_count": 1,
|
||||
| "browser.errors.collected_with_stack_count": 37,
|
||||
| "browser.engagement.total_uri_count": 63,
|
||||
| "browser.errors.collected_count": 181,
|
||||
| "browser.engagement.active_ticks": 271
|
||||
| }
|
||||
| }
|
||||
|}
|
||||
""".stripMargin)
|
||||
|""".stripMargin)
|
||||
|
||||
val message = TestUtils.generateMainMessages(1, customPayload = ContentHistogramPayload).head
|
||||
val mainPing = MainPing(message)
|
||||
val ts = TestUtils.testTimestampMillis
|
||||
|
@ -45,6 +61,10 @@ class PingsTest extends FlatSpec with Matchers{
|
|||
mainPing.getCountKeyedHistogramValue("SUBPROCESS_CRASHES_WITH_DUMP", "content").get should be (1)
|
||||
}
|
||||
|
||||
it should "return the value of a process scalar" in {
|
||||
mainPing.getScalarValue("parent", "browser.engagement.total_uri_count").head should be (63)
|
||||
}
|
||||
|
||||
it should "return the value of its usage hours" in {
|
||||
mainPing.usageHours.get should be (1.0)
|
||||
val messageNoUsageHours = TestUtils.generateMainMessages(1, Some(Map("payload.info" -> "{}"))).head
|
||||
|
|
|
@ -90,6 +90,19 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
| }
|
||||
""".stripMargin
|
||||
|
||||
private val pingSentJson =
|
||||
s"""
|
||||
|{
|
||||
| "event_type": "Meta - session split",
|
||||
| "event_properties": {
|
||||
| "subsession_length": "3600",
|
||||
| "active_ticks": "2000",
|
||||
| "uri_count": "63",
|
||||
| "search_count": "4"
|
||||
| }
|
||||
|}
|
||||
""".stripMargin
|
||||
|
||||
// these keys are present in all events, but values differ
|
||||
private val requiredKeys = "session_id" :: "insert_id" :: "time" :: Nil
|
||||
|
||||
|
@ -125,6 +138,7 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
)
|
||||
|
||||
private val mainPingJsonMatch = JArray(
|
||||
parse(pingSentJson) +:
|
||||
eventsJson("main_ping").map {
|
||||
e => parse(e) merge parse(mainPingJson)
|
||||
}
|
||||
|
@ -196,7 +210,7 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
verify(expectedTotalMsgs, createMatcher(focusEventJsonMatch))
|
||||
}
|
||||
|
||||
"HTTPSink" should "send main ping events correctly" in {
|
||||
it should "send main ping events correctly" in {
|
||||
val config = EventsToAmplitude.readConfigFile(configFilePath(MainEventsConfigFile))
|
||||
val msgs = TestUtils.generateMainMessages(expectedTotalMsgs,
|
||||
customPayload=EventsToAmplitudeTest.CustomMainPingPayload)
|
||||
|
@ -262,7 +276,7 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
verify(expectedTotalMsgs, createMatcher(focusEventJsonMatch))
|
||||
}
|
||||
|
||||
"Events to Amplitude" should "send main ping events via HTTP request" taggedAs(Kafka.DockerComposeTag, DockerEventsTag, DockerMainEvents) in {
|
||||
it should "send main ping events via HTTP request" taggedAs(Kafka.DockerComposeTag, DockerEventsTag, DockerMainEvents) in {
|
||||
Kafka.createTopic(StreamingJobBase.TelemetryKafkaTopic)
|
||||
val kafkaProducer = Kafka.makeProducer(StreamingJobBase.TelemetryKafkaTopic)
|
||||
|
||||
|
@ -314,7 +328,7 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
verify(expectedTotalMsgs, createMatcher(mainPingJsonMatch))
|
||||
}
|
||||
|
||||
"Events to Amplitude" should "ignore pings without events" taggedAs(Kafka.DockerComposeTag, DockerEventsTag, DockerMainEvents) in {
|
||||
it should "ignore pings without events" taggedAs(Kafka.DockerComposeTag, DockerEventsTag, DockerMainEvents) in {
|
||||
Kafka.createTopic(TelemetryKafkaTopic)
|
||||
val kafkaProducer = Kafka.makeProducer(TelemetryKafkaTopic)
|
||||
|
||||
|
@ -322,7 +336,7 @@ class EventsToAmplitudeTest extends FlatSpec with Matchers with BeforeAndAfterAl
|
|||
rs.foreach{ kafkaProducer.send(_, synchronous = true) }
|
||||
}
|
||||
|
||||
val messages = (TestUtils.generateMainMessages(expectedTotalMsgs))
|
||||
val messages = (TestUtils.generateFocusEventMessages(expectedTotalMsgs))
|
||||
.map(_.toByteArray)
|
||||
|
||||
val listener = new StreamingQueryListener {
|
||||
|
@ -389,6 +403,19 @@ object EventsToAmplitudeTest {
|
|||
"""
|
||||
| "processes": {
|
||||
| "parent": {
|
||||
| "scalars": {
|
||||
| "media.page_count": 4,
|
||||
| "browser.engagement.unique_domains_count": 7,
|
||||
| "browser.engagement.tab_open_event_count": 11,
|
||||
| "browser.engagement.max_concurrent_window_count": 2,
|
||||
| "browser.engagement.max_concurrent_tab_count": 21,
|
||||
| "browser.engagement.unfiltered_uri_count": 128,
|
||||
| "browser.engagement.window_open_event_count": 1,
|
||||
| "browser.errors.collected_with_stack_count": 37,
|
||||
| "browser.engagement.total_uri_count": 63,
|
||||
| "browser.errors.collected_count": 181,
|
||||
| "browser.engagement.active_ticks": 271
|
||||
| },
|
||||
| "events": [
|
||||
| [
|
||||
| 176078022,
|
||||
|
@ -436,7 +463,7 @@ object EventsToAmplitudeTest {
|
|||
| "background",
|
||||
| "app",
|
||||
| "",
|
||||
| { "sessionLength": "1000" }
|
||||
| { "sessionLength": 1000 }
|
||||
| ],
|
||||
| [
|
||||
| 176151591,
|
||||
|
|
Загрузка…
Ссылка в новой задаче