This commit is contained in:
mkim 2017-11-25 23:34:17 -08:00 коммит произвёл Max Gortman
Родитель 92289283c2
Коммит a05ec8c563
4 изменённых файлов: 628 добавлений и 2 удалений

Просмотреть файл

@ -5,7 +5,7 @@ namespace DotNetty.Common.Internal
{
using System.Collections.Concurrent;
public sealed class CompatibleConcurrentQueue<T> : ConcurrentQueue<T>, IQueue<T>
public class CompatibleConcurrentQueue<T> : ConcurrentQueue<T>, IQueue<T>
{
public bool TryEnqueue(T element)
{

Просмотреть файл

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?><Project Sdk="Microsoft.NET.Sdk">
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup Label="NuGet">
<TargetFrameworks>netstandard1.3;net45</TargetFrameworks>
<IsPackable>true</IsPackable>

Просмотреть файл

@ -0,0 +1,203 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Handlers.Flow
{
using DotNetty.Common;
using DotNetty.Common.Internal;
using DotNetty.Common.Internal.Logging;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
/**
* The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
*
* Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
* many events as they like for any given input. A channel's auto reading configuration doesn't usually
* apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
* like to hold subsequent events while they're processing one event. It's a common problem with the
* {@code HttpObjectDecoder} that will very often fire a {@code HttpRequest} that is immediately followed
* by a {@code LastHttpContent} event.
*
* <pre>{@code
* ChannelPipeline pipeline = ...;
*
* pipeline.addLast(new HttpServerCodec());
* pipeline.addLast(new FlowControlHandler());
*
* pipeline.addLast(new MyExampleHandler());
*
* class MyExampleHandler extends ChannelInboundHandlerAdapter {
* @Override
* public void channelRead(IChannelHandlerContext ctx, Object msg) {
* if (msg instanceof HttpRequest) {
* ctx.channel().config().setAutoRead(false);
*
* // The FlowControlHandler will hold any subsequent events that
* // were emitted by HttpObjectDecoder until auto reading is turned
* // back on or Channel#read() is being called.
* }
* }
* }
* }</pre>
*
* @see ChannelConfig#setAutoRead(bool)
*/
public class FlowControlHandler : ChannelDuplexHandler
{
static readonly IInternalLogger Logger = InternalLoggerFactory.GetInstance<FlowControlHandler>();
static readonly ThreadLocalPool<RecyclableQueue> Recycler = new ThreadLocalPool<RecyclableQueue>(h => new RecyclableQueue(h));
readonly bool releaseMessages;
RecyclableQueue queue;
IChannelConfiguration config;
bool shouldConsume;
public FlowControlHandler()
: this(true)
{
}
public FlowControlHandler(bool releaseMessages)
{
this.releaseMessages = releaseMessages;
}
/**
* Determine if the underlying {@link Queue} is empty. This method exists for
* testing, debugging and inspection purposes and it is not Thread safe!
*/
public bool IsQueueEmpty => this.queue.IsEmpty;
/**
* Releases all messages and destroys the {@link Queue}.
*/
void Destroy()
{
if (this.queue != null)
{
if (!this.queue.IsEmpty)
{
Logger.Trace($"Non-empty queue: {this.queue}");
if (this.releaseMessages)
{
while (this.queue.TryDequeue(out object msg))
{
ReferenceCountUtil.SafeRelease(msg);
}
}
}
this.queue.Recycle();
this.queue = null;
}
}
public override void HandlerAdded(IChannelHandlerContext ctx)
{
this.config = ctx.Channel.Configuration;
}
public override void ChannelInactive(IChannelHandlerContext ctx)
{
this.Destroy();
ctx.FireChannelInactive();
}
public override void Read(IChannelHandlerContext ctx)
{
if (this.Dequeue(ctx, 1) == 0)
{
// It seems no messages were consumed. We need to read() some
// messages from upstream and once one arrives it need to be
// relayed to downstream to keep the flow going.
this.shouldConsume = true;
ctx.Read();
}
}
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
if (this.queue == null)
{
this.queue = Recycler.Take();
}
this.queue.TryEnqueue(msg);
// We just received one message. Do we need to relay it regardless
// of the auto reading configuration? The answer is yes if this
// method was called as a result of a prior read() call.
int minConsume = this.shouldConsume ? 1 : 0;
this.shouldConsume = false;
this.Dequeue(ctx, minConsume);
}
public override void ChannelReadComplete(IChannelHandlerContext ctx)
{
// Don't relay completion events from upstream as they
// make no sense in this context. See dequeue() where
// a new set of completion events is being produced.
}
/**
* Dequeues one or many (or none) messages depending on the channel's auto
* reading state and returns the number of messages that were consumed from
* the internal queue.
*
* The {@code minConsume} argument is used to force {@code dequeue()} into
* consuming that number of messages regardless of the channel's auto
* reading configuration.
*
* @see #read(ChannelHandlerContext)
* @see #channelRead(ChannelHandlerContext, Object)
*/
int Dequeue(IChannelHandlerContext ctx, int minConsume)
{
if (this.queue != null)
{
int consumed = 0;
while ((consumed < minConsume || this.config.AutoRead) && this.queue.TryDequeue(out object msg))
{
++consumed;
ctx.FireChannelRead(msg);
}
// We're firing a completion event every time one (or more)
// messages were consumed and the queue ended up being drained
// to an empty state.
if (this.queue.IsEmpty && consumed > 0)
{
ctx.FireChannelReadComplete();
}
return consumed;
}
return 0;
}
}
sealed class RecyclableQueue : CompatibleConcurrentQueue<object>
{
readonly ThreadLocalPool.Handle handle;
internal RecyclableQueue(ThreadLocalPool.Handle handle)
{
this.handle = handle;
}
public void Recycle()
{
((IQueue<object>)this).Clear();
this.handle.Release(this);
}
}
}

Просмотреть файл

@ -0,0 +1,422 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace DotNetty.Handlers.Tests.Flow
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Common.Utilities;
using DotNetty.Handlers.Flow;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Xunit;
public class FlowControlHandlerTest : IDisposable
{
readonly IEventLoopGroup group;
public FlowControlHandlerTest()
{
this.group = new MultithreadEventLoopGroup();
}
public void Dispose()
{
this.group?.ShutdownGracefullyAsync();
}
/**
* The {@link OneByteToThreeStringsDecoder} decodes this {@code byte[]} into three messages.
*/
static IByteBuffer NewOneMessage() => Unpooled.WrappedBuffer(new byte[] { 1 });
Task<IChannel> NewServer(bool autoRead, params IChannelHandler[] handlers)
{
Assert.True(handlers.Length >= 1);
var serverBootstrap = new ServerBootstrap();
serverBootstrap.Group(this.group)
.Channel<TcpServerSocketChannel>()
.ChildOption(ChannelOption.AutoRead, autoRead)
.ChildHandler(
new ActionChannelInitializer<IChannel>(
ch =>
{
IChannelPipeline pipeline = ch.Pipeline;
pipeline.AddLast(new OneByteToThreeStringsDecoder());
pipeline.AddLast(handlers);
}));
return serverBootstrap.BindAsync(IPAddress.Loopback, 0);
}
Task<IChannel> NewClient(EndPoint server)
{
var bootstrap = new Bootstrap();
bootstrap.Group(this.group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.ConnectTimeout, TimeSpan.FromMilliseconds(1000))
.Handler(new TestHandler(onRead: (ctx, m) => Assert.True(false, "In this test the client is never receiving a message from the server.")));
return bootstrap.ConnectAsync(server);
}
/**
* This test demonstrates the default behavior if auto reading
* is turned on from the get-go and you're trying to turn it off
* once you've received your first message.
*
* NOTE: This test waits for the client to disconnect which is
* interpreted as the signal that all {@code byte}s have been
* transferred to the server.
*/
[Fact]
public async Task TestAutoReadingOn()
{
var latch = new CountdownEvent(3);
ChannelHandlerAdapter handler = new TestHandler(
onRead: (ctx, msg) =>
{
ReferenceCountUtil.Release(msg);
// We're turning off auto reading in the hope that no
// new messages are being sent but that is not true.
ctx.Channel.Configuration.AutoRead = false;
latch.Signal();
});
IChannel server = await this.NewServer(true, handler);
IChannel client = await this.NewClient(server.LocalAddress);
try
{
await client.WriteAndFlushAsync(NewOneMessage());
// We received three messages even through auto reading
// was turned off after we received the first message.
Assert.True(latch.Wait(TimeSpan.FromSeconds(1)));
}
finally
{
client.CloseAsync();
server.CloseAsync();
}
}
/**
* This test demonstrates the default behavior if auto reading
* is turned off from the get-go and you're calling read() in
* the hope that only one message will be returned.
*
* NOTE: This test waits for the client to disconnect which is
* interpreted as the signal that all {@code byte}s have been
* transferred to the server.
*/
[Fact]
public async Task TestAutoReadingOff()
{
IChannel channel = null;
var mre = new ManualResetEventSlim(false);
var latch = new CountdownEvent(3);
ChannelHandlerAdapter handler = new TestHandler(
onActive: ctx =>
{
Interlocked.Exchange(ref channel, ctx.Channel);
mre.Set();
ctx.FireChannelActive();
},
onRead: (ctx, msg) =>
{
ReferenceCountUtil.Release(msg);
latch.Signal();
}
);
IChannel server = await this.NewServer(false, handler);
IChannel client = await this.NewClient(server.LocalAddress);
try
{
// The client connection on the server side
mre.Wait(TimeSpan.FromSeconds(1));
IChannel peer = Interlocked.Exchange(ref channel, null);
// Write the message
await client.WriteAndFlushAsync(NewOneMessage());
// Read the message
peer.Read();
// We received all three messages but hoped that only one
// message was read because auto reading was off and we
// invoked the read() method only once.
Assert.True(latch.Wait(TimeSpan.FromSeconds(1)));
}
finally
{
client.CloseAsync();
server.CloseAsync();
}
}
/**
* The {@link FlowControlHandler} will simply pass-through all messages
* if auto reading is on and remains on.
*/
[Fact]
public async Task TestFlowAutoReadOn()
{
var latch = new CountdownEvent(3);
ChannelHandlerAdapter handler = new TestHandler(onRead: (ctx, msg) => latch.Signal());
var flow = new FlowControlHandler();
IChannel server = await this.NewServer(true, flow, handler);
IChannel client = await this.NewClient(server.LocalAddress);
try
{
// Write the message
await client.WriteAndFlushAsync(NewOneMessage());
// We should receive 3 messages
Assert.True(latch.Wait(TimeSpan.FromSeconds(1)));
Assert.True(flow.IsQueueEmpty);
}
finally
{
client.CloseAsync();
server.CloseAsync();
}
}
/**
* The {@link FlowControlHandler} will pass down messages one by one
* if {@link ChannelConfig#setAutoRead(boolean)} is being toggled.
*/
[Fact]
public async Task TestFlowToggleAutoRead()
{
IChannel channel = null;
var mre = new ManualResetEventSlim(false);
var msgRcvLatch1 = new CountdownEvent(1);
var msgRcvLatch2 = new CountdownEvent(1);
var msgRcvLatch3 = new CountdownEvent(1);
var setAutoReadLatch1 = new CountdownEvent(1);
var setAutoReadLatch2 = new CountdownEvent(1);
int msgRcvCount = 0;
int expectedMsgCount = 0;
ChannelHandlerAdapter handler = new TestHandler(
onActive: ctx =>
{
Interlocked.Exchange(ref channel, ctx.Channel);
mre.Set();
ctx.FireChannelActive();
},
onRead: (ctx, msg) =>
{
ReferenceCountUtil.Release(msg);
// Disable auto reading after each message
ctx.Channel.Configuration.AutoRead = false;
if (msgRcvCount++ != expectedMsgCount)
{
return;
}
switch (msgRcvCount)
{
case 1:
msgRcvLatch1.Signal();
if (setAutoReadLatch1.Wait(TimeSpan.FromSeconds(1)))
{
++expectedMsgCount;
}
break;
case 2:
msgRcvLatch2.Signal();
if (setAutoReadLatch2.Wait(TimeSpan.FromSeconds(1)))
{
++expectedMsgCount;
}
break;
default:
msgRcvLatch3.Signal();
break;
}
}
);
var flow = new FlowControlHandler();
IChannel server = await this.NewServer(true, flow, handler);
IChannel client = await this.NewClient(server.LocalAddress);
try
{
// The client connection on the server side
mre.Wait(TimeSpan.FromSeconds(1));
IChannel peer = Interlocked.Exchange(ref channel, null);
await client.WriteAndFlushAsync(NewOneMessage());
// channelRead(1)
Assert.True(msgRcvLatch1.Wait(TimeSpan.FromSeconds(1)));
// channelRead(2)
peer.Configuration.AutoRead = true;
setAutoReadLatch1.Signal();
Assert.True(msgRcvLatch1.Wait(TimeSpan.FromSeconds(1)));
// channelRead(3)
peer.Configuration.AutoRead = true;
setAutoReadLatch2.Signal();
Assert.True(msgRcvLatch3.Wait(TimeSpan.FromSeconds(1)));
Assert.True(flow.IsQueueEmpty);
}
finally
{
client.CloseAsync();
server.CloseAsync();
}
}
/**
* The {@link FlowControlHandler} will pass down messages one by one
* if auto reading is off and the user is calling {@code read()} on
* their own.
*/
[Fact]
public async Task TestFlowAutoReadOff()
{
IChannel channel = null;
var mre = new ManualResetEventSlim(false);
var msgRcvLatch1 = new CountdownEvent(1);
var msgRcvLatch2 = new CountdownEvent(2);
var msgRcvLatch3 = new CountdownEvent(3);
ChannelHandlerAdapter handler = new TestHandler(
onActive: ctx =>
{
ctx.FireChannelActive();
//peerRef.exchange(ctx.Channel, 1L, SECONDS);
Interlocked.Exchange(ref channel, ctx.Channel);
mre.Set();
},
onRead: (ctx, msg) =>
{
Signal(msgRcvLatch1);
Signal(msgRcvLatch2);
Signal(msgRcvLatch3);
}
);
var flow = new FlowControlHandler();
IChannel server = await this.NewServer(false, flow, handler);
IChannel client = await this.NewClient(server.LocalAddress);
try
{
// The client connection on the server side
mre.Wait(TimeSpan.FromSeconds(1));
IChannel peer = Interlocked.Exchange(ref channel, null);
// Write the message
await client.WriteAndFlushAsync(NewOneMessage());
// channelRead(1)
peer.Read();
Assert.True(msgRcvLatch1.Wait(TimeSpan.FromSeconds(10)));
// channelRead(2)
peer.Read();
Assert.True(msgRcvLatch2.Wait(TimeSpan.FromSeconds(10)));
// channelRead(3)
peer.Read();
Assert.True(msgRcvLatch3.Wait(TimeSpan.FromSeconds(10)));
Assert.True(flow.IsQueueEmpty);
}
finally
{
client.CloseAsync();
server.CloseAsync();
}
void Signal(CountdownEvent evt)
{
if (!evt.IsSet)
{
evt.Signal();
}
}
}
class TestHandler : ChannelHandlerAdapter
{
readonly Action<IChannelHandlerContext> onActive;
readonly Action<IChannelHandlerContext, object> onRead;
public TestHandler(
Action<IChannelHandlerContext> onActive = null,
Action<IChannelHandlerContext, object> onRead = null
)
{
this.onActive = onActive;
this.onRead = onRead;
}
public override void ChannelActive(IChannelHandlerContext context)
{
if (this.onActive != null)
{
this.onActive(context);
}
else
{
base.ChannelActive(context);
}
}
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (this.onRead != null)
{
this.onRead(context, message);
}
else
{
base.ChannelRead(context, message);
}
}
}
/**
* This is a fictional message decoder. It decodes each {@code byte}
* into three strings.
*/
class OneByteToThreeStringsDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
for (int i = 0; i < input.ReadableBytes; i++)
{
output.Add("1");
output.Add("2");
output.Add("3");
}
input.SetReaderIndex(input.ReadableBytes);
}
}
}
}