Fixes excessive exceptions from being thrown in case of graceful channel closure. (#152)

Motivation:
SocketException and ODE are being thrown when disconnecting gracefully.

Modifications:
- dismiss async read completion if it is done on inactive channel
- do not attempt sync-reading from a socket that is not Connected
- extra: fixing echo sample with better practices
- extra: ReadListener for more natural test scenarios
- extra: tweaked nbench spec to work in-loop

Result:
No excessive errors happen during graceful shutdown
This commit is contained in:
Max Gortman 2016-08-08 15:34:08 -07:00 коммит произвёл GitHub
Родитель a9850b4775
Коммит b2f10c28cb
18 изменённых файлов: 261 добавлений и 217 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -1,6 +1,7 @@
[Oo]bj/
[Bb]in/
TestResults/
PerfResults/
.nuget/
.fake/
_ReSharper.*/

Просмотреть файл

@ -33,6 +33,10 @@ namespace Echo.Client
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) => context.CloseAsync();
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
}

Просмотреть файл

@ -11,6 +11,7 @@ namespace Echo.Client
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
@ -47,23 +48,24 @@ namespace Echo.Client
if (cert != null)
{
pipeline.AddLast(new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
//pipeline.AddLast(new LoggingHandler("CLIENT"));
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast(new EchoClientHandler());
pipeline.AddLast("echo", new EchoClientHandler());
}));
IChannel bootstrapChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(EchoClientSettings.Host, EchoClientSettings.Port));
Console.ReadLine();
await bootstrapChannel.CloseAsync();
await clientChannel.CloseAsync();
}
finally
{
group.ShutdownGracefullyAsync().Wait(1000);
await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
eventListener.Dispose();
}
}

Просмотреть файл

@ -5,7 +5,6 @@ namespace Echo.Server
{
using System;
using System.Diagnostics.Tracing;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using DotNetty.Codecs;
@ -39,29 +38,32 @@ namespace Echo.Server
.Group(bossGroup, workerGroup)
.Channel<TcpServerSocketChannel>()
.Option(ChannelOption.SoBacklog, 100)
.Handler(new LoggingHandler(LogLevel.INFO))
.Handler(new LoggingHandler("SRV-LSTN"))
.ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
if (tlsCertificate != null)
{
pipeline.AddLast(TlsHandler.Server(tlsCertificate));
pipeline.AddLast("tls", TlsHandler.Server(tlsCertificate));
}
pipeline.AddLast(new LengthFieldPrepender(2));
pipeline.AddLast(new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
//pipeline.AddLast(new LoggingHandler("SRV-CONN"));
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast(new EchoServerHandler());
pipeline.AddLast("echo", new EchoServerHandler());
}));
IChannel bootstrapChannel = await bootstrap.BindAsync(EchoServerSettings.Port);
IChannel boundChannel = await bootstrap.BindAsync(EchoServerSettings.Port);
Console.ReadLine();
await bootstrapChannel.CloseAsync();
await boundChannel.CloseAsync();
}
finally
{
Task.WaitAll(bossGroup.ShutdownGracefullyAsync(), workerGroup.ShutdownGracefullyAsync());
await Task.WhenAll(
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
eventListener.Dispose();
}
}

Просмотреть файл

@ -83,7 +83,10 @@ namespace DotNetty.Transport.Channels.Sockets
public override void FinishRead(SocketChannelAsyncOperation operation)
{
AbstractSocketByteChannel ch = this.Channel;
ch.ResetState(StateFlags.ReadScheduled);
if ((ch.ResetState(StateFlags.ReadScheduled) & StateFlags.Active) == 0)
{
return; // read was signaled as a result of channel closure
}
IChannelConfiguration config = ch.Configuration;
IChannelPipeline pipeline = ch.Pipeline;
IByteBufferAllocator allocator = config.Allocator;

Просмотреть файл

@ -108,7 +108,18 @@ namespace DotNetty.Transport.Channels.Sockets
protected void SetState(StateFlags stateToSet) => this.state |= stateToSet;
protected bool ResetState(StateFlags stateToReset)
/// <returns>state before modification</returns>
protected StateFlags ResetState(StateFlags stateToReset)
{
StateFlags oldState = this.state;
if ((oldState & stateToReset) != 0)
{
this.state = oldState & ~stateToReset;
}
return oldState;
}
protected bool TryResetState(StateFlags stateToReset)
{
StateFlags oldState = this.state;
if ((oldState & stateToReset) != 0)
@ -385,7 +396,7 @@ namespace DotNetty.Transport.Channels.Sockets
public void FinishWrite(SocketChannelAsyncOperation operation)
{
bool resetWritePending = this.Channel.ResetState(StateFlags.WriteScheduled);
bool resetWritePending = this.Channel.TryResetState(StateFlags.WriteScheduled);
Contract.Assert(resetWritePending);

Просмотреть файл

@ -39,7 +39,10 @@ namespace DotNetty.Transport.Channels.Sockets
Contract.Assert(this.channel.EventLoop.InEventLoop);
AbstractSocketMessageChannel ch = this.Channel;
ch.ResetState(StateFlags.ReadScheduled);
if ((ch.ResetState(StateFlags.ReadScheduled) & StateFlags.Active) == 0)
{
return; // read was signaled as a result of channel closure
}
IChannelConfiguration config = ch.Configuration;
IChannelPipeline pipeline = ch.Pipeline;

Просмотреть файл

@ -67,13 +67,14 @@ namespace DotNetty.Transport.Channels.Sockets
{
this.Socket.Bind(localAddress);
this.Socket.Listen(this.config.Backlog);
this.SetState(StateFlags.Active);
this.CacheLocalAddress();
}
protected override void DoClose()
{
if (this.ResetState(StateFlags.Open))
if (this.TryResetState(StateFlags.Open | StateFlags.Active))
{
this.Socket.Dispose();
}
@ -127,10 +128,13 @@ namespace DotNetty.Transport.Channels.Sockets
public override void FinishRead(SocketChannelAsyncOperation operation)
{
Contract.Requires(this.channel.EventLoop.InEventLoop);
Contract.Assert(this.channel.EventLoop.InEventLoop);
TcpServerSocketChannel ch = this.Channel;
ch.ResetState(StateFlags.ReadScheduled);
if ((ch.ResetState(StateFlags.ReadScheduled) & StateFlags.Active) == 0)
{
return; // read was signaled as a result of channel closure
}
IChannelConfiguration config = ch.Configuration;
IChannelPipeline pipeline = ch.Pipeline;
IRecvByteBufAllocatorHandle allocHandle = this.Channel.Unsafe.RecvBufAllocHandle;

Просмотреть файл

@ -164,12 +164,18 @@ namespace DotNetty.Transport.Channels.Sockets
protected override void DoClose()
{
base.DoClose();
if (this.ResetState(StateFlags.Open | StateFlags.Active))
try
{
this.Socket.Shutdown(SocketShutdown.Both);
this.Socket.Dispose();
if (this.TryResetState(StateFlags.Open | StateFlags.Active))
{
this.Socket.Shutdown(SocketShutdown.Both);
this.Socket.Dispose();
}
}
finally
{
base.DoClose();
}
}
protected override int DoReadBytes(IByteBuffer byteBuf)
@ -179,6 +185,11 @@ namespace DotNetty.Transport.Channels.Sockets
throw new NotImplementedException("Only IByteBuffer implementations backed by array are supported.");
}
if (!this.Socket.Connected)
{
return -1; // prevents ObjectDisposedException from being thrown in case connection has been lost in the meantime
}
SocketError errorCode;
int received = this.Socket.Receive(byteBuf.Array, byteBuf.ArrayOffset + byteBuf.WriterIndex, byteBuf.WritableBytes, SocketFlags.None, out errorCode);

Просмотреть файл

@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Tests.Common
{
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Transport.Channels;
public static class ChannelExtensions
{
public static Task WriteAndFlushManyAsync(this IChannel channel, params object[] messages)
{
var list = new List<Task>();
foreach (object m in messages)
{
list.Add(channel.WriteAsync(m));
}
IEnumerable<Task> tasks = list.ToArray();
channel.Flush();
return Task.WhenAll(tasks);
}
}
}

Просмотреть файл

@ -78,12 +78,12 @@
</Choose>
<ItemGroup>
<Compile Include="AssertEx.cs" />
<Compile Include="ChannelExtensions.cs" />
<Compile Include="EnumerableExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TaskExtensions.cs" />
<Compile Include="TestBase.cs" />
<Compile Include="TestScenarioRunner.cs" />
<Compile Include="TestScenarioStep.cs" />
<Compile Include="ReadListeningHandler.cs" />
<Compile Include="XUnitOutputSink.cs" />
</ItemGroup>
<ItemGroup>

Просмотреть файл

@ -0,0 +1,77 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Tests.Common
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using DotNetty.Transport.Channels;
public sealed class ReadListeningHandler : ChannelHandlerAdapter
{
readonly Queue<object> receivedQueue = new Queue<object>();
TaskCompletionSource<object> readPromise;
Exception registeredException;
public override void ChannelRead(IChannelHandlerContext context, object message)
{
TaskCompletionSource<object> promise = this.readPromise;
if (this.readPromise != null)
{
this.readPromise = null;
promise.TrySetResult(message);
}
else
{
this.receivedQueue.Enqueue(message);
}
}
public override void ChannelInactive(IChannelHandlerContext context)
{
this.SetException(new InvalidOperationException("Channel is closed."));
base.ChannelInactive(context);
}
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) => this.SetException(exception);
void SetException(Exception exception)
{
this.registeredException = exception;
this.readPromise?.TrySetException(exception);
}
public async Task<object> ReceiveAsync(TimeSpan timeout = default(TimeSpan))
{
Contract.Assert(this.readPromise == null);
if (this.registeredException != null)
{
throw this.registeredException;
}
if (this.receivedQueue.Count > 0)
{
return this.receivedQueue.Dequeue();
}
var promise = new TaskCompletionSource<object>();
this.readPromise = promise;
if (timeout > TimeSpan.Zero)
{
Task task = await Task.WhenAny(promise.Task, Task.Delay(timeout));
if (task != promise.Task)
{
throw new TimeoutException("ReceiveAsync timed out");
}
return promise.Task.Result;
}
return await promise.Task;
}
}
}

Просмотреть файл

@ -9,6 +9,16 @@ namespace DotNetty.Tests.Common
public static class TaskExtensions
{
public static Task<T> WithTimeout<T>(this Task<T> task, TimeSpan timeout)
{
if (task.IsCompleted || (timeout == Timeout.InfiniteTimeSpan))
{
return task;
}
return WithTimeoutInternal(task, timeout);
}
public static Task WithTimeout(this Task task, TimeSpan timeout)
{
if (task.IsCompleted || (timeout == Timeout.InfiniteTimeSpan))
@ -19,6 +29,20 @@ namespace DotNetty.Tests.Common
return WithTimeoutInternal(task, timeout);
}
static async Task<T> WithTimeoutInternal<T>(Task<T> task, TimeSpan timeout)
{
using (var cts = new CancellationTokenSource())
{
if (task == await Task.WhenAny(task, Task.Delay(timeout, cts.Token)))
{
cts.Cancel();
return await task;
}
}
throw new TimeoutException();
}
static async Task WithTimeoutInternal(Task task, TimeSpan timeout)
{
using (var cts = new CancellationTokenSource())

Просмотреть файл

@ -1,81 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Tests.Common
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
public class TestScenarioRunner : ChannelHandlerAdapter
{
IEnumerator<TestScenarioStep> testScenario;
readonly Func<Func<object>, IEnumerable<TestScenarioStep>> testScenarioProvider;
readonly TaskCompletionSource completion;
object lastReceivedMessage;
public TestScenarioRunner(Func<Func<object>, IEnumerable<TestScenarioStep>> testScenarioProvider, TaskCompletionSource completion)
{
Contract.Requires(completion != null);
this.testScenarioProvider = testScenarioProvider;
this.completion = completion;
}
public override void ChannelActive(IChannelHandlerContext context)
{
this.testScenario = this.testScenarioProvider(() => this.lastReceivedMessage).GetEnumerator();
this.ContinueScenarioExecution(context);
}
public override void ChannelRead(IChannelHandlerContext context, object message)
{
this.lastReceivedMessage = message;
try
{
this.ContinueScenarioExecution(context);
}
finally
{
ReferenceCountUtil.SafeRelease(message);
}
}
public override void ChannelReadComplete(IChannelHandlerContext context)
{
//context.Flush();
}
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
this.completion.TrySetException(exception);
context.CloseAsync();
}
void ContinueScenarioExecution(IChannelHandlerContext context)
{
if (!this.testScenario.MoveNext())
{
context.CloseAsync()
.ContinueWith(
t => this.completion.TrySetException(t.Exception),
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);
this.completion.TryComplete();
return;
}
foreach (object message in this.testScenario.Current.SendMessages)
{
context.WriteAsync(message)
.ContinueWith(
t => this.completion.TrySetException(t.Exception),
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);
}
context.Flush();
}
}
}

Просмотреть файл

@ -1,41 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Tests.Common
{
using System.Collections.Generic;
using System.Linq;
public class TestScenarioStep
{
TestScenarioStep()
{
}
public IEnumerable<object> SendMessages { get; private set; }
public static TestScenarioStep Messages(params object[] messages)
{
return new TestScenarioStep
{
SendMessages = messages
};
}
public static TestScenarioStep Message(object message)
{
return new TestScenarioStep
{
SendMessages = Enumerable.Repeat(message, 1)
};
}
public static TestScenarioStep MoreFeedbackExpected()
{
return new TestScenarioStep
{
SendMessages = Enumerable.Empty<object>()
};
}
}
}

Просмотреть файл

@ -28,6 +28,7 @@ namespace DotNetty.Tests.End2End
public class End2EndTests : TestBase
{
static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(10);
static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(10);
const int Port = 8009;
public End2EndTests(ITestOutputHelper output)
@ -46,7 +47,7 @@ namespace DotNetty.Tests.End2End
const string PublishS2CQos1Payload = "S->C, QoS 1 test. Different data length #2.";
[Fact]
public async void EchoServerAndClient()
public async Task EchoServerAndClient()
{
var testPromise = new TaskCompletionSource();
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
@ -61,6 +62,7 @@ namespace DotNetty.Tests.End2End
}, testPromise);
var group = new MultithreadEventLoopGroup();
var readListener = new ReadListeningHandler();
Bootstrap b = new Bootstrap()
.Group(group)
.Channel<TcpSocketChannel>()
@ -74,7 +76,7 @@ namespace DotNetty.Tests.End2End
ch.Pipeline.AddLast("client logger2", new LoggingHandler("CLI***"));
ch.Pipeline.AddLast("client prepender", new LengthFieldPrepender(2));
ch.Pipeline.AddLast("client decoder", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
ch.Pipeline.AddLast(new TestScenarioRunner(this.GetEchoClientScenario, testPromise));
ch.Pipeline.AddLast(readListener);
}));
this.Output.WriteLine("Configured Bootstrap: {0}", b);
@ -86,9 +88,17 @@ namespace DotNetty.Tests.End2End
this.Output.WriteLine("Connected channel: {0}", clientChannel);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
Assert.True(testPromise.Task.IsCompleted, "timed out");
testPromise.Task.Wait();
string[] messages = { "message 1", string.Join(",", Enumerable.Range(1, 300)) };
foreach (string message in messages)
{
await clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes(message)));
var responseMessage = Assert.IsAssignableFrom<IByteBuffer>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(message, responseMessage.ToString(Encoding.UTF8));
}
testPromise.TryComplete();
await testPromise.Task;
}
finally
{
@ -103,23 +113,27 @@ namespace DotNetty.Tests.End2End
}
[Fact]
public async void MqttServerAndClient()
public async Task MqttServerAndClient()
{
var testPromise = new TaskCompletionSource();
var tlsCertificate = new X509Certificate2("dotnetty.com.pfx", "password");
var serverReadListener = new ReadListeningHandler();
IChannel serverChannel = null;
Func<Task> closeServerFunc = await this.StartServerAsync(true, ch =>
{
serverChannel = ch;
ch.Pipeline.AddLast("server logger", new LoggingHandler("SERVER"));
ch.Pipeline.AddLast("server tls", TlsHandler.Server(tlsCertificate));
ch.Pipeline.AddLast("server logger2", new LoggingHandler("SER***"));
ch.Pipeline.AddLast(
MqttEncoder.Instance,
new MqttDecoder(true, 256 * 1024),
new TestScenarioRunner(this.GetMqttServerScenario, testPromise));
serverReadListener);
}, testPromise);
var group = new MultithreadEventLoopGroup();
var clientReadListener = new ReadListeningHandler();
Bootstrap b = new Bootstrap()
.Group(group)
.Channel<TcpSocketChannel>()
@ -135,7 +149,7 @@ namespace DotNetty.Tests.End2End
ch.Pipeline.AddLast(
MqttEncoder.Instance,
new MqttDecoder(false, 256 * 1024),
new TestScenarioRunner(this.GetMqttClientScenario, testPromise));
clientReadListener);
}));
this.Output.WriteLine("Configured Bootstrap: {0}", b);
@ -147,8 +161,11 @@ namespace DotNetty.Tests.End2End
this.Output.WriteLine("Connected channel: {0}", clientChannel);
await Task.WhenAny(testPromise.Task, Task.Delay(TimeSpan.FromSeconds(30)));
Assert.True(testPromise.Task.IsCompleted);
await Task.WhenAll(this.RunMqttClientScenarioAsync(clientChannel, clientReadListener), this.RunMqttServerScenarioAsync(serverChannel, serverReadListener))
.WithTimeout(TimeSpan.FromSeconds(30));
testPromise.TryComplete();
await testPromise.Task;
}
finally
{
@ -162,9 +179,9 @@ namespace DotNetty.Tests.End2End
}
}
IEnumerable<TestScenarioStep> GetMqttClientScenario(Func<object> currentMessageFunc)
async Task RunMqttClientScenarioAsync(IChannel channel, ReadListeningHandler readListener)
{
yield return TestScenarioStep.Message(new ConnectPacket
await channel.WriteAndFlushAsync(new ConnectPacket
{
ClientId = ClientId,
Username = "testuser",
@ -173,32 +190,30 @@ namespace DotNetty.Tests.End2End
WillMessage = Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes("oops"))
});
var connAckPacket = Assert.IsType<ConnAckPacket>(currentMessageFunc());
var connAckPacket = Assert.IsType<ConnAckPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(ConnectReturnCode.Accepted, connAckPacket.ReturnCode);
int subscribePacketId = GetRandomPacketId();
int unsubscribePacketId = GetRandomPacketId();
yield return TestScenarioStep.Messages(
await channel.WriteAndFlushManyAsync(
new SubscribePacket(subscribePacketId,
new SubscriptionRequest(SubscribeTopicFilter1, QualityOfService.ExactlyOnce),
new SubscriptionRequest(SubscribeTopicFilter2, QualityOfService.AtLeastOnce),
new SubscriptionRequest("for/unsubscribe", QualityOfService.AtMostOnce)),
new UnsubscribePacket(unsubscribePacketId, "for/unsubscribe"));
var subAckPacket = Assert.IsType<SubAckPacket>(currentMessageFunc());
var subAckPacket = Assert.IsType<SubAckPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(subscribePacketId, subAckPacket.PacketId);
Assert.Equal(3, subAckPacket.ReturnCodes.Count);
Assert.Equal(QualityOfService.ExactlyOnce, subAckPacket.ReturnCodes[0]);
Assert.Equal(QualityOfService.AtLeastOnce, subAckPacket.ReturnCodes[1]);
Assert.Equal(QualityOfService.AtMostOnce, subAckPacket.ReturnCodes[2]);
yield return TestScenarioStep.MoreFeedbackExpected();
var unsubAckPacket = Assert.IsType<UnsubAckPacket>(currentMessageFunc());
var unsubAckPacket = Assert.IsType<UnsubAckPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(unsubscribePacketId, unsubAckPacket.PacketId);
int publishQoS1PacketId = GetRandomPacketId();
yield return TestScenarioStep.Messages(
await channel.WriteAndFlushManyAsync(
new PublishPacket(QualityOfService.AtMostOnce, false, false)
{
TopicName = PublishC2STopic,
@ -212,54 +227,49 @@ namespace DotNetty.Tests.End2End
});
//new PublishPacket(QualityOfService.AtLeastOnce, false, false) { TopicName = "feedback/qos/One", Payload = Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes("QoS 1 test. Different data length.")) });
var pubAckPacket = Assert.IsType<PubAckPacket>(currentMessageFunc());
var pubAckPacket = Assert.IsType<PubAckPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(publishQoS1PacketId, pubAckPacket.PacketId);
yield return TestScenarioStep.MoreFeedbackExpected();
var publishPacket = Assert.IsType<PublishPacket>(currentMessageFunc());
var publishPacket = Assert.IsType<PublishPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(QualityOfService.AtLeastOnce, publishPacket.QualityOfService);
Assert.Equal(PublishS2CQos1Topic, publishPacket.TopicName);
Assert.Equal(PublishS2CQos1Payload, publishPacket.Payload.ToString(Encoding.UTF8));
yield return TestScenarioStep.Messages(
await channel.WriteAndFlushManyAsync(
PubAckPacket.InResponseTo(publishPacket),
DisconnectPacket.Instance);
}
IEnumerable<TestScenarioStep> GetMqttServerScenario(Func<object> currentMessageFunc)
async Task RunMqttServerScenarioAsync(IChannel channel, ReadListeningHandler readListener)
{
yield return TestScenarioStep.MoreFeedbackExpected();
var connectPacket = Assert.IsType<ConnectPacket>(currentMessageFunc());
var connectPacket = Assert.IsType<ConnectPacket>(await readListener.ReceiveAsync(DefaultTimeout));
// todo verify
yield return TestScenarioStep.Message(new ConnAckPacket
await channel.WriteAndFlushAsync(new ConnAckPacket
{
ReturnCode = ConnectReturnCode.Accepted,
SessionPresent = true
});
var subscribePacket = Assert.IsType<SubscribePacket>(currentMessageFunc());
var subscribePacket = Assert.IsType<SubscribePacket>(await readListener.ReceiveAsync(DefaultTimeout));
// todo verify
yield return TestScenarioStep.Message(SubAckPacket.InResponseTo(subscribePacket, QualityOfService.ExactlyOnce));
await channel.WriteAndFlushAsync(SubAckPacket.InResponseTo(subscribePacket, QualityOfService.ExactlyOnce));
var unsubscribePacket = Assert.IsType<UnsubscribePacket>(currentMessageFunc());
var unsubscribePacket = Assert.IsType<UnsubscribePacket>(await readListener.ReceiveAsync(DefaultTimeout));
// todo verify
yield return TestScenarioStep.Message(UnsubAckPacket.InResponseTo(unsubscribePacket));
await channel.WriteAndFlushAsync(UnsubAckPacket.InResponseTo(unsubscribePacket));
var publishQos0Packet = Assert.IsType<PublishPacket>(currentMessageFunc());
var publishQos0Packet = Assert.IsType<PublishPacket>(await readListener.ReceiveAsync(DefaultTimeout));
// todo verify
yield return TestScenarioStep.MoreFeedbackExpected();
var publishQos1Packet = Assert.IsType<PublishPacket>(currentMessageFunc());
var publishQos1Packet = Assert.IsType<PublishPacket>(await readListener.ReceiveAsync(DefaultTimeout));
// todo verify
int publishQos1PacketId = GetRandomPacketId();
yield return TestScenarioStep.Messages(PubAckPacket.InResponseTo(publishQos1Packet),
await channel.WriteAndFlushManyAsync(
PubAckPacket.InResponseTo(publishQos1Packet),
new PublishPacket(QualityOfService.AtLeastOnce, false, false)
{
PacketId = publishQos1PacketId,
@ -267,12 +277,10 @@ namespace DotNetty.Tests.End2End
Payload = Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes(PublishS2CQos1Payload))
});
var pubAckPacket = Assert.IsType<PubAckPacket>(currentMessageFunc());
var pubAckPacket = Assert.IsType<PubAckPacket>(await readListener.ReceiveAsync(DefaultTimeout));
Assert.Equal(publishQos1PacketId, pubAckPacket.PacketId);
yield return TestScenarioStep.MoreFeedbackExpected();
var disconnectPacket = Assert.IsType<DisconnectPacket>(currentMessageFunc());
var disconnectPacket = Assert.IsType<DisconnectPacket>(await readListener.ReceiveAsync(DefaultTimeout));
}
static int GetRandomPacketId() => Guid.NewGuid().GetHashCode() & ushort.MaxValue;
@ -325,17 +333,5 @@ namespace DotNetty.Tests.End2End
}
}
}
IEnumerable<TestScenarioStep> GetEchoClientScenario(Func<object> currentMessageFunc)
{
string[] messages = { "message 1", string.Join(",", Enumerable.Range(1, 300)) };
foreach (string message in messages)
{
yield return TestScenarioStep.Message(Unpooled.WrappedBuffer(Encoding.UTF8.GetBytes(message)));
var responseMessage = Assert.IsAssignableFrom<IByteBuffer>(currentMessageFunc());
Assert.Equal(message, responseMessage.ToString(Encoding.UTF8));
}
}
}
}

Просмотреть файл

@ -22,7 +22,7 @@ namespace DotNetty.Transport.Tests.Performance.Sockets
const string InboundThroughputCounterName = "inbound ops";
// The number of times we're going to warmup + run each benchmark
public const int IterationCount = 5;
public const int IterationCount = 3;
public const int WriteCount = 1000000;
public const int MessagesPerMinute = 1000000;

Просмотреть файл

@ -27,7 +27,7 @@ namespace DotNetty.Transport.Tests.Performance.Sockets
const string OutboundThroughputCounterName = "outbound ops";
// The number of times we're going to warmup + run each benchmark
public const int IterationCount = 5;
public const int IterationCount = 3;
public const int WriteCount = 1000000;
public const int MessagesPerMinute = 1000000;
@ -133,23 +133,27 @@ namespace DotNetty.Transport.Tests.Performance.Sockets
this.clientChannel = cb.ConnectAsync(this.serverChannel.LocalAddress).Result;
}
[PerfBenchmark(Description = "Measures how quickly and with how much GC overhead a TcpSocketChannel --> TcpServerSocketChannel connection can decode / encode realistic messages, 100 writes per flush",
[PerfBenchmark(Description = "Measures how quickly and with how much GC overhead a TcpSocketChannel --> TcpServerSocketChannel connection can decode / encode realistic messages, 10 writes per flush",
NumberOfIterations = IterationCount, RunMode = RunMode.Iterations)]
[CounterMeasurement(InboundThroughputCounterName)]
[CounterMeasurement(OutboundThroughputCounterName)]
[GcMeasurement(GcMetric.TotalCollections, GcGeneration.AllGc)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void TcpChannel_Duplex_Throughput_100_messages_per_flush(BenchmarkContext context)
public void TcpChannel_Duplex_Throughput_10_messages_per_flush(BenchmarkContext context)
{
for (int i = 0; i < WriteCount; i++)
this.clientChannel.EventLoop.Execute(() =>
{
this.clientChannel.WriteAsync(Unpooled.WrappedBuffer(this.message));
if (i % 100 == 0) // flush every 100 writes
for (int i = 0; i < WriteCount; i++)
{
this.clientChannel.Flush();
this.clientChannel.WriteAsync(Unpooled.WrappedBuffer(this.message));
if (i % 10 == 0) // flush every 10 writes
{
this.clientChannel.Flush();
}
}
}
this.clientChannel.Flush();
this.clientChannel.Flush();
});
if (!this.ResetEvent.Wait(this.Timeout))
{
Console.WriteLine("*** TIMED OUT ***");