Matei Zaharia 2011-02-27 14:27:12 -08:00
Parent 2e6023f2bf
Commit f38f86d59e
11 changed files with 278 additions and 298 deletions

View file

@@ -1,10 +1,7 @@
package spark
import java.util.concurrent._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.Map
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
/**
* A Scheduler subclass that implements stage-oriented scheduling. It computes
@@ -56,17 +53,15 @@ private abstract class DAGScheduler extends Scheduler with Logging {
shuffleToMapStage.get(shuf) match {
case Some(stage) => stage
case None =>
val stage = newStage(
true, shuf.rdd, shuf.spec.partitioner.numPartitions)
val stage = newStage(shuf.rdd, Some(shuf))
shuffleToMapStage(shuf) = stage
stage
}
}
def newStage(isShuffleMap: Boolean, rdd: RDD[_], numPartitions: Int): Stage = {
def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
val id = newStageId()
val parents = getParentStages(rdd)
val stage = new Stage(id, isShuffleMap, rdd, parents, numPartitions)
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
idToStage(id) = stage
stage
}
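For orientation, a hedged sketch (drawn from call sites elsewhere in this diff, not new code in the commit) of how the reworked newStage signature is used; the old isShuffleMap and numPartitions arguments are now derived inside Stage (see Stage.scala below):
// Illustrative only:
//   isShuffleMap  = shuffleDep != None   (Stage.scala)
//   numPartitions = rdd.splits.size      (Stage.scala)
val mapStage    = newStage(shuf.rdd, Some(shuf))  // shuffle-map stage, keyed by its ShuffleDependency
val resultStage = newStage(rdd, None)             // final stage that computes the job's results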
@@ -121,7 +116,7 @@ private abstract class DAGScheduler extends Scheduler with Logging {
override def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)(implicit m: ClassManifest[U])
: Array[U] = {
val numOutputParts: Int = rdd.splits.size
val finalStage = newStage(false, rdd, numOutputParts)
val finalStage = newStage(rdd, None)
val results = new Array[U](numOutputParts)
val finished = new Array[Boolean](numOutputParts)
var numFinished = 0
@@ -130,6 +125,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
val running = new HashSet[Stage]
val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
logInfo("Final stage: " + finalStage)
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
def submitStage(stage: Stage) {
if (!waiting(stage) && !running(stage)) {
val missing = getMissingParentStages(stage)
@@ -146,13 +145,20 @@ private abstract class DAGScheduler extends Scheduler with Logging {
}
def submitMissingTasks(stage: Stage) {
var tasks: List[Task[_]] = Nil
val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
var tasks = ArrayBuffer[Task[_]]()
if (stage == finalStage) {
for (p <- 0 until numOutputParts if (!finished(p))) {
val locs = getPreferredLocs(rdd, p)
tasks = new ResultTask(rdd, func, p, locs) :: tasks
tasks += new ResultTask(finalStage.id, rdd, func, p, locs)
}
} else {
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
}
myPending ++= tasks
submitTasks(tasks)
}
@@ -161,13 +167,35 @@ private abstract class DAGScheduler extends Scheduler with Logging {
while (numFinished != numOutputParts) {
val evt = completionEvents.take()
if (evt.successful) {
logInfo("Completed " + evt.task)
Accumulators.add(currentThread, evt.accumUpdates)
evt.task match {
case rt: ResultTask[_, _] =>
results(rt.partition) = evt.result.asInstanceOf[U]
finished(rt.partition) = true
numFinished += 1
// case smt: ShuffleMapTask
pendingTasks(finalStage) -= rt
case smt: ShuffleMapTask =>
val stage = idToStage(smt.stageId)
stage.addOutputLoc(smt.partition, evt.result.asInstanceOf[String])
val pending = pendingTasks(stage)
pending -= smt
MapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(_.first).toArray)
if (pending.isEmpty) {
logInfo(stage + " finished; looking for newly runnable stages")
running -= stage
val newlyRunnable = new ArrayBuffer[Stage]
for (stage <- waiting if getMissingParentStages(stage) == Nil) {
newlyRunnable += stage
}
waiting --= newlyRunnable
running ++= newlyRunnable
for (stage <- newlyRunnable) {
submitMissingTasks(stage)
}
}
}
} else {
throw new SparkException("Task failed: " + evt.task)
@@ -199,53 +227,10 @@ private abstract class DAGScheduler extends Scheduler with Logging {
if (locs != Nil)
return locs;
}
case _ =>
})
return Nil
}
}
case class CompletionEvent(task: Task[_], successful: Boolean, result: Any, accumUpdates: Map[Long, Any])
class Stage(val id: Int, val isShuffleMap: Boolean, val rdd: RDD[_], val parents: List[Stage], val numPartitions: Int) {
val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
var numAvailableOutputs = 0
def isAvailable: Boolean = {
if (parents.size == 0 && !isShuffleMap)
true
else
numAvailableOutputs == numPartitions
}
def addOutputLoc(partition: Int, host: String) {
val prevList = outputLocs(partition)
outputLocs(partition) = host :: prevList
if (prevList == Nil)
numAvailableOutputs += 1
}
def removeOutputLoc(partition: Int, host: String) {
val prevList = outputLocs(partition)
val newList = prevList - host
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil)
numAvailableOutputs -= 1
}
override def toString = "Stage " + id
override def hashCode(): Int = id
}
class ResultTask[T, U](rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
extends Task[U] {
val split = rdd.splits(partition)
override def run: U = {
func(rdd.iterator(split))
}
override def preferredLocations: Seq[String] = locs
override def toString = "ResultTask " + partition
}

View file

@@ -1,120 +0,0 @@
package spark
import java.io.{EOFException, ObjectInputStream, ObjectOutputStream}
import java.net.URI
import java.util.UUID
import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
/**
* A simple implementation of shuffle using a distributed file system.
*
* TODO: Add support for compression when spark.compress is set to true.
*/
@serializable
class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
override def compute(input: RDD[(K, V)],
numOutputSplits: Int,
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)
: RDD[(K, C)] =
{
val sc = input.sparkContext
val dir = DfsShuffle.newTempDirectory()
logInfo("Intermediate data directory: " + dir)
val numberedSplitRdd = new NumberedSplitRDD(input)
val numInputSplits = numberedSplitRdd.splits.size
// Run a parallel foreach to write the intermediate data files
numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
val myIndex = pair._1
val myIterator = pair._2
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
for ((k, v) <- myIterator) {
var bucketId = k.hashCode % numOutputSplits
if (bucketId < 0) { // Fix bucket ID if hash code was negative
bucketId += numOutputSplits
}
val bucket = buckets(bucketId)
bucket(k) = bucket.get(k) match {
case Some(c) => mergeValue(c, v)
case None => createCombiner(v)
}
}
val fs = DfsShuffle.getFileSystem()
for (i <- 0 until numOutputSplits) {
val path = new Path(dir, "%d-to-%d".format(myIndex, i))
val out = new ObjectOutputStream(fs.create(path, true))
buckets(i).foreach(pair => out.writeObject(pair))
out.close()
}
})
// Return an RDD that does each of the merges for a given partition
val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
return indexes.flatMap((myIndex: Int) => {
val combiners = new HashMap[K, C]
val fs = DfsShuffle.getFileSystem()
for (i <- Utils.shuffle(0 until numInputSplits)) {
val path = new Path(dir, "%d-to-%d".format(i, myIndex))
val inputStream = new ObjectInputStream(fs.open(path))
try {
while (true) {
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
combiners(k) = combiners.get(k) match {
case Some(oldC) => mergeCombiners(oldC, c)
case None => c
}
}
} catch {
case e: EOFException => {}
}
inputStream.close()
}
combiners
})
}
}
/**
* Companion object of DfsShuffle; responsible for initializing a Hadoop
* FileSystem object based on the spark.dfs property and generating names
* for temporary directories.
*/
object DfsShuffle {
private var initialized = false
private var fileSystem: FileSystem = null
private def initializeIfNeeded() = synchronized {
if (!initialized) {
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val dfs = System.getProperty("spark.dfs", "file:///")
val conf = new Configuration()
conf.setInt("io.file.buffer.size", bufferSize)
conf.setInt("dfs.replication", 1)
fileSystem = FileSystem.get(new URI(dfs), conf)
initialized = true
}
}
def getFileSystem(): FileSystem = {
initializeIfNeeded()
return fileSystem
}
def newTempDirectory(): String = {
val fs = getFileSystem()
val workDir = System.getProperty("spark.dfs.workdir", "/tmp")
val uuid = UUID.randomUUID()
val path = workDir + "/shuffle-" + uuid
fs.mkdirs(new Path(path))
return path
}
}

View file

@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
*
* TODO: Add support for compression when spark.compress is set to true.
*/
/*
@serializable
class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
override def compute(input: RDD[(K, V)],
@@ -90,7 +91,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
})
}
}
*/
object LocalFileShuffle extends Logging {
private var initialized = false

View file

@@ -25,12 +25,12 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
Accumulators.clear
val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
val task = Utils.deserialize[Task[_]](
val deserializedTask = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader)
val result: Any = task.run
val result: Any = deserializedTask.run
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)
taskEnded(task, true, result, accumUpdates)
taskEnded(tasks(i), true, result, accumUpdates)
} catch {
case e: Exception => {
// TODO: Do something nicer here

View file

@@ -0,0 +1,29 @@
package spark
import java.util.concurrent.ConcurrentHashMap
object MapOutputTracker {
private val serverUris = new ConcurrentHashMap[Int, Array[String]]
def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) {
var array = serverUris.get(shuffleId)
if (array == null) {
array = Array.fill[String](numMaps)(null)
serverUris.put(shuffleId, array)
}
array(mapId) = serverUri
}
def registerMapOutputs(shuffleId: Int, locs: Array[String]) {
serverUris.put(shuffleId, Array[String]() ++ locs)
}
def getServerUris(shuffleId: Int): Array[String] = {
// TODO: On remote node, fetch locations from master
serverUris.get(shuffleId)
}
def getMapOutputUri(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int): String = {
"%s/shuffle/%s/%s/%s".format(serverUri, shuffleId, mapId, reduceId)
}
}
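A minimal usage sketch with hypothetical host URIs (not part of the commit), showing how registered map outputs become fetch URLs via getMapOutputUri:
// Hypothetical: shuffle 0 with two map tasks on two servers.
MapOutputTracker.registerMapOutput(0, 2, 0, "http://host1:8080")  // shuffleId, numMaps, mapId, serverUri
MapOutputTracker.registerMapOutput(0, 2, 1, "http://host2:8080")
val uris = MapOutputTracker.getServerUris(0)
MapOutputTracker.getMapOutputUri(uris(1), 0, 1, 3)
// => "http://host2:8080/shuffle/0/1/3"  (serverUri, shuffleId, mapId, reduceId)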

View file

@@ -1,42 +0,0 @@
package spark
import mesos.SlaveOffer
/**
* An RDD that takes the splits of a parent RDD and gives them unique indexes.
* This is useful for a variety of shuffle implementations.
*/
class NumberedSplitRDD[T: ClassManifest](prev: RDD[T])
extends RDD[(Int, Iterator[T])](prev.sparkContext) {
@transient val splits_ = {
prev.splits.zipWithIndex.map {
case (s, i) => new NumberedSplitRDDSplit(s, i): Split
}.toArray
}
override def splits = splits_
override def preferredLocations(split: Split) = {
val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
prev.preferredLocations(nsplit.prev)
}
override def iterator(split: Split) = {
val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
Iterator((nsplit.index, prev.iterator(nsplit.prev)))
}
override def taskStarted(split: Split, slot: SlaveOffer) = {
val nsplit = split.asInstanceOf[NumberedSplitRDDSplit]
prev.taskStarted(nsplit.prev, slot)
}
}
/**
* A split in a NumberedSplitRDD.
*/
class NumberedSplitRDDSplit(val prev: Split, val index: Int) extends Split {
override def getId() = "NumberedSplitRDDSplit(%d)".format(index)
}

View file

@@ -1,5 +1,8 @@
package spark
import java.io.EOFException
import java.net.URL
import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.HashSet
import java.util.Random
@@ -25,16 +28,17 @@ class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
}
class ShuffleDependency[K, V, C](
val shuffleId: Int,
rdd: RDD[(K, V)],
val spec: ShuffleSpec[K, V, C]
val aggregator: Aggregator[K, V, C],
val partitioner: Partitioner[K]
) extends Dependency(rdd, true)
@serializable
class ShuffleSpec[K, V, C] (
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C,
val partitioner: Partitioner[K]
val mergeCombiners: (C, C) => C
)
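As a hedged example (not part of the commit), an Aggregator that keeps a running count per key, i.e. the combiner functions for a word count:
// Hypothetical: V = Int (a 1 per occurrence), C = Int (running count).
val countAggregator = new Aggregator[String, Int, Int](
  (v: Int) => v,                   // createCombiner: first value starts the count
  (c: Int, v: Int) => c + v,       // mergeValue: fold another value into the count
  (c1: Int, c2: Int) => c1 + c2)   // mergeCombiners: merge partial counts from two map tasks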
@serializable
@@ -43,6 +47,15 @@ abstract class Partitioner[K] {
def getPartition(key: K): Int
}
class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
def numPartitions = partitions
def getPartition(key: K) = {
val mod = key.hashCode % partitions
if (mod < 0) mod + partitions else mod // Careful of negative hash codes
}
}
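A quick worked example of the negative-hash-code guard, assuming ordinary Java/Scala % semantics:
// Illustrative: with 4 partitions, a key whose hashCode is -7 gives
//   -7 % 4 == -3, so getPartition returns -3 + 4 == 1 (always within [0, 4)).
val partitioner = new HashPartitioner[String](4)
partitioner.getPartition("spark")  // "spark".hashCode % 4, shifted up only if negative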
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def splits: Array[Split]
@@ -52,25 +65,27 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
val dependencies: List[Dependency[_]] = Nil
val partitioner: Option[Partitioner[_]] = None
def taskStarted(split: Split, slot: SlaveOffer) {}
def sparkContext = sc
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
/*
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
def cache() = new CachedRDD(this)
def sample(withReplacement: Boolean, frac: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, frac, seed)
def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
sc.runTaskObjects(tasks)
}
*/
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
@@ -97,6 +112,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
return results.reduceLeft(f)
}
/*
def take(num: Int): Array[T] = {
if (num == 0)
return new Array[T](0)
@@ -113,6 +129,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
*/
def count(): Long = {
try {
@@ -126,10 +143,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def ++(other: RDD[T]): RDD[T] = this.union(other)
def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
//def splitRdd(): RDD[Array[T]] = new SplitRDD(this)
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
new CartesianRDD(sc, this, other)
//def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] =
// new CartesianRDD(sc, this, other)
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
this.map(t => (func(t), t)).groupByKey(numSplits)
@@ -138,62 +155,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
groupBy[K](func, sc.numCores)
}
@serializable
abstract class RDDTask[U: ClassManifest, T: ClassManifest](
val rdd: RDD[T], val split: Split)
extends Task[U] {
override def preferredLocations() = rdd.preferredLocations(split)
override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) }
}
class ForeachTask[T: ClassManifest](
rdd: RDD[T], split: Split, func: T => Unit)
extends RDDTask[Unit, T](rdd, split) with Logging {
override def run() {
logInfo("Processing " + split)
rdd.iterator(split).foreach(func)
}
}
class CollectTask[T](
rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
extends RDDTask[Array[T], T](rdd, split) with Logging {
override def run(): Array[T] = {
logInfo("Processing " + split)
rdd.iterator(split).toArray(m)
}
}
class ReduceTask[T: ClassManifest](
rdd: RDD[T], split: Split, f: (T, T) => T)
extends RDDTask[Option[T], T](rdd, split) with Logging {
override def run(): Option[T] = {
logInfo("Processing " + split)
val iter = rdd.iterator(split)
if (iter.hasNext)
Some(iter.reduceLeft(f))
else
None
}
}
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U)
extends RDD[U](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = prev.iterator(split).map(f)
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
override val dependencies = List(new OneToOneDependency(prev))
}
class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = prev.iterator(split).filter(f)
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
override def iterator(split: Split) = prev.iterator(split).map(f)
}
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
@@ -201,9 +169,17 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
extends RDD[U](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) =
prev.iterator(split).toStream.flatMap(f).iterator
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
override val dependencies = List(new OneToOneDependency(prev))
override def iterator(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}
/*
class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = prev.iterator(split).filter(f)
}
class SplitRDD[T: ClassManifest](prev: RDD[T])
@@ -211,7 +187,6 @@ extends RDD[Array[T]](prev.sparkContext) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
@@ -245,8 +220,6 @@ extends RDD[T](prev.sparkContext) {
prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac))
}
}
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot)
}
@@ -316,13 +289,14 @@ private object CachedRDD {
// Remembers which splits are currently being loaded (on workers)
val loading = new HashSet[String]
}
*/
@serializable
class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split)
class UnionSplit[T: ClassManifest](rdd: RDD[T], index: Int, split: Split)
extends Split {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override def getId() = "UnionSplit(" + split.getId() + ")"
override def getId() = "UnionSplit(" + index + ", " + split.getId() + ")"
}
@serializable
@@ -330,8 +304,8 @@ class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
extends RDD[T](sc) {
@transient val splits_ : Array[Split] = {
val splits: Seq[Split] =
for (rdd <- rdds; split <- rdd.splits)
yield new UnionSplit(rdd, split)
for ((rdd, index) <- rdds.zipWithIndex; split <- rdd.splits)
yield new UnionSplit(rdd, index, split)
splits.toArray
}
@@ -344,6 +318,7 @@ extends RDD[T](sc) {
s.asInstanceOf[UnionSplit[T]].preferredLocations()
}
/*
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
override def getId() =
"CartesianSplit(" + s1.getId() + ", " + s2.getId() + ")"
@@ -376,6 +351,58 @@ extends RDD[Pair[T, U]](sc) {
rdd2.taskStarted(currSplit.s2, slot)
}
}
*/
class ShuffledRDDSplit(val id: Int) extends Split {
override def getId() = "ShuffleRDDSplit(" + id + ")"
}
class ShuffledRDD[K, V, C](
parent: RDD[(K, V)],
aggregator: Aggregator[K, V, C],
partitioner: Partitioner[K])
extends RDD[(K, C)](parent.sparkContext) {
@transient val splits_ =
Array.tabulate[Split](partitioner.numPartitions)(i => new ShuffledRDDSplit(i))
val dep = new ShuffleDependency(sparkContext.newShuffleId, parent, aggregator, partitioner)
override def splits = splits_
override def preferredLocations(split: Split) = Nil
override def iterator(split: Split): Iterator[(K, C)] = {
val shuffleId = dep.shuffleId
val splitId = split.asInstanceOf[ShuffledRDDSplit].id
val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
val serverUris = MapOutputTracker.getServerUris(shuffleId)
for ((serverUri, index) <- serverUris.zipWithIndex) {
splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += index
}
val combiners = new HashMap[K, C]
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
for (i <- inputIds) {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, splitId)
val inputStream = new ObjectInputStream(new URL(url).openStream())
try {
while (true) {
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
combiners(k) = combiners.get(k) match {
case Some(oldC) => aggregator.mergeCombiners(oldC, c)
case None => c
}
}
} catch {
case e: EOFException => {}
}
inputStream.close()
}
}
combiners.iterator
}
override val dependencies = List(dep)
}
@serializable class PairRDDExtras[K, V](self: RDD[(K, V)]) {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
@@ -397,10 +424,16 @@ extends RDD[Pair[T, U]](sc) {
numSplits: Int)
: RDD[(K, C)] =
{
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
val partitioner = new HashPartitioner[K](numSplits)
new ShuffledRDD(self, aggregator, partitioner)
// TODO
/*
val shufClass = Class.forName(System.getProperty(
"spark.shuffle.class", "spark.LocalFileShuffle"))
val shuf = shufClass.newInstance().asInstanceOf[Shuffle[K, V, C]]
shuf.compute(self, numSplits, createCombiner, mergeValue, mergeCombiners)
*/
}
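A hedged usage sketch of the new path (not part of the commit): it assumes combineByKey's full parameter list is (createCombiner, mergeValue, mergeCombiners, numSplits), matching the names used in its body above, and an implicit conversion from RDD[(K, V)] to PairRDDExtras:
// Hypothetical word count routed through Aggregator + HashPartitioner + ShuffledRDD;
// words is an assumed RDD[String].
val counts = words.map(w => (w, 1)).combineByKey(
  (v: Int) => v,
  (c: Int, v: Int) => c + v,
  (c1: Int, c2: Int) => c1 + c2,
  4)  // numSplits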
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {

View file

@@ -0,0 +1,14 @@
package spark
class ResultTask[T, U](val stageId: Int, rdd: RDD[T], func: Iterator[T] => U, val partition: Int, locs: Seq[String])
extends Task[U] {
val split = rdd.splits(partition)
override def run: U = {
func(rdd.iterator(split))
}
override def preferredLocations: Seq[String] = locs
override def toString = "ResultTask(" + stageId + ", " + partition + ")"
}

View file

@@ -0,0 +1,38 @@
package spark
import java.io.FileOutputStream
import java.io.ObjectOutputStream
import scala.collection.mutable.HashMap
class ShuffleMapTask(val stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String])
extends Task[String] {
val split = rdd.splits(partition)
override def run: String = {
val numOutputSplits = dep.partitioner.numPartitions
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
for (elem <- rdd.iterator(split)) {
val (k, v) = elem.asInstanceOf[(Any, Any)]
var bucketId = partitioner.getPartition(k)
val bucket = buckets(bucketId)
bucket(k) = bucket.get(k) match {
case Some(c) => aggregator.mergeValue(c, v)
case None => aggregator.createCombiner(v)
}
}
for (i <- 0 until numOutputSplits) {
val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i)
val out = new ObjectOutputStream(new FileOutputStream(file))
buckets(i).foreach(pair => out.writeObject(pair))
out.close()
}
return LocalFileShuffle.getServerUri
}
override def preferredLocations: Seq[String] = locs
override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}

View file

@@ -89,8 +89,8 @@ extends Logging {
}
/** Build the union of a list of RDDs. */
def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
new UnionRDD(this, rdds)
//def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
// new UnionRDD(this, rdds)
// Methods for creating shared variables
@@ -157,6 +157,14 @@ extends Logging {
// Get the number of cores available to run tasks (as reported by Scheduler)
def numCores = scheduler.numCores
private var nextShuffleId: Int = 0
private[spark] def newShuffleId(): Int = {
val id = nextShuffleId
nextShuffleId += 1
id
}
}

View file

@@ -0,0 +1,34 @@
package spark
class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) {
val isShuffleMap = shuffleDep != None
val numPartitions = rdd.splits.size
val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
var numAvailableOutputs = 0
def isAvailable: Boolean = {
if (parents.size == 0 && !isShuffleMap)
true
else
numAvailableOutputs == numPartitions
}
def addOutputLoc(partition: Int, host: String) {
val prevList = outputLocs(partition)
outputLocs(partition) = host :: prevList
if (prevList == Nil)
numAvailableOutputs += 1
}
def removeOutputLoc(partition: Int, host: String) {
val prevList = outputLocs(partition)
val newList = prevList - host
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil)
numAvailableOutputs -= 1
}
override def toString = "Stage " + id
override def hashCode(): Int = id
}
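A brief hedged walk-through of the availability bookkeeping, with hypothetical arguments: a shuffle-map stage only becomes available once every partition has at least one registered output location.
// Hypothetical: someRdd has 2 splits, so the stage has 2 partitions.
val stage = new Stage(0, someRdd, Some(someShuffleDep), Nil)
stage.addOutputLoc(0, "http://host1:8080")
stage.isAvailable  // false: partition 1 still has no output location
stage.addOutputLoc(1, "http://host2:8080")
stage.isAvailable  // true: numAvailableOutputs == numPartitions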