Style changes as per Matei's comments

This commit is contained in:
Nick Pentreath 2013-08-08 12:40:37 +02:00
Родитель cce758b893
Коммит c4eea875ac
1 изменённых файлов: 8 добавлений и 9 удалений

Просмотреть файл

@ -14,7 +14,6 @@ import spark.SparkContext
* where URL and their neighbors are separated by space(s).
*/
object SparkPageRank {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
@ -23,22 +22,22 @@ object SparkPageRank {
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = ctx.textFile(args(1), 1)
val links = lines.map(s => {
val parts = s.split("\\s+")
(parts(0), parts(1))
}).distinct().groupByKey().cache()
val links = lines.map{s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}}
}
ranks = contribs.groupByKey().mapValues(ranks => {
ranks = contribs.groupByKey().mapValues{ranks =>
val sumRanks = ranks.foldLeft(0.0)(_ + _)
0.15 + sumRanks * 0.85
})
}
}
val output = ranks.collect()