Simplify DogStatsDMetric to use a single case class type with static "factory" methods for each metric type

This commit is contained in:
Sunah Suh 2019-05-28 22:11:35 -05:00 коммит произвёл Sunah Suh
Родитель b85cdffc72
Коммит 6741ce6c8e
5 изменённых файлов: 72 добавлений и 62 удалений

Просмотреть файл

@ -6,13 +6,8 @@ package com.mozilla.telemetry.monitoring
/* Conforms to the DogStatsD datagram format as documented here:
* https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/ */
sealed trait DogStatsDMetric {
val metricName: String
protected val metricStringValue: String
protected val metricType: String
val kvTags: Option[Map[String, String]]
val bareTags: Option[Seq[String]]
case class DogStatsDMetric(metricName: String, metricValue: String, metricType: String, kvTags: Option[Map[String, String]] = None,
bareTags: Option[Seq[String]] = None) {
def format(sampleRate: Option[Double] = None): String = {
Array(Some(metric), Some(metricType), sampleRateString(sampleRate), tags)
.flatten
@ -27,7 +22,7 @@ sealed trait DogStatsDMetric {
private def sampleRateString(sampleRate: Option[Double]): Option[String] = sampleRate.map("@" + _.toString)
lazy private val metric: String = s"${normalize(metricName)}:$metricStringValue"
lazy private val metric: String = s"${normalize(metricName)}:$metricValue"
lazy private val tags: Option[String] = {
val kv = kvTags.map(_.map {case (k, v) => s"${normalize(k)}:${normalize(v)}"}.mkString(","))
val bare = bareTags.map(_.map(normalize).mkString(","))
@ -39,8 +34,17 @@ sealed trait DogStatsDMetric {
}
}
case class DogStatsDCounter(metricName: String, metricValue: Int = 1, kvTags: Option[Map[String, String]] = None,
bareTags: Option[Seq[String]] = None) extends DogStatsDMetric {
protected val metricStringValue = metricValue.toString
protected val metricType = "c"
object DogStatsDMetric {
// While it would have been cleaner to structure these as a trait w each metric type as a concrete implementation of
// such, the dataset type parameter must be a subtype of Product, so we're unable to go that route if we want to have
// multiple types of metrics in a single dataframe
def makeCounter(metricName: String, metricValue: Int = 1, kvTags: Option[Map[String, String]] = None,
bareTags: Option[Seq[String]] = None): DogStatsDMetric = {
DogStatsDMetric(metricName, metricValue.toString, "c", kvTags, bareTags)
}
def makeTimer(metricName: String, metricValue: Int, kvTags: Option[Map[String, String]] = None,
bareTags: Option[Seq[String]] = None): DogStatsDMetric = {
DogStatsDMetric(metricName, metricValue.toString, "ms", kvTags, bareTags)
}
}

Просмотреть файл

@ -3,12 +3,13 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.sinks
import com.mozilla.telemetry.monitoring.{DogStatsDMetric, DogStatsDCounter}
import org.apache.spark.sql.ForeachWriter
import java.net.{InetSocketAddress, DatagramPacket, DatagramSocket}
import com.mozilla.telemetry.monitoring.DogStatsDMetric
sealed abstract class DogStatsDMetricSink[T <: DogStatsDMetric](host: String, port: Int, sampleRate: Option[Double] = None) extends ForeachWriter[T] {
class DogStatsDMetricSink(host: String, port: Int, sampleRate: Option[Double] = None)
extends ForeachWriter[DogStatsDMetric] {
private val address = new InetSocketAddress(host, port)
private var socket: DatagramSocket = null
@ -17,7 +18,7 @@ sealed abstract class DogStatsDMetricSink[T <: DogStatsDMetric](host: String, po
true
}
def process(value: T): Unit = {
def process(value: DogStatsDMetric): Unit = {
val bytes = value.format(sampleRate).getBytes
val datagram = new DatagramPacket(bytes, bytes.length, address)
socket.send(datagram)
@ -27,5 +28,3 @@ sealed abstract class DogStatsDMetricSink[T <: DogStatsDMetric](host: String, po
socket.close()
}
}
class DogStatsDCounterSink(host: String, port: Int, sampleRate: Option[Double] = None) extends DogStatsDMetricSink[DogStatsDCounter](host, port, sampleRate)

Просмотреть файл

@ -1,33 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.monitoring
import org.scalatest.{FlatSpec, Matchers}
class DogStatsDCounterTest extends FlatSpec with Matchers {
"DogStatsDCounter" should "produce a properly formatted minimal datagram string" in {
val actual = DogStatsDCounter("test.metric").format()
actual should be("test.metric:1|c")
}
it should "produce a properly formatted fully-specified datagram string" in {
val actual = DogStatsDCounter("test.metric", 3, Some(Map("key1" -> "value1", "key2" -> "value2")), Some(Seq("tag1", "tag2"))).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#key1:value1,key2:value2,tag1,tag2")
}
it should "handle present key-value tags without non-key-value tags" in {
val actual = DogStatsDCounter("test.metric", 3, Some(Map("key1" -> "value1", "key2" -> "value2")), None).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#key1:value1,key2:value2")
}
it should "handle present non-key-value tags without key-value tags" in {
val actual = DogStatsDCounter("test.metric", 3, None, Some(Seq("tag1", "tag2"))).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#tag1,tag2")
}
it should "normalize illegal characters" in {
val actual = DogStatsDCounter("test.me|t@r:ic", 3, None, Some(Seq("t|a@g:1", "t|a@g:2"))).format(Some(0.5))
actual should be("test.me_t_r_ic:3|c|@0.5|#t_a_g_1,t_a_g_2")
}
}

Просмотреть файл

@ -0,0 +1,38 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package com.mozilla.telemetry.monitoring
import org.scalatest.{FlatSpec, Matchers}
class DogStatsDMetricTest extends FlatSpec with Matchers {
"DogStatsDMetric Counter" should "produce a properly formatted minimal datagram string" in {
val actual = DogStatsDMetric.makeCounter("test.metric").format()
actual should be("test.metric:1|c")
}
it should "produce a properly formatted fully-specified datagram string" in {
val actual = DogStatsDMetric.makeCounter("test.metric", 3, Some(Map("key1" -> "value1", "key2" -> "value2")), Some(Seq("tag1", "tag2"))).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#key1:value1,key2:value2,tag1,tag2")
}
it should "handle present key-value tags without non-key-value tags" in {
val actual = DogStatsDMetric.makeCounter("test.metric", 3, Some(Map("key1" -> "value1", "key2" -> "value2")), None).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#key1:value1,key2:value2")
}
it should "handle present non-key-value tags without key-value tags" in {
val actual = DogStatsDMetric.makeCounter("test.metric", 3, None, Some(Seq("tag1", "tag2"))).format(Some(0.5))
actual should be("test.metric:3|c|@0.5|#tag1,tag2")
}
it should "normalize illegal characters" in {
val actual = DogStatsDMetric.makeCounter("test.me|t@r:ic", 3, None, Some(Seq("t|a@g:1", "t|a@g:2"))).format(Some(0.5))
actual should be("test.me_t_r_ic:3|c|@0.5|#t_a_g_1,t_a_g_2")
}
"DogStatsDMetric timer" should "produce a properly formatted minimal datagram string" in {
val actual = DogStatsDMetric.makeTimer("test.metric", 1).format()
actual should be("test.metric:1|ms")
}
}

Просмотреть файл

@ -6,7 +6,7 @@ package com.mozilla.telemetry.sinks
import java.net.{DatagramPacket, DatagramSocket}
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import com.mozilla.telemetry.monitoring.DogStatsDCounter
import com.mozilla.telemetry.monitoring.DogStatsDMetric
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{FlatSpec, Matchers}
@ -28,28 +28,29 @@ case class UDPReceiver(bufferLength: Int = 1024) {
}
}
class DogStatsDCounterSinkTest extends FlatSpec with DataFrameSuiteBase with Matchers {
"DogStatsDCounterSink" should "produce a properly formatted minimal datagram string" in {
class DogStatsDMetricSinkTest extends FlatSpec with DataFrameSuiteBase with Matchers {
"DogStatsDMetricSink" should "produce a properly formatted minimal datagram string" in {
implicit def executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
import spark.implicits._
val input = MemoryStream[DogStatsDCounter]
val input = MemoryStream[DogStatsDMetric]
val receiver = UDPReceiver()
val f = Future[Seq[String]] {
receiver.receiveData(4)
receiver.receiveData(5)
}
val sink = new DogStatsDCounterSink("localhost", receiver.port, Some(0.1))
val sink = new DogStatsDMetricSink("localhost", receiver.port, Some(0.1))
val query = input.toDS()
.writeStream
.queryName("DogStatsDCounterSinkTest")
.queryName("DogStatsDMetricSinkTest")
.foreach(sink)
.start()
input.addData(DogStatsDCounter("test.sink"))
input.addData(DogStatsDCounter("test.sink", kvTags = Some(Map("hello" -> "world"))))
input.addData(DogStatsDCounter("test.sink", bareTags = Some(Seq("what", "is:new"))))
input.addData(DogStatsDCounter("test.sink", metricValue = 2))
input.addData(DogStatsDMetric.makeCounter("test.sink"))
input.addData(DogStatsDMetric.makeCounter("test.sink", kvTags = Some(Map("hello" -> "world"))))
input.addData(DogStatsDMetric.makeCounter("test.sink", bareTags = Some(Seq("what", "is:new"))))
input.addData(DogStatsDMetric.makeCounter("test.sink", metricValue = 2))
input.addData(DogStatsDMetric.makeTimer("test.sink", metricValue = 2))
query.processAllAvailable()
@ -58,7 +59,8 @@ class DogStatsDCounterSinkTest extends FlatSpec with DataFrameSuiteBase with Mat
"test.sink:1|c|@0.1",
"test.sink:1|c|@0.1|#hello:world",
"test.sink:1|c|@0.1|#what,is_new",
"test.sink:2|c|@0.1"
"test.sink:2|c|@0.1",
"test.sink:2|ms|@0.1"
)
}