Consider tombstones in GenericSerde.serialize

Fixes #522
This commit is contained in:
Andreas Schroeder 2020-09-18 07:34:32 +02:00
Родитель 308a479d57
Коммит 7db5be3d3c
2 изменённых файлов: 17 добавлений и 10 удалений

Просмотреть файл

@ -45,17 +45,19 @@ class GenericSerde[T >: Null : SchemaFor : Encoder : Decoder](avroFormat: AvroFo
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
override def serialize(topic: String, data: T): Array[Byte] = {
val baos = new ByteArrayOutputStream()
if (data == null) null else {
val baos = new ByteArrayOutputStream()
val avroOutputStream = avroFormat match {
case BinaryFormat => AvroOutputStream.binary[T]
case JsonFormat => AvroOutputStream.json[T]
case DataFormat => AvroOutputStream.data[T]
val avroOutputStream = avroFormat match {
case BinaryFormat => AvroOutputStream.binary[T]
case JsonFormat => AvroOutputStream.json[T]
case DataFormat => AvroOutputStream.data[T]
}
val output = avroOutputStream.to(baos).build()
output.write(data)
output.close()
baos.toByteArray
}
val output = avroOutputStream.to(baos).build()
output.write(data)
output.close()
baos.toByteArray
}
}

Просмотреть файл

@ -53,4 +53,9 @@ class GenericSerdeTest extends AnyFlatSpec with Matchers {
) shouldBe someValue
}
"round trip with tombstone" should "yield back tombstone" in {
val binarySerde = new GenericSerde[TheKafkaValue](BinaryFormat)
val tombstone: TheKafkaValue = null
binarySerde.deserialize("any-topic", binarySerde.serialize("any-topic", tombstone)) shouldBe tombstone
}
}