Merge pull request #249 from andyk/move-rdds-to-their-own-package

Move RDD classes/files to their own package/directory
Matei Zaharia 2012-10-05 21:59:57 -07:00
Parent f52bc09a34 a242cdd0a6
Commit 95ef307ef5
25 changed files with 228 additions and 99 deletions
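For code outside the Spark core, the practical effect of this move is only an import change: RDD implementation classes such as UnionRDD or PipedRDD now live under `spark.rdd` instead of `spark`, while programs that stay on the public RDD API keep working unchanged. A minimal sketch (the object name and the local master URL are illustrative):

```scala
// Downstream usage sketch: implementation classes such as UnionRDD now
// resolve under spark.rdd, while code written against the public RDD API
// keeps working unchanged.
import spark.SparkContext
import spark.rdd.UnionRDD   // was: import spark.UnionRDD

object PackageMoveExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PackageMoveExample")
    val a = sc.parallelize(1 to 3)
    val b = sc.parallelize(4 to 6)
    // union() still returns RDD[Int]; the concrete class behind it is
    // now spark.rdd.UnionRDD rather than spark.UnionRDD.
    val u = a.union(b)
    println(u.collect().toList)
    sc.stop()
  }
}
```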

View file

@@ -31,6 +31,17 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
import spark.rdd.BlockRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
import spark.rdd.GlommedRDD
import spark.rdd.MappedRDD
import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithSplitRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import SparkContext._
@@ -413,67 +424,4 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
}
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)
}
private[spark]
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).flatMap(f)
}
private[spark]
class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).filter(f)
}
private[spark]
class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
}
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(prev.iterator(split))
}
/**
* A variant of the MapPartitionsRDD that passes the split index into the
* closure. This can be used to generate or collect partition specific
* information such as the number of tuples in a partition.
*/
private[spark]
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: (Int, Iterator[T]) => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(split.index, prev.iterator(split))
}
}

View file

@@ -4,12 +4,11 @@ import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.net.{URI, URLClassLoader}
import akka.actor.Actor
import akka.actor.Actor._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.generic.Growable
import akka.actor.Actor
import akka.actor.Actor._
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
@@ -27,20 +26,22 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
import spark.deploy.LocalSparkCluster
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
import spark.rdd.DoubleRDDFunctions
import spark.rdd.HadoopRDD
import spark.rdd.NewHadoopRDD
import spark.rdd.OrderedRDDFunctions
import spark.rdd.PairRDDFunctions
import spark.rdd.SequenceFileRDDFunctions
import spark.rdd.UnionRDD
import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler

View file

@@ -1,13 +1,5 @@
package spark.api.java
import spark.SparkContext.rddToPairRDDFunctions
import spark.api.java.function.{Function2 => JFunction2}
import spark.api.java.function.{Function => JFunction}
import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.storage.StorageLevel
import spark._
import java.util.{List => JList}
import java.util.Comparator
@@ -19,6 +11,17 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.function.{Function2 => JFunction2}
import spark.api.java.function.{Function => JFunction}
import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.rdd.OrderedRDDFunctions
import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {

View file

@@ -1,7 +1,13 @@
package spark
package spark.rdd
import scala.collection.mutable.HashMap
import spark.Dependency
import spark.RDD
import spark.SparkContext
import spark.SparkEnv
import spark.Split
private[spark] class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
}

View file

@@ -1,4 +1,9 @@
package spark
package spark.rdd
import spark.NarrowDependency
import spark.RDD
import spark.SparkContext
import spark.Split
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {

View file

@@ -1,11 +1,22 @@
package spark
package spark.rdd
import java.net.URL
import java.io.EOFException
import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.Aggregator
import spark.Dependency
import spark.Logging
import spark.OneToOneDependency
import spark.Partitioner
import spark.RDD
import spark.ShuffleDependency
import spark.SparkEnv
import spark.Split
private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep

View file

@@ -1,4 +1,8 @@
package spark
package spark.rdd
import spark.NarrowDependency
import spark.RDD
import spark.Split
private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split

View file

@@ -1,10 +1,13 @@
package spark
package spark.rdd
import spark.partial.BoundedDouble
import spark.partial.MeanEvaluator
import spark.partial.PartialResult
import spark.partial.SumEvaluator
import spark.Logging
import spark.RDD
import spark.TaskContext
import spark.util.StatCounter
/**

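The MeanEvaluator/SumEvaluator/StatCounter imports above back the numeric helpers that `DoubleRDDFunctions` adds to `RDD[Double]` through the implicit conversion in `SparkContext`. A rough usage sketch, assuming the usual `sum`/`mean`/`stats` methods (object name illustrative):

```scala
import spark.SparkContext
import spark.SparkContext._   // implicit RDD[Double] => DoubleRDDFunctions

object DoubleStatsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "DoubleStatsExample")
    val xs = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
    println(xs.sum())     // 10.0
    println(xs.mean())    // 2.5
    println(xs.stats())   // StatCounter with count, mean, stdev, ...
    sc.stop()
  }
}
```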
View file

@@ -0,0 +1,12 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
private[spark]
class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).filter(f)
}

View file

@@ -0,0 +1,16 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
private[spark]
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).flatMap(f)
}

View file

@@ -0,0 +1,12 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
private[spark]
class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator
}

View file

@@ -1,4 +1,4 @@
package spark
package spark.rdd
import java.io.EOFException
import java.util.NoSuchElementException
@@ -15,6 +15,12 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
import spark.Dependency
import spark.RDD
import spark.SerializableWritable
import spark.SparkContext
import spark.Split
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
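For context, `HadoopRDD` is what backs inputs read through the old Hadoop API; each `HadoopSplit` wraps one Hadoop `InputSplit`. A small sketch of how such an RDD is typically created, via `SparkContext.textFile` (object name and path are illustrative):

```scala
import spark.SparkContext

object HadoopRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopRDDExample")
    // textFile reads through TextInputFormat and is backed by a HadoopRDD;
    // each HadoopSplit wraps one Hadoop InputSplit of the input.
    val lines = sc.textFile("README.md")   // path is illustrative
    println(lines.count())
    sc.stop()
  }
}
```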

View file

@@ -0,0 +1,16 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(prev.iterator(split))
}

View file

@@ -0,0 +1,21 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
/**
* A variant of the MapPartitionsRDD that passes the split index into the
* closure. This can be used to generate or collect partition specific
* information such as the number of tuples in a partition.
*/
private[spark]
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: (Int, Iterator[T]) => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(split.index, prev.iterator(split))
}
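A short sketch of the use case the comment describes, counting tuples per partition, assuming the corresponding `RDD.mapPartitionsWithSplit` wrapper (object name illustrative):

```scala
import spark.SparkContext

object SplitCountExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "SplitCountExample")
    val data = sc.parallelize(1 to 100, 4)
    // Emit one (split index, element count) pair per partition -- the
    // "number of tuples in a partition" use case mentioned in the comment.
    val counts = data.mapPartitionsWithSplit { (idx, iter) =>
      Iterator((idx, iter.size))
    }
    counts.collect().foreach(println)
    sc.stop()
  }
}
```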

View file

@@ -0,0 +1,16 @@
package spark.rdd
import spark.OneToOneDependency
import spark.RDD
import spark.Split
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)
}

View file

@@ -1,4 +1,4 @@
package spark
package spark.rdd
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable
@@ -13,6 +13,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
import spark.Dependency
import spark.RDD
import spark.SerializableWritable
import spark.SparkContext
import spark.Split
private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {

View file

@@ -1,4 +1,4 @@
package spark
package spark.rdd
import java.io.EOFException
import java.io.ObjectInputStream
@@ -34,9 +34,20 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.TaskAttemptContext
import spark.SparkContext._
import spark.partial.BoundedDouble
import spark.partial.PartialResult
import spark.Aggregator
import spark.HashPartitioner
import spark.Logging
import spark.OneToOneDependency
import spark.Partitioner
import spark.RangePartitioner
import spark.RDD
import spark.SerializableWritable
import spark.SparkContext._
import spark.SparkException
import spark.Split
import spark.TaskContext
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.

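A minimal sketch of these extra pair functions in use; the implicit conversion comes in with `import spark.SparkContext._` (object name illustrative):

```scala
import spark.SparkContext
import spark.SparkContext._   // implicit conversion to PairRDDFunctions

object PairExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PairExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
    val summed = pairs.reduceByKey(_ + _)    // ("a", 3), ("b", 1)
    summed.collect().foreach(println)
    sc.stop()
  }
}
```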
View file

@@ -1,4 +1,4 @@
package spark
package spark.rdd
import java.io.PrintWriter
import java.util.StringTokenizer
@@ -8,6 +8,12 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import spark.OneToOneDependency
import spark.RDD
import spark.SparkEnv
import spark.Split
/**
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.

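A small sketch of the behavior described above, piping each partition through an external command (here `cat`, to keep the example portable; the object name is illustrative):

```scala
import spark.SparkContext

object PipeExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PipeExample")
    val nums = sc.parallelize(Seq("1", "2", "3", "4"), 2)
    // Each partition is written to the command's stdin one element per line;
    // the command's stdout lines come back as an RDD[String].
    val piped = nums.pipe("cat")
    println(piped.collect().toList)
    sc.stop()
  }
}
```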
View file

@@ -1,9 +1,13 @@
package spark
package spark.rdd
import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
import spark.RDD
import spark.OneToOneDependency
import spark.Split
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index

View file

@@ -1,4 +1,4 @@
package spark
package spark.rdd
import java.io.EOFException
import java.net.URL
@@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.Text
import SparkContext._
import spark.Logging
import spark.RDD
import spark.SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,

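A brief sketch of writing a SequenceFile through this conversion, assuming the usual `saveAsSequenceFile` method and the implicit Writable conversions for `Int` and `String` (object name and output path are illustrative):

```scala
import spark.SparkContext
import spark.SparkContext._   // implicit conversion to SequenceFileRDDFunctions

object SeqFileExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SeqFileExample")
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
    // Keys and values are converted to Writables (IntWritable / Text here)
    // before being written out as a Hadoop SequenceFile.
    pairs.saveAsSequenceFile("/tmp/seqfile-example")   // output path is illustrative
    sc.stop()
  }
}
```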
View file

@@ -1,8 +1,15 @@
package spark
package spark.rdd
import scala.collection.mutable.ArrayBuffer
import java.util.{HashMap => JHashMap}
import spark.Aggregator
import spark.Partitioner
import spark.RangePartitioner
import spark.RDD
import spark.ShuffleDependency
import spark.SparkEnv
import spark.Split
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx

View file

@@ -1,7 +1,13 @@
package spark
package spark.rdd
import scala.collection.mutable.ArrayBuffer
import spark.Dependency
import spark.RangeDependency
import spark.RDD
import spark.SparkContext
import spark.Split
private[spark] class UnionSplit[T: ClassManifest](
idx: Int,
rdd: RDD[T],

View file

@@ -18,6 +18,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.stop()
sc = null
}
System.clearProperty("spark.master.port")
}
test ("basic accumulation"){
@@ -91,7 +92,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val maxI = 1000
for (nThreads <- List(1, 10)) {
// test single & multi-threaded
val sc = new SparkContext("local[" + nThreads + "]", "test")
sc = new SparkContext("local[" + nThreads + "]", "test")
val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
@@ -110,6 +111,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
mapAcc.value should contain (i -> i.toString)
}
sc.stop()
sc = null
}
}
@@ -117,7 +119,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
import SetAccum._
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
val sc = new SparkContext("local[" + nThreads + "]", "test")
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
val d = sc.parallelize(groupedInts)
@@ -125,6 +127,8 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
sc.stop()
sc = null
}
}
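The test changes above converge on one teardown pattern: reuse a single mutable `sc` field, stop and null it after each test, and clear the `spark.master.port` system property so the next test's SparkContext can bind a fresh driver port. A sketch of that pattern in isolation (`ExampleSuite` is illustrative):

```scala
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import spark.SparkContext

class ExampleSuite extends FunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  after {
    if (sc != null) {
      sc.stop()
      sc = null
    }
    // Free the driver's port so the next test's SparkContext can rebind it.
    System.clearProperty("spark.master.port")
  }

  test("fresh context per test") {
    sc = new SparkContext("local", "test")
    assert(sc.parallelize(1 to 4).count() === 4)
  }
}
```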

View file

@@ -3,6 +3,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark.rdd.CoalescedRDD
import SparkContext._
class RDDSuite extends FunSuite with BeforeAndAfter {

View file

@@ -1,5 +1,7 @@
package spark
import scala.collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
@@ -10,8 +12,7 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
import scala.collection.mutable.ArrayBuffer
import spark.rdd.ShuffledAggregatedRDD
import SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {