Matei Zaharia 2011-03-06 23:38:16 -08:00
Родитель 04c2d6a60c
Коммит bce95b8458
6 изменённых файлов: 56 добавлений и 21 удалений

@ -19,23 +19,25 @@ extends Split {
class CoGroupAggregator[K] extends Aggregator[K, Any, ArrayBuffer[Any]] (
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 }
class CoGroupedRDD[K](rdds: Seq[RDD[(K, _)]], part: Partitioner[K])
extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
val aggr = new CoGroupAggregator[K]
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.first.context) with Logging {
val aggr = new CoGroupAggregator
override val dependencies = {
val deps = new ArrayBuffer[Dependency[_]]
for ((rdd, index) <- rdds.zipWithIndex) {
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
deps += new OneToOneDependency(rdd)
} else {
deps += new ShuffleDependency[K, Any, ArrayBuffer[Any]](
logInfo("Adding shuffle dependency with " + rdd)
deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
context.newShuffleId, rdd, aggr, part)
@ -60,7 +62,7 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
override def splits = splits_
override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
override val partitioner = Some(part)
override def preferredLocations(s: Split) = Nil
@ -70,15 +72,16 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
map.getOrElseUpdate(k, Array.fill(rdds.size)(new ArrayBuffer[Any]))
for ((dep, index) <- split.deps.zipWithIndex) dep match {
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplit) => {
// Read them from the parent
for ((k: K, v) <- rdd.iterator(itsSplit)) {
getSeq(k)(index) += v
getSeq(k)(depNum) += v
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
logInfo("Grabbing map outputs for shuffle ID " + shuffleId)
val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
val serverUris = MapOutputTracker.getServerUris(shuffleId)
for ((serverUri, index) <- serverUris.zipWithIndex) {
@ -86,14 +89,15 @@ extends RDD[(K, Seq[Seq[_]])](rdds.first.context) {
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
for (i <- inputIds) {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, index)
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, split.index)
val inputStream = new ObjectInputStream(new URL(url).openStream())
logInfo("Opened stream to " + url)
try {
while (true) {
val (k, vs) = inputStream.readObject().asInstanceOf[(K, Seq[Any])]
val mySeq = getSeq(k)
for (v <- vs)
mySeq(index) += v
mySeq(depNum) += v
} catch {
case e: EOFException => {}

@ -12,7 +12,7 @@ class ShuffleDependency[K, V, C](
val shuffleId: Int,
rdd: RDD[(K, V)],
val aggregator: Aggregator[K, V, C],
val partitioner: Partitioner[K]
val partitioner: Partitioner
) extends Dependency(rdd, true)
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

@ -1,21 +1,21 @@
package spark
abstract class Partitioner[K] {
abstract class Partitioner {
def numPartitions: Int
def getPartition(key: K): Int
def getPartition(key: Any): Int
class HashPartitioner[K](partitions: Int) extends Partitioner[K] {
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
def getPartition(key: K) = {
def getPartition(key: Any) = {
val mod = key.hashCode % partitions
if (mod < 0) mod + partitions else mod // Guard against negative hash codes
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner[_] =>
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ => false

@ -24,7 +24,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
val dependencies: List[Dependency[_]]
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner[_]] = None
val partitioner: Option[Partitioner] = None
def context = sc
@ -111,6 +111,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def toArray(): Array[T] = collect()
override def toString(): String = {
"%s(%d)".format(getClass.getSimpleName, id)
// TODO: Reimplement these to properly build any shuffle dependencies on
// the cluster rather than attempting to compute a partiton on the master
@ -191,7 +195,7 @@ extends RDD[Array[T]](prev.context) {
: RDD[(K, C)] =
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
val partitioner = new HashPartitioner[K](numSplits)
val partitioner = new HashPartitioner(numSplits)
new ShuffledRDD(self, aggregator, partitioner)
@ -253,6 +257,33 @@ extends RDD[Array[T]](prev.context) {
new MappedValuesRDD(self, cleanF)
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
val part = self.partitioner match {
case Some(p) => p
case None => new HashPartitioner(numCores)
new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part).map {
case (k, Seq(vs, ws)) =>
(k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
val part = self.partitioner match {
case Some(p) => p
case None => new HashPartitioner(numCores)
new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
other1.asInstanceOf[RDD[(_, _)]],
other2.asInstanceOf[RDD[(_, _)]]),
part).map {
case (k, Seq(vs, w1s, w2s)) =>
(k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
if (self.partitioner != None) {

@ -12,7 +12,7 @@ extends Task[String] {
override def run: String = {
val numOutputSplits = dep.partitioner.numPartitions
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
val partitioner = dep.partitioner.asInstanceOf[Partitioner[Any]]
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
for (elem <- rdd.iterator(split)) {
val (k, v) = elem.asInstanceOf[(Any, Any)]

@ -14,10 +14,10 @@ class ShuffledRDDSplit(val idx: Int) extends Split {
class ShuffledRDD[K, V, C](
parent: RDD[(K, V)],
aggregator: Aggregator[K, V, C],
part : Partitioner[K])
part : Partitioner)
extends RDD[(K, C)](parent.context) {
//override val partitioner = Some(part)
override val partitioner: Option[Partitioner[_]] = Some(part.asInstanceOf[Partitioner[_]])
override val partitioner = Some(part)
@transient val splits_ =
Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))