Json .Net sample with just Newtonsoft.Json dependency (#40)

* Json .Net sample with just Newtonsoft.Json dependency

* A slightly better implementation of deserialization by skip the overhead of loading JSON into a JObject/JArray then back into JSON again

* Add copyright to program.cs
This commit is contained in:
Eric Lam (MSFT) 2023-02-22 16:35:19 -08:00 коммит произвёл GitHub
Родитель d92bd7fe3d
Коммит 5d431d0860
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 403 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28307.1231
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.Kafka.SchemaRegistry.Json.Samples", "samples\Microsoft.Azure.Kafka.SchemaRegistry.Json.Samples.csproj", "{8C67EF62-768F-411D-9ED3-EA1CA79E2B58}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{8C67EF62-768F-411D-9ED3-EA1CA79E2B58}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8C67EF62-768F-411D-9ED3-EA1CA79E2B58}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8C67EF62-768F-411D-9ED3-EA1CA79E2B58}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8C67EF62-768F-411D-9ED3-EA1CA79E2B58}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8EDB178D-714E-4E69-A634-1F2770F14E4B}
EndGlobalSection
EndGlobal

Просмотреть файл

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="EH_FQDN" value="XXXXXXXX.servicebus.windows.net:9093"/>
<add key="EH_JAAS_CONFIG" value="Endpoint=sb://XXXXXXXX.servicebus.windows.net/;SharedAccessKeyName=XXXXXXXX;SharedAccessKey=XXXXXXXX"/>
<add key="EH_NAME" value="XXXXXXXX"/>
<add key="KAFKA_GROUP" value="XXXXXXXX"/>
<add key="SCHEMA_GROUP" value="XXXXXXXX"/>
<add key="SCHEMA_REGISTRY_URL" value="XXXXXXXX.servicebus.windows.net"/>
<add key="SCHEMA_REGISTRY_TENANT_ID" value="[app tenant GUID]XXXXXXXX"/>
<add key="SCHEMA_REGISTRY_CLIENT_ID" value="[app client GUID]"/>
<add key="SCHEMA_REGISTRY_CLIENT_SECRET" value="XXXXXXXX"/>
</appSettings>
</configuration>

Просмотреть файл

@ -0,0 +1,28 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
//Licensed under the MIT License.
//Licensed under the Apache License, Version 2.0
//
//Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
namespace com.azure.schemaregistry.samples
{
using System;
using Newtonsoft.Json;
[JsonObject]
public class CustomerInvoice
{
[JsonProperty]
public string InvoiceId { get; set; }
[JsonProperty]
public string MerchantId { get; set; }
[JsonProperty]
public int TransactionValueUsd { get; set; }
[JsonProperty]
public string UserId { get; set; }
}
}

Просмотреть файл

@ -0,0 +1,95 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Kafka.SchemaRegistry.Json
{
using System;
using System.IO;
using System.Threading;
using System.Text;
using global::Azure.Core;
using global::Azure.Data.SchemaRegistry;
using Confluent.Kafka;
using Newtonsoft.Json;
using Newtonsoft.Json.Schema;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
/// <summary>
/// Sample implementation of Kafka Json deserializaer, wrapping Azure Schema Registry C# implementation.
///
/// This is meant for reference and sample purposes only, do not use this for production.
/// </summary>
/// <typeparam name="T"></typeparam>
public class KafkaJsonDeserializer<T> : IDeserializer<T>
{
readonly SchemaRegistryClient schemaRegistryClient;
readonly JsonSerializer serializer;
public KafkaJsonDeserializer(string schemaRegistryUrl, TokenCredential credential)
{
this.schemaRegistryClient = new SchemaRegistryClient(schemaRegistryUrl, credential);
this.serializer = new JsonSerializer();
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull || data == null || data.IsEmpty)
{
return default(T);
}
if (!context.Headers.TryGetLastBytes("schemaId", out var lastHeader) || lastHeader.Length == 0)
{
return default(T);
}
var schemaId = UTF8Encoding.UTF8.GetString(lastHeader);
if (string.IsNullOrEmpty(schemaId))
{
return default(T);
}
var schemaRegistryData = this.schemaRegistryClient.GetSchema(schemaId).Value;
if (schemaRegistryData.Properties.Format != SchemaFormat.Json)
{
throw new JsonSerializationException($"Schema id {schemaId} is not of json format. the schema is a {schemaRegistryData.Properties.Format} schema.");
}
else if (string.IsNullOrEmpty(schemaRegistryData.Definition))
{
throw new JsonSerializationException($"Schema id {schemaId} has empty schema.");
}
// This implementation is actually based on the old Newtonsoft Json implementation which
// uses a older json-schema draft version.
// When we updated to use the latest Newtonsoft package/draft, this implementation will
// need to change using the new classes.
using (var stringReader = new StringReader(UTF8Encoding.UTF8.GetString(data)))
{
JsonTextReader reader = new JsonTextReader(stringReader);
try
{
JsonValidatingReader validatingReader = new JsonValidatingReader(reader);
validatingReader.Schema = JsonSchema.Parse(schemaRegistryData.Definition);
IList<string> messages = new List<string>();
validatingReader.ValidationEventHandler += (o, a) => messages.Add(a.Message);
T obj = serializer.Deserialize<T>(validatingReader);
if (messages.Count > 0)
{
throw new JsonSerializationException(string.Concat(messages));
}
return obj;
}
finally
{
reader.Close();
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,77 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Kafka.SchemaRegistry.Json
{
using System;
using System.IO;
using System.Threading;
using System.Text;
using global::Azure.Core;
using global::Azure.Data.SchemaRegistry;
using Confluent.Kafka;
using Newtonsoft.Json;
using Newtonsoft.Json.Schema;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
/// <summary>
/// Sample implementation of Kafka Json serializaer, wrapping Azure Schema Registry C# implementation.
///
/// This is meant for reference and sample purposes only, do not use this for production.
/// </summary>
/// <typeparam name="T"></typeparam>
public class KafkaJsonSerializer<T> : ISerializer<T>
{
readonly JsonSchemaGenerator schemaGenerator;
readonly SchemaRegistryClient schemaRegistryClient;
readonly string schemaGroup;
public KafkaJsonSerializer(string schemaRegistryUrl, TokenCredential credential, string schemaGroup)
{
this.schemaRegistryClient = new SchemaRegistryClient(schemaRegistryUrl, credential);
this.schemaGroup = schemaGroup;
this.schemaGenerator = new JsonSchemaGenerator();
}
public byte[] Serialize(T o, SerializationContext context)
{
if (o == null)
{
return null;
}
// This implementation is actually based on the old Newtonsoft Json implementation which
// uses a older json-schema draft version.
// When we updated to use the latest Newtonsoft package/draft, this implementation will
// need to change using the new classes.
var schema = schemaGenerator.Generate(typeof(T));
var jObject = JObject.FromObject(o);
if (!jObject.IsValid(schema))
{
throw new JsonSerializationException($"Unexpected parsing error when generating scheam from instance.");
}
var schemaJson = schema.ToString();
var schemaProperties = this.schemaRegistryClient.RegisterSchema(
this.schemaGroup,
typeof(T).FullName,
schemaJson,
SchemaFormat.Json).Value;
if (schemaProperties == null)
{
throw new JsonSerializationException("Schema registry client returned null response");
}
else if (schemaProperties.Format != SchemaFormat.Json)
{
throw new JsonSerializationException($"Schema registered was not json type, it was {schemaProperties.Format}");
}
context.Headers.Add("schemaId", UTF8Encoding.UTF8.GetBytes(schemaProperties.Id));
var json = jObject.ToString();
return UTF8Encoding.UTF8.GetBytes(json);
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<OutputType>Exe</OutputType>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<TargetFramework>net7.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.28.0" />
<PackageReference Include="Azure.Identity" Version="1.8.1" />
<PackageReference Include="Confluent.Kafka" Version="2.0.2" />
<PackageReference Include="Azure.Data.SchemaRegistry" Version="1.4.0-beta.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="7.0.0" />
</ItemGroup>
<ItemGroup>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,27 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace EventHubsForKafkaSample
{
using System;
using System.Configuration;
class Program
{
public static void Main(string[] args)
{
string brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
string connectionString = ConfigurationManager.AppSettings["EH_JAAS_CONFIG"];
string topic = ConfigurationManager.AppSettings["EH_NAME"];
string consumerGroup = ConfigurationManager.AppSettings["KAFKA_GROUP"];
Console.WriteLine("Initializing Producer");
Worker.Producer(brokerList, connectionString, topic).Wait();
Console.WriteLine();
Console.WriteLine("Initializing Consumer");
Worker.Consumer(brokerList, connectionString, consumerGroup, topic);
Console.ReadKey();
}
}
}

Просмотреть файл

@ -0,0 +1,117 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
//Licensed under the MIT License.
//Licensed under the Apache License, Version 2.0
//
//Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using com.azure.schemaregistry.samples;
using Confluent.Kafka;
using Microsoft.Azure.Kafka.SchemaRegistry.Json;
namespace EventHubsForKafkaSample
{
class Worker
{
static readonly string schemaRegistryUrl = ConfigurationManager.AppSettings["SCHEMA_REGISTRY_URL"];
static readonly string schemaGroup = ConfigurationManager.AppSettings["SCHEMA_GROUP"];
static readonly ClientSecretCredential credential = new ClientSecretCredential(
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_TENANT_ID"],
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_ID"],
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_SECRET"]);
public static async Task Producer(string brokerList, string connStr, string topic)
{
try
{
var config = new ProducerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
//Debug = "security,broker,protocol" //Uncomment for librdkafka debugging information
};
var valueSerializer = new KafkaJsonSerializer<CustomerInvoice>(
schemaRegistryUrl,
credential,
schemaGroup);
using (var producer = new ProducerBuilder<string, CustomerInvoice>(config).SetKeySerializer(Serializers.Utf8).SetValueSerializer(valueSerializer).Build())
{
for (int x = 0; x < 10; x++)
{
var invoice = new CustomerInvoice()
{
InvoiceId = "something",
MerchantId = "arthur",
TransactionValueUsd = 100,
UserId = "alice"
};
var deliveryReport = await producer.ProduceAsync(topic, new Message<string, CustomerInvoice> { Key = null, Value = invoice });
}
}
}
catch (Exception e)
{
Console.WriteLine(string.Format("Exception Occurred - {0}", e.Message));
}
}
public static void Consumer(string brokerList, string connStr, string consumergroup, string topic)
{
var config = new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SocketTimeoutMs = 60000, //this corresponds to the Consumer config `request.timeout.ms`
SessionTimeoutMs = 30000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connStr,
GroupId = consumergroup,
AutoOffsetReset = AutoOffsetReset.Earliest,
BrokerVersionFallback = "1.0.0", //Event Hubs for Kafka Ecosystems supports Kafka v1.0+, a fallback to an older API will fail
//Debug = "security,broker,protocol" //Uncomment for librdkafka debugging information
};
var valueDeserializer = new KafkaJsonDeserializer<CustomerInvoice>(schemaRegistryUrl, credential);
using (var consumer = new ConsumerBuilder<string, CustomerInvoice>(config).SetKeyDeserializer(Deserializers.Utf8).SetValueDeserializer(valueDeserializer).Build())
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
consumer.Subscribe(topic);
Console.WriteLine("Consuming messages from topic: " + topic + ", broker(s): " + brokerList);
while (true)
{
try
{
var msg = consumer.Consume(cts.Token);
Console.WriteLine($"Received: '{msg.Message.Value.InvoiceId}'");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
catch (Exception e)
{
Console.WriteLine($"Error: {e.Message}");
}
}
}
}
}
}