Merge remote-tracking branch 'upstream/dev' into dev

Mosharaf Chowdhury committed 2012-10-02 22:17:17 -07:00
Parents: 288e1c99ab 87f4451f20
Commit: ff813e4380
125 changed files with 755 additions and 439 deletions

View file

@ -12,10 +12,8 @@ This README file only contains basic setup instructions.
## Building
Spark requires Scala 2.9.1. This version has been tested with 2.9.1.final.
The project is built using Simple Build Tool (SBT), which is packaged with it.
To build Spark and its example programs, run:
Spark requires Scala 2.9.2. The project is built using Simple Build Tool (SBT),
which is packaged with it. To build Spark and its example programs, run:
sbt/sbt compile

View file

@ -2,7 +2,7 @@ package spark
import scala.collection.mutable.HashMap
class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
}

View file

@ -11,8 +11,7 @@ import spark.storage.BlockManagerId
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager

View file

@ -9,7 +9,7 @@ import java.util.LinkedHashMap
* some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
* when most of the space is used by arrays of primitives or of simple classes.
*/
class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
def this() {
@ -104,9 +104,9 @@ class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
}
// An entry in our map; stores a cached object and its size in bytes
case class Entry(value: Any, size: Long)
private[spark] case class Entry(value: Any, size: Long)
object BoundedMemoryCache {
private[spark] object BoundedMemoryCache {
/**
* Get maximum cache capacity from system configuration
*/

View file

@ -2,9 +2,9 @@ package spark
import java.util.concurrent.atomic.AtomicInteger
sealed trait CachePutResponse
case class CachePutSuccess(size: Long) extends CachePutResponse
case class CachePutFailure() extends CachePutResponse
private[spark] sealed trait CachePutResponse
private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
private[spark] case class CachePutFailure() extends CachePutResponse
/**
* An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
@ -22,7 +22,7 @@ case class CachePutFailure() extends CachePutResponse
* This abstract class handles the creation of key spaces, so that subclasses need only deal with
* keys that are unique across modules.
*/
abstract class Cache {
private[spark] abstract class Cache {
private val nextKeySpaceId = new AtomicInteger(0)
private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
@ -52,7 +52,7 @@ abstract class Cache {
/**
* A key namespace in a Cache.
*/
class KeySpace(cache: Cache, val keySpaceId: Int) {
private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
def get(datasetId: Any, partition: Int): Any =
cache.get((keySpaceId, datasetId), partition)
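
Side note on the key-space design above: the compound key is what lets each module hand out its own dataset ids without clashing with other modules. A toy sketch of the idea (hypothetical names, not Spark's Cache implementation):

import scala.collection.mutable.HashMap

// Toy illustration only: two "modules" cache a value for the same datasetId,
// yet their entries never collide because each owns a distinct keySpaceId,
// mirroring the ((keySpaceId, datasetId), partition) key used by KeySpace above.
object KeySpaceSketch {
  val store = new HashMap[(Any, Int), Any]()  // ((keySpaceId, datasetId), partition) -> value

  def put(keySpaceId: Int, datasetId: Any, partition: Int, value: Any) {
    store(((keySpaceId, datasetId), partition)) = value
  }

  def main(args: Array[String]) {
    put(0, "dataset-7", 0, "RDD partition bytes")   // e.g. the RDD cache's key space
    put(1, "dataset-7", 0, "shuffle output bytes")  // a second module's key space
    println(store(((0, "dataset-7"), 0)))           // prints: RDD partition bytes
    println(store(((1, "dataset-7"), 0)))           // prints: shuffle output bytes
  }
}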

View file

@ -15,19 +15,20 @@ import scala.collection.mutable.HashSet
import spark.storage.BlockManager
import spark.storage.StorageLevel
sealed trait CacheTrackerMessage
case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
case class MemoryCacheLost(host: String) extends CacheTrackerMessage
case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
case object GetCacheStatus extends CacheTrackerMessage
case object GetCacheLocations extends CacheTrackerMessage
case object StopCacheTracker extends CacheTrackerMessage
private[spark] sealed trait CacheTrackerMessage
class CacheTrackerActor extends Actor with Logging {
private[spark] case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
private[spark] case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
extends CacheTrackerMessage
private[spark] case class MemoryCacheLost(host: String) extends CacheTrackerMessage
private[spark] case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
private[spark] case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
private[spark] case object GetCacheStatus extends CacheTrackerMessage
private[spark] case object GetCacheLocations extends CacheTrackerMessage
private[spark] case object StopCacheTracker extends CacheTrackerMessage
private[spark] class CacheTrackerActor extends Actor with Logging {
// TODO: Should probably store (String, CacheType) tuples
private val locs = new HashMap[Int, Array[List[String]]]
@ -89,7 +90,7 @@ class CacheTrackerActor extends Actor with Logging {
}
}
class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
extends Logging {
// Tracker actor on the master, or remote reference to it on workers

View file

@ -1,5 +1,6 @@
package spark
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index: Int = idx
}

View file

@ -9,7 +9,7 @@ import org.objectweb.asm.{ClassReader, MethodVisitor, Type}
import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
object ClosureCleaner extends Logging {
private[spark] object ClosureCleaner extends Logging {
// Get an ASM class reader for a given class from the JAR that loaded it
private def getClassReader(cls: Class[_]): ClassReader = {
new ClassReader(cls.getResourceAsStream(
@ -154,7 +154,7 @@ object ClosureCleaner extends Logging {
}
}
class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
return new EmptyVisitor {
@ -180,7 +180,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor
}
}
class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor {
var myName: String = null
override def visit(version: Int, access: Int, name: String, sig: String,

View file

@ -6,16 +6,17 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}
class CoGroupAggregator
private[spark] class CoGroupAggregator
extends Aggregator[Any, Any, ArrayBuffer[Any]](
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },

View file

@ -2,7 +2,7 @@ package spark
import spark.storage.BlockManagerId
class FetchFailedException(
private[spark] class FetchFailedException(
val bmAddress: BlockManagerId,
val shuffleId: Int,
val mapId: Int,

View file

@ -18,7 +18,7 @@ import org.apache.hadoop.util.ReflectionUtils
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
private[spark] class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
extends Split
with Serializable {

View file

@ -5,7 +5,7 @@ import java.net.URL
import scala.collection.mutable.HashMap
import org.apache.hadoop.fs.FileUtil
class HttpFileServer extends Logging {
private[spark] class HttpFileServer extends Logging {
var baseDir : File = null
var fileDir : File = null

View file

@ -12,14 +12,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool
/**
* Exception type thrown by HttpServer when it is in the wrong state for an operation.
*/
class ServerStateException(message: String) extends Exception(message)
private[spark] class ServerStateException(message: String) extends Exception(message)
/**
* An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
class HttpServer(resourceBase: File) extends Logging {
private[spark] class HttpServer(resourceBase: File) extends Logging {
private var server: Server = null
private var port: Int = -1

View file

@ -5,14 +5,14 @@ import java.nio.ByteBuffer
import spark.util.ByteBufferInputStream
class JavaSerializationStream(out: OutputStream) extends SerializationStream {
private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
def flush() { objOut.flush() }
def close() { objOut.close() }
}
class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
extends DeserializationStream {
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
@ -23,7 +23,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}
class JavaSerializerInstance extends SerializerInstance {
private[spark] class JavaSerializerInstance extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@ -57,6 +57,6 @@ class JavaSerializerInstance extends SerializerInstance {
}
}
class JavaSerializer extends Serializer {
private[spark] class JavaSerializer extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}

View file

@ -20,7 +20,7 @@ import spark.storage._
* Zig-zag encoder used to write object sizes to serialization streams.
* Based on Kryo's integer encoder.
*/
object ZigZag {
private[spark] object ZigZag {
def writeInt(n: Int, out: OutputStream) {
var value = n
if ((value & ~0x7F) == 0) {
@ -68,6 +68,7 @@ object ZigZag {
}
}
private[spark]
class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
extends SerializationStream {
val channel = Channels.newChannel(out)
@ -85,6 +86,7 @@ extends SerializationStream {
def close() { out.close() }
}
private[spark]
class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
extends DeserializationStream {
def readObject[T](): T = {
@ -95,7 +97,7 @@ extends DeserializationStream {
def close() { in.close() }
}
class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
val kryo = ks.kryo
val threadBuffer = ks.threadBuffer.get()
val objectBuffer = ks.objectBuffer.get()
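
For context on the ZigZag object made private[spark] at the top of this file: it writes sizes as variable-length integers so that small values cost a single byte. A standalone sketch of zig-zag plus 7-bit varint encoding (independent of this commit; the committed encoder may differ in details):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// Standalone sketch, illustrative only. Zig-zag maps small signed ints to small
// unsigned ones (-1 -> 1, 1 -> 2, -2 -> 3, ...) so the varint stays short.
object ZigZagSketch {
  def writeInt(n: Int, out: OutputStream) {
    var value = (n << 1) ^ (n >> 31)      // zig-zag encode
    while ((value & ~0x7F) != 0) {        // emit 7 bits per byte, high bit = "more"
      out.write((value & 0x7F) | 0x80)
      value >>>= 7
    }
    out.write(value)
  }

  def readInt(in: InputStream): Int = {
    var value = 0
    var shift = 0
    var b = in.read()
    while ((b & 0x80) != 0) {
      value |= (b & 0x7F) << shift
      shift += 7
      b = in.read()
    }
    value |= b << shift
    (value >>> 1) ^ -(value & 1)          // undo the zig-zag mapping
  }

  def main(args: Array[String]) {
    val bos = new ByteArrayOutputStream()
    Seq(0, -1, 1, 300, -300).foreach(n => writeInt(n, bos))
    val bis = new ByteArrayInputStream(bos.toByteArray)
    println((1 to 5).map(_ => readInt(bis)))  // Vector(0, -1, 1, 300, -300)
  }
}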

View file

@ -16,11 +16,11 @@ import scala.collection.mutable.HashSet
import spark.storage.BlockManagerId
sealed trait MapOutputTrackerMessage
case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
case GetMapOutputLocations(shuffleId: Int) =>
logInfo("Asked to get map output locations for shuffle " + shuffleId)
@ -33,7 +33,7 @@ class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Loggin
}
}
class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
val ip: String = System.getProperty("spark.master.host", "localhost")
val port: Int = System.getProperty("spark.master.port", "7077").toInt
val actorName: String = "MapOutputTracker"

View file

@ -13,6 +13,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {

View file

@ -459,6 +459,6 @@ class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]
}
}
object Manifests {
private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
}

View file

@ -3,7 +3,7 @@ package spark
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
class ParallelCollectionSplit[T: ClassManifest](
private[spark] class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long,
val slice: Int,
values: Seq[T])

View file

@ -4,6 +4,7 @@ import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index
}

View file

@ -12,14 +12,14 @@ import spark.util.ByteBufferInputStream
* A serializer. Because some serialization libraries are not thread safe, this class is used to
* create SerializerInstances that do the actual serialization.
*/
trait Serializer {
private[spark] trait Serializer {
def newInstance(): SerializerInstance
}
/**
* An instance of the serializer, for use by one thread at a time.
*/
trait SerializerInstance {
private[spark] trait SerializerInstance {
def serialize[T](t: T): ByteBuffer
def deserialize[T](bytes: ByteBuffer): T
@ -50,7 +50,7 @@ trait SerializerInstance {
/**
* A stream for writing serialized objects.
*/
trait SerializationStream {
private[spark] trait SerializationStream {
def writeObject[T](t: T): SerializationStream
def flush(): Unit
def close(): Unit
@ -66,7 +66,7 @@ trait SerializationStream {
/**
* A stream for reading serialized objects.
*/
trait DeserializationStream {
private[spark] trait DeserializationStream {
def readObject[T](): T
def close(): Unit

View file

@ -1,6 +1,6 @@
package spark
abstract class ShuffleFetcher {
private[spark] abstract class ShuffleFetcher {
// Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly
// once on each key-value pair obtained.
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit)

View file

@ -4,7 +4,7 @@ import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
class ShuffledRDDSplit(val idx: Int) extends Split {
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
}

View file

@ -22,7 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
* Based on the following JavaWorld article:
* http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
*/
object SizeEstimator extends Logging {
private[spark] object SizeEstimator extends Logging {
// Sizes of primitive types
private val BYTE_SIZE = 1

View file

@ -5,7 +5,7 @@ import com.google.common.collect.MapMaker
/**
* An implementation of Cache that uses soft references.
*/
class SoftReferenceCache extends Cache {
private[spark] class SoftReferenceCache extends Cache {
val map = new MapMaker().softValues().makeMap[Any, Any]()
override def get(datasetId: Any, partition: Int): Any =

View file

@ -55,7 +55,7 @@ class SparkContext(
val sparkHome: String,
val jars: Seq[String])
extends Logging {
def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
// Ensure logging is initialized before we spawn any threads
@ -78,30 +78,30 @@ class SparkContext(
true,
isLocal)
SparkEnv.set(env)
// Used to store a URL for each static file/jar together with the file's local timestamp
val addedFiles = HashMap[String, Long]()
val addedJars = HashMap[String, Long]()
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[([0-9]+),([0-9]+),([0-9]+)]""".r
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
master match {
case "local" =>
case "local" =>
new LocalScheduler(1, 0, this)
case LOCAL_N_REGEX(threads) =>
case LOCAL_N_REGEX(threads) =>
new LocalScheduler(threads.toInt, 0, this)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
@ -112,10 +112,21 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
val sparkMemEnv = System.getenv("SPARK_MEM")
val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
if (sparkMemEnvInt > memoryPerSlaveInt) {
throw new SparkException(
"Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
memoryPerSlaveInt, sparkMemEnvInt))
}
val scheduler = new ClusterScheduler(this)
val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
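
Worth noting for the regex changes above: "local[N, maxRetries]" and "local-cluster[N, cores, memory]" now tolerate spaces around the commas, and the local-cluster path checks SPARK_MEM against the requested per-slave memory before starting. A small sketch of which scheduler each master string would select (hypothetical helper that copies the patterns rather than calling into SparkContext; the final Mesos fall-through is assumed from the elided part of this match):

// Hypothetical helper, for illustration only.
object MasterFormatSketch {
  val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  val SPARK_REGEX = """(spark://.*)""".r

  def describe(master: String): String = master match {
    case "local" => "LocalScheduler, 1 thread"
    case LOCAL_N_REGEX(threads) => "LocalScheduler, " + threads + " threads"
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      "LocalScheduler, " + threads + " threads, " + maxFailures + " failures allowed per task"
    case LOCAL_CLUSTER_REGEX(slaves, cores, mem) =>
      "LocalSparkCluster: " + slaves + " slaves with " + cores + " cores and " + mem + " MB each"
    case SPARK_REGEX(url) => "SparkDeploySchedulerBackend against " + url
    case _ => "assumed Mesos master URL (fall-through not shown in this hunk)"
  }

  def main(args: Array[String]) {
    Seq("local", "local[4]", "local[4, 2]", "local-cluster[2, 1, 512]", "spark://host:7077")
      .foreach(m => println(m + " -> " + describe(m)))
  }
}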
@ -140,13 +151,13 @@ class SparkContext(
taskScheduler.start()
private var dagScheduler = new DAGScheduler(taskScheduler)
// Methods for creating RDDs
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
new ParallelCollection[T](this, seq, numSlices)
}
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = {
parallelize(seq, numSlices)
}
@ -187,14 +198,14 @@ class SparkContext(
}
/**
* Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
: RDD[(K, V)] = {
hadoopFile(path,
fm.erasure.asInstanceOf[Class[F]],
fm.erasure.asInstanceOf[Class[F]],
km.erasure.asInstanceOf[Class[K]],
vm.erasure.asInstanceOf[Class[V]],
minSplits)
@ -215,7 +226,7 @@ class SparkContext(
new Configuration)
}
/**
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@ -231,7 +242,7 @@ class SparkContext(
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
/**
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
@ -257,14 +268,14 @@ class SparkContext(
sequenceFile(path, keyClass, valueClass, defaultMinSplits)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
* Version of sequenceFile() for types implicitly convertible to Writables through a
* WritableConverter.
*
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support
* both subclasses of Writable and types for which we define a converter (e.g. Int to
* both subclasses of Writable and types for which we define a converter (e.g. Int to
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
* have a parameterized singleton object). We use functions instead to create a new converter
* have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
@ -289,7 +300,7 @@ class SparkContext(
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T: ClassManifest](
path: String,
path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
@ -318,7 +329,7 @@ class SparkContext(
/**
* Create an accumulator from a "mutable collection" type.
*
*
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
@ -329,7 +340,7 @@ class SparkContext(
// Keep around a weak hash map of values to Cached versions?
def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal)
// Adds a file dependency to all Tasks executed in the future.
def addFile(path: String) {
val uri = new URI(path)
@ -338,11 +349,11 @@ class SparkContext(
case _ => path
}
addedFiles(key) = System.currentTimeMillis
// Fetch the file locally in case the task is executed locally
val filename = new File(path.split("/").last)
Utils.fetchFile(path, new File("."))
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
@ -350,7 +361,7 @@ class SparkContext(
addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedFiles.clear()
}
// Adds a jar dependency to all Tasks executed in the future.
def addJar(path: String) {
val uri = new URI(path)
@ -366,7 +377,7 @@ class SparkContext(
addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedJars.clear()
}
// Stop the SparkContext
def stop() {
dagScheduler.stop()
@ -400,7 +411,7 @@ class SparkContext(
/**
* Run a function on a given set of partitions in an RDD and return the results. This is the main
* entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
* whether the scheduler can run the computation on the master rather than shipping it out to the
* whether the scheduler can run the computation on the master rather than shipping it out to the
* cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
@ -419,13 +430,13 @@ class SparkContext(
def runJob[T, U: ClassManifest](
rdd: RDD[T],
func: Iterator[T] => U,
func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@ -472,7 +483,7 @@ class SparkContext(
private[spark] def newShuffleId(): Int = {
nextShuffleId.getAndIncrement()
}
private var nextRddId = new AtomicInteger(0)
// Register a new RDD, returning its RDD ID
@ -500,7 +511,7 @@ object SparkContext {
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
@ -521,7 +532,7 @@ object SparkContext {
implicit def longToLongWritable(l: Long) = new LongWritable(l)
implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
@ -532,7 +543,7 @@ object SparkContext {
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
@ -576,7 +587,7 @@ object SparkContext {
Nil
}
}
// Find the JAR that contains the class of a particular object
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
}
@ -589,7 +600,7 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
class WritableConverter[T](
private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable

View file

@ -7,10 +7,16 @@ import spark.storage.BlockManagerId
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
sealed trait TaskEndReason
private[spark] sealed trait TaskEndReason
case object Success extends TaskEndReason
private[spark] case object Success extends TaskEndReason
private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
private[spark]
case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
case class ExceptionFailure(exception: Throwable) extends TaskEndReason
case class OtherFailure(message: String) extends TaskEndReason
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason

View file

@ -2,7 +2,7 @@ package spark
import org.apache.mesos.Protos.{TaskState => MesosTaskState}
object TaskState
private[spark] object TaskState
extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

View file

@ -2,7 +2,7 @@ package spark
import scala.collection.mutable.ArrayBuffer
class UnionSplit[T: ClassManifest](
private[spark] class UnionSplit[T: ClassManifest](
idx: Int,
rdd: RDD[T],
split: Split)

View file

@ -7,7 +7,7 @@ import scala.runtime.AbstractFunction1
* apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
* isn't marked to allow that).
*/
abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
@throws(classOf[Exception])
def call(t: T): R

View file

@ -7,7 +7,7 @@ import scala.runtime.AbstractFunction2
* apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
* isn't marked to allow that).
*/
abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
@throws(classOf[Exception])
def call(t1: T1, t2: T2): R

View file

@ -11,7 +11,7 @@ import scala.math
import spark._
import spark.storage.StorageLevel
class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id)
with Logging
with Serializable {
@ -1027,7 +1027,7 @@ class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Lo
}
}
class BitTorrentBroadcastFactory
private[spark] class BitTorrentBroadcastFactory
extends BroadcastFactory {
def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }

View file

@ -14,6 +14,7 @@ abstract class Broadcast[T](id: Long) extends Serializable {
override def toString = "spark.Broadcast(" + id + ")"
}
private[spark]
class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
private var initialized = false

View file

@ -6,7 +6,7 @@ package spark.broadcast
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
trait BroadcastFactory {
private[spark] trait BroadcastFactory {
def initialize(isMaster: Boolean): Unit
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit

View file

@ -12,7 +12,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.storage.StorageLevel
class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@ -46,7 +46,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
}
class HttpBroadcastFactory extends BroadcastFactory {
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =

View file

@ -382,10 +382,10 @@ extends Logging {
}
}
case class BroadcastBlock(blockID: Int, byteArray: Array[Byte])
private[spark] case class BroadcastBlock(blockID: Int, byteArray: Array[Byte])
extends Serializable
case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock],
private[spark] case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock],
totalBlocks: Int,
totalBytes: Int)
extends Serializable {

View file

@ -7,7 +7,7 @@ import spark._
/**
* Used to keep and pass around information of peers involved in a broadcast
*/
case class SourceInfo (hostAddress: String,
private[spark] case class SourceInfo (hostAddress: String,
listenPort: Int,
totalBlocks: Int = SourceInfo.UnusedParam,
totalBytes: Int = SourceInfo.UnusedParam)
@ -26,7 +26,7 @@ extends Comparable[SourceInfo] with Logging {
/**
* Helper Object of SourceInfo for its constants
*/
object SourceInfo {
private[spark] object SourceInfo {
// Constants for special values of listenPort
val TxNotStartedRetry = -1
val TxOverGoToDefault = 0

View file

@ -10,7 +10,7 @@ import scala.math
import spark._
import spark.storage.StorageLevel
class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@ -572,7 +572,7 @@ extends Broadcast[T](id) with Logging with Serializable {
}
}
class TreeBroadcastFactory
private[spark] class TreeBroadcastFactory
extends BroadcastFactory {
def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) }

View file

@ -2,7 +2,7 @@ package spark.deploy
import scala.collection.Map
case class Command(
private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
environment: Map[String, String]) {

View file

@ -7,13 +7,15 @@ import scala.collection.immutable.List
import scala.collection.mutable.HashMap
sealed trait DeployMessage extends Serializable
private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
private[spark]
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
extends DeployMessage
private[spark]
case class ExecutorStateChanged(
jobId: String,
execId: Int,
@ -23,11 +25,11 @@ case class ExecutorStateChanged(
// Master to Worker
case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
private[spark] case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
case class LaunchExecutor(
private[spark] case class LaunchExecutor(
jobId: String,
execId: Int,
jobDesc: JobDescription,
@ -38,33 +40,42 @@ case class LaunchExecutor(
// Client to Master
case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
// Master to Client
private[spark]
case class RegisteredJob(jobId: String) extends DeployMessage
private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark]
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
private[spark]
case class JobKilled(message: String)
// Internal message in Client
case object StopClient
private[spark] case object StopClient
// MasterWebUI To Master
case object RequestMasterState
private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark]
case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
completedJobs: List[JobInfo])
// WorkerWebUI to Worker
case object RequestWorkerState
private[spark] case object RequestWorkerState
// Worker to WorkerWebUI
private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
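
Since the deploy messages above are now private[spark], any example has to live inside the package. A hypothetical sketch of the worker-registration round trip they describe (not part of this commit; the real handling is in the Master actor, and the web UI URL below is made up):

package spark.deploy

// Hypothetical sketch only. RegisterWorker flows Worker -> Master; the Master
// answers with RegisteredWorker or RegisterWorkerFailed, as modeled above.
object RegistrationSketch {
  def replyToWorker(msg: DeployMessage, duplicateId: Boolean): DeployMessage = msg match {
    case RegisterWorker(id, host, port, cores, memory, webUiPort) =>
      if (duplicateId) {
        RegisterWorkerFailed("Duplicate worker ID: " + id)
      } else {
        println("Registering " + id + " at " + host + ":" + port +
          " (" + cores + " cores, " + memory + " MB, web UI on " + webUiPort + ")")
        RegisteredWorker("http://master-host:8080")  // hypothetical master web UI URL
      }
    case other =>
      RegisterWorkerFailed("Unexpected message: " + other)
  }

  def main(args: Array[String]) {
    val request = RegisterWorker("worker-1", "node1", 7078, 4, 4096, 8081)
    println(replyToWorker(request, duplicateId = false))
  }
}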

View file

@ -1,6 +1,6 @@
package spark.deploy
object ExecutorState
private[spark] object ExecutorState
extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value

View file

@ -1,6 +1,6 @@
package spark.deploy
class JobDescription(
private[spark] class JobDescription(
val name: String,
val cores: Int,
val memoryPerSlave: Int,

View file

@ -9,6 +9,7 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
private[spark]
class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
val localIpAddress = Utils.localIpAddress

View file

@ -16,7 +16,7 @@ import akka.dispatch.Await
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
* and a listener for job events, and calls back the listener when various events occur.
*/
class Client(
private[spark] class Client(
actorSystem: ActorSystem,
masterUrl: String,
jobDescription: JobDescription,

View file

@ -7,7 +7,7 @@ package spark.deploy.client
*
* Users of this API should *not* block inside the callback methods.
*/
trait ClientListener {
private[spark] trait ClientListener {
def connected(jobId: String): Unit
def disconnected(): Unit

View file

@ -4,7 +4,7 @@ import spark.util.AkkaUtils
import spark.{Logging, Utils}
import spark.deploy.{Command, JobDescription}
object TestClient {
private[spark] object TestClient {
class TestListener extends ClientListener with Logging {
def connected(id: String) {

View file

@ -1,6 +1,6 @@
package spark.deploy.client
object TestExecutor {
private[spark] object TestExecutor {
def main(args: Array[String]) {
println("Hello world!")
while (true) {

View file

@ -2,7 +2,7 @@ package spark.deploy.master
import spark.deploy.ExecutorState
class ExecutorInfo(
private[spark] class ExecutorInfo(
val id: Int,
val job: JobInfo,
val worker: WorkerInfo,

View file

@ -5,6 +5,7 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
private[spark]
class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]

View file

@ -1,6 +1,6 @@
package spark.deploy.master
object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
private[spark] object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value

View file

@ -14,7 +14,7 @@ import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
var nextJobNumber = 0
@ -212,7 +212,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
}
object Master {
private[spark] object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)

View file

@ -6,7 +6,7 @@ import spark.Utils
/**
* Command-line parser for the master.
*/
class MasterArguments(args: Array[String]) {
private[spark] class MasterArguments(args: Array[String]) {
var ip = Utils.localIpAddress()
var port = 7077
var webUiPort = 8080

View file

@ -10,6 +10,7 @@ import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy._
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"

View file

@ -3,7 +3,7 @@ package spark.deploy.master
import akka.actor.ActorRef
import scala.collection.mutable
class WorkerInfo(
private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,

View file

@ -13,7 +13,7 @@ import spark.deploy.ExecutorStateChanged
/**
* Manages the execution of one executor process.
*/
class ExecutorRunner(
private[spark] class ExecutorRunner(
val jobId: String,
val execId: Int,
val jobDesc: JobDescription,

View file

@ -16,7 +16,7 @@ import spark.deploy.RegisterWorkerFailed
import akka.actor.Terminated
import java.io.File
class Worker(
private[spark] class Worker(
ip: String,
port: Int,
webUiPort: Int,
@ -170,7 +170,7 @@ class Worker(
}
}
object Worker {
private[spark] object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)

View file

@ -8,7 +8,7 @@ import java.lang.management.ManagementFactory
/**
* Command-line parser for the master.
*/
class WorkerArguments(args: Array[String]) {
private[spark] class WorkerArguments(args: Array[String]) {
var ip = Utils.localIpAddress()
var port = 0
var webUiPort = 8081

View file

@ -9,6 +9,7 @@ import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"

View file

@ -16,7 +16,7 @@ import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
*/
class Executor extends Logging {
private[spark] class Executor extends Logging {
var urlClassLoader : ExecutorURLClassLoader = null
var threadPool: ExecutorService = null
var env: SparkEnv = null

View file

@ -6,6 +6,6 @@ import spark.TaskState.TaskState
/**
* A pluggable interface used by the Executor to send updates to the cluster scheduler.
*/
trait ExecutorBackend {
private[spark] trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}

View file

@ -5,8 +5,7 @@ import java.net.{URLClassLoader, URL}
/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
*/
private[spark]
class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {
override def addURL(url: URL) {

View file

@ -8,7 +8,7 @@ import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
class MesosExecutorBackend(executor: Executor)
private[spark] class MesosExecutorBackend(executor: Executor)
extends MesosExecutor
with ExecutorBackend
with Logging {
@ -59,7 +59,7 @@ class MesosExecutorBackend(executor: Executor)
/**
* Entry point for Mesos executor.
*/
object MesosExecutorBackend {
private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running

View file

@ -14,7 +14,7 @@ import spark.scheduler.cluster.RegisterSlaveFailed
import spark.scheduler.cluster.RegisterSlave
class StandaloneExecutorBackend(
private[spark] class StandaloneExecutorBackend(
executor: Executor,
masterUrl: String,
slaveId: String,
@ -62,7 +62,7 @@ class StandaloneExecutorBackend(
}
}
object StandaloneExecutorBackend {
private[spark] object StandaloneExecutorBackend {
def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc

View file

@ -11,6 +11,7 @@ import java.nio.channels.spi._
import java.net._
private[spark]
abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
channel.configureBlocking(false)
@ -102,7 +103,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
}
class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
extends Connection(SocketChannel.open, selector_) {
class Outbox(fair: Int = 0) {
@ -259,7 +260,7 @@ extends Connection(SocketChannel.open, selector_) {
}
class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
extends Connection(channel_, selector_) {
class Inbox() {

View file

@ -18,17 +18,17 @@ import akka.dispatch.{Await, Promise, ExecutionContext, Future}
import akka.util.Duration
import akka.util.duration._
case class ConnectionManagerId(host: String, port: Int) {
private[spark] case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
}
object ConnectionManagerId {
private[spark] object ConnectionManagerId {
def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = {
new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort())
}
}
class ConnectionManager(port: Int) extends Logging {
private[spark] class ConnectionManager(port: Int) extends Logging {
class MessageStatus(
val message: Message,
@ -349,7 +349,7 @@ class ConnectionManager(port: Int) extends Logging {
}
object ConnectionManager {
private[spark] object ConnectionManager {
def main(args: Array[String]) {

View file

@ -11,7 +11,7 @@ import java.net.InetAddress
import akka.dispatch.Await
import akka.util.duration._
object ConnectionManagerTest extends Logging{
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")

View file

@ -9,7 +9,7 @@ import java.net.InetAddress
import java.net.InetSocketAddress
import storage.BlockManager
class MessageChunkHeader(
private[spark] class MessageChunkHeader(
val typ: Long,
val id: Int,
val totalSize: Int,
@ -37,7 +37,7 @@ class MessageChunkHeader(
" and sizes " + totalSize + " / " + chunkSize + " bytes"
}
class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
private[spark] class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
val size = if (buffer == null) 0 else buffer.remaining
lazy val buffers = {
val ab = new ArrayBuffer[ByteBuffer]()
@ -51,7 +51,7 @@ class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
override def toString = "" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")"
}
abstract class Message(val typ: Long, val id: Int) {
private[spark] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
var started = false
var startTime = -1L
@ -68,7 +68,7 @@ abstract class Message(val typ: Long, val id: Int) {
override def toString = this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
}
class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
private[spark] class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
extends Message(Message.BUFFER_MESSAGE, id_) {
val initialSize = currentSize()
@ -152,7 +152,7 @@ extends Message(Message.BUFFER_MESSAGE, id_) {
}
}
object MessageChunkHeader {
private[spark] object MessageChunkHeader {
val HEADER_SIZE = 40
def create(buffer: ByteBuffer): MessageChunkHeader = {
@ -173,7 +173,7 @@ object MessageChunkHeader {
}
}
object Message {
private[spark] object Message {
val BUFFER_MESSAGE = 1111111111L
var lastId = 1
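
For orientation on the Message hierarchy above: a BufferMessage travels as a sequence of MessageChunks, each pairing a MessageChunkHeader with a slice of the payload. A standalone sketch of slicing a ByteBuffer into fixed-size chunks (illustrative only, not the ConnectionManager code):

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Standalone sketch: cut a payload into chunkSize-sized ByteBuffers, the way a
// BufferMessage hands out one MessageChunk at a time. Illustrative only.
object ChunkingSketch {
  def chunks(payload: ByteBuffer, chunkSize: Int): Seq[ByteBuffer] = {
    val out = new ArrayBuffer[ByteBuffer]()
    val buf = payload.duplicate()            // leave the caller's position untouched
    while (buf.remaining > 0) {
      val size = math.min(chunkSize, buf.remaining)
      val chunk = buf.duplicate()
      chunk.limit(chunk.position + size)     // window of at most chunkSize bytes
      out += chunk.slice()
      buf.position(buf.position + size)
    }
    out
  }

  def main(args: Array[String]) {
    val payload = ByteBuffer.wrap(new Array[Byte](100))
    println(chunks(payload, 32).map(_.remaining))  // ArrayBuffer(32, 32, 32, 4)
  }
}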

View file

@ -3,7 +3,7 @@ package spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
object ReceiverTest {
private[spark] object ReceiverTest {
def main(args: Array[String]) {
val manager = new ConnectionManager(9999)

View file

@ -3,7 +3,7 @@ package spark.network
import java.nio.ByteBuffer
import java.net.InetAddress
object SenderTest {
private[spark] object SenderTest {
def main(args: Array[String]) {

View file

@ -12,7 +12,7 @@ import spark.scheduler.JobListener
* a result of type U for each partition, and that the action returns a partial or complete result
* of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedInt).
*/
class ApproximateActionListener[T, U, R](
private[spark] class ApproximateActionListener[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],

View file

@ -4,7 +4,7 @@ package spark.partial
* An object that computes a function incrementally by merging in results of type U from multiple
* tasks. Allows partial evaluation at any point by calling currentResult().
*/
trait ApproximateEvaluator[U, R] {
private[spark] trait ApproximateEvaluator[U, R] {
def merge(outputId: Int, taskResult: U): Unit
def currentResult(): R
}

View file

@ -3,6 +3,7 @@ package spark.partial
/**
* A Double with error bars on it.
*/
private[spark]
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}

View file

@ -8,7 +8,7 @@ import cern.jet.stat.Probability
* TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
* be best to make this a special case of GroupedCountEvaluator with one group.
*/
class CountEvaluator(totalOutputs: Int, confidence: Double)
private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[Long, BoundedDouble] {
var outputsMerged = 0

View file

@ -14,7 +14,7 @@ import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
var outputsMerged = 0

View file

@ -12,7 +12,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
*/
class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
var outputsMerged = 0

View file

@ -12,7 +12,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
*/
class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
var outputsMerged = 0

View file

@ -7,7 +7,7 @@ import spark.util.StatCounter
/**
* An ApproximateEvaluator for means.
*/
class MeanEvaluator(totalOutputs: Int, confidence: Double)
private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
var outputsMerged = 0

View file

@ -1,6 +1,6 @@
package spark.partial
class PartialResult[R](initialVal: R, isFinal: Boolean) {
private[spark] class PartialResult[R](initialVal: R, isFinal: Boolean) {
private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
private var failure: Option[Exception] = None
private var completionHandler: Option[R => Unit] = None

View file

@ -7,7 +7,7 @@ import cern.jet.stat.Probability
* and various sample sizes. This is used by the MeanEvaluator to efficiently calculate
* confidence intervals for many keys.
*/
class StudentTCacher(confidence: Double) {
private[spark] class StudentTCacher(confidence: Double) {
val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation
val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
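
The evaluators in this package turn partial per-task results into a confidence interval; StudentTCacher above caches Student's t quantiles for small sample counts and switches to the normal quantile past NORMAL_APPROX_SAMPLE_SIZE. A simplified sketch of the normal-approximation case for a mean (hypothetical helper, not Spark's evaluator):

import cern.jet.stat.Probability

// Simplified sketch in the spirit of MeanEvaluator: a mean plus a symmetric
// normal-approximation confidence interval. Hypothetical helper, not Spark code.
object MeanIntervalSketch {
  def interval(sum: Double, sumSq: Double, count: Long, confidence: Double) = {
    val mean = sum / count
    val variance = (sumSq - sum * sum / count) / count   // population variance
    val stdErr = math.sqrt(variance / count)             // standard error of the mean
    val z = Probability.normalInverse(1 - (1 - confidence) / 2)
    (mean - z * stdErr, mean, mean + z * stdErr)         // (low, mean, high)
  }

  def main(args: Array[String]) {
    // 400 values whose sum is 2000 and sum of squares is 10400
    val (low, mean, high) = interval(2000.0, 10400.0, 400, 0.95)
    println("mean %.3f, 95%% interval [%.3f, %.3f]".format(mean, low, high))
  }
}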

View file

@ -9,7 +9,7 @@ import spark.util.StatCounter
* together, then uses the formula for the variance of two independent random variables to get
* a variance for the result and compute a confidence interval.
*/
class SumEvaluator(totalOutputs: Int, confidence: Double)
private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[StatCounter, BoundedDouble] {
var outputsMerged = 0

View file

@ -5,7 +5,7 @@ import spark.TaskContext
/**
* Tracks information about an active job in the DAGScheduler.
*/
class ActiveJob(
private[spark] class ActiveJob(
val runId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,

View file

@ -21,6 +21,7 @@ import spark.storage.BlockManagerId
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/
private[spark]
class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
taskSched.setListener(this)

View file

@ -10,9 +10,9 @@ import spark._
* submitted) but there is a single "logic" thread that reads these events and takes decisions.
* This greatly simplifies synchronization.
*/
sealed trait DAGSchedulerEvent
private[spark] sealed trait DAGSchedulerEvent
case class JobSubmitted(
private[spark] case class JobSubmitted(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
@ -21,15 +21,15 @@ case class JobSubmitted(
listener: JobListener)
extends DAGSchedulerEvent
case class CompletionEvent(
private[spark] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any])
extends DAGSchedulerEvent
case class HostLost(host: String) extends DAGSchedulerEvent
private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
case object StopDAGScheduler extends DAGSchedulerEvent
private[spark] case object StopDAGScheduler extends DAGSchedulerEvent

View file

@ -5,7 +5,7 @@ package spark.scheduler
* DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
* job fails (and no further taskSucceeded events will happen).
*/
trait JobListener {
private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any)
def jobFailed(exception: Exception)
}

View file

@ -3,7 +3,7 @@ package spark.scheduler
/**
* A result of a job in the DAGScheduler.
*/
sealed trait JobResult
private[spark] sealed trait JobResult
case class JobSucceeded(results: Seq[_]) extends JobResult
case class JobFailed(exception: Exception) extends JobResult
private[spark] case class JobSucceeded(results: Seq[_]) extends JobResult
private[spark] case class JobFailed(exception: Exception) extends JobResult

View file

@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
/**
* An object that waits for a DAGScheduler job to complete.
*/
class JobWaiter(totalTasks: Int) extends JobListener {
private[spark] class JobWaiter(totalTasks: Int) extends JobListener {
private val taskResults = ArrayBuffer.fill[Any](totalTasks)(null)
private var finishedTasks = 0

View file

@ -2,7 +2,7 @@ package spark.scheduler
import spark._
class ResultTask[T, U](
private[spark] class ResultTask[T, U](
stageId: Int,
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,

View file

@ -15,7 +15,7 @@ import com.ning.compress.lzf.LZFOutputStream
import spark._
import spark.storage._
object ShuffleMapTask {
private[spark] object ShuffleMapTask {
// A simple map between the stage id to the serialized byte array of a task.
// Served as a cache for task serialization because serialization can be
@ -68,7 +68,7 @@ object ShuffleMapTask {
}
}
class ShuffleMapTask(
private[spark] class ShuffleMapTask(
stageId: Int,
var rdd: RDD[_],
var dep: ShuffleDependency[_,_,_],

View file

@ -19,7 +19,7 @@ import spark.storage.BlockManagerId
* Each Stage also has a priority, which is (by default) based on the job it was submitted in.
* This allows Stages from earlier jobs to be computed first or recovered faster on failure.
*/
class Stage(
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage

View file

@ -11,7 +11,7 @@ import scala.collection.mutable.HashMap
/**
* A task to execute on a worker node.
*/
abstract class Task[T](val stageId: Int) extends Serializable {
private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
def run(attemptId: Long): T
def preferredLocations: Seq[String] = Nil
@ -25,7 +25,7 @@ abstract class Task[T](val stageId: Int) extends Serializable {
* the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
* first writing out its dependencies.
*/
object Task {
private[spark] object Task {
/**
* Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
*/

View file

@ -7,6 +7,7 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
private[spark]
class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
def this() = this(null.asInstanceOf[T], null)

View file

@ -7,7 +7,7 @@ package spark.scheduler
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
* the TaskSchedulerListener interface.
*/
trait TaskScheduler {
private[spark] trait TaskScheduler {
def start(): Unit
// Disconnect from the cluster.

View file

@ -7,7 +7,7 @@ import spark.TaskEndReason
/**
* Interface for getting events back from the TaskScheduler.
*/
trait TaskSchedulerListener {
private[spark] trait TaskSchedulerListener {
// A task has finished or failed.
def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit

View file

@ -4,7 +4,7 @@ package spark.scheduler
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id

View file

@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicLong
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
* start(), then submit task sets through the runTasks method.
*/
class ClusterScheduler(val sc: SparkContext)
private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
with Logging {

View file

@ -5,7 +5,7 @@ package spark.scheduler.cluster
* ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
trait SchedulerBackend {
private[spark] trait SchedulerBackend {
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit

View file

@ -1,3 +1,4 @@
package spark.scheduler.cluster
private[spark]
class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}

View file

@ -5,7 +5,7 @@ import spark.deploy.client.{Client, ClientListener}
import spark.deploy.{Command, JobDescription}
import scala.collection.mutable.HashMap
class SparkDeploySchedulerBackend(
private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,

View file

@ -4,19 +4,27 @@ import spark.TaskState.TaskState
import java.nio.ByteBuffer
import spark.util.SerializableBuffer
sealed trait StandaloneClusterMessage extends Serializable
private[spark] sealed trait StandaloneClusterMessage extends Serializable
// Master to slaves
private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
private[spark]
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
private[spark]
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
private[spark]
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
@ -25,5 +33,5 @@ object StatusUpdate {
}
// Internal messages in master
case object ReviveOffers extends StandaloneClusterMessage
case object StopMaster extends StandaloneClusterMessage
private[spark] case object ReviveOffers extends StandaloneClusterMessage
private[spark] case object StopMaster extends StandaloneClusterMessage

View file

@ -16,6 +16,7 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging {
@ -149,6 +150,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
}
object StandaloneSchedulerBackend {
private[spark] object StandaloneSchedulerBackend {
val ACTOR_NAME = "StandaloneScheduler"
}

Some files were not shown because too many files changed in this diff.