From e57761f2c8fa6dce5c96bd0c82e5574eec3fe9d3 Mon Sep 17 00:00:00 2001
From: skaarthik
Date: Thu, 10 Dec 2015 11:25:42 -0800
Subject: [PATCH] adding dataframe perf suites in scala

---
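[Note, not part of the commit message: the new RunDF* suites load the CSV through
the spark-csv data source, so that package must be on the driver's classpath. A
minimal launch sketch, assuming the perf project builds into perf-benchmark.jar
(a placeholder name) and a spark-csv release contemporary with this Spark 1.x
codebase:

  spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 \
    --class com.microsoft.spark.csharp.PerfBenchmark \
    perf-benchmark.jar <runCount> <path-to-freebase-deletions-csv>

As the driver code below shows, args(0) is the number of runs per suite and
args(1) is the input data path.]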
 .../csharp/FreebaseDeletionsBenchmark.scala     | 69 ++++++++++++++++---
 .../spark/csharp/PerfBenchmark.scala            | 47 ++++++++-----
 2 files changed, 89 insertions(+), 27 deletions(-)

diff --git a/scala/perf/src/main/com/microsoft/spark/csharp/FreebaseDeletionsBenchmark.scala b/scala/perf/src/main/com/microsoft/spark/csharp/FreebaseDeletionsBenchmark.scala
index c12ea70..dc87423 100644
--- a/scala/perf/src/main/com/microsoft/spark/csharp/FreebaseDeletionsBenchmark.scala
+++ b/scala/perf/src/main/com/microsoft/spark/csharp/FreebaseDeletionsBenchmark.scala
@@ -4,31 +4,45 @@
 package com.microsoft.spark.csharp
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.streaming.Duration
 
 /**
- * Perf benchmark that users Freebase deletions data (available under CC0 license @ https://developers.google.com/freebase/data)
+ * Perf benchmark that uses Freebase deletions data.
+ * This data is licensed under the CC-BY license (http://creativecommons.org/licenses/by/2.5)
+ * and is available for download at https://developers.google.com/freebase/data.
+ * Data format - CSV, size - 8 GB uncompressed.
+ * Columns in the dataset are:
+ *   1. creation_timestamp (Unix epoch time in milliseconds)
+ *   2. creator
+ *   3. deletion_timestamp (Unix epoch time in milliseconds)
+ *   4. deletor
+ *   5. subject (MID)
+ *   6. predicate (MID)
+ *   7. object (MID/Literal)
+ *   8. language_code
  */
 object FreebaseDeletionsBenchmark {
 
   @PerfSuite
-  def RunRDDLineCount(args: Array[String], sc: SparkContext): Unit = {
-    val startTime = System.currentTimeMillis()
+  def RunRDDLineCount(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val startTime = System.currentTimeMillis
+
     val lines = sc.textFile(args(1))
     val count = lines.count
-    val elapsed = System.currentTimeMillis() - startTime
+    val elapsed = System.currentTimeMillis - startTime
     val elapsedDuration = new Duration(elapsed)
     val totalSeconds = elapsedDuration.milliseconds/1000
-
     PerfBenchmark.executionTimeList += totalSeconds
     println("Count of lines " + count + ". Time elapsed " + elapsedDuration)
   }
 
   @PerfSuite
-  def RunRDDMaxDeletionsByUser(args: Array[String], sc: SparkContext): Unit = {
-    val startTime = System.currentTimeMillis()
+  def RunRDDMaxDeletionsByUser(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val startTime = System.currentTimeMillis
+
     val lines = sc.textFile(args(1))
     val parsedRows = lines.map(s => {
       val columns = s.split(',')
@@ -51,15 +65,50 @@ object FreebaseDeletionsBenchmark {
         kvp2
     })
 
-    val elapsed = System.currentTimeMillis() - startTime
+    val elapsed = System.currentTimeMillis - startTime
     val elapsedDuration = new Duration(elapsed)
     val totalSeconds = elapsedDuration.milliseconds/1000
-
     PerfBenchmark.executionTimeList += totalSeconds
-
-    println(s"User with max deletions is " + userWithMaxDeletions._1 + ", count of deletions=" + userWithMaxDeletions._2 + s". Elapsed time=$elapsedDuration")
+    println("User with max deletions is " + userWithMaxDeletions._1 + ", count of deletions=" +
+      userWithMaxDeletions._2 + s". Elapsed time=$elapsedDuration")
   }
 
+  @PerfSuite
+  def RunDFLineCount(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val startTime = System.currentTimeMillis
+
+    val rows = sqlContext.read.format("com.databricks.spark.csv").load(args(1))
+    val rowCount = rows.count
+
+    val elapsed = System.currentTimeMillis - startTime
+    val elapsedDuration = new Duration(elapsed)
+    val totalSeconds = elapsedDuration.milliseconds/1000
+    PerfBenchmark.executionTimeList += totalSeconds
+
+    println(s"Count of rows $rowCount. Time elapsed $elapsedDuration")
+  }
+
+  @PerfSuite
+  def RunDFMaxDeletionsByUser(args: Array[String], sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val startTime = System.currentTimeMillis
+
+    val rows = sqlContext.read.format("com.databricks.spark.csv").load(args(1))
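+    // spark-csv assigns default column names C0..C7 when no header or schema is
+    // supplied, so C1 = creator and C3 = deletor; the filter below keeps rows
+    // where a user deleted a triple that the same user created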
runCount=$runCount") + perfSuiteMethod.invoke(freebaseDeletionsBenchmarkClass, args, sparkContext, sqlContext: SQLContext) runCount = runCount - 1 } val executionTimeListRef = scala.collection.mutable.ListBuffer.empty[Long] - for (v <- executionTimeList) - { - executionTimeListRef += v - } + for (v <- executionTimeList) { + executionTimeListRef += v + } perfResults += (perfSuiteName -> executionTimeListRef) } } @@ -53,25 +54,37 @@ object PerfBenchmark { def ReportResult(): Unit = { println("** Printing results of the perf run (scala) **") - for(result <- perfResults.keys) - { + for(result <- perfResults.keys) { val perfResult = perfResults(result) //multiple enumeration happening - ignoring that for now val min = perfResult.min val max = perfResult.max val runCount = perfResult.length val avg = perfResult.sum / runCount - + val median = getMedian(perfResult.toList) val values = new StringBuilder - for (value <- perfResult) - { + for (value <- perfResult) { values.append(value + ", ") } - println(s"** Execution time for $result in seconds. Min=$min, Max=$max, Average=$avg, Number of runs=$runCount, Individual values=$values **") + println(s"** Execution time for $result in seconds. Min=$min, Max=$max, Average=$avg, Median=$median, Number of runs=$runCount, Individual execution duration=[$values] **") } println("** *** **") } + + def getMedian(valuesList: List[Long]) = { + val itemCount = valuesList.length + val values = valuesList.toArray + java.util.Arrays.sort(values) + if (itemCount == 1) + values(0) + + if (itemCount%2 == 0) { + (values(itemCount/2) + values(itemCount/2 - 1))/2 + } + + values((itemCount-1)/2) + } }