This commit is contained in:
skaarthik 2015-12-08 16:03:58 -08:00
Parent 8ec8ba52d1
Commit 207b892f21
4 changed files: 285 additions and 0 deletions

134
scala/perf/pom.xml Normal file
View file

@ -0,0 +1,134 @@
<!-- Maven build for the Scala perf benchmark suite of SparkCLR (Spark 1.4.1 / Scala 2.10). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.spark</groupId>
<artifactId>spark-clr-perf</artifactId>
<!-- NOTE(review): the artifact version forward-references the spark.version property
     declared below in <properties>; Maven resolves this, but a literal version is more
     conventional — confirm before changing. -->
<version>${spark.version}-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>Scala Perf suite for SparkCLR</description>
<inceptionYear>2015</inceptionYear>
<licenses>
<license>
<name>MIT License</name>
<url>https://github.com/Microsoft/SparkCLR/blob/master/LICENSE</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<!-- NOTE(review): Java source/target 1.5 is very dated for a 2015 Spark project —
     presumably inherited from a template; confirm whether 1.7+ is intended. -->
<maven.compiler.source>1.5</maven.compiler.source>
<maven.compiler.target>1.5</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.4</scala.version>
<spark.version>1.4.1</spark.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
<dependencies>
<!-- Scala toolchain artifacts, all pinned to scala.version. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark dependencies; note the artifactIds hard-code the _2.10 suffix instead of
     using ${scala.binary.version} declared above. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<!--the following is placeholder for building uber package. Please keep as-is-->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main</sourceDirectory>
<plugins>
<!-- NOTE(review): org.scala-tools:maven-scala-plugin is unversioned here and has been
     superseded by net.alchim31.maven:scala-maven-plugin; pinning a version would make
     builds reproducible — TODO confirm before switching. -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<!-- disable surefire -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- Scalastyle runs on the 'check' goal but is configured to never fail the build. -->
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.7.0</version>
<configuration>
<verbose>false</verbose>
<failOnViolation>false</failOnViolation>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>scalastyle-config.xml</configLocation>
<outputFile>${basedir}/target/scalastyle-output.xml</outputFile>
<inputEncoding>${encoding}</inputEncoding>
<outputEncoding>${encoding}</outputEncoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.spark.csharp
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Duration
/**
* Perf benchmark that users Freebase deletions data (available under CC0 license @ https://developers.google.com/freebase/data)
*/
object FreebaseDeletionsBenchmark {

  /**
   * Records the time elapsed since `startMillis` (in whole seconds) into the
   * shared PerfBenchmark.executionTimeList and returns the Duration for logging.
   * Extracted because both benchmark methods duplicated this bookkeeping.
   */
  private def recordElapsed(startMillis: Long): Duration = {
    val elapsedDuration = new Duration(System.currentTimeMillis() - startMillis)
    PerfBenchmark.executionTimeList += elapsedDuration.milliseconds / 1000
    elapsedDuration
  }

  /**
   * Benchmark: counts the lines of the input file given in args(1).
   * Discovered reflectively by the driver via the "Run" name prefix.
   */
  @PerfSuite
  def RunRDDLineCount(args: Array[String], sc: SparkContext): Unit = {
    val startTime = System.currentTimeMillis()
    val lines = sc.textFile(args(1))
    val count = lines.count
    val elapsedDuration = recordElapsed(startTime)
    println(s"Count of lines $count. Time elapsed $elapsedDuration")
  }

  /**
   * Benchmark: finds the user with the most self-deletions (rows where the
   * value in column 1 equals the value in column 3) in the Freebase deletions
   * CSV given in args(1).
   */
  @PerfSuite
  def RunRDDMaxDeletionsByUser(args: Array[String], sc: SparkContext): Unit = {
    val startTime = System.currentTimeMillis()
    val lines = sc.textFile(args(1))
    // Data has some bad records - keep the map total by flagging corrupt rows
    // with a boolean instead of throwing.
    val parsedRows = lines.map { row =>
      val columns = row.split(',')
      // NOTE(review): only indices 0..3 are read, which needs length >= 4; the
      // stricter > 4 check also discards rows with exactly 4 fields — confirm
      // against the Freebase deletions schema before relaxing.
      if (columns.length > 4)
        (true, columns(0), columns(1), columns(2), columns(3))
      else
        (false, "X", "X", "X", "X") // invalid row placeholder
    }
    val validRows = parsedRows.filter(_._1) // select good rows
    // Select deletions made by the same user that created the entry.
    val selfDeletions = validRows.filter(r => r._3.equals(r._5))
    val deletionsPerUser = selfDeletions.map(r => (r._3, 1)).reduceByKey(_ + _)
    // fold is safe here: "max by count" is associative and commutative, and the
    // zero element can never win against a real (user, count) pair with count >= 1.
    val zeroValue = ("zerovalue", 0)
    val userWithMaxDeletions = deletionsPerUser.fold(zeroValue) { (a, b) =>
      if (a._2 > b._2) a else b
    }
    val elapsedDuration = recordElapsed(startTime)
    // Fixed: original mixed a redundant s-interpolator with + concatenation.
    println(s"User with max deletions is ${userWithMaxDeletions._1}, count of deletions=${userWithMaxDeletions._2}. Elapsed time=$elapsedDuration")
  }
}

View file

@ -0,0 +1,77 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.spark.csharp
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ListBuffer
/**
* Spark driver implementation in scala used for SparkCLR perf benchmarking
*/
object PerfBenchmark {
  // Suite name -> per-run execution times (seconds).
  val perfResults = collection.mutable.Map[String, ListBuffer[Long]]()
  // Shared scratch buffer the benchmark methods append each run's time to;
  // cleared before every suite and snapshotted into perfResults afterwards.
  val executionTimeList = scala.collection.mutable.ListBuffer.empty[Long]

  /**
   * Entry point. args(0) = number of runs per suite; the remaining args are
   * passed through to each benchmark method (args(1) is the input path).
   */
  def main(args: Array[String]): Unit = {
    // Fixed: removed the unused `val PerfSuite = Class.forName(...)` local.
    val sparkConf = new SparkConf().setAppName("SparkCLR perf suite - scala")
    val sparkContext = new SparkContext(sparkConf)
    RunPerfSuites(args, sparkContext, "com.microsoft.spark.csharp.FreebaseDeletionsBenchmark")
    sparkContext.stop
    ReportResult()
  }

  /**
   * Reflectively runs every "Run"-prefixed method of `className` args(0) times,
   * collecting the per-run timings recorded in executionTimeList.
   */
  def RunPerfSuites(args: Array[String], sparkContext: SparkContext, className: String): Unit = {
    val benchmarkClass = Class.forName(className)
    for (perfSuiteMethod <- benchmarkClass.getDeclaredMethods) {
      val perfSuiteName = perfSuiteMethod.getName
      if (perfSuiteName.startsWith("Run")) { // TODO - select by @PerfSuite annotation instead of name prefix
        executionTimeList.clear
        var runCount = args(0).toInt
        while (runCount > 0) {
          // Scala objects expose static forwarders on the companion class, so
          // the receiver passed to invoke is ignored here.
          perfSuiteMethod.invoke(benchmarkClass, args, sparkContext)
          runCount -= 1
        }
        // Snapshot the shared buffer (it is cleared for the next suite);
        // clone() replaces the original element-by-element copy loop.
        perfResults += (perfSuiteName -> executionTimeList.clone())
      }
    }
  }

  /** Prints min/max/average and the individual timings for each suite. */
  def ReportResult(): Unit = {
    println("** Printing results of the perf run (scala) **")
    for ((suiteName, times) <- perfResults) {
      val min = times.min
      val max = times.max
      val runCount = times.length
      val avg = times.sum / runCount // integer division, as in the original
      // Trailing ", " kept to preserve the original report format.
      val values = times.mkString("", ", ", ", ")
      println(s"** Execution time for $suiteName in seconds. Min=$min, Max=$max, Average=$avg, Number of runs=$runCount, Individual values=$values **")
    }
    println("** *** **")
  }
}

View file

@ -0,0 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package com.microsoft.spark.csharp;
/**
* Annotation used for marking a perf suite
*/
public @interface PerfSuite {}