зеркало из https://github.com/microsoft/spark.git
Merge pull request #789 from MLnick/master
Adding Scala version of PageRank example
This commit is contained in:
Коммит
06e4f2a8f2
|
@ -0,0 +1,50 @@
|
||||||
|
package spark.examples
|
||||||
|
|
||||||
|
import spark.SparkContext._
|
||||||
|
import spark.SparkContext
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Computes the PageRank of URLs from an input file. Each line of the input
 * file should have the form:
 *
 *   URL   neighbor-URL
 *
 * where a URL and its neighbor are separated by whitespace. Leading and
 * trailing whitespace is ignored, and lines with fewer than two fields
 * (for example blank lines) are skipped.
 *
 * Arguments: <master> <file> <number_of_iterations>
 */
object SparkPageRank {
  /**
   * Runs PageRank for a fixed number of iterations and prints every URL's rank.
   *
   * @param args args(0) is the Spark master, args(1) the input file and
   *             args(2) the number of iterations (must parse as an Int).
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    val iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

    try {
      val lines = ctx.textFile(args(1), 1)

      // Trim first so an indented line does not yield an empty leading token,
      // and drop lines that cannot supply both a URL and a neighbor.
      val links = lines
        .map(_.trim.split("\\s+"))
        .filter(_.length >= 2)
        .map(fields => (fields(0), fields(1)))
        .distinct()
        .groupByKey()
        .cache()

      // Every URL that has at least one outgoing link starts with rank 1.0.
      var ranks = links.mapValues(_ => 1.0)

      for (i <- 1 to iters) {
        // Each URL splits its current rank evenly among its neighbors.
        val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
          val size = urls.size
          urls.map(url => (url, rank / size))
        }

        // Sum the contributions per URL, then apply the 0.15 / 0.85 damping.
        ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      }

      val output = ranks.collect()
      output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
    } finally {
      // Stop the context even when the job fails.
      ctx.stop()
    }

    System.exit(0)
  }
}
|
||||||
|
|
Загрузка…
Ссылка в новой задаче