Add factory method for parsers, clean up tests
Parent: 93a0b4a2d5
Commit: 1b84ef3c7a
@@ -34,61 +34,72 @@ namespace Kafka.Client.Tests
     public class MessageTests
     {
         /// <summary>
-        /// Ensure that the bytes returned from the message are in valid kafka sequence.
+        /// Ensure that the bytes returned from the message are in valid kafka sequence for v0 messages.
         /// </summary>
         [TestMethod]
         [TestCategory(TestCategories.BVT)]
-        public void GetBytesValidSequence()
+        public void GetBytesValidSequenceV0Message()
         {
-            for (var i = 0; i < 2; i++)
-            {
-                var useV0Message = i == 1;
-                var payload = Encoding.UTF8.GetBytes("kafka");
-                Message message;
-
-                if (useV0Message)
-                {
-                    message = new Message(payload, CompressionCodecs.NoCompressionCodec);
-                }
-                else
-                {
-                    message = new Message(123L, TimestampTypes.CreateTime, payload, CompressionCodecs.NoCompressionCodec);
-                }
-
-                MemoryStream ms = new MemoryStream();
-                message.WriteTo(ms);
-
-                Assert.AreEqual(message.Magic, useV0Message ? 0 : 1);
-                Assert.AreEqual(message.Size, ms.Length);
-
-                var crc = Crc32Hasher.ComputeCrcUint32(ms.GetBuffer(), 4, (int) (ms.Length - 4));
-
-                // first 4 bytes = the crc
-                using (var reader = new KafkaBinaryReader(ms))
-                {
-                    Assert.AreEqual(crc, reader.ReadUInt32());
-
-                    // magic
-                    Assert.AreEqual(message.Magic, reader.ReadByte());
-
-                    // attributes
-                    Assert.AreEqual((byte) 0, reader.ReadByte());
-
-                    if (!useV0Message)
-                    {
-                        // timestamp
-                        Assert.AreEqual(123L, reader.ReadInt64());
-                    }
-
-                    // key size
-                    Assert.AreEqual(-1, reader.ReadInt32());
-
-                    // payload size
-                    Assert.AreEqual(payload.Length, reader.ReadInt32());
-
-                    // remaining bytes = the payload
-                    payload.SequenceEqual(reader.ReadBytes(10)).Should().BeTrue();
-                }
-            }
+            RunBytesValidSequenceTest(false);
+        }
+
+        /// <summary>
+        /// Ensure that the bytes returned from the message are in valid kafka sequence for v1 messages.
+        /// </summary>
+        [TestMethod]
+        [TestCategory(TestCategories.BVT)]
+        public void GetBytesValidSequenceV1Message()
+        {
+            RunBytesValidSequenceTest(true);
+        }
+
+        private void RunBytesValidSequenceTest(bool includeTimestampInMessage)
+        {
+            var payload = Encoding.UTF8.GetBytes("kafka");
+            Message message;
+
+            if (includeTimestampInMessage)
+            {
+                message = new Message(123L, TimestampTypes.CreateTime, payload, CompressionCodecs.NoCompressionCodec);
+            }
+            else
+            {
+                message = new Message(payload, CompressionCodecs.NoCompressionCodec);
+            }
+
+            MemoryStream ms = new MemoryStream();
+            message.WriteTo(ms);
+
+            Assert.AreEqual(message.Magic, includeTimestampInMessage ? 1 : 0);
+            Assert.AreEqual(message.Size, ms.Length);
+
+            var crc = Crc32Hasher.ComputeCrcUint32(ms.GetBuffer(), 4, (int) (ms.Length - 4));
+
+            // first 4 bytes = the crc
+            using (var reader = new KafkaBinaryReader(ms))
+            {
+                Assert.AreEqual(crc, reader.ReadUInt32());
+
+                // magic
+                Assert.AreEqual(message.Magic, reader.ReadByte());
+
+                // attributes
+                Assert.AreEqual((byte) 0, reader.ReadByte());
+
+                if (includeTimestampInMessage)
+                {
+                    // timestamp
+                    Assert.AreEqual(123L, reader.ReadInt64());
+                }
+
+                // key size
+                Assert.AreEqual(-1, reader.ReadInt32());
+
+                // payload size
+                Assert.AreEqual(payload.Length, reader.ReadInt32());
+
+                // remaining bytes = the payload
+                payload.SequenceEqual(reader.ReadBytes(10)).Should().BeTrue();
+            }
+        }
     }
 }
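For orientation, the field sequence the rebuilt test walks (crc, magic, attributes, optional timestamp, key size, payload size, payload) matches the pre-0.11 Kafka message framing. The helper below is purely illustrative and not part of the test project; it only restates the field sizes the assertions rely on, assuming those fields make up the entire serialized message.

    // Illustrative only, not part of MessageTests: expected serialized size per message version,
    // assuming the message consists exactly of the fields the test reads back.
    internal static class MessageSizeSketch
    {
        public static int ExpectedSize(int payloadLength, bool includeTimestamp)
        {
            const int Crc = 4, Magic = 1, Attributes = 1, Timestamp = 8, KeySize = 4, PayloadSize = 4;
            return Crc + Magic + Attributes + (includeTimestamp ? Timestamp : 0) + KeySize + PayloadSize + payloadLength;
        }
    }

With the five-byte "kafka" payload and a null key this works out to 19 bytes for a v0 message and 27 bytes for a v1 message, which is the quantity Assert.AreEqual(message.Size, ms.Length) pins down.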
@@ -1,6 +1,8 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
 
+using Kafka.Client.Requests;
+
 namespace Kafka.Client.Tests.Response
 {
     using FluentAssertions;
@@ -24,7 +26,7 @@ namespace Kafka.Client.Tests.Response
             var stream = new MemoryStream();
             WriteTestFetchResponse(stream, 0);
             var reader = new KafkaBinaryReader(stream);
-            var response = new FetchResponse.Parser(0).ParseFrom(reader);
+            var response = FetchResponse.ParserForVersion(0).ParseFrom(reader);
             response.ThrottleTime.ShouldBeEquivalentTo(0);
             var set = response.MessageSet("topic1", 111);
             set.Should().NotBeNull();
@@ -40,7 +42,7 @@ namespace Kafka.Client.Tests.Response
             var stream = new MemoryStream();
             WriteTestFetchResponse(stream, 1);
             var reader = new KafkaBinaryReader(stream);
-            var response = new FetchResponse.Parser(1).ParseFrom(reader);
+            var response = FetchResponse.ParserForVersion(1).ParseFrom(reader);
             response.ThrottleTime.ShouldBeEquivalentTo(456);
             var set = response.MessageSet("topic1", 111);
             set.Should().NotBeNull();
@@ -54,7 +56,7 @@ namespace Kafka.Client.Tests.Response
             var writer = new KafkaBinaryWriter(stream);
             writer.Write(1);
             writer.Write(123); // correlation id
-            if (versionId > 0)
+            if (versionId > 0) // throttle time
             {
                 writer.Write(456);
             }
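The versionId > 0 branch in this helper mirrors the assertions in the two tests above: fetch response v1 adds a throttle-time field after the correlation id, so the v0 test expects a ThrottleTime of 0 (nothing is written) while the v1 test expects the 456 written here.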
@@ -30,7 +30,7 @@ namespace Kafka.Client.Tests.Response
             writer.Write(111L); // offset
             stream.Seek(0, SeekOrigin.Begin);
             var reader = new KafkaBinaryReader(stream);
-            var response = new ProducerResponse.Parser(1).ParseFrom(reader);
+            var response = ProducerResponse.ParserForVersion(1).ParseFrom(reader);
             response.CorrelationId.Should().Be(123);
             response.Statuses.Count.Should().Be(1);
             var info = response.Statuses[new TopicAndPartition("topic", 999)];
@@ -111,7 +111,7 @@ namespace Kafka.Client
         {
             this.EnsuresNotDisposed();
             Guard.NotNull(request, "request");
-            return this.Handle(request.RequestBuffer.GetBuffer(), new ProducerResponse.Parser(request.VersionId), request.RequiredAcks != 0);
+            return this.Handle(request.RequestBuffer.GetBuffer(), ProducerResponse.ParserForVersion(request.VersionId), request.RequiredAcks != 0);
         }
 
         /// <summary>
@@ -139,7 +139,7 @@ namespace Kafka.Client
         {
             this.EnsuresNotDisposed();
             Guard.NotNull(request, "request");
-            return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser(request.VersionId));
+            return this.Handle(request.RequestBuffer.GetBuffer(), FetchResponse.ParserForVersion(request.VersionId));
         }
 
         /// <summary>
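Both call sites above now pass a factory-created parser into Handle along with the raw request buffer. For readers new to the code, the contract those parsers satisfy presumably looks like the sketch below; the single ParseFrom member is inferred from how the parsers are used in this commit rather than copied from the real IResponseParser definition.

    // Assumed shape of the parser contract (illustrative sketch, not the actual source):
    public interface IResponseParser<T>
    {
        T ParseFrom(KafkaBinaryReader reader);
    }

Handing Handle an IResponseParser<T> keeps the version-specific decoding behind one interface, which is what lets the static factory in the next hunks replace new Parser(versionId) at these call sites without other changes.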
@@ -92,9 +92,14 @@ namespace Kafka.Client.Responses
             return new PartitionData(partition, new BufferedMessageSet(Enumerable.Empty<Message>(), partition));
         }
 
+        public static Parser ParserForVersion(int versionId)
+        {
+            return new Parser(versionId);
+        }
+
         public class Parser : IResponseParser<FetchResponse>
         {
-            private int versionId;
+            private readonly int versionId;
 
             public Parser(int versionId)
             {
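ParserForVersion is a thin convenience over the nested Parser type, so callers write FetchResponse.ParserForVersion(v) instead of new FetchResponse.Parser(v). A minimal sketch of the end-to-end flow, assuming only the types already visible in this diff (DecodeResponse is a made-up helper name, not part of the client, and System.IO plus the client's reader/parser namespaces are assumed to be in scope):

    // Hypothetical helper: decode a raw response buffer with whatever parser the caller supplies.
    internal static class ResponseParsingSketch
    {
        public static T DecodeResponse<T>(byte[] responseBuffer, IResponseParser<T> parser)
        {
            using (var reader = new KafkaBinaryReader(new MemoryStream(responseBuffer)))
            {
                return parser.ParseFrom(reader);
            }
        }
    }

    // e.g. var fetch = ResponseParsingSketch.DecodeResponse(buffer, FetchResponse.ParserForVersion(1));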
@@ -50,9 +50,14 @@ namespace Kafka.Client.Responses
         public int CorrelationId { get; set; }
         public Dictionary<TopicAndPartition, ProducerResponseStatus> Statuses { get; set; }
 
+        public static Parser ParserForVersion(int versionId)
+        {
+            return new Parser(versionId);
+        }
+
         public class Parser : IResponseParser<ProducerResponse>
         {
-            private int versionId;
+            private readonly int versionId;
 
             public Parser(int versionId)
             {