Merge branch 'master' into wip-merge-master

Conflicts:
	bagel/pom.xml
	core/pom.xml
	core/src/test/scala/org/apache/spark/ui/UISuite.scala
	examples/pom.xml
	mllib/pom.xml
	pom.xml
	project/SparkBuild.scala
	repl/pom.xml
	streaming/pom.xml
	tools/pom.xml

In Scala 2.10 a shorter representation is used for naming artifacts, so the artifact names were changed to use the short Scala version, which is now exposed as a property (scala-short.version) in the POM.
Prashant Sharma 2013-10-08 11:29:40 +05:30
Parents: 3e41495288 ea34c52102
Commit: 7be75682b9
28 changed files with 314 additions and 115 deletions

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-assembly</artifactId>
<artifactId>spark-assembly_${scala-short.version}</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.incubator.apache.org/</url>
@@ -41,27 +41,27 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib</artifactId>
<artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl</artifactId>
<artifactId>spark-repl_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming</artifactId>
<artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -104,13 +104,13 @@
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
</transformer>
</transformers>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
@@ -128,7 +128,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn</artifactId>
<artifactId>spark-yarn_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
<url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
@@ -39,7 +39,6 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
@@ -199,14 +198,14 @@
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<property name="spark.classpath" refid="maven.test.classpath"/>
<property environment="env"/>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME"/>
<isset property="env.SCALA_LIBRARY_PATH"/>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>

@@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD
sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[String]
/** Keys of RDD splits that are being computed/loaded. */
private val loading = new HashSet[String]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
case Some(cachedValues) =>
// Partition is in cache, so just return its values
logInfo("Found partition in cache!")
return cachedValues.asInstanceOf[Iterator[T]]
case Some(values) =>
// Partition is already materialized, so just return its values
return values.asInstanceOf[Iterator[T]]
case None =>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo("Loading contains " + key + ", waiting...")
logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>}
}
logInfo("Loading no longer contains " + key + ", so returning cached result")
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
@@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return values.asInstanceOf[Iterator[T]]
case None =>
logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key)
}
} else {
@@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
logInfo("Computing partition " + split)
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
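
The logic above amounts to a per-key claim/wait protocol on the loading set. A minimal sketch of that pattern (names are assumed; the re-check of the block manager after waking is elided):

import scala.collection.mutable.HashSet

// Minimal sketch: the first thread to claim a key computes it; later threads
// block on the set's monitor until notifyAll() wakes them to re-check.
object LoadingCoordinator {
  private val loading = new HashSet[String]()

  def withClaim[T](key: String)(compute: => T): T = {
    loading.synchronized {
      while (loading.contains(key)) loading.wait() // another thread owns the key
      loading.add(key)                             // claim it ourselves
    }
    try compute
    finally loading.synchronized {
      loading.remove(key)
      loading.notifyAll() // wake waiters so they can re-check the cache
    }
  }
}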

@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -85,9 +86,11 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val environment: Map[String, String] = Map(),
// This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
// This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
// This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
scala.collection.immutable.Map())
extends Logging {
// Ensure logging is initialized before we spawn any threads
@@ -240,7 +243,8 @@ class SparkContext(
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -339,6 +343,8 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkEnv.get.hadoop.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
@@ -349,10 +355,27 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
) : RDD[(K, V)] = {
val conf = new JobConf(hadoopConfiguration)
FileInputFormat.setInputPaths(conf, path)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
* Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
* that has already been broadcast, assuming that it's safe to use it to construct a
* HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be reused).
*/
def hadoopFile[K, V](
path: String,
confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
): RDD[(K, V)] = {
new HadoopFileRDD(
this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
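
A hypothetical caller of the new overload (the paths collection, input format, and sc.hadoopConfiguration accessor here are assumptions, not part of the patch) would broadcast the configuration once and reuse it for every per-path RDD:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SerializableWritable

// Ship the ~10 KB Configuration to executors once, then build one
// HadoopFileRDD per input path from the same broadcast variable.
val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
val rdds = paths.map { path =>
  sc.hadoopFile(path, confBroadcast, classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
}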
/**

@@ -17,12 +17,13 @@
package org.apache.spark.api.python
import org.apache.spark.Partitioner
import java.util.Arrays
import org.apache.spark.Partitioner
import org.apache.spark.util.Utils
/**
* A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
* A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API.
*
* Stores the unique id() of the Python-side partitioning function so that it is incorporated into
* equality comparisons. Correctness requires that the id is a unique identifier for the
@@ -30,6 +31,7 @@ import org.apache.spark.util.Utils
* function). This can be ensured by using the Python id() function and maintaining a reference
* to the Python partitioning function so that its id() is not reused.
*/
private[spark] class PythonPartitioner(
override val numPartitions: Int,
val pyPartitionFunctionId: Long)
@@ -37,7 +39,9 @@ private[spark] class PythonPartitioner(
override def getPartition(key: Any): Int = key match {
case null => 0
case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
// we don't trust the Python partition function to return valid partition ID's so
// let's do a modulo numPartitions in any case
case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
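
All three branches rely on Utils.nonNegativeMod so that negative hashes still map to a valid partition index; a sketch of its assumed behavior:

// Assumed behavior of Utils.nonNegativeMod: Scala's % can return negative
// values, which would be out of range as a partition ID.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

nonNegativeMod(-3, 4) // 1, whereas -3 % 4 == -3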

@@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg)
* This is used by PySpark's shuffle operations.
*/
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Array[Byte], Array[Byte])](prev) {
RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override def compute(split: Partition, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (a, b)
case Seq(a, b) => (Utils.deserializeLongValue(a), b)
case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
}
val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
}
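
The grouped(2) call works because the Python side writes each record as two consecutive byte strings, key then value (see the pack_long/dump_pickle yields in rdd.py below); an assumed illustration of the regrouping:

// Illustration only: a flat stream k1, v1, k2, v2, ... regrouped into pairs.
val flat = Iterator("k1", "v1", "k2", "v2").map(_.getBytes)
val pairs = flat.grouped(2).map { case Seq(k, v) => (k, v) }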
private[spark] object PythonRDD {

@@ -41,6 +41,7 @@ private[spark] object JsonProtocol {
("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
("name" -> obj.desc.name) ~
("appuiurl" -> obj.appUiUrl) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerSlave) ~
@@ -64,7 +65,7 @@
}
def writeMasterState(obj: MasterStateResponse) = {
("url" -> ("spark://" + obj.uri)) ~
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~

@@ -16,6 +16,9 @@
*/
package org.apache.spark.deploy
import com.google.common.collect.MapMaker
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
*/
class SparkHadoopUtil {
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Return an appropriate (subclass) of Configuration. Creating a config can initialize some Hadoop
// subsystems
def newConfiguration(): Configuration = new Configuration()
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
// Add any user credentials to the job conf which are necessary for running on a secure Hadoop
// cluster
def addCredentials(conf: JobConf) {}
def isYarnMode(): Boolean = { false }

@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import java.io.EOFException
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
/**
* An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
* system, or S3).
* This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
* across multiple reads; the 'path' is the only variable that is different across new JobConfs
* created from the Configuration.
*/
class HadoopFileRDD[K, V](
sc: SparkContext,
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a new JobConf, set the input file/directory paths to read from, and cache the
// JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
// HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
// getJobConf() calls for this RDD in the local process.
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
}
}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
* An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
* system, or S3, tables in HBase, etc).
* A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
class HadoopRDD[K, V](
sc: SparkContext,
@transient conf: JobConf,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
def this(
sc: SparkContext,
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
inputFormatClass,
keyClass,
valueClass,
minSplits)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
return conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
}
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
}
// Once an InputFormat for this RDD is created, cache it so that only one reflection call is
// done in each local process.
val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
if (newInputFormat.isInstanceOf[Configurable]) {
newInputFormat.asInstanceOf[Configurable].setConf(conf)
}
HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
return newInputFormat
}
override def getPartitions: Array[Partition] = {
val env = SparkEnv.get
env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
val inputSplits = inputFormat.getSplits(conf, minSplits)
val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
array
}
def createInputFormat(conf: JobConf): InputFormat[K, V] = {
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
.asInstanceOf[InputFormat[K, V]]
}
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
if (fmt.isInstanceOf[Configurable]) {
fmt.asInstanceOf[Configurable].setConf(conf)
}
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
def getConf: Configuration = confBroadcast.value.value
def getConf: Configuration = getJobConf()
}
private[spark] object HadoopRDD {
/**
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}
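
These helpers sit on top of the soft-values hadoopJobMetadata map added to SparkHadoopUtil above. A minimal sketch of the caching pattern (the getOrPut helper is illustrative, not part of the patch):

import com.google.common.collect.MapMaker

// Soft values let the JVM reclaim cached JobConfs/InputFormats under memory
// pressure rather than pinning ~10 KB objects for the life of the process.
val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

def getOrPut(key: String)(create: => Any): Any = {
  val cached = hadoopJobMetadata.get(key)
  if (cached != null) cached
  else { val created = create; hadoopJobMetadata.put(key, created); created }
}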

@@ -523,7 +523,17 @@ private[spark] class BlockManager(
* Get a block from the block manager (either local or remote).
*/
def get(blockId: String): Option[Iterator[Any]] = {
getLocal(blockId).orElse(getRemote(blockId))
val local = getLocal(blockId)
if (local.isDefined) {
logInfo("Found block %s locally".format(blockId))
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo("Found block %s remotely".format(blockId))
return remote
}
None
}
/**

@@ -77,6 +77,19 @@ private[spark] object Utils extends Logging {
return ois.readObject.asInstanceOf[T]
}
/** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
def deserializeLongValue(bytes: Array[Byte]) : Long = {
// Note: we assume that we are given a Long value encoded in network (big-endian) byte order
var result = bytes(7) & 0xFFL
result = result + ((bytes(6) & 0xFFL) << 8)
result = result + ((bytes(5) & 0xFFL) << 16)
result = result + ((bytes(4) & 0xFFL) << 24)
result = result + ((bytes(3) & 0xFFL) << 32)
result = result + ((bytes(2) & 0xFFL) << 40)
result = result + ((bytes(1) & 0xFFL) << 48)
result + ((bytes(0) & 0xFFL) << 56)
}
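
Since the encoding is big-endian, the hand-rolled shifts should agree with ByteBuffer's default byte order; a hedged sanity check, reusing the constant from the new unit test below:

import java.nio.ByteBuffer

// ByteBuffer defaults to big-endian, so getLong should reproduce the
// shift-and-or reconstruction above.
val bytes = ByteBuffer.allocate(8).putLong(9730889947L).array()
assert(Utils.deserializeLongValue(bytes) == ByteBuffer.wrap(bytes).getLong)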
/** Serialize via nested stream using specific serializer */
def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
val osWrapper = ser.serializeStream(new OutputStream {

@@ -29,7 +29,9 @@ import org.apache.spark.SparkContext._
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("local metrics") {
// TODO: This test has a race condition since the DAGScheduler now reports results
// asynchronously. It needs to be updated for that patch.
ignore("local metrics") {
sc = new SparkContext("local[4]", "test")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
@@ -43,6 +45,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
d.count
Thread.sleep(1000)
listener.stageInfos.size should be (1)
val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -54,6 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
d4.collectAsMap
Thread.sleep(1000)
listener.stageInfos.size should be (4)
listener.stageInfos.foreach {stageInfo =>
//small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms

@@ -25,6 +25,13 @@ import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 4040
val server = new Server(startPort)
Try { server.start() } match {
case Success(s) =>
case Failure(e) =>
// In either case the port is busy, so the setup for this test is complete
}
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
// Allow some wiggle room in case ports on the machine are under contention

@@ -20,6 +20,7 @@ package org.apache.spark.util
import com.google.common.base.Charsets
import com.google.common.io.Files
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
import java.nio.{ByteBuffer, ByteOrder}
import org.scalatest.FunSuite
import org.apache.commons.io.FileUtils
import scala.util.Random
@@ -135,5 +136,15 @@ class UtilsSuite extends FunSuite {
FileUtils.deleteDirectory(tmpDir2)
}
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)
assert(bbuf.hasArray)
bbuf.order(ByteOrder.BIG_ENDIAN)
bbuf.putLong(testval)
assert(bbuf.array.length === 8)
assert(Utils.deserializeLongValue(bbuf.array) === testval)
}
}

@@ -6,7 +6,7 @@
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>{{ page.title }} - Spark {{site.SPARK_VERSION}} Documentation</title>
<title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
<meta name="description" content="">
<link rel="stylesheet" href="css/bootstrap.min.css">
@@ -109,7 +109,7 @@
</ul>
</li>
</ul>
<!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION}}</span></p>-->
<!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION_SHORT}}</span></p>-->
</div>
</div>
</div>

@@ -70,7 +70,7 @@ def parse_args():
"slaves across multiple (an additional $0.01/Gb for bandwidth" +
"between zones applies)")
parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
parser.add_option("-v", "--spark-version", default="0.7.3",
parser.add_option("-v", "--spark-version", default="0.8.0",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
default="https://github.com/mesos/spark",
@@ -155,7 +155,7 @@ def is_active(instance):
# Return correct versions of Spark and Shark, given the supplied Spark version
def get_spark_shark_version(opts):
spark_shark_map = {"0.7.3": "0.7.0"}
spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map:
print >> stderr, "Don't know about Spark version: %s" % version

@@ -26,33 +26,41 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-examples</artifactId>
<artifactId>spark-examples_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Examples</name>
<url>http://spark.incubator.apache.org/</url>
<repositories>
<!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
<repository>
<id>lib</id>
<url>file://${project.basedir}/lib</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming</artifactId>
<artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib</artifactId>
<artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
@@ -71,6 +79,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka</artifactId>
<version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@@ -161,7 +175,7 @@
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib</artifactId>
<artifactId>spark-mllib_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project ML Library</name>
<url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -48,12 +48,12 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<artifactId>scalatest_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.10</artifactId>
<artifactId>scalacheck_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>

@@ -40,6 +40,7 @@
<connection>scm:git:git@github.com:apache/incubator-spark.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection>
<url>scm:git:git@github.com:apache/incubator-spark.git</url>
<tag>HEAD</tag>
</scm>
<developers>
<developer>
@@ -598,7 +599,7 @@
<junitxml>.</junitxml>
<filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
<argLine>-Xms64m -Xmx3g</argLine>
<stderr/>
<stderr />
</configuration>
<executions>
<execution>

@@ -153,6 +153,7 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
@@ -179,6 +180,7 @@ object SparkBuild extends Build {
val slf4jVersion = "1.7.2"
val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject")
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeAsm = ExclusionRule(organization = "asm")
@@ -202,7 +204,6 @@ object SparkBuild extends Build {
"commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" %% "akka-remote" % "2.2.1" excludeAll(excludeNetty),
"com.typesafe.akka" %% "akka-slf4j" % "2.2.1" excludeAll(excludeNetty),
"net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
@@ -220,7 +221,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)
)
)
def rootSettings = sharedSettings ++ Seq(
@@ -250,6 +251,7 @@ object SparkBuild extends Build {
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
excludeAll(excludeSnappy)
excludeAll(excludeCglib)
)
) ++ assemblySettings ++ extraAssemblySettings
@@ -289,10 +291,10 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
)
)

@@ -29,7 +29,7 @@ from threading import Thread
from pyspark import cloudpickle
from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
read_from_pickle_file
read_from_pickle_file, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
for (k, v) in iterator:
buckets[partitionFunc(k) % numPartitions].append((k, v))
for (split, items) in buckets.iteritems():
yield str(split)
yield pack_long(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
map_func = lambda (key, vals): [(key, val) for val in vals[0]]
return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
def subtract(self, other, numPartitions=None):

@@ -67,6 +67,10 @@ def write_long(value, stream):
stream.write(struct.pack("!q", value))
def pack_long(value):
return struct.pack("!q", value)
def read_int(stream):
length = stream.read(4)
if length == "":

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl-bin</artifactId>
<artifactId>spark-repl-bin_${scala-short.version}</artifactId>
<packaging>pom</packaging>
<name>Spark Project REPL binary packaging</name>
<url>http://spark.incubator.apache.org/</url>
@@ -40,18 +40,18 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl</artifactId>
<artifactId>spark-repl_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
@@ -89,7 +89,7 @@
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl</artifactId>
<artifactId>spark-repl_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project REPL</name>
<url>http://spark.incubator.apache.org/</url>
@@ -39,18 +39,18 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib</artifactId>
<artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
@@ -103,14 +103,14 @@
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<property name="spark.classpath" refid="maven.test.classpath"/>
<property environment="env"/>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME"/>
<isset property="env.SCALA_LIBRARY_PATH"/>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>

@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming</artifactId>
<artifactId>spark-streaming_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Streaming</name>
<url>http://spark.incubator.apache.org/</url>
@@ -42,7 +42,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -58,6 +58,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka</artifactId>
<version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>

@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tools</artifactId>
<artifactId>spark-tools_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Tools</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,12 +33,12 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming</artifactId>
<artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn</artifactId>
<artifactId>spark-yarn_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -97,7 +97,7 @@
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>