Deserialize SpecificRecord Fix (#35)

* Update serializers and samples

* Remove extra new lines from generated class code

* Get deserializer class from config

* Address comments
Nick Hardwick 2022-11-11 16:08:33 -08:00 committed by GitHub
Parent e76e437907
Commit 4f46b4f235
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 51 additions and 111 deletions
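
In short: instead of deserializing to a GenericRecord and deep-copying it into the generated class by hand, consumers can now name the target SpecificRecord class in configuration. A minimal before/after sketch, distilled from the sample diffs below (props and record are the consumer's Properties and ConsumerRecord, as in the samples):

// Before (workaround for https://github.com/Azure/azure-sdk-for-java/issues/27602):
Order order = (Order) SpecificData.get().deepCopy(Order.SCHEMA$, (GenericRecord) record.value());

// After: declare the target class up front and records arrive already typed.
props.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_VALUE_TYPE_CONFIG, Order.class);
Order order = record.value();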

View file

@@ -27,7 +27,7 @@
</parent>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.0.0-beta.7</version>
<version>1.0.0-beta.8</version>
<name>azure-schemaregistry-kafka-avro</name>

View file

@@ -9,7 +9,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-avro-samples</artifactId>
<packaging>jar</packaging>
<version>1.0.0-beta.5</version>
<version>1.0.0-beta.6</version>
<name>azure-schemaregistry-avro-samples</name>
<url>http://maven.apache.org</url>
@@ -23,7 +23,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.0.0-beta.7</version>
<version>1.0.0-beta.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>

View file

@@ -16,6 +16,7 @@ import org.apache.avro.message.SchemaStore;
public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = 3655407841714098520L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.azure.schemaregistry.samples\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"description\",\"type\":\"string\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

View file

@@ -20,7 +20,6 @@ public class App {
// Schema Registry specific properties
String registryUrl = props.getProperty("schema.registry.url");
String schemaGroup = props.getProperty("schema.group");
TokenCredential credential;
if (props.getProperty("use.managed.identity.credential").equals("true")) {
@@ -52,4 +51,3 @@ public class App {
}
}
}

View file

@@ -4,14 +4,8 @@ import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
@@ -20,53 +14,6 @@ import java.util.Properties;
public class KafkaAvroGenericRecord {
private static final ClientLogger logger = new ClientLogger(KafkaAvroGenericRecord.class);
static void produceGenericRecords(String brokerUrl, String registryUrl, String jaasConfig, String topicName, String schemaGroup, TokenCredential credential) {
Properties props = new Properties();
// EH Kafka Configs
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasConfig);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// Schema Registry configs
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, schemaGroup);
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
String key = "key1";
String userSchema = "{\"type\":\"record\",\"name\":\"AvroUser\",\"namespace\":\"com.azure.schemaregistry.samples\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favoriteNumber\",\"type\":\"int\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
logger.info("Parsed schema: {}", schema);
while (true) {
for (int i = 0; i < 10; i++) {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "user" + i);
avroRecord.put("favoriteNumber", i);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topicName, key, avroRecord);
producer.send(record);
logger.info("Sent GenericRecord: {}", record);
}
producer.flush();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static void consumeGenericRecords(String brokerUrl, String registryUrl, String jaasConfig, String topicName, TokenCredential credential) {
Properties props = new Properties();
@@ -85,7 +32,7 @@ public class KafkaAvroGenericRecord {
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
@@ -99,4 +46,4 @@ public class KafkaAvroGenericRecord {
consumer.close();
}
}
}
}

View file

@@ -3,8 +3,6 @@ package com.azure.schemaregistry.samples.consumer;
import com.azure.core.credential.TokenCredential;
import com.azure.schemaregistry.samples.Order;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.consumer.*;
import com.azure.core.util.logging.ClientLogger;
@@ -12,14 +10,6 @@ import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Current workaround for bug in Avro Serializer (https://github.com/Azure/azure-sdk-for-java/issues/27602)
* will produce GenericRecords regardless of the value of the KafkaAvroDeserializerConfig.AVRO_SPECIFIC_READER_CONFIG property.
* Bug fixed in serializer beta.11:
* (https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md)
* Implementation of fix will be added in future update.
*/
public class KafkaAvroSpecificRecord {
private static final ClientLogger logger = new ClientLogger(KafkaAvroSpecificRecord.class);
@@ -35,27 +25,21 @@ public class KafkaAvroSpecificRecord {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);
props.put("schema.registry.url", registryUrl);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
props.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_READER_CONFIG, true);
final Consumer<String, Order> consumer = new KafkaConsumer<String, Order>(props);
// Specify class to deserialize record into (defaults to Object.class)
props.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_VALUE_TYPE_CONFIG, Order.class);
final KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, Order> record : records) {
/**
* Current workaround for bug in Avro Serializer (https://github.com/Azure/azure-sdk-for-java/issues/27602)
* will produce GenericRecords regardless of the value of the KafkaAvroDeserializerConfig.AVRO_SPECIFIC_READER_CONFIG property.
* Bug fixed in serializer beta.11:
* (https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md)
* Implementation of fix with native SpecificRecord support will be added in future update.
*/
Order order = (Order) SpecificData.get().deepCopy(Order.SCHEMA$, (GenericRecord) record.value());
logger.info("Order received : " + order.toString());
logger.info("Order received: " + record.value());
}
}
} finally {
@@ -63,4 +47,3 @@ public class KafkaAvroSpecificRecord {
}
}
}
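
Pieced together from the hunks above, the updated consumer path now reads roughly as follows (a sketch: registryUrl, credential, topicName, and logger come from the surrounding sample class, and the Event Hubs SASL settings shown elsewhere in the samples are omitted for brevity):

Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
// Specify the class to deserialize records into (defaults to Object.class).
props.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_VALUE_TYPE_CONFIG, Order.class);

final KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
    while (true) {
        for (ConsumerRecord<String, Order> record : consumer.poll(Duration.ofMillis(5000))) {
            logger.info("Order received: " + record.value());
        }
    }
} finally {
    consumer.close();
}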

View file

@@ -9,7 +9,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-avro-samples</artifactId>
<packaging>jar</packaging>
<version>1.0.0-beta.5</version>
<version>1.0.0-beta.6</version>
<name>azure-schemaregistry-avro-samples</name>
<url>http://maven.apache.org</url>
@@ -23,7 +23,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.0.0-beta.7</version>
<version>1.0.0-beta.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>

View file

@@ -52,4 +52,3 @@ public class App {
}
}
}

View file

@@ -50,7 +50,7 @@ public class KafkaAvroGenericRecord {
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("id", "ID-" + i);
avroRecord.put("amount", 20.00 + i);
avroRecord.put("description", "Sample order -" + i);
avroRecord.put("description", "Sample order " + i);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topicName, key, avroRecord);
producer.send(record);
@@ -65,6 +65,4 @@ public class KafkaAvroGenericRecord {
}
}
}
}
}

View file

@@ -37,20 +37,23 @@ public class KafkaAvroSpecificRecord {
String key = "sample-key";
while (true) {
for (int i = 0; i < 10; i++) {
Order order = new Order("ID-" + i, 10.00 + i, "Sample order -" + i);
ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
producer.send(record);
logger.info("Sent Order {}", order);
}
producer.flush();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
try {
while (true) {
for (int i = 0; i < 10; i++) {
Order order = new Order("ID-" + i, 10.00 + i, "Sample order " + i);
ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
producer.send(record);
logger.info("Sent Order {}", order);
}
producer.flush();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
producer.close();
}
}
}

View file

@@ -9,10 +9,10 @@ import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
@@ -25,7 +25,7 @@ import java.util.Map;
*
* @see KafkaAvroSerializer See serializer class for upstream serializer implementation
*/
public class KafkaAvroDeserializer implements Deserializer<Object> {
public class KafkaAvroDeserializer<T extends IndexedRecord> implements Deserializer<T> {
private SchemaRegistryApacheAvroSerializer serializer;
private KafkaAvroDeserializerConfig config;
@@ -65,7 +65,7 @@ public class KafkaAvroDeserializer implements Deserializer<Object> {
* @return deserialized object, may be null
*/
@Override
public Object deserialize(String topic, byte[] bytes) {
public T deserialize(String topic, byte[] bytes) {
return null;
}
@@ -77,7 +77,7 @@ public class KafkaAvroDeserializer implements Deserializer<Object> {
* @return deserialized object, may be null
*/
@Override
public Object deserialize(String topic, Headers headers, byte[] bytes) {
public T deserialize(String topic, Headers headers, byte[] bytes) {
MessageWithMetadata message = new MessageWithMetadata();
message.setBodyAsBinaryData(BinaryData.fromBytes(bytes));
@@ -88,7 +88,9 @@ public class KafkaAvroDeserializer implements Deserializer<Object> {
message.setContentType("");
}
return this.serializer.deserializeMessageData(message, TypeReference.createInstance(Object.class));
return (T) this.serializer.deserializeMessageData(
message,
TypeReference.createInstance(this.config.getAvroSpecificType()));
}
@Override

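The cast to T in the new return statement is necessarily unchecked: the type parameter is erased at runtime, so correctness depends on the configured class matching the consumer's declared value type. An equivalent long-form of that return statement, spelling the cast out:

// Long-form of the return above; field names (config, serializer) as in the diff.
Class<?> targetType = this.config.getAvroSpecificType(); // Object.class unless configured
Object result = this.serializer.deserializeMessageData(
    message, TypeReference.createInstance(targetType));
@SuppressWarnings("unchecked")
T typed = (T) result; // safe only if the configured class matches the consumer's T
return typed;
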
View file

@@ -19,6 +19,8 @@ public final class KafkaAvroDeserializerConfig extends AbstractKafkaSerdeConfig
public static final Boolean AVRO_SPECIFIC_READER_CONFIG_DEFAULT = false;
public static final String AVRO_SPECIFIC_VALUE_TYPE_CONFIG = "specific.avro.value.type";
KafkaAvroDeserializerConfig(Map<String, Object> props) {
super(props);
}
@@ -30,4 +32,11 @@ public final class KafkaAvroDeserializerConfig extends AbstractKafkaSerdeConfig
return (Boolean) this.getProps().getOrDefault(
AVRO_SPECIFIC_READER_CONFIG, AVRO_SPECIFIC_READER_CONFIG_DEFAULT);
}
/**
* @return the Avro specific value type, with default set to Object.class
*/
public Class<?> getAvroSpecificType() {
return (Class<?>) this.getProps().getOrDefault(AVRO_SPECIFIC_VALUE_TYPE_CONFIG, Object.class);
}
}
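
The getter's fallback keeps the old behavior intact: with no value set, deserialization still targets Object (the generic path). A small illustration of that contract; since the constructor is package-private, assume this runs inside com.microsoft.azure.schemaregistry.kafka.avro, with Order standing in for any generated SpecificRecord class:

Map<String, Object> bare = new HashMap<>();
assert new KafkaAvroDeserializerConfig(bare).getAvroSpecificType() == Object.class; // default: generic path

Map<String, Object> specific = new HashMap<>();
specific.put(KafkaAvroDeserializerConfig.AVRO_SPECIFIC_VALUE_TYPE_CONFIG, Order.class);
assert new KafkaAvroDeserializerConfig(specific).getAvroSpecificType() == Order.class; // opted in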

View file

@@ -24,7 +24,7 @@ import java.util.Map;
*
* @see KafkaAvroDeserializer See deserializer class for downstream deserializer implementation
*/
public class KafkaAvroSerializer implements Serializer<Object> {
public class KafkaAvroSerializer<T> implements Serializer<T> {
private SchemaRegistryApacheAvroSerializer serializer;
/**
@@ -70,7 +70,7 @@ public class KafkaAvroSerializer implements Serializer<Object> {
* @throws SerializationException Exception catchable by core Kafka producer code
*/
@Override
public byte[] serialize(String topic, Object record) {
public byte[] serialize(String topic, T record) {
return null;
}
@@ -87,7 +87,7 @@ public class KafkaAvroSerializer implements Serializer<Object> {
* @throws SerializationException Exception catchable by core Kafka producer code
*/
@Override
public byte[] serialize(String topic, Headers headers, Object record) {
public byte[] serialize(String topic, Headers headers, T record) {
// null needs to be treated specially since the client most likely just wants to send
// an individual null value instead of making the subject a null type. Also, null in
// Kafka has a special meaning for deletion in a topic with the compact retention policy.
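
On the producer side, the matching type parameter lets the value serializer type-check against whatever record type is sent. A minimal producer sketch reusing only property names that appear in the samples above (brokerUrl, registryUrl, schemaGroup, credential, and topicName assumed from the surrounding sample code):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, schemaGroup);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>(topicName, "sample-key", new Order("ID-1", 11.00, "Sample order 1")));
producer.flush();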