[#566] Support custom message format (no custom encoding)

This commit is contained in:
Xin Chen 2023-04-30 20:40:57 -07:00
Родитель c7596e6f03
Коммит ff3ea9ea46
31 изменённых файлов: 882 добавлений и 60 удалений

Просмотреть файл

@ -49,6 +49,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>

Просмотреть файл

@ -53,6 +53,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>

Просмотреть файл

@ -54,6 +54,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>

Просмотреть файл

@ -95,6 +95,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>

Просмотреть файл

@ -94,6 +94,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>

Просмотреть файл

@ -28,3 +28,30 @@ value: a described string: descriptor=symbol(“apache.org:selector-filter:strin
The offset filter is perferred as it is more performant than the enqueued-time filter. The enqueued-time filter is for rare cases when you lose the checkpoint data and have to go back a certain period of time in history. The following special offsets are defined.
'-1': beginning of the event stream.
'@latest': end of the even stream, in other words, all new events after the link is attached.
## Batching in message sender
Azure Event Hubs supports an extended message format (0x80013700) which allows a sender to pack multiple messages into one AMQP message.
It is intended to help applications publish messages more efficiently, especially with small messages and high-latency networks.
The envelop message is a standard AMQP 1.0 message with multiple Data sections, each of which contains one encoded payload message in its
binary value. On the service side, the payload messages are extracted and delivered to receivers individually.
The `MessageBatch` class in the test project illustrates how such batch messages can be created.
```
public class MessageBatch : Message
{
public const uint BatchFormat = 0x80013700;
public static MessageBatch Create<T>(IEnumerable<T> objects)
{
DataList dataList = new DataList();
foreach (var obj in objects)
{
ByteBuffer buffer = new ByteBuffer(1024, true);
var section = new AmqpValue<T>(obj);
AmqpSerializer.Serialize(buffer, section);
dataList.Add(new Data() { Buffer = buffer });
}
return new MessageBatch() { Format = BatchFormat, BodySection = dataList };
}
}
```

Просмотреть файл

@ -83,6 +83,9 @@
<Compile Include="..\..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>

Просмотреть файл

@ -49,6 +49,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>

Просмотреть файл

@ -73,6 +73,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>

Просмотреть файл

@ -73,6 +73,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>

Просмотреть файл

@ -211,7 +211,7 @@ namespace Amqp
/// <summary>
/// Sets the read position.
/// </summary>
/// <param name="seekPosition">Position to set.</param>
/// <param name="seekPosition">The position relative to <see cref="Offset"/> of the buffer.</param>
public void Seek(int seekPosition)
{
Fx.Assert(seekPosition >= 0, "seekPosition must not be negative.");

Просмотреть файл

@ -67,6 +67,11 @@ namespace Amqp.Framing
set;
}
internal int Length
{
get { return this.Buffer == null ? 0 : this.Buffer.Length; }
}
internal override void EncodeValue(ByteBuffer buffer)
{
if (this.Buffer != null)
@ -93,6 +98,11 @@ namespace Amqp.Framing
set { this.binary = value; }
}
internal int Length
{
get { return this.Binary == null ? 0 : this.Binary.Length; }
}
internal override void EncodeValue(ByteBuffer buffer)
{
Encoder.WriteBinary(buffer, this.binary, true);

138
src/Framing/DataList.cs Normal file
Просмотреть файл

@ -0,0 +1,138 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Framing
{
using System;
using Amqp.Types;
/// <summary>
/// Represents one or more <see cref="Data"/> sections, typically used as a message body.
/// </summary>
/// <remarks>As a message body, the list is encoded as continuous Data sections without
/// the list encoding preamble (format code, size and count). If the list is empty, it is
/// equivalent to one Data section with empty binary data.</remarks>
public sealed class DataList : RestrictedDescribed
{
Data[] array;
int count;
/// <summary>
/// Initializes a Data object.
/// </summary>
public DataList()
: base(Codec.Data)
{
}
/// <summary>
/// Gets the number of elements.
/// </summary>
public int Count
{
get { return this.count; }
}
/// <summary>
/// Gets the <see cref="Data"/> element at index. Caller should check <see cref="Count"/>
/// before calling this method to ensure that index is valid.
/// </summary>
/// <param name="index">The zero-based index.</param>
/// <returns>The <see cref="Data"/> element at index.</returns>
public Data this[int index]
{
get { return this.array[index]; }
}
/// <summary>
/// Adds a <see cref="Data"/> section.
/// </summary>
/// <param name="data"></param>
public void Add(Data data)
{
if (this.array == null)
{
this.array = new Data[4];
}
else if (this.count == this.array.Length)
{
var temp = new Data[this.count * 2];
Array.Copy(this.array, temp, this.count);
this.array = temp;
}
this.array[this.count++] = data;
}
/// <inheritdoc cref="Object.GetHashCode()" />
public override int GetHashCode()
{
return base.GetHashCode();
}
/// <inheritdoc cref="Object.Equals(object)" />
public override bool Equals(object obj)
{
var data = obj as Data;
if (data != null)
{
if (data.Length == 0 && this.count == 0)
{
return true;
}
}
return base.Equals(obj);
}
internal Data[] ToArray()
{
if (this.array.Length == this.count)
{
return this.array;
}
var copy = new Data[this.count];
Array.Copy(this.array, copy, this.count);
return copy;
}
internal override void EncodeValue(ByteBuffer buffer)
{
if (this.count == 0)
{
// Encode this as an empty binary.
AmqpBitConverter.WriteUByte(buffer, FormatCode.Binary8);
AmqpBitConverter.WriteUByte(buffer, 0);
}
else
{
this.array[0].EncodeValue(buffer);
for (int i = 1; i < this.count; i++)
{
this.array[i].Encode(buffer);
}
}
}
internal override void DecodeValue(ByteBuffer buffer)
{
// Should never be called directly.
throw new InvalidOperationException();
}
}
}

Просмотреть файл

@ -47,7 +47,7 @@ namespace Amqp.Listener
// receive
bool autoRestore;
int restored;
Delivery deliveryCurrent;
MessageDelivery deliveryCurrent;
Action<ListenerLink, Message, DeliveryState, object> onMessage;
/// <summary>
@ -435,23 +435,21 @@ namespace Amqp.Listener
{
if (delivery != null)
{
this.deliveryCurrent = new MessageDelivery(delivery, transfer.MessageFormat);
buffer.AddReference();
delivery.Buffer = buffer;
this.deliveryCount++;
}
else
{
delivery = this.deliveryCurrent;
delivery = this.deliveryCurrent.Delivery;
AmqpBitConverter.WriteBytes(delivery.Buffer, buffer.Buffer, buffer.Offset, buffer.Length);
}
if (!transfer.More)
{
this.DeliverMessage(delivery);
}
else
{
this.deliveryCurrent = delivery;
this.DeliverMessage(this.deliveryCurrent);
this.deliveryCurrent = MessageDelivery.None;
}
}
@ -489,10 +487,13 @@ namespace Amqp.Listener
}
}
void DeliverMessage(Delivery delivery)
void DeliverMessage(MessageDelivery messageDelivery)
{
var container = ((ListenerConnection)this.Session.Connection).Listener.Container;
delivery.Message = container.CreateMessage(delivery.Buffer);
Delivery delivery = messageDelivery.Delivery;
var message = container.CreateMessage(delivery.Buffer);
message.Format = messageDelivery.MessageFormat;
delivery.Message = message;
IHandler handler = this.Session.Connection.Handler;
if (handler != null && handler.CanHandle(EventId.SendDelivery))

Просмотреть файл

@ -88,6 +88,12 @@ namespace Amqp
/// Gets the object from the body. The returned value depends on the type of the body section.
/// Use the BodySection field if the entire section is needed.
/// </summary>
/// <remarks>Returns null if body section is null; otherwise one of the following,
/// * A value contained in a <see cref="AmqpValue"/> section.
/// * A list of objects contained in a <see cref="AmqpSequence"/> section.
/// * A byte[] object contained in a signle <see cref="Data"/> section.
/// * A Data[] representing multiple <see cref="Data"/> sections.
/// </remarks>
public object Body
{
get
@ -102,6 +108,12 @@ namespace Amqp
}
else if (this.BodySection.Descriptor.Code == Codec.Data.Code)
{
var dataList = this.BodySection as DataList;
if (dataList != null)
{
return dataList.ToArray();
}
return ((Data)this.BodySection).Binary;
}
else if (this.BodySection.Descriptor.Code == Codec.AmqpSequence.Code)
@ -115,6 +127,18 @@ namespace Amqp
}
}
/// <summary>
/// Gets or sets the format of the message. Warning: setting a non-zero value may cause
/// inter-operability issues with other standard 1.0 implementations.
/// </summary>
/// <remarks>The custom format MUST use the same encoding layout as the standard message.
/// </remarks>
public uint Format
{
get;
set;
}
/// <summary>
/// Gets the delivery tag associated with the message.
/// </summary>
@ -150,6 +174,7 @@ namespace Amqp
{
Message message = new Message();
DataList dataList = null;
while (buffer.Length > 0)
{
var described = (RestrictedDescribed)Codec.Decode(buffer);
@ -173,8 +198,25 @@ namespace Amqp
{
message.ApplicationProperties = (ApplicationProperties)described;
}
else if (described.Descriptor.Code == Codec.Data.Code)
{
if (message.BodySection == null)
{
message.BodySection = described;
}
else
{
if (dataList == null)
{
dataList = new DataList();
dataList.Add((Data)message.BodySection);
message.BodySection = dataList;
}
dataList.Add((Data)described);
}
}
else if (described.Descriptor.Code == Codec.AmqpValue.Code ||
described.Descriptor.Code == Codec.Data.Code ||
described.Descriptor.Code == Codec.AmqpSequence.Code)
{
message.BodySection = described;
@ -200,7 +242,7 @@ namespace Amqp
/// <returns>A <see cref="MessageDelivery"/> object, or null if delivery has not happened yet.</returns>
public MessageDelivery GetDelivery()
{
return this.Delivery == null ? MessageDelivery.None : new MessageDelivery(this.Delivery);
return this.Delivery == null ? MessageDelivery.None : new MessageDelivery(this.Delivery, this.Format);
}
/// <summary>

Просмотреть файл

@ -25,10 +25,12 @@ namespace Amqp
public struct MessageDelivery
{
readonly Delivery delivery;
readonly uint messageFormat;
internal MessageDelivery(Delivery delivery)
internal MessageDelivery(Delivery delivery, uint messageFormat)
{
this.delivery = delivery;
this.messageFormat = messageFormat;
}
/// <summary>
@ -63,6 +65,14 @@ namespace Amqp
get { return this.delivery.Link; }
}
/// <summary>
/// Gets the format of the message being delivered.
/// </summary>
internal uint MessageFormat
{
get { return this.messageFormat; }
}
internal Delivery Delivery
{
get { return this.delivery; }

Просмотреть файл

@ -44,7 +44,7 @@ namespace Amqp
// received messages queue
LinkedList receivedMessages;
Delivery deliveryCurrent;
MessageDelivery deliveryCurrent;
// pending receivers
LinkedList waiterList;
@ -372,11 +372,12 @@ namespace Amqp
{
if (delivery == null)
{
delivery = this.deliveryCurrent;
delivery = this.deliveryCurrent.Delivery;
AmqpBitConverter.WriteBytes(delivery.Buffer, buffer.Buffer, buffer.Offset, buffer.Length);
}
else
{
this.deliveryCurrent = new MessageDelivery(delivery, transfer.MessageFormat);
buffer.AddReference();
delivery.Buffer = buffer;
lock (this.ThisLock)
@ -387,8 +388,9 @@ namespace Amqp
if (!transfer.More)
{
this.deliveryCurrent = null;
delivery.Message = Message.Decode(delivery.Buffer);
delivery.Message.Format = this.deliveryCurrent.MessageFormat;
this.deliveryCurrent = MessageDelivery.None;
IHandler handler = this.Session.Connection.Handler;
if (handler != null && handler.CanHandle(EventId.ReceiveDelivery))
@ -438,10 +440,6 @@ namespace Amqp
Fx.Assert(callback != null, "callback must not be null now");
callback(this, delivery.Message);
}
else
{
this.deliveryCurrent = delivery;
}
}
internal override void OnAttach(uint remoteHandle, Attach attach)

Просмотреть файл

@ -746,7 +746,7 @@ namespace Amqp
transfer.DeliveryTag = delivery.Tag;
transfer.DeliveryId = delivery.DeliveryId;
transfer.State = delivery.State;
transfer.MessageFormat = 0;
transfer.MessageFormat = delivery.Message.Format;
transfer.Settled = delivery.Settled;
transfer.Batchable = delivery.Batchable;
}

Просмотреть файл

@ -1,3 +1,4 @@
using Amqp.Framing;
using System;
using System.Threading;
#if NETMF && !NANOFRAMEWORK_1_0
@ -23,6 +24,12 @@ namespace Test.Amqp
{
areEqual = expected == actual;
}
else if (expected is Type && actual is Type)
{
var t1 = (Type)expected;
var t2 = (Type)actual;
areEqual = t1 == t2 || (t1 == typeof(DataList) && t2 == typeof(Data));
}
else if (expected is byte[] && actual is byte[])
{
byte[] a = (byte[])expected;

Просмотреть файл

@ -1247,7 +1247,59 @@ namespace Test.Amqp
connection.Close();
}
[TestMethod]
public void ContainerHostMessageFormatTest()
{
string name = "ContainerHostMessageFormatTest";
var processor = new TestMessageProcessor();
this.host.RegisterMessageProcessor(name, processor);
var connection = new Connection(Address);
var session = new Session(connection);
var sender = new SenderLink(session, "send-link", name);
for (int i = 0; i < 10; i++)
{
var message = MessageBatch.Create(new[] { "test1", "test2", "test3" });
sender.Send(message);
}
connection.Close();
Assert.AreEqual(10, processor.Messages.Count);
for (int i = 0; i < 10; i++)
{
Message message = processor.Messages[i];
Assert.AreEqual(MessageBatch.BatchFormat, message.Format);
}
}
#if !NETFX40
[TestMethod]
public async Task ContainerHostMessageFormatAsyncTest()
{
string name = "ContainerHostMessageFormatAsyncTest";
var processor = new TestMessageProcessor();
this.host.RegisterMessageProcessor(name, processor);
var connection = await Connection.Factory.CreateAsync(Address);
var session = new Session(connection);
var sender = new SenderLink(session, "send-link", name);
for (int i = 0; i < 10; i++)
{
var message = MessageBatch.Create(new[] { "test1", "test2", "test3" });
await sender.SendAsync(message);
}
await connection.CloseAsync();
Assert.AreEqual(10, processor.Messages.Count);
for (int i = 0; i < 10; i++)
{
Message message = processor.Messages[i];
Assert.AreEqual(MessageBatch.BatchFormat, message.Format);
}
}
[TestMethod]
public void LinkProcessorAsyncTest()
{

Просмотреть файл

@ -0,0 +1,40 @@
// ------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
using Amqp;
using Amqp.Framing;
using Amqp.Serialization;
using System.Collections.Generic;
namespace Test.Amqp
{
public class MessageBatch : Message
{
public const uint BatchFormat = 0x80013700;
public static MessageBatch Create<T>(IEnumerable<T> objects)
{
DataList dataList = new DataList();
foreach (var obj in objects)
{
ByteBuffer buffer = new ByteBuffer(1024, true);
var section = new AmqpValue<T>(obj);
AmqpSerializer.Serialize(buffer, section);
dataList.Add(new Data() { Buffer = buffer });
}
return new MessageBatch() { Format = BatchFormat, BodySection = dataList };
}
}
}

Просмотреть файл

@ -0,0 +1,204 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Test.Common
{
using System;
using System.Diagnostics;
using System.Threading;
sealed class OperationTracker
{
// Index Interval (ms) Total
// 0 - 99: 1 [ 0 - 100)
// 100 - 199: 2 [100 - 300)
// 200 - 299: 4 [300 - 700)
// 300 - 399: 8 [700 - 1500)
// 400 - 499: 16 [1500 - 3100)
// 500 - 599: 32 [3100 - 6300)
// 600 - 699: 64 [6300 - 12700)
// 700 - 799: 128 [12700 - 25800)
struct Bucket
{
public readonly int Id;
public readonly int MinMsInclusive;
public readonly int MaxMsExclusive;
public Bucket(int id, int minMsInclusive, int maxMsInclusive)
{
this.Id = id;
this.MinMsInclusive = minMsInclusive;
this.MaxMsExclusive = maxMsInclusive;
}
public int Compare(int latencyMs)
{
if (latencyMs < this.MinMsInclusive)
{
return 1;
}
if (latencyMs >= this.MaxMsExclusive)
{
return -1;
}
return 0;
}
}
static Bucket[] buckets;
static OperationTracker()
{
buckets = new Bucket[16];
int totalMs = 0;
for (int i = 0; i < buckets.Length; i++)
{
int intervalMs = 1 << i;
int max = totalMs + 100 * intervalMs;
buckets[i] = new Bucket(i, totalMs, max);
totalMs = max;
}
}
static int GetIndex(int latencyMs)
{
int min = 0;
int max = buckets.Length - 1;
while (min <= max)
{
int middle = (min + max) / 2;
int comparison = buckets[middle].Compare(latencyMs);
if (comparison == 0)
{
int start = buckets[middle].Id * 100;
int interval = 1 << buckets[middle].Id;
int minMs = buckets[middle].MinMsInclusive;
return buckets[middle].Id * 100 + (latencyMs - minMs) / interval;
}
if (comparison < 0)
{
min = middle + 1;
}
else
{
max = middle - 1;
}
}
return -1;
}
static int GetInterval(int idx)
{
return 1 << (idx / 100);
}
readonly long[] samples;
long startTicks;
long totalCount;
long totalLatencyMs;
public OperationTracker(int maxLatencyMs)
{
int size = GetIndex(maxLatencyMs);
if (size <= 0)
{
throw new ArgumentException($"maxLatencyMs too large");
}
this.samples = new long[size];
this.startTicks = Stopwatch.GetTimestamp();
}
public void Track(int latencyMs)
{
Interlocked.Add(ref this.totalCount, 1);
Interlocked.Add(ref this.totalLatencyMs, latencyMs);
int index = GetIndex(latencyMs);
if (index >= 0)
{
Interlocked.Increment(ref this.samples[index]);
}
}
public string Report(bool reset)
{
double throughput = 0;
double avgMs = 0;
long p90 = 0;
long p95 = 0;
long p99 = 0;
long p999 = 0;
long total = this.totalCount;
long totalMs = this.totalLatencyMs;
long durationTicks = Stopwatch.GetTimestamp() - this.startTicks;
if (durationTicks > 0)
{
throughput = (double)total * TimeSpan.TicksPerSecond / durationTicks;
}
if (total > 0)
{
avgMs = (double)totalMs / total;
}
long count = 0;
for (int i = 0; i < this.samples.Length; i++)
{
count += this.samples[i];
int intervalMs = GetInterval(i);
if (p90 == 0 && count >= total * 0.9)
{
p90 = (i + 1) * intervalMs;
}
if (p95 == 0 && count >= total * 0.95)
{
p95 = (i + 1) * intervalMs;
}
if (p99 == 0 && count >= total * 0.99)
{
p99 = (i + 1) * intervalMs;
}
if (p999 == 0 && count >= total * 0.999)
{
p999 = (i + 1) * intervalMs;
}
}
if (reset)
{
this.startTicks = Stopwatch.GetTimestamp();
this.totalCount = 0;
this.totalLatencyMs = 0;
for (int i = 0; i < this.samples.Length; i++)
{
this.samples[i] = 0;
}
}
return $"Throughput:{throughput:F2} Latency:Avg={avgMs:F2} P90={p90} P95={p95} P99={p99} P99.9={p999}";
}
}
}

Просмотреть файл

@ -1909,5 +1909,99 @@ namespace Test.Amqp
validator(events);
}).Unwrap().GetAwaiter().GetResult();
}
[TestMethod]
public void MessageFormatSendTest()
{
string testName = "MessageFormatSendTest";
uint format = uint.MaxValue;
this.testListener.RegisterTarget(TestPoint.Transfer, (stream, channel, fields) =>
{
format = (uint)fields[3];
return TestOutcome.Continue;
});
var connection = new Connection(this.address);
var session = new Session(connection);
var sender = new SenderLink(session, "sender-" + testName, "any");
var message = MessageBatch.Create(new[] { "test1", "test2", "test3" });
sender.Send(message);
connection.Close();
Assert.AreEqual(message.Format, format);
}
[TestMethod]
public void MessageFormatReceiveTest()
{
string testName = "MessageFormatReceiveTest";
this.testListener.RegisterTarget(TestPoint.Flow, (stream, channel, fields) =>
{
TestListener.FRM(stream, 0x14UL, 0, channel, fields[4], 0u, BitConverter.GetBytes(0), 123u, false, false); // transfer
return TestOutcome.Stop;
});
Connection connection = new Connection(this.address);
Session session = new Session(connection);
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, "any");
Message message = receiver.Receive();
Assert.AreEqual(123u, message.Format);
receiver.Accept(message);
connection.Close();
}
[TestMethod]
public void ForwardMessageTest()
{
string testName = "ForwardMessageTest";
uint total = 0;
this.testListener.RegisterTarget(TestPoint.Flow, (stream, channel, fields) =>
{
uint current = total;
total = Math.Min(510u, (uint)fields[5] + (uint)fields[6]);
for (uint i = current; i < total; i++)
{
TestListener.FRM(stream, 0x14UL, 0, channel, fields[4], i, BitConverter.GetBytes(i), 0u, false, false); // transfer
}
return TestOutcome.Stop;
});
int port2 = port + 1;
var listener2 = new TestListener(new IPEndPoint(IPAddress.Any, port2));
listener2.Open();
try
{
Connection connection = new Connection(this.address);
Session session = new Session(connection);
ReceiverLink receiver = new ReceiverLink(session, "receiver-" + testName, "any");
Connection connection2 = new Connection(new Address("amqp://127.0.0.1:" + port2));
Session session2 = new Session(connection2);
SenderLink sender = new SenderLink(session2, "sender-" + testName, "any");
int count = 0;
var done = new ManualResetEvent(false);
receiver.Start(300, (r, m) =>
{
r.Accept(m);
sender.Send(m, (a, b, c, d) => { if (count++ >= 500) done.Set(); }, null);
});
done.WaitOne(10000);
Trace.WriteLine(TraceLevel.Information, "done: {0}", count);
connection.Close();
connection2.Close();
}
finally
{
listener2.Close();
}
}
}
}

Просмотреть файл

@ -29,6 +29,7 @@ namespace Listener.IContainer
public sealed class TestAmqpBroker : IContainer
{
public const uint BatchFormat = 0x80013700;
readonly X509Certificate2 certificate;
readonly Dictionary<string, TransportProvider> customTransports;
readonly Dictionary<string, TestQueue> queues;
@ -56,7 +57,7 @@ namespace Listener.IContainer
this.certificate = certValue == null ? null : GetCertificate(certValue);
string containerId = "TestAmqpBroker:" + Guid.NewGuid().ToString().Substring(0, 8);
string containerId = "AMQPNetLite-TestBroker-" + Guid.NewGuid().ToString().Substring(0, 8);
this.listeners = new ConnectionListener[endpoints.Count];
for (int i = 0; i < endpoints.Count; i++)
{
@ -251,7 +252,7 @@ namespace Listener.IContainer
public BrokerMessage(ByteBuffer buffer)
{
this.buffer = buffer;
this.messageOffset = buffer.Offset;
this.messageOffset = buffer.Capacity - buffer.Length - buffer.Size;
}
public ByteBuffer Buffer
@ -460,29 +461,105 @@ namespace Listener.IContainer
void Enqueue(BrokerMessage message)
{
// clone the message as the incoming one is associated with a delivery already
BrokerMessage clone = new BrokerMessage(message.Buffer);
Consumer consumer = null;
lock (this.syncRoot)
LinkedListNode<BrokerMessage> node = null;
if (message.Format == BatchFormat)
{
consumer = this.GetConsumerWithLock(null);
if (consumer == null)
var batch = Message.Decode(message.Buffer);
var dataList = batch.BodySection as DataList;
if (dataList != null)
{
clone.Node = this.messages.AddLast(clone);
for(int i = 0; i < dataList.Count; i++)
{
var msg = new BrokerMessage(dataList[i].Buffer);
lock (this.syncRoot)
{
msg.Node = this.messages.AddLast(msg);
if (node == null)
{
node = msg.Node;
}
}
}
}
else
{
if (!consumer.SettleOnSend)
var data = batch.BodySection as Data;
if (data != null)
{
clone.LockedBy = consumer;
clone.Node = this.messages.AddLast(clone);
var msg = new BrokerMessage(data.Buffer);
lock (this.syncRoot)
{
node = msg.Node = this.messages.AddLast(msg);
}
}
else
{
// Ignore it for now
return;
}
}
}
if (consumer != null)
else
{
consumer.Signal(clone);
// clone the message as the incoming one is associated with a delivery already
BrokerMessage clone = new BrokerMessage(message.Buffer);
lock (this.syncRoot)
{
node = clone.Node = this.messages.AddLast(clone);
}
}
this.Deliver(node);
}
void Deliver(LinkedListNode<BrokerMessage> node)
{
Consumer consumer = null;
BrokerMessage message = null;
while (node != null)
{
lock (this.syncRoot)
{
if (consumer == null || consumer.Credit == 0)
{
consumer = this.GetConsumerWithLock(null);
if (consumer == null)
{
return;
}
}
if (node.List == null)
{
node = this.messages.First;
continue;
}
LinkedListNode<BrokerMessage> next = node.Next;
message = node.Value;
if (message.LockedBy == null)
{
if (consumer.SettleOnSend)
{
this.messages.Remove(node);
}
else
{
message.LockedBy = consumer;
}
}
else
{
message = null;
}
node = next;
}
if (consumer != null && message != null)
{
consumer.Signal(message);
}
}
}

Просмотреть файл

@ -42,6 +42,9 @@
<Compile Include="..\Common\Extensions.cs">
<Link>Extensions.cs</Link>
</Compile>
<Compile Include="..\Common\OperationTracker.cs">
<Link>OperationTracker.cs</Link>
</Compile>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>

Просмотреть файл

@ -24,9 +24,9 @@ namespace PerfTest
using Amqp;
using Amqp.Framing;
using Amqp.Listener;
using Amqp.Types;
using Test.Common;
using TestExtensions = Test.Common.Extensions;
using Stopwatch = System.Diagnostics.Stopwatch;
class Program
{
@ -95,6 +95,16 @@ namespace PerfTest
Console.WriteLine(Arguments.PrintArguments(typeof(PerfArguments)));
}
static int GetLatencyMs(Message message)
{
if (message.Properties != null && message.Properties.CreationTime.Ticks > 0)
{
return (int)((Stopwatch.GetTimestamp() - message.Properties.CreationTime.Ticks) / TimeSpan.TicksPerMillisecond);
}
return -1;
}
abstract class Role
{
protected IBufferManager bufferManager;
@ -104,7 +114,7 @@ namespace PerfTest
long completed;
long progress;
ManualResetEvent completedEvent;
System.Diagnostics.Stopwatch stopwatch;
OperationTracker tracker;
public Role(PerfArguments perfArgs)
{
@ -113,6 +123,7 @@ namespace PerfTest
this.count = perfArgs.Count;
this.progress = perfArgs.Progress;
this.completedEvent = new ManualResetEvent(false);
this.tracker = new OperationTracker(20000);
}
protected PerfArguments Args
@ -132,6 +143,7 @@ namespace PerfTest
var factory = new ConnectionFactory();
factory.BufferManager = this.bufferManager;
factory.AMQP.MaxFrameSize = this.perfArgs.MaxFrameSize;
factory.AMQP.HostName = this.perfArgs.Host;
if (address.Scheme.Equals("amqps", StringComparison.OrdinalIgnoreCase))
{
factory.SSL.RemoteCertificateValidationCallback = (a, b, c, d) => true;
@ -145,29 +157,17 @@ namespace PerfTest
return Interlocked.Increment(ref this.started) <= this.count || this.count == 0;
}
protected bool OnComplete()
protected bool OnComplete(int latencyMs)
{
this.tracker.Track(latencyMs);
long done = Interlocked.Increment(ref this.completed);
if (this.progress > 0 && done % this.progress == 0)
{
long throughput;
if (this.stopwatch == null)
{
this.stopwatch = System.Diagnostics.Stopwatch.StartNew();
throughput = -1;
}
else
{
long ms = this.stopwatch.ElapsedMilliseconds;
throughput = ms > 0 ? done * 1000L / ms : -1;
}
Trace.WriteLine(TraceLevel.Information, "completed {0} throughput {1} msg/s", done, throughput);
Trace.WriteLine(TraceLevel.Information, this.tracker.Report(reset: true));
}
if (this.count > 0 && done >= this.count)
{
this.stopwatch.Stop();
this.completedEvent.Set();
return false;
}
@ -223,7 +223,7 @@ namespace PerfTest
thisPtr.bufferManager.ReturnBuffer(new ArraySegment<byte>(buffer.Buffer, buffer.Offset, buffer.Capacity));
}
if (thisPtr.OnComplete())
if (thisPtr.OnComplete(GetLatencyMs(message)))
{
Message msg = thisPtr.CreateMessage();
sender.Send(msg, onOutcome, state);
@ -275,7 +275,7 @@ namespace PerfTest
}
Message message = new Message();
message.Properties = new Properties() { MessageId = "msg" };
message.Properties = new Properties() { MessageId = "msg", CreationTime = new DateTime(Stopwatch.GetTimestamp(), DateTimeKind.Utc) };
message.BodySection = new Data()
{
Buffer = new ByteBuffer(segment.Array, segment.Offset, this.bodySize, segment.Count)
@ -326,7 +326,7 @@ namespace PerfTest
{
r.Accept(m);
m.Dispose();
this.OnComplete();
this.OnComplete(GetLatencyMs(m));
});
this.Wait();
@ -365,7 +365,7 @@ namespace PerfTest
void SendRequest(SenderLink sender, string replyTo)
{
Message message = new Message();
message.Properties = new Properties() { ReplyTo = replyTo };
message.Properties = new Properties() { ReplyTo = replyTo, CreationTime = new DateTime(Stopwatch.GetTimestamp(), DateTimeKind.Utc) };
message.Properties.SetCorrelationId(Guid.NewGuid());
message.BodySection = new Data() { Binary = this.GetBuffer() };
sender.Send(message, null, null);
@ -402,7 +402,7 @@ namespace PerfTest
{
r.Accept(m);
m.Dispose();
if (this.OnComplete())
if (this.OnComplete(GetLatencyMs(m)))
{
this.SendRequest(sender, clientId);
}
@ -456,10 +456,11 @@ namespace PerfTest
void IRequestProcessor.Process(RequestContext requestContext)
{
Message response = new Message("request processed");
response.Properties = new Properties() { CreationTime = requestContext.Message.Properties.CreationTime };
response.ApplicationProperties = new ApplicationProperties();
response.ApplicationProperties["status-code"] = 200;
requestContext.Complete(response);
this.OnComplete();
this.OnComplete(GetLatencyMs(requestContext.Message));
}
}
@ -503,7 +504,7 @@ namespace PerfTest
void IMessageProcessor.Process(MessageContext messageContext)
{
messageContext.Complete();
this.OnComplete();
this.OnComplete(-1);
}
}
@ -535,6 +536,13 @@ namespace PerfTest
protected set;
}
[Argument(Name = "host", Shortcut = "h", Description = "Set open.hostname")]
public string Host
{
get;
protected set;
}
[Argument(Name = "node", Shortcut = "n", Description = "name of the AMQP node", Default = "q1")]
public string Node
{

Просмотреть файл

@ -580,6 +580,10 @@ namespace Test.Amqp
{
EnsureEqual(((Data)x).Binary, ((Data)y).Binary);
}
else if (x.GetType() == typeof(DataList))
{
Assert.AreEqual(x, y);
}
else if (x.GetType() == typeof(AmqpValue))
{
EnsureEqual(((AmqpValue)x).Value, ((AmqpValue)y).Value);

Просмотреть файл

@ -73,9 +73,15 @@
<Compile Include="..\Common\LinkTests.cs">
<Link>LinkTests.cs</Link>
</Compile>
<Compile Include="..\Common\MessageBatch.cs">
<Link>Common\MessageBatch.cs</Link>
</Compile>
<Compile Include="..\Common\ProtocolTests.cs">
<Link>ProtocolTests.cs</Link>
</Compile>
<Compile Include="..\Common\TestAmqpBroker.cs">
<Link>Common\TestAmqpBroker.cs</Link>
</Compile>
<Compile Include="..\Common\TestHandler.cs">
<Link>Common\TestHandler.cs</Link>
</Compile>
@ -125,6 +131,7 @@
<Link>WebSocketTests.cs</Link>
</Compile>
<Compile Include="AmqpCodecTests.cs" />
<Compile Include="TestBrokerTests.cs" />
<Compile Include="WellknownStringDecoderTests.cs" />
<Compile Include="TransactionTests.cs" />
<Compile Include="PerfTests.cs" />

Просмотреть файл

@ -0,0 +1,67 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
using Amqp;
using Listener.IContainer;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
namespace Test.Amqp
{
[TestClass]
public class TestBrokerTests
{
const string address = "amqp://guest:guest@localhost:15674";
static TestAmqpBroker broker;
[ClassInitialize]
public static void Initialize(TestContext context)
{
broker = new TestAmqpBroker(new[] { address }, null, null, null);
broker.Start();
}
[ClassCleanup]
public static void Cleanup()
{
broker.Stop();
}
[TestMethod]
public async Task MessageBatchTest()
{
string testName = "MessageBatchTest";
var connection = await Connection.Factory.CreateAsync(new Address(address));
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender", testName);
var batch = MessageBatch.Create(new[] { "data1", "data2", "data3", "data4" });
await sender.SendAsync(batch);
ReceiverLink receiver = new ReceiverLink(session, "receiver", testName);
for (int i = 0; i < 4; i++)
{
var m = await receiver.ReceiveAsync();
Assert.IsTrue(m != null, "Didn't receive message " + i);
receiver.Accept(m);
}
await connection.CloseAsync();
}
}
}

Просмотреть файл

@ -70,6 +70,9 @@
<Compile Include="..\Common\LinkTests.cs">
<Link>LinkTests.cs</Link>
</Compile>
<Compile Include="..\Common\MessageBatch.cs">
<Link>Common\MessageBatch.cs</Link>
</Compile>
<Compile Include="..\Common\TestHandler.cs">
<Link>Common\TestHandler.cs</Link>
</Compile>

Просмотреть файл

@ -62,6 +62,9 @@
<Compile Include="..\..\src\Net\TypeExtensions.cs">
<Link>Common\TypeExtensions.cs</Link>
</Compile>
<Compile Include="..\Common\MessageBatch.cs">
<Link>Common\MessageBatch.cs</Link>
</Compile>
<Compile Include="..\Common\TestHandler.cs">
<Link>Common\TestHandler.cs</Link>
</Compile>