Bug 1423340 - Make tests easily runnable from IDE

Arkadiusz Komarzewski 2018-03-15 16:22:03 +01:00 committed by Arkadiusz Komarzewski
Parent ce22403773
Commit b961999bc1
7 changed files with 75 additions and 70 deletions

View file

@@ -22,6 +22,6 @@ scala:
env:
- DOCKER_DIR="../docker/"
script:
- ./runtests.sh ci
- sbt ci
after_success:
- bash <(curl -s https://codecov.io/bash)

View file

@@ -24,3 +24,17 @@ the source code and running the tests via sbt. Some common invocations for sbt:
* `sbt dockerComposeTest # run the docker compose tests (slow)`
* `sbt "dockerComposeTest -tags:DockerComposeTag" # run only tests with DockerComposeTag (while using docker)`
* `sbt ci # run all tests`
Some tests need Kafka to run. If you prefer to run them from an IDE, you need to start the test cluster first:
```bash
sbt dockerComposeUp
```
or via plain docker-compose:
```bash
export DOCKER_KAFKA_HOST=$(./docker_setup.sh)
docker-compose -f docker/docker-compose.yml up
```
It's also good to shut down the cluster afterwards:
```bash
sbt dockerComposeStop
```
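Putting the above together, a typical IDE session could look like the sketch below. This is an assumption about the workflow rather than documented behaviour: it presumes the test JVM inherits `DOCKER_KAFKA_HOST` from the shell that launches the IDE.
```bash
# Hedged end-to-end sketch: assumes the IDE (and thus the test JVM) inherits
# DOCKER_KAFKA_HOST from the shell it is started from.
export DOCKER_KAFKA_HOST=$(./docker_setup.sh)  # address the Kafka container should be reachable on
sbt dockerComposeUp                            # start the Kafka test cluster
# ...launch the IDE from this shell and run the Kafka-dependent suites...
sbt dockerComposeStop                          # shut the cluster down when finished
```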

View file

@@ -1,11 +1,13 @@
rm -rf /tmp/checkpoint /tmp/parquet
EN=$(ifconfig en0 | grep "inet " | grep -oE '([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | head -n 1)
ETH=$(ifconfig eth0 | grep "inet " | grep -oE '([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)' | head -n 1)
INTERNAL_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
if test -z "$EN"; then
if test -n "$EN"; then
DOCKER_KAFKA_HOST=$EN
elif test -n "$ETH"; then
DOCKER_KAFKA_HOST=$ETH
else
DOCKER_KAFKA_HOST=$EN
DOCKER_KAFKA_HOST=$INTERNAL_IP
fi
echo $DOCKER_KAFKA_HOST
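For orientation, the newly added fallback in this script is the `ip route` lookup; a hedged sketch of that step in isolation (`ip route get` only consults the routing table, no packets are sent to 8.8.8.8):
```bash
# Resolve the local address that would be used to reach the public internet;
# this is the last-resort value for DOCKER_KAFKA_HOST when en0/eth0 are absent.
INTERNAL_IP=$(ip route get 8.8.8.8 | awk '{print $NF; exit}')
echo "$INTERNAL_IP"
```
Note also that the `rm -rf /tmp/checkpoint /tmp/parquet` line is dropped from this script; the equivalent cleanup appears to move into the tests' `afterAll` hook further down.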

View file

@@ -1,2 +0,0 @@
export AMPLITUDE_API_KEY="test-api-key"
sbt "$@"

View file

@@ -3,25 +3,20 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package com.mozilla.telemetry.streaming
import com.mozilla.telemetry.heka.Message
import com.mozilla.telemetry.pings._
import com.mozilla.telemetry.timeseries._
import com.mozilla.telemetry.streaming.sinks.HttpSink
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}
import org.json4s._
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.joda.time.{DateTime, Days, format}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import java.net.URLEncoder
import scala.io.Source
import com.github.fge.jsonschema.main.JsonSchemaFactory
import com.mozilla.telemetry.heka.Message
import com.mozilla.telemetry.pings._
import com.mozilla.telemetry.streaming.sinks.HttpSink
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.joda.time.{DateTime, Days, format}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.rogach.scallop.{ScallopConf, ScallopOption}
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import scala.io.Source
// TODO:
// - incorporate event schema - DEPENDS ON EVENT SCHEMA
@@ -68,7 +63,7 @@ object EventsToAmplitude {
val queryName = "EventsToAmplitude"
val writeMode = "error"
private class Opts(args: Array[String]) extends ScallopConf(args) {
private[streaming] class Opts(args: Array[String]) extends ScallopConf(args) {
val configFilePath: ScallopOption[String] = opt[String](
descr = "JSON file with the configuration",
required = true)
@@ -200,9 +195,8 @@ object EventsToAmplitude {
json.extract[Config]
}
def sendStreamingEvents(spark: SparkSession, opts: Opts): Unit = {
def sendStreamingEvents(spark: SparkSession, opts: Opts, apiKey: String): Unit = {
val config = readConfigFile(opts.configFilePath())
val apiKey = sys.env(AMPLITUDE_API_KEY_KEY)
val httpSink = new HttpSink(opts.url(), Map("api_key" -> apiKey))
val pings = spark
@@ -276,16 +270,21 @@ object EventsToAmplitude {
spark.stop()
}
def main(args: Array[String]): Unit = {
val opts = new Opts(args)
def process(opts: Opts, apiKey: String): Unit = {
val spark = SparkSession.builder()
.appName(queryName)
.getOrCreate()
opts.kafkaBroker.get match {
case Some(_) => sendStreamingEvents(spark, opts)
case Some(_) => sendStreamingEvents(spark, opts, apiKey)
case None => sendBatchEvents(spark, opts)
}
}
def main(args: Array[String]): Unit = {
val opts = new Opts(args)
val apiKey = sys.env(AMPLITUDE_API_KEY_KEY)
process(opts, apiKey)
}
}

View file

@@ -3,22 +3,18 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package com.mozilla.telemetry.streaming
import java.io.File
import java.sql.Timestamp
import com.mozilla.spark.sql.hyperloglog.functions.{hllCreate, hllCardinality}
import com.mozilla.spark.sql.hyperloglog.functions.{hllCardinality, hllCreate}
import com.mozilla.telemetry.streaming.TestUtils.todayDays
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.joda.time.{Duration, DateTime}
import org.json4s.DefaultFormats
import org.scalatest.{FlatSpec, Matchers, Tag}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers, Tag}
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.sys.process._
class TestErrorAggregator extends FlatSpec with Matchers {
class TestErrorAggregator extends FlatSpec with Matchers with BeforeAndAfterAll {
object DockerErrorAggregatorTag extends Tag("DockerErrorAggregatorTag")
@@ -41,6 +37,15 @@ class TestErrorAggregator extends FlatSpec with Matchers {
spark.udf.register("HllCreate", hllCreate _)
spark.udf.register("HllCardinality", hllCardinality _)
val streamingOutputPath = "/tmp/parquet"
val streamingCheckpointPath = "/tmp/checkpoint"
override protected def afterAll(): Unit = {
FileUtils.deleteDirectory(new File(streamingOutputPath))
FileUtils.deleteDirectory(new File(streamingCheckpointPath))
}
"The aggregator" should "sum metrics over a set of dimensions" in {
import spark.implicits._
val messages =
@@ -389,15 +394,15 @@ class TestErrorAggregator extends FlatSpec with Matchers {
spark.streams.addListener(listener)
val outputPath = "/tmp/parquet"
val args = "--kafkaBroker" :: Kafka.kafkaBrokers ::
"--outputPath" :: outputPath ::
"--outputPath" :: streamingOutputPath ::
"--checkpointPath" :: streamingCheckpointPath ::
"--startingOffsets" :: "latest" ::
"--raiseOnError" :: Nil
ErrorAggregator.main(args.toArray)
assert(spark.read.parquet(s"$outputPath/${ErrorAggregator.outputPrefix}").count() == 3)
assert(spark.read.parquet(s"$streamingOutputPath/${ErrorAggregator.outputPrefix}").count() == 3)
kafkaProducer.close
spark.streams.removeListener(listener)

View file

@@ -3,36 +3,23 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package com.mozilla.telemetry.streaming
import com.mozilla.telemetry.pings.FocusEventPing
import org.apache.spark.sql.SparkSession
import org.json4s.DefaultFormats
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers, Tag}
import java.net.{URLDecoder, URLEncoder}
import org.apache.spark.sql.streaming.StreamingQueryListener
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.client.WireMock._
import com.github.tomakehurst.wiremock.core.WireMockConfiguration._
import com.github.tomakehurst.wiremock.matching.{EqualToJsonPattern, MatchResult, ValueMatcher}
import com.github.tomakehurst.wiremock.http.Request
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scalaj.http.Http
import com.github.tomakehurst.wiremock.matching.{EqualToJsonPattern, MatchResult, ValueMatcher}
import com.mozilla.telemetry.pings.FocusEventPing
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.json4s.jackson.JsonMethods._
import org.json4s.{DefaultFormats, _}
import org.scalatest._
import scala.collection.JavaConversions._
import scala.io.Source
import java.util.function.Consumer
import java.net.{URLDecoder, URLEncoder}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
class TestEventsToAmplitude extends FlatSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
object DockerEventsTag extends Tag("DockerEventsTag")
@@ -128,7 +115,6 @@ class TestEventsToAmplitude extends FlatSpec with Matchers with BeforeAndAfterAl
}
"Events to Amplitude" should "send events via HTTP request" taggedAs(Kafka.DockerComposeTag, DockerEventsTag) in {
import spark.implicits._
Kafka.createTopic(EventsToAmplitude.kafkaTopic)
val kafkaProducer = Kafka.makeProducer(EventsToAmplitude.kafkaTopic)
@@ -165,14 +151,15 @@ class TestEventsToAmplitude extends FlatSpec with Matchers with BeforeAndAfterAl
spark.streams.addListener(listener)
val args =
"--kafka-broker" :: Kafka.kafkaBrokers ::
"--starting-offsets" :: "latest" ::
"--url" :: s"http://$Host:$Port$path" ::
"--config-file-path" :: configFilePath ::
"--raise-on-error" :: Nil
val args = Array(
"--kafka-broker", Kafka.kafkaBrokers,
"--starting-offsets", "latest",
"--url", s"http://$Host:$Port$path",
"--config-file-path", configFilePath,
"--raise-on-error")
val opts = new EventsToAmplitude.Opts(args)
EventsToAmplitude.main(args.toArray)
EventsToAmplitude.process(opts, apiKey)
kafkaProducer.close
spark.streams.removeListener(listener)