Optimize Scala PageRank to use reduceByKey

This commit is contained in:
Matei Zaharia 2013-08-10 18:09:54 -07:00
Родитель 06e4f2a8f2
Коммит 4c4f769187
1 изменённых файлов: 4 добавлений и 8 удалений

Просмотреть файл

@ -20,9 +20,10 @@ object SparkPageRank {
System.exit(1)
}
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val ctx = new SparkContext(args(0), "PageRank",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ctx.textFile(args(1), 1)
val links = lines.map{s =>
val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
@ -33,17 +34,12 @@ object SparkPageRank {
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.groupByKey().mapValues{ranks =>
val sumRanks = ranks.foldLeft(0.0)(_ + _)
0.15 + sumRanks * 0.85
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
ctx.stop()
System.exit(0)
}
}