Revert de01b6deaaee1b43321e0aac330f4a98c0ea61c6^..HEAD

This commit is contained in:
Edison Tung 2011-12-01 13:43:25 -08:00
Родитель de01b6deaa
Коммит 42f8847a21
3 изменённых файлов: 7500006 добавлений и 93 удалений

Просмотреть файл

@ -95,9 +95,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
var fraction = 0.0
var total = 0
var multiplier = 3.0
var initialCount = count()
if (num > count()) {
total = Math.min(count().toInt, Integer.MAX_VALUE)
if (num > initialCount) {
total = Math.min(initialCount, Integer.MAX_VALUE)
total = total.toInt
fraction = 1.0
}
else if (num < 0) {
@ -109,12 +111,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
var r = new SampledRDD(this, withReplacement, fraction, seed)
var samples = r.collect()
while (r.count() < total) {
while (samples.length < total) {
r = new SampledRDD(this, withReplacement, fraction, seed)
}
var samples = r.collect()
var arr = new Array[T](total)
for (i <- 0 to total - 1) {

Просмотреть файл

@ -1,73 +0,0 @@
package spark.examples
import java.util.Random
import Vector._
import spark.SparkContext
import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
object SparkLocalKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)
def parseVector(line: String): Vector = {
return new Vector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}
return bestIndex
}
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLocalKMeans")
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
val convergeDist = args(3).toDouble
var points = data.sample(false, (K+1)/data.count().toDouble, 42).collect
var kPoints = new HashMap[Int, Vector]
var tempDist = 1.0
for (i <- 1 to points.size) {
kPoints.put(i, points(i-1))
}
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)}
var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}.collect()
tempDist = 0.0
for (mapping <- newPoints) {
tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
}
for (newP <- newPoints) {
kPoints.put(newP._1, newP._2)
}
}
println("Final centers: " + kPoints)
}
}

7500016
kmeans_data.txt

Разница между файлами не показана из-за своего большого размера Загрузить разницу