.NET Serializer Update for Schema Id Conversion Change (#30)

* Update serializer version to support back compat

* Get lower case content type from header

* Change content-type header to lower case

* Change sample Kafka consumer to accept string keys

* Change sample Kafka producer to accept string keys

* Change async content-type header to lower case
This commit is contained in:
Nick Hardwick 2022-08-09 14:04:38 -07:00 коммит произвёл GitHub
Родитель 9c3b5d6d90
Коммит 77bfd0a6b1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 49 добавлений и 33 удалений

Просмотреть файл

@@ -7,11 +7,11 @@
<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.11.0" />
<PackageReference Include="Azure.Core" Version="1.5.0" />
<PackageReference Include="Azure.Identity" Version="1.2.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.1" />
<PackageReference Include="Azure.Core" Version="1.25.0" />
<PackageReference Include="Azure.Identity" Version="1.6.0" />
<PackageReference Include="Confluent.Kafka" Version="1.9.0" />
<ProjectReference Include="..\src\Microsoft.Azure.Kafka.SchemaRegistry.Avro.csproj" />
<PackageReference Include="Azure.Data.SchemaRegistry" Version="1.0.0-beta.2" />
<PackageReference Include="Azure.Data.SchemaRegistry" Version="1.2.0" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="4.5.0" />
</ItemGroup>

Просмотреть файл

@@ -48,7 +48,7 @@ namespace EventHubsForKafkaSample
schemaGroup,
autoRegisterSchemas: true);
using (var producer = new ProducerBuilder<long, CustomerInvoice>(config).SetKeySerializer(Serializers.Int64).SetValueSerializer(valueSerializer).Build())
using (var producer = new ProducerBuilder<string, CustomerInvoice>(config).SetKeySerializer(Serializers.Utf8).SetValueSerializer(valueSerializer).Build())
{
for (int x = 0; x < 10; x++)
{
@@ -59,7 +59,7 @@ namespace EventHubsForKafkaSample
TransactionValueUsd = 100,
UserId = "alice"
};
var deliveryReport = await producer.ProduceAsync(topic, new Message<long, CustomerInvoice> { Key = DateTime.UtcNow.Ticks, Value = invoice });
var deliveryReport = await producer.ProduceAsync(topic, new Message<string, CustomerInvoice> { Key = null, Value = invoice });
}
}
}
@@ -89,7 +89,7 @@ namespace EventHubsForKafkaSample
var valueDeserializer = new KafkaAvroDeserializer<CustomerInvoice>(schemaRegistryUrl, credential);
using (var consumer = new ConsumerBuilder<long, CustomerInvoice>(config).SetKeyDeserializer(Deserializers.Int64).SetValueDeserializer(valueDeserializer).Build())
using (var consumer = new ConsumerBuilder<string, CustomerInvoice>(config).SetKeyDeserializer(Deserializers.Utf8).SetValueDeserializer(valueDeserializer).Build())
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
@@ -103,7 +103,7 @@ namespace EventHubsForKafkaSample
try
{
var msg = consumer.Consume(cts.Token);
Console.WriteLine($"Received: '{msg.Value.InvoiceId}'");
Console.WriteLine($"Received: '{msg.Message.Value.InvoiceId}'");
}
catch (ConsumeException e)
{

Просмотреть файл

@@ -8,6 +8,8 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using global::Azure;
using global::Azure.Core;
using global::Azure.Data.SchemaRegistry;
using Confluent.Kafka;
@@ -19,16 +21,16 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
/// <typeparam name="T"></typeparam>
public class KafkaAvroAsyncSerializer<T> : IAsyncSerializer<T>
{
private readonly SchemaRegistryAvroObjectSerializer serializer;
private readonly SchemaRegistryAvroSerializer serializer;
public KafkaAvroAsyncSerializer(string schemaRegistryUrl, TokenCredential credential, string schemaGroup, Boolean autoRegisterSchemas = false)
{
this.serializer = new SchemaRegistryAvroObjectSerializer(
this.serializer = new SchemaRegistryAvroSerializer(
new SchemaRegistryClient(
schemaRegistryUrl,
credential),
schemaGroup,
new SchemaRegistryAvroObjectSerializerOptions()
new SchemaRegistryAvroSerializerOptions()
{
AutoRegisterSchemas = autoRegisterSchemas
});
@@ -41,11 +43,10 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
return null;
}
using (var stream = new MemoryStream())
{
await serializer.SerializeAsync(stream, o, typeof(T), CancellationToken.None);
return stream.ToArray();
}
BinaryContent content = await serializer.SerializeAsync<BinaryContent, T>(o);
var schemaIdBytes = Encoding.UTF8.GetBytes(content.ContentType.ToString());
context.Headers.Add("content-type", schemaIdBytes);
return content.Data.ToArray();
}
}
}

Просмотреть файл

@@ -5,9 +5,9 @@
namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
{
using System;
using System.IO;
using System.Threading;
using System.Text;
using Confluent.Kafka;
using global::Azure;
using global::Azure.Core;
using global::Azure.Data.SchemaRegistry;
using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
@@ -20,7 +20,7 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
/// <typeparam name="T"></typeparam>
public class KafkaAvroDeserializer<T> : IDeserializer<T>
{
private readonly SchemaRegistryAvroObjectSerializer serializer;
private readonly SchemaRegistryAvroSerializer serializer;
/// <summary>
/// Constructor for KafkaAvroDeserializer.
@@ -29,7 +29,7 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
/// <param name="credential"></param> TokenCredential implementation for OAuth2 authentication
public KafkaAvroDeserializer(string schemaRegistryUrl, TokenCredential credential)
{
this.serializer = new SchemaRegistryAvroObjectSerializer(new SchemaRegistryClient(schemaRegistryUrl, credential), "$default");
this.serializer = new SchemaRegistryAvroSerializer(new SchemaRegistryClient(schemaRegistryUrl, credential), "$default");
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
@@ -39,7 +39,21 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
return default(T);
}
return (T) this.serializer.Deserialize(new MemoryStream(data.ToArray()), typeof(T), CancellationToken.None);
BinaryContent content = new BinaryContent
{
Data = new BinaryData(data.ToArray()),
};
if (context.Headers.TryGetLastBytes("content-type", out var headerBytes))
{
content.ContentType = Encoding.UTF8.GetString(headerBytes);
}
else
{
content.ContentType = string.Empty;
}
return serializer.Deserialize<T>(content);
}
}
}

Просмотреть файл

@@ -7,6 +7,8 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
using System;
using System.IO;
using System.Threading;
using System.Text;
using global::Azure;
using global::Azure.Core;
using global::Azure.Data.SchemaRegistry;
using Confluent.Kafka;
@@ -20,16 +22,16 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
/// <typeparam name="T"></typeparam>
public class KafkaAvroSerializer<T> : ISerializer<T>
{
private readonly SchemaRegistryAvroObjectSerializer serializer;
private readonly SchemaRegistryAvroSerializer serializer;
public KafkaAvroSerializer(string schemaRegistryUrl, TokenCredential credential, string schemaGroup, Boolean autoRegisterSchemas = false)
{
this.serializer = new SchemaRegistryAvroObjectSerializer(
this.serializer = new SchemaRegistryAvroSerializer(
new SchemaRegistryClient(
schemaRegistryUrl,
credential),
schemaGroup,
new SchemaRegistryAvroObjectSerializerOptions()
new SchemaRegistryAvroSerializerOptions()
{
AutoRegisterSchemas = autoRegisterSchemas
});
@@ -42,11 +44,10 @@ namespace Microsoft.Azure.Kafka.SchemaRegistry.Avro
return null;
}
using (var stream = new MemoryStream())
{
serializer.Serialize(stream, o, typeof(T), CancellationToken.None);
return stream.ToArray();
}
BinaryContent content = serializer.Serialize<BinaryContent, T>(o);
var schemaIdBytes = Encoding.UTF8.GetBytes(content.ContentType.ToString());
context.Headers.Add("content-type", schemaIdBytes);
return content.Data.ToArray();
}
}
}

Просмотреть файл

@@ -5,10 +5,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.5.0" />
<PackageReference Include="Confluent.Kafka" Version="1.5.1" />
<PackageReference Include="Microsoft.Azure.Data.SchemaRegistry.ApacheAvro" Version="1.0.0-beta.1" />
<PackageReference Include="Azure.Data.SchemaRegistry" Version="1.0.0-beta.2" />
<PackageReference Include="Azure.Core" Version="1.25.0" />
<PackageReference Include="Confluent.Kafka" Version="1.9.0" />
<PackageReference Include="Microsoft.Azure.Data.SchemaRegistry.ApacheAvro" Version="1.0.0-beta.7" />
<PackageReference Include="Azure.Data.SchemaRegistry" Version="1.2.0" />
</ItemGroup>
</Project>