Corrected the logic with ShortestPaths so that the calculation will run forward rather than backwards. Output before looked like:
```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))
```
And new output after the changes looks like:
```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (2,Map()), (3,Map()))
```
Author: Brennon York <brennon.york@capitalone.com>
Closes#4478 from brennonyork/SPARK-5343 and squashes the following commits:
aa57f83 [Brennon York] updated to set ShortestPaths to run 'forward' rather than 'backward'
When I build a graph with a file format error, there will be an ArrayIndexOutOfBoundsException
Author: Leolh <leosandylh@gmail.com>
Closes#4176 from Leolh/patch-1 and squashes the following commits:
94f6d22 [Leolh] Update GraphLoader.scala
23767f1 [Leolh] [SPARK-3650][GraphX] There will be an ArrayIndexOutOfBoundsException if the format of the source file is wrong
Try to redesign the "primitive type => Writable" implicit APIs to make them be activated automatically and without breaking binary compatibility.
However, this PR will breaking the source compatibility if people use `xxxToXxxWritable` occasionally. See the unit test in `graphx`.
Author: zsxwing <zsxwing@gmail.com>
Closes#3642 from zsxwing/SPARK-4795 and squashes the following commits:
914b2d6 [zsxwing] Add implicit back to the Writables methods
0b9017f [zsxwing] Add some docs
a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795
39343de [zsxwing] Fix the unit test
64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext
This fixes getStorageLevel for EdgeRDDImpl and VertexRDDImpl (and therefore for Graph).
See code example on JIRA which failed before but works with this patch: [https://issues.apache.org/jira/browse/SPARK-5534]
(The added unit tests also failed before but work with this fix.)
Note: I used partitionsRDD, assuming that getStorageLevel will only be called on the driver.
CC: mengxr (related to LDA PR), rxin ankurdave Thanks in advance!
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#4317 from jkbradley/graphx-storagelevel and squashes the following commits:
1c21e49 [Joseph K. Bradley] made graph getStorageLevel test more robust
18d64ca [Joseph K. Bradley] Added tests for getStorageLevel in VertexRDDSuite, EdgeRDDSuite, GraphSuite
17b488b [Joseph K. Bradley] overrode getStorageLevel in Vertex/EdgeRDDImpl to use partitionsRDD
Added the 2 methods to Graph and GraphImpl. Both make calls to the underlying vertex and edge RDDs.
This is needed for another PR (for LDA): [https://github.com/apache/spark/pull/4047]
Notes:
* getCheckpointedFiles is plural and returns a Seq[String] instead of an Option[String].
* I attempted to test to make sure the methods returned the correct values after checkpointing. It did not work; I guess that checkpointing does not occur quickly enough? I noticed that there are not checkpointing tests for RDDs; is it just hard to test well?
CC: rxin
CC: mengxr (since related to LDA)
Author: Joseph K. Bradley <joseph@databricks.com>
Closes#4253 from jkbradley/graphx-checkpoint and squashes the following commits:
b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile
250810e [Joseph K. Bradley] In EdgeRDDImple, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD
695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient
cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work.
188665f [Joseph K. Bradley] improved documentation
235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl
These are more `javadoc` 8-related changes I spotted while investigating. These should be helpful in any event, but this does not nearly resolve SPARK-3359, which may never be feasible while using `unidoc` and `javadoc` 8.
Author: Sean Owen <sowen@cloudera.com>
Closes#4193 from srowen/SPARK-3359 and squashes the following commits:
5b33f66 [Sean Owen] Additional scaladoc fixes for javadoc 8; still not going to be javadoc 8 compatible
One side-effect of shading guava is that it disappears as a transitive
dependency. For Hadoop 2.x, this was masked by the fact that Hadoop
itself depends on guava. But certain versions of Hadoop 1.x also
shade guava, leaving either no guava or some random version pulled
by another dependency on the classpath.
So be explicit about the dependency in modules that use guava directly,
which is the right thing to do anyway.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#4272 from vanzin/SPARK-5466 and squashes the following commits:
e3f30e5 [Marcelo Vanzin] Dependency for catalyst is not needed.
d3b2c84 [Marcelo Vanzin] [SPARK-5466] Add explicit guava dependencies where needed.
If the value of 'spark.default.parallelism' does not match the number of partitoins in EdgePartition(EdgeRDDImpl),
the following error occurs in ReplicatedVertexView.scala:72;
object GraphTest extends Logging {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
graph.aggregateMessages(
ctx => {
ctx.sendToSrc(1)
ctx.sendToDst(2)
},
_ + _)
}
}
val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...
Author: Takeshi Yamamuro <linguin.m.s@gmail.com>
Closes#4136 from maropu/EdgePartitionBugFix and squashes the following commits:
0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl
I looked into GraphGenerators#chooseCell, and found that chooseCell can't generate more edges than pow(2, (2 * (log2(numVertices)-1))) to make a Power-law graph. (Ex. numVertices:4 upperbound:4, numVertices:8 upperbound:16, numVertices:16 upperbound:64)
If we request more edges over the upperbound, rmatGraph fall into infinite loop. So, how about adding an argument validation?
Author: Kenji Kikushima <kikushima.kenji@lab.ntt.co.jp>
Closes#3950 from kj-ki/SPARK-5064 and squashes the following commits:
4ee18c7 [Ankur Dave] Reword error message and add unit test
d760bc7 [Kenji Kikushima] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop.
This change does a few things to make the hadoop-provided profile more useful:
- Create new profiles for other libraries / services that might be provided by the infrastructure
- Simplify and fix the poms so that the profiles are only activated while building assemblies.
- Fix tests so that they're able to run when the profiles are activated
- Add a new env variable to be used by distributions that use these profiles to provide the runtime
classpath for Spark jobs and daemons.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#2982 from vanzin/SPARK-4048 and squashes the following commits:
82eb688 [Marcelo Vanzin] Add a comment.
eb228c0 [Marcelo Vanzin] Fix borked merge.
4e38f4e [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
9ef79a3 [Marcelo Vanzin] Alternative way to propagate test classpath to child processes.
371ebee [Marcelo Vanzin] Review feedback.
52f366d [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
83099fc [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
7377e7b [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
322f882 [Marcelo Vanzin] Fix merge fail.
f24e9e7 [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
8b00b6a [Marcelo Vanzin] Merge branch 'master' into SPARK-4048
9640503 [Marcelo Vanzin] Cleanup child process log message.
115fde5 [Marcelo Vanzin] Simplify a comment (and make it consistent with another pom).
e3ab2da [Marcelo Vanzin] Fix hive-thriftserver profile.
7820d58 [Marcelo Vanzin] Fix CliSuite with provided profiles.
1be73d4 [Marcelo Vanzin] Restore flume-provided profile.
d1399ed [Marcelo Vanzin] Restore jetty dependency.
82a54b9 [Marcelo Vanzin] Remove unused profile.
5c54a25 [Marcelo Vanzin] Fix HiveThriftServer2Suite with *-provided profiles.
1fc4d0b [Marcelo Vanzin] Update dependencies for hive-thriftserver.
f7b3bbe [Marcelo Vanzin] Add snappy to hadoop-provided list.
9e4e001 [Marcelo Vanzin] Remove duplicate hive profile.
d928d62 [Marcelo Vanzin] Redirect child stderr to parent's log.
4d67469 [Marcelo Vanzin] Propagate SPARK_DIST_CLASSPATH on Yarn.
417d90e [Marcelo Vanzin] Introduce "SPARK_DIST_CLASSPATH".
2f95f0d [Marcelo Vanzin] Propagate classpath to child processes during testing.
1adf91c [Marcelo Vanzin] Re-enable maven-install-plugin for a few projects.
284dda6 [Marcelo Vanzin] Rework the "hadoop-provided" profile, add new ones.
Convert bi-directional edges into uni-directional ones instead of 'canonicalOrientation' in GraphLoader.edgeListFile.
This function is useful when a graph is loaded as it is and then is transformed into one with canonical edges.
It rewrites the vertex ids of edges so that srcIds are bigger than dstIds, and merges the duplicated edges.
Author: Takeshi Yamamuro <linguin.m.s@gmail.com>
Closes#3760 from maropu/ConvertToCanonicalEdgesSpike and squashes the following commits:
7f8b580 [Takeshi Yamamuro] Add a function to convert into a graph with canonical edges in GraphOps
This PR:
- Reenables `surefire`, and copies config from `scalatest` (which is itself an old fork of `surefire`, so similar)
- Tells `surefire` to test only Java tests
- Enables `surefire` and `scalatest` for all children, and in turn eliminates some duplication.
For me this causes the Scala and Java tests to be run once each, it seems, as desired. It doesn't affect the SBT build but works for Maven. I still need to verify that all of the Scala tests and Java tests are being run.
Author: Sean Owen <sowen@cloudera.com>
Closes#3651 from srowen/SPARK-4159 and squashes the following commits:
2e8a0af [Sean Owen] Remove specialized SPARK_HOME setting for REPL, YARN tests as it appears to be obsolete
12e4558 [Sean Owen] Append to unit-test.log instead of overwriting, so that both surefire and scalatest output is preserved. Also standardize/correct comments a bit.
e6f8601 [Sean Owen] Reenable Java tests by reenabling surefire with config cloned from scalatest; centralize test config in the parent
The sum of vertices on matrix (v0 to v11) is 12. And, I think one same block overlaps in this strategy.
This is minor PR, so I didn't file in JIRA.
Author: kj-ki <kikushima.kenji@lab.ntt.co.jp>
Closes#3904 from kj-ki/fix-partitionstrategy-comments and squashes the following commits:
79829d9 [kj-ki] Fix comments for 2D partitioning.
As we learned in #3580, not explicitly typing implicit functions can lead to compiler bugs and potentially unexpected runtime behavior.
This is a follow up PR for rest of Spark (outside Spark SQL). The original PR for Spark SQL can be found at https://github.com/apache/spark/pull/3859
Author: Reynold Xin <rxin@databricks.com>
Closes#3860 from rxin/implicit and squashes the following commits:
73702f9 [Reynold Xin] [SPARK-5038] Add explicit return type for implicit functions.
Add an IF to uncache both vertices and edges of Graph/GraphImpl.
This IF is useful when iterative graph operations build a new graph in each iteration, and the vertices and edges of previous iterations are no longer needed for following iterations.
Author: Takeshi Yamamuro <linguin.m.s@gmail.com>
This patch had conflicts when merged, resolved by
Committer: Ankur Dave <ankurdave@gmail.com>
Closes#3476 from maropu/UnpersistInGraphSpike and squashes the following commits:
77a006a [Takeshi Yamamuro] Add unpersist in Graph and GraphImpl
This patch just replaces a native quick sorter with Sorter(TimSort) in Spark.
It could get performance gains by ~8% in my quick experiments.
Author: Takeshi Yamamuro <linguin.m.s@gmail.com>
Closes#3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits:
8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import
3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
Author: GuoQiang Li <witgo@qq.com>
Closes#2631 from witgo/SPARK-3623 and squashes the following commits:
a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation
The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672
In a nutshell, if `val partitionsRDD` in EdgeRDDImpl and VertexRDDImpl are non-transient, the serialization chain can become very long in iterative algorithms and finally lead to the StackOverflow error. More details and explanation can be found in the JIRA.
Author: JerryLead <JerryLead@163.com>
Author: Lijie Xu <csxulijie@gmail.com>
Closes#3544 from JerryLead/my_graphX and squashes the following commits:
628f33c [JerryLead] set PartitionsRDD to be transient in EdgeRDDImpl and VertexRDDImpl
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master
The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672
Iterative GraphX applications always have long lineage, while checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we perform checkpoint() on their ParitionsRDD, the long lineage can be cut off. Moreover, the existing operations such as cache() in this code is performed on the PartitionsRDD, so checkpoint() should do the same way. More details and explanation can be found in the JIRA.
Author: JerryLead <JerryLead@163.com>
Author: Lijie Xu <csxulijie@gmail.com>
Closes#3549 from JerryLead/my_graphX_checkpoint and squashes the following commits:
d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves
ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master
After additional discussion with rxin, I think having all the possible `TripletField` options is confusing. This pull request reduces the triplet fields to:
```java
/**
* None of the triplet fields are exposed.
*/
public static final TripletFields None = new TripletFields(false, false, false);
/**
* Expose only the edge field and not the source or destination field.
*/
public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
/**
* Expose the source and edge fields but not the destination field. (Same as Src)
*/
public static final TripletFields Src = new TripletFields(true, false, true);
/**
* Expose the destination and edge fields but not the source field. (Same as Dst)
*/
public static final TripletFields Dst = new TripletFields(false, true, true);
/**
* Expose all the fields (source, edge, and destination).
*/
public static final TripletFields All = new TripletFields(true, true, true);
```
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes#3472 from jegonzal/SimplifyTripletFields and squashes the following commits:
91796b5 [Joseph E. Gonzalez] removing confusing triplet fields
This pull request revises the programming guide to reflect changes in the GraphX API as well as the deprecated mapReduceTriplets operator.
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes#3359 from jegonzal/GraphXProgrammingGuide and squashes the following commits:
4421964 [Joseph E. Gonzalez] updating documentation for graphx
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#3277 from vanzin/version-1.3 and squashes the following commits:
7c3c396 [Marcelo Vanzin] Added temp repo to sbt build.
5f404ff [Marcelo Vanzin] Add another exclusion.
19457e7 [Marcelo Vanzin] Update old version to 1.2, add temporary 1.2 repo.
3c8d705 [Marcelo Vanzin] Workaround for MIMA checks.
e940810 [Marcelo Vanzin] Bumping version to 1.3.0-SNAPSHOT.
Due to vertex attribute caching, EdgeRDD previously took two type parameters: ED and VD. However, this is an implementation detail that should not be exposed in the interface, so this PR drops the VD type parameter.
This requires removing the `filter` method from the EdgeRDD interface, because it depends on vertex attribute caching.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#3303 from ankurdave/edgerdd-drop-tparam and squashes the following commits:
38dca9b [Ankur Dave] Leave EdgeRDD.fromEdges public
fafeb51 [Ankur Dave] Drop VD type parameter from EdgeRDD
This discourages users from calling the VertexRDD and EdgeRDD constructor and makes it easier for future changes to ensure backward compatibility.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2530 from ankurdave/SPARK-3666 and squashes the following commits:
d681f45 [Ankur Dave] Define getPartitions and compute in abstract class for MIMA
1472390 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
24201d4 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666
cbe15f2 [Ankur Dave] Remove specialized annotation from VertexRDD and EdgeRDD
931b587 [Ankur Dave] Use abstract class instead of trait for binary compatibility
9ba4ec4 [Ankur Dave] Mark (Vertex|Edge)RDDImpl constructors package-private
620e603 [Ankur Dave] Extract VertexRDD interface and move implementation to VertexRDDImpl
55b6398 [Ankur Dave] Extract EdgeRDD interface and move implementation to EdgeRDDImpl
1. Add EdgeActiveness enum to represent activeness criteria more cleanly than using booleans.
2. Comments and whitespace.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#3231 from ankurdave/aggregateMessages-followup and squashes the following commits:
3d485c3 [Ankur Dave] Internal cleanup for aggregateMessages
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:
1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages.
2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object. This fixes SPARK-3936.
Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition:
1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages.
2. Internal iterators in aggregateMessages are inlined into a while loop.
In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s).
Subsumes apache/spark#2815. Also fixes SPARK-4173.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#3100 from ankurdave/aggregateMessages and squashes the following commits:
f5b65d0 [Ankur Dave] Address @rxin comments on apache/spark#3054 and apache/spark#3100
1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets
194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test
e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder
c85076d [Ankur Dave] Readability improvements
b567be2 [Ankur Dave] iter.foreach -> while loop
4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition
As [reported][1] on the mailing list, GraphX throws
```
java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2
at org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.util.collection.ExternalSorter.spillToMergeableFile(ExternalSorter.scala:329)
```
when sort-based shuffle attempts to spill to disk. This is because GraphX defines custom serializers for shuffling pair RDDs that assume Spark will always serialize the entire pair object rather than breaking it up into its components. However, the spill code path in sort-based shuffle [violates this assumption][2].
GraphX uses the custom serializers to compress vertex ID keys using variable-length integer encoding. However, since the serializer can no longer rely on the key and value being serialized and deserialized together, performing such encoding would either require writing a tag byte (costly) or maintaining state in the serializer and assuming that serialization calls will alternate between key and value (fragile).
Instead, this PR simply removes the custom serializers. This causes a **10% slowdown** (494 s to 543 s) and **16% increase in per-iteration communication** (2176 MB to 2518 MB) for PageRank (averages across 3 trials, 10 iterations per trial, uk-2007-05 graph, 16 r3.2xlarge nodes).
[1]: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassCastException-java-lang-Long-cannot-be-cast-to-scala-Tuple2-td13926.html#a14501
[2]: f9d6220c79/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala (L329)
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2503 from ankurdave/SPARK-3649 and squashes the following commits:
a49c2ad [Ankur Dave] [SPARK-3649] Remove GraphX custom serializers
at first srcIds is not initialized and are all 0. so we use edgeArray(0).srcId to currSrcId
Author: lianhuiwang <lianhuiwang09@gmail.com>
Closes#3138 from lianhuiwang/SPARK-4249 and squashes the following commits:
3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx
Accumulate sizes of all the EdgePartitions just like the VertexRDD.
Author: luluorta <luluorta@gmail.com>
Closes#2975 from luluorta/graph-edge-count and squashes the following commits:
86ef0e5 [luluorta] Add overrided count for edge counting of EdgeRDD.
Changing the default number of edge partitions to match spark parallelism.
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes#3006 from jegonzal/default_partitions and squashes the following commits:
a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match spark parallelism
Author: Sandy Ryza <sandy@cloudera.com>
Closes#789 from sryza/sandy-spark-1813 and squashes the following commits:
48b05e9 [Sandy Ryza] Simplify
b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time
6a15bb7 [Sandy Ryza] Small fix
a2278c0 [Sandy Ryza] Respond to review comments
6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Thread names are useful for correlating failures.
Author: Reynold Xin <rxin@apache.org>
Closes#2600 from rxin/log4j and squashes the following commits:
83ffe88 [Reynold Xin] [SPARK-3748] Log thread name in unit test logs
Author: oded <oded@HP-DV6.c4internal.c4-security.com>
Closes#2486 from odedz/master and squashes the following commits:
dd7890a [oded] Fixed the condition in StronglyConnectedComponents Issue: SPARK-3635
When `numVertices > 50`, probability is set to 0. This would cause infinite loop.
Author: yingjieMiao <yingjie@42go.com>
Closes#2553 from yingjieMiao/graphx and squashes the following commits:
6adf3c8 [yingjieMiao] [graphX] GraphOps: random pick vertex bug
GraphGenerators.sampleLogNormal is supposed to return an integer strictly less than maxVal. However, it violates this guarantee. It generates its return value as follows:
```scala
var X: Double = maxVal
while (X >= maxVal) {
val Z = rand.nextGaussian()
X = math.exp(mu + sigma*Z)
}
math.round(X.toFloat)
```
When X is sampled to be close to (but less than) maxVal, then it will pass the while loop condition, but the rounded result will be equal to maxVal, which will violate the guarantee. For example, if maxVal is 5 and X is 4.9, then X < maxVal, but `math.round(X.toFloat)` is 5.
This PR instead rounds X before checking the loop condition, guaranteeing that the condition will hold for the return value.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2439 from ankurdave/SPARK-3578 and squashes the following commits:
f6655e5 [Ankur Dave] Go back to math.floor
5900c22 [Ankur Dave] Round X in loop condition
6fd5fb1 [Ankur Dave] Run sampleLogNormal bounds check 1000 times
1638598 [Ankur Dave] Round down in sampleLogNormal to guarantee upper bound
VertexRDD.apply had a bug where it ignored the merge function for
duplicate vertices and instead used whichever vertex attribute occurred
first. This commit fixes the bug by passing the merge function through
to ShippableVertexPartition.apply, which merges any duplicates using the
merge function and then fills in missing vertices using the specified
default vertex attribute. This commit also adds a unit test for
VertexRDD.apply.
Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Author: Blie Arkansol <xiaodi@sjtu.edu.cn>
Author: Ankur Dave <ankurdave@gmail.com>
Closes#1903 from larryxiao/2062 and squashes the following commits:
625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062
476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values
614059f [Larry Xiao] doc update: note about the default null value vertices construction
dfdb3c9 [Larry Xiao] minor fix
1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces
e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc
4fbc29c [Blie Arkansol] undo unnecessary change
efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc
b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062
52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled
581e9ee [Larry Xiao] TODO: VertexRDDSuite
20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
GraphX's current implementation of static (fixed iteration count) PageRank uses the Pregel API. This unnecessarily tracks active vertices, even though in static PageRank all vertices are always active. Active vertex tracking incurs the following costs:
1. A shuffle per iteration to ship the active sets to the edge partitions.
2. A hash table creation per iteration at each partition to index the active sets for lookup.
3. A hash lookup per edge to check whether the source vertex is active.
I reimplemented static PageRank using the lower-level GraphX API instead of the Pregel API. In benchmarks on a 16-node m2.4xlarge cluster, this provided a 23% speedup (from 514 s to 397 s, mean over 3 trials) for 10 iterations of PageRank on a synthetic graph with 10M vertices and 1.27B edges.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2308 from ankurdave/SPARK-3427 and squashes the following commits:
449996a [Ankur Dave] Avoid unnecessary active vertex tracking in static PageRank
Author: GuoQiang Li <witgo@qq.com>
Closes#2268 from witgo/SPARK-3397 and squashes the following commits:
eaf913f [GuoQiang Li] Bump pom.xml version number of master branch to 1.2.0-SNAPSHOT
9b225ac307 has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2271 from ankurdave/SPARK-3400 and squashes the following commits:
10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
PR #720 made multiple changes to GraphGenerator.logNormalGraph including:
* Replacing the call to functions for generating random vertices and edges with in-line implementations with different equations. Based on reading the Pregel paper, I believe the in-line functions are incorrect.
* Hard-coding of RNG seeds so that method now generates the same graph for a given number of vertices, edges, mu, and sigma -- user is not able to override seed or specify that seed should be randomly generated.
* Backwards-incompatible change to logNormalGraph signature with introduction of new required parameter.
* Failed to update scala docs and programming guide for API changes
* Added a Synthetic Benchmark in the examples.
This PR:
* Removes the in-line calls and calls original vertex / edge generation functions again
* Adds an optional seed parameter for deterministic behavior (when desired)
* Keeps the number of partitions parameter that was added.
* Keeps compatibility with the synthetic benchmark example
* Maintains backwards-compatible API
Author: RJ Nowling <rnowling@gmail.com>
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2168 from rnowling/graphgenrand and squashes the following commits:
f1cd79f [Ankur Dave] Style fixes
e11918e [RJ Nowling] Fix bad comparisons in unit tests
785ac70 [RJ Nowling] Fix style error
c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed
41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed
799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal
43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges
2faf75f [RJ Nowling] Added unit test for logNormalGraph
82f22397 [RJ Nowling] Add unit test for sampleLogNormal
b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for unit testing
6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges
1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to call count on RDD before printing
dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs
b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set default to randomly generate a seed
1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead of numEdges was used to control number of edges to generate
98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters
d40141a [RJ Nowling] Fix style error
684804d [RJ Nowling] revert PR #720 which introduce errors in logNormalGraph and messed up seeding of RNGs. Add user-defined optional seed for deterministic behavior
c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that allows generating graphs randomly using optional seed.
015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make backward-incompatible change in commit 894ecde04
If the users set “spark.default.parallelism” and the value is different with the EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
Author: luluorta <luluorta@gmail.com>
Closes#1763 from luluorta/fix-graph-zip and squashes the following commits:
8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
minor fix
detail is here: https://issues.apache.org/jira/browse/SPARK-2981
Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Closes#1902 from larryxiao/2981 and squashes the following commits:
88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow
Author: uncleGen <hustyugm@gmail.com>
Closes#2033 from uncleGen/master_origin and squashes the following commits:
801994b [uncleGen] Update EdgeRDD.scala
to support ~/spark/bin/run-example GraphXAnalytics triangles
/soc-LiveJournal1.txt --numEPart=256
Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Closes#1766 from larryxiao/1986 and squashes the following commits:
bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
VertexRDDs with more than 4 billion elements are counted incorrectly due to integer overflow when summing partition sizes. This PR fixes the issue by converting partition sizes to Longs before summing them.
The following code previously returned -10000000. After applying this PR, it returns the correct answer of 5000000000 (5 billion).
```scala
val pairs = sc.parallelize(0L until 500L).map(_ * 10000000)
.flatMap(start => start until (start + 10000000)).map(x => (x, x))
VertexRDD(pairs).count()
```
Author: Ankur Dave <ankurdave@gmail.com>
Closes#2106 from ankurdave/SPARK-3190 and squashes the following commits:
641f468 [Ankur Dave] Avoid overflow in VertexRDD.count()
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)
The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
- [x] with an Ordering
- [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false
Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback.
After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.
Author: Matei Zaharia <matei@databricks.com>
Closes#1499 from mateiz/sort-based-shuffle and squashes the following commits:
bd841f9 [Matei Zaharia] Various review comments
d1c137fd [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
In a few places in MLlib, an expression of the form `log(1.0 + p)` is evaluated. When p is so small that `1.0 + p == 1.0`, the result is 0.0. However the correct answer is very near `p`. This is why `Math.log1p` exists.
Similarly for one instance of `exp(m) - 1` in GraphX; there's a special `Math.expm1` method.
While the errors occur only for very small arguments, given their use in machine learning algorithms, this is entirely possible.
Also note the related PR for Python: https://github.com/apache/spark/pull/1652
Author: Sean Owen <srowen@gmail.com>
Closes#1659 from srowen/SPARK-2748 and squashes the following commits:
c5926d4 [Sean Owen] Use log1p, expm1 for better precision for tiny arguments
JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)
Another try for #1399 & #1600. Those two PR breaks Jenkins builds because we made a separate profile `hive-thriftserver` in sub-project `assembly`, but the `hive-thriftserver` module is defined outside the `hive-thriftserver` profile. Thus every time a pull request that doesn't touch SQL code will also execute test suites defined in `hive-thriftserver`, but tests fail because related .class files are not included in the assembly jar.
In the most recent commit, module `hive-thriftserver` is moved into its own profile to fix this problem. All previous commits are squashed for clarity.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes#1620 from liancheng/jdbc-with-maven-fix and squashes the following commits:
629988e [Cheng Lian] Moved hive-thriftserver module definition into its own profile
ec3c7a7 [Cheng Lian] Cherry picked the Hive Thrift server