diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index c157cc8feb..cd8e43f556 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -18,7 +18,7 @@ import scala.collection.generic.Growable
 * @tparam R the full accumulated data
 * @tparam T partial data that can be added in
 */
-class Accumulable[R, T] (
+private[spark] class Accumulable[R, T] (
    @transient initialValue: R,
    param: AccumulableParam[R, T])
  extends Serializable {
@@ -73,7 +73,7 @@ class Accumulable[R, T] (
 * @tparam R the full accumulated data
 * @tparam T partial data that can be added in
 */
-trait AccumulableParam[R, T] extends Serializable {
+private[spark] trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add additional data to the accumulator value.
   * @param r the current value of the accumulator
@@ -93,7 +93,7 @@ trait AccumulableParam[R, T] extends Serializable {
  def zero(initialValue: R): R
 }
-class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
  extends AccumulableParam[R,T] {
  def addAccumulator(growable: R, elem: T) : R = {
@@ -124,7 +124,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
 * @param param helper object defining how to add elements of type `T`
 * @tparam T result type
 */
-class Accumulator[T](
+private[spark] class Accumulator[T](
    @transient initialValue: T,
    param: AccumulatorParam[T])
  extends Accumulable[T,T](initialValue, param)
@@ -133,7 +133,7 @@ class Accumulator[T](
 * as the accumulated value
 * @tparam T type of value to accumulate
 */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+private[spark] trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T) : T = {
    addInPlace(t1, t2)
  }
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index daabc0d566..faa99fe3e9 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -2,7 +2,7 @@ package spark
 import scala.collection.mutable.HashMap
-class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
  val index = idx
 }
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 9c42e88b68..fb65ba421a 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -11,8 +11,7 @@ import spark.storage.BlockManagerId
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
-class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
+private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
  def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index 6fe0b94297..e8392a194f 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -9,7 +9,7 @@ import java.util.LinkedHashMap
 * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
 * when most of the space is used by arrays of primitives or of simple classes.
 */
-class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
+private[spark] class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
  logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
  def this() {
@@ -104,9 +104,9 @@ class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
 }
// An entry in our map; stores a cached object and its size in bytes
-case class Entry(value: Any, size: Long)
+private[spark] case class Entry(value: Any, size: Long)
-object BoundedMemoryCache {
+private[spark] object BoundedMemoryCache {
  /**
   * Get maximum cache capacity from system configuration
   */
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index 150fe14e2c..20d677a854 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -2,9 +2,9 @@ package spark
 import java.util.concurrent.atomic.AtomicInteger
-sealed trait CachePutResponse
-case class CachePutSuccess(size: Long) extends CachePutResponse
-case class CachePutFailure() extends CachePutResponse
+private[spark] sealed trait CachePutResponse
+private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
+private[spark] case class CachePutFailure() extends CachePutResponse
/**
 * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
@@ -22,7 +22,7 @@ case class CachePutFailure() extends CachePutResponse
 * This abstract class handles the creation of key spaces, so that subclasses need only deal with
 * keys that are unique across modules.
 */
-abstract class Cache {
+private[spark] abstract class Cache {
  private val nextKeySpaceId = new AtomicInteger(0)
  private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
@@ -52,7 +52,7 @@ abstract class Cache {
/**
 * A key namespace in a Cache.
 */
-class KeySpace(cache: Cache, val keySpaceId: Int) {
+private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
  def get(datasetId: Any, partition: Int): Any =
    cache.get((keySpaceId, datasetId), partition)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9e0ef90b8..9a23f9e7cc 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -15,19 +15,20 @@ import scala.collection.mutable.HashSet
 import spark.storage.BlockManager
 import spark.storage.StorageLevel
-sealed trait CacheTrackerMessage
-case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
- extends CacheTrackerMessage
-case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
- extends CacheTrackerMessage
-case class MemoryCacheLost(host: String) extends CacheTrackerMessage
-case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
-case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
-case object GetCacheStatus extends CacheTrackerMessage
-case object GetCacheLocations extends CacheTrackerMessage
-case object StopCacheTracker extends CacheTrackerMessage
+private[spark] sealed trait CacheTrackerMessage
-class CacheTrackerActor extends Actor with Logging {
+private[spark] case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+ extends CacheTrackerMessage
+private[spark] case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+ extends CacheTrackerMessage
+private[spark] case class MemoryCacheLost(host: String) extends CacheTrackerMessage
+private[spark] case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
+private[spark] case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
+private[spark] case object GetCacheStatus extends CacheTrackerMessage
+private[spark] case object GetCacheLocations extends CacheTrackerMessage
+private[spark] case object StopCacheTracker extends CacheTrackerMessage
+
+private[spark] class CacheTrackerActor extends Actor with Logging {
  // TODO: Should probably store (String, CacheType) tuples
  private val locs = new HashMap[Int, Array[List[String]]]
@@ -89,7 +90,7 @@ class CacheTrackerActor extends Actor with Logging {
  }
 }
-class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
+private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
  extends Logging {
  // Tracker actor on the master, or remote reference to it on workers
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index e26041555a..9f94bcb413 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,6 +1,6 @@
 package spark
-class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
+private[spark] class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
  override val index: Int = idx
 }
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
index 3b83d23a13..98525b99c8 100644
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ b/core/src/main/scala/spark/ClosureCleaner.scala
@@ -9,7 +9,7 @@ import org.objectweb.asm.{ClassReader, MethodVisitor, Type}
 import
org.objectweb.asm.commons.EmptyVisitor import org.objectweb.asm.Opcodes._ -object ClosureCleaner extends Logging { +private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it private def getClassReader(cls: Class[_]): ClassReader = { new ClassReader(cls.getResourceAsStream( @@ -154,7 +154,7 @@ object ClosureCleaner extends Logging { } } -class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor { +private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { return new EmptyVisitor { @@ -180,7 +180,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor } } -class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor { +private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor { var myName: String = null override def visit(version: Int, access: Int, name: String, sig: String, diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index 6959917d14..2c270766f9 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -6,16 +6,16 @@ import java.io.ObjectInputStream import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -sealed trait CoGroupSplitDep extends Serializable -case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep -case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep +private[spark] sealed trait CoGroupSplitDep extends Serializable +private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep +private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable { +private[spark] class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable { override val index: Int = idx override def hashCode(): Int = idx } -class CoGroupAggregator +private[spark] class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]]( { x => ArrayBuffer(x) }, { (b, x) => b += x }, diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index c0ff94acc6..a478710f04 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -1,23 +1,23 @@ package spark -abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable +private[spark] abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable -abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { +private[spark] abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { def getParents(outputPartition: Int): Seq[Int] } -class ShuffleDependency[K, V, C]( +private[spark] class ShuffleDependency[K, V, C]( val shuffleId: Int, @transient rdd: RDD[(K, V)], val aggregator: Aggregator[K, V, C], val partitioner: Partitioner) extends Dependency(rdd, true) -class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { +private[spark] class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) } -class 
RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) +private[spark] class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = { diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala index 1fbf66b7de..6252dc44f7 100644 --- a/core/src/main/scala/spark/DoubleRDDFunctions.scala +++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala @@ -10,7 +10,7 @@ import spark.util.StatCounter /** * Extra functions available on RDDs of Doubles through an implicit conversion. */ -class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { +private[spark] class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { def sum(): Double = { self.reduce(_ + _) } diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala index 55512f4481..a953081d24 100644 --- a/core/src/main/scala/spark/FetchFailedException.scala +++ b/core/src/main/scala/spark/FetchFailedException.scala @@ -2,7 +2,7 @@ package spark import spark.storage.BlockManagerId -class FetchFailedException( +private[spark] class FetchFailedException( val bmAddress: BlockManagerId, val shuffleId: Int, val mapId: Int, diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index 0befca582d..6d448116a9 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -18,7 +18,7 @@ import org.apache.hadoop.util.ReflectionUtils /** * A Spark split class that wraps around a Hadoop InputSplit. */ -class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) +private[spark] class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) extends Split with Serializable { diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala index 05ca846c85..659d17718f 100644 --- a/core/src/main/scala/spark/HttpFileServer.scala +++ b/core/src/main/scala/spark/HttpFileServer.scala @@ -5,7 +5,7 @@ import java.net.URL import scala.collection.mutable.HashMap import org.apache.hadoop.fs.FileUtil -class HttpFileServer extends Logging { +private[spark] class HttpFileServer extends Logging { var baseDir : File = null var fileDir : File = null diff --git a/core/src/main/scala/spark/HttpServer.scala b/core/src/main/scala/spark/HttpServer.scala index 855f2c752f..0196595ba1 100644 --- a/core/src/main/scala/spark/HttpServer.scala +++ b/core/src/main/scala/spark/HttpServer.scala @@ -12,14 +12,14 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool /** * Exception type thrown by HttpServer when it is in the wrong state for an operation. */ -class ServerStateException(message: String) extends Exception(message) +private[spark] class ServerStateException(message: String) extends Exception(message) /** * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext * as well as classes created by the interpreter when the user types in code. This is just a wrapper * around a Jetty server. 
*/ -class HttpServer(resourceBase: File) extends Logging { +private[spark] class HttpServer(resourceBase: File) extends Logging { private var server: Server = null private var port: Int = -1 diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala index 1511c2620e..39d554b6a5 100644 --- a/core/src/main/scala/spark/JavaSerializer.scala +++ b/core/src/main/scala/spark/JavaSerializer.scala @@ -5,14 +5,14 @@ import java.nio.ByteBuffer import spark.util.ByteBufferInputStream -class JavaSerializationStream(out: OutputStream) extends SerializationStream { +private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { val objOut = new ObjectOutputStream(out) def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this } def flush() { objOut.flush() } def close() { objOut.close() } } -class JavaDeserializationStream(in: InputStream, loader: ClassLoader) +private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader) extends DeserializationStream { val objIn = new ObjectInputStream(in) { override def resolveClass(desc: ObjectStreamClass) = @@ -23,7 +23,7 @@ extends DeserializationStream { def close() { objIn.close() } } -class JavaSerializerInstance extends SerializerInstance { +private[spark] class JavaSerializerInstance extends SerializerInstance { def serialize[T](t: T): ByteBuffer = { val bos = new ByteArrayOutputStream() val out = serializeStream(bos) @@ -57,6 +57,6 @@ class JavaSerializerInstance extends SerializerInstance { } } -class JavaSerializer extends Serializer { +private[spark] class JavaSerializer extends Serializer { def newInstance(): SerializerInstance = new JavaSerializerInstance } diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index 376fcff4c8..1e55621b8e 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -20,7 +20,7 @@ import spark.storage._ * Zig-zag encoder used to write object sizes to serialization streams. * Based on Kryo's integer encoder. 
*/ -object ZigZag { +private[spark] object ZigZag { def writeInt(n: Int, out: OutputStream) { var value = n if ((value & ~0x7F) == 0) { @@ -68,7 +68,7 @@ object ZigZag { } } -class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream) +private[spark] class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream) extends SerializationStream { val channel = Channels.newChannel(out) @@ -85,7 +85,7 @@ extends SerializationStream { def close() { out.close() } } -class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream) +private[spark] class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream) extends DeserializationStream { def readObject[T](): T = { val len = ZigZag.readInt(in) @@ -95,7 +95,7 @@ extends DeserializationStream { def close() { in.close() } } -class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { +private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { val kryo = ks.kryo val threadBuffer = ks.threadBuffer.get() val objectBuffer = ks.objectBuffer.get() diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 82c1391345..116d526854 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -16,11 +16,11 @@ import scala.collection.mutable.HashSet import spark.storage.BlockManagerId -sealed trait MapOutputTrackerMessage -case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage -case object StopMapOutputTracker extends MapOutputTrackerMessage +private[spark] sealed trait MapOutputTrackerMessage +private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage +private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage -class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { +private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { def receive = { case GetMapOutputLocations(shuffleId: Int) => logInfo("Asked to get map output locations for shuffle " + shuffleId) @@ -33,7 +33,7 @@ class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Loggin } } -class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging { +private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging { val ip: String = System.getProperty("spark.master.host", "localhost") val port: Int = System.getProperty("spark.master.port", "7077").toInt val actorName: String = "MapOutputTracker" diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala index 14f708a3f8..9c5ad3511e 100644 --- a/core/src/main/scala/spark/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/NewHadoopRDD.scala @@ -13,7 +13,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID import java.util.Date import java.text.SimpleDateFormat -class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) +private[spark] class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) extends Split { val serializableHadoopSplit = new SerializableWritable(rawSplit) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 4752bf8d9f..57fdb741df 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ 
b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -41,7 +41,7 @@ import spark.partial.PartialResult /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. */ -class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( +private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) extends Logging with Serializable { @@ -430,7 +430,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( def getValueClass() = implicitly[ClassManifest[V]].erasure } -class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( +private[spark] class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) extends Logging with Serializable { @@ -459,6 +459,6 @@ class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U] } } -object Manifests { +private[spark] object Manifests { val seqSeqManifest = classManifest[Seq[Seq[_]]] } diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala index d79007ab40..321f5264b8 100644 --- a/core/src/main/scala/spark/ParallelCollection.scala +++ b/core/src/main/scala/spark/ParallelCollection.scala @@ -3,7 +3,7 @@ package spark import scala.collection.immutable.NumericRange import scala.collection.mutable.ArrayBuffer -class ParallelCollectionSplit[T: ClassManifest]( +private[spark] class ParallelCollectionSplit[T: ClassManifest]( val rddId: Long, val slice: Int, values: Seq[T]) diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala index c066017e89..4b33148364 100644 --- a/core/src/main/scala/spark/SampledRDD.scala +++ b/core/src/main/scala/spark/SampledRDD.scala @@ -4,7 +4,7 @@ import java.util.Random import cern.jet.random.Poisson import cern.jet.random.engine.DRand -class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { +private[spark] class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable { override val index: Int = prev.index } diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index ea7171d3a1..6224e957e5 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -30,7 +30,7 @@ import SparkContext._ * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. */ -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( +private[spark] class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( self: RDD[(K, V)]) extends Logging with Serializable { diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala index 9ec07cc173..c0e08289d8 100644 --- a/core/src/main/scala/spark/Serializer.scala +++ b/core/src/main/scala/spark/Serializer.scala @@ -12,14 +12,14 @@ import spark.util.ByteBufferInputStream * A serializer. Because some serialization libraries are not thread safe, this class is used to * create SerializerInstances that do the actual serialization. */ -trait Serializer { +private[spark] trait Serializer { def newInstance(): SerializerInstance } /** * An instance of the serializer, for use by one thread at a time. 
*/ -trait SerializerInstance { +private[spark] trait SerializerInstance { def serialize[T](t: T): ByteBuffer def deserialize[T](bytes: ByteBuffer): T @@ -50,7 +50,7 @@ trait SerializerInstance { /** * A stream for writing serialized objects. */ -trait SerializationStream { +private[spark] trait SerializationStream { def writeObject[T](t: T): SerializationStream def flush(): Unit def close(): Unit @@ -66,7 +66,7 @@ trait SerializationStream { /** * A stream for reading serialized objects. */ -trait DeserializationStream { +private[spark] trait DeserializationStream { def readObject[T](): T def close(): Unit diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala index 4f8d98f7d0..daa35fe7f2 100644 --- a/core/src/main/scala/spark/ShuffleFetcher.scala +++ b/core/src/main/scala/spark/ShuffleFetcher.scala @@ -1,6 +1,6 @@ package spark -abstract class ShuffleFetcher { +private[spark] abstract class ShuffleFetcher { // Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly // once on each key-value pair obtained. def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 11d5c0ede8..1a9f4cfec3 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -4,7 +4,7 @@ import scala.collection.mutable.ArrayBuffer import java.util.{HashMap => JHashMap} -class ShuffledRDDSplit(val idx: Int) extends Split { +private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx override def hashCode(): Int = idx } diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala index 6a71522922..7c3e8640e9 100644 --- a/core/src/main/scala/spark/SizeEstimator.scala +++ b/core/src/main/scala/spark/SizeEstimator.scala @@ -22,7 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet * Based on the following JavaWorld article: * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html */ -object SizeEstimator extends Logging { +private[spark] object SizeEstimator extends Logging { // Sizes of primitive types private val BYTE_SIZE = 1 diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala index ce9370c5d7..3dd0a4b1f9 100644 --- a/core/src/main/scala/spark/SoftReferenceCache.scala +++ b/core/src/main/scala/spark/SoftReferenceCache.scala @@ -5,7 +5,7 @@ import com.google.common.collect.MapMaker /** * An implementation of Cache that uses soft references. */ -class SoftReferenceCache extends Cache { +private[spark] class SoftReferenceCache extends Cache { val map = new MapMaker().softValues().makeMap[Any, Any]() override def get(datasetId: Any, partition: Int): Any = diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 79a9e8e34e..5742e64b56 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -589,7 +589,7 @@ object SparkContext { * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). 
*/ -class WritableConverter[T]( +private[spark] class WritableConverter[T]( val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index 6e4eb25ed4..3e5668892f 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -7,10 +7,10 @@ import spark.storage.BlockManagerId * tasks several times for "ephemeral" failures, and only report back failures that require some * old stages to be resubmitted, such as shuffle map fetch failures. */ -sealed trait TaskEndReason +private[spark] sealed trait TaskEndReason -case object Success extends TaskEndReason -case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason -case class ExceptionFailure(exception: Throwable) extends TaskEndReason -case class OtherFailure(message: String) extends TaskEndReason +private[spark] case object Success extends TaskEndReason +private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it +private[spark] case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason +private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason +private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/spark/TaskState.scala b/core/src/main/scala/spark/TaskState.scala index 9566b52432..78eb33a628 100644 --- a/core/src/main/scala/spark/TaskState.scala +++ b/core/src/main/scala/spark/TaskState.scala @@ -2,7 +2,7 @@ package spark import org.apache.mesos.Protos.{TaskState => MesosTaskState} -object TaskState +private[spark] object TaskState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") { val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala index 0e8164d6ab..3e795ea2a2 100644 --- a/core/src/main/scala/spark/UnionRDD.scala +++ b/core/src/main/scala/spark/UnionRDD.scala @@ -2,7 +2,7 @@ package spark import scala.collection.mutable.ArrayBuffer -class UnionSplit[T: ClassManifest]( +private[spark] class UnionSplit[T: ClassManifest]( idx: Int, rdd: RDD[T], split: Split) diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index b72e8986d3..cf20f456c4 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -11,7 +11,7 @@ import scala.math import spark._ import spark.storage.StorageLevel -class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) +private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { @@ -1027,7 +1027,7 @@ class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Lo } } -class BitTorrentBroadcastFactory +private[spark] class BitTorrentBroadcastFactory extends BroadcastFactory { def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) } diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala 
b/core/src/main/scala/spark/broadcast/Broadcast.scala index 3ba91c93e9..370978113f 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong import spark._ -abstract class Broadcast[T](id: Long) extends Serializable { +private[spark] abstract class Broadcast[T](id: Long) extends Serializable { def value: T // We cannot have an abstract readObject here due to some weird issues with @@ -14,7 +14,7 @@ abstract class Broadcast[T](id: Long) extends Serializable { override def toString = "spark.Broadcast(" + id + ")" } -class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable { +private[spark] class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable { private var initialized = false private var broadcastFactory: BroadcastFactory = null diff --git a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala index 66ca8d56d5..ab6d302827 100644 --- a/core/src/main/scala/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/spark/broadcast/BroadcastFactory.scala @@ -6,7 +6,7 @@ package spark.broadcast * BroadcastFactory implementation to instantiate a particular broadcast for the * entire Spark job. */ -trait BroadcastFactory { +private[spark] trait BroadcastFactory { def initialize(isMaster: Boolean): Unit def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] def stop(): Unit diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index d8cf5e37d4..7eb4ddb74f 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -12,7 +12,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import spark._ import spark.storage.StorageLevel -class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) +private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { def value = value_ @@ -46,7 +46,7 @@ extends Broadcast[T](id) with Logging with Serializable { } } -class HttpBroadcastFactory extends BroadcastFactory { +private[spark] class HttpBroadcastFactory extends BroadcastFactory { def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = diff --git a/core/src/main/scala/spark/broadcast/MultiTracker.scala b/core/src/main/scala/spark/broadcast/MultiTracker.scala index d00db23362..dd8e6dd246 100644 --- a/core/src/main/scala/spark/broadcast/MultiTracker.scala +++ b/core/src/main/scala/spark/broadcast/MultiTracker.scala @@ -382,10 +382,10 @@ extends Logging { } } -case class BroadcastBlock(blockID: Int, byteArray: Array[Byte]) +private[spark] case class BroadcastBlock(blockID: Int, byteArray: Array[Byte]) extends Serializable -case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock], +private[spark] case class VariableInfo(@transient arrayOfBlocks : Array[BroadcastBlock], totalBlocks: Int, totalBytes: Int) extends Serializable { diff --git a/core/src/main/scala/spark/broadcast/SourceInfo.scala b/core/src/main/scala/spark/broadcast/SourceInfo.scala index f90385fd47..705dd6fd81 100644 --- a/core/src/main/scala/spark/broadcast/SourceInfo.scala +++ b/core/src/main/scala/spark/broadcast/SourceInfo.scala @@ 
-7,7 +7,7 @@ import spark._ /** * Used to keep and pass around information of peers involved in a broadcast */ -case class SourceInfo (hostAddress: String, +private[spark] case class SourceInfo (hostAddress: String, listenPort: Int, totalBlocks: Int = SourceInfo.UnusedParam, totalBytes: Int = SourceInfo.UnusedParam) @@ -26,7 +26,7 @@ extends Comparable[SourceInfo] with Logging { /** * Helper Object of SourceInfo for its constants */ -object SourceInfo { +private[spark] object SourceInfo { // Constants for special values of listenPort val TxNotStartedRetry = -1 val TxOverGoToDefault = 0 diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index c1148b22ca..5bd40a40e3 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -10,7 +10,7 @@ import scala.math import spark._ import spark.storage.StorageLevel -class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) +private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { def value = value_ @@ -572,7 +572,7 @@ extends Broadcast[T](id) with Logging with Serializable { } } -class TreeBroadcastFactory +private[spark] class TreeBroadcastFactory extends BroadcastFactory { def initialize(isMaster: Boolean) { MultiTracker.initialize(isMaster) } diff --git a/core/src/main/scala/spark/deploy/Command.scala b/core/src/main/scala/spark/deploy/Command.scala index 344888919a..577101e3c3 100644 --- a/core/src/main/scala/spark/deploy/Command.scala +++ b/core/src/main/scala/spark/deploy/Command.scala @@ -2,7 +2,7 @@ package spark.deploy import scala.collection.Map -case class Command( +private[spark] case class Command( mainClass: String, arguments: Seq[String], environment: Map[String, String]) { diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 141bbe4d57..7eaae2c618 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -7,14 +7,14 @@ import scala.collection.immutable.List import scala.collection.mutable.HashMap -sealed trait DeployMessage extends Serializable +private[spark] sealed trait DeployMessage extends Serializable // Worker to Master -case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int) +private[spark] case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int) extends DeployMessage -case class ExecutorStateChanged( +private[spark] case class ExecutorStateChanged( jobId: String, execId: Int, state: ExecutorState, @@ -23,11 +23,11 @@ case class ExecutorStateChanged( // Master to Worker -case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage -case class RegisterWorkerFailed(message: String) extends DeployMessage -case class KillExecutor(jobId: String, execId: Int) extends DeployMessage +private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage +private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage +private[spark] case class KillExecutor(jobId: String, execId: Int) extends DeployMessage -case class LaunchExecutor( +private[spark] case class LaunchExecutor( jobId: String, execId: Int, jobDesc: JobDescription, @@ -38,33 +38,33 @@ case class LaunchExecutor( // Client to 
Master -case class RegisterJob(jobDescription: JobDescription) extends DeployMessage +private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage // Master to Client -case class RegisteredJob(jobId: String) extends DeployMessage -case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) -case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String]) -case class JobKilled(message: String) +private[spark] case class RegisteredJob(jobId: String) extends DeployMessage +private[spark] case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) +private[spark] case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String]) +private[spark] case class JobKilled(message: String) // Internal message in Client -case object StopClient +private[spark] case object StopClient // MasterWebUI To Master -case object RequestMasterState +private[spark] case object RequestMasterState // Master to MasterWebUI -case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo], +private[spark] case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo], completedJobs: List[JobInfo]) // WorkerWebUI to Worker -case object RequestWorkerState +private[spark] case object RequestWorkerState // Worker to WorkerWebUI -case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner], +private[spark] case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) \ No newline at end of file diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala index d6ff1c54ca..5dc0c54552 100644 --- a/core/src/main/scala/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/spark/deploy/ExecutorState.scala @@ -1,6 +1,6 @@ package spark.deploy -object ExecutorState +private[spark] object ExecutorState extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") { val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala index 8ae77b1038..20879c5f11 100644 --- a/core/src/main/scala/spark/deploy/JobDescription.scala +++ b/core/src/main/scala/spark/deploy/JobDescription.scala @@ -1,6 +1,6 @@ package spark.deploy -class JobDescription( +private[spark] class JobDescription( val name: String, val cores: Int, val memoryPerSlave: Int, diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 1591bfdeb6..e938981f6e 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -9,7 +9,7 @@ import spark.{Logging, Utils} import scala.collection.mutable.ArrayBuffer -class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging { +private[spark] class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging { val localIpAddress = Utils.localIpAddress diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index a2f88fc5e5..b1b72a3a1f 100644 --- 
a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -16,7 +16,7 @@ import akka.dispatch.Await * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description, * and a listener for job events, and calls back the listener when various events occur. */ -class Client( +private[spark] class Client( actorSystem: ActorSystem, masterUrl: String, jobDescription: JobDescription, diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala index 7d23baff32..a8fa982085 100644 --- a/core/src/main/scala/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala @@ -7,7 +7,7 @@ package spark.deploy.client * * Users of this API should *not* block inside the callback methods. */ -trait ClientListener { +private[spark] trait ClientListener { def connected(jobId: String): Unit def disconnected(): Unit diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala index df9a36c7fe..bf0e7428ba 100644 --- a/core/src/main/scala/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/spark/deploy/client/TestClient.scala @@ -4,7 +4,7 @@ import spark.util.AkkaUtils import spark.{Logging, Utils} import spark.deploy.{Command, JobDescription} -object TestClient { +private[spark] object TestClient { class TestListener extends ClientListener with Logging { def connected(id: String) { diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala index 2e40e10d18..0e46db2272 100644 --- a/core/src/main/scala/spark/deploy/client/TestExecutor.scala +++ b/core/src/main/scala/spark/deploy/client/TestExecutor.scala @@ -1,6 +1,6 @@ package spark.deploy.client -object TestExecutor { +private[spark] object TestExecutor { def main(args: Array[String]) { println("Hello world!") while (true) { diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala index 335e00958c..1db2c32633 100644 --- a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala @@ -2,7 +2,7 @@ package spark.deploy.master import spark.deploy.ExecutorState -class ExecutorInfo( +private[spark] class ExecutorInfo( val id: Int, val job: JobInfo, val worker: WorkerInfo, diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 4c81a1b447..e2364f1863 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -5,7 +5,7 @@ import java.util.Date import akka.actor.ActorRef import scala.collection.mutable -class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) { +private[spark] class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) { var state = JobState.WAITING var executors = new mutable.HashMap[Int, ExecutorInfo] var coresGranted = 0 diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala index 8d458ac39c..2b70cf0191 100644 --- a/core/src/main/scala/spark/deploy/master/JobState.scala +++ b/core/src/main/scala/spark/deploy/master/JobState.scala @@ -1,6 +1,6 @@ package 
spark.deploy.master -object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") { +private[spark] object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") { type JobState = Value val WAITING, RUNNING, FINISHED, FAILED = Value diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 5cc73633ab..6010f7cff2 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -14,7 +14,7 @@ import spark.{Logging, SparkException, Utils} import spark.util.AkkaUtils -class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { +private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs var nextJobNumber = 0 @@ -212,7 +212,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { } } -object Master { +private[spark] object Master { def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala index d712e3d5b3..1b1c3dd0ad 100644 --- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala @@ -6,7 +6,7 @@ import spark.Utils /** * Command-line parser for the master. */ -class MasterArguments(args: Array[String]) { +private[spark] class MasterArguments(args: Array[String]) { var ip = Utils.localIpAddress() var port = 7077 var webUiPort = 8080 diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 9f9994e4ba..c083e7f5ea 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -10,7 +10,7 @@ import cc.spray.directives._ import cc.spray.typeconversion.TwirlSupport._ import spark.deploy._ -class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives { +private[spark] class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives { val RESOURCE_DIR = "spark/deploy/master/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala index 59474a0945..16b3f9b653 100644 --- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala @@ -3,7 +3,7 @@ package spark.deploy.master import akka.actor.ActorRef import scala.collection.mutable -class WorkerInfo( +private[spark] class WorkerInfo( val id: String, val host: String, val port: Int, diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index e2a9df275a..73722a82e0 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -13,7 +13,7 @@ import spark.deploy.ExecutorStateChanged /** * Manages the execution of one executor process. 
*/ -class ExecutorRunner( +private[spark] class ExecutorRunner( val jobId: String, val execId: Int, val jobDesc: JobDescription, diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 645613257d..474c9364fd 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -16,7 +16,7 @@ import spark.deploy.RegisterWorkerFailed import akka.actor.Terminated import java.io.File -class Worker( +private[spark] class Worker( ip: String, port: Int, webUiPort: Int, @@ -170,7 +170,7 @@ class Worker( } } -object Worker { +private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala index 059c40da9f..60dc107a4c 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala @@ -8,7 +8,7 @@ import java.lang.management.ManagementFactory /** * Command-line parser for the master. */ -class WorkerArguments(args: Array[String]) { +private[spark] class WorkerArguments(args: Array[String]) { var ip = Utils.localIpAddress() var port = 0 var webUiPort = 8081 diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index f84b92b63e..78a9adc86f 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -9,7 +9,7 @@ import cc.spray.Directives import cc.spray.typeconversion.TwirlSupport._ import spark.deploy.{WorkerState, RequestWorkerState} -class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives { +private[spark] class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives { val RESOURCE_DIR = "spark/deploy/worker/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 820428c727..6ecf9fa8da 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -16,7 +16,7 @@ import java.nio.ByteBuffer /** * The Mesos executor for Spark. */ -class Executor extends Logging { +private[spark] class Executor extends Logging { var urlClassLoader : ExecutorURLClassLoader = null var threadPool: ExecutorService = null var env: SparkEnv = null diff --git a/core/src/main/scala/spark/executor/ExecutorBackend.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala index 24c8776f31..e97e509700 100644 --- a/core/src/main/scala/spark/executor/ExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala @@ -6,6 +6,6 @@ import spark.TaskState.TaskState /** * A pluggable interface used by the Executor to send updates to the cluster scheduler. 
*/ -trait ExecutorBackend { +private[spark] trait ExecutorBackend { def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) } diff --git a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala index f74f036c4c..5beb4d049e 100644 --- a/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/spark/executor/ExecutorURLClassLoader.scala @@ -5,8 +5,7 @@ import java.net.{URLClassLoader, URL} /** * The addURL method in URLClassLoader is protected. We subclass it to make this accessible. */ -private[spark] -class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) +private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) { override def addURL(url: URL) { diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index 50f4e41ede..eeab3959c6 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -8,7 +8,7 @@ import com.google.protobuf.ByteString import spark.{Utils, Logging} import spark.TaskState -class MesosExecutorBackend(executor: Executor) +private[spark] class MesosExecutorBackend(executor: Executor) extends MesosExecutor with ExecutorBackend with Logging { @@ -59,7 +59,7 @@ class MesosExecutorBackend(executor: Executor) /** * Entry point for Mesos executor. */ -object MesosExecutorBackend { +private[spark] object MesosExecutorBackend { def main(args: Array[String]) { MesosNativeLibrary.load() // Create a new Executor and start it running diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 26b163de0a..915f71ba9f 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -14,7 +14,7 @@ import spark.scheduler.cluster.RegisterSlaveFailed import spark.scheduler.cluster.RegisterSlave -class StandaloneExecutorBackend( +private[spark] class StandaloneExecutorBackend( executor: Executor, masterUrl: String, slaveId: String, @@ -62,7 +62,7 @@ class StandaloneExecutorBackend( } } -object StandaloneExecutorBackend { +private[spark] object StandaloneExecutorBackend { def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index c4350173fc..3a03f6843a 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -11,7 +11,7 @@ import java.nio.channels.spi._ import java.net._ -abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging { +private[spark] abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging { channel.configureBlocking(false) channel.socket.setTcpNoDelay(true) @@ -102,7 +102,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex } -class SendingConnection(val address: InetSocketAddress, selector_ : Selector) +private[spark] class SendingConnection(val address: InetSocketAddress, 
selector_ : Selector) extends Connection(SocketChannel.open, selector_) { class Outbox(fair: Int = 0) { @@ -259,7 +259,7 @@ extends Connection(SocketChannel.open, selector_) { } -class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector) +private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector) extends Connection(channel_, selector_) { class Inbox() { diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 2bb5f5fc6b..dec0df25b4 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -18,17 +18,17 @@ import akka.dispatch.{Await, Promise, ExecutionContext, Future} import akka.util.Duration import akka.util.duration._ -case class ConnectionManagerId(host: String, port: Int) { +private[spark] case class ConnectionManagerId(host: String, port: Int) { def toSocketAddress() = new InetSocketAddress(host, port) } -object ConnectionManagerId { +private[spark] object ConnectionManagerId { def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = { new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort()) } } -class ConnectionManager(port: Int) extends Logging { +private[spark] class ConnectionManager(port: Int) extends Logging { class MessageStatus( val message: Message, @@ -349,7 +349,7 @@ class ConnectionManager(port: Int) extends Logging { } -object ConnectionManager { +private[spark] object ConnectionManager { def main(args: Array[String]) { diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala index 555b3454ee..47ceaf3c07 100644 --- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala @@ -11,7 +11,7 @@ import java.net.InetAddress import akka.dispatch.Await import akka.util.duration._ -object ConnectionManagerTest extends Logging{ +private[spark] object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { if (args.length < 2) { println("Usage: ConnectionManagerTest ") diff --git a/core/src/main/scala/spark/network/Message.scala b/core/src/main/scala/spark/network/Message.scala index 62a06d95c5..525751b5bf 100644 --- a/core/src/main/scala/spark/network/Message.scala +++ b/core/src/main/scala/spark/network/Message.scala @@ -9,7 +9,7 @@ import java.net.InetAddress import java.net.InetSocketAddress import storage.BlockManager -class MessageChunkHeader( +private[spark] class MessageChunkHeader( val typ: Long, val id: Int, val totalSize: Int, @@ -37,7 +37,7 @@ class MessageChunkHeader( " and sizes " + totalSize + " / " + chunkSize + " bytes" } -class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) { +private[spark] class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) { val size = if (buffer == null) 0 else buffer.remaining lazy val buffers = { val ab = new ArrayBuffer[ByteBuffer]() @@ -51,7 +51,7 @@ class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) { override def toString = "" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")" } -abstract class Message(val typ: Long, val id: Int) { +private[spark] abstract class Message(val typ: Long, val id: Int) { var senderAddress: InetSocketAddress = null var started = false var startTime = -1L @@ -68,7 +68,7 @@ abstract class Message(val typ: 
Long, val id: Int) { override def toString = this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")" } -class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int) +private[spark] class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int) extends Message(Message.BUFFER_MESSAGE, id_) { val initialSize = currentSize() @@ -152,7 +152,7 @@ extends Message(Message.BUFFER_MESSAGE, id_) { } } -object MessageChunkHeader { +private[spark] object MessageChunkHeader { val HEADER_SIZE = 40 def create(buffer: ByteBuffer): MessageChunkHeader = { @@ -173,7 +173,7 @@ object MessageChunkHeader { } } -object Message { +private[spark] object Message { val BUFFER_MESSAGE = 1111111111L var lastId = 1 diff --git a/core/src/main/scala/spark/network/ReceiverTest.scala b/core/src/main/scala/spark/network/ReceiverTest.scala index e1ba7c06c0..a174d5f403 100644 --- a/core/src/main/scala/spark/network/ReceiverTest.scala +++ b/core/src/main/scala/spark/network/ReceiverTest.scala @@ -3,7 +3,7 @@ package spark.network import java.nio.ByteBuffer import java.net.InetAddress -object ReceiverTest { +private[spark] object ReceiverTest { def main(args: Array[String]) { val manager = new ConnectionManager(9999) diff --git a/core/src/main/scala/spark/network/SenderTest.scala b/core/src/main/scala/spark/network/SenderTest.scala index 4ab6dd3414..a4ff69e4d2 100644 --- a/core/src/main/scala/spark/network/SenderTest.scala +++ b/core/src/main/scala/spark/network/SenderTest.scala @@ -3,7 +3,7 @@ package spark.network import java.nio.ByteBuffer import java.net.InetAddress -object SenderTest { +private[spark] object SenderTest { def main(args: Array[String]) { diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala index e6535836ab..42f46e06ed 100644 --- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala +++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala @@ -12,7 +12,7 @@ import spark.scheduler.JobListener * a result of type U for each partition, and that the action returns a partial or complete result * of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedInt). */ -class ApproximateActionListener[T, U, R]( +private[spark] class ApproximateActionListener[T, U, R]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], diff --git a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala index 4772e43ef0..75713b2eaa 100644 --- a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala +++ b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala @@ -4,7 +4,7 @@ package spark.partial * An object that computes a function incrementally by merging in results of type U from multiple * tasks. Allows partial evaluation at any point by calling currentResult(). */ -trait ApproximateEvaluator[U, R] { +private[spark] trait ApproximateEvaluator[U, R] { def merge(outputId: Int, taskResult: U): Unit def currentResult(): R } diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala index 463c33d6e2..ab5cc21aa0 100644 --- a/core/src/main/scala/spark/partial/BoundedDouble.scala +++ b/core/src/main/scala/spark/partial/BoundedDouble.scala @@ -3,6 +3,6 @@ package spark.partial /** * A Double with error bars on it. 
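Note on the ApproximateEvaluator trait above: it only asks for merge(outputId, taskResult) and currentResult(). As a hedged illustration of that contract (not part of this patch; the class name is invented), a trivial evaluator that keeps a running total of per-task counts could look like the following. Because this patch makes the trait private[spark], such an implementation has to live under the spark package.

// Illustration only, not part of the patch: sums the Long emitted by each
// finished task and reports the running total as the partial result.
private[spark] class RunningTotalEvaluator extends ApproximateEvaluator[Long, Long] {
  private var total = 0L
  def merge(outputId: Int, taskResult: Long) {
    total += taskResult
  }
  def currentResult(): Long = total
}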
*/ -class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { +private[spark] class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) { override def toString(): String = "[%.3f, %.3f]".format(low, high) } diff --git a/core/src/main/scala/spark/partial/CountEvaluator.scala b/core/src/main/scala/spark/partial/CountEvaluator.scala index 1bc90d6b39..daf2c5170c 100644 --- a/core/src/main/scala/spark/partial/CountEvaluator.scala +++ b/core/src/main/scala/spark/partial/CountEvaluator.scala @@ -8,7 +8,7 @@ import cern.jet.stat.Probability * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might * be best to make this a special case of GroupedCountEvaluator with one group. */ -class CountEvaluator(totalOutputs: Int, confidence: Double) +private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[Long, BoundedDouble] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala index 3e631c0efc..01fbb8a11b 100644 --- a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala @@ -14,7 +14,7 @@ import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} /** * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval. */ -class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double) +private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala index 2a9ccba205..c622df5220 100644 --- a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala +++ b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala @@ -12,7 +12,7 @@ import spark.util.StatCounter /** * An ApproximateEvaluator for means by key. Returns a map of key to confidence interval. */ -class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double) +private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala index 6a2ec7a7bd..20fa55cff2 100644 --- a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala +++ b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala @@ -12,7 +12,7 @@ import spark.util.StatCounter /** * An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval. */ -class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double) +private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/partial/MeanEvaluator.scala b/core/src/main/scala/spark/partial/MeanEvaluator.scala index b8c7cb8863..762c85400d 100644 --- a/core/src/main/scala/spark/partial/MeanEvaluator.scala +++ b/core/src/main/scala/spark/partial/MeanEvaluator.scala @@ -7,7 +7,7 @@ import spark.util.StatCounter /** * An ApproximateEvaluator for means. 
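All of the evaluators in spark.partial report their estimates as BoundedDouble values, whose constructor and toString appear above. A hedged illustration with made-up numbers (after this patch the class is only constructible from inside the spark package):

// Illustration only: a 95% confidence interval around a mean of 1000.
val estimate = new BoundedDouble(1000.0, 0.95, 950.0, 1050.0)
println(estimate)  // prints "[950.000, 1050.000]"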
*/ -class MeanEvaluator(totalOutputs: Int, confidence: Double) +private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[StatCounter, BoundedDouble] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala index 200ed4ea1e..beafbf67c3 100644 --- a/core/src/main/scala/spark/partial/PartialResult.scala +++ b/core/src/main/scala/spark/partial/PartialResult.scala @@ -1,6 +1,6 @@ package spark.partial -class PartialResult[R](initialVal: R, isFinal: Boolean) { +private[spark] class PartialResult[R](initialVal: R, isFinal: Boolean) { private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None private var failure: Option[Exception] = None private var completionHandler: Option[R => Unit] = None diff --git a/core/src/main/scala/spark/partial/StudentTCacher.scala b/core/src/main/scala/spark/partial/StudentTCacher.scala index 6263ee3518..443abba5cd 100644 --- a/core/src/main/scala/spark/partial/StudentTCacher.scala +++ b/core/src/main/scala/spark/partial/StudentTCacher.scala @@ -7,7 +7,7 @@ import cern.jet.stat.Probability * and various sample sizes. This is used by the MeanEvaluator to efficiently calculate * confidence intervals for many keys. */ -class StudentTCacher(confidence: Double) { +private[spark] class StudentTCacher(confidence: Double) { val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2) val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0) diff --git a/core/src/main/scala/spark/partial/SumEvaluator.scala b/core/src/main/scala/spark/partial/SumEvaluator.scala index 0357a6bff8..58fb60f441 100644 --- a/core/src/main/scala/spark/partial/SumEvaluator.scala +++ b/core/src/main/scala/spark/partial/SumEvaluator.scala @@ -9,7 +9,7 @@ import spark.util.StatCounter * together, then uses the formula for the variance of two independent random variables to get * a variance for the result and compute a confidence interval. */ -class SumEvaluator(totalOutputs: Int, confidence: Double) +private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) extends ApproximateEvaluator[StatCounter, BoundedDouble] { var outputsMerged = 0 diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala index e09b92d667..5a4e9a582d 100644 --- a/core/src/main/scala/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala @@ -5,7 +5,7 @@ import spark.TaskContext /** * Tracks information about an active job in the DAGScheduler. */ -class ActiveJob( +private[spark] class ActiveJob( val runId: Int, val finalStage: Stage, val func: (TaskContext, Iterator[_]) => _, diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 70931407ce..33388545c4 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -21,7 +21,7 @@ import spark.storage.BlockManagerId * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). 
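Returning to StudentTCacher, shown earlier in this hunk: its lookup method is not part of this diff, but the fields above (normalApprox and the per-sample-size cache) imply a memoized lookup. A rough sketch of that pattern, assuming the method is named get and using Colt's Probability.studentTInverse alongside the normalInverse call already visible:

// Sketch only, not code from this patch: consult the cache for small
// samples and fall back to the Gaussian approximation for large ones.
def get(sampleSize: Long): Double = {
  if (sampleSize >= NORMAL_APPROX_SAMPLE_SIZE) {
    normalApprox
  } else {
    val size = sampleSize.toInt
    if (cache(size) < 0) {
      cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
    }
    cache(size)
  }
}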
*/ -class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging { +private[spark] class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging { taskSched.setListener(this) // Called by TaskScheduler to report task completions or failures. diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala index 11f0ef6245..3422a21d9d 100644 --- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala @@ -10,9 +10,9 @@ import spark._ * submitted) but there is a single "logic" thread that reads these events and takes decisions. * This greatly simplifies synchronization. */ -sealed trait DAGSchedulerEvent +private[spark] sealed trait DAGSchedulerEvent -case class JobSubmitted( +private[spark] case class JobSubmitted( finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], @@ -21,15 +21,15 @@ case class JobSubmitted( listener: JobListener) extends DAGSchedulerEvent -case class CompletionEvent( +private[spark] case class CompletionEvent( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]) extends DAGSchedulerEvent -case class HostLost(host: String) extends DAGSchedulerEvent +private[spark] case class HostLost(host: String) extends DAGSchedulerEvent -case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent +private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent -case object StopDAGScheduler extends DAGSchedulerEvent +private[spark] case object StopDAGScheduler extends DAGSchedulerEvent diff --git a/core/src/main/scala/spark/scheduler/JobListener.scala b/core/src/main/scala/spark/scheduler/JobListener.scala index d4dd536a7d..f46b9d551d 100644 --- a/core/src/main/scala/spark/scheduler/JobListener.scala +++ b/core/src/main/scala/spark/scheduler/JobListener.scala @@ -5,7 +5,7 @@ package spark.scheduler * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole * job fails (and no further taskSucceeded events will happen). */ -trait JobListener { +private[spark] trait JobListener { def taskSucceeded(index: Int, result: Any) def jobFailed(exception: Exception) } diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala index 62b458eccb..c4a74e526f 100644 --- a/core/src/main/scala/spark/scheduler/JobResult.scala +++ b/core/src/main/scala/spark/scheduler/JobResult.scala @@ -3,7 +3,7 @@ package spark.scheduler /** * A result of a job in the DAGScheduler. */ -sealed trait JobResult +private[spark] sealed trait JobResult -case class JobSucceeded(results: Seq[_]) extends JobResult -case class JobFailed(exception: Exception) extends JobResult +private[spark] case class JobSucceeded(results: Seq[_]) extends JobResult +private[spark] case class JobFailed(exception: Exception) extends JobResult diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala index 4c2ae23051..b3d4feebe5 100644 --- a/core/src/main/scala/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer /** * An object that waits for a DAGScheduler job to complete. 
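Because DAGSchedulerEvent is sealed and its variants are case classes, the single logic thread described above can dispatch with a pattern match. A hedged sketch (the handler name is invented; only events whose full shape appears in this diff are matched explicitly):

// Illustration only, not the DAGScheduler's actual event loop.
def describe(event: DAGSchedulerEvent): String = event match {
  case HostLost(host)                 => "lost host " + host
  case TaskSetFailed(taskSet, reason) => "task set " + taskSet.id + " failed: " + reason
  case StopDAGScheduler               => "scheduler stop requested"
  case other                          => "other event: " + other
}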
*/ -class JobWaiter(totalTasks: Int) extends JobListener { +private[spark] class JobWaiter(totalTasks: Int) extends JobListener { private val taskResults = ArrayBuffer.fill[Any](totalTasks)(null) private var finishedTasks = 0 diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala index 090ced9d76..2ebd4075a2 100644 --- a/core/src/main/scala/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/spark/scheduler/ResultTask.scala @@ -2,7 +2,7 @@ package spark.scheduler import spark._ -class ResultTask[T, U]( +private[spark] class ResultTask[T, U]( stageId: Int, rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 27d97ffee5..966a5e173a 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -15,7 +15,7 @@ import com.ning.compress.lzf.LZFOutputStream import spark._ import spark.storage._ -object ShuffleMapTask { +private[spark] object ShuffleMapTask { // A simple map between the stage id to the serialized byte array of a task. // Served as a cache for task serialization because serialization can be @@ -68,7 +68,7 @@ object ShuffleMapTask { } } -class ShuffleMapTask( +private[spark] class ShuffleMapTask( stageId: Int, var rdd: RDD[_], var dep: ShuffleDependency[_,_,_], diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index b3ef8ac565..803dd1b97d 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -19,7 +19,7 @@ import spark.storage.BlockManagerId * Each Stage also has a priority, which is (by default) based on the job it was submitted in. * This allows Stages from earlier jobs to be computed first or recovered faster on failure. */ -class Stage( +private[spark] class Stage( val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala index d69c259362..d449ac67d6 100644 --- a/core/src/main/scala/spark/scheduler/Task.scala +++ b/core/src/main/scala/spark/scheduler/Task.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.HashMap /** * A task to execute on a worker node. */ -abstract class Task[T](val stageId: Int) extends Serializable { +private[spark] abstract class Task[T](val stageId: Int) extends Serializable { def run(attemptId: Long): T def preferredLocations: Seq[String] = Nil @@ -25,7 +25,7 @@ abstract class Task[T](val stageId: Int) extends Serializable { * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by * first writing out its dependencies. */ -object Task { +private[spark] object Task { /** * Serialize a task and the current app dependencies (files and JARs added to the SparkContext) */ diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala index 868ddb237c..c7b123ce7f 100644 --- a/core/src/main/scala/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/spark/scheduler/TaskResult.scala @@ -7,7 +7,7 @@ import scala.collection.mutable.Map // Task result. Also contains updates to accumulator variables. 
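For reference on the Task[T] abstraction above: run is the only abstract member, and preferredLocations defaults to Nil. A minimal, hedged illustration (the class name is invented and is not part of the patch):

// Illustration only: the smallest possible Task, doing no work at all.
private[spark] class NoOpTask(stageId: Int) extends Task[Unit](stageId) {
  def run(attemptId: Long): Unit = ()
}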
// TODO: Use of distributed cache to return result is a hack to get around // what seems to be a bug with messages over 60KB in libprocess; fix it -class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable { +private[spark] class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable { def this() = this(null.asInstanceOf[T], null) override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala index c35633d53c..d549b184b0 100644 --- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala @@ -7,7 +7,7 @@ package spark.scheduler * are failures, and mitigating stragglers. They return events to the DAGScheduler through * the TaskSchedulerListener interface. */ -trait TaskScheduler { +private[spark] trait TaskScheduler { def start(): Unit // Disconnect from the cluster. diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala index f838272fb4..fa4de15d0d 100644 --- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala @@ -7,7 +7,7 @@ import spark.TaskEndReason /** * Interface for getting events back from the TaskScheduler. */ -trait TaskSchedulerListener { +private[spark] trait TaskSchedulerListener { // A task has finished or failed. def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala index 3f4a464902..a3002ca477 100644 --- a/core/src/main/scala/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/spark/scheduler/TaskSet.scala @@ -4,7 +4,7 @@ package spark.scheduler * A set of tasks submitted together to the low-level TaskScheduler, usually representing * missing partitions of a particular stage. */ -class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) { +private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) { val id: String = stageId + "." + attempt override def toString: String = "TaskSet " + id diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 16fe5761c8..f5e852d203 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicLong * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call * start(), then submit task sets through the runTasks method. */ -class ClusterScheduler(val sc: SparkContext) +private[spark] class ClusterScheduler(val sc: SparkContext) extends TaskScheduler with Logging { diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala index 897976c3f9..ddcd64d7c6 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala @@ -5,7 +5,7 @@ package spark.scheduler.cluster * ClusterScheduler. 
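One small detail of TaskSet worth calling out: its id concatenates the stage id and the attempt counter, so resubmissions of the same stage stay distinguishable. A hedged illustration with made-up values:

// Illustration only: hypothetical stage 3, attempt 1, default priority.
val taskSet = new TaskSet(new Array[Task[_]](0), stageId = 3, attempt = 1, priority = 0)
println(taskSet.id)        // "3.1"
println(taskSet.toString)  // "TaskSet 3.1"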
We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ -trait SchedulerBackend { +private[spark] trait SchedulerBackend { def start(): Unit def stop(): Unit def reviveOffers(): Unit diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala index e15d577a8b..3c8f511a07 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala @@ -1,3 +1,3 @@ package spark.scheduler.cluster -class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {} +private[spark] class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {} diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9093a329a3..0043dbeb10 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -5,7 +5,7 @@ import spark.deploy.client.{Client, ClientListener} import spark.deploy.{Command, JobDescription} import scala.collection.mutable.HashMap -class SparkDeploySchedulerBackend( +private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, master: String, diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index 80e8733671..c4e8fea3dc 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -4,20 +4,20 @@ import spark.TaskState.TaskState import java.nio.ByteBuffer import spark.util.SerializableBuffer -sealed trait StandaloneClusterMessage extends Serializable +private[spark] sealed trait StandaloneClusterMessage extends Serializable // Master to slaves -case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage -case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage -case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage +private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage +private[spark] case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage +private[spark] case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage // Slaves to master -case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage +private[spark] case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage -case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer) +private[spark] case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer) extends StandaloneClusterMessage -object StatusUpdate { +private[spark] object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */ def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data)) @@ -25,5 +25,5 @@ object StatusUpdate { } // Internal 
messages in master -case object ReviveOffers extends StandaloneClusterMessage -case object StopMaster extends StandaloneClusterMessage +private[spark] case object ReviveOffers extends StandaloneClusterMessage +private[spark] case object StopMaster extends StandaloneClusterMessage diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 83e7c6e036..8aebdedda2 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -16,7 +16,7 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*). */ -class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) +private[spark] class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem) extends SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -149,6 +149,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) } -object StandaloneSchedulerBackend { +private[spark] object StandaloneSchedulerBackend { val ACTOR_NAME = "StandaloneScheduler" } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala index f9a1b74fa5..aa097fd3a2 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala @@ -3,7 +3,7 @@ package spark.scheduler.cluster import java.nio.ByteBuffer import spark.util.SerializableBuffer -class TaskDescription( +private[spark] class TaskDescription( val taskId: Long, val slaveId: String, val name: String, diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala index 65e59841a9..ae2c6b9836 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala @@ -3,7 +3,7 @@ package spark.scheduler.cluster /** * Information about a running task attempt inside a TaskSet. */ -class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) { +private[spark] class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) { var finishTime: Long = 0 var failed = false diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index aa37462fb0..9bb88ad6a1 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -17,7 +17,7 @@ import java.nio.ByteBuffer /** * Schedules the tasks within a single TaskSet in the ClusterScheduler. 
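TaskInfo, shown just above, records launchTime at construction and later fills in finishTime and failed. A hedged helper (the name is invented, and it assumes the attempt has finished so finishTime is set):

// Illustration only: wall-clock duration of a completed task attempt.
def elapsedMs(info: TaskInfo): Long = info.finishTime - info.launchTime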
*/ -class TaskSetManager( +private[spark] class TaskSetManager( sched: ClusterScheduler, val taskSet: TaskSet) extends Logging { diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala index 1e83f103e7..298b6d5529 100644 --- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala +++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala @@ -3,5 +3,5 @@ package spark.scheduler.cluster /** * Represents free resources available on a worker node. */ -class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) { +private[spark] class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) { } diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 53fc659345..2b38d8b52e 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -15,7 +15,7 @@ import spark.scheduler._ * the scheduler also allows each task to fail up to maxFailures times, which is useful for * testing fault recovery. */ -class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext) +private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext) extends TaskScheduler with Logging { diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index fdf007ffb2..9737c6b63e 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -24,7 +24,7 @@ import spark.TaskState * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to * remove this. */ -class CoarseMesosSchedulerBackend( +private[spark] class CoarseMesosSchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, master: String, diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 44eda93dd1..e85e4ef318 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -20,7 +20,7 @@ import spark.TaskState * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks * from multiple apps can run on different cores) and in time (a core can switch ownership). 
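WorkerOffer is deliberately tiny: a slave id, a hostname, and a core count, handed to the scheduler in batches by the backend. A hedged helper (invented for illustration) over one round of offers:

// Illustration only: total cores available across one round of offers.
def totalCores(offers: Seq[WorkerOffer]): Int = offers.map(_.cores).sum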
*/ -class MesosSchedulerBackend( +private[spark] class MesosSchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, master: String, diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 37d5862575..bb033871b6 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -19,7 +19,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import sun.nio.ch.DirectBuffer -class BlockManagerId(var ip: String, var port: Int) extends Externalizable { +private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { def this() = this(null, 0) override def writeExternal(out: ObjectOutput) { @@ -43,11 +43,11 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable { } -case class BlockException(blockId: String, message: String, ex: Exception = null) +private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) -class BlockLocker(numLockers: Int) { +private[spark] class BlockLocker(numLockers: Int) { private val hashLocker = Array.fill(numLockers)(new Object()) def getLock(blockId: String): Object = { @@ -56,7 +56,7 @@ class BlockLocker(numLockers: Int) { } -class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) +private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) extends Logging { case class BlockInfo(level: StorageLevel, tellMaster: Boolean) diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 0c654364cd..ef76b3f470 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -16,14 +16,14 @@ import akka.util.duration._ import spark.{Logging, SparkException, Utils} -sealed trait ToBlockManagerMaster +private[spark] sealed trait ToBlockManagerMaster -case class RegisterBlockManager( +private[spark] case class RegisterBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long) extends ToBlockManagerMaster -class HeartBeat( +private[spark] class HeartBeat( var blockManagerId: BlockManagerId, var blockId: String, var storageLevel: StorageLevel, @@ -53,7 +53,7 @@ class HeartBeat( } } -object HeartBeat { +private[spark] object HeartBeat { def apply(blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, @@ -68,18 +68,18 @@ object HeartBeat { } } -case class GetLocations(blockId: String) extends ToBlockManagerMaster +private[spark] case class GetLocations(blockId: String) extends ToBlockManagerMaster -case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster +private[spark] case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster -case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster +private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster -case class RemoveHost(host: String) extends ToBlockManagerMaster +private[spark] case class RemoveHost(host: String) extends ToBlockManagerMaster -case object StopBlockManagerMaster extends ToBlockManagerMaster +private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster -class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with 
Logging { +private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { class BlockManagerInfo( val blockManagerId: BlockManagerId, @@ -330,7 +330,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } } -class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) +private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) extends Logging { val AKKA_ACTOR_NAME: String = "BlockMasterManager" diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index 47e4d14010..f72079e267 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -17,7 +17,7 @@ import spark.network._ * * TODO: Use event model. */ -class BlockManagerWorker(val blockManager: BlockManager) extends Logging { +private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging { initLogging() blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive) @@ -87,7 +87,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging { } } -object BlockManagerWorker extends Logging { +private[spark] object BlockManagerWorker extends Logging { private var blockManagerWorker: BlockManagerWorker = null private val DATA_TRANSFER_TIME_OUT_MS: Long = 500 private val REQUEST_RETRY_INTERVAL_MS: Long = 1000 diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index 4b5cfebba2..3f234df654 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -8,11 +8,11 @@ import scala.collection.mutable.ArrayBuffer import spark._ import spark.network._ -case class GetBlock(id: String) -case class GotBlock(id: String, data: ByteBuffer) -case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel) +private[spark] case class GetBlock(id: String) +private[spark] case class GotBlock(id: String, data: ByteBuffer) +private[spark] case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel) -class BlockMessage() { +private[spark] class BlockMessage() { // Un-initialized: typ = 0 // GetBlock: typ = 1 // GotBlock: typ = 2 @@ -158,7 +158,7 @@ class BlockMessage() { } } -object BlockMessage { +private[spark] object BlockMessage { val TYPE_NON_INITIALIZED: Int = 0 val TYPE_GET_BLOCK: Int = 1 val TYPE_GOT_BLOCK: Int = 2 diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index 64acc7eb47..8cf7565be7 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -8,7 +8,7 @@ import scala.collection.mutable.ArrayBuffer import spark._ import spark.network._ -class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging { +private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging { def this(bm: BlockMessage) = this(Array(bm)) @@ -85,7 +85,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM } } -object BlockMessageArray { +private[spark] object BlockMessageArray { def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = { val newBlockMessageArray = new BlockMessageArray() diff --git 
a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index 2d52fac1ef..2237ce92b3 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -2,7 +2,7 @@ package spark.storage import java.io.{Externalizable, ObjectInput, ObjectOutput} -class StorageLevel( +private[spark] class StorageLevel( var useDisk: Boolean, var useMemory: Boolean, var deserialized: Boolean, @@ -63,7 +63,7 @@ class StorageLevel( "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) } -object StorageLevel { +private[spark] object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index df4e23bfd6..f670ccb709 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -17,7 +17,7 @@ import java.util.concurrent.TimeoutException /** * Various utility classes for working with Akka. */ -object AkkaUtils { +private[spark] object AkkaUtils { /** * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the * ActorSystem itself and its port (which is hard to get from Akka). diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala index c3ff063569..0427646747 100644 --- a/core/src/main/scala/spark/util/IntParam.scala +++ b/core/src/main/scala/spark/util/IntParam.scala @@ -3,7 +3,7 @@ package spark.util /** * An extractor object for parsing strings into integers. */ -object IntParam { +private[spark] object IntParam { def unapply(str: String): Option[Int] = { try { Some(str.toInt) diff --git a/core/src/main/scala/spark/util/MemoryParam.scala b/core/src/main/scala/spark/util/MemoryParam.scala index 4fba914afe..3726738842 100644 --- a/core/src/main/scala/spark/util/MemoryParam.scala +++ b/core/src/main/scala/spark/util/MemoryParam.scala @@ -6,7 +6,7 @@ import spark.Utils * An extractor object for parsing JVM memory strings, such as "10g", into an Int representing * the number of megabytes. Supports the same formats as Utils.memoryStringToMb. */ -object MemoryParam { +private[spark] object MemoryParam { def unapply(str: String): Option[Int] = { try { Some(Utils.memoryStringToMb(str)) diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala index 0830843a77..b6e153b00b 100644 --- a/core/src/main/scala/spark/util/SerializableBuffer.scala +++ b/core/src/main/scala/spark/util/SerializableBuffer.scala @@ -8,7 +8,7 @@ import java.nio.channels.Channels * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make * it easier to pass ByteBuffers in case class messages. 
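IntParam and MemoryParam above are extractor objects, which is why they can sit directly inside pattern matches over raw command-line arguments. A hedged sketch of typical usage (the argument layout and messages are made up):

// Illustration only: parsing "host port memory" style arguments.
def parse(args: Array[String]): Unit = args match {
  case Array(host, IntParam(port), MemoryParam(memoryMb)) =>
    println("host=" + host + ", port=" + port + ", memory=" + memoryMb + "MB")
  case _ =>
    println("Expected: a host, an integer port, and a memory string such as 10g")
}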
*/ -class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable { +private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable { def value = buffer private def readObject(in: ObjectInputStream) { diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala index 11d7939204..101895f08e 100644 --- a/core/src/main/scala/spark/util/StatCounter.scala +++ b/core/src/main/scala/spark/util/StatCounter.scala @@ -5,7 +5,7 @@ package spark.util * numerically robust way. Includes support for merging two StatCounters. Based on Welford and * Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance. */ -class StatCounter(values: TraversableOnce[Double]) extends Serializable { +private[spark] class StatCounter(values: TraversableOnce[Double]) extends Serializable { private var n: Long = 0 // Running count of our values private var mu: Double = 0 // Running mean of our values private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2) @@ -82,7 +82,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { } } -object StatCounter { +private[spark] object StatCounter { def apply(values: TraversableOnce[Double]) = new StatCounter(values) def apply(values: Double*) = new StatCounter(values)
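StatCounter's comment points at the Welford/Chan update over the running triple (n, mu, m2). As a hedged, standalone sketch of that single-value update (a free function for illustration, not StatCounter's actual code):

// Illustration only: fold one new value x into a running count, mean, and
// sum of squared deviations (M2).
def welfordUpdate(n: Long, mu: Double, m2: Double, x: Double): (Long, Double, Double) = {
  val newN = n + 1
  val delta = x - mu
  val newMu = mu + delta / newN
  val newM2 = m2 + delta * (x - newMu)
  (newN, newMu, newM2)
}
// The running mean is mu; dividing m2 by n gives the (population) variance.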