Mirror of https://github.com/microsoft/spark.git
Add MetricsServlet for Spark metrics system
Parent: 2a39d2ca25
Commit: 320e87e7ab
@@ -3,8 +3,8 @@
 # This file configures Spark's internal metrics system. The metrics system is
 # divided into instances which correspond to internal components.
 # Each instance can be configured to report its metrics to one or more sinks.
 # Accepted values for [instance] are "master", "worker", "executor", "driver",
 # and "applications". A wild card "*" can be used as an instance name, in
 # which case all instances will inherit the supplied property.
 #
 # Within an instance, a "source" specifies a particular set of grouped metrics.
@@ -19,7 +19,7 @@
 # A "sink" specifies where metrics are delivered to. Each instance can be
 # assigned one or more sinks.
 #
 # The sink|source field specifies whether the property relates to a sink or
 # source.
 #
 # The [name] field specifies the name of the source or sink.
@@ -28,18 +28,29 @@
 # source or sink is responsible for parsing this property.
 #
 # Notes:
 #   1. To add a new sink, set the "class" option to a fully qualified class
 #      name (see examples below).
 #   2. Some sinks involve a polling period. The minimum allowed polling period
 #      is 1 second.
 #   3. Wild card properties can be overridden by more specific properties.
 #      For example, master.sink.console.period takes precedence over
 #      *.sink.console.period.
 #   4. A metrics-specific configuration
 #      "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
 #      added to the Java properties using -Dspark.metrics.conf=xxx if you want to
 #      customize the metrics system. You can also put the file in ${SPARK_HOME}/conf
 #      and it will be loaded automatically.
+#   5. MetricsServlet is added by default as a sink in the master, worker and
+#      client driver; send an HTTP request to "/metrics" to get a snapshot of all
+#      registered metrics in JSON format. For the master, the requests "/metrics/master"
+#      and "/metrics/applications" can be sent separately to get metrics snapshots
+#      of the master and applications instances.
+#
+
+# Change MetricsServlet's properties
+#*.sink.servlet.uri=/metrics
+#
+#*.sink.servlet.sample=false

 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
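Since the servlet is now on by default, the quickest smoke test is an HTTP GET against the web UI port. A minimal sketch; the host and port are assumptions (use whatever your master web UI binds to), not values from this commit:

    // Assumed endpoint: master web UI on localhost:8080 (example values).
    import scala.io.Source

    val masterMetrics = Source.fromURL("http://localhost:8080/metrics/master").mkString
    val appMetrics    = Source.fromURL("http://localhost:8080/metrics/applications").mkString
    println(masterMetrics)  // JSON snapshot of the "master" instance's registry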
@@ -126,6 +126,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-jvm</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-json</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
@@ -65,6 +65,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)

+  // Add default MetricsServlet handlers to the web UI
+  masterMetricsSystem.metricsServlet foreach { m =>
+    webUi.handlers = m.getHandlers ++ webUi.handlers
+  }
+
+  applicationMetricsSystem.metricsServlet foreach { m =>
+    webUi.handlers = m.getHandlers ++ webUi.handlers
+  }
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
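Note the ordering: `m.getHandlers ++ webUi.handlers` puts the servlet's routes in front, and handlers appear to be tried in array order here (the UI lists its catch-all entry last), so the specific /metrics paths win before any general page handler. A toy illustration of the idiom, with types simplified to plain functions (nothing below is from the commit):

    // Simplified stand-in for (String, Handler) pairs: path -> responder.
    var handlers = Array[(String, String => String)](
      ("/app", _ => "app page"),
      ("*",    _ => "index"))          // assumed catch-all at the end

    val servletHandlers = Array[(String, String => String)](
      ("/metrics/master", _ => "{ \"gauges\": {} }"))

    handlers = servletHandlers ++ handlers  // metrics route now matched first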
@@ -57,7 +57,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
     }
   }

-  val handlers = Array[(String, Handler)](
+  var handlers = Array[(String, Handler)](
     ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
     ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
@@ -101,6 +101,12 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
+    // Add default MetricsServlet handlers to the web UI
+    metricsSystem.metricsServlet foreach { m =>
+      webUi.handlers = m.getHandlers ++ webUi.handlers
+    }
+
     webUi.start()
     connectToMaster()
@@ -49,7 +49,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I

   val indexPage = new IndexPage(this)

-  val handlers = Array[(String, Handler)](
+  var handlers = Array[(String, Handler)](
     ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
     ("/log", (request: HttpServletRequest) => log(request)),
     ("/logPage", (request: HttpServletRequest) => logPage(request)),
@@ -36,7 +36,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   var propertyCategories: mutable.HashMap[String, Properties] = null

   private def setDefaultProperties(prop: Properties) {
-    // empty function, any default property can be set here
+    prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
+    prop.setProperty("master.sink.servlet.uri", "/metrics/master")
+    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications")
   }

   def initialize() {
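Because setDefaultProperties runs before the configuration file is read into the same Properties object, anything the user sets later simply overwrites these defaults, and note 3 in the template (specific keys beat the wildcard) still applies on top. A minimal sketch of that load order:

    import java.util.Properties

    val prop = new Properties()
    // 1. defaults, as setDefaultProperties now does
    prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
    prop.setProperty("master.sink.servlet.uri", "/metrics/master")
    // 2. the user's metrics.properties is loaded over the same object and wins
    prop.setProperty("master.sink.servlet.uri", "/monitoring/master")

    println(prop.getProperty("master.sink.servlet.uri"))  // /monitoring/master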
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable

 import spark.Logging
-import spark.metrics.sink.Sink
+import spark.metrics.sink.{MetricsServlet, Sink}
 import spark.metrics.source.Source

 /**
@@ -35,7 +35,7 @@ import spark.metrics.source.Source
  * "instance" specifies "who" (the role) uses the metrics system. In Spark there are several roles,
  * like master, worker, executor and client driver, and these roles create metrics systems
  * for monitoring. So an instance represents one of these roles. Currently in Spark, several instances
- * have already been implemented: master, worker, executor, driver.
+ * have already been implemented: master, worker, executor, driver, applications.
  *
  * "source" specifies "where" (the source) to collect metrics data. In the metrics system, there exist
  * two kinds of sources:
@@ -51,8 +51,8 @@ import spark.metrics.source.Source
 * The metrics configuration format is like below:
 * [instance].[sink|source].[name].[options] = xxxx
 *
-* [instance] can be "master", "worker", "executor", "driver", which means only the specified
-* instance has this property.
+* [instance] can be "master", "worker", "executor", "driver" or "applications", which means only
+* the specified instance has this property.
 * A wild card "*" can be used to replace the instance name, which means all the instances will have
 * this property.
 *
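Reading one of this commit's own defaults against that format makes the fields concrete: in master.sink.servlet.uri, [instance] is master, sink|source is sink, [name] is servlet and [options] is uri. The same split, spelled out (illustrative only; options with embedded dots would need more care):

    // Decompose a key by the [instance].[sink|source].[name].[options] format.
    val key = "master.sink.servlet.uri"
    val Array(instance, kind, name, option) = key.split('.')
    // instance = "master", kind = "sink", name = "servlet", option = "uri"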
@@ -72,6 +72,9 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
   val sources = new mutable.ArrayBuffer[Source]
   val registry = new MetricRegistry()

+  // Treat MetricsServlet as a special sink, as it should be exposed to add handlers to the web UI
+  var metricsServlet: Option[MetricsServlet] = None
+
   metricsConfig.initialize()
   registerSources()
   registerSinks()
@@ -126,7 +129,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
         val sink = Class.forName(classPath)
           .getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registry)
-        sinks += sink.asInstanceOf[Sink]
+        if (kv._1 == "servlet") {
+          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+        } else {
+          sinks += sink.asInstanceOf[Sink]
+        }
       } catch {
         case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
       }
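Two things worth noting in registerSinks: the sink class is built reflectively, so any class with a (Properties, MetricRegistry) constructor can be plugged in through configuration, and the servlet is deliberately kept out of `sinks`, so it is never polled like the periodic sinks; its start()/stop() are no-ops and its handlers are mounted on a web UI instead. A sketch of the reflective step on its own, assuming the class name comes from a *.sink.<name>.class property:

    import java.util.Properties
    import com.codahale.metrics.MetricRegistry

    // Same construction registerSinks performs, isolated.
    val classPath = "spark.metrics.sink.MetricsServlet"
    val sink = Class.forName(classPath)
      .getConstructor(classOf[Properties], classOf[MetricRegistry])
      .newInstance(new Properties(), new MetricRegistry())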
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val SERVLET_KEY_URI = "uri"
+  val SERVLET_KEY_SAMPLE = "sample"
+
+  val SERVLET_DEFAULT_URI = "/metrics"
+  val SERVLET_DEFAULT_SAMPLE = false
+
+  val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI)
+
+  val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match {
+    case Some(s) => s.toBoolean
+    case None => SERVLET_DEFAULT_SAMPLE
+  }
+
+  val mapper = new ObjectMapper().registerModule(
+    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+  def getHandlers = Array[(String, Handler)](
+    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+  )
+
+  def getMetricsSnapshot(request: HttpServletRequest): String = {
+    mapper.writeValueAsString(registry)
+  }
+
+  override def start() { }
+
+  override def stop() { }
+}
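A small usage sketch for the new class: feed a registry, point the sink at a URI through its Properties, and render the snapshot directly. The servlet ignores its request argument, so passing null is enough for a quick check; the counter name below is made up:

    import java.util.Properties
    import com.codahale.metrics.MetricRegistry
    import spark.metrics.sink.MetricsServlet

    val registry = new MetricRegistry()
    registry.counter("requests").inc(3)          // hypothetical metric

    val props = new Properties()
    props.setProperty("uri", "/metrics/master")  // the key SERVLET_KEY_URI reads

    val servlet = new MetricsServlet(props, registry)
    println(servlet.getMetricsSnapshot(null))    // JSON for the whole registry
    println(servlet.getHandlers.head._1)         // "/metrics/master"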
@@ -48,7 +48,7 @@ private[spark] object JettyUtils extends Logging {
   implicit def textResponderToHandler(responder: Responder[String]): Handler =
     createHandler(responder, "text/plain")

-  private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
       extractFn: T => String = (in: Any) => in.toString): Handler = {
     new AbstractHandler {
       def handle(target: String,
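Dropping `private` is the enabling change for the new sink: MetricsServlet lives in spark.metrics.sink, outside this object's package, and builds its Jetty handler through this method. A hedged call-site sketch, where Responder[T] is the HttpServletRequest => T function alias used by this object:

    import javax.servlet.http.HttpServletRequest
    import spark.ui.JettyUtils

    // Any request => String function becomes a Jetty Handler with a content type.
    val jsonHandler = JettyUtils.createHandler(
      (request: HttpServletRequest) => """{"status": "ok"}""",
      "text/json")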
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest

 import org.eclipse.jetty.server.{Handler, Server}

-import spark.{Logging, SparkContext, Utils}
+import spark.{Logging, SparkContext, SparkEnv, Utils}
 import spark.ui.env.EnvironmentUI
 import spark.ui.exec.ExecutorsUI
 import spark.ui.storage.BlockManagerUI
@@ -43,8 +43,15 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val jobs = new JobProgressUI(sc)
   val env = new EnvironmentUI(sc)
   val exec = new ExecutorsUI(sc)

+  // Add MetricsServlet handlers by default
+  val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match {
+    case Some(s) => s.getHandlers
+    case None => Array[(String, Handler)]()
+  }
+
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
-    exec.getHandlers ++ handlers
+    exec.getHandlers ++ metricsServletHandlers ++ handlers

   /** Bind the HTTP server which backs this web interface */
   def bind() {
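The match is the usual Option unpacking with an empty-array fallback; an equivalent one-expression form, shown only for comparison (the commit keeps the explicit match):

    // Same value as the match above, assuming the same surrounding scope.
    val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet
      .map(_.getHandlers)
      .getOrElse(Array[(String, Handler)]())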
@@ -1,12 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.metrics

 import java.util.Properties
 import java.io.{File, FileOutputStream}

 import org.scalatest.{BeforeAndAfter, FunSuite}

 import spark.metrics._

 class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _

@@ -18,11 +30,12 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     val conf = new MetricsConfig(Option("dummy-file"))
     conf.initialize()

-    assert(conf.properties.size() === 0)
+    assert(conf.properties.size() === 3)
     assert(conf.properties.getProperty("test-for-dummy") === null)

     val property = conf.getInstance("random")
-    assert(property.size() === 0)
+    assert(property.size() === 1)
+    assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
   }

   test("MetricsConfig with properties set") {
@@ -30,16 +43,19 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     conf.initialize()

     val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 3)
+    assert(masterProp.size() === 5)
     assert(masterProp.getProperty("sink.console.period") === "20")
     assert(masterProp.getProperty("sink.console.unit") === "minutes")
     assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master")

     val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 3)
+    assert(workerProp.size() === 4)
     assert(workerProp.getProperty("sink.console.period") === "10")
     assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
   }

   test("MetricsConfig with subProperties") {
@@ -47,7 +63,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     conf.initialize()

     val propCategories = conf.propertyCategories
-    assert(propCategories.size === 2)
+    assert(propCategories.size === 3)

     val masterProp = conf.getInstance("master")
     val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)

@@ -55,10 +71,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")

     val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 1)
+    assert(sinkProps.size === 2)
     assert(sinkProps.contains("console"))
+    assert(sinkProps.contains("servlet"))

     val consoleProps = sinkProps("console")
     assert(consoleProps.size() === 2)
+
+    val servletProps = sinkProps("servlet")
+    assert(servletProps.size() === 2)
   }
 }
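Every updated expectation in this suite falls out of the three defaults added in MetricsConfig: the wildcard servlet class applies to every instance, and master/applications each get a URI on top. Tallying them (a sanity check, not code from the commit):

    // random:  inherits only *.sink.servlet.class              -> size 1
    // master:  3 file properties + servlet class + servlet uri -> size 5
    // worker:  3 file properties + servlet class               -> size 4
    // master's sinks: console (from the file) + servlet        -> size 2
    val defaults = Seq(
      "*.sink.servlet.class"          -> "spark.metrics.sink.MetricsServlet",
      "master.sink.servlet.uri"       -> "/metrics/master",
      "applications.sink.servlet.uri" -> "/metrics/applications")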
@@ -1,12 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.metrics

 import java.util.Properties
 import java.io.{File, FileOutputStream}

 import org.scalatest.{BeforeAndAfter, FunSuite}

 import spark.metrics._

 class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _

@@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {

     assert(sources.length === 0)
     assert(sinks.length === 0)
+    assert(metricsSystem.metricsServlet != None)
   }

   test("MetricsSystem with sources add") {
@@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {

     assert(sources.length === 0)
     assert(sinks.length === 1)
+    assert(metricsSystem.metricsServlet != None)

     val source = new spark.deploy.master.MasterSource(null)
     metricsSystem.registerSource(source)
pom.xml (5 changes)
@@ -269,6 +269,11 @@
       <artifactId>metrics-jvm</artifactId>
       <version>3.0.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-json</artifactId>
+      <version>3.0.0</version>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
@@ -186,6 +186,7 @@ object SparkBuild extends Build {
   "org.apache.derby" % "derby" % "10.4.2.0" % "test",
   "com.codahale.metrics" % "metrics-core" % "3.0.0",
   "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+  "com.codahale.metrics" % "metrics-json" % "3.0.0",
   "com.twitter" % "chill_2.9.3" % "0.3.1",
   "com.twitter" % "chill-java" % "0.3.1"
 ) ++ (