[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc

VertexRDD.apply had a bug where it ignored the merge function for
duplicate vertices and instead used whichever vertex attribute occurred
first. This commit fixes the bug by passing the merge function through
to ShippableVertexPartition.apply, which merges any duplicates using the
merge function and then fills in missing vertices using the specified
default vertex attribute. This commit also adds a unit test for
VertexRDD.apply.
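
The fixed semantics can be pictured with a short, hedged sketch (not part of the patch). It assumes a live SparkContext `sc` (e.g. inside the suite's `withSpark` helper), the GraphX API as of this commit (`EdgeRDD.fromEdges`, the four-argument `VertexRDD.apply`), and made-up vertex values: duplicates are combined with the merge function, while a vertex mentioned only by an edge receives the default value without the merge function ever touching it.

import org.apache.spark.graphx._

val vertices = sc.parallelize(Seq((1L, 10), (1L, 20), (2L, 5)))      // vertex 1 is duplicated
val edges = EdgeRDD.fromEdges(sc.parallelize(Seq(Edge(1L, 3L, 0))))  // vertex 3 has no attribute
// Duplicates are merged with a + b; vertex 3 (known only from the edge) gets the
// default value -1, and the merge function is not applied to that default.
val verts = VertexRDD(vertices, edges, -1, (a: Int, b: Int) => a + b)
assert(verts.collect.toSet == Set((1L, 30), (2L, 5), (3L, -1)))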

Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Author: Blie Arkansol <xiaodi@sjtu.edu.cn>
Author: Ankur Dave <ankurdave@gmail.com>

Closes #1903 from larryxiao/2062 and squashes the following commits:

625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
Larry Xiao 2014-09-18 23:32:32 -07:00, committed by Ankur Dave
Parent e76ef5cb8e
Commit 3bbbdd8180
3 changed files with 36 additions and 7 deletions

View file: VertexRDD.scala

@@ -392,7 +392,7 @@ object VertexRDD {
   */
  def apply[VD: ClassTag](
      vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
-    VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+    VertexRDD(vertices, edges, defaultVal, (a, b) => a)
  }

  /**
@@ -419,7 +419,7 @@ object VertexRDD {
      (vertexIter, routingTableIter) =>
        val routingTable =
          if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
-        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
+        Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
    }
    new VertexRDD(vertexPartitions)
  }

View file: ShippableVertexPartition.scala

@@ -36,7 +36,7 @@ private[graphx]
object ShippableVertexPartition {
  /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
  def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
-    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+    apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)

  /**
   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
@@ -44,10 +44,28 @@ object ShippableVertexPartition {
   */
  def apply[VD: ClassTag](
      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
-    : ShippableVertexPartition[VD] = {
-    val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
-    val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
-    new ShippableVertexPartition(index, values, mask, routingTable)
+    : ShippableVertexPartition[VD] =
+    apply(iter, routingTable, defaultVal, (a, b) => a)
+
+  /**
+   * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+   * table, filling in missing vertices mentioned in the routing table using `defaultVal`,
+   * and merging duplicate vertex attributes with `mergeFunc`.
+   */
+  def apply[VD: ClassTag](
+      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
+      mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+    // Merge the given vertices using mergeFunc
+    iter.foreach { pair =>
+      map.setMerge(pair._1, pair._2, mergeFunc)
+    }
+    // Fill in missing vertices mentioned in the routing table
+    routingTable.iterator.foreach { vid =>
+      map.changeValue(vid, defaultVal, identity)
+    }
+    new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
  }

  import scala.language.implicitConversions
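
The merge-then-fill behaviour of the new overload can be sketched with plain Scala collections. This is a hedged illustration only: `mergeThenFill` is a hypothetical name, a mutable.Map stands in for GraphX's GraphXPrimitiveKeyOpenHashMap, and plain Long ids stand in for VertexId.

import scala.collection.mutable

def mergeThenFill[VD](
    vertices: Iterator[(Long, VD)],
    routingTableIds: Iterator[Long],
    defaultVal: VD,
    mergeFunc: (VD, VD) => VD): Map[Long, VD] = {
  val map = mutable.Map.empty[Long, VD]
  // Merge duplicate vertex attributes with mergeFunc (mirrors map.setMerge above).
  vertices.foreach { case (vid, attr) =>
    map(vid) = map.get(vid).map(mergeFunc(_, attr)).getOrElse(attr)
  }
  // Fill in ids that appear only in the routing table with defaultVal, leaving existing
  // entries untouched (mirrors map.changeValue(vid, defaultVal, identity) above).
  routingTableIds.foreach { vid =>
    if (!map.contains(vid)) map(vid) = defaultVal
  }
  map.toMap
}

// For example: mergeThenFill(Iterator((1L, 1), (1L, 2)), Iterator(1L, 3L), 0, (a: Int, b: Int) => a + b)
// returns Map(1L -> 3, 3L -> 0): id 1 is merged, id 3 is filled with the default.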

View file: VertexRDDSuite.scala

@@ -99,4 +99,15 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
    }
  }

+  test("mergeFunc") {
+    // test to see if the mergeFunc is working correctly
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
+      // test merge function
+      assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
+    }
+  }
+
}
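
The expected set in this test is simply the input pairs reduced per vertex id with the merge function a + b. A quick plain-Scala check of the arithmetic (no Spark needed; names are illustrative):

val input = List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3))
val merged = input.groupBy(_._1).mapValues(_.map(_._2).reduce(_ + _)).toSet
assert(merged == Set((0L, 0), (1L, 3), (2L, 9)))  // 0; 1 + 2 = 3; 3 + 3 + 3 = 9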