Java Serializer Update for Schema Id Conversion Change (#29)

* Update serializer for schema id conversion change

* Get lower case content type from header

* Change content-type header to lowercase

* Roll back Kafka version and fix blank lines
Nick Hardwick 2022-08-09 14:04:31 -07:00 committed by GitHub
Parent 8873e7adc8
Commit 9c3b5d6d90
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
9 changed files: 85 additions and 69 deletions
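In practice, the change moves the schema reference out of the message payload and into a lowercase "content-type" Kafka record header. A minimal sketch of the new contract, assuming a content-type value of the form "avro/binary+<schema-id>" (the exact format is produced by the Azure Avro serializer and is not shown in this commit):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class ContentTypeHeaderSketch {
    public static void main(String[] args) {
        // Producer side: the serializer now stores the schema reference in a
        // lowercase "content-type" header instead of prefixing the payload.
        Headers headers = new RecordHeaders();
        headers.add("content-type",
            "avro/binary+<schema-id>".getBytes(StandardCharsets.UTF_8)); // assumed format

        // Consumer side: read the last "content-type" header, falling back to
        // an empty string when absent (mirrors the deserializer change below).
        Header contentTypeHeader = headers.lastHeader("content-type");
        String contentType = contentTypeHeader != null
            ? new String(contentTypeHeader.value(), StandardCharsets.UTF_8)
            : "";
        System.out.println("content-type: " + contentType);
    }
}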

View file

@@ -27,7 +27,7 @@
</parent>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
-<version>1.0.0-beta.6</version>
+<version>1.0.0-beta.7</version>
<name>azure-schemaregistry-kafka-avro</name>
@@ -46,17 +46,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry-apacheavro</artifactId>
-<version>1.0.0-beta.8</version>
+<version>1.0.0-beta.11</version>
</dependency>
-<dependency>
-<groupId>com.azure</groupId>
-<artifactId>azure-data-schemaregistry</artifactId>
-<version>1.0.1</version>
-</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-serializer-avro-apache</artifactId>
-<version>1.0.0-beta.17</version>
+<version>1.1.1</version>
</dependency>
<!-- test dependencies -->

View file

@@ -9,7 +9,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-avro-samples</artifactId>
<packaging>jar</packaging>
-<version>1.0.0-beta.4</version>
+<version>1.0.0-beta.5</version>
<name>azure-schemaregistry-avro-samples</name>
<url>http://maven.apache.org</url>
@@ -23,7 +23,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
-<version>1.0.0-beta.6</version>
+<version>1.0.0-beta.7</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>

View file

@@ -15,10 +15,11 @@ import org.apache.avro.message.SchemaStore;
@org.apache.avro.specific.AvroGenerated
public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 3655407841714098520L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.azure.schemaregistry.samples\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"description\",\"type\":\"string\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-private static SpecificData MODEL$ = new SpecificData();
+private static final SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<Order> ENCODER =
new BinaryMessageEncoder<Order>(MODEL$, SCHEMA$);
@@ -71,9 +72,9 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
return DECODER.decode(b);
}
-private java.lang.CharSequence id;
-private double amount;
-private java.lang.CharSequence description;
+private java.lang.CharSequence id;
+private double amount;
+private java.lang.CharSequence description;
/**
* Default constructor. Note that this does not initialize fields
@@ -215,7 +216,7 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
/** Creates a new Builder */
private Builder() {
-super(SCHEMA$);
+super(SCHEMA$, MODEL$);
}
/**
@@ -243,7 +244,7 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
* @param other The existing instance to copy.
*/
private Builder(com.azure.schemaregistry.samples.Order other) {
-super(SCHEMA$);
+super(SCHEMA$, MODEL$);
if (isValidValue(fields()[0], other.id)) {
this.id = data().deepCopy(fields()[0].schema(), other.id);
fieldSetFlags()[0] = true;
@@ -458,13 +459,3 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
}
}
}
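The regenerated class now passes MODEL$ alongside SCHEMA$ into the Builder superclass, matching newer Avro codegen. A small usage sketch of the generated API (the field values are illustrative):

import com.azure.schemaregistry.samples.Order;

public class OrderBuilderSketch {
    public static void main(String[] args) {
        // newBuilder() goes through the updated Builder(SCHEMA$, MODEL$) path.
        Order order = Order.newBuilder()
            .setId("ID-1")
            .setAmount(10.00)
            .setDescription("Sample order -1")
            .build();
        System.out.println(order);
    }
}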

View file

@@ -9,7 +9,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-avro-samples</artifactId>
<packaging>jar</packaging>
-<version>1.0.0-beta.3</version>
+<version>1.0.0-beta.5</version>
<name>azure-schemaregistry-avro-samples</name>
<url>http://maven.apache.org</url>
@@ -23,7 +23,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
-<version>1.0.0-beta.6</version>
+<version>1.0.0-beta.7</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>

View file

@@ -15,10 +15,11 @@ import org.apache.avro.message.SchemaStore;
@org.apache.avro.specific.AvroGenerated
public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 3655407841714098520L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.azure.schemaregistry.samples\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"description\",\"type\":\"string\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-private static SpecificData MODEL$ = new SpecificData();
+private static final SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<Order> ENCODER =
new BinaryMessageEncoder<Order>(MODEL$, SCHEMA$);
@@ -71,9 +72,9 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
return DECODER.decode(b);
}
-private java.lang.CharSequence id;
-private double amount;
-private java.lang.CharSequence description;
+private java.lang.CharSequence id;
+private double amount;
+private java.lang.CharSequence description;
/**
* Default constructor. Note that this does not initialize fields
@@ -215,7 +216,7 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
/** Creates a new Builder */
private Builder() {
-super(SCHEMA$);
+super(SCHEMA$, MODEL$);
}
/**
@@ -243,7 +244,7 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
* @param other The existing instance to copy.
*/
private Builder(com.azure.schemaregistry.samples.Order other) {
-super(SCHEMA$);
+super(SCHEMA$, MODEL$);
if (isValidValue(fields()[0], other.id)) {
this.id = data().deepCopy(fields()[0].schema(), other.id);
fieldSetFlags()[0] = true;
@@ -458,12 +459,3 @@ public class Order extends org.apache.avro.specific.SpecificRecordBase implement
}
}
}
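The ENCODER/DECODER pair above backs the toByteBuffer/fromByteBuffer helpers that Avro generates for this class; a round-trip sketch (standard generated-API behavior, not specific to this commit):

import java.nio.ByteBuffer;
import com.azure.schemaregistry.samples.Order;

public class OrderRoundTripSketch {
    public static void main(String[] args) throws Exception {
        Order original = new Order("ID-1", 10.00, "Sample order -1");
        // Encode with the generated BinaryMessageEncoder...
        ByteBuffer buf = original.toByteBuffer();
        // ...then decode with the matching BinaryMessageDecoder.
        Order copy = Order.fromByteBuffer(buf);
        System.out.println(copy);
    }
}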

View file

@@ -2,19 +2,15 @@ package com.azure.schemaregistry.samples.producer;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
-import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
-import java.time.Duration;
-import java.util.Collections;
import java.util.Properties;
public class KafkaAvroGenericRecord {
@@ -53,7 +49,7 @@ public class KafkaAvroGenericRecord {
for (int i = 0; i < 10; i++) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("id", "ID-" + i);
avroRecord.put("amount", 20.99 + i);
avroRecord.put("amount", 20.00 + i);
avroRecord.put("description", "Sample order -" + i);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topicName, key, avroRecord);
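The GenericRecord variant parses the Order schema at runtime rather than using the generated class; a sketch of the schema the sample fills in, taken from the SCHEMA$ definition above:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class OrderSchemaSketch {
    public static void main(String[] args) {
        // Same record shape as the generated Order class's SCHEMA$.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\","
            + "\"namespace\":\"com.azure.schemaregistry.samples\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"amount\",\"type\":\"double\"},"
            + "{\"name\":\"description\",\"type\":\"string\"}]}");
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("id", "ID-1");
        avroRecord.put("amount", 20.00);
        avroRecord.put("description", "Sample order -1");
        System.out.println(avroRecord);
    }
}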

View file

@@ -2,17 +2,13 @@ package com.azure.schemaregistry.samples.producer;
import com.azure.core.credential.TokenCredential;
import com.azure.schemaregistry.samples.Order;
-import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
-import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.azure.core.util.logging.ClientLogger;
-import java.time.Duration;
-import java.util.Collections;
import java.util.Properties;
public class KafkaAvroSpecificRecord {
@@ -43,7 +39,7 @@ public class KafkaAvroSpecificRecord {
while (true) {
for (int i = 0; i < 10; i++) {
Order order = new Order("ID-" + i, 10.99 + i, "Sample order -" + i);
Order order = new Order("ID-" + i, 10.00 + i, "Sample order -" + i);
ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
producer.send(record);
logger.info("Sent Order {}", order);
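For context, a hedged sketch of wiring the updated serializer into a producer; the schema-registry property key shown is an assumption (the sample's configuration code is not part of this diff), and a TokenCredential entry is also required:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import com.azure.schemaregistry.samples.Order;

public class ProducerWiringSketch {
    public static KafkaProducer<String, Order> buildProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "<namespace>.servicebus.windows.net:9093");        // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer");
        props.put("schema.registry.url",                       // assumed key
            "https://<namespace>.servicebus.windows.net");
        // The credential property key is not shown in this diff; see
        // KafkaAvroSerializerConfig for the actual constants.
        return new KafkaProducer<>(props);
    }
}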

View file

@@ -3,13 +3,16 @@
package com.microsoft.azure.schemaregistry.kafka.avro;
+import com.azure.core.experimental.models.MessageWithMetadata;
+import com.azure.core.util.BinaryData;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
-import java.io.ByteArrayInputStream;
import java.util.Map;
/**
@@ -43,7 +46,16 @@ public class KafkaAvroDeserializer implements Deserializer<Object> {
* @see KafkaAvroDeserializerConfig Deserializer will use configs found in here and inherited classes.
*/
public void configure(Map<String, ?> props, boolean isKey) {
-this.config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);
+this.config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);
+this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
+    .schemaRegistryAsyncClient(
+        new SchemaRegistryClientBuilder()
+            .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
+            .credential(this.config.getCredential())
+            .buildAsyncClient())
+    .avroSpecificReader(this.config.getAvroSpecificReader())
+    .buildSerializer();
}
/**
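The builder above pulls a TokenCredential from the config; a hedged sketch of producing one (DefaultAzureCredentialBuilder comes from the azure-identity library and is an assumption, as credential construction is not shown in this commit):

import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;

public class CredentialSketch {
    public static TokenCredential credential() {
        // Resolves a credential from the environment, a managed identity,
        // or developer tooling via the standard DefaultAzureCredential chain.
        return new DefaultAzureCredentialBuilder().build();
    }
}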
@@ -54,17 +66,29 @@ public class KafkaAvroDeserializer implements Deserializer<Object> {
*/
@Override
public Object deserialize(String topic, byte[] bytes) {
-    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
-    this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
-        .schemaRegistryAsyncClient(new SchemaRegistryClientBuilder()
-            .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
-            .credential(this.config.getCredential())
-            .buildAsyncClient())
-        .avroSpecificReader(this.config.getAvroSpecificReader())
-        .buildSerializer();
-    return serializer.deserialize(in, TypeReference.createInstance(Object.class));
+    return null;
}
+/**
+ * Deserializes byte array into Java object
+ * @param topic topic associated with the record bytes
+ * @param bytes serialized bytes, may be null
+ * @param headers record headers, may be null
+ * @return deserialize object, may be null
+ */
+@Override
+public Object deserialize(String topic, Headers headers, byte[] bytes) {
+    MessageWithMetadata message = new MessageWithMetadata();
+    message.setBodyAsBinaryData(BinaryData.fromBytes(bytes));
+    Header contentTypeHeader = headers.lastHeader("content-type");
+    if (contentTypeHeader != null) {
+        message.setContentType(new String(contentTypeHeader.value()));
+    } else {
+        message.setContentType("");
+    }
+    return this.serializer.deserializeMessageData(message, TypeReference.createInstance(Object.class));
+}
@Override
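Worth noting: kafka-clients already ships a default three-argument deserialize that ignores headers and delegates to the two-argument form; this commit overrides it instead, since the schema reference now travels in the header. A paraphrased sketch of the interface shape being overridden (not the actual Kafka source):

import java.io.Closeable;
import org.apache.kafka.common.header.Headers;

public interface DeserializerShape<T> extends Closeable {
    // Two-argument form; this commit's implementation now returns null here.
    T deserialize(String topic, byte[] data);

    // The default in kafka-clients drops the headers; KafkaAvroDeserializer
    // overrides it to read the "content-type" header instead.
    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}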

View file

@@ -3,13 +3,15 @@
package com.microsoft.azure.schemaregistry.kafka.avro;
+import com.azure.core.experimental.models.MessageWithMetadata;
+import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
-import java.io.ByteArrayOutputStream;
import java.util.Map;
/**
@@ -69,6 +71,23 @@ public class KafkaAvroSerializer implements Serializer<Object> {
*/
@Override
public byte[] serialize(String topic, Object record) {
+    return null;
+}
+
+/**
+ * Serializes GenericRecord or SpecificRecord into a byte array, containing a GUID reference to schema
+ * and the encoded payload.
+ *
+ * Null behavior matches Kafka treatment of null values.
+ *
+ * @param topic Topic destination for record. Required by Kafka serializer interface, currently not used.
+ * @param record Object to be serialized, may be null
+ * @param headers Record headers, may be null
+ * @return byte[] payload for sending to EH Kafka service, may be null
+ * @throws SerializationException Exception catchable by core Kafka producer code
+ */
+@Override
+public byte[] serialize(String topic, Headers headers, Object record) {
// null needs to treated specially since the client most likely just wants to send
// an individual null value instead of making the subject a null type. Also, null in
// Kafka has a special meaning for deletion in a topic with the compact retention policy.
@@ -77,10 +96,13 @@ public class KafkaAvroSerializer implements Serializer<Object> {
if (record == null) {
return null;
}
-ByteArrayOutputStream out = new ByteArrayOutputStream();
-serializer.serialize(out, record);
-return out.toByteArray();
+MessageWithMetadata message =
+    this.serializer.serializeMessageData(record, TypeReference.createInstance(MessageWithMetadata.class));
+String messageContentType = message.getContentType();
+byte[] contentTypeBytes = messageContentType.getBytes();
+headers.add("content-type", contentTypeBytes);
+return message.getBodyAsBinaryData().toBytes();
}
@Override
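End to end, the serializer now returns only the encoded Avro body and annotates the record's headers in one pass. A sketch of inspecting the effect after a send (hypothetical helper; the content-type value is whatever the Azure Avro serializer produced):

import org.apache.kafka.clients.producer.ProducerRecord;
import com.azure.schemaregistry.samples.Order;

public class SerializeEffectSketch {
    static void describe(ProducerRecord<String, Order> record) {
        // After KafkaProducer invokes serialize(topic, headers, record), the
        // record's headers carry the "content-type" entry added above, and
        // the value bytes are just the Avro body (no schema-id prefix).
        record.headers().forEach(h ->
            System.out.println(h.key() + " = " + new String(h.value())));
    }
}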