From cce758b8938afb24c6d61a02f697201c41801fb6 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 7 Aug 2013 16:38:52 +0200 Subject: [PATCH 1/2] Adding Scala version of PageRank example --- .../scala/spark/examples/SparkPageRank.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 examples/src/main/scala/spark/examples/SparkPageRank.scala diff --git a/examples/src/main/scala/spark/examples/SparkPageRank.scala b/examples/src/main/scala/spark/examples/SparkPageRank.scala new file mode 100644 index 0000000000..6d2be34956 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkPageRank.scala @@ -0,0 +1,51 @@ +package spark.examples + +import spark.SparkContext._ +import spark.SparkContext + + +/** + * Computes the PageRank of URLs from an input file. Input file should + * be in format of: + * URL neighbor URL + * URL neighbor URL + * URL neighbor URL + * ... + * where URL and their neighbors are separated by space(s). + */ +object SparkPageRank { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: PageRank ") + System.exit(1) + } + var iters = args(2).toInt + val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val lines = ctx.textFile(args(1), 1) + val links = lines.map(s => { + val parts = s.split("\\s+") + (parts(0), parts(1)) + }).distinct().groupByKey().cache() + var ranks = links.mapValues(v => 1.0) + + for (i <- 1 to iters) { + val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => { + val size = urls.size + urls.map(url => (url, rank / size)) + }} + + ranks = contribs.groupByKey().mapValues(ranks => { + val sumRanks = ranks.foldLeft(0.0)(_ + _) + 0.15 + sumRanks * 0.85 + }) + } + + val output = ranks.collect() + output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + ".")) + + ctx.stop() + System.exit(0) + } +} + From c4eea875ac5d02b46b22b454532c9702c3fa6240 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 8 Aug 2013 12:40:37 +0200 Subject: [PATCH 2/2] Style changes as per Matei's comments --- .../scala/spark/examples/SparkPageRank.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/examples/src/main/scala/spark/examples/SparkPageRank.scala b/examples/src/main/scala/spark/examples/SparkPageRank.scala index 6d2be34956..4e41c026a4 100644 --- a/examples/src/main/scala/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/spark/examples/SparkPageRank.scala @@ -14,7 +14,6 @@ import spark.SparkContext * where URL and their neighbors are separated by space(s). */ object SparkPageRank { - def main(args: Array[String]) { if (args.length < 3) { System.err.println("Usage: PageRank ") @@ -23,22 +22,22 @@ object SparkPageRank { var iters = args(2).toInt val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = ctx.textFile(args(1), 1) - val links = lines.map(s => { - val parts = s.split("\\s+") - (parts(0), parts(1)) - }).distinct().groupByKey().cache() + val links = lines.map{s => + val parts = s.split("\\s+") + (parts(0), parts(1)) + }.distinct().groupByKey().cache() var ranks = links.mapValues(v => 1.0) for (i <- 1 to iters) { - val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => { + val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => val size = urls.size urls.map(url => (url, rank / size)) - }} + } - ranks = contribs.groupByKey().mapValues(ranks => { + ranks = contribs.groupByKey().mapValues{ranks => val sumRanks = ranks.foldLeft(0.0)(_ + _) 0.15 + sumRanks * 0.85 - }) + } } val output = ranks.collect()