Add Integration Tests with Docker

Run these tests with `sbt dockerComposeTest`. You can also start up
the Docker containers separately with `sbt dockerComposeRun`.

There is plenty of Spark weirdness in this patch; the suspected
Spark bugs are labeled as such.
Frank Bertsch 2017-08-30 09:48:29 -05:00
Parent: c9b7a77871
Commit: ca8de72394
8 changed files with 209 additions and 16 deletions
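Below is a minimal, illustrative sketch (not part of the patch) of how the Docker-gated tests are wired up: `build.sbt` excludes tests tagged `DockerComposeTag` from the plain `test` task via `Tests.Argument("-l", "DockerComposeTag")`, while `sbt dockerComposeTest` brings up the containers from `docker/docker-compose.yml` and runs the tagged tests against them. The spec and test names here are hypothetical.

```scala
// Illustrative ScalaTest sketch of the DockerComposeTag gating (names are hypothetical).
import org.scalatest.{FlatSpec, Matchers, Tag}

object DockerComposeTag extends Tag("DockerComposeTag")

class ExampleKafkaSpec extends FlatSpec with Matchers {

  // Runs under plain `sbt test`, since it carries no excluded tag.
  "an untagged test" should "run everywhere" in {
    (1 + 1) should be (2)
  }

  // Excluded from `sbt test` by `Tests.Argument("-l", "DockerComposeTag")` in build.sbt;
  // runs under `sbt dockerComposeTest` once the Kafka/Zookeeper containers are up.
  "a docker-backed test" should "only run with the containers up" taggedAs(DockerComposeTag) in {
    val kafkaBrokers = "localhost:9092" // matches the port mapping in docker/docker-compose.yml
    kafkaBrokers should not be empty
  }
}
```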

.travis.yml

@@ -1,6 +1,11 @@
sudo: required
language: scala
jdk:
- oraclejdk8
addons:
apt:
packages: # Required for docker build
- net-tools
# These directories are cached to S3 at the end of the build
cache:
directories:
@@ -10,9 +15,13 @@ before_cache:
# Tricks to avoid unnecessary cache updates
- find $HOME/.ivy2 -name "ivydata-*.properties" -delete
- find $HOME/.sbt -name "*.lock" -delete
services:
- docker
scala:
- 2.11.8
env:
- DOCKER_DIR="../docker/"
script:
- sbt ci
after_success:
- bash <(curl -s https://codecov.io/bash)
- bash <(curl -s https://codecov.io/bash)

build.sbt

@@ -15,22 +15,41 @@ organization := "com.mozilla"
scalaVersion in ThisBuild := "2.11.8"
val sparkVersion = "2.1.1"
val sparkVersion = "2.2.0"
lazy val root = (project in file(".")).
settings(
libraryDependencies += "com.mozilla.telemetry" %% "moztelemetry" % "1.0-SNAPSHOT",
libraryDependencies += "com.mozilla.telemetry" %% "spark-hyperloglog" % "2.0.0-SNAPSHOT",
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % "test",
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
libraryDependencies += "org.rogach" %% "scallop" % "1.0.2",
libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0",
libraryDependencies += "joda-time" % "joda-time" % "2.9.2"
libraryDependencies += "joda-time" % "joda-time" % "2.9.2",
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.0.1"
)
// Setup docker task
enablePlugins(DockerComposePlugin, DockerPlugin)
dockerImageCreationTask := docker.value
composeFile := sys.props.getOrElse("DOCKER_DIR", default = "docker/") + "docker-compose.yml"
variablesForSubstitutionTask := {
val dockerKafkaHost: String = "./docker_setup.sh" !!;
Map("DOCKER_KAFKA_HOST" -> dockerKafkaHost)
}
// Only run docker tasks on `sbt dockerComposeTest`
testOptions in Test += Tests.Argument("-l", "DockerComposeTag")
dockerfile in docker := {
new Dockerfile {
from("java")
}
}
// make run command include the provided dependencies
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
@@ -52,4 +71,4 @@ assemblyMergeStrategy in assembly := {
oldStrategy(x)
}
addCommandAlias("ci", ";clean ;compile ;scalastyle ;coverage ;test ;coverageReport")
addCommandAlias("ci", ";clean ;compile ;scalastyle ;coverage ;dockerComposeTest ;coverageReport")

docker/docker-compose.yml (new file)

@@ -0,0 +1,17 @@
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_KAFKA_HOST}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper

docker_setup.sh (new executable file)

@@ -0,0 +1,11 @@
rm -rf /tmp/checkpoint /tmp/parquet
EN=$(ifconfig en0 | grep "inet " | grep -oE '([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | head -n 1)
ETH=$(ifconfig eth0 | grep "inet " | grep -oE '([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | head -n 1)
if test -z "$EN"; then
DOCKER_KAFKA_HOST=$ETH
else
DOCKER_KAFKA_HOST=$EN
fi
echo $DOCKER_KAFKA_HOST

project/plugins.sbt

@@ -8,3 +8,6 @@ resolvers += "Artima Maven Repository" at "http://repo.artima.com/releases"
addSbtPlugin("com.artima.supersafe" % "sbtplugin" % "1.1.0")
addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.27")
addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.4.1")

ErrorAggregator.scala

@@ -21,9 +21,12 @@ import org.joda.time.DateTime
object ErrorAggregator {
val kafkaTopic = "telemetry"
val outputPrefix = "error_aggregates/v2"
val queryName = "error_aggregates"
private val allowedDocTypes = List("main", "crash")
private val allowedAppNames = List("Firefox")
private val outputPrefix = "error_aggregates/v2"
private val kafkaCacheMaxCapacity = 1000
// This is the number of files output per submission_date
@@ -311,7 +314,7 @@ object ErrorAggregator {
.option("failOnDataLoss", opts.failOnDataLoss())
.option("kafka.max.partition.fetch.bytes", 8 * 1024 * 1024) // 8MB
.option("spark.streaming.kafka.consumer.cache.maxCapacity", kafkaCacheMaxCapacity)
.option("subscribe", "telemetry")
.option("subscribe", kafkaTopic)
.option("startingOffsets", opts.startingOffsets())
.load()
@@ -319,6 +322,7 @@
aggregate(pings.select("value"), raiseOnError = opts.raiseOnError())
.writeStream
.queryName(queryName)
.format("parquet")
.option("path", s"${outputPath}/${outputPrefix}")
.option("checkpointLocation", opts.checkpointPath())
@@ -370,11 +374,11 @@ object ErrorAggregator {
val spark = SparkSession.builder()
.appName("Error Aggregates")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.getOrCreate()
spark.udf.register("HllCreate", hllCreate _)
opts.kafkaBroker.get match {
case Some(_) => writeStreamingAggregates(spark, opts)
case None => writeBatchAggregates(spark, opts)

TestUtils.scala

@@ -22,7 +22,7 @@ object TestUtils {
val today = new DateTime(testTimestampMillis)
val todayDays = new Duration(new DateTime(0), today).getStandardDays().toInt
def generateCrashMessages(size: Int, fieldsOverride: Option[Map[String, Any]]=None): Seq[Message] = {
def generateCrashMessages(size: Int, fieldsOverride: Option[Map[String, Any]]=None, timestamp: Option[Long]=None): Seq[Message] = {
val defaultMap = Map(
"clientId" -> "client1",
"docType" -> "crash",
@@ -80,11 +80,12 @@ object TestUtils {
| },
| "application": ${applicationJson}
|}""".stripMargin),
timestamp=testTimestampNano
timestamp=timestamp.getOrElse(testTimestampNano)
)
}
}
def generateMainMessages(size: Int, fieldsOverride: Option[Map[String, Any]]=None): Seq[Message] = {
def generateMainMessages(size: Int, fieldsOverride: Option[Map[String, Any]]=None, timestamp: Option[Long]=None): Seq[Message] = {
val defaultMap = Map(
"clientId" -> "client1",
"docType" -> "main",
@@ -182,7 +183,7 @@ object TestUtils {
| }
| }
|}""".stripMargin),
timestamp=testTimestampNano
timestamp=timestamp.getOrElse(testTimestampNano)
)
}
}

TestErrorAggregator.scala

@@ -4,22 +4,46 @@
package com.mozilla.telemetry.streaming
import java.sql.Timestamp
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import com.mozilla.spark.sql.hyperloglog.functions.{hllCreate, hllCardinality}
import com.mozilla.telemetry.streaming.TestUtils.todayDays
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.SparkSession
import org.joda.time.{DateTime, Duration}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.joda.time.{Duration, DateTime}
import org.json4s.DefaultFormats
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.{BeforeAndAfterAll, AsyncFlatSpec, Matchers, Tag}
class TestErrorAggregator extends FlatSpec with Matchers with BeforeAndAfterAll {
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.sys.process._
class TestErrorAggregator extends AsyncFlatSpec with Matchers with BeforeAndAfterAll {
object DockerComposeTag extends Tag("DockerComposeTag")
val zkHostInfo = "localhost:2181"
val kafkaTopicPartitions = 1
val kafkaBrokers = "localhost:9092"
implicit val formats = DefaultFormats
val k = TestUtils.scalarValue
val app = TestUtils.application
// 2016-04-07T02:01:56.000Z
val earlierTimestamp = 1459994516000000000L
// 2016-04-07T02:35:16.000Z
val laterTimestamp = 1459996516000000000L
val spark = SparkSession.builder()
.appName("Error Aggregates")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.master("local[1]")
.getOrCreate()
@@ -30,6 +54,27 @@ class TestErrorAggregator extends FlatSpec with Matchers with BeforeAndAfterAll
spark.stop()
}
def topicExists(zkUtils: ZkUtils, topic: String): Boolean = {
// taken from
// https://github.com/apache/spark/blob/master/external/kafka-0-10-sql +
// src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L350
return zkUtils.getAllTopics().contains(topic)
}
def createTopic(topic: String, numPartitions: Int) = {
val timeoutMs = 10000
val isSecureKafkaCluster = false
val replicationFactor = 1
val topicConfig = new Properties
val zkUtils = ZkUtils.apply(zkHostInfo, timeoutMs, timeoutMs, isSecureKafkaCluster)
if(!topicExists(zkUtils, topic)) {
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
}
zkUtils.close()
}
"The aggregator" should "sum metrics over a set of dimensions" in {
import spark.implicits._
val messages =
@@ -160,7 +205,7 @@ class TestErrorAggregator extends FlatSpec with Matchers with BeforeAndAfterAll
val messages = (crashMessage ++ mainMessage).map(_.toByteArray).seq
val df = ErrorAggregator.aggregate(spark.sqlContext.createDataset(messages).toDF, raiseOnError = true, online = false)
//one count for each experiment-branch, and one for null-null
// one count for each experiment-branch, and one for null-null
df.count() should be (3)
val inspectedFields = List(
@@ -264,7 +309,91 @@ class TestErrorAggregator extends FlatSpec with Matchers with BeforeAndAfterAll
val df = ErrorAggregator.aggregate(spark.sqlContext.createDataset(messages).toDF, raiseOnError = false, online = false)
df.where("application <> 'Firefox'").count() should be (0)
}
"the aggregator" should "correctly read from kafka" taggedAs(DockerComposeTag) in {
spark.sparkContext.setLogLevel("WARN")
val conf = new Properties()
conf.put("bootstrap.servers", kafkaBrokers)
conf.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
conf.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
val kafkaProducer = new KafkaProducer[String, Array[Byte]](conf)
createTopic(ErrorAggregator.kafkaTopic, kafkaTopicPartitions)
def send(rs: Seq[Array[Byte]]): Unit = {
rs.foreach{ v =>
val record = new ProducerRecord[String, Array[Byte]](ErrorAggregator.kafkaTopic, v)
kafkaProducer.send(record)
kafkaProducer.flush()
}
}
val earlier = (TestUtils.generateMainMessages(k, timestamp=Some(earlierTimestamp)) ++
TestUtils.generateCrashMessages(k, timestamp=Some(earlierTimestamp))).map(_.toByteArray)
val later = TestUtils.generateMainMessages(1, timestamp=Some(laterTimestamp)).map(_.toByteArray)
val expectedTotalMsgs = 2 * k
val listener = new StreamingQueryListener {
val DefaultWatermark = "1970-01-01T00:00:00.000Z"
var messagesSeen = 0L
var sentMessages = false
var watermarks: Set[String] = Set(DefaultWatermark)
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
messagesSeen += event.progress.numInputRows
if(!sentMessages){
send(earlier)
sentMessages = true
}
// If we only send this message once (i.e. set a flag that we've sent it), Spark will receive
// it and process the new rows (should be 3: 1 per experiment), and will update the eventTime["max"]
// to be this message's time -- but it will not update the watermark, and thus will not write
// the old rows (from earlier) to disk. You can follow this by reading the QueryProgress log events.
// If we send more than one, however, it eventually updates the value.
if(messagesSeen >= expectedTotalMsgs){
send(later)
}
val watermark = event.progress.eventTime.getOrDefault("watermark", DefaultWatermark)
watermarks = watermarks | Set(watermark)
// We're done when we've gone through 3 watermarks -- the default, the earlier, and the later
// when we're on the later watermark, the data from the earlier window is written to disk
if(watermarks.size == 3){
spark.streams.active.foreach(_.processAllAvailable)
spark.streams.active.foreach(_.stop)
}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
if(messagesSeen < expectedTotalMsgs){
println(s"Terminated Early: Expected $expectedTotalMsgs messages, saw $messagesSeen")
}
}
}
spark.streams.addListener(listener)
val outputPath = "/tmp/parquet"
val args = "--kafkaBroker" :: kafkaBrokers ::
"--outputPath" :: outputPath ::
"--startingOffsets" :: "latest" ::
"--raiseOnError" :: Nil
val mainRes: Future[Unit] = Future {
ErrorAggregator.main(args.toArray)
}
mainRes map {_ => assert(spark.read.parquet(s"$outputPath/${ErrorAggregator.outputPrefix}").count() == 3)}
}
"The resulting schema" should "not have fields belonging to the tempSchema" in {