diff --git a/java/json/pom.xml b/java/json/pom.xml index 45e58e5..76e6f0b 100644 --- a/java/json/pom.xml +++ b/java/json/pom.xml @@ -19,11 +19,65 @@ HEAD + + 1.8 + 1.8 + + - junit - junit - 3.8.1 + com.azure + azure-core + 1.35.0 + + + com.azure + azure-data-schemaregistry + 1.4.0-beta.1 + + + org.apache.kafka + kafka-clients + 3.3.2 + + + com.azure + azure-identity + 1.8.0 + + + com.fasterxml.jackson.core + jackson-core + 2.14.2 + + + com.github.victools + jsonschema-generator + 4.28.0 + + + com.networknt + json-schema-validator + 1.0.76 + + + + + org.junit.jupiter + junit-jupiter-api + 5.6.2 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.6.2 + test + + + org.junit.jupiter + junit-jupiter-params + 5.6.2 test diff --git a/java/json/src/samples/producer/pom.xml b/java/json/samples/comsumer/pom.xml similarity index 58% rename from java/json/src/samples/producer/pom.xml rename to java/json/samples/comsumer/pom.xml index 7101377..3b191f3 100644 --- a/java/json/src/samples/producer/pom.xml +++ b/java/json/samples/comsumer/pom.xml @@ -20,31 +20,21 @@ 3.8.1 test + + org.slf4j + slf4j-api + 2.0.6 + + + org.slf4j + slf4j-simple + 2.0.6 + - com.azure - azure-data-schemaregistry - 1.4.0-beta.1 - - - org.apache.kafka - kafka-clients - 3.3.2 - - - com.azure - azure-identity - 1.8.0 - - - com.fasterxml.jackson.core - jackson-core - 2.14.2 - - - com.github.victools - jsonschema-generator - 4.28.0 + com.microsoft.azure + azure-schemaregistry-kafka-json + 1.0.0-beta.1 diff --git a/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java new file mode 100644 index 0000000..0ba793b --- /dev/null +++ b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java @@ -0,0 +1,49 @@ +package com.azure.schemaregistry.samples; + +public class CustomerInvoice { + private String invoiceId; + private String merchantId; + private int transactionValueUsd; + private String userId; + + public CustomerInvoice() {} + + public CustomerInvoice(String invoiceId, String merchantId, int transactionValueUsd, String userId) { + this.invoiceId = invoiceId; + this.merchantId = merchantId; + this.transactionValueUsd = transactionValueUsd; + this.userId = userId; + } + + public String getInvoiceId() { + return this.invoiceId; + } + + public void setInvoiceId(String id) { + this.invoiceId = id; + } + + public String getMerchantId() { + return this.merchantId; + } + + public void setMerchantId(String id) { + this.merchantId = id; + } + + public int getTransactionValueUsd() { + return this.transactionValueUsd; + } + + public void setTransactionValueUsd(int transactionValueUsd) { + this.transactionValueUsd = transactionValueUsd; + } + + public String getUserId() { + return this.userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } +} diff --git a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/Order.java b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/Order.java similarity index 100% rename from java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/Order.java rename to java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/Order.java diff --git a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java similarity index 97% rename from java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java rename to java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java index 2e2a9ea..dc57b5e 100644 --- a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java +++ b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/App.java @@ -45,7 +45,7 @@ public class App Scanner in = new Scanner(System.in); System.out.println("Enter case number:"); - System.out.println("1 - consume Avro SpecificRecords"); + System.out.println("1 - Consume SpecificRecords"); int caseNum = in.nextInt(); switch (caseNum) { diff --git a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java similarity index 77% rename from java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java rename to java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java index d3ec6e5..4a65887 100644 --- a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java +++ b/java/json/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonSpecificRecord.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import com.azure.core.credential.TokenCredential; import com.azure.core.util.logging.ClientLogger; +import com.azure.schemaregistry.samples.CustomerInvoice; import com.azure.schemaregistry.samples.Order; public class KafkaJsonSpecificRecord { @@ -29,24 +30,23 @@ public class KafkaJsonSpecificRecord { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - com.azure.schemaregistry.samples.consumer.KafkaJsonDeserializer.class); + com.microsoft.azure.schemaregistry.kafka.json.KafkaJsonDeserializer.class); // Schema Registry configs props.put("schema.registry.url", registryUrl); props.put("schema.registry.credential", credential); props.put("auto.register.schemas", true); - props.put("specific.value.type", Order.class); + props.put("specific.value.type", CustomerInvoice.class); - final KafkaConsumer consumer = new KafkaConsumer<>(props); + final KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(topicName)); try { System.out.println("Reading records..."); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(5000)); - for (ConsumerRecord record : records) { - logger.info("Order received: " + record.value().toString()); - System.out.println("Order received: " + record.value().toString()); + ConsumerRecords records = consumer.poll(Duration.ofMillis(5000)); + for (ConsumerRecord record : records) { + logger.info("Invoice received: " + record.value()); } } } finally { diff --git a/java/json/src/samples/comsumer/src/main/java/resources/app.properties b/java/json/samples/comsumer/src/main/java/resources/app.properties similarity index 100% rename from java/json/src/samples/comsumer/src/main/java/resources/app.properties rename to java/json/samples/comsumer/src/main/java/resources/app.properties diff --git a/java/json/src/samples/comsumer/src/test/java/com/microsoft/azure/AppTest.java b/java/json/samples/comsumer/src/test/java/com/microsoft/azure/AppTest.java similarity index 100% rename from java/json/src/samples/comsumer/src/test/java/com/microsoft/azure/AppTest.java rename to java/json/samples/comsumer/src/test/java/com/microsoft/azure/AppTest.java diff --git a/java/json/src/samples/comsumer/pom.xml b/java/json/samples/producer/pom.xml similarity index 53% rename from java/json/src/samples/comsumer/pom.xml rename to java/json/samples/producer/pom.xml index 073fd29..3b191f3 100644 --- a/java/json/src/samples/comsumer/pom.xml +++ b/java/json/samples/producer/pom.xml @@ -20,36 +20,21 @@ 3.8.1 test + + org.slf4j + slf4j-api + 2.0.6 + + + org.slf4j + slf4j-simple + 2.0.6 + - com.azure - azure-data-schemaregistry - 1.4.0-beta.1 - - - org.apache.kafka - kafka-clients - 3.3.2 - - - com.azure - azure-identity - 1.8.0 - - - com.fasterxml.jackson.core - jackson-core - 2.14.2 - - - com.github.victools - jsonschema-generator - 4.28.0 - - - com.networknt - json-schema-validator - 1.0.76 + com.microsoft.azure + azure-schemaregistry-kafka-json + 1.0.0-beta.1 diff --git a/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java new file mode 100644 index 0000000..0ba793b --- /dev/null +++ b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/CustomerInvoice.java @@ -0,0 +1,49 @@ +package com.azure.schemaregistry.samples; + +public class CustomerInvoice { + private String invoiceId; + private String merchantId; + private int transactionValueUsd; + private String userId; + + public CustomerInvoice() {} + + public CustomerInvoice(String invoiceId, String merchantId, int transactionValueUsd, String userId) { + this.invoiceId = invoiceId; + this.merchantId = merchantId; + this.transactionValueUsd = transactionValueUsd; + this.userId = userId; + } + + public String getInvoiceId() { + return this.invoiceId; + } + + public void setInvoiceId(String id) { + this.invoiceId = id; + } + + public String getMerchantId() { + return this.merchantId; + } + + public void setMerchantId(String id) { + this.merchantId = id; + } + + public int getTransactionValueUsd() { + return this.transactionValueUsd; + } + + public void setTransactionValueUsd(int transactionValueUsd) { + this.transactionValueUsd = transactionValueUsd; + } + + public String getUserId() { + return this.userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } +} diff --git a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/Order.java b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/Order.java similarity index 100% rename from java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/Order.java rename to java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/Order.java diff --git a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java similarity index 86% rename from java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java rename to java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java index b13180f..aee27ba 100644 --- a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java +++ b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/App.java @@ -28,10 +28,6 @@ public class App credential = new ManagedIdentityCredentialBuilder() .clientId(props.getProperty("managed.identity.clientId")) .build(); - } else if (props.getProperty("managed.identity.resourceId") != null) { - credential = new ManagedIdentityCredentialBuilder() - .resourceId(props.getProperty("managed.identity.resourceId")) - .build(); } else { credential = new ManagedIdentityCredentialBuilder().build(); } @@ -46,7 +42,7 @@ public class App Scanner in = new Scanner(System.in); System.out.println("Enter case number:"); - System.out.println("1 - produce Avro SpecificRecords"); + System.out.println("1 - Produce SpecificRecords"); int caseNum = in.nextInt(); switch (caseNum) { diff --git a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java similarity index 76% rename from java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java rename to java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java index 78085d0..9dd36ed 100644 --- a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java +++ b/java/json/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSpecificRecord.java @@ -8,6 +8,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import com.azure.core.credential.TokenCredential; import com.azure.core.util.logging.ClientLogger; +import com.azure.schemaregistry.samples.CustomerInvoice; import com.azure.schemaregistry.samples.Order; public class KafkaJsonSpecificRecord { @@ -25,14 +26,14 @@ public class KafkaJsonSpecificRecord { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - com.azure.schemaregistry.samples.producer.KafkaJsonSerializer.class); + com.microsoft.azure.schemaregistry.kafka.json.KafkaJsonSerializer.class); // Schema Registry configs props.put("schema.registry.url", registryUrl); props.put("schema.registry.credential", credential); props.put("auto.register.schemas", true); props.put("schema.group", schemaGroup); - KafkaProducer producer = new KafkaProducer(props); + KafkaProducer producer = new KafkaProducer(props); String key = "sample-key"; @@ -40,11 +41,10 @@ public class KafkaJsonSpecificRecord { try { while (true) { for (int i = 0; i < 10; i++) { - Order order = new Order("ID-" + i, 0.99 + i, "Sample order #" + Math.abs(new Random().nextInt())); - ProducerRecord record = new ProducerRecord(topicName, key, order); + CustomerInvoice invoice = new CustomerInvoice("Invoice " + i, "Merchant Id " + i, i, "User Id " + i); + ProducerRecord record = new ProducerRecord(topicName, key, invoice); producer.send(record); - logger.info("Sent Order {}", order); - System.out.println("Sent Order " + order.getId()); + logger.info("Sent Order " + invoice.getInvoiceId()); } producer.flush(); try { diff --git a/java/json/src/samples/producer/src/main/java/resources/app.properties b/java/json/samples/producer/src/main/java/resources/app.properties similarity index 100% rename from java/json/src/samples/producer/src/main/java/resources/app.properties rename to java/json/samples/producer/src/main/java/resources/app.properties diff --git a/java/json/src/samples/producer/src/test/java/com/microsoft/azure/AppTest.java b/java/json/samples/producer/src/test/java/com/microsoft/azure/AppTest.java similarity index 100% rename from java/json/src/samples/producer/src/test/java/com/microsoft/azure/AppTest.java rename to java/json/samples/producer/src/test/java/com/microsoft/azure/AppTest.java diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfig.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfig.java new file mode 100644 index 0000000..e410a10 --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfig.java @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import com.azure.core.credential.TokenCredential; +import com.azure.data.schemaregistry.SchemaRegistryClient; + +import java.util.Map; + +/** + * + */ + +class AbstractKafkaSerdeConfig { + private Map props; + + /** + * Required. + * + * Sets the service endpoint for the Azure Schema Registry instance + */ + public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; + + /** + * Required. + * + * Sets the {@link TokenCredential} to use when authenticating HTTP requests for this + * * {@link SchemaRegistryClient}. + */ + public static final String SCHEMA_REGISTRY_CREDENTIAL_CONFIG = "schema.registry.credential"; + + /** + * Schema cache size limit on underlying {@link SchemaRegistryClient}. If limit is exceeded on any cache, + * all caches are recycled. + */ + public static final String MAX_SCHEMA_MAP_SIZE_CONFIG = "max.schema.map.size"; + + public static final Integer MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT = 1000; + + AbstractKafkaSerdeConfig(Map props) { + this.props = (Map) props; + } + + public Map getProps() { + return props; + } + + public String getSchemaRegistryUrl() { + return (String) this.props.get(SCHEMA_REGISTRY_URL_CONFIG); + } + + public TokenCredential getCredential() { + return (TokenCredential) this.props.get(SCHEMA_REGISTRY_CREDENTIAL_CONFIG); + } + + public Integer getMaxSchemaMapSize() { + return (Integer) this.props.getOrDefault(MAX_SCHEMA_MAP_SIZE_CONFIG, MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT); + } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/JsonSerializationException.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/JsonSerializationException.java new file mode 100644 index 0000000..59ba5ce --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/JsonSerializationException.java @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import org.apache.kafka.common.errors.SerializationException; + +/** + * Custom error class for exceptions thrown in Json serialization/deserialization steps + */ +public class JsonSerializationException extends SerializationException { + /** + * Constructor with message only + * @param errorMessage Brief explination of exception source. + */ + public JsonSerializationException(String errorMessage) { + super(errorMessage); + } + + /** + * Constructor with message and throwable error + * @param errorMessage Brief explination of exception source. + * @param err Throwable error object + */ + public JsonSerializationException(String errorMessage, Throwable err) { + super(errorMessage, err); + } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializer.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializer.java new file mode 100644 index 0000000..8c3c059 --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializer.java @@ -0,0 +1,115 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import com.azure.core.util.ClientOptions; +import com.azure.data.schemaregistry.SchemaRegistryClient; +import com.azure.data.schemaregistry.SchemaRegistryClientBuilder; +import com.azure.data.schemaregistry.models.SchemaRegistrySchema; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.networknt.schema.JsonSchema; +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import com.networknt.schema.ValidationMessage; + +/** + * Deserializer implementation for Kafka consumer, implementing Kafka Deserializer interface. + * + * @see KafkaJsonSerializer See serializer class for upstream serializer implementation + */ +public class KafkaJsonDeserializer implements Deserializer { + private SchemaRegistryClient client; + private KafkaJsonDeserializerConfig config; + + /** + * Empty constructor used by Kafka consumer + */ + public KafkaJsonDeserializer() { + super(); + } + + /** + * Configures deserializer instance. + * + * @param props Map of properties used to configure instance + * @param isKey Indicates if deserializing record key or value. Required by Kafka deserializer interface, + * no specific functionality has been implemented for key use. + * + * @see KafkaJsonDeserializerConfig Deserializer will use configs found in here and inherited classes. + */ + public void configure(Map props, boolean isKey) { + this.config = new KafkaJsonDeserializerConfig((Map) props); + + this.client = new SchemaRegistryClientBuilder() + .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl()) + .credential(this.config.getCredential()) + .clientOptions(new ClientOptions().setApplicationId("azsdk-java-KafkaJsonDeserializer/1.0.0-beta.1")) + .buildClient(); + } + + /** + * Deserializes byte array into Java object + * @param topic topic associated with the record bytes + * @param data serialized bytes, may be null + * @return deserialize object, may be null + */ + @Override + public T deserialize(String topic, byte[] data) { + return null; + } + + /** + * Deserializes byte array into Java object + * @param topic topic associated with the record bytes + * @param headers record headers, may be null + * @param data serialized bytes, may be null + * @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code + * @return deserialize object, may be null + */ + @Override + public T deserialize(String topic, Headers headers, byte[] data) { + T dataObject; + String schemaId; + + try { + ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY)); + dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType()); + + if (headers.lastHeader("schemaId") != null) { + schemaId = new String(headers.lastHeader("schemaId").value()); + } else { + throw new JsonSerializationException("Schema Id was not found in record headers", null); + } + + SchemaRegistrySchema schema = this.client.getSchema(schemaId); + + JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012); + JsonSchema jSchema = factory.getSchema(schema.getDefinition()); + JsonNode node = mapper.readTree(data); + + Set errors = jSchema.validate(node); + if (errors.size() == 0) { + return dataObject; + } else { + throw new JsonSerializationException("Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray()), null); + } + } catch (JsonSerializationException e) { + throw e; + } catch (Exception e) { + throw new JsonSerializationException("Execption occured during deserialization", e); + } + } + + @Override + public void close() { } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfig.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfig.java new file mode 100644 index 0000000..695a82c --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfig.java @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import java.util.Map; + +/** + * + */ +public final class KafkaJsonDeserializerConfig extends AbstractKafkaSerdeConfig { + /** + * Configures deserializer to decode into specific class instance when reading encoded bytes + * + * Defaults to Object + */ + public static final String SPECIFIC_VALUE_TYPE_CONFIG = "specific.avro.value.type"; + + KafkaJsonDeserializerConfig(Map props) { + super(props); + } + + /** + * @return Specific class flag, with default set to Object class + */ + public Class getJsonSpecificType() { + return (Class) this.getProps().getOrDefault(SPECIFIC_VALUE_TYPE_CONFIG, Object.class); + } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializer.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializer.java new file mode 100644 index 0000000..3810b73 --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializer.java @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serializer; +import com.azure.core.util.ClientOptions; +import com.azure.data.schemaregistry.SchemaRegistryClient; +import com.azure.data.schemaregistry.SchemaRegistryClientBuilder; +import com.azure.data.schemaregistry.models.SchemaFormat; +import com.azure.data.schemaregistry.models.SchemaProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.victools.jsonschema.generator.OptionPreset; +import com.github.victools.jsonschema.generator.SchemaGenerator; +import com.github.victools.jsonschema.generator.SchemaGeneratorConfig; +import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder; +import com.github.victools.jsonschema.generator.SchemaVersion; +import java.util.Map; + +/** + * Serializer implementation for Kafka producer, implementing the Kafka Serializer interface. + * + * @see KafkaJsonDeserializer See deserializer class for downstream deserializer implementation + */ +public class KafkaJsonSerializer implements Serializer { + private SchemaRegistryClient client; + private String schemaGroup; + + /** + * Empty constructor for Kafka producer + */ + public KafkaJsonSerializer() { + super(); + } + + /** + * Configures serializer instance. + * + * @param props Map of properties used to configure instance. + * @param isKey Indicates if serializing record key or value. Required by Kafka serializer interface, + * no specific functionality implemented for key use. + * + * @see KafkaJsonSerializerConfig Serializer will use configs found in KafkaJsonSerializerConfig. + */ + @Override + public void configure(Map props, boolean isKey) { + KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig((Map) props); + + this.schemaGroup = config.getSchemaGroup(); + + this.client = new SchemaRegistryClientBuilder() + .fullyQualifiedNamespace(config.getSchemaRegistryUrl()) + .credential(config.getCredential()) + .clientOptions(new ClientOptions().setApplicationId("azsdk-java-KafkaJsonSerializer/1.0.0-beta.1")) + .buildClient(); + } + + /** + * Serializes into a byte array, containing a GUID reference to schema + * and the encoded payload. + * + * Null behavior matches Kafka treatment of null values. + * + * @param topic Topic destination for record. Required by Kafka serializer interface, currently not used. + * @param record Object to be serialized, may be null + * @return byte[] payload for sending to EH Kafka service, may be null + * @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code + */ + @Override + public byte[] serialize(String topic, T record) { + return null; + } + + /** + * Serializes into a byte array, containing a GUID reference to schema + * and the encoded payload. + * + * Null behavior matches Kafka treatment of null values. + * + * @param topic Topic destination for record. Required by Kafka serializer interface, currently not used. + * @param record Object to be serialized, may be null + * @param headers Record headers, may be null + * @return byte[] payload for sending to EH Kafka service, may be null + * @throws JsonSerializationException Wrapped exception catchable by core Kafka producer code + */ + @Override + public byte[] serialize(String topic, Headers headers, T record) { + if (record == null) { + return null; + } + + byte[] recordBytes; + try { + ObjectMapper mapper = new ObjectMapper(); + recordBytes = mapper.writeValueAsBytes(record); + + SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder( + SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON); + SchemaGeneratorConfig config = configBuilder.build(); + SchemaGenerator generator = new SchemaGenerator(config); + JsonNode jsonSchema = generator.generateSchema(record.getClass()); + String jsonSchemaString = jsonSchema.toString(); + + SchemaProperties schemaProps = this.client.registerSchema( + this.schemaGroup, + record.getClass().getName(), + jsonSchemaString, + SchemaFormat.JSON + ); + + headers.add("schemaId", schemaProps.getId().getBytes()); + return recordBytes; + } catch (IllegalStateException e) { + throw new JsonSerializationException("Error occured while generating schema", e); + } catch (JsonProcessingException e) { + throw new JsonSerializationException("Error occured while serializing record into bytes", e); + } catch (Exception e) { + throw new JsonSerializationException("Execption occured during serialization", e); + } + } + + @Override + public void close() { } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfig.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfig.java new file mode 100644 index 0000000..9025a2a --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfig.java @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import java.util.Map; + +/** + * Class containing configuration properties for KafkaJsonSerializer class. + */ +public final class KafkaJsonSerializerConfig extends AbstractKafkaSerdeConfig { + /** + * If specified true, serializer will register schemas against Azure Schema Registry service under the specified + * group. See Azure Schema Registry documentation for a description of schema registration behavior. + * + * If specified false, serializer will simply query the service for an existing ID given schema content. + * Serialization will fail if the schema has not been pre-created. + * + * Auto-registration is **NOT RECOMMENDED** for production scenarios. + * + * Requires type String. + */ + + public static final String AUTO_REGISTER_SCHEMAS_CONFIG = "auto.register.schemas"; + + public static final Boolean AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT = false; + + /** + * Specifies schema group for interacting with Azure Schema Registry service. + * + * If auto-registering schemas, schema will be stored under this group. + * If not auto-registering, serializer will request schema ID for matching data schema under specified group. + */ + public static final String SCHEMA_GROUP_CONFIG = "schema.group"; + + KafkaJsonSerializerConfig(Map props) { + super(props); + } + + /** + * @return auto-registration flag, with default set to false + */ + public Boolean getAutoRegisterSchemas() { + return (Boolean) this.getProps().getOrDefault( + AUTO_REGISTER_SCHEMAS_CONFIG, AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT); + } + + /** + * @return schema group + */ + public String getSchemaGroup() { + if (!this.getProps().containsKey(SCHEMA_GROUP_CONFIG)) { + throw new NullPointerException("Schema group configuration property is required."); + } + return (String) this.getProps().get(SCHEMA_GROUP_CONFIG); + } +} diff --git a/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/package-info.java b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/package-info.java new file mode 100644 index 0000000..e6b170f --- /dev/null +++ b/java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/package-info.java @@ -0,0 +1,4 @@ +/** + * Package containing Json-specific implementations of Kafka serializer and deserializer. + */ +package com.microsoft.azure.schemaregistry.kafka.json; \ No newline at end of file diff --git a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializer.java b/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializer.java deleted file mode 100644 index a975d97..0000000 --- a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializer.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.azure.schemaregistry.samples.consumer; - -import java.util.Arrays; -import java.util.Map; -import java.util.Set; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Deserializer; -import com.azure.data.schemaregistry.SchemaRegistryClient; -import com.azure.data.schemaregistry.SchemaRegistryClientBuilder; -import com.azure.data.schemaregistry.models.SchemaRegistrySchema; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.networknt.schema.JsonSchema; -import com.networknt.schema.JsonSchemaFactory; -import com.networknt.schema.SpecVersion; -import com.networknt.schema.ValidationMessage; - -public class KafkaJsonDeserializer implements Deserializer { - private SchemaRegistryClient client; - private KafkaJsonDeserializerConfig config; - - public KafkaJsonDeserializer() { - super(); - } - - public void configure(Map props, boolean isKey) { - this.config = new KafkaJsonDeserializerConfig((Map) props); - - this.client = new SchemaRegistryClientBuilder() - .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl()) - .credential(this.config.getCredential()) - .buildClient(); - } - - @Override - public T deserialize(String topic, byte[] data) { - return null; - } - - @Override - public T deserialize(String topic, Headers headers, byte[] data) { - T dataObject; - String schemaId; - - ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY)); - try { - dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType()); - } catch (Exception e) { - e.printStackTrace(); - throw new Error(e); - } - - if (headers.lastHeader("schemaId") != null) { - schemaId = new String(headers.lastHeader("schemaId").value()); - } else { - throw new RuntimeException("Schema Id was not found in record headers."); - } - SchemaRegistrySchema schema = this.client.getSchema(schemaId); - - JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012); - JsonSchema jSchema = factory.getSchema(schema.getDefinition()); - JsonNode node; - try { - node = mapper.readTree(data); - } catch (Exception e) { - e.printStackTrace(); - throw new Error(e); - } - Set errors = jSchema.validate(node); - - if (errors.size() == 0) { - return dataObject; - } else { - throw new RuntimeException("Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray())); - } - } - - @Override - public void close() {} -} diff --git a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializerConfig.java b/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializerConfig.java deleted file mode 100644 index 298b31b..0000000 --- a/java/json/src/samples/comsumer/src/main/java/com/azure/schemaregistry/samples/consumer/KafkaJsonDeserializerConfig.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.azure.schemaregistry.samples.consumer; - -import java.util.Map; -import com.azure.core.credential.TokenCredential; - -public class KafkaJsonDeserializerConfig { - private Map props; - - KafkaJsonDeserializerConfig(Map props) { - this.props = (Map) props; - } - - public String getSchemaRegistryUrl() { - return (String) this.props.get("schema.registry.url"); - } - - public TokenCredential getCredential() { - return (TokenCredential) this.props.get("schema.registry.credential"); - } - - public Class getJsonSpecificType() { - return (Class) this.props.getOrDefault("specific.value.type", Object.class); - } -} diff --git a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializer.java b/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializer.java deleted file mode 100644 index af312a0..0000000 --- a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializer.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.azure.schemaregistry.samples.producer; - -import java.util.Map; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Serializer; -import com.azure.data.schemaregistry.SchemaRegistryClient; -import com.azure.data.schemaregistry.SchemaRegistryClientBuilder; -import com.azure.data.schemaregistry.models.SchemaFormat; -import com.azure.data.schemaregistry.models.SchemaProperties; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.victools.jsonschema.generator.OptionPreset; -import com.github.victools.jsonschema.generator.SchemaGenerator; -import com.github.victools.jsonschema.generator.SchemaGeneratorConfig; -import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder; -import com.github.victools.jsonschema.generator.SchemaVersion; - -public class KafkaJsonSerializer implements Serializer { - private SchemaRegistryClient client; - private String schemaGroup; - - public KafkaJsonSerializer() { - super(); - } - - @Override - public void configure(Map props, boolean isKey) { - KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig((Map) props); - - this.schemaGroup = config.getSchemaGroup(); - - this.client = new SchemaRegistryClientBuilder() - .fullyQualifiedNamespace(config.getSchemaRegistryUrl()) - .credential(config.getCredential()) - .buildClient(); - } - - @Override - public byte[] serialize(String topic, T data) { - return null; - } - - @Override - public byte[] serialize(String topic, Headers headers, T record) { - if (record == null) { - return null; - } - - byte[] recordBytes; - - ObjectMapper mapper = new ObjectMapper(); - try { - recordBytes = mapper.writeValueAsBytes(record); - } catch (JsonProcessingException e) { - e.printStackTrace(); - throw new Error(e); - } - - SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON); - SchemaGeneratorConfig config = configBuilder.build(); - SchemaGenerator generator = new SchemaGenerator(config); - JsonNode jsonSchema = generator.generateSchema(record.getClass()); - String jsonSchemaString = jsonSchema.toString(); - - SchemaProperties schemaProps = this.client.registerSchema( - this.schemaGroup, - record.getClass().getName(), - jsonSchemaString, - SchemaFormat.JSON - ); - - headers.add("schemaId", schemaProps.getId().getBytes()); - - return recordBytes; - } - - @Override - public void close() {} -} diff --git a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializerConfig.java b/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializerConfig.java deleted file mode 100644 index 5fcef14..0000000 --- a/java/json/src/samples/producer/src/main/java/com/azure/schemaregistry/samples/producer/KafkaJsonSerializerConfig.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.azure.schemaregistry.samples.producer; - -import java.util.Map; -import com.azure.core.credential.TokenCredential; - -public class KafkaJsonSerializerConfig { - private Map props; - - KafkaJsonSerializerConfig(Map props) { - this.props = (Map) props; - } - - public String getSchemaRegistryUrl() { - return (String) this.props.get("schema.registry.url"); - } - - public TokenCredential getCredential() { - return (TokenCredential) this.props.get("schema.registry.credential"); - } - - public String getSchemaGroup() { - return (String) this.props.get("schema.group"); - } -} diff --git a/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfigTest.java b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfigTest.java new file mode 100644 index 0000000..81c69d7 --- /dev/null +++ b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/AbstractKafkaSerdeConfigTest.java @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.Collections; +import org.junit.jupiter.api.Test; +import com.azure.core.credential.TokenCredential; + +public class AbstractKafkaSerdeConfigTest { + @Test + public void testMaxSchemaMapSizeDefault() { + AbstractKafkaSerdeConfig config = new AbstractKafkaSerdeConfig(Collections.emptyMap()); + assertEquals(AbstractKafkaSerdeConfig.MAX_SCHEMA_MAP_SIZE_CONFIG_DEFAULT, config.getMaxSchemaMapSize()); + } + + @Test + public void testSchemaRegistryUrl() { + String dummyString = "dummyString"; // does not get validated at this layer + AbstractKafkaSerdeConfig config = new KafkaJsonSerializerConfig( + Collections.singletonMap(AbstractKafkaSerdeConfig.SCHEMA_REGISTRY_URL_CONFIG, dummyString)); + assertEquals(dummyString, config.getSchemaRegistryUrl()); + } + + @Test + public void testSchemaRegistryCredential() { + TokenCredential dummyCredential = tokenRequestContext -> null; + AbstractKafkaSerdeConfig config = new KafkaJsonSerializerConfig( + Collections.singletonMap(AbstractKafkaSerdeConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, dummyCredential)); + assertEquals(dummyCredential, config.getCredential()); + } +} diff --git a/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfigTest.java b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfigTest.java new file mode 100644 index 0000000..987065b --- /dev/null +++ b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializerConfigTest.java @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; + +public class KafkaJsonDeserializerConfigTest { + @Test + public void TestJsonSpecificTypeDefault() { + KafkaJsonDeserializerConfig config = new KafkaJsonDeserializerConfig(Collections.emptyMap()); + assertEquals(Object.class, config.getJsonSpecificType()); + } +} \ No newline at end of file diff --git a/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfigTest.java b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfigTest.java new file mode 100644 index 0000000..83519aa --- /dev/null +++ b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerConfigTest.java @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.*; + +public class KafkaJsonSerializerConfigTest { + @Test + public void testGetAutoRegisterSchemasDefault() { + KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig(Collections.emptyMap()); + assertEquals(KafkaJsonSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG_DEFAULT, config.getAutoRegisterSchemas()); + } + + @Test + public void testNoSchemaGroupProvidedThenFail() { + KafkaJsonSerializerConfig config = new KafkaJsonSerializerConfig(Collections.emptyMap()); + try { + config.getSchemaGroup(); + fail(); + } catch (NullPointerException e) { + assertTrue(true); + } + } +} diff --git a/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerTest.java b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerTest.java new file mode 100644 index 0000000..8ab662d --- /dev/null +++ b/java/json/src/test/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonSerializerTest.java @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.azure.schemaregistry.kafka.json; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class KafkaJsonSerializerTest { + @Test + public void testNullRecordReturnNull() { + KafkaJsonSerializer serializer = new KafkaJsonSerializer(); + assertEquals(null, serializer.serialize("dummy-topic", null)); + } +} \ No newline at end of file