Upgrade Java Avro dependency to 1.0.0-beta.4 (#9)

This commit is contained in:
Arthur Erlendsson 2020-09-22 22:19:55 -07:00 committed by GitHub
Parent 00fc6a8a67
Commit 05161aee96
No key found matching this signature
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 112 additions and 84 deletions

README.md

@@ -11,10 +11,22 @@ Azure Schema Registry provides:
- Kafka and AMQP client plugins for serialization and deserialization
- Role-based access control for schemas and schema groups
If you are interested in participating in the Schema Registry private preview program, [contact the Schema Registry team](mailto:askEHSchemaRegistry@microsoft.com) for more information.
An overview of Azure Schema Registry can be found on the [Event Hubs docs page](https://docs.microsoft.com/en-us/azure/event-hubs/schema-registry-overview).
Sample code can be found in the implementation-level folders (e.g. [avro](avro/samples)).
# Implementations
Each Kafka Schema Registry serializer will be backed by common code hosted in the central Azure SDK repositories.
Base serialization implementations can be found at the following repository links by language:
- Java - [azure-sdk-for-java](https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/schemaregistry/azure-data-schemaregistry-avro)
- C# - [azure-sdk-for-net](https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro)
- Python - [azure-sdk-for-python](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer)
- JS/TS - [azure-sdk-for-js](https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/schemaregistry/schema-registry-avro)
Sample code can be found in the `/samples` directories of the repositories linked above.
# Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a

pom.xml

@@ -27,7 +27,7 @@
</parent>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.0.0-beta.2</version>
<version>1.0.0-beta.4</version>
<name>azure-schemaregistry-kafka-avro</name>
@@ -45,13 +45,33 @@
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry</artifactId>
<version>1.0.0-beta.2</version>
<artifactId>azure-data-schemaregistry-avro</artifactId>
<version>1.0.0-beta.4</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry-avro</artifactId>
<version>1.0.0-beta.2</version>
<artifactId>azure-core-serializer-avro-apache</artifactId>
<version>1.0.0-beta.3</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<!-- test dependencies -->

avro/samples/README.md

@@ -6,6 +6,8 @@ This tutorial requires an Event Hubs namespace with Schema Registry enabled.
Basic Kafka Java producer and consumer scenarios are covered in the [Java Kafka quickstart](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java).
Before running the sample, create a schema group with serialization type 'Avro'. This is most easily done through the Schema Registry blade in the Azure portal. Any compatibility mode is acceptable for this sample.
## Configuration
All required configurations can be found in the [configuration file](src/main/resources/app.properties).
@@ -16,7 +18,7 @@ The following Event Hub configurations are required:
The following Schema Registry configurations are required (a hedged example follows this list):
- `schema.registry.url` - Event Hubs namespace with Schema Registry enabled (does not have to be the same as `bootstrap.servers`)
- `schema.group` - Schema Registry group
- `schema.group` - Schema Registry group with serialization type 'Avro' (*must be created before the application runs*)
- `use.managed.identity.credential` - indicates that managed identity (MSI) credentials should be used; intended for MSI-enabled Azure VMs
- `tenant.id` - sets the tenant ID of the application
- `client.id` - sets the client ID of the application
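A minimal, non-authoritative sketch of loading these settings; the file path matches the configuration file referenced above, and the key values shown in comments are placeholders:

```java
// Sketch: load the settings listed above from app.properties.
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class AppConfiguration {
    public static Properties load() throws IOException {
        Properties props = new Properties();
        // Expected keys, mirroring the list above (all values are placeholders):
        //   schema.registry.url=https://<namespace>.servicebus.windows.net
        //   schema.group=<schema group with serialization type 'Avro'>
        //   use.managed.identity.credential=false
        //   tenant.id=<tenant-id>
        //   client.id=<client-id>
        try (FileInputStream in = new FileInputStream("src/main/resources/app.properties")) {
            props.load(in);
        }
        return props;
    }
}
```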

avro/samples/pom.xml

@@ -9,7 +9,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-avro-samples</artifactId>
<packaging>jar</packaging>
<version>1.0.0-beta.2</version>
<version>1.0.0-beta.3</version>
<name>azure-schemaregistry-avro-samples</name>
<url>http://maven.apache.org</url>
@@ -23,7 +23,12 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.0.0-beta.2</version>
<version>1.0.0-beta.4</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-serializer-avro-apache</artifactId>
<version>1.0.0-beta.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -35,6 +40,11 @@
<artifactId>jackson-core</artifactId>
<version>2.10.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -81,7 +91,7 @@
<includes>
<include>AvroUser.avsc</include>
</includes>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
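With the generated sources now emitted into `src/main/java`, sample code can refer to the generated class directly. A hedged sketch follows; the `name` field is an assumption about `AvroUser.avsc`, which this diff does not show:

```java
// Hypothetical use of the SpecificRecord class generated from AvroUser.avsc.
// The 'name' field is assumed; check the actual schema for real field names.
AvroUser user = AvroUser.newBuilder()
        .setName("alice")
        .build();
```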

KafkaAvroGenericRecord.java

@@ -14,7 +14,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
@@ -38,6 +37,7 @@ public class KafkaAvroGenericRecord {
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
props.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, schemaGroup);
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
String key = "key1";
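// Illustrative continuation, not part of this diff: build a GenericRecord and send it.
// The inline schema, its 'name' field, and the 'topicName' variable are assumptions.
String schemaString = "{\"type\":\"record\",\"name\":\"AvroUser\","
    + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(schemaString);
GenericRecord avroRecord = new org.apache.avro.generic.GenericData.Record(schema);
avroRecord.put("name", "alice");
producer.send(new ProducerRecord<String, GenericRecord>(topicName, key, avroRecord));
producer.flush();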

AbstractKafkaSerdeConfig.java

@@ -4,7 +4,7 @@
package com.microsoft.azure.schemaregistry.kafka.avro;
import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClient;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import java.util.Map;
@@ -25,12 +25,12 @@ class AbstractKafkaSerdeConfig {
* Required.
*
* Sets the {@link TokenCredential} to use when authenticating HTTP requests for this
* * {@link CachedSchemaRegistryClient}.
* {@link SchemaRegistryClient}.
*/
public static final String SCHEMA_REGISTRY_CREDENTIAL_CONFIG = "schema.registry.credential";
/**
* Schema cache size limit on underlying {@link CachedSchemaRegistryClient}. If limit is exceeded on any cache,
* Schema cache size limit on underlying {@link SchemaRegistryClient}. If limit is exceeded on any cache,
* all caches are recycled.
*/
public static final String MAX_SCHEMA_MAP_SIZE_CONFIG = "max.schema.map.size";
@@ -38,7 +38,7 @@ class AbstractKafkaSerdeConfig {
public static final Integer MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT = 1000;
AbstractKafkaSerdeConfig(Map<String, ?> props) {
AbstractKafkaSerdeConfig(Map<String, Object> props) {
this.props = (Map<String, Object>) props;
}

KafkaAvroDeserializer.java

@@ -3,13 +3,13 @@
package com.microsoft.azure.schemaregistry.kafka.avro;
import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.AbstractDataDeserializer;
import com.azure.data.schemaregistry.avro.AvroByteDecoder;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
import org.apache.kafka.common.errors.SerializationException;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.util.Map;
/**
@@ -20,11 +20,10 @@ import java.util.Map;
* Receiving Avro GenericRecords and SpecificRecords is supported. Avro reflection capabilities have been disabled on
* com.azure.schemaregistry.kafka.KafkaAvroSerializer.
*
* @see AbstractDataDeserializer See abstract parent class for implementation details
* @see KafkaAvroSerializer See serializer class for upstream serializer implementation
*/
public class KafkaAvroDeserializer extends AbstractDataDeserializer
implements Deserializer<Object> {
public class KafkaAvroDeserializer implements Deserializer<Object> {
private SchemaRegistryAvroSerializer serializer;
/**
* Empty constructor used by Kafka consumer
@@ -43,21 +42,16 @@ public class KafkaAvroDeserializer extends AbstractDataDeserializer
* @see KafkaAvroDeserializerConfig Deserializer will use configs found in here and inherited classes.
*/
public void configure(Map<String, ?> props, boolean isKey) {
KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
String registryUrl = config.getSchemaRegistryUrl();
TokenCredential credential = config.getCredential();
Integer maxSchemaMapSize = config.getMaxSchemaMapSize();
KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);
Boolean useSpecificAvroReader = config.getAvroSpecificReader();
AvroByteDecoder decoder = new AvroByteDecoder(useSpecificAvroReader);
this.schemaRegistryClient = new CachedSchemaRegistryClientBuilder()
.endpoint(registryUrl)
.credential(credential)
.maxSchemaMapSize(maxSchemaMapSize)
.buildClient();
this.loadByteDecoder(decoder);
this.serializer = new SchemaRegistryAvroSerializerBuilder()
.schemaRegistryAsyncClient(new SchemaRegistryClientBuilder()
.endpoint(config.getSchemaRegistryUrl())
.credential(config.getCredential())
.maxCacheSize(config.getMaxSchemaMapSize())
.buildAsyncClient())
.avroSpecificReader(config.getAvroSpecificReader())
.buildSerializer();
}
/**
@@ -65,15 +59,11 @@ public class KafkaAvroDeserializer extends AbstractDataDeserializer
* @param topic topic associated with the record bytes
* @param bytes serialized bytes, may be null
* @return deserialized object, may be null
* @throws SerializationException catchable by core Kafka fetcher code
*/
@Override
public Object deserialize(String topic, byte[] bytes) throws SerializationException {
try {
return deserialize(bytes);
} catch (com.azure.data.schemaregistry.SerializationException e) {
throw new SerializationException(e.getCause());
}
public Object deserialize(String topic, byte[] bytes) {
if (bytes == null) {
return null; // tombstone/null payloads pass through as null
}
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
return serializer.deserialize(in, TypeReference.createInstance(Object.class));
}
@Override

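For orientation, a hedged consumer-side sketch of the rewritten deserializer; imports and the Event Hubs SASL settings are omitted, the endpoint and topic are placeholders, `credential` stands in for a `TokenCredential`, and the URL/credential constants are assumed to be inherited from `AbstractKafkaSerdeConfig`:

```java
// Hypothetical consumer wiring for KafkaAvroDeserializer.
Properties props = new Properties();
props.put("bootstrap.servers", "<namespace>.servicebus.windows.net:9093"); // SASL settings omitted
props.put("group.id", "$Default");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://<namespace>.servicebus.windows.net");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
consumer.subscribe(java.util.Collections.singletonList("<topic>"));
consumer.poll(java.time.Duration.ofSeconds(10))
        .forEach(record -> System.out.println(record.value()));
```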
KafkaAvroDeserializerConfig.java

@@ -19,7 +19,7 @@ public final class KafkaAvroDeserializerConfig extends AbstractKafkaSerdeConfig
public static final Boolean AVRO_SPECIFIC_READER_CONFIG_DEFAULT = false;
KafkaAvroDeserializerConfig(Map<String, ?> props) {
KafkaAvroDeserializerConfig(Map<String, Object> props) {
super(props);
}

KafkaAvroSerializer.java

@@ -3,13 +3,13 @@
package com.microsoft.azure.schemaregistry.kafka.avro;
import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.AbstractDataSerializer;
import com.azure.data.schemaregistry.avro.AvroByteEncoder;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.util.Map;
/**
@@ -20,11 +20,10 @@ import java.util.Map;
*
* Currently, sending Avro GenericRecords and SpecificRecords is supported. Avro reflection has been disabled.
*
* @see AbstractDataSerializer See abstract parent class for implementation details
* @see KafkaAvroDeserializer See deserializer class for downstream deserializer implementation
*/
public class KafkaAvroSerializer extends AbstractDataSerializer
implements Serializer<Object> {
public class KafkaAvroSerializer implements Serializer<Object> {
private SchemaRegistryAvroSerializer serializer;
/**
* Empty constructor for Kafka producer
@@ -44,25 +43,17 @@ public class KafkaAvroSerializer extends AbstractDataSerializer
*/
@Override
public void configure(Map<String, ?> props, boolean isKey) {
KafkaAvroSerializerConfig config = new KafkaAvroSerializerConfig(props);
String registryUrl = config.getSchemaRegistryUrl();
TokenCredential credential = config.getCredential();
Integer maxSchemaMapSize = config.getMaxSchemaMapSize();
KafkaAvroSerializerConfig config = new KafkaAvroSerializerConfig((Map<String, Object>) props);
CachedSchemaRegistryClientBuilder builder = new CachedSchemaRegistryClientBuilder()
.endpoint(registryUrl)
.credential(credential);
if (maxSchemaMapSize != null) {
builder.maxSchemaMapSize(maxSchemaMapSize);
}
this.schemaRegistryClient = builder.buildClient();
this.schemaGroup = config.getSchemaGroup();
this.autoRegisterSchemas = config.getAutoRegisterSchemas();
this.setByteEncoder(new AvroByteEncoder());
this.serializer = new SchemaRegistryAvroSerializerBuilder()
.schemaRegistryAsyncClient(new SchemaRegistryClientBuilder()
.endpoint(config.getSchemaRegistryUrl())
.credential(config.getCredential())
.maxCacheSize(config.getMaxSchemaMapSize())
.buildAsyncClient())
.schemaGroup(config.getSchemaGroup())
.autoRegisterSchema(config.getAutoRegisterSchemas())
.buildSerializer();
}
@@ -78,7 +69,7 @@ public class KafkaAvroSerializer extends AbstractDataSerializer
* @throws SerializationException Exception catchable by core Kafka producer code
*/
@Override
public byte[] serialize(String topic, Object record) throws SerializationException {
public byte[] serialize(String topic, Object record) {
// null needs to be treated specially since the client most likely just wants to send
// an individual null value instead of making the subject a null type. Also, null in
// Kafka has a special meaning for deletion in a topic with the compact retention policy.
@@ -88,12 +79,9 @@ public class KafkaAvroSerializer extends AbstractDataSerializer
return null;
}
try {
return serializeImpl(record);
} catch (com.azure.data.schemaregistry.SerializationException e) {
// convert into kafka exception
throw new SerializationException(e.getCause());
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.serialize(out, record);
return out.toByteArray();
}
@Override

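A consequence of the null passthrough above, sketched with placeholder names (the topic string is assumed; the producer is configured as in the samples):

```java
// Sending a null value produces a Kafka tombstone: on a topic with
// cleanup.policy=compact this marks "key1" for deletion.
producer.send(new ProducerRecord<String, GenericRecord>("<compacted-topic>", "key1", null));
```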
KafkaAvroSerializerConfig.java

@@ -32,9 +32,7 @@ public final class KafkaAvroSerializerConfig extends AbstractKafkaSerdeConfig {
*/
public static final String SCHEMA_GROUP_CONFIG = "schema.group";
public static final String SCHEMA_GROUP_CONFIG_DEFAULT = "$default";
KafkaAvroSerializerConfig(Map<String, ?> props) {
KafkaAvroSerializerConfig(Map<String, Object> props) {
super(props);
}
@@ -47,9 +45,12 @@ public final class KafkaAvroSerializerConfig extends AbstractKafkaSerdeConfig {
}
/**
* @return schema group, with default group name set to '$default'
* @return schema group
*/
public String getSchemaGroup() {
return (String) this.getProps().getOrDefault(SCHEMA_GROUP_CONFIG, SCHEMA_GROUP_CONFIG_DEFAULT);
if (!this.getProps().containsKey(SCHEMA_GROUP_CONFIG)) {
throw new NullPointerException("Schema group configuration property is required.");
}
return (String) this.getProps().get(SCHEMA_GROUP_CONFIG);
}
}

KafkaAvroSerializerConfigTest.java

@@ -17,8 +17,13 @@ public class KafkaAvroSerializerConfigTest {
}
@Test
public void testGetSchemaGroupDefault() {
public void testNoSchemaGroupProvidedThenFail() {
KafkaAvroSerializerConfig config = new KafkaAvroSerializerConfig(Collections.emptyMap());
assertEquals(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG_DEFAULT, config.getSchemaGroup());
try {
config.getSchemaGroup();
fail("Expected NullPointerException when schema.group is not configured");
} catch (NullPointerException e) {
// expected
}
}
}