Kafka Java library update for JSON support (#41)

* Add Json serializer and deserializer

* Update sample producer and consumer

* Add basic serde tests

* Add custom serializer exception and update samples

* Update exception to extend Kafka exception type

* Update exceptions and add telemetry
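For context, a minimal sketch of how the new serializer is wired into a Kafka producer, drawn from the updated samples in this change; the bootstrap server, registry endpoint, schema group, and topic name below are placeholders, not values from this commit:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.schemaregistry.samples.CustomerInvoice;
import com.microsoft.azure.schemaregistry.kafka.json.KafkaJsonSerializer;

public class JsonProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<namespace>.servicebus.windows.net:9093"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
        // Schema Registry configs read by the new serde classes
        props.put("schema.registry.url", "https://<namespace>.servicebus.windows.net"); // placeholder endpoint
        props.put("schema.registry.credential", new DefaultAzureCredentialBuilder().build());
        props.put("schema.group", "<schema-group>"); // placeholder group
        props.put("auto.register.schemas", true);

        try (KafkaProducer<String, CustomerInvoice> producer = new KafkaProducer<>(props)) {
            CustomerInvoice invoice = new CustomerInvoice("Invoice 1", "Merchant Id 1", 1, "User Id 1");
            producer.send(new ProducerRecord<>("<topic>", "sample-key", invoice)); // placeholder topic
            producer.flush();
        }
        // The consumer side mirrors this, using KafkaJsonDeserializer plus the
        // "specific.value.type" property to pick the target class (see the updated sample consumer below).
    }
}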
Nick Hardwick 2023-03-23 16:33:43 -07:00 committed by GitHub
Parent 5d431d0860
Commit b370a88a65
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
30 changed files: 712 additions and 284 deletions

View file

@@ -19,11 +19,65 @@
<tag>HEAD</tag>
</scm>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.35.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry</artifactId>
<version>1.4.0-beta.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.github.victools</groupId>
<artifactId>jsonschema-generator</artifactId>
<version>4.28.0</version>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
<version>1.0.76</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
</dependencies>

View file

@@ -20,31 +20,21 @@
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry</artifactId>
<version>1.4.0-beta.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.github.victools</groupId>
<artifactId>jsonschema-generator</artifactId>
<version>4.28.0</version>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-json</artifactId>
<version>1.0.0-beta.1</version>
</dependency>
</dependencies>
</project>

View file

@@ -0,0 +1,49 @@
package com.azure.schemaregistry.samples;
public class CustomerInvoice {
private String invoiceId;
private String merchantId;
private int transactionValueUsd;
private String userId;
public CustomerInvoice() {}
public CustomerInvoice(String invoiceId, String merchantId, int transactionValueUsd, String userId) {
this.invoiceId = invoiceId;
this.merchantId = merchantId;
this.transactionValueUsd = transactionValueUsd;
this.userId = userId;
}
public String getInvoiceId() {
return this.invoiceId;
}
public void setInvoiceId(String id) {
this.invoiceId = id;
}
public String getMerchantId() {
return this.merchantId;
}
public void setMerchantId(String id) {
this.merchantId = id;
}
public int getTransactionValueUsd() {
return this.transactionValueUsd;
}
public void setTransactionValueUsd(int transactionValueUsd) {
this.transactionValueUsd = transactionValueUsd;
}
public String getUserId() {
return this.userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
}

View file

@@ -45,7 +45,7 @@ public class App
Scanner in = new Scanner(System.in);
System.out.println("Enter case number:");
System.out.println("1 - consume Avro SpecificRecords");
System.out.println("1 - Consume SpecificRecords");
int caseNum = in.nextInt();
switch (caseNum) {

View file

@@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.schemaregistry.samples.CustomerInvoice;
import com.azure.schemaregistry.samples.Order;
public class KafkaJsonSpecificRecord {
@@ -29,24 +30,23 @@ public class KafkaJsonSpecificRecord {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
com.azure.schemaregistry.samples.consumer.KafkaJsonDeserializer.class);
com.microsoft.azure.schemaregistry.kafka.json.KafkaJsonDeserializer.class);
// Schema Registry configs
props.put("schema.registry.url", registryUrl);
props.put("schema.registry.credential", credential);
props.put("auto.register.schemas", true);
props.put("specific.value.type", Order.class);
props.put("specific.value.type", CustomerInvoice.class);
final KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
final KafkaConsumer<String, CustomerInvoice> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
System.out.println("Reading records...");
while (true) {
ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, Order> record : records) {
logger.info("Order received: " + record.value().toString());
System.out.println("Order received: " + record.value().toString());
ConsumerRecords<String, CustomerInvoice> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, CustomerInvoice> record : records) {
logger.info("Invoice received: " + record.value());
}
}
} finally {

View file

@@ -20,36 +20,21 @@
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-schemaregistry</artifactId>
<version>1.4.0-beta.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>com.github.victools</groupId>
<artifactId>jsonschema-generator</artifactId>
<version>4.28.0</version>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
<version>1.0.76</version>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-json</artifactId>
<version>1.0.0-beta.1</version>
</dependency>
</dependencies>
</project>

View file

@@ -0,0 +1,49 @@
package com.azure.schemaregistry.samples;
public class CustomerInvoice {
private String invoiceId;
private String merchantId;
private int transactionValueUsd;
private String userId;
public CustomerInvoice() {}
public CustomerInvoice(String invoiceId, String merchantId, int transactionValueUsd, String userId) {
this.invoiceId = invoiceId;
this.merchantId = merchantId;
this.transactionValueUsd = transactionValueUsd;
this.userId = userId;
}
public String getInvoiceId() {
return this.invoiceId;
}
public void setInvoiceId(String id) {
this.invoiceId = id;
}
public String getMerchantId() {
return this.merchantId;
}
public void setMerchantId(String id) {
this.merchantId = id;
}
public int getTransactionValueUsd() {
return this.transactionValueUsd;
}
public void setTransactionValueUsd(int transactionValueUsd) {
this.transactionValueUsd = transactionValueUsd;
}
public String getUserId() {
return this.userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
}

View file

@@ -28,10 +28,6 @@ public class App
credential = new ManagedIdentityCredentialBuilder()
.clientId(props.getProperty("managed.identity.clientId"))
.build();
} else if (props.getProperty("managed.identity.resourceId") != null) {
credential = new ManagedIdentityCredentialBuilder()
.resourceId(props.getProperty("managed.identity.resourceId"))
.build();
} else {
credential = new ManagedIdentityCredentialBuilder().build();
}
@@ -46,7 +42,7 @@ public class App
Scanner in = new Scanner(System.in);
System.out.println("Enter case number:");
System.out.println("1 - produce Avro SpecificRecords");
System.out.println("1 - Produce SpecificRecords");
int caseNum = in.nextInt();
switch (caseNum) {

View file

@@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.schemaregistry.samples.CustomerInvoice;
import com.azure.schemaregistry.samples.Order;
public class KafkaJsonSpecificRecord {
@@ -25,14 +26,14 @@ public class KafkaJsonSpecificRecord {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
com.azure.schemaregistry.samples.producer.KafkaJsonSerializer.class);
com.microsoft.azure.schemaregistry.kafka.json.KafkaJsonSerializer.class);
// Schema Registry configs
props.put("schema.registry.url", registryUrl);
props.put("schema.registry.credential", credential);
props.put("auto.register.schemas", true);
props.put("schema.group", schemaGroup);
KafkaProducer<String, Order> producer = new KafkaProducer<String,Order>(props);
KafkaProducer<String, CustomerInvoice> producer = new KafkaProducer<String,CustomerInvoice>(props);
String key = "sample-key";
@@ -40,11 +41,10 @@ public class KafkaJsonSpecificRecord {
try {
while (true) {
for (int i = 0; i < 10; i++) {
Order order = new Order("ID-" + i, 0.99 + i, "Sample order #" + Math.abs(new Random().nextInt()));
ProducerRecord<String, Order> record = new ProducerRecord<String, Order>(topicName, key, order);
CustomerInvoice invoice = new CustomerInvoice("Invoice " + i, "Merchant Id " + i, i, "User Id " + i);
ProducerRecord<String, CustomerInvoice> record = new ProducerRecord<String, CustomerInvoice>(topicName, key, invoice);
producer.send(record);
logger.info("Sent Order {}", order);
System.out.println("Sent Order " + order.getId());
logger.info("Sent Order " + invoice.getInvoiceId());
}
producer.flush();
try {

View file

@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import java.util.Map;
/**
* Base configuration class holding the shared Azure Schema Registry client settings for the JSON serializer and deserializer.
*/
class AbstractKafkaSerdeConfig {
private Map<String, Object> props;
/**
* Required.
*
* Sets the service endpoint for the Azure Schema Registry instance
*/
public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
/**
* Required.
*
* Sets the {@link TokenCredential} to use when authenticating HTTP requests for this
* {@link SchemaRegistryClient}.
*/
public static final String SCHEMA_REGISTRY_CREDENTIAL_CONFIG = "schema.registry.credential";
/**
* Schema cache size limit on underlying {@link SchemaRegistryClient}. If limit is exceeded on any cache,
* all caches are recycled.
*/
public static final String MAX_SCHEMA_MAP_SIZE_CONFIG = "max.schema.map.size";
public static final Integer MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT = 1000;
AbstractKafkaSerdeConfig(Map<String, Object> props) {
this.props = (Map<String, Object>) props;
}
public Map<String, Object> getProps() {
return props;
}
public String getSchemaRegistryUrl() {
return (String) this.props.get(SCHEMA_REGISTRY_URL_CONFIG);
}
public TokenCredential getCredential() {
return (TokenCredential) this.props.get(SCHEMA_REGISTRY_CREDENTIAL_CONFIG);
}
public Integer getMaxSchemaMapSize() {
return (Integer) this.props.getOrDefault(MAX_SCHEMA_MAP_SIZE_CONFIG, MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT);
}
}
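For illustration, a minimal sketch of a properties map covering the three keys this base config defines; the endpoint is a placeholder and the credential is whichever TokenCredential the application already uses:

import java.util.HashMap;
import java.util.Map;
import com.azure.identity.DefaultAzureCredentialBuilder;

public class SerdeConfigSketch {
    public static Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", "https://<namespace>.servicebus.windows.net"); // placeholder endpoint
        props.put("schema.registry.credential", new DefaultAzureCredentialBuilder().build());
        props.put("max.schema.map.size", 1000); // optional; matches the built-in default of 1000
        return props;
    }
}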

View file

@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import org.apache.kafka.common.errors.SerializationException;
/**
* Custom error class for exceptions thrown in Json serialization/deserialization steps
*/
public class JsonSerializationException extends SerializationException {
/**
* Constructor with message only
* @param errorMessage Brief explanation of exception source.
*/
public JsonSerializationException(String errorMessage) {
super(errorMessage);
}
/**
* Constructor with message and throwable error
* @param errorMessage Brief explanation of exception source.
* @param err Throwable error object
*/
public JsonSerializationException(String errorMessage, Throwable err) {
super(errorMessage, err);
}
}
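Because the exception extends Kafka's SerializationException, existing error handling that catches the Kafka type keeps working; a minimal sketch of catching it around a send call, where the producer and record are assumed to be set up as in the samples:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.microsoft.azure.schemaregistry.kafka.json.JsonSerializationException;

public class SendWithHandlingSketch {
    static <K, V> void sendSafely(KafkaProducer<K, V> producer, ProducerRecord<K, V> record) {
        try {
            // Serialization happens synchronously inside send(), so the exception surfaces here.
            producer.send(record);
        } catch (JsonSerializationException e) {
            // Thrown by KafkaJsonSerializer when schema generation/registration or JSON encoding fails.
            System.err.println("Failed to serialize record: " + e.getMessage());
        }
    }
}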

View file

@@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import com.azure.core.util.ClientOptions;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
/**
* Deserializer implementation for Kafka consumer, implementing Kafka Deserializer interface.
*
* @see KafkaJsonSerializer See serializer class for upstream serializer implementation
*/
public class KafkaJsonDeserializer<T> implements Deserializer<T> {
private SchemaRegistryClient client;
private KafkaJsonDeserializerConfig config;
/**
* Empty constructor used by Kafka consumer
*/
public KafkaJsonDeserializer() {
super();
}
/**
* Configures deserializer instance.
*
* @param props Map of properties used to configure instance
* @param isKey Indicates if deserializing record key or value. Required by Kafka deserializer interface,
* no specific functionality has been implemented for key use.
*
* @see KafkaJsonDeserializerConfig Deserializer will use configs found in here and inherited classes.
*/
public void configure(Map<String, ?> props, boolean isKey) {
this.config = new KafkaJsonDeserializerConfig((Map<String, Object>) props);
this.client = new SchemaRegistryClientBuilder()
.fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
.credential(this.config.getCredential())
.clientOptions(new ClientOptions().setApplicationId("azsdk-java-KafkaJsonDeserializer/1.0.0-beta.1"))
.buildClient();
}
/**
* Deserializes byte array into Java object
* @param topic topic associated with the record bytes
* @param data serialized bytes, may be null
* @return deserialized object, may be null
*/
@Override
public T deserialize(String topic, byte[] data) {
return null;
}
/**
* Deserializes byte array into Java object
* @param topic topic associated with the record bytes
* @param headers record headers, may be null
* @param data serialized bytes, may be null
* @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code
* @return deserialized object, may be null
*/
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
T dataObject;
String schemaId;
try {
ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY));
dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType());
if (headers.lastHeader("schemaId") != null) {
schemaId = new String(headers.lastHeader("schemaId").value());
} else {
throw new JsonSerializationException("Schema Id was not found in record headers", null);
}
SchemaRegistrySchema schema = this.client.getSchema(schemaId);
JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
JsonSchema jSchema = factory.getSchema(schema.getDefinition());
JsonNode node = mapper.readTree(data);
Set<ValidationMessage> errors = jSchema.validate(node);
if (errors.size() == 0) {
return dataObject;
} else {
throw new JsonSerializationException("Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray()), null);
}
} catch (JsonSerializationException e) {
throw e;
} catch (Exception e) {
throw new JsonSerializationException("Exception occurred during deserialization", e);
}
}
@Override
public void close() { }
}

View file

@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import java.util.Map;
/**
* Class containing configuration properties for KafkaJsonDeserializer class.
*/
public final class KafkaJsonDeserializerConfig extends AbstractKafkaSerdeConfig {
/**
* Configures deserializer to decode into specific class instance when reading encoded bytes
*
* Defaults to Object
*/
public static final String SPECIFIC_VALUE_TYPE_CONFIG = "specific.value.type";
KafkaJsonDeserializerConfig(Map<String, Object> props) {
super(props);
}
/**
* @return Specific class flag, with default set to Object class
*/
public Class<?> getJsonSpecificType() {
return (Class<?>) this.getProps().getOrDefault(SPECIFIC_VALUE_TYPE_CONFIG, Object.class);
}
}

View file

@@ -0,0 +1,127 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import com.azure.core.util.ClientOptions;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import java.util.Map;
/**
* Serializer implementation for Kafka producer, implementing the Kafka Serializer interface.
*
* @see KafkaJsonDeserializer See deserializer class for downstream deserializer implementation
*/
public class KafkaJsonSerializer<T> implements Serializer<T> {
private SchemaRegistryClient client;
private String schemaGroup;
/**
* Empty constructor for Kafka producer
*/
public KafkaJsonSerializer() {
super();
}
/**
* Configures serializer instance.
*
* @param props Map of properties used to configure instance.
* @param isKey Indicates if serializing record key or value. Required by Kafka serializer interface,
* no specific functionality implemented for key use.
*
* @see KafkaJsonSerializerConfig Serializer will use configs found in KafkaJsonSerializerConfig.
*/
@Override
public void configure(Map<String, ?> props, boolean isKey) {
KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig((Map<String, Object>) props);
this.schemaGroup = config.getSchemaGroup();
this.client = new SchemaRegistryClientBuilder()
.fullyQualifiedNamespace(config.getSchemaRegistryUrl())
.credential(config.getCredential())
.clientOptions(new ClientOptions().setApplicationId("azsdk-java-KafkaJsonSerializer/1.0.0-beta.1"))
.buildClient();
}
/**
* Serializes into a byte array, containing a GUID reference to schema
* and the encoded payload.
*
* Null behavior matches Kafka treatment of null values.
*
* @param topic Topic destination for record. Required by Kafka serializer interface, currently not used.
* @param record Object to be serialized, may be null
* @return byte[] payload for sending to EH Kafka service, may be null
* @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code
*/
@Override
public byte[] serialize(String topic, T record) {
return null;
}
/**
* Serializes into a byte array, containing a GUID reference to schema
* and the encoded payload.
*
* Null behavior matches Kafka treatment of null values.
*
* @param topic Topic destination for record. Required by Kafka serializer interface, currently not used.
* @param record Object to be serialized, may be null
* @param headers Record headers, may be null
* @return byte[] payload for sending to EH Kafka service, may be null
* @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code
*/
@Override
public byte[] serialize(String topic, Headers headers, T record) {
if (record == null) {
return null;
}
byte[] recordBytes;
try {
ObjectMapper mapper = new ObjectMapper();
recordBytes = mapper.writeValueAsBytes(record);
SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(
SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON);
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(record.getClass());
String jsonSchemaString = jsonSchema.toString();
SchemaProperties schemaProps = this.client.registerSchema(
this.schemaGroup,
record.getClass().getName(),
jsonSchemaString,
SchemaFormat.JSON
);
headers.add("schemaId", schemaProps.getId().getBytes());
return recordBytes;
} catch (IllegalStateException e) {
throw new JsonSerializationException("Error occurred while generating schema", e);
} catch (JsonProcessingException e) {
throw new JsonSerializationException("Error occurred while serializing record into bytes", e);
} catch (Exception e) {
throw new JsonSerializationException("Exception occurred during serialization", e);
}
}
@Override
public void close() { }
}
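The serializer publishes the registered schema ID in a record header named "schemaId", which the KafkaJsonDeserializer in this change reads back before validating the payload. A minimal consumer-side sketch of inspecting that header, assuming the record comes from a poll loop as in the samples:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class SchemaIdHeaderSketch {
    static String schemaIdOf(ConsumerRecord<?, ?> record) {
        // Header name matches the one written by KafkaJsonSerializer and read by KafkaJsonDeserializer.
        Header header = record.headers().lastHeader("schemaId");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}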

View file

@@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import java.util.Map;
/**
* Class containing configuration properties for KafkaJsonSerializer class.
*/
public final class KafkaJsonSerializerConfig extends AbstractKafkaSerdeConfig {
/**
* If specified true, serializer will register schemas against Azure Schema Registry service under the specified
* group. See Azure Schema Registry documentation for a description of schema registration behavior.
*
* If specified false, serializer will simply query the service for an existing ID given schema content.
* Serialization will fail if the schema has not been pre-created.
*
* Auto-registration is **NOT RECOMMENDED** for production scenarios.
*
* Requires type String.
*/
public static final String AUTO_REGISTER_SCHEMAS_CONFIG = "auto.register.schemas";
public static final Boolean AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT = false;
/**
* Specifies schema group for interacting with Azure Schema Registry service.
*
* If auto-registering schemas, schema will be stored under this group.
* If not auto-registering, serializer will request schema ID for matching data schema under specified group.
*/
public static final String SCHEMA_GROUP_CONFIG = "schema.group";
KafkaJsonSerializerConfig(Map<String, Object> props) {
super(props);
}
/**
* @return auto-registration flag, with default set to false
*/
public Boolean getAutoRegisterSchemas() {
return (Boolean) this.getProps().getOrDefault(
AUTO_REGISTER_SCHEMAS_CONFIG, AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT);
}
/**
* @return schema group
*/
public String getSchemaGroup() {
if (!this.getProps().containsKey(SCHEMA_GROUP_CONFIG)) {
throw new NullPointerException("Schema group configuration property is required.");
}
return (String) this.getProps().get(SCHEMA_GROUP_CONFIG);
}
}

View file

@@ -0,0 +1,4 @@
/**
* Package containing Json-specific implementations of Kafka serializer and deserializer.
*/
package com.microsoft.azure.schemaregistry.kafka.json;

View file

@@ -1,83 +0,0 @@
package com.azure.schemaregistry.samples.consumer;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.models.SchemaRegistrySchema;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
public class KafkaJsonDeserializer<T> implements Deserializer<T> {
private SchemaRegistryClient client;
private KafkaJsonDeserializerConfig config;
public KafkaJsonDeserializer() {
super();
}
public void configure(Map<String, ?> props, boolean isKey) {
this.config = new KafkaJsonDeserializerConfig((Map<String, Object>) props);
this.client = new SchemaRegistryClientBuilder()
.fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
.credential(this.config.getCredential())
.buildClient();
}
@Override
public T deserialize(String topic, byte[] data) {
return null;
}
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
T dataObject;
String schemaId;
ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY));
try {
dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType());
} catch (Exception e) {
e.printStackTrace();
throw new Error(e);
}
if (headers.lastHeader("schemaId") != null) {
schemaId = new String(headers.lastHeader("schemaId").value());
} else {
throw new RuntimeException("Schema Id was not found in record headers.");
}
SchemaRegistrySchema schema = this.client.getSchema(schemaId);
JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
JsonSchema jSchema = factory.getSchema(schema.getDefinition());
JsonNode node;
try {
node = mapper.readTree(data);
} catch (Exception e) {
e.printStackTrace();
throw new Error(e);
}
Set<ValidationMessage> errors = jSchema.validate(node);
if (errors.size() == 0) {
return dataObject;
} else {
throw new RuntimeException("Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray()));
}
}
@Override
public void close() {}
}

View file

@@ -1,24 +0,0 @@
package com.azure.schemaregistry.samples.consumer;
import java.util.Map;
import com.azure.core.credential.TokenCredential;
public class KafkaJsonDeserializerConfig {
private Map<String, Object> props;
KafkaJsonDeserializerConfig(Map<String, Object> props) {
this.props = (Map<String, Object>) props;
}
public String getSchemaRegistryUrl() {
return (String) this.props.get("schema.registry.url");
}
public TokenCredential getCredential() {
return (TokenCredential) this.props.get("schema.registry.credential");
}
public Class<?> getJsonSpecificType() {
return (Class<?>) this.props.getOrDefault("specific.value.type", Object.class);
}
}

View file

@@ -1,80 +0,0 @@
package com.azure.schemaregistry.samples.producer;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import com.azure.data.schemaregistry.SchemaRegistryClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
public class KafkaJsonSerializer<T> implements Serializer<T> {
private SchemaRegistryClient client;
private String schemaGroup;
public KafkaJsonSerializer() {
super();
}
@Override
public void configure(Map<String, ?> props, boolean isKey) {
KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig((Map<String, Object>) props);
this.schemaGroup = config.getSchemaGroup();
this.client = new SchemaRegistryClientBuilder()
.fullyQualifiedNamespace(config.getSchemaRegistryUrl())
.credential(config.getCredential())
.buildClient();
}
@Override
public byte[] serialize(String topic, T data) {
return null;
}
@Override
public byte[] serialize(String topic, Headers headers, T record) {
if (record == null) {
return null;
}
byte[] recordBytes;
ObjectMapper mapper = new ObjectMapper();
try {
recordBytes = mapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
e.printStackTrace();
throw new Error(e);
}
SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON);
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(record.getClass());
String jsonSchemaString = jsonSchema.toString();
SchemaProperties schemaProps = this.client.registerSchema(
this.schemaGroup,
record.getClass().getName(),
jsonSchemaString,
SchemaFormat.JSON
);
headers.add("schemaId", schemaProps.getId().getBytes());
return recordBytes;
}
@Override
public void close() {}
}

View file

@@ -1,24 +0,0 @@
package com.azure.schemaregistry.samples.producer;
import java.util.Map;
import com.azure.core.credential.TokenCredential;
public class KafkaJsonSerializerConfig {
private Map<String, Object> props;
KafkaJsonSerializerConfig(Map<String, Object> props) {
this.props = (Map<String, Object>) props;
}
public String getSchemaRegistryUrl() {
return (String) this.props.get("schema.registry.url");
}
public TokenCredential getCredential() {
return (TokenCredential) this.props.get("schema.registry.credential");
}
public String getSchemaGroup() {
return (String) this.props.get("schema.group");
}
}

View file

@@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import com.azure.core.credential.TokenCredential;
public class AbstractKafkaSerdeConfigTest {
@Test
public void testMaxSchemaMapSizeDefault() {
AbstractKafkaSerdeConfig config = new AbstractKafkaSerdeConfig(Collections.emptyMap());
assertEquals(AbstractKafkaSerdeConfig.MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT, config.getMaxSchemaMapSize());
}
@Test
public void testSchemaRegistryUrl() {
String dummyString = "dummyString"; // does not get validated at this layer
AbstractKafkaSerdeConfig config = new KafkaJsonSerializerConfig(
Collections.singletonMap(AbstractKafkaSerdeConfig.SCHEMA_REGISTRY_URL_CONFIG, dummyString));
assertEquals(dummyString, config.getSchemaRegistryUrl());
}
@Test
public void testSchemaRegistryCredential() {
TokenCredential dummyCredential = tokenRequestContext -> null;
AbstractKafkaSerdeConfig config = new KafkaJsonSerializerConfig(
Collections.singletonMap(AbstractKafkaSerdeConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, dummyCredential));
assertEquals(dummyCredential, config.getCredential());
}
}

View file

@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.*;
public class KafkaJsonDeserializerConfigTest {
@Test
public void TestJsonSpecificTypeDefault() {
KafkaJsonDeserializerConfig config = new KafkaJsonDeserializerConfig(Collections.emptyMap());
assertEquals(Object.class, config.getJsonSpecificType());
}
}

View file

@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.*;
public class KafkaJsonSerializerConfigTest {
@Test
public void testGetAutoRegisterSchemasDefault() {
KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig(Collections.emptyMap());
assertEquals(KafkaJsonSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT, config.getAutoRegisterSchemas());
}
@Test
public void testNoSchemaGroupProvidedThenFail() {
KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig(Collections.emptyMap());
try {
config.getSchemaGroup();
fail();
} catch (NullPointerException e) {
assertTrue(true);
}
}
}

View file

@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.azure.schemaregistry.kafka.json;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KafkaJsonSerializerTest {
@Test
public void testNullRecordReturnNull() {
KafkaJsonSerializer serializer = new KafkaJsonSerializer();
assertEquals(null, serializer.serialize("dummy-topic", null));
}
}