зеркало из https://github.com/microsoft/spark.git
Fix all code examples in guide
This commit is contained in:
Родитель
2cd9358ccf
Коммит
af645be5b8
|
@ -357,7 +357,7 @@ val relationships: RDD[Edge[String]] =
|
|||
val defaultUser = ("John Doe", "Missing")
|
||||
// Build the initial Graph
|
||||
val graph = Graph(users, relationships, defaultUser)
|
||||
// Notice that there is a user 0 (for which we have no information) connecting users
|
||||
// Notice that there is a user 0 (for which we have no information) connected to users
|
||||
// 4 (peter) and 5 (franklin).
|
||||
graph.triplets.map(
|
||||
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
|
||||
|
@ -858,11 +858,11 @@ val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
|
|||
val ranks = graph.pageRank(0.0001).vertices
|
||||
// Join the ranks with the usernames
|
||||
val users = sc.textFile("graphx/data/users.txt").map { line =>
|
||||
val fields = line.split("\\s+")
|
||||
val fields = line.split(",")
|
||||
(fields(0).toLong, fields(1))
|
||||
}
|
||||
val ranksByUsername = users.leftOuterJoin(ranks).map {
|
||||
case (id, (username, rankOpt)) => (username, rankOpt.getOrElse(0.0))
|
||||
val ranksByUsername = users.join(ranks).map {
|
||||
case (id, (username, rank)) => (username, rank)
|
||||
}
|
||||
// Print the result
|
||||
println(ranksByUsername.collect().mkString("\n"))
|
||||
|
@ -881,11 +881,11 @@ val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
|
|||
val cc = graph.connectedComponents().vertices
|
||||
// Join the connected components with the usernames
|
||||
val users = sc.textFile("graphx/data/users.txt").map { line =>
|
||||
val fields = line.split("\\s+")
|
||||
val fields = line.split(",")
|
||||
(fields(0).toLong, fields(1))
|
||||
}
|
||||
val ccByUsername = users.join(cc).map { case (id, (username, cc)) =>
|
||||
(username, cc)
|
||||
val ccByUsername = users.join(cc).map {
|
||||
case (id, (username, cc)) => (username, cc)
|
||||
}
|
||||
// Print the result
|
||||
println(ccByUsername.collect().mkString("\n"))
|
||||
|
@ -900,12 +900,12 @@ A vertex is part of a triangle when it has two adjacent vertices with an edge be
|
|||
|
||||
{% highlight scala %}
|
||||
// Load the edges in canonical order and partition the graph for triangle count
|
||||
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(RandomVertexCut)
|
||||
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
|
||||
// Find the triangle count for each vertex
|
||||
val triCounts = graph.triangleCount().vertices
|
||||
// Join the triangle counts with the usernames
|
||||
val users = sc.textFile("graphx/data/users.txt").map { line =>
|
||||
val fields = line.split("\\s+")
|
||||
val fields = line.split(",")
|
||||
(fields(0).toLong, fields(1))
|
||||
}
|
||||
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
|
||||
|
@ -934,32 +934,32 @@ all of this in just a few lines with GraphX:
|
|||
// Connect to the Spark cluster
|
||||
val sc = new SparkContext("spark://master.amplab.org", "research")
|
||||
|
||||
// Load my user data and prase into tuples of user id and attribute list
|
||||
val users = sc.textFile("hdfs://user_attributes.tsv")
|
||||
.map(line => line.split).map( parts => (parts.head, parts.tail) )
|
||||
// Load my user data and parse into tuples of user id and attribute list
|
||||
val users = (sc.textFile("graphx/data/users.txt")
|
||||
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
|
||||
|
||||
// Parse the edge data which is already in userId -> userId format
|
||||
val followerGraph = Graph.textFile(sc, "hdfs://followers.tsv")
|
||||
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
|
||||
|
||||
// Attach the user attributes
|
||||
val graph = followerGraph.outerJoinVertices(users){
|
||||
val graph = followerGraph.outerJoinVertices(users) {
|
||||
case (uid, deg, Some(attrList)) => attrList
|
||||
// Some users may not have attributes so we set them as empty
|
||||
case (uid, deg, None) => Array.empty[String]
|
||||
}
|
||||
}
|
||||
|
||||
// Restrict the graph to users which have exactly two attributes
|
||||
val subgraph = graph.subgraph((vid, attr) => attr.size == 2)
|
||||
// Restrict the graph to users with usernames and names
|
||||
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
|
||||
|
||||
// Compute the PageRank
|
||||
val pagerankGraph = Analytics.pagerank(subgraph)
|
||||
val pagerankGraph = subgraph.pageRank(0.001)
|
||||
|
||||
// Get the attributes of the top pagerank users
|
||||
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices){
|
||||
case (uid, attrList, Some(pr)) => (pr, attrList)
|
||||
case (uid, attrList, None) => (pr, attrList)
|
||||
}
|
||||
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
|
||||
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
|
||||
case (uid, attrList, None) => (0.0, attrList.toList)
|
||||
}
|
||||
|
||||
println(userInfoWithPageRank.top(5))
|
||||
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
|
||||
|
||||
{% endhighlight %}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
1 BarackObama
|
||||
2 ladygaga
|
||||
3 jeresig
|
||||
4 justinbieber
|
||||
6 matei_zaharia
|
||||
7 odersky
|
||||
1,BarackObama,Barack Obama
|
||||
2,ladygaga,Goddess of Love
|
||||
3,jeresig,John Resig
|
||||
4,justinbieber,Justin Bieber
|
||||
6,matei_zaharia,Matei Zaharia
|
||||
7,odersky,Martin Odersky
|
||||
8,anonsys
|
||||
|
|
Загрузка…
Ссылка в новой задаче