Merge pull request #512 from patelh/fix-kryo-serializer

Fix reference bug in Kryo serializer, add test, update version
This commit is contained in:
Matei Zaharia 2013-03-10 15:48:23 -07:00
Родитель 557cfd0f4d 664e5fd24b
Коммит 91a9d093bd
3 изменённых файлов: 20 добавлений и 12 удалений

Просмотреть файл

@ -157,27 +157,34 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
// Register maps with a special serializer since they have complex internal structure // Register maps with a special serializer since they have complex internal structure
class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any]) class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] { extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
//hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
private final val FAKE_REFERENCE = new Object()
override def write( override def write(
kryo: Kryo, kryo: Kryo,
output: KryoOutput, output: KryoOutput,
obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) { obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
val map = obj.asInstanceOf[scala.collection.Map[Any, Any]] val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer]) output.writeInt(map.size)
for ((k, v) <- map) { for ((k, v) <- map) {
kryo.writeClassAndObject(output, k) kryo.writeClassAndObject(output, k)
kryo.writeClassAndObject(output, v) kryo.writeClassAndObject(output, v)
} }
} }
override def read ( override def read (
kryo: Kryo, kryo: Kryo,
input: KryoInput, input: KryoInput,
cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]]) cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
: Array[(Any, Any)] => scala.collection.Map[Any, Any] = { : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue kryo.reference(FAKE_REFERENCE)
val size = input.readInt()
val elems = new Array[(Any, Any)](size) val elems = new Array[(Any, Any)](size)
for (i <- 0 until size) for (i <- 0 until size) {
elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input)) val k = kryo.readClassAndObject(input)
val v = kryo.readClassAndObject(input)
elems(i)=(k,v)
}
buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]] buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
} }
} }

Просмотреть файл

@ -82,6 +82,7 @@ class KryoSerializerSuite extends FunSuite {
check(mutable.HashMap(1 -> "one", 2 -> "two")) check(mutable.HashMap(1 -> "one", 2 -> "two"))
check(mutable.HashMap("one" -> 1, "two" -> 2)) check(mutable.HashMap("one" -> 1, "two" -> 2))
check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4)))) check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
} }
test("custom registrator") { test("custom registrator") {

Просмотреть файл

@ -132,7 +132,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"asm" % "asm-all" % "3.3.1", "asm" % "asm-all" % "3.3.1",
"com.google.protobuf" % "protobuf-java" % "2.4.1", "com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.20", "de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-actor" % "2.0.3",
"com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3",
"com.typesafe.akka" % "akka-slf4j" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3",