Bug 1485583 - Add accumulator-based metrics source

Arkadiusz Komarzewski 2018-08-23 10:39:43 +02:00 committed by Arkadiusz Komarzewski
Parent faf3bff58f
Commit 25a9c7eb18
3 changed files with 196 additions and 0 deletions

View file

@@ -31,6 +31,7 @@ lazy val root = (project in file(".")).
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
exclude("net.jpountz.lz4", "lz4"), //conflicts with org.lz4:lz4-java:1.4.0 from spark-core
libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.20.v20170531" % Provided, // needed for metrics
libraryDependencies += "org.rogach" %% "scallop" % "1.0.2",
libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0",
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.10.0.1" % Test,

View file

@@ -0,0 +1,94 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.apache.spark.metrics.source.custom

// This class is defined under the `org.apache.spark` package because it extends
// the package-private `org.apache.spark.metrics.source.Source`

import java.util.concurrent.{Executors, TimeUnit}
import java.{lang => jl}

import com.codahale.metrics.MetricRegistry
import org.apache.commons.lang3.concurrent.BasicThreadFactory
import org.apache.log4j.LogManager
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.Source
import org.apache.spark.util.{AccumulatorV2, ShutdownHookManager}

import scala.collection.mutable

/**
 * Accumulator-based metric source.
 *
 * This source uses Spark accumulators for collecting metrics. Each registered accumulator is periodically
 * polled and its value is reported to the Spark metrics system via a [[com.codahale.metrics.Meter]].
 * After all accumulators are registered, the source must be started with [[AccumulatorMetricsSource#start()]],
 * which starts the internal polling thread and registers a shutdown hook that flushes the metrics on exit.
 *
 * Usage:
 * {{{
 *   val metricsSource = new AccumulatorMetricsSource("prefix")
 *   val metric = spark.sparkContext.longAccumulator("metric-name")
 *   metricsSource.registerAccumulator(metric)
 *   metricsSource.start()
 * }}}
 *
 * @param prefix prefix added to metrics collected by this source
 */
class AccumulatorMetricsSource(prefix: String) extends Source {
  override val sourceName: String = prefix
  override val metricRegistry: MetricRegistry = new MetricRegistry

  private val log = LogManager.getLogger(this.getClass.getName)
  private val accumulators = mutable.Map[String, AccumulatorV2[jl.Long, jl.Long]]()

  private val executor = Executors.newSingleThreadScheduledExecutor(
    new BasicThreadFactory.Builder()
      .namingPattern("accumulator-metrics-source-thread-%d")
      .daemon(true)
      .priority(Thread.NORM_PRIORITY)
      .build())

  def registerAccumulator(acc: AccumulatorV2[jl.Long, jl.Long]): Unit = {
    val name = acc.name.getOrElse("UNNAMED_ACCUMULATOR")
    require(!accumulators.keySet.contains(name), "Accumulator already registered")
    accumulators.put(name, acc)
    metricRegistry.meter(name) // register metric beforehand to make sure no errors are thrown later during collection
  }

  def start(pollingPeriod: Long = 10, unit: TimeUnit = TimeUnit.SECONDS): Unit = {
    executor.scheduleAtFixedRate(new Runnable() {
      override def run(): Unit = {
        try {
          report()
        } catch {
          case ex: Exception =>
            log.error("Exception thrown from AccumulatorMetricsSource#report. Exception was suppressed.", ex)
        }
      }
    }, pollingPeriod, pollingPeriod, unit)

    // register with Spark's metrics system and make sure the latest values are flushed on shutdown
    SparkEnv.get.metricsSystem.registerSource(this)
    ShutdownHookManager.addShutdownHook(() => this.stop())
  }

  private[custom] def stop(): Unit = {
    executor.shutdown()
    report() // flush the most recent accumulator values
  }

  private def report(): Unit = {
    accumulators.foreach { case (name, acc) =>
      val value = acc.value
      val meter = metricRegistry.meter(name)
      // mark only the delta since the previous report, so the meter's count tracks the accumulator value
      meter.mark(getCountOverflowSafe(value, meter.getCount))
    }
  }

  private def getCountOverflowSafe(current: jl.Long, previous: jl.Long): jl.Long = {
    val unsafeCount = current - previous
    // a negative delta means the accumulator overflowed; saturate the meter at Long.MaxValue instead of marking a negative value
    val safeCount = if (unsafeCount < 0) jl.Long.MAX_VALUE - previous else unsafeCount
    safeCount
  }
}
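
Note: the meters registered by this source only surface somewhere once a sink is configured in Spark's metrics system. Below is a minimal, hypothetical driver sketch of how the source could be wired up, reusing the CSV sink settings exercised by the test that follows; the object name MetricsExample, the accumulator name, and the output directory are illustrative placeholders, not part of this commit.

import org.apache.spark.metrics.source.custom.AccumulatorMetricsSource
import org.apache.spark.sql.SparkSession

object MetricsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("metrics-example")
      .master("local[2]")
      // route all metrics, including this custom source, to a CSV sink polled every 10 seconds
      .config("spark.metrics.conf.*.sink.csv.class", "org.apache.spark.metrics.sink.CsvSink")
      .config("spark.metrics.conf.*.sink.csv.period", "10")
      .config("spark.metrics.conf.*.sink.csv.directory", "/tmp/metrics-example/")
      .config("spark.metrics.namespace", "${spark.app.name}")
      .getOrCreate()

    // register an accumulator with the source, then start the polling thread;
    // start() also adds the shutdown hook that flushes the meters on exit
    val metricsSource = new AccumulatorMetricsSource("ApplicationMetrics")
    val processedRecords = spark.sparkContext.longAccumulator("processed-records")
    metricsSource.registerAccumulator(processedRecords)
    metricsSource.start()

    // work that increments the accumulator shows up under
    // metrics-example.driver.ApplicationMetrics.processed-records in the CSV output
    spark.range(0, 100).foreach(_ => processedRecords.add(1))

    spark.stop()
  }
}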

View file

@@ -0,0 +1,101 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.apache.spark.metrics.source.custom

import java.io.File
import java.nio.file.{Files, Paths}

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.{ForeachWriter, SQLContext, SparkSession}
import org.scalatest._

import scala.io.Source

class AccumulatorMetricsSourceTest extends FlatSpec with Matchers with BeforeAndAfterEach with GivenWhenThen {

  private val MetricsOutputDir = "/tmp/metrics-test/"
  private val appName = "metrics-test"
  private val MetricsSourceName = "ApplicationMetrics"
  private val MetricName = "test-metric"

"AccumulatorMetricsSource" should "provide accumulator-based metrics" in {
Given("Spark session configured with csv metrics reporter")
val spark = SparkSession.builder()
.appName(appName)
.master("local[2]")
.config("spark.metrics.conf.*.sink.csv.class", "org.apache.spark.metrics.sink.CsvSink")
.config("spark.metrics.conf.*.sink.csv.period", "1")
.config("spark.metrics.conf.*.sink.csv.directory", MetricsOutputDir)
.config("spark.metrics.namespace", "${spark.app.name}")
.config("spark.sql.streaming.metricsEnabled", "true")
.getOrCreate()
implicit val sqlContext: SQLContext = spark.sqlContext
import spark.implicits._
And("accumulator metrics source with an accumulator registered")
val metricsSource = new AccumulatorMetricsSource(MetricsSourceName)
val testMetric = spark.sparkContext.longAccumulator(MetricName)
metricsSource.registerAccumulator(testMetric)
metricsSource.start()
When("accumulator is incremented during processing")
val DatasetSize = 4
val stream = MemoryStream[Int]
stream.addData(1 to DatasetSize)
stream.toDS().writeStream.foreach(new ForeachWriter[Int] {
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Int): Unit = testMetric.add(1)
override def close(errorOrNull: Throwable): Unit = ()
}).queryName("TestQuery").start().processAllAvailable()
Then("accumulated metrics are outputted to configured metrics directory")
// flush metrics
metricsSource.stop()
SparkEnv.get.metricsSystem.report()
val customMetricFiles = new File(MetricsOutputDir).listFiles().filter(_.getName.startsWith(s"metrics-test.driver.$MetricsSourceName"))
customMetricFiles should have size 1
val counterFile = customMetricFiles.filter(_.getName.endsWith(s"$MetricName.csv")).head
val maxCount = Source.fromFile(counterFile).getLines().toSeq.tail // first line is a header
.map { line =>
val count = line.split(",")(1) // second column is count, next are rates
count.toLong
}.max
assert(maxCount == DatasetSize, s"Maximum measured count should be ${DatasetSize}. Collected metrics were: \n" + printContents(customMetricFiles))
spark.stop()
}
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    cleanupTestDirectories()
    Files.createDirectories(Paths.get(MetricsOutputDir))
  }

  override protected def afterEach(): Unit = {
    super.afterEach()
    cleanupTestDirectories()
  }

  private def cleanupTestDirectories(): Unit = {
    FileUtils.deleteDirectory(new File(MetricsOutputDir))
  }

  private def printContents(files: Array[File]): String = {
    files.map { f =>
      "============================================================\n" +
        "File: " + f.getName + "\n" +
        scala.io.Source.fromFile(f).getLines().mkString("\n") + "\n" +
        "============================================================"
    }.mkString("\n")
  }
}