adding dataframe perf suites in scala

This commit is contained in:
skaarthik 2015-12-10 11:25:42 -08:00
Родитель ab37ceff91
Коммит e57761f2c8
2 изменённых файлов: 89 добавлений и 27 удалений

Просмотреть файл

@ -4,31 +4,45 @@
package com.microsoft.spark.csharp
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
/**
* Perf benchmark that users Freebase deletions data (available under CC0 license @ https://developers.google.com/freebase/data)
* Perf benchmark that users Freebase deletions data
* This data is licensed under CC-BY license (http://creativecommons.org/licenses/by/2.5)
* Data is available for download at https://developers.google.com/freebase/data)
* Data format - CSV, size - 8 GB uncompressed
* Columns in the dataset are
* 1. creation_timestamp (Unix epoch time in milliseconds)
* 2. creator
* 3. deletion_timestamp (Unix epoch time in milliseconds)
* 4. deletor
* 5. subject (MID)
* 6. predicate (MID)
* 7. object (MID/Literal)
* 8. language_code
*/
object FreebaseDeletionsBenchmark {
@PerfSuite
def RunRDDLineCount(args: Array[String], sc: SparkContext): Unit = {
val startTime = System.currentTimeMillis()
def RunRDDLineCount(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
val startTime = System.currentTimeMillis
val lines = sc.textFile(args(1))
val count = lines.count
val elapsed = System.currentTimeMillis() - startTime
val elapsed = System.currentTimeMillis - startTime
val elapsedDuration = new Duration(elapsed)
val totalSeconds = elapsedDuration.milliseconds/1000
PerfBenchmark.executionTimeList += totalSeconds
println("Count of lines " + count + ". Time elapsed " + elapsedDuration)
}
@PerfSuite
def RunRDDMaxDeletionsByUser(args: Array[String], sc: SparkContext): Unit = {
val startTime = System.currentTimeMillis()
def RunRDDMaxDeletionsByUser(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
val startTime = System.currentTimeMillis
val lines = sc.textFile(args(1))
val parsedRows = lines.map(s => {
val columns = s.split(',')
@ -51,15 +65,50 @@ object FreebaseDeletionsBenchmark {
kvp2
})
val elapsed = System.currentTimeMillis() - startTime
val elapsed = System.currentTimeMillis - startTime
val elapsedDuration = new Duration(elapsed)
val totalSeconds = elapsedDuration.milliseconds/1000
PerfBenchmark.executionTimeList += totalSeconds
println(s"User with max deletions is " + userWithMaxDeletions._1 + ", count of deletions=" + userWithMaxDeletions._2 + s". Elapsed time=$elapsedDuration")
println(s"User with max deletions is " + userWithMaxDeletions._1 + ", count of deletions="
+ userWithMaxDeletions._2 + s". Elapsed time=$elapsedDuration")
}
@PerfSuite
def RunDFLineCount(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
val startTime = System.currentTimeMillis
val rows = sqlContext.read.format("com.databricks.spark.csv").load(args(1))
val rowCount = rows.count
val elapsed = System.currentTimeMillis - startTime
val elapsedDuration = new Duration(elapsed)
val totalSeconds = elapsedDuration.milliseconds/1000
PerfBenchmark.executionTimeList += totalSeconds
println(s"Count of rows $rowCount. Time elapsed $elapsedDuration")
}
@PerfSuite
def RunDFMaxDeletionsByUser(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
val startTime = System.currentTimeMillis
val rows = sqlContext.read.format("com.databricks.spark.csv").load(args(1))
val filtered = rows.filter("C1 = C3")
val aggregated = filtered.groupBy("C1").agg(("C1", "count"))
aggregated.registerTempTable("freebasedeletions")
val max = sqlContext.sql("select max(`COUNT(C1)`) from freebasedeletions")
val maxArray = max.collect
val maxValue = maxArray(0)
val maxDeletions = sqlContext.sql("select * from freebasedeletions where `COUNT(C1)` = " + maxValue.get(0))
maxDeletions.show
//TODO - add perf suite for subquery
val elapsed = System.currentTimeMillis - startTime
val elapsedDuration = new Duration(elapsed)
val totalSeconds = elapsedDuration.milliseconds/1000
PerfBenchmark.executionTimeList += totalSeconds
println(s"User with max deletions & count of deletions is listed above. Time elapsed $elapsedDuration")
}
}

Просмотреть файл

@ -3,8 +3,10 @@
package com.microsoft.spark.csharp
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ListBuffer
import scala.util.Sorting
/**
* Spark driver implementation in scala used for SparkCLR perf benchmarking
@ -18,31 +20,30 @@ object PerfBenchmark {
val PerfSuite = Class.forName("com.microsoft.spark.csharp.PerfSuite")
val sparkConf = new SparkConf().setAppName("SparkCLR perf suite - scala")
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)
RunPerfSuites(args, sparkContext, "com.microsoft.spark.csharp.FreebaseDeletionsBenchmark")
RunPerfSuites(args, sparkContext, sqlContext, "com.microsoft.spark.csharp.FreebaseDeletionsBenchmark")
sparkContext.stop
ReportResult()
}
def RunPerfSuites(args: Array[String], sparkContext: SparkContext, className: String): Unit = {
def RunPerfSuites(args: Array[String], sparkContext: SparkContext, sqlContext: SQLContext, className: String): Unit = {
val freebaseDeletionsBenchmarkClass = Class.forName(className)
val perfSuites = freebaseDeletionsBenchmarkClass.getDeclaredMethods
for ( perfSuiteMethod <- perfSuites)
{
for ( perfSuiteMethod <- perfSuites) {
val perfSuiteName = perfSuiteMethod.getName
if (perfSuiteName.startsWith("Run")) //TODO - use annotation type
{
if (perfSuiteName.startsWith("Run")) { //TODO - use annotation type
executionTimeList.clear
var runCount = args(0).toInt
while (runCount > 0) {
perfSuiteMethod.invoke(freebaseDeletionsBenchmarkClass, args, sparkContext)
println(s"Starting perf suite $perfSuiteName, runCount=$runCount")
perfSuiteMethod.invoke(freebaseDeletionsBenchmarkClass, args, sparkContext, sqlContext: SQLContext)
runCount = runCount - 1
}
val executionTimeListRef = scala.collection.mutable.ListBuffer.empty[Long]
for (v <- executionTimeList)
{
for (v <- executionTimeList) {
executionTimeListRef += v
}
perfResults += (perfSuiteName -> executionTimeListRef)
@ -53,25 +54,37 @@ object PerfBenchmark {
def ReportResult(): Unit = {
println("** Printing results of the perf run (scala) **")
for(result <- perfResults.keys)
{
for(result <- perfResults.keys) {
val perfResult = perfResults(result)
//multiple enumeration happening - ignoring that for now
val min = perfResult.min
val max = perfResult.max
val runCount = perfResult.length
val avg = perfResult.sum / runCount
val median = getMedian(perfResult.toList)
val values = new StringBuilder
for (value <- perfResult)
{
for (value <- perfResult) {
values.append(value + ", ")
}
println(s"** Execution time for $result in seconds. Min=$min, Max=$max, Average=$avg, Number of runs=$runCount, Individual values=$values **")
println(s"** Execution time for $result in seconds. Min=$min, Max=$max, Average=$avg, Median=$median, Number of runs=$runCount, Individual execution duration=[$values] **")
}
println("** *** **")
}
def getMedian(valuesList: List[Long]) = {
val itemCount = valuesList.length
val values = valuesList.toArray
java.util.Arrays.sort(values)
if (itemCount == 1)
values(0)
if (itemCount%2 == 0) {
(values(itemCount/2) + values(itemCount/2 - 1))/2
}
values((itemCount-1)/2)
}
}